Commit b2e379cf authored by shaw's avatar shaw
Browse files

fix: 非流式路径在上游终态事件output为空时从delta事件重建响应内容

上游API近期更新后,response.completed终态SSE事件的output字段可能为空,
实际内容仅通过response.output_text.delta等增量事件下发。流式路径不受影响,
但chat_completions非流式路径和responses OAuth非流式路径只依赖终态事件的
output,导致返回空响应。

新增BufferedResponseAccumulator累积器,在SSE扫描过程中收集delta事件内容
(文本、function_call、reasoning),当终态output为空时补充重建。

同时修复handleChatBufferedStreamingResponse遗漏response.done事件类型的问题。
parent 08b45442
...@@ -876,3 +876,182 @@ func TestChatCompletionsStreamRoundTrip(t *testing.T) { ...@@ -876,3 +876,182 @@ func TestChatCompletionsStreamRoundTrip(t *testing.T) {
assert.Equal(t, "resp_rt", c.ID) assert.Equal(t, "resp_rt", c.ID)
} }
} }
// ---------------------------------------------------------------------------
// BufferedResponseAccumulator tests
// ---------------------------------------------------------------------------
func TestBufferedResponseAccumulator_TextOnly(t *testing.T) {
acc := NewBufferedResponseAccumulator()
acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: "Hello"})
acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: ", world!"})
assert.True(t, acc.HasContent())
output := acc.BuildOutput()
require.Len(t, output, 1)
assert.Equal(t, "message", output[0].Type)
assert.Equal(t, "assistant", output[0].Role)
require.Len(t, output[0].Content, 1)
assert.Equal(t, "output_text", output[0].Content[0].Type)
assert.Equal(t, "Hello, world!", output[0].Content[0].Text)
}
func TestBufferedResponseAccumulator_ToolCalls(t *testing.T) {
acc := NewBufferedResponseAccumulator()
// Add function call at output_index=1
acc.ProcessEvent(&ResponsesStreamEvent{
Type: "response.output_item.added",
OutputIndex: 1,
Item: &ResponsesOutput{
Type: "function_call",
CallID: "call_abc",
Name: "get_weather",
},
})
acc.ProcessEvent(&ResponsesStreamEvent{
Type: "response.function_call_arguments.delta",
OutputIndex: 1,
Delta: `{"city":`,
})
acc.ProcessEvent(&ResponsesStreamEvent{
Type: "response.function_call_arguments.delta",
OutputIndex: 1,
Delta: `"NYC"}`,
})
assert.True(t, acc.HasContent())
output := acc.BuildOutput()
require.Len(t, output, 1)
assert.Equal(t, "function_call", output[0].Type)
assert.Equal(t, "call_abc", output[0].CallID)
assert.Equal(t, "get_weather", output[0].Name)
assert.Equal(t, `{"city":"NYC"}`, output[0].Arguments)
}
func TestBufferedResponseAccumulator_Reasoning(t *testing.T) {
acc := NewBufferedResponseAccumulator()
acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.reasoning_summary_text.delta", Delta: "Step 1: "})
acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.reasoning_summary_text.delta", Delta: "think about it"})
assert.True(t, acc.HasContent())
output := acc.BuildOutput()
require.Len(t, output, 1)
assert.Equal(t, "reasoning", output[0].Type)
require.Len(t, output[0].Summary, 1)
assert.Equal(t, "summary_text", output[0].Summary[0].Type)
assert.Equal(t, "Step 1: think about it", output[0].Summary[0].Text)
}
func TestBufferedResponseAccumulator_Mixed(t *testing.T) {
acc := NewBufferedResponseAccumulator()
// Reasoning first
acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.reasoning_summary_text.delta", Delta: "I thought about it."})
// Then text
acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: "The answer is 42."})
// Then a tool call
acc.ProcessEvent(&ResponsesStreamEvent{
Type: "response.output_item.added",
OutputIndex: 2,
Item: &ResponsesOutput{
Type: "function_call",
CallID: "call_1",
Name: "verify",
},
})
acc.ProcessEvent(&ResponsesStreamEvent{
Type: "response.function_call_arguments.delta",
OutputIndex: 2,
Delta: `{}`,
})
assert.True(t, acc.HasContent())
output := acc.BuildOutput()
// Order: reasoning → message → function_calls
require.Len(t, output, 3)
assert.Equal(t, "reasoning", output[0].Type)
assert.Equal(t, "message", output[1].Type)
assert.Equal(t, "function_call", output[2].Type)
assert.Equal(t, "The answer is 42.", output[1].Content[0].Text)
assert.Equal(t, "verify", output[2].Name)
}
func TestBufferedResponseAccumulator_SupplementEmptyOutput(t *testing.T) {
acc := NewBufferedResponseAccumulator()
acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: "Hello"})
resp := &ResponsesResponse{
ID: "resp_1",
Status: "completed",
Output: nil, // empty output
Usage: &ResponsesUsage{InputTokens: 10, OutputTokens: 5},
}
acc.SupplementResponseOutput(resp)
require.Len(t, resp.Output, 1)
assert.Equal(t, "message", resp.Output[0].Type)
assert.Equal(t, "Hello", resp.Output[0].Content[0].Text)
// Usage should be untouched
assert.Equal(t, 10, resp.Usage.InputTokens)
}
func TestBufferedResponseAccumulator_NoSupplementWhenOutputExists(t *testing.T) {
acc := NewBufferedResponseAccumulator()
acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: "from deltas"})
resp := &ResponsesResponse{
ID: "resp_2",
Status: "completed",
Output: []ResponsesOutput{
{
Type: "message",
Content: []ResponsesContentPart{
{Type: "output_text", Text: "from terminal event"},
},
},
},
}
acc.SupplementResponseOutput(resp)
// Output should NOT be overwritten
require.Len(t, resp.Output, 1)
assert.Equal(t, "from terminal event", resp.Output[0].Content[0].Text)
}
func TestBufferedResponseAccumulator_EmptyDeltas(t *testing.T) {
acc := NewBufferedResponseAccumulator()
// Process events with empty delta — should not accumulate
acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: ""})
acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.created"})
assert.False(t, acc.HasContent())
resp := &ResponsesResponse{ID: "resp_3", Status: "completed"}
acc.SupplementResponseOutput(resp)
assert.Nil(t, resp.Output)
}
func TestBufferedResponseAccumulator_IgnoresNonFunctionCallItems(t *testing.T) {
acc := NewBufferedResponseAccumulator()
// output_item.added with type "message" should be ignored
acc.ProcessEvent(&ResponsesStreamEvent{
Type: "response.output_item.added",
OutputIndex: 0,
Item: &ResponsesOutput{Type: "message"},
})
assert.False(t, acc.HasContent())
}
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"time" "time"
) )
...@@ -372,3 +373,119 @@ func generateChatCmplID() string { ...@@ -372,3 +373,119 @@ func generateChatCmplID() string {
_, _ = rand.Read(b) _, _ = rand.Read(b)
return "chatcmpl-" + hex.EncodeToString(b) return "chatcmpl-" + hex.EncodeToString(b)
} }
// ---------------------------------------------------------------------------
// BufferedResponseAccumulator: accumulates SSE delta events for non-streaming
// paths where the terminal event may have empty output.
// ---------------------------------------------------------------------------
type bufferedFuncCall struct {
CallID string
Name string
Args strings.Builder
}
// BufferedResponseAccumulator collects content from Responses SSE delta events
// so that non-streaming handlers can reconstruct output when the terminal event
// (response.completed / response.done) carries an empty output array.
type BufferedResponseAccumulator struct {
text strings.Builder
reasoning strings.Builder
funcCalls []bufferedFuncCall
outputIndexToFuncIdx map[int]int
}
// NewBufferedResponseAccumulator returns an initialised accumulator.
func NewBufferedResponseAccumulator() *BufferedResponseAccumulator {
return &BufferedResponseAccumulator{
outputIndexToFuncIdx: make(map[int]int),
}
}
// ProcessEvent inspects a single Responses SSE event and accumulates any
// content it carries. Only delta events that contribute to the final output
// are handled; all other event types are silently ignored.
func (a *BufferedResponseAccumulator) ProcessEvent(event *ResponsesStreamEvent) {
switch event.Type {
case "response.output_text.delta":
if event.Delta != "" {
_, _ = a.text.WriteString(event.Delta)
}
case "response.output_item.added":
if event.Item != nil && event.Item.Type == "function_call" {
idx := len(a.funcCalls)
a.outputIndexToFuncIdx[event.OutputIndex] = idx
a.funcCalls = append(a.funcCalls, bufferedFuncCall{
CallID: event.Item.CallID,
Name: event.Item.Name,
})
}
case "response.function_call_arguments.delta":
if event.Delta != "" {
if idx, ok := a.outputIndexToFuncIdx[event.OutputIndex]; ok {
_, _ = a.funcCalls[idx].Args.WriteString(event.Delta)
}
}
case "response.reasoning_summary_text.delta":
if event.Delta != "" {
_, _ = a.reasoning.WriteString(event.Delta)
}
}
}
// HasContent reports whether any content has been accumulated.
func (a *BufferedResponseAccumulator) HasContent() bool {
return a.text.Len() > 0 || len(a.funcCalls) > 0 || a.reasoning.Len() > 0
}
// BuildOutput constructs a []ResponsesOutput from the accumulated delta
// content. The order matches what ResponsesToChatCompletions expects:
// reasoning → message → function_calls.
func (a *BufferedResponseAccumulator) BuildOutput() []ResponsesOutput {
var out []ResponsesOutput
if a.reasoning.Len() > 0 {
out = append(out, ResponsesOutput{
Type: "reasoning",
Summary: []ResponsesSummary{{
Type: "summary_text",
Text: a.reasoning.String(),
}},
})
}
if a.text.Len() > 0 {
out = append(out, ResponsesOutput{
Type: "message",
Role: "assistant",
Content: []ResponsesContentPart{{
Type: "output_text",
Text: a.text.String(),
}},
})
}
for i := range a.funcCalls {
out = append(out, ResponsesOutput{
Type: "function_call",
CallID: a.funcCalls[i].CallID,
Name: a.funcCalls[i].Name,
Arguments: a.funcCalls[i].Args.String(),
})
}
return out
}
// SupplementResponseOutput fills resp.Output from accumulated delta content
// when the terminal event delivered an empty output array. If resp.Output is
// already populated, this is a no-op (preserves backward compatibility).
func (a *BufferedResponseAccumulator) SupplementResponseOutput(resp *ResponsesResponse) {
if resp == nil || len(resp.Output) > 0 {
return
}
if !a.HasContent() {
return
}
resp.Output = a.BuildOutput()
}
...@@ -244,6 +244,7 @@ func (s *OpenAIGatewayService) handleChatBufferedStreamingResponse( ...@@ -244,6 +244,7 @@ func (s *OpenAIGatewayService) handleChatBufferedStreamingResponse(
var finalResponse *apicompat.ResponsesResponse var finalResponse *apicompat.ResponsesResponse
var usage OpenAIUsage var usage OpenAIUsage
acc := apicompat.NewBufferedResponseAccumulator()
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
...@@ -261,7 +262,11 @@ func (s *OpenAIGatewayService) handleChatBufferedStreamingResponse( ...@@ -261,7 +262,11 @@ func (s *OpenAIGatewayService) handleChatBufferedStreamingResponse(
continue continue
} }
if (event.Type == "response.completed" || event.Type == "response.incomplete" || event.Type == "response.failed") && // Accumulate delta content for fallback when terminal output is empty.
acc.ProcessEvent(&event)
if (event.Type == "response.completed" || event.Type == "response.done" ||
event.Type == "response.incomplete" || event.Type == "response.failed") &&
event.Response != nil { event.Response != nil {
finalResponse = event.Response finalResponse = event.Response
if event.Response.Usage != nil { if event.Response.Usage != nil {
...@@ -290,6 +295,10 @@ func (s *OpenAIGatewayService) handleChatBufferedStreamingResponse( ...@@ -290,6 +295,10 @@ func (s *OpenAIGatewayService) handleChatBufferedStreamingResponse(
return nil, fmt.Errorf("upstream stream ended without terminal event") return nil, fmt.Errorf("upstream stream ended without terminal event")
} }
// When the terminal event has an empty output array, reconstruct from
// accumulated delta events so the client receives the full content.
acc.SupplementResponseOutput(finalResponse)
chatResp := apicompat.ResponsesToChatCompletions(finalResponse, originalModel) chatResp := apicompat.ResponsesToChatCompletions(finalResponse, originalModel)
if s.responseHeaderFilter != nil { if s.responseHeaderFilter != nil {
......
...@@ -21,6 +21,7 @@ import ( ...@@ -21,6 +21,7 @@ import (
"time" "time"
"github.com/Wei-Shaw/sub2api/internal/config" "github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/apicompat"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger" "github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/pkg/openai" "github.com/Wei-Shaw/sub2api/internal/pkg/openai"
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders" "github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
...@@ -3901,6 +3902,16 @@ func (s *OpenAIGatewayService) handleOAuthSSEToJSON(resp *http.Response, c *gin. ...@@ -3901,6 +3902,16 @@ func (s *OpenAIGatewayService) handleOAuthSSEToJSON(resp *http.Response, c *gin.
if parsedUsage, parsed := extractOpenAIUsageFromJSONBytes(finalResponse); parsed { if parsedUsage, parsed := extractOpenAIUsageFromJSONBytes(finalResponse); parsed {
*usage = parsedUsage *usage = parsedUsage
} }
// When the terminal event has an empty output array, reconstruct
// output from accumulated delta events so the client gets full content.
// gjson Array() returns empty slice for null, missing, or empty arrays.
if len(gjson.GetBytes(finalResponse, "output").Array()) == 0 {
if outputJSON, reconstructed := reconstructResponseOutputFromSSE(bodyText); reconstructed {
if patched, err := sjson.SetRawBytes(finalResponse, "output", outputJSON); err == nil {
finalResponse = patched
}
}
}
body = finalResponse body = finalResponse
if originalModel != mappedModel { if originalModel != mappedModel {
body = s.replaceModelInResponseBody(body, mappedModel, originalModel) body = s.replaceModelInResponseBody(body, mappedModel, originalModel)
...@@ -4002,6 +4013,34 @@ func extractCodexFinalResponse(body string) ([]byte, bool) { ...@@ -4002,6 +4013,34 @@ func extractCodexFinalResponse(body string) ([]byte, bool) {
return nil, false return nil, false
} }
// reconstructResponseOutputFromSSE scans raw SSE body text for delta events and
// returns a JSON-encoded output array reconstructed from accumulated deltas.
// Returns (nil, false) if no content was found in deltas.
func reconstructResponseOutputFromSSE(bodyText string) ([]byte, bool) {
acc := apicompat.NewBufferedResponseAccumulator()
lines := strings.Split(bodyText, "\n")
for _, line := range lines {
data, ok := extractOpenAISSEDataLine(line)
if !ok || data == "" || data == "[DONE]" {
continue
}
var event apicompat.ResponsesStreamEvent
if err := json.Unmarshal([]byte(data), &event); err != nil {
continue
}
acc.ProcessEvent(&event)
}
if !acc.HasContent() {
return nil, false
}
output := acc.BuildOutput()
outputJSON, err := json.Marshal(output)
if err != nil {
return nil, false
}
return outputJSON, true
}
func (s *OpenAIGatewayService) parseSSEUsageFromBody(body string) *OpenAIUsage { func (s *OpenAIGatewayService) parseSSEUsageFromBody(body string) *OpenAIUsage {
usage := &OpenAIUsage{} usage := &OpenAIUsage{}
lines := strings.Split(body, "\n") lines := strings.Split(body, "\n")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment