Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
陈曦
sub2api
Commits
b3b2868f
Unverified
Commit
b3b2868f
authored
Jan 14, 2026
by
Wesley Liddick
Committed by
GitHub
Jan 14, 2026
Browse files
Merge pull request #278 from IanShaw027/fix/openai-account-schedulability-check
fix(网关): 修复账号选择中的调度器快照延迟问题
parents
99cbfa15
25b00abc
Changes
5
Hide whitespace changes
Inline
Side-by-side
backend/internal/repository/ops_repo_preagg.go
View file @
b3b2868f
...
@@ -71,7 +71,9 @@ usage_agg AS (
...
@@ -71,7 +71,9 @@ usage_agg AS (
error_base AS (
error_base AS (
SELECT
SELECT
date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start,
date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start,
platform AS platform,
-- platform is NULL for some early-phase errors (e.g. before routing); map to a sentinel
-- value so platform-level GROUPING SETS don't collide with the overall (platform=NULL) row.
COALESCE(platform, 'unknown') AS platform,
group_id AS group_id,
group_id AS group_id,
is_business_limited AS is_business_limited,
is_business_limited AS is_business_limited,
error_owner AS error_owner,
error_owner AS error_owner,
...
...
backend/internal/service/gateway_multiplatform_test.go
View file @
b3b2868f
...
@@ -1211,6 +1211,72 @@ func TestGatewayService_SelectAccountWithLoadAwareness(t *testing.T) {
...
@@ -1211,6 +1211,72 @@ func TestGatewayService_SelectAccountWithLoadAwareness(t *testing.T) {
require
.
Nil
(
t
,
result
)
require
.
Nil
(
t
,
result
)
require
.
Contains
(
t
,
err
.
Error
(),
"no available accounts"
)
require
.
Contains
(
t
,
err
.
Error
(),
"no available accounts"
)
})
})
t
.
Run
(
"过滤不可调度账号-限流账号被跳过"
,
func
(
t
*
testing
.
T
)
{
now
:=
time
.
Now
()
resetAt
:=
now
.
Add
(
10
*
time
.
Minute
)
repo
:=
&
mockAccountRepoForPlatform
{
accounts
:
[]
Account
{
{
ID
:
1
,
Platform
:
PlatformAnthropic
,
Priority
:
1
,
Status
:
StatusActive
,
Schedulable
:
true
,
Concurrency
:
5
,
RateLimitResetAt
:
&
resetAt
},
{
ID
:
2
,
Platform
:
PlatformAnthropic
,
Priority
:
2
,
Status
:
StatusActive
,
Schedulable
:
true
,
Concurrency
:
5
},
},
accountsByID
:
map
[
int64
]
*
Account
{},
}
for
i
:=
range
repo
.
accounts
{
repo
.
accountsByID
[
repo
.
accounts
[
i
]
.
ID
]
=
&
repo
.
accounts
[
i
]
}
cache
:=
&
mockGatewayCacheForPlatform
{}
cfg
:=
testConfig
()
cfg
.
Gateway
.
Scheduling
.
LoadBatchEnabled
=
false
svc
:=
&
GatewayService
{
accountRepo
:
repo
,
cache
:
cache
,
cfg
:
cfg
,
concurrencyService
:
nil
,
}
result
,
err
:=
svc
.
SelectAccountWithLoadAwareness
(
ctx
,
nil
,
""
,
"claude-3-5-sonnet-20241022"
,
nil
)
require
.
NoError
(
t
,
err
)
require
.
NotNil
(
t
,
result
)
require
.
NotNil
(
t
,
result
.
Account
)
require
.
Equal
(
t
,
int64
(
2
),
result
.
Account
.
ID
,
"应跳过限流账号,选择可用账号"
)
})
t
.
Run
(
"过滤不可调度账号-过载账号被跳过"
,
func
(
t
*
testing
.
T
)
{
now
:=
time
.
Now
()
overloadUntil
:=
now
.
Add
(
10
*
time
.
Minute
)
repo
:=
&
mockAccountRepoForPlatform
{
accounts
:
[]
Account
{
{
ID
:
1
,
Platform
:
PlatformAnthropic
,
Priority
:
1
,
Status
:
StatusActive
,
Schedulable
:
true
,
Concurrency
:
5
,
OverloadUntil
:
&
overloadUntil
},
{
ID
:
2
,
Platform
:
PlatformAnthropic
,
Priority
:
2
,
Status
:
StatusActive
,
Schedulable
:
true
,
Concurrency
:
5
},
},
accountsByID
:
map
[
int64
]
*
Account
{},
}
for
i
:=
range
repo
.
accounts
{
repo
.
accountsByID
[
repo
.
accounts
[
i
]
.
ID
]
=
&
repo
.
accounts
[
i
]
}
cache
:=
&
mockGatewayCacheForPlatform
{}
cfg
:=
testConfig
()
cfg
.
Gateway
.
Scheduling
.
LoadBatchEnabled
=
false
svc
:=
&
GatewayService
{
accountRepo
:
repo
,
cache
:
cache
,
cfg
:
cfg
,
concurrencyService
:
nil
,
}
result
,
err
:=
svc
.
SelectAccountWithLoadAwareness
(
ctx
,
nil
,
""
,
"claude-3-5-sonnet-20241022"
,
nil
)
require
.
NoError
(
t
,
err
)
require
.
NotNil
(
t
,
result
)
require
.
NotNil
(
t
,
result
.
Account
)
require
.
Equal
(
t
,
int64
(
2
),
result
.
Account
.
ID
,
"应跳过过载账号,选择可用账号"
)
})
}
}
func
TestGatewayService_GroupResolution_ReusesContextGroup
(
t
*
testing
.
T
)
{
func
TestGatewayService_GroupResolution_ReusesContextGroup
(
t
*
testing
.
T
)
{
...
...
backend/internal/service/gateway_service.go
View file @
b3b2868f
...
@@ -511,6 +511,12 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
...
@@ -511,6 +511,12 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
if
isExcluded
(
acc
.
ID
)
{
if
isExcluded
(
acc
.
ID
)
{
continue
continue
}
}
// Scheduler snapshots can be temporarily stale (bucket rebuild is throttled);
// re-check schedulability here so recently rate-limited/overloaded accounts
// are not selected again before the bucket is rebuilt.
if
!
acc
.
IsSchedulable
()
{
continue
}
if
!
s
.
isAccountAllowedForPlatform
(
acc
,
platform
,
useMixed
)
{
if
!
s
.
isAccountAllowedForPlatform
(
acc
,
platform
,
useMixed
)
{
continue
continue
}
}
...
@@ -893,6 +899,11 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context,
...
@@ -893,6 +899,11 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context,
if
_
,
excluded
:=
excludedIDs
[
acc
.
ID
];
excluded
{
if
_
,
excluded
:=
excludedIDs
[
acc
.
ID
];
excluded
{
continue
continue
}
}
// Scheduler snapshots can be temporarily stale; re-check schedulability here to
// avoid selecting accounts that were recently rate-limited/overloaded.
if
!
acc
.
IsSchedulable
()
{
continue
}
if
!
acc
.
IsSchedulableForModel
(
requestedModel
)
{
if
!
acc
.
IsSchedulableForModel
(
requestedModel
)
{
continue
continue
}
}
...
@@ -977,6 +988,11 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g
...
@@ -977,6 +988,11 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g
if
_
,
excluded
:=
excludedIDs
[
acc
.
ID
];
excluded
{
if
_
,
excluded
:=
excludedIDs
[
acc
.
ID
];
excluded
{
continue
continue
}
}
// Scheduler snapshots can be temporarily stale; re-check schedulability here to
// avoid selecting accounts that were recently rate-limited/overloaded.
if
!
acc
.
IsSchedulable
()
{
continue
}
// 过滤:原生平台直接通过,antigravity 需要启用混合调度
// 过滤:原生平台直接通过,antigravity 需要启用混合调度
if
acc
.
Platform
==
PlatformAntigravity
&&
!
acc
.
IsMixedSchedulingEnabled
()
{
if
acc
.
Platform
==
PlatformAntigravity
&&
!
acc
.
IsMixedSchedulingEnabled
()
{
continue
continue
...
...
backend/internal/service/openai_gateway_service.go
View file @
b3b2868f
...
@@ -186,6 +186,11 @@ func (s *OpenAIGatewayService) SelectAccountForModelWithExclusions(ctx context.C
...
@@ -186,6 +186,11 @@ func (s *OpenAIGatewayService) SelectAccountForModelWithExclusions(ctx context.C
if
_
,
excluded
:=
excludedIDs
[
acc
.
ID
];
excluded
{
if
_
,
excluded
:=
excludedIDs
[
acc
.
ID
];
excluded
{
continue
continue
}
}
// Scheduler snapshots can be temporarily stale; re-check schedulability here to
// avoid selecting accounts that were recently rate-limited/overloaded.
if
!
acc
.
IsSchedulable
()
{
continue
}
// Check model support
// Check model support
if
requestedModel
!=
""
&&
!
acc
.
IsModelSupported
(
requestedModel
)
{
if
requestedModel
!=
""
&&
!
acc
.
IsModelSupported
(
requestedModel
)
{
continue
continue
...
@@ -332,6 +337,12 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex
...
@@ -332,6 +337,12 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex
if
isExcluded
(
acc
.
ID
)
{
if
isExcluded
(
acc
.
ID
)
{
continue
continue
}
}
// Scheduler snapshots can be temporarily stale (bucket rebuild is throttled);
// re-check schedulability here so recently rate-limited/overloaded accounts
// are not selected again before the bucket is rebuilt.
if
!
acc
.
IsSchedulable
()
{
continue
}
if
requestedModel
!=
""
&&
!
acc
.
IsModelSupported
(
requestedModel
)
{
if
requestedModel
!=
""
&&
!
acc
.
IsModelSupported
(
requestedModel
)
{
continue
continue
}
}
...
...
backend/internal/service/openai_gateway_service_test.go
View file @
b3b2868f
...
@@ -3,6 +3,7 @@ package service
...
@@ -3,6 +3,7 @@ package service
import
(
import
(
"bufio"
"bufio"
"bytes"
"bytes"
"context"
"errors"
"errors"
"io"
"io"
"net/http"
"net/http"
...
@@ -15,6 +16,129 @@ import (
...
@@ -15,6 +16,129 @@ import (
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin"
)
)
type
stubOpenAIAccountRepo
struct
{
AccountRepository
accounts
[]
Account
}
func
(
r
stubOpenAIAccountRepo
)
ListSchedulableByGroupIDAndPlatform
(
ctx
context
.
Context
,
groupID
int64
,
platform
string
)
([]
Account
,
error
)
{
return
append
([]
Account
(
nil
),
r
.
accounts
...
),
nil
}
func
(
r
stubOpenAIAccountRepo
)
ListSchedulableByPlatform
(
ctx
context
.
Context
,
platform
string
)
([]
Account
,
error
)
{
return
append
([]
Account
(
nil
),
r
.
accounts
...
),
nil
}
type
stubConcurrencyCache
struct
{
ConcurrencyCache
}
func
(
c
stubConcurrencyCache
)
AcquireAccountSlot
(
ctx
context
.
Context
,
accountID
int64
,
maxConcurrency
int
,
requestID
string
)
(
bool
,
error
)
{
return
true
,
nil
}
func
(
c
stubConcurrencyCache
)
ReleaseAccountSlot
(
ctx
context
.
Context
,
accountID
int64
,
requestID
string
)
error
{
return
nil
}
func
(
c
stubConcurrencyCache
)
GetAccountsLoadBatch
(
ctx
context
.
Context
,
accounts
[]
AccountWithConcurrency
)
(
map
[
int64
]
*
AccountLoadInfo
,
error
)
{
out
:=
make
(
map
[
int64
]
*
AccountLoadInfo
,
len
(
accounts
))
for
_
,
acc
:=
range
accounts
{
out
[
acc
.
ID
]
=
&
AccountLoadInfo
{
AccountID
:
acc
.
ID
,
LoadRate
:
0
}
}
return
out
,
nil
}
func
TestOpenAISelectAccountWithLoadAwareness_FiltersUnschedulable
(
t
*
testing
.
T
)
{
now
:=
time
.
Now
()
resetAt
:=
now
.
Add
(
10
*
time
.
Minute
)
groupID
:=
int64
(
1
)
rateLimited
:=
Account
{
ID
:
1
,
Platform
:
PlatformOpenAI
,
Type
:
AccountTypeAPIKey
,
Status
:
StatusActive
,
Schedulable
:
true
,
Concurrency
:
1
,
Priority
:
0
,
RateLimitResetAt
:
&
resetAt
,
}
available
:=
Account
{
ID
:
2
,
Platform
:
PlatformOpenAI
,
Type
:
AccountTypeAPIKey
,
Status
:
StatusActive
,
Schedulable
:
true
,
Concurrency
:
1
,
Priority
:
1
,
}
svc
:=
&
OpenAIGatewayService
{
accountRepo
:
stubOpenAIAccountRepo
{
accounts
:
[]
Account
{
rateLimited
,
available
}},
concurrencyService
:
NewConcurrencyService
(
stubConcurrencyCache
{}),
}
selection
,
err
:=
svc
.
SelectAccountWithLoadAwareness
(
context
.
Background
(),
&
groupID
,
""
,
"gpt-5.2"
,
nil
)
if
err
!=
nil
{
t
.
Fatalf
(
"SelectAccountWithLoadAwareness error: %v"
,
err
)
}
if
selection
==
nil
||
selection
.
Account
==
nil
{
t
.
Fatalf
(
"expected selection with account"
)
}
if
selection
.
Account
.
ID
!=
available
.
ID
{
t
.
Fatalf
(
"expected account %d, got %d"
,
available
.
ID
,
selection
.
Account
.
ID
)
}
if
selection
.
ReleaseFunc
!=
nil
{
selection
.
ReleaseFunc
()
}
}
func
TestOpenAISelectAccountWithLoadAwareness_FiltersUnschedulableWhenNoConcurrencyService
(
t
*
testing
.
T
)
{
now
:=
time
.
Now
()
resetAt
:=
now
.
Add
(
10
*
time
.
Minute
)
groupID
:=
int64
(
1
)
rateLimited
:=
Account
{
ID
:
1
,
Platform
:
PlatformOpenAI
,
Type
:
AccountTypeAPIKey
,
Status
:
StatusActive
,
Schedulable
:
true
,
Concurrency
:
1
,
Priority
:
0
,
RateLimitResetAt
:
&
resetAt
,
}
available
:=
Account
{
ID
:
2
,
Platform
:
PlatformOpenAI
,
Type
:
AccountTypeAPIKey
,
Status
:
StatusActive
,
Schedulable
:
true
,
Concurrency
:
1
,
Priority
:
1
,
}
svc
:=
&
OpenAIGatewayService
{
accountRepo
:
stubOpenAIAccountRepo
{
accounts
:
[]
Account
{
rateLimited
,
available
}},
// concurrencyService is nil, forcing the non-load-batch selection path.
}
selection
,
err
:=
svc
.
SelectAccountWithLoadAwareness
(
context
.
Background
(),
&
groupID
,
""
,
"gpt-5.2"
,
nil
)
if
err
!=
nil
{
t
.
Fatalf
(
"SelectAccountWithLoadAwareness error: %v"
,
err
)
}
if
selection
==
nil
||
selection
.
Account
==
nil
{
t
.
Fatalf
(
"expected selection with account"
)
}
if
selection
.
Account
.
ID
!=
available
.
ID
{
t
.
Fatalf
(
"expected account %d, got %d"
,
available
.
ID
,
selection
.
Account
.
ID
)
}
if
selection
.
ReleaseFunc
!=
nil
{
selection
.
ReleaseFunc
()
}
}
func
TestOpenAIStreamingTimeout
(
t
*
testing
.
T
)
{
func
TestOpenAIStreamingTimeout
(
t
*
testing
.
T
)
{
gin
.
SetMode
(
gin
.
TestMode
)
gin
.
SetMode
(
gin
.
TestMode
)
cfg
:=
&
config
.
Config
{
cfg
:=
&
config
.
Config
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment