Unverified Commit 9a88df7f authored by Wesley Liddick's avatar Wesley Liddick Committed by GitHub
Browse files

Merge pull request #1167 from touwaeriol/pr/proxy-fast-fail

fix(antigravity): fast-fail on proxy unavailable, temp-unschedule account
parents a47f622e 528ff5d2
......@@ -136,7 +136,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
gatewayCache := repository.NewGatewayCache(redisClient)
schedulerOutboxRepository := repository.NewSchedulerOutboxRepository(db)
schedulerSnapshotService := service.ProvideSchedulerSnapshotService(schedulerCache, schedulerOutboxRepository, accountRepository, groupRepository, configConfig)
antigravityTokenProvider := service.ProvideAntigravityTokenProvider(accountRepository, geminiTokenCache, antigravityOAuthService, oauthRefreshAPI)
antigravityTokenProvider := service.ProvideAntigravityTokenProvider(accountRepository, geminiTokenCache, antigravityOAuthService, oauthRefreshAPI, tempUnschedCache)
antigravityGatewayService := service.NewAntigravityGatewayService(accountRepository, gatewayCache, schedulerSnapshotService, antigravityTokenProvider, rateLimitService, httpUpstream, settingService)
accountTestService := service.NewAccountTestService(accountRepository, geminiTokenProvider, antigravityGatewayService, httpUpstream, configConfig)
crsSyncService := service.NewCRSSyncService(accountRepository, proxyRepository, oAuthService, openAIOAuthService, geminiOAuthService, configConfig)
......
......@@ -228,9 +228,18 @@ type Client struct {
httpClient *http.Client
}
const (
// proxyDialTimeout 代理 TCP 连接超时(含代理握手),代理不通时快速失败
proxyDialTimeout = 5 * time.Second
// proxyTLSHandshakeTimeout 代理 TLS 握手超时
proxyTLSHandshakeTimeout = 5 * time.Second
// clientTimeout 整体请求超时(含连接、发送、等待响应、读取 body)
clientTimeout = 10 * time.Second
)
func NewClient(proxyURL string) (*Client, error) {
client := &http.Client{
Timeout: 30 * time.Second,
Timeout: clientTimeout,
}
_, parsed, err := proxyurl.Parse(proxyURL)
......@@ -238,7 +247,12 @@ func NewClient(proxyURL string) (*Client, error) {
return nil, err
}
if parsed != nil {
transport := &http.Transport{}
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: proxyDialTimeout,
}).DialContext,
TLSHandshakeTimeout: proxyTLSHandshakeTimeout,
}
if err := proxyutil.ConfigureTransportProxy(transport, parsed); err != nil {
return nil, fmt.Errorf("configure proxy: %w", err)
}
......@@ -250,8 +264,8 @@ func NewClient(proxyURL string) (*Client, error) {
}, nil
}
// isConnectionError 判断是否为连接错误(网络超时、DNS 失败、连接拒绝)
func isConnectionError(err error) bool {
// IsConnectionError 判断是否为连接错误(网络超时、DNS 失败、连接拒绝)
func IsConnectionError(err error) bool {
if err == nil {
return false
}
......@@ -276,7 +290,7 @@ func isConnectionError(err error) bool {
// shouldFallbackToNextURL 判断是否应切换到下一个 URL
// 与 Antigravity-Manager 保持一致:连接错误、429、408、404、5xx 触发 URL 降级
func shouldFallbackToNextURL(err error, statusCode int) bool {
if isConnectionError(err) {
if IsConnectionError(err) {
return true
}
return statusCode == http.StatusTooManyRequests ||
......
......@@ -274,8 +274,8 @@ func TestNewClient_无代理(t *testing.T) {
if client.httpClient == nil {
t.Fatal("httpClient 为 nil")
}
if client.httpClient.Timeout != 30*time.Second {
t.Errorf("Timeout 不匹配: got %v, want 30s", client.httpClient.Timeout)
if client.httpClient.Timeout != clientTimeout {
t.Errorf("Timeout 不匹配: got %v, want %v", client.httpClient.Timeout, clientTimeout)
}
// 无代理时 Transport 应为 nil(使用默认)
if client.httpClient.Transport != nil {
......@@ -322,11 +322,11 @@ func TestNewClient_无效代理URL(t *testing.T) {
}
// ---------------------------------------------------------------------------
// isConnectionError
// IsConnectionError
// ---------------------------------------------------------------------------
func TestIsConnectionError_nil(t *testing.T) {
if isConnectionError(nil) {
if IsConnectionError(nil) {
t.Error("nil 错误不应判定为连接错误")
}
}
......@@ -338,7 +338,7 @@ func TestIsConnectionError_超时错误(t *testing.T) {
Net: "tcp",
Err: &timeoutError{},
}
if !isConnectionError(err) {
if !IsConnectionError(err) {
t.Error("超时错误应判定为连接错误")
}
}
......@@ -356,7 +356,7 @@ func TestIsConnectionError_netOpError(t *testing.T) {
Net: "tcp",
Err: fmt.Errorf("connection refused"),
}
if !isConnectionError(err) {
if !IsConnectionError(err) {
t.Error("net.OpError 应判定为连接错误")
}
}
......@@ -367,14 +367,14 @@ func TestIsConnectionError_urlError(t *testing.T) {
URL: "https://example.com",
Err: fmt.Errorf("some error"),
}
if !isConnectionError(err) {
if !IsConnectionError(err) {
t.Error("url.Error 应判定为连接错误")
}
}
func TestIsConnectionError_普通错误(t *testing.T) {
err := fmt.Errorf("some random error")
if isConnectionError(err) {
if IsConnectionError(err) {
t.Error("普通错误不应判定为连接错误")
}
}
......@@ -386,7 +386,7 @@ func TestIsConnectionError_包装的netOpError(t *testing.T) {
Err: fmt.Errorf("connection refused"),
}
err := fmt.Errorf("wrapping: %w", inner)
if !isConnectionError(err) {
if !IsConnectionError(err) {
t.Error("被包装的 net.OpError 应判定为连接错误")
}
}
......
......@@ -17,6 +17,7 @@ package httpclient
import (
"fmt"
"net"
"net/http"
"strings"
"sync"
......@@ -32,6 +33,8 @@ const (
defaultMaxIdleConns = 100 // 最大空闲连接数
defaultMaxIdleConnsPerHost = 10 // 每个主机最大空闲连接数
defaultIdleConnTimeout = 90 * time.Second // 空闲连接超时时间(建议小于上游 LB 超时)
defaultDialTimeout = 5 * time.Second // TCP 连接超时(含代理握手),代理不通时快速失败
defaultTLSHandshakeTimeout = 5 * time.Second // TLS 握手超时
validatedHostTTL = 30 * time.Second // DNS Rebinding 校验缓存 TTL
)
......@@ -107,6 +110,10 @@ func buildTransport(opts Options) (*http.Transport, error) {
}
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: defaultDialTimeout,
}).DialContext,
TLSHandshakeTimeout: defaultTLSHandshakeTimeout,
MaxIdleConns: maxIdleConns,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
MaxConnsPerHost: opts.MaxConnsPerHost, // 0 表示无限制
......
......@@ -40,7 +40,7 @@ func NewProxyExitInfoProber(cfg *config.Config) service.ProxyExitInfoProber {
}
const (
defaultProxyProbeTimeout = 30 * time.Second
defaultProxyProbeTimeout = 10 * time.Second
defaultProxyProbeResponseMaxBytes = int64(1024 * 1024)
)
......
......@@ -1359,7 +1359,10 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
}
accessToken, err := s.tokenProvider.GetAccessToken(ctx, account)
if err != nil {
return nil, s.writeClaudeError(c, http.StatusBadGateway, "authentication_error", "Failed to get upstream access token")
return nil, &UpstreamFailoverError{
StatusCode: http.StatusBadGateway,
ResponseBody: []byte(`{"error":{"type":"authentication_error","message":"Failed to get upstream access token"},"type":"error"}`),
}
}
// 获取 project_id(部分账户类型可能没有)
......@@ -2101,7 +2104,10 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
}
accessToken, err := s.tokenProvider.GetAccessToken(ctx, account)
if err != nil {
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Failed to get upstream access token")
return nil, &UpstreamFailoverError{
StatusCode: http.StatusBadGateway,
ResponseBody: []byte(`{"error":{"message":"Failed to get upstream access token","status":"UNAVAILABLE"}}`),
}
}
// 获取 project_id(部分账户类型可能没有)
......
......@@ -192,6 +192,10 @@ func (s *AntigravityOAuthService) RefreshToken(ctx context.Context, refreshToken
if isNonRetryableAntigravityOAuthError(err) {
return nil, err
}
// 代理连接错误(TCP 超时、连接拒绝、DNS 失败)不重试,立即返回
if antigravity.IsConnectionError(err) {
return nil, fmt.Errorf("proxy unavailable: %w", err)
}
lastErr = err
}
......
......@@ -14,6 +14,10 @@ const (
antigravityTokenRefreshSkew = 3 * time.Minute
antigravityTokenCacheSkew = 5 * time.Minute
antigravityBackfillCooldown = 5 * time.Minute
// antigravityRequestRefreshTimeout 请求路径上 token 刷新的最大等待时间。
// 超过此时间直接放弃刷新、标记账号临时不可调度并触发 failover,
// 让后台 TokenRefreshService 在下个周期继续重试。
antigravityRequestRefreshTimeout = 8 * time.Second
)
// AntigravityTokenCache token cache interface.
......@@ -28,6 +32,7 @@ type AntigravityTokenProvider struct {
refreshAPI *OAuthRefreshAPI
executor OAuthRefreshExecutor
refreshPolicy ProviderRefreshPolicy
tempUnschedCache TempUnschedCache // 用于同步更新 Redis 临时不可调度缓存
}
func NewAntigravityTokenProvider(
......@@ -54,6 +59,11 @@ func (p *AntigravityTokenProvider) SetRefreshPolicy(policy ProviderRefreshPolicy
p.refreshPolicy = policy
}
// SetTempUnschedCache injects temp unschedulable cache for immediate scheduler sync.
func (p *AntigravityTokenProvider) SetTempUnschedCache(cache TempUnschedCache) {
p.tempUnschedCache = cache
}
// GetAccessToken returns a valid access_token.
func (p *AntigravityTokenProvider) GetAccessToken(ctx context.Context, account *Account) (string, error) {
if account == nil {
......@@ -88,8 +98,13 @@ func (p *AntigravityTokenProvider) GetAccessToken(ctx context.Context, account *
expiresAt := account.GetCredentialAsTime("expires_at")
needsRefresh := expiresAt == nil || time.Until(*expiresAt) <= antigravityTokenRefreshSkew
if needsRefresh && p.refreshAPI != nil && p.executor != nil {
result, err := p.refreshAPI.RefreshIfNeeded(ctx, account, p.executor, antigravityTokenRefreshSkew)
// 请求路径使用短超时,避免代理不通时阻塞过久(后台刷新服务会继续重试)
refreshCtx, cancel := context.WithTimeout(ctx, antigravityRequestRefreshTimeout)
defer cancel()
result, err := p.refreshAPI.RefreshIfNeeded(refreshCtx, account, p.executor, antigravityTokenRefreshSkew)
if err != nil {
// 标记账号临时不可调度,避免后续请求继续命中
p.markTempUnschedulable(account, err)
if p.refreshPolicy.OnRefreshError == ProviderRefreshErrorReturn {
return "", err
}
......@@ -172,6 +187,45 @@ func (p *AntigravityTokenProvider) shouldAttemptBackfill(accountID int64) bool {
return true
}
// markTempUnschedulable 在请求路径上 token 刷新失败时标记账号临时不可调度。
// 同时写 DB 和 Redis 缓存,确保调度器立即跳过该账号。
// 使用 background context 因为请求 context 可能已超时。
func (p *AntigravityTokenProvider) markTempUnschedulable(account *Account, refreshErr error) {
if p.accountRepo == nil || account == nil {
return
}
now := time.Now()
until := now.Add(tokenRefreshTempUnschedDuration)
reason := "token refresh failed on request path: " + refreshErr.Error()
bgCtx := context.Background()
if err := p.accountRepo.SetTempUnschedulable(bgCtx, account.ID, until, reason); err != nil {
slog.Warn("antigravity_token_provider.set_temp_unschedulable_failed",
"account_id", account.ID,
"error", err,
)
return
}
slog.Warn("antigravity_token_provider.temp_unschedulable_set",
"account_id", account.ID,
"until", until.Format(time.RFC3339),
"reason", reason,
)
// 同步写 Redis 缓存,调度器立即生效
if p.tempUnschedCache != nil {
state := &TempUnschedState{
UntilUnix: until.Unix(),
TriggeredAtUnix: now.Unix(),
ErrorMessage: reason,
}
if err := p.tempUnschedCache.SetTempUnsched(bgCtx, account.ID, state); err != nil {
slog.Warn("antigravity_token_provider.temp_unsched_cache_set_failed",
"account_id", account.ID,
"error", err,
)
}
}
}
func (p *AntigravityTokenProvider) markBackfillAttempted(accountID int64) {
p.backfillCooldown.Store(accountID, time.Now())
}
......
......@@ -12,6 +12,9 @@ import (
"github.com/Wei-Shaw/sub2api/internal/config"
)
// tokenRefreshTempUnschedDuration token 刷新重试耗尽后临时不可调度的持续时间
const tokenRefreshTempUnschedDuration = 10 * time.Minute
// TokenRefreshService OAuth token自动刷新服务
// 定期检查并刷新即将过期的token
type TokenRefreshService struct {
......@@ -317,7 +320,7 @@ func (s *TokenRefreshService) refreshWithRetry(ctx context.Context, account *Acc
}
}
// 可重试错误耗尽:仅记录日志,不标记 error(可能是临时网络问题,下个周期继续重试)
// 可重试错误耗尽:临时标记账号不可调度,避免请求路径反复命中已知失败的账号
slog.Warn("token_refresh.retry_exhausted",
"account_id", account.ID,
"platform", account.Platform,
......@@ -325,6 +328,21 @@ func (s *TokenRefreshService) refreshWithRetry(ctx context.Context, account *Acc
"error", lastErr,
)
// 设置临时不可调度 10 分钟(不标记 error,保持 status=active 让下个刷新周期能继续尝试)
until := time.Now().Add(tokenRefreshTempUnschedDuration)
reason := fmt.Sprintf("token refresh retry exhausted: %v", lastErr)
if setErr := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); setErr != nil {
slog.Warn("token_refresh.set_temp_unschedulable_failed",
"account_id", account.ID,
"error", setErr,
)
} else {
slog.Info("token_refresh.temp_unschedulable_set",
"account_id", account.ID,
"until", until.Format(time.RFC3339),
)
}
return lastErr
}
......
......@@ -114,11 +114,13 @@ func ProvideAntigravityTokenProvider(
tokenCache GeminiTokenCache,
antigravityOAuthService *AntigravityOAuthService,
refreshAPI *OAuthRefreshAPI,
tempUnschedCache TempUnschedCache,
) *AntigravityTokenProvider {
p := NewAntigravityTokenProvider(accountRepo, tokenCache, antigravityOAuthService)
executor := NewAntigravityTokenRefresher(antigravityOAuthService)
p.SetRefreshAPI(refreshAPI, executor)
p.SetRefreshPolicy(AntigravityProviderRefreshPolicy())
p.SetTempUnschedCache(tempUnschedCache)
return p
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment