Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
陈曦
sub2api
Commits
be4e49e6
Unverified
Commit
be4e49e6
authored
Mar 08, 2026
by
神乐
Committed by
GitHub
Mar 08, 2026
Browse files
Merge branch 'Wei-Shaw:main' into fix/openai-ws-usage-refresh
parents
1307d604
03bf3485
Changes
22
Show whitespace changes
Inline
Side-by-side
.github/workflows/backend-ci.yml
View file @
be4e49e6
...
@@ -42,6 +42,6 @@ jobs:
...
@@ -42,6 +42,6 @@ jobs:
-
name
:
golangci-lint
-
name
:
golangci-lint
uses
:
golangci/golangci-lint-action@v9
uses
:
golangci/golangci-lint-action@v9
with
:
with
:
version
:
v2.
11
version
:
v2.
9
args
:
--timeout=30m
args
:
--timeout=30m
working-directory
:
backend
working-directory
:
backend
\ No newline at end of file
backend/cmd/server/wire_gen.go
View file @
be4e49e6
...
@@ -162,7 +162,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
...
@@ -162,7 +162,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
deferredService
:=
service
.
ProvideDeferredService
(
accountRepository
,
timingWheelService
)
deferredService
:=
service
.
ProvideDeferredService
(
accountRepository
,
timingWheelService
)
claudeTokenProvider
:=
service
.
NewClaudeTokenProvider
(
accountRepository
,
geminiTokenCache
,
oAuthService
)
claudeTokenProvider
:=
service
.
NewClaudeTokenProvider
(
accountRepository
,
geminiTokenCache
,
oAuthService
)
digestSessionStore
:=
service
.
NewDigestSessionStore
()
digestSessionStore
:=
service
.
NewDigestSessionStore
()
gatewayService
:=
service
.
NewGatewayService
(
accountRepository
,
groupRepository
,
usageLogRepository
,
userRepository
,
userSubscriptionRepository
,
userGroupRateRepository
,
gatewayCache
,
configConfig
,
schedulerSnapshotService
,
concurrencyService
,
billingService
,
rateLimitService
,
billingCacheService
,
identityService
,
httpUpstream
,
deferredService
,
claudeTokenProvider
,
sessionLimitCache
,
rpmCache
,
digestSessionStore
)
gatewayService
:=
service
.
NewGatewayService
(
accountRepository
,
groupRepository
,
usageLogRepository
,
userRepository
,
userSubscriptionRepository
,
userGroupRateRepository
,
gatewayCache
,
configConfig
,
schedulerSnapshotService
,
concurrencyService
,
billingService
,
rateLimitService
,
billingCacheService
,
identityService
,
httpUpstream
,
deferredService
,
claudeTokenProvider
,
sessionLimitCache
,
rpmCache
,
digestSessionStore
,
settingService
)
openAITokenProvider
:=
service
.
NewOpenAITokenProvider
(
accountRepository
,
geminiTokenCache
,
openAIOAuthService
)
openAITokenProvider
:=
service
.
NewOpenAITokenProvider
(
accountRepository
,
geminiTokenCache
,
openAIOAuthService
)
openAIGatewayService
:=
service
.
NewOpenAIGatewayService
(
accountRepository
,
usageLogRepository
,
userRepository
,
userSubscriptionRepository
,
userGroupRateRepository
,
gatewayCache
,
configConfig
,
schedulerSnapshotService
,
concurrencyService
,
billingService
,
rateLimitService
,
billingCacheService
,
httpUpstream
,
deferredService
,
openAITokenProvider
)
openAIGatewayService
:=
service
.
NewOpenAIGatewayService
(
accountRepository
,
usageLogRepository
,
userRepository
,
userSubscriptionRepository
,
userGroupRateRepository
,
gatewayCache
,
configConfig
,
schedulerSnapshotService
,
concurrencyService
,
billingService
,
rateLimitService
,
billingCacheService
,
httpUpstream
,
deferredService
,
openAITokenProvider
)
geminiMessagesCompatService
:=
service
.
NewGeminiMessagesCompatService
(
accountRepository
,
groupRepository
,
gatewayCache
,
schedulerSnapshotService
,
geminiTokenProvider
,
rateLimitService
,
httpUpstream
,
antigravityGatewayService
,
configConfig
)
geminiMessagesCompatService
:=
service
.
NewGeminiMessagesCompatService
(
accountRepository
,
groupRepository
,
gatewayCache
,
schedulerSnapshotService
,
geminiTokenProvider
,
rateLimitService
,
httpUpstream
,
antigravityGatewayService
,
configConfig
)
...
...
backend/internal/handler/admin/setting_handler.go
View file @
be4e49e6
...
@@ -1348,6 +1348,63 @@ func (h *SettingHandler) TestSoraS3Connection(c *gin.Context) {
...
@@ -1348,6 +1348,63 @@ func (h *SettingHandler) TestSoraS3Connection(c *gin.Context) {
response
.
Success
(
c
,
gin
.
H
{
"message"
:
"S3 连接成功"
})
response
.
Success
(
c
,
gin
.
H
{
"message"
:
"S3 连接成功"
})
}
}
// GetRectifierSettings 获取请求整流器配置
// GET /api/v1/admin/settings/rectifier
func
(
h
*
SettingHandler
)
GetRectifierSettings
(
c
*
gin
.
Context
)
{
settings
,
err
:=
h
.
settingService
.
GetRectifierSettings
(
c
.
Request
.
Context
())
if
err
!=
nil
{
response
.
ErrorFrom
(
c
,
err
)
return
}
response
.
Success
(
c
,
dto
.
RectifierSettings
{
Enabled
:
settings
.
Enabled
,
ThinkingSignatureEnabled
:
settings
.
ThinkingSignatureEnabled
,
ThinkingBudgetEnabled
:
settings
.
ThinkingBudgetEnabled
,
})
}
// UpdateRectifierSettingsRequest 更新整流器配置请求
type
UpdateRectifierSettingsRequest
struct
{
Enabled
bool
`json:"enabled"`
ThinkingSignatureEnabled
bool
`json:"thinking_signature_enabled"`
ThinkingBudgetEnabled
bool
`json:"thinking_budget_enabled"`
}
// UpdateRectifierSettings 更新请求整流器配置
// PUT /api/v1/admin/settings/rectifier
func
(
h
*
SettingHandler
)
UpdateRectifierSettings
(
c
*
gin
.
Context
)
{
var
req
UpdateRectifierSettingsRequest
if
err
:=
c
.
ShouldBindJSON
(
&
req
);
err
!=
nil
{
response
.
BadRequest
(
c
,
"Invalid request: "
+
err
.
Error
())
return
}
settings
:=
&
service
.
RectifierSettings
{
Enabled
:
req
.
Enabled
,
ThinkingSignatureEnabled
:
req
.
ThinkingSignatureEnabled
,
ThinkingBudgetEnabled
:
req
.
ThinkingBudgetEnabled
,
}
if
err
:=
h
.
settingService
.
SetRectifierSettings
(
c
.
Request
.
Context
(),
settings
);
err
!=
nil
{
response
.
BadRequest
(
c
,
err
.
Error
())
return
}
// 重新获取设置返回
updatedSettings
,
err
:=
h
.
settingService
.
GetRectifierSettings
(
c
.
Request
.
Context
())
if
err
!=
nil
{
response
.
ErrorFrom
(
c
,
err
)
return
}
response
.
Success
(
c
,
dto
.
RectifierSettings
{
Enabled
:
updatedSettings
.
Enabled
,
ThinkingSignatureEnabled
:
updatedSettings
.
ThinkingSignatureEnabled
,
ThinkingBudgetEnabled
:
updatedSettings
.
ThinkingBudgetEnabled
,
})
}
// UpdateStreamTimeoutSettingsRequest 更新流超时配置请求
// UpdateStreamTimeoutSettingsRequest 更新流超时配置请求
type
UpdateStreamTimeoutSettingsRequest
struct
{
type
UpdateStreamTimeoutSettingsRequest
struct
{
Enabled
bool
`json:"enabled"`
Enabled
bool
`json:"enabled"`
...
...
backend/internal/handler/dto/mappers.go
View file @
be4e49e6
backend/internal/handler/dto/settings.go
View file @
be4e49e6
...
@@ -161,6 +161,13 @@ type StreamTimeoutSettings struct {
...
@@ -161,6 +161,13 @@ type StreamTimeoutSettings struct {
ThresholdWindowMinutes
int
`json:"threshold_window_minutes"`
ThresholdWindowMinutes
int
`json:"threshold_window_minutes"`
}
}
// RectifierSettings 请求整流器配置 DTO
type
RectifierSettings
struct
{
Enabled
bool
`json:"enabled"`
ThinkingSignatureEnabled
bool
`json:"thinking_signature_enabled"`
ThinkingBudgetEnabled
bool
`json:"thinking_budget_enabled"`
}
// ParseCustomMenuItems parses a JSON string into a slice of CustomMenuItem.
// ParseCustomMenuItems parses a JSON string into a slice of CustomMenuItem.
// Returns empty slice on empty/invalid input.
// Returns empty slice on empty/invalid input.
func
ParseCustomMenuItems
(
raw
string
)
[]
CustomMenuItem
{
func
ParseCustomMenuItems
(
raw
string
)
[]
CustomMenuItem
{
...
...
backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go
View file @
be4e49e6
...
@@ -155,6 +155,7 @@ func newTestGatewayHandler(t *testing.T, group *service.Group, accounts []*servi
...
@@ -155,6 +155,7 @@ func newTestGatewayHandler(t *testing.T, group *service.Group, accounts []*servi
nil
,
// sessionLimitCache
nil
,
// sessionLimitCache
nil
,
// rpmCache
nil
,
// rpmCache
nil
,
// digestStore
nil
,
// digestStore
nil
,
// settingService
)
)
// RunModeSimple:跳过计费检查,避免引入 repo/cache 依赖。
// RunModeSimple:跳过计费检查,避免引入 repo/cache 依赖。
...
...
backend/internal/handler/sora_client_handler_test.go
View file @
be4e49e6
...
@@ -2207,7 +2207,7 @@ func (s *stubSoraClientForHandler) GetVideoTask(_ context.Context, _ *service.Ac
...
@@ -2207,7 +2207,7 @@ func (s *stubSoraClientForHandler) GetVideoTask(_ context.Context, _ *service.Ac
func
newMinimalGatewayService
(
accountRepo
service
.
AccountRepository
)
*
service
.
GatewayService
{
func
newMinimalGatewayService
(
accountRepo
service
.
AccountRepository
)
*
service
.
GatewayService
{
return
service
.
NewGatewayService
(
return
service
.
NewGatewayService
(
accountRepo
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
accountRepo
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
nil
,
)
)
}
}
...
...
backend/internal/handler/sora_gateway_handler_test.go
View file @
be4e49e6
...
@@ -445,6 +445,7 @@ func TestSoraGatewayHandler_ChatCompletions(t *testing.T) {
...
@@ -445,6 +445,7 @@ func TestSoraGatewayHandler_ChatCompletions(t *testing.T) {
testutil
.
StubSessionLimitCache
{},
testutil
.
StubSessionLimitCache
{},
nil
,
// rpmCache
nil
,
// rpmCache
nil
,
// digestStore
nil
,
// digestStore
nil
,
// settingService
)
)
soraClient
:=
&
stubSoraClient
{
imageURLs
:
[]
string
{
"https://example.com/a.png"
}}
soraClient
:=
&
stubSoraClient
{
imageURLs
:
[]
string
{
"https://example.com/a.png"
}}
...
...
backend/internal/pkg/apicompat/types.go
View file @
be4e49e6
backend/internal/repository/api_key_repo.go
View file @
be4e49e6
backend/internal/server/routes/admin.go
View file @
be4e49e6
...
@@ -392,6 +392,9 @@ func registerSettingsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
...
@@ -392,6 +392,9 @@ func registerSettingsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
// 流超时处理配置
// 流超时处理配置
adminSettings
.
GET
(
"/stream-timeout"
,
h
.
Admin
.
Setting
.
GetStreamTimeoutSettings
)
adminSettings
.
GET
(
"/stream-timeout"
,
h
.
Admin
.
Setting
.
GetStreamTimeoutSettings
)
adminSettings
.
PUT
(
"/stream-timeout"
,
h
.
Admin
.
Setting
.
UpdateStreamTimeoutSettings
)
adminSettings
.
PUT
(
"/stream-timeout"
,
h
.
Admin
.
Setting
.
UpdateStreamTimeoutSettings
)
// 请求整流器配置
adminSettings
.
GET
(
"/rectifier"
,
h
.
Admin
.
Setting
.
GetRectifierSettings
)
adminSettings
.
PUT
(
"/rectifier"
,
h
.
Admin
.
Setting
.
UpdateRectifierSettings
)
// Sora S3 存储配置
// Sora S3 存储配置
adminSettings
.
GET
(
"/sora-s3"
,
h
.
Admin
.
Setting
.
GetSoraS3Settings
)
adminSettings
.
GET
(
"/sora-s3"
,
h
.
Admin
.
Setting
.
GetSoraS3Settings
)
adminSettings
.
PUT
(
"/sora-s3"
,
h
.
Admin
.
Setting
.
UpdateSoraS3Settings
)
adminSettings
.
PUT
(
"/sora-s3"
,
h
.
Admin
.
Setting
.
UpdateSoraS3Settings
)
...
...
backend/internal/service/antigravity_gateway_service.go
View file @
be4e49e6
...
@@ -1384,7 +1384,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
...
@@ -1384,7 +1384,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
// 优先检测 thinking block 的 signature 相关错误(400)并重试一次:
// 优先检测 thinking block 的 signature 相关错误(400)并重试一次:
// Antigravity /v1internal 链路在部分场景会对 thought/thinking signature 做严格校验,
// Antigravity /v1internal 链路在部分场景会对 thought/thinking signature 做严格校验,
// 当历史消息携带的 signature 不合法时会直接 400;去除 thinking 后可继续完成请求。
// 当历史消息携带的 signature 不合法时会直接 400;去除 thinking 后可继续完成请求。
if
resp
.
StatusCode
==
http
.
StatusBadRequest
&&
isSignatureRelatedError
(
respBody
)
{
if
resp
.
StatusCode
==
http
.
StatusBadRequest
&&
isSignatureRelatedError
(
respBody
)
&&
s
.
settingService
.
IsSignatureRectifierEnabled
(
ctx
)
{
upstreamMsg
:=
strings
.
TrimSpace
(
extractAntigravityErrorMessage
(
respBody
))
upstreamMsg
:=
strings
.
TrimSpace
(
extractAntigravityErrorMessage
(
respBody
))
upstreamMsg
=
sanitizeUpstreamErrorMessage
(
upstreamMsg
)
upstreamMsg
=
sanitizeUpstreamErrorMessage
(
upstreamMsg
)
logBody
,
maxBytes
:=
s
.
getLogConfig
()
logBody
,
maxBytes
:=
s
.
getLogConfig
()
...
@@ -1517,6 +1517,80 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
...
@@ -1517,6 +1517,80 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
}
}
}
}
// Budget 整流:检测 budget_tokens 约束错误并自动修正重试
if
resp
.
StatusCode
==
http
.
StatusBadRequest
&&
respBody
!=
nil
&&
!
isSignatureRelatedError
(
respBody
)
{
errMsg
:=
strings
.
TrimSpace
(
extractAntigravityErrorMessage
(
respBody
))
if
isThinkingBudgetConstraintError
(
errMsg
)
&&
s
.
settingService
.
IsBudgetRectifierEnabled
(
ctx
)
{
appendOpsUpstreamError
(
c
,
OpsUpstreamErrorEvent
{
Platform
:
account
.
Platform
,
AccountID
:
account
.
ID
,
AccountName
:
account
.
Name
,
UpstreamStatusCode
:
resp
.
StatusCode
,
UpstreamRequestID
:
resp
.
Header
.
Get
(
"x-request-id"
),
Kind
:
"budget_constraint_error"
,
Message
:
errMsg
,
Detail
:
s
.
getUpstreamErrorDetail
(
respBody
),
})
// 修正 claudeReq 的 thinking 参数(adaptive 模式不修正)
if
claudeReq
.
Thinking
==
nil
||
claudeReq
.
Thinking
.
Type
!=
"adaptive"
{
retryClaudeReq
:=
claudeReq
retryClaudeReq
.
Messages
=
append
([]
antigravity
.
ClaudeMessage
(
nil
),
claudeReq
.
Messages
...
)
// 创建新的 ThinkingConfig 避免修改原始 claudeReq.Thinking 指针
retryClaudeReq
.
Thinking
=
&
antigravity
.
ThinkingConfig
{
Type
:
"enabled"
,
BudgetTokens
:
BudgetRectifyBudgetTokens
,
}
if
retryClaudeReq
.
MaxTokens
<
BudgetRectifyMinMaxTokens
{
retryClaudeReq
.
MaxTokens
=
BudgetRectifyMaxTokens
}
logger
.
LegacyPrintf
(
"service.antigravity_gateway"
,
"Antigravity account %d: detected budget_tokens constraint error, retrying with rectified budget (budget_tokens=%d, max_tokens=%d)"
,
account
.
ID
,
BudgetRectifyBudgetTokens
,
BudgetRectifyMaxTokens
)
retryGeminiBody
,
txErr
:=
antigravity
.
TransformClaudeToGeminiWithOptions
(
&
retryClaudeReq
,
projectID
,
mappedModel
,
transformOpts
)
if
txErr
==
nil
{
retryResult
,
retryErr
:=
s
.
antigravityRetryLoop
(
antigravityRetryLoopParams
{
ctx
:
ctx
,
prefix
:
prefix
,
account
:
account
,
proxyURL
:
proxyURL
,
accessToken
:
accessToken
,
action
:
action
,
body
:
retryGeminiBody
,
c
:
c
,
httpUpstream
:
s
.
httpUpstream
,
settingService
:
s
.
settingService
,
accountRepo
:
s
.
accountRepo
,
handleError
:
s
.
handleUpstreamError
,
requestedModel
:
originalModel
,
isStickySession
:
isStickySession
,
groupID
:
0
,
sessionHash
:
""
,
})
if
retryErr
==
nil
{
retryResp
:=
retryResult
.
resp
if
retryResp
.
StatusCode
<
400
{
_
=
resp
.
Body
.
Close
()
resp
=
retryResp
respBody
=
nil
}
else
{
retryBody
,
_
:=
io
.
ReadAll
(
io
.
LimitReader
(
retryResp
.
Body
,
2
<<
20
))
_
=
retryResp
.
Body
.
Close
()
respBody
=
retryBody
resp
=
&
http
.
Response
{
StatusCode
:
retryResp
.
StatusCode
,
Header
:
retryResp
.
Header
.
Clone
(),
Body
:
io
.
NopCloser
(
bytes
.
NewReader
(
retryBody
)),
}
}
}
else
{
logger
.
LegacyPrintf
(
"service.antigravity_gateway"
,
"Antigravity account %d: budget rectifier retry failed: %v"
,
account
.
ID
,
retryErr
)
}
}
}
}
}
// 处理错误响应(重试后仍失败或不触发重试)
// 处理错误响应(重试后仍失败或不触发重试)
if
resp
.
StatusCode
>=
400
{
if
resp
.
StatusCode
>=
400
{
// 检测 prompt too long 错误,返回特殊错误类型供上层 fallback
// 检测 prompt too long 错误,返回特殊错误类型供上层 fallback
...
...
backend/internal/service/api_key_rate_limit_test.go
View file @
be4e49e6
backend/internal/service/domain_constants.go
View file @
be4e49e6
...
@@ -175,6 +175,13 @@ const (
...
@@ -175,6 +175,13 @@ const (
// SettingKeyStreamTimeoutSettings stores JSON config for stream timeout handling.
// SettingKeyStreamTimeoutSettings stores JSON config for stream timeout handling.
SettingKeyStreamTimeoutSettings
=
"stream_timeout_settings"
SettingKeyStreamTimeoutSettings
=
"stream_timeout_settings"
// =========================
// Request Rectifier (请求整流器)
// =========================
// SettingKeyRectifierSettings stores JSON config for rectifier settings (thinking signature + budget).
SettingKeyRectifierSettings
=
"rectifier_settings"
// =========================
// =========================
// Sora S3 存储配置
// Sora S3 存储配置
// =========================
// =========================
...
...
backend/internal/service/gateway_request.go
View file @
be4e49e6
...
@@ -5,6 +5,7 @@ import (
...
@@ -5,6 +5,7 @@ import (
"encoding/json"
"encoding/json"
"fmt"
"fmt"
"math"
"math"
"strings"
"unsafe"
"unsafe"
"github.com/Wei-Shaw/sub2api/internal/domain"
"github.com/Wei-Shaw/sub2api/internal/domain"
...
@@ -675,3 +676,90 @@ func filterThinkingBlocksInternal(body []byte, _ bool) []byte {
...
@@ -675,3 +676,90 @@ func filterThinkingBlocksInternal(body []byte, _ bool) []byte {
}
}
return
newBody
return
newBody
}
}
// =========================
// Thinking Budget Rectifier
// =========================
const
(
// BudgetRectifyBudgetTokens is the budget_tokens value to set when rectifying.
BudgetRectifyBudgetTokens
=
32000
// BudgetRectifyMaxTokens is the max_tokens value to set when rectifying.
BudgetRectifyMaxTokens
=
64000
// BudgetRectifyMinMaxTokens is the minimum max_tokens that must exceed budget_tokens.
BudgetRectifyMinMaxTokens
=
32001
)
// isThinkingBudgetConstraintError detects whether an upstream error message indicates
// a budget_tokens constraint violation (e.g. "budget_tokens >= 1024").
// Matches three conditions (all must be true):
// 1. Contains "budget_tokens" or "budget tokens"
// 2. Contains "thinking"
// 3. Contains ">= 1024" or "greater than or equal to 1024" or ("1024" + "input should be")
func
isThinkingBudgetConstraintError
(
errMsg
string
)
bool
{
m
:=
strings
.
ToLower
(
errMsg
)
// Condition 1: budget_tokens or budget tokens
hasBudget
:=
strings
.
Contains
(
m
,
"budget_tokens"
)
||
strings
.
Contains
(
m
,
"budget tokens"
)
if
!
hasBudget
{
return
false
}
// Condition 2: thinking
if
!
strings
.
Contains
(
m
,
"thinking"
)
{
return
false
}
// Condition 3: constraint indicator
if
strings
.
Contains
(
m
,
">= 1024"
)
||
strings
.
Contains
(
m
,
"greater than or equal to 1024"
)
{
return
true
}
if
strings
.
Contains
(
m
,
"1024"
)
&&
strings
.
Contains
(
m
,
"input should be"
)
{
return
true
}
return
false
}
// RectifyThinkingBudget modifies the request body to fix budget_tokens constraint errors.
// It sets thinking.budget_tokens = 32000, thinking.type = "enabled" (unless adaptive),
// and ensures max_tokens >= 32001.
// Returns (modified body, true) if changes were applied, or (original body, false) if not.
func
RectifyThinkingBudget
(
body
[]
byte
)
([]
byte
,
bool
)
{
// If thinking type is "adaptive", skip rectification entirely
thinkingType
:=
gjson
.
GetBytes
(
body
,
"thinking.type"
)
.
String
()
if
thinkingType
==
"adaptive"
{
return
body
,
false
}
modified
:=
body
changed
:=
false
// Set thinking.type = "enabled"
if
thinkingType
!=
"enabled"
{
if
result
,
err
:=
sjson
.
SetBytes
(
modified
,
"thinking.type"
,
"enabled"
);
err
==
nil
{
modified
=
result
changed
=
true
}
}
// Set thinking.budget_tokens = 32000
currentBudget
:=
gjson
.
GetBytes
(
modified
,
"thinking.budget_tokens"
)
.
Int
()
if
currentBudget
!=
BudgetRectifyBudgetTokens
{
if
result
,
err
:=
sjson
.
SetBytes
(
modified
,
"thinking.budget_tokens"
,
BudgetRectifyBudgetTokens
);
err
==
nil
{
modified
=
result
changed
=
true
}
}
// Ensure max_tokens >= BudgetRectifyMinMaxTokens
maxTokens
:=
gjson
.
GetBytes
(
modified
,
"max_tokens"
)
.
Int
()
if
maxTokens
<
int64
(
BudgetRectifyMinMaxTokens
)
{
if
result
,
err
:=
sjson
.
SetBytes
(
modified
,
"max_tokens"
,
BudgetRectifyMaxTokens
);
err
==
nil
{
modified
=
result
changed
=
true
}
}
return
modified
,
changed
}
backend/internal/service/gateway_service.go
View file @
be4e49e6
...
@@ -526,6 +526,7 @@ type GatewayService struct {
...
@@ -526,6 +526,7 @@ type GatewayService struct {
userGroupRateSF
singleflight
.
Group
userGroupRateSF
singleflight
.
Group
modelsListCache
*
gocache
.
Cache
modelsListCache
*
gocache
.
Cache
modelsListCacheTTL
time
.
Duration
modelsListCacheTTL
time
.
Duration
settingService
*
SettingService
responseHeaderFilter
*
responseheaders
.
CompiledHeaderFilter
responseHeaderFilter
*
responseheaders
.
CompiledHeaderFilter
debugModelRouting
atomic
.
Bool
debugModelRouting
atomic
.
Bool
debugClaudeMimic
atomic
.
Bool
debugClaudeMimic
atomic
.
Bool
...
@@ -553,6 +554,7 @@ func NewGatewayService(
...
@@ -553,6 +554,7 @@ func NewGatewayService(
sessionLimitCache
SessionLimitCache
,
sessionLimitCache
SessionLimitCache
,
rpmCache
RPMCache
,
rpmCache
RPMCache
,
digestStore
*
DigestSessionStore
,
digestStore
*
DigestSessionStore
,
settingService
*
SettingService
,
)
*
GatewayService
{
)
*
GatewayService
{
userGroupRateTTL
:=
resolveUserGroupRateCacheTTL
(
cfg
)
userGroupRateTTL
:=
resolveUserGroupRateCacheTTL
(
cfg
)
modelsListTTL
:=
resolveModelsListCacheTTL
(
cfg
)
modelsListTTL
:=
resolveModelsListCacheTTL
(
cfg
)
...
@@ -579,6 +581,7 @@ func NewGatewayService(
...
@@ -579,6 +581,7 @@ func NewGatewayService(
sessionLimitCache
:
sessionLimitCache
,
sessionLimitCache
:
sessionLimitCache
,
rpmCache
:
rpmCache
,
rpmCache
:
rpmCache
,
userGroupRateCache
:
gocache
.
New
(
userGroupRateTTL
,
time
.
Minute
),
userGroupRateCache
:
gocache
.
New
(
userGroupRateTTL
,
time
.
Minute
),
settingService
:
settingService
,
modelsListCache
:
gocache
.
New
(
modelsListTTL
,
time
.
Minute
),
modelsListCache
:
gocache
.
New
(
modelsListTTL
,
time
.
Minute
),
modelsListCacheTTL
:
modelsListTTL
,
modelsListCacheTTL
:
modelsListTTL
,
responseHeaderFilter
:
compileResponseHeaderFilter
(
cfg
),
responseHeaderFilter
:
compileResponseHeaderFilter
(
cfg
),
...
@@ -4069,7 +4072,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
...
@@ -4069,7 +4072,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
if
readErr
==
nil
{
if
readErr
==
nil
{
_
=
resp
.
Body
.
Close
()
_
=
resp
.
Body
.
Close
()
if
s
.
isThinkingBlockSignatureError
(
respBody
)
{
if
s
.
isThinkingBlockSignatureError
(
respBody
)
&&
s
.
settingService
.
IsSignatureRectifierEnabled
(
ctx
)
{
appendOpsUpstreamError
(
c
,
OpsUpstreamErrorEvent
{
appendOpsUpstreamError
(
c
,
OpsUpstreamErrorEvent
{
Platform
:
account
.
Platform
,
Platform
:
account
.
Platform
,
AccountID
:
account
.
ID
,
AccountID
:
account
.
ID
,
...
@@ -4186,7 +4189,45 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
...
@@ -4186,7 +4189,45 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
resp
.
Body
=
io
.
NopCloser
(
bytes
.
NewReader
(
respBody
))
resp
.
Body
=
io
.
NopCloser
(
bytes
.
NewReader
(
respBody
))
break
break
}
}
// 不是thinking签名错误,恢复响应体
// 不是签名错误(或整流器已关闭),继续检查 budget 约束
errMsg
:=
extractUpstreamErrorMessage
(
respBody
)
if
isThinkingBudgetConstraintError
(
errMsg
)
&&
s
.
settingService
.
IsBudgetRectifierEnabled
(
ctx
)
{
appendOpsUpstreamError
(
c
,
OpsUpstreamErrorEvent
{
Platform
:
account
.
Platform
,
AccountID
:
account
.
ID
,
AccountName
:
account
.
Name
,
UpstreamStatusCode
:
resp
.
StatusCode
,
UpstreamRequestID
:
resp
.
Header
.
Get
(
"x-request-id"
),
Kind
:
"budget_constraint_error"
,
Message
:
errMsg
,
Detail
:
func
()
string
{
if
s
.
cfg
!=
nil
&&
s
.
cfg
.
Gateway
.
LogUpstreamErrorBody
{
return
truncateString
(
string
(
respBody
),
s
.
cfg
.
Gateway
.
LogUpstreamErrorBodyMaxBytes
)
}
return
""
}(),
})
rectifiedBody
,
applied
:=
RectifyThinkingBudget
(
body
)
if
applied
&&
time
.
Since
(
retryStart
)
<
maxRetryElapsed
{
logger
.
LegacyPrintf
(
"service.gateway"
,
"Account %d: detected budget_tokens constraint error, retrying with rectified budget (budget_tokens=%d, max_tokens=%d)"
,
account
.
ID
,
BudgetRectifyBudgetTokens
,
BudgetRectifyMaxTokens
)
budgetRetryReq
,
buildErr
:=
s
.
buildUpstreamRequest
(
ctx
,
c
,
account
,
rectifiedBody
,
token
,
tokenType
,
reqModel
,
reqStream
,
shouldMimicClaudeCode
)
if
buildErr
==
nil
{
budgetRetryResp
,
retryErr
:=
s
.
httpUpstream
.
DoWithTLS
(
budgetRetryReq
,
proxyURL
,
account
.
ID
,
account
.
Concurrency
,
account
.
IsTLSFingerprintEnabled
())
if
retryErr
==
nil
{
resp
=
budgetRetryResp
break
}
if
budgetRetryResp
!=
nil
&&
budgetRetryResp
.
Body
!=
nil
{
_
=
budgetRetryResp
.
Body
.
Close
()
}
logger
.
LegacyPrintf
(
"service.gateway"
,
"Account %d: budget rectifier retry failed: %v"
,
account
.
ID
,
retryErr
)
}
else
{
logger
.
LegacyPrintf
(
"service.gateway"
,
"Account %d: budget rectifier retry build failed: %v"
,
account
.
ID
,
buildErr
)
}
}
}
resp
.
Body
=
io
.
NopCloser
(
bytes
.
NewReader
(
respBody
))
resp
.
Body
=
io
.
NopCloser
(
bytes
.
NewReader
(
respBody
))
}
}
}
}
...
@@ -6928,7 +6969,7 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
...
@@ -6928,7 +6969,7 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
}
}
// 检测 thinking block 签名错误(400)并重试一次(过滤 thinking blocks)
// 检测 thinking block 签名错误(400)并重试一次(过滤 thinking blocks)
if
resp
.
StatusCode
==
400
&&
s
.
isThinkingBlockSignatureError
(
respBody
)
{
if
resp
.
StatusCode
==
400
&&
s
.
isThinkingBlockSignatureError
(
respBody
)
&&
s
.
settingService
.
IsSignatureRectifierEnabled
(
ctx
)
{
logger
.
LegacyPrintf
(
"service.gateway"
,
"Account %d: detected thinking block signature error on count_tokens, retrying with filtered thinking blocks"
,
account
.
ID
)
logger
.
LegacyPrintf
(
"service.gateway"
,
"Account %d: detected thinking block signature error on count_tokens, retrying with filtered thinking blocks"
,
account
.
ID
)
filteredBody
:=
FilterThinkingBlocksForRetry
(
body
)
filteredBody
:=
FilterThinkingBlocksForRetry
(
body
)
...
...
backend/internal/service/setting_service.go
View file @
be4e49e6
...
@@ -1194,6 +1194,59 @@ func (s *SettingService) GetMinClaudeCodeVersion(ctx context.Context) string {
...
@@ -1194,6 +1194,59 @@ func (s *SettingService) GetMinClaudeCodeVersion(ctx context.Context) string {
return
ver
return
ver
}
}
// GetRectifierSettings 获取请求整流器配置
func
(
s
*
SettingService
)
GetRectifierSettings
(
ctx
context
.
Context
)
(
*
RectifierSettings
,
error
)
{
value
,
err
:=
s
.
settingRepo
.
GetValue
(
ctx
,
SettingKeyRectifierSettings
)
if
err
!=
nil
{
if
errors
.
Is
(
err
,
ErrSettingNotFound
)
{
return
DefaultRectifierSettings
(),
nil
}
return
nil
,
fmt
.
Errorf
(
"get rectifier settings: %w"
,
err
)
}
if
value
==
""
{
return
DefaultRectifierSettings
(),
nil
}
var
settings
RectifierSettings
if
err
:=
json
.
Unmarshal
([]
byte
(
value
),
&
settings
);
err
!=
nil
{
return
DefaultRectifierSettings
(),
nil
}
return
&
settings
,
nil
}
// SetRectifierSettings 设置请求整流器配置
func
(
s
*
SettingService
)
SetRectifierSettings
(
ctx
context
.
Context
,
settings
*
RectifierSettings
)
error
{
if
settings
==
nil
{
return
fmt
.
Errorf
(
"settings cannot be nil"
)
}
data
,
err
:=
json
.
Marshal
(
settings
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"marshal rectifier settings: %w"
,
err
)
}
return
s
.
settingRepo
.
Set
(
ctx
,
SettingKeyRectifierSettings
,
string
(
data
))
}
// IsSignatureRectifierEnabled 判断签名整流是否启用(总开关 && 签名子开关)
func
(
s
*
SettingService
)
IsSignatureRectifierEnabled
(
ctx
context
.
Context
)
bool
{
settings
,
err
:=
s
.
GetRectifierSettings
(
ctx
)
if
err
!=
nil
{
return
true
// fail-open: 查询失败时默认启用
}
return
settings
.
Enabled
&&
settings
.
ThinkingSignatureEnabled
}
// IsBudgetRectifierEnabled 判断 Budget 整流是否启用(总开关 && Budget 子开关)
func
(
s
*
SettingService
)
IsBudgetRectifierEnabled
(
ctx
context
.
Context
)
bool
{
settings
,
err
:=
s
.
GetRectifierSettings
(
ctx
)
if
err
!=
nil
{
return
true
// fail-open: 查询失败时默认启用
}
return
settings
.
Enabled
&&
settings
.
ThinkingBudgetEnabled
}
// SetStreamTimeoutSettings 设置流超时处理配置
// SetStreamTimeoutSettings 设置流超时处理配置
func
(
s
*
SettingService
)
SetStreamTimeoutSettings
(
ctx
context
.
Context
,
settings
*
StreamTimeoutSettings
)
error
{
func
(
s
*
SettingService
)
SetStreamTimeoutSettings
(
ctx
context
.
Context
,
settings
*
StreamTimeoutSettings
)
error
{
if
settings
==
nil
{
if
settings
==
nil
{
...
...
backend/internal/service/settings_view.go
View file @
be4e49e6
...
@@ -175,3 +175,19 @@ func DefaultStreamTimeoutSettings() *StreamTimeoutSettings {
...
@@ -175,3 +175,19 @@ func DefaultStreamTimeoutSettings() *StreamTimeoutSettings {
ThresholdWindowMinutes
:
10
,
ThresholdWindowMinutes
:
10
,
}
}
}
}
// RectifierSettings 请求整流器配置
type
RectifierSettings
struct
{
Enabled
bool
`json:"enabled"`
// 总开关
ThinkingSignatureEnabled
bool
`json:"thinking_signature_enabled"`
// Thinking 签名整流
ThinkingBudgetEnabled
bool
`json:"thinking_budget_enabled"`
// Thinking Budget 整流
}
// DefaultRectifierSettings 返回默认的整流器配置(全部启用)
func
DefaultRectifierSettings
()
*
RectifierSettings
{
return
&
RectifierSettings
{
Enabled
:
true
,
ThinkingSignatureEnabled
:
true
,
ThinkingBudgetEnabled
:
true
,
}
}
frontend/src/api/admin/settings.ts
View file @
be4e49e6
...
@@ -273,6 +273,41 @@ export async function updateStreamTimeoutSettings(
...
@@ -273,6 +273,41 @@ export async function updateStreamTimeoutSettings(
return
data
return
data
}
}
// ==================== Rectifier Settings ====================
/**
* Rectifier settings interface
*/
export
interface
RectifierSettings
{
enabled
:
boolean
thinking_signature_enabled
:
boolean
thinking_budget_enabled
:
boolean
}
/**
* Get rectifier settings
* @returns Rectifier settings
*/
export
async
function
getRectifierSettings
():
Promise
<
RectifierSettings
>
{
const
{
data
}
=
await
apiClient
.
get
<
RectifierSettings
>
(
'
/admin/settings/rectifier
'
)
return
data
}
/**
* Update rectifier settings
* @param settings - Rectifier settings to update
* @returns Updated settings
*/
export
async
function
updateRectifierSettings
(
settings
:
RectifierSettings
):
Promise
<
RectifierSettings
>
{
const
{
data
}
=
await
apiClient
.
put
<
RectifierSettings
>
(
'
/admin/settings/rectifier
'
,
settings
)
return
data
}
// ==================== Sora S3 Settings ====================
// ==================== Sora S3 Settings ====================
export
interface
SoraS3Settings
{
export
interface
SoraS3Settings
{
...
@@ -419,6 +454,8 @@ export const settingsAPI = {
...
@@ -419,6 +454,8 @@ export const settingsAPI = {
deleteAdminApiKey
,
deleteAdminApiKey
,
getStreamTimeoutSettings
,
getStreamTimeoutSettings
,
updateStreamTimeoutSettings
,
updateStreamTimeoutSettings
,
getRectifierSettings
,
updateRectifierSettings
,
getSoraS3Settings
,
getSoraS3Settings
,
updateSoraS3Settings
,
updateSoraS3Settings
,
testSoraS3Connection
,
testSoraS3Connection
,
...
...
frontend/src/i18n/locales/en.ts
View file @
be4e49e6
...
@@ -3977,6 +3977,18 @@ export default {
...
@@ -3977,6 +3977,18 @@ export default {
saved
:
'
Stream timeout settings saved
'
,
saved
:
'
Stream timeout settings saved
'
,
saveFailed
:
'
Failed to save stream timeout settings
'
saveFailed
:
'
Failed to save stream timeout settings
'
},
},
rectifier
:
{
title
:
'
Request Rectifier
'
,
description
:
'
Automatically fix request parameters and retry when upstream returns specific errors
'
,
enabled
:
'
Enable Request Rectifier
'
,
enabledHint
:
'
Master switch - disabling turns off all rectification features
'
,
thinkingSignature
:
'
Thinking Signature Rectifier
'
,
thinkingSignatureHint
:
'
Automatically strip signatures and retry when upstream returns thinking block signature validation errors
'
,
thinkingBudget
:
'
Thinking Budget Rectifier
'
,
thinkingBudgetHint
:
'
Automatically set budget to 32000 and retry when upstream returns budget_tokens constraint error (≥1024)
'
,
saved
:
'
Rectifier settings saved
'
,
saveFailed
:
'
Failed to save rectifier settings
'
},
saveSettings
:
'
Save Settings
'
,
saveSettings
:
'
Save Settings
'
,
saving
:
'
Saving...
'
,
saving
:
'
Saving...
'
,
settingsSaved
:
'
Settings saved successfully
'
,
settingsSaved
:
'
Settings saved successfully
'
,
...
...
Prev
1
2
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment