Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions backend/internal/service/openai_gateway_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,18 +1122,26 @@ func (s *OpenAIGatewayService) ExtractSessionID(c *gin.Context, body []byte) str
// GenerateSessionHash generates a sticky-session hash for OpenAI requests.
//
// Priority:
// 1. Header: session_id
// 2. Header: conversation_id
// 1. Header: conversation_id
// 2. Header: session_id
// 3. Body: prompt_cache_key (opencode)
// 4. Body: content-based fallback (model + system + tools + first user message)
//
// Why conversation_id comes first:
// Codex clients can keep a long-lived session_id while starting a brand-new
// conversation_id for each fresh thread. If we key sticky state by session_id
// first, a new question can accidentally inherit the previous conversation's
// turn_state / sessionConn / upstream continuation anchor and look like the
// model answered the last prompt again. Using conversation_id when available
// makes the sticky boundary follow the actual conversation switch.
func (s *OpenAIGatewayService) GenerateSessionHash(c *gin.Context, body []byte) string {
if c == nil {
return ""
}

sessionID := strings.TrimSpace(c.GetHeader("session_id"))
sessionID := strings.TrimSpace(c.GetHeader("conversation_id"))
if sessionID == "" {
sessionID = strings.TrimSpace(c.GetHeader("conversation_id"))
sessionID = strings.TrimSpace(c.GetHeader("session_id"))
}
if sessionID == "" && len(body) > 0 {
sessionID = strings.TrimSpace(gjson.GetBytes(body, "prompt_cache_key").String())
Expand Down Expand Up @@ -2698,8 +2706,16 @@ func (s *OpenAIGatewayService) buildUpstreamRequestOpenAIPassthrough(
}
apiKeyID := getAPIKeyIDFromContext(c)
// 先保存客户端原始值,再做 compact 补充,避免后续统一隔离时读到已处理的值。
clientSessionID := strings.TrimSpace(req.Header.Get("session_id"))
clientConversationID := strings.TrimSpace(req.Header.Get("conversation_id"))
// Keep the upstream session boundary aligned with conversation_id first.
// This matters when the client reuses one stable session_id across multiple
// independent conversations: if upstream session_id keeps following that
// stable value, ChatGPT/OpenAI can continue the old conversation and return
// the previous question's answer in the new thread.
clientSessionID := clientConversationID
if clientSessionID == "" {
clientSessionID = strings.TrimSpace(req.Header.Get("session_id"))
}
if isOpenAIResponsesCompactPath(c) {
req.Header.Set("accept", "application/json")
if req.Header.Get("version") == "" {
Expand Down Expand Up @@ -4317,12 +4333,15 @@ func normalizeOpenAICompactRequestBody(body []byte) ([]byte, bool, error) {

func resolveOpenAICompactSessionID(c *gin.Context) string {
if c != nil {
if sessionID := strings.TrimSpace(c.GetHeader("session_id")); sessionID != "" {
return sessionID
}
// compact requests can omit explicit sticky hints in the body, so when both
// headers exist we still prefer conversation_id to avoid carrying an older
// conversation forward under a reused client session_id.
if conversationID := strings.TrimSpace(c.GetHeader("conversation_id")); conversationID != "" {
return conversationID
}
if sessionID := strings.TrimSpace(c.GetHeader("session_id")); sessionID != "" {
return sessionID
}
if seed, ok := c.Get(openAICompactSessionSeedKey); ok {
if seedStr, ok := seed.(string); ok && strings.TrimSpace(seedStr) != "" {
return strings.TrimSpace(seedStr)
Expand Down
79 changes: 70 additions & 9 deletions backend/internal/service/openai_gateway_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,37 +154,48 @@ func TestOpenAIGatewayService_GenerateSessionHash_Priority(t *testing.T) {

bodyWithKey := []byte(`{"prompt_cache_key":"ses_aaa"}`)

// 1) session_id header wins
// 1) conversation_id should win when both headers are present because it is
// conversation-scoped while session_id may span multiple conversations.
c.Request.Header.Set("session_id", "sess-123")
c.Request.Header.Set("conversation_id", "conv-456")
h1 := svc.GenerateSessionHash(c, bodyWithKey)
if h1 == "" {
t.Fatalf("expected non-empty hash")
}

// 2) conversation_id used when session_id absent
c.Request.Header.Del("session_id")
c.Request.Header.Del("conversation_id")
h2 := svc.GenerateSessionHash(c, bodyWithKey)
if h2 == "" {
t.Fatalf("expected non-empty hash")
}
if h1 == h2 {
t.Fatalf("expected different hashes for different keys")
t.Fatalf("expected conversation_id to take precedence over session_id")
}

// 3) prompt_cache_key used when both headers absent
c.Request.Header.Del("conversation_id")
// 2) conversation_id used when session_id absent
c.Request.Header.Del("session_id")
c.Request.Header.Set("conversation_id", "conv-456")
h3 := svc.GenerateSessionHash(c, bodyWithKey)
if h3 == "" {
t.Fatalf("expected non-empty hash")
}
if h2 == h3 {
if h1 != h3 {
t.Fatalf("expected same hash when conversation_id is the effective signal")
}

// 3) prompt_cache_key used when both headers absent
c.Request.Header.Del("conversation_id")
h4 := svc.GenerateSessionHash(c, bodyWithKey)
if h4 == "" {
t.Fatalf("expected non-empty hash")
}
if h3 == h4 {
t.Fatalf("expected different hashes for different keys")
}

// 4) empty when no signals
h4 := svc.GenerateSessionHash(c, []byte(`{}`))
if h4 != "" {
hEmpty := svc.GenerateSessionHash(c, []byte(`{}`))
if hEmpty != "" {
t.Fatalf("expected empty hash when no signals")
}
}
Expand All @@ -203,6 +214,26 @@ func TestOpenAIGatewayService_GenerateSessionHash_UsesXXHash64(t *testing.T) {
require.Equal(t, want, got)
}

// TestOpenAIGatewayService_GenerateSessionHash_ConversationIDOverridesStableSessionID
// verifies that each fresh conversation_id yields a distinct sticky hash even
// when the client keeps one long-lived session_id header across conversations.
func TestOpenAIGatewayService_GenerateSessionHash_ConversationIDOverridesStableSessionID(t *testing.T) {
	gin.SetMode(gin.TestMode)
	recorder := httptest.NewRecorder()
	ctx, _ := gin.CreateTestContext(recorder)
	ctx.Request = httptest.NewRequest(http.MethodPost, "/openai/v1/responses", nil)
	ctx.Request.Header.Set("session_id", "stable-session")

	gateway := &OpenAIGatewayService{}

	// hashFor swaps in a conversation_id and returns the resulting sticky hash.
	hashFor := func(conversationID string) string {
		ctx.Request.Header.Set("conversation_id", conversationID)
		return gateway.GenerateSessionHash(ctx, nil)
	}

	first := hashFor("conv-1")
	require.NotEmpty(t, first)

	second := hashFor("conv-2")
	require.NotEmpty(t, second)

	require.NotEqual(t, first, second, "different conversation_id values must not collapse into one sticky session")
}

func TestOpenAIGatewayService_GenerateSessionHash_AttachesLegacyHashToContext(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
Expand Down Expand Up @@ -1486,6 +1517,36 @@ func TestOpenAIBuildUpstreamRequestOpenAIPassthroughPreservesCompactPath(t *test
require.NotEmpty(t, req.Header.Get("Session_Id"))
}

// TestOpenAIBuildUpstreamRequestOpenAIPassthroughPrefersConversationIDForSessionID
// verifies that the passthrough upstream builder derives the upstream
// Session_Id from the client's conversation_id rather than from a reused
// client session_id, keeping the upstream session boundary per-conversation.
func TestOpenAIBuildUpstreamRequestOpenAIPassthroughPrefersConversationIDForSessionID(t *testing.T) {
	gin.SetMode(gin.TestMode)
	recorder := httptest.NewRecorder()
	ctx, _ := gin.CreateTestContext(recorder)
	ctx.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader([]byte(`{"model":"gpt-5"}`)))
	ctx.Request.Header.Set("session_id", "stable-session")
	ctx.Request.Header.Set("conversation_id", "conv-fresh")

	gateway := &OpenAIGatewayService{}
	oauthAccount := &Account{Type: AccountTypeOAuth}

	upstream, err := gateway.buildUpstreamRequestOpenAIPassthrough(ctx.Request.Context(), ctx, oauthAccount, []byte(`{"model":"gpt-5"}`), "token")
	require.NoError(t, err)

	// Both upstream headers must carry the isolated conversation-scoped value.
	want := isolateOpenAISessionID(0, "conv-fresh")
	require.Equal(t, want, upstream.Header.Get("Conversation_Id"))
	require.Equal(t, want, upstream.Header.Get("Session_Id"), "passthrough upstream session_id should follow conversation_id")
}

// TestResolveOpenAICompactSessionIDPrefersConversationID verifies that compact
// session resolution picks conversation_id over session_id when both headers
// are present on the request.
func TestResolveOpenAICompactSessionIDPrefersConversationID(t *testing.T) {
	gin.SetMode(gin.TestMode)
	recorder := httptest.NewRecorder()
	ctx, _ := gin.CreateTestContext(recorder)
	ctx.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", nil)
	for header, value := range map[string]string{
		"session_id":      "stable-session",
		"conversation_id": "conv-fresh",
	} {
		ctx.Request.Header.Set(header, value)
	}

	require.Equal(t, "conv-fresh", resolveOpenAICompactSessionID(ctx))
}

func TestOpenAIBuildUpstreamRequestCompactForcesJSONAcceptForOAuth(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
Expand Down
16 changes: 10 additions & 6 deletions backend/internal/service/openai_ws_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,20 @@ func resolveOpenAIWSSessionHeaders(c *gin.Context, promptCacheKey string) openAI
ConversationSource: "none",
}
if c != nil && c.Request != nil {
if sessionID := strings.TrimSpace(c.Request.Header.Get("session_id")); sessionID != "" {
resolution.SessionID = sessionID
resolution.SessionSource = "header_session_id"
}
if conversationID := strings.TrimSpace(c.Request.Header.Get("conversation_id")); conversationID != "" {
resolution.ConversationID = conversationID
resolution.ConversationSource = "header_conversation_id"
// The upstream WS handshake must treat a new conversation_id as a brand-new
// conversation boundary even if the client keeps reusing session_id. This
// prevents the server from replaying a stale turn_state or reattaching to a
// continuation flow that belongs to the previous conversation.
resolution.SessionID = conversationID
resolution.SessionSource = "header_conversation_id"
}
if sessionID := strings.TrimSpace(c.Request.Header.Get("session_id")); sessionID != "" {
if resolution.SessionID == "" {
resolution.SessionID = conversationID
resolution.SessionSource = "header_conversation_id"
resolution.SessionID = sessionID
resolution.SessionSource = "header_session_id"
}
}
}
Expand Down
Loading