diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index e593868339..78775541b2 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -1135,7 +1135,7 @@ func logHeartbeatEndpointSlow(runtimeID, outcome, authPath string, start time.Ti // logClaimEndpointSlow emits one structured log when the /tasks/claim endpoint // exceeds 500ms, splitting auth / claim / response-build phases so the prod // tail can be diagnosed without flooding logs at normal poll rates. -func logClaimEndpointSlow(runtimeID, outcome string, start time.Time, authMs, claimMs, buildMs int64) { +func logClaimEndpointSlow(runtimeID, outcome string, start time.Time, authMs, claimMs, buildMs int64, payloadBytes, agentSkillCount, builtinSkillCount, skillPayloadBytes int) { totalMs := time.Since(start).Milliseconds() if totalMs < 500 { return @@ -1147,6 +1147,10 @@ func logClaimEndpointSlow(runtimeID, outcome string, start time.Time, authMs, cl "auth_ms", authMs, "claim_ms", claimMs, "build_ms", buildMs, + "payload_bytes", payloadBytes, + "agent_skill_count", agentSkillCount, + "builtin_skill_count", builtinSkillCount, + "skill_payload_bytes", skillPayloadBytes, ) } @@ -1159,6 +1163,10 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) { var ( outcome = "unauth" authMs, claimMs, buildMs int64 + payloadBytes int + agentSkillCount int + builtinSkillCount int + skillPayloadBytes int buildStart time.Time ) defer func() { @@ -1168,7 +1176,7 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) { if !buildStart.IsZero() { buildMs = time.Since(buildStart).Milliseconds() } - logClaimEndpointSlow(runtimeID, outcome, start, authMs, claimMs, buildMs) + logClaimEndpointSlow(runtimeID, outcome, start, authMs, claimMs, buildMs, payloadBytes, agentSkillCount, builtinSkillCount, skillPayloadBytes) }() // Verify the caller owns this runtime's workspace. The runtime's @@ -1194,7 +1202,7 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) { if task == nil { slog.Debug("no task to claim", "runtime_id", runtimeID) - writeJSON(w, http.StatusOK, map[string]any{"task": nil}) + payloadBytes, _ = writeMeasuredJSON(w, http.StatusOK, map[string]any{"task": nil}) outcome = "no_task" return } @@ -1209,7 +1217,10 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) { // names carry a "multica-" prefix so their on-disk slugs never collide // with a user-authored workspace skill (see writeSkillFiles). skills := h.TaskService.LoadAgentSkills(r.Context(), task.AgentID) - skills = append(skills, h.TaskService.BuiltinSkills()...) + agentSkillCount = len(skills) + builtinSkills := h.TaskService.BuiltinSkills() + builtinSkillCount = len(builtinSkills) + skills = append(skills, builtinSkills...) var customEnv map[string]string if agent.CustomEnv != nil { if err := json.Unmarshal(agent.CustomEnv, &customEnv); err != nil { @@ -1800,7 +1811,12 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) { resp.AuthToken = tokenStr slog.Info("task claimed by runtime", "task_id", uuidToString(task.ID), "runtime_id", runtimeID, "agent_id", uuidToString(task.AgentID), "prior_session", resp.PriorSessionID) - writeJSON(w, http.StatusOK, map[string]any{"task": resp}) + if resp.Agent != nil && len(resp.Agent.Skills) > 0 { + if skillPayload, err := json.Marshal(resp.Agent.Skills); err == nil { + skillPayloadBytes = len(skillPayload) + } + } + payloadBytes, _ = writeMeasuredJSON(w, http.StatusOK, map[string]any{"task": resp}) } // trailingUserMessages returns the run of user messages after the last diff --git a/server/internal/handler/daemon_test.go b/server/internal/handler/daemon_test.go index cffbc82a9a..d41c746b12 100644 --- a/server/internal/handler/daemon_test.go +++ b/server/internal/handler/daemon_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "log/slog" "net/http" "net/http/httptest" "strings" @@ -22,6 +23,29 @@ import ( "github.com/multica-ai/multica/server/pkg/protocol" ) +func TestLogClaimEndpointSlowIncludesPayloadFields(t *testing.T) { + var logs bytes.Buffer + prev := slog.Default() + slog.SetDefault(slog.New(slog.NewTextHandler(&logs, &slog.HandlerOptions{Level: slog.LevelInfo}))) + t.Cleanup(func() { slog.SetDefault(prev) }) + + logClaimEndpointSlow("runtime-1", "claimed", time.Now().Add(-600*time.Millisecond), 10, 20, 30, 4096, 2, 8, 3072) + + got := logs.String() + for _, want := range []string{ + "msg=\"claim_endpoint slow\"", + "runtime_id=runtime-1", + "payload_bytes=4096", + "agent_skill_count=2", + "builtin_skill_count=8", + "skill_payload_bytes=3072", + } { + if !strings.Contains(got, want) { + t.Fatalf("slow claim log missing %q in %s", want, got) + } + } +} + // slowProbeLocalSkillListStore wraps a LocalSkillListStore but blocks inside // HasPending until the provided context is cancelled. PopPending delegates // to the underlying store. Used to verify that a stalled probe cannot wedge diff --git a/server/internal/handler/handler.go b/server/internal/handler/handler.go index dd66ad51ba..a89e627cf7 100644 --- a/server/internal/handler/handler.go +++ b/server/internal/handler/handler.go @@ -230,6 +230,24 @@ func writeJSON(w http.ResponseWriter, status int, v any) { json.NewEncoder(w).Encode(v) } +// writeMeasuredJSON behaves like writeJSON but returns the encoded body size so +// callers can record payload bytes in slow-endpoint diagnostics. It measures the +// uncompressed JSON length and is unrelated to transport compression. +func writeMeasuredJSON(w http.ResponseWriter, status int, v any) (int, error) { + body, err := json.Marshal(v) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to encode response") + return 0, err + } + body = append(body, '\n') + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if _, err := w.Write(body); err != nil { + return len(body), err + } + return len(body), nil +} + func writeError(w http.ResponseWriter, status int, msg string) { writeJSON(w, status, map[string]string{"error": msg}) } diff --git a/server/internal/handler/handler_writejson_test.go b/server/internal/handler/handler_writejson_test.go new file mode 100644 index 0000000000..268485f983 --- /dev/null +++ b/server/internal/handler/handler_writejson_test.go @@ -0,0 +1,99 @@ +package handler + +import ( + "bytes" + "net/http" + "net/http/httptest" + "testing" +) + +// TestWriteMeasuredJSONByteIdenticalToWriteJSON locks the load-bearing assumption +// behind the F2 claim-observability patch: swapping writeJSON for writeMeasuredJSON +// at the /tasks/claim response sites must not change a single byte on the wire. +// +// writeJSON encodes via json.NewEncoder(w).Encode (which appends a trailing +// newline); writeMeasuredJSON marshals via json.Marshal and appends the newline +// by hand. Both HTML-escape by default, so the emitted bytes must match for every +// input. This table-driven test fails closed if that invariant ever drifts, so the +// "no wire-behavior change" claim is provable rather than reasoned. +func TestWriteMeasuredJSONByteIdenticalToWriteJSON(t *testing.T) { + type skill struct { + Name string `json:"name"` + Description string `json:"description"` + Files map[string]string `json:"files"` + } + type claimResp struct { + ID string `json:"id"` + Name string `json:"name"` + Skills []skill `json:"skills"` + Args []string `json:"args"` + } + + cases := []struct { + name string + v any + }{ + {"nil", nil}, + {"no_task", map[string]any{"task": nil}}, + {"empty_map", map[string]any{}}, + {"empty_slice", []string{}}, + {"scalar_string", "plain string"}, + {"scalar_bool", true}, + {"numbers", map[string]any{"i": 42, "f": 3.5, "neg": -17, "big": 1234567890123}}, + {"html_escapable", map[string]any{"s": `a & "c" 'd'