
spakky-grpc

gRPC service controller integration: code-first, type-safe protocol generation

Stereotypes

gRPC controller stereotype for service grouping.

Provides the @GrpcController stereotype for marking classes as gRPC service controllers with automatic service registration and protobuf package configuration.

GrpcController(package, service_name=None, *, name='', scope=Scope.SINGLETON) dataclass

Bases: Controller

Stereotype for gRPC service controllers.

Marks a class as a gRPC service controller with automatic service registration. Methods decorated with @rpc will be registered as gRPC service methods.

Attributes:

Name Type Description
package str

Protobuf package name for the service.

service_name str | None

gRPC service name. Defaults to the class name if not provided.

package instance-attribute

Protobuf package name for the service.

service_name = None class-attribute instance-attribute

gRPC service name. Defaults to the class name.

__call__(obj)

Apply the gRPC controller stereotype to a class.

Automatically generates the service name from the class name if not provided.

Parameters:

Name Type Description Default
obj AnyT

The class to decorate.

required

Returns:

Type Description
AnyT

The decorated class registered as a Pod.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/stereotypes/grpc_controller.py
def __call__(self, obj: AnyT) -> AnyT:
    """Apply the gRPC controller stereotype to a class.

    Automatically generates the service name from the class name
    if not provided.

    Args:
        obj: The class to decorate.

    Returns:
        The decorated class registered as a Pod.
    """
    if self.service_name is None:
        self.service_name = obj.__name__  # type: ignore[union-attr] - generic AnyT is not narrowed to a class type, so the __name__ access is a false positive
    return super().__call__(obj)

Decorators

RPC method decorator for gRPC service methods.

Provides the @rpc decorator for marking controller methods as gRPC service methods with support for all four gRPC streaming patterns.

RpcMethodType

Bases: StrEnum

gRPC method streaming patterns.

Attributes:

Name Type Description
UNARY

Single request, single response.

SERVER_STREAMING

Single request, stream of responses.

CLIENT_STREAMING

Stream of requests, single response.

BIDI_STREAMING

Stream of requests, stream of responses.
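One way to read the four patterns is as the combinations of two flags: whether the request side is a stream and whether the response side is a stream. The sketch below uses a hypothetical `Pattern` enum with made-up string values; it is not the real `RpcMethodType`.

```python
from enum import Enum


class Pattern(str, Enum):
    """Hypothetical stand-in for RpcMethodType; the string values are made up."""

    UNARY = "unary"
    SERVER_STREAMING = "server_streaming"
    CLIENT_STREAMING = "client_streaming"
    BIDI_STREAMING = "bidi_streaming"


def pattern_for(request_streaming: bool, response_streaming: bool) -> Pattern:
    """Pick the pattern from whether each side of the call is a stream."""
    if request_streaming and response_streaming:
        return Pattern.BIDI_STREAMING
    if request_streaming:
        return Pattern.CLIENT_STREAMING
    if response_streaming:
        return Pattern.SERVER_STREAMING
    return Pattern.UNARY
```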

Rpc(method_type=RpcMethodType.UNARY, request_type=None, response_type=None) dataclass

Bases: FunctionAnnotation

Function annotation for marking methods as gRPC RPC endpoints.

Stores RPC configuration including the streaming pattern and request/response type metadata.

Attributes:

Name Type Description
method_type RpcMethodType

gRPC streaming pattern for this method.

request_type type | None

Request message type. Auto-extracted from type hints if not provided.

response_type type | None

Response message type. Auto-extracted from type hints if not provided.

__call__(obj)

Annotate a method as an RPC endpoint.

Extracts request and response types from type hints if not explicitly provided.

Parameters:

Name Type Description Default
obj Callable[..., AnyT]

The method to annotate.

required

Returns:

Type Description
Callable[..., AnyT]

The annotated method.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/decorators/rpc.py
def __call__(self, obj: Callable[..., AnyT]) -> Callable[..., AnyT]:
    """Annotate a method as an RPC endpoint.

    Extracts request and response types from type hints if not
    explicitly provided.

    Args:
        obj: The method to annotate.

    Returns:
        The annotated method.
    """
    if self.request_type is None or self.response_type is None:
        self._extract_types(obj)
    return super().__call__(obj)
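The body of `_extract_types` is not shown on this page. A minimal sketch of how such extraction could work with the standard library follows, assuming the request is the first parameter after `self` and the response is the return annotation. The function `extract_types` and the example classes are hypothetical.

```python
import inspect
from typing import get_type_hints


def extract_types(method):
    """Return (request_type, response_type) read from a method's type hints.

    Assumption: the request is the first parameter after ``self`` and the
    response is the return annotation. The real ``_extract_types`` may differ.
    """
    hints = get_type_hints(method)
    response_type = hints.get("return")
    params = [name for name in inspect.signature(method).parameters if name != "self"]
    request_type = hints.get(params[0]) if params else None
    return request_type, response_type


class HelloRequest:
    pass


class HelloReply:
    pass


class Greeter:
    async def say_hello(self, request: HelloRequest) -> HelloReply:
        return HelloReply()
```

Calling `extract_types(Greeter.say_hello)` yields the two message classes without running the coroutine.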

rpc(method_type=RpcMethodType.UNARY, request_type=None, response_type=None)

Decorator to mark a controller method as a gRPC RPC endpoint.

Attaches RPC configuration to the method including streaming pattern and message type metadata.

Parameters:

Name Type Description Default
method_type RpcMethodType

gRPC streaming pattern for this method.

UNARY
request_type type | None

Request message type. Auto-extracted from type hints if not provided.

None
response_type type | None

Response message type. Auto-extracted from type hints if not provided.

None

Returns:

Type Description
Callable[[Callable[..., AnyT]], Callable[..., AnyT]]

A decorator function that attaches the RPC configuration.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/decorators/rpc.py
def rpc(
    method_type: RpcMethodType = RpcMethodType.UNARY,
    request_type: type | None = None,
    response_type: type | None = None,
) -> Callable[[Callable[..., AnyT]], Callable[..., AnyT]]:
    """Decorator to mark a controller method as a gRPC RPC endpoint.

    Attaches RPC configuration to the method including streaming pattern
    and message type metadata.

    Args:
        method_type: gRPC streaming pattern for this method.
        request_type: Request message type. Auto-extracted from type hints
            if not provided.
        response_type: Response message type. Auto-extracted from type hints
            if not provided.

    Returns:
        A decorator function that attaches the RPC configuration.
    """

    def wrapper(method: Callable[..., AnyT]) -> Callable[..., AnyT]:
        return Rpc(
            method_type=method_type,
            request_type=request_type,
            response_type=response_type,
        )(method)

    return wrapper

Annotations

Protobuf field annotation for pydantic-based message definitions.

Provides the ProtoField annotation for specifying protobuf field numbers on pydantic BaseModel fields using the Annotated type hint pattern.

Example:

from pydantic import BaseModel
from typing import Annotated

class HelloRequest(BaseModel):
    name: Annotated[str, ProtoField(number=1)]
    greeting_count: Annotated[int, ProtoField(number=2)]

ProtoField(number) dataclass

Protobuf field number annotation for pydantic model fields.

Used with typing.Annotated to specify the protobuf field number for a pydantic BaseModel field, enabling code-first protobuf message definition. The annotation is read at runtime from the model's model_fields[name].metadata tuple.

Attributes:

Name Type Description
number int

The protobuf field number. Must be a positive integer.

number instance-attribute

The protobuf field number.
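The `Annotated` pattern can be inspected with the standard library alone, which shows where the field number travels. In this sketch `FieldNumber` is a hypothetical stand-in for `ProtoField`, and a plain class replaces the pydantic model; pydantic exposes the same metadata through `model_fields[name].metadata`.

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_type_hints


@dataclass(frozen=True)
class FieldNumber:
    """Hypothetical stand-in for ProtoField."""

    number: int


class HelloRequest:
    name: Annotated[str, FieldNumber(number=1)]
    greeting_count: Annotated[int, FieldNumber(number=2)]


def field_numbers(cls) -> dict[str, int]:
    """Collect the field number attached to each annotated attribute."""
    hints = get_type_hints(cls, include_extras=True)
    result = {}
    for field_name, hint in hints.items():
        # get_args(Annotated[T, m1, ...]) returns (T, m1, ...), so skip T.
        for meta in get_args(hint)[1:]:
            if isinstance(meta, FieldNumber):
                result[field_name] = meta.number
    return result
```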

Handler

Generic RPC handler for code-first gRPC service dispatch.

Routes incoming gRPC calls to @GrpcController methods by matching the fully-qualified method name, performing protobuf ↔ pydantic BaseModel conversion via the google.protobuf.json_format bridge.

GrpcServiceHandler(*, controller_type, package, service_name, container, application_context, registry)

Bases: GenericRpcHandler

Generic handler dispatching gRPC calls to @GrpcController methods.

For each @rpc-decorated method, builds a grpc.RpcMethodHandler with serialiser/deserialiser that convert between protobuf wire format and pydantic BaseModel instances.

Attributes:

Name Type Description
_full_service_name str

Fully-qualified <package>.<service> name.

_controller_type type

The @GrpcController class.

_container IContainer

IoC container for obtaining fresh controller instances.

_application_context IApplicationContext

Application context for request-scoped isolation.

_registry DescriptorRegistry

Descriptor registry for message class lookup.

_handlers dict[str, RpcMethodHandler]

Pre-built map from /<package>.<service>/<method> to RpcMethodHandler.

Initialise the handler and pre-build per-method dispatchers.

Parameters:

Name Type Description Default
controller_type type

The @GrpcController-decorated class.

required
package str

Protobuf package name.

required
service_name str

gRPC service name.

required
container IContainer

IoC container for obtaining controller instances.

required
application_context IApplicationContext

Application context for request isolation.

required
registry DescriptorRegistry

Descriptor registry for message class lookup.

required
Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/handler.py
def __init__(
    self,
    *,
    controller_type: type,
    package: str,
    service_name: str,
    container: IContainer,
    application_context: IApplicationContext,
    registry: DescriptorRegistry,
) -> None:
    """Initialise the handler and pre-build per-method dispatchers.

    Args:
        controller_type: The ``@GrpcController``-decorated class.
        package: Protobuf package name.
        service_name: gRPC service name.
        container: IoC container for obtaining controller instances.
        application_context: Application context for request isolation.
        registry: Descriptor registry for message class lookup.
    """
    self._full_service_name = f"{package}.{service_name}"
    self._controller_type = controller_type
    self._container = container
    self._application_context = application_context
    self._registry = registry
    self._handlers = {}
    self._build_handlers()

service(handler_call_details)

Resolve an RPC method handler for the incoming call.

Parameters:

Name Type Description Default
handler_call_details HandlerCallDetails

Describes the incoming RPC.

required

Returns:

Type Description
RpcMethodHandler | None

The matched handler, or None if not handled.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/handler.py
@override
def service(
    self,
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler | None:
    """Resolve an RPC method handler for the incoming call.

    Args:
        handler_call_details: Describes the incoming RPC.

    Returns:
        The matched handler, or ``None`` if not handled.
    """
    return self._handlers.get(handler_call_details.method)
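The lookup amounts to a dictionary keyed by the full method path. The sketch below is a simplified illustration: `build_handlers` is a hypothetical helper, and plain callables stand in for `grpc.RpcMethodHandler` objects.

```python
def build_handlers(package, service_name, methods):
    """Map '/<package>.<service>/<method>' to a callable."""
    full_service_name = f"{package}.{service_name}"
    return {f"/{full_service_name}/{name}": fn for name, fn in methods.items()}


handlers = build_handlers(
    "demo.v1",
    "Greeter",
    {"say_hello": lambda request: f"Hello, {request}"},
)


def service(method_path):
    # dict.get returns None for unknown paths, mirroring "not handled".
    return handlers.get(method_path)
```

Returning `None` for an unknown path lets another registered handler, or the server's default unimplemented response, take over.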

Server Spec

Deferred gRPC server configuration.

grpc.aio.server() binds to the current event loop at creation time, so the real server must be instantiated on the event loop that eventually runs it. GrpcServerSpec collects everything needed to build the server (interceptors, generic handlers, bind addresses) during post-processing, and GrpcServerService materialises it at start_async time on the correct loop.

GrpcServerSpec()

Configuration collected during post-processing for deferred server creation.

Attributes:

Name Type Description
handlers list[GenericRpcHandler]

Generic RPC handlers to register on the server.

interceptors list[ServerInterceptor]

Server interceptors to apply at creation time.

bind_addresses list[str]

host:port strings to pass to add_insecure_port.

bound_ports list[int]

Ports returned by add_insecure_port for each bind address, populated when build() runs. Useful when binding to :0 and needing to discover the OS-assigned port.

Initialise an empty spec.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/server_spec.py
def __init__(self) -> None:
    """Initialise an empty spec."""
    self.handlers = []
    self.interceptors = []
    self.bind_addresses = []
    self.bound_ports = []

add_handler(handler)

Register a generic RPC handler.

Parameters:

Name Type Description Default
handler GenericRpcHandler

The handler to add to the server.

required
Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/server_spec.py
def add_handler(self, handler: grpc.GenericRpcHandler) -> None:
    """Register a generic RPC handler.

    Args:
        handler: The handler to add to the server.
    """
    self.handlers.append(handler)

add_interceptor(interceptor)

Register a server interceptor.

Parameters:

Name Type Description Default
interceptor ServerInterceptor

The interceptor to install on the server.

required
Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/server_spec.py
def add_interceptor(self, interceptor: grpc.aio.ServerInterceptor) -> None:
    """Register a server interceptor.

    Args:
        interceptor: The interceptor to install on the server.
    """
    self.interceptors.append(interceptor)

add_insecure_port(address)

Register an insecure bind address.

Parameters:

Name Type Description Default
address str

Address in host:port form.

required
Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/server_spec.py
def add_insecure_port(self, address: str) -> None:
    """Register an insecure bind address.

    Args:
        address: Address in ``host:port`` form.
    """
    self.bind_addresses.append(address)

build()

Instantiate the underlying grpc.aio.Server on the current loop.

Must be called from the event loop that will run the server; see the module docstring for the rationale.

Returns:

Type Description
Server

The fully-configured server ready for .start().

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/server_spec.py
def build(self) -> grpc.aio.Server:
    """Instantiate the underlying ``grpc.aio.Server`` on the current loop.

    Must be called from the event loop that will run the server; see
    the module docstring for the rationale.

    Returns:
        The fully-configured server ready for ``.start()``.
    """
    server = grpc.aio.server(interceptors=list(self.interceptors))
    server.add_generic_rpc_handlers(tuple(self.handlers))
    self.bound_ports = [
        server.add_insecure_port(address) for address in self.bind_addresses
    ]
    return server
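The collect-now, build-later split can be shown without gRPC. `DeferredSpec` below is a hypothetical stand-in for `GrpcServerSpec` that only records which loop it was built on; it creates no server and binds no port.

```python
import asyncio


class DeferredSpec:
    """Hypothetical stand-in for GrpcServerSpec; builds nothing gRPC-related."""

    def __init__(self):
        self.bind_addresses = []
        self.built_on = None

    def add_insecure_port(self, address):
        # Only record the address; nothing is bound yet.
        self.bind_addresses.append(address)

    def build(self):
        # get_running_loop() raises RuntimeError outside a running loop,
        # which makes the "build on the serving loop" rule explicit.
        self.built_on = asyncio.get_running_loop()
        return list(self.bind_addresses)


spec = DeferredSpec()
spec.add_insecure_port("127.0.0.1:0")  # collected during setup, no loop needed


async def main():
    addresses = spec.build()
    return addresses, spec.built_on is asyncio.get_running_loop()


result = asyncio.run(main())
```

Configuration can therefore happen synchronously during application setup, while the loop-bound object only comes into existence inside the coroutine that serves it.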

Schema

Registry

Descriptor pool registry for compiled protobuf descriptors.

Manages FileDescriptorProto registration in a descriptor_pool, provides caching, and returns compiled message classes and service descriptors.

DescriptorRegistry(pool=None)

Registry for protobuf descriptors backed by a DescriptorPool.

Registers FileDescriptorProto instances, prevents duplicates, and provides access to compiled message classes and service descriptors.

Attributes:

Name Type Description
pool DescriptorPool

The underlying DescriptorPool.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/registry.py
def __init__(self, pool: DescriptorPool | None = None) -> None:
    self.pool: DescriptorPool = pool or DescriptorPool()
    self._registered_files: set[str] = set()

register(file_proto)

Register a FileDescriptorProto in the pool.

Parameters:

Name Type Description Default
file_proto FileDescriptorProto

The file descriptor proto to register.

required

Returns:

Type Description
FileDescriptor

The compiled FileDescriptor.

Raises:

Type Description
DescriptorAlreadyRegisteredError

If the file is already registered.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/registry.py
def register(self, file_proto: FileDescriptorProto) -> FileDescriptor:
    """Register a FileDescriptorProto in the pool.

    Args:
        file_proto: The file descriptor proto to register.

    Returns:
        The compiled FileDescriptor.

    Raises:
        DescriptorAlreadyRegisteredError: If the file is already
            registered.
    """
    if file_proto.name in self._registered_files:
        raise DescriptorAlreadyRegisteredError(file_proto.name)

    self._registered_files.add(file_proto.name)
    serialized = file_proto.SerializeToString()
    self.pool.AddSerializedFile(serialized)
    return self.pool.FindFileByName(file_proto.name)

is_registered(file_name)

Check if a file is already registered.

Parameters:

Name Type Description Default
file_name str

The proto file name.

required

Returns:

Type Description
bool

True if the file has been registered.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/registry.py
def is_registered(self, file_name: str) -> bool:
    """Check if a file is already registered.

    Args:
        file_name: The proto file name.

    Returns:
        True if the file has been registered.
    """
    return file_name in self._registered_files
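Because `register` raises on a repeated file name, callers that may see the same file twice can guard with `is_registered`. The sketch below mimics only the name-tracking part: `FileRegistry`, `AlreadyRegistered`, and `ensure_registered` are hypothetical names, and no descriptor pool is involved.

```python
class AlreadyRegistered(Exception):
    """Hypothetical stand-in for DescriptorAlreadyRegisteredError."""


class FileRegistry:
    """Tracks registered file names only; no descriptor pool is involved."""

    def __init__(self) -> None:
        self._registered_files: set[str] = set()

    def register(self, file_name: str) -> str:
        if file_name in self._registered_files:
            raise AlreadyRegistered(file_name)
        self._registered_files.add(file_name)
        return file_name

    def is_registered(self, file_name: str) -> bool:
        return file_name in self._registered_files


def ensure_registered(registry: FileRegistry, file_name: str) -> bool:
    """Register only when needed; return True if this call did the work."""
    if registry.is_registered(file_name):
        return False
    registry.register(file_name)
    return True
```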

find_message_descriptor(full_name)

Find a message descriptor by its fully-qualified name.

Parameters:

Name Type Description Default
full_name str

The fully-qualified protobuf message name (e.g. package.MessageName).

required

Returns:

Type Description
Descriptor

The message Descriptor.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/registry.py
def find_message_descriptor(self, full_name: str) -> Descriptor:
    """Find a message descriptor by its fully-qualified name.

    Args:
        full_name: The fully-qualified protobuf message name
            (e.g. ``package.MessageName``).

    Returns:
        The message Descriptor.
    """
    return self.pool.FindMessageTypeByName(full_name)

get_message_class(full_name)

Get a runtime message class for the given type name.

Parameters:

Name Type Description Default
full_name str

The fully-qualified protobuf message name.

required

Returns:

Type Description
type[Message]

A Message subclass that can be instantiated.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/registry.py
def get_message_class(self, full_name: str) -> type[Message]:
    """Get a runtime message class for the given type name.

    Args:
        full_name: The fully-qualified protobuf message name.

    Returns:
        A Message subclass that can be instantiated.
    """
    descriptor = self.find_message_descriptor(full_name)
    return GetMessageClass(descriptor)

find_service_descriptor(full_name)

Find a service descriptor by its fully-qualified name.

Parameters:

Name Type Description Default
full_name str

The fully-qualified protobuf service name (e.g. package.ServiceName).

required

Returns:

Type Description
ServiceDescriptor

The ServiceDescriptor.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/registry.py
def find_service_descriptor(self, full_name: str) -> ServiceDescriptor:
    """Find a service descriptor by its fully-qualified name.

    Args:
        full_name: The fully-qualified protobuf service name
            (e.g. ``package.ServiceName``).

    Returns:
        The ServiceDescriptor.
    """
    return self.pool.FindServiceByName(full_name)

Descriptor Builder

Pydantic BaseModel to protobuf FileDescriptorProto builder.

Converts pydantic BaseModel subclasses with ProtoField metadata and @rpc-decorated controller methods into protobuf FileDescriptorProto instances.

build_message_descriptor(model_type, collected=None)

Build a DescriptorProto from a pydantic BaseModel subclass.

Recursively processes nested BaseModel fields into nested message descriptors.

Parameters:

Name Type Description Default
model_type type[BaseModel]

The BaseModel subclass to convert.

required
collected dict[str, DescriptorProto] | None

Accumulator for all message descriptors encountered during recursive processing. Used internally.

None

Returns:

Type Description
tuple[DescriptorProto, dict[str, DescriptorProto]]

A tuple of (root DescriptorProto, dict of all collected descriptors).

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/descriptor_builder.py
def build_message_descriptor(
    model_type: type[BaseModel],
    collected: dict[str, DescriptorProto] | None = None,
) -> tuple[DescriptorProto, dict[str, DescriptorProto]]:
    """Build a ``DescriptorProto`` from a pydantic ``BaseModel`` subclass.

    Recursively processes nested ``BaseModel`` fields into nested message
    descriptors.

    Args:
        model_type: The ``BaseModel`` subclass to convert.
        collected: Accumulator for all message descriptors encountered
            during recursive processing. Used internally.

    Returns:
        A tuple of (root ``DescriptorProto``, dict of all collected
        descriptors).
    """
    if collected is None:
        collected = {}

    name = model_type.__name__
    if name in collected:
        return collected[name], collected

    descriptor = DescriptorProto(name=name)
    collected[name] = descriptor

    for field_name, field_info in model_type.model_fields.items():
        resolved = resolve_type(field_info.annotation)
        proto_field = extract_proto_field(model_type, field_name)

        field_desc = FieldDescriptorProto(
            name=field_name,
            number=proto_field.number,
            type=cast(FieldDescriptorProto.Type.ValueType, resolved.proto_type),
        )

        if resolved.is_repeated:
            field_desc.label = FieldDescriptorProto.LABEL_REPEATED
        elif resolved.is_optional:
            field_desc.label = FieldDescriptorProto.LABEL_OPTIONAL
            field_desc.proto3_optional = True
            oneof_index = len(descriptor.oneof_decl)
            descriptor.oneof_decl.append(OneofDescriptorProto(name=f"__{field_name}"))
            field_desc.oneof_index = oneof_index
        else:
            field_desc.label = FieldDescriptorProto.LABEL_OPTIONAL

        if resolved.is_message and resolved.message_type is not None:
            field_desc.type_name = resolved.message_type.__name__
            build_message_descriptor(resolved.message_type, collected)

        descriptor.field.append(field_desc)

    return descriptor, collected
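The `collected` accumulator is what keeps the recursion from visiting a model twice. A stripped-down sketch of that traversal follows, using plain annotated classes instead of pydantic models and field-name lists instead of `DescriptorProto` objects; `collect_messages` and the example classes are hypothetical.

```python
from typing import get_type_hints

SCALARS = (str, int, float, bool, bytes)


class Address:
    city: str


class User:
    name: str
    home: Address
    work: Address


def collect_messages(cls, collected=None):
    """Walk nested classes once each, keyed by class name."""
    if collected is None:
        collected = {}
    name = cls.__name__
    if name in collected:
        return collected  # already visited: stops repeats and cycles
    collected[name] = []  # register before recursing into fields
    for field_name, hint in get_type_hints(cls).items():
        collected[name].append(field_name)
        if isinstance(hint, type) and hint not in SCALARS:
            collect_messages(hint, collected)
    return collected
```

`Address` is referenced twice by `User` but appears once in the result. Note that keying by the bare class name, as the source above also does, means two distinct models that share a name would be treated as the same message.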

build_service_descriptor(controller_type, package, service_name, collected)

Build a ServiceDescriptorProto from an @GrpcController class.

Inspects all @rpc-decorated methods on the controller and generates method descriptors with fully-qualified type names.

Parameters:

Name Type Description Default
controller_type type

The controller class to inspect.

required
package str

The protobuf package name.

required
service_name str

The gRPC service name.

required
collected dict[str, DescriptorProto]

Accumulator for message descriptors found in method signatures.

required

Returns:

Type Description
ServiceDescriptorProto

A ServiceDescriptorProto for the controller.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/descriptor_builder.py
def build_service_descriptor(
    controller_type: type,
    package: str,
    service_name: str,
    collected: dict[str, DescriptorProto],
) -> ServiceDescriptorProto:
    """Build a ``ServiceDescriptorProto`` from an ``@GrpcController`` class.

    Inspects all ``@rpc``-decorated methods on the controller and generates
    method descriptors with fully-qualified type names.

    Args:
        controller_type: The controller class to inspect.
        package: The protobuf package name.
        service_name: The gRPC service name.
        collected: Accumulator for message descriptors found in method
            signatures.

    Returns:
        A ``ServiceDescriptorProto`` for the controller.
    """
    service = ServiceDescriptorProto(name=service_name)

    for method_name, method in getmembers(controller_type, predicate=isfunction):
        if not Rpc.exists(method):
            continue

        rpc_annotation = Rpc.get(method)
        request_type = rpc_annotation.request_type
        response_type = rpc_annotation.response_type

        if request_type is not None:
            build_message_descriptor(request_type, collected)
            input_type = f".{package}.{request_type.__name__}"
        else:
            input_type = ""

        if response_type is not None:
            build_message_descriptor(response_type, collected)
            output_type = f".{package}.{response_type.__name__}"
        else:
            output_type = ""

        method_desc = MethodDescriptorProto(
            name=method_name,
            input_type=input_type,
            output_type=output_type,
        )

        service.method.append(method_desc)

    return service

build_file_descriptor(controller_type)

Build a complete FileDescriptorProto from an @GrpcController class.

Generates all message descriptors referenced by @rpc methods and the service descriptor, packaged into a single FileDescriptorProto.

Parameters:

Name Type Description Default
controller_type type

The @GrpcController-decorated class.

required

Returns:

Type Description
FileDescriptorProto

A FileDescriptorProto ready for descriptor_pool registration.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/descriptor_builder.py
def build_file_descriptor(controller_type: type) -> FileDescriptorProto:
    """Build a complete ``FileDescriptorProto`` from an ``@GrpcController`` class.

    Generates all message descriptors referenced by ``@rpc`` methods and
    the service descriptor, packaged into a single ``FileDescriptorProto``.

    Args:
        controller_type: The ``@GrpcController``-decorated class.

    Returns:
        A ``FileDescriptorProto`` ready for ``descriptor_pool`` registration.
    """
    annotation = GrpcController.get(controller_type)
    package = annotation.package
    service_name = annotation.service_name or controller_type.__name__

    file_name = f"{package.replace('.', '/')}/{service_name}.proto"

    collected: dict[str, DescriptorProto] = {}
    service = build_service_descriptor(
        controller_type, package, service_name, collected
    )

    file_desc = FileDescriptorProto(
        name=file_name,
        package=package,
        syntax="proto3",
    )

    for message_desc in collected.values():
        file_desc.message_type.append(message_desc)

    file_desc.service.append(service)

    return file_desc
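Two naming rules appear in the source above: the virtual file name derived from the package, and the leading-dot, fully-qualified type names used for method inputs and outputs. They are isolated here as small helpers; the function names are hypothetical, while the format strings are copied from the listings.

```python
def proto_file_name(package: str, service_name: str) -> str:
    """Dots in the package become path separators."""
    return f"{package.replace('.', '/')}/{service_name}.proto"


def qualified_type_name(package: str, message_name: str) -> str:
    """A leading dot marks the name as fully qualified."""
    return f".{package}.{message_name}"
```

For a controller in package `demo.v1` named `Greeter`, the file name would be `demo/v1/Greeter.proto`, and a `HelloRequest` message would be referenced as `.demo.v1.HelloRequest`.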

Type Map

Python type to protobuf type mapping.

Maps Python built-in types and composite types (list, Optional, nested BaseModel) to their protobuf FieldDescriptorProto equivalents. Field-number metadata is extracted from pydantic BaseModel.model_fields[name].metadata entries carrying a ProtoField instance.

PYTHON_TO_PROTO_TYPE = {str: FieldDescriptorProto.TYPE_STRING, int: FieldDescriptorProto.TYPE_INT64, float: FieldDescriptorProto.TYPE_DOUBLE, bool: FieldDescriptorProto.TYPE_BOOL, bytes: FieldDescriptorProto.TYPE_BYTES} module-attribute

Mapping of Python primitive types to protobuf field type constants.

ResolvedFieldType(proto_type, *, is_repeated=False, is_optional=False, is_message=False, message_type=None)

Result of resolving a Python type annotation to protobuf metadata.

Attributes:

Name Type Description
proto_type

Protobuf field type constant from FieldDescriptorProto.

is_repeated

Whether the field is a repeated (list) field.

is_optional

Whether the field is optional.

is_message

Whether the field references a nested message type.

message_type

The nested BaseModel type for message fields.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/type_map.py
def __init__(
    self,
    proto_type: FieldDescriptorProto.Type.ValueType,
    *,
    is_repeated: bool = False,
    is_optional: bool = False,
    is_message: bool = False,
    message_type: type[BaseModel] | None = None,
) -> None:
    self.proto_type = proto_type
    self.is_repeated = is_repeated
    self.is_optional = is_optional
    self.is_message = is_message
    self.message_type = message_type

resolve_type(annotation)

Resolve a Python type annotation to protobuf field metadata.

Handles:

- Primitive types (str, int, float, bool, bytes)
- list[T] → repeated field
- Optional[T] (T | None) → optional field
- Nested BaseModel → message type

Parameters:

Name Type Description Default
annotation object

The Python type annotation to resolve.

required

Returns:

Type Description
ResolvedFieldType

A ResolvedFieldType with protobuf mapping information.

Raises:

Type Description
UnsupportedFieldTypeError

If the type cannot be mapped.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/type_map.py
def resolve_type(annotation: object) -> ResolvedFieldType:
    """Resolve a Python type annotation to protobuf field metadata.

    Handles:
    - Primitive types (str, int, float, bool, bytes)
    - ``list[T]`` → repeated field
    - ``Optional[T]`` (``T | None``) → optional field
    - Nested ``BaseModel`` → message type

    Args:
        annotation: The Python type annotation to resolve.

    Returns:
        A ResolvedFieldType with protobuf mapping information.

    Raises:
        UnsupportedFieldTypeError: If the type cannot be mapped.
    """
    origin = get_origin(annotation)
    args = get_args(annotation)

    if origin is list:
        inner = args[0] if args else None
        if inner is None:  # pragma: no cover - defensive guard, list[T] always has args
            raise UnsupportedFieldTypeError(list)
        inner_resolved = _resolve_scalar(inner)
        return ResolvedFieldType(
            proto_type=inner_resolved.proto_type,
            is_repeated=True,
            is_message=inner_resolved.is_message,
            message_type=inner_resolved.message_type,
        )

    if _is_union(annotation, origin, args):
        non_none = [a for a in args if a is not type(None)]
        if len(non_none) != 1:
            raise UnsupportedFieldTypeError(type(annotation))
        inner_resolved = _resolve_scalar(non_none[0])
        return ResolvedFieldType(
            proto_type=inner_resolved.proto_type,
            is_optional=True,
            is_message=inner_resolved.is_message,
            message_type=inner_resolved.message_type,
        )

    return _resolve_scalar(annotation)

extract_proto_field(model_type, field_name)

Extract the ProtoField metadata from a pydantic model field.

Parameters:

Name Type Description Default
model_type type[BaseModel]

The BaseModel subclass to inspect.

required
field_name str

The name of the field.

required

Returns:

Type Description
ProtoField

The ProtoField metadata attached to the field.

Raises:

Type Description
MissingProtoFieldAnnotationError

If the field is absent or carries no ProtoField metadata.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/schema/type_map.py
def extract_proto_field(model_type: type[BaseModel], field_name: str) -> ProtoField:
    """Extract the ``ProtoField`` metadata from a pydantic model field.

    Args:
        model_type: The ``BaseModel`` subclass to inspect.
        field_name: The name of the field.

    Returns:
        The ``ProtoField`` metadata attached to the field.

    Raises:
        MissingProtoFieldAnnotationError: If the field is absent or
            carries no ``ProtoField`` metadata.
    """
    field_info = model_type.model_fields.get(field_name)
    if field_info is None:
        raise MissingProtoFieldAnnotationError(model_type, field_name)
    for meta in field_info.metadata:
        if isinstance(meta, ProtoField):
            return meta
    raise MissingProtoFieldAnnotationError(model_type, field_name)

Interceptors

Tracing interceptor for W3C Trace Context propagation over gRPC.

Extracts trace context from incoming gRPC metadata, activates a child span for the RPC lifetime, and injects trace context into trailing metadata.

TracingInterceptor(*, propagator)

Bases: ServerInterceptor

Interceptor that propagates W3C Trace Context across gRPC boundaries.

Extracts traceparent / tracestate from incoming request metadata, activates a child span for the RPC lifetime, and injects the current trace context into trailing metadata.

Initialize the tracing interceptor.

Parameters:

Name Type Description Default
propagator ITracePropagator

Trace context propagator for extract/inject.

required
Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/interceptors/tracing.py
def __init__(self, *, propagator: ITracePropagator) -> None:
    """Initialize the tracing interceptor.

    Args:
        propagator: Trace context propagator for extract/inject.
    """
    self.__propagator = propagator

intercept_service(continuation, handler_call_details) async

Intercept an RPC and set up W3C Trace Context.

Extracts trace context from incoming metadata, creates a child span (or a new root when no parent exists), and wraps the handler to inject trace context into trailing metadata and clear it after completion.

Parameters:

Name Type Description Default
continuation Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]]

Calls the next interceptor or resolves the handler.

required
handler_call_details HandlerCallDetails

Describes the incoming RPC.

required

Returns:

Type Description
RpcMethodHandler

A handler with trace-context lifecycle wrappers.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/interceptors/tracing.py
@override
async def intercept_service(
    self,
    continuation: Callable[
        [grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]
    ],
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler:
    """Intercept an RPC and set up W3C Trace Context.

    Extracts trace context from incoming metadata, creates a child span
    (or a new root when no parent exists), and wraps the handler to inject
    trace context into trailing metadata and clear it after completion.

    Args:
        continuation: Calls the next interceptor or resolves the handler.
        handler_call_details: Describes the incoming RPC.

    Returns:
        A handler with trace-context lifecycle wrappers.
    """
    carrier = self._metadata_to_dict(handler_call_details.invocation_metadata)
    parent = self.__propagator.extract(carrier)
    ctx = parent.child() if parent is not None else TraceContext.new_root()
    TraceContext.set(ctx)

    try:
        handler = await continuation(handler_call_details)
    except Exception:
        TraceContext.clear()
        raise

    if handler is None:
        TraceContext.clear()
        return handler  # type: ignore[return-value] — unimplemented method

    return _WrappedHandler(
        handler,
        wrap_unary=self._wrap_unary_behavior,
        wrap_stream=self._wrap_stream_behavior,
    )
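
The extract step above depends on two helpers whose bodies are not shown here: one that flattens gRPC metadata into a dict, and a propagator that parses the traceparent header. A minimal standard-library sketch of both follows. SpanContext, metadata_to_dict, and extract are hypothetical names; only the traceparent layout (version, 32-hex trace id, 16-hex parent id, 2-hex flags) comes from the W3C Trace Context specification.

```python
from __future__ import annotations

import re
import secrets
from dataclasses import dataclass
from typing import Iterable

# version - trace-id - parent-id - trace-flags, all lowercase hex
_TRACEPARENT = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


@dataclass(frozen=True)
class SpanContext:
    """Hypothetical stand-in for the framework's TraceContext."""

    trace_id: str
    span_id: str
    flags: str = "01"

    def child(self) -> SpanContext:
        # A child stays in the same trace but gets a fresh 8-byte span id.
        return SpanContext(self.trace_id, secrets.token_hex(8), self.flags)

    def to_traceparent(self) -> str:
        return f"00-{self.trace_id}-{self.span_id}-{self.flags}"


def metadata_to_dict(
    metadata: Iterable[tuple[str, str | bytes]] | None,
) -> dict[str, str]:
    """Flatten (key, value) metadata pairs, skipping binary values."""
    carrier: dict[str, str] = {}
    for key, value in metadata or ():
        if isinstance(value, bytes):
            continue
        carrier[key.lower()] = value
    return carrier


def extract(carrier: dict[str, str]) -> SpanContext | None:
    """Parse traceparent; return None when it is absent or malformed."""
    match = _TRACEPARENT.match(carrier.get("traceparent", ""))
    if match is None:
        return None
    _version, trace_id, span_id, flags = match.groups()
    # The specification treats all-zero ids as invalid.
    if set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None
    return SpanContext(trace_id, span_id, flags)
```

Returning None for a missing or malformed header is what lets the interceptor fall back to a new root context.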

Error handling interceptor for gRPC servers.

Catches domain exceptions and maps them to appropriate gRPC status codes. Unexpected exceptions are logged and returned as INTERNAL status.

ErrorHandlingInterceptor(*, debug=False)

Bases: ServerInterceptor

Interceptor that converts exceptions to gRPC status codes.

AbstractGrpcStatusError subclasses are mapped to their declared status_code. All other exceptions become INTERNAL.

Attributes:

__debug (bool): When True, include tracebacks in error details.

Initialize the error handling interceptor.

Parameters:

debug (bool, default False): Whether to include full tracebacks in error details.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/interceptors/error_handling.py
def __init__(self, *, debug: bool = False) -> None:
    """Initialize the error handling interceptor.

    Args:
        debug: Whether to include full tracebacks in error details.
    """
    self.__debug = debug

intercept_service(continuation, handler_call_details) async

Intercept an RPC and wrap the handler with error handling.

Parameters:

continuation (Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]], required): Calls the next interceptor or resolves the handler.

handler_call_details (HandlerCallDetails, required): Describes the incoming RPC.

Returns:

RpcMethodHandler: A handler with error-catching wrappers on its behavior methods.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/interceptors/error_handling.py
@override
async def intercept_service(
    self,
    continuation: Callable[
        [grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]
    ],
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler:
    """Intercept an RPC and wrap the handler with error handling.

    Args:
        continuation: Calls the next interceptor or resolves the handler.
        handler_call_details: Describes the incoming RPC.

    Returns:
        A handler with error-catching wrappers on its behavior methods.
    """
    handler = await continuation(handler_call_details)
    if handler is None:
        return handler  # type: ignore[return-value] — unimplemented method
    return _WrappedHandler(
        handler,
        wrap_unary=self._wrap_unary_behavior,
        wrap_stream=self._wrap_stream_behavior,
    )
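
The wrapper functions passed to _WrappedHandler are not shown in this section. As a rough sketch of the idea, a unary behavior can be wrapped so that status-mapped errors keep their declared code and everything else becomes INTERNAL. Status, StatusError, Aborted, and wrap_unary are hypothetical stand-ins so the example runs without grpcio; a real wrapper would more likely end the call with `await context.abort(code, details)`.

```python
import asyncio
import traceback
from enum import Enum
from typing import Awaitable, Callable


class Status(Enum):
    """Stand-in for grpc.StatusCode."""

    NOT_FOUND = "NOT_FOUND"
    INTERNAL = "INTERNAL"


class StatusError(Exception):
    """Stand-in for AbstractGrpcStatusError."""

    status_code: Status = Status.INTERNAL


class NotFound(StatusError):
    status_code = Status.NOT_FOUND


class Aborted(Exception):
    """Stand-in for the effect of aborting the RPC with a status."""

    def __init__(self, code: Status, details: str) -> None:
        super().__init__(details)
        self.code = code
        self.details = details


def wrap_unary(
    behavior: Callable[[str], Awaitable[str]], *, debug: bool = False
) -> Callable[[str], Awaitable[str]]:
    async def wrapper(request: str) -> str:
        try:
            return await behavior(request)
        except StatusError as exc:
            # Domain errors carry their own status code.
            raise Aborted(exc.status_code, str(exc)) from exc
        except Exception as exc:
            # Unexpected errors: hide details unless debug is enabled.
            details = traceback.format_exc() if debug else "Internal server error"
            raise Aborted(Status.INTERNAL, details) from exc

    return wrapper
```

Keeping the generic message as the non-debug default avoids leaking internal exception text to clients.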

Post-processors

Post-processor for registering gRPC services from controllers.

Scans @GrpcController-decorated Pods, builds protobuf descriptors at runtime, and appends generic RPC handlers to the shared GrpcServerSpec.

RegisterServicesPostProcessor

Bases: IPostProcessor, IContainerAware, IApplicationContextAware

Post-processor that registers gRPC services from controllers.

When a @GrpcController Pod is created, this processor:

  1. Builds a FileDescriptorProto from the controller's @rpc methods and dataclass message types.
  2. Registers the descriptor in the shared DescriptorRegistry.
  3. Creates a GrpcServiceHandler (generic handler) and appends it to the shared GrpcServerSpec.

Runs at @Order(0) — first in the gRPC post-processor chain.

set_container(container)

Inject the IoC container.

Parameters:

container (IContainer, required): The IoC container.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/register_services.py
@override
def set_container(self, container: IContainer) -> None:
    """Inject the IoC container.

    Args:
        container: The IoC container.
    """
    self.__container = container

set_application_context(application_context)

Inject the application context.

Parameters:

application_context (IApplicationContext, required): The application context.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/register_services.py
@override
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Inject the application context.

    Args:
        application_context: The application context.
    """
    self.__application_context = application_context

post_process(pod)

Register a gRPC service if pod is a @GrpcController.

Non-controller Pods are returned unchanged.

Parameters:

pod (object, required): The Pod instance to process.

Returns:

object: The unmodified Pod.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/register_services.py
@override
def post_process(self, pod: object) -> object:
    """Register a gRPC service if *pod* is a ``@GrpcController``.

    Non-controller Pods are returned unchanged.

    Args:
        pod: The Pod instance to process.

    Returns:
        The unmodified Pod.
    """
    if not GrpcController.exists(type(pod)):
        return pod

    controller_type = self._unwrap_proxy_type(type(pod))
    annotation = GrpcController.get(controller_type)
    package = annotation.package
    service_name = annotation.service_name or controller_type.__name__

    file_desc = build_file_descriptor(controller_type)

    registry = self.__container.get(DescriptorRegistry)
    if not registry.is_registered(file_desc.name):
        registry.register(file_desc)

    handler = GrpcServiceHandler(
        controller_type=controller_type,
        package=package,
        service_name=service_name,
        container=self.__container,
        application_context=self.__application_context,
        registry=registry,
    )

    spec = self.__container.get(GrpcServerSpec)
    spec.add_handler(handler)

    logger.info(
        f"Registered gRPC service {package}.{service_name} "
        f"from {controller_type.__qualname__}"
    )
    return pod
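
The check-then-register guard in post_process can be illustrated in isolation. The sketch below assumes that register raises DescriptorAlreadyRegisteredError on a duplicate file name, which is consistent with the error's description but is not confirmed by the source shown here. FileDesc, Registry, and register_once are hypothetical stand-ins.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileDesc:
    """Stand-in for a FileDescriptorProto; only the name matters here."""

    name: str


class DescriptorAlreadyRegisteredError(Exception):
    def __init__(self, file_name: str) -> None:
        super().__init__(file_name)
        self.file_name = file_name


class Registry:
    """Stand-in for DescriptorRegistry with the two calls used above."""

    def __init__(self) -> None:
        self._files: dict[str, FileDesc] = {}

    def is_registered(self, name: str) -> bool:
        return name in self._files

    def register(self, file_desc: FileDesc) -> None:
        # Assumed behavior: a second registration of the same file fails.
        if file_desc.name in self._files:
            raise DescriptorAlreadyRegisteredError(file_desc.name)
        self._files[file_desc.name] = file_desc


def register_once(registry: Registry, file_desc: FileDesc) -> bool:
    """Register the descriptor unless it is already known.

    Returns True when the descriptor was newly registered.
    """
    if registry.is_registered(file_desc.name):
        return False
    registry.register(file_desc)
    return True
```

With the guard in place, processing the same controller type twice (for example, once per proxy and once per target) does not trip the duplicate check.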

Post-processor for recording interceptors on the gRPC server spec.

Adds ErrorHandlingInterceptor and (when the tracing plugin is loaded) TracingInterceptor to the shared GrpcServerSpec. The actual grpc.aio.Server is instantiated later, on the event loop that will run it (see spakky.plugins.grpc.server_spec).

AddInterceptorsPostProcessor

Bases: IPostProcessor, IContainerAware, IApplicationContextAware

Post-processor that records interceptors on the shared server spec.

Interceptors added (in order):

  1. ErrorHandlingInterceptor — always.
  2. TracingInterceptor — only when an ITracePropagator is available in the application context.

Runs at @Order(1) — after service registration.

set_container(container)

Inject the IoC container.

Parameters:

container (IContainer, required): The IoC container.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/add_interceptors.py
@override
def set_container(self, container: IContainer) -> None:
    """Inject the IoC container.

    Args:
        container: The IoC container.
    """
    self.__container = container

set_application_context(application_context)

Inject the application context.

Parameters:

application_context (IApplicationContext, required): The application context.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/add_interceptors.py
@override
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Inject the application context.

    Args:
        application_context: The application context.
    """
    self.__application_context = application_context

post_process(pod)

Record interceptors on the server spec once per spec instance.

The spec is resolved lazily so that interceptor registration runs exactly once: the first time a GrpcServerSpec Pod is seen.

Parameters:

pod (object, required): The Pod instance to process.

Returns:

object: The pod unchanged.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/add_interceptors.py
@override
def post_process(self, pod: object) -> object:
    """Record interceptors on the server spec once per spec instance.

    The spec is resolved lazily so that interceptor registration runs
    exactly once: the first time a ``GrpcServerSpec`` Pod is seen.

    Args:
        pod: The Pod instance to process.

    Returns:
        The pod unchanged.
    """
    if not isinstance(pod, GrpcServerSpec):
        return pod

    pod.add_interceptor(ErrorHandlingInterceptor())

    propagator = self.__application_context.get_or_none(ITracePropagator)
    if propagator is not None:
        pod.add_interceptor(TracingInterceptor(propagator=propagator))

    logger.info(f"Registered {len(pod.interceptors)} interceptor(s) on gRPC spec")
    return pod

Post-processor for binding gRPC server lifecycle.

Wires a GrpcServerSpec Pod into the ApplicationContext so that the underlying grpc.aio.Server is materialised on the context's event loop and started/stopped alongside the application.

GRACEFUL_SHUTDOWN_SECONDS = 5.0 module-attribute

Default grace period (seconds) for server shutdown.

GrpcServerService(spec)

Bases: IAsyncService

Async service wrapper that instantiates the gRPC server on the right loop.

grpc.aio.server() binds to whatever event loop is running when it is called, so the real server is created inside start_async from the captured GrpcServerSpec.

Attributes:

_spec (GrpcServerSpec): Configuration collected during post-processing.

_server (Server | None): The materialised server, set once start_async runs.

_stop_event (Event): Async event passed by the application context; unused internally because shutdown is driven by stop_async, but retained to satisfy the IAsyncService contract.

Initialise the service with a server spec.

Parameters:

spec (GrpcServerSpec, required): Collected server configuration.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/bind_server.py
def __init__(self, spec: GrpcServerSpec) -> None:
    """Initialise the service with a server spec.

    Args:
        spec: Collected server configuration.
    """
    self._spec = spec
    self._server = None

set_stop_event(stop_event)

Store the async stop event from the application context.

Parameters:

stop_event (Event, required): Async event forwarded by the application context.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/bind_server.py
@override
def set_stop_event(self, stop_event: locks.Event) -> None:
    """Store the async stop event from the application context.

    Args:
        stop_event: Async event forwarded by the application context.
    """
    self._stop_event = stop_event

start_async() async

Build the gRPC server on the current loop and start it.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/bind_server.py
@override
async def start_async(self) -> None:
    """Build the gRPC server on the current loop and start it."""
    self._server = self._spec.build()
    await self._server.start()
    logger.info("gRPC server started")

stop_async() async

Gracefully stop the gRPC server if it was started.

Clears _server before awaiting the stop, so repeated stop_async calls are safe no-ops.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/bind_server.py
@override
async def stop_async(self) -> None:
    """Gracefully stop the gRPC server if it was started.

    Clears ``_server`` before awaiting the stop so that repeated
    ``stop_async`` calls are safe no-ops.
    """
    if self._server is None:
        return
    server = self._server
    self._server = None
    await server.stop(grace=GRACEFUL_SHUTDOWN_SECONDS)
    logger.info("gRPC server stopped")
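
The deferred-construction pattern used by GrpcServerService can be tried without grpcio. In the sketch below, FakeServer is a hypothetical stand-in that, like the real server according to the description above, needs a running event loop at construction time. ServerSpec and ServerService mirror the shape of the classes above but are not the plugin's code.

```python
import asyncio
from typing import Optional


class FakeServer:
    """Stand-in for grpc.aio.Server; captures the loop it is built on."""

    def __init__(self) -> None:
        # Raises RuntimeError when no event loop is running.
        self.loop = asyncio.get_running_loop()
        self.started = False
        self.stop_calls = 0

    async def start(self) -> None:
        self.started = True

    async def stop(self, grace: float) -> None:
        self.stop_calls += 1
        self.started = False


class ServerSpec:
    """Holds configuration only; building is deferred until start."""

    def build(self) -> FakeServer:
        return FakeServer()


class ServerService:
    def __init__(self, spec: ServerSpec) -> None:
        self._spec = spec
        self._server: Optional[FakeServer] = None

    async def start_async(self) -> None:
        # Built here so the server binds to the loop that will run it.
        self._server = self._spec.build()
        await self._server.start()

    async def stop_async(self) -> None:
        if self._server is None:
            return
        # Detach first so a second call returns early.
        server, self._server = self._server, None
        await server.stop(grace=5.0)


async def main() -> FakeServer:
    service = ServerService(ServerSpec())
    await service.start_async()
    server = service._server
    assert server is not None
    await service.stop_async()
    await service.stop_async()  # no-op: the server was already detached
    return server
```

Running `asyncio.run(main())` stops the server once even though stop_async is called twice.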

BindServerPostProcessor

Bases: IPostProcessor, IContainerAware, IApplicationContextAware

Post-processor that binds a GrpcServerSpec to the ApplicationContext.

Wraps the spec in a GrpcServerService and registers it with the ApplicationContext for automatic start/stop management.

Runs at @Order(2) — last in the gRPC post-processor chain.

set_container(container)

Inject the IoC container.

Parameters:

container (IContainer, required): The IoC container.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/bind_server.py
@override
def set_container(self, container: IContainer) -> None:
    """Inject the IoC container.

    Args:
        container: The IoC container.
    """
    self.__container = container

set_application_context(application_context)

Inject the application context.

Parameters:

application_context (IApplicationContext, required): The application context.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/bind_server.py
@override
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Inject the application context.

    Args:
        application_context: The application context.
    """
    self.__application_context = application_context

post_process(pod)

Bind server lifecycle if pod is a GrpcServerSpec.

Non-spec Pods are returned unchanged.

Parameters:

pod (object, required): The Pod instance to process.

Returns:

object: The unmodified spec Pod.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/post_processors/bind_server.py
@override
def post_process(self, pod: object) -> object:
    """Bind server lifecycle if *pod* is a ``GrpcServerSpec``.

    Non-spec Pods are returned unchanged.

    Args:
        pod: The Pod instance to process.

    Returns:
        The unmodified spec Pod.
    """
    if not isinstance(pod, GrpcServerSpec):
        return pod

    service = GrpcServerService(pod)
    service.set_stop_event(self.__application_context.task_stop_event)
    self.__application_context.add_service(service)
    logger.info("Bound gRPC server lifecycle to ApplicationContext")
    return pod
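
The three post-processors declare orders 0, 1, and 2. How the framework sorts and applies them is not shown in this section; the sketch below assumes the simplest reading, where every Pod passes through the processors in ascending order. All class names and the events list are hypothetical and exist only to make the sequence observable.

```python
from dataclasses import dataclass, field


@dataclass
class Spec:
    """Stand-in for GrpcServerSpec; records what happened to it."""

    events: list[str] = field(default_factory=list)


class Controller:
    """Stand-in for a @GrpcController Pod."""


class RegisterServices:
    order = 0

    def __init__(self, spec: Spec) -> None:
        self._spec = spec

    def post_process(self, pod: object) -> object:
        if isinstance(pod, Controller):
            self._spec.events.append("handler")
        return pod


class AddInterceptors:
    order = 1

    def post_process(self, pod: object) -> object:
        if isinstance(pod, Spec):
            pod.events.append("interceptors")
        return pod


class BindServer:
    order = 2

    def post_process(self, pod: object) -> object:
        if isinstance(pod, Spec):
            pod.events.append("bound")
        return pod


def run_chain(processors: list, pods: list) -> None:
    """Apply every processor to every pod, lowest order first."""
    ordered = sorted(processors, key=lambda p: p.order)
    for pod in pods:
        for processor in ordered:
            pod = processor.post_process(pod)
```

Under this assumption the spec Pod always receives its interceptors before its lifecycle is bound, regardless of the order in which the processors were registered.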

Errors

gRPC plugin error hierarchy.

Provides base error classes, gRPC status-mapped errors, and schema errors.

AbstractSpakkyGrpcError

Bases: AbstractSpakkyFrameworkError, ABC

Base exception for all Spakky gRPC errors.

AbstractGrpcStatusError

Bases: AbstractSpakkyGrpcError, ABC

Base for gRPC errors that map to a specific status code.

Subclasses must define status_code to specify which gRPC status code the error maps to.

InvalidArgument

Bases: AbstractGrpcStatusError

gRPC INVALID_ARGUMENT error.

NotFound

Bases: AbstractGrpcStatusError

gRPC NOT_FOUND error.

AlreadyExists

Bases: AbstractGrpcStatusError

gRPC ALREADY_EXISTS error.

PermissionDenied

Bases: AbstractGrpcStatusError

gRPC PERMISSION_DENIED error.

Unauthenticated

Bases: AbstractGrpcStatusError

gRPC UNAUTHENTICATED error.

FailedPrecondition

Bases: AbstractGrpcStatusError

gRPC FAILED_PRECONDITION error.

Unavailable

Bases: AbstractGrpcStatusError

gRPC UNAVAILABLE error.

InternalError

Bases: AbstractGrpcStatusError

gRPC INTERNAL error.
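
The status-mapped errors above all follow one rule: each concrete subclass declares a status_code. How the plugin enforces that rule is not shown here. One possible way to check it at class-definition time is __init_subclass__, sketched below with hypothetical names (StatusCode stands in for grpc.StatusCode, and RateLimited is an invented example subclass).

```python
from enum import Enum


class StatusCode(Enum):
    """Stand-in for grpc.StatusCode so the sketch runs without grpcio."""

    RESOURCE_EXHAUSTED = "RESOURCE_EXHAUSTED"
    INTERNAL = "INTERNAL"


class AbstractStatusError(Exception):
    """Hypothetical base: every subclass must declare status_code."""

    status_code: StatusCode

    def __init_subclass__(cls, **kwargs: object) -> None:
        super().__init_subclass__(**kwargs)
        # Runs once per subclass definition, not per instance.
        if not isinstance(getattr(cls, "status_code", None), StatusCode):
            raise TypeError(f"{cls.__name__} must define status_code")


class RateLimited(AbstractStatusError):
    """Invented example of a custom status-mapped error."""

    status_code = StatusCode.RESOURCE_EXHAUSTED
```

A subclass that omits status_code then fails when the module is imported rather than at the moment the error is first raised.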

UnsupportedFieldTypeError(field_type)

Bases: AbstractSpakkyGrpcError

Raised when a Python type cannot be mapped to a protobuf type.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/error.py
def __init__(self, field_type: type[object]) -> None:
    super().__init__()
    self.field_type = field_type

MissingProtoFieldAnnotationError(model_type, field_name)

Bases: AbstractSpakkyGrpcError

Raised when a BaseModel field lacks a ProtoField annotation.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/error.py
def __init__(self, model_type: type, field_name: str) -> None:
    super().__init__()
    self.model_type = model_type
    self.field_name = field_name

UnsupportedResponseTypeError(value_type)

Bases: AbstractSpakkyGrpcError

Raised when a serializer receives an object it cannot encode.

The gRPC response serializer accepts either a protobuf Message (passed through verbatim) or a pydantic BaseModel (encoded via the json_format bridge). Any other type signals a controller returned an unsupported value.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/error.py
def __init__(self, value_type: type[object]) -> None:
    super().__init__()
    self.value_type = value_type

DescriptorAlreadyRegisteredError(file_name)

Bases: AbstractSpakkyGrpcError

Raised when a FileDescriptorProto is registered more than once.

Source code in plugins/spakky-grpc/src/spakky/plugins/grpc/error.py
def __init__(self, file_name: str) -> None:
    super().__init__()
    self.file_name = file_name