From 46b59c126a0893ee81ac685e4a80d69719310677 Mon Sep 17 00:00:00 2001 From: Ameer Deen Date: Sat, 30 May 2026 21:49:33 +0400 Subject: [PATCH] feat(server): OpenAI-compatible serve path for forged agents MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `forge-agent`: an executable that serves forge Agents behind a local OpenAI-compatible endpoint, so an existing host CLI (Codex, Grok Build) can point at it via OPENAI_BASE_URL and run "warm from prompt 1" — the Engine owns the loop and the agent's scaffold is the system prompt. - server/: http.Handler implementing GET /v1/models, POST /v1/chat/completions, and POST /v1/responses (Codex speaks the Responses API, not Chat Completions), both with SSE streaming. Translates OpenAI <-> forge messages. - Request-body decompression (zstd/gzip) via Content-Encoding + magic-byte sniffing — Codex ships the body zstd-compressed, which was the real failure. - Default-agent fallback: host GUIs send their own model ids (e.g. "gpt-5.5") that aren't agent names; unknown models resolve to a configured default agent instead of 404. - Request logging for live debuggability of host-CLI integration. - cmd/forge-agent: wires the north-star demo agents (vanilla_reviewer vs forged_reviewer) and serves them. - scripts/: serve.ps1 (launch) + demo.sh (curl driver). - Tests cover models/chat/responses, streaming, content-array + string inputs, zstd-encoded bodies, and the default-agent fallback. Validated live end-to-end through the real Codex CLI and GUI. Co-Authored-By: Claude Opus 4.8 (1M context) --- cmd/forge-agent/main.go | 83 ++++++++ go.mod | 8 +- go.sum | 9 + scripts/demo.sh | 33 +++ scripts/serve.ps1 | 31 +++ server/responses.go | 234 +++++++++++++++++++++ server/responses_test.go | 200 ++++++++++++++++++ server/server.go | 425 +++++++++++++++++++++++++++++++++++++++ server/server_test.go | 178 ++++++++++++++++ 9 files changed, 1199 insertions(+), 2 deletions(-) create mode 100644 cmd/forge-agent/main.go create mode 100755 scripts/demo.sh create mode 100755 scripts/serve.ps1 create mode 100644 server/responses.go create mode 100644 server/responses_test.go create mode 100644 server/server.go create mode 100644 server/server_test.go diff --git a/cmd/forge-agent/main.go b/cmd/forge-agent/main.go new file mode 100644 index 0000000..781521e --- /dev/null +++ b/cmd/forge-agent/main.go @@ -0,0 +1,83 @@ +// Command forge-agent serves forged agents behind a local OpenAI-compatible +// endpoint. It is the runnable proof of the serve path: it wires demo agents +// (defined in Go) to the OpenAI gateway so you can point any OpenAI client at +// it. +// +// Usage: +// +// export OPENAI_API_KEY=sk-... # key for the upstream provider +// go run ./cmd/forge-agent --addr :8787 +// +// export OPENAI_BASE_URL=http://localhost:8787/v1 +// export OPENAI_API_KEY=forge-local # the gateway ignores this +// # now start Codex CLI / Grok Build, or just curl /v1/chat/completions +// +// The two demo agents share one provider and model and differ only in their +// scaffold — the north-star demo: same model, same task, different package. +package main + +import ( + "flag" + "log" + "net/http" + "os" + + "github.com/katasec/forge" + "github.com/katasec/forge/provider/openai" + "github.com/katasec/forge/server" +) + +const forgedReviewerScaffold = `You are a repository reviewer operating under a mission scaffold. + +Operating rules: +- Start from a small orientation layer; do not assume the whole repo. +- Ground every finding in a concrete file, command, or repo fact — no generic advice. +- Identify exactly one concrete, high-value next improvement. +- Recommend the verification (build/test/lint) that proves the change. +- Keep output structured: Findings, Next Improvement, Verification.` + +const vanillaScaffold = "You are a helpful coding assistant. Review the repository." + +func main() { + addr := flag.String("addr", ":8787", "address to listen on") + model := flag.String("model", string(openai.ModelGPT54Nano), "upstream model id") + baseURL := flag.String("base-url", "", "override upstream OpenAI base URL (e.g. for xAI)") + defaultAgent := flag.String("default-agent", "forged_reviewer", + "agent to use when a client requests an unknown model id (host GUIs send their own model names); empty for strict 404") + flag.Parse() + + apiKey := os.Getenv("OPENAI_API_KEY") + if apiKey == "" { + log.Fatal("OPENAI_API_KEY is required (the upstream provider key)") + } + + var opts []openai.Option + if *baseURL != "" { + opts = append(opts, openai.WithBaseURL(*baseURL)) + } + provider := openai.New(apiKey, openai.Model(*model), opts...) + + agents := map[string]*forge.Agent{ + "vanilla_reviewer": mustAgent(provider, vanillaScaffold), + "forged_reviewer": mustAgent(provider, forgedReviewerScaffold), + } + + srv := server.New(agents, *defaultAgent) + log.Printf("forge-agent serving %d agents on %s (upstream model %s, default agent %q)", len(agents), *addr, *model, *defaultAgent) + log.Printf("point your client at: export OPENAI_BASE_URL=http://localhost%s/v1", *addr) + if err := http.ListenAndServe(*addr, srv); err != nil { + log.Fatal(err) + } +} + +func mustAgent(provider forge.Provider, scaffold string) *forge.Agent { + agent, err := forge.NewAgent(forge.Config{ + Provider: provider, + SystemPrompt: scaffold, + DisableMemory: true, // OpenAI clients are stateless; they resend history + }) + if err != nil { + log.Fatal(err) + } + return agent +} diff --git a/go.mod b/go.mod index 3131dc8..f1e68d4 100644 --- a/go.mod +++ b/go.mod @@ -2,11 +2,15 @@ module github.com/katasec/forge go 1.25.6 +require ( + github.com/google/uuid v1.6.0 + github.com/invopop/jsonschema v0.13.0 + github.com/klauspost/compress v1.18.6 +) + require ( github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect - github.com/google/uuid v1.6.0 // indirect - github.com/invopop/jsonschema v0.13.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index dfa246e..1a4991b 100644 --- a/go.sum +++ b/go.sum @@ -2,15 +2,24 @@ github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPn github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E= github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao= +github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/scripts/demo.sh b/scripts/demo.sh new file mode 100755 index 0000000..4905b64 --- /dev/null +++ b/scripts/demo.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +# Exercise the forge-agent serve path end-to-end against a running server. +# +# Start the server first (see scripts/serve.ps1), then run this. No API key is +# needed here — we only curl localhost; the server holds the upstream key. +# +# ./scripts/demo.sh # default http://localhost:8787/v1 +# ./scripts/demo.sh http://localhost:9000/v1 +set -euo pipefail + +BASE="${1:-http://localhost:8787/v1}" +PROMPT='Review a small Go HTTP server package and suggest the single next useful improvement.' + +pp() { python3 -c 'import sys,json; d=json.load(sys.stdin); print(d.get("choices",[{}])[0].get("message",{}).get("content", d))'; } + +echo "== GET $BASE/models ==" +curl -s "$BASE/models" | python3 -m json.tool + +for model in vanilla_reviewer forged_reviewer; do + echo + echo "== POST $BASE/chat/completions (model=$model) ==" + curl -s "$BASE/chat/completions" \ + -H 'Content-Type: application/json' \ + -d "{\"model\":\"$model\",\"messages\":[{\"role\":\"user\",\"content\":\"$PROMPT\"}]}" \ + | pp +done + +echo +echo "== streaming (model=forged_reviewer, stream=true) ==" +curl -sN "$BASE/chat/completions" \ + -H 'Content-Type: application/json' \ + -d "{\"model\":\"forged_reviewer\",\"messages\":[{\"role\":\"user\",\"content\":\"$PROMPT\"}],\"stream\":true}" +echo diff --git a/scripts/serve.ps1 b/scripts/serve.ps1 new file mode 100755 index 0000000..ff0ef93 --- /dev/null +++ b/scripts/serve.ps1 @@ -0,0 +1,31 @@ +#!/usr/bin/env pwsh +# Launch forge-agent behind a local OpenAI-compatible endpoint. +# +# Reads OPENAI_API_KEY from your environment (your pwsh profile). Runs in the +# foreground so you see logs; press Ctrl-C to stop. +# +# ./scripts/serve.ps1 # :8799, model gpt-5.4-nano +# ./scripts/serve.ps1 -Addr :9000 # custom port +# ./scripts/serve.ps1 -BaseUrl https://api.x.ai/v1 # point upstream at xAI +# +# Then, in another terminal: ./scripts/demo.sh +param( + [string]$Addr = ':8787', + [string]$Model = 'gpt-5.4-nano', + [string]$BaseUrl = '' +) +$ErrorActionPreference = 'Stop' +Set-Location (Split-Path $PSScriptRoot -Parent) + +if (-not $env:OPENAI_API_KEY) { + Write-Error 'OPENAI_API_KEY is not set in this environment.' + exit 1 +} + +Write-Host "forge-agent -> http://localhost$Addr/v1 (upstream model $Model)" +Write-Host "verify with: ./scripts/demo.sh (or: curl http://localhost$Addr/v1/models)" +Write-Host '' + +$goArgs = @('run', './cmd/forge-agent', '--addr', $Addr, '--model', $Model) +if ($BaseUrl) { $goArgs += @('--base-url', $BaseUrl) } +& go @goArgs diff --git a/server/responses.go b/server/responses.go new file mode 100644 index 0000000..4784cd2 --- /dev/null +++ b/server/responses.go @@ -0,0 +1,234 @@ +package server + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "time" + + "github.com/google/uuid" + "github.com/katasec/forge" +) + +// This file implements the subset of the OpenAI Responses API that lets a host +// CLI which speaks Responses (e.g. Codex) point at a forged agent. Like the +// Chat Completions path, the Engine owns the loop: we map model -> agent, run +// the agent, and return the final assistant text. Inbound tools and the host's +// own instructions are not yet relayed (that is the passthrough / graduation +// work); the agent's scaffold remains the authoritative system prompt. + +// --- request --- + +type responsesRequest struct { + Model string `json:"model"` + Instructions string `json:"instructions,omitempty"` + Input responsesInput `json:"input"` + Stream bool `json:"stream,omitempty"` +} + +// responsesInput decodes the Responses "input" field, which may be a bare +// string or an array of items (messages, plus tool/reasoning items we skip). +type responsesInput struct { + messages []forge.Message +} + +func (ri *responsesInput) UnmarshalJSON(b []byte) error { + var s string + if err := json.Unmarshal(b, &s); err == nil { + ri.messages = []forge.Message{forge.UserText(s)} + return nil + } + var items []struct { + Type string `json:"type"` + Role string `json:"role"` + Content contentField `json:"content"` + } + if err := json.Unmarshal(b, &items); err != nil { + // Tolerate unexpected shapes rather than failing the whole request; the + // handler logs the raw body so the parser can be tightened to match. + return nil + } + for _, it := range items { + // Keep message items (type "message" or unset); skip function_call, + // function_call_output, reasoning, etc. + if it.Type != "" && it.Type != "message" { + continue + } + if it.Content.text == "" { + continue + } + ri.messages = append(ri.messages, forge.Message{ + Role: toForgeRole(it.Role), + Content: []forge.ContentBlock{forge.Text(it.Content.text)}, + }) + } + return nil +} + +// --- response object (shared by JSON reply and streaming events) --- + +type responsesResponse struct { + ID string `json:"id"` + Object string `json:"object"` + CreatedAt int64 `json:"created_at"` + Status string `json:"status"` + Model string `json:"model"` + Output []responsesItem `json:"output"` + Usage *responsesUsage `json:"usage,omitempty"` + Error any `json:"error"` +} + +type responsesItem struct { + ID string `json:"id"` + Type string `json:"type"` + Status string `json:"status"` + Role string `json:"role"` + Content []responsesPart `json:"content"` +} + +type responsesPart struct { + Type string `json:"type"` + Text string `json:"text"` + Annotations []any `json:"annotations"` +} + +type responsesUsage struct { + InputTokens int `json:"input_tokens"` + OutputTokens int `json:"output_tokens"` + TotalTokens int `json:"total_tokens"` +} + +func (s *Server) handleResponses(w http.ResponseWriter, r *http.Request) { + data, err := readBody(r) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid_request_error", "could not read request body") + return + } + + var req responsesRequest + if err := json.Unmarshal(data, &req); err != nil { + log.Printf("responses: decode error: %v; raw=%s", err, truncate(data, 2000)) + writeError(w, http.StatusBadRequest, "invalid_request_error", "could not parse request body") + return + } + + log.Printf("responses: model=%q stream=%v messages=%d", req.Model, req.Stream, len(req.Input.messages)) + if len(req.Input.messages) == 0 { + // We accepted the body but extracted no user content — log the raw + // payload so the parser can be matched to Codex's exact schema. + log.Printf("responses: WARNING parsed 0 messages; raw=%s", truncate(data, 2000)) + } + + agent, resolved, ok := s.resolve(req.Model) + if !ok { + writeError(w, http.StatusNotFound, "invalid_request_error", + fmt.Sprintf("model %q does not exist", req.Model)) + return + } + if resolved != req.Model { + log.Printf("responses: model %q -> agent %q (default)", req.Model, resolved) + } + + resp, err := agent.Run(r.Context(), forge.AgentRequest{Messages: req.Input.messages}) + if err != nil { + writeError(w, http.StatusBadGateway, "upstream_error", err.Error()) + return + } + + id := "resp_" + uuid.NewString() + itemID := "msg_" + uuid.NewString() + created := time.Now().Unix() + text := resp.LastText() + usage := &responsesUsage{ + InputTokens: resp.Usage.InputTokens, + OutputTokens: resp.Usage.OutputTokens, + TotalTokens: totalTokens(resp.Usage), + } + + completed := responsesResponse{ + ID: id, + Object: "response", + CreatedAt: created, + Status: "completed", + Model: req.Model, + Output: []responsesItem{{ + ID: itemID, + Type: "message", + Status: "completed", + Role: "assistant", + Content: []responsesPart{{Type: "output_text", Text: text, Annotations: []any{}}}, + }}, + Usage: usage, + } + + if req.Stream { + writeResponsesStream(w, completed, itemID, text) + return + } + writeJSON(w, http.StatusOK, completed) +} + +// writeResponsesStream emits the Responses streaming protocol: named SSE events +// terminating in response.completed. The Engine runs to completion first, so +// the text is delivered as a single output_text delta rather than token by +// token; the event envelope is what Codex needs to consume a streamed turn. +func writeResponsesStream(w http.ResponseWriter, completed responsesResponse, itemID, text string) { + flusher, ok := w.(http.Flusher) + if !ok { + writeError(w, http.StatusInternalServerError, "server_error", "streaming unsupported") + return + } + + h := w.Header() + h.Set("Content-Type", "text/event-stream") + h.Set("Cache-Control", "no-cache") + h.Set("Connection", "keep-alive") + w.WriteHeader(http.StatusOK) + + seq := 0 + send := func(eventType string, data map[string]any) { + data["type"] = eventType + data["sequence_number"] = seq + seq++ + b, _ := json.Marshal(data) + fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, b) + flusher.Flush() + } + + // In-progress snapshot: same response, empty output, status in_progress. + inProgress := completed + inProgress.Status = "in_progress" + inProgress.Output = []responsesItem{} + inProgress.Usage = nil + + emptyItem := responsesItem{ID: itemID, Type: "message", Status: "in_progress", Role: "assistant", Content: []responsesPart{}} + finalItem := completed.Output[0] + + send("response.created", map[string]any{"response": inProgress}) + send("response.in_progress", map[string]any{"response": inProgress}) + send("response.output_item.added", map[string]any{"output_index": 0, "item": emptyItem}) + send("response.content_part.added", map[string]any{ + "item_id": itemID, "output_index": 0, "content_index": 0, + "part": responsesPart{Type: "output_text", Text: "", Annotations: []any{}}, + }) + send("response.output_text.delta", map[string]any{ + "item_id": itemID, "output_index": 0, "content_index": 0, "delta": text, + }) + send("response.output_text.done", map[string]any{ + "item_id": itemID, "output_index": 0, "content_index": 0, "text": text, + }) + send("response.content_part.done", map[string]any{ + "item_id": itemID, "output_index": 0, "content_index": 0, + "part": responsesPart{Type: "output_text", Text: text, Annotations: []any{}}, + }) + send("response.output_item.done", map[string]any{"output_index": 0, "item": finalItem}) + send("response.completed", map[string]any{"response": completed}) +} + +func truncate(b []byte, n int) string { + if len(b) <= n { + return string(b) + } + return string(b[:n]) + "…(truncated)" +} diff --git a/server/responses_test.go b/server/responses_test.go new file mode 100644 index 0000000..1269819 --- /dev/null +++ b/server/responses_test.go @@ -0,0 +1,200 @@ +package server + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/klauspost/compress/zstd" +) + +func TestResponsesNonStreaming(t *testing.T) { + prov := &recordingProvider{reply: "a grounded review"} + srv := newTestServer(t, prov) + + // Responses-style input: array of message items with input_text parts. + body := `{"model":"forged_reviewer","instructions":"codex system prompt", + "input":[{"type":"message","role":"user","content":[{"type":"input_text","text":"review this repo"}]}]}` + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(body))) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200 (%s)", rec.Code, rec.Body.String()) + } + + var resp responsesResponse + if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.Object != "response" || resp.Status != "completed" { + t.Errorf("object/status = %q/%q", resp.Object, resp.Status) + } + if len(resp.Output) != 1 || len(resp.Output[0].Content) != 1 { + t.Fatalf("unexpected output: %+v", resp.Output) + } + if got := resp.Output[0].Content[0].Text; got != "a grounded review" { + t.Errorf("output text = %q", got) + } + if resp.Output[0].Content[0].Type != "output_text" { + t.Errorf("part type = %q, want output_text", resp.Output[0].Content[0].Type) + } + + // The input_text part must have reached the provider. + last := prov.lastReq.Messages[len(prov.lastReq.Messages)-1] + if last.Text() != "review this repo" { + t.Errorf("user message not forwarded: %q", last.Text()) + } + if prov.lastReq.SystemPrompt != "SCAFFOLD" { + t.Errorf("scaffold not forwarded: %q", prov.lastReq.SystemPrompt) + } +} + +func TestResponsesStringInput(t *testing.T) { + prov := &recordingProvider{reply: "ok"} + srv := newTestServer(t, prov) + + body := `{"model":"forged_reviewer","input":"just a string"}` + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(body))) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200 (%s)", rec.Code, rec.Body.String()) + } + last := prov.lastReq.Messages[len(prov.lastReq.Messages)-1] + if last.Text() != "just a string" { + t.Errorf("string input not forwarded: %q", last.Text()) + } +} + +func TestResponsesStreaming(t *testing.T) { + srv := newTestServer(t, &recordingProvider{reply: "streamed answer"}) + + body := `{"model":"forged_reviewer","input":"go","stream":true}` + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(body))) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + if ct := rec.Header().Get("Content-Type"); ct != "text/event-stream" { + t.Errorf("content-type = %q", ct) + } + + out := rec.Body.String() + for _, want := range []string{ + "event: response.created", + "event: response.output_text.delta", + `"delta":"streamed answer"`, + "event: response.completed", + `"status":"completed"`, + } { + if !strings.Contains(out, want) { + t.Errorf("stream missing %q in:\n%s", want, out) + } + } + + // Every event should carry an incrementing sequence_number. + if !strings.Contains(out, `"sequence_number":0`) || !strings.Contains(out, `"sequence_number":1`) { + t.Errorf("missing sequence numbers in:\n%s", out) + } +} + +// TestResponsesZstdEncodedBody reproduces what Codex actually sends: a +// zstd-compressed request body with Content-Encoding: zstd. +func TestResponsesZstdEncodedBody(t *testing.T) { + prov := &recordingProvider{reply: "decompressed ok"} + srv := newTestServer(t, prov) + + payload := `{"model":"forged_reviewer","input":[{"type":"message","role":"user","content":[{"type":"input_text","text":"hello zstd"}]}]}` + enc, err := zstd.NewWriter(nil) + if err != nil { + t.Fatalf("zstd writer: %v", err) + } + compressed := enc.EncodeAll([]byte(payload), nil) + enc.Close() + + req := httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(compressed)) + req.Header.Set("Content-Encoding", "zstd") + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200 (%s)", rec.Code, rec.Body.String()) + } + last := prov.lastReq.Messages[len(prov.lastReq.Messages)-1] + if last.Text() != "hello zstd" { + t.Errorf("zstd body not decoded: %q", last.Text()) + } +} + +// TestResponsesZstdSniffedNoHeader covers a compressed body with no +// Content-Encoding header — decoded via magic-byte sniffing. +func TestResponsesZstdSniffedNoHeader(t *testing.T) { + prov := &recordingProvider{reply: "ok"} + srv := newTestServer(t, prov) + + payload := `{"model":"forged_reviewer","input":"sniffed"}` + enc, _ := zstd.NewWriter(nil) + compressed := enc.EncodeAll([]byte(payload), nil) + enc.Close() + + req := httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(compressed)) + // deliberately no Content-Encoding header + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200 (%s)", rec.Code, rec.Body.String()) + } + last := prov.lastReq.Messages[len(prov.lastReq.Messages)-1] + if last.Text() != "sniffed" { + t.Errorf("sniffed zstd body not decoded: %q", last.Text()) + } +} + +// TestResponsesDefaultAgentFallback reproduces the GUI case: a host that sends +// its own model id (e.g. "gpt-5.5") should fall back to the default agent +// rather than 404. +func TestResponsesDefaultAgentFallback(t *testing.T) { + prov := &recordingProvider{reply: "ok"} + srv := newTestServerWithDefault(t, prov, "forged_reviewer") + + body := `{"model":"gpt-5.5","input":"hi from the GUI"}` + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(body))) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200 (fallback should apply) (%s)", rec.Code, rec.Body.String()) + } + // The scaffold of the default agent must still be applied. + if prov.lastReq.SystemPrompt != "SCAFFOLD" { + t.Errorf("default agent scaffold not applied: %q", prov.lastReq.SystemPrompt) + } +} + +func TestResponsesUnknownModelNoDefault(t *testing.T) { + srv := newTestServer(t, &recordingProvider{reply: "ok"}) // no default + + body := `{"model":"gpt-5.5","input":"hi"}` + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(body))) + + if rec.Code != http.StatusNotFound { + t.Fatalf("status = %d, want 404 (strict, no default)", rec.Code) + } +} + +func TestResponsesUnknownModel(t *testing.T) { + srv := newTestServer(t, &recordingProvider{reply: "ok"}) + + body := `{"model":"nope","input":"hi"}` + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(body))) + + if rec.Code != http.StatusNotFound { + t.Fatalf("status = %d, want 404", rec.Code) + } +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..2a1cc3d --- /dev/null +++ b/server/server.go @@ -0,0 +1,425 @@ +// Package server exposes a forge Agent behind an OpenAI-compatible HTTP API. +// +// It implements the minimal surface needed to point an existing OpenAI client +// (set OPENAI_BASE_URL) at a forged agent: GET /v1/models lists the available +// agents as models, and POST /v1/chat/completions runs the named agent's full +// loop (the Engine owns the loop) and returns an OpenAI chat completion. +// +// The agent name is the OpenAI "model" field. The server is stateless: an +// OpenAI client sends the full message history on every call, so the agents it +// serves should be created with forge.Config{DisableMemory: true}. +package server + +import ( + "bytes" + "compress/flate" + "compress/gzip" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "strings" + "time" + + "github.com/google/uuid" + "github.com/katasec/forge" + "github.com/klauspost/compress/zstd" +) + +// Server routes OpenAI-compatible requests to forge Agents keyed by model name. +type Server struct { + agents map[string]*forge.Agent + defaultAgent string + mux *http.ServeMux +} + +// New builds a Server that serves the given agents. The map key is the model +// name a client requests (e.g. "forged_reviewer"). +// +// defaultAgent names the agent to use when a client requests a model that is +// not a known agent — host GUIs/CLIs send their own model ids (e.g. "gpt-5.5") +// that won't match an agent name. If defaultAgent is "" or unknown, an +// unrecognized model returns 404 (strict mode). +func New(agents map[string]*forge.Agent, defaultAgent string) *Server { + s := &Server{ + agents: agents, + defaultAgent: defaultAgent, + mux: http.NewServeMux(), + } + s.mux.HandleFunc("GET /v1/models", s.handleModels) + s.mux.HandleFunc("POST /v1/chat/completions", s.handleChatCompletions) + s.mux.HandleFunc("POST /v1/responses", s.handleResponses) + return s +} + +// resolve maps a requested model name to an agent, falling back to the +// configured default agent for unrecognized model ids. It returns the agent and +// the resolved agent name, or ok=false when no agent applies. +func (s *Server) resolve(model string) (*forge.Agent, string, bool) { + if a, ok := s.agents[model]; ok { + return a, model, true + } + if s.defaultAgent != "" { + if a, ok := s.agents[s.defaultAgent]; ok { + return a, s.defaultAgent, true + } + } + return nil, "", false +} + +// ServeHTTP implements http.Handler. It logs each request's method, path, and +// status so the serve path is debuggable when a host CLI (e.g. Codex) points at +// it — an unexpected 404 on /v1/responses immediately shows a wire-API mismatch. +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + sw := &statusWriter{ResponseWriter: w, status: http.StatusOK} + s.mux.ServeHTTP(sw, r) + log.Printf("%s %s -> %d", r.Method, r.URL.Path, sw.status) +} + +// statusWriter captures the response status for logging while preserving the +// http.Flusher behaviour the streaming path depends on. +type statusWriter struct { + http.ResponseWriter + status int +} + +func (w *statusWriter) WriteHeader(code int) { + w.status = code + w.ResponseWriter.WriteHeader(code) +} + +func (w *statusWriter) Flush() { + if f, ok := w.ResponseWriter.(http.Flusher); ok { + f.Flush() + } +} + +// readBody reads and decompresses an HTTP request body. Host CLIs (notably +// Codex) send the request zstd- or gzip-compressed; it honours Content-Encoding +// and falls back to sniffing the magic bytes when the header is absent. +func readBody(r *http.Request) ([]byte, error) { + raw, err := io.ReadAll(r.Body) + if err != nil { + return nil, err + } + + enc := strings.ToLower(strings.TrimSpace(r.Header.Get("Content-Encoding"))) + if enc == "" { + switch { + case len(raw) >= 4 && raw[0] == 0x28 && raw[1] == 0xb5 && raw[2] == 0x2f && raw[3] == 0xfd: + enc = "zstd" + case len(raw) >= 2 && raw[0] == 0x1f && raw[1] == 0x8b: + enc = "gzip" + } + } + + switch enc { + case "zstd": + zr, err := zstd.NewReader(bytes.NewReader(raw)) + if err != nil { + return nil, err + } + defer zr.Close() + return io.ReadAll(zr) + case "gzip": + gz, err := gzip.NewReader(bytes.NewReader(raw)) + if err != nil { + return nil, err + } + defer gz.Close() + return io.ReadAll(gz) + case "deflate": + fr := flate.NewReader(bytes.NewReader(raw)) + defer fr.Close() + return io.ReadAll(fr) + default: + return raw, nil + } +} + +// --- OpenAI wire types (request) --- + +type chatRequest struct { + Model string `json:"model"` + Messages []reqMessage `json:"messages"` + Stream bool `json:"stream,omitempty"` +} + +type reqMessage struct { + Role string `json:"role"` + Content contentField `json:"content"` +} + +// contentField decodes message content that may be either a plain string or an +// array of typed parts. It covers both Chat Completions parts +// ([{type:"text",text:"..."}]) and Responses parts +// ([{type:"input_text"|"output_text",text:"..."}]) by concatenating every +// part's text regardless of part type. +type contentField struct { + text string +} + +func (c *contentField) UnmarshalJSON(b []byte) error { + var s string + if err := json.Unmarshal(b, &s); err == nil { + c.text = s + return nil + } + var parts []struct { + Text string `json:"text"` + } + if err := json.Unmarshal(b, &parts); err == nil { + var sb strings.Builder + for _, p := range parts { + sb.WriteString(p.Text) + } + c.text = sb.String() + return nil + } + // Tolerate unexpected shapes (objects, nulls, non-text parts) rather than + // failing the whole request; best-effort text extraction only. + return nil +} + +// --- OpenAI wire types (response) --- + +type chatResponse struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []respChoice `json:"choices"` + Usage usage `json:"usage"` +} + +type respChoice struct { + Index int `json:"index"` + Message respMessage `json:"message"` + FinishReason string `json:"finish_reason"` +} + +type respMessage struct { + Role string `json:"role"` + Content string `json:"content"` +} + +type usage struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + TotalTokens int `json:"total_tokens"` +} + +// --- streaming wire types --- + +type streamChunk struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []streamChoice `json:"choices"` +} + +type streamChoice struct { + Index int `json:"index"` + Delta delta `json:"delta"` + FinishReason string `json:"finish_reason,omitempty"` +} + +type delta struct { + Role string `json:"role,omitempty"` + Content string `json:"content,omitempty"` +} + +// --- models endpoint --- + +type modelList struct { + Object string `json:"object"` + Data []modelInfo `json:"data"` +} + +type modelInfo struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + OwnedBy string `json:"owned_by"` +} + +func (s *Server) handleModels(w http.ResponseWriter, _ *http.Request) { + now := time.Now().Unix() + list := modelList{Object: "list"} + for name := range s.agents { + list.Data = append(list.Data, modelInfo{ + ID: name, + Object: "model", + Created: now, + OwnedBy: "forge", + }) + } + writeJSON(w, http.StatusOK, list) +} + +func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { + data, err := readBody(r) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid_request_error", "could not read request body") + return + } + var req chatRequest + if err := json.Unmarshal(data, &req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_request_error", "could not parse request body") + return + } + + log.Printf("chat: model=%q stream=%v messages=%d", req.Model, req.Stream, len(req.Messages)) + + agent, resolved, ok := s.resolve(req.Model) + if !ok { + writeError(w, http.StatusNotFound, "invalid_request_error", + fmt.Sprintf("model %q does not exist", req.Model)) + return + } + if resolved != req.Model { + log.Printf("chat: model %q -> agent %q (default)", req.Model, resolved) + } + + resp, err := agent.Run(r.Context(), forge.AgentRequest{ + Messages: translateMessages(req.Messages), + }) + if err != nil { + writeError(w, http.StatusBadGateway, "upstream_error", err.Error()) + return + } + + id := "chatcmpl-" + uuid.NewString() + created := time.Now().Unix() + text := resp.LastText() + finish := toOAIFinish(resp.FinishReason) + + if req.Stream { + writeStream(w, id, created, req.Model, text, finish) + return + } + + writeJSON(w, http.StatusOK, chatResponse{ + ID: id, + Object: "chat.completion", + Created: created, + Model: req.Model, + Choices: []respChoice{{ + Index: 0, + Message: respMessage{Role: "assistant", Content: text}, + FinishReason: finish, + }}, + Usage: usage{ + PromptTokens: resp.Usage.InputTokens, + CompletionTokens: resp.Usage.OutputTokens, + TotalTokens: totalTokens(resp.Usage), + }, + }) +} + +// translateMessages maps OpenAI messages to forge messages 1:1. The agent's own +// scaffold remains the authoritative system prompt; any client-supplied system +// message flows through as a forge system message. +func translateMessages(msgs []reqMessage) []forge.Message { + out := make([]forge.Message, 0, len(msgs)) + for _, m := range msgs { + out = append(out, forge.Message{ + Role: toForgeRole(m.Role), + Content: []forge.ContentBlock{forge.Text(m.Content.text)}, + }) + } + return out +} + +func toForgeRole(r string) forge.Role { + switch r { + case "system": + return forge.RoleSystem + case "assistant": + return forge.RoleAssistant + case "tool": + return forge.RoleTool + default: + return forge.RoleUser + } +} + +func toOAIFinish(r forge.FinishReason) string { + switch r { + case forge.FinishReasonIterLimit: + return "length" + default: + // stop, error, and tool_use (already resolved by the loop) all present + // as a completed turn to the client. + return "stop" + } +} + +func totalTokens(u forge.TokenUsage) int { + if u.TotalTokens > 0 { + return u.TotalTokens + } + return u.InputTokens + u.OutputTokens +} + +// writeStream emits the completed response as SSE chat.completion.chunk frames. +// +// The Engine runs the loop to completion before this is called, so the content +// is delivered in a single delta rather than token-by-token. The endpoint still +// speaks SSE so that clients requiring stream:true work; true token streaming +// depends on a streaming Provider, which the Engine does not yet expose. +func writeStream(w http.ResponseWriter, id string, created int64, model, text, finish string) { + flusher, ok := w.(http.Flusher) + if !ok { + writeError(w, http.StatusInternalServerError, "server_error", "streaming unsupported") + return + } + + h := w.Header() + h.Set("Content-Type", "text/event-stream") + h.Set("Cache-Control", "no-cache") + h.Set("Connection", "keep-alive") + w.WriteHeader(http.StatusOK) + + send := func(c streamChunk) { + b, _ := json.Marshal(c) + fmt.Fprintf(w, "data: %s\n\n", b) + flusher.Flush() + } + + base := func() streamChunk { + return streamChunk{ID: id, Object: "chat.completion.chunk", Created: created, Model: model} + } + + role := base() + role.Choices = []streamChoice{{Index: 0, Delta: delta{Role: "assistant"}}} + send(role) + + content := base() + content.Choices = []streamChoice{{Index: 0, Delta: delta{Content: text}}} + send(content) + + stop := base() + stop.Choices = []streamChoice{{Index: 0, Delta: delta{}, FinishReason: finish}} + send(stop) + + fmt.Fprint(w, "data: [DONE]\n\n") + flusher.Flush() +} + +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(v) +} + +func writeError(w http.ResponseWriter, status int, errType, message string) { + writeJSON(w, status, map[string]any{ + "error": map[string]any{ + "message": message, + "type": errType, + }, + }) +} diff --git a/server/server_test.go b/server/server_test.go new file mode 100644 index 0000000..7457578 --- /dev/null +++ b/server/server_test.go @@ -0,0 +1,178 @@ +package server + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/katasec/forge" +) + +// recordingProvider is a stub forge.Provider that returns a canned assistant +// reply and records the request it was given, so tests can prove the HTTP layer +// translated and forwarded correctly. +type recordingProvider struct { + reply string + lastReq forge.ProviderRequest + gotCalls int +} + +func (p *recordingProvider) Generate(_ context.Context, req forge.ProviderRequest) (*forge.ProviderResponse, error) { + p.lastReq = req + p.gotCalls++ + return &forge.ProviderResponse{ + Messages: []forge.Message{forge.AssistantText(p.reply)}, + FinishReason: forge.FinishReasonStop, + Usage: forge.TokenUsage{InputTokens: 11, OutputTokens: 7}, + }, nil +} + +func newTestServer(t *testing.T, prov forge.Provider) *Server { + t.Helper() + agent, err := forge.NewAgent(forge.Config{ + Provider: prov, + SystemPrompt: "SCAFFOLD", + DisableMemory: true, + }) + if err != nil { + t.Fatalf("NewAgent: %v", err) + } + return New(map[string]*forge.Agent{"forged_reviewer": agent}, "") +} + +// newTestServerWithDefault is like newTestServer but configures a default agent +// so unknown model ids fall back instead of 404ing. +func newTestServerWithDefault(t *testing.T, prov forge.Provider, def string) *Server { + t.Helper() + agent, err := forge.NewAgent(forge.Config{ + Provider: prov, + SystemPrompt: "SCAFFOLD", + DisableMemory: true, + }) + if err != nil { + t.Fatalf("NewAgent: %v", err) + } + return New(map[string]*forge.Agent{"forged_reviewer": agent}, def) +} + +func TestModelsListsAgents(t *testing.T) { + srv := newTestServer(t, &recordingProvider{reply: "ok"}) + + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/v1/models", nil)) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + var list modelList + if err := json.Unmarshal(rec.Body.Bytes(), &list); err != nil { + t.Fatalf("decode: %v", err) + } + if list.Object != "list" || len(list.Data) != 1 || list.Data[0].ID != "forged_reviewer" { + t.Fatalf("unexpected model list: %+v", list) + } +} + +func TestChatCompletionsRunsAgent(t *testing.T) { + prov := &recordingProvider{reply: "a grounded review"} + srv := newTestServer(t, prov) + + body := `{"model":"forged_reviewer","messages":[{"role":"user","content":"review this repo"}]}` + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(body))) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200 (%s)", rec.Code, rec.Body.String()) + } + + var resp chatResponse + if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.Object != "chat.completion" { + t.Errorf("object = %q", resp.Object) + } + if len(resp.Choices) != 1 || resp.Choices[0].Message.Content != "a grounded review" { + t.Fatalf("unexpected choices: %+v", resp.Choices) + } + if resp.Choices[0].FinishReason != "stop" { + t.Errorf("finish_reason = %q, want stop", resp.Choices[0].FinishReason) + } + if resp.Usage.PromptTokens != 11 || resp.Usage.CompletionTokens != 7 || resp.Usage.TotalTokens != 18 { + t.Errorf("usage = %+v", resp.Usage) + } + + // Prove the loop actually ran and translation reached the provider. + if prov.gotCalls != 1 { + t.Errorf("provider calls = %d, want 1", prov.gotCalls) + } + if prov.lastReq.SystemPrompt != "SCAFFOLD" { + t.Errorf("scaffold system prompt not forwarded: %q", prov.lastReq.SystemPrompt) + } + last := prov.lastReq.Messages[len(prov.lastReq.Messages)-1] + if last.Text() != "review this repo" { + t.Errorf("user message not forwarded: %q", last.Text()) + } +} + +func TestChatCompletionsUnknownModel(t *testing.T) { + srv := newTestServer(t, &recordingProvider{reply: "ok"}) + + body := `{"model":"nope","messages":[{"role":"user","content":"hi"}]}` + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(body))) + + if rec.Code != http.StatusNotFound { + t.Fatalf("status = %d, want 404", rec.Code) + } +} + +func TestChatCompletionsContentArrayForm(t *testing.T) { + prov := &recordingProvider{reply: "ok"} + srv := newTestServer(t, prov) + + // Some clients send content as an array of typed parts. + body := `{"model":"forged_reviewer","messages":[{"role":"user","content":[{"type":"text","text":"part one "},{"type":"text","text":"part two"}]}]}` + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(body))) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200 (%s)", rec.Code, rec.Body.String()) + } + last := prov.lastReq.Messages[len(prov.lastReq.Messages)-1] + if last.Text() != "part one part two" { + t.Errorf("array content not joined: %q", last.Text()) + } +} + +func TestChatCompletionsStreaming(t *testing.T) { + srv := newTestServer(t, &recordingProvider{reply: "streamed answer"}) + + body := `{"model":"forged_reviewer","messages":[{"role":"user","content":"go"}],"stream":true}` + rec := httptest.NewRecorder() + srv.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(body))) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + if ct := rec.Header().Get("Content-Type"); ct != "text/event-stream" { + t.Errorf("content-type = %q, want text/event-stream", ct) + } + + out := rec.Body.String() + if !strings.Contains(out, `"object":"chat.completion.chunk"`) { + t.Errorf("missing chunk object in stream:\n%s", out) + } + if !strings.Contains(out, `"content":"streamed answer"`) { + t.Errorf("missing content delta in stream:\n%s", out) + } + if !strings.Contains(out, `"finish_reason":"stop"`) { + t.Errorf("missing finish_reason in stream:\n%s", out) + } + if !strings.HasSuffix(strings.TrimSpace(out), "data: [DONE]") { + t.Errorf("stream did not end with [DONE]:\n%s", out) + } +}