# Request/Response Capture 功能变更报告 > **功能概述:** 对指定 API Key 开启请求体与响应体的双路采集,支持数据库和 NFS 两种存储方式, > 覆盖 Claude / OpenAI / Gemini 全部网关路径。支持通过管理员 API 动态开关,变更立即生效(缓存强制失效)。 --- ## 一、数据库迁移 **文件:`backend/migrations/108_request_capture_log.sql`** ```sql -- api_keys 新增开关字段 ALTER TABLE api_keys ADD COLUMN IF NOT EXISTS capture_requests boolean NOT NULL DEFAULT false; -- 按月分区的捕获日志表 CREATE TABLE IF NOT EXISTS request_capture_logs ( id bigserial NOT NULL, api_key_id bigint NOT NULL, user_id bigint NOT NULL, request_id varchar(64), path varchar(100), method varchar(10), ip_address varchar(45), request_body text, response_body text, nfs_file_path varchar(500), created_at timestamptz NOT NULL DEFAULT now(), PRIMARY KEY (id, created_at) ) PARTITION BY RANGE (created_at); CREATE INDEX IF NOT EXISTS idx_rcl_api_key_created ON request_capture_logs (api_key_id, created_at DESC); CREATE INDEX IF NOT EXISTS idx_rcl_user_id ON request_capture_logs (user_id); ``` 预建前、当、后三个月分区,后续需定期维护新分区。 --- ## 二、配置 ### 2.1 `backend/internal/config/config.go`(新增结构体) ```go type RequestCaptureConfig struct { NFSPath string `mapstructure:"nfs_path"` WorkerTimeoutSeconds int `mapstructure:"worker_timeout_seconds"` } ``` ### 2.2 `backend/config.yaml`(新增块) ```yaml request_capture: nfs_path: "" # 留空则跳过 NFS,仅写 DB worker_timeout_seconds: 5 ``` ### 2.3 `deploy/.env.example`(新增两行) ```env REQUEST_CAPTURE_NFS_PATH= REQUEST_CAPTURE_WORKER_TIMEOUT_SECONDS=5 ``` > viper 使用 `AutomaticEnv()` + `SetEnvKeyReplacer(".", "_")`, > 环境变量 `REQUEST_CAPTURE_NFS_PATH` 自动映射到 `request_capture.nfs_path`。 --- ## 三、核心服务 ### 3.1 `backend/internal/service/request_capture_service.go`(全新文件) | 方法 | 行为 | |---|---| | `Capture(apiKeyID, userID, requestID, path, method, ipAddr, body)` | **同步**写 DB(返回 captureID),**异步**写 NFS 请求文件 | | `CaptureResponse(captureID, responseBody)` | **全异步**:更新 DB `response_body` + 写 NFS 响应文件 | | `nfsResponseFilePath(requestPath)` | `xxx.json` → `xxx_response.json` | | `writeResponseToNFS(...)` | 写 `nfsResponseEnvelope{capture_id, created_at, body}` | **关键设计:** - `Capture()` 是**同步** DB 写入,保证调用方拿到 captureID 后立即可用于 `CaptureResponse()` - `CaptureResponse()` 是**全异步**(goroutine),不阻塞响应链路 - `sync.Map` 以 `captureID` 为 key 暂存 nfsFilePath,`LoadAndDelete` 一次性消费,避免内存泄漏 - `captureID == 0` 时 `CaptureResponse()` 静默跳过(DB 写失败时的降级) - NFS 响应文件 `Body` 字段类型为 `any`:先用 `json.Valid()` 判断——合法 JSON(非流式)用 `json.RawMessage` 保留结构,否则(流式纯文本)直接存 `string`,避免编码失败 **NFS 文件组织结构:** ``` {nfsPath}/ └── {YYYY-MM-DD}/ └── {apiKeyID}/ ├── {unixNano}_{requestID}.json ← 请求体 └── {unixNano}_{requestID}_response.json ← 响应体 ``` **NFS 文件格式:** ```json // 请求文件 { "api_key_id": 42, "user_id": 1, "request_id": "req-xxx", "created_at": "2024-01-01T00:00:00Z", "path": "/v1/messages", "method": "POST", "ip_address": "1.2.3.4", "body": { "model": "claude-3-5-haiku-20241022", "messages": [...] } } // 响应文件(非流式:body 为 JSON 对象;流式:body 为纯文本字符串) { "capture_id": 123, "created_at": "2024-01-01T00:00:01Z", "body": { "id": "msg_xxx", "content": [...] } } ``` ### 3.2 `backend/internal/repository/request_capture_log_repo.go`(全新文件) ```go Create(ctx, params CreateRequestCaptureLogParams) (int64, error) UpdateResponseBody(ctx, id int64, responseBody string) error ``` --- ## 四、认证缓存层(关键修复) ### 4.1 问题根因 `capture_requests` 字段要在每次请求认证时读取。认证路径经过三层缓存,字段必须在每一层都正确传递,否则即使数据库更新了,运行时读到的也是旧值 `false`。 ### 4.2 `backend/internal/repository/api_key_repo.go` **修复 1:`GetByKeyForAuth` Select 白名单补充字段** ```go // 之前缺失,导致认证路径查出来的字段值恒为 false apikey.FieldCaptureRequests, // ← 新增 ``` **修复 2:`Update()` 方法持久化字段** ```go builder.SetCaptureRequests(key.CaptureRequests) // ← 新增,否则 Update 不会写入该列 ``` ### 4.3 `backend/internal/service/api_key_auth_cache.go` ```go // APIKeyAuthSnapshot 新增字段 CaptureRequests bool `json:"capture_requests"` ``` ### 4.4 `backend/internal/service/api_key_auth_cache_impl.go` ```go // 版本号从 7 升到 8,使所有旧快照(不含该字段)在 Redis 中自动失效 const apiKeyAuthSnapshotVersion = 8 // v8: added CaptureRequests on api key snapshot // snapshotFromAPIKey:DB → 快照 CaptureRequests: apiKey.CaptureRequests, // snapshotToAPIKey:快照 → 运行时 APIKey 对象 CaptureRequests: snapshot.CaptureRequests, ``` **缓存 TTL 参考:** - L1 in-memory(ristretto):15 秒 - L2 Redis:300 秒(5 分钟) 版本号升级后,所有 Redis 中的旧快照会因版本不匹配而被丢弃,强制回源 DB。 --- ## 五、Context Buffer(流式响应文本采集) **`backend/internal/pkg/ctxkey/ctxkey.go`(新增常量)** ```go // ResponseCaptureBuffer 流式响应中收集 assistant 文本,供 request_capture 使用。 // 值类型为 *strings.Builder,由 handler 层注入,service 层只负责追加文本。 ResponseCaptureBuffer Key = "ctx_response_capture_buffer" ``` **流式文本采集流程:** ``` 1. handler 判断 apiKey.CaptureRequests && captureID > 0 ↓ 2. 注入 *strings.Builder 到 context(WithValue) ↓ 3. service streaming handler 解析 SSE 事件 在 content_block_delta + text_delta 事件中 captureBuilder.WriteString(text) ↓ 4. Forward() / forwardXxx() 读取 builder.String() 赋值给 ForwardResult.ResponseBody ↓ 5. handler 在 Forward() 返回后调用 CaptureResponse(captureID, result.ResponseBody) ``` --- ## 六、各端点覆盖详情 ### 6.1 `POST /v1/messages` → Anthropic 账号 | 文件 | 变更 | |---|---| | `handler/gateway_handler.go` | `Capture()` 保存 captureID;流式注入 `ResponseCaptureBuffer`;Anthropic & Gemini success path 调用 `CaptureResponse()` | | `service/gateway_service.go` | `ForwardResult` 新增 `ResponseBody string`;非流式读响应字节;流式三条路径均读 context buffer | **gateway_service.go 三条流式路径(均已覆盖):** - `handleStreamingResponseAnthropicAPIKeyPassthrough`(Anthropic API Key 直传流式) - `handleStreamingResponseForClaude`(OAuth 账号流式) - `handleNonStreamingResponseAnthropicAPIKeyPassthrough`(Anthropic 非流式,返回签名改为 `(string, *ClaudeUsage, error)`) ### 6.2 `POST /v1/messages` → Bedrock 账号 | 文件 | 变更 | |---|---| | `service/bedrock_stream.go` | 从 context 读取 `captureBuilder`;在 `content_block_delta + text_delta` 中追加文本 | | `service/gateway_service.go`(`forwardBedrock`) | 非流式使用 `handleBedrockNonStreamingResponse` 返回的 string;流式读 context buffer;`ForwardResult.ResponseBody` 填充 | | `service/gateway_service.go`(`handleBedrockNonStreamingResponse`) | 签名改为 `(string, *ClaudeUsage, error)`,返回 `string(body)` | ### 6.3 `POST /v1/messages` → Antigravity(Gemini)账号 | 文件 | 变更 | |---|---| | `handler/gateway_handler.go` | 复用 6.1 的 `captureID` + `CaptureResponse` 调用 | | `service/antigravity_gateway_service.go` | 四条路径均已覆盖(见下) | **antigravity_gateway_service.go 四条路径:** | 函数 | 类型 | 采集方式 | |---|---|---| | `handleClaudeStreamToNonStreaming` | Claude→非流式输出 | `responseBody = string(claudeResp)` | | `handleClaudeStreamingResponse` | Claude→流式输出 | 从 context buffer 读(`candidates[0].content.parts[0].text`) | | `handleGeminiStreamToNonStreaming` | Gemini→非流式输出 | `strings.Join(collectedTextParts, "")` | | `handleGeminiStreamingResponse` | Gemini→流式输出 | 从 context buffer 读(遍历 `candidates[0].content.parts[*].text`) | `streamUpstreamResponse` 函数签名新增 `ctx context.Context` 参数,两处调用点同步更新。 ### 6.4 `POST /v1/messages` → GeminiMessagesCompat 账号 | 文件 | 变更 | |---|---| | `handler/gateway_handler.go` | 复用 6.1 的 `captureID` + `CaptureResponse` 调用 | | `service/gemini_messages_compat_service.go` | `geminiStreamResult` 新增 `responseBody string` | | `handleStreamingResponse` | `responseBody = seenText`(函数内已有文本累积) | | `handleNonStreamingResponse` | 签名改为 `(string, *ClaudeUsage, error)`;`c.JSON` 改为 `json.Marshal + c.Data`,填充 `ResponseBody` | | `Forward()` | 三条 non-streaming 子路径均填 `responseBody`;`ForwardResult.ResponseBody` 填充 | ### 6.5 `POST /openai/v1/chat/completions` | 文件 | 变更 | |---|---| | `handler/openai_chat_completions.go` | `Capture()` + `CaptureResponse()` | | `service/openai_gateway_chat_completions.go` | 非流式:marshal `ccResp`;流式:`textBuilder` 收集 `Delta.Content` | ### 6.6 `POST /openai/v1/responses`(Codex path) | 文件 | 变更 | |---|---| | `handler/openai_gateway_handler.go`(`Responses` 函数) | `captureID` 变量声明保存返回值;success block 调用 `CaptureResponse()` | | `service/gateway_forward_as_responses.go` | 流式:`textBuilder` 收集 `content_block_delta + text_delta` | ### 6.7 `POST /openai/v1/responses`(Messages path)→ Anthropic 路由 | 文件 | 变更 | |---|---| | `handler/openai_gateway_handler.go`(`Messages` 函数) | **新增** `captureID` 声明及 `Capture()` 调用;success block 调用 `CaptureResponse()` | | `service/openai_gateway_messages.go`(`handleAnthropicBufferedResponse`) | `c.JSON` 改为 `json.Marshal + c.Data`,填充 `ResponseBody` | | `service/openai_gateway_messages.go`(`handleAnthropicStreamingResponse`) | 新增 `textBuilder`;在 `processDataLine` 中捕获 `content_block_delta + text_delta`;`resultWithUsage()` 加 `ResponseBody` | --- ## 七、管理员 API(动态开关 + 缓存强制失效) ### 7.1 接口 **`PUT /api/v1/admin/api-keys/:id/capture-requests`** ```bash # 开启 curl -X PUT https://your-server/api/v1/admin/api-keys/35/capture-requests \ -H "Authorization: Bearer $ADMIN_KEY" \ -H "Content-Type: application/json" \ -d '{"enabled": true}' # 关闭 curl -X PUT https://your-server/api/v1/admin/api-keys/35/capture-requests \ -H "Authorization: Bearer $ADMIN_KEY" \ -H "Content-Type: application/json" \ -d '{"enabled": false}' ``` **响应:** ```json {"code": 0, "data": {"id": 35, "capture_requests": true}} ``` ### 7.2 实现文件 | 文件 | 变更 | |---|---| | `handler/admin/apikey_handler.go` | 新增 `AdminSetCaptureRequestsRequest` 结构体 + `SetCaptureRequests` handler | | `service/admin_service.go` | 接口新增 `AdminSetCaptureRequests`;实现:`GetByID → Update → InvalidateAuthCacheByKey` | | `server/routes/admin.go` | `apiKeys.PUT("/:id/capture-requests", h.Admin.APIKey.SetCaptureRequests)` | | `handler/admin/admin_service_stub_test.go` | 新增 stub 实现 | ### 7.3 `AdminSetCaptureRequests` 执行流程 ``` 1. GetByID(keyID) ← 从 DB 读取完整 APIKey 对象 2. apiKey.CaptureRequests = enabled 3. apiKeyRepo.Update(apiKey) ← 持久化(包含 SetCaptureRequests builder 调用) 4. InvalidateAuthCacheByKey(apiKey.Key) ├─ 删除 L1 in-memory 缓存 └─ 删除 L2 Redis 缓存 ``` 步骤 4 确保下一条请求立即回源 DB,获取最新的 `capture_requests` 值。 ### 7.4 bash 脚本(项目根目录) **文件:`capture_requests.sh`** ```bash ./capture_requests.sh # 示例 BASE_URL=https://s2a-st.appbym.com ADMIN_KEY=sk-xxx ./capture_requests.sh 35 on BASE_URL=https://s2a-st.appbym.com ADMIN_KEY=sk-xxx ./capture_requests.sh 35 off ``` 支持 `on/true/1/yes` 和 `off/false/0/no`,输出带颜色,自动用 jq 格式化 JSON。 --- ## 八、端点覆盖总览 | 端点 | 协议 | 请求捕获 | 响应捕获 | |---|---|:---:|:---:| | `POST /v1/messages` → Anthropic 账号(非流式) | Claude Direct | ✅ | ✅ | | `POST /v1/messages` → Anthropic 账号(流式) | Claude Direct | ✅ | ✅ | | `POST /v1/messages` → Bedrock 账号(非流式) | Claude via Bedrock | ✅ | ✅ | | `POST /v1/messages` → Bedrock 账号(流式) | Claude via Bedrock | ✅ | ✅ | | `POST /v1/messages` → CC 转发(非流式) | Claude → OpenAI CC | ✅ | ✅ | | `POST /v1/messages` → CC 转发(流式) | Claude → OpenAI CC | ✅ | ✅ | | `POST /v1/messages` → Antigravity 账号(非流式) | Claude → Gemini | ✅ | ✅ | | `POST /v1/messages` → Antigravity 账号(流式) | Claude → Gemini | ✅ | ✅ | | `POST /v1/messages` → GeminiCompat 账号(非流式)| Claude → Gemini | ✅ | ✅ | | `POST /v1/messages` → GeminiCompat 账号(流式) | Claude → Gemini | ✅ | ✅ | | `POST /openai/v1/chat/completions`(非流式) | OpenAI CC | ✅ | ✅ | | `POST /openai/v1/chat/completions`(流式) | OpenAI CC | ✅ | ✅ | | `POST /openai/v1/responses`(Codex path,非流式)| OpenAI Responses | ✅ | ✅ | | `POST /openai/v1/responses`(Codex path,流式) | OpenAI Responses | ✅ | ✅ | | `POST /openai/v1/responses`(Messages path)| OpenAI → Anthropic | ✅ | ✅ | --- ## 九、依赖注入 **`backend/cmd/server/wire_gen.go`**(已生成,确认包含) ```go requestCaptureLogRepository := repository.NewRequestCaptureLogRepository(client) requestCaptureService := service.NewRequestCaptureService(requestCaptureLogRepository, configConfig) gatewayHandler := handler.NewGatewayHandler(..., requestCaptureService, ...) openAIGatewayHandler := handler.NewOpenAIGatewayHandler(..., requestCaptureService, ...) ``` --- ## 十、部署检查清单 部署前确认以下事项: - [ ] 执行数据库迁移(`108_request_capture_log.sql`),确认 `api_keys.capture_requests` 列存在 - [ ] 确认 `request_capture_logs` 分区表及当月分区已创建 - [ ] 确认服务重启后日志中出现 `request_capture: NFS storage enabled/disabled` - [ ] 若需 NFS 落盘,确认环境变量 `REQUEST_CAPTURE_NFS_PATH` 已设置且目录可写 - [ ] 部署完成后通过脚本测试管理 API:`./capture_requests.sh on` --- ## 十一、测试与验证 ### 11.1 前置条件 ```bash # 执行数据库迁移 psql -U sub2api -d sub2api -f backend/migrations/108_request_capture_log.sql # 配置 NFS(可选,留空则只写 DB) export REQUEST_CAPTURE_NFS_PATH=/tmp/nfs_test/ # 开启测试用 Key 的采集(通过管理员 API,避免绕过缓存失效) BASE_URL=http://localhost:8080 ADMIN_KEY=sk-admin \ ./capture_requests.sh on ``` ### 11.2 测试矩阵 ```bash KEY="your-capture-enabled-key" BASE="http://localhost:8080" # Claude非流式调用 curl -s -X POST $BASE/v1/messages \ -H "x-api-key: $KEY" -H "Content-Type: application/json" \ -d '{"model":"claude-sonnet-4-6","max_tokens":50, "messages":[{"role":"user","content":"你到底是谁呀?"}]}' # Claude流式调用 curl -s -X POST $BASE/v1/messages \ -H "x-api-key: $KEY" -H "Content-Type: application/json" \ -d '{"model":"claude-sonnet-4-6","max_tokens":50,"stream":true, "messages":[{"role":"user","content":"你用的什么模型"}]}' # OpenAI非流式调用 curl -X POST "$BASE/v1/chat/completions" \ -H "Authorization: Bearer $KEY" \ -H "Content-Type: application/json" \ -d '{ "model": "gpt-5.2", "messages": [ { "role": "user", "content": "现在几点了" } ], "max_tokens": 1024 }' # OpenAI流式调用 curl -X POST "$BASE/v1/chat/completions" \ -H "Authorization: Bearer $KEY" \ -H "Content-Type: application/json" \ -d '{ "model": "gpt-5.2", "messages": [ { "role": "user", "content": "你可以告诉我现在几点了吗" } ], "max_tokens": 1024, "stream": true }' ``` ### 11.3 DB 验证 ```sql -- 1. 请求后立即查(DB 写是同步的,应立即有记录) SELECT id, api_key_id, path, method, (request_body IS NOT NULL) AS has_req, (response_body IS NOT NULL) AS has_resp, nfs_file_path, created_at FROM request_capture_logs ORDER BY created_at DESC LIMIT 10; -- 2. 约 1 秒后查响应体(CaptureResponse 是异步的) SELECT id, length(request_body) AS req_len, length(response_body) AS resp_len, left(response_body, 100) AS resp_preview FROM request_capture_logs ORDER BY id DESC LIMIT 5; ``` **预期结果:** | 场景 | has_req | has_resp(~1s 后) | resp_preview | |---|:---:|:---:|---| | 非流式请求 | `true` | `true` | JSON,以 `{` 开头 | | 流式请求 | `true` | `true` | 纯文本,如 `"Hi! How can I..."` | | 中文流式请求 | `true` | `true` | 中文字符(无乱码) | | 未开启 capture_requests 的 Key | 无记录 | — | — | ### 11.4 NFS 验证(配置了 `nfs_path` 时) ```bash DATE=$(date +%Y-%m-%d) API_KEY_ID= NFS_DIR="${REQUEST_CAPTURE_NFS_PATH}/${DATE}/${API_KEY_ID}" # 查看文件列表(请求文件和响应文件应成对出现) ls -la "$NFS_DIR/" # 验证 JSON 格式 cat "$NFS_DIR/"*.json | python3 -m json.tool | head -30 # 验证中文无乱码 grep -r "你好" "$NFS_DIR/" && echo "中文正常" ``` ### 11.5 管理员 API 验证(关键:验证缓存失效是否生效) ```bash # 1. 开启 ./capture_requests.sh 35 on # 发一条请求,等 1 秒,查 DB 是否有记录 # 2. 关闭 ./capture_requests.sh 35 off # 再发一条请求,查 DB 新记录数不应增加 # 验证关闭生效(应无新记录) psql -c "SELECT count(*) FROM request_capture_logs WHERE api_key_id = 35 AND created_at > now() - interval '10 seconds';" ``` > **注意:** 若直接用 SQL `UPDATE api_keys SET capture_requests = false` 绕过服务层, > 缓存不会失效,关闭最多需等 L2 Redis TTL(5 分钟)才生效。 > **必须通过管理员 API 或脚本操作,才能立即生效。** ### 11.6 负向验证 ```bash NO_CAPTURE_KEY="your-normal-key" curl -s -X POST $BASE/v1/messages \ -H "x-api-key: $NO_CAPTURE_KEY" \ -H "Content-Type: application/json" \ -d '{"model":"claude-3-5-haiku-20241022","max_tokens":10, "messages":[{"role":"user","content":"hi"}]}' ``` ```sql -- 该 key 对应的 api_key_id 不应有任何记录 SELECT count(*) FROM request_capture_logs WHERE api_key_id = ; -- 预期:0 ``` --- ## 十二、已知限制与边界行为 | 场景 | 行为 | |---|---| | DB 写 request 失败 | `captureID = 0`,后续 `CaptureResponse` 自动跳过,请求正常返回 | | DB 写 response 失败 | 记录 error 日志,请求已正常返回,不影响用户 | | NFS 目录不存在 | `MkdirAll` 自动创建;失败则 error 日志,不影响 DB 写入 | | 流式请求客户端中途断开 | buffer 内已采集的文本会被写入,响应体为截断内容(属预期行为) | | `captureID` 对应 `CaptureResponse` 从未被调用 | `sync.Map` 中的条目滞留,但量级等同于并发请求数,可忽略 | | 直接 SQL 修改 `capture_requests` 字段 | 缓存不失效,最多等 5 分钟(Redis TTL)才生效;**应通过管理 API 操作** | | 流式纯文本响应写入 NFS | `json.Valid()` 判断后以 string 类型存入 `body` 字段,不会引起编码错误 | | 中文字符 | `json.Encoder` 设置 `SetEscapeHTML(false)`,中文原样存储,不转义为 `\uXXXX` |