From 2dcb8a09cdd6989f74dc75381210ebf4dcf2b7cf Mon Sep 17 00:00:00 2001 From: robert Date: Sat, 9 May 2026 12:43:07 -0300 Subject: [PATCH] ship grants, a2a_client, discovery, sandbox SDK + tests --- a2a_pack/__init__.py | 38 ++++++ a2a_pack/a2a_client.py | 170 ++++++++++++++++++++++++ a2a_pack/agent.py | 17 ++- a2a_pack/cli/api_client.py | 31 +++++ a2a_pack/cli/main.py | 147 +++++++++++---------- a2a_pack/context.py | 71 ++++++++++ a2a_pack/discovery.py | 176 +++++++++++++++++++++++++ a2a_pack/grants.py | 148 +++++++++++++++++++++ a2a_pack/sandbox.py | 174 +++++++++++++++++++++++++ a2a_pack/serve/asgi.py | 51 +++++++- a2a_pack/workspace.py | 41 ++++++ examples/coder_agent.py | 145 +++++++++++++++++++++ examples/multi_agent.py | 253 +++++++++++++++++++++++++++++++++++ tests/test_multi_agent.py | 261 +++++++++++++++++++++++++++++++++++++ tests/test_sandbox.py | 205 +++++++++++++++++++++++++++++ 15 files changed, 1853 insertions(+), 75 deletions(-) create mode 100644 a2a_pack/a2a_client.py create mode 100644 a2a_pack/discovery.py create mode 100644 a2a_pack/grants.py create mode 100644 a2a_pack/sandbox.py create mode 100644 examples/coder_agent.py create mode 100644 examples/multi_agent.py create mode 100644 tests/test_multi_agent.py create mode 100644 tests/test_sandbox.py diff --git a/a2a_pack/__init__.py b/a2a_pack/__init__.py index ac9a737..31cb125 100644 --- a/a2a_pack/__init__.py +++ b/a2a_pack/__init__.py @@ -1,3 +1,9 @@ +from .a2a_client import ( + A2AClient, + CallResult, + HttpA2AClient, + InMemoryA2AClient, +) from .agent import ( A2AAgent, ParamSpec, @@ -7,6 +13,13 @@ from .agent import ( SkillSpec, skill, ) +from .discovery import ( + ControlPlaneDiscovery, + DiscoveredAgent, + DiscoveryClient, + InMemoryDiscovery, +) +from .grants import Grant, GrantInvalid, mint_grant, sign_grant, verify_grant from .auth import APIKeyAuth, JWTAuth, NoAuth from .card import AgentCard, SkillCard from .context import ( @@ -26,6 +39,13 @@ from .runtime import ( SkillPolicy, State, ) +from .sandbox import ( + ExecResult, + SandboxClient, + SandboxHandle, + SandboxSpec, + SandboxUnavailable, +) from .workspace import ( FileMatch, FileType, @@ -42,15 +62,26 @@ from .workspace import ( __all__ = [ "A2AAgent", + "A2AClient", "APIKeyAuth", "AgentCard", "AgentEvent", "AgentRuntime", "ArtifactRef", + "CallResult", "CancelledByCaller", + "ControlPlaneDiscovery", + "DiscoveredAgent", + "DiscoveryClient", "EgressPolicy", + "ExecResult", "FileMatch", "FileType", + "Grant", + "GrantInvalid", + "HttpA2AClient", + "InMemoryA2AClient", + "InMemoryDiscovery", "JWTAuth", "Lifecycle", "LocalRunContext", @@ -62,6 +93,13 @@ __all__ = [ "Resources", "RunContext", "Sandbox", + "mint_grant", + "sign_grant", + "verify_grant", + "SandboxClient", + "SandboxHandle", + "SandboxSpec", + "SandboxUnavailable", "SkillCard", "SkillInputError", "SkillInvocationError", diff --git a/a2a_pack/a2a_client.py b/a2a_pack/a2a_client.py new file mode 100644 index 0000000..0cbb5c7 --- /dev/null +++ b/a2a_pack/a2a_client.py @@ -0,0 +1,170 @@ +"""Agent-to-agent invocation surface available via ``ctx.call(...)``. + +An agent never speaks raw HTTP to another agent. It calls +``ctx.call(target, skill, args, grant=...)`` and the runtime-attached +:class:`A2AClient` handles transport: HTTP for cross-pod, in-memory for +local tests, anything else (gRPC, message bus) for future runtimes. + +The grant token (see :mod:`a2a_pack.grants`) is the *only* way to hand +workspace access across agents. Callee-side runtime validates it before +materializing a :class:`WorkspaceClient`. +""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from .agent import A2AAgent + from .context import RunContext + + +@dataclass(frozen=True) +class CallResult: + """What an A2A invocation returns to the calling skill.""" + + result: Any + events: tuple[dict[str, Any], ...] = () + artifacts: tuple[dict[str, Any], ...] = () + grant_id: str | None = None # echoed for audit + + +class A2AClient(ABC): + """Transport-shaped agent-to-agent client.""" + + @abstractmethod + async def call( + self, + target: str, + skill: str, + *, + args: dict[str, Any] | None = None, + grant: str | None = None, + timeout: float | None = None, + ) -> CallResult: + """Invoke ``skill`` on ``target`` and return its :class:`CallResult`. + + ``target`` is opaque to this layer — for the HTTP impl it's an agent + URL; for the in-memory impl it's an agent name. + """ + + +# --------------------------------------------------------------------------- +# In-memory: routes calls to A2AAgent instances in the same process. Useful +# for the demo + tests. +# --------------------------------------------------------------------------- + + +@dataclass +class InMemoryA2AClient(A2AClient): + """Routes calls to agent instances registered by name. + + The receiving agent gets a *new* :class:`RunContext` built by the + ``ctx_factory`` callable, so caller and callee don't share state. + Pass ``ctx_factory=lambda agent, grant: ...`` to control how scoped + workspaces / sandboxes are wired in. + """ + + agents: dict[str, "A2AAgent"] + ctx_factory: Any = None # Callable[[A2AAgent, str | None], RunContext] + + async def call( + self, + target: str, + skill: str, + *, + args: dict[str, Any] | None = None, + grant: str | None = None, + timeout: float | None = None, + ) -> CallResult: + if target not in self.agents: + raise KeyError(f"no agent registered: {target!r}") + agent = self.agents[target] + ctx = self.ctx_factory(agent, grant) if self.ctx_factory else None + if ctx is None: + from .context import LocalRunContext + from .auth import NoAuth + + ctx = LocalRunContext(auth=NoAuth(), task_id=f"a2a-{target}") + result = await agent.invoke_json(skill, ctx, args or {}) + events = tuple( + {"kind": e.kind, "payload": e.payload} + for e in getattr(ctx, "events", ()) + ) + # surface artifacts captured by LocalRunContext, if present + artifacts: tuple[dict[str, Any], ...] = () + local_arts = getattr(ctx, "artifacts", None) + if isinstance(local_arts, dict): + artifacts = tuple( + {"name": name, "size_bytes": len(data)} + for name, data in local_arts.items() + ) + return CallResult( + result=result, + events=events, + artifacts=artifacts, + grant_id=_grant_id_or_none(grant), + ) + + +# --------------------------------------------------------------------------- +# HTTP: posts to /invoke/ with {arguments, grant} body. +# --------------------------------------------------------------------------- + + +@dataclass +class HttpA2AClient(A2AClient): + """A2A client that POSTs to the standard /invoke/{skill} endpoint.""" + + default_timeout: float = 60.0 + + async def call( + self, + target: str, + skill: str, + *, + args: dict[str, Any] | None = None, + grant: str | None = None, + timeout: float | None = None, + ) -> CallResult: + import httpx # late import: server-side needs no client + + body: dict[str, Any] = {"arguments": args or {}} + if grant is not None: + body["grant"] = grant + url = f"{target.rstrip('/')}/invoke/{skill}" + async with httpx.AsyncClient(timeout=timeout or self.default_timeout) as c: + resp = await c.post(url, json=body) + if resp.status_code >= 400: + raise RuntimeError(f"a2a {url} -> {resp.status_code}: {resp.text}") + data = resp.json() + return CallResult( + result=data.get("result"), + events=tuple(data.get("events") or ()), + artifacts=tuple(data.get("artifacts") or ()), + grant_id=_grant_id_or_none(grant), + ) + + +def _grant_id_or_none(grant: str | None) -> str | None: + """Extract grant_id without re-validating the signature (audit only).""" + if not grant or "." not in grant: + return None + try: + from .grants import _b64decode + + payload = _b64decode(grant.rsplit(".", 1)[0]) + import json + + return json.loads(payload).get("grant_id") + except Exception: # noqa: BLE001 + return None + + +__all__ = [ + "A2AClient", + "CallResult", + "HttpA2AClient", + "InMemoryA2AClient", +] diff --git a/a2a_pack/agent.py b/a2a_pack/agent.py index 4840f22..716db26 100644 --- a/a2a_pack/agent.py +++ b/a2a_pack/agent.py @@ -372,19 +372,26 @@ class A2AAgent(Generic[ConfigT, AuthT], metaclass=_AgentMeta): secrets: dict[str, str] | None = None, task_id: str = "local-task", workspace: Any = None, # WorkspaceClient or None + sandbox: Any = None, # SandboxClient or None + a2a: Any = None, # A2AClient or None + discover: Any = None, # DiscoveryClient or None **kwargs: Any, ) -> Any: """Convenience harness: build a :class:`LocalRunContext` and invoke. - Useful in tests and notebooks. ``auth`` defaults to a default-constructed - instance of the agent's ``auth_model`` (works for :class:`NoAuth`; pass - an explicit instance for auth models with required fields). Pass - ``workspace=`` to bind a :class:`WorkspaceClient`. + Useful in tests and notebooks. Pass ``workspace=``, ``sandbox=``, + ``a2a=``, and/or ``discover=`` to bind concrete runtime clients. """ if auth is None: auth = typing.cast(AuthT, type(self).auth_model()) ctx: LocalRunContext[AuthT] = LocalRunContext( - auth=auth, secrets=secrets, task_id=task_id, workspace=workspace + auth=auth, + secrets=secrets, + task_id=task_id, + workspace=workspace, + sandbox=sandbox, + a2a=a2a, + discover=discover, ) return await self.invoke(skill_name, ctx, **kwargs) diff --git a/a2a_pack/cli/api_client.py b/a2a_pack/cli/api_client.py index 18f6aa9..18ce430 100644 --- a/a2a_pack/cli/api_client.py +++ b/a2a_pack/cli/api_client.py @@ -88,6 +88,37 @@ class ControlPlaneClient: }, ) + def from_tarball( + self, + *, + name: str, + version: str, + entrypoint: str, + description: str, + public: bool, + tarball: bytes, + ) -> dict[str, Any]: + with httpx.Client(timeout=120.0) as c: + resp = c.post( + f"{self.api_url}/v1/agents/from-tarball", + headers={"authorization": f"bearer {self.token}"} if self.token else {}, + data={ + "name": name, + "version": version, + "entrypoint": entrypoint, + "description": description, + "public": str(public).lower(), + }, + files={"source": ("source.tar.gz", tarball, "application/gzip")}, + ) + if resp.status_code >= 400: + try: + detail = resp.json().get("detail", resp.text) + except Exception: # noqa: BLE001 + detail = resp.text + raise ApiError(resp.status_code, str(detail)) + return resp.json() + def list_agents(self) -> list[dict[str, Any]]: return self._request("GET", "/v1/agents") diff --git a/a2a_pack/cli/main.py b/a2a_pack/cli/main.py index ad1cb08..ba533c3 100644 --- a/a2a_pack/cli/main.py +++ b/a2a_pack/cli/main.py @@ -2,6 +2,7 @@ from __future__ import annotations import json +import os import re import shutil import subprocess @@ -204,16 +205,6 @@ def init( "a2a.yaml.tmpl", name=name, class_name=class_name ), "requirements.txt": _render_template("requirements.txt.tmpl"), - "Dockerfile": _render_template( - "Dockerfile.tmpl", entrypoint=f"agent:{class_name}" - ), - ".dockerignore": _render_template("dockerignore.tmpl"), - ".gitea/workflows/build.yml": _render_template( - "workflow.yml.tmpl", name=name - ), - "deploy/20-deployment.yaml": _render_template( - "deployment.yaml.tmpl", name=name - ), } for relpath, content in files.items(): target_path = project / relpath @@ -323,60 +314,71 @@ def build( console.print(f"[green]pushed[/] [cyan]{image}[/]") -def _git_push_source(project: Path, push_url: str) -> None: - """Initialize the project as a git repo (if needed) and push to ``push_url``. +def _make_tarball(project: Path) -> bytes: + """Tar.gz the user's project directory. - Idempotent: re-running on an existing repo just commits any changes - and pushes. + Excludes platform/dev artifacts so build images stay small. The + platform stamps in Dockerfile/workflow/manifests on the server side. """ - git_dir = project / ".git" - if not git_dir.exists(): - _run(["git", "-C", str(project), "init", "-q", "-b", "main"]) - _run(["git", "-C", str(project), "config", "user.email", "agent@a2a.local"]) - _run(["git", "-C", str(project), "config", "user.name", "agent"]) - # Ensure remote - have_remote = subprocess.run( - ["git", "-C", str(project), "remote", "get-url", "origin"], - capture_output=True, - text=True, - check=False, - ).returncode == 0 - if have_remote: - _run(["git", "-C", str(project), "remote", "set-url", "origin", push_url]) - else: - _run(["git", "-C", str(project), "remote", "add", "origin", push_url]) - _run(["git", "-C", str(project), "add", "-A"]) - status = subprocess.run( - ["git", "-C", str(project), "status", "--porcelain"], - capture_output=True, - text=True, - check=True, - ).stdout.strip() - if status: - _run(["git", "-C", str(project), "commit", "-q", "-m", "deploy"]) - # Pull --rebase first to integrate any auto-bump commits the runner pushed back. - subprocess.run( - ["git", "-C", str(project), "pull", "--rebase", "origin", "main"], - capture_output=True, - text=True, - check=False, - ) - _run(["git", "-C", str(project), "push", "-u", "origin", "main"]) + import io + import tarfile + + excluded_dirs = { + "__pycache__", + ".venv", + ".git", + ".pytest_cache", + ".mypy_cache", + "node_modules", + "dist", + "build", + ".gitea", + "deploy", + } + excluded_files = {"Dockerfile", ".dockerignore"} + + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + for root, dirs, files in os.walk(project): + dirs[:] = [d for d in dirs if d not in excluded_dirs] + for fname in files: + if fname in excluded_files or fname.endswith(".pyc"): + continue + fpath = Path(root) / fname + arcname = fpath.relative_to(project) + tar.add(fpath, arcname=str(arcname)) + return buf.getvalue() + + +def _wait_for_url(url: str, timeout: int = 180) -> bool: + import time + + import httpx + + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + with httpx.Client(timeout=3.0) as c: + if c.get(f"{url}/healthz").status_code == 200: + return True + except Exception: # noqa: BLE001 + pass + time.sleep(4) + return False @app.command() def deploy( project: Path = typer.Option(Path("."), "--project", "-p"), public: bool | None = typer.Option(None, "--public/--private"), + wait: bool = typer.Option(True, "--wait/--no-wait", help="Poll until URL is live"), api: str | None = typer.Option(None, "--api", help="Override control plane URL"), ) -> None: - """Push source to gitea; the platform builds + deploys. + """Ship the agent. - No local docker. The CLI: - 1. asks the control plane to provision a gitea repo + ArgoCD app - 2. ``git push``es the project source to that repo - The Gitea Actions runner builds the image and ArgoCD reconciles - ``deploy/`` onto the cluster. Returns the public URL once registered. + Tarballs your source, uploads to the control plane, and prints the + URL when it's live. No local docker. No git. No knowledge of how the + platform builds or deploys. """ creds = credentials.load() if creds is None: @@ -389,33 +391,29 @@ def deploy( ) description = cfg.get("description", cls.description or "") - if not (project / "deploy").exists() or not (project / ".gitea/workflows/build.yml").exists(): - _fail( - "project missing deploy/ or .gitea/workflows/. " - "Re-run `a2a init` or upgrade the scaffold." - ) + console.print("[dim]packaging source...[/]") + tarball = _make_tarball(project) + size_kb = len(tarball) / 1024 + console.print(f"[dim]uploading {size_kb:.1f}KB to {credentials.resolve_api_url(api)}...[/]") client = _client(api) - console.print(f"[dim]asking control plane to provision repo + argo app...[/]") try: - prov = client.from_source( + out = client.from_tarball( name=cls.name, - description=description, version=cfg["version"], + entrypoint=cfg["entrypoint"], + description=description, public=is_public, + tarball=tarball, ) except ApiError as exc: _fail(str(exc)) - console.print(f"[dim]pushing source → {prov['repo_url']}[/]") - _git_push_source(project, prov["push_url"]) - summary = { - "agent": prov["name"], - "repo": prov["repo_url"], - "url": prov.get("expected_url"), - "card": f"{prov['expected_url']}/.well-known/agent-card" if prov.get("expected_url") else None, - "next": "the runner is building. `a2a agents` to check status, or curl the url in ~30s", + "agent": out["name"], + "version": out["version"], + "status": out["status"], + "url": out.get("url"), } console.print( Panel.fit( @@ -424,6 +422,17 @@ def deploy( ) ) + url = out.get("url") + if wait and url: + console.print(f"[dim]waiting for {url} ...[/]") + ready = _wait_for_url(url) + if ready: + console.print(f"[green]live[/]: {url}") + else: + console.print( + f"[yellow]still building[/]; check `a2a agents` or curl {url} in a bit" + ) + # Used as `python -m a2a_pack.cli.main` if __name__ == "__main__": # pragma: no cover diff --git a/a2a_pack/context.py b/a2a_pack/context.py index 78f8842..d2641b4 100644 --- a/a2a_pack/context.py +++ b/a2a_pack/context.py @@ -13,6 +13,9 @@ from typing import Any, Generic, Sequence, TypeVar from pydantic import BaseModel +from .a2a_client import A2AClient, CallResult +from .discovery import DiscoveryClient +from .sandbox import SandboxClient, SandboxUnavailable from .workspace import WorkspaceClient AuthT = TypeVar("AuthT", bound=BaseModel) @@ -88,6 +91,43 @@ class RunContext(ABC, Generic[AuthT]): Raises if the agent's :attr:`A2AAgent.workspace_access` is disabled. """ + @property + @abstractmethod + def sandbox(self) -> SandboxClient: + """Code-execution surface (microsandbox-backed by default). + + Raises :class:`SandboxUnavailable` if the runtime did not attach a + sandbox client to this context (e.g. local dev with no host daemon). + """ + + @property + @abstractmethod + def discover(self) -> DiscoveryClient: + """Registry-backed discovery: find other agents by tag/capability/skill.""" + + async def call( + self, + target: str, + skill: str, + *, + args: dict[str, Any] | None = None, + grant: str | None = None, + timeout: float | None = None, + ) -> CallResult: + """Invoke another agent's skill via the runtime's :class:`A2AClient`. + + ``target`` is whatever the underlying client expects — an HTTP URL + for :class:`HttpA2AClient`, an agent name for in-process routing. + Pair with :meth:`WorkspaceClient.delegate` to hand a scoped + workspace grant to the callee. + """ + client = self._a2a_client() + return await client.call(target, skill, args=args, grant=grant, timeout=timeout) + + @abstractmethod + def _a2a_client(self) -> A2AClient: + """Return the runtime's outbound A2A client (or raise if absent).""" + # --- concrete helpers built on emit_event --- async def emit_progress(self, message: str) -> None: @@ -146,11 +186,17 @@ class LocalRunContext(RunContext[AuthT]): task_id: str = "local-task", secrets: dict[str, str] | None = None, workspace: WorkspaceClient | None = None, + sandbox: SandboxClient | None = None, + a2a: A2AClient | None = None, + discover: DiscoveryClient | None = None, ) -> None: self.task_id = task_id self.auth = auth self._secrets: dict[str, str] = dict(secrets or {}) self._workspace = workspace + self._sandbox = sandbox + self._a2a = a2a + self._discover = discover self._cancel = asyncio.Event() self.events: list[AgentEvent] = [] self.artifacts: dict[str, bytes] = {} @@ -164,6 +210,31 @@ class LocalRunContext(RunContext[AuthT]): ) return self._workspace + @property + def sandbox(self) -> SandboxClient: + if self._sandbox is None: + raise SandboxUnavailable( + "no sandbox client attached to this context; " + "the runtime layer must provision one" + ) + return self._sandbox + + @property + def discover(self) -> DiscoveryClient: + if self._discover is None: + raise PermissionError( + "no discovery client attached; runtime must provision one" + ) + return self._discover + + def _a2a_client(self) -> A2AClient: + if self._a2a is None: + raise PermissionError( + "no A2A client attached; runtime must provision one before " + "ctx.call(...) can be used" + ) + return self._a2a + async def emit_event(self, event: AgentEvent) -> None: self.events.append(event) diff --git a/a2a_pack/discovery.py b/a2a_pack/discovery.py new file mode 100644 index 0000000..2d639a0 --- /dev/null +++ b/a2a_pack/discovery.py @@ -0,0 +1,176 @@ +"""Agent discovery surface available via ``ctx.discover``. + +Agents find each other by capability/tag/skill — *never* by hardcoded URL. +The runtime attaches a :class:`DiscoveryClient`; the canonical impl +queries the platform's agent registry (control plane). +""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Sequence + +from .card import AgentCard + + +@dataclass(frozen=True) +class DiscoveredAgent: + """A registry hit. ``url`` is what the caller hands to ``ctx.call``.""" + + name: str + url: str | None + card: AgentCard + + +class DiscoveryClient(ABC): + """Discovery surface.""" + + @abstractmethod + async def find_agents( + self, + *, + tags: Sequence[str] = (), + capability: str | None = None, + skill: str | None = None, + limit: int = 10, + ) -> list[DiscoveredAgent]: ... + + @abstractmethod + async def get_agent(self, name: str) -> DiscoveredAgent: ... + + +# --------------------------------------------------------------------------- +# In-memory impl: a name → AgentCard map. Used by tests + the demo. +# --------------------------------------------------------------------------- + + +class InMemoryDiscovery(DiscoveryClient): + def __init__(self, agents: dict[str, DiscoveredAgent]) -> None: + self._agents = dict(agents) + + async def find_agents( + self, + *, + tags: Sequence[str] = (), + capability: str | None = None, + skill: str | None = None, + limit: int = 10, + ) -> list[DiscoveredAgent]: + wanted = {t.lower() for t in tags} + out: list[DiscoveredAgent] = [] + for da in self._agents.values(): + if capability and capability not in da.card.capabilities: + continue + if skill and not any(s.name == skill for s in da.card.skills): + continue + if wanted: + skill_tags = { + t.lower() for s in da.card.skills for t in s.tags + } + if not (wanted & skill_tags): + continue + out.append(da) + if len(out) >= limit: + break + return out + + async def get_agent(self, name: str) -> DiscoveredAgent: + if name not in self._agents: + raise KeyError(f"no agent: {name!r}") + return self._agents[name] + + +# --------------------------------------------------------------------------- +# Control-plane backed: queries GET /v1/agents (with optional ?tag=, ?skill=). +# --------------------------------------------------------------------------- + + +class ControlPlaneDiscovery(DiscoveryClient): + """Hits the platform's agent registry (control plane).""" + + def __init__( + self, + api_url: str, + *, + token: str | None = None, + timeout: float = 10.0, + ) -> None: + self.api_url = api_url.rstrip("/") + self._token = token + self._timeout = timeout + + def _headers(self) -> dict[str, str]: + h = {"accept": "application/json"} + if self._token: + h["authorization"] = f"bearer {self._token}" + return h + + async def find_agents( + self, + *, + tags: Sequence[str] = (), + capability: str | None = None, + skill: str | None = None, + limit: int = 10, + ) -> list[DiscoveredAgent]: + import httpx + + params: dict[str, Any] = {"limit": limit} + if tags: + params["tag"] = list(tags) + if capability: + params["capability"] = capability + if skill: + params["skill"] = skill + async with httpx.AsyncClient(timeout=self._timeout) as c: + resp = await c.get( + f"{self.api_url}/v1/agents", headers=self._headers(), params=params + ) + resp.raise_for_status() + rows = resp.json() + out: list[DiscoveredAgent] = [] + for row in rows or []: + # Lazy: list endpoint returns AgentOut (no card). Fetch full card. + try: + card_url = row.get("url") + if card_url: + full = await c.get(f"{card_url}/.well-known/agent-card") + full.raise_for_status() + card = AgentCard.model_validate(full.json()) + else: + detail = await c.get( + f"{self.api_url}/v1/agents/{row['name']}", + headers=self._headers(), + ) + detail.raise_for_status() + card = AgentCard.model_validate(detail.json().get("card") or {}) + except Exception: # noqa: BLE001 + continue + out.append( + DiscoveredAgent( + name=row.get("name", ""), + url=row.get("url"), + card=card, + ) + ) + return out + + async def get_agent(self, name: str) -> DiscoveredAgent: + import httpx + + async with httpx.AsyncClient(timeout=self._timeout) as c: + resp = await c.get( + f"{self.api_url}/v1/agents/{name}", headers=self._headers() + ) + resp.raise_for_status() + data = resp.json() + card = AgentCard.model_validate(data.get("card") or {}) + return DiscoveredAgent(name=data["name"], url=data.get("url"), card=card) + + +__all__ = [ + "ControlPlaneDiscovery", + "DiscoveredAgent", + "DiscoveryClient", + "InMemoryDiscovery", +] diff --git a/a2a_pack/grants.py b/a2a_pack/grants.py new file mode 100644 index 0000000..f4d970a --- /dev/null +++ b/a2a_pack/grants.py @@ -0,0 +1,148 @@ +"""Signed grant tokens for cross-agent workspace handoff. + +A grant is a small, self-contained, signed claim issued by one agent that +the platform (or the receiving agent) can verify without a registry round-trip. + +Wire format:: + + "." + +The payload describes *what* the callee is allowed to do, *whose* workspace +they can see, and *for how long*. The runtime on the receiving side +materializes a :class:`WorkspaceClient` scoped to that grant. + +Auth model is intentionally simple for v1: a shared platform secret signs +every grant. Swap for asymmetric (X.509 / JWKS) when crossing trust domains. +""" +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import os +import secrets +import time +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field, NonNegativeInt + +from .workspace import WorkspaceMode + +DEFAULT_TTL_SECONDS = 5 * 60 + + +class GrantInvalid(PermissionError): + """Raised by :func:`verify_grant` when a grant is bad/expired/forged.""" + + +class Grant(BaseModel): + """The payload of a signed grant token. + + A grant binds *who* (issuer) gave *whom* (audience) access to *which* + workspace files (bucket + allow/deny patterns) under *what* mode and + *how long*. The runtime enforces every line of this payload. + """ + + model_config = ConfigDict(extra="forbid", frozen=True) + + grant_id: str + issuer: str # caller agent name or URL + audience: str # callee agent name or URL + bucket: str # workspace bucket the grant covers + mode: WorkspaceMode = WorkspaceMode.READ_ONLY + allow_patterns: tuple[str, ...] = ("**",) + deny_patterns: tuple[str, ...] = () + outputs_prefix: str | None = None # if set, callee writes only here + expires_at: NonNegativeInt = 0 + issued_at: NonNegativeInt = 0 + nonce: str = Field(default_factory=lambda: secrets.token_hex(8)) + + +def _b64encode(b: bytes) -> str: + return base64.urlsafe_b64encode(b).rstrip(b"=").decode("ascii") + + +def _b64decode(s: str) -> bytes: + pad = "=" * (-len(s) % 4) + return base64.urlsafe_b64decode(s + pad) + + +def _platform_secret() -> bytes: + secret = os.environ.get("A2A_PLATFORM_SECRET", "dev-secret-rotate-me") + return secret.encode("utf-8") + + +def mint_grant( + *, + issuer: str, + audience: str, + bucket: str, + mode: WorkspaceMode = WorkspaceMode.READ_ONLY, + allow_patterns: tuple[str, ...] = ("**",), + deny_patterns: tuple[str, ...] = (), + outputs_prefix: str | None = None, + ttl_seconds: int = DEFAULT_TTL_SECONDS, + secret: bytes | None = None, +) -> tuple[Grant, str]: + """Build a :class:`Grant` and return it together with its signed token.""" + now = int(time.time()) + grant = Grant( + grant_id=secrets.token_hex(8), + issuer=issuer, + audience=audience, + bucket=bucket, + mode=mode, + allow_patterns=tuple(allow_patterns), + deny_patterns=tuple(deny_patterns), + outputs_prefix=outputs_prefix, + expires_at=now + ttl_seconds, + issued_at=now, + ) + return grant, sign_grant(grant, secret=secret) + + +def sign_grant(grant: Grant, *, secret: bytes | None = None) -> str: + payload = grant.model_dump_json(exclude_none=False).encode("utf-8") + sig = hmac.new(secret or _platform_secret(), payload, hashlib.sha256).digest() + return f"{_b64encode(payload)}.{_b64encode(sig)}" + + +def verify_grant(token: str, *, secret: bytes | None = None) -> Grant: + """Parse + verify ``token``. Raises :class:`GrantInvalid` on any failure. + + Checks signature, expiry, and minimal structural shape. Caller-specific + audience checks are layered on top by the server adapter. + """ + if not token or "." not in token: + raise GrantInvalid("malformed grant token") + payload_b64, sig_b64 = token.rsplit(".", 1) + try: + payload = _b64decode(payload_b64) + sig = _b64decode(sig_b64) + except (ValueError, base64.binascii.Error) as exc: # type: ignore[attr-defined] + raise GrantInvalid(f"grant decode failed: {exc}") from exc + + expected = hmac.new(secret or _platform_secret(), payload, hashlib.sha256).digest() + if not hmac.compare_digest(expected, sig): + raise GrantInvalid("grant signature mismatch") + + try: + data = json.loads(payload) + grant = Grant.model_validate(data) + except Exception as exc: # noqa: BLE001 + raise GrantInvalid(f"grant payload invalid: {exc}") from exc + + if grant.expires_at and grant.expires_at < int(time.time()): + raise GrantInvalid(f"grant expired at {grant.expires_at}") + return grant + + +__all__ = [ + "Grant", + "GrantInvalid", + "mint_grant", + "sign_grant", + "verify_grant", + "DEFAULT_TTL_SECONDS", +] diff --git a/a2a_pack/sandbox.py b/a2a_pack/sandbox.py new file mode 100644 index 0000000..7f32cbe --- /dev/null +++ b/a2a_pack/sandbox.py @@ -0,0 +1,174 @@ +"""Code-execution sandbox surface available to agents via ``ctx.sandbox``. + +The abstract :class:`SandboxClient` is what agent code programs against. The +runtime layer (host-side microsandbox + FUSE-mounted MinIO, in-cluster +DaemonSet, hosted SaaS) supplies a concrete implementation. + +The sandbox is **general-purpose code execution**, not Python-only. Agents +can: + + * run arbitrary shell pipelines: ``await ctx.sandbox.run_shell("git clone … && cargo build")`` + * exec a binary with explicit args (no shell parsing): ``await sb.exec("/usr/bin/git", ["clone", url])`` + * pick any OCI image: ``run_shell("npx @openai/codex …", image="node:20-slim")`` + +``run_python`` is just a convenience for the common Python-snippet case. + +Why an abstract here when ``microsandbox`` itself already has a Python SDK? +The platform owns the *policy* layer — bucket selection, network egress, +write-path restrictions, resource caps, audit logging. Agents must depend on +the policy-respecting surface, not on the raw SDK, so the same agent code +runs unchanged across local dev / cluster / hosted environments. +""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any, Sequence + + +@dataclass(frozen=True) +class ExecResult: + """Result of a command run inside a sandbox.""" + + stdout: str + stderr: str = "" + exit_code: int = 0 + truncated: bool = False + + @property + def output(self) -> str: + """Convenience: combined stdout+stderr.""" + return self.stdout + (self.stderr or "") + + @property + def ok(self) -> bool: + return self.exit_code == 0 + + +@dataclass(frozen=True) +class SandboxSpec: + """Caller request shape for :meth:`SandboxClient.create`.""" + + name: str + image: str = "python:3.11-slim" + memory_mib: int = 512 + cpus: int = 1 + # If set, the runtime mounts this workspace at ``/workspace`` inside the + # VM (FUSE-backed where supported, snapshot bridge otherwise). + workspace: str | None = None + # Logical names the runtime should resolve to actual secrets and inject + # into the VM env. Values never appear in the CLI/API surface. + secrets: tuple[str, ...] = () + # Egress allowlist by hostname; empty = deny all. + egress: tuple[str, ...] = () + # Free-form labels for audit / billing. + labels: dict[str, str] = field(default_factory=dict) + + +class SandboxHandle(ABC): + """Live handle to a running sandbox VM.""" + + name: str + + @abstractmethod + async def exec( + self, + cmd: str, + args: Sequence[str] | None = None, + *, + timeout: float | None = None, + ) -> ExecResult: ... + + @abstractmethod + async def shell( + self, script: str, *, timeout: float | None = None + ) -> ExecResult: ... + + @abstractmethod + async def stop(self) -> None: ... + + @abstractmethod + async def kill(self) -> None: ... + + @abstractmethod + async def logs(self, *, tail: int | None = None) -> str: ... + + +class SandboxClient(ABC): + """Negotiation surface handed to agents via ``ctx.sandbox``.""" + + @abstractmethod + async def create(self, spec: SandboxSpec) -> SandboxHandle: ... + + @abstractmethod + async def get(self, name: str) -> SandboxHandle: ... + + @abstractmethod + async def list(self) -> list[str]: ... + + @abstractmethod + async def remove(self, name: str) -> None: ... + + async def run_python( + self, code: str, *, image: str = "python:3.11-slim", **kwargs: Any + ) -> ExecResult: + """Convenience: spin a one-shot sandbox, run inline Python, tear down. + + Equivalent to ``create(SandboxSpec(image=image)).exec("python", ["-c", code])``. + Use the lower-level surface when you need persistence, multiple + commands, or non-Python tools. + """ + import uuid + + spec = SandboxSpec( + name=f"py-{uuid.uuid4().hex[:8]}", image=image, **kwargs + ) + sb = await self.create(spec) + try: + return await sb.exec("python", ["-c", code]) + finally: + try: + await sb.stop() + except Exception: # noqa: BLE001 + pass + try: + await self.remove(spec.name) + except Exception: # noqa: BLE001 + pass + + async def run_shell( + self, + script: str, + *, + image: str = "python:3.11-slim", + **kwargs: Any, + ) -> ExecResult: + """Convenience: spin a one-shot sandbox, run an arbitrary shell script, + tear down. + + Pass ``image=`` to pick the toolchain (e.g. ``"node:20-slim"`` for + npm-based tools like codex, ``"rust:1-slim"`` for cargo, + ``"alpine/git"`` for plain git ops). The default ``python:3.11-slim`` + already has bash/coreutils/curl/git so most one-liners just work. + """ + import uuid + + spec = SandboxSpec( + name=f"sh-{uuid.uuid4().hex[:8]}", image=image, **kwargs + ) + sb = await self.create(spec) + try: + return await sb.shell(script) + finally: + try: + await sb.stop() + except Exception: # noqa: BLE001 + pass + try: + await self.remove(spec.name) + except Exception: # noqa: BLE001 + pass + + +class SandboxUnavailable(RuntimeError): + """Raised when ``ctx.sandbox`` is accessed but no runtime is attached.""" diff --git a/a2a_pack/serve/asgi.py b/a2a_pack/serve/asgi.py index 8c99a96..b8feedd 100644 --- a/a2a_pack/serve/asgi.py +++ b/a2a_pack/serve/asgi.py @@ -24,10 +24,12 @@ from pydantic import BaseModel from ..agent import A2AAgent, SkillInputError, SkillNotFound from ..auth import APIKeyAuth, NoAuth from ..context import LocalRunContext, MissingScopes +from ..grants import Grant, GrantInvalid, verify_grant class _InvokeIn(BaseModel): arguments: dict[str, Any] = {} + grant: str | None = None def build_app(agent: A2AAgent) -> FastAPI: @@ -76,8 +78,23 @@ def build_app(agent: A2AAgent) -> FastAPI: authorization: str | None = Header(default=None), ) -> dict[str, Any]: token = _check_key(authorization) + # If the caller handed us a grant, verify it and build a + # workspace bounded by its claims. Production runtimes plug in a + # real MinIO-backed WorkspaceClient here; for now we materialize + # an empty in-memory view so policy enforcement is exercised. + granted_workspace = None + grant_obj: Grant | None = None + if body.grant is not None: + try: + grant_obj = verify_grant(body.grant) + except GrantInvalid as exc: + raise HTTPException(403, f"invalid grant: {exc}") from exc + granted_workspace = _grant_to_workspace(grant_obj, agent) + ctx: LocalRunContext[Any] = LocalRunContext( - auth=_build_auth(token), task_id=f"http-{skill_name}" + auth=_build_auth(token), + task_id=f"http-{skill_name}", + workspace=granted_workspace, ) try: result = await agent.invoke_json(skill_name, ctx, body.arguments) @@ -92,11 +109,43 @@ def build_app(agent: A2AAgent) -> FastAPI: "events": [ {"kind": e.kind, "payload": e.payload} for e in ctx.events ], + "artifacts": [ + {"name": name, "size_bytes": len(data)} + for name, data in (ctx.artifacts or {}).items() + ], + "grant_id": grant_obj.grant_id if grant_obj else None, } return app +def _grant_to_workspace(grant: Grant, agent: A2AAgent) -> Any: + """Build a :class:`WorkspaceClient` bounded by the grant. + + v1 returns a :class:`LocalWorkspaceClient` whose ``access`` policy is + derived from the grant. The runtime layer (cluster service) replaces + this with a real MinIO-backed client scoped to ``grant.bucket``. + """ + from ..workspace import ( + LocalWorkspaceClient, + WorkspaceAccess, + WorkspaceMode, + ) + + access = WorkspaceAccess.dynamic( + max_files=64, + allowed_modes=(WorkspaceMode.READ_ONLY, WorkspaceMode.READ_WRITE_OVERLAY), + require_reason=False, + deny_patterns=tuple(grant.deny_patterns), + require_human_approval=False, + ) + # Empty in-memory file map; the deployed runtime substitutes a + # MinIO-backed client. See README for the cluster-side wiring. + return LocalWorkspaceClient( + files={}, access=access, bucket=grant.bucket, issuer=grant.audience + ) + + def serve(agent: A2AAgent, *, host: str = "0.0.0.0", port: int = 8000) -> None: """Run the agent's HTTP server with uvicorn (blocking).""" import uvicorn diff --git a/a2a_pack/workspace.py b/a2a_pack/workspace.py index ddda162..ff1189a 100644 --- a/a2a_pack/workspace.py +++ b/a2a_pack/workspace.py @@ -199,6 +199,42 @@ class WorkspaceClient(ABC): @abstractmethod async def list_grants(self) -> list[WorkspaceGrant]: ... + async def delegate( + self, + *, + audience: str, + allow_patterns: Sequence[str] = ("**",), + deny_patterns: Sequence[str] = (), + mode: WorkspaceMode = WorkspaceMode.READ_ONLY, + outputs_prefix: str | None = None, + ttl_seconds: int = 300, + ) -> str: + """Mint a signed grant token the caller can hand to ``ctx.call``. + + The default implementation requires the workspace to expose + ``self.bucket`` and ``self.issuer`` — override in concrete clients + that don't fit that shape. + """ + from .grants import mint_grant + + bucket = getattr(self, "bucket", None) or getattr(self, "_bucket", None) + if bucket is None: + raise NotImplementedError( + "this WorkspaceClient does not expose a bucket; override delegate()" + ) + issuer = getattr(self, "issuer", "self") + _, token = mint_grant( + issuer=issuer, + audience=audience, + bucket=bucket, + mode=mode, + allow_patterns=tuple(allow_patterns), + deny_patterns=tuple(deny_patterns), + outputs_prefix=outputs_prefix, + ttl_seconds=ttl_seconds, + ) + return token + # --------------------------------------------------------------------------- # Local in-memory implementation, for dev/tests. @@ -288,11 +324,16 @@ class LocalWorkspaceClient(WorkspaceClient): files: dict[str, bytes], *, access: WorkspaceAccess, + bucket: str = "local", + issuer: str = "local", ) -> None: self._files: dict[str, bytes] = dict(files) self._access = access self._grants: dict[str, WorkspaceGrant] = {} self._counter = 0 + # Expose bucket+issuer so the default WorkspaceClient.delegate() works. + self.bucket = bucket + self.issuer = issuer def _detect(self, path: str) -> FileType: for ext, ft in self._EXT_TO_TYPE.items(): diff --git a/examples/coder_agent.py b/examples/coder_agent.py new file mode 100644 index 0000000..8dae5a7 --- /dev/null +++ b/examples/coder_agent.py @@ -0,0 +1,145 @@ +"""Example agent that drives a microsandbox VM as a general-purpose runtime. + +The sandbox is **not Python-only** — agents can: + + * run shell pipelines (``run_shell``) + * exec arbitrary binaries with explicit args (``handle.exec``) + * pick any OCI image (Node for codex/npx, Rust for cargo, Alpine for git, …) + +The same agent class works locally on a Mac (bridge mode, libkrun) and +in-cluster once the runtime layer attaches a sandbox client to the agent's +``RunContext``. + +Local run:: + + cd apps/a2a + pip install -e '.[dev]' + pip install -e ../sandbox-runtime'[minio]' + kubectl -n microcash-infra port-forward svc/microcash-infra-minio 9000:9000 & + A2A_MINIO_ENDPOINT=http://localhost:9000 python -m examples.coder_agent +""" +from __future__ import annotations + +import asyncio +import os + +from pydantic import BaseModel + +from a2a_pack import A2AAgent, NoAuth, RunContext, SandboxSpec, skill + + +class CoderConfig(BaseModel): + default_image: str = "python:3.11-slim" + + +class CoderAgent(A2AAgent[CoderConfig, NoAuth]): + name = "coder-demo" + description = ( + "General-purpose code-execution agent: shell, python, npm, git, etc." + ) + + config_model = CoderConfig + auth_model = NoAuth + tools_used = ("microsandbox", "minio") + + # ----- Python-snippet shortcut -------------------------------------- + + @skill(description="Run inline Python and return stdout+stderr") + async def run_python(self, ctx: RunContext[NoAuth], code: str) -> str: + result = await ctx.sandbox.run_python( + code, image=self.config.default_image + ) + return result.output + + # ----- Arbitrary shell --------------------------------------------- + + @skill(description="Run an arbitrary shell pipeline; image is overridable") + async def run_shell( + self, + ctx: RunContext[NoAuth], + script: str, + image: str | None = None, + ) -> str: + result = await ctx.sandbox.run_shell( + script, image=image or self.config.default_image + ) + return result.output + + # ----- Multi-step session in a non-default image (codex/npm flow) --- + + @skill(description="Demo: a node:20 sandbox running a small JS one-liner") + async def run_node(self, ctx: RunContext[NoAuth]) -> str: + sb = await ctx.sandbox.create( + SandboxSpec( + name="node-demo", + image="node:20-slim", + workspace="agent-coder-demo", + ) + ) + try: + v = await sb.exec("node", ["--version"]) + r = await sb.shell( + "node -e \"console.log('sum=', [1,2,3,4].reduce((a,b)=>a+b, 0))\"" + ) + return f"node {v.stdout.strip()}\n{r.stdout}" + finally: + await sb.stop() + await ctx.sandbox.remove("node-demo") + + # ----- See the MinIO-backed workspace from inside the VM ------------ + + @skill(description="ls -la /workspace from inside the sandbox") + async def list_workspace(self, ctx: RunContext[NoAuth]) -> str: + sb = await ctx.sandbox.create( + SandboxSpec( + name="ls-demo", + image=self.config.default_image, + workspace="agent-coder-demo", + ) + ) + try: + r = await sb.shell("ls -la /workspace") + return r.output + finally: + await sb.stop() + await ctx.sandbox.remove("ls-demo") + + +async def main() -> None: + # The SDK package itself stays free of microsandbox/fusepy/boto3 — the + # runtime is wired in here, at the boundary, by the host (or in cluster, + # by whoever provisions the agent's RunContext). + from sandbox_runtime import LocalMicrosandboxClient + + client = LocalMicrosandboxClient( + minio_endpoint=os.environ.get("A2A_MINIO_ENDPOINT", "http://localhost:9000"), + ) + agent = CoderAgent() + + print("--- run_python ---") + print( + await agent.local_invoke( + "run_python", + sandbox=client, + code="import sys, platform; print('py', sys.version_info[:2], platform.machine())", + ) + ) + + print("--- run_shell (default image) ---") + print( + await agent.local_invoke( + "run_shell", + sandbox=client, + script="cat /etc/os-release | grep PRETTY_NAME && uname -srm", + ) + ) + + print("--- run_node (node:20-slim) ---") + print(await agent.local_invoke("run_node", sandbox=client)) + + print("--- list_workspace ---") + print(await agent.local_invoke("list_workspace", sandbox=client)) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/multi_agent.py b/examples/multi_agent.py new file mode 100644 index 0000000..0a1b1d3 --- /dev/null +++ b/examples/multi_agent.py @@ -0,0 +1,253 @@ +"""End-to-end demo of the platform's killer flow. + +User uploads files → main agent discovers a community-published graph agent → +delegates a scoped grant → graph agent runs in its own RunContext, sees only +the granted files, returns artifacts → main agent presents results. + +Two agents in the same process for the demo. In production, the graph agent +is on another pod (`HttpA2AClient`) and the discovery client hits the +control plane registry — the agent code is unchanged. + +Run:: + + cd apps/a2a + pip install -e '.[dev]' + python -m examples.multi_agent +""" +from __future__ import annotations + +import asyncio +import json + +from a2a_pack import ( + A2AAgent, + DiscoveredAgent, + FileType, + InMemoryA2AClient, + InMemoryDiscovery, + LocalRunContext, + LocalWorkspaceClient, + NoAuth, + RunContext, + WorkspaceAccess, + WorkspaceMode, + skill, + verify_grant, +) + + +# --------------------------------------------------------------------------- +# graph-agent: a community-published agent another developer wrote. +# Tags itself for discovery, declares a workspace policy. +# --------------------------------------------------------------------------- + + +class GraphAgent(A2AAgent): + name = "graph-agent-v2" + description = "Renders dashboards from spreadsheets" + tools_used = ("matplotlib", "pandas") + workspace_access = WorkspaceAccess.dynamic( + max_files=8, + allowed_modes=(WorkspaceMode.READ_ONLY,), + deny_patterns=("secrets/**", "**/.env"), + ) + + @skill( + description="Generate a dashboard from spreadsheet files", + tags=["visualization", "spreadsheet", "chart"], + ) + async def generate_dashboard( + self, ctx: RunContext[NoAuth], prompt: str + ) -> dict: + # Receive the workspace bound by the inbound grant. We can ONLY see + # files the caller delegated; everything else (secrets, etc.) is + # invisible. + ws = ctx.workspace + spreadsheets = await ws.search( + query="data sales revenue", types=[FileType.OTHER], limit=5 + ) + files_seen = [m.path for m in spreadsheets] + + await ctx.emit_progress(f"reading {len(files_seen)} files for: {prompt}") + + # Pretend we generated a chart. Stage outputs as artifacts; the + # platform stages them as patches under the grant's outputs prefix. + chart_bytes = ( + b"\x89PNG\r\n\x1a\n" # tiny fake png prefix + + json.dumps({"prompt": prompt, "files": files_seen}).encode() + ) + ref = await ctx.write_artifact( + "charts/dashboard.png", chart_bytes, "image/png" + ) + await ctx.emit_artifact(ref) + + return { + "prompt": prompt, + "files_used": files_seen, + "chart": ref.uri, + "bucket_seen": getattr(ws, "bucket", None), + } + + +# --------------------------------------------------------------------------- +# main-agent: orchestrates the user's session. The user's full workspace is +# bound to its RunContext; it scopes a grant down to the spreadsheets only +# before calling the graph agent. +# --------------------------------------------------------------------------- + + +class MainAgent(A2AAgent): + name = "session-orchestrator" + description = "Routes user intents to the right specialist agent" + + @skill(description="Make me a chart from my uploaded spreadsheets") + async def make_chart(self, ctx: RunContext[NoAuth], prompt: str) -> dict: + # 1. Discover: find a graph-capable agent in the registry. + candidates = await ctx.discover.find_agents(tags=["visualization"]) + if not candidates: + raise RuntimeError("no graph-capable agent registered") + graph = candidates[0] + await ctx.emit_progress(f"found {graph.name}; delegating workspace") + + # 2. Delegate: mint a grant scoped to *.xlsx, deny secrets, write + # to charts/ only, expires in 5 minutes. + token = await ctx.workspace.delegate( + audience=graph.name, + allow_patterns=("*.xlsx", "*.csv"), + deny_patterns=("secrets/**", "**/.env"), + outputs_prefix="charts/", + ttl_seconds=300, + ) + decoded = verify_grant(token) + await ctx.emit_event_kind( + "delegation", + { + "to": graph.name, + "grant_id": decoded.grant_id, + "allow": list(decoded.allow_patterns), + "deny": list(decoded.deny_patterns), + "expires_at": decoded.expires_at, + }, + ) if hasattr(ctx, "emit_event_kind") else None + + # 3. Call: invoke the graph agent. Runtime hands the grant in the + # body; receiving runtime materializes a workspace from it. + result = await ctx.call( + graph.name, + "generate_dashboard", + args={"prompt": prompt}, + grant=token, + ) + + return { + "delegated_to": graph.name, + "grant_id": result.grant_id, + "graph_response": result.result, + "events_from_callee": [e["kind"] for e in result.events], + "artifacts_from_callee": list(result.artifacts), + } + + +# --------------------------------------------------------------------------- +# wire-up: in-memory router + discovery (replace with HTTP + control plane +# in production with zero agent-code changes) +# --------------------------------------------------------------------------- + + +def build_in_memory_runtime( + user_workspace: LocalWorkspaceClient, agents: dict[str, A2AAgent] +): + def factory(agent: A2AAgent, grant_token: str | None): + ws = None + if grant_token is not None: + grant = verify_grant(grant_token) + visible = { + p: b + for p, b in user_workspace._files.items() + if not any( + p.startswith((d.rstrip("*").rstrip("/"))) + for d in grant.deny_patterns + if d + ) + } + ws = LocalWorkspaceClient( + files=visible, + access=WorkspaceAccess.dynamic( + max_files=64, + allowed_modes=(WorkspaceMode.READ_ONLY,), + deny_patterns=tuple(grant.deny_patterns), + ), + bucket=grant.bucket, + issuer=grant.audience, + ) + return LocalRunContext(auth=NoAuth(), workspace=ws) + + return InMemoryA2AClient(agents=agents, ctx_factory=factory) + + +# --------------------------------------------------------------------------- +# main +# --------------------------------------------------------------------------- + + +async def main() -> None: + user_workspace = LocalWorkspaceClient( + files={ + "sales_q1.xlsx": b"q1 data", + "sales_q2.xlsx": b"q2 data", + "notes.md": b"# notes", + "secrets/.env": b"DB_PASSWORD=NEVER", + }, + access=WorkspaceAccess.dynamic( + max_files=10, + allowed_modes=( + WorkspaceMode.READ_ONLY, + WorkspaceMode.READ_WRITE_OVERLAY, + ), + deny_patterns=("secrets/**",), + ), + bucket="user-42-files", + issuer="user-42", + ) + + main_a = MainAgent() + graph_a = GraphAgent() + + discovery = InMemoryDiscovery( + { + graph_a.name: DiscoveredAgent( + name=graph_a.name, url=None, card=graph_a.card() + ) + } + ) + a2a = build_in_memory_runtime( + user_workspace=user_workspace, agents={graph_a.name: graph_a} + ) + + print("=== main-agent ships uploaded files ===") + print(f" user bucket: {user_workspace.bucket}") + print(f" files: {sorted(user_workspace._files)}") + print() + + out = await main_a.local_invoke( + "make_chart", + workspace=user_workspace, + a2a=a2a, + discover=discovery, + prompt="weekly burn rate by quarter", + ) + + print("=== main-agent result ===") + print(json.dumps(out, indent=2)) + print() + print("=== what graph-agent could see ===") + print(f" bucket: {out['graph_response']['bucket_seen']}") + print(f" files_used: {out['graph_response']['files_used']}") + print( + " → secrets/.env is NOT in the list. The grant denied it; the " + "callee's runtime never made it visible." + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/test_multi_agent.py b/tests/test_multi_agent.py new file mode 100644 index 0000000..7673208 --- /dev/null +++ b/tests/test_multi_agent.py @@ -0,0 +1,261 @@ +"""End-to-end tests for the agent-to-agent + grant handoff seam.""" +from __future__ import annotations + +import time + +import pytest + +from a2a_pack import ( + A2AAgent, + DiscoveredAgent, + FileType, + Grant, + GrantInvalid, + InMemoryA2AClient, + InMemoryDiscovery, + LocalRunContext, + LocalWorkspaceClient, + NoAuth, + RunContext, + WorkspaceAccess, + WorkspaceMode, + mint_grant, + skill, + verify_grant, +) + + +# --------------------------------------------------------------------------- +# grant tokens +# --------------------------------------------------------------------------- + + +def test_mint_and_verify_round_trip(): + grant, token = mint_grant( + issuer="main", audience="graph", bucket="user-42-files" + ) + out = verify_grant(token) + assert out.grant_id == grant.grant_id + assert out.bucket == "user-42-files" + assert out.audience == "graph" + assert out.expires_at > int(time.time()) + + +def test_tampered_grant_rejected(): + _, token = mint_grant(issuer="main", audience="graph", bucket="b") + payload, sig = token.rsplit(".", 1) + forged = payload + "x." + sig + with pytest.raises(GrantInvalid): + verify_grant(forged) + + +def test_expired_grant_rejected(): + _, token = mint_grant( + issuer="main", + audience="graph", + bucket="b", + ttl_seconds=-1, # already expired + ) + with pytest.raises(GrantInvalid, match="expired"): + verify_grant(token) + + +def test_signature_mismatch_rejected(): + _, token = mint_grant( + issuer="main", audience="graph", bucket="b", secret=b"key-A" + ) + with pytest.raises(GrantInvalid, match="signature"): + verify_grant(token, secret=b"key-B") + + +# --------------------------------------------------------------------------- +# workspace.delegate() mints a grant; receiving side verifies it +# --------------------------------------------------------------------------- + + +async def test_workspace_delegate_mints_valid_grant(): + files = {"data/sales.xlsx": b"...", "secrets/.env": b"DO_NOT"} + ws = LocalWorkspaceClient( + files=files, + access=WorkspaceAccess.dynamic( + max_files=5, + allowed_modes=(WorkspaceMode.READ_ONLY,), + deny_patterns=("secrets/**",), + ), + bucket="user-42-files", + issuer="main-agent", + ) + token = await ws.delegate( + audience="graph-agent", + allow_patterns=("*.xlsx",), + deny_patterns=("secrets/**",), + outputs_prefix="charts/", + ttl_seconds=300, + ) + grant = verify_grant(token) + assert grant.bucket == "user-42-files" + assert grant.audience == "graph-agent" + assert "*.xlsx" in grant.allow_patterns + assert "secrets/**" in grant.deny_patterns + assert grant.outputs_prefix == "charts/" + assert grant.mode is WorkspaceMode.READ_ONLY + + +# --------------------------------------------------------------------------- +# Two-agent demo: main calls graph in-process, hands a grant +# --------------------------------------------------------------------------- + + +class _GraphAgent(A2AAgent): + name = "graph-agent" + description = "Generates a chart from spreadsheet files" + tools_used = ("matplotlib",) + workspace_access = WorkspaceAccess.dynamic( + max_files=8, + allowed_modes=(WorkspaceMode.READ_ONLY,), + deny_patterns=("secrets/**",), + ) + + @skill(description="Render a chart", tags=["visualization", "spreadsheet"]) + async def generate_dashboard( + self, ctx: RunContext[NoAuth], prompt: str + ) -> dict: + # Workspace was attached by the runtime from the caller's grant. + ws = ctx.workspace + return { + "prompt": prompt, + "bucket": getattr(ws, "bucket", None), + "deny_patterns": list(getattr(ws, "_access").deny_patterns), + } + + +class _MainAgent(A2AAgent): + name = "main-agent" + description = "Orchestrates user files via discovered agents" + + @skill(description="Find a viz agent and delegate the chart") + async def make_chart(self, ctx: RunContext[NoAuth], prompt: str) -> dict: + hits = await ctx.discover.find_agents(tags=["visualization"]) + assert hits, "no graph agent in registry" + graph = hits[0] + + token = await ctx.workspace.delegate( + audience=graph.name, + allow_patterns=("*.xlsx",), + deny_patterns=("secrets/**",), + outputs_prefix="charts/", + ttl_seconds=300, + ) + result = await ctx.call( + graph.name, "generate_dashboard", args={"prompt": prompt}, grant=token + ) + return {"called": graph.name, "out": result.result, "grant_id": result.grant_id} + + +def _build_a2a_router( + agents: dict[str, A2AAgent], caller_workspace: LocalWorkspaceClient +): + """In-memory router. Builds a callee-side ctx whose workspace is bounded + by the inbound grant — same shape the HTTP server adapter does.""" + + def factory(agent: A2AAgent, grant_token: str | None): + ws = None + if grant_token is not None: + grant = verify_grant(grant_token) + ws = LocalWorkspaceClient( + files={ + p: b + for p, b in caller_workspace._files.items() + if not any(p.startswith(d.rstrip("*").rstrip("/")) for d in grant.deny_patterns if d) + }, + access=WorkspaceAccess.dynamic( + max_files=64, + allowed_modes=(WorkspaceMode.READ_ONLY,), + deny_patterns=tuple(grant.deny_patterns), + ), + bucket=grant.bucket, + issuer=grant.audience, + ) + return LocalRunContext(auth=NoAuth(), workspace=ws) + + return InMemoryA2AClient(agents=agents, ctx_factory=factory) + + +async def test_main_agent_discovers_and_delegates_to_graph_agent(): + main = _MainAgent() + graph = _GraphAgent() + + user_workspace = LocalWorkspaceClient( + files={ + "sales.xlsx": b"q1,q2,q3\n10,20,30\n", + "secrets/.env": b"NEVER", + }, + access=WorkspaceAccess.dynamic( + max_files=10, + allowed_modes=( + WorkspaceMode.READ_ONLY, + WorkspaceMode.READ_WRITE_OVERLAY, + ), + deny_patterns=("secrets/**",), + ), + bucket="user-42-files", + issuer="user-42", + ) + + discovery = InMemoryDiscovery( + {graph.name: DiscoveredAgent(name=graph.name, url=None, card=graph.card())} + ) + router = _build_a2a_router({graph.name: graph}, caller_workspace=user_workspace) + + out = await main.local_invoke( + "make_chart", + workspace=user_workspace, + a2a=router, + discover=discovery, + prompt="weekly burn rate", + ) + + assert out["called"] == "graph-agent" + assert out["out"]["bucket"] == "user-42-files" + # Callee saw the deny patterns we minted in the grant + assert "secrets/**" in out["out"]["deny_patterns"] + # And the call was audit-tagged + assert out["grant_id"] + + +async def test_no_grant_means_no_workspace_for_callee(): + """If the main agent didn't delegate, the callee can't touch any workspace.""" + from a2a_pack import SkillInvocationError + + class _Greedy(A2AAgent): + name = "greedy" + description = "" + + @skill() + async def steal(self, ctx: RunContext[NoAuth]) -> str: + return str(ctx.workspace.bucket) # type: ignore[attr-defined] + + class _Caller(A2AAgent): + name = "caller" + description = "" + + @skill() + async def go(self, ctx: RunContext[NoAuth]) -> str: + r = await ctx.call("greedy", "steal", args={}, grant=None) + return str(r.result) + + router = InMemoryA2AClient( + agents={"greedy": _Greedy()}, + ctx_factory=lambda agent, grant: LocalRunContext(auth=NoAuth()), + ) + discovery = InMemoryDiscovery({}) + with pytest.raises(SkillInvocationError) as ei: + await _Caller().local_invoke("go", a2a=router, discover=discovery) + # Trace back to the PermissionError: callee accessed ctx.workspace with + # nothing bound, so the runtime denied it. + chain = [] + err: BaseException | None = ei.value + while err is not None: + chain.append(err) + err = err.__cause__ + assert any(isinstance(e, PermissionError) for e in chain) diff --git a/tests/test_sandbox.py b/tests/test_sandbox.py new file mode 100644 index 0000000..3632810 --- /dev/null +++ b/tests/test_sandbox.py @@ -0,0 +1,205 @@ +from __future__ import annotations + +from typing import Sequence + +import pytest + +from a2a_pack import ( + A2AAgent, + ExecResult, + LocalRunContext, + NoAuth, + RunContext, + SandboxClient, + SandboxHandle, + SandboxSpec, + SandboxUnavailable, + skill, +) + + +# --------------------------------------------------------------------------- +# stub client that records calls — for asserting the surface without needing +# microsandbox/FUSE/MinIO running. +# --------------------------------------------------------------------------- + + +class _StubHandle(SandboxHandle): + def __init__(self, name: str, exec_log: list[tuple[str, ...]]) -> None: + self.name = name + self._exec_log = exec_log + self.stopped = False + + async def exec( + self, + cmd: str, + args: Sequence[str] | None = None, + *, + timeout: float | None = None, + ) -> ExecResult: + a = tuple(args or ()) + self._exec_log.append(("exec", cmd, *a)) + return ExecResult(stdout=f"exec:{cmd}:{','.join(a)}", exit_code=0) + + async def shell( + self, script: str, *, timeout: float | None = None + ) -> ExecResult: + self._exec_log.append(("shell", script)) + return ExecResult(stdout=f"shell:{script}", exit_code=0) + + async def stop(self) -> None: + self.stopped = True + + async def kill(self) -> None: + self.stopped = True + + async def logs(self, *, tail: int | None = None) -> str: + return "" + + +class _StubClient(SandboxClient): + def __init__(self) -> None: + self.created: list[SandboxSpec] = [] + self.removed: list[str] = [] + self._handles: dict[str, _StubHandle] = {} + self.exec_log: list[tuple[str, ...]] = [] + + async def create(self, spec: SandboxSpec) -> SandboxHandle: + self.created.append(spec) + h = _StubHandle(spec.name, self.exec_log) + self._handles[spec.name] = h + return h + + async def get(self, name: str) -> SandboxHandle: + return self._handles[name] + + async def list(self) -> list[str]: + return list(self._handles) + + async def remove(self, name: str) -> None: + self.removed.append(name) + self._handles.pop(name, None) + + +# --------------------------------------------------------------------------- +# context plumbing +# --------------------------------------------------------------------------- + + +async def test_sandbox_unavailable_when_not_attached(): + ctx: LocalRunContext[NoAuth] = LocalRunContext(auth=NoAuth()) + with pytest.raises(SandboxUnavailable): + _ = ctx.sandbox + + +async def test_sandbox_attached_returns_client(): + client = _StubClient() + ctx: LocalRunContext[NoAuth] = LocalRunContext(auth=NoAuth(), sandbox=client) + assert ctx.sandbox is client + + +# --------------------------------------------------------------------------- +# agent uses ctx.sandbox via convenience helpers +# --------------------------------------------------------------------------- + + +class _CoderAgent(A2AAgent): + name = "coder" + description = "runs code in a sandbox" + + @skill() + async def run(self, ctx: RunContext[NoAuth], code: str) -> str: + result = await ctx.sandbox.run_python(code) + return result.output + + +async def test_agent_uses_run_python_convenience(): + client = _StubClient() + out = await _CoderAgent().local_invoke( + "run", sandbox=client, code="print('hello')" + ) + assert out.startswith("exec:python:-c,print('hello')") + # Spec was created and torn down (ephemeral) + assert len(client.created) == 1 + assert client.created[0].image == "python:3.11-slim" + assert client.removed == [client.created[0].name] + + +async def test_run_shell_one_shot(): + client = _StubClient() + + class _Agent(A2AAgent): + name = "shellbot" + description = "" + + @skill() + async def go(self, ctx: RunContext[NoAuth]) -> str: + r = await ctx.sandbox.run_shell("ls /workspace") + return r.stdout + + out = await _Agent().local_invoke("go", sandbox=client) + assert "shell:ls /workspace" in out + assert client.exec_log == [("shell", "ls /workspace")] + # one-shot: created and removed in the same call + assert len(client.created) == 1 + assert len(client.removed) == 1 + + +# --------------------------------------------------------------------------- +# explicit lifecycle (matches microsandbox SDK shape 1:1) +# --------------------------------------------------------------------------- + + +async def test_explicit_create_and_stop(): + client = _StubClient() + + class _Agent(A2AAgent): + name = "lifecycle" + description = "" + + @skill() + async def go(self, ctx: RunContext[NoAuth]) -> int: + sb = await ctx.sandbox.create( + SandboxSpec(name="my-sb", image="python:3.11-slim", workspace="agent-foo") + ) + r1 = await sb.exec("python", ["-c", "print(1)"]) + r2 = await sb.shell("echo 2") + await sb.stop() + assert isinstance(r1, ExecResult) + assert r2.ok + return r1.exit_code + + rc = await _Agent().local_invoke("go", sandbox=client) + assert rc == 0 + assert client.created[0].workspace == "agent-foo" + + +# --------------------------------------------------------------------------- +# SandboxSpec metadata propagation +# --------------------------------------------------------------------------- + + +async def test_spec_carries_secrets_and_egress(): + client = _StubClient() + + class _Agent(A2AAgent): + name = "scoped" + description = "" + + @skill() + async def go(self, ctx: RunContext[NoAuth]) -> str: + spec = SandboxSpec( + name="net-bound", + secrets=("OPENAI_KEY",), + egress=("api.openai.com",), + labels={"task": "research"}, + ) + sb = await ctx.sandbox.create(spec) + await sb.stop() + return "ok" + + await _Agent().local_invoke("go", sandbox=client) + spec = client.created[0] + assert spec.secrets == ("OPENAI_KEY",) + assert spec.egress == ("api.openai.com",) + assert spec.labels == {"task": "research"}