Unverified Commit 06e2756e authored by Wesley Liddick's avatar Wesley Liddick Committed by GitHub
Browse files

Merge pull request #1501 from StarryKira/fix/1493-non-streaming-empty-output

fix: 非流式响应路径扩展SSE检测至所有账号类型
parents 1c9a2128 9e515ea7
...@@ -3007,6 +3007,14 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( ...@@ -3007,6 +3007,14 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough(
return nil, err return nil, err
} }
// Detect SSE responses from upstream and convert to JSON.
// Some upstreams (e.g. other sub2api instances) may return SSE even when
// stream=false was requested. Without this conversion the client would
// receive raw SSE text or a terminal event with empty output.
if isEventStreamResponse(resp.Header) {
return s.handlePassthroughSSEToJSON(resp, c, body)
}
usage := &OpenAIUsage{} usage := &OpenAIUsage{}
usageParsed := false usageParsed := false
if len(body) > 0 { if len(body) > 0 {
...@@ -3030,6 +3038,56 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( ...@@ -3030,6 +3038,56 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough(
return usage, nil return usage, nil
} }
// handlePassthroughSSEToJSON converts an SSE response body into a JSON
// response for the passthrough path. It mirrors handleSSEToJSON but skips
// model replacement (passthrough does not remap models).
func (s *OpenAIGatewayService) handlePassthroughSSEToJSON(resp *http.Response, c *gin.Context, body []byte) (*OpenAIUsage, error) {
bodyText := string(body)
finalResponse, ok := extractCodexFinalResponse(bodyText)
usage := &OpenAIUsage{}
if ok {
if parsedUsage, parsed := extractOpenAIUsageFromJSONBytes(finalResponse); parsed {
*usage = parsedUsage
}
// When the terminal event has an empty output array, reconstruct
// output from accumulated delta events so the client gets full content.
if len(gjson.GetBytes(finalResponse, "output").Array()) == 0 {
if outputJSON, reconstructed := reconstructResponseOutputFromSSE(bodyText); reconstructed {
if patched, err := sjson.SetRawBytes(finalResponse, "output", outputJSON); err == nil {
finalResponse = patched
}
}
}
body = finalResponse
// Correct tool calls in final response
body = s.correctToolCallsInResponseBody(body)
} else {
terminalType, terminalPayload, terminalOK := extractOpenAISSETerminalEvent(bodyText)
if terminalOK && terminalType == "response.failed" {
msg := extractOpenAISSEErrorMessage(terminalPayload)
if msg == "" {
msg = "Upstream compact response failed"
}
return nil, s.writeOpenAINonStreamingProtocolError(resp, c, msg)
}
usage = s.parseSSEUsageFromBody(bodyText)
}
writeOpenAIPassthroughResponseHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
contentType := "application/json; charset=utf-8"
if !ok {
contentType = resp.Header.Get("Content-Type")
if contentType == "" {
contentType = "text/event-stream"
}
}
c.Data(resp.StatusCode, contentType, body)
return usage, nil
}
func writeOpenAIPassthroughResponseHeaders(dst http.Header, src http.Header, filter *responseheaders.CompiledHeaderFilter) { func writeOpenAIPassthroughResponseHeaders(dst http.Header, src http.Header, filter *responseheaders.CompiledHeaderFilter) {
if dst == nil || src == nil { if dst == nil || src == nil {
return return
...@@ -3858,10 +3916,21 @@ func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, r ...@@ -3858,10 +3916,21 @@ func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, r
return nil, err return nil, err
} }
// Detect SSE responses for ALL account types via Content-Type header.
// Some OpenAI-compatible upstreams (including other sub2api instances)
// may return SSE even when stream=false was requested.
if isEventStreamResponse(resp.Header) {
return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel)
}
// For OAuth accounts, also fall back to a body-content heuristic because
// the upstream may omit the Content-Type header while still sending SSE.
// This heuristic is NOT applied to API-key accounts to avoid false
// positives on JSON responses that coincidentally contain "data:" or
// "event:" in their text content.
if account.Type == AccountTypeOAuth { if account.Type == AccountTypeOAuth {
bodyLooksLikeSSE := bytes.Contains(body, []byte("data:")) || bytes.Contains(body, []byte("event:")) bodyLooksLikeSSE := bytes.Contains(body, []byte("data:")) || bytes.Contains(body, []byte("event:"))
if isEventStreamResponse(resp.Header) || bodyLooksLikeSSE { if bodyLooksLikeSSE {
return s.handleOAuthSSEToJSON(resp, c, body, originalModel, mappedModel) return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel)
} }
} }
...@@ -3895,7 +3964,7 @@ func isEventStreamResponse(header http.Header) bool { ...@@ -3895,7 +3964,7 @@ func isEventStreamResponse(header http.Header) bool {
return strings.Contains(contentType, "text/event-stream") return strings.Contains(contentType, "text/event-stream")
} }
func (s *OpenAIGatewayService) handleOAuthSSEToJSON(resp *http.Response, c *gin.Context, body []byte, originalModel, mappedModel string) (*OpenAIUsage, error) { func (s *OpenAIGatewayService) handleSSEToJSON(resp *http.Response, c *gin.Context, body []byte, originalModel, mappedModel string) (*OpenAIUsage, error) {
bodyText := string(body) bodyText := string(body)
finalResponse, ok := extractCodexFinalResponse(bodyText) finalResponse, ok := extractCodexFinalResponse(bodyText)
......
...@@ -1797,7 +1797,7 @@ func TestExtractCodexFinalResponse_SampleReplay(t *testing.T) { ...@@ -1797,7 +1797,7 @@ func TestExtractCodexFinalResponse_SampleReplay(t *testing.T) {
require.Contains(t, string(finalResp), `"input_tokens":11`) require.Contains(t, string(finalResp), `"input_tokens":11`)
} }
func TestHandleOAuthSSEToJSON_CompletedEventReturnsJSON(t *testing.T) { func TestHandleSSEToJSON_CompletedEventReturnsJSON(t *testing.T) {
gin.SetMode(gin.TestMode) gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder() rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec) c, _ := gin.CreateTestContext(rec)
...@@ -1814,7 +1814,7 @@ func TestHandleOAuthSSEToJSON_CompletedEventReturnsJSON(t *testing.T) { ...@@ -1814,7 +1814,7 @@ func TestHandleOAuthSSEToJSON_CompletedEventReturnsJSON(t *testing.T) {
`data: [DONE]`, `data: [DONE]`,
}, "\n")) }, "\n"))
usage, err := svc.handleOAuthSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o") usage, err := svc.handleSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o")
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, usage) require.NotNil(t, usage)
require.Equal(t, 7, usage.InputTokens) require.Equal(t, 7, usage.InputTokens)
...@@ -1826,7 +1826,7 @@ func TestHandleOAuthSSEToJSON_CompletedEventReturnsJSON(t *testing.T) { ...@@ -1826,7 +1826,7 @@ func TestHandleOAuthSSEToJSON_CompletedEventReturnsJSON(t *testing.T) {
require.NotContains(t, rec.Body.String(), "data:") require.NotContains(t, rec.Body.String(), "data:")
} }
func TestHandleOAuthSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) { func TestHandleSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) {
gin.SetMode(gin.TestMode) gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder() rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec) c, _ := gin.CreateTestContext(rec)
...@@ -1842,7 +1842,7 @@ func TestHandleOAuthSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) { ...@@ -1842,7 +1842,7 @@ func TestHandleOAuthSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) {
`data: [DONE]`, `data: [DONE]`,
}, "\n")) }, "\n"))
usage, err := svc.handleOAuthSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o") usage, err := svc.handleSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o")
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, usage) require.NotNil(t, usage)
require.Equal(t, 0, usage.InputTokens) require.Equal(t, 0, usage.InputTokens)
...@@ -1850,7 +1850,7 @@ func TestHandleOAuthSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) { ...@@ -1850,7 +1850,7 @@ func TestHandleOAuthSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) {
require.Contains(t, rec.Body.String(), `data: {"type":"response.in_progress"`) require.Contains(t, rec.Body.String(), `data: {"type":"response.in_progress"`)
} }
func TestHandleOAuthSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) { func TestHandleSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) {
gin.SetMode(gin.TestMode) gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder() rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec) c, _ := gin.CreateTestContext(rec)
...@@ -1866,7 +1866,7 @@ func TestHandleOAuthSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) { ...@@ -1866,7 +1866,7 @@ func TestHandleOAuthSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) {
`data: [DONE]`, `data: [DONE]`,
}, "\n")) }, "\n"))
usage, err := svc.handleOAuthSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o") usage, err := svc.handleSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o")
require.Nil(t, usage) require.Nil(t, usage)
require.Error(t, err) require.Error(t, err)
require.Equal(t, http.StatusBadGateway, rec.Code) require.Equal(t, http.StatusBadGateway, rec.Code)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment