
spakky-sqlalchemy

SQLAlchemy integration: ORM, Repository, Transaction, Outbox Storage

Main

spakky.plugins.sqlalchemy.main

initialize(app)

Initialize the SQLAlchemy plugin.

Registers the SQLAlchemy configuration, schema registry, connection managers, session managers, and transaction handlers. Async Pods (AsyncConnectionManager, AsyncSessionManager, AsyncTransaction) are registered only when support_async_mode is True in the configuration.

When spakky-outbox is installed, the Outbox table and storage implementations are registered automatically (runtime detection). The async storage is added only when support_async_mode is also True.

Parameters:

Name  Type               Description                       Default
app   SpakkyApplication  The Spakky application instance.  required

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/main.py
def initialize(app: SpakkyApplication) -> None:
    """Initialize the SQLAlchemy plugin.

    Registers SQLAlchemy configuration, schema registry, session managers,
    and transaction handlers. Async Pods (AsyncSessionManager, AsyncTransaction)
    are only registered when support_async_mode is True in the configuration.

    When spakky-outbox is installed, Outbox storage implementations are
    automatically registered (runtime detection).

    Args:
        app: The Spakky application instance.
    """
    config = SQLAlchemyConnectionConfig()

    app.add(SQLAlchemyConnectionConfig)
    app.add(SchemaRegistry)

    app.add(ConnectionManager)
    app.add(SessionManager)
    app.add(Transaction)

    if config.support_async_mode:
        app.add(AsyncConnectionManager)
        app.add(AsyncSessionManager)
        app.add(AsyncTransaction)

    if _HAS_OUTBOX:
        app.add(OutboxMessageTable)
        app.add(SqlAlchemyOutboxStorage)
        if config.support_async_mode:
            app.add(AsyncSqlAlchemyOutboxStorage)


Configuration

spakky.plugins.sqlalchemy.common.config

SQLAlchemyConnectionConfig()

Bases: BaseSettings

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/common/config.py
def __init__(self) -> None:
    super().__init__()

connection_string instance-attribute

SQLAlchemy database connection URL (e.g. postgresql+psycopg://user:pass@host/db).

echo = False class-attribute instance-attribute

If True, the engine logs all SQL statements to the default logger.

echo_pool = False class-attribute instance-attribute

If True, the connection pool logs all checkout/checkin events.

DEFAULT_POOL_SIZE = 5 class-attribute

Default number of connections in the pool.

DEFAULT_POOL_MAX_OVERFLOW = 10 class-attribute

Default maximum overflow connections.

DEFAULT_POOL_TIMEOUT = 30.0 class-attribute

Default seconds to wait when pool is exhausted.

DEFAULT_POOL_RECYCLE = -1 class-attribute

Default connection recycle time (-1 = disabled).

DEFAULT_POOL_PRE_PING = False class-attribute

Default pre-ping setting.

pool_size = DEFAULT_POOL_SIZE class-attribute instance-attribute

Number of connections to maintain persistently in the pool.

pool_max_overflow = DEFAULT_POOL_MAX_OVERFLOW class-attribute instance-attribute

Maximum number of connections that can be opened beyond pool_size.

pool_timeout = DEFAULT_POOL_TIMEOUT class-attribute instance-attribute

Seconds to wait before raising an error when the pool is exhausted.

pool_recycle = DEFAULT_POOL_RECYCLE class-attribute instance-attribute

Recycle connections after this many seconds to prevent stale connections. Recommended when connecting through a proxy or firewall with idle timeouts.

pool_pre_ping = DEFAULT_POOL_PRE_PING class-attribute instance-attribute

If True, tests each connection for liveness before returning it from the pool. Prevents errors caused by connections dropped by the database server.

session_autoflush = True class-attribute instance-attribute

If True, the session automatically flushes pending changes before queries.

session_expire_on_commit = True class-attribute instance-attribute

If True, ORM objects are expired after each commit, forcing a reload on next access.

autocommit = True class-attribute instance-attribute

If True, a transaction is committed automatically when its with block exits. If False, the transaction must be committed or rolled back manually.

support_async_mode = True class-attribute instance-attribute

If True, registers the async Pods (AsyncConnectionManager, AsyncSessionManager, AsyncTransaction). Set to False when using database drivers that don't support async operations.


ORM

spakky.plugins.sqlalchemy.orm.table

AbstractTable

Bases: DeclarativeBase, AsyncAttrs

Base table class for SQLAlchemy ORM integration with Spakky.

Use this class for infrastructure tables that don't map to domain models (e.g., outbox tables, audit logs). For tables that map to domain entities, use AbstractMappableTable instead.

AbstractMappableTable

Bases: AbstractTable, Generic[ObjectT]

Table class with domain object mapping support.

Use this class for tables that map to domain entities. Subclasses must implement from_domain() and to_domain() methods for bidirectional mapping.
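The bidirectional mapping contract can be illustrated without SQLAlchemy. In this sketch `User` and `UserRow` are hypothetical plain dataclasses. A real subclass of AbstractMappableTable would declare mapped columns instead, and the instance-method form of `to_domain()` is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class User:
    """Hypothetical domain entity."""

    id: int
    name: str


@dataclass
class UserRow:
    """Hypothetical stand-in for a table class with domain mapping."""

    id: int
    name: str

    @classmethod
    def from_domain(cls, domain: User) -> UserRow:
        # Domain object -> persistence representation.
        return cls(id=domain.id, name=domain.name)

    def to_domain(self) -> User:
        # Persistence representation -> domain object.
        return User(id=self.id, name=self.name)


row = UserRow.from_domain(User(id=1, name="Ada"))
restored = row.to_domain()
```

Keeping both directions on the table class means the domain entity never needs to know how it is stored.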


spakky.plugins.sqlalchemy.orm.schema_registry

SchemaRegistry()

Bases: ITagRegistryAware

Registry that maps domain types to SQLAlchemy table schemas.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/orm/schema_registry.py
def __init__(self) -> None:
    self._metadata = MetaData()
    self._domain_to_table_map = {}
    self._table_to_domain_map = {}

metadata property

Return MetaData containing only tables registered via @Table tag.

This filters out tables that were defined but not scanned by the SpakkyApplication, ensuring only application-scoped tables are included.

Returns:

Type      Description
MetaData  MetaData with only @Table-tagged tables.

get_type(domain_type)

Look up the table class registered for the given domain type.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/orm/schema_registry.py
def get_type(
    self, domain_type: type[ObjectT]
) -> type[AbstractMappableTable[ObjectT]]:
    """Look up the table class registered for the given domain type."""
    table = self._domain_to_table_map.get(domain_type)
    if table is None:
        raise NoSchemaFoundFromDomainError(domain_type)
    return cast(type[AbstractMappableTable[ObjectT]], table)

from_domain(domain)

Convert a domain object to its corresponding table instance.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/orm/schema_registry.py
def from_domain(self, domain: ObjectT) -> AbstractMappableTable[ObjectT]:
    """Convert a domain object to its corresponding table instance."""
    table: type[AbstractMappableTable[ObjectT]] | None = (
        self._domain_to_table_map.get(type(domain))
    )
    if table is None:
        raise NoSchemaFoundFromDomainError(domain)
    return table.from_domain(domain)
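The lookup logic in get_type() and from_domain() amounts to a dictionary keyed by domain type. The sketch below reproduces that idea with the standard library only. `MiniSchemaRegistry`, its `register()` method, `NoSchemaFound`, `Order`, and `OrderRow` are all hypothetical; the real registry is populated from @Table-tagged classes rather than by explicit registration.

```python
from __future__ import annotations

from dataclasses import dataclass


class NoSchemaFound(LookupError):
    """Hypothetical stand-in for NoSchemaFoundFromDomainError."""


@dataclass(frozen=True)
class Order:
    id: int


@dataclass
class OrderRow:
    id: int

    @classmethod
    def from_domain(cls, domain: Order) -> OrderRow:
        return cls(id=domain.id)


class MiniSchemaRegistry:
    def __init__(self) -> None:
        self._domain_to_table = {}

    def register(self, domain_type: type, table_type: type) -> None:
        # Assumption: explicit registration replaces tag scanning in this sketch.
        self._domain_to_table[domain_type] = table_type

    def get_type(self, domain_type: type) -> type:
        table = self._domain_to_table.get(domain_type)
        if table is None:
            raise NoSchemaFound(domain_type)
        return table

    def from_domain(self, domain: object):
        # Dispatch on the runtime type of the domain object.
        return self.get_type(type(domain)).from_domain(domain)


registry = MiniSchemaRegistry()
registry.register(Order, OrderRow)
row = registry.from_domain(Order(id=7))
```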


spakky.plugins.sqlalchemy.orm.error

AbstractSpakkySqlAlchemyORMError

Bases: AbstractSpakkySqlAlchemyError, ABC

Base exception for Spakky SQLAlchemy ORM errors.


Persistency

spakky.plugins.sqlalchemy.persistency.repository

CannotDetermineAggregateTypeError

Bases: AbstractSpakkySqlAlchemyPersistencyError

Raised when aggregate type cannot be resolved from generic parameters.

AbstractGenericRepository(session_manager, schema_registry, aggregate_collector)

Bases: IGenericRepository[AggregateRootT, AggregateIdT_contra], ABC

Generic repository implementation for SQLAlchemy.

This class provides basic CRUD operations using SQLAlchemy sessions. It serves as a base class for specific entity repositories.

Resolve aggregate type from generic parameters and store dependencies.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/repository.py
def __init__(
    self,
    session_manager: SessionManager,
    schema_registry: SchemaRegistry,
    aggregate_collector: AggregateCollector,
) -> None:
    """Resolve aggregate type from generic parameters and store dependencies."""
    parameterized_base = next(
        (
            type_
            for type_ in generic_mro(type(self))
            if get_origin(type_) is AbstractGenericRepository
        ),
        None,
    )
    if parameterized_base is None:
        raise CannotDetermineAggregateTypeError(type(self))
    self._aggregate_type = next(iter(get_args(parameterized_base)))
    self._session_manager = session_manager
    self._schema_registry = schema_registry
    self._aggregate_collector = aggregate_collector
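The constructor above resolves the aggregate type from the subclass's generic parameters. A minimal standalone sketch of the same technique follows. `BaseRepository`, `CannotDetermineAggregateType`, and the manual MRO walk are hypothetical simplifications; the `generic_mro` helper used by the plugin is not reproduced here.

```python
from typing import Generic, TypeVar, get_args, get_origin

AggregateT = TypeVar("AggregateT")
IdT = TypeVar("IdT")


class CannotDetermineAggregateType(TypeError):
    """Hypothetical stand-in for CannotDetermineAggregateTypeError."""


class BaseRepository(Generic[AggregateT, IdT]):
    def __init__(self) -> None:
        self.aggregate_type = self._resolve_aggregate_type()

    def _resolve_aggregate_type(self) -> type:
        # Walk the MRO and inspect the parameterized bases each class declared itself.
        for klass in type(self).__mro__:
            for base in klass.__dict__.get("__orig_bases__", ()):
                if get_origin(base) is BaseRepository:
                    # The first type argument is the aggregate type.
                    return get_args(base)[0]
        raise CannotDetermineAggregateType(type(self))


class User:
    pass


class UserRepository(BaseRepository[User, int]):
    pass


class UntypedRepository(BaseRepository):
    """Subclass without type arguments: resolution fails."""


repo = UserRepository()
```

Instantiating `UntypedRepository` raises the error because no parameterized `BaseRepository[...]` base appears anywhere in its hierarchy.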

AbstractAsyncGenericRepository(session_manager, schema_registry, aggregate_collector)

Bases: IAsyncGenericRepository[AggregateRootT, AggregateIdT_contra], ABC

Async generic repository implementation for SQLAlchemy.

This class provides basic CRUD operations using SQLAlchemy's async sessions. It serves as a base class for specific entity repositories that require async support.

Resolve aggregate type from generic parameters and store dependencies.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/repository.py
def __init__(
    self,
    session_manager: AsyncSessionManager,
    schema_registry: SchemaRegistry,
    aggregate_collector: AggregateCollector,
) -> None:
    """Resolve aggregate type from generic parameters and store dependencies."""
    parameterized_base = next(
        (
            type_
            for type_ in generic_mro(type(self))
            if get_origin(type_) is AbstractAsyncGenericRepository
        ),
        None,
    )
    if parameterized_base is None:
        raise CannotDetermineAggregateTypeError(type(self))
    self._aggregate_type = next(iter(get_args(parameterized_base)))
    self._session_manager = session_manager
    self._schema_registry = schema_registry
    self._aggregate_collector = aggregate_collector


spakky.plugins.sqlalchemy.persistency.transaction

Transaction(config, session_manager)

Bases: AbstractTransaction

SQLAlchemy synchronous transaction implementation.

Initialize with autocommit config and session manager.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/transaction.py
def __init__(
    self,
    config: SQLAlchemyConnectionConfig,
    session_manager: SessionManager,
) -> None:
    """Initialize with autocommit config and session manager."""
    super().__init__(autocommit=config.autocommit)
    self._session_manager = session_manager

initialize()

Open a new session for this transaction scope.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/transaction.py
def initialize(self) -> None:
    """Open a new session for this transaction scope."""
    self._session_manager.open()

dispose()

Close the session after transaction completes.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/transaction.py
def dispose(self) -> None:
    """Close the session after transaction completes."""
    self._session_manager.close()

commit()

Commit the current session.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/transaction.py
def commit(self) -> None:
    """Commit the current session."""
    self._session_manager.session.commit()

rollback()

Roll back the current session.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/transaction.py
def rollback(self) -> None:
    """Roll back the current session."""
    self._session_manager.session.rollback()
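The four methods above form a lifecycle: initialize, then commit or rollback, then dispose. How AbstractTransaction drives them in a with statement is not shown on this page. The sketch below assumes that with autocommit enabled the block commits on success and rolls back on an exception, and that with autocommit disabled neither happens automatically. `RecordingTransaction` is a hypothetical class that only records the call order.

```python
class RecordingTransaction:
    """Hypothetical transaction that records which lifecycle hooks ran."""

    def __init__(self, autocommit: bool = True) -> None:
        self.autocommit = autocommit
        self.calls = []

    def initialize(self) -> None:
        self.calls.append("initialize")

    def commit(self) -> None:
        self.calls.append("commit")

    def rollback(self) -> None:
        self.calls.append("rollback")

    def dispose(self) -> None:
        self.calls.append("dispose")

    def __enter__(self) -> "RecordingTransaction":
        self.initialize()
        return self

    def __exit__(self, exc_type, exc, traceback) -> bool:
        try:
            if self.autocommit:
                # Assumed behavior: finish the transaction automatically.
                if exc_type is None:
                    self.commit()
                else:
                    self.rollback()
        finally:
            # The session is always released, even if commit or rollback fails.
            self.dispose()
        return False  # never swallow the caller's exception


with RecordingTransaction(autocommit=True) as ok:
    pass

failed = RecordingTransaction(autocommit=True)
try:
    with failed:
        raise ValueError("simulated failure")
except ValueError:
    pass

with RecordingTransaction(autocommit=False) as manual:
    manual.commit()  # caller decides explicitly
```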

AsyncTransaction(config, session_manager)

Bases: AbstractAsyncTransaction

SQLAlchemy asynchronous transaction implementation.

Initialize with autocommit config and async session manager.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/transaction.py
def __init__(
    self,
    config: SQLAlchemyConnectionConfig,
    session_manager: AsyncSessionManager,
) -> None:
    """Initialize with autocommit config and async session manager."""
    super().__init__(autocommit=config.autocommit)
    self._session_manager = session_manager

initialize() async

Open a new async session for this transaction scope.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/transaction.py
async def initialize(self) -> None:
    """Open a new async session for this transaction scope."""
    await self._session_manager.open()

dispose() async

Close the async session after transaction completes.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/transaction.py
async def dispose(self) -> None:
    """Close the async session after transaction completes."""
    await self._session_manager.close()

commit() async

Commit the current async session.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/transaction.py
async def commit(self) -> None:
    """Commit the current async session."""
    await self._session_manager.session.commit()

rollback() async

Roll back the current async session.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/transaction.py
async def rollback(self) -> None:
    """Roll back the current async session."""
    await self._session_manager.session.rollback()


spakky.plugins.sqlalchemy.persistency.session_manager

SessionNotInitializedError

Bases: AbstractSpakkySqlAlchemyPersistencyError

Raised when trying to access a session that has not been initialized.

SessionManager(connection_manager)

Bases: IApplicationContextAware

Manages scoped synchronous SQLAlchemy sessions.

Initialize with engine from connection manager.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/session_manager.py
def __init__(self, connection_manager: ConnectionManager) -> None:
    """Initialize with engine from connection manager."""
    self._engine = connection_manager.connection
    self._current_session = None

open()

Open a new scoped session.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/session_manager.py
def open(self) -> None:
    """Open a new scoped session."""
    self._current_session = self._scoped_session()

close()

Close the current session and remove the scoped session.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/session_manager.py
def close(self) -> None:
    """Close the current session and remove the scoped session."""
    if self._current_session is not None:
        self._current_session.close()
    self._scoped_session.remove()
    self._current_session = None
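The `_scoped_session` attribute used by open() and close() is not defined in the excerpts above. As an illustration of the general scoped-registry pattern only, the sketch below hands out one object per scope key and discards it on `remove()`. `ScopedRegistry` and `FakeSession` are hypothetical, and keying the scope on the thread id is an assumption, not a statement about how the plugin scopes sessions.

```python
import threading
from typing import Callable, Dict, Generic, Hashable, TypeVar

T = TypeVar("T")


class ScopedRegistry(Generic[T]):
    """Hands out one object per scope key, creating it on first use."""

    def __init__(self, factory: Callable[[], T], scopefunc: Callable[[], Hashable]) -> None:
        self._factory = factory
        self._scopefunc = scopefunc
        self._objects: Dict[Hashable, T] = {}

    def __call__(self) -> T:
        key = self._scopefunc()
        if key not in self._objects:
            self._objects[key] = self._factory()
        return self._objects[key]

    def remove(self) -> None:
        # Forget the object for the current scope; the next call creates a new one.
        self._objects.pop(self._scopefunc(), None)


class FakeSession:
    """Hypothetical stand-in for a database session."""


scoped = ScopedRegistry(FakeSession, scopefunc=threading.get_ident)
first = scoped()
same = scoped()  # same scope, same object
scoped.remove()
fresh = scoped()  # new object after remove()
```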

AsyncSessionManager(connection_manager)

Bases: IApplicationContextAware

Manages scoped asynchronous SQLAlchemy sessions.

Initialize with engine from async connection manager.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/session_manager.py
def __init__(self, connection_manager: AsyncConnectionManager) -> None:
    """Initialize with engine from async connection manager."""
    self._engine = connection_manager.connection
    self._current_session = None

open() async

Open a new scoped async session.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/session_manager.py
async def open(self) -> None:
    """Open a new scoped async session."""
    self._current_session = self._scoped_session()

close() async

Close the current session and remove the scoped async session.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/session_manager.py
async def close(self) -> None:
    """Close the current session and remove the scoped async session."""
    if self._current_session is not None:
        await self._current_session.close()
    await self._scoped_session.remove()
    self._current_session = None


spakky.plugins.sqlalchemy.persistency.connection_manager

Connection managers for SQLAlchemy engines.

This module provides connection managers that handle SQLAlchemy engine lifecycle, including proper disposal on application shutdown.

ConnectionManager(config)

Manages synchronous SQLAlchemy engine lifecycle.

Create a synchronous SQLAlchemy engine from configuration.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/connection_manager.py
def __init__(self, config: SQLAlchemyConnectionConfig) -> None:
    """Create a synchronous SQLAlchemy engine from configuration."""
    self._connection = create_engine(
        url=config.connection_string,
        echo=config.echo,
        echo_pool=config.echo_pool,
        pool_size=config.pool_size,
        max_overflow=config.pool_max_overflow,
        pool_timeout=config.pool_timeout,
        pool_recycle=config.pool_recycle,
        pool_pre_ping=config.pool_pre_ping,
    )
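The constructor forwards each configuration field to the engine factory, renaming pool_max_overflow to max_overflow on the way. The sketch below shows that mapping with a hypothetical `PoolSettings` dataclass that mirrors the documented defaults; it builds only the keyword arguments and does not create an engine.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class PoolSettings:
    """Hypothetical mirror of the connection and pool config fields."""

    connection_string: str
    echo: bool = False
    echo_pool: bool = False
    pool_size: int = 5
    pool_max_overflow: int = 10
    pool_timeout: float = 30.0
    pool_recycle: int = -1
    pool_pre_ping: bool = False


def to_engine_kwargs(settings: PoolSettings) -> Dict[str, Any]:
    """Build keyword arguments in the shape passed to create_engine above."""
    return {
        "url": settings.connection_string,
        "echo": settings.echo,
        "echo_pool": settings.echo_pool,
        "pool_size": settings.pool_size,
        # The config field is pool_max_overflow; the engine argument is max_overflow.
        "max_overflow": settings.pool_max_overflow,
        "pool_timeout": settings.pool_timeout,
        "pool_recycle": settings.pool_recycle,
        "pool_pre_ping": settings.pool_pre_ping,
    }


kwargs = to_engine_kwargs(
    PoolSettings(
        "postgresql+psycopg://user:pass@host/db",
        pool_recycle=1800,  # recycle connections after 30 minutes
        pool_pre_ping=True,
    )
)
```

With the defaults, the pool can hold up to pool_size + pool_max_overflow = 15 simultaneous connections.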

connection property

Get the SQLAlchemy Engine instance.

dispose()

Dispose the engine connection pool.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/connection_manager.py
def dispose(self) -> None:
    """Dispose the engine connection pool."""
    self._connection.dispose()

AsyncConnectionManager(config)

Manages asynchronous SQLAlchemy engine lifecycle.

Note: Does not implement IAsyncService because AsyncEngine.dispose() must be called from the same event loop where connections were created. In test environments with pytest-asyncio, this means dispose must be called from the test's event loop, not ApplicationContext's internal event loop thread.

Create an asynchronous SQLAlchemy engine from configuration.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/connection_manager.py
def __init__(self, config: SQLAlchemyConnectionConfig) -> None:
    """Create an asynchronous SQLAlchemy engine from configuration."""
    self._connection = create_async_engine(
        url=config.connection_string,
        echo=config.echo,
        echo_pool=config.echo_pool,
        pool_size=config.pool_size,
        max_overflow=config.pool_max_overflow,
        pool_timeout=config.pool_timeout,
        pool_recycle=config.pool_recycle,
        pool_pre_ping=config.pool_pre_ping,
    )

connection property

Get the async SQLAlchemy Engine instance.

dispose() async

Dispose the engine connection pool.

Must be called from the same event loop where connections were used.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/persistency/connection_manager.py
async def dispose(self) -> None:
    """Dispose the engine connection pool.

    Must be called from the same event loop where connections were used.
    """
    await self._connection.dispose()
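The same-event-loop requirement can be illustrated with the standard library alone. In this sketch `LoopBoundResource` is a hypothetical stand-in for an async engine: it remembers the loop it was used on and refuses disposal from any other loop. The explicit check is an assumption made for illustration; a real engine may fail in less obvious ways.

```python
import asyncio


class LoopBoundResource:
    """Hypothetical resource that remembers the event loop it was used on."""

    def __init__(self) -> None:
        self._loop = None
        self.disposed = False

    async def use(self) -> None:
        self._loop = asyncio.get_running_loop()

    async def dispose(self) -> None:
        if self._loop is not None and asyncio.get_running_loop() is not self._loop:
            raise RuntimeError("dispose() called from a different event loop")
        self.disposed = True


async def main() -> LoopBoundResource:
    resource = LoopBoundResource()
    try:
        await resource.use()
    finally:
        # Dispose inside the same coroutine, and therefore the same loop, as use().
        await resource.dispose()
    return resource


resource = asyncio.run(main())
```

Calling `asyncio.run(resource.dispose())` afterwards would start a second loop and trigger the error, which is the situation the note above warns about.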


spakky.plugins.sqlalchemy.persistency.error

AbstractSpakkySqlAlchemyPersistencyError

Bases: AbstractSpakkySqlAlchemyError, ABC

Base exception for Spakky SQLAlchemy persistency errors.


Outbox

spakky.plugins.sqlalchemy.outbox.storage

SQLAlchemy implementation of IOutboxStorage / IAsyncOutboxStorage.

SqlAlchemyOutboxStorage(session_manager, connection_manager, config=None)

Bases: IOutboxStorage

Synchronous SQLAlchemy-based Outbox storage implementation.

  • save(): uses the current transactional session, so the message is written in the same transaction as the business data.
  • fetch_pending/mark_published/increment_retry: use independent sessions.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/outbox/storage.py
def __init__(
    self,
    session_manager: SessionManager,
    connection_manager: ConnectionManager,
    config: OutboxConfig | None = None,
) -> None:
    self._session_manager = session_manager
    self._session_factory = sessionmaker(
        bind=connection_manager.connection,
        expire_on_commit=False,
    )
    self._claim_timeout_seconds = (
        config.claim_timeout_seconds if config else _DEFAULT_CLAIM_TIMEOUT_SECONDS
    )
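The split between the transactional session and independent sessions is the core of the transactional outbox pattern. The sketch below demonstrates it with the standard library's sqlite3 instead of SQLAlchemy. The table layout, `place_order`, and `relay_pending` are hypothetical and do not reflect the plugin's schema or API.

```python
import os
import sqlite3
import tempfile

SCHEMA = """
CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT NOT NULL);
CREATE TABLE outbox (
    id INTEGER PRIMARY KEY,
    payload TEXT NOT NULL,
    published INTEGER NOT NULL DEFAULT 0
);
"""


def place_order(conn: sqlite3.Connection, item: str, fail: bool = False) -> None:
    """Write business data and its outbox message in one transaction."""
    with conn:  # commits on success, rolls back on exception
        conn.execute("INSERT INTO orders (item) VALUES (?)", (item,))
        conn.execute("INSERT INTO outbox (payload) VALUES (?)", (f"order placed: {item}",))
        if fail:
            raise RuntimeError("simulated failure before commit")


def relay_pending(path: str) -> list:
    """Fetch and mark pending messages using an independent connection."""
    relay = sqlite3.connect(path)
    try:
        with relay:
            rows = relay.execute(
                "SELECT id, payload FROM outbox WHERE published = 0 ORDER BY id"
            ).fetchall()
            relay.executemany(
                "UPDATE outbox SET published = 1 WHERE id = ?", [(row[0],) for row in rows]
            )
        return [row[1] for row in rows]
    finally:
        relay.close()


with tempfile.TemporaryDirectory() as directory:
    path = os.path.join(directory, "app.db")
    business = sqlite3.connect(path)
    business.executescript(SCHEMA)
    place_order(business, "book")
    try:
        place_order(business, "lamp", fail=True)
    except RuntimeError:
        pass  # the order and its outbox row were rolled back together
    delivered = relay_pending(path)
    remaining = relay_pending(path)
    business.close()
```

Because the failed order's message never commits, the relay sees only the message for the order that actually exists.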

AsyncSqlAlchemyOutboxStorage(session_manager, connection_manager, config=None)

Bases: IAsyncOutboxStorage

Asynchronous SQLAlchemy-based Outbox storage implementation.

  • save(): uses the current transactional session, so the message is written in the same transaction as the business data.
  • fetch_pending/mark_published/increment_retry: use independent sessions.

Source code in plugins/spakky-sqlalchemy/src/spakky/plugins/sqlalchemy/outbox/storage.py
def __init__(
    self,
    session_manager: AsyncSessionManager,
    connection_manager: AsyncConnectionManager,
    config: OutboxConfig | None = None,
) -> None:
    self._session_manager = session_manager
    self._session_factory = async_sessionmaker(
        bind=connection_manager.connection,
        expire_on_commit=False,
    )
    self._claim_timeout_seconds = (
        config.claim_timeout_seconds if config else _DEFAULT_CLAIM_TIMEOUT_SECONDS
    )


spakky.plugins.sqlalchemy.outbox.table

SQLAlchemy table definition for the Outbox pattern.

OutboxMessageTable

Bases: AbstractTable

Outbox message table for transactional outbox pattern.

This is an infrastructure table that doesn't map to a domain model, so it inherits from AbstractTable (not AbstractMappableTable).


Errors

spakky.plugins.sqlalchemy.error

AbstractSpakkySqlAlchemyError

Bases: AbstractSpakkyFrameworkError, ABC

Base exception for Spakky SQLAlchemy errors.
