127 changes: 127 additions & 0 deletions .design/a2a-sdk-migration.md
@@ -0,0 +1,127 @@
# A2A Go SDK Migration

## Status: In Progress
## Date: 2026-06-08

## Summary

Migrate the scion-a2a-bridge from a hand-rolled A2A protocol implementation to
the official `a2a-go` SDK (`github.com/a2aproject/a2a-go/v2`). This replaces
our custom JSON-RPC handling, task lifecycle management, and streaming
infrastructure with the SDK's spec-compliant implementations while preserving
our Scion Hub routing core.

## Motivation

- **Spec compliance**: The SDK is maintained alongside the A2A spec, so spec
revisions arrive through dependency upgrades. Our hand-rolled implementation
required manual updates for each spec revision.
- **Reduced maintenance**: Roughly 500 lines of JSON-RPC, SSE streaming, and
task store code are replaced by the SDK.
- **Multi-transport**: The SDK provides JSON-RPC, REST, and gRPC transports from
a single `RequestHandler`, so we get gRPC and REST nearly for free.
- **Correctness**: The SDK handles edge cases (optimistic concurrency control,
concurrent cancellation, event ordering) that our MVP implementation
simplified or deferred.

## Architecture

### Before (hand-rolled)

```
HTTP Request → server.go (JSON-RPC dispatch) → bridge.go (task management)
→ Hub API → Broker → bridge.go (response correlation) → JSON-RPC response
```

### After (SDK-based)

```
HTTP Request → auth middleware → route extraction → SDK JSONRPC Handler
→ SDK RequestHandler → SDK task lifecycle → ScionExecutor.Execute()
→ bridge.go (Hub routing) → Broker → waiter channel → SDK events
→ SDK response serialization → HTTP response
```

### Key Components

**ScionExecutor** (`executor.go`): Implements `a2asrv.AgentExecutor`. It bridges
the SDK's event-driven model and our Scion Hub message routing.

- `Execute()`: Translates SDK message → Scion StructuredMessage, sends to Hub,
waits for broker response, yields SDK events.
- `Cancel()`: Sends interrupt to Scion agent, yields canceled status event.

**Server** (`server.go`): Simplified HTTP routing layer. Handles:
- Multi-project/agent URL routing (`/projects/{p}/agents/{a}/jsonrpc`)
- Agent card serving (kept custom — SDK's card handler is single-agent)
- Auth middleware, rate limiting, metrics (unchanged)
- Delegates JSON-RPC to SDK's `NewJSONRPCHandler`
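
The route extraction step for `/projects/{p}/agents/{a}/jsonrpc` can be sketched with the standard library alone. This is a minimal illustration, not the actual `server.go` code: the `Route` type and `parseRoute` function are hypothetical names introduced here, and the real server may well use a router or `http.ServeMux` patterns instead.

```go
package main

import (
	"fmt"
	"strings"
)

// Route identifies the target project and agent parsed from a request path.
// Hypothetical type for illustration; not taken from the bridge package.
type Route struct {
	Project string
	Agent   string
}

// parseRoute extracts the project and agent slugs from a path of the form
// /projects/{p}/agents/{a}/jsonrpc. It reports false for any other shape.
func parseRoute(path string) (Route, bool) {
	parts := strings.Split(strings.Trim(path, "/"), "/")
	if len(parts) != 5 || parts[0] != "projects" || parts[2] != "agents" || parts[4] != "jsonrpc" {
		return Route{}, false
	}
	// Reject empty slugs such as /projects//agents/a/jsonrpc.
	if parts[1] == "" || parts[3] == "" {
		return Route{}, false
	}
	return Route{Project: parts[1], Agent: parts[3]}, true
}

func main() {
	r, ok := parseRoute("/projects/demo/agents/helper/jsonrpc")
	fmt.Println(r.Project, r.Agent, ok) // prints "demo helper true"
}
```

Once the route is known, it would be attached to the request context before the request is handed to the SDK's JSON-RPC handler, so the executor can find the target agent.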

**Bridge** (`bridge.go`): Core Hub routing preserved. Changes:
- Added `sdkRequestHandler` field for multi-transport access
- Task lifecycle now managed by SDK's in-memory task store
- SQLite store retained for context mapping and broker correlation

**Translate** (`translate.go`): Added SDK-compatible translation functions:
- `TranslateA2APartsToScion()`: SDK `a2a.ContentParts` → Scion message
- `TranslateScionToA2AParts()`: Scion message → SDK `a2a.Message` + `a2a.Artifact`
- `MapActivityToSDKTaskState()`: Scion activity → SDK `a2a.TaskState`
- Original functions retained for backward compatibility
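
As a rough sketch of what a mapping like `MapActivityToSDKTaskState()` might look like: the code below uses a local `TaskState` stand-in instead of the SDK's `a2a.TaskState`, and the Scion activity labels (`"thinking"`, `"waiting_for_input"`, and so on) are hypothetical placeholders, since the real labels are not listed in this document. The state strings are illustrative as well; real code should use the SDK's constants.

```go
package main

import "fmt"

// TaskState is a local stand-in for the SDK's a2a.TaskState type.
type TaskState string

// Illustrative state values; real code should use the SDK's own constants.
const (
	StateWorking       TaskState = "working"
	StateInputRequired TaskState = "input-required"
	StateCompleted     TaskState = "completed"
	StateFailed        TaskState = "failed"
	StateUnknown       TaskState = "unknown"
)

// mapActivity converts a Scion activity label to a task state.
// The activity labels matched here are hypothetical placeholders.
func mapActivity(activity string) TaskState {
	switch activity {
	case "thinking", "executing":
		return StateWorking
	case "waiting_for_input":
		return StateInputRequired
	case "completed":
		return StateCompleted
	case "error":
		return StateFailed
	default:
		// Unrecognized activities map to an explicit unknown state rather
		// than being silently treated as success.
		return StateUnknown
	}
}

func main() {
	for _, a := range []string{"thinking", "waiting_for_input", "bogus"} {
		fmt.Printf("%s -> %s\n", a, mapActivity(a))
	}
}
```

Keeping the mapping in one total function with an explicit default makes it easier to notice when a new activity label appears on the Scion side.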

## What Changed

| Component | Before | After |
|-----------|--------|-------|
| JSON-RPC parsing | `server.go` hand-rolled | SDK `a2asrv.NewJSONRPCHandler` |
| Task lifecycle | `bridge.go` + SQLite | SDK in-memory task store |
| SSE streaming | `stream.go` custom | SDK built-in |
| Push notifications | `push.go` custom | SDK `push.Sender` (future) |
| A2A types | `translate.go` custom structs | SDK `a2a` package |
| Error codes | Custom constants | SDK `a2a.Err*` sentinel errors |

## What's Preserved

- **Bridge core**: Hub client routing, broker plugin, agent lookup, context
resolution, auto-provisioning — all unchanged.
- **Config**: Same YAML format, same fields.
- **Auth**: Same API key / Bearer middleware.
- **Metrics**: Same Prometheus metrics.
- **Rate limiting**: Same per-IP/key token bucket.
- **Broker plugin**: Same go-plugin RPC server.
- **SQLite store**: Retained for context mapping. Task state now also lives in
the SDK's in-memory store.

## PR Structure

### PR A: SDK Adoption (`a2a/sdk-migration`)
- Add `a2a-go/v2` dependency
- New `executor.go` (AgentExecutor implementation)
- Rewritten `server.go` (SDK handler delegation)
- Updated `translate.go` (SDK type translations)
- Updated `bridge.go` (sdkRequestHandler field)
- Updated `main.go` (SDK wiring)
- Updated tests

### PR B: gRPC + REST Transports (`a2a/sdk-grpc-rest`)
- `a2agrpc.NewHandler` for gRPC transport
- `a2asrv.NewRESTHandler` for REST transport
- Config fields: `grpc_listen_address`, `rest_listen_address`
- Startup wiring in `main.go`
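
The gRPC and REST endpoints carry no project or agent in the request, so the accompanying `main.go` change wraps them with a fixed default route. A minimal sketch of that idea for HTTP follows, assuming the route travels in the request context. The `RouteInfo` field names match `main.go`, but `withDefaultRoute`, `routeFrom`, and `routeSeenByHandler` are hypothetical names for illustration and not the bridge's `RouteInfoMiddleware` implementation.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// RouteInfo mirrors the fields used in main.go; defined locally for the sketch.
type RouteInfo struct {
	ProjectSlug string
	AgentSlug   string
}

// routeKey is an unexported context key type, which avoids key collisions.
type routeKey struct{}

// withDefaultRoute wraps next so every request context carries the same route.
func withDefaultRoute(route RouteInfo, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := context.WithValue(r.Context(), routeKey{}, route)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

// routeFrom reads the route back out, as an executor would.
func routeFrom(ctx context.Context) (RouteInfo, bool) {
	route, ok := ctx.Value(routeKey{}).(RouteInfo)
	return route, ok
}

// routeSeenByHandler sends one request through the middleware and returns
// the route observed by the inner handler as "project/agent".
func routeSeenByHandler(project, agent string) string {
	inner := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if got, ok := routeFrom(r.Context()); ok {
			fmt.Fprintf(w, "%s/%s", got.ProjectSlug, got.AgentSlug)
		}
	})
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/", nil)
	route := RouteInfo{ProjectSlug: project, AgentSlug: agent}
	withDefaultRoute(route, inner).ServeHTTP(rec, req)
	return rec.Body.String()
}

func main() {
	fmt.Println(routeSeenByHandler("demo", "helper")) // prints "demo/helper"
}
```

The trade-off of this approach is that these transports can only reach one agent per listener, which is why the startup code logs a warning about fixed routing.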

## Migration Risks

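1. **Task store divergence**: The SDK uses an in-memory task store, while our
SQLite store tracks context mappings separately. Tasks visible via the A2A
protocol come from the SDK store; context lookups use SQLite. Because the SDK
store is in memory, A2A task state does not survive a bridge restart.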

2. **Broker correlation**: The SDK doesn't know about our broker. Response
correlation happens inside `ScionExecutor.Execute()` using the same waiter
channel pattern.

3. **Push notification gap**: The SDK has a `push.Sender` interface, but we have
not yet wired in our SSRF-safe push dispatcher, so push is disabled in the
advertised capabilities.
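
The waiter channel pattern mentioned in risk 2 can be sketched as follows. This is a simplified illustration under stated assumptions, not the bridge's code: the `waiters` type, its methods, and the string payloads are hypothetical, and the real executor would presumably key waiters on a task or message identifier and translate the reply into SDK events.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errTimeout = errors.New("timed out waiting for broker reply")

// waiters correlates outbound requests with asynchronous broker replies.
type waiters struct {
	mu sync.Mutex
	m  map[string]chan string
}

func newWaiters() *waiters { return &waiters{m: make(map[string]chan string)} }

// register creates a buffered channel for id so that deliver never blocks.
func (w *waiters) register(id string) <-chan string {
	ch := make(chan string, 1)
	w.mu.Lock()
	w.m[id] = ch
	w.mu.Unlock()
	return ch
}

// deliver hands a reply to the waiter for id and reports whether one existed.
func (w *waiters) deliver(id, reply string) bool {
	w.mu.Lock()
	ch, ok := w.m[id]
	delete(w.m, id)
	w.mu.Unlock()
	if ok {
		ch <- reply
	}
	return ok
}

// await blocks until the reply arrives or the timeout elapses. On timeout it
// removes the waiter so a late reply is dropped instead of leaking an entry.
func (w *waiters) await(id string, ch <-chan string, timeout time.Duration) (string, error) {
	select {
	case reply := <-ch:
		return reply, nil
	case <-time.After(timeout):
		w.mu.Lock()
		delete(w.m, id)
		w.mu.Unlock()
		return "", errTimeout
	}
}

// roundTrip simulates one request whose reply arrives from another goroutine.
func roundTrip(id, reply string) (string, error) {
	w := newWaiters()
	ch := w.register(id)
	go w.deliver(id, reply)
	return w.await(id, ch, time.Second)
}

func main() {
	got, err := roundTrip("task-1", "hello from agent")
	fmt.Println(got, err)
}
```

Registering the waiter before sending the request matters: if the reply could arrive before registration, it would find no waiter and be lost.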

## Future Work

- Wire SDK push notification support with our SSRF-safe dispatcher
- Implement SDK `taskstore.Store` interface backed by SQLite for persistence
- Add multi-turn conversation support (SDK handles it; our executor needs updates)
- Evaluate SDK's work queue for distributed deployment
164 changes: 161 additions & 3 deletions extras/scion-a2a-bridge/cmd/scion-a2a-bridge/main.go
@@ -29,9 +29,15 @@ import (
"syscall"
"time"

"net"

secretmanager "cloud.google.com/go/secretmanager/apiv1"
smpb "cloud.google.com/go/secretmanager/apiv1/secretmanagerpb"
"github.com/a2aproject/a2a-go/v2/a2a"
a2agrpc "github.com/a2aproject/a2a-go/v2/a2agrpc/v0"
"github.com/a2aproject/a2a-go/v2/a2asrv"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"gopkg.in/yaml.v3"

"github.com/GoogleCloudPlatform/scion/extras/scion-a2a-bridge/internal/bridge"
@@ -136,20 +142,39 @@ func main() {
// Wire broker into the bridge for subscription management.
b.SetBroker(broker)

// Create SDK executor and request handler.
executor := bridge.NewScionExecutor(b, log.With("component", "executor"))
sdkRequestHandler := a2asrv.NewHandler(
executor,
a2asrv.WithLogger(log.With("component", "a2a-sdk")),
a2asrv.WithCapabilityChecks(&a2a.AgentCapabilities{
Streaming: true,
PushNotifications: false,
}),
a2asrv.WithAgentInactivityTimeout(cfg.Timeouts.SendMessage),
)
b.SetSDKRequestHandler(sdkRequestHandler)

// Create SDK JSON-RPC transport handler.
sdkJSONRPCHandler := a2asrv.NewJSONRPCHandler(
sdkRequestHandler,
a2asrv.WithTransportKeepAlive(cfg.Timeouts.SSEKeepalive),
)

// Start A2A HTTP server.
listenAddr := cfg.Bridge.ListenAddress
if listenAddr == "" {
listenAddr = ":8443"
}

-srv := bridge.NewServer(b, cfg, metrics, log.With("component", "a2a-server"))
+srv := bridge.NewServer(b, cfg, metrics, log.With("component", "a2a-server"), sdkJSONRPCHandler)
srv.WarnOnOpenAuth()

httpServer := &http.Server{
Addr: listenAddr,
Handler: srv.Handler(),
ReadTimeout: 30 * time.Second,
-WriteTimeout: 30 * time.Second,
+WriteTimeout: 0, // Disabled for SSE connections; SDK handles timeouts.
IdleTimeout: 120 * time.Second,
MaxHeaderBytes: 1 << 20,
}
@@ -163,7 +188,118 @@ func main() {
}
}()

log.Info("scion-a2a-bridge ready")
// Start gRPC server if configured.
// NOTE: gRPC and REST transports require a single-project, single-agent
// configuration because they lack per-request project/agent routing. The
// route interceptors and middleware below inject the configured default
// route into every request context. Auth is applied by the auth interceptors
// and middleware unless grpc_insecure or rest_insecure is set; in that case,
// secure the transport via network policy or a proxy.
var grpcServer *grpc.Server
if cfg.Bridge.GRPCListenAddress != "" {
if len(cfg.Projects) == 0 || len(cfg.Projects[0].ExposedAgents) == 0 {
log.Error("gRPC transport requires at least one project with exposed agents in config")
os.Exit(1)
}
defaultRoute := bridge.RouteInfo{
ProjectSlug: cfg.Projects[0].Slug,
AgentSlug: cfg.Projects[0].ExposedAgents[0],
}
log.Warn("gRPC transport uses fixed routing — all requests go to the first configured agent",
"project", defaultRoute.ProjectSlug, "agent", defaultRoute.AgentSlug)

if !cfg.Bridge.GRPCInsecure {
log.Warn("gRPC transport: auth enabled — clients must provide credentials via gRPC metadata",
"scheme", cfg.Auth.Scheme)
} else {
log.Warn("⚠ gRPC transport: auth DISABLED (grpc_insecure: true) — any client can send requests without credentials",
"address", cfg.Bridge.GRPCListenAddress)
}

grpcServer = grpc.NewServer(
grpc.ChainUnaryInterceptor(
bridge.AuthUnaryInterceptor(cfg),
bridge.RouteInfoUnaryInterceptor(defaultRoute),
),
grpc.ChainStreamInterceptor(
bridge.AuthStreamInterceptor(cfg),
bridge.RouteInfoStreamInterceptor(defaultRoute),
),
)
grpcHandler := a2agrpc.NewHandler(sdkRequestHandler)
grpcHandler.RegisterWith(grpcServer)

grpcListener, err := net.Listen("tcp", cfg.Bridge.GRPCListenAddress)
if err != nil {
log.Error("failed to listen for gRPC", "address", cfg.Bridge.GRPCListenAddress, "error", err)
os.Exit(1)
}

go func() {
log.Info("gRPC transport starting", "address", cfg.Bridge.GRPCListenAddress)
if err := grpcServer.Serve(grpcListener); err != nil {
errCh <- fmt.Errorf("gRPC server: %w", err)
}
}()
}

// Start REST server if configured.
var restServer *http.Server
if cfg.Bridge.RESTListenAddress != "" {
if len(cfg.Projects) == 0 || len(cfg.Projects[0].ExposedAgents) == 0 {
log.Error("REST transport requires at least one project with exposed agents in config")
os.Exit(1)
}
defaultRoute := bridge.RouteInfo{
ProjectSlug: cfg.Projects[0].Slug,
AgentSlug: cfg.Projects[0].ExposedAgents[0],
}
log.Warn("REST transport uses fixed routing — all requests go to the first configured agent",
"project", defaultRoute.ProjectSlug, "agent", defaultRoute.AgentSlug)
if !cfg.Bridge.RESTInsecure {
log.Warn("REST transport: auth enabled — clients must provide credentials via HTTP headers",
"scheme", cfg.Auth.Scheme)
} else {
log.Warn("⚠ REST transport: auth DISABLED (rest_insecure: true) — any client can send requests without credentials",
"address", cfg.Bridge.RESTListenAddress)
}

restHandler := bridge.AuthHTTPMiddleware(cfg, bridge.RouteInfoMiddleware(defaultRoute,
bridge.SSEWriteDeadlineMiddleware(
a2asrv.NewRESTHandler(
sdkRequestHandler,
a2asrv.WithTransportKeepAlive(cfg.Timeouts.SSEKeepalive),
),
),
))

restServer = &http.Server{
Addr: cfg.Bridge.RESTListenAddress,
Handler: restHandler,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 120 * time.Second,
MaxHeaderBytes: 1 << 20,
}

go func() {
log.Info("REST transport starting", "address", cfg.Bridge.RESTListenAddress)
if err := restServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
errCh <- fmt.Errorf("REST server: %w", err)
}
}()
}

transports := []string{"JSON-RPC"}
if cfg.Bridge.GRPCListenAddress != "" {
transports = append(transports, "gRPC")
}
if cfg.Bridge.RESTListenAddress != "" {
transports = append(transports, "REST")
}
log.Info("scion-a2a-bridge ready",
"transports", transports,
"sdk", "a2a-go/v2",
)

// Wait for shutdown signal.
sigCh := make(chan os.Signal, 1)
@@ -183,6 +319,28 @@ func main() {
log.Error("failed to stop A2A server", "error", err)
}

if grpcServer != nil {
grpcStopped := make(chan struct{})
go func() {
grpcServer.GracefulStop()
close(grpcStopped)
}()
select {
case <-grpcStopped:
log.Info("gRPC server stopped gracefully")
case <-shutdownCtx.Done():
log.Warn("gRPC graceful shutdown timed out, forcing stop")
grpcServer.Stop()
}
}

if restServer != nil {
if err := restServer.Shutdown(shutdownCtx); err != nil {
log.Error("failed to stop REST server", "error", err)
}
log.Info("REST server stopped")
}

// Drain background goroutines before closing the store.
b.Shutdown()

9 changes: 6 additions & 3 deletions extras/scion-a2a-bridge/go.mod
@@ -5,12 +5,14 @@ go 1.26.1
require (
cloud.google.com/go/secretmanager v1.16.0
github.com/GoogleCloudPlatform/scion v0.0.0-00010101000000-000000000000
github.com/a2aproject/a2a-go/v2 v2.3.1
github.com/go-jose/go-jose/v4 v4.1.4
github.com/google/uuid v1.6.0
github.com/hashicorp/go-plugin v1.7.0
github.com/mattn/go-sqlite3 v1.14.28
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
google.golang.org/grpc v1.80.0
gopkg.in/yaml.v3 v3.0.1
)

@@ -19,6 +21,7 @@ require (
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
cloud.google.com/go/compute/metadata v0.9.0 // indirect
cloud.google.com/go/iam v1.5.3 // indirect
github.com/a2aproject/a2a-go v0.3.15 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/fatih/color v1.16.0 // indirect
@@ -45,6 +48,7 @@ require (
go.opentelemetry.io/otel/trace v1.43.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/crypto v0.49.0 // indirect
golang.org/x/mod v0.33.0 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/oauth2 v0.35.0 // indirect
golang.org/x/sync v0.20.0 // indirect
@@ -54,9 +58,8 @@ require (
golang.org/x/time v0.14.0 // indirect
google.golang.org/api v0.259.0 // indirect
google.golang.org/genproto v0.0.0-20251202230838-ff82c1b0f217 // indirect
-google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect
-google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
-google.golang.org/grpc v1.80.0 // indirect
+google.golang.org/genproto/googleapis/api v0.0.0-20260427160629-7cedc36a6bc4 // indirect
+google.golang.org/genproto/googleapis/rpc v0.0.0-20260427160629-7cedc36a6bc4 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)
