Commit 3ee6f085 authored by erio's avatar erio
Browse files

refactor: extract internal500 penalty logic to dedicated file

Move constants, detection, and penalty functions from
antigravity_gateway_service.go to antigravity_internal500_penalty.go.
Fix gofmt alignment and replace hardcoded duration strings with
constant references.
parent 7cca69a1
...@@ -9,7 +9,7 @@ import ( ...@@ -9,7 +9,7 @@ import (
) )
const ( const (
internal500CounterPrefix = "internal500_count:account:" internal500CounterPrefix = "internal500_count:account:"
internal500CounterTTLSeconds = 86400 // 24 小时兜底 internal500CounterTTLSeconds = 86400 // 24 小时兜底
) )
......
...@@ -71,11 +71,6 @@ const ( ...@@ -71,11 +71,6 @@ const (
// MODEL_CAPACITY_EXHAUSTED 全局去重:重试全部失败后的 cooldown 时间 // MODEL_CAPACITY_EXHAUSTED 全局去重:重试全部失败后的 cooldown 时间
antigravityModelCapacityCooldown = 10 * time.Second antigravityModelCapacityCooldown = 10 * time.Second
// INTERNAL 500 渐进惩罚:连续多轮全部返回特定 500 错误时的惩罚时长
internal500PenaltyTier1Duration = 10 * time.Minute
internal500PenaltyTier2Duration = 10 * time.Hour
internal500PenaltyTier3Threshold = 3 // 第 3+ 轮:永久禁用
) )
// antigravityPassthroughErrorMessages 透传给客户端的错误消息白名单(小写) // antigravityPassthroughErrorMessages 透传给客户端的错误消息白名单(小写)
...@@ -781,16 +776,8 @@ urlFallbackLoop: ...@@ -781,16 +776,8 @@ urlFallbackLoop:
} }
// INTERNAL 500 渐进惩罚:3 次重试全部命中特定 500 时递增计数器并惩罚 // INTERNAL 500 渐进惩罚:3 次重试全部命中特定 500 时递增计数器并惩罚
if allAttemptsInternal500 && if allAttemptsInternal500 && isAntigravityInternalServerError(resp.StatusCode, respBody) {
isAntigravityInternalServerError(resp.StatusCode, respBody) && s.handleInternal500RetryExhausted(p.ctx, p.prefix, p.account)
s.internal500Cache != nil {
count, incrErr := s.internal500Cache.IncrementInternal500Count(p.ctx, p.account.ID)
if incrErr != nil {
slog.Error("internal500_counter_increment_failed",
"prefix", p.prefix, "account_id", p.account.ID, "error", incrErr)
} else {
s.applyInternal500Penalty(p.ctx, p.prefix, p.account, count)
}
} }
// 其他 4xx 错误或重试用尽,直接返回 // 其他 4xx 错误或重试用尽,直接返回
...@@ -812,11 +799,8 @@ urlFallbackLoop: ...@@ -812,11 +799,8 @@ urlFallbackLoop:
} }
// 成功响应时清零 INTERNAL 500 连续失败计数器(覆盖所有成功路径,含 smart retry) // 成功响应时清零 INTERNAL 500 连续失败计数器(覆盖所有成功路径,含 smart retry)
if resp != nil && resp.StatusCode < 400 && s.internal500Cache != nil { if resp != nil && resp.StatusCode < 400 {
if err := s.internal500Cache.ResetInternal500Count(p.ctx, p.account.ID); err != nil { s.resetInternal500Counter(p.ctx, p.prefix, p.account.ID)
slog.Error("internal500_counter_reset_failed",
"prefix", p.prefix, "account_id", p.account.ID, "error", err)
}
} }
return &antigravityRetryLoopResult{resp: resp}, nil return &antigravityRetryLoopResult{resp: resp}, nil
...@@ -832,56 +816,6 @@ func shouldRetryAntigravityError(statusCode int) bool { ...@@ -832,56 +816,6 @@ func shouldRetryAntigravityError(statusCode int) bool {
} }
} }
// isAntigravityInternalServerError 检测特定的 INTERNAL 500 错误
// 必须同时匹配 error.code==500, error.message=="Internal error encountered.", error.status=="INTERNAL"
func isAntigravityInternalServerError(statusCode int, body []byte) bool {
if statusCode != http.StatusInternalServerError {
return false
}
return gjson.GetBytes(body, "error.code").Int() == 500 &&
gjson.GetBytes(body, "error.message").String() == "Internal error encountered." &&
gjson.GetBytes(body, "error.status").String() == "INTERNAL"
}
// applyInternal500Penalty 根据连续 INTERNAL 500 轮次数应用渐进惩罚
// count=1: temp_unschedulable 10 分钟
// count=2: temp_unschedulable 10 小时
// count>=3: SetError 永久禁用
func (s *AntigravityGatewayService) applyInternal500Penalty(
ctx context.Context, prefix string, account *Account, count int64,
) {
switch {
case count >= int64(internal500PenaltyTier3Threshold):
reason := fmt.Sprintf("INTERNAL 500 consecutive failures: %d rounds", count)
if err := s.accountRepo.SetError(ctx, account.ID, reason); err != nil {
slog.Error("internal500_set_error_failed", "account_id", account.ID, "error", err)
return
}
slog.Warn("internal500_account_disabled",
"account_id", account.ID, "account_name", account.Name, "consecutive_count", count)
case count == 2:
until := time.Now().Add(internal500PenaltyTier2Duration)
reason := fmt.Sprintf("INTERNAL 500 x%d (temp unsched 10h)", count)
if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil {
slog.Error("internal500_temp_unsched_failed", "account_id", account.ID, "error", err)
return
}
slog.Warn("internal500_temp_unschedulable",
"account_id", account.ID, "account_name", account.Name,
"duration", internal500PenaltyTier2Duration, "consecutive_count", count)
case count == 1:
until := time.Now().Add(internal500PenaltyTier1Duration)
reason := fmt.Sprintf("INTERNAL 500 x%d (temp unsched 10m)", count)
if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil {
slog.Error("internal500_temp_unsched_failed", "account_id", account.ID, "error", err)
return
}
slog.Info("internal500_temp_unschedulable",
"account_id", account.ID, "account_name", account.Name,
"duration", internal500PenaltyTier1Duration, "consecutive_count", count)
}
}
// isURLLevelRateLimit 判断是否为 URL 级别的限流(应切换 URL 重试) // isURLLevelRateLimit 判断是否为 URL 级别的限流(应切换 URL 重试)
// "Resource has been exhausted" 是 URL/节点级别限流,切换 URL 可能成功 // "Resource has been exhausted" 是 URL/节点级别限流,切换 URL 可能成功
// "exhausted your capacity on this model" 是账户/模型配额限流,切换 URL 无效 // "exhausted your capacity on this model" 是账户/模型配额限流,切换 URL 无效
......
package service
import (
"context"
"fmt"
"log/slog"
"net/http"
"time"
"github.com/tidwall/gjson"
)
// INTERNAL 500 渐进惩罚:连续多轮全部返回特定 500 错误时的惩罚时长
const (
internal500PenaltyTier1Duration = 10 * time.Minute // 第 1 轮:临时不可调度 10 分钟
internal500PenaltyTier2Duration = 10 * time.Hour // 第 2 轮:临时不可调度 10 小时
internal500PenaltyTier3Threshold = 3 // 第 3+ 轮:永久禁用
)
// isAntigravityInternalServerError 检测特定的 INTERNAL 500 错误
// 必须同时匹配 error.code==500, error.message=="Internal error encountered.", error.status=="INTERNAL"
func isAntigravityInternalServerError(statusCode int, body []byte) bool {
if statusCode != http.StatusInternalServerError {
return false
}
return gjson.GetBytes(body, "error.code").Int() == 500 &&
gjson.GetBytes(body, "error.message").String() == "Internal error encountered." &&
gjson.GetBytes(body, "error.status").String() == "INTERNAL"
}
// applyInternal500Penalty 根据连续 INTERNAL 500 轮次数应用渐进惩罚
// count=1: temp_unschedulable 10 分钟
// count=2: temp_unschedulable 10 小时
// count>=3: SetError 永久禁用
func (s *AntigravityGatewayService) applyInternal500Penalty(
ctx context.Context, prefix string, account *Account, count int64,
) {
switch {
case count >= int64(internal500PenaltyTier3Threshold):
reason := fmt.Sprintf("INTERNAL 500 consecutive failures: %d rounds", count)
if err := s.accountRepo.SetError(ctx, account.ID, reason); err != nil {
slog.Error("internal500_set_error_failed", "account_id", account.ID, "error", err)
return
}
slog.Warn("internal500_account_disabled",
"account_id", account.ID, "account_name", account.Name, "consecutive_count", count)
case count == 2:
until := time.Now().Add(internal500PenaltyTier2Duration)
reason := fmt.Sprintf("INTERNAL 500 x%d (temp unsched %v)", count, internal500PenaltyTier2Duration)
if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil {
slog.Error("internal500_temp_unsched_failed", "account_id", account.ID, "error", err)
return
}
slog.Warn("internal500_temp_unschedulable",
"account_id", account.ID, "account_name", account.Name,
"duration", internal500PenaltyTier2Duration, "consecutive_count", count)
case count == 1:
until := time.Now().Add(internal500PenaltyTier1Duration)
reason := fmt.Sprintf("INTERNAL 500 x%d (temp unsched %v)", count, internal500PenaltyTier1Duration)
if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil {
slog.Error("internal500_temp_unsched_failed", "account_id", account.ID, "error", err)
return
}
slog.Info("internal500_temp_unschedulable",
"account_id", account.ID, "account_name", account.Name,
"duration", internal500PenaltyTier1Duration, "consecutive_count", count)
}
}
// handleInternal500RetryExhausted 处理 INTERNAL 500 重试耗尽:递增计数器并应用惩罚
func (s *AntigravityGatewayService) handleInternal500RetryExhausted(
ctx context.Context, prefix string, account *Account,
) {
if s.internal500Cache == nil {
return
}
count, err := s.internal500Cache.IncrementInternal500Count(ctx, account.ID)
if err != nil {
slog.Error("internal500_counter_increment_failed",
"prefix", prefix, "account_id", account.ID, "error", err)
return
}
s.applyInternal500Penalty(ctx, prefix, account, count)
}
// resetInternal500Counter 成功响应时清零 INTERNAL 500 计数器
func (s *AntigravityGatewayService) resetInternal500Counter(
ctx context.Context, prefix string, accountID int64,
) {
if s.internal500Cache == nil {
return
}
if err := s.internal500Cache.ResetInternal500Count(ctx, accountID); err != nil {
slog.Error("internal500_counter_reset_failed",
"prefix", prefix, "account_id", accountID, "error", err)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment