콘텐츠로 이동

spakky-saga

사가 오케스트레이션 — SagaFlow, SagaStep, 보상 기반 롤백

스테레오타입

Saga stereotype for distributed transaction orchestration.

This module provides @Saga stereotype for organizing classes that implement saga orchestration logic.

Saga(*, name='', scope=Scope.SINGLETON) dataclass

Bases: Pod

Stereotype for saga orchestration classes.

Sagas represent distributed transaction orchestrations, coordinating multiple services through sequential/parallel steps with compensation logic.

기반 클래스

AbstractSaga base class + @saga_step 데코레이터.

사용자는 saga 메서드에 @saga_step을 명시적으로 적용한다. 데코레이터가 _SagaStepDescriptor[SagaDataT]를 반환하므로 타입체커는 클래스 속성 수준에서 명확한 타입을 인식하고, instance access 시 overload된 __get__SagaStep[SagaDataT]를 반환한다고 판단한다. 그 결과 self.method >> ..., self.method & ..., self.method | ... 같은 연산자 표현이 타입 안전하게 작동한다.

AbstractSaga

Bases: ABC

사가를 정의하는 제네릭 베이스 클래스.

서브클래스는 flow()를 구현하여 사가 흐름을 선언적으로 정의한다. 사가의 step 역할을 하는 async 메서드에는 @saga_step 데코레이터를 붙여 연산자 >>, &, | 사용을 타입 안전하게 활성화한다.

Example

@Saga() class CreateOrderSaga(AbstractSaga[CreateOrderSagaData]): @saga_step async def create_ticket(self, data): ...

@saga_step
async def cancel_ticket(self, data):
    ...

def flow(self) -> SagaFlow[CreateOrderSagaData]:
    return SagaFlow(items=(
        self.create_ticket >> self.cancel_ticket,
    ))

flow() abstractmethod

사가의 실행 흐름을 정의한다.

Returns:

Type Description
SagaFlow[SagaDataT]

SagaFlow[SagaDataT]: 사가 흐름 정의.

Source code in core/spakky-saga/src/spakky/saga/base.py
@abstractmethod
def flow(self) -> SagaFlow[SagaDataT]:
    """사가의 실행 흐름을 정의한다.

    Returns:
        SagaFlow[SagaDataT]: 사가 흐름 정의.
    """
    ...  # pragma: no branch - AbstractMethod only

execute(data) async

사가를 실행한다.

SagaFlow에 정의된 step들을 순차 실행하고, 실패 시 compensate가 있는 step만 역순으로 보상을 실행한다.

Parameters:

Name Type Description Default
data SagaDataT

사가 비즈니스 데이터.

required

Returns:

Type Description
SagaResult[SagaDataT]

SagaResult[SagaDataT]: 사가 실행 결과.

Raises:

Type Description
SagaCompensationFailedError

보상 실행 중 에러 발생 (on_compensation_failure 미설정 시).

Source code in core/spakky-saga/src/spakky/saga/base.py
async def execute(self, data: SagaDataT) -> SagaResult[SagaDataT]:
    """사가를 실행한다.

    SagaFlow에 정의된 step들을 순차 실행하고, 실패 시
    compensate가 있는 step만 역순으로 보상을 실행한다.

    Args:
        data: 사가 비즈니스 데이터.

    Returns:
        SagaResult[SagaDataT]: 사가 실행 결과.

    Raises:
        SagaCompensationFailedError: 보상 실행 중 에러 발생
            (on_compensation_failure 미설정 시).
    """
    return await run_saga_flow(self.flow(), data, saga_name=type(self).__name__)

saga_step(fn)

사가 step 메서드를 descriptor로 감싼다.

데코레이트된 메서드에 instance-level로 접근하면 SagaStep이 반환되어 >>, &, | 연산자로 Transaction/Parallel/에러 전략을 구성할 수 있다.

Example

@Saga() class CreateOrderSaga(AbstractSaga[OrderData]): @saga_step async def issue_ticket(self, data: OrderData) -> OrderData: ...

@saga_step
async def cancel_ticket(self, data: OrderData) -> None: ...

def flow(self) -> SagaFlow[OrderData]:
    return SagaFlow(items=(self.issue_ticket >> self.cancel_ticket,))

Parameters:

Name Type Description Default
fn Callable[[_SelfT, SagaDataT], Awaitable[SagaDataT | None]]

데코레이트할 async 메서드.

required

Returns:

Type Description
_SagaStepDescriptor[SagaDataT]

_SagaStepDescriptor[SagaDataT]: 런타임 descriptor, 인스턴스 접근 시 SagaStep을 반환한다.

Source code in core/spakky-saga/src/spakky/saga/base.py
def saga_step[_SelfT, SagaDataT: AbstractSagaData](
    fn: Callable[[_SelfT, SagaDataT], Awaitable[SagaDataT | None]],
) -> _SagaStepDescriptor[SagaDataT]:
    """사가 step 메서드를 descriptor로 감싼다.

    데코레이트된 메서드에 instance-level로 접근하면 `SagaStep`이 반환되어
    `>>`, `&`, `|` 연산자로 Transaction/Parallel/에러 전략을 구성할 수 있다.

    Example:
        @Saga()
        class CreateOrderSaga(AbstractSaga[OrderData]):
            @saga_step
            async def issue_ticket(self, data: OrderData) -> OrderData: ...

            @saga_step
            async def cancel_ticket(self, data: OrderData) -> None: ...

            def flow(self) -> SagaFlow[OrderData]:
                return SagaFlow(items=(self.issue_ticket >> self.cancel_ticket,))

    Args:
        fn: 데코레이트할 async 메서드.

    Returns:
        `_SagaStepDescriptor[SagaDataT]`: 런타임 descriptor, 인스턴스 접근 시 `SagaStep`을 반환한다.
    """
    return _SagaStepDescriptor(fn)

데이터

Saga business data model.

AbstractSagaData

Bases: AbstractDomainModel, ABC

사가의 비즈니스 데이터. 엔진 상태를 포함하지 않는다.

흐름

Saga flow composition types, type aliases, and builder functions.

ActionFn = Callable[[SagaDataT], Awaitable[SagaDataT | None]]

commit 액션 함수 시그니처. data를 변환하거나 None을 반환한다.

CompensateFn = Callable[[SagaDataT], Awaitable[None]]

보상 함수 시그니처. 부수효과만 수행한다.

FlowItem = SagaStep[SagaDataT] | Transaction[SagaDataT] | Parallel[SagaDataT] | Callable[[SagaDataT], Awaitable[SagaDataT | None]]

saga_flow에 넣을 수 있는 아이템 유니온 타입.

SagaStep

개별 saga step. 연산자로 Transaction, Parallel을 구성한다.

Transaction

commit + compensate 쌍. >> 연산자의 결과.

Parallel

동시 실행 그룹. & 연산자의 결과.

SagaFlow

전체 사가 흐름 정의.

timeout(duration)

사가 전체 타임아웃을 설정한다.

v1 제약: 타임아웃이 parallel() 그룹 실행 도중 만료되면, 그 그룹 내에서 이미 성공했으나 compensable 리스트에 등록되기 전(gather 반환 전) 상태였던 side-effect는 보상되지 않는다. 순차 step이나 이미 완료된 parallel 그룹의 commit된 step은 정상 보상된다.

Source code in core/spakky-saga/src/spakky/saga/flow.py
def timeout(self, duration: timedelta) -> Self:
    """사가 전체 타임아웃을 설정한다.

    v1 제약: 타임아웃이 `parallel()` 그룹 실행 도중 만료되면, 그 그룹 내에서
    이미 성공했으나 compensable 리스트에 등록되기 전(gather 반환 전) 상태였던
    side-effect는 보상되지 않는다. 순차 step이나 이미 완료된 parallel 그룹의
    commit된 step은 정상 보상된다.
    """
    return replace(self, saga_timeout=duration)

on_compensation_failure(handler)

보상 실패 시 에스컬레이션 핸들러를 설정한다.

Source code in core/spakky-saga/src/spakky/saga/flow.py
def on_compensation_failure(
    self,
    handler: Callable[[SagaDataT], Awaitable[None]],
) -> Self:
    """보상 실패 시 에스컬레이션 핸들러를 설정한다."""
    return replace(self, compensation_failure_handler=handler)

step(action, *, compensate=None, on_error=None, timeout=None)

commit-compensate 바인딩을 생성한다.

Parameters:

Name Type Description Default
action Callable[[SagaDataT], Awaitable[SagaDataT | None]]

commit 액션 함수.

required
compensate Callable[[SagaDataT], Awaitable[None]] | None

보상 함수. 지정 시 Transaction을 반환한다.

None
on_error ErrorStrategy | None

에러 전략. 미지정 시 Compensate.

None
timeout timedelta | None

step 타임아웃.

None

Returns:

Type Description
SagaStep[SagaDataT] | Transaction[SagaDataT]

compensate 미지정 시 SagaStep, 지정 시 Transaction.

Source code in core/spakky-saga/src/spakky/saga/flow.py
def step[SagaDataT: AbstractSagaData](
    action: Callable[[SagaDataT], Awaitable[SagaDataT | None]],
    *,
    compensate: Callable[[SagaDataT], Awaitable[None]] | None = None,
    on_error: ErrorStrategy | None = None,
    timeout: timedelta | None = None,
) -> SagaStep[SagaDataT] | Transaction[SagaDataT]:
    """commit-compensate 바인딩을 생성한다.

    Args:
        action: commit 액션 함수.
        compensate: 보상 함수. 지정 시 Transaction을 반환한다.
        on_error: 에러 전략. 미지정 시 Compensate.
        timeout: step 타임아웃.

    Returns:
        compensate 미지정 시 SagaStep, 지정 시 Transaction.
    """
    resolved_on_error: ErrorStrategy = (
        on_error if on_error is not None else Compensate()
    )
    if compensate is not None:
        return Transaction(
            action=action,
            compensate=compensate,
            on_error=resolved_on_error,
            timeout=timeout,
        )
    return SagaStep(
        action=action,
        on_error=resolved_on_error,
        timeout=timeout,
    )

parallel(*items)

동시 실행 그룹을 구성한다.

Callable은 SagaStep으로 자동 승격된다.

Parameters:

Name Type Description Default
*items SagaStep[SagaDataT] | Transaction[SagaDataT] | Parallel[SagaDataT] | Callable[[SagaDataT], Awaitable[SagaDataT | None]]

병렬 실행할 FlowItem들. 최소 2개 필요.

()

Raises:

Type Description
SagaFlowDefinitionError

아이템이 2개 미만일 때.

Source code in core/spakky-saga/src/spakky/saga/flow.py
def parallel[SagaDataT: AbstractSagaData](
    *items: SagaStep[SagaDataT]
    | Transaction[SagaDataT]
    | Parallel[SagaDataT]
    | Callable[[SagaDataT], Awaitable[SagaDataT | None]],
) -> Parallel[SagaDataT]:
    """동시 실행 그룹을 구성한다.

    Callable은 SagaStep으로 자동 승격된다.

    Args:
        *items: 병렬 실행할 FlowItem들. 최소 2개 필요.

    Raises:
        SagaFlowDefinitionError: 아이템이 2개 미만일 때.
    """
    if len(items) < _MIN_PARALLEL_ITEMS:
        raise SagaFlowDefinitionError
    promoted: list[SagaStep[SagaDataT] | Transaction[SagaDataT]] = []
    for item in items:
        if isinstance(item, (SagaStep, Transaction)):
            promoted.append(item)
        elif isinstance(item, Parallel):
            promoted.extend(item.items)
        elif callable(item):
            promoted.append(SagaStep(action=item))
        else:
            raise SagaFlowDefinitionError
    return Parallel(items=tuple(promoted))

saga_flow(*items)

사가 흐름을 정의한다.

Callable은 SagaStep으로 자동 승격된다.

Parameters:

Name Type Description Default
*items SagaStep[SagaDataT] | Transaction[SagaDataT] | Parallel[SagaDataT] | Callable[[SagaDataT], Awaitable[SagaDataT | None]]

순차 실행할 FlowItem들. 최소 1개 필요.

()

Raises:

Type Description
SagaFlowDefinitionError

아이템이 비어있을 때.

Source code in core/spakky-saga/src/spakky/saga/flow.py
def saga_flow[SagaDataT: AbstractSagaData](
    *items: SagaStep[SagaDataT]
    | Transaction[SagaDataT]
    | Parallel[SagaDataT]
    | Callable[[SagaDataT], Awaitable[SagaDataT | None]],
) -> SagaFlow[SagaDataT]:
    """사가 흐름을 정의한다.

    Callable은 SagaStep으로 자동 승격된다.

    Args:
        *items: 순차 실행할 FlowItem들. 최소 1개 필요.

    Raises:
        SagaFlowDefinitionError: 아이템이 비어있을 때.
    """
    if len(items) == 0:
        raise SagaFlowDefinitionError
    promoted: list[
        SagaStep[SagaDataT] | Transaction[SagaDataT] | Parallel[SagaDataT]
    ] = []
    for item in items:
        if isinstance(item, (SagaStep, Transaction, Parallel)):
            promoted.append(item)
        elif callable(item):
            promoted.append(SagaStep(action=item))
        else:
            raise SagaFlowDefinitionError
    return SagaFlow(items=tuple(promoted))

에러 전략

Error strategy types for saga step failure handling.

ErrorStrategy = Compensate | Skip | Retry

에러 전략 유니온 타입.

Compensate

역순 보상을 트리거한다 (기본 전략).

Skip

실패를 무시하고 다음 step으로 진행한다.

ExponentialBackoff

지수 백오프 전략.

delay_for(attempt)

attempt(1-indexed) 직전에 대기할 초 단위 지연을 반환한다.

Parameters:

Name Type Description Default
attempt int

시도 번호 (1-indexed). attempt=1 → base, attempt=2 → base*2, ...

required

Returns:

Name Type Description
float float

base * 2^(attempt-1).

Source code in core/spakky-saga/src/spakky/saga/strategy.py
def delay_for(self, attempt: int) -> float:
    """attempt(1-indexed) 직전에 대기할 초 단위 지연을 반환한다.

    Args:
        attempt: 시도 번호 (1-indexed). attempt=1 → base, attempt=2 → base*2, ...

    Returns:
        float: base * 2^(attempt-1).
    """
    return self.base * (2 ** (attempt - 1))

Retry

지정 횟수만큼 재시도 후 then 전략을 적용한다.

결과

Saga execution result.

StepStatus

Bases: Enum

개별 step의 실행 상태.

StepRecord

단일 step의 실행 기록.

SagaResult

사가 실행 결과. 예외를 발생시키지 않고 결과를 전달한다.

상태

Saga execution status.

SagaStatus

Bases: Enum

Saga 실행 상태를 나타내는 열거형.

엔진

Saga execution engine — sequential/parallel execution, retry, timeout, compensation.

SagaExecutor(flow, data, saga_name=_ANONYMOUS_SAGA_NAME)

사가 실행 오케스트레이터.

SagaFlow를 입력받아 step 단위로 실행하고, 실패 시 on_error 전략을 적용하며, 필요 시 역순 보상을 수행한다. 공개 API는 run_saga_flow(flow, data)를 통한 얇은 래퍼로 노출된다.

Source code in core/spakky-saga/src/spakky/saga/engine.py
def __init__(
    self,
    flow: SagaFlow[SagaDataT],
    data: SagaDataT,
    saga_name: str = _ANONYMOUS_SAGA_NAME,
) -> None:
    self._flow = flow
    self._data: SagaDataT = data
    self._saga_name = saga_name
    self._compensable: list[tuple[str, CompensateFn[SagaDataT]]] = []
    self._history: list[StepRecord] = []
    self._saga_start: float = 0.0

run() async

사가를 실행하고 결과를 반환한다.

Source code in core/spakky-saga/src/spakky/saga/engine.py
async def run(self) -> SagaResult[SagaDataT]:
    """사가를 실행하고 결과를 반환한다."""
    self._saga_start = monotonic()
    logger.info("[saga=%s status=started]", self._saga_name)
    try:
        normalized = self._normalize(self._flow.items)
        saga_timeout = self._flow.saga_timeout
        if saga_timeout is None:
            result = await self._run_items(normalized)
        else:
            result = await self._run_with_saga_timeout(normalized, saga_timeout)
        logger.info(
            "[saga=%s status=%s elapsed=%s]",
            self._saga_name,
            result.status.value,
            _format_ms(result.elapsed),
        )
        return result
    except BaseException as error:
        elapsed = timedelta(seconds=monotonic() - self._saga_start)
        logger.warning(
            "[saga=%s status=aborted error=%s elapsed=%s]",
            self._saga_name,
            type(error).__name__,
            _format_ms(elapsed),
        )
        raise

run_saga_flow(flow, data, *, saga_name=_ANONYMOUS_SAGA_NAME) async

SagaFlow를 실행하고 결과를 반환한다.

Parameters:

Name Type Description Default
flow SagaFlow[SagaDataT]

사가 흐름 정의.

required
data SagaDataT

초기 사가 비즈니스 데이터.

required
saga_name str

구조화 로그에 포함될 사가 이름. 기본값은 익명 표기.

_ANONYMOUS_SAGA_NAME

Returns:

Type Description
SagaResult[SagaDataT]

SagaResult[SagaDataT]: 사가 실행 결과. 예외를 발생시키지 않는다 (SagaCompensationFailedError 제외).

Raises:

Type Description
SagaCompensationFailedError

보상 실행 중 에러 발생 (on_compensation_failure 미설정 시).

Source code in core/spakky-saga/src/spakky/saga/engine.py
async def run_saga_flow[SagaDataT: AbstractSagaData](
    flow: SagaFlow[SagaDataT],
    data: SagaDataT,
    *,
    saga_name: str = _ANONYMOUS_SAGA_NAME,
) -> SagaResult[SagaDataT]:
    """SagaFlow를 실행하고 결과를 반환한다.

    Args:
        flow: 사가 흐름 정의.
        data: 초기 사가 비즈니스 데이터.
        saga_name: 구조화 로그에 포함될 사가 이름. 기본값은 익명 표기.

    Returns:
        SagaResult[SagaDataT]: 사가 실행 결과. 예외를 발생시키지 않는다
            (SagaCompensationFailedError 제외).

    Raises:
        SagaCompensationFailedError: 보상 실행 중 에러 발생
            (on_compensation_failure 미설정 시).
    """
    return await SagaExecutor(flow, data, saga_name=saga_name).run()

에러

Spakky Saga error hierarchy.

AbstractSpakkySagaError

Bases: AbstractSpakkyFrameworkError, ABC

Base class for all spakky-saga errors.

SagaFlowDefinitionError

Bases: AbstractSpakkySagaError

Raised when a saga flow definition is invalid (static validation).

SagaCompensationFailedError

Bases: AbstractSpakkySagaError

Raised when compensation fails during saga rollback.

SagaStepTimeoutError

Bases: AbstractSpakkySagaError

Raised internally when a step exceeds its timeout (routed through on_error).

SagaParallelMergeConflictError

Bases: AbstractSpakkySagaError

Raised when parallel steps modify the same field during data merge.

SagaEngineNotConnectedError

Bases: AbstractSpakkySagaError

Raised when execute() is called before the saga engine is connected.