Skip to content

Schema

Schema is the registry helper that performs database-schema operations such as creating and dropping schemas.

It matters most in multi-schema and multi-tenant deployments where the application needs to provision or tear down schema-scoped tables explicitly.

Typical use

Most application code reaches Schema through the registry:

await models.schema.create_schema("tenant_a", if_not_exists=True)
await models.schema.drop_schema("tenant_a", cascade=True, if_exists=True)

Important distinction

This helper manages database schemas, not Saffier model field schemas or JSON schemas.

saffier.core.connection.schemas.Schema

Schema(registry)

Schema management helper bound to a registry.

The object centralizes low-level schema creation, teardown, and schema-path switching so the registry can reuse the same logic for migrations, tenancy, and table-creation helpers.

Source code in saffier/core/connection/schemas.py
22
23
def __init__(self, registry: type["Registry"]) -> None:
    """Bind the helper to a registry.

    Args:
        registry: Registry stored on `self.registry` for later use.
    """
    self.registry = registry

registry instance-attribute

registry = registry

database property

database

get_default_schema

get_default_schema()

Return the database dialect's default schema name.

RETURNS DESCRIPTION
str | None

str | None: Default schema reported by the active dialect.

Source code in saffier/core/connection/schemas.py
29
30
31
32
33
34
35
36
37
def get_default_schema(self) -> str | None:
    """Return the database dialect's default schema name.

    Returns:
        str | None: Default schema reported by the active dialect.
    """
    if not hasattr(self, "_default_schema"):
        self._default_schema = self.database.url.sqla_url.get_dialect(True).default_schema_name
    return self._default_schema

activate_schema_path async

activate_schema_path(database, schema, is_shared=True)
Source code in saffier/core/connection/schemas.py
39
40
41
42
43
44
45
46
47
48
async def activate_schema_path(
    self, database: "Database", schema: str, is_shared: bool = True
) -> None:
    """Execute a `SET search_path` statement on the given database.

    Args:
        database: Object whose `execute` coroutine runs the statement.
        schema: Schema name placed first in the search path.
        is_shared: When true, `shared` is listed after `schema` in the
            search path; otherwise only `schema` is used.
    """
    # NOTE(review): `schema` is interpolated into the SQL text unquoted;
    # confirm that callers only pass trusted identifiers.
    if is_shared:
        statement = f"SET search_path TO {schema}, shared;"
    else:
        statement = f"SET search_path TO {schema};"
    await database.execute(sqlalchemy.text(statement))

create_schema async

create_schema(
    schema,
    if_not_exists=False,
    init_models=False,
    init_tenant_models=False,
    update_cache=True,
    databases=(None,),
)

Create a schema and optionally create model tables inside it.

PARAMETER DESCRIPTION
schema

Schema name to create. None means "create tables in the default database namespace only".

TYPE: str | None

if_not_exists

Whether schema and table creation should be idempotent.

TYPE: bool DEFAULT: False

init_models

Whether registered model tables should be created after the schema exists.

TYPE: bool DEFAULT: False

init_tenant_models

Accepted only for backward compatibility; the value is ignored.

TYPE: bool DEFAULT: False

update_cache

Whether schema-specific table caches should be refreshed when building tables.

TYPE: bool DEFAULT: True

databases

Database aliases that should receive the operation.

TYPE: Sequence[str | None] DEFAULT: (None,)

Source code in saffier/core/connection/schemas.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
async def create_schema(
    self,
    schema: str | None,
    if_not_exists: bool = False,
    init_models: bool = False,
    init_tenant_models: bool = False,
    update_cache: bool = True,
    databases: Sequence[str | None] = (None,),
) -> None:
    """Create a schema and optionally create model tables inside it.

    Args:
        schema: Schema name to create. `None` means "create tables in the
            default database namespace only".
        if_not_exists: Whether schema and table creation should be
            idempotent.
        init_models: Whether registered model tables should be created after
            the schema exists.
        init_tenant_models: Reserved for backward compatibility.
        update_cache: Whether schema-specific table caches should be
            refreshed when building tables.
        databases: Database aliases that should receive the operation.
    """
    del init_tenant_models
    schema_tables_by_metadata: dict[sqlalchemy.MetaData, list[sqlalchemy.Table]] = {}

    if init_models:
        for collection_name in ("models", "reflected"):
            for model_class in getattr(self.registry, collection_name, {}).values():
                if getattr(model_class, "__using_schema__", None) is not None:
                    continue
                table = model_class.table_schema(schema=schema, update_cache=update_cache)
                schema_tables_by_metadata.setdefault(table.metadata, []).append(table)

    for database_name in databases:
        database = (
            self.registry.database
            if database_name is None
            else self.registry.extra[database_name]
        )
        async with database as database:
            with database.force_rollback(False):
                if schema is None:
                    metadata = self.registry.metadata_by_name[database_name]

                    def execute_create_all(
                        connection: sqlalchemy.Connection,
                        metadata: sqlalchemy.MetaData = metadata,
                    ) -> None:
                        metadata.create_all(connection, checkfirst=if_not_exists)

                    await database.run_sync(execute_create_all)
                    continue

                try:
                    await database.run_sync(
                        self._execute_create_schema,
                        schema,
                        if_not_exists,
                        schema_tables_by_metadata,
                        init_models,
                    )
                except ProgrammingError as e:
                    raise SchemaError(detail=e.orig.args[0]) from e  # type: ignore[index]

drop_schema async

drop_schema(
    schema,
    cascade=False,
    if_exists=False,
    databases=(None,),
)

Drop a schema or all tables from the selected databases.

PARAMETER DESCRIPTION
schema

Schema name to drop, or None to drop tables from the default namespace.

TYPE: str | None

cascade

Whether the schema drop should cascade.

TYPE: bool DEFAULT: False

if_exists

Whether missing schema/table objects should be ignored.

TYPE: bool DEFAULT: False

databases

Database aliases that should receive the operation.

TYPE: Sequence[str | None] DEFAULT: (None,)

Source code in saffier/core/connection/schemas.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
async def drop_schema(
    self,
    schema: str | None,
    cascade: bool = False,
    if_exists: bool = False,
    databases: Sequence[str | None] = (None,),
) -> None:
    """Drop a schema or all tables from the selected databases.

    Args:
        schema: Schema name to drop, or `None` to drop tables from the
            default namespace.
        cascade: Whether the schema drop should cascade.
        if_exists: Whether missing schema/table objects should be ignored.
        databases: Database aliases that should receive the operation.
    """

    for database_name in databases:
        database = (
            self.registry.database
            if database_name is None
            else self.registry.extra[database_name]
        )
        async with database as database:
            with database.force_rollback(False):
                try:
                    await database.run_sync(
                        self._execute_drop_schema,
                        database_name,
                        schema,
                        cascade,
                        if_exists,
                    )
                except DBAPIError as e:
                    raise SchemaError(detail=e.orig.args[0]) from e  # type: ignore[index]

_execute_create_schema

_execute_create_schema(
    connection,
    schema,
    if_not_exists,
    schema_tables_by_metadata,
    init_models,
)
Source code in saffier/core/connection/schemas.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def _execute_create_schema(
    self,
    connection: sqlalchemy.Connection,
    schema: str,
    if_not_exists: bool,
    schema_tables_by_metadata: dict[sqlalchemy.MetaData, list[sqlalchemy.Table]],
    init_models: bool,
) -> None:
    """Create the schema on `connection` and optionally its tables.

    Args:
        connection: Connection on which the statements are executed.
        schema: Name of the schema to create.
        if_not_exists: Passed to `CreateSchema` and used as `checkfirst`
            when creating tables.
        schema_tables_by_metadata: Tables to create, grouped by the
            metadata object they belong to.
        init_models: Whether the grouped tables are created after the
            schema itself.
    """
    create_statement = sqlalchemy.schema.CreateSchema(
        name=schema,
        if_not_exists=if_not_exists,
    )
    connection.execute(create_statement)

    if not init_models:
        return

    for metadata, tables in schema_tables_by_metadata.items():
        metadata.create_all(connection, checkfirst=if_not_exists, tables=list(tables))

_execute_drop_schema

_execute_drop_schema(
    connection, database_name, schema, cascade, if_exists
)
Source code in saffier/core/connection/schemas.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def _execute_drop_schema(
    self,
    connection: sqlalchemy.Connection,
    database_name: str | None,
    schema: str | None,
    cascade: bool,
    if_exists: bool,
) -> None:
    """Drop a named schema, or every table of one database's metadata.

    Args:
        connection: Connection on which the statements are executed.
        database_name: Key into `registry.metadata_by_name`; only used when
            `schema` is `None`.
        schema: Name of the schema to drop. With `None`, `drop_all` runs on
            the metadata registered under `database_name` instead.
        cascade: Passed to `DropSchema`.
        if_exists: Passed to `DropSchema`, or used as `checkfirst` for
            `drop_all`.
    """
    if schema is None:
        target_metadata = self.registry.metadata_by_name[database_name]
        target_metadata.drop_all(connection, checkfirst=if_exists)
        return

    drop_statement = sqlalchemy.schema.DropSchema(
        name=schema,
        cascade=cascade,
        if_exists=if_exists,
    )
    connection.execute(drop_statement)