Unverified Commit 83a16dec authored by Wesley Liddick's avatar Wesley Liddick Committed by GitHub
Browse files

Merge pull request #1407 from DaydreamCoding/feat/cache-driven-rpm-buffer

feat(gateway): Cache-Driven RPM Buffer
parents 820c5318 72e5876c
......@@ -1742,22 +1742,47 @@ func (a *Account) GetRPMStrategy() string {
}
// GetRPMStickyBuffer 获取 RPM 粘性缓冲数量
// tiered 模式下的黄区大小,默认为 base_rpm 的 20%(至少 1)
// Cache-driven: buffer = concurrency + maxSessions(覆盖幽灵窗口 + 稳态会话需求)
// floor = baseRPM / 5(向后兼容 maxSessions=0 且 concurrency=0 场景)
func (a *Account) GetRPMStickyBuffer() int {
if a.Extra == nil {
return 0
}
// 手动 override 最高优先级
if v, ok := a.Extra["rpm_sticky_buffer"]; ok {
val := parseExtraInt(v)
if val > 0 {
return val
}
}
base := a.GetBaseRPM()
buffer := base / 5
if buffer < 1 && base > 0 {
buffer = 1
if base <= 0 {
return 0
}
// Cache-driven buffer = concurrency + maxSessions
conc := a.Concurrency
if conc < 0 {
conc = 0
}
sess := a.GetMaxSessions()
if sess < 0 {
sess = 0
}
buffer := conc + sess
// floor: 向后兼容
floor := base / 5
if floor < 1 {
floor = 1
}
if buffer < floor {
buffer = floor
}
return buffer
}
......
......@@ -90,28 +90,47 @@ func TestCheckRPMSchedulability(t *testing.T) {
func TestGetRPMStickyBuffer(t *testing.T) {
tests := []struct {
name string
extra map[string]any
expected int
name string
concurrency int
extra map[string]any
expected int
}{
{"nil extra", nil, 0},
{"no keys", map[string]any{}, 0},
{"base_rpm=0", map[string]any{"base_rpm": 0}, 0},
{"base_rpm=1 min buffer 1", map[string]any{"base_rpm": 1}, 1},
{"base_rpm=4 min buffer 1", map[string]any{"base_rpm": 4}, 1},
{"base_rpm=5 buffer 1", map[string]any{"base_rpm": 5}, 1},
{"base_rpm=10 buffer 2", map[string]any{"base_rpm": 10}, 2},
{"base_rpm=15 buffer 3", map[string]any{"base_rpm": 15}, 3},
{"base_rpm=100 buffer 20", map[string]any{"base_rpm": 100}, 20},
{"custom buffer=5", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 5}, 5},
{"custom buffer=0 fallback to default", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 0}, 2},
{"custom buffer negative fallback", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": -1}, 2},
{"custom buffer with float", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": float64(7)}, 7},
{"json.Number base_rpm", map[string]any{"base_rpm": json.Number("10")}, 2},
// 基础退化
{"nil extra", 0, nil, 0},
{"no keys", 0, map[string]any{}, 0},
{"base_rpm=0", 0, map[string]any{"base_rpm": 0}, 0},
// 新公式: concurrency + maxSessions, floor = base/5
{"conc=3 sess=10 → 13", 3, map[string]any{"base_rpm": 15, "max_sessions": 10}, 13},
{"conc=2 sess=5 → 7", 2, map[string]any{"base_rpm": 10, "max_sessions": 5}, 7},
{"conc=3 sess=15 → 18", 3, map[string]any{"base_rpm": 30, "max_sessions": 15}, 18},
// floor 生效 (conc+sess < base/5)
{"conc=0 sess=0 base=15 → floor 3", 0, map[string]any{"base_rpm": 15}, 3},
{"conc=0 sess=0 base=10 → floor 2", 0, map[string]any{"base_rpm": 10}, 2},
{"conc=0 sess=0 base=1 → floor 1", 0, map[string]any{"base_rpm": 1}, 1},
{"conc=0 sess=0 base=4 → floor 1", 0, map[string]any{"base_rpm": 4}, 1},
{"conc=1 sess=0 base=15 → floor 3", 1, map[string]any{"base_rpm": 15}, 3},
// 手动 override
{"custom buffer=5", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 5, "max_sessions": 10}, 5},
{"custom buffer=0 fallback", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 0, "max_sessions": 10}, 13},
{"custom buffer negative fallback", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": -1, "max_sessions": 10}, 13},
{"custom buffer with float", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": float64(7)}, 7},
// 负值 clamp
{"negative concurrency clamped", -5, map[string]any{"base_rpm": 15, "max_sessions": 10}, 10},
{"negative maxSessions clamped", 3, map[string]any{"base_rpm": 15, "max_sessions": -5}, 3},
// 高并发低会话
{"conc=10 sess=5 → 15", 10, map[string]any{"base_rpm": 10, "max_sessions": 5}, 15},
// json.Number
{"json.Number base_rpm", 3, map[string]any{"base_rpm": json.Number("10"), "max_sessions": json.Number("5")}, 8},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := &Account{Extra: tt.extra}
a := &Account{Concurrency: tt.concurrency, Extra: tt.extra}
if got := a.GetRPMStickyBuffer(); got != tt.expected {
t.Errorf("GetRPMStickyBuffer() = %d, want %d", got, tt.expected)
}
......
......@@ -1418,19 +1418,24 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
if containsInt64(routingAccountIDs, stickyAccountID) && !isExcluded(stickyAccountID) {
// 粘性账号在路由列表中,优先使用
if stickyAccount, ok := accountByID[stickyAccountID]; ok {
if s.isAccountSchedulableForSelection(stickyAccount) &&
var stickyCacheMissReason string
gatePass := s.isAccountSchedulableForSelection(stickyAccount) &&
s.isAccountAllowedForPlatform(stickyAccount, platform, useMixed) &&
(requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, stickyAccount, requestedModel)) &&
s.isAccountSchedulableForModelSelection(ctx, stickyAccount, requestedModel) &&
s.isAccountSchedulableForQuota(stickyAccount) &&
s.isAccountSchedulableForWindowCost(ctx, stickyAccount, true) &&
s.isAccountSchedulableForWindowCost(ctx, stickyAccount, true)
rpmPass := gatePass && s.isAccountSchedulableForRPM(ctx, stickyAccount, true)
s.isAccountSchedulableForRPM(ctx, stickyAccount, true) { // 粘性会话窗口费用+RPM 检查
if rpmPass { // 粘性会话窗口费用+RPM 检查
result, err := s.tryAcquireAccountSlot(ctx, stickyAccountID, stickyAccount.Concurrency)
if err == nil && result.Acquired {
// 会话数量限制检查
if !s.checkAndRegisterSession(ctx, stickyAccount, sessionHash) {
result.ReleaseFunc() // 释放槽位
stickyCacheMissReason = "session_limit"
// 继续到负载感知选择
} else {
if s.debugModelRoutingEnabled() {
......@@ -1444,27 +1449,49 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
}
}
waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, stickyAccountID)
if waitingCount < cfg.StickySessionMaxWaiting {
// 会话数量限制检查(等待计划也需要占用会话配额)
if !s.checkAndRegisterSession(ctx, stickyAccount, sessionHash) {
// 会话限制已满,继续到负载感知选择
if stickyCacheMissReason == "" {
waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, stickyAccountID)
if waitingCount < cfg.StickySessionMaxWaiting {
// 会话数量限制检查(等待计划也需要占用会话配额)
if !s.checkAndRegisterSession(ctx, stickyAccount, sessionHash) {
stickyCacheMissReason = "session_limit"
// 会话限制已满,继续到负载感知选择
} else {
return &AccountSelectionResult{
Account: stickyAccount,
WaitPlan: &AccountWaitPlan{
AccountID: stickyAccountID,
MaxConcurrency: stickyAccount.Concurrency,
Timeout: cfg.StickySessionWaitTimeout,
MaxWaiting: cfg.StickySessionMaxWaiting,
},
}, nil
}
} else {
return &AccountSelectionResult{
Account: stickyAccount,
WaitPlan: &AccountWaitPlan{
AccountID: stickyAccountID,
MaxConcurrency: stickyAccount.Concurrency,
Timeout: cfg.StickySessionWaitTimeout,
MaxWaiting: cfg.StickySessionMaxWaiting,
},
}, nil
stickyCacheMissReason = "wait_queue_full"
}
}
// 粘性账号槽位满且等待队列已满,继续使用负载感知选择
} else if !gatePass {
stickyCacheMissReason = "gate_check"
} else {
stickyCacheMissReason = "rpm_red"
}
// 记录粘性缓存未命中的结构化日志
if stickyCacheMissReason != "" {
baseRPM := stickyAccount.GetBaseRPM()
var currentRPM int
if count, ok := rpmFromPrefetchContext(ctx, stickyAccount.ID); ok {
currentRPM = count
}
logger.LegacyPrintf("service.gateway", "[StickyCacheMiss] reason=%s account_id=%d session=%s current_rpm=%d base_rpm=%d",
stickyCacheMissReason, stickyAccountID, shortSessionHash(sessionHash), currentRPM, baseRPM)
}
} else {
_ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash)
logger.LegacyPrintf("service.gateway", "[StickyCacheMiss] reason=account_cleared account_id=%d session=%s current_rpm=0 base_rpm=0",
stickyAccountID, shortSessionHash(sessionHash))
}
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment