이벤트 시스템 가이드¶
이 문서는 Spakky Framework의 이벤트 시스템을 설명합니다.
개요¶
Spakky 이벤트 시스템은 두 가지 이벤트 타입을 지원합니다:
- DomainEvent — 동일 바운디드 컨텍스트 내 상태 변경 알림
- IntegrationEvent — 바운디드 컨텍스트 간 또는 서비스 간 통신
패키지 분리:
spakky-domain— 이벤트 모델 정의 (AbstractDomainEvent,AbstractIntegrationEvent)spakky-event— 인프로세스 이벤트 처리 (Mediator, Publisher, Consumer)spakky-rabbitmq,spakky-kafka— 분산 이벤트 브로커 통합
이벤트 정의¶
DomainEvent¶
동일 애플리케이션 내에서 발생하는 도메인 상태 변경을 표현합니다.
from dataclasses import dataclass
from uuid import UUID
from spakky.core.common.mutability import immutable
from spakky.domain.models.event import AbstractDomainEvent
@immutable
class UserCreatedEvent(AbstractDomainEvent):
"""사용자 생성됨 도메인 이벤트"""
user_id: UUID
email: str
IntegrationEvent¶
서비스 경계를 넘어 통신에 사용됩니다. RabbitMQ, Kafka 등으로 전송됩니다.
from dataclasses import dataclass
from decimal import Decimal
from uuid import UUID
from spakky.core.common.mutability import immutable
from spakky.domain.models.event import AbstractIntegrationEvent
@immutable
class OrderPlacedEvent(AbstractIntegrationEvent):
"""주문 완료 통합 이벤트"""
order_id: UUID
customer_id: UUID
total_amount: Decimal
이벤트 공통 속성¶
모든 이벤트는 다음 속성을 자동으로 가집니다:
event_id: UUID— 고유 식별자 (자동 생성)timestamp: datetime— 발생 시각 (UTC, 자동 생성)event_name: str— 이벤트 클래스 이름 (property)
AggregateRoot에서 이벤트 발생¶
AggregateRoot는 도메인 이벤트를 수집하고, 트랜잭션 커밋 시 발행합니다.
from spakky.domain.models.aggregate_root import AbstractAggregateRoot
from spakky.core.common.mutability import mutable
from uuid import UUID
@mutable
class Order(AbstractAggregateRoot[UUID]):
"""주문 애그리게이트"""
items: list[OrderItem]
status: OrderStatus
def place(self) -> None:
"""주문 확정"""
self.status = OrderStatus.PLACED
# 도메인 이벤트 추가
self.add_event(OrderPlacedEvent(
order_id=self.id,
customer_id=self.customer_id,
total_amount=self.total_amount,
))
def cancel(self, reason: str) -> None:
"""주문 취소"""
self.status = OrderStatus.CANCELLED
self.add_event(OrderCancelledEvent(
order_id=self.id,
reason=reason,
))
이벤트 수집 메서드¶
add_event(event)— 이벤트 추가remove_event(event)— 이벤트 제거clear_events()— 모든 이벤트 제거events— 현재 이벤트 목록 (읽기 전용)
인프로세스 이벤트 처리¶
아키텍처¶
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Publisher │────▶│ Dispatcher │────▶│ Handler │
└─────────────┘ └─────────────┘ └─────────────┘
│
│ (Consumer 역할)
▼
┌─────────────┐
│ Mediator │
│ (Consumer + │
│ Dispatcher)│
└─────────────┘
Mediator¶
Consumer(핸들러 등록)와 Dispatcher(이벤트 전달) 책임을 결합한 인프로세스 구현체입니다.
from spakky.event.mediator import AsyncEventMediator
# Mediator 생성 (보통 DI로 주입)
mediator = AsyncEventMediator()
# 핸들러 등록
mediator.register(UserCreatedEvent, handle_user_created)
mediator.register(UserCreatedEvent, send_welcome_email) # 복수 핸들러 가능
# 이벤트 디스패치
await mediator.dispatch(UserCreatedEvent(user_id=uuid4(), email="user@example.com"))
Publisher¶
Dispatcher에 위임하여 이벤트를 발행합니다. 핸들러 등록 세부사항을 몰라도 됩니다.
from spakky.event.publisher import AsyncEventPublisher
# UseCase에서 Publisher 사용
@UseCase()
class CreateUserUseCase:
def __init__(self, publisher: IAsyncEventPublisher) -> None:
self.publisher = publisher
async def execute(self, command: CreateUserCommand) -> User:
user = User.create(command)
await self.publisher.publish(UserCreatedEvent(
user_id=user.id,
email=user.email,
))
return user
@EventHandler로 이벤트 처리¶
선언적 방식으로 이벤트 핸들러를 정의합니다.
from spakky.event.stereotype.event_handler import EventHandler, on_event
@EventHandler()
class UserEventHandler:
"""사용자 관련 이벤트 핸들러"""
def __init__(self, mailer: IMailer, analytics: IAnalytics) -> None:
self.mailer = mailer
self.analytics = analytics
@on_event(UserCreatedEvent)
async def send_welcome_email(self, event: UserCreatedEvent) -> None:
"""신규 사용자에게 환영 이메일 발송"""
await self.mailer.send_welcome(event.email)
@on_event(UserCreatedEvent)
async def track_signup(self, event: UserCreatedEvent) -> None:
"""회원가입 분석 추적"""
await self.analytics.track("signup", user_id=str(event.user_id))
@on_event(UserDeletedEvent)
async def cleanup_user_data(self, event: UserDeletedEvent) -> None:
"""사용자 데이터 정리"""
await self.cleanup_service.remove_user_data(event.user_id)
핸들러 자동 등록¶
@EventHandler Pod가 등록되면, @on_event로 표시된 메서드가 자동으로 Consumer에 등록됩니다.
인터페이스 구조¶
설계 배경은 ADR-0001 참조. 동사 규칙은 glossary — 동사 규칙 참조.
| 인터페이스 | 동사 | 역할 |
|---|---|---|
IEventPublisher / IAsyncEventPublisher |
publish |
단일 발행 진입점 (타입 기반 라우팅) |
IEventBus / IAsyncEventBus |
send |
Integration Event 발행 진입점 (Outbox seam) |
IEventTransport / IAsyncEventTransport |
send |
실제 메시지 브로커 전송 |
IEventConsumer / IAsyncEventConsumer |
register |
핸들러 콜백 등록 (통합) |
IEventDispatcher / IAsyncEventDispatcher |
dispatch |
인프로세스 핸들러 전달 (통합) |
DomainEvent vs IntegrationEvent¶
| 특성 | DomainEvent | IntegrationEvent |
|---|---|---|
| 범위 | 단일 바운디드 컨텍스트 | 서비스/컨텍스트 간 |
| 전달 | 인프로세스 (Mediator) | 메시지 브로커 (RabbitMQ, Kafka) |
| 일관성 | 동일 트랜잭션 | 최종 일관성 (eventual) |
| 직렬화 | 불필요 | JSON/Protobuf 등 |
| 용도 | 내부 도메인 로직 | 외부 시스템 통합 |
패턴: DomainEvent → IntegrationEvent 변환¶
내부 도메인 이벤트를 외부에 발행할 때 변환합니다.
핸들러가 IAsyncEventPublisher를 통해 IntegrationEvent를 명시적으로 생성합니다:
@EventHandler()
class OrderEventBridge:
"""도메인 이벤트를 통합 이벤트로 변환"""
def __init__(self, publisher: IAsyncEventPublisher) -> None:
self.publisher = publisher
@on_event(OrderPlacedDomainEvent)
async def bridge_order_placed(self, event: OrderPlacedDomainEvent) -> None:
# 도메인 이벤트를 통합 이벤트로 변환하여 발행
await self.publisher.publish(OrderPlacedIntegrationEvent(
order_id=event.order_id,
customer_id=event.customer_id,
total_amount=event.total_amount,
))
Outbox 패턴¶
트랜잭션과 이벤트 발행의 원자성을 보장합니다.
문제¶
async def place_order(self, command: PlaceOrderCommand) -> Order:
async with transaction:
order = Order.create(command)
await self.repository.save(order)
# ❌ 트랜잭션 외부에서 발행 - 실패하면 이벤트 누락
await self.publisher.publish(OrderPlacedEvent(...))
return order
해결: Outbox 테이블¶
- 이벤트를 DB 테이블에 트랜잭션 내에서 저장
- 백그라운드 워커가 테이블을 폴링하여 브로커에 발행
- 발행 성공 시 테이블에서 제거
Spakky에서는 AsyncOutboxEventBus가 @Primary로 IAsyncEventBus를 교체하므로, 별도 코드 변경 없이 기존 IAsyncEventPublisher.publish() 호출이 자동으로 Outbox를 통해 저장됩니다.
# UseCase 코드는 동일 — Outbox 플러그인이 투명하게 동작
@UseCase()
class PlaceOrderUseCase:
def __init__(self, publisher: IAsyncEventPublisher) -> None:
self.publisher = publisher
@transactional
async def place_order(self, command: PlaceOrderCommand) -> Order:
order = Order.create(command)
await self.repository.save(order)
# AsyncOutboxEventBus가 IAsyncEventBus를 교체하여
# 같은 트랜잭션에서 OutboxMessage로 자동 저장
await self.publisher.publish(OrderPlacedEvent(
order_id=order.uid,
))
return order
분산 이벤트 브로커¶
RabbitMQ (spakky-rabbitmq)¶
RabbitMQEventTransport(implementsIEventTransport) /RabbitMQEventConsumer
@Pod()
class OrderService:
def __init__(self, publisher: IAsyncEventPublisher) -> None:
self.publisher = publisher
async def place_order(self, command: PlaceOrderCommand) -> Order:
order = await self.create_order(command)
await self.publisher.publish(OrderPlacedEvent(...))
return order
Kafka (spakky-kafka)¶
KafkaEventTransport(implementsIEventTransport) /KafkaEventConsumer
오류 처리¶
Mediator의 탄력적 디스패치¶
인프로세스 Mediator는 핸들러 실패 시에도 나머지 핸들러를 계속 실행합니다:
재시도 전략¶
분산 환경에서는 재시도 로직이 필요합니다:
@EventHandler()
class ResilientHandler:
@on_event(OrderPlacedEvent)
async def handle_with_retry(self, event: OrderPlacedEvent) -> None:
for attempt in range(3):
try:
await self.process(event)
return
except RetryableError:
if attempt == 2:
raise
await asyncio.sleep(2 ** attempt)
테스트¶
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_event_handler():
# 목 의존성
mailer = AsyncMock()
# 핸들러 생성
handler = UserEventHandler(mailer=mailer, analytics=AsyncMock())
# 이벤트 발생
event = UserCreatedEvent(user_id=uuid4(), email="test@example.com")
await handler.send_welcome_email(event)
# 검증
mailer.send_welcome.assert_called_once_with("test@example.com")
Mediator 테스트¶
@pytest.mark.asyncio
async def test_mediator_dispatches_to_handlers():
mediator = AsyncEventMediator()
handler = AsyncMock()
mediator.register(UserCreatedEvent, handler)
event = UserCreatedEvent(user_id=uuid4(), email="test@example.com")
await mediator.dispatch(event)
handler.assert_called_once_with(event)