Skip to content

StrictModel

StrictModel keeps the same ORM API as Model, but tightens runtime behavior.

What changes compared to Model

StrictModel adds two important guarantees:

  • scalar field assignments are validated immediately
  • undeclared public attributes raise AttributeError

That makes it a good fit when you want Saffier's ORM without the looser Python object behavior of the default model class.

Practical example

# Example model built on StrictModel instead of the default Model class.
class Product(saffier.StrictModel):
    id = saffier.IntegerField(primary_key=True, autoincrement=True)
    name = saffier.CharField(max_length=100)
    # Declared with bounds 1..5; the out-of-range assignment below is rejected.
    rating = saffier.IntegerField(minimum=1, maximum=5, default=1)

    class Meta:
        # NOTE(review): `models` is presumably a registry created elsewhere
        # in the application; it is not defined in this snippet.
        registry = models


# Construction with declared fields and an in-range rating.
product = Product(name="Tea", rating=5)
product.rating = 6  # raises ValidationError
product.unknown = "value"  # raises AttributeError

What it does not change

StrictModel does not change query semantics, registry behavior, relation loading, or migration output. It is a runtime discipline layer over the normal Saffier model lifecycle.

saffier.StrictModel

StrictModel(*model_refs, **kwargs)

Bases: Model

Model variant that validates assignments and forbids ad-hoc public attributes.

StrictModel is useful when you want the Saffier ORM surface but prefer a more defensive runtime model that rejects undeclared attributes and validates scalar assignments immediately instead of waiting until persistence time.

Source code in saffier/core/db/models/base.py
76
77
78
79
80
81
82
def __init__(self, *model_refs: Any, **kwargs: Any) -> None:
    """Seed per-instance bookkeeping, then assign the constructor payload.

    Bookkeeping entries are written straight into the instance ``__dict__``
    so they do not pass through attribute assignment.

    Args:
        *model_refs: Positional arguments, forwarded unchanged to
            ``setup_model_fields_from_kwargs``.
        **kwargs: Keyword payload, forwarded unchanged as well.
    """
    state = self.__dict__
    # Copy the class-level set so each instance owns an independent one.
    class_level = getattr(self.__class__, "__no_load_trigger_attrs__", set())
    state["__no_load_trigger_attrs__"] = set(class_level)
    state["_db_deleted"] = False
    state["transaction"] = functools.partial(self._instance_transaction)
    self.setup_model_fields_from_kwargs(model_refs, kwargs)

is_proxy_model class-attribute

is_proxy_model = False

query class-attribute

query = Manager()
query_related = RedirectManager(redirect_name='query')

meta class-attribute

meta = MetaInfo(None)

__db_model__ class-attribute

__db_model__ = False

__raw_query__ class-attribute

__raw_query__ = None

__proxy_model__ class-attribute

__proxy_model__ = None

__no_load_trigger_attrs__ class-attribute

__no_load_trigger_attrs__ = set()

__using_schema__ class-attribute

__using_schema__ = None

__require_model_based_deletion__ class-attribute

__require_model_based_deletion__ = False

__deletion_with_signals__ class-attribute

__deletion_with_signals__ = False

__skip_generic_reverse_delete__ class-attribute

__skip_generic_reverse_delete__ = False

pk property writable

pk

Return the model primary key in scalar or mapping form.

Single-column primary keys are returned as the raw value. Composite primary keys are returned as a mapping from logical field name to value, unless every component is None, in which case None is returned.

raw_query property writable

raw_query

table property writable

table

proxy_model cached property

proxy_model

signals cached property

signals

identifying_db_fields cached property

identifying_db_fields

Returns the database columns that uniquely identify the current instance.

Saffier currently uses the model primary key columns for this.

can_load property

can_load

Return whether the instance can be reloaded from the database.

The instance must belong to a concrete registered model and expose all identifying database fields.

copy_model class-attribute instance-attribute

copy_model = copy_saffier_model

copy_edgy_model class-attribute instance-attribute

copy_edgy_model = copy_saffier_model

Meta

abstract class-attribute instance-attribute

abstract = True

registry class-attribute instance-attribute

registry = False

get_admin_marshall_config classmethod

get_admin_marshall_config(*, phase, for_schema)
Source code in saffier/core/db/models/mixins/admin.py
12
13
14
15
16
17
18
19
20
21
22
@classmethod
def get_admin_marshall_config(
    cls: type[Model], *, phase: str, for_schema: bool
) -> dict[str, Any]:
    """Build the marshall configuration used for one admin phase.

    Args:
        phase: Admin phase name; ``"create"`` and ``"update"`` are the
            values that change the resulting flags.
        for_schema: Accepted but not consulted by this implementation.

    Returns:
        dict[str, Any]: Keyword options for the marshall configuration.
    """
    del for_schema
    creating = phase == "create"
    config: dict[str, Any] = {"fields": ["__all__"]}
    config["exclude_read_only"] = phase in {"create", "update"}
    config["primary_key_read_only"] = phase != "create"
    config["exclude_autoincrement"] = creating
    return config

get_admin_marshall_class classmethod

get_admin_marshall_class(*, phase, for_schema=False)
Source code in saffier/core/db/models/mixins/admin.py
24
25
26
27
28
29
30
31
32
33
34
@classmethod
def get_admin_marshall_class(
    cls: type[Model], *, phase: str, for_schema: bool = False
) -> type[marshalls.Marshall]:
    """Create a marshall class for this model and the given admin phase.

    Args:
        phase: Admin phase name, forwarded to `get_admin_marshall_config`
            and used to build the generated class name.
        for_schema: Forwarded to `get_admin_marshall_config`.

    Returns:
        type[marshalls.Marshall]: A new `Marshall` subclass whose
        `marshall_config` targets this model.
    """
    config = cls.get_admin_marshall_config(phase=phase, for_schema=for_schema)

    # A fresh subclass is defined on every call; nothing is cached here.
    class AdminMarshall(marshalls.Marshall):
        marshall_config = marshalls.ConfigMarshall(model=cls, **config)

    # Rename so the class reads as e.g. "<Model>CreateAdminMarshall".
    AdminMarshall.__name__ = f"{cls.__name__}{phase.title()}AdminMarshall"
    return AdminMarshall

get_admin_marshall_for_save classmethod

get_admin_marshall_for_save(instance=None, /, **kwargs)
Source code in saffier/core/db/models/mixins/admin.py
36
37
38
39
40
41
42
@classmethod
def get_admin_marshall_for_save(
    cls: type[Model], instance: Model | None = None, /, **kwargs: Any
) -> marshalls.Marshall:
    """Instantiate the admin marshall used to save a model.

    Args:
        instance: Existing model to update, or ``None`` to create one.
        **kwargs: Extra keyword arguments passed to the marshall class.

    Returns:
        marshalls.Marshall: Marshall built for the "create" phase when no
        instance is supplied, otherwise for the "update" phase.
    """
    if instance is None:
        phase = "create"
    else:
        phase = "update"
    marshall_cls = cls.get_admin_marshall_class(phase=phase, for_schema=False)
    marshall = marshall_cls(instance, **kwargs)
    return cast("marshalls.Marshall", marshall)

declarative classmethod

declarative()

Return the generated SQLAlchemy declarative model class.

RETURNS DESCRIPTION
Any

Declarative SQLAlchemy model created from the Saffier model definition.

TYPE: Any

Source code in saffier/core/db/models/mixins/generics.py
15
16
17
18
19
20
21
22
23
@classmethod
def declarative(cls) -> typing.Any:
    """Expose the SQLAlchemy declarative class generated for this model.

    This is a thin alias for `generate_model_declarative`.

    Returns:
        typing.Any: Whatever `generate_model_declarative` produces.
    """
    declarative_model = cls.generate_model_declarative()
    return declarative_model

generate_model_declarative classmethod

generate_model_declarative()

Transform the Saffier table into a SQLAlchemy declarative model.

RETURNS DESCRIPTION
Any

Declarative model class bound to the same SQLAlchemy table.

TYPE: Any

Source code in saffier/core/db/models/mixins/generics.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@classmethod
def generate_model_declarative(cls) -> typing.Any:
    """Transform the Saffier table into a SQLAlchemy declarative model.

    A new class named after the model is created on top of the registry's
    declarative base and bound to `cls.table`. For every column that
    carries foreign keys, a relationship property named
    `<column name>_relation` is added to the generated class's mapper.

    Returns:
        typing.Any: Declarative model class bound to the same SQLAlchemy
        table.
    """
    Base = cls.meta.registry.declarative_base

    # Build the original table
    fields = {"__table__": cls.table}

    # Generate base
    model_table = type(cls.__name__, (Base,), fields)

    # Make sure if there are foreignkeys, builds the relationships
    for column in cls.table.columns:
        if not column.foreign_keys:
            continue

        # Maps the relationships with the foreign keys and related names
        # NOTE(review): assumes every foreign-key column name is also a key
        # of `cls.fields`; `.get` would return None otherwise and the next
        # line would fail on `field.to` - confirm.
        field = cls.fields.get(column.name)
        # `field.to` may be a class or a plain name; relationship() is given the name.
        to = field.to.__name__ if inspect.isclass(field.to) else field.to
        mapped_model: Mapped[to] = relationship(to)  # type: ignore

        # Adds to the current model
        model_table.__mapper__.add_property(f"{column.name}_relation", mapped_model)

    return model_table

_update_auto_now_fields

_update_auto_now_fields(values, fields)

Refresh auto_now date and datetime fields inside a payload.

RETURNS DESCRIPTION
Any

Updated payload dictionary.

TYPE: Any

Source code in saffier/core/utils/models.py
29
30
31
32
33
34
35
36
37
38
def _update_auto_now_fields(self, values: Any, fields: Any) -> Any:
    """Overwrite `auto_now` date/datetime entries of a payload in place.

    Args:
        values: Mutable payload keyed by field name.
        fields: Mapping of field name to field object.

    Returns:
        Any: The same `values` object after the update.
    """
    for field_name, field in fields.items():
        if not isinstance(field, (DateField, DateTimeField)):
            continue
        if field.auto_now:
            values[field_name] = field.validator.get_default_value()  # type: ignore
    return values

_resolve_value

_resolve_value(value)

Normalize one value for storage or query generation.

Dictionaries are JSON-encoded, enums are converted to their member names, and every other value is returned unchanged.

PARAMETER DESCRIPTION
value

Raw value supplied by model or queryset code.

TYPE: Any

RETURNS DESCRIPTION
Any

typing.Any: Normalized value suitable for downstream processing.

Source code in saffier/core/utils/models.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def _resolve_value(self, value: typing.Any) -> typing.Any:
    """Normalize a single value for storage or query generation.

    Args:
        value (typing.Any): Raw value supplied by model or queryset code.

    Returns:
        typing.Any: A JSON string for dictionaries, the member name for
        enums, and the untouched value for anything else.
    """
    if isinstance(value, dict):
        encoded = dumps(
            value,
            option=OPT_SERIALIZE_NUMPY | OPT_OMIT_MICROSECONDS,
        )
        return encoded.decode("utf-8")
    return value.name if isinstance(value, Enum) else value

_is_model_ref_instance staticmethod

_is_model_ref_instance(value)
Source code in saffier/core/db/models/base.py
84
85
86
87
88
89
90
@staticmethod
def _is_model_ref_instance(value: Any) -> bool:
    """Tell whether `value` looks like a model reference object.

    The check is structural: the object must expose `model_dump` and its
    class must define both `__model_ref_fields__` and `__related_name__`.
    """
    if not hasattr(value, "model_dump"):
        return False
    value_type = value.__class__
    if not hasattr(value_type, "__model_ref_fields__"):
        return False
    return hasattr(value_type, "__related_name__")

resolve_model_ref_field_name classmethod

resolve_model_ref_field_name(ref_type)

Return the RefForeignKey field that accepts ref_type references.

PARAMETER DESCRIPTION
ref_type

Concrete ModelRef subclass being supplied positionally.

TYPE: type[Any]

RETURNS DESCRIPTION
str

Name of the field that can store references of this type.

TYPE: str

RAISES DESCRIPTION
ModelReferenceError

If no RefForeignKey on the model accepts the provided reference type.

Source code in saffier/core/db/models/base.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@classmethod
def resolve_model_ref_field_name(cls, ref_type: type[Any]) -> str:
    """Find the field whose `model_ref` accepts the given reference type.

    Fields are scanned in `cls.fields` order and the first match wins.

    Args:
        ref_type: Reference class being supplied positionally.

    Returns:
        str: Name of the first field whose `model_ref` is a base of (or
        equal to) `ref_type`.

    Raises:
        ModelReferenceError: If no field accepts the provided type.
    """
    for candidate_name, candidate in cls.fields.items():
        accepted = getattr(candidate, "model_ref", None)
        if accepted is None:
            continue
        if issubclass(ref_type, accepted):
            return candidate_name

    message = (
        f"No RefForeignKey on '{cls.__name__}' accepts model references of type "
        f"'{ref_type.__name__}'."
    )
    raise saffier.ModelReferenceError(detail=message)

merge_model_refs classmethod

merge_model_refs(model_refs, kwargs=None)

Merge positional ModelRef objects into a keyword payload.

Saffier reserves positional constructor arguments for model reference objects so callers can write Article(tag_ref, author_ref, title="...") and still end up with a normal keyword payload keyed by the owning RefForeignKey fields.

PARAMETER DESCRIPTION
model_refs

Positional constructor arguments.

TYPE: Sequence[Any]

kwargs

Existing keyword payload.

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

Normalized keyword payload including merged model references.

TYPE: dict[str, Any]

RAISES DESCRIPTION
TypeError

If any positional argument is not a model reference instance.

Source code in saffier/core/db/models/base.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@classmethod
def merge_model_refs(
    cls,
    model_refs: Sequence[Any],
    kwargs: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Merge positional `ModelRef` objects into a keyword payload.

    Saffier reserves positional constructor arguments for model reference
    objects so callers can write `Article(tag_ref, author_ref, title="...")`
    and still end up with a normal keyword payload keyed by the owning
    `RefForeignKey` fields.

    Args:
        model_refs: Positional constructor arguments.
        kwargs: Existing keyword payload.

    Returns:
        dict[str, Any]: Normalized keyword payload including merged model
        references.

    Raises:
        TypeError: If any positional argument is not a model reference
            instance.
    """
    payload = dict(kwargs or {})
    if not model_refs:
        return payload

    for model_ref in model_refs:
        if not cls._is_model_ref_instance(model_ref):
            raise TypeError("Positional arguments are reserved for ModelRef instances.")

        field_name = cls.resolve_model_ref_field_name(model_ref.__class__)
        existing = payload.get(field_name)
        if existing is None:
            payload[field_name] = [model_ref]
        elif isinstance(existing, Sequence) and not isinstance(
            existing, (str, bytes, bytearray)
        ):
            payload[field_name] = [*existing, model_ref]
        else:
            payload[field_name] = [existing, model_ref]
    return payload

setup_model_fields_from_kwargs

setup_model_fields_from_kwargs(model_refs, kwargs)

Normalize constructor input and assign values onto the instance.

Positional ModelRef arguments are merged into the keyword payload, composite or virtual inputs are expanded through field hooks, and each resulting value is assigned using normal model attribute semantics so relation-aware setters still run.

PARAMETER DESCRIPTION
model_refs

Positional model-reference objects.

TYPE: Sequence[Any]

kwargs

Raw keyword payload supplied to the constructor.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
Any

Normalized payload after assignment.

TYPE: Any

RAISES DESCRIPTION
ValueError

If the payload contains unknown public attributes.

Source code in saffier/core/db/models/base.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def setup_model_fields_from_kwargs(
    self,
    model_refs: Sequence[Any],
    kwargs: dict[str, Any],
) -> Any:
    """Normalize constructor input and assign each value on the instance.

    The payload is first merged with positional references and passed
    through `normalize_field_kwargs`. A `pk` entry is assigned first and
    removed from the payload. Every other entry goes through `setattr`,
    after plain-field validation where it applies.

    Args:
        model_refs: Positional model-reference objects.
        kwargs: Raw keyword payload supplied to the constructor.

    Returns:
        Any: The normalized payload, holding the values that were assigned.

    Raises:
        ValueError: If a key is neither a declared field nor an existing
            attribute of the instance.
    """
    model_cls = self.__class__
    payload = model_cls.merge_model_refs(model_refs, kwargs)
    payload = model_cls.normalize_field_kwargs(payload)

    if "pk" in payload:
        self.pk = payload.pop("pk")

    # Iterate over a snapshot of the keys; only values are rewritten below.
    for name in list(payload):
        candidate = payload[name]
        recognised = name in self.fields or hasattr(self, name)
        if not recognised:
            raise ValueError(f"Invalid keyword {name} for class {self.__class__.__name__}")

        if name in self.get_plain_model_fields():
            candidate = self.validate_plain_field_value(name, candidate)

        # Assign through normal attribute semantics and record the final value.
        setattr(self, name, candidate)
        payload[name] = candidate
    return payload

get_plain_model_fields classmethod

get_plain_model_fields()
Source code in saffier/core/db/models/base.py
202
203
204
@classmethod
def get_plain_model_fields(cls) -> dict[str, dict[str, Any]]:
    """Return the class's `__plain_model_fields__` mapping, or `{}` if unset."""
    plain_fields = getattr(cls, "__plain_model_fields__", {})
    return cast("dict[str, dict[str, Any]]", plain_fields)

_matches_plain_annotation classmethod

_matches_plain_annotation(annotation, value)
Source code in saffier/core/db/models/base.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@classmethod
def _matches_plain_annotation(cls, annotation: Any, value: Any) -> bool:
    del cls
    if annotation in (Any, None):
        return True

    origin = get_origin(annotation)
    if origin is None:
        if annotation is type(None):
            return value is None
        if isinstance(annotation, type):
            if annotation is int and isinstance(value, bool):
                return False
            return isinstance(value, annotation)
        return True

    if str(origin) in {"typing.Union", "types.UnionType"}:
        return any(
            SaffierBaseModel._matches_plain_annotation(arg, value)
            for arg in get_args(annotation)
        )

    if origin in (list, set, tuple, dict):
        return isinstance(value, origin)

    if origin in (Sequence,):
        return isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray))

    if isinstance(origin, type):
        return isinstance(value, origin)
    return True

validate_plain_field_value classmethod

validate_plain_field_value(key, value)
Source code in saffier/core/db/models/base.py
238
239
240
241
242
243
244
245
246
@classmethod
def validate_plain_field_value(cls, key: str, value: Any) -> Any:
    """Validate a value destined for a plain annotated attribute.

    Args:
        key: Attribute name.
        value: Candidate value.

    Returns:
        Any: The value unchanged when `key` is not a plain model field or
        when it matches the stored annotation.

    Raises:
        ValidationError: If the value does not match the annotation.
    """
    spec = cls.get_plain_model_fields().get(key)
    if spec is not None and not cls._matches_plain_annotation(
        spec.get("annotation"), value
    ):
        raise ValidationError(text=f"Invalid value for '{key}'.", code="type")
    return value

normalize_field_kwargs classmethod

normalize_field_kwargs(kwargs=None)

Normalize constructor or persistence payloads through field hooks.

Every field may rewrite or expand incoming values before validation. This is how composite fields, foreign keys, and other virtual helpers expose a convenient high-level API while still producing a concrete payload.

PARAMETER DESCRIPTION
kwargs

Input payload keyed by logical field name.

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: Normalized payload.

Source code in saffier/core/db/models/base.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
@classmethod
def normalize_field_kwargs(cls, kwargs: dict[str, Any] | None = None) -> dict[str, Any]:
    """Normalize constructor or persistence payloads through field hooks.

    Every field may rewrite or expand incoming values before validation. This
    is how composite fields, foreign keys, and other virtual helpers expose a
    convenient high-level API while still producing a concrete payload.

    Args:
        kwargs: Input payload keyed by logical field name.

    Returns:
        dict[str, Any]: Normalized payload.
    """
    payload = dict(kwargs or {})
    for field_name, field in cls.fields.items():
        field.modify_input(field_name, payload)
    return payload

get_model_engine_name classmethod

get_model_engine_name()

Return the configured engine name for the model, if any.

RETURNS DESCRIPTION
str | None

str | None: Explicit model engine name, inherited registry default, or None when engine support is disabled.

Source code in saffier/core/db/models/base.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
@classmethod
def get_model_engine_name(cls) -> str | None:
    """Return the configured engine name for the model, if any.

    Returns:
        str | None: Explicit model engine name, inherited registry default,
            or `None` when engine support is disabled.
    """
    configured = getattr(cls.meta, "model_engine", None)
    if configured is False:
        return None
    if configured is None:
        configured = getattr(getattr(cls.meta, "registry", None), "model_engine", None)
    if configured in (None, False):
        return None
    if isinstance(configured, str):
        return configured
    return cast("str", getattr(configured, "name", None))

get_model_engine classmethod

get_model_engine()

Resolve and return the configured engine adapter for the model.

Source code in saffier/core/db/models/base.py
286
287
288
289
290
291
292
293
294
@classmethod
def get_model_engine(cls) -> "ModelEngine | None":
    """Resolve the engine adapter configured for the model.

    `meta.model_engine` set to `False` yields `None` immediately. When it
    is `None`, the registry's `model_engine` is used instead. The resulting
    value is passed to `resolve_model_engine`.
    """
    meta = cls.meta
    engine_setting = getattr(meta, "model_engine", None)
    if engine_setting is False:
        return None
    if engine_setting is None:
        registry = getattr(meta, "registry", None)
        engine_setting = getattr(registry, "model_engine", None)
    return resolve_model_engine(engine_setting)

require_model_engine classmethod

require_model_engine()

Return the model engine or raise when none is configured.

Source code in saffier/core/db/models/base.py
296
297
298
299
300
301
302
@classmethod
def require_model_engine(cls) -> "ModelEngine":
    """Return the model engine, failing loudly when none is configured.

    Raises:
        ImproperlyConfigured: If `get_model_engine` returns `None`.
    """
    if (resolved := cls.get_model_engine()) is not None:
        return resolved
    raise ImproperlyConfigured(f"Model '{cls.__name__}' has no model engine configured.")

get_engine_model_class classmethod

get_engine_model_class(*, mode='projection')

Return the engine-backed class for the model.

Source code in saffier/core/db/models/base.py
304
305
306
307
@classmethod
def get_engine_model_class(cls, *, mode: str = "projection") -> type[Any]:
    """Ask the required model engine for its class for this model.

    Args:
        mode: Passed through to the engine's `get_model_class`.
    """
    engine = cls.require_model_engine()
    return engine.get_model_class(cls, mode=mode)

engine_validate classmethod

engine_validate(value, *, mode='validation')

Validate external data through the configured engine adapter.

Source code in saffier/core/db/models/base.py
309
310
311
312
@classmethod
def engine_validate(cls, value: Any, *, mode: str = "validation") -> Any:
    """Validate external data with the required model engine.

    Args:
        value: Data handed to the engine's `validate_model`.
        mode: Passed through to the engine.
    """
    engine = cls.require_model_engine()
    return engine.validate_model(cls, value, mode=mode)

from_engine classmethod

from_engine(value, *, exclude_unset=True)

Build a Saffier model instance from an engine-backed value.

PARAMETER DESCRIPTION
value

Engine-backed object or input payload accepted by the engine.

TYPE: Any

exclude_unset

Whether engine defaults absent from the original input should be omitted from the Saffier constructor payload.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Self

New Saffier model instance.

TYPE: Self

Source code in saffier/core/db/models/base.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
@classmethod
def from_engine(cls, value: Any, *, exclude_unset: bool = True) -> "Self":
    """Construct a model instance from an engine-backed value.

    The required engine converts `value` into a keyword payload through
    `to_saffier_data`; the model is then instantiated with that payload.

    Args:
        value: Engine-backed object or input payload accepted by the engine.
        exclude_unset: Forwarded to the engine's `to_saffier_data`.

    Returns:
        Self: New instance of `cls`.
    """
    engine = cls.require_model_engine()
    constructor_kwargs = engine.to_saffier_data(cls, value, exclude_unset=exclude_unset)
    return cls(**constructor_kwargs)

engine_json_schema classmethod

engine_json_schema(*, mode='projection', **kwargs)

Return the engine-generated JSON schema for the model.

Source code in saffier/core/db/models/base.py
333
334
335
336
@classmethod
def engine_json_schema(cls, *, mode: str = "projection", **kwargs: Any) -> dict[str, Any]:
    """Fetch the JSON schema produced by the required model engine.

    Args:
        mode: Passed through to the engine's `json_schema`.
        **kwargs: Additional keyword arguments forwarded to the engine.
    """
    engine = cls.require_model_engine()
    return engine.json_schema(cls, mode=mode, **kwargs)

to_engine_model

to_engine_model(
    *, include=None, exclude=None, exclude_none=False
)

Project the current instance into the configured engine model.

Source code in saffier/core/db/models/base.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def to_engine_model(
    self,
    *,
    include: EngineIncludeExclude = None,
    exclude: EngineIncludeExclude = None,
    exclude_none: bool = False,
) -> Any:
    """Project this instance through the required engine's `project_model`.

    Args:
        include: Forwarded to the engine.
        exclude: Forwarded to the engine.
        exclude_none: Forwarded to the engine.
    """
    engine = type(self).require_model_engine()
    options = {"include": include, "exclude": exclude, "exclude_none": exclude_none}
    return engine.project_model(self, **options)

engine_dump

engine_dump(
    *, include=None, exclude=None, exclude_none=False
)

Serialize the engine-backed projection of the current instance.

Source code in saffier/core/db/models/base.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def engine_dump(
    self,
    *,
    include: EngineIncludeExclude = None,
    exclude: EngineIncludeExclude = None,
    exclude_none: bool = False,
) -> dict[str, Any]:
    """Serialize this instance through the required engine's `dump_model`.

    Args:
        include: Forwarded to the engine.
        exclude: Forwarded to the engine.
        exclude_none: Forwarded to the engine.
    """
    engine = type(self).require_model_engine()
    options = {"include": include, "exclude": exclude, "exclude_none": exclude_none}
    return engine.dump_model(self, **options)

engine_dump_json

engine_dump_json(
    *, include=None, exclude=None, exclude_none=False
)

Serialize the engine-backed projection of the current instance to JSON.

Source code in saffier/core/db/models/base.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def engine_dump_json(
    self,
    *,
    include: EngineIncludeExclude = None,
    exclude: EngineIncludeExclude = None,
    exclude_none: bool = False,
) -> str:
    """Serialize this instance through the required engine's `dump_model_json`.

    Args:
        include: Forwarded to the engine.
        exclude: Forwarded to the engine.
        exclude_none: Forwarded to the engine.
    """
    engine = type(self).require_model_engine()
    options = {"include": include, "exclude": exclude, "exclude_none": exclude_none}
    return engine.dump_model_json(self, **options)

_persist_model_references async

_persist_model_references(field_names=None)

Persist staged RefForeignKey values after a successful save.

PARAMETER DESCRIPTION
field_names

Optional subset of reference fields to persist.

TYPE: set[str] | None DEFAULT: None

Source code in saffier/core/db/models/base.py
395
396
397
398
399
400
401
402
403
404
405
406
407
408
async def _persist_model_references(self, field_names: set[str] | None = None) -> None:
    """Persist staged `RefForeignKey` values after a successful save.

    Args:
        field_names: Optional subset of reference fields to persist.
    """
    for field_name, field in self.fields.items():
        if not getattr(field, "is_model_reference", False):
            continue
        if field_names is not None and field_name not in field_names:
            continue
        if field_name not in self.__dict__:
            continue
        await field.persist_references(self, self.__dict__[field_name])

_persist_related_fields async

_persist_related_fields(field_names=None)

Persist staged reverse-relation values after the model is saved.

PARAMETER DESCRIPTION
field_names

Optional subset of reverse relation names to flush.

TYPE: set[str] | None DEFAULT: None

Source code in saffier/core/db/models/base.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
async def _persist_related_fields(self, field_names: set[str] | None = None) -> None:
    """Persist staged reverse-relation values after the model is saved.

    Args:
        field_names: Optional subset of reverse relation names to flush.
    """
    for field_name in self.meta.related_fields:
        if field_names is not None and field_name not in field_names:
            continue
        if field_name not in self.__dict__:
            continue
        value = self.__dict__[field_name]
        save_related = getattr(value, "save_related", None)
        if callable(save_related):
            await save_related()

_normalize_identifier_value staticmethod

_normalize_identifier_value(value)
Source code in saffier/core/db/models/base.py
426
427
428
429
430
@staticmethod
def _normalize_identifier_value(value: Any) -> Any:
    """Reduce model-like values to their `pk`; return others unchanged.

    A value counts as model-like when it has a `__db_model__` attribute.
    """
    is_model_like = hasattr(value, "__db_model__")
    return getattr(value, "pk", None) if is_model_like else value

_pk_values

_pk_values()
Source code in saffier/core/db/models/base.py
432
433
434
435
436
def _pk_values(self) -> dict[str, Any]:
    """Map each name in the class's `pknames` to its normalized value.

    Attributes missing on the instance are reported as `None`.
    """
    collected: dict[str, Any] = {}
    for name in type(self).pknames:
        raw = getattr(self, name, None)
        collected[name] = self._normalize_identifier_value(raw)
    return collected

_instance_transaction

_instance_transaction(*, force_rollback=False, **kwargs)
Source code in saffier/core/db/models/base.py
438
439
def _instance_transaction(self, *, force_rollback: bool = False, **kwargs: Any) -> Any:
    """Open a transaction on this instance's `database`.

    Args:
        force_rollback: Forwarded to `database.transaction`.
        **kwargs: Additional keyword arguments forwarded as-is.
    """
    database = self.database
    return database.transaction(force_rollback=force_rollback, **kwargs)

join_identifiers_to_string

join_identifiers_to_string(*, sep=', ', sep_inner='=')
Source code in saffier/core/db/models/base.py
490
491
492
493
494
495
def join_identifiers_to_string(self, *, sep: str = ", ", sep_inner: str = "=") -> str:
    """Render the identifying fields as a `name=value` style string.

    Args:
        sep: Separator placed between field/value pairs.
        sep_inner: Separator placed between a field name and its value.
    """
    normalize = self._normalize_identifier_value
    rendered = []
    for field_name in self.identifying_db_fields:
        identifier = normalize(getattr(self, field_name, None))
        rendered.append(f"{field_name}{sep_inner}{identifier}")
    return sep.join(rendered)

get_real_class classmethod

get_real_class()
Source code in saffier/core/db/models/base.py
521
522
523
524
525
@classmethod
def get_real_class(cls) -> type["Model"]:
    """Return `cls.parent` for proxy models, otherwise `cls` itself.

    A class is treated as a proxy when its `is_proxy_model` is truthy.
    """
    is_proxy = getattr(cls, "is_proxy_model", False)
    return cast("type[Model]", cls.parent if is_proxy else cls)

create_model_key

create_model_key()

Create a stable identity tuple for recursion guards.

RETURNS DESCRIPTION
tuple[Any, ...]

Tuple containing the model class and identifying values.

TYPE: tuple[Any, ...]

Source code in saffier/core/db/models/base.py
553
554
555
556
557
558
559
560
561
562
563
564
565
566
def create_model_key(self) -> tuple[Any, ...]:
    """Build an identity tuple for this instance.

    Returns:
        tuple[Any, ...]: The instance's class followed by the normalized
        value of each identifying field, in `identifying_db_fields` order.
    """
    key_parts: list[Any] = [self.__class__]
    for field_name in self.identifying_db_fields:
        raw = getattr(self, field_name, None)
        key_parts.append(self._normalize_identifier_value(raw))
    return tuple(key_parts)

identifying_clauses

identifying_clauses(*, table=None, fields=None)

Build equality clauses that uniquely identify the current instance.

PARAMETER DESCRIPTION
table

Optional table object to target instead of the active instance table.

TYPE: Table | None DEFAULT: None

fields

Optional subset of identifying fields to include.

TYPE: Sequence[str] | None DEFAULT: None

RETURNS DESCRIPTION
list[Any]

list[Any]: SQLAlchemy boolean expressions suitable for WHERE.

Source code in saffier/core/db/models/base.py
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
def identifying_clauses(
    self,
    *,
    table: sqlalchemy.Table | None = None,
    fields: Sequence[str] | None = None,
) -> list[Any]:
    """Build equality clauses that identify the current instance.

    Args:
        table: Optional table object to target instead of the instance's
            own `table`.
        fields: Optional subset of identifying fields to include. An empty
            or missing value falls back to `identifying_db_fields`.

    Returns:
        list[Any]: One `column == value` expression per field that has a
        matching column on the chosen table; fields without a matching
        column are skipped.
    """
    # Test for None explicitly instead of relying on truthiness: SQLAlchemy
    # clause elements do not define a boolean value, so `table or self.table`
    # can raise TypeError as soon as a table is actually passed in.
    active_table = self.table if table is None else table
    clauses = []
    for field_name in tuple(fields or self.identifying_db_fields):
        column = getattr(active_table.c, field_name, None)
        if column is None:
            continue
        clauses.append(
            column == self._normalize_identifier_value(getattr(self, field_name, None))
        )
    return clauses

_has_loaded_db_fields

_has_loaded_db_fields()
Source code in saffier/core/db/models/base.py
595
596
597
598
599
600
def _has_loaded_db_fields(self) -> bool:
    """Report whether every column-backed field has a value on the instance.

    Returns:
        bool: ``True`` when each field whose ``has_column()`` is truthy has an
        entry in the instance ``__dict__``; ``False`` at the first one missing.
    """
    for field_name, field in self.fields.items():
        if field.has_column() and field_name not in self.__dict__:
            return False
    return True

get_active_class_schema classmethod

get_active_class_schema()

Return the schema pinned directly on the model class.

RETURNS DESCRIPTION
str | None

str | None: Class-level schema override, if present.

Source code in saffier/core/db/models/base.py
602
603
604
605
606
607
608
609
@classmethod
def get_active_class_schema(cls) -> str | None:
    """Return the schema pinned directly on the model class.

    Returns:
        str | None: Class-level schema override, if present.
    """
    return cast("str | None", getattr(cls, "__using_schema__", None))

get_active_instance_schema

get_active_instance_schema()

Return the schema currently active for this instance.

Instance-level overrides win over cached table schema, which in turn wins over the class-level override.

RETURNS DESCRIPTION
str | None

str | None: Active schema for this instance.

Source code in saffier/core/db/models/base.py
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
def get_active_instance_schema(self) -> str | None:
    """Return the schema currently active for this instance.

    Instance-level overrides win over cached table schema, which in turn wins
    over the class-level override.

    Returns:
        str | None: Active schema for this instance.
    """
    explicit = self.__dict__.get("__using_schema__", None)
    if explicit is not None:
        return cast("str | None", explicit)
    table = self.__dict__.get("_table", None)
    if table is not None:
        return cast("str | None", getattr(table, "schema", None))
    return self.__class__.get_active_class_schema()

load_recursive async

load_recursive(
    only_needed=False, only_needed_nest=False, _seen=None
)

Load the instance and recurse through already-linked FK relations.

The method is careful to avoid infinite loops by tracking visited model identities while traversing the object graph.

PARAMETER DESCRIPTION
only_needed

Whether to skip reloading when DB-backed fields are already populated.

TYPE: bool DEFAULT: False

only_needed_nest

Whether nested relations should stop after the current object was already fully loaded.

TYPE: bool DEFAULT: False

_seen

Internal recursion guard keyed by model identity.

TYPE: set[tuple[Any, ...]] | None DEFAULT: None

Source code in saffier/core/db/models/base.py
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
async def load_recursive(
    self,
    only_needed: bool = False,
    only_needed_nest: bool = False,
    _seen: set[tuple[Any, ...]] | None = None,
) -> None:
    """Load the instance and recurse through already-linked FK relations.

    The method is careful to avoid infinite loops by tracking visited model
    identities while traversing the object graph.

    Args:
        only_needed: Whether to skip reloading when DB-backed fields are
            already populated.
        only_needed_nest: Whether nested relations should stop after the
            current object was already fully loaded.
        _seen: Internal recursion guard keyed by model identity.
    """
    model_key = self.create_model_key()
    if _seen is None:
        _seen = {model_key}
    elif model_key in _seen:
        return
    else:
        _seen.add(model_key)

    was_loaded = self._has_loaded_db_fields()
    if self.can_load:
        await self.load(only_needed=only_needed)

    if only_needed_nest and was_loaded:
        return

    for field_name in self.meta.foreign_key_fields:
        value = getattr(self, field_name, None)
        if value is not None and hasattr(value, "load_recursive"):
            await value.load_recursive(
                only_needed=only_needed,
                only_needed_nest=True,
                _seen=_seen,
            )

get_instance_name

get_instance_name()

Return the lowercase class name used by relation helpers.

RETURNS DESCRIPTION
str

Lowercase model class name.

TYPE: str

Source code in saffier/core/db/models/base.py
670
671
672
673
674
675
676
def get_instance_name(self) -> str:
    """Give the instance's class name folded to lowercase.

    Returns:
        str: ``self.__class__.__name__`` in lowercase.
    """
    class_name: str = self.__class__.__name__
    return class_name.lower()

_copy_model_definitions classmethod

_copy_model_definitions(
    *, registry=None, unlink_same_registry=True
)

Copy fields and managers for dynamic model cloning.

Relation fields may be rewritten as string references when the target registry is still under construction so copied models do not retain live references back into the source registry.

PARAMETER DESCRIPTION
registry

Optional destination registry receiving the copy.

TYPE: Registry | None DEFAULT: None

unlink_same_registry

Whether relations inside the same source registry should be rewritten as string references.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: Definitions suitable for dynamic model creation.

Source code in saffier/core/db/models/base.py
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
@classmethod
def _copy_model_definitions(
    cls,
    *,
    registry: "saffier.Registry | None" = None,
    unlink_same_registry: bool = True,
) -> dict[str, Any]:
    """Produce shallow copies of the model's fields and managers.

    Fields flagged with ``no_copy`` are left out. When the source model has a
    registry, relation fields pointing at models of that same registry are
    adjusted on the copy:

    * ``ForeignKey``: ``to`` becomes the target's class name when
      ``unlink_same_registry`` is true; otherwise ``related_name`` is set to
      ``False`` if a destination ``registry`` was given.
    * ``ManyToManyField``: ``to`` and a class-valued ``through`` become class
      names when ``unlink_same_registry`` is true.

    Args:
        registry: Optional destination registry receiving the copy.
        unlink_same_registry: Whether relations inside the same source
            registry should be rewritten as string references.

    Returns:
        dict[str, Any]: Copied fields and managers keyed by attribute name,
        plus a merged ``__annotations__`` entry when managers were copied.
    """
    from saffier.core.db.fields.base import ForeignKey, ManyToManyField

    source_registry = getattr(cls.meta, "registry", None)
    class_annotations = dict(getattr(cls, "__annotations__", {}))
    definitions: dict[str, Any] = {}

    for field_name, field in cls.fields.items():
        if getattr(field, "no_copy", False):
            continue

        cloned_field = copy.copy(field)
        # Drop the resolved-target cache carried over by the shallow copy.
        if hasattr(cloned_field, "_target"):
            del cloned_field._target
        definitions[field_name] = cloned_field

        if source_registry in (None, False):
            continue

        if isinstance(cloned_field, ForeignKey):
            target = field.target
            if getattr(target.meta, "registry", None) is not source_registry:
                continue
            if unlink_same_registry:
                cloned_field.to = target.__name__
            elif registry is not None:
                cloned_field.related_name = False
        elif isinstance(cloned_field, ManyToManyField):
            target = field.target
            target_is_local = getattr(target.meta, "registry", None) is source_registry
            if target_is_local and unlink_same_registry:
                cloned_field.to = target.__name__

            through = getattr(cloned_field, "through", None)
            if isinstance(through, type):
                through_is_local = (
                    getattr(through.meta, "registry", None) is source_registry
                )
                if through_is_local and unlink_same_registry:
                    cloned_field.through = through.__name__

    manager_annotations: dict[str, Any] = {}
    for manager_name in getattr(cls.meta, "managers", []):
        manager = getattr(cls, manager_name, None)
        if not isinstance(manager, Manager):
            continue
        definitions[manager_name] = copy.copy(manager)
        # Reuse the declared annotation when present, else mark as ClassVar.
        manager_annotations[manager_name] = class_annotations.get(
            manager_name,
            ClassVar[Any],
        )

    if manager_annotations:
        definitions["__annotations__"] = {**class_annotations, **manager_annotations}

    return definitions

copy_saffier_model classmethod

copy_saffier_model(
    registry=None,
    name="",
    unlink_same_registry=True,
    on_conflict="error",
)

Create a detached copy of the model class with copied field state.

The copy is used by migration preparation, proxy generation, and tests that need to attach the same logical model definition to another registry. Field instances are copied so later mutations on the copied model do not leak back into the original model definition.

PARAMETER DESCRIPTION
registry

Optional registry to immediately attach the copied model to.

TYPE: Registry | None DEFAULT: None

name

Optional replacement class name.

TYPE: str DEFAULT: ''

unlink_same_registry

Whether same-registry relations should be rewritten as string references during the copy.

TYPE: bool DEFAULT: True

on_conflict

Conflict strategy used if registry already contains a model with the target name.

TYPE: str DEFAULT: 'error'

RETURNS DESCRIPTION
type[Model]

type[Model]: The detached or newly attached copied model class.

Source code in saffier/core/db/models/base.py
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
@classmethod
def copy_saffier_model(
    cls,
    registry: "saffier.Registry | None" = None,
    name: str = "",
    unlink_same_registry: bool = True,
    on_conflict: str = "error",
) -> type["Model"]:
    """Clone the model class together with copies of its fields and managers.

    The clone is assembled from the class namespace (minus excluded and
    dunder attributes), the output of ``_copy_model_definitions()``, and a
    fresh ``Meta`` mirroring selected options of the source ``meta``. It
    inherits the source's ``database`` and ``__using_schema__``.

    Args:
        registry: Optional registry to immediately attach the copied model
            to. When ``None`` the clone is returned detached, with
            ``meta.registry`` set to ``False``.
        name: Optional replacement class name; defaults to the source name.
        unlink_same_registry: Whether same-registry relations should be
            rewritten as string references during the copy.
        on_conflict: Conflict strategy forwarded to ``add_to_registry()``
            when ``registry`` is given.

    Returns:
        type[Model]: The detached or newly attached copied model class.
    """
    source_meta = cls.meta
    class_annotations = dict(getattr(cls, "__annotations__", {}))

    definitions = {}
    for attr_name, attr_value in cls.__dict__.items():
        if attr_name.startswith("__") or attr_name in _MODEL_COPY_EXCLUDED_ATTRS:
            continue
        definitions[attr_name] = attr_value
    copied_members = cls._copy_model_definitions(
        registry=registry, unlink_same_registry=unlink_same_registry
    )
    definitions.update(copied_members)

    # Make sure every manager in the namespace carries an annotation.
    manager_annotations = dict(definitions.get("__annotations__", {}))
    for attr_name, attr_value in definitions.items():
        if isinstance(attr_value, Manager) and attr_name not in manager_annotations:
            manager_annotations[attr_name] = class_annotations.get(
                attr_name, ClassVar[Any]
            )
    if manager_annotations:
        definitions["__annotations__"] = {**class_annotations, **manager_annotations}
    definitions["__skip_registry__"] = True

    def _meta_list(option: str) -> list[Any]:
        # Copy list-valued options so the clone does not share them.
        return list(getattr(source_meta, option, []) or [])

    meta_namespace = {
        "registry": False,
        "tablename": getattr(source_meta, "tablename", None),
        "table_prefix": getattr(source_meta, "table_prefix", None),
        "unique_together": _meta_list("unique_together"),
        "indexes": _meta_list("indexes"),
        "constraints": _meta_list("constraints"),
        "reflect": getattr(source_meta, "reflect", False),
        "abstract": getattr(source_meta, "abstract", False),
        "model_engine": getattr(source_meta, "model_engine", None),
        "is_tenant": getattr(source_meta, "is_tenant", None),
        "register_default": getattr(source_meta, "register_default", None),
    }
    meta = type("Meta", (), meta_namespace)

    clone = create_saffier_model(
        name or cls.__name__,
        cls.__module__,
        __definitions__=definitions,
        __metadata__=meta,
        __qualname__=cls.__qualname__,
        __bases__=cls.__bases__,
    )
    clone.database = getattr(cls, "database", None)
    clone.__using_schema__ = getattr(cls, "__using_schema__", None)

    if registry is not None:
        return clone.add_to_registry(registry, on_conflict=on_conflict)
    clone.meta.registry = False
    return clone

real_add_to_registry classmethod

real_add_to_registry(
    *,
    registry,
    name="",
    database="keep",
    on_conflict="error",
)

Attach a copied model class to another registry.

This is the internal implementation behind add_to_registry(). It resolves naming conflicts, rebinds fields and managers to the target registry, rebuilds relation descriptors, and regenerates the proxy model so the attached class behaves like a first-class registered model.

PARAMETER DESCRIPTION
registry

Target registry.

TYPE: Registry

name

Optional replacement model name.

TYPE: str DEFAULT: ''

database

Database binding strategy or explicit database object.

TYPE: bool | Any | str DEFAULT: 'keep'

on_conflict

Conflict strategy for duplicate model names.

TYPE: str DEFAULT: 'error'

RETURNS DESCRIPTION
type[Model]

type[Model]: The attached model class.

RAISES DESCRIPTION
ImproperlyConfigured

If a conflicting model already exists and the conflict strategy is "error".

Source code in saffier/core/db/models/base.py
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
@classmethod
def real_add_to_registry(
    cls,
    *,
    registry: "saffier.Registry",
    name: str = "",
    database: bool | Any | str = "keep",
    on_conflict: str = "error",
) -> type["Model"]:
    """Attach a copied model class to another registry.

    This is the internal implementation behind `add_to_registry()`. It
    resolves naming conflicts, rebinds fields and managers to the target
    registry, rebuilds relation descriptors, and regenerates the proxy model
    so the attached class behaves like a first-class registered model.

    Args:
        registry: Target registry.
        name: Optional replacement model name.
        database: Database binding strategy or explicit database object.
            `True` binds `registry.database`; `False` or `"keep"` leaves the
            current binding unless it is `None`, in which case
            `registry.database` is used; any other value is bound as-is.
        on_conflict: Conflict strategy for duplicate model names: `"keep"`
            returns the already-registered model, `"replace"` deletes it
            first, anything else raises.

    Returns:
        type[Model]: The attached model class.

    Raises:
        ImproperlyConfigured: If a conflicting model already exists and the
            conflict strategy is `"error"`.
    """
    # A class already owned by a different registry is never mutated: a copy
    # is created and attached instead.
    # NOTE(review): `database` is not forwarded on this path, so the copy is
    # attached with the default binding strategy — confirm this is intended.
    if getattr(cls.meta, "registry", None) not in (None, False, registry):
        return cls.copy_saffier_model(
            registry=registry,
            name=name or cls.__name__,
            on_conflict=on_conflict,
        )

    model_name = name or cls.__name__
    if model_name in registry.models:
        if on_conflict == "keep":
            return cast("type[Model]", registry.models[model_name])
        if on_conflict == "replace":
            registry.delete_model(model_name)
        else:
            raise ImproperlyConfigured(
                f'A model with the same name is already registered: "{model_name}".'
            )

    cls.meta.registry = registry
    cls.__name__ = model_name

    # Resolve the database binding according to the requested strategy.
    if database is True:
        cls.database = registry.database
    elif database not in (False, "keep"):
        cls.database = database
    elif getattr(cls, "database", None) is None:
        cls.database = registry.database

    registry.models[model_name] = cls
    cls.__db_model__ = True
    cls.meta.model = cls
    cls.fields = cls.meta.fields
    # Reset the class-level table and per-schema state.
    cls._table = None
    cls._db_schemas = {}

    # Point every field at its new owner and registry.
    for field_name, field in cls.fields.items():
        field.owner = cls
        field.registry = registry
        if field.primary_key and field_name == cls.meta.pk_attribute:
            cls.pkname = field_name

    # Rebind managers declared in meta to this class.
    for manager_name in getattr(cls.meta, "managers", []):
        manager = getattr(cls, manager_name, None)
        if isinstance(manager, Manager):
            manager.name = manager_name
            manager.model_class = cls

    # Related names are only (re)built for non-proxy models with FK fields.
    if getattr(cls.meta, "foreign_key_fields", None) and not cls.is_proxy_model:
        related_names = _set_related_name_for_foreign_keys(cls.meta.foreign_key_fields, cls)
        cls.meta.related_names.update(related_names)

    # Iterate over a snapshot of the fields — presumably because setting up a
    # many-to-many relation can modify `cls.fields`; verify against the helper.
    for field_name, field in list(cls.fields.items()):
        if isinstance(field, saffier.ManyToManyField):
            _set_many_to_many_relation(field, cls, field_name)

    _register_model_signals(cls)
    # Clear any existing proxy first so generate_proxy_model() builds a new one.
    cls.__proxy_model__ = None
    if not cls.is_proxy_model and not cls.meta.abstract:
        proxy_model = cls.generate_proxy_model()
        cls.__proxy_model__ = proxy_model
        cls.__proxy_model__.parent = cls
        registry.models[model_name] = cls

    # Optional registry hook, followed by the registry's model callbacks.
    if hasattr(registry, "_handle_model_registration"):
        registry._handle_model_registration(cls)
    registry.execute_model_callbacks(cls)
    return cast("type[Model]", cls)

add_to_registry classmethod

add_to_registry(
    registry,
    name="",
    database="keep",
    *,
    on_conflict="error",
)

Attach the model to another registry, copying if required.

PARAMETER DESCRIPTION
registry

Destination registry.

TYPE: Registry

name

Optional replacement model name.

TYPE: str DEFAULT: ''

database

Database rebinding strategy.

TYPE: bool | Any | str DEFAULT: 'keep'

on_conflict

Conflict strategy for duplicate model names.

TYPE: str DEFAULT: 'error'

RETURNS DESCRIPTION
type[Model]

type[Model]: Registered model class.

Source code in saffier/core/db/models/base.py
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
@classmethod
def add_to_registry(
    cls,
    registry: "saffier.Registry",
    name: str = "",
    database: bool | Any | str = "keep",
    *,
    on_conflict: str = "error",
) -> type["Model"]:
    """Attach the model to another registry, copying if required.

    Args:
        registry: Destination registry.
        name: Optional replacement model name.
        database: Database rebinding strategy.
        on_conflict: Conflict strategy for duplicate model names.

    Returns:
        type[Model]: Registered model class.
    """
    return cls.real_add_to_registry(
        registry=registry,
        name=name,
        database=database,
        on_conflict=on_conflict,
    )

generate_proxy_model classmethod

generate_proxy_model()

Generate the lightweight proxy model used for SQLAlchemy-style access.

Proxy models mirror the field layout of the concrete model but stay detached from registry registration. They exist so class-level field access can emulate SQLAlchemy's declarative attribute style.

RETURNS DESCRIPTION
type[Model]

type[Model]: Proxy model class for cls.

Source code in saffier/core/db/models/base.py
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
@classmethod
def generate_proxy_model(cls) -> type["Model"]:
    """Return the proxy model for ``cls``, building it when none is cached.

    A truthy ``__proxy_model__`` found directly in the class ``__dict__`` is
    returned as-is. Otherwise a ``ProxyModel`` is built from shallow copies
    of the class fields (flagged with ``__skip_registry__``) and the class
    ``meta``, then passed through ``generify_model_fields``.

    Returns:
        type[Model]: Proxy model class for `cls`.
    """
    cached_proxy = cls.__dict__.get("__proxy_model__")
    if cached_proxy:
        return cached_proxy

    definitions = {}
    for field_name, field in cls.fields.items():
        definitions[field_name] = copy.copy(field)
    definitions["__skip_registry__"] = True

    builder = ProxyModel(
        name=cls.__name__,
        module=cls.__module__,
        metadata=cls.meta,
        definitions=definitions,
    )
    builder.build()

    generify_model_fields(builder.model)
    return builder.model

build classmethod

build(schema=None)

Build the SQLAlchemy table for the model in the requested schema.

Table generation expands multi-column fields, applies field-level global constraints, attaches Meta-declared indexes and constraints, and keeps schema-specific tables isolated from the shared registry metadata.

PARAMETER DESCRIPTION
schema

Schema name to build against. None uses the registry default behavior.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Table

sqlalchemy.Table: Built or cached SQLAlchemy table for the model.

Source code in saffier/core/db/models/base.py
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
@classmethod
def build(cls, schema: str | None = None) -> sqlalchemy.Table:
    """Return the SQLAlchemy table of the model for the given schema.

    The registry's own metadata is used when ``schema`` equals the registry's
    ``db_schema``; any other schema gets a dedicated ``MetaData`` kept in the
    registry's ``_schema_metadata_cache``. A table already present in the
    selected metadata is returned as-is; otherwise one is assembled from the
    field columns, field-level global constraints, and the ``Meta``-declared
    unique-together entries, indexes and constraints.

    Args:
        schema: Schema name to build against. `None` uses the registry
            default behavior.

    Returns:
        sqlalchemy.Table: Built or cached SQLAlchemy table for the model.
    """
    tablename = cls.meta.tablename
    registry = cls.meta.registry
    database = getattr(cls, "database", None)

    if database is None or not hasattr(registry, "metadata_by_url"):
        registry_metadata = cast("sqlalchemy.MetaData", registry._metadata)  # type: ignore
    else:
        registry_metadata = cast(
            "sqlalchemy.MetaData", registry.metadata_by_url[str(database.url)]
        )

    # Lazily create the per-schema metadata cache on the registry.
    try:
        schema_cache = registry._schema_metadata_cache
    except AttributeError:
        schema_cache = registry._schema_metadata_cache = {}

    # Tables for tenant/`using` schemas live in their own MetaData so they do
    # not leak tables or indexes into the shared registry metadata.
    if schema == registry.db_schema:
        metadata = registry_metadata
        metadata.schema = schema
    else:
        try:
            metadata = schema_cache[schema]
        except KeyError:
            metadata = schema_cache[schema] = sqlalchemy.MetaData(schema=schema)

    table_key = f"{schema}.{tablename}" if schema is not None else tablename
    cached_table = metadata.tables.get(table_key)
    if cached_table is not None:
        return cached_table

    unique_together = cls.meta.unique_together
    index_constraints = cls.meta.indexes
    table_constraints = cls.meta.constraints

    columns = []
    field_constraints = []
    for name, field in cls.fields.items():
        if not field.has_column():
            continue
        field_columns = list(field.get_columns(name))
        columns += field_columns
        field_constraints += field.get_global_constraints(
            name, field_columns, schema=schema
        )

    uniques = [cls._get_unique_constraints(entry) for entry in unique_together or []]
    indexes = [cls._get_indexes(entry) for entry in index_constraints or []]
    # Meta-declared constraints first, then the ones contributed by fields.
    constraints = [*(table_constraints or []), *field_constraints]

    return sqlalchemy.Table(
        tablename,
        metadata,
        *columns,
        *uniques,
        *indexes,
        *constraints,
        extend_existing=True,  # type: ignore
    )

_get_unique_constraints classmethod

_get_unique_constraints(columns)

Returns the unique constraints for the model.

The columns must be a string, a list or tuple of strings, or a UniqueConstraint object.

Source code in saffier/core/db/models/base.py
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
@classmethod
def _get_unique_constraints(cls, columns: Sequence) -> sqlalchemy.UniqueConstraint | None:
    """
    Build the SQLAlchemy unique constraint for one `unique_together` entry.

    The entry may be a single field name, a list or tuple of field names, or
    a `UniqueConstraint` object (whose `fields` are used). Field names are
    expanded to column names before the constraint is created.
    """
    if isinstance(columns, str):
        field_names = (columns,)
    elif isinstance(columns, UniqueConstraint):
        field_names = columns.fields
    else:
        field_names = columns
    column_names = cls._expand_constraint_field_names(field_names)
    return sqlalchemy.UniqueConstraint(*column_names)

_get_indexes classmethod

_get_indexes(index)

Build a SQLAlchemy index from a Saffier Index declaration.

PARAMETER DESCRIPTION
index

Declared Saffier index definition.

TYPE: Index

RETURNS DESCRIPTION
Index | None

sqlalchemy.Index | None: SQLAlchemy index object.

Source code in saffier/core/db/models/base.py
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
@classmethod
def _get_indexes(cls, index: Index) -> sqlalchemy.Index | None:
    """Translate a Saffier `Index` declaration into a SQLAlchemy index.

    Args:
        index: Declared Saffier index definition; its `fields` (treated as
            empty when falsy) are expanded to column names.

    Returns:
        sqlalchemy.Index | None: Index named `index.name` over the expanded
        columns.
    """
    declared_fields = index.fields or ()
    column_names = cls._expand_constraint_field_names(declared_fields)
    return sqlalchemy.Index(index.name, *column_names)  # type: ignore

_expand_constraint_field_names classmethod

_expand_constraint_field_names(names)
Source code in saffier/core/db/models/base.py
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
@classmethod
def _expand_constraint_field_names(cls, names: Sequence[str]) -> tuple[str, ...]:
    """Map logical field names to the column names they stand for.

    A name whose field exposes ``get_column_names`` is replaced by the names
    that method returns; any other name (unknown field, or field without the
    method) is passed through unchanged.

    Args:
        names: Field names as written in a constraint or index declaration.

    Returns:
        tuple[str, ...]: Column names in declaration order.
    """
    column_names: list[str] = []
    for name in names:
        field = cls.fields.get(name)
        if field is None or not hasattr(field, "get_column_names"):
            column_names.append(name)
            continue
        column_names.extend(field.get_column_names(name))
    return tuple(column_names)

update_from_dict

update_from_dict(dict_values)

Assign values from a dictionary using normal attribute semantics.

RETURNS DESCRIPTION
Self

The current instance for fluent usage.

TYPE: Self

Source code in saffier/core/db/models/base.py
1125
1126
1127
1128
1129
1130
1131
1132
1133
def update_from_dict(self, dict_values: dict[str, Any]) -> Self:
    """Apply every key/value pair of a mapping through ``setattr``.

    Args:
        dict_values: Attribute names mapped to the values to assign.

    Returns:
        Self: The current instance for fluent usage.
    """
    for attribute, new_value in dict_values.items():
        setattr(self, attribute, new_value)
    return self

extract_db_fields

extract_db_fields(only=None)

Collect persisted field values from the current instance.

PARAMETER DESCRIPTION
only

Optional subset of logical field names to include.

TYPE: Sequence[str] | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: Payload containing only database-backed fields.

Source code in saffier/core/db/models/base.py
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
def extract_db_fields(self, only: Sequence[str] | None = None) -> dict[str, Any]:
    """Collect persisted field values from the current instance.

    Args:
        only: Optional subset of logical field names to include.

    Returns:
        dict[str, Any]: Payload containing only database-backed fields.
    """
    if only is not None:
        allowed = set(only)
        return {
            k: v
            for k, v in self.__dict__.items()
            if k in allowed and k in self.fields and self.fields[k].has_column()
        }
    return {
        k: v
        for k, v in self.__dict__.items()
        if k in self.fields and self.fields[k].has_column()
    }

extract_column_values classmethod

extract_column_values(
    extracted_values,
    is_update=False,
    is_partial=False,
    phase="",
    instance=None,
    model_instance=None,
    evaluate_values=False,
)

Convert logical field payloads into database column payloads.

The method runs field-specific normalization, expands multi-column fields such as composite foreign keys, injects defaults when allowed, and returns the final column-value mapping used by insert/update expressions.

PARAMETER DESCRIPTION
extracted_values

Logical field payload keyed by Saffier field name.

TYPE: dict[str, Any]

is_update

Whether the payload is for an update operation.

TYPE: bool DEFAULT: False

is_partial

Whether the update payload omits untouched fields.

TYPE: bool DEFAULT: False

phase

Context label exposed to field hooks through context vars.

TYPE: str DEFAULT: ''

instance

Instance currently being persisted.

TYPE: Any | None DEFAULT: None

model_instance

Original model instance used by nested helpers.

TYPE: Any | None DEFAULT: None

evaluate_values

Whether callable payload values should be executed.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: Database-ready payload keyed by SQL column name.

Source code in saffier/core/db/models/base.py
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
@classmethod
def extract_column_values(
    cls,
    extracted_values: dict[str, Any],
    is_update: bool = False,
    is_partial: bool = False,
    phase: str = "",
    instance: Any | None = None,
    model_instance: Any | None = None,
    evaluate_values: bool = False,
) -> dict[str, Any]:
    """Translate a logical field payload into the payload written to the database.

    Each declared field cleans its own incoming value, and one field may
    produce several output keys. Fields that are absent from the payload can
    contribute default values instead. While the method runs, the phase, the
    two instances and the field currently being processed are published
    through context variables; all of them are reset before returning.

    Args:
        extracted_values: Logical field payload keyed by Saffier field name.
        is_update: Whether the payload is for an update operation.
        is_partial: Whether the update payload omits untouched fields.
        phase: Context label exposed to field hooks through context vars.
        instance: Instance currently being persisted.
        model_instance: Original model instance used by nested helpers.
        evaluate_values: Whether callable payload values should be executed.

    Returns:
        dict[str, Any]: Mapping of the keys produced by the fields to their
        cleaned (or default) values.

    Raises:
        ValueError: If two cleaned fields produce a value for the same key.
    """
    column_payload: dict[str, Any] = {}
    field_context: dict[str, Any] = {}
    phase_token = CURRENT_PHASE.set(phase)
    instance_token = CURRENT_INSTANCE.set(instance)
    model_instance_token = CURRENT_MODEL_INSTANCE.set(model_instance)
    field_context_token = CURRENT_FIELD_CONTEXT.set(field_context)

    def defaults_allowed(candidate: Any) -> bool:
        # Defaults apply to full payloads, or to partial updates when the
        # field explicitly opts in; in both cases the field needs a default.
        return (
            not is_partial or (candidate.inject_default_on_partial_update and is_update)
        ) and candidate.has_default()

    try:
        payload = cls.normalize_field_kwargs(extracted_values)
        if not evaluate_values:
            payload = dict(payload)
        else:
            resolved: dict[str, Any] = {}
            for name, candidate_value in payload.items():
                if callable(candidate_value):
                    # Expose the owning field to the callable before running it.
                    field_context.clear()
                    field_context["field"] = cls.meta.fields.get(name)
                    candidate_value = candidate_value()
                resolved[name] = candidate_value
            payload = resolved

        for modifier_name in cls.meta.input_modifying_fields or ():
            cls.meta.fields[modifier_name].modify_input(modifier_name, payload)

        deferred_defaults = []
        for field_name, field in cls.meta.fields.items():
            field_context.clear()
            field_context["field"] = field
            read_only = field.validator.read_only

            if field_name in payload:
                cleaned = field.clean(field_name, payload[field_name])
                for output_key, output_value in cleaned.items():
                    if output_key in column_payload:
                        raise ValueError(f"value set twice for key: {output_key}")
                    column_payload[output_key] = output_value
            elif defaults_allowed(field):
                if read_only:
                    # Read-only fields take their defaults immediately.
                    column_payload.update(field.get_default_values(field_name, column_payload))
                else:
                    # Writable fields wait until every explicit value is known.
                    deferred_defaults.append(field)

        for field in deferred_defaults:
            field_context.clear()
            field_context["field"] = field
            if field.name not in column_payload:
                column_payload.update(field.get_default_values(field.name, column_payload))
    finally:
        # Reset in reverse order of acquisition.
        CURRENT_FIELD_CONTEXT.reset(field_context_token)
        CURRENT_MODEL_INSTANCE.reset(model_instance_token)
        CURRENT_INSTANCE.reset(instance_token)
        CURRENT_PHASE.reset(phase_token)
    return column_payload

__get_instance_values

__get_instance_values(instance)
Source code in saffier/core/db/models/base.py
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
def __get_instance_values(self, instance: Any) -> set[tuple[str, Any]]:
    """Collect the hashable ``(field name, value)`` pairs stored on ``instance``.

    Only entries of ``instance.__dict__`` whose key is a declared field and
    whose value is not ``None`` are kept. Values exposing a ``pk`` attribute
    are replaced by that ``pk``; values that cannot be hashed are replaced by
    their ``repr`` so they can live in a set.

    Args:
        instance: Object exposing ``__dict__`` and a ``fields`` container.

    Returns:
        set[tuple[str, Any]]: The collected pairs.
    """
    collected: set[tuple[str, Any]] = set()
    for name, stored in instance.__dict__.items():
        if name in instance.fields and stored is not None:
            comparable = stored.pk if hasattr(stored, "pk") else stored
            try:
                hash(comparable)
            except TypeError:
                # Unhashable values are represented by their repr instead.
                comparable = repr(comparable)
            collected.add((name, comparable))
    return collected

__eq__

__eq__(other)
Source code in saffier/core/db/models/base.py
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
def __eq__(self, other: Any) -> bool:
    """Compare two models by registry, table name and identifying fields.

    Args:
        other: Object to compare against.

    Returns:
        bool: ``True`` when ``other`` is a ``SaffierBaseModel`` that shares
        this model's registry object and table name, and every field listed
        in ``identifying_db_fields`` holds an equal value on both sides.
        Values exposing ``pk`` are compared by that ``pk``.
    """
    comparable = (
        isinstance(other, SaffierBaseModel)
        and self.meta.registry is other.meta.registry
        and self.meta.tablename == other.meta.tablename
    )
    if not comparable:
        return False

    for name in self.identifying_db_fields:
        mine, theirs = getattr(self, name, None), getattr(other, name, None)
        # Related objects are reduced to their primary key before comparing.
        mine = mine.pk if hasattr(mine, "pk") else mine
        theirs = theirs.pk if hasattr(theirs, "pk") else theirs
        if mine != theirs:
            return False
    return True

_row_value staticmethod

_row_value(row, key)

Return one value from a SQLAlchemy row mapping.

The SQLAlchemy result API may expose values either through the row's mapping interface or as attributes. This helper hides that difference so higher-level row loaders can ask for one key without caring how the result object was produced.

PARAMETER DESCRIPTION
row

SQLAlchemy row produced by a compiled query.

TYPE: Row

key

Column or label name to resolve.

TYPE: str

RETURNS DESCRIPTION
Any

Resolved value, or None when the row does not expose key.

TYPE: Any

Source code in saffier/core/db/models/row.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@staticmethod
def _row_value(row: Row, key: str) -> Any:
    """Fetch a single value from a row by key.

    The row's mapping is consulted first; when it does not contain `key`,
    the value is looked up as an attribute of the row instead.

    Args:
        row (Row): SQLAlchemy row produced by a compiled query.
        key (str): Column or label name to resolve.

    Returns:
        Any: Resolved value, or `None` when neither the mapping nor the row
        attributes expose `key`.
    """
    mapping = row._mapping
    if key not in mapping:
        # Fall back to attribute access, defaulting to None.
        return getattr(row, key, None)
    return mapping[key]

_table_row_value classmethod

_table_row_value(row, table, key)

Resolve a row value using table-column objects when available.

Joined queries may store row data under SQLAlchemy Column objects instead of plain string keys. This helper first tries the concrete table column and falls back to the generic key lookup used by _row_value.

PARAMETER DESCRIPTION
row

SQLAlchemy row produced by a query.

TYPE: Row

table

SQLAlchemy table-like object exposing .c.

TYPE: Any

key

Column key to resolve.

TYPE: str

RETURNS DESCRIPTION
Any

Value associated with the requested column, or None when the row does not contain it.

TYPE: Any

Source code in saffier/core/db/models/row.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@classmethod
def _table_row_value(cls, row: Row, table: Any, key: str) -> Any:
    """Look up a row value, preferring the table's column object as the key.

    The column named `key` is taken from `table.c` when available. If that
    column object is a key of the row mapping its value is returned;
    otherwise the lookup is delegated to `_row_value` with the plain key.

    Args:
        row (Row): SQLAlchemy row produced by a query.
        table (Any): SQLAlchemy table-like object exposing `.c`.
        key (str): Column key to resolve.

    Returns:
        Any: Value stored under the column object, or whatever `_row_value`
        returns for `key`.
    """
    column_collection = getattr(table, "c", None)
    column = getattr(column_collection, key, None)
    if column is None or column not in row._mapping:
        return cls._row_value(row, key)
    return row._mapping[column]
_build_related_pk_filter(path, row, model_class)

Build a queryset filter targeting one related object's primary key.

Prefetch helpers need a deterministic filter payload for related lookups, even when the related model has a composite primary key. This method expands the supplied path into one path__pk_field=value entry per primary-key column.

PARAMETER DESCRIPTION
path

Lookup prefix leading to the related model.

TYPE: str

row

Source row containing the root primary-key values.

TYPE: Row

model_class

Model class or instance exposing pknames.

TYPE: Any

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: Keyword arguments suitable for queryset filtering.

Source code in saffier/core/db/models/row.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@classmethod
def _build_related_pk_filter(cls, path: str, row: Row, model_class: Any) -> dict[str, Any]:
    """Create filter keyword arguments addressing a related object's primary key.

    One `path__pk_name` entry is produced for every name in the target
    class's `pknames`, with the value read from `row` via `_row_value`.

    Args:
        path (str): Lookup prefix leading to the related model.
        row (Row): Row the primary-key values are read from.
        model_class (Any): Model class, or an instance whose type is used,
            exposing `pknames`.

    Returns:
        dict[str, Any]: Keyword arguments suitable for queryset filtering.
    """
    if isinstance(model_class, type):
        target_class = model_class
    else:
        # An instance was passed: work with its class instead.
        target_class = type(model_class)

    lookup: dict[str, Any] = {}
    for pk_name in target_class.pknames:
        lookup[f"{path}__{pk_name}"] = cls._row_value(row, pk_name)
    return lookup
apply_prefetch_related(row, model, prefetch_related)

Apply prefetch directives to an already materialized model instance.

PARAMETER DESCRIPTION
row

Source row used to derive relation filters.

TYPE: Row

model

Model instance returned by row materialization.

TYPE: type[Model]

prefetch_related

Prefetch directives attached to the queryset.

TYPE: Sequence[Prefetch]

RETURNS DESCRIPTION
type[Model]

type[Model]: The same model instance after all prefetch targets have been attached.

Source code in saffier/core/db/models/row.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@classmethod
async def apply_prefetch_related(
    cls,
    row: Row,
    model: type["Model"],
    prefetch_related: Sequence["Prefetch"],
) -> type["Model"]:
    """Run the queryset's prefetch directives against a materialized model.

    When there are no directives the model is returned untouched; otherwise
    the work is delegated to the asynchronous prefetch handler.

    Args:
        row (Row): Source row used to derive relation filters.
        model (type[Model]): Model instance returned by row materialization.
        prefetch_related (Sequence[Prefetch]): Prefetch directives attached
            to the queryset.

    Returns:
        type[Model]: `model` itself when there is nothing to prefetch,
        otherwise the result of the asynchronous prefetch handler.
    """
    if prefetch_related:
        handled = await cls.__handle_prefetch_related_async(
            row=row,
            model=model,
            prefetch_related=prefetch_related,
        )
        return handled
    return model

from_query_result classmethod

from_query_result(
    row,
    select_related=None,
    prefetch_related=None,
    is_only_fields=False,
    only_fields=None,
    is_defer_fields=False,
    exclude_secrets=False,
    using_schema=None,
    reference_select=None,
    tables_and_models=None,
    root_model_class=None,
    prefix="",
)

Convert one SQLAlchemy result row into a Saffier model instance.

The loader reconstructs the requested object graph in three phases: nested select_related branches are materialized first, local foreign-key placeholders are populated next, and finally the base model fields are copied from the row. Optional only(), defer(), reference_select(), secret-field exclusion, and schema overrides are all handled during the same pass so downstream queryset code receives a fully consistent model instance.

PARAMETER DESCRIPTION
row

SQLAlchemy row returned by the executed query.

TYPE: Row

select_related

Related paths that were joined into the query and should be materialized recursively.

TYPE: Sequence[Any] | None DEFAULT: None

prefetch_related

Deferred prefetch instructions to apply after the base instance exists.

TYPE: Sequence[Prefetch] | None DEFAULT: None

is_only_fields

Whether the row was produced by an only() queryset.

TYPE: bool DEFAULT: False

only_fields

Explicit field subset requested by only().

TYPE: Sequence[str] | None DEFAULT: None

is_defer_fields

Whether the row omits deferred fields.

TYPE: bool DEFAULT: False

exclude_secrets

Whether secret fields should be left unloaded on the resulting model.

TYPE: bool DEFAULT: False

using_schema

Schema override applied to generated table lookups and related instances.

TYPE: str | None DEFAULT: None

reference_select

Extra row aliases to copy onto the model or nested related objects.

TYPE: dict[str, Any] | None DEFAULT: None

tables_and_models

Mapping produced by queryset compilation for joined table aliases.

TYPE: dict[str, tuple[Any, Any]] | None DEFAULT: None

root_model_class

Root model being materialized when recursion traverses into nested related models.

TYPE: type[Model] | None DEFAULT: None

prefix

Prefix identifying the current join branch inside tables_and_models.

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
type[Model] | None

type[Model] | None: Materialized model instance, or None when a nested branch cannot be populated from the row.

Source code in saffier/core/db/models/row.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
@classmethod
def from_query_result(
    cls,
    row: Row,
    select_related: Sequence[Any] | None = None,
    prefetch_related: Sequence["Prefetch"] | None = None,
    is_only_fields: bool = False,
    only_fields: Sequence[str] | None = None,
    is_defer_fields: bool = False,
    exclude_secrets: bool = False,
    using_schema: str | None = None,
    reference_select: dict[str, Any] | None = None,
    tables_and_models: dict[str, tuple[Any, Any]] | None = None,
    root_model_class: type["Model"] | None = None,
    prefix: str = "",
) -> type["Model"] | None:
    """Convert one SQLAlchemy result row into a Saffier model instance.

    The loader reconstructs the requested object graph in three phases:
    nested `select_related` branches are materialized first, local
    foreign-key placeholders are populated next, and finally the base model
    fields are copied from the row. Optional `only()`, `defer()`,
    `reference_select()`, secret-field exclusion, and schema overrides are
    all handled during the same pass so downstream queryset code receives a
    fully consistent model instance.

    Args:
        row (Row): SQLAlchemy row returned by the executed query.
        select_related (Sequence[Any] | None): Related paths that were joined
            into the query and should be materialized recursively.
        prefetch_related (Sequence[Prefetch] | None): Deferred prefetch
            instructions to apply after the base instance exists.
        is_only_fields (bool): Whether the row was produced by an `only()`
            queryset.
        only_fields (Sequence[str] | None): Explicit field subset requested
            by `only()`. `None` is treated as an empty selection when
            `is_only_fields` is set.
        is_defer_fields (bool): Whether the row omits deferred fields.
        exclude_secrets (bool): Whether secret fields should be left unloaded
            on the resulting model.
        using_schema (str | None): Schema override applied to generated table
            lookups and related instances.
        reference_select (dict[str, Any] | None): Extra row aliases to copy
            onto the model or nested related objects.
        tables_and_models (dict[str, tuple[Any, Any]] | None): Mapping
            produced by queryset compilation for joined table aliases.
        root_model_class (type[Model] | None): Root model being materialized
            when recursion traverses into nested related models.
        prefix (str): Prefix identifying the current join branch inside
            `tables_and_models`.

    Returns:
        type[Model] | None: Materialized model instance.
    """
    item: dict[str, Any] = {}
    select_related = select_related or []
    prefetch_related = prefetch_related or []
    reference_select = reference_select or {}
    root_model_class = root_model_class or cast("type[Model]", cls)
    if tables_and_models is not None and prefix in tables_and_models:
        model_table = tables_and_models[prefix][0]
    else:
        model_table = cls.table_schema(using_schema) if using_schema is not None else cls.table

    secret_fields = cls.meta.secret_fields if exclude_secrets else set()

    # Instantiate any child instances first.
    for related in select_related:
        if "__" in related:
            first_part, remainder = related.split("__", 1)
            try:
                model_cls = cls.fields[first_part].target
            except (KeyError, AttributeError):
                # Not a forward field: resolve the reverse relation instead.
                model_cls = getattr(cls, first_part).related_from

            item[first_part] = model_cls.from_query_result(
                row,
                select_related=[remainder],
                using_schema=using_schema,
                exclude_secrets=exclude_secrets,
                tables_and_models=tables_and_models,
                root_model_class=root_model_class,
                prefix=(first_part if not prefix else f"{prefix}__{first_part}"),
            )
        else:
            try:
                model_cls = cls.fields[related].target
            except (KeyError, AttributeError):
                # Not a forward field: resolve the reverse relation instead.
                model_cls = getattr(cls, related).related_from
            item[related] = model_cls.from_query_result(
                row,
                using_schema=using_schema,
                exclude_secrets=exclude_secrets,
                tables_and_models=tables_and_models,
                root_model_class=root_model_class,
                prefix=(related if not prefix else f"{prefix}__{related}"),
            )

    # Populate the related names, skipping any relation already handled by
    # select_related so that its materialized value is not overwritten.
    for related, foreign_key in cls.meta.foreign_key_fields.items():
        ignore_related: bool = cls.__should_ignore_related_name(related, select_related)
        if ignore_related:
            continue

        model_related = foreign_key.target

        # Apply the schema to the model
        model_related = cls.__apply_schema(model_related, using_schema)

        # Secret relations get no placeholder at all.
        if related in secret_fields:
            continue

        child_item: dict[str, Any] = {}
        column_names = (
            foreign_key.get_column_names(related)
            if hasattr(foreign_key, "get_column_names")
            else (related,)
        )
        related_keys = (
            tuple(foreign_key.related_columns.keys())
            if hasattr(foreign_key, "related_columns")
            else (model_related.pkname,)
        )
        for related_key, column_name in zip(related_keys, column_names, strict=False):
            foreign_key_value = cls._table_row_value(row, model_table, column_name)
            if foreign_key_value is not None:
                child_item[related_key] = foreign_key_value

        # Build a reduced placeholder for the related model from whatever key
        # values the row provided. With no values this is the same call as
        # `model_related()`, so empty and populated relations share one path.
        related_instance = model_related(**child_item)
        if exclude_secrets:
            related_instance.__no_load_trigger_attrs__.update(
                model_related.meta.secret_fields
            )
        if using_schema is not None:
            related_instance.table = model_related.table_schema(using_schema)
        item[related] = related_instance

    # Check for the only_fields
    if is_only_fields or is_defer_fields:
        mapping_fields = (
            # A missing only_fields is treated as an empty selection rather
            # than failing while iterating None.
            [str(field) for field in (only_fields or ())]
            if is_only_fields
            else list(row._mapping.keys())  # type: ignore
        )

        for column, value in row._mapping.items():
            column_name = str(getattr(column, "key", column))
            mapped_field = cls.meta.columns_to_field.get(column_name)
            if mapped_field is None:
                continue
            if mapped_field in secret_fields:
                continue
            # Making sure when a table is reflected, maps the right fields of the ReflectModel
            if mapped_field not in mapping_fields and column_name not in mapping_fields:
                continue
            if column_name not in item:
                item[column_name] = value

        # We need to generify the model fields to make sure we can populate the
        # model without mandatory fields
        model = cast("type[Model]", cls.proxy_model(**item))

        cls.__apply_reference_select(
            model=model,
            row=row,
            references=reference_select,
            tables_and_models=tables_and_models,
            root_model_class=root_model_class,
            using_schema=using_schema,
        )

        # Apply the schema to the model
        model = cls.__apply_schema(model, using_schema)

        model = cls.__handle_prefetch_related(
            row=row, model=model, prefetch_related=prefetch_related
        )
        return model

    # Pull out the regular column values.
    for column in model_table.columns:
        mapped_field = cls.meta.columns_to_field.get(column.key)
        if mapped_field is None:
            continue
        if mapped_field in secret_fields:
            continue
        # Do not overwrite a relation that was already materialized above.
        if mapped_field in item and (
            column.key == mapped_field
            or isinstance(
                cls.fields.get(mapped_field),
                (saffier_fields.ForeignKey, saffier_fields.OneToOneField),
            )
        ):
            continue
        if column.key not in item:
            # The row may be keyed by the column object or by its string key.
            if column in row._mapping:
                item[column.key] = row._mapping[column]
            elif column.key in row._mapping:
                item[column.key] = row._mapping[column.key]

    model = (
        cast("type[Model]", cls(**item))
        if not exclude_secrets
        else cast("type[Model]", cls.proxy_model(**item))
    )
    if exclude_secrets:
        model.__no_load_trigger_attrs__.update(cls.meta.secret_fields)

    cls.__apply_reference_select(
        model=model,
        row=row,
        references=reference_select,
        tables_and_models=tables_and_models,
        root_model_class=root_model_class,
        using_schema=using_schema,
    )

    # Apply the schema to the model
    model = cls.__apply_schema(model, using_schema)

    # Handle prefetch related fields.
    model = cls.__handle_prefetch_related(
        row=row, model=model, prefetch_related=prefetch_related
    )

    if using_schema is not None:
        model.table = model.build(using_schema)  # type: ignore
    return model

__apply_reference_select classmethod

__apply_reference_select(
    model,
    row,
    references,
    tables_and_models,
    root_model_class,
    using_schema=None,
)

Copy reference_select() aliases from a row onto the model tree.

reference_select() can target local attributes or nested related objects. Nested dictionaries mirror the object graph and are traversed recursively until a concrete row source is found.

PARAMETER DESCRIPTION
model

Materialized model instance to mutate.

TYPE: type[Model] | None

row

Row containing the selected values.

TYPE: Row

references

Mapping produced by queryset compilation for reference selections.

TYPE: dict[str, Any]

tables_and_models

Joined-table lookup map used to resolve nested sources.

TYPE: dict[str, tuple[Any, Any]] | None

root_model_class

Root model for fallback column resolution.

TYPE: type[Model]

using_schema

Optional schema override for table lookup.

TYPE: str | None DEFAULT: None

Source code in saffier/core/db/models/row.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
@classmethod
def __apply_reference_select(
    cls,
    model: type["Model"] | None,
    row: Row,
    references: dict[str, Any],
    tables_and_models: dict[str, tuple[Any, Any]] | None,
    root_model_class: type["Model"],
    using_schema: str | None = None,
) -> None:
    """Write `reference_select()` values from a row onto a model and its children.

    Each entry of `references` maps an attribute name to a source. A
    dictionary source describes a nested object: the attribute of that name
    is fetched from the model and processed recursively when it is not
    `None`. A `None` source is skipped. Any other source is resolved against
    the row and assigned to the attribute with `setattr`.

    Args:
        model (type[Model] | None): Materialized model instance to mutate;
            nothing happens when it is `None`.
        row (Row): Row containing the selected values.
        references (dict[str, Any]): Mapping produced by queryset
            compilation for reference selections.
        tables_and_models (dict[str, tuple[Any, Any]] | None): Joined-table
            lookup map used to resolve nested sources.
        root_model_class (type[Model]): Root model for fallback column
            resolution.
        using_schema (str | None): Optional schema override for table lookup.
    """
    if model is None:
        return

    # Arguments forwarded unchanged to both the recursion and the resolver.
    shared_kwargs: dict[str, Any] = {
        "row": row,
        "tables_and_models": tables_and_models,
        "root_model_class": root_model_class,
        "using_schema": using_schema,
    }

    for attribute, origin in references.items():
        if origin is None:
            continue

        if not isinstance(origin, dict):
            resolved = cls.__resolve_reference_source(source=origin, **shared_kwargs)
            setattr(model, attribute, resolved)
            continue

        # Nested mapping: descend into the related object when it exists.
        nested_model = getattr(model, attribute, None)
        if nested_model is None:
            continue
        cls.__apply_reference_select(model=nested_model, references=origin, **shared_kwargs)

__resolve_reference_source classmethod

__resolve_reference_source(
    row,
    source,
    tables_and_models,
    root_model_class,
    using_schema=None,
)

Resolve one reference_select() source from the current row.

The source may already be a row key, a SQLAlchemy column-like object, or a double-underscore path pointing at a joined table column.

PARAMETER DESCRIPTION
row

SQLAlchemy row containing the selected values.

TYPE: Row

source

Raw source token stored in the reference-select map.

TYPE: Any

tables_and_models

Joined-table lookup map used for path-based resolution.

TYPE: dict[str, tuple[Any, Any]] | None

root_model_class

Root model used when a direct table lookup is required.

TYPE: type[Model]

using_schema

Optional schema override for table lookup.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Any

Resolved value copied onto the model instance.

TYPE: Any

RAISES DESCRIPTION
QuerySetError

If the source cannot be resolved from the row.

Source code in saffier/core/db/models/row.py
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
@classmethod
def __resolve_reference_source(
    cls,
    row: Row,
    source: Any,
    tables_and_models: dict[str, tuple[Any, Any]] | None,
    root_model_class: type["Model"],
    using_schema: str | None = None,
) -> Any:
    """Look up the value of one `reference_select()` source in `row`.

    The lookups are attempted in this order and the first hit wins:

    1. `source` itself as a key of the row mapping.
    2. The `key` (or, failing that, `name`) attribute of `source`.
    3. For string sources only: the column resolved through
       `__resolve_reference_column`, then that column's `key`, then the
       raw string again, and finally an attribute of the row object.

    Args:
        row (Row): Row whose `_mapping` holds the selected values.
        source (Any): Raw source token stored in the reference-select map.
        tables_and_models (dict[str, tuple[Any, Any]] | None): Joined-table
            lookup map forwarded to the column resolver.
        root_model_class (type[Model]): Root model forwarded to the column
            resolver.
        using_schema (str | None): Optional schema override forwarded to the
            column resolver.

    Returns:
        Any: The value found for `source`.

    Raises:
        QuerySetError: If none of the lookups finds `source` in the row.
    """
    mapping = row._mapping

    # The token may already be a key of the row.
    if source in mapping:
        return mapping[source]

    # Column-like objects: fall back to their `key`, then their `name`.
    alias = getattr(source, "key", None) or getattr(source, "name", None)
    if alias is not None and alias in mapping:
        return mapping[alias]

    if isinstance(source, str):
        resolved_column = cls.__resolve_reference_column(
            source=source,
            tables_and_models=tables_and_models,
            root_model_class=root_model_class,
            using_schema=using_schema,
        )
        if resolved_column is not None:
            if resolved_column in mapping:
                return mapping[resolved_column]
            if resolved_column.key in mapping:
                return mapping[resolved_column.key]
        if source in mapping:
            return mapping[source]

        # Last resort: the string may name an attribute of the row object.
        unset = object()
        attribute_value = getattr(row, source, unset)
        if attribute_value is not unset:
            return attribute_value

    raise QuerySetError(
        detail=f"Unable to resolve reference_select source '{source}' for {cls.__name__}."
    )

__resolve_reference_column classmethod

__resolve_reference_column(
    source,
    tables_and_models,
    root_model_class,
    using_schema=None,
)

Resolve a string reference-select source into a SQLAlchemy column.

PARAMETER DESCRIPTION
source

Column name or join__path__column lookup string.

TYPE: str

tables_and_models

Joined-table lookup map keyed by queryset prefixes.

TYPE: dict[str, tuple[Any, Any]] | None

root_model_class

Root model used when no joined table is specified.

TYPE: type[Model]

using_schema

Optional schema override for table lookup.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Any | None

Resolved SQLAlchemy column object, or None when the column is not available.

Source code in saffier/core/db/models/row.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
@classmethod
def __resolve_reference_column(
    cls,
    source: str,
    tables_and_models: dict[str, tuple[Any, Any]] | None,
    root_model_class: type["Model"],
    using_schema: str | None = None,
) -> Any | None:
    """Resolve a string reference-select source into a SQLAlchemy column.

    Args:
        source (str): Column name or `join__path__column` lookup string.
        tables_and_models (dict[str, tuple[Any, Any]] | None): Joined-table
            lookup map keyed by queryset prefixes.
        root_model_class (type[Model]): Root model used when no joined table
            is specified.
        using_schema (str | None): Optional schema override for table lookup.

    Returns:
        Any | None: Resolved SQLAlchemy column object, or `None` when the
        column is not available.
    """
    table = None
    column_name = source

    if "__" in source:
        prefix, column_name = source.rsplit("__", 1)
        if tables_and_models is not None:
            table = tables_and_models.get(prefix, (None, None))[0]
    elif tables_and_models is not None:
        table = tables_and_models.get("", (None, None))[0]

    if table is None:
        table = (
            root_model_class.table_schema(using_schema)
            if using_schema is not None
            else root_model_class.table
        )

    return getattr(table.c, column_name, None)

__apply_schema classmethod

__apply_schema(model, schema=None)

Attach a schema-specific table to one model instance when needed.

PARAMETER DESCRIPTION
model

Model class or instance being prepared.

TYPE: type[Model]

schema

Schema name requested by the queryset.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
type[Model]

The original model, potentially with its instance table rebound to a schema-specific version.

Source code in saffier/core/db/models/row.py
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
@classmethod
def __apply_schema(cls, model: type["Model"], schema: str | None = None) -> type["Model"]:
    """Attach a schema-specific table to one model instance when needed.

    Args:
        model (type[Model]): Model class or instance being prepared.
        schema (str | None): Schema name requested by the queryset.

    Returns:
        type[Model]: The original `model`, potentially with its instance
        table rebound to a schema-specific version.
    """
    # Apply the schema to model instances without mutating class-level table caches.
    if schema is not None and not isinstance(model, type):
        model.table = model.build(schema)  # type: ignore
    return model
__should_ignore_related_name(related_name, select_related)

Return whether a foreign-key placeholder is covered by select_related.

When a joined related object is already being materialized recursively, the loader must not also inject the lighter foreign-key placeholder for the same attribute. Doing so would overwrite the fully populated related instance.

PARAMETER DESCRIPTION
related_name

Foreign-key attribute currently being evaluated.

TYPE: str

select_related

Joined relation paths attached to the queryset.

TYPE: Sequence[str]

RETURNS DESCRIPTION
bool

True when the foreign-key placeholder should be skipped.

TYPE: bool

Source code in saffier/core/db/models/row.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
@classmethod
def __should_ignore_related_name(
    cls, related_name: str, select_related: Sequence[str]
) -> bool:
    """Tell whether `related_name` appears in any `select_related` path.

    A joined related object that is already being materialized recursively
    must not be overwritten by the lighter foreign-key placeholder for the
    same attribute, so callers use this to skip the placeholder.

    Args:
        related_name (str): Foreign-key attribute currently being evaluated.
        select_related (Sequence[str]): Joined relation paths attached to the
            queryset; each path is split on `__` into segments.

    Returns:
        bool: `True` when `related_name` equals a segment of at least one
        path, `False` otherwise (including when there are no paths).
    """
    return any(related_name in path.split("__") for path in select_related)
__handle_prefetch_related(
    row,
    model,
    prefetch_related,
    parent_cls=None,
    original_prefetch=None,
    is_nested=False,
)

Populate prefetched relations for one materialized model instance.

The synchronous implementation is used when row materialization already runs inside sync-only code paths. It resolves plain relations, many-to-many paths, and nested prefetch chains while guarding against to_attr collisions on the destination model.

PARAMETER DESCRIPTION
row

Source row containing the root primary-key values.

TYPE: Row

model

Materialized model instance to enrich.

TYPE: type[Model]

prefetch_related

Prefetch directives attached to the queryset.

TYPE: Sequence[Prefetch]

parent_cls

Parent object for nested traversal.

TYPE: type[Model] | None DEFAULT: None

original_prefetch

Root prefetch object preserved while recursion walks nested paths.

TYPE: Prefetch | None DEFAULT: None

is_nested

Whether the current call is processing a nested branch.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
type[Model]

The same model instance with prefetched attributes set.

RAISES DESCRIPTION
QuerySetError

If to_attr would overwrite an existing attribute.

Source code in saffier/core/db/models/row.py
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
@classmethod
def __handle_prefetch_related(
    cls,
    row: Row,
    model: type["Model"],
    prefetch_related: Sequence["Prefetch"],
    parent_cls: type["Model"] | None = None,
    original_prefetch: Optional["Prefetch"] = None,
    is_nested: bool = False,
) -> type["Model"]:
    """Populate prefetched relations for one materialized model instance.

    The synchronous implementation is used when row materialization already
    runs inside sync-only code paths; coroutines are driven with
    `run_sync`. Each directive is handled by exactly one of these branches:

    * path containing `__` whose first segment is a many-to-many field or
      an attribute of `model`: records are collected by walking the path;
    * any other path containing `__`: the handler recurses on the model
      related through the first segment with the remaining path;
    * non-nested directive with its own queryset: that queryset is filtered
      by the row's primary key and executed;
    * single segment that is a many-to-many field or an attribute of
      `model`: records are collected by walking the path;
    * anything else: resolved as a nested reverse relation.

    Args:
        row (Row): Source row containing the root primary-key values.
        model (type[Model]): Materialized model instance to enrich.
        prefetch_related (Sequence[Prefetch]): Prefetch directives attached
            to the queryset.
        parent_cls (type[Model] | None): Object checked for `to_attr`
            collisions; defaults to `model`.
        original_prefetch (Prefetch | None): Root prefetch object preserved
            while recursion walks nested paths.
        is_nested (bool): Whether the current call is processing a nested
            branch.

    Returns:
        type[Model]: The same model instance with prefetched attributes set.

    Raises:
        QuerySetError: If `to_attr` would overwrite an existing attribute.
    """
    if not parent_cls:
        parent_cls = model

    for related in prefetch_related:
        if not original_prefetch:
            original_prefetch = related

        # Top-level calls restart from every directive; nested calls keep
        # the root prefetch handed down by the caller.
        if original_prefetch and not is_nested:
            original_prefetch = related

        # Refuse to overwrite an attribute that already exists on the
        # destination object under the requested `to_attr` name.
        if hasattr(parent_cls, original_prefetch.to_attr):
            raise QuerySetError(
                f"Conflicting attribute to_attr='{original_prefetch.related_name}' with '{original_prefetch.to_attr}' in {parent_cls.__class__.__name__}"
            )

        if "__" in related.related_name:
            first_part, remainder = related.related_name.split("__", 1)
            if isinstance(
                cls.fields.get(first_part), saffier_fields.ManyToManyField
            ) or hasattr(model, first_part):
                records = run_sync(
                    cls.__collect_prefetch_records(
                        model=model,
                        related_name=related.related_name,
                        queryset=original_prefetch.queryset,
                    )
                )
                saffier_setattr(model, related.to_attr, records)
                continue

            model_cls = cls.meta.related_fields[first_part].related_to

            # Build the nested Prefetch object for the rest of the path.
            remainder_prefetch = related.__class__(
                related_name=remainder, to_attr=related.to_attr, queryset=related.queryset
            )

            # Continue on the related model with the remaining path.
            model_cls.__handle_prefetch_related(
                row,
                model,
                prefetch_related=[remainder_prefetch],
                original_prefetch=original_prefetch,
                parent_cls=model,
                is_nested=True,
            )

        # Non-nested directive that carries its own queryset.
        elif related.queryset is not None and not is_nested:
            extra = cls._build_related_pk_filter(related.related_name, row, cls)
            related.queryset.extra = extra

            # Execute the queryset
            records = run_sync(cls.__run_query(queryset=related.queryset))
            saffier_setattr(model, related.to_attr, records)
        elif isinstance(
            cls.fields.get(related.related_name),
            saffier_fields.ManyToManyField,
        ) or hasattr(model, related.related_name):
            records = run_sync(
                cls.__collect_prefetch_records(
                    model=model,
                    related_name=related.related_name,
                    queryset=related.queryset,
                )
            )
            saffier_setattr(model, related.to_attr, records)
        else:
            # The nested helper looks up the related model itself, so no
            # local lookup is needed here (the async variant has none either).
            records = cls.__process_nested_prefetch_related(
                row,
                prefetch_related=related,
                original_prefetch=original_prefetch,
                parent_cls=model,
                queryset=original_prefetch.queryset,
            )

            saffier_setattr(model, related.to_attr, records)
    return model
__handle_prefetch_related_async(
    row,
    model,
    prefetch_related,
    parent_cls=None,
    original_prefetch=None,
    is_nested=False,
)

Async variant of __handle_prefetch_related.

PARAMETER DESCRIPTION
row

Source row containing the root primary-key values.

TYPE: Row

model

Materialized model instance to enrich.

TYPE: type[Model]

prefetch_related

Prefetch directives attached to the queryset.

TYPE: Sequence[Prefetch]

parent_cls

Parent object for nested traversal.

TYPE: type[Model] | None DEFAULT: None

original_prefetch

Root prefetch object preserved while recursion walks nested paths.

TYPE: Prefetch | None DEFAULT: None

is_nested

Whether the current call is processing a nested branch.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
type[Model]

The same model instance with prefetched attributes set.

RAISES DESCRIPTION
QuerySetError

If to_attr would overwrite an existing attribute.

Source code in saffier/core/db/models/row.py
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
@classmethod
async def __handle_prefetch_related_async(
    cls,
    row: Row,
    model: type["Model"],
    prefetch_related: Sequence["Prefetch"],
    parent_cls: type["Model"] | None = None,
    original_prefetch: Optional["Prefetch"] = None,
    is_nested: bool = False,
) -> type["Model"]:
    """Async variant of `__handle_prefetch_related`.

    Every directive in `prefetch_related` is handled by exactly one branch,
    and the resulting records are stored on `model` under the directive's
    `to_attr` via `saffier_setattr`. Coroutines are awaited directly.

    Args:
        row (Row): Source row containing the root primary-key values.
        model (type[Model]): Materialized model instance to enrich.
        prefetch_related (Sequence[Prefetch]): Prefetch directives attached
            to the queryset.
        parent_cls (type[Model] | None): Object checked for `to_attr`
            collisions; defaults to `model` when falsy.
        original_prefetch (Prefetch | None): Root prefetch object preserved
            while recursion walks nested paths.
        is_nested (bool): Whether the current call is processing a nested
            branch.

    Returns:
        type[Model]: The same model instance with prefetched attributes set.

    Raises:
        QuerySetError: If `to_attr` would overwrite an existing attribute.
    """
    if not parent_cls:
        parent_cls = model

    for related in prefetch_related:
        if not original_prefetch:
            original_prefetch = related

        # Top-level calls restart from every directive; nested calls keep
        # the root prefetch handed down by the caller.
        if original_prefetch and not is_nested:
            original_prefetch = related

        # Refuse to overwrite an attribute that already exists on the
        # destination object under the requested `to_attr` name.
        if hasattr(parent_cls, original_prefetch.to_attr):
            raise QuerySetError(
                f"Conflicting attribute to_attr='{original_prefetch.related_name}' with '{original_prefetch.to_attr}' in {parent_cls.__class__.__name__}"
            )

        if "__" in related.related_name:
            first_part, remainder = related.related_name.split("__", 1)
            # First segment is a many-to-many field or an attribute of the
            # instance: collect the records by walking the whole path.
            if isinstance(
                cls.fields.get(first_part), saffier_fields.ManyToManyField
            ) or hasattr(model, first_part):
                records = await cls.__collect_prefetch_records(
                    model=model,
                    related_name=related.related_name,
                    queryset=original_prefetch.queryset,
                )
                saffier_setattr(model, related.to_attr, records)
                continue

            # Otherwise recurse on the model related through the first
            # segment, passing only the remaining part of the path.
            model_cls = cls.meta.related_fields[first_part].related_to
            remainder_prefetch = related.__class__(
                related_name=remainder,
                to_attr=related.to_attr,
                queryset=related.queryset,
            )
            await model_cls.__handle_prefetch_related_async(
                row,
                model,
                prefetch_related=[remainder_prefetch],
                original_prefetch=original_prefetch,
                parent_cls=model,
                is_nested=True,
            )
        elif related.queryset is not None and not is_nested:
            # Non-nested directive with its own queryset: attach the
            # primary-key filter built from the row, then execute it.
            extra = cls._build_related_pk_filter(related.related_name, row, cls)
            related.queryset.extra = extra
            records = await cls.__run_query(queryset=related.queryset)
            saffier_setattr(model, related.to_attr, records)
        elif isinstance(
            cls.fields.get(related.related_name),
            saffier_fields.ManyToManyField,
        ) or hasattr(model, related.related_name):
            # Single segment that is a many-to-many field or an attribute
            # of the instance.
            records = await cls.__collect_prefetch_records(
                model=model,
                related_name=related.related_name,
                queryset=related.queryset,
            )
            saffier_setattr(model, related.to_attr, records)
        else:
            # Remaining case: resolve as a nested reverse relation using the
            # root prefetch's queryset.
            records = await cls.__process_nested_prefetch_related_async(
                row=row,
                prefetch_related=related,
                parent_cls=model,
                original_prefetch=original_prefetch,
                queryset=original_prefetch.queryset,
            )
            saffier_setattr(model, related.to_attr, records)
    return model
__process_nested_prefetch_related(
    row,
    prefetch_related,
    parent_cls,
    original_prefetch,
    queryset,
)

Resolve one nested reverse-relation prefetch synchronously.

Nested prefetches such as "team__members" need to be rewritten into a queryset filter rooted at the child model. This helper derives the child foreign-key path, injects the current parent primary-key values, and executes the resulting queryset.

PARAMETER DESCRIPTION
row

Source row containing the current parent identifiers.

TYPE: Row

prefetch_related

Current nested prefetch branch.

TYPE: Prefetch

parent_cls

Parent model instance used to derive the related filter.

TYPE: type[Model]

original_prefetch

Root prefetch declaration from the queryset.

TYPE: Prefetch

queryset

Queryset used to load the related records.

TYPE: QuerySet

RETURNS DESCRIPTION
list[type[Model]]

Prefetched related records for the current branch.

Source code in saffier/core/db/models/row.py
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
@classmethod
def __process_nested_prefetch_related(
    cls,
    row: Row,
    prefetch_related: "Prefetch",
    parent_cls: type["Model"],
    original_prefetch: "Prefetch",
    queryset: "QuerySet",
) -> list[type["Model"]]:
    """Resolve one nested reverse-relation prefetch synchronously.

    The root prefetch path (for example `"team__members"`) is reversed, the
    current branch name is removed from it, and the child model's foreign-key
    name is put in front. The joined result is the lookup handed to
    `_build_related_pk_filter`, whose output filters the executed query.

    Args:
        row (Row): Source row containing the current parent identifiers.
        prefetch_related (Prefetch): Current nested prefetch branch.
        parent_cls (type[Model]): Parent model instance used to derive the
            related filter.
        original_prefetch (Prefetch): Root prefetch declaration from the
            queryset.
        queryset (QuerySet): Queryset used to load the related records.

    Returns:
        list[type[Model]]: Prefetched related records for the current branch.
    """
    reversed_path = original_prefetch.related_name.split("__")[::-1]
    branch_name = prefetch_related.related_name

    # Drop the first occurrence of the current branch from the reversed path.
    reversed_path.remove(branch_name)

    # The child model sits on the other side of the reverse relation.
    child_model = getattr(cls, branch_name).related_from

    # The child's foreign key becomes the entry point of the lookup.
    fk_name = child_model.meta.related_names_mapping[branch_name]
    lookup = "__".join([fk_name, *reversed_path])

    pk_filter = cls._build_related_pk_filter(lookup, row, parent_cls)
    return run_sync(cls.__run_query(child_model, pk_filter, queryset))
__process_nested_prefetch_related_async(
    row,
    prefetch_related,
    parent_cls,
    original_prefetch,
    queryset,
)

Async variant of __process_nested_prefetch_related.

PARAMETER DESCRIPTION
row

Source row containing the current parent identifiers.

TYPE: Row

prefetch_related

Current nested prefetch branch.

TYPE: Prefetch

parent_cls

Parent model instance used to derive the related filter.

TYPE: type[Model]

original_prefetch

Root prefetch declaration from the queryset.

TYPE: Prefetch

queryset

Queryset used to load the related records.

TYPE: QuerySet

RETURNS DESCRIPTION
list[type[Model]]

Prefetched related records for the current branch.

Source code in saffier/core/db/models/row.py
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
@classmethod
async def __process_nested_prefetch_related_async(
    cls,
    row: Row,
    prefetch_related: "Prefetch",
    parent_cls: type["Model"],
    original_prefetch: "Prefetch",
    queryset: "QuerySet",
) -> list[type["Model"]]:
    """Async variant of `__process_nested_prefetch_related`.

    Builds the lookup from the reversed root prefetch path, with the current
    branch removed and the child model's foreign-key name in front, then
    awaits the query filtered by the resulting primary-key filter.

    Args:
        row (Row): Source row containing the current parent identifiers.
        prefetch_related (Prefetch): Current nested prefetch branch.
        parent_cls (type[Model]): Parent model instance used to derive the
            related filter.
        original_prefetch (Prefetch): Root prefetch declaration from the
            queryset.
        queryset (QuerySet): Queryset used to load the related records.

    Returns:
        list[type[Model]]: Prefetched related records for the current branch.
    """
    segments = list(reversed(original_prefetch.related_name.split("__")))
    current = prefetch_related.related_name

    # Only the first occurrence of the current branch is removed.
    segments.remove(current)

    related_model = getattr(cls, current).related_from
    entry_point = related_model.meta.related_names_mapping[current]

    # The foreign key leads the lookup, followed by the remaining segments.
    lookup = "__".join([entry_point] + segments)
    pk_filter = cls._build_related_pk_filter(lookup, row, parent_cls)

    return await cls.__run_query(related_model, pk_filter, queryset)

__collect_prefetch_records async classmethod

__collect_prefetch_records(
    model, related_name, queryset=None
)

Collect prefetched records reachable through an attribute path.

PARAMETER DESCRIPTION
model

Root model instance that already exists.

TYPE: type[Model]

related_name

Relation path, including nested __ segments.

TYPE: str

queryset

Optional queryset used to constrain the collected records by primary key.

TYPE: QuerySet | None DEFAULT: None

RETURNS DESCRIPTION
list[type[Model]]

Related records discovered through the path. When queryset is provided, the returned records come from that queryset filtered to the discovered primary keys.

Source code in saffier/core/db/models/row.py
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
@classmethod
async def __collect_prefetch_records(
    cls,
    model: type["Model"],
    related_name: str,
    queryset: Optional["QuerySet"] = None,
) -> list[type["Model"]]:
    """Gather the records reachable from `model` through `related_name`.

    Args:
        model (type[Model]): Root model instance that already exists.
        related_name (str): Relation path; it is split on `__` before being
            traversed.
        queryset (QuerySet | None): Optional queryset used to constrain the
            collected records by primary key.

    Returns:
        list[type[Model]]: Without a queryset, the traversed records as they
        are. With a queryset, the result of filtering it (or its `_clone()`
        when it has one) by the distinct non-`None` `pk` values of the
        traversed records, or an empty list when there are none.
    """
    reached = await cls.__traverse_prefetch_path(model, related_name.split("__"))
    if queryset is None:
        return reached
    if not reached:
        return []

    # Keep first-seen order and drop both duplicates and missing keys.
    unique_pks = []
    for item in reached:
        identifier = getattr(item, "pk", None)
        if identifier is None or identifier in unique_pks:
            continue
        unique_pks.append(identifier)

    if not unique_pks:
        return []

    # Filter a clone when the queryset offers one, else the queryset itself.
    if hasattr(queryset, "_clone"):
        scoped = queryset._clone()
    else:
        scoped = queryset
    return await scoped.filter(pk__in=unique_pks)

__traverse_prefetch_path async classmethod

__traverse_prefetch_path(model, path_parts)

Traverse a relation path on an already materialized model tree.

PARAMETER DESCRIPTION
model

Current model instance being inspected.

TYPE: type[Model]

path_parts

Remaining relation segments to resolve.

TYPE: Sequence[str]

RETURNS DESCRIPTION
list[type[Model]]

Models found at the end of the path. Empty when any segment is missing.

Source code in saffier/core/db/models/row.py
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
@classmethod
async def __traverse_prefetch_path(
    cls,
    model: type["Model"],
    path_parts: Sequence[str],
) -> list[type["Model"]]:
    """Follow a relation path across an already materialized model tree.

    Args:
        model (type[Model]): Current model instance being inspected.
        path_parts (Sequence[str]): Remaining relation segments to resolve.

    Returns:
        list[type[Model]]: `[model]` once the path is exhausted; otherwise
        the models found at the end of the path, or an empty list when a
        segment is missing or resolves to `None`.
    """
    if not path_parts:
        return [model]

    head, rest = path_parts[0], path_parts[1:]
    target = getattr(model, head, None)
    if target is None:
        return []

    # A single related object: keep walking from it.
    if not hasattr(target, "all"):
        return await cls.__traverse_prefetch_path(target, rest)

    # Anything exposing `all()` is awaited and each returned item is walked.
    collected: list[type["Model"]] = []
    for child in await target.all():
        collected.extend(await cls.__traverse_prefetch_path(child, rest))
    return collected

__run_query async classmethod

__run_query(model=None, extra=None, queryset=None)

Execute a relation-loading queryset with optional extra filters.

PARAMETER DESCRIPTION
model

Model whose default manager should be used when queryset is not supplied.

TYPE: type[Model] | None DEFAULT: None

extra

Additional filter keyword arguments derived from the source row.

TYPE: dict[str, Any] | None DEFAULT: None

queryset

Explicit queryset to execute.

TYPE: QuerySet | None DEFAULT: None

RETURNS DESCRIPTION
list[type[Model]] | Any

Query result produced by the provided queryset or the model's default manager.

Source code in saffier/core/db/models/row.py
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
@classmethod
async def __run_query(
    cls,
    model: type["Model"] | None = None,
    extra: dict[str, Any] | None = None,
    queryset: Optional["QuerySet"] = None,
) -> list[type["Model"]] | Any:
    """Execute a relation-loading queryset with optional extra filters.

    Args:
        model (type[Model] | None): Model whose default manager should be
            used when `queryset` is not supplied.
        extra (dict[str, Any] | None): Additional filter keyword arguments
            derived from the source row.
        queryset (QuerySet | None): Explicit queryset to execute.

    Returns:
        list[type[Model]] | Any: Query result produced by the provided
        queryset or the model's default manager.
    """

    if not queryset:
        return await model.query.filter(**extra)  # type: ignore

    if extra:
        queryset.extra = extra

    return await queryset

model_json_schema classmethod

model_json_schema(
    *,
    schema_generator=None,
    mode=None,
    phase=None,
    include_callable_defaults=None,
    **kwargs,
)

Generate the admin/marshalling JSON schema for this model.

PARAMETER DESCRIPTION
schema_generator

Optional schema generator carrying callable-default policy information.

TYPE: Any | None DEFAULT: None

mode

Requested schema mode, typically validation or serialization.

TYPE: str | None DEFAULT: None

phase

Explicit marshalling phase. When omitted, it is derived from mode.

TYPE: str | None DEFAULT: None

include_callable_defaults

Whether callable defaults should be preserved in the schema output.

TYPE: bool | None DEFAULT: None

**kwargs

Accepted for compatibility with Pydantic-style callers.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
dict[str, Any]

JSON schema generated from the model's admin marshall class.

Source code in saffier/core/db/models/model.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@classmethod
def model_json_schema(
    cls,
    *,
    schema_generator: Any | None = None,
    mode: str | None = None,
    phase: str | None = None,
    include_callable_defaults: bool | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Generate the admin/marshalling JSON schema for this model.

    Args:
        schema_generator: Optional schema generator carrying callable-default
            policy information.
        mode: Requested schema mode, typically validation or serialization.
        phase: Explicit marshalling phase. When omitted, it is derived from
            `mode`.
        include_callable_defaults: Whether callable defaults should be
            preserved in the schema output.
        **kwargs: Accepted for compatibility with Pydantic-style callers.

    Returns:
        dict[str, Any]: JSON schema generated from the model's admin
        marshall class.
    """
    del kwargs
    if phase is None:
        phase = "create" if mode == "validation" else "view"
    if include_callable_defaults is None:
        marker = getattr(schema_generator, "include_callable_defaults", None)
        if marker is None:
            schema_name = getattr(schema_generator, "__name__", "")
            include_callable_defaults = schema_name != "NoCallableDefaultJsonSchema"
        else:
            include_callable_defaults = bool(marker)
    marshall_class = cls.get_admin_marshall_class(phase=phase, for_schema=True)
    return marshall_class.model_json_schema(
        include_callable_defaults=bool(include_callable_defaults)
    )

__getattribute__

__getattribute__(name)
Source code in saffier/core/db/models/model.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def __getattribute__(self, name: str) -> Any:
    """Resolve `name` through the parent class's attribute lookup.

    Non-`None` values, and any name starting with an underscore, are returned
    straight away. For a public attribute that resolved to `None`, the
    instance's `fields` mapping is consulted to check whether `name` is a
    nullable `ForeignKey`/`OneToOneField`; that case returns `None`
    explicitly.

    Args:
        name: Attribute name being accessed.

    Returns:
        Any: The value produced by the parent lookup.
    """
    value = super().__getattribute__(name)
    if value is not None or name.startswith("_"):
        return value

    # NOTE(review): `value` is always None from here on, so every remaining
    # return statement yields None regardless of the field checks below.
    try:
        # Read `fields` via object.__getattribute__ so this override is not
        # re-entered while looking it up.
        fields = object.__getattribute__(self, "fields")
    except AttributeError:
        return value

    # Anything other than a dict is treated as "no field information".
    field = fields.get(name) if isinstance(fields, dict) else None
    if (
        field is not None
        and getattr(field, "null", False)
        and isinstance(field, (saffier.ForeignKey, saffier.OneToOneField))
    ):
        return None
    return value

model_dump

model_dump(
    include=None,
    exclude=None,
    exclude_none=False,
    _seen=None,
)

Serialize declared model fields into a dictionary.

The serializer walks Saffier field declarations rather than raw __dict__ state. That keeps virtual relation managers out of the output, respects exclude=True, supports nested include/exclude rules, and avoids infinite recursion by falling back to primary keys for already-seen related objects.

PARAMETER DESCRIPTION
include

Optional whitelist of fields or nested include rules.

TYPE: set[int] | set[str] | dict[int, Any] | dict[str, Any] | None DEFAULT: None

exclude

Optional blacklist of fields or nested exclude rules.

TYPE: set[int] | set[str] | dict[int, Any] | dict[str, Any] | None DEFAULT: None

exclude_none

Whether None values should be omitted.

TYPE: bool DEFAULT: False

_seen

Internal recursion guard used for nested model serialization.

TYPE: set[int] | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: Serialized model payload.

Source code in saffier/core/db/models/model.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def model_dump(
    self,
    include: set[int] | set[str] | dict[int, typing.Any] | dict[str, typing.Any] | None = None,
    exclude: set[int] | set[str] | dict[int, typing.Any] | dict[str, typing.Any] | None = None,
    exclude_none: bool = False,
    _seen: set[int] | None = None,
) -> dict[str, typing.Any]:
    """Serialize declared model fields into a dictionary.

    The serializer walks Saffier field declarations rather than raw
    `__dict__` state. That keeps virtual relation managers out of the
    output, respects `exclude=True`, supports nested include/exclude rules,
    and avoids infinite recursion by falling back to primary keys for
    already-seen related objects.

    Args:
        include: Optional whitelist of fields or nested include rules.
        exclude: Optional blacklist of fields or nested exclude rules.
        exclude_none: Whether `None` values should be omitted.
        _seen: Internal recursion guard used for nested model serialization.

    Returns:
        dict[str, Any]: Serialized model payload.
    """
    seen = set() if _seen is None else _seen
    seen.add(id(self))

    def is_included(
        field_name: str,
        include_rule: set[int]
        | set[str]
        | dict[int, typing.Any]
        | dict[str, typing.Any]
        | None,
    ) -> bool:
        if include_rule is None:
            return True
        if isinstance(include_rule, dict):
            return field_name in include_rule and include_rule[field_name] is not False
        return field_name in include_rule

    def is_excluded(
        field_name: str,
        exclude_rule: set[int]
        | set[str]
        | dict[int, typing.Any]
        | dict[str, typing.Any]
        | None,
    ) -> bool:
        if exclude_rule is None:
            return False
        if isinstance(exclude_rule, dict):
            return exclude_rule.get(field_name) is True
        return field_name in exclude_rule

    def nested_rule(
        rule: set[int] | set[str] | dict[int, typing.Any] | dict[str, typing.Any] | None,
        field_name: str,
    ) -> typing.Any:
        if not isinstance(rule, dict):
            return None
        value = rule.get(field_name)
        if value in (None, True, False):
            return None
        return value

    def serialize(
        value: typing.Any,
        *,
        sub_include: typing.Any = None,
        sub_exclude: typing.Any = None,
    ) -> typing.Any:
        if hasattr(value, "model_dump") and callable(value.model_dump):
            if id(value) in seen:
                return getattr(value, "pk", None)
            try:
                return value.model_dump(
                    include=sub_include,
                    exclude=sub_exclude,
                    exclude_none=exclude_none,
                    _seen=seen,
                )
            except TypeError:
                kwargs: dict[str, typing.Any] = {"exclude_none": exclude_none}
                if sub_include is not None:
                    kwargs["include"] = sub_include
                if sub_exclude is not None:
                    kwargs["exclude"] = sub_exclude
                return value.model_dump(**kwargs)

        if isinstance(value, dict):
            return {key: serialize(inner_value) for key, inner_value in value.items()}

        if isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray)):
            return [serialize(item) for item in value]

        return value

    row_dict: dict[str, typing.Any] = {}

    for field_name, field in self.fields.items():
        if isinstance(field, saffier.ManyToManyField):
            continue
        if getattr(field, "exclude", False):
            continue
        if not is_included(field_name, include) or is_excluded(field_name, exclude):
            continue

        if field_name in self.__dict__:
            value = self.__dict__[field_name]
        elif getattr(field, "is_computed", False):
            try:
                value = getattr(self, field_name)
            except AttributeError:
                continue
        else:
            continue

        value = serialize(
            value,
            sub_include=nested_rule(include, field_name),
            sub_exclude=nested_rule(exclude, field_name),
        )
        if exclude_none and value is None:
            continue
        row_dict[field_name] = value

    return row_dict

model_dump_json

model_dump_json(
    include=None, exclude=None, exclude_none=False
)

Serialize the dumped model payload into JSON.

PARAMETER DESCRIPTION
include

Optional include rules forwarded to model_dump().

TYPE: set[int] | set[str] | dict[int, Any] | dict[str, Any] | None DEFAULT: None

exclude

Optional exclude rules forwarded to model_dump().

TYPE: set[int] | set[str] | dict[int, Any] | dict[str, Any] | None DEFAULT: None

exclude_none

Whether None values should be omitted.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

JSON representation of the dumped model payload.

TYPE: str

Source code in saffier/core/db/models/model.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def model_dump_json(
    self,
    include: set[int] | set[str] | dict[int, typing.Any] | dict[str, typing.Any] | None = None,
    exclude: set[int] | set[str] | dict[int, typing.Any] | dict[str, typing.Any] | None = None,
    exclude_none: bool = False,
) -> str:
    """Serialize the dumped model payload into JSON.

    Args:
        include: Optional include rules forwarded to `model_dump()`.
        exclude: Optional exclude rules forwarded to `model_dump()`.
        exclude_none: Whether `None` values should be omitted.

    Returns:
        str: JSON representation of the dumped model payload.
    """
    return orjson.dumps(
        self.model_dump(include=include, exclude=exclude, exclude_none=exclude_none),
        option=orjson.OPT_NON_STR_KEYS,
    ).decode("utf-8")

execute_pre_save_hooks async

execute_pre_save_hooks(
    field_values, original_values=None, *, is_update
)

Run field-level pre-save callbacks and merge their generated values.

PARAMETER DESCRIPTION
field_values

Current field payload about to be persisted.

TYPE: dict[str, Any]

original_values

Previously loaded field values for update flows.

TYPE: dict[str, Any] | None DEFAULT: None

is_update

Whether the hooks are executing for an update.

TYPE: bool

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: Additional column values generated by field hooks.

Source code in saffier/core/db/models/model.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
async def execute_pre_save_hooks(
    self,
    field_values: dict[str, typing.Any],
    original_values: dict[str, typing.Any] | None = None,
    *,
    is_update: bool,
) -> dict[str, typing.Any]:
    """Run field-level pre-save callbacks and merge their generated values.

    Args:
        field_values: Current field payload about to be persisted.
        original_values: Previously loaded field values for update flows.
        is_update: Whether the hooks are executing for an update.

    Returns:
        dict[str, Any]: Additional column values generated by field hooks.
    """
    hook_values: dict[str, typing.Any] = {}
    token = CURRENT_INSTANCE.set(self)
    try:
        all_field_names = {*field_values.keys(), *((original_values or {}).keys())}
        for field_name in all_field_names:
            field = self.fields.get(field_name)
            if field is None:
                continue
            pre_save_callback = getattr(field, "pre_save_callback", None)
            if not callable(pre_save_callback):
                continue
            hook_values.update(
                await pre_save_callback(
                    field_values.get(field_name),
                    (original_values or {}).get(field_name),
                    is_update=is_update,
                )
            )
        return hook_values
    finally:
        CURRENT_INSTANCE.reset(token)

_should_force_insert

_should_force_insert()
Source code in saffier/core/db/models/model.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def _should_force_insert(self) -> bool:
    """Report whether any primary-key field calls for an insert.

    For each name in the class's `pknames` that has a declared field, this
    returns `True` when the current value is `None` and either the table
    column is flagged `autoincrement` or the field has `auto_now_add`, or
    when the field has a non-zero `increment_on_save` or `auto_now`.

    Returns:
        bool: `True` if one of those conditions holds, else `False`.
    """
    for pk_name in type(self).pknames:
        pk_field = self.fields.get(pk_name)
        if pk_field is None:
            continue

        current = getattr(self, pk_name, None)
        db_column = getattr(self.table.c, pk_name, None)

        if current is None:
            # Unset key: look for something that will generate the value.
            if db_column is not None and getattr(db_column, "autoincrement", False):
                return True
            if getattr(pk_field, "auto_now_add", False):
                return True

        # These two flags apply whether or not the key currently has a value.
        if getattr(pk_field, "increment_on_save", 0) != 0:
            return True
        if getattr(pk_field, "auto_now", False):
            return True
    return False

_apply_persisted_db_values

_apply_persisted_db_values(db_kwargs)
Source code in saffier/core/db/models/model.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def _apply_persisted_db_values(self, db_kwargs: dict[str, typing.Any]) -> None:
    """Write column values from a persisted payload back onto this instance.

    Keys are mapped to field names through `self.meta.columns_to_field`,
    falling back to the key itself when it names a declared field; anything
    else is ignored. Each field's grouped values are passed through
    `field.modify_input()` and then applied, with special handling for
    `increment_on_save` expressions and for relation fields that already
    hold an object exposing `pk`.

    Args:
        db_kwargs: Column-keyed values that were sent to or returned by the
            database.
    """
    values_by_field: dict[str, dict[str, typing.Any]] = {}

    for column_key, column_value in db_kwargs.items():
        owner = self.meta.columns_to_field.get(column_key)
        if owner is None and column_key in self.fields:
            owner = column_key
        if owner is None or owner not in self.fields:
            continue
        values_by_field.setdefault(owner, {})[column_key] = column_value

    for owner, raw_values in values_by_field.items():
        field = self.fields[owner]

        if getattr(field, "increment_on_save", 0) != 0:
            # The payload holds an SQL expression rather than a concrete
            # value, so the attribute is dropped from the instance state.
            if any(isinstance(item, ColumnElement) for item in raw_values.values()):
                self.__dict__.pop(owner, None)
                continue

        payload = dict(raw_values)
        field.modify_input(owner, payload)

        single_value = len(raw_values) == 1
        resolved = payload.get(owner)
        if resolved is None and single_value:
            resolved = next(iter(raw_values.values()))

        if isinstance(field, (saffier.ForeignKey, saffier.OneToOneField)):
            existing = self.__dict__.get(owner)
            if existing is not None and hasattr(existing, "pk"):
                if resolved is None:
                    self.__dict__[owner] = None
                    continue
                try:
                    # Update the already-attached related object in place.
                    if isinstance(resolved, dict):
                        existing.pk = resolved
                    else:
                        existing.pk = getattr(resolved, "pk", resolved)
                    continue
                except Exception:
                    # Fall through to the plain assignment below.
                    pass

        if owner in payload:
            setattr(self, owner, payload[owner])
        elif single_value:
            setattr(self, owner, next(iter(raw_values.values())))

update async

update(**kwargs)

Validate, persist, and reapply an in-place update to this row.

The update path normalizes virtual field input, executes pre-save hooks, validates only the fields being changed, writes the update statement, and reapplies database-side values back onto the current instance.

PARAMETER DESCRIPTION
**kwargs

Logical field values to update.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Any

The current model instance.

TYPE: Any

Source code in saffier/core/db/models/model.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
async def update(self, **kwargs: typing.Any) -> typing.Any:
    """Apply field changes to this instance and write them to its row.

    The flow is: emit `pre_update`, normalize the keyword arguments, apply
    them to the instance, run pre-save hooks, validate the column-backed
    values that changed, issue the UPDATE statement when there is anything
    to write, emit `post_update`, and finally reapply the written values to
    the instance.

    Args:
        **kwargs: Logical field values to update.

    Returns:
        Any: The current model instance.
    """
    model_class = self.__class__
    await self.signals.pre_update.send(sender=model_class, instance=self)

    changes = model_class.normalize_field_kwargs(kwargs)

    # Snapshot the values before they are overwritten so hooks can compare.
    previous_values: dict[str, typing.Any] = {}
    for name in changes:
        if name in self.fields:
            previous_values[name] = getattr(self, name, None)

    self.update_from_dict(changes)

    current_values: dict[str, typing.Any] = {}
    for name in changes:
        if name in self.fields:
            current_values[name] = getattr(self, name)

    column_values = {
        name: value for name, value in current_values.items() if self.fields[name].has_column()
    }

    validators = {}
    for name, field in self.fields.items():
        if name in column_values:
            checker = field.validator
            if checker.read_only:
                # Work on a copy so the field's own validator stays read-only.
                checker = copy.copy(checker)
                checker.read_only = False
            validators[name] = checker
    schema = Schema(fields=validators)

    token = EXPLICIT_SPECIFIED_VALUES.set(set(changes.keys()))
    try:
        hook_values = await self.execute_pre_save_hooks(
            current_values,
            previous_values,
            is_update=True,
        )
    finally:
        EXPLICIT_SPECIFIED_VALUES.reset(token)

    checked_values = schema.check(column_values)
    column_values = model_class.extract_column_values(
        checked_values,
        is_update=True,
        is_partial=True,
        phase="prepare_update",
        instance=self,
        model_instance=self,
    )
    column_values.update(hook_values)
    column_values = self._update_auto_now_fields(column_values, self.fields)

    if column_values:
        statement = (
            self.table.update().values(**column_values).where(*self.identifying_clauses())
        )
        check_db_connection(self.database)
        async with self.database as database:
            await database.execute(statement)

    await self.signals.post_update.send(sender=model_class, instance=self)
    self._apply_persisted_db_values(column_values)

    return self

_delete_forward_references async

_delete_forward_references(ignore_fields)
Source code in saffier/core/db/models/model.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
async def _delete_forward_references(self, ignore_fields: set[str]) -> None:
    """Delete objects referenced by foreign keys flagged `remove_referenced`.

    If at least one foreign key carries the flag and this instance can be
    loaded but has not loaded its database fields, it is loaded first. Each
    flagged foreign key whose field name and `related_name` are both outside
    `ignore_fields` then has its stored value expanded and deleted, through
    `raw_delete()` when the related object offers it and `delete()` otherwise.

    Args:
        ignore_fields: Field names or related names to leave untouched.
    """
    cleanup_requested = False
    for candidate in self.meta.foreign_key_fields.values():
        if getattr(candidate, "remove_referenced", False):
            cleanup_requested = True
            break

    if cleanup_requested and self.can_load and not self._has_loaded_db_fields():
        await self.load(only_needed=True)

    for field_name, field in self.meta.foreign_key_fields.items():
        reverse_name = getattr(field, "related_name", None)
        if field_name in ignore_fields or reverse_name in ignore_fields:
            continue
        if not getattr(field, "remove_referenced", False):
            continue

        stored = self.__dict__.get(field_name)
        if stored is None:
            continue

        target = field.expand_relationship(stored)
        if target is None:
            continue
        if hasattr(target, "can_load") and not target.can_load:
            continue

        # Pass the related name (or True when there is none) so the callee
        # knows which relation triggered the delete.
        marker: str | bool = reverse_name or True
        low_level_delete = getattr(target, "raw_delete", None)
        if not callable(low_level_delete):
            await target.delete()
            continue
        await low_level_delete(
            skip_post_delete_hooks=False,
            remove_referenced_call=marker,
        )

_delete_reverse_relations async

_delete_reverse_relations(ignore_fields)
Source code in saffier/core/db/models/model.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
async def _delete_reverse_relations(self, ignore_fields: set[str]) -> None:
    """Delete reverse-related rows whose foreign key asks for a cascade.

    A reverse relation is processed when its name is not in `ignore_fields`,
    the class exposes a non-`None` attribute of that name, and the owning
    foreign key either sets `force_cascade_deletion_relation` or combines
    `no_constraint` with `on_delete == saffier.CASCADE`. The relation's
    queryset is switched to this instance's `schema_name` when one is set and
    the queryset supports `using()`, then `raw_delete()` is awaited on it.

    Args:
        ignore_fields: Related names to leave untouched.
    """
    for related_name, related_field in self.meta.related_fields.items():
        if related_name in ignore_fields or getattr(self.__class__, related_name, None) is None:
            continue

        fk_name = related_field.get_foreign_key_field_name()
        fk = related_field.related_from.fields.get(fk_name)
        if fk is None:
            continue

        cascades = getattr(fk, "force_cascade_deletion_relation", False)
        if not cascades:
            # Without the force flag, cascade here only for constraint-less
            # keys that declare CASCADE.
            cascades = (
                getattr(fk, "no_constraint", False)
                and getattr(fk, "on_delete", None) == saffier.CASCADE
            )
        if not cascades:
            continue

        related_rows = getattr(self, related_name)
        active_schema = getattr(self, "schema_name", None)
        if active_schema is not None and hasattr(related_rows, "using"):
            related_rows = related_rows.using(schema=active_schema)

        await related_rows.raw_delete(
            use_models=getattr(fk, "use_model_based_deletion", False),
            remove_referenced_call=related_name,
        )

raw_delete async

raw_delete(
    *,
    skip_post_delete_hooks=False,
    remove_referenced_call=False,
)

Delete the row directly and perform configured relation cleanup.

Unlike delete(), this method performs the low-level delete operation and relation cleanup directly. It is used by queryset deletion flows and recursive cleanup helpers where signal behavior may already be managed by the caller.

PARAMETER DESCRIPTION
skip_post_delete_hooks

Accepted for backward compatibility.

TYPE: bool DEFAULT: False

remove_referenced_call

Internal flag used to avoid recursive cleanup loops and to decide whether delete signals should be emitted.

TYPE: bool | str DEFAULT: False

RETURNS DESCRIPTION
int

Number of database rows deleted for this instance.

TYPE: int

Source code in saffier/core/db/models/model.py
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
async def raw_delete(
    self,
    *,
    skip_post_delete_hooks: bool = False,
    remove_referenced_call: bool | str = False,
) -> int:
    """Delete the row directly and perform configured relation cleanup.

    Unlike `delete()`, this method performs the low-level delete operation
    and relation cleanup directly. It is used by queryset deletion flows and
    recursive cleanup helpers where signal behavior may already be managed by
    the caller.

    Args:
        skip_post_delete_hooks: Accepted for backward compatibility.
        remove_referenced_call: Internal flag used to avoid recursive cleanup
            loops and to decide whether delete signals should be emitted.

    Returns:
        int: Number of database rows deleted for this instance.
    """
    del skip_post_delete_hooks
    if getattr(self, "_db_deleted", False):
        return 0

    if self.__deletion_with_signals__ and remove_referenced_call:
        await self.signals.pre_delete.send(
            sender=self.__class__,
            instance=self,
            model_instance=self,
        )

    count_expression = sqlalchemy.func.count().select().select_from(self.table)
    count_expression = count_expression.where(*self.identifying_clauses())
    check_db_connection(self.database)
    async with self.database as database:
        row_count = cast("int", await database.fetch_val(count_expression) or 0)

    if row_count:
        expression = self.table.delete().where(*self.identifying_clauses())
        async with self.database as database:
            await database.execute(expression)

    self.__dict__["_db_deleted"] = bool(row_count)

    ignore_fields: set[str] = set()
    if isinstance(remove_referenced_call, str):
        ignore_fields.add(remove_referenced_call)

    if row_count:
        await self._delete_forward_references(ignore_fields)
        if not getattr(self, "__skip_generic_reverse_delete__", False):
            await self._delete_reverse_relations(ignore_fields)

    if self.__deletion_with_signals__ and remove_referenced_call:
        await self.signals.post_delete.send(
            sender=self.__class__,
            instance=self,
            model_instance=self,
            row_count=row_count,
        )
    return row_count

delete async

delete(skip_post_delete_hooks=False)

Delete the row and emit the standard pre/post delete signals.

PARAMETER DESCRIPTION
skip_post_delete_hooks

Backward-compatible argument passed through to raw_delete().

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
int

Number of deleted rows.

TYPE: int

Source code in saffier/core/db/models/model.py
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
async def delete(self, skip_post_delete_hooks: bool = False) -> int:
    """Delete this instance's row, wrapped in pre/post delete signals.

    `pre_delete` is sent first, the actual removal is delegated to
    `raw_delete()` with `remove_referenced_call=False`, and `post_delete` is
    then sent with the resulting row count.

    Args:
        skip_post_delete_hooks: Backward-compatible argument passed through
            to `raw_delete()`.

    Returns:
        int: The row count returned by `raw_delete()`.
    """
    signal_payload = {"sender": self.__class__, "instance": self, "model_instance": self}

    await self.signals.pre_delete.send(**signal_payload)

    deleted = await self.raw_delete(
        skip_post_delete_hooks=skip_post_delete_hooks,
        remove_referenced_call=False,
    )

    await self.signals.post_delete.send(row_count=deleted, **signal_payload)
    return deleted

load async

load(only_needed=False)

Reload this instance from the database using its identifying fields.

PARAMETER DESCRIPTION
only_needed

When True, skip the query if all database-backed fields are already loaded.

TYPE: bool DEFAULT: False

Source code in saffier/core/db/models/model.py
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
async def load(self, only_needed: bool = False) -> None:
    """Refresh this instance from its database row.

    The row is selected using the instance's identifying clauses. When no
    row comes back the instance is left unchanged. Otherwise a fresh object
    is built with `from_query_result()` and its state is copied over:
    underscore-prefixed entries are written straight into `__dict__`, all
    others go through `setattr()`.

    Args:
        only_needed: When `True`, skip the query if
            `_has_loaded_db_fields()` reports the fields are already loaded.
    """
    if only_needed and self._has_loaded_db_fields():
        return

    query = self.table.select().where(*self.identifying_clauses())

    check_db_connection(self.database)
    async with self.database as database:
        row = await database.fetch_one(query)

    if row is None:
        return

    fresh = self.__class__.from_query_result(
        row,
        using_schema=self.get_active_instance_schema(),
    )
    for key, value in fresh.__dict__.items():
        if not key.startswith("_"):
            setattr(self, key, value)
        else:
            # Private state bypasses attribute-assignment handling.
            self.__dict__[key] = value

_save async

_save(**kwargs)

Insert a new row and reapply generated database values to the instance.

PARAMETER DESCRIPTION
**kwargs

Database-ready insert payload keyed by column name.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Model

The current persisted instance.

TYPE: Model

Source code in saffier/core/db/models/model.py
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
async def _save(self, **kwargs: typing.Any) -> "Model":
    """Insert a row from `kwargs` and sync the instance with what was stored.

    The value returned by the insert is treated as the generated key for the
    table's autoincrement column, unless that column was supplied explicitly
    in `kwargs`. The inserted values are then reapplied through
    `_apply_persisted_db_values()`. Finally, nullable `ForeignKey` and
    `OneToOneField` fields that still have no entry in the instance state
    are set to `None`.

    Args:
        **kwargs: Database-ready insert payload keyed by column name.

    Returns:
        Model: The current instance.
    """
    statement = self.table.insert().values(**kwargs)
    check_db_connection(self.database)
    async with self.database as database:
        generated = await database.execute(statement)

    persisted = dict(kwargs)
    if generated:
        # sqlalchemy supports only one autoincrement column
        column = self.table.autoincrement_column
        if column is not None:
            if isinstance(generated, Row):
                row_data = generated._mapping
                generated = row_data.get(column.key, row_data.get(column.name))
            # An explicitly supplied key wins; the driver's return value is
            # not a valid key in that case.
            if column.key not in kwargs:
                persisted[column.key] = generated

    self._apply_persisted_db_values(persisted)

    for field_name, field in self.fields.items():
        if (
            field_name not in self.__dict__
            and getattr(field, "null", False)
            and isinstance(field, (saffier.ForeignKey, saffier.OneToOneField))
        ):
            self.__dict__[field_name] = None
    return self

save async

save(
    force_save=None,
    values=None,
    force_insert=None,
    **kwargs,
)

Create or update the row, then persist staged relations and references.

The save flow decides between insert and update automatically unless the caller forces insertion. It validates the outgoing payload, runs field hooks, emits signals, refreshes server-generated defaults when needed, and finally persists staged model references and reverse relation writes.

PARAMETER DESCRIPTION
force_save

Backward-compatible alias for forcing an insert.

TYPE: bool | None DEFAULT: None

values

Optional subset payload for partial updates.

TYPE: Any DEFAULT: None

force_insert

When True, always perform an insert.

TYPE: bool | None DEFAULT: None

**kwargs

Additional compatibility kwargs reserved for future use.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
type[Model] | Any

type[Model] | Any: The current model instance.

Source code in saffier/core/db/models/model.py
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
async def save(
    self,
    force_save: bool | None = None,
    values: typing.Any = None,
    force_insert: bool | None = None,
    **kwargs: typing.Any,
) -> type["Model"] | Any:
    """Create or update the row, then persist staged relations and references.

    The save flow decides between insert and update automatically unless the
    caller forces insertion. It validates the outgoing payload, runs field
    hooks, emits signals, refreshes server-generated defaults when needed,
    and finally persists staged model references and reverse relation writes.

    Args:
        force_save: Backward-compatible alias for forcing an insert. Only
            consulted when `force_insert` is `None`.
        values: Optional subset payload for partial updates. A mapping is
            applied to the instance before saving; a `set`, `list` or
            `tuple` is treated as the names of explicitly specified fields.
        force_insert: When `True`, always perform an insert.
        **kwargs: Additional compatibility kwargs reserved for future use.
            They are not read by this method; the name is rebound to the
            column payload further down.

    Returns:
        type[Model] | Any: The current model instance.
    """
    # `force_insert` wins when given; otherwise fall back to the legacy flag.
    if force_insert is None:
        force_insert = bool(force_save)
    # Marker stored directly in the instance dict for the duration of the
    # save; it is always removed again in the outer `finally`.
    self.__dict__["_saffier_save_in_progress"] = True
    try:
        await self.signals.pre_save.send(sender=self.__class__, instance=self)

        extracted_fields = self.extract_db_fields()
        # Snapshot taken before any key is dropped or overridden below; it is
        # handed to the pre-save hooks alongside the working copy.
        original_extracted_fields = dict(extracted_fields)
        explicit_field_values: dict[str, typing.Any] | None = None
        explicit_field_names: set[str] = set()

        # Drop autoincrement primary keys that have no value yet from the
        # payload (presumably so the database generates them -- confirm).
        if getattr(self, "pk", None) is None and self.fields[self.pkname].autoincrement:
            extracted_fields.pop(self.pkname, None)
        else:
            for field_name in type(self).pknames:
                field = self.fields[field_name]
                if field.autoincrement and getattr(self, field_name, None) is None:
                    extracted_fields.pop(field_name, None)

        # A plain collection only names the explicitly specified fields; it
        # carries no values, so `values` is cleared afterwards.
        if isinstance(values, (set, list, tuple)):
            explicit_field_names = set(values)
            values = None

        if values:
            normalized_values = self.__class__.normalize_field_kwargs(values)
            self.update_from_dict(normalized_values)
            # Read the values back from the instance (after assignment) and
            # keep only keys that are declared fields.
            explicit_field_values = {
                key: getattr(self, key) for key in normalized_values if key in self.fields
            }
            explicit_field_names = set(explicit_field_values)
            extracted_fields.update(explicit_field_values)

        self.update_from_dict(dict(extracted_fields.items()))
        is_create = bool(force_insert) or not self.can_load or self._should_force_insert()
        # Expose the explicit field names while the hooks run, and always
        # restore the previous value afterwards (set/reset token pattern;
        # presumably a `contextvars.ContextVar` -- confirm at its definition).
        token = EXPLICIT_SPECIFIED_VALUES.set(explicit_field_names)
        try:
            hook_values = await self.execute_pre_save_hooks(
                extracted_fields,
                original_extracted_fields,
                is_update=not is_create,
            )
        finally:
            EXPLICIT_SPECIFIED_VALUES.reset(token)

        # Inserts, and updates without an explicit values mapping, validate
        # the full extracted payload; partial updates validate only the
        # explicitly supplied values.
        input_values = (
            extracted_fields
            if is_create or explicit_field_values is None
            else explicit_field_values
        )
        fields = {}
        for key, field in self.fields.items():
            if key not in input_values:
                continue
            field_validator = field.validator
            if field_validator.read_only:
                # Work on a shallow copy so the field's own validator keeps
                # its `read_only` flag.
                field_validator = copy.copy(field_validator)
                field_validator.read_only = False
            fields[key] = field_validator
        validator = Schema(fields=fields)
        validated = validator.check(input_values)
        # NOTE: from here on `kwargs` is the column payload; this rebinding
        # shadows the `**kwargs` parameter, which is never read.
        kwargs = self.__class__.extract_column_values(
            validated,
            is_update=not is_create,
            is_partial=not is_create and explicit_field_values is not None,
            phase="prepare_insert" if is_create else "prepare_update",
            instance=self,
            model_instance=self,
        )
        # Values produced by the pre-save hooks override the validated ones.
        kwargs.update(hook_values)
        kwargs = self._update_auto_now_fields(kwargs, self.fields)

        # Performs the update or the create based on a possible existing primary key
        if is_create:
            await self._save(**kwargs)
        else:
            await self.signals.pre_update.send(
                sender=self.__class__, instance=self, kwargs=kwargs
            )
            # The UPDATE statement is skipped when there is nothing to write;
            # the update signals are still emitted either way.
            if kwargs:
                expression = (
                    self.table.update().values(**kwargs).where(*self.identifying_clauses())
                )
                check_db_connection(self.database)
                async with self.database as database:
                    await database.execute(expression)
            await self.signals.post_update.send(sender=self.__class__, instance=self)
            self._apply_persisted_db_values(kwargs)

        # Refresh the results
        # Reload only when some field with a server default was not part of
        # the extracted payload.
        if any(
            field.server_default is not None
            for name, field in self.fields.items()
            if name not in extracted_fields
        ):
            await self.load()

        await self._persist_model_references()
        await self._persist_related_fields()
        await self.signals.post_save.send(sender=self.__class__, instance=self)
        return self
    finally:
        # Clear the in-progress marker even when the save fails.
        self.__dict__.pop("_saffier_save_in_progress", None)

real_save async

real_save(
    force_insert=False,
    values=None,
    force_save=None,
    **kwargs,
)

Backward-compatible wrapper around save().

Older Saffier APIs used force_save; newer code uses force_insert. This method preserves both spellings while delegating to save().

RETURNS DESCRIPTION
type[Model] | Any

Value returned by save().

Source code in saffier/core/db/models/model.py
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
async def real_save(
    self,
    force_insert: bool = False,
    values: typing.Any = None,
    force_save: bool | None = None,
    **kwargs: typing.Any,
) -> type["Model"] | Any:
    """Backward-compatible wrapper around `save()`.

    Older Saffier APIs used `force_save`; newer code uses `force_insert`.
    This method preserves both spellings while delegating to `save()`.

    Returns:
        type[Model] | Any: Value returned by `save()`.
    """
    if force_save is not None:
        force_insert = force_save
    return await self.save(force_insert=force_insert, values=values, **kwargs)

__getattr__

__getattr__(name)

Resolve deferred attributes on demand.

The fallback handles computed fields, virtual relations, and unloaded database-backed fields. Accessing an unloaded DB field triggers a synchronous load() call through run_sync() unless the attribute was explicitly marked as a no-load trigger.

PARAMETER DESCRIPTION
name

Attribute being resolved.

TYPE: str

RETURNS DESCRIPTION
Any

Resolved attribute value.

TYPE: Any

RAISES DESCRIPTION
AttributeError

If the attribute cannot be resolved.

Source code in saffier/core/db/models/model.py
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
def __getattr__(self, name: str) -> Any:
    """Resolve deferred attributes on demand.

    The fallback handles computed fields, virtual relations, and unloaded
    database-backed fields. Accessing an unloaded DB field triggers a
    synchronous `load()` call through `run_sync()` unless the attribute was
    explicitly marked as a no-load trigger.

    Args:
        name: Attribute being resolved.

    Returns:
        Any: Resolved attribute value.

    Raises:
        AttributeError: If the attribute cannot be resolved, including the
            case where the field is still missing after `load()` ran.
    """
    # Only declared, non-primary-key fields that are not yet present on the
    # instance are resolved here; everything else is a plain miss.
    if name not in self.__dict__ and name in self.fields and name not in type(self).pknames:
        field = self.fields[name]
        if getattr(field, "is_computed", False):
            return field.get_value(self, name)
        if field.is_virtual:
            if isinstance(field, saffier.ManyToManyField):
                # Many-to-many access is redirected to the attribute named by
                # the configured relation template.
                return getattr(self, settings.many_to_many_relation.format(key=name))
            if hasattr(field, "get_value"):
                return field.get_value(self, name)
            raise AttributeError(name)
        # Attributes flagged as no-load triggers must not cause a load.
        if name in getattr(self, "__no_load_trigger_attrs__", set()):
            raise AttributeError(name)
        run_sync(self.load())
        try:
            return self.__dict__[name]
        except KeyError:
            # `load()` did not populate this field. `__getattr__` must signal
            # a miss with AttributeError so that `hasattr()` and
            # `getattr(obj, name, default)` keep working.
            raise AttributeError(name) from None
    raise AttributeError(name)

__setattr__

__setattr__(key, value)
Source code in saffier/core/db/models/model.py
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
def __setattr__(self, key: Any, value: Any) -> Any:
    """Assign an attribute, validating it and rejecting undeclared names.

    Declared scalar fields are run through their validator before being
    stored. Relationship, virtual and computed fields are stored as given.
    Plain model fields are validated through `validate_plain_field_value`.
    Any other name is accepted only if it starts with an underscore, already
    exists on the instance, or is defined on the class.

    Args:
        key: Name of the attribute being assigned.
        value: Value to assign.

    Raises:
        AttributeError: If `key` is an undeclared public attribute.
    """
    declared_fields = getattr(self, "fields", {})
    if key in declared_fields:
        field = declared_fields[key]
        relation_types = (saffier.ForeignKey, saffier.OneToOneField, saffier.ManyToManyField)
        is_relationship = isinstance(field, relation_types)
        # Validation only applies to concrete scalar fields.
        bypass_validation = (
            field.is_virtual or is_relationship or getattr(field, "is_computed", False)
        )
        if not bypass_validation:
            value = field.validator.check(value)
        return super().__setattr__(key, value)

    model_cls = type(self)
    if key in getattr(model_cls, "__plain_model_fields__", {}):
        checked = model_cls.validate_plain_field_value(key, value)
        return super().__setattr__(key, checked)

    # Private names, attributes already on the instance, and names defined
    # on the class remain assignable; anything else is rejected.
    permitted = key.startswith("_") or key in self.__dict__ or hasattr(model_cls, key)
    if not permitted:
        raise AttributeError(
            f"Unknown attribute '{key}' for strict model '{self.__class__.__name__}'."
        )
    return super().__setattr__(key, value)