v0.0.12

New: Composable DAGs

DAGs can now contain other DAGs as steps, with a node hierarchy for map-reduce and inline sub-DAG execution.

  • Dag.map(sub_dag, label) — run a sub-DAG on each element of a list or dict, concurrently via asyncio.gather. Results are collected back into the same collection type as the input.
  • CallNode / Dag.node(sub_dag, label) — run a sub-DAG once as a single step in the parent DAG.
  • CompositeDagNode — base class for MapNode and CallNode, validates one source and one sink in the sub-DAG.
  • Dag-Namespace refactoring — each Dag owns its own Namespace. dag.node(callable) auto-registers callables. Removed clone(), _merge_and_wire(), and DagNode >> Dag.
  • Dag.parent_namespace — set when a DAG is registered in a namespace. Dag.resolve(nsref) checks parent first, then own namespace.
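
The map behavior described above (concurrent execution over a list or dict, with results returned in the same collection type) can be sketched with the standard library alone. This is an illustration of the technique, not the library's implementation; `map_collection` and `square` are hypothetical names, and a plain coroutine stands in for the sub-DAG.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def map_collection(fn: Callable[[Any], Awaitable[Any]], items: Any) -> Any:
    """Apply an async callable to every element of a list or dict concurrently.

    Hypothetical helper: `fn` stands in for a sub-DAG run on one element.
    """
    if isinstance(items, dict):
        keys = list(items)
        # asyncio.gather preserves argument order, so values line up with keys.
        values = await asyncio.gather(*(fn(items[k]) for k in keys))
        return dict(zip(keys, values))
    if isinstance(items, list):
        return list(await asyncio.gather(*(fn(v) for v in items)))
    raise TypeError(f"expected list or dict, got {type(items).__name__}")


async def square(x: int) -> int:
    # Yield to the event loop once to mimic asynchronous work.
    await asyncio.sleep(0)
    return x * x


if __name__ == "__main__":
    print(asyncio.run(map_collection(square, [1, 2, 3])))
    print(asyncio.run(map_collection(square, {"a": 2, "b": 3})))
```

Because `asyncio.gather` returns results in argument order, rebuilding a dict from the original keys is safe even when elements finish out of order.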

New: Callable DAGs

  • await dag(**kwargs) — run a DAG directly with NullProvenance. Kwargs matched to source node parameters by name. Returns DagRunResult.
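
Matching keyword arguments to source node parameters by name could look roughly like the sketch below. It assumes each source node is a plain callable whose parameters can be read with `inspect.signature`; `match_kwargs`, `load`, and `configure` are hypothetical names, and the library's actual matching rules (for example, how unknown kwargs are handled) may differ.

```python
import inspect
from typing import Any, Callable, Dict


def match_kwargs(
    sources: Dict[str, Callable[..., Any]], kwargs: Dict[str, Any]
) -> Dict[str, Dict[str, Any]]:
    """Split kwargs across source callables by parameter name (hypothetical helper)."""
    unused = set(kwargs)
    bound: Dict[str, Dict[str, Any]] = {}
    for label, fn in sources.items():
        names = list(inspect.signature(fn).parameters)
        # Each source receives only the kwargs whose names match its parameters.
        bound[label] = {n: kwargs[n] for n in names if n in kwargs}
        unused -= set(names)
    if unused:
        # Assumption: a kwarg that matches no source parameter is an error.
        raise TypeError(f"unexpected keyword arguments: {sorted(unused)}")
    return bound


def load(path: str, limit: int = 10) -> str:
    return f"{path}:{limit}"


def configure(mode: str) -> str:
    return mode


if __name__ == "__main__":
    sources = {"load": load, "configure": configure}
    print(match_kwargs(sources, {"path": "data.csv", "mode": "fast"}))
```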

New: DAG Factory Convention

  • @dag_factory — marks a function as a DAG factory. When a factory is registered in a namespace, it is called and the resulting DAG is registered with a __ suffix. Dag.node() and Dag.map() also accept factories directly, auto-deriving the label from the function name.
  • GlobalRef __ suffix — GlobalRef("module:name__") falls back to calling module.name() if module.name__ doesn't exist.
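
The `__` suffix fallback can be illustrated with `importlib`. The sketch below is not GlobalRef itself: `resolve_ref`, `install_demo_module`, and the `demo_factories` module are hypothetical, and a dict stands in for the DAG that a real factory would return.

```python
import importlib
import sys
import types
from typing import Any


def resolve_ref(ref: str) -> Any:
    """Resolve "module:name"; for "name__", fall back to calling module.name()."""
    module_name, _, attr = ref.partition(":")
    module = importlib.import_module(module_name)
    if hasattr(module, attr):
        return getattr(module, attr)
    if attr.endswith("__"):
        factory = getattr(module, attr[:-2], None)
        if callable(factory):
            # The attribute with the suffix is missing, so build it from the factory.
            return factory()
    raise AttributeError(f"{module_name!r} has no attribute {attr!r}")


def install_demo_module() -> None:
    """Register a throwaway in-memory module so the example is self-contained."""
    module = types.ModuleType("demo_factories")

    def pipeline() -> dict:
        return {"kind": "dag", "name": "pipeline"}

    module.pipeline = pipeline
    sys.modules["demo_factories"] = module


if __name__ == "__main__":
    install_demo_module()
    print(resolve_ref("demo_factories:pipeline__"))
```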

New: Event-Driven Triggers (lythonic.compose.trigger)

  • TriggerDef — declarative trigger definitions (poll/push) registered via Namespace.register_trigger(). Purely metadata.
  • TriggerStore — SQLite-backed persistence for trigger activations and events (separate DB from DagProvenance).
  • TriggerManager — runtime coordinator: activate(), deactivate(), fire(name, payload) for push triggers, start()/stop() for background poll loop. Uses cron expressions via croniter.

New: DAG Execution Improvements

  • Sync executor — sync DAG nodes now run in the asyncio thread executor by default, so they no longer block the event loop. Use the @inline decorator to opt out.
  • Runtime resolution — DagRunner._call_node() resolves callables via Dag.resolve() at runtime, enabling cache wrappers registered in the parent namespace to override raw callables.
  • Edge traversal tracking — edge_traversals table in DagProvenance records which edges were traversed during execution.
  • Node type markers — node_type column (source/sink/internal/composite) on node_executions for DAG visualization.
  • Node run log context — ContextVar-based run_id and node_label available during node execution via logging.Filter.
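
Two of the items above combine naturally: a sync node offloaded to a worker thread still sees the ContextVar values set for it, so a `logging.Filter` can stamp each record with the run and node. The following is a standard-library sketch under assumed names (`run_id_var`, `node_label_var`, `NodeContextFilter`, `run_node`); the library's own variables and runner will differ.

```python
import asyncio
import contextvars
import logging

# Hypothetical context variables; the library's actual names may differ.
run_id_var = contextvars.ContextVar("run_id", default="-")
node_label_var = contextvars.ContextVar("node_label", default="-")


class NodeContextFilter(logging.Filter):
    """Copy the current run_id and node_label onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.run_id = run_id_var.get()
        record.node_label = node_label_var.get()
        return True


class ListHandler(logging.Handler):
    """Collect formatted log lines in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.lines = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(self.format(record))


def blocking_node(logger: logging.Logger, x: int) -> int:
    # A synchronous node; it runs in a worker thread, not on the event loop.
    logger.info("doubling %s", x)
    return x * 2


async def run_node(logger, run_id, label, fn, arg):
    # Each task has its own context copy, so these values do not leak across nodes.
    run_id_var.set(run_id)
    node_label_var.set(label)
    # asyncio.to_thread copies the current context into the worker thread.
    return await asyncio.to_thread(fn, logger, arg)


async def main():
    logger = logging.getLogger("dag_sketch")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = ListHandler()
    handler.setFormatter(logging.Formatter("[%(run_id)s/%(node_label)s] %(message)s"))
    handler.addFilter(NodeContextFilter())
    logger.handlers.clear()
    logger.addHandler(handler)
    results = await asyncio.gather(
        run_node(logger, "run-1", "double_a", blocking_node, 3),
        run_node(logger, "run-1", "double_b", blocking_node, 4),
    )
    return list(results), handler.lines


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The order of the log lines is not guaranteed because the two nodes run in separate threads; the returned results follow the order of the `gather` arguments.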

Changed

  • DagRunner constructor — takes provenance parameter instead of db_path. Defaults to NullProvenance.
  • register_cached_callable — nsref is now optional (last parameter).
  • CacheProhibitDirectCall — new ContextVar-based guard with a .require() static method, for methods that must be called through the cache wrapper.

Dependencies

  • Added croniter for cron expression parsing in triggers.

Documentation

  • Updated compose pipeline tutorial for new API patterns.
  • Added composable DAGs how-to guide.
  • Added scheduled triggers tutorial.
  • Updated module docstrings for namespace.py and dag_runner.py.
  • Added trigger reference page.