콘텐츠로 이동

spakky-auth

spakky-auth는 인증 상태, 인가 요구사항, provider capability, snapshot 전파 계약을 정의합니다.

Provider-neutral 인증/인가 의미 모델입니다.

패키지 루트

Provider-neutral authentication and authorization package root.

AUTH_CONTEXT_CONTEXT_KEY = 'spakky.auth.context' module-attribute

ApplicationContext context value key for the current AuthContext.

AUTH_CONTEXT_SNAPSHOT_HEADER_KEY = 'x-spakky-auth-context-snapshot' module-attribute

HTTP/gRPC-style header key for signed AuthContextSnapshot propagation.

AUTH_CONTEXT_SNAPSHOT_METADATA_KEY = 'spakky.auth.context_snapshot' module-attribute

Transport metadata key for signed AuthContextSnapshot propagation.

AUTH_CONTEXT_SNAPSHOT_SCHEMA_VERSION = 1 module-attribute

Current signed AuthContextSnapshot envelope schema version.

DEFAULT_AUTH_CLOCK_SKEW_SECONDS = 60 module-attribute

Default tolerated clock skew for AuthContextSnapshot validation.

AUTH_CONTRIBUTION_ENTRY_POINT_GROUP = 'spakky.contributions.spakky.auth' module-attribute

Entry point group used by providers contributing to spakky-auth.

EXPIRED_SNAPSHOT_DECISION = AuthorizationDecision.challenge(AuthorizationReasonCode.SNAPSHOT_EXPIRED) module-attribute

Default decision for expired AuthContextSnapshot metadata.

INVALID_SNAPSHOT_DECISION = AuthorizationDecision.challenge(AuthorizationReasonCode.SNAPSHOT_INVALID) module-attribute

Default decision for malformed or unsigned AuthContextSnapshot metadata.

MISSING_SNAPSHOT_DECISION = AuthorizationDecision.challenge(AuthorizationReasonCode.SNAPSHOT_MISSING) module-attribute

Default decision for missing AuthContextSnapshot propagation metadata.

VERIFICATION_PROVIDER_UNAVAILABLE_DECISION = AuthorizationDecision.error(AuthorizationReasonCode.VERIFICATION_PROVIDER_UNAVAILABLE) module-attribute

Default decision when snapshot verification provider is unavailable.

AUTH_STARTUP_VALIDATION_ERROR_DETAIL_KEY = 'auth.capability.validation.error' module-attribute

Startup diagnostic detail key for invalid auth capability provider counts.

SPAKKY_AUTH_SNAPSHOT_PROPAGATION_CONFIG_ENV_PREFIX = 'SPAKKY_AUTH_SNAPSHOT_PROPAGATION_' module-attribute

Environment prefix for signed AuthContextSnapshot propagation settings.

PLUGIN_NAME = Plugin(name='spakky-auth') module-attribute

Plugin identifier for the Spakky Auth package.

AsyncAuthorizationAspect(application_context=None, authorization_policy_evaluator=None, permission_checker=None, relation_checker=None, role_checker=None, scope_checker=None)

Bases: IAsyncAspect, IApplicationContextAware

Asynchronous aspect enforcing protected auth metadata.

Source code in core/spakky-auth/src/spakky/auth/aspects/authorization.py
def __init__(
    self,
    application_context: IApplicationContext | None = None,
    authorization_policy_evaluator: IAuthorizationPolicyEvaluator | None = None,
    permission_checker: IPermissionChecker | None = None,
    relation_checker: IRelationChecker | None = None,
    role_checker: IRoleChecker | None = None,
    scope_checker: IScopeChecker | None = None,
) -> None:
    self._application_context = application_context
    self._authorization_policy_evaluator = authorization_policy_evaluator
    self._permission_checker = permission_checker
    self._relation_checker = relation_checker
    self._role_checker = role_checker
    self._scope_checker = scope_checker

set_application_context(application_context)

Inject the application context when managed as a Pod.

Source code in core/spakky-auth/src/spakky/auth/aspects/authorization.py
@override
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Inject the application context when managed as a Pod."""
    self._application_context = application_context

AuthorizationAspect(application_context=None, authorization_policy_evaluator=None, permission_checker=None, relation_checker=None, role_checker=None, scope_checker=None)

Bases: IAspect, IApplicationContextAware

Synchronous aspect enforcing protected auth metadata.

Source code in core/spakky-auth/src/spakky/auth/aspects/authorization.py
def __init__(
    self,
    application_context: IApplicationContext | None = None,
    authorization_policy_evaluator: IAuthorizationPolicyEvaluator | None = None,
    permission_checker: IPermissionChecker | None = None,
    relation_checker: IRelationChecker | None = None,
    role_checker: IRoleChecker | None = None,
    scope_checker: IScopeChecker | None = None,
) -> None:
    self._application_context = application_context
    self._authorization_policy_evaluator = authorization_policy_evaluator
    self._permission_checker = permission_checker
    self._relation_checker = relation_checker
    self._role_checker = role_checker
    self._scope_checker = scope_checker

set_application_context(application_context)

Inject the application context when managed as a Pod.

Source code in core/spakky-auth/src/spakky/auth/aspects/authorization.py
@override
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Inject the application context when managed as a Pod."""
    self._application_context = application_context

AuthCapability

Bases: StrEnum

Auth provider capabilities declared by feature contributions.

AuthProviderContribution(*, provider_id, capabilities) dataclass

Provider contribution metadata consumed by auth startup validation.

provider_id instance-attribute

Stable provider-neutral contribution identifier.

capabilities instance-attribute

Capabilities implemented by the contribution provider.

supports(capability)

Return whether this contribution declares a capability.

Source code in core/spakky-auth/src/spakky/auth/contribution.py
def supports(self, capability: AuthCapability) -> bool:
    """Return whether this contribution declares a capability."""
    return capability in self.capabilities

AuthClaim(*, name, value) dataclass

Selected provider claim safe for framework-level auth decisions.

name instance-attribute

Canonical claim name.

value instance-attribute

JSON-scalar claim value.

AuthContext(*, subject, issuer, tenant=None, roles=(), scopes=(), claims=(), credential_carrier=None, metadata=tuple()) dataclass

Request/context-scoped authentication state seeded by inbound adapters.

subject instance-attribute

Authenticated subject.

issuer instance-attribute

Authority that authenticated the subject.

tenant = None class-attribute instance-attribute

Optional tenant canonical reference.

roles = () class-attribute instance-attribute

Role canonical references granted to the subject.

scopes = () class-attribute instance-attribute

Scope canonical references granted to the subject.

claims = () class-attribute instance-attribute

Selected provider claims retained for downstream decisions.

credential_carrier = None class-attribute instance-attribute

Boundary-local credential carrier that produced this context.

metadata = field(default_factory=tuple) class-attribute instance-attribute

Framework-safe metadata associated with this auth context.

AuthSubject(*, id, display_name=None) dataclass

Authenticated subject identity.

id instance-attribute

Stable provider-neutral subject identifier.

display_name = None class-attribute instance-attribute

Optional human-readable subject label.

CredentialCarrier(*, kind, location, material, name=None, scheme=None) dataclass

Boundary-local credential material handed to an auth provider.

kind instance-attribute

Provider-neutral credential category.

location instance-attribute

Transport location where the credential was discovered.

material instance-attribute

Opaque credential material read by the inbound adapter.

name = None class-attribute instance-attribute

Optional transport field name, such as an HTTP header name.

scheme = None class-attribute instance-attribute

Optional credential scheme, such as Bearer.

CredentialCarrierKind

Bases: StrEnum

Canonical credential material categories accepted by auth providers.

CredentialCarrierLocation

Bases: StrEnum

Where an inbound adapter observed credential material.

AuthorizationDecision(*, state, reason_code, reason=None) dataclass

Provider-neutral result of an auth enforcement decision.

state instance-attribute

ALLOW, CHALLENGE, DENY, or ERROR.

reason_code instance-attribute

Machine-readable reason for the decision.

reason = None class-attribute instance-attribute

Optional operator-facing reason text.

allow(reason_code=AuthorizationReasonCode.AUTHORIZED, reason=None) classmethod

Create an ALLOW decision.

Source code in core/spakky-auth/src/spakky/auth/decision.py
@classmethod
def allow(
    cls,
    reason_code: AuthorizationReasonCode = AuthorizationReasonCode.AUTHORIZED,
    reason: str | None = None,
) -> Self:
    """Create an ALLOW decision."""
    return cls(
        state=AuthorizationDecisionState.ALLOW,
        reason_code=reason_code,
        reason=reason,
    )

challenge(reason_code, reason=None) classmethod

Create a CHALLENGE decision.

Source code in core/spakky-auth/src/spakky/auth/decision.py
@classmethod
def challenge(
    cls,
    reason_code: AuthorizationReasonCode,
    reason: str | None = None,
) -> Self:
    """Create a CHALLENGE decision."""
    return cls(
        state=AuthorizationDecisionState.CHALLENGE,
        reason_code=reason_code,
        reason=reason,
    )

deny(reason_code, reason=None) classmethod

Create a DENY decision.

Source code in core/spakky-auth/src/spakky/auth/decision.py
@classmethod
def deny(
    cls,
    reason_code: AuthorizationReasonCode,
    reason: str | None = None,
) -> Self:
    """Create a DENY decision."""
    return cls(
        state=AuthorizationDecisionState.DENY,
        reason_code=reason_code,
        reason=reason,
    )

error(reason_code, reason=None) classmethod

Create an ERROR decision.

Source code in core/spakky-auth/src/spakky/auth/decision.py
@classmethod
def error(
    cls,
    reason_code: AuthorizationReasonCode,
    reason: str | None = None,
) -> Self:
    """Create an ERROR decision."""
    return cls(
        state=AuthorizationDecisionState.ERROR,
        reason_code=reason_code,
        reason=reason,
    )

AuthorizationDecisionState

Bases: StrEnum

Canonical auth enforcement outcomes.

AuthorizationReasonCode

Bases: StrEnum

Machine-readable reason codes attached to authorization decisions.

AbstractSpakkyAuthError

Bases: AbstractSpakkyFrameworkError, ABC

Base class for all spakky-auth errors.

AuthContextError

Bases: AbstractSpakkyAuthError

Raised when AuthContext storage or lookup fails.

AuthContextNotFoundError

Bases: AuthContextError

Raised when ApplicationContext has no AuthContext value.

AuthRequirementDeniedError(decision=None)

Bases: AuthorizationError

Raised when an auth requirement decision is not ALLOW.

Source code in core/spakky-auth/src/spakky/auth/error.py
def __init__(self, decision: AuthorizationDecision | None = None) -> None:
    self.decision = decision
    super().__init__()

AuthRequirementProviderUnavailableError

Bases: AuthorizationError

Raised when no provider is available for a protected requirement.

AuthContextSnapshotError

Bases: AbstractSpakkyAuthError

Raised when an AuthContextSnapshot cannot be used.

AuthVerificationProviderUnavailableError

Bases: AuthContextSnapshotError

Raised when snapshot verification cannot reach its provider.

AuthenticationError

Bases: AbstractSpakkyAuthError

Raised when authentication fails before authorization policy evaluation.

AuthorizationError

Bases: AbstractSpakkyAuthError

Raised when authorization policy evaluation denies access.

ConflictingAuthMetadataError

Bases: AuthorizationError

Raised when public and protected auth metadata are both effective.

CredentialCarrierError

Bases: AbstractSpakkyAuthError

Raised when a credential carrier cannot be interpreted.

ExpiredAuthContextSnapshotError

Bases: AuthContextSnapshotError

Raised when snapshot propagation metadata is outside its time window.

InvalidAuthContextSnapshotError

Bases: AuthContextSnapshotError

Raised when snapshot propagation metadata is malformed or unsigned.

InvalidAuthContextValueError

Bases: AuthContextError

Raised when ApplicationContext contains a non-AuthContext value.

MissingAuthContextSnapshotError

Bases: AuthContextSnapshotError

Raised when snapshot propagation metadata is absent.

AuthDynamicRef(*, kind, expression) dataclass

Late-bound resource, action, or tenant reference expression.

kind instance-attribute

Reference kind expected from the resolver output.

expression instance-attribute

Provider-neutral expression name understood by the resolver.

AuthDynamicRefKind

Bases: StrEnum

Dynamic auth reference target kinds resolved from an invocation.

AuthInvocation(*, boundary, operation, subject=None, attributes=()) dataclass

Provider-neutral description of the boundary call being authorized.

boundary instance-attribute

Boundary category such as HTTP, gRPC, CLI, task, event, or saga.

operation instance-attribute

Canonical operation or handler reference at the boundary.

subject = None class-attribute instance-attribute

Optional pre-authenticated subject when a boundary already has one.

attributes = () class-attribute instance-attribute

Scalar invocation attributes available to dynamic ref resolvers.

AuthInvocationAttribute(*, name, value) dataclass

Typed scalar attribute made available to auth reference resolvers.

name instance-attribute

Provider-neutral attribute name.

value instance-attribute

JSON-scalar invocation attribute value.

ResolvedAuthReference(*, kind, value) dataclass

Resolved auth reference value returned by invocation resolvers.

kind instance-attribute

Resolved reference kind.

value instance-attribute

Canonical resource, action, or tenant reference.

EffectiveAuthMetadata(*, public_access, requirements) dataclass

Effective auth metadata after class/method aggregation.

public_access instance-attribute

Whether a public access marker is effective.

requirements instance-attribute

Ordered, duplicate-free protected requirements with AND semantics.

protected property

Return whether this boundary has protected requirements.

AuthRequirement(*, kind, ref, resource=None, action=None, tenant=None) dataclass

Canonical protected-boundary auth requirement.

kind instance-attribute

Requirement category used by enforcement to select the provider port.

ref instance-attribute

Canonical permission, role, scope, relation, or marker reference.

resource = None class-attribute instance-attribute

Optional resource reference for permission, policy, or relation checks.

action = None class-attribute instance-attribute

Optional action reference for policy checks.

tenant = None class-attribute instance-attribute

Optional tenant reference for tenant-scoped checks.

AuthRequirementKind

Bases: StrEnum

Canonical auth requirement categories supported by decorators.

ProtectedRequirement(requirement) dataclass

Bases: Annotation

Annotation carrying one protected auth requirement.

PublicAccess() dataclass

Bases: Annotation

Marker annotation for explicitly public boundaries.

AuthorizationRequest(*, auth_context, resource, action, tenant=None) dataclass

Provider-neutral policy evaluation request.

auth_context instance-attribute

Authenticated subject context.

resource instance-attribute

Canonical resource reference being accessed.

action instance-attribute

Canonical action reference being attempted.

tenant = None class-attribute instance-attribute

Optional tenant reference; None means tenant is not applicable.

IAuthContextSnapshotSigner

Bases: ABC

Provider-neutral AuthContextSnapshot signing port.

sign_snapshot(request) abstractmethod

Create a signed AuthContextSnapshot for propagation.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def sign_snapshot(self, request: SnapshotSignRequest) -> AuthContextSnapshot:
    """Create a signed AuthContextSnapshot for propagation."""
    ...

IAuthContextSnapshotVerifier

Bases: ABC

Provider-neutral AuthContextSnapshot verification port.

verify_snapshot(snapshot_envelope, invocation) abstractmethod

Verify a signed snapshot envelope and return its auth context.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def verify_snapshot(
    self,
    snapshot_envelope: str,
    invocation: AuthInvocation,
) -> AuthContext:
    """Verify a signed snapshot envelope and return its auth context."""
    ...

IAuthInvocationResolver

Bases: ABC

Resolver for invocation-scoped resource, action, and tenant refs.

resolve_ref(invocation, dynamic_ref) abstractmethod

Resolve a late-bound auth reference from invocation attributes.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def resolve_ref(
    self,
    invocation: AuthInvocation,
    dynamic_ref: AuthDynamicRef,
) -> ResolvedAuthReference:
    """Resolve a late-bound auth reference from invocation attributes."""
    ...

IAuthenticationProvider

Bases: ABC

Provider-neutral authentication port.

authenticate(credential, invocation) abstractmethod

Authenticate a credential observed at an invocation boundary.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def authenticate(
    self,
    credential: CredentialCarrier,
    invocation: AuthInvocation,
) -> AuthContext:
    """Authenticate a credential observed at an invocation boundary."""
    ...

IAuthorizationPolicyEvaluator

Bases: ABC

Provider-neutral policy evaluation port.

evaluate_policy(request) abstractmethod

Evaluate a resource/action authorization request.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def evaluate_policy(self, request: AuthorizationRequest) -> AuthorizationDecision:
    """Evaluate a resource/action authorization request."""
    ...

IPasswordHasher

Bases: ABC

Provider-neutral password hashing port.

hash_password(password) abstractmethod

Hash plaintext password material for storage.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def hash_password(self, password: AuthPasswordPlaintext) -> AuthPasswordHash:
    """Hash plaintext password material for storage."""
    ...

IPasswordVerifier

Bases: ABC

Provider-neutral password verification port.

verify_password(password, password_hash) abstractmethod

Verify plaintext password material against a stored hash.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def verify_password(
    self,
    password: AuthPasswordPlaintext,
    password_hash: AuthPasswordHash,
) -> AuthorizationDecision:
    """Verify plaintext password material against a stored hash."""
    ...

IPermissionChecker

Bases: ABC

Provider-neutral permission check port.

check_permission(request) abstractmethod

Check whether a subject has a permission.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def check_permission(
    self, request: PermissionCheckRequest
) -> AuthorizationDecision:
    """Check whether a subject has a permission."""
    ...

IRelationChecker

Bases: ABC

Provider-neutral relation check port.

check_relation(request) abstractmethod

Check whether a subject has a relation to a resource.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def check_relation(self, request: RelationCheckRequest) -> AuthorizationDecision:
    """Check whether a subject has a relation to a resource."""
    ...

IRoleChecker

Bases: ABC

Provider-neutral role check port.

check_role(request) abstractmethod

Check whether a subject has a role.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def check_role(self, request: RoleCheckRequest) -> AuthorizationDecision:
    """Check whether a subject has a role."""
    ...

IScopeChecker

Bases: ABC

Provider-neutral scope check port.

check_scope(request) abstractmethod

Check whether a subject has a scope.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def check_scope(self, request: ScopeCheckRequest) -> AuthorizationDecision:
    """Check whether a subject has a scope."""
    ...

PermissionCheckRequest(*, auth_context, permission, resource=None, tenant=None) dataclass

Permission check request scoped to an authenticated subject.

auth_context instance-attribute

Authenticated subject context.

permission instance-attribute

Canonical permission reference.

resource = None class-attribute instance-attribute

Optional resource reference; None means resource-independent.

tenant = None class-attribute instance-attribute

Optional tenant reference; None means tenant is not applicable.

RelationCheckRequest(*, auth_context, relation, resource, tenant=None) dataclass

Relationship authorization request scoped to a resource.

auth_context instance-attribute

Authenticated subject context.

relation instance-attribute

Canonical relationship reference.

resource instance-attribute

Canonical resource reference for the relationship check.

tenant = None class-attribute instance-attribute

Optional tenant reference; None means tenant is not applicable.

RoleCheckRequest(*, auth_context, role, tenant=None) dataclass

Role check request scoped to an authenticated subject.

auth_context instance-attribute

Authenticated subject context.

role instance-attribute

Canonical role reference.

tenant = None class-attribute instance-attribute

Optional tenant reference; None means tenant is not applicable.

ScopeCheckRequest(*, auth_context, scope) dataclass

Scope check request scoped to an authenticated subject.

auth_context instance-attribute

Authenticated subject context.

scope instance-attribute

Canonical scope reference.

SnapshotSignRequest(*, auth_context, tenant=None) dataclass

Request for signing an AuthContext snapshot for propagation.

auth_context instance-attribute

Auth context to serialize and sign.

tenant = None class-attribute instance-attribute

Optional tenant override; None means use the context tenant.

AuthContextSnapshot(*, subject, issuer, issued_at, expires_at, signature, schema_version=AUTH_CONTEXT_SNAPSHOT_SCHEMA_VERSION, tenant=None, roles=(), scopes=(), selected_claims=(), correlation_id=None, delegation_chain=()) dataclass

Canonical JSON signed envelope propagated instead of raw credentials.

subject instance-attribute

Subject represented by this snapshot.

issuer instance-attribute

Issuer that created the snapshot.

issued_at instance-attribute

Snapshot issue timestamp.

expires_at instance-attribute

Snapshot expiration timestamp.

signature instance-attribute

Signature material required by verification providers.

schema_version = AUTH_CONTEXT_SNAPSHOT_SCHEMA_VERSION class-attribute instance-attribute

Canonical snapshot schema version.

tenant = None class-attribute instance-attribute

Optional tenant canonical reference.

roles = () class-attribute instance-attribute

Role canonical references included in the snapshot.

scopes = () class-attribute instance-attribute

Scope canonical references included in the snapshot.

selected_claims = () class-attribute instance-attribute

Selected claims included in the signed snapshot.

correlation_id = None class-attribute instance-attribute

Optional trace/correlation reference for the propagated context.

delegation_chain = () class-attribute instance-attribute

Ordered subject/service delegation references.

canonical_payload()

Return canonical JSON-ready signed envelope payload.

Source code in core/spakky-auth/src/spakky/auth/snapshot.py
def canonical_payload(self) -> dict[str, object]:
    """Return canonical JSON-ready signed envelope payload."""
    selected_claims = {
        claim.name: claim.value
        for claim in sorted(self.selected_claims, key=lambda claim: claim.name)
    }
    return {
        "correlation_id": self.correlation_id,
        "delegation_chain": self.delegation_chain,
        "expires_at": self.expires_at.isoformat(),
        "issued_at": self.issued_at.isoformat(),
        "issuer": self.issuer,
        "roles": self.roles,
        "schema_version": self.schema_version,
        "scopes": self.scopes,
        "selected_claims": selected_claims,
        "signature": self.signature.canonical_payload(),
        "subject": {
            "display_name": self.subject.display_name,
            "id": self.subject.id,
        },
        "tenant": self.tenant,
    }

canonical_json()

Return compact, sorted canonical JSON for the signed envelope.

Source code in core/spakky-auth/src/spakky/auth/snapshot.py
def canonical_json(self) -> str:
    """Return compact, sorted canonical JSON for the signed envelope."""
    return json.dumps(
        self.canonical_payload(),
        sort_keys=True,
        separators=(",", ":"),
    )

base64url_canonical_json()

Return unpadded base64url canonical JSON envelope text.

Source code in core/spakky-auth/src/spakky/auth/snapshot.py
def base64url_canonical_json(self) -> str:
    """Return unpadded base64url canonical JSON envelope text."""
    encoded = urlsafe_b64encode(self.canonical_json().encode())
    return encoded.decode().rstrip("=")

AuthContextSnapshotSignature(*, key_id, algorithm, signature) dataclass

Signature material carried with a signed AuthContextSnapshot envelope.

key_id instance-attribute

Provider key identifier used to verify the envelope.

algorithm instance-attribute

Signature algorithm name.

signature instance-attribute

Base64url signature bytes produced by the signer provider.

canonical_payload()

Return canonical JSON-ready signature material.

Source code in core/spakky-auth/src/spakky/auth/snapshot.py
def canonical_payload(self) -> dict[str, str]:
    """Return canonical JSON-ready signature material."""
    return {
        "algorithm": self.algorithm,
        "key_id": self.key_id,
        "signature": self.signature,
    }

AuthCapabilityStartupValidationService()

Bases: IService, IContainerAware

Service that runs auth capability validation before user services start.

Source code in core/spakky-auth/src/spakky/auth/startup.py
def __init__(self) -> None:
    self._validator = None
    self._stop_event = None

set_container(container)

Inject the application container for startup validation.

Source code in core/spakky-auth/src/spakky/auth/startup.py
@override
def set_container(self, container: IContainer) -> None:
    """Inject the application container for startup validation."""
    self._validator = AuthCapabilityStartupValidator(container=container)

set_stop_event(stop_event)

Store the lifecycle stop event supplied by the application context.

Source code in core/spakky-auth/src/spakky/auth/startup.py
@override
def set_stop_event(self, stop_event: threading.Event) -> None:
    """Store the lifecycle stop event supplied by the application context."""
    self._stop_event = stop_event

start()

Validate auth capabilities before subsequent services start.

Source code in core/spakky-auth/src/spakky/auth/startup.py
@override
def start(self) -> None:
    """Validate auth capabilities before subsequent services start."""
    if self._validator is None:
        raise AuthStartupContainerUnavailableError()
    self._validator.validate()

stop()

Signal validation service shutdown.

Source code in core/spakky-auth/src/spakky/auth/startup.py
@override
def stop(self) -> None:
    """Signal validation service shutdown."""
    if self._stop_event is not None:
        self._stop_event.set()

AuthSnapshotPropagationConfig

Bases: BaseSettings

Feature-local config declaring signed AuthContextSnapshot propagation use.

enabled = False class-attribute instance-attribute

Whether this application propagates signed AuthContextSnapshot envelopes.

AuthStartupCapabilityDiagnostic(*, capability, provider_count, provider_ids, required_by) dataclass

Structured diagnostic for one invalid required auth capability count.

capability instance-attribute

Capability required by protected metadata or snapshot propagation.

provider_count instance-attribute

Number of auth provider contributions declaring the capability.

provider_ids instance-attribute

Provider identifiers declaring the capability.

required_by instance-attribute

Startup configuration or scanned boundary sources requiring the capability.

as_startup_diagnostic_detail()

Convert this auth diagnostic into a startup report detail.

Source code in core/spakky-auth/src/spakky/auth/startup.py
def as_startup_diagnostic_detail(self) -> StartupDiagnosticDetail:
    """Convert this auth diagnostic into a startup report detail."""
    return StartupDiagnosticDetail(
        key=AUTH_STARTUP_VALIDATION_ERROR_DETAIL_KEY,
        value=(
            f"capability={self.capability.value};"
            f"provider_count={self.provider_count};"
            f"providers={','.join(self.provider_ids)};"
            f"required_by={','.join(self.required_by)}"
        ),
    )

AuthStartupCapabilityValidationError(diagnostics)

Bases: AbstractSpakkyAuthError, IStartupDiagnosticDetailProvider

Raised when required auth capabilities have zero or multiple providers.

Source code in core/spakky-auth/src/spakky/auth/startup.py
def __init__(
    self,
    diagnostics: tuple[AuthStartupCapabilityDiagnostic, ...],
) -> None:
    self.diagnostics = diagnostics
    super().__init__(self.message)

startup_diagnostic_details property

Return diagnostics attachable to the startup failure summary.

AuthStartupContainerUnavailableError

Bases: AbstractSpakkyAuthError

Raised when startup validation runs before container injection.

require_auth_context(application_context)

Load AuthContext from ApplicationContext or raise an auth error.

Source code in core/spakky-auth/src/spakky/auth/context.py
def require_auth_context(application_context: IApplicationContext) -> AuthContext:
    """Load AuthContext from ApplicationContext or raise an auth error."""
    value = application_context.get_context_value(AUTH_CONTEXT_CONTEXT_KEY)
    if isinstance(value, AuthContext):
        return value
    if value is None:
        raise AuthContextNotFoundError()
    raise InvalidAuthContextValueError()

store_auth_context(application_context, auth_context)

Store AuthContext in ApplicationContext context values.

Source code in core/spakky-auth/src/spakky/auth/context.py
def store_auth_context(
    application_context: IApplicationContext,
    auth_context: AuthContext,
) -> None:
    """Store AuthContext in ApplicationContext context values."""
    application_context.set_context_value(AUTH_CONTEXT_CONTEXT_KEY, auth_context)

get_effective_auth_metadata(obj, *, owner_type=None)

Aggregate class and method auth metadata using AND semantics.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def get_effective_auth_metadata(
    obj: object,
    *,
    owner_type: type[object] | None = None,
) -> EffectiveAuthMetadata:
    """Aggregate class and method auth metadata using AND semantics."""
    sources = _metadata_sources(obj, owner_type=owner_type)
    public = any(PublicAccess.exists(source) for source in sources)
    requirements = _dedupe_requirements(
        requirement.requirement
        for source in sources
        for requirement in ProtectedRequirement.all(source)
    )
    if public and requirements:
        raise ConflictingAuthMetadataError()
    return EffectiveAuthMetadata(public_access=public, requirements=requirements)

has_auth_boundary_metadata(obj)

Return whether an object or any method declares auth metadata.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def has_auth_boundary_metadata(obj: object) -> bool:
    """Return whether an object or any method declares auth metadata."""
    if _source_has_auth_metadata(obj):
        return True
    if isinstance(obj, type):
        return _type_has_method_auth_metadata(obj)
    if callable(obj):
        owner_type = _owner_type(obj)
        if owner_type is None:
            return _source_has_auth_metadata(obj)
        return _source_has_auth_metadata(owner_type) or _source_has_auth_metadata(obj)
    return _source_has_auth_metadata(type(obj)) or _type_has_method_auth_metadata(
        type(obj)
    )

protected(obj)

Require an authenticated request-scope AuthContext.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def protected[T: object](obj: T) -> T:
    """Require an authenticated request-scope AuthContext."""
    return _requirement_decorator(
        AuthRequirement(
            kind=AuthRequirementKind.AUTHENTICATED,
            ref=AUTHENTICATED_REQUIREMENT_REF,
        )
    )(obj)

public_access(obj)

Mark a class or method as explicitly public.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def public_access[T: object](obj: T) -> T:
    """Mark a class or method as explicitly public."""
    return PublicAccess()(obj)

require_permission(permission, *, resource=None, tenant=None)

Require a permission decision for a class or method boundary.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def require_permission[T: object](
    permission: AuthPermissionRef,
    *,
    resource: AuthResourceRef | None = None,
    tenant: AuthTenantRef | None = None,
) -> Callable[[T], T]:
    """Require a permission decision for a class or method boundary."""
    return _requirement_decorator(
        AuthRequirement(
            kind=AuthRequirementKind.PERMISSION,
            ref=permission,
            resource=resource,
            tenant=tenant,
        )
    )

require_policy(resource, action, *, tenant=None)

Require a resource/action policy decision for a boundary.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def require_policy[T: object](
    resource: AuthResourceRef,
    action: AuthActionRef,
    *,
    tenant: AuthTenantRef | None = None,
) -> Callable[[T], T]:
    """Require a resource/action policy decision for a boundary."""
    return _requirement_decorator(
        AuthRequirement(
            kind=AuthRequirementKind.POLICY,
            ref=resource,
            resource=resource,
            action=action,
            tenant=tenant,
        )
    )

require_relation(relation, *, resource, tenant=None)

Require a relationship decision for a resource boundary.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def require_relation[T: object](
    relation: AuthRelationRef,
    *,
    resource: AuthResourceRef,
    tenant: AuthTenantRef | None = None,
) -> Callable[[T], T]:
    """Require a relationship decision for a resource boundary."""
    return _requirement_decorator(
        AuthRequirement(
            kind=AuthRequirementKind.RELATION,
            ref=relation,
            resource=resource,
            tenant=tenant,
        )
    )

require_role(role, *, tenant=None)

Require a role decision for a class or method boundary.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def require_role[T: object](
    role: AuthRoleRef,
    *,
    tenant: AuthTenantRef | None = None,
) -> Callable[[T], T]:
    """Require a role decision for a class or method boundary."""
    return _requirement_decorator(
        AuthRequirement(
            kind=AuthRequirementKind.ROLE,
            ref=role,
            tenant=tenant,
        )
    )

require_scope(scope)

Require a scope decision for a class or method boundary.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def require_scope[T: object](scope: AuthScopeRef) -> Callable[[T], T]:
    """Require a scope decision for a class or method boundary."""
    return _requirement_decorator(
        AuthRequirement(kind=AuthRequirementKind.SCOPE, ref=scope)
    )

effective_auth_snapshot_propagation_config(configs)

Collapse all registered snapshot propagation configs into one decision.

Source code in core/spakky-auth/src/spakky/auth/startup.py
def effective_auth_snapshot_propagation_config(
    configs: tuple[AuthSnapshotPropagationConfig, ...],
) -> AuthSnapshotPropagationConfig:
    """Collapse all registered snapshot propagation configs into one decision."""
    if len(configs) == 0:
        return AuthSnapshotPropagationConfig()
    return AuthSnapshotPropagationConfig(
        enabled=any(config.enabled for config in configs)
    )

플러그인 진입점

Plugin initialization entry point for spakky-auth.

initialize(app)

Initialize the spakky-auth package.

Registers auth AOP enforcement components and feature-local startup capability validation. Provider implementations and boundary AuthContext seeding are added by downstream auth issues.

Source code in core/spakky-auth/src/spakky/auth/main.py
def initialize(app: SpakkyApplication) -> None:
    """Initialize the spakky-auth package.

    Registers auth AOP enforcement components and feature-local startup
    capability validation. Provider implementations and boundary AuthContext
    seeding are added by downstream auth issues.
    """
    app.add(auth_snapshot_propagation_config)
    app.add(AuthCapabilityStartupValidationService)
    app.add(AuthorizationAspect)
    app.add(AsyncAuthorizationAspect)

Stable keys and defaults for provider-neutral auth contracts.

AUTH_CONTEXT_CONTEXT_KEY = 'spakky.auth.context' module-attribute

ApplicationContext context value key for the current AuthContext.

AUTH_CONTEXT_SNAPSHOT_METADATA_KEY = 'spakky.auth.context_snapshot' module-attribute

Transport metadata key for signed AuthContextSnapshot propagation.

AUTH_CONTEXT_SNAPSHOT_HEADER_KEY = 'x-spakky-auth-context-snapshot' module-attribute

HTTP/gRPC-style header key for signed AuthContextSnapshot propagation.

AUTH_CONTEXT_SNAPSHOT_SCHEMA_VERSION = 1 module-attribute

Current signed AuthContextSnapshot envelope schema version.

DEFAULT_AUTH_CLOCK_SKEW_SECONDS = 60 module-attribute

Default tolerated clock skew for AuthContextSnapshot validation.

의미 모델

AuthContext model and ApplicationContext storage helpers.

AuthClaim(*, name, value) dataclass

Selected provider claim safe for framework-level auth decisions.

name instance-attribute

Canonical claim name.

value instance-attribute

JSON-scalar claim value.

AuthSubject(*, id, display_name=None) dataclass

Authenticated subject identity.

id instance-attribute

Stable provider-neutral subject identifier.

display_name = None class-attribute instance-attribute

Optional human-readable subject label.

AuthContext(*, subject, issuer, tenant=None, roles=(), scopes=(), claims=(), credential_carrier=None, metadata=tuple()) dataclass

Request/context-scoped authentication state seeded by inbound adapters.

subject instance-attribute

Authenticated subject.

issuer instance-attribute

Authority that authenticated the subject.

tenant = None class-attribute instance-attribute

Optional tenant canonical reference.

roles = () class-attribute instance-attribute

Role canonical references granted to the subject.

scopes = () class-attribute instance-attribute

Scope canonical references granted to the subject.

claims = () class-attribute instance-attribute

Selected provider claims retained for downstream decisions.

credential_carrier = None class-attribute instance-attribute

Boundary-local credential carrier that produced this context.

metadata = field(default_factory=tuple) class-attribute instance-attribute

Framework-safe metadata associated with this auth context.

store_auth_context(application_context, auth_context)

Store AuthContext in ApplicationContext context values.

Source code in core/spakky-auth/src/spakky/auth/context.py
def store_auth_context(
    application_context: IApplicationContext,
    auth_context: AuthContext,
) -> None:
    """Store AuthContext in ApplicationContext context values."""
    application_context.set_context_value(AUTH_CONTEXT_CONTEXT_KEY, auth_context)

require_auth_context(application_context)

Load AuthContext from ApplicationContext or raise an auth error.

Source code in core/spakky-auth/src/spakky/auth/context.py
def require_auth_context(application_context: IApplicationContext) -> AuthContext:
    """Load AuthContext from ApplicationContext or raise an auth error."""
    value = application_context.get_context_value(AUTH_CONTEXT_CONTEXT_KEY)
    if isinstance(value, AuthContext):
        return value
    if value is None:
        raise AuthContextNotFoundError()
    raise InvalidAuthContextValueError()

Provider-neutral credential carrier model.

CredentialCarrierKind

Bases: StrEnum

Canonical credential material categories accepted by auth providers.

CredentialCarrierLocation

Bases: StrEnum

Where an inbound adapter observed credential material.

CredentialCarrier(*, kind, location, material, name=None, scheme=None) dataclass

Boundary-local credential material handed to an auth provider.

kind instance-attribute

Provider-neutral credential category.

location instance-attribute

Transport location where the credential was discovered.

material instance-attribute

Opaque credential material read by the inbound adapter.

name = None class-attribute instance-attribute

Optional transport field name, such as an HTTP header name.

scheme = None class-attribute instance-attribute

Optional credential scheme, such as Bearer.

Authorization decision state and reason model.

MISSING_SNAPSHOT_DECISION = AuthorizationDecision.challenge(AuthorizationReasonCode.SNAPSHOT_MISSING) module-attribute

Default decision for missing AuthContextSnapshot propagation metadata.

INVALID_SNAPSHOT_DECISION = AuthorizationDecision.challenge(AuthorizationReasonCode.SNAPSHOT_INVALID) module-attribute

Default decision for malformed or unsigned AuthContextSnapshot metadata.

EXPIRED_SNAPSHOT_DECISION = AuthorizationDecision.challenge(AuthorizationReasonCode.SNAPSHOT_EXPIRED) module-attribute

Default decision for expired AuthContextSnapshot metadata.

VERIFICATION_PROVIDER_UNAVAILABLE_DECISION = AuthorizationDecision.error(AuthorizationReasonCode.VERIFICATION_PROVIDER_UNAVAILABLE) module-attribute

Default decision when snapshot verification provider is unavailable.

AuthorizationDecisionState

Bases: StrEnum

Canonical auth enforcement outcomes.

AuthorizationReasonCode

Bases: StrEnum

Machine-readable reason codes attached to authorization decisions.

AuthorizationDecision(*, state, reason_code, reason=None) dataclass

Provider-neutral result of an auth enforcement decision.

state instance-attribute

ALLOW, CHALLENGE, DENY, or ERROR.

reason_code instance-attribute

Machine-readable reason for the decision.

reason = None class-attribute instance-attribute

Optional operator-facing reason text.

allow(reason_code=AuthorizationReasonCode.AUTHORIZED, reason=None) classmethod

Create an ALLOW decision.

Source code in core/spakky-auth/src/spakky/auth/decision.py
@classmethod
def allow(
    cls,
    reason_code: AuthorizationReasonCode = AuthorizationReasonCode.AUTHORIZED,
    reason: str | None = None,
) -> Self:
    """Create an ALLOW decision."""
    return cls(
        state=AuthorizationDecisionState.ALLOW,
        reason_code=reason_code,
        reason=reason,
    )

challenge(reason_code, reason=None) classmethod

Create a CHALLENGE decision.

Source code in core/spakky-auth/src/spakky/auth/decision.py
@classmethod
def challenge(
    cls,
    reason_code: AuthorizationReasonCode,
    reason: str | None = None,
) -> Self:
    """Create a CHALLENGE decision."""
    return cls(
        state=AuthorizationDecisionState.CHALLENGE,
        reason_code=reason_code,
        reason=reason,
    )

deny(reason_code, reason=None) classmethod

Create a DENY decision.

Source code in core/spakky-auth/src/spakky/auth/decision.py
@classmethod
def deny(
    cls,
    reason_code: AuthorizationReasonCode,
    reason: str | None = None,
) -> Self:
    """Create a DENY decision."""
    return cls(
        state=AuthorizationDecisionState.DENY,
        reason_code=reason_code,
        reason=reason,
    )

error(reason_code, reason=None) classmethod

Create an ERROR decision.

Source code in core/spakky-auth/src/spakky/auth/decision.py
@classmethod
def error(
    cls,
    reason_code: AuthorizationReasonCode,
    reason: str | None = None,
) -> Self:
    """Create an ERROR decision."""
    return cls(
        state=AuthorizationDecisionState.ERROR,
        reason_code=reason_code,
        reason=reason,
    )

Provider-neutral auth capability declarations.

AuthCapability

Bases: StrEnum

Auth provider capabilities declared by feature contributions.

Feature-local contribution contract for auth providers.

AUTH_CONTRIBUTION_ENTRY_POINT_GROUP = 'spakky.contributions.spakky.auth' module-attribute

Entry point group used by providers contributing to spakky-auth.

AuthProviderContribution(*, provider_id, capabilities) dataclass

Provider contribution metadata consumed by auth startup validation.

provider_id instance-attribute

Stable provider-neutral contribution identifier.

capabilities instance-attribute

Capabilities implemented by the contribution provider.

supports(capability)

Return whether this contribution declares a capability.

Source code in core/spakky-auth/src/spakky/auth/contribution.py
def supports(self, capability: AuthCapability) -> bool:
    """Return whether this contribution declares a capability."""
    return capability in self.capabilities

Provider-neutral invocation and dynamic auth reference contracts.

AuthDynamicRefKind

Bases: StrEnum

Dynamic auth reference target kinds resolved from an invocation.

AuthInvocationAttribute(*, name, value) dataclass

Typed scalar attribute made available to auth reference resolvers.

name instance-attribute

Provider-neutral attribute name.

value instance-attribute

JSON-scalar invocation attribute value.

AuthInvocation(*, boundary, operation, subject=None, attributes=()) dataclass

Provider-neutral description of the boundary call being authorized.

boundary instance-attribute

Boundary category such as HTTP, gRPC, CLI, task, event, or saga.

operation instance-attribute

Canonical operation or handler reference at the boundary.

subject = None class-attribute instance-attribute

Optional pre-authenticated subject when a boundary already has one.

attributes = () class-attribute instance-attribute

Scalar invocation attributes available to dynamic ref resolvers.

AuthDynamicRef(*, kind, expression) dataclass

Late-bound resource, action, or tenant reference expression.

kind instance-attribute

Reference kind expected from the resolver output.

expression instance-attribute

Provider-neutral expression name understood by the resolver.

ResolvedAuthReference(*, kind, value) dataclass

Resolved auth reference value returned by invocation resolvers.

kind instance-attribute

Resolved reference kind.

value instance-attribute

Canonical resource, action, or tenant reference.

Decorator metadata contracts for protected auth boundaries.

AuthRequirementKind

Bases: StrEnum

Canonical auth requirement categories supported by decorators.

AuthRequirement(*, kind, ref, resource=None, action=None, tenant=None) dataclass

Canonical protected-boundary auth requirement.

kind instance-attribute

Requirement category used by enforcement to select the provider port.

ref instance-attribute

Canonical permission, role, scope, relation, or marker reference.

resource = None class-attribute instance-attribute

Optional resource reference for permission, policy, or relation checks.

action = None class-attribute instance-attribute

Optional action reference for policy checks.

tenant = None class-attribute instance-attribute

Optional tenant reference for tenant-scoped checks.

EffectiveAuthMetadata(*, public_access, requirements) dataclass

Effective auth metadata after class/method aggregation.

public_access instance-attribute

Whether a public access marker is effective.

requirements instance-attribute

Ordered, duplicate-free protected requirements with AND semantics.

protected property

Return whether this boundary has protected requirements.

PublicAccess() dataclass

Bases: Annotation

Marker annotation for explicitly public boundaries.

ProtectedRequirement(requirement) dataclass

Bases: Annotation

Annotation carrying one protected auth requirement.

public_access(obj)

Mark a class or method as explicitly public.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def public_access[T: object](obj: T) -> T:
    """Mark a class or method as explicitly public."""
    return PublicAccess()(obj)

protected(obj)

Require an authenticated request-scope AuthContext.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def protected[T: object](obj: T) -> T:
    """Require an authenticated request-scope AuthContext."""
    return _requirement_decorator(
        AuthRequirement(
            kind=AuthRequirementKind.AUTHENTICATED,
            ref=AUTHENTICATED_REQUIREMENT_REF,
        )
    )(obj)

require_permission(permission, *, resource=None, tenant=None)

Require a permission decision for a class or method boundary.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def require_permission[T: object](
    permission: AuthPermissionRef,
    *,
    resource: AuthResourceRef | None = None,
    tenant: AuthTenantRef | None = None,
) -> Callable[[T], T]:
    """Require a permission decision for a class or method boundary."""
    return _requirement_decorator(
        AuthRequirement(
            kind=AuthRequirementKind.PERMISSION,
            ref=permission,
            resource=resource,
            tenant=tenant,
        )
    )

require_policy(resource, action, *, tenant=None)

Require a resource/action policy decision for a boundary.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def require_policy[T: object](
    resource: AuthResourceRef,
    action: AuthActionRef,
    *,
    tenant: AuthTenantRef | None = None,
) -> Callable[[T], T]:
    """Require a resource/action policy decision for a boundary."""
    return _requirement_decorator(
        AuthRequirement(
            kind=AuthRequirementKind.POLICY,
            ref=resource,
            resource=resource,
            action=action,
            tenant=tenant,
        )
    )

require_relation(relation, *, resource, tenant=None)

Require a relationship decision for a resource boundary.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def require_relation[T: object](
    relation: AuthRelationRef,
    *,
    resource: AuthResourceRef,
    tenant: AuthTenantRef | None = None,
) -> Callable[[T], T]:
    """Require a relationship decision for a resource boundary."""
    return _requirement_decorator(
        AuthRequirement(
            kind=AuthRequirementKind.RELATION,
            ref=relation,
            resource=resource,
            tenant=tenant,
        )
    )

require_role(role, *, tenant=None)

Require a role decision for a class or method boundary.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def require_role[T: object](
    role: AuthRoleRef,
    *,
    tenant: AuthTenantRef | None = None,
) -> Callable[[T], T]:
    """Require a role decision for a class or method boundary."""
    return _requirement_decorator(
        AuthRequirement(
            kind=AuthRequirementKind.ROLE,
            ref=role,
            tenant=tenant,
        )
    )

require_scope(scope)

Require a scope decision for a class or method boundary.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def require_scope[T: object](scope: AuthScopeRef) -> Callable[[T], T]:
    """Require a scope decision for a class or method boundary."""
    return _requirement_decorator(
        AuthRequirement(kind=AuthRequirementKind.SCOPE, ref=scope)
    )

get_effective_auth_metadata(obj, *, owner_type=None)

Aggregate class and method auth metadata using AND semantics.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def get_effective_auth_metadata(
    obj: object,
    *,
    owner_type: type[object] | None = None,
) -> EffectiveAuthMetadata:
    """Aggregate class and method auth metadata using AND semantics."""
    sources = _metadata_sources(obj, owner_type=owner_type)
    public = any(PublicAccess.exists(source) for source in sources)
    requirements = _dedupe_requirements(
        requirement.requirement
        for source in sources
        for requirement in ProtectedRequirement.all(source)
    )
    if public and requirements:
        raise ConflictingAuthMetadataError()
    return EffectiveAuthMetadata(public_access=public, requirements=requirements)

has_auth_boundary_metadata(obj)

Return whether an object or any method declares auth metadata.

Source code in core/spakky-auth/src/spakky/auth/metadata.py
def has_auth_boundary_metadata(obj: object) -> bool:
    """Return whether an object or any method declares auth metadata."""
    if _source_has_auth_metadata(obj):
        return True
    if isinstance(obj, type):
        return _type_has_method_auth_metadata(obj)
    if callable(obj):
        owner_type = _owner_type(obj)
        if owner_type is None:
            return _source_has_auth_metadata(obj)
        return _source_has_auth_metadata(owner_type) or _source_has_auth_metadata(obj)
    return _source_has_auth_metadata(type(obj)) or _type_has_method_auth_metadata(
        type(obj)
    )

ABC ports for provider-neutral auth providers.

AuthorizationRequest(*, auth_context, resource, action, tenant=None) dataclass

Provider-neutral policy evaluation request.

auth_context instance-attribute

Authenticated subject context.

resource instance-attribute

Canonical resource reference being accessed.

action instance-attribute

Canonical action reference being attempted.

tenant = None class-attribute instance-attribute

Optional tenant reference; None means tenant is not applicable.

PermissionCheckRequest(*, auth_context, permission, resource=None, tenant=None) dataclass

Permission check request scoped to an authenticated subject.

auth_context instance-attribute

Authenticated subject context.

permission instance-attribute

Canonical permission reference.

resource = None class-attribute instance-attribute

Optional resource reference; None means resource-independent.

tenant = None class-attribute instance-attribute

Optional tenant reference; None means tenant is not applicable.

RoleCheckRequest(*, auth_context, role, tenant=None) dataclass

Role check request scoped to an authenticated subject.

auth_context instance-attribute

Authenticated subject context.

role instance-attribute

Canonical role reference.

tenant = None class-attribute instance-attribute

Optional tenant reference; None means tenant is not applicable.

ScopeCheckRequest(*, auth_context, scope) dataclass

Scope check request scoped to an authenticated subject.

auth_context instance-attribute

Authenticated subject context.

scope instance-attribute

Canonical scope reference.

RelationCheckRequest(*, auth_context, relation, resource, tenant=None) dataclass

Relationship authorization request scoped to a resource.

auth_context instance-attribute

Authenticated subject context.

relation instance-attribute

Canonical relationship reference.

resource instance-attribute

Canonical resource reference for the relationship check.

tenant = None class-attribute instance-attribute

Optional tenant reference; None means tenant is not applicable.

SnapshotSignRequest(*, auth_context, tenant=None) dataclass

Request for signing an AuthContext snapshot for propagation.

auth_context instance-attribute

Auth context to serialize and sign.

tenant = None class-attribute instance-attribute

Optional tenant override; None means use the context tenant.

IAuthenticationProvider

Bases: ABC

Provider-neutral authentication port.

authenticate(credential, invocation) abstractmethod

Authenticate a credential observed at an invocation boundary.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def authenticate(
    self,
    credential: CredentialCarrier,
    invocation: AuthInvocation,
) -> AuthContext:
    """Authenticate a credential observed at an invocation boundary."""
    ...

IAuthorizationPolicyEvaluator

Bases: ABC

Provider-neutral policy evaluation port.

evaluate_policy(request) abstractmethod

Evaluate a resource/action authorization request.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def evaluate_policy(self, request: AuthorizationRequest) -> AuthorizationDecision:
    """Evaluate a resource/action authorization request."""
    ...

IPermissionChecker

Bases: ABC

Provider-neutral permission check port.

check_permission(request) abstractmethod

Check whether a subject has a permission.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def check_permission(
    self, request: PermissionCheckRequest
) -> AuthorizationDecision:
    """Check whether a subject has a permission."""
    ...

IRoleChecker

Bases: ABC

Provider-neutral role check port.

check_role(request) abstractmethod

Check whether a subject has a role.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def check_role(self, request: RoleCheckRequest) -> AuthorizationDecision:
    """Check whether a subject has a role."""
    ...

IScopeChecker

Bases: ABC

Provider-neutral scope check port.

check_scope(request) abstractmethod

Check whether a subject has a scope.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def check_scope(self, request: ScopeCheckRequest) -> AuthorizationDecision:
    """Check whether a subject has a scope."""
    ...

IRelationChecker

Bases: ABC

Provider-neutral relation check port.

check_relation(request) abstractmethod

Check whether a subject has a relation to a resource.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def check_relation(self, request: RelationCheckRequest) -> AuthorizationDecision:
    """Check whether a subject has a relation to a resource."""
    ...

IAuthContextSnapshotSigner

Bases: ABC

Provider-neutral AuthContextSnapshot signing port.

sign_snapshot(request) abstractmethod

Create a signed AuthContextSnapshot for propagation.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def sign_snapshot(self, request: SnapshotSignRequest) -> AuthContextSnapshot:
    """Create a signed AuthContextSnapshot for propagation."""
    ...

IAuthContextSnapshotVerifier

Bases: ABC

Provider-neutral AuthContextSnapshot verification port.

verify_snapshot(snapshot_envelope, invocation) abstractmethod

Verify a signed snapshot envelope and return its auth context.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def verify_snapshot(
    self,
    snapshot_envelope: str,
    invocation: AuthInvocation,
) -> AuthContext:
    """Verify a signed snapshot envelope and return its auth context."""
    ...

IPasswordHasher

Bases: ABC

Provider-neutral password hashing port.

hash_password(password) abstractmethod

Hash plaintext password material for storage.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def hash_password(self, password: AuthPasswordPlaintext) -> AuthPasswordHash:
    """Hash plaintext password material for storage."""
    ...

IPasswordVerifier

Bases: ABC

Provider-neutral password verification port.

verify_password(password, password_hash) abstractmethod

Verify plaintext password material against a stored hash.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def verify_password(
    self,
    password: AuthPasswordPlaintext,
    password_hash: AuthPasswordHash,
) -> AuthorizationDecision:
    """Verify plaintext password material against a stored hash."""
    ...

IAuthInvocationResolver

Bases: ABC

Resolver for invocation-scoped resource, action, and tenant refs.

resolve_ref(invocation, dynamic_ref) abstractmethod

Resolve a late-bound auth reference from invocation attributes.

Source code in core/spakky-auth/src/spakky/auth/ports.py
@abstractmethod
def resolve_ref(
    self,
    invocation: AuthInvocation,
    dynamic_ref: AuthDynamicRef,
) -> ResolvedAuthReference:
    """Resolve a late-bound auth reference from invocation attributes."""
    ...

Signed AuthContextSnapshot envelope contract.

AuthContextSnapshotSignature(*, key_id, algorithm, signature) dataclass

Signature material carried with a signed AuthContextSnapshot envelope.

key_id instance-attribute

Provider key identifier used to verify the envelope.

algorithm instance-attribute

Signature algorithm name.

signature instance-attribute

Base64url signature bytes produced by the signer provider.

canonical_payload()

Return canonical JSON-ready signature material.

Source code in core/spakky-auth/src/spakky/auth/snapshot.py
def canonical_payload(self) -> dict[str, str]:
    """Return canonical JSON-ready signature material."""
    return {
        "algorithm": self.algorithm,
        "key_id": self.key_id,
        "signature": self.signature,
    }

AuthContextSnapshot(*, subject, issuer, issued_at, expires_at, signature, schema_version=AUTH_CONTEXT_SNAPSHOT_SCHEMA_VERSION, tenant=None, roles=(), scopes=(), selected_claims=(), correlation_id=None, delegation_chain=()) dataclass

Canonical JSON signed envelope propagated instead of raw credentials.

subject instance-attribute

Subject represented by this snapshot.

issuer instance-attribute

Issuer that created the snapshot.

issued_at instance-attribute

Snapshot issue timestamp.

expires_at instance-attribute

Snapshot expiration timestamp.

signature instance-attribute

Signature material required by verification providers.

schema_version = AUTH_CONTEXT_SNAPSHOT_SCHEMA_VERSION class-attribute instance-attribute

Canonical snapshot schema version.

tenant = None class-attribute instance-attribute

Optional tenant canonical reference.

roles = () class-attribute instance-attribute

Role canonical references included in the snapshot.

scopes = () class-attribute instance-attribute

Scope canonical references included in the snapshot.

selected_claims = () class-attribute instance-attribute

Selected claims included in the signed snapshot.

correlation_id = None class-attribute instance-attribute

Optional trace/correlation reference for the propagated context.

delegation_chain = () class-attribute instance-attribute

Ordered subject/service delegation references.

canonical_payload()

Return canonical JSON-ready signed envelope payload.

Source code in core/spakky-auth/src/spakky/auth/snapshot.py
def canonical_payload(self) -> dict[str, object]:
    """Return canonical JSON-ready signed envelope payload."""
    selected_claims = {
        claim.name: claim.value
        for claim in sorted(self.selected_claims, key=lambda claim: claim.name)
    }
    return {
        "correlation_id": self.correlation_id,
        "delegation_chain": self.delegation_chain,
        "expires_at": self.expires_at.isoformat(),
        "issued_at": self.issued_at.isoformat(),
        "issuer": self.issuer,
        "roles": self.roles,
        "schema_version": self.schema_version,
        "scopes": self.scopes,
        "selected_claims": selected_claims,
        "signature": self.signature.canonical_payload(),
        "subject": {
            "display_name": self.subject.display_name,
            "id": self.subject.id,
        },
        "tenant": self.tenant,
    }

canonical_json()

Return compact, sorted canonical JSON for the signed envelope.

Source code in core/spakky-auth/src/spakky/auth/snapshot.py
def canonical_json(self) -> str:
    """Return compact, sorted canonical JSON for the signed envelope."""
    return json.dumps(
        self.canonical_payload(),
        sort_keys=True,
        separators=(",", ":"),
    )

base64url_canonical_json()

Return unpadded base64url canonical JSON envelope text.

Source code in core/spakky-auth/src/spakky/auth/snapshot.py
def base64url_canonical_json(self) -> str:
    """Return unpadded base64url canonical JSON envelope text."""
    encoded = urlsafe_b64encode(self.canonical_json().encode())
    return encoded.decode().rstrip("=")

Feature-local startup validation for auth capability contributions.

AUTH_STARTUP_VALIDATION_ERROR_DETAIL_KEY = 'auth.capability.validation.error' module-attribute

Startup diagnostic detail key for invalid auth capability provider counts.

SPAKKY_AUTH_SNAPSHOT_PROPAGATION_CONFIG_ENV_PREFIX = 'SPAKKY_AUTH_SNAPSHOT_PROPAGATION_' module-attribute

Environment prefix for signed AuthContextSnapshot propagation settings.

AuthSnapshotPropagationConfig

Bases: BaseSettings

Feature-local config declaring signed AuthContextSnapshot propagation use.

enabled = False class-attribute instance-attribute

Whether this application propagates signed AuthContextSnapshot envelopes.

AuthStartupCapabilityDiagnostic(*, capability, provider_count, provider_ids, required_by) dataclass

Structured diagnostic for one invalid required auth capability count.

capability instance-attribute

Capability required by protected metadata or snapshot propagation.

provider_count instance-attribute

Number of auth provider contributions declaring the capability.

provider_ids instance-attribute

Provider identifiers declaring the capability.

required_by instance-attribute

Startup configuration or scanned boundary sources requiring the capability.

as_startup_diagnostic_detail()

Convert this auth diagnostic into a startup report detail.

Source code in core/spakky-auth/src/spakky/auth/startup.py
def as_startup_diagnostic_detail(self) -> StartupDiagnosticDetail:
    """Convert this auth diagnostic into a startup report detail."""
    return StartupDiagnosticDetail(
        key=AUTH_STARTUP_VALIDATION_ERROR_DETAIL_KEY,
        value=(
            f"capability={self.capability.value};"
            f"provider_count={self.provider_count};"
            f"providers={','.join(self.provider_ids)};"
            f"required_by={','.join(self.required_by)}"
        ),
    )

AuthStartupCapabilityValidationError(diagnostics)

Bases: AbstractSpakkyAuthError, IStartupDiagnosticDetailProvider

Raised when required auth capabilities have zero or multiple providers.

Source code in core/spakky-auth/src/spakky/auth/startup.py
def __init__(
    self,
    diagnostics: tuple[AuthStartupCapabilityDiagnostic, ...],
) -> None:
    self.diagnostics = diagnostics
    super().__init__(self.message)

startup_diagnostic_details property

Return diagnostics attachable to the startup failure summary.

AuthStartupContainerUnavailableError

Bases: AbstractSpakkyAuthError

Raised when startup validation runs before container injection.

AuthCapabilityStartupValidator(container)

Validate auth capability providers required by this application.

Source code in core/spakky-auth/src/spakky/auth/startup.py
def __init__(self, container: IContainer) -> None:
    self._container = container

validate()

Validate required capability provider counts and fail fast on mismatch.

Source code in core/spakky-auth/src/spakky/auth/startup.py
def validate(self) -> None:
    """Validate required capability provider counts and fail fast on mismatch."""
    diagnostics = self._invalid_capability_diagnostics()
    if len(diagnostics) > 0:
        raise AuthStartupCapabilityValidationError(diagnostics=diagnostics)

AuthCapabilityStartupValidationService()

Bases: IService, IContainerAware

Service that runs auth capability validation before user services start.

Source code in core/spakky-auth/src/spakky/auth/startup.py
def __init__(self) -> None:
    self._validator = None
    self._stop_event = None

set_container(container)

Inject the application container for startup validation.

Source code in core/spakky-auth/src/spakky/auth/startup.py
@override
def set_container(self, container: IContainer) -> None:
    """Inject the application container for startup validation."""
    self._validator = AuthCapabilityStartupValidator(container=container)

set_stop_event(stop_event)

Store the lifecycle stop event supplied by the application context.

Source code in core/spakky-auth/src/spakky/auth/startup.py
@override
def set_stop_event(self, stop_event: threading.Event) -> None:
    """Store the lifecycle stop event supplied by the application context."""
    self._stop_event = stop_event

start()

Validate auth capabilities before subsequent services start.

Source code in core/spakky-auth/src/spakky/auth/startup.py
@override
def start(self) -> None:
    """Validate auth capabilities before subsequent services start."""
    if self._validator is None:
        raise AuthStartupContainerUnavailableError()
    self._validator.validate()

stop()

Signal validation service shutdown.

Source code in core/spakky-auth/src/spakky/auth/startup.py
@override
def stop(self) -> None:
    """Signal validation service shutdown."""
    if self._stop_event is not None:
        self._stop_event.set()

auth_snapshot_propagation_config()

Load signed AuthContextSnapshot propagation settings from environment.

Source code in core/spakky-auth/src/spakky/auth/startup.py
@Pod(name="auth_snapshot_propagation_config")
def auth_snapshot_propagation_config() -> AuthSnapshotPropagationConfig:
    """Load signed AuthContextSnapshot propagation settings from environment."""
    return AuthSnapshotPropagationConfig()

effective_auth_snapshot_propagation_config(configs)

Collapse all registered snapshot propagation configs into one decision.

Source code in core/spakky-auth/src/spakky/auth/startup.py
def effective_auth_snapshot_propagation_config(
    configs: tuple[AuthSnapshotPropagationConfig, ...],
) -> AuthSnapshotPropagationConfig:
    """Collapse all registered snapshot propagation configs into one decision."""
    if len(configs) == 0:
        return AuthSnapshotPropagationConfig()
    return AuthSnapshotPropagationConfig(
        enabled=any(config.enabled for config in configs)
    )

Aspect

AOP enforcement for protected auth metadata.

AuthorizationAspect(application_context=None, authorization_policy_evaluator=None, permission_checker=None, relation_checker=None, role_checker=None, scope_checker=None)

Bases: IAspect, IApplicationContextAware

Synchronous aspect enforcing protected auth metadata.

Source code in core/spakky-auth/src/spakky/auth/aspects/authorization.py
def __init__(
    self,
    application_context: IApplicationContext | None = None,
    authorization_policy_evaluator: IAuthorizationPolicyEvaluator | None = None,
    permission_checker: IPermissionChecker | None = None,
    relation_checker: IRelationChecker | None = None,
    role_checker: IRoleChecker | None = None,
    scope_checker: IScopeChecker | None = None,
) -> None:
    self._application_context = application_context
    self._authorization_policy_evaluator = authorization_policy_evaluator
    self._permission_checker = permission_checker
    self._relation_checker = relation_checker
    self._role_checker = role_checker
    self._scope_checker = scope_checker

set_application_context(application_context)

Inject the application context when managed as a Pod.

Source code in core/spakky-auth/src/spakky/auth/aspects/authorization.py
@override
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Inject the application context when managed as a Pod."""
    self._application_context = application_context

AsyncAuthorizationAspect(application_context=None, authorization_policy_evaluator=None, permission_checker=None, relation_checker=None, role_checker=None, scope_checker=None)

Bases: IAsyncAspect, IApplicationContextAware

Asynchronous aspect enforcing protected auth metadata.

Source code in core/spakky-auth/src/spakky/auth/aspects/authorization.py
def __init__(
    self,
    application_context: IApplicationContext | None = None,
    authorization_policy_evaluator: IAuthorizationPolicyEvaluator | None = None,
    permission_checker: IPermissionChecker | None = None,
    relation_checker: IRelationChecker | None = None,
    role_checker: IRoleChecker | None = None,
    scope_checker: IScopeChecker | None = None,
) -> None:
    self._application_context = application_context
    self._authorization_policy_evaluator = authorization_policy_evaluator
    self._permission_checker = permission_checker
    self._relation_checker = relation_checker
    self._role_checker = role_checker
    self._scope_checker = scope_checker

set_application_context(application_context)

Inject the application context when managed as a Pod.

Source code in core/spakky-auth/src/spakky/auth/aspects/authorization.py
@override
def set_application_context(self, application_context: IApplicationContext) -> None:
    """Inject the application context when managed as a Pod."""
    self._application_context = application_context

Error hierarchy for spakky-auth semantic model contracts.

AbstractSpakkyAuthError

Bases: AbstractSpakkyFrameworkError, ABC

Base class for all spakky-auth errors.

AuthContextError

Bases: AbstractSpakkyAuthError

Raised when AuthContext storage or lookup fails.

AuthContextNotFoundError

Bases: AuthContextError

Raised when ApplicationContext has no AuthContext value.

InvalidAuthContextValueError

Bases: AuthContextError

Raised when ApplicationContext contains a non-AuthContext value.

CredentialCarrierError

Bases: AbstractSpakkyAuthError

Raised when a credential carrier cannot be interpreted.

AuthenticationError

Bases: AbstractSpakkyAuthError

Raised when authentication fails before authorization policy evaluation.

AuthorizationError

Bases: AbstractSpakkyAuthError

Raised when authorization policy evaluation denies access.

ConflictingAuthMetadataError

Bases: AuthorizationError

Raised when public and protected auth metadata are both effective.

AuthRequirementDeniedError(decision=None)

Bases: AuthorizationError

Raised when an auth requirement decision is not ALLOW.

Source code in core/spakky-auth/src/spakky/auth/error.py
def __init__(self, decision: AuthorizationDecision | None = None) -> None:
    self.decision = decision
    super().__init__()

AuthRequirementProviderUnavailableError

Bases: AuthorizationError

Raised when no provider is available for a protected requirement.

AuthContextSnapshotError

Bases: AbstractSpakkyAuthError

Raised when an AuthContextSnapshot cannot be used.

MissingAuthContextSnapshotError

Bases: AuthContextSnapshotError

Raised when snapshot propagation metadata is absent.

InvalidAuthContextSnapshotError

Bases: AuthContextSnapshotError

Raised when snapshot propagation metadata is malformed or unsigned.

ExpiredAuthContextSnapshotError

Bases: AuthContextSnapshotError

Raised when snapshot propagation metadata is outside its time window.

AuthVerificationProviderUnavailableError

Bases: AuthContextSnapshotError

Raised when snapshot verification cannot reach its provider.