"""Signed grant tokens for cross-agent workspace handoff. A grant is a small, self-contained, signed claim issued by one agent that the platform (or the receiving agent) can verify without a registry round-trip. Wire format:: "." The payload describes *what* the callee is allowed to do, *whose* workspace they can see, and *for how long*. The runtime on the receiving side materializes a :class:`WorkspaceClient` scoped to that grant. Auth model is intentionally simple for v1: a shared platform secret signs every grant. Swap for asymmetric (X.509 / JWKS) when crossing trust domains. """ from __future__ import annotations import base64 import hashlib import hmac import json import os import secrets import time from typing import Any from pydantic import BaseModel, ConfigDict, Field, NonNegativeInt from .workspace import WorkspaceMode DEFAULT_TTL_SECONDS = 5 * 60 class GrantInvalid(PermissionError): """Raised by :func:`verify_grant` when a grant is bad/expired/forged.""" class Grant(BaseModel): """The payload of a signed grant token. A grant binds *who* (issuer) gave *whom* (audience) access to *which* workspace files (bucket + allow/deny patterns) under *what* mode and *how long*. The runtime enforces every line of this payload. """ model_config = ConfigDict(extra="forbid", frozen=True) grant_id: str issuer: str # caller agent name or URL audience: str # callee agent name or URL bucket: str # workspace bucket the grant covers mode: WorkspaceMode = WorkspaceMode.READ_ONLY allow_patterns: tuple[str, ...] = ("**",) deny_patterns: tuple[str, ...] = () outputs_prefix: str | None = None # if set, callee writes only here expires_at: NonNegativeInt = 0 issued_at: NonNegativeInt = 0 nonce: str = Field(default_factory=lambda: secrets.token_hex(8)) def _b64encode(b: bytes) -> str: return base64.urlsafe_b64encode(b).rstrip(b"=").decode("ascii") def _b64decode(s: str) -> bytes: pad = "=" * (-len(s) % 4) return base64.urlsafe_b64decode(s + pad) def _platform_secret() -> bytes: secret = os.environ.get("A2A_PLATFORM_SECRET", "dev-secret-rotate-me") return secret.encode("utf-8") def mint_grant( *, issuer: str, audience: str, bucket: str, mode: WorkspaceMode = WorkspaceMode.READ_ONLY, allow_patterns: tuple[str, ...] = ("**",), deny_patterns: tuple[str, ...] = (), outputs_prefix: str | None = None, ttl_seconds: int = DEFAULT_TTL_SECONDS, secret: bytes | None = None, ) -> tuple[Grant, str]: """Build a :class:`Grant` and return it together with its signed token.""" now = int(time.time()) grant = Grant( grant_id=secrets.token_hex(8), issuer=issuer, audience=audience, bucket=bucket, mode=mode, allow_patterns=tuple(allow_patterns), deny_patterns=tuple(deny_patterns), outputs_prefix=outputs_prefix, expires_at=now + ttl_seconds, issued_at=now, ) return grant, sign_grant(grant, secret=secret) def sign_grant(grant: Grant, *, secret: bytes | None = None) -> str: payload = grant.model_dump_json(exclude_none=False).encode("utf-8") sig = hmac.new(secret or _platform_secret(), payload, hashlib.sha256).digest() return f"{_b64encode(payload)}.{_b64encode(sig)}" def verify_grant(token: str, *, secret: bytes | None = None) -> Grant: """Parse + verify ``token``. Raises :class:`GrantInvalid` on any failure. Checks signature, expiry, and minimal structural shape. Caller-specific audience checks are layered on top by the server adapter. """ if not token or "." not in token: raise GrantInvalid("malformed grant token") payload_b64, sig_b64 = token.rsplit(".", 1) try: payload = _b64decode(payload_b64) sig = _b64decode(sig_b64) except (ValueError, base64.binascii.Error) as exc: # type: ignore[attr-defined] raise GrantInvalid(f"grant decode failed: {exc}") from exc expected = hmac.new(secret or _platform_secret(), payload, hashlib.sha256).digest() if not hmac.compare_digest(expected, sig): raise GrantInvalid("grant signature mismatch") try: data = json.loads(payload) grant = Grant.model_validate(data) except Exception as exc: # noqa: BLE001 raise GrantInvalid(f"grant payload invalid: {exc}") from exc if grant.expires_at and grant.expires_at < int(time.time()): raise GrantInvalid(f"grant expired at {grant.expires_at}") return grant __all__ = [ "Grant", "GrantInvalid", "mint_grant", "sign_grant", "verify_grant", "DEFAULT_TTL_SECONDS", ]