Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
陈曦
sub2api
Commits
2b30e3b6
Commit
2b30e3b6
authored
Mar 11, 2026
by
ius
Browse files
Reduce scheduler rebuilds on neutral extra updates
parent
7455476c
Changes
2
Hide whitespace changes
Inline
Side-by-side
backend/internal/repository/account_repo.go
View file @
2b30e3b6
...
...
@@ -16,6 +16,7 @@ import (
"encoding/json"
"errors"
"strconv"
"strings"
"time"
dbent
"github.com/Wei-Shaw/sub2api/ent"
...
...
@@ -50,6 +51,18 @@ type accountRepository struct {
schedulerCache
service
.
SchedulerCache
}
var
schedulerNeutralExtraKeyPrefixes
=
[]
string
{
"codex_primary_"
,
"codex_secondary_"
,
"codex_5h_"
,
"codex_7d_"
,
}
var
schedulerNeutralExtraKeys
=
map
[
string
]
struct
{}{
"codex_usage_updated_at"
:
{},
"session_window_utilization"
:
{},
}
// NewAccountRepository 创建账户仓储实例。
// 这是对外暴露的构造函数,返回接口类型以便于依赖注入。
func
NewAccountRepository
(
client
*
dbent
.
Client
,
sqlDB
*
sql
.
DB
,
schedulerCache
service
.
SchedulerCache
)
service
.
AccountRepository
{
...
...
@@ -613,6 +626,29 @@ func (r *accountRepository) syncSchedulerAccountSnapshot(ctx context.Context, ac
}
}
func
(
r
*
accountRepository
)
patchSchedulerAccountExtra
(
ctx
context
.
Context
,
accountID
int64
,
updates
map
[
string
]
any
)
{
if
r
==
nil
||
r
.
schedulerCache
==
nil
||
accountID
<=
0
||
len
(
updates
)
==
0
{
return
}
account
,
err
:=
r
.
schedulerCache
.
GetAccount
(
ctx
,
accountID
)
if
err
!=
nil
{
logger
.
LegacyPrintf
(
"repository.account"
,
"[Scheduler] patch account extra read failed: id=%d err=%v"
,
accountID
,
err
)
return
}
if
account
==
nil
{
return
}
if
account
.
Extra
==
nil
{
account
.
Extra
=
make
(
map
[
string
]
any
,
len
(
updates
))
}
for
key
,
value
:=
range
updates
{
account
.
Extra
[
key
]
=
value
}
if
err
:=
r
.
schedulerCache
.
SetAccount
(
ctx
,
account
);
err
!=
nil
{
logger
.
LegacyPrintf
(
"repository.account"
,
"[Scheduler] patch account extra write failed: id=%d err=%v"
,
accountID
,
err
)
}
}
func
(
r
*
accountRepository
)
syncSchedulerAccountSnapshots
(
ctx
context
.
Context
,
accountIDs
[]
int64
)
{
if
r
==
nil
||
r
.
schedulerCache
==
nil
||
len
(
accountIDs
)
==
0
{
return
...
...
@@ -1185,12 +1221,47 @@ func (r *accountRepository) UpdateExtra(ctx context.Context, id int64, updates m
if
affected
==
0
{
return
service
.
ErrAccountNotFound
}
if
err
:=
enqueueSchedulerOutbox
(
ctx
,
r
.
sql
,
service
.
SchedulerOutboxEventAccountChanged
,
&
id
,
nil
,
nil
);
err
!=
nil
{
logger
.
LegacyPrintf
(
"repository.account"
,
"[SchedulerOutbox] enqueue extra update failed: account=%d err=%v"
,
id
,
err
)
if
shouldEnqueueSchedulerOutboxForExtraUpdates
(
updates
)
{
if
err
:=
enqueueSchedulerOutbox
(
ctx
,
r
.
sql
,
service
.
SchedulerOutboxEventAccountChanged
,
&
id
,
nil
,
nil
);
err
!=
nil
{
logger
.
LegacyPrintf
(
"repository.account"
,
"[SchedulerOutbox] enqueue extra update failed: account=%d err=%v"
,
id
,
err
)
}
}
else
{
// 观测型 extra 字段不需要触发 bucket 重建,但尽量把单账号缓存补到最新,
// 让 sticky session / GetAccount 命中缓存时也能读到最新快照。
r
.
patchSchedulerAccountExtra
(
ctx
,
id
,
updates
)
}
return
nil
}
func
shouldEnqueueSchedulerOutboxForExtraUpdates
(
updates
map
[
string
]
any
)
bool
{
if
len
(
updates
)
==
0
{
return
false
}
for
key
:=
range
updates
{
if
!
isSchedulerNeutralExtraKey
(
key
)
{
return
true
}
}
return
false
}
func
isSchedulerNeutralExtraKey
(
key
string
)
bool
{
key
=
strings
.
TrimSpace
(
key
)
if
key
==
""
{
return
false
}
if
_
,
ok
:=
schedulerNeutralExtraKeys
[
key
];
ok
{
return
true
}
for
_
,
prefix
:=
range
schedulerNeutralExtraKeyPrefixes
{
if
strings
.
HasPrefix
(
key
,
prefix
)
{
return
true
}
}
return
false
}
func
(
r
*
accountRepository
)
BulkUpdate
(
ctx
context
.
Context
,
ids
[]
int64
,
updates
service
.
AccountBulkUpdate
)
(
int64
,
error
)
{
if
len
(
ids
)
==
0
{
return
0
,
nil
...
...
backend/internal/repository/account_repo_integration_test.go
View file @
2b30e3b6
...
...
@@ -23,6 +23,7 @@ type AccountRepoSuite struct {
type
schedulerCacheRecorder
struct
{
setAccounts
[]
*
service
.
Account
accounts
map
[
int64
]
*
service
.
Account
}
func
(
s
*
schedulerCacheRecorder
)
GetSnapshot
(
ctx
context
.
Context
,
bucket
service
.
SchedulerBucket
)
([]
*
service
.
Account
,
bool
,
error
)
{
...
...
@@ -34,11 +35,20 @@ func (s *schedulerCacheRecorder) SetSnapshot(ctx context.Context, bucket service
}
func
(
s
*
schedulerCacheRecorder
)
GetAccount
(
ctx
context
.
Context
,
accountID
int64
)
(
*
service
.
Account
,
error
)
{
return
nil
,
nil
if
s
.
accounts
==
nil
{
return
nil
,
nil
}
return
s
.
accounts
[
accountID
],
nil
}
func
(
s
*
schedulerCacheRecorder
)
SetAccount
(
ctx
context
.
Context
,
account
*
service
.
Account
)
error
{
s
.
setAccounts
=
append
(
s
.
setAccounts
,
account
)
if
s
.
accounts
==
nil
{
s
.
accounts
=
make
(
map
[
int64
]
*
service
.
Account
)
}
if
account
!=
nil
{
s
.
accounts
[
account
.
ID
]
=
account
}
return
nil
}
...
...
@@ -623,6 +633,64 @@ func (s *AccountRepoSuite) TestUpdateExtra_NilExtra() {
s
.
Require
()
.
Equal
(
"val"
,
got
.
Extra
[
"key"
])
}
func
(
s
*
AccountRepoSuite
)
TestUpdateExtra_SchedulerNeutralSkipsOutboxAndPatchesCache
()
{
account
:=
mustCreateAccount
(
s
.
T
(),
s
.
client
,
&
service
.
Account
{
Name
:
"acc-extra-neutral"
,
Platform
:
service
.
PlatformOpenAI
,
Extra
:
map
[
string
]
any
{
"codex_usage_updated_at"
:
"old"
},
})
cacheRecorder
:=
&
schedulerCacheRecorder
{
accounts
:
map
[
int64
]
*
service
.
Account
{
account
.
ID
:
{
ID
:
account
.
ID
,
Platform
:
account
.
Platform
,
Extra
:
map
[
string
]
any
{
"codex_usage_updated_at"
:
"old"
,
},
},
},
}
s
.
repo
.
schedulerCache
=
cacheRecorder
updates
:=
map
[
string
]
any
{
"codex_usage_updated_at"
:
"2026-03-11T10:00:00Z"
,
"codex_5h_used_percent"
:
88.5
,
"session_window_utilization"
:
0.42
,
}
s
.
Require
()
.
NoError
(
s
.
repo
.
UpdateExtra
(
s
.
ctx
,
account
.
ID
,
updates
))
got
,
err
:=
s
.
repo
.
GetByID
(
s
.
ctx
,
account
.
ID
)
s
.
Require
()
.
NoError
(
err
)
s
.
Require
()
.
Equal
(
"2026-03-11T10:00:00Z"
,
got
.
Extra
[
"codex_usage_updated_at"
])
s
.
Require
()
.
Equal
(
88.5
,
got
.
Extra
[
"codex_5h_used_percent"
])
s
.
Require
()
.
Equal
(
0.42
,
got
.
Extra
[
"session_window_utilization"
])
var
outboxCount
int
s
.
Require
()
.
NoError
(
scanSingleRow
(
s
.
ctx
,
s
.
repo
.
sql
,
"SELECT COUNT(*) FROM scheduler_outbox"
,
nil
,
&
outboxCount
))
s
.
Require
()
.
Zero
(
outboxCount
)
s
.
Require
()
.
Len
(
cacheRecorder
.
setAccounts
,
1
)
s
.
Require
()
.
NotNil
(
cacheRecorder
.
accounts
[
account
.
ID
])
s
.
Require
()
.
Equal
(
"2026-03-11T10:00:00Z"
,
cacheRecorder
.
accounts
[
account
.
ID
]
.
Extra
[
"codex_usage_updated_at"
])
}
func
(
s
*
AccountRepoSuite
)
TestUpdateExtra_SchedulerRelevantStillEnqueuesOutbox
()
{
account
:=
mustCreateAccount
(
s
.
T
(),
s
.
client
,
&
service
.
Account
{
Name
:
"acc-extra-mixed"
,
Platform
:
service
.
PlatformAntigravity
,
Extra
:
map
[
string
]
any
{},
})
updates
:=
map
[
string
]
any
{
"mixed_scheduling"
:
true
,
"codex_usage_updated_at"
:
"2026-03-11T10:00:00Z"
,
}
s
.
Require
()
.
NoError
(
s
.
repo
.
UpdateExtra
(
s
.
ctx
,
account
.
ID
,
updates
))
var
outboxCount
int
s
.
Require
()
.
NoError
(
scanSingleRow
(
s
.
ctx
,
s
.
repo
.
sql
,
"SELECT COUNT(*) FROM scheduler_outbox"
,
nil
,
&
outboxCount
))
s
.
Require
()
.
Equal
(
1
,
outboxCount
)
}
// --- GetByCRSAccountID ---
func
(
s
*
AccountRepoSuite
)
TestGetByCRSAccountID
()
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment