Skip to content

Database class

saffier.Database

Database(url=None, *, force_rollback=None, config=None, **options)

An abstraction on the top of the EncodeORM databases.Database object.

This object allows to pass also a configuration dictionary in the format of

DATABASEZ_CONFIG = { "connection": { "credentials": { "scheme": 'sqlite', "postgres"... "host": ..., "port": ..., "user": ..., "password": ..., "database": ..., "options": { "driver": ... "ssl": ... } } } }

PARAMETER DESCRIPTION
url

TYPE: Optional[Union[str, DatabaseURL, URL, Database]] DEFAULT: None

force_rollback

TYPE: Union[bool, None] DEFAULT: None

config

TYPE: Optional['DictAny'] DEFAULT: None

**options

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def __init__(
    self,
    url: typing.Optional[typing.Union[str, DatabaseURL, URL, Database]] = None,
    *,
    force_rollback: typing.Union[bool, None] = None,
    config: typing.Optional["DictAny"] = None,
    **options: typing.Any,
):
    init()
    assert config is None or url is None, "Use either 'url' or 'config', not both."
    if isinstance(url, Database):
        assert not options, "Cannot specify options when copying a Database object."
        self.backend = url.backend.__copy__()
        self.url = url.url
        self.options = url.options
        if force_rollback is None:
            self._force_rollback = url._force_rollback
        else:
            self._force_rollback = force_rollback
    else:
        url = DatabaseURL(url)
        if config and "connection" in config:
            connection_config = config["connection"]
            if "credentials" in connection_config:
                connection_config = connection_config["credentials"]
                url = url.replace(**connection_config)
        self.backend, self.url, self.options = self.apply_database_url_and_options(
            url, **options
        )
        self._force_rollback = bool(force_rollback)
    self.backend.owner = self
    self._connection_map = weakref.WeakKeyDictionary()

    # When `force_rollback=True` is used, we use a single global
    # connection, within a transaction that always rolls back.
    self._global_connection: typing.Optional[Connection] = None
    self._global_transaction: typing.Optional[Transaction] = None

_connection_map instance-attribute

_connection_map = WeakKeyDictionary()

backend instance-attribute

backend

url instance-attribute

url

options instance-attribute

options

_force_rollback instance-attribute

_force_rollback

is_connected class-attribute instance-attribute

is_connected = False

_global_connection instance-attribute

_global_connection = None

_global_transaction instance-attribute

_global_transaction = None

_current_task property

_current_task

_connection property writable

_connection

engine property

engine

__copy__

__copy__()
Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
133
134
def __copy__(self) -> Database:
    return self.__class__(self)

connect async

connect()

Establish the connection pool.

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
async def connect(self) -> None:
    """
    Establish the connection pool.
    """
    if self.is_connected:
        logger.debug("Already connected, skipping connection")
        return None

    await self.backend.connect(self.url, **self.options)
    logger.info("Connected to database %s", self.url.obscure_password, extra=CONNECT_EXTRA)
    self.is_connected = True

    if self._force_rollback:
        assert self._global_connection is None
        assert self._global_transaction is None

        self._global_connection = Connection(self, self.backend)
        self._global_transaction = self._global_connection.transaction(force_rollback=True)

        await self._global_transaction.__aenter__()

disconnect async

disconnect()

Close all connections in the connection pool.

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
async def disconnect(self) -> None:
    """
    Close all connections in the connection pool.
    """
    if not self.is_connected:
        logger.debug("Already disconnected, skipping disconnection")
        return None

    if self._force_rollback:
        assert self._global_connection is not None
        assert self._global_transaction is not None

        await self._global_transaction.__aexit__()

        self._global_transaction = None
        self._global_connection = None
    else:
        self._connection = None

    await self.backend.disconnect()
    logger.info(
        "Disconnected from database %s",
        self.url.obscure_password,
        extra=DISCONNECT_EXTRA,
    )
    self.is_connected = False

fetch_all async

fetch_all(query, values=None)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Optional[dict] DEFAULT: None

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
218
219
220
221
222
223
224
async def fetch_all(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Optional[dict] = None,
) -> typing.List[interfaces.Record]:
    async with self.connection() as connection:
        return await connection.fetch_all(query, values)

fetch_one async

fetch_one(query, values=None, pos=0)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Optional[dict] DEFAULT: None

pos

TYPE: int DEFAULT: 0

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
226
227
228
229
230
231
232
233
async def fetch_one(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Optional[dict] = None,
    pos: int = 0,
) -> typing.Optional[interfaces.Record]:
    async with self.connection() as connection:
        return await connection.fetch_one(query, values)

fetch_val async

fetch_val(query, values=None, column=0, pos=0)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Optional[dict] DEFAULT: None

column

TYPE: Any DEFAULT: 0

pos

TYPE: int DEFAULT: 0

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
235
236
237
238
239
240
241
242
243
async def fetch_val(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Optional[dict] = None,
    column: typing.Any = 0,
    pos: int = 0,
) -> typing.Any:
    async with self.connection() as connection:
        return await connection.fetch_val(query, values, column=column, pos=pos)

execute async

execute(query, values=None)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Any DEFAULT: None

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
245
246
247
248
249
250
251
async def execute(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Any = None,
) -> typing.Union[interfaces.Record, int]:
    async with self.connection() as connection:
        return await connection.execute(query, values)

execute_many async

execute_many(query, values)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: list

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
253
254
255
async def execute_many(self, query: typing.Union[ClauseElement, str], values: list) -> None:
    async with self.connection() as connection:
        return await connection.execute_many(query, values)

iterate async

iterate(query, values=None, chunk_size=None)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Optional[dict] DEFAULT: None

chunk_size

TYPE: Optional[int] DEFAULT: None

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
257
258
259
260
261
262
263
264
265
async def iterate(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Optional[dict] = None,
    chunk_size: typing.Optional[int] = None,
) -> typing.AsyncGenerator[interfaces.Record, None]:
    async with self.connection() as connection:
        async for record in connection.iterate(query, values, chunk_size):
            yield record

batched_iterate async

batched_iterate(query, values=None, batch_size=None, batch_wrapper=tuple)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Optional[dict] DEFAULT: None

batch_size

TYPE: Optional[int] DEFAULT: None

batch_wrapper

TYPE: Union[BatchCallable] DEFAULT: tuple

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
267
268
269
270
271
272
273
274
275
276
async def batched_iterate(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Optional[dict] = None,
    batch_size: typing.Optional[int] = None,
    batch_wrapper: typing.Union[BatchCallable] = tuple,
) -> typing.AsyncGenerator[BatchCallableResult, None]:
    async with self.connection() as connection:
        async for records in connection.batched_iterate(query, values, batch_size):
            yield batch_wrapper(records)

transaction

transaction(*, force_rollback=False, **kwargs)
PARAMETER DESCRIPTION
force_rollback

TYPE: bool DEFAULT: False

**kwargs

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
278
279
def transaction(self, *, force_rollback: bool = False, **kwargs: typing.Any) -> "Transaction":
    return Transaction(self.connection, force_rollback=force_rollback, **kwargs)

run_sync async

run_sync(fn, *args, **kwargs)
PARAMETER DESCRIPTION
fn

TYPE: Callable[..., Any]

*args

TYPE: Any DEFAULT: ()

**kwargs

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
281
282
283
284
285
286
287
288
async def run_sync(
    self,
    fn: typing.Callable[..., typing.Any],
    *args: typing.Any,
    **kwargs: typing.Any,
) -> typing.Any:
    async with self.connection() as connection:
        return await connection.run_sync(fn, *args, **kwargs)

create_all async

create_all(meta, **kwargs)
PARAMETER DESCRIPTION
meta

TYPE: MetaData

**kwargs

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
290
291
292
async def create_all(self, meta: MetaData, **kwargs: typing.Any) -> None:
    async with self.connection() as connection:
        await connection.create_all(meta, **kwargs)

drop_all async

drop_all(meta, **kwargs)
PARAMETER DESCRIPTION
meta

TYPE: MetaData

**kwargs

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
294
295
296
async def drop_all(self, meta: MetaData, **kwargs: typing.Any) -> None:
    async with self.connection() as connection:
        await connection.drop_all(meta, **kwargs)

connection

connection()
Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
298
299
300
301
302
303
304
def connection(self) -> Connection:
    if self._global_connection is not None:
        return self._global_connection

    if not self._connection:
        self._connection = Connection(self, self.backend)
    return self._connection

force_rollback

force_rollback()
YIELDS DESCRIPTION
None
Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
310
311
312
313
314
315
316
317
@contextlib.contextmanager
def force_rollback(self) -> typing.Iterator[None]:
    initial = self._force_rollback
    self._force_rollback = True
    try:
        yield
    finally:
        self._force_rollback = initial

get_backends classmethod

get_backends(scheme='', *, overwrite_paths=['databasez.overwrites'], database_name='Database', connection_name='Connection', transaction_name='Transaction', database_class=None, connection_class=None, transaction_class=None)
PARAMETER DESCRIPTION
scheme

TYPE: str DEFAULT: ''

overwrite_paths

TYPE: Sequence[str] DEFAULT: ['databasez.overwrites']

database_name

TYPE: str DEFAULT: 'Database'

connection_name

TYPE: str DEFAULT: 'Connection'

transaction_name

TYPE: str DEFAULT: 'Transaction'

database_class

TYPE: Optional[Type[DatabaseBackend]] DEFAULT: None

connection_class

TYPE: Optional[Type[ConnectionBackend]] DEFAULT: None

transaction_class

TYPE: Optional[Type[TransactionBackend]] DEFAULT: None

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
@classmethod
def get_backends(
    cls,
    # let scheme empty for direct imports
    scheme: str = "",
    *,
    overwrite_paths: typing.Sequence[str] = ["databasez.overwrites"],
    database_name: str = "Database",
    connection_name: str = "Connection",
    transaction_name: str = "Transaction",
    database_class: typing.Optional[typing.Type[interfaces.DatabaseBackend]] = None,
    connection_class: typing.Optional[typing.Type[interfaces.ConnectionBackend]] = None,
    transaction_class: typing.Optional[typing.Type[interfaces.TransactionBackend]] = None,
) -> typing.Tuple[
    typing.Type[interfaces.DatabaseBackend],
    typing.Type[interfaces.ConnectionBackend],
    typing.Type[interfaces.TransactionBackend],
]:
    module: typing.Any = None
    for overwrite_path in overwrite_paths:
        imp_path = f"{overwrite_path}.{scheme.replace('+', '_')}" if scheme else overwrite_path
        try:
            module = importlib.import_module(imp_path)
        except ImportError as exc:
            logging.debug(
                f'Import of "{imp_path}" failed. This is not an error.', exc_info=exc
            )
            if "+" in scheme:
                imp_path = f"{overwrite_path}.{scheme.split('+', 1)[0]}"
                try:
                    module = importlib.import_module(imp_path)
                except ImportError as exc:
                    logging.debug(
                        f'Import of "{imp_path}" failed. This is not an error.', exc_info=exc
                    )
        if module is not None:
            break
    database_class = getattr(module, database_name, database_class)
    assert database_class is not None and issubclass(
        database_class, interfaces.DatabaseBackend
    )
    connection_class = getattr(module, connection_name, connection_class)
    assert connection_class is not None and issubclass(
        connection_class, interfaces.ConnectionBackend
    )
    transaction_class = getattr(module, transaction_name, transaction_class)
    assert transaction_class is not None and issubclass(
        transaction_class, interfaces.TransactionBackend
    )
    return database_class, connection_class, transaction_class

apply_database_url_and_options classmethod

apply_database_url_and_options(url, *, overwrite_paths=['databasez.overwrites'], **options)
PARAMETER DESCRIPTION
url

TYPE: Union[DatabaseURL, str]

overwrite_paths

TYPE: Sequence[str] DEFAULT: ['databasez.overwrites']

**options

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.8/site-packages/databasez/core/database.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
@classmethod
def apply_database_url_and_options(
    cls,
    url: typing.Union[DatabaseURL, str],
    *,
    overwrite_paths: typing.Sequence[str] = ["databasez.overwrites"],
    **options: typing.Any,
) -> typing.Tuple[interfaces.DatabaseBackend, DatabaseURL, typing.Dict[str, typing.Any]]:
    url = DatabaseURL(url)
    database_class, connection_class, transaction_class = cls.get_backends(
        url.scheme,
        database_class=default_database,
        connection_class=default_connection,
        transaction_class=default_transaction,
        overwrite_paths=overwrite_paths,
    )

    backend = database_class(
        connection_class=connection_class, transaction_class=transaction_class
    )
    url, options = backend.extract_options(url, **options)
    # check against transformed url
    assert url.sqla_url.get_dialect(True).is_async, f'Dialect: "{url.scheme}" is not async.'

    return backend, url, options