
spakky-event

In-process event system: EventBus, EventTransport, EventMediator

Publisher Interfaces

spakky.event.event_publisher

Event publishing and transport interfaces.

Provides publisher, bus, and transport abstractions for event routing:

- IEventPublisher: Routes events by type (domain vs integration).
- IEventBus: Serializes and sends integration events via transport.
- IEventTransport: Low-level transport for serialized event payloads.

IEventPublisher

Bases: ABC

Publishes events by routing to dispatcher or bus based on event type.

publish(event) abstractmethod

Publish an event (domain → dispatcher, integration → bus).

Source code in core/spakky-event/src/spakky/event/event_publisher.py
@abstractmethod
def publish(self, event: AbstractEvent) -> None:
    """Publish an event (domain → dispatcher, integration → bus)."""
    ...

IAsyncEventPublisher

Bases: ABC

Async counterpart of IEventPublisher.

publish(event) abstractmethod async

Publish an event asynchronously.

Source code in core/spakky-event/src/spakky/event/event_publisher.py
@abstractmethod
async def publish(self, event: AbstractEvent) -> None:
    """Publish an event asynchronously."""
    ...

IEventBus

Bases: ABC

Synchronous event bus for sending integration events.

send(event) abstractmethod

Serialize and send an integration event via transport.

Source code in core/spakky-event/src/spakky/event/event_publisher.py
@abstractmethod
def send(self, event: AbstractIntegrationEvent) -> None:
    """Serialize and send an integration event via transport."""
    ...

IAsyncEventBus

Bases: ABC

Asynchronous event bus for sending integration events.

send(event) abstractmethod async

Serialize and send an integration event via transport.

Source code in core/spakky-event/src/spakky/event/event_publisher.py
@abstractmethod
async def send(self, event: AbstractIntegrationEvent) -> None:
    """Serialize and send an integration event via transport."""
    ...

IEventTransport

Bases: ABC

Low-level synchronous transport for pre-serialized event payloads.

send(event_name, payload) abstractmethod

Send a serialized event payload to the message broker.

Source code in core/spakky-event/src/spakky/event/event_publisher.py
@abstractmethod
def send(self, event_name: str, payload: bytes) -> None:
    """Send a serialized event payload to the message broker."""
    ...
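
A minimal sketch of a concrete transport, assuming a local stand-in for `IEventTransport` with the signature shown above. `InMemoryEventTransport` and its `sent` list are hypothetical names used only for illustration; a real transport would presumably hand the payload to a message broker client instead.

```python
from abc import ABC, abstractmethod


class IEventTransport(ABC):
    """Local stand-in mirroring the interface shown above (not the real import)."""

    @abstractmethod
    def send(self, event_name: str, payload: bytes) -> None: ...


class InMemoryEventTransport(IEventTransport):
    """Hypothetical transport that records payloads instead of contacting a broker."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, bytes]] = []

    def send(self, event_name: str, payload: bytes) -> None:
        # The payload arrives already serialized; the transport only forwards it.
        self.sent.append((event_name, payload))


transport = InMemoryEventTransport()
transport.send("UserCreated", b'{"user_id": 1}')
```

A recording transport like this can be handy in tests, since it lets a bus be exercised without a running broker.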

IAsyncEventTransport

Bases: ABC

Low-level asynchronous transport for pre-serialized event payloads.

send(event_name, payload) abstractmethod async

Send a serialized event payload to the message broker.

Source code in core/spakky-event/src/spakky/event/event_publisher.py
@abstractmethod
async def send(self, event_name: str, payload: bytes) -> None:
    """Send a serialized event payload to the message broker."""
    ...


Consumer Interfaces

spakky.event.event_consumer

Event consumer interfaces for registering event handlers.

EventHandlerCallback = Callable[[EventT_contra], None] module-attribute

Synchronous event handler callback type.

AsyncEventHandlerCallback = Callable[[EventT_contra], Awaitable[None]] module-attribute

Asynchronous event handler callback type.

IEventConsumer

Bases: ABC

Synchronous event consumer interface for registering event handlers.

register(event, handler) abstractmethod

Register a handler callback for the given event type.

Source code in core/spakky-event/src/spakky/event/event_consumer.py
@abstractmethod
def register(
    self,
    event: type[EventT_contra],
    handler: EventHandlerCallback[EventT_contra],
) -> None:
    """Register a handler callback for the given event type."""
    ...

IAsyncEventConsumer

Bases: ABC

Asynchronous event consumer interface for registering event handlers.

register(event, handler) abstractmethod

Register an async handler callback for the given event type.

Source code in core/spakky-event/src/spakky/event/event_consumer.py
@abstractmethod
def register(
    self,
    event: type[EventT_contra],
    handler: AsyncEventHandlerCallback[EventT_contra],
) -> None:
    """Register an async handler callback for the given event type."""
    ...


Dispatcher Interfaces

spakky.event.event_dispatcher

Event dispatcher interfaces for dispatching events to registered handlers.

This module provides unified dispatcher interfaces that handle all event types. Dispatchers are responsible for delivering events to registered handlers, while Consumers are responsible for handler registration. These interfaces are combined in Mediator implementations.

IEventDispatcher

Bases: ABC

Synchronous event dispatcher interface.

dispatch(event) abstractmethod

Dispatch an event to all registered handlers.

Source code in core/spakky-event/src/spakky/event/event_dispatcher.py
@abstractmethod
def dispatch(self, event: AbstractEvent) -> None:
    """Dispatch an event to all registered handlers."""
    ...

IAsyncEventDispatcher

Bases: ABC

Asynchronous event dispatcher interface.

dispatch(event) abstractmethod async

Dispatch an event to all registered async handlers.

Source code in core/spakky-event/src/spakky/event/event_dispatcher.py
@abstractmethod
async def dispatch(self, event: AbstractEvent) -> None:
    """Dispatch an event to all registered async handlers."""
    ...


Event Bus

spakky.event.bus.transport_event_bus

Default EventBus implementations that delegate to EventTransport.

DirectEventBus(transport)

Bases: IEventBus

Synchronous event bus that serializes and delegates to IEventTransport.

Initialize with the given transport.

Source code in core/spakky-event/src/spakky/event/bus/transport_event_bus.py
def __init__(self, transport: IEventTransport) -> None:
    """Initialize with the given transport."""
    self._transport = transport
    self._adapters = {}

send(event)

Serialize and send an integration event via transport.

Source code in core/spakky-event/src/spakky/event/bus/transport_event_bus.py
def send(self, event: AbstractIntegrationEvent) -> None:
    """Serialize and send an integration event via transport."""
    event_type = type(event)
    if event_type not in self._adapters:
        self._adapters[event_type] = TypeAdapter(event_type)
    adapter = self._adapters[event_type]
    self._transport.send(event.event_name, adapter.dump_json(event))
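
The serialize-then-delegate flow, with one cached serializer per event type, can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's implementation: `json` and `dataclasses` stand in for the `TypeAdapter` used above, and `OrderPlaced`, `RecordingTransport`, and `SketchEventBus` are hypothetical names. The `event_name` property returning the class name is also an assumption.

```python
import json
from dataclasses import asdict, dataclass
from typing import Callable


@dataclass(frozen=True)
class OrderPlaced:
    """Hypothetical integration event; event_name is assumed to be the class name."""

    order_id: int

    @property
    def event_name(self) -> str:
        return type(self).__name__


class RecordingTransport:
    """Hypothetical transport that keeps whatever it is asked to send."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, bytes]] = []

    def send(self, event_name: str, payload: bytes) -> None:
        self.sent.append((event_name, payload))


class SketchEventBus:
    """Serializes an event, then delegates the bytes to the transport."""

    def __init__(self, transport: RecordingTransport) -> None:
        self._transport = transport
        self._serializers: dict[type, Callable[[object], bytes]] = {}

    def send(self, event: OrderPlaced) -> None:
        event_type = type(event)
        if event_type not in self._serializers:
            # Built once per event type, analogous to the adapter cache above.
            self._serializers[event_type] = lambda e: json.dumps(asdict(e)).encode()
        serialize = self._serializers[event_type]
        self._transport.send(event.event_name, serialize(event))


transport = RecordingTransport()
bus = SketchEventBus(transport)
bus.send(OrderPlaced(order_id=7))
bus.send(OrderPlaced(order_id=8))
```

Both events share one cached serializer, so the per-type setup cost is paid only on the first send.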

AsyncDirectEventBus(transport)

Bases: IAsyncEventBus

Asynchronous event bus that serializes and delegates to IAsyncEventTransport.

Initialize with the given async transport.

Source code in core/spakky-event/src/spakky/event/bus/transport_event_bus.py
def __init__(self, transport: IAsyncEventTransport) -> None:
    """Initialize with the given async transport."""
    self._transport = transport
    self._adapters = {}

send(event) async

Serialize and send an integration event via async transport.

Source code in core/spakky-event/src/spakky/event/bus/transport_event_bus.py
async def send(self, event: AbstractIntegrationEvent) -> None:
    """Serialize and send an integration event via async transport."""
    event_type = type(event)
    if event_type not in self._adapters:
        self._adapters[event_type] = TypeAdapter(event_type)
    adapter = self._adapters[event_type]
    await self._transport.send(event.event_name, adapter.dump_json(event))


Mediator

spakky.event.mediator.domain_event_mediator

Event mediator implementations.

This module provides in-process mediator implementations that combine Consumer and Dispatcher interfaces. Mediators manage handler registration and event dispatching within the same bounded context.

EventMediator()

Bases: IEventConsumer, IEventDispatcher

In-process synchronous event mediator combining consumer and dispatcher roles.

Initialize an empty handler registry.

Source code in core/spakky-event/src/spakky/event/mediator/domain_event_mediator.py
def __init__(self) -> None:
    """Initialize an empty handler registry."""
    self._handlers = {}

register(event, handler)

Register a handler callback for the given event type.

Source code in core/spakky-event/src/spakky/event/mediator/domain_event_mediator.py
def register(
    self,
    event: type[AbstractEvent],
    handler: EventHandlerCallback[Any],
) -> None:
    """Register a handler callback for the given event type."""
    if event not in self._handlers:
        self._handlers[event] = []
    self._handlers[event].append(handler)
    logger.debug(f"Registered handler for {event.__name__}")

dispatch(event)

Dispatch an event to all registered handlers.

Source code in core/spakky-event/src/spakky/event/mediator/domain_event_mediator.py
def dispatch(self, event: AbstractEvent) -> None:
    """Dispatch an event to all registered handlers."""
    event_type = type(event)
    handlers = self._handlers.get(event_type, [])

    if not handlers:
        logger.debug(f"No handlers registered for {event_type.__name__}")
        return

    for handler in handlers:
        try:
            handler(event)
        except Exception as e:
            logger.error(
                f"Handler {handler} failed for {event_type.__name__}: {e}",
                exc_info=True,
            )
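
Two behaviors follow from the `dispatch` code above: a failing handler is logged and does not stop later handlers, and handlers are looked up by the exact type of the event, so handlers registered for a base class are not invoked for a subclass instance. The sketch below reproduces both with a simplified stand-in; `SketchMediator`, `UserCreated`, and `AdminCreated` are hypothetical names.

```python
import logging

logger = logging.getLogger(__name__)


class UserCreated:
    """Hypothetical event type used only for this sketch."""


class AdminCreated(UserCreated):
    """Subclass used to show that lookup is by exact type."""


class SketchMediator:
    """Simplified stand-in that follows the register/dispatch logic shown above."""

    def __init__(self) -> None:
        self._handlers = {}

    def register(self, event, handler) -> None:
        self._handlers.setdefault(event, []).append(handler)

    def dispatch(self, event) -> None:
        # Exact-type lookup: type(event) is the dictionary key.
        for handler in self._handlers.get(type(event), []):
            try:
                handler(event)
            except Exception:
                # Log and continue so one failure does not block other handlers.
                logger.error("Handler %r failed", handler, exc_info=True)


calls = []


def failing(event) -> None:
    calls.append("failing")
    raise RuntimeError("boom")


def healthy(event) -> None:
    calls.append("healthy")


mediator = SketchMediator()
mediator.register(UserCreated, failing)
mediator.register(UserCreated, healthy)

mediator.dispatch(UserCreated())
calls_after_base = list(calls)

# No handler is registered for AdminCreated itself, so nothing runs here.
mediator.dispatch(AdminCreated())
```

Because exceptions are swallowed after logging, callers of `dispatch` cannot detect handler failures from the return value or from a raised error.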

AsyncEventMediator()

Bases: IAsyncEventConsumer, IAsyncEventDispatcher

In-process asynchronous event mediator combining consumer and dispatcher roles.

Initialize an empty async handler registry.

Source code in core/spakky-event/src/spakky/event/mediator/domain_event_mediator.py
def __init__(self) -> None:
    """Initialize an empty async handler registry."""
    self._handlers = {}

register(event, handler)

Register an async handler callback for the given event type.

Source code in core/spakky-event/src/spakky/event/mediator/domain_event_mediator.py
def register(
    self,
    event: type[AbstractEvent],
    handler: AsyncEventHandlerCallback[Any],
) -> None:
    """Register an async handler callback for the given event type."""
    if event not in self._handlers:
        self._handlers[event] = []
    self._handlers[event].append(handler)
    logger.debug(f"Registered async handler for {event.__name__}")

dispatch(event) async

Dispatch an event to all registered async handlers.

Source code in core/spakky-event/src/spakky/event/mediator/domain_event_mediator.py
async def dispatch(self, event: AbstractEvent) -> None:
    """Dispatch an event to all registered async handlers."""
    event_type = type(event)
    handlers = self._handlers.get(event_type, [])

    if not handlers:
        logger.debug(f"No handlers registered for {event_type.__name__}")
        return

    for handler in handlers:
        try:
            await handler(event)
        except Exception as e:
            logger.error(
                f"Handler {handler} failed for {event_type.__name__}: {e}",
                exc_info=True,
            )
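
The loop above awaits each handler before starting the next, so handlers run sequentially in registration order rather than concurrently. A minimal sketch of that ordering follows, using hypothetical handlers `slow` and `fast` and a free function in place of the mediator method.

```python
import asyncio


class OrderShipped:
    """Hypothetical event type used only for this sketch."""


order = []


async def slow(event) -> None:
    await asyncio.sleep(0.01)
    order.append("slow")


async def fast(event) -> None:
    order.append("fast")


async def dispatch(handlers, event) -> None:
    # Mirrors the loop above: each handler finishes before the next one starts.
    for handler in handlers:
        try:
            await handler(event)
        except Exception:
            pass  # the mediator shown above logs the error at this point


asyncio.run(dispatch([slow, fast], OrderShipped()))
```

Even though `fast` has no delay, it completes after `slow` because it is not started until `slow` has finished.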


Publisher

spakky.event.publisher.domain_event_publisher

Event publisher implementations.

This module provides event publishers that route events by type:

- AbstractDomainEvent → EventMediator (in-process dispatch)
- AbstractIntegrationEvent → IEventBus (external transport)

EventPublisher(dispatcher, bus)

Bases: IEventPublisher

Routes events by type: domain events to dispatcher, integration events to bus.

Initialize with dispatcher and bus dependencies.

Source code in core/spakky-event/src/spakky/event/publisher/domain_event_publisher.py
def __init__(
    self,
    dispatcher: IEventDispatcher,
    bus: IEventBus,
) -> None:
    """Initialize with dispatcher and bus dependencies."""
    self._dispatcher = dispatcher
    self._bus = bus

publish(event)

Route an event to the appropriate handler based on its type.

Source code in core/spakky-event/src/spakky/event/publisher/domain_event_publisher.py
def publish(self, event: AbstractEvent) -> None:
    """Route an event to the appropriate handler based on its type."""
    match event:
        case AbstractDomainEvent():
            self._dispatcher.dispatch(event)
        case AbstractIntegrationEvent():
            self._bus.send(event)
        case _:  # pragma: no cover - defensive branch (unreachable in normal flow)
            raise AssertionError(f"Unknown event type: {type(event)!r}")

AsyncEventPublisher(dispatcher, bus)

Bases: IAsyncEventPublisher

Async counterpart that routes events by type.

Initialize with async dispatcher and bus dependencies.

Source code in core/spakky-event/src/spakky/event/publisher/domain_event_publisher.py
def __init__(
    self,
    dispatcher: IAsyncEventDispatcher,
    bus: IAsyncEventBus,
) -> None:
    """Initialize with async dispatcher and bus dependencies."""
    self._dispatcher = dispatcher
    self._bus = bus

publish(event) async

Route an event to the appropriate async handler based on its type.

Source code in core/spakky-event/src/spakky/event/publisher/domain_event_publisher.py
async def publish(self, event: AbstractEvent) -> None:
    """Route an event to the appropriate async handler based on its type."""
    match event:
        case AbstractDomainEvent():
            await self._dispatcher.dispatch(event)
        case AbstractIntegrationEvent():
            await self._bus.send(event)
        case _:  # pragma: no cover - defensive branch (unreachable in normal flow)
            raise AssertionError(f"Unknown event type: {type(event)!r}")


Aspects

spakky.event.aspects.transactional_event_publishing

Transactional event publishing aspect for automatic domain event publishing.

AsyncTransactionalEventPublishingAspect(collector, publisher)

Bases: IAsyncAspect

Source code in core/spakky-event/src/spakky/event/aspects/transactional_event_publishing.py
def __init__(
    self,
    collector: AggregateCollector,
    publisher: IAsyncEventPublisher,
) -> None:
    self._collector = collector
    self._publisher = publisher

after_returning_async(result) async

Publish domain events from collected aggregates after successful commit.

Source code in core/spakky-event/src/spakky/event/aspects/transactional_event_publishing.py
@AfterReturning(lambda x: Transactional.exists(x) and iscoroutinefunction(x))
async def after_returning_async(self, result: Any) -> None:
    """Publish domain events from collected aggregates after successful commit."""
    for aggregate in self._collector.all():
        for event in aggregate.events:
            await self._publisher.publish(event)
        aggregate.clear_events()

after_async() async

Clear the aggregate collector after transaction completion.

Source code in core/spakky-event/src/spakky/event/aspects/transactional_event_publishing.py
@After(lambda x: Transactional.exists(x) and iscoroutinefunction(x))
async def after_async(self) -> None:
    """Clear the aggregate collector after transaction completion."""
    self._collector.clear()

TransactionalEventPublishingAspect(collector, publisher)

Bases: IAspect

Source code in core/spakky-event/src/spakky/event/aspects/transactional_event_publishing.py
def __init__(
    self,
    collector: AggregateCollector,
    publisher: IEventPublisher,
) -> None:
    self._collector = collector
    self._publisher = publisher

after_returning(result)

Publish domain events from collected aggregates after successful commit.

Source code in core/spakky-event/src/spakky/event/aspects/transactional_event_publishing.py
@AfterReturning(lambda x: Transactional.exists(x) and not iscoroutinefunction(x))
def after_returning(self, result: Any) -> None:
    """Publish domain events from collected aggregates after successful commit."""
    for aggregate in self._collector.all():
        for event in aggregate.events:
            self._publisher.publish(event)
        aggregate.clear_events()

after()

Clear the aggregate collector after transaction completion.

Source code in core/spakky-event/src/spakky/event/aspects/transactional_event_publishing.py
@After(lambda x: Transactional.exists(x) and not iscoroutinefunction(x))
def after(self) -> None:
    """Clear the aggregate collector after transaction completion."""
    self._collector.clear()
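
The two advices can be read as a try/finally around the transactional method: events are published only when the method returns normally, and the collector is cleared either way. The sketch below models that reading with plain Python. It assumes that `@After` advice runs whether or not the method raised, which is common in AOP frameworks but is not confirmed by this page. `Aggregate`, `Collector`, and `run_transactional` are hypothetical stand-ins.

```python
class Aggregate:
    """Hypothetical aggregate holding pending domain events."""

    def __init__(self, events) -> None:
        self.events = list(events)

    def clear_events(self) -> None:
        self.events.clear()


class Collector:
    """Hypothetical stand-in for AggregateCollector."""

    def __init__(self) -> None:
        self._items = []

    def collect(self, aggregate) -> None:
        self._items.append(aggregate)

    def all(self):
        return list(self._items)

    def clear(self) -> None:
        self._items.clear()


published = []
collector = Collector()


def run_transactional(work) -> None:
    try:
        work()
        # Corresponds to after_returning: runs only on normal return.
        for aggregate in collector.all():
            for event in aggregate.events:
                published.append(event)
            aggregate.clear_events()
    finally:
        # Corresponds to after: assumed to run on success and on failure.
        collector.clear()


def succeed() -> None:
    collector.collect(Aggregate(["UserCreated"]))


def fail() -> None:
    collector.collect(Aggregate(["NeverPublished"]))
    raise ValueError("rolled back")


run_transactional(succeed)
try:
    run_transactional(fail)
except ValueError:
    pass
```

Under this reading, events raised inside a failed transaction are dropped rather than published.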


Stereotype

spakky.event.stereotype.event_handler

EventHandler stereotype and event routing decorators.

This module provides @EventHandler stereotype and @on_event decorator for organizing event-driven architectures.

EventT_contra = TypeVar('EventT_contra', bound=AbstractEvent, contravariant=True) module-attribute

Type variable for domain event types (contravariant for handler parameters).

EventHandlerMethod = Callable[[Any, EventT_contra], None | Awaitable[None]] module-attribute

Type alias for event handler callback functions.

EventRoute(event_type) dataclass

Bases: FunctionAnnotation, Generic[EventT_contra]

Annotation for marking methods as event handlers.

Associates a method with a specific domain event type.

event_type instance-attribute

The domain event type this handler processes.

__call__(obj)

Apply event route annotation to method.

Parameters:

Name | Type | Description | Default
obj | EventHandlerMethod[EventT_contra] | The method to annotate. | required

Returns:

Type | Description
EventHandlerMethod[EventT_contra] | The annotated method.

Source code in core/spakky-event/src/spakky/event/stereotype/event_handler.py
def __call__(
    self, obj: EventHandlerMethod[EventT_contra]
) -> EventHandlerMethod[EventT_contra]:
    """Apply event route annotation to method.

    Args:
        obj: The method to annotate.

    Returns:
        The annotated method.
    """
    return super().__call__(obj)

EventHandler(*, name='', scope=Scope.SINGLETON) dataclass

Bases: Pod

Stereotype for event handler classes.

EventHandlers contain methods decorated with @on_event that process domain events, either synchronously or asynchronously.

on_event(event_type)

Decorator for marking methods as event handlers.

Parameters:

Name | Type | Description | Default
event_type | type[EventT_contra] | The domain event type to handle. | required

Returns:

Type | Description
Callable[[EventHandlerMethod[EventT_contra]], EventHandlerMethod[EventT_contra]] | Decorator function that applies EventRoute annotation.

Example

    @EventHandler()
    class UserEventHandler:
        @on_event(UserCreatedEvent)
        async def handle_user_created(self, event: UserCreatedEvent) -> None:
            # Handle event
            pass

Source code in core/spakky-event/src/spakky/event/stereotype/event_handler.py
def on_event(
    event_type: type[EventT_contra],
) -> Callable[
    [EventHandlerMethod[EventT_contra]],
    EventHandlerMethod[EventT_contra],
]:
    """Decorator for marking methods as event handlers.

    Args:
        event_type: The domain event type to handle.

    Returns:
        Decorator function that applies EventRoute annotation.

    Example:
        @EventHandler()
        class UserEventHandler:
            @on_event(UserCreatedEvent)
            async def handle_user_created(self, event: UserCreatedEvent) -> None:
                # Handle event
                pass
    """

    def wrapper(
        method: EventHandlerMethod[EventT_contra],
    ) -> EventHandlerMethod[EventT_contra]:
        return EventRoute(event_type)(method)

    return wrapper
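
The decorator-factory shape above (a function that takes an event type and returns a decorator that tags the method) can be sketched without the framework. How `FunctionAnnotation` actually stores its metadata is not shown on this page, so the `__sketch_route__` attribute, `SketchRoute`, and `sketch_on_event` below are hypothetical and stand in for the real mechanism.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchRoute:
    """Hypothetical route annotation carrying the event type."""

    event_type: type

    def __call__(self, fn):
        # Assumption: metadata is attached as a plain function attribute.
        fn.__sketch_route__ = self
        return fn


def sketch_on_event(event_type):
    """Return a decorator that tags a method with a SketchRoute."""

    def wrapper(method):
        return SketchRoute(event_type)(method)

    return wrapper


class UserCreatedEvent:
    """Hypothetical event type used only for this sketch."""


class UserEventHandler:
    @sketch_on_event(UserCreatedEvent)
    def handle_user_created(self, event) -> None:
        pass


route = UserEventHandler.handle_user_created.__sketch_route__
```

The decorated method is returned unchanged apart from the attached metadata, so it can still be called directly.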


Post Processor

spakky.event.post_processor

Event handler registration post-processor.

EventHandlerRegistrationPostProcessor

Bases: IPostProcessor, IContainerAware

Scans @EventHandler Pods and registers their @on_event methods with consumers.

set_container(container)

Receive the container reference via IContainerAware.

Source code in core/spakky-event/src/spakky/event/post_processor.py
def set_container(self, container: IContainer) -> None:
    """Receive the container reference via IContainerAware."""
    self.__container = container

post_process(pod)

Register event handler methods with the appropriate consumer.

Source code in core/spakky-event/src/spakky/event/post_processor.py
def post_process(self, pod: object) -> object:
    """Register event handler methods with the appropriate consumer."""
    pod_type = type(pod)

    if not EventHandler.exists(pod_type):
        return pod

    sync_consumer = self.__container.get(IEventConsumer)
    async_consumer = self.__container.get(IAsyncEventConsumer)

    for name, method in getmembers(pod, predicate=ismethod):
        route = EventRoute[AbstractEvent].get_or_none(method)
        if route is None:
            continue
        if not issubclass(route.event_type, AbstractDomainEvent):
            continue

        event_type = route.event_type

        if iscoroutinefunction(method):
            async_consumer.register(event_type, method)
            logger.debug(
                f"Registered async handler {pod_type.__name__}.{name} "
                f"for {event_type.__name__}"
            )
        else:
            sync_consumer.register(event_type, method)
            logger.debug(
                f"Registered sync handler {pod_type.__name__}.{name} "
                f"for {event_type.__name__}"
            )

    return pod
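
The scanning step relies on `inspect.getmembers` with the `ismethod` predicate and on `iscoroutinefunction` to split handlers between the sync and async consumers. The sketch below shows that split with a hypothetical `route` decorator and plain lists in place of the consumers. The `__sketch_event_type__` attribute is an assumption made for illustration, not the framework's annotation storage.

```python
from inspect import getmembers, iscoroutinefunction, ismethod


class Ping:
    """Hypothetical event type used only for this sketch."""


def route(event_type):
    """Hypothetical decorator that tags a method with its event type."""

    def wrapper(fn):
        fn.__sketch_event_type__ = event_type
        return fn

    return wrapper


class Handlers:
    @route(Ping)
    def on_ping_sync(self, event) -> None:
        pass

    @route(Ping)
    async def on_ping_async(self, event) -> None:
        pass

    def helper(self) -> None:
        """Untagged method; the scan skips it."""


sync_registered = []
async_registered = []

pod = Handlers()
for name, method in getmembers(pod, predicate=ismethod):
    # Attribute lookup on a bound method falls through to the underlying function.
    event_type = getattr(method, "__sketch_event_type__", None)
    if event_type is None:
        continue
    if iscoroutinefunction(method):
        async_registered.append((event_type, name))
    else:
        sync_registered.append((event_type, name))
```

Because bound methods are registered, each handler keeps its reference to the pod instance and can be invoked later with only the event as an argument.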


Errors

spakky.event.error

AbstractSpakkyEventError

Bases: AbstractSpakkyFrameworkError, ABC

Base error for event system operations.

InvalidMessageError

Bases: AbstractSpakkyEventError

Raised when a message received is invalid or malformed.
