spakky-saga¶
사가 오케스트레이션 — SagaFlow, SagaStep, 보상 기반 롤백
스테레오타입¶
Saga stereotype for distributed transaction orchestration.
This module provides @Saga stereotype for organizing classes that implement saga orchestration logic.
Saga(*, name='', scope=Scope.SINGLETON)
dataclass
¶
Bases: Pod
Stereotype for saga orchestration classes.
Sagas represent distributed transaction orchestrations, coordinating multiple services through sequential/parallel steps with compensation logic.
기반 클래스¶
AbstractSaga base class + @saga_step 데코레이터.
사용자는 saga 메서드에 @saga_step을 명시적으로 적용한다. 데코레이터가
_SagaStepDescriptor[SagaDataT]를 반환하므로 타입체커는 클래스 속성 수준에서
명확한 타입을 인식하고, instance access 시 overload된 __get__이
SagaStep[SagaDataT]를 반환한다고 판단한다. 그 결과 self.method >> ...,
self.method & ..., self.method | ... 같은 연산자 표현이 타입 안전하게 작동한다.
AbstractSaga
¶
Bases: ABC
사가를 정의하는 제네릭 베이스 클래스.
서브클래스는 flow()를 구현하여 사가 흐름을 선언적으로 정의한다. 사가의
step 역할을 하는 async 메서드에는 @saga_step 데코레이터를 붙여 연산자
>>, &, | 사용을 타입 안전하게 활성화한다.
Example
@Saga() class CreateOrderSaga(AbstractSaga[CreateOrderSagaData]): @saga_step async def create_ticket(self, data): ...
@saga_step
async def cancel_ticket(self, data):
...
def flow(self) -> SagaFlow[CreateOrderSagaData]:
return SagaFlow(items=(
self.create_ticket >> self.cancel_ticket,
))
flow()
abstractmethod
¶
execute(data)
async
¶
사가를 실행한다.
SagaFlow에 정의된 step들을 순차 실행하고, 실패 시 compensate가 있는 step만 역순으로 보상을 실행한다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
SagaDataT
|
사가 비즈니스 데이터. |
required |
Returns:
| Type | Description |
|---|---|
SagaResult[SagaDataT]
|
SagaResult[SagaDataT]: 사가 실행 결과. |
Raises:
| Type | Description |
|---|---|
SagaCompensationFailedError
|
보상 실행 중 에러 발생 (on_compensation_failure 미설정 시). |
Source code in core/spakky-saga/src/spakky/saga/base.py
saga_step(fn)
¶
사가 step 메서드를 descriptor로 감싼다.
데코레이트된 메서드에 instance-level로 접근하면 SagaStep이 반환되어
>>, &, | 연산자로 Transaction/Parallel/에러 전략을 구성할 수 있다.
Example
@Saga() class CreateOrderSaga(AbstractSaga[OrderData]): @saga_step async def issue_ticket(self, data: OrderData) -> OrderData: ...
@saga_step
async def cancel_ticket(self, data: OrderData) -> None: ...
def flow(self) -> SagaFlow[OrderData]:
return SagaFlow(items=(self.issue_ticket >> self.cancel_ticket,))
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fn
|
Callable[[_SelfT, SagaDataT], Awaitable[SagaDataT | None]]
|
데코레이트할 async 메서드. |
required |
Returns:
| Type | Description |
|---|---|
_SagaStepDescriptor[SagaDataT]
|
|
Source code in core/spakky-saga/src/spakky/saga/base.py
데이터¶
Saga business data model.
AbstractSagaData
¶
Bases: AbstractDomainModel, ABC
사가의 비즈니스 데이터. 엔진 상태를 포함하지 않는다.
흐름¶
Saga flow composition types, type aliases, and builder functions.
ActionFn = Callable[[SagaDataT], Awaitable[SagaDataT | None]]
¶
commit 액션 함수 시그니처. data를 변환하거나 None을 반환한다.
CompensateFn = Callable[[SagaDataT], Awaitable[None]]
¶
보상 함수 시그니처. 부수효과만 수행한다.
FlowItem = SagaStep[SagaDataT] | Transaction[SagaDataT] | Parallel[SagaDataT] | Callable[[SagaDataT], Awaitable[SagaDataT | None]]
¶
saga_flow에 넣을 수 있는 아이템 유니온 타입.
SagaStep
¶
개별 saga step. 연산자로 Transaction, Parallel을 구성한다.
Transaction
¶
commit + compensate 쌍. >> 연산자의 결과.
Parallel
¶
동시 실행 그룹. & 연산자의 결과.
SagaFlow
¶
전체 사가 흐름 정의.
timeout(duration)
¶
사가 전체 타임아웃을 설정한다.
v1 제약: 타임아웃이 parallel() 그룹 실행 도중 만료되면, 그 그룹 내에서
이미 성공했으나 compensable 리스트에 등록되기 전(gather 반환 전) 상태였던
side-effect는 보상되지 않는다. 순차 step이나 이미 완료된 parallel 그룹의
commit된 step은 정상 보상된다.
Source code in core/spakky-saga/src/spakky/saga/flow.py
on_compensation_failure(handler)
¶
step(action, *, compensate=None, on_error=None, timeout=None)
¶
commit-compensate 바인딩을 생성한다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
action
|
Callable[[SagaDataT], Awaitable[SagaDataT | None]]
|
commit 액션 함수. |
required |
compensate
|
Callable[[SagaDataT], Awaitable[None]] | None
|
보상 함수. 지정 시 Transaction을 반환한다. |
None
|
on_error
|
ErrorStrategy | None
|
에러 전략. 미지정 시 Compensate. |
None
|
timeout
|
timedelta | None
|
step 타임아웃. |
None
|
Returns:
| Type | Description |
|---|---|
SagaStep[SagaDataT] | Transaction[SagaDataT]
|
compensate 미지정 시 SagaStep, 지정 시 Transaction. |
Source code in core/spakky-saga/src/spakky/saga/flow.py
parallel(*items)
¶
동시 실행 그룹을 구성한다.
Callable은 SagaStep으로 자동 승격된다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*items
|
SagaStep[SagaDataT] | Transaction[SagaDataT] | Parallel[SagaDataT] | Callable[[SagaDataT], Awaitable[SagaDataT | None]]
|
병렬 실행할 FlowItem들. 최소 2개 필요. |
()
|
Raises:
| Type | Description |
|---|---|
SagaFlowDefinitionError
|
아이템이 2개 미만일 때. |
Source code in core/spakky-saga/src/spakky/saga/flow.py
saga_flow(*items)
¶
사가 흐름을 정의한다.
Callable은 SagaStep으로 자동 승격된다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*items
|
SagaStep[SagaDataT] | Transaction[SagaDataT] | Parallel[SagaDataT] | Callable[[SagaDataT], Awaitable[SagaDataT | None]]
|
순차 실행할 FlowItem들. 최소 1개 필요. |
()
|
Raises:
| Type | Description |
|---|---|
SagaFlowDefinitionError
|
아이템이 비어있을 때. |
Source code in core/spakky-saga/src/spakky/saga/flow.py
에러 전략¶
Error strategy types for saga step failure handling.
ErrorStrategy = Compensate | Skip | Retry
¶
에러 전략 유니온 타입.
Compensate
¶
역순 보상을 트리거한다 (기본 전략).
Skip
¶
실패를 무시하고 다음 step으로 진행한다.
ExponentialBackoff
¶
지수 백오프 전략.
delay_for(attempt)
¶
attempt(1-indexed) 직전에 대기할 초 단위 지연을 반환한다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
attempt
|
int
|
시도 번호 (1-indexed). attempt=1 → base, attempt=2 → base*2, ... |
required |
Returns:
| Name | Type | Description |
|---|---|---|
float |
float
|
base * 2^(attempt-1). |
Source code in core/spakky-saga/src/spakky/saga/strategy.py
Retry
¶
지정 횟수만큼 재시도 후 then 전략을 적용한다.
결과¶
상태¶
Saga execution status.
SagaStatus
¶
Bases: Enum
Saga 실행 상태를 나타내는 열거형.
엔진¶
Saga execution engine — sequential/parallel execution, retry, timeout, compensation.
SagaExecutor(flow, data, saga_name=_ANONYMOUS_SAGA_NAME)
¶
사가 실행 오케스트레이터.
SagaFlow를 입력받아 step 단위로 실행하고, 실패 시 on_error 전략을 적용하며,
필요 시 역순 보상을 수행한다. 공개 API는 run_saga_flow(flow, data)를 통한 얇은
래퍼로 노출된다.
Source code in core/spakky-saga/src/spakky/saga/engine.py
run()
async
¶
사가를 실행하고 결과를 반환한다.
Source code in core/spakky-saga/src/spakky/saga/engine.py
run_saga_flow(flow, data, *, saga_name=_ANONYMOUS_SAGA_NAME)
async
¶
SagaFlow를 실행하고 결과를 반환한다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
flow
|
SagaFlow[SagaDataT]
|
사가 흐름 정의. |
required |
data
|
SagaDataT
|
초기 사가 비즈니스 데이터. |
required |
saga_name
|
str
|
구조화 로그에 포함될 사가 이름. 기본값은 익명 표기. |
_ANONYMOUS_SAGA_NAME
|
Returns:
| Type | Description |
|---|---|
SagaResult[SagaDataT]
|
SagaResult[SagaDataT]: 사가 실행 결과. 예외를 발생시키지 않는다 (SagaCompensationFailedError 제외). |
Raises:
| Type | Description |
|---|---|
SagaCompensationFailedError
|
보상 실행 중 에러 발생 (on_compensation_failure 미설정 시). |
Source code in core/spakky-saga/src/spakky/saga/engine.py
에러¶
Spakky Saga error hierarchy.
AbstractSpakkySagaError
¶
Bases: AbstractSpakkyFrameworkError, ABC
Base class for all spakky-saga errors.
SagaFlowDefinitionError
¶
SagaCompensationFailedError
¶
SagaStepTimeoutError
¶
Bases: AbstractSpakkySagaError
Raised internally when a step exceeds its timeout (routed through on_error).