spakky-saga¶
spakky-saga는 보상 기반 분산 트랜잭션 흐름을 선언하고 실행하는 Saga 계약을 제공합니다.
사가 오케스트레이션 — SagaFlow, SagaStep, 보상 기반 롤백
플러그인 진입점¶
Plugin initialization entry point.
initialize(app)
¶
Initialize the spakky-saga plugin.
@Saga() 스테레오타입은 Pod 하위 클래스이므로, 패키지 스캔만으로
DI 컨테이너가 사가 클래스를 자동 관리한다. 사가 실행은
AbstractSaga.execute(data) 또는 run_saga_flow(flow, data)를 통해
이루어지며, 별도 post-processor 등록이 필요하지 않다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
app
|
SpakkyApplication
|
The SpakkyApplication instance. |
required |
Source code in core/spakky-saga/src/spakky/saga/main.py
스테레오타입¶
Saga stereotype for distributed transaction orchestration.
This module provides @Saga stereotype for organizing classes that implement saga orchestration logic.
기반 클래스¶
AbstractSaga base class + @saga_step 데코레이터.
사용자는 saga 메서드에 @saga_step을 명시적으로 적용한다. 데코레이터가
_SagaStepDescriptor[SagaDataT]를 반환하므로 타입체커는 클래스 속성 수준에서
명확한 타입을 인식하고, instance access 시 overload된 __get__이
SagaStep[SagaDataT]를 반환한다고 판단한다. 그 결과 self.method >> ...,
self.method & ..., self.method | ... 같은 연산자 표현이 타입 안전하게 작동한다.
AbstractSaga
¶
Bases: ABC
사가를 정의하는 제네릭 베이스 클래스.
서브클래스는 flow()를 구현하여 사가 흐름을 선언적으로 정의한다. 사가의
step 역할을 하는 async 메서드에는 @saga_step 데코레이터를 붙여 연산자
>>, &, | 사용을 타입 안전하게 활성화한다.
Example
@Saga() class CreateOrderSaga(AbstractSaga[CreateOrderSagaData]): @saga_step async def create_ticket(self, data): ...
@saga_step
async def cancel_ticket(self, data):
...
def flow(self) -> SagaFlow[CreateOrderSagaData]:
return SagaFlow(items=(
self.create_ticket >> self.cancel_ticket,
))
flow()
abstractmethod
¶
execute(data, *, auth_context=None)
async
¶
사가를 실행한다.
SagaFlow에 정의된 step들을 순차 실행하고, 실패 시 compensate가 있는 step만 역순으로 보상을 실행한다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
SagaDataT
|
사가 비즈니스 데이터. |
required |
Returns:
| Type | Description |
|---|---|
SagaResult[SagaDataT]
|
SagaResult[SagaDataT]: 사가 실행 결과. |
Raises:
| Type | Description |
|---|---|
SagaCompensationFailedError
|
보상 실행 중 에러 발생 (on_compensation_failure 미설정 시). |
Source code in core/spakky-saga/src/spakky/saga/base.py
saga_step(fn)
¶
사가 step 메서드를 descriptor로 감싼다.
데코레이트된 메서드에 instance-level로 접근하면 SagaStep이 반환되어
>>, &, | 연산자로 Transaction/Parallel/에러 전략을 구성할 수 있다.
Example
@Saga() class CreateOrderSaga(AbstractSaga[OrderData]): @saga_step async def issue_ticket(self, data: OrderData) -> OrderData: ...
@saga_step
async def cancel_ticket(self, data: OrderData) -> None: ...
def flow(self) -> SagaFlow[OrderData]:
return SagaFlow(items=(self.issue_ticket >> self.cancel_ticket,))
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fn
|
Callable[[SelfT, SagaDataT], Awaitable[SagaDataT | None]]
|
데코레이트할 async 메서드. |
required |
Returns:
| Type | Description |
|---|---|
_SagaStepDescriptor[SagaDataT]
|
|
Source code in core/spakky-saga/src/spakky/saga/base.py
데이터¶
Saga business data model.
AbstractSagaData
¶
Bases: AbstractDomainModel, ABC
사가의 비즈니스 데이터. 엔진 상태를 포함하지 않는다.
auth_context_snapshot = None
class-attribute
instance-attribute
¶
Signed AuthContextSnapshot envelope propagated across saga execution.
흐름¶
Saga flow composition types, type aliases, and builder functions.
ActionFn = Callable[[SagaDataT], Awaitable[SagaDataT | None]]
¶
commit 액션 함수 시그니처. data를 변환하거나 None을 반환한다.
CompensateFn = Callable[[SagaDataT], Awaitable[None]]
¶
보상 함수 시그니처. 부수효과만 수행한다.
FlowItem = SagaStep[SagaDataT] | Transaction[SagaDataT] | Parallel[SagaDataT] | Callable[[SagaDataT], Awaitable[SagaDataT | None]]
¶
saga_flow에 넣을 수 있는 아이템 유니온 타입.
SagaStep
¶
개별 saga step. 연산자로 Transaction, Parallel을 구성한다.
Transaction
¶
commit + compensate 쌍. >> 연산자의 결과.
Parallel
¶
동시 실행 그룹. & 연산자의 결과.
SagaFlow
¶
전체 사가 흐름 정의.
timeout(duration)
¶
사가 전체 타임아웃을 설정한다.
v1 제약: 타임아웃이 parallel() 그룹 실행 도중 만료되면, 그 그룹 내에서
이미 성공했으나 compensable 리스트에 등록되기 전(gather 반환 전) 상태였던
side-effect는 보상되지 않는다. 순차 step이나 이미 완료된 parallel 그룹의
commit된 step은 정상 보상된다.
Source code in core/spakky-saga/src/spakky/saga/flow.py
on_compensation_failure(handler)
¶
step(action, *, compensate=None, on_error=None, timeout=None)
¶
commit-compensate 바인딩을 생성한다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
action
|
Callable[[SagaDataT], Awaitable[SagaDataT | None]]
|
commit 액션 함수. |
required |
compensate
|
Callable[[SagaDataT], Awaitable[None]] | None
|
보상 함수. 지정 시 Transaction을 반환한다. |
None
|
on_error
|
ErrorStrategy | None
|
에러 전략. 미지정 시 Compensate. |
None
|
timeout
|
timedelta | None
|
step 타임아웃. |
None
|
Returns:
| Type | Description |
|---|---|
SagaStep[SagaDataT] | Transaction[SagaDataT]
|
compensate 미지정 시 SagaStep, 지정 시 Transaction. |
Source code in core/spakky-saga/src/spakky/saga/flow.py
parallel(*items)
¶
동시 실행 그룹을 구성한다.
Callable은 SagaStep으로 자동 승격된다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*items
|
SagaStep[SagaDataT] | Transaction[SagaDataT] | Parallel[SagaDataT] | Callable[[SagaDataT], Awaitable[SagaDataT | None]]
|
병렬 실행할 FlowItem들. 최소 2개 필요. |
()
|
Raises:
| Type | Description |
|---|---|
SagaFlowDefinitionError
|
아이템이 2개 미만일 때. |
Source code in core/spakky-saga/src/spakky/saga/flow.py
saga_flow(*items)
¶
사가 흐름을 정의한다.
Callable은 SagaStep으로 자동 승격된다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*items
|
SagaStep[SagaDataT] | Transaction[SagaDataT] | Parallel[SagaDataT] | Callable[[SagaDataT], Awaitable[SagaDataT | None]]
|
순차 실행할 FlowItem들. 최소 1개 필요. |
()
|
Raises:
| Type | Description |
|---|---|
SagaFlowDefinitionError
|
아이템이 비어있을 때. |
Source code in core/spakky-saga/src/spakky/saga/flow.py
에러 전략¶
Error strategy types for saga step failure handling.
ErrorStrategy = Compensate | Skip | Retry
¶
에러 전략 유니온 타입.
Compensate
¶
역순 보상을 트리거한다 (기본 전략).
Skip
¶
실패를 무시하고 다음 step으로 진행한다.
ExponentialBackoff
¶
지수 백오프 전략.
delay_for(attempt)
¶
attempt(1-indexed) 직전에 대기할 초 단위 지연을 반환한다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
attempt
|
int
|
시도 번호 (1-indexed). attempt=1 → base, attempt=2 → base*2, ... |
required |
Returns:
| Name | Type | Description |
|---|---|---|
float |
float
|
base * 2^(attempt-1). |
Source code in core/spakky-saga/src/spakky/saga/strategy.py
Retry
¶
지정 횟수만큼 재시도 후 then 전략을 적용한다.
결과¶
상태¶
Saga execution status.
SagaStatus
¶
Bases: Enum
Saga 실행 상태를 나타내는 열거형.
엔진¶
Saga execution engine — sequential/parallel execution, retry, timeout, compensation.
SagaExecutor(flow, data, saga_name=_ANONYMOUS_SAGA_NAME, auth_context=None)
¶
사가 실행 오케스트레이터.
SagaFlow를 입력받아 step 단위로 실행하고, 실패 시 on_error 전략을 적용하며,
필요 시 역순 보상을 수행한다. 공개 API는 run_saga_flow(flow, data)를 통한 얇은
래퍼로 노출된다.
Source code in core/spakky-saga/src/spakky/saga/engine.py
run()
async
¶
사가를 실행하고 결과를 반환한다.
Source code in core/spakky-saga/src/spakky/saga/engine.py
run_saga_flow(flow, data, *, saga_name=_ANONYMOUS_SAGA_NAME, auth_context=None)
async
¶
SagaFlow를 실행하고 결과를 반환한다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
flow
|
SagaFlow[SagaDataT]
|
사가 흐름 정의. |
required |
data
|
SagaDataT
|
초기 사가 비즈니스 데이터. |
required |
saga_name
|
str
|
구조화 로그에 포함될 사가 이름. 기본값은 익명 표기. |
_ANONYMOUS_SAGA_NAME
|
auth_context
|
SagaAuthExecutionContext | None
|
protected saga step enforcement에 사용할 auth provider ports. |
None
|
Returns:
| Type | Description |
|---|---|
SagaResult[SagaDataT]
|
SagaResult[SagaDataT]: 사가 실행 결과. 예외를 발생시키지 않는다 (SagaCompensationFailedError 제외). |
Raises:
| Type | Description |
|---|---|
SagaCompensationFailedError
|
보상 실행 중 에러 발생 (on_compensation_failure 미설정 시). |
Source code in core/spakky-saga/src/spakky/saga/engine.py
Auth Propagation¶
Saga auth snapshot verification and protected step enforcement.
SAGA_AUTH_BOUNDARY = 'saga'
module-attribute
¶
AuthInvocation boundary value used for protected saga steps.
SagaAuthExecutionContext(*, snapshot_verifier=None, authorization_policy_evaluator=None, permission_checker=None, relation_checker=None, role_checker=None, scope_checker=None)
dataclass
¶
Provider-neutral auth ports used while executing protected saga steps.
authorize_step(*, step_name, boundary, owner_type, data)
¶
Authorize a saga step when protected auth metadata is present.
Source code in core/spakky-saga/src/spakky/saga/auth.py
에러¶
Spakky Saga error hierarchy.
추가 모듈¶
Saga auth snapshot verification and protected step enforcement.
SAGA_AUTH_BOUNDARY = 'saga'
module-attribute
¶
AuthInvocation boundary value used for protected saga steps.
SagaAuthExecutionContext(*, snapshot_verifier=None, authorization_policy_evaluator=None, permission_checker=None, relation_checker=None, role_checker=None, scope_checker=None)
dataclass
¶
Provider-neutral auth ports used while executing protected saga steps.
authorize_step(*, step_name, boundary, owner_type, data)
¶
Authorize a saga step when protected auth metadata is present.
Source code in core/spakky-saga/src/spakky/saga/auth.py
Plugin initialization entry point.
initialize(app)
¶
Initialize the spakky-saga plugin.
@Saga() 스테레오타입은 Pod 하위 클래스이므로, 패키지 스캔만으로
DI 컨테이너가 사가 클래스를 자동 관리한다. 사가 실행은
AbstractSaga.execute(data) 또는 run_saga_flow(flow, data)를 통해
이루어지며, 별도 post-processor 등록이 필요하지 않다.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
app
|
SpakkyApplication
|
The SpakkyApplication instance. |
required |