콘텐츠로 이동

gRPC 통합

spakky-grpc는 code-first 방식의 gRPC 서비스 통합을 제공합니다. pydantic BaseModel에서 protobuf descriptor를 자동 생성하며, @GrpcController@rpc 데코레이터로 선언적으로 gRPC 서비스를 정의합니다.


동작 원리

  1. @GrpcController로 gRPC 서비스 컨트롤러를 선언
  2. @rpc로 메서드를 RPC 엔드포인트로 마크
  3. ProtoField 어노테이션으로 요청/응답 타입에 protobuf 메타데이터 부착
  4. DescriptorBuilder가 Python 타입에서 protobuf descriptor를 자동 생성
  5. PostProcessor들이 서비스 등록, 인터셉터 추가, 서버 바인딩을 자동 처리

설정

pip install spakky-grpc

spakky-grpcspakky, spakky-tracing, grpcio, protobuf, pydantic>=2.4, pydantic-settings에 의존합니다.

spakky-grpc 플러그인은 GrpcConfig, GrpcServerSpec, DescriptorRegistry를 기본 Pod로 등록합니다. 서버를 리슨하려면 bind address를 환경변수로 지정합니다. bind address가 비어 있으면 descriptor와 handler 등록은 가능하지만 네트워크 listener는 열지 않습니다.

export SPAKKY_GRPC_BIND_ADDRESSES='["127.0.0.1:50051"]'
import spakky.plugins.grpc
from spakky.core.application.application import SpakkyApplication
from spakky.core.application.application_context import ApplicationContext

import apps  # `@GrpcController`가 정의된 사용자 패키지


app = (
    SpakkyApplication(ApplicationContext())
    .load_plugins(include={
        spakky.plugins.grpc.PLUGIN_NAME,
    })
    .scan(apps)
    .start()
)

서비스 정의

@GrpcController

gRPC 서비스 컨트롤러를 선언합니다. @Controller의 서브클래스이므로 DI 컨테이너에 자동 등록됩니다.

from spakky.plugins.grpc.stereotypes.grpc_controller import GrpcController

@GrpcController(package="example.user", service_name="UserService")
class UserServiceController:
    def __init__(self, user_service: UserService) -> None:
        self._user_service = user_service

    ...

@rpc

메서드를 gRPC RPC 엔드포인트로 마크합니다. RpcMethodType으로 스트리밍 모드를 지정합니다.

from spakky.plugins.grpc.decorators.rpc import rpc, RpcMethodType

@GrpcController(package="example.user", service_name="UserService")
class UserServiceController:
    @rpc(method_type=RpcMethodType.UNARY)
    async def get_user(self, request: GetUserRequest) -> GetUserResponse:
        user = await self._user_service.get_user(request.user_id)
        return GetUserResponse(user_id=user.uid, name=user.name)

RpcMethodType

설명
UNARY 단일 요청, 단일 응답
SERVER_STREAMING 단일 요청, 스트림 응답
CLIENT_STREAMING 스트림 요청, 단일 응답
BIDI_STREAMING 양방향 스트리밍

Code-First Protobuf

ProtoField

pydantic BaseModel 필드에 protobuf 필드 번호를 부착합니다. .proto 파일 없이 Python 타입만으로 protobuf descriptor를 생성합니다. DescriptorBuilderBaseModel.model_fields[name].metadata에서 ProtoField 인스턴스를 읽어오므로 메시지 타입은 반드시 pydantic.BaseModel 서브클래스여야 합니다.

from typing import Annotated
from pydantic import BaseModel
from spakky.plugins.grpc.annotations.field import ProtoField

class GetUserRequest(BaseModel):
    user_id: Annotated[str, ProtoField(number=1)]

class GetUserResponse(BaseModel):
    user_id: Annotated[str, ProtoField(number=1)]
    name: Annotated[str, ProtoField(number=2)]
    email: Annotated[str, ProtoField(number=3)]

지원되는 타입 매핑

type_map 모듈이 Python 타입을 protobuf 타입으로 자동 매핑합니다.

Python 타입 Protobuf 타입
str string
int int64
float double
bool bool
bytes bytes
list[T] repeated T
T \| None optional T (proto3 optional)
중첩 BaseModel message (재귀적으로 중첩 descriptor 생성)

지원되지 않는 타입은 UnsupportedFieldTypeError를 던집니다.

DescriptorRegistry

protobuf descriptor를 캐싱하고 관리합니다. DescriptorBuilderProtoField 어노테이션이 부착된 Python 타입에서 descriptor를 자동 생성합니다.


인터셉터

TracingInterceptor

spakky-tracing과 연동하여 gRPC 요청의 분산 트레이싱을 자동 처리합니다. 요청 메타데이터에서 traceparent 헤더를 추출하여 TraceContext를 복원합니다.

ErrorHandlingInterceptor

AbstractGrpcStatusError 서브클래스를 적절한 gRPC 상태 코드로 자동 변환합니다. 처리되지 않은 예외는 INTERNAL 상태로 매핑됩니다.


PostProcessor

spakky-grpc 플러그인이 제공하는 GrpcServerSpec Pod에 아래 세 PostProcessor가 순서대로 구성을 누적합니다. 실제 grpc.aio.Server 인스턴스는 start() 시점에 ApplicationContext의 이벤트 루프에서 GrpcServerSpec.build()로 생성됩니다.

PostProcessor Order 역할
RegisterServicesPostProcessor 0 @GrpcController@rpc 메서드를 generic handler로 빌드하여 spec에 추가
AddInterceptorsPostProcessor 1 ErrorHandlingInterceptor, TracingInterceptor를 spec에 추가
BindServerPostProcessor 2 GrpcServerService를 ApplicationContext에 등록하여 spec 기반으로 서버를 생성·시작·종료

에러 계층

gRPC 상태 코드 에러

AbstractGrpcStatusError를 상속하며, 각 에러가 gRPC StatusCode에 매핑됩니다.

에러 gRPC 상태 코드 설명
InvalidArgument INVALID_ARGUMENT 잘못된 요청 인자
NotFound NOT_FOUND 리소스 없음
AlreadyExists ALREADY_EXISTS 리소스 이미 존재
PermissionDenied PERMISSION_DENIED 권한 없음
Unauthenticated UNAUTHENTICATED 인증 필요
FailedPrecondition FAILED_PRECONDITION 사전 조건 미충족
Unavailable UNAVAILABLE 서비스 이용 불가
InternalError INTERNAL 내부 서버 에러

스키마 에러

에러 설명
UnsupportedFieldTypeError 지원하지 않는 protobuf 필드 타입
MissingProtoFieldAnnotationError ProtoField 어노테이션 누락
DescriptorAlreadyRegisteredError 이미 등록된 descriptor 재등록 시도

End-to-End 예제

단일 서비스를 부트스트랩하고 grpc.aio.insecure_channel로 호출하는 완성 예제입니다.

서버 정의

# apps/echo.py
from typing import Annotated

from pydantic import BaseModel
from spakky.plugins.grpc.annotations.field import ProtoField
from spakky.plugins.grpc.decorators.rpc import rpc
from spakky.plugins.grpc.stereotypes.grpc_controller import GrpcController


class EchoRequest(BaseModel):
    text: Annotated[str, ProtoField(number=1)]


class EchoReply(BaseModel):
    text: Annotated[str, ProtoField(number=1)]


@GrpcController(package="example.echo")
class EchoController:
    @rpc()
    async def echo(self, request: EchoRequest) -> EchoReply:
        return EchoReply(text=request.text)

부트스트랩

# main.py
from spakky.core.application.application import SpakkyApplication
from spakky.core.application.application_context import ApplicationContext

import spakky.plugins.grpc
import spakky.tracing

import apps


app = (
    SpakkyApplication(ApplicationContext())
    .load_plugins(include={
        spakky.plugins.grpc.PLUGIN_NAME,
        spakky.tracing.PLUGIN_NAME,
    })
    .scan(apps)
)
app.start()

SPAKKY_GRPC_BIND_ADDRESSES='["127.0.0.1:50051"]'가 설정되어 있으면 app.start() 호출 시 PostProcessor 체인이 실행되어 EchoController의 핸들러와 인터셉터가 spec에 누적되고, GrpcServerService가 ApplicationContext의 이벤트 루프 스레드에서 spec.build()로 실제 서버를 생성한 뒤 127.0.0.1:50051에서 리슨합니다.

클라이언트 호출

클라이언트는 DescriptorRegistry에서 컴파일된 protobuf 메시지 클래스를 얻어 요청을 직렬화합니다.

# client.py
import asyncio

import grpc.aio

from spakky.plugins.grpc.schema.registry import DescriptorRegistry


async def main(registry: DescriptorRegistry) -> None:
    request_cls = registry.get_message_class("example.echo.EchoRequest")
    reply_cls = registry.get_message_class("example.echo.EchoReply")

    async with grpc.aio.insecure_channel("127.0.0.1:50051") as channel:
        call = channel.unary_unary(
            "/example.echo.EchoController/echo",
            request_serializer=lambda msg: msg.SerializeToString(),
            response_deserializer=lambda data: reply_cls.FromString(data),
        )
        request = request_cls()
        request.text = "hello"
        reply = await call(request)
        print(reply.text)  # "hello"


asyncio.run(main(app.container.get(DescriptorRegistry)))

통합 테스트 전체 예제는 plugins/spakky-grpc/tests/integration/를 참고하세요. 유닛·에러·트레이싱 시나리오를 실제 grpc.aio.Server로 검증합니다.

FastAPI @ApiController와의 비교

@GrpcController는 FastAPI 플러그인의 @ApiController와 동일한 설계 철학을 따릅니다. REST에서 gRPC로 이동할 때 참고하세요.

개념적으로 동일한 점

항목 설명
스테레오타입 데코레이터 둘 다 @Controller의 서브클래스. DI 컨테이너가 자동 인식
스캔 기반 자동 등록 SpakkyApplication.scan(...)으로 컨트롤러 Pod를 탐색해 핸들러에 등록
DI 주입 생성자 인자로 @UseCase·@Repository 등 다른 Pod를 주입받음
AOP 적용 @Transactional, @logged 등 AOP Aspect가 동일하게 동작

gRPC 고유 차이점

항목 설명
SPAKKY_GRPC_BIND_ADDRESSES FastAPI와 마찬가지로 런타임 공유 객체는 플러그인이 제공하고, gRPC listener 주소만 환경 설정으로 지정함
메서드 시그니처 제약 @rpc 메서드는 요청 BaseModel 1개만 파라미터로 받음. FastAPI처럼 path/query 파라미터를 분리하지 않음 (path·query 개념이 gRPC에 없음)
메시지는 pydantic BaseModel + ProtoField pydantic.BaseModel 서브클래스 + Annotated[T, ProtoField(number=N)]로 선언. 필드 번호는 사용자가 명시. protobuf ↔ BaseModel 변환은 google.protobuf.json_format 브릿지로 수행
스트리밍 AsyncIterator[T]를 요청/응답 타입으로 사용하여 4가지 스트리밍 패턴 지원 (FastAPI는 StreamingResponse로 단방향만)
에러 → 상태 코드 매핑 HTTP 상태 코드 대신 gRPC StatusCode. AbstractGrpcStatusError 서브클래스를 ErrorHandlingInterceptor가 매핑

다음 단계