Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
陈曦
sub2api
Commits
9dcd3cd4
Unverified
Commit
9dcd3cd4
authored
Mar 04, 2026
by
Wesley Liddick
Committed by
GitHub
Mar 04, 2026
Browse files
Merge pull request #754 from xvhuan/perf/admin-core-large-dataset
perf(admin): 优化后台大数据场景加载性能(仪表盘/用户/账号/Ops)
parents
49767ccc
f6fe5b55
Changes
27
Hide whitespace changes
Inline
Side-by-side
backend/internal/config/config.go
View file @
9dcd3cd4
...
@@ -1227,7 +1227,7 @@ func setDefaults() {
...
@@ -1227,7 +1227,7 @@ func setDefaults() {
// Ops (vNext)
// Ops (vNext)
viper
.
SetDefault
(
"ops.enabled"
,
true
)
viper
.
SetDefault
(
"ops.enabled"
,
true
)
viper
.
SetDefault
(
"ops.use_preaggregated_tables"
,
fals
e
)
viper
.
SetDefault
(
"ops.use_preaggregated_tables"
,
tru
e
)
viper
.
SetDefault
(
"ops.cleanup.enabled"
,
true
)
viper
.
SetDefault
(
"ops.cleanup.enabled"
,
true
)
viper
.
SetDefault
(
"ops.cleanup.schedule"
,
"0 2 * * *"
)
viper
.
SetDefault
(
"ops.cleanup.schedule"
,
"0 2 * * *"
)
// Retention days: vNext defaults to 30 days across ops datasets.
// Retention days: vNext defaults to 30 days across ops datasets.
...
...
backend/internal/handler/admin/account_handler.go
View file @
9dcd3cd4
...
@@ -217,6 +217,7 @@ func (h *AccountHandler) List(c *gin.Context) {
...
@@ -217,6 +217,7 @@ func (h *AccountHandler) List(c *gin.Context) {
if
len
(
search
)
>
100
{
if
len
(
search
)
>
100
{
search
=
search
[
:
100
]
search
=
search
[
:
100
]
}
}
lite
:=
parseBoolQueryWithDefault
(
c
.
Query
(
"lite"
),
false
)
var
groupID
int64
var
groupID
int64
if
groupIDStr
:=
c
.
Query
(
"group"
);
groupIDStr
!=
""
{
if
groupIDStr
:=
c
.
Query
(
"group"
);
groupIDStr
!=
""
{
...
@@ -235,80 +236,81 @@ func (h *AccountHandler) List(c *gin.Context) {
...
@@ -235,80 +236,81 @@ func (h *AccountHandler) List(c *gin.Context) {
accountIDs
[
i
]
=
acc
.
ID
accountIDs
[
i
]
=
acc
.
ID
}
}
concurrencyCounts
,
err
:=
h
.
concurrencyService
.
GetAccountConcurrencyBatch
(
c
.
Request
.
Context
(),
accountIDs
)
concurrencyCounts
:=
make
(
map
[
int64
]
int
)
if
err
!=
nil
{
// Log error but don't fail the request, just use 0 for all
concurrencyCounts
=
make
(
map
[
int64
]
int
)
}
// 识别需要查询窗口费用、会话数和 RPM 的账号(Anthropic OAuth/SetupToken 且启用了相应功能)
windowCostAccountIDs
:=
make
([]
int64
,
0
)
sessionLimitAccountIDs
:=
make
([]
int64
,
0
)
rpmAccountIDs
:=
make
([]
int64
,
0
)
sessionIdleTimeouts
:=
make
(
map
[
int64
]
time
.
Duration
)
// 各账号的会话空闲超时配置
for
i
:=
range
accounts
{
acc
:=
&
accounts
[
i
]
if
acc
.
IsAnthropicOAuthOrSetupToken
()
{
if
acc
.
GetWindowCostLimit
()
>
0
{
windowCostAccountIDs
=
append
(
windowCostAccountIDs
,
acc
.
ID
)
}
if
acc
.
GetMaxSessions
()
>
0
{
sessionLimitAccountIDs
=
append
(
sessionLimitAccountIDs
,
acc
.
ID
)
sessionIdleTimeouts
[
acc
.
ID
]
=
time
.
Duration
(
acc
.
GetSessionIdleTimeoutMinutes
())
*
time
.
Minute
}
if
acc
.
GetBaseRPM
()
>
0
{
rpmAccountIDs
=
append
(
rpmAccountIDs
,
acc
.
ID
)
}
}
}
// 并行获取窗口费用、活跃会话数和 RPM 计数
var
windowCosts
map
[
int64
]
float64
var
windowCosts
map
[
int64
]
float64
var
activeSessions
map
[
int64
]
int
var
activeSessions
map
[
int64
]
int
var
rpmCounts
map
[
int64
]
int
var
rpmCounts
map
[
int64
]
int
if
!
lite
{
//
获取 RPM 计数(批量查询)
//
Get current concurrency counts for all accounts
if
len
(
rpmAccountIDs
)
>
0
&&
h
.
rpmCach
e
!=
nil
{
if
h
.
concurrencyServic
e
!=
nil
{
rpmCounts
,
_
=
h
.
rpmCache
.
GetRPM
Batch
(
c
.
Request
.
Context
(),
rpmA
ccountIDs
)
if
cc
,
ccErr
:=
h
.
concurrencyService
.
GetAccountConcurrency
Batch
(
c
.
Request
.
Context
(),
a
ccountIDs
)
;
ccErr
==
nil
&&
cc
!=
nil
{
if
rpm
Counts
=
=
nil
{
concurrency
Counts
=
cc
rpmCounts
=
make
(
map
[
int64
]
int
)
}
}
}
}
// 识别需要查询窗口费用、会话数和 RPM 的账号(Anthropic OAuth/SetupToken 且启用了相应功能)
windowCostAccountIDs
:=
make
([]
int64
,
0
)
// 获取活跃会话数(批量查询,传入各账号的 idleTimeout 配置)
sessionLimitAccountIDs
:=
make
([]
int64
,
0
)
if
len
(
sessionLimitAccountIDs
)
>
0
&&
h
.
sessionLimitCache
!=
nil
{
rpmAccountIDs
:=
make
([]
int64
,
0
)
activeSessions
,
_
=
h
.
sessionLimitCache
.
GetActiveSessionCountBatch
(
c
.
Request
.
Context
(),
sessionLimitAccountIDs
,
sessionIdleTimeouts
)
sessionIdleTimeouts
:=
make
(
map
[
int64
]
time
.
Duration
)
// 各账号的会话空闲超时配置
if
activeSessions
==
nil
{
for
i
:=
range
accounts
{
activeSessions
=
make
(
map
[
int64
]
int
)
acc
:=
&
accounts
[
i
]
if
acc
.
IsAnthropicOAuthOrSetupToken
()
{
if
acc
.
GetWindowCostLimit
()
>
0
{
windowCostAccountIDs
=
append
(
windowCostAccountIDs
,
acc
.
ID
)
}
if
acc
.
GetMaxSessions
()
>
0
{
sessionLimitAccountIDs
=
append
(
sessionLimitAccountIDs
,
acc
.
ID
)
sessionIdleTimeouts
[
acc
.
ID
]
=
time
.
Duration
(
acc
.
GetSessionIdleTimeoutMinutes
())
*
time
.
Minute
}
if
acc
.
GetBaseRPM
()
>
0
{
rpmAccountIDs
=
append
(
rpmAccountIDs
,
acc
.
ID
)
}
}
}
}
}
// 获取窗口费用(并行查询)
// 获取 RPM 计数(批量查询)
if
len
(
windowCostAccountIDs
)
>
0
{
if
len
(
rpmAccountIDs
)
>
0
&&
h
.
rpmCache
!=
nil
{
windowCosts
=
make
(
map
[
int64
]
float64
)
rpmCounts
,
_
=
h
.
rpmCache
.
GetRPMBatch
(
c
.
Request
.
Context
(),
rpmAccountIDs
)
var
mu
sync
.
Mutex
if
rpmCounts
==
nil
{
g
,
gctx
:=
errgroup
.
WithContext
(
c
.
Request
.
Context
())
rpmCounts
=
make
(
map
[
int64
]
int
)
g
.
SetLimit
(
10
)
// 限制并发数
}
}
for
i
:=
range
accounts
{
// 获取活跃会话数(批量查询,传入各账号的 idleTimeout 配置)
acc
:=
&
accounts
[
i
]
if
len
(
sessionLimitAccountIDs
)
>
0
&&
h
.
sessionLimitCache
!=
nil
{
if
!
acc
.
IsAnthropicOAuthOrSetupToken
()
||
acc
.
GetWindowCostLimit
()
<=
0
{
activeSessions
,
_
=
h
.
sessionLimitCache
.
GetActiveSessionCountBatch
(
c
.
Request
.
Context
(),
sessionLimitAccountIDs
,
sessionIdleTimeouts
)
continue
if
activeSessions
==
nil
{
activeSessions
=
make
(
map
[
int64
]
int
)
}
}
accCopy
:=
acc
// 闭包捕获
}
g
.
Go
(
func
()
error
{
// 使用统一的窗口开始时间计算逻辑(考虑窗口过期情况)
// 获取窗口费用(并行查询)
startTime
:=
accCopy
.
GetCurrentWindowStartTime
()
if
len
(
windowCostAccountIDs
)
>
0
{
stats
,
err
:=
h
.
accountUsageService
.
GetAccountWindowStats
(
gctx
,
accCopy
.
ID
,
startTime
)
windowCosts
=
make
(
map
[
int64
]
float64
)
if
err
==
nil
&&
stats
!=
nil
{
var
mu
sync
.
Mutex
mu
.
Lock
()
g
,
gctx
:=
errgroup
.
WithContext
(
c
.
Request
.
Context
())
windowCosts
[
accCopy
.
ID
]
=
stats
.
StandardCost
// 使用标准费用
g
.
SetLimit
(
10
)
// 限制并发数
mu
.
Unlock
()
for
i
:=
range
accounts
{
acc
:=
&
accounts
[
i
]
if
!
acc
.
IsAnthropicOAuthOrSetupToken
()
||
acc
.
GetWindowCostLimit
()
<=
0
{
continue
}
}
return
nil
// 不返回错误,允许部分失败
accCopy
:=
acc
// 闭包捕获
})
g
.
Go
(
func
()
error
{
// 使用统一的窗口开始时间计算逻辑(考虑窗口过期情况)
startTime
:=
accCopy
.
GetCurrentWindowStartTime
()
stats
,
err
:=
h
.
accountUsageService
.
GetAccountWindowStats
(
gctx
,
accCopy
.
ID
,
startTime
)
if
err
==
nil
&&
stats
!=
nil
{
mu
.
Lock
()
windowCosts
[
accCopy
.
ID
]
=
stats
.
StandardCost
// 使用标准费用
mu
.
Unlock
()
}
return
nil
// 不返回错误,允许部分失败
})
}
_
=
g
.
Wait
()
}
}
_
=
g
.
Wait
()
}
}
// Build response with concurrency info
// Build response with concurrency info
...
@@ -344,7 +346,7 @@ func (h *AccountHandler) List(c *gin.Context) {
...
@@ -344,7 +346,7 @@ func (h *AccountHandler) List(c *gin.Context) {
result
[
i
]
=
item
result
[
i
]
=
item
}
}
etag
:=
buildAccountsListETag
(
result
,
total
,
page
,
pageSize
,
platform
,
accountType
,
status
,
search
)
etag
:=
buildAccountsListETag
(
result
,
total
,
page
,
pageSize
,
platform
,
accountType
,
status
,
search
,
lite
)
if
etag
!=
""
{
if
etag
!=
""
{
c
.
Header
(
"ETag"
,
etag
)
c
.
Header
(
"ETag"
,
etag
)
c
.
Header
(
"Vary"
,
"If-None-Match"
)
c
.
Header
(
"Vary"
,
"If-None-Match"
)
...
@@ -362,6 +364,7 @@ func buildAccountsListETag(
...
@@ -362,6 +364,7 @@ func buildAccountsListETag(
total
int64
,
total
int64
,
page
,
pageSize
int
,
page
,
pageSize
int
,
platform
,
accountType
,
status
,
search
string
,
platform
,
accountType
,
status
,
search
string
,
lite
bool
,
)
string
{
)
string
{
payload
:=
struct
{
payload
:=
struct
{
Total
int64
`json:"total"`
Total
int64
`json:"total"`
...
@@ -371,6 +374,7 @@ func buildAccountsListETag(
...
@@ -371,6 +374,7 @@ func buildAccountsListETag(
AccountType
string
`json:"type"`
AccountType
string
`json:"type"`
Status
string
`json:"status"`
Status
string
`json:"status"`
Search
string
`json:"search"`
Search
string
`json:"search"`
Lite
bool
`json:"lite"`
Items
[]
AccountWithConcurrency
`json:"items"`
Items
[]
AccountWithConcurrency
`json:"items"`
}{
}{
Total
:
total
,
Total
:
total
,
...
@@ -380,6 +384,7 @@ func buildAccountsListETag(
...
@@ -380,6 +384,7 @@ func buildAccountsListETag(
AccountType
:
accountType
,
AccountType
:
accountType
,
Status
:
status
,
Status
:
status
,
Search
:
search
,
Search
:
search
,
Lite
:
lite
,
Items
:
items
,
Items
:
items
,
}
}
raw
,
err
:=
json
.
Marshal
(
payload
)
raw
,
err
:=
json
.
Marshal
(
payload
)
...
@@ -1398,18 +1403,41 @@ func (h *AccountHandler) GetBatchTodayStats(c *gin.Context) {
...
@@ -1398,18 +1403,41 @@ func (h *AccountHandler) GetBatchTodayStats(c *gin.Context) {
return
return
}
}
if
len
(
req
.
AccountIDs
)
==
0
{
accountIDs
:=
normalizeAccountIDList
(
req
.
AccountIDs
)
if
len
(
accountIDs
)
==
0
{
response
.
Success
(
c
,
gin
.
H
{
"stats"
:
map
[
string
]
any
{}})
response
.
Success
(
c
,
gin
.
H
{
"stats"
:
map
[
string
]
any
{}})
return
return
}
}
stats
,
err
:=
h
.
accountUsageService
.
GetTodayStatsBatch
(
c
.
Request
.
Context
(),
req
.
AccountIDs
)
cacheKey
:=
buildAccountTodayStatsBatchCacheKey
(
accountIDs
)
if
cached
,
ok
:=
accountTodayStatsBatchCache
.
Get
(
cacheKey
);
ok
{
if
cached
.
ETag
!=
""
{
c
.
Header
(
"ETag"
,
cached
.
ETag
)
c
.
Header
(
"Vary"
,
"If-None-Match"
)
if
ifNoneMatchMatched
(
c
.
GetHeader
(
"If-None-Match"
),
cached
.
ETag
)
{
c
.
Status
(
http
.
StatusNotModified
)
return
}
}
c
.
Header
(
"X-Snapshot-Cache"
,
"hit"
)
response
.
Success
(
c
,
cached
.
Payload
)
return
}
stats
,
err
:=
h
.
accountUsageService
.
GetTodayStatsBatch
(
c
.
Request
.
Context
(),
accountIDs
)
if
err
!=
nil
{
if
err
!=
nil
{
response
.
ErrorFrom
(
c
,
err
)
response
.
ErrorFrom
(
c
,
err
)
return
return
}
}
response
.
Success
(
c
,
gin
.
H
{
"stats"
:
stats
})
payload
:=
gin
.
H
{
"stats"
:
stats
}
cached
:=
accountTodayStatsBatchCache
.
Set
(
cacheKey
,
payload
)
if
cached
.
ETag
!=
""
{
c
.
Header
(
"ETag"
,
cached
.
ETag
)
c
.
Header
(
"Vary"
,
"If-None-Match"
)
}
c
.
Header
(
"X-Snapshot-Cache"
,
"miss"
)
response
.
Success
(
c
,
payload
)
}
}
// SetSchedulableRequest represents the request body for setting schedulable status
// SetSchedulableRequest represents the request body for setting schedulable status
...
...
backend/internal/handler/admin/account_today_stats_cache.go
0 → 100644
View file @
9dcd3cd4
package
admin
import
(
"sort"
"strconv"
"strings"
"time"
)
var
accountTodayStatsBatchCache
=
newSnapshotCache
(
30
*
time
.
Second
)
func
normalizeAccountIDList
(
accountIDs
[]
int64
)
[]
int64
{
if
len
(
accountIDs
)
==
0
{
return
nil
}
seen
:=
make
(
map
[
int64
]
struct
{},
len
(
accountIDs
))
out
:=
make
([]
int64
,
0
,
len
(
accountIDs
))
for
_
,
id
:=
range
accountIDs
{
if
id
<=
0
{
continue
}
if
_
,
ok
:=
seen
[
id
];
ok
{
continue
}
seen
[
id
]
=
struct
{}{}
out
=
append
(
out
,
id
)
}
sort
.
Slice
(
out
,
func
(
i
,
j
int
)
bool
{
return
out
[
i
]
<
out
[
j
]
})
return
out
}
func
buildAccountTodayStatsBatchCacheKey
(
accountIDs
[]
int64
)
string
{
if
len
(
accountIDs
)
==
0
{
return
"accounts_today_stats_empty"
}
var
b
strings
.
Builder
b
.
Grow
(
len
(
accountIDs
)
*
6
)
_
,
_
=
b
.
WriteString
(
"accounts_today_stats:"
)
for
i
,
id
:=
range
accountIDs
{
if
i
>
0
{
_
=
b
.
WriteByte
(
','
)
}
_
,
_
=
b
.
WriteString
(
strconv
.
FormatInt
(
id
,
10
))
}
return
b
.
String
()
}
backend/internal/handler/admin/dashboard_handler.go
View file @
9dcd3cd4
package
admin
package
admin
import
(
import
(
"encoding/json"
"errors"
"errors"
"strconv"
"strconv"
"strings"
"strings"
...
@@ -460,6 +461,9 @@ type BatchUsersUsageRequest struct {
...
@@ -460,6 +461,9 @@ type BatchUsersUsageRequest struct {
UserIDs
[]
int64
`json:"user_ids" binding:"required"`
UserIDs
[]
int64
`json:"user_ids" binding:"required"`
}
}
var
dashboardBatchUsersUsageCache
=
newSnapshotCache
(
30
*
time
.
Second
)
var
dashboardBatchAPIKeysUsageCache
=
newSnapshotCache
(
30
*
time
.
Second
)
// GetBatchUsersUsage handles getting usage stats for multiple users
// GetBatchUsersUsage handles getting usage stats for multiple users
// POST /api/v1/admin/dashboard/users-usage
// POST /api/v1/admin/dashboard/users-usage
func
(
h
*
DashboardHandler
)
GetBatchUsersUsage
(
c
*
gin
.
Context
)
{
func
(
h
*
DashboardHandler
)
GetBatchUsersUsage
(
c
*
gin
.
Context
)
{
...
@@ -469,18 +473,34 @@ func (h *DashboardHandler) GetBatchUsersUsage(c *gin.Context) {
...
@@ -469,18 +473,34 @@ func (h *DashboardHandler) GetBatchUsersUsage(c *gin.Context) {
return
return
}
}
if
len
(
req
.
UserIDs
)
==
0
{
userIDs
:=
normalizeInt64IDList
(
req
.
UserIDs
)
if
len
(
userIDs
)
==
0
{
response
.
Success
(
c
,
gin
.
H
{
"stats"
:
map
[
string
]
any
{}})
response
.
Success
(
c
,
gin
.
H
{
"stats"
:
map
[
string
]
any
{}})
return
return
}
}
stats
,
err
:=
h
.
dashboardService
.
GetBatchUserUsageStats
(
c
.
Request
.
Context
(),
req
.
UserIDs
,
time
.
Time
{},
time
.
Time
{})
keyRaw
,
_
:=
json
.
Marshal
(
struct
{
UserIDs
[]
int64
`json:"user_ids"`
}{
UserIDs
:
userIDs
,
})
cacheKey
:=
string
(
keyRaw
)
if
cached
,
ok
:=
dashboardBatchUsersUsageCache
.
Get
(
cacheKey
);
ok
{
c
.
Header
(
"X-Snapshot-Cache"
,
"hit"
)
response
.
Success
(
c
,
cached
.
Payload
)
return
}
stats
,
err
:=
h
.
dashboardService
.
GetBatchUserUsageStats
(
c
.
Request
.
Context
(),
userIDs
,
time
.
Time
{},
time
.
Time
{})
if
err
!=
nil
{
if
err
!=
nil
{
response
.
Error
(
c
,
500
,
"Failed to get user usage stats"
)
response
.
Error
(
c
,
500
,
"Failed to get user usage stats"
)
return
return
}
}
response
.
Success
(
c
,
gin
.
H
{
"stats"
:
stats
})
payload
:=
gin
.
H
{
"stats"
:
stats
}
dashboardBatchUsersUsageCache
.
Set
(
cacheKey
,
payload
)
c
.
Header
(
"X-Snapshot-Cache"
,
"miss"
)
response
.
Success
(
c
,
payload
)
}
}
// BatchAPIKeysUsageRequest represents the request body for batch api key usage stats
// BatchAPIKeysUsageRequest represents the request body for batch api key usage stats
...
@@ -497,16 +517,32 @@ func (h *DashboardHandler) GetBatchAPIKeysUsage(c *gin.Context) {
...
@@ -497,16 +517,32 @@ func (h *DashboardHandler) GetBatchAPIKeysUsage(c *gin.Context) {
return
return
}
}
if
len
(
req
.
APIKeyIDs
)
==
0
{
apiKeyIDs
:=
normalizeInt64IDList
(
req
.
APIKeyIDs
)
if
len
(
apiKeyIDs
)
==
0
{
response
.
Success
(
c
,
gin
.
H
{
"stats"
:
map
[
string
]
any
{}})
response
.
Success
(
c
,
gin
.
H
{
"stats"
:
map
[
string
]
any
{}})
return
return
}
}
stats
,
err
:=
h
.
dashboardService
.
GetBatchAPIKeyUsageStats
(
c
.
Request
.
Context
(),
req
.
APIKeyIDs
,
time
.
Time
{},
time
.
Time
{})
keyRaw
,
_
:=
json
.
Marshal
(
struct
{
APIKeyIDs
[]
int64
`json:"api_key_ids"`
}{
APIKeyIDs
:
apiKeyIDs
,
})
cacheKey
:=
string
(
keyRaw
)
if
cached
,
ok
:=
dashboardBatchAPIKeysUsageCache
.
Get
(
cacheKey
);
ok
{
c
.
Header
(
"X-Snapshot-Cache"
,
"hit"
)
response
.
Success
(
c
,
cached
.
Payload
)
return
}
stats
,
err
:=
h
.
dashboardService
.
GetBatchAPIKeyUsageStats
(
c
.
Request
.
Context
(),
apiKeyIDs
,
time
.
Time
{},
time
.
Time
{})
if
err
!=
nil
{
if
err
!=
nil
{
response
.
Error
(
c
,
500
,
"Failed to get API key usage stats"
)
response
.
Error
(
c
,
500
,
"Failed to get API key usage stats"
)
return
return
}
}
response
.
Success
(
c
,
gin
.
H
{
"stats"
:
stats
})
payload
:=
gin
.
H
{
"stats"
:
stats
}
dashboardBatchAPIKeysUsageCache
.
Set
(
cacheKey
,
payload
)
c
.
Header
(
"X-Snapshot-Cache"
,
"miss"
)
response
.
Success
(
c
,
payload
)
}
}
backend/internal/handler/admin/dashboard_snapshot_v2_handler.go
0 → 100644
View file @
9dcd3cd4
package
admin
import
(
"encoding/json"
"net/http"
"strconv"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
"github.com/Wei-Shaw/sub2api/internal/pkg/usagestats"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)
var
dashboardSnapshotV2Cache
=
newSnapshotCache
(
30
*
time
.
Second
)
type
dashboardSnapshotV2Stats
struct
{
usagestats
.
DashboardStats
Uptime
int64
`json:"uptime"`
}
type
dashboardSnapshotV2Response
struct
{
GeneratedAt
string
`json:"generated_at"`
StartDate
string
`json:"start_date"`
EndDate
string
`json:"end_date"`
Granularity
string
`json:"granularity"`
Stats
*
dashboardSnapshotV2Stats
`json:"stats,omitempty"`
Trend
[]
usagestats
.
TrendDataPoint
`json:"trend,omitempty"`
Models
[]
usagestats
.
ModelStat
`json:"models,omitempty"`
Groups
[]
usagestats
.
GroupStat
`json:"groups,omitempty"`
UsersTrend
[]
usagestats
.
UserUsageTrendPoint
`json:"users_trend,omitempty"`
}
type
dashboardSnapshotV2Filters
struct
{
UserID
int64
APIKeyID
int64
AccountID
int64
GroupID
int64
Model
string
RequestType
*
int16
Stream
*
bool
BillingType
*
int8
}
type
dashboardSnapshotV2CacheKey
struct
{
StartTime
string
`json:"start_time"`
EndTime
string
`json:"end_time"`
Granularity
string
`json:"granularity"`
UserID
int64
`json:"user_id"`
APIKeyID
int64
`json:"api_key_id"`
AccountID
int64
`json:"account_id"`
GroupID
int64
`json:"group_id"`
Model
string
`json:"model"`
RequestType
*
int16
`json:"request_type"`
Stream
*
bool
`json:"stream"`
BillingType
*
int8
`json:"billing_type"`
IncludeStats
bool
`json:"include_stats"`
IncludeTrend
bool
`json:"include_trend"`
IncludeModels
bool
`json:"include_models"`
IncludeGroups
bool
`json:"include_groups"`
IncludeUsersTrend
bool
`json:"include_users_trend"`
UsersTrendLimit
int
`json:"users_trend_limit"`
}
func
(
h
*
DashboardHandler
)
GetSnapshotV2
(
c
*
gin
.
Context
)
{
startTime
,
endTime
:=
parseTimeRange
(
c
)
granularity
:=
strings
.
TrimSpace
(
c
.
DefaultQuery
(
"granularity"
,
"day"
))
if
granularity
!=
"hour"
{
granularity
=
"day"
}
includeStats
:=
parseBoolQueryWithDefault
(
c
.
Query
(
"include_stats"
),
true
)
includeTrend
:=
parseBoolQueryWithDefault
(
c
.
Query
(
"include_trend"
),
true
)
includeModels
:=
parseBoolQueryWithDefault
(
c
.
Query
(
"include_model_stats"
),
true
)
includeGroups
:=
parseBoolQueryWithDefault
(
c
.
Query
(
"include_group_stats"
),
false
)
includeUsersTrend
:=
parseBoolQueryWithDefault
(
c
.
Query
(
"include_users_trend"
),
false
)
usersTrendLimit
:=
12
if
raw
:=
strings
.
TrimSpace
(
c
.
Query
(
"users_trend_limit"
));
raw
!=
""
{
if
parsed
,
err
:=
strconv
.
Atoi
(
raw
);
err
==
nil
&&
parsed
>
0
&&
parsed
<=
50
{
usersTrendLimit
=
parsed
}
}
filters
,
err
:=
parseDashboardSnapshotV2Filters
(
c
)
if
err
!=
nil
{
response
.
BadRequest
(
c
,
err
.
Error
())
return
}
keyRaw
,
_
:=
json
.
Marshal
(
dashboardSnapshotV2CacheKey
{
StartTime
:
startTime
.
UTC
()
.
Format
(
time
.
RFC3339
),
EndTime
:
endTime
.
UTC
()
.
Format
(
time
.
RFC3339
),
Granularity
:
granularity
,
UserID
:
filters
.
UserID
,
APIKeyID
:
filters
.
APIKeyID
,
AccountID
:
filters
.
AccountID
,
GroupID
:
filters
.
GroupID
,
Model
:
filters
.
Model
,
RequestType
:
filters
.
RequestType
,
Stream
:
filters
.
Stream
,
BillingType
:
filters
.
BillingType
,
IncludeStats
:
includeStats
,
IncludeTrend
:
includeTrend
,
IncludeModels
:
includeModels
,
IncludeGroups
:
includeGroups
,
IncludeUsersTrend
:
includeUsersTrend
,
UsersTrendLimit
:
usersTrendLimit
,
})
cacheKey
:=
string
(
keyRaw
)
if
cached
,
ok
:=
dashboardSnapshotV2Cache
.
Get
(
cacheKey
);
ok
{
if
cached
.
ETag
!=
""
{
c
.
Header
(
"ETag"
,
cached
.
ETag
)
c
.
Header
(
"Vary"
,
"If-None-Match"
)
if
ifNoneMatchMatched
(
c
.
GetHeader
(
"If-None-Match"
),
cached
.
ETag
)
{
c
.
Status
(
http
.
StatusNotModified
)
return
}
}
c
.
Header
(
"X-Snapshot-Cache"
,
"hit"
)
response
.
Success
(
c
,
cached
.
Payload
)
return
}
resp
:=
&
dashboardSnapshotV2Response
{
GeneratedAt
:
time
.
Now
()
.
UTC
()
.
Format
(
time
.
RFC3339
),
StartDate
:
startTime
.
Format
(
"2006-01-02"
),
EndDate
:
endTime
.
Add
(
-
24
*
time
.
Hour
)
.
Format
(
"2006-01-02"
),
Granularity
:
granularity
,
}
if
includeStats
{
stats
,
err
:=
h
.
dashboardService
.
GetDashboardStats
(
c
.
Request
.
Context
())
if
err
!=
nil
{
response
.
Error
(
c
,
500
,
"Failed to get dashboard statistics"
)
return
}
resp
.
Stats
=
&
dashboardSnapshotV2Stats
{
DashboardStats
:
*
stats
,
Uptime
:
int64
(
time
.
Since
(
h
.
startTime
)
.
Seconds
()),
}
}
if
includeTrend
{
trend
,
err
:=
h
.
dashboardService
.
GetUsageTrendWithFilters
(
c
.
Request
.
Context
(),
startTime
,
endTime
,
granularity
,
filters
.
UserID
,
filters
.
APIKeyID
,
filters
.
AccountID
,
filters
.
GroupID
,
filters
.
Model
,
filters
.
RequestType
,
filters
.
Stream
,
filters
.
BillingType
,
)
if
err
!=
nil
{
response
.
Error
(
c
,
500
,
"Failed to get usage trend"
)
return
}
resp
.
Trend
=
trend
}
if
includeModels
{
models
,
err
:=
h
.
dashboardService
.
GetModelStatsWithFilters
(
c
.
Request
.
Context
(),
startTime
,
endTime
,
filters
.
UserID
,
filters
.
APIKeyID
,
filters
.
AccountID
,
filters
.
GroupID
,
filters
.
RequestType
,
filters
.
Stream
,
filters
.
BillingType
,
)
if
err
!=
nil
{
response
.
Error
(
c
,
500
,
"Failed to get model statistics"
)
return
}
resp
.
Models
=
models
}
if
includeGroups
{
groups
,
err
:=
h
.
dashboardService
.
GetGroupStatsWithFilters
(
c
.
Request
.
Context
(),
startTime
,
endTime
,
filters
.
UserID
,
filters
.
APIKeyID
,
filters
.
AccountID
,
filters
.
GroupID
,
filters
.
RequestType
,
filters
.
Stream
,
filters
.
BillingType
,
)
if
err
!=
nil
{
response
.
Error
(
c
,
500
,
"Failed to get group statistics"
)
return
}
resp
.
Groups
=
groups
}
if
includeUsersTrend
{
usersTrend
,
err
:=
h
.
dashboardService
.
GetUserUsageTrend
(
c
.
Request
.
Context
(),
startTime
,
endTime
,
granularity
,
usersTrendLimit
,
)
if
err
!=
nil
{
response
.
Error
(
c
,
500
,
"Failed to get user usage trend"
)
return
}
resp
.
UsersTrend
=
usersTrend
}
cached
:=
dashboardSnapshotV2Cache
.
Set
(
cacheKey
,
resp
)
if
cached
.
ETag
!=
""
{
c
.
Header
(
"ETag"
,
cached
.
ETag
)
c
.
Header
(
"Vary"
,
"If-None-Match"
)
}
c
.
Header
(
"X-Snapshot-Cache"
,
"miss"
)
response
.
Success
(
c
,
resp
)
}
func
parseDashboardSnapshotV2Filters
(
c
*
gin
.
Context
)
(
*
dashboardSnapshotV2Filters
,
error
)
{
filters
:=
&
dashboardSnapshotV2Filters
{
Model
:
strings
.
TrimSpace
(
c
.
Query
(
"model"
)),
}
if
userIDStr
:=
strings
.
TrimSpace
(
c
.
Query
(
"user_id"
));
userIDStr
!=
""
{
id
,
err
:=
strconv
.
ParseInt
(
userIDStr
,
10
,
64
)
if
err
!=
nil
{
return
nil
,
err
}
filters
.
UserID
=
id
}
if
apiKeyIDStr
:=
strings
.
TrimSpace
(
c
.
Query
(
"api_key_id"
));
apiKeyIDStr
!=
""
{
id
,
err
:=
strconv
.
ParseInt
(
apiKeyIDStr
,
10
,
64
)
if
err
!=
nil
{
return
nil
,
err
}
filters
.
APIKeyID
=
id
}
if
accountIDStr
:=
strings
.
TrimSpace
(
c
.
Query
(
"account_id"
));
accountIDStr
!=
""
{
id
,
err
:=
strconv
.
ParseInt
(
accountIDStr
,
10
,
64
)
if
err
!=
nil
{
return
nil
,
err
}
filters
.
AccountID
=
id
}
if
groupIDStr
:=
strings
.
TrimSpace
(
c
.
Query
(
"group_id"
));
groupIDStr
!=
""
{
id
,
err
:=
strconv
.
ParseInt
(
groupIDStr
,
10
,
64
)
if
err
!=
nil
{
return
nil
,
err
}
filters
.
GroupID
=
id
}
if
requestTypeStr
:=
strings
.
TrimSpace
(
c
.
Query
(
"request_type"
));
requestTypeStr
!=
""
{
parsed
,
err
:=
service
.
ParseUsageRequestType
(
requestTypeStr
)
if
err
!=
nil
{
return
nil
,
err
}
value
:=
int16
(
parsed
)
filters
.
RequestType
=
&
value
}
else
if
streamStr
:=
strings
.
TrimSpace
(
c
.
Query
(
"stream"
));
streamStr
!=
""
{
streamVal
,
err
:=
strconv
.
ParseBool
(
streamStr
)
if
err
!=
nil
{
return
nil
,
err
}
filters
.
Stream
=
&
streamVal
}
if
billingTypeStr
:=
strings
.
TrimSpace
(
c
.
Query
(
"billing_type"
));
billingTypeStr
!=
""
{
v
,
err
:=
strconv
.
ParseInt
(
billingTypeStr
,
10
,
8
)
if
err
!=
nil
{
return
nil
,
err
}
bt
:=
int8
(
v
)
filters
.
BillingType
=
&
bt
}
return
filters
,
nil
}
backend/internal/handler/admin/id_list_utils.go
0 → 100644
View file @
9dcd3cd4
package
admin
import
"sort"
func
normalizeInt64IDList
(
ids
[]
int64
)
[]
int64
{
if
len
(
ids
)
==
0
{
return
nil
}
out
:=
make
([]
int64
,
0
,
len
(
ids
))
seen
:=
make
(
map
[
int64
]
struct
{},
len
(
ids
))
for
_
,
id
:=
range
ids
{
if
id
<=
0
{
continue
}
if
_
,
ok
:=
seen
[
id
];
ok
{
continue
}
seen
[
id
]
=
struct
{}{}
out
=
append
(
out
,
id
)
}
sort
.
Slice
(
out
,
func
(
i
,
j
int
)
bool
{
return
out
[
i
]
<
out
[
j
]
})
return
out
}
backend/internal/handler/admin/ops_snapshot_v2_handler.go
0 → 100644
View file @
9dcd3cd4
package
admin
import
(
"encoding/json"
"net/http"
"strconv"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
"golang.org/x/sync/errgroup"
)
var
opsDashboardSnapshotV2Cache
=
newSnapshotCache
(
30
*
time
.
Second
)
type
opsDashboardSnapshotV2Response
struct
{
GeneratedAt
string
`json:"generated_at"`
Overview
*
service
.
OpsDashboardOverview
`json:"overview"`
ThroughputTrend
*
service
.
OpsThroughputTrendResponse
`json:"throughput_trend"`
ErrorTrend
*
service
.
OpsErrorTrendResponse
`json:"error_trend"`
}
type
opsDashboardSnapshotV2CacheKey
struct
{
StartTime
string
`json:"start_time"`
EndTime
string
`json:"end_time"`
Platform
string
`json:"platform"`
GroupID
*
int64
`json:"group_id"`
QueryMode
service
.
OpsQueryMode
`json:"mode"`
BucketSecond
int
`json:"bucket_second"`
}
// GetDashboardSnapshotV2 returns ops dashboard core snapshot in one request.
// GET /api/v1/admin/ops/dashboard/snapshot-v2
func
(
h
*
OpsHandler
)
GetDashboardSnapshotV2
(
c
*
gin
.
Context
)
{
if
h
.
opsService
==
nil
{
response
.
Error
(
c
,
http
.
StatusServiceUnavailable
,
"Ops service not available"
)
return
}
if
err
:=
h
.
opsService
.
RequireMonitoringEnabled
(
c
.
Request
.
Context
());
err
!=
nil
{
response
.
ErrorFrom
(
c
,
err
)
return
}
startTime
,
endTime
,
err
:=
parseOpsTimeRange
(
c
,
"1h"
)
if
err
!=
nil
{
response
.
BadRequest
(
c
,
err
.
Error
())
return
}
filter
:=
&
service
.
OpsDashboardFilter
{
StartTime
:
startTime
,
EndTime
:
endTime
,
Platform
:
strings
.
TrimSpace
(
c
.
Query
(
"platform"
)),
QueryMode
:
parseOpsQueryMode
(
c
),
}
if
v
:=
strings
.
TrimSpace
(
c
.
Query
(
"group_id"
));
v
!=
""
{
id
,
err
:=
strconv
.
ParseInt
(
v
,
10
,
64
)
if
err
!=
nil
||
id
<=
0
{
response
.
BadRequest
(
c
,
"Invalid group_id"
)
return
}
filter
.
GroupID
=
&
id
}
bucketSeconds
:=
pickThroughputBucketSeconds
(
endTime
.
Sub
(
startTime
))
keyRaw
,
_
:=
json
.
Marshal
(
opsDashboardSnapshotV2CacheKey
{
StartTime
:
startTime
.
UTC
()
.
Format
(
time
.
RFC3339
),
EndTime
:
endTime
.
UTC
()
.
Format
(
time
.
RFC3339
),
Platform
:
filter
.
Platform
,
GroupID
:
filter
.
GroupID
,
QueryMode
:
filter
.
QueryMode
,
BucketSecond
:
bucketSeconds
,
})
cacheKey
:=
string
(
keyRaw
)
if
cached
,
ok
:=
opsDashboardSnapshotV2Cache
.
Get
(
cacheKey
);
ok
{
if
cached
.
ETag
!=
""
{
c
.
Header
(
"ETag"
,
cached
.
ETag
)
c
.
Header
(
"Vary"
,
"If-None-Match"
)
if
ifNoneMatchMatched
(
c
.
GetHeader
(
"If-None-Match"
),
cached
.
ETag
)
{
c
.
Status
(
http
.
StatusNotModified
)
return
}
}
c
.
Header
(
"X-Snapshot-Cache"
,
"hit"
)
response
.
Success
(
c
,
cached
.
Payload
)
return
}
var
(
overview
*
service
.
OpsDashboardOverview
trend
*
service
.
OpsThroughputTrendResponse
errTrend
*
service
.
OpsErrorTrendResponse
)
g
,
gctx
:=
errgroup
.
WithContext
(
c
.
Request
.
Context
())
g
.
Go
(
func
()
error
{
f
:=
*
filter
result
,
err
:=
h
.
opsService
.
GetDashboardOverview
(
gctx
,
&
f
)
if
err
!=
nil
{
return
err
}
overview
=
result
return
nil
})
g
.
Go
(
func
()
error
{
f
:=
*
filter
result
,
err
:=
h
.
opsService
.
GetThroughputTrend
(
gctx
,
&
f
,
bucketSeconds
)
if
err
!=
nil
{
return
err
}
trend
=
result
return
nil
})
g
.
Go
(
func
()
error
{
f
:=
*
filter
result
,
err
:=
h
.
opsService
.
GetErrorTrend
(
gctx
,
&
f
,
bucketSeconds
)
if
err
!=
nil
{
return
err
}
errTrend
=
result
return
nil
})
if
err
:=
g
.
Wait
();
err
!=
nil
{
response
.
ErrorFrom
(
c
,
err
)
return
}
resp
:=
&
opsDashboardSnapshotV2Response
{
GeneratedAt
:
time
.
Now
()
.
UTC
()
.
Format
(
time
.
RFC3339
),
Overview
:
overview
,
ThroughputTrend
:
trend
,
ErrorTrend
:
errTrend
,
}
cached
:=
opsDashboardSnapshotV2Cache
.
Set
(
cacheKey
,
resp
)
if
cached
.
ETag
!=
""
{
c
.
Header
(
"ETag"
,
cached
.
ETag
)
c
.
Header
(
"Vary"
,
"If-None-Match"
)
}
c
.
Header
(
"X-Snapshot-Cache"
,
"miss"
)
response
.
Success
(
c
,
resp
)
}
backend/internal/handler/admin/snapshot_cache.go
0 → 100644
View file @
9dcd3cd4
package
admin
import
(
"crypto/sha256"
"encoding/hex"
"encoding/json"
"strings"
"sync"
"time"
)
type
snapshotCacheEntry
struct
{
ETag
string
Payload
any
ExpiresAt
time
.
Time
}
type
snapshotCache
struct
{
mu
sync
.
RWMutex
ttl
time
.
Duration
items
map
[
string
]
snapshotCacheEntry
}
func
newSnapshotCache
(
ttl
time
.
Duration
)
*
snapshotCache
{
if
ttl
<=
0
{
ttl
=
30
*
time
.
Second
}
return
&
snapshotCache
{
ttl
:
ttl
,
items
:
make
(
map
[
string
]
snapshotCacheEntry
),
}
}
func
(
c
*
snapshotCache
)
Get
(
key
string
)
(
snapshotCacheEntry
,
bool
)
{
if
c
==
nil
||
key
==
""
{
return
snapshotCacheEntry
{},
false
}
now
:=
time
.
Now
()
c
.
mu
.
RLock
()
entry
,
ok
:=
c
.
items
[
key
]
c
.
mu
.
RUnlock
()
if
!
ok
{
return
snapshotCacheEntry
{},
false
}
if
now
.
After
(
entry
.
ExpiresAt
)
{
c
.
mu
.
Lock
()
delete
(
c
.
items
,
key
)
c
.
mu
.
Unlock
()
return
snapshotCacheEntry
{},
false
}
return
entry
,
true
}
func
(
c
*
snapshotCache
)
Set
(
key
string
,
payload
any
)
snapshotCacheEntry
{
if
c
==
nil
{
return
snapshotCacheEntry
{}
}
entry
:=
snapshotCacheEntry
{
ETag
:
buildETagFromAny
(
payload
),
Payload
:
payload
,
ExpiresAt
:
time
.
Now
()
.
Add
(
c
.
ttl
),
}
if
key
==
""
{
return
entry
}
c
.
mu
.
Lock
()
c
.
items
[
key
]
=
entry
c
.
mu
.
Unlock
()
return
entry
}
func
buildETagFromAny
(
payload
any
)
string
{
raw
,
err
:=
json
.
Marshal
(
payload
)
if
err
!=
nil
{
return
""
}
sum
:=
sha256
.
Sum256
(
raw
)
return
"
\"
"
+
hex
.
EncodeToString
(
sum
[
:
])
+
"
\"
"
}
func
parseBoolQueryWithDefault
(
raw
string
,
def
bool
)
bool
{
value
:=
strings
.
TrimSpace
(
strings
.
ToLower
(
raw
))
if
value
==
""
{
return
def
}
switch
value
{
case
"1"
,
"true"
,
"yes"
,
"on"
:
return
true
case
"0"
,
"false"
,
"no"
,
"off"
:
return
false
default
:
return
def
}
}
backend/internal/handler/admin/user_attribute_handler.go
View file @
9dcd3cd4
package
admin
package
admin
import
(
import
(
"encoding/json"
"strconv"
"strconv"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/Wei-Shaw/sub2api/internal/service"
...
@@ -67,6 +69,8 @@ type BatchUserAttributesResponse struct {
...
@@ -67,6 +69,8 @@ type BatchUserAttributesResponse struct {
Attributes
map
[
int64
]
map
[
int64
]
string
`json:"attributes"`
Attributes
map
[
int64
]
map
[
int64
]
string
`json:"attributes"`
}
}
var
userAttributesBatchCache
=
newSnapshotCache
(
30
*
time
.
Second
)
// AttributeDefinitionResponse represents attribute definition response
// AttributeDefinitionResponse represents attribute definition response
type
AttributeDefinitionResponse
struct
{
type
AttributeDefinitionResponse
struct
{
ID
int64
`json:"id"`
ID
int64
`json:"id"`
...
@@ -327,16 +331,32 @@ func (h *UserAttributeHandler) GetBatchUserAttributes(c *gin.Context) {
...
@@ -327,16 +331,32 @@ func (h *UserAttributeHandler) GetBatchUserAttributes(c *gin.Context) {
return
return
}
}
if
len
(
req
.
UserIDs
)
==
0
{
userIDs
:=
normalizeInt64IDList
(
req
.
UserIDs
)
if
len
(
userIDs
)
==
0
{
response
.
Success
(
c
,
BatchUserAttributesResponse
{
Attributes
:
map
[
int64
]
map
[
int64
]
string
{}})
response
.
Success
(
c
,
BatchUserAttributesResponse
{
Attributes
:
map
[
int64
]
map
[
int64
]
string
{}})
return
return
}
}
attrs
,
err
:=
h
.
attrService
.
GetBatchUserAttributes
(
c
.
Request
.
Context
(),
req
.
UserIDs
)
keyRaw
,
_
:=
json
.
Marshal
(
struct
{
UserIDs
[]
int64
`json:"user_ids"`
}{
UserIDs
:
userIDs
,
})
cacheKey
:=
string
(
keyRaw
)
if
cached
,
ok
:=
userAttributesBatchCache
.
Get
(
cacheKey
);
ok
{
c
.
Header
(
"X-Snapshot-Cache"
,
"hit"
)
response
.
Success
(
c
,
cached
.
Payload
)
return
}
attrs
,
err
:=
h
.
attrService
.
GetBatchUserAttributes
(
c
.
Request
.
Context
(),
userIDs
)
if
err
!=
nil
{
if
err
!=
nil
{
response
.
ErrorFrom
(
c
,
err
)
response
.
ErrorFrom
(
c
,
err
)
return
return
}
}
response
.
Success
(
c
,
BatchUserAttributesResponse
{
Attributes
:
attrs
})
payload
:=
BatchUserAttributesResponse
{
Attributes
:
attrs
}
userAttributesBatchCache
.
Set
(
cacheKey
,
payload
)
c
.
Header
(
"X-Snapshot-Cache"
,
"miss"
)
response
.
Success
(
c
,
payload
)
}
}
backend/internal/handler/admin/user_handler.go
View file @
9dcd3cd4
...
@@ -91,6 +91,10 @@ func (h *UserHandler) List(c *gin.Context) {
...
@@ -91,6 +91,10 @@ func (h *UserHandler) List(c *gin.Context) {
Search
:
search
,
Search
:
search
,
Attributes
:
parseAttributeFilters
(
c
),
Attributes
:
parseAttributeFilters
(
c
),
}
}
if
raw
,
ok
:=
c
.
GetQuery
(
"include_subscriptions"
);
ok
{
includeSubscriptions
:=
parseBoolQueryWithDefault
(
raw
,
true
)
filters
.
IncludeSubscriptions
=
&
includeSubscriptions
}
users
,
total
,
err
:=
h
.
adminService
.
ListUsers
(
c
.
Request
.
Context
(),
page
,
pageSize
,
filters
)
users
,
total
,
err
:=
h
.
adminService
.
ListUsers
(
c
.
Request
.
Context
(),
page
,
pageSize
,
filters
)
if
err
!=
nil
{
if
err
!=
nil
{
...
...
backend/internal/repository/user_repo.go
View file @
9dcd3cd4
...
@@ -243,21 +243,24 @@ func (r *userRepository) ListWithFilters(ctx context.Context, params pagination.
...
@@ -243,21 +243,24 @@ func (r *userRepository) ListWithFilters(ctx context.Context, params pagination.
userMap
[
u
.
ID
]
=
&
outUsers
[
len
(
outUsers
)
-
1
]
userMap
[
u
.
ID
]
=
&
outUsers
[
len
(
outUsers
)
-
1
]
}
}
// Batch load active subscriptions with groups to avoid N+1.
shouldLoadSubscriptions
:=
filters
.
IncludeSubscriptions
==
nil
||
*
filters
.
IncludeSubscriptions
subs
,
err
:=
r
.
client
.
UserSubscription
.
Query
()
.
if
shouldLoadSubscriptions
{
Where
(
// Batch load active subscriptions with groups to avoid N+1.
usersubscription
.
UserIDIn
(
userIDs
...
),
subs
,
err
:=
r
.
client
.
UserSubscription
.
Query
()
.
usersubscription
.
StatusEQ
(
service
.
SubscriptionStatusActive
),
Where
(
)
.
usersubscription
.
UserIDIn
(
userIDs
...
),
WithGroup
()
.
usersubscription
.
StatusEQ
(
service
.
SubscriptionStatusActive
),
All
(
ctx
)
)
.
if
err
!=
nil
{
WithGroup
()
.
return
nil
,
nil
,
err
All
(
ctx
)
}
if
err
!=
nil
{
return
nil
,
nil
,
err
}
for
i
:=
range
subs
{
for
i
:=
range
subs
{
if
u
,
ok
:=
userMap
[
subs
[
i
]
.
UserID
];
ok
{
if
u
,
ok
:=
userMap
[
subs
[
i
]
.
UserID
];
ok
{
u
.
Subscriptions
=
append
(
u
.
Subscriptions
,
*
userSubscriptionEntityToService
(
subs
[
i
]))
u
.
Subscriptions
=
append
(
u
.
Subscriptions
,
*
userSubscriptionEntityToService
(
subs
[
i
]))
}
}
}
}
}
...
...
backend/internal/server/routes/admin.go
View file @
9dcd3cd4
...
@@ -168,6 +168,7 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
...
@@ -168,6 +168,7 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
ops
.
GET
(
"/system-logs/health"
,
h
.
Admin
.
Ops
.
GetSystemLogIngestionHealth
)
ops
.
GET
(
"/system-logs/health"
,
h
.
Admin
.
Ops
.
GetSystemLogIngestionHealth
)
// Dashboard (vNext - raw path for MVP)
// Dashboard (vNext - raw path for MVP)
ops
.
GET
(
"/dashboard/snapshot-v2"
,
h
.
Admin
.
Ops
.
GetDashboardSnapshotV2
)
ops
.
GET
(
"/dashboard/overview"
,
h
.
Admin
.
Ops
.
GetDashboardOverview
)
ops
.
GET
(
"/dashboard/overview"
,
h
.
Admin
.
Ops
.
GetDashboardOverview
)
ops
.
GET
(
"/dashboard/throughput-trend"
,
h
.
Admin
.
Ops
.
GetDashboardThroughputTrend
)
ops
.
GET
(
"/dashboard/throughput-trend"
,
h
.
Admin
.
Ops
.
GetDashboardThroughputTrend
)
ops
.
GET
(
"/dashboard/latency-histogram"
,
h
.
Admin
.
Ops
.
GetDashboardLatencyHistogram
)
ops
.
GET
(
"/dashboard/latency-histogram"
,
h
.
Admin
.
Ops
.
GetDashboardLatencyHistogram
)
...
@@ -180,6 +181,7 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
...
@@ -180,6 +181,7 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
func
registerDashboardRoutes
(
admin
*
gin
.
RouterGroup
,
h
*
handler
.
Handlers
)
{
func
registerDashboardRoutes
(
admin
*
gin
.
RouterGroup
,
h
*
handler
.
Handlers
)
{
dashboard
:=
admin
.
Group
(
"/dashboard"
)
dashboard
:=
admin
.
Group
(
"/dashboard"
)
{
{
dashboard
.
GET
(
"/snapshot-v2"
,
h
.
Admin
.
Dashboard
.
GetSnapshotV2
)
dashboard
.
GET
(
"/stats"
,
h
.
Admin
.
Dashboard
.
GetStats
)
dashboard
.
GET
(
"/stats"
,
h
.
Admin
.
Dashboard
.
GetStats
)
dashboard
.
GET
(
"/realtime"
,
h
.
Admin
.
Dashboard
.
GetRealtimeMetrics
)
dashboard
.
GET
(
"/realtime"
,
h
.
Admin
.
Dashboard
.
GetRealtimeMetrics
)
dashboard
.
GET
(
"/trend"
,
h
.
Admin
.
Dashboard
.
GetUsageTrend
)
dashboard
.
GET
(
"/trend"
,
h
.
Admin
.
Dashboard
.
GetUsageTrend
)
...
...
backend/internal/service/ops_dashboard.go
View file @
9dcd3cd4
...
@@ -31,6 +31,10 @@ func (s *OpsService) GetDashboardOverview(ctx context.Context, filter *OpsDashbo
...
@@ -31,6 +31,10 @@ func (s *OpsService) GetDashboardOverview(ctx context.Context, filter *OpsDashbo
filter
.
QueryMode
=
s
.
resolveOpsQueryMode
(
ctx
,
filter
.
QueryMode
)
filter
.
QueryMode
=
s
.
resolveOpsQueryMode
(
ctx
,
filter
.
QueryMode
)
overview
,
err
:=
s
.
opsRepo
.
GetDashboardOverview
(
ctx
,
filter
)
overview
,
err
:=
s
.
opsRepo
.
GetDashboardOverview
(
ctx
,
filter
)
if
err
!=
nil
&&
shouldFallbackOpsPreagg
(
filter
,
err
)
{
rawFilter
:=
cloneOpsFilterWithMode
(
filter
,
OpsQueryModeRaw
)
overview
,
err
=
s
.
opsRepo
.
GetDashboardOverview
(
ctx
,
rawFilter
)
}
if
err
!=
nil
{
if
err
!=
nil
{
if
errors
.
Is
(
err
,
ErrOpsPreaggregatedNotPopulated
)
{
if
errors
.
Is
(
err
,
ErrOpsPreaggregatedNotPopulated
)
{
return
nil
,
infraerrors
.
Conflict
(
"OPS_PREAGG_NOT_READY"
,
"Pre-aggregated ops metrics are not populated yet"
)
return
nil
,
infraerrors
.
Conflict
(
"OPS_PREAGG_NOT_READY"
,
"Pre-aggregated ops metrics are not populated yet"
)
...
...
backend/internal/service/ops_errors.go
View file @
9dcd3cd4
...
@@ -22,7 +22,14 @@ func (s *OpsService) GetErrorTrend(ctx context.Context, filter *OpsDashboardFilt
...
@@ -22,7 +22,14 @@ func (s *OpsService) GetErrorTrend(ctx context.Context, filter *OpsDashboardFilt
if
filter
.
StartTime
.
After
(
filter
.
EndTime
)
{
if
filter
.
StartTime
.
After
(
filter
.
EndTime
)
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_TIME_RANGE_INVALID"
,
"start_time must be <= end_time"
)
return
nil
,
infraerrors
.
BadRequest
(
"OPS_TIME_RANGE_INVALID"
,
"start_time must be <= end_time"
)
}
}
return
s
.
opsRepo
.
GetErrorTrend
(
ctx
,
filter
,
bucketSeconds
)
filter
.
QueryMode
=
s
.
resolveOpsQueryMode
(
ctx
,
filter
.
QueryMode
)
result
,
err
:=
s
.
opsRepo
.
GetErrorTrend
(
ctx
,
filter
,
bucketSeconds
)
if
err
!=
nil
&&
shouldFallbackOpsPreagg
(
filter
,
err
)
{
rawFilter
:=
cloneOpsFilterWithMode
(
filter
,
OpsQueryModeRaw
)
return
s
.
opsRepo
.
GetErrorTrend
(
ctx
,
rawFilter
,
bucketSeconds
)
}
return
result
,
err
}
}
func
(
s
*
OpsService
)
GetErrorDistribution
(
ctx
context
.
Context
,
filter
*
OpsDashboardFilter
)
(
*
OpsErrorDistributionResponse
,
error
)
{
func
(
s
*
OpsService
)
GetErrorDistribution
(
ctx
context
.
Context
,
filter
*
OpsDashboardFilter
)
(
*
OpsErrorDistributionResponse
,
error
)
{
...
@@ -41,5 +48,12 @@ func (s *OpsService) GetErrorDistribution(ctx context.Context, filter *OpsDashbo
...
@@ -41,5 +48,12 @@ func (s *OpsService) GetErrorDistribution(ctx context.Context, filter *OpsDashbo
if
filter
.
StartTime
.
After
(
filter
.
EndTime
)
{
if
filter
.
StartTime
.
After
(
filter
.
EndTime
)
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_TIME_RANGE_INVALID"
,
"start_time must be <= end_time"
)
return
nil
,
infraerrors
.
BadRequest
(
"OPS_TIME_RANGE_INVALID"
,
"start_time must be <= end_time"
)
}
}
return
s
.
opsRepo
.
GetErrorDistribution
(
ctx
,
filter
)
filter
.
QueryMode
=
s
.
resolveOpsQueryMode
(
ctx
,
filter
.
QueryMode
)
result
,
err
:=
s
.
opsRepo
.
GetErrorDistribution
(
ctx
,
filter
)
if
err
!=
nil
&&
shouldFallbackOpsPreagg
(
filter
,
err
)
{
rawFilter
:=
cloneOpsFilterWithMode
(
filter
,
OpsQueryModeRaw
)
return
s
.
opsRepo
.
GetErrorDistribution
(
ctx
,
rawFilter
)
}
return
result
,
err
}
}
backend/internal/service/ops_histograms.go
View file @
9dcd3cd4
...
@@ -22,5 +22,12 @@ func (s *OpsService) GetLatencyHistogram(ctx context.Context, filter *OpsDashboa
...
@@ -22,5 +22,12 @@ func (s *OpsService) GetLatencyHistogram(ctx context.Context, filter *OpsDashboa
if
filter
.
StartTime
.
After
(
filter
.
EndTime
)
{
if
filter
.
StartTime
.
After
(
filter
.
EndTime
)
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_TIME_RANGE_INVALID"
,
"start_time must be <= end_time"
)
return
nil
,
infraerrors
.
BadRequest
(
"OPS_TIME_RANGE_INVALID"
,
"start_time must be <= end_time"
)
}
}
return
s
.
opsRepo
.
GetLatencyHistogram
(
ctx
,
filter
)
filter
.
QueryMode
=
s
.
resolveOpsQueryMode
(
ctx
,
filter
.
QueryMode
)
result
,
err
:=
s
.
opsRepo
.
GetLatencyHistogram
(
ctx
,
filter
)
if
err
!=
nil
&&
shouldFallbackOpsPreagg
(
filter
,
err
)
{
rawFilter
:=
cloneOpsFilterWithMode
(
filter
,
OpsQueryModeRaw
)
return
s
.
opsRepo
.
GetLatencyHistogram
(
ctx
,
rawFilter
)
}
return
result
,
err
}
}
backend/internal/service/ops_query_mode.go
View file @
9dcd3cd4
...
@@ -38,3 +38,18 @@ func (m OpsQueryMode) IsValid() bool {
...
@@ -38,3 +38,18 @@ func (m OpsQueryMode) IsValid() bool {
return
false
return
false
}
}
}
}
func
shouldFallbackOpsPreagg
(
filter
*
OpsDashboardFilter
,
err
error
)
bool
{
return
filter
!=
nil
&&
filter
.
QueryMode
==
OpsQueryModeAuto
&&
errors
.
Is
(
err
,
ErrOpsPreaggregatedNotPopulated
)
}
func
cloneOpsFilterWithMode
(
filter
*
OpsDashboardFilter
,
mode
OpsQueryMode
)
*
OpsDashboardFilter
{
if
filter
==
nil
{
return
nil
}
cloned
:=
*
filter
cloned
.
QueryMode
=
mode
return
&
cloned
}
backend/internal/service/ops_trends.go
View file @
9dcd3cd4
...
@@ -22,5 +22,13 @@ func (s *OpsService) GetThroughputTrend(ctx context.Context, filter *OpsDashboar
...
@@ -22,5 +22,13 @@ func (s *OpsService) GetThroughputTrend(ctx context.Context, filter *OpsDashboar
if
filter
.
StartTime
.
After
(
filter
.
EndTime
)
{
if
filter
.
StartTime
.
After
(
filter
.
EndTime
)
{
return
nil
,
infraerrors
.
BadRequest
(
"OPS_TIME_RANGE_INVALID"
,
"start_time must be <= end_time"
)
return
nil
,
infraerrors
.
BadRequest
(
"OPS_TIME_RANGE_INVALID"
,
"start_time must be <= end_time"
)
}
}
return
s
.
opsRepo
.
GetThroughputTrend
(
ctx
,
filter
,
bucketSeconds
)
filter
.
QueryMode
=
s
.
resolveOpsQueryMode
(
ctx
,
filter
.
QueryMode
)
result
,
err
:=
s
.
opsRepo
.
GetThroughputTrend
(
ctx
,
filter
,
bucketSeconds
)
if
err
!=
nil
&&
shouldFallbackOpsPreagg
(
filter
,
err
)
{
rawFilter
:=
cloneOpsFilterWithMode
(
filter
,
OpsQueryModeRaw
)
return
s
.
opsRepo
.
GetThroughputTrend
(
ctx
,
rawFilter
,
bucketSeconds
)
}
return
result
,
err
}
}
backend/internal/service/user_service.go
View file @
9dcd3cd4
...
@@ -22,6 +22,10 @@ type UserListFilters struct {
...
@@ -22,6 +22,10 @@ type UserListFilters struct {
Role
string
// User role filter
Role
string
// User role filter
Search
string
// Search in email, username
Search
string
// Search in email, username
Attributes
map
[
int64
]
string
// Custom attribute filters: attributeID -> value
Attributes
map
[
int64
]
string
// Custom attribute filters: attributeID -> value
// IncludeSubscriptions controls whether ListWithFilters should load active subscriptions.
// For large datasets this can be expensive; admin list pages should enable it on demand.
// nil means not specified (default: load subscriptions for backward compatibility).
IncludeSubscriptions
*
bool
}
}
type
UserRepository
interface
{
type
UserRepository
interface
{
...
...
backend/migrations/065_add_search_trgm_indexes.sql
0 → 100644
View file @
9dcd3cd4
-- Improve admin fuzzy-search performance on large datasets.
-- Best effort:
-- 1) try enabling pg_trgm
-- 2) only create trigram indexes when extension is available
DO
$$
BEGIN
BEGIN
CREATE
EXTENSION
IF
NOT
EXISTS
pg_trgm
;
EXCEPTION
WHEN
OTHERS
THEN
RAISE
NOTICE
'pg_trgm extension not created: %'
,
SQLERRM
;
END
;
IF
EXISTS
(
SELECT
1
FROM
pg_extension
WHERE
extname
=
'pg_trgm'
)
THEN
EXECUTE
'CREATE INDEX IF NOT EXISTS idx_users_email_trgm
ON users USING gin (email gin_trgm_ops)'
;
EXECUTE
'CREATE INDEX IF NOT EXISTS idx_users_username_trgm
ON users USING gin (username gin_trgm_ops)'
;
EXECUTE
'CREATE INDEX IF NOT EXISTS idx_users_notes_trgm
ON users USING gin (notes gin_trgm_ops)'
;
EXECUTE
'CREATE INDEX IF NOT EXISTS idx_accounts_name_trgm
ON accounts USING gin (name gin_trgm_ops)'
;
EXECUTE
'CREATE INDEX IF NOT EXISTS idx_api_keys_key_trgm
ON api_keys USING gin ("key" gin_trgm_ops)'
;
EXECUTE
'CREATE INDEX IF NOT EXISTS idx_api_keys_name_trgm
ON api_keys USING gin (name gin_trgm_ops)'
;
ELSE
RAISE
NOTICE
'skip trigram indexes because pg_trgm is unavailable'
;
END
IF
;
END
$$
;
frontend/src/api/admin/accounts.ts
View file @
9dcd3cd4
...
@@ -36,6 +36,7 @@ export async function list(
...
@@ -36,6 +36,7 @@ export async function list(
status
?:
string
status
?:
string
group
?:
string
group
?:
string
search
?:
string
search
?:
string
lite
?:
string
},
},
options
?:
{
options
?:
{
signal
?:
AbortSignal
signal
?:
AbortSignal
...
@@ -66,6 +67,7 @@ export async function listWithEtag(
...
@@ -66,6 +67,7 @@ export async function listWithEtag(
type
?:
string
type
?:
string
status
?:
string
status
?:
string
search
?:
string
search
?:
string
lite
?:
string
},
},
options
?:
{
options
?:
{
signal
?:
AbortSignal
signal
?:
AbortSignal
...
...
Prev
1
2
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment