Kafka 통합¶
spakky-kafka는IEventTransport인터페이스를 통해 Integration Event를 Apache Kafka로 전송하고, 백그라운드 Consumer로 수신합니다. 이벤트 클래스 이름을 Kafka topic으로 사용하므로 발행자와 소비자가 같은 이벤트 타입 계약을 공유해야 합니다.
동작 원리¶
@EventHandler의@on_event메서드가KafkaPostProcessor에 의해 Consumer에 자동 등록- Integration Event 발행 시
KafkaEventTransport가 Kafka 토픽으로 전송 KafkaEventConsumer가 백그라운드 서비스로 토픽을 소비하며 핸들러에 dispatch
설정¶
KafkaConnectionConfig는 @Configuration이므로 환경변수에서 자동 로딩됩니다.
from spakky.core.application.application import SpakkyApplication
from spakky.core.application.application_context import ApplicationContext
import spakky.plugins.kafka
import apps
app = (
SpakkyApplication(ApplicationContext())
.load_plugins(include={spakky.plugins.kafka.PLUGIN_NAME})
.scan(apps)
.start()
)
환경변수 예시:
export SPAKKY_KAFKA__GROUP_ID=my-consumer-group
export SPAKKY_KAFKA__CLIENT_ID=my-app
export SPAKKY_KAFKA__BOOTSTRAP_SERVERS=localhost:9092
export SPAKKY_KAFKA__AUTO_OFFSET_RESET=earliest
export SPAKKY_KAFKA__POLL_TIMEOUT=1.0
| 필드 | 환경변수 | 기본값 | 설명 |
|---|---|---|---|
group_id |
SPAKKY_KAFKA__GROUP_ID |
(필수) | Consumer 그룹 ID |
client_id |
SPAKKY_KAFKA__CLIENT_ID |
(필수) | Kafka 클라이언트 ID |
bootstrap_servers |
SPAKKY_KAFKA__BOOTSTRAP_SERVERS |
(필수) | 부트스트랩 서버 주소 |
security_protocol |
SPAKKY_KAFKA__SECURITY_PROTOCOL |
None |
보안 프로토콜 |
sasl_mechanism |
SPAKKY_KAFKA__SASL_MECHANISM |
None |
SASL 인증 메커니즘 |
sasl_username |
SPAKKY_KAFKA__SASL_USERNAME |
None |
SASL 사용자명 |
sasl_password |
SPAKKY_KAFKA__SASL_PASSWORD |
None |
SASL 비밀번호 |
number_of_partitions |
SPAKKY_KAFKA__NUMBER_OF_PARTITIONS |
1 |
토픽 파티션 수 |
replication_factor |
SPAKKY_KAFKA__REPLICATION_FACTOR |
1 |
토픽 복제 팩터 |
auto_offset_reset |
SPAKKY_KAFKA__AUTO_OFFSET_RESET |
earliest |
오프셋 리셋 정책 |
poll_timeout |
SPAKKY_KAFKA__POLL_TIMEOUT |
1.0 |
폴링 타임아웃 (초) |
이벤트 발행¶
Integration Event를 발행하면 EventPublisher가 IEventBus를 통해 KafkaEventTransport로 전달합니다.
from uuid import UUID
from spakky.core.common.mutability import immutable
from spakky.domain.models.event import AbstractIntegrationEvent
@immutable
class OrderPlacedEvent(AbstractIntegrationEvent):
order_id: UUID
total_amount: float
from spakky.core.stereotype.usecase import UseCase
from spakky.event.event_publisher import IAsyncEventPublisher
@UseCase()
class PlaceOrderUseCase:
_publisher: IAsyncEventPublisher
def __init__(self, publisher: IAsyncEventPublisher) -> None:
self._publisher = publisher
async def execute(self, order_id: UUID, total: float) -> None:
event = OrderPlacedEvent(order_id=order_id, total_amount=total)
await self._publisher.publish(event)
이벤트 수신¶
@EventHandler와 @on_event로 수신 핸들러를 정의합니다. KafkaPostProcessor가 자동으로 Consumer에 등록합니다.
from spakky.event.stereotype.event_handler import EventHandler, on_event
@EventHandler()
class OrderEventHandler:
@on_event(OrderPlacedEvent)
async def on_order_placed(self, event: OrderPlacedEvent) -> None:
print(f"주문 접수: {event.order_id}, 금액: {event.total_amount}")
토픽 이름은 이벤트 클래스의 __name__(예: OrderPlacedEvent)으로 자동 결정됩니다. 토픽이 존재하지 않으면 number_of_partitions와 replication_factor 설정값으로 자동 생성합니다.
운영 흐름¶
IAsyncEventPublisher.publish()는 Integration Event를 IAsyncEventBus로 넘기고, AsyncDirectEventBus가 이벤트를 JSON bytes로 직렬화한 뒤 AsyncKafkaEventTransport에 전달합니다. Kafka transport는 이벤트 이름을 topic으로 사용하고 trace header를 Kafka headers로 보냅니다.
sequenceDiagram
participant UseCase
participant Publisher as IAsyncEventPublisher
participant Bus as AsyncDirectEventBus
participant Transport as AsyncKafkaEventTransport
participant Broker as Kafka
participant Consumer as AsyncKafkaEventConsumer
participant Handler as @EventHandler
UseCase->>Publisher: publish(OrderPlacedEvent)
Publisher->>Bus: send(integration_event)
Bus->>Transport: send(event_name, json_payload, trace_headers)
Transport->>Broker: produce topic=OrderPlacedEvent
Consumer->>Broker: poll topic=OrderPlacedEvent
Consumer->>Handler: on_order_placed(event)
운영 시에는 아래 항목을 명시적으로 정합니다.
| 항목 | 규칙 |
|---|---|
| topic | AbstractIntegrationEvent.event_name 값, 기본은 클래스명 |
| payload | Pydantic TypeAdapter가 만든 JSON bytes |
| headers | ITracePropagator.inject()가 넣은 trace header |
| consumer group | SPAKKY_KAFKA__GROUP_ID |
| topic 생성 | 없으면 number_of_partitions, replication_factor로 생성 |
| offset reset | SPAKKY_KAFKA__AUTO_OFFSET_RESET (earliest/latest/none) |
spakky-outbox를 함께 로드하면 OutboxEventBus / AsyncOutboxEventBus가 기본 bus를 대체하므로 이벤트는 Kafka에 즉시 produce되지 않고 Outbox 테이블에 저장됩니다. Relay가 재시도 가능한 방식으로 Kafka transport를 호출하므로, 주문 생성 같은 DB 변경과 Kafka 발행을 원자적으로 묶어야 할 때 기본 선택은 Outbox 조합입니다.
SASL 인증¶
프로덕션 환경에서 SASL 인증을 사용하려면:
export SPAKKY_KAFKA__SECURITY_PROTOCOL=SASL_SSL
export SPAKKY_KAFKA__SASL_MECHANISM=PLAIN
export SPAKKY_KAFKA__SASL_USERNAME=my-api-key
export SPAKKY_KAFKA__SASL_PASSWORD=my-api-secret
분산 트레이싱¶
spakky-tracing은 spakky-kafka의 필수 의존성입니다. 컨테이너에 ITracePropagator가 등록되어 있으면 메시지 헤더를 통해 TraceContext가 자동 전파됩니다.
- 발행 측:
OutboxEventBus또는DirectEventBus가 현재TraceContext를 메시지 헤더에 주입 - 수신 측:
KafkaEventConsumer가 헤더에서TraceContext를 추출하여 자식 span 생성 - 헤더가 없으면 새로운 루트 트레이스를 시작
ITracePropagator가 컨테이너에 없으면 트레이싱은 비활성 상태로, 별도 에러 없이 동작합니다
별도 설정이나 코드 변경 없이, 플러그인 로드만으로 동작합니다.