lythonic.compose.dag_provenance

SQLite-backed storage for DAG run state and node execution traces.

DagProvenance: SQLite-backed storage for DAG run state and node execution traces.

Records the full lifecycle of each DAG run: creation, node-by-node execution (inputs, outputs, timing, errors), and final status. Supports querying for restart and replay scenarios.

Pydantic Inspection Models

DagRun, NodeExecution, and EdgeTraversal are Pydantic models returned by the inspection methods inspect_run() and get_recent_runs(). All timestamps are timezone-aware UTC datetimes. DagRun.nodes is a dict keyed by node_label. Each NodeExecution carries an edges list and a sub_dags dict (non-None only for composite nodes, keyed by expansion key such as "chunks[0]" or "label[]"). Convenience methods (see the usage sketch after this list):

  • DagRun.latest_update() — most recent timestamp across all nodes/edges
  • DagRun.nodes_changed_since(dt) — nodes updated after a given datetime
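
A minimal usage sketch of the inspection API; the database path and run ID are hypothetical placeholders:

from datetime import datetime, timedelta, timezone
from pathlib import Path

from lythonic.compose.dag_provenance import DagProvenance

prov = DagProvenance(Path("runs/provenance.db"))  # hypothetical path

run = prov.inspect_run("run-2024-0001")           # hypothetical run ID
if run is not None:
    print(run.status, run.dag_nsref)
    # Most recent activity across the run, its nodes, and its edge traversals.
    print("last update:", run.latest_update())
    # Nodes whose start or finish time falls in the last five minutes.
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=5)
    for node in run.nodes_changed_since(cutoff):
        print(node.node_label, node.status)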

Parent Run Tracking

Sub-DAG runs link to their parent via parent_run_id. Use get_child_runs(parent_run_id) to recursively retrieve all descendant runs (children, grandchildren, etc.) using a recursive CTE.
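
For example (path and run ID hypothetical):

from pathlib import Path

from lythonic.compose.dag_provenance import DagProvenance

prov = DagProvenance(Path("runs/provenance.db"))  # hypothetical path

# All descendants of a top-level run, flattened by the recursive CTE.
for child in prov.get_child_runs("run-2024-0001"):  # hypothetical run ID
    print(child.run_id, child.parent_run_id, child.status, child.dag_nsref)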

Batch Operations

complete_node_with_edges() and fail_node_and_finish_run() batch multiple writes (node status + edge traversals, or node failure + run finish) into a single open_sqlite_db cycle to reduce lock contention.
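
A short sketch of both batch calls; identifiers and payloads are hypothetical:

from pathlib import Path

from lythonic.compose.dag_provenance import DagProvenance, safe_json_dumps

prov = DagProvenance(Path("runs/provenance.db"))  # hypothetical path
run_id = "run-2024-0001"                          # hypothetical run ID

# Success path: node output plus both outgoing edge traversals in one commit.
prov.complete_node_with_edges(
    run_id,
    "extract",
    output_json=safe_json_dumps({"rows": 42}),
    edges=[("extract", "transform"), ("extract", "audit")],
)

# Failure path: mark the node failed and finish the run as 'failed' in one commit.
prov.fail_node_and_finish_run(run_id, "transform", error="upstream schema mismatch")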

Serialization Helpers

json_default() and safe_json_dumps() handle Pydantic models and other non-JSON-serializable types when recording provenance data.
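
For instance, with a hypothetical Pydantic payload model:

from pydantic import BaseModel

from lythonic.compose.dag_provenance import safe_json_dumps

class ChunkResult(BaseModel):  # hypothetical payload model
    chunk_id: int
    text: str

payload = {"chunk": ChunkResult(chunk_id=0, text="hello"), "score": 0.92}

# Pydantic models are serialized via model_dump(); other unserializable values
# fall back to str(), and a total failure degrades to json.dumps(str(obj)).
print(safe_json_dumps(payload))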

Private _-prefixed methods take a cursor and don't commit — they're building blocks for batch operations. Public methods open the DB, batch writes, and commit in one cycle.

DagProvenance

SQLite-backed storage for DAG run state and node execution traces.

Public methods batch related writes into a single open_sqlite_db cycle.

Source code in src/lythonic/compose/dag_provenance.py
class DagProvenance:
    """
    SQLite-backed storage for DAG run state and node execution traces.

    Public methods batch related writes into a single `open_sqlite_db` cycle.
    """

    db_path: Path

    def __init__(self, db_path: Path) -> None:
        self.db_path = db_path
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()
            execute_sql(cursor, _DAG_RUNS_DDL)
            execute_sql(cursor, _NODE_EXECUTIONS_DDL)
            execute_sql(cursor, _EDGE_TRAVERSALS_DDL)
            conn.commit()

    def create_run(
        self,
        run_id: str,
        dag_nsref: str,
        source_inputs: dict[str, Any],
        parent_run_id: str | None = None,
    ) -> None:
        """Create a new run record with status 'running'."""
        with open_sqlite_db(self.db_path) as conn:
            _insert_run(conn.cursor(), run_id, dag_nsref, source_inputs, parent_run_id)
            conn.commit()

    def update_run_status(self, run_id: str, status: str) -> None:
        """Update the status of a run without setting finished_at."""
        with open_sqlite_db(self.db_path) as conn:
            _update_run_status(conn.cursor(), run_id, status)
            conn.commit()

    def finish_run(self, run_id: str, status: str, sink_outputs_json: str | None = None) -> None:
        """Set final status and finished_at timestamp."""
        with open_sqlite_db(self.db_path) as conn:
            _finish_run(conn.cursor(), run_id, status, sink_outputs_json)
            conn.commit()

    def record_node_start(
        self,
        run_id: str,
        node_label: str,
        input_json: str,
        is_source: bool = False,
        is_sink: bool = False,
    ) -> None:
        """Record that a node has started execution."""
        with open_sqlite_db(self.db_path) as conn:
            _insert_node_start(conn.cursor(), run_id, node_label, input_json, is_source, is_sink)
            conn.commit()

    def record_node_skipped(self, run_id: str, node_label: str, output_json: str) -> None:
        """Record a node as skipped with a copied output (used in replay)."""
        with open_sqlite_db(self.db_path) as conn:
            _insert_node_skipped(conn.cursor(), run_id, node_label, output_json)
            conn.commit()

    # Batch operations — multiple writes in one open/close cycle

    def complete_node_with_edges(
        self,
        run_id: str,
        node_label: str,
        output_json: str,
        edges: list[tuple[str, str]],
    ) -> None:
        """Record node completion and all outgoing edge traversals in one batch."""
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()
            _update_node_complete(cursor, run_id, node_label, output_json)
            for upstream, downstream in edges:
                _insert_edge_traversal(cursor, run_id, upstream, downstream)
            conn.commit()

    def fail_node_and_finish_run(
        self,
        run_id: str,
        node_label: str,
        error: str,
    ) -> None:
        """Record node failure and mark the run as failed in one batch."""
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()
            _update_node_failed(cursor, run_id, node_label, error)
            _finish_run(cursor, run_id, "failed")
            conn.commit()

    # Read operations

    def get_run(self, run_id: str) -> dict[str, Any] | None:
        """Get a run record by ID, or None if not found."""
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()
            execute_sql(
                cursor,
                "SELECT run_id, dag_nsref, parent_run_id, status, started_at, finished_at, "
                "source_inputs_json, sink_outputs_json FROM dag_runs WHERE run_id = ?",
                (run_id,),
            )
            row = cursor.fetchone()
            if row is None:
                return None
            return {
                "run_id": row[0],
                "dag_nsref": row[1],
                "parent_run_id": row[2],
                "status": row[3],
                "started_at": row[4],
                "finished_at": row[5],
                "source_inputs_json": row[6],
                "sink_outputs_json": row[7],
            }

    def get_node_executions(self, run_id: str) -> list[dict[str, Any]]:
        """Get all node execution records for a run."""
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()
            execute_sql(
                cursor,
                "SELECT run_id, node_label, status, is_source, is_sink, input_json, output_json, "
                "started_at, finished_at, error "
                "FROM node_executions WHERE run_id = ?",
                (run_id,),
            )
            return [
                {
                    "run_id": r[0],
                    "node_label": r[1],
                    "status": r[2],
                    "is_source": bool(r[3]),
                    "is_sink": bool(r[4]),
                    "input_json": r[5],
                    "output_json": r[6],
                    "started_at": r[7],
                    "finished_at": r[8],
                    "error": r[9],
                }
                for r in cursor.fetchall()
            ]

    def get_node_output(self, run_id: str, node_label: str) -> str | None:
        """Get the output JSON of a completed or skipped node, or None."""
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()
            execute_sql(
                cursor,
                "SELECT output_json FROM node_executions "
                "WHERE run_id = ? AND node_label = ? AND status IN ('completed', 'skipped')",
                (run_id, node_label),
            )
            row = cursor.fetchone()
            return row[0] if row else None

    def get_pending_nodes(self, run_id: str) -> list[str]:
        """Get labels of nodes that are not yet completed or skipped."""
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()
            execute_sql(
                cursor,
                "SELECT node_label FROM node_executions "
                "WHERE run_id = ? AND status IN ('pending', 'running', 'failed')",
                (run_id,),
            )
            return [row[0] for row in cursor.fetchall()]

    def get_edge_traversals(self, run_id: str) -> list[dict[str, Any]]:
        """Get all edge traversals for a run."""
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()
            execute_sql(
                cursor,
                "SELECT run_id, upstream_label, downstream_label, traversed_at "
                "FROM edge_traversals WHERE run_id = ? ORDER BY traversed_at",
                (run_id,),
            )
            return [
                {
                    "run_id": r[0],
                    "upstream_label": r[1],
                    "downstream_label": r[2],
                    "traversed_at": r[3],
                }
                for r in cursor.fetchall()
            ]

    # Inspection API

    def _load_dag_runs(
        self, cursor: sqlite3.Cursor, run_rows: list[tuple[Any, ...]]
    ) -> list[DagRun]:
        """
        Build DagRun models with nested nodes, edges, and sub-DAGs from run rows.

        JSON payload fields (`input_json`, `output_json`, `source_inputs_json`,
        `sink_outputs_json`) are NOT loaded here — `io` stays `None` on all
        returned models. Use `load_io()` to populate them on demand.
        """
        if not run_rows:
            return []

        run_ids = [r[0] for r in run_rows]
        placeholders = ",".join("?" * len(run_ids))

        # Load node executions (no JSON payload columns)
        execute_sql(
            cursor,
            f"SELECT run_id, node_label, status, is_source, is_sink, "
            f"started_at, finished_at, error "
            f"FROM node_executions WHERE run_id IN ({placeholders})",
            tuple(run_ids),
        )
        nodes_by_run: dict[str, dict[str, NodeExecution]] = {}
        for r in cursor.fetchall():
            node = NodeExecution(
                node_label=r[1],
                status=r[2],
                is_source=bool(r[3]),
                is_sink=bool(r[4]),
                started_at=_ts(r[5]),
                finished_at=_ts(r[6]),
                error=r[7],
            )
            nodes_by_run.setdefault(r[0], {})[r[1]] = node

        # Load edge traversals and attach to upstream nodes
        execute_sql(
            cursor,
            f"SELECT run_id, upstream_label, downstream_label, traversed_at "
            f"FROM edge_traversals WHERE run_id IN ({placeholders}) ORDER BY traversed_at",
            tuple(run_ids),
        )
        for r in cursor.fetchall():
            run_id, upstream, downstream, ts = r
            edge = EdgeTraversal(downstream_label=downstream, traversed_at=_ts(ts))  # pyright: ignore[reportArgumentType]
            run_nodes = nodes_by_run.get(run_id)
            if run_nodes and upstream in run_nodes:
                run_nodes[upstream].edges.append(edge)

        # Build DagRun models (no JSON payload columns)
        dag_runs: list[DagRun] = []
        runs_by_id: dict[str, DagRun] = {}
        for r in run_rows:
            dag_run = DagRun(
                run_id=r[0],
                dag_nsref=r[1],
                parent_run_id=r[2],
                status=r[3],
                started_at=_ts(r[4]),  # pyright: ignore[reportArgumentType]
                finished_at=_ts(r[5]),
                nodes=nodes_by_run.get(r[0], {}),
            )
            dag_runs.append(dag_run)
            runs_by_id[r[0]] = dag_run

        # Detect composite nodes from child runs across ALL loaded run_ids
        child_placeholders = ",".join("?" * len(run_ids))
        execute_sql(
            cursor,
            f"SELECT run_id, dag_nsref, parent_run_id, status, started_at, finished_at "
            f"FROM dag_runs WHERE parent_run_id IN ({child_placeholders})",
            tuple(run_ids),
        )
        child_rows = cursor.fetchall()
        if child_rows:
            child_runs = self._load_dag_runs(cursor, child_rows)
            for child in child_runs:
                parent_run = runs_by_id.get(child.parent_run_id or "")
                if parent_run is None:
                    continue
                # Extract expansion key from dag_nsref
                prefix = parent_run.dag_nsref + "/"
                if not child.dag_nsref.startswith(prefix):
                    continue
                expansion_key = child.dag_nsref[len(prefix) :]
                # Match key prefix to composite node label
                bracket_pos = expansion_key.find("[")
                node_label = expansion_key[:bracket_pos] if bracket_pos >= 0 else expansion_key
                node = parent_run.nodes.get(node_label)
                if node is not None:
                    if node.sub_dags is None:
                        node.sub_dags = {}
                    node.sub_dags[expansion_key] = child

        return dag_runs

    def _fetch_run_rows(
        self,
        cursor: sqlite3.Cursor,
        where: str,
        params: tuple[Any, ...],
    ) -> list[tuple[Any, ...]]:
        execute_sql(
            cursor,
            f"SELECT run_id, dag_nsref, parent_run_id, status, started_at, "
            f"finished_at FROM dag_runs {where}",
            params,
        )
        return cursor.fetchall()

    def inspect_run(self, run_id: str) -> DagRun | None:
        """Get a full DagRun model by ID, with nested nodes, edges, and sub-DAGs."""
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()
            rows = self._fetch_run_rows(cursor, "WHERE run_id = ?", (run_id,))
            runs = self._load_dag_runs(cursor, rows)
            return runs[0] if runs else None

    def load_io(
        self,
        dag_run: DagRun,
        node_labels: Sequence[str] | None = None,
    ) -> None:
        """
        Populate `io` on `dag_run` and its nodes in a single DB round-trip.

        `dag_run.io` gets `source_inputs_json` / `sink_outputs_json`.
        `node.io` gets `input_json` / `output_json` for each node, filtered
        by `node_labels` if provided (all nodes if `None`).

        Does NOT recurse into sub-DAGs.
        """
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()

            # Load run-level IO
            execute_sql(
                cursor,
                "SELECT source_inputs_json, sink_outputs_json FROM dag_runs WHERE run_id = ?",
                (dag_run.run_id,),
            )
            row = cursor.fetchone()
            if row:
                dag_run.io = IoPayload(input_json=row[0], output_json=row[1])

            # Load node-level IO
            labels = list(node_labels) if node_labels is not None else list(dag_run.nodes.keys())
            if labels:
                placeholders = ",".join("?" * len(labels))
                execute_sql(
                    cursor,
                    f"SELECT node_label, input_json, output_json FROM node_executions "
                    f"WHERE run_id = ? AND node_label IN ({placeholders})",
                    (dag_run.run_id, *labels),
                )
                for r in cursor.fetchall():
                    node = dag_run.nodes.get(r[0])
                    if node:
                        node.io = IoPayload(input_json=r[1], output_json=r[2])

    def get_recent_runs(self, limit: int = 20, status: str | None = None) -> list[DagRun]:
        """List runs with full node/edge data, ordered by started_at descending."""
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()
            if status:
                rows = self._fetch_run_rows(
                    cursor, "WHERE status = ? ORDER BY started_at DESC LIMIT ?", (status, limit)
                )
            else:
                rows = self._fetch_run_rows(cursor, "ORDER BY started_at DESC LIMIT ?", (limit,))
            return self._load_dag_runs(cursor, rows)

    def get_active_runs(self) -> list[DagRun]:
        """Get all currently running DAG executions."""
        return self.get_recent_runs(limit=100, status="running")

    def get_child_runs(self, parent_run_id: str) -> list[DagRun]:
        """Recursively get all descendant runs (children, grandchildren, etc.)."""
        with open_sqlite_db(self.db_path) as conn:
            cursor = conn.cursor()
            execute_sql(
                cursor,
                "WITH RECURSIVE descendants AS ( "
                "  SELECT run_id, dag_nsref, parent_run_id, status, started_at, finished_at "
                "  FROM dag_runs WHERE parent_run_id = ? "
                "  UNION ALL "
                "  SELECT d.run_id, d.dag_nsref, d.parent_run_id, d.status, d.started_at, "
                "    d.finished_at "
                "  FROM dag_runs d JOIN descendants p ON d.parent_run_id = p.run_id "
                ") SELECT * FROM descendants ORDER BY started_at",
                (parent_run_id,),
            )
            rows = cursor.fetchall()
            return self._load_dag_runs(cursor, rows)

create_run(run_id, dag_nsref, source_inputs, parent_run_id=None)

Create a new run record with status 'running'.

Source code in src/lythonic/compose/dag_provenance.py
def create_run(
    self,
    run_id: str,
    dag_nsref: str,
    source_inputs: dict[str, Any],
    parent_run_id: str | None = None,
) -> None:
    """Create a new run record with status 'running'."""
    with open_sqlite_db(self.db_path) as conn:
        _insert_run(conn.cursor(), run_id, dag_nsref, source_inputs, parent_run_id)
        conn.commit()
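
An illustrative write-side lifecycle a runner might drive, from run creation to finish; the path, run ID, labels, and payloads are hypothetical:

from pathlib import Path

from lythonic.compose.dag_provenance import DagProvenance, safe_json_dumps

prov = DagProvenance(Path("runs/provenance.db"))  # hypothetical path
run_id = "run-2024-0002"                          # hypothetical run ID

prov.create_run(run_id, "pipeline", source_inputs={"path": "data.csv"})
prov.record_node_start(
    run_id,
    "load",
    input_json=safe_json_dumps({"path": "data.csv"}),
    is_source=True,
)
prov.complete_node_with_edges(
    run_id,
    "load",
    safe_json_dumps({"rows": 10}),
    edges=[("load", "report")],
)
prov.finish_run(run_id, "completed", sink_outputs_json=safe_json_dumps({"report": "ok"}))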

update_run_status(run_id, status)

Update the status of a run without setting finished_at.

Source code in src/lythonic/compose/dag_provenance.py
def update_run_status(self, run_id: str, status: str) -> None:
    """Update the status of a run without setting finished_at."""
    with open_sqlite_db(self.db_path) as conn:
        _update_run_status(conn.cursor(), run_id, status)
        conn.commit()

finish_run(run_id, status, sink_outputs_json=None)

Set final status and finished_at timestamp.

Source code in src/lythonic/compose/dag_provenance.py
def finish_run(self, run_id: str, status: str, sink_outputs_json: str | None = None) -> None:
    """Set final status and finished_at timestamp."""
    with open_sqlite_db(self.db_path) as conn:
        _finish_run(conn.cursor(), run_id, status, sink_outputs_json)
        conn.commit()

record_node_start(run_id, node_label, input_json, is_source=False, is_sink=False)

Record that a node has started execution.

Source code in src/lythonic/compose/dag_provenance.py
def record_node_start(
    self,
    run_id: str,
    node_label: str,
    input_json: str,
    is_source: bool = False,
    is_sink: bool = False,
) -> None:
    """Record that a node has started execution."""
    with open_sqlite_db(self.db_path) as conn:
        _insert_node_start(conn.cursor(), run_id, node_label, input_json, is_source, is_sink)
        conn.commit()

record_node_skipped(run_id, node_label, output_json)

Record a node as skipped with a copied output (used in replay).

Source code in src/lythonic/compose/dag_provenance.py
def record_node_skipped(self, run_id: str, node_label: str, output_json: str) -> None:
    """Record a node as skipped with a copied output (used in replay)."""
    with open_sqlite_db(self.db_path) as conn:
        _insert_node_skipped(conn.cursor(), run_id, node_label, output_json)
        conn.commit()

complete_node_with_edges(run_id, node_label, output_json, edges)

Record node completion and all outgoing edge traversals in one batch.

Source code in src/lythonic/compose/dag_provenance.py
def complete_node_with_edges(
    self,
    run_id: str,
    node_label: str,
    output_json: str,
    edges: list[tuple[str, str]],
) -> None:
    """Record node completion and all outgoing edge traversals in one batch."""
    with open_sqlite_db(self.db_path) as conn:
        cursor = conn.cursor()
        _update_node_complete(cursor, run_id, node_label, output_json)
        for upstream, downstream in edges:
            _insert_edge_traversal(cursor, run_id, upstream, downstream)
        conn.commit()

fail_node_and_finish_run(run_id, node_label, error)

Record node failure and mark the run as failed in one batch.

Source code in src/lythonic/compose/dag_provenance.py
def fail_node_and_finish_run(
    self,
    run_id: str,
    node_label: str,
    error: str,
) -> None:
    """Record node failure and mark the run as failed in one batch."""
    with open_sqlite_db(self.db_path) as conn:
        cursor = conn.cursor()
        _update_node_failed(cursor, run_id, node_label, error)
        _finish_run(cursor, run_id, "failed")
        conn.commit()

get_run(run_id)

Get a run record by ID, or None if not found.

Source code in src/lythonic/compose/dag_provenance.py
def get_run(self, run_id: str) -> dict[str, Any] | None:
    """Get a run record by ID, or None if not found."""
    with open_sqlite_db(self.db_path) as conn:
        cursor = conn.cursor()
        execute_sql(
            cursor,
            "SELECT run_id, dag_nsref, parent_run_id, status, started_at, finished_at, "
            "source_inputs_json, sink_outputs_json FROM dag_runs WHERE run_id = ?",
            (run_id,),
        )
        row = cursor.fetchone()
        if row is None:
            return None
        return {
            "run_id": row[0],
            "dag_nsref": row[1],
            "parent_run_id": row[2],
            "status": row[3],
            "started_at": row[4],
            "finished_at": row[5],
            "source_inputs_json": row[6],
            "sink_outputs_json": row[7],
        }

get_node_executions(run_id)

Get all node execution records for a run.

Source code in src/lythonic/compose/dag_provenance.py
def get_node_executions(self, run_id: str) -> list[dict[str, Any]]:
    """Get all node execution records for a run."""
    with open_sqlite_db(self.db_path) as conn:
        cursor = conn.cursor()
        execute_sql(
            cursor,
            "SELECT run_id, node_label, status, is_source, is_sink, input_json, output_json, "
            "started_at, finished_at, error "
            "FROM node_executions WHERE run_id = ?",
            (run_id,),
        )
        return [
            {
                "run_id": r[0],
                "node_label": r[1],
                "status": r[2],
                "is_source": bool(r[3]),
                "is_sink": bool(r[4]),
                "input_json": r[5],
                "output_json": r[6],
                "started_at": r[7],
                "finished_at": r[8],
                "error": r[9],
            }
            for r in cursor.fetchall()
        ]

get_node_output(run_id, node_label)

Get the output JSON of a completed or skipped node, or None.

Source code in src/lythonic/compose/dag_provenance.py
def get_node_output(self, run_id: str, node_label: str) -> str | None:
    """Get the output JSON of a completed or skipped node, or None."""
    with open_sqlite_db(self.db_path) as conn:
        cursor = conn.cursor()
        execute_sql(
            cursor,
            "SELECT output_json FROM node_executions "
            "WHERE run_id = ? AND node_label = ? AND status IN ('completed', 'skipped')",
            (run_id, node_label),
        )
        row = cursor.fetchone()
        return row[0] if row else None

get_pending_nodes(run_id)

Get labels of nodes that are not yet completed or skipped.

Source code in src/lythonic/compose/dag_provenance.py
def get_pending_nodes(self, run_id: str) -> list[str]:
    """Get labels of nodes that are not yet completed or skipped."""
    with open_sqlite_db(self.db_path) as conn:
        cursor = conn.cursor()
        execute_sql(
            cursor,
            "SELECT node_label FROM node_executions "
            "WHERE run_id = ? AND status IN ('pending', 'running', 'failed')",
            (run_id,),
        )
        return [row[0] for row in cursor.fetchall()]
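
Together with get_node_output(), this supports a restart sketch along these lines; the path and run ID are hypothetical, and the actual replay logic lives in the runner:

from pathlib import Path

from lythonic.compose.dag_provenance import DagProvenance

prov = DagProvenance(Path("runs/provenance.db"))  # hypothetical path
run_id = "run-2024-0001"                          # hypothetical run ID

# Pending/running/failed nodes need to be re-executed on restart;
# completed or skipped nodes can have their recorded outputs copied forward.
for label in prov.get_pending_nodes(run_id):
    print("re-run:", label)

cached = prov.get_node_output(run_id, "extract")
if cached is not None:
    print("reuse cached output for 'extract':", cached)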

get_edge_traversals(run_id)

Get all edge traversals for a run.

Source code in src/lythonic/compose/dag_provenance.py
def get_edge_traversals(self, run_id: str) -> list[dict[str, Any]]:
    """Get all edge traversals for a run."""
    with open_sqlite_db(self.db_path) as conn:
        cursor = conn.cursor()
        execute_sql(
            cursor,
            "SELECT run_id, upstream_label, downstream_label, traversed_at "
            "FROM edge_traversals WHERE run_id = ? ORDER BY traversed_at",
            (run_id,),
        )
        return [
            {
                "run_id": r[0],
                "upstream_label": r[1],
                "downstream_label": r[2],
                "traversed_at": r[3],
            }
            for r in cursor.fetchall()
        ]

inspect_run(run_id)

Get a full DagRun model by ID, with nested nodes, edges, and sub-DAGs.

Source code in src/lythonic/compose/dag_provenance.py
def inspect_run(self, run_id: str) -> DagRun | None:
    """Get a full DagRun model by ID, with nested nodes, edges, and sub-DAGs."""
    with open_sqlite_db(self.db_path) as conn:
        cursor = conn.cursor()
        rows = self._fetch_run_rows(cursor, "WHERE run_id = ?", (run_id,))
        runs = self._load_dag_runs(cursor, rows)
        return runs[0] if runs else None

load_io(dag_run, node_labels=None)

Populate io on dag_run and its nodes in a single DB round-trip.

dag_run.io gets source_inputs_json / sink_outputs_json. node.io gets input_json / output_json for each node, filtered by node_labels if provided (all nodes if None).

Does NOT recurse into sub-DAGs.

Source code in src/lythonic/compose/dag_provenance.py
def load_io(
    self,
    dag_run: DagRun,
    node_labels: Sequence[str] | None = None,
) -> None:
    """
    Populate `io` on `dag_run` and its nodes in a single DB round-trip.

    `dag_run.io` gets `source_inputs_json` / `sink_outputs_json`.
    `node.io` gets `input_json` / `output_json` for each node, filtered
    by `node_labels` if provided (all nodes if `None`).

    Does NOT recurse into sub-DAGs.
    """
    with open_sqlite_db(self.db_path) as conn:
        cursor = conn.cursor()

        # Load run-level IO
        execute_sql(
            cursor,
            "SELECT source_inputs_json, sink_outputs_json FROM dag_runs WHERE run_id = ?",
            (dag_run.run_id,),
        )
        row = cursor.fetchone()
        if row:
            dag_run.io = IoPayload(input_json=row[0], output_json=row[1])

        # Load node-level IO
        labels = list(node_labels) if node_labels is not None else list(dag_run.nodes.keys())
        if labels:
            placeholders = ",".join("?" * len(labels))
            execute_sql(
                cursor,
                f"SELECT node_label, input_json, output_json FROM node_executions "
                f"WHERE run_id = ? AND node_label IN ({placeholders})",
                (dag_run.run_id, *labels),
            )
            for r in cursor.fetchall():
                node = dag_run.nodes.get(r[0])
                if node:
                    node.io = IoPayload(input_json=r[1], output_json=r[2])
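
Typical usage pairs this with inspect_run(), which deliberately leaves io unset; path and run ID are hypothetical:

from pathlib import Path

from lythonic.compose.dag_provenance import DagProvenance

prov = DagProvenance(Path("runs/provenance.db"))  # hypothetical path

run = prov.inspect_run("run-2024-0001")           # hypothetical run ID
if run is not None:
    assert run.io is None                         # payloads are not loaded by inspect_run
    prov.load_io(run, node_labels=["extract"])    # fetch payloads on demand
    node = run.nodes.get("extract")
    if node is not None and node.io is not None:
        print(node.io.input_json, node.io.output_json)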

get_recent_runs(limit=20, status=None)

List runs with full node/edge data, ordered by started_at descending.

Source code in src/lythonic/compose/dag_provenance.py
def get_recent_runs(self, limit: int = 20, status: str | None = None) -> list[DagRun]:
    """List runs with full node/edge data, ordered by started_at descending."""
    with open_sqlite_db(self.db_path) as conn:
        cursor = conn.cursor()
        if status:
            rows = self._fetch_run_rows(
                cursor, "WHERE status = ? ORDER BY started_at DESC LIMIT ?", (status, limit)
            )
        else:
            rows = self._fetch_run_rows(cursor, "ORDER BY started_at DESC LIMIT ?", (limit,))
        return self._load_dag_runs(cursor, rows)

get_active_runs()

Get all currently running DAG executions.

Source code in src/lythonic/compose/dag_provenance.py
def get_active_runs(self) -> list[DagRun]:
    """Get all currently running DAG executions."""
    return self.get_recent_runs(limit=100, status="running")

get_child_runs(parent_run_id)

Recursively get all descendant runs (children, grandchildren, etc.).

Source code in src/lythonic/compose/dag_provenance.py
def get_child_runs(self, parent_run_id: str) -> list[DagRun]:
    """Recursively get all descendant runs (children, grandchildren, etc.)."""
    with open_sqlite_db(self.db_path) as conn:
        cursor = conn.cursor()
        execute_sql(
            cursor,
            "WITH RECURSIVE descendants AS ( "
            "  SELECT run_id, dag_nsref, parent_run_id, status, started_at, finished_at "
            "  FROM dag_runs WHERE parent_run_id = ? "
            "  UNION ALL "
            "  SELECT d.run_id, d.dag_nsref, d.parent_run_id, d.status, d.started_at, "
            "    d.finished_at "
            "  FROM dag_runs d JOIN descendants p ON d.parent_run_id = p.run_id "
            ") SELECT * FROM descendants ORDER BY started_at",
            (parent_run_id,),
        )
        rows = cursor.fetchall()
        return self._load_dag_runs(cursor, rows)

NullProvenance

No-op provenance -- discards writes, returns None/empty on reads. Used when DagRunner is created without explicit provenance.

Source code in src/lythonic/compose/dag_provenance.py
class NullProvenance:
    """
    No-op provenance -- discards writes, returns None/empty on reads.
    Used when `DagRunner` is created without explicit provenance.
    """

    def create_run(
        self,
        run_id: str,  # pyright: ignore[reportUnusedParameter]
        dag_nsref: str,  # pyright: ignore[reportUnusedParameter]
        source_inputs: dict[str, Any],  # pyright: ignore[reportUnusedParameter]
        parent_run_id: str | None = None,  # pyright: ignore[reportUnusedParameter]
    ) -> None:
        pass

    def update_run_status(self, run_id: str, status: str) -> None:  # pyright: ignore[reportUnusedParameter]
        pass

    def finish_run(self, run_id: str, status: str, sink_outputs_json: str | None = None) -> None:  # pyright: ignore[reportUnusedParameter]
        pass

    def record_node_start(
        self,
        run_id: str,  # pyright: ignore[reportUnusedParameter]
        node_label: str,  # pyright: ignore[reportUnusedParameter]
        input_json: str,  # pyright: ignore[reportUnusedParameter]
        is_source: bool = False,  # pyright: ignore[reportUnusedParameter]
        is_sink: bool = False,  # pyright: ignore[reportUnusedParameter]
    ) -> None:
        pass

    def record_node_skipped(self, run_id: str, node_label: str, output_json: str) -> None:  # pyright: ignore[reportUnusedParameter]
        pass

    def complete_node_with_edges(
        self,
        run_id: str,  # pyright: ignore[reportUnusedParameter]
        node_label: str,  # pyright: ignore[reportUnusedParameter]
        output_json: str,  # pyright: ignore[reportUnusedParameter]
        edges: list[tuple[str, str]],  # pyright: ignore[reportUnusedParameter]
    ) -> None:
        pass

    def fail_node_and_finish_run(self, run_id: str, node_label: str, error: str) -> None:  # pyright: ignore[reportUnusedParameter]
        pass

    def get_run(self, run_id: str) -> dict[str, Any] | None:  # pyright: ignore[reportUnusedParameter]
        return None

    def get_node_executions(self, run_id: str) -> list[dict[str, Any]]:  # pyright: ignore[reportUnusedParameter]
        return []

    def get_node_output(self, run_id: str, node_label: str) -> str | None:  # pyright: ignore[reportUnusedParameter]
        return None

    def get_pending_nodes(self, run_id: str) -> list[str]:  # pyright: ignore[reportUnusedParameter]
        return []

    def get_edge_traversals(self, run_id: str) -> list[dict[str, Any]]:  # pyright: ignore[reportUnusedParameter]
        return []

    def inspect_run(self, run_id: str) -> DagRun | None:  # pyright: ignore[reportUnusedParameter]
        return None

    def get_recent_runs(self, limit: int = 20, status: str | None = None) -> list[DagRun]:  # pyright: ignore[reportUnusedParameter]
        return []

    def get_active_runs(self) -> list[DagRun]:
        return []

    def get_child_runs(self, parent_run_id: str) -> list[DagRun]:  # pyright: ignore[reportUnusedParameter]
        return []
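
Because it mirrors DagProvenance's public surface, calling code can accept either interchangeably. A minimal sketch; the make_provenance helper is hypothetical, not part of this module:

from pathlib import Path

from lythonic.compose.dag_provenance import DagProvenance, NullProvenance

def make_provenance(db_path: Path | None):
    """Hypothetical helper: persist when a path is given, otherwise discard."""
    return DagProvenance(db_path) if db_path is not None else NullProvenance()

prov = make_provenance(None)
prov.create_run("run-x", "pipeline", source_inputs={})  # silently discarded
assert prov.get_run("run-x") is None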

DagRun

Bases: BaseModel

A DAG execution record with nested node executions and edge traversals.

Source code in src/lythonic/compose/dag_provenance.py
class DagRun(BaseModel):
    """A DAG execution record with nested node executions and edge traversals."""

    run_id: str
    dag_nsref: str
    parent_run_id: str | None = None
    status: str
    started_at: datetime
    finished_at: datetime | None = None
    nodes: dict[str, NodeExecution] = {}
    io: IoPayload | None = None

    def latest_update(self) -> datetime:
        """The most recent timestamp across all nodes and traversals."""
        candidates: list[datetime] = [self.started_at]
        if self.finished_at:
            candidates.append(self.finished_at)
        for n in self.nodes.values():
            if n.started_at:
                candidates.append(n.started_at)
            if n.finished_at:
                candidates.append(n.finished_at)
            for e in n.edges:
                candidates.append(e.traversed_at)
        return max(candidates)

    def nodes_changed_since(self, dt: datetime) -> list[NodeExecution]:
        """Nodes whose started_at or finished_at is after `dt`."""
        result: list[NodeExecution] = []
        for n in self.nodes.values():
            if (n.started_at and n.started_at > dt) or (n.finished_at and n.finished_at > dt):
                result.append(n)
        return result

latest_update()

The most recent timestamp across all nodes and traversals.

Source code in src/lythonic/compose/dag_provenance.py
def latest_update(self) -> datetime:
    """The most recent timestamp across all nodes and traversals."""
    candidates: list[datetime] = [self.started_at]
    if self.finished_at:
        candidates.append(self.finished_at)
    for n in self.nodes.values():
        if n.started_at:
            candidates.append(n.started_at)
        if n.finished_at:
            candidates.append(n.finished_at)
        for e in n.edges:
            candidates.append(e.traversed_at)
    return max(candidates)

nodes_changed_since(dt)

Nodes whose started_at or finished_at is after dt.

Source code in src/lythonic/compose/dag_provenance.py
def nodes_changed_since(self, dt: datetime) -> list[NodeExecution]:
    """Nodes whose started_at or finished_at is after `dt`."""
    result: list[NodeExecution] = []
    for n in self.nodes.values():
        if (n.started_at and n.started_at > dt) or (n.finished_at and n.finished_at > dt):
            result.append(n)
    return result

NodeExecution

Bases: BaseModel

Execution record for a single node in a DAG run.

Source code in src/lythonic/compose/dag_provenance.py
class NodeExecution(BaseModel):
    """Execution record for a single node in a DAG run."""

    node_label: str
    status: str
    is_source: bool = False
    is_sink: bool = False
    started_at: datetime | None = None
    finished_at: datetime | None = None
    error: str | None = None
    edges: list[EdgeTraversal] = []
    sub_dags: dict[str, DagRun] | None = None
    io: IoPayload | None = None

EdgeTraversal

Bases: BaseModel

Edge traversed during a DAG run. downstream_label identifies the target node.

Source code in src/lythonic/compose/dag_provenance.py
class EdgeTraversal(BaseModel):
    """Edge traversed during a DAG run. `downstream_label` identifies the target node."""

    downstream_label: str
    traversed_at: datetime

json_default(obj)

JSON serialization fallback for Pydantic models and other types.

Source code in src/lythonic/compose/dag_provenance.py
def json_default(obj: Any) -> Any:
    """JSON serialization fallback for Pydantic models and other types."""
    if isinstance(obj, BaseModel):
        return obj.model_dump()
    return str(obj)

safe_json_dumps(obj)

JSON serialize with Pydantic support. Logs a warning on failure.

Source code in src/lythonic/compose/dag_provenance.py
def safe_json_dumps(obj: Any) -> str:
    """JSON serialize with Pydantic support. Logs a warning on failure."""
    try:
        return json.dumps(obj, default=json_default)
    except Exception:
        _log.warning("Failed to serialize to JSON: %s", type(obj).__name__, exc_info=True)
        return json.dumps(str(obj))