package service

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/Wei-Shaw/sub2api/internal/config"
	"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
	"go.uber.org/zap"
)

// RequestCaptureLogRepository is the persistence interface for request
// capture logs.
type RequestCaptureLogRepository interface {
	Create(ctx context.Context, params CreateRequestCaptureLogParams) (int64, error)
	UpdateResponseBody(ctx context.Context, id int64, responseBody string) error
}

// CreateRequestCaptureLogParams carries the fields used to create a request
// capture log record.
type CreateRequestCaptureLogParams struct {
	APIKeyID    int64
	UserID      int64
	RequestID   string
	Path        string
	Method      string
	IPAddress   string
	RequestBody string
	NFSFilePath string
}

// RequestCaptureService captures request and response bodies, storing them
// through the repository and, when an NFS path is configured, as JSON files
// under that path.
type RequestCaptureService struct {
	repo    RequestCaptureLogRepository
	nfsPath string
	timeout time.Duration
	// nfsPathMap maps a capture ID (int64) to the request file path (string).
	// An entry is stored by Capture and removed by CaptureResponse.
	// NOTE(review): an entry whose CaptureResponse is never called stays in
	// the map; confirm that callers always invoke CaptureResponse.
	nfsPathMap sync.Map
}

// nfsFileEnvelope is the JSON structure written to an NFS request file.
// Body is json.RawMessage when the request body is valid JSON (the original
// structure is embedded) and a plain string otherwise, so that empty or
// non-JSON bodies do not make encoding fail.
type nfsFileEnvelope struct {
	APIKeyID  int64     `json:"api_key_id"`
	UserID    int64     `json:"user_id"`
	RequestID string    `json:"request_id"`
	CreatedAt time.Time `json:"created_at"`
	Path      string    `json:"path"`
	Method    string    `json:"method"`
	IPAddress string    `json:"ip_address"`
	Body      any       `json:"body"`
}

// NewRequestCaptureService creates a RequestCaptureService.
//
// The repository timeout defaults to 5 seconds and is overridden by
// cfg.RequestCapture.WorkerTimeoutSeconds when that value is positive. NFS
// storage is enabled only when cfg.RequestCapture.NFSPath is non-empty. A nil
// cfg yields the defaults (5 second timeout, NFS disabled).
func NewRequestCaptureService(repo RequestCaptureLogRepository, cfg *config.Config) *RequestCaptureService {
	timeout := 5 * time.Second
	if cfg != nil && cfg.RequestCapture.WorkerTimeoutSeconds > 0 {
		timeout = time.Duration(cfg.RequestCapture.WorkerTimeoutSeconds) * time.Second
	}

	nfsPath := ""
	if cfg != nil {
		nfsPath = cfg.RequestCapture.NFSPath
	}

	if nfsPath != "" {
		logger.L().Info("request_capture: NFS storage enabled", zap.String("nfs_path", nfsPath))
	} else {
		logger.L().Info("request_capture: NFS storage disabled (nfs_path not configured), DB-only mode")
	}

	return &RequestCaptureService{
		repo:    repo,
		nfsPath: nfsPath,
		timeout: timeout,
	}
}

// Capture records a request body and returns the capture ID (the ID returned
// by the repository), or 0 when the database write fails.
//
// The NFS write, if enabled, runs in its own goroutine and does not affect the
// result. The database write is synchronous because its ID is the return
// value; it is bounded by the service timeout.
func (s *RequestCaptureService) Capture(
	apiKeyID, userID int64,
	requestID, path, method, ipAddr string,
	body []byte,
) int64 {
	now := time.Now()

	// NFS write (independent goroutine).
	nfsFilePath := ""
	if s.nfsPath != "" {
		nfsFilePath = s.buildNFSFilePath(apiKeyID, requestID, now)
		// Copy the body so the goroutine does not share the caller's slice.
		bodyCopy := make([]byte, len(body))
		copy(bodyCopy, body)
		logger.L().Debug("request_capture: launching nfs request write",
			zap.Int64("api_key_id", apiKeyID),
			zap.String("nfs_file", nfsFilePath),
		)
		go s.writeToNFS(nfsFilePath, apiKeyID, userID, requestID, path, method, ipAddr, bodyCopy, now)
	}

	// Database write (synchronous: the ID is needed for the return value).
	ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
	defer cancel()

	id, err := s.repo.Create(ctx, CreateRequestCaptureLogParams{
		APIKeyID:    apiKeyID,
		UserID:      userID,
		RequestID:   requestID,
		Path:        path,
		Method:      method,
		IPAddress:   ipAddr,
		RequestBody: string(body),
		NFSFilePath: nfsFilePath,
	})
	if err != nil {
		logger.L().Error("request_capture: db write failed",
			zap.Int64("api_key_id", apiKeyID),
			zap.String("request_id", requestID),
			zap.Error(err),
		)
		return 0
	}

	// Remember captureID -> request file path so CaptureResponse can derive
	// the response file path.
	if nfsFilePath != "" {
		s.nfsPathMap.Store(id, nfsFilePath)
	}
	return id
}

// CaptureResponse writes the response body to an existing capture record
// (database and, if a request file path was recorded, NFS) in a background
// goroutine, so the caller is not blocked.
//
// captureID is the ID returned by Capture; a captureID of 0 is ignored. An
// empty responseBody writes nothing, but the recorded NFS path mapping for
// captureID is still removed so it does not stay in the map.
func (s *RequestCaptureService) CaptureResponse(captureID int64, responseBody string) {
	if captureID == 0 {
		return
	}

	// Take and delete the NFS path mapping (one-shot consumption). This runs
	// before the empty-body check so the entry is released in every case.
	var nfsFilePath string
	if v, ok := s.nfsPathMap.LoadAndDelete(captureID); ok {
		// Only strings are ever stored in nfsPathMap; a failed assertion
		// leaves nfsFilePath empty, which disables the NFS write below.
		nfsFilePath, _ = v.(string)
	}

	if responseBody == "" {
		return
	}

	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
		defer cancel()

		if err := s.repo.UpdateResponseBody(ctx, captureID, responseBody); err != nil {
			logger.L().Error("request_capture: db update response failed",
				zap.Int64("capture_id", captureID),
				zap.Error(err),
			)
		}

		// NFS response file: same directory as the request file, with a
		// _response suffix on the file name.
		if nfsFilePath != "" {
			respPath := nfsResponseFilePath(nfsFilePath)
			logger.L().Debug("request_capture: launching nfs response write",
				zap.Int64("capture_id", captureID),
				zap.String("nfs_file", respPath),
			)
			s.writeResponseToNFS(respPath, captureID, responseBody)
		}
	}()
}

// nfsResponseFilePath converts a request file path into the matching response
// file path by inserting "_response" before the extension.
// Example: /nfs/2024-01-01/42/123_reqid.json -> /nfs/2024-01-01/42/123_reqid_response.json
func nfsResponseFilePath(requestPath string) string {
	ext := filepath.Ext(requestPath)
	base := requestPath[:len(requestPath)-len(ext)]
	return base + "_response" + ext
}

// sanitizePathComponent replaces path separators ('/' and '\\') and NUL bytes
// with '_' so that the value can only ever form a single path element.
// Replacing at byte level is safe for UTF-8 input because these ASCII bytes
// never occur inside a multi-byte sequence.
func sanitizePathComponent(s string) string {
	out := []byte(s)
	for i, c := range out {
		if c == '/' || c == '\\' || c == 0 {
			out[i] = '_'
		}
	}
	return string(out)
}

// buildNFSFilePath returns the request file path for a capture:
// <nfsPath>/<UTC date>/<apiKeyID>/<unix nanoseconds>_<requestID>.json.
//
// requestID is sanitized first: without that, a value containing "/../"
// segments would be collapsed by filepath.Join and could resolve to a location
// outside the intended directory.
func (s *RequestCaptureService) buildNFSFilePath(apiKeyID int64, requestID string, t time.Time) string {
	date := t.UTC().Format("2006-01-02")
	filename := fmt.Sprintf("%d_%s.json", t.UnixNano(), sanitizePathComponent(requestID))
	return filepath.Join(s.nfsPath, date, fmt.Sprintf("%d", apiKeyID), filename)
}

// writeToNFS writes the request envelope as JSON to filePath, creating the
// parent directory if needed. Failures are logged and not returned, since the
// function runs in its own goroutine.
func (s *RequestCaptureService) writeToNFS(
	filePath string,
	apiKeyID, userID int64,
	requestID, path, method, ipAddr string,
	body []byte,
	now time.Time,
) {
	dir := filepath.Dir(filePath)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		logger.L().Error("request_capture: mkdir failed",
			zap.String("dir", dir),
			zap.Error(err),
		)
		return
	}

	// Embed the body verbatim only when it is valid JSON. An empty or non-JSON
	// body wrapped in json.RawMessage makes Encode fail, so it is stored as a
	// plain string instead.
	var payload any
	if json.Valid(body) {
		payload = json.RawMessage(body)
	} else {
		payload = string(body)
	}

	envelope := nfsFileEnvelope{
		APIKeyID:  apiKeyID,
		UserID:    userID,
		RequestID: requestID,
		CreatedAt: now.UTC(),
		Path:      path,
		Method:    method,
		IPAddress: ipAddr,
		Body:      payload,
	}

	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false)
	if err := enc.Encode(envelope); err != nil {
		logger.L().Error("request_capture: json marshal failed",
			zap.String("request_id", requestID),
			zap.Error(err),
		)
		return
	}

	if err := os.WriteFile(filePath, buf.Bytes(), 0o644); err != nil {
		logger.L().Error("request_capture: nfs write failed",
			zap.String("file", filePath),
			zap.Error(err),
		)
		return
	}
	logger.L().Debug("request_capture: nfs request file written", zap.String("file", filePath))
}

// nfsResponseEnvelope is the JSON structure written to an NFS response file.
// Body is json.RawMessage when the response body is valid JSON (the original
// structure is embedded) and a plain string otherwise (for example streamed
// plain text), so that invalid JSON is never passed through as RawMessage and
// does not make encoding fail.
type nfsResponseEnvelope struct {
	CaptureID int64     `json:"capture_id"`
	CreatedAt time.Time `json:"created_at"`
	Body      any       `json:"body"`
}

// writeResponseToNFS writes the response envelope as JSON to filePath,
// creating the parent directory if needed. Failures are logged and not
// returned.
func (s *RequestCaptureService) writeResponseToNFS(filePath string, captureID int64, responseBody string) {
	dir := filepath.Dir(filePath)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		logger.L().Error("request_capture: mkdir failed (response)",
			zap.String("dir", dir),
			zap.Error(err),
		)
		return
	}

	// Valid JSON is embedded as-is to keep its structure; anything else is
	// stored as an ordinary string to avoid an encoding error.
	var body any
	if json.Valid([]byte(responseBody)) {
		body = json.RawMessage(responseBody)
	} else {
		body = responseBody
	}

	envelope := nfsResponseEnvelope{
		CaptureID: captureID,
		CreatedAt: time.Now().UTC(),
		Body:      body,
	}

	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false)
	if err := enc.Encode(envelope); err != nil {
		logger.L().Error("request_capture: json marshal failed (response)",
			zap.Int64("capture_id", captureID),
			zap.Error(err),
		)
		return
	}

	if err := os.WriteFile(filePath, buf.Bytes(), 0o644); err != nil {
		logger.L().Error("request_capture: nfs write failed (response)",
			zap.String("file", filePath),
			zap.Error(err),
		)
		return
	}
	logger.L().Debug("request_capture: nfs response file written", zap.String("file", filePath))
}