spakky-celery

Celery integration: task dispatch and scheduling

Main

spakky.plugins.celery.main

Plugin initialization entry point.

initialize(app)

Initialize the Celery plugin.

Registers CeleryConfig, CeleryPostProcessor, and task dispatch aspects.

Parameters:

Name    Type                Description                        Default
app     SpakkyApplication   The SpakkyApplication instance.    required
Source code in plugins/spakky-celery/src/spakky/plugins/celery/main.py
def initialize(app: SpakkyApplication) -> None:
    """Initialize the Celery plugin.

    Registers CeleryConfig, CeleryPostProcessor, and task dispatch aspects.

    Args:
        app: The SpakkyApplication instance.
    """
    app.add(CeleryConfig)
    app.add(CeleryPostProcessor)
    app.add(CeleryTaskDispatchAspect)
    app.add(AsyncCeleryTaskDispatchAspect)
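For a concrete picture of what registration does, here is a hypothetical sketch in which a stub records add() calls in place of a real SpakkyApplication (only the add() method used by initialize is assumed; the real application class is not reproduced here):

```python
# Hypothetical stub: records components instead of building a real container.
class StubApp:
    def __init__(self):
        self.registered = []

    def add(self, component):
        self.registered.append(component)

# Stand-ins for the four components registered by the plugin.
class CeleryConfig: ...
class CeleryPostProcessor: ...
class CeleryTaskDispatchAspect: ...
class AsyncCeleryTaskDispatchAspect: ...

def initialize(app):
    """Mirrors the entry point shown above."""
    app.add(CeleryConfig)
    app.add(CeleryPostProcessor)
    app.add(CeleryTaskDispatchAspect)
    app.add(AsyncCeleryTaskDispatchAspect)

app = StubApp()
initialize(app)
print([c.__name__ for c in app.registered])
```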


Aspects

spakky.plugins.celery.aspects.task_dispatch

AOP aspects for intercepting @task method calls and dispatching them to Celery.

CeleryTaskDispatchAspect(celery)

Bases: IAspect, IApplicationContextAware

Intercepts synchronous @task method calls and dispatches them to Celery broker.

All @task method calls from outside Celery context are dispatched via send_task(). Inside a Celery worker, calls execute directly to avoid re-dispatch loops.

Initialize with the Celery application instance.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/aspects/task_dispatch.py
def __init__(self, celery: Celery) -> None:
    """Initialize with the Celery application instance."""
    self._celery = celery

set_application_context(application_context)

Store the application context for checking worker context.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/aspects/task_dispatch.py
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Store the application context for checking worker context."""
    self._application_context = application_context
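The dispatch rule described above (send to the broker from application code, execute directly inside a worker) can be sketched in plain Python. The worker check and the Celery object are stand-ins, not the aspect's actual implementation:

```python
# Minimal sketch of the re-dispatch guard, with a stub in place of Celery.
class FakeCelery:
    def __init__(self):
        self.sent = []

    def send_task(self, name, args=(), kwargs=None):
        # In real Celery this publishes a task message to the broker.
        self.sent.append(name)
        return f"dispatched:{name}"

def dispatch(celery, in_worker, task_name, run_directly):
    if in_worker:
        # Inside a worker: run the method body to avoid a re-dispatch loop.
        return run_directly()
    # Outside a worker: hand the call to the broker.
    return celery.send_task(task_name)

celery = FakeCelery()
print(dispatch(celery, in_worker=False, task_name="emails.send",
               run_directly=lambda: "ran"))
print(dispatch(celery, in_worker=True, task_name="emails.send",
               run_directly=lambda: "ran"))
```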

AsyncCeleryTaskDispatchAspect(celery)

Bases: IAsyncAspect, IApplicationContextAware

Intercepts asynchronous @task method calls and dispatches them to Celery broker.

All @task method calls from outside Celery context are dispatched via send_task(). Inside a Celery worker, calls execute directly to avoid re-dispatch loops.

Initialize with the Celery application instance.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/aspects/task_dispatch.py
def __init__(self, celery: Celery) -> None:
    """Initialize with the Celery application instance."""
    self._celery = celery

set_application_context(application_context)

Store the application context for checking worker context.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/aspects/task_dispatch.py
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Store the application context for checking worker context."""
    self._application_context = application_context


Configuration

spakky.plugins.celery.common.config

Celery configuration.

CelerySerializer

Bases: str, Enum

Celery message serialization formats.

CeleryConfig()

Bases: BaseSettings

Celery plugin configuration loaded from environment variables.

Initialize config from environment variables.

Note: This override is required for Spakky's @Configuration decorator to work with Pydantic BaseSettings. The decorator analyzes __init__ parameters as dependencies, and BaseSettings' default __init__ has a __pydantic_self__ parameter that Spakky cannot resolve.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/common/config.py
def __init__(self) -> None:
    """Initialize config from environment variables.

    Note: This override is required for Spakky's @Configuration decorator
    to work with Pydantic BaseSettings. The decorator analyzes __init__
    parameters as dependencies, and BaseSettings' default __init__ has
    a __pydantic_self__ parameter that Spakky cannot resolve.
    """
    super().__init__()
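The note above can be demonstrated with stdlib inspect alone; the Base class below mimics the __init__ signature pydantic gives BaseSettings rather than importing pydantic:

```python
import inspect

class Base:
    # Mimics the default signature pydantic generates for BaseSettings.
    def __init__(__pydantic_self__, **data):
        pass

class WithoutOverride(Base):
    pass

class WithOverride(Base):
    def __init__(self):
        super().__init__()

# A dependency analyzer walking this signature sees '__pydantic_self__'
# and a **data catch-all, neither of which maps to a resolvable dependency.
print(list(inspect.signature(WithoutOverride.__init__).parameters))

# After the override, only the conventional 'self' remains.
print(list(inspect.signature(WithOverride.__init__).parameters))
```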

app_name = 'spakky-celery'

Celery application name.

broker_url

Celery broker URL (e.g., 'amqp://user:pass@host:5672//').

result_backend = None

Celery result backend URL. None disables result storage.

task_serializer = CelerySerializer.JSON

Serializer for task messages.

result_serializer = CelerySerializer.JSON

Serializer for task results.

accept_content = [CelerySerializer.JSON]

Accepted content types for deserialization.

timezone = 'UTC'

Timezone for scheduled tasks (IANA timezone, e.g., 'UTC', 'Asia/Seoul').

enable_utc = True

Use UTC for internal datetime handling.
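Assuming pydantic-settings' default field-to-environment mapping (case-insensitive field names, no prefix; this reference does not show an env_prefix), the settings above could be supplied like this. Values are hypothetical:

```shell
# Hypothetical values; variable names assume pydantic's default mapping.
export BROKER_URL="amqp://guest:guest@localhost:5672//"
export RESULT_BACKEND="redis://localhost:6379/0"
export TASK_SERIALIZER="json"
export TIMEZONE="Asia/Seoul"
```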


Task Result

spakky.plugins.celery.common.task_result

Celery-backed implementation of TaskResult.

CeleryTaskResult(result)

Bases: AbstractTaskResult[T], Generic[T]

Wraps a Celery AsyncResult, exposing the broker-agnostic TaskResult interface.

Wrap a Celery AsyncResult.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/common/task_result.py
def __init__(self, result: AsyncResult) -> None:
    """Wrap a Celery AsyncResult."""
    self._result = result
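The wrapping pattern can be illustrated with a stand-in for AsyncResult. The method names on the broker-agnostic side below are illustrative, since the AbstractTaskResult contract is not reproduced in this reference:

```python
# Sketch of adapting a Celery-style result object to a neutral interface.
class FakeAsyncResult:
    # Mimics the small slice of celery.result.AsyncResult used here.
    def __init__(self, value):
        self._value = value

    def get(self, timeout=None):
        return self._value

    def ready(self):
        return True

class TaskResultSketch:
    """Illustrative broker-agnostic wrapper (not the real CeleryTaskResult)."""

    def __init__(self, result):
        self._result = result

    def wait(self, timeout=None):
        # Delegates to the underlying Celery-style result.
        return self._result.get(timeout=timeout)

    @property
    def done(self):
        return self._result.ready()

r = TaskResultSketch(FakeAsyncResult(42))
print(r.done, r.wait())
```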


Post Processor

spakky.plugins.celery.post_processor

Post-processor for registering TaskHandler methods as Celery tasks.

CeleryPostProcessor

Bases: IPostProcessor, IApplicationContextAware

Post-processor that registers TaskHandler-annotated Pods as Celery tasks.

set_application_context(application_context)

Store the application context for resolving handlers at runtime.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/post_processor.py
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Store the application context for resolving handlers at runtime."""
    self.__application_context = application_context

post_process(pod)

Register TaskHandler methods as Celery tasks and beat schedules.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/post_processor.py
def post_process(self, pod: object) -> object:
    """Register TaskHandler methods as Celery tasks and beat schedules."""
    if not TaskHandler.exists(pod):
        return pod

    celery: Celery = self.__application_context.get(Celery)
    pod_type = TaskHandler.get(pod).type_

    for _, method in getmembers(pod_type, isfunction):
        has_task = TaskRoute.get_or_none(method) is not None
        schedule_route = ScheduleRoute.get_or_none(method)
        has_schedule = schedule_route is not None

        if not has_task and not has_schedule:
            continue

        task_name = self._register_method(celery, pod_type, method)

        if has_schedule:
            assert schedule_route is not None
            celery.conf.beat_schedule[task_name] = {
                "task": task_name,
                "schedule": self._to_celery_schedule(schedule_route),
            }
            logger.debug(
                "Registered beat schedule '%s' from handler '%s'",
                task_name,
                pod_type.__name__,
            )
        else:
            logger.debug(
                "Registered task '%s' from handler '%s'",
                task_name,
                pod_type.__name__,
            )

    return pod
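The scheduling branch above writes entries into Celery's beat_schedule mapping. A reduced sketch of that shape, with the metadata lookup and schedule conversion stubbed out:

```python
# Sketch of how a scheduled handler lands in beat_schedule. The schedule
# spec is a stand-in for what _to_celery_schedule would produce.
conf = {"beat_schedule": {}}

def register_schedule(task_name, schedule_spec):
    # Mirrors the dict written by post_process() for scheduled handlers.
    conf["beat_schedule"][task_name] = {
        "task": task_name,
        "schedule": schedule_spec,  # e.g. a crontab() in real Celery
    }

register_schedule("reports.daily", 86400.0)  # every 24 h, as seconds
print(conf["beat_schedule"]["reports.daily"])
```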


Errors

spakky.plugins.celery.error

Celery plugin error hierarchy.

AbstractSpakkyCeleryError

Bases: AbstractSpakkyFrameworkError, ABC

Base exception for Spakky Celery errors.

InvalidScheduleRouteError

Bases: AbstractSpakkyCeleryError

Raised when a ScheduleRoute has no valid schedule specification.
