Database

Database is Saffier's database connection wrapper.

Most applications interact with it indirectly through a Registry, but the class is still important because it defines connection lifecycle, transaction management, and the SQLAlchemy async engine used by queries and schema helpers.

Typical usage

database = saffier.Database(
    "postgresql+asyncpg://postgres:postgres@localhost:5432/app"
)
models = saffier.Registry(database=database)

What to know in practice

  • prefer saffier.Database over the raw databases package object
  • the Registry lifecycle usually drives connect() and disconnect()
  • synchronous reflection paths use the sync engine derived from the async engine

saffier.Database

Database(
    url=None,
    *,
    force_rollback=None,
    config=None,
    full_isolation=None,
    poll_interval=None,
    **options,
)

An abstraction on top of the Encode databases.Database object.

This object also accepts a configuration dictionary in the following format:

DATABASEZ_CONFIG = {
    "connection": {
        "credentials": {
            "scheme": "sqlite",  # or "postgres", ...
            "host": ...,
            "port": ...,
            "user": ...,
            "password": ...,
            "database": ...,
            "options": {
                "driver": ...,
                "ssl": ...,
            },
        }
    }
}
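For orientation, the credentials mapping above corresponds to an ordinary database URL. A minimal, hypothetical sketch of that correspondence using only standard URL composition (the helper name url_from_config is illustrative, not part of databasez):

```python
# Hypothetical illustration: how a databasez-style credentials mapping
# lines up with a plain database URL. Keys mirror the config format above.
config = {
    "connection": {
        "credentials": {
            "scheme": "postgresql+asyncpg",
            "host": "localhost",
            "port": 5432,
            "user": "postgres",
            "password": "postgres",
            "database": "app",
        }
    }
}

def url_from_config(cfg: dict) -> str:
    c = cfg["connection"]["credentials"]
    return (
        f"{c['scheme']}://{c['user']}:{c['password']}"
        f"@{c['host']}:{c['port']}/{c['database']}"
    )

url = url_from_config(config)
```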

Source code in databasez/core/database.py
def __init__(
    self,
    url: str | DatabaseURL | URL | Database | None = None,
    *,
    force_rollback: bool | None = None,
    config: DictAny | None = None,
    full_isolation: bool | None = None,
    # for custom poll intervals
    poll_interval: float | None = None,
    **options: Any,
):
    init()
    assert config is None or url is None, "Use either 'url' or 'config', not both."
    if isinstance(url, Database):
        assert not options, "Cannot specify options when copying a Database object."
        self.backend = url.backend.__copy__()
        self.url = url.url
        self.options = url.options
        self._call_hooks = url._call_hooks
        if poll_interval is None:
            poll_interval = url.poll_interval
        if force_rollback is None:
            force_rollback = bool(url.force_rollback)
        if full_isolation is None:
            full_isolation = bool(url._full_isolation)
    else:
        url = DatabaseURL(url)
        if config and "connection" in config:
            connection_config = config["connection"]
            if "credentials" in connection_config:
                connection_config = connection_config["credentials"]
                url = url.replace(**connection_config)
        self.backend, self.url, self.options = self.apply_database_url_and_options(
            url, **options
        )
        if force_rollback is None:
            force_rollback = False
        if full_isolation is None:
            full_isolation = False
        if poll_interval is None:
            # accessed via the utils module so the constant can be changed at runtime
            poll_interval = utils.DATABASEZ_POLL_INTERVAL
    self.poll_interval = poll_interval
    self._full_isolation = full_isolation
    self._force_rollback = ForceRollback(force_rollback)
    self.backend.owner = self
    self._connection_map = weakref.WeakKeyDictionary()
    self._databases_map = {}

    # When `force_rollback=True` is used, we use a single global
    # connection, within a transaction that always rolls back.
    self._global_connection: Connection | None = None

    self.ref_counter: int = 0
    self.ref_lock: asyncio.Lock = asyncio.Lock()

_connection_map instance-attribute

_connection_map = WeakKeyDictionary()

_databases_map instance-attribute

_databases_map = {}

_loop class-attribute instance-attribute

_loop = None

backend instance-attribute

backend

url instance-attribute

url

options instance-attribute

options

is_connected class-attribute instance-attribute

is_connected = False

_call_hooks class-attribute instance-attribute

_call_hooks = True

_remove_global_connection class-attribute instance-attribute

_remove_global_connection = True

_full_isolation class-attribute instance-attribute

_full_isolation = full_isolation

poll_interval instance-attribute

poll_interval = poll_interval

_force_rollback instance-attribute

_force_rollback = ForceRollback(force_rollback)

force_rollback class-attribute instance-attribute

force_rollback = ForceRollbackDescriptor()

async_helper class-attribute instance-attribute

async_helper = AsyncHelperDatabase

_global_connection instance-attribute

_global_connection = None

ref_counter instance-attribute

ref_counter = 0

ref_lock instance-attribute

ref_lock = Lock()

_current_task property

_current_task

Return the currently running asyncio task.

RAISES DESCRIPTION
RuntimeError

If no task is active.

_connection property writable

_connection

Return the connection bound to the current task, if any.

engine property

engine

The SQLAlchemy :class:AsyncEngine, if connected.

__copy__

__copy__()

Create a shallow copy of the database (preserving backend state).

RETURNS DESCRIPTION
Database

A new Database instance sharing the same backend.

TYPE: Database

Source code in databasez/core/database.py
def __copy__(self) -> Database:
    """Create a shallow copy of the database (preserving backend state).

    Returns:
        Database: A new Database instance sharing the same backend.
    """
    return self.__class__(self)

inc_refcount async

inc_refcount()

Internal method to bump the ref_count.

Return True if ref_count is 0, False otherwise.

Should not be used outside of tests. Use connect and hooks instead. Not multithreading safe!

Source code in databasez/core/database.py
async def inc_refcount(self) -> bool:
    """
    Internal method to bump the ref_count.

    Return True if ref_count is 0, False otherwise.

    Should not be used outside of tests. Use connect and hooks instead.
    Not multithreading safe!
    """
    async with self.ref_lock:
        self.ref_counter += 1
        # on the first call the count is 1 because of the increment above
        if self.ref_counter == 1:
            return True
    return False

decr_refcount async

decr_refcount()

Internal method to decrease the ref_count.

Return True if ref_count drops to 0, False otherwise.

Should not be used outside of tests. Use disconnect and hooks instead. Not multithreading safe!

Source code in databasez/core/database.py
async def decr_refcount(self) -> bool:
    """
    Internal method to decrease the ref_count.

    Return True if ref_count drops to 0, False otherwise.

    Should not be used outside of tests. Use disconnect and hooks instead.
    Not multithreading safe!
    """
    async with self.ref_lock:
        if self.ref_counter <= 0:
            self.ref_counter = 0
            return False
        self.ref_counter -= 1
        # on the last call, the count is 0
        if self.ref_counter == 0:
            return True
    return False
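The ref-count protocol above can be sketched in isolation: only the first increment and the last decrement report True, so the pool is created once and torn down once no matter how many times connect() is called. The RefCounted class below is an illustrative stand-in, not the databasez internals:

```python
import asyncio

# Standalone sketch of the ref-count protocol used by connect()/disconnect().
class RefCounted:
    def __init__(self) -> None:
        self.ref_counter = 0
        self.ref_lock = asyncio.Lock()

    async def inc_refcount(self) -> bool:
        async with self.ref_lock:
            self.ref_counter += 1
            # True only on the transition 0 -> 1 (first connect)
            return self.ref_counter == 1

    async def decr_refcount(self) -> bool:
        async with self.ref_lock:
            if self.ref_counter <= 0:
                self.ref_counter = 0
                return False
            self.ref_counter -= 1
            # True only on the transition 1 -> 0 (last disconnect)
            return self.ref_counter == 0

async def demo() -> list:
    rc = RefCounted()
    return [
        await rc.inc_refcount(),   # first connect: True
        await rc.inc_refcount(),   # nested connect: False
        await rc.decr_refcount(),  # nested disconnect: False
        await rc.decr_refcount(),  # last disconnect: True
    ]

results = asyncio.run(demo())
```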

connect_hook async

connect_hook()

Hook called before engine setup on first connect.

Override this in subclasses to perform custom initialisation logic, such as creating test databases. Protected by the ref-counter so it runs only on the first connect() call.

Source code in databasez/core/database.py
async def connect_hook(self) -> None:
    """Hook called before engine setup on first connect.

    Override this in subclasses to perform custom initialisation logic,
    such as creating test databases.  Protected by the ref-counter so
    it runs only on the *first* ``connect()`` call.
    """

connect async

connect()

Establish the connection pool.

If called from a different event loop than the one the database was originally connected on, a sub-database is transparently created for the current loop.

RETURNS DESCRIPTION
bool

True if this was the first connection (pool created), False if only the ref-counter was incremented.

TYPE: bool

Source code in databasez/core/database.py
async def connect(self) -> bool:
    """Establish the connection pool.

    If called from a different event loop than the one the database was
    originally connected on, a sub-database is transparently created for
    the current loop.

    Returns:
        bool: ``True`` if this was the first connection (pool created),
            ``False`` if only the ref-counter was incremented.
    """
    loop = asyncio.get_running_loop()
    if self._loop is not None and loop != self._loop:
        if self.poll_interval < 0:
            raise RuntimeError("Subdatabases and polling are disabled")
        # copy when not in map
        if loop not in self._databases_map:
            assert self._global_connection is not None, (
                "global connection should have been set"
            )
            # correctly initialize force_rollback with parent value
            database = self.__class__(
                self, force_rollback=bool(self.force_rollback), full_isolation=False
            )
            # prevent side effects of connect_hook
            database._call_hooks = False
            database._global_connection = self._global_connection
            self._databases_map[loop] = database
        # forward call
        return await self._databases_map[loop].connect()

    if not await self.inc_refcount():
        assert self.is_connected, "ref_count < 0"
        return False
    if self._call_hooks:
        try:
            await self.connect_hook()
        except BaseException:
            await self.decr_refcount()
            raise
    self._loop = asyncio.get_event_loop()

    await self.backend.connect(self.url, **self.options)
    self.is_connected = True

    if self._global_connection is None:
        connection = Connection(self, force_rollback=True, full_isolation=self._full_isolation)
        self._global_connection = connection
    else:
        self._remove_global_connection = False
    return True
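The cross-loop branch of connect() can be modelled with a mapping keyed by the running event loop: a database bound to one loop forwards calls made from another loop to a per-loop sub-database. The LoopBound class below is a simplified stand-in under that assumption (no hooks, pools, or polling):

```python
import asyncio

# Sketch of the per-event-loop dispatch in connect().
class LoopBound:
    def __init__(self) -> None:
        self._loop = None
        self._databases_map: dict = {}

    async def connect(self) -> "LoopBound":
        loop = asyncio.get_running_loop()
        if self._loop is not None and loop is not self._loop:
            # foreign loop: create (or reuse) a sub-database for it
            sub = self._databases_map.setdefault(loop, LoopBound())
            return await sub.connect()
        self._loop = loop
        return self

parent = LoopBound()
first = asyncio.run(parent.connect())   # binds parent to loop A
second = asyncio.run(parent.connect())  # loop B: forwarded to a sub-database
```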

disconnect_hook async

disconnect_hook()

Hook called after engine teardown on last disconnect.

Override this in subclasses to perform custom cleanup logic. Protected by the ref-counter so it runs only on the last disconnect() call.

Source code in databasez/core/database.py
async def disconnect_hook(self) -> None:
    """Hook called after engine teardown on last disconnect.

    Override this in subclasses to perform custom cleanup logic.
    Protected by the ref-counter so it runs only on the *last*
    ``disconnect()`` call.
    """

disconnect async

disconnect(force=False, *, parent_database=None)

Close all connections in the connection pool.

PARAMETER DESCRIPTION
force

If True, disconnect even when the ref-counter is above zero.

TYPE: bool DEFAULT: False

parent_database

Injected by :func:multiloop_protector; must not be supplied manually.

TYPE: Database | None DEFAULT: None

RETURNS DESCRIPTION
bool

True if the pool was actually torn down, False if only the ref-counter was decremented.

TYPE: bool

Source code in databasez/core/database.py
@multiloop_protector(True, inject_parent=True)
async def disconnect(
    self, force: bool = False, *, parent_database: Database | None = None
) -> bool:
    """Close all connections in the connection pool.

    Args:
        force: If ``True``, disconnect even when the ref-counter is
            above zero.
        parent_database: Injected by :func:`multiloop_protector`;
            must not be supplied manually.

    Returns:
        bool: ``True`` if the pool was actually torn down, ``False``
            if only the ref-counter was decremented.
    """
    # parent_database is injected and should not be specified manually
    force_disconnect_with_refcount = False
    if force:
        async with self.ref_lock:
            force_disconnect_with_refcount = self.ref_counter > 0
            self.ref_counter = 0
    elif not await self.decr_refcount():
        if not self.is_connected:
            logger.debug("Already disconnected, skip disconnecting")
            return False
        return False

    if force and force_disconnect_with_refcount:
        logger.warning("Force disconnect, despite refcount not 0")
    if not self.is_connected:
        logger.debug("Already disconnected, skip disconnecting")
        return False

    if parent_database is not None:
        loop = asyncio.get_running_loop()
        parent_database._databases_map.pop(loop, None)
    if force and self._databases_map:
        for sub_database in self._databases_map.values():
            await arun_coroutine_threadsafe(
                sub_database.disconnect(True),
                sub_database._loop,
                self.poll_interval,
            )
        self._databases_map = {}
    assert not self._databases_map, "sub databases still active"

    try:
        assert self._global_connection is not None
        if self._remove_global_connection:
            await self._global_connection.__aexit__()
            self._global_connection = None
        self._connection = None
    finally:
        self.is_connected = False
        await self.backend.disconnect()
        self._loop = None
        if self._call_hooks:
            await self.disconnect_hook()
    return True

fetch_all async

fetch_all(query, values=None, timeout=None)

Execute query and return all result rows.

PARAMETER DESCRIPTION
query

SQL string or clause element.

TYPE: ClauseElement | str

values

Optional bind parameters.

TYPE: dict | None DEFAULT: None

timeout

Optional timeout in seconds.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
list[Record]

list[interfaces.Record]: All result rows.

Source code in databasez/core/database.py
async def fetch_all(
    self,
    query: ClauseElement | str,
    values: dict | None = None,
    timeout: float | None = None,
) -> list[interfaces.Record]:
    """Execute *query* and return all result rows.

    Args:
        query: SQL string or clause element.
        values: Optional bind parameters.
        timeout: Optional timeout in seconds.

    Returns:
        list[interfaces.Record]: All result rows.
    """
    async with self.connection() as connection:
        return await connection.fetch_all(query, values, timeout=timeout)
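The fetch helpers (fetch_all, fetch_one, fetch_val) follow classic DB-API fetch patterns. A synchronous stdlib sqlite3 analogue of their semantics (not the async API itself):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("ada",), ("bob",)])

# fetch_all: every row; fetch_one: a single row; fetch_val: one scalar.
rows = conn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
one = conn.execute("SELECT name FROM users WHERE id = ?", (1,)).fetchone()
val = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```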

fetch_one async

fetch_one(query, values=None, pos=0, timeout=None)

Execute query and return a single row.

PARAMETER DESCRIPTION
query

SQL string or clause element.

TYPE: ClauseElement | str

values

Optional bind parameters.

TYPE: dict | None DEFAULT: None

pos

Row position (0-based, -1 for last).

TYPE: int DEFAULT: 0

timeout

Optional timeout in seconds.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
Record | None

interfaces.Record | None: The row, or None.

Source code in databasez/core/database.py
async def fetch_one(
    self,
    query: ClauseElement | str,
    values: dict | None = None,
    pos: int = 0,
    timeout: float | None = None,
) -> interfaces.Record | None:
    """Execute *query* and return a single row.

    Args:
        query: SQL string or clause element.
        values: Optional bind parameters.
        pos: Row position (0-based, ``-1`` for last).
        timeout: Optional timeout in seconds.

    Returns:
        interfaces.Record | None: The row, or ``None``.
    """
    async with self.connection() as connection:
        return await connection.fetch_one(query, values, pos=pos, timeout=timeout)

fetch_val async

fetch_val(
    query, values=None, column=0, pos=0, timeout=None
)

Execute query and return a single scalar value.

PARAMETER DESCRIPTION
query

SQL string or clause element.

TYPE: ClauseElement | str

values

Optional bind parameters.

TYPE: dict | None DEFAULT: None

column

Column index or name.

TYPE: int | str DEFAULT: 0

pos

Row position.

TYPE: int DEFAULT: 0

timeout

Optional timeout in seconds.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
Any

The scalar value, or None.

TYPE: Any

Source code in databasez/core/database.py
async def fetch_val(
    self,
    query: ClauseElement | str,
    values: dict | None = None,
    column: int | str = 0,
    pos: int = 0,
    timeout: float | None = None,
) -> Any:
    """Execute *query* and return a single scalar value.

    Args:
        query: SQL string or clause element.
        values: Optional bind parameters.
        column: Column index or name.
        pos: Row position.
        timeout: Optional timeout in seconds.

    Returns:
        Any: The scalar value, or ``None``.
    """
    async with self.connection() as connection:
        return await connection.fetch_val(
            query,
            values,
            column=column,
            pos=pos,
            timeout=timeout,
        )

execute async

execute(query, values=None, timeout=None)

Execute a statement and return a concise result.

PARAMETER DESCRIPTION
query

SQL string or clause element.

TYPE: ClauseElement | str

values

Optional bind parameters.

TYPE: Any DEFAULT: None

timeout

Optional timeout in seconds.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
Record | int

interfaces.Record | int: Primary-key row / rowcount.

Source code in databasez/core/database.py
async def execute(
    self,
    query: ClauseElement | str,
    values: Any = None,
    timeout: float | None = None,
) -> interfaces.Record | int:
    """Execute a statement and return a concise result.

    Args:
        query: SQL string or clause element.
        values: Optional bind parameters.
        timeout: Optional timeout in seconds.

    Returns:
        interfaces.Record | int: Primary-key row / rowcount.
    """
    async with self.connection() as connection:
        return await connection.execute(query, values, timeout=timeout)

execute_many async

execute_many(query, values=None, timeout=None)

Execute a statement with multiple parameter sets.

PARAMETER DESCRIPTION
query

SQL string or clause element.

TYPE: ClauseElement | str

values

A sequence of parameter mappings.

TYPE: Any DEFAULT: None

timeout

Optional timeout in seconds.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
Sequence[Record] | int

Sequence[interfaces.Record] | int: Primary-key rows / rowcount.

Source code in databasez/core/database.py
async def execute_many(
    self,
    query: ClauseElement | str,
    values: Any = None,
    timeout: float | None = None,
) -> Sequence[interfaces.Record] | int:
    """Execute a statement with multiple parameter sets.

    Args:
        query: SQL string or clause element.
        values: A sequence of parameter mappings.
        timeout: Optional timeout in seconds.

    Returns:
        Sequence[interfaces.Record] | int: Primary-key rows / rowcount.
    """
    async with self.connection() as connection:
        return await connection.execute_many(query, values, timeout=timeout)
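The difference between execute() and execute_many() mirrors the DB-API distinction between one statement with one parameter set and one statement run with many. A stdlib sqlite3 sketch of the rowcount semantics:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (kind TEXT)")

cur = conn.execute("INSERT INTO events (kind) VALUES (?)", ("start",))
single_rowcount = cur.rowcount  # one statement, one parameter set

cur = conn.executemany(
    "INSERT INTO events (kind) VALUES (?)",
    [("tick",), ("tick",), ("stop",)],
)
many_rowcount = cur.rowcount  # one statement, three parameter sets
```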

iterate async

iterate(query, values=None, chunk_size=None, timeout=None)

Execute query and yield rows one by one.

PARAMETER DESCRIPTION
query

SQL string or clause element.

TYPE: ClauseElement | str

values

Optional bind parameters.

TYPE: dict | None DEFAULT: None

chunk_size

Backend batch-size hint.

TYPE: int | None DEFAULT: None

timeout

Per-row timeout in seconds.

TYPE: float | None DEFAULT: None

YIELDS DESCRIPTION
AsyncGenerator[Record, None]

interfaces.Record: Result rows.

Source code in databasez/core/database.py
async def iterate(
    self,
    query: ClauseElement | str,
    values: dict | None = None,
    chunk_size: int | None = None,
    timeout: float | None = None,
) -> AsyncGenerator[interfaces.Record, None]:
    """Execute *query* and yield rows one by one.

    Args:
        query: SQL string or clause element.
        values: Optional bind parameters.
        chunk_size: Backend batch-size hint.
        timeout: Per-row timeout in seconds.

    Yields:
        interfaces.Record: Result rows.
    """
    async with self.connection() as connection:
        async for record in connection.iterate(query, values, chunk_size, timeout=timeout):
            yield record
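iterate() streams rows instead of materialising the whole result set, which matters for large queries. The synchronous equivalent is consuming a cursor lazily through a generator (stdlib sqlite3 stand-in, not the async API):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE nums (n INTEGER)")
conn.executemany("INSERT INTO nums (n) VALUES (?)", [(i,) for i in range(5)])

def iterate(query: str):
    cursor = conn.execute(query)
    for row in cursor:  # rows are fetched as needed, not all at once
        yield row

streamed = [n for (n,) in iterate("SELECT n FROM nums ORDER BY n")]
```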

batched_iterate async

batched_iterate(
    query,
    values=None,
    batch_size=None,
    batch_wrapper=tuple,
    timeout=None,
)

Execute query and yield rows in batches.

PARAMETER DESCRIPTION
query

SQL string or clause element.

TYPE: ClauseElement | str

values

Optional bind parameters.

TYPE: dict | None DEFAULT: None

batch_size

Rows per batch.

TYPE: int | None DEFAULT: None

batch_wrapper

Callable to transform each batch.

TYPE: BatchCallable DEFAULT: tuple

timeout

Per-batch timeout in seconds.

TYPE: float | None DEFAULT: None

YIELDS DESCRIPTION
BatchCallableResult

Batches of result rows.

TYPE: AsyncGenerator[BatchCallableResult, None]

Source code in databasez/core/database.py
async def batched_iterate(
    self,
    query: ClauseElement | str,
    values: dict | None = None,
    batch_size: int | None = None,
    batch_wrapper: BatchCallable = tuple,
    timeout: float | None = None,
) -> AsyncGenerator[BatchCallableResult, None]:
    """Execute *query* and yield rows in batches.

    Args:
        query: SQL string or clause element.
        values: Optional bind parameters.
        batch_size: Rows per batch.
        batch_wrapper: Callable to transform each batch.
        timeout: Per-batch timeout in seconds.

    Yields:
        BatchCallableResult: Batches of result rows.
    """
    async with self.connection() as connection:
        async for batch in cast(
            AsyncGenerator["BatchCallableResult", None],
            connection.batched_iterate(
                query,
                values,
                batch_wrapper=batch_wrapper,
                batch_size=batch_size,
                timeout=timeout,
            ),
        ):
            yield batch
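The core of batched_iterate() is grouping a stream into fixed-size chunks and applying batch_wrapper to each. That batching step can be sketched with itertools.islice (the batched helper name is illustrative):

```python
from itertools import islice

# Group an iterable into batch_wrapper-wrapped chunks of batch_size rows.
def batched(iterable, batch_size, batch_wrapper=tuple):
    iterator = iter(iterable)
    while True:
        batch = batch_wrapper(islice(iterator, batch_size))
        if not batch:  # exhausted: an empty batch ends the stream
            return
        yield batch

batches = list(batched(range(7), 3))
```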

transaction

transaction(*, force_rollback=False, **kwargs)

Create a new :class:Transaction on the current connection.

PARAMETER DESCRIPTION
force_rollback

If True, always roll back on exit.

TYPE: bool DEFAULT: False

**kwargs

Extra options forwarded to the transaction backend.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Transaction

A new transaction instance.

TYPE: Transaction

Source code in databasez/core/database.py
def transaction(self, *, force_rollback: bool = False, **kwargs: Any) -> Transaction:
    """Create a new :class:`Transaction` on the current connection.

    Args:
        force_rollback: If ``True``, always roll back on exit.
        **kwargs: Extra options forwarded to the transaction backend.

    Returns:
        Transaction: A new transaction instance.
    """
    return Transaction(self.connection, force_rollback=force_rollback, **kwargs)
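The force_rollback=True behaviour (always roll back on exit, leaving the database untouched, which is what test isolation relies on) can be sketched with stdlib sqlite3 and a hypothetical context manager; this models the semantics, not the Transaction class itself:

```python
import sqlite3
from contextlib import contextmanager

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE notes (body TEXT)")
conn.commit()

@contextmanager
def transaction(connection, force_rollback=False):
    try:
        yield connection
    except BaseException:
        connection.rollback()
        raise
    else:
        if force_rollback:
            connection.rollback()  # always undo, even on success
        else:
            connection.commit()

with transaction(conn, force_rollback=True) as tx:
    tx.execute("INSERT INTO notes (body) VALUES (?)", ("draft",))

# The insert was rolled back on exit.
remaining = conn.execute("SELECT COUNT(*) FROM notes").fetchone()[0]
```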

run_sync async

run_sync(fn, *args, timeout=None, **kwargs)

Run a synchronous callable on the current connection.

PARAMETER DESCRIPTION
fn

A synchronous function.

TYPE: Callable[..., Any]

*args

Positional arguments.

TYPE: Any DEFAULT: ()

timeout

Optional timeout in seconds.

TYPE: float | None DEFAULT: None

**kwargs

Keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Any

The return value of fn.

TYPE: Any

Source code in databasez/core/database.py
async def run_sync(
    self,
    fn: Callable[..., Any],
    *args: Any,
    timeout: float | None = None,
    **kwargs: Any,
) -> Any:
    """Run a synchronous callable on the current connection.

    Args:
        fn: A synchronous function.
        *args: Positional arguments.
        timeout: Optional timeout in seconds.
        **kwargs: Keyword arguments.

    Returns:
        Any: The return value of *fn*.
    """
    async with self.connection() as connection:
        return await connection.run_sync(fn, *args, **kwargs, timeout=timeout)
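Stripped of the connection binding, run_sync() follows the general pattern of handing a blocking callable to async code. The stdlib shape of that pattern is asyncio.to_thread (the blocking_sum function is a trivial stand-in for sync work such as SQLAlchemy reflection):

```python
import asyncio

def blocking_sum(a: int, b: int) -> int:
    return a + b  # stand-in for blocking, synchronous work

async def main() -> int:
    # Runs the sync callable in a worker thread without blocking the loop.
    return await asyncio.to_thread(blocking_sum, 2, 3)

result = asyncio.run(main())
```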

create_all async

create_all(meta, timeout=None, **kwargs)

Create all tables defined in meta.

PARAMETER DESCRIPTION
meta

A SQLAlchemy :class:~sqlalchemy.MetaData.

TYPE: MetaData

timeout

Optional timeout in seconds.

TYPE: float | None DEFAULT: None

**kwargs

Extra arguments for meta.create_all.

TYPE: Any DEFAULT: {}

Source code in databasez/core/database.py
async def create_all(
    self, meta: MetaData, timeout: float | None = None, **kwargs: Any
) -> None:
    """Create all tables defined in *meta*.

    Args:
        meta: A SQLAlchemy :class:`~sqlalchemy.MetaData`.
        timeout: Optional timeout in seconds.
        **kwargs: Extra arguments for ``meta.create_all``.
    """
    async with self.connection() as connection:
        await connection.create_all(meta, **kwargs, timeout=timeout)

drop_all async

drop_all(meta, timeout=None, **kwargs)

Drop all tables defined in meta.

PARAMETER DESCRIPTION
meta

A SQLAlchemy :class:~sqlalchemy.MetaData.

TYPE: MetaData

timeout

Optional timeout in seconds.

TYPE: float | None DEFAULT: None

**kwargs

Extra arguments for meta.drop_all.

TYPE: Any DEFAULT: {}

Source code in databasez/core/database.py
async def drop_all(self, meta: MetaData, timeout: float | None = None, **kwargs: Any) -> None:
    """Drop all tables defined in *meta*.

    Args:
        meta: A SQLAlchemy :class:`~sqlalchemy.MetaData`.
        timeout: Optional timeout in seconds.
        **kwargs: Extra arguments for ``meta.drop_all``.
    """
    async with self.connection() as connection:
        await connection.drop_all(meta, **kwargs, timeout=timeout)
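create_all() and drop_all() apply a SQLAlchemy MetaData schema wholesale. For a single table, the net effect can be sketched with stdlib sqlite3 (this models the outcome, not the MetaData machinery):

```python
import sqlite3

conn = sqlite3.connect(":memory:")

def table_names(c):
    return [r[0] for r in c.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'")]

# create_all-like: create the schema if absent
conn.execute("CREATE TABLE IF NOT EXISTS items (id INTEGER PRIMARY KEY)")
after_create = table_names(conn)

# drop_all-like: remove the schema if present
conn.execute("DROP TABLE IF EXISTS items")
after_drop = table_names(conn)
```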

_non_global_connection

_non_global_connection(timeout=None)

Return or create the per-task connection (non-global).

RETURNS DESCRIPTION
Connection

The connection for the current task.

TYPE: Connection

Source code in databasez/core/database.py
@multiloop_protector(False)
def _non_global_connection(
    self,
    timeout: float | None = None,  # stub for multiloop_protector
) -> Connection:
    """Return or create the per-task connection (non-global).

    Returns:
        Connection: The connection for the current task.
    """
    if self._connection is None:
        _connection = self._connection = Connection(self)
        return _connection
    return self._connection

connection

connection(timeout=None)

Return a connection suitable for the current context.

In force-rollback mode the global connection is returned; otherwise a per-task connection is returned (created on demand).

PARAMETER DESCRIPTION
timeout

Optional timeout for cross-loop proxying.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
Connection

The active connection.

TYPE: Connection

RAISES DESCRIPTION
RuntimeError

If the database is not connected.

Source code in databasez/core/database.py
def connection(self, timeout: float | None = None) -> Connection:
    """Return a connection suitable for the current context.

    In force-rollback mode the global connection is returned; otherwise
    a per-task connection is returned (created on demand).

    Args:
        timeout: Optional timeout for cross-loop proxying.

    Returns:
        Connection: The active connection.

    Raises:
        RuntimeError: If the database is not connected.
    """
    if not self.is_connected:
        raise RuntimeError("Database is not connected")
    if self.force_rollback:
        return cast(Connection, self._global_connection)
    return self._non_global_connection(timeout=timeout)
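The "per-task connection" lookup behind connection() rests on a weak map keyed by the current asyncio task, as _connection_map suggests. A standalone sketch of that mechanism (PerTask is an illustrative stand-in; the stored object stands in for a Connection):

```python
import asyncio
import weakref

class PerTask:
    def __init__(self) -> None:
        # weak keys: entries vanish when their task is garbage-collected
        self._connection_map = weakref.WeakKeyDictionary()

    def connection(self) -> object:
        task = asyncio.current_task()
        if task not in self._connection_map:
            self._connection_map[task] = object()  # stand-in connection
        return self._connection_map[task]

async def demo():
    db = PerTask()

    async def grab() -> object:
        return db.connection()

    same = db.connection() is db.connection()  # same task: same connection
    other = await asyncio.create_task(grab())  # new task: new connection
    return same, other is db.connection()

same_task, cross_task = asyncio.run(demo())
```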

asgi

asgi(
    app: None, handle_lifespan: bool = False
) -> Callable[[ASGIApp], ASGIApp]
asgi(
    app: ASGIApp, handle_lifespan: bool = False
) -> ASGIApp
asgi(app=None, handle_lifespan=False)

Return wrapper for asgi integration.

Source code in databasez/core/database.py
def asgi(
    self,
    app: ASGIApp | None = None,
    handle_lifespan: bool = False,
) -> ASGIApp | Callable[[ASGIApp], ASGIApp]:
    """Return wrapper for asgi integration."""

    async def setup() -> contextlib.AsyncExitStack:
        cleanupstack = contextlib.AsyncExitStack()
        await self.connect()
        cleanupstack.push_async_callback(self.disconnect)
        return cleanupstack

    return LifespanHook(app, setup=setup, do_forward=not handle_lifespan)
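On the ASGI side, the wrapper's job amounts to connecting on lifespan startup and disconnecting on lifespan shutdown. A minimal sketch of that event handling, driven with fake receive/send callables (LifespanHook itself does more; lifespan_app here is illustrative):

```python
import asyncio

events: list = []

async def lifespan_app(scope, receive, send):
    # Handle only the ASGI lifespan protocol: connect on startup,
    # disconnect on shutdown, acknowledging each event.
    assert scope["type"] == "lifespan"
    while True:
        message = await receive()
        if message["type"] == "lifespan.startup":
            events.append("connect")
            await send({"type": "lifespan.startup.complete"})
        elif message["type"] == "lifespan.shutdown":
            events.append("disconnect")
            await send({"type": "lifespan.shutdown.complete"})
            return

async def drive():
    incoming = [{"type": "lifespan.startup"}, {"type": "lifespan.shutdown"}]
    sent: list = []

    async def receive():
        return incoming.pop(0)

    async def send(message):
        sent.append(message)

    await lifespan_app({"type": "lifespan"}, receive, send)
    return sent

sent = asyncio.run(drive())
```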

get_backends classmethod

get_backends(
    scheme="",
    *,
    overwrite_paths=("databasez.overwrites",),
    database_name="Database",
    connection_name="Connection",
    transaction_name="Transaction",
    database_class=None,
    connection_class=None,
    transaction_class=None,
)

Discover backend classes for the given scheme.

Searches overwrite_paths for modules that export Database, Connection, and Transaction classes matching the scheme. Falls back to the supplied defaults.

PARAMETER DESCRIPTION
scheme

The dialect scheme (e.g. "postgresql+asyncpg").

TYPE: str DEFAULT: ''

overwrite_paths

Module paths searched for overwrite modules.

TYPE: Sequence[str] DEFAULT: ('databasez.overwrites',)

database_name

Attribute name for the database backend class.

TYPE: str DEFAULT: 'Database'

connection_name

Attribute name for the connection backend class.

TYPE: str DEFAULT: 'Connection'

transaction_name

Attribute name for the transaction backend class.

TYPE: str DEFAULT: 'Transaction'

database_class

Fallback database backend class.

TYPE: type[DatabaseBackend] | None DEFAULT: None

connection_class

Fallback connection backend class.

TYPE: type[ConnectionBackend] | None DEFAULT: None

transaction_class

Fallback transaction backend class.

TYPE: type[TransactionBackend] | None DEFAULT: None

RETURNS DESCRIPTION
tuple

(database_class, connection_class, transaction_class).

TYPE: tuple[type[DatabaseBackend], type[ConnectionBackend], type[TransactionBackend]]

RAISES DESCRIPTION
AssertionError

If a discovered class does not subclass its expected abstract base.

Source code in databasez/core/database.py
@classmethod
def get_backends(
    cls,
    # let scheme empty for direct imports
    scheme: str = "",
    *,
    overwrite_paths: Sequence[str] = ("databasez.overwrites",),
    database_name: str = "Database",
    connection_name: str = "Connection",
    transaction_name: str = "Transaction",
    database_class: type[interfaces.DatabaseBackend] | None = None,
    connection_class: type[interfaces.ConnectionBackend] | None = None,
    transaction_class: type[interfaces.TransactionBackend] | None = None,
) -> tuple[
    type[interfaces.DatabaseBackend],
    type[interfaces.ConnectionBackend],
    type[interfaces.TransactionBackend],
]:
    """Discover backend classes for the given scheme.

    Searches *overwrite_paths* for modules that export ``Database``,
    ``Connection``, and ``Transaction`` classes matching the scheme.
    Falls back to the supplied defaults.

    Args:
        scheme: The dialect scheme (e.g. ``"postgresql+asyncpg"``).
        overwrite_paths: Module paths searched for overwrite modules.
        database_name: Attribute name for the database backend class.
        connection_name: Attribute name for the connection backend class.
        transaction_name: Attribute name for the transaction backend class.
        database_class: Fallback database backend class.
        connection_class: Fallback connection backend class.
        transaction_class: Fallback transaction backend class.

    Returns:
        tuple: ``(database_class, connection_class, transaction_class)``.

    Raises:
        AssertionError: If a discovered class does not subclass its
            expected abstract base.
    """
    module: Any = None
    # read the flag via the utils module so it can still be toggled at runtime for debugging
    more_debug = utils.DATABASEZ_OVERWRITE_LOGGING
    for overwrite_path in overwrite_paths:
        imp_path = f"{overwrite_path}.{scheme.replace('+', '_')}" if scheme else overwrite_path
        try:
            module = importlib.import_module(imp_path)
        except ImportError as exc:
            logging.debug(
                f'Could not import "{imp_path}". Continue search.',
                exc_info=exc if more_debug else None,
            )
            if "+" in scheme:
                imp_path = f"{overwrite_path}.{scheme.split('+', 1)[0]}"
                try:
                    module = importlib.import_module(imp_path)
                except ImportError as exc:
                    logging.debug(
                        f'Could not import "{imp_path}". Continue search.',
                        exc_info=exc if more_debug else None,
                    )
        if module is not None:
            break
    if module is None:
        logging.debug(
            "No overwrites found. Use default.",
        )
    database_class = getattr(module, database_name, database_class)
    assert database_class is not None and issubclass(
        database_class, interfaces.DatabaseBackend
    )
    connection_class = getattr(module, connection_name, connection_class)
    assert connection_class is not None and issubclass(
        connection_class, interfaces.ConnectionBackend
    )
    transaction_class = getattr(module, transaction_name, transaction_class)
    assert transaction_class is not None and issubclass(
        transaction_class, interfaces.TransactionBackend
    )
    return database_class, connection_class, transaction_class
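
The fallback search in get_backends can be illustrated in isolation. Below is a minimal sketch (a hypothetical helper, not part of databasez) that computes the candidate module paths in the order the method probes them: the driver-specific module first, then the dialect-only fallback, with an empty scheme meaning a direct import of the overwrite path itself:

```python
def candidate_module_paths(scheme, overwrite_paths=("databasez.overwrites",)):
    """List import paths in the order get_backends would try them.

    For "postgresql+asyncpg" this is the driver-specific module
    ("...postgresql_asyncpg") followed by the dialect-only fallback
    ("...postgresql"). An empty scheme probes the overwrite path directly.
    """
    paths = []
    for overwrite_path in overwrite_paths:
        if not scheme:
            paths.append(overwrite_path)
            continue
        # "+" in the scheme becomes "_" in the module name
        paths.append(f"{overwrite_path}.{scheme.replace('+', '_')}")
        if "+" in scheme:
            # fall back to the plain dialect, e.g. "postgresql"
            paths.append(f"{overwrite_path}.{scheme.split('+', 1)[0]}")
    return paths
```

Note that get_backends only attempts the dialect-only fallback when the driver-specific import fails; this sketch lists all candidates unconditionally.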

apply_database_url_and_options classmethod

apply_database_url_and_options(
    url,
    *,
    overwrite_paths=("databasez.overwrites",),
    **options,
)

Build a backend instance and normalise URL + options.

Discovers the correct backend classes for the URL's scheme, instantiates the database backend, and calls DatabaseBackend.extract_options to normalise the URL and merge query-string options.

Parameters:

- url (DatabaseURL | str): The original database URL.
- overwrite_paths (Sequence[str], default ('databasez.overwrites',)): Module paths searched for overwrite modules.
- **options (Any, default {}): Additional caller-supplied engine options.

Returns:

- tuple[DatabaseBackend, DatabaseURL, dict[str, Any]]: (backend, cleaned_url, options).

Raises:

- AssertionError: If the resolved dialect is not async.

Source code in databasez/core/database.py
@classmethod
def apply_database_url_and_options(
    cls,
    url: DatabaseURL | str,
    *,
    overwrite_paths: Sequence[str] = ("databasez.overwrites",),
    **options: Any,
) -> tuple[interfaces.DatabaseBackend, DatabaseURL, dict[str, Any]]:
    """Build a backend instance and normalise URL + options.

    Discovers the correct backend classes for the URL's scheme,
    instantiates the database backend, and calls
    :meth:`~DatabaseBackend.extract_options` to normalise the URL and
    merge query-string options.

    Args:
        url: The original database URL.
        overwrite_paths: Module paths searched for overwrite modules.
        **options: Additional caller-supplied engine options.

    Returns:
        tuple: ``(backend, cleaned_url, options)``.

    Raises:
        AssertionError: If the resolved dialect is not async.
    """
    url = DatabaseURL(url)
    database_class, connection_class, transaction_class = cls.get_backends(
        url.scheme,
        database_class=default_database,
        connection_class=default_connection,
        transaction_class=default_transaction,
        overwrite_paths=overwrite_paths,
    )

    backend = database_class(
        connection_class=connection_class, transaction_class=transaction_class
    )
    url, options = backend.extract_options(url, **options)
    # check against transformed url
    assert url.sqla_url.get_dialect(True).is_async, f'Dialect: "{url.scheme}" is not async.'

    return backend, url, options
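
The extract_options step above is backend-specific, but the general shape of splitting query-string options off a URL and merging them with caller-supplied options can be sketched with the standard library alone. This is illustrative only: the real precedence and key normalisation live in DatabaseBackend.extract_options, and the precedence chosen here (explicit keyword options win) is an assumption:

```python
from urllib.parse import parse_qsl, urlsplit, urlunsplit

def split_url_options(url: str, **options):
    """Separate query-string options from a database URL (illustration only).

    Returns the URL without its query string plus a merged options dict;
    explicit keyword options override query-string values here (assumption,
    not necessarily how a given backend resolves conflicts).
    """
    parts = urlsplit(url)
    merged = dict(parse_qsl(parts.query))
    merged.update(options)  # caller-supplied options take precedence
    cleaned = urlunsplit((parts.scheme, parts.netloc, parts.path, "", parts.fragment))
    return cleaned, merged
```

For example, split_url_options("postgresql+asyncpg://u:p@localhost:5432/app?ssl=true", pool_size=10) yields the URL without its query string and a dict containing both the ssl and pool_size options.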