콘텐츠로 이동

gRPC 통합

spakky-grpc는 code-first 방식의 gRPC 서비스 통합을 제공합니다. pydantic BaseModel에서 protobuf descriptor를 자동 생성하며, @GrpcController@rpc 데코레이터로 선언적으로 gRPC 서비스를 정의합니다.


동작 원리

  1. @GrpcController로 gRPC 서비스 컨트롤러를 선언
  2. @rpc로 메서드를 RPC 엔드포인트로 마크
  3. ProtoField 어노테이션으로 요청/응답 타입에 protobuf 메타데이터 부착
  4. DescriptorBuilder가 Python 타입에서 protobuf descriptor를 자동 생성
  5. PostProcessor들이 서비스 등록, 인터셉터 추가, 서버 바인딩을 자동 처리

설정

pip install spakky-grpc

spakky-grpcspakky, spakky-tracing, grpcio, protobuf, pydantic>=2.4에 의존합니다.

GrpcServerSpecDescriptorRegistry Pod를 둘 다 등록해야 합니다. PostProcessor가 인터셉터와 서비스 핸들러를 spec에 누적하고, RegisterServicesPostProcessor가 컨테이너에서 DescriptorRegistry를 조회(container.get(DescriptorRegistry))하여 생성된 protobuf descriptor를 등록하므로 DescriptorRegistry Pod는 필수입니다. start() 호출 시 ApplicationContext의 이벤트 루프에서 실제 grpc.aio.Server가 생성·구동됩니다.

import spakky.plugins.grpc
from spakky.core.application.application import SpakkyApplication
from spakky.core.application.application_context import ApplicationContext
from spakky.core.pod.annotations.pod import Pod
from spakky.plugins.grpc.schema.registry import DescriptorRegistry
from spakky.plugins.grpc.server_spec import GrpcServerSpec

import apps  # `@GrpcController`가 정의된 사용자 패키지


@Pod()
def get_spec() -> GrpcServerSpec:
    spec = GrpcServerSpec()
    spec.add_insecure_port("127.0.0.1:50051")
    return spec


@Pod()
def get_registry() -> DescriptorRegistry:
    return DescriptorRegistry()


app = (
    SpakkyApplication(ApplicationContext())
    .load_plugins(include={
        spakky.plugins.grpc.PLUGIN_NAME,
    })
    .scan(apps)
    .add(get_spec)
    .add(get_registry)
    .start()
)

서비스 정의

@GrpcController

gRPC 서비스 컨트롤러를 선언합니다. @Controller의 서브클래스이므로 DI 컨테이너에 자동 등록됩니다.

from spakky.plugins.grpc.stereotypes.grpc_controller import GrpcController

@GrpcController(package="example.user", service_name="UserService")
class UserServiceController:
    def __init__(self, user_service: UserService) -> None:
        self._user_service = user_service

    ...

@rpc

메서드를 gRPC RPC 엔드포인트로 마크합니다. RpcMethodType으로 스트리밍 모드를 지정합니다.

from spakky.plugins.grpc.decorators.rpc import rpc, RpcMethodType

@GrpcController(package="example.user", service_name="UserService")
class UserServiceController:
    @rpc(method_type=RpcMethodType.UNARY)
    async def get_user(self, request: GetUserRequest) -> GetUserResponse:
        user = await self._user_service.get_user(request.user_id)
        return GetUserResponse(user_id=user.uid, name=user.name)

RpcMethodType

설명
UNARY 단일 요청, 단일 응답
SERVER_STREAMING 단일 요청, 스트림 응답
CLIENT_STREAMING 스트림 요청, 단일 응답
BIDI_STREAMING 양방향 스트리밍

Code-First Protobuf

ProtoField

pydantic BaseModel 필드에 protobuf 필드 번호를 부착합니다. .proto 파일 없이 Python 타입만으로 protobuf descriptor를 생성합니다. DescriptorBuilderBaseModel.model_fields[name].metadata에서 ProtoField 인스턴스를 읽어오므로 메시지 타입은 반드시 pydantic.BaseModel 서브클래스여야 합니다.

from typing import Annotated
from pydantic import BaseModel
from spakky.plugins.grpc.annotations.field import ProtoField

class GetUserRequest(BaseModel):
    user_id: Annotated[str, ProtoField(number=1)]

class GetUserResponse(BaseModel):
    user_id: Annotated[str, ProtoField(number=1)]
    name: Annotated[str, ProtoField(number=2)]
    email: Annotated[str, ProtoField(number=3)]

지원되는 타입 매핑

type_map 모듈이 Python 타입을 protobuf 타입으로 자동 매핑합니다.

Python 타입 Protobuf 타입
str string
int int64
float double
bool bool
bytes bytes
list[T] repeated T
T \| None optional T (proto3 optional)
중첩 BaseModel message (재귀적으로 중첩 descriptor 생성)

지원되지 않는 타입은 UnsupportedFieldTypeError를 던집니다.

DescriptorRegistry

protobuf descriptor를 캐싱하고 관리합니다. DescriptorBuilderProtoField 어노테이션이 부착된 Python 타입에서 descriptor를 자동 생성합니다.


인터셉터

TracingInterceptor

spakky-tracing과 연동하여 gRPC 요청의 분산 트레이싱을 자동 처리합니다. 요청 메타데이터에서 traceparent 헤더를 추출하여 TraceContext를 복원합니다.

ErrorHandlingInterceptor

AbstractGrpcStatusError 서브클래스를 적절한 gRPC 상태 코드로 자동 변환합니다. 처리되지 않은 예외는 INTERNAL 상태로 매핑됩니다.


PostProcessor

GrpcServerSpec Pod를 등록하면 아래 세 PostProcessor가 순서대로 spec에 구성을 누적합니다. 실제 grpc.aio.Server 인스턴스는 start() 시점에 ApplicationContext의 이벤트 루프에서 GrpcServerSpec.build()로 생성됩니다.

PostProcessor Order 역할
RegisterServicesPostProcessor 0 @GrpcController@rpc 메서드를 generic handler로 빌드하여 spec에 추가
AddInterceptorsPostProcessor 1 ErrorHandlingInterceptor, TracingInterceptor를 spec에 추가
BindServerPostProcessor 2 GrpcServerService를 ApplicationContext에 등록하여 spec 기반으로 서버를 생성·시작·종료

에러 계층

gRPC 상태 코드 에러

AbstractGrpcStatusError를 상속하며, 각 에러가 gRPC StatusCode에 매핑됩니다.

에러 gRPC 상태 코드 설명
InvalidArgument INVALID_ARGUMENT 잘못된 요청 인자
NotFound NOT_FOUND 리소스 없음
AlreadyExists ALREADY_EXISTS 리소스 이미 존재
PermissionDenied PERMISSION_DENIED 권한 없음
Unauthenticated UNAUTHENTICATED 인증 필요
FailedPrecondition FAILED_PRECONDITION 사전 조건 미충족
Unavailable UNAVAILABLE 서비스 이용 불가
InternalError INTERNAL 내부 서버 에러

스키마 에러

에러 설명
UnsupportedFieldTypeError 지원하지 않는 protobuf 필드 타입
MissingProtoFieldAnnotationError ProtoField 어노테이션 누락
DescriptorAlreadyRegisteredError 이미 등록된 descriptor 재등록 시도

gRPC Controller에서 Saga 호출

gRPC Controller도 다른 Controller와 동일하게 Saga Pod를 생성자 주입으로 받습니다. AbstractSaga.execute(data)SagaResult[T]를 반환하므로, RPC 메서드에서는 SagaStatus를 보고 응답 메시지 또는 AbstractGrpcStatusError 서브클래스로 분기합니다. ErrorHandlingInterceptor가 이 에러를 잡아 선언된 gRPC StatusCode로 변환합니다.

from typing import Annotated
from uuid import UUID

from pydantic import BaseModel
from spakky.core.common.error import AbstractSpakkyFrameworkError
from spakky.plugins.grpc.annotations.field import ProtoField
from spakky.plugins.grpc.decorators.rpc import rpc
from spakky.plugins.grpc.error import (
    AbstractGrpcStatusError,
    FailedPrecondition,
    InternalError,
    InvalidArgument,
    Unavailable,
)
from spakky.plugins.grpc.stereotypes.grpc_controller import GrpcController
from spakky.saga import SagaStatus


class CreateOrderRequest(BaseModel):
    customer_id: Annotated[str, ProtoField(number=1)]
    total_amount: Annotated[float, ProtoField(number=2)]


class CreateOrderReply(BaseModel):
    order_id: Annotated[str, ProtoField(number=1)]
    status: Annotated[str, ProtoField(number=2)]


class OrderBusinessRuleViolation(AbstractSpakkyFrameworkError):
    """Application error raised by an expected order-domain rule."""

    message = "Order cannot be created in the current state"


class OrderDependencyUnavailable(AbstractSpakkyFrameworkError):
    """Application error raised when inventory/payment dependencies fail."""

    message = "Order dependency is unavailable"


def map_saga_failure(error: Exception | None) -> AbstractGrpcStatusError:
    if isinstance(error, OrderBusinessRuleViolation):
        return FailedPrecondition()
    if isinstance(error, OrderDependencyUnavailable):
        return Unavailable()
    return InternalError()


def require_created_order_id(data: OrderSagaData) -> UUID:
    if data.order_id is None:
        raise InternalError()
    return data.order_id


@GrpcController(package="example.order", service_name="OrderService")
class OrderGrpcController:
    def __init__(self, order_saga: OrderSaga) -> None:
        self._order_saga = order_saga

    @rpc()
    async def create_order(self, request: CreateOrderRequest) -> CreateOrderReply:
        try:
            customer_id = UUID(request.customer_id)
        except ValueError as error:
            raise InvalidArgument() from error

        result = await self._order_saga.execute(
            OrderSagaData(
                customer_id=customer_id,
                total_amount=request.total_amount,
            )
        )

        match result.status:
            case SagaStatus.COMPLETED:
                return CreateOrderReply(
                    order_id=str(require_created_order_id(result.data)),
                    status=result.status.value,
                )
            case SagaStatus.FAILED:
                raise map_saga_failure(result.error)
            case SagaStatus.TIMED_OUT:
                raise Unavailable()
            case _:
                raise InternalError()

상태 매핑은 서비스 계약에 맞게 고정합니다.

SagaStatus gRPC 에러 의미
COMPLETED 정상 응답 모든 step이 성공했고 최종 SagaData로 응답 생성
FAILED 분류 후 매핑 result.error가 기대된 도메인 실패이면 FailedPrecondition, 외부 의존성 장애이면 Unavailable, 그 외는 InternalError
TIMED_OUT Unavailable 외부 결제·재고 등 일시 장애로 재시도 가능한 실패
그 외 InternalError execute() 반환 시점에 관찰되면 안 되는 내부 상태

SagaStatus.FAILED는 step에서 발생한 임의의 예외를 result.error에 담을 수 있으므로, 모든 실패를 FailedPrecondition으로 취급하지 않습니다. InvalidArgument, NotFound, AlreadyExists 같은 더 구체적인 에러를 쓰려면 Saga step 내부의 UseCase가 반환하거나 발생시킨 애플리케이션 에러를 Controller 경계에서 분류한 뒤 해당 AbstractGrpcStatusError 서브클래스를 raise합니다. 이 에러 클래스들은 plugins/spakky-grpc/src/spakky/plugins/grpc/error.py에 정의되어 있으며, 인터셉터는 error.message를 gRPC detail로 사용합니다.


End-to-End 예제

단일 서비스를 부트스트랩하고 grpc.aio.insecure_channel로 호출하는 완성 예제입니다.

서버 정의

# apps/echo.py
from typing import Annotated

from pydantic import BaseModel
from spakky.plugins.grpc.annotations.field import ProtoField
from spakky.plugins.grpc.decorators.rpc import rpc
from spakky.plugins.grpc.stereotypes.grpc_controller import GrpcController


class EchoRequest(BaseModel):
    text: Annotated[str, ProtoField(number=1)]


class EchoReply(BaseModel):
    text: Annotated[str, ProtoField(number=1)]


@GrpcController(package="example.echo")
class EchoController:
    @rpc()
    async def echo(self, request: EchoRequest) -> EchoReply:
        return EchoReply(text=request.text)

부트스트랩

# main.py
from spakky.core.application.application import SpakkyApplication
from spakky.core.application.application_context import ApplicationContext
from spakky.core.pod.annotations.pod import Pod

import spakky.plugins.grpc
import spakky.tracing
from spakky.plugins.grpc.schema.registry import DescriptorRegistry
from spakky.plugins.grpc.server_spec import GrpcServerSpec

import apps


@Pod()
def get_spec() -> GrpcServerSpec:
    spec = GrpcServerSpec()
    spec.add_insecure_port("127.0.0.1:50051")
    return spec


@Pod()
def get_registry() -> DescriptorRegistry:
    return DescriptorRegistry()


app = (
    SpakkyApplication(ApplicationContext())
    .load_plugins(include={
        spakky.plugins.grpc.PLUGIN_NAME,
        spakky.tracing.PLUGIN_NAME,
    })
    .scan(apps)
    .add(get_spec)
    .add(get_registry)
)
app.start()

app.start() 호출 시 PostProcessor 체인이 실행되어 EchoController의 핸들러와 인터셉터가 spec에 누적되고, GrpcServerService가 ApplicationContext의 이벤트 루프 스레드에서 spec.build()로 실제 서버를 생성한 뒤 127.0.0.1:50051에서 리슨합니다.

클라이언트 호출

클라이언트는 DescriptorRegistry에서 컴파일된 protobuf 메시지 클래스를 얻어 요청을 직렬화합니다.

# client.py
import asyncio

import grpc.aio

from spakky.plugins.grpc.schema.registry import DescriptorRegistry


async def main(registry: DescriptorRegistry) -> None:
    request_cls = registry.get_message_class("example.echo.EchoRequest")
    reply_cls = registry.get_message_class("example.echo.EchoReply")

    async with grpc.aio.insecure_channel("127.0.0.1:50051") as channel:
        call = channel.unary_unary(
            "/example.echo.EchoController/echo",
            request_serializer=lambda msg: msg.SerializeToString(),
            response_deserializer=lambda data: reply_cls.FromString(data),
        )
        request = request_cls()
        request.text = "hello"
        reply = await call(request)
        print(reply.text)  # "hello"


asyncio.run(main(app.container.get(DescriptorRegistry)))

통합 테스트 전체 예제는 plugins/spakky-grpc/tests/integration/를 참고하세요. 유닛·에러·트레이싱 시나리오를 실제 grpc.aio.Server로 검증합니다.

FastAPI @ApiController와의 비교

@GrpcController는 FastAPI 플러그인의 @ApiController와 동일한 설계 철학을 따릅니다. REST에서 gRPC로 이동할 때 참고하세요.

개념적으로 동일한 점

항목 설명
스테레오타입 데코레이터 둘 다 @Controller의 서브클래스. DI 컨테이너가 자동 인식
스캔 기반 자동 등록 SpakkyApplication.scan(...)으로 컨트롤러 Pod를 탐색해 핸들러에 등록
DI 주입 생성자 인자로 @UseCase·@Repository 등 다른 Pod를 주입받음
AOP 적용 @Transactional, @logged 등 AOP Aspect가 동일하게 동작

gRPC 고유 차이점

항목 설명
GrpcServerSpec + DescriptorRegistry Pod 필수 FastAPI는 FastAPI() 인스턴스를 플러그인이 제공하지만, gRPC는 바인드 주소를 보유한 GrpcServerSpec과 protobuf descriptor 저장소인 DescriptorRegistry를 사용자가 Pod로 등록해야 함
메서드 시그니처 제약 @rpc 메서드는 요청 BaseModel 1개만 파라미터로 받음. FastAPI처럼 path/query 파라미터를 분리하지 않음 (path·query 개념이 gRPC에 없음)
메시지는 pydantic BaseModel + ProtoField pydantic.BaseModel 서브클래스 + Annotated[T, ProtoField(number=N)]로 선언. 필드 번호는 사용자가 명시. protobuf ↔ BaseModel 변환은 google.protobuf.json_format 브릿지로 수행
스트리밍 AsyncIterator[T]를 요청/응답 타입으로 사용하여 4가지 스트리밍 패턴 지원 (FastAPI는 StreamingResponse로 단방향만)
에러 → 상태 코드 매핑 HTTP 상태 코드 대신 gRPC StatusCode. AbstractGrpcStatusError 서브클래스를 ErrorHandlingInterceptor가 매핑

다음 단계