Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions server/internal/handler/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,7 @@ func logHeartbeatEndpointSlow(runtimeID, outcome, authPath string, start time.Ti
// logClaimEndpointSlow emits one structured log when the /tasks/claim endpoint
// exceeds 500ms, splitting auth / claim / response-build phases so the prod
// tail can be diagnosed without flooding logs at normal poll rates.
func logClaimEndpointSlow(runtimeID, outcome string, start time.Time, authMs, claimMs, buildMs int64) {
func logClaimEndpointSlow(runtimeID, outcome string, start time.Time, authMs, claimMs, buildMs int64, payloadBytes, agentSkillCount, builtinSkillCount, skillPayloadBytes int) {
totalMs := time.Since(start).Milliseconds()
if totalMs < 500 {
return
Expand All @@ -1147,6 +1147,10 @@ func logClaimEndpointSlow(runtimeID, outcome string, start time.Time, authMs, cl
"auth_ms", authMs,
"claim_ms", claimMs,
"build_ms", buildMs,
"payload_bytes", payloadBytes,
"agent_skill_count", agentSkillCount,
"builtin_skill_count", builtinSkillCount,
"skill_payload_bytes", skillPayloadBytes,
)
}

Expand All @@ -1159,6 +1163,10 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
var (
outcome = "unauth"
authMs, claimMs, buildMs int64
payloadBytes int
agentSkillCount int
builtinSkillCount int
skillPayloadBytes int
buildStart time.Time
)
defer func() {
Expand All @@ -1168,7 +1176,7 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
if !buildStart.IsZero() {
buildMs = time.Since(buildStart).Milliseconds()
}
logClaimEndpointSlow(runtimeID, outcome, start, authMs, claimMs, buildMs)
logClaimEndpointSlow(runtimeID, outcome, start, authMs, claimMs, buildMs, payloadBytes, agentSkillCount, builtinSkillCount, skillPayloadBytes)
}()

// Verify the caller owns this runtime's workspace. The runtime's
Expand All @@ -1194,7 +1202,7 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {

if task == nil {
slog.Debug("no task to claim", "runtime_id", runtimeID)
writeJSON(w, http.StatusOK, map[string]any{"task": nil})
payloadBytes, _ = writeMeasuredJSON(w, http.StatusOK, map[string]any{"task": nil})
outcome = "no_task"
return
}
Expand All @@ -1209,7 +1217,10 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
// names carry a "multica-" prefix so their on-disk slugs never collide
// with a user-authored workspace skill (see writeSkillFiles).
skills := h.TaskService.LoadAgentSkills(r.Context(), task.AgentID)
skills = append(skills, h.TaskService.BuiltinSkills()...)
agentSkillCount = len(skills)
builtinSkills := h.TaskService.BuiltinSkills()
builtinSkillCount = len(builtinSkills)
skills = append(skills, builtinSkills...)
var customEnv map[string]string
if agent.CustomEnv != nil {
if err := json.Unmarshal(agent.CustomEnv, &customEnv); err != nil {
Expand Down Expand Up @@ -1800,7 +1811,12 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
resp.AuthToken = tokenStr

slog.Info("task claimed by runtime", "task_id", uuidToString(task.ID), "runtime_id", runtimeID, "agent_id", uuidToString(task.AgentID), "prior_session", resp.PriorSessionID)
writeJSON(w, http.StatusOK, map[string]any{"task": resp})
if resp.Agent != nil && len(resp.Agent.Skills) > 0 {
if skillPayload, err := json.Marshal(resp.Agent.Skills); err == nil {
skillPayloadBytes = len(skillPayload)
}
}
payloadBytes, _ = writeMeasuredJSON(w, http.StatusOK, map[string]any{"task": resp})
}

// trailingUserMessages returns the run of user messages after the last
Expand Down
24 changes: 24 additions & 0 deletions server/internal/handler/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
Expand All @@ -22,6 +23,29 @@ import (
"github.com/multica-ai/multica/server/pkg/protocol"
)

func TestLogClaimEndpointSlowIncludesPayloadFields(t *testing.T) {
var logs bytes.Buffer
prev := slog.Default()
slog.SetDefault(slog.New(slog.NewTextHandler(&logs, &slog.HandlerOptions{Level: slog.LevelInfo})))
t.Cleanup(func() { slog.SetDefault(prev) })

logClaimEndpointSlow("runtime-1", "claimed", time.Now().Add(-600*time.Millisecond), 10, 20, 30, 4096, 2, 8, 3072)

got := logs.String()
for _, want := range []string{
"msg=\"claim_endpoint slow\"",
"runtime_id=runtime-1",
"payload_bytes=4096",
"agent_skill_count=2",
"builtin_skill_count=8",
"skill_payload_bytes=3072",
} {
if !strings.Contains(got, want) {
t.Fatalf("slow claim log missing %q in %s", want, got)
}
}
}

// slowProbeLocalSkillListStore wraps a LocalSkillListStore but blocks inside
// HasPending until the provided context is cancelled. PopPending delegates
// to the underlying store. Used to verify that a stalled probe cannot wedge
Expand Down
18 changes: 18 additions & 0 deletions server/internal/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,24 @@ func writeJSON(w http.ResponseWriter, status int, v any) {
json.NewEncoder(w).Encode(v)
}

// writeMeasuredJSON behaves like writeJSON but returns the encoded body size so
// callers can record payload bytes in slow-endpoint diagnostics. It measures the
// uncompressed JSON length and is unrelated to transport compression.
func writeMeasuredJSON(w http.ResponseWriter, status int, v any) (int, error) {
body, err := json.Marshal(v)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to encode response")
return 0, err
}
body = append(body, '\n')
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
if _, err := w.Write(body); err != nil {
return len(body), err
}
return len(body), nil
}

func writeError(w http.ResponseWriter, status int, msg string) {
writeJSON(w, status, map[string]string{"error": msg})
}
Expand Down
99 changes: 99 additions & 0 deletions server/internal/handler/handler_writejson_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package handler

import (
"bytes"
"net/http"
"net/http/httptest"
"testing"
)

// TestWriteMeasuredJSONByteIdenticalToWriteJSON locks the load-bearing assumption
// behind the F2 claim-observability patch: swapping writeJSON for writeMeasuredJSON
// at the /tasks/claim response sites must not change a single byte on the wire.
//
// writeJSON encodes via json.NewEncoder(w).Encode (which appends a trailing
// newline); writeMeasuredJSON marshals via json.Marshal and appends the newline
// by hand. Both HTML-escape by default, so the emitted bytes must match for every
// input. This table-driven test fails closed if that invariant ever drifts, so the
// "no wire-behavior change" claim is provable rather than reasoned.
func TestWriteMeasuredJSONByteIdenticalToWriteJSON(t *testing.T) {
type skill struct {
Name string `json:"name"`
Description string `json:"description"`
Files map[string]string `json:"files"`
}
type claimResp struct {
ID string `json:"id"`
Name string `json:"name"`
Skills []skill `json:"skills"`
Args []string `json:"args"`
}

cases := []struct {
name string
v any
}{
{"nil", nil},
{"no_task", map[string]any{"task": nil}},
{"empty_map", map[string]any{}},
{"empty_slice", []string{}},
{"scalar_string", "plain string"},
{"scalar_bool", true},
{"numbers", map[string]any{"i": 42, "f": 3.5, "neg": -17, "big": 1234567890123}},
{"html_escapable", map[string]any{"s": `a<b> & "c" 'd' <script>`}},
{"ampersand_lt_gt", map[string]any{"raw": "1 < 2 && 3 > 2"}},
{"unicode_and_separators", map[string]any{"s": "héllo 世界 🚀 

"}},
{"nested", map[string]any{"a": []any{1, "two", true, nil}, "b": map[string]any{"c": []int{1, 2, 3}}}},
{"large_claim_with_skills", map[string]any{"task": claimResp{
ID: "11111111-2222-3333-4444-555555555555",
Name: "agent <CC> & friends",
Skills: []skill{
{Name: "multica-working-on-issues", Description: "do work <safely> & well", Files: map[string]string{"SKILL.md": "# Title\n<b>x</b> & y"}},
{Name: "multica-mentioning", Description: "ping people", Files: map[string]string{"SKILL.md": "line1\nline2"}},
},
Args: []string{"--flag", "a<b", "c&d"},
}}},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
recEnc := httptest.NewRecorder()
writeJSON(recEnc, http.StatusOK, tc.v)

recMeasured := httptest.NewRecorder()
n, err := writeMeasuredJSON(recMeasured, http.StatusOK, tc.v)
if err != nil {
t.Fatalf("writeMeasuredJSON returned error: %v", err)
}

encBody := recEnc.Body.Bytes()
measuredBody := recMeasured.Body.Bytes()

if !bytes.Equal(encBody, measuredBody) {
t.Fatalf("wire bytes differ:\n writeJSON = %q\n writeMeasuredJSON = %q", encBody, measuredBody)
}
if n != len(measuredBody) {
t.Fatalf("reported payload bytes %d != actual body length %d", n, len(measuredBody))
}
if n != len(encBody) {
t.Fatalf("reported payload bytes %d != writeJSON body length %d", n, len(encBody))
}
if recEnc.Code != recMeasured.Code {
t.Fatalf("status code differs: writeJSON=%d writeMeasuredJSON=%d", recEnc.Code, recMeasured.Code)
}
if got, want := recMeasured.Header().Get("Content-Type"), recEnc.Header().Get("Content-Type"); got != want {
t.Fatalf("Content-Type differs: writeMeasuredJSON=%q writeJSON=%q", got, want)
}
})
}

// Sanity guard: both encoders HTML-escape by default, so a literal '<' rune must
// not survive into the body (it is emitted as the escaped form). This documents
// the escaping behaviour that makes the byte-identity comparison meaningful,
// without depending on the escaped literal appearing in source.
rec := httptest.NewRecorder()
writeJSON(rec, http.StatusOK, map[string]string{"x": "<&>"})
if bytes.ContainsRune(rec.Body.Bytes(), '<') {
t.Fatalf("expected '<' to be HTML-escaped out of the body, got %q", rec.Body.String())
}
}
Loading