
spakky-celery

Celery integration: task dispatch and scheduling

Main

spakky.plugins.celery.main

Plugin initialization entry point.

celery_app(config)

Create the default Celery application managed by the plugin.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/main.py
@Pod(name="celery_app")
def celery_app(config: CeleryConfig) -> Celery:
    """Create the default Celery application managed by the plugin."""
    celery = Celery(
        main=config.app_name,
        broker=config.broker_url,
        backend=config.result_backend,
    )
    celery.conf.update(
        task_serializer=config.task_serializer.value,
        result_serializer=config.result_serializer.value,
        accept_content=[serializer.value for serializer in config.accept_content],
        timezone=config.timezone,
        enable_utc=config.enable_utc,
    )
    return celery
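
The mapping from configuration attributes to Celery settings can be sketched without a broker. This is a minimal sketch, not the plugin's code: `Serializer`, `ConfigStandIn`, and `to_celery_settings` are hypothetical stand-ins, and the `"json"` enum value is an assumption based on Celery's serializer name.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional


class Serializer(str, Enum):
    """Stand-in for CelerySerializer; only the JSON member is confirmed by the docs."""

    JSON = "json"  # assumed value


@dataclass
class ConfigStandIn:
    """Hypothetical stand-in mirroring the documented CeleryConfig attributes."""

    broker_url: str
    app_name: str = "spakky-celery"
    result_backend: Optional[str] = None
    task_serializer: Serializer = Serializer.JSON
    result_serializer: Serializer = Serializer.JSON
    accept_content: List[Serializer] = field(default_factory=lambda: [Serializer.JSON])
    timezone: str = "UTC"
    enable_utc: bool = True


def to_celery_settings(config: ConfigStandIn) -> Dict[str, object]:
    """Build the keyword arguments that celery_app hands to conf.update()."""
    return {
        # Enum members are unwrapped to plain strings before reaching Celery.
        "task_serializer": config.task_serializer.value,
        "result_serializer": config.result_serializer.value,
        "accept_content": [s.value for s in config.accept_content],
        "timezone": config.timezone,
        "enable_utc": config.enable_utc,
    }


settings = to_celery_settings(
    ConfigStandIn(broker_url="amqp://user:pass@host:5672//")
)
```

With the defaults, every serializer setting resolves to a plain string, so the result can be passed to `conf.update()` unchanged.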

initialize(app)

Initialize the Celery plugin.

Registers CeleryConfig, CeleryPostProcessor, and task dispatch aspects.

Parameters:

Name  Type               Description                      Default
app   SpakkyApplication  The SpakkyApplication instance.  required

Source code in plugins/spakky-celery/src/spakky/plugins/celery/main.py
def initialize(app: SpakkyApplication) -> None:
    """Initialize the Celery plugin.

    Registers CeleryConfig, CeleryPostProcessor, and task dispatch aspects.

    Args:
        app: The SpakkyApplication instance.
    """
    if not _has_celery_pod(app):
        app.add(CeleryConfig)
        app.add(celery_app)
    app.add(CeleryPostProcessor)
    app.add(CeleryTaskDispatchAspect)
    app.add(AsyncCeleryTaskDispatchAspect)


Aspect

spakky.plugins.celery.aspects.task_dispatch

AOP aspects for intercepting @task method calls and dispatching them to Celery.

CeleryTaskDispatchAspect(celery, auth_snapshot_signer=None, auth_snapshot_propagation_configs=())

Bases: IAspect, IApplicationContextAware

Intercepts synchronous @task method calls and dispatches them to Celery broker.

All @task method calls from outside Celery context are dispatched via send_task(). Inside a Celery worker, calls execute directly to avoid re-dispatch loops.
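
The dispatch rule described above can be sketched as follows. This is a minimal sketch under stated assumptions: the context is modelled as a plain dict keyed by `CELERY_TASK_CONTEXT_KEY`, `FakeCelery` is a hypothetical recorder that stands in for a real Celery app, and `call_task` is not part of the plugin.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

CELERY_TASK_CONTEXT_KEY = "__celery_task__"  # value from the plugin's constants


class FakeCelery:
    """Hypothetical stand-in that records send_task() calls instead of using a broker."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Tuple[Any, ...], Dict[str, Any]]] = []

    def send_task(
        self,
        name: str,
        args: Tuple[Any, ...] = (),
        kwargs: Optional[Dict[str, Any]] = None,
    ) -> str:
        self.sent.append((name, tuple(args), dict(kwargs or {})))
        return "queued:" + name


def call_task(
    context: Dict[str, Any],
    celery: FakeCelery,
    task_name: str,
    func: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Run directly inside a worker; otherwise hand the call to the broker."""
    if context.get(CELERY_TASK_CONTEXT_KEY):
        # Already inside a Celery task: executing here avoids a re-dispatch loop.
        return func(*args, **kwargs)
    return celery.send_task(task_name, args=args, kwargs=kwargs)


def send_mail(address: str) -> str:
    return "sent to " + address


celery = FakeCelery()
outside = call_task({}, celery, "mail.send", send_mail, "a@example.com")
inside = call_task(
    {CELERY_TASK_CONTEXT_KEY: True}, celery, "mail.send", send_mail, "a@example.com"
)
```

Only the first call reaches `send_task()`; the second runs `send_mail` in process because the context marks it as already executing inside a task.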

Initialize with the Celery application instance.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/aspects/task_dispatch.py
def __init__(
    self,
    celery: Celery,
    auth_snapshot_signer: IAuthContextSnapshotSigner | None = None,
    auth_snapshot_propagation_configs: tuple[
        AuthSnapshotPropagationConfig, ...
    ] = (),
) -> None:
    """Initialize with the Celery application instance."""
    self._celery = celery
    self._propagator = None
    self._auth_snapshot_signer = auth_snapshot_signer
    self._auth_snapshot_propagation_config = (
        effective_auth_snapshot_propagation_config(
            auth_snapshot_propagation_configs
        )
    )

set_application_context(application_context)

Store the application context for checking worker context.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/aspects/task_dispatch.py
@override
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Store the application context for checking worker context."""
    self._application_context = application_context

set_propagator(propagator)

Set the trace propagator for injecting trace context into task headers.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/aspects/task_dispatch.py
def set_propagator(self, propagator: ITracePropagator) -> None:
    """Set the trace propagator for injecting trace context into task headers."""
    self._propagator = propagator

AsyncCeleryTaskDispatchAspect(celery, auth_snapshot_signer=None, auth_snapshot_propagation_configs=())

Bases: IAsyncAspect, IApplicationContextAware

Intercepts asynchronous @task method calls and dispatches them to Celery broker.

All @task method calls from outside Celery context are dispatched via send_task(). Inside a Celery worker, calls execute directly to avoid re-dispatch loops.

Initialize with the Celery application instance.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/aspects/task_dispatch.py
def __init__(
    self,
    celery: Celery,
    auth_snapshot_signer: IAuthContextSnapshotSigner | None = None,
    auth_snapshot_propagation_configs: tuple[
        AuthSnapshotPropagationConfig, ...
    ] = (),
) -> None:
    """Initialize with the Celery application instance."""
    self._celery = celery
    self._propagator = None
    self._auth_snapshot_signer = auth_snapshot_signer
    self._auth_snapshot_propagation_config = (
        effective_auth_snapshot_propagation_config(
            auth_snapshot_propagation_configs
        )
    )

set_application_context(application_context)

Store the application context for checking worker context.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/aspects/task_dispatch.py
@override
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Store the application context for checking worker context."""
    self._application_context = application_context

set_propagator(propagator)

Set the trace propagator for injecting trace context into task headers.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/aspects/task_dispatch.py
def set_propagator(self, propagator: ITracePropagator) -> None:
    """Set the trace propagator for injecting trace context into task headers."""
    self._propagator = propagator


Auth

spakky.plugins.celery.auth

Auth snapshot propagation and enforcement for Celery task boundaries.

CELERY_AUTH_BOUNDARY = 'task' module-attribute

AuthInvocation boundary value used for Celery task workers.

inject_auth_context_snapshot(headers, *, application_context, auth_snapshot_signer, auth_snapshot_propagation_config)

Inject a signed AuthContextSnapshot into Celery task headers.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/auth.py
def inject_auth_context_snapshot(
    headers: dict[str, str],
    *,
    application_context: IApplicationContext,
    auth_snapshot_signer: IAuthContextSnapshotSigner | None,
    auth_snapshot_propagation_config: AuthSnapshotPropagationConfig,
) -> None:
    """Inject a signed AuthContextSnapshot into Celery task headers."""
    if not auth_snapshot_propagation_config.enabled:
        return
    value = application_context.get_context_value(AUTH_CONTEXT_CONTEXT_KEY)
    if value is None:
        return
    if not isinstance(value, AuthContext):
        raise InvalidAuthContextValueError()
    if auth_snapshot_signer is None:
        raise AuthSnapshotPropagationSignerUnavailableError()
    snapshot = auth_snapshot_signer.sign_snapshot(
        SnapshotSignRequest(auth_context=value)
    )
    headers[AUTH_CONTEXT_SNAPSHOT_METADATA_KEY] = snapshot.base64url_canonical_json()
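
The guard order in `inject_auth_context_snapshot` (disabled, then no context, then no signer) can be illustrated with a standard-library sketch. Everything here is an assumption for illustration: the header name `SNAPSHOT_HEADER`, the HMAC-SHA256 envelope, and the helpers `sign_snapshot`, `decode_snapshot`, and `inject` do not come from the plugin, whose real snapshot format is defined by its signer.

```python
import base64
import hashlib
import hmac
import json
from typing import Any, Dict, Optional

SNAPSHOT_HEADER = "x-auth-context-snapshot"  # hypothetical header key


def sign_snapshot(auth_context: Dict[str, Any], secret: bytes) -> str:
    """Return unpadded base64url of canonical JSON holding the context and an HMAC tag."""
    payload = json.dumps(auth_context, sort_keys=True, separators=(",", ":"))
    tag = hmac.new(secret, payload.encode("utf-8"), hashlib.sha256).hexdigest()
    envelope = json.dumps(
        {"payload": auth_context, "sig": tag}, sort_keys=True, separators=(",", ":")
    )
    return base64.urlsafe_b64encode(envelope.encode("utf-8")).decode("ascii").rstrip("=")


def decode_snapshot(value: str) -> Dict[str, Any]:
    """Reverse the encoding (restores padding); does not verify the signature."""
    padded = value + "=" * (-len(value) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def inject(
    headers: Dict[str, str],
    auth_context: Optional[Dict[str, Any]],
    secret: Optional[bytes],
    enabled: bool = True,
) -> None:
    """Mirror the documented guard order before writing the header."""
    if not enabled:
        return  # propagation switched off
    if auth_context is None:
        return  # nothing to propagate
    if secret is None:
        raise RuntimeError("no signer available for snapshot propagation")
    headers[SNAPSHOT_HEADER] = sign_snapshot(auth_context, secret)


headers: Dict[str, str] = {}
inject(headers, {"sub": "user-1"}, b"demo-secret")
```

A missing context is silently skipped, while a present context without a signer fails loudly, so an unsigned snapshot is never sent.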

seed_and_authorize_celery_task(*, application_context, task_name, headers, auth_metadata, snapshot_verifier, authorization_policy_evaluator, permission_checker, relation_checker, role_checker, scope_checker)

Verify a worker snapshot, seed AuthContext, and enforce task metadata.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/auth.py
def seed_and_authorize_celery_task(
    *,
    application_context: IApplicationContext,
    task_name: str,
    headers: dict[str, str],
    auth_metadata: TaskAuthMetadata,
    snapshot_verifier: IAuthContextSnapshotVerifier | None,
    authorization_policy_evaluator: IAuthorizationPolicyEvaluator | None,
    permission_checker: IPermissionChecker | None,
    relation_checker: IRelationChecker | None,
    role_checker: IRoleChecker | None,
    scope_checker: IScopeChecker | None,
) -> None:
    """Verify a worker snapshot, seed AuthContext, and enforce task metadata."""
    if not auth_metadata.protected:
        return
    invocation = AuthInvocation(
        boundary=CELERY_AUTH_BOUNDARY,
        operation=task_name,
        attributes=(
            AuthInvocationAttribute(
                name="auth_context_snapshot",
                value=headers.get(AUTH_CONTEXT_SNAPSHOT_METADATA_KEY),
            ),
        ),
    )
    auth_context = _verify_snapshot(
        headers.get(AUTH_CONTEXT_SNAPSHOT_METADATA_KEY),
        invocation,
        snapshot_verifier,
    )
    store_auth_context(application_context, auth_context)
    for requirement in auth_metadata.requirements:
        decision = _evaluate_requirement(
            requirement,
            auth_context,
            authorization_policy_evaluator=authorization_policy_evaluator,
            permission_checker=permission_checker,
            relation_checker=relation_checker,
            role_checker=role_checker,
            scope_checker=scope_checker,
        )
        if decision.state is not AuthorizationDecisionState.ALLOW:
            raise AuthRequirementDeniedError(decision)

is_retryable_auth_failure(error)

Return whether a task auth failure should use Celery retry semantics.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/auth.py
def is_retryable_auth_failure(error: AuthRequirementDeniedError) -> bool:
    """Return whether a task auth failure should use Celery retry semantics."""
    return (
        error.decision is not None
        and error.decision.state is AuthorizationDecisionState.ERROR
    )
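
The enforcement loop in `seed_and_authorize_celery_task` and the retry check above fit together as sketched below. This is a simplified model, not the plugin's types: `DecisionState`, `RequirementDenied`, `enforce`, and `is_retryable` are hypothetical, and the `DENY` member is an assumption (the source only shows `ALLOW` and `ERROR`).

```python
from enum import Enum
from typing import Any, Callable, Dict, Iterable


class DecisionState(Enum):
    ALLOW = "allow"
    DENY = "deny"  # assumed member, not shown in the source
    ERROR = "error"


class RequirementDenied(Exception):
    """Carries the decision state that stopped the task."""

    def __init__(self, state: DecisionState) -> None:
        super().__init__(state.value)
        self.state = state


Requirement = Callable[[Dict[str, Any]], DecisionState]


def enforce(requirements: Iterable[Requirement], auth_context: Dict[str, Any]) -> None:
    """Fail closed: anything other than ALLOW stops at the first failing requirement."""
    for requirement in requirements:
        state = requirement(auth_context)
        if state is not DecisionState.ALLOW:
            raise RequirementDenied(state)


def is_retryable(error: RequirementDenied) -> bool:
    """Only evaluation errors are worth retrying; an explicit denial is final."""
    return error.state is DecisionState.ERROR


def has_subject(ctx: Dict[str, Any]) -> DecisionState:
    return DecisionState.ALLOW if "sub" in ctx else DecisionState.DENY


def flaky_policy(ctx: Dict[str, Any]) -> DecisionState:
    return DecisionState.ERROR  # e.g. the policy backend was unreachable


outcomes = []
for checks in ([has_subject], [has_subject, flaky_policy]):
    try:
        enforce(checks, {"sub": "user-1"})
        outcomes.append("allowed")
    except RequirementDenied as exc:
        outcomes.append("retry" if is_retryable(exc) else "reject")
```

Treating `ERROR` as retryable lets a transient checker failure go through Celery's retry path, while a definitive denial is rejected immediately.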


Configuration

spakky.plugins.celery.common.config

Celery configuration.

CelerySerializer

Bases: StrEnum

Celery message serialization formats.

CeleryConfig()

Bases: BaseSettings

Celery plugin configuration loaded from environment variables.

Initialize config from environment variables.

Note: This override is required for Spakky's @Configuration decorator to work with Pydantic BaseSettings. The decorator analyzes __init__ parameters as dependencies, and BaseSettings' default __init__ has a __pydantic_self__ parameter that Spakky cannot resolve.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/common/config.py
def __init__(self) -> None:
    """Initialize config from environment variables.

    Note: This override is required for Spakky's @Configuration decorator
    to work with Pydantic BaseSettings. The decorator analyzes __init__
    parameters as dependencies, and BaseSettings' default __init__ has
    a __pydantic_self__ parameter that Spakky cannot resolve.
    """
    super().__init__()
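
Why a parameterless `__init__` helps can be shown with a toy resolver. This is a sketch under assumptions only: `dependencies_of` is a hypothetical rule (every `__init__` parameter not named `self` counts as a dependency), not Spakky's actual analysis, and `SettingsBaseStandIn` merely imitates the signature described in the note.

```python
import inspect
from typing import List


class SettingsBaseStandIn:
    """Stand-in for a settings base class with an opaque __init__ signature."""

    def __init__(__pydantic_self__, **values: object) -> None:
        __pydantic_self__.values = values


class ConfigStandIn(SettingsBaseStandIn):
    """Mirrors the override: no parameters, defers to the base class."""

    def __init__(self) -> None:
        super().__init__()


def dependencies_of(cls: type) -> List[str]:
    """Toy rule: each __init__ parameter except one named 'self' is a dependency."""
    parameters = inspect.signature(cls.__init__).parameters
    return [name for name in parameters if name != "self"]


base_dependencies = dependencies_of(SettingsBaseStandIn)
override_dependencies = dependencies_of(ConfigStandIn)
```

Under this toy rule the base class exposes parameters that no container could satisfy, while the override exposes none.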

app_name = 'spakky-celery' class-attribute instance-attribute

Celery application name.

broker_url instance-attribute

Celery broker URL (e.g., 'amqp://user:pass@host:5672//').

result_backend = None class-attribute instance-attribute

Celery result backend URL. None disables result storage.

task_serializer = CelerySerializer.JSON class-attribute instance-attribute

Serializer for task messages.

result_serializer = CelerySerializer.JSON class-attribute instance-attribute

Serializer for task results.

accept_content = [CelerySerializer.JSON] class-attribute instance-attribute

Accepted content types for deserialization.

timezone = 'UTC' class-attribute instance-attribute

Timezone for scheduled tasks (IANA timezone, e.g., 'UTC', 'Asia/Seoul').

enable_utc = True class-attribute instance-attribute

Use UTC for internal datetime handling.


Task Result

spakky.plugins.celery.common.task_result

Celery-backed implementation of TaskResult.

CeleryTaskResult(result)

Bases: ITaskResult[T]

Wraps a Celery AsyncResult, exposing the broker-agnostic TaskResult interface.

Wrap a Celery AsyncResult.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/common/task_result.py
def __init__(self, result: AsyncResult) -> None:
    """Wrap a Celery AsyncResult."""
    self._result = result


Post-Processor

spakky.plugins.celery.post_processor

Post-processor for registering TaskHandler methods as Celery tasks.

CeleryPostProcessor(auth_snapshot_verifier=None, authorization_policy_evaluator=None, permission_checker=None, relation_checker=None, role_checker=None, scope_checker=None)

Bases: IPostProcessor, IApplicationContextAware

Post-processor that registers TaskHandler-annotated Pods as Celery tasks.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/post_processor.py
def __init__(
    self,
    auth_snapshot_verifier: IAuthContextSnapshotVerifier | None = None,
    authorization_policy_evaluator: IAuthorizationPolicyEvaluator | None = None,
    permission_checker: IPermissionChecker | None = None,
    relation_checker: IRelationChecker | None = None,
    role_checker: IRoleChecker | None = None,
    scope_checker: IScopeChecker | None = None,
) -> None:
    self.__auth_snapshot_verifier = auth_snapshot_verifier
    self.__authorization_policy_evaluator = authorization_policy_evaluator
    self.__permission_checker = permission_checker
    self.__relation_checker = relation_checker
    self.__role_checker = role_checker
    self.__scope_checker = scope_checker

set_application_context(application_context)

Store the application context for resolving handlers at runtime.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/post_processor.py
@override
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Store the application context for resolving handlers at runtime."""
    self.__application_context = application_context

post_process(pod)

Register TaskHandler methods as Celery tasks and beat schedules.

Source code in plugins/spakky-celery/src/spakky/plugins/celery/post_processor.py
@override
def post_process(self, pod: object) -> object:
    """Register TaskHandler methods as Celery tasks and beat schedules."""
    if not TaskHandler.exists(pod):
        return pod

    celery: Celery = self.__application_context.get(Celery)
    pod_type = TaskHandler.get(pod).type_

    propagator = self.__application_context.get_or_none(type_=ITracePropagator)
    if propagator is not None:
        for aspect_type in (
            CeleryTaskDispatchAspect,
            AsyncCeleryTaskDispatchAspect,
        ):
            if self.__application_context.contains(aspect_type):
                aspect = self.__application_context.get(type_=aspect_type)
                aspect.set_propagator(propagator)

    for _, method in getmembers(pod_type, isfunction):
        has_task = TaskRoute.get_or_none(method) is not None
        schedule_route = ScheduleRoute.get_or_none(method)
        has_schedule = schedule_route is not None

        if not has_task and not has_schedule:
            continue

        task_name = self._register_method(celery, pod_type, method, propagator)

        if has_schedule:
            if (
                schedule_route is None
            ):  # pragma: no cover - type-narrowing guard; has_schedule implies schedule_route is not None
                raise InvalidScheduleRouteError
            celery.conf.beat_schedule[task_name] = {
                "task": task_name,
                "schedule": self._to_celery_schedule(schedule_route),
            }
            logger.debug(
                "Registered beat schedule '%s' from handler '%s'",
                task_name,
                pod_type.__name__,
            )
        else:
            logger.debug(
                "Registered task '%s' from handler '%s'",
                task_name,
                pod_type.__name__,
            )

    return pod
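
The scanning step in `post_process` can be sketched with plain function attributes. This is a minimal sketch, not the plugin's implementation: the `task` and `every` decorators, the marker attributes, and the `Type.method` naming scheme are hypothetical, while the `{"task": ..., "schedule": ...}` entry shape follows Celery's `beat_schedule` setting.

```python
from inspect import getmembers, isfunction
from typing import Any, Callable, Dict, List, Tuple


def task(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical marker playing the role of a task route annotation."""
    fn.is_task = True
    return fn


def every(seconds: float) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical marker playing the role of a schedule route annotation."""

    def decorate(fn: Callable[..., Any]) -> Callable[..., Any]:
        fn.every_seconds = seconds
        return fn

    return decorate


class ReportHandler:
    @task
    def send_report(self, user_id: int) -> None: ...

    @every(60.0)
    def cleanup(self) -> None: ...

    def helper(self) -> None: ...  # unmarked, so it is skipped


def collect(handler_type: type) -> Tuple[List[str], Dict[str, Dict[str, Any]]]:
    """Return registered task names and the beat schedule entries."""
    names: List[str] = []
    beat: Dict[str, Dict[str, Any]] = {}
    for name, method in getmembers(handler_type, isfunction):
        is_task = getattr(method, "is_task", False)
        interval = getattr(method, "every_seconds", None)
        if not is_task and interval is None:
            continue
        task_name = handler_type.__name__ + "." + name  # assumed naming scheme
        names.append(task_name)
        if interval is not None:
            # A number of seconds is a valid Celery beat schedule value.
            beat[task_name] = {"task": task_name, "schedule": interval}
    return names, beat


task_names, beat_schedule = collect(ReportHandler)
```

`getmembers` returns members sorted by name, so the scheduled `cleanup` method is registered before `send_report`, and only `cleanup` receives a beat entry.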


Errors

spakky.plugins.celery.error

Celery plugin error hierarchy.

AbstractSpakkyCeleryError

Bases: AbstractSpakkyFrameworkError, ABC

Base exception for Spakky Celery errors.

InvalidTimezoneError

Bases: AbstractSpakkyCeleryError

Raised when an invalid IANA timezone string is provided.

InvalidScheduleRouteError

Bases: AbstractSpakkyCeleryError

Raised when a ScheduleRoute has no valid schedule specification.

AuthSnapshotPropagationSignerUnavailableError

Bases: AbstractSpakkyCeleryError

Raised when signed task snapshot propagation has no signer provider.
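
Because every plugin error shares one base class, callers can catch the whole family at once. The sketch below mirrors that layout with hypothetical stand-in classes and shows one plausible way an invalid IANA timezone could be turned into a dedicated error; where the plugin actually raises `InvalidTimezoneError` is not shown in the source, so `validate_timezone` is an assumption.

```python
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError


class SketchCeleryError(Exception):
    """Stand-in for the plugin's abstract base error."""


class SketchInvalidTimezoneError(SketchCeleryError):
    """Stand-in for the invalid timezone error."""


def validate_timezone(name: str) -> str:
    """Return the name unchanged if the tz database knows it, else raise."""
    try:
        ZoneInfo(name)
    except (ZoneInfoNotFoundError, ValueError) as exc:
        # Unknown keys raise ZoneInfoNotFoundError; malformed keys raise ValueError.
        raise SketchInvalidTimezoneError(name) from exc
    return name


try:
    validate_timezone("Mars/Olympus_Mons")
    result = "accepted"
except SketchCeleryError as error:  # the base class catches the specific error
    result = "rejected: " + str(error)
```

Catching the base class keeps calling code independent of which specific plugin error was raised.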


Additional Modules

Shared constants for the spakky-celery plugin.

CELERY_TASK_CONTEXT_KEY = '__celery_task__' module-attribute

ApplicationContext key indicating current execution is inside a Celery task.