Namespace: Hierarchical registry of callables with metadata and DAG composition.
Provides Namespace for registering callables wrapped in NamespaceNode
(which wraps Method), using GlobalRef-style paths ("branch.sub:leaf").
Includes DagContext as base context for DAG-participating callables.
Path Scheme
. separates namespace levels (branches)
: separates the leaf callable from its namespace
- Example:
"market.data:fetch_prices" -> branch market.data, leaf fetch_prices
Usage
Each Dag owns a Namespace. Passing a callable to dag.node() auto-registers
it in the DAG's namespace (derived from the callable's module and name):
from lythonic.compose.namespace import Dag
def fetch(url: str) -> dict:
return {"source": url, "raw": [1, 2, 3]}
def transform(data: dict) -> dict:
return {"source": data["source"], "values": [v * 2 for v in data["raw"]]}
dag = Dag()
dag.node(fetch) >> dag.node(transform)
You can also register callables in a standalone Namespace and look them up
by full path or by leaf name via attribute access (e.g. ns.get("market:fetch") or ns.fetch).
Nodes can be tagged at registration for later querying:
ns.register(fetch_prices, nsref="market:fetch", tags={"slow", "market-data"})
ns.register(compute_stats, nsref="market:stats", tags={"fast", "market-data"})
ns.query("market-data") # both nodes
ns.query("slow & market-data") # only fetch
ns.query("~slow") # only stats
ns.query("slow | fast") # both nodes
Tag expressions support & (AND), | (OR), ~ (NOT) with standard
precedence (~ > & > |). Tags must not contain spaces or operator
characters.
Map
Dag.map(sub_dag, label) creates a MapNode that runs a sub-DAG on each
element of an upstream list or dict, concurrently via asyncio.gather:
sub_dag = Dag()
sub_dag.node(tokenize) >> sub_dag.node(count)
parent = Dag()
parent.node(split_text) >> parent.map(sub_dag, label="chunks") >> parent.node(reduce)
The sub-DAG must have exactly one source and one sink.
Composable DAGs
dag.node(sub_dag, label="enrich") creates a CallNode that runs a sub-DAG
once as a single step, passing the upstream output to the sub-DAG's source:
enrich_dag = Dag()
enrich_dag.node(lookup) >> enrich_dag.node(annotate)
parent = Dag()
parent.node(fetch) >> parent.node(enrich_dag, label="enrich") >> parent.node(save)
Callable DAGs
A Dag is directly callable. await dag(**kwargs) creates a DagRunner
with NullProvenance and returns a DagRunResult. Kwargs are matched to
source node parameters by name:
result = await dag(url="https://example.com")
print(result.status) # "completed"
print(result.outputs) # sink node outputs
DAG Factories
Use @dag_factory to mark a function that builds a DAG. When registered
in a namespace, the factory is called and the resulting DAG is registered
with a __ suffix:
from lythonic.compose.namespace import dag_factory, Dag
@dag_factory
def my_pipeline() -> Dag:
dag = Dag()
dag.node(step1) >> dag.node(step2)
return dag
ns.register(my_pipeline) # registers DAG under "module:my_pipeline__"
Namespace
Flat registry of callables wrapped in NamespaceNode.
Uses GlobalRef-style paths as keys: "module.sub:callable_name".
Source code in src/lythonic/compose/namespace.py
class Namespace:
    """
    Flat registry of callables wrapped in `NamespaceNode`.

    Uses GlobalRef-style paths as keys: `"module.sub:callable_name"`.
    All nodes live in one dict keyed by their full nsref; attribute access
    (`ns.fetch`) finds a node by the leaf part of its nsref.
    """

    _nodes: dict[str, NamespaceNode]

    def __init__(self) -> None:
        self._nodes = {}

    def register(
        self,
        c: Callable[..., Any] | str | Dag,
        nsref: str | None = None,
        decorate: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
        config: NsNodeConfig | None = None,
    ) -> NamespaceNode:
        """
        Register a callable and return its `NamespaceNode`.

        If `nsref` is `None`, it is derived from the callable's module and
        name. A `Dag` instance is delegated to `_register_dag`. If the
        callable is marked with `@dag_factory`, it is called and the returned
        DAG is registered with a `__` suffix. If `decorate` is provided, it
        wraps the callable for invocation (metadata is still extracted from
        the original). Optional `tags` are stored on the node for querying
        via `query()`.

        Raises `TypeError` if a `@dag_factory` does not return a `Dag`, and
        `ValueError` if `nsref` is already registered.
        """
        if isinstance(c, Dag):
            return self._register_dag(c, nsref, tags=tags, config=config)

        # Handle @dag_factory decorated callables
        if callable(c) and getattr(c, "_is_dag_factory", False):
            gref = GlobalRef(c)
            factory_nsref = nsref or f"{gref.module}:{gref.name}__"
            dag = c()
            if not isinstance(dag, Dag):
                raise TypeError(f"@dag_factory {gref} must return a Dag, got {type(dag).__name__}")
            return self._register_dag(dag, factory_nsref, tags=tags, config=config)

        gref = GlobalRef(c)
        if nsref is None:
            nsref = f"{gref.module}:{gref.name}"
        method = Method(gref)

        # Reject duplicates before calling `decorate`, so a failed
        # registration never triggers the decorator's side effects.
        if nsref in self._nodes:
            raise ValueError(f"'{nsref}' already exists in namespace")
        decorated = decorate(method.o) if decorate else None

        node = NamespaceNode(
            method=method,
            nsref=nsref,
            namespace=self,
            decorated=decorated,
            tags=tags,
            config=config,
        )
        self._nodes[nsref] = node
        return node

    def _register_dag(
        self,
        dag: Dag,
        nsref: str | None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
        config: NsNodeConfig | None = None,
    ) -> NamespaceNode:
        """
        Register a Dag as a callable NamespaceNode. Sets the DAG's
        `parent_namespace` to this namespace for runtime resolution.
        The DAG keeps its own namespace; callables are not copied.

        Raises `ValueError` if `nsref` is missing, if the DAG is already
        attached to this namespace, or if `nsref` is already registered.
        All checks run before the DAG is modified.
        """
        if nsref is None:
            raise ValueError("nsref is required when registering a Dag")
        if dag.parent_namespace is self:
            raise ValueError(f"Dag '{nsref}' is already registered with this namespace")
        # Validate the key first: a rejected registration must not leave the
        # DAG pointing at this namespace.
        if nsref in self._nodes:
            raise ValueError(f"'{nsref}' already exists in namespace")

        dag.parent_namespace = self

        from lythonic.compose.dag_runner import DagRunner  # pyright: ignore[reportImportCycles]

        runner = DagRunner(dag)

        async def dag_wrapper(**kwargs: Any) -> Any:
            # Sources are recomputed on every call so edges added after
            # registration are honoured.
            source_labels = {n.label for n in dag.sources()}
            source_inputs: dict[str, dict[str, Any]] = {}
            for label in source_labels:
                # If a kwarg key matches a source label and its value is a dict,
                # use it directly as that node's inputs.
                if label in kwargs and isinstance(kwargs[label], dict):
                    source_inputs[label] = kwargs[label]
                    continue
                node = dag.nodes[label]
                node_args = node.ns_node.method.args
                # The DagContext parameter is not a user-supplied input.
                if node.ns_node.expects_dag_context():
                    node_args = node_args[1:]
                node_kwargs = {a.name: kwargs[a.name] for a in node_args if a.name in kwargs}
                if node_kwargs:
                    source_inputs[label] = node_kwargs
            return await runner.run(source_inputs=source_inputs, dag_nsref=nsref)

        method = Method(dag_wrapper)
        node = NamespaceNode(method=method, nsref=nsref, namespace=self, tags=tags, config=config)
        self._nodes[nsref] = node
        return node

    def get(self, nsref: str) -> NamespaceNode:
        """Retrieve a node by nsref. Raises `KeyError` if not found."""
        if nsref in self._nodes:
            return self._nodes[nsref]
        raise KeyError(f"'{nsref}' not found in namespace")

    def register_all(
        self,
        *cc: Callable[..., Any],
        decorate: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
    ) -> list[NamespaceNode]:
        """Bulk register callables using derived paths; returns the nodes in argument order."""
        return [self.register(c, decorate=decorate, tags=tags) for c in cc]

    def _all_leaves(self) -> list[NamespaceNode]:
        """Return all registered nodes."""
        return list(self._nodes.values())

    def query(self, expr: str) -> list[NamespaceNode]:
        """
        Return all nodes matching a tag expression. Supports `&` (AND),
        `|` (OR), `~` (NOT) with standard precedence (`~` > `&` > `|`).
        """
        tokens = _parse_tag_expr(expr)
        return [node for node in self._all_leaves() if _eval_tag_expr(tokens, node.tags)]

    def get_trigger(self, name: str) -> tuple[NamespaceNode, TriggerConfig]:
        """
        Find a trigger by name across all nodes. Returns (node, trigger_config)
        for the first match; raises `KeyError` if no node has such a trigger.
        """
        for node in self._nodes.values():
            for tc in node.config.triggers:
                if tc.name == name:
                    return node, tc
        raise KeyError(f"Trigger '{name}' not found")

    def to_dict(self) -> list[dict[str, Any]]:
        """Serialize all node configs to a list of dicts (YAML/JSON-ready)."""
        return [node.config.model_dump(exclude_none=True) for node in self._all_leaves()]

    @classmethod
    def from_dict(cls, entries: list[dict[str, Any]]) -> Namespace:
        """
        Build a Namespace from a list of serialized node configs.

        Entries with a `gref` are resolved to the referenced object; entries
        without one are registered with a no-op placeholder callable.
        """
        ns = cls()
        for entry in entries:
            config = NsNodeConfig.model_validate(entry)
            if config.gref is not None:
                gref = GlobalRef(config.gref)
                instance = gref.get_instance()
                ns.register(instance, nsref=config.nsref, config=config)
            else:
                ns.register(lambda: None, nsref=config.nsref, config=config)
        return ns

    def __getattr__(self, name: str) -> Any:
        """Return the first registered node whose nsref leaf equals `name`."""
        # Avoid recursion for internal attributes.
        if name == "_nodes":
            raise AttributeError(name)
        # Look up by name as a leaf suffix (for simple nsrefs like "t:fetch" -> "fetch")
        for nsref, node in self._nodes.items():
            _, leaf = _parse_nsref(nsref)
            if leaf == name:
                return node
        raise AttributeError(f"'{name}' not found in namespace")
|
register(c, nsref=None, decorate=None, tags=None, config=None)
Register a callable. If nsref is None, derive from the callable's
module and name. If the callable is marked with @dag_factory,
call it and register the returned DAG with a __ suffix. If
decorate is provided, wrap the callable for invocation (metadata
is still extracted from the original). Optional tags are stored
on the node for querying via query().
Source code in src/lythonic/compose/namespace.py
def register(
    self,
    c: Callable[..., Any] | str | Dag,
    nsref: str | None = None,
    decorate: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None,
    tags: frozenset[str] | set[str] | list[str] | None = None,
    config: NsNodeConfig | None = None,
) -> NamespaceNode:
    """
    Register a callable and return its `NamespaceNode`.

    If `nsref` is `None`, it is derived from the callable's module and
    name. A `Dag` instance is delegated to `_register_dag`. If the callable
    is marked with `@dag_factory`, it is called and the returned DAG is
    registered with a `__` suffix. If `decorate` is provided, it wraps the
    callable for invocation (metadata is still extracted from the
    original). Optional `tags` are stored on the node for querying via
    `query()`.

    Raises `TypeError` if a `@dag_factory` does not return a `Dag`, and
    `ValueError` if `nsref` is already registered.
    """
    if isinstance(c, Dag):
        return self._register_dag(c, nsref, tags=tags, config=config)

    # Handle @dag_factory decorated callables
    if callable(c) and getattr(c, "_is_dag_factory", False):
        gref = GlobalRef(c)
        factory_nsref = nsref or f"{gref.module}:{gref.name}__"
        dag = c()
        if not isinstance(dag, Dag):
            raise TypeError(f"@dag_factory {gref} must return a Dag, got {type(dag).__name__}")
        return self._register_dag(dag, factory_nsref, tags=tags, config=config)

    gref = GlobalRef(c)
    if nsref is None:
        nsref = f"{gref.module}:{gref.name}"
    method = Method(gref)

    # Reject duplicates before calling `decorate`, so a failed registration
    # never triggers the decorator's side effects.
    if nsref in self._nodes:
        raise ValueError(f"'{nsref}' already exists in namespace")
    decorated = decorate(method.o) if decorate else None

    node = NamespaceNode(
        method=method,
        nsref=nsref,
        namespace=self,
        decorated=decorated,
        tags=tags,
        config=config,
    )
    self._nodes[nsref] = node
    return node
|
get(nsref)
Retrieve a node by nsref. Raises KeyError if not found.
Source code in src/lythonic/compose/namespace.py
def get(self, nsref: str) -> NamespaceNode:
    """Look up the node stored under `nsref`; raise `KeyError` when absent."""
    if nsref in self._nodes:
        return self._nodes[nsref]
    raise KeyError(f"'{nsref}' not found in namespace")
|
register_all(*cc, decorate=None, tags=None)
Bulk register callables using derived paths.
Source code in src/lythonic/compose/namespace.py
def register_all(
    self,
    *cc: Callable[..., Any],
    decorate: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None,
    tags: frozenset[str] | set[str] | list[str] | None = None,
) -> list[NamespaceNode]:
    """
    Register every callable in `cc` under its derived path.

    The same `decorate` and `tags` are forwarded to each `register()` call.
    Returns the created nodes in argument order.
    """
    registered: list[NamespaceNode] = []
    for item in cc:
        registered.append(self.register(item, decorate=decorate, tags=tags))
    return registered
|
query(expr)
Return all nodes matching a tag expression. Supports & (AND),
| (OR), ~ (NOT) with standard precedence (~ > & > |).
Source code in src/lythonic/compose/namespace.py
def query(self, expr: str) -> list[NamespaceNode]:
    """
    Select the nodes whose tags satisfy `expr`.

    The expression supports `&` (AND), `|` (OR) and `~` (NOT) with
    standard precedence (`~` > `&` > `|`). It is parsed once and then
    evaluated against each registered node's tags.
    """
    parsed = _parse_tag_expr(expr)
    matches: list[NamespaceNode] = []
    for candidate in self._all_leaves():
        if _eval_tag_expr(parsed, candidate.tags):
            matches.append(candidate)
    return matches
|
get_trigger(name)
Find a trigger by name across all nodes. Returns (node, trigger_config).
Source code in src/lythonic/compose/namespace.py
def get_trigger(self, name: str) -> tuple[NamespaceNode, TriggerConfig]:
    """
    Locate the trigger called `name` on any registered node.

    Returns `(node, trigger_config)` for the first match in registration
    order; raises `KeyError` when no node declares such a trigger.
    """
    for owner in self._nodes.values():
        hit = next((tc for tc in owner.config.triggers if tc.name == name), None)
        if hit is not None:
            return owner, hit
    raise KeyError(f"Trigger '{name}' not found")
|
to_dict()
Serialize all node configs to a list of dicts (YAML/JSON-ready).
Source code in src/lythonic/compose/namespace.py
def to_dict(self) -> list[dict[str, Any]]:
    """Dump each node's config (omitting `None` fields) into a YAML/JSON-ready list."""
    dumped: list[dict[str, Any]] = []
    for leaf in self._all_leaves():
        dumped.append(leaf.config.model_dump(exclude_none=True))
    return dumped
|
from_dict(entries)
classmethod
Build a Namespace from a list of serialized node configs.
Source code in src/lythonic/compose/namespace.py
@classmethod
def from_dict(cls, entries: list[dict[str, Any]]) -> Namespace:
    """
    Rebuild a Namespace from serialized node configs.

    Each entry is validated into an `NsNodeConfig`. Entries without a
    `gref` are registered with a no-op placeholder callable; all others
    are resolved through `GlobalRef` and registered under the config's
    nsref.
    """
    rebuilt = cls()
    for raw in entries:
        cfg = NsNodeConfig.model_validate(raw)
        if config_gref_missing := cfg.gref is None:
            rebuilt.register(lambda: None, nsref=cfg.nsref, config=cfg)
            continue
        target = GlobalRef(cfg.gref).get_instance()
        rebuilt.register(target, nsref=cfg.nsref, config=cfg)
    return rebuilt
|
NamespaceNode
Wraps a Method with namespace identity. Callable -- delegates to the
decorated callable if present, otherwise to method.o.
Source code in src/lythonic/compose/namespace.py
class NamespaceNode:
    """
    Wraps a `Method` with namespace identity. Callable -- delegates to the
    decorated callable if present, otherwise to the wrapped `method`.
    """

    method: Method
    namespace: Namespace
    config: NsNodeConfig
    _decorated: Callable[..., Any] | None

    def __init__(
        self,
        method: Method,
        nsref: str,
        namespace: Namespace,
        decorated: Callable[..., Any] | None = None,
        tags: frozenset[str] | set[str] | list[str] | None = None,
        config: NsNodeConfig | None = None,
    ) -> None:
        """
        Build a node for `method` owned by `namespace`.

        When `config` is given it is used as-is; `nsref` and `tags` then
        only pass through validation and are not stored. Otherwise a fresh
        `NsNodeConfig` is created from `nsref`, the method's gref and the
        sorted validated tags.
        """
        self.method = method
        self.namespace = namespace
        self._decorated = decorated
        self.metadata: dict[str, Any] = {}
        # Tags are validated even when an explicit config overrides them.
        validated_tags = _validate_tags(tags)
        self.config = config or NsNodeConfig(
            nsref=nsref,
            gref=method.gref if method.gref else None,
            tags=sorted(validated_tags) if validated_tags else None,
        )

    @property
    def nsref(self) -> str:
        """The nsref recorded in this node's config."""
        return self.config.nsref

    @property
    def tags(self) -> frozenset[str]:
        """Tags from the config as a frozenset (empty when none are set)."""
        return frozenset(self.config.tags) if self.config.tags else frozenset()

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Invoke the decorated callable if one was supplied, else the wrapped method."""
        if self._decorated is not None:
            return self._decorated(*args, **kwargs)
        return self.method(*args, **kwargs)

    def expects_dag_context(self) -> bool:
        """True if first parameter is `DagContext` or a subclass."""
        return self.dag_context_type() is not None

    def dag_context_type(self) -> type[DagContext] | None:
        """Return the `DagContext` subclass expected, or `None`."""
        resolved = _resolve_first_param_type(self.method.o)
        if resolved is None:
            return None
        # `issubclass` raises TypeError when its first argument is not a
        # class (e.g. a parameterized generic); such an annotation is simply
        # not a DagContext. NOTE(review): confirm whether
        # `_resolve_first_param_type` can return non-class values.
        try:
            is_context = issubclass(resolved, DagContext)
        except TypeError:
            is_context = False
        return resolved if is_context else None

    def __repr__(self) -> str:  # pyright: ignore[reportImplicitOverride]
        return f"NamespaceNode(nsref={self.nsref!r})"
|
expects_dag_context()
True if first parameter is DagContext or a subclass.
Source code in src/lythonic/compose/namespace.py
def expects_dag_context(self) -> bool:
    """Report whether the wrapped callable takes a `DagContext` (or subclass) first."""
    context_type = self.dag_context_type()
    return context_type is not None
|
dag_context_type()
Return the DagContext subclass expected, or None.
Source code in src/lythonic/compose/namespace.py
def dag_context_type(self) -> type[DagContext] | None:
    """Return the `DagContext` subclass expected, or `None`."""
    resolved = _resolve_first_param_type(self.method.o)
    if resolved is None:
        return None
    # `issubclass` raises TypeError when its first argument is not a class
    # (e.g. a parameterized generic); such an annotation is simply not a
    # DagContext. NOTE(review): confirm whether `_resolve_first_param_type`
    # can return non-class values.
    try:
        is_context = issubclass(resolved, DagContext)
    except TypeError:
        is_context = False
    return resolved if is_context else None
|
DagContext
Bases: BaseModel
Base context injected into DAG-participating callables.
Subclass to add domain-specific fields.
Source code in src/lythonic/compose/namespace.py
class DagContext(BaseModel):
    """
    Base context injected into DAG-participating callables.
    Subclass to add domain-specific fields.

    A callable opts in by declaring `DagContext` (or a subclass) as the
    type of its first parameter; see `NamespaceNode.dag_context_type()`.
    """

    # presumably the nsref the running DAG is registered under -- confirm against DagRunner
    dag_nsref: str
    # presumably the label of the DAG node being executed -- confirm against DagRunner
    node_label: str
    # presumably an identifier for the current run -- confirm against DagRunner
    run_id: str
|
Dag
Directed acyclic graph of DagNodes with type-based validation.
Use node() to add nodes and >> to declare edges.
Source code in src/lythonic/compose/namespace.py
class Dag:
    """
    Directed acyclic graph of `DagNode`s with type-based validation.
    Use `node()` to add nodes and `>>` to declare edges.

    Nodes are stored by label, edges as label pairs. Each Dag owns a
    `Namespace`; `parent_namespace` is set when the Dag itself is
    registered in another namespace. Used as a context manager, the graph
    is validated on a clean exit.
    """

    nodes: dict[str, DagNode]
    edges: list[DagEdge]
    namespace: Namespace

    def __init__(self) -> None:
        self.nodes = {}
        self.edges = []
        self.namespace = Namespace()
        self.parent_namespace: Namespace | None = None

    def resolve(self, nsref: str) -> NamespaceNode:
        """
        Resolve a callable by nsref. Tries `parent_namespace` first
        (if set), then falls back to the DAG's own namespace.
        Raises `KeyError` if neither contains `nsref`.
        """
        if self.parent_namespace is not None:
            try:
                return self.parent_namespace.get(nsref)
            except KeyError:
                pass
        return self.namespace.get(nsref)

    def node(
        self,
        source: NamespaceNode | Callable[..., Any] | Dag,
        label: str | None = None,
    ) -> DagNode:
        """
        Create a unique `DagNode` in this graph. If `label` is `None`, it
        is derived from the source's nsref leaf name. Raises `ValueError`
        if the label already exists.

        When `source` is a `Dag` or `@dag_factory`, creates a `CallNode`
        that runs the sub-DAG as a single step. For a factory the label is
        auto-derived from the function name if not provided; a plain `Dag`
        needs an explicit, non-empty label. Raises `TypeError` if a factory
        does not return a `Dag`.
        """
        # Expand @dag_factory to a Dag
        if callable(source) and getattr(source, "_is_dag_factory", False):
            if label is None:
                label = str(getattr(source, "__name__", ""))
            factory_result = source()  # pyright: ignore[reportCallIssue]
            if not isinstance(factory_result, Dag):
                raise TypeError(
                    f"@dag_factory must return a Dag, got {type(factory_result).__name__}"
                )
            source = factory_result

        if isinstance(source, Dag):
            # `not label` also rejects "" (e.g. a factory without a
            # `__name__`), matching the check in `map()`.
            if not label:
                raise ValueError("label is required when passing a Dag to node()")
            if label in self.nodes:
                raise ValueError(
                    f"Label '{label}' already exists in DAG. "
                    f"Use an explicit label for duplicate callables."
                )
            call_node = CallNode(sub_dag=source, label=label, dag=self)
            self.nodes[label] = call_node
            return call_node

        if isinstance(source, NamespaceNode):
            ns_node = source
        else:
            # Reuse an existing registration of the same callable, else
            # auto-register it in this DAG's own namespace.
            gref = GlobalRef(source)
            try:
                ns_node = self.namespace.get(str(gref))
            except KeyError:
                ns_node = self.namespace.register(source)

        if label is None:
            _, leaf = _parse_nsref(ns_node.nsref)
            label = leaf
        if label in self.nodes:
            raise ValueError(
                f"Label '{label}' already exists in DAG. "
                f"Use an explicit label for duplicate callables."
            )
        dag_node = DagNode(ns_node=ns_node, label=label, dag=self)
        self.nodes[label] = dag_node
        return dag_node

    def map(
        self,
        sub_dag: Dag | Callable[..., Any],
        label: str | None = None,
    ) -> MapNode:
        """
        Create a `MapNode` that runs `sub_dag` on each element of an
        upstream collection. The sub-DAG must have exactly one source
        and one sink. Accepts a `@dag_factory` function, in which case
        the label is auto-derived from the function name.

        Raises `TypeError` if a factory does not return a `Dag`, and
        `ValueError` if the label is missing, empty or already used.
        """
        # Expand @dag_factory to a Dag
        if callable(sub_dag) and getattr(sub_dag, "_is_dag_factory", False):
            if label is None:
                label = str(getattr(sub_dag, "__name__", ""))
            factory_result = sub_dag()  # pyright: ignore[reportCallIssue]
            if not isinstance(factory_result, Dag):
                raise TypeError(
                    f"@dag_factory must return a Dag, got {type(factory_result).__name__}"
                )
            sub_dag = factory_result

        if not label:
            raise ValueError("label is required for map()")
        if label in self.nodes:
            raise ValueError(
                f"Label '{label}' already exists in DAG. Use a unique label for map nodes."
            )
        map_node = MapNode(
            sub_dag=sub_dag,  # pyright: ignore[reportArgumentType]
            label=label,
            dag=self,
        )
        self.nodes[label] = map_node
        return map_node

    def add_edge(self, upstream: DagNode, downstream: DagNode) -> DagEdge:
        """Register a directed edge between two nodes (stored by label)."""
        edge = DagEdge(upstream=upstream.label, downstream=downstream.label)
        self.edges.append(edge)
        return edge

    def validate(self) -> None:
        """Check acyclicity and type compatibility between connected nodes."""
        self._check_acyclicity()
        self._check_type_compatibility()

    def _check_acyclicity(self) -> None:
        """Kahn's algorithm for topological sort -- raises `ValueError` if a cycle is detected."""
        from collections import deque

        in_degree: dict[str, int] = {label: 0 for label in self.nodes}
        adj: dict[str, list[str]] = {label: [] for label in self.nodes}
        for edge in self.edges:
            adj[edge.upstream].append(edge.downstream)
            in_degree[edge.downstream] += 1

        # deque gives O(1) pops from the front; list.pop(0) is O(n).
        queue = deque(label for label, deg in in_degree.items() if deg == 0)
        visited = 0
        while queue:
            current = queue.popleft()
            visited += 1
            for neighbor in adj[current]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        # Nodes on a cycle never reach in-degree zero, so they stay unvisited.
        if visited != len(self.nodes):
            raise ValueError("DAG contains a cycle")

    def _check_type_compatibility(self) -> None:
        """Warn-level check: upstream return type should match a downstream input type."""
        for edge in self.edges:
            upstream_node = self.nodes[edge.upstream]
            downstream_node = self.nodes[edge.downstream]

            upstream_return = upstream_node.ns_node.method.return_annotation
            if upstream_return is None or upstream_return is _inspect.Parameter.empty:
                continue

            downstream_args = downstream_node.ns_node.method.args
            # The DagContext parameter is injected, not fed by upstream.
            if downstream_node.ns_node.expects_dag_context() and len(downstream_args) > 0:
                downstream_args = downstream_args[1:]
            if not downstream_args:
                continue

            downstream_types = {
                arg.annotation for arg in downstream_args if arg.annotation is not None
            }
            # Also accept list[X] params when upstream returns X (fan-in)
            fan_in_types: set[str] = set()
            for dt in downstream_types:
                if isinstance(dt, str) and dt.startswith("list[") and dt.endswith("]"):
                    fan_in_types.add(dt[5:-1])

            upstream_str = str(upstream_return)
            if (
                downstream_types
                and upstream_return not in downstream_types
                and upstream_str not in fan_in_types
            ):
                logging.getLogger(__name__).warning(
                    "Type mismatch on edge %s -> %s: upstream returns %s, downstream accepts %s",
                    edge.upstream,
                    edge.downstream,
                    upstream_return,
                    downstream_types,
                )

    def topological_order(self) -> list[DagNode]:
        """
        Return nodes in topological order. Ready nodes are seeded and
        released in label order so the result is deterministic.
        Nodes that are part of a cycle are omitted from the result.
        """
        from collections import deque

        in_degree: dict[str, int] = {label: 0 for label in self.nodes}
        adj: dict[str, list[str]] = {label: [] for label in self.nodes}
        for edge in self.edges:
            adj[edge.upstream].append(edge.downstream)
            in_degree[edge.downstream] += 1

        # deque gives O(1) pops from the front; list.pop(0) is O(n).
        queue = deque(sorted(label for label, deg in in_degree.items() if deg == 0))
        result: list[DagNode] = []
        while queue:
            current = queue.popleft()
            result.append(self.nodes[current])
            for neighbor in sorted(adj[current]):
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)
        return result

    def sources(self) -> list[DagNode]:
        """Return nodes with no upstream edges."""
        has_upstream = {edge.downstream for edge in self.edges}
        return [self.nodes[label] for label in self.nodes if label not in has_upstream]

    def sinks(self) -> list[DagNode]:
        """Return nodes with no downstream edges."""
        has_downstream = {edge.upstream for edge in self.edges}
        return [self.nodes[label] for label in self.nodes if label not in has_downstream]

    async def __call__(self, **kwargs: Any) -> DagRunResult:
        """
        Run the DAG directly using DagRunner's default provenance
        (NullProvenance). Kwargs are matched to source node parameters by
        name; a kwarg named after a source label whose value is a dict is
        used as that node's complete inputs.
        """
        from lythonic.compose.dag_runner import (  # pyright: ignore[reportImportCycles]
            DagRunner,
        )

        source_inputs: dict[str, dict[str, Any]] = {}
        for node in self.sources():
            if node.label in kwargs and isinstance(kwargs[node.label], dict):
                source_inputs[node.label] = kwargs[node.label]
                continue
            node_args = node.ns_node.method.args
            # The DagContext parameter is not a user-supplied input.
            if node.ns_node.expects_dag_context():
                node_args = node_args[1:]
            node_kwargs = {a.name: kwargs[a.name] for a in node_args if a.name in kwargs}
            if node_kwargs:
                source_inputs[node.label] = node_kwargs

        runner = DagRunner(self)
        return await runner.run(source_inputs=source_inputs)

    def __enter__(self) -> Dag:
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any
    ) -> None:
        # Validate only on a clean exit so a validation error never masks
        # the exception that is already propagating.
        if exc_type is None:
            self.validate()
|
resolve(nsref)
Resolve a callable by nsref. Tries parent_namespace first
(if set), then falls back to the DAG's own namespace.
Source code in src/lythonic/compose/namespace.py
def resolve(self, nsref: str) -> NamespaceNode:
    """
    Find the node registered under `nsref`.

    The parent namespace (when set) takes priority; a miss there falls
    through to the DAG's own namespace, whose `KeyError` propagates.
    """
    parent = self.parent_namespace
    if parent is not None:
        try:
            hit = parent.get(nsref)
        except KeyError:
            pass  # not in the parent; try the DAG's own namespace below
        else:
            return hit
    return self.namespace.get(nsref)
|
node(source, label=None)
Create a unique DagNode in this graph. If label is None, it is
derived from the source's nsref leaf name. Raises ValueError
if the label already exists.
When source is a Dag or @dag_factory, creates a CallNode
that runs the sub-DAG as a single step. For a @dag_factory the label is
auto-derived from the factory function name if not provided; a plain Dag requires an explicit label.
Source code in src/lythonic/compose/namespace.py
def node(
    self,
    source: NamespaceNode | Callable[..., Any] | Dag,
    label: str | None = None,
) -> DagNode:
    """
    Create a unique `DagNode` in this graph. If `label` is `None`, it is
    derived from the source's nsref leaf name. Raises `ValueError` if the
    label already exists.

    When `source` is a `Dag` or `@dag_factory`, creates a `CallNode` that
    runs the sub-DAG as a single step. For a factory the label is
    auto-derived from the function name if not provided; a plain `Dag`
    needs an explicit, non-empty label. Raises `TypeError` if a factory
    does not return a `Dag`.
    """
    # Expand @dag_factory to a Dag
    if callable(source) and getattr(source, "_is_dag_factory", False):
        if label is None:
            label = str(getattr(source, "__name__", ""))
        factory_result = source()  # pyright: ignore[reportCallIssue]
        if not isinstance(factory_result, Dag):
            raise TypeError(
                f"@dag_factory must return a Dag, got {type(factory_result).__name__}"
            )
        source = factory_result

    if isinstance(source, Dag):
        # `not label` also rejects "" (e.g. a factory without a `__name__`),
        # matching the check in `map()`.
        if not label:
            raise ValueError("label is required when passing a Dag to node()")
        if label in self.nodes:
            raise ValueError(
                f"Label '{label}' already exists in DAG. "
                f"Use an explicit label for duplicate callables."
            )
        call_node = CallNode(sub_dag=source, label=label, dag=self)
        self.nodes[label] = call_node
        return call_node

    if isinstance(source, NamespaceNode):
        ns_node = source
    else:
        # Reuse an existing registration of the same callable, else
        # auto-register it in this DAG's own namespace.
        gref = GlobalRef(source)
        try:
            ns_node = self.namespace.get(str(gref))
        except KeyError:
            ns_node = self.namespace.register(source)

    if label is None:
        _, leaf = _parse_nsref(ns_node.nsref)
        label = leaf
    if label in self.nodes:
        raise ValueError(
            f"Label '{label}' already exists in DAG. "
            f"Use an explicit label for duplicate callables."
        )
    dag_node = DagNode(ns_node=ns_node, label=label, dag=self)
    self.nodes[label] = dag_node
    return dag_node
|
map(sub_dag, label=None)
Create a MapNode that runs sub_dag on each element of an
upstream collection. The sub-DAG must have exactly one source
and one sink. Accepts a @dag_factory function, in which case
the label is auto-derived from the function name.
Source code in src/lythonic/compose/namespace.py
def map(
    self,
    sub_dag: Dag | Callable[..., Any],
    label: str | None = None,
) -> MapNode:
    """
    Add a `MapNode` that applies `sub_dag` to every element of an upstream
    collection.

    `sub_dag` may be a `Dag` or a `@dag_factory` function; a factory is
    called to obtain the Dag and, when `label` is `None`, its function
    name becomes the label. The sub-DAG must have exactly one source and
    one sink.

    Raises `TypeError` if a factory does not return a `Dag`, and
    `ValueError` if the label is missing, empty or already used.
    """
    is_factory = callable(sub_dag) and getattr(sub_dag, "_is_dag_factory", False)
    if is_factory:
        if label is None:
            label = str(getattr(sub_dag, "__name__", ""))
        built = sub_dag()  # pyright: ignore[reportCallIssue]
        if not isinstance(built, Dag):
            raise TypeError(f"@dag_factory must return a Dag, got {type(built).__name__}")
        sub_dag = built

    if not label:
        raise ValueError("label is required for map()")
    if label in self.nodes:
        raise ValueError(
            f"Label '{label}' already exists in DAG. Use a unique label for map nodes."
        )

    created = MapNode(
        sub_dag=sub_dag,  # pyright: ignore[reportArgumentType]
        label=label,
        dag=self,
    )
    self.nodes[label] = created
    return created
|
add_edge(upstream, downstream)
Register a directed edge between two nodes.
Source code in src/lythonic/compose/namespace.py
def add_edge(self, upstream: DagNode, downstream: DagNode) -> DagEdge:
    """Record a directed edge from `upstream` to `downstream` (by label) and return it."""
    new_edge = DagEdge(
        upstream=upstream.label,
        downstream=downstream.label,
    )
    self.edges.append(new_edge)
    return new_edge
|
validate()
Check acyclicity and type compatibility between connected nodes.
Source code in src/lythonic/compose/namespace.py
def validate(self) -> None:
    """Run the acyclicity check, then the type-compatibility check."""
    for check in (self._check_acyclicity, self._check_type_compatibility):
        check()
|
topological_order()
Return nodes in topological order. Ready nodes are processed in label order, so the result is deterministic.
Source code in src/lythonic/compose/namespace.py
def topological_order(self) -> list[DagNode]:
    """
    Return nodes in topological order. Ready nodes are seeded and released
    in label order so the result is deterministic.
    Nodes that are part of a cycle are omitted from the result.
    """
    from collections import deque

    in_degree: dict[str, int] = {label: 0 for label in self.nodes}
    adj: dict[str, list[str]] = {label: [] for label in self.nodes}
    for edge in self.edges:
        adj[edge.upstream].append(edge.downstream)
        in_degree[edge.downstream] += 1

    # deque gives O(1) pops from the front; list.pop(0) is O(n).
    queue = deque(sorted(label for label, deg in in_degree.items() if deg == 0))
    result: list[DagNode] = []
    while queue:
        current = queue.popleft()
        result.append(self.nodes[current])
        for neighbor in sorted(adj[current]):
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    return result
|
sources()
Return nodes with no upstream edges.
Source code in src/lythonic/compose/namespace.py
def sources(self) -> list[DagNode]:
    """List the nodes that no edge points to, in insertion order."""
    fed_labels = set()
    for edge in self.edges:
        fed_labels.add(edge.downstream)
    return [node for label, node in self.nodes.items() if label not in fed_labels]
|
sinks()
Return nodes with no downstream edges.
Source code in src/lythonic/compose/namespace.py
def sinks(self) -> list[DagNode]:
    """List the nodes that no edge starts from, in insertion order."""
    feeding_labels = set()
    for edge in self.edges:
        feeding_labels.add(edge.upstream)
    return [node for label, node in self.nodes.items() if label not in feeding_labels]
|
__call__(**kwargs)
async
Run the DAG directly using DagRunner's default provenance (NullProvenance). Kwargs are matched
to source node parameters by name.
Source code in src/lythonic/compose/namespace.py
async def __call__(self, **kwargs: Any) -> DagRunResult:
    """
    Execute this DAG through a fresh `DagRunner` built with its defaults.

    For each source node: if a kwarg is named after the node's label and
    holds a dict, that dict becomes the node's inputs; otherwise kwargs
    whose names match the node's parameters are collected. Sources with no
    matching kwargs get no entry.
    """
    from lythonic.compose.dag_runner import (  # pyright: ignore[reportImportCycles]
        DagRunner,
    )

    inputs_by_label: dict[str, dict[str, Any]] = {}
    for src in self.sources():
        direct = kwargs.get(src.label)
        if isinstance(direct, dict):
            inputs_by_label[src.label] = direct
            continue
        params = src.ns_node.method.args
        # The DagContext parameter is not a user-supplied input.
        if src.ns_node.expects_dag_context():
            params = params[1:]
        matched = {p.name: kwargs[p.name] for p in params if p.name in kwargs}
        if matched:
            inputs_by_label[src.label] = matched

    return await DagRunner(self).run(source_inputs=inputs_by_label)
|
DagNode
Unique node in a Dag, wrapping a NamespaceNode.
Same callable can appear multiple times with different labels.
Source code in src/lythonic/compose/namespace.py
class DagNode:
    """
    A labelled occurrence of a `NamespaceNode` inside one `Dag`.

    Labels are what make nodes unique, so one callable can be placed in
    the same graph several times under different labels.
    """

    ns_node: NamespaceNode
    label: str
    dag: Dag

    def __init__(self, ns_node: NamespaceNode, label: str, dag: Dag) -> None:
        self.ns_node, self.label, self.dag = ns_node, label, dag

    def __rshift__(self, other: DagNode) -> DagNode:
        """Add an edge self -> other on the owning DAG; return `other` so `a >> b >> c` chains."""
        owner = self.dag
        owner.add_edge(self, other)
        return other
|
__rshift__(other)
Register edge from self to other. Returns downstream for chaining.
Source code in src/lythonic/compose/namespace.py
| def __rshift__(self, other: DagNode) -> DagNode:
"""Register edge from self to other. Returns downstream for chaining."""
self.dag.add_edge(self, other)
return other
|
DagEdge
Bases: BaseModel
Edge between two DagNodes in a Dag.
Source code in src/lythonic/compose/namespace.py
class DagEdge(BaseModel):
    """Edge between two `DagNode`s in a `Dag`."""

    # Label of the node the edge starts from (`Dag.add_edge` passes `upstream.label`).
    upstream: str
    # Label of the node the edge points to (`Dag.add_edge` passes `downstream.label`).
    downstream: str
|
CompositeDagNode
Bases: DagNode
Base class for DAG nodes that contain a sub-DAG. Validates the
sub-DAG has exactly one source and one sink.
Source code in src/lythonic/compose/namespace.py
class CompositeDagNode(DagNode):
    """
    Common base for nodes that embed a whole sub-DAG.

    Construction fails with `ValueError` unless the sub-DAG has exactly
    one source and exactly one sink. The node carries a placeholder
    `NamespaceNode` (a no-op callable under `__composite__:<label>`), which
    is not registered in any namespace.
    """

    sub_dag: Dag

    def __init__(self, sub_dag: Dag, label: str, dag: Dag) -> None:
        source_count = len(sub_dag.sources())
        sink_count = len(sub_dag.sinks())
        if source_count != 1:
            raise ValueError(f"Sub-DAG must have exactly one source, found {source_count}")
        if sink_count != 1:
            raise ValueError(f"Sub-DAG must have exactly one sink, found {sink_count}")

        # DagNode requires an ns_node; composites have no callable of their own.
        stand_in = NamespaceNode(
            method=Method(lambda: None),
            nsref=f"__composite__:{label}",
            namespace=dag.namespace,
        )
        super().__init__(ns_node=stand_in, label=label, dag=dag)
        self.sub_dag = sub_dag
|
MapNode
Bases: CompositeDagNode
Runs a sub-DAG on each element of a collection.
Source code in src/lythonic/compose/namespace.py
class MapNode(CompositeDagNode):
    """
    Runs a sub-DAG on each element of a collection.

    Adds no behavior of its own; the subclass only marks how the sub-DAG
    should be run. Created by `Dag.map()`.
    """
|
CallNode
Bases: CompositeDagNode
Runs a sub-DAG once as a single step.
Source code in src/lythonic/compose/namespace.py
class CallNode(CompositeDagNode):
    """
    Runs a sub-DAG once as a single step.

    Adds no behavior of its own; the subclass only marks how the sub-DAG
    should be run. Created by `Dag.node()` when given a `Dag` or
    `@dag_factory`.
    """
|