Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
陈曦
sub2api
Commits
bcf4aedc
Commit
bcf4aedc
authored
Apr 23, 2026
by
wucm667
Browse files
fix: 修复账户配额跨越时调度快照入队逻辑
parent
6449da6c
Changes
2
Show whitespace changes
Inline
Side-by-side
backend/internal/repository/usage_billing_repo.go
View file @
bcf4aedc
...
@@ -290,7 +290,6 @@ func incrementUsageBillingAccountQuota(ctx context.Context, tx *sql.Tx, accountI
...
@@ -290,7 +290,6 @@ func incrementUsageBillingAccountQuota(ctx context.Context, tx *sql.Tx, accountI
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
}
}
defer
func
()
{
_
=
rows
.
Close
()
}()
var
state
service
.
AccountQuotaState
var
state
service
.
AccountQuotaState
if
rows
.
Next
()
{
if
rows
.
Next
()
{
...
@@ -299,18 +298,36 @@ func incrementUsageBillingAccountQuota(ctx context.Context, tx *sql.Tx, accountI
...
@@ -299,18 +298,36 @@ func incrementUsageBillingAccountQuota(ctx context.Context, tx *sql.Tx, accountI
&
state
.
DailyUsed
,
&
state
.
DailyLimit
,
&
state
.
DailyUsed
,
&
state
.
DailyLimit
,
&
state
.
WeeklyUsed
,
&
state
.
WeeklyLimit
,
&
state
.
WeeklyUsed
,
&
state
.
WeeklyLimit
,
);
err
!=
nil
{
);
err
!=
nil
{
_
=
rows
.
Close
()
return
nil
,
err
return
nil
,
err
}
}
}
else
{
}
else
{
if
err
:=
rows
.
Err
();
err
!=
nil
{
if
err
:=
rows
.
Err
();
err
!=
nil
{
_
=
rows
.
Close
()
return
nil
,
err
return
nil
,
err
}
}
_
=
rows
.
Close
()
return
nil
,
service
.
ErrAccountNotFound
return
nil
,
service
.
ErrAccountNotFound
}
}
if
err
:=
rows
.
Err
();
err
!=
nil
{
if
err
:=
rows
.
Err
();
err
!=
nil
{
_
=
rows
.
Close
()
return
nil
,
err
return
nil
,
err
}
}
if
state
.
TotalLimit
>
0
&&
state
.
TotalUsed
>=
state
.
TotalLimit
&&
(
state
.
TotalUsed
-
amount
)
<
state
.
TotalLimit
{
// 必须在执行下一条 SQL 前显式关闭 rows:pq 驱动在同一连接上
// 不允许前一条查询的结果集未耗尽时启动新查询,否则会返回
// "unexpected Parse response" 错误。
if
err
:=
rows
.
Close
();
err
!=
nil
{
return
nil
,
err
}
// 任意维度额度在本次递增中从"未超"跨越到"已超"时,必须刷新调度快照,
// 否则 Redis 中缓存的 Account 仍显示旧的 used 值,后续请求会继续选中本账号,
// 最终观察到 daily_used / weekly_used 大幅超过配置的 limit。
// 对于日/周额度,即使本次触发了周期重置(pre=0、post=amount),
// 判定式 (post-amount) < limit 同样成立,逻辑与总额度保持一致。
crossedTotal
:=
state
.
TotalLimit
>
0
&&
state
.
TotalUsed
>=
state
.
TotalLimit
&&
(
state
.
TotalUsed
-
amount
)
<
state
.
TotalLimit
crossedDaily
:=
state
.
DailyLimit
>
0
&&
state
.
DailyUsed
>=
state
.
DailyLimit
&&
(
state
.
DailyUsed
-
amount
)
<
state
.
DailyLimit
crossedWeekly
:=
state
.
WeeklyLimit
>
0
&&
state
.
WeeklyUsed
>=
state
.
WeeklyLimit
&&
(
state
.
WeeklyUsed
-
amount
)
<
state
.
WeeklyLimit
if
crossedTotal
||
crossedDaily
||
crossedWeekly
{
if
err
:=
enqueueSchedulerOutbox
(
ctx
,
tx
,
service
.
SchedulerOutboxEventAccountChanged
,
&
accountID
,
nil
,
nil
);
err
!=
nil
{
if
err
:=
enqueueSchedulerOutbox
(
ctx
,
tx
,
service
.
SchedulerOutboxEventAccountChanged
,
&
accountID
,
nil
,
nil
);
err
!=
nil
{
logger
.
LegacyPrintf
(
"repository.usage_billing"
,
"[SchedulerOutbox] enqueue quota exceeded failed: account=%d err=%v"
,
accountID
,
err
)
logger
.
LegacyPrintf
(
"repository.usage_billing"
,
"[SchedulerOutbox] enqueue quota exceeded failed: account=%d err=%v"
,
accountID
,
err
)
return
nil
,
err
return
nil
,
err
...
...
backend/internal/repository/usage_billing_repo_integration_test.go
View file @
bcf4aedc
...
@@ -199,6 +199,94 @@ func TestUsageBillingRepositoryApply_UpdatesAccountQuota(t *testing.T) {
...
@@ -199,6 +199,94 @@ func TestUsageBillingRepositoryApply_UpdatesAccountQuota(t *testing.T) {
require
.
InDelta
(
t
,
3.5
,
quotaUsed
,
0.000001
)
require
.
InDelta
(
t
,
3.5
,
quotaUsed
,
0.000001
)
}
}
func
TestUsageBillingRepositoryApply_EnqueuesSchedulerOutboxOnQuotaCrossing
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
client
:=
testEntClient
(
t
)
repo
:=
NewUsageBillingRepository
(
client
,
integrationDB
)
newFixture
:=
func
(
t
*
testing
.
T
,
extra
map
[
string
]
any
)
(
int64
,
int64
)
{
t
.
Helper
()
user
:=
mustCreateUser
(
t
,
client
,
&
service
.
User
{
Email
:
fmt
.
Sprintf
(
"usage-billing-outbox-user-%d-%s@example.com"
,
time
.
Now
()
.
UnixNano
(),
uuid
.
NewString
()),
PasswordHash
:
"hash"
,
})
apiKey
:=
mustCreateApiKey
(
t
,
client
,
&
service
.
APIKey
{
UserID
:
user
.
ID
,
Key
:
"sk-usage-billing-outbox-"
+
uuid
.
NewString
(),
Name
:
"billing-outbox"
,
})
account
:=
mustCreateAccount
(
t
,
client
,
&
service
.
Account
{
Name
:
"usage-billing-outbox-"
+
uuid
.
NewString
(),
Type
:
service
.
AccountTypeAPIKey
,
Extra
:
extra
,
})
return
apiKey
.
ID
,
account
.
ID
}
outboxCountFor
:=
func
(
t
*
testing
.
T
,
accountID
int64
)
int
{
t
.
Helper
()
var
count
int
require
.
NoError
(
t
,
integrationDB
.
QueryRowContext
(
ctx
,
"SELECT COUNT(*) FROM scheduler_outbox WHERE event_type = $1 AND account_id = $2"
,
service
.
SchedulerOutboxEventAccountChanged
,
accountID
,
)
.
Scan
(
&
count
))
return
count
}
t
.
Run
(
"daily_first_crossing_enqueues"
,
func
(
t
*
testing
.
T
)
{
apiKeyID
,
accountID
:=
newFixture
(
t
,
map
[
string
]
any
{
"quota_daily_limit"
:
10.0
,
})
// 第一次低于日限额:不应入队 outbox
_
,
err
:=
repo
.
Apply
(
ctx
,
&
service
.
UsageBillingCommand
{
RequestID
:
uuid
.
NewString
(),
APIKeyID
:
apiKeyID
,
AccountID
:
accountID
,
AccountType
:
service
.
AccountTypeAPIKey
,
AccountQuotaCost
:
4
,
})
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
0
,
outboxCountFor
(
t
,
accountID
),
"below limit should not enqueue"
)
// 第二次跨越日限额:应入队一次 outbox
_
,
err
=
repo
.
Apply
(
ctx
,
&
service
.
UsageBillingCommand
{
RequestID
:
uuid
.
NewString
(),
APIKeyID
:
apiKeyID
,
AccountID
:
accountID
,
AccountType
:
service
.
AccountTypeAPIKey
,
AccountQuotaCost
:
8
,
})
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
1
,
outboxCountFor
(
t
,
accountID
),
"crossing daily limit should enqueue once"
)
// 再次递增(已超):不应重复入队
_
,
err
=
repo
.
Apply
(
ctx
,
&
service
.
UsageBillingCommand
{
RequestID
:
uuid
.
NewString
(),
APIKeyID
:
apiKeyID
,
AccountID
:
accountID
,
AccountType
:
service
.
AccountTypeAPIKey
,
AccountQuotaCost
:
2
,
})
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
1
,
outboxCountFor
(
t
,
accountID
),
"subsequent increments beyond limit should not re-enqueue"
)
})
t
.
Run
(
"weekly_first_crossing_enqueues"
,
func
(
t
*
testing
.
T
)
{
apiKeyID
,
accountID
:=
newFixture
(
t
,
map
[
string
]
any
{
"quota_weekly_limit"
:
10.0
,
})
_
,
err
:=
repo
.
Apply
(
ctx
,
&
service
.
UsageBillingCommand
{
RequestID
:
uuid
.
NewString
(),
APIKeyID
:
apiKeyID
,
AccountID
:
accountID
,
AccountType
:
service
.
AccountTypeAPIKey
,
AccountQuotaCost
:
15
,
// 单次即跨越
})
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
1
,
outboxCountFor
(
t
,
accountID
),
"single-shot crossing weekly limit should enqueue once"
)
})
}
func
TestDashboardAggregationRepositoryCleanupUsageBillingDedup_BatchDeletesOldRows
(
t
*
testing
.
T
)
{
func
TestDashboardAggregationRepositoryCleanupUsageBillingDedup_BatchDeletesOldRows
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
ctx
:=
context
.
Background
()
repo
:=
newDashboardAggregationRepositoryWithSQL
(
integrationDB
)
repo
:=
newDashboardAggregationRepositoryWithSQL
(
integrationDB
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment