Lightweight SQLite ORM with Pydantic-based schema definition.
This module provides a simple ORM-like interface for SQLite databases using
Pydantic models to define the schema. It supports automatic DDL generation,
CRUD operations, type mapping, and multi-tenant access patterns.
Quick Start
Define your models by subclassing DbModel:
from pydantic import Field
from lythonic.state import DbModel, Schema, open_sqlite_db
class Author(DbModel["Author"]):
author_id: int = Field(default=-1, description="(PK)")
name: str = Field(description="Author name")
class Book(DbModel["Book"]):
book_id: int = Field(default=-1, description="(PK)")
author_id: int = Field(description="(FK:Author.author_id)")
title: str
year: int | None = None
SCHEMA = Schema([Author, Book])
Create tables and use CRUD operations:
SCHEMA.create_schema(Path("books.db"))
with open_sqlite_db("books.db") as conn:
author = Author(name="Jane Austen")
author.save(conn) # Inserts with auto-increment, sets author_id
book = Book(author_id=author.author_id, title="Pride and Prejudice", year=1813)
book.save(conn)
# Query
books = Book.select(conn, author_id=author.author_id)
loaded = Book.load_by_id(conn, book.book_id)
conn.commit()
Schema Definition
Primary Keys
Mark a field as primary key by adding (PK) at the start of the description:
user_id: int = Field(default=-1, description="(PK) Unique user identifier")
When save() is called on a model with pk_field=-1, it auto-increments.
Foreign Keys
Mark foreign keys with (FK:Table.field):
author_id: int = Field(description="(FK:Author.author_id) Reference to author")
Nullable Fields
Use | None union type:
email: str | None = Field(default=None, description="Optional email")
Enum and Literal Constraints
Enum and Literal types generate CHECK constraints:
from typing import Literal
status: Literal["active", "inactive", "pending"]
Generates: status TEXT NOT NULL CHECK (status IN ('active', 'inactive', 'pending'))
Supported Types
The following Python types are automatically mapped to SQLite:
| Python Type |
SQLite Type |
Notes |
| int, bool |
INTEGER |
bool stored as 0/1 |
| float |
REAL |
|
| str |
TEXT |
|
| bytes |
BLOB |
|
| datetime |
TEXT |
Stored as ISO format string |
| date |
TEXT |
Stored as ISO format string |
| Path |
TEXT |
Stored as string path |
| Enum |
TEXT |
Stored as enum value |
| IntEnum |
INTEGER |
|
| BaseModel |
TEXT |
Stored as JSON string |
| JsonBase |
TEXT |
Stored as JSON with type info |
CRUD Operations
Insert
record = MyModel(field1="value")
record.insert(conn, auto_increment=True) # Sets PK from lastrowid
Save (Upsert)
record.save(conn) # Inserts if pk=-1, else updates existing row
Select
# Select all
all_records = MyModel.select(conn)
# Filter by field value
filtered = MyModel.select(conn, status="active")
# Filter with operators: eq, ne, gt, lt, gte, lte
recent = MyModel.select(conn, gt__year=2020)
# IN clause (pass a list)
specific = MyModel.select(conn, status=["active", "pending"])
Load by ID
record = MyModel.load_by_id(conn, 42) # Returns None if not found
Update
record.field = "new value"
n_updated = record.update(conn, pk_field=record.pk_field)
Delete
# Delete all matching rows, returns count of deleted rows
n_deleted = MyModel.delete(conn, status="inactive")
Primary Key Filter
Use get_pk_filter() to get a filter dict for the current record's primary key:
record = MyModel.load_by_id(conn, 42)
pk_filter, pk_defined = record.get_pk_filter()
# pk_filter is e.g. {"my_id": 42}, pk_defined is True if pk != -1
# Useful for deleting related records
RelatedModel.delete(conn, **pk_filter)
Count and Exists
count = MyModel.select_count(conn, status="active")
exists = MyModel.exists(conn, email="user@example.com")
Multi-Model Queries
For joins, use from_multi_model_row():
from lythonic.state import execute_sql, from_multi_model_row
cursor = conn.cursor()
execute_sql(
cursor,
f"SELECT {Author.columns('a')}, {Book.columns('b')} "
f"FROM {Author.alias('a')}, {Book.alias('b')} "
"WHERE a.author_id = b.author_id"
)
for row in cursor.fetchall():
author, book = from_multi_model_row(row, [Author, Book])
Multi-Tenant Support (UserOwned)
For multi-tenant applications, use UserOwned base class (from lythonic.state.user):
from lythonic.state.user import User, UserOwned, UserContext
class Task(UserOwned["Task"]):
task_id: int = Field(default=-1, description="(PK)")
title: str
# All operations require a UserContext
user_ctx = UserContext(user=current_user)
task = Task(title="My Task")
task.save_with_ctx(user_ctx, conn) # Automatically sets user_id
# Queries are automatically scoped to user
tasks = Task.select(conn, user_ctx=user_ctx)
task = Task.load_by_id_with_ctx(conn, user_ctx, task_id=42)
Alternative Keys
Mark fields as part of the alternative key with (AK):
class Region(DbModel["Region"]):
id: int = Field(default=-1, description="(PK)")
code: str = Field(description="(AK) Region code")
name: str
class Team(DbModel["Team"]):
id: int = Field(default=-1, description="(PK)")
region_id: int = Field(description="(FK:Region.id)(AK)")
name: str = Field(description="(AK) Team name")
AK fields generate a UNIQUE constraint. FK fields marked (AK) cascade through
to the referenced table's AK for external resolution:
SCHEMA = Schema([Region, Team]) # Validates and builds cascade chains
with open_sqlite_db("db.sqlite") as conn:
# Resolve external AK values to integer PK
pk = Team.resolve_ak(conn, code="us-west", name="Eagles")
# Load by AK in one call
team = Team.load_by_ak(conn, code="us-west", name="Eagles")
# Serialize integer FK back to external AK values
ak_dict = team.to_ak_dict(conn) # {"region_id": "us-west", "name": "Eagles"}
See lythonic.state.alt_key for the AltKey class that handles cascade
chain computation, JOIN-based resolution, and serialization.
Schema Management
from lythonic.state import Schema, DbFile
SCHEMA = Schema([User, Task, Event])
# Create tables directly
SCHEMA.create_schema(Path("app.db"))
# Or use DbFile for lifecycle management
db = DbFile("app.db", SCHEMA)
db.check(ensure=True) # Creates tables if missing
with db.open() as conn:
# Use connection
pass
Exports
DbModel: Base class for database models
Schema: Collection of DbModel classes
DbFile: Database file manager
FieldInfo: Field metadata extraction
open_sqlite_db: Context manager for SQLite connections
execute_sql: Execute SQL with logging
from_multi_model_row: Parse multi-model query results
to_sql_datetime: Convert datetime to SQL string
AltKey: Alternative key metadata and resolution (from lythonic.state.alt_key)
DbModel
Bases: BaseModel, Generic[T]
Source code in src/lythonic/state/__init__.py
| class DbModel(BaseModel, Generic[T]):
@classmethod
def get_table_name(cls: type) -> str:
return cls.__name__
@classmethod
def alias(cls, alias: str) -> str:
return f"{cls.get_table_name()} as {alias}"
@classmethod
def columns(cls, alias: str | None = None) -> str:
alias = "" if alias is None else f"{alias}."
return ", ".join([f"{alias}{c.name}" for c in cls.get_field_infos()])
@classmethod
def get_field_infos(
cls, filter: Callable[[FieldInfo], bool] = lambda _: True
) -> Generator[FieldInfo, None, None]:
for field_name, field_info in cls.model_fields.items():
fi = FieldInfo.build(field_name, field_info)
if filter(fi):
yield fi
@classmethod
def get_field_map(cls) -> dict[str, FieldInfo]:
return {fi.name: fi for fi in cls.get_field_infos()}
@classmethod
def create_ddl(cls) -> str:
fields: list[str] = []
ak_fields: list[str] = []
for fi in cls.get_field_infos():
type_name = (
f"{fi.ktype.db_type_info.name}{'' if fi.primary_key or fi.nullable else ' NOT NULL'}"
+ f"{' PRIMARY KEY' if fi.primary_key else ''}"
+ (
f" REFERENCES {fi.foreign_key[0]}({fi.foreign_key[1]})"
if fi.foreign_key
else ""
)
+ fi.check_constraint_ddl()
)
fields.append(f"{fi.name} {type_name}")
if fi.alt_key:
ak_fields.append(fi.name)
if ak_fields:
fields.append(f"UNIQUE ({', '.join(ak_fields)})")
return f"CREATE TABLE {cls.get_table_name()} (" + ", ".join(fields) + ")"
def insert(self, conn: sqlite3.Connection, auto_increment: bool = False):
cursor = conn.cursor()
cls = self.__class__
fields = self._choose_fields(lambda fi: not auto_increment or not fi.primary_key)
execute_sql(
cursor,
f"INSERT INTO {cls.get_table_name()} ({', '.join(map(lambda fi: fi.name, fields))}) "
+ f"VALUES ({', '.join(['?'] * len(fields))})",
[fi.to_sql_value(self) for fi in fields],
)
if auto_increment:
pks = list(cls.get_field_infos(lambda fi: fi.primary_key))
assert len(pks) == 1
pks[0].set_value(self, cursor.lastrowid)
@classmethod
def _choose_fields(cls, lambda_filter: Callable[[FieldInfo], bool]) -> list[FieldInfo]:
return list(cls.get_field_infos(lambda_filter))
@classmethod
def _ensure_pk(cls) -> FieldInfo:
pks = cls._choose_fields(lambda fi: fi.primary_key)
assert len(pks) == 1
return pks[0]
def get_pk_filter(self) -> tuple[dict[str, Any], bool]:
"""
Returns: primary key filter and whether it is defined
"""
cls = self.__class__
pk = cls._ensure_pk()
pk_val = getattr(self, pk.name)
return {pk.name: pk_val}, pk_val != -1
def save(self, conn: sqlite3.Connection) -> Self:
pk_filter, pk_defined = self.get_pk_filter()
if not pk_defined:
self.insert(conn, auto_increment=True)
return self
n_updated = self.update(conn, **pk_filter)
if n_updated == 0:
self.insert(conn)
else:
assert n_updated == 1
return self
class _WhereBased(NamedTuple):
cursor: sqlite3.Cursor
table_name: str
field_map: dict[str, FieldInfo]
where_keys: set[str]
where_clauses: list[str]
args: list[Any]
def fields(self) -> list[FieldInfo]:
return list(self.field_map.values())
def where_clause(self) -> str:
return (
f"WHERE {' AND '.join(self.where_clauses)}" if len(self.where_clauses) > 0 else ""
)
def execute_select(self, select_vars: str | None = None):
if select_vars is None:
select_vars = ", ".join([fi.name for fi in self.fields()])
execute_sql(
self.cursor,
f"SELECT {select_vars} FROM {self.table_name} {self.where_clause()}",
self.args,
)
def execute_delete(self):
execute_sql(
self.cursor,
f"DELETE FROM {self.table_name} {self.where_clause()}",
self.args,
)
@classmethod
def _prepare_where(cls, conn: sqlite3.Connection, **filters: Any) -> _WhereBased:
"""Select all rows from the database that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
"""
field_map: dict[str, FieldInfo] = cls.get_field_map()
filter_ops: dict[FilterOp, Any] = {FilterOp.parse(k): v for k, v in filters.items()}
unknown_filters = {k: filter_ops[k] for k in filter_ops if k.name not in field_map}
assert len(unknown_filters) == 0, (
f"Known fields: {field_map.keys()}, but unknown filters: {unknown_filters}"
)
cursor = conn.cursor()
args: list[Any] = []
where_clauses: list[str] = []
where_keys: set[str] = set()
for f_op, v in filter_ops.items():
fi = field_map[f_op.name]
where_keys.add(f_op.name)
if f_op.operator == "eq":
if isinstance(v, list):
v_list: list[Any] = [fi.ktype.db.map_to(val) for val in cast(list[Any], v)]
where_clauses.append(f"{f_op.name} IN ({', '.join('?' * len(v_list))})")
args.extend(v_list)
else:
where_clauses.append(f"{f_op.name} = ?")
args.append(fi.ktype.db.map_to(v))
else:
op_resolved = FILTER_OPERATOR_SQL[f_op.operator]
where_clauses.append(f"{f_op.name} {op_resolved} ?")
args.append(fi.ktype.db.map_to(v))
return cls._WhereBased(
cursor, cls.get_table_name(), field_map, where_keys, where_clauses, args
)
def update(self, conn: sqlite3.Connection, **filters: Any) -> int:
cls = self.__class__
sc = cls._prepare_where(conn, **filters)
rest_of_the_fields = cls._choose_fields(lambda fi: fi.name not in sc.where_keys)
execute_sql(
sc.cursor,
f"UPDATE {cls.get_table_name()} "
+ f"SET {', '.join([f'{fi.name} = ?' for fi in rest_of_the_fields])} "
+ sc.where_clause(),
[fi.to_sql_value(self) for fi in rest_of_the_fields] + sc.args,
)
return sc.cursor.rowcount
@classmethod
def select(cls: type[T], conn: sqlite3.Connection, **filters: Any) -> list[T]:
"""Select all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
"""
sc = cls._prepare_where(conn, **filters)
sc.execute_select()
return [cls._from_row(row, sc.fields()) for row in sc.cursor.fetchall()]
@classmethod
def delete(cls, conn: sqlite3.Connection, **filters: Any) -> int:
"""Delete all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
"""
sc = cls._prepare_where(conn, **filters)
sc.execute_delete()
return sc.cursor.rowcount
@classmethod
def _from_row(cls, row: tuple[Any, ...], fields: list[FieldInfo]) -> T:
assert len(row) == len(fields)
return cast(
T,
cls.model_validate({fi.name: fi.from_sql_value(row[i]) for i, fi in enumerate(fields)}),
)
@classmethod
def select_count(cls, conn: sqlite3.Connection, **filters: Any) -> int:
sc = cls._prepare_where(conn, **filters)
sc.execute_select("COUNT(*)")
return sc.cursor.fetchone()[0]
@classmethod
def exists(cls, conn: sqlite3.Connection, **filters: Any) -> bool:
return cls.select_count(conn, **filters) > 0
@classmethod
def load_by_id(cls: type[T], conn: sqlite3.Connection, id: int) -> T | None:
rr: list[T] = cls.select(conn, **{cls._ensure_pk().name: id}) # pyright: ignore[reportUnknownVariableType]
assert len(rr) <= 1
return rr[0] if rr else None # pyright: ignore[reportUnknownVariableType]
# Alternative key support
_alt_key_cache: ClassVar[AltKey | None] = None
_alt_key_initialized: ClassVar[bool] = False
@classmethod
def _get_alt_key(cls) -> AltKey | None:
if not cls._alt_key_initialized:
from lythonic.state.alt_key import AltKey
cls._alt_key_cache = AltKey.from_model(cls)
cls._alt_key_initialized = True
return cls._alt_key_cache
@classmethod
def _ensure_alt_key(cls) -> AltKey:
ak = cls._get_alt_key()
assert ak is not None, f"{cls.get_table_name()} has no alternative key defined"
return ak
@classmethod
def resolve_ak(cls, conn: sqlite3.Connection, **ak_values: Any) -> int | None:
"""Resolve alternative key values to the integer PK."""
return cls._ensure_alt_key().resolve(conn, **ak_values)
@classmethod
def load_by_ak(cls: type[T], conn: sqlite3.Connection, **ak_values: Any) -> T | None:
"""Resolve AK values and load the record."""
pk = cls.resolve_ak(conn, **ak_values)
if pk is None:
return None
return cls.load_by_id(conn, pk) # pyright: ignore[reportUnknownVariableType]
def to_ak_dict(self, conn: sqlite3.Connection) -> dict[str, Any]:
"""Serialize this record's alternative key values."""
return self.__class__._ensure_alt_key().to_ak_dict(conn, self)
@classmethod
def to_ak_dicts(
cls, conn: sqlite3.Connection, records: Sequence[DbModel[Any]]
) -> list[dict[str, Any]]:
"""Batch serialize alternative key values for multiple records."""
return cls._ensure_alt_key().to_ak_dicts(conn, records)
|
get_pk_filter()
Returns: primary key filter and whether it is defined
Source code in src/lythonic/state/__init__.py
| def get_pk_filter(self) -> tuple[dict[str, Any], bool]:
"""
Returns: primary key filter and whether it is defined
"""
cls = self.__class__
pk = cls._ensure_pk()
pk_val = getattr(self, pk.name)
return {pk.name: pk_val}, pk_val != -1
|
select(conn, **filters)
classmethod
Select all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
Source code in src/lythonic/state/__init__.py
| @classmethod
def select(cls: type[T], conn: sqlite3.Connection, **filters: Any) -> list[T]:
"""Select all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
"""
sc = cls._prepare_where(conn, **filters)
sc.execute_select()
return [cls._from_row(row, sc.fields()) for row in sc.cursor.fetchall()]
|
delete(conn, **filters)
classmethod
Delete all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
Source code in src/lythonic/state/__init__.py
| @classmethod
def delete(cls, conn: sqlite3.Connection, **filters: Any) -> int:
"""Delete all rows from the table that match the filters.
Filters are given as keyword arguments, the keys are the field names
and the values are the values to filter by.
"""
sc = cls._prepare_where(conn, **filters)
sc.execute_delete()
return sc.cursor.rowcount
|
resolve_ak(conn, **ak_values)
classmethod
Resolve alternative key values to the integer PK.
Source code in src/lythonic/state/__init__.py
| @classmethod
def resolve_ak(cls, conn: sqlite3.Connection, **ak_values: Any) -> int | None:
"""Resolve alternative key values to the integer PK."""
return cls._ensure_alt_key().resolve(conn, **ak_values)
|
load_by_ak(conn, **ak_values)
classmethod
Resolve AK values and load the record.
Source code in src/lythonic/state/__init__.py
| @classmethod
def load_by_ak(cls: type[T], conn: sqlite3.Connection, **ak_values: Any) -> T | None:
"""Resolve AK values and load the record."""
pk = cls.resolve_ak(conn, **ak_values)
if pk is None:
return None
return cls.load_by_id(conn, pk) # pyright: ignore[reportUnknownVariableType]
|
to_ak_dict(conn)
Serialize this record's alternative key values.
Source code in src/lythonic/state/__init__.py
| def to_ak_dict(self, conn: sqlite3.Connection) -> dict[str, Any]:
"""Serialize this record's alternative key values."""
return self.__class__._ensure_alt_key().to_ak_dict(conn, self)
|
to_ak_dicts(conn, records)
classmethod
Batch serialize alternative key values for multiple records.
Source code in src/lythonic/state/__init__.py
| @classmethod
def to_ak_dicts(
cls, conn: sqlite3.Connection, records: Sequence[DbModel[Any]]
) -> list[dict[str, Any]]:
"""Batch serialize alternative key values for multiple records."""
return cls._ensure_alt_key().to_ak_dicts(conn, records)
|
Schema
Source code in src/lythonic/state/__init__.py
| class Schema:
tables: list[type[DbModel[Any]]]
table_map: dict[str, type[DbModel[Any]]]
def __init__(self, tables: list[type[DbModel[Any]]]):
self.tables = tables
self.table_map = {t.get_table_name(): t for t in tables}
self._validate_alt_keys()
def _validate_alt_keys(self) -> None:
"""Validate AK-FK consistency across all tables."""
from lythonic.state.alt_key import AltKey
for table in self.tables:
ak = AltKey.from_model(table)
if ak is None:
continue
table_name = table.get_table_name()
for field in ak.fields:
if field.foreign_key is None:
continue
ref_table_name, _ref_field = field.foreign_key
assert ref_table_name != table_name, (
f"{table_name}.{field.name}: self-referential FK in AK is disallowed"
)
assert ref_table_name in self.table_map, (
f"{table_name}.{field.name}: FK references '{ref_table_name}' "
f"which is not in schema"
)
ref_ak = AltKey.from_model(self.table_map[ref_table_name])
assert ref_ak is not None, (
f"{table_name}.{field.name}: FK references '{ref_table_name}' "
f"which has no alternative key"
)
# Build cascade chains now that validation passed
for table in self.tables:
ak = AltKey.from_model(table)
if ak is not None:
ak.build_chain(self.table_map)
table._alt_key_cache = ak # pyright: ignore[reportPrivateUsage]
table._alt_key_initialized = True # pyright: ignore[reportPrivateUsage]
def check_all_tables_exist(self, conn: sqlite3.Connection):
cursor = conn.cursor()
execute_sql(cursor, "SELECT name FROM sqlite_master WHERE type='table' ")
all_tables = set(r[0] for r in cursor.fetchall())
return all(t.get_table_name() in all_tables for t in self.tables)
def create_tables(self, conn: sqlite3.Connection):
cursor = conn.cursor()
for table in self.tables:
execute_sql(cursor, table.create_ddl())
conn.commit()
def create_schema(self, path: Path):
with open_sqlite_db(path) as conn:
self.create_tables(conn)
|
DbFile
Source code in src/lythonic/state/__init__.py
| class DbFile:
path: Path
schema: Schema
def __init__(self, db_path: Path | str, schema: Schema) -> None:
self.path = Path(db_path)
self.schema = schema
def check(self, ensure: bool = False) -> bool:
if not self.path.exists():
if ensure:
with self.open() as conn:
self.schema.create_tables(conn)
return True
return False
else:
# TODO : migrate db if necessary
with self.open() as conn:
if not self.schema.check_all_tables_exist(conn):
if not ensure:
return False
self.schema.create_tables(conn)
return True
return False
def open(self):
return open_sqlite_db(self.path)
|
FieldInfo
Bases: NamedTuple
Source code in src/lythonic/state/__init__.py
| class FieldInfo(NamedTuple):
name: str
ktype: KnownType
description: str
nullable: bool
primary_key: bool
foreign_key: tuple[str, str] | None
fixed_choices: list[Any] | None # For enum types and literal types
alt_key: bool
@classmethod
def build(cls, name: str, field_info: Any) -> FieldInfo:
assert field_info.annotation is not None, f"Field {name} has no annotation"
ann = field_info.annotation
type_: type[Any]
fixed_choices: list[Any] | None = None
if isinstance(ann, type):
type_ = ann
is_nullable = False
else:
origin = get_origin(ann)
if origin is UnionType:
args = get_args(ann)
nones = [arg for arg in args if arg is NoneType]
valid_types = [arg for arg in args if arg is not NoneType]
assert len(valid_types) == 1 and len(nones) == 1, (
f"Union {ann} must have exactly 2 types and one of them must be None"
)
type_ = valid_types[0]
is_nullable = True
elif origin is Literal:
args = get_args(ann)
nones = [arg is NoneType or arg is None for arg in args]
fixed_choices = [a for a, n in zip(args, nones, strict=True) if not n]
is_nullable = any(nones)
types: set[type[Any]] = {type(c) for c in fixed_choices}
assert len(types) == 1, f"Literal {ann} must have only one type"
type_ = types.pop()
else:
raise AssertionError(f"Unknown field annotation type: {ann}")
if issubclass(type_, Enum):
fixed_choices = list(type_)
description = field_info.description or ""
is_primary_key = description.startswith("(PK)")
if is_primary_key:
description = description[4:].strip()
assert not (is_primary_key and is_nullable), (
"A field cannot be both a primary key and nullable"
)
is_foreign_key = description.startswith("(FK:")
if is_foreign_key:
x, description = description[4:].strip().split(")", 1)
table_name, field_name = x.split(".")
foreign_key = table_name, field_name
else:
foreign_key = None
is_alt_key = description.startswith("(AK)")
if is_alt_key:
description = description[4:].strip()
assert not (is_alt_key and is_nullable), (
f"Field {name} cannot be both an alternative key and nullable"
)
return cls(
name=name,
ktype=KnownType.ensure(type_),
description=description,
nullable=is_nullable,
primary_key=is_primary_key,
foreign_key=foreign_key,
fixed_choices=fixed_choices,
alt_key=is_alt_key,
)
def check_constraint_ddl(self) -> str:
if self.fixed_choices is not None:
return f" CHECK ({self.name} IN ({', '.join([repr(self.ktype.db.map_to(c)) for c in self.fixed_choices])}))"
return ""
def to_sql_value(self, o: DbModel[T]) -> Any:
v = getattr(o, self.name)
return self.ktype.db.map_to(v)
def from_sql_value(self, v: Any) -> Any:
return self.ktype.db.map_from(v)
def set_value(self, o: DbModel[T], v: Any):
setattr(o, self.name, self.from_sql_value(v))
|
open_sqlite_db(db_name)
Open or create SQLite database with given name.
Parameters:
| Name |
Type |
Description |
Default |
db_name
|
str | Path
|
Path or name of the SQLite database file
|
required
|
Yields:
| Type |
Description |
|
|
sqlite3.Connection: Database connection object
|
Source code in src/lythonic/state/__init__.py
| @contextmanager
def open_sqlite_db(db_name: str | Path):
"""Open or create SQLite database with given name.
Args:
db_name: Path or name of the SQLite database file
Yields:
sqlite3.Connection: Database connection object
"""
logger.info(f"Opening database {db_name}")
conn = sqlite3.connect(db_name)
try:
yield conn
finally:
conn.close()
|
execute_sql(cursor, sql, *args)
Source code in src/lythonic/state/__init__.py
| def execute_sql(cursor: sqlite3.Cursor, sql: str, *args: Any):
logger.debug(f"execute: {sql}" + (f" -- with args: {args}" if len(args) > 0 else ""))
cursor.execute(sql, *args)
|
from_multi_model_row(row, models)
Source code in src/lythonic/state/__init__.py
| def from_multi_model_row(
row: tuple[Any, ...], models: list[type[DbModel[Any]]]
) -> Generator[DbModel[Any], None, None]:
start = 0
for model in models:
fields = list(model.get_field_infos())
yield model._from_row(row[start : start + len(fields)], fields) # pyright: ignore [reportPrivateUsage]
start += len(fields)
|
to_sql_datetime(dt)
Source code in src/lythonic/state/__init__.py
| def to_sql_datetime(dt: datetime) -> str:
return dt.isoformat()
|