Scheduled Triggers
This tutorial shows how to set up a DAG that runs automatically on a schedule using lythonic's trigger system.
Define the DAG
Start with a simple DAG that fetches and processes data:
from lythonic.compose.namespace import Dag, Namespace

async def fetch_prices() -> dict[str, float]:
    """Simulate fetching latest prices."""
    return {"AAPL": 185.0, "GOOG": 140.0}

async def summarize(prices: dict[str, float]) -> str:
    total = sum(prices.values())
    return f"Portfolio value: ${total:.2f}"

ns = Namespace()
ns.register(fetch_prices, nsref="market:fetch")
ns.register(summarize, nsref="market:summarize")

dag = Dag()
dag.node(ns.get("market:fetch")) >> dag.node(ns.get("market:summarize"))
ns.register(dag, nsref="pipelines:market_summary")
The DAG must be registered in the namespace so the trigger system can
look it up by nsref.
Register a Trigger Definition
A trigger definition is declarative metadata — it describes what should happen but doesn't start anything:
ns.register_trigger(
    name="hourly_market",
    dag_nsref="pipelines:market_summary",
    trigger_type="poll",
    schedule="0 * * * *",
)
The schedule is a standard cron expression. Common patterns:
- "0 * * * *": every hour
- "*/5 * * * *": every 5 minutes
- "0 9 * * MON-FRI": weekdays at 9am
- "0 0 * * *": daily at midnight
- "0 0 * * 0": weekly on Sunday
Activate and Run
Activating a trigger creates a persistent record in the trigger database.
The TriggerManager coordinates everything:
from pathlib import Path

from lythonic.compose.dag_provenance import DagProvenance
from lythonic.compose.trigger import TriggerManager, TriggerStore

store = TriggerStore(Path("triggers.db"))
provenance = DagProvenance(Path("provenance.db"))

manager = TriggerManager(
    namespace=ns,
    store=store,
    provenance=provenance,
)
manager.activate("hourly_market")
At this point the trigger definition is stored in the database with
status active, but nothing is running yet.
Start the Poll Loop
Call start() to begin the background polling loop. It checks all active
poll triggers every second and fires each one when its schedule comes due:
import asyncio

async def main():
    manager.start()
    # Your application runs here...
    # The trigger fires the DAG every hour in the background.
    await asyncio.sleep(3600)
    manager.stop()

asyncio.run(main())
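Because start() is called from inside a running coroutine, one plausible reading is that the loop runs as a background asyncio task. The sketch below illustrates that general pattern only. `MiniPoller` is a hypothetical stand-in, not lythonic's TriggerManager, and its short interval is chosen just to keep the demo fast.

```python
import asyncio
from collections.abc import Callable


class MiniPoller:
    """Hypothetical stand-in showing a start/stop background loop."""

    def __init__(self, tick: Callable[[], None], interval: float = 1.0) -> None:
        self._tick = tick
        self._interval = interval
        self._task: asyncio.Task[None] | None = None

    async def _loop(self) -> None:
        # Call the tick function, then sleep, until the task is cancelled.
        while True:
            self._tick()
            await asyncio.sleep(self._interval)

    def start(self) -> None:
        # Requires a running event loop, hence the call from inside a coroutine.
        self._task = asyncio.get_running_loop().create_task(self._loop())

    def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            self._task = None


async def demo() -> int:
    ticks: list[int] = []
    poller = MiniPoller(lambda: ticks.append(1), interval=0.01)
    poller.start()
    await asyncio.sleep(0.05)  # the application would do its own work here
    poller.stop()
    return len(ticks)


tick_count = asyncio.run(demo())
```

The application coroutine keeps control of the event loop. The poller only runs while other code is awaiting, which is why `main()` has to stay alive for triggers to fire.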
Each time the trigger fires, the trigger system:
- Runs the DAG via the namespace node's async wrapper
- Records the event in trigger_events (trigger name, payload, run ID)
- Updates last_run_at and last_run_id in the activation record
- The DAG run itself is recorded in the provenance database
Check Trigger History
Query the trigger store for past events:
events = store.get_events("hourly_market")
for event in events:
    print(f"Fired at: {event['fired_at']}, status: {event['status']}, run: {event['run_id']}")
Deactivate
To stop a trigger from firing without stopping the manager:
The trigger stays in the database with status disabled. Reactivate it
later with manager.activate("hourly_market").
Without Provenance
If you don't need DAG run tracking, skip the provenance parameter.
The manager uses NullProvenance by default:
manager = TriggerManager(
    namespace=ns,
    store=store,
)
Custom Poll Functions
For triggers that check an external source instead of running on a fixed
schedule, provide a poll_fn. It's called on each schedule tick — return
data to fire the DAG, or None to skip:
def check_queue() -> dict[str, str] | None:
    """Check for new messages. Returns payload or None."""
    # ... check external source ...
    has_messages = False  # placeholder: replace with the result of the real check
    return {"message": "new order"} if has_messages else None

ns.register_trigger(
    name="queue_check",
    dag_nsref="pipelines:process_order",
    trigger_type="poll",
    poll_fn=check_queue,
    schedule="*/10 * * * * *",  # every 10 seconds
)
When poll_fn returns data, it becomes the DAG's input payload — the
dict keys are matched to the DAG's source node parameters by name.
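Name-based matching of this kind can be approximated with `inspect.signature`. The sketch below is only an illustration: `bind_payload` and `process_order` are hypothetical, and how lythonic treats missing or extra keys is not stated here, so this version simply ignores keys that the function does not accept.

```python
import inspect
from collections.abc import Callable
from typing import Any


def bind_payload(fn: Callable[..., Any], payload: dict[str, Any]) -> dict[str, Any]:
    """Pick the payload entries whose keys match fn's parameter names."""
    params = inspect.signature(fn).parameters
    return {name: payload[name] for name in params if name in payload}


def process_order(message: str) -> str:
    """Hypothetical source node that takes a `message` parameter."""
    return f"Processing: {message}"


# The "source" key has no matching parameter and is dropped.
kwargs = bind_payload(process_order, {"message": "new order", "source": "queue"})
result = process_order(**kwargs)
```

With the payload from `check_queue`, `kwargs` is `{"message": "new order"}` and the node receives it as a keyword argument.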