spakky-task

spakky-task provides the task handler, schedule, crontab, and direct execution contracts.

Task abstraction: scheduling and dispatch

Plugin entry point

spakky.task.main

Plugin initialization entry point.

initialize(app)

Initialize the spakky-task plugin.

Parameters:

app (SpakkyApplication, required): The SpakkyApplication instance.

Source code in core/spakky-task/src/spakky/task/main.py
def initialize(app: SpakkyApplication) -> None:
    """Initialize the spakky-task plugin.

    Args:
        app: The SpakkyApplication instance.
    """
    app.add(DirectTaskExecutor)
    app.add(TaskRegistrationPostProcessor)


Stereotypes

spakky.task.stereotype.task_handler

TaskHandler stereotype and task routing decorators.

This module provides @TaskHandler stereotype and @task decorator for organizing task-queue-driven architectures.

TaskAuthRequirementRef = str

Canonical auth requirement reference carried with task metadata.

TaskAuthResourceRef = str

Canonical resource reference attached to task auth metadata.

TaskAuthActionRef = str

Canonical action reference attached to task auth metadata.

TaskAuthTenantRef = str

Canonical tenant reference attached to task auth metadata.

TaskAuthRequirementMetadata(*, kind, ref, resource=None, action=None, tenant=None) dataclass

Auth requirement metadata copied onto a task route.

kind instance-attribute

Provider-neutral auth requirement category.

ref instance-attribute

Canonical permission, role, scope, relation, policy, or marker ref.

resource = None class-attribute instance-attribute

Optional resource reference for resource-scoped checks.

action = None class-attribute instance-attribute

Optional action reference for policy checks.

tenant = None class-attribute instance-attribute

Optional tenant reference for tenant-scoped checks.

TaskAuthMetadata(*, public_access=False, requirements=()) dataclass

Auth boundary metadata associated with a task route.

public_access = False class-attribute instance-attribute

Whether the task is explicitly public.

requirements = () class-attribute instance-attribute

Ordered, duplicate-free auth requirements inherited by the task.

protected property

Return whether this task carries protected auth requirements.

TaskRoute(auth_metadata=TaskAuthMetadata()) dataclass

Bases: FunctionAnnotation

Annotation for marking methods as dispatchable tasks.

Associates a method as a task that can be dispatched to a task queue.

auth_metadata = field(default_factory=TaskAuthMetadata) class-attribute instance-attribute

Effective auth metadata for direct in-process task invocation.

TaskHandler(*, name='', scope=Scope.SINGLETON) dataclass

Bases: Pod

Stereotype for task handler classes.

TaskHandlers contain methods decorated with @task that can be dispatched to task queues asynchronously.

task(obj)

Decorator for marking methods as dispatchable tasks.

All @task methods are dispatched to the task queue by the plugin aspect.

Example

@TaskHandler()
class EmailTaskHandler:
    @task
    def send_email(self, to: str, subject: str, body: str) -> None:
        ...

Parameters:

obj (Callable[P, T], required): The method to mark as a task.

Returns:

Callable[P, T]: The annotated method.

Source code in core/spakky-task/src/spakky/task/stereotype/task_handler.py
def task[**P, T](obj: Callable[P, T]) -> Callable[P, T]:
    """Decorator for marking methods as dispatchable tasks.

    All @task methods are dispatched to the task queue by the plugin aspect.

    Example:
        @TaskHandler()
        class EmailTaskHandler:
            @task
            def send_email(self, to: str, subject: str, body: str) -> None:
                ...

    Args:
        obj: The method to mark as a task.

    Returns:
        The annotated method.
    """
    route = TaskRoute()
    return cast(Callable[P, T], route(obj))

collect_task_auth_metadata(method, *, owner_type=None)

Collect auth decorator metadata for a task without importing spakky-auth.

Source code in core/spakky-task/src/spakky/task/stereotype/task_handler.py
def collect_task_auth_metadata(
    method: object,
    *,
    owner_type: type[object] | None = None,
) -> TaskAuthMetadata:
    """Collect auth decorator metadata for a task without importing spakky-auth."""
    sources = (owner_type, method) if owner_type is not None else (method,)
    public_access = any(
        _has_auth_annotation(source, _PUBLIC_ACCESS_QUALNAME)
        for source in sources
        if source is not None
    )
    requirements = _dedupe_requirements(
        requirement
        for source in sources
        if source is not None
        for requirement in _auth_requirements_from_source(source)
    )
    return TaskAuthMetadata(public_access=public_access, requirements=requirements)
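
The merge order used above (owner type first, then the method, with duplicates dropped) can be illustrated with a small standalone sketch. It does not import spakky-task: `RequirementSketch` and `dedupe_in_order` are hypothetical stand-ins, and the library's private `_dedupe_requirements` helper may be implemented differently.

```python
from collections.abc import Iterable
from dataclasses import dataclass


@dataclass(frozen=True)
class RequirementSketch:
    """Hypothetical stand-in for TaskAuthRequirementMetadata (kind and ref only)."""

    kind: str
    ref: str


def dedupe_in_order(items: Iterable[RequirementSketch]) -> tuple[RequirementSketch, ...]:
    # dict keys keep insertion order, so the first occurrence of each item wins.
    return tuple(dict.fromkeys(items))


class_level = [RequirementSketch("role", "admin")]
method_level = [
    RequirementSketch("permission", "email:send"),
    RequirementSketch("role", "admin"),  # repeats the class-level requirement
]

# Owner (class) requirements come first, followed by method requirements.
merged = dedupe_in_order([*class_level, *method_level])
```

In this sketch the repeated role requirement appears only once in `merged`, at the position where the class first declared it.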


spakky.task.stereotype.schedule

Schedule stereotype for periodic task execution.

Provides @schedule decorator to mark TaskHandler methods for periodic execution (interval, daily, or crontab-based).

ScheduleRoute(interval=None, at=None, crontab=None) dataclass

Bases: FunctionAnnotation

Annotation for marking methods as periodically scheduled tasks.

Exactly one of interval, at, or crontab must be provided.

interval = None class-attribute instance-attribute

Fixed interval between executions.

at = None class-attribute instance-attribute

Daily execution at a specific time.

crontab = None class-attribute instance-attribute

Cron-like schedule specification.
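
The "exactly one of interval, at, or crontab" rule can be sketched as a validation step on a standalone dataclass. This is only an illustration under assumptions: `ScheduleSketch` and `InvalidScheduleSketchError` are hypothetical names, and where and how ScheduleRoute performs its own check is not shown in this reference.

```python
from dataclasses import dataclass
from datetime import time, timedelta
from typing import Optional


class InvalidScheduleSketchError(ValueError):
    """Hypothetical stand-in for InvalidScheduleSpecificationError."""


@dataclass(frozen=True)
class ScheduleSketch:
    """Hypothetical stand-in for ScheduleRoute, reduced to its three options."""

    interval: Optional[timedelta] = None
    at: Optional[time] = None
    crontab: Optional[object] = None

    def __post_init__(self) -> None:
        # timedelta(0) is falsy, so compare against None instead of relying
        # on truthiness when counting the options that were supplied.
        provided = [
            value
            for value in (self.interval, self.at, self.crontab)
            if value is not None
        ]
        if len(provided) != 1:
            raise InvalidScheduleSketchError(
                f"expected exactly one schedule option, got {len(provided)}"
            )


every_half_hour = ScheduleSketch(interval=timedelta(minutes=30))
daily_at_three = ScheduleSketch(at=time(3, 0))
```

Passing no option, or more than one, raises the error at construction time in this sketch.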

schedule(*, interval=None, at=None, crontab=None)

Decorator for marking methods as periodically scheduled tasks.

Exactly one of interval, at, or crontab must be specified.

Example

@TaskHandler()
class MaintenanceHandler:
    @schedule(interval=timedelta(minutes=30))
    def health_check(self) -> None:
        ...

    @schedule(at=time(3, 0))
    def daily_cleanup(self) -> None:
        ...

    @schedule(crontab=Crontab(hour=9, weekday=(Weekday.MONDAY, Weekday.WEDNESDAY, Weekday.FRIDAY)))
    def triweekly_report(self) -> None:
        ...

Parameters:

interval (timedelta | None, default None): Fixed interval between executions.

at (time | None, default None): Daily execution at a specific time.

crontab (Crontab | None, default None): Cron-like schedule specification.

Returns:

Callable[[Callable[P, T]], Callable[P, T]]: A decorator that annotates the method with ScheduleRoute.

Source code in core/spakky-task/src/spakky/task/stereotype/schedule.py
def schedule[**P, T](
    *,
    interval: timedelta | None = None,
    at: time | None = None,
    crontab: Crontab | None = None,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Decorator for marking methods as periodically scheduled tasks.

    Exactly one of ``interval``, ``at``, or ``crontab`` must be specified.

    Example:
        @TaskHandler()
        class MaintenanceHandler:
            @schedule(interval=timedelta(minutes=30))
            def health_check(self) -> None:
                ...

            @schedule(at=time(3, 0))
            def daily_cleanup(self) -> None:
                ...

            @schedule(crontab=Crontab(hour=9, weekday=(Weekday.MONDAY, Weekday.WEDNESDAY, Weekday.FRIDAY)))
            def triweekly_report(self) -> None:
                ...

    Args:
        interval: Fixed interval between executions.
        at: Daily execution at a specific time.
        crontab: Cron-like schedule specification.

    Returns:
        A decorator that annotates the method with ScheduleRoute.
    """
    route = ScheduleRoute(interval=interval, at=at, crontab=crontab)
    return cast(Callable[[Callable[P, T]], Callable[P, T]], route)


spakky.task.stereotype.crontab

Crontab value object for schedule specification.

Weekday

Bases: IntEnum

Day of the week (Monday=0, as in Python's date.weekday(); note that ISO 8601 itself numbers Monday as 1).

Month

Bases: IntEnum

Month of the year (1-12).

Crontab(month=None, day=None, weekday=None, hour=0, minute=0) dataclass

Cron-like schedule specification using Python native types.

Fields use None to mean "every" (wildcard). A single int means exactly that value; a tuple means multiple values.

Example

# Every Monday at 03:00
Crontab(weekday=Weekday.MONDAY, hour=3)

# Mon/Wed/Fri at 09:00
Crontab(weekday=(Weekday.MONDAY, Weekday.WEDNESDAY, Weekday.FRIDAY), hour=9)

# 1st and 15th of every month at midnight
Crontab(day=(1, 15))

month = None class-attribute instance-attribute

Month of the year. None means every month.

day = None class-attribute instance-attribute

Day of the month (1-31). None means every day.

weekday = None class-attribute instance-attribute

Day of the week. None means every day.

hour = 0 class-attribute instance-attribute

Hour of the day (0-23).

minute = 0 class-attribute instance-attribute

Minute of the hour (0-59).
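
The field semantics described for Crontab (None as a wildcard, a single int as an exact value, a tuple as a set of values) can be sketched as a matcher against a datetime. This is a standalone illustration, not the library's scheduler: `CrontabSketch`, `field_matches`, and `matches` are hypothetical names, the weekday comparison assumes Monday=0 as stated for Weekday, and combining `day` and `weekday` with AND is an assumption (classic cron treats them differently).

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Tuple, Union

FieldValue = Optional[Union[int, Tuple[int, ...]]]


@dataclass(frozen=True)
class CrontabSketch:
    """Hypothetical stand-in mirroring the Crontab fields and defaults."""

    month: FieldValue = None
    day: FieldValue = None
    weekday: FieldValue = None
    hour: FieldValue = 0
    minute: FieldValue = 0


def field_matches(field: FieldValue, value: int) -> bool:
    if field is None:  # wildcard: every value matches
        return True
    if isinstance(field, tuple):  # several allowed values
        return value in field
    return field == value  # exactly one allowed value


def matches(spec: CrontabSketch, moment: datetime) -> bool:
    # Assumption: all fields must match (AND), including day and weekday.
    return (
        field_matches(spec.month, moment.month)
        and field_matches(spec.day, moment.day)
        and field_matches(spec.weekday, moment.weekday())  # Monday == 0
        and field_matches(spec.hour, moment.hour)
        and field_matches(spec.minute, moment.minute)
    )


monday_3am = CrontabSketch(weekday=0, hour=3)
first_and_fifteenth = CrontabSketch(day=(1, 15))
```

Because `hour` and `minute` default to 0 rather than None, a spec such as `CrontabSketch(day=(1, 15))` only matches at midnight, which agrees with the "1st and 15th of every month at midnight" example.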


Interfaces

spakky.task.interfaces.task_result

Abstract task result handle for background task dispatchers.

ITaskResult

Bases: ABC

Abstract handle for the result of a dispatched background task.

Concrete adapters (e.g. CeleryTaskResult) implement this for each broker.

task_id abstractmethod property

Unique identifier for the dispatched task.

get() abstractmethod

Block until the task completes and return its result.

Returns:

T: The return value of the executed task method.

Source code in core/spakky-task/src/spakky/task/interfaces/task_result.py
@abstractmethod
def get(self) -> T:
    """Block until the task completes and return its result.

    Returns:
        The return value of the executed task method.
    """
    ...

get_async() abstractmethod async

Await until the task completes and return its result.

Use this in asyncio contexts to avoid blocking the event loop.

Returns:

T: The return value of the executed task method.

Source code in core/spakky-task/src/spakky/task/interfaces/task_result.py
@abstractmethod
async def get_async(self) -> T:
    """Await until the task completes and return its result.

    Use this in asyncio contexts to avoid blocking the event loop.

    Returns:
        The return value of the executed task method.
    """
    ...
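
To show the shape of a concrete result handle, here is a minimal sketch backed by a `concurrent.futures.Future`. It does not import spakky-task: `TaskResultSketch` is a local stand-in that mirrors the three abstract members listed above, and `FutureTaskResult` is a hypothetical adapter, not one shipped with the package.

```python
import asyncio
from abc import ABC, abstractmethod
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Generic, TypeVar

T = TypeVar("T")


class TaskResultSketch(ABC, Generic[T]):
    """Local stand-in with the same members as ITaskResult."""

    @property
    @abstractmethod
    def task_id(self) -> str: ...

    @abstractmethod
    def get(self) -> T: ...

    @abstractmethod
    async def get_async(self) -> T: ...


class FutureTaskResult(TaskResultSketch[T]):
    """Hypothetical adapter that wraps a concurrent.futures.Future."""

    def __init__(self, task_id: str, future: "Future[T]") -> None:
        self._task_id = task_id
        self._future = future

    @property
    def task_id(self) -> str:
        return self._task_id

    def get(self) -> T:
        # Blocks the calling thread until the future has a result.
        return self._future.result()

    async def get_async(self) -> T:
        # Bridges the thread-based future into the running event loop,
        # so waiting does not block other coroutines.
        return await asyncio.wrap_future(self._future)


with ThreadPoolExecutor(max_workers=1) as pool:
    result = FutureTaskResult("task-1", pool.submit(pow, 2, 10))
    value_sync = result.get()
    value_async = asyncio.run(result.get_async())
```

A broker-specific adapter would replace the future with its own result handle while keeping the same three members.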


Direct execution

spakky.task.direct

Direct in-process task execution support.

DirectTaskInvocation(*, handler_type, method_name, args=(), kwargs=dict()) dataclass

A direct same-process task invocation request.

handler_type instance-attribute

TaskHandler type that owns the method.

method_name instance-attribute

Task method name on the handler type.

args = () class-attribute instance-attribute

Positional arguments for the task method.

kwargs = field(default_factory=dict) class-attribute instance-attribute

Keyword arguments for the task method.

DirectTaskExecutor()

Bases: IApplicationContextAware

Execute registered tasks in the current process and context scope.

Source code in core/spakky-task/src/spakky/task/direct.py
def __init__(self) -> None:
    self._application_context = None

set_application_context(application_context)

Inject the ApplicationContext used to resolve task handlers.

Source code in core/spakky-task/src/spakky/task/direct.py
@override
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Inject the ApplicationContext used to resolve task handlers."""
    self._application_context = application_context

execute(invocation)

Execute a synchronous task without clearing request-scope context.

Source code in core/spakky-task/src/spakky/task/direct.py
def execute(self, invocation: DirectTaskInvocation) -> object:
    """Execute a synchronous task without clearing request-scope context."""
    method = self._resolve_method(invocation)
    if iscoroutinefunction(method):
        raise TaskAsyncInvocationRequiredError()
    return method(*invocation.args, **dict(invocation.kwargs))

execute_async(invocation) async

Execute a task in async code without clearing request-scope context.

Source code in core/spakky-task/src/spakky/task/direct.py
async def execute_async(self, invocation: DirectTaskInvocation) -> object:
    """Execute a task in async code without clearing request-scope context."""
    method = self._resolve_method(invocation)
    result = method(*invocation.args, **dict(invocation.kwargs))
    if isinstance(result, Awaitable):
        return await result
    return result
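
The difference between the two paths above (the sync path rejects coroutine functions, the async path awaits a result only when it is awaitable) can be reproduced in a standalone sketch. It makes simplifying assumptions: `InvocationSketch` carries a handler instance directly instead of a handler type resolved from an ApplicationContext, and all names here are hypothetical stand-ins rather than the spakky-task API.

```python
import asyncio
from collections.abc import Awaitable
from dataclasses import dataclass, field
from inspect import iscoroutinefunction
from typing import Any


class AsyncInvocationRequiredSketchError(RuntimeError):
    """Hypothetical stand-in for TaskAsyncInvocationRequiredError."""


@dataclass(frozen=True)
class InvocationSketch:
    handler: object  # assumption: an instance, not a type to be resolved
    method_name: str
    args: tuple[Any, ...] = ()
    kwargs: dict[str, Any] = field(default_factory=dict)


def execute(invocation: InvocationSketch) -> object:
    method = getattr(invocation.handler, invocation.method_name)
    if iscoroutinefunction(method):
        # Calling it here would only create an un-awaited coroutine.
        raise AsyncInvocationRequiredSketchError(invocation.method_name)
    return method(*invocation.args, **invocation.kwargs)


async def execute_async(invocation: InvocationSketch) -> object:
    method = getattr(invocation.handler, invocation.method_name)
    result = method(*invocation.args, **invocation.kwargs)
    if isinstance(result, Awaitable):
        return await result
    return result  # plain sync methods work on the async path too


class GreetingHandler:
    def greet(self, name: str) -> str:
        return f"hello {name}"

    async def greet_later(self, name: str) -> str:
        await asyncio.sleep(0)
        return f"hello again {name}"


handler = GreetingHandler()
sync_value = execute(InvocationSketch(handler, "greet", args=("Ada",)))
async_value = asyncio.run(
    execute_async(InvocationSketch(handler, "greet_later", kwargs={"name": "Ada"}))
)
```

In this sketch, passing `greet_later` to `execute` raises the error instead of silently returning a coroutine object.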


Post-processors

spakky.task.post_processor

Task handler registration post-processor.

TaskRegistrationPostProcessor()

Bases: IPostProcessor

Post-processor that scans @TaskHandler pods for @task methods.

This post-processor collects all task routes from TaskHandler pods and makes them available for task queue implementations to register.

Source code in core/spakky-task/src/spakky/task/post_processor.py
def __init__(self) -> None:
    self._task_routes = {}

post_process(pod)

Scan pod for @task methods and register their routes.

Parameters:

pod (object, required): The pod instance to process.

Returns:

object: The unmodified pod instance.

Source code in core/spakky-task/src/spakky/task/post_processor.py
@override
def post_process(self, pod: object) -> object:
    """Scan pod for @task methods and register their routes.

    Args:
        pod: The pod instance to process.

    Returns:
        The unmodified pod instance.
    """
    pod_type = type(pod)

    if not TaskHandler.exists(pod_type):
        return pod

    for name, method in getmembers(pod, predicate=ismethod):
        route = TaskRoute.get_or_none(method)
        if route is None:
            continue

        route.auth_metadata = collect_task_auth_metadata(
            method,
            owner_type=pod_type,
        )
        self._task_routes[method] = route
        logger.debug(f"Registered task {pod_type.__name__}.{name}")

    return pod

get_task_routes()

Get all registered task routes.

Returns:

dict[Func, TaskRoute]: Dictionary mapping task methods to their routes.

Source code in core/spakky-task/src/spakky/task/post_processor.py
def get_task_routes(self) -> dict[Func, TaskRoute]:
    """Get all registered task routes.

    Returns:
        Dictionary mapping task methods to their routes.
    """
    return self._task_routes.copy()
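
The scanning pattern used by the post-processor (iterate bound methods, keep the marked ones, key the registry by the bound method) can be tried in isolation. This sketch uses a plain function attribute as the marker: `task_sketch`, `_MARKER`, and `collect_routes` are hypothetical, whereas the real implementation looks up a TaskRoute annotation via `TaskRoute.get_or_none`.

```python
from inspect import getmembers, ismethod
from typing import Callable, Dict

_MARKER = "__task_sketch__"  # hypothetical attribute name used as the marker


def task_sketch(func: Callable) -> Callable:
    """Hypothetical stand-in for @task: tags the function and returns it."""
    setattr(func, _MARKER, True)
    return func


class EmailTaskHandler:
    @task_sketch
    def send_email(self, to: str) -> str:
        return f"sent to {to}"

    def helper(self) -> None:  # unmarked, so the scan skips it
        return None


def collect_routes(pod: object) -> Dict[Callable, str]:
    routes: Dict[Callable, str] = {}
    for name, method in getmembers(pod, predicate=ismethod):
        # Attribute reads on a bound method fall through to its function.
        if getattr(method, _MARKER, False):
            routes[method] = name
    return routes


pod = EmailTaskHandler()
routes = collect_routes(pod)
```

Bound methods compare equal when they share the same instance and function, so `pod.send_email in routes` holds even though each attribute access creates a new bound-method object.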


Errors

spakky.task.error

Spakky Task error hierarchy.

AbstractSpakkyTaskError

Bases: AbstractSpakkyFrameworkError, ABC

Base class for all spakky-task errors.

TaskNotFoundError

Bases: AbstractSpakkyTaskError

Raised when a task reference cannot be found in the registry.

TaskApplicationContextNotFoundError

Bases: AbstractSpakkyTaskError

Raised when direct task execution has no ApplicationContext.

TaskAsyncInvocationRequiredError

Bases: AbstractSpakkyTaskError

Raised when an async task is invoked through the sync direct path.

DuplicateTaskError

Bases: AbstractSpakkyTaskError

Raised when attempting to register a task that already exists.

InvalidScheduleSpecificationError

Bases: AbstractSpakkyTaskError

Raised when a ScheduleRoute has invalid schedule options.
