콘텐츠로 이동

spakky-outbox

spakky-outbox는 Integration Event를 transaction 바깥으로 안전하게 전달하기 위한 Outbox 계약을 제공합니다.

Outbox 패턴 — 이벤트 발행 보장

플러그인 진입점

spakky.outbox.main

Plugin initialization entry point.

initialize(app)

Initialize the Outbox plugin.

Parameters:

Name Type Description Default
app SpakkyApplication

The Spakky application instance.

required
Source code in core/spakky-outbox/src/spakky/outbox/main.py
def initialize(app: SpakkyApplication) -> None:
    """Initialize the Outbox plugin.

    Args:
        app: The Spakky application instance.
    """
    app.add(OutboxConfig)
    app.add(OutboxEventBus)
    app.add(AsyncOutboxEventBus)
    app.add(OutboxRelayBackgroundService)
    app.add(AsyncOutboxRelayBackgroundService)

options: show_root_heading: false

EventBus

spakky.outbox.bus.outbox_event_bus

Outbox Event Bus — sync and async implementations replacing IEventBus/IAsyncEventBus via @Primary.

OutboxEventBus(storage, propagator, auth_snapshot_headers=None)

Bases: IEventBus

Intercepts integration events and stores them in the Outbox table (sync).

Replaces the default DirectEventBus so that events are persisted atomically within the same database transaction as the business data.

Source code in core/spakky-outbox/src/spakky/outbox/bus/outbox_event_bus.py
def __init__(
    self,
    storage: IOutboxStorage,
    propagator: ITracePropagator,
    auth_snapshot_headers: AuthContextSnapshotHeaderInjector | None = None,
) -> None:
    self._storage = storage
    self._propagator = propagator
    self._auth_snapshot_headers = (
        auth_snapshot_headers or AuthContextSnapshotHeaderInjector()
    )

AsyncOutboxEventBus(storage, propagator, auth_snapshot_headers=None)

Bases: IAsyncEventBus

Intercepts integration events and stores them in the Outbox table (async).

Replaces the default AsyncDirectEventBus so that events are persisted atomically within the same database transaction as the business data.

Source code in core/spakky-outbox/src/spakky/outbox/bus/outbox_event_bus.py
def __init__(
    self,
    storage: IAsyncOutboxStorage,
    propagator: ITracePropagator,
    auth_snapshot_headers: AuthContextSnapshotHeaderInjector | None = None,
) -> None:
    self._storage = storage
    self._propagator = propagator
    self._auth_snapshot_headers = (
        auth_snapshot_headers or AuthContextSnapshotHeaderInjector()
    )

options: show_root_heading: false

포트

spakky.outbox.ports.storage

Outbox storage port.

IOutboxStorage

Bases: ABC

Synchronous outbox message storage abstraction.

save(message) abstractmethod

Save message within the current transaction.

Source code in core/spakky-outbox/src/spakky/outbox/ports/storage.py
@abstractmethod
def save(self, message: OutboxMessage) -> None:
    """Save message within the current transaction."""

fetch_pending(limit, max_retry) abstractmethod

Fetch unpublished messages (with lock).

Source code in core/spakky-outbox/src/spakky/outbox/ports/storage.py
@abstractmethod
def fetch_pending(self, limit: int, max_retry: int) -> list[OutboxMessage]:
    """Fetch unpublished messages (with lock)."""

mark_published(message_id) abstractmethod

Mark a message as published.

Source code in core/spakky-outbox/src/spakky/outbox/ports/storage.py
@abstractmethod
def mark_published(self, message_id: UUID) -> None:
    """Mark a message as published."""

increment_retry(message_id) abstractmethod

Increment the retry count of a message.

Source code in core/spakky-outbox/src/spakky/outbox/ports/storage.py
@abstractmethod
def increment_retry(self, message_id: UUID) -> None:
    """Increment the retry count of a message."""

IAsyncOutboxStorage

Bases: ABC

Asynchronous outbox message storage abstraction.

save(message) abstractmethod async

Save message within the current transaction.

Source code in core/spakky-outbox/src/spakky/outbox/ports/storage.py
@abstractmethod
async def save(self, message: OutboxMessage) -> None:
    """Save message within the current transaction."""

fetch_pending(limit, max_retry) abstractmethod async

Fetch unpublished messages (with lock).

Source code in core/spakky-outbox/src/spakky/outbox/ports/storage.py
@abstractmethod
async def fetch_pending(self, limit: int, max_retry: int) -> list[OutboxMessage]:
    """Fetch unpublished messages (with lock)."""

mark_published(message_id) abstractmethod async

Mark a message as published.

Source code in core/spakky-outbox/src/spakky/outbox/ports/storage.py
@abstractmethod
async def mark_published(self, message_id: UUID) -> None:
    """Mark a message as published."""

increment_retry(message_id) abstractmethod async

Increment the retry count of a message.

Source code in core/spakky-outbox/src/spakky/outbox/ports/storage.py
@abstractmethod
async def increment_retry(self, message_id: UUID) -> None:
    """Increment the retry count of a message."""

options: show_root_heading: false

Relay

spakky.outbox.relay

Outbox relay module.

options: show_root_heading: false

spakky.outbox.relay.relay

Outbox Relay Background Services (sync and async).

OutboxRelayBackgroundService(storage, transport, config)

Bases: AbstractBackgroundService

Polls the Outbox storage and relays pending messages to the transport (sync).

Initialize with storage, transport, and config dependencies.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
def __init__(
    self,
    storage: IOutboxStorage,
    transport: IEventTransport,
    config: OutboxConfig,
) -> None:
    """Initialize with storage, transport, and config dependencies."""
    self._storage = storage
    self._transport = transport
    self._config = config

initialize()

No-op initialization for the relay service.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
def initialize(self) -> None:
    """No-op initialization for the relay service."""
    return

dispose()

No-op disposal for the relay service.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
def dispose(self) -> None:
    """No-op disposal for the relay service."""
    return

run()

Poll the outbox storage and relay pending messages until stopped.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
def run(self) -> None:
    """Poll the outbox storage and relay pending messages until stopped."""
    while not self._stop_event.is_set():
        self._relay_batch()
        self._stop_event.wait(timeout=self._config.polling_interval_seconds)

AsyncOutboxRelayBackgroundService(storage, transport, config)

Bases: AbstractAsyncBackgroundService

Polls the Outbox storage and relays pending messages to the transport (async).

Initialize with async storage, transport, and config dependencies.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
def __init__(
    self,
    storage: IAsyncOutboxStorage,
    transport: IAsyncEventTransport,
    config: OutboxConfig,
) -> None:
    """Initialize with async storage, transport, and config dependencies."""
    self._storage = storage
    self._transport = transport
    self._config = config

initialize_async() async

No-op async initialization for the relay service.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
async def initialize_async(self) -> None:
    """No-op async initialization for the relay service."""
    return

dispose_async() async

No-op async disposal for the relay service.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
async def dispose_async(self) -> None:
    """No-op async disposal for the relay service."""
    return

run_async() async

Poll the outbox storage and relay pending messages asynchronously.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
async def run_async(self) -> None:
    """Poll the outbox storage and relay pending messages asynchronously."""
    while not self._stop_event.is_set():
        await self._relay_batch()
        try:
            await wait_for(
                self._stop_event.wait(),
                timeout=self._config.polling_interval_seconds,
            )
            break
        except TimeoutError:
            continue

options: show_root_heading: false

공통

spakky.outbox.common.config

Outbox configuration.

OutboxConfig()

Bases: BaseSettings

Outbox plugin configuration loaded from environment variables.

Load outbox configuration from environment variables.

Source code in core/spakky-outbox/src/spakky/outbox/common/config.py
def __init__(self) -> None:
    """Load outbox configuration from environment variables."""
    super().__init__()

options: show_root_heading: false

spakky.outbox.common.message

Outbox message model.

OutboxMessage(id, event_name, payload, headers, created_at, published_at=None, retry_count=0, claimed_at=None) dataclass

Persistence-agnostic Outbox message model.

options: show_root_heading: false

에러

spakky.outbox.error

Outbox error classes.

AbstractSpakkyOutboxError

Bases: AbstractSpakkyFrameworkError, ABC

Base exception for Spakky Outbox errors.

options: show_root_heading: false

추가 모듈

Outbox Relay Background Services (sync and async).

OutboxRelayBackgroundService(storage, transport, config)

Bases: AbstractBackgroundService

Polls the Outbox storage and relays pending messages to the transport (sync).

Initialize with storage, transport, and config dependencies.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
def __init__(
    self,
    storage: IOutboxStorage,
    transport: IEventTransport,
    config: OutboxConfig,
) -> None:
    """Initialize with storage, transport, and config dependencies."""
    self._storage = storage
    self._transport = transport
    self._config = config

initialize()

No-op initialization for the relay service.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
def initialize(self) -> None:
    """No-op initialization for the relay service."""
    return

dispose()

No-op disposal for the relay service.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
def dispose(self) -> None:
    """No-op disposal for the relay service."""
    return

run()

Poll the outbox storage and relay pending messages until stopped.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
def run(self) -> None:
    """Poll the outbox storage and relay pending messages until stopped."""
    while not self._stop_event.is_set():
        self._relay_batch()
        self._stop_event.wait(timeout=self._config.polling_interval_seconds)

AsyncOutboxRelayBackgroundService(storage, transport, config)

Bases: AbstractAsyncBackgroundService

Polls the Outbox storage and relays pending messages to the transport (async).

Initialize with async storage, transport, and config dependencies.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
def __init__(
    self,
    storage: IAsyncOutboxStorage,
    transport: IAsyncEventTransport,
    config: OutboxConfig,
) -> None:
    """Initialize with async storage, transport, and config dependencies."""
    self._storage = storage
    self._transport = transport
    self._config = config

initialize_async() async

No-op async initialization for the relay service.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
async def initialize_async(self) -> None:
    """No-op async initialization for the relay service."""
    return

dispose_async() async

No-op async disposal for the relay service.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
async def dispose_async(self) -> None:
    """No-op async disposal for the relay service."""
    return

run_async() async

Poll the outbox storage and relay pending messages asynchronously.

Source code in core/spakky-outbox/src/spakky/outbox/relay/relay.py
@override
async def run_async(self) -> None:
    """Poll the outbox storage and relay pending messages asynchronously."""
    while not self._stop_event.is_set():
        await self._relay_batch()
        try:
            await wait_for(
                self._stop_event.wait(),
                timeout=self._config.polling_interval_seconds,
            )
            break
        except TimeoutError:
            continue

Plugin initialization entry point.

initialize(app)

Initialize the Outbox plugin.

Parameters:

Name Type Description Default
app SpakkyApplication

The Spakky application instance.

required
Source code in core/spakky-outbox/src/spakky/outbox/main.py
def initialize(app: SpakkyApplication) -> None:
    """Initialize the Outbox plugin.

    Args:
        app: The Spakky application instance.
    """
    app.add(OutboxConfig)
    app.add(OutboxEventBus)
    app.add(AsyncOutboxEventBus)
    app.add(OutboxRelayBackgroundService)
    app.add(AsyncOutboxRelayBackgroundService)