"backend/cmd/vscode:/vscode.git/clone" did not exist on "bd18f4b8ef2d1bbb713c362a5efbe20d4bc4fbc8"
Commit 839ab37d authored by yangjianbo's avatar yangjianbo
Browse files
parents 9dd0ef18 fd8473f2
......@@ -126,6 +126,4 @@ backend/cmd/server/server
deploy/docker-compose.override.yml
.gocache/
vite.config.js
!docs/
docs/*
!docs/dependency-security.md
.cache/
.DS_Store
......@@ -18,6 +18,12 @@ linters:
list-mode: original
files:
- "**/internal/service/**"
- "!**/internal/service/ops_aggregation_service.go"
- "!**/internal/service/ops_alert_evaluator_service.go"
- "!**/internal/service/ops_cleanup_service.go"
- "!**/internal/service/ops_metrics_collector.go"
- "!**/internal/service/ops_scheduled_report_service.go"
- "!**/internal/service/wire.go"
deny:
- pkg: github.com/Wei-Shaw/sub2api/internal/repository
desc: "service must not import repository"
......
......@@ -62,6 +62,11 @@ func provideServiceBuildInfo(buildInfo handler.BuildInfo) service.BuildInfo {
func provideCleanup(
entClient *ent.Client,
rdb *redis.Client,
opsMetricsCollector *service.OpsMetricsCollector,
opsAggregation *service.OpsAggregationService,
opsAlertEvaluator *service.OpsAlertEvaluatorService,
opsCleanup *service.OpsCleanupService,
opsScheduledReport *service.OpsScheduledReportService,
tokenRefresh *service.TokenRefreshService,
accountExpiry *service.AccountExpiryService,
pricing *service.PricingService,
......@@ -81,6 +86,36 @@ func provideCleanup(
name string
fn func() error
}{
{"OpsScheduledReportService", func() error {
if opsScheduledReport != nil {
opsScheduledReport.Stop()
}
return nil
}},
{"OpsCleanupService", func() error {
if opsCleanup != nil {
opsCleanup.Stop()
}
return nil
}},
{"OpsAlertEvaluatorService", func() error {
if opsAlertEvaluator != nil {
opsAlertEvaluator.Stop()
}
return nil
}},
{"OpsAggregationService", func() error {
if opsAggregation != nil {
opsAggregation.Stop()
}
return nil
}},
{"OpsMetricsCollector", func() error {
if opsMetricsCollector != nil {
opsMetricsCollector.Stop()
}
return nil
}},
{"TokenRefreshService", func() error {
tokenRefresh.Stop()
return nil
......
......@@ -120,19 +120,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
proxyHandler := admin.NewProxyHandler(adminService)
adminRedeemHandler := admin.NewRedeemHandler(adminService)
promoHandler := admin.NewPromoHandler(promoService)
settingHandler := admin.NewSettingHandler(settingService, emailService, turnstileService)
updateCache := repository.NewUpdateCache(redisClient)
gitHubReleaseClient := repository.ProvideGitHubReleaseClient(configConfig)
serviceBuildInfo := provideServiceBuildInfo(buildInfo)
updateService := service.ProvideUpdateService(updateCache, gitHubReleaseClient, serviceBuildInfo)
systemHandler := handler.ProvideSystemHandler(updateService)
adminSubscriptionHandler := admin.NewSubscriptionHandler(subscriptionService)
adminUsageHandler := admin.NewUsageHandler(usageService, apiKeyService, adminService)
userAttributeDefinitionRepository := repository.NewUserAttributeDefinitionRepository(client)
userAttributeValueRepository := repository.NewUserAttributeValueRepository(client)
userAttributeService := service.NewUserAttributeService(userAttributeDefinitionRepository, userAttributeValueRepository)
userAttributeHandler := admin.NewUserAttributeHandler(userAttributeService)
adminHandlers := handler.ProvideAdminHandlers(dashboardHandler, adminUserHandler, groupHandler, accountHandler, oAuthHandler, openAIOAuthHandler, geminiOAuthHandler, antigravityOAuthHandler, proxyHandler, adminRedeemHandler, promoHandler, settingHandler, systemHandler, adminSubscriptionHandler, adminUsageHandler, userAttributeHandler)
opsRepository := repository.NewOpsRepository(db)
pricingRemoteClient := repository.ProvidePricingRemoteClient(configConfig)
pricingService, err := service.ProvidePricingService(configConfig, pricingRemoteClient)
if err != nil {
......@@ -143,20 +131,40 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
identityService := service.NewIdentityService(identityCache)
deferredService := service.ProvideDeferredService(accountRepository, timingWheelService)
gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, userRepository, userSubscriptionRepository, gatewayCache, configConfig, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService)
openAIGatewayService := service.NewOpenAIGatewayService(accountRepository, usageLogRepository, userRepository, userSubscriptionRepository, gatewayCache, configConfig, concurrencyService, billingService, rateLimitService, billingCacheService, httpUpstream, deferredService)
geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig)
opsService := service.NewOpsService(opsRepository, settingRepository, configConfig, accountRepository, concurrencyService, gatewayService, openAIGatewayService, geminiMessagesCompatService, antigravityGatewayService)
settingHandler := admin.NewSettingHandler(settingService, emailService, turnstileService, opsService)
opsHandler := admin.NewOpsHandler(opsService)
updateCache := repository.NewUpdateCache(redisClient)
gitHubReleaseClient := repository.ProvideGitHubReleaseClient(configConfig)
serviceBuildInfo := provideServiceBuildInfo(buildInfo)
updateService := service.ProvideUpdateService(updateCache, gitHubReleaseClient, serviceBuildInfo)
systemHandler := handler.ProvideSystemHandler(updateService)
adminSubscriptionHandler := admin.NewSubscriptionHandler(subscriptionService)
adminUsageHandler := admin.NewUsageHandler(usageService, apiKeyService, adminService)
userAttributeDefinitionRepository := repository.NewUserAttributeDefinitionRepository(client)
userAttributeValueRepository := repository.NewUserAttributeValueRepository(client)
userAttributeService := service.NewUserAttributeService(userAttributeDefinitionRepository, userAttributeValueRepository)
userAttributeHandler := admin.NewUserAttributeHandler(userAttributeService)
adminHandlers := handler.ProvideAdminHandlers(dashboardHandler, adminUserHandler, groupHandler, accountHandler, oAuthHandler, openAIOAuthHandler, geminiOAuthHandler, antigravityOAuthHandler, proxyHandler, adminRedeemHandler, promoHandler, settingHandler, opsHandler, systemHandler, adminSubscriptionHandler, adminUsageHandler, userAttributeHandler)
gatewayHandler := handler.NewGatewayHandler(gatewayService, geminiMessagesCompatService, antigravityGatewayService, userService, concurrencyService, billingCacheService, configConfig)
openAIGatewayService := service.NewOpenAIGatewayService(accountRepository, usageLogRepository, userRepository, userSubscriptionRepository, gatewayCache, configConfig, concurrencyService, billingService, rateLimitService, billingCacheService, httpUpstream, deferredService)
openAIGatewayHandler := handler.NewOpenAIGatewayHandler(openAIGatewayService, concurrencyService, billingCacheService, configConfig)
handlerSettingHandler := handler.ProvideSettingHandler(settingService, buildInfo)
handlers := handler.ProvideHandlers(authHandler, userHandler, apiKeyHandler, usageHandler, redeemHandler, subscriptionHandler, adminHandlers, gatewayHandler, openAIGatewayHandler, handlerSettingHandler)
jwtAuthMiddleware := middleware.NewJWTAuthMiddleware(authService, userService)
adminAuthMiddleware := middleware.NewAdminAuthMiddleware(authService, userService, settingService)
apiKeyAuthMiddleware := middleware.NewAPIKeyAuthMiddleware(apiKeyService, subscriptionService, configConfig)
engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, settingService, redisClient)
engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, redisClient)
httpServer := server.ProvideHTTPServer(configConfig, engine)
opsMetricsCollector := service.ProvideOpsMetricsCollector(opsRepository, settingRepository, accountRepository, concurrencyService, db, redisClient, configConfig)
opsAggregationService := service.ProvideOpsAggregationService(opsRepository, settingRepository, db, redisClient, configConfig)
opsAlertEvaluatorService := service.ProvideOpsAlertEvaluatorService(opsService, opsRepository, emailService, redisClient, configConfig)
opsCleanupService := service.ProvideOpsCleanupService(opsRepository, db, redisClient, configConfig)
opsScheduledReportService := service.ProvideOpsScheduledReportService(opsService, userService, emailService, redisClient, configConfig)
tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, configConfig)
accountExpiryService := service.ProvideAccountExpiryService(accountRepository)
v := provideCleanup(client, redisClient, tokenRefreshService, accountExpiryService, pricingService, emailQueueService, billingCacheService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService)
v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, tokenRefreshService, accountExpiryService, pricingService, emailQueueService, billingCacheService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService)
application := &Application{
Server: httpServer,
Cleanup: v,
......@@ -181,6 +189,11 @@ func provideServiceBuildInfo(buildInfo handler.BuildInfo) service.BuildInfo {
func provideCleanup(
entClient *ent.Client,
rdb *redis.Client,
opsMetricsCollector *service.OpsMetricsCollector,
opsAggregation *service.OpsAggregationService,
opsAlertEvaluator *service.OpsAlertEvaluatorService,
opsCleanup *service.OpsCleanupService,
opsScheduledReport *service.OpsScheduledReportService,
tokenRefresh *service.TokenRefreshService,
accountExpiry *service.AccountExpiryService,
pricing *service.PricingService,
......@@ -199,6 +212,36 @@ func provideCleanup(
name string
fn func() error
}{
{"OpsScheduledReportService", func() error {
if opsScheduledReport != nil {
opsScheduledReport.Stop()
}
return nil
}},
{"OpsCleanupService", func() error {
if opsCleanup != nil {
opsCleanup.Stop()
}
return nil
}},
{"OpsAlertEvaluatorService", func() error {
if opsAlertEvaluator != nil {
opsAlertEvaluator.Stop()
}
return nil
}},
{"OpsAggregationService", func() error {
if opsAggregation != nil {
opsAggregation.Stop()
}
return nil
}},
{"OpsMetricsCollector", func() error {
if opsMetricsCollector != nil {
opsMetricsCollector.Stop()
}
return nil
}},
{"TokenRefreshService", func() error {
tokenRefresh.Stop()
return nil
......
......@@ -8,9 +8,11 @@ require (
github.com/golang-jwt/jwt/v5 v5.2.2
github.com/google/uuid v1.6.0
github.com/google/wire v0.7.0
github.com/gorilla/websocket v1.5.3
github.com/imroc/req/v3 v3.57.0
github.com/lib/pq v1.10.9
github.com/redis/go-redis/v9 v9.17.2
github.com/shirou/gopsutil/v4 v4.25.6
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go/modules/postgres v0.40.0
......@@ -106,9 +108,9 @@ require (
github.com/quic-go/quic-go v0.57.1 // indirect
github.com/refraction-networking/utls v1.8.1 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/shirou/gopsutil/v4 v4.25.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
......
......@@ -117,6 +117,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/wire v0.7.0 h1:JxUKI6+CVBgCO2WToKy/nQk0sS+amI9z9EjVmdaocj4=
github.com/google/wire v0.7.0/go.mod h1:n6YbUQD9cPKTnHXEBN2DXlOp/mVADhVErcMFb0v3J18=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
......@@ -224,6 +226,8 @@ github.com/refraction-networking/utls v1.8.1 h1:yNY1kapmQU8JeM1sSw2H2asfTIwWxIkr
github.com/refraction-networking/utls v1.8.1/go.mod h1:jkSOEkLqn+S/jtpEHPOsVv/4V4EVnelwbMQl4vCWXAM=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
......
......@@ -43,6 +43,7 @@ type Config struct {
Turnstile TurnstileConfig `mapstructure:"turnstile"`
Database DatabaseConfig `mapstructure:"database"`
Redis RedisConfig `mapstructure:"redis"`
Ops OpsConfig `mapstructure:"ops"`
JWT JWTConfig `mapstructure:"jwt"`
LinuxDo LinuxDoConnectConfig `mapstructure:"linuxdo_connect"`
Default DefaultConfig `mapstructure:"default"`
......@@ -60,14 +61,6 @@ type Config struct {
Update UpdateConfig `mapstructure:"update"`
}
// UpdateConfig 在线更新相关配置
type UpdateConfig struct {
// ProxyURL 用于访问 GitHub 的代理地址
// 支持 http/https/socks5/socks5h 协议
// 例如: "http://127.0.0.1:7890", "socks5://127.0.0.1:1080"
ProxyURL string `mapstructure:"proxy_url"`
}
type GeminiConfig struct {
OAuth GeminiOAuthConfig `mapstructure:"oauth"`
Quota GeminiQuotaConfig `mapstructure:"quota"`
......@@ -90,6 +83,33 @@ type GeminiTierQuotaConfig struct {
CooldownMinutes *int `mapstructure:"cooldown_minutes" json:"cooldown_minutes"`
}
type UpdateConfig struct {
// ProxyURL 用于访问 GitHub 的代理地址
// 支持 http/https/socks5/socks5h 协议
// 例如: "http://127.0.0.1:7890", "socks5://127.0.0.1:1080"
ProxyURL string `mapstructure:"proxy_url"`
}
type LinuxDoConnectConfig struct {
Enabled bool `mapstructure:"enabled"`
ClientID string `mapstructure:"client_id"`
ClientSecret string `mapstructure:"client_secret"`
AuthorizeURL string `mapstructure:"authorize_url"`
TokenURL string `mapstructure:"token_url"`
UserInfoURL string `mapstructure:"userinfo_url"`
Scopes string `mapstructure:"scopes"`
RedirectURL string `mapstructure:"redirect_url"` // 后端回调地址(需在提供方后台登记)
FrontendRedirectURL string `mapstructure:"frontend_redirect_url"` // 前端接收 token 的路由(默认:/auth/linuxdo/callback)
TokenAuthMethod string `mapstructure:"token_auth_method"` // client_secret_post / client_secret_basic / none
UsePKCE bool `mapstructure:"use_pkce"`
// 可选:用于从 userinfo JSON 中提取字段的 gjson 路径。
// 为空时,服务端会尝试一组常见字段名。
UserInfoEmailPath string `mapstructure:"userinfo_email_path"`
UserInfoIDPath string `mapstructure:"userinfo_id_path"`
UserInfoUsernamePath string `mapstructure:"userinfo_username_path"`
}
// TokenRefreshConfig OAuth token自动刷新配置
type TokenRefreshConfig struct {
// 是否启用自动刷新
......@@ -332,6 +352,47 @@ func (r *RedisConfig) Address() string {
return fmt.Sprintf("%s:%d", r.Host, r.Port)
}
type OpsConfig struct {
// Enabled controls whether ops features should run.
//
// NOTE: vNext still has a DB-backed feature flag (ops_monitoring_enabled) for runtime on/off.
// This config flag is the "hard switch" for deployments that want to disable ops completely.
Enabled bool `mapstructure:"enabled"`
// UsePreaggregatedTables prefers ops_metrics_hourly/daily for long-window dashboard queries.
UsePreaggregatedTables bool `mapstructure:"use_preaggregated_tables"`
// Cleanup controls periodic deletion of old ops data to prevent unbounded growth.
Cleanup OpsCleanupConfig `mapstructure:"cleanup"`
// MetricsCollectorCache controls Redis caching for expensive per-window collector queries.
MetricsCollectorCache OpsMetricsCollectorCacheConfig `mapstructure:"metrics_collector_cache"`
// Pre-aggregation configuration.
Aggregation OpsAggregationConfig `mapstructure:"aggregation"`
}
type OpsCleanupConfig struct {
Enabled bool `mapstructure:"enabled"`
Schedule string `mapstructure:"schedule"`
// Retention days (0 disables that cleanup target).
//
// vNext requirement: default 30 days across ops datasets.
ErrorLogRetentionDays int `mapstructure:"error_log_retention_days"`
MinuteMetricsRetentionDays int `mapstructure:"minute_metrics_retention_days"`
HourlyMetricsRetentionDays int `mapstructure:"hourly_metrics_retention_days"`
}
type OpsAggregationConfig struct {
Enabled bool `mapstructure:"enabled"`
}
type OpsMetricsCollectorCacheConfig struct {
Enabled bool `mapstructure:"enabled"`
TTL time.Duration `mapstructure:"ttl"`
}
type JWTConfig struct {
Secret string `mapstructure:"secret"`
ExpireHour int `mapstructure:"expire_hour"`
......@@ -341,30 +402,6 @@ type TurnstileConfig struct {
Required bool `mapstructure:"required"`
}
// LinuxDoConnectConfig 用于 LinuxDo Connect OAuth 登录(终端用户 SSO)。
//
// 注意:这与上游账号的 OAuth(例如 OpenAI/Gemini 账号接入)不是一回事。
// 这里是用于登录 Sub2API 本身的用户体系。
type LinuxDoConnectConfig struct {
Enabled bool `mapstructure:"enabled"`
ClientID string `mapstructure:"client_id"`
ClientSecret string `mapstructure:"client_secret"`
AuthorizeURL string `mapstructure:"authorize_url"`
TokenURL string `mapstructure:"token_url"`
UserInfoURL string `mapstructure:"userinfo_url"`
Scopes string `mapstructure:"scopes"`
RedirectURL string `mapstructure:"redirect_url"` // 后端回调地址(需在提供方后台登记)
FrontendRedirectURL string `mapstructure:"frontend_redirect_url"` // 前端接收 token 的路由(默认:/auth/linuxdo/callback)
TokenAuthMethod string `mapstructure:"token_auth_method"` // client_secret_post / client_secret_basic / none
UsePKCE bool `mapstructure:"use_pkce"`
// 可选:用于从 userinfo JSON 中提取字段的 gjson 路径。
// 为空时,服务端会尝试一组常见字段名。
UserInfoEmailPath string `mapstructure:"userinfo_email_path"`
UserInfoIDPath string `mapstructure:"userinfo_id_path"`
UserInfoUsernamePath string `mapstructure:"userinfo_username_path"`
}
type DefaultConfig struct {
AdminEmail string `mapstructure:"admin_email"`
AdminPassword string `mapstructure:"admin_password"`
......@@ -531,81 +568,6 @@ func Load() (*Config, error) {
return &cfg, nil
}
// ValidateAbsoluteHTTPURL 校验一个绝对 http(s) URL(禁止 fragment)。
func ValidateAbsoluteHTTPURL(raw string) error {
raw = strings.TrimSpace(raw)
if raw == "" {
return fmt.Errorf("empty url")
}
u, err := url.Parse(raw)
if err != nil {
return err
}
if !u.IsAbs() {
return fmt.Errorf("must be absolute")
}
if !isHTTPScheme(u.Scheme) {
return fmt.Errorf("unsupported scheme: %s", u.Scheme)
}
if strings.TrimSpace(u.Host) == "" {
return fmt.Errorf("missing host")
}
if u.Fragment != "" {
return fmt.Errorf("must not include fragment")
}
return nil
}
// ValidateFrontendRedirectURL 校验前端回调地址:
// - 允许同源相对路径(以 / 开头)
// - 或绝对 http(s) URL(禁止 fragment)
func ValidateFrontendRedirectURL(raw string) error {
raw = strings.TrimSpace(raw)
if raw == "" {
return fmt.Errorf("empty url")
}
if strings.ContainsAny(raw, "\r\n") {
return fmt.Errorf("contains invalid characters")
}
if strings.HasPrefix(raw, "/") {
if strings.HasPrefix(raw, "//") {
return fmt.Errorf("must not start with //")
}
return nil
}
u, err := url.Parse(raw)
if err != nil {
return err
}
if !u.IsAbs() {
return fmt.Errorf("must be absolute http(s) url or relative path")
}
if !isHTTPScheme(u.Scheme) {
return fmt.Errorf("unsupported scheme: %s", u.Scheme)
}
if strings.TrimSpace(u.Host) == "" {
return fmt.Errorf("missing host")
}
if u.Fragment != "" {
return fmt.Errorf("must not include fragment")
}
return nil
}
func isHTTPScheme(scheme string) bool {
return strings.EqualFold(scheme, "http") || strings.EqualFold(scheme, "https")
}
func warnIfInsecureURL(field, raw string) {
u, err := url.Parse(strings.TrimSpace(raw))
if err != nil {
return
}
if strings.EqualFold(u.Scheme, "http") {
log.Printf("Warning: %s uses http scheme; use https in production to avoid token leakage.", field)
}
}
func setDefaults() {
viper.SetDefault("run_mode", RunModeStandard)
......@@ -655,7 +617,7 @@ func setDefaults() {
// Turnstile
viper.SetDefault("turnstile.required", false)
// LinuxDo Connect OAuth 登录(终端用户 SSO)
// LinuxDo Connect OAuth 登录
viper.SetDefault("linuxdo_connect.enabled", false)
viper.SetDefault("linuxdo_connect.client_id", "")
viper.SetDefault("linuxdo_connect.client_secret", "")
......@@ -694,6 +656,20 @@ func setDefaults() {
viper.SetDefault("redis.pool_size", 128)
viper.SetDefault("redis.min_idle_conns", 10)
// Ops (vNext)
viper.SetDefault("ops.enabled", true)
viper.SetDefault("ops.use_preaggregated_tables", false)
viper.SetDefault("ops.cleanup.enabled", true)
viper.SetDefault("ops.cleanup.schedule", "0 2 * * *")
// Retention days: vNext defaults to 30 days across ops datasets.
viper.SetDefault("ops.cleanup.error_log_retention_days", 30)
viper.SetDefault("ops.cleanup.minute_metrics_retention_days", 30)
viper.SetDefault("ops.cleanup.hourly_metrics_retention_days", 30)
viper.SetDefault("ops.aggregation.enabled", true)
viper.SetDefault("ops.metrics_collector_cache.enabled", true)
// TTL should be slightly larger than collection interval (1m) to maximize cross-replica cache hits.
viper.SetDefault("ops.metrics_collector_cache.ttl", 65*time.Second)
// JWT
viper.SetDefault("jwt.secret", "")
viper.SetDefault("jwt.expire_hour", 24)
......@@ -750,7 +726,7 @@ func setDefaults() {
// Gateway
viper.SetDefault("gateway.response_header_timeout", 600) // 600秒(10分钟)等待上游响应头,LLM高负载时可能排队较久
viper.SetDefault("gateway.log_upstream_error_body", false)
viper.SetDefault("gateway.log_upstream_error_body", true)
viper.SetDefault("gateway.log_upstream_error_body_max_bytes", 2048)
viper.SetDefault("gateway.inject_beta_for_apikey", false)
viper.SetDefault("gateway.failover_on_400", false)
......@@ -766,7 +742,7 @@ func setDefaults() {
viper.SetDefault("gateway.concurrency_slot_ttl_minutes", 30) // 并发槽位过期时间(支持超长请求)
viper.SetDefault("gateway.stream_data_interval_timeout", 180)
viper.SetDefault("gateway.stream_keepalive_interval", 10)
viper.SetDefault("gateway.max_line_size", 40*1024*1024)
viper.SetDefault("gateway.max_line_size", 10*1024*1024)
viper.SetDefault("gateway.scheduling.sticky_session_max_waiting", 3)
viper.SetDefault("gateway.scheduling.sticky_session_wait_timeout", 45*time.Second)
viper.SetDefault("gateway.scheduling.fallback_wait_timeout", 30*time.Second)
......@@ -789,10 +765,6 @@ func setDefaults() {
viper.SetDefault("gemini.oauth.client_secret", "")
viper.SetDefault("gemini.oauth.scopes", "")
viper.SetDefault("gemini.quota.policy", "")
// Update - 在线更新配置
// 代理地址为空表示直连 GitHub(适用于海外服务器)
viper.SetDefault("update.proxy_url", "")
}
func (c *Config) Validate() error {
......@@ -833,7 +805,8 @@ func (c *Config) Validate() error {
if method == "none" && !c.LinuxDo.UsePKCE {
return fmt.Errorf("linuxdo_connect.use_pkce must be true when linuxdo_connect.token_auth_method=none")
}
if (method == "" || method == "client_secret_post" || method == "client_secret_basic") && strings.TrimSpace(c.LinuxDo.ClientSecret) == "" {
if (method == "" || method == "client_secret_post" || method == "client_secret_basic") &&
strings.TrimSpace(c.LinuxDo.ClientSecret) == "" {
return fmt.Errorf("linuxdo_connect.client_secret is required when linuxdo_connect.enabled=true and token_auth_method is client_secret_post/client_secret_basic")
}
if strings.TrimSpace(c.LinuxDo.FrontendRedirectURL) == "" {
......@@ -1048,6 +1021,21 @@ func (c *Config) Validate() error {
if c.Gateway.Scheduling.SlotCleanupInterval < 0 {
return fmt.Errorf("gateway.scheduling.slot_cleanup_interval must be non-negative")
}
if c.Ops.MetricsCollectorCache.TTL < 0 {
return fmt.Errorf("ops.metrics_collector_cache.ttl must be non-negative")
}
if c.Ops.Cleanup.ErrorLogRetentionDays < 0 {
return fmt.Errorf("ops.cleanup.error_log_retention_days must be non-negative")
}
if c.Ops.Cleanup.MinuteMetricsRetentionDays < 0 {
return fmt.Errorf("ops.cleanup.minute_metrics_retention_days must be non-negative")
}
if c.Ops.Cleanup.HourlyMetricsRetentionDays < 0 {
return fmt.Errorf("ops.cleanup.hourly_metrics_retention_days must be non-negative")
}
if c.Ops.Cleanup.Enabled && strings.TrimSpace(c.Ops.Cleanup.Schedule) == "" {
return fmt.Errorf("ops.cleanup.schedule is required when ops.cleanup.enabled=true")
}
if c.Concurrency.PingInterval < 5 || c.Concurrency.PingInterval > 30 {
return fmt.Errorf("concurrency.ping_interval must be between 5-30 seconds")
}
......@@ -1124,3 +1112,77 @@ func GetServerAddress() string {
port := v.GetInt("server.port")
return fmt.Sprintf("%s:%d", host, port)
}
// ValidateAbsoluteHTTPURL 验证是否为有效的绝对 HTTP(S) URL
func ValidateAbsoluteHTTPURL(raw string) error {
raw = strings.TrimSpace(raw)
if raw == "" {
return fmt.Errorf("empty url")
}
u, err := url.Parse(raw)
if err != nil {
return err
}
if !u.IsAbs() {
return fmt.Errorf("must be absolute")
}
if !isHTTPScheme(u.Scheme) {
return fmt.Errorf("unsupported scheme: %s", u.Scheme)
}
if strings.TrimSpace(u.Host) == "" {
return fmt.Errorf("missing host")
}
if u.Fragment != "" {
return fmt.Errorf("must not include fragment")
}
return nil
}
// ValidateFrontendRedirectURL 验证前端重定向 URL(可以是绝对 URL 或相对路径)
func ValidateFrontendRedirectURL(raw string) error {
raw = strings.TrimSpace(raw)
if raw == "" {
return fmt.Errorf("empty url")
}
if strings.ContainsAny(raw, "\r\n") {
return fmt.Errorf("contains invalid characters")
}
if strings.HasPrefix(raw, "/") {
if strings.HasPrefix(raw, "//") {
return fmt.Errorf("must not start with //")
}
return nil
}
u, err := url.Parse(raw)
if err != nil {
return err
}
if !u.IsAbs() {
return fmt.Errorf("must be absolute http(s) url or relative path")
}
if !isHTTPScheme(u.Scheme) {
return fmt.Errorf("unsupported scheme: %s", u.Scheme)
}
if strings.TrimSpace(u.Host) == "" {
return fmt.Errorf("missing host")
}
if u.Fragment != "" {
return fmt.Errorf("must not include fragment")
}
return nil
}
// isHTTPScheme 检查是否为 HTTP 或 HTTPS 协议
func isHTTPScheme(scheme string) bool {
return strings.EqualFold(scheme, "http") || strings.EqualFold(scheme, "https")
}
func warnIfInsecureURL(field, raw string) {
u, err := url.Parse(strings.TrimSpace(raw))
if err != nil {
return
}
if strings.EqualFold(u.Scheme, "http") {
log.Printf("Warning: %s uses http scheme; use https in production to avoid token leakage.", field)
}
}
package admin
import (
"encoding/json"
"fmt"
"math"
"net/http"
"strconv"
"strings"
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
)
var validOpsAlertMetricTypes = []string{
"success_rate",
"error_rate",
"upstream_error_rate",
"p95_latency_ms",
"p99_latency_ms",
"cpu_usage_percent",
"memory_usage_percent",
"concurrency_queue_depth",
}
var validOpsAlertMetricTypeSet = func() map[string]struct{} {
set := make(map[string]struct{}, len(validOpsAlertMetricTypes))
for _, v := range validOpsAlertMetricTypes {
set[v] = struct{}{}
}
return set
}()
var validOpsAlertOperators = []string{">", "<", ">=", "<=", "==", "!="}
var validOpsAlertOperatorSet = func() map[string]struct{} {
set := make(map[string]struct{}, len(validOpsAlertOperators))
for _, v := range validOpsAlertOperators {
set[v] = struct{}{}
}
return set
}()
var validOpsAlertSeverities = []string{"P0", "P1", "P2", "P3"}
var validOpsAlertSeveritySet = func() map[string]struct{} {
set := make(map[string]struct{}, len(validOpsAlertSeverities))
for _, v := range validOpsAlertSeverities {
set[v] = struct{}{}
}
return set
}()
type opsAlertRuleValidatedInput struct {
Name string
MetricType string
Operator string
Threshold float64
Severity string
WindowMinutes int
SustainedMinutes int
CooldownMinutes int
Enabled bool
NotifyEmail bool
WindowProvided bool
SustainedProvided bool
CooldownProvided bool
SeverityProvided bool
EnabledProvided bool
NotifyProvided bool
}
func isPercentOrRateMetric(metricType string) bool {
switch metricType {
case "success_rate",
"error_rate",
"upstream_error_rate",
"cpu_usage_percent",
"memory_usage_percent":
return true
default:
return false
}
}
func validateOpsAlertRulePayload(raw map[string]json.RawMessage) (*opsAlertRuleValidatedInput, error) {
if raw == nil {
return nil, fmt.Errorf("invalid request body")
}
requiredFields := []string{"name", "metric_type", "operator", "threshold"}
for _, field := range requiredFields {
if _, ok := raw[field]; !ok {
return nil, fmt.Errorf("%s is required", field)
}
}
var name string
if err := json.Unmarshal(raw["name"], &name); err != nil || strings.TrimSpace(name) == "" {
return nil, fmt.Errorf("name is required")
}
name = strings.TrimSpace(name)
var metricType string
if err := json.Unmarshal(raw["metric_type"], &metricType); err != nil || strings.TrimSpace(metricType) == "" {
return nil, fmt.Errorf("metric_type is required")
}
metricType = strings.TrimSpace(metricType)
if _, ok := validOpsAlertMetricTypeSet[metricType]; !ok {
return nil, fmt.Errorf("metric_type must be one of: %s", strings.Join(validOpsAlertMetricTypes, ", "))
}
var operator string
if err := json.Unmarshal(raw["operator"], &operator); err != nil || strings.TrimSpace(operator) == "" {
return nil, fmt.Errorf("operator is required")
}
operator = strings.TrimSpace(operator)
if _, ok := validOpsAlertOperatorSet[operator]; !ok {
return nil, fmt.Errorf("operator must be one of: %s", strings.Join(validOpsAlertOperators, ", "))
}
var threshold float64
if err := json.Unmarshal(raw["threshold"], &threshold); err != nil {
return nil, fmt.Errorf("threshold must be a number")
}
if math.IsNaN(threshold) || math.IsInf(threshold, 0) {
return nil, fmt.Errorf("threshold must be a finite number")
}
if isPercentOrRateMetric(metricType) {
if threshold < 0 || threshold > 100 {
return nil, fmt.Errorf("threshold must be between 0 and 100 for metric_type %s", metricType)
}
} else if threshold < 0 {
return nil, fmt.Errorf("threshold must be >= 0")
}
validated := &opsAlertRuleValidatedInput{
Name: name,
MetricType: metricType,
Operator: operator,
Threshold: threshold,
}
if v, ok := raw["severity"]; ok {
validated.SeverityProvided = true
var sev string
if err := json.Unmarshal(v, &sev); err != nil {
return nil, fmt.Errorf("severity must be a string")
}
sev = strings.ToUpper(strings.TrimSpace(sev))
if sev != "" {
if _, ok := validOpsAlertSeveritySet[sev]; !ok {
return nil, fmt.Errorf("severity must be one of: %s", strings.Join(validOpsAlertSeverities, ", "))
}
validated.Severity = sev
}
}
if validated.Severity == "" {
validated.Severity = "P2"
}
if v, ok := raw["enabled"]; ok {
validated.EnabledProvided = true
if err := json.Unmarshal(v, &validated.Enabled); err != nil {
return nil, fmt.Errorf("enabled must be a boolean")
}
} else {
validated.Enabled = true
}
if v, ok := raw["notify_email"]; ok {
validated.NotifyProvided = true
if err := json.Unmarshal(v, &validated.NotifyEmail); err != nil {
return nil, fmt.Errorf("notify_email must be a boolean")
}
} else {
validated.NotifyEmail = true
}
if v, ok := raw["window_minutes"]; ok {
validated.WindowProvided = true
if err := json.Unmarshal(v, &validated.WindowMinutes); err != nil {
return nil, fmt.Errorf("window_minutes must be an integer")
}
switch validated.WindowMinutes {
case 1, 5, 60:
default:
return nil, fmt.Errorf("window_minutes must be one of: 1, 5, 60")
}
} else {
validated.WindowMinutes = 1
}
if v, ok := raw["sustained_minutes"]; ok {
validated.SustainedProvided = true
if err := json.Unmarshal(v, &validated.SustainedMinutes); err != nil {
return nil, fmt.Errorf("sustained_minutes must be an integer")
}
if validated.SustainedMinutes < 1 || validated.SustainedMinutes > 1440 {
return nil, fmt.Errorf("sustained_minutes must be between 1 and 1440")
}
} else {
validated.SustainedMinutes = 1
}
if v, ok := raw["cooldown_minutes"]; ok {
validated.CooldownProvided = true
if err := json.Unmarshal(v, &validated.CooldownMinutes); err != nil {
return nil, fmt.Errorf("cooldown_minutes must be an integer")
}
if validated.CooldownMinutes < 0 || validated.CooldownMinutes > 1440 {
return nil, fmt.Errorf("cooldown_minutes must be between 0 and 1440")
}
} else {
validated.CooldownMinutes = 0
}
return validated, nil
}
// ListAlertRules returns all ops alert rules.
// GET /api/v1/admin/ops/alert-rules
func (h *OpsHandler) ListAlertRules(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
rules, err := h.opsService.ListAlertRules(c.Request.Context())
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, rules)
}
// CreateAlertRule creates an ops alert rule.
// POST /api/v1/admin/ops/alert-rules
func (h *OpsHandler) CreateAlertRule(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
var raw map[string]json.RawMessage
if err := c.ShouldBindBodyWith(&raw, binding.JSON); err != nil {
response.BadRequest(c, "Invalid request body")
return
}
validated, err := validateOpsAlertRulePayload(raw)
if err != nil {
response.BadRequest(c, err.Error())
return
}
var rule service.OpsAlertRule
if err := c.ShouldBindBodyWith(&rule, binding.JSON); err != nil {
response.BadRequest(c, "Invalid request body")
return
}
rule.Name = validated.Name
rule.MetricType = validated.MetricType
rule.Operator = validated.Operator
rule.Threshold = validated.Threshold
rule.WindowMinutes = validated.WindowMinutes
rule.SustainedMinutes = validated.SustainedMinutes
rule.CooldownMinutes = validated.CooldownMinutes
rule.Severity = validated.Severity
rule.Enabled = validated.Enabled
rule.NotifyEmail = validated.NotifyEmail
created, err := h.opsService.CreateAlertRule(c.Request.Context(), &rule)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, created)
}
// UpdateAlertRule updates an existing ops alert rule.
// PUT /api/v1/admin/ops/alert-rules/:id
func (h *OpsHandler) UpdateAlertRule(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid rule ID")
return
}
var raw map[string]json.RawMessage
if err := c.ShouldBindBodyWith(&raw, binding.JSON); err != nil {
response.BadRequest(c, "Invalid request body")
return
}
validated, err := validateOpsAlertRulePayload(raw)
if err != nil {
response.BadRequest(c, err.Error())
return
}
var rule service.OpsAlertRule
if err := c.ShouldBindBodyWith(&rule, binding.JSON); err != nil {
response.BadRequest(c, "Invalid request body")
return
}
rule.ID = id
rule.Name = validated.Name
rule.MetricType = validated.MetricType
rule.Operator = validated.Operator
rule.Threshold = validated.Threshold
rule.WindowMinutes = validated.WindowMinutes
rule.SustainedMinutes = validated.SustainedMinutes
rule.CooldownMinutes = validated.CooldownMinutes
rule.Severity = validated.Severity
rule.Enabled = validated.Enabled
rule.NotifyEmail = validated.NotifyEmail
updated, err := h.opsService.UpdateAlertRule(c.Request.Context(), &rule)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, updated)
}
// DeleteAlertRule deletes an ops alert rule.
// DELETE /api/v1/admin/ops/alert-rules/:id
func (h *OpsHandler) DeleteAlertRule(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid rule ID")
return
}
if err := h.opsService.DeleteAlertRule(c.Request.Context(), id); err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, gin.H{"deleted": true})
}
// ListAlertEvents lists recent ops alert events.
// GET /api/v1/admin/ops/alert-events
func (h *OpsHandler) ListAlertEvents(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
limit := 100
if raw := strings.TrimSpace(c.Query("limit")); raw != "" {
n, err := strconv.Atoi(raw)
if err != nil || n <= 0 {
response.BadRequest(c, "Invalid limit")
return
}
limit = n
}
filter := &service.OpsAlertEventFilter{
Limit: limit,
Status: strings.TrimSpace(c.Query("status")),
Severity: strings.TrimSpace(c.Query("severity")),
}
// Optional global filter support (platform/group/time range).
if platform := strings.TrimSpace(c.Query("platform")); platform != "" {
filter.Platform = platform
}
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid group_id")
return
}
filter.GroupID = &id
}
if startTime, endTime, err := parseOpsTimeRange(c, "24h"); err == nil {
// Only apply when explicitly provided to avoid surprising default narrowing.
if strings.TrimSpace(c.Query("start_time")) != "" || strings.TrimSpace(c.Query("end_time")) != "" || strings.TrimSpace(c.Query("time_range")) != "" {
filter.StartTime = &startTime
filter.EndTime = &endTime
}
} else {
response.BadRequest(c, err.Error())
return
}
events, err := h.opsService.ListAlertEvents(c.Request.Context(), filter)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, events)
}
package admin
import (
"net/http"
"strconv"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)
// GetDashboardOverview returns vNext ops dashboard overview (raw path).
// GET /api/v1/admin/ops/dashboard/overview
func (h *OpsHandler) GetDashboardOverview(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
startTime, endTime, err := parseOpsTimeRange(c, "1h")
if err != nil {
response.BadRequest(c, err.Error())
return
}
filter := &service.OpsDashboardFilter{
StartTime: startTime,
EndTime: endTime,
Platform: strings.TrimSpace(c.Query("platform")),
QueryMode: parseOpsQueryMode(c),
}
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid group_id")
return
}
filter.GroupID = &id
}
data, err := h.opsService.GetDashboardOverview(c.Request.Context(), filter)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, data)
}
// GetDashboardThroughputTrend returns throughput time series (raw path).
// GET /api/v1/admin/ops/dashboard/throughput-trend
func (h *OpsHandler) GetDashboardThroughputTrend(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
startTime, endTime, err := parseOpsTimeRange(c, "1h")
if err != nil {
response.BadRequest(c, err.Error())
return
}
filter := &service.OpsDashboardFilter{
StartTime: startTime,
EndTime: endTime,
Platform: strings.TrimSpace(c.Query("platform")),
QueryMode: parseOpsQueryMode(c),
}
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid group_id")
return
}
filter.GroupID = &id
}
bucketSeconds := pickThroughputBucketSeconds(endTime.Sub(startTime))
data, err := h.opsService.GetThroughputTrend(c.Request.Context(), filter, bucketSeconds)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, data)
}
// GetDashboardLatencyHistogram returns the latency distribution histogram (success requests).
// GET /api/v1/admin/ops/dashboard/latency-histogram
func (h *OpsHandler) GetDashboardLatencyHistogram(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
startTime, endTime, err := parseOpsTimeRange(c, "1h")
if err != nil {
response.BadRequest(c, err.Error())
return
}
filter := &service.OpsDashboardFilter{
StartTime: startTime,
EndTime: endTime,
Platform: strings.TrimSpace(c.Query("platform")),
QueryMode: parseOpsQueryMode(c),
}
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid group_id")
return
}
filter.GroupID = &id
}
data, err := h.opsService.GetLatencyHistogram(c.Request.Context(), filter)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, data)
}
// GetDashboardErrorTrend returns error counts time series (raw path).
// GET /api/v1/admin/ops/dashboard/error-trend
func (h *OpsHandler) GetDashboardErrorTrend(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
startTime, endTime, err := parseOpsTimeRange(c, "1h")
if err != nil {
response.BadRequest(c, err.Error())
return
}
filter := &service.OpsDashboardFilter{
StartTime: startTime,
EndTime: endTime,
Platform: strings.TrimSpace(c.Query("platform")),
QueryMode: parseOpsQueryMode(c),
}
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid group_id")
return
}
filter.GroupID = &id
}
bucketSeconds := pickThroughputBucketSeconds(endTime.Sub(startTime))
data, err := h.opsService.GetErrorTrend(c.Request.Context(), filter, bucketSeconds)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, data)
}
// GetDashboardErrorDistribution returns error distribution by status code (raw path).
// GET /api/v1/admin/ops/dashboard/error-distribution
func (h *OpsHandler) GetDashboardErrorDistribution(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
startTime, endTime, err := parseOpsTimeRange(c, "1h")
if err != nil {
response.BadRequest(c, err.Error())
return
}
filter := &service.OpsDashboardFilter{
StartTime: startTime,
EndTime: endTime,
Platform: strings.TrimSpace(c.Query("platform")),
QueryMode: parseOpsQueryMode(c),
}
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid group_id")
return
}
filter.GroupID = &id
}
data, err := h.opsService.GetErrorDistribution(c.Request.Context(), filter)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, data)
}
func pickThroughputBucketSeconds(window time.Duration) int {
// Keep buckets predictable and avoid huge responses.
switch {
case window <= 2*time.Hour:
return 60
case window <= 24*time.Hour:
return 300
default:
return 3600
}
}
func parseOpsQueryMode(c *gin.Context) service.OpsQueryMode {
if c == nil {
return ""
}
raw := strings.TrimSpace(c.Query("mode"))
if raw == "" {
// Empty means "use server default" (DB setting ops_query_mode_default).
return ""
}
return service.ParseOpsQueryMode(raw)
}
package admin
import (
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
"github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)
type OpsHandler struct {
opsService *service.OpsService
}
func NewOpsHandler(opsService *service.OpsService) *OpsHandler {
return &OpsHandler{opsService: opsService}
}
// GetErrorLogs lists ops error logs.
// GET /api/v1/admin/ops/errors
func (h *OpsHandler) GetErrorLogs(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
page, pageSize := response.ParsePagination(c)
// Ops list can be larger than standard admin tables.
if pageSize > 500 {
pageSize = 500
}
startTime, endTime, err := parseOpsTimeRange(c, "1h")
if err != nil {
response.BadRequest(c, err.Error())
return
}
filter := &service.OpsErrorLogFilter{
Page: page,
PageSize: pageSize,
}
if !startTime.IsZero() {
filter.StartTime = &startTime
}
if !endTime.IsZero() {
filter.EndTime = &endTime
}
if platform := strings.TrimSpace(c.Query("platform")); platform != "" {
filter.Platform = platform
}
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid group_id")
return
}
filter.GroupID = &id
}
if v := strings.TrimSpace(c.Query("account_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid account_id")
return
}
filter.AccountID = &id
}
if phase := strings.TrimSpace(c.Query("phase")); phase != "" {
filter.Phase = phase
}
if q := strings.TrimSpace(c.Query("q")); q != "" {
filter.Query = q
}
if statusCodesStr := strings.TrimSpace(c.Query("status_codes")); statusCodesStr != "" {
parts := strings.Split(statusCodesStr, ",")
out := make([]int, 0, len(parts))
for _, part := range parts {
p := strings.TrimSpace(part)
if p == "" {
continue
}
n, err := strconv.Atoi(p)
if err != nil || n < 0 {
response.BadRequest(c, "Invalid status_codes")
return
}
out = append(out, n)
}
filter.StatusCodes = out
}
result, err := h.opsService.GetErrorLogs(c.Request.Context(), filter)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Paginated(c, result.Errors, int64(result.Total), result.Page, result.PageSize)
}
// GetErrorLogByID returns a single error log detail.
// GET /api/v1/admin/ops/errors/:id
func (h *OpsHandler) GetErrorLogByID(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
idStr := strings.TrimSpace(c.Param("id"))
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid error id")
return
}
detail, err := h.opsService.GetErrorLogByID(c.Request.Context(), id)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, detail)
}
// ListRequestDetails returns a request-level list (success + error) for drill-down.
// GET /api/v1/admin/ops/requests
func (h *OpsHandler) ListRequestDetails(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
page, pageSize := response.ParsePagination(c)
if pageSize > 100 {
pageSize = 100
}
startTime, endTime, err := parseOpsTimeRange(c, "1h")
if err != nil {
response.BadRequest(c, err.Error())
return
}
filter := &service.OpsRequestDetailFilter{
Page: page,
PageSize: pageSize,
StartTime: &startTime,
EndTime: &endTime,
}
filter.Kind = strings.TrimSpace(c.Query("kind"))
filter.Platform = strings.TrimSpace(c.Query("platform"))
filter.Model = strings.TrimSpace(c.Query("model"))
filter.RequestID = strings.TrimSpace(c.Query("request_id"))
filter.Query = strings.TrimSpace(c.Query("q"))
filter.Sort = strings.TrimSpace(c.Query("sort"))
if v := strings.TrimSpace(c.Query("user_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid user_id")
return
}
filter.UserID = &id
}
if v := strings.TrimSpace(c.Query("api_key_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid api_key_id")
return
}
filter.APIKeyID = &id
}
if v := strings.TrimSpace(c.Query("account_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid account_id")
return
}
filter.AccountID = &id
}
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid group_id")
return
}
filter.GroupID = &id
}
if v := strings.TrimSpace(c.Query("min_duration_ms")); v != "" {
parsed, err := strconv.Atoi(v)
if err != nil || parsed < 0 {
response.BadRequest(c, "Invalid min_duration_ms")
return
}
filter.MinDurationMs = &parsed
}
if v := strings.TrimSpace(c.Query("max_duration_ms")); v != "" {
parsed, err := strconv.Atoi(v)
if err != nil || parsed < 0 {
response.BadRequest(c, "Invalid max_duration_ms")
return
}
filter.MaxDurationMs = &parsed
}
out, err := h.opsService.ListRequestDetails(c.Request.Context(), filter)
if err != nil {
// Invalid sort/kind/platform etc should be a bad request; keep it simple.
if strings.Contains(strings.ToLower(err.Error()), "invalid") {
response.BadRequest(c, err.Error())
return
}
response.Error(c, http.StatusInternalServerError, "Failed to list request details")
return
}
response.Paginated(c, out.Items, out.Total, out.Page, out.PageSize)
}
type opsRetryRequest struct {
Mode string `json:"mode"`
PinnedAccountID *int64 `json:"pinned_account_id"`
}
// RetryErrorRequest retries a failed request using stored request_body.
// POST /api/v1/admin/ops/errors/:id/retry
func (h *OpsHandler) RetryErrorRequest(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
subject, ok := middleware.GetAuthSubjectFromContext(c)
if !ok || subject.UserID <= 0 {
response.Error(c, http.StatusUnauthorized, "Unauthorized")
return
}
idStr := strings.TrimSpace(c.Param("id"))
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid error id")
return
}
req := opsRetryRequest{Mode: service.OpsRetryModeClient}
if err := c.ShouldBindJSON(&req); err != nil && !errors.Is(err, io.EOF) {
response.BadRequest(c, "Invalid request: "+err.Error())
return
}
if strings.TrimSpace(req.Mode) == "" {
req.Mode = service.OpsRetryModeClient
}
result, err := h.opsService.RetryError(c.Request.Context(), subject.UserID, id, req.Mode, req.PinnedAccountID)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, result)
}
func parseOpsTimeRange(c *gin.Context, defaultRange string) (time.Time, time.Time, error) {
startStr := strings.TrimSpace(c.Query("start_time"))
endStr := strings.TrimSpace(c.Query("end_time"))
parseTS := func(s string) (time.Time, error) {
if s == "" {
return time.Time{}, nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return t, nil
}
return time.Parse(time.RFC3339, s)
}
start, err := parseTS(startStr)
if err != nil {
return time.Time{}, time.Time{}, err
}
end, err := parseTS(endStr)
if err != nil {
return time.Time{}, time.Time{}, err
}
// start/end explicitly provided (even partially)
if startStr != "" || endStr != "" {
if end.IsZero() {
end = time.Now()
}
if start.IsZero() {
dur, _ := parseOpsDuration(defaultRange)
start = end.Add(-dur)
}
if start.After(end) {
return time.Time{}, time.Time{}, fmt.Errorf("invalid time range: start_time must be <= end_time")
}
if end.Sub(start) > 30*24*time.Hour {
return time.Time{}, time.Time{}, fmt.Errorf("invalid time range: max window is 30 days")
}
return start, end, nil
}
// time_range fallback
tr := strings.TrimSpace(c.Query("time_range"))
if tr == "" {
tr = defaultRange
}
dur, ok := parseOpsDuration(tr)
if !ok {
dur, _ = parseOpsDuration(defaultRange)
}
end = time.Now()
start = end.Add(-dur)
if end.Sub(start) > 30*24*time.Hour {
return time.Time{}, time.Time{}, fmt.Errorf("invalid time range: max window is 30 days")
}
return start, end, nil
}
func parseOpsDuration(v string) (time.Duration, bool) {
switch strings.TrimSpace(v) {
case "5m":
return 5 * time.Minute, true
case "30m":
return 30 * time.Minute, true
case "1h":
return time.Hour, true
case "6h":
return 6 * time.Hour, true
case "24h":
return 24 * time.Hour, true
default:
return 0, false
}
}
package admin
import (
"net/http"
"strconv"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)
// GetConcurrencyStats returns real-time concurrency usage aggregated by platform/group/account.
// GET /api/v1/admin/ops/concurrency
func (h *OpsHandler) GetConcurrencyStats(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
if !h.opsService.IsRealtimeMonitoringEnabled(c.Request.Context()) {
response.Success(c, gin.H{
"enabled": false,
"platform": map[string]*service.PlatformConcurrencyInfo{},
"group": map[int64]*service.GroupConcurrencyInfo{},
"account": map[int64]*service.AccountConcurrencyInfo{},
"timestamp": time.Now().UTC(),
})
return
}
platformFilter := strings.TrimSpace(c.Query("platform"))
var groupID *int64
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid group_id")
return
}
groupID = &id
}
platform, group, account, collectedAt, err := h.opsService.GetConcurrencyStats(c.Request.Context(), platformFilter, groupID)
if err != nil {
response.ErrorFrom(c, err)
return
}
payload := gin.H{
"enabled": true,
"platform": platform,
"group": group,
"account": account,
}
if collectedAt != nil {
payload["timestamp"] = collectedAt.UTC()
}
response.Success(c, payload)
}
// GetAccountAvailability returns account availability statistics.
// GET /api/v1/admin/ops/account-availability
//
// Query params:
// - platform: optional
// - group_id: optional
func (h *OpsHandler) GetAccountAvailability(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
if !h.opsService.IsRealtimeMonitoringEnabled(c.Request.Context()) {
response.Success(c, gin.H{
"enabled": false,
"platform": map[string]*service.PlatformAvailability{},
"group": map[int64]*service.GroupAvailability{},
"account": map[int64]*service.AccountAvailability{},
"timestamp": time.Now().UTC(),
})
return
}
platform := strings.TrimSpace(c.Query("platform"))
var groupID *int64
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
id, err := strconv.ParseInt(v, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid group_id")
return
}
groupID = &id
}
platformStats, groupStats, accountStats, collectedAt, err := h.opsService.GetAccountAvailabilityStats(c.Request.Context(), platform, groupID)
if err != nil {
response.ErrorFrom(c, err)
return
}
payload := gin.H{
"enabled": true,
"platform": platformStats,
"group": groupStats,
"account": accountStats,
}
if collectedAt != nil {
payload["timestamp"] = collectedAt.UTC()
}
response.Success(c, payload)
}
package admin
import (
"net/http"
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)
// GetEmailNotificationConfig returns Ops email notification config (DB-backed).
// GET /api/v1/admin/ops/email-notification/config
func (h *OpsHandler) GetEmailNotificationConfig(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
cfg, err := h.opsService.GetEmailNotificationConfig(c.Request.Context())
if err != nil {
response.Error(c, http.StatusInternalServerError, "Failed to get email notification config")
return
}
response.Success(c, cfg)
}
// UpdateEmailNotificationConfig updates Ops email notification config (DB-backed).
// PUT /api/v1/admin/ops/email-notification/config
func (h *OpsHandler) UpdateEmailNotificationConfig(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
var req service.OpsEmailNotificationConfigUpdateRequest
if err := c.ShouldBindJSON(&req); err != nil {
response.BadRequest(c, "Invalid request body")
return
}
updated, err := h.opsService.UpdateEmailNotificationConfig(c.Request.Context(), &req)
if err != nil {
// Most failures here are validation errors from request payload; treat as 400.
response.Error(c, http.StatusBadRequest, err.Error())
return
}
response.Success(c, updated)
}
// GetAlertRuntimeSettings returns Ops alert evaluator runtime settings (DB-backed).
// GET /api/v1/admin/ops/runtime/alert
func (h *OpsHandler) GetAlertRuntimeSettings(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
cfg, err := h.opsService.GetOpsAlertRuntimeSettings(c.Request.Context())
if err != nil {
response.Error(c, http.StatusInternalServerError, "Failed to get alert runtime settings")
return
}
response.Success(c, cfg)
}
// UpdateAlertRuntimeSettings updates Ops alert evaluator runtime settings (DB-backed).
// PUT /api/v1/admin/ops/runtime/alert
func (h *OpsHandler) UpdateAlertRuntimeSettings(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
var req service.OpsAlertRuntimeSettings
if err := c.ShouldBindJSON(&req); err != nil {
response.BadRequest(c, "Invalid request body")
return
}
updated, err := h.opsService.UpdateOpsAlertRuntimeSettings(c.Request.Context(), &req)
if err != nil {
response.Error(c, http.StatusBadRequest, err.Error())
return
}
response.Success(c, updated)
}
// GetAdvancedSettings returns Ops advanced settings (DB-backed).
// GET /api/v1/admin/ops/advanced-settings
func (h *OpsHandler) GetAdvancedSettings(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
cfg, err := h.opsService.GetOpsAdvancedSettings(c.Request.Context())
if err != nil {
response.Error(c, http.StatusInternalServerError, "Failed to get advanced settings")
return
}
response.Success(c, cfg)
}
// UpdateAdvancedSettings updates Ops advanced settings (DB-backed).
// PUT /api/v1/admin/ops/advanced-settings
func (h *OpsHandler) UpdateAdvancedSettings(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
var req service.OpsAdvancedSettings
if err := c.ShouldBindJSON(&req); err != nil {
response.BadRequest(c, "Invalid request body")
return
}
updated, err := h.opsService.UpdateOpsAdvancedSettings(c.Request.Context(), &req)
if err != nil {
response.Error(c, http.StatusBadRequest, err.Error())
return
}
response.Success(c, updated)
}
package admin
import (
"context"
"encoding/json"
"log"
"math"
"net"
"net/http"
"net/netip"
"net/url"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
type OpsWSProxyConfig struct {
TrustProxy bool
TrustedProxies []netip.Prefix
OriginPolicy string
}
const (
envOpsWSTrustProxy = "OPS_WS_TRUST_PROXY"
envOpsWSTrustedProxies = "OPS_WS_TRUSTED_PROXIES"
envOpsWSOriginPolicy = "OPS_WS_ORIGIN_POLICY"
envOpsWSMaxConns = "OPS_WS_MAX_CONNS"
envOpsWSMaxConnsPerIP = "OPS_WS_MAX_CONNS_PER_IP"
)
const (
OriginPolicyStrict = "strict"
OriginPolicyPermissive = "permissive"
)
var opsWSProxyConfig = loadOpsWSProxyConfigFromEnv()
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return isAllowedOpsWSOrigin(r)
},
// Subprotocol negotiation:
// - The frontend passes ["sub2api-admin", "jwt.<token>"].
// - We always select "sub2api-admin" so the token is never echoed back in the handshake response.
Subprotocols: []string{"sub2api-admin"},
}
const (
qpsWSPushInterval = 2 * time.Second
qpsWSRefreshInterval = 5 * time.Second
qpsWSRequestCountWindow = 1 * time.Minute
defaultMaxWSConns = 100
defaultMaxWSConnsPerIP = 20
)
var wsConnCount atomic.Int32
var wsConnCountByIP sync.Map // map[string]*atomic.Int32
const qpsWSIdleStopDelay = 30 * time.Second
const (
opsWSCloseRealtimeDisabled = 4001
)
var qpsWSIdleStopMu sync.Mutex
var qpsWSIdleStopTimer *time.Timer
func cancelQPSWSIdleStop() {
qpsWSIdleStopMu.Lock()
if qpsWSIdleStopTimer != nil {
qpsWSIdleStopTimer.Stop()
qpsWSIdleStopTimer = nil
}
qpsWSIdleStopMu.Unlock()
}
func scheduleQPSWSIdleStop() {
qpsWSIdleStopMu.Lock()
if qpsWSIdleStopTimer != nil {
qpsWSIdleStopMu.Unlock()
return
}
qpsWSIdleStopTimer = time.AfterFunc(qpsWSIdleStopDelay, func() {
// Only stop if truly idle at fire time.
if wsConnCount.Load() == 0 {
qpsWSCache.Stop()
}
qpsWSIdleStopMu.Lock()
qpsWSIdleStopTimer = nil
qpsWSIdleStopMu.Unlock()
})
qpsWSIdleStopMu.Unlock()
}
type opsWSRuntimeLimits struct {
MaxConns int32
MaxConnsPerIP int32
}
var opsWSLimits = loadOpsWSRuntimeLimitsFromEnv()
const (
qpsWSWriteTimeout = 10 * time.Second
qpsWSPongWait = 60 * time.Second
qpsWSPingInterval = 30 * time.Second
// We don't expect clients to send application messages; we only read to process control frames (Pong/Close).
qpsWSMaxReadBytes = 1024
)
type opsWSQPSCache struct {
refreshInterval time.Duration
requestCountWindow time.Duration
lastUpdatedUnixNano atomic.Int64
payload atomic.Value // []byte
opsService *service.OpsService
cancel context.CancelFunc
done chan struct{}
mu sync.Mutex
running bool
}
var qpsWSCache = &opsWSQPSCache{
refreshInterval: qpsWSRefreshInterval,
requestCountWindow: qpsWSRequestCountWindow,
}
func (c *opsWSQPSCache) start(opsService *service.OpsService) {
if c == nil || opsService == nil {
return
}
for {
c.mu.Lock()
if c.running {
c.mu.Unlock()
return
}
// If a previous refresh loop is currently stopping, wait for it to fully exit.
done := c.done
if done != nil {
c.mu.Unlock()
<-done
c.mu.Lock()
if c.done == done && !c.running {
c.done = nil
}
c.mu.Unlock()
continue
}
c.opsService = opsService
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
c.done = make(chan struct{})
done = c.done
c.running = true
c.mu.Unlock()
go func() {
defer close(done)
c.refreshLoop(ctx)
}()
return
}
}
// Stop stops the background refresh loop.
// It is safe to call multiple times.
func (c *opsWSQPSCache) Stop() {
if c == nil {
return
}
c.mu.Lock()
if !c.running {
done := c.done
c.mu.Unlock()
if done != nil {
<-done
}
return
}
cancel := c.cancel
c.cancel = nil
c.running = false
c.opsService = nil
done := c.done
c.mu.Unlock()
if cancel != nil {
cancel()
}
if done != nil {
<-done
}
c.mu.Lock()
if c.done == done && !c.running {
c.done = nil
}
c.mu.Unlock()
}
func (c *opsWSQPSCache) refreshLoop(ctx context.Context) {
ticker := time.NewTicker(c.refreshInterval)
defer ticker.Stop()
c.refresh(ctx)
for {
select {
case <-ticker.C:
c.refresh(ctx)
case <-ctx.Done():
return
}
}
}
func (c *opsWSQPSCache) refresh(parentCtx context.Context) {
if c == nil {
return
}
c.mu.Lock()
opsService := c.opsService
c.mu.Unlock()
if opsService == nil {
return
}
if parentCtx == nil {
parentCtx = context.Background()
}
ctx, cancel := context.WithTimeout(parentCtx, 10*time.Second)
defer cancel()
now := time.Now().UTC()
stats, err := opsService.GetWindowStats(ctx, now.Add(-c.requestCountWindow), now)
if err != nil || stats == nil {
if err != nil {
log.Printf("[OpsWS] refresh: get window stats failed: %v", err)
}
return
}
requestCount := stats.SuccessCount + stats.ErrorCountTotal
qps := 0.0
tps := 0.0
if c.requestCountWindow > 0 {
seconds := c.requestCountWindow.Seconds()
qps = roundTo1DP(float64(requestCount) / seconds)
tps = roundTo1DP(float64(stats.TokenConsumed) / seconds)
}
payload := gin.H{
"type": "qps_update",
"timestamp": now.Format(time.RFC3339),
"data": gin.H{
"qps": qps,
"tps": tps,
"request_count": requestCount,
},
}
msg, err := json.Marshal(payload)
if err != nil {
log.Printf("[OpsWS] refresh: marshal payload failed: %v", err)
return
}
c.payload.Store(msg)
c.lastUpdatedUnixNano.Store(now.UnixNano())
}
func roundTo1DP(v float64) float64 {
return math.Round(v*10) / 10
}
func (c *opsWSQPSCache) getPayload() []byte {
if c == nil {
return nil
}
if cached, ok := c.payload.Load().([]byte); ok && cached != nil {
return cached
}
return nil
}
func closeWS(conn *websocket.Conn, code int, reason string) {
if conn == nil {
return
}
msg := websocket.FormatCloseMessage(code, reason)
_ = conn.WriteControl(websocket.CloseMessage, msg, time.Now().Add(qpsWSWriteTimeout))
_ = conn.Close()
}
// QPSWSHandler handles realtime QPS push via WebSocket.
// GET /api/v1/admin/ops/ws/qps
func (h *OpsHandler) QPSWSHandler(c *gin.Context) {
clientIP := requestClientIP(c.Request)
if h == nil || h.opsService == nil {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "ops service not initialized"})
return
}
// If realtime monitoring is disabled, prefer a successful WS upgrade followed by a clean close
// with a deterministic close code. This prevents clients from spinning on 404/1006 reconnect loops.
if !h.opsService.IsRealtimeMonitoringEnabled(c.Request.Context()) {
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "ops realtime monitoring is disabled"})
return
}
closeWS(conn, opsWSCloseRealtimeDisabled, "realtime_disabled")
return
}
cancelQPSWSIdleStop()
// Lazily start the background refresh loop so unit tests that never hit the
// websocket route don't spawn goroutines that depend on DB/Redis stubs.
qpsWSCache.start(h.opsService)
// Reserve a global slot before upgrading the connection to keep the limit strict.
if !tryAcquireOpsWSTotalSlot(opsWSLimits.MaxConns) {
log.Printf("[OpsWS] connection limit reached: %d/%d", wsConnCount.Load(), opsWSLimits.MaxConns)
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"})
return
}
defer func() {
if wsConnCount.Add(-1) == 0 {
scheduleQPSWSIdleStop()
}
}()
if opsWSLimits.MaxConnsPerIP > 0 && clientIP != "" {
if !tryAcquireOpsWSIPSlot(clientIP, opsWSLimits.MaxConnsPerIP) {
log.Printf("[OpsWS] per-ip connection limit reached: ip=%s limit=%d", clientIP, opsWSLimits.MaxConnsPerIP)
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"})
return
}
defer releaseOpsWSIPSlot(clientIP)
}
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Printf("[OpsWS] upgrade failed: %v", err)
return
}
defer func() {
_ = conn.Close()
}()
handleQPSWebSocket(c.Request.Context(), conn)
}
func tryAcquireOpsWSTotalSlot(limit int32) bool {
if limit <= 0 {
return true
}
for {
current := wsConnCount.Load()
if current >= limit {
return false
}
if wsConnCount.CompareAndSwap(current, current+1) {
return true
}
}
}
func tryAcquireOpsWSIPSlot(clientIP string, limit int32) bool {
if strings.TrimSpace(clientIP) == "" || limit <= 0 {
return true
}
v, _ := wsConnCountByIP.LoadOrStore(clientIP, &atomic.Int32{})
counter, ok := v.(*atomic.Int32)
if !ok {
return false
}
for {
current := counter.Load()
if current >= limit {
return false
}
if counter.CompareAndSwap(current, current+1) {
return true
}
}
}
func releaseOpsWSIPSlot(clientIP string) {
if strings.TrimSpace(clientIP) == "" {
return
}
v, ok := wsConnCountByIP.Load(clientIP)
if !ok {
return
}
counter, ok := v.(*atomic.Int32)
if !ok {
return
}
next := counter.Add(-1)
if next <= 0 {
// Best-effort cleanup; safe even if a new slot was acquired concurrently.
wsConnCountByIP.Delete(clientIP)
}
}
func handleQPSWebSocket(parentCtx context.Context, conn *websocket.Conn) {
if conn == nil {
return
}
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()
var closeOnce sync.Once
closeConn := func() {
closeOnce.Do(func() {
_ = conn.Close()
})
}
closeFrameCh := make(chan []byte, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
conn.SetReadLimit(qpsWSMaxReadBytes)
if err := conn.SetReadDeadline(time.Now().Add(qpsWSPongWait)); err != nil {
log.Printf("[OpsWS] set read deadline failed: %v", err)
return
}
conn.SetPongHandler(func(string) error {
return conn.SetReadDeadline(time.Now().Add(qpsWSPongWait))
})
conn.SetCloseHandler(func(code int, text string) error {
select {
case closeFrameCh <- websocket.FormatCloseMessage(code, text):
default:
}
cancel()
return nil
})
for {
_, _, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) {
log.Printf("[OpsWS] read failed: %v", err)
}
return
}
}
}()
// Push QPS data every 2 seconds (values are globally cached and refreshed at most once per qpsWSRefreshInterval).
pushTicker := time.NewTicker(qpsWSPushInterval)
defer pushTicker.Stop()
// Heartbeat ping every 30 seconds.
pingTicker := time.NewTicker(qpsWSPingInterval)
defer pingTicker.Stop()
writeWithTimeout := func(messageType int, data []byte) error {
if err := conn.SetWriteDeadline(time.Now().Add(qpsWSWriteTimeout)); err != nil {
return err
}
return conn.WriteMessage(messageType, data)
}
sendClose := func(closeFrame []byte) {
if closeFrame == nil {
closeFrame = websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
}
_ = writeWithTimeout(websocket.CloseMessage, closeFrame)
}
for {
select {
case <-pushTicker.C:
msg := qpsWSCache.getPayload()
if msg == nil {
continue
}
if err := writeWithTimeout(websocket.TextMessage, msg); err != nil {
log.Printf("[OpsWS] write failed: %v", err)
cancel()
closeConn()
wg.Wait()
return
}
case <-pingTicker.C:
if err := writeWithTimeout(websocket.PingMessage, nil); err != nil {
log.Printf("[OpsWS] ping failed: %v", err)
cancel()
closeConn()
wg.Wait()
return
}
case closeFrame := <-closeFrameCh:
sendClose(closeFrame)
closeConn()
wg.Wait()
return
case <-ctx.Done():
var closeFrame []byte
select {
case closeFrame = <-closeFrameCh:
default:
}
sendClose(closeFrame)
closeConn()
wg.Wait()
return
}
}
}
func isAllowedOpsWSOrigin(r *http.Request) bool {
if r == nil {
return false
}
origin := strings.TrimSpace(r.Header.Get("Origin"))
if origin == "" {
switch strings.ToLower(strings.TrimSpace(opsWSProxyConfig.OriginPolicy)) {
case OriginPolicyStrict:
return false
case OriginPolicyPermissive, "":
return true
default:
return true
}
}
parsed, err := url.Parse(origin)
if err != nil || parsed.Hostname() == "" {
return false
}
originHost := strings.ToLower(parsed.Hostname())
trustProxyHeaders := shouldTrustOpsWSProxyHeaders(r)
reqHost := hostWithoutPort(r.Host)
if trustProxyHeaders {
xfHost := strings.TrimSpace(r.Header.Get("X-Forwarded-Host"))
if xfHost != "" {
xfHost = strings.TrimSpace(strings.Split(xfHost, ",")[0])
if xfHost != "" {
reqHost = hostWithoutPort(xfHost)
}
}
}
reqHost = strings.ToLower(reqHost)
if reqHost == "" {
return false
}
return originHost == reqHost
}
func shouldTrustOpsWSProxyHeaders(r *http.Request) bool {
if r == nil {
return false
}
if !opsWSProxyConfig.TrustProxy {
return false
}
peerIP, ok := requestPeerIP(r)
if !ok {
return false
}
return isAddrInTrustedProxies(peerIP, opsWSProxyConfig.TrustedProxies)
}
func requestPeerIP(r *http.Request) (netip.Addr, bool) {
if r == nil {
return netip.Addr{}, false
}
host, _, err := net.SplitHostPort(strings.TrimSpace(r.RemoteAddr))
if err != nil {
host = strings.TrimSpace(r.RemoteAddr)
}
host = strings.TrimPrefix(host, "[")
host = strings.TrimSuffix(host, "]")
if host == "" {
return netip.Addr{}, false
}
addr, err := netip.ParseAddr(host)
if err != nil {
return netip.Addr{}, false
}
return addr.Unmap(), true
}
func requestClientIP(r *http.Request) string {
if r == nil {
return ""
}
trustProxyHeaders := shouldTrustOpsWSProxyHeaders(r)
if trustProxyHeaders {
xff := strings.TrimSpace(r.Header.Get("X-Forwarded-For"))
if xff != "" {
// Use the left-most entry (original client). If multiple proxies add values, they are comma-separated.
xff = strings.TrimSpace(strings.Split(xff, ",")[0])
xff = strings.TrimPrefix(xff, "[")
xff = strings.TrimSuffix(xff, "]")
if addr, err := netip.ParseAddr(xff); err == nil && addr.IsValid() {
return addr.Unmap().String()
}
}
}
if peer, ok := requestPeerIP(r); ok && peer.IsValid() {
return peer.String()
}
return ""
}
func isAddrInTrustedProxies(addr netip.Addr, trusted []netip.Prefix) bool {
if !addr.IsValid() {
return false
}
for _, p := range trusted {
if p.Contains(addr) {
return true
}
}
return false
}
func loadOpsWSProxyConfigFromEnv() OpsWSProxyConfig {
cfg := OpsWSProxyConfig{
TrustProxy: true,
TrustedProxies: defaultTrustedProxies(),
OriginPolicy: OriginPolicyPermissive,
}
if v := strings.TrimSpace(os.Getenv(envOpsWSTrustProxy)); v != "" {
if parsed, err := strconv.ParseBool(v); err == nil {
cfg.TrustProxy = parsed
} else {
log.Printf("[OpsWS] invalid %s=%q (expected bool); using default=%v", envOpsWSTrustProxy, v, cfg.TrustProxy)
}
}
if raw := strings.TrimSpace(os.Getenv(envOpsWSTrustedProxies)); raw != "" {
prefixes, invalid := parseTrustedProxyList(raw)
if len(invalid) > 0 {
log.Printf("[OpsWS] invalid %s entries ignored: %s", envOpsWSTrustedProxies, strings.Join(invalid, ", "))
}
cfg.TrustedProxies = prefixes
}
if v := strings.TrimSpace(os.Getenv(envOpsWSOriginPolicy)); v != "" {
normalized := strings.ToLower(v)
switch normalized {
case OriginPolicyStrict, OriginPolicyPermissive:
cfg.OriginPolicy = normalized
default:
log.Printf("[OpsWS] invalid %s=%q (expected %q or %q); using default=%q", envOpsWSOriginPolicy, v, OriginPolicyStrict, OriginPolicyPermissive, cfg.OriginPolicy)
}
}
return cfg
}
func loadOpsWSRuntimeLimitsFromEnv() opsWSRuntimeLimits {
cfg := opsWSRuntimeLimits{
MaxConns: defaultMaxWSConns,
MaxConnsPerIP: defaultMaxWSConnsPerIP,
}
if v := strings.TrimSpace(os.Getenv(envOpsWSMaxConns)); v != "" {
if parsed, err := strconv.Atoi(v); err == nil && parsed > 0 {
cfg.MaxConns = int32(parsed)
} else {
log.Printf("[OpsWS] invalid %s=%q (expected int>0); using default=%d", envOpsWSMaxConns, v, cfg.MaxConns)
}
}
if v := strings.TrimSpace(os.Getenv(envOpsWSMaxConnsPerIP)); v != "" {
if parsed, err := strconv.Atoi(v); err == nil && parsed >= 0 {
cfg.MaxConnsPerIP = int32(parsed)
} else {
log.Printf("[OpsWS] invalid %s=%q (expected int>=0); using default=%d", envOpsWSMaxConnsPerIP, v, cfg.MaxConnsPerIP)
}
}
return cfg
}
func defaultTrustedProxies() []netip.Prefix {
prefixes, _ := parseTrustedProxyList("127.0.0.0/8,::1/128")
return prefixes
}
func parseTrustedProxyList(raw string) (prefixes []netip.Prefix, invalid []string) {
for _, token := range strings.Split(raw, ",") {
item := strings.TrimSpace(token)
if item == "" {
continue
}
var (
p netip.Prefix
err error
)
if strings.Contains(item, "/") {
p, err = netip.ParsePrefix(item)
} else {
var addr netip.Addr
addr, err = netip.ParseAddr(item)
if err == nil {
addr = addr.Unmap()
bits := 128
if addr.Is4() {
bits = 32
}
p = netip.PrefixFrom(addr, bits)
}
}
if err != nil || !p.IsValid() {
invalid = append(invalid, item)
continue
}
prefixes = append(prefixes, p.Masked())
}
return prefixes, invalid
}
func hostWithoutPort(hostport string) string {
hostport = strings.TrimSpace(hostport)
if hostport == "" {
return ""
}
if host, _, err := net.SplitHostPort(hostport); err == nil {
return host
}
if strings.HasPrefix(hostport, "[") && strings.HasSuffix(hostport, "]") {
return strings.Trim(hostport, "[]")
}
parts := strings.Split(hostport, ":")
return parts[0]
}
......@@ -19,14 +19,16 @@ type SettingHandler struct {
settingService *service.SettingService
emailService *service.EmailService
turnstileService *service.TurnstileService
opsService *service.OpsService
}
// NewSettingHandler 创建系统设置处理器
func NewSettingHandler(settingService *service.SettingService, emailService *service.EmailService, turnstileService *service.TurnstileService) *SettingHandler {
func NewSettingHandler(settingService *service.SettingService, emailService *service.EmailService, turnstileService *service.TurnstileService, opsService *service.OpsService) *SettingHandler {
return &SettingHandler{
settingService: settingService,
emailService: emailService,
turnstileService: turnstileService,
opsService: opsService,
}
}
......@@ -39,6 +41,9 @@ func (h *SettingHandler) GetSettings(c *gin.Context) {
return
}
// Check if ops monitoring is enabled (respects config.ops.enabled)
opsEnabled := h.opsService != nil && h.opsService.IsMonitoringEnabled(c.Request.Context())
response.Success(c, dto.SystemSettings{
RegistrationEnabled: settings.RegistrationEnabled,
EmailVerifyEnabled: settings.EmailVerifyEnabled,
......@@ -72,6 +77,10 @@ func (h *SettingHandler) GetSettings(c *gin.Context) {
FallbackModelAntigravity: settings.FallbackModelAntigravity,
EnableIdentityPatch: settings.EnableIdentityPatch,
IdentityPatchPrompt: settings.IdentityPatchPrompt,
OpsMonitoringEnabled: opsEnabled && settings.OpsMonitoringEnabled,
OpsRealtimeMonitoringEnabled: settings.OpsRealtimeMonitoringEnabled,
OpsQueryModeDefault: settings.OpsQueryModeDefault,
OpsMetricsIntervalSeconds: settings.OpsMetricsIntervalSeconds,
})
}
......@@ -95,7 +104,7 @@ type UpdateSettingsRequest struct {
TurnstileSiteKey string `json:"turnstile_site_key"`
TurnstileSecretKey string `json:"turnstile_secret_key"`
// LinuxDo Connect OAuth 登录(终端用户 SSO)
// LinuxDo Connect OAuth 登录
LinuxDoConnectEnabled bool `json:"linuxdo_connect_enabled"`
LinuxDoConnectClientID string `json:"linuxdo_connect_client_id"`
LinuxDoConnectClientSecret string `json:"linuxdo_connect_client_secret"`
......@@ -124,6 +133,12 @@ type UpdateSettingsRequest struct {
// Identity patch configuration (Claude -> Gemini)
EnableIdentityPatch bool `json:"enable_identity_patch"`
IdentityPatchPrompt string `json:"identity_patch_prompt"`
// Ops monitoring (vNext)
OpsMonitoringEnabled *bool `json:"ops_monitoring_enabled"`
OpsRealtimeMonitoringEnabled *bool `json:"ops_realtime_monitoring_enabled"`
OpsQueryModeDefault *string `json:"ops_query_mode_default"`
OpsMetricsIntervalSeconds *int `json:"ops_metrics_interval_seconds"`
}
// UpdateSettings 更新系统设置
......@@ -208,6 +223,18 @@ func (h *SettingHandler) UpdateSettings(c *gin.Context) {
}
}
// Ops metrics collector interval validation (seconds).
if req.OpsMetricsIntervalSeconds != nil {
v := *req.OpsMetricsIntervalSeconds
if v < 60 {
v = 60
}
if v > 3600 {
v = 3600
}
req.OpsMetricsIntervalSeconds = &v
}
settings := &service.SystemSettings{
RegistrationEnabled: req.RegistrationEnabled,
EmailVerifyEnabled: req.EmailVerifyEnabled,
......@@ -241,6 +268,30 @@ func (h *SettingHandler) UpdateSettings(c *gin.Context) {
FallbackModelAntigravity: req.FallbackModelAntigravity,
EnableIdentityPatch: req.EnableIdentityPatch,
IdentityPatchPrompt: req.IdentityPatchPrompt,
OpsMonitoringEnabled: func() bool {
if req.OpsMonitoringEnabled != nil {
return *req.OpsMonitoringEnabled
}
return previousSettings.OpsMonitoringEnabled
}(),
OpsRealtimeMonitoringEnabled: func() bool {
if req.OpsRealtimeMonitoringEnabled != nil {
return *req.OpsRealtimeMonitoringEnabled
}
return previousSettings.OpsRealtimeMonitoringEnabled
}(),
OpsQueryModeDefault: func() string {
if req.OpsQueryModeDefault != nil {
return *req.OpsQueryModeDefault
}
return previousSettings.OpsQueryModeDefault
}(),
OpsMetricsIntervalSeconds: func() int {
if req.OpsMetricsIntervalSeconds != nil {
return *req.OpsMetricsIntervalSeconds
}
return previousSettings.OpsMetricsIntervalSeconds
}(),
}
if err := h.settingService.UpdateSettings(c.Request.Context(), settings); err != nil {
......@@ -290,6 +341,10 @@ func (h *SettingHandler) UpdateSettings(c *gin.Context) {
FallbackModelAntigravity: updatedSettings.FallbackModelAntigravity,
EnableIdentityPatch: updatedSettings.EnableIdentityPatch,
IdentityPatchPrompt: updatedSettings.IdentityPatchPrompt,
OpsMonitoringEnabled: updatedSettings.OpsMonitoringEnabled,
OpsRealtimeMonitoringEnabled: updatedSettings.OpsRealtimeMonitoringEnabled,
OpsQueryModeDefault: updatedSettings.OpsQueryModeDefault,
OpsMetricsIntervalSeconds: updatedSettings.OpsMetricsIntervalSeconds,
})
}
......@@ -411,6 +466,18 @@ func diffSettings(before *service.SystemSettings, after *service.SystemSettings,
if before.IdentityPatchPrompt != after.IdentityPatchPrompt {
changed = append(changed, "identity_patch_prompt")
}
if before.OpsMonitoringEnabled != after.OpsMonitoringEnabled {
changed = append(changed, "ops_monitoring_enabled")
}
if before.OpsRealtimeMonitoringEnabled != after.OpsRealtimeMonitoringEnabled {
changed = append(changed, "ops_realtime_monitoring_enabled")
}
if before.OpsQueryModeDefault != after.OpsQueryModeDefault {
changed = append(changed, "ops_query_mode_default")
}
if before.OpsMetricsIntervalSeconds != after.OpsMetricsIntervalSeconds {
changed = append(changed, "ops_metrics_interval_seconds")
}
return changed
}
......
......@@ -43,6 +43,12 @@ type SystemSettings struct {
// Identity patch configuration (Claude -> Gemini)
EnableIdentityPatch bool `json:"enable_identity_patch"`
IdentityPatchPrompt string `json:"identity_patch_prompt"`
// Ops monitoring (vNext)
OpsMonitoringEnabled bool `json:"ops_monitoring_enabled"`
OpsRealtimeMonitoringEnabled bool `json:"ops_realtime_monitoring_enabled"`
OpsQueryModeDefault string `json:"ops_query_mode_default"`
OpsMetricsIntervalSeconds int `json:"ops_metrics_interval_seconds"`
}
type PublicSettings struct {
......
......@@ -15,7 +15,6 @@ import (
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
pkgerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
"github.com/Wei-Shaw/sub2api/internal/pkg/openai"
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
......@@ -89,6 +88,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
return
}
setOpsRequestContext(c, "", false, body)
parsedReq, err := service.ParseGatewayRequest(body)
if err != nil {
h.errorResponse(c, http.StatusBadRequest, "invalid_request_error", "Failed to parse request body")
......@@ -97,8 +98,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
reqModel := parsedReq.Model
reqStream := parsedReq.Stream
// 设置 Claude Code 客户端标识到 context(用于分组限制检查)
SetClaudeCodeClientContext(c, body)
setOpsRequestContext(c, reqModel, reqStream, body)
// 验证 model 必填
if reqModel == "" {
......@@ -112,15 +112,10 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
// 获取订阅信息(可能为nil)- 提前获取用于后续检查
subscription, _ := middleware2.GetSubscriptionFromContext(c)
// 获取 User-Agent
userAgent := c.Request.UserAgent()
// 获取客户端 IP
clientIP := ip.GetClientIP(c)
// 0. 检查wait队列是否已满
maxWait := service.CalculateMaxWait(subject.Concurrency)
canWait, err := h.concurrencyHelper.IncrementWaitCount(c.Request.Context(), subject.UserID, maxWait)
waitCounted := false
if err != nil {
log.Printf("Increment wait count failed: %v", err)
// On error, allow request to proceed
......@@ -128,8 +123,15 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
h.errorResponse(c, http.StatusTooManyRequests, "rate_limit_error", "Too many pending requests, please retry later")
return
}
// 确保在函数退出时减少wait计数
defer h.concurrencyHelper.DecrementWaitCount(c.Request.Context(), subject.UserID)
if err == nil && canWait {
waitCounted = true
}
// Ensure we decrement if we exit before acquiring the user slot.
defer func() {
if waitCounted {
h.concurrencyHelper.DecrementWaitCount(c.Request.Context(), subject.UserID)
}
}()
// 1. 首先获取用户并发槽位
userReleaseFunc, err := h.concurrencyHelper.AcquireUserSlotWithWait(c, subject.UserID, subject.Concurrency, reqStream, &streamStarted)
......@@ -138,6 +140,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
h.handleConcurrencyError(c, err, "user", streamStarted)
return
}
// User slot acquired: no longer waiting in the queue.
if waitCounted {
h.concurrencyHelper.DecrementWaitCount(c.Request.Context(), subject.UserID)
waitCounted = false
}
// 在请求结束或 Context 取消时确保释放槽位,避免客户端断开造成泄漏
userReleaseFunc = wrapReleaseOnDone(c.Request.Context(), userReleaseFunc)
if userReleaseFunc != nil {
......@@ -184,6 +191,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
return
}
account := selection.Account
setOpsSelectedAccount(c, account.ID)
// 检查预热请求拦截(在账号选择后、转发前检查)
if account.IsInterceptWarmupEnabled() && isWarmupRequest(body) {
......@@ -200,12 +208,12 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
// 3. 获取账号并发槽位
accountReleaseFunc := selection.ReleaseFunc
var accountWaitRelease func()
if !selection.Acquired {
if selection.WaitPlan == nil {
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted)
return
}
accountWaitCounted := false
canWait, err := h.concurrencyHelper.IncrementAccountWaitCount(c.Request.Context(), account.ID, selection.WaitPlan.MaxWaiting)
if err != nil {
log.Printf("Increment account wait count failed: %v", err)
......@@ -213,12 +221,16 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
log.Printf("Account wait queue full: account=%d", account.ID)
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "Too many pending requests, please retry later", streamStarted)
return
} else {
// Only set release function if increment succeeded
accountWaitRelease = func() {
}
if err == nil && canWait {
accountWaitCounted = true
}
// Ensure the wait counter is decremented if we exit before acquiring the slot.
defer func() {
if accountWaitCounted {
h.concurrencyHelper.DecrementAccountWaitCount(c.Request.Context(), account.ID)
}
}
}()
accountReleaseFunc, err = h.concurrencyHelper.AcquireAccountSlotWithWaitTimeout(
c,
......@@ -229,20 +241,21 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
&streamStarted,
)
if err != nil {
if accountWaitRelease != nil {
accountWaitRelease()
}
log.Printf("Account concurrency acquire failed: %v", err)
h.handleConcurrencyError(c, err, "account", streamStarted)
return
}
// Slot acquired: no longer waiting in queue.
if accountWaitCounted {
h.concurrencyHelper.DecrementAccountWaitCount(c.Request.Context(), account.ID)
accountWaitCounted = false
}
if err := h.gatewayService.BindStickySession(c.Request.Context(), apiKey.GroupID, sessionKey, account.ID); err != nil {
log.Printf("Bind sticky session failed: %v", err)
}
}
// 账号槽位/等待计数需要在超时或断开时安全回收
accountReleaseFunc = wrapReleaseOnDone(c.Request.Context(), accountReleaseFunc)
accountWaitRelease = wrapReleaseOnDone(c.Request.Context(), accountWaitRelease)
// 转发请求 - 根据账号平台分流
var result *service.ForwardResult
......@@ -254,9 +267,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if accountReleaseFunc != nil {
accountReleaseFunc()
}
if accountWaitRelease != nil {
accountWaitRelease()
}
if err != nil {
var failoverErr *service.UpstreamFailoverError
if errors.As(err, &failoverErr) {
......@@ -277,7 +287,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
}
// 异步记录使用量(subscription已在函数开头获取)
go func(result *service.ForwardResult, usedAccount *service.Account, ua string, cip string) {
go func(result *service.ForwardResult, usedAccount *service.Account) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{
......@@ -286,12 +296,10 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
User: apiKey.User,
Account: usedAccount,
Subscription: subscription,
UserAgent: ua,
IPAddress: cip,
}); err != nil {
log.Printf("Record usage failed: %v", err)
}
}(result, account, userAgent, clientIP)
}(result, account)
return
}
}
......@@ -313,6 +321,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
return
}
account := selection.Account
setOpsSelectedAccount(c, account.ID)
// 检查预热请求拦截(在账号选择后、转发前检查)
if account.IsInterceptWarmupEnabled() && isWarmupRequest(body) {
......@@ -329,12 +338,12 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
// 3. 获取账号并发槽位
accountReleaseFunc := selection.ReleaseFunc
var accountWaitRelease func()
if !selection.Acquired {
if selection.WaitPlan == nil {
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted)
return
}
accountWaitCounted := false
canWait, err := h.concurrencyHelper.IncrementAccountWaitCount(c.Request.Context(), account.ID, selection.WaitPlan.MaxWaiting)
if err != nil {
log.Printf("Increment account wait count failed: %v", err)
......@@ -342,12 +351,15 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
log.Printf("Account wait queue full: account=%d", account.ID)
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "Too many pending requests, please retry later", streamStarted)
return
} else {
// Only set release function if increment succeeded
accountWaitRelease = func() {
}
if err == nil && canWait {
accountWaitCounted = true
}
defer func() {
if accountWaitCounted {
h.concurrencyHelper.DecrementAccountWaitCount(c.Request.Context(), account.ID)
}
}
}()
accountReleaseFunc, err = h.concurrencyHelper.AcquireAccountSlotWithWaitTimeout(
c,
......@@ -358,20 +370,20 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
&streamStarted,
)
if err != nil {
if accountWaitRelease != nil {
accountWaitRelease()
}
log.Printf("Account concurrency acquire failed: %v", err)
h.handleConcurrencyError(c, err, "account", streamStarted)
return
}
if accountWaitCounted {
h.concurrencyHelper.DecrementAccountWaitCount(c.Request.Context(), account.ID)
accountWaitCounted = false
}
if err := h.gatewayService.BindStickySession(c.Request.Context(), apiKey.GroupID, sessionKey, account.ID); err != nil {
log.Printf("Bind sticky session failed: %v", err)
}
}
// 账号槽位/等待计数需要在超时或断开时安全回收
accountReleaseFunc = wrapReleaseOnDone(c.Request.Context(), accountReleaseFunc)
accountWaitRelease = wrapReleaseOnDone(c.Request.Context(), accountWaitRelease)
// 转发请求 - 根据账号平台分流
var result *service.ForwardResult
......@@ -383,9 +395,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if accountReleaseFunc != nil {
accountReleaseFunc()
}
if accountWaitRelease != nil {
accountWaitRelease()
}
if err != nil {
var failoverErr *service.UpstreamFailoverError
if errors.As(err, &failoverErr) {
......@@ -406,7 +415,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
}
// 异步记录使用量(subscription已在函数开头获取)
go func(result *service.ForwardResult, usedAccount *service.Account, ua string, cip string) {
go func(result *service.ForwardResult, usedAccount *service.Account) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{
......@@ -415,12 +424,10 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
User: apiKey.User,
Account: usedAccount,
Subscription: subscription,
UserAgent: ua,
IPAddress: cip,
}); err != nil {
log.Printf("Record usage failed: %v", err)
}
}(result, account, userAgent, clientIP)
}(result, account)
return
}
}
......@@ -686,21 +693,22 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
return
}
setOpsRequestContext(c, "", false, body)
parsedReq, err := service.ParseGatewayRequest(body)
if err != nil {
h.errorResponse(c, http.StatusBadRequest, "invalid_request_error", "Failed to parse request body")
return
}
// 设置 Claude Code 客户端标识到 context(用于分组限制检查)
SetClaudeCodeClientContext(c, body)
// 验证 model 必填
if parsedReq.Model == "" {
h.errorResponse(c, http.StatusBadRequest, "invalid_request_error", "model is required")
return
}
setOpsRequestContext(c, parsedReq.Model, parsedReq.Stream, body)
// 获取订阅信息(可能为nil)
subscription, _ := middleware2.GetSubscriptionFromContext(c)
......@@ -721,6 +729,7 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
h.errorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error())
return
}
setOpsSelectedAccount(c, account.ID)
// 转发请求(不记录使用量)
if err := h.gatewayService.ForwardCountTokens(c.Request.Context(), c, account, parsedReq); err != nil {
......
......@@ -12,7 +12,6 @@ import (
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
"github.com/Wei-Shaw/sub2api/internal/pkg/gemini"
"github.com/Wei-Shaw/sub2api/internal/pkg/googleapi"
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
"github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
......@@ -162,28 +161,32 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
return
}
setOpsRequestContext(c, modelName, stream, body)
// Get subscription (may be nil)
subscription, _ := middleware.GetSubscriptionFromContext(c)
// 获取 User-Agent
userAgent := c.Request.UserAgent()
// 获取客户端 IP
clientIP := ip.GetClientIP(c)
// For Gemini native API, do not send Claude-style ping frames.
geminiConcurrency := NewConcurrencyHelper(h.concurrencyHelper.concurrencyService, SSEPingFormatNone, 0)
// 0) wait queue check
maxWait := service.CalculateMaxWait(authSubject.Concurrency)
canWait, err := geminiConcurrency.IncrementWaitCount(c.Request.Context(), authSubject.UserID, maxWait)
waitCounted := false
if err != nil {
log.Printf("Increment wait count failed: %v", err)
} else if !canWait {
googleError(c, http.StatusTooManyRequests, "Too many pending requests, please retry later")
return
}
defer geminiConcurrency.DecrementWaitCount(c.Request.Context(), authSubject.UserID)
if err == nil && canWait {
waitCounted = true
}
defer func() {
if waitCounted {
geminiConcurrency.DecrementWaitCount(c.Request.Context(), authSubject.UserID)
}
}()
// 1) user concurrency slot
streamStarted := false
......@@ -192,6 +195,10 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
googleError(c, http.StatusTooManyRequests, err.Error())
return
}
if waitCounted {
geminiConcurrency.DecrementWaitCount(c.Request.Context(), authSubject.UserID)
waitCounted = false
}
// 确保请求取消时也会释放槽位,避免长连接被动中断造成泄漏
userReleaseFunc = wrapReleaseOnDone(c.Request.Context(), userReleaseFunc)
if userReleaseFunc != nil {
......@@ -207,10 +214,6 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
// 3) select account (sticky session based on request body)
parsedReq, _ := service.ParseGatewayRequest(body)
// 设置 Claude Code 客户端标识到 context(用于分组限制检查)
SetClaudeCodeClientContext(c, body)
sessionHash := h.gatewayService.GenerateSessionHash(parsedReq)
sessionKey := sessionHash
if sessionHash != "" {
......@@ -232,15 +235,16 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
return
}
account := selection.Account
setOpsSelectedAccount(c, account.ID)
// 4) account concurrency slot
accountReleaseFunc := selection.ReleaseFunc
var accountWaitRelease func()
if !selection.Acquired {
if selection.WaitPlan == nil {
googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts")
return
}
accountWaitCounted := false
canWait, err := geminiConcurrency.IncrementAccountWaitCount(c.Request.Context(), account.ID, selection.WaitPlan.MaxWaiting)
if err != nil {
log.Printf("Increment account wait count failed: %v", err)
......@@ -248,12 +252,15 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
log.Printf("Account wait queue full: account=%d", account.ID)
googleError(c, http.StatusTooManyRequests, "Too many pending requests, please retry later")
return
} else {
// Only set release function if increment succeeded
accountWaitRelease = func() {
}
if err == nil && canWait {
accountWaitCounted = true
}
defer func() {
if accountWaitCounted {
geminiConcurrency.DecrementAccountWaitCount(c.Request.Context(), account.ID)
}
}
}()
accountReleaseFunc, err = geminiConcurrency.AcquireAccountSlotWithWaitTimeout(
c,
......@@ -264,19 +271,19 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
&streamStarted,
)
if err != nil {
if accountWaitRelease != nil {
accountWaitRelease()
}
googleError(c, http.StatusTooManyRequests, err.Error())
return
}
if accountWaitCounted {
geminiConcurrency.DecrementAccountWaitCount(c.Request.Context(), account.ID)
accountWaitCounted = false
}
if err := h.gatewayService.BindStickySession(c.Request.Context(), apiKey.GroupID, sessionKey, account.ID); err != nil {
log.Printf("Bind sticky session failed: %v", err)
}
}
// 账号槽位/等待计数需要在超时或断开时安全回收
accountReleaseFunc = wrapReleaseOnDone(c.Request.Context(), accountReleaseFunc)
accountWaitRelease = wrapReleaseOnDone(c.Request.Context(), accountWaitRelease)
// 5) forward (根据平台分流)
var result *service.ForwardResult
......@@ -288,9 +295,6 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
if accountReleaseFunc != nil {
accountReleaseFunc()
}
if accountWaitRelease != nil {
accountWaitRelease()
}
if err != nil {
var failoverErr *service.UpstreamFailoverError
if errors.As(err, &failoverErr) {
......@@ -311,7 +315,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
}
// 6) record usage async
go func(result *service.ForwardResult, usedAccount *service.Account, ua string, cip string) {
go func(result *service.ForwardResult, usedAccount *service.Account) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{
......@@ -320,12 +324,10 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
User: apiKey.User,
Account: usedAccount,
Subscription: subscription,
UserAgent: ua,
IPAddress: cip,
}); err != nil {
log.Printf("Record usage failed: %v", err)
}
}(result, account, userAgent, clientIP)
}(result, account)
return
}
}
......
......@@ -18,6 +18,7 @@ type AdminHandlers struct {
Redeem *admin.RedeemHandler
Promo *admin.PromoHandler
Setting *admin.SettingHandler
Ops *admin.OpsHandler
System *admin.SystemHandler
Subscription *admin.SubscriptionHandler
Usage *admin.UsageHandler
......
......@@ -12,7 +12,6 @@ import (
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
"github.com/Wei-Shaw/sub2api/internal/pkg/openai"
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
......@@ -77,6 +76,8 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
return
}
setOpsRequestContext(c, "", false, body)
// Parse request body to map for potential modification
var reqBody map[string]any
if err := json.Unmarshal(body, &reqBody); err != nil {
......@@ -95,10 +96,6 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
}
userAgent := c.GetHeader("User-Agent")
// 获取客户端 IP
clientIP := ip.GetClientIP(c)
if !openai.IsCodexCLIRequest(userAgent) {
existingInstructions, _ := reqBody["instructions"].(string)
if strings.TrimSpace(existingInstructions) == "" {
......@@ -114,6 +111,8 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
}
}
setOpsRequestContext(c, reqModel, reqStream, body)
// Track if we've started streaming (for error handling)
streamStarted := false
......@@ -123,6 +122,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
// 0. Check if wait queue is full
maxWait := service.CalculateMaxWait(subject.Concurrency)
canWait, err := h.concurrencyHelper.IncrementWaitCount(c.Request.Context(), subject.UserID, maxWait)
waitCounted := false
if err != nil {
log.Printf("Increment wait count failed: %v", err)
// On error, allow request to proceed
......@@ -130,8 +130,14 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
h.errorResponse(c, http.StatusTooManyRequests, "rate_limit_error", "Too many pending requests, please retry later")
return
}
// Ensure wait count is decremented when function exits
defer h.concurrencyHelper.DecrementWaitCount(c.Request.Context(), subject.UserID)
if err == nil && canWait {
waitCounted = true
}
defer func() {
if waitCounted {
h.concurrencyHelper.DecrementWaitCount(c.Request.Context(), subject.UserID)
}
}()
// 1. First acquire user concurrency slot
userReleaseFunc, err := h.concurrencyHelper.AcquireUserSlotWithWait(c, subject.UserID, subject.Concurrency, reqStream, &streamStarted)
......@@ -140,6 +146,11 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
h.handleConcurrencyError(c, err, "user", streamStarted)
return
}
// User slot acquired: no longer waiting.
if waitCounted {
h.concurrencyHelper.DecrementWaitCount(c.Request.Context(), subject.UserID)
waitCounted = false
}
// 确保请求取消时也会释放槽位,避免长连接被动中断造成泄漏
userReleaseFunc = wrapReleaseOnDone(c.Request.Context(), userReleaseFunc)
if userReleaseFunc != nil {
......@@ -177,15 +188,16 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
}
account := selection.Account
log.Printf("[OpenAI Handler] Selected account: id=%d name=%s", account.ID, account.Name)
setOpsSelectedAccount(c, account.ID)
// 3. Acquire account concurrency slot
accountReleaseFunc := selection.ReleaseFunc
var accountWaitRelease func()
if !selection.Acquired {
if selection.WaitPlan == nil {
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted)
return
}
accountWaitCounted := false
canWait, err := h.concurrencyHelper.IncrementAccountWaitCount(c.Request.Context(), account.ID, selection.WaitPlan.MaxWaiting)
if err != nil {
log.Printf("Increment account wait count failed: %v", err)
......@@ -193,12 +205,15 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
log.Printf("Account wait queue full: account=%d", account.ID)
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "Too many pending requests, please retry later", streamStarted)
return
} else {
// Only set release function if increment succeeded
accountWaitRelease = func() {
}
if err == nil && canWait {
accountWaitCounted = true
}
defer func() {
if accountWaitCounted {
h.concurrencyHelper.DecrementAccountWaitCount(c.Request.Context(), account.ID)
}
}
}()
accountReleaseFunc, err = h.concurrencyHelper.AcquireAccountSlotWithWaitTimeout(
c,
......@@ -209,29 +224,26 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
&streamStarted,
)
if err != nil {
if accountWaitRelease != nil {
accountWaitRelease()
}
log.Printf("Account concurrency acquire failed: %v", err)
h.handleConcurrencyError(c, err, "account", streamStarted)
return
}
if accountWaitCounted {
h.concurrencyHelper.DecrementAccountWaitCount(c.Request.Context(), account.ID)
accountWaitCounted = false
}
if err := h.gatewayService.BindStickySession(c.Request.Context(), apiKey.GroupID, sessionHash, account.ID); err != nil {
log.Printf("Bind sticky session failed: %v", err)
}
}
// 账号槽位/等待计数需要在超时或断开时安全回收
accountReleaseFunc = wrapReleaseOnDone(c.Request.Context(), accountReleaseFunc)
accountWaitRelease = wrapReleaseOnDone(c.Request.Context(), accountWaitRelease)
// Forward request
result, err := h.gatewayService.Forward(c.Request.Context(), c, account, body)
if accountReleaseFunc != nil {
accountReleaseFunc()
}
if accountWaitRelease != nil {
accountWaitRelease()
}
if err != nil {
var failoverErr *service.UpstreamFailoverError
if errors.As(err, &failoverErr) {
......@@ -252,7 +264,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
}
// Async record usage
go func(result *service.OpenAIForwardResult, usedAccount *service.Account, ua string, cip string) {
go func(result *service.OpenAIForwardResult, usedAccount *service.Account) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := h.gatewayService.RecordUsage(ctx, &service.OpenAIRecordUsageInput{
......@@ -261,12 +273,10 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
User: apiKey.User,
Account: usedAccount,
Subscription: subscription,
UserAgent: ua,
IPAddress: cip,
}); err != nil {
log.Printf("Record usage failed: %v", err)
}
}(result, account, userAgent, clientIP)
}(result, account)
return
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment