Files
a2a/a2a_pack/serve/asgi.py

154 lines
5.6 KiB
Python

"""HTTP adapter that turns any :class:`A2AAgent` into a service.
This is intentionally minimal: it covers the surface needed to plug into
the wider A2A ecosystem and the platform's control plane.
Endpoints:
GET /healthz -> liveness
GET /.well-known/agent-card -> Agent Card JSON
POST /invoke/{skill} -> invoke skill (JSON in, JSON out)
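Auth: a single bearer token is read from the ``A2A_API_KEY`` env var. If it is
set, every route except ``/healthz`` and the Agent Card requires
``Authorization: Bearer <key>``. Each request is then mapped onto the agent's
declared ``auth_model``: ``APIKeyAuth`` receives the bearer token (or
``"anonymous"`` when no key is configured), ``NoAuth`` is constructed as-is,
and any other model is default-constructed, failing the request with HTTP 500
if that is not possible.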
"""
from __future__ import annotations

import hmac
import os
from typing import Any

from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel

from ..agent import A2AAgent, SkillInputError, SkillNotFound
from ..auth import APIKeyAuth, NoAuth
from ..context import LocalRunContext, MissingScopes
from ..grants import Grant, GrantInvalid, verify_grant
# Request body for ``POST /invoke/{skill_name}``. Kept comment-only (no class
# docstring) because pydantic would surface a docstring in the OpenAPI schema.
class _InvokeIn(BaseModel):
    # Skill arguments, forwarded unchanged to ``agent.invoke_json``.
    # NOTE(review): relies on pydantic copying mutable defaults per instance,
    # so requests do not share this dict — confirm for the pinned version.
    arguments: dict[str, Any] = {}
    # Optional serialized grant. When present it is checked with
    # ``verify_grant`` before the skill runs, and invalid grants yield 403.
    grant: str | None = None
def _bearer_token(authorization: str | None) -> str | None:
    """Return the credential carried by an ``Authorization: Bearer`` header.

    The scheme is matched case-insensitively and must be followed by a
    space; whitespace around the credential is stripped.

    Returns ``None`` when the header is absent, uses another scheme, or
    carries an empty / whitespace-only credential (e.g. ``"Bearer "``), so
    the caller can answer 401 instead of crashing on a missing token.
    """
    if not authorization:
        return None
    scheme, sep, credential = authorization.partition(" ")
    if not sep or scheme.lower() != "bearer":
        return None
    return credential.strip() or None


def _tokens_equal(provided: str, expected: str) -> bool:
    """Compare two secrets without content-based short-circuiting.

    ``hmac.compare_digest`` only accepts ASCII ``str``, so both sides are
    encoded to bytes first. ``surrogatepass`` keeps environment values that
    carry surrogate-escaped (undecodable) bytes from raising.
    """
    return hmac.compare_digest(
        provided.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )


def build_app(agent: A2AAgent) -> FastAPI:
    """Build a FastAPI app exposing ``agent`` over HTTP.

    Routes:
      * ``GET /healthz`` - reports ``agent.health()`` plus name/version.
      * ``GET /.well-known/agent-card`` - serializes ``agent.card()``.
      * ``POST /invoke/{skill_name}`` - runs a skill. It requires a matching
        bearer token when ``A2A_API_KEY`` is set.

    ``A2A_API_KEY`` is read once, when this function is called.
    """
    app = FastAPI(title=type(agent).name, version=type(agent).version)
    # Captured at build time; later changes to the env var have no effect.
    api_key = os.environ.get("A2A_API_KEY")

    def _build_auth(provided_key: str | None) -> Any:
        """Instantiate the agent's declared ``auth_model`` for one request.

        Raises:
            HTTPException: 500 when an unrecognised auth model cannot be
                default-constructed.
        """
        auth_cls = type(agent).auth_model
        if auth_cls is APIKeyAuth:
            # NOTE(review): this passes the raw bearer secret as the key "id";
            # confirm APIKeyAuth never logs or echoes it.
            return APIKeyAuth(api_key_id=provided_key or "anonymous")
        if auth_cls is NoAuth:
            return NoAuth()
        # Unknown auth model: try default-construct, else fail loudly.
        try:
            return auth_cls()
        except Exception as exc:  # pragma: no cover - depends on user model
            raise HTTPException(
                status_code=500,
                detail=f"cannot materialize auth_model {auth_cls.__name__}: {exc}",
            ) from exc

    def _check_key(authorization: str | None) -> str | None:
        """Validate the ``Authorization`` header against ``A2A_API_KEY``.

        Returns the validated token, or ``None`` when no key is configured.

        Raises:
            HTTPException: 401 when the header is missing, malformed, has an
                empty token, or the token does not match.
        """
        if api_key is None:
            return None
        token = _bearer_token(authorization)
        if token is None:
            raise HTTPException(401, "missing bearer token")
        if not _tokens_equal(token, api_key):
            raise HTTPException(401, "invalid bearer token")
        return token

    # Route handlers use comments rather than docstrings so the generated
    # OpenAPI descriptions stay unchanged.

    # Liveness probe; intentionally unauthenticated.
    @app.get("/healthz")
    async def healthz() -> dict[str, Any]:
        ok = await agent.health()
        return {"ok": ok, "agent": type(agent).name, "version": type(agent).version}

    # Agent Card discovery; intentionally unauthenticated.
    @app.get("/.well-known/agent-card")
    async def agent_card() -> dict[str, Any]:
        return agent.card().model_dump(mode="json")

    # Skill invocation. Maps SkillNotFound -> 404, SkillInputError -> 400 and
    # GrantInvalid / MissingScopes -> 403.
    @app.post("/invoke/{skill_name}")
    async def invoke(
        skill_name: str,
        body: _InvokeIn,
        authorization: str | None = Header(default=None),
    ) -> dict[str, Any]:
        token = _check_key(authorization)
        # If the caller handed us a grant, verify it and build a
        # workspace bounded by its claims. Production runtimes plug in a
        # real MinIO-backed WorkspaceClient here; for now we materialize
        # an empty in-memory view so policy enforcement is exercised.
        granted_workspace = None
        grant_obj: Grant | None = None
        if body.grant is not None:
            try:
                grant_obj = verify_grant(body.grant)
            except GrantInvalid as exc:
                raise HTTPException(403, f"invalid grant: {exc}") from exc
            granted_workspace = _grant_to_workspace(grant_obj, agent)
        ctx: LocalRunContext[Any] = LocalRunContext(
            auth=_build_auth(token),
            # NOTE(review): identical for every call to the same skill; use a
            # per-request id if task_id is relied on for correlation.
            task_id=f"http-{skill_name}",
            workspace=granted_workspace,
        )
        try:
            result = await agent.invoke_json(skill_name, ctx, body.arguments)
        except SkillNotFound as exc:
            raise HTTPException(404, f"unknown skill: {skill_name}") from exc
        except SkillInputError as exc:
            raise HTTPException(400, str(exc)) from exc
        except MissingScopes as exc:
            raise HTTPException(403, str(exc)) from exc
        return {
            "result": result,
            "events": [
                {"kind": e.kind, "payload": e.payload} for e in ctx.events
            ],
            # Only artifact names and sizes are returned, not their contents.
            "artifacts": [
                {"name": name, "size_bytes": len(data)}
                for name, data in (ctx.artifacts or {}).items()
            ],
            "grant_id": grant_obj.grant_id if grant_obj else None,
        }

    return app
def _grant_to_workspace(grant: Grant, agent: A2AAgent) -> Any:
    """Translate a verified grant into a workspace client for one request.

    The result is a :class:`LocalWorkspaceClient` with an empty file map,
    bound to ``grant.bucket``. Its access policy is built with
    ``WorkspaceAccess.dynamic``:

    * ``max_files=64``;
    * read-only and read-write-overlay modes;
    * the grant's ``deny_patterns``;
    * ``require_reason`` and ``require_human_approval`` both disabled.

    The deployed runtime is expected to substitute a MinIO-backed client
    scoped to the same bucket (see README for the cluster-side wiring).

    ``agent`` is currently unused; it is kept so callers need not change.
    """
    from ..workspace import LocalWorkspaceClient, WorkspaceAccess, WorkspaceMode

    permitted_modes = (WorkspaceMode.READ_ONLY, WorkspaceMode.READ_WRITE_OVERLAY)
    policy = WorkspaceAccess.dynamic(
        max_files=64,
        allowed_modes=permitted_modes,
        require_reason=False,
        deny_patterns=tuple(grant.deny_patterns),
        require_human_approval=False,
    )
    # NOTE(review): issuer is populated from grant.audience — confirm this is
    # intended rather than a separate issuer claim on the grant.
    return LocalWorkspaceClient(
        files={},
        access=policy,
        bucket=grant.bucket,
        issuer=grant.audience,
    )
def serve(agent: A2AAgent, *, host: str = "0.0.0.0", port: int = 8000) -> None:
    """Serve ``agent`` over HTTP with uvicorn.

    This call blocks until uvicorn returns. By default it binds ``0.0.0.0``
    on port 8000 and logs at ``info`` level.
    """
    # Imported here, so uvicorn is only loaded when serve() is actually called.
    import uvicorn

    application = build_app(agent)
    uvicorn.run(application, host=host, port=port, log_level="info")