Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
陈曦
sub2api
Commits
7451b6f9
"backend/vscode:/vscode.git/clone" did not exist on "9d801595c95eb5f5616bca0ec409a42d73325987"
Commit
7451b6f9
authored
Apr 15, 2026
by
Wesley Liddick
Browse files
修复 OpenAI 账号限流回流误判:7d 窗口可用时不因 5h 窗口为 0 回写 429
parent
7c671b53
Changes
7
Show whitespace changes
Inline
Side-by-side
backend/internal/service/account_test_service.go
View file @
7451b6f9
...
@@ -515,22 +515,10 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account
...
@@ -515,22 +515,10 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account
_
=
s
.
accountRepo
.
UpdateExtra
(
ctx
,
account
.
ID
,
updates
)
_
=
s
.
accountRepo
.
UpdateExtra
(
ctx
,
account
.
ID
,
updates
)
mergeAccountExtra
(
account
,
updates
)
mergeAccountExtra
(
account
,
updates
)
}
}
if
snapshot
:=
ParseCodexRateLimitHeaders
(
resp
.
Header
);
snapshot
!=
nil
{
if
resetAt
:=
codexRateLimitResetAtFromSnapshot
(
snapshot
,
time
.
Now
());
resetAt
!=
nil
{
_
=
s
.
accountRepo
.
SetRateLimited
(
ctx
,
account
.
ID
,
*
resetAt
)
account
.
RateLimitResetAt
=
resetAt
}
}
}
}
if
resp
.
StatusCode
!=
http
.
StatusOK
{
if
resp
.
StatusCode
!=
http
.
StatusOK
{
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
if
isOAuth
&&
s
.
accountRepo
!=
nil
{
if
resetAt
:=
(
&
RateLimitService
{})
.
calculateOpenAI429ResetTime
(
resp
.
Header
);
resetAt
!=
nil
{
_
=
s
.
accountRepo
.
SetRateLimited
(
ctx
,
account
.
ID
,
*
resetAt
)
account
.
RateLimitResetAt
=
resetAt
}
}
// 401 Unauthorized: 标记账号为永久错误
// 401 Unauthorized: 标记账号为永久错误
if
resp
.
StatusCode
==
http
.
StatusUnauthorized
&&
s
.
accountRepo
!=
nil
{
if
resp
.
StatusCode
==
http
.
StatusUnauthorized
&&
s
.
accountRepo
!=
nil
{
errMsg
:=
fmt
.
Sprintf
(
"Authentication failed (401): %s"
,
string
(
body
))
errMsg
:=
fmt
.
Sprintf
(
"Authentication failed (401): %s"
,
string
(
body
))
...
...
backend/internal/service/account_test_service_openai_test.go
View file @
7451b6f9
...
@@ -111,7 +111,7 @@ func TestAccountTestService_OpenAISuccessPersistsSnapshotFromHeaders(t *testing.
...
@@ -111,7 +111,7 @@ func TestAccountTestService_OpenAISuccessPersistsSnapshotFromHeaders(t *testing.
require
.
Contains
(
t
,
recorder
.
Body
.
String
(),
"test_complete"
)
require
.
Contains
(
t
,
recorder
.
Body
.
String
(),
"test_complete"
)
}
}
func
TestAccountTestService_OpenAI429PersistsSnapshot
And
RateLimit
(
t
*
testing
.
T
)
{
func
TestAccountTestService_OpenAI429PersistsSnapshot
Without
RateLimit
(
t
*
testing
.
T
)
{
gin
.
SetMode
(
gin
.
TestMode
)
gin
.
SetMode
(
gin
.
TestMode
)
ctx
,
_
:=
newTestContext
()
ctx
,
_
:=
newTestContext
()
...
@@ -138,10 +138,7 @@ func TestAccountTestService_OpenAI429PersistsSnapshotAndRateLimit(t *testing.T)
...
@@ -138,10 +138,7 @@ func TestAccountTestService_OpenAI429PersistsSnapshotAndRateLimit(t *testing.T)
require
.
Error
(
t
,
err
)
require
.
Error
(
t
,
err
)
require
.
NotEmpty
(
t
,
repo
.
updatedExtra
)
require
.
NotEmpty
(
t
,
repo
.
updatedExtra
)
require
.
Equal
(
t
,
100.0
,
repo
.
updatedExtra
[
"codex_5h_used_percent"
])
require
.
Equal
(
t
,
100.0
,
repo
.
updatedExtra
[
"codex_5h_used_percent"
])
require
.
Equal
(
t
,
int64
(
88
),
repo
.
rateLimitedID
)
require
.
Zero
(
t
,
repo
.
rateLimitedID
)
require
.
NotNil
(
t
,
repo
.
rateLimitedAt
)
require
.
Nil
(
t
,
repo
.
rateLimitedAt
)
require
.
NotNil
(
t
,
account
.
RateLimitResetAt
)
require
.
Nil
(
t
,
account
.
RateLimitResetAt
)
if
account
.
RateLimitResetAt
!=
nil
&&
repo
.
rateLimitedAt
!=
nil
{
require
.
WithinDuration
(
t
,
*
repo
.
rateLimitedAt
,
*
account
.
RateLimitResetAt
,
time
.
Second
)
}
}
}
backend/internal/service/account_usage_service.go
View file @
7451b6f9
...
@@ -499,7 +499,6 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou
...
@@ -499,7 +499,6 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou
if
account
==
nil
{
if
account
==
nil
{
return
usage
,
nil
return
usage
,
nil
}
}
syncOpenAICodexRateLimitFromExtra
(
ctx
,
s
.
accountRepo
,
account
,
now
)
if
progress
:=
buildCodexUsageProgressFromExtra
(
account
.
Extra
,
"5h"
,
now
);
progress
!=
nil
{
if
progress
:=
buildCodexUsageProgressFromExtra
(
account
.
Extra
,
"5h"
,
now
);
progress
!=
nil
{
usage
.
FiveHour
=
progress
usage
.
FiveHour
=
progress
...
@@ -509,11 +508,8 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou
...
@@ -509,11 +508,8 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou
}
}
if
shouldRefreshOpenAICodexSnapshot
(
account
,
usage
,
now
)
&&
s
.
shouldProbeOpenAICodexSnapshot
(
account
.
ID
,
now
)
{
if
shouldRefreshOpenAICodexSnapshot
(
account
,
usage
,
now
)
&&
s
.
shouldProbeOpenAICodexSnapshot
(
account
.
ID
,
now
)
{
if
updates
,
resetAt
,
err
:=
s
.
probeOpenAICodexSnapshot
(
ctx
,
account
);
err
==
nil
&&
(
len
(
updates
)
>
0
||
resetAt
!=
nil
)
{
if
updates
,
err
:=
s
.
probeOpenAICodexSnapshot
(
ctx
,
account
);
err
==
nil
&&
len
(
updates
)
>
0
{
mergeAccountExtra
(
account
,
updates
)
mergeAccountExtra
(
account
,
updates
)
if
resetAt
!=
nil
{
account
.
RateLimitResetAt
=
resetAt
}
if
usage
.
UpdatedAt
==
nil
{
if
usage
.
UpdatedAt
==
nil
{
usage
.
UpdatedAt
=
&
now
usage
.
UpdatedAt
=
&
now
}
}
...
@@ -594,26 +590,26 @@ func (s *AccountUsageService) shouldProbeOpenAICodexSnapshot(accountID int64, no
...
@@ -594,26 +590,26 @@ func (s *AccountUsageService) shouldProbeOpenAICodexSnapshot(accountID int64, no
return
true
return
true
}
}
func
(
s
*
AccountUsageService
)
probeOpenAICodexSnapshot
(
ctx
context
.
Context
,
account
*
Account
)
(
map
[
string
]
any
,
*
time
.
Time
,
error
)
{
func
(
s
*
AccountUsageService
)
probeOpenAICodexSnapshot
(
ctx
context
.
Context
,
account
*
Account
)
(
map
[
string
]
any
,
error
)
{
if
account
==
nil
||
!
account
.
IsOAuth
()
{
if
account
==
nil
||
!
account
.
IsOAuth
()
{
return
nil
,
nil
,
nil
return
nil
,
nil
}
}
accessToken
:=
account
.
GetOpenAIAccessToken
()
accessToken
:=
account
.
GetOpenAIAccessToken
()
if
accessToken
==
""
{
if
accessToken
==
""
{
return
nil
,
nil
,
fmt
.
Errorf
(
"no access token available"
)
return
nil
,
fmt
.
Errorf
(
"no access token available"
)
}
}
modelID
:=
openaipkg
.
DefaultTestModel
modelID
:=
openaipkg
.
DefaultTestModel
payload
:=
createOpenAITestPayload
(
modelID
,
true
)
payload
:=
createOpenAITestPayload
(
modelID
,
true
)
payloadBytes
,
err
:=
json
.
Marshal
(
payload
)
payloadBytes
,
err
:=
json
.
Marshal
(
payload
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
nil
,
fmt
.
Errorf
(
"marshal openai probe payload: %w"
,
err
)
return
nil
,
fmt
.
Errorf
(
"marshal openai probe payload: %w"
,
err
)
}
}
reqCtx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
15
*
time
.
Second
)
reqCtx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
15
*
time
.
Second
)
defer
cancel
()
defer
cancel
()
req
,
err
:=
http
.
NewRequestWithContext
(
reqCtx
,
http
.
MethodPost
,
chatgptCodexURL
,
bytes
.
NewReader
(
payloadBytes
))
req
,
err
:=
http
.
NewRequestWithContext
(
reqCtx
,
http
.
MethodPost
,
chatgptCodexURL
,
bytes
.
NewReader
(
payloadBytes
))
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
nil
,
fmt
.
Errorf
(
"create openai probe request: %w"
,
err
)
return
nil
,
fmt
.
Errorf
(
"create openai probe request: %w"
,
err
)
}
}
req
.
Host
=
"chatgpt.com"
req
.
Host
=
"chatgpt.com"
req
.
Header
.
Set
(
"Content-Type"
,
"application/json"
)
req
.
Header
.
Set
(
"Content-Type"
,
"application/json"
)
...
@@ -642,67 +638,51 @@ func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, acco
...
@@ -642,67 +638,51 @@ func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, acco
ResponseHeaderTimeout
:
10
*
time
.
Second
,
ResponseHeaderTimeout
:
10
*
time
.
Second
,
})
})
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
nil
,
fmt
.
Errorf
(
"build openai probe client: %w"
,
err
)
return
nil
,
fmt
.
Errorf
(
"build openai probe client: %w"
,
err
)
}
}
resp
,
err
:=
client
.
Do
(
req
)
resp
,
err
:=
client
.
Do
(
req
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
nil
,
fmt
.
Errorf
(
"openai codex probe request failed: %w"
,
err
)
return
nil
,
fmt
.
Errorf
(
"openai codex probe request failed: %w"
,
err
)
}
}
defer
func
()
{
_
=
resp
.
Body
.
Close
()
}()
defer
func
()
{
_
=
resp
.
Body
.
Close
()
}()
updates
,
resetAt
,
err
:=
extractOpenAICodexProbe
Snapshot
(
resp
)
updates
,
err
:=
extractOpenAICodexProbe
Updates
(
resp
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
nil
,
err
return
nil
,
err
}
}
if
len
(
updates
)
>
0
||
resetAt
!=
nil
{
if
len
(
updates
)
>
0
{
s
.
persistOpenAICodexProbeSnapshot
(
account
.
ID
,
updates
,
resetAt
)
s
.
persistOpenAICodexProbeSnapshot
(
account
.
ID
,
updates
)
return
updates
,
resetAt
,
nil
return
updates
,
nil
}
}
return
nil
,
nil
,
nil
return
nil
,
nil
}
}
func
(
s
*
AccountUsageService
)
persistOpenAICodexProbeSnapshot
(
accountID
int64
,
updates
map
[
string
]
any
,
resetAt
*
time
.
Time
)
{
func
(
s
*
AccountUsageService
)
persistOpenAICodexProbeSnapshot
(
accountID
int64
,
updates
map
[
string
]
any
)
{
if
s
==
nil
||
s
.
accountRepo
==
nil
||
accountID
<=
0
{
if
s
==
nil
||
s
.
accountRepo
==
nil
||
accountID
<=
0
{
return
return
}
}
if
len
(
updates
)
==
0
&&
resetAt
==
nil
{
if
len
(
updates
)
==
0
{
return
return
}
}
go
func
()
{
go
func
()
{
updateCtx
,
updateCancel
:=
context
.
WithTimeout
(
context
.
Background
(),
5
*
time
.
Second
)
updateCtx
,
updateCancel
:=
context
.
WithTimeout
(
context
.
Background
(),
5
*
time
.
Second
)
defer
updateCancel
()
defer
updateCancel
()
if
len
(
updates
)
>
0
{
_
=
s
.
accountRepo
.
UpdateExtra
(
updateCtx
,
accountID
,
updates
)
_
=
s
.
accountRepo
.
UpdateExtra
(
updateCtx
,
accountID
,
updates
)
}
if
resetAt
!=
nil
{
_
=
s
.
accountRepo
.
SetRateLimited
(
updateCtx
,
accountID
,
*
resetAt
)
}
}()
}()
}
}
func
extractOpenAICodexProbe
Snapshot
(
resp
*
http
.
Response
)
(
map
[
string
]
any
,
*
time
.
Time
,
error
)
{
func
extractOpenAICodexProbe
Updates
(
resp
*
http
.
Response
)
(
map
[
string
]
any
,
error
)
{
if
resp
==
nil
{
if
resp
==
nil
{
return
nil
,
nil
,
nil
return
nil
,
nil
}
}
if
snapshot
:=
ParseCodexRateLimitHeaders
(
resp
.
Header
);
snapshot
!=
nil
{
if
snapshot
:=
ParseCodexRateLimitHeaders
(
resp
.
Header
);
snapshot
!=
nil
{
baseTime
:=
time
.
Now
()
return
buildCodexUsageExtraUpdates
(
snapshot
,
time
.
Now
()),
nil
updates
:=
buildCodexUsageExtraUpdates
(
snapshot
,
baseTime
)
resetAt
:=
codexRateLimitResetAtFromSnapshot
(
snapshot
,
baseTime
)
if
len
(
updates
)
>
0
{
return
updates
,
resetAt
,
nil
}
return
nil
,
resetAt
,
nil
}
}
if
resp
.
StatusCode
<
200
||
resp
.
StatusCode
>=
300
{
if
resp
.
StatusCode
<
200
||
resp
.
StatusCode
>=
300
{
return
nil
,
nil
,
fmt
.
Errorf
(
"openai codex probe returned status %d"
,
resp
.
StatusCode
)
return
nil
,
fmt
.
Errorf
(
"openai codex probe returned status %d"
,
resp
.
StatusCode
)
}
}
return
nil
,
nil
,
nil
return
nil
,
nil
}
func
extractOpenAICodexProbeUpdates
(
resp
*
http
.
Response
)
(
map
[
string
]
any
,
error
)
{
updates
,
_
,
err
:=
extractOpenAICodexProbeSnapshot
(
resp
)
return
updates
,
err
}
}
func
mergeAccountExtra
(
account
*
Account
,
updates
map
[
string
]
any
)
{
func
mergeAccountExtra
(
account
*
Account
,
updates
map
[
string
]
any
)
{
...
...
backend/internal/service/account_usage_service_test.go
View file @
7451b6f9
...
@@ -92,30 +92,7 @@ func TestExtractOpenAICodexProbeUpdatesAccepts429WithCodexHeaders(t *testing.T)
...
@@ -92,30 +92,7 @@ func TestExtractOpenAICodexProbeUpdatesAccepts429WithCodexHeaders(t *testing.T)
}
}
}
}
func
TestExtractOpenAICodexProbeSnapshotAccepts429WithResetAt
(
t
*
testing
.
T
)
{
func
TestAccountUsageService_PersistOpenAICodexProbeSnapshotOnlyUpdatesExtra
(
t
*
testing
.
T
)
{
t
.
Parallel
()
headers
:=
make
(
http
.
Header
)
headers
.
Set
(
"x-codex-primary-used-percent"
,
"100"
)
headers
.
Set
(
"x-codex-primary-reset-after-seconds"
,
"604800"
)
headers
.
Set
(
"x-codex-primary-window-minutes"
,
"10080"
)
headers
.
Set
(
"x-codex-secondary-used-percent"
,
"100"
)
headers
.
Set
(
"x-codex-secondary-reset-after-seconds"
,
"18000"
)
headers
.
Set
(
"x-codex-secondary-window-minutes"
,
"300"
)
updates
,
resetAt
,
err
:=
extractOpenAICodexProbeSnapshot
(
&
http
.
Response
{
StatusCode
:
http
.
StatusTooManyRequests
,
Header
:
headers
})
if
err
!=
nil
{
t
.
Fatalf
(
"extractOpenAICodexProbeSnapshot() error = %v"
,
err
)
}
if
len
(
updates
)
==
0
{
t
.
Fatal
(
"expected codex probe updates from 429 headers"
)
}
if
resetAt
==
nil
{
t
.
Fatal
(
"expected resetAt from exhausted codex headers"
)
}
}
func
TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit
(
t
*
testing
.
T
)
{
t
.
Parallel
()
t
.
Parallel
()
repo
:=
&
accountUsageCodexProbeRepo
{
repo
:=
&
accountUsageCodexProbeRepo
{
...
@@ -123,12 +100,10 @@ func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *tes
...
@@ -123,12 +100,10 @@ func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *tes
rateLimitCh
:
make
(
chan
time
.
Time
,
1
),
rateLimitCh
:
make
(
chan
time
.
Time
,
1
),
}
}
svc
:=
&
AccountUsageService
{
accountRepo
:
repo
}
svc
:=
&
AccountUsageService
{
accountRepo
:
repo
}
resetAt
:=
time
.
Now
()
.
Add
(
2
*
time
.
Hour
)
.
UTC
()
.
Truncate
(
time
.
Second
)
svc
.
persistOpenAICodexProbeSnapshot
(
321
,
map
[
string
]
any
{
svc
.
persistOpenAICodexProbeSnapshot
(
321
,
map
[
string
]
any
{
"codex_7d_used_percent"
:
100.0
,
"codex_7d_used_percent"
:
100.0
,
"codex_7d_reset_at"
:
resetAt
.
Format
(
time
.
RFC3339
),
"codex_7d_reset_at"
:
time
.
Now
()
.
Add
(
2
*
time
.
Hour
)
.
UTC
()
.
Truncate
(
time
.
Second
)
.
Format
(
time
.
RFC3339
),
}
,
&
resetAt
)
})
select
{
select
{
case
updates
:=
<-
repo
.
updateExtraCh
:
case
updates
:=
<-
repo
.
updateExtraCh
:
...
@@ -136,16 +111,49 @@ func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *tes
...
@@ -136,16 +111,49 @@ func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *tes
t
.
Fatalf
(
"codex_7d_used_percent = %v, want 100"
,
got
)
t
.
Fatalf
(
"codex_7d_used_percent = %v, want 100"
,
got
)
}
}
case
<-
time
.
After
(
2
*
time
.
Second
)
:
case
<-
time
.
After
(
2
*
time
.
Second
)
:
t
.
Fatal
(
"
waiting for
codex
probe
extra
persistence timed out
"
)
t
.
Fatal
(
"
等待
codex
探测快照写入
extra
超时
"
)
}
}
select
{
select
{
case
got
:=
<-
repo
.
rateLimitCh
:
case
got
:=
<-
repo
.
rateLimitCh
:
if
got
.
Before
(
resetAt
.
Add
(
-
time
.
Second
))
||
got
.
After
(
resetAt
.
Add
(
time
.
Second
))
{
t
.
Fatalf
(
"不应将探测快照写入运行时限流状态: %v"
,
got
)
t
.
Fatalf
(
"rate limit resetAt = %v, want around %v"
,
got
,
resetAt
)
case
<-
time
.
After
(
200
*
time
.
Millisecond
)
:
}
}
case
<-
time
.
After
(
2
*
time
.
Second
)
:
}
t
.
Fatal
(
"waiting for codex probe rate limit persistence timed out"
)
func
TestAccountUsageService_GetOpenAIUsage_DoesNotPromoteCodexExtraToRateLimit
(
t
*
testing
.
T
)
{
t
.
Parallel
()
resetAt
:=
time
.
Now
()
.
Add
(
6
*
24
*
time
.
Hour
)
.
UTC
()
.
Truncate
(
time
.
Second
)
repo
:=
&
accountUsageCodexProbeRepo
{
rateLimitCh
:
make
(
chan
time
.
Time
,
1
),
}
svc
:=
&
AccountUsageService
{
accountRepo
:
repo
}
account
:=
&
Account
{
Platform
:
PlatformOpenAI
,
Type
:
AccountTypeOAuth
,
Extra
:
map
[
string
]
any
{
"codex_5h_used_percent"
:
1.0
,
"codex_5h_reset_at"
:
time
.
Now
()
.
Add
(
2
*
time
.
Hour
)
.
UTC
()
.
Truncate
(
time
.
Second
)
.
Format
(
time
.
RFC3339
),
"codex_7d_used_percent"
:
100.0
,
"codex_7d_reset_at"
:
resetAt
.
Format
(
time
.
RFC3339
),
},
}
usage
,
err
:=
svc
.
getOpenAIUsage
(
context
.
Background
(),
account
)
if
err
!=
nil
{
t
.
Fatalf
(
"getOpenAIUsage() error = %v"
,
err
)
}
if
usage
.
SevenDay
==
nil
||
usage
.
SevenDay
.
Utilization
!=
100.0
{
t
.
Fatalf
(
"预期 7 天用量仍然可见,实际为 %#v"
,
usage
.
SevenDay
)
}
if
account
.
RateLimitResetAt
!=
nil
{
t
.
Fatalf
(
"不应让已耗尽的 codex extra 改写运行时限流状态: %v"
,
account
.
RateLimitResetAt
)
}
select
{
case
got
:=
<-
repo
.
rateLimitCh
:
t
.
Fatalf
(
"不应将已耗尽的 codex extra 持久化为运行时限流状态: %v"
,
got
)
case
<-
time
.
After
(
200
*
time
.
Millisecond
)
:
}
}
}
}
...
...
backend/internal/service/admin_service.go
View file @
7451b6f9
...
@@ -1470,10 +1470,6 @@ func (s *adminServiceImpl) ListAccounts(ctx context.Context, page, pageSize int,
...
@@ -1470,10 +1470,6 @@ func (s *adminServiceImpl) ListAccounts(ctx context.Context, page, pageSize int,
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
0
,
err
return
nil
,
0
,
err
}
}
now
:=
time
.
Now
()
for
i
:=
range
accounts
{
syncOpenAICodexRateLimitFromExtra
(
ctx
,
s
.
accountRepo
,
&
accounts
[
i
],
now
)
}
return
accounts
,
result
.
Total
,
nil
return
accounts
,
result
.
Total
,
nil
}
}
...
...
backend/internal/service/openai_gateway_service.go
View file @
7451b6f9
...
@@ -1681,7 +1681,6 @@ func (s *OpenAIGatewayService) recheckSelectedOpenAIAccountFromDB(ctx context.Co
...
@@ -1681,7 +1681,6 @@ func (s *OpenAIGatewayService) recheckSelectedOpenAIAccountFromDB(ctx context.Co
if
err
!=
nil
||
latest
==
nil
{
if
err
!=
nil
||
latest
==
nil
{
return
nil
return
nil
}
}
syncOpenAICodexRateLimitFromExtra
(
ctx
,
s
.
accountRepo
,
latest
,
time
.
Now
())
if
!
latest
.
IsSchedulable
()
||
!
latest
.
IsOpenAI
()
{
if
!
latest
.
IsSchedulable
()
||
!
latest
.
IsOpenAI
()
{
return
nil
return
nil
}
}
...
@@ -1704,7 +1703,6 @@ func (s *OpenAIGatewayService) getSchedulableAccount(ctx context.Context, accoun
...
@@ -1704,7 +1703,6 @@ func (s *OpenAIGatewayService) getSchedulableAccount(ctx context.Context, accoun
if
err
!=
nil
||
account
==
nil
{
if
err
!=
nil
||
account
==
nil
{
return
account
,
err
return
account
,
err
}
}
syncOpenAICodexRateLimitFromExtra
(
ctx
,
s
.
accountRepo
,
account
,
time
.
Now
())
return
account
,
nil
return
account
,
nil
}
}
...
@@ -4768,69 +4766,6 @@ func buildCodexUsageExtraUpdates(snapshot *OpenAICodexUsageSnapshot, fallbackNow
...
@@ -4768,69 +4766,6 @@ func buildCodexUsageExtraUpdates(snapshot *OpenAICodexUsageSnapshot, fallbackNow
return
updates
return
updates
}
}
func
codexUsagePercentExhausted
(
value
*
float64
)
bool
{
return
value
!=
nil
&&
*
value
>=
100
-
1e-9
}
func
codexRateLimitResetAtFromSnapshot
(
snapshot
*
OpenAICodexUsageSnapshot
,
fallbackNow
time
.
Time
)
*
time
.
Time
{
if
snapshot
==
nil
{
return
nil
}
normalized
:=
snapshot
.
Normalize
()
if
normalized
==
nil
{
return
nil
}
baseTime
:=
codexSnapshotBaseTime
(
snapshot
,
fallbackNow
)
if
codexUsagePercentExhausted
(
normalized
.
Used7dPercent
)
&&
normalized
.
Reset7dSeconds
!=
nil
{
resetAt
:=
baseTime
.
Add
(
time
.
Duration
(
*
normalized
.
Reset7dSeconds
)
*
time
.
Second
)
return
&
resetAt
}
if
codexUsagePercentExhausted
(
normalized
.
Used5hPercent
)
&&
normalized
.
Reset5hSeconds
!=
nil
{
resetAt
:=
baseTime
.
Add
(
time
.
Duration
(
*
normalized
.
Reset5hSeconds
)
*
time
.
Second
)
return
&
resetAt
}
return
nil
}
func
codexRateLimitResetAtFromExtra
(
extra
map
[
string
]
any
,
now
time
.
Time
)
*
time
.
Time
{
if
len
(
extra
)
==
0
{
return
nil
}
if
progress
:=
buildCodexUsageProgressFromExtra
(
extra
,
"7d"
,
now
);
progress
!=
nil
&&
codexUsagePercentExhausted
(
&
progress
.
Utilization
)
&&
progress
.
ResetsAt
!=
nil
&&
now
.
Before
(
*
progress
.
ResetsAt
)
{
resetAt
:=
progress
.
ResetsAt
.
UTC
()
return
&
resetAt
}
if
progress
:=
buildCodexUsageProgressFromExtra
(
extra
,
"5h"
,
now
);
progress
!=
nil
&&
codexUsagePercentExhausted
(
&
progress
.
Utilization
)
&&
progress
.
ResetsAt
!=
nil
&&
now
.
Before
(
*
progress
.
ResetsAt
)
{
resetAt
:=
progress
.
ResetsAt
.
UTC
()
return
&
resetAt
}
return
nil
}
func
applyOpenAICodexRateLimitFromExtra
(
account
*
Account
,
now
time
.
Time
)
(
*
time
.
Time
,
bool
)
{
if
account
==
nil
||
!
account
.
IsOpenAI
()
{
return
nil
,
false
}
resetAt
:=
codexRateLimitResetAtFromExtra
(
account
.
Extra
,
now
)
if
resetAt
==
nil
{
return
nil
,
false
}
if
account
.
RateLimitResetAt
!=
nil
&&
now
.
Before
(
*
account
.
RateLimitResetAt
)
&&
!
account
.
RateLimitResetAt
.
Before
(
*
resetAt
)
{
return
account
.
RateLimitResetAt
,
false
}
account
.
RateLimitResetAt
=
resetAt
return
resetAt
,
true
}
func
syncOpenAICodexRateLimitFromExtra
(
ctx
context
.
Context
,
repo
AccountRepository
,
account
*
Account
,
now
time
.
Time
)
*
time
.
Time
{
resetAt
,
changed
:=
applyOpenAICodexRateLimitFromExtra
(
account
,
now
)
if
!
changed
||
resetAt
==
nil
||
repo
==
nil
||
account
==
nil
||
account
.
ID
<=
0
{
return
resetAt
}
_
=
repo
.
SetRateLimited
(
ctx
,
account
.
ID
,
*
resetAt
)
return
resetAt
}
// updateCodexUsageSnapshot saves the Codex usage snapshot to account's Extra field
// updateCodexUsageSnapshot saves the Codex usage snapshot to account's Extra field
func
(
s
*
OpenAIGatewayService
)
updateCodexUsageSnapshot
(
ctx
context
.
Context
,
accountID
int64
,
snapshot
*
OpenAICodexUsageSnapshot
)
{
func
(
s
*
OpenAIGatewayService
)
updateCodexUsageSnapshot
(
ctx
context
.
Context
,
accountID
int64
,
snapshot
*
OpenAICodexUsageSnapshot
)
{
if
snapshot
==
nil
{
if
snapshot
==
nil
{
...
@@ -4842,24 +4777,17 @@ func (s *OpenAIGatewayService) updateCodexUsageSnapshot(ctx context.Context, acc
...
@@ -4842,24 +4777,17 @@ func (s *OpenAIGatewayService) updateCodexUsageSnapshot(ctx context.Context, acc
now
:=
time
.
Now
()
now
:=
time
.
Now
()
updates
:=
buildCodexUsageExtraUpdates
(
snapshot
,
now
)
updates
:=
buildCodexUsageExtraUpdates
(
snapshot
,
now
)
resetAt
:=
codexRateLimitResetAtFromSnapshot
(
snapshot
,
now
)
if
len
(
updates
)
==
0
{
if
len
(
updates
)
==
0
&&
resetAt
==
nil
{
return
return
}
}
shouldPersistUpdates
:=
len
(
updates
)
>
0
&&
s
.
getCodexSnapshotThrottle
()
.
Allow
(
accountID
,
now
)
if
!
s
.
getCodexSnapshotThrottle
()
.
Allow
(
accountID
,
now
)
{
if
!
shouldPersistUpdates
&&
resetAt
==
nil
{
return
return
}
}
go
func
()
{
go
func
()
{
updateCtx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
5
*
time
.
Second
)
updateCtx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
5
*
time
.
Second
)
defer
cancel
()
defer
cancel
()
if
shouldPersistUpdates
{
_
=
s
.
accountRepo
.
UpdateExtra
(
updateCtx
,
accountID
,
updates
)
_
=
s
.
accountRepo
.
UpdateExtra
(
updateCtx
,
accountID
,
updates
)
}
if
resetAt
!=
nil
{
_
=
s
.
accountRepo
.
SetRateLimited
(
updateCtx
,
accountID
,
*
resetAt
)
}
}()
}()
}
}
...
...
backend/internal/service/openai_ws_ratelimit_signal_test.go
View file @
7451b6f9
...
@@ -345,7 +345,7 @@ func TestOpenAIGatewayService_ProxyResponsesWebSocketFromClient_ErrorEventUsageL
...
@@ -345,7 +345,7 @@ func TestOpenAIGatewayService_ProxyResponsesWebSocketFromClient_ErrorEventUsageL
}
}
}
}
func
TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotSet
s
RateLimit
(
t
*
testing
.
T
)
{
func
TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshot
DoesNot
SetRateLimit
(
t
*
testing
.
T
)
{
repo
:=
&
openAICodexSnapshotAsyncRepo
{
repo
:=
&
openAICodexSnapshotAsyncRepo
{
updateExtraCh
:
make
(
chan
map
[
string
]
any
,
1
),
updateExtraCh
:
make
(
chan
map
[
string
]
any
,
1
),
rateLimitCh
:
make
(
chan
time
.
Time
,
1
),
rateLimitCh
:
make
(
chan
time
.
Time
,
1
),
...
@@ -359,7 +359,6 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotSetsRate
...
@@ -359,7 +359,6 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotSetsRate
SecondaryResetAfterSeconds
:
ptrIntWS
(
1200
),
SecondaryResetAfterSeconds
:
ptrIntWS
(
1200
),
SecondaryWindowMinutes
:
ptrIntWS
(
300
),
SecondaryWindowMinutes
:
ptrIntWS
(
300
),
}
}
before
:=
time
.
Now
()
svc
.
updateCodexUsageSnapshot
(
context
.
Background
(),
601
,
snapshot
)
svc
.
updateCodexUsageSnapshot
(
context
.
Background
(),
601
,
snapshot
)
select
{
select
{
...
@@ -371,9 +370,8 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotSetsRate
...
@@ -371,9 +370,8 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotSetsRate
select
{
select
{
case
resetAt
:=
<-
repo
.
rateLimitCh
:
case
resetAt
:=
<-
repo
.
rateLimitCh
:
require
.
WithinDuration
(
t
,
before
.
Add
(
time
.
Hour
),
resetAt
,
2
*
time
.
Second
)
t
.
Fatalf
(
"不应因仅写入快照而生成运行时限流时间: %v"
,
resetAt
)
case
<-
time
.
After
(
2
*
time
.
Second
)
:
case
<-
time
.
After
(
2
*
time
.
Second
)
:
t
.
Fatal
(
"等待 codex 100% 自动切换限流超时"
)
}
}
}
}
...
@@ -401,7 +399,7 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_NonExhaustedSnapshotDoesN
...
@@ -401,7 +399,7 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_NonExhaustedSnapshotDoesN
select
{
select
{
case
resetAt
:=
<-
repo
.
rateLimitCh
:
case
resetAt
:=
<-
repo
.
rateLimitCh
:
t
.
Fatalf
(
"
unexpected rate limit reset at
: %v"
,
resetAt
)
t
.
Fatalf
(
"
不应写入运行时限流时间
: %v"
,
resetAt
)
case
<-
time
.
After
(
200
*
time
.
Millisecond
)
:
case
<-
time
.
After
(
200
*
time
.
Millisecond
)
:
}
}
}
}
...
@@ -409,7 +407,6 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_NonExhaustedSnapshotDoesN
...
@@ -409,7 +407,6 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_NonExhaustedSnapshotDoesN
func
TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ThrottlesExtraWrites
(
t
*
testing
.
T
)
{
func
TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ThrottlesExtraWrites
(
t
*
testing
.
T
)
{
repo
:=
&
openAICodexSnapshotAsyncRepo
{
repo
:=
&
openAICodexSnapshotAsyncRepo
{
updateExtraCh
:
make
(
chan
map
[
string
]
any
,
2
),
updateExtraCh
:
make
(
chan
map
[
string
]
any
,
2
),
rateLimitCh
:
make
(
chan
time
.
Time
,
2
),
}
}
svc
:=
&
OpenAIGatewayService
{
svc
:=
&
OpenAIGatewayService
{
accountRepo
:
repo
,
accountRepo
:
repo
,
...
@@ -443,7 +440,7 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ThrottlesExtraWrites(t *t
...
@@ -443,7 +440,7 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ThrottlesExtraWrites(t *t
func
ptrFloat64WS
(
v
float64
)
*
float64
{
return
&
v
}
func
ptrFloat64WS
(
v
float64
)
*
float64
{
return
&
v
}
func
ptrIntWS
(
v
int
)
*
int
{
return
&
v
}
func
ptrIntWS
(
v
int
)
*
int
{
return
&
v
}
func
TestOpenAIGatewayService_GetSchedulableAccount_ExhaustedCodexExtraSet
s
RateLimit
(
t
*
testing
.
T
)
{
func
TestOpenAIGatewayService_GetSchedulableAccount_ExhaustedCodexExtra
DoesNot
SetRateLimit
(
t
*
testing
.
T
)
{
resetAt
:=
time
.
Now
()
.
Add
(
6
*
24
*
time
.
Hour
)
resetAt
:=
time
.
Now
()
.
Add
(
6
*
24
*
time
.
Hour
)
account
:=
Account
{
account
:=
Account
{
ID
:
701
,
ID
:
701
,
...
@@ -463,17 +460,15 @@ func TestOpenAIGatewayService_GetSchedulableAccount_ExhaustedCodexExtraSetsRateL
...
@@ -463,17 +460,15 @@ func TestOpenAIGatewayService_GetSchedulableAccount_ExhaustedCodexExtraSetsRateL
fresh
,
err
:=
svc
.
getSchedulableAccount
(
context
.
Background
(),
account
.
ID
)
fresh
,
err
:=
svc
.
getSchedulableAccount
(
context
.
Background
(),
account
.
ID
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
require
.
NotNil
(
t
,
fresh
)
require
.
NotNil
(
t
,
fresh
)
require
.
NotNil
(
t
,
fresh
.
RateLimitResetAt
)
require
.
Nil
(
t
,
fresh
.
RateLimitResetAt
)
require
.
WithinDuration
(
t
,
resetAt
.
UTC
(),
*
fresh
.
RateLimitResetAt
,
time
.
Second
)
select
{
select
{
case
persisted
:=
<-
repo
.
rateLimitCh
:
case
persisted
:=
<-
repo
.
rateLimitCh
:
require
.
WithinDuration
(
t
,
resetAt
.
UTC
(),
persisted
,
time
.
Secon
d
)
t
.
Fatalf
(
"不应将已耗尽的 codex extra 提升为运行时限流状态: %v"
,
persiste
d
)
case
<-
time
.
After
(
2
*
time
.
Second
)
:
case
<-
time
.
After
(
2
*
time
.
Second
)
:
t
.
Fatal
(
"等待旧快照补写限流状态超时"
)
}
}
}
}
func
TestAdminService_ListAccounts_ExhaustedCodexExtra
ReturnsRateLimitedAccoun
t
(
t
*
testing
.
T
)
{
func
TestAdminService_ListAccounts_ExhaustedCodexExtra
DoesNotSetRateLimi
t
(
t
*
testing
.
T
)
{
resetAt
:=
time
.
Now
()
.
Add
(
4
*
24
*
time
.
Hour
)
resetAt
:=
time
.
Now
()
.
Add
(
4
*
24
*
time
.
Hour
)
repo
:=
&
openAICodexExtraListRepo
{
repo
:=
&
openAICodexExtraListRepo
{
stubOpenAIAccountRepo
:
stubOpenAIAccountRepo
{
accounts
:
[]
Account
{{
stubOpenAIAccountRepo
:
stubOpenAIAccountRepo
{
accounts
:
[]
Account
{{
...
@@ -496,13 +491,11 @@ func TestAdminService_ListAccounts_ExhaustedCodexExtraReturnsRateLimitedAccount(
...
@@ -496,13 +491,11 @@ func TestAdminService_ListAccounts_ExhaustedCodexExtraReturnsRateLimitedAccount(
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
int64
(
1
),
total
)
require
.
Equal
(
t
,
int64
(
1
),
total
)
require
.
Len
(
t
,
accounts
,
1
)
require
.
Len
(
t
,
accounts
,
1
)
require
.
NotNil
(
t
,
accounts
[
0
]
.
RateLimitResetAt
)
require
.
Nil
(
t
,
accounts
[
0
]
.
RateLimitResetAt
)
require
.
WithinDuration
(
t
,
resetAt
.
UTC
(),
*
accounts
[
0
]
.
RateLimitResetAt
,
time
.
Second
)
select
{
select
{
case
persisted
:=
<-
repo
.
rateLimitCh
:
case
persisted
:=
<-
repo
.
rateLimitCh
:
require
.
WithinDuration
(
t
,
resetAt
.
UTC
(),
persisted
,
time
.
Secon
d
)
t
.
Fatalf
(
"不应在账号列表查询时将 codex extra 持久化为运行时限流状态: %v"
,
persiste
d
)
case
<-
time
.
After
(
2
*
time
.
Second
)
:
case
<-
time
.
After
(
2
*
time
.
Second
)
:
t
.
Fatal
(
"等待列表补写限流状态超时"
)
}
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment