Unverified Commit fd8473f2 authored by 程序猿MT's avatar 程序猿MT Committed by GitHub
Browse files

Merge branch 'Wei-Shaw:main' into main

parents 18b8bd43 cc4910dd
package middleware
import (
"context"
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// ClientRequestID ensures every request has a unique client_request_id in request.Context().
//
// This is used by the Ops monitoring module for end-to-end request correlation.
func ClientRequestID() gin.HandlerFunc {
return func(c *gin.Context) {
if c.Request == nil {
c.Next()
return
}
if v := c.Request.Context().Value(ctxkey.ClientRequestID); v != nil {
c.Next()
return
}
id := uuid.New().String()
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), ctxkey.ClientRequestID, id))
c.Next()
}
}
......@@ -23,6 +23,7 @@ func SetupRouter(
apiKeyAuth middleware2.APIKeyAuthMiddleware,
apiKeyService *service.APIKeyService,
subscriptionService *service.SubscriptionService,
opsService *service.OpsService,
settingService *service.SettingService,
cfg *config.Config,
redisClient *redis.Client,
......@@ -46,7 +47,7 @@ func SetupRouter(
}
// 注册路由
registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, cfg, redisClient)
registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, cfg, redisClient)
return r
}
......@@ -60,6 +61,7 @@ func registerRoutes(
apiKeyAuth middleware2.APIKeyAuthMiddleware,
apiKeyService *service.APIKeyService,
subscriptionService *service.SubscriptionService,
opsService *service.OpsService,
cfg *config.Config,
redisClient *redis.Client,
) {
......@@ -73,5 +75,5 @@ func registerRoutes(
routes.RegisterAuthRoutes(v1, h, jwtAuth, redisClient)
routes.RegisterUserRoutes(v1, h, jwtAuth)
routes.RegisterAdminRoutes(v1, h, adminAuth)
routes.RegisterGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, cfg)
routes.RegisterGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, opsService, cfg)
}
......@@ -50,6 +50,9 @@ func RegisterAdminRoutes(
// 系统设置
registerSettingsRoutes(admin, h)
// 运维监控(Ops)
registerOpsRoutes(admin, h)
// 系统管理
registerSystemRoutes(admin, h)
......@@ -64,6 +67,58 @@ func RegisterAdminRoutes(
}
}
func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
ops := admin.Group("/ops")
{
// Realtime ops signals
ops.GET("/concurrency", h.Admin.Ops.GetConcurrencyStats)
ops.GET("/account-availability", h.Admin.Ops.GetAccountAvailability)
// Alerts (rules + events)
ops.GET("/alert-rules", h.Admin.Ops.ListAlertRules)
ops.POST("/alert-rules", h.Admin.Ops.CreateAlertRule)
ops.PUT("/alert-rules/:id", h.Admin.Ops.UpdateAlertRule)
ops.DELETE("/alert-rules/:id", h.Admin.Ops.DeleteAlertRule)
ops.GET("/alert-events", h.Admin.Ops.ListAlertEvents)
// Email notification config (DB-backed)
ops.GET("/email-notification/config", h.Admin.Ops.GetEmailNotificationConfig)
ops.PUT("/email-notification/config", h.Admin.Ops.UpdateEmailNotificationConfig)
// Runtime settings (DB-backed)
runtime := ops.Group("/runtime")
{
runtime.GET("/alert", h.Admin.Ops.GetAlertRuntimeSettings)
runtime.PUT("/alert", h.Admin.Ops.UpdateAlertRuntimeSettings)
}
// Advanced settings (DB-backed)
ops.GET("/advanced-settings", h.Admin.Ops.GetAdvancedSettings)
ops.PUT("/advanced-settings", h.Admin.Ops.UpdateAdvancedSettings)
// WebSocket realtime (QPS/TPS)
ws := ops.Group("/ws")
{
ws.GET("/qps", h.Admin.Ops.QPSWSHandler)
}
// Error logs (MVP-1)
ops.GET("/errors", h.Admin.Ops.GetErrorLogs)
ops.GET("/errors/:id", h.Admin.Ops.GetErrorLogByID)
ops.POST("/errors/:id/retry", h.Admin.Ops.RetryErrorRequest)
// Request drilldown (success + error)
ops.GET("/requests", h.Admin.Ops.ListRequestDetails)
// Dashboard (vNext - raw path for MVP)
ops.GET("/dashboard/overview", h.Admin.Ops.GetDashboardOverview)
ops.GET("/dashboard/throughput-trend", h.Admin.Ops.GetDashboardThroughputTrend)
ops.GET("/dashboard/latency-histogram", h.Admin.Ops.GetDashboardLatencyHistogram)
ops.GET("/dashboard/error-trend", h.Admin.Ops.GetDashboardErrorTrend)
ops.GET("/dashboard/error-distribution", h.Admin.Ops.GetDashboardErrorDistribution)
}
}
func registerDashboardRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
dashboard := admin.Group("/dashboard")
{
......
......@@ -16,13 +16,18 @@ func RegisterGatewayRoutes(
apiKeyAuth middleware.APIKeyAuthMiddleware,
apiKeyService *service.APIKeyService,
subscriptionService *service.SubscriptionService,
opsService *service.OpsService,
cfg *config.Config,
) {
bodyLimit := middleware.RequestBodyLimit(cfg.Gateway.MaxBodySize)
clientRequestID := middleware.ClientRequestID()
opsErrorLogger := handler.OpsErrorLoggerMiddleware(opsService)
// API网关(Claude API兼容)
gateway := r.Group("/v1")
gateway.Use(bodyLimit)
gateway.Use(clientRequestID)
gateway.Use(opsErrorLogger)
gateway.Use(gin.HandlerFunc(apiKeyAuth))
{
gateway.POST("/messages", h.Gateway.Messages)
......@@ -36,6 +41,8 @@ func RegisterGatewayRoutes(
// Gemini 原生 API 兼容层(Gemini SDK/CLI 直连)
gemini := r.Group("/v1beta")
gemini.Use(bodyLimit)
gemini.Use(clientRequestID)
gemini.Use(opsErrorLogger)
gemini.Use(middleware.APIKeyAuthWithSubscriptionGoogle(apiKeyService, subscriptionService, cfg))
{
gemini.GET("/models", h.Gateway.GeminiV1BetaListModels)
......@@ -45,7 +52,7 @@ func RegisterGatewayRoutes(
}
// OpenAI Responses API(不带v1前缀的别名)
r.POST("/responses", bodyLimit, gin.HandlerFunc(apiKeyAuth), h.OpenAIGateway.Responses)
r.POST("/responses", bodyLimit, clientRequestID, opsErrorLogger, gin.HandlerFunc(apiKeyAuth), h.OpenAIGateway.Responses)
// Antigravity 模型列表
r.GET("/antigravity/models", gin.HandlerFunc(apiKeyAuth), h.Gateway.AntigravityModels)
......@@ -53,6 +60,8 @@ func RegisterGatewayRoutes(
// Antigravity 专用路由(仅使用 antigravity 账户,不混合调度)
antigravityV1 := r.Group("/antigravity/v1")
antigravityV1.Use(bodyLimit)
antigravityV1.Use(clientRequestID)
antigravityV1.Use(opsErrorLogger)
antigravityV1.Use(middleware.ForcePlatform(service.PlatformAntigravity))
antigravityV1.Use(gin.HandlerFunc(apiKeyAuth))
{
......@@ -64,6 +73,8 @@ func RegisterGatewayRoutes(
antigravityV1Beta := r.Group("/antigravity/v1beta")
antigravityV1Beta.Use(bodyLimit)
antigravityV1Beta.Use(clientRequestID)
antigravityV1Beta.Use(opsErrorLogger)
antigravityV1Beta.Use(middleware.ForcePlatform(service.PlatformAntigravity))
antigravityV1Beta.Use(middleware.APIKeyAuthWithSubscriptionGoogle(apiKeyService, subscriptionService, cfg))
{
......
......@@ -564,6 +564,14 @@ urlFallbackLoop:
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
safeErr := sanitizeUpstreamErrorMessage(err.Error())
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: 0,
Kind: "request_error",
Message: safeErr,
})
// 检查是否应触发 URL 降级
if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
......@@ -579,6 +587,7 @@ urlFallbackLoop:
continue
}
log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err)
setOpsUpstreamError(c, 0, safeErr, "")
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries")
}
......@@ -586,6 +595,26 @@ urlFallbackLoop:
if resp.StatusCode == http.StatusTooManyRequests && urlIdx < len(availableURLs)-1 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
maxBytes := 2048
if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
}
upstreamDetail := ""
if logBody {
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry",
Message: upstreamMsg,
Detail: upstreamDetail,
})
antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
log.Printf("%s URL fallback (HTTP 429): %s -> %s body=%s", prefix, baseURL, availableURLs[urlIdx+1], truncateForLog(respBody, 200))
continue urlFallbackLoop
......@@ -596,6 +625,26 @@ urlFallbackLoop:
_ = resp.Body.Close()
if attempt < antigravityMaxRetries {
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
maxBytes := 2048
if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
}
upstreamDetail := ""
if logBody {
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry",
Message: upstreamMsg,
Detail: upstreamDetail,
})
log.Printf("%s status=%d retry=%d/%d body=%s", prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 500))
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
log.Printf("%s status=context_canceled_during_backoff", prefix)
......@@ -628,6 +677,27 @@ urlFallbackLoop:
// Antigravity /v1internal 链路在部分场景会对 thought/thinking signature 做严格校验,
// 当历史消息携带的 signature 不合法时会直接 400;去除 thinking 后可继续完成请求。
if resp.StatusCode == http.StatusBadRequest && isSignatureRelatedError(respBody) {
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
maxBytes := 2048
if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
}
upstreamDetail := ""
if logBody {
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "signature_error",
Message: upstreamMsg,
Detail: upstreamDetail,
})
// Conservative two-stage fallback:
// 1) Disable top-level thinking + thinking->text
// 2) Only if still signature-related 400: also downgrade tool_use/tool_result to text.
......@@ -661,6 +731,13 @@ urlFallbackLoop:
}
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
if retryErr != nil {
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: 0,
Kind: "signature_retry_request_error",
Message: sanitizeUpstreamErrorMessage(retryErr.Error()),
})
log.Printf("Antigravity account %d: signature retry request failed (%s): %v", account.ID, stage.name, retryErr)
continue
}
......@@ -674,6 +751,25 @@ urlFallbackLoop:
retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
_ = retryResp.Body.Close()
kind := "signature_retry"
if strings.TrimSpace(stage.name) != "" {
kind = "signature_retry_" + strings.ReplaceAll(stage.name, "+", "_")
}
retryUpstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(retryBody))
retryUpstreamMsg = sanitizeUpstreamErrorMessage(retryUpstreamMsg)
retryUpstreamDetail := ""
if logBody {
retryUpstreamDetail = truncateString(string(retryBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: retryResp.StatusCode,
UpstreamRequestID: retryResp.Header.Get("x-request-id"),
Kind: kind,
Message: retryUpstreamMsg,
Detail: retryUpstreamDetail,
})
// If this stage fixed the signature issue, we stop; otherwise we may try the next stage.
if retryResp.StatusCode != http.StatusBadRequest || !isSignatureRelatedError(retryBody) {
......@@ -701,10 +797,30 @@ urlFallbackLoop:
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
if s.shouldFailoverUpstreamError(resp.StatusCode) {
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
maxBytes := 2048
if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
}
upstreamDetail := ""
if logBody {
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody)
return nil, s.writeMappedClaudeError(c, account, resp.StatusCode, resp.Header.Get("x-request-id"), respBody)
}
}
......@@ -1108,6 +1224,14 @@ urlFallbackLoop:
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
safeErr := sanitizeUpstreamErrorMessage(err.Error())
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: 0,
Kind: "request_error",
Message: safeErr,
})
// 检查是否应触发 URL 降级
if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
......@@ -1123,6 +1247,7 @@ urlFallbackLoop:
continue
}
log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err)
setOpsUpstreamError(c, 0, safeErr, "")
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries")
}
......@@ -1130,6 +1255,26 @@ urlFallbackLoop:
if resp.StatusCode == http.StatusTooManyRequests && urlIdx < len(availableURLs)-1 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
maxBytes := 2048
if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
}
upstreamDetail := ""
if logBody {
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry",
Message: upstreamMsg,
Detail: upstreamDetail,
})
antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
log.Printf("%s URL fallback (HTTP 429): %s -> %s body=%s", prefix, baseURL, availableURLs[urlIdx+1], truncateForLog(respBody, 200))
continue urlFallbackLoop
......@@ -1140,6 +1285,26 @@ urlFallbackLoop:
_ = resp.Body.Close()
if attempt < antigravityMaxRetries {
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
maxBytes := 2048
if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
}
upstreamDetail := ""
if logBody {
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry",
Message: upstreamMsg,
Detail: upstreamDetail,
})
log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries)
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
log.Printf("%s status=context_canceled_during_backoff", prefix)
......@@ -1205,21 +1370,59 @@ urlFallbackLoop:
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
if s.shouldFailoverUpstreamError(resp.StatusCode) {
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
// 解包并返回错误
requestID := resp.Header.Get("x-request-id")
if requestID != "" {
c.Header("x-request-id", requestID)
}
unwrapped, _ := s.unwrapV1InternalResponse(respBody)
unwrapped, unwrapErr := s.unwrapV1InternalResponse(respBody)
unwrappedForOps := unwrapped
if unwrapErr != nil || len(unwrappedForOps) == 0 {
unwrappedForOps = respBody
}
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(unwrappedForOps))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
maxBytes := 2048
if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
}
upstreamDetail := ""
if logBody {
upstreamDetail = truncateString(string(unwrappedForOps), maxBytes)
}
// Always record upstream context for Ops error logs, even when we will failover.
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
if s.shouldFailoverUpstreamError(resp.StatusCode) {
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID,
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
contentType := resp.Header.Get("Content-Type")
if contentType == "" {
contentType = "application/json"
}
c.Data(resp.StatusCode, contentType, unwrapped)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID,
Kind: "http_error",
Message: upstreamMsg,
Detail: upstreamDetail,
})
c.Data(resp.StatusCode, contentType, unwrappedForOps)
return nil, fmt.Errorf("antigravity upstream error: %d", resp.StatusCode)
}
......@@ -1674,9 +1877,35 @@ func (s *AntigravityGatewayService) writeClaudeError(c *gin.Context, status int,
return fmt.Errorf("%s", message)
}
func (s *AntigravityGatewayService) writeMappedClaudeError(c *gin.Context, upstreamStatus int, body []byte) error {
// 记录上游错误详情便于调试
log.Printf("[antigravity-Forward] upstream_error status=%d body=%s", upstreamStatus, string(body))
func (s *AntigravityGatewayService) writeMappedClaudeError(c *gin.Context, account *Account, upstreamStatus int, upstreamRequestID string, body []byte) error {
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
maxBytes := 2048
if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
}
upstreamDetail := ""
if logBody {
upstreamDetail = truncateString(string(body), maxBytes)
}
setOpsUpstreamError(c, upstreamStatus, upstreamMsg, upstreamDetail)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: upstreamStatus,
UpstreamRequestID: upstreamRequestID,
Kind: "http_error",
Message: upstreamMsg,
Detail: upstreamDetail,
})
// 记录上游错误详情便于排障(可选:由配置控制;不回显到客户端)
if logBody {
log.Printf("[antigravity-Forward] upstream_error status=%d body=%s", upstreamStatus, truncateForLog(body, maxBytes))
}
var statusCode int
var errType, errMsg string
......@@ -1712,7 +1941,10 @@ func (s *AntigravityGatewayService) writeMappedClaudeError(c *gin.Context, upstr
"type": "error",
"error": gin.H{"type": errType, "message": errMsg},
})
if upstreamMsg == "" {
return fmt.Errorf("upstream error: %d", upstreamStatus)
}
return fmt.Errorf("upstream error: %d message=%s", upstreamStatus, upstreamMsg)
}
func (s *AntigravityGatewayService) writeGoogleError(c *gin.Context, status int, message string) error {
......
......@@ -357,7 +357,7 @@ func (s *AuthService) Login(ctx context.Context, email, password string) (string
// - 如果邮箱已存在:直接登录(不需要本地密码)
// - 如果邮箱不存在:创建新用户并登录
//
// 注意:该函数用于“终端用户登录 Sub2API 本身”的场景(不同于上游账号的 OAuth,例如 OpenAI/Gemini)。
// 注意:该函数用于 LinuxDo OAuth 登录场景(不同于上游账号的 OAuth,例如 Claude/OpenAI/Gemini)。
// 为了满足现有数据库约束(需要密码哈希),新用户会生成随机密码并进行哈希保存。
func (s *AuthService) LoginOrRegisterOAuth(ctx context.Context, email, username string) (string, *User, error) {
email = strings.TrimSpace(email)
......@@ -376,8 +376,8 @@ func (s *AuthService) LoginOrRegisterOAuth(ctx context.Context, email, username
user, err := s.userRepo.GetByEmail(ctx, email)
if err != nil {
if errors.Is(err, ErrUserNotFound) {
// OAuth 首次登录视为注册
if s.settingService != nil && !s.settingService.IsRegistrationEnabled(ctx) {
// OAuth 首次登录视为注册(fail-close:settingService 未配置时不允许注册)
if s.settingService == nil || !s.settingService.IsRegistrationEnabled(ctx) {
return "", nil, ErrRegDisabled
}
......
......@@ -63,6 +63,9 @@ const (
SubscriptionStatusSuspended = "suspended"
)
// LinuxDoConnectSyntheticEmailDomain 是 LinuxDo Connect 用户的合成邮箱后缀(RFC 保留域名)。
const LinuxDoConnectSyntheticEmailDomain = "@linuxdo-connect.invalid"
// Setting keys
const (
// 注册设置
......@@ -83,6 +86,12 @@ const (
SettingKeyTurnstileSiteKey = "turnstile_site_key" // Turnstile Site Key
SettingKeyTurnstileSecretKey = "turnstile_secret_key" // Turnstile Secret Key
// LinuxDo Connect OAuth 登录设置
SettingKeyLinuxDoConnectEnabled = "linuxdo_connect_enabled"
SettingKeyLinuxDoConnectClientID = "linuxdo_connect_client_id"
SettingKeyLinuxDoConnectClientSecret = "linuxdo_connect_client_secret"
SettingKeyLinuxDoConnectRedirectURL = "linuxdo_connect_redirect_url"
// OEM设置
SettingKeySiteName = "site_name" // 网站名称
SettingKeySiteLogo = "site_logo" // 网站Logo (base64)
......@@ -113,16 +122,31 @@ const (
SettingKeyEnableIdentityPatch = "enable_identity_patch"
SettingKeyIdentityPatchPrompt = "identity_patch_prompt"
// LinuxDo Connect OAuth 登录(终端用户 SSO)
SettingKeyLinuxDoConnectEnabled = "linuxdo_connect_enabled"
SettingKeyLinuxDoConnectClientID = "linuxdo_connect_client_id"
SettingKeyLinuxDoConnectClientSecret = "linuxdo_connect_client_secret"
SettingKeyLinuxDoConnectRedirectURL = "linuxdo_connect_redirect_url"
)
// =========================
// Ops Monitoring (vNext)
// =========================
// LinuxDoConnectSyntheticEmailDomain 是 LinuxDo Connect 用户的合成邮箱后缀(RFC 保留域名)。
// 目的:避免第三方登录返回的用户标识与本地真实邮箱发生碰撞,进而造成账号被接管的风险。
const LinuxDoConnectSyntheticEmailDomain = "@linuxdo-connect.invalid"
// SettingKeyOpsMonitoringEnabled is a DB-backed soft switch to enable/disable ops module at runtime.
SettingKeyOpsMonitoringEnabled = "ops_monitoring_enabled"
// SettingKeyOpsRealtimeMonitoringEnabled controls realtime features (e.g. WS/QPS push).
SettingKeyOpsRealtimeMonitoringEnabled = "ops_realtime_monitoring_enabled"
// SettingKeyOpsQueryModeDefault controls the default query mode for ops dashboard (auto/raw/preagg).
SettingKeyOpsQueryModeDefault = "ops_query_mode_default"
// SettingKeyOpsEmailNotificationConfig stores JSON config for ops email notifications.
SettingKeyOpsEmailNotificationConfig = "ops_email_notification_config"
// SettingKeyOpsAlertRuntimeSettings stores JSON config for ops alert evaluator runtime settings.
SettingKeyOpsAlertRuntimeSettings = "ops_alert_runtime_settings"
// SettingKeyOpsMetricsIntervalSeconds controls the ops metrics collector interval (>=60).
SettingKeyOpsMetricsIntervalSeconds = "ops_metrics_interval_seconds"
// SettingKeyOpsAdvancedSettings stores JSON config for ops advanced settings (data retention, aggregation).
SettingKeyOpsAdvancedSettings = "ops_advanced_settings"
)
// AdminAPIKeyPrefix is the prefix for admin API keys (distinct from user "sk-" keys).
const AdminAPIKeyPrefix = "admin-"
......@@ -1399,7 +1399,24 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
return nil, fmt.Errorf("upstream request failed: %w", err)
// Ensure the client receives an error response (handlers assume Forward writes on non-failover errors).
safeErr := sanitizeUpstreamErrorMessage(err.Error())
setOpsUpstreamError(c, 0, safeErr, "")
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: 0,
Kind: "request_error",
Message: safeErr,
})
c.JSON(http.StatusBadGateway, gin.H{
"type": "error",
"error": gin.H{
"type": "upstream_error",
"message": "Upstream request failed",
},
})
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
}
// 优先检测thinking block签名错误(400)并重试一次
......@@ -1409,6 +1426,21 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
_ = resp.Body.Close()
if s.isThinkingBlockSignatureError(respBody) {
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "signature_error",
Message: extractUpstreamErrorMessage(respBody),
Detail: func() string {
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
return truncateString(string(respBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes)
}
return ""
}(),
})
looksLikeToolSignatureError := func(msg string) bool {
m := strings.ToLower(msg)
return strings.Contains(m, "tool_use") ||
......@@ -1445,6 +1477,20 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
retryRespBody, retryReadErr := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
_ = retryResp.Body.Close()
if retryReadErr == nil && retryResp.StatusCode == 400 && s.isThinkingBlockSignatureError(retryRespBody) {
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: retryResp.StatusCode,
UpstreamRequestID: retryResp.Header.Get("x-request-id"),
Kind: "signature_retry_thinking",
Message: extractUpstreamErrorMessage(retryRespBody),
Detail: func() string {
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
return truncateString(string(retryRespBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes)
}
return ""
}(),
})
msg2 := extractUpstreamErrorMessage(retryRespBody)
if looksLikeToolSignatureError(msg2) && time.Since(retryStart) < maxRetryElapsed {
log.Printf("Account %d: signature retry still failing and looks tool-related, retrying with tool blocks downgraded", account.ID)
......@@ -1459,6 +1505,13 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
if retryResp2 != nil && retryResp2.Body != nil {
_ = retryResp2.Body.Close()
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: 0,
Kind: "signature_retry_tools_request_error",
Message: sanitizeUpstreamErrorMessage(retryErr2.Error()),
})
log.Printf("Account %d: tool-downgrade signature retry failed: %v", account.ID, retryErr2)
} else {
log.Printf("Account %d: tool-downgrade signature retry build failed: %v", account.ID, buildErr2)
......@@ -1508,9 +1561,24 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
break
}
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry",
Message: extractUpstreamErrorMessage(respBody),
Detail: func() string {
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
return truncateString(string(respBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes)
}
return ""
}(),
})
log.Printf("Account %d: upstream error %d, retry %d/%d after %v (elapsed=%v/%v)",
account.ID, resp.StatusCode, attempt, maxRetryAttempts, delay, elapsed, maxRetryElapsed)
_ = resp.Body.Close()
if err := sleepWithContext(ctx, delay); err != nil {
return nil, err
}
......@@ -1538,7 +1606,25 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
// 处理重试耗尽的情况
if resp.StatusCode >= 400 && s.shouldRetryUpstreamError(account, resp.StatusCode) {
if s.shouldFailoverUpstreamError(resp.StatusCode) {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
resp.Body = io.NopCloser(bytes.NewReader(respBody))
s.handleRetryExhaustedSideEffects(ctx, resp, account)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry_exhausted_failover",
Message: extractUpstreamErrorMessage(respBody),
Detail: func() string {
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
return truncateString(string(respBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes)
}
return ""
}(),
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
return s.handleRetryExhaustedError(ctx, resp, c, account)
......@@ -1546,7 +1632,25 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
// 处理可切换账号的错误
if resp.StatusCode >= 400 && s.shouldFailoverUpstreamError(resp.StatusCode) {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
resp.Body = io.NopCloser(bytes.NewReader(respBody))
s.handleFailoverSideEffects(ctx, resp, account)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover",
Message: extractUpstreamErrorMessage(respBody),
Detail: func() string {
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
return truncateString(string(respBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes)
}
return ""
}(),
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
......@@ -1563,6 +1667,26 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
resp.Body = io.NopCloser(bytes.NewReader(respBody))
if s.shouldFailoverOn400(respBody) {
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover_on_400",
Message: upstreamMsg,
Detail: upstreamDetail,
})
if s.cfg.Gateway.LogUpstreamErrorBody {
log.Printf(
"Account %d: 400 error, attempting failover: %s",
......@@ -1859,7 +1983,30 @@ func extractUpstreamErrorMessage(body []byte) string {
}
func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account) (*ForwardResult, error) {
body, _ := io.ReadAll(resp.Body)
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
// Enrich Ops error logs with upstream status + message, and optionally a truncated body snippet.
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(body), maxBytes)
}
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "http_error",
Message: upstreamMsg,
Detail: upstreamDetail,
})
// 处理上游错误,标记账号状态
shouldDisable := false
......@@ -1870,24 +2017,33 @@ func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Res
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
// 根据状态码返回适当的自定义错误响应(不透传上游详细信息)
var errType, errMsg string
var statusCode int
switch resp.StatusCode {
case 400:
// 仅记录上游错误摘要(避免输出请求内容);需要时可通过配置打开
// 记录上游错误响应体摘要便于排障(可选:由配置控制;不回显到客户端)
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
log.Printf(
"Upstream 400 error (account=%d platform=%s type=%s): %s",
"Upstream error %d (account=%d platform=%s type=%s): %s",
resp.StatusCode,
account.ID,
account.Platform,
account.Type,
truncateForLog(body, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes),
)
}
// 根据状态码返回适当的自定义错误响应(不透传上游详细信息)
var errType, errMsg string
var statusCode int
switch resp.StatusCode {
case 400:
c.Data(http.StatusBadRequest, "application/json", body)
summary := upstreamMsg
if summary == "" {
summary = truncateForLog(body, 512)
}
if summary == "" {
return nil, fmt.Errorf("upstream error: %d", resp.StatusCode)
}
return nil, fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, summary)
case 401:
statusCode = http.StatusBadGateway
errType = "upstream_error"
......@@ -1923,11 +2079,14 @@ func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Res
},
})
if upstreamMsg == "" {
return nil, fmt.Errorf("upstream error: %d", resp.StatusCode)
}
return nil, fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, upstreamMsg)
}
func (s *GatewayService) handleRetryExhaustedSideEffects(ctx context.Context, resp *http.Response, account *Account) {
body, _ := io.ReadAll(resp.Body)
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
statusCode := resp.StatusCode
// OAuth/Setup Token 账号的 403:标记账号异常
......@@ -1941,7 +2100,7 @@ func (s *GatewayService) handleRetryExhaustedSideEffects(ctx context.Context, re
}
func (s *GatewayService) handleFailoverSideEffects(ctx context.Context, resp *http.Response, account *Account) {
body, _ := io.ReadAll(resp.Body)
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, body)
}
......@@ -1949,8 +2108,45 @@ func (s *GatewayService) handleFailoverSideEffects(ctx context.Context, resp *ht
// OAuth 403:标记账号异常
// API Key 未配置错误码:仅返回错误,不标记账号
func (s *GatewayService) handleRetryExhaustedError(ctx context.Context, resp *http.Response, c *gin.Context, account *Account) (*ForwardResult, error) {
// Capture upstream error body before side-effects consume the stream.
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
resp.Body = io.NopCloser(bytes.NewReader(respBody))
s.handleRetryExhaustedSideEffects(ctx, resp, account)
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry_exhausted",
Message: upstreamMsg,
Detail: upstreamDetail,
})
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
log.Printf(
"Upstream error %d retries_exhausted (account=%d platform=%s type=%s): %s",
resp.StatusCode,
account.ID,
account.Platform,
account.Type,
truncateForLog(respBody, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes),
)
}
// 返回统一的重试耗尽错误响应
c.JSON(http.StatusBadGateway, gin.H{
"type": "error",
......@@ -1960,7 +2156,10 @@ func (s *GatewayService) handleRetryExhaustedError(ctx context.Context, resp *ht
},
})
if upstreamMsg == "" {
return nil, fmt.Errorf("upstream error: %d (retries exhausted)", resp.StatusCode)
}
return nil, fmt.Errorf("upstream error: %d (retries exhausted) message=%s", resp.StatusCode, upstreamMsg)
}
// streamingResult 流式响应结果
......@@ -2490,6 +2689,7 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
// 发送请求
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
setOpsUpstreamError(c, 0, sanitizeUpstreamErrorMessage(err.Error()), "")
s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Request failed")
return fmt.Errorf("upstream request failed: %w", err)
}
......@@ -2527,6 +2727,18 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
// 标记账号状态(429/529等)
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
// 记录上游错误摘要便于排障(不回显请求内容)
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
log.Printf(
......@@ -2548,8 +2760,11 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
errMsg = "Service overloaded"
}
s.countTokensError(c, resp.StatusCode, "upstream_error", errMsg)
if upstreamMsg == "" {
return fmt.Errorf("upstream error: %d", resp.StatusCode)
}
return fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, upstreamMsg)
}
// 透传成功响应
c.Data(resp.StatusCode, "application/json", respBody)
......
......@@ -543,12 +543,21 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
safeErr := sanitizeUpstreamErrorMessage(err.Error())
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: 0,
Kind: "request_error",
Message: safeErr,
})
if attempt < geminiMaxRetries {
log.Printf("Gemini account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, geminiMaxRetries, err)
sleepGeminiBackoff(attempt)
continue
}
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries: "+sanitizeUpstreamErrorMessage(err.Error()))
setOpsUpstreamError(c, 0, safeErr, "")
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries: "+safeErr)
}
// Special-case: signature/thought_signature validation errors are not transient, but may be fixed by
......@@ -558,6 +567,30 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
_ = resp.Body.Close()
if isGeminiSignatureRelatedError(respBody) {
upstreamReqID := resp.Header.Get(requestIDHeader)
if upstreamReqID == "" {
upstreamReqID = resp.Header.Get("x-goog-request-id")
}
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: upstreamReqID,
Kind: "signature_error",
Message: upstreamMsg,
Detail: upstreamDetail,
})
var strippedClaudeBody []byte
stageName := ""
switch signatureRetryStage {
......@@ -608,6 +641,30 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
}
if attempt < geminiMaxRetries {
upstreamReqID := resp.Header.Get(requestIDHeader)
if upstreamReqID == "" {
upstreamReqID = resp.Header.Get("x-goog-request-id")
}
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: upstreamReqID,
Kind: "retry",
Message: upstreamMsg,
Detail: upstreamDetail,
})
log.Printf("Gemini account %d: upstream status %d, retry %d/%d", account.ID, resp.StatusCode, attempt, geminiMaxRetries)
sleepGeminiBackoff(attempt)
continue
......@@ -633,12 +690,62 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
}
s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
if tempMatched {
upstreamReqID := resp.Header.Get(requestIDHeader)
if upstreamReqID == "" {
upstreamReqID = resp.Header.Get("x-goog-request-id")
}
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: upstreamReqID,
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) {
upstreamReqID := resp.Header.Get(requestIDHeader)
if upstreamReqID == "" {
upstreamReqID = resp.Header.Get("x-goog-request-id")
}
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: upstreamReqID,
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
return nil, s.writeGeminiMappedError(c, resp.StatusCode, respBody)
upstreamReqID := resp.Header.Get(requestIDHeader)
if upstreamReqID == "" {
upstreamReqID = resp.Header.Get("x-goog-request-id")
}
return nil, s.writeGeminiMappedError(c, account, resp.StatusCode, upstreamReqID, respBody)
}
requestID := resp.Header.Get(requestIDHeader)
......@@ -863,6 +970,14 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
safeErr := sanitizeUpstreamErrorMessage(err.Error())
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: 0,
Kind: "request_error",
Message: safeErr,
})
if attempt < geminiMaxRetries {
log.Printf("Gemini account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, geminiMaxRetries, err)
sleepGeminiBackoff(attempt)
......@@ -880,7 +995,8 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
FirstTokenMs: nil,
}, nil
}
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries: "+sanitizeUpstreamErrorMessage(err.Error()))
setOpsUpstreamError(c, 0, safeErr, "")
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries: "+safeErr)
}
if resp.StatusCode >= 400 && s.shouldRetryGeminiUpstreamError(account, resp.StatusCode) {
......@@ -899,6 +1015,30 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
}
if attempt < geminiMaxRetries {
upstreamReqID := resp.Header.Get(requestIDHeader)
if upstreamReqID == "" {
upstreamReqID = resp.Header.Get("x-goog-request-id")
}
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: upstreamReqID,
Kind: "retry",
Message: upstreamMsg,
Detail: upstreamDetail,
})
log.Printf("Gemini account %d: upstream status %d, retry %d/%d", account.ID, resp.StatusCode, attempt, geminiMaxRetries)
sleepGeminiBackoff(attempt)
continue
......@@ -962,20 +1102,85 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
}
if tempMatched {
evBody := unwrapIfNeeded(isOAuth, respBody)
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(evBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(evBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID,
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) {
evBody := unwrapIfNeeded(isOAuth, respBody)
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(evBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(evBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID,
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
respBody = unwrapIfNeeded(isOAuth, respBody)
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
log.Printf("[Gemini] native upstream error %d: %s", resp.StatusCode, truncateForLog(respBody, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes))
}
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID,
Kind: "http_error",
Message: upstreamMsg,
Detail: upstreamDetail,
})
contentType := resp.Header.Get("Content-Type")
if contentType == "" {
contentType = "application/json"
}
c.Data(resp.StatusCode, contentType, respBody)
if upstreamMsg == "" {
return nil, fmt.Errorf("gemini upstream error: %d", resp.StatusCode)
}
return nil, fmt.Errorf("gemini upstream error: %d message=%s", resp.StatusCode, upstreamMsg)
}
var usage *ClaudeUsage
var firstTokenMs *int
......@@ -1076,7 +1281,32 @@ func sanitizeUpstreamErrorMessage(msg string) string {
return sensitiveQueryParamRegex.ReplaceAllString(msg, `$1***`)
}
func (s *GeminiMessagesCompatService) writeGeminiMappedError(c *gin.Context, upstreamStatus int, body []byte) error {
func (s *GeminiMessagesCompatService) writeGeminiMappedError(c *gin.Context, account *Account, upstreamStatus int, upstreamRequestID string, body []byte) error {
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(body), maxBytes)
}
setOpsUpstreamError(c, upstreamStatus, upstreamMsg, upstreamDetail)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: upstreamStatus,
UpstreamRequestID: upstreamRequestID,
Kind: "http_error",
Message: upstreamMsg,
Detail: upstreamDetail,
})
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
log.Printf("[Gemini] upstream error %d: %s", upstreamStatus, truncateForLog(body, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes))
}
var statusCode int
var errType, errMsg string
......@@ -1184,7 +1414,10 @@ func (s *GeminiMessagesCompatService) writeGeminiMappedError(c *gin.Context, ups
"type": "error",
"error": gin.H{"type": errType, "message": errMsg},
})
if upstreamMsg == "" {
return fmt.Errorf("upstream error: %d", upstreamStatus)
}
return fmt.Errorf("upstream error: %d message=%s", upstreamStatus, upstreamMsg)
}
type claudeErrorMapping struct {
......
......@@ -115,12 +115,6 @@ func applyCodexOAuthTransform(reqBody map[string]any) codexTransformResult {
existingInstructions = strings.TrimSpace(existingInstructions)
if instructions != "" {
if existingInstructions != "" && existingInstructions != instructions {
if input, ok := reqBody["input"].([]any); ok {
reqBody["input"] = prependSystemInstruction(input, existingInstructions)
result.Modified = true
}
}
if existingInstructions != instructions {
reqBody["instructions"] = instructions
result.Modified = true
......@@ -129,7 +123,6 @@ func applyCodexOAuthTransform(reqBody map[string]any) codexTransformResult {
if input, ok := reqBody["input"].([]any); ok {
input = filterCodexInput(input)
input = normalizeOrphanedToolOutputs(input)
reqBody["input"] = input
result.Modified = true
}
......@@ -266,19 +259,6 @@ func filterCodexInput(input []any) []any {
return filtered
}
func prependSystemInstruction(input []any, instructions string) []any {
message := map[string]any{
"role": "system",
"content": []any{
map[string]any{
"type": "input_text",
"text": instructions,
},
},
}
return append([]any{message}, input...)
}
func normalizeCodexTools(reqBody map[string]any) bool {
rawTools, ok := reqBody["tools"]
if !ok || rawTools == nil {
......@@ -341,110 +321,6 @@ func normalizeCodexTools(reqBody map[string]any) bool {
return modified
}
func normalizeOrphanedToolOutputs(input []any) []any {
functionCallIDs := map[string]bool{}
localShellCallIDs := map[string]bool{}
customToolCallIDs := map[string]bool{}
for _, item := range input {
m, ok := item.(map[string]any)
if !ok {
continue
}
callID := getCallID(m)
if callID == "" {
continue
}
switch m["type"] {
case "function_call":
functionCallIDs[callID] = true
case "local_shell_call":
localShellCallIDs[callID] = true
case "custom_tool_call":
customToolCallIDs[callID] = true
}
}
output := make([]any, 0, len(input))
for _, item := range input {
m, ok := item.(map[string]any)
if !ok {
output = append(output, item)
continue
}
switch m["type"] {
case "function_call_output":
callID := getCallID(m)
if callID == "" || (!functionCallIDs[callID] && !localShellCallIDs[callID]) {
output = append(output, convertOrphanedOutputToMessage(m, callID))
continue
}
case "custom_tool_call_output":
callID := getCallID(m)
if callID == "" || !customToolCallIDs[callID] {
output = append(output, convertOrphanedOutputToMessage(m, callID))
continue
}
case "local_shell_call_output":
callID := getCallID(m)
if callID == "" || !localShellCallIDs[callID] {
output = append(output, convertOrphanedOutputToMessage(m, callID))
continue
}
}
output = append(output, m)
}
return output
}
func getCallID(item map[string]any) string {
raw, ok := item["call_id"]
if !ok {
return ""
}
callID, ok := raw.(string)
if !ok {
return ""
}
callID = strings.TrimSpace(callID)
if callID == "" {
return ""
}
return callID
}
func convertOrphanedOutputToMessage(item map[string]any, callID string) map[string]any {
toolName := "tool"
if name, ok := item["name"].(string); ok && name != "" {
toolName = name
}
labelID := callID
if labelID == "" {
labelID = "unknown"
}
text := stringifyOutput(item["output"])
if len(text) > 16000 {
text = text[:16000] + "\n...[truncated]"
}
return map[string]any{
"type": "message",
"role": "assistant",
"content": fmt.Sprintf("[Previous %s result; call_id=%s]: %s", toolName, labelID, text),
}
}
func stringifyOutput(output any) string {
switch v := output.(type) {
case string:
return v
default:
if data, err := json.Marshal(v); err == nil {
return string(data)
}
return fmt.Sprintf("%v", v)
}
}
func codexCachePath(filename string) string {
home, err := os.UserHomeDir()
if err != nil {
......
......@@ -12,7 +12,6 @@ import (
"io"
"log"
"net/http"
"os"
"regexp"
"sort"
"strconv"
......@@ -513,7 +512,7 @@ func (s *OpenAIGatewayService) shouldFailoverUpstreamError(statusCode int) bool
}
func (s *OpenAIGatewayService) handleFailoverSideEffects(ctx context.Context, resp *http.Response, account *Account) {
body, _ := io.ReadAll(resp.Body)
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, body)
}
......@@ -594,13 +593,53 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
// Send request
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
return nil, fmt.Errorf("upstream request failed: %w", err)
// Ensure the client receives an error response (handlers assume Forward writes on non-failover errors).
safeErr := sanitizeUpstreamErrorMessage(err.Error())
setOpsUpstreamError(c, 0, safeErr, "")
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: 0,
Kind: "request_error",
Message: safeErr,
})
c.JSON(http.StatusBadGateway, gin.H{
"error": gin.H{
"type": "upstream_error",
"message": "Upstream request failed",
},
})
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
}
defer func() { _ = resp.Body.Close() }()
// Handle error response
if resp.StatusCode >= 400 {
if s.shouldFailoverUpstreamError(resp.StatusCode) {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
resp.Body = io.NopCloser(bytes.NewReader(respBody))
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
s.handleFailoverSideEffects(ctx, resp, account)
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
......@@ -724,25 +763,72 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin.
}
func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account) (*OpenAIForwardResult, error) {
body, _ := io.ReadAll(resp.Body)
logUpstreamErrorBody(account.ID, resp.StatusCode, body)
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(body), maxBytes)
}
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
log.Printf(
"OpenAI upstream error %d (account=%d platform=%s type=%s): %s",
resp.StatusCode,
account.ID,
account.Platform,
account.Type,
truncateForLog(body, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes),
)
}
// Check custom error codes
if !account.ShouldHandleErrorCode(resp.StatusCode) {
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "http_error",
Message: upstreamMsg,
Detail: upstreamDetail,
})
c.JSON(http.StatusInternalServerError, gin.H{
"error": gin.H{
"type": "upstream_error",
"message": "Upstream gateway error",
},
})
if upstreamMsg == "" {
return nil, fmt.Errorf("upstream error: %d (not in custom error codes)", resp.StatusCode)
}
return nil, fmt.Errorf("upstream error: %d (not in custom error codes) message=%s", resp.StatusCode, upstreamMsg)
}
// Handle upstream error (mark account status)
shouldDisable := false
if s.rateLimitService != nil {
shouldDisable = s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, body)
}
kind := "http_error"
if shouldDisable {
kind = "failover"
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: kind,
Message: upstreamMsg,
Detail: upstreamDetail,
})
if shouldDisable {
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
......@@ -781,25 +867,10 @@ func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *ht
},
})
if upstreamMsg == "" {
return nil, fmt.Errorf("upstream error: %d", resp.StatusCode)
}
func logUpstreamErrorBody(accountID int64, statusCode int, body []byte) {
if strings.ToLower(strings.TrimSpace(os.Getenv("GATEWAY_LOG_UPSTREAM_ERROR_BODY"))) != "true" {
return
}
maxBytes := 2048
if rawMax := strings.TrimSpace(os.Getenv("GATEWAY_LOG_UPSTREAM_ERROR_BODY_MAX_BYTES")); rawMax != "" {
if parsed, err := strconv.Atoi(rawMax); err == nil && parsed > 0 {
maxBytes = parsed
}
}
if len(body) > maxBytes {
body = body[:maxBytes]
}
log.Printf("Upstream error body: account=%d status=%d body=%q", accountID, statusCode, string(body))
return nil, fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, upstreamMsg)
}
// openaiStreamingResult streaming response result
......
package service
import (
"context"
"errors"
"time"
)
// GetAccountAvailabilityStats returns current account availability stats.
//
// Query-level filtering is intentionally limited to platform/group to match the dashboard scope.
func (s *OpsService) GetAccountAvailabilityStats(ctx context.Context, platformFilter string, groupIDFilter *int64) (
map[string]*PlatformAvailability,
map[int64]*GroupAvailability,
map[int64]*AccountAvailability,
*time.Time,
error,
) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, nil, nil, nil, err
}
accounts, err := s.listAllAccountsForOps(ctx, platformFilter)
if err != nil {
return nil, nil, nil, nil, err
}
if groupIDFilter != nil && *groupIDFilter > 0 {
filtered := make([]Account, 0, len(accounts))
for _, acc := range accounts {
for _, grp := range acc.Groups {
if grp != nil && grp.ID == *groupIDFilter {
filtered = append(filtered, acc)
break
}
}
}
accounts = filtered
}
now := time.Now()
collectedAt := now
platform := make(map[string]*PlatformAvailability)
group := make(map[int64]*GroupAvailability)
account := make(map[int64]*AccountAvailability)
for _, acc := range accounts {
if acc.ID <= 0 {
continue
}
isTempUnsched := false
if acc.TempUnschedulableUntil != nil && now.Before(*acc.TempUnschedulableUntil) {
isTempUnsched = true
}
isRateLimited := acc.RateLimitResetAt != nil && now.Before(*acc.RateLimitResetAt)
isOverloaded := acc.OverloadUntil != nil && now.Before(*acc.OverloadUntil)
hasError := acc.Status == StatusError
// Normalize exclusive status flags so the UI doesn't show conflicting badges.
if hasError {
isRateLimited = false
isOverloaded = false
}
isAvailable := acc.Status == StatusActive && acc.Schedulable && !isRateLimited && !isOverloaded && !isTempUnsched
if acc.Platform != "" {
if _, ok := platform[acc.Platform]; !ok {
platform[acc.Platform] = &PlatformAvailability{
Platform: acc.Platform,
}
}
p := platform[acc.Platform]
p.TotalAccounts++
if isAvailable {
p.AvailableCount++
}
if isRateLimited {
p.RateLimitCount++
}
if hasError {
p.ErrorCount++
}
}
for _, grp := range acc.Groups {
if grp == nil || grp.ID <= 0 {
continue
}
if _, ok := group[grp.ID]; !ok {
group[grp.ID] = &GroupAvailability{
GroupID: grp.ID,
GroupName: grp.Name,
Platform: grp.Platform,
}
}
g := group[grp.ID]
g.TotalAccounts++
if isAvailable {
g.AvailableCount++
}
if isRateLimited {
g.RateLimitCount++
}
if hasError {
g.ErrorCount++
}
}
displayGroupID := int64(0)
displayGroupName := ""
if len(acc.Groups) > 0 && acc.Groups[0] != nil {
displayGroupID = acc.Groups[0].ID
displayGroupName = acc.Groups[0].Name
}
item := &AccountAvailability{
AccountID: acc.ID,
AccountName: acc.Name,
Platform: acc.Platform,
GroupID: displayGroupID,
GroupName: displayGroupName,
Status: acc.Status,
IsAvailable: isAvailable,
IsRateLimited: isRateLimited,
IsOverloaded: isOverloaded,
HasError: hasError,
ErrorMessage: acc.ErrorMessage,
}
if isRateLimited && acc.RateLimitResetAt != nil {
item.RateLimitResetAt = acc.RateLimitResetAt
remainingSec := int64(time.Until(*acc.RateLimitResetAt).Seconds())
if remainingSec > 0 {
item.RateLimitRemainingSec = &remainingSec
}
}
if isOverloaded && acc.OverloadUntil != nil {
item.OverloadUntil = acc.OverloadUntil
remainingSec := int64(time.Until(*acc.OverloadUntil).Seconds())
if remainingSec > 0 {
item.OverloadRemainingSec = &remainingSec
}
}
if isTempUnsched && acc.TempUnschedulableUntil != nil {
item.TempUnschedulableUntil = acc.TempUnschedulableUntil
}
account[acc.ID] = item
}
return platform, group, account, &collectedAt, nil
}
type OpsAccountAvailability struct {
Group *GroupAvailability
Accounts map[int64]*AccountAvailability
CollectedAt *time.Time
}
func (s *OpsService) GetAccountAvailability(ctx context.Context, platformFilter string, groupIDFilter *int64) (*OpsAccountAvailability, error) {
if s == nil {
return nil, errors.New("ops service is nil")
}
if s.getAccountAvailability != nil {
return s.getAccountAvailability(ctx, platformFilter, groupIDFilter)
}
_, groupStats, accountStats, collectedAt, err := s.GetAccountAvailabilityStats(ctx, platformFilter, groupIDFilter)
if err != nil {
return nil, err
}
var group *GroupAvailability
if groupIDFilter != nil && *groupIDFilter > 0 {
group = groupStats[*groupIDFilter]
}
if accountStats == nil {
accountStats = map[int64]*AccountAvailability{}
}
return &OpsAccountAvailability{
Group: group,
Accounts: accountStats,
CollectedAt: collectedAt,
}, nil
}
package service
import (
"context"
"database/sql"
"hash/fnv"
"time"
)
func hashAdvisoryLockID(key string) int64 {
h := fnv.New64a()
_, _ = h.Write([]byte(key))
return int64(h.Sum64())
}
func tryAcquireDBAdvisoryLock(ctx context.Context, db *sql.DB, lockID int64) (func(), bool) {
if db == nil {
return nil, false
}
if ctx == nil {
ctx = context.Background()
}
conn, err := db.Conn(ctx)
if err != nil {
return nil, false
}
acquired := false
if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired); err != nil {
_ = conn.Close()
return nil, false
}
if !acquired {
_ = conn.Close()
return nil, false
}
release := func() {
unlockCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, _ = conn.ExecContext(unlockCtx, "SELECT pg_advisory_unlock($1)", lockID)
_ = conn.Close()
}
return release, true
}
package service
import (
"context"
"database/sql"
"errors"
"log"
"strings"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)
const (
opsAggHourlyJobName = "ops_preaggregation_hourly"
opsAggDailyJobName = "ops_preaggregation_daily"
opsAggHourlyInterval = 10 * time.Minute
opsAggDailyInterval = 1 * time.Hour
// Keep in sync with ops retention target (vNext default 30d).
opsAggBackfillWindow = 30 * 24 * time.Hour
// Recompute overlap to absorb late-arriving rows near boundaries.
opsAggHourlyOverlap = 2 * time.Hour
opsAggDailyOverlap = 48 * time.Hour
opsAggHourlyChunk = 24 * time.Hour
opsAggDailyChunk = 7 * 24 * time.Hour
// Delay around boundaries (e.g. 10:00..10:05) to avoid aggregating buckets
// that may still receive late inserts.
opsAggSafeDelay = 5 * time.Minute
opsAggMaxQueryTimeout = 3 * time.Second
opsAggHourlyTimeout = 5 * time.Minute
opsAggDailyTimeout = 2 * time.Minute
opsAggHourlyLeaderLockKey = "ops:aggregation:hourly:leader"
opsAggDailyLeaderLockKey = "ops:aggregation:daily:leader"
opsAggHourlyLeaderLockTTL = 15 * time.Minute
opsAggDailyLeaderLockTTL = 10 * time.Minute
)
// OpsAggregationService periodically backfills ops_metrics_hourly / ops_metrics_daily
// for stable long-window dashboard queries.
//
// It is safe to run in multi-replica deployments when Redis is available (leader lock).
type OpsAggregationService struct {
opsRepo OpsRepository
settingRepo SettingRepository
cfg *config.Config
db *sql.DB
redisClient *redis.Client
instanceID string
stopCh chan struct{}
startOnce sync.Once
stopOnce sync.Once
hourlyMu sync.Mutex
dailyMu sync.Mutex
skipLogMu sync.Mutex
skipLogAt time.Time
}
func NewOpsAggregationService(
opsRepo OpsRepository,
settingRepo SettingRepository,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
) *OpsAggregationService {
return &OpsAggregationService{
opsRepo: opsRepo,
settingRepo: settingRepo,
cfg: cfg,
db: db,
redisClient: redisClient,
instanceID: uuid.NewString(),
}
}
func (s *OpsAggregationService) Start() {
if s == nil {
return
}
s.startOnce.Do(func() {
if s.stopCh == nil {
s.stopCh = make(chan struct{})
}
go s.hourlyLoop()
go s.dailyLoop()
})
}
func (s *OpsAggregationService) Stop() {
if s == nil {
return
}
s.stopOnce.Do(func() {
if s.stopCh != nil {
close(s.stopCh)
}
})
}
func (s *OpsAggregationService) hourlyLoop() {
// First run immediately.
s.aggregateHourly()
ticker := time.NewTicker(opsAggHourlyInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.aggregateHourly()
case <-s.stopCh:
return
}
}
}
func (s *OpsAggregationService) dailyLoop() {
// First run immediately.
s.aggregateDaily()
ticker := time.NewTicker(opsAggDailyInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.aggregateDaily()
case <-s.stopCh:
return
}
}
}
func (s *OpsAggregationService) aggregateHourly() {
if s == nil || s.opsRepo == nil {
return
}
if s.cfg != nil {
if !s.cfg.Ops.Enabled {
return
}
if !s.cfg.Ops.Aggregation.Enabled {
return
}
}
ctx, cancel := context.WithTimeout(context.Background(), opsAggHourlyTimeout)
defer cancel()
if !s.isMonitoringEnabled(ctx) {
return
}
release, ok := s.tryAcquireLeaderLock(ctx, opsAggHourlyLeaderLockKey, opsAggHourlyLeaderLockTTL, "[OpsAggregation][hourly]")
if !ok {
return
}
if release != nil {
defer release()
}
s.hourlyMu.Lock()
defer s.hourlyMu.Unlock()
startedAt := time.Now().UTC()
runAt := startedAt
// Aggregate stable full hours only.
end := utcFloorToHour(time.Now().UTC().Add(-opsAggSafeDelay))
start := end.Add(-opsAggBackfillWindow)
// Resume from the latest bucket with overlap.
{
ctxMax, cancelMax := context.WithTimeout(context.Background(), opsAggMaxQueryTimeout)
latest, ok, err := s.opsRepo.GetLatestHourlyBucketStart(ctxMax)
cancelMax()
if err != nil {
log.Printf("[OpsAggregation][hourly] failed to read latest bucket: %v", err)
} else if ok {
candidate := latest.Add(-opsAggHourlyOverlap)
if candidate.After(start) {
start = candidate
}
}
}
start = utcFloorToHour(start)
if !start.Before(end) {
return
}
var aggErr error
for cursor := start; cursor.Before(end); cursor = cursor.Add(opsAggHourlyChunk) {
chunkEnd := minTime(cursor.Add(opsAggHourlyChunk), end)
if err := s.opsRepo.UpsertHourlyMetrics(ctx, cursor, chunkEnd); err != nil {
aggErr = err
log.Printf("[OpsAggregation][hourly] upsert failed (%s..%s): %v", cursor.Format(time.RFC3339), chunkEnd.Format(time.RFC3339), err)
break
}
}
finishedAt := time.Now().UTC()
durationMs := finishedAt.Sub(startedAt).Milliseconds()
dur := durationMs
if aggErr != nil {
msg := truncateString(aggErr.Error(), 2048)
errAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer hbCancel()
_ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsAggHourlyJobName,
LastRunAt: &runAt,
LastErrorAt: &errAt,
LastError: &msg,
LastDurationMs: &dur,
})
return
}
successAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer hbCancel()
_ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsAggHourlyJobName,
LastRunAt: &runAt,
LastSuccessAt: &successAt,
LastDurationMs: &dur,
})
}
func (s *OpsAggregationService) aggregateDaily() {
if s == nil || s.opsRepo == nil {
return
}
if s.cfg != nil {
if !s.cfg.Ops.Enabled {
return
}
if !s.cfg.Ops.Aggregation.Enabled {
return
}
}
ctx, cancel := context.WithTimeout(context.Background(), opsAggDailyTimeout)
defer cancel()
if !s.isMonitoringEnabled(ctx) {
return
}
release, ok := s.tryAcquireLeaderLock(ctx, opsAggDailyLeaderLockKey, opsAggDailyLeaderLockTTL, "[OpsAggregation][daily]")
if !ok {
return
}
if release != nil {
defer release()
}
s.dailyMu.Lock()
defer s.dailyMu.Unlock()
startedAt := time.Now().UTC()
runAt := startedAt
end := utcFloorToDay(time.Now().UTC())
start := end.Add(-opsAggBackfillWindow)
{
ctxMax, cancelMax := context.WithTimeout(context.Background(), opsAggMaxQueryTimeout)
latest, ok, err := s.opsRepo.GetLatestDailyBucketDate(ctxMax)
cancelMax()
if err != nil {
log.Printf("[OpsAggregation][daily] failed to read latest bucket: %v", err)
} else if ok {
candidate := latest.Add(-opsAggDailyOverlap)
if candidate.After(start) {
start = candidate
}
}
}
start = utcFloorToDay(start)
if !start.Before(end) {
return
}
var aggErr error
for cursor := start; cursor.Before(end); cursor = cursor.Add(opsAggDailyChunk) {
chunkEnd := minTime(cursor.Add(opsAggDailyChunk), end)
if err := s.opsRepo.UpsertDailyMetrics(ctx, cursor, chunkEnd); err != nil {
aggErr = err
log.Printf("[OpsAggregation][daily] upsert failed (%s..%s): %v", cursor.Format("2006-01-02"), chunkEnd.Format("2006-01-02"), err)
break
}
}
finishedAt := time.Now().UTC()
durationMs := finishedAt.Sub(startedAt).Milliseconds()
dur := durationMs
if aggErr != nil {
msg := truncateString(aggErr.Error(), 2048)
errAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer hbCancel()
_ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsAggDailyJobName,
LastRunAt: &runAt,
LastErrorAt: &errAt,
LastError: &msg,
LastDurationMs: &dur,
})
return
}
successAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer hbCancel()
_ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsAggDailyJobName,
LastRunAt: &runAt,
LastSuccessAt: &successAt,
LastDurationMs: &dur,
})
}
func (s *OpsAggregationService) isMonitoringEnabled(ctx context.Context) bool {
if s == nil {
return false
}
if s.cfg != nil && !s.cfg.Ops.Enabled {
return false
}
if s.settingRepo == nil {
return true
}
if ctx == nil {
ctx = context.Background()
}
value, err := s.settingRepo.GetValue(ctx, SettingKeyOpsMonitoringEnabled)
if err != nil {
if errors.Is(err, ErrSettingNotFound) {
return true
}
return true
}
switch strings.ToLower(strings.TrimSpace(value)) {
case "false", "0", "off", "disabled":
return false
default:
return true
}
}
var opsAggReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
func (s *OpsAggregationService) tryAcquireLeaderLock(ctx context.Context, key string, ttl time.Duration, logPrefix string) (func(), bool) {
if s == nil {
return nil, false
}
if ctx == nil {
ctx = context.Background()
}
// Prefer Redis leader lock when available (multi-instance), but avoid stampeding
// the DB when Redis is flaky by falling back to a DB advisory lock.
if s.redisClient != nil {
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err == nil {
if !ok {
s.maybeLogSkip(logPrefix)
return nil, false
}
release := func() {
ctx2, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, _ = opsAggReleaseScript.Run(ctx2, s.redisClient, []string{key}, s.instanceID).Result()
}
return release, true
}
// Redis error: fall through to DB advisory lock.
}
release, ok := tryAcquireDBAdvisoryLock(ctx, s.db, hashAdvisoryLockID(key))
if !ok {
s.maybeLogSkip(logPrefix)
return nil, false
}
return release, true
}
func (s *OpsAggregationService) maybeLogSkip(prefix string) {
s.skipLogMu.Lock()
defer s.skipLogMu.Unlock()
now := time.Now()
if !s.skipLogAt.IsZero() && now.Sub(s.skipLogAt) < time.Minute {
return
}
s.skipLogAt = now
if prefix == "" {
prefix = "[OpsAggregation]"
}
log.Printf("%s leader lock held by another instance; skipping", prefix)
}
func utcFloorToHour(t time.Time) time.Time {
return t.UTC().Truncate(time.Hour)
}
func utcFloorToDay(t time.Time) time.Time {
u := t.UTC()
y, m, d := u.Date()
return time.Date(y, m, d, 0, 0, 0, 0, time.UTC)
}
func minTime(a, b time.Time) time.Time {
if a.Before(b) {
return a
}
return b
}
package service
import (
"context"
"fmt"
"log"
"math"
"strconv"
"strings"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)
const (
opsAlertEvaluatorJobName = "ops_alert_evaluator"
opsAlertEvaluatorTimeout = 45 * time.Second
opsAlertEvaluatorLeaderLockKey = "ops:alert:evaluator:leader"
opsAlertEvaluatorLeaderLockTTL = 90 * time.Second
opsAlertEvaluatorSkipLogInterval = 1 * time.Minute
)
var opsAlertEvaluatorReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
type OpsAlertEvaluatorService struct {
opsService *OpsService
opsRepo OpsRepository
emailService *EmailService
redisClient *redis.Client
cfg *config.Config
instanceID string
stopCh chan struct{}
startOnce sync.Once
stopOnce sync.Once
wg sync.WaitGroup
mu sync.Mutex
ruleStates map[int64]*opsAlertRuleState
emailLimiter *slidingWindowLimiter
skipLogMu sync.Mutex
skipLogAt time.Time
warnNoRedisOnce sync.Once
}
type opsAlertRuleState struct {
LastEvaluatedAt time.Time
ConsecutiveBreaches int
}
func NewOpsAlertEvaluatorService(
opsService *OpsService,
opsRepo OpsRepository,
emailService *EmailService,
redisClient *redis.Client,
cfg *config.Config,
) *OpsAlertEvaluatorService {
return &OpsAlertEvaluatorService{
opsService: opsService,
opsRepo: opsRepo,
emailService: emailService,
redisClient: redisClient,
cfg: cfg,
instanceID: uuid.NewString(),
ruleStates: map[int64]*opsAlertRuleState{},
emailLimiter: newSlidingWindowLimiter(0, time.Hour),
}
}
func (s *OpsAlertEvaluatorService) Start() {
if s == nil {
return
}
s.startOnce.Do(func() {
if s.stopCh == nil {
s.stopCh = make(chan struct{})
}
go s.run()
})
}
func (s *OpsAlertEvaluatorService) Stop() {
if s == nil {
return
}
s.stopOnce.Do(func() {
if s.stopCh != nil {
close(s.stopCh)
}
})
s.wg.Wait()
}
func (s *OpsAlertEvaluatorService) run() {
s.wg.Add(1)
defer s.wg.Done()
// Start immediately to produce early feedback in ops dashboard.
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-timer.C:
interval := s.getInterval()
s.evaluateOnce(interval)
timer.Reset(interval)
case <-s.stopCh:
return
}
}
}
func (s *OpsAlertEvaluatorService) getInterval() time.Duration {
// Default.
interval := 60 * time.Second
if s == nil || s.opsService == nil {
return interval
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
cfg, err := s.opsService.GetOpsAlertRuntimeSettings(ctx)
if err != nil || cfg == nil {
return interval
}
if cfg.EvaluationIntervalSeconds <= 0 {
return interval
}
if cfg.EvaluationIntervalSeconds < 1 {
return interval
}
if cfg.EvaluationIntervalSeconds > int((24 * time.Hour).Seconds()) {
return interval
}
return time.Duration(cfg.EvaluationIntervalSeconds) * time.Second
}
func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) {
if s == nil || s.opsRepo == nil {
return
}
if s.cfg != nil && !s.cfg.Ops.Enabled {
return
}
ctx, cancel := context.WithTimeout(context.Background(), opsAlertEvaluatorTimeout)
defer cancel()
if s.opsService != nil && !s.opsService.IsMonitoringEnabled(ctx) {
return
}
runtimeCfg := defaultOpsAlertRuntimeSettings()
if s.opsService != nil {
if loaded, err := s.opsService.GetOpsAlertRuntimeSettings(ctx); err == nil && loaded != nil {
runtimeCfg = loaded
}
}
release, ok := s.tryAcquireLeaderLock(ctx, runtimeCfg.DistributedLock)
if !ok {
return
}
if release != nil {
defer release()
}
startedAt := time.Now().UTC()
runAt := startedAt
rules, err := s.opsRepo.ListAlertRules(ctx)
if err != nil {
s.recordHeartbeatError(runAt, time.Since(startedAt), err)
log.Printf("[OpsAlertEvaluator] list rules failed: %v", err)
return
}
now := time.Now().UTC()
safeEnd := now.Truncate(time.Minute)
if safeEnd.IsZero() {
safeEnd = now
}
systemMetrics, _ := s.opsRepo.GetLatestSystemMetrics(ctx, 1)
// Cleanup stale state for removed rules.
s.pruneRuleStates(rules)
for _, rule := range rules {
if rule == nil || !rule.Enabled || rule.ID <= 0 {
continue
}
scopePlatform, scopeGroupID := parseOpsAlertRuleScope(rule.Filters)
windowMinutes := rule.WindowMinutes
if windowMinutes <= 0 {
windowMinutes = 1
}
windowStart := safeEnd.Add(-time.Duration(windowMinutes) * time.Minute)
windowEnd := safeEnd
metricValue, ok := s.computeRuleMetric(ctx, rule, systemMetrics, windowStart, windowEnd, scopePlatform, scopeGroupID)
if !ok {
s.resetRuleState(rule.ID, now)
continue
}
breachedNow := compareMetric(metricValue, rule.Operator, rule.Threshold)
required := requiredSustainedBreaches(rule.SustainedMinutes, interval)
consecutive := s.updateRuleBreaches(rule.ID, now, interval, breachedNow)
activeEvent, err := s.opsRepo.GetActiveAlertEvent(ctx, rule.ID)
if err != nil {
log.Printf("[OpsAlertEvaluator] get active event failed (rule=%d): %v", rule.ID, err)
continue
}
if breachedNow && consecutive >= required {
if activeEvent != nil {
continue
}
latestEvent, err := s.opsRepo.GetLatestAlertEvent(ctx, rule.ID)
if err != nil {
log.Printf("[OpsAlertEvaluator] get latest event failed (rule=%d): %v", rule.ID, err)
continue
}
if latestEvent != nil && rule.CooldownMinutes > 0 {
cooldown := time.Duration(rule.CooldownMinutes) * time.Minute
if now.Sub(latestEvent.FiredAt) < cooldown {
continue
}
}
firedEvent := &OpsAlertEvent{
RuleID: rule.ID,
Severity: strings.TrimSpace(rule.Severity),
Status: OpsAlertStatusFiring,
Title: fmt.Sprintf("%s: %s", strings.TrimSpace(rule.Severity), strings.TrimSpace(rule.Name)),
Description: buildOpsAlertDescription(rule, metricValue, windowMinutes, scopePlatform, scopeGroupID),
MetricValue: float64Ptr(metricValue),
ThresholdValue: float64Ptr(rule.Threshold),
Dimensions: buildOpsAlertDimensions(scopePlatform, scopeGroupID),
FiredAt: now,
CreatedAt: now,
}
created, err := s.opsRepo.CreateAlertEvent(ctx, firedEvent)
if err != nil {
log.Printf("[OpsAlertEvaluator] create event failed (rule=%d): %v", rule.ID, err)
continue
}
if created != nil && created.ID > 0 {
s.maybeSendAlertEmail(ctx, runtimeCfg, rule, created)
}
continue
}
// Not breached: resolve active event if present.
if activeEvent != nil {
resolvedAt := now
if err := s.opsRepo.UpdateAlertEventStatus(ctx, activeEvent.ID, OpsAlertStatusResolved, &resolvedAt); err != nil {
log.Printf("[OpsAlertEvaluator] resolve event failed (event=%d): %v", activeEvent.ID, err)
}
}
}
s.recordHeartbeatSuccess(runAt, time.Since(startedAt))
}
func (s *OpsAlertEvaluatorService) pruneRuleStates(rules []*OpsAlertRule) {
s.mu.Lock()
defer s.mu.Unlock()
live := map[int64]struct{}{}
for _, r := range rules {
if r != nil && r.ID > 0 {
live[r.ID] = struct{}{}
}
}
for id := range s.ruleStates {
if _, ok := live[id]; !ok {
delete(s.ruleStates, id)
}
}
}
func (s *OpsAlertEvaluatorService) resetRuleState(ruleID int64, now time.Time) {
if ruleID <= 0 {
return
}
s.mu.Lock()
defer s.mu.Unlock()
state, ok := s.ruleStates[ruleID]
if !ok {
state = &opsAlertRuleState{}
s.ruleStates[ruleID] = state
}
state.LastEvaluatedAt = now
state.ConsecutiveBreaches = 0
}
func (s *OpsAlertEvaluatorService) updateRuleBreaches(ruleID int64, now time.Time, interval time.Duration, breached bool) int {
if ruleID <= 0 {
return 0
}
s.mu.Lock()
defer s.mu.Unlock()
state, ok := s.ruleStates[ruleID]
if !ok {
state = &opsAlertRuleState{}
s.ruleStates[ruleID] = state
}
if !state.LastEvaluatedAt.IsZero() && interval > 0 {
if now.Sub(state.LastEvaluatedAt) > interval*2 {
state.ConsecutiveBreaches = 0
}
}
state.LastEvaluatedAt = now
if breached {
state.ConsecutiveBreaches++
} else {
state.ConsecutiveBreaches = 0
}
return state.ConsecutiveBreaches
}
func requiredSustainedBreaches(sustainedMinutes int, interval time.Duration) int {
if sustainedMinutes <= 0 {
return 1
}
if interval <= 0 {
return sustainedMinutes
}
required := int(math.Ceil(float64(sustainedMinutes*60) / interval.Seconds()))
if required < 1 {
return 1
}
return required
}
func parseOpsAlertRuleScope(filters map[string]any) (platform string, groupID *int64) {
if filters == nil {
return "", nil
}
if v, ok := filters["platform"]; ok {
if s, ok := v.(string); ok {
platform = strings.TrimSpace(s)
}
}
if v, ok := filters["group_id"]; ok {
switch t := v.(type) {
case float64:
if t > 0 {
id := int64(t)
groupID = &id
}
case int64:
if t > 0 {
id := t
groupID = &id
}
case int:
if t > 0 {
id := int64(t)
groupID = &id
}
case string:
n, err := strconv.ParseInt(strings.TrimSpace(t), 10, 64)
if err == nil && n > 0 {
groupID = &n
}
}
}
return platform, groupID
}
func (s *OpsAlertEvaluatorService) computeRuleMetric(
ctx context.Context,
rule *OpsAlertRule,
systemMetrics *OpsSystemMetricsSnapshot,
start time.Time,
end time.Time,
platform string,
groupID *int64,
) (float64, bool) {
if rule == nil {
return 0, false
}
switch strings.TrimSpace(rule.MetricType) {
case "cpu_usage_percent":
if systemMetrics != nil && systemMetrics.CPUUsagePercent != nil {
return *systemMetrics.CPUUsagePercent, true
}
return 0, false
case "memory_usage_percent":
if systemMetrics != nil && systemMetrics.MemoryUsagePercent != nil {
return *systemMetrics.MemoryUsagePercent, true
}
return 0, false
case "concurrency_queue_depth":
if systemMetrics != nil && systemMetrics.ConcurrencyQueueDepth != nil {
return float64(*systemMetrics.ConcurrencyQueueDepth), true
}
return 0, false
case "group_available_accounts":
if groupID == nil || *groupID <= 0 {
return 0, false
}
if s == nil || s.opsService == nil {
return 0, false
}
availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
if err != nil || availability == nil {
return 0, false
}
if availability.Group == nil {
return 0, true
}
return float64(availability.Group.AvailableCount), true
case "group_available_ratio":
if groupID == nil || *groupID <= 0 {
return 0, false
}
if s == nil || s.opsService == nil {
return 0, false
}
availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
if err != nil || availability == nil {
return 0, false
}
return computeGroupAvailableRatio(availability.Group), true
case "account_rate_limited_count":
if s == nil || s.opsService == nil {
return 0, false
}
availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
if err != nil || availability == nil {
return 0, false
}
return float64(countAccountsByCondition(availability.Accounts, func(acc *AccountAvailability) bool {
return acc.IsRateLimited
})), true
case "account_error_count":
if s == nil || s.opsService == nil {
return 0, false
}
availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
if err != nil || availability == nil {
return 0, false
}
return float64(countAccountsByCondition(availability.Accounts, func(acc *AccountAvailability) bool {
return acc.HasError && acc.TempUnschedulableUntil == nil
})), true
}
overview, err := s.opsRepo.GetDashboardOverview(ctx, &OpsDashboardFilter{
StartTime: start,
EndTime: end,
Platform: platform,
GroupID: groupID,
QueryMode: OpsQueryModeRaw,
})
if err != nil {
return 0, false
}
if overview == nil {
return 0, false
}
switch strings.TrimSpace(rule.MetricType) {
case "success_rate":
if overview.RequestCountSLA <= 0 {
return 0, false
}
return overview.SLA * 100, true
case "error_rate":
if overview.RequestCountSLA <= 0 {
return 0, false
}
return overview.ErrorRate * 100, true
case "upstream_error_rate":
if overview.RequestCountSLA <= 0 {
return 0, false
}
return overview.UpstreamErrorRate * 100, true
case "p95_latency_ms":
if overview.Duration.P95 == nil {
return 0, false
}
return float64(*overview.Duration.P95), true
case "p99_latency_ms":
if overview.Duration.P99 == nil {
return 0, false
}
return float64(*overview.Duration.P99), true
default:
return 0, false
}
}
func compareMetric(value float64, operator string, threshold float64) bool {
switch strings.TrimSpace(operator) {
case ">":
return value > threshold
case ">=":
return value >= threshold
case "<":
return value < threshold
case "<=":
return value <= threshold
case "==":
return value == threshold
case "!=":
return value != threshold
default:
return false
}
}
func buildOpsAlertDimensions(platform string, groupID *int64) map[string]any {
dims := map[string]any{}
if strings.TrimSpace(platform) != "" {
dims["platform"] = strings.TrimSpace(platform)
}
if groupID != nil && *groupID > 0 {
dims["group_id"] = *groupID
}
if len(dims) == 0 {
return nil
}
return dims
}
func buildOpsAlertDescription(rule *OpsAlertRule, value float64, windowMinutes int, platform string, groupID *int64) string {
if rule == nil {
return ""
}
scope := "overall"
if strings.TrimSpace(platform) != "" {
scope = fmt.Sprintf("platform=%s", strings.TrimSpace(platform))
}
if groupID != nil && *groupID > 0 {
scope = fmt.Sprintf("%s group_id=%d", scope, *groupID)
}
if windowMinutes <= 0 {
windowMinutes = 1
}
return fmt.Sprintf("%s %s %.2f (current %.2f) over last %dm (%s)",
strings.TrimSpace(rule.MetricType),
strings.TrimSpace(rule.Operator),
rule.Threshold,
value,
windowMinutes,
strings.TrimSpace(scope),
)
}
func (s *OpsAlertEvaluatorService) maybeSendAlertEmail(ctx context.Context, runtimeCfg *OpsAlertRuntimeSettings, rule *OpsAlertRule, event *OpsAlertEvent) {
if s == nil || s.emailService == nil || s.opsService == nil || event == nil || rule == nil {
return
}
if event.EmailSent {
return
}
if !rule.NotifyEmail {
return
}
emailCfg, err := s.opsService.GetEmailNotificationConfig(ctx)
if err != nil || emailCfg == nil || !emailCfg.Alert.Enabled {
return
}
if len(emailCfg.Alert.Recipients) == 0 {
return
}
if !shouldSendOpsAlertEmailByMinSeverity(strings.TrimSpace(emailCfg.Alert.MinSeverity), strings.TrimSpace(rule.Severity)) {
return
}
if runtimeCfg != nil && runtimeCfg.Silencing.Enabled {
if isOpsAlertSilenced(time.Now().UTC(), rule, event, runtimeCfg.Silencing) {
return
}
}
// Apply/update rate limiter.
s.emailLimiter.SetLimit(emailCfg.Alert.RateLimitPerHour)
subject := fmt.Sprintf("[Ops Alert][%s] %s", strings.TrimSpace(rule.Severity), strings.TrimSpace(rule.Name))
body := buildOpsAlertEmailBody(rule, event)
anySent := false
for _, to := range emailCfg.Alert.Recipients {
addr := strings.TrimSpace(to)
if addr == "" {
continue
}
if !s.emailLimiter.Allow(time.Now().UTC()) {
continue
}
if err := s.emailService.SendEmail(ctx, addr, subject, body); err != nil {
// Ignore per-recipient failures; continue best-effort.
continue
}
anySent = true
}
if anySent {
_ = s.opsRepo.UpdateAlertEventEmailSent(context.Background(), event.ID, true)
}
}
func buildOpsAlertEmailBody(rule *OpsAlertRule, event *OpsAlertEvent) string {
if rule == nil || event == nil {
return ""
}
metric := strings.TrimSpace(rule.MetricType)
value := "-"
threshold := fmt.Sprintf("%.2f", rule.Threshold)
if event.MetricValue != nil {
value = fmt.Sprintf("%.2f", *event.MetricValue)
}
if event.ThresholdValue != nil {
threshold = fmt.Sprintf("%.2f", *event.ThresholdValue)
}
return fmt.Sprintf(`
<h2>Ops Alert</h2>
<p><b>Rule</b>: %s</p>
<p><b>Severity</b>: %s</p>
<p><b>Status</b>: %s</p>
<p><b>Metric</b>: %s %s %s</p>
<p><b>Fired at</b>: %s</p>
<p><b>Description</b>: %s</p>
`,
htmlEscape(rule.Name),
htmlEscape(rule.Severity),
htmlEscape(event.Status),
htmlEscape(metric),
htmlEscape(rule.Operator),
htmlEscape(fmt.Sprintf("%s (threshold %s)", value, threshold)),
event.FiredAt.Format(time.RFC3339),
htmlEscape(event.Description),
)
}
func shouldSendOpsAlertEmailByMinSeverity(minSeverity string, ruleSeverity string) bool {
minSeverity = strings.ToLower(strings.TrimSpace(minSeverity))
if minSeverity == "" {
return true
}
eventLevel := opsEmailSeverityForOps(ruleSeverity)
minLevel := strings.ToLower(minSeverity)
rank := func(level string) int {
switch level {
case "critical":
return 3
case "warning":
return 2
case "info":
return 1
default:
return 0
}
}
return rank(eventLevel) >= rank(minLevel)
}
func opsEmailSeverityForOps(severity string) string {
switch strings.ToUpper(strings.TrimSpace(severity)) {
case "P0":
return "critical"
case "P1":
return "warning"
default:
return "info"
}
}
func isOpsAlertSilenced(now time.Time, rule *OpsAlertRule, event *OpsAlertEvent, silencing OpsAlertSilencingSettings) bool {
if !silencing.Enabled {
return false
}
if now.IsZero() {
now = time.Now().UTC()
}
if strings.TrimSpace(silencing.GlobalUntilRFC3339) != "" {
if t, err := time.Parse(time.RFC3339, strings.TrimSpace(silencing.GlobalUntilRFC3339)); err == nil {
if now.Before(t) {
return true
}
}
}
for _, entry := range silencing.Entries {
untilRaw := strings.TrimSpace(entry.UntilRFC3339)
if untilRaw == "" {
continue
}
until, err := time.Parse(time.RFC3339, untilRaw)
if err != nil {
continue
}
if now.After(until) {
continue
}
if entry.RuleID != nil && rule != nil && rule.ID > 0 && *entry.RuleID != rule.ID {
continue
}
if len(entry.Severities) > 0 {
match := false
for _, s := range entry.Severities {
if strings.EqualFold(strings.TrimSpace(s), strings.TrimSpace(event.Severity)) || strings.EqualFold(strings.TrimSpace(s), strings.TrimSpace(rule.Severity)) {
match = true
break
}
}
if !match {
continue
}
}
return true
}
return false
}
func (s *OpsAlertEvaluatorService) tryAcquireLeaderLock(ctx context.Context, lock OpsDistributedLockSettings) (func(), bool) {
if !lock.Enabled {
return nil, true
}
if s.redisClient == nil {
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsAlertEvaluator] redis not configured; running without distributed lock")
})
return nil, true
}
key := strings.TrimSpace(lock.Key)
if key == "" {
key = opsAlertEvaluatorLeaderLockKey
}
ttl := time.Duration(lock.TTLSeconds) * time.Second
if ttl <= 0 {
ttl = opsAlertEvaluatorLeaderLockTTL
}
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err != nil {
// Prefer fail-closed to avoid duplicate evaluators stampeding the DB when Redis is flaky.
// Single-node deployments can disable the distributed lock via runtime settings.
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsAlertEvaluator] leader lock SetNX failed; skipping this cycle: %v", err)
})
return nil, false
}
if !ok {
s.maybeLogSkip(key)
return nil, false
}
return func() {
_, _ = opsAlertEvaluatorReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
}, true
}
func (s *OpsAlertEvaluatorService) maybeLogSkip(key string) {
s.skipLogMu.Lock()
defer s.skipLogMu.Unlock()
now := time.Now()
if !s.skipLogAt.IsZero() && now.Sub(s.skipLogAt) < opsAlertEvaluatorSkipLogInterval {
return
}
s.skipLogAt = now
log.Printf("[OpsAlertEvaluator] leader lock held by another instance; skipping (key=%q)", key)
}
func (s *OpsAlertEvaluatorService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) {
if s == nil || s.opsRepo == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsAlertEvaluatorJobName,
LastRunAt: &runAt,
LastSuccessAt: &now,
LastDurationMs: &durMs,
})
}
func (s *OpsAlertEvaluatorService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
if s == nil || s.opsRepo == nil || err == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
msg := truncateString(err.Error(), 2048)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsAlertEvaluatorJobName,
LastRunAt: &runAt,
LastErrorAt: &now,
LastError: &msg,
LastDurationMs: &durMs,
})
}
func htmlEscape(s string) string {
replacer := strings.NewReplacer(
"&", "&amp;",
"<", "&lt;",
">", "&gt;",
`"`, "&quot;",
"'", "&#39;",
)
return replacer.Replace(s)
}
type slidingWindowLimiter struct {
mu sync.Mutex
limit int
window time.Duration
sent []time.Time
}
func newSlidingWindowLimiter(limit int, window time.Duration) *slidingWindowLimiter {
if window <= 0 {
window = time.Hour
}
return &slidingWindowLimiter{
limit: limit,
window: window,
sent: []time.Time{},
}
}
func (l *slidingWindowLimiter) SetLimit(limit int) {
l.mu.Lock()
defer l.mu.Unlock()
l.limit = limit
}
func (l *slidingWindowLimiter) Allow(now time.Time) bool {
l.mu.Lock()
defer l.mu.Unlock()
if l.limit <= 0 {
return true
}
cutoff := now.Add(-l.window)
keep := l.sent[:0]
for _, t := range l.sent {
if t.After(cutoff) {
keep = append(keep, t)
}
}
l.sent = keep
if len(l.sent) >= l.limit {
return false
}
l.sent = append(l.sent, now)
return true
}
// computeGroupAvailableRatio returns the available percentage for a group.
// Formula: (AvailableCount / TotalAccounts) * 100.
// Returns 0 when TotalAccounts is 0.
func computeGroupAvailableRatio(group *GroupAvailability) float64 {
if group == nil || group.TotalAccounts <= 0 {
return 0
}
return (float64(group.AvailableCount) / float64(group.TotalAccounts)) * 100
}
// countAccountsByCondition counts accounts that satisfy the given condition.
func countAccountsByCondition(accounts map[int64]*AccountAvailability, condition func(*AccountAvailability) bool) int64 {
if len(accounts) == 0 || condition == nil {
return 0
}
var count int64
for _, account := range accounts {
if account != nil && condition(account) {
count++
}
}
return count
}
//go:build unit
package service
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
type stubOpsRepo struct {
OpsRepository
overview *OpsDashboardOverview
err error
}
func (s *stubOpsRepo) GetDashboardOverview(ctx context.Context, filter *OpsDashboardFilter) (*OpsDashboardOverview, error) {
if s.err != nil {
return nil, s.err
}
if s.overview != nil {
return s.overview, nil
}
return &OpsDashboardOverview{}, nil
}
func TestComputeGroupAvailableRatio(t *testing.T) {
t.Parallel()
t.Run("正常情况: 10个账号, 8个可用 = 80%", func(t *testing.T) {
t.Parallel()
got := computeGroupAvailableRatio(&GroupAvailability{
TotalAccounts: 10,
AvailableCount: 8,
})
require.InDelta(t, 80.0, got, 0.0001)
})
t.Run("边界情况: TotalAccounts = 0 应返回 0", func(t *testing.T) {
t.Parallel()
got := computeGroupAvailableRatio(&GroupAvailability{
TotalAccounts: 0,
AvailableCount: 8,
})
require.Equal(t, 0.0, got)
})
t.Run("边界情况: AvailableCount = 0 应返回 0%", func(t *testing.T) {
t.Parallel()
got := computeGroupAvailableRatio(&GroupAvailability{
TotalAccounts: 10,
AvailableCount: 0,
})
require.Equal(t, 0.0, got)
})
}
func TestCountAccountsByCondition(t *testing.T) {
t.Parallel()
t.Run("测试限流账号统计: acc.IsRateLimited", func(t *testing.T) {
t.Parallel()
accounts := map[int64]*AccountAvailability{
1: {IsRateLimited: true},
2: {IsRateLimited: false},
3: {IsRateLimited: true},
}
got := countAccountsByCondition(accounts, func(acc *AccountAvailability) bool {
return acc.IsRateLimited
})
require.Equal(t, int64(2), got)
})
t.Run("测试错误账号统计(排除临时不可调度): acc.HasError && acc.TempUnschedulableUntil == nil", func(t *testing.T) {
t.Parallel()
until := time.Now().UTC().Add(5 * time.Minute)
accounts := map[int64]*AccountAvailability{
1: {HasError: true},
2: {HasError: true, TempUnschedulableUntil: &until},
3: {HasError: false},
}
got := countAccountsByCondition(accounts, func(acc *AccountAvailability) bool {
return acc.HasError && acc.TempUnschedulableUntil == nil
})
require.Equal(t, int64(1), got)
})
t.Run("边界情况: 空 map 应返回 0", func(t *testing.T) {
t.Parallel()
got := countAccountsByCondition(map[int64]*AccountAvailability{}, func(acc *AccountAvailability) bool {
return acc.IsRateLimited
})
require.Equal(t, int64(0), got)
})
}
func TestComputeRuleMetricNewIndicators(t *testing.T) {
t.Parallel()
groupID := int64(101)
platform := "openai"
availability := &OpsAccountAvailability{
Group: &GroupAvailability{
GroupID: groupID,
TotalAccounts: 10,
AvailableCount: 8,
},
Accounts: map[int64]*AccountAvailability{
1: {IsRateLimited: true},
2: {IsRateLimited: true},
3: {HasError: true},
4: {HasError: true, TempUnschedulableUntil: timePtr(time.Now().UTC().Add(2 * time.Minute))},
5: {HasError: false, IsRateLimited: false},
},
}
opsService := &OpsService{
getAccountAvailability: func(_ context.Context, _ string, _ *int64) (*OpsAccountAvailability, error) {
return availability, nil
},
}
svc := &OpsAlertEvaluatorService{
opsService: opsService,
opsRepo: &stubOpsRepo{overview: &OpsDashboardOverview{}},
}
start := time.Now().UTC().Add(-5 * time.Minute)
end := time.Now().UTC()
ctx := context.Background()
tests := []struct {
name string
metricType string
groupID *int64
wantValue float64
wantOK bool
}{
{
name: "group_available_accounts",
metricType: "group_available_accounts",
groupID: &groupID,
wantValue: 8,
wantOK: true,
},
{
name: "group_available_ratio",
metricType: "group_available_ratio",
groupID: &groupID,
wantValue: 80.0,
wantOK: true,
},
{
name: "account_rate_limited_count",
metricType: "account_rate_limited_count",
groupID: nil,
wantValue: 2,
wantOK: true,
},
{
name: "account_error_count",
metricType: "account_error_count",
groupID: nil,
wantValue: 1,
wantOK: true,
},
{
name: "group_available_accounts without group_id returns false",
metricType: "group_available_accounts",
groupID: nil,
wantValue: 0,
wantOK: false,
},
{
name: "group_available_ratio without group_id returns false",
metricType: "group_available_ratio",
groupID: nil,
wantValue: 0,
wantOK: false,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
rule := &OpsAlertRule{
MetricType: tt.metricType,
}
gotValue, gotOK := svc.computeRuleMetric(ctx, rule, nil, start, end, platform, tt.groupID)
require.Equal(t, tt.wantOK, gotOK)
if !tt.wantOK {
return
}
require.InDelta(t, tt.wantValue, gotValue, 0.0001)
})
}
}
package service
import "time"
// Ops alert rule/event models.
//
// NOTE: These are admin-facing DTOs and intentionally keep JSON naming aligned
// with the existing ops dashboard frontend (backup style).
const (
OpsAlertStatusFiring = "firing"
OpsAlertStatusResolved = "resolved"
)
type OpsAlertRule struct {
ID int64 `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Enabled bool `json:"enabled"`
Severity string `json:"severity"`
MetricType string `json:"metric_type"`
Operator string `json:"operator"`
Threshold float64 `json:"threshold"`
WindowMinutes int `json:"window_minutes"`
SustainedMinutes int `json:"sustained_minutes"`
CooldownMinutes int `json:"cooldown_minutes"`
NotifyEmail bool `json:"notify_email"`
Filters map[string]any `json:"filters,omitempty"`
LastTriggeredAt *time.Time `json:"last_triggered_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type OpsAlertEvent struct {
ID int64 `json:"id"`
RuleID int64 `json:"rule_id"`
Severity string `json:"severity"`
Status string `json:"status"`
Title string `json:"title"`
Description string `json:"description"`
MetricValue *float64 `json:"metric_value,omitempty"`
ThresholdValue *float64 `json:"threshold_value,omitempty"`
Dimensions map[string]any `json:"dimensions,omitempty"`
FiredAt time.Time `json:"fired_at"`
ResolvedAt *time.Time `json:"resolved_at,omitempty"`
EmailSent bool `json:"email_sent"`
CreatedAt time.Time `json:"created_at"`
}
type OpsAlertEventFilter struct {
Limit int
// Optional filters.
Status string
Severity string
StartTime *time.Time
EndTime *time.Time
// Dimensions filters (best-effort).
Platform string
GroupID *int64
}
package service
import (
"context"
"database/sql"
"errors"
"strings"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func (s *OpsService) ListAlertRules(ctx context.Context) ([]*OpsAlertRule, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return []*OpsAlertRule{}, nil
}
return s.opsRepo.ListAlertRules(ctx)
}
func (s *OpsService) CreateAlertRule(ctx context.Context, rule *OpsAlertRule) (*OpsAlertRule, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if rule == nil {
return nil, infraerrors.BadRequest("INVALID_RULE", "invalid rule")
}
created, err := s.opsRepo.CreateAlertRule(ctx, rule)
if err != nil {
return nil, err
}
return created, nil
}
func (s *OpsService) UpdateAlertRule(ctx context.Context, rule *OpsAlertRule) (*OpsAlertRule, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if rule == nil || rule.ID <= 0 {
return nil, infraerrors.BadRequest("INVALID_RULE", "invalid rule")
}
updated, err := s.opsRepo.UpdateAlertRule(ctx, rule)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, infraerrors.NotFound("OPS_ALERT_RULE_NOT_FOUND", "alert rule not found")
}
return nil, err
}
return updated, nil
}
func (s *OpsService) DeleteAlertRule(ctx context.Context, id int64) error {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return err
}
if s.opsRepo == nil {
return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if id <= 0 {
return infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
if err := s.opsRepo.DeleteAlertRule(ctx, id); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return infraerrors.NotFound("OPS_ALERT_RULE_NOT_FOUND", "alert rule not found")
}
return err
}
return nil
}
func (s *OpsService) ListAlertEvents(ctx context.Context, filter *OpsAlertEventFilter) ([]*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return []*OpsAlertEvent{}, nil
}
return s.opsRepo.ListAlertEvents(ctx, filter)
}
func (s *OpsService) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if ruleID <= 0 {
return nil, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
return s.opsRepo.GetActiveAlertEvent(ctx, ruleID)
}
func (s *OpsService) GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if ruleID <= 0 {
return nil, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
return s.opsRepo.GetLatestAlertEvent(ctx, ruleID)
}
func (s *OpsService) CreateAlertEvent(ctx context.Context, event *OpsAlertEvent) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if event == nil {
return nil, infraerrors.BadRequest("INVALID_EVENT", "invalid event")
}
created, err := s.opsRepo.CreateAlertEvent(ctx, event)
if err != nil {
return nil, err
}
return created, nil
}
func (s *OpsService) UpdateAlertEventStatus(ctx context.Context, eventID int64, status string, resolvedAt *time.Time) error {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return err
}
if s.opsRepo == nil {
return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if eventID <= 0 {
return infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id")
}
if strings.TrimSpace(status) == "" {
return infraerrors.BadRequest("INVALID_STATUS", "invalid status")
}
return s.opsRepo.UpdateAlertEventStatus(ctx, eventID, status, resolvedAt)
}
func (s *OpsService) UpdateAlertEventEmailSent(ctx context.Context, eventID int64, emailSent bool) error {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return err
}
if s.opsRepo == nil {
return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if eventID <= 0 {
return infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id")
}
return s.opsRepo.UpdateAlertEventEmailSent(ctx, eventID, emailSent)
}
package service
import (
"context"
"database/sql"
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
const (
opsCleanupJobName = "ops_cleanup"
opsCleanupLeaderLockKeyDefault = "ops:cleanup:leader"
opsCleanupLeaderLockTTLDefault = 30 * time.Minute
)
var opsCleanupCronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
var opsCleanupReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
// OpsCleanupService periodically deletes old ops data to prevent unbounded DB growth.
//
// - Scheduling: 5-field cron spec (minute hour dom month dow).
// - Multi-instance: best-effort Redis leader lock so only one node runs cleanup.
// - Safety: deletes in batches to avoid long transactions.
type OpsCleanupService struct {
opsRepo OpsRepository
db *sql.DB
redisClient *redis.Client
cfg *config.Config
instanceID string
cron *cron.Cron
startOnce sync.Once
stopOnce sync.Once
warnNoRedisOnce sync.Once
}
func NewOpsCleanupService(
opsRepo OpsRepository,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
) *OpsCleanupService {
return &OpsCleanupService{
opsRepo: opsRepo,
db: db,
redisClient: redisClient,
cfg: cfg,
instanceID: uuid.NewString(),
}
}
func (s *OpsCleanupService) Start() {
if s == nil {
return
}
if s.cfg != nil && !s.cfg.Ops.Enabled {
return
}
if s.cfg != nil && !s.cfg.Ops.Cleanup.Enabled {
log.Printf("[OpsCleanup] not started (disabled)")
return
}
if s.opsRepo == nil || s.db == nil {
log.Printf("[OpsCleanup] not started (missing deps)")
return
}
s.startOnce.Do(func() {
schedule := "0 2 * * *"
if s.cfg != nil && strings.TrimSpace(s.cfg.Ops.Cleanup.Schedule) != "" {
schedule = strings.TrimSpace(s.cfg.Ops.Cleanup.Schedule)
}
loc := time.Local
if s.cfg != nil && strings.TrimSpace(s.cfg.Timezone) != "" {
if parsed, err := time.LoadLocation(strings.TrimSpace(s.cfg.Timezone)); err == nil && parsed != nil {
loc = parsed
}
}
c := cron.New(cron.WithParser(opsCleanupCronParser), cron.WithLocation(loc))
_, err := c.AddFunc(schedule, func() { s.runScheduled() })
if err != nil {
log.Printf("[OpsCleanup] not started (invalid schedule=%q): %v", schedule, err)
return
}
s.cron = c
s.cron.Start()
log.Printf("[OpsCleanup] started (schedule=%q tz=%s)", schedule, loc.String())
})
}
func (s *OpsCleanupService) Stop() {
if s == nil {
return
}
s.stopOnce.Do(func() {
if s.cron != nil {
ctx := s.cron.Stop()
select {
case <-ctx.Done():
case <-time.After(3 * time.Second):
log.Printf("[OpsCleanup] cron stop timed out")
}
}
})
}
func (s *OpsCleanupService) runScheduled() {
if s == nil || s.db == nil || s.opsRepo == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
release, ok := s.tryAcquireLeaderLock(ctx)
if !ok {
return
}
if release != nil {
defer release()
}
startedAt := time.Now().UTC()
runAt := startedAt
counts, err := s.runCleanupOnce(ctx)
if err != nil {
s.recordHeartbeatError(runAt, time.Since(startedAt), err)
log.Printf("[OpsCleanup] cleanup failed: %v", err)
return
}
s.recordHeartbeatSuccess(runAt, time.Since(startedAt))
log.Printf("[OpsCleanup] cleanup complete: %s", counts)
}
type opsCleanupDeletedCounts struct {
errorLogs int64
retryAttempts int64
alertEvents int64
systemMetrics int64
hourlyPreagg int64
dailyPreagg int64
}
func (c opsCleanupDeletedCounts) String() string {
return fmt.Sprintf(
"error_logs=%d retry_attempts=%d alert_events=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d",
c.errorLogs,
c.retryAttempts,
c.alertEvents,
c.systemMetrics,
c.hourlyPreagg,
c.dailyPreagg,
)
}
func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDeletedCounts, error) {
out := opsCleanupDeletedCounts{}
if s == nil || s.db == nil || s.cfg == nil {
return out, nil
}
batchSize := 5000
now := time.Now().UTC()
// Error-like tables: error logs / retry attempts / alert events.
if days := s.cfg.Ops.Cleanup.ErrorLogRetentionDays; days > 0 {
cutoff := now.AddDate(0, 0, -days)
n, err := deleteOldRowsByID(ctx, s.db, "ops_error_logs", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.errorLogs = n
n, err = deleteOldRowsByID(ctx, s.db, "ops_retry_attempts", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.retryAttempts = n
n, err = deleteOldRowsByID(ctx, s.db, "ops_alert_events", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.alertEvents = n
}
// Minute-level metrics snapshots.
if days := s.cfg.Ops.Cleanup.MinuteMetricsRetentionDays; days > 0 {
cutoff := now.AddDate(0, 0, -days)
n, err := deleteOldRowsByID(ctx, s.db, "ops_system_metrics", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.systemMetrics = n
}
// Pre-aggregation tables (hourly/daily).
if days := s.cfg.Ops.Cleanup.HourlyMetricsRetentionDays; days > 0 {
cutoff := now.AddDate(0, 0, -days)
n, err := deleteOldRowsByID(ctx, s.db, "ops_metrics_hourly", "bucket_start", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.hourlyPreagg = n
n, err = deleteOldRowsByID(ctx, s.db, "ops_metrics_daily", "bucket_date", cutoff, batchSize, true)
if err != nil {
return out, err
}
out.dailyPreagg = n
}
return out, nil
}
func deleteOldRowsByID(
ctx context.Context,
db *sql.DB,
table string,
timeColumn string,
cutoff time.Time,
batchSize int,
castCutoffToDate bool,
) (int64, error) {
if db == nil {
return 0, nil
}
if batchSize <= 0 {
batchSize = 5000
}
where := fmt.Sprintf("%s < $1", timeColumn)
if castCutoffToDate {
where = fmt.Sprintf("%s < $1::date", timeColumn)
}
q := fmt.Sprintf(`
WITH batch AS (
SELECT id FROM %s
WHERE %s
ORDER BY id
LIMIT $2
)
DELETE FROM %s
WHERE id IN (SELECT id FROM batch)
`, table, where, table)
var total int64
for {
res, err := db.ExecContext(ctx, q, cutoff, batchSize)
if err != nil {
// If ops tables aren't present yet (partial deployments), treat as no-op.
if strings.Contains(strings.ToLower(err.Error()), "does not exist") && strings.Contains(strings.ToLower(err.Error()), "relation") {
return total, nil
}
return total, err
}
affected, err := res.RowsAffected()
if err != nil {
return total, err
}
total += affected
if affected == 0 {
break
}
}
return total, nil
}
func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
if s == nil {
return nil, false
}
// In simple run mode, assume single instance.
if s.cfg != nil && s.cfg.RunMode == config.RunModeSimple {
return nil, true
}
key := opsCleanupLeaderLockKeyDefault
ttl := opsCleanupLeaderLockTTLDefault
// Prefer Redis leader lock when available, but avoid stampeding the DB when Redis is flaky by
// falling back to a DB advisory lock.
if s.redisClient != nil {
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err == nil {
if !ok {
return nil, false
}
return func() {
_, _ = opsCleanupReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
}, true
}
// Redis error: fall back to DB advisory lock.
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsCleanup] leader lock SetNX failed; falling back to DB advisory lock: %v", err)
})
} else {
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsCleanup] redis not configured; using DB advisory lock")
})
}
release, ok := tryAcquireDBAdvisoryLock(ctx, s.db, hashAdvisoryLockID(key))
if !ok {
return nil, false
}
return release, true
}
func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) {
if s == nil || s.opsRepo == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsCleanupJobName,
LastRunAt: &runAt,
LastSuccessAt: &now,
LastDurationMs: &durMs,
})
}
func (s *OpsCleanupService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
if s == nil || s.opsRepo == nil || err == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
msg := truncateString(err.Error(), 2048)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsCleanupJobName,
LastRunAt: &runAt,
LastErrorAt: &now,
LastError: &msg,
LastDurationMs: &durMs,
})
}
package service
import (
"context"
"log"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
)
const (
opsAccountsPageSize = 100
opsConcurrencyBatchChunkSize = 200
)
func (s *OpsService) listAllAccountsForOps(ctx context.Context, platformFilter string) ([]Account, error) {
if s == nil || s.accountRepo == nil {
return []Account{}, nil
}
out := make([]Account, 0, 128)
page := 1
for {
accounts, pageInfo, err := s.accountRepo.ListWithFilters(ctx, pagination.PaginationParams{
Page: page,
PageSize: opsAccountsPageSize,
}, platformFilter, "", "", "")
if err != nil {
return nil, err
}
if len(accounts) == 0 {
break
}
out = append(out, accounts...)
if pageInfo != nil && int64(len(out)) >= pageInfo.Total {
break
}
if len(accounts) < opsAccountsPageSize {
break
}
page++
if page > 10_000 {
log.Printf("[Ops] listAllAccountsForOps: aborting after too many pages (platform=%q)", platformFilter)
break
}
}
return out, nil
}
func (s *OpsService) getAccountsLoadMapBestEffort(ctx context.Context, accounts []Account) map[int64]*AccountLoadInfo {
if s == nil || s.concurrencyService == nil {
return map[int64]*AccountLoadInfo{}
}
if len(accounts) == 0 {
return map[int64]*AccountLoadInfo{}
}
// De-duplicate IDs (and keep the max concurrency to avoid under-reporting).
unique := make(map[int64]int, len(accounts))
for _, acc := range accounts {
if acc.ID <= 0 {
continue
}
if prev, ok := unique[acc.ID]; !ok || acc.Concurrency > prev {
unique[acc.ID] = acc.Concurrency
}
}
batch := make([]AccountWithConcurrency, 0, len(unique))
for id, maxConc := range unique {
batch = append(batch, AccountWithConcurrency{
ID: id,
MaxConcurrency: maxConc,
})
}
out := make(map[int64]*AccountLoadInfo, len(batch))
for i := 0; i < len(batch); i += opsConcurrencyBatchChunkSize {
end := i + opsConcurrencyBatchChunkSize
if end > len(batch) {
end = len(batch)
}
part, err := s.concurrencyService.GetAccountsLoadBatch(ctx, batch[i:end])
if err != nil {
// Best-effort: return zeros rather than failing the ops UI.
log.Printf("[Ops] GetAccountsLoadBatch failed: %v", err)
continue
}
for k, v := range part {
out[k] = v
}
}
return out
}
// GetConcurrencyStats returns real-time concurrency usage aggregated by platform/group/account.
//
// Optional filters:
// - platformFilter: only include accounts in that platform (best-effort reduces DB load)
// - groupIDFilter: only include accounts that belong to that group
func (s *OpsService) GetConcurrencyStats(
ctx context.Context,
platformFilter string,
groupIDFilter *int64,
) (map[string]*PlatformConcurrencyInfo, map[int64]*GroupConcurrencyInfo, map[int64]*AccountConcurrencyInfo, *time.Time, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, nil, nil, nil, err
}
accounts, err := s.listAllAccountsForOps(ctx, platformFilter)
if err != nil {
return nil, nil, nil, nil, err
}
collectedAt := time.Now()
loadMap := s.getAccountsLoadMapBestEffort(ctx, accounts)
platform := make(map[string]*PlatformConcurrencyInfo)
group := make(map[int64]*GroupConcurrencyInfo)
account := make(map[int64]*AccountConcurrencyInfo)
for _, acc := range accounts {
if acc.ID <= 0 {
continue
}
var matchedGroup *Group
if groupIDFilter != nil && *groupIDFilter > 0 {
for _, grp := range acc.Groups {
if grp == nil || grp.ID <= 0 {
continue
}
if grp.ID == *groupIDFilter {
matchedGroup = grp
break
}
}
// Group filter provided: skip accounts not in that group.
if matchedGroup == nil {
continue
}
}
load := loadMap[acc.ID]
currentInUse := int64(0)
waiting := int64(0)
if load != nil {
currentInUse = int64(load.CurrentConcurrency)
waiting = int64(load.WaitingCount)
}
// Account-level view picks one display group (the first group).
displayGroupID := int64(0)
displayGroupName := ""
if matchedGroup != nil {
displayGroupID = matchedGroup.ID
displayGroupName = matchedGroup.Name
} else if len(acc.Groups) > 0 && acc.Groups[0] != nil {
displayGroupID = acc.Groups[0].ID
displayGroupName = acc.Groups[0].Name
}
if _, ok := account[acc.ID]; !ok {
info := &AccountConcurrencyInfo{
AccountID: acc.ID,
AccountName: acc.Name,
Platform: acc.Platform,
GroupID: displayGroupID,
GroupName: displayGroupName,
CurrentInUse: currentInUse,
MaxCapacity: int64(acc.Concurrency),
WaitingInQueue: waiting,
}
if info.MaxCapacity > 0 {
info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100
}
account[acc.ID] = info
}
// Platform aggregation.
if acc.Platform != "" {
if _, ok := platform[acc.Platform]; !ok {
platform[acc.Platform] = &PlatformConcurrencyInfo{
Platform: acc.Platform,
}
}
p := platform[acc.Platform]
p.MaxCapacity += int64(acc.Concurrency)
p.CurrentInUse += currentInUse
p.WaitingInQueue += waiting
}
// Group aggregation (one account may contribute to multiple groups).
if matchedGroup != nil {
grp := matchedGroup
if _, ok := group[grp.ID]; !ok {
group[grp.ID] = &GroupConcurrencyInfo{
GroupID: grp.ID,
GroupName: grp.Name,
Platform: grp.Platform,
}
}
g := group[grp.ID]
if g.GroupName == "" && grp.Name != "" {
g.GroupName = grp.Name
}
if g.Platform != "" && grp.Platform != "" && g.Platform != grp.Platform {
// Groups are expected to be platform-scoped. If mismatch is observed, avoid misleading labels.
g.Platform = ""
}
g.MaxCapacity += int64(acc.Concurrency)
g.CurrentInUse += currentInUse
g.WaitingInQueue += waiting
} else {
for _, grp := range acc.Groups {
if grp == nil || grp.ID <= 0 {
continue
}
if _, ok := group[grp.ID]; !ok {
group[grp.ID] = &GroupConcurrencyInfo{
GroupID: grp.ID,
GroupName: grp.Name,
Platform: grp.Platform,
}
}
g := group[grp.ID]
if g.GroupName == "" && grp.Name != "" {
g.GroupName = grp.Name
}
if g.Platform != "" && grp.Platform != "" && g.Platform != grp.Platform {
// Groups are expected to be platform-scoped. If mismatch is observed, avoid misleading labels.
g.Platform = ""
}
g.MaxCapacity += int64(acc.Concurrency)
g.CurrentInUse += currentInUse
g.WaitingInQueue += waiting
}
}
}
for _, info := range platform {
if info.MaxCapacity > 0 {
info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100
}
}
for _, info := range group {
if info.MaxCapacity > 0 {
info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100
}
}
return platform, group, account, &collectedAt, nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment