Lightweight SQLite ORM with Pydantic-based schema definition.
This module provides a simple ORM-like interface for SQLite databases using
Pydantic models to define the schema. It supports automatic DDL generation,
CRUD operations, type mapping, and multi-tenant access patterns.
Quick Start
Define your models by subclassing DbModel:
from pydantic import Field
from lythonic.state import DbModel, Schema, open_sqlite_db
class Author(DbModel["Author"]):
author_id: int = Field(default=-1, description="(PK)")
name: str = Field(description="Author name")
class Book(DbModel["Book"]):
book_id: int = Field(default=-1, description="(PK)")
author_id: int = Field(description="(FK:Author.author_id)")
title: str
year: int | None = None
SCHEMA = Schema([Author, Book])
Create tables and use CRUD operations:
SCHEMA.create_schema(Path("books.db"))
with open_sqlite_db("books.db") as conn:
author = Author(name="Jane Austen")
author.save(conn) # Inserts with auto-increment, sets author_id
book = Book(author_id=author.author_id, title="Pride and Prejudice", year=1813)
book.save(conn)
# Query
books = Book.select(conn, author_id=author.author_id)
loaded = Book.load_by_id(conn, book.book_id)
conn.commit()
Schema Definition
Primary Keys
Mark a field as primary key by adding (PK) at the start of the description:
user_id: int = Field(default=-1, description="(PK) Unique user identifier")
When save() is called on a model with pk_field=-1, it auto-increments.
Foreign Keys
Mark foreign keys with (FK:Table.field):
author_id: int = Field(description="(FK:Author.author_id) Reference to author")
Nullable Fields
Use | None union type:
email: str | None = Field(default=None, description="Optional email")
Enum and Literal Constraints
Enum and Literal types generate CHECK constraints:
from typing import Literal
status: Literal["active", "inactive", "pending"]
Generates: status TEXT NOT NULL CHECK (status IN ('active', 'inactive', 'pending'))
Supported Types
The following Python types are automatically mapped to SQLite:
| Python Type |
SQLite Type |
Notes |
| int, bool |
INTEGER |
bool stored as 0/1 |
| float |
REAL |
|
| str |
TEXT |
|
| bytes |
BLOB |
|
| datetime |
TEXT |
Stored as ISO format string |
| date |
TEXT |
Stored as ISO format string |
| Path |
TEXT |
Stored as string path |
| Enum |
TEXT |
Stored as enum value |
| IntEnum |
INTEGER |
|
| BaseModel |
TEXT |
Stored as JSON string |
| JsonBase |
TEXT |
Stored as JSON with type info |
CRUD Operations
Insert
record = MyModel(field1="value")
record.insert(conn, auto_increment=True) # Sets PK from lastrowid
Save (Upsert)
record.save(conn) # Inserts if pk=-1, else updates existing row
Select
# Select all
all_records = MyModel.select(conn)
# Filter by field value
filtered = MyModel.select(conn, status="active")
# Filter with operators: eq, ne, gt, lt, gte, lte
recent = MyModel.select(conn, gt__year=2020)
# IN clause (pass a list)
specific = MyModel.select(conn, status=["active", "pending"])
Load by ID
record = MyModel.load_by_id(conn, 42) # Returns None if not found
Update
record.field = "new value"
n_updated = record.update(conn, pk_field=record.pk_field)
Delete
# Delete all matching rows, returns count of deleted rows
n_deleted = MyModel.delete(conn, status="inactive")
Primary Key Filter
Use get_pk_filter() to get a filter dict for the current record's primary key:
record = MyModel.load_by_id(conn, 42)
pk_filter, pk_defined = record.get_pk_filter()
# pk_filter is e.g. {"my_id": 42}, pk_defined is True if pk != -1
# Useful for deleting related records
RelatedModel.delete(conn, **pk_filter)
Count and Exists
count = MyModel.select_count(conn, status="active")
exists = MyModel.exists(conn, email="user@example.com")
Multi-Model Queries
For joins, use from_multi_model_row():
from lythonic.state import execute_sql, from_multi_model_row
cursor = conn.cursor()
execute_sql(
cursor,
f"SELECT {Author.columns('a')}, {Book.columns('b')} "
f"FROM {Author.alias('a')}, {Book.alias('b')} "
"WHERE a.author_id = b.author_id"
)
for row in cursor.fetchall():
author, book = from_multi_model_row(row, [Author, Book])
Multi-Tenant Support (UserOwned)
For multi-tenant applications, use UserOwned base class (from lythonic.state.user):
from lythonic.state.user import User, UserOwned, UserContext
class Task(UserOwned["Task"]):
task_id: int = Field(default=-1, description="(PK)")
title: str
# All operations require a UserContext
user_ctx = UserContext(user=current_user)
task = Task(title="My Task")
task.save_with_ctx(user_ctx, conn) # Automatically sets user_id
# Queries are automatically scoped to user
tasks = Task.select(conn, user_ctx=user_ctx)
task = Task.load_by_id_with_ctx(conn, user_ctx, task_id=42)
Schema Management
from lythonic.state import Schema, DbFile
SCHEMA = Schema([User, Task, Event])
# Create tables directly
SCHEMA.create_schema(Path("app.db"))
# Or use DbFile for lifecycle management
db = DbFile("app.db", SCHEMA)
db.check(ensure=True) # Creates tables if missing
with db.open() as conn:
# Use connection
pass
Exports
DbModel: Base class for database models
Schema: Collection of DbModel classes
DbFile: Database file manager
FieldInfo: Field metadata extraction
open_sqlite_db: Context manager for SQLite connections
execute_sql: Execute SQL with logging
from_multi_model_row: Parse multi-model query results
to_sql_datetime: Convert datetime to SQL string
DbModel
Bases: BaseModel, Generic[T]
Source code in src/lythonic/state/__init__.py
| class DbModel(BaseModel, Generic[T]):
@classmethod
def get_table_name(cls: type) -> str:
return cls.__name__
@classmethod
def alias(cls, alias: str) -> str:
return f"{cls.get_table_name()} as {alias}"
@classmethod
def columns(cls, alias: str | None = None) -> str:
alias = "" if alias is None else f"{alias}."
return ", ".join([f"{alias}{c.name}" for c in cls.get_field_infos()])
@classmethod
def get_field_infos(
cls, filter: Callable[[FieldInfo], bool] = lambda _: True
) -> Generator[FieldInfo, None, None]:
for field_name, field_info in cls.model_fields.items():
fi = FieldInfo.build(field_name, field_info)
if filter(fi):
yield fi
@classmethod
def get_field_map(cls) -> dict[str, FieldInfo]:
return {fi.name: fi for fi in cls.get_field_infos()}
@classmethod
def create_ddl(cls) -> str:
fields: list[str] = []
for fi in cls.get_field_infos():
type_name = (
f"{fi.ktype.db_type_info.name}{'' if fi.primary_key or fi.nullable else ' NOT NULL'}"
+ f"{' PRIMARY KEY' if fi.primary_key else ''}"
+ (
f" REFERENCES {fi.foreign_key[0]}({fi.foreign_key[1]})"
if fi.foreign_key
else ""
)
+ fi.check_constraint_ddl()
)
fields.append(f"{fi.name} {type_name}")
return f"CREATE TABLE {cls.get_table_name()} (" + ", ".join(fields) + ")"
def insert(self, conn: sqlite3.Connection, auto_increment: bool = False):
cursor = conn.cursor()
cls = self.__class__
fields = self._choose_fields(lambda fi: not auto_increment or not fi.primary_key)
execute_sql(
cursor,
f"INSERT INTO {cls.get_table_name()} ({', '.join(map(lambda fi: fi.name, fields))}) "
+ f"VALUES ({', '.join(['?'] * len(fields))})",
[fi.to_sql_value(self) for fi in fields],
)
if auto_increment:
pks = list(cls.get_field_infos(lambda fi: fi.primary_key))
assert len(pks) == 1
pks[0].set_value(self, cursor.lastrowid)
@classmethod
def _choose_fields(cls, lambda_filter: Callable[[FieldInfo], bool]) -> list[FieldInfo]:
return list(cls.get_field_infos(lambda_filter))
@classmethod
def _ensure_pk(cls) -> FieldInfo:
pks = cls._choose_fields(lambda fi: fi.primary_key)
assert len(pks) == 1
return pks[0]
def get_pk_filter(self) -> tuple[dict[str, Any], bool]:
"""
Returns: primary key filter and whether it is defined
"""
cls = self.__class__
pk = cls._ensure_pk()
pk_val = getattr(self, pk.name)
return {pk.name: pk_val}, pk_val != -1
def save(self, conn: sqlite3.Connection) -> Self:
pk_filter, pk_defined = self.get_pk_filter()
if not pk_defined:
self.insert(conn, auto_increment=True)
return self
n_updated = self.update(conn, **pk_filter)
if n_updated == 0:
self.insert(conn)
else:
assert n_updated == 1
return self
class _WhereBased(NamedTuple):
cursor: sqlite3.Cursor
table_name: str
field_map: dict[str, FieldInfo]
where_keys: set[str]
where_clauses: list[str]
args: list[Any]
def fields(self) -> list[FieldInfo]:
return list(self.field_map.values())
def where_clause(self) -> str:
return (
f"WHERE {' AND '.join(self.where_clauses)}" if len(self.where_clauses) > 0 else ""
)
def execute_select(self, select_vars: str | None = None):
if select_vars is None:
select_vars = ", ".join([fi.name for fi in self.fields()])
execute_sql(
self.cursor,
f"SELECT {select_vars} FROM {self.table_name} {self.where_clause()}",
self.args,
)
def execute_delete(self):
execute_sql(
self.cursor,
f"DELETE FROM {self.table_name} {self.where_clause()}",
self.args,
)
@classmethod
def _prepare_where(cls, conn: sqlite3.Connection, **filters: Any) -> _WhereBased:
"""Select all rows from the database that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
"""
field_map: dict[str, FieldInfo] = cls.get_field_map()
filter_ops: dict[FilterOp, Any] = {FilterOp.parse(k): v for k, v in filters.items()}
unknown_filters = {k: filter_ops[k] for k in filter_ops if k.name not in field_map}
assert len(unknown_filters) == 0, (
f"Known fields: {field_map.keys()}, but unknown filters: {unknown_filters}"
)
cursor = conn.cursor()
args: list[Any] = []
where_clauses: list[str] = []
where_keys: set[str] = set()
for f_op, v in filter_ops.items():
fi = field_map[f_op.name]
where_keys.add(f_op.name)
if f_op.operator == "eq":
if isinstance(v, list):
v_list: list[Any] = [fi.ktype.db.map_to(val) for val in cast(list[Any], v)]
where_clauses.append(f"{f_op.name} IN ({', '.join('?' * len(v_list))})")
args.extend(v_list)
else:
where_clauses.append(f"{f_op.name} = ?")
args.append(fi.ktype.db.map_to(v))
else:
op_resolved = FILTER_OPERATOR_SQL[f_op.operator]
where_clauses.append(f"{f_op.name} {op_resolved} ?")
args.append(fi.ktype.db.map_to(v))
return cls._WhereBased(
cursor, cls.get_table_name(), field_map, where_keys, where_clauses, args
)
def update(self, conn: sqlite3.Connection, **filters: Any) -> int:
cls = self.__class__
sc = cls._prepare_where(conn, **filters)
rest_of_the_fields = cls._choose_fields(lambda fi: fi.name not in sc.where_keys)
execute_sql(
sc.cursor,
f"UPDATE {cls.get_table_name()} "
+ f"SET {', '.join([f'{fi.name} = ?' for fi in rest_of_the_fields])} "
+ sc.where_clause(),
[fi.to_sql_value(self) for fi in rest_of_the_fields] + sc.args,
)
return sc.cursor.rowcount
@classmethod
def select(cls, conn: sqlite3.Connection, **filters: Any) -> list[T]:
"""Select all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
"""
sc = cls._prepare_where(conn, **filters)
sc.execute_select()
return [cls._from_row(row, sc.fields()) for row in sc.cursor.fetchall()]
@classmethod
def delete(cls, conn: sqlite3.Connection, **filters: Any) -> int:
"""Delete all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
"""
sc = cls._prepare_where(conn, **filters)
sc.execute_delete()
return sc.cursor.rowcount
@classmethod
def _from_row(cls, row: tuple[Any, ...], fields: list[FieldInfo]) -> T:
assert len(row) == len(fields)
return cast(
T,
cls.model_validate({fi.name: fi.from_sql_value(row[i]) for i, fi in enumerate(fields)}),
)
@classmethod
def select_count(cls, conn: sqlite3.Connection, **filters: Any) -> int:
sc = cls._prepare_where(conn, **filters)
sc.execute_select("COUNT(*)")
return sc.cursor.fetchone()[0]
@classmethod
def exists(cls, conn: sqlite3.Connection, **filters: Any) -> bool:
return cls.select_count(conn, **filters) > 0
@classmethod
def load_by_id(cls: type[T], conn: sqlite3.Connection, id: int) -> T | None:
rr: list[T] = cls.select(conn, **{cls._ensure_pk().name: id})
assert len(rr) <= 1
return rr[0] if rr else None
|
get_pk_filter()
Returns: primary key filter and whether it is defined
Source code in src/lythonic/state/__init__.py
| def get_pk_filter(self) -> tuple[dict[str, Any], bool]:
"""
Returns: primary key filter and whether it is defined
"""
cls = self.__class__
pk = cls._ensure_pk()
pk_val = getattr(self, pk.name)
return {pk.name: pk_val}, pk_val != -1
|
select(conn, **filters)
classmethod
Select all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
Source code in src/lythonic/state/__init__.py
| @classmethod
def select(cls, conn: sqlite3.Connection, **filters: Any) -> list[T]:
"""Select all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
"""
sc = cls._prepare_where(conn, **filters)
sc.execute_select()
return [cls._from_row(row, sc.fields()) for row in sc.cursor.fetchall()]
|
delete(conn, **filters)
classmethod
Delete all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
Source code in src/lythonic/state/__init__.py
| @classmethod
def delete(cls, conn: sqlite3.Connection, **filters: Any) -> int:
"""Delete all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
"""
sc = cls._prepare_where(conn, **filters)
sc.execute_delete()
return sc.cursor.rowcount
|
Schema
Source code in src/lythonic/state/__init__.py
| class Schema:
tables: list[type[DbModel[Any]]]
def __init__(self, tables: list[type[DbModel[Any]]]):
self.tables = tables
def check_all_tables_exist(self, conn: sqlite3.Connection):
cursor = conn.cursor()
execute_sql(cursor, "SELECT name FROM sqlite_master WHERE type='table' ")
all_tables = set(r[0] for r in cursor.fetchall())
return all(t.get_table_name() in all_tables for t in self.tables)
def create_tables(self, conn: sqlite3.Connection):
cursor = conn.cursor()
for table in self.tables:
execute_sql(cursor, table.create_ddl())
conn.commit()
def create_schema(self, path: Path):
with open_sqlite_db(path) as conn:
self.create_tables(conn)
|
DbFile
Source code in src/lythonic/state/__init__.py
| class DbFile:
path: Path
schema: Schema
def __init__(self, db_path: Path | str, schema: Schema) -> None:
self.path = Path(db_path)
self.schema = schema
def check(self, ensure: bool = False) -> bool:
if not self.path.exists():
if ensure:
with self.open() as conn:
self.schema.create_tables(conn)
return True
return False
else:
# TODO : migrate db if necessary
with self.open() as conn:
if not self.schema.check_all_tables_exist(conn):
if not ensure:
return False
self.schema.create_tables(conn)
return True
return False
def open(self):
return open_sqlite_db(self.path)
|
FieldInfo
Bases: NamedTuple
Source code in src/lythonic/state/__init__.py
| class FieldInfo(NamedTuple):
name: str
ktype: KnownType
description: str
nullable: bool
primary_key: bool
foreign_key: tuple[str, str] | None
fixed_choices: list[Any] | None # For enum types and literal types
@classmethod
def build(cls, name: str, field_info: Any) -> "FieldInfo":
assert field_info.annotation is not None, f"Field {name} has no annotation"
ann = field_info.annotation
type_: type[Any]
fixed_choices: list[Any] | None = None
if isinstance(ann, type):
type_ = ann
is_nullable = False
else:
origin = get_origin(ann)
if origin is UnionType:
args = get_args(ann)
nones = [arg for arg in args if arg is NoneType]
valid_types = [arg for arg in args if arg is not NoneType]
assert len(valid_types) == 1 and len(nones) == 1, (
f"Union {ann} must have exactly 2 types and one of them must be None"
)
type_ = valid_types[0]
is_nullable = True
elif origin is Literal:
args = get_args(ann)
nones = [arg is NoneType or arg is None for arg in args]
fixed_choices = [a for a, n in zip(args, nones, strict=True) if not n]
is_nullable = any(nones)
types: set[type[Any]] = {type(c) for c in fixed_choices}
assert len(types) == 1, f"Literal {ann} must have only one type"
type_ = types.pop()
else:
raise AssertionError(f"Unknown field annotation type: {ann}")
if issubclass(type_, Enum):
fixed_choices = list(type_)
description = field_info.description or ""
is_primary_key = description.startswith("(PK)")
if is_primary_key:
description = description[4:].strip()
assert not (is_primary_key and is_nullable), (
"A field cannot be both a primary key and nullable"
)
is_foreign_key = description.startswith("(FK:")
if is_foreign_key:
x, description = description[4:].strip().split(")")
table_name, field_name = x.split(".")
foreign_key = table_name, field_name
else:
foreign_key = None
return cls(
name=name,
ktype=KnownType.ensure(type_),
description=description,
nullable=is_nullable,
primary_key=is_primary_key,
foreign_key=foreign_key,
fixed_choices=fixed_choices,
)
def check_constraint_ddl(self) -> str:
if self.fixed_choices is not None:
return f" CHECK ({self.name} IN ({', '.join([repr(self.ktype.db.map_to(c)) for c in self.fixed_choices])}))"
return ""
def to_sql_value(self, o: "DbModel[T]") -> Any:
v = getattr(o, self.name)
return self.ktype.db.map_to(v)
def from_sql_value(self, v: Any) -> Any:
return self.ktype.db.map_from(v)
def set_value(self, o: "DbModel[T]", v: Any):
setattr(o, self.name, self.from_sql_value(v))
|
open_sqlite_db(db_name)
Open or create SQLite database with given name.
Parameters:
| Name |
Type |
Description |
Default |
db_name
|
str | Path
|
Path or name of the SQLite database file
|
required
|
Yields:
| Type |
Description |
|
|
sqlite3.Connection: Database connection object
|
Source code in src/lythonic/state/__init__.py
| @contextmanager
def open_sqlite_db(db_name: str | Path):
"""Open or create SQLite database with given name.
Args:
db_name: Path or name of the SQLite database file
Yields:
sqlite3.Connection: Database connection object
"""
logger.info(f"Opening database {db_name}")
conn = sqlite3.connect(db_name)
try:
yield conn
finally:
conn.close()
|
execute_sql(cursor, sql, *args)
Source code in src/lythonic/state/__init__.py
| def execute_sql(cursor: sqlite3.Cursor, sql: str, *args: Any):
logger.debug(f"execute: {sql}" + (f" -- with args: {args}" if len(args) > 0 else ""))
cursor.execute(sql, *args)
|
from_multi_model_row(row, models)
Source code in src/lythonic/state/__init__.py
| def from_multi_model_row(
row: tuple[Any, ...], models: list[type[DbModel[Any]]]
) -> Generator[DbModel[Any], None, None]:
start = 0
for model in models:
fields = list(model.get_field_infos())
yield model._from_row(row[start : start + len(fields)], fields) # pyright: ignore [reportPrivateUsage]
start += len(fields)
|
to_sql_datetime(dt)
Source code in src/lythonic/state/__init__.py
| def to_sql_datetime(dt: datetime) -> str:
return dt.isoformat()
|