Commit 994da655 authored by 陈曦's avatar 陈曦
Browse files

收集req和resp的相关更改

parent 8f7ac1ea
......@@ -20,6 +20,7 @@ import (
"github.com/Wei-Shaw/sub2api/ent/promocodeusage"
"github.com/Wei-Shaw/sub2api/ent/proxy"
"github.com/Wei-Shaw/sub2api/ent/redeemcode"
"github.com/Wei-Shaw/sub2api/ent/requestcapturelog"
"github.com/Wei-Shaw/sub2api/ent/schema"
"github.com/Wei-Shaw/sub2api/ent/securitysecret"
"github.com/Wei-Shaw/sub2api/ent/setting"
......@@ -132,6 +133,10 @@ func init() {
apikeyDescUsage7d := apikeyFields[16].Descriptor()
// apikey.DefaultUsage7d holds the default value on creation for the usage_7d field.
apikey.DefaultUsage7d = apikeyDescUsage7d.Default.(float64)
// apikeyDescCaptureRequests is the schema descriptor for capture_requests field.
apikeyDescCaptureRequests := apikeyFields[20].Descriptor()
// apikey.DefaultCaptureRequests holds the default value on creation for the capture_requests field.
apikey.DefaultCaptureRequests = apikeyDescCaptureRequests.Default.(bool)
accountMixin := schema.Account{}.Mixin()
accountMixinHooks1 := accountMixin[1].Hooks()
account.Hooks[0] = accountMixinHooks1[0]
......@@ -867,6 +872,32 @@ func init() {
redeemcodeDescValidityDays := redeemcodeFields[9].Descriptor()
// redeemcode.DefaultValidityDays holds the default value on creation for the validity_days field.
redeemcode.DefaultValidityDays = redeemcodeDescValidityDays.Default.(int)
requestcapturelogFields := schema.RequestCaptureLog{}.Fields()
_ = requestcapturelogFields
// requestcapturelogDescRequestID is the schema descriptor for request_id field.
requestcapturelogDescRequestID := requestcapturelogFields[2].Descriptor()
// requestcapturelog.RequestIDValidator is a validator for the "request_id" field. It is called by the builders before save.
requestcapturelog.RequestIDValidator = requestcapturelogDescRequestID.Validators[0].(func(string) error)
// requestcapturelogDescPath is the schema descriptor for path field.
requestcapturelogDescPath := requestcapturelogFields[3].Descriptor()
// requestcapturelog.PathValidator is a validator for the "path" field. It is called by the builders before save.
requestcapturelog.PathValidator = requestcapturelogDescPath.Validators[0].(func(string) error)
// requestcapturelogDescMethod is the schema descriptor for method field.
requestcapturelogDescMethod := requestcapturelogFields[4].Descriptor()
// requestcapturelog.MethodValidator is a validator for the "method" field. It is called by the builders before save.
requestcapturelog.MethodValidator = requestcapturelogDescMethod.Validators[0].(func(string) error)
// requestcapturelogDescIPAddress is the schema descriptor for ip_address field.
requestcapturelogDescIPAddress := requestcapturelogFields[5].Descriptor()
// requestcapturelog.IPAddressValidator is a validator for the "ip_address" field. It is called by the builders before save.
requestcapturelog.IPAddressValidator = requestcapturelogDescIPAddress.Validators[0].(func(string) error)
// requestcapturelogDescNfsFilePath is the schema descriptor for nfs_file_path field.
requestcapturelogDescNfsFilePath := requestcapturelogFields[8].Descriptor()
// requestcapturelog.NfsFilePathValidator is a validator for the "nfs_file_path" field. It is called by the builders before save.
requestcapturelog.NfsFilePathValidator = requestcapturelogDescNfsFilePath.Validators[0].(func(string) error)
// requestcapturelogDescCreatedAt is the schema descriptor for created_at field.
requestcapturelogDescCreatedAt := requestcapturelogFields[9].Descriptor()
// requestcapturelog.DefaultCreatedAt holds the default value on creation for the created_at field.
requestcapturelog.DefaultCreatedAt = requestcapturelogDescCreatedAt.Default.(func() time.Time)
securitysecretMixin := schema.SecuritySecret{}.Mixin()
securitysecretMixinFields0 := securitysecretMixin[0].Fields()
_ = securitysecretMixinFields0
......
......@@ -115,6 +115,11 @@ func (APIKey) Fields() []ent.Field {
Optional().
Nillable().
Comment("Start time of the current 7d rate limit window"),
// ========== Request capture ==========
field.Bool("capture_requests").
Default(false).
Comment("是否对该 API Key 的请求体进行存储捕获"),
}
}
......
package schema
import (
"time"
"entgo.io/ent"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/entsql"
"entgo.io/ent/schema"
"entgo.io/ent/schema/field"
"entgo.io/ent/schema/index"
)
// RequestCaptureLog 记录指定 API Key 的请求体,用于审计和分析。
// 只追加,不支持更新/删除(同 PaymentAuditLog 模式)。
type RequestCaptureLog struct {
ent.Schema
}
func (RequestCaptureLog) Annotations() []schema.Annotation {
return []schema.Annotation{
entsql.Annotation{Table: "request_capture_logs"},
}
}
func (RequestCaptureLog) Fields() []ent.Field {
return []ent.Field{
field.Int64("api_key_id"),
field.Int64("user_id"),
field.String("request_id").
MaxLen(64).
Optional().
Nillable(),
field.String("path").
MaxLen(100).
Optional().
Nillable(),
field.String("method").
MaxLen(10).
Optional().
Nillable(),
field.String("ip_address").
MaxLen(45).
Optional().
Nillable(),
// request_body 存原始 JSON 文本,不加索引,避免影响查询计划
field.Text("request_body").
Optional().
Nillable(),
// response_body 存响应文本(非 streaming 为完整 JSON,streaming 为拼接的 assistant text)
field.Text("response_body").
Optional().
Nillable(),
// nfs_file_path NFS 文件路径快照,方便核查
field.String("nfs_file_path").
MaxLen(500).
Optional().
Nillable(),
field.Time("created_at").
Default(time.Now).
Immutable().
SchemaType(map[string]string{dialect.Postgres: "timestamptz"}),
}
}
func (RequestCaptureLog) Edges() []ent.Edge {
return nil
}
func (RequestCaptureLog) Indexes() []ent.Index {
return []ent.Index{
index.Fields("api_key_id", "created_at"),
index.Fields("user_id"),
}
}
......@@ -44,6 +44,8 @@ type Tx struct {
Proxy *ProxyClient
// RedeemCode is the client for interacting with the RedeemCode builders.
RedeemCode *RedeemCodeClient
// RequestCaptureLog is the client for interacting with the RequestCaptureLog builders.
RequestCaptureLog *RequestCaptureLogClient
// SecuritySecret is the client for interacting with the SecuritySecret builders.
SecuritySecret *SecuritySecretClient
// Setting is the client for interacting with the Setting builders.
......@@ -212,6 +214,7 @@ func (tx *Tx) init() {
tx.PromoCodeUsage = NewPromoCodeUsageClient(tx.config)
tx.Proxy = NewProxyClient(tx.config)
tx.RedeemCode = NewRedeemCodeClient(tx.config)
tx.RequestCaptureLog = NewRequestCaptureLogClient(tx.config)
tx.SecuritySecret = NewSecuritySecretClient(tx.config)
tx.Setting = NewSettingClient(tx.config)
tx.SubscriptionPlan = NewSubscriptionPlanClient(tx.config)
......
......@@ -83,6 +83,16 @@ type Config struct {
Gemini GeminiConfig `mapstructure:"gemini"`
Update UpdateConfig `mapstructure:"update"`
Idempotency IdempotencyConfig `mapstructure:"idempotency"`
RequestCapture RequestCaptureConfig `mapstructure:"request_capture"`
}
// RequestCaptureConfig 配置请求体捕获功能
type RequestCaptureConfig struct {
// NFSPath 为本地挂载的 NFS 根目录(例如 /mnt/nfs/requests)。
// 留空则跳过文件写入,只写数据库。
NFSPath string `mapstructure:"nfs_path"`
// WorkerTimeoutSeconds 单次异步写入的超时时间(秒),默认 5。
WorkerTimeoutSeconds int `mapstructure:"worker_timeout_seconds"`
}
type LogConfig struct {
......
......@@ -45,6 +45,7 @@ type GatewayHandler struct {
apiKeyService *service.APIKeyService
usageRecordWorkerPool *service.UsageRecordWorkerPool
errorPassthroughService *service.ErrorPassthroughService
requestCaptureService *service.RequestCaptureService
concurrencyHelper *ConcurrencyHelper
userMsgQueueHelper *UserMsgQueueHelper
maxAccountSwitches int
......@@ -65,6 +66,7 @@ func NewGatewayHandler(
apiKeyService *service.APIKeyService,
usageRecordWorkerPool *service.UsageRecordWorkerPool,
errorPassthroughService *service.ErrorPassthroughService,
requestCaptureService *service.RequestCaptureService,
userMsgQueueService *service.UserMessageQueueService,
cfg *config.Config,
settingService *service.SettingService,
......@@ -98,6 +100,7 @@ func NewGatewayHandler(
apiKeyService: apiKeyService,
usageRecordWorkerPool: usageRecordWorkerPool,
errorPassthroughService: errorPassthroughService,
requestCaptureService: requestCaptureService,
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatClaude, pingInterval),
userMsgQueueHelper: umqHelper,
maxAccountSwitches: maxAccountSwitches,
......@@ -147,6 +150,20 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
return
}
// 捕获请求体(仅当该 API Key 开启了 capture_requests)
var captureID int64
if apiKey.CaptureRequests && h.requestCaptureService != nil {
requestID, _ := c.Request.Context().Value(ctxkey.RequestID).(string)
captureID = h.requestCaptureService.Capture(
apiKey.ID, subject.UserID,
requestID,
c.Request.URL.Path,
c.Request.Method,
c.ClientIP(),
body,
)
}
setOpsRequestContext(c, "", false, body)
parsedReq, err := service.ParseGatewayRequest(body, domain.PlatformAnthropic)
......@@ -811,6 +828,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
result.ReasoningEffort = service.NormalizeClaudeOutputEffort(parsedReq.OutputEffort)
}
// 异步写入响应体到捕获记录
if captureID > 0 && h.requestCaptureService != nil {
h.requestCaptureService.CaptureResponse(captureID, result.ResponseBody)
}
// 使用量记录通过有界 worker 池提交,避免请求热路径创建无界 goroutine。
h.submitUsageRecordTask(func(ctx context.Context) {
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{
......
......@@ -7,6 +7,7 @@ import (
"time"
pkghttputil "github.com/Wei-Shaw/sub2api/internal/pkg/httputil"
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
......@@ -59,6 +60,19 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
return
}
// 异步捕获请求体(仅当该 API Key 开启了 capture_requests)
if apiKey.CaptureRequests && h.requestCaptureService != nil {
requestID, _ := c.Request.Context().Value(ctxkey.RequestID).(string)
h.requestCaptureService.Capture(
apiKey.ID, subject.UserID,
requestID,
c.Request.URL.Path,
c.Request.Method,
c.ClientIP(),
body,
)
}
setOpsRequestContext(c, "", false, body)
// Validate JSON
......
......@@ -7,6 +7,7 @@ import (
"time"
pkghttputil "github.com/Wei-Shaw/sub2api/internal/pkg/httputil"
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
......@@ -61,6 +62,19 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) {
return
}
// 异步捕获请求体(仅当该 API Key 开启了 capture_requests)
if apiKey.CaptureRequests && h.requestCaptureService != nil {
requestID, _ := c.Request.Context().Value(ctxkey.RequestID).(string)
h.requestCaptureService.Capture(
apiKey.ID, subject.UserID,
requestID,
c.Request.URL.Path,
c.Request.Method,
c.ClientIP(),
body,
)
}
if !gjson.ValidBytes(body) {
h.errorResponse(c, http.StatusBadRequest, "invalid_request_error", "Failed to parse request body")
return
......
......@@ -13,6 +13,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/config"
pkghttputil "github.com/Wei-Shaw/sub2api/internal/pkg/httputil"
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
......@@ -32,6 +33,7 @@ type OpenAIGatewayHandler struct {
apiKeyService *service.APIKeyService
usageRecordWorkerPool *service.UsageRecordWorkerPool
errorPassthroughService *service.ErrorPassthroughService
requestCaptureService *service.RequestCaptureService
concurrencyHelper *ConcurrencyHelper
maxAccountSwitches int
cfg *config.Config
......@@ -62,6 +64,7 @@ func NewOpenAIGatewayHandler(
apiKeyService *service.APIKeyService,
usageRecordWorkerPool *service.UsageRecordWorkerPool,
errorPassthroughService *service.ErrorPassthroughService,
requestCaptureService *service.RequestCaptureService,
cfg *config.Config,
) *OpenAIGatewayHandler {
pingInterval := time.Duration(0)
......@@ -78,6 +81,7 @@ func NewOpenAIGatewayHandler(
apiKeyService: apiKeyService,
usageRecordWorkerPool: usageRecordWorkerPool,
errorPassthroughService: errorPassthroughService,
requestCaptureService: requestCaptureService,
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatComment, pingInterval),
maxAccountSwitches: maxAccountSwitches,
cfg: cfg,
......@@ -135,6 +139,19 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
return
}
// 异步捕获请求体(仅当该 API Key 开启了 capture_requests)
if apiKey.CaptureRequests && h.requestCaptureService != nil {
requestID, _ := c.Request.Context().Value(ctxkey.RequestID).(string)
h.requestCaptureService.Capture(
apiKey.ID, subject.UserID,
requestID,
c.Request.URL.Path,
c.Request.Method,
c.ClientIP(),
body,
)
}
setOpsRequestContext(c, "", false, body)
sessionHashBody := body
if service.IsOpenAIResponsesCompactPathForTest(c) {
......
......@@ -629,9 +629,10 @@ func apiKeyEntityToService(m *dbent.APIKey) *service.APIKey {
Usage5h: m.Usage5h,
Usage1d: m.Usage1d,
Usage7d: m.Usage7d,
Window5hStart: m.Window5hStart,
Window1dStart: m.Window1dStart,
Window7dStart: m.Window7dStart,
Window5hStart: m.Window5hStart,
Window1dStart: m.Window1dStart,
Window7dStart: m.Window7dStart,
CaptureRequests: m.CaptureRequests,
}
if m.Edges.User != nil {
out.User = userEntityToService(m.Edges.User)
......
package repository
import (
"context"
dbent "github.com/Wei-Shaw/sub2api/ent"
"github.com/Wei-Shaw/sub2api/internal/service"
)
type requestCaptureLogRepository struct {
client *dbent.Client
}
// NewRequestCaptureLogRepository 创建请求捕获日志仓储实例。
func NewRequestCaptureLogRepository(client *dbent.Client) service.RequestCaptureLogRepository {
return &requestCaptureLogRepository{client: client}
}
func (r *requestCaptureLogRepository) Create(ctx context.Context, params service.CreateRequestCaptureLogParams) (int64, error) {
q := r.client.RequestCaptureLog.Create().
SetAPIKeyID(params.APIKeyID).
SetUserID(params.UserID)
if params.RequestID != "" {
q = q.SetRequestID(params.RequestID)
}
if params.Path != "" {
q = q.SetPath(params.Path)
}
if params.Method != "" {
q = q.SetMethod(params.Method)
}
if params.IPAddress != "" {
q = q.SetIPAddress(params.IPAddress)
}
if params.RequestBody != "" {
q = q.SetRequestBody(params.RequestBody)
}
if params.NFSFilePath != "" {
q = q.SetNfsFilePath(params.NFSFilePath)
}
row, err := q.Save(ctx)
if err != nil {
return 0, err
}
return row.ID, nil
}
func (r *requestCaptureLogRepository) UpdateResponseBody(ctx context.Context, id int64, responseBody string) error {
return r.client.RequestCaptureLog.UpdateOneID(id).
SetResponseBody(responseBody).
Exec(ctx)
}
......@@ -89,6 +89,7 @@ var ProviderSet = wire.NewSet(
NewErrorPassthroughRepository,
NewTLSFingerprintProfileRepository,
NewChannelRepository,
NewRequestCaptureLogRepository,
// Cache implementations
NewGatewayCache,
......
......@@ -60,6 +60,9 @@ type APIKey struct {
Window5hStart *time.Time // Start of current 5h window
Window1dStart *time.Time // Start of current 1d window
Window7dStart *time.Time // Start of current 7d window
// 请求体捕获
CaptureRequests bool // 是否对该 Key 的请求体进行存储捕获
}
func (k *APIKey) IsActive() bool {
......
......@@ -500,6 +500,10 @@ type ForwardResult struct {
ClientDisconnect bool // 客户端是否在流式传输过程中断开
ReasoningEffort *string
// ResponseBody 响应内容:非 streaming 为完整 JSON,streaming 为拼接的 assistant text。
// 仅当 API Key 开启了 capture_requests 时才会被填充(通过 context 标记控制)。
ResponseBody string
// 图片生成计费字段(图片生成模型使用)
ImageCount int // 生成的图片数量
ImageSize string // 图片尺寸 "1K", "2K", "4K"
......@@ -4483,6 +4487,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
var usage *ClaudeUsage
var firstTokenMs *int
var clientDisconnect bool
var nonStreamingResponseBody string
if reqStream {
streamResult, err := s.handleStreamingResponse(ctx, resp, c, account, startTime, originalModel, reqModel, shouldMimicClaudeCode)
if err != nil {
......@@ -4497,10 +4502,12 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
firstTokenMs = streamResult.firstTokenMs
clientDisconnect = streamResult.clientDisconnect
} else {
usage, err = s.handleNonStreamingResponse(ctx, resp, c, account, originalModel, reqModel)
var nonStreamRespBody []byte
nonStreamRespBody, usage, err = s.handleNonStreamingResponse(ctx, resp, c, account, originalModel, reqModel)
if err != nil {
return nil, err
}
nonStreamingResponseBody = string(nonStreamRespBody)
}
return &ForwardResult{
......@@ -4512,6 +4519,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs,
ClientDisconnect: clientDisconnect,
ResponseBody: nonStreamingResponseBody,
}, nil
}
......@@ -7149,13 +7157,13 @@ func rewriteCacheCreationJSON(usageObj map[string]any, target string) bool {
return true
}
func (s *GatewayService) handleNonStreamingResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account, originalModel, mappedModel string) (*ClaudeUsage, error) {
func (s *GatewayService) handleNonStreamingResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account, originalModel, mappedModel string) ([]byte, *ClaudeUsage, error) {
// 更新5h窗口状态
s.rateLimitService.UpdateSessionWindow(ctx, account, resp.Header)
body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, anthropicTooLargeError)
if err != nil {
return nil, err
return nil, nil, err
}
// 解析usage
......@@ -7163,7 +7171,7 @@ func (s *GatewayService) handleNonStreamingResponse(ctx context.Context, resp *h
Usage ClaudeUsage `json:"usage"`
}
if err := json.Unmarshal(body, &response); err != nil {
return nil, fmt.Errorf("parse response: %w", err)
return nil, nil, fmt.Errorf("parse response: %w", err)
}
// 解析嵌套的 cache_creation 对象中的 5m/1h 明细
......@@ -7216,7 +7224,7 @@ func (s *GatewayService) handleNonStreamingResponse(ctx context.Context, resp *h
// 写入响应
c.Data(resp.StatusCode, contentType, body)
return &response.Usage, nil
return body, &response.Usage, nil
}
// replaceModelInResponseBody 替换响应体中的model字段
......
package service
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"go.uber.org/zap"
)
// RequestCaptureLogRepository 定义请求捕获日志的持久化接口。
type RequestCaptureLogRepository interface {
Create(ctx context.Context, params CreateRequestCaptureLogParams) (int64, error)
UpdateResponseBody(ctx context.Context, id int64, responseBody string) error
}
// CreateRequestCaptureLogParams 创建请求捕获日志的参数。
type CreateRequestCaptureLogParams struct {
APIKeyID int64
UserID int64
RequestID string
Path string
Method string
IPAddress string
RequestBody string
NFSFilePath string
}
// RequestCaptureService 异步捕获指定 API Key 的请求体,写入数据库和 NFS。
type RequestCaptureService struct {
repo RequestCaptureLogRepository
nfsPath string
timeout time.Duration
}
// nfsFileEnvelope 是写入 NFS 文件的 JSON 结构。
type nfsFileEnvelope struct {
APIKeyID int64 `json:"api_key_id"`
UserID int64 `json:"user_id"`
RequestID string `json:"request_id"`
CreatedAt time.Time `json:"created_at"`
Path string `json:"path"`
Method string `json:"method"`
IPAddress string `json:"ip_address"`
Body json.RawMessage `json:"body"`
}
// NewRequestCaptureService 创建 RequestCaptureService。
func NewRequestCaptureService(repo RequestCaptureLogRepository, cfg *config.Config) *RequestCaptureService {
timeout := 5 * time.Second
if cfg != nil && cfg.RequestCapture.WorkerTimeoutSeconds > 0 {
timeout = time.Duration(cfg.RequestCapture.WorkerTimeoutSeconds) * time.Second
}
nfsPath := ""
if cfg != nil {
nfsPath = cfg.RequestCapture.NFSPath
}
return &RequestCaptureService{
repo: repo,
nfsPath: nfsPath,
timeout: timeout,
}
}
// Capture 异步捕获请求体,立即返回 captureID(DB 行 ID),不阻塞调用方。
// 返回 0 表示捕获未启用或写入失败。
// DB 写入与 NFS 写入各自独立,互不影响。
func (s *RequestCaptureService) Capture(
apiKeyID, userID int64,
requestID, path, method, ipAddr string,
body []byte,
) int64 {
now := time.Now()
// NFS 写入(独立 goroutine)
nfsFilePath := ""
if s.nfsPath != "" {
nfsFilePath = s.buildNFSFilePath(apiKeyID, requestID, now)
bodyCopy := make([]byte, len(body))
copy(bodyCopy, body)
go s.writeToNFS(nfsFilePath, apiKeyID, userID, requestID, path, method, ipAddr, bodyCopy, now)
}
// DB 写入(同步,需要拿到 ID)
ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
defer cancel()
id, err := s.repo.Create(ctx, CreateRequestCaptureLogParams{
APIKeyID: apiKeyID,
UserID: userID,
RequestID: requestID,
Path: path,
Method: method,
IPAddress: ipAddr,
RequestBody: string(body),
NFSFilePath: nfsFilePath,
})
if err != nil {
logger.L().Error("request_capture: db write failed",
zap.Int64("api_key_id", apiKeyID),
zap.String("request_id", requestID),
zap.Error(err),
)
return 0
}
return id
}
// CaptureResponse 异步将响应体写入已有的捕获记录,不阻塞调用方。
// captureID 为 Capture 返回的 ID,为 0 时直接忽略。
func (s *RequestCaptureService) CaptureResponse(captureID int64, responseBody string) {
if captureID == 0 || responseBody == "" {
return
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
defer cancel()
if err := s.repo.UpdateResponseBody(ctx, captureID, responseBody); err != nil {
logger.L().Error("request_capture: db update response failed",
zap.Int64("capture_id", captureID),
zap.Error(err),
)
}
}()
}
func (s *RequestCaptureService) buildNFSFilePath(apiKeyID int64, requestID string, t time.Time) string {
date := t.UTC().Format("2006-01-02")
filename := fmt.Sprintf("%d_%s.json", t.UnixNano(), requestID)
return filepath.Join(s.nfsPath, date, fmt.Sprintf("%d", apiKeyID), filename)
}
func (s *RequestCaptureService) writeToNFS(
filePath string,
apiKeyID, userID int64,
requestID, path, method, ipAddr string,
body []byte,
now time.Time,
) {
dir := filepath.Dir(filePath)
if err := os.MkdirAll(dir, 0o755); err != nil {
logger.L().Error("request_capture: mkdir failed",
zap.String("dir", dir),
zap.Error(err),
)
return
}
envelope := nfsFileEnvelope{
APIKeyID: apiKeyID,
UserID: userID,
RequestID: requestID,
CreatedAt: now.UTC(),
Path: path,
Method: method,
IPAddress: ipAddr,
Body: json.RawMessage(body),
}
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
enc.SetEscapeHTML(false)
if err := enc.Encode(envelope); err != nil {
logger.L().Error("request_capture: json marshal failed",
zap.String("request_id", requestID),
zap.Error(err),
)
return
}
if err := os.WriteFile(filePath, buf.Bytes(), 0o644); err != nil {
logger.L().Error("request_capture: nfs write failed",
zap.String("file", filePath),
zap.Error(err),
)
}
}
......@@ -462,6 +462,7 @@ var ProviderSet = wire.NewSet(
ProvideScheduledTestRunnerService,
NewGroupCapacityService,
NewChannelService,
NewRequestCaptureService,
NewModelPricingResolver,
ProvidePaymentConfigService,
NewPaymentService,
......
-- Add capture_requests flag to api_keys
ALTER TABLE api_keys ADD COLUMN IF NOT EXISTS capture_requests boolean NOT NULL DEFAULT false;
-- Create request_capture_logs table (monthly range-partitioned by created_at)
-- PRIMARY KEY must include the partition key, so we use (id, created_at).
CREATE TABLE IF NOT EXISTS request_capture_logs (
id bigserial NOT NULL,
api_key_id bigint NOT NULL,
user_id bigint NOT NULL,
request_id varchar(64),
path varchar(100),
method varchar(10),
ip_address varchar(45),
request_body text,
response_body text,
nfs_file_path varchar(500),
created_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);
CREATE INDEX IF NOT EXISTS idx_rcl_api_key_created ON request_capture_logs (api_key_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_rcl_user_id ON request_capture_logs (user_id);
-- Pre-create partitions for previous, current, and next month
DO $$
DECLARE
month_start DATE;
prev_month DATE;
next_month DATE;
BEGIN
month_start := date_trunc('month', now() AT TIME ZONE 'UTC')::date;
prev_month := (month_start - INTERVAL '1 month')::date;
next_month := (month_start + INTERVAL '1 month')::date;
EXECUTE format(
'CREATE TABLE IF NOT EXISTS request_capture_logs_%s PARTITION OF request_capture_logs FOR VALUES FROM (%L) TO (%L)',
to_char(prev_month, 'YYYYMM'), prev_month, month_start
);
EXECUTE format(
'CREATE TABLE IF NOT EXISTS request_capture_logs_%s PARTITION OF request_capture_logs FOR VALUES FROM (%L) TO (%L)',
to_char(month_start, 'YYYYMM'), month_start, next_month
);
EXECUTE format(
'CREATE TABLE IF NOT EXISTS request_capture_logs_%s PARTITION OF request_capture_logs FOR VALUES FROM (%L) TO (%L)',
to_char(next_month, 'YYYYMM'), next_month, (next_month + INTERVAL '1 month')::date
);
END $$;
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment