Skip to content

lythonic.compose.namespace

Hierarchical registry of callables with metadata and DAG composition.

Namespace: Hierarchical registry of callables with metadata and DAG composition.

Provides Namespace for registering callables wrapped in NamespaceNode (which wraps Method), using GlobalRef-style paths ("branch.sub:leaf"). Includes DagContext as base context for DAG-participating callables.

Path Scheme

  • . separates namespace levels (branches)
  • : separates the leaf callable from its namespace
  • Example: "market.data:fetch_prices" -> branch market.data, leaf fetch_prices

Usage

Each Dag owns a Namespace. Passing a callable to dag.node() auto-registers it in the DAG's namespace (derived from the callable's module and name):

from lythonic.compose.namespace import Dag

def fetch(url: str) -> dict:
    return {"source": url, "raw": [1, 2, 3]}

def transform(data: dict) -> dict:
    return {"source": data["source"], "values": [v * 2 for v in data["raw"]]}

dag = Dag()
dag.node(fetch) >> dag.node(transform)

You can also register callables in a standalone Namespace and look them up by path or dot-access (e.g. ns.get("market:fetch") or ns.market.fetch).

Tags

Nodes can be tagged at registration for later querying:

ns.register(fetch_prices, nsref="market:fetch", tags={"slow", "market-data"})
ns.register(compute_stats, nsref="market:stats", tags={"fast", "market-data"})

ns.query("market-data")              # both nodes
ns.query("slow & market-data")       # only fetch
ns.query("~slow")                    # only stats
ns.query("slow | fast")              # both nodes

Tag expressions support & (AND), | (OR), ~ (NOT) with standard precedence (~ > & > |). Tags must not contain spaces or operator characters.

Map

Dag.map(sub_dag, label) creates a MapNode that runs a sub-DAG on each element of an upstream list or dict, concurrently via asyncio.gather:

sub_dag = Dag()
sub_dag.node(tokenize) >> sub_dag.node(count)

parent = Dag()
parent.node(split_text) >> parent.map(sub_dag, label="chunks") >> parent.node(reduce)

The sub-DAG must have exactly one source and one sink.

FlatMap behavior: When the input is a list and a sub-DAG returns a list, its elements are flattened into the parent result rather than nested. Scalar results are appended as before. This does not apply to dict inputs.

Switch Nodes

Dag.switch(branches, label) creates a SwitchNode that routes data to one of several branch DAGs based on a LabelSwitch string extracted from the upstream output. The upstream must return either a dict with a "__switch__" key or a tuple/list where the first element is the label.

Branches can be DAGs, @dag_factory functions, or plain callables. When a list is passed, labels are auto-derived from function/factory names:

dag = Dag()
dag.node(classify) >> dag.switch([handle_text, handle_image], label="router")

With a dict for explicit labels:

dag.switch({"text": text_dag, "image": image_dag}, label="router")

MapSwitch

dag.map(dag.switch(...)) creates a MapSwitchNode that maps over a collection, routing each element through the switch by its LabelSwitch value. Each element must provide a switch label (same protocol as SwitchNode):

dag = Dag()
dag.node(split_items) >> dag.map(
    dag.switch([process_a, process_b], label="route"),
    label="map_route",
) >> dag.node(merge)

DagContext Namespace Access

DagContext provides access to the mounted Namespace so callables can invoke other registered nodes (including cached ones) at runtime:

async def my_task(ctx: DagContext) -> dict:
    # Sync call (sync targets only, raises TypeError on async)
    price = ctx.ns_call("market:get_price", ticker="AAPL")

    # Async call (handles async, sync, and @inline callables)
    volume = await ctx.ns_acall("market:get_volume", ticker="AAPL")

    return {"price": price, "volume": volume}

ns_call is for sync contexts — it raises TypeError if the target is async, directing the caller to use ns_acall. ns_acall handles all callable types: async targets are awaited, @inline sync targets run directly, and blocking sync targets are dispatched to a thread executor.

Trigger Shorthand

TriggerConfig accepts a bare cron string as shorthand for a poll trigger. Trigger names are auto-generated from the node's nsref leaf when omitted:

namespace:
  - gref: "myapp:task1"
    triggers:
      - "*/30 * * * * *"          # expands to {type: poll, schedule: ...}
      - name: "custom_name"       # explicit name
        schedule: "0 0 * * *"

Singleton DAG

Namespace.get_as_dag(nsref) returns the underlying DAG for DAG nodes or wraps plain callables in a single-node DAG (with parent_namespace set), providing uniform execution.

Composable DAGs

dag.node(sub_dag, label="enrich") creates a CallNode that runs a sub-DAG once as a single step, passing the upstream output to the sub-DAG's source:

enrich_dag = Dag()
enrich_dag.node(lookup) >> enrich_dag.node(annotate)

parent = Dag()
parent.node(fetch) >> parent.node(enrich_dag, label="enrich") >> parent.node(save)

Callable DAGs

A Dag is directly callable. await dag(**kwargs) creates a DagRunner with NullProvenance and returns a DagRunResult. Kwargs are matched to source node parameters by name:

result = await dag(url="https://example.com")
print(result.status)   # "completed"
print(result.outputs)  # sink node outputs

DAG Factories

Use @dag_factory to mark a function that builds a DAG. When registered in a namespace, the factory is called and the resulting DAG is registered with a __ suffix:

from lythonic.compose.namespace import dag_factory, Dag

@dag_factory
def my_pipeline() -> Dag:
    dag = Dag()
    dag.node(step1) >> dag.node(step2)
    return dag

ns.register(my_pipeline)  # registers DAG under "module:my_pipeline__"

Namespace Fragments

A NamespaceFragment groups related methods under a shared nsref prefix. Subclass it and decorate methods with @nsnode(tags=[...]) for discovery. Use @require_cache to enforce cache configuration at registration time.

namespace:
  - type: fragment
    gref: "myapp.downloads:DownloadFragment"
    nsref: "downloads:"
    init:
      api_key: "abc123"
    configs:
      fetch_prices:
        min_ttl: 0.5
        max_ttl: 2.0

Modules can also serve as fragments — decorate module-level functions with @nsnode or @dag_factory, and reference the module path without a : separator:

  - type: fragment
    gref: "myapp.transforms"
    nsref: "transforms:"

LabelSwitch = str module-attribute

Routing key for SwitchNode — the value selects which branch DAG to run.

Namespace

Flat registry of callables wrapped in NamespaceNode. Uses GlobalRef-style paths as keys: "module.sub:callable_name".

Source code in src/lythonic/compose/namespace.py
class Namespace:
    """
    Flat registry of callables wrapped in `NamespaceNode`.
    Uses GlobalRef-style paths as keys: `"module.sub:callable_name"`.
    """

    _nodes: dict[str, NamespaceNode]

    def __init__(self) -> None:
        self._nodes = {}
        self._storage: Any = None
        self._provenance: Any = None

    @property
    def is_mounted(self) -> bool:
        """True if mount() has been called."""
        return self._storage is not None

    @property
    def requires_mount(self) -> bool:
        """True if any node has cache config, triggers, or @mount_required."""
        for node in self._nodes.values():
            if isinstance(node.config, NsCacheConfig):
                return True
            if node.config.triggers:
                return True
            if getattr(node.method.o, "_is_mount_required", False):
                return True
        return False

    @property
    def has_mountable(self) -> bool:
        """True if any node would alter behavior when mounted."""
        if self.requires_mount:
            return True
        for node in self._nodes.values():
            if node.dag is not None:
                return True
            if getattr(node.method.o, "_is_mountable", False):
                return True
        return False

    def mount(self, storage: Any) -> None:
        """Activate persistence features for all declared nodes."""
        self._storage = storage

        if storage.dags_db is not None:
            from lythonic.compose.dag_provenance import DagProvenance

            self._provenance = DagProvenance(storage.dags_db)

        if storage.cache_db is not None:
            from lythonic.compose.cached import mount_cached_node

            for node in self._nodes.values():
                if isinstance(node.config, NsCacheConfig):
                    mount_cached_node(node, storage.cache_db)

        storage.setup_logging()

    def register(
        self,
        c: Callable[..., Any] | str | Dag,
        nsref: str | NsRef | None = None,
        decorate: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
        config: NsNodeConfig | None = None,
    ) -> NamespaceNode:
        """
        Register a callable. If `nsref` is `None`, derive from the callable's
        module and name. If the callable is marked with `@dag_factory`,
        call it and register the returned DAG with a `__` suffix. If
        `decorate` is provided, wrap the callable for invocation (metadata
        is still extracted from the original). Optional `tags` are stored
        on the node for querying via `query()`.
        """
        if isinstance(c, Dag):
            return self._register_dag(c, nsref, tags=tags, config=config)

        # Enforce @require_cache: callable must have a NsCacheConfig.
        if callable(c) and getattr(c, "_require_cache", False):
            if not isinstance(config, NsCacheConfig):
                name = getattr(c, "__name__", str(c))
                raise ValueError(
                    f"'{name}' is decorated with @require_cache but no NsCacheConfig was provided"
                )

        # Handle @dag_factory decorated callables
        if callable(c) and getattr(c, "_is_dag_factory", False):
            gref = GlobalRef(c)
            factory_nsref = nsref or f"{gref.module}:{gref.name}__"
            dag = c()
            if not isinstance(dag, Dag):
                raise TypeError(f"@dag_factory {gref} must return a Dag, got {type(dag).__name__}")
            return self._register_dag(dag, factory_nsref, tags=tags, config=config)

        gref = GlobalRef(c)

        if nsref is None:
            nsref = f"{gref.module}:{gref.name}"

        key = str(nsref)

        # Pass c directly (not gref) so Method._o is set immediately, preserving
        # any decorator attributes (e.g. _is_mount_required) on the callable.
        method = Method(c) if callable(c) else Method(gref)
        decorated = decorate(method.o) if decorate else None

        if key in self._nodes:
            raise ValueError(f"'{key}' already exists in namespace")

        node = NamespaceNode(
            method=method,
            nsref=nsref,
            namespace=self,
            decorated=decorated,
            tags=tags,
            config=config,
        )
        self._nodes[key] = node
        return node

    def _register_dag(
        self,
        dag: Dag,
        nsref: str | NsRef | None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
        config: NsNodeConfig | None = None,
    ) -> NamespaceNode:
        """
        Register a Dag as a callable NamespaceNode. Sets the DAG's
        `parent_namespace` to this namespace for runtime resolution.
        The DAG keeps its own namespace; callables are not copied.
        """
        if nsref is None:
            raise ValueError("nsref is required when registering a Dag")

        key = str(nsref)

        if dag.parent_namespace is self:
            raise ValueError(f"Dag '{key}' is already registered with this namespace")

        dag.parent_namespace = self

        ns_ref = self

        async def dag_wrapper(**kwargs: Any) -> Any:
            from lythonic.compose.dag_runner import DagRunner  # pyright: ignore[reportImportCycles]

            runner = DagRunner(dag, provenance=ns_ref._provenance)
            source_labels = {n.label for n in dag.sources()}
            source_inputs: dict[str, dict[str, Any]] = {}
            for label in source_labels:
                # If a kwarg key matches a source label and its value is a dict,
                # use it directly as that node's inputs.
                if label in kwargs and isinstance(kwargs[label], dict):
                    source_inputs[label] = kwargs[label]
                    continue
                node = dag.nodes[label]
                node_args = node.ns_node.method.args
                if node.ns_node.expects_dag_context():
                    node_args = node_args[1:]
                node_kwargs = {a.name: kwargs[a.name] for a in node_args if a.name in kwargs}
                if node_kwargs:
                    source_inputs[label] = node_kwargs
            return await runner.run(source_inputs=source_inputs, dag_nsref=key)

        method = Method(dag_wrapper)

        if key in self._nodes:
            raise ValueError(f"'{key}' already exists in namespace")

        node = NamespaceNode(
            method=method, nsref=nsref, namespace=self, tags=tags, config=config, dag=dag
        )
        self._nodes[key] = node
        return node

    def get(self, nsref: str | NsRef) -> NamespaceNode:
        """Retrieve a node by nsref. Raises `KeyError` if not found."""
        key = str(nsref)
        if key not in self._nodes:
            raise KeyError(f"'{key}' not found in namespace")
        return self._nodes[key]

    def get_as_dag(self, nsref: str | NsRef) -> Dag:
        """
        Return a DAG for any node. If the node wraps a DAG (registered
        via `_register_dag`), return that DAG. Otherwise create a
        single-node DAG wrapping the callable.
        """
        node = self.get(nsref)
        if node.dag is not None:
            return node.dag
        # Wrap plain callable in a single-node DAG
        dag = Dag()
        dag.node(node)
        dag.parent_namespace = self
        return dag

    def register_all(
        self,
        *cc: Callable[..., Any],
        decorate: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
    ) -> list[NamespaceNode]:
        """Bulk register callables using derived paths."""
        return [self.register(c, decorate=decorate, tags=tags) for c in cc]

    def _all_leaves(self) -> list[NamespaceNode]:
        """Return all registered nodes."""
        return list(self._nodes.values())

    def query(self, expr: str) -> list[NamespaceNode]:
        """
        Return all nodes matching a tag expression. Supports `&` (AND),
        `|` (OR), `~` (NOT) with standard precedence (`~` > `&` > `|`).
        """
        tokens = _parse_tag_expr(expr)
        return [node for node in self._all_leaves() if _eval_tag_expr(tokens, node.tags)]

    def get_trigger(self, name: str) -> tuple[NamespaceNode, TriggerConfig]:
        """Find a trigger by name across all nodes. Returns (node, trigger_config)."""
        for node in self._nodes.values():
            for tc in node.config.triggers:
                if tc.name == name:
                    return node, tc
        raise KeyError(f"Trigger '{name}' not found")

    def to_dict(self) -> list[dict[str, Any]]:
        """Serialize all node configs to a list of dicts (YAML/JSON-ready)."""
        return [node.config.model_dump(exclude_none=True) for node in self._all_leaves()]

    @classmethod
    def from_dict(cls, entries: list[dict[str, Any]]) -> Namespace:
        """Build a Namespace from a list of serialized node configs."""
        ns = cls()
        for entry in entries:
            entry_type = entry.get("type", "auto")
            if entry_type == "fragment":
                ns._register_fragment(entry)
                continue
            config_cls = _CONFIG_TYPES.get(entry_type, NsNodeConfig)
            config = config_cls.model_validate(entry)
            _auto_fill_trigger_names(config)
            if config.gref is not None:
                gref = GlobalRef(config.gref)
                instance = gref.get_instance()
                ns.register(instance, nsref=config.nsref, config=config)
            else:
                ns.register(lambda: None, nsref=config.nsref, config=config)
        return ns

    def _register_fragment(self, entry: dict[str, Any]) -> None:
        """Register all discovered methods from a fragment config entry."""
        config = NsFragmentConfig.model_validate(entry)
        if config.gref is None:
            raise ValueError("Fragment config must have a gref")

        gref = GlobalRef(config.gref)
        prefix = str(config.nsref) if config.nsref else ""
        if not prefix.endswith(":"):
            raise ValueError(f"Fragment nsref must end with ':', got {prefix!r}")

        # Resolve to module or class instance
        if gref.is_module():
            if config.init:
                raise ValueError(
                    f"Module fragment '{gref}' cannot have 'init' (modules are not instantiated)"
                )
            target = gref.get_module()
        else:
            resolved = gref.get_instance()
            if isinstance(resolved, type) and issubclass(resolved, NamespaceFragment):
                target = resolved(**config.init)
            elif isinstance(resolved, type):
                raise TypeError(f"Class '{gref}' must be a NamespaceFragment subclass")
            else:
                raise TypeError(
                    f"Fragment gref must resolve to a module or NamespaceFragment "
                    f"subclass, got {type(resolved).__name__}"
                )

        methods = _discover_fragment_methods(target)

        # Warn about configs entries that don\'t match any discovered method
        method_names = {name for name, *_ in methods}
        for cfg_name in config.configs:
            if cfg_name not in method_names:
                logging.getLogger(__name__).warning(
                    "Fragment '%s' configs entry '%s' does not match any discovered method",
                    gref,
                    cfg_name,
                )

        for name, method_callable, is_factory, tags, needs_cache in methods:
            method_config_dict = config.configs.get(name, {})

            has_ttl = "min_ttl" in method_config_dict and "max_ttl" in method_config_dict

            if is_factory:
                method_nsref = f"{prefix}{name}__"
            else:
                method_nsref = f"{prefix}{name}"

            if has_ttl:
                node_config: NsNodeConfig = NsCacheConfig(
                    nsref=method_nsref,
                    min_ttl=method_config_dict["min_ttl"],
                    max_ttl=method_config_dict["max_ttl"],
                )
            else:
                node_config = NsNodeConfig(nsref=method_nsref)

            # Apply tags to the config
            if tags:
                node_config.tags = tags

            if "triggers" in method_config_dict:
                triggers = [TriggerConfig.model_validate(t) for t in method_config_dict["triggers"]]
                node_config.triggers = triggers
                _auto_fill_trigger_names(node_config)

            # Enforce @require_cache
            if needs_cache and not isinstance(node_config, NsCacheConfig):
                raise ValueError(
                    f"Fragment method '{name}' is decorated with @require_cache "
                    f"but no min_ttl/max_ttl found in configs['{name}']"
                )

            if is_factory:
                dag_result = method_callable()
                if not isinstance(dag_result, Dag):
                    raise TypeError(
                        f"@dag_factory '{name}' must return a Dag, got {type(dag_result).__name__}"
                    )
                self._register_dag(dag_result, method_nsref, tags=tags, config=node_config)
            else:
                method_obj = Method(method_callable, name=name)
                key = method_nsref
                if key in self._nodes:
                    raise ValueError(f"'{key}' already exists in namespace")
                node = NamespaceNode(
                    method=method_obj,
                    nsref=method_nsref,
                    namespace=self,
                    config=node_config,
                )
                self._nodes[key] = node

is_mounted property

True if mount() has been called.

requires_mount property

True if any node has cache config, triggers, or @mount_required.

has_mountable property

True if any node would alter behavior when mounted.

mount(storage)

Activate persistence features for all declared nodes.

Source code in src/lythonic/compose/namespace.py
def mount(self, storage: Any) -> None:
    """Activate persistence features for all declared nodes."""
    self._storage = storage

    if storage.dags_db is not None:
        from lythonic.compose.dag_provenance import DagProvenance

        self._provenance = DagProvenance(storage.dags_db)

    if storage.cache_db is not None:
        from lythonic.compose.cached import mount_cached_node

        for node in self._nodes.values():
            if isinstance(node.config, NsCacheConfig):
                mount_cached_node(node, storage.cache_db)

    storage.setup_logging()

register(c, nsref=None, decorate=None, tags=None, config=None)

Register a callable. If nsref is None, derive from the callable's module and name. If the callable is marked with @dag_factory, call it and register the returned DAG with a __ suffix. If decorate is provided, wrap the callable for invocation (metadata is still extracted from the original). Optional tags are stored on the node for querying via query().

Source code in src/lythonic/compose/namespace.py
def register(
    self,
    c: Callable[..., Any] | str | Dag,
    nsref: str | NsRef | None = None,
    decorate: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None,
    tags: frozenset[str] | set[str] | list[str] | None = None,
    config: NsNodeConfig | None = None,
) -> NamespaceNode:
    """
    Register a callable. If `nsref` is `None`, derive from the callable's
    module and name. If the callable is marked with `@dag_factory`,
    call it and register the returned DAG with a `__` suffix. If
    `decorate` is provided, wrap the callable for invocation (metadata
    is still extracted from the original). Optional `tags` are stored
    on the node for querying via `query()`.
    """
    if isinstance(c, Dag):
        return self._register_dag(c, nsref, tags=tags, config=config)

    # Enforce @require_cache: callable must have a NsCacheConfig.
    if callable(c) and getattr(c, "_require_cache", False):
        if not isinstance(config, NsCacheConfig):
            name = getattr(c, "__name__", str(c))
            raise ValueError(
                f"'{name}' is decorated with @require_cache but no NsCacheConfig was provided"
            )

    # Handle @dag_factory decorated callables
    if callable(c) and getattr(c, "_is_dag_factory", False):
        gref = GlobalRef(c)
        factory_nsref = nsref or f"{gref.module}:{gref.name}__"
        dag = c()
        if not isinstance(dag, Dag):
            raise TypeError(f"@dag_factory {gref} must return a Dag, got {type(dag).__name__}")
        return self._register_dag(dag, factory_nsref, tags=tags, config=config)

    gref = GlobalRef(c)

    if nsref is None:
        nsref = f"{gref.module}:{gref.name}"

    key = str(nsref)

    # Pass c directly (not gref) so Method._o is set immediately, preserving
    # any decorator attributes (e.g. _is_mount_required) on the callable.
    method = Method(c) if callable(c) else Method(gref)
    decorated = decorate(method.o) if decorate else None

    if key in self._nodes:
        raise ValueError(f"'{key}' already exists in namespace")

    node = NamespaceNode(
        method=method,
        nsref=nsref,
        namespace=self,
        decorated=decorated,
        tags=tags,
        config=config,
    )
    self._nodes[key] = node
    return node

get(nsref)

Retrieve a node by nsref. Raises KeyError if not found.

Source code in src/lythonic/compose/namespace.py
def get(self, nsref: str | NsRef) -> NamespaceNode:
    """Retrieve a node by nsref. Raises `KeyError` if not found."""
    key = str(nsref)
    if key not in self._nodes:
        raise KeyError(f"'{key}' not found in namespace")
    return self._nodes[key]

get_as_dag(nsref)

Return a DAG for any node. If the node wraps a DAG (registered via _register_dag), return that DAG. Otherwise create a single-node DAG wrapping the callable.

Source code in src/lythonic/compose/namespace.py
def get_as_dag(self, nsref: str | NsRef) -> Dag:
    """
    Return a DAG for any node. If the node wraps a DAG (registered
    via `_register_dag`), return that DAG. Otherwise create a
    single-node DAG wrapping the callable.
    """
    node = self.get(nsref)
    if node.dag is not None:
        return node.dag
    # Wrap plain callable in a single-node DAG
    dag = Dag()
    dag.node(node)
    dag.parent_namespace = self
    return dag

register_all(*cc, decorate=None, tags=None)

Bulk register callables using derived paths.

Source code in src/lythonic/compose/namespace.py
def register_all(
    self,
    *cc: Callable[..., Any],
    decorate: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None,
    tags: frozenset[str] | set[str] | list[str] | None = None,
) -> list[NamespaceNode]:
    """Bulk register callables using derived paths."""
    return [self.register(c, decorate=decorate, tags=tags) for c in cc]

query(expr)

Return all nodes matching a tag expression. Supports & (AND), | (OR), ~ (NOT) with standard precedence (~ > & > |).

Source code in src/lythonic/compose/namespace.py
def query(self, expr: str) -> list[NamespaceNode]:
    """
    Return all nodes matching a tag expression. Supports `&` (AND),
    `|` (OR), `~` (NOT) with standard precedence (`~` > `&` > `|`).
    """
    tokens = _parse_tag_expr(expr)
    return [node for node in self._all_leaves() if _eval_tag_expr(tokens, node.tags)]

get_trigger(name)

Find a trigger by name across all nodes. Returns (node, trigger_config).

Source code in src/lythonic/compose/namespace.py
def get_trigger(self, name: str) -> tuple[NamespaceNode, TriggerConfig]:
    """Find a trigger by name across all nodes. Returns (node, trigger_config)."""
    for node in self._nodes.values():
        for tc in node.config.triggers:
            if tc.name == name:
                return node, tc
    raise KeyError(f"Trigger '{name}' not found")

to_dict()

Serialize all node configs to a list of dicts (YAML/JSON-ready).

Source code in src/lythonic/compose/namespace.py
def to_dict(self) -> list[dict[str, Any]]:
    """Serialize all node configs to a list of dicts (YAML/JSON-ready)."""
    return [node.config.model_dump(exclude_none=True) for node in self._all_leaves()]

from_dict(entries) classmethod

Build a Namespace from a list of serialized node configs.

Source code in src/lythonic/compose/namespace.py
@classmethod
def from_dict(cls, entries: list[dict[str, Any]]) -> Namespace:
    """Build a Namespace from a list of serialized node configs."""
    ns = cls()
    for entry in entries:
        entry_type = entry.get("type", "auto")
        if entry_type == "fragment":
            ns._register_fragment(entry)
            continue
        config_cls = _CONFIG_TYPES.get(entry_type, NsNodeConfig)
        config = config_cls.model_validate(entry)
        _auto_fill_trigger_names(config)
        if config.gref is not None:
            gref = GlobalRef(config.gref)
            instance = gref.get_instance()
            ns.register(instance, nsref=config.nsref, config=config)
        else:
            ns.register(lambda: None, nsref=config.nsref, config=config)
    return ns

NamespaceNode

Wraps a Method with namespace identity. Callable -- delegates to the decorated callable if present, otherwise to method.o.

Source code in src/lythonic/compose/namespace.py
class NamespaceNode:
    """
    Wraps a `Method` with namespace identity. Callable -- delegates to the
    decorated callable if present, otherwise to `method.o`.
    """

    method: Method
    namespace: Namespace | None
    config: NsNodeConfig
    _decorated: Callable[..., Any] | None

    def __init__(
        self,
        method: Method,
        nsref: str | NsRef | None = None,
        namespace: Namespace | None = None,
        decorated: Callable[..., Any] | None = None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
        config: NsNodeConfig | None = None,
        dag: Dag | None = None,
    ) -> None:
        self.method = method
        self.namespace = namespace
        self._decorated = decorated
        self.dag: Dag | None = dag
        self.metadata: dict[str, Any] = {}
        validated_tags = _validate_tags(tags)
        self.config = config or NsNodeConfig(
            nsref=NsRef(nsref) if isinstance(nsref, str) else nsref,
            gref=method.gref if method.gref else None,
            tags=sorted(validated_tags) if validated_tags else None,
        )

    @property
    def nsref(self) -> NsRef:
        ref = self.config.nsref
        assert isinstance(ref, NsRef)
        return ref

    @property
    def tags(self) -> frozenset[str]:
        return frozenset(self.config.tags) if self.config.tags else frozenset()

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        if self._decorated is not None:
            return self._decorated(*args, **kwargs)
        return self.method(*args, **kwargs)

    def expects_dag_context(self) -> bool:
        """True if first parameter is `DagContext` or a subclass."""
        return self.dag_context_type() is not None

    def dag_context_type(self) -> type[DagContext] | None:
        """Return the `DagContext` subclass expected, or `None`."""
        resolved = _resolve_first_param_type(self.method.o)
        if resolved is not None and issubclass(resolved, DagContext):
            return resolved
        return None

    def __repr__(self) -> str:  # pyright: ignore[reportImplicitOverride]
        return f"NamespaceNode(nsref={self.nsref!r})"

expects_dag_context()

True if first parameter is DagContext or a subclass.

Source code in src/lythonic/compose/namespace.py
def expects_dag_context(self) -> bool:
    """True if first parameter is `DagContext` or a subclass."""
    return self.dag_context_type() is not None

dag_context_type()

Return the DagContext subclass expected, or None.

Source code in src/lythonic/compose/namespace.py
def dag_context_type(self) -> type[DagContext] | None:
    """Return the `DagContext` subclass expected, or `None`."""
    resolved = _resolve_first_param_type(self.method.o)
    if resolved is not None and issubclass(resolved, DagContext):
        return resolved
    return None

DagContext

Bases: BaseModel

Base context injected into DAG-participating callables. Subclass to add domain-specific fields.

The namespace field provides access to the mounted Namespace, allowing callables to look up and invoke other registered nodes (including cached ones). Excluded from serialization.

Source code in src/lythonic/compose/namespace.py
class DagContext(BaseModel):
    """
    Base context injected into DAG-participating callables.
    Subclass to add domain-specific fields.

    The `namespace` field provides access to the mounted `Namespace`,
    allowing callables to look up and invoke other registered nodes
    (including cached ones). Excluded from serialization.
    """

    model_config: typing.ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True)  # pyright: ignore[reportIncompatibleVariableOverride]

    dag_nsref: DagPath
    node_label: str
    run_id: str
    namespace: Any = PydanticField(default=None, exclude=True)

    def _resolve_node(self, nsref: str) -> NamespaceNode:
        if self.namespace is None:
            raise RuntimeError("DagContext has no namespace (not mounted)")
        return self.namespace.get(nsref)

    def ns_call(self, nsref: str, *args: Any, **kwargs: Any) -> Any:
        """Call a sync registered node by nsref. Raises on async targets."""
        node = self._resolve_node(nsref)
        import asyncio as _asyncio

        if _asyncio.iscoroutinefunction(node.method.o):
            raise TypeError(
                f"'{nsref}' is async — use await ctx.ns_acall(\"{nsref}\", ...) instead"
            )
        return node(*args, **kwargs)

    async def ns_acall(self, nsref: str, *args: Any, **kwargs: Any) -> Any:
        """Call a registered node by nsref from an async context. Handles both
        async and sync callables (sync dispatched to executor unless @inline)."""
        import asyncio
        import functools

        node = self._resolve_node(nsref)
        fn = node._decorated or node.method.o  # pyright: ignore[reportPrivateUsage]
        if asyncio.iscoroutinefunction(fn):
            return await fn(*args, **kwargs)
        if getattr(fn, "_lythonic_inline", False):
            return fn(*args, **kwargs)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(fn, *args, **kwargs))

ns_call(nsref, *args, **kwargs)

Call a sync registered node by nsref. Raises on async targets.

Source code in src/lythonic/compose/namespace.py
def ns_call(self, nsref: str, *args: Any, **kwargs: Any) -> Any:
    """Call a sync registered node by nsref. Raises on async targets."""
    node = self._resolve_node(nsref)
    import asyncio as _asyncio

    if _asyncio.iscoroutinefunction(node.method.o):
        raise TypeError(
            f"'{nsref}' is async — use await ctx.ns_acall(\"{nsref}\", ...) instead"
        )
    return node(*args, **kwargs)

ns_acall(nsref, *args, **kwargs) async

Call a registered node by nsref from an async context. Handles both async and sync callables (sync dispatched to executor unless @inline).

Source code in src/lythonic/compose/namespace.py
async def ns_acall(self, nsref: str, *args: Any, **kwargs: Any) -> Any:
    """Call a registered node by nsref from an async context. Handles both
    async and sync callables (sync dispatched to executor unless @inline)."""
    import asyncio
    import functools

    node = self._resolve_node(nsref)
    fn = node._decorated or node.method.o  # pyright: ignore[reportPrivateUsage]
    if asyncio.iscoroutinefunction(fn):
        return await fn(*args, **kwargs)
    if getattr(fn, "_lythonic_inline", False):
        return fn(*args, **kwargs)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(fn, *args, **kwargs))

Dag

Directed acyclic graph of DagNodes with type-based validation. Use node() to add nodes and >> to declare edges.

Source code in src/lythonic/compose/namespace.py
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
class Dag:
    """
    Directed acyclic graph of `DagNode`s with type-based validation.
    Use `node()` to add nodes and `>>` to declare edges.
    """

    nodes: dict[str, DagNode]
    edges: list[DagEdge]
    namespace: Namespace

    def __init__(self) -> None:
        self.nodes = {}
        self.edges = []
        self.namespace = Namespace()
        self.parent_namespace: Namespace | None = None

    def resolve(self, nsref: str | NsRef) -> NamespaceNode:
        """
        Resolve a callable by nsref. Tries `parent_namespace` first
        (if set), then falls back to the DAG's own namespace.
        """
        if self.parent_namespace is not None:
            try:
                return self.parent_namespace.get(nsref)
            except KeyError:
                pass
        return self.namespace.get(nsref)

    def node(
        self,
        source: NamespaceNode | Callable[..., Any] | Dag,
        label: str | None = None,
    ) -> DagNode:
        """
        Create a unique `DagNode` in this graph. If `label` is `None`,
        derived from the source's nsref leaf name. Raises `ValueError`
        if the label already exists.

        When `source` is a `Dag` or `@dag_factory`, creates a `CallNode`
        that runs the sub-DAG as a single step. Label is auto-derived
        from the factory function name if not provided.
        """
        # Expand @dag_factory to a Dag
        if callable(source) and getattr(source, "_is_dag_factory", False):
            if label is None:
                label = str(getattr(source, "__name__", ""))
            factory_result = source()  # pyright: ignore[reportCallIssue]
            if not isinstance(factory_result, Dag):
                raise TypeError(
                    f"@dag_factory must return a Dag, got {type(factory_result).__name__}"
                )
            source = factory_result

        if isinstance(source, Dag):
            if label is None:
                raise ValueError("label is required when passing a Dag to node()")
            if label in self.nodes:
                raise ValueError(
                    f"Label '{label}' already exists in DAG. "
                    f"Use an explicit label for duplicate callables."
                )
            call_node = CallNode(sub_dag=source, label=label, dag=self)
            self.nodes[label] = call_node
            return call_node

        if isinstance(source, NamespaceNode):
            ns_node = source
        else:
            gref = GlobalRef(source)
            try:
                ns_node = self.namespace.get(str(gref))
            except KeyError:
                ns_node = self.namespace.register(source)

        if label is None:
            label = ns_node.nsref.name

        if label in self.nodes:
            raise ValueError(
                f"Label '{label}' already exists in DAG. "
                f"Use an explicit label for duplicate callables."
            )

        dag_node = DagNode(ns_node=ns_node, label=label, dag=self)
        self.nodes[label] = dag_node
        return dag_node

    def map(
        self,
        sub_dag: Dag | SwitchNode | Callable[..., Any],
        label: str | None = None,
    ) -> MapNode:
        """
        Create a `MapNode` that runs `sub_dag` on each element of an
        upstream collection. The sub-DAG must have exactly one source
        and one sink. Accepts a `@dag_factory` function, `SwitchNode`,
        or `Dag`. When a `SwitchNode` is passed, it is removed from the
        parent DAG and wrapped as the map's sub-processing.
        """
        # Expand @dag_factory to a Dag
        if callable(sub_dag) and getattr(sub_dag, "_is_dag_factory", False):
            if label is None:
                label = str(getattr(sub_dag, "__name__", ""))
            factory_result = sub_dag()  # pyright: ignore[reportCallIssue]
            if not isinstance(factory_result, Dag):
                raise TypeError(
                    f"@dag_factory must return a Dag, got {type(factory_result).__name__}"
                )
            sub_dag = factory_result

        # SwitchNode passed directly — remove from parent, wrap in a MapSwitchNode
        if isinstance(sub_dag, SwitchNode):
            switch_node = sub_dag
            if label is None:
                label = switch_node.label
            # Remove switch from parent DAG since it's now inside the map
            self.nodes.pop(switch_node.label, None)
            if not label:
                raise ValueError("label is required for map()")
            if label in self.nodes:
                raise ValueError(f"Label '{label}' already exists in DAG.")
            map_switch = MapSwitchNode(switch_node=switch_node, label=label, dag=self)
            self.nodes[label] = map_switch
            return map_switch  # pyright: ignore[reportReturnType]

        if not label:
            raise ValueError("label is required for map()")

        if label in self.nodes:
            raise ValueError(
                f"Label '{label}' already exists in DAG. Use a unique label for map nodes."
            )

        map_node = MapNode(
            sub_dag=sub_dag,  # pyright: ignore[reportArgumentType]
            label=label,
            dag=self,
        )
        self.nodes[label] = map_node
        return map_node

    def switch(
        self,
        branches: list[Dag | Callable[..., Any]] | dict[str, Dag | Callable[..., Any]],
        label: str,
    ) -> SwitchNode:
        """
        Create a `SwitchNode` that routes to one of several branch DAGs
        based on a `LabelSwitch` value. Accepts a list (labels auto-derived)
        or dict of DAGs, `@dag_factory` functions, or plain callables.
        """
        if not label:
            raise ValueError("label is required for switch()")

        if label in self.nodes:
            raise ValueError(f"Label '{label}' already exists in DAG.")

        # Normalize to dict[str, Dag]
        if isinstance(branches, list):
            branch_dict: dict[str, Dag] = {}
            for item in branches:
                dag, branch_label = self._resolve_branch(item)
                branch_dict[branch_label] = dag
        else:
            branch_dict = {}
            for key, item in branches.items():
                dag, _ = self._resolve_branch(item)
                branch_dict[key] = dag

        switch_node = SwitchNode(branches=branch_dict, label=label, dag=self)
        self.nodes[label] = switch_node
        return switch_node

    def _resolve_branch(self, item: Dag | Callable[..., Any]) -> tuple[Dag, str]:
        """Resolve a branch item to (Dag, label). Handles Dag, @dag_factory, and callables."""
        if isinstance(item, Dag):
            return item, ""
        if callable(item) and getattr(item, "_is_dag_factory", False):
            branch_label = str(getattr(item, "__name__", ""))
            result = item()  # pyright: ignore[reportCallIssue]
            if not isinstance(result, Dag):
                raise TypeError(f"@dag_factory must return a Dag, got {type(result).__name__}")
            return result, branch_label
        if callable(item):
            branch_label = str(getattr(item, "__name__", ""))
            dag = Dag()
            ns_node = dag.namespace.register(item)
            dag.node(ns_node)
            return dag, branch_label
        raise TypeError(f"Expected Dag, @dag_factory, or callable, got {type(item).__name__}")

    def add_edge(self, upstream: DagNode, downstream: DagNode) -> DagEdge:
        """Register a directed edge between two nodes."""
        edge = DagEdge(upstream=upstream.label, downstream=downstream.label)
        self.edges.append(edge)
        return edge

    def validate(self) -> None:
        """Check acyclicity and type compatibility between connected nodes."""
        self._check_acyclicity()
        self._check_type_compatibility()

    def _check_acyclicity(self) -> None:
        """Kahn's algorithm for topological sort -- raises if cycle detected."""
        in_degree: dict[str, int] = {label: 0 for label in self.nodes}
        adj: dict[str, list[str]] = {label: [] for label in self.nodes}
        for edge in self.edges:
            adj[edge.upstream].append(edge.downstream)
            in_degree[edge.downstream] += 1

        queue = [label for label, deg in in_degree.items() if deg == 0]
        visited = 0
        while queue:
            current = queue.pop(0)
            visited += 1
            for neighbor in adj[current]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        if visited != len(self.nodes):
            raise ValueError("DAG contains a cycle")

    def _check_type_compatibility(self) -> None:
        """Warn-level check: upstream return type should match a downstream input type."""
        for edge in self.edges:
            upstream_node = self.nodes[edge.upstream]
            downstream_node = self.nodes[edge.downstream]

            upstream_return = upstream_node.ns_node.method.return_annotation
            if upstream_return is None or upstream_return is _inspect.Parameter.empty:
                continue

            downstream_args = downstream_node.ns_node.method.args
            if downstream_node.ns_node.expects_dag_context() and len(downstream_args) > 0:
                downstream_args = downstream_args[1:]

            if not downstream_args:
                continue

            downstream_types = {
                arg.annotation for arg in downstream_args if arg.annotation is not None
            }
            # Also accept list[X] params when upstream returns X (fan-in)
            fan_in_types: set[str] = set()
            for dt in downstream_types:
                if isinstance(dt, str) and dt.startswith("list[") and dt.endswith("]"):
                    fan_in_types.add(dt[5:-1])
            upstream_str = str(upstream_return)
            if (
                downstream_types
                and upstream_return not in downstream_types
                and upstream_str not in fan_in_types
            ):
                logging.getLogger(__name__).warning(
                    "Type mismatch on edge %s -> %s: upstream returns %s, downstream accepts %s",
                    edge.upstream,
                    edge.downstream,
                    upstream_return,
                    downstream_types,
                )

    def topological_order(self) -> list[DagNode]:
        """Return nodes in topological order. Stable sort by label for determinism."""
        in_degree: dict[str, int] = {label: 0 for label in self.nodes}
        adj: dict[str, list[str]] = {label: [] for label in self.nodes}
        for edge in self.edges:
            adj[edge.upstream].append(edge.downstream)
            in_degree[edge.downstream] += 1

        queue = sorted([label for label, deg in in_degree.items() if deg == 0])
        result: list[DagNode] = []
        while queue:
            current = queue.pop(0)
            result.append(self.nodes[current])
            for neighbor in sorted(adj[current]):
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        return result

    def sources(self) -> list[DagNode]:
        """Return nodes with no upstream edges."""
        has_upstream = {edge.downstream for edge in self.edges}
        return [self.nodes[label] for label in self.nodes if label not in has_upstream]

    def sinks(self) -> list[DagNode]:
        """Return nodes with no downstream edges."""
        has_downstream = {edge.upstream for edge in self.edges}
        return [self.nodes[label] for label in self.nodes if label not in has_downstream]

    async def __call__(self, **kwargs: Any) -> DagRunResult:
        """
        Run the DAG directly with DagRunner's default - NullProvenance. Kwargs are matched
        to source node parameters by name.
        """
        from lythonic.compose.dag_runner import (  # pyright: ignore[reportImportCycles]
            DagRunner,
        )

        source_inputs: dict[str, dict[str, Any]] = {}
        for node in self.sources():
            if node.label in kwargs and isinstance(kwargs[node.label], dict):
                source_inputs[node.label] = kwargs[node.label]
                continue
            node_args = node.ns_node.method.args
            if node.ns_node.expects_dag_context():
                node_args = node_args[1:]
            node_kwargs = {a.name: kwargs[a.name] for a in node_args if a.name in kwargs}
            if node_kwargs:
                source_inputs[node.label] = node_kwargs

        runner = DagRunner(self)
        return await runner.run(source_inputs=source_inputs)

    def __enter__(self) -> Dag:
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any
    ) -> None:
        if exc_type is None:
            self.validate()

resolve(nsref)

Resolve a callable by nsref. Tries parent_namespace first (if set), then falls back to the DAG's own namespace.

Source code in src/lythonic/compose/namespace.py
def resolve(self, nsref: str | NsRef) -> NamespaceNode:
    """
    Resolve a callable by nsref. Tries `parent_namespace` first
    (if set), then falls back to the DAG's own namespace.
    """
    if self.parent_namespace is not None:
        try:
            return self.parent_namespace.get(nsref)
        except KeyError:
            pass
    return self.namespace.get(nsref)

node(source, label=None)

Create a unique DagNode in this graph. If label is None, derived from the source's nsref leaf name. Raises ValueError if the label already exists.

When source is a Dag or @dag_factory, creates a CallNode that runs the sub-DAG as a single step. Label is auto-derived from the factory function name if not provided.

Source code in src/lythonic/compose/namespace.py
def node(
    self,
    source: NamespaceNode | Callable[..., Any] | Dag,
    label: str | None = None,
) -> DagNode:
    """
    Create a unique `DagNode` in this graph. If `label` is `None`,
    derived from the source's nsref leaf name. Raises `ValueError`
    if the label already exists.

    When `source` is a `Dag` or `@dag_factory`, creates a `CallNode`
    that runs the sub-DAG as a single step. Label is auto-derived
    from the factory function name if not provided.
    """
    # Expand @dag_factory to a Dag
    if callable(source) and getattr(source, "_is_dag_factory", False):
        if label is None:
            label = str(getattr(source, "__name__", ""))
        factory_result = source()  # pyright: ignore[reportCallIssue]
        if not isinstance(factory_result, Dag):
            raise TypeError(
                f"@dag_factory must return a Dag, got {type(factory_result).__name__}"
            )
        source = factory_result

    if isinstance(source, Dag):
        if label is None:
            raise ValueError("label is required when passing a Dag to node()")
        if label in self.nodes:
            raise ValueError(
                f"Label '{label}' already exists in DAG. "
                f"Use an explicit label for duplicate callables."
            )
        call_node = CallNode(sub_dag=source, label=label, dag=self)
        self.nodes[label] = call_node
        return call_node

    if isinstance(source, NamespaceNode):
        ns_node = source
    else:
        gref = GlobalRef(source)
        try:
            ns_node = self.namespace.get(str(gref))
        except KeyError:
            ns_node = self.namespace.register(source)

    if label is None:
        label = ns_node.nsref.name

    if label in self.nodes:
        raise ValueError(
            f"Label '{label}' already exists in DAG. "
            f"Use an explicit label for duplicate callables."
        )

    dag_node = DagNode(ns_node=ns_node, label=label, dag=self)
    self.nodes[label] = dag_node
    return dag_node

map(sub_dag, label=None)

Create a MapNode that runs sub_dag on each element of an upstream collection. The sub-DAG must have exactly one source and one sink. Accepts a @dag_factory function, SwitchNode, or Dag. When a SwitchNode is passed, it is removed from the parent DAG and wrapped as the map's sub-processing.

Source code in src/lythonic/compose/namespace.py
def map(
    self,
    sub_dag: Dag | SwitchNode | Callable[..., Any],
    label: str | None = None,
) -> MapNode:
    """
    Create a `MapNode` that runs `sub_dag` on each element of an
    upstream collection. The sub-DAG must have exactly one source
    and one sink. Accepts a `@dag_factory` function, `SwitchNode`,
    or `Dag`. When a `SwitchNode` is passed, it is removed from the
    parent DAG and wrapped as the map's sub-processing.
    """
    # Expand @dag_factory to a Dag
    if callable(sub_dag) and getattr(sub_dag, "_is_dag_factory", False):
        if label is None:
            label = str(getattr(sub_dag, "__name__", ""))
        factory_result = sub_dag()  # pyright: ignore[reportCallIssue]
        if not isinstance(factory_result, Dag):
            raise TypeError(
                f"@dag_factory must return a Dag, got {type(factory_result).__name__}"
            )
        sub_dag = factory_result

    # SwitchNode passed directly — remove from parent, wrap in a MapSwitchNode
    if isinstance(sub_dag, SwitchNode):
        switch_node = sub_dag
        if label is None:
            label = switch_node.label
        # Remove switch from parent DAG since it's now inside the map
        self.nodes.pop(switch_node.label, None)
        if not label:
            raise ValueError("label is required for map()")
        if label in self.nodes:
            raise ValueError(f"Label '{label}' already exists in DAG.")
        map_switch = MapSwitchNode(switch_node=switch_node, label=label, dag=self)
        self.nodes[label] = map_switch
        return map_switch  # pyright: ignore[reportReturnType]

    if not label:
        raise ValueError("label is required for map()")

    if label in self.nodes:
        raise ValueError(
            f"Label '{label}' already exists in DAG. Use a unique label for map nodes."
        )

    map_node = MapNode(
        sub_dag=sub_dag,  # pyright: ignore[reportArgumentType]
        label=label,
        dag=self,
    )
    self.nodes[label] = map_node
    return map_node

switch(branches, label)

Create a SwitchNode that routes to one of several branch DAGs based on a LabelSwitch value. Accepts a list (labels auto-derived) or dict of DAGs, @dag_factory functions, or plain callables.

Source code in src/lythonic/compose/namespace.py
def switch(
    self,
    branches: list[Dag | Callable[..., Any]] | dict[str, Dag | Callable[..., Any]],
    label: str,
) -> SwitchNode:
    """
    Create a `SwitchNode` that routes to one of several branch DAGs
    based on a `LabelSwitch` value. Accepts a list (labels auto-derived)
    or dict of DAGs, `@dag_factory` functions, or plain callables.
    """
    if not label:
        raise ValueError("label is required for switch()")

    if label in self.nodes:
        raise ValueError(f"Label '{label}' already exists in DAG.")

    # Normalize to dict[str, Dag]
    if isinstance(branches, list):
        branch_dict: dict[str, Dag] = {}
        for item in branches:
            dag, branch_label = self._resolve_branch(item)
            branch_dict[branch_label] = dag
    else:
        branch_dict = {}
        for key, item in branches.items():
            dag, _ = self._resolve_branch(item)
            branch_dict[key] = dag

    switch_node = SwitchNode(branches=branch_dict, label=label, dag=self)
    self.nodes[label] = switch_node
    return switch_node

add_edge(upstream, downstream)

Register a directed edge between two nodes.

Source code in src/lythonic/compose/namespace.py
def add_edge(self, upstream: DagNode, downstream: DagNode) -> DagEdge:
    """Register a directed edge between two nodes."""
    edge = DagEdge(upstream=upstream.label, downstream=downstream.label)
    self.edges.append(edge)
    return edge

validate()

Check acyclicity and type compatibility between connected nodes.

Source code in src/lythonic/compose/namespace.py
def validate(self) -> None:
    """Check acyclicity and type compatibility between connected nodes."""
    self._check_acyclicity()
    self._check_type_compatibility()

topological_order()

Return nodes in topological order. Stable sort by label for determinism.

Source code in src/lythonic/compose/namespace.py
def topological_order(self) -> list[DagNode]:
    """Return nodes in topological order. Stable sort by label for determinism."""
    in_degree: dict[str, int] = {label: 0 for label in self.nodes}
    adj: dict[str, list[str]] = {label: [] for label in self.nodes}
    for edge in self.edges:
        adj[edge.upstream].append(edge.downstream)
        in_degree[edge.downstream] += 1

    queue = sorted([label for label, deg in in_degree.items() if deg == 0])
    result: list[DagNode] = []
    while queue:
        current = queue.pop(0)
        result.append(self.nodes[current])
        for neighbor in sorted(adj[current]):
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    return result

sources()

Return nodes with no upstream edges.

Source code in src/lythonic/compose/namespace.py
def sources(self) -> list[DagNode]:
    """Return nodes with no upstream edges."""
    has_upstream = {edge.downstream for edge in self.edges}
    return [self.nodes[label] for label in self.nodes if label not in has_upstream]

sinks()

Return nodes with no downstream edges.

Source code in src/lythonic/compose/namespace.py
def sinks(self) -> list[DagNode]:
    """Return nodes with no downstream edges."""
    has_downstream = {edge.upstream for edge in self.edges}
    return [self.nodes[label] for label in self.nodes if label not in has_downstream]

__call__(**kwargs) async

Run the DAG directly with DagRunner's default - NullProvenance. Kwargs are matched to source node parameters by name.

Source code in src/lythonic/compose/namespace.py
async def __call__(self, **kwargs: Any) -> DagRunResult:
    """
    Run the DAG directly with DagRunner's default - NullProvenance. Kwargs are matched
    to source node parameters by name.
    """
    from lythonic.compose.dag_runner import (  # pyright: ignore[reportImportCycles]
        DagRunner,
    )

    source_inputs: dict[str, dict[str, Any]] = {}
    for node in self.sources():
        if node.label in kwargs and isinstance(kwargs[node.label], dict):
            source_inputs[node.label] = kwargs[node.label]
            continue
        node_args = node.ns_node.method.args
        if node.ns_node.expects_dag_context():
            node_args = node_args[1:]
        node_kwargs = {a.name: kwargs[a.name] for a in node_args if a.name in kwargs}
        if node_kwargs:
            source_inputs[node.label] = node_kwargs

    runner = DagRunner(self)
    return await runner.run(source_inputs=source_inputs)

DagNode

Unique node in a Dag, wrapping a NamespaceNode. Same callable can appear multiple times with different labels.

Source code in src/lythonic/compose/namespace.py
class DagNode:
    """
    Unique node in a `Dag`, wrapping a `NamespaceNode`.
    Same callable can appear multiple times with different labels.
    """

    ns_node: NamespaceNode
    label: str
    dag: Dag

    def __init__(self, ns_node: NamespaceNode, label: str, dag: Dag) -> None:
        self.ns_node = ns_node
        self.label = label
        self.dag = dag

    def __rshift__(self, other: DagNode) -> DagNode:
        """Register edge from self to other. Returns downstream for chaining."""
        self.dag.add_edge(self, other)
        return other

__rshift__(other)

Register edge from self to other. Returns downstream for chaining.

Source code in src/lythonic/compose/namespace.py
def __rshift__(self, other: DagNode) -> DagNode:
    """Register edge from self to other. Returns downstream for chaining."""
    self.dag.add_edge(self, other)
    return other

DagEdge

Bases: BaseModel

Edge between two DagNodes in a Dag.

Source code in src/lythonic/compose/namespace.py
class DagEdge(BaseModel):
    """Edge between two `DagNode`s in a `Dag`."""

    upstream: str
    downstream: str

CompositeDagNode

Bases: DagNode

Base class for DAG nodes that contain sub-DAGs. Stores source/sink type contracts for compatibility checking.

Source code in src/lythonic/compose/namespace.py
class CompositeDagNode(DagNode):
    """
    Base class for DAG nodes that contain sub-DAGs. Stores source/sink
    type contracts for compatibility checking.
    """

    def __init__(self, label: str, dag: Dag) -> None:
        placeholder = NamespaceNode(
            method=Method(lambda: None),
            nsref=f"__composite__:{label}",
            namespace=dag.namespace,
        )
        super().__init__(ns_node=placeholder, label=label, dag=dag)

MapNode

Bases: CompositeDagNode

Runs a sub-DAG on each element of a collection.

Source code in src/lythonic/compose/namespace.py
class MapNode(CompositeDagNode):
    """Runs a sub-DAG on each element of a collection."""

    sub_dag: Dag

    def __init__(self, sub_dag: Dag, label: str, dag: Dag) -> None:
        _validate_single_source_sink(sub_dag)
        super().__init__(label=label, dag=dag)
        self.sub_dag = sub_dag

CallNode

Bases: CompositeDagNode

Runs a sub-DAG once as a single step.

Source code in src/lythonic/compose/namespace.py
class CallNode(CompositeDagNode):
    """Runs a sub-DAG once as a single step."""

    sub_dag: Dag

    def __init__(self, sub_dag: Dag, label: str, dag: Dag) -> None:
        _validate_single_source_sink(sub_dag)
        super().__init__(label=label, dag=dag)
        self.sub_dag = sub_dag

SwitchNode

Bases: CompositeDagNode

Routes data to one of several branch DAGs based on a LabelSwitch value. All branches must have compatible single source and single sink.

Source code in src/lythonic/compose/namespace.py
class SwitchNode(CompositeDagNode):
    """
    Routes data to one of several branch DAGs based on a `LabelSwitch` value.
    All branches must have compatible single source and single sink.
    """

    branches: dict[str, Dag]

    def __init__(self, branches: dict[str, Dag], label: str, dag: Dag) -> None:
        for branch_label, branch_dag in branches.items():
            try:
                _validate_single_source_sink(branch_dag)
            except ValueError as e:
                raise ValueError(f"Branch '{branch_label}': {e}") from e
        super().__init__(label=label, dag=dag)
        self.branches = branches

MapSwitchNode

Bases: CompositeDagNode

Combines MapNode and SwitchNode: maps over a collection, routing each element through a switch. Created by dag.map(dag.switch(...)).

Source code in src/lythonic/compose/namespace.py
class MapSwitchNode(CompositeDagNode):
    """
    Combines MapNode and SwitchNode: maps over a collection, routing each
    element through a switch. Created by `dag.map(dag.switch(...))`.
    """

    switch_node: SwitchNode

    def __init__(self, switch_node: SwitchNode, label: str, dag: Dag) -> None:
        super().__init__(label=label, dag=dag)
        self.switch_node = switch_node