콘텐츠로 이동

spakky-agent

spakky-agent는 Agent workflow를 Spakky 컴포넌트로 모델링하기 위한 계약, 도구, 상태, signal, evidence 타입을 제공합니다.

Agentic Hexagonal Architecture의 core 계약입니다.

설치

pip install spakky-agent

spakky-agent@Agent, AgentExecutionSpec, AgentYield, AgentState, AgentSignal, AgentEvidence, IAgentModel, @agent_tool, context/safety/recovery, delegation 타입 같은 public contract를 소유합니다. 이 패키지는 의도적으로 vLLM, SQLAlchemy, FastAPI, Typer를 import하지 않습니다. 운영에서 durable execution을 사용하려면 spakky-sqlalchemy[agent] 같은 provider contribution의 repository 구현이 필요하며, 운영용 in-memory fallback은 제공하지 않습니다.

Public API

Agentic hexagonal architecture contracts for Spakky Framework.

Message = Progress module-attribute

Backward-compatible progress-yield payload alias.

TextDelta = Token module-attribute

Backward-compatible token-yield payload alias.

PLUGIN_NAME = Plugin(name='spakky-agent') module-attribute

Plugin identifier for the Spakky Agent package.

AgentCancellationCleanupCallable = Callable[[AgentCancellationRequest], Awaitable[AgentCancellationCleanupResult]]

Async hook shape invoked for model stream/tool/delegate cancellation cleanup.

JsonObject = Mapping[str, JsonValue]

JSON object payload used by public agent contracts.

JsonPrimitive = bool | float | int | str | None

Scalar JSON value accepted by agent public contracts.

JsonValue = JsonPrimitive | Mapping[str, JsonValue] | Sequence[JsonValue]

Recursive JSON-compatible value used at model, signal, and evidence boundaries.

AbstractSpakkyAgentError

Bases: AbstractSpakkyFrameworkError, ABC

Base class for agent-related errors.

AgentBootstrapError

Bases: AbstractSpakkyAgentError

Raised when agent bootstrap validation fails.

AgentDefinitionError

Bases: AbstractSpakkyAgentError

Raised when an agent contract cannot be defined safely.

AgentModelConfigurationError

Bases: AgentBootstrapError

Raised when an agent requires a model adapter but none is registered.

AgentOutputGuardError

Bases: AbstractSpakkyAgentError

Raised when an output guard detects unsafe streaming exposure.

AgentPersistenceConfigurationError

Bases: AgentBootstrapError

Raised when production agent persistence is required but not provided.

AgentToolBindingError

Bases: AbstractSpakkyAgentError

Raised when a model tool-call payload cannot be bound safely.

ContextDigest(id, context_identity, source_manifest_ref, digest, derived_from_pack_ids=tuple(), compression_evidence_ref=None, algorithm=None, summary=None, created_at=None, metadata=dict()) dataclass

Derived compression evidence for a context identity.

ContextFreshness

Bases: StrEnum

Freshness classification for context rot and budget decisions.

ContextHealthSignal(id, symptom, manifest_ref=None, pack_id=None, evidence_ref=None, score=None, observed_at=None, metadata=dict()) dataclass

Observed context rot signal used to choose optimization actions.

evidence_payload()

Return JSON-compatible signal metadata for append-only evidence.

Source code in core/spakky-agent/src/spakky/agent/context.py
def evidence_payload(self) -> Mapping[str, JsonValue]:
    """Return JSON-compatible signal metadata for append-only evidence."""
    return {
        "id": self.id,
        "symptom": self.symptom.value,
        "manifest_ref": self.manifest_ref,
        "pack_id": self.pack_id,
        "evidence_ref": self.evidence_ref,
        "score": self.score,
        "observed_at": self.observed_at.isoformat()
        if self.observed_at is not None
        else None,
        "metadata": self.metadata,
    }

ContextManifest(id, entries, origin_ref=None, evidence_refs=tuple(), created_at=None, metadata=dict()) dataclass

Auditable composition record for model input context packs.

ContextManifestEntry(pack_id, source, role, origin_ref, evidence_ref=None, digest_ref=None, sensitive_fields=(), metadata=dict()) dataclass

One audited pack entry inside a context manifest.

ContextOptimizationAction(id, kind, signal_refs=tuple(), target_pack_ids=tuple(), manifest_ref=None, digest_ref=None, delegation_ref=None, result_evidence_ref=None, reason=None, metadata=dict()) dataclass

Selected optimization action derived from context health signals.

evidence_payload()

Return JSON-compatible action metadata without raw context contents.

Source code in core/spakky-agent/src/spakky/agent/context.py
def evidence_payload(self) -> Mapping[str, JsonValue]:
    """Return JSON-compatible action metadata without raw context contents."""
    return {
        "id": self.id,
        "kind": self.kind.value,
        "signal_refs": tuple(self.signal_refs),
        "target_pack_ids": tuple(self.target_pack_ids),
        "manifest_ref": self.manifest_ref,
        "digest_ref": self.digest_ref,
        "delegation_ref": self.delegation_ref,
        "result_evidence_ref": self.result_evidence_ref,
        "reason": self.reason,
        "metadata": self.metadata,
    }

ContextOptimizationActionKind

Bases: StrEnum

Optimization actions that can be selected from context health signals.

ContextOptimizationEvidenceStage

Bases: StrEnum

Where an optimization action evidence item sits in the agent flow.

ContextPack(id, content, source, role, freshness=ContextFreshness.UNKNOWN, relevance=None, token_budget=ContextTokenBudget(), sensitivity=ContextSensitivity.INTERNAL, sensitive_fields=(), metadata=dict()) dataclass

LLM-facing context unit derived from state, signal, or evidence.

guarded_content(policy=None)

Return deterministic model-safe content for this context pack.

Source code in core/spakky-agent/src/spakky/agent/context.py
def guarded_content(
    self,
    policy: ContextExposurePolicy | None = None,
) -> str:
    """Return deterministic model-safe content for this context pack."""
    exposure_policy = policy or ContextExposurePolicy()
    if self.sensitivity == ContextSensitivity.REDACTED:
        return "[REDACTED]"
    guarded = guard_json_value(
        {"content": self.content},
        tuple(
            SensitiveFieldDescriptor(
                ("content", *descriptor.path), descriptor.field
            )
            for descriptor in self.sensitive_fields
        ),
        exposure_policy,
    )
    content = cast(Mapping[str, JsonValue], guarded).get("content")
    if isinstance(content, str):
        return content
    return "[REDACTED]"

message_metadata(policy=None)

Return non-content metadata for provider-neutral model messages.

Source code in core/spakky-agent/src/spakky/agent/context.py
def message_metadata(
    self,
    policy: ContextExposurePolicy | None = None,
) -> Mapping[str, JsonValue]:
    """Return non-content metadata for provider-neutral model messages."""
    exposure_policy = policy or ContextExposurePolicy()
    metadata: dict[str, JsonValue] = {
        "context_pack_id": self.id,
        "source": self.source,
        "role": self.role.value,
        "freshness": self.freshness.value,
        "relevance": self.relevance,
        "token_budget": {
            "max_tokens": self.token_budget.max_tokens,
            "estimated_tokens": self.token_budget.estimated_tokens,
            "reserved_output_tokens": self.token_budget.reserved_output_tokens,
        },
        "sensitivity": self.sensitivity.value,
        "metadata": self.metadata,
    }
    if exposure_policy.include_sensitive_context_metadata and self.sensitive_fields:
        metadata["sensitive_fields"] = tuple(
            descriptor.to_metadata() for descriptor in self.sensitive_fields
        )
    return metadata

ContextPackRole

Bases: StrEnum

Semantic role of a context pack inside a model request.

ContextRotSymptom

Bases: StrEnum

Typed context rot symptoms observed before model input assembly.

ContextSensitivity

Bases: StrEnum

Deterministic sensitivity metadata carried before model input.

ContextTokenBudget(max_tokens=None, estimated_tokens=None, reserved_output_tokens=None) dataclass

Token budget allocated to one context pack.

IAgentContextHandler

Bases: ABC

Select context optimization actions from health signals and manifests.

select_optimization_actions(signals, manifest) abstractmethod

Return optimization actions without mutating raw evidence.

Source code in core/spakky-agent/src/spakky/agent/context.py
@abstractmethod
def select_optimization_actions(
    self,
    signals: Sequence[ContextHealthSignal],
    manifest: "ContextManifest",
) -> Sequence[ContextOptimizationAction]:
    """Return optimization actions without mutating raw evidence."""
    ...

AgentCancellationCleanupReport(state_id, signal_id, outcomes=()) dataclass

Aggregate cleanup evidence for one CANCEL signal.

cleanup_succeeded property

Return whether all cleanup hooks completed without failure.

failed_outcomes property

Return the hook outcomes that force FAILED terminal state.

__post_init__()

Reject reports that cannot be attached to state and signal evidence.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def __post_init__(self) -> None:
    """Reject reports that cannot be attached to state and signal evidence."""
    _require_non_blank(self.state_id, "Agent cancellation report state id")
    _require_non_blank(self.signal_id, "Agent cancellation report signal id")

to_payload()

Serialize the report for state metadata and evidence payloads.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def to_payload(self) -> JsonObject:
    """Serialize the report for state metadata and evidence payloads."""
    return {
        "state_id": self.state_id,
        "signal_id": self.signal_id,
        "cleanup_succeeded": self.cleanup_succeeded,
        "outcomes": tuple(outcome.to_payload() for outcome in self.outcomes),
    }

to_evidence_candidate(*, summary=None)

Represent cancellation cleanup as append-only evidence.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def to_evidence_candidate(
    self,
    *,
    summary: str | None = None,
) -> AgentEvidenceCandidate:
    """Represent cancellation cleanup as append-only evidence."""
    return AgentEvidenceCandidate(
        kind=AgentEvidenceKind.CANCELLATION,
        payload=self.to_payload(),
        summary=summary,
    )

AgentCancellationCleanupResult(target_kind, target_ref, status, reason=None, error_code=None, message=None, metadata=dict()) dataclass

Result returned by one cancellation cleanup hook.

failed_cleanup property

Return whether this hook prevents a clean CANCELLED terminal state.

__post_init__()

Reject outcomes that cannot be correlated with the cleanup target.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def __post_init__(self) -> None:
    """Reject outcomes that cannot be correlated with the cleanup target."""
    _require_non_blank(self.target_ref, "Agent cancellation cleanup target ref")
    _require_optional_non_blank(self.reason, "Agent cancellation cleanup reason")
    _require_optional_non_blank(
        self.error_code,
        "Agent cancellation cleanup error code",
    )
    _require_optional_non_blank(self.message, "Agent cancellation cleanup message")

succeeded(*, target_kind, target_ref, reason=None, metadata=None) classmethod

Record a cleanup hook that released its target.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
@classmethod
def succeeded(
    cls,
    *,
    target_kind: AgentCancellationTargetKind,
    target_ref: str,
    reason: str | None = None,
    metadata: JsonObject | None = None,
) -> "AgentCancellationCleanupResult":
    """Record a cleanup hook that released its target."""
    return cls(
        target_kind=target_kind,
        target_ref=target_ref,
        status=AgentCancellationCleanupStatus.SUCCEEDED,
        reason=reason,
        metadata={} if metadata is None else metadata,
    )

failed(*, target_kind, target_ref, error_code, message, metadata=None) classmethod

Record a cleanup hook that could not release its target.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
@classmethod
def failed(
    cls,
    *,
    target_kind: AgentCancellationTargetKind,
    target_ref: str,
    error_code: str,
    message: str,
    metadata: JsonObject | None = None,
) -> "AgentCancellationCleanupResult":
    """Record a cleanup hook that could not release its target."""
    return cls(
        target_kind=target_kind,
        target_ref=target_ref,
        status=AgentCancellationCleanupStatus.FAILED,
        error_code=error_code,
        message=message,
        metadata={} if metadata is None else metadata,
    )

skipped(*, target_kind, target_ref, reason, metadata=None) classmethod

Record a cleanup target that was already inactive or unavailable.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
@classmethod
def skipped(
    cls,
    *,
    target_kind: AgentCancellationTargetKind,
    target_ref: str,
    reason: str,
    metadata: JsonObject | None = None,
) -> "AgentCancellationCleanupResult":
    """Record a cleanup target that was already inactive or unavailable."""
    return cls(
        target_kind=target_kind,
        target_ref=target_ref,
        status=AgentCancellationCleanupStatus.SKIPPED,
        reason=reason,
        metadata={} if metadata is None else metadata,
    )

to_payload()

Serialize this hook outcome into evidence/state metadata.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def to_payload(self) -> JsonObject:
    """Serialize this hook outcome into evidence/state metadata."""
    return {
        "target_kind": self.target_kind.value,
        "target_ref": self.target_ref,
        "status": self.status.value,
        "reason": self.reason,
        "error_code": self.error_code,
        "message": self.message,
        "metadata": self.metadata,
    }

AgentCancellationCleanupStatus

Bases: StrEnum

Outcome of one cancellation cleanup hook.

AgentCancellationCleanupTask(target_kind, target_ref, cleanup, metadata=dict()) dataclass

One cleanup hook registered for a running cancellation target.

__post_init__()

Reject hooks that cannot be identified in cleanup evidence.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def __post_init__(self) -> None:
    """Reject hooks that cannot be identified in cleanup evidence."""
    _require_non_blank(self.target_ref, "Agent cancellation cleanup target ref")

run(*, state, signal) async

Invoke the hook and verify that it reports the same target.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
async def run(
    self,
    *,
    state: AgentState,
    signal: AgentSignal,
) -> AgentCancellationCleanupResult:
    """Invoke the hook and verify that it reports the same target."""
    request = AgentCancellationRequest.from_signal(
        state=state,
        signal=signal,
        target_kind=self.target_kind,
        target_ref=self.target_ref,
        metadata=self.metadata,
    )
    result = await self.cleanup(request)
    if (
        result.target_kind is not self.target_kind
        or result.target_ref != self.target_ref
    ):
        raise AgentDefinitionError(
            "Agent cancellation cleanup result target must match the task"
        )
    return result

AgentCancellationRequest(state_id, signal_id, target_kind, target_ref, reason=None, requested_by=None, metadata=dict()) dataclass

Cancellation request passed to a model stream, tool, or delegate hook.

__post_init__()

Reject requests that cannot be traced back to state/signal/target.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def __post_init__(self) -> None:
    """Reject requests that cannot be traced back to state/signal/target."""
    _require_non_blank(self.state_id, "Agent cancellation state id")
    _require_non_blank(self.signal_id, "Agent cancellation signal id")
    _require_non_blank(self.target_ref, "Agent cancellation target ref")
    _require_optional_non_blank(self.reason, "Agent cancellation reason")
    _require_optional_non_blank(self.requested_by, "Agent cancellation requester")

from_signal(*, state, signal, target_kind, target_ref, metadata=None) classmethod

Build a hook request from the durable CANCEL signal payload.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
@classmethod
def from_signal(
    cls,
    *,
    state: AgentState,
    signal: AgentSignal,
    target_kind: AgentCancellationTargetKind,
    target_ref: str,
    metadata: JsonObject | None = None,
) -> "AgentCancellationRequest":
    """Build a hook request from the durable CANCEL signal payload."""
    _require_cancel_signal(signal)
    _require_signal_matches_state(state, signal)
    return cls(
        state_id=state.id,
        signal_id=signal.id,
        target_kind=target_kind,
        target_ref=target_ref,
        reason=_optional_string(signal.payload, "reason"),
        requested_by=_optional_string(signal.payload, "requested_by"),
        metadata={} if metadata is None else metadata,
    )

to_payload()

Serialize the request into append-only evidence metadata.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def to_payload(self) -> JsonObject:
    """Serialize the request into append-only evidence metadata."""
    return {
        "state_id": self.state_id,
        "signal_id": self.signal_id,
        "target_kind": self.target_kind.value,
        "target_ref": self.target_ref,
        "reason": self.reason,
        "requested_by": self.requested_by,
        "metadata": self.metadata,
    }

AgentCancellationTargetKind

Bases: StrEnum

Running execution target that can receive a cancellation cleanup hook.

AgentEvidence(id, agent_state_id, kind, payload=dict(), summary=None, digest=None, manifest_ref=None, reference=None, sensitive_fields=(), created_at=None) dataclass

Append-only artifact captured during an agent execution.

AgentEvidenceCandidate(kind, payload=dict(), summary=None, digest=None, manifest_ref=None, reference=None, sensitive_fields=()) dataclass

Append-only evidence candidate before repository id assignment.

tool_result(*, tool_identity, tool_schema_name, result, capture, sensitive_fields=(), exposure_policy=None, summary=None) classmethod

Create evidence metadata for a captured tool result.

Source code in core/spakky-agent/src/spakky/agent/evidence.py
@classmethod
def tool_result(
    cls,
    *,
    tool_identity: str,
    tool_schema_name: str,
    result: JsonObject,
    capture: EvidenceCapture,
    sensitive_fields: tuple[SensitiveFieldDescriptor, ...] = (),
    exposure_policy: EvidenceExposurePolicy | None = None,
    summary: str | None = None,
) -> "AgentEvidenceCandidate":
    """Create evidence metadata for a captured tool result."""
    _require_non_blank(tool_identity, "Agent tool identity")
    _require_non_blank(tool_schema_name, "Agent tool schema name")
    policy = exposure_policy or EvidenceExposurePolicy()
    guarded_result = guard_json_value(result, sensitive_fields, policy)
    if not isinstance(guarded_result, Mapping):
        guarded_result = {}
    return cls(
        kind=AgentEvidenceKind.TOOL,
        payload={
            "tool_identity": tool_identity,
            "tool_schema_name": tool_schema_name,
            "capture": capture.value,
            "result": guarded_result,
        },
        sensitive_fields=sensitive_fields
        if policy.include_sensitive_metadata
        else (),
        summary=summary,
    )

model_decision(*, model, decision, summary=None) classmethod

Create evidence metadata for a model decision.

Source code in core/spakky-agent/src/spakky/agent/evidence.py
@classmethod
def model_decision(
    cls,
    *,
    model: str,
    decision: JsonObject,
    summary: str | None = None,
) -> "AgentEvidenceCandidate":
    """Create evidence metadata for a model decision."""
    _require_non_blank(model, "Agent model name")
    return cls(
        kind=AgentEvidenceKind.MODEL,
        payload={"model": model, "decision": decision},
        summary=summary,
    )

tool_decision(*, tool_identity, decision, summary=None) classmethod

Create evidence metadata for a tool-routing decision.

Source code in core/spakky-agent/src/spakky/agent/evidence.py
@classmethod
def tool_decision(
    cls,
    *,
    tool_identity: str,
    decision: JsonObject,
    summary: str | None = None,
) -> "AgentEvidenceCandidate":
    """Create evidence metadata for a tool-routing decision."""
    _require_non_blank(tool_identity, "Agent tool identity")
    return cls(
        kind=AgentEvidenceKind.TOOL,
        payload={"tool_identity": tool_identity, "decision": decision},
        summary=summary,
    )

context_optimization(*, action, stage, signals=(), summary=None) classmethod

Create before/after evidence for a context optimization action.

Source code in core/spakky-agent/src/spakky/agent/evidence.py
@classmethod
def context_optimization(
    cls,
    *,
    action: ContextOptimizationAction,
    stage: ContextOptimizationEvidenceStage,
    signals: Sequence[ContextHealthSignal] = (),
    summary: str | None = None,
) -> "AgentEvidenceCandidate":
    """Create before/after evidence for a context optimization action."""
    _require_non_blank(action.id, "Context optimization action id")
    return cls(
        kind=AgentEvidenceKind.CONTEXT_OPTIMIZATION,
        payload={
            "stage": stage.value,
            "action": action.evidence_payload(),
            "signals": tuple(signal.evidence_payload() for signal in signals),
        },
        summary=summary,
        digest=action.digest_ref,
        manifest_ref=action.manifest_ref,
        reference=action.result_evidence_ref,
    )

to_evidence(*, evidence_id, agent_state_id, created_at=None)

Assign repository identity while preserving append-only contents.

Source code in core/spakky-agent/src/spakky/agent/evidence.py
def to_evidence(
    self,
    *,
    evidence_id: str,
    agent_state_id: str,
    created_at: datetime | None = None,
) -> AgentEvidence:
    """Assign repository identity while preserving append-only contents."""
    _require_non_blank(evidence_id, "Agent evidence id")
    _require_non_blank(agent_state_id, "Agent state id")
    return AgentEvidence(
        id=evidence_id,
        agent_state_id=agent_state_id,
        kind=self.kind,
        payload=self.payload,
        summary=self.summary,
        digest=self.digest,
        manifest_ref=self.manifest_ref,
        reference=self.reference,
        sensitive_fields=self.sensitive_fields,
        created_at=created_at,
    )

AgentEvidenceKind

Bases: StrEnum

Kinds of append-only evidence captured by agent execution.

AgentApprovalBoundaryKind

Bases: StrEnum

Action boundaries where orchestration may require human approval.

AgentApprovalDecisionOutcome(request_id, decision, status, transition, reason=None, modified_payload=dict(), comment=None) dataclass

Typed result of an approval decision signal.

AgentApprovalPlan(action, request=None, state=None, yield_item=None) dataclass

Plan for an action boundary before executing it.

requires_approval property

Return whether orchestration must wait for a HITL decision.

AgentApprovalPlanAction

Bases: StrEnum

Approval plan outcome before an action boundary is executed.

AgentApprovalRequest(id, agent_state_id, boundary, prompt, risk, action_ref, allowed_decisions=DEFAULT_APPROVAL_DECISIONS, metadata=dict()) dataclass

Approval request materialized at a risky action boundary.

__post_init__()

Reject approval requests that cannot be matched by a signal.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def __post_init__(self) -> None:
    """Reject approval requests that cannot be matched by a signal."""
    _require_non_blank(self.id, "Agent approval id")
    _require_non_blank(self.agent_state_id, "Agent approval state id")
    _require_non_blank(self.prompt, "Agent approval prompt")
    _require_non_blank(self.action_ref, "Agent approval action ref")
    if len(self.allowed_decisions) == 0:
        raise AgentDefinitionError("Agent approval decisions cannot be empty")

from_tool_descriptor(*, approval_id, agent_state_id, descriptor, prompt=None, call_id=None, metadata=None) classmethod

Build an approval request from a risky tool descriptor.

Source code in core/spakky-agent/src/spakky/agent/approval.py
@classmethod
def from_tool_descriptor(
    cls,
    *,
    approval_id: str,
    agent_state_id: str,
    descriptor: AgentToolDescriptor,
    prompt: str | None = None,
    call_id: str | None = None,
    metadata: JsonObject | None = None,
) -> "AgentApprovalRequest":
    """Build an approval request from a risky tool descriptor."""
    request_metadata = _tool_approval_metadata(descriptor, call_id, metadata or {})
    return cls(
        id=approval_id,
        agent_state_id=agent_state_id,
        boundary=AgentApprovalBoundaryKind.TOOL_INVOCATION,
        prompt=prompt or f"Approve tool invocation: {descriptor.name}",
        risk=descriptor.metadata.risk,
        action_ref=descriptor.identity.key,
        metadata=request_metadata,
    )

to_state(*, agent_type)

Materialize the approval wait as interrupted lifecycle state.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def to_state(self, *, agent_type: str) -> AgentState:
    """Materialize the approval wait as interrupted lifecycle state."""
    _require_non_blank(agent_type, "Agent approval agent type")
    return AgentState(
        id=self.agent_state_id,
        agent_type=agent_type,
        status=AgentStatus.INTERRUPTED,
        transition=AgentStateTransition.WAITING_APPROVAL,
        reason=AgentStateReason.APPROVAL_REQUIRED,
        current_activity=self.prompt,
        metadata={"approval": self.to_metadata()},
    )

to_yield()

Expose this approval request to an inbound adapter stream.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def to_yield(self) -> AgentYield[Approval]:
    """Expose this approval request to an inbound adapter stream."""
    return AgentYield(
        kind=AgentYieldKind.APPROVAL,
        payload=Approval(
            id=self.id,
            prompt=self.prompt,
            allowed_decisions=self.allowed_decisions,
            metadata=self.to_metadata(),
        ),
    )

to_metadata()

Return JSON-compatible metadata for state, yield, and evidence.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def to_metadata(self) -> JsonObject:
    """Return JSON-compatible metadata for state, yield, and evidence."""
    return {
        "id": self.id,
        "agent_state_id": self.agent_state_id,
        "boundary": self.boundary.value,
        "action_ref": self.action_ref,
        "risk_axes": [axis.value for axis in self.risk.axes],
        "allowed_decisions": [
            decision.value for decision in self.allowed_decisions
        ],
        "metadata": self.metadata,
    }

AgentActionBoundaryCheckpoint(action_id, action_kind, stage, idempotency=Idempotency.UNKNOWN, metadata=dict()) dataclass

Serializable checkpoint recorded before or after one external action.

__post_init__()

Reject checkpoints that cannot be correlated after restart.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
def __post_init__(self) -> None:
    """Reject checkpoints that cannot be correlated after restart."""
    if not self.action_id.strip():
        raise AgentDefinitionError("Agent action boundary id cannot be blank")

before_model_call(action_id, *, idempotency=Idempotency.UNKNOWN, metadata=None) classmethod

Create the checkpoint recorded before a model call is attempted.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def before_model_call(
    cls,
    action_id: str,
    *,
    idempotency: Idempotency = Idempotency.UNKNOWN,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded before a model call is attempted."""
    return cls._for_action(
        action_id,
        AgentActionKind.MODEL_CALL,
        AgentActionBoundaryStage.BEFORE,
        idempotency=idempotency,
        metadata=metadata,
    )

after_model_call(action_id, *, idempotency=Idempotency.UNKNOWN, metadata=None) classmethod

Create the checkpoint recorded after a model call completes.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def after_model_call(
    cls,
    action_id: str,
    *,
    idempotency: Idempotency = Idempotency.UNKNOWN,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded after a model call completes."""
    return cls._for_action(
        action_id,
        AgentActionKind.MODEL_CALL,
        AgentActionBoundaryStage.AFTER,
        idempotency=idempotency,
        metadata=metadata,
    )

before_tool_call(action_id, *, idempotency, metadata=None) classmethod

Create the checkpoint recorded before a tool call is attempted.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def before_tool_call(
    cls,
    action_id: str,
    *,
    idempotency: Idempotency,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded before a tool call is attempted."""
    return cls._for_action(
        action_id,
        AgentActionKind.TOOL_CALL,
        AgentActionBoundaryStage.BEFORE,
        idempotency=idempotency,
        metadata=metadata,
    )

after_tool_call(action_id, *, idempotency, metadata=None) classmethod

Create the checkpoint recorded after a tool call completes.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def after_tool_call(
    cls,
    action_id: str,
    *,
    idempotency: Idempotency,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded after a tool call completes."""
    return cls._for_action(
        action_id,
        AgentActionKind.TOOL_CALL,
        AgentActionBoundaryStage.AFTER,
        idempotency=idempotency,
        metadata=metadata,
    )

before_approval_wait(action_id, *, metadata=None) classmethod

Create the checkpoint recorded before waiting for approval.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def before_approval_wait(
    cls,
    action_id: str,
    *,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded before waiting for approval."""
    return cls._for_action(
        action_id,
        AgentActionKind.APPROVAL_WAIT,
        AgentActionBoundaryStage.BEFORE,
        idempotency=Idempotency.NON_IDEMPOTENT,
        metadata=metadata,
    )

after_approval_wait(action_id, *, metadata=None) classmethod

Create the checkpoint recorded after an approval wait resolves.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def after_approval_wait(
    cls,
    action_id: str,
    *,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded after an approval wait resolves."""
    return cls._for_action(
        action_id,
        AgentActionKind.APPROVAL_WAIT,
        AgentActionBoundaryStage.AFTER,
        idempotency=Idempotency.NON_IDEMPOTENT,
        metadata=metadata,
    )

to_evidence_candidate(*, summary=None)

Represent this checkpoint as append-only evidence.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
def to_evidence_candidate(
    self,
    *,
    summary: str | None = None,
) -> AgentEvidenceCandidate:
    """Represent this checkpoint as append-only evidence."""
    return AgentEvidenceCandidate(
        kind=AgentEvidenceKind.ACTION_BOUNDARY,
        payload={
            "action_id": self.action_id,
            "action_kind": self.action_kind.value,
            "stage": self.stage.value,
            "idempotency": self.idempotency.value,
            "metadata": self.metadata,
        },
        summary=summary,
    )

AgentActionBoundaryStage

Bases: StrEnum

Checkpoint side recorded around an action boundary.

AgentActionKind

Bases: StrEnum

Recoverable external action classes in an agent execution.

AgentResumeAction

Bases: StrEnum

Orchestration action selected from persisted checkpoint evidence.

AgentResumeBoundary(action_id, action_kind, stage, idempotency, evidence_id) dataclass

Last action boundary reconstructed from append-only evidence.

AgentResumePlan(state, action, boundary=None, signals=()) dataclass

Resume decision derived from persisted state and checkpoint evidence.

requires_human_input property

Return whether automatic replay must stop at HITL recovery.

can_resume_automatically property

Return whether orchestration may continue without approval.

AgentDelegateTarget(agent_type, agent_name=None, metadata=dict()) dataclass

First-class delegate target represented by another @Agent component.

__post_init__()

Reject delegate targets that cannot identify an agent component.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def __post_init__(self) -> None:
    """Reject delegate targets that cannot identify an agent component."""
    if not self.agent_type.strip():
        raise AgentDefinitionError("Delegate target agent type cannot be blank")
    if self.agent_name is not None and not self.agent_name.strip():
        raise AgentDefinitionError("Delegate target agent name cannot be blank")

DelegationBudget(max_steps=None, max_tokens=None, timeout_seconds=None, deadline_at=None, metadata=dict()) dataclass

Budget metadata attached to a delegation packet.

__post_init__()

Reject delegation budgets that cannot be enforced consistently.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def __post_init__(self) -> None:
    """Reject delegation budgets that cannot be enforced consistently."""
    if self.max_steps is not None and self.max_steps <= 0:
        raise AgentDefinitionError("Delegation max steps must be positive")
    if self.max_tokens is not None and self.max_tokens <= 0:
        raise AgentDefinitionError("Delegation max tokens must be positive")
    if self.timeout_seconds is not None and self.timeout_seconds <= 0:
        raise AgentDefinitionError("Delegation timeout must be positive")

DelegationContextSlice(summary=None, evidence_refs=(), manifest_ref=None, metadata=dict()) dataclass

Minimal parent context projected for a child agent.

DelegationExpectedOutput(description=None, schema=dict(), metadata=dict()) dataclass

Expected child output description and optional JSON schema.

DelegationPacket(id, parent_agent_state_id, target, task, context=DelegationContextSlice(), constraints=(), expected_output=DelegationExpectedOutput(), budget=DelegationBudget(), allowed_capabilities=(), return_policy=DelegationReturnPolicy.SUMMARY_AND_EVIDENCE, metadata=dict()) dataclass

Task packet passed from a parent agent to a delegate agent.

__post_init__()

Reject delegation packets without parent linkage or task identity.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def __post_init__(self) -> None:
    """Reject delegation packets without parent linkage or task identity."""
    if not self.id.strip():
        raise AgentDefinitionError("Delegation id cannot be blank")
    if not self.parent_agent_state_id.strip():
        raise AgentDefinitionError("Delegation parent state id cannot be blank")

DelegationResult(id, packet_id, target, summary, output=None, evidence_refs=(), metadata=dict(), created_at=None) dataclass

Child agent result projected back to the parent execution.

__post_init__()

Reject delegated results that cannot be linked to a packet.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def __post_init__(self) -> None:
    """Reject delegated results that cannot be linked to a packet."""
    if not self.id.strip():
        raise AgentDefinitionError("Delegation result id cannot be blank")
    if not self.packet_id.strip():
        raise AgentDefinitionError("Delegation result packet id cannot be blank")
    if not self.summary.strip():
        raise AgentDefinitionError("Delegation result summary cannot be blank")

to_parent_evidence(*, evidence_id, parent_agent_state_id)

Represent a delegated result as append-only parent evidence.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def to_parent_evidence(
    self,
    *,
    evidence_id: str,
    parent_agent_state_id: str,
) -> AgentEvidence:
    """Represent a delegated result as append-only parent evidence."""
    payload: dict[str, JsonValue] = {
        "delegation_id": self.id,
        "packet_id": self.packet_id,
        "target_agent_type": self.target.agent_type,
        "evidence_refs": self.evidence_refs,
        "metadata": self.metadata,
    }
    if self.target.agent_name is not None:
        payload["target_agent_name"] = self.target.agent_name
    if self.output is not None:
        payload["output"] = self.output
    return AgentEvidence(
        id=evidence_id,
        agent_state_id=parent_agent_state_id,
        kind=AgentEvidenceKind.DELEGATION,
        payload=payload,
        summary=self.summary,
        created_at=self.created_at,
    )

to_parent_yield(*, evidence_id, parent_agent_state_id)

Expose the delegated result on the parent's AgentYield stream.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def to_parent_yield(
    self,
    *,
    evidence_id: str,
    parent_agent_state_id: str,
) -> AgentYield[Evidence]:
    """Expose the delegated result on the parent's AgentYield stream."""
    evidence = self.to_parent_evidence(
        evidence_id=evidence_id,
        parent_agent_state_id=parent_agent_state_id,
    )
    return AgentYield(
        kind=AgentYieldKind.EVIDENCE,
        payload=Evidence(
            evidence=evidence,
            metadata={"delegation_id": self.id, "packet_id": self.packet_id},
        ),
    )

DelegationReturnPolicy

Bases: StrEnum

How a child agent result should be projected back to the parent.

IAgentDelegate

Bases: ABC

Execution hook that runs a delegation packet against a delegate target.

delegate(packet) abstractmethod

Execute delegation without prescribing spawn topology or transport.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
@abstractmethod
def delegate(
    self,
    packet: DelegationPacket,
) -> AsyncGenerator[AgentYield[DelegationResult], None]:
    """Execute delegation without prescribing spawn topology or transport."""

Agent(spec=AgentExecutionSpec(), *, name='', scope=Scope.SINGLETON) dataclass

Bases: Pod

UseCase-equivalent Pod stereotype for agentic workflow components.

validate_bootstrap()

Re-run definition validation during application bootstrap.

Source code in core/spakky-agent/src/spakky/agent/execution.py
def validate_bootstrap(self) -> None:
    """Re-run definition validation during application bootstrap."""
    self._validate_execute_contract(self._ensure_agent_class(self.target))

required_persistence_repository_types()

Return repository ports required by this Agent's durable path.

Source code in core/spakky-agent/src/spakky/agent/execution.py
def required_persistence_repository_types(self) -> tuple[type[object], ...]:
    """Return repository ports required by this Agent's durable path."""
    from spakky.agent.interfaces.repository import (
        IAgentEvidenceRepository,
        IAgentSignalRepository,
        IAgentStateRepository,
    )

    if (
        self.spec.recovery is RecoveryStrategy.ACTION_BOUNDARY
        or len(self.spec.accepted_signals) > 0
    ):
        return (
            IAgentStateRepository,
            IAgentSignalRepository,
            IAgentEvidenceRepository,
        )
    return ()

AgentExecutionLimits(timeout_seconds=None) dataclass

Bounded execution limits declared outside infrastructure capabilities.

__post_init__()

Reject limits that would fail later at bootstrap.

Source code in core/spakky-agent/src/spakky/agent/execution.py
def __post_init__(self) -> None:
    """Reject limits that would fail later at bootstrap."""
    if self.timeout_seconds is not None and self.timeout_seconds <= 0:
        raise AgentDefinitionError("Agent timeout limit must be positive")

AgentExecutionSpec(name=None, objective=None, accepted_signals=(), recovery=RecoveryStrategy.NONE, streaming_exposure_mode=StreamingExposureMode.BALANCED, timeout_seconds=None, limits=AgentExecutionLimits(), delegation_allowed=False, metadata=dict()) dataclass

Declarative execution semantics that cannot be inferred from DI alone.

__post_init__()

Reject execution specs that would fail later at bootstrap.

Source code in core/spakky-agent/src/spakky/agent/execution.py
def __post_init__(self) -> None:
    """Reject execution specs that would fail later at bootstrap."""
    if self.name is not None and not self.name.strip():
        raise AgentDefinitionError("Agent name cannot be blank")
    if self.objective is not None and not self.objective.strip():
        raise AgentDefinitionError("Agent objective cannot be blank")
    if self.timeout_seconds is not None and self.timeout_seconds <= 0:
        raise AgentDefinitionError("Agent timeout must be positive")
    if (
        self.timeout_seconds is not None
        and self.limits.timeout_seconds is not None
        and self.timeout_seconds != self.limits.timeout_seconds
    ):
        raise AgentDefinitionError("Agent timeout declarations must match")

AgentSignalKind

Bases: StrEnum

Inbound stimulus kinds that an agent may accept while running.

RecoveryStrategy

Bases: StrEnum

Recovery strategy requested by an agent execution contract.

StreamingExposureMode

Bases: StrEnum

Streaming output guard profile exposed by agent execution.

IAgentEvidenceRepository

Bases: ABC

Append-only evidence repository exposed to agent-facing code.

append(evidence) abstractmethod

Append immutable evidence and return the stored artifact.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def append(self, evidence: AgentEvidence) -> AgentEvidence:
    """Append immutable evidence and return the stored artifact."""
    ...

get(evidence_id) abstractmethod

Return one evidence artifact by id.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def get(self, evidence_id: str) -> AgentEvidence:
    """Return one evidence artifact by id."""
    ...

list_by_state(state_id) abstractmethod

Return evidence artifacts captured for an agent state.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def list_by_state(self, state_id: str) -> Sequence[AgentEvidence]:
    """Return evidence artifacts captured for an agent state."""
    ...

list_by_manifest_ref(manifest_ref) abstractmethod

Return evidence artifacts associated with a context manifest.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def list_by_manifest_ref(self, manifest_ref: str) -> Sequence[AgentEvidence]:
    """Return evidence artifacts associated with a context manifest."""
    ...

IAgentSignalRepository

Bases: ABC

Durable inbound queue repository for agent signals.

append(signal) abstractmethod

Append a signal to the inbound queue and return the stored entry.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def append(self, signal: AgentSignal) -> AgentSignal:
    """Append a signal to the inbound queue and return the stored entry."""
    ...

list_pending(state_id) abstractmethod

Return unconsumed signals for an agent state in append/queue order.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def list_pending(self, state_id: str) -> Sequence[AgentSignal]:
    """Return unconsumed signals for an agent state in append/queue order."""
    ...

mark_consumed(signal_id) abstractmethod

Mark one queued signal as consumed after a non-blocking poll.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def mark_consumed(self, signal_id: str) -> AgentSignal:
    """Mark one queued signal as consumed after a non-blocking poll."""
    ...

IAgentStateRepository

Bases: ABC

Materialized state repository for long-running agent executions.

get(state_id) abstractmethod

Return a persisted agent state by id.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def get(self, state_id: str) -> AgentState:
    """Return a persisted agent state by id."""
    ...

get_or_none(state_id) abstractmethod

Return a persisted agent state by id, or None when absent.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def get_or_none(self, state_id: str) -> AgentState | None:
    """Return a persisted agent state by id, or None when absent."""
    ...

save(state) abstractmethod

Persist the materialized state and return the saved snapshot.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def save(self, state: AgentState) -> AgentState:
    """Persist the materialized state and return the saved snapshot."""
    ...

list_by_status(status) abstractmethod

Return states matching an externally observable lifecycle status.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def list_by_status(self, status: AgentStatus) -> Sequence[AgentState]:
    """Return states matching an externally observable lifecycle status."""
    ...

list_resume_candidates() abstractmethod

Return active or interrupted states that may resume after restart.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def list_resume_candidates(self) -> Sequence[AgentState]:
    """Return active or interrupted states that may resume after restart."""
    ...

IAgentModel

Bases: ABC

Outbound model adapter port owned by spakky-agent core.

complete(request) abstractmethod async

Return a complete model response for the request.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
@abstractmethod
async def complete(self, request: ModelRequest) -> ModelResponse:
    """Return a complete model response for the request."""
    ...

stream(request) abstractmethod

Return provider-neutral stream events for the request.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
@abstractmethod
def stream(self, request: ModelRequest) -> AsyncIterator[ModelStreamEvent]:
    """Return provider-neutral stream events for the request."""
    ...

JsonSchemaConstraint(schema, strict=True) dataclass

JSON schema constraint shared by structured output and tool calling.

ModelError(code, message, retryable=False, metadata=dict()) dataclass

Provider-neutral model failure payload.

ModelMessage(role, content, metadata=dict()) dataclass

Provider-neutral model message.

ModelMessageRole

Bases: StrEnum

Roles accepted by provider-neutral model messages.

ModelRequest(messages, context=tuple(), context_manifest=None, context_digest=None, structured_output=None, tool_calling=None, sampling=SamplingOptions(), streaming=StreamingOptions(), metadata=dict()) dataclass

Provider-neutral request passed to an agent model adapter.

assemble_messages(policy=None)

Assemble prompt messages from typed context packs without concatenation.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
def assemble_messages(
    self,
    policy: ContextExposurePolicy | None = None,
) -> tuple[ModelMessage, ...]:
    """Assemble prompt messages from typed context packs without concatenation."""
    exposure_policy = policy or ContextExposurePolicy()
    context_messages = tuple(
        ModelMessage(
            role=ModelMessageRole.EVIDENCE,
            content=pack.guarded_content(exposure_policy),
            metadata=pack.message_metadata(exposure_policy),
        )
        for pack in self.context
    )
    return (*self.messages, *context_messages)

ModelResponse(content, structured_output=None, tool_calls=tuple(), usage=ModelUsage(), metadata=dict()) dataclass

Provider-neutral non-streaming model response.

guarded(sensitive_fields, policy=None)

Return a copy with sensitive output payloads deterministically guarded.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
def guarded(
    self,
    sensitive_fields: Sequence[SensitiveFieldDescriptor],
    policy: EvidenceExposurePolicy | None = None,
) -> "ModelResponse":
    """Return a copy with sensitive output payloads deterministically guarded."""
    exposure_policy = policy or EvidenceExposurePolicy()
    content = self.content
    structured_output = self.structured_output
    content_descriptors = tuple(
        descriptor
        for descriptor in sensitive_fields
        if descriptor.path in ((), ("content",))
    )
    if content_descriptors:
        guarded_content = guard_json_value(
            {"content": content},
            tuple(
                SensitiveFieldDescriptor(("content",), descriptor.field)
                for descriptor in content_descriptors
            ),
            exposure_policy,
        )
        content_value = cast(Mapping[str, JsonValue], guarded_content).get(
            "content"
        )
        if isinstance(content_value, str):
            content = content_value
        else:
            content = "[REDACTED]"
    structured_output_descriptors = tuple(
        descriptor
        for descriptor in sensitive_fields
        if descriptor.path not in ((), ("content",))
    )
    if structured_output_descriptors:
        structured_output = guard_json_value(
            structured_output,
            structured_output_descriptors,
            exposure_policy,
        )
    return replace(self, content=content, structured_output=structured_output)

ModelStreamEvent(kind, token_delta=None, tool_call=None, structured_output=None, error=None, usage=None, metadata=dict()) dataclass

Provider-neutral model streaming event.

guarded(sensitive_fields, policy=None)

Return a copy with sensitive streaming payloads guarded.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
def guarded(
    self,
    sensitive_fields: Sequence[SensitiveFieldDescriptor],
    policy: EvidenceExposurePolicy | None = None,
) -> "ModelStreamEvent":
    """Return a copy with sensitive streaming payloads guarded."""
    exposure_policy = policy or EvidenceExposurePolicy()
    token_delta = self.token_delta
    structured_output = self.structured_output
    tool_call = self.tool_call
    if token_delta is not None:
        token_descriptors = tuple(
            descriptor
            for descriptor in sensitive_fields
            if descriptor.path in ((), ("token_delta",))
        )
        if token_descriptors:
            guarded_token = guard_json_value(
                {"token_delta": token_delta},
                tuple(
                    SensitiveFieldDescriptor(("token_delta",), descriptor.field)
                    for descriptor in token_descriptors
                ),
                exposure_policy,
            )
            token_value = cast(Mapping[str, JsonValue], guarded_token).get(
                "token_delta"
            )
            if isinstance(token_value, str):
                token_delta = token_value
            else:
                token_delta = "[REDACTED]"
    structured_descriptors = tuple(
        descriptor
        for descriptor in sensitive_fields
        if descriptor.path not in ((), ("token_delta",))
    )
    if structured_descriptors:
        structured_output = guard_json_value(
            structured_output,
            structured_descriptors,
            exposure_policy,
        )
    if tool_call is not None:
        tool_call = tool_call.guarded(sensitive_fields, exposure_policy)
    return replace(
        self,
        token_delta=token_delta,
        structured_output=structured_output,
        tool_call=tool_call,
    )

ModelStreamEventKind

Bases: StrEnum

Provider-neutral streaming event kinds emitted by a model adapter.

ModelToolCall(name, arguments, call_id=None, metadata=dict()) dataclass

Tool invocation candidate emitted by a model adapter.

guarded(sensitive_fields, policy=None)

Return a copy with sensitive argument values deterministically guarded.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
def guarded(
    self,
    sensitive_fields: Sequence[SensitiveFieldDescriptor],
    policy: EvidenceExposurePolicy | None = None,
) -> "ModelToolCall":
    """Return a copy with sensitive argument values deterministically guarded."""
    exposure_policy = policy or EvidenceExposurePolicy()
    guarded_arguments = guard_json_value(
        self.arguments,
        sensitive_fields,
        exposure_policy,
    )
    if not isinstance(guarded_arguments, Mapping):
        guarded_arguments = {}
    return replace(self, arguments=guarded_arguments)

ModelToolChoice

Bases: StrEnum

Provider-neutral tool calling strategy requested from a model adapter.

ModelToolSpec(name, parameters, description=None, metadata=dict()) dataclass

LLM-facing tool descriptor normalized by agent tooling.

ModelUsage(input_tokens=None, output_tokens=None, total_tokens=None) dataclass

Token accounting reported by a model adapter.

SamplingOptions(temperature=None, top_p=None, max_tokens=None) dataclass

Portable model sampling options.

StreamingOptions(include_usage=True, include_progress=True) dataclass

Portable model streaming options.

StructuredOutputSpec(constraint, output_type_name=None) dataclass

Structured output contract requested from a model adapter.

ToolCallingSpec(tools, choice=ModelToolChoice.AUTO) dataclass

Tool calling contract requested from a model adapter.

AgentSignal(id, agent_state_id, kind, payload=dict(), created_at=None) dataclass

Inbound stimulus appended for an agent execution.

ApprovalDecision

Bases: StrEnum

Human-in-the-loop approval outcomes.

AgentSignalConsumptionBatch(state_id, poll_point, signals) dataclass

Signals consumed during one non-blocking poll.

consumed_count property

Return the number of signals consumed by this poll.

AgentSignalPollPoint

Bases: StrEnum

Places where orchestration may poll durable signals without waiting.

ContextExposurePolicy(include_pii_values=False, include_sensitive_values=False, include_sensitive_schema_metadata=False, include_sensitive_context_metadata=False) dataclass

Policy for LLM-facing context and schema metadata exposure.

can_expose_value(field)

Return whether a sensitive value may cross the model boundary.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def can_expose_value(self, field: "SensitiveField | SecretField") -> bool:
    """Return whether a sensitive value may cross the model boundary."""
    if isinstance(field, SecretField):
        return False
    if field.sensitivity == DataSensitivity.PII:
        return self.include_pii_values
    if field.sensitivity in (DataSensitivity.CONFIDENTIAL, DataSensitivity.SECRET):
        return self.include_sensitive_values
    return True

CredentialRef(id, provider=None) dataclass

Reference to a credential outside LLM-facing context.

__post_init__()

Reject blank credential references.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject blank credential references."""
    _require_non_blank(self.id, "Credential reference id")
    if self.provider is not None:
        _require_non_blank(self.provider, "Credential provider")

DataSensitivity

Bases: StrEnum

Canonical sensitivity classes carried by descriptors.

EvidenceExposurePolicy(include_pii_values=False, include_sensitive_values=False, include_sensitive_metadata=True) dataclass

Policy for evidence payload exposure before append-only capture.

can_expose_value(field)

Return whether a sensitive value may be stored in evidence payloads.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def can_expose_value(self, field: "SensitiveField | SecretField") -> bool:
    """Return whether a sensitive value may be stored in evidence payloads."""
    if isinstance(field, SecretField):
        return False
    if field.sensitivity == DataSensitivity.PII:
        return self.include_pii_values
    if field.sensitivity in (DataSensitivity.CONFIDENTIAL, DataSensitivity.SECRET):
        return self.include_sensitive_values
    return True

MaskingPolicy

Bases: StrEnum

Deterministic text masking strategies for sensitive values.

PII

Bases: StrEnum

PII categories that can be declared with typing.Annotated.

RedactionPolicy

Bases: StrEnum

Boundary action used when a value must not be exposed.

SecretField(redaction=RedactionPolicy.REFERENCE_ONLY, label=None, metadata=dict()) dataclass

typing.Annotated metadata for values that must never be model text.

sensitivity property

Return the normalized sensitivity class.

__post_init__()

Reject blank secret field labels.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject blank secret field labels."""
    if self.label is not None:
        _require_non_blank(self.label, "Secret field label")

guard_text(value)

Return deterministic replacement text for a secret value.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def guard_text(self, value: str) -> str:
    """Return deterministic replacement text for a secret value."""
    return SECRET_VALUE

to_metadata()

Serialize marker metadata without including the secret value.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_metadata(self) -> JsonObject:
    """Serialize marker metadata without including the secret value."""
    payload: dict[str, JsonValue] = {
        "kind": "secret",
        "sensitivity": DataSensitivity.SECRET.value,
        "redaction": self.redaction.value,
    }
    if self.label is not None:
        payload["label"] = self.label
    if self.metadata:
        payload["metadata"] = self.metadata
    return payload

SecretRef(id, credential=None) dataclass

Opaque reference to a secret value stored outside model context.

__post_init__()

Reject blank secret references.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject blank secret references."""
    _require_non_blank(self.id, "Secret reference id")

SensitiveField(category, masking=MaskingPolicy.REDACT, redaction=RedactionPolicy.REDACT, label=None, metadata=dict()) dataclass

typing.Annotated metadata for deterministic sensitive-field handling.

sensitivity property

Return the normalized sensitivity class.

category_name property

Return the stable public category name.

__post_init__()

Reject blank sensitive field labels.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject blank sensitive field labels."""
    if self.label is not None:
        _require_non_blank(self.label, "Sensitive field label")

guard_text(value)

Return deterministic model/evidence-safe text for this field.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def guard_text(self, value: str) -> str:
    """Return deterministic model/evidence-safe text for this field."""
    return _mask_text(value, self.masking, self.category_name)

to_metadata()

Serialize marker metadata without including the sensitive value.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_metadata(self) -> JsonObject:
    """Serialize marker metadata without including the sensitive value."""
    payload: dict[str, JsonValue] = {
        "kind": "sensitive",
        "category": self.category_name,
        "sensitivity": self.sensitivity.value,
        "masking": self.masking.value,
        "redaction": self.redaction.value,
    }
    if self.label is not None:
        payload["label"] = self.label
    if self.metadata:
        payload["metadata"] = self.metadata
    return payload

SensitiveFieldDescriptor(path, field) dataclass

Path-bound sensitive metadata extracted from Annotated types.

to_metadata()

Serialize descriptor metadata without leaking the field value.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_metadata(self) -> JsonObject:
    """Serialize descriptor metadata without leaking the field value."""
    return {
        "path": list(self.path),
        "field": self.field.to_metadata(),
    }

StreamingGuardFailureMode

Bases: StrEnum

Final audit behavior when a streaming guard missed a raw candidate.

StreamingRedactionAudit(status, detected_count, redacted_count, missed_matches, buffer_size, emitted_char_count, original_char_count) dataclass

Final aggregate audit for a bounded streaming redaction session.

missed_count property

Return the number of raw candidates still present after streaming.

to_evidence_payload()

Serialize audit evidence without raw streamed content.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_evidence_payload(self) -> JsonObject:
    """Serialize audit evidence without raw streamed content."""
    return {
        "kind": "streaming_redaction_audit",
        "status": self.status.value,
        "detected_count": self.detected_count,
        "redacted_count": self.redacted_count,
        "missed_count": self.missed_count,
        "missed_matches": tuple(
            match.to_payload() for match in self.missed_matches
        ),
        "buffer_size": self.buffer_size,
        "emitted_char_count": self.emitted_char_count,
        "original_char_count": self.original_char_count,
    }

to_error_payload()

Serialize a typed error payload for stream consumers.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_error_payload(self) -> JsonObject:
    """Serialize a typed error payload for stream consumers."""
    return {
        "code": "streaming_redaction_audit_failed",
        "message": "Streaming redaction final audit detected unmasked candidates",
        "retryable": False,
        "metadata": self.to_evidence_payload(),
    }

StreamingRedactionAuditStatus

Bases: StrEnum

Final aggregate streaming redaction audit status.

StreamingRedactionMatch(pattern_name, start, end, metadata=dict()) dataclass

Sanitized final-audit match location for a missed redaction candidate.

to_payload()

Serialize a match without the sensitive value itself.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_payload(self) -> JsonObject:
    """Serialize a match without the sensitive value itself."""
    payload: dict[str, JsonValue] = {
        "pattern_name": self.pattern_name,
        "start": self.start,
        "end": self.end,
    }
    if self.metadata:
        payload["metadata"] = self.metadata
    return payload

StreamingRedactionPolicy(patterns, buffer_size=64, emit_chunk_size=None, failure_mode=StreamingGuardFailureMode.RAISE) dataclass

Bounded buffering policy balancing stream latency and redaction correctness.

__post_init__()

Reject policies that would make the guard unbounded or silent.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject policies that would make the guard unbounded or silent."""
    if self.buffer_size <= 0:
        raise AgentDefinitionError(
            "Streaming redaction buffer size must be positive"
        )
    if self.emit_chunk_size is not None and self.emit_chunk_size <= 0:
        raise AgentDefinitionError(
            "Streaming redaction emit chunk size must be positive"
        )
    if len(self.patterns) == 0:
        raise AgentDefinitionError("Streaming redaction patterns cannot be empty")

StreamingRedactionResult(chunks, audit=None, error=None) dataclass

Output produced by a bounded streaming redaction step.

StreamingRedactionSession(policy)

Stateful bounded redactor for model token streams.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __init__(self, policy: StreamingRedactionPolicy) -> None:
    self._policy = policy
    self._patterns = tuple(policy.patterns)
    self._buffer = ""
    self._original_chunks: list[str] = []
    self._emitted_chunks: list[str] = []
    self._redacted_count = 0
    self._closed = False

push(chunk)

Redact one token chunk and emit only the safe bounded prefix.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def push(self, chunk: str) -> StreamingRedactionResult:
    """Redact one token chunk and emit only the safe bounded prefix."""
    self._ensure_open()
    if chunk == "":
        return StreamingRedactionResult(chunks=())
    self._original_chunks.append(chunk)
    self._buffer = self._redact_text(f"{self._buffer}{chunk}")
    chunks = self._emit_safe_prefix()
    return StreamingRedactionResult(chunks=chunks)

finish()

Flush the remaining buffer and run the mandatory final audit.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def finish(self) -> StreamingRedactionResult:
    """Flush the remaining buffer and run the mandatory final audit."""
    self._ensure_open()
    self._closed = True
    redacted_buffer = self._redact_text(self._buffer)
    chunks = self._split_for_emit(redacted_buffer)
    self._buffer = ""
    self._emitted_chunks.extend(chunks)
    audit = self._audit()
    if audit.status == StreamingRedactionAuditStatus.FAILED:
        if self._policy.failure_mode == StreamingGuardFailureMode.RAISE:
            raise AgentOutputGuardError(
                "Streaming redaction final audit detected unmasked candidates"
            )
        return StreamingRedactionResult(
            chunks=chunks,
            audit=audit,
            error=audit.to_error_payload(),
        )
    return StreamingRedactionResult(chunks=chunks, audit=audit)

StreamingSensitivePattern(name, pattern, replacement=REDACTED_VALUE, metadata=dict()) dataclass

Caller-supplied deterministic pattern used by streaming redaction.

__post_init__()

Reject blank names and invalid pattern syntax before streaming starts.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject blank names and invalid pattern syntax before streaming starts."""
    _require_non_blank(self.name, "Streaming sensitive pattern name")
    _require_non_blank(self.pattern, "Streaming sensitive pattern")
    try:
        re.compile(self.pattern)
    except re.error as e:
        raise AgentDefinitionError("Streaming sensitive pattern is invalid") from e

redact(value)

Return redacted text and the number of replacements applied.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def redact(self, value: str) -> tuple[str, int]:
    """Return redacted text and the number of replacements applied."""
    compiled = re.compile(self.pattern)
    return compiled.subn(self.replacement, value)

find_matches(value)

Return sanitized match locations without exposing raw text.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def find_matches(self, value: str) -> tuple["StreamingRedactionMatch", ...]:
    """Return sanitized match locations without exposing raw text."""
    compiled = re.compile(self.pattern)
    return tuple(
        StreamingRedactionMatch(
            pattern_name=self.name,
            start=match.start(),
            end=match.end(),
            metadata=self.metadata,
        )
        for match in compiled.finditer(value)
    )

AgentState(id, agent_type, status, transition=None, reason=None, current_activity=None, input_ref=None, output_ref=None, pending_signal_count=0, last_event_cursor=None, recovery_marker=None, metadata=dict(), created_at=None, updated_at=None) dataclass

Materialized state for a long-running agent execution.

__post_init__()

Reject state snapshots that cannot represent a real queue count.

Source code in core/spakky-agent/src/spakky/agent/state.py
def __post_init__(self) -> None:
    """Reject state snapshots that cannot represent a real queue count."""
    if self.pending_signal_count < 0:
        raise AgentDefinitionError("Agent pending signal count cannot be negative")

AgentStateReason

Bases: StrEnum

Structured reason that refines an externally observable lifecycle state.

AgentStateTransition

Bases: StrEnum

State transition vocabulary accepted by durable agent orchestration.

AgentStatus

Bases: StrEnum

Externally observable lifecycle states for an agent execution.

AgentToolCatalog(descriptors=()) dataclass

Deterministic catalog of descriptors discovered from an Agent class.

__post_init__()

Reject duplicate identity or schema names before model lookup.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject duplicate identity or schema names before model lookup."""
    identity_keys: set[str] = set()
    schema_names: set[str] = set()
    for descriptor in self.descriptors:
        if descriptor.identity.key in identity_keys:
            raise AgentDefinitionError("Agent tool identity must be unique")
        if descriptor.schema.name in schema_names:
            raise AgentDefinitionError("Agent tool schema name must be unique")
        identity_keys.add(descriptor.identity.key)
        schema_names.add(descriptor.schema.name)

by_identity(identity)

Lookup a descriptor by typed identity.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def by_identity(self, identity: AgentToolIdentity) -> AgentToolDescriptor:
    """Lookup a descriptor by typed identity."""
    for descriptor in self.descriptors:
        if descriptor.identity == identity:
            return descriptor
    raise AgentDefinitionError("Agent tool identity is not registered")

by_schema_name(schema_name)

Lookup a descriptor by model-facing schema name.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def by_schema_name(self, schema_name: str) -> AgentToolDescriptor:
    """Lookup a descriptor by model-facing schema name."""
    _require_non_blank(schema_name, "Agent tool schema name")
    for descriptor in self.descriptors:
        if descriptor.schema.name == schema_name:
            return descriptor
    raise AgentDefinitionError("Agent tool schema name is not registered")

AgentToolBoundInvocation(args=(), kwargs=dict()) dataclass

Python-call-ready tool invocation arguments.

AgentToolDefinition(name, schema_name, description=None, metadata=AgentToolMetadata()) dataclass

Method-level metadata attached by @agent_tool before owner discovery.

__post_init__()

Reject definitions that would make catalog lookup ambiguous.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject definitions that would make catalog lookup ambiguous."""
    _require_non_blank(self.name, "Agent tool name")
    _require_non_blank(self.schema_name, "Agent tool schema name")
    if self.description is not None and not self.description.strip():
        raise AgentDefinitionError("Agent tool description cannot be blank")

AgentToolDescriptor(identity, owner, callable, schema, description=None, metadata=AgentToolMetadata()) dataclass

Discovered tool descriptor bound to an owner class and callable.

name property

Return the descriptor-local tool name.

bind_invocation(payload)

Bind a decoded model payload before the tool callable is invoked.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def bind_invocation(self, payload: JsonObject) -> "AgentToolBoundInvocation":
    """Bind a decoded model payload before the tool callable is invoked."""
    return bind_agent_tool_invocation(self.callable, payload)

AgentToolIdentity(owner_module, owner_qualname, name) dataclass

Descriptor identity independent from free-form model output text.

key property

Return a stable key for maps, logs, and evidence metadata.

__post_init__()

Reject identity parts that cannot be serialized or matched.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject identity parts that cannot be serialized or matched."""
    _require_non_blank(self.owner_module, "Agent tool owner module")
    _require_non_blank(self.owner_qualname, "Agent tool owner qualname")
    _require_non_blank(self.name, "Agent tool name")

AgentToolMetadata(permissions=(), effects=ToolEffects(), idempotency=Idempotency.UNKNOWN, data_access=DataAccess.NONE, externality=Externality.LOCAL, timeout=TimeoutPolicy(), result_budget=ResultBudget(), evidence=EvidenceCapture.NONE, approval=ToolApprovalRequirement.DERIVED) dataclass

Typed approval, idempotency, and evidence metadata for a descriptor.

risk property

Return derived risk axes without storing risk as source metadata.

resume property

Return resume metadata derived from stored idempotency.

requires_approval_candidate property

Return whether this tool is a HITL approval candidate.

AgentToolSchemaHandle(name, input_schema_name, output_schema_name, input_schema=dict(), output_schema=dict(), input_sensitive_fields=(), output_sensitive_fields=()) dataclass

Stable schema names and generated JSON schemas owned by a descriptor.

__post_init__()

Reject blank schema handles.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject blank schema handles."""
    _require_non_blank(self.name, "Agent tool schema name")
    _require_non_blank(self.input_schema_name, "Agent tool input schema name")
    _require_non_blank(self.output_schema_name, "Agent tool output schema name")

input_schema_for(policy=None)

Return model-facing input schema under the requested exposure policy.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def input_schema_for(
    self,
    policy: ContextExposurePolicy | None = None,
) -> JsonObject:
    """Return model-facing input schema under the requested exposure policy."""
    return schema_with_sensitive_metadata(
        self.input_schema,
        self.input_sensitive_fields,
        policy or ContextExposurePolicy(),
    )

output_schema_for(policy=None)

Return model-facing output schema under the requested exposure policy.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def output_schema_for(
    self,
    policy: ContextExposurePolicy | None = None,
) -> JsonObject:
    """Return model-facing output schema under the requested exposure policy."""
    return schema_with_sensitive_metadata(
        self.output_schema,
        self.output_sensitive_fields,
        policy or ContextExposurePolicy(),
    )

DataAccess

Bases: StrEnum

Data access level declared by a tool.

EvidenceCapture

Bases: StrEnum

Evidence capture strategy for tool results.

Externality

Bases: StrEnum

External side-effect boundary declared by a tool.

Idempotency

Bases: StrEnum

Action idempotency declared by a tool.

ResultBudget(max_bytes=None) dataclass

Optional result-size budget for model-facing tool output.

__post_init__()

Reject result budgets that cannot constrain output.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject result budgets that cannot constrain output."""
    if self.max_bytes is not None and self.max_bytes <= 0:
        raise AgentDefinitionError("Agent tool result budget must be positive")

TimeoutPolicy(seconds=None) dataclass

Optional timeout boundary for a tool invocation.

__post_init__()

Reject non-positive timeout policies at definition time.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject non-positive timeout policies at definition time."""
    if self.seconds is not None and self.seconds <= 0:
        raise AgentDefinitionError("Agent tool timeout must be positive")

ToolApprovalRequirement

Bases: StrEnum

Human approval requirement at the tool boundary.

ToolEffects(data_access=DataAccess.NONE, externality=Externality.LOCAL, destructive=False, network=False) dataclass

Typed effect metadata used to derive display risk outside core.

read_only() classmethod

Declare a local read-only tool.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def read_only(cls) -> "ToolEffects":
    """Declare a local read-only tool."""
    return cls(data_access=DataAccess.READ, externality=Externality.LOCAL)

write_state() classmethod

Declare a local state-writing tool.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def write_state(cls) -> "ToolEffects":
    """Declare a local state-writing tool."""
    return cls(data_access=DataAccess.WRITE, externality=Externality.LOCAL)

external_side_effect() classmethod

Declare a tool that crosses an external side-effect boundary.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def external_side_effect(cls) -> "ToolEffects":
    """Declare a tool that crosses an external side-effect boundary."""
    return cls(
        data_access=DataAccess.READ_WRITE,
        externality=Externality.EXTERNAL,
        network=True,
    )

destructive_action() classmethod

Declare a tool that may irreversibly mutate local or external state.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def destructive_action(cls) -> "ToolEffects":
    """Declare a tool that may irreversibly mutate local or external state."""
    return cls(
        data_access=DataAccess.WRITE,
        externality=Externality.EXTERNAL,
        destructive=True,
    )

ToolPermission(name) dataclass

Typed permission marker attached to a tool descriptor.

__post_init__()

Reject permission names that cannot be matched deterministically.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject permission names that cannot be matched deterministically."""
    if not self.name.strip():
        raise AgentDefinitionError("Agent tool permission name cannot be blank")

ToolResumeAction

Bases: StrEnum

Resume action allowed by stored tool idempotency metadata.

ToolResumeMetadata(idempotency=Idempotency.UNKNOWN) dataclass

Stored idempotency metadata used when resuming an incomplete action.

from_metadata(metadata) classmethod

Build resume metadata from a tool descriptor metadata object.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def from_metadata(cls, metadata: "AgentToolMetadata") -> "ToolResumeMetadata":
    """Build resume metadata from a tool descriptor metadata object."""
    return cls(idempotency=metadata.idempotency)

action_for_completed_boundary()

Return the resume action for an already completed action boundary.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def action_for_completed_boundary(self) -> ToolResumeAction:
    """Return the resume action for an already completed action boundary."""
    return ToolResumeAction.SKIP_COMPLETED

action_for_incomplete_boundary()

Return the resume action for an incomplete action boundary.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def action_for_incomplete_boundary(self) -> ToolResumeAction:
    """Return the resume action for an incomplete action boundary."""
    if self.idempotency in (
        Idempotency.IDEMPOTENT,
        Idempotency.CONDITIONALLY_IDEMPOTENT,
    ):
        return ToolResumeAction.RETRY
    return ToolResumeAction.REQUIRE_APPROVAL

ToolRisk(axes=()) dataclass

Derived typed risk axes for policy and evidence annotations.

requires_approval_candidate property

Return whether the risk is strong enough to suggest HITL approval.

__post_init__()

Reject duplicate axes so risk comparisons stay deterministic.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject duplicate axes so risk comparisons stay deterministic."""
    seen: set[ToolRiskAxis] = set()
    for axis in self.axes:
        if axis in seen:
            raise AgentDefinitionError("Agent tool risk axes must be unique")
        seen.add(axis)

from_metadata(metadata) classmethod

Derive risk axes from source tool metadata.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def from_metadata(cls, metadata: AgentToolMetadata) -> "ToolRisk":
    """Derive risk axes from source tool metadata."""
    axes: list[ToolRiskAxis] = []
    if metadata.data_access in (DataAccess.READ, DataAccess.READ_WRITE):
        axes.append(ToolRiskAxis.READ)
    if metadata.data_access in (DataAccess.WRITE, DataAccess.READ_WRITE):
        axes.append(ToolRiskAxis.WRITE)
    if metadata.data_access in (DataAccess.WRITE, DataAccess.READ_WRITE):
        axes.append(ToolRiskAxis.SIDE_EFFECT)
    if metadata.externality in (Externality.EXTERNAL, Externality.UNKNOWN):
        axes.append(ToolRiskAxis.SIDE_EFFECT)
    if metadata.effects.destructive:
        axes.append(ToolRiskAxis.DESTRUCTIVE)
    if metadata.effects.network or metadata.externality == Externality.EXTERNAL:
        axes.append(ToolRiskAxis.NETWORK)
    return cls(axes=_deduplicate_axes(axes))

includes(axis)

Return whether this risk contains the requested axis.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def includes(self, axis: ToolRiskAxis) -> bool:
    """Return whether this risk contains the requested axis."""
    return axis in self.axes

ToolRiskAxis

Bases: StrEnum

Derived risk axes exposed for policy and UI decisions.

AgentYield(kind, payload) dataclass

Typed stream item returned from an agent execute generator.

AgentYieldKind

Bases: StrEnum

Canonical public vocabulary yielded by agent execution.

Approval(id, prompt, allowed_decisions, metadata) dataclass

Approval request surfaced to the inbound adapter.

Cancel(reason=None, requested_by=None, metadata=dict()) dataclass

Cancellation acknowledgement surfaced to the caller.

Error(code, message, retryable=False, metadata=dict()) dataclass

Recoverable or terminal execution error surfaced to the caller.

Evidence(evidence, metadata=dict()) dataclass

Evidence item surfaced to the inbound adapter.

Final(output, metadata) dataclass

Final output carried by a generator stream.

Progress(message, current_step=None, metadata=dict()) dataclass

Agent progress update intended for direct inbound adapter consumption.

Token(text, metadata=dict()) dataclass

Incremental model token intended for streaming clients.

Tool(name, call_id=None, arguments=dict(), result=None, metadata=dict()) dataclass

Tool call or tool result surfaced by agent execution.

begin_agent_cancellation(state, signal)

Materialize receipt of a CANCEL signal as CANCELLING state.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def begin_agent_cancellation(
    state: AgentState,
    signal: AgentSignal,
) -> AgentState:
    """Materialize receipt of a CANCEL signal as CANCELLING state."""
    _require_cancel_signal(signal)
    _require_signal_matches_state(state, signal)
    return replace(
        state,
        status=AgentStatus.CANCELLING,
        transition=AgentStateTransition.CANCELLING,
        reason=AgentStateReason.CANCELLATION_REQUESTED,
        metadata={
            **state.metadata,
            "cancellation_signal_id": signal.id,
            "cancellation_reason": _optional_string(signal.payload, "reason"),
            "cancellation_requested_by": _optional_string(
                signal.payload,
                "requested_by",
            ),
        },
    )

complete_agent_cancellation(state, report)

Resolve CANCELLING state into CANCELLED or FAILED cleanup outcome.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def complete_agent_cancellation(
    state: AgentState,
    report: AgentCancellationCleanupReport,
) -> AgentState:
    """Resolve CANCELLING state into CANCELLED or FAILED cleanup outcome."""
    if state.id != report.state_id:
        raise AgentDefinitionError("Agent cancellation report state id mismatch")
    metadata = {**state.metadata, "cancellation_cleanup": report.to_payload()}
    if report.cleanup_succeeded:
        return replace(
            state,
            status=AgentStatus.CANCELLED,
            transition=AgentStateTransition.CANCELLED,
            reason=AgentStateReason.CANCELLATION_REQUESTED,
            metadata=metadata,
        )
    return replace(
        state,
        status=AgentStatus.FAILED,
        transition=AgentStateTransition.FAILED,
        reason=AgentStateReason.CANCELLATION_CLEANUP_FAILED,
        metadata=metadata,
    )

run_agent_cancellation_cleanup(*, state, signal, tasks) async

Invoke model stream/tool/delegate cleanup hooks for a CANCEL signal.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
async def run_agent_cancellation_cleanup(
    *,
    state: AgentState,
    signal: AgentSignal,
    tasks: Sequence[AgentCancellationCleanupTask],
) -> AgentCancellationCleanupReport:
    """Invoke model stream/tool/delegate cleanup hooks for a CANCEL signal."""
    _require_cancel_signal(signal)
    outcomes: list[AgentCancellationCleanupResult] = []
    for task in tasks:
        outcomes.append(await task.run(state=state, signal=signal))
    return AgentCancellationCleanupReport(
        state_id=state.id,
        signal_id=signal.id,
        outcomes=tuple(outcomes),
    )

materialize_agent_approval_decision_state(current, outcome)

Apply a typed approval decision outcome to an existing state snapshot.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def materialize_agent_approval_decision_state(
    current: AgentState,
    outcome: AgentApprovalDecisionOutcome,
) -> AgentState:
    """Apply a typed approval decision outcome to an existing state snapshot."""
    return AgentState(
        id=current.id,
        agent_type=current.agent_type,
        status=outcome.status,
        transition=outcome.transition,
        reason=outcome.reason,
        current_activity=_activity_for_decision(outcome),
        input_ref=current.input_ref,
        output_ref=current.output_ref,
        pending_signal_count=current.pending_signal_count,
        last_event_cursor=current.last_event_cursor,
        recovery_marker=current.recovery_marker,
        metadata={
            **current.metadata,
            "approval_decision": {
                "request_id": outcome.request_id,
                "decision": outcome.decision.value,
                "modified_payload": outcome.modified_payload,
                "comment": outcome.comment,
            },
        },
        created_at=current.created_at,
        updated_at=current.updated_at,
    )

parse_agent_approval_decision_signal(signal, *, request=None)

Parse an approval decision signal into a typed workflow outcome.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def parse_agent_approval_decision_signal(
    signal: AgentSignal,
    *,
    request: AgentApprovalRequest | None = None,
) -> AgentApprovalDecisionOutcome:
    """Parse an approval decision signal into a typed workflow outcome."""
    if signal.kind != AgentSignalKind.APPROVAL_DECISION:
        raise AgentDefinitionError(
            "Agent approval signal kind must be approval_decision"
        )

    request_id = _require_payload_text(signal.payload, "request_id")
    if request is not None and request_id != request.id:
        raise AgentDefinitionError("Agent approval signal request id does not match")
    if request is not None and signal.agent_state_id != request.agent_state_id:
        raise AgentDefinitionError("Agent approval signal state id does not match")

    decision = _parse_approval_decision(
        _require_payload_text(signal.payload, "decision")
    )
    modified_payload = _payload_object(signal.payload.get("modified_payload", {}))
    comment = _optional_payload_text(signal.payload, "comment")
    status, transition, reason = _decision_state_target(decision)

    return AgentApprovalDecisionOutcome(
        request_id=request_id,
        decision=decision,
        status=status,
        transition=transition,
        reason=reason,
        modified_payload=modified_payload,
        comment=comment,
    )

plan_agent_tool_approval(*, descriptor, approval_id, agent_state_id, agent_type, prompt=None, call_id=None, metadata=None)

Plan whether a tool invocation should proceed or wait for approval.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def plan_agent_tool_approval(
    *,
    descriptor: AgentToolDescriptor,
    approval_id: str,
    agent_state_id: str,
    agent_type: str,
    prompt: str | None = None,
    call_id: str | None = None,
    metadata: JsonObject | None = None,
) -> AgentApprovalPlan:
    """Plan whether a tool invocation should proceed or wait for approval."""
    if not descriptor.metadata.requires_approval_candidate:
        return AgentApprovalPlan(action=AgentApprovalPlanAction.PROCEED)

    request = AgentApprovalRequest.from_tool_descriptor(
        approval_id=approval_id,
        agent_state_id=agent_state_id,
        descriptor=descriptor,
        prompt=prompt,
        call_id=call_id,
        metadata=metadata,
    )
    return AgentApprovalPlan(
        action=AgentApprovalPlanAction.WAIT_FOR_APPROVAL,
        request=request,
        state=request.to_state(agent_type=agent_type),
        yield_item=request.to_yield(),
    )

plan_agent_resume(state, evidence, signals=())

Restore the next resume action using only persisted state and evidence.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
def plan_agent_resume(
    state: AgentState,
    evidence: Sequence[AgentEvidence],
    signals: Sequence[AgentSignal] = (),
) -> AgentResumePlan:
    """Restore the next resume action using only persisted state and evidence."""
    persisted_signals = tuple(signals)
    if state.status in (
        AgentStatus.COMPLETED,
        AgentStatus.FAILED,
        AgentStatus.CANCELLED,
    ):
        return AgentResumePlan(
            state=state,
            action=AgentResumeAction.NOT_RESUMABLE,
            signals=persisted_signals,
        )

    boundary = _last_action_boundary(evidence)
    if boundary is None:
        return AgentResumePlan(
            state=state,
            action=AgentResumeAction.START,
            signals=persisted_signals,
        )

    if boundary.stage is AgentActionBoundaryStage.AFTER:
        return AgentResumePlan(
            state=replace(state, recovery_marker=boundary.action_id),
            action=AgentResumeAction.SKIP_COMPLETED,
            boundary=boundary,
            signals=persisted_signals,
        )

    if boundary.action_kind is AgentActionKind.APPROVAL_WAIT:
        return AgentResumePlan(
            state=_interrupt_for_human_recovery(state, boundary),
            action=AgentResumeAction.REQUIRE_HITL,
            boundary=boundary,
            signals=persisted_signals,
        )

    resume_action = ToolResumeMetadata(
        idempotency=boundary.idempotency,
    ).action_for_incomplete_boundary()
    if resume_action is ToolResumeAction.RETRY:
        return AgentResumePlan(
            state=replace(state, recovery_marker=boundary.action_id),
            action=AgentResumeAction.RETRY,
            boundary=boundary,
            signals=persisted_signals,
        )
    return AgentResumePlan(
        state=_interrupt_for_human_recovery(state, boundary),
        action=AgentResumeAction.REQUIRE_HITL,
        boundary=boundary,
        signals=persisted_signals,
    )

consume_pending_agent_signals(repository, state_id, *, poll_point=AgentSignalPollPoint.SAFE_BOUNDARY, accepted_signals=None, max_signals=None)

Consume currently pending signals without sleeping or waiting for new input.

The repository remains responsible for durable queue ordering. This helper consumes only the eligible prefix of the pending queue so later signals never overtake an earlier unaccepted signal.

Source code in core/spakky-agent/src/spakky/agent/signal_consumption.py
def consume_pending_agent_signals(
    repository: IAgentSignalRepository,
    state_id: str,
    *,
    poll_point: AgentSignalPollPoint = AgentSignalPollPoint.SAFE_BOUNDARY,
    accepted_signals: Sequence[AgentSignalKind] | None = None,
    max_signals: int | None = None,
) -> AgentSignalConsumptionBatch:
    """Consume currently pending signals without sleeping or waiting for new input.

    The repository remains responsible for durable queue ordering. This helper
    consumes only the eligible prefix of the pending queue so later signals never
    overtake an earlier unaccepted signal.
    """
    if not state_id.strip():
        raise AgentDefinitionError("Agent signal state id cannot be blank")
    if max_signals is not None and max_signals <= 0:
        raise AgentDefinitionError("Agent signal max_signals must be positive")

    accepted = frozenset(accepted_signals) if accepted_signals is not None else None
    pending = tuple(repository.list_pending(state_id))
    selected = _select_consumable_prefix(
        pending,
        accepted_signals=accepted,
        max_signals=max_signals,
    )
    consumed = tuple(repository.mark_consumed(signal.id) for signal in selected)

    return AgentSignalConsumptionBatch(
        state_id=state_id,
        poll_point=poll_point,
        signals=consumed,
    )

agent_tool(*, name=None, schema_name=None, description=None, permissions=(), effects=None, idempotency=Idempotency.UNKNOWN, data_access=None, externality=None, timeout=None, result_budget=None, evidence=EvidenceCapture.NONE, approval=ToolApprovalRequirement.DERIVED)

Attach typed agent-tool metadata to a method object.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def agent_tool(
    *,
    name: str | None = None,
    schema_name: str | None = None,
    description: str | None = None,
    permissions: Sequence[ToolPermission] = (),
    effects: ToolEffects | None = None,
    idempotency: Idempotency = Idempotency.UNKNOWN,
    data_access: DataAccess | None = None,
    externality: Externality | None = None,
    timeout: TimeoutPolicy | None = None,
    result_budget: ResultBudget | None = None,
    evidence: EvidenceCapture = EvidenceCapture.NONE,
    approval: ToolApprovalRequirement = ToolApprovalRequirement.DERIVED,
) -> Callable[[FunctionType], FunctionType]:
    """Attach typed agent-tool metadata to a method object."""

    def decorate(function: FunctionType) -> FunctionType:
        tool_name = _normalize_name(name, function.__name__, "Agent tool name")
        normalized_schema_name = _normalize_name(
            schema_name,
            tool_name,
            "Agent tool schema name",
        )
        tool_effects = effects or ToolEffects()
        metadata = AgentToolMetadata(
            permissions=tuple(permissions),
            effects=tool_effects,
            idempotency=idempotency,
            data_access=data_access or tool_effects.data_access,
            externality=externality or tool_effects.externality,
            timeout=timeout or TimeoutPolicy(),
            result_budget=result_budget or ResultBudget(),
            evidence=evidence,
            approval=approval,
        )
        definition = AgentToolDefinition(
            name=tool_name,
            schema_name=normalized_schema_name,
            description=description,
            metadata=metadata,
        )
        function.__dict__[AGENT_TOOL_DEFINITION_KEY] = definition
        return function

    return decorate

bind_agent_tool_invocation(function, payload)

Bind a structured model payload to a tool function signature.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def bind_agent_tool_invocation(
    function: AgentToolCallable,
    payload: JsonObject,
) -> AgentToolBoundInvocation:
    """Bind a structured model payload to a tool function signature."""
    invocation = _parse_invocation_payload(payload)
    try:
        function_signature = signature(function)
    except (TypeError, ValueError) as e:
        raise AgentToolBindingError(
            "Agent tool callable signature cannot be inspected"
        ) from e
    call_signature = _drop_owner_parameter(function_signature)
    try:
        bound = call_signature.bind(*invocation.args, **invocation.kwargs)
    except TypeError as e:
        raise AgentToolBindingError(
            "Agent tool invocation payload cannot be bound"
        ) from e
    bound.apply_defaults()
    return AgentToolBoundInvocation(
        args=tuple(bound.args),
        kwargs=dict(bound.kwargs),
    )

discover_agent_tools(owner)

Discover @agent_tool methods in deterministic class-definition order.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def discover_agent_tools(owner: type[object]) -> AgentToolCatalog:
    """Discover @agent_tool methods in deterministic class-definition order."""
    descriptors: list[AgentToolDescriptor] = []
    resolved_member_names: set[str] = set()
    for declaring_owner in owner.__mro__:
        if declaring_owner is object:
            continue
        for member_name, member in vars(declaring_owner).items():
            if member_name in resolved_member_names:
                continue
            resolved_member_names.add(member_name)
            function = _unwrap_function(member)
            if function is None:
                continue
            definition = get_agent_tool_definition(function)
            if definition is None:
                continue
            descriptors.append(
                _build_descriptor(declaring_owner, function, definition),
            )
    ordered = tuple(
        sorted(
            descriptors,
            key=lambda descriptor: descriptor.identity.key,
        ),
    )
    return AgentToolCatalog(descriptors=ordered)

실행

Agent execution metadata contracts.

RecoveryStrategy

Bases: StrEnum

Recovery strategy requested by an agent execution contract.

StreamingExposureMode

Bases: StrEnum

Streaming output guard profile exposed by agent execution.

AgentSignalKind

Bases: StrEnum

Inbound stimulus kinds that an agent may accept while running.

AgentExecutionLimits(timeout_seconds=None) dataclass

Bounded execution limits declared outside infrastructure capabilities.

__post_init__()

Reject limits that would fail later at bootstrap.

Source code in core/spakky-agent/src/spakky/agent/execution.py
def __post_init__(self) -> None:
    """Reject limits that would fail later at bootstrap."""
    if self.timeout_seconds is not None and self.timeout_seconds <= 0:
        raise AgentDefinitionError("Agent timeout limit must be positive")

AgentExecutionSpec(name=None, objective=None, accepted_signals=(), recovery=RecoveryStrategy.NONE, streaming_exposure_mode=StreamingExposureMode.BALANCED, timeout_seconds=None, limits=AgentExecutionLimits(), delegation_allowed=False, metadata=dict()) dataclass

Declarative execution semantics that cannot be inferred from DI alone.

__post_init__()

Reject execution specs that would fail later at bootstrap.

Source code in core/spakky-agent/src/spakky/agent/execution.py
def __post_init__(self) -> None:
    """Reject execution specs that would fail later at bootstrap."""
    if self.name is not None and not self.name.strip():
        raise AgentDefinitionError("Agent name cannot be blank")
    if self.objective is not None and not self.objective.strip():
        raise AgentDefinitionError("Agent objective cannot be blank")
    if self.timeout_seconds is not None and self.timeout_seconds <= 0:
        raise AgentDefinitionError("Agent timeout must be positive")
    if (
        self.timeout_seconds is not None
        and self.limits.timeout_seconds is not None
        and self.timeout_seconds != self.limits.timeout_seconds
    ):
        raise AgentDefinitionError("Agent timeout declarations must match")

Agent(spec=AgentExecutionSpec(), *, name='', scope=Scope.SINGLETON) dataclass

Bases: Pod

UseCase-equivalent Pod stereotype for agentic workflow components.

validate_bootstrap()

Re-run definition validation during application bootstrap.

Source code in core/spakky-agent/src/spakky/agent/execution.py
def validate_bootstrap(self) -> None:
    """Re-run definition validation during application bootstrap."""
    self._validate_execute_contract(self._ensure_agent_class(self.target))

required_persistence_repository_types()

Return repository ports required by this Agent's durable path.

Source code in core/spakky-agent/src/spakky/agent/execution.py
def required_persistence_repository_types(self) -> tuple[type[object], ...]:
    """Return repository ports required by this Agent's durable path."""
    from spakky.agent.interfaces.repository import (
        IAgentEvidenceRepository,
        IAgentSignalRepository,
        IAgentStateRepository,
    )

    if (
        self.spec.recovery is RecoveryStrategy.ACTION_BOUNDARY
        or len(self.spec.accepted_signals) > 0
    ):
        return (
            IAgentStateRepository,
            IAgentSignalRepository,
            IAgentEvidenceRepository,
        )
    return ()

State

Agent lifecycle state contracts.

AgentStatus

Bases: StrEnum

Externally observable lifecycle states for an agent execution.

AgentStateTransition

Bases: StrEnum

State transition vocabulary accepted by durable agent orchestration.

AgentStateReason

Bases: StrEnum

Structured reason that refines an externally observable lifecycle state.

AgentState(id, agent_type, status, transition=None, reason=None, current_activity=None, input_ref=None, output_ref=None, pending_signal_count=0, last_event_cursor=None, recovery_marker=None, metadata=dict(), created_at=None, updated_at=None) dataclass

Materialized state for a long-running agent execution.

__post_init__()

Reject state snapshots that cannot represent a real queue count.

Source code in core/spakky-agent/src/spakky/agent/state.py
def __post_init__(self) -> None:
    """Reject state snapshots that cannot represent a real queue count."""
    if self.pending_signal_count < 0:
        raise AgentDefinitionError("Agent pending signal count cannot be negative")

Signal

Agent signal contracts.

ApprovalDecision

Bases: StrEnum

Human-in-the-loop approval outcomes.

AgentSignal(id, agent_state_id, kind, payload=dict(), created_at=None) dataclass

Inbound stimulus appended for an agent execution.

Non-blocking consumption helpers for durable agent signals.

AgentSignalPollPoint

Bases: StrEnum

Places where orchestration may poll durable signals without waiting.

AgentSignalConsumptionBatch(state_id, poll_point, signals) dataclass

Signals consumed during one non-blocking poll.

consumed_count property

Return the number of signals consumed by this poll.

consume_pending_agent_signals(repository, state_id, *, poll_point=AgentSignalPollPoint.SAFE_BOUNDARY, accepted_signals=None, max_signals=None)

Consume currently pending signals without sleeping or waiting for new input.

The repository remains responsible for durable queue ordering. This helper consumes only the eligible prefix of the pending queue so later signals never overtake an earlier unaccepted signal.

Source code in core/spakky-agent/src/spakky/agent/signal_consumption.py
def consume_pending_agent_signals(
    repository: IAgentSignalRepository,
    state_id: str,
    *,
    poll_point: AgentSignalPollPoint = AgentSignalPollPoint.SAFE_BOUNDARY,
    accepted_signals: Sequence[AgentSignalKind] | None = None,
    max_signals: int | None = None,
) -> AgentSignalConsumptionBatch:
    """Consume currently pending signals without sleeping or waiting for new input.

    The repository remains responsible for durable queue ordering. This helper
    consumes only the eligible prefix of the pending queue so later signals never
    overtake an earlier unaccepted signal.
    """
    if not state_id.strip():
        raise AgentDefinitionError("Agent signal state id cannot be blank")
    if max_signals is not None and max_signals <= 0:
        raise AgentDefinitionError("Agent signal max_signals must be positive")

    accepted = frozenset(accepted_signals) if accepted_signals is not None else None
    pending = tuple(repository.list_pending(state_id))
    selected = _select_consumable_prefix(
        pending,
        accepted_signals=accepted,
        max_signals=max_signals,
    )
    consumed = tuple(repository.mark_consumed(signal.id) for signal in selected)

    return AgentSignalConsumptionBatch(
        state_id=state_id,
        poll_point=poll_point,
        signals=consumed,
    )

Evidence

Agent evidence contracts.

AgentEvidenceKind

Bases: StrEnum

Kinds of append-only evidence captured by agent execution.

AgentEvidence(id, agent_state_id, kind, payload=dict(), summary=None, digest=None, manifest_ref=None, reference=None, sensitive_fields=(), created_at=None) dataclass

Append-only artifact captured during an agent execution.

AgentEvidenceCandidate(kind, payload=dict(), summary=None, digest=None, manifest_ref=None, reference=None, sensitive_fields=()) dataclass

Append-only evidence candidate before repository id assignment.

tool_result(*, tool_identity, tool_schema_name, result, capture, sensitive_fields=(), exposure_policy=None, summary=None) classmethod

Create evidence metadata for a captured tool result.

Source code in core/spakky-agent/src/spakky/agent/evidence.py
@classmethod
def tool_result(
    cls,
    *,
    tool_identity: str,
    tool_schema_name: str,
    result: JsonObject,
    capture: EvidenceCapture,
    sensitive_fields: tuple[SensitiveFieldDescriptor, ...] = (),
    exposure_policy: EvidenceExposurePolicy | None = None,
    summary: str | None = None,
) -> "AgentEvidenceCandidate":
    """Create evidence metadata for a captured tool result."""
    _require_non_blank(tool_identity, "Agent tool identity")
    _require_non_blank(tool_schema_name, "Agent tool schema name")
    policy = exposure_policy or EvidenceExposurePolicy()
    guarded_result = guard_json_value(result, sensitive_fields, policy)
    if not isinstance(guarded_result, Mapping):
        guarded_result = {}
    return cls(
        kind=AgentEvidenceKind.TOOL,
        payload={
            "tool_identity": tool_identity,
            "tool_schema_name": tool_schema_name,
            "capture": capture.value,
            "result": guarded_result,
        },
        sensitive_fields=sensitive_fields
        if policy.include_sensitive_metadata
        else (),
        summary=summary,
    )

model_decision(*, model, decision, summary=None) classmethod

Create evidence metadata for a model decision.

Source code in core/spakky-agent/src/spakky/agent/evidence.py
@classmethod
def model_decision(
    cls,
    *,
    model: str,
    decision: JsonObject,
    summary: str | None = None,
) -> "AgentEvidenceCandidate":
    """Create evidence metadata for a model decision."""
    _require_non_blank(model, "Agent model name")
    return cls(
        kind=AgentEvidenceKind.MODEL,
        payload={"model": model, "decision": decision},
        summary=summary,
    )

tool_decision(*, tool_identity, decision, summary=None) classmethod

Create evidence metadata for a tool-routing decision.

Source code in core/spakky-agent/src/spakky/agent/evidence.py
@classmethod
def tool_decision(
    cls,
    *,
    tool_identity: str,
    decision: JsonObject,
    summary: str | None = None,
) -> "AgentEvidenceCandidate":
    """Create evidence metadata for a tool-routing decision."""
    _require_non_blank(tool_identity, "Agent tool identity")
    return cls(
        kind=AgentEvidenceKind.TOOL,
        payload={"tool_identity": tool_identity, "decision": decision},
        summary=summary,
    )

context_optimization(*, action, stage, signals=(), summary=None) classmethod

Create before/after evidence for a context optimization action.

Source code in core/spakky-agent/src/spakky/agent/evidence.py
@classmethod
def context_optimization(
    cls,
    *,
    action: ContextOptimizationAction,
    stage: ContextOptimizationEvidenceStage,
    signals: Sequence[ContextHealthSignal] = (),
    summary: str | None = None,
) -> "AgentEvidenceCandidate":
    """Create before/after evidence for a context optimization action."""
    _require_non_blank(action.id, "Context optimization action id")
    return cls(
        kind=AgentEvidenceKind.CONTEXT_OPTIMIZATION,
        payload={
            "stage": stage.value,
            "action": action.evidence_payload(),
            "signals": tuple(signal.evidence_payload() for signal in signals),
        },
        summary=summary,
        digest=action.digest_ref,
        manifest_ref=action.manifest_ref,
        reference=action.result_evidence_ref,
    )

to_evidence(*, evidence_id, agent_state_id, created_at=None)

Assign repository identity while preserving append-only contents.

Source code in core/spakky-agent/src/spakky/agent/evidence.py
def to_evidence(
    self,
    *,
    evidence_id: str,
    agent_state_id: str,
    created_at: datetime | None = None,
) -> AgentEvidence:
    """Assign repository identity while preserving append-only contents."""
    _require_non_blank(evidence_id, "Agent evidence id")
    _require_non_blank(agent_state_id, "Agent state id")
    return AgentEvidence(
        id=evidence_id,
        agent_state_id=agent_state_id,
        kind=self.kind,
        payload=self.payload,
        summary=self.summary,
        digest=self.digest,
        manifest_ref=self.manifest_ref,
        reference=self.reference,
        sensitive_fields=self.sensitive_fields,
        created_at=created_at,
    )

Context

Typed context contracts for agent model input assembly.

ContextPackRole

Bases: StrEnum

Semantic role of a context pack inside a model request.

ContextFreshness

Bases: StrEnum

Freshness classification for context rot and budget decisions.

ContextSensitivity

Bases: StrEnum

Deterministic sensitivity metadata carried before model input.

ContextRotSymptom

Bases: StrEnum

Typed context rot symptoms observed before model input assembly.

ContextOptimizationActionKind

Bases: StrEnum

Optimization actions that can be selected from context health signals.

ContextOptimizationEvidenceStage

Bases: StrEnum

Where an optimization action evidence item sits in the agent flow.

ContextTokenBudget(max_tokens=None, estimated_tokens=None, reserved_output_tokens=None) dataclass

Token budget allocated to one context pack.

ContextHealthSignal(id, symptom, manifest_ref=None, pack_id=None, evidence_ref=None, score=None, observed_at=None, metadata=dict()) dataclass

Observed context rot signal used to choose optimization actions.

evidence_payload()

Return JSON-compatible signal metadata for append-only evidence.

Source code in core/spakky-agent/src/spakky/agent/context.py
def evidence_payload(self) -> Mapping[str, JsonValue]:
    """Return JSON-compatible signal metadata for append-only evidence."""
    return {
        "id": self.id,
        "symptom": self.symptom.value,
        "manifest_ref": self.manifest_ref,
        "pack_id": self.pack_id,
        "evidence_ref": self.evidence_ref,
        "score": self.score,
        "observed_at": self.observed_at.isoformat()
        if self.observed_at is not None
        else None,
        "metadata": self.metadata,
    }

ContextOptimizationAction(id, kind, signal_refs=tuple(), target_pack_ids=tuple(), manifest_ref=None, digest_ref=None, delegation_ref=None, result_evidence_ref=None, reason=None, metadata=dict()) dataclass

Selected optimization action derived from context health signals.

evidence_payload()

Return JSON-compatible action metadata without raw context contents.

Source code in core/spakky-agent/src/spakky/agent/context.py
def evidence_payload(self) -> Mapping[str, JsonValue]:
    """Return JSON-compatible action metadata without raw context contents."""
    return {
        "id": self.id,
        "kind": self.kind.value,
        "signal_refs": tuple(self.signal_refs),
        "target_pack_ids": tuple(self.target_pack_ids),
        "manifest_ref": self.manifest_ref,
        "digest_ref": self.digest_ref,
        "delegation_ref": self.delegation_ref,
        "result_evidence_ref": self.result_evidence_ref,
        "reason": self.reason,
        "metadata": self.metadata,
    }

IAgentContextHandler

Bases: ABC

Select context optimization actions from health signals and manifests.

select_optimization_actions(signals, manifest) abstractmethod

Return optimization actions without mutating raw evidence.

Source code in core/spakky-agent/src/spakky/agent/context.py
@abstractmethod
def select_optimization_actions(
    self,
    signals: Sequence[ContextHealthSignal],
    manifest: "ContextManifest",
) -> Sequence[ContextOptimizationAction]:
    """Return optimization actions without mutating raw evidence."""
    ...

ContextPack(id, content, source, role, freshness=ContextFreshness.UNKNOWN, relevance=None, token_budget=ContextTokenBudget(), sensitivity=ContextSensitivity.INTERNAL, sensitive_fields=(), metadata=dict()) dataclass

LLM-facing context unit derived from state, signal, or evidence.

guarded_content(policy=None)

Return deterministic model-safe content for this context pack.

Source code in core/spakky-agent/src/spakky/agent/context.py
def guarded_content(
    self,
    policy: ContextExposurePolicy | None = None,
) -> str:
    """Return deterministic model-safe content for this context pack."""
    exposure_policy = policy or ContextExposurePolicy()
    if self.sensitivity == ContextSensitivity.REDACTED:
        return "[REDACTED]"
    guarded = guard_json_value(
        {"content": self.content},
        tuple(
            SensitiveFieldDescriptor(
                ("content", *descriptor.path), descriptor.field
            )
            for descriptor in self.sensitive_fields
        ),
        exposure_policy,
    )
    content = cast(Mapping[str, JsonValue], guarded).get("content")
    if isinstance(content, str):
        return content
    return "[REDACTED]"

message_metadata(policy=None)

Return non-content metadata for provider-neutral model messages.

Source code in core/spakky-agent/src/spakky/agent/context.py
def message_metadata(
    self,
    policy: ContextExposurePolicy | None = None,
) -> Mapping[str, JsonValue]:
    """Return non-content metadata for provider-neutral model messages."""
    exposure_policy = policy or ContextExposurePolicy()
    metadata: dict[str, JsonValue] = {
        "context_pack_id": self.id,
        "source": self.source,
        "role": self.role.value,
        "freshness": self.freshness.value,
        "relevance": self.relevance,
        "token_budget": {
            "max_tokens": self.token_budget.max_tokens,
            "estimated_tokens": self.token_budget.estimated_tokens,
            "reserved_output_tokens": self.token_budget.reserved_output_tokens,
        },
        "sensitivity": self.sensitivity.value,
        "metadata": self.metadata,
    }
    if exposure_policy.include_sensitive_context_metadata and self.sensitive_fields:
        metadata["sensitive_fields"] = tuple(
            descriptor.to_metadata() for descriptor in self.sensitive_fields
        )
    return metadata

ContextManifestEntry(pack_id, source, role, origin_ref, evidence_ref=None, digest_ref=None, sensitive_fields=(), metadata=dict()) dataclass

One audited pack entry inside a context manifest.

ContextManifest(id, entries, origin_ref=None, evidence_refs=tuple(), created_at=None, metadata=dict()) dataclass

Auditable composition record for model input context packs.

ContextDigest(id, context_identity, source_manifest_ref, digest, derived_from_pack_ids=tuple(), compression_evidence_ref=None, algorithm=None, summary=None, created_at=None, metadata=dict()) dataclass

Derived compression evidence for a context identity.

Recovery

Action-boundary recovery contracts for durable agent execution.

AgentActionKind

Bases: StrEnum

Recoverable external action classes in an agent execution.

AgentActionBoundaryStage

Bases: StrEnum

Checkpoint side recorded around an action boundary.

AgentResumeAction

Bases: StrEnum

Orchestration action selected from persisted checkpoint evidence.

AgentActionBoundaryCheckpoint(action_id, action_kind, stage, idempotency=Idempotency.UNKNOWN, metadata=dict()) dataclass

Serializable checkpoint recorded before or after one external action.

__post_init__()

Reject checkpoints that cannot be correlated after restart.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
def __post_init__(self) -> None:
    """Reject checkpoints that cannot be correlated after restart."""
    if not self.action_id.strip():
        raise AgentDefinitionError("Agent action boundary id cannot be blank")

before_model_call(action_id, *, idempotency=Idempotency.UNKNOWN, metadata=None) classmethod

Create the checkpoint recorded before a model call is attempted.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def before_model_call(
    cls,
    action_id: str,
    *,
    idempotency: Idempotency = Idempotency.UNKNOWN,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded before a model call is attempted."""
    return cls._for_action(
        action_id,
        AgentActionKind.MODEL_CALL,
        AgentActionBoundaryStage.BEFORE,
        idempotency=idempotency,
        metadata=metadata,
    )

after_model_call(action_id, *, idempotency=Idempotency.UNKNOWN, metadata=None) classmethod

Create the checkpoint recorded after a model call completes.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def after_model_call(
    cls,
    action_id: str,
    *,
    idempotency: Idempotency = Idempotency.UNKNOWN,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded after a model call completes."""
    return cls._for_action(
        action_id,
        AgentActionKind.MODEL_CALL,
        AgentActionBoundaryStage.AFTER,
        idempotency=idempotency,
        metadata=metadata,
    )

before_tool_call(action_id, *, idempotency, metadata=None) classmethod

Create the checkpoint recorded before a tool call is attempted.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def before_tool_call(
    cls,
    action_id: str,
    *,
    idempotency: Idempotency,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded before a tool call is attempted."""
    return cls._for_action(
        action_id,
        AgentActionKind.TOOL_CALL,
        AgentActionBoundaryStage.BEFORE,
        idempotency=idempotency,
        metadata=metadata,
    )

after_tool_call(action_id, *, idempotency, metadata=None) classmethod

Create the checkpoint recorded after a tool call completes.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def after_tool_call(
    cls,
    action_id: str,
    *,
    idempotency: Idempotency,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded after a tool call completes."""
    return cls._for_action(
        action_id,
        AgentActionKind.TOOL_CALL,
        AgentActionBoundaryStage.AFTER,
        idempotency=idempotency,
        metadata=metadata,
    )

before_approval_wait(action_id, *, metadata=None) classmethod

Create the checkpoint recorded before waiting for approval.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def before_approval_wait(
    cls,
    action_id: str,
    *,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded before waiting for approval."""
    return cls._for_action(
        action_id,
        AgentActionKind.APPROVAL_WAIT,
        AgentActionBoundaryStage.BEFORE,
        idempotency=Idempotency.NON_IDEMPOTENT,
        metadata=metadata,
    )

after_approval_wait(action_id, *, metadata=None) classmethod

Create the checkpoint recorded after an approval wait resolves.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
@classmethod
def after_approval_wait(
    cls,
    action_id: str,
    *,
    metadata: JsonObject | None = None,
) -> "AgentActionBoundaryCheckpoint":
    """Create the checkpoint recorded after an approval wait resolves."""
    return cls._for_action(
        action_id,
        AgentActionKind.APPROVAL_WAIT,
        AgentActionBoundaryStage.AFTER,
        idempotency=Idempotency.NON_IDEMPOTENT,
        metadata=metadata,
    )

to_evidence_candidate(*, summary=None)

Represent this checkpoint as append-only evidence.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
def to_evidence_candidate(
    self,
    *,
    summary: str | None = None,
) -> AgentEvidenceCandidate:
    """Represent this checkpoint as append-only evidence."""
    return AgentEvidenceCandidate(
        kind=AgentEvidenceKind.ACTION_BOUNDARY,
        payload={
            "action_id": self.action_id,
            "action_kind": self.action_kind.value,
            "stage": self.stage.value,
            "idempotency": self.idempotency.value,
            "metadata": self.metadata,
        },
        summary=summary,
    )

AgentResumeBoundary(action_id, action_kind, stage, idempotency, evidence_id) dataclass

Last action boundary reconstructed from append-only evidence.

AgentResumePlan(state, action, boundary=None, signals=()) dataclass

Resume decision derived from persisted state and checkpoint evidence.

requires_human_input property

Return whether automatic replay must stop at HITL recovery.

can_resume_automatically property

Return whether orchestration may continue without approval.

plan_agent_resume(state, evidence, signals=())

Restore the next resume action using only persisted state and evidence.

Source code in core/spakky-agent/src/spakky/agent/recovery.py
def plan_agent_resume(
    state: AgentState,
    evidence: Sequence[AgentEvidence],
    signals: Sequence[AgentSignal] = (),
) -> AgentResumePlan:
    """Restore the next resume action using only persisted state and evidence."""
    persisted_signals = tuple(signals)
    if state.status in (
        AgentStatus.COMPLETED,
        AgentStatus.FAILED,
        AgentStatus.CANCELLED,
    ):
        return AgentResumePlan(
            state=state,
            action=AgentResumeAction.NOT_RESUMABLE,
            signals=persisted_signals,
        )

    boundary = _last_action_boundary(evidence)
    if boundary is None:
        return AgentResumePlan(
            state=state,
            action=AgentResumeAction.START,
            signals=persisted_signals,
        )

    if boundary.stage is AgentActionBoundaryStage.AFTER:
        return AgentResumePlan(
            state=replace(state, recovery_marker=boundary.action_id),
            action=AgentResumeAction.SKIP_COMPLETED,
            boundary=boundary,
            signals=persisted_signals,
        )

    if boundary.action_kind is AgentActionKind.APPROVAL_WAIT:
        return AgentResumePlan(
            state=_interrupt_for_human_recovery(state, boundary),
            action=AgentResumeAction.REQUIRE_HITL,
            boundary=boundary,
            signals=persisted_signals,
        )

    resume_action = ToolResumeMetadata(
        idempotency=boundary.idempotency,
    ).action_for_incomplete_boundary()
    if resume_action is ToolResumeAction.RETRY:
        return AgentResumePlan(
            state=replace(state, recovery_marker=boundary.action_id),
            action=AgentResumeAction.RETRY,
            boundary=boundary,
            signals=persisted_signals,
        )
    return AgentResumePlan(
        state=_interrupt_for_human_recovery(state, boundary),
        action=AgentResumeAction.REQUIRE_HITL,
        boundary=boundary,
        signals=persisted_signals,
    )

Approval

Human-in-the-loop approval workflow contracts.

AgentApprovalBoundaryKind

Bases: StrEnum

Action boundaries where orchestration may require human approval.

AgentApprovalPlanAction

Bases: StrEnum

Approval plan outcome before an action boundary is executed.

AgentApprovalRequest(id, agent_state_id, boundary, prompt, risk, action_ref, allowed_decisions=DEFAULT_APPROVAL_DECISIONS, metadata=dict()) dataclass

Approval request materialized at a risky action boundary.

__post_init__()

Reject approval requests that cannot be matched by a signal.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def __post_init__(self) -> None:
    """Reject approval requests that cannot be matched by a signal."""
    _require_non_blank(self.id, "Agent approval id")
    _require_non_blank(self.agent_state_id, "Agent approval state id")
    _require_non_blank(self.prompt, "Agent approval prompt")
    _require_non_blank(self.action_ref, "Agent approval action ref")
    if len(self.allowed_decisions) == 0:
        raise AgentDefinitionError("Agent approval decisions cannot be empty")

from_tool_descriptor(*, approval_id, agent_state_id, descriptor, prompt=None, call_id=None, metadata=None) classmethod

Build an approval request from a risky tool descriptor.

Source code in core/spakky-agent/src/spakky/agent/approval.py
@classmethod
def from_tool_descriptor(
    cls,
    *,
    approval_id: str,
    agent_state_id: str,
    descriptor: AgentToolDescriptor,
    prompt: str | None = None,
    call_id: str | None = None,
    metadata: JsonObject | None = None,
) -> "AgentApprovalRequest":
    """Build an approval request from a risky tool descriptor."""
    request_metadata = _tool_approval_metadata(descriptor, call_id, metadata or {})
    return cls(
        id=approval_id,
        agent_state_id=agent_state_id,
        boundary=AgentApprovalBoundaryKind.TOOL_INVOCATION,
        prompt=prompt or f"Approve tool invocation: {descriptor.name}",
        risk=descriptor.metadata.risk,
        action_ref=descriptor.identity.key,
        metadata=request_metadata,
    )

to_state(*, agent_type)

Materialize the approval wait as interrupted lifecycle state.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def to_state(self, *, agent_type: str) -> AgentState:
    """Materialize the approval wait as interrupted lifecycle state."""
    _require_non_blank(agent_type, "Agent approval agent type")
    return AgentState(
        id=self.agent_state_id,
        agent_type=agent_type,
        status=AgentStatus.INTERRUPTED,
        transition=AgentStateTransition.WAITING_APPROVAL,
        reason=AgentStateReason.APPROVAL_REQUIRED,
        current_activity=self.prompt,
        metadata={"approval": self.to_metadata()},
    )

to_yield()

Expose this approval request to an inbound adapter stream.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def to_yield(self) -> AgentYield[Approval]:
    """Expose this approval request to an inbound adapter stream."""
    return AgentYield(
        kind=AgentYieldKind.APPROVAL,
        payload=Approval(
            id=self.id,
            prompt=self.prompt,
            allowed_decisions=self.allowed_decisions,
            metadata=self.to_metadata(),
        ),
    )

to_metadata()

Return JSON-compatible metadata for state, yield, and evidence.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def to_metadata(self) -> JsonObject:
    """Return JSON-compatible metadata for state, yield, and evidence."""
    return {
        "id": self.id,
        "agent_state_id": self.agent_state_id,
        "boundary": self.boundary.value,
        "action_ref": self.action_ref,
        "risk_axes": [axis.value for axis in self.risk.axes],
        "allowed_decisions": [
            decision.value for decision in self.allowed_decisions
        ],
        "metadata": self.metadata,
    }

AgentApprovalPlan(action, request=None, state=None, yield_item=None) dataclass

Plan for an action boundary before executing it.

requires_approval property

Return whether orchestration must wait for a HITL decision.

AgentApprovalDecisionOutcome(request_id, decision, status, transition, reason=None, modified_payload=dict(), comment=None) dataclass

Typed result of an approval decision signal.

plan_agent_tool_approval(*, descriptor, approval_id, agent_state_id, agent_type, prompt=None, call_id=None, metadata=None)

Plan whether a tool invocation should proceed or wait for approval.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def plan_agent_tool_approval(
    *,
    descriptor: AgentToolDescriptor,
    approval_id: str,
    agent_state_id: str,
    agent_type: str,
    prompt: str | None = None,
    call_id: str | None = None,
    metadata: JsonObject | None = None,
) -> AgentApprovalPlan:
    """Plan whether a tool invocation should proceed or wait for approval."""
    if not descriptor.metadata.requires_approval_candidate:
        return AgentApprovalPlan(action=AgentApprovalPlanAction.PROCEED)

    request = AgentApprovalRequest.from_tool_descriptor(
        approval_id=approval_id,
        agent_state_id=agent_state_id,
        descriptor=descriptor,
        prompt=prompt,
        call_id=call_id,
        metadata=metadata,
    )
    return AgentApprovalPlan(
        action=AgentApprovalPlanAction.WAIT_FOR_APPROVAL,
        request=request,
        state=request.to_state(agent_type=agent_type),
        yield_item=request.to_yield(),
    )

parse_agent_approval_decision_signal(signal, *, request=None)

Parse an approval decision signal into a typed workflow outcome.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def parse_agent_approval_decision_signal(
    signal: AgentSignal,
    *,
    request: AgentApprovalRequest | None = None,
) -> AgentApprovalDecisionOutcome:
    """Parse an approval decision signal into a typed workflow outcome."""
    if signal.kind != AgentSignalKind.APPROVAL_DECISION:
        raise AgentDefinitionError(
            "Agent approval signal kind must be approval_decision"
        )

    request_id = _require_payload_text(signal.payload, "request_id")
    if request is not None and request_id != request.id:
        raise AgentDefinitionError("Agent approval signal request id does not match")
    if request is not None and signal.agent_state_id != request.agent_state_id:
        raise AgentDefinitionError("Agent approval signal state id does not match")

    decision = _parse_approval_decision(
        _require_payload_text(signal.payload, "decision")
    )
    modified_payload = _payload_object(signal.payload.get("modified_payload", {}))
    comment = _optional_payload_text(signal.payload, "comment")
    status, transition, reason = _decision_state_target(decision)

    return AgentApprovalDecisionOutcome(
        request_id=request_id,
        decision=decision,
        status=status,
        transition=transition,
        reason=reason,
        modified_payload=modified_payload,
        comment=comment,
    )

materialize_agent_approval_decision_state(current, outcome)

Apply a typed approval decision outcome to an existing state snapshot.

Source code in core/spakky-agent/src/spakky/agent/approval.py
def materialize_agent_approval_decision_state(
    current: AgentState,
    outcome: AgentApprovalDecisionOutcome,
) -> AgentState:
    """Apply a typed approval decision outcome to an existing state snapshot."""
    return AgentState(
        id=current.id,
        agent_type=current.agent_type,
        status=outcome.status,
        transition=outcome.transition,
        reason=outcome.reason,
        current_activity=_activity_for_decision(outcome),
        input_ref=current.input_ref,
        output_ref=current.output_ref,
        pending_signal_count=current.pending_signal_count,
        last_event_cursor=current.last_event_cursor,
        recovery_marker=current.recovery_marker,
        metadata={
            **current.metadata,
            "approval_decision": {
                "request_id": outcome.request_id,
                "decision": outcome.decision.value,
                "modified_payload": outcome.modified_payload,
                "comment": outcome.comment,
            },
        },
        created_at=current.created_at,
        updated_at=current.updated_at,
    )

Cancellation

Cancellation lifecycle and cleanup contracts for agent execution.

AgentCancellationCleanupCallable = Callable[[AgentCancellationRequest], Awaitable[AgentCancellationCleanupResult]]

Async hook shape invoked for model stream/tool/delegate cancellation cleanup.

AgentCancellationTargetKind

Bases: StrEnum

Running execution target that can receive a cancellation cleanup hook.

AgentCancellationCleanupStatus

Bases: StrEnum

Outcome of one cancellation cleanup hook.

AgentCancellationRequest(state_id, signal_id, target_kind, target_ref, reason=None, requested_by=None, metadata=dict()) dataclass

Cancellation request passed to a model stream, tool, or delegate hook.

__post_init__()

Reject requests that cannot be traced back to state/signal/target.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def __post_init__(self) -> None:
    """Reject requests that cannot be traced back to state/signal/target."""
    _require_non_blank(self.state_id, "Agent cancellation state id")
    _require_non_blank(self.signal_id, "Agent cancellation signal id")
    _require_non_blank(self.target_ref, "Agent cancellation target ref")
    _require_optional_non_blank(self.reason, "Agent cancellation reason")
    _require_optional_non_blank(self.requested_by, "Agent cancellation requester")

from_signal(*, state, signal, target_kind, target_ref, metadata=None) classmethod

Build a hook request from the durable CANCEL signal payload.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
@classmethod
def from_signal(
    cls,
    *,
    state: AgentState,
    signal: AgentSignal,
    target_kind: AgentCancellationTargetKind,
    target_ref: str,
    metadata: JsonObject | None = None,
) -> "AgentCancellationRequest":
    """Build a hook request from the durable CANCEL signal payload."""
    _require_cancel_signal(signal)
    _require_signal_matches_state(state, signal)
    return cls(
        state_id=state.id,
        signal_id=signal.id,
        target_kind=target_kind,
        target_ref=target_ref,
        reason=_optional_string(signal.payload, "reason"),
        requested_by=_optional_string(signal.payload, "requested_by"),
        metadata={} if metadata is None else metadata,
    )

to_payload()

Serialize the request into append-only evidence metadata.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def to_payload(self) -> JsonObject:
    """Serialize the request into append-only evidence metadata."""
    return {
        "state_id": self.state_id,
        "signal_id": self.signal_id,
        "target_kind": self.target_kind.value,
        "target_ref": self.target_ref,
        "reason": self.reason,
        "requested_by": self.requested_by,
        "metadata": self.metadata,
    }

AgentCancellationCleanupResult(target_kind, target_ref, status, reason=None, error_code=None, message=None, metadata=dict()) dataclass

Result returned by one cancellation cleanup hook.

failed_cleanup property

Return whether this hook prevents a clean CANCELLED terminal state.

__post_init__()

Reject outcomes that cannot be correlated with the cleanup target.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def __post_init__(self) -> None:
    """Reject outcomes that cannot be correlated with the cleanup target."""
    _require_non_blank(self.target_ref, "Agent cancellation cleanup target ref")
    _require_optional_non_blank(self.reason, "Agent cancellation cleanup reason")
    _require_optional_non_blank(
        self.error_code,
        "Agent cancellation cleanup error code",
    )
    _require_optional_non_blank(self.message, "Agent cancellation cleanup message")

succeeded(*, target_kind, target_ref, reason=None, metadata=None) classmethod

Record a cleanup hook that released its target.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
@classmethod
def succeeded(
    cls,
    *,
    target_kind: AgentCancellationTargetKind,
    target_ref: str,
    reason: str | None = None,
    metadata: JsonObject | None = None,
) -> "AgentCancellationCleanupResult":
    """Record a cleanup hook that released its target."""
    return cls(
        target_kind=target_kind,
        target_ref=target_ref,
        status=AgentCancellationCleanupStatus.SUCCEEDED,
        reason=reason,
        metadata={} if metadata is None else metadata,
    )

failed(*, target_kind, target_ref, error_code, message, metadata=None) classmethod

Record a cleanup hook that could not release its target.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
@classmethod
def failed(
    cls,
    *,
    target_kind: AgentCancellationTargetKind,
    target_ref: str,
    error_code: str,
    message: str,
    metadata: JsonObject | None = None,
) -> "AgentCancellationCleanupResult":
    """Record a cleanup hook that could not release its target."""
    return cls(
        target_kind=target_kind,
        target_ref=target_ref,
        status=AgentCancellationCleanupStatus.FAILED,
        error_code=error_code,
        message=message,
        metadata={} if metadata is None else metadata,
    )

skipped(*, target_kind, target_ref, reason, metadata=None) classmethod

Record a cleanup target that was already inactive or unavailable.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
@classmethod
def skipped(
    cls,
    *,
    target_kind: AgentCancellationTargetKind,
    target_ref: str,
    reason: str,
    metadata: JsonObject | None = None,
) -> "AgentCancellationCleanupResult":
    """Record a cleanup target that was already inactive or unavailable."""
    return cls(
        target_kind=target_kind,
        target_ref=target_ref,
        status=AgentCancellationCleanupStatus.SKIPPED,
        reason=reason,
        metadata={} if metadata is None else metadata,
    )

to_payload()

Serialize this hook outcome into evidence/state metadata.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def to_payload(self) -> JsonObject:
    """Serialize this hook outcome into evidence/state metadata."""
    return {
        "target_kind": self.target_kind.value,
        "target_ref": self.target_ref,
        "status": self.status.value,
        "reason": self.reason,
        "error_code": self.error_code,
        "message": self.message,
        "metadata": self.metadata,
    }

AgentCancellationCleanupTask(target_kind, target_ref, cleanup, metadata=dict()) dataclass

One cleanup hook registered for a running cancellation target.

__post_init__()

Reject hooks that cannot be identified in cleanup evidence.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def __post_init__(self) -> None:
    """Reject hooks that cannot be identified in cleanup evidence."""
    _require_non_blank(self.target_ref, "Agent cancellation cleanup target ref")

run(*, state, signal) async

Invoke the hook and verify that it reports the same target.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
async def run(
    self,
    *,
    state: AgentState,
    signal: AgentSignal,
) -> AgentCancellationCleanupResult:
    """Invoke the hook and verify that it reports the same target."""
    request = AgentCancellationRequest.from_signal(
        state=state,
        signal=signal,
        target_kind=self.target_kind,
        target_ref=self.target_ref,
        metadata=self.metadata,
    )
    result = await self.cleanup(request)
    if (
        result.target_kind is not self.target_kind
        or result.target_ref != self.target_ref
    ):
        raise AgentDefinitionError(
            "Agent cancellation cleanup result target must match the task"
        )
    return result

AgentCancellationCleanupReport(state_id, signal_id, outcomes=()) dataclass

Aggregate cleanup evidence for one CANCEL signal.

cleanup_succeeded property

Return whether all cleanup hooks completed without failure.

failed_outcomes property

Return the hook outcomes that force FAILED terminal state.

__post_init__()

Reject reports that cannot be attached to state and signal evidence.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def __post_init__(self) -> None:
    """Reject reports that cannot be attached to state and signal evidence."""
    _require_non_blank(self.state_id, "Agent cancellation report state id")
    _require_non_blank(self.signal_id, "Agent cancellation report signal id")

to_payload()

Serialize the report for state metadata and evidence payloads.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def to_payload(self) -> JsonObject:
    """Serialize the report for state metadata and evidence payloads."""
    return {
        "state_id": self.state_id,
        "signal_id": self.signal_id,
        "cleanup_succeeded": self.cleanup_succeeded,
        "outcomes": tuple(outcome.to_payload() for outcome in self.outcomes),
    }

to_evidence_candidate(*, summary=None)

Represent cancellation cleanup as append-only evidence.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def to_evidence_candidate(
    self,
    *,
    summary: str | None = None,
) -> AgentEvidenceCandidate:
    """Represent cancellation cleanup as append-only evidence."""
    return AgentEvidenceCandidate(
        kind=AgentEvidenceKind.CANCELLATION,
        payload=self.to_payload(),
        summary=summary,
    )

run_agent_cancellation_cleanup(*, state, signal, tasks) async

Invoke model stream/tool/delegate cleanup hooks for a CANCEL signal.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
async def run_agent_cancellation_cleanup(
    *,
    state: AgentState,
    signal: AgentSignal,
    tasks: Sequence[AgentCancellationCleanupTask],
) -> AgentCancellationCleanupReport:
    """Invoke model stream/tool/delegate cleanup hooks for a CANCEL signal."""
    _require_cancel_signal(signal)
    outcomes: list[AgentCancellationCleanupResult] = []
    for task in tasks:
        outcomes.append(await task.run(state=state, signal=signal))
    return AgentCancellationCleanupReport(
        state_id=state.id,
        signal_id=signal.id,
        outcomes=tuple(outcomes),
    )

begin_agent_cancellation(state, signal)

Materialize receipt of a CANCEL signal as CANCELLING state.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def begin_agent_cancellation(
    state: AgentState,
    signal: AgentSignal,
) -> AgentState:
    """Materialize receipt of a CANCEL signal as CANCELLING state."""
    _require_cancel_signal(signal)
    _require_signal_matches_state(state, signal)
    return replace(
        state,
        status=AgentStatus.CANCELLING,
        transition=AgentStateTransition.CANCELLING,
        reason=AgentStateReason.CANCELLATION_REQUESTED,
        metadata={
            **state.metadata,
            "cancellation_signal_id": signal.id,
            "cancellation_reason": _optional_string(signal.payload, "reason"),
            "cancellation_requested_by": _optional_string(
                signal.payload,
                "requested_by",
            ),
        },
    )

complete_agent_cancellation(state, report)

Resolve CANCELLING state into CANCELLED or FAILED cleanup outcome.

Source code in core/spakky-agent/src/spakky/agent/cancellation.py
def complete_agent_cancellation(
    state: AgentState,
    report: AgentCancellationCleanupReport,
) -> AgentState:
    """Resolve CANCELLING state into CANCELLED or FAILED cleanup outcome."""
    if state.id != report.state_id:
        raise AgentDefinitionError("Agent cancellation report state id mismatch")
    metadata = {**state.metadata, "cancellation_cleanup": report.to_payload()}
    if report.cleanup_succeeded:
        return replace(
            state,
            status=AgentStatus.CANCELLED,
            transition=AgentStateTransition.CANCELLED,
            reason=AgentStateReason.CANCELLATION_REQUESTED,
            metadata=metadata,
        )
    return replace(
        state,
        status=AgentStatus.FAILED,
        transition=AgentStateTransition.FAILED,
        reason=AgentStateReason.CANCELLATION_CLEANUP_FAILED,
        metadata=metadata,
    )

Delegation

Agent-to-agent delegation contracts.

DelegationReturnPolicy

Bases: StrEnum

How a child agent result should be projected back to the parent.

AgentDelegateTarget(agent_type, agent_name=None, metadata=dict()) dataclass

First-class delegate target represented by another @Agent component.

__post_init__()

Reject delegate targets that cannot identify an agent component.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def __post_init__(self) -> None:
    """Reject delegate targets that cannot identify an agent component."""
    if not self.agent_type.strip():
        raise AgentDefinitionError("Delegate target agent type cannot be blank")
    if self.agent_name is not None and not self.agent_name.strip():
        raise AgentDefinitionError("Delegate target agent name cannot be blank")

DelegationBudget(max_steps=None, max_tokens=None, timeout_seconds=None, deadline_at=None, metadata=dict()) dataclass

Budget metadata attached to a delegation packet.

__post_init__()

Reject delegation budgets that cannot be enforced consistently.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def __post_init__(self) -> None:
    """Reject delegation budgets that cannot be enforced consistently."""
    if self.max_steps is not None and self.max_steps <= 0:
        raise AgentDefinitionError("Delegation max steps must be positive")
    if self.max_tokens is not None and self.max_tokens <= 0:
        raise AgentDefinitionError("Delegation max tokens must be positive")
    if self.timeout_seconds is not None and self.timeout_seconds <= 0:
        raise AgentDefinitionError("Delegation timeout must be positive")

DelegationContextSlice(summary=None, evidence_refs=(), manifest_ref=None, metadata=dict()) dataclass

Minimal parent context projected for a child agent.

DelegationExpectedOutput(description=None, schema=dict(), metadata=dict()) dataclass

Expected child output description and optional JSON schema.

DelegationPacket(id, parent_agent_state_id, target, task, context=DelegationContextSlice(), constraints=(), expected_output=DelegationExpectedOutput(), budget=DelegationBudget(), allowed_capabilities=(), return_policy=DelegationReturnPolicy.SUMMARY_AND_EVIDENCE, metadata=dict()) dataclass

Task packet passed from a parent agent to a delegate agent.

__post_init__()

Reject delegation packets without parent linkage or task identity.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def __post_init__(self) -> None:
    """Reject delegation packets without parent linkage or task identity."""
    if not self.id.strip():
        raise AgentDefinitionError("Delegation id cannot be blank")
    if not self.parent_agent_state_id.strip():
        raise AgentDefinitionError("Delegation parent state id cannot be blank")

DelegationResult(id, packet_id, target, summary, output=None, evidence_refs=(), metadata=dict(), created_at=None) dataclass

Child agent result projected back to the parent execution.

__post_init__()

Reject delegated results that cannot be linked to a packet.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def __post_init__(self) -> None:
    """Reject delegated results that cannot be linked to a packet."""
    if not self.id.strip():
        raise AgentDefinitionError("Delegation result id cannot be blank")
    if not self.packet_id.strip():
        raise AgentDefinitionError("Delegation result packet id cannot be blank")
    if not self.summary.strip():
        raise AgentDefinitionError("Delegation result summary cannot be blank")

to_parent_evidence(*, evidence_id, parent_agent_state_id)

Represent a delegated result as append-only parent evidence.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def to_parent_evidence(
    self,
    *,
    evidence_id: str,
    parent_agent_state_id: str,
) -> AgentEvidence:
    """Represent a delegated result as append-only parent evidence."""
    payload: dict[str, JsonValue] = {
        "delegation_id": self.id,
        "packet_id": self.packet_id,
        "target_agent_type": self.target.agent_type,
        "evidence_refs": self.evidence_refs,
        "metadata": self.metadata,
    }
    if self.target.agent_name is not None:
        payload["target_agent_name"] = self.target.agent_name
    if self.output is not None:
        payload["output"] = self.output
    return AgentEvidence(
        id=evidence_id,
        agent_state_id=parent_agent_state_id,
        kind=AgentEvidenceKind.DELEGATION,
        payload=payload,
        summary=self.summary,
        created_at=self.created_at,
    )

to_parent_yield(*, evidence_id, parent_agent_state_id)

Expose the delegated result on the parent's AgentYield stream.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
def to_parent_yield(
    self,
    *,
    evidence_id: str,
    parent_agent_state_id: str,
) -> AgentYield[Evidence]:
    """Expose the delegated result on the parent's AgentYield stream."""
    evidence = self.to_parent_evidence(
        evidence_id=evidence_id,
        parent_agent_state_id=parent_agent_state_id,
    )
    return AgentYield(
        kind=AgentYieldKind.EVIDENCE,
        payload=Evidence(
            evidence=evidence,
            metadata={"delegation_id": self.id, "packet_id": self.packet_id},
        ),
    )

IAgentDelegate

Bases: ABC

Execution hook that runs a delegation packet against a delegate target.

delegate(packet) abstractmethod

Execute delegation without prescribing spawn topology or transport.

Source code in core/spakky-agent/src/spakky/agent/delegation.py
@abstractmethod
def delegate(
    self,
    packet: DelegationPacket,
) -> AsyncGenerator[AgentYield[DelegationResult], None]:
    """Execute delegation without prescribing spawn topology or transport."""

Safety

Deterministic sensitive-data contracts for agent boundaries.

DataSensitivity

Bases: StrEnum

Canonical sensitivity classes carried by descriptors.

PII

Bases: StrEnum

PII categories that can be declared with typing.Annotated.

MaskingPolicy

Bases: StrEnum

Deterministic text masking strategies for sensitive values.

RedactionPolicy

Bases: StrEnum

Boundary action used when a value must not be exposed.

StreamingGuardFailureMode

Bases: StrEnum

Final audit behavior when a streaming guard missed a raw candidate.

StreamingRedactionAuditStatus

Bases: StrEnum

Final aggregate streaming redaction audit status.

ContextExposurePolicy(include_pii_values=False, include_sensitive_values=False, include_sensitive_schema_metadata=False, include_sensitive_context_metadata=False) dataclass

Policy for LLM-facing context and schema metadata exposure.

can_expose_value(field)

Return whether a sensitive value may cross the model boundary.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def can_expose_value(self, field: "SensitiveField | SecretField") -> bool:
    """Return whether a sensitive value may cross the model boundary."""
    if isinstance(field, SecretField):
        return False
    if field.sensitivity == DataSensitivity.PII:
        return self.include_pii_values
    if field.sensitivity in (DataSensitivity.CONFIDENTIAL, DataSensitivity.SECRET):
        return self.include_sensitive_values
    return True

EvidenceExposurePolicy(include_pii_values=False, include_sensitive_values=False, include_sensitive_metadata=True) dataclass

Policy for evidence payload exposure before append-only capture.

can_expose_value(field)

Return whether a sensitive value may be stored in evidence payloads.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def can_expose_value(self, field: "SensitiveField | SecretField") -> bool:
    """Return whether a sensitive value may be stored in evidence payloads."""
    if isinstance(field, SecretField):
        return False
    if field.sensitivity == DataSensitivity.PII:
        return self.include_pii_values
    if field.sensitivity in (DataSensitivity.CONFIDENTIAL, DataSensitivity.SECRET):
        return self.include_sensitive_values
    return True

CredentialRef(id, provider=None) dataclass

Reference to a credential outside LLM-facing context.

__post_init__()

Reject blank credential references.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject blank credential references."""
    _require_non_blank(self.id, "Credential reference id")
    if self.provider is not None:
        _require_non_blank(self.provider, "Credential provider")

SecretRef(id, credential=None) dataclass

Opaque reference to a secret value stored outside model context.

__post_init__()

Reject blank secret references.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject blank secret references."""
    _require_non_blank(self.id, "Secret reference id")

SensitiveField(category, masking=MaskingPolicy.REDACT, redaction=RedactionPolicy.REDACT, label=None, metadata=dict()) dataclass

typing.Annotated metadata for deterministic sensitive-field handling.

sensitivity property

Return the normalized sensitivity class.

category_name property

Return the stable public category name.

__post_init__()

Reject blank sensitive field labels.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject blank sensitive field labels."""
    if self.label is not None:
        _require_non_blank(self.label, "Sensitive field label")

guard_text(value)

Return deterministic model/evidence-safe text for this field.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def guard_text(self, value: str) -> str:
    """Return deterministic model/evidence-safe text for this field."""
    return _mask_text(value, self.masking, self.category_name)

to_metadata()

Serialize marker metadata without including the sensitive value.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_metadata(self) -> JsonObject:
    """Serialize marker metadata without including the sensitive value."""
    payload: dict[str, JsonValue] = {
        "kind": "sensitive",
        "category": self.category_name,
        "sensitivity": self.sensitivity.value,
        "masking": self.masking.value,
        "redaction": self.redaction.value,
    }
    if self.label is not None:
        payload["label"] = self.label
    if self.metadata:
        payload["metadata"] = self.metadata
    return payload

SecretField(redaction=RedactionPolicy.REFERENCE_ONLY, label=None, metadata=dict()) dataclass

typing.Annotated metadata for values that must never be model text.

sensitivity property

Return the normalized sensitivity class.

__post_init__()

Reject blank secret field labels.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject blank secret field labels."""
    if self.label is not None:
        _require_non_blank(self.label, "Secret field label")

guard_text(value)

Return deterministic replacement text for a secret value.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def guard_text(self, value: str) -> str:
    """Return deterministic replacement text for a secret value."""
    return SECRET_VALUE

to_metadata()

Serialize marker metadata without including the secret value.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_metadata(self) -> JsonObject:
    """Serialize marker metadata without including the secret value."""
    payload: dict[str, JsonValue] = {
        "kind": "secret",
        "sensitivity": DataSensitivity.SECRET.value,
        "redaction": self.redaction.value,
    }
    if self.label is not None:
        payload["label"] = self.label
    if self.metadata:
        payload["metadata"] = self.metadata
    return payload

SensitiveFieldDescriptor(path, field) dataclass

Path-bound sensitive metadata extracted from Annotated types.

to_metadata()

Serialize descriptor metadata without leaking the field value.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_metadata(self) -> JsonObject:
    """Serialize descriptor metadata without leaking the field value."""
    return {
        "path": list(self.path),
        "field": self.field.to_metadata(),
    }

StreamingSensitivePattern(name, pattern, replacement=REDACTED_VALUE, metadata=dict()) dataclass

Caller-supplied deterministic pattern used by streaming redaction.

__post_init__()

Reject blank names and invalid pattern syntax before streaming starts.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject blank names and invalid pattern syntax before streaming starts."""
    _require_non_blank(self.name, "Streaming sensitive pattern name")
    _require_non_blank(self.pattern, "Streaming sensitive pattern")
    try:
        re.compile(self.pattern)
    except re.error as e:
        raise AgentDefinitionError("Streaming sensitive pattern is invalid") from e

redact(value)

Return redacted text and the number of replacements applied.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def redact(self, value: str) -> tuple[str, int]:
    """Return redacted text and the number of replacements applied."""
    compiled = re.compile(self.pattern)
    return compiled.subn(self.replacement, value)

find_matches(value)

Return sanitized match locations without exposing raw text.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def find_matches(self, value: str) -> tuple["StreamingRedactionMatch", ...]:
    """Return sanitized match locations without exposing raw text."""
    compiled = re.compile(self.pattern)
    return tuple(
        StreamingRedactionMatch(
            pattern_name=self.name,
            start=match.start(),
            end=match.end(),
            metadata=self.metadata,
        )
        for match in compiled.finditer(value)
    )

StreamingRedactionPolicy(patterns, buffer_size=64, emit_chunk_size=None, failure_mode=StreamingGuardFailureMode.RAISE) dataclass

Bounded buffering policy balancing stream latency and redaction correctness.

__post_init__()

Reject policies that would make the guard unbounded or silent.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __post_init__(self) -> None:
    """Reject policies that would make the guard unbounded or silent."""
    if self.buffer_size <= 0:
        raise AgentDefinitionError(
            "Streaming redaction buffer size must be positive"
        )
    if self.emit_chunk_size is not None and self.emit_chunk_size <= 0:
        raise AgentDefinitionError(
            "Streaming redaction emit chunk size must be positive"
        )
    if len(self.patterns) == 0:
        raise AgentDefinitionError("Streaming redaction patterns cannot be empty")

StreamingRedactionMatch(pattern_name, start, end, metadata=dict()) dataclass

Sanitized final-audit match location for a missed redaction candidate.

to_payload()

Serialize a match without the sensitive value itself.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_payload(self) -> JsonObject:
    """Serialize a match without the sensitive value itself."""
    payload: dict[str, JsonValue] = {
        "pattern_name": self.pattern_name,
        "start": self.start,
        "end": self.end,
    }
    if self.metadata:
        payload["metadata"] = self.metadata
    return payload

StreamingRedactionAudit(status, detected_count, redacted_count, missed_matches, buffer_size, emitted_char_count, original_char_count) dataclass

Final aggregate audit for a bounded streaming redaction session.

missed_count property

Return the number of raw candidates still present after streaming.

to_evidence_payload()

Serialize audit evidence without raw streamed content.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_evidence_payload(self) -> JsonObject:
    """Serialize audit evidence without raw streamed content."""
    return {
        "kind": "streaming_redaction_audit",
        "status": self.status.value,
        "detected_count": self.detected_count,
        "redacted_count": self.redacted_count,
        "missed_count": self.missed_count,
        "missed_matches": tuple(
            match.to_payload() for match in self.missed_matches
        ),
        "buffer_size": self.buffer_size,
        "emitted_char_count": self.emitted_char_count,
        "original_char_count": self.original_char_count,
    }

to_error_payload()

Serialize a typed error payload for stream consumers.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def to_error_payload(self) -> JsonObject:
    """Serialize a typed error payload for stream consumers."""
    return {
        "code": "streaming_redaction_audit_failed",
        "message": "Streaming redaction final audit detected unmasked candidates",
        "retryable": False,
        "metadata": self.to_evidence_payload(),
    }

StreamingRedactionResult(chunks, audit=None, error=None) dataclass

Output produced by a bounded streaming redaction step.

StreamingRedactionSession(policy)

Stateful bounded redactor for model token streams.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def __init__(self, policy: StreamingRedactionPolicy) -> None:
    self._policy = policy
    self._patterns = tuple(policy.patterns)
    self._buffer = ""
    self._original_chunks: list[str] = []
    self._emitted_chunks: list[str] = []
    self._redacted_count = 0
    self._closed = False

push(chunk)

Redact one token chunk and emit only the safe bounded prefix.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def push(self, chunk: str) -> StreamingRedactionResult:
    """Redact one token chunk and emit only the safe bounded prefix."""
    self._ensure_open()
    if chunk == "":
        return StreamingRedactionResult(chunks=())
    self._original_chunks.append(chunk)
    self._buffer = self._redact_text(f"{self._buffer}{chunk}")
    chunks = self._emit_safe_prefix()
    return StreamingRedactionResult(chunks=chunks)

finish()

Flush the remaining buffer and run the mandatory final audit.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def finish(self) -> StreamingRedactionResult:
    """Flush the remaining buffer and run the mandatory final audit."""
    self._ensure_open()
    self._closed = True
    redacted_buffer = self._redact_text(self._buffer)
    chunks = self._split_for_emit(redacted_buffer)
    self._buffer = ""
    self._emitted_chunks.extend(chunks)
    audit = self._audit()
    if audit.status == StreamingRedactionAuditStatus.FAILED:
        if self._policy.failure_mode == StreamingGuardFailureMode.RAISE:
            raise AgentOutputGuardError(
                "Streaming redaction final audit detected unmasked candidates"
            )
        return StreamingRedactionResult(
            chunks=chunks,
            audit=audit,
            error=audit.to_error_payload(),
        )
    return StreamingRedactionResult(chunks=chunks, audit=audit)

guard_json_value(value, sensitive_fields, policy)

Redact JSON-compatible values according to path-bound descriptors.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def guard_json_value(
    value: JsonValue,
    sensitive_fields: Sequence[SensitiveFieldDescriptor],
    policy: ContextExposurePolicy | EvidenceExposurePolicy,
) -> JsonValue:
    """Redact JSON-compatible values according to path-bound descriptors."""
    result = value
    for descriptor in sensitive_fields:
        result = _guard_json_path(result, descriptor.path, descriptor.field, policy)
    return result

schema_with_sensitive_metadata(schema, sensitive_fields, policy)

Return a JSON schema copy with policy-approved sensitivity extensions.

Source code in core/spakky-agent/src/spakky/agent/safety.py
def schema_with_sensitive_metadata(
    schema: Mapping[str, JsonValue],
    sensitive_fields: Sequence[SensitiveFieldDescriptor],
    policy: ContextExposurePolicy,
) -> JsonObject:
    """Return a JSON schema copy with policy-approved sensitivity extensions."""
    if not policy.include_sensitive_schema_metadata or not sensitive_fields:
        return schema
    result = _copy_json_object(schema)
    for descriptor in sensitive_fields:
        _attach_schema_extension(result, descriptor.path, descriptor.to_metadata())
    return result

Tooling

Agent tool descriptor discovery contracts.

Idempotency

Bases: StrEnum

Action idempotency declared by a tool.

ToolResumeAction

Bases: StrEnum

Resume action allowed by stored tool idempotency metadata.

DataAccess

Bases: StrEnum

Data access level declared by a tool.

Externality

Bases: StrEnum

External side-effect boundary declared by a tool.

EvidenceCapture

Bases: StrEnum

Evidence capture strategy for tool results.

ToolApprovalRequirement

Bases: StrEnum

Human approval requirement at the tool boundary.

ToolRiskAxis

Bases: StrEnum

Derived risk axes exposed for policy and UI decisions.

ToolPermission(name) dataclass

Typed permission marker attached to a tool descriptor.

__post_init__()

Reject permission names that cannot be matched deterministically.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject permission names that cannot be matched deterministically."""
    if not self.name.strip():
        raise AgentDefinitionError("Agent tool permission name cannot be blank")

ToolEffects(data_access=DataAccess.NONE, externality=Externality.LOCAL, destructive=False, network=False) dataclass

Typed effect metadata used to derive display risk outside core.

read_only() classmethod

Declare a local read-only tool.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def read_only(cls) -> "ToolEffects":
    """Declare a local read-only tool."""
    return cls(data_access=DataAccess.READ, externality=Externality.LOCAL)

write_state() classmethod

Declare a local state-writing tool.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def write_state(cls) -> "ToolEffects":
    """Declare a local state-writing tool."""
    return cls(data_access=DataAccess.WRITE, externality=Externality.LOCAL)

external_side_effect() classmethod

Declare a tool that crosses an external side-effect boundary.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def external_side_effect(cls) -> "ToolEffects":
    """Declare a tool that crosses an external side-effect boundary."""
    return cls(
        data_access=DataAccess.READ_WRITE,
        externality=Externality.EXTERNAL,
        network=True,
    )

destructive_action() classmethod

Declare a tool that may irreversibly mutate local or external state.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def destructive_action(cls) -> "ToolEffects":
    """Declare a tool that may irreversibly mutate local or external state."""
    return cls(
        data_access=DataAccess.WRITE,
        externality=Externality.EXTERNAL,
        destructive=True,
    )

TimeoutPolicy(seconds=None) dataclass

Optional timeout boundary for a tool invocation.

__post_init__()

Reject non-positive timeout policies at definition time.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject non-positive timeout policies at definition time."""
    if self.seconds is not None and self.seconds <= 0:
        raise AgentDefinitionError("Agent tool timeout must be positive")

ResultBudget(max_bytes=None) dataclass

Optional result-size budget for model-facing tool output.

__post_init__()

Reject result budgets that cannot constrain output.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject result budgets that cannot constrain output."""
    if self.max_bytes is not None and self.max_bytes <= 0:
        raise AgentDefinitionError("Agent tool result budget must be positive")

ToolResumeMetadata(idempotency=Idempotency.UNKNOWN) dataclass

Stored idempotency metadata used when resuming an incomplete action.

from_metadata(metadata) classmethod

Build resume metadata from a tool descriptor metadata object.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def from_metadata(cls, metadata: "AgentToolMetadata") -> "ToolResumeMetadata":
    """Build resume metadata from a tool descriptor metadata object."""
    return cls(idempotency=metadata.idempotency)

action_for_completed_boundary()

Return the resume action for an already completed action boundary.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def action_for_completed_boundary(self) -> ToolResumeAction:
    """Return the resume action for an already completed action boundary."""
    return ToolResumeAction.SKIP_COMPLETED

action_for_incomplete_boundary()

Return the resume action for an incomplete action boundary.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def action_for_incomplete_boundary(self) -> ToolResumeAction:
    """Return the resume action for an incomplete action boundary."""
    if self.idempotency in (
        Idempotency.IDEMPOTENT,
        Idempotency.CONDITIONALLY_IDEMPOTENT,
    ):
        return ToolResumeAction.RETRY
    return ToolResumeAction.REQUIRE_APPROVAL

AgentToolSchemaHandle(name, input_schema_name, output_schema_name, input_schema=dict(), output_schema=dict(), input_sensitive_fields=(), output_sensitive_fields=()) dataclass

Stable schema names and generated JSON schemas owned by a descriptor.

__post_init__()

Reject blank schema handles.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject blank schema handles."""
    _require_non_blank(self.name, "Agent tool schema name")
    _require_non_blank(self.input_schema_name, "Agent tool input schema name")
    _require_non_blank(self.output_schema_name, "Agent tool output schema name")

input_schema_for(policy=None)

Return model-facing input schema under the requested exposure policy.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def input_schema_for(
    self,
    policy: ContextExposurePolicy | None = None,
) -> JsonObject:
    """Return model-facing input schema under the requested exposure policy."""
    return schema_with_sensitive_metadata(
        self.input_schema,
        self.input_sensitive_fields,
        policy or ContextExposurePolicy(),
    )

output_schema_for(policy=None)

Return model-facing output schema under the requested exposure policy.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def output_schema_for(
    self,
    policy: ContextExposurePolicy | None = None,
) -> JsonObject:
    """Return model-facing output schema under the requested exposure policy."""
    return schema_with_sensitive_metadata(
        self.output_schema,
        self.output_sensitive_fields,
        policy or ContextExposurePolicy(),
    )

AgentToolIdentity(owner_module, owner_qualname, name) dataclass

Descriptor identity independent from free-form model output text.

key property

Return a stable key for maps, logs, and evidence metadata.

__post_init__()

Reject identity parts that cannot be serialized or matched.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject identity parts that cannot be serialized or matched."""
    _require_non_blank(self.owner_module, "Agent tool owner module")
    _require_non_blank(self.owner_qualname, "Agent tool owner qualname")
    _require_non_blank(self.name, "Agent tool name")

AgentToolMetadata(permissions=(), effects=ToolEffects(), idempotency=Idempotency.UNKNOWN, data_access=DataAccess.NONE, externality=Externality.LOCAL, timeout=TimeoutPolicy(), result_budget=ResultBudget(), evidence=EvidenceCapture.NONE, approval=ToolApprovalRequirement.DERIVED) dataclass

Typed approval, idempotency, and evidence metadata for a descriptor.

risk property

Return derived risk axes without storing risk as source metadata.

resume property

Return resume metadata derived from stored idempotency.

requires_approval_candidate property

Return whether this tool is a HITL approval candidate.

ToolRisk(axes=()) dataclass

Derived typed risk axes for policy and evidence annotations.

requires_approval_candidate property

Return whether the risk is strong enough to suggest HITL approval.

__post_init__()

Reject duplicate axes so risk comparisons stay deterministic.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject duplicate axes so risk comparisons stay deterministic."""
    seen: set[ToolRiskAxis] = set()
    for axis in self.axes:
        if axis in seen:
            raise AgentDefinitionError("Agent tool risk axes must be unique")
        seen.add(axis)

from_metadata(metadata) classmethod

Derive risk axes from source tool metadata.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
@classmethod
def from_metadata(cls, metadata: AgentToolMetadata) -> "ToolRisk":
    """Derive risk axes from source tool metadata."""
    axes: list[ToolRiskAxis] = []
    if metadata.data_access in (DataAccess.READ, DataAccess.READ_WRITE):
        axes.append(ToolRiskAxis.READ)
    if metadata.data_access in (DataAccess.WRITE, DataAccess.READ_WRITE):
        axes.append(ToolRiskAxis.WRITE)
    if metadata.data_access in (DataAccess.WRITE, DataAccess.READ_WRITE):
        axes.append(ToolRiskAxis.SIDE_EFFECT)
    if metadata.externality in (Externality.EXTERNAL, Externality.UNKNOWN):
        axes.append(ToolRiskAxis.SIDE_EFFECT)
    if metadata.effects.destructive:
        axes.append(ToolRiskAxis.DESTRUCTIVE)
    if metadata.effects.network or metadata.externality == Externality.EXTERNAL:
        axes.append(ToolRiskAxis.NETWORK)
    return cls(axes=_deduplicate_axes(axes))

includes(axis)

Return whether this risk contains the requested axis.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def includes(self, axis: ToolRiskAxis) -> bool:
    """Return whether this risk contains the requested axis."""
    return axis in self.axes

AgentToolDefinition(name, schema_name, description=None, metadata=AgentToolMetadata()) dataclass

Method-level metadata attached by @agent_tool before owner discovery.

__post_init__()

Reject definitions that would make catalog lookup ambiguous.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject definitions that would make catalog lookup ambiguous."""
    _require_non_blank(self.name, "Agent tool name")
    _require_non_blank(self.schema_name, "Agent tool schema name")
    if self.description is not None and not self.description.strip():
        raise AgentDefinitionError("Agent tool description cannot be blank")

AgentToolDescriptor(identity, owner, callable, schema, description=None, metadata=AgentToolMetadata()) dataclass

Discovered tool descriptor bound to an owner class and callable.

name property

Return the descriptor-local tool name.

bind_invocation(payload)

Bind a decoded model payload before the tool callable is invoked.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def bind_invocation(self, payload: JsonObject) -> "AgentToolBoundInvocation":
    """Bind a decoded model payload before the tool callable is invoked."""
    return bind_agent_tool_invocation(self.callable, payload)

AgentToolBoundInvocation(args=(), kwargs=dict()) dataclass

Python-call-ready tool invocation arguments.

AgentToolCatalog(descriptors=()) dataclass

Deterministic catalog of descriptors discovered from an Agent class.

__post_init__()

Reject duplicate identity or schema names before model lookup.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def __post_init__(self) -> None:
    """Reject duplicate identity or schema names before model lookup."""
    identity_keys: set[str] = set()
    schema_names: set[str] = set()
    for descriptor in self.descriptors:
        if descriptor.identity.key in identity_keys:
            raise AgentDefinitionError("Agent tool identity must be unique")
        if descriptor.schema.name in schema_names:
            raise AgentDefinitionError("Agent tool schema name must be unique")
        identity_keys.add(descriptor.identity.key)
        schema_names.add(descriptor.schema.name)

by_identity(identity)

Lookup a descriptor by typed identity.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def by_identity(self, identity: AgentToolIdentity) -> AgentToolDescriptor:
    """Lookup a descriptor by typed identity."""
    for descriptor in self.descriptors:
        if descriptor.identity == identity:
            return descriptor
    raise AgentDefinitionError("Agent tool identity is not registered")

by_schema_name(schema_name)

Lookup a descriptor by model-facing schema name.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def by_schema_name(self, schema_name: str) -> AgentToolDescriptor:
    """Lookup a descriptor by model-facing schema name."""
    _require_non_blank(schema_name, "Agent tool schema name")
    for descriptor in self.descriptors:
        if descriptor.schema.name == schema_name:
            return descriptor
    raise AgentDefinitionError("Agent tool schema name is not registered")

agent_tool(*, name=None, schema_name=None, description=None, permissions=(), effects=None, idempotency=Idempotency.UNKNOWN, data_access=None, externality=None, timeout=None, result_budget=None, evidence=EvidenceCapture.NONE, approval=ToolApprovalRequirement.DERIVED)

Attach typed agent-tool metadata to a method object.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def agent_tool(
    *,
    name: str | None = None,
    schema_name: str | None = None,
    description: str | None = None,
    permissions: Sequence[ToolPermission] = (),
    effects: ToolEffects | None = None,
    idempotency: Idempotency = Idempotency.UNKNOWN,
    data_access: DataAccess | None = None,
    externality: Externality | None = None,
    timeout: TimeoutPolicy | None = None,
    result_budget: ResultBudget | None = None,
    evidence: EvidenceCapture = EvidenceCapture.NONE,
    approval: ToolApprovalRequirement = ToolApprovalRequirement.DERIVED,
) -> Callable[[FunctionType], FunctionType]:
    """Attach typed agent-tool metadata to a method object."""

    def decorate(function: FunctionType) -> FunctionType:
        tool_name = _normalize_name(name, function.__name__, "Agent tool name")
        normalized_schema_name = _normalize_name(
            schema_name,
            tool_name,
            "Agent tool schema name",
        )
        tool_effects = effects or ToolEffects()
        metadata = AgentToolMetadata(
            permissions=tuple(permissions),
            effects=tool_effects,
            idempotency=idempotency,
            data_access=data_access or tool_effects.data_access,
            externality=externality or tool_effects.externality,
            timeout=timeout or TimeoutPolicy(),
            result_budget=result_budget or ResultBudget(),
            evidence=evidence,
            approval=approval,
        )
        definition = AgentToolDefinition(
            name=tool_name,
            schema_name=normalized_schema_name,
            description=description,
            metadata=metadata,
        )
        function.__dict__[AGENT_TOOL_DEFINITION_KEY] = definition
        return function

    return decorate

discover_agent_tools(owner)

Discover @agent_tool methods in deterministic class-definition order.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def discover_agent_tools(owner: type[object]) -> AgentToolCatalog:
    """Discover @agent_tool methods in deterministic class-definition order."""
    descriptors: list[AgentToolDescriptor] = []
    resolved_member_names: set[str] = set()
    for declaring_owner in owner.__mro__:
        if declaring_owner is object:
            continue
        for member_name, member in vars(declaring_owner).items():
            if member_name in resolved_member_names:
                continue
            resolved_member_names.add(member_name)
            function = _unwrap_function(member)
            if function is None:
                continue
            definition = get_agent_tool_definition(function)
            if definition is None:
                continue
            descriptors.append(
                _build_descriptor(declaring_owner, function, definition),
            )
    ordered = tuple(
        sorted(
            descriptors,
            key=lambda descriptor: descriptor.identity.key,
        ),
    )
    return AgentToolCatalog(descriptors=ordered)

get_agent_tool_definition(function)

Return decorator metadata attached to a function object.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def get_agent_tool_definition(
    function: FunctionType,
) -> AgentToolDefinition | None:
    """Return decorator metadata attached to a function object."""
    candidate = vars(function).get(AGENT_TOOL_DEFINITION_KEY)
    if candidate is None:
        return None
    if not isinstance(candidate, AgentToolDefinition):
        raise AgentDefinitionError("Agent tool metadata is invalid")
    return candidate

bind_agent_tool_invocation(function, payload)

Bind a structured model payload to a tool function signature.

Source code in core/spakky-agent/src/spakky/agent/tooling.py
def bind_agent_tool_invocation(
    function: AgentToolCallable,
    payload: JsonObject,
) -> AgentToolBoundInvocation:
    """Bind a structured model payload to a tool function signature."""
    invocation = _parse_invocation_payload(payload)
    try:
        function_signature = signature(function)
    except (TypeError, ValueError) as e:
        raise AgentToolBindingError(
            "Agent tool callable signature cannot be inspected"
        ) from e
    call_signature = _drop_owner_parameter(function_signature)
    try:
        bound = call_signature.bind(*invocation.args, **invocation.kwargs)
    except TypeError as e:
        raise AgentToolBindingError(
            "Agent tool invocation payload cannot be bound"
        ) from e
    bound.apply_defaults()
    return AgentToolBoundInvocation(
        args=tuple(bound.args),
        kwargs=dict(bound.kwargs),
    )

Yield

Agent streaming yield contracts.

TextDelta = Token module-attribute

Backward-compatible token-yield payload alias.

Message = Progress module-attribute

Backward-compatible progress-yield payload alias.

AgentYieldKind

Bases: StrEnum

Canonical public vocabulary yielded by agent execution.

AgentYield(kind, payload) dataclass

Typed stream item returned from an agent execute generator.

Token(text, metadata=dict()) dataclass

Incremental model token intended for streaming clients.

Progress(message, current_step=None, metadata=dict()) dataclass

Agent progress update intended for direct inbound adapter consumption.

Tool(name, call_id=None, arguments=dict(), result=None, metadata=dict()) dataclass

Tool call or tool result surfaced by agent execution.

Evidence(evidence, metadata=dict()) dataclass

Evidence item surfaced to the inbound adapter.

Approval(id, prompt, allowed_decisions, metadata) dataclass

Approval request surfaced to the inbound adapter.

Error(code, message, retryable=False, metadata=dict()) dataclass

Recoverable or terminal execution error surfaced to the caller.

Cancel(reason=None, requested_by=None, metadata=dict()) dataclass

Cancellation acknowledgement surfaced to the caller.

Final(output, metadata) dataclass

Final output carried by a generator stream.

Model Interface

Provider-neutral agent model port.

ModelMessageRole

Bases: StrEnum

Roles accepted by provider-neutral model messages.

ModelMessage(role, content, metadata=dict()) dataclass

Provider-neutral model message.

JsonSchemaConstraint(schema, strict=True) dataclass

JSON schema constraint shared by structured output and tool calling.

StructuredOutputSpec(constraint, output_type_name=None) dataclass

Structured output contract requested from a model adapter.

ModelToolChoice

Bases: StrEnum

Provider-neutral tool calling strategy requested from a model adapter.

ModelToolSpec(name, parameters, description=None, metadata=dict()) dataclass

LLM-facing tool descriptor normalized by agent tooling.

ToolCallingSpec(tools, choice=ModelToolChoice.AUTO) dataclass

Tool calling contract requested from a model adapter.

SamplingOptions(temperature=None, top_p=None, max_tokens=None) dataclass

Portable model sampling options.

StreamingOptions(include_usage=True, include_progress=True) dataclass

Portable model streaming options.

ModelRequest(messages, context=tuple(), context_manifest=None, context_digest=None, structured_output=None, tool_calling=None, sampling=SamplingOptions(), streaming=StreamingOptions(), metadata=dict()) dataclass

Provider-neutral request passed to an agent model adapter.

assemble_messages(policy=None)

Assemble prompt messages from typed context packs without concatenation.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
def assemble_messages(
    self,
    policy: ContextExposurePolicy | None = None,
) -> tuple[ModelMessage, ...]:
    """Assemble prompt messages from typed context packs without concatenation."""
    exposure_policy = policy or ContextExposurePolicy()
    context_messages = tuple(
        ModelMessage(
            role=ModelMessageRole.EVIDENCE,
            content=pack.guarded_content(exposure_policy),
            metadata=pack.message_metadata(exposure_policy),
        )
        for pack in self.context
    )
    return (*self.messages, *context_messages)

ModelUsage(input_tokens=None, output_tokens=None, total_tokens=None) dataclass

Token accounting reported by a model adapter.

ModelToolCall(name, arguments, call_id=None, metadata=dict()) dataclass

Tool invocation candidate emitted by a model adapter.

guarded(sensitive_fields, policy=None)

Return a copy with sensitive argument values deterministically guarded.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
def guarded(
    self,
    sensitive_fields: Sequence[SensitiveFieldDescriptor],
    policy: EvidenceExposurePolicy | None = None,
) -> "ModelToolCall":
    """Return a copy with sensitive argument values deterministically guarded."""
    exposure_policy = policy or EvidenceExposurePolicy()
    guarded_arguments = guard_json_value(
        self.arguments,
        sensitive_fields,
        exposure_policy,
    )
    if not isinstance(guarded_arguments, Mapping):
        guarded_arguments = {}
    return replace(self, arguments=guarded_arguments)

ModelError(code, message, retryable=False, metadata=dict()) dataclass

Provider-neutral model failure payload.

ModelResponse(content, structured_output=None, tool_calls=tuple(), usage=ModelUsage(), metadata=dict()) dataclass

Provider-neutral non-streaming model response.

guarded(sensitive_fields, policy=None)

Return a copy with sensitive output payloads deterministically guarded.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
def guarded(
    self,
    sensitive_fields: Sequence[SensitiveFieldDescriptor],
    policy: EvidenceExposurePolicy | None = None,
) -> "ModelResponse":
    """Return a copy with sensitive output payloads deterministically guarded."""
    exposure_policy = policy or EvidenceExposurePolicy()
    content = self.content
    structured_output = self.structured_output
    content_descriptors = tuple(
        descriptor
        for descriptor in sensitive_fields
        if descriptor.path in ((), ("content",))
    )
    if content_descriptors:
        guarded_content = guard_json_value(
            {"content": content},
            tuple(
                SensitiveFieldDescriptor(("content",), descriptor.field)
                for descriptor in content_descriptors
            ),
            exposure_policy,
        )
        content_value = cast(Mapping[str, JsonValue], guarded_content).get(
            "content"
        )
        if isinstance(content_value, str):
            content = content_value
        else:
            content = "[REDACTED]"
    structured_output_descriptors = tuple(
        descriptor
        for descriptor in sensitive_fields
        if descriptor.path not in ((), ("content",))
    )
    if structured_output_descriptors:
        structured_output = guard_json_value(
            structured_output,
            structured_output_descriptors,
            exposure_policy,
        )
    return replace(self, content=content, structured_output=structured_output)

ModelStreamEventKind

Bases: StrEnum

Provider-neutral streaming event kinds emitted by a model adapter.

ModelStreamEvent(kind, token_delta=None, tool_call=None, structured_output=None, error=None, usage=None, metadata=dict()) dataclass

Provider-neutral model streaming event.

guarded(sensitive_fields, policy=None)

Return a copy with sensitive streaming payloads guarded.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
def guarded(
    self,
    sensitive_fields: Sequence[SensitiveFieldDescriptor],
    policy: EvidenceExposurePolicy | None = None,
) -> "ModelStreamEvent":
    """Return a copy with sensitive streaming payloads guarded."""
    exposure_policy = policy or EvidenceExposurePolicy()
    token_delta = self.token_delta
    structured_output = self.structured_output
    tool_call = self.tool_call
    if token_delta is not None:
        token_descriptors = tuple(
            descriptor
            for descriptor in sensitive_fields
            if descriptor.path in ((), ("token_delta",))
        )
        if token_descriptors:
            guarded_token = guard_json_value(
                {"token_delta": token_delta},
                tuple(
                    SensitiveFieldDescriptor(("token_delta",), descriptor.field)
                    for descriptor in token_descriptors
                ),
                exposure_policy,
            )
            token_value = cast(Mapping[str, JsonValue], guarded_token).get(
                "token_delta"
            )
            if isinstance(token_value, str):
                token_delta = token_value
            else:
                token_delta = "[REDACTED]"
    structured_descriptors = tuple(
        descriptor
        for descriptor in sensitive_fields
        if descriptor.path not in ((), ("token_delta",))
    )
    if structured_descriptors:
        structured_output = guard_json_value(
            structured_output,
            structured_descriptors,
            exposure_policy,
        )
    if tool_call is not None:
        tool_call = tool_call.guarded(sensitive_fields, exposure_policy)
    return replace(
        self,
        token_delta=token_delta,
        structured_output=structured_output,
        tool_call=tool_call,
    )

IAgentModel

Bases: ABC

Outbound model adapter port owned by spakky-agent core.

complete(request) abstractmethod async

Return a complete model response for the request.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
@abstractmethod
async def complete(self, request: ModelRequest) -> ModelResponse:
    """Return a complete model response for the request."""
    ...

stream(request) abstractmethod

Return provider-neutral stream events for the request.

Source code in core/spakky-agent/src/spakky/agent/interfaces/model.py
@abstractmethod
def stream(self, request: ModelRequest) -> AsyncIterator[ModelStreamEvent]:
    """Return provider-neutral stream events for the request."""
    ...

Persistence ports for durable agent state, signal, and evidence.

IAgentStateRepository

Bases: ABC

Materialized state repository for long-running agent executions.

get(state_id) abstractmethod

Return a persisted agent state by id.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def get(self, state_id: str) -> AgentState:
    """Return a persisted agent state by id."""
    ...

get_or_none(state_id) abstractmethod

Return a persisted agent state by id, or None when absent.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def get_or_none(self, state_id: str) -> AgentState | None:
    """Return a persisted agent state by id, or None when absent."""
    ...

save(state) abstractmethod

Persist the materialized state and return the saved snapshot.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def save(self, state: AgentState) -> AgentState:
    """Persist the materialized state and return the saved snapshot."""
    ...

list_by_status(status) abstractmethod

Return states matching an externally observable lifecycle status.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def list_by_status(self, status: AgentStatus) -> Sequence[AgentState]:
    """Return states matching an externally observable lifecycle status."""
    ...

list_resume_candidates() abstractmethod

Return active or interrupted states that may resume after restart.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def list_resume_candidates(self) -> Sequence[AgentState]:
    """Return active or interrupted states that may resume after restart."""
    ...

IAgentSignalRepository

Bases: ABC

Durable inbound queue repository for agent signals.

append(signal) abstractmethod

Append a signal to the inbound queue and return the stored entry.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def append(self, signal: AgentSignal) -> AgentSignal:
    """Append a signal to the inbound queue and return the stored entry."""
    ...

list_pending(state_id) abstractmethod

Return unconsumed signals for an agent state in append/queue order.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def list_pending(self, state_id: str) -> Sequence[AgentSignal]:
    """Return unconsumed signals for an agent state in append/queue order."""
    ...

mark_consumed(signal_id) abstractmethod

Mark one queued signal as consumed after a non-blocking poll.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def mark_consumed(self, signal_id: str) -> AgentSignal:
    """Mark one queued signal as consumed after a non-blocking poll."""
    ...

IAgentEvidenceRepository

Bases: ABC

Append-only evidence repository exposed to agent-facing code.

append(evidence) abstractmethod

Append immutable evidence and return the stored artifact.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def append(self, evidence: AgentEvidence) -> AgentEvidence:
    """Append immutable evidence and return the stored artifact."""
    ...

get(evidence_id) abstractmethod

Return one evidence artifact by id.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def get(self, evidence_id: str) -> AgentEvidence:
    """Return one evidence artifact by id."""
    ...

list_by_state(state_id) abstractmethod

Return evidence artifacts captured for an agent state.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def list_by_state(self, state_id: str) -> Sequence[AgentEvidence]:
    """Return evidence artifacts captured for an agent state."""
    ...

list_by_manifest_ref(manifest_ref) abstractmethod

Return evidence artifacts associated with a context manifest.

Source code in core/spakky-agent/src/spakky/agent/interfaces/repository.py
@abstractmethod
def list_by_manifest_ref(self, manifest_ref: str) -> Sequence[AgentEvidence]:
    """Return evidence artifacts associated with a context manifest."""
    ...

Types

Shared JSON-compatible type aliases for agent contracts.

JsonPrimitive = bool | float | int | str | None

Scalar JSON value accepted by agent public contracts.

JsonValue = JsonPrimitive | Mapping[str, JsonValue] | Sequence[JsonValue]

Recursive JSON-compatible value used at model, signal, and evidence boundaries.

JsonObject = Mapping[str, JsonValue]

JSON object payload used by public agent contracts.

Plugin

Plugin initialization entry point for spakky-agent.

initialize(app)

Initialize spakky-agent core contracts.

The package intentionally registers no persistence implementation; production repositories must arrive through feature contributions.

Source code in core/spakky-agent/src/spakky/agent/main.py
def initialize(app: SpakkyApplication) -> None:
    """Initialize spakky-agent core contracts.

    The package intentionally registers no persistence implementation; production
    repositories must arrive through feature contributions.
    """
    app.add(AgentBootstrapValidationPostProcessor)

Bootstrap validation for Agent Pod instances.

AgentBootstrapValidationPostProcessor()

Bases: IPostProcessor, IContainerAware

Validate Agent metadata during application bootstrap.

Initialize without assuming application context injection happened.

Source code in core/spakky-agent/src/spakky/agent/post_processor.py
def __init__(self) -> None:
    """Initialize without assuming application context injection happened."""
    self._container = None

set_container(container)

Receive the active container for contribution validation.

Source code in core/spakky-agent/src/spakky/agent/post_processor.py
@override
def set_container(self, container: IContainer) -> None:
    """Receive the active container for contribution validation."""
    self._container = container

post_process(pod)

Fail startup when an Agent contract is no longer valid.

Source code in core/spakky-agent/src/spakky/agent/post_processor.py
@override
def post_process(self, pod: object) -> object:
    """Fail startup when an Agent contract is no longer valid."""
    agent = Agent.get_or_none(type(pod))
    if agent is None:
        return pod
    try:
        agent.validate_bootstrap()
    except AgentDefinitionError as e:
        raise AgentBootstrapError("Agent bootstrap contract is invalid") from e
    self._validate_persistence_contributions(agent)
    return pod

에러

Error classes for the spakky-agent package.

AbstractSpakkyAgentError

Bases: AbstractSpakkyFrameworkError, ABC

Base class for agent-related errors.

AgentDefinitionError

Bases: AbstractSpakkyAgentError

Raised when an agent contract cannot be defined safely.

AgentToolBindingError

Bases: AbstractSpakkyAgentError

Raised when a model tool-call payload cannot be bound safely.

AgentBootstrapError

Bases: AbstractSpakkyAgentError

Raised when agent bootstrap validation fails.

AgentPersistenceConfigurationError

Bases: AgentBootstrapError

Raised when production agent persistence is required but not provided.

AgentModelConfigurationError

Bases: AgentBootstrapError

Raised when an agent requires a model adapter but none is registered.

AgentOutputGuardError

Bases: AbstractSpakkyAgentError

Raised when an output guard detects unsafe streaming exposure.