Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
陈曦
sub2api
Commits
03c75787
Unverified
Commit
03c75787
authored
Jan 19, 2026
by
Wesley Liddick
Committed by
GitHub
Jan 19, 2026
Browse files
Merge pull request #325 from slovx2/main
fix(antigravity): 修复Antigravity 频繁429的问题,以及一系列优化,配置增强
parents
de6797c5
c115c9e0
Changes
28
Show whitespace changes
Inline
Side-by-side
backend/internal/config/config.go
View file @
03c75787
...
...
@@ -257,6 +257,14 @@ type GatewayConfig struct {
// 是否允许对部分 400 错误触发 failover(默认关闭以避免改变语义)
FailoverOn400
bool
`mapstructure:"failover_on_400"`
// 账户切换最大次数(遇到上游错误时切换到其他账户的次数上限)
MaxAccountSwitches
int
`mapstructure:"max_account_switches"`
// Gemini 账户切换最大次数(Gemini 平台单独配置,因 API 限制更严格)
MaxAccountSwitchesGemini
int
`mapstructure:"max_account_switches_gemini"`
// Antigravity 429 fallback 限流时间(分钟),解析重置时间失败时使用
AntigravityFallbackCooldownMinutes
int
`mapstructure:"antigravity_fallback_cooldown_minutes"`
// Scheduling: 账号调度相关配置
Scheduling
GatewaySchedulingConfig
`mapstructure:"scheduling"`
...
...
@@ -298,6 +306,9 @@ type GatewaySchedulingConfig struct {
FallbackWaitTimeout
time
.
Duration
`mapstructure:"fallback_wait_timeout"`
FallbackMaxWaiting
int
`mapstructure:"fallback_max_waiting"`
// 兜底层账户选择策略: "last_used"(按最后使用时间排序,默认) 或 "random"(随机)
FallbackSelectionMode
string
`mapstructure:"fallback_selection_mode"`
// 负载计算
LoadBatchEnabled
bool
`mapstructure:"load_batch_enabled"`
...
...
@@ -786,6 +797,9 @@ func setDefaults() {
viper
.
SetDefault
(
"gateway.log_upstream_error_body_max_bytes"
,
2048
)
viper
.
SetDefault
(
"gateway.inject_beta_for_apikey"
,
false
)
viper
.
SetDefault
(
"gateway.failover_on_400"
,
false
)
viper
.
SetDefault
(
"gateway.max_account_switches"
,
10
)
viper
.
SetDefault
(
"gateway.max_account_switches_gemini"
,
3
)
viper
.
SetDefault
(
"gateway.antigravity_fallback_cooldown_minutes"
,
1
)
viper
.
SetDefault
(
"gateway.max_body_size"
,
int64
(
100
*
1024
*
1024
))
viper
.
SetDefault
(
"gateway.connection_pool_isolation"
,
ConnectionPoolIsolationAccountProxy
)
// HTTP 上游连接池配置(针对 5000+ 并发用户优化)
...
...
@@ -798,11 +812,12 @@ func setDefaults() {
viper
.
SetDefault
(
"gateway.concurrency_slot_ttl_minutes"
,
30
)
// 并发槽位过期时间(支持超长请求)
viper
.
SetDefault
(
"gateway.stream_data_interval_timeout"
,
180
)
viper
.
SetDefault
(
"gateway.stream_keepalive_interval"
,
10
)
viper
.
SetDefault
(
"gateway.max_line_size"
,
1
0
*
1024
*
1024
)
viper
.
SetDefault
(
"gateway.max_line_size"
,
4
0
*
1024
*
1024
)
viper
.
SetDefault
(
"gateway.scheduling.sticky_session_max_waiting"
,
3
)
viper
.
SetDefault
(
"gateway.scheduling.sticky_session_wait_timeout"
,
120
*
time
.
Second
)
viper
.
SetDefault
(
"gateway.scheduling.fallback_wait_timeout"
,
30
*
time
.
Second
)
viper
.
SetDefault
(
"gateway.scheduling.fallback_max_waiting"
,
100
)
viper
.
SetDefault
(
"gateway.scheduling.fallback_selection_mode"
,
"last_used"
)
viper
.
SetDefault
(
"gateway.scheduling.load_batch_enabled"
,
true
)
viper
.
SetDefault
(
"gateway.scheduling.slot_cleanup_interval"
,
30
*
time
.
Second
)
viper
.
SetDefault
(
"gateway.scheduling.db_fallback_enabled"
,
true
)
...
...
backend/internal/handler/admin/account_handler.go
View file @
03c75787
...
...
@@ -541,6 +541,36 @@ func (h *AccountHandler) Refresh(c *gin.Context) {
newCredentials
[
k
]
=
v
}
}
// 如果 project_id 获取失败,先更新凭证,再标记账户为 error
if
tokenInfo
.
ProjectIDMissing
{
// 先更新凭证
_
,
updateErr
:=
h
.
adminService
.
UpdateAccount
(
c
.
Request
.
Context
(),
accountID
,
&
service
.
UpdateAccountInput
{
Credentials
:
newCredentials
,
})
if
updateErr
!=
nil
{
response
.
InternalError
(
c
,
"Failed to update credentials: "
+
updateErr
.
Error
())
return
}
// 标记账户为 error
if
setErr
:=
h
.
adminService
.
SetAccountError
(
c
.
Request
.
Context
(),
accountID
,
"missing_project_id: 账户缺少project id,可能无法使用Antigravity"
);
setErr
!=
nil
{
response
.
InternalError
(
c
,
"Failed to set account error: "
+
setErr
.
Error
())
return
}
response
.
Success
(
c
,
gin
.
H
{
"message"
:
"Token refreshed but project_id is missing, account marked as error"
,
"warning"
:
"missing_project_id"
,
})
return
}
// 成功获取到 project_id,如果之前是 missing_project_id 错误则清除
if
account
.
Status
==
service
.
StatusError
&&
strings
.
Contains
(
account
.
ErrorMessage
,
"missing_project_id:"
)
{
if
_
,
clearErr
:=
h
.
adminService
.
ClearAccountError
(
c
.
Request
.
Context
(),
accountID
);
clearErr
!=
nil
{
response
.
InternalError
(
c
,
"Failed to clear account error: "
+
clearErr
.
Error
())
return
}
}
}
else
{
// Use Anthropic/Claude OAuth service to refresh token
tokenInfo
,
err
:=
h
.
oauthService
.
RefreshAccountToken
(
c
.
Request
.
Context
(),
account
)
...
...
backend/internal/handler/gateway_handler.go
View file @
03c75787
...
...
@@ -31,6 +31,8 @@ type GatewayHandler struct {
userService
*
service
.
UserService
billingCacheService
*
service
.
BillingCacheService
concurrencyHelper
*
ConcurrencyHelper
maxAccountSwitches
int
maxAccountSwitchesGemini
int
}
// NewGatewayHandler creates a new GatewayHandler
...
...
@@ -44,8 +46,16 @@ func NewGatewayHandler(
cfg
*
config
.
Config
,
)
*
GatewayHandler
{
pingInterval
:=
time
.
Duration
(
0
)
maxAccountSwitches
:=
10
maxAccountSwitchesGemini
:=
3
if
cfg
!=
nil
{
pingInterval
=
time
.
Duration
(
cfg
.
Concurrency
.
PingInterval
)
*
time
.
Second
if
cfg
.
Gateway
.
MaxAccountSwitches
>
0
{
maxAccountSwitches
=
cfg
.
Gateway
.
MaxAccountSwitches
}
if
cfg
.
Gateway
.
MaxAccountSwitchesGemini
>
0
{
maxAccountSwitchesGemini
=
cfg
.
Gateway
.
MaxAccountSwitchesGemini
}
}
return
&
GatewayHandler
{
gatewayService
:
gatewayService
,
...
...
@@ -54,6 +64,8 @@ func NewGatewayHandler(
userService
:
userService
,
billingCacheService
:
billingCacheService
,
concurrencyHelper
:
NewConcurrencyHelper
(
concurrencyService
,
SSEPingFormatClaude
,
pingInterval
),
maxAccountSwitches
:
maxAccountSwitches
,
maxAccountSwitchesGemini
:
maxAccountSwitchesGemini
,
}
}
...
...
@@ -179,7 +191,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
}
if
platform
==
service
.
PlatformGemini
{
const
maxAccountSwitches
=
3
maxAccountSwitches
:=
h
.
maxAccountSwitches
Gemini
switchCount
:=
0
failedAccountIDs
:=
make
(
map
[
int64
]
struct
{})
lastFailoverStatus
:=
0
...
...
@@ -313,7 +325,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
}
}
const
maxAccountSwitches
=
10
maxAccountSwitches
:=
h
.
maxAccountSwitches
switchCount
:=
0
failedAccountIDs
:=
make
(
map
[
int64
]
struct
{})
lastFailoverStatus
:=
0
...
...
backend/internal/handler/gemini_v1beta_handler.go
View file @
03c75787
...
...
@@ -220,7 +220,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
if
sessionHash
!=
""
{
sessionKey
=
"gemini:"
+
sessionHash
}
const
maxAccountSwitches
=
3
maxAccountSwitches
:=
h
.
maxAccountSwitches
Gemini
switchCount
:=
0
failedAccountIDs
:=
make
(
map
[
int64
]
struct
{})
lastFailoverStatus
:=
0
...
...
backend/internal/handler/openai_gateway_handler.go
View file @
03c75787
...
...
@@ -25,6 +25,7 @@ type OpenAIGatewayHandler struct {
gatewayService
*
service
.
OpenAIGatewayService
billingCacheService
*
service
.
BillingCacheService
concurrencyHelper
*
ConcurrencyHelper
maxAccountSwitches
int
}
// NewOpenAIGatewayHandler creates a new OpenAIGatewayHandler
...
...
@@ -35,13 +36,18 @@ func NewOpenAIGatewayHandler(
cfg
*
config
.
Config
,
)
*
OpenAIGatewayHandler
{
pingInterval
:=
time
.
Duration
(
0
)
maxAccountSwitches
:=
3
if
cfg
!=
nil
{
pingInterval
=
time
.
Duration
(
cfg
.
Concurrency
.
PingInterval
)
*
time
.
Second
if
cfg
.
Gateway
.
MaxAccountSwitches
>
0
{
maxAccountSwitches
=
cfg
.
Gateway
.
MaxAccountSwitches
}
}
return
&
OpenAIGatewayHandler
{
gatewayService
:
gatewayService
,
billingCacheService
:
billingCacheService
,
concurrencyHelper
:
NewConcurrencyHelper
(
concurrencyService
,
SSEPingFormatComment
,
pingInterval
),
maxAccountSwitches
:
maxAccountSwitches
,
}
}
...
...
@@ -189,7 +195,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
// Generate session hash (header first; fallback to prompt_cache_key)
sessionHash
:=
h
.
gatewayService
.
GenerateSessionHash
(
c
,
reqBody
)
const
maxAccountSwitches
=
3
maxAccountSwitches
:=
h
.
maxAccountSwitches
switchCount
:=
0
failedAccountIDs
:=
make
(
map
[
int64
]
struct
{})
lastFailoverStatus
:=
0
...
...
backend/internal/pkg/antigravity/client.go
View file @
03c75787
...
...
@@ -16,15 +16,6 @@ import (
"time"
)
// resolveHost 从 URL 解析 host
func
resolveHost
(
urlStr
string
)
string
{
parsed
,
err
:=
url
.
Parse
(
urlStr
)
if
err
!=
nil
{
return
""
}
return
parsed
.
Host
}
// NewAPIRequestWithURL 使用指定的 base URL 创建 Antigravity API 请求(v1internal 端点)
func
NewAPIRequestWithURL
(
ctx
context
.
Context
,
baseURL
,
action
,
accessToken
string
,
body
[]
byte
)
(
*
http
.
Request
,
error
)
{
// 构建 URL,流式请求添加 ?alt=sse 参数
...
...
@@ -39,23 +30,11 @@ func NewAPIRequestWithURL(ctx context.Context, baseURL, action, accessToken stri
return
nil
,
err
}
// 基础 Headers
// 基础 Headers
(与 Antigravity-Manager 保持一致,只设置这 3 个)
req
.
Header
.
Set
(
"Content-Type"
,
"application/json"
)
req
.
Header
.
Set
(
"Authorization"
,
"Bearer "
+
accessToken
)
req
.
Header
.
Set
(
"User-Agent"
,
UserAgent
)
// Accept Header 根据请求类型设置
if
isStream
{
req
.
Header
.
Set
(
"Accept"
,
"text/event-stream"
)
}
else
{
req
.
Header
.
Set
(
"Accept"
,
"application/json"
)
}
// 显式设置 Host Header
if
host
:=
resolveHost
(
apiURL
);
host
!=
""
{
req
.
Host
=
host
}
return
req
,
nil
}
...
...
@@ -195,12 +174,15 @@ func isConnectionError(err error) bool {
}
// shouldFallbackToNextURL 判断是否应切换到下一个 URL
//
仅连接错误和 HTTP 429
触发 URL 降级
//
与 Antigravity-Manager 保持一致:连接错误、429、408、404、5xx
触发 URL 降级
func
shouldFallbackToNextURL
(
err
error
,
statusCode
int
)
bool
{
if
isConnectionError
(
err
)
{
return
true
}
return
statusCode
==
http
.
StatusTooManyRequests
return
statusCode
==
http
.
StatusTooManyRequests
||
statusCode
==
http
.
StatusRequestTimeout
||
statusCode
==
http
.
StatusNotFound
||
statusCode
>=
500
}
// ExchangeCode 用 authorization code 交换 token
...
...
@@ -321,11 +303,8 @@ func (c *Client) LoadCodeAssist(ctx context.Context, accessToken string) (*LoadC
return
nil
,
nil
,
fmt
.
Errorf
(
"序列化请求失败: %w"
,
err
)
}
// 获取可用的 URL 列表
availableURLs
:=
DefaultURLAvailability
.
GetAvailableURLs
()
if
len
(
availableURLs
)
==
0
{
availableURLs
=
BaseURLs
// 所有 URL 都不可用时,重试所有
}
// 固定顺序:prod -> daily
availableURLs
:=
BaseURLs
var
lastErr
error
for
urlIdx
,
baseURL
:=
range
availableURLs
{
...
...
@@ -343,7 +322,6 @@ func (c *Client) LoadCodeAssist(ctx context.Context, accessToken string) (*LoadC
if
err
!=
nil
{
lastErr
=
fmt
.
Errorf
(
"loadCodeAssist 请求失败: %w"
,
err
)
if
shouldFallbackToNextURL
(
err
,
0
)
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
DefaultURLAvailability
.
MarkUnavailable
(
baseURL
)
log
.
Printf
(
"[antigravity] loadCodeAssist URL fallback: %s -> %s"
,
baseURL
,
availableURLs
[
urlIdx
+
1
])
continue
}
...
...
@@ -358,7 +336,6 @@ func (c *Client) LoadCodeAssist(ctx context.Context, accessToken string) (*LoadC
// 检查是否需要 URL 降级
if
shouldFallbackToNextURL
(
nil
,
resp
.
StatusCode
)
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
DefaultURLAvailability
.
MarkUnavailable
(
baseURL
)
log
.
Printf
(
"[antigravity] loadCodeAssist URL fallback (HTTP %d): %s -> %s"
,
resp
.
StatusCode
,
baseURL
,
availableURLs
[
urlIdx
+
1
])
continue
}
...
...
@@ -376,6 +353,8 @@ func (c *Client) LoadCodeAssist(ctx context.Context, accessToken string) (*LoadC
var
rawResp
map
[
string
]
any
_
=
json
.
Unmarshal
(
respBodyBytes
,
&
rawResp
)
// 标记成功的 URL,下次优先使用
DefaultURLAvailability
.
MarkSuccess
(
baseURL
)
return
&
loadResp
,
rawResp
,
nil
}
...
...
@@ -412,11 +391,8 @@ func (c *Client) FetchAvailableModels(ctx context.Context, accessToken, projectI
return
nil
,
nil
,
fmt
.
Errorf
(
"序列化请求失败: %w"
,
err
)
}
// 获取可用的 URL 列表
availableURLs
:=
DefaultURLAvailability
.
GetAvailableURLs
()
if
len
(
availableURLs
)
==
0
{
availableURLs
=
BaseURLs
// 所有 URL 都不可用时,重试所有
}
// 固定顺序:prod -> daily
availableURLs
:=
BaseURLs
var
lastErr
error
for
urlIdx
,
baseURL
:=
range
availableURLs
{
...
...
@@ -434,7 +410,6 @@ func (c *Client) FetchAvailableModels(ctx context.Context, accessToken, projectI
if
err
!=
nil
{
lastErr
=
fmt
.
Errorf
(
"fetchAvailableModels 请求失败: %w"
,
err
)
if
shouldFallbackToNextURL
(
err
,
0
)
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
DefaultURLAvailability
.
MarkUnavailable
(
baseURL
)
log
.
Printf
(
"[antigravity] fetchAvailableModels URL fallback: %s -> %s"
,
baseURL
,
availableURLs
[
urlIdx
+
1
])
continue
}
...
...
@@ -449,7 +424,6 @@ func (c *Client) FetchAvailableModels(ctx context.Context, accessToken, projectI
// 检查是否需要 URL 降级
if
shouldFallbackToNextURL
(
nil
,
resp
.
StatusCode
)
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
DefaultURLAvailability
.
MarkUnavailable
(
baseURL
)
log
.
Printf
(
"[antigravity] fetchAvailableModels URL fallback (HTTP %d): %s -> %s"
,
resp
.
StatusCode
,
baseURL
,
availableURLs
[
urlIdx
+
1
])
continue
}
...
...
@@ -467,6 +441,8 @@ func (c *Client) FetchAvailableModels(ctx context.Context, accessToken, projectI
var
rawResp
map
[
string
]
any
_
=
json
.
Unmarshal
(
respBodyBytes
,
&
rawResp
)
// 标记成功的 URL,下次优先使用
DefaultURLAvailability
.
MarkSuccess
(
baseURL
)
return
&
modelsResp
,
rawResp
,
nil
}
...
...
backend/internal/pkg/antigravity/gemini_types.go
View file @
03c75787
...
...
@@ -146,6 +146,7 @@ type GeminiCandidate struct {
Content
*
GeminiContent
`json:"content,omitempty"`
FinishReason
string
`json:"finishReason,omitempty"`
Index
int
`json:"index,omitempty"`
GroundingMetadata
*
GeminiGroundingMetadata
`json:"groundingMetadata,omitempty"`
}
// GeminiUsageMetadata Gemini 用量元数据
...
...
@@ -156,6 +157,23 @@ type GeminiUsageMetadata struct {
TotalTokenCount
int
`json:"totalTokenCount,omitempty"`
}
// GeminiGroundingMetadata Gemini grounding 元数据(Web Search)
type
GeminiGroundingMetadata
struct
{
WebSearchQueries
[]
string
`json:"webSearchQueries,omitempty"`
GroundingChunks
[]
GeminiGroundingChunk
`json:"groundingChunks,omitempty"`
}
// GeminiGroundingChunk Gemini grounding chunk
type
GeminiGroundingChunk
struct
{
Web
*
GeminiGroundingWeb
`json:"web,omitempty"`
}
// GeminiGroundingWeb Gemini grounding web 信息
type
GeminiGroundingWeb
struct
{
Title
string
`json:"title,omitempty"`
URI
string
`json:"uri,omitempty"`
}
// DefaultSafetySettings 默认安全设置(关闭所有过滤)
var
DefaultSafetySettings
=
[]
GeminiSafetySetting
{
{
Category
:
"HARM_CATEGORY_HARASSMENT"
,
Threshold
:
"OFF"
},
...
...
backend/internal/pkg/antigravity/oauth.go
View file @
03c75787
...
...
@@ -32,8 +32,8 @@ const (
"https://www.googleapis.com/auth/cclog "
+
"https://www.googleapis.com/auth/experimentsandconfigs"
// User-Agent(
模拟官方客户端
)
UserAgent
=
"antigravity/1.1
04.0 darwin/arm
64"
// User-Agent(
与 Antigravity-Manager 保持一致
)
UserAgent
=
"antigravity/1.1
1.9 windows/amd
64"
// Session 过期时间
SessionTTL
=
30
*
time
.
Minute
...
...
@@ -42,22 +42,21 @@ const (
URLAvailabilityTTL
=
5
*
time
.
Minute
)
// BaseURLs 定义 Antigravity API 端点,按优先级排序
// fallback 顺序: sandbox → daily → prod
// BaseURLs 定义 Antigravity API 端点(与 Antigravity-Manager 保持一致)
var
BaseURLs
=
[]
string
{
"https://daily-cloudcode-pa.sandbox.googleapis.com"
,
// sandbox
"https://daily-cloudcode-pa.googleapis.com"
,
// daily
"https://cloudcode-pa.googleapis.com"
,
// prod
"https://cloudcode-pa.googleapis.com"
,
// prod (优先)
"https://daily-cloudcode-pa.sandbox.googleapis.com"
,
// daily sandbox (备用)
}
// BaseURL 默认 URL(保持向后兼容)
var
BaseURL
=
BaseURLs
[
0
]
// URLAvailability 管理 URL 可用性状态(带 TTL 自动恢复)
// URLAvailability 管理 URL 可用性状态(带 TTL 自动恢复
和动态优先级
)
type
URLAvailability
struct
{
mu
sync
.
RWMutex
unavailable
map
[
string
]
time
.
Time
// URL -> 恢复时间
ttl
time
.
Duration
lastSuccess
string
// 最近成功请求的 URL,优先使用
}
// DefaultURLAvailability 全局 URL 可用性管理器
...
...
@@ -78,6 +77,15 @@ func (u *URLAvailability) MarkUnavailable(url string) {
u
.
unavailable
[
url
]
=
time
.
Now
()
.
Add
(
u
.
ttl
)
}
// MarkSuccess 标记 URL 请求成功,将其设为优先使用
func
(
u
*
URLAvailability
)
MarkSuccess
(
url
string
)
{
u
.
mu
.
Lock
()
defer
u
.
mu
.
Unlock
()
u
.
lastSuccess
=
url
// 成功后清除该 URL 的不可用标记
delete
(
u
.
unavailable
,
url
)
}
// IsAvailable 检查 URL 是否可用
func
(
u
*
URLAvailability
)
IsAvailable
(
url
string
)
bool
{
u
.
mu
.
RLock
()
...
...
@@ -89,14 +97,29 @@ func (u *URLAvailability) IsAvailable(url string) bool {
return
time
.
Now
()
.
After
(
expiry
)
}
// GetAvailableURLs 返回可用的 URL 列表(保持优先级顺序)
// GetAvailableURLs 返回可用的 URL 列表
// 最近成功的 URL 优先,其他按默认顺序
func
(
u
*
URLAvailability
)
GetAvailableURLs
()
[]
string
{
u
.
mu
.
RLock
()
defer
u
.
mu
.
RUnlock
()
now
:=
time
.
Now
()
result
:=
make
([]
string
,
0
,
len
(
BaseURLs
))
// 如果有最近成功的 URL 且可用,放在最前面
if
u
.
lastSuccess
!=
""
{
expiry
,
exists
:=
u
.
unavailable
[
u
.
lastSuccess
]
if
!
exists
||
now
.
After
(
expiry
)
{
result
=
append
(
result
,
u
.
lastSuccess
)
}
}
// 添加其他可用的 URL(按默认顺序)
for
_
,
url
:=
range
BaseURLs
{
// 跳过已添加的 lastSuccess
if
url
==
u
.
lastSuccess
{
continue
}
expiry
,
exists
:=
u
.
unavailable
[
url
]
if
!
exists
||
now
.
After
(
expiry
)
{
result
=
append
(
result
,
url
)
...
...
@@ -240,24 +263,3 @@ func BuildAuthorizationURL(state, codeChallenge string) string {
return
fmt
.
Sprintf
(
"%s?%s"
,
AuthorizeURL
,
params
.
Encode
())
}
// GenerateMockProjectID 生成随机 project_id(当 API 不返回时使用)
// 格式:{形容词}-{名词}-{5位随机字符}
func
GenerateMockProjectID
()
string
{
adjectives
:=
[]
string
{
"useful"
,
"bright"
,
"swift"
,
"calm"
,
"bold"
}
nouns
:=
[]
string
{
"fuze"
,
"wave"
,
"spark"
,
"flow"
,
"core"
}
randBytes
,
_
:=
GenerateRandomBytes
(
7
)
adj
:=
adjectives
[
int
(
randBytes
[
0
])
%
len
(
adjectives
)]
noun
:=
nouns
[
int
(
randBytes
[
1
])
%
len
(
nouns
)]
// 生成 5 位随机字符(a-z0-9)
const
charset
=
"abcdefghijklmnopqrstuvwxyz0123456789"
suffix
:=
make
([]
byte
,
5
)
for
i
:=
0
;
i
<
5
;
i
++
{
suffix
[
i
]
=
charset
[
int
(
randBytes
[
i
+
2
])
%
len
(
charset
)]
}
return
fmt
.
Sprintf
(
"%s-%s-%s"
,
adj
,
noun
,
string
(
suffix
))
}
backend/internal/pkg/antigravity/request_transformer.go
View file @
03c75787
...
...
@@ -54,6 +54,9 @@ func DefaultTransformOptions() TransformOptions {
}
}
// webSearchFallbackModel web_search 请求使用的降级模型
const
webSearchFallbackModel
=
"gemini-2.5-flash"
// TransformClaudeToGemini 将 Claude 请求转换为 v1internal Gemini 格式
func
TransformClaudeToGemini
(
claudeReq
*
ClaudeRequest
,
projectID
,
mappedModel
string
)
([]
byte
,
error
)
{
return
TransformClaudeToGeminiWithOptions
(
claudeReq
,
projectID
,
mappedModel
,
DefaultTransformOptions
())
...
...
@@ -64,12 +67,23 @@ func TransformClaudeToGeminiWithOptions(claudeReq *ClaudeRequest, projectID, map
// 用于存储 tool_use id -> name 映射
toolIDToName
:=
make
(
map
[
string
]
string
)
// 检测是否有 web_search 工具
hasWebSearchTool
:=
hasWebSearchTool
(
claudeReq
.
Tools
)
requestType
:=
"agent"
targetModel
:=
mappedModel
if
hasWebSearchTool
{
requestType
=
"web_search"
if
targetModel
!=
webSearchFallbackModel
{
targetModel
=
webSearchFallbackModel
}
}
// 检测是否启用 thinking
isThinkingEnabled
:=
claudeReq
.
Thinking
!=
nil
&&
claudeReq
.
Thinking
.
Type
==
"enabled"
// 只有 Gemini 模型支持 dummy thought workaround
// Claude 模型通过 Vertex/Google API 需要有效的 thought signatures
allowDummyThought
:=
strings
.
HasPrefix
(
mapped
Model
,
"gemini-"
)
allowDummyThought
:=
strings
.
HasPrefix
(
target
Model
,
"gemini-"
)
// 1. 构建 contents
contents
,
strippedThinking
,
err
:=
buildContents
(
claudeReq
.
Messages
,
toolIDToName
,
isThinkingEnabled
,
allowDummyThought
)
...
...
@@ -78,7 +92,7 @@ func TransformClaudeToGeminiWithOptions(claudeReq *ClaudeRequest, projectID, map
}
// 2. 构建 systemInstruction
systemInstruction
:=
buildSystemInstruction
(
claudeReq
.
System
,
claudeReq
.
Model
,
opts
)
systemInstruction
:=
buildSystemInstruction
(
claudeReq
.
System
,
claudeReq
.
Model
,
opts
,
claudeReq
.
Tools
)
// 3. 构建 generationConfig
reqForConfig
:=
claudeReq
...
...
@@ -89,6 +103,11 @@ func TransformClaudeToGeminiWithOptions(claudeReq *ClaudeRequest, projectID, map
reqCopy
.
Thinking
=
nil
reqForConfig
=
&
reqCopy
}
if
targetModel
!=
""
&&
targetModel
!=
reqForConfig
.
Model
{
reqCopy
:=
*
reqForConfig
reqCopy
.
Model
=
targetModel
reqForConfig
=
&
reqCopy
}
generationConfig
:=
buildGenerationConfig
(
reqForConfig
)
// 4. 构建 tools
...
...
@@ -127,8 +146,8 @@ func TransformClaudeToGeminiWithOptions(claudeReq *ClaudeRequest, projectID, map
Project
:
projectID
,
RequestID
:
"agent-"
+
uuid
.
New
()
.
String
(),
UserAgent
:
"antigravity"
,
// 固定值,与官方客户端一致
RequestType
:
"agent"
,
Model
:
mapped
Model
,
RequestType
:
requestType
,
Model
:
target
Model
,
Request
:
innerRequest
,
}
...
...
@@ -154,8 +173,40 @@ func GetDefaultIdentityPatch() string {
return
antigravityIdentity
}
// buildSystemInstruction 构建 systemInstruction
func
buildSystemInstruction
(
system
json
.
RawMessage
,
modelName
string
,
opts
TransformOptions
)
*
GeminiContent
{
// mcpXMLProtocol MCP XML 工具调用协议(与 Antigravity-Manager 保持一致)
const
mcpXMLProtocol
=
`
==== MCP XML 工具调用协议 (Workaround) ====
当你需要调用名称以 `
+
"`mcp__`"
+
` 开头的 MCP 工具时:
1) 优先尝试 XML 格式调用:输出 `
+
"`<mcp__tool_name>{
\"
arg
\"
:
\"
value
\"
}</mcp__tool_name>`"
+
`。
2) 必须直接输出 XML 块,无需 markdown 包装,内容为 JSON 格式的入参。
3) 这种方式具有更高的连通性和容错性,适用于大型结果返回场景。
===========================================`
// hasMCPTools 检测是否有 mcp__ 前缀的工具
func
hasMCPTools
(
tools
[]
ClaudeTool
)
bool
{
for
_
,
tool
:=
range
tools
{
if
strings
.
HasPrefix
(
tool
.
Name
,
"mcp__"
)
{
return
true
}
}
return
false
}
// filterOpenCodePrompt 过滤 OpenCode 默认提示词,只保留用户自定义指令
func
filterOpenCodePrompt
(
text
string
)
string
{
if
!
strings
.
Contains
(
text
,
"You are an interactive CLI tool"
)
{
return
text
}
// 提取 "Instructions from:" 及之后的部分
if
idx
:=
strings
.
Index
(
text
,
"Instructions from:"
);
idx
>=
0
{
return
text
[
idx
:
]
}
// 如果没有自定义指令,返回空
return
""
}
// buildSystemInstruction 构建 systemInstruction(与 Antigravity-Manager 保持一致)
func
buildSystemInstruction
(
system
json
.
RawMessage
,
modelName
string
,
opts
TransformOptions
,
tools
[]
ClaudeTool
)
*
GeminiContent
{
var
parts
[]
GeminiPart
// 先解析用户的 system prompt,检测是否已包含 Antigravity identity
...
...
@@ -167,10 +218,14 @@ func buildSystemInstruction(system json.RawMessage, modelName string, opts Trans
var
sysStr
string
if
err
:=
json
.
Unmarshal
(
system
,
&
sysStr
);
err
==
nil
{
if
strings
.
TrimSpace
(
sysStr
)
!=
""
{
userSystemParts
=
append
(
userSystemParts
,
GeminiPart
{
Text
:
sysStr
})
if
strings
.
Contains
(
sysStr
,
"You are Antigravity"
)
{
userHasAntigravityIdentity
=
true
}
// 过滤 OpenCode 默认提示词
filtered
:=
filterOpenCodePrompt
(
sysStr
)
if
filtered
!=
""
{
userSystemParts
=
append
(
userSystemParts
,
GeminiPart
{
Text
:
filtered
})
}
}
}
else
{
// 尝试解析为数组
...
...
@@ -178,10 +233,14 @@ func buildSystemInstruction(system json.RawMessage, modelName string, opts Trans
if
err
:=
json
.
Unmarshal
(
system
,
&
sysBlocks
);
err
==
nil
{
for
_
,
block
:=
range
sysBlocks
{
if
block
.
Type
==
"text"
&&
strings
.
TrimSpace
(
block
.
Text
)
!=
""
{
userSystemParts
=
append
(
userSystemParts
,
GeminiPart
{
Text
:
block
.
Text
})
if
strings
.
Contains
(
block
.
Text
,
"You are Antigravity"
)
{
userHasAntigravityIdentity
=
true
}
// 过滤 OpenCode 默认提示词
filtered
:=
filterOpenCodePrompt
(
block
.
Text
)
if
filtered
!=
""
{
userSystemParts
=
append
(
userSystemParts
,
GeminiPart
{
Text
:
filtered
})
}
}
}
}
...
...
@@ -200,6 +259,16 @@ func buildSystemInstruction(system json.RawMessage, modelName string, opts Trans
// 添加用户的 system prompt
parts
=
append
(
parts
,
userSystemParts
...
)
// 检测是否有 MCP 工具,如有则注入 XML 调用协议
if
hasMCPTools
(
tools
)
{
parts
=
append
(
parts
,
GeminiPart
{
Text
:
mcpXMLProtocol
})
}
// 如果用户没有提供 Antigravity 身份,添加结束标记
if
!
userHasAntigravityIdentity
{
parts
=
append
(
parts
,
GeminiPart
{
Text
:
"
\n
--- [SYSTEM_PROMPT_END] ---"
})
}
if
len
(
parts
)
==
0
{
return
nil
}
...
...
@@ -429,6 +498,11 @@ func buildGenerationConfig(req *ClaudeRequest) *GeminiGenerationConfig {
StopSequences
:
DefaultStopSequences
,
}
// 如果请求中指定了 MaxTokens,使用请求值
if
req
.
MaxTokens
>
0
{
config
.
MaxOutputTokens
=
req
.
MaxTokens
}
// Thinking 配置
if
req
.
Thinking
!=
nil
&&
req
.
Thinking
.
Type
==
"enabled"
{
config
.
ThinkingConfig
=
&
GeminiThinkingConfig
{
...
...
@@ -458,37 +532,43 @@ func buildGenerationConfig(req *ClaudeRequest) *GeminiGenerationConfig {
return
config
}
// buildTools 构建 tools
func
buildTools
(
tools
[]
ClaudeTool
)
[]
GeminiToolDeclaration
{
if
len
(
tools
)
==
0
{
return
nil
func
hasWebSearchTool
(
tools
[]
ClaudeTool
)
bool
{
for
_
,
tool
:=
range
tools
{
if
isWebSearchTool
(
tool
)
{
return
true
}
}
return
false
}
// 检查是否有 web_search 工具
hasWebSearch
:=
false
for
_
,
tool
:=
range
tools
{
if
tool
.
Name
==
"web_search"
{
hasWebSearch
=
true
break
func
isWebSearchTool
(
tool
ClaudeTool
)
bool
{
if
strings
.
HasPrefix
(
tool
.
Type
,
"web_search"
)
||
tool
.
Type
==
"google_search"
{
return
true
}
name
:=
strings
.
TrimSpace
(
tool
.
Name
)
switch
name
{
case
"web_search"
,
"google_search"
,
"web_search_20250305"
:
return
true
default
:
return
false
}
}
if
hasWebSearch
{
// Web Search 工具映射
return
[]
GeminiToolDeclaration
{{
GoogleSearch
:
&
GeminiGoogleSearch
{
EnhancedContent
:
&
GeminiEnhancedContent
{
ImageSearch
:
&
GeminiImageSearch
{
MaxResultCount
:
5
,
},
},
},
}}
// buildTools 构建 tools
func
buildTools
(
tools
[]
ClaudeTool
)
[]
GeminiToolDeclaration
{
if
len
(
tools
)
==
0
{
return
nil
}
hasWebSearch
:=
hasWebSearchTool
(
tools
)
// 普通工具
var
funcDecls
[]
GeminiFunctionDecl
for
_
,
tool
:=
range
tools
{
if
isWebSearchTool
(
tool
)
{
continue
}
// 跳过无效工具名称
if
strings
.
TrimSpace
(
tool
.
Name
)
==
""
{
log
.
Printf
(
"Warning: skipping tool with empty name"
)
...
...
@@ -531,9 +611,22 @@ func buildTools(tools []ClaudeTool) []GeminiToolDeclaration {
}
if
len
(
funcDecls
)
==
0
{
if
!
hasWebSearch
{
return
nil
}
// Web Search 工具映射
return
[]
GeminiToolDeclaration
{{
GoogleSearch
:
&
GeminiGoogleSearch
{
EnhancedContent
:
&
GeminiEnhancedContent
{
ImageSearch
:
&
GeminiImageSearch
{
MaxResultCount
:
5
,
},
},
},
}}
}
return
[]
GeminiToolDeclaration
{{
FunctionDeclarations
:
funcDecls
,
}}
...
...
backend/internal/pkg/antigravity/response_transformer.go
View file @
03c75787
...
...
@@ -3,6 +3,7 @@ package antigravity
import
(
"encoding/json"
"fmt"
"strings"
)
// TransformGeminiToClaude 将 Gemini 响应转换为 Claude 格式(非流式)
...
...
@@ -63,6 +64,12 @@ func (p *NonStreamingProcessor) Process(geminiResp *GeminiResponse, responseID,
p
.
processPart
(
&
part
)
}
if
len
(
geminiResp
.
Candidates
)
>
0
{
if
grounding
:=
geminiResp
.
Candidates
[
0
]
.
GroundingMetadata
;
grounding
!=
nil
{
p
.
processGrounding
(
grounding
)
}
}
// 刷新剩余内容
p
.
flushThinking
()
p
.
flushText
()
...
...
@@ -190,6 +197,18 @@ func (p *NonStreamingProcessor) processPart(part *GeminiPart) {
}
}
func
(
p
*
NonStreamingProcessor
)
processGrounding
(
grounding
*
GeminiGroundingMetadata
)
{
groundingText
:=
buildGroundingText
(
grounding
)
if
groundingText
==
""
{
return
}
p
.
flushThinking
()
p
.
flushText
()
p
.
textBuilder
+=
groundingText
p
.
flushText
()
}
// flushText 刷新 text builder
func
(
p
*
NonStreamingProcessor
)
flushText
()
{
if
p
.
textBuilder
==
""
{
...
...
@@ -262,6 +281,44 @@ func (p *NonStreamingProcessor) buildResponse(geminiResp *GeminiResponse, respon
}
}
func
buildGroundingText
(
grounding
*
GeminiGroundingMetadata
)
string
{
if
grounding
==
nil
{
return
""
}
var
builder
strings
.
Builder
if
len
(
grounding
.
WebSearchQueries
)
>
0
{
_
,
_
=
builder
.
WriteString
(
"
\n\n
---
\n
Web search queries: "
)
_
,
_
=
builder
.
WriteString
(
strings
.
Join
(
grounding
.
WebSearchQueries
,
", "
))
}
if
len
(
grounding
.
GroundingChunks
)
>
0
{
var
links
[]
string
for
i
,
chunk
:=
range
grounding
.
GroundingChunks
{
if
chunk
.
Web
==
nil
{
continue
}
title
:=
strings
.
TrimSpace
(
chunk
.
Web
.
Title
)
if
title
==
""
{
title
=
"Source"
}
uri
:=
strings
.
TrimSpace
(
chunk
.
Web
.
URI
)
if
uri
==
""
{
uri
=
"#"
}
links
=
append
(
links
,
fmt
.
Sprintf
(
"[%d] [%s](%s)"
,
i
+
1
,
title
,
uri
))
}
if
len
(
links
)
>
0
{
_
,
_
=
builder
.
WriteString
(
"
\n\n
Sources:
\n
"
)
_
,
_
=
builder
.
WriteString
(
strings
.
Join
(
links
,
"
\n
"
))
}
}
return
builder
.
String
()
}
// generateRandomID 生成随机 ID
func
generateRandomID
()
string
{
const
chars
=
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
...
...
backend/internal/pkg/antigravity/stream_transformer.go
View file @
03c75787
...
...
@@ -27,6 +27,8 @@ type StreamingProcessor struct {
pendingSignature
string
trailingSignature
string
originalModel
string
webSearchQueries
[]
string
groundingChunks
[]
GeminiGroundingChunk
// 累计 usage
inputTokens
int
...
...
@@ -93,6 +95,10 @@ func (p *StreamingProcessor) ProcessLine(line string) []byte {
}
}
if
len
(
geminiResp
.
Candidates
)
>
0
{
p
.
captureGrounding
(
geminiResp
.
Candidates
[
0
]
.
GroundingMetadata
)
}
// 检查是否结束
if
len
(
geminiResp
.
Candidates
)
>
0
{
finishReason
:=
geminiResp
.
Candidates
[
0
]
.
FinishReason
...
...
@@ -200,6 +206,20 @@ func (p *StreamingProcessor) processPart(part *GeminiPart) []byte {
return
result
.
Bytes
()
}
func
(
p
*
StreamingProcessor
)
captureGrounding
(
grounding
*
GeminiGroundingMetadata
)
{
if
grounding
==
nil
{
return
}
if
len
(
grounding
.
WebSearchQueries
)
>
0
&&
len
(
p
.
webSearchQueries
)
==
0
{
p
.
webSearchQueries
=
append
([]
string
(
nil
),
grounding
.
WebSearchQueries
...
)
}
if
len
(
grounding
.
GroundingChunks
)
>
0
&&
len
(
p
.
groundingChunks
)
==
0
{
p
.
groundingChunks
=
append
([]
GeminiGroundingChunk
(
nil
),
grounding
.
GroundingChunks
...
)
}
}
// processThinking 处理 thinking
func
(
p
*
StreamingProcessor
)
processThinking
(
text
,
signature
string
)
[]
byte
{
var
result
bytes
.
Buffer
...
...
@@ -417,6 +437,23 @@ func (p *StreamingProcessor) emitFinish(finishReason string) []byte {
p
.
trailingSignature
=
""
}
if
len
(
p
.
webSearchQueries
)
>
0
||
len
(
p
.
groundingChunks
)
>
0
{
groundingText
:=
buildGroundingText
(
&
GeminiGroundingMetadata
{
WebSearchQueries
:
p
.
webSearchQueries
,
GroundingChunks
:
p
.
groundingChunks
,
})
if
groundingText
!=
""
{
_
,
_
=
result
.
Write
(
p
.
startBlock
(
BlockTypeText
,
map
[
string
]
any
{
"type"
:
"text"
,
"text"
:
""
,
}))
_
,
_
=
result
.
Write
(
p
.
emitDelta
(
"text_delta"
,
map
[
string
]
any
{
"text"
:
groundingText
,
}))
_
,
_
=
result
.
Write
(
p
.
endBlock
())
}
}
// 确定 stop_reason
stopReason
:=
"end_turn"
if
p
.
usedTool
{
...
...
backend/internal/repository/account_repo.go
View file @
03c75787
...
...
@@ -543,6 +543,15 @@ func (r *accountRepository) SetError(ctx context.Context, id int64, errorMsg str
return
nil
}
func
(
r
*
accountRepository
)
ClearError
(
ctx
context
.
Context
,
id
int64
)
error
{
_
,
err
:=
r
.
client
.
Account
.
Update
()
.
Where
(
dbaccount
.
IDEQ
(
id
))
.
SetStatus
(
service
.
StatusActive
)
.
SetErrorMessage
(
""
)
.
Save
(
ctx
)
return
err
}
func
(
r
*
accountRepository
)
AddToGroup
(
ctx
context
.
Context
,
accountID
,
groupID
int64
,
priority
int
)
error
{
_
,
err
:=
r
.
client
.
AccountGroup
.
Create
()
.
SetAccountID
(
accountID
)
.
...
...
backend/internal/server/api_contract_test.go
View file @
03c75787
...
...
@@ -744,6 +744,10 @@ func (s *stubAccountRepo) SetError(ctx context.Context, id int64, errorMsg strin
return
errors
.
New
(
"not implemented"
)
}
func
(
s
*
stubAccountRepo
)
ClearError
(
ctx
context
.
Context
,
id
int64
)
error
{
return
errors
.
New
(
"not implemented"
)
}
func
(
s
*
stubAccountRepo
)
SetSchedulable
(
ctx
context
.
Context
,
id
int64
,
schedulable
bool
)
error
{
return
errors
.
New
(
"not implemented"
)
}
...
...
backend/internal/service/account_service.go
View file @
03c75787
...
...
@@ -37,6 +37,7 @@ type AccountRepository interface {
UpdateLastUsed
(
ctx
context
.
Context
,
id
int64
)
error
BatchUpdateLastUsed
(
ctx
context
.
Context
,
updates
map
[
int64
]
time
.
Time
)
error
SetError
(
ctx
context
.
Context
,
id
int64
,
errorMsg
string
)
error
ClearError
(
ctx
context
.
Context
,
id
int64
)
error
SetSchedulable
(
ctx
context
.
Context
,
id
int64
,
schedulable
bool
)
error
AutoPauseExpiredAccounts
(
ctx
context
.
Context
,
now
time
.
Time
)
(
int64
,
error
)
BindGroups
(
ctx
context
.
Context
,
accountID
int64
,
groupIDs
[]
int64
)
error
...
...
backend/internal/service/account_service_delete_test.go
View file @
03c75787
...
...
@@ -99,6 +99,10 @@ func (s *accountRepoStub) SetError(ctx context.Context, id int64, errorMsg strin
panic
(
"unexpected SetError call"
)
}
func
(
s
*
accountRepoStub
)
ClearError
(
ctx
context
.
Context
,
id
int64
)
error
{
panic
(
"unexpected ClearError call"
)
}
func
(
s
*
accountRepoStub
)
SetSchedulable
(
ctx
context
.
Context
,
id
int64
,
schedulable
bool
)
error
{
panic
(
"unexpected SetSchedulable call"
)
}
...
...
backend/internal/service/admin_service.go
View file @
03c75787
...
...
@@ -42,6 +42,7 @@ type AdminService interface {
DeleteAccount
(
ctx
context
.
Context
,
id
int64
)
error
RefreshAccountCredentials
(
ctx
context
.
Context
,
id
int64
)
(
*
Account
,
error
)
ClearAccountError
(
ctx
context
.
Context
,
id
int64
)
(
*
Account
,
error
)
SetAccountError
(
ctx
context
.
Context
,
id
int64
,
errorMsg
string
)
error
SetAccountSchedulable
(
ctx
context
.
Context
,
id
int64
,
schedulable
bool
)
(
*
Account
,
error
)
BulkUpdateAccounts
(
ctx
context
.
Context
,
input
*
BulkUpdateAccountsInput
)
(
*
BulkUpdateAccountsResult
,
error
)
...
...
@@ -1101,6 +1102,10 @@ func (s *adminServiceImpl) ClearAccountError(ctx context.Context, id int64) (*Ac
return
account
,
nil
}
func
(
s
*
adminServiceImpl
)
SetAccountError
(
ctx
context
.
Context
,
id
int64
,
errorMsg
string
)
error
{
return
s
.
accountRepo
.
SetError
(
ctx
,
id
,
errorMsg
)
}
func
(
s
*
adminServiceImpl
)
SetAccountSchedulable
(
ctx
context
.
Context
,
id
int64
,
schedulable
bool
)
(
*
Account
,
error
)
{
if
err
:=
s
.
accountRepo
.
SetSchedulable
(
ctx
,
id
,
schedulable
);
err
!=
nil
{
return
nil
,
err
...
...
backend/internal/service/antigravity_gateway_service.go
View file @
03c75787
...
...
@@ -12,6 +12,7 @@ import (
mathrand
"math/rand"
"net"
"net/http"
"os"
"strings"
"sync/atomic"
"time"
...
...
@@ -28,6 +29,207 @@ const (
antigravityRetryMaxDelay
=
16
*
time
.
Second
)
const
antigravityScopeRateLimitEnv
=
"GATEWAY_ANTIGRAVITY_429_SCOPE_LIMIT"
// antigravityRetryLoopParams 重试循环的参数
type
antigravityRetryLoopParams
struct
{
ctx
context
.
Context
prefix
string
account
*
Account
proxyURL
string
accessToken
string
action
string
body
[]
byte
quotaScope
AntigravityQuotaScope
c
*
gin
.
Context
httpUpstream
HTTPUpstream
settingService
*
SettingService
handleError
func
(
ctx
context
.
Context
,
prefix
string
,
account
*
Account
,
statusCode
int
,
headers
http
.
Header
,
body
[]
byte
,
quotaScope
AntigravityQuotaScope
)
}
// antigravityRetryLoopResult 重试循环的结果
type
antigravityRetryLoopResult
struct
{
resp
*
http
.
Response
}
// antigravityRetryLoop 执行带 URL fallback 的重试循环
func
antigravityRetryLoop
(
p
antigravityRetryLoopParams
)
(
*
antigravityRetryLoopResult
,
error
)
{
availableURLs
:=
antigravity
.
DefaultURLAvailability
.
GetAvailableURLs
()
if
len
(
availableURLs
)
==
0
{
availableURLs
=
antigravity
.
BaseURLs
}
var
resp
*
http
.
Response
var
usedBaseURL
string
logBody
:=
p
.
settingService
!=
nil
&&
p
.
settingService
.
cfg
!=
nil
&&
p
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBody
maxBytes
:=
2048
if
p
.
settingService
!=
nil
&&
p
.
settingService
.
cfg
!=
nil
&&
p
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBodyMaxBytes
>
0
{
maxBytes
=
p
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBodyMaxBytes
}
getUpstreamDetail
:=
func
(
body
[]
byte
)
string
{
if
!
logBody
{
return
""
}
return
truncateString
(
string
(
body
),
maxBytes
)
}
urlFallbackLoop
:
for
urlIdx
,
baseURL
:=
range
availableURLs
{
usedBaseURL
=
baseURL
for
attempt
:=
1
;
attempt
<=
antigravityMaxRetries
;
attempt
++
{
select
{
case
<-
p
.
ctx
.
Done
()
:
log
.
Printf
(
"%s status=context_canceled error=%v"
,
p
.
prefix
,
p
.
ctx
.
Err
())
return
nil
,
p
.
ctx
.
Err
()
default
:
}
upstreamReq
,
err
:=
antigravity
.
NewAPIRequestWithURL
(
p
.
ctx
,
baseURL
,
p
.
action
,
p
.
accessToken
,
p
.
body
)
if
err
!=
nil
{
return
nil
,
err
}
// Capture upstream request body for ops retry of this attempt.
if
p
.
c
!=
nil
&&
len
(
p
.
body
)
>
0
{
p
.
c
.
Set
(
OpsUpstreamRequestBodyKey
,
string
(
p
.
body
))
}
resp
,
err
=
p
.
httpUpstream
.
Do
(
upstreamReq
,
p
.
proxyURL
,
p
.
account
.
ID
,
p
.
account
.
Concurrency
)
if
err
!=
nil
{
safeErr
:=
sanitizeUpstreamErrorMessage
(
err
.
Error
())
appendOpsUpstreamError
(
p
.
c
,
OpsUpstreamErrorEvent
{
Platform
:
p
.
account
.
Platform
,
AccountID
:
p
.
account
.
ID
,
AccountName
:
p
.
account
.
Name
,
UpstreamStatusCode
:
0
,
Kind
:
"request_error"
,
Message
:
safeErr
,
})
if
shouldAntigravityFallbackToNextURL
(
err
,
0
)
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
log
.
Printf
(
"%s URL fallback (connection error): %s -> %s"
,
p
.
prefix
,
baseURL
,
availableURLs
[
urlIdx
+
1
])
continue
urlFallbackLoop
}
if
attempt
<
antigravityMaxRetries
{
log
.
Printf
(
"%s status=request_failed retry=%d/%d error=%v"
,
p
.
prefix
,
attempt
,
antigravityMaxRetries
,
err
)
if
!
sleepAntigravityBackoffWithContext
(
p
.
ctx
,
attempt
)
{
log
.
Printf
(
"%s status=context_canceled_during_backoff"
,
p
.
prefix
)
return
nil
,
p
.
ctx
.
Err
()
}
continue
}
log
.
Printf
(
"%s status=request_failed retries_exhausted error=%v"
,
p
.
prefix
,
err
)
setOpsUpstreamError
(
p
.
c
,
0
,
safeErr
,
""
)
return
nil
,
fmt
.
Errorf
(
"upstream request failed after retries: %w"
,
err
)
}
// 429 限流处理:区分 URL 级别限流和账户配额限流
if
resp
.
StatusCode
==
http
.
StatusTooManyRequests
{
respBody
,
_
:=
io
.
ReadAll
(
io
.
LimitReader
(
resp
.
Body
,
2
<<
20
))
_
=
resp
.
Body
.
Close
()
// "Resource has been exhausted" 是 URL 级别限流,切换 URL
if
isURLLevelRateLimit
(
respBody
)
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
log
.
Printf
(
"%s URL fallback (429): %s -> %s"
,
p
.
prefix
,
baseURL
,
availableURLs
[
urlIdx
+
1
])
continue
urlFallbackLoop
}
// 账户/模型配额限流,重试 3 次(指数退避)
if
attempt
<
antigravityMaxRetries
{
upstreamMsg
:=
strings
.
TrimSpace
(
extractAntigravityErrorMessage
(
respBody
))
upstreamMsg
=
sanitizeUpstreamErrorMessage
(
upstreamMsg
)
appendOpsUpstreamError
(
p
.
c
,
OpsUpstreamErrorEvent
{
Platform
:
p
.
account
.
Platform
,
AccountID
:
p
.
account
.
ID
,
AccountName
:
p
.
account
.
Name
,
UpstreamStatusCode
:
resp
.
StatusCode
,
UpstreamRequestID
:
resp
.
Header
.
Get
(
"x-request-id"
),
Kind
:
"retry"
,
Message
:
upstreamMsg
,
Detail
:
getUpstreamDetail
(
respBody
),
})
log
.
Printf
(
"%s status=429 retry=%d/%d body=%s"
,
p
.
prefix
,
attempt
,
antigravityMaxRetries
,
truncateForLog
(
respBody
,
200
))
if
!
sleepAntigravityBackoffWithContext
(
p
.
ctx
,
attempt
)
{
log
.
Printf
(
"%s status=context_canceled_during_backoff"
,
p
.
prefix
)
return
nil
,
p
.
ctx
.
Err
()
}
continue
}
// 重试用尽,标记账户限流
p
.
handleError
(
p
.
ctx
,
p
.
prefix
,
p
.
account
,
resp
.
StatusCode
,
resp
.
Header
,
respBody
,
p
.
quotaScope
)
log
.
Printf
(
"%s status=429 rate_limited base_url=%s body=%s"
,
p
.
prefix
,
baseURL
,
truncateForLog
(
respBody
,
200
))
resp
=
&
http
.
Response
{
StatusCode
:
resp
.
StatusCode
,
Header
:
resp
.
Header
.
Clone
(),
Body
:
io
.
NopCloser
(
bytes
.
NewReader
(
respBody
)),
}
break
urlFallbackLoop
}
// 其他可重试错误
if
resp
.
StatusCode
>=
400
&&
shouldRetryAntigravityError
(
resp
.
StatusCode
)
{
respBody
,
_
:=
io
.
ReadAll
(
io
.
LimitReader
(
resp
.
Body
,
2
<<
20
))
_
=
resp
.
Body
.
Close
()
if
attempt
<
antigravityMaxRetries
{
upstreamMsg
:=
strings
.
TrimSpace
(
extractAntigravityErrorMessage
(
respBody
))
upstreamMsg
=
sanitizeUpstreamErrorMessage
(
upstreamMsg
)
appendOpsUpstreamError
(
p
.
c
,
OpsUpstreamErrorEvent
{
Platform
:
p
.
account
.
Platform
,
AccountID
:
p
.
account
.
ID
,
AccountName
:
p
.
account
.
Name
,
UpstreamStatusCode
:
resp
.
StatusCode
,
UpstreamRequestID
:
resp
.
Header
.
Get
(
"x-request-id"
),
Kind
:
"retry"
,
Message
:
upstreamMsg
,
Detail
:
getUpstreamDetail
(
respBody
),
})
log
.
Printf
(
"%s status=%d retry=%d/%d body=%s"
,
p
.
prefix
,
resp
.
StatusCode
,
attempt
,
antigravityMaxRetries
,
truncateForLog
(
respBody
,
500
))
if
!
sleepAntigravityBackoffWithContext
(
p
.
ctx
,
attempt
)
{
log
.
Printf
(
"%s status=context_canceled_during_backoff"
,
p
.
prefix
)
return
nil
,
p
.
ctx
.
Err
()
}
continue
}
resp
=
&
http
.
Response
{
StatusCode
:
resp
.
StatusCode
,
Header
:
resp
.
Header
.
Clone
(),
Body
:
io
.
NopCloser
(
bytes
.
NewReader
(
respBody
)),
}
break
urlFallbackLoop
}
break
urlFallbackLoop
}
}
if
resp
!=
nil
&&
resp
.
StatusCode
<
400
&&
usedBaseURL
!=
""
{
antigravity
.
DefaultURLAvailability
.
MarkSuccess
(
usedBaseURL
)
}
return
&
antigravityRetryLoopResult
{
resp
:
resp
},
nil
}
// shouldRetryAntigravityError 判断是否应该重试
func
shouldRetryAntigravityError
(
statusCode
int
)
bool
{
switch
statusCode
{
case
429
,
500
,
502
,
503
,
504
,
529
:
return
true
default
:
return
false
}
}
// isURLLevelRateLimit 判断是否为 URL 级别的限流(应切换 URL 重试)
// "Resource has been exhausted" 是 URL/节点级别限流,切换 URL 可能成功
// "exhausted your capacity on this model" 是账户/模型配额限流,切换 URL 无效
func
isURLLevelRateLimit
(
body
[]
byte
)
bool
{
// 快速检查:包含 "Resource has been exhausted" 且不包含 "capacity on this model"
bodyStr
:=
string
(
body
)
return
strings
.
Contains
(
bodyStr
,
"Resource has been exhausted"
)
&&
!
strings
.
Contains
(
bodyStr
,
"capacity on this model"
)
}
// isAntigravityConnectionError 判断是否为连接错误(网络超时、DNS 失败、连接拒绝)
func
isAntigravityConnectionError
(
err
error
)
bool
{
if
err
==
nil
{
...
...
@@ -238,7 +440,6 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
if
err
!=
nil
{
lastErr
=
fmt
.
Errorf
(
"请求失败: %w"
,
err
)
if
shouldAntigravityFallbackToNextURL
(
err
,
0
)
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
antigravity
.
DefaultURLAvailability
.
MarkUnavailable
(
baseURL
)
log
.
Printf
(
"[antigravity-Test] URL fallback: %s -> %s"
,
baseURL
,
availableURLs
[
urlIdx
+
1
])
continue
}
...
...
@@ -254,7 +455,6 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
// 检查是否需要 URL 降级
if
shouldAntigravityFallbackToNextURL
(
nil
,
resp
.
StatusCode
)
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
antigravity
.
DefaultURLAvailability
.
MarkUnavailable
(
baseURL
)
log
.
Printf
(
"[antigravity-Test] URL fallback (HTTP %d): %s -> %s"
,
resp
.
StatusCode
,
baseURL
,
availableURLs
[
urlIdx
+
1
])
continue
}
...
...
@@ -266,6 +466,8 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
// 解析流式响应,提取文本
text
:=
extractTextFromSSEResponse
(
respBody
)
// 标记成功的 URL,下次优先使用
antigravity
.
DefaultURLAvailability
.
MarkSuccess
(
baseURL
)
return
&
TestConnectionResult
{
Text
:
text
,
MappedModel
:
mappedModel
,
...
...
@@ -276,13 +478,14 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
}
// buildGeminiTestRequest 构建 Gemini 格式测试请求
// 使用最小 token 消耗:输入 "." + maxOutputTokens: 1
func
(
s
*
AntigravityGatewayService
)
buildGeminiTestRequest
(
projectID
,
model
string
)
([]
byte
,
error
)
{
payload
:=
map
[
string
]
any
{
"contents"
:
[]
map
[
string
]
any
{
{
"role"
:
"user"
,
"parts"
:
[]
map
[
string
]
any
{
{
"text"
:
"
hi
"
},
{
"text"
:
"
.
"
},
},
},
},
...
...
@@ -292,22 +495,26 @@ func (s *AntigravityGatewayService) buildGeminiTestRequest(projectID, model stri
{
"text"
:
antigravity
.
GetDefaultIdentityPatch
()},
},
},
"generationConfig"
:
map
[
string
]
any
{
"maxOutputTokens"
:
1
,
},
}
payloadBytes
,
_
:=
json
.
Marshal
(
payload
)
return
s
.
wrapV1InternalRequest
(
projectID
,
model
,
payloadBytes
)
}
// buildClaudeTestRequest 构建 Claude 格式测试请求并转换为 Gemini 格式
// 使用最小 token 消耗:输入 "." + MaxTokens: 1
func
(
s
*
AntigravityGatewayService
)
buildClaudeTestRequest
(
projectID
,
mappedModel
string
)
([]
byte
,
error
)
{
claudeReq
:=
&
antigravity
.
ClaudeRequest
{
Model
:
mappedModel
,
Messages
:
[]
antigravity
.
ClaudeMessage
{
{
Role
:
"user"
,
Content
:
json
.
RawMessage
(
`"
hi
"`
),
Content
:
json
.
RawMessage
(
`"
.
"`
),
},
},
MaxTokens
:
1
024
,
MaxTokens
:
1
,
Stream
:
false
,
}
return
antigravity
.
TransformClaudeToGemini
(
claudeReq
,
projectID
,
mappedModel
)
...
...
@@ -523,9 +730,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
proxyURL
=
account
.
Proxy
.
URL
()
}
// Sanitize thinking blocks (clean cache_control and flatten history thinking)
sanitizeThinkingBlocks
(
&
claudeReq
)
// 获取转换选项
// Antigravity 上游要求必须包含身份提示词,否则会返回 429
transformOpts
:=
s
.
getClaudeTransformOptions
(
ctx
)
...
...
@@ -537,150 +741,29 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
return
nil
,
fmt
.
Errorf
(
"transform request: %w"
,
err
)
}
// Safety net: ensure no cache_control leaked into Gemini request
geminiBody
=
cleanCacheControlFromGeminiJSON
(
geminiBody
)
// Antigravity 上游只支持流式请求,统一使用 streamGenerateContent
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后转换返回
action
:=
"streamGenerateContent"
// URL fallback 循环
availableURLs
:=
antigravity
.
DefaultURLAvailability
.
GetAvailableURLs
()
if
len
(
availableURLs
)
==
0
{
availableURLs
=
antigravity
.
BaseURLs
// 所有 URL 都不可用时,重试所有
}
// 重试循环
var
resp
*
http
.
Response
urlFallbackLoop
:
for
urlIdx
,
baseURL
:=
range
availableURLs
{
for
attempt
:=
1
;
attempt
<=
antigravityMaxRetries
;
attempt
++
{
// 检查 context 是否已取消(客户端断开连接)
select
{
case
<-
ctx
.
Done
()
:
log
.
Printf
(
"%s status=context_canceled error=%v"
,
prefix
,
ctx
.
Err
())
return
nil
,
ctx
.
Err
()
default
:
}
upstreamReq
,
err
:=
antigravity
.
NewAPIRequestWithURL
(
ctx
,
baseURL
,
action
,
accessToken
,
geminiBody
)
// Capture upstream request body for ops retry of this attempt.
if
c
!=
nil
{
c
.
Set
(
OpsUpstreamRequestBodyKey
,
string
(
geminiBody
))
}
if
err
!=
nil
{
return
nil
,
err
}
resp
,
err
=
s
.
httpUpstream
.
Do
(
upstreamReq
,
proxyURL
,
account
.
ID
,
account
.
Concurrency
)
if
err
!=
nil
{
safeErr
:=
sanitizeUpstreamErrorMessage
(
err
.
Error
())
appendOpsUpstreamError
(
c
,
OpsUpstreamErrorEvent
{
Platform
:
account
.
Platform
,
AccountID
:
account
.
ID
,
AccountName
:
account
.
Name
,
UpstreamStatusCode
:
0
,
Kind
:
"request_error"
,
Message
:
safeErr
,
// 执行带重试的请求
result
,
err
:=
antigravityRetryLoop
(
antigravityRetryLoopParams
{
ctx
:
ctx
,
prefix
:
prefix
,
account
:
account
,
proxyURL
:
proxyURL
,
accessToken
:
accessToken
,
action
:
action
,
body
:
geminiBody
,
quotaScope
:
quotaScope
,
c
:
c
,
httpUpstream
:
s
.
httpUpstream
,
settingService
:
s
.
settingService
,
handleError
:
s
.
handleUpstreamError
,
})
// 检查是否应触发 URL 降级
if
shouldAntigravityFallbackToNextURL
(
err
,
0
)
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
antigravity
.
DefaultURLAvailability
.
MarkUnavailable
(
baseURL
)
log
.
Printf
(
"%s URL fallback (connection error): %s -> %s"
,
prefix
,
baseURL
,
availableURLs
[
urlIdx
+
1
])
continue
urlFallbackLoop
}
if
attempt
<
antigravityMaxRetries
{
log
.
Printf
(
"%s status=request_failed retry=%d/%d error=%v"
,
prefix
,
attempt
,
antigravityMaxRetries
,
err
)
if
!
sleepAntigravityBackoffWithContext
(
ctx
,
attempt
)
{
log
.
Printf
(
"%s status=context_canceled_during_backoff"
,
prefix
)
return
nil
,
ctx
.
Err
()
}
continue
}
log
.
Printf
(
"%s status=request_failed retries_exhausted error=%v"
,
prefix
,
err
)
setOpsUpstreamError
(
c
,
0
,
safeErr
,
""
)
if
err
!=
nil
{
return
nil
,
s
.
writeClaudeError
(
c
,
http
.
StatusBadGateway
,
"upstream_error"
,
"Upstream request failed after retries"
)
}
// 检查是否应触发 URL 降级(仅 429)
if
resp
.
StatusCode
==
http
.
StatusTooManyRequests
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
respBody
,
_
:=
io
.
ReadAll
(
io
.
LimitReader
(
resp
.
Body
,
2
<<
20
))
_
=
resp
.
Body
.
Close
()
upstreamMsg
:=
strings
.
TrimSpace
(
extractAntigravityErrorMessage
(
respBody
))
upstreamMsg
=
sanitizeUpstreamErrorMessage
(
upstreamMsg
)
logBody
:=
s
.
settingService
!=
nil
&&
s
.
settingService
.
cfg
!=
nil
&&
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBody
maxBytes
:=
2048
if
s
.
settingService
!=
nil
&&
s
.
settingService
.
cfg
!=
nil
&&
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBodyMaxBytes
>
0
{
maxBytes
=
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBodyMaxBytes
}
upstreamDetail
:=
""
if
logBody
{
upstreamDetail
=
truncateString
(
string
(
respBody
),
maxBytes
)
}
appendOpsUpstreamError
(
c
,
OpsUpstreamErrorEvent
{
Platform
:
account
.
Platform
,
AccountID
:
account
.
ID
,
AccountName
:
account
.
Name
,
UpstreamStatusCode
:
resp
.
StatusCode
,
UpstreamRequestID
:
resp
.
Header
.
Get
(
"x-request-id"
),
Kind
:
"retry"
,
Message
:
upstreamMsg
,
Detail
:
upstreamDetail
,
})
antigravity
.
DefaultURLAvailability
.
MarkUnavailable
(
baseURL
)
log
.
Printf
(
"%s URL fallback (HTTP 429): %s -> %s body=%s"
,
prefix
,
baseURL
,
availableURLs
[
urlIdx
+
1
],
truncateForLog
(
respBody
,
200
))
continue
urlFallbackLoop
}
if
resp
.
StatusCode
>=
400
&&
s
.
shouldRetryUpstreamError
(
resp
.
StatusCode
)
{
respBody
,
_
:=
io
.
ReadAll
(
io
.
LimitReader
(
resp
.
Body
,
2
<<
20
))
_
=
resp
.
Body
.
Close
()
if
attempt
<
antigravityMaxRetries
{
upstreamMsg
:=
strings
.
TrimSpace
(
extractAntigravityErrorMessage
(
respBody
))
upstreamMsg
=
sanitizeUpstreamErrorMessage
(
upstreamMsg
)
logBody
:=
s
.
settingService
!=
nil
&&
s
.
settingService
.
cfg
!=
nil
&&
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBody
maxBytes
:=
2048
if
s
.
settingService
!=
nil
&&
s
.
settingService
.
cfg
!=
nil
&&
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBodyMaxBytes
>
0
{
maxBytes
=
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBodyMaxBytes
}
upstreamDetail
:=
""
if
logBody
{
upstreamDetail
=
truncateString
(
string
(
respBody
),
maxBytes
)
}
appendOpsUpstreamError
(
c
,
OpsUpstreamErrorEvent
{
Platform
:
account
.
Platform
,
AccountID
:
account
.
ID
,
AccountName
:
account
.
Name
,
UpstreamStatusCode
:
resp
.
StatusCode
,
UpstreamRequestID
:
resp
.
Header
.
Get
(
"x-request-id"
),
Kind
:
"retry"
,
Message
:
upstreamMsg
,
Detail
:
upstreamDetail
,
})
log
.
Printf
(
"%s status=%d retry=%d/%d body=%s"
,
prefix
,
resp
.
StatusCode
,
attempt
,
antigravityMaxRetries
,
truncateForLog
(
respBody
,
500
))
if
!
sleepAntigravityBackoffWithContext
(
ctx
,
attempt
)
{
log
.
Printf
(
"%s status=context_canceled_during_backoff"
,
prefix
)
return
nil
,
ctx
.
Err
()
}
continue
}
// 所有重试都失败,标记限流状态
if
resp
.
StatusCode
==
429
{
s
.
handleUpstreamError
(
ctx
,
prefix
,
account
,
resp
.
StatusCode
,
resp
.
Header
,
respBody
,
quotaScope
)
}
// 最后一次尝试也失败
resp
=
&
http
.
Response
{
StatusCode
:
resp
.
StatusCode
,
Header
:
resp
.
Header
.
Clone
(),
Body
:
io
.
NopCloser
(
bytes
.
NewReader
(
respBody
)),
}
break
urlFallbackLoop
}
break
urlFallbackLoop
}
}
resp
:=
result
.
resp
defer
func
()
{
_
=
resp
.
Body
.
Close
()
}()
if
resp
.
StatusCode
>=
400
{
...
...
@@ -739,11 +822,20 @@ urlFallbackLoop:
if
txErr
!=
nil
{
continue
}
retryReq
,
buildErr
:=
antigravity
.
NewAPIRequest
(
ctx
,
action
,
accessToken
,
retryGeminiBody
)
if
buildErr
!=
nil
{
continue
}
retryResp
,
retryErr
:=
s
.
httpUpstream
.
Do
(
retryReq
,
proxyURL
,
account
.
ID
,
account
.
Concurrency
)
retryResult
,
retryErr
:=
antigravityRetryLoop
(
antigravityRetryLoopParams
{
ctx
:
ctx
,
prefix
:
prefix
,
account
:
account
,
proxyURL
:
proxyURL
,
accessToken
:
accessToken
,
action
:
action
,
body
:
retryGeminiBody
,
quotaScope
:
quotaScope
,
c
:
c
,
httpUpstream
:
s
.
httpUpstream
,
settingService
:
s
.
settingService
,
handleError
:
s
.
handleUpstreamError
,
})
if
retryErr
!=
nil
{
appendOpsUpstreamError
(
c
,
OpsUpstreamErrorEvent
{
Platform
:
account
.
Platform
,
...
...
@@ -757,6 +849,7 @@ urlFallbackLoop:
continue
}
retryResp
:=
retryResult
.
resp
if
retryResp
.
StatusCode
<
400
{
_
=
resp
.
Body
.
Close
()
resp
=
retryResp
...
...
@@ -766,6 +859,13 @@ urlFallbackLoop:
retryBody
,
_
:=
io
.
ReadAll
(
io
.
LimitReader
(
retryResp
.
Body
,
2
<<
20
))
_
=
retryResp
.
Body
.
Close
()
if
retryResp
.
StatusCode
==
http
.
StatusTooManyRequests
{
retryBaseURL
:=
""
if
retryResp
.
Request
!=
nil
&&
retryResp
.
Request
.
URL
!=
nil
{
retryBaseURL
=
retryResp
.
Request
.
URL
.
Scheme
+
"://"
+
retryResp
.
Request
.
URL
.
Host
}
log
.
Printf
(
"%s status=429 rate_limited base_url=%s retry_stage=%s body=%s"
,
prefix
,
retryBaseURL
,
stage
.
name
,
truncateForLog
(
retryBody
,
200
))
}
kind
:=
"signature_retry"
if
strings
.
TrimSpace
(
stage
.
name
)
!=
""
{
kind
=
"signature_retry_"
+
strings
.
ReplaceAll
(
stage
.
name
,
"+"
,
"_"
)
...
...
@@ -920,143 +1020,6 @@ func extractAntigravityErrorMessage(body []byte) string {
return
""
}
// cleanCacheControlFromGeminiJSON removes cache_control from Gemini JSON (emergency fix)
// This should not be needed if transformation is correct, but serves as a safety net
func
cleanCacheControlFromGeminiJSON
(
body
[]
byte
)
[]
byte
{
// Try a more robust approach: parse and clean
var
data
map
[
string
]
any
if
err
:=
json
.
Unmarshal
(
body
,
&
data
);
err
!=
nil
{
log
.
Printf
(
"[Antigravity] Failed to parse Gemini JSON for cache_control cleaning: %v"
,
err
)
return
body
}
cleaned
:=
removeCacheControlFromAny
(
data
)
if
!
cleaned
{
return
body
}
if
result
,
err
:=
json
.
Marshal
(
data
);
err
==
nil
{
log
.
Printf
(
"[Antigravity] Successfully cleaned cache_control from Gemini JSON"
)
return
result
}
return
body
}
// removeCacheControlFromAny recursively removes cache_control fields
func
removeCacheControlFromAny
(
v
any
)
bool
{
cleaned
:=
false
switch
val
:=
v
.
(
type
)
{
case
map
[
string
]
any
:
for
k
,
child
:=
range
val
{
if
k
==
"cache_control"
{
delete
(
val
,
k
)
cleaned
=
true
}
else
if
removeCacheControlFromAny
(
child
)
{
cleaned
=
true
}
}
case
[]
any
:
for
_
,
item
:=
range
val
{
if
removeCacheControlFromAny
(
item
)
{
cleaned
=
true
}
}
}
return
cleaned
}
// sanitizeThinkingBlocks cleans cache_control and flattens history thinking blocks
// Thinking blocks do NOT support cache_control field (Anthropic API/Vertex AI requirement)
// Additionally, history thinking blocks are flattened to text to avoid upstream validation errors
func
sanitizeThinkingBlocks
(
req
*
antigravity
.
ClaudeRequest
)
{
if
req
==
nil
{
return
}
log
.
Printf
(
"[Antigravity] sanitizeThinkingBlocks: processing request with %d messages"
,
len
(
req
.
Messages
))
// Clean system blocks
if
len
(
req
.
System
)
>
0
{
var
systemBlocks
[]
map
[
string
]
any
if
err
:=
json
.
Unmarshal
(
req
.
System
,
&
systemBlocks
);
err
==
nil
{
for
i
:=
range
systemBlocks
{
if
blockType
,
_
:=
systemBlocks
[
i
][
"type"
]
.
(
string
);
blockType
==
"thinking"
||
systemBlocks
[
i
][
"thinking"
]
!=
nil
{
if
removeCacheControlFromAny
(
systemBlocks
[
i
])
{
log
.
Printf
(
"[Antigravity] Deep cleaned cache_control from thinking block in system[%d]"
,
i
)
}
}
}
// Marshal back
if
cleaned
,
err
:=
json
.
Marshal
(
systemBlocks
);
err
==
nil
{
req
.
System
=
cleaned
}
}
}
// Clean message content blocks and flatten history
lastMsgIdx
:=
len
(
req
.
Messages
)
-
1
for
msgIdx
:=
range
req
.
Messages
{
raw
:=
req
.
Messages
[
msgIdx
]
.
Content
if
len
(
raw
)
==
0
{
continue
}
// Try to parse as blocks array
var
blocks
[]
map
[
string
]
any
if
err
:=
json
.
Unmarshal
(
raw
,
&
blocks
);
err
!=
nil
{
continue
}
cleaned
:=
false
for
blockIdx
:=
range
blocks
{
blockType
,
_
:=
blocks
[
blockIdx
][
"type"
]
.
(
string
)
// Check for thinking blocks (typed or untyped)
if
blockType
==
"thinking"
||
blocks
[
blockIdx
][
"thinking"
]
!=
nil
{
// 1. Clean cache_control
if
removeCacheControlFromAny
(
blocks
[
blockIdx
])
{
log
.
Printf
(
"[Antigravity] Deep cleaned cache_control from thinking block in messages[%d].content[%d]"
,
msgIdx
,
blockIdx
)
cleaned
=
true
}
// 2. Flatten to text if it's a history message (not the last one)
if
msgIdx
<
lastMsgIdx
{
log
.
Printf
(
"[Antigravity] Flattening history thinking block to text at messages[%d].content[%d]"
,
msgIdx
,
blockIdx
)
// Extract thinking content
var
textContent
string
if
t
,
ok
:=
blocks
[
blockIdx
][
"thinking"
]
.
(
string
);
ok
{
textContent
=
t
}
else
{
// Fallback for non-string content (marshal it)
if
b
,
err
:=
json
.
Marshal
(
blocks
[
blockIdx
][
"thinking"
]);
err
==
nil
{
textContent
=
string
(
b
)
}
}
// Convert to text block
blocks
[
blockIdx
][
"type"
]
=
"text"
blocks
[
blockIdx
][
"text"
]
=
textContent
delete
(
blocks
[
blockIdx
],
"thinking"
)
delete
(
blocks
[
blockIdx
],
"signature"
)
delete
(
blocks
[
blockIdx
],
"cache_control"
)
// Ensure it's gone
cleaned
=
true
}
}
}
// Marshal back if modified
if
cleaned
{
if
marshaled
,
err
:=
json
.
Marshal
(
blocks
);
err
==
nil
{
req
.
Messages
[
msgIdx
]
.
Content
=
marshaled
}
}
}
}
// stripThinkingFromClaudeRequest converts thinking blocks to text blocks in a Claude Messages request.
// This preserves the thinking content while avoiding signature validation errors.
// Note: redacted_thinking blocks are removed because they cannot be converted to text.
...
...
@@ -1352,138 +1315,25 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后返回
upstreamAction
:=
"streamGenerateContent"
// URL fallback 循环
availableURLs
:=
antigravity
.
DefaultURLAvailability
.
GetAvailableURLs
()
if
len
(
availableURLs
)
==
0
{
availableURLs
=
antigravity
.
BaseURLs
// 所有 URL 都不可用时,重试所有
}
// 重试循环
var
resp
*
http
.
Response
urlFallbackLoop
:
for
urlIdx
,
baseURL
:=
range
availableURLs
{
for
attempt
:=
1
;
attempt
<=
antigravityMaxRetries
;
attempt
++
{
// 检查 context 是否已取消(客户端断开连接)
select
{
case
<-
ctx
.
Done
()
:
log
.
Printf
(
"%s status=context_canceled error=%v"
,
prefix
,
ctx
.
Err
())
return
nil
,
ctx
.
Err
()
default
:
}
upstreamReq
,
err
:=
antigravity
.
NewAPIRequestWithURL
(
ctx
,
baseURL
,
upstreamAction
,
accessToken
,
wrappedBody
)
if
err
!=
nil
{
return
nil
,
err
}
resp
,
err
=
s
.
httpUpstream
.
Do
(
upstreamReq
,
proxyURL
,
account
.
ID
,
account
.
Concurrency
)
if
err
!=
nil
{
safeErr
:=
sanitizeUpstreamErrorMessage
(
err
.
Error
())
appendOpsUpstreamError
(
c
,
OpsUpstreamErrorEvent
{
Platform
:
account
.
Platform
,
AccountID
:
account
.
ID
,
AccountName
:
account
.
Name
,
UpstreamStatusCode
:
0
,
Kind
:
"request_error"
,
Message
:
safeErr
,
// 执行带重试的请求
result
,
err
:=
antigravityRetryLoop
(
antigravityRetryLoopParams
{
ctx
:
ctx
,
prefix
:
prefix
,
account
:
account
,
proxyURL
:
proxyURL
,
accessToken
:
accessToken
,
action
:
upstreamAction
,
body
:
wrappedBody
,
quotaScope
:
quotaScope
,
c
:
c
,
httpUpstream
:
s
.
httpUpstream
,
settingService
:
s
.
settingService
,
handleError
:
s
.
handleUpstreamError
,
})
// 检查是否应触发 URL 降级
if
shouldAntigravityFallbackToNextURL
(
err
,
0
)
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
antigravity
.
DefaultURLAvailability
.
MarkUnavailable
(
baseURL
)
log
.
Printf
(
"%s URL fallback (connection error): %s -> %s"
,
prefix
,
baseURL
,
availableURLs
[
urlIdx
+
1
])
continue
urlFallbackLoop
}
if
attempt
<
antigravityMaxRetries
{
log
.
Printf
(
"%s status=request_failed retry=%d/%d error=%v"
,
prefix
,
attempt
,
antigravityMaxRetries
,
err
)
if
!
sleepAntigravityBackoffWithContext
(
ctx
,
attempt
)
{
log
.
Printf
(
"%s status=context_canceled_during_backoff"
,
prefix
)
return
nil
,
ctx
.
Err
()
}
continue
}
log
.
Printf
(
"%s status=request_failed retries_exhausted error=%v"
,
prefix
,
err
)
setOpsUpstreamError
(
c
,
0
,
safeErr
,
""
)
if
err
!=
nil
{
return
nil
,
s
.
writeGoogleError
(
c
,
http
.
StatusBadGateway
,
"Upstream request failed after retries"
)
}
// 检查是否应触发 URL 降级(仅 429)
if
resp
.
StatusCode
==
http
.
StatusTooManyRequests
&&
urlIdx
<
len
(
availableURLs
)
-
1
{
respBody
,
_
:=
io
.
ReadAll
(
io
.
LimitReader
(
resp
.
Body
,
2
<<
20
))
_
=
resp
.
Body
.
Close
()
upstreamMsg
:=
strings
.
TrimSpace
(
extractAntigravityErrorMessage
(
respBody
))
upstreamMsg
=
sanitizeUpstreamErrorMessage
(
upstreamMsg
)
logBody
:=
s
.
settingService
!=
nil
&&
s
.
settingService
.
cfg
!=
nil
&&
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBody
maxBytes
:=
2048
if
s
.
settingService
!=
nil
&&
s
.
settingService
.
cfg
!=
nil
&&
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBodyMaxBytes
>
0
{
maxBytes
=
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBodyMaxBytes
}
upstreamDetail
:=
""
if
logBody
{
upstreamDetail
=
truncateString
(
string
(
respBody
),
maxBytes
)
}
appendOpsUpstreamError
(
c
,
OpsUpstreamErrorEvent
{
Platform
:
account
.
Platform
,
AccountID
:
account
.
ID
,
AccountName
:
account
.
Name
,
UpstreamStatusCode
:
resp
.
StatusCode
,
UpstreamRequestID
:
resp
.
Header
.
Get
(
"x-request-id"
),
Kind
:
"retry"
,
Message
:
upstreamMsg
,
Detail
:
upstreamDetail
,
})
antigravity
.
DefaultURLAvailability
.
MarkUnavailable
(
baseURL
)
log
.
Printf
(
"%s URL fallback (HTTP 429): %s -> %s body=%s"
,
prefix
,
baseURL
,
availableURLs
[
urlIdx
+
1
],
truncateForLog
(
respBody
,
200
))
continue
urlFallbackLoop
}
if
resp
.
StatusCode
>=
400
&&
s
.
shouldRetryUpstreamError
(
resp
.
StatusCode
)
{
respBody
,
_
:=
io
.
ReadAll
(
io
.
LimitReader
(
resp
.
Body
,
2
<<
20
))
_
=
resp
.
Body
.
Close
()
if
attempt
<
antigravityMaxRetries
{
upstreamMsg
:=
strings
.
TrimSpace
(
extractAntigravityErrorMessage
(
respBody
))
upstreamMsg
=
sanitizeUpstreamErrorMessage
(
upstreamMsg
)
logBody
:=
s
.
settingService
!=
nil
&&
s
.
settingService
.
cfg
!=
nil
&&
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBody
maxBytes
:=
2048
if
s
.
settingService
!=
nil
&&
s
.
settingService
.
cfg
!=
nil
&&
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBodyMaxBytes
>
0
{
maxBytes
=
s
.
settingService
.
cfg
.
Gateway
.
LogUpstreamErrorBodyMaxBytes
}
upstreamDetail
:=
""
if
logBody
{
upstreamDetail
=
truncateString
(
string
(
respBody
),
maxBytes
)
}
appendOpsUpstreamError
(
c
,
OpsUpstreamErrorEvent
{
Platform
:
account
.
Platform
,
AccountID
:
account
.
ID
,
AccountName
:
account
.
Name
,
UpstreamStatusCode
:
resp
.
StatusCode
,
UpstreamRequestID
:
resp
.
Header
.
Get
(
"x-request-id"
),
Kind
:
"retry"
,
Message
:
upstreamMsg
,
Detail
:
upstreamDetail
,
})
log
.
Printf
(
"%s status=%d retry=%d/%d"
,
prefix
,
resp
.
StatusCode
,
attempt
,
antigravityMaxRetries
)
if
!
sleepAntigravityBackoffWithContext
(
ctx
,
attempt
)
{
log
.
Printf
(
"%s status=context_canceled_during_backoff"
,
prefix
)
return
nil
,
ctx
.
Err
()
}
continue
}
// 所有重试都失败,标记限流状态
if
resp
.
StatusCode
==
429
{
s
.
handleUpstreamError
(
ctx
,
prefix
,
account
,
resp
.
StatusCode
,
resp
.
Header
,
respBody
,
quotaScope
)
}
resp
=
&
http
.
Response
{
StatusCode
:
resp
.
StatusCode
,
Header
:
resp
.
Header
.
Clone
(),
Body
:
io
.
NopCloser
(
bytes
.
NewReader
(
respBody
)),
}
break
urlFallbackLoop
}
break
urlFallbackLoop
}
}
resp
:=
result
.
resp
defer
func
()
{
if
resp
!=
nil
&&
resp
.
Body
!=
nil
{
_
=
resp
.
Body
.
Close
()
...
...
@@ -1525,8 +1375,6 @@ urlFallbackLoop:
goto
handleSuccess
}
s
.
handleUpstreamError
(
ctx
,
prefix
,
account
,
resp
.
StatusCode
,
resp
.
Header
,
respBody
,
quotaScope
)
requestID
:=
resp
.
Header
.
Get
(
"x-request-id"
)
if
requestID
!=
""
{
c
.
Header
(
"x-request-id"
,
requestID
)
...
...
@@ -1537,6 +1385,7 @@ urlFallbackLoop:
if
unwrapErr
!=
nil
||
len
(
unwrappedForOps
)
==
0
{
unwrappedForOps
=
respBody
}
s
.
handleUpstreamError
(
ctx
,
prefix
,
account
,
resp
.
StatusCode
,
resp
.
Header
,
respBody
,
quotaScope
)
upstreamMsg
:=
strings
.
TrimSpace
(
extractAntigravityErrorMessage
(
unwrappedForOps
))
upstreamMsg
=
sanitizeUpstreamErrorMessage
(
upstreamMsg
)
...
...
@@ -1581,6 +1430,7 @@ urlFallbackLoop:
Message
:
upstreamMsg
,
Detail
:
upstreamDetail
,
})
log
.
Printf
(
"[antigravity-Forward] upstream error status=%d body=%s"
,
resp
.
StatusCode
,
truncateForLog
(
unwrappedForOps
,
500
))
c
.
Data
(
resp
.
StatusCode
,
contentType
,
unwrappedForOps
)
return
nil
,
fmt
.
Errorf
(
"antigravity upstream error: %d"
,
resp
.
StatusCode
)
}
...
...
@@ -1637,15 +1487,6 @@ handleSuccess:
},
nil
}
func
(
s
*
AntigravityGatewayService
)
shouldRetryUpstreamError
(
statusCode
int
)
bool
{
switch
statusCode
{
case
429
,
500
,
502
,
503
,
504
,
529
:
return
true
default
:
return
false
}
}
func
(
s
*
AntigravityGatewayService
)
shouldFailoverUpstreamError
(
statusCode
int
)
bool
{
switch
statusCode
{
case
401
,
403
,
429
,
529
:
...
...
@@ -1679,34 +1520,49 @@ func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool {
}
}
func
antigravityUseScopeRateLimit
()
bool
{
v
:=
strings
.
ToLower
(
strings
.
TrimSpace
(
os
.
Getenv
(
antigravityScopeRateLimitEnv
)))
return
v
==
"1"
||
v
==
"true"
||
v
==
"yes"
||
v
==
"on"
}
func
(
s
*
AntigravityGatewayService
)
handleUpstreamError
(
ctx
context
.
Context
,
prefix
string
,
account
*
Account
,
statusCode
int
,
headers
http
.
Header
,
body
[]
byte
,
quotaScope
AntigravityQuotaScope
)
{
// 429 使用 Gemini 格式解析(从 body 解析重置时间)
if
statusCode
==
429
{
useScopeLimit
:=
antigravityUseScopeRateLimit
()
&&
quotaScope
!=
""
resetAt
:=
ParseGeminiRateLimitResetTime
(
body
)
if
resetAt
==
nil
{
// 解析失败:
Gemini 有重试时间用 5 分钟,Claude 没有用 1 分钟
defaultDur
:=
1
*
time
.
Minute
if
bytes
.
Contains
(
body
,
[]
byte
(
"Please retry in"
))
||
bytes
.
Contains
(
body
,
[]
byte
(
"retryDelay"
))
{
defaultDur
=
5
*
time
.
Minute
// 解析失败:
使用配置的 fallback 时间,直接限流整个账户
fallbackMinutes
:=
5
if
s
.
settingService
!=
nil
&&
s
.
settingService
.
cfg
!=
nil
&&
s
.
settingService
.
cfg
.
Gateway
.
AntigravityFallbackCooldownMinutes
>
0
{
fallbackMinutes
=
s
.
settingService
.
cfg
.
Gateway
.
AntigravityFallbackCooldown
Minute
s
}
defaultDur
:=
time
.
Duration
(
fallbackMinutes
)
*
time
.
Minute
ra
:=
time
.
Now
()
.
Add
(
defaultDur
)
if
useScopeLimit
{
log
.
Printf
(
"%s status=429 rate_limited scope=%s reset_in=%v (fallback)"
,
prefix
,
quotaScope
,
defaultDur
)
if
quotaScope
==
""
{
return
}
if
err
:=
s
.
accountRepo
.
SetAntigravityQuotaScopeLimit
(
ctx
,
account
.
ID
,
quotaScope
,
ra
);
err
!=
nil
{
log
.
Printf
(
"%s status=429 rate_limit_set_failed scope=%s error=%v"
,
prefix
,
quotaScope
,
err
)
}
}
else
{
log
.
Printf
(
"%s status=429 rate_limited account=%d reset_in=%v (fallback)"
,
prefix
,
account
.
ID
,
defaultDur
)
if
err
:=
s
.
accountRepo
.
SetRateLimited
(
ctx
,
account
.
ID
,
ra
);
err
!=
nil
{
log
.
Printf
(
"%s status=429 rate_limit_set_failed account=%d error=%v"
,
prefix
,
account
.
ID
,
err
)
}
}
return
}
resetTime
:=
time
.
Unix
(
*
resetAt
,
0
)
if
useScopeLimit
{
log
.
Printf
(
"%s status=429 rate_limited scope=%s reset_at=%v reset_in=%v"
,
prefix
,
quotaScope
,
resetTime
.
Format
(
"15:04:05"
),
time
.
Until
(
resetTime
)
.
Truncate
(
time
.
Second
))
if
quotaScope
==
""
{
return
}
if
err
:=
s
.
accountRepo
.
SetAntigravityQuotaScopeLimit
(
ctx
,
account
.
ID
,
quotaScope
,
resetTime
);
err
!=
nil
{
log
.
Printf
(
"%s status=429 rate_limit_set_failed scope=%s error=%v"
,
prefix
,
quotaScope
,
err
)
}
}
else
{
log
.
Printf
(
"%s status=429 rate_limited account=%d reset_at=%v reset_in=%v"
,
prefix
,
account
.
ID
,
resetTime
.
Format
(
"15:04:05"
),
time
.
Until
(
resetTime
)
.
Truncate
(
time
.
Second
))
if
err
:=
s
.
accountRepo
.
SetRateLimited
(
ctx
,
account
.
ID
,
resetTime
);
err
!=
nil
{
log
.
Printf
(
"%s status=429 rate_limit_set_failed account=%d error=%v"
,
prefix
,
account
.
ID
,
err
)
}
}
return
}
// 其他错误码继续使用 rateLimitService
...
...
@@ -1884,7 +1740,7 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context
}
// handleGeminiStreamToNonStreaming 读取上游流式响应,合并为非流式响应返回给客户端
// Gemini 流式响应
中每个 chunk 都包含累积的完整文本,只需保留最后一个有效响应
// Gemini 流式响应
是增量的,需要累积所有 chunk 的内容
func
(
s
*
AntigravityGatewayService
)
handleGeminiStreamToNonStreaming
(
c
*
gin
.
Context
,
resp
*
http
.
Response
,
startTime
time
.
Time
)
(
*
antigravityStreamResult
,
error
)
{
scanner
:=
bufio
.
NewScanner
(
resp
.
Body
)
maxLineSize
:=
defaultMaxLineSize
...
...
@@ -1897,6 +1753,8 @@ func (s *AntigravityGatewayService) handleGeminiStreamToNonStreaming(c *gin.Cont
var
firstTokenMs
*
int
var
last
map
[
string
]
any
var
lastWithParts
map
[
string
]
any
var
collectedImageParts
[]
map
[
string
]
any
// 收集所有包含图片的 parts
var
collectedTextParts
[]
string
// 收集所有文本片段
type
scanEvent
struct
{
line
string
...
...
@@ -1999,6 +1857,16 @@ func (s *AntigravityGatewayService) handleGeminiStreamToNonStreaming(c *gin.Cont
// 保留最后一个有 parts 的响应
if
parts
:=
extractGeminiParts
(
parsed
);
len
(
parts
)
>
0
{
lastWithParts
=
parsed
// 收集包含图片和文本的 parts
for
_
,
part
:=
range
parts
{
if
inlineData
,
ok
:=
part
[
"inlineData"
]
.
(
map
[
string
]
any
);
ok
{
collectedImageParts
=
append
(
collectedImageParts
,
part
)
_
=
inlineData
// 避免 unused 警告
}
if
text
,
ok
:=
part
[
"text"
]
.
(
string
);
ok
&&
text
!=
""
{
collectedTextParts
=
append
(
collectedTextParts
,
text
)
}
}
}
case
<-
intervalCh
:
...
...
@@ -2020,6 +1888,16 @@ returnResponse:
log
.
Printf
(
"[antigravity-Forward] warning: empty stream response, no valid chunks received"
)
}
// 如果收集到了图片 parts,需要合并到最终响应中
if
len
(
collectedImageParts
)
>
0
{
finalResponse
=
mergeImagePartsToResponse
(
finalResponse
,
collectedImageParts
)
}
// 如果收集到了文本,需要合并到最终响应中
if
len
(
collectedTextParts
)
>
0
{
finalResponse
=
mergeTextPartsToResponse
(
finalResponse
,
collectedTextParts
)
}
respBody
,
err
:=
json
.
Marshal
(
finalResponse
)
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to marshal response: %w"
,
err
)
...
...
@@ -2029,6 +1907,115 @@ returnResponse:
return
&
antigravityStreamResult
{
usage
:
usage
,
firstTokenMs
:
firstTokenMs
},
nil
}
// getOrCreateGeminiParts 获取 Gemini 响应的 parts 结构,返回深拷贝和更新回调
func
getOrCreateGeminiParts
(
response
map
[
string
]
any
)
(
result
map
[
string
]
any
,
existingParts
[]
any
,
setParts
func
([]
any
))
{
// 深拷贝 response
result
=
make
(
map
[
string
]
any
)
for
k
,
v
:=
range
response
{
result
[
k
]
=
v
}
// 获取或创建 candidates
candidates
,
ok
:=
result
[
"candidates"
]
.
([]
any
)
if
!
ok
||
len
(
candidates
)
==
0
{
candidates
=
[]
any
{
map
[
string
]
any
{}}
}
// 获取第一个 candidate
candidate
,
ok
:=
candidates
[
0
]
.
(
map
[
string
]
any
)
if
!
ok
{
candidate
=
make
(
map
[
string
]
any
)
candidates
[
0
]
=
candidate
}
// 获取或创建 content
content
,
ok
:=
candidate
[
"content"
]
.
(
map
[
string
]
any
)
if
!
ok
{
content
=
map
[
string
]
any
{
"role"
:
"model"
}
candidate
[
"content"
]
=
content
}
// 获取现有 parts
existingParts
,
ok
=
content
[
"parts"
]
.
([]
any
)
if
!
ok
{
existingParts
=
[]
any
{}
}
// 返回更新回调
setParts
=
func
(
newParts
[]
any
)
{
content
[
"parts"
]
=
newParts
result
[
"candidates"
]
=
candidates
}
return
result
,
existingParts
,
setParts
}
// mergeImagePartsToResponse 将收集到的图片 parts 合并到 Gemini 响应中
func
mergeImagePartsToResponse
(
response
map
[
string
]
any
,
imageParts
[]
map
[
string
]
any
)
map
[
string
]
any
{
if
len
(
imageParts
)
==
0
{
return
response
}
result
,
existingParts
,
setParts
:=
getOrCreateGeminiParts
(
response
)
// 检查现有 parts 中是否已经有图片
for
_
,
p
:=
range
existingParts
{
if
pm
,
ok
:=
p
.
(
map
[
string
]
any
);
ok
{
if
_
,
hasInline
:=
pm
[
"inlineData"
];
hasInline
{
return
result
// 已有图片,不重复添加
}
}
}
// 添加收集到的图片 parts
for
_
,
imgPart
:=
range
imageParts
{
existingParts
=
append
(
existingParts
,
imgPart
)
}
setParts
(
existingParts
)
return
result
}
// mergeTextPartsToResponse 将收集到的文本合并到 Gemini 响应中
func
mergeTextPartsToResponse
(
response
map
[
string
]
any
,
textParts
[]
string
)
map
[
string
]
any
{
if
len
(
textParts
)
==
0
{
return
response
}
mergedText
:=
strings
.
Join
(
textParts
,
""
)
result
,
existingParts
,
setParts
:=
getOrCreateGeminiParts
(
response
)
// 查找并更新第一个 text part,或创建新的
newParts
:=
make
([]
any
,
0
,
len
(
existingParts
)
+
1
)
textUpdated
:=
false
for
_
,
p
:=
range
existingParts
{
pm
,
ok
:=
p
.
(
map
[
string
]
any
)
if
!
ok
{
newParts
=
append
(
newParts
,
p
)
continue
}
if
_
,
hasText
:=
pm
[
"text"
];
hasText
&&
!
textUpdated
{
// 用累积的文本替换
newPart
:=
make
(
map
[
string
]
any
)
for
k
,
v
:=
range
pm
{
newPart
[
k
]
=
v
}
newPart
[
"text"
]
=
mergedText
newParts
=
append
(
newParts
,
newPart
)
textUpdated
=
true
}
else
{
newParts
=
append
(
newParts
,
pm
)
}
}
if
!
textUpdated
{
newParts
=
append
([]
any
{
map
[
string
]
any
{
"text"
:
mergedText
}},
newParts
...
)
}
setParts
(
newParts
)
return
result
}
func
(
s
*
AntigravityGatewayService
)
writeClaudeError
(
c
*
gin
.
Context
,
status
int
,
errType
,
message
string
)
error
{
c
.
JSON
(
status
,
gin
.
H
{
"type"
:
"error"
,
...
...
backend/internal/service/antigravity_oauth_service.go
View file @
03c75787
...
...
@@ -89,6 +89,7 @@ type AntigravityTokenInfo struct {
TokenType
string
`json:"token_type"`
Email
string
`json:"email,omitempty"`
ProjectID
string
`json:"project_id,omitempty"`
ProjectIDMissing
bool
`json:"-"`
// LoadCodeAssist 未返回 project_id
}
// ExchangeCode 用 authorization code 交换 token
...
...
@@ -149,12 +150,6 @@ func (s *AntigravityOAuthService) ExchangeCode(ctx context.Context, input *Antig
result
.
ProjectID
=
loadResp
.
CloudAICompanionProject
}
// 兜底:随机生成 project_id
if
result
.
ProjectID
==
""
{
result
.
ProjectID
=
antigravity
.
GenerateMockProjectID
()
fmt
.
Printf
(
"[AntigravityOAuth] 使用随机生成的 project_id: %s
\n
"
,
result
.
ProjectID
)
}
return
result
,
nil
}
...
...
@@ -236,16 +231,24 @@ func (s *AntigravityOAuthService) RefreshAccountToken(ctx context.Context, accou
return
nil
,
err
}
// 保留原有的 project_id 和 email
existingProjectID
:=
strings
.
TrimSpace
(
account
.
GetCredential
(
"project_id"
))
if
existingProjectID
!=
""
{
tokenInfo
.
ProjectID
=
existingProjectID
}
// 保留原有的 email
existingEmail
:=
strings
.
TrimSpace
(
account
.
GetCredential
(
"email"
))
if
existingEmail
!=
""
{
tokenInfo
.
Email
=
existingEmail
}
// 每次刷新都调用 LoadCodeAssist 获取 project_id
client
:=
antigravity
.
NewClient
(
proxyURL
)
loadResp
,
_
,
err
:=
client
.
LoadCodeAssist
(
ctx
,
tokenInfo
.
AccessToken
)
if
err
!=
nil
||
loadResp
==
nil
||
loadResp
.
CloudAICompanionProject
==
""
{
// LoadCodeAssist 失败或返回空,保留原有 project_id,标记缺失
existingProjectID
:=
strings
.
TrimSpace
(
account
.
GetCredential
(
"project_id"
))
tokenInfo
.
ProjectID
=
existingProjectID
tokenInfo
.
ProjectIDMissing
=
true
}
else
{
tokenInfo
.
ProjectID
=
loadResp
.
CloudAICompanionProject
}
return
tokenInfo
,
nil
}
...
...
backend/internal/service/antigravity_quota_fetcher.go
View file @
03c75787
...
...
@@ -31,11 +31,6 @@ func (f *AntigravityQuotaFetcher) FetchQuota(ctx context.Context, account *Accou
accessToken
:=
account
.
GetCredential
(
"access_token"
)
projectID
:=
account
.
GetCredential
(
"project_id"
)
// 如果没有 project_id,生成一个随机的
if
projectID
==
""
{
projectID
=
antigravity
.
GenerateMockProjectID
()
}
client
:=
antigravity
.
NewClient
(
proxyURL
)
// 调用 API 获取配额
...
...
backend/internal/service/antigravity_rate_limit_test.go
0 → 100644
View file @
03c75787
//go:build unit
package
service
import
(
"context"
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
"github.com/stretchr/testify/require"
)
type
stubAntigravityUpstream
struct
{
firstBase
string
secondBase
string
calls
[]
string
}
func
(
s
*
stubAntigravityUpstream
)
Do
(
req
*
http
.
Request
,
proxyURL
string
,
accountID
int64
,
accountConcurrency
int
)
(
*
http
.
Response
,
error
)
{
url
:=
req
.
URL
.
String
()
s
.
calls
=
append
(
s
.
calls
,
url
)
if
strings
.
HasPrefix
(
url
,
s
.
firstBase
)
{
return
&
http
.
Response
{
StatusCode
:
http
.
StatusTooManyRequests
,
Header
:
http
.
Header
{},
Body
:
io
.
NopCloser
(
strings
.
NewReader
(
`{"error":{"message":"Resource has been exhausted"}}`
)),
},
nil
}
return
&
http
.
Response
{
StatusCode
:
http
.
StatusOK
,
Header
:
http
.
Header
{},
Body
:
io
.
NopCloser
(
strings
.
NewReader
(
"ok"
)),
},
nil
}
type
scopeLimitCall
struct
{
accountID
int64
scope
AntigravityQuotaScope
resetAt
time
.
Time
}
type
rateLimitCall
struct
{
accountID
int64
resetAt
time
.
Time
}
type
stubAntigravityAccountRepo
struct
{
AccountRepository
scopeCalls
[]
scopeLimitCall
rateCalls
[]
rateLimitCall
}
func
(
s
*
stubAntigravityAccountRepo
)
SetAntigravityQuotaScopeLimit
(
ctx
context
.
Context
,
id
int64
,
scope
AntigravityQuotaScope
,
resetAt
time
.
Time
)
error
{
s
.
scopeCalls
=
append
(
s
.
scopeCalls
,
scopeLimitCall
{
accountID
:
id
,
scope
:
scope
,
resetAt
:
resetAt
})
return
nil
}
func
(
s
*
stubAntigravityAccountRepo
)
SetRateLimited
(
ctx
context
.
Context
,
id
int64
,
resetAt
time
.
Time
)
error
{
s
.
rateCalls
=
append
(
s
.
rateCalls
,
rateLimitCall
{
accountID
:
id
,
resetAt
:
resetAt
})
return
nil
}
func
TestAntigravityRetryLoop_URLFallback_UsesLatestSuccess
(
t
*
testing
.
T
)
{
oldBaseURLs
:=
append
([]
string
(
nil
),
antigravity
.
BaseURLs
...
)
oldAvailability
:=
antigravity
.
DefaultURLAvailability
defer
func
()
{
antigravity
.
BaseURLs
=
oldBaseURLs
antigravity
.
DefaultURLAvailability
=
oldAvailability
}()
base1
:=
"https://ag-1.test"
base2
:=
"https://ag-2.test"
antigravity
.
BaseURLs
=
[]
string
{
base1
,
base2
}
antigravity
.
DefaultURLAvailability
=
antigravity
.
NewURLAvailability
(
time
.
Minute
)
upstream
:=
&
stubAntigravityUpstream
{
firstBase
:
base1
,
secondBase
:
base2
}
account
:=
&
Account
{
ID
:
1
,
Name
:
"acc-1"
,
Platform
:
PlatformAntigravity
,
Schedulable
:
true
,
Status
:
StatusActive
,
Concurrency
:
1
,
}
var
handleErrorCalled
bool
result
,
err
:=
antigravityRetryLoop
(
antigravityRetryLoopParams
{
prefix
:
"[test]"
,
ctx
:
context
.
Background
(),
account
:
account
,
proxyURL
:
""
,
accessToken
:
"token"
,
action
:
"generateContent"
,
body
:
[]
byte
(
`{"input":"test"}`
),
quotaScope
:
AntigravityQuotaScopeClaude
,
httpUpstream
:
upstream
,
handleError
:
func
(
ctx
context
.
Context
,
prefix
string
,
account
*
Account
,
statusCode
int
,
headers
http
.
Header
,
body
[]
byte
,
quotaScope
AntigravityQuotaScope
)
{
handleErrorCalled
=
true
},
})
require
.
NoError
(
t
,
err
)
require
.
NotNil
(
t
,
result
)
require
.
NotNil
(
t
,
result
.
resp
)
defer
func
()
{
_
=
result
.
resp
.
Body
.
Close
()
}()
require
.
Equal
(
t
,
http
.
StatusOK
,
result
.
resp
.
StatusCode
)
require
.
False
(
t
,
handleErrorCalled
)
require
.
Len
(
t
,
upstream
.
calls
,
2
)
require
.
True
(
t
,
strings
.
HasPrefix
(
upstream
.
calls
[
0
],
base1
))
require
.
True
(
t
,
strings
.
HasPrefix
(
upstream
.
calls
[
1
],
base2
))
available
:=
antigravity
.
DefaultURLAvailability
.
GetAvailableURLs
()
require
.
NotEmpty
(
t
,
available
)
require
.
Equal
(
t
,
base2
,
available
[
0
])
}
func
TestAntigravityHandleUpstreamError_UsesScopeLimitWhenEnabled
(
t
*
testing
.
T
)
{
t
.
Setenv
(
antigravityScopeRateLimitEnv
,
"true"
)
repo
:=
&
stubAntigravityAccountRepo
{}
svc
:=
&
AntigravityGatewayService
{
accountRepo
:
repo
}
account
:=
&
Account
{
ID
:
9
,
Name
:
"acc-9"
,
Platform
:
PlatformAntigravity
}
body
:=
buildGeminiRateLimitBody
(
"3s"
)
svc
.
handleUpstreamError
(
context
.
Background
(),
"[test]"
,
account
,
http
.
StatusTooManyRequests
,
http
.
Header
{},
body
,
AntigravityQuotaScopeClaude
)
require
.
Len
(
t
,
repo
.
scopeCalls
,
1
)
require
.
Empty
(
t
,
repo
.
rateCalls
)
call
:=
repo
.
scopeCalls
[
0
]
require
.
Equal
(
t
,
account
.
ID
,
call
.
accountID
)
require
.
Equal
(
t
,
AntigravityQuotaScopeClaude
,
call
.
scope
)
require
.
WithinDuration
(
t
,
time
.
Now
()
.
Add
(
3
*
time
.
Second
),
call
.
resetAt
,
2
*
time
.
Second
)
}
func
TestAntigravityHandleUpstreamError_UsesAccountLimitWhenScopeDisabled
(
t
*
testing
.
T
)
{
t
.
Setenv
(
antigravityScopeRateLimitEnv
,
"false"
)
repo
:=
&
stubAntigravityAccountRepo
{}
svc
:=
&
AntigravityGatewayService
{
accountRepo
:
repo
}
account
:=
&
Account
{
ID
:
10
,
Name
:
"acc-10"
,
Platform
:
PlatformAntigravity
}
body
:=
buildGeminiRateLimitBody
(
"2s"
)
svc
.
handleUpstreamError
(
context
.
Background
(),
"[test]"
,
account
,
http
.
StatusTooManyRequests
,
http
.
Header
{},
body
,
AntigravityQuotaScopeClaude
)
require
.
Len
(
t
,
repo
.
rateCalls
,
1
)
require
.
Empty
(
t
,
repo
.
scopeCalls
)
call
:=
repo
.
rateCalls
[
0
]
require
.
Equal
(
t
,
account
.
ID
,
call
.
accountID
)
require
.
WithinDuration
(
t
,
time
.
Now
()
.
Add
(
2
*
time
.
Second
),
call
.
resetAt
,
2
*
time
.
Second
)
}
func
TestAccountIsSchedulableForModel_AntigravityRateLimits
(
t
*
testing
.
T
)
{
now
:=
time
.
Now
()
future
:=
now
.
Add
(
10
*
time
.
Minute
)
account
:=
&
Account
{
ID
:
1
,
Name
:
"acc"
,
Platform
:
PlatformAntigravity
,
Status
:
StatusActive
,
Schedulable
:
true
,
}
account
.
RateLimitResetAt
=
&
future
require
.
False
(
t
,
account
.
IsSchedulableForModel
(
"claude-sonnet-4-5"
))
require
.
False
(
t
,
account
.
IsSchedulableForModel
(
"gemini-3-flash"
))
account
.
RateLimitResetAt
=
nil
account
.
Extra
=
map
[
string
]
any
{
antigravityQuotaScopesKey
:
map
[
string
]
any
{
"claude"
:
map
[
string
]
any
{
"rate_limit_reset_at"
:
future
.
Format
(
time
.
RFC3339
),
},
},
}
require
.
False
(
t
,
account
.
IsSchedulableForModel
(
"claude-sonnet-4-5"
))
require
.
True
(
t
,
account
.
IsSchedulableForModel
(
"gemini-3-flash"
))
}
func
buildGeminiRateLimitBody
(
delay
string
)
[]
byte
{
return
[]
byte
(
fmt
.
Sprintf
(
`{"error":{"message":"too many requests","details":[{"metadata":{"quotaResetDelay":%q}}]}}`
,
delay
))
}
Prev
1
2
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment