콘텐츠로 이동

spakky-rabbitmq

RabbitMQ 통합 — 이벤트 전송/수신

Main

spakky.plugins.rabbitmq.main

Plugin initialization for RabbitMQ integration.

Registers event consumers, transports, and post-processors for automatic event handler registration in RabbitMQ-enabled applications.

initialize(app)

Initialize the RabbitMQ plugin.

Registers event consumers, transports, and the post-processor for automatic event handler registration. This function is called automatically by the Spakky framework during plugin loading.

Parameters:

Name Type Description Default
app SpakkyApplication

The Spakky application instance.

required
Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/main.py
def initialize(app: SpakkyApplication) -> None:
    """Initialize the RabbitMQ plugin.

    Registers event consumers, transports, and the post-processor for automatic
    event handler registration. This function is called automatically by the
    Spakky framework during plugin loading.

    Args:
        app: The Spakky application instance.
    """
    app.add(RabbitMQConnectionConfig)

    app.add(RabbitMQPostProcessor)

    app.add(RabbitMQEventConsumer)
    app.add(RabbitMQEventTransport)

    app.add(AsyncRabbitMQEventConsumer)
    app.add(AsyncRabbitMQEventTransport)

options: show_root_heading: false

Event Transport

spakky.plugins.rabbitmq.event.transport

RabbitMQ event transports for integration events.

Provides synchronous and asynchronous event transports that send integration events to RabbitMQ queues with optional exchange routing.

RabbitMQEventTransport(config)

Bases: IEventTransport

Synchronous RabbitMQ event transport.

Sends pre-serialized event payloads to RabbitMQ queues using blocking connections. Optionally routes through an exchange for pub/sub patterns.

Attributes:

Name Type Description
connection_string str

AMQP connection string.

exchange_name str | None

Optional exchange name for routing.

Initialize the synchronous RabbitMQ event transport.

Parameters:

Name Type Description Default
config RabbitMQConnectionConfig

RabbitMQ connection configuration.

required
Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/transport.py
def __init__(self, config: RabbitMQConnectionConfig) -> None:
    """Initialize the synchronous RabbitMQ event transport.

    Args:
        config: RabbitMQ connection configuration.
    """
    self.connection_string = config.connection_string
    self.exchange_name = config.exchange_name

send(event_name, payload)

Send a pre-serialized event payload to RabbitMQ.

Creates a new connection, sends the payload to the appropriate queue, and closes the connection.

Parameters:

Name Type Description Default
event_name str

Routing key / queue name for the event.

required
payload bytes

Pre-serialized JSON bytes.

required
Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/transport.py
def send(self, event_name: str, payload: bytes) -> None:
    """Send a pre-serialized event payload to RabbitMQ.

    Creates a new connection, sends the payload to the appropriate queue,
    and closes the connection.

    Args:
        event_name: Routing key / queue name for the event.
        payload: Pre-serialized JSON bytes.
    """
    connection = BlockingConnection(URLParameters(self.connection_string))
    channel = connection.channel()
    channel.queue_declare(event_name)
    if self.exchange_name is not None:
        channel.exchange_declare(self.exchange_name)
        channel.queue_bind(event_name, self.exchange_name, event_name)
    channel.basic_publish(
        self.exchange_name if self.exchange_name is not None else "",
        event_name,
        payload,
    )
    channel.close()
    connection.close()

AsyncRabbitMQEventTransport(config)

Bases: IAsyncEventTransport

Asynchronous RabbitMQ event transport.

Sends pre-serialized event payloads to RabbitMQ queues using async connections. Optionally routes through an exchange for pub/sub patterns.

Attributes:

Name Type Description
connection_string str

AMQP connection string.

exchange_name str | None

Optional exchange name for routing.

Initialize the asynchronous RabbitMQ event transport.

Parameters:

Name Type Description Default
config RabbitMQConnectionConfig

RabbitMQ connection configuration.

required
Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/transport.py
def __init__(self, config: RabbitMQConnectionConfig) -> None:
    """Initialize the asynchronous RabbitMQ event transport.

    Args:
        config: RabbitMQ connection configuration.
    """
    self.connection_string = config.connection_string
    self.exchange_name = config.exchange_name

send(event_name, payload) async

Send a pre-serialized event payload to RabbitMQ asynchronously.

Creates a new robust connection, sends the payload to the appropriate queue, and closes the connection.

Parameters:

Name Type Description Default
event_name str

Routing key / queue name for the event.

required
payload bytes

Pre-serialized JSON bytes.

required
Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/transport.py
async def send(self, event_name: str, payload: bytes) -> None:
    """Send a pre-serialized event payload to RabbitMQ asynchronously.

    Creates a new robust connection, sends the payload to the appropriate
    queue, and closes the connection.

    Args:
        event_name: Routing key / queue name for the event.
        payload: Pre-serialized JSON bytes.
    """
    async with await connect_robust(self.connection_string) as connection:
        channel = await connection.channel()
        exchange = (
            await channel.declare_exchange(self.exchange_name)
            if self.exchange_name is not None
            else channel.default_exchange
        )
        queue = await channel.declare_queue(event_name)
        if self.exchange_name is not None:
            await queue.bind(exchange, event_name)
        await exchange.publish(
            Message(body=payload),
            routing_key=event_name,
        )
        await channel.close()

options: show_root_heading: false

Event Consumer

spakky.plugins.rabbitmq.event.consumer

RabbitMQ event consumers for integration events.

Provides synchronous and asynchronous event consumers that run as background services, consuming integration events from RabbitMQ queues and dispatching them to registered handlers.

RabbitMQEventConsumer(config)

Bases: IEventConsumer, AbstractBackgroundService

Synchronous RabbitMQ event consumer.

Runs as a background service that consumes integration events from RabbitMQ queues and dispatches them to registered synchronous event handlers. Uses blocking connection for synchronous event processing.

Attributes:

Name Type Description
connection_string str

AMQP connection string.

type_lookup dict[str, type[AbstractEvent]]

Maps consumer tags to event types.

handlers dict[type[AbstractEvent], list[EventHandlerCallback[Any]]]

Maps event types to handler callbacks.

connection BlockingConnection

Blocking RabbitMQ connection.

channel BlockingChannel

Blocking channel for message consumption.

Initialize the synchronous RabbitMQ event consumer.

Parameters:

Name Type Description Default
config RabbitMQConnectionConfig

RabbitMQ connection configuration.

required
Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/consumer.py
def __init__(self, config: RabbitMQConnectionConfig) -> None:
    """Initialize the synchronous RabbitMQ event consumer.

    Args:
        config: RabbitMQ connection configuration.
    """
    super().__init__()
    self.connection_string = config.connection_string
    self.type_lookup = {}
    self.type_adapters = {}
    self.handlers = {}

register(event, handler)

Register an event handler for a specific event type.

Multiple handlers can be registered for the same event type.

Parameters:

Name Type Description Default
event type[EventT_contra]

The event type to handle.

required
handler EventHandlerCallback[EventT_contra]

The callback function to handle the event.

required
Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/consumer.py
def register(
    self,
    event: type[EventT_contra],
    handler: EventHandlerCallback[EventT_contra],
) -> None:
    """Register an event handler for a specific event type.

    Multiple handlers can be registered for the same event type.

    Args:
        event: The event type to handle.
        handler: The callback function to handle the event.
    """
    if event not in self.handlers:
        self.handlers[event] = []
        self.type_adapters[event] = TypeAdapter(event)
    self.handlers[event].append(handler)

initialize()

Initialize RabbitMQ connection and declare queues.

Establishes connection to RabbitMQ, creates a channel, and sets up queue consumers for all registered event handlers.

Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/consumer.py
def initialize(self) -> None:
    """Initialize RabbitMQ connection and declare queues.

    Establishes connection to RabbitMQ, creates a channel, and sets up
    queue consumers for all registered event handlers.
    """
    self.connection = BlockingConnection(
        parameters=URLParameters(self.connection_string)
    )
    self.channel = self.connection.channel()

    for event_type in self.handlers:
        self.channel.queue_declare(event_type.__name__)
        consumer_tag = self.channel.basic_consume(
            event_type.__name__,
            self._route_event_handler,
        )
        self.type_lookup[consumer_tag] = event_type

dispose()

Clean up RabbitMQ resources.

Closes the channel and connection when the service is stopped.

Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/consumer.py
def dispose(self) -> None:
    """Clean up RabbitMQ resources.

    Closes the channel and connection when the service is stopped.
    """
    self.channel.close()
    self.connection.close()
    return

run()

Run the event consumer loop.

Starts consuming messages from RabbitMQ queues and blocks until the stop event is set.

Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/consumer.py
def run(self) -> None:
    """Run the event consumer loop.

    Starts consuming messages from RabbitMQ queues and blocks until
    the stop event is set.
    """
    self.connection.add_callback_threadsafe(self._check_if_event_set)
    self.channel.start_consuming()

AsyncRabbitMQEventConsumer(config)

Bases: IAsyncEventConsumer, AbstractAsyncBackgroundService

Asynchronous RabbitMQ event consumer.

Runs as an async background service that consumes integration events from RabbitMQ queues and dispatches them to registered asynchronous event handlers. Uses robust connection for automatic reconnection.

Attributes:

Name Type Description
connection_string str

AMQP connection string.

type_lookup dict[str, type[AbstractEvent]]

Maps consumer tags to event types.

handlers dict[type[AbstractEvent], list[AsyncEventHandlerCallback[Any]]]

Maps event types to async handler callbacks.

connection AbstractRobustConnection

Robust RabbitMQ connection for async operations.

Initialize the asynchronous RabbitMQ event consumer.

Parameters:

Name Type Description Default
config RabbitMQConnectionConfig

RabbitMQ connection configuration.

required
Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/consumer.py
def __init__(self, config: RabbitMQConnectionConfig) -> None:
    """Initialize the asynchronous RabbitMQ event consumer.

    Args:
        config: RabbitMQ connection configuration.
    """
    self.connection_string = config.connection_string
    self.type_lookup = {}
    self.type_adapters = {}
    self.handlers = {}

register(event, handler)

Register an async event handler for a specific event type.

Multiple handlers can be registered for the same event type.

Parameters:

Name Type Description Default
event type[EventT_contra]

The event type to handle.

required
handler AsyncEventHandlerCallback[EventT_contra]

The async callback function to handle the event.

required
Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/consumer.py
def register(
    self,
    event: type[EventT_contra],
    handler: AsyncEventHandlerCallback[EventT_contra],
) -> None:
    """Register an async event handler for a specific event type.

    Multiple handlers can be registered for the same event type.

    Args:
        event: The event type to handle.
        handler: The async callback function to handle the event.
    """
    if event not in self.handlers:
        self.handlers[event] = []
        self.type_adapters[event] = TypeAdapter(event)
    self.handlers[event].append(handler)

initialize_async() async

Initialize async RabbitMQ connection and declare queues.

Establishes robust connection to RabbitMQ, creates a channel, and sets up queue consumers for all registered async event handlers.

Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/consumer.py
async def initialize_async(self) -> None:
    """Initialize async RabbitMQ connection and declare queues.

    Establishes robust connection to RabbitMQ, creates a channel, and sets
    up queue consumers for all registered async event handlers.
    """
    self.connection = await connect_robust(self.connection_string)
    self.channel = await self.connection.channel()

    for event_type in self.handlers:
        queue = await self.channel.declare_queue(event_type.__name__)
        consumer_tag = await queue.consume(self._route_event_handler)
        self.type_lookup[consumer_tag] = event_type

dispose_async() async

Clean up async RabbitMQ resources.

Closes the channel and connection when the service is stopped.

Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/consumer.py
async def dispose_async(self) -> None:
    """Clean up async RabbitMQ resources.

    Closes the channel and connection when the service is stopped.
    """
    await self.channel.close()
    await self.connection.close()
    return

run_async() async

Run the async event consumer loop.

Waits for the stop event to be set while consuming messages from RabbitMQ queues in the background.

Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/event/consumer.py
async def run_async(self) -> None:
    """Run the async event consumer loop.

    Waits for the stop event to be set while consuming messages from
    RabbitMQ queues in the background.
    """
    await self._stop_event.wait()

options: show_root_heading: false

Configuration

spakky.plugins.rabbitmq.common.config

Configuration for RabbitMQ connections.

Provides configuration dataclass for RabbitMQ connection parameters including host, port, credentials, and exchange settings.

RabbitMQConnectionConfig()

Bases: BaseSettings

Configuration for RabbitMQ connection.

Stores connection parameters and provides a formatted connection string for establishing RabbitMQ connections.

Attributes:

Name Type Description
host str

RabbitMQ server hostname.

port int

RabbitMQ server port.

user str

Username for authentication.

password str

Password for authentication.

exchange_name str | None

Optional exchange name for pub/sub routing.

Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/common/config.py
def __init__(self) -> None:
    super().__init__()

use_ssl instance-attribute

Flag indicating whether to use SSL for the connection.

host instance-attribute

RabbitMQ server hostname or IP address.

port instance-attribute

RabbitMQ server port number.

user instance-attribute

Username for RabbitMQ authentication.

password instance-attribute

Password for RabbitMQ authentication.

exchange_name = None class-attribute instance-attribute

Optional exchange name for pub/sub message routing.

protocol property

Determine protocol based on SSL usage.

Returns:

Type Description
str

'amqps' if SSL is enabled, otherwise 'amqp'.

connection_string property

Generate AMQP connection string from configuration.

Returns:

Type Description
str

Formatted AMQP connection string with credentials and host information.

options: show_root_heading: false

Post Processor

spakky.plugins.rabbitmq.post_processor

Post-processor for registering RabbitMQ event handlers.

Automatically discovers and registers event handlers from @EventHandler decorated classes, connecting them to RabbitMQ consumers with dependency injection.

RabbitMQPostProcessor

Bases: IPostProcessor, IContainerAware, IApplicationContextAware

Post-processor that registers event handlers with RabbitMQ consumers.

Scans @EventHandler decorated classes for @event decorated methods and automatically registers them with the appropriate RabbitMQ consumer (sync or async) with proper dependency injection.

set_container(container)

Set the container for dependency injection.

Parameters:

Name Type Description Default
container IContainer

The IoC container.

required
Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/post_processor.py
def set_container(self, container: IContainer) -> None:
    """Set the container for dependency injection.

    Args:
        container: The IoC container.
    """
    self.__container = container

set_application_context(application_context)

Set the application context.

Parameters:

Name Type Description Default
application_context IApplicationContext

The application context instance.

required
Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/post_processor.py
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Set the application context.

    Args:
        application_context: The application context instance.
    """
    self.__application_context = application_context

post_process(pod)

Register event handlers from event handler classes.

Scans the event handler for methods decorated with @event and registers them with the appropriate RabbitMQ consumer (sync or async) based on whether the method is a coroutine function.

Parameters:

Name Type Description Default
pod object

The Pod to process, potentially an event handler.

required

Returns:

Type Description
object

The Pod, with event handlers registered if it's an event handler.

Source code in plugins/spakky-rabbitmq/src/spakky/plugins/rabbitmq/post_processor.py
def post_process(self, pod: object) -> object:
    """Register event handlers from event handler classes.

    Scans the event handler for methods decorated with @event and registers
    them with the appropriate RabbitMQ consumer (sync or async) based on
    whether the method is a coroutine function.

    Args:
        pod: The Pod to process, potentially an event handler.

    Returns:
        The Pod, with event handlers registered if it's an event handler.
    """
    if not EventHandler.exists(pod):
        return pod
    handler: EventHandler = EventHandler.get(pod)
    consumer = self.__container.get(IEventConsumer)
    async_consumer = self.__container.get(IAsyncEventConsumer)
    for name, method in getmembers(pod, ismethod):
        route: EventRoute[AbstractEvent] | None = EventRoute[
            AbstractEvent
        ].get_or_none(method)
        if route is None:
            continue
        if not issubclass(route.event_type, AbstractIntegrationEvent):
            continue

        # pylint: disable=line-too-long
        logger.info(
            f"[{type(self).__name__}] {route.event_type.__name__} -> {method.__qualname__}"
        )

        if iscoroutinefunction(method):

            @wraps(method)
            async def async_endpoint(
                *args: Any,
                method_name: str = name,
                controller_type: type[object] = handler.type_,
                context: IContainer = self.__container,
                **kwargs: Any,
            ) -> Any:
                # Each message is handled in isolation, so clear the
                # application context to avoid reusing dependency state.
                self.__application_context.clear_context()
                controller_instance = context.get(controller_type)
                method_to_call = getattr(controller_instance, method_name)
                return await method_to_call(*args, **kwargs)

            async_consumer.register(route.event_type, async_endpoint)
            continue

        @wraps(method)
        def endpoint(
            *args: Any,
            method_name: str = name,
            controller_type: type[object] = handler.type_,
            context: IContainer = self.__container,
            **kwargs: Any,
        ) -> Any:
            # Synchronous consumers share threads, so drop any lingering
            # scoped data before invoking the handler.
            self.__application_context.clear_context()
            controller_instance = context.get(controller_type)
            method_to_call = getattr(controller_instance, method_name)
            return method_to_call(*args, **kwargs)

        consumer.register(route.event_type, endpoint)
    return pod

options: show_root_heading: false