Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
陈曦
sub2api
Commits
dbd7969a
"backend/vscode:/vscode.git/clone" did not exist on "f5764d8dc6fbc0f98e0bef292b16bd570b0897eb"
Commit
dbd7969a
authored
Feb 12, 2026
by
yangjianbo
Browse files
Merge branch 'test' into release
parents
10c1590b
af306907
Changes
23
Hide whitespace changes
Inline
Side-by-side
backend/internal/config/config.go
View file @
dbd7969a
...
...
@@ -280,6 +280,9 @@ type GatewayConfig struct {
// ForceCodexCLI: 强制将 OpenAI `/v1/responses` 请求按 Codex CLI 处理。
// 用于网关未透传/改写 User-Agent 时的兼容兜底(默认关闭,避免影响其他客户端)。
ForceCodexCLI
bool
`mapstructure:"force_codex_cli"`
// OpenAIPassthroughAllowTimeoutHeaders: OpenAI 透传模式是否放行客户端超时头
// 关闭(默认)可避免 x-stainless-timeout 等头导致上游提前断流。
OpenAIPassthroughAllowTimeoutHeaders
bool
`mapstructure:"openai_passthrough_allow_timeout_headers"`
// HTTP 上游连接池配置(性能优化:支持高并发场景调优)
// MaxIdleConns: 所有主机的最大空闲连接总数
...
...
@@ -995,6 +998,7 @@ func setDefaults() {
viper
.
SetDefault
(
"gateway.max_account_switches"
,
10
)
viper
.
SetDefault
(
"gateway.max_account_switches_gemini"
,
3
)
viper
.
SetDefault
(
"gateway.force_codex_cli"
,
false
)
viper
.
SetDefault
(
"gateway.openai_passthrough_allow_timeout_headers"
,
false
)
viper
.
SetDefault
(
"gateway.antigravity_fallback_cooldown_minutes"
,
1
)
viper
.
SetDefault
(
"gateway.max_body_size"
,
int64
(
100
*
1024
*
1024
))
viper
.
SetDefault
(
"gateway.sora_max_body_size"
,
int64
(
256
*
1024
*
1024
))
...
...
backend/internal/handler/admin/admin_helpers_test.go
View file @
dbd7969a
...
...
@@ -58,6 +58,96 @@ func TestParseOpsDuration(t *testing.T) {
require
.
False
(
t
,
ok
)
}
// TestParseOpsOpenAITokenStatsDuration verifies the table of accepted
// time_range tokens and that any other token (e.g. "7d") is rejected.
func TestParseOpsOpenAITokenStatsDuration(t *testing.T) {
	tests := []struct {
		input string
		want  time.Duration
		ok    bool
	}{
		{input: "30m", want: 30 * time.Minute, ok: true},
		{input: "1h", want: time.Hour, ok: true},
		{input: "1d", want: 24 * time.Hour, ok: true},
		{input: "15d", want: 15 * 24 * time.Hour, ok: true},
		{input: "30d", want: 30 * 24 * time.Hour, ok: true},
		// "7d" is deliberately not a supported range.
		{input: "7d", want: 0, ok: false},
	}
	for _, tt := range tests {
		got, ok := parseOpsOpenAITokenStatsDuration(tt.input)
		require.Equal(t, tt.ok, ok, "input=%s", tt.input)
		require.Equal(t, tt.want, got, "input=%s", tt.input)
	}
}
// TestParseOpsOpenAITokenStatsFilter_Defaults checks the filter built from a
// request with no query parameters: 30d window ending "now" (UTC), page 1,
// page size 20, no top-N, no group, empty platform.
func TestParseOpsOpenAITokenStatsFilter_Defaults(t *testing.T) {
	gin.SetMode(gin.TestMode)
	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Request = httptest.NewRequest(http.MethodGet, "/", nil)
	// Bracket the call so the time-window assertions below tolerate clock skew.
	before := time.Now().UTC()
	filter, err := parseOpsOpenAITokenStatsFilter(c)
	after := time.Now().UTC()
	require.NoError(t, err)
	require.NotNil(t, filter)
	require.Equal(t, "30d", filter.TimeRange)
	require.Equal(t, 1, filter.Page)
	require.Equal(t, 20, filter.PageSize)
	require.Equal(t, 0, filter.TopN)
	require.Nil(t, filter.GroupID)
	require.Equal(t, "", filter.Platform)
	require.True(t, filter.StartTime.Before(filter.EndTime))
	require.WithinDuration(t, before.Add(-30*24*time.Hour), filter.StartTime, 2*time.Second)
	require.WithinDuration(t, after, filter.EndTime, 2*time.Second)
}
// TestParseOpsOpenAITokenStatsFilter_WithTopN checks that top_n mode is
// selected when top_n is present: paging fields stay zero and the other
// query parameters (time_range, platform, group_id) are parsed through.
func TestParseOpsOpenAITokenStatsFilter_WithTopN(t *testing.T) {
	gin.SetMode(gin.TestMode)
	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Request = httptest.NewRequest(
		http.MethodGet,
		"/?time_range=1h&platform=openai&group_id=12&top_n=50",
		nil,
	)
	filter, err := parseOpsOpenAITokenStatsFilter(c)
	require.NoError(t, err)
	require.Equal(t, "1h", filter.TimeRange)
	require.Equal(t, "openai", filter.Platform)
	require.NotNil(t, filter.GroupID)
	require.Equal(t, int64(12), *filter.GroupID)
	require.Equal(t, 50, filter.TopN)
	// In top-N mode pagination must not be populated.
	require.Equal(t, 0, filter.Page)
	require.Equal(t, 0, filter.PageSize)
}
// TestParseOpsOpenAITokenStatsFilter_InvalidParams enumerates query strings
// that must be rejected: unsupported time_range, non-positive/non-numeric
// group_id, out-of-range top_n/page/page_size, and mixing top_n with paging.
func TestParseOpsOpenAITokenStatsFilter_InvalidParams(t *testing.T) {
	tests := []string{
		"/?time_range=7d",
		"/?group_id=0",
		"/?group_id=abc",
		"/?top_n=0",
		"/?top_n=101",
		"/?top_n=10&page=1",
		"/?top_n=10&page_size=20",
		"/?page=0",
		"/?page_size=0",
		"/?page_size=101",
	}
	gin.SetMode(gin.TestMode)
	for _, rawURL := range tests {
		w := httptest.NewRecorder()
		c, _ := gin.CreateTestContext(w)
		c.Request = httptest.NewRequest(http.MethodGet, rawURL, nil)
		_, err := parseOpsOpenAITokenStatsFilter(c)
		require.Error(t, err, "url=%s", rawURL)
	}
}
func
TestParseOpsTimeRange
(
t
*
testing
.
T
)
{
gin
.
SetMode
(
gin
.
TestMode
)
w
:=
httptest
.
NewRecorder
()
...
...
backend/internal/handler/admin/ops_dashboard_handler.go
View file @
dbd7969a
package
admin
import
(
"fmt"
"net/http"
"strconv"
"strings"
...
...
@@ -218,6 +219,115 @@ func (h *OpsHandler) GetDashboardErrorDistribution(c *gin.Context) {
response
.
Success
(
c
,
data
)
}
// GetDashboardOpenAITokenStats returns OpenAI token efficiency stats grouped by model.
// GET /api/v1/admin/ops/dashboard/openai-token-stats
//
// Pipeline: service availability check → monitoring gate → query-param
// validation (parseOpsOpenAITokenStatsFilter) → service query → JSON response.
func (h *OpsHandler) GetDashboardOpenAITokenStats(c *gin.Context) {
	// 503 when the ops service was not wired in (monitoring disabled builds).
	if h.opsService == nil {
		response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
		return
	}
	if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
		response.ErrorFrom(c, err)
		return
	}
	// Query-string problems map to 400 with the validation message.
	filter, err := parseOpsOpenAITokenStatsFilter(c)
	if err != nil {
		response.BadRequest(c, err.Error())
		return
	}
	data, err := h.opsService.GetOpenAITokenStats(c.Request.Context(), filter)
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, data)
}
func
parseOpsOpenAITokenStatsFilter
(
c
*
gin
.
Context
)
(
*
service
.
OpsOpenAITokenStatsFilter
,
error
)
{
if
c
==
nil
{
return
nil
,
fmt
.
Errorf
(
"invalid request"
)
}
timeRange
:=
strings
.
TrimSpace
(
c
.
Query
(
"time_range"
))
if
timeRange
==
""
{
timeRange
=
"30d"
}
dur
,
ok
:=
parseOpsOpenAITokenStatsDuration
(
timeRange
)
if
!
ok
{
return
nil
,
fmt
.
Errorf
(
"invalid time_range"
)
}
end
:=
time
.
Now
()
.
UTC
()
start
:=
end
.
Add
(
-
dur
)
filter
:=
&
service
.
OpsOpenAITokenStatsFilter
{
TimeRange
:
timeRange
,
StartTime
:
start
,
EndTime
:
end
,
Platform
:
strings
.
TrimSpace
(
c
.
Query
(
"platform"
)),
}
if
v
:=
strings
.
TrimSpace
(
c
.
Query
(
"group_id"
));
v
!=
""
{
id
,
err
:=
strconv
.
ParseInt
(
v
,
10
,
64
)
if
err
!=
nil
||
id
<=
0
{
return
nil
,
fmt
.
Errorf
(
"invalid group_id"
)
}
filter
.
GroupID
=
&
id
}
topNRaw
:=
strings
.
TrimSpace
(
c
.
Query
(
"top_n"
))
pageRaw
:=
strings
.
TrimSpace
(
c
.
Query
(
"page"
))
pageSizeRaw
:=
strings
.
TrimSpace
(
c
.
Query
(
"page_size"
))
if
topNRaw
!=
""
&&
(
pageRaw
!=
""
||
pageSizeRaw
!=
""
)
{
return
nil
,
fmt
.
Errorf
(
"invalid query: top_n cannot be used with page/page_size"
)
}
if
topNRaw
!=
""
{
topN
,
err
:=
strconv
.
Atoi
(
topNRaw
)
if
err
!=
nil
||
topN
<
1
||
topN
>
100
{
return
nil
,
fmt
.
Errorf
(
"invalid top_n"
)
}
filter
.
TopN
=
topN
return
filter
,
nil
}
filter
.
Page
=
1
filter
.
PageSize
=
20
if
pageRaw
!=
""
{
page
,
err
:=
strconv
.
Atoi
(
pageRaw
)
if
err
!=
nil
||
page
<
1
{
return
nil
,
fmt
.
Errorf
(
"invalid page"
)
}
filter
.
Page
=
page
}
if
pageSizeRaw
!=
""
{
pageSize
,
err
:=
strconv
.
Atoi
(
pageSizeRaw
)
if
err
!=
nil
||
pageSize
<
1
||
pageSize
>
100
{
return
nil
,
fmt
.
Errorf
(
"invalid page_size"
)
}
filter
.
PageSize
=
pageSize
}
return
filter
,
nil
}
func
parseOpsOpenAITokenStatsDuration
(
v
string
)
(
time
.
Duration
,
bool
)
{
switch
strings
.
TrimSpace
(
v
)
{
case
"30m"
:
return
30
*
time
.
Minute
,
true
case
"1h"
:
return
time
.
Hour
,
true
case
"1d"
:
return
24
*
time
.
Hour
,
true
case
"15d"
:
return
15
*
24
*
time
.
Hour
,
true
case
"30d"
:
return
30
*
24
*
time
.
Hour
,
true
default
:
return
0
,
false
}
}
func
pickThroughputBucketSeconds
(
window
time
.
Duration
)
int
{
// Keep buckets predictable and avoid huge responses.
switch
{
...
...
backend/internal/handler/openai_gateway_handler.go
View file @
dbd7969a
...
...
@@ -28,7 +28,6 @@ type OpenAIGatewayHandler struct {
errorPassthroughService
*
service
.
ErrorPassthroughService
concurrencyHelper
*
ConcurrencyHelper
maxAccountSwitches
int
cfg
*
config
.
Config
}
// NewOpenAIGatewayHandler creates a new OpenAIGatewayHandler
...
...
@@ -55,7 +54,6 @@ func NewOpenAIGatewayHandler(
errorPassthroughService
:
errorPassthroughService
,
concurrencyHelper
:
NewConcurrencyHelper
(
concurrencyService
,
SSEPingFormatComment
,
pingInterval
),
maxAccountSwitches
:
maxAccountSwitches
,
cfg
:
cfg
,
}
}
...
...
backend/internal/repository/concurrency_cache.go
View file @
dbd7969a
...
...
@@ -147,108 +147,6 @@ var (
return 1
`
)
// WARNING: Redis Cluster 不兼容 — 脚本内部拼接 key,Cluster 模式下可能路由到错误节点。
// 调用时传递空 KEYS 数组,所有 key 在 Lua 内通过 ARGV 动态拼接,
// 无法被 Redis Cluster 正确路由到对应 slot,仅适用于单节点或 Sentinel 模式。
//
// getAccountsLoadBatchScript - batch load query with expired slot cleanup
// ARGV[1] = slot TTL (seconds)
// ARGV[2..n] = accountID1, maxConcurrency1, accountID2, maxConcurrency2, ...
getAccountsLoadBatchScript
=
redis
.
NewScript
(
`
local result = {}
local slotTTL = tonumber(ARGV[1])
-- Get current server time
local timeResult = redis.call('TIME')
local nowSeconds = tonumber(timeResult[1])
local cutoffTime = nowSeconds - slotTTL
local i = 2
while i <= #ARGV do
local accountID = ARGV[i]
local maxConcurrency = tonumber(ARGV[i + 1])
local slotKey = 'concurrency:account:' .. accountID
-- Clean up expired slots before counting
redis.call('ZREMRANGEBYSCORE', slotKey, '-inf', cutoffTime)
local currentConcurrency = redis.call('ZCARD', slotKey)
local waitKey = 'wait:account:' .. accountID
local waitingCount = redis.call('GET', waitKey)
if waitingCount == false then
waitingCount = 0
else
waitingCount = tonumber(waitingCount)
end
local loadRate = 0
if maxConcurrency > 0 then
loadRate = math.floor((currentConcurrency + waitingCount) * 100 / maxConcurrency)
end
table.insert(result, accountID)
table.insert(result, currentConcurrency)
table.insert(result, waitingCount)
table.insert(result, loadRate)
i = i + 2
end
return result
`
)
// WARNING: Redis Cluster 不兼容 — 脚本内部拼接 key,Cluster 模式下可能路由到错误节点。
// 调用时传递空 KEYS 数组,所有 key 在 Lua 内通过 ARGV 动态拼接,
// 无法被 Redis Cluster 正确路由到对应 slot,仅适用于单节点或 Sentinel 模式。
//
// getUsersLoadBatchScript - batch load query for users with expired slot cleanup
// ARGV[1] = slot TTL (seconds)
// ARGV[2..n] = userID1, maxConcurrency1, userID2, maxConcurrency2, ...
getUsersLoadBatchScript
=
redis
.
NewScript
(
`
local result = {}
local slotTTL = tonumber(ARGV[1])
-- Get current server time
local timeResult = redis.call('TIME')
local nowSeconds = tonumber(timeResult[1])
local cutoffTime = nowSeconds - slotTTL
local i = 2
while i <= #ARGV do
local userID = ARGV[i]
local maxConcurrency = tonumber(ARGV[i + 1])
local slotKey = 'concurrency:user:' .. userID
-- Clean up expired slots before counting
redis.call('ZREMRANGEBYSCORE', slotKey, '-inf', cutoffTime)
local currentConcurrency = redis.call('ZCARD', slotKey)
local waitKey = 'concurrency:wait:' .. userID
local waitingCount = redis.call('GET', waitKey)
if waitingCount == false then
waitingCount = 0
else
waitingCount = tonumber(waitingCount)
end
local loadRate = 0
if maxConcurrency > 0 then
loadRate = math.floor((currentConcurrency + waitingCount) * 100 / maxConcurrency)
end
table.insert(result, userID)
table.insert(result, currentConcurrency)
table.insert(result, waitingCount)
table.insert(result, loadRate)
i = i + 2
end
return result
`
)
// cleanupExpiredSlotsScript - remove expired slots
// KEYS[1] = concurrency:account:{accountID}
// ARGV[1] = TTL (seconds)
...
...
backend/internal/repository/ops_repo_openai_token_stats.go
0 → 100644
View file @
dbd7969a
package
repository
import
(
"context"
"database/sql"
"fmt"
"strings"
"github.com/Wei-Shaw/sub2api/internal/service"
)
// GetOpenAITokenStats aggregates OpenAI token-efficiency metrics from
// usage_logs, grouped by model, over the window described by filter.
// Two result modes are supported: top-N (filter.IsTopNMode()) or
// page/page_size pagination. Only models matching LIKE 'gpt%' are included.
// NOTE(review): SQL uses PostgreSQL casts (::bigint, ::numeric, ::float8)
// and $n placeholders — assumes a PostgreSQL backend.
func (r *opsRepository) GetOpenAITokenStats(ctx context.Context, filter *service.OpsOpenAITokenStatsFilter) (*service.OpsOpenAITokenStatsResponse, error) {
	// Guard against a nil receiver or missing DB handle.
	if r == nil || r.db == nil {
		return nil, fmt.Errorf("nil ops repository")
	}
	if filter == nil {
		return nil, fmt.Errorf("nil filter")
	}
	if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
		return nil, fmt.Errorf("start_time/end_time required")
	}
	// start_time == end_time is allowed (yields an empty result), matching
	// the validation rules at the service layer.
	if filter.StartTime.After(filter.EndTime) {
		return nil, fmt.Errorf("start_time must be <= end_time")
	}
	// Normalize into the shared dashboard filter so buildUsageWhere can
	// produce the JOIN/WHERE clauses and positional args.
	dashboardFilter := &service.OpsDashboardFilter{
		StartTime: filter.StartTime.UTC(),
		EndTime:   filter.EndTime.UTC(),
		Platform:  strings.TrimSpace(strings.ToLower(filter.Platform)),
		GroupID:   filter.GroupID,
	}
	// next is the index of the first unused $n placeholder after baseArgs.
	join, where, baseArgs, next := buildUsageWhere(dashboardFilter, dashboardFilter.StartTime, dashboardFilter.EndTime, 1)
	where += " AND ul.model LIKE 'gpt%'"
	// Shared CTE: per-model aggregates used by both the count and page query.
	baseCTE := `
WITH stats AS (
	SELECT
		ul.model AS model,
		COUNT(*)::bigint AS request_count,
		ROUND(
			AVG(
				CASE
					WHEN ul.duration_ms > 0 AND ul.output_tokens > 0
					THEN ul.output_tokens * 1000.0 / ul.duration_ms
				END
			)::numeric,
			2
		)::float8 AS avg_tokens_per_sec,
		ROUND(AVG(ul.first_token_ms)::numeric, 2)::float8 AS avg_first_token_ms,
		COALESCE(SUM(ul.output_tokens), 0)::bigint AS total_output_tokens,
		COALESCE(ROUND(AVG(ul.duration_ms)::numeric, 0), 0)::bigint AS avg_duration_ms,
		COUNT(CASE WHEN ul.first_token_ms IS NOT NULL THEN 1 END)::bigint AS requests_with_first_token
	FROM usage_logs ul
` + join + `
` + where + `
	GROUP BY ul.model
)
`
	// Total distinct models in the window (for pagination metadata).
	countSQL := baseCTE + `SELECT COUNT(*) FROM stats`
	var total int64
	if err := r.db.QueryRowContext(ctx, countSQL, baseArgs...).Scan(&total); err != nil {
		return nil, err
	}
	querySQL := baseCTE + `
SELECT
	model,
	request_count,
	avg_tokens_per_sec,
	avg_first_token_ms,
	total_output_tokens,
	avg_duration_ms,
	requests_with_first_token
FROM stats
ORDER BY request_count DESC, model ASC`
	args := make([]any, 0, len(baseArgs)+2)
	args = append(args, baseArgs...)
	if filter.IsTopNMode() {
		// Top-N mode: a single LIMIT placeholder.
		querySQL += fmt.Sprintf("\nLIMIT $%d", next)
		args = append(args, filter.TopN)
	} else {
		// Pagination mode: LIMIT/OFFSET placeholders.
		offset := (filter.Page - 1) * filter.PageSize
		querySQL += fmt.Sprintf("\nLIMIT $%d OFFSET $%d", next, next+1)
		args = append(args, filter.PageSize, offset)
	}
	rows, err := r.db.QueryContext(ctx, querySQL, args...)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()
	items := make([]*service.OpsOpenAITokenStatsItem, 0, 32)
	for rows.Next() {
		item := &service.OpsOpenAITokenStatsItem{}
		// AVG over an empty/NULL set yields NULL, so scan through NullFloat64.
		var avgTPS sql.NullFloat64
		var avgFirstToken sql.NullFloat64
		if err := rows.Scan(
			&item.Model,
			&item.RequestCount,
			&avgTPS,
			&avgFirstToken,
			&item.TotalOutputTokens,
			&item.AvgDurationMs,
			&item.RequestsWithFirstToken,
		); err != nil {
			return nil, err
		}
		if avgTPS.Valid {
			v := avgTPS.Float64
			item.AvgTokensPerSec = &v
		}
		if avgFirstToken.Valid {
			v := avgFirstToken.Float64
			item.AvgFirstTokenMs = &v
		}
		items = append(items, item)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	resp := &service.OpsOpenAITokenStatsResponse{
		TimeRange: strings.TrimSpace(filter.TimeRange),
		StartTime: dashboardFilter.StartTime,
		EndTime:   dashboardFilter.EndTime,
		Platform:  dashboardFilter.Platform,
		GroupID:   dashboardFilter.GroupID,
		Items:     items,
		Total:     total,
	}
	// Echo back whichever result mode was used.
	if filter.IsTopNMode() {
		topN := filter.TopN
		resp.TopN = &topN
	} else {
		resp.Page = filter.Page
		resp.PageSize = filter.PageSize
	}
	return resp, nil
}
backend/internal/repository/ops_repo_openai_token_stats_test.go
0 → 100644
View file @
dbd7969a
package
repository
import
(
"context"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/stretchr/testify/require"
)
// TestOpsRepositoryGetOpenAITokenStats_PaginationMode drives the repository
// through sqlmock in page/page_size mode and checks arg binding (page 2,
// size 10 → LIMIT $5 OFFSET $6 with offset 10), platform normalization
// (" OpenAI " → "openai"), and row-to-item mapping.
func TestOpsRepositoryGetOpenAITokenStats_PaginationMode(t *testing.T) {
	db, mock := newSQLMock(t)
	repo := &opsRepository{db: db}
	start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	end := start.Add(24 * time.Hour)
	groupID := int64(9)
	filter := &service.OpsOpenAITokenStatsFilter{
		TimeRange: "1d",
		StartTime: start,
		EndTime:   end,
		Platform:  " OpenAI ",
		GroupID:   &groupID,
		Page:      2,
		PageSize:  10,
	}
	mock.ExpectQuery(`SELECT COUNT\(\*\) FROM stats`).
		WithArgs(start, end, groupID, "openai").
		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(int64(3)))
	rows := sqlmock.NewRows([]string{
		"model",
		"request_count",
		"avg_tokens_per_sec",
		"avg_first_token_ms",
		"total_output_tokens",
		"avg_duration_ms",
		"requests_with_first_token",
	}).
		AddRow("gpt-4o-mini", int64(20), 21.56, 120.34, int64(3000), int64(850), int64(18)).
		AddRow("gpt-4.1", int64(20), 10.2, 240.0, int64(2500), int64(900), int64(20))
	mock.ExpectQuery(`ORDER BY request_count DESC, model ASC\s+LIMIT \$5 OFFSET \$6`).
		WithArgs(start, end, groupID, "openai", 10, 10).
		WillReturnRows(rows)
	resp, err := repo.GetOpenAITokenStats(context.Background(), filter)
	require.NoError(t, err)
	require.NotNil(t, resp)
	require.Equal(t, int64(3), resp.Total)
	require.Equal(t, 2, resp.Page)
	require.Equal(t, 10, resp.PageSize)
	require.Nil(t, resp.TopN)
	require.Equal(t, "openai", resp.Platform)
	require.NotNil(t, resp.GroupID)
	require.Equal(t, groupID, *resp.GroupID)
	require.Len(t, resp.Items, 2)
	require.Equal(t, "gpt-4o-mini", resp.Items[0].Model)
	require.NotNil(t, resp.Items[0].AvgTokensPerSec)
	require.InDelta(t, 21.56, *resp.Items[0].AvgTokensPerSec, 0.0001)
	require.NotNil(t, resp.Items[0].AvgFirstTokenMs)
	require.InDelta(t, 120.34, *resp.Items[0].AvgFirstTokenMs, 0.0001)
	require.NoError(t, mock.ExpectationsWereMet())
}
// TestOpsRepositoryGetOpenAITokenStats_TopNMode covers top-N mode: a single
// LIMIT $3 placeholder, zeroed paging fields in the response, and NULL
// averages surfacing as nil pointers on the item.
func TestOpsRepositoryGetOpenAITokenStats_TopNMode(t *testing.T) {
	db, mock := newSQLMock(t)
	repo := &opsRepository{db: db}
	start := time.Date(2026, 1, 1, 10, 0, 0, 0, time.UTC)
	end := start.Add(time.Hour)
	filter := &service.OpsOpenAITokenStatsFilter{
		TimeRange: "1h",
		StartTime: start,
		EndTime:   end,
		TopN:      5,
	}
	mock.ExpectQuery(`SELECT COUNT\(\*\) FROM stats`).
		WithArgs(start, end).
		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(int64(1)))
	rows := sqlmock.NewRows([]string{
		"model",
		"request_count",
		"avg_tokens_per_sec",
		"avg_first_token_ms",
		"total_output_tokens",
		"avg_duration_ms",
		"requests_with_first_token",
	}).
		// nil averages simulate SQL NULL (no qualifying rows for AVG).
		AddRow("gpt-4o", int64(5), nil, nil, int64(0), int64(0), int64(0))
	mock.ExpectQuery(`ORDER BY request_count DESC, model ASC\s+LIMIT \$3`).
		WithArgs(start, end, 5).
		WillReturnRows(rows)
	resp, err := repo.GetOpenAITokenStats(context.Background(), filter)
	require.NoError(t, err)
	require.NotNil(t, resp)
	require.NotNil(t, resp.TopN)
	require.Equal(t, 5, *resp.TopN)
	require.Equal(t, 0, resp.Page)
	require.Equal(t, 0, resp.PageSize)
	require.Len(t, resp.Items, 1)
	require.Nil(t, resp.Items[0].AvgTokensPerSec)
	require.Nil(t, resp.Items[0].AvgFirstTokenMs)
	require.NoError(t, mock.ExpectationsWereMet())
}
// TestOpsRepositoryGetOpenAITokenStats_EmptyResult verifies that a window
// with no matching rows yields Total 0 and an empty (non-nil) item list while
// pagination metadata is still echoed back.
func TestOpsRepositoryGetOpenAITokenStats_EmptyResult(t *testing.T) {
	db, mock := newSQLMock(t)
	repo := &opsRepository{db: db}
	start := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC)
	end := start.Add(30 * time.Minute)
	filter := &service.OpsOpenAITokenStatsFilter{
		TimeRange: "30m",
		StartTime: start,
		EndTime:   end,
		Page:      1,
		PageSize:  20,
	}
	mock.ExpectQuery(`SELECT COUNT\(\*\) FROM stats`).
		WithArgs(start, end).
		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(int64(0)))
	mock.ExpectQuery(`ORDER BY request_count DESC, model ASC\s+LIMIT \$3 OFFSET \$4`).
		WithArgs(start, end, 20, 0).
		WillReturnRows(sqlmock.NewRows([]string{
			"model",
			"request_count",
			"avg_tokens_per_sec",
			"avg_first_token_ms",
			"total_output_tokens",
			"avg_duration_ms",
			"requests_with_first_token",
		}))
	resp, err := repo.GetOpenAITokenStats(context.Background(), filter)
	require.NoError(t, err)
	require.NotNil(t, resp)
	require.Equal(t, int64(0), resp.Total)
	require.Len(t, resp.Items, 0)
	require.Equal(t, 1, resp.Page)
	require.Equal(t, 20, resp.PageSize)
	require.NoError(t, mock.ExpectationsWereMet())
}
backend/internal/server/routes/admin.go
View file @
dbd7969a
...
...
@@ -150,6 +150,7 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
ops
.
GET
(
"/dashboard/latency-histogram"
,
h
.
Admin
.
Ops
.
GetDashboardLatencyHistogram
)
ops
.
GET
(
"/dashboard/error-trend"
,
h
.
Admin
.
Ops
.
GetDashboardErrorTrend
)
ops
.
GET
(
"/dashboard/error-distribution"
,
h
.
Admin
.
Ops
.
GetDashboardErrorDistribution
)
ops
.
GET
(
"/dashboard/openai-token-stats"
,
h
.
Admin
.
Ops
.
GetDashboardOpenAITokenStats
)
}
}
...
...
backend/internal/service/gateway_request.go
View file @
dbd7969a
...
...
@@ -189,45 +189,6 @@ func sliceRawFromBody(body []byte, r gjson.Result) []byte {
return
[]
byte
(
r
.
Raw
)
}
// parseIntegralNumber 将 JSON 解码后的数字安全转换为 int。
// 仅接受“整数值”的输入,小数/NaN/Inf/越界值都会返回 false。
func
parseIntegralNumber
(
raw
any
)
(
int
,
bool
)
{
switch
v
:=
raw
.
(
type
)
{
case
float64
:
if
math
.
IsNaN
(
v
)
||
math
.
IsInf
(
v
,
0
)
||
v
!=
math
.
Trunc
(
v
)
{
return
0
,
false
}
if
v
>
float64
(
math
.
MaxInt
)
||
v
<
float64
(
math
.
MinInt
)
{
return
0
,
false
}
return
int
(
v
),
true
case
int
:
return
v
,
true
case
int8
:
return
int
(
v
),
true
case
int16
:
return
int
(
v
),
true
case
int32
:
return
int
(
v
),
true
case
int64
:
if
v
>
int64
(
math
.
MaxInt
)
||
v
<
int64
(
math
.
MinInt
)
{
return
0
,
false
}
return
int
(
v
),
true
case
json
.
Number
:
i64
,
err
:=
v
.
Int64
()
if
err
!=
nil
{
return
0
,
false
}
if
i64
>
int64
(
math
.
MaxInt
)
||
i64
<
int64
(
math
.
MinInt
)
{
return
0
,
false
}
return
int
(
i64
),
true
default
:
return
0
,
false
}
}
// FilterThinkingBlocks removes thinking blocks from request body
// Returns filtered body or original body if filtering fails (fail-safe)
// This prevents 400 errors from invalid thinking block signatures
...
...
backend/internal/service/openai_gateway_service.go
View file @
dbd7969a
...
...
@@ -1020,6 +1020,23 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough(
reqModel
,
reqStream
,
)
if
reqStream
&&
c
!=
nil
&&
c
.
Request
!=
nil
{
if
timeoutHeaders
:=
collectOpenAIPassthroughTimeoutHeaders
(
c
.
Request
.
Header
);
len
(
timeoutHeaders
)
>
0
{
if
s
.
isOpenAIPassthroughTimeoutHeadersAllowed
()
{
log
.
Printf
(
"[WARN] [OpenAI passthrough] 透传请求包含超时相关请求头,且当前配置为放行,可能导致上游提前断流: account=%d headers=%s"
,
account
.
ID
,
strings
.
Join
(
timeoutHeaders
,
", "
),
)
}
else
{
log
.
Printf
(
"[WARN] [OpenAI passthrough] 检测到超时相关请求头,将按配置过滤以降低断流风险: account=%d headers=%s"
,
account
.
ID
,
strings
.
Join
(
timeoutHeaders
,
", "
),
)
}
}
}
// Get access token
token
,
_
,
err
:=
s
.
GetAccessToken
(
ctx
,
account
)
...
...
@@ -1135,12 +1152,16 @@ func (s *OpenAIGatewayService) buildUpstreamRequestOpenAIPassthrough(
}
// 透传客户端请求头(尽可能原样),并做安全剔除。
allowTimeoutHeaders
:=
s
.
isOpenAIPassthroughTimeoutHeadersAllowed
()
if
c
!=
nil
&&
c
.
Request
!=
nil
{
for
key
,
values
:=
range
c
.
Request
.
Header
{
lower
:=
strings
.
ToLower
(
key
)
if
isOpenAIPassthroughBlockedRequestHeader
(
lower
)
{
continue
}
if
!
allowTimeoutHeaders
&&
isOpenAIPassthroughTimeoutHeader
(
lower
)
{
continue
}
for
_
,
v
:=
range
values
{
req
.
Header
.
Add
(
key
,
v
)
}
...
...
@@ -1233,6 +1254,38 @@ func isOpenAIPassthroughBlockedRequestHeader(lowerKey string) bool {
}
}
// isOpenAIPassthroughTimeoutHeader reports whether lowerKey (already
// lowercased by the caller) is a client timeout header that may cause the
// upstream to cut a stream short in passthrough mode.
func isOpenAIPassthroughTimeoutHeader(lowerKey string) bool {
	timeoutHeaders := [...]string{
		"x-stainless-timeout",
		"x-stainless-read-timeout",
		"x-stainless-connect-timeout",
		"x-request-timeout",
		"request-timeout",
		"grpc-timeout",
	}
	for _, h := range timeoutHeaders {
		if lowerKey == h {
			return true
		}
	}
	return false
}
func
(
s
*
OpenAIGatewayService
)
isOpenAIPassthroughTimeoutHeadersAllowed
()
bool
{
return
s
!=
nil
&&
s
.
cfg
!=
nil
&&
s
.
cfg
.
Gateway
.
OpenAIPassthroughAllowTimeoutHeaders
}
func
collectOpenAIPassthroughTimeoutHeaders
(
h
http
.
Header
)
[]
string
{
if
h
==
nil
{
return
nil
}
var
matched
[]
string
for
key
,
values
:=
range
h
{
lowerKey
:=
strings
.
ToLower
(
strings
.
TrimSpace
(
key
))
if
isOpenAIPassthroughTimeoutHeader
(
lowerKey
)
{
entry
:=
lowerKey
if
len
(
values
)
>
0
{
entry
=
fmt
.
Sprintf
(
"%s=%s"
,
lowerKey
,
strings
.
Join
(
values
,
"|"
))
}
matched
=
append
(
matched
,
entry
)
}
}
sort
.
Strings
(
matched
)
return
matched
}
type
openaiStreamingResultPassthrough
struct
{
usage
*
OpenAIUsage
firstTokenMs
*
int
...
...
@@ -1265,6 +1318,8 @@ func (s *OpenAIGatewayService) handleStreamingResponsePassthrough(
usage
:=
&
OpenAIUsage
{}
var
firstTokenMs
*
int
clientDisconnected
:=
false
sawDone
:=
false
upstreamRequestID
:=
strings
.
TrimSpace
(
resp
.
Header
.
Get
(
"x-request-id"
))
scanner
:=
bufio
.
NewScanner
(
resp
.
Body
)
maxLineSize
:=
defaultMaxLineSize
...
...
@@ -1278,7 +1333,11 @@ func (s *OpenAIGatewayService) handleStreamingResponsePassthrough(
for
scanner
.
Scan
()
{
line
:=
scanner
.
Text
()
if
data
,
ok
:=
extractOpenAISSEDataLine
(
line
);
ok
{
if
firstTokenMs
==
nil
&&
strings
.
TrimSpace
(
data
)
!=
""
{
trimmedData
:=
strings
.
TrimSpace
(
data
)
if
trimmedData
==
"[DONE]"
{
sawDone
=
true
}
if
firstTokenMs
==
nil
&&
trimmedData
!=
""
&&
trimmedData
!=
"[DONE]"
{
ms
:=
int
(
time
.
Since
(
startTime
)
.
Milliseconds
())
firstTokenMs
=
&
ms
}
...
...
@@ -1300,14 +1359,34 @@ func (s *OpenAIGatewayService) handleStreamingResponsePassthrough(
return
&
openaiStreamingResultPassthrough
{
usage
:
usage
,
firstTokenMs
:
firstTokenMs
},
nil
}
if
errors
.
Is
(
err
,
context
.
Canceled
)
||
errors
.
Is
(
err
,
context
.
DeadlineExceeded
)
{
log
.
Printf
(
"[WARN] [OpenAI passthrough] 流读取被取消,可能发生断流: account=%d request_id=%s err=%v ctx_err=%v"
,
account
.
ID
,
upstreamRequestID
,
err
,
ctx
.
Err
(),
)
return
&
openaiStreamingResultPassthrough
{
usage
:
usage
,
firstTokenMs
:
firstTokenMs
},
nil
}
if
errors
.
Is
(
err
,
bufio
.
ErrTooLong
)
{
log
.
Printf
(
"[OpenAI passthrough] SSE line too long: account=%d max_size=%d error=%v"
,
account
.
ID
,
maxLineSize
,
err
)
return
&
openaiStreamingResultPassthrough
{
usage
:
usage
,
firstTokenMs
:
firstTokenMs
},
err
}
log
.
Printf
(
"[WARN] [OpenAI passthrough] 流读取异常中断: account=%d request_id=%s err=%v"
,
account
.
ID
,
upstreamRequestID
,
err
,
)
return
&
openaiStreamingResultPassthrough
{
usage
:
usage
,
firstTokenMs
:
firstTokenMs
},
fmt
.
Errorf
(
"stream read error: %w"
,
err
)
}
if
!
clientDisconnected
&&
!
sawDone
&&
ctx
.
Err
()
==
nil
{
log
.
Printf
(
"[WARN] [OpenAI passthrough] 上游流在未收到 [DONE] 时结束,疑似断流: account=%d request_id=%s"
,
account
.
ID
,
upstreamRequestID
,
)
}
return
&
openaiStreamingResultPassthrough
{
usage
:
usage
,
firstTokenMs
:
firstTokenMs
},
nil
}
...
...
backend/internal/service/openai_oauth_passthrough_test.go
View file @
dbd7969a
...
...
@@ -4,9 +4,12 @@ import (
"bytes"
"context"
"io"
"log"
"net/http"
"net/http/httptest"
"os"
"strings"
"sync"
"testing"
"time"
...
...
@@ -43,6 +46,27 @@ func (u *httpUpstreamRecorder) DoWithTLS(req *http.Request, proxyURL string, acc
return
u
.
Do
(
req
,
proxyURL
,
accountID
,
accountConcurrency
)
}
var
stdLogCaptureMu
sync
.
Mutex
func
captureStdLog
(
t
*
testing
.
T
)
(
*
bytes
.
Buffer
,
func
())
{
t
.
Helper
()
stdLogCaptureMu
.
Lock
()
buf
:=
&
bytes
.
Buffer
{}
prevWriter
:=
log
.
Writer
()
prevFlags
:=
log
.
Flags
()
log
.
SetFlags
(
0
)
log
.
SetOutput
(
buf
)
return
buf
,
func
()
{
log
.
SetOutput
(
prevWriter
)
log
.
SetFlags
(
prevFlags
)
// 防御性恢复,避免其他测试改动了底层 writer。
if
prevWriter
==
nil
{
log
.
SetOutput
(
os
.
Stderr
)
}
stdLogCaptureMu
.
Unlock
()
}
}
func
TestOpenAIGatewayService_OAuthPassthrough_StreamKeepsToolNameAndBodyUnchanged
(
t
*
testing
.
T
)
{
gin
.
SetMode
(
gin
.
TestMode
)
...
...
@@ -459,3 +483,170 @@ func TestOpenAIGatewayService_APIKeyPassthrough_PreservesBodyAndUsesResponsesEnd
require
.
Equal
(
t
,
"curl/8.0"
,
upstream
.
lastReq
.
Header
.
Get
(
"User-Agent"
))
require
.
Equal
(
t
,
"keep"
,
upstream
.
lastReq
.
Header
.
Get
(
"X-Test"
))
}
// TestOpenAIGatewayService_OAuthPassthrough_WarnOnTimeoutHeadersForStream
// sends a streaming passthrough request carrying x-stainless-timeout with the
// allow-timeout-headers config left at its default (off), and asserts the
// service logs the "filtering timeout headers" warning including the header
// name and value.
func TestOpenAIGatewayService_OAuthPassthrough_WarnOnTimeoutHeadersForStream(t *testing.T) {
	gin.SetMode(gin.TestMode)
	logBuf, restore := captureStdLog(t)
	defer restore()
	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
	c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0")
	c.Request.Header.Set("x-stainless-timeout", "10000")
	originalBody := []byte(`{"model":"gpt-5.2","stream":true,"input":[{"type":"text","text":"hi"}]}`)
	// Upstream replies with a well-terminated SSE stream.
	resp := &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{"Content-Type": []string{"text/event-stream"}, "X-Request-Id": []string{"rid-timeout"}},
		Body:       io.NopCloser(strings.NewReader("data: [DONE]\n\n")),
	}
	upstream := &httpUpstreamRecorder{resp: resp}
	svc := &OpenAIGatewayService{
		cfg:          &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
		httpUpstream: upstream,
	}
	account := &Account{
		ID:             321,
		Name:           "acc",
		Platform:       PlatformOpenAI,
		Type:           AccountTypeOAuth,
		Concurrency:    1,
		Credentials:    map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"},
		Extra:          map[string]any{"openai_passthrough": true},
		Status:         StatusActive,
		Schedulable:    true,
		RateMultiplier: f64p(1),
	}
	_, err := svc.Forward(context.Background(), c, account, originalBody)
	require.NoError(t, err)
	require.Contains(t, logBuf.String(), "检测到超时相关请求头,将按配置过滤以降低断流风险")
	require.Contains(t, logBuf.String(), "x-stainless-timeout=10000")
}
// TestOpenAIGatewayService_OAuthPassthrough_WarnWhenStreamEndsWithoutDone
// simulates an upstream SSE stream that ends without the [DONE] sentinel and
// asserts the truncation warning (including the upstream request id) is
// logged while Forward itself still succeeds.
func TestOpenAIGatewayService_OAuthPassthrough_WarnWhenStreamEndsWithoutDone(t *testing.T) {
	gin.SetMode(gin.TestMode)
	logBuf, restore := captureStdLog(t)
	defer restore()
	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
	c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0")
	originalBody := []byte(`{"model":"gpt-5.2","stream":true,"input":[{"type":"text","text":"hi"}]}`)
	// Deliberately omit [DONE] to simulate a mid-stream upstream cutoff.
	resp := &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{"Content-Type": []string{"text/event-stream"}, "X-Request-Id": []string{"rid-truncate"}},
		Body:       io.NopCloser(strings.NewReader("data: {\"type\":\"response.output_text.delta\",\"delta\":\"h\"}\n\n")),
	}
	upstream := &httpUpstreamRecorder{resp: resp}
	svc := &OpenAIGatewayService{
		cfg:          &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
		httpUpstream: upstream,
	}
	account := &Account{
		ID:             654,
		Name:           "acc",
		Platform:       PlatformOpenAI,
		Type:           AccountTypeOAuth,
		Concurrency:    1,
		Credentials:    map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"},
		Extra:          map[string]any{"openai_passthrough": true},
		Status:         StatusActive,
		Schedulable:    true,
		RateMultiplier: f64p(1),
	}
	_, err := svc.Forward(context.Background(), c, account, originalBody)
	require.NoError(t, err)
	require.Contains(t, logBuf.String(), "上游流在未收到 [DONE] 时结束,疑似断流")
	require.Contains(t, logBuf.String(), "rid-truncate")
}
func
TestOpenAIGatewayService_OAuthPassthrough_DefaultFiltersTimeoutHeaders
(
t
*
testing
.
T
)
{
gin
.
SetMode
(
gin
.
TestMode
)
rec
:=
httptest
.
NewRecorder
()
c
,
_
:=
gin
.
CreateTestContext
(
rec
)
c
.
Request
=
httptest
.
NewRequest
(
http
.
MethodPost
,
"/v1/responses"
,
bytes
.
NewReader
(
nil
))
c
.
Request
.
Header
.
Set
(
"User-Agent"
,
"codex_cli_rs/0.1.0"
)
c
.
Request
.
Header
.
Set
(
"x-stainless-timeout"
,
"120000"
)
c
.
Request
.
Header
.
Set
(
"X-Test"
,
"keep"
)
originalBody
:=
[]
byte
(
`{"model":"gpt-5.2","stream":false,"input":[{"type":"text","text":"hi"}]}`
)
resp
:=
&
http
.
Response
{
StatusCode
:
http
.
StatusOK
,
Header
:
http
.
Header
{
"Content-Type"
:
[]
string
{
"application/json"
},
"X-Request-Id"
:
[]
string
{
"rid-filter-default"
}},
Body
:
io
.
NopCloser
(
strings
.
NewReader
(
`{"output":[],"usage":{"input_tokens":1,"output_tokens":1,"input_tokens_details":{"cached_tokens":0}}}`
)),
}
upstream
:=
&
httpUpstreamRecorder
{
resp
:
resp
}
svc
:=
&
OpenAIGatewayService
{
cfg
:
&
config
.
Config
{
Gateway
:
config
.
GatewayConfig
{
ForceCodexCLI
:
false
}},
httpUpstream
:
upstream
,
}
account
:=
&
Account
{
ID
:
111
,
Name
:
"acc"
,
Platform
:
PlatformOpenAI
,
Type
:
AccountTypeOAuth
,
Concurrency
:
1
,
Credentials
:
map
[
string
]
any
{
"access_token"
:
"oauth-token"
,
"chatgpt_account_id"
:
"chatgpt-acc"
},
Extra
:
map
[
string
]
any
{
"openai_passthrough"
:
true
},
Status
:
StatusActive
,
Schedulable
:
true
,
RateMultiplier
:
f64p
(
1
),
}
_
,
err
:=
svc
.
Forward
(
context
.
Background
(),
c
,
account
,
originalBody
)
require
.
NoError
(
t
,
err
)
require
.
NotNil
(
t
,
upstream
.
lastReq
)
require
.
Empty
(
t
,
upstream
.
lastReq
.
Header
.
Get
(
"x-stainless-timeout"
))
require
.
Equal
(
t
,
"keep"
,
upstream
.
lastReq
.
Header
.
Get
(
"X-Test"
))
}
func
TestOpenAIGatewayService_OAuthPassthrough_AllowTimeoutHeadersWhenConfigured
(
t
*
testing
.
T
)
{
gin
.
SetMode
(
gin
.
TestMode
)
rec
:=
httptest
.
NewRecorder
()
c
,
_
:=
gin
.
CreateTestContext
(
rec
)
c
.
Request
=
httptest
.
NewRequest
(
http
.
MethodPost
,
"/v1/responses"
,
bytes
.
NewReader
(
nil
))
c
.
Request
.
Header
.
Set
(
"User-Agent"
,
"codex_cli_rs/0.1.0"
)
c
.
Request
.
Header
.
Set
(
"x-stainless-timeout"
,
"120000"
)
c
.
Request
.
Header
.
Set
(
"X-Test"
,
"keep"
)
originalBody
:=
[]
byte
(
`{"model":"gpt-5.2","stream":false,"input":[{"type":"text","text":"hi"}]}`
)
resp
:=
&
http
.
Response
{
StatusCode
:
http
.
StatusOK
,
Header
:
http
.
Header
{
"Content-Type"
:
[]
string
{
"application/json"
},
"X-Request-Id"
:
[]
string
{
"rid-filter-allow"
}},
Body
:
io
.
NopCloser
(
strings
.
NewReader
(
`{"output":[],"usage":{"input_tokens":1,"output_tokens":1,"input_tokens_details":{"cached_tokens":0}}}`
)),
}
upstream
:=
&
httpUpstreamRecorder
{
resp
:
resp
}
svc
:=
&
OpenAIGatewayService
{
cfg
:
&
config
.
Config
{
Gateway
:
config
.
GatewayConfig
{
ForceCodexCLI
:
false
,
OpenAIPassthroughAllowTimeoutHeaders
:
true
,
}},
httpUpstream
:
upstream
,
}
account
:=
&
Account
{
ID
:
222
,
Name
:
"acc"
,
Platform
:
PlatformOpenAI
,
Type
:
AccountTypeOAuth
,
Concurrency
:
1
,
Credentials
:
map
[
string
]
any
{
"access_token"
:
"oauth-token"
,
"chatgpt_account_id"
:
"chatgpt-acc"
},
Extra
:
map
[
string
]
any
{
"openai_passthrough"
:
true
},
Status
:
StatusActive
,
Schedulable
:
true
,
RateMultiplier
:
f64p
(
1
),
}
_
,
err
:=
svc
.
Forward
(
context
.
Background
(),
c
,
account
,
originalBody
)
require
.
NoError
(
t
,
err
)
require
.
NotNil
(
t
,
upstream
.
lastReq
)
require
.
Equal
(
t
,
"120000"
,
upstream
.
lastReq
.
Header
.
Get
(
"x-stainless-timeout"
))
require
.
Equal
(
t
,
"keep"
,
upstream
.
lastReq
.
Header
.
Get
(
"X-Test"
))
}
backend/internal/service/ops_openai_token_stats.go
0 → 100644
View file @
dbd7969a
package
service
import
(
"context"
infraerrors
"github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func
(
s
*
OpsService
)
GetOpenAITokenStats
(
ctx
context
.
Context
,
filter
*
OpsOpenAITokenStatsFilter
)
(
*
OpsOpenAITokenStatsResponse
,
error
)
{
if
err
:=
s
.
RequireMonitoringEnabled
(
ctx
);
err
!=
nil
{
return
nil
,
err
}
if
s
.
opsRepo
==
nil
{
return
nil
,
infraerrors
.
ServiceUnavailable
(
"OPS_REPO_UNAVAILABLE"
,
"Ops repository not available"
)
}
if
filter
==
nil
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_FILTER_REQUIRED"
,
"filter is required"
)
}
if
filter
.
StartTime
.
IsZero
()
||
filter
.
EndTime
.
IsZero
()
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_TIME_RANGE_REQUIRED"
,
"start_time/end_time are required"
)
}
if
filter
.
StartTime
.
After
(
filter
.
EndTime
)
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_TIME_RANGE_INVALID"
,
"start_time must be <= end_time"
)
}
if
filter
.
GroupID
!=
nil
&&
*
filter
.
GroupID
<=
0
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_GROUP_ID_INVALID"
,
"group_id must be > 0"
)
}
// top_n cannot be mixed with page/page_size params.
if
filter
.
TopN
>
0
&&
(
filter
.
Page
>
0
||
filter
.
PageSize
>
0
)
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_PAGINATION_CONFLICT"
,
"top_n cannot be used with page/page_size"
)
}
if
filter
.
TopN
>
0
{
if
filter
.
TopN
<
1
||
filter
.
TopN
>
100
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_TOPN_INVALID"
,
"top_n must be between 1 and 100"
)
}
}
else
{
if
filter
.
Page
<=
0
{
filter
.
Page
=
1
}
if
filter
.
PageSize
<=
0
{
filter
.
PageSize
=
20
}
if
filter
.
Page
<
1
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_PAGE_INVALID"
,
"page must be >= 1"
)
}
if
filter
.
PageSize
<
1
||
filter
.
PageSize
>
100
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_PAGE_SIZE_INVALID"
,
"page_size must be between 1 and 100"
)
}
}
return
s
.
opsRepo
.
GetOpenAITokenStats
(
ctx
,
filter
)
}
backend/internal/service/ops_openai_token_stats_models.go
0 → 100644
View file @
dbd7969a
package
service
import
"time"
// OpsOpenAITokenStatsFilter is the query filter for per-model OpenAI token
// statistics. Exactly one of two result-shaping modes is used:
// pagination (Page/PageSize, the default) or TopN (TopN > 0).
type OpsOpenAITokenStatsFilter struct {
	TimeRange string
	StartTime time.Time
	EndTime   time.Time
	Platform  string
	GroupID   *int64
	// Pagination mode (default): page/page_size
	Page     int
	PageSize int
	// TopN mode: top_n
	TopN int
}

// IsTopNMode reports whether the filter requests TopN results rather than a
// paginated listing. A nil filter is never in TopN mode.
func (f *OpsOpenAITokenStatsFilter) IsTopNMode() bool {
	if f == nil {
		return false
	}
	return f.TopN > 0
}
// OpsOpenAITokenStatsItem is one per-model aggregate row of OpenAI token
// request statistics. Pointer fields are nil when no sample exists to
// average over (so JSON emits null rather than a misleading zero).
type OpsOpenAITokenStatsItem struct {
	Model        string `json:"model"`
	RequestCount int64  `json:"request_count"`
	// Average output throughput; nil when it could not be computed.
	AvgTokensPerSec *float64 `json:"avg_tokens_per_sec"`
	// Average first-token latency in milliseconds; nil when no request in
	// the window recorded a first-token timestamp.
	AvgFirstTokenMs   *float64 `json:"avg_first_token_ms"`
	TotalOutputTokens int64    `json:"total_output_tokens"`
	AvgDurationMs     int64    `json:"avg_duration_ms"`
	// Number of requests that contributed a first-token sample (the
	// denominator behind AvgFirstTokenMs).
	RequestsWithFirstToken int64 `json:"requests_with_first_token"`
}
// OpsOpenAITokenStatsResponse is the result envelope for per-model OpenAI
// token statistics. It echoes the effective query window/filters and carries
// either pagination metadata or TopN metadata, depending on the query mode.
type OpsOpenAITokenStatsResponse struct {
	TimeRange string                     `json:"time_range"`
	StartTime time.Time                  `json:"start_time"`
	EndTime   time.Time                  `json:"end_time"`
	Platform  string                     `json:"platform,omitempty"`
	GroupID   *int64                     `json:"group_id,omitempty"`
	Items     []*OpsOpenAITokenStatsItem `json:"items"`
	// Total model rows before pagination/topN trimming.
	Total int64 `json:"total"`
	// Pagination mode metadata.
	Page     int `json:"page,omitempty"`
	PageSize int `json:"page_size,omitempty"`
	// TopN mode metadata.
	TopN *int `json:"top_n,omitempty"`
}
backend/internal/service/ops_openai_token_stats_test.go
0 → 100644
View file @
dbd7969a
package
service
import
(
"context"
"testing"
"time"
infraerrors
"github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/stretchr/testify/require"
)
type
openAITokenStatsRepoStub
struct
{
OpsRepository
resp
*
OpsOpenAITokenStatsResponse
err
error
captured
*
OpsOpenAITokenStatsFilter
}
func
(
s
*
openAITokenStatsRepoStub
)
GetOpenAITokenStats
(
ctx
context
.
Context
,
filter
*
OpsOpenAITokenStatsFilter
)
(
*
OpsOpenAITokenStatsResponse
,
error
)
{
s
.
captured
=
filter
if
s
.
err
!=
nil
{
return
nil
,
s
.
err
}
if
s
.
resp
!=
nil
{
return
s
.
resp
,
nil
}
return
&
OpsOpenAITokenStatsResponse
{},
nil
}
func
TestOpsServiceGetOpenAITokenStats_Validation
(
t
*
testing
.
T
)
{
now
:=
time
.
Now
()
.
UTC
()
tests
:=
[]
struct
{
name
string
filter
*
OpsOpenAITokenStatsFilter
wantCode
int
wantReason
string
}{
{
name
:
"filter 不能为空"
,
filter
:
nil
,
wantCode
:
400
,
wantReason
:
"OPS_FILTER_REQUIRED"
,
},
{
name
:
"start_time/end_time 必填"
,
filter
:
&
OpsOpenAITokenStatsFilter
{
StartTime
:
time
.
Time
{},
EndTime
:
now
,
},
wantCode
:
400
,
wantReason
:
"OPS_TIME_RANGE_REQUIRED"
,
},
{
name
:
"start_time 不能晚于 end_time"
,
filter
:
&
OpsOpenAITokenStatsFilter
{
StartTime
:
now
,
EndTime
:
now
.
Add
(
-
1
*
time
.
Minute
),
},
wantCode
:
400
,
wantReason
:
"OPS_TIME_RANGE_INVALID"
,
},
{
name
:
"group_id 必须大于 0"
,
filter
:
&
OpsOpenAITokenStatsFilter
{
StartTime
:
now
.
Add
(
-
time
.
Hour
),
EndTime
:
now
,
GroupID
:
int64Ptr
(
0
),
},
wantCode
:
400
,
wantReason
:
"OPS_GROUP_ID_INVALID"
,
},
{
name
:
"top_n 与分页参数互斥"
,
filter
:
&
OpsOpenAITokenStatsFilter
{
StartTime
:
now
.
Add
(
-
time
.
Hour
),
EndTime
:
now
,
TopN
:
10
,
Page
:
1
,
},
wantCode
:
400
,
wantReason
:
"OPS_PAGINATION_CONFLICT"
,
},
{
name
:
"top_n 参数越界"
,
filter
:
&
OpsOpenAITokenStatsFilter
{
StartTime
:
now
.
Add
(
-
time
.
Hour
),
EndTime
:
now
,
TopN
:
101
,
},
wantCode
:
400
,
wantReason
:
"OPS_TOPN_INVALID"
,
},
{
name
:
"page_size 参数越界"
,
filter
:
&
OpsOpenAITokenStatsFilter
{
StartTime
:
now
.
Add
(
-
time
.
Hour
),
EndTime
:
now
,
Page
:
1
,
PageSize
:
101
,
},
wantCode
:
400
,
wantReason
:
"OPS_PAGE_SIZE_INVALID"
,
},
}
for
_
,
tt
:=
range
tests
{
t
.
Run
(
tt
.
name
,
func
(
t
*
testing
.
T
)
{
svc
:=
&
OpsService
{
opsRepo
:
&
openAITokenStatsRepoStub
{},
}
_
,
err
:=
svc
.
GetOpenAITokenStats
(
context
.
Background
(),
tt
.
filter
)
require
.
Error
(
t
,
err
)
require
.
Equal
(
t
,
tt
.
wantCode
,
infraerrors
.
Code
(
err
))
require
.
Equal
(
t
,
tt
.
wantReason
,
infraerrors
.
Reason
(
err
))
})
}
}
func
TestOpsServiceGetOpenAITokenStats_DefaultPagination
(
t
*
testing
.
T
)
{
now
:=
time
.
Now
()
.
UTC
()
repo
:=
&
openAITokenStatsRepoStub
{
resp
:
&
OpsOpenAITokenStatsResponse
{
Items
:
[]
*
OpsOpenAITokenStatsItem
{
{
Model
:
"gpt-4o-mini"
,
RequestCount
:
10
},
},
Total
:
1
,
},
}
svc
:=
&
OpsService
{
opsRepo
:
repo
}
filter
:=
&
OpsOpenAITokenStatsFilter
{
TimeRange
:
"30d"
,
StartTime
:
now
.
Add
(
-
30
*
24
*
time
.
Hour
),
EndTime
:
now
,
}
resp
,
err
:=
svc
.
GetOpenAITokenStats
(
context
.
Background
(),
filter
)
require
.
NoError
(
t
,
err
)
require
.
NotNil
(
t
,
resp
)
require
.
NotNil
(
t
,
repo
.
captured
)
require
.
Equal
(
t
,
1
,
repo
.
captured
.
Page
)
require
.
Equal
(
t
,
20
,
repo
.
captured
.
PageSize
)
require
.
Equal
(
t
,
0
,
repo
.
captured
.
TopN
)
}
func
TestOpsServiceGetOpenAITokenStats_RepoUnavailable
(
t
*
testing
.
T
)
{
now
:=
time
.
Now
()
.
UTC
()
svc
:=
&
OpsService
{}
_
,
err
:=
svc
.
GetOpenAITokenStats
(
context
.
Background
(),
&
OpsOpenAITokenStatsFilter
{
TimeRange
:
"1h"
,
StartTime
:
now
.
Add
(
-
time
.
Hour
),
EndTime
:
now
,
TopN
:
10
,
})
require
.
Error
(
t
,
err
)
require
.
Equal
(
t
,
503
,
infraerrors
.
Code
(
err
))
require
.
Equal
(
t
,
"OPS_REPO_UNAVAILABLE"
,
infraerrors
.
Reason
(
err
))
}
// int64Ptr returns a pointer to a fresh copy of v (helper for optional
// int64 fields in test fixtures).
func int64Ptr(v int64) *int64 {
	out := v
	return &out
}
backend/internal/service/ops_port.go
View file @
dbd7969a
...
...
@@ -27,6 +27,7 @@ type OpsRepository interface {
GetLatencyHistogram
(
ctx
context
.
Context
,
filter
*
OpsDashboardFilter
)
(
*
OpsLatencyHistogramResponse
,
error
)
GetErrorTrend
(
ctx
context
.
Context
,
filter
*
OpsDashboardFilter
,
bucketSeconds
int
)
(
*
OpsErrorTrendResponse
,
error
)
GetErrorDistribution
(
ctx
context
.
Context
,
filter
*
OpsDashboardFilter
)
(
*
OpsErrorDistributionResponse
,
error
)
GetOpenAITokenStats
(
ctx
context
.
Context
,
filter
*
OpsOpenAITokenStatsFilter
)
(
*
OpsOpenAITokenStatsResponse
,
error
)
InsertSystemMetrics
(
ctx
context
.
Context
,
input
*
OpsInsertSystemMetricsInput
)
error
GetLatestSystemMetrics
(
ctx
context
.
Context
,
windowMinutes
int
)
(
*
OpsSystemMetricsSnapshot
,
error
)
...
...
backend/internal/service/parse_integral_number_unit.go
0 → 100644
View file @
dbd7969a
//go:build unit
package
service
import
(
"encoding/json"
"math"
)
// parseIntegralNumber 将 JSON 解码后的数字安全转换为 int。
// 仅接受“整数值”的输入,小数/NaN/Inf/越界值都会返回 false。
//
// 说明:
// - 该函数当前仅用于 unit 测试中的 map-based 解析逻辑验证,因此放在 unit build tag 下,
// 避免在默认构建中触发 unused lint。
func
parseIntegralNumber
(
raw
any
)
(
int
,
bool
)
{
switch
v
:=
raw
.
(
type
)
{
case
float64
:
if
math
.
IsNaN
(
v
)
||
math
.
IsInf
(
v
,
0
)
||
v
!=
math
.
Trunc
(
v
)
{
return
0
,
false
}
if
v
>
float64
(
math
.
MaxInt
)
||
v
<
float64
(
math
.
MinInt
)
{
return
0
,
false
}
return
int
(
v
),
true
case
int
:
return
v
,
true
case
int8
:
return
int
(
v
),
true
case
int16
:
return
int
(
v
),
true
case
int32
:
return
int
(
v
),
true
case
int64
:
if
v
>
int64
(
math
.
MaxInt
)
||
v
<
int64
(
math
.
MinInt
)
{
return
0
,
false
}
return
int
(
v
),
true
case
json
.
Number
:
i64
,
err
:=
v
.
Int64
()
if
err
!=
nil
{
return
0
,
false
}
if
i64
>
int64
(
math
.
MaxInt
)
||
i64
<
int64
(
math
.
MinInt
)
{
return
0
,
false
}
return
int
(
i64
),
true
default
:
return
0
,
false
}
}
deploy/config.example.yaml
View file @
dbd7969a
...
...
@@ -187,6 +187,9 @@ gateway:
#
# 注意:开启后会影响所有客户端的行为(不仅限于 VS Code / Codex CLI),请谨慎开启。
force_codex_cli
:
false
# OpenAI 透传模式是否放行客户端超时头(如 x-stainless-timeout)
# 默认 false:过滤超时头,降低上游提前断流风险。
openai_passthrough_allow_timeout_headers
:
false
# HTTP upstream connection pool settings (HTTP/2 + multi-proxy scenario defaults)
# HTTP 上游连接池配置(HTTP/2 + 多代理场景默认值)
# Max idle connections across all hosts
...
...
frontend/src/api/admin/ops.ts
View file @
dbd7969a
...
...
@@ -259,6 +259,40 @@ export interface OpsErrorDistributionResponse {
items
:
OpsErrorDistributionItem
[]
}
/** Preset time ranges accepted by the OpenAI token stats endpoint. */
export type OpsOpenAITokenStatsTimeRange = '30m' | '1h' | '1d' | '15d' | '30d'

/** One per-model aggregate row of OpenAI token request statistics. */
export interface OpsOpenAITokenStatsItem {
  model: string
  request_count: number
  /** Average output throughput; null/absent when no sample was available. */
  avg_tokens_per_sec?: number | null
  /** Average first-token latency in ms; null/absent when no sample was available. */
  avg_first_token_ms?: number | null
  total_output_tokens: number
  avg_duration_ms: number
  /** Number of requests that contributed a first-token sample. */
  requests_with_first_token: number
}

/**
 * Result envelope for the OpenAI token stats endpoint. Echoes the effective
 * query window/filters and carries either pagination metadata (page/page_size)
 * or TopN metadata (top_n), depending on the query mode.
 */
export interface OpsOpenAITokenStatsResponse {
  time_range: OpsOpenAITokenStatsTimeRange
  start_time: string
  end_time: string
  platform?: string
  group_id?: number | null
  items: OpsOpenAITokenStatsItem[]
  /** Total model rows before pagination/topN trimming. */
  total: number
  page?: number
  page_size?: number
  top_n?: number | null
}

/**
 * Query parameters for the OpenAI token stats endpoint.
 * Note: top_n and page/page_size are mutually exclusive modes.
 */
export interface OpsOpenAITokenStatsParams {
  time_range?: OpsOpenAITokenStatsTimeRange
  platform?: string
  group_id?: number | null
  page?: number
  page_size?: number
  top_n?: number
}
export
interface
OpsSystemMetricsSnapshot
{
id
:
number
created_at
:
string
...
...
@@ -971,6 +1005,17 @@ export async function getErrorDistribution(
return
data
}
export
async
function
getOpenAITokenStats
(
params
:
OpsOpenAITokenStatsParams
,
options
:
OpsRequestOptions
=
{}
):
Promise
<
OpsOpenAITokenStatsResponse
>
{
const
{
data
}
=
await
apiClient
.
get
<
OpsOpenAITokenStatsResponse
>
(
'
/admin/ops/dashboard/openai-token-stats
'
,
{
params
,
signal
:
options
.
signal
})
return
data
}
export
type
OpsErrorListView
=
'
errors
'
|
'
excluded
'
|
'
all
'
export
type
OpsErrorListQueryParams
=
{
...
...
@@ -1188,6 +1233,7 @@ export const opsAPI = {
getLatencyHistogram
,
getErrorTrend
,
getErrorDistribution
,
getOpenAITokenStats
,
getConcurrencyStats
,
getUserConcurrencyStats
,
getAccountAvailabilityStats
,
...
...
frontend/src/i18n/locales/en.ts
View file @
dbd7969a
...
...
@@ -2508,11 +2508,33 @@ export default {
'
5m
'
:
'
Last 5 minutes
'
,
'
30m
'
:
'
Last 30 minutes
'
,
'
1h
'
:
'
Last 1 hour
'
,
'
1d
'
:
'
Last 1 day
'
,
'
15d
'
:
'
Last 15 days
'
,
'
6h
'
:
'
Last 6 hours
'
,
'
24h
'
:
'
Last 24 hours
'
,
'
7d
'
:
'
Last 7 days
'
,
'
30d
'
:
'
Last 30 days
'
},
openaiTokenStats
:
{
title
:
'
OpenAI Token Request Stats
'
,
viewModeTopN
:
'
TopN
'
,
viewModePagination
:
'
Pagination
'
,
prevPage
:
'
Previous
'
,
nextPage
:
'
Next
'
,
pageInfo
:
'
Page {page}/{total}
'
,
totalModels
:
'
Total models: {total}
'
,
failedToLoad
:
'
Failed to load OpenAI token stats
'
,
empty
:
'
No OpenAI token stats for the current filters
'
,
table
:
{
model
:
'
Model
'
,
requestCount
:
'
Requests
'
,
avgTokensPerSec
:
'
Avg Tokens/sec
'
,
avgFirstTokenMs
:
'
Avg First Token Latency (ms)
'
,
totalOutputTokens
:
'
Total Output Tokens
'
,
avgDurationMs
:
'
Avg Duration (ms)
'
,
requestsWithFirstToken
:
'
Requests With First Token
'
}
},
fullscreen
:
{
enter
:
'
Enter Fullscreen
'
},
...
...
frontend/src/i18n/locales/zh.ts
View file @
dbd7969a
...
...
@@ -2675,12 +2675,34 @@ export default {
'
5m
'
:
'
近5分钟
'
,
'
30m
'
:
'
近30分钟
'
,
'
1h
'
:
'
近1小时
'
,
'
1d
'
:
'
近1天
'
,
'
15d
'
:
'
近15天
'
,
'
6h
'
:
'
近6小时
'
,
'
24h
'
:
'
近24小时
'
,
'
7d
'
:
'
近7天
'
,
'
30d
'
:
'
近30天
'
,
custom
:
'
自定义
'
},
openaiTokenStats
:
{
title
:
'
OpenAI Token 请求统计
'
,
viewModeTopN
:
'
TopN
'
,
viewModePagination
:
'
分页
'
,
prevPage
:
'
上一页
'
,
nextPage
:
'
下一页
'
,
pageInfo
:
'
第 {page}/{total} 页
'
,
totalModels
:
'
模型总数:{total}
'
,
failedToLoad
:
'
加载 OpenAI Token 统计失败
'
,
empty
:
'
当前筛选条件下暂无 OpenAI Token 请求统计数据
'
,
table
:
{
model
:
'
模型
'
,
requestCount
:
'
请求数
'
,
avgTokensPerSec
:
'
平均 Tokens/秒
'
,
avgFirstTokenMs
:
'
平均首 Token 延迟(ms)
'
,
totalOutputTokens
:
'
输出 Token 总数
'
,
avgDurationMs
:
'
平均时长(ms)
'
,
requestsWithFirstToken
:
'
首 Token 样本数
'
}
},
customTimeRange
:
{
startTime
:
'
开始时间
'
,
endTime
:
'
结束时间
'
...
...
Prev
1
2
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment