Commit 065e4782 authored by 陈曦's avatar 陈曦
Browse files

不记录request_capture_logs表的bug修改

parent 1186671a
Pipeline #82292 passed with stage
in 2 minutes and 52 seconds
...@@ -134,6 +134,7 @@ func (r *apiKeyRepository) GetByKeyForAuth(ctx context.Context, key string) (*se ...@@ -134,6 +134,7 @@ func (r *apiKeyRepository) GetByKeyForAuth(ctx context.Context, key string) (*se
apikey.FieldRateLimit5h, apikey.FieldRateLimit5h,
apikey.FieldRateLimit1d, apikey.FieldRateLimit1d,
apikey.FieldRateLimit7d, apikey.FieldRateLimit7d,
apikey.FieldCaptureRequests,
). ).
WithUser(func(q *dbent.UserQuery) { WithUser(func(q *dbent.UserQuery) {
q.Select( q.Select(
......
...@@ -4335,6 +4335,7 @@ func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin. ...@@ -4335,6 +4335,7 @@ func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin.
var usage *ClaudeUsage var usage *ClaudeUsage
var firstTokenMs *int var firstTokenMs *int
var clientDisconnect bool var clientDisconnect bool
var responseBody string
if claudeReq.Stream { if claudeReq.Stream {
// 流式响应:透传 // 流式响应:透传
...@@ -4344,10 +4345,14 @@ func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin. ...@@ -4344,10 +4345,14 @@ func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin.
c.Header("X-Accel-Buffering", "no") c.Header("X-Accel-Buffering", "no")
c.Status(http.StatusOK) c.Status(http.StatusOK)
streamRes := s.streamUpstreamResponse(c, resp, startTime) streamRes := s.streamUpstreamResponse(ctx, c, resp, startTime)
usage = streamRes.usage usage = streamRes.usage
firstTokenMs = streamRes.firstTokenMs firstTokenMs = streamRes.firstTokenMs
clientDisconnect = streamRes.clientDisconnect clientDisconnect = streamRes.clientDisconnect
// 从 context buffer 读取已收集的 assistant 文本
if captureBuilder, ok := ctx.Value(ctxkey.ResponseCaptureBuffer).(*strings.Builder); ok && captureBuilder != nil {
responseBody = captureBuilder.String()
}
} else { } else {
// 非流式响应:直接透传 // 非流式响应:直接透传
respBody, err := io.ReadAll(resp.Body) respBody, err := io.ReadAll(resp.Body)
...@@ -4357,6 +4362,7 @@ func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin. ...@@ -4357,6 +4362,7 @@ func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin.
// 提取 usage // 提取 usage
usage = s.extractClaudeUsage(respBody) usage = s.extractClaudeUsage(respBody)
responseBody = string(respBody)
c.Header("Content-Type", resp.Header.Get("Content-Type")) c.Header("Content-Type", resp.Header.Get("Content-Type"))
c.Status(http.StatusOK) c.Status(http.StatusOK)
...@@ -4373,6 +4379,7 @@ func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin. ...@@ -4373,6 +4379,7 @@ func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin.
Duration: duration, Duration: duration,
FirstTokenMs: firstTokenMs, FirstTokenMs: firstTokenMs,
ClientDisconnect: clientDisconnect, ClientDisconnect: clientDisconnect,
ResponseBody: responseBody,
Usage: ClaudeUsage{ Usage: ClaudeUsage{
InputTokens: usage.InputTokens, InputTokens: usage.InputTokens,
OutputTokens: usage.OutputTokens, OutputTokens: usage.OutputTokens,
...@@ -4383,10 +4390,13 @@ func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin. ...@@ -4383,10 +4390,13 @@ func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin.
} }
// streamUpstreamResponse 透传上游 SSE 流并提取 Claude usage // streamUpstreamResponse 透传上游 SSE 流并提取 Claude usage
func (s *AntigravityGatewayService) streamUpstreamResponse(c *gin.Context, resp *http.Response, startTime time.Time) *antigravityStreamResult { func (s *AntigravityGatewayService) streamUpstreamResponse(ctx context.Context, c *gin.Context, resp *http.Response, startTime time.Time) *antigravityStreamResult {
usage := &ClaudeUsage{} usage := &ClaudeUsage{}
var firstTokenMs *int var firstTokenMs *int
// 响应体捕获:若 context 中注入了 ResponseCaptureBuffer,则收集 text_delta 文本
captureBuilder, _ := ctx.Value(ctxkey.ResponseCaptureBuffer).(*strings.Builder)
scanner := bufio.NewScanner(resp.Body) scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize maxLineSize := defaultMaxLineSize
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.MaxLineSize > 0 { if s.settingService.cfg != nil && s.settingService.cfg.Gateway.MaxLineSize > 0 {
...@@ -4484,6 +4494,16 @@ func (s *AntigravityGatewayService) streamUpstreamResponse(c *gin.Context, resp ...@@ -4484,6 +4494,16 @@ func (s *AntigravityGatewayService) streamUpstreamResponse(c *gin.Context, resp
// 尝试从 message_delta 或 message_stop 事件提取 usage // 尝试从 message_delta 或 message_stop 事件提取 usage
s.extractSSEUsage(line, usage) s.extractSSEUsage(line, usage)
// 收集 assistant text(仅 content_block_delta + text_delta)
if captureBuilder != nil && strings.HasPrefix(line, "data: ") {
data := strings.TrimPrefix(line, "data: ")
if gjson.Get(data, "type").String() == "content_block_delta" {
if gjson.Get(data, "delta.type").String() == "text_delta" {
captureBuilder.WriteString(gjson.Get(data, "delta.text").String())
}
}
}
// 透传行 // 透传行
cw.Fprintf("%s\n", line) cw.Fprintf("%s\n", line)
......
...@@ -25,6 +25,8 @@ type APIKeyAuthSnapshot struct { ...@@ -25,6 +25,8 @@ type APIKeyAuthSnapshot struct {
RateLimit5h float64 `json:"rate_limit_5h"` RateLimit5h float64 `json:"rate_limit_5h"`
RateLimit1d float64 `json:"rate_limit_1d"` RateLimit1d float64 `json:"rate_limit_1d"`
RateLimit7d float64 `json:"rate_limit_7d"` RateLimit7d float64 `json:"rate_limit_7d"`
CaptureRequests bool `json:"capture_requests"`
} }
// APIKeyAuthUserSnapshot 用户快照 // APIKeyAuthUserSnapshot 用户快照
......
...@@ -14,7 +14,7 @@ import ( ...@@ -14,7 +14,7 @@ import (
"github.com/dgraph-io/ristretto" "github.com/dgraph-io/ristretto"
) )
const apiKeyAuthSnapshotVersion = 7 // v7: added UserGroupRPMOverride on user snapshot const apiKeyAuthSnapshotVersion = 8 // v8: added CaptureRequests on api key snapshot
type apiKeyAuthCacheConfig struct { type apiKeyAuthCacheConfig struct {
l1Size int l1Size int
...@@ -219,6 +219,7 @@ func (s *APIKeyService) snapshotFromAPIKey(ctx context.Context, apiKey *APIKey) ...@@ -219,6 +219,7 @@ func (s *APIKeyService) snapshotFromAPIKey(ctx context.Context, apiKey *APIKey)
RateLimit5h: apiKey.RateLimit5h, RateLimit5h: apiKey.RateLimit5h,
RateLimit1d: apiKey.RateLimit1d, RateLimit1d: apiKey.RateLimit1d,
RateLimit7d: apiKey.RateLimit7d, RateLimit7d: apiKey.RateLimit7d,
CaptureRequests: apiKey.CaptureRequests,
User: APIKeyAuthUserSnapshot{ User: APIKeyAuthUserSnapshot{
ID: apiKey.User.ID, ID: apiKey.User.ID,
Status: apiKey.User.Status, Status: apiKey.User.Status,
...@@ -292,6 +293,7 @@ func (s *APIKeyService) snapshotToAPIKey(key string, snapshot *APIKeyAuthSnapsho ...@@ -292,6 +293,7 @@ func (s *APIKeyService) snapshotToAPIKey(key string, snapshot *APIKeyAuthSnapsho
RateLimit5h: snapshot.RateLimit5h, RateLimit5h: snapshot.RateLimit5h,
RateLimit1d: snapshot.RateLimit1d, RateLimit1d: snapshot.RateLimit1d,
RateLimit7d: snapshot.RateLimit7d, RateLimit7d: snapshot.RateLimit7d,
CaptureRequests: snapshot.CaptureRequests,
User: &User{ User: &User{
ID: snapshot.User.ID, ID: snapshot.User.ID,
Status: snapshot.User.Status, Status: snapshot.User.Status,
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"hash/crc32" "hash/crc32"
"io" "io"
"net/http" "net/http"
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
...@@ -16,6 +17,7 @@ import ( ...@@ -16,6 +17,7 @@ import (
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"github.com/tidwall/sjson" "github.com/tidwall/sjson"
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger" "github.com/Wei-Shaw/sub2api/internal/pkg/logger"
) )
...@@ -48,6 +50,9 @@ func (s *GatewayService) handleBedrockStreamingResponse( ...@@ -48,6 +50,9 @@ func (s *GatewayService) handleBedrockStreamingResponse(
var firstTokenMs *int var firstTokenMs *int
clientDisconnected := false clientDisconnected := false
// 响应体捕获:若 context 中注入了 ResponseCaptureBuffer,则收集 text_delta 文本
captureBuilder, _ := ctx.Value(ctxkey.ResponseCaptureBuffer).(*strings.Builder)
// Bedrock EventStream 使用 application/vnd.amazon.eventstream 二进制格式。 // Bedrock EventStream 使用 application/vnd.amazon.eventstream 二进制格式。
// 每个帧结构:total_length(4) + headers_length(4) + prelude_crc(4) + headers + payload + message_crc(4) // 每个帧结构:total_length(4) + headers_length(4) + prelude_crc(4) + headers + payload + message_crc(4)
// 但更实用的方式是使用行扫描找 JSON chunks,因为 Bedrock 的响应在二进制帧中。 // 但更实用的方式是使用行扫描找 JSON chunks,因为 Bedrock 的响应在二进制帧中。
...@@ -141,6 +146,13 @@ func (s *GatewayService) handleBedrockStreamingResponse( ...@@ -141,6 +146,13 @@ func (s *GatewayService) handleBedrockStreamingResponse(
// 解析 SSE 事件数据提取 usage // 解析 SSE 事件数据提取 usage
s.parseSSEUsagePassthrough(string(sseData), usage) s.parseSSEUsagePassthrough(string(sseData), usage)
// 收集 assistant text(仅 content_block_delta + text_delta)
if captureBuilder != nil && gjson.GetBytes(sseData, "type").String() == "content_block_delta" {
if gjson.GetBytes(sseData, "delta.type").String() == "text_delta" {
captureBuilder.WriteString(gjson.GetBytes(sseData, "delta.text").String())
}
}
// 确定 SSE event type // 确定 SSE event type
eventType := gjson.GetBytes(sseData, "type").String() eventType := gjson.GetBytes(sseData, "type").String()
......
...@@ -4928,6 +4928,7 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthroughWithInput( ...@@ -4928,6 +4928,7 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthroughWithInput(
var usage *ClaudeUsage var usage *ClaudeUsage
var firstTokenMs *int var firstTokenMs *int
var clientDisconnect bool var clientDisconnect bool
var responseBody string
if input.RequestStream { if input.RequestStream {
streamResult, err := s.handleStreamingResponseAnthropicAPIKeyPassthrough(ctx, resp, c, account, input.StartTime, input.RequestModel) streamResult, err := s.handleStreamingResponseAnthropicAPIKeyPassthrough(ctx, resp, c, account, input.StartTime, input.RequestModel)
if err != nil { if err != nil {
...@@ -4936,8 +4937,12 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthroughWithInput( ...@@ -4936,8 +4937,12 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthroughWithInput(
usage = streamResult.usage usage = streamResult.usage
firstTokenMs = streamResult.firstTokenMs firstTokenMs = streamResult.firstTokenMs
clientDisconnect = streamResult.clientDisconnect clientDisconnect = streamResult.clientDisconnect
// 从 context buffer 读取已收集的 assistant 文本
if captureBuilder, ok := ctx.Value(ctxkey.ResponseCaptureBuffer).(*strings.Builder); ok && captureBuilder != nil {
responseBody = captureBuilder.String()
}
} else { } else {
usage, err = s.handleNonStreamingResponseAnthropicAPIKeyPassthrough(ctx, resp, c, account) responseBody, usage, err = s.handleNonStreamingResponseAnthropicAPIKeyPassthrough(ctx, resp, c, account)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -4955,6 +4960,7 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthroughWithInput( ...@@ -4955,6 +4960,7 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthroughWithInput(
Duration: time.Since(input.StartTime), Duration: time.Since(input.StartTime),
FirstTokenMs: firstTokenMs, FirstTokenMs: firstTokenMs,
ClientDisconnect: clientDisconnect, ClientDisconnect: clientDisconnect,
ResponseBody: responseBody,
}, nil }, nil
} }
...@@ -5051,6 +5057,9 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( ...@@ -5051,6 +5057,9 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough(
clientDisconnected := false clientDisconnected := false
sawTerminalEvent := false sawTerminalEvent := false
// 响应体捕获:若 context 中注入了 ResponseCaptureBuffer,则收集 text_delta 文本
captureBuilder, _ := ctx.Value(ctxkey.ResponseCaptureBuffer).(*strings.Builder)
scanner := bufio.NewScanner(resp.Body) scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize maxLineSize := defaultMaxLineSize
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 { if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
...@@ -5145,6 +5154,12 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( ...@@ -5145,6 +5154,12 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough(
firstTokenMs = &ms firstTokenMs = &ms
} }
s.parseSSEUsagePassthrough(data, usage) s.parseSSEUsagePassthrough(data, usage)
// 收集 assistant text(仅 content_block_delta + text_delta)
if captureBuilder != nil && gjson.Get(data, "type").String() == "content_block_delta" {
if gjson.Get(data, "delta.type").String() == "text_delta" {
captureBuilder.WriteString(gjson.Get(data, "delta.text").String())
}
}
} else { } else {
trimmed := strings.TrimSpace(line) trimmed := strings.TrimSpace(line)
if strings.HasPrefix(trimmed, "event:") && anthropicStreamEventIsTerminal(strings.TrimSpace(strings.TrimPrefix(trimmed, "event:")), "") { if strings.HasPrefix(trimmed, "event:") && anthropicStreamEventIsTerminal(strings.TrimSpace(strings.TrimPrefix(trimmed, "event:")), "") {
...@@ -5307,14 +5322,14 @@ func (s *GatewayService) handleNonStreamingResponseAnthropicAPIKeyPassthrough( ...@@ -5307,14 +5322,14 @@ func (s *GatewayService) handleNonStreamingResponseAnthropicAPIKeyPassthrough(
resp *http.Response, resp *http.Response,
c *gin.Context, c *gin.Context,
account *Account, account *Account,
) (*ClaudeUsage, error) { ) (string, *ClaudeUsage, error) {
if s.rateLimitService != nil { if s.rateLimitService != nil {
s.rateLimitService.UpdateSessionWindow(ctx, account, resp.Header) s.rateLimitService.UpdateSessionWindow(ctx, account, resp.Header)
} }
body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, anthropicTooLargeError) body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, anthropicTooLargeError)
if err != nil { if err != nil {
return nil, err return "", nil, err
} }
usage := parseClaudeUsageFromResponseBody(body) usage := parseClaudeUsageFromResponseBody(body)
...@@ -5326,7 +5341,7 @@ func (s *GatewayService) handleNonStreamingResponseAnthropicAPIKeyPassthrough( ...@@ -5326,7 +5341,7 @@ func (s *GatewayService) handleNonStreamingResponseAnthropicAPIKeyPassthrough(
} }
body = reverseToolNamesIfPresent(c, body) body = reverseToolNamesIfPresent(c, body)
c.Data(resp.StatusCode, contentType, body) c.Data(resp.StatusCode, contentType, body)
return usage, nil return string(body), usage, nil
} }
func writeAnthropicPassthroughResponseHeaders(dst http.Header, src http.Header, filter *responseheaders.CompiledHeaderFilter) { func writeAnthropicPassthroughResponseHeaders(dst http.Header, src http.Header, filter *responseheaders.CompiledHeaderFilter) {
...@@ -5427,6 +5442,7 @@ func (s *GatewayService) forwardBedrock( ...@@ -5427,6 +5442,7 @@ func (s *GatewayService) forwardBedrock(
var usage *ClaudeUsage var usage *ClaudeUsage
var firstTokenMs *int var firstTokenMs *int
var clientDisconnect bool var clientDisconnect bool
var responseBody string
if reqStream { if reqStream {
streamResult, err := s.handleBedrockStreamingResponse(ctx, resp, c, account, startTime, reqModel) streamResult, err := s.handleBedrockStreamingResponse(ctx, resp, c, account, startTime, reqModel)
if err != nil { if err != nil {
...@@ -5435,8 +5451,12 @@ func (s *GatewayService) forwardBedrock( ...@@ -5435,8 +5451,12 @@ func (s *GatewayService) forwardBedrock(
usage = streamResult.usage usage = streamResult.usage
firstTokenMs = streamResult.firstTokenMs firstTokenMs = streamResult.firstTokenMs
clientDisconnect = streamResult.clientDisconnect clientDisconnect = streamResult.clientDisconnect
// 从 context buffer 读取已收集的 assistant 文本
if captureBuilder, ok := ctx.Value(ctxkey.ResponseCaptureBuffer).(*strings.Builder); ok && captureBuilder != nil {
responseBody = captureBuilder.String()
}
} else { } else {
usage, err = s.handleBedrockNonStreamingResponse(ctx, resp, c, account) responseBody, usage, err = s.handleBedrockNonStreamingResponse(ctx, resp, c, account)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -5454,6 +5474,7 @@ func (s *GatewayService) forwardBedrock( ...@@ -5454,6 +5474,7 @@ func (s *GatewayService) forwardBedrock(
Duration: time.Since(startTime), Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs, FirstTokenMs: firstTokenMs,
ClientDisconnect: clientDisconnect, ClientDisconnect: clientDisconnect,
ResponseBody: responseBody,
}, nil }, nil
} }
...@@ -5679,10 +5700,10 @@ func (s *GatewayService) handleBedrockNonStreamingResponse( ...@@ -5679,10 +5700,10 @@ func (s *GatewayService) handleBedrockNonStreamingResponse(
resp *http.Response, resp *http.Response,
c *gin.Context, c *gin.Context,
account *Account, account *Account,
) (*ClaudeUsage, error) { ) (string, *ClaudeUsage, error) {
body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, anthropicTooLargeError) body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, anthropicTooLargeError)
if err != nil { if err != nil {
return nil, err return "", nil, err
} }
// 转换 Bedrock 特有的 amazon-bedrock-invocationMetrics 为标准 Anthropic usage 格式 // 转换 Bedrock 特有的 amazon-bedrock-invocationMetrics 为标准 Anthropic usage 格式
...@@ -5696,7 +5717,7 @@ func (s *GatewayService) handleBedrockNonStreamingResponse( ...@@ -5696,7 +5717,7 @@ func (s *GatewayService) handleBedrockNonStreamingResponse(
c.Header("x-request-id", v) c.Header("x-request-id", v)
} }
c.Data(resp.StatusCode, "application/json", body) c.Data(resp.StatusCode, "application/json", body)
return usage, nil return string(body), usage, nil
} }
func (s *GatewayService) buildUpstreamRequest(ctx context.Context, c *gin.Context, account *Account, body []byte, token, tokenType, modelID string, reqStream bool, mimicClaudeCode bool) (*http.Request, error) { func (s *GatewayService) buildUpstreamRequest(ctx context.Context, c *gin.Context, account *Account, body []byte, token, tokenType, modelID string, reqStream bool, mimicClaudeCode bool) (*http.Request, error) {
......
...@@ -206,10 +206,12 @@ func (s *RequestCaptureService) writeToNFS( ...@@ -206,10 +206,12 @@ func (s *RequestCaptureService) writeToNFS(
} }
// nfsResponseEnvelope 是写入 NFS 响应文件的 JSON 结构。 // nfsResponseEnvelope 是写入 NFS 响应文件的 JSON 结构。
// Body 使用 any:非流式时为 json.RawMessage(保留原始 JSON 结构),
// 流式时为 string(纯文本,如中文内容),避免将非法 JSON 作为 RawMessage 导致编码失败。
type nfsResponseEnvelope struct { type nfsResponseEnvelope struct {
CaptureID int64 `json:"capture_id"` CaptureID int64 `json:"capture_id"`
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
Body json.RawMessage `json:"body"` Body any `json:"body"`
} }
func (s *RequestCaptureService) writeResponseToNFS(filePath string, captureID int64, responseBody string) { func (s *RequestCaptureService) writeResponseToNFS(filePath string, captureID int64, responseBody string) {
...@@ -222,10 +224,19 @@ func (s *RequestCaptureService) writeResponseToNFS(filePath string, captureID in ...@@ -222,10 +224,19 @@ func (s *RequestCaptureService) writeResponseToNFS(filePath string, captureID in
return return
} }
// 若 responseBody 是合法 JSON(非流式响应),直接嵌入保留结构;
// 否则(流式纯文本),作为普通字符串存储,避免编码错误。
var body any
if json.Valid([]byte(responseBody)) {
body = json.RawMessage(responseBody)
} else {
body = responseBody
}
envelope := nfsResponseEnvelope{ envelope := nfsResponseEnvelope{
CaptureID: captureID, CaptureID: captureID,
CreatedAt: time.Now().UTC(), CreatedAt: time.Now().UTC(),
Body: json.RawMessage(responseBody), Body: body,
} }
var buf bytes.Buffer var buf bytes.Buffer
enc := json.NewEncoder(&buf) enc := json.NewEncoder(&buf)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment