# Request/Response Capture Feature Change Report

> Feature: capture both the request body and the response body for designated API keys. Two storage backends are supported: the database (always) and NFS (when `nfs_path` is set). All Claude, OpenAI, and Gemini gateway paths are covered.

---

## 1. Database Migration

**File: `backend/migrations/108_request_capture_log.sql`**

```sql
-- New opt-in flag on api_keys
ALTER TABLE api_keys
    ADD COLUMN IF NOT EXISTS capture_requests boolean NOT NULL DEFAULT false;

-- Capture log table, range-partitioned by month
CREATE TABLE IF NOT EXISTS request_capture_logs (
    id            bigserial   NOT NULL,
    api_key_id    bigint      NOT NULL,
    user_id       bigint      NOT NULL,
    request_id    varchar(64),
    path          varchar(100),
    method        varchar(10),
    ip_address    varchar(45),
    request_body  text,
    response_body text,
    nfs_file_path varchar(500),
    created_at    timestamptz NOT NULL DEFAULT now(),
    -- The primary key of a partitioned table must include the partition key
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

CREATE INDEX IF NOT EXISTS idx_rcl_api_key_created
    ON request_capture_logs (api_key_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_rcl_user_id
    ON request_capture_logs (user_id);
```

The migration pre-creates partitions for the previous, current, and next month. New partitions must be created periodically after that. If no partition covers a row's `created_at` and there is no DEFAULT partition, PostgreSQL rejects the insert.

---

## 2. Configuration

### 2.1 `backend/internal/config/config.go` (new struct)

```go
// RequestCaptureConfig configures request/response capture.
type RequestCaptureConfig struct {
	// Root directory for NFS capture files; empty means DB-only capture.
	NFSPath string `mapstructure:"nfs_path"`
	// Timeout for the background capture workers, in seconds.
	WorkerTimeoutSeconds int `mapstructure:"worker_timeout_seconds"`
}
```

### 2.2 `backend/config.yaml` (new block)

```yaml
request_capture:
  nfs_path: "/app/logs/nfs/"
  worker_timeout_seconds: 5
```

### 2.3 `deploy/.env.example` (two new lines)

```env
REQUEST_CAPTURE_NFS_PATH=
REQUEST_CAPTURE_WORKER_TIMEOUT_SECONDS=5
```

---

## 3. Core Service

### 3.1 `backend/internal/service/request_capture_service.go` (new file)

| Method | Description |
|---|---|
| `Capture(...)` | Writes the DB row synchronously and returns the `captureID`. Writes the NFS request file asynchronously. When an NFS path is configured, stores `captureID → nfsFilePath` in a `sync.Map` |
| `CaptureResponse(captureID, responseBody)` | Asynchronous. Updates `response_body` in the DB. If an NFS path was recorded for this capture, writes `<original file name>_response.json` |
| `nfsResponseFilePath(requestPath)` | Maps `xxx.json` → `xxx_response.json` |
| `writeResponseToNFS(...)` | Writes `nfsResponseEnvelope{capture_id, created_at, body}` |

**Key design points:**

- `Capture()` writes to the DB **synchronously**, so the returned `captureID` can be used immediately.
- `CaptureResponse()` is **fully asynchronous** and never blocks the response path.
- The `sync.Map` holds `nfsFilePath` keyed by `captureID`. `LoadAndDelete` consumes each entry exactly once, so requests that reach `CaptureResponse()` leave nothing behind. For entries that are never consumed, see 8.6.

**NFS file layout:**

```
{nfsPath}/
└── {YYYY-MM-DD}/
    └── {apiKeyID}/
        ├── {unixNano}_{requestID}.json            ← request body
        └── {unixNano}_{requestID}_response.json   ← response body
```

### 3.2 `backend/internal/repository/request_capture_log_repo.go` (new file)

```go
// Implements the RequestCaptureLogRepository interface
Create(ctx, params) (int64, error)                  // inserts a row, returns its id (the captureID)
UpdateResponseBody(ctx, id, responseBody) error     // fills response_body on an existing row
```

---

## 4. Context Key

**`backend/internal/pkg/ctxkey/ctxkey.go` (added)**

```go
// ResponseCaptureBuffer collects assistant text from streaming responses for request_capture.
// The value is a *strings.Builder injected by the handler layer; the service layer only appends text.
ResponseCaptureBuffer Key = "ctx_response_capture_buffer"
```

Text collection flow for streaming requests:

```
handler injects a *strings.Builder into the context
        ↓
service streaming handler appends each text_delta
        ↓
Forward() / ForwardGemini() read builder.String()
        ↓
ForwardResult.ResponseBody
        ↓
handler calls CaptureResponse()
```

---

## 5. Per-Endpoint Coverage

### 5.1 `/v1/messages` → Anthropic accounts

| Location | Change |
|---|---|
| `handler/gateway_handler.go` | Keeps the `captureID` returned by `Capture()`. Injects `ResponseCaptureBuffer` for streaming requests. Calls `CaptureResponse()` on the Anthropic success path |
| `handler/gateway_handler.go` (Gemini success path) | **Adds** `CaptureResponse(captureID, result.ResponseBody)` (around line 513) |
| `service/gateway_service.go` | Adds `ResponseBody string` to `ForwardResult`. Non-streaming reads the response bytes; streaming reads the context buffer |

### 5.2 `/v1/messages` → CC forwarding (Anthropic requests forwarded as OpenAI Chat Completions)

| Location | Change |
|---|---|
| `handler/gateway_handler_chat_completions.go` | `Capture()` + `CaptureResponse()` |
| `service/gateway_forward_as_chat_completions.go` | Non-streaming: marshals `ccResp` into `ResponseBody`. Streaming: `textBuilder` collects `Delta.Content` |

### 5.3 `/openai/v1/chat/completions`

| Location | Change |
|---|---|
| `handler/openai_chat_completions.go` | `Capture()` + `CaptureResponse()` |
| `service/openai_gateway_chat_completions.go` | Non-streaming: marshals `ccResp`. Streaming: `textBuilder` collects `Delta.Content` |

### 5.4 `/openai/v1/responses` (**main fix in this change**)

**Problem:** three gaps existed on this endpoint:

- the return value of `Capture()` was discarded;
- neither sub-path called `CaptureResponse()`;
- `ForwardAsAnthropic` never populated `ResponseBody`.

| Location | Change |
|---|---|
| `handler/openai_gateway_handler.go` | **`Capture()` now keeps the `captureID`**. The success blocks of sub-path 1 (`Forward()`) and sub-path 2 (`ForwardAsAnthropic()`) both **add** a `CaptureResponse()` call |
| `service/openai_gateway_messages.go` (`handleAnthropicBufferedResponse`) | Replaces `c.JSON` with `json.Marshal` + `c.Data`, so the serialized bytes can be both sent and stored. **Populates `ResponseBody`** |
| `service/openai_gateway_messages.go` (`handleAnthropicStreamingResponse`) | **Adds** a `textBuilder`. `processDataLine` captures `content_block_delta` / `text_delta` events. `resultWithUsage()` sets `ResponseBody: textBuilder.String()` |

### 5.5 `/v1/messages` → Antigravity (Gemini) accounts (**new in this change**)

| Location | Change |
|---|---|
| `handler/gateway_handler.go` | Reuses the `captureID` and `CaptureResponse` from 5.1 |
| `service/antigravity_gateway_service.go` | Adds `responseBody string` to `antigravityStreamResult`. Adds the `ctxkey` import |
| `handleClaudeStreamToNonStreaming` | `responseBody: string(claudeResp)` |
| `handleClaudeStreamingResponse` | Reads the context buffer. Uses gjson to extract `response.candidates.0.content.parts.0.text` or `candidates.0.content.parts.0.text` from each SSE event |
| `handleGeminiStreamToNonStreaming` | `responseBody: strings.Join(collectedTextParts, "")` |
| `handleGeminiStreamingResponse` | Reads the context buffer. Uses gjson to iterate over `candidates.0.content.parts[*].text` |
| `Forward()` / `ForwardGemini()` | Streaming reads from the context buffer; non-streaming reads `streamRes.responseBody`. Both populate `ForwardResult.ResponseBody` |

### 5.6 `/v1/messages` → GeminiMessagesCompat accounts (**new in this change**)

| Location | Change |
|---|---|
| `handler/gateway_handler.go` | Reuses the `captureID` and `CaptureResponse` from 5.1 |
| `service/gemini_messages_compat_service.go` | Adds `responseBody string` to `geminiStreamResult` |
| `handleStreamingResponse` | `responseBody: seenText` (the function already accumulates the full text) |
| `handleNonStreamingResponse` | Signature becomes `(string, *ClaudeUsage, error)`. `c.JSON` is replaced with `json.Marshal` + `c.Data` |
| `Forward()` | All three non-streaming sub-paths set `responseBody`. Populates `ForwardResult.ResponseBody` |

---

## 6. Dependency Injection

**`backend/cmd/server/wire_gen.go`** (generated; confirmed to contain the following)

```go
requestCaptureLogRepository := repository.NewRequestCaptureLogRepository(client)
requestCaptureService := service.NewRequestCaptureService(requestCaptureLogRepository, configConfig)
gatewayHandler := handler.NewGatewayHandler(..., requestCaptureService, ...)
openAIGatewayHandler := handler.NewOpenAIGatewayHandler(..., requestCaptureService, ...)
```

---

## 7. Endpoint Coverage Summary

| Endpoint | Protocol | Request capture | Response capture |
|---|---|---|---|
| `POST /v1/messages` → Anthropic account | Claude | ✅ | ✅ |
| `POST /v1/messages` → CC forwarding | Claude→OpenAI CC | ✅ | ✅ |
| `POST /v1/messages` → Antigravity account | Claude→Gemini | ✅ | ✅ |
| `POST /v1/messages` → GeminiCompat account | Claude→Gemini | ✅ | ✅ |
| `POST /openai/v1/chat/completions` | OpenAI CC | ✅ | ✅ |
| `POST /openai/v1/responses` (Codex path) | OpenAI Responses | ✅ | ✅ |
| `POST /openai/v1/responses` (Messages path) | OpenAI→Anthropic | ✅ | ✅ |

---

## 8. Test Procedure

### 8.1 Prerequisites

```bash
# 1. Run the database migration
psql -U sub2api -d sub2api -f backend/migrations/108_request_capture_log.sql

# 2. Configure (pick one)
#    Option A: config.yaml already has the request_capture block; leave nfs_path empty for DB-only capture
#    Option B: environment variable in the server's environment (leave empty to skip NFS)
export REQUEST_CAPTURE_NFS_PATH=/tmp/nfs_test/

# 3. Enable the capture flag on the test API key (fill in its id after "id =")
psql -U sub2api -d sub2api -c \
  "UPDATE api_keys SET capture_requests = true WHERE id = ;"
```

### 8.2 Test Matrix

Run one **non-streaming** and one **streaming** request against each endpoint.

```bash
KEY="your-capture-enabled-key"
BASE="http://localhost:8080"

# ── Claude endpoint (non-streaming) ──
curl -s -X POST "$BASE/v1/messages" \
  -H "x-api-key: $KEY" -H "Content-Type: application/json" \
  -d '{"model":"claude-3-5-haiku-20241022","max_tokens":50,
       "messages":[{"role":"user","content":"say hi"}]}'

# ── Claude endpoint (streaming; -N disables curl output buffering) ──
curl -sN -X POST "$BASE/v1/messages" \
  -H "x-api-key: $KEY" -H "Content-Type: application/json" \
  -d '{"model":"claude-3-5-haiku-20241022","max_tokens":50,"stream":true,
       "messages":[{"role":"user","content":"say hi"}]}'

# ── OpenAI Responses endpoint ──
curl -s -X POST "$BASE/openai/v1/responses" \
  -H "Authorization: Bearer $KEY" -H "Content-Type: application/json" \
  -d '{"model":"gpt-4o","input":[{"role":"user","content":"say hi"}]}'

# ── OpenAI Chat Completions endpoint ──
curl -s -X POST "$BASE/openai/v1/chat/completions" \
  -H "Authorization: Bearer $KEY" -H "Content-Type: application/json" \
  -d '{"model":"gpt-4o","messages":[{"role":"user","content":"say hi"}]}'

# For the streaming variant of the two OpenAI endpoints, add "stream":true to the body.
```

### 8.3 DB Verification

```sql
-- ① Query right after the request (the request-side DB write is synchronous, so the row should already exist)
SELECT id, api_key_id, path, method,
       (request_body IS NOT NULL)  AS has_req,
       (response_body IS NOT NULL) AS has_resp,
       nfs_file_path, created_at
FROM request_capture_logs
ORDER BY created_at DESC
LIMIT 10;

-- ② Wait about 1 s, then check the response body (CaptureResponse is asynchronous)
SELECT id,
       length(request_body)     AS req_len,
       length(response_body)    AS resp_len,
       left(response_body, 100) AS resp_preview
FROM request_capture_logs
ORDER BY id DESC
LIMIT 5;
```

**Expected results:**

| Scenario | has_req | has_resp | resp_preview |
|---|---|---|---|
| Non-streaming request | `true` | `true` (within about 1 s) | JSON starting with `{` |
| Streaming request | `true` | `true` (after the stream ends) | Plain text, e.g. `"Hi! How can I..."` |
| Key without `capture_requests` | No row | n/a | n/a |

### 8.4 NFS Verification (when `nfs_path` is configured)

```bash
DATE=$(date +%Y-%m-%d)
API_KEY_ID=    # fill in the test key's id
NFS_DIR="${REQUEST_CAPTURE_NFS_PATH:-/app/logs/nfs}/${DATE}/${API_KEY_ID}"

# List files (request and response files should appear in pairs)
ls -la "$NFS_DIR/"

# Inspect the newest request file (json.tool accepts a single JSON document, so pick one file)
REQ_FILE=$(ls -t "$NFS_DIR"/*.json | grep -v '_response\.json$' | head -n 1)
python3 -m json.tool "$REQ_FILE" | head -20

# Inspect the matching response file (_response suffix)
python3 -m json.tool "${REQ_FILE%.json}_response.json" | head -20
```

**Expected file structure:**

```jsonc
// Request file: {unixNano}_{requestID}.json
{
  "api_key_id": 42,
  "user_id": 1,
  "request_id": "req-xxx",
  "created_at": "2024-01-01T00:00:00Z",
  "path": "/v1/messages",
  "method": "POST",
  "ip_address": "127.0.0.1",
  "body": { "model": "...", "messages": [...] }
}

// Response file: {unixNano}_{requestID}_response.json
{
  "capture_id": 123,
  "created_at": "2024-01-01T00:00:01Z",
  "body": { "id": "msg_xxx", "content": [...] }  // full JSON for non-streaming; a plain-text string for streaming
}
```

### 8.5 Control Group (Negative Check)

```bash
NO_CAPTURE_KEY="your-normal-key"
curl -s -X POST "$BASE/v1/messages" \
  -H "x-api-key: $NO_CAPTURE_KEY" -H "Content-Type: application/json" \
  -d '{"model":"claude-3-5-haiku-20241022","max_tokens":10,
       "messages":[{"role":"user","content":"hi"}]}'
```

```sql
-- There should be no rows for this key's api_key_id (fill in the id)
SELECT count(*) FROM request_capture_logs WHERE api_key_id = ;
-- Expected: 0
```

### 8.6 Failure Scenarios

| Scenario | Expected behavior |
|---|---|
| Request DB write fails | `captureID = 0`. The later `CaptureResponse` is skipped automatically, and the request completes normally |
| Response DB write fails | An error is logged. The request has already been answered, so the user is unaffected |
| NFS directory does not exist | `MkdirAll` creates it. If that fails, an error is logged and the DB write is unaffected |
| Client disconnects mid-stream | The text collected in the buffer so far is written, so the stored response is truncated (expected) |
| `captureID` leak (`CaptureResponse` never called) | The `sync.Map` entry (NFS only) is never removed. Each entry is small (an ID and a path), so the cost is modest. However, entries accumulate with every captured request that never reaches `CaptureResponse()`, for example failed upstream calls, since only the success paths call it. Over a long-running process the count is therefore not bounded by the number of concurrent requests |
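The `captureID → nfsFilePath` bookkeeping behind `Capture()` / `CaptureResponse()` and the `captureID = 0` and leak rows can be sketched in memory. This is a minimal sketch, assuming an atomic counter in place of the real DB insert. `captureSketch`, `pending`, and `failDB` are hypothetical names for illustration, not the service's actual API. The real methods also write the DB row and NFS files, which are omitted here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// captureSketch is a hypothetical, in-memory stand-in for RequestCaptureService.
// It models only the captureID -> nfsFilePath bookkeeping; DB and NFS I/O are omitted.
type captureSketch struct {
	nextID   atomic.Int64 // stands in for the bigserial id returned by the DB insert
	nfsPaths sync.Map     // captureID (int64) -> nfsFilePath (string)
	failDB   bool         // simulates a failed synchronous DB write
}

// Capture returns a new captureID, or 0 when the (simulated) DB write fails.
// The NFS path is remembered only when NFS is enabled, i.e. the path is non-empty.
func (s *captureSketch) Capture(nfsFilePath string) int64 {
	if s.failDB {
		return 0
	}
	id := s.nextID.Add(1)
	if nfsFilePath != "" {
		s.nfsPaths.Store(id, nfsFilePath)
	}
	return id
}

// CaptureResponse returns the request file path to write the response next to
// (empty for DB-only captures) and handled=false when the capture is skipped.
// LoadAndDelete guarantees each stored path is consumed at most once.
func (s *captureSketch) CaptureResponse(captureID int64) (nfsFilePath string, handled bool) {
	if captureID == 0 {
		return "", false // the request-side DB write failed earlier: skip
	}
	if v, ok := s.nfsPaths.LoadAndDelete(captureID); ok {
		return v.(string), true
	}
	return "", true // only response_body would be updated
}

// pending counts entries still waiting for CaptureResponse.
func (s *captureSketch) pending() int {
	n := 0
	s.nfsPaths.Range(func(_, _ any) bool {
		n++
		return true
	})
	return n
}

func main() {
	s := &captureSketch{}
	id := s.Capture("/tmp/nfs/2024-01-01/42/1_req-a.json")
	path, _ := s.CaptureResponse(id)
	fmt.Println(path, s.pending()) // the entry has been consumed

	s.Capture("/tmp/nfs/2024-01-01/42/2_req-b.json") // CaptureResponse never called
	fmt.Println(s.pending())                           // this entry lingers
}
```

`LoadAndDelete` is the only removal path, which is why a capture that never reaches `CaptureResponse` leaves exactly one entry behind. Reusing `0` as the "no capture" ID works because a `bigserial` column never issues 0 by default.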