
lythonic.compose.namespace

Hierarchical registry of callables with metadata and DAG composition.

Provides Namespace for registering callables wrapped in NamespaceNode (which wraps Method), using GlobalRef-style paths ("branch.sub:leaf"). Includes DagContext as base context for DAG-participating callables.

Path Scheme

  • . separates namespace levels (branches)
  • : separates the leaf callable from its namespace
  • Example: "market.data:fetch_prices" -> branch market.data, leaf fetch_prices
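The library's own path parser (`_parse_nsref`) is internal and not shown on this page. The helper below is a hypothetical stand-in that splits a path into branch levels and a leaf according to the scheme above. It assumes every path contains exactly one `:` before the leaf.

```python
def split_nsref(nsref: str) -> tuple[list[str], str]:
    """Hypothetical helper: "branch.sub:leaf" -> (["branch", "sub"], "leaf")."""
    # ":" separates the leaf from its namespace; take the last one.
    branch, sep, leaf = nsref.rpartition(":")
    if not sep or not leaf:
        raise ValueError(f"expected 'branch:leaf', got {nsref!r}")
    # "." separates namespace levels; an empty branch has no levels.
    levels = branch.split(".") if branch else []
    return levels, leaf


print(split_nsref("market.data:fetch_prices"))  # (['market', 'data'], 'fetch_prices')
```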

Usage

Each Dag owns a Namespace. Passing a callable to dag.node() auto-registers it in the DAG's namespace under a path derived from the callable's module and name:

from lythonic.compose.namespace import Dag

def fetch(url: str) -> dict:
    return {"source": url, "raw": [1, 2, 3]}

def transform(data: dict) -> dict:
    return {"source": data["source"], "values": [v * 2 for v in data["raw"]]}

dag = Dag()
dag.node(fetch) >> dag.node(transform)

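You can also register callables in a standalone Namespace and look them up by full path with ns.get("market:fetch"), or by leaf name with attribute access (ns.fetch). Attribute access returns the first registered node whose leaf matches, so prefer get() when leaf names are not unique.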

Tags

Nodes can be tagged at registration for later querying:

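from lythonic.compose.namespace import Namespace

def fetch_prices(symbol: str) -> list[float]: ...
def compute_stats(prices: list[float]) -> dict: ...

ns = Namespace()
ns.register(fetch_prices, nsref="market:fetch", tags={"slow", "market-data"})
ns.register(compute_stats, nsref="market:stats", tags={"fast", "market-data"})

ns.query("market-data")              # both nodes
ns.query("slow & market-data")       # only fetch
ns.query("~slow")                    # only stats
ns.query("slow | fast")              # both nodes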

Tag expressions support & (AND), | (OR), ~ (NOT) with standard precedence (~ > & > |). Tags must not contain spaces or operator characters.

Map

Dag.map(sub_dag, label) creates a MapNode that runs a sub-DAG on each element of an upstream list or dict, concurrently via asyncio.gather:

sub_dag = Dag()
sub_dag.node(tokenize) >> sub_dag.node(count)

parent = Dag()
parent.node(split_text) >> parent.map(sub_dag, label="chunks") >> parent.node(reduce)

The sub-DAG must have exactly one source and one sink.

Composable DAGs

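dag.node(sub_dag, label="enrich") creates a CallNode that runs a sub-DAG once, as a single step, passing the upstream output to the sub-DAG's source. A label is required when passing a Dag instance (for a @dag_factory function it defaults to the function name):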

enrich_dag = Dag()
enrich_dag.node(lookup) >> enrich_dag.node(annotate)

parent = Dag()
parent.node(fetch) >> parent.node(enrich_dag, label="enrich") >> parent.node(save)

Callable DAGs

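A Dag is directly callable. await dag(**kwargs) runs it with a DagRunner (using the runner's default NullProvenance) and returns a DagRunResult. Kwargs are matched by name to the parameters of the source nodes, skipping a leading DagContext parameter; a kwarg named after a source node's label whose value is a dict is used verbatim as that node's inputs: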

result = await dag(url="https://example.com")
print(result.status)   # "completed"
print(result.outputs)  # sink node outputs
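The kwarg routing performed by Dag.__call__ can be sketched in plain Python. Here `route_kwargs` is a hypothetical helper and a label-to-callable dict stands in for the source DagNodes; unlike the library, this sketch does not skip a leading DagContext parameter.

```python
import inspect
from typing import Any, Callable


def route_kwargs(
    sources: dict[str, Callable[..., Any]], kwargs: dict[str, Any]
) -> dict[str, dict[str, Any]]:
    source_inputs: dict[str, dict[str, Any]] = {}
    for label, fn in sources.items():
        # A kwarg named after the source label, holding a dict, is used verbatim.
        if isinstance(kwargs.get(label), dict):
            source_inputs[label] = kwargs[label]
            continue
        # Otherwise pick the kwargs whose names match the callable's parameters.
        params = inspect.signature(fn).parameters
        node_kwargs = {name: kwargs[name] for name in params if name in kwargs}
        if node_kwargs:
            source_inputs[label] = node_kwargs
    return source_inputs


def fetch(url: str) -> dict: ...
def load(path: str, limit: int = 10) -> list: ...


print(route_kwargs({"fetch": fetch, "load": load}, {"url": "https://example.com", "limit": 5}))
# {'fetch': {'url': 'https://example.com'}, 'load': {'limit': 5}}
```

Nodes that match no kwargs are simply left out of the result, mirroring the `if node_kwargs:` guard in the library code.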

DAG Factories

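Use @dag_factory to mark a function that builds a DAG. When such a function is registered in a namespace, register() calls it immediately and registers the resulting DAG. Without an explicit nsref, the path is derived from the function's module and name plus a __ suffix: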

from lythonic.compose.namespace import Dag, Namespace, dag_factory

def step1(x: int) -> int:
    return x + 1

def step2(x: int) -> int:
    return x * 2

@dag_factory
def my_pipeline() -> Dag:
    dag = Dag()
    dag.node(step1) >> dag.node(step2)
    return dag

ns = Namespace()
ns.register(my_pipeline)  # registers DAG under "module:my_pipeline__"

Namespace

Flat registry (a single dict) of callables wrapped in NamespaceNode. The hierarchy lives in the keys, which are GlobalRef-style paths: "module.sub:callable_name".

Source code in src/lythonic/compose/namespace.py
class Namespace:
    """
    Flat registry of callables wrapped in `NamespaceNode`.
    Uses GlobalRef-style paths as keys: `"module.sub:callable_name"`.
    """

    _nodes: dict[str, NamespaceNode]

    def __init__(self) -> None:
        self._nodes = {}

    def register(
        self,
        c: Callable[..., Any] | str | Dag,
        nsref: str | None = None,
        decorate: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
        config: NsNodeConfig | None = None,
    ) -> NamespaceNode:
        """
        Register a callable. If `nsref` is `None`, derive from the callable's
        module and name. If the callable is marked with `@dag_factory`,
        call it and register the returned DAG with a `__` suffix. If
        `decorate` is provided, wrap the callable for invocation (metadata
        is still extracted from the original). Optional `tags` are stored
        on the node for querying via `query()`.
        """
        if isinstance(c, Dag):
            return self._register_dag(c, nsref, tags=tags, config=config)

        # Handle @dag_factory decorated callables
        if callable(c) and getattr(c, "_is_dag_factory", False):
            gref = GlobalRef(c)
            factory_nsref = nsref or f"{gref.module}:{gref.name}__"
            dag = c()
            if not isinstance(dag, Dag):
                raise TypeError(f"@dag_factory {gref} must return a Dag, got {type(dag).__name__}")
            return self._register_dag(dag, factory_nsref, tags=tags, config=config)

        gref = GlobalRef(c)

        if nsref is None:
            nsref = f"{gref.module}:{gref.name}"

        method = Method(gref)
        decorated = decorate(method.o) if decorate else None

        if nsref in self._nodes:
            raise ValueError(f"'{nsref}' already exists in namespace")

        node = NamespaceNode(
            method=method,
            nsref=nsref,
            namespace=self,
            decorated=decorated,
            tags=tags,
            config=config,
        )
        self._nodes[nsref] = node
        return node

    def _register_dag(
        self,
        dag: Dag,
        nsref: str | None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
        config: NsNodeConfig | None = None,
    ) -> NamespaceNode:
        """
        Register a Dag as a callable NamespaceNode. Sets the DAG's
        `parent_namespace` to this namespace for runtime resolution.
        The DAG keeps its own namespace; callables are not copied.
        """
        if nsref is None:
            raise ValueError("nsref is required when registering a Dag")

        if dag.parent_namespace is self:
            raise ValueError(f"Dag '{nsref}' is already registered with this namespace")

        dag.parent_namespace = self

        from lythonic.compose.dag_runner import DagRunner  # pyright: ignore[reportImportCycles]

        runner = DagRunner(dag)

        async def dag_wrapper(**kwargs: Any) -> Any:
            source_labels = {n.label for n in dag.sources()}
            source_inputs: dict[str, dict[str, Any]] = {}
            for label in source_labels:
                # If a kwarg key matches a source label and its value is a dict,
                # use it directly as that node's inputs.
                if label in kwargs and isinstance(kwargs[label], dict):
                    source_inputs[label] = kwargs[label]
                    continue
                node = dag.nodes[label]
                node_args = node.ns_node.method.args
                if node.ns_node.expects_dag_context():
                    node_args = node_args[1:]
                node_kwargs = {a.name: kwargs[a.name] for a in node_args if a.name in kwargs}
                if node_kwargs:
                    source_inputs[label] = node_kwargs
            return await runner.run(source_inputs=source_inputs, dag_nsref=nsref)

        method = Method(dag_wrapper)

        if nsref in self._nodes:
            raise ValueError(f"'{nsref}' already exists in namespace")

        node = NamespaceNode(method=method, nsref=nsref, namespace=self, tags=tags, config=config)
        self._nodes[nsref] = node
        return node

    def get(self, nsref: str) -> NamespaceNode:
        """Retrieve a node by nsref. Raises `KeyError` if not found."""
        if nsref not in self._nodes:
            raise KeyError(f"'{nsref}' not found in namespace")
        return self._nodes[nsref]

    def register_all(
        self,
        *cc: Callable[..., Any],
        decorate: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
    ) -> list[NamespaceNode]:
        """Bulk register callables using derived paths."""
        return [self.register(c, decorate=decorate, tags=tags) for c in cc]

    def _all_leaves(self) -> list[NamespaceNode]:
        """Return all registered nodes."""
        return list(self._nodes.values())

    def query(self, expr: str) -> list[NamespaceNode]:
        """
        Return all nodes matching a tag expression. Supports `&` (AND),
        `|` (OR), `~` (NOT) with standard precedence (`~` > `&` > `|`).
        """
        tokens = _parse_tag_expr(expr)
        return [node for node in self._all_leaves() if _eval_tag_expr(tokens, node.tags)]

    def get_trigger(self, name: str) -> tuple[NamespaceNode, TriggerConfig]:
        """Find a trigger by name across all nodes. Returns (node, trigger_config)."""
        for node in self._nodes.values():
            for tc in node.config.triggers:
                if tc.name == name:
                    return node, tc
        raise KeyError(f"Trigger '{name}' not found")

    def to_dict(self) -> list[dict[str, Any]]:
        """Serialize all node configs to a list of dicts (YAML/JSON-ready)."""
        return [node.config.model_dump(exclude_none=True) for node in self._all_leaves()]

    @classmethod
    def from_dict(cls, entries: list[dict[str, Any]]) -> Namespace:
        """Build a Namespace from a list of serialized node configs."""
        ns = cls()
        for entry in entries:
            config = NsNodeConfig.model_validate(entry)
            if config.gref is not None:
                gref = GlobalRef(config.gref)
                instance = gref.get_instance()
                ns.register(instance, nsref=config.nsref, config=config)
            else:
                ns.register(lambda: None, nsref=config.nsref, config=config)
        return ns

    def __getattr__(self, name: str) -> Any:
        # Avoid recursion for internal attributes.
        if name == "_nodes":
            raise AttributeError(name)
        # Look up by name as a leaf suffix (for simple nsrefs like "t:fetch" -> "fetch")
        for nsref, node in self._nodes.items():
            _, leaf = _parse_nsref(nsref)
            if leaf == name:
                return node
        raise AttributeError(f"'{name}' not found in namespace")

register(c, nsref=None, decorate=None, tags=None, config=None)

Register a callable. If nsref is None, the path is derived from the callable's module and name. If the callable is marked with @dag_factory, it is called and the returned DAG is registered instead, with a __ suffix on the derived path. If decorate is provided, calls go through the wrapped callable while metadata is still extracted from the original. Optional tags are stored on the node for querying via query(). Raises ValueError if the path is already registered.


get(nsref)

Retrieve a node by nsref. Raises KeyError if not found.


register_all(*cc, decorate=None, tags=None)

Register several callables at once, each under a path derived from its module and name. decorate and tags apply to every callable.


query(expr)

Return all nodes matching a tag expression. Supports & (AND), | (OR), ~ (NOT) with standard precedence (~ > & > |).


get_trigger(name)

Find a trigger by name across all nodes' configs. Returns (node, trigger_config); raises KeyError if no trigger matches.


to_dict()

Serialize all node configs to a list of dicts (YAML/JSON-ready).


from_dict(entries) classmethod

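Build a Namespace from a list of serialized node configs (the inverse of to_dict()). Entries with a gref are resolved to the referenced object and registered; entries without one are registered with a no-op placeholder callable.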

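from_dict() relies on `GlobalRef(config.gref).get_instance()`, whose exact behavior is not shown here. As a rough, hypothetical sketch of what resolving a "module:attr" reference involves, `resolve_gref` imports the module and walks the attribute path; the entry dicts use the `nsref`, `gref`, and `tags` field names seen in NsNodeConfig above.

```python
import importlib
from typing import Any


def resolve_gref(gref: str) -> Any:
    """Import the module before ':' and walk the dotted attribute path after it."""
    module_name, sep, attr_path = gref.partition(":")
    if not sep or not attr_path:
        raise ValueError(f"expected 'module:attr', got {gref!r}")
    obj: Any = importlib.import_module(module_name)
    for part in attr_path.split("."):
        obj = getattr(obj, part)
    return obj


entries = [
    {"nsref": "util:sqrt", "gref": "math:sqrt", "tags": ["fast"]},
    {"nsref": "util:placeholder"},  # no gref: from_dict registers a no-op lambda
]
for entry in entries:
    target = resolve_gref(entry["gref"]) if "gref" in entry else (lambda: None)
    print(entry["nsref"], callable(target))  # "util:sqrt True", then "util:placeholder True"

print(resolve_gref("math:sqrt")(16.0))  # 4.0
```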

NamespaceNode

Wraps a Method with namespace identity (nsref, tags, config). Instances are callable: a call delegates to the decorated callable if one was supplied, otherwise to the wrapped Method.

Source code in src/lythonic/compose/namespace.py
class NamespaceNode:
    """
    Wraps a `Method` with namespace identity. Callable -- delegates to the
    decorated callable if present, otherwise to `method.o`.
    """

    method: Method
    namespace: Namespace
    config: NsNodeConfig
    _decorated: Callable[..., Any] | None

    def __init__(
        self,
        method: Method,
        nsref: str,
        namespace: Namespace,
        decorated: Callable[..., Any] | None = None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
        config: NsNodeConfig | None = None,
    ) -> None:
        self.method = method
        self.namespace = namespace
        self._decorated = decorated
        self.metadata: dict[str, Any] = {}
        validated_tags = _validate_tags(tags)
        self.config = config or NsNodeConfig(
            nsref=nsref,
            gref=method.gref if method.gref else None,
            tags=sorted(validated_tags) if validated_tags else None,
        )

    @property
    def nsref(self) -> str:
        return self.config.nsref

    @property
    def tags(self) -> frozenset[str]:
        return frozenset(self.config.tags) if self.config.tags else frozenset()

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        if self._decorated is not None:
            return self._decorated(*args, **kwargs)
        return self.method(*args, **kwargs)

    def expects_dag_context(self) -> bool:
        """True if first parameter is `DagContext` or a subclass."""
        return self.dag_context_type() is not None

    def dag_context_type(self) -> type[DagContext] | None:
        """Return the `DagContext` subclass expected, or `None`."""
        resolved = _resolve_first_param_type(self.method.o)
        if resolved is not None and issubclass(resolved, DagContext):
            return resolved
        return None

    def __repr__(self) -> str:  # pyright: ignore[reportImplicitOverride]
        return f"NamespaceNode(nsref={self.nsref!r})"

expects_dag_context()

Return True if the callable's first parameter is annotated with DagContext or a subclass.


dag_context_type()

Return the DagContext subclass expected, or None.


DagContext

Bases: BaseModel

Base context injected into DAG-participating callables. Subclass to add domain-specific fields.

Source code in src/lythonic/compose/namespace.py
class DagContext(BaseModel):
    """
    Base context injected into DAG-participating callables.
    Subclass to add domain-specific fields.
    """

    dag_nsref: str
    node_label: str
    run_id: str

Dag

Directed acyclic graph of DagNodes. Use node() to add nodes and >> to declare edges; validate() rejects cycles and logs warnings for type mismatches between connected nodes.
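DagNode's `>>` implementation is not part of this section. The following hypothetical `SketchNode`/`SketchDag` pair shows one common way such an operator works: `__rshift__` records an edge and returns the right-hand node, so left-associative chains like `a >> b >> c` add one edge per step.

```python
class SketchNode:
    def __init__(self, dag: "SketchDag", label: str) -> None:
        self.dag = dag
        self.label = label

    def __rshift__(self, other: "SketchNode") -> "SketchNode":
        self.dag.edges.append((self.label, other.label))
        return other  # returning the right operand enables chaining


class SketchDag:
    def __init__(self) -> None:
        self.edges: list[tuple[str, str]] = []

    def node(self, label: str) -> SketchNode:
        return SketchNode(self, label)


dag = SketchDag()
dag.node("fetch") >> dag.node("transform") >> dag.node("save")
print(dag.edges)  # [('fetch', 'transform'), ('transform', 'save')]
```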

Source code in src/lythonic/compose/namespace.py
class Dag:
    """
    Directed acyclic graph of `DagNode`s with type-based validation.
    Use `node()` to add nodes and `>>` to declare edges.
    """

    nodes: dict[str, DagNode]
    edges: list[DagEdge]
    namespace: Namespace

    def __init__(self) -> None:
        self.nodes = {}
        self.edges = []
        self.namespace = Namespace()
        self.parent_namespace: Namespace | None = None

    def resolve(self, nsref: str) -> NamespaceNode:
        """
        Resolve a callable by nsref. Tries `parent_namespace` first
        (if set), then falls back to the DAG's own namespace.
        """
        if self.parent_namespace is not None:
            try:
                return self.parent_namespace.get(nsref)
            except KeyError:
                pass
        return self.namespace.get(nsref)

    def node(
        self,
        source: NamespaceNode | Callable[..., Any] | Dag,
        label: str | None = None,
    ) -> DagNode:
        """
        Create a unique `DagNode` in this graph. If `label` is `None`,
        derived from the source's nsref leaf name. Raises `ValueError`
        if the label already exists.

        When `source` is a `Dag` or `@dag_factory`, creates a `CallNode`
        that runs the sub-DAG as a single step. Label is auto-derived
        from the factory function name if not provided.
        """
        # Expand @dag_factory to a Dag
        if callable(source) and getattr(source, "_is_dag_factory", False):
            if label is None:
                label = str(getattr(source, "__name__", ""))
            factory_result = source()  # pyright: ignore[reportCallIssue]
            if not isinstance(factory_result, Dag):
                raise TypeError(
                    f"@dag_factory must return a Dag, got {type(factory_result).__name__}"
                )
            source = factory_result

        if isinstance(source, Dag):
            if label is None:
                raise ValueError("label is required when passing a Dag to node()")
            if label in self.nodes:
                raise ValueError(
                    f"Label '{label}' already exists in DAG. "
                    f"Use an explicit label for duplicate callables."
                )
            call_node = CallNode(sub_dag=source, label=label, dag=self)
            self.nodes[label] = call_node
            return call_node

        if isinstance(source, NamespaceNode):
            ns_node = source
        else:
            gref = GlobalRef(source)
            try:
                ns_node = self.namespace.get(str(gref))
            except KeyError:
                ns_node = self.namespace.register(source)

        if label is None:
            _, leaf = _parse_nsref(ns_node.nsref)
            label = leaf

        if label in self.nodes:
            raise ValueError(
                f"Label '{label}' already exists in DAG. "
                f"Use an explicit label for duplicate callables."
            )

        dag_node = DagNode(ns_node=ns_node, label=label, dag=self)
        self.nodes[label] = dag_node
        return dag_node

    def map(
        self,
        sub_dag: Dag | Callable[..., Any],
        label: str | None = None,
    ) -> MapNode:
        """
        Create a `MapNode` that runs `sub_dag` on each element of an
        upstream collection. The sub-DAG must have exactly one source
        and one sink. Accepts a `@dag_factory` function, in which case
        the label is auto-derived from the function name.
        """
        # Expand @dag_factory to a Dag
        if callable(sub_dag) and getattr(sub_dag, "_is_dag_factory", False):
            if label is None:
                label = str(getattr(sub_dag, "__name__", ""))
            factory_result = sub_dag()  # pyright: ignore[reportCallIssue]
            if not isinstance(factory_result, Dag):
                raise TypeError(
                    f"@dag_factory must return a Dag, got {type(factory_result).__name__}"
                )
            sub_dag = factory_result

        if not label:
            raise ValueError("label is required for map()")

        if label in self.nodes:
            raise ValueError(
                f"Label '{label}' already exists in DAG. Use a unique label for map nodes."
            )

        map_node = MapNode(
            sub_dag=sub_dag,  # pyright: ignore[reportArgumentType]
            label=label,
            dag=self,
        )
        self.nodes[label] = map_node
        return map_node

    def add_edge(self, upstream: DagNode, downstream: DagNode) -> DagEdge:
        """Register a directed edge between two nodes."""
        edge = DagEdge(upstream=upstream.label, downstream=downstream.label)
        self.edges.append(edge)
        return edge

    def validate(self) -> None:
        """Check acyclicity and type compatibility between connected nodes."""
        self._check_acyclicity()
        self._check_type_compatibility()

    def _check_acyclicity(self) -> None:
        """Kahn's algorithm for topological sort -- raises if cycle detected."""
        in_degree: dict[str, int] = {label: 0 for label in self.nodes}
        adj: dict[str, list[str]] = {label: [] for label in self.nodes}
        for edge in self.edges:
            adj[edge.upstream].append(edge.downstream)
            in_degree[edge.downstream] += 1

        queue = [label for label, deg in in_degree.items() if deg == 0]
        visited = 0
        while queue:
            current = queue.pop(0)
            visited += 1
            for neighbor in adj[current]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        if visited != len(self.nodes):
            raise ValueError("DAG contains a cycle")

    def _check_type_compatibility(self) -> None:
        """Warn-level check: upstream return type should match a downstream input type."""
        for edge in self.edges:
            upstream_node = self.nodes[edge.upstream]
            downstream_node = self.nodes[edge.downstream]

            upstream_return = upstream_node.ns_node.method.return_annotation
            if upstream_return is None or upstream_return is _inspect.Parameter.empty:
                continue

            downstream_args = downstream_node.ns_node.method.args
            if downstream_node.ns_node.expects_dag_context() and len(downstream_args) > 0:
                downstream_args = downstream_args[1:]

            if not downstream_args:
                continue

            downstream_types = {
                arg.annotation for arg in downstream_args if arg.annotation is not None
            }
            # Also accept list[X] params when upstream returns X (fan-in)
            fan_in_types: set[str] = set()
            for dt in downstream_types:
                if isinstance(dt, str) and dt.startswith("list[") and dt.endswith("]"):
                    fan_in_types.add(dt[5:-1])
            upstream_str = str(upstream_return)
            if (
                downstream_types
                and upstream_return not in downstream_types
                and upstream_str not in fan_in_types
            ):
                logging.getLogger(__name__).warning(
                    "Type mismatch on edge %s -> %s: upstream returns %s, downstream accepts %s",
                    edge.upstream,
                    edge.downstream,
                    upstream_return,
                    downstream_types,
                )

    def topological_order(self) -> list[DagNode]:
        """Return nodes in topological order. Stable sort by label for determinism."""
        in_degree: dict[str, int] = {label: 0 for label in self.nodes}
        adj: dict[str, list[str]] = {label: [] for label in self.nodes}
        for edge in self.edges:
            adj[edge.upstream].append(edge.downstream)
            in_degree[edge.downstream] += 1

        queue = sorted([label for label, deg in in_degree.items() if deg == 0])
        result: list[DagNode] = []
        while queue:
            current = queue.pop(0)
            result.append(self.nodes[current])
            for neighbor in sorted(adj[current]):
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        return result

    def sources(self) -> list[DagNode]:
        """Return nodes with no upstream edges."""
        has_upstream = {edge.downstream for edge in self.edges}
        return [self.nodes[label] for label in self.nodes if label not in has_upstream]

    def sinks(self) -> list[DagNode]:
        """Return nodes with no downstream edges."""
        has_downstream = {edge.upstream for edge in self.edges}
        return [self.nodes[label] for label in self.nodes if label not in has_downstream]

    async def __call__(self, **kwargs: Any) -> DagRunResult:
        """
        Run the DAG directly with DagRunner's default - NullProvenance. Kwargs are matched
        to source node parameters by name.
        """
        from lythonic.compose.dag_runner import (  # pyright: ignore[reportImportCycles]
            DagRunner,
        )

        source_inputs: dict[str, dict[str, Any]] = {}
        for node in self.sources():
            if node.label in kwargs and isinstance(kwargs[node.label], dict):
                source_inputs[node.label] = kwargs[node.label]
                continue
            node_args = node.ns_node.method.args
            if node.ns_node.expects_dag_context():
                node_args = node_args[1:]
            node_kwargs = {a.name: kwargs[a.name] for a in node_args if a.name in kwargs}
            if node_kwargs:
                source_inputs[node.label] = node_kwargs

        runner = DagRunner(self)
        return await runner.run(source_inputs=source_inputs)

    def __enter__(self) -> Dag:
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any
    ) -> None:
        if exc_type is None:
            self.validate()

resolve(nsref)

Resolve a callable by nsref. Tries parent_namespace first (if set), then falls back to the DAG's own namespace.


node(source, label=None)

Create a uniquely labeled DagNode in this graph. If label is None, it is derived from the leaf name of the source's nsref. Raises ValueError if the label already exists.

When source is a Dag or a @dag_factory function, creates a CallNode that runs the sub-DAG as a single step. For a @dag_factory the label defaults to the function name; for a Dag instance, label is required.


map(sub_dag, label=None)

Create a MapNode that runs sub_dag on each element of an upstream collection. The sub-DAG must have exactly one source and one sink. Accepts a @dag_factory function, in which case the label defaults to the function name; otherwise label is required, and a missing or empty label raises ValueError.

Source code in src/lythonic/compose/namespace.py
def map(
    self,
    sub_dag: Dag | Callable[..., Any],
    label: str | None = None,
) -> MapNode:
    """
    Create a `MapNode` that runs `sub_dag` on each element of an
    upstream collection. The sub-DAG must have exactly one source
    and one sink. Accepts a `@dag_factory` function, in which case
    the label is auto-derived from the function name.
    """
    # Expand @dag_factory to a Dag
    if callable(sub_dag) and getattr(sub_dag, "_is_dag_factory", False):
        if label is None:
            label = str(getattr(sub_dag, "__name__", ""))
        factory_result = sub_dag()  # pyright: ignore[reportCallIssue]
        if not isinstance(factory_result, Dag):
            raise TypeError(
                f"@dag_factory must return a Dag, got {type(factory_result).__name__}"
            )
        sub_dag = factory_result

    if not label:
        raise ValueError("label is required for map()")

    if label in self.nodes:
        raise ValueError(
            f"Label '{label}' already exists in DAG. Use a unique label for map nodes."
        )

    map_node = MapNode(
        sub_dag=sub_dag,  # pyright: ignore[reportArgumentType]
        label=label,
        dag=self,
    )
    self.nodes[label] = map_node
    return map_node
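Both `node()` and `map()` begin by expanding a `@dag_factory` function into a `Dag`. The decorator itself is not shown on this page; the sketch below assumes it does nothing more than set the `_is_dag_factory` attribute that the code checks for, and it uses a stub `Dag` class (hypothetical, not lythonic's) so it runs on its own.

```python
from __future__ import annotations

from typing import Any, Callable


class Dag:
    """Hypothetical stub standing in for lythonic's Dag."""

    def __init__(self) -> None:
        self.nodes: dict[str, Any] = {}


def dag_factory(fn: Callable[[], Dag]) -> Callable[[], Dag]:
    """Hypothetical decorator: marks fn so node()/map() will expand it."""
    fn._is_dag_factory = True  # type: ignore[attr-defined]
    return fn


def expand(source: Any, label: str | None) -> tuple[Any, str | None]:
    """Mirror the expansion step at the top of node() and map()."""
    if callable(source) and getattr(source, "_is_dag_factory", False):
        if label is None:
            # Default the label to the factory's function name.
            label = str(getattr(source, "__name__", ""))
        result = source()
        if not isinstance(result, Dag):
            raise TypeError(f"@dag_factory must return a Dag, got {type(result).__name__}")
        source = result
    return source, label


@dag_factory
def word_counts() -> Dag:
    return Dag()


sub_dag, label = expand(word_counts, None)
print(label)  # word_counts
```

An explicit label always wins over the derived one, and a plain `Dag` passes through unchanged.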

add_edge(upstream, downstream)

Register a directed edge between two nodes.

Source code in src/lythonic/compose/namespace.py
def add_edge(self, upstream: DagNode, downstream: DagNode) -> DagEdge:
    """Register a directed edge between two nodes."""
    edge = DagEdge(upstream=upstream.label, downstream=downstream.label)
    self.edges.append(edge)
    return edge

validate()

Check acyclicity and type compatibility between connected nodes.

Source code in src/lythonic/compose/namespace.py
def validate(self) -> None:
    """Check acyclicity and type compatibility between connected nodes."""
    self._check_acyclicity()
    self._check_type_compatibility()
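The bodies of `_check_acyclicity` and `_check_type_compatibility` are private and not shown here. One common way to implement the acyclicity half, sketched below over plain label pairs, is Kahn's algorithm: repeatedly remove nodes with no remaining incoming edges; if some nodes are never removed, the graph has a cycle. `check_acyclicity` is a hypothetical name, not the library's method.

```python
from collections import deque


def check_acyclicity(labels: list[str], edges: list[tuple[str, str]]) -> None:
    """Raise ValueError if the edges contain a cycle (Kahn's algorithm)."""
    in_degree = {label: 0 for label in labels}
    adj: dict[str, list[str]] = {label: [] for label in labels}
    for up, down in edges:
        adj[up].append(down)
        in_degree[down] += 1

    ready = deque(label for label, deg in in_degree.items() if deg == 0)
    visited = 0
    while ready:
        current = ready.popleft()
        visited += 1
        for nxt in adj[current]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    if visited != len(labels):
        # Nodes that never reached in-degree 0 sit on, or downstream of, a cycle.
        stuck = sorted(label for label, deg in in_degree.items() if deg > 0)
        raise ValueError(f"DAG contains a cycle involving: {stuck}")


check_acyclicity(["a", "b", "c"], [("a", "b"), ("b", "c")])  # passes silently
```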

topological_order()

Return nodes in topological order. Initial source nodes, and each node's successors, are visited in label order, so the result is deterministic.

Source code in src/lythonic/compose/namespace.py
def topological_order(self) -> list[DagNode]:
    """Return nodes in topological order. Stable sort by label for determinism."""
    in_degree: dict[str, int] = {label: 0 for label in self.nodes}
    adj: dict[str, list[str]] = {label: [] for label in self.nodes}
    for edge in self.edges:
        adj[edge.upstream].append(edge.downstream)
        in_degree[edge.downstream] += 1

    queue = sorted([label for label, deg in in_degree.items() if deg == 0])
    result: list[DagNode] = []
    while queue:
        current = queue.pop(0)
        result.append(self.nodes[current])
        for neighbor in sorted(adj[current]):
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    return result
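The method above is Kahn's algorithm. The sketch below restates it over plain string labels so it runs without lythonic; it is not the library's API. The only change is `collections.deque.popleft()` in place of `list.pop(0)`, which keeps the same FIFO order without the linear-time shift.

```python
from collections import deque


def topological_order(labels: list[str], edges: list[tuple[str, str]]) -> list[str]:
    in_degree = {label: 0 for label in labels}
    adj: dict[str, list[str]] = {label: [] for label in labels}
    for up, down in edges:
        adj[up].append(down)
        in_degree[down] += 1

    # Seed with all zero-in-degree labels, sorted for determinism.
    queue = deque(sorted(label for label, deg in in_degree.items() if deg == 0))
    result: list[str] = []
    while queue:
        current = queue.popleft()
        result.append(current)
        # Successors are released in label order; the queue itself stays FIFO.
        for neighbor in sorted(adj[current]):
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    return result


order = topological_order(
    ["transform", "fetch", "load", "audit"],
    [("fetch", "transform"), ("transform", "load"), ("fetch", "audit")],
)
print(order)  # ['fetch', 'audit', 'transform', 'load']
```

As written, a cycle does not raise here: nodes on a cycle never reach in-degree zero and are simply left out of the result.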

sources()

Return nodes with no upstream edges.

Source code in src/lythonic/compose/namespace.py
def sources(self) -> list[DagNode]:
    """Return nodes with no upstream edges."""
    has_upstream = {edge.downstream for edge in self.edges}
    return [self.nodes[label] for label in self.nodes if label not in has_upstream]

sinks()

Return nodes with no downstream edges.

Source code in src/lythonic/compose/namespace.py
def sinks(self) -> list[DagNode]:
    """Return nodes with no downstream edges."""
    has_downstream = {edge.upstream for edge in self.edges}
    return [self.nodes[label] for label in self.nodes if label not in has_downstream]
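`sources()` and `sinks()` are set-membership filters over the edge list that preserve node insertion order. The standalone sketch below (hypothetical functions over plain labels) makes one consequence visible: a node with no edges at all counts as both a source and a sink.

```python
def sources(labels: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Labels with no incoming edge, in insertion order."""
    has_upstream = {down for _, down in edges}
    return [label for label in labels if label not in has_upstream]


def sinks(labels: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Labels with no outgoing edge, in insertion order."""
    has_downstream = {up for up, _ in edges}
    return [label for label in labels if label not in has_downstream]


labels = ["fetch", "transform", "load", "audit", "orphan"]
edges = [("fetch", "transform"), ("transform", "load"), ("fetch", "audit")]
print(sources(labels, edges))  # ['fetch', 'orphan']
print(sinks(labels, edges))    # ['load', 'audit', 'orphan']
```

This matters for sub-DAGs passed to `map()` or `node()`, which must have exactly one source and one sink: a stray unconnected node adds one to each count.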

__call__(**kwargs) async

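Run the DAG directly using DagRunner's default provenance, NullProvenance. Keyword arguments are matched to source-node parameters by name; for a source node that expects a DagContext, its first parameter is skipped during matching. A keyword argument named after a source node's label whose value is a dict is used as that node's complete input instead.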

Source code in src/lythonic/compose/namespace.py
async def __call__(self, **kwargs: Any) -> DagRunResult:
    """
    Run the DAG directly with DagRunner's default - NullProvenance. Kwargs are matched
    to source node parameters by name.
    """
    from lythonic.compose.dag_runner import (  # pyright: ignore[reportImportCycles]
        DagRunner,
    )

    source_inputs: dict[str, dict[str, Any]] = {}
    for node in self.sources():
        if node.label in kwargs and isinstance(kwargs[node.label], dict):
            source_inputs[node.label] = kwargs[node.label]
            continue
        node_args = node.ns_node.method.args
        if node.ns_node.expects_dag_context():
            node_args = node_args[1:]
        node_kwargs = {a.name: kwargs[a.name] for a in node_args if a.name in kwargs}
        if node_kwargs:
            source_inputs[node.label] = node_kwargs

    runner = DagRunner(self)
    return await runner.run(source_inputs=source_inputs)
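The routing of `**kwargs` to source nodes can be sketched without lythonic. Here `inspect.signature` stands in for `Method.args`, and a hypothetical `takes_context` set of labels stands in for `expects_dag_context()`; `route_source_inputs` and the example callables are illustrative names only.

```python
from __future__ import annotations

import inspect
from typing import Any, Callable


def route_source_inputs(
    sources: dict[str, Callable[..., Any]],
    kwargs: dict[str, Any],
    takes_context: frozenset[str] = frozenset(),
) -> dict[str, dict[str, Any]]:
    routed: dict[str, dict[str, Any]] = {}
    for label, fn in sources.items():
        # A kwarg named after the node label, holding a dict, is used verbatim.
        if label in kwargs and isinstance(kwargs[label], dict):
            routed[label] = kwargs[label]
            continue
        params = list(inspect.signature(fn).parameters)
        if label in takes_context:
            params = params[1:]  # the first parameter receives the DagContext
        node_kwargs = {name: kwargs[name] for name in params if name in kwargs}
        if node_kwargs:
            routed[label] = node_kwargs
    return routed


def fetch(url: str) -> dict:
    return {"url": url}


def load_config(path: str, strict: bool = False) -> dict:
    return {"path": path, "strict": strict}


routed = route_source_inputs(
    {"fetch": fetch, "load_config": load_config},
    {"url": "https://example.com", "strict": True},
)
print(routed)  # {'fetch': {'url': 'https://example.com'}, 'load_config': {'strict': True}}
```

Because matching is by name across all sources, one keyword argument can feed several source nodes that share a parameter name; the label-keyed dict form is the way to give a single node its own inputs.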

DagNode

A uniquely labeled node in a Dag, wrapping a NamespaceNode. The same callable can appear multiple times under different labels.

Source code in src/lythonic/compose/namespace.py
class DagNode:
    """
    Unique node in a `Dag`, wrapping a `NamespaceNode`.
    Same callable can appear multiple times with different labels.
    """

    ns_node: NamespaceNode
    label: str
    dag: Dag

    def __init__(self, ns_node: NamespaceNode, label: str, dag: Dag) -> None:
        self.ns_node = ns_node
        self.label = label
        self.dag = dag

    def __rshift__(self, other: DagNode) -> DagNode:
        """Register edge from self to other. Returns downstream for chaining."""
        self.dag.add_edge(self, other)
        return other

__rshift__(other)

Register edge from self to other. Returns downstream for chaining.

Source code in src/lythonic/compose/namespace.py
def __rshift__(self, other: DagNode) -> DagNode:
    """Register edge from self to other. Returns downstream for chaining."""
    self.dag.add_edge(self, other)
    return other
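Returning the downstream node is what makes chains like `a >> b >> c` work: Python's `>>` is left-associative, so the expression evaluates as `(a >> b) >> c`. The minimal stand-in classes below (`MiniDag`, `MiniNode`, both hypothetical) record edges as label pairs to show the effect.

```python
from __future__ import annotations


class MiniDag:
    def __init__(self) -> None:
        self.edges: list[tuple[str, str]] = []

    def node(self, label: str) -> MiniNode:
        return MiniNode(label, self)


class MiniNode:
    def __init__(self, label: str, dag: MiniDag) -> None:
        self.label = label
        self.dag = dag

    def __rshift__(self, other: MiniNode) -> MiniNode:
        # Record the edge, then hand back the downstream node for chaining.
        self.dag.edges.append((self.label, other.label))
        return other


dag = MiniDag()
fetch, transform, load, audit = (dag.node(n) for n in ("fetch", "transform", "load", "audit"))
fetch >> transform >> load  # edges fetch->transform, transform->load
fetch >> audit              # fan-out needs a separate statement
print(dag.edges)  # [('fetch', 'transform'), ('transform', 'load'), ('fetch', 'audit')]
```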

DagEdge

Bases: BaseModel

Edge between two DagNodes in a Dag.

Source code in src/lythonic/compose/namespace.py
class DagEdge(BaseModel):
    """Edge between two `DagNode`s in a `Dag`."""

    upstream: str
    downstream: str
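Because `DagEdge` stores only the two labels rather than node objects, an edge list is plain data that can be serialized and compared. The sketch below uses a stdlib dataclass (`Edge`, a hypothetical stand-in for the Pydantic model) so it runs without third-party packages.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class Edge:
    """Stdlib stand-in for DagEdge: endpoints are labels, not node objects."""

    upstream: str
    downstream: str


edges = [Edge("fetch", "transform"), Edge("transform", "load")]
payload = json.dumps([asdict(e) for e in edges])
restored = [Edge(**d) for d in json.loads(payload)]
print(payload)  # [{"upstream": "fetch", "downstream": "transform"}, {"upstream": "transform", "downstream": "load"}]
```

Resolving a label back to its node is then a lookup in the owning Dag's `nodes` dict.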

CompositeDagNode

Bases: DagNode

Base class for DAG nodes that contain a sub-DAG. Validates the sub-DAG has exactly one source and one sink.

Source code in src/lythonic/compose/namespace.py
class CompositeDagNode(DagNode):
    """
    Base class for DAG nodes that contain a sub-DAG. Validates the
    sub-DAG has exactly one source and one sink.
    """

    sub_dag: Dag

    def __init__(self, sub_dag: Dag, label: str, dag: Dag) -> None:
        sources = sub_dag.sources()
        sinks = sub_dag.sinks()
        if len(sources) != 1:
            raise ValueError(f"Sub-DAG must have exactly one source, found {len(sources)}")
        if len(sinks) != 1:
            raise ValueError(f"Sub-DAG must have exactly one sink, found {len(sinks)}")

        placeholder = NamespaceNode(
            method=Method(lambda: None),
            nsref=f"__composite__:{label}",
            namespace=dag.namespace,
        )
        super().__init__(ns_node=placeholder, label=label, dag=dag)
        self.sub_dag = sub_dag
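The constructor's guard can be sketched over plain labels. Presumably a single source gives each run one entry point to feed and a single sink gives one result to collect; a fan-out sub-DAG such as `split -> a, split -> b` is rejected because it has two sinks. `check_single_source_sink` is a hypothetical name.

```python
def check_single_source_sink(labels: list[str], edges: list[tuple[str, str]]) -> tuple[str, str]:
    """Return (source, sink) or raise, mirroring CompositeDagNode's checks."""
    has_upstream = {down for _, down in edges}
    has_downstream = {up for up, _ in edges}
    sources = [label for label in labels if label not in has_upstream]
    sinks = [label for label in labels if label not in has_downstream]
    if len(sources) != 1:
        raise ValueError(f"Sub-DAG must have exactly one source, found {len(sources)}")
    if len(sinks) != 1:
        raise ValueError(f"Sub-DAG must have exactly one sink, found {len(sinks)}")
    return sources[0], sinks[0]


print(check_single_source_sink(["tokenize", "count"], [("tokenize", "count")]))  # ('tokenize', 'count')
```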

MapNode

Bases: CompositeDagNode

Runs a sub-DAG on each element of an upstream collection (a list or dict).

Source code in src/lythonic/compose/namespace.py
class MapNode(CompositeDagNode):
    """Runs a sub-DAG on each element of a collection."""

    pass
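`MapNode` itself only holds the sub-DAG; execution happens in `DagRunner`, which is not shown on this page. Per the module overview, elements are processed concurrently via `asyncio.gather`. The sketch below shows that fan-out pattern with a hypothetical `map_concurrently` helper; it assumes a dict input maps over the values and keeps the keys, which this page does not confirm.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def map_concurrently(
    run_one: Callable[[Any], Awaitable[Any]],
    items: list[Any] | dict[Any, Any],
) -> list[Any] | dict[Any, Any]:
    if isinstance(items, dict):
        keys = list(items)
        # gather returns results in argument order, so zip restores the keys.
        results = await asyncio.gather(*(run_one(items[k]) for k in keys))
        return dict(zip(keys, results))
    return list(await asyncio.gather(*(run_one(x) for x in items)))


async def count_words(chunk: str) -> int:
    await asyncio.sleep(0)  # stand-in for running the sub-DAG on one element
    return len(chunk.split())


print(asyncio.run(map_concurrently(count_words, ["a b", "c d e"])))  # [2, 3]
print(asyncio.run(map_concurrently(count_words, {"x": "a b c"})))    # {'x': 3}
```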

CallNode

Bases: CompositeDagNode

Runs a sub-DAG once as a single step.

Source code in src/lythonic/compose/namespace.py
class CallNode(CompositeDagNode):
    """Runs a sub-DAG once as a single step."""

    pass