콘텐츠로 이동

spakky-kafka

Kafka 통합 — 이벤트 전송/수신

Main

spakky.plugins.kafka.main

initialize(app)

Initialize the spakky-kafka plugin.

Registers Kafka consumers, transports, and post-processor.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/main.py
def initialize(app: SpakkyApplication) -> None:
    """Initialize the spakky-kafka plugin.

    Wires the connection config, the sync/async consumers and transports,
    and the post-processor into the application.
    """
    # Registration order matches the original wiring: config first,
    # then sync pair, async pair, and finally the post-processor.
    pods = (
        KafkaConnectionConfig,
        KafkaEventConsumer,
        KafkaEventTransport,
        AsyncKafkaEventConsumer,
        AsyncKafkaEventTransport,
        KafkaPostProcessor,
    )
    for pod in pods:
        app.add(pod)

options: show_root_heading: false

Event Transport

spakky.plugins.kafka.event.transport

KafkaEventTransport(config)

Bases: IEventTransport

Synchronous Kafka event transport using confluent_kafka Producer.

Initialize the Kafka producer with connection config.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/transport.py
def __init__(self, config: KafkaConnectionConfig) -> None:
    """Build the Kafka admin client and producer from *config*.

    Args:
        config: Kafka connection settings shared by both clients.
    """
    self.config = config
    self.admin = AdminClient(config.configuration_dict)
    self.producer = Producer(config.configuration_dict, logger=logger)

send(event_name, payload)

Send a pre-serialized event payload to Kafka.

Parameters:

Name Type Description Default
event_name str

Topic name (typically the event class name).

required
payload bytes

Pre-serialized JSON bytes.

required
Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/transport.py
def send(self, event_name: str, payload: bytes) -> None:
    """Send a pre-serialized event payload to Kafka.

    Args:
        event_name: Topic name (typically the event class name).
        payload: Pre-serialized JSON bytes.
    """
    # Ensure the target topic exists before producing to it.
    self._create_topic(topic=event_name)
    self.producer.produce(
        topic=event_name,
        value=payload,
        callback=self._message_delivery_report,
    )
    # poll(0) serves queued delivery callbacks without blocking.
    self.producer.poll(0)
    # NOTE(review): flush() blocks until delivery and defeats producer
    # batching when invoked for every message — confirm this per-send
    # flush is intentional for delivery guarantees.
    self.producer.flush()

AsyncKafkaEventTransport(config)

Bases: IAsyncEventTransport

Asynchronous Kafka event transport using confluent_kafka AIOProducer.

Initialize the async Kafka transport with connection config.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/transport.py
def __init__(self, config: KafkaConnectionConfig) -> None:
    """Keep the connection config and create the Kafka admin client.

    The producer itself is created later, inside ``send``.
    """
    self.config = config
    self.admin = AdminClient(config.configuration_dict)

send(event_name, payload) async

Asynchronously send a pre-serialized event payload to Kafka.

Parameters:

Name Type Description Default
event_name str

Topic name (typically the event class name).

required
payload bytes

Pre-serialized JSON bytes.

required
Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/transport.py
async def send(self, event_name: str, payload: bytes) -> None:
    """Asynchronously send a pre-serialized event payload to Kafka.

    Args:
        event_name: Topic name (typically the event class name).
        payload: Pre-serialized JSON bytes.
    """
    # NOTE(review): a fresh AIOProducer is constructed on every send()
    # call — presumably so it binds to the currently running event
    # loop. Confirm; per-call construction is costly on a hot path.
    self.producer = AIOProducer(self.config.configuration_dict)
    # Ensure the target topic exists before producing to it.
    self._create_topic(topic=event_name)
    await self.producer.produce(
        topic=event_name,
        value=payload,
        callback=self._message_delivery_report,
    )
    # poll(0) serves delivery callbacks; flush() then waits until the
    # message is delivered (or fails), making each send effectively
    # synchronous with respect to delivery.
    await self.producer.poll(0)
    await self.producer.flush()

options: show_root_heading: false

Event Consumer

spakky.plugins.kafka.event.consumer

KafkaEventConsumer(config)

Bases: IEventConsumer, AbstractBackgroundService

Synchronous Kafka event consumer that polls messages and dispatches to handlers.

Initialize the Kafka consumer with connection config.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/consumer.py
def __init__(self, config: KafkaConnectionConfig) -> None:
    """Set up handler registries and the underlying Kafka clients.

    Args:
        config: Kafka connection settings for the admin client and consumer.
    """
    super().__init__()
    self.config = config
    # Routing registries, filled in by register():
    # event name -> type, type -> TypeAdapter, type -> handler list.
    self.type_lookup = {}
    self.type_adapters = {}
    self.handlers = {}
    self.admin = AdminClient(config.configuration_dict)
    self.consumer = Consumer(config.configuration_dict, logger=logger)

register(event, handler)

Register a handler for the given event type.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/consumer.py
def register(
    self,
    event: type[EventT_contra],
    handler: EventHandlerCallback[EventT_contra],
) -> None:
    """Attach *handler* to the subscriber list for *event*.

    The first registration of an event type also prepares its
    ``TypeAdapter`` and records the name-to-type mapping used when
    routing incoming messages.
    """
    bucket = self.handlers.get(event)
    if bucket is None:
        # First handler for this event type: create the routing entries.
        bucket = self.handlers[event] = []
        self.type_adapters[event] = TypeAdapter(event)
        self.type_lookup[event.__name__] = event
    bucket.append(handler)

initialize()

Create Kafka topics and subscribe the consumer.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/consumer.py
def initialize(self) -> None:
    """Ensure a topic exists per registered event type, then subscribe."""
    # Topic names are derived from the event class names.
    names = [handled.__name__ for handled in self.handlers]
    self._create_topics(topics=names)
    self.consumer.subscribe(topics=names)

run()

Poll Kafka for messages and route them to registered handlers.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/consumer.py
def run(self) -> None:
    """Consume messages until the stop event is set.

    Polls with the configured timeout; a ``None`` result (poll timeout)
    is skipped, anything else is routed to the registered handlers.
    """
    while True:
        if self._stop_event.is_set():
            break
        message = self.consumer.poll(timeout=self.config.poll_timeout)
        if message is not None:
            self._route_event_handler(message)

dispose()

Close the Kafka consumer connection.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/consumer.py
def dispose(self) -> None:
    """Shut down the underlying Kafka consumer."""
    self.consumer.close()

AsyncKafkaEventConsumer(config)

Bases: IAsyncEventConsumer, AbstractAsyncBackgroundService

Asynchronous Kafka event consumer that polls messages and dispatches to handlers.

Initialize the async Kafka consumer with connection config.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/consumer.py
def __init__(self, config: KafkaConnectionConfig) -> None:
    """Prepare handler registries and the Kafka admin client.

    The consumer itself is created in ``initialize_async``.

    Args:
        config: Kafka connection settings for the admin client.
    """
    super().__init__()
    self.config = config
    # Routing registries, filled in by register().
    self.handlers = {}
    self.type_adapters = {}
    self.type_lookup = {}
    self.admin = AdminClient(config.configuration_dict)

register(event, handler)

Register an async handler for the given event type.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/consumer.py
def register(
    self,
    event: type[EventT_contra],
    handler: AsyncEventHandlerCallback[EventT_contra],
) -> None:
    """Append *handler* to the async subscriber list for *event*.

    First-time registration also caches the event's ``TypeAdapter`` and
    the name-to-type mapping used for routing incoming messages.
    """
    if event in self.handlers:
        self.handlers[event].append(handler)
        return
    # Unseen event type: create its routing entries, then add the handler.
    self.handlers[event] = []
    self.type_adapters[event] = TypeAdapter(event)
    self.type_lookup[event.__name__] = event
    self.handlers[event].append(handler)

initialize_async() async

Create Kafka topics and subscribe the async consumer.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/consumer.py
async def initialize_async(self) -> None:
    """Create the async consumer, ensure topics exist, and subscribe."""
    self.consumer = AIOConsumer(self.config.configuration_dict)
    # Topic names are derived from the registered event class names.
    names = [handled.__name__ for handled in self.handlers]
    self._create_topics(topics=names)
    await self.consumer.subscribe(topics=names)

run_async() async

Poll Kafka asynchronously for messages and route them to handlers.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/consumer.py
async def run_async(self) -> None:  # pragma: no cover - runs in a dedicated asyncio task
    """Consume messages asynchronously until the stop event is set.

    ``None`` poll results (timeouts) are skipped; everything else is
    awaited through the registered handlers.
    """
    while True:
        if self._stop_event.is_set():
            break
        message = await self.consumer.poll(timeout=self.config.poll_timeout)
        if message is not None:
            await self._route_event_handler(message)

dispose_async() async

Close the async Kafka consumer connection.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/event/consumer.py
async def dispose_async(self) -> None:
    """Shut down the underlying async Kafka consumer."""
    await self.consumer.close()

options: show_root_heading: false

Configuration

spakky.plugins.kafka.common.config

Configuration for Kafka connections.

Provides configuration dataclass for Kafka connection parameters including bootstrap servers, consumer group, and security settings.

AutoOffsetResetType

Bases: str, Enum

Kafka consumer auto offset reset policies.

KafkaConnectionConfig()

Bases: BaseSettings

Kafka connection configuration loaded from environment variables.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/common/config.py
def __init__(self) -> None:
    """Load the Kafka connection configuration.

    All values are populated by the ``BaseSettings`` superclass
    constructor (presumably from environment variables, per the class
    description — confirm the settings source configuration).
    """
    super().__init__()

group_id instance-attribute

Kafka consumer group identifier.

client_id instance-attribute

Kafka client identifier.

bootstrap_servers instance-attribute

Kafka bootstrap servers.

security_protocol = None class-attribute instance-attribute

Security protocol for Kafka connection.

sasl_mechanism = None class-attribute instance-attribute

SASL mechanism for Kafka authentication.

sasl_username = None class-attribute instance-attribute

SASL username for Kafka authentication.

sasl_password = None class-attribute instance-attribute

SASL password for Kafka authentication.

number_of_partitions = 1 class-attribute instance-attribute

Default number of partitions for created topics.

replication_factor = 1 class-attribute instance-attribute

Default replication factor for created topics.

auto_offset_reset = AutoOffsetResetType.EARLIEST class-attribute instance-attribute

Consumer auto offset reset policy (earliest, latest, none).

poll_timeout = 1.0 class-attribute instance-attribute

Consumer poll timeout in seconds.

options: show_root_heading: false

Post Processor

spakky.plugins.kafka.post_processor

KafkaPostProcessor

Bases: IPostProcessor, IContainerAware, IApplicationContextAware

Post-processor that registers event handlers with Kafka consumers.

Scans @EventHandler decorated classes for @on_event decorated methods and automatically registers them with the appropriate Kafka consumer (sync or async) with proper dependency injection.

set_container(container)

Set the container for dependency injection.

Parameters:

Name Type Description Default
container IContainer

The IoC container.

required
Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/post_processor.py
def set_container(self, container: IContainer) -> None:
    """Remember the IoC container used to resolve handler dependencies.

    Args:
        container: The IoC container.
    """
    self.__container = container

set_application_context(application_context)

Set the application context.

Parameters:

Name Type Description Default
application_context IApplicationContext

The application context instance.

required
Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/post_processor.py
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Remember the application context used when dispatching events.

    Args:
        application_context: The application context instance.
    """
    self.__application_context = application_context

post_process(pod)

Register event handlers from event handler classes.

Scans the event handler for methods decorated with @on_event and registers them with the appropriate Kafka consumer (sync or async) based on whether the method is a coroutine function.

Parameters:

Name Type Description Default
pod object

The Pod to process.

required

Returns:

Type Description
object

The Pod, with event handlers registered if it's an event handler.

Source code in plugins/spakky-kafka/src/spakky/plugins/kafka/post_processor.py
def post_process(self, pod: object) -> object:
    """Register event handlers from event handler classes.

    Scans the event handler for methods decorated with @on_event and registers
    them with the appropriate Kafka consumer (sync or async) based on
    whether the method is a coroutine function. Only integration events
    (subclasses of ``AbstractIntegrationEvent``) are registered.

    Args:
        pod: The Pod to process.

    Returns:
        The Pod, with event handlers registered if it's an event handler.
    """
    # Pods without the @EventHandler marker pass through untouched.
    if not EventHandler.exists(pod):
        return pod
    handler: EventHandler = EventHandler.get(pod)
    consumer = self.__container.get(IEventConsumer)
    async_consumer = self.__container.get(IAsyncEventConsumer)
    for name, method in getmembers(pod, ismethod):
        route: EventRoute[AbstractEvent] | None = EventRoute[
            AbstractEvent
        ].get_or_none(method)
        if route is None:
            continue
        # Only integration events travel over Kafka; skip local events.
        if not issubclass(route.event_type, AbstractIntegrationEvent):
            continue

        # pylint: disable=line-too-long
        logger.info(
            f"[{type(self).__name__}] {route.event_type.__name__} -> {method.__qualname__}"
        )

        if iscoroutinefunction(method):

            # Keyword defaults pin the current loop variables; a plain
            # closure would late-bind and every endpoint would see the
            # values from the final loop iteration.
            @wraps(method)
            async def async_endpoint(
                *args: Any,
                method_name: str = name,
                controller_type: type[object] = handler.type_,
                context: IContainer = self.__container,
                **kwargs: Any,
            ) -> Any:
                # Each message is handled in isolation, so clear the
                # application context to avoid reusing dependency state.
                self.__application_context.clear_context()
                # Resolve the controller from the container per message
                # so its dependencies are freshly injected.
                controller_instance = context.get(controller_type)
                method_to_call = getattr(controller_instance, method_name)
                return await method_to_call(*args, **kwargs)

            async_consumer.register(route.event_type, async_endpoint)
            continue

        # Synchronous counterpart of async_endpoint; the same
        # default-argument binding trick applies.
        @wraps(method)
        def endpoint(
            *args: Any,
            method_name: str = name,
            controller_type: type[object] = handler.type_,
            context: IContainer = self.__container,
            **kwargs: Any,
        ) -> Any:
            # Synchronous consumers share threads, so drop any lingering
            # scoped data before invoking the handler.
            self.__application_context.clear_context()
            controller_instance = context.get(controller_type)
            method_to_call = getattr(controller_instance, method_name)
            return method_to_call(*args, **kwargs)

        consumer.register(route.event_type, endpoint)
    return pod

options: show_root_heading: false