RabbitMQ 통합¶
spakky-rabbitmq는IEventTransport인터페이스를 통해 Integration Event를 RabbitMQ로 전송하고, 백그라운드 Consumer로 수신합니다. 이벤트 클래스 이름을 큐/라우팅 키로 사용하므로 발행자와 소비자가 같은 이벤트 타입 계약을 공유해야 합니다.
동작 원리¶
@EventHandler의@on_event메서드가RabbitMQPostProcessor에 의해 Consumer에 자동 등록- Integration Event 발행 시
RabbitMQEventTransport가 RabbitMQ 큐로 전송 RabbitMQEventConsumer가 백그라운드 서비스로 큐를 소비하며 핸들러에 dispatch
설정¶
RabbitMQConnectionConfig는 @Configuration이므로 환경변수에서 자동 로딩됩니다.
from spakky.core.application.application import SpakkyApplication
from spakky.core.application.application_context import ApplicationContext
import spakky.plugins.rabbitmq
import apps
app = (
SpakkyApplication(ApplicationContext())
.load_plugins(include={spakky.plugins.rabbitmq.PLUGIN_NAME})
.scan(apps)
.start()
)
환경변수 예시:
export SPAKKY_RABBITMQ__USE_SSL=false
export SPAKKY_RABBITMQ__HOST=localhost
export SPAKKY_RABBITMQ__PORT=5672
export SPAKKY_RABBITMQ__USER=guest
export SPAKKY_RABBITMQ__PASSWORD=guest
export SPAKKY_RABBITMQ__EXCHANGE_NAME=my-exchange # Optional
| 필드 | 환경변수 | 기본값 | 설명 |
|---|---|---|---|
use_ssl |
SPAKKY_RABBITMQ__USE_SSL |
(필수) | SSL 사용 여부 |
host |
SPAKKY_RABBITMQ__HOST |
(필수) | RabbitMQ 호스트 |
port |
SPAKKY_RABBITMQ__PORT |
(필수) | RabbitMQ 포트 |
user |
SPAKKY_RABBITMQ__USER |
(필수) | 인증 사용자명 |
password |
SPAKKY_RABBITMQ__PASSWORD |
(필수) | 인증 비밀번호 |
exchange_name |
SPAKKY_RABBITMQ__EXCHANGE_NAME |
None |
Exchange 이름 (pub/sub 라우팅) |
이벤트 발행¶
Integration Event를 발행하면 EventPublisher가 IEventBus를 통해 RabbitMQEventTransport로 전달합니다.
from uuid import UUID
from spakky.core.common.mutability import immutable
from spakky.domain.models.event import AbstractIntegrationEvent
@immutable
class OrderPlacedEvent(AbstractIntegrationEvent):
order_id: UUID
total_amount: float
from spakky.core.stereotype.usecase import UseCase
from spakky.event.event_publisher import IAsyncEventPublisher
@UseCase()
class PlaceOrderUseCase:
_publisher: IAsyncEventPublisher
def __init__(self, publisher: IAsyncEventPublisher) -> None:
self._publisher = publisher
async def execute(self, order_id: UUID, total: float) -> None:
event = OrderPlacedEvent(order_id=order_id, total_amount=total)
await self._publisher.publish(event)
이벤트 수신¶
@EventHandler와 @on_event로 수신 핸들러를 정의합니다. RabbitMQPostProcessor가 자동으로 Consumer에 등록합니다.
from spakky.event.stereotype.event_handler import EventHandler, on_event
@EventHandler()
class OrderEventHandler:
@on_event(OrderPlacedEvent)
async def on_order_placed(self, event: OrderPlacedEvent) -> None:
print(f"주문 접수: {event.order_id}, 금액: {event.total_amount}")
큐 이름은 이벤트 클래스의 __name__(예: OrderPlacedEvent)으로 자동 결정됩니다.
운영 흐름¶
발행 경로는 IAsyncEventPublisher에서 시작하지만, RabbitMQ transport가 직접 도메인 객체를 직렬화하지는 않습니다. AsyncDirectEventBus가 TypeAdapter(type(event)).dump_json(event)로 JSON bytes payload를 만들고, AsyncRabbitMQEventTransport가 이벤트 이름과 payload를 RabbitMQ에 씁니다. 동기 경로에서는 같은 역할을 DirectEventBus와 RabbitMQEventTransport가 수행합니다.
sequenceDiagram
participant UseCase
participant Publisher as IAsyncEventPublisher
participant Bus as AsyncDirectEventBus
participant Transport as AsyncRabbitMQEventTransport
participant Broker as RabbitMQ
participant Consumer as AsyncRabbitMQEventConsumer
participant Handler as @EventHandler
UseCase->>Publisher: publish(OrderPlacedEvent)
Publisher->>Bus: send(integration_event)
Bus->>Transport: send(event_name, json_payload, trace_headers)
Transport->>Broker: publish routing_key=OrderPlacedEvent
Consumer->>Broker: consume queue=OrderPlacedEvent
Consumer->>Handler: on_order_placed(event)
실무에서는 아래 규칙을 맞춥니다.
| 항목 | 규칙 |
|---|---|
| 이벤트 이름 | AbstractIntegrationEvent.event_name 값, 기본은 클래스명 |
| 큐 이름 | 수신 핸들러가 등록한 이벤트 클래스명, durable queue로 선언 |
| payload | Pydantic TypeAdapter가 만든 JSON bytes |
| headers | ITracePropagator.inject()가 넣은 trace header |
| exchange 없음 | 기본 exchange에 큐 이름 routing key로 발행 |
| exchange 있음 | configured exchange에 이벤트 이름 routing key로 발행하고 큐를 bind |
spakky-outbox를 함께 로드하면 OutboxEventBus / AsyncOutboxEventBus가 @Primary로 기본 bus를 대체합니다. 이 경우 UseCase 안의 publish() 호출은 RabbitMQ에 즉시 전송되지 않고 Outbox 테이블에 저장되며, Relay가 나중에 RabbitMQ transport의 send()를 호출합니다. 비즈니스 데이터와 메시지 저장을 같은 DB 트랜잭션에 묶어야 하면 Outbox를 사용하세요.
Exchange 라우팅¶
exchange_name을 설정하면 pub/sub 패턴으로 동작합니다.
- Transport가 exchange를 선언하고 큐를 바인딩
- 같은 exchange에 여러 큐가 바인딩되면 메시지가 팬아웃
exchange_name이 None이면 기본 exchange(direct)를 사용합니다.
분산 트레이싱¶
spakky-tracing은 spakky-rabbitmq의 필수 의존성입니다. 컨테이너에 ITracePropagator가 등록되어 있으면 메시지 헤더를 통해 TraceContext가 자동 전파됩니다.
- 발행 측:
OutboxEventBus또는DirectEventBus가 현재TraceContext를 메시지 헤더에 주입 - 수신 측:
RabbitMQEventConsumer가 헤더에서TraceContext를 추출하여 자식 span 생성 - 헤더가 없으면 새로운 루트 트레이스를 시작
ITracePropagator가 컨테이너에 없으면 트레이싱은 비활성 상태로, 별도 에러 없이 동작합니다
별도 설정이나 코드 변경 없이, 플러그인 로드만으로 동작합니다.