package service import ( "context" "encoding/json" "io" "log/slog" "net/http" "strings" "time" "github.com/Wei-Shaw/sub2api/internal/pkg/antigravity" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" ) const ( // creditsExhaustedKey 是 model_rate_limits 中标记积分耗尽的特殊 key。 // 与普通模型限流完全同构:通过 SetModelRateLimit / isRateLimitActiveForKey 读写。 creditsExhaustedKey = "AICredits" creditsExhaustedDuration = 5 * time.Hour // credits 降级响应重试参数 creditsRetryMaxAttempts = 3 creditsRetryBaseInterval = 500 * time.Millisecond ) // creditsRetryableErrorCodes 是降级响应中可重试的错误码集合。 // forbidden 是稳定的封号状态,不属于可恢复的瞬态错误,不重试。 var creditsRetryableErrorCodes = map[string]bool{ errorCodeUnauthenticated: true, errorCodeRateLimited: true, errorCodeNetworkError: true, } // isAntigravityDegradedResponse 检查 UsageInfo 是否为可重试的降级响应。 // 仅检测 3 个瞬态错误码(unauthenticated/rate_limited/network_error), // forbidden 是稳定的封号状态,不属于降级。 func isAntigravityDegradedResponse(info *UsageInfo) bool { if info == nil || info.ErrorCode == "" { return false } return creditsRetryableErrorCodes[info.ErrorCode] } // checkAccountCredits 通过共享的 AccountUsageService 缓存检查账号是否有足够的 AI Credits。 // 缓存 TTL 不足时会自动从 Google loadCodeAssist API 刷新。 // 检测到降级响应时会清除缓存并重试,最终 fail-open(返回 true)。 func (s *AntigravityGatewayService) checkAccountCredits( ctx context.Context, account *Account, ) bool { if account == nil || account.ID == 0 { return false } if s.accountUsageService == nil { return true // 无 usage service 时不阻断 } usageInfo, err := s.accountUsageService.GetAntigravityCredits(ctx, account) if err != nil { slog.Error("check_credits: get_credits_failed", "account_id", account.ID, "error", err) return true // 出错时 fail-open } // 非降级响应:直接检查积分余额 if !isAntigravityDegradedResponse(usageInfo) { return s.logCreditsResult(account, usageInfo) } // 降级响应:清除缓存后重试 return s.retryCreditsOnDegraded(ctx, account, usageInfo) } // retryCreditsOnDegraded 在检测到降级响应后,清除缓存并重试获取 credits。 // 使用指数退避(500ms → 1s → 2s),最多重试 creditsRetryMaxAttempts 次。 // 所有重试失败后 fail-open(返回 true),不做熔断。 func (s *AntigravityGatewayService) retryCreditsOnDegraded( ctx context.Context, account *Account, lastInfo *UsageInfo, ) bool { for attempt := 1; attempt <= creditsRetryMaxAttempts; attempt++ { delay := creditsRetryBaseInterval << (attempt - 1) // 指数退避:500ms, 1s, 2s slog.Warn("check_credits: degraded response, retrying", "account_id", account.ID, "attempt", attempt, "max_attempts", creditsRetryMaxAttempts, "error_code", lastInfo.ErrorCode, "delay", delay, ) select { case <-ctx.Done(): slog.Warn("check_credits: context cancelled during retry, fail-open", "account_id", account.ID, "attempt", attempt) return true case <-time.After(delay): } // 清除缓存,强制下次 GetAntigravityCredits 重新拉取 s.accountUsageService.InvalidateAntigravityCreditsCache(account.ID) info, err := s.accountUsageService.GetAntigravityCredits(ctx, account) if err != nil { slog.Error("check_credits: retry get_credits_failed", "account_id", account.ID, "attempt", attempt, "error", err) continue } // 重试成功(不再是降级响应):检查积分余额 if !isAntigravityDegradedResponse(info) { slog.Info("check_credits: retry succeeded", "account_id", account.ID, "attempt", attempt) return s.logCreditsResult(account, info) } lastInfo = info } // 所有重试失败:fail-open,不做熔断 slog.Warn("check_credits: all retries exhausted, fail-open", "account_id", account.ID, "last_error_code", lastInfo.ErrorCode, ) return true } // logCreditsResult 检查积分并记录不足日志,返回是否有积分。 func (s *AntigravityGatewayService) logCreditsResult(account *Account, info *UsageInfo) bool { hasCredits := hasEnoughCredits(info) if !hasCredits { slog.Warn("check_credits: insufficient credits", "account_id", account.ID) } return hasCredits } // hasEnoughCredits 检查 UsageInfo 中是否有足够的 GOOGLE_ONE_AI 积分。 // 返回 true 表示积分可用,false 表示积分不足或无积分信息。 func hasEnoughCredits(info *UsageInfo) bool { if info == nil || len(info.AICredits) == 0 { return false } for _, credit := range info.AICredits { if credit.CreditType == "GOOGLE_ONE_AI" { minimum := credit.MinimumBalance if minimum <= 0 { minimum = 5 } return credit.Amount >= minimum } } return false } type antigravity429Category string const ( antigravity429Unknown antigravity429Category = "unknown" antigravity429RateLimited antigravity429Category = "rate_limited" antigravity429QuotaExhausted antigravity429Category = "quota_exhausted" ) var ( antigravityQuotaExhaustedKeywords = []string{ "quota_exhausted", "quota exhausted", } creditsExhaustedKeywords = []string{ "google_one_ai", "insufficient credit", "insufficient credits", "not enough credit", "not enough credits", "credit exhausted", "credits exhausted", "credit balance", "minimumcreditamountforusage", "minimum credit amount for usage", "minimum credit", "resource has been exhausted", } ) // isCreditsExhausted 检查账号的 AICredits 限流 key 是否生效(积分是否耗尽)。 func (a *Account) isCreditsExhausted() bool { if a == nil { return false } return a.isRateLimitActiveForKey(creditsExhaustedKey) } // setCreditsExhausted 标记账号积分耗尽:写入 model_rate_limits["AICredits"] + 更新缓存。 func (s *AntigravityGatewayService) setCreditsExhausted(ctx context.Context, account *Account) { if account == nil || account.ID == 0 { return } resetAt := time.Now().Add(creditsExhaustedDuration) if err := s.accountRepo.SetModelRateLimit(ctx, account.ID, creditsExhaustedKey, resetAt); err != nil { logger.LegacyPrintf("service.antigravity_gateway", "set credits exhausted failed: account=%d err=%v", account.ID, err) return } s.updateAccountModelRateLimitInCache(ctx, account, creditsExhaustedKey, resetAt) logger.LegacyPrintf("service.antigravity_gateway", "credits_exhausted_marked account=%d reset_at=%s", account.ID, resetAt.UTC().Format(time.RFC3339)) } // clearCreditsExhausted 清除账号的 AICredits 限流 key。 func (s *AntigravityGatewayService) clearCreditsExhausted(ctx context.Context, account *Account) { if account == nil || account.ID == 0 || account.Extra == nil { return } rawLimits, ok := account.Extra[modelRateLimitsKey].(map[string]any) if !ok { return } if _, exists := rawLimits[creditsExhaustedKey]; !exists { return } delete(rawLimits, creditsExhaustedKey) account.Extra[modelRateLimitsKey] = rawLimits if err := s.accountRepo.UpdateExtra(ctx, account.ID, map[string]any{ modelRateLimitsKey: rawLimits, }); err != nil { logger.LegacyPrintf("service.antigravity_gateway", "clear credits exhausted failed: account=%d err=%v", account.ID, err) } // 同步更新 Redis 调度快照,避免其他节点/请求延迟感知 if s.schedulerSnapshot != nil { _ = s.schedulerSnapshot.UpdateAccountInCache(ctx, account) } } // classifyAntigravity429 将 Antigravity 的 429 响应归类为配额耗尽、限流或未知。 func classifyAntigravity429(body []byte) antigravity429Category { if len(body) == 0 { return antigravity429Unknown } lowerBody := strings.ToLower(string(body)) for _, keyword := range antigravityQuotaExhaustedKeywords { if strings.Contains(lowerBody, keyword) { return antigravity429QuotaExhausted } } if info := parseAntigravitySmartRetryInfo(body); info != nil && !info.IsModelCapacityExhausted { return antigravity429RateLimited } return antigravity429Unknown } // injectEnabledCreditTypes 在已序列化的 v1internal JSON body 中注入 AI Credits 类型。 func injectEnabledCreditTypes(body []byte) []byte { var payload map[string]any if err := json.Unmarshal(body, &payload); err != nil { return nil } payload["enabledCreditTypes"] = []string{"GOOGLE_ONE_AI"} result, err := json.Marshal(payload) if err != nil { return nil } return result } // resolveCreditsOveragesModelKey 解析当前请求对应的 overages 状态模型 key。 func resolveCreditsOveragesModelKey(ctx context.Context, account *Account, upstreamModelName, requestedModel string) string { modelKey := strings.TrimSpace(upstreamModelName) if modelKey != "" { return modelKey } if account == nil { return "" } modelKey = resolveFinalAntigravityModelKey(ctx, account, requestedModel) if strings.TrimSpace(modelKey) != "" { return modelKey } return resolveAntigravityModelKey(requestedModel) } // shouldMarkCreditsExhausted 判断一次 credits 请求失败是否应标记为 credits 耗尽。 // 注意:不再检查 isURLLevelRateLimit。此函数仅在积分重试失败后调用, // 如果注入 enabledCreditTypes 后仍返回 "Resource has been exhausted", // 说明积分也已耗尽,应该标记。clearCreditsExhausted 会在后续成功时自动清除。 func shouldMarkCreditsExhausted(resp *http.Response, respBody []byte, reqErr error) bool { if reqErr != nil || resp == nil { return false } if resp.StatusCode >= 500 || resp.StatusCode == http.StatusRequestTimeout { return false } if info := parseAntigravitySmartRetryInfo(respBody); info != nil { return false } bodyLower := strings.ToLower(string(respBody)) for _, keyword := range creditsExhaustedKeywords { if strings.Contains(bodyLower, keyword) { return true } } return false } type creditsOveragesRetryResult struct { handled bool resp *http.Response } // attemptCreditsOveragesRetry 在确认免费配额耗尽后,尝试注入 AI Credits 继续请求。 func (s *AntigravityGatewayService) attemptCreditsOveragesRetry( p antigravityRetryLoopParams, baseURL string, modelName string, waitDuration time.Duration, originalStatusCode int, respBody []byte, ) *creditsOveragesRetryResult { creditsBody := injectEnabledCreditTypes(p.body) if creditsBody == nil { return &creditsOveragesRetryResult{handled: false} } // Check actual credits balance before attempting retry if !s.checkAccountCredits(p.ctx, p.account) { s.setCreditsExhausted(p.ctx, p.account) modelKey := resolveCreditsOveragesModelKey(p.ctx, p.account, modelName, p.requestedModel) logger.LegacyPrintf("service.antigravity_gateway", "%s credit_overages_no_credits model=%s account=%d (skipping credits retry)", p.prefix, modelKey, p.account.ID) return &creditsOveragesRetryResult{handled: true} } modelKey := resolveCreditsOveragesModelKey(p.ctx, p.account, modelName, p.requestedModel) logger.LegacyPrintf("service.antigravity_gateway", "%s status=429 credit_overages_retry model=%s account=%d (injecting enabledCreditTypes)", p.prefix, modelKey, p.account.ID) creditsReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, creditsBody) if err != nil { logger.LegacyPrintf("service.antigravity_gateway", "%s credit_overages_failed model=%s account=%d build_request_err=%v", p.prefix, modelKey, p.account.ID, err) return &creditsOveragesRetryResult{handled: true} } creditsResp, err := p.httpUpstream.Do(creditsReq, p.proxyURL, p.account.ID, p.account.Concurrency) if err == nil && creditsResp != nil && creditsResp.StatusCode < 400 { s.clearCreditsExhausted(p.ctx, p.account) logger.LegacyPrintf("service.antigravity_gateway", "%s status=%d credit_overages_success model=%s account=%d", p.prefix, creditsResp.StatusCode, modelKey, p.account.ID) return &creditsOveragesRetryResult{handled: true, resp: creditsResp} } s.handleCreditsRetryFailure(p.ctx, p.prefix, modelKey, p.account, creditsResp, err) return &creditsOveragesRetryResult{handled: true} } func (s *AntigravityGatewayService) handleCreditsRetryFailure( ctx context.Context, prefix string, modelKey string, account *Account, creditsResp *http.Response, reqErr error, ) { var creditsRespBody []byte creditsStatusCode := 0 if creditsResp != nil { creditsStatusCode = creditsResp.StatusCode if creditsResp.Body != nil { creditsRespBody, _ = io.ReadAll(io.LimitReader(creditsResp.Body, 64<<10)) _ = creditsResp.Body.Close() } } if shouldMarkCreditsExhausted(creditsResp, creditsRespBody, reqErr) && account != nil { s.setCreditsExhausted(ctx, account) logger.LegacyPrintf("service.antigravity_gateway", "%s credit_overages_failed model=%s account=%d marked_exhausted=true status=%d body=%s", prefix, modelKey, account.ID, creditsStatusCode, truncateForLog(creditsRespBody, 200)) return } if account != nil { logger.LegacyPrintf("service.antigravity_gateway", "%s credit_overages_failed model=%s account=%d marked_exhausted=false status=%d err=%v body=%s", prefix, modelKey, account.ID, creditsStatusCode, reqErr, truncateForLog(creditsRespBody, 200)) } }