Commit 4321adab authored by Ethan0x0000's avatar Ethan0x0000
Browse files

feat(service): add ForwardAsResponses/ForwardAsChatCompletions on GatewayService

New forwarding methods on GatewayService for Anthropic platform groups:

- ForwardAsResponses: accept Responses body → convert to Anthropic →
  forward to upstream → convert response back to Responses format.
  Supports both streaming (SSE event-by-event conversion) and buffered
  (accumulate then convert) response modes.
- ForwardAsChatCompletions: chain CC→Responses→Anthropic for request,
  Anthropic→Responses→CC for response. Streaming uses dual state machine
  chain with [DONE] marker.

Both methods reuse existing GatewayService infrastructure:
buildUpstreamRequest, Claude Code mimicry, cache control enforcement,
model mapping, and return UpstreamFailoverError for handler-level retry.
parent 68f151f5
package service
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/apicompat"
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
// ForwardAsChatCompletions accepts an OpenAI Chat Completions API request body,
// converts it to Anthropic Messages format (chained via Responses format),
// forwards to the Anthropic upstream, and converts the response back to Chat
// Completions format. This enables Chat Completions clients to access Anthropic
// models through Anthropic platform groups.
func (s *GatewayService) ForwardAsChatCompletions(
ctx context.Context,
c *gin.Context,
account *Account,
body []byte,
parsed *ParsedRequest,
) (*ForwardResult, error) {
startTime := time.Now()
// 1. Parse Chat Completions request
var ccReq apicompat.ChatCompletionsRequest
if err := json.Unmarshal(body, &ccReq); err != nil {
return nil, fmt.Errorf("parse chat completions request: %w", err)
}
originalModel := ccReq.Model
clientStream := ccReq.Stream
includeUsage := ccReq.StreamOptions != nil && ccReq.StreamOptions.IncludeUsage
// 2. Convert CC → Responses → Anthropic (chained conversion)
responsesReq, err := apicompat.ChatCompletionsToResponses(&ccReq)
if err != nil {
return nil, fmt.Errorf("convert chat completions to responses: %w", err)
}
anthropicReq, err := apicompat.ResponsesToAnthropicRequest(responsesReq)
if err != nil {
return nil, fmt.Errorf("convert responses to anthropic: %w", err)
}
// 3. Force upstream streaming
anthropicReq.Stream = true
reqStream := true
// 4. Model mapping
mappedModel := originalModel
if account.Type == AccountTypeAPIKey {
mappedModel = account.GetMappedModel(originalModel)
}
if mappedModel == originalModel && account.Platform == PlatformAnthropic && account.Type != AccountTypeAPIKey {
normalized := claude.NormalizeModelID(originalModel)
if normalized != originalModel {
mappedModel = normalized
}
}
anthropicReq.Model = mappedModel
logger.L().Debug("gateway forward_as_chat_completions: model mapping applied",
zap.Int64("account_id", account.ID),
zap.String("original_model", originalModel),
zap.String("mapped_model", mappedModel),
zap.Bool("client_stream", clientStream),
)
// 5. Marshal Anthropic request body
anthropicBody, err := json.Marshal(anthropicReq)
if err != nil {
return nil, fmt.Errorf("marshal anthropic request: %w", err)
}
// 6. Apply Claude Code mimicry for OAuth accounts
isClaudeCode := false // CC API is never Claude Code
shouldMimicClaudeCode := account.IsOAuth() && !isClaudeCode
if shouldMimicClaudeCode {
if !strings.Contains(strings.ToLower(mappedModel), "haiku") &&
!systemIncludesClaudeCodePrompt(anthropicReq.System) {
anthropicBody = injectClaudeCodePrompt(anthropicBody, anthropicReq.System)
}
}
// 7. Enforce cache_control block limit
anthropicBody = enforceCacheControlLimit(anthropicBody)
// 8. Get access token
token, tokenType, err := s.GetAccessToken(ctx, account)
if err != nil {
return nil, fmt.Errorf("get access token: %w", err)
}
// 9. Get proxy URL
proxyURL := ""
if account.ProxyID != nil && account.Proxy != nil {
proxyURL = account.Proxy.URL()
}
// 10. Build upstream request
upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, reqStream)
upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, anthropicBody, token, tokenType, mappedModel, reqStream, shouldMimicClaudeCode)
releaseUpstreamCtx()
if err != nil {
return nil, fmt.Errorf("build upstream request: %w", err)
}
// 11. Send request
resp, err := s.httpUpstream.DoWithTLS(upstreamReq, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled())
if err != nil {
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
safeErr := sanitizeUpstreamErrorMessage(err.Error())
setOpsUpstreamError(c, 0, safeErr, "")
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0,
Kind: "request_error",
Message: safeErr,
})
writeGatewayCCError(c, http.StatusBadGateway, "server_error", "Upstream request failed")
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
}
defer func() { _ = resp.Body.Close() }()
// 12. Handle error response with failover
if resp.StatusCode >= 400 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
resp.Body = io.NopCloser(bytes.NewReader(respBody))
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
if s.shouldFailoverUpstreamError(resp.StatusCode) {
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover",
Message: upstreamMsg,
})
if s.rateLimitService != nil {
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
}
return nil, &UpstreamFailoverError{
StatusCode: resp.StatusCode,
ResponseBody: respBody,
}
}
writeGatewayCCError(c, mapUpstreamStatusCode(resp.StatusCode), "server_error", upstreamMsg)
return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg)
}
// 13. Handle normal response
// Read Anthropic SSE → convert to Responses events → convert to CC format
var result *ForwardResult
var handleErr error
if clientStream {
result, handleErr = s.handleCCStreamingFromAnthropic(resp, c, originalModel, mappedModel, startTime, includeUsage)
} else {
result, handleErr = s.handleCCBufferedFromAnthropic(resp, c, originalModel, mappedModel, startTime)
}
return result, handleErr
}
// handleCCBufferedFromAnthropic reads Anthropic SSE events, assembles the full
// response, then converts Anthropic → Responses → Chat Completions.
func (s *GatewayService) handleCCBufferedFromAnthropic(
resp *http.Response,
c *gin.Context,
originalModel string,
mappedModel string,
startTime time.Time,
) (*ForwardResult, error) {
requestID := resp.Header.Get("x-request-id")
scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
maxLineSize = s.cfg.Gateway.MaxLineSize
}
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
var finalResp *apicompat.AnthropicResponse
var usage ClaudeUsage
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "event: ") {
continue
}
if !scanner.Scan() {
break
}
dataLine := scanner.Text()
if !strings.HasPrefix(dataLine, "data: ") {
continue
}
payload := dataLine[6:]
var event apicompat.AnthropicStreamEvent
if err := json.Unmarshal([]byte(payload), &event); err != nil {
continue
}
if event.Type == "message_start" && event.Message != nil {
finalResp = event.Message
}
if event.Type == "message_delta" {
if event.Usage != nil {
usage = ClaudeUsage{
InputTokens: event.Usage.InputTokens,
OutputTokens: event.Usage.OutputTokens,
}
if event.Usage.CacheReadInputTokens > 0 {
usage.CacheReadInputTokens = event.Usage.CacheReadInputTokens
}
}
if event.Delta != nil && event.Delta.StopReason != "" && finalResp != nil {
finalResp.StopReason = event.Delta.StopReason
}
}
if event.Type == "content_block_start" && event.ContentBlock != nil && finalResp != nil {
finalResp.Content = append(finalResp.Content, *event.ContentBlock)
}
if event.Type == "content_block_delta" && event.Delta != nil && finalResp != nil && event.Index != nil {
idx := *event.Index
if idx < len(finalResp.Content) {
switch event.Delta.Type {
case "text_delta":
finalResp.Content[idx].Text += event.Delta.Text
case "thinking_delta":
finalResp.Content[idx].Thinking += event.Delta.Thinking
case "input_json_delta":
finalResp.Content[idx].Input = appendRawJSON(finalResp.Content[idx].Input, event.Delta.PartialJSON)
}
}
}
}
if err := scanner.Err(); err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logger.L().Warn("forward_as_cc buffered: read error",
zap.Error(err),
zap.String("request_id", requestID),
)
}
}
if finalResp == nil {
writeGatewayCCError(c, http.StatusBadGateway, "server_error", "Upstream stream ended without a response")
return nil, fmt.Errorf("upstream stream ended without response")
}
if usage.InputTokens > 0 || usage.OutputTokens > 0 {
finalResp.Usage = apicompat.AnthropicUsage{
InputTokens: usage.InputTokens,
OutputTokens: usage.OutputTokens,
}
}
// Chain: Anthropic → Responses → Chat Completions
responsesResp := apicompat.AnthropicToResponsesResponse(finalResp)
ccResp := apicompat.ResponsesToChatCompletions(responsesResp, originalModel)
if s.responseHeaderFilter != nil {
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
}
c.JSON(http.StatusOK, ccResp)
return &ForwardResult{
RequestID: requestID,
Usage: usage,
Model: originalModel,
UpstreamModel: mappedModel,
Stream: false,
Duration: time.Since(startTime),
}, nil
}
// handleCCStreamingFromAnthropic reads Anthropic SSE events, converts each
// to Responses events, then to Chat Completions chunks, and writes them.
func (s *GatewayService) handleCCStreamingFromAnthropic(
resp *http.Response,
c *gin.Context,
originalModel string,
mappedModel string,
startTime time.Time,
includeUsage bool,
) (*ForwardResult, error) {
requestID := resp.Header.Get("x-request-id")
if s.responseHeaderFilter != nil {
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
}
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("X-Accel-Buffering", "no")
c.Writer.WriteHeader(http.StatusOK)
// Use Anthropic→Responses state machine, then convert Responses→CC
anthState := apicompat.NewAnthropicEventToResponsesState()
anthState.Model = originalModel
ccState := apicompat.NewResponsesEventToChatState()
ccState.Model = originalModel
ccState.IncludeUsage = includeUsage
var usage ClaudeUsage
var firstTokenMs *int
firstChunk := true
scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
maxLineSize = s.cfg.Gateway.MaxLineSize
}
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
resultWithUsage := func() *ForwardResult {
return &ForwardResult{
RequestID: requestID,
Usage: usage,
Model: originalModel,
UpstreamModel: mappedModel,
Stream: true,
Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs,
}
}
writeChunk := func(chunk apicompat.ChatCompletionsChunk) bool {
sse, err := apicompat.ChatChunkToSSE(chunk)
if err != nil {
return false
}
if _, err := fmt.Fprint(c.Writer, sse); err != nil {
return true // client disconnected
}
return false
}
processAnthropicEvent := func(event *apicompat.AnthropicStreamEvent) bool {
if firstChunk {
firstChunk = false
ms := int(time.Since(startTime).Milliseconds())
firstTokenMs = &ms
}
// Extract usage
if event.Type == "message_delta" && event.Usage != nil {
usage = ClaudeUsage{
InputTokens: event.Usage.InputTokens,
OutputTokens: event.Usage.OutputTokens,
}
if event.Usage.CacheReadInputTokens > 0 {
usage.CacheReadInputTokens = event.Usage.CacheReadInputTokens
}
}
if event.Type == "message_start" && event.Message != nil && event.Message.Usage.InputTokens > 0 {
usage.InputTokens = event.Message.Usage.InputTokens
}
// Chain: Anthropic event → Responses events → CC chunks
responsesEvents := apicompat.AnthropicEventToResponsesEvents(event, anthState)
for _, resEvt := range responsesEvents {
ccChunks := apicompat.ResponsesEventToChatChunks(&resEvt, ccState)
for _, chunk := range ccChunks {
if disconnected := writeChunk(chunk); disconnected {
return true
}
}
}
c.Writer.Flush()
return false
}
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "event: ") {
continue
}
if !scanner.Scan() {
break
}
dataLine := scanner.Text()
if !strings.HasPrefix(dataLine, "data: ") {
continue
}
payload := dataLine[6:]
var event apicompat.AnthropicStreamEvent
if err := json.Unmarshal([]byte(payload), &event); err != nil {
continue
}
if processAnthropicEvent(&event) {
return resultWithUsage(), nil
}
}
if err := scanner.Err(); err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logger.L().Warn("forward_as_cc stream: read error",
zap.Error(err),
zap.String("request_id", requestID),
)
}
}
// Finalize both state machines
finalResEvents := apicompat.FinalizeAnthropicResponsesStream(anthState)
for _, resEvt := range finalResEvents {
ccChunks := apicompat.ResponsesEventToChatChunks(&resEvt, ccState)
for _, chunk := range ccChunks {
writeChunk(chunk) //nolint:errcheck
}
}
finalCCChunks := apicompat.FinalizeResponsesChatStream(ccState)
for _, chunk := range finalCCChunks {
writeChunk(chunk) //nolint:errcheck
}
// Write [DONE] marker
fmt.Fprint(c.Writer, "data: [DONE]\n\n") //nolint:errcheck
c.Writer.Flush()
return resultWithUsage(), nil
}
// writeGatewayCCError writes an error in OpenAI Chat Completions format for
// the Anthropic-upstream CC forwarding path.
func writeGatewayCCError(c *gin.Context, statusCode int, errType, message string) {
c.JSON(statusCode, gin.H{
"error": gin.H{
"type": errType,
"message": message,
},
})
}
package service
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/apicompat"
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
// ForwardAsResponses accepts an OpenAI Responses API request body, converts it
// to Anthropic Messages format, forwards to the Anthropic upstream, and converts
// the response back to Responses format. This enables OpenAI Responses API
// clients to access Anthropic models through Anthropic platform groups.
//
// The method follows the same pattern as OpenAIGatewayService.ForwardAsAnthropic
// but in reverse direction: Responses → Anthropic upstream → Responses.
func (s *GatewayService) ForwardAsResponses(
ctx context.Context,
c *gin.Context,
account *Account,
body []byte,
parsed *ParsedRequest,
) (*ForwardResult, error) {
startTime := time.Now()
// 1. Parse Responses request
var responsesReq apicompat.ResponsesRequest
if err := json.Unmarshal(body, &responsesReq); err != nil {
return nil, fmt.Errorf("parse responses request: %w", err)
}
originalModel := responsesReq.Model
clientStream := responsesReq.Stream
// 2. Convert Responses → Anthropic
anthropicReq, err := apicompat.ResponsesToAnthropicRequest(&responsesReq)
if err != nil {
return nil, fmt.Errorf("convert responses to anthropic: %w", err)
}
// 3. Force upstream streaming (Anthropic works best with streaming)
anthropicReq.Stream = true
reqStream := true
// 4. Model mapping
mappedModel := originalModel
if account.Type == AccountTypeAPIKey {
mappedModel = account.GetMappedModel(originalModel)
}
if mappedModel == originalModel && account.Platform == PlatformAnthropic && account.Type != AccountTypeAPIKey {
normalized := claude.NormalizeModelID(originalModel)
if normalized != originalModel {
mappedModel = normalized
}
}
anthropicReq.Model = mappedModel
logger.L().Debug("gateway forward_as_responses: model mapping applied",
zap.Int64("account_id", account.ID),
zap.String("original_model", originalModel),
zap.String("mapped_model", mappedModel),
zap.Bool("client_stream", clientStream),
)
// 5. Marshal Anthropic request body
anthropicBody, err := json.Marshal(anthropicReq)
if err != nil {
return nil, fmt.Errorf("marshal anthropic request: %w", err)
}
// 6. Apply Claude Code mimicry for OAuth accounts (non-Claude-Code endpoints)
isClaudeCode := false // Responses API is never Claude Code
shouldMimicClaudeCode := account.IsOAuth() && !isClaudeCode
if shouldMimicClaudeCode {
if !strings.Contains(strings.ToLower(mappedModel), "haiku") &&
!systemIncludesClaudeCodePrompt(anthropicReq.System) {
anthropicBody = injectClaudeCodePrompt(anthropicBody, anthropicReq.System)
}
}
// 7. Enforce cache_control block limit
anthropicBody = enforceCacheControlLimit(anthropicBody)
// 8. Get access token
token, tokenType, err := s.GetAccessToken(ctx, account)
if err != nil {
return nil, fmt.Errorf("get access token: %w", err)
}
// 9. Get proxy URL
proxyURL := ""
if account.ProxyID != nil && account.Proxy != nil {
proxyURL = account.Proxy.URL()
}
// 10. Build upstream request
upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, reqStream)
upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, anthropicBody, token, tokenType, mappedModel, reqStream, shouldMimicClaudeCode)
releaseUpstreamCtx()
if err != nil {
return nil, fmt.Errorf("build upstream request: %w", err)
}
// 11. Send request
resp, err := s.httpUpstream.DoWithTLS(upstreamReq, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled())
if err != nil {
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
safeErr := sanitizeUpstreamErrorMessage(err.Error())
setOpsUpstreamError(c, 0, safeErr, "")
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0,
Kind: "request_error",
Message: safeErr,
})
writeResponsesError(c, http.StatusBadGateway, "server_error", "Upstream request failed")
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
}
defer func() { _ = resp.Body.Close() }()
// 12. Handle error response with failover
if resp.StatusCode >= 400 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
resp.Body = io.NopCloser(bytes.NewReader(respBody))
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
if s.shouldFailoverUpstreamError(resp.StatusCode) {
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover",
Message: upstreamMsg,
})
if s.rateLimitService != nil {
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
}
return nil, &UpstreamFailoverError{
StatusCode: resp.StatusCode,
ResponseBody: respBody,
}
}
// Non-failover error: return Responses-formatted error to client
writeResponsesError(c, mapUpstreamStatusCode(resp.StatusCode), "server_error", upstreamMsg)
return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg)
}
// 13. Handle normal response (convert Anthropic → Responses)
var result *ForwardResult
var handleErr error
if clientStream {
result, handleErr = s.handleResponsesStreamingResponse(resp, c, originalModel, mappedModel, startTime)
} else {
result, handleErr = s.handleResponsesBufferedStreamingResponse(resp, c, originalModel, mappedModel, startTime)
}
return result, handleErr
}
// handleResponsesBufferedStreamingResponse reads all Anthropic SSE events from
// the upstream streaming response, assembles them into a complete Anthropic
// response, converts to Responses API JSON format, and writes it to the client.
func (s *GatewayService) handleResponsesBufferedStreamingResponse(
resp *http.Response,
c *gin.Context,
originalModel string,
mappedModel string,
startTime time.Time,
) (*ForwardResult, error) {
requestID := resp.Header.Get("x-request-id")
scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
maxLineSize = s.cfg.Gateway.MaxLineSize
}
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
// Accumulate the final Anthropic response from streaming events
var finalResp *apicompat.AnthropicResponse
var usage ClaudeUsage
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "event: ") {
continue
}
eventType := strings.TrimPrefix(line, "event: ")
// Read the data line
if !scanner.Scan() {
break
}
dataLine := scanner.Text()
if !strings.HasPrefix(dataLine, "data: ") {
continue
}
payload := dataLine[6:]
var event apicompat.AnthropicStreamEvent
if err := json.Unmarshal([]byte(payload), &event); err != nil {
logger.L().Warn("forward_as_responses buffered: failed to parse event",
zap.Error(err),
zap.String("request_id", requestID),
zap.String("event_type", eventType),
)
continue
}
// message_start carries the initial response structure
if event.Type == "message_start" && event.Message != nil {
finalResp = event.Message
}
// message_delta carries final usage and stop_reason
if event.Type == "message_delta" {
if event.Usage != nil {
usage = ClaudeUsage{
InputTokens: event.Usage.InputTokens,
OutputTokens: event.Usage.OutputTokens,
}
if event.Usage.CacheReadInputTokens > 0 {
usage.CacheReadInputTokens = event.Usage.CacheReadInputTokens
}
if event.Usage.CacheCreationInputTokens > 0 {
usage.CacheCreationInputTokens = event.Usage.CacheCreationInputTokens
}
}
if event.Delta != nil && event.Delta.StopReason != "" && finalResp != nil {
finalResp.StopReason = event.Delta.StopReason
}
}
// Accumulate content blocks
if event.Type == "content_block_start" && event.ContentBlock != nil && finalResp != nil {
finalResp.Content = append(finalResp.Content, *event.ContentBlock)
}
if event.Type == "content_block_delta" && event.Delta != nil && finalResp != nil && event.Index != nil {
idx := *event.Index
if idx < len(finalResp.Content) {
switch event.Delta.Type {
case "text_delta":
finalResp.Content[idx].Text += event.Delta.Text
case "thinking_delta":
finalResp.Content[idx].Thinking += event.Delta.Thinking
case "input_json_delta":
finalResp.Content[idx].Input = appendRawJSON(finalResp.Content[idx].Input, event.Delta.PartialJSON)
}
}
}
}
if err := scanner.Err(); err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logger.L().Warn("forward_as_responses buffered: read error",
zap.Error(err),
zap.String("request_id", requestID),
)
}
}
if finalResp == nil {
writeResponsesError(c, http.StatusBadGateway, "server_error", "Upstream stream ended without a response")
return nil, fmt.Errorf("upstream stream ended without response")
}
// Update usage from accumulated delta
if usage.InputTokens > 0 || usage.OutputTokens > 0 {
finalResp.Usage = apicompat.AnthropicUsage{
InputTokens: usage.InputTokens,
OutputTokens: usage.OutputTokens,
CacheCreationInputTokens: usage.CacheCreationInputTokens,
CacheReadInputTokens: usage.CacheReadInputTokens,
}
}
// Convert to Responses format
responsesResp := apicompat.AnthropicToResponsesResponse(finalResp)
responsesResp.Model = originalModel // Use original model name
if s.responseHeaderFilter != nil {
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
}
c.JSON(http.StatusOK, responsesResp)
return &ForwardResult{
RequestID: requestID,
Usage: usage,
Model: originalModel,
UpstreamModel: mappedModel,
Stream: false,
Duration: time.Since(startTime),
}, nil
}
// handleResponsesStreamingResponse reads Anthropic SSE events from upstream,
// converts each to Responses SSE events, and writes them to the client.
func (s *GatewayService) handleResponsesStreamingResponse(
resp *http.Response,
c *gin.Context,
originalModel string,
mappedModel string,
startTime time.Time,
) (*ForwardResult, error) {
requestID := resp.Header.Get("x-request-id")
if s.responseHeaderFilter != nil {
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
}
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("X-Accel-Buffering", "no")
c.Writer.WriteHeader(http.StatusOK)
state := apicompat.NewAnthropicEventToResponsesState()
state.Model = originalModel
var usage ClaudeUsage
var firstTokenMs *int
firstChunk := true
scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
maxLineSize = s.cfg.Gateway.MaxLineSize
}
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
resultWithUsage := func() *ForwardResult {
return &ForwardResult{
RequestID: requestID,
Usage: usage,
Model: originalModel,
UpstreamModel: mappedModel,
Stream: true,
Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs,
}
}
// processEvent handles a single parsed Anthropic SSE event.
processEvent := func(event *apicompat.AnthropicStreamEvent) bool {
if firstChunk {
firstChunk = false
ms := int(time.Since(startTime).Milliseconds())
firstTokenMs = &ms
}
// Extract usage from message_delta
if event.Type == "message_delta" && event.Usage != nil {
usage = ClaudeUsage{
InputTokens: event.Usage.InputTokens,
OutputTokens: event.Usage.OutputTokens,
}
if event.Usage.CacheReadInputTokens > 0 {
usage.CacheReadInputTokens = event.Usage.CacheReadInputTokens
}
if event.Usage.CacheCreationInputTokens > 0 {
usage.CacheCreationInputTokens = event.Usage.CacheCreationInputTokens
}
}
// Also capture usage from message_start
if event.Type == "message_start" && event.Message != nil {
if event.Message.Usage.InputTokens > 0 {
usage.InputTokens = event.Message.Usage.InputTokens
}
}
// Convert to Responses events
events := apicompat.AnthropicEventToResponsesEvents(event, state)
for _, evt := range events {
sse, err := apicompat.ResponsesEventToSSE(evt)
if err != nil {
logger.L().Warn("forward_as_responses stream: failed to marshal event",
zap.Error(err),
zap.String("request_id", requestID),
)
continue
}
if _, err := fmt.Fprint(c.Writer, sse); err != nil {
logger.L().Info("forward_as_responses stream: client disconnected",
zap.String("request_id", requestID),
)
return true // client disconnected
}
}
if len(events) > 0 {
c.Writer.Flush()
}
return false
}
finalizeStream := func() (*ForwardResult, error) {
if finalEvents := apicompat.FinalizeAnthropicResponsesStream(state); len(finalEvents) > 0 {
for _, evt := range finalEvents {
sse, err := apicompat.ResponsesEventToSSE(evt)
if err != nil {
continue
}
fmt.Fprint(c.Writer, sse) //nolint:errcheck
}
c.Writer.Flush()
}
return resultWithUsage(), nil
}
// Read Anthropic SSE events
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "event: ") {
continue
}
eventType := strings.TrimPrefix(line, "event: ")
// Read data line
if !scanner.Scan() {
break
}
dataLine := scanner.Text()
if !strings.HasPrefix(dataLine, "data: ") {
continue
}
payload := dataLine[6:]
var event apicompat.AnthropicStreamEvent
if err := json.Unmarshal([]byte(payload), &event); err != nil {
logger.L().Warn("forward_as_responses stream: failed to parse event",
zap.Error(err),
zap.String("request_id", requestID),
zap.String("event_type", eventType),
)
continue
}
if processEvent(&event) {
return resultWithUsage(), nil
}
}
if err := scanner.Err(); err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logger.L().Warn("forward_as_responses stream: read error",
zap.Error(err),
zap.String("request_id", requestID),
)
}
}
return finalizeStream()
}
// appendRawJSON appends a JSON fragment string to existing raw JSON.
func appendRawJSON(existing json.RawMessage, fragment string) json.RawMessage {
if len(existing) == 0 {
return json.RawMessage(fragment)
}
return json.RawMessage(string(existing) + fragment)
}
// writeResponsesError writes an error response in OpenAI Responses API format.
func writeResponsesError(c *gin.Context, statusCode int, code, message string) {
c.JSON(statusCode, gin.H{
"error": gin.H{
"code": code,
"message": message,
},
})
}
// mapUpstreamStatusCode maps upstream HTTP status codes to appropriate client-facing codes.
func mapUpstreamStatusCode(code int) int {
if code >= 500 {
return http.StatusBadGateway
}
return code
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment