Commit 093a5a26 authored by erio
Browse files

feat(antigravity): progressive penalty for consecutive INTERNAL 500 errors

When an antigravity account returns 500 "Internal error encountered."
on all 3 retry attempts, increment a Redis counter and apply escalating
penalties:
- 1st round: temp unschedulable 10 minutes
- 2nd round: temp unschedulable 10 hours
- 3rd round: permanently mark as error

Counter resets on any successful response (< 400).
parent fdd8499f
...@@ -137,7 +137,8 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { ...@@ -137,7 +137,8 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
schedulerOutboxRepository := repository.NewSchedulerOutboxRepository(db) schedulerOutboxRepository := repository.NewSchedulerOutboxRepository(db)
schedulerSnapshotService := service.ProvideSchedulerSnapshotService(schedulerCache, schedulerOutboxRepository, accountRepository, groupRepository, configConfig) schedulerSnapshotService := service.ProvideSchedulerSnapshotService(schedulerCache, schedulerOutboxRepository, accountRepository, groupRepository, configConfig)
antigravityTokenProvider := service.ProvideAntigravityTokenProvider(accountRepository, geminiTokenCache, antigravityOAuthService, oauthRefreshAPI, tempUnschedCache) antigravityTokenProvider := service.ProvideAntigravityTokenProvider(accountRepository, geminiTokenCache, antigravityOAuthService, oauthRefreshAPI, tempUnschedCache)
antigravityGatewayService := service.NewAntigravityGatewayService(accountRepository, gatewayCache, schedulerSnapshotService, antigravityTokenProvider, rateLimitService, httpUpstream, settingService) internal500CounterCache := repository.NewInternal500CounterCache(redisClient)
antigravityGatewayService := service.NewAntigravityGatewayService(accountRepository, gatewayCache, schedulerSnapshotService, antigravityTokenProvider, rateLimitService, httpUpstream, settingService, internal500CounterCache)
tlsFingerprintProfileRepository := repository.NewTLSFingerprintProfileRepository(client) tlsFingerprintProfileRepository := repository.NewTLSFingerprintProfileRepository(client)
tlsFingerprintProfileCache := repository.NewTLSFingerprintProfileCache(redisClient) tlsFingerprintProfileCache := repository.NewTLSFingerprintProfileCache(redisClient)
tlsFingerprintProfileService := service.NewTLSFingerprintProfileService(tlsFingerprintProfileRepository, tlsFingerprintProfileCache) tlsFingerprintProfileService := service.NewTLSFingerprintProfileService(tlsFingerprintProfileRepository, tlsFingerprintProfileCache)
......
package repository
import (
"context"
"fmt"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/redis/go-redis/v9"
)
const (
	// internal500CounterPrefix is the Redis key prefix of the per-account counter;
	// the numeric account ID is appended to it to form the full key.
	internal500CounterPrefix = "internal500_count:account:"
	// internal500CounterTTLSeconds is the expiry, in seconds, handed to the
	// increment script as its TTL argument.
	internal500CounterTTLSeconds = 86400 // 24-hour fallback expiry
)
// internal500CounterIncrScript is a Lua script that atomically increments the
// counter stored at KEYS[1] and returns the resulting value.
// When the increment yields 1 (the key did not exist before), the script also
// sets the key's expiry to ARGV[1] seconds; later increments leave the expiry
// untouched.
var internal500CounterIncrScript = redis.NewScript(`
local key = KEYS[1]
local ttl = tonumber(ARGV[1])
local count = redis.call('INCR', key)
if count == 1 then
redis.call('EXPIRE', key, ttl)
end
return count
`)
// internal500CounterCache is the Redis-backed counter returned by
// NewInternal500CounterCache as a service.Internal500CounterCache.
type internal500CounterCache struct {
	rdb *redis.Client // Redis client every counter operation goes through
}
// NewInternal500CounterCache builds the Redis-backed counter of consecutive
// INTERNAL 500 failures and hands it back as a service.Internal500CounterCache.
func NewInternal500CounterCache(rdb *redis.Client) service.Internal500CounterCache {
	cache := new(internal500CounterCache)
	cache.rdb = rdb
	return cache
}
// IncrementInternal500Count atomically bumps the INTERNAL 500 counter of the
// given account and returns the updated value.
// If the script run fails, it returns 0 together with the wrapped error.
func (c *internal500CounterCache) IncrementInternal500Count(ctx context.Context, accountID int64) (int64, error) {
	redisKey := internal500CounterPrefix + fmt.Sprint(accountID)
	cmd := internal500CounterIncrScript.Run(ctx, c.rdb, []string{redisKey}, internal500CounterTTLSeconds)

	count, err := cmd.Int64()
	if err == nil {
		return count, nil
	}
	return 0, fmt.Errorf("increment internal500 count: %w", err)
}
// ResetInternal500Count clears the INTERNAL 500 counter of the given account by
// deleting its Redis key (called when a response succeeds).
// It returns whatever error the delete command reports.
func (c *internal500CounterCache) ResetInternal500Count(ctx context.Context, accountID int64) error {
	redisKey := internal500CounterPrefix + fmt.Sprint(accountID)
	delCmd := c.rdb.Del(ctx, redisKey)
	return delCmd.Err()
}
...@@ -81,6 +81,7 @@ var ProviderSet = wire.NewSet( ...@@ -81,6 +81,7 @@ var ProviderSet = wire.NewSet(
NewAPIKeyCache, NewAPIKeyCache,
NewTempUnschedCache, NewTempUnschedCache,
NewTimeoutCounterCache, NewTimeoutCounterCache,
NewInternal500CounterCache,
ProvideConcurrencyCache, ProvideConcurrencyCache,
ProvideSessionLimitCache, ProvideSessionLimitCache,
NewRPMCache, NewRPMCache,
......
...@@ -71,6 +71,11 @@ const ( ...@@ -71,6 +71,11 @@ const (
// MODEL_CAPACITY_EXHAUSTED 全局去重:重试全部失败后的 cooldown 时间 // MODEL_CAPACITY_EXHAUSTED 全局去重:重试全部失败后的 cooldown 时间
antigravityModelCapacityCooldown = 10 * time.Second antigravityModelCapacityCooldown = 10 * time.Second
// INTERNAL 500 渐进惩罚:连续多轮全部返回特定 500 错误时的惩罚时长
internal500PenaltyTier1Duration = 10 * time.Minute
internal500PenaltyTier2Duration = 10 * time.Hour
internal500PenaltyTier3Threshold = 3 // 第 3+ 轮:永久禁用
) )
// antigravityPassthroughErrorMessages 透传给客户端的错误消息白名单(小写) // antigravityPassthroughErrorMessages 透传给客户端的错误消息白名单(小写)
...@@ -614,6 +619,7 @@ func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopP ...@@ -614,6 +619,7 @@ func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopP
urlFallbackLoop: urlFallbackLoop:
for urlIdx, baseURL := range availableURLs { for urlIdx, baseURL := range availableURLs {
usedBaseURL = baseURL usedBaseURL = baseURL
allAttemptsInternal500 := true // 追踪本轮所有 attempt 是否全部命中 INTERNAL 500
for attempt := 1; attempt <= antigravityMaxRetries; attempt++ { for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
select { select {
case <-p.ctx.Done(): case <-p.ctx.Done():
...@@ -766,10 +772,27 @@ urlFallbackLoop: ...@@ -766,10 +772,27 @@ urlFallbackLoop:
logger.LegacyPrintf("service.antigravity_gateway", "%s status=context_canceled_during_backoff", p.prefix) logger.LegacyPrintf("service.antigravity_gateway", "%s status=context_canceled_during_backoff", p.prefix)
return nil, p.ctx.Err() return nil, p.ctx.Err()
} }
// 追踪 INTERNAL 500:非匹配的 attempt 清除标记
if !isAntigravityInternalServerError(resp.StatusCode, respBody) {
allAttemptsInternal500 = false
}
continue continue
} }
} }
// INTERNAL 500 渐进惩罚:3 次重试全部命中特定 500 时递增计数器并惩罚
if allAttemptsInternal500 &&
isAntigravityInternalServerError(resp.StatusCode, respBody) &&
s.internal500Cache != nil {
count, incrErr := s.internal500Cache.IncrementInternal500Count(p.ctx, p.account.ID)
if incrErr != nil {
slog.Error("internal500_counter_increment_failed",
"prefix", p.prefix, "account_id", p.account.ID, "error", incrErr)
} else {
s.applyInternal500Penalty(p.ctx, p.prefix, p.account, count)
}
}
// 其他 4xx 错误或重试用尽,直接返回 // 其他 4xx 错误或重试用尽,直接返回
resp = &http.Response{ resp = &http.Response{
StatusCode: resp.StatusCode, StatusCode: resp.StatusCode,
...@@ -779,7 +802,13 @@ urlFallbackLoop: ...@@ -779,7 +802,13 @@ urlFallbackLoop:
break urlFallbackLoop break urlFallbackLoop
} }
// 成功响应(< 400) // 成功响应(< 400):清零 INTERNAL 500 连续失败计数器
if s.internal500Cache != nil {
if err := s.internal500Cache.ResetInternal500Count(p.ctx, p.account.ID); err != nil {
slog.Error("internal500_counter_reset_failed",
"prefix", p.prefix, "account_id", p.account.ID, "error", err)
}
}
break urlFallbackLoop break urlFallbackLoop
} }
} }
...@@ -801,6 +830,56 @@ func shouldRetryAntigravityError(statusCode int) bool { ...@@ -801,6 +830,56 @@ func shouldRetryAntigravityError(statusCode int) bool {
} }
} }
// isAntigravityInternalServerError reports whether a response is the specific
// INTERNAL 500 error. The HTTP status must be 500 and the JSON body must carry
// all of: error.code == 500, error.message == "Internal error encountered."
// and error.status == "INTERNAL".
func isAntigravityInternalServerError(statusCode int, body []byte) bool {
	// Any other HTTP status can never be the targeted error; skip body parsing.
	if statusCode != http.StatusInternalServerError {
		return false
	}
	if gjson.GetBytes(body, "error.code").Int() != 500 {
		return false
	}
	if gjson.GetBytes(body, "error.message").String() != "Internal error encountered." {
		return false
	}
	return gjson.GetBytes(body, "error.status").String() == "INTERNAL"
}
// applyInternal500Penalty applies a progressive penalty to account according to
// count, the number of consecutive rounds that ended in the INTERNAL 500 error.
//
//   - count == 1: SetTempUnschedulable for internal500PenaltyTier1Duration
//   - count == 2: SetTempUnschedulable for internal500PenaltyTier2Duration
//   - count >= internal500PenaltyTier3Threshold: SetError (intended as a
//     permanent disable)
//
// Any other count matches no case and nothing happens. Repository failures are
// logged rather than returned. prefix is the caller's log prefix; it is attached
// to every log record so a penalty can be correlated with the request that
// triggered it.
func (s *AntigravityGatewayService) applyInternal500Penalty(
	ctx context.Context, prefix string, account *Account, count int64,
) {
	switch {
	case count >= int64(internal500PenaltyTier3Threshold):
		reason := fmt.Sprintf("INTERNAL 500 consecutive failures: %d rounds", count)
		if err := s.accountRepo.SetError(ctx, account.ID, reason); err != nil {
			slog.Error("internal500_set_error_failed",
				"prefix", prefix, "account_id", account.ID, "error", err)
			return
		}
		slog.Warn("internal500_account_disabled",
			"prefix", prefix, "account_id", account.ID, "account_name", account.Name,
			"consecutive_count", count)
	case count == 2:
		until := time.Now().Add(internal500PenaltyTier2Duration)
		reason := fmt.Sprintf("INTERNAL 500 x%d (temp unsched 10h)", count)
		if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil {
			slog.Error("internal500_temp_unsched_failed",
				"prefix", prefix, "account_id", account.ID, "error", err)
			return
		}
		slog.Warn("internal500_temp_unschedulable",
			"prefix", prefix, "account_id", account.ID, "account_name", account.Name,
			"duration", internal500PenaltyTier2Duration, "consecutive_count", count)
	case count == 1:
		until := time.Now().Add(internal500PenaltyTier1Duration)
		reason := fmt.Sprintf("INTERNAL 500 x%d (temp unsched 10m)", count)
		if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil {
			slog.Error("internal500_temp_unsched_failed",
				"prefix", prefix, "account_id", account.ID, "error", err)
			return
		}
		// The first tier is logged at Info; the harsher tiers are logged at Warn.
		slog.Info("internal500_temp_unschedulable",
			"prefix", prefix, "account_id", account.ID, "account_name", account.Name,
			"duration", internal500PenaltyTier1Duration, "consecutive_count", count)
	}
}
// isURLLevelRateLimit 判断是否为 URL 级别的限流(应切换 URL 重试) // isURLLevelRateLimit 判断是否为 URL 级别的限流(应切换 URL 重试)
// "Resource has been exhausted" 是 URL/节点级别限流,切换 URL 可能成功 // "Resource has been exhausted" 是 URL/节点级别限流,切换 URL 可能成功
// "exhausted your capacity on this model" 是账户/模型配额限流,切换 URL 无效 // "exhausted your capacity on this model" 是账户/模型配额限流,切换 URL 无效
...@@ -862,6 +941,7 @@ type AntigravityGatewayService struct { ...@@ -862,6 +941,7 @@ type AntigravityGatewayService struct {
settingService *SettingService settingService *SettingService
cache GatewayCache // 用于模型级限流时清除粘性会话绑定 cache GatewayCache // 用于模型级限流时清除粘性会话绑定
schedulerSnapshot *SchedulerSnapshotService schedulerSnapshot *SchedulerSnapshotService
internal500Cache Internal500CounterCache // INTERNAL 500 渐进惩罚计数器
} }
func NewAntigravityGatewayService( func NewAntigravityGatewayService(
...@@ -872,6 +952,7 @@ func NewAntigravityGatewayService( ...@@ -872,6 +952,7 @@ func NewAntigravityGatewayService(
rateLimitService *RateLimitService, rateLimitService *RateLimitService,
httpUpstream HTTPUpstream, httpUpstream HTTPUpstream,
settingService *SettingService, settingService *SettingService,
internal500Cache Internal500CounterCache,
) *AntigravityGatewayService { ) *AntigravityGatewayService {
return &AntigravityGatewayService{ return &AntigravityGatewayService{
accountRepo: accountRepo, accountRepo: accountRepo,
...@@ -881,6 +962,7 @@ func NewAntigravityGatewayService( ...@@ -881,6 +962,7 @@ func NewAntigravityGatewayService(
settingService: settingService, settingService: settingService,
cache: cache, cache: cache,
schedulerSnapshot: schedulerSnapshot, schedulerSnapshot: schedulerSnapshot,
internal500Cache: internal500Cache,
} }
} }
......
package service
import "context"
// Internal500CounterCache tracks, per Antigravity account, the number of
// consecutive rounds that failed with the INTERNAL 500 error.
type Internal500CounterCache interface {
	// IncrementInternal500Count atomically increments the counter for accountID
	// and returns its current value.
	IncrementInternal500Count(ctx context.Context, accountID int64) (int64, error)
	// ResetInternal500Count clears the counter for accountID (called on a
	// successful response).
	ResetInternal500Count(ctx context.Context, accountID int64) error
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment