Commit 90bce60b authored by yangjianbo's avatar yangjianbo
Browse files

feat: merge dev

parent a458e684
...@@ -9,16 +9,19 @@ import ( ...@@ -9,16 +9,19 @@ import (
) )
type Account struct { type Account struct {
ID int64 ID int64
Name string Name string
Notes *string Notes *string
Platform string Platform string
Type string Type string
Credentials map[string]any Credentials map[string]any
Extra map[string]any Extra map[string]any
ProxyID *int64 ProxyID *int64
Concurrency int Concurrency int
Priority int Priority int
// RateMultiplier 账号计费倍率(>=0,允许 0 表示该账号计费为 0)。
// 使用指针用于兼容旧版本调度缓存(Redis)中缺字段的情况:nil 表示按 1.0 处理。
RateMultiplier *float64
Status string Status string
ErrorMessage string ErrorMessage string
LastUsedAt *time.Time LastUsedAt *time.Time
...@@ -57,6 +60,20 @@ func (a *Account) IsActive() bool { ...@@ -57,6 +60,20 @@ func (a *Account) IsActive() bool {
return a.Status == StatusActive return a.Status == StatusActive
} }
// BillingRateMultiplier 返回账号计费倍率。
// - nil 表示未配置/旧缓存缺字段,按 1.0 处理
// - 允许 0,表示该账号计费为 0
// - 负数属于非法数据,出于安全考虑按 1.0 处理
func (a *Account) BillingRateMultiplier() float64 {
if a == nil || a.RateMultiplier == nil {
return 1.0
}
if *a.RateMultiplier < 0 {
return 1.0
}
return *a.RateMultiplier
}
func (a *Account) IsSchedulable() bool { func (a *Account) IsSchedulable() bool {
if !a.IsActive() || !a.Schedulable { if !a.IsActive() || !a.Schedulable {
return false return false
......
package service
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/require"
)
func TestAccount_BillingRateMultiplier_DefaultsToOneWhenNil(t *testing.T) {
var a Account
require.NoError(t, json.Unmarshal([]byte(`{"id":1,"name":"acc","status":"active"}`), &a))
require.Nil(t, a.RateMultiplier)
require.Equal(t, 1.0, a.BillingRateMultiplier())
}
func TestAccount_BillingRateMultiplier_AllowsZero(t *testing.T) {
v := 0.0
a := Account{RateMultiplier: &v}
require.Equal(t, 0.0, a.BillingRateMultiplier())
}
func TestAccount_BillingRateMultiplier_NegativeFallsBackToOne(t *testing.T) {
v := -1.0
a := Account{RateMultiplier: &v}
require.Equal(t, 1.0, a.BillingRateMultiplier())
}
...@@ -63,14 +63,15 @@ type AccountRepository interface { ...@@ -63,14 +63,15 @@ type AccountRepository interface {
// AccountBulkUpdate describes the fields that can be updated in a bulk operation. // AccountBulkUpdate describes the fields that can be updated in a bulk operation.
// Nil pointers mean "do not change". // Nil pointers mean "do not change".
type AccountBulkUpdate struct { type AccountBulkUpdate struct {
Name *string Name *string
ProxyID *int64 ProxyID *int64
Concurrency *int Concurrency *int
Priority *int Priority *int
Status *string RateMultiplier *float64
Schedulable *bool Status *string
Credentials map[string]any Schedulable *bool
Extra map[string]any Credentials map[string]any
Extra map[string]any
} }
// CreateAccountRequest 创建账号请求 // CreateAccountRequest 创建账号请求
......
...@@ -32,8 +32,8 @@ type UsageLogRepository interface { ...@@ -32,8 +32,8 @@ type UsageLogRepository interface {
// Admin dashboard stats // Admin dashboard stats
GetDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) GetDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error)
GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID int64) ([]usagestats.TrendDataPoint, error) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool) ([]usagestats.TrendDataPoint, error)
GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID int64) ([]usagestats.ModelStat, error) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool) ([]usagestats.ModelStat, error)
GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error) GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error)
GetUserUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.UserUsageTrendPoint, error) GetUserUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.UserUsageTrendPoint, error)
GetBatchUserUsageStats(ctx context.Context, userIDs []int64) (map[int64]*usagestats.BatchUserUsageStats, error) GetBatchUserUsageStats(ctx context.Context, userIDs []int64) (map[int64]*usagestats.BatchUserUsageStats, error)
...@@ -96,10 +96,16 @@ func NewUsageCache() *UsageCache { ...@@ -96,10 +96,16 @@ func NewUsageCache() *UsageCache {
} }
// WindowStats 窗口期统计 // WindowStats 窗口期统计
//
// cost: 账号口径费用(total_cost * account_rate_multiplier)
// standard_cost: 标准费用(total_cost,不含倍率)
// user_cost: 用户/API Key 口径费用(actual_cost,受分组倍率影响)
type WindowStats struct { type WindowStats struct {
Requests int64 `json:"requests"` Requests int64 `json:"requests"`
Tokens int64 `json:"tokens"` Tokens int64 `json:"tokens"`
Cost float64 `json:"cost"` Cost float64 `json:"cost"`
StandardCost float64 `json:"standard_cost"`
UserCost float64 `json:"user_cost"`
} }
// UsageProgress 使用量进度 // UsageProgress 使用量进度
...@@ -266,7 +272,7 @@ func (s *AccountUsageService) getGeminiUsage(ctx context.Context, account *Accou ...@@ -266,7 +272,7 @@ func (s *AccountUsageService) getGeminiUsage(ctx context.Context, account *Accou
} }
dayStart := geminiDailyWindowStart(now) dayStart := geminiDailyWindowStart(now)
stats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, dayStart, now, 0, 0, account.ID) stats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, dayStart, now, 0, 0, account.ID, 0, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("get gemini usage stats failed: %w", err) return nil, fmt.Errorf("get gemini usage stats failed: %w", err)
} }
...@@ -288,7 +294,7 @@ func (s *AccountUsageService) getGeminiUsage(ctx context.Context, account *Accou ...@@ -288,7 +294,7 @@ func (s *AccountUsageService) getGeminiUsage(ctx context.Context, account *Accou
// Minute window (RPM) - fixed-window approximation: current minute [truncate(now), truncate(now)+1m) // Minute window (RPM) - fixed-window approximation: current minute [truncate(now), truncate(now)+1m)
minuteStart := now.Truncate(time.Minute) minuteStart := now.Truncate(time.Minute)
minuteResetAt := minuteStart.Add(time.Minute) minuteResetAt := minuteStart.Add(time.Minute)
minuteStats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, minuteStart, now, 0, 0, account.ID) minuteStats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, minuteStart, now, 0, 0, account.ID, 0, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("get gemini minute usage stats failed: %w", err) return nil, fmt.Errorf("get gemini minute usage stats failed: %w", err)
} }
...@@ -377,9 +383,11 @@ func (s *AccountUsageService) addWindowStats(ctx context.Context, account *Accou ...@@ -377,9 +383,11 @@ func (s *AccountUsageService) addWindowStats(ctx context.Context, account *Accou
} }
windowStats = &WindowStats{ windowStats = &WindowStats{
Requests: stats.Requests, Requests: stats.Requests,
Tokens: stats.Tokens, Tokens: stats.Tokens,
Cost: stats.Cost, Cost: stats.Cost,
StandardCost: stats.StandardCost,
UserCost: stats.UserCost,
} }
// 缓存窗口统计(1 分钟) // 缓存窗口统计(1 分钟)
...@@ -403,9 +411,11 @@ func (s *AccountUsageService) GetTodayStats(ctx context.Context, accountID int64 ...@@ -403,9 +411,11 @@ func (s *AccountUsageService) GetTodayStats(ctx context.Context, accountID int64
} }
return &WindowStats{ return &WindowStats{
Requests: stats.Requests, Requests: stats.Requests,
Tokens: stats.Tokens, Tokens: stats.Tokens,
Cost: stats.Cost, Cost: stats.Cost,
StandardCost: stats.StandardCost,
UserCost: stats.UserCost,
}, nil }, nil
} }
......
...@@ -54,7 +54,8 @@ type AdminService interface { ...@@ -54,7 +54,8 @@ type AdminService interface {
CreateProxy(ctx context.Context, input *CreateProxyInput) (*Proxy, error) CreateProxy(ctx context.Context, input *CreateProxyInput) (*Proxy, error)
UpdateProxy(ctx context.Context, id int64, input *UpdateProxyInput) (*Proxy, error) UpdateProxy(ctx context.Context, id int64, input *UpdateProxyInput) (*Proxy, error)
DeleteProxy(ctx context.Context, id int64) error DeleteProxy(ctx context.Context, id int64) error
GetProxyAccounts(ctx context.Context, proxyID int64, page, pageSize int) ([]Account, int64, error) BatchDeleteProxies(ctx context.Context, ids []int64) (*ProxyBatchDeleteResult, error)
GetProxyAccounts(ctx context.Context, proxyID int64) ([]ProxyAccountSummary, error)
CheckProxyExists(ctx context.Context, host string, port int, username, password string) (bool, error) CheckProxyExists(ctx context.Context, host string, port int, username, password string) (bool, error)
TestProxy(ctx context.Context, id int64) (*ProxyTestResult, error) TestProxy(ctx context.Context, id int64) (*ProxyTestResult, error)
...@@ -136,6 +137,7 @@ type CreateAccountInput struct { ...@@ -136,6 +137,7 @@ type CreateAccountInput struct {
ProxyID *int64 ProxyID *int64
Concurrency int Concurrency int
Priority int Priority int
RateMultiplier *float64 // 账号计费倍率(>=0,允许 0)
GroupIDs []int64 GroupIDs []int64
ExpiresAt *int64 ExpiresAt *int64
AutoPauseOnExpired *bool AutoPauseOnExpired *bool
...@@ -151,8 +153,9 @@ type UpdateAccountInput struct { ...@@ -151,8 +153,9 @@ type UpdateAccountInput struct {
Credentials map[string]any Credentials map[string]any
Extra map[string]any Extra map[string]any
ProxyID *int64 ProxyID *int64
Concurrency *int // 使用指针区分"未提供"和"设置为0" Concurrency *int // 使用指针区分"未提供"和"设置为0"
Priority *int // 使用指针区分"未提供"和"设置为0" Priority *int // 使用指针区分"未提供"和"设置为0"
RateMultiplier *float64 // 账号计费倍率(>=0,允许 0)
Status string Status string
GroupIDs *[]int64 GroupIDs *[]int64
ExpiresAt *int64 ExpiresAt *int64
...@@ -162,16 +165,17 @@ type UpdateAccountInput struct { ...@@ -162,16 +165,17 @@ type UpdateAccountInput struct {
// BulkUpdateAccountsInput describes the payload for bulk updating accounts. // BulkUpdateAccountsInput describes the payload for bulk updating accounts.
type BulkUpdateAccountsInput struct { type BulkUpdateAccountsInput struct {
AccountIDs []int64 AccountIDs []int64
Name string Name string
ProxyID *int64 ProxyID *int64
Concurrency *int Concurrency *int
Priority *int Priority *int
Status string RateMultiplier *float64 // 账号计费倍率(>=0,允许 0)
Schedulable *bool Status string
GroupIDs *[]int64 Schedulable *bool
Credentials map[string]any GroupIDs *[]int64
Extra map[string]any Credentials map[string]any
Extra map[string]any
// SkipMixedChannelCheck skips the mixed channel risk check when binding groups. // SkipMixedChannelCheck skips the mixed channel risk check when binding groups.
// This should only be set when the caller has explicitly confirmed the risk. // This should only be set when the caller has explicitly confirmed the risk.
SkipMixedChannelCheck bool SkipMixedChannelCheck bool
...@@ -220,6 +224,16 @@ type GenerateRedeemCodesInput struct { ...@@ -220,6 +224,16 @@ type GenerateRedeemCodesInput struct {
ValidityDays int // 订阅类型专用:有效天数 ValidityDays int // 订阅类型专用:有效天数
} }
type ProxyBatchDeleteResult struct {
DeletedIDs []int64 `json:"deleted_ids"`
Skipped []ProxyBatchDeleteSkipped `json:"skipped"`
}
type ProxyBatchDeleteSkipped struct {
ID int64 `json:"id"`
Reason string `json:"reason"`
}
// ProxyTestResult represents the result of testing a proxy // ProxyTestResult represents the result of testing a proxy
type ProxyTestResult struct { type ProxyTestResult struct {
Success bool `json:"success"` Success bool `json:"success"`
...@@ -254,6 +268,7 @@ type adminServiceImpl struct { ...@@ -254,6 +268,7 @@ type adminServiceImpl struct {
redeemCodeRepo RedeemCodeRepository redeemCodeRepo RedeemCodeRepository
billingCacheService *BillingCacheService billingCacheService *BillingCacheService
proxyProber ProxyExitInfoProber proxyProber ProxyExitInfoProber
proxyLatencyCache ProxyLatencyCache
authCacheInvalidator APIKeyAuthCacheInvalidator authCacheInvalidator APIKeyAuthCacheInvalidator
} }
...@@ -267,6 +282,7 @@ func NewAdminService( ...@@ -267,6 +282,7 @@ func NewAdminService(
redeemCodeRepo RedeemCodeRepository, redeemCodeRepo RedeemCodeRepository,
billingCacheService *BillingCacheService, billingCacheService *BillingCacheService,
proxyProber ProxyExitInfoProber, proxyProber ProxyExitInfoProber,
proxyLatencyCache ProxyLatencyCache,
authCacheInvalidator APIKeyAuthCacheInvalidator, authCacheInvalidator APIKeyAuthCacheInvalidator,
) AdminService { ) AdminService {
return &adminServiceImpl{ return &adminServiceImpl{
...@@ -278,6 +294,7 @@ func NewAdminService( ...@@ -278,6 +294,7 @@ func NewAdminService(
redeemCodeRepo: redeemCodeRepo, redeemCodeRepo: redeemCodeRepo,
billingCacheService: billingCacheService, billingCacheService: billingCacheService,
proxyProber: proxyProber, proxyProber: proxyProber,
proxyLatencyCache: proxyLatencyCache,
authCacheInvalidator: authCacheInvalidator, authCacheInvalidator: authCacheInvalidator,
} }
} }
...@@ -817,6 +834,12 @@ func (s *adminServiceImpl) CreateAccount(ctx context.Context, input *CreateAccou ...@@ -817,6 +834,12 @@ func (s *adminServiceImpl) CreateAccount(ctx context.Context, input *CreateAccou
} else { } else {
account.AutoPauseOnExpired = true account.AutoPauseOnExpired = true
} }
if input.RateMultiplier != nil {
if *input.RateMultiplier < 0 {
return nil, errors.New("rate_multiplier must be >= 0")
}
account.RateMultiplier = input.RateMultiplier
}
if err := s.accountRepo.Create(ctx, account); err != nil { if err := s.accountRepo.Create(ctx, account); err != nil {
return nil, err return nil, err
} }
...@@ -869,6 +892,12 @@ func (s *adminServiceImpl) UpdateAccount(ctx context.Context, id int64, input *U ...@@ -869,6 +892,12 @@ func (s *adminServiceImpl) UpdateAccount(ctx context.Context, id int64, input *U
if input.Priority != nil { if input.Priority != nil {
account.Priority = *input.Priority account.Priority = *input.Priority
} }
if input.RateMultiplier != nil {
if *input.RateMultiplier < 0 {
return nil, errors.New("rate_multiplier must be >= 0")
}
account.RateMultiplier = input.RateMultiplier
}
if input.Status != "" { if input.Status != "" {
account.Status = input.Status account.Status = input.Status
} }
...@@ -942,6 +971,12 @@ func (s *adminServiceImpl) BulkUpdateAccounts(ctx context.Context, input *BulkUp ...@@ -942,6 +971,12 @@ func (s *adminServiceImpl) BulkUpdateAccounts(ctx context.Context, input *BulkUp
} }
} }
if input.RateMultiplier != nil {
if *input.RateMultiplier < 0 {
return nil, errors.New("rate_multiplier must be >= 0")
}
}
// Prepare bulk updates for columns and JSONB fields. // Prepare bulk updates for columns and JSONB fields.
repoUpdates := AccountBulkUpdate{ repoUpdates := AccountBulkUpdate{
Credentials: input.Credentials, Credentials: input.Credentials,
...@@ -959,6 +994,9 @@ func (s *adminServiceImpl) BulkUpdateAccounts(ctx context.Context, input *BulkUp ...@@ -959,6 +994,9 @@ func (s *adminServiceImpl) BulkUpdateAccounts(ctx context.Context, input *BulkUp
if input.Priority != nil { if input.Priority != nil {
repoUpdates.Priority = input.Priority repoUpdates.Priority = input.Priority
} }
if input.RateMultiplier != nil {
repoUpdates.RateMultiplier = input.RateMultiplier
}
if input.Status != "" { if input.Status != "" {
repoUpdates.Status = &input.Status repoUpdates.Status = &input.Status
} }
...@@ -1069,6 +1107,7 @@ func (s *adminServiceImpl) ListProxiesWithAccountCount(ctx context.Context, page ...@@ -1069,6 +1107,7 @@ func (s *adminServiceImpl) ListProxiesWithAccountCount(ctx context.Context, page
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
s.attachProxyLatency(ctx, proxies)
return proxies, result.Total, nil return proxies, result.Total, nil
} }
...@@ -1077,7 +1116,12 @@ func (s *adminServiceImpl) GetAllProxies(ctx context.Context) ([]Proxy, error) { ...@@ -1077,7 +1116,12 @@ func (s *adminServiceImpl) GetAllProxies(ctx context.Context) ([]Proxy, error) {
} }
func (s *adminServiceImpl) GetAllProxiesWithAccountCount(ctx context.Context) ([]ProxyWithAccountCount, error) { func (s *adminServiceImpl) GetAllProxiesWithAccountCount(ctx context.Context) ([]ProxyWithAccountCount, error) {
return s.proxyRepo.ListActiveWithAccountCount(ctx) proxies, err := s.proxyRepo.ListActiveWithAccountCount(ctx)
if err != nil {
return nil, err
}
s.attachProxyLatency(ctx, proxies)
return proxies, nil
} }
func (s *adminServiceImpl) GetProxy(ctx context.Context, id int64) (*Proxy, error) { func (s *adminServiceImpl) GetProxy(ctx context.Context, id int64) (*Proxy, error) {
...@@ -1097,6 +1141,8 @@ func (s *adminServiceImpl) CreateProxy(ctx context.Context, input *CreateProxyIn ...@@ -1097,6 +1141,8 @@ func (s *adminServiceImpl) CreateProxy(ctx context.Context, input *CreateProxyIn
if err := s.proxyRepo.Create(ctx, proxy); err != nil { if err := s.proxyRepo.Create(ctx, proxy); err != nil {
return nil, err return nil, err
} }
// Probe latency asynchronously so creation isn't blocked by network timeout.
go s.probeProxyLatency(context.Background(), proxy)
return proxy, nil return proxy, nil
} }
...@@ -1135,12 +1181,53 @@ func (s *adminServiceImpl) UpdateProxy(ctx context.Context, id int64, input *Upd ...@@ -1135,12 +1181,53 @@ func (s *adminServiceImpl) UpdateProxy(ctx context.Context, id int64, input *Upd
} }
func (s *adminServiceImpl) DeleteProxy(ctx context.Context, id int64) error { func (s *adminServiceImpl) DeleteProxy(ctx context.Context, id int64) error {
count, err := s.proxyRepo.CountAccountsByProxyID(ctx, id)
if err != nil {
return err
}
if count > 0 {
return ErrProxyInUse
}
return s.proxyRepo.Delete(ctx, id) return s.proxyRepo.Delete(ctx, id)
} }
func (s *adminServiceImpl) GetProxyAccounts(ctx context.Context, proxyID int64, page, pageSize int) ([]Account, int64, error) { func (s *adminServiceImpl) BatchDeleteProxies(ctx context.Context, ids []int64) (*ProxyBatchDeleteResult, error) {
// Return mock data for now - would need a dedicated repository method result := &ProxyBatchDeleteResult{}
return []Account{}, 0, nil if len(ids) == 0 {
return result, nil
}
for _, id := range ids {
count, err := s.proxyRepo.CountAccountsByProxyID(ctx, id)
if err != nil {
result.Skipped = append(result.Skipped, ProxyBatchDeleteSkipped{
ID: id,
Reason: err.Error(),
})
continue
}
if count > 0 {
result.Skipped = append(result.Skipped, ProxyBatchDeleteSkipped{
ID: id,
Reason: ErrProxyInUse.Error(),
})
continue
}
if err := s.proxyRepo.Delete(ctx, id); err != nil {
result.Skipped = append(result.Skipped, ProxyBatchDeleteSkipped{
ID: id,
Reason: err.Error(),
})
continue
}
result.DeletedIDs = append(result.DeletedIDs, id)
}
return result, nil
}
func (s *adminServiceImpl) GetProxyAccounts(ctx context.Context, proxyID int64) ([]ProxyAccountSummary, error) {
return s.proxyRepo.ListAccountSummariesByProxyID(ctx, proxyID)
} }
func (s *adminServiceImpl) CheckProxyExists(ctx context.Context, host string, port int, username, password string) (bool, error) { func (s *adminServiceImpl) CheckProxyExists(ctx context.Context, host string, port int, username, password string) (bool, error) {
...@@ -1240,12 +1327,24 @@ func (s *adminServiceImpl) TestProxy(ctx context.Context, id int64) (*ProxyTestR ...@@ -1240,12 +1327,24 @@ func (s *adminServiceImpl) TestProxy(ctx context.Context, id int64) (*ProxyTestR
proxyURL := proxy.URL() proxyURL := proxy.URL()
exitInfo, latencyMs, err := s.proxyProber.ProbeProxy(ctx, proxyURL) exitInfo, latencyMs, err := s.proxyProber.ProbeProxy(ctx, proxyURL)
if err != nil { if err != nil {
s.saveProxyLatency(ctx, id, &ProxyLatencyInfo{
Success: false,
Message: err.Error(),
UpdatedAt: time.Now(),
})
return &ProxyTestResult{ return &ProxyTestResult{
Success: false, Success: false,
Message: err.Error(), Message: err.Error(),
}, nil }, nil
} }
latency := latencyMs
s.saveProxyLatency(ctx, id, &ProxyLatencyInfo{
Success: true,
LatencyMs: &latency,
Message: "Proxy is accessible",
UpdatedAt: time.Now(),
})
return &ProxyTestResult{ return &ProxyTestResult{
Success: true, Success: true,
Message: "Proxy is accessible", Message: "Proxy is accessible",
...@@ -1257,6 +1356,29 @@ func (s *adminServiceImpl) TestProxy(ctx context.Context, id int64) (*ProxyTestR ...@@ -1257,6 +1356,29 @@ func (s *adminServiceImpl) TestProxy(ctx context.Context, id int64) (*ProxyTestR
}, nil }, nil
} }
func (s *adminServiceImpl) probeProxyLatency(ctx context.Context, proxy *Proxy) {
if s.proxyProber == nil || proxy == nil {
return
}
_, latencyMs, err := s.proxyProber.ProbeProxy(ctx, proxy.URL())
if err != nil {
s.saveProxyLatency(ctx, proxy.ID, &ProxyLatencyInfo{
Success: false,
Message: err.Error(),
UpdatedAt: time.Now(),
})
return
}
latency := latencyMs
s.saveProxyLatency(ctx, proxy.ID, &ProxyLatencyInfo{
Success: true,
LatencyMs: &latency,
Message: "Proxy is accessible",
UpdatedAt: time.Now(),
})
}
// checkMixedChannelRisk 检查分组中是否存在混合渠道(Antigravity + Anthropic) // checkMixedChannelRisk 检查分组中是否存在混合渠道(Antigravity + Anthropic)
// 如果存在混合,返回错误提示用户确认 // 如果存在混合,返回错误提示用户确认
func (s *adminServiceImpl) checkMixedChannelRisk(ctx context.Context, currentAccountID int64, currentAccountPlatform string, groupIDs []int64) error { func (s *adminServiceImpl) checkMixedChannelRisk(ctx context.Context, currentAccountID int64, currentAccountPlatform string, groupIDs []int64) error {
...@@ -1306,6 +1428,46 @@ func (s *adminServiceImpl) checkMixedChannelRisk(ctx context.Context, currentAcc ...@@ -1306,6 +1428,46 @@ func (s *adminServiceImpl) checkMixedChannelRisk(ctx context.Context, currentAcc
return nil return nil
} }
func (s *adminServiceImpl) attachProxyLatency(ctx context.Context, proxies []ProxyWithAccountCount) {
if s.proxyLatencyCache == nil || len(proxies) == 0 {
return
}
ids := make([]int64, 0, len(proxies))
for i := range proxies {
ids = append(ids, proxies[i].ID)
}
latencies, err := s.proxyLatencyCache.GetProxyLatencies(ctx, ids)
if err != nil {
log.Printf("Warning: load proxy latency cache failed: %v", err)
return
}
for i := range proxies {
info := latencies[proxies[i].ID]
if info == nil {
continue
}
if info.Success {
proxies[i].LatencyStatus = "success"
proxies[i].LatencyMs = info.LatencyMs
} else {
proxies[i].LatencyStatus = "failed"
}
proxies[i].LatencyMessage = info.Message
}
}
func (s *adminServiceImpl) saveProxyLatency(ctx context.Context, proxyID int64, info *ProxyLatencyInfo) {
if s.proxyLatencyCache == nil || info == nil {
return
}
if err := s.proxyLatencyCache.SetProxyLatency(ctx, proxyID, info); err != nil {
log.Printf("Warning: store proxy latency cache failed: %v", err)
}
}
// getAccountPlatform 根据账号 platform 判断混合渠道检查用的平台标识 // getAccountPlatform 根据账号 platform 判断混合渠道检查用的平台标识
func getAccountPlatform(accountPlatform string) string { func getAccountPlatform(accountPlatform string) string {
switch strings.ToLower(strings.TrimSpace(accountPlatform)) { switch strings.ToLower(strings.TrimSpace(accountPlatform)) {
......
...@@ -12,9 +12,9 @@ import ( ...@@ -12,9 +12,9 @@ import (
type accountRepoStubForBulkUpdate struct { type accountRepoStubForBulkUpdate struct {
accountRepoStub accountRepoStub
bulkUpdateErr error bulkUpdateErr error
bulkUpdateIDs []int64 bulkUpdateIDs []int64
bindGroupErrByID map[int64]error bindGroupErrByID map[int64]error
} }
func (s *accountRepoStubForBulkUpdate) BulkUpdate(_ context.Context, ids []int64, _ AccountBulkUpdate) (int64, error) { func (s *accountRepoStubForBulkUpdate) BulkUpdate(_ context.Context, ids []int64, _ AccountBulkUpdate) (int64, error) {
......
...@@ -153,8 +153,10 @@ func (s *groupRepoStub) DeleteAccountGroupsByGroupID(ctx context.Context, groupI ...@@ -153,8 +153,10 @@ func (s *groupRepoStub) DeleteAccountGroupsByGroupID(ctx context.Context, groupI
} }
type proxyRepoStub struct { type proxyRepoStub struct {
deleteErr error deleteErr error
deletedIDs []int64 countErr error
accountCount int64
deletedIDs []int64
} }
func (s *proxyRepoStub) Create(ctx context.Context, proxy *Proxy) error { func (s *proxyRepoStub) Create(ctx context.Context, proxy *Proxy) error {
...@@ -199,7 +201,14 @@ func (s *proxyRepoStub) ExistsByHostPortAuth(ctx context.Context, host string, p ...@@ -199,7 +201,14 @@ func (s *proxyRepoStub) ExistsByHostPortAuth(ctx context.Context, host string, p
} }
func (s *proxyRepoStub) CountAccountsByProxyID(ctx context.Context, proxyID int64) (int64, error) { func (s *proxyRepoStub) CountAccountsByProxyID(ctx context.Context, proxyID int64) (int64, error) {
panic("unexpected CountAccountsByProxyID call") if s.countErr != nil {
return 0, s.countErr
}
return s.accountCount, nil
}
func (s *proxyRepoStub) ListAccountSummariesByProxyID(ctx context.Context, proxyID int64) ([]ProxyAccountSummary, error) {
panic("unexpected ListAccountSummariesByProxyID call")
} }
type redeemRepoStub struct { type redeemRepoStub struct {
...@@ -409,6 +418,15 @@ func TestAdminService_DeleteProxy_Idempotent(t *testing.T) { ...@@ -409,6 +418,15 @@ func TestAdminService_DeleteProxy_Idempotent(t *testing.T) {
require.Equal(t, []int64{404}, repo.deletedIDs) require.Equal(t, []int64{404}, repo.deletedIDs)
} }
func TestAdminService_DeleteProxy_InUse(t *testing.T) {
repo := &proxyRepoStub{accountCount: 2}
svc := &adminServiceImpl{proxyRepo: repo}
err := svc.DeleteProxy(context.Background(), 77)
require.ErrorIs(t, err, ErrProxyInUse)
require.Empty(t, repo.deletedIDs)
}
func TestAdminService_DeleteProxy_Error(t *testing.T) { func TestAdminService_DeleteProxy_Error(t *testing.T) {
deleteErr := errors.New("delete failed") deleteErr := errors.New("delete failed")
repo := &proxyRepoStub{deleteErr: deleteErr} repo := &proxyRepoStub{deleteErr: deleteErr}
......
...@@ -564,6 +564,10 @@ urlFallbackLoop: ...@@ -564,6 +564,10 @@ urlFallbackLoop:
} }
upstreamReq, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, action, accessToken, geminiBody) upstreamReq, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, action, accessToken, geminiBody)
// Capture upstream request body for ops retry of this attempt.
if c != nil {
c.Set(OpsUpstreamRequestBodyKey, string(geminiBody))
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -574,6 +578,7 @@ urlFallbackLoop: ...@@ -574,6 +578,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0, UpstreamStatusCode: 0,
Kind: "request_error", Kind: "request_error",
Message: safeErr, Message: safeErr,
...@@ -615,6 +620,7 @@ urlFallbackLoop: ...@@ -615,6 +620,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry", Kind: "retry",
...@@ -645,6 +651,7 @@ urlFallbackLoop: ...@@ -645,6 +651,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry", Kind: "retry",
...@@ -697,6 +704,7 @@ urlFallbackLoop: ...@@ -697,6 +704,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "signature_error", Kind: "signature_error",
...@@ -740,6 +748,7 @@ urlFallbackLoop: ...@@ -740,6 +748,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0, UpstreamStatusCode: 0,
Kind: "signature_retry_request_error", Kind: "signature_retry_request_error",
Message: sanitizeUpstreamErrorMessage(retryErr.Error()), Message: sanitizeUpstreamErrorMessage(retryErr.Error()),
...@@ -770,6 +779,7 @@ urlFallbackLoop: ...@@ -770,6 +779,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: retryResp.StatusCode, UpstreamStatusCode: retryResp.StatusCode,
UpstreamRequestID: retryResp.Header.Get("x-request-id"), UpstreamRequestID: retryResp.Header.Get("x-request-id"),
Kind: kind, Kind: kind,
...@@ -817,6 +827,7 @@ urlFallbackLoop: ...@@ -817,6 +827,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover", Kind: "failover",
...@@ -1371,6 +1382,7 @@ urlFallbackLoop: ...@@ -1371,6 +1382,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0, UpstreamStatusCode: 0,
Kind: "request_error", Kind: "request_error",
Message: safeErr, Message: safeErr,
...@@ -1412,6 +1424,7 @@ urlFallbackLoop: ...@@ -1412,6 +1424,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry", Kind: "retry",
...@@ -1442,6 +1455,7 @@ urlFallbackLoop: ...@@ -1442,6 +1455,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry", Kind: "retry",
...@@ -1543,6 +1557,7 @@ urlFallbackLoop: ...@@ -1543,6 +1557,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID, UpstreamRequestID: requestID,
Kind: "failover", Kind: "failover",
...@@ -1559,6 +1574,7 @@ urlFallbackLoop: ...@@ -1559,6 +1574,7 @@ urlFallbackLoop:
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID, UpstreamRequestID: requestID,
Kind: "http_error", Kind: "http_error",
...@@ -2039,6 +2055,7 @@ func (s *AntigravityGatewayService) writeMappedClaudeError(c *gin.Context, accou ...@@ -2039,6 +2055,7 @@ func (s *AntigravityGatewayService) writeMappedClaudeError(c *gin.Context, accou
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: upstreamStatus, UpstreamStatusCode: upstreamStatus,
UpstreamRequestID: upstreamRequestID, UpstreamRequestID: upstreamRequestID,
Kind: "http_error", Kind: "http_error",
......
...@@ -124,16 +124,16 @@ func (s *DashboardService) GetDashboardStats(ctx context.Context) (*usagestats.D ...@@ -124,16 +124,16 @@ func (s *DashboardService) GetDashboardStats(ctx context.Context) (*usagestats.D
return stats, nil return stats, nil
} }
func (s *DashboardService) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID int64) ([]usagestats.TrendDataPoint, error) { func (s *DashboardService) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool) ([]usagestats.TrendDataPoint, error) {
trend, err := s.usageRepo.GetUsageTrendWithFilters(ctx, startTime, endTime, granularity, userID, apiKeyID) trend, err := s.usageRepo.GetUsageTrendWithFilters(ctx, startTime, endTime, granularity, userID, apiKeyID, accountID, groupID, model, stream)
if err != nil { if err != nil {
return nil, fmt.Errorf("get usage trend with filters: %w", err) return nil, fmt.Errorf("get usage trend with filters: %w", err)
} }
return trend, nil return trend, nil
} }
func (s *DashboardService) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID int64) ([]usagestats.ModelStat, error) { func (s *DashboardService) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool) ([]usagestats.ModelStat, error) {
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, startTime, endTime, userID, apiKeyID, 0) stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, startTime, endTime, userID, apiKeyID, accountID, groupID, stream)
if err != nil { if err != nil {
return nil, fmt.Errorf("get model stats with filters: %w", err) return nil, fmt.Errorf("get model stats with filters: %w", err)
} }
......
...@@ -1466,6 +1466,9 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A ...@@ -1466,6 +1466,9 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
for attempt := 1; attempt <= maxRetryAttempts; attempt++ { for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
// 构建上游请求(每次重试需要重新构建,因为请求体需要重新读取) // 构建上游请求(每次重试需要重新构建,因为请求体需要重新读取)
upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, body, token, tokenType, reqModel) upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, body, token, tokenType, reqModel)
// Capture upstream request body for ops retry of this attempt.
c.Set(OpsUpstreamRequestBodyKey, string(body))
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -1482,6 +1485,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A ...@@ -1482,6 +1485,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0, UpstreamStatusCode: 0,
Kind: "request_error", Kind: "request_error",
Message: safeErr, Message: safeErr,
...@@ -1506,6 +1510,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A ...@@ -1506,6 +1510,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "signature_error", Kind: "signature_error",
...@@ -1557,6 +1562,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A ...@@ -1557,6 +1562,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: retryResp.StatusCode, UpstreamStatusCode: retryResp.StatusCode,
UpstreamRequestID: retryResp.Header.Get("x-request-id"), UpstreamRequestID: retryResp.Header.Get("x-request-id"),
Kind: "signature_retry_thinking", Kind: "signature_retry_thinking",
...@@ -1585,6 +1591,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A ...@@ -1585,6 +1591,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0, UpstreamStatusCode: 0,
Kind: "signature_retry_tools_request_error", Kind: "signature_retry_tools_request_error",
Message: sanitizeUpstreamErrorMessage(retryErr2.Error()), Message: sanitizeUpstreamErrorMessage(retryErr2.Error()),
...@@ -1643,6 +1650,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A ...@@ -1643,6 +1650,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry", Kind: "retry",
...@@ -1691,6 +1699,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A ...@@ -1691,6 +1699,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "retry_exhausted_failover", Kind: "retry_exhausted_failover",
...@@ -1757,6 +1766,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A ...@@ -1757,6 +1766,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover_on_400", Kind: "failover_on_400",
...@@ -2634,30 +2644,32 @@ func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInpu ...@@ -2634,30 +2644,32 @@ func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInpu
if result.ImageSize != "" { if result.ImageSize != "" {
imageSize = &result.ImageSize imageSize = &result.ImageSize
} }
accountRateMultiplier := account.BillingRateMultiplier()
usageLog := &UsageLog{ usageLog := &UsageLog{
UserID: user.ID, UserID: user.ID,
APIKeyID: apiKey.ID, APIKeyID: apiKey.ID,
AccountID: account.ID, AccountID: account.ID,
RequestID: result.RequestID, RequestID: result.RequestID,
Model: result.Model, Model: result.Model,
InputTokens: result.Usage.InputTokens, InputTokens: result.Usage.InputTokens,
OutputTokens: result.Usage.OutputTokens, OutputTokens: result.Usage.OutputTokens,
CacheCreationTokens: result.Usage.CacheCreationInputTokens, CacheCreationTokens: result.Usage.CacheCreationInputTokens,
CacheReadTokens: result.Usage.CacheReadInputTokens, CacheReadTokens: result.Usage.CacheReadInputTokens,
InputCost: cost.InputCost, InputCost: cost.InputCost,
OutputCost: cost.OutputCost, OutputCost: cost.OutputCost,
CacheCreationCost: cost.CacheCreationCost, CacheCreationCost: cost.CacheCreationCost,
CacheReadCost: cost.CacheReadCost, CacheReadCost: cost.CacheReadCost,
TotalCost: cost.TotalCost, TotalCost: cost.TotalCost,
ActualCost: cost.ActualCost, ActualCost: cost.ActualCost,
RateMultiplier: multiplier, RateMultiplier: multiplier,
BillingType: billingType, AccountRateMultiplier: &accountRateMultiplier,
Stream: result.Stream, BillingType: billingType,
DurationMs: &durationMs, Stream: result.Stream,
FirstTokenMs: result.FirstTokenMs, DurationMs: &durationMs,
ImageCount: result.ImageCount, FirstTokenMs: result.FirstTokenMs,
ImageSize: imageSize, ImageCount: result.ImageCount,
CreatedAt: time.Now(), ImageSize: imageSize,
CreatedAt: time.Now(),
} }
// 添加 UserAgent // 添加 UserAgent
......
...@@ -545,12 +545,19 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex ...@@ -545,12 +545,19 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
} }
requestIDHeader = idHeader requestIDHeader = idHeader
// Capture upstream request body for ops retry of this attempt.
if c != nil {
// In this code path `body` is already the JSON sent to upstream.
c.Set(OpsUpstreamRequestBodyKey, string(body))
}
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil { if err != nil {
safeErr := sanitizeUpstreamErrorMessage(err.Error()) safeErr := sanitizeUpstreamErrorMessage(err.Error())
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0, UpstreamStatusCode: 0,
Kind: "request_error", Kind: "request_error",
Message: safeErr, Message: safeErr,
...@@ -588,6 +595,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex ...@@ -588,6 +595,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: upstreamReqID, UpstreamRequestID: upstreamReqID,
Kind: "signature_error", Kind: "signature_error",
...@@ -662,6 +670,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex ...@@ -662,6 +670,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: upstreamReqID, UpstreamRequestID: upstreamReqID,
Kind: "retry", Kind: "retry",
...@@ -711,6 +720,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex ...@@ -711,6 +720,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: upstreamReqID, UpstreamRequestID: upstreamReqID,
Kind: "failover", Kind: "failover",
...@@ -737,6 +747,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex ...@@ -737,6 +747,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: upstreamReqID, UpstreamRequestID: upstreamReqID,
Kind: "failover", Kind: "failover",
...@@ -972,12 +983,19 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. ...@@ -972,12 +983,19 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
} }
requestIDHeader = idHeader requestIDHeader = idHeader
// Capture upstream request body for ops retry of this attempt.
if c != nil {
// In this code path `body` is already the JSON sent to upstream.
c.Set(OpsUpstreamRequestBodyKey, string(body))
}
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil { if err != nil {
safeErr := sanitizeUpstreamErrorMessage(err.Error()) safeErr := sanitizeUpstreamErrorMessage(err.Error())
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0, UpstreamStatusCode: 0,
Kind: "request_error", Kind: "request_error",
Message: safeErr, Message: safeErr,
...@@ -1036,6 +1054,7 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. ...@@ -1036,6 +1054,7 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: upstreamReqID, UpstreamRequestID: upstreamReqID,
Kind: "retry", Kind: "retry",
...@@ -1120,6 +1139,7 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. ...@@ -1120,6 +1139,7 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID, UpstreamRequestID: requestID,
Kind: "failover", Kind: "failover",
...@@ -1143,6 +1163,7 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. ...@@ -1143,6 +1163,7 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID, UpstreamRequestID: requestID,
Kind: "failover", Kind: "failover",
...@@ -1168,6 +1189,7 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. ...@@ -1168,6 +1189,7 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID, UpstreamRequestID: requestID,
Kind: "http_error", Kind: "http_error",
...@@ -1300,6 +1322,7 @@ func (s *GeminiMessagesCompatService) writeGeminiMappedError(c *gin.Context, acc ...@@ -1300,6 +1322,7 @@ func (s *GeminiMessagesCompatService) writeGeminiMappedError(c *gin.Context, acc
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: upstreamStatus, UpstreamStatusCode: upstreamStatus,
UpstreamRequestID: upstreamRequestID, UpstreamRequestID: upstreamRequestID,
Kind: "http_error", Kind: "http_error",
......
...@@ -664,6 +664,11 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco ...@@ -664,6 +664,11 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
proxyURL = account.Proxy.URL() proxyURL = account.Proxy.URL()
} }
// Capture upstream request body for ops retry of this attempt.
if c != nil {
c.Set(OpsUpstreamRequestBodyKey, string(body))
}
// Send request // Send request
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil { if err != nil {
...@@ -673,6 +678,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco ...@@ -673,6 +678,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0, UpstreamStatusCode: 0,
Kind: "request_error", Kind: "request_error",
Message: safeErr, Message: safeErr,
...@@ -707,6 +713,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco ...@@ -707,6 +713,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover", Kind: "failover",
...@@ -864,6 +871,7 @@ func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *ht ...@@ -864,6 +871,7 @@ func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *ht
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "http_error", Kind: "http_error",
...@@ -894,6 +902,7 @@ func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *ht ...@@ -894,6 +902,7 @@ func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *ht
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform, Platform: account.Platform,
AccountID: account.ID, AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode, UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"), UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: kind, Kind: kind,
...@@ -1443,28 +1452,30 @@ func (s *OpenAIGatewayService) RecordUsage(ctx context.Context, input *OpenAIRec ...@@ -1443,28 +1452,30 @@ func (s *OpenAIGatewayService) RecordUsage(ctx context.Context, input *OpenAIRec
// Create usage log // Create usage log
durationMs := int(result.Duration.Milliseconds()) durationMs := int(result.Duration.Milliseconds())
accountRateMultiplier := account.BillingRateMultiplier()
usageLog := &UsageLog{ usageLog := &UsageLog{
UserID: user.ID, UserID: user.ID,
APIKeyID: apiKey.ID, APIKeyID: apiKey.ID,
AccountID: account.ID, AccountID: account.ID,
RequestID: result.RequestID, RequestID: result.RequestID,
Model: result.Model, Model: result.Model,
InputTokens: actualInputTokens, InputTokens: actualInputTokens,
OutputTokens: result.Usage.OutputTokens, OutputTokens: result.Usage.OutputTokens,
CacheCreationTokens: result.Usage.CacheCreationInputTokens, CacheCreationTokens: result.Usage.CacheCreationInputTokens,
CacheReadTokens: result.Usage.CacheReadInputTokens, CacheReadTokens: result.Usage.CacheReadInputTokens,
InputCost: cost.InputCost, InputCost: cost.InputCost,
OutputCost: cost.OutputCost, OutputCost: cost.OutputCost,
CacheCreationCost: cost.CacheCreationCost, CacheCreationCost: cost.CacheCreationCost,
CacheReadCost: cost.CacheReadCost, CacheReadCost: cost.CacheReadCost,
TotalCost: cost.TotalCost, TotalCost: cost.TotalCost,
ActualCost: cost.ActualCost, ActualCost: cost.ActualCost,
RateMultiplier: multiplier, RateMultiplier: multiplier,
BillingType: billingType, AccountRateMultiplier: &accountRateMultiplier,
Stream: result.Stream, BillingType: billingType,
DurationMs: &durationMs, Stream: result.Stream,
FirstTokenMs: result.FirstTokenMs, DurationMs: &durationMs,
CreatedAt: time.Now(), FirstTokenMs: result.FirstTokenMs,
CreatedAt: time.Now(),
} }
// 添加 UserAgent // 添加 UserAgent
......
...@@ -206,7 +206,7 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) { ...@@ -206,7 +206,7 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) {
continue continue
} }
scopePlatform, scopeGroupID := parseOpsAlertRuleScope(rule.Filters) scopePlatform, scopeGroupID, scopeRegion := parseOpsAlertRuleScope(rule.Filters)
windowMinutes := rule.WindowMinutes windowMinutes := rule.WindowMinutes
if windowMinutes <= 0 { if windowMinutes <= 0 {
...@@ -236,6 +236,17 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) { ...@@ -236,6 +236,17 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) {
continue continue
} }
// Scoped silencing: if a matching silence exists, skip creating a firing event.
if s.opsService != nil {
platform := strings.TrimSpace(scopePlatform)
region := scopeRegion
if platform != "" {
if ok, err := s.opsService.IsAlertSilenced(ctx, rule.ID, platform, scopeGroupID, region, now); err == nil && ok {
continue
}
}
}
latestEvent, err := s.opsRepo.GetLatestAlertEvent(ctx, rule.ID) latestEvent, err := s.opsRepo.GetLatestAlertEvent(ctx, rule.ID)
if err != nil { if err != nil {
log.Printf("[OpsAlertEvaluator] get latest event failed (rule=%d): %v", rule.ID, err) log.Printf("[OpsAlertEvaluator] get latest event failed (rule=%d): %v", rule.ID, err)
...@@ -359,9 +370,9 @@ func requiredSustainedBreaches(sustainedMinutes int, interval time.Duration) int ...@@ -359,9 +370,9 @@ func requiredSustainedBreaches(sustainedMinutes int, interval time.Duration) int
return required return required
} }
func parseOpsAlertRuleScope(filters map[string]any) (platform string, groupID *int64) { func parseOpsAlertRuleScope(filters map[string]any) (platform string, groupID *int64, region *string) {
if filters == nil { if filters == nil {
return "", nil return "", nil, nil
} }
if v, ok := filters["platform"]; ok { if v, ok := filters["platform"]; ok {
if s, ok := v.(string); ok { if s, ok := v.(string); ok {
...@@ -392,7 +403,15 @@ func parseOpsAlertRuleScope(filters map[string]any) (platform string, groupID *i ...@@ -392,7 +403,15 @@ func parseOpsAlertRuleScope(filters map[string]any) (platform string, groupID *i
} }
} }
} }
return platform, groupID if v, ok := filters["region"]; ok {
if s, ok := v.(string); ok {
vv := strings.TrimSpace(s)
if vv != "" {
region = &vv
}
}
}
return platform, groupID, region
} }
func (s *OpsAlertEvaluatorService) computeRuleMetric( func (s *OpsAlertEvaluatorService) computeRuleMetric(
...@@ -504,16 +523,6 @@ func (s *OpsAlertEvaluatorService) computeRuleMetric( ...@@ -504,16 +523,6 @@ func (s *OpsAlertEvaluatorService) computeRuleMetric(
return 0, false return 0, false
} }
return overview.UpstreamErrorRate * 100, true return overview.UpstreamErrorRate * 100, true
case "p95_latency_ms":
if overview.Duration.P95 == nil {
return 0, false
}
return float64(*overview.Duration.P95), true
case "p99_latency_ms":
if overview.Duration.P99 == nil {
return 0, false
}
return float64(*overview.Duration.P99), true
default: default:
return 0, false return 0, false
} }
......
...@@ -8,8 +8,9 @@ import "time" ...@@ -8,8 +8,9 @@ import "time"
// with the existing ops dashboard frontend (backup style). // with the existing ops dashboard frontend (backup style).
const ( const (
OpsAlertStatusFiring = "firing" OpsAlertStatusFiring = "firing"
OpsAlertStatusResolved = "resolved" OpsAlertStatusResolved = "resolved"
OpsAlertStatusManualResolved = "manual_resolved"
) )
type OpsAlertRule struct { type OpsAlertRule struct {
...@@ -58,12 +59,32 @@ type OpsAlertEvent struct { ...@@ -58,12 +59,32 @@ type OpsAlertEvent struct {
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
} }
type OpsAlertSilence struct {
ID int64 `json:"id"`
RuleID int64 `json:"rule_id"`
Platform string `json:"platform"`
GroupID *int64 `json:"group_id,omitempty"`
Region *string `json:"region,omitempty"`
Until time.Time `json:"until"`
Reason string `json:"reason"`
CreatedBy *int64 `json:"created_by,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
type OpsAlertEventFilter struct { type OpsAlertEventFilter struct {
Limit int Limit int
// Cursor pagination (descending by fired_at, then id).
BeforeFiredAt *time.Time
BeforeID *int64
// Optional filters. // Optional filters.
Status string Status string
Severity string Severity string
EmailSent *bool
StartTime *time.Time StartTime *time.Time
EndTime *time.Time EndTime *time.Time
......
...@@ -88,6 +88,29 @@ func (s *OpsService) ListAlertEvents(ctx context.Context, filter *OpsAlertEventF ...@@ -88,6 +88,29 @@ func (s *OpsService) ListAlertEvents(ctx context.Context, filter *OpsAlertEventF
return s.opsRepo.ListAlertEvents(ctx, filter) return s.opsRepo.ListAlertEvents(ctx, filter)
} }
func (s *OpsService) GetAlertEventByID(ctx context.Context, eventID int64) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if eventID <= 0 {
return nil, infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id")
}
ev, err := s.opsRepo.GetAlertEventByID(ctx, eventID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, infraerrors.NotFound("OPS_ALERT_EVENT_NOT_FOUND", "alert event not found")
}
return nil, err
}
if ev == nil {
return nil, infraerrors.NotFound("OPS_ALERT_EVENT_NOT_FOUND", "alert event not found")
}
return ev, nil
}
func (s *OpsService) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) { func (s *OpsService) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil { if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err return nil, err
...@@ -101,6 +124,49 @@ func (s *OpsService) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*Op ...@@ -101,6 +124,49 @@ func (s *OpsService) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*Op
return s.opsRepo.GetActiveAlertEvent(ctx, ruleID) return s.opsRepo.GetActiveAlertEvent(ctx, ruleID)
} }
func (s *OpsService) CreateAlertSilence(ctx context.Context, input *OpsAlertSilence) (*OpsAlertSilence, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if input == nil {
return nil, infraerrors.BadRequest("INVALID_SILENCE", "invalid silence")
}
if input.RuleID <= 0 {
return nil, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
if strings.TrimSpace(input.Platform) == "" {
return nil, infraerrors.BadRequest("INVALID_PLATFORM", "invalid platform")
}
if input.Until.IsZero() {
return nil, infraerrors.BadRequest("INVALID_UNTIL", "invalid until")
}
created, err := s.opsRepo.CreateAlertSilence(ctx, input)
if err != nil {
return nil, err
}
return created, nil
}
func (s *OpsService) IsAlertSilenced(ctx context.Context, ruleID int64, platform string, groupID *int64, region *string, now time.Time) (bool, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return false, err
}
if s.opsRepo == nil {
return false, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if ruleID <= 0 {
return false, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
if strings.TrimSpace(platform) == "" {
return false, nil
}
return s.opsRepo.IsAlertSilenced(ctx, ruleID, platform, groupID, region, now)
}
func (s *OpsService) GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) { func (s *OpsService) GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil { if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err return nil, err
...@@ -142,7 +208,11 @@ func (s *OpsService) UpdateAlertEventStatus(ctx context.Context, eventID int64, ...@@ -142,7 +208,11 @@ func (s *OpsService) UpdateAlertEventStatus(ctx context.Context, eventID int64,
if eventID <= 0 { if eventID <= 0 {
return infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id") return infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id")
} }
if strings.TrimSpace(status) == "" { status = strings.TrimSpace(status)
if status == "" {
return infraerrors.BadRequest("INVALID_STATUS", "invalid status")
}
if status != OpsAlertStatusResolved && status != OpsAlertStatusManualResolved {
return infraerrors.BadRequest("INVALID_STATUS", "invalid status") return infraerrors.BadRequest("INVALID_STATUS", "invalid status")
} }
return s.opsRepo.UpdateAlertEventStatus(ctx, eventID, status, resolvedAt) return s.opsRepo.UpdateAlertEventStatus(ctx, eventID, status, resolvedAt)
......
...@@ -32,49 +32,38 @@ func computeDashboardHealthScore(now time.Time, overview *OpsDashboardOverview) ...@@ -32,49 +32,38 @@ func computeDashboardHealthScore(now time.Time, overview *OpsDashboardOverview)
} }
// computeBusinessHealth calculates business health score (0-100) // computeBusinessHealth calculates business health score (0-100)
// Components: SLA (50%) + Error Rate (30%) + Latency (20%) // Components: Error Rate (50%) + TTFT (50%)
func computeBusinessHealth(overview *OpsDashboardOverview) float64 { func computeBusinessHealth(overview *OpsDashboardOverview) float64 {
// SLA score: 99.5% → 100, 95% → 0 (linear) // Error rate score: 1% → 100, 10% → 0 (linear)
slaScore := 100.0
slaPct := clampFloat64(overview.SLA*100, 0, 100)
if slaPct < 99.5 {
if slaPct >= 95 {
slaScore = (slaPct - 95) / 4.5 * 100
} else {
slaScore = 0
}
}
// Error rate score: 0.5% → 100, 5% → 0 (linear)
// Combines request errors and upstream errors // Combines request errors and upstream errors
errorScore := 100.0 errorScore := 100.0
errorPct := clampFloat64(overview.ErrorRate*100, 0, 100) errorPct := clampFloat64(overview.ErrorRate*100, 0, 100)
upstreamPct := clampFloat64(overview.UpstreamErrorRate*100, 0, 100) upstreamPct := clampFloat64(overview.UpstreamErrorRate*100, 0, 100)
combinedErrorPct := math.Max(errorPct, upstreamPct) // Use worst case combinedErrorPct := math.Max(errorPct, upstreamPct) // Use worst case
if combinedErrorPct > 0.5 { if combinedErrorPct > 1.0 {
if combinedErrorPct <= 5 { if combinedErrorPct <= 10.0 {
errorScore = (5 - combinedErrorPct) / 4.5 * 100 errorScore = (10.0 - combinedErrorPct) / 9.0 * 100
} else { } else {
errorScore = 0 errorScore = 0
} }
} }
// Latency score: 1s → 100, 10s → 0 (linear) // TTFT score: 1s → 100, 3s → 0 (linear)
// Uses P99 of duration (TTFT is less critical for overall health) // Time to first token is critical for user experience
latencyScore := 100.0 ttftScore := 100.0
if overview.Duration.P99 != nil { if overview.TTFT.P99 != nil {
p99 := float64(*overview.Duration.P99) p99 := float64(*overview.TTFT.P99)
if p99 > 1000 { if p99 > 1000 {
if p99 <= 10000 { if p99 <= 3000 {
latencyScore = (10000 - p99) / 9000 * 100 ttftScore = (3000 - p99) / 2000 * 100
} else { } else {
latencyScore = 0 ttftScore = 0
} }
} }
} }
// Weighted combination // Weighted combination: 50% error rate + 50% TTFT
return slaScore*0.5 + errorScore*0.3 + latencyScore*0.2 return errorScore*0.5 + ttftScore*0.5
} }
// computeInfraHealth calculates infrastructure health score (0-100) // computeInfraHealth calculates infrastructure health score (0-100)
......
...@@ -127,8 +127,8 @@ func TestComputeDashboardHealthScore_Comprehensive(t *testing.T) { ...@@ -127,8 +127,8 @@ func TestComputeDashboardHealthScore_Comprehensive(t *testing.T) {
MemoryUsagePercent: float64Ptr(75), MemoryUsagePercent: float64Ptr(75),
}, },
}, },
wantMin: 60, wantMin: 96,
wantMax: 85, wantMax: 97,
}, },
{ {
name: "DB failure", name: "DB failure",
...@@ -203,8 +203,8 @@ func TestComputeDashboardHealthScore_Comprehensive(t *testing.T) { ...@@ -203,8 +203,8 @@ func TestComputeDashboardHealthScore_Comprehensive(t *testing.T) {
MemoryUsagePercent: float64Ptr(30), MemoryUsagePercent: float64Ptr(30),
}, },
}, },
wantMin: 25, wantMin: 84,
wantMax: 50, wantMax: 85,
}, },
{ {
name: "combined failures - business healthy + infra degraded", name: "combined failures - business healthy + infra degraded",
...@@ -277,30 +277,41 @@ func TestComputeBusinessHealth(t *testing.T) { ...@@ -277,30 +277,41 @@ func TestComputeBusinessHealth(t *testing.T) {
UpstreamErrorRate: 0, UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)}, Duration: OpsPercentiles{P99: intPtr(500)},
}, },
wantMin: 50, wantMin: 100,
wantMax: 60, wantMax: 100,
}, },
{ {
name: "error rate boundary 0.5%", name: "error rate boundary 1%",
overview: &OpsDashboardOverview{ overview: &OpsDashboardOverview{
SLA: 0.995, SLA: 0.99,
ErrorRate: 0.005, ErrorRate: 0.01,
UpstreamErrorRate: 0, UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)}, Duration: OpsPercentiles{P99: intPtr(500)},
}, },
wantMin: 95, wantMin: 100,
wantMax: 100, wantMax: 100,
}, },
{ {
name: "latency boundary 1000ms", name: "error rate 5%",
overview: &OpsDashboardOverview{ overview: &OpsDashboardOverview{
SLA: 0.995, SLA: 0.95,
ErrorRate: 0.05,
UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)},
},
wantMin: 77,
wantMax: 78,
},
{
name: "TTFT boundary 2s",
overview: &OpsDashboardOverview{
SLA: 0.99,
ErrorRate: 0, ErrorRate: 0,
UpstreamErrorRate: 0, UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(1000)}, TTFT: OpsPercentiles{P99: intPtr(2000)},
}, },
wantMin: 95, wantMin: 75,
wantMax: 100, wantMax: 75,
}, },
{ {
name: "upstream error dominates", name: "upstream error dominates",
...@@ -310,7 +321,7 @@ func TestComputeBusinessHealth(t *testing.T) { ...@@ -310,7 +321,7 @@ func TestComputeBusinessHealth(t *testing.T) {
UpstreamErrorRate: 0.03, UpstreamErrorRate: 0.03,
Duration: OpsPercentiles{P99: intPtr(500)}, Duration: OpsPercentiles{P99: intPtr(500)},
}, },
wantMin: 75, wantMin: 88,
wantMax: 90, wantMax: 90,
}, },
} }
......
...@@ -6,24 +6,43 @@ type OpsErrorLog struct { ...@@ -6,24 +6,43 @@ type OpsErrorLog struct {
ID int64 `json:"id"` ID int64 `json:"id"`
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
Phase string `json:"phase"` // Standardized classification
Type string `json:"type"` // - phase: request|auth|routing|upstream|network|internal
// - owner: client|provider|platform
// - source: client_request|upstream_http|gateway
Phase string `json:"phase"`
Type string `json:"type"`
Owner string `json:"error_owner"`
Source string `json:"error_source"`
Severity string `json:"severity"` Severity string `json:"severity"`
StatusCode int `json:"status_code"` StatusCode int `json:"status_code"`
Platform string `json:"platform"` Platform string `json:"platform"`
Model string `json:"model"` Model string `json:"model"`
LatencyMs *int `json:"latency_ms"` IsRetryable bool `json:"is_retryable"`
RetryCount int `json:"retry_count"`
Resolved bool `json:"resolved"`
ResolvedAt *time.Time `json:"resolved_at"`
ResolvedByUserID *int64 `json:"resolved_by_user_id"`
ResolvedByUserName string `json:"resolved_by_user_name"`
ResolvedRetryID *int64 `json:"resolved_retry_id"`
ResolvedStatusRaw string `json:"-"`
ClientRequestID string `json:"client_request_id"` ClientRequestID string `json:"client_request_id"`
RequestID string `json:"request_id"` RequestID string `json:"request_id"`
Message string `json:"message"` Message string `json:"message"`
UserID *int64 `json:"user_id"` UserID *int64 `json:"user_id"`
APIKeyID *int64 `json:"api_key_id"` UserEmail string `json:"user_email"`
AccountID *int64 `json:"account_id"` APIKeyID *int64 `json:"api_key_id"`
GroupID *int64 `json:"group_id"` AccountID *int64 `json:"account_id"`
AccountName string `json:"account_name"`
GroupID *int64 `json:"group_id"`
GroupName string `json:"group_name"`
ClientIP *string `json:"client_ip"` ClientIP *string `json:"client_ip"`
RequestPath string `json:"request_path"` RequestPath string `json:"request_path"`
...@@ -67,9 +86,24 @@ type OpsErrorLogFilter struct { ...@@ -67,9 +86,24 @@ type OpsErrorLogFilter struct {
GroupID *int64 GroupID *int64
AccountID *int64 AccountID *int64
StatusCodes []int StatusCodes []int
Phase string StatusCodesOther bool
Query string Phase string
Owner string
Source string
Resolved *bool
Query string
UserQuery string // Search by user email
// Optional correlation keys for exact matching.
RequestID string
ClientRequestID string
// View controls error categorization for list endpoints.
// - errors: show actionable errors (exclude business-limited / 429 / 529)
// - excluded: only show excluded errors
// - all: show everything
View string
Page int Page int
PageSize int PageSize int
...@@ -90,12 +124,23 @@ type OpsRetryAttempt struct { ...@@ -90,12 +124,23 @@ type OpsRetryAttempt struct {
SourceErrorID int64 `json:"source_error_id"` SourceErrorID int64 `json:"source_error_id"`
Mode string `json:"mode"` Mode string `json:"mode"`
PinnedAccountID *int64 `json:"pinned_account_id"` PinnedAccountID *int64 `json:"pinned_account_id"`
PinnedAccountName string `json:"pinned_account_name"`
Status string `json:"status"` Status string `json:"status"`
StartedAt *time.Time `json:"started_at"` StartedAt *time.Time `json:"started_at"`
FinishedAt *time.Time `json:"finished_at"` FinishedAt *time.Time `json:"finished_at"`
DurationMs *int64 `json:"duration_ms"` DurationMs *int64 `json:"duration_ms"`
// Persisted execution results (best-effort)
Success *bool `json:"success"`
HTTPStatusCode *int `json:"http_status_code"`
UpstreamRequestID *string `json:"upstream_request_id"`
UsedAccountID *int64 `json:"used_account_id"`
UsedAccountName string `json:"used_account_name"`
ResponsePreview *string `json:"response_preview"`
ResponseTruncated *bool `json:"response_truncated"`
// Optional correlation
ResultRequestID *string `json:"result_request_id"` ResultRequestID *string `json:"result_request_id"`
ResultErrorID *int64 `json:"result_error_id"` ResultErrorID *int64 `json:"result_error_id"`
......
...@@ -14,6 +14,8 @@ type OpsRepository interface { ...@@ -14,6 +14,8 @@ type OpsRepository interface {
InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error) InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error)
UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error
GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error) GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error)
ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*OpsRetryAttempt, error)
UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error
// Lightweight window stats (for realtime WS / quick sampling). // Lightweight window stats (for realtime WS / quick sampling).
GetWindowStats(ctx context.Context, filter *OpsDashboardFilter) (*OpsWindowStats, error) GetWindowStats(ctx context.Context, filter *OpsDashboardFilter) (*OpsWindowStats, error)
...@@ -39,12 +41,17 @@ type OpsRepository interface { ...@@ -39,12 +41,17 @@ type OpsRepository interface {
DeleteAlertRule(ctx context.Context, id int64) error DeleteAlertRule(ctx context.Context, id int64) error
ListAlertEvents(ctx context.Context, filter *OpsAlertEventFilter) ([]*OpsAlertEvent, error) ListAlertEvents(ctx context.Context, filter *OpsAlertEventFilter) ([]*OpsAlertEvent, error)
GetAlertEventByID(ctx context.Context, eventID int64) (*OpsAlertEvent, error)
GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error)
GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error)
CreateAlertEvent(ctx context.Context, event *OpsAlertEvent) (*OpsAlertEvent, error) CreateAlertEvent(ctx context.Context, event *OpsAlertEvent) (*OpsAlertEvent, error)
UpdateAlertEventStatus(ctx context.Context, eventID int64, status string, resolvedAt *time.Time) error UpdateAlertEventStatus(ctx context.Context, eventID int64, status string, resolvedAt *time.Time) error
UpdateAlertEventEmailSent(ctx context.Context, eventID int64, emailSent bool) error UpdateAlertEventEmailSent(ctx context.Context, eventID int64, emailSent bool) error
// Alert silences
CreateAlertSilence(ctx context.Context, input *OpsAlertSilence) (*OpsAlertSilence, error)
IsAlertSilenced(ctx context.Context, ruleID int64, platform string, groupID *int64, region *string, now time.Time) (bool, error)
// Pre-aggregation (hourly/daily) used for long-window dashboard performance. // Pre-aggregation (hourly/daily) used for long-window dashboard performance.
UpsertHourlyMetrics(ctx context.Context, startTime, endTime time.Time) error UpsertHourlyMetrics(ctx context.Context, startTime, endTime time.Time) error
UpsertDailyMetrics(ctx context.Context, startTime, endTime time.Time) error UpsertDailyMetrics(ctx context.Context, startTime, endTime time.Time) error
...@@ -91,7 +98,6 @@ type OpsInsertErrorLogInput struct { ...@@ -91,7 +98,6 @@ type OpsInsertErrorLogInput struct {
// It is set by OpsService.RecordError before persisting. // It is set by OpsService.RecordError before persisting.
UpstreamErrorsJSON *string UpstreamErrorsJSON *string
DurationMs *int
TimeToFirstTokenMs *int64 TimeToFirstTokenMs *int64
RequestBodyJSON *string // sanitized json string (not raw bytes) RequestBodyJSON *string // sanitized json string (not raw bytes)
...@@ -124,7 +130,15 @@ type OpsUpdateRetryAttemptInput struct { ...@@ -124,7 +130,15 @@ type OpsUpdateRetryAttemptInput struct {
FinishedAt time.Time FinishedAt time.Time
DurationMs int64 DurationMs int64
// Optional correlation // Persisted execution results (best-effort)
Success *bool
HTTPStatusCode *int
UpstreamRequestID *string
UsedAccountID *int64
ResponsePreview *string
ResponseTruncated *bool
// Optional correlation (legacy fields kept)
ResultRequestID *string ResultRequestID *string
ResultErrorID *int64 ResultErrorID *int64
......
...@@ -108,6 +108,10 @@ func (w *limitedResponseWriter) truncated() bool { ...@@ -108,6 +108,10 @@ func (w *limitedResponseWriter) truncated() bool {
return w.totalWritten > int64(w.limit) return w.totalWritten > int64(w.limit)
} }
const (
OpsRetryModeUpstreamEvent = "upstream_event"
)
func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, errorID int64, mode string, pinnedAccountID *int64) (*OpsRetryResult, error) { func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, errorID int64, mode string, pinnedAccountID *int64) (*OpsRetryResult, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil { if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err return nil, err
...@@ -123,6 +127,81 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er ...@@ -123,6 +127,81 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er
return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_MODE", "mode must be client or upstream") return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_MODE", "mode must be client or upstream")
} }
errorLog, err := s.GetErrorLogByID(ctx, errorID)
if err != nil {
return nil, err
}
if errorLog == nil {
return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found")
}
if strings.TrimSpace(errorLog.RequestBody) == "" {
return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry")
}
var pinned *int64
if mode == OpsRetryModeUpstream {
if pinnedAccountID != nil && *pinnedAccountID > 0 {
pinned = pinnedAccountID
} else if errorLog.AccountID != nil && *errorLog.AccountID > 0 {
pinned = errorLog.AccountID
} else {
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "pinned_account_id is required for upstream retry")
}
}
return s.retryWithErrorLog(ctx, requestedByUserID, errorID, mode, mode, pinned, errorLog)
}
// RetryUpstreamEvent retries a specific upstream attempt captured inside ops_error_logs.upstream_errors.
// idx is 0-based. It always pins the original event account_id.
func (s *OpsService) RetryUpstreamEvent(ctx context.Context, requestedByUserID int64, errorID int64, idx int) (*OpsRetryResult, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if idx < 0 {
return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_UPSTREAM_IDX", "invalid upstream idx")
}
errorLog, err := s.GetErrorLogByID(ctx, errorID)
if err != nil {
return nil, err
}
if errorLog == nil {
return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found")
}
events, err := ParseOpsUpstreamErrors(errorLog.UpstreamErrors)
if err != nil {
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_EVENTS_INVALID", "invalid upstream_errors")
}
if idx >= len(events) {
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_IDX_OOB", "upstream idx out of range")
}
ev := events[idx]
if ev == nil {
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_EVENT_MISSING", "upstream event missing")
}
if ev.AccountID <= 0 {
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "account_id is required for upstream retry")
}
upstreamBody := strings.TrimSpace(ev.UpstreamRequestBody)
if upstreamBody == "" {
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_NO_REQUEST_BODY", "No upstream request body found to retry")
}
override := *errorLog
override.RequestBody = upstreamBody
pinned := ev.AccountID
// Persist as upstream_event, execute as upstream pinned retry.
return s.retryWithErrorLog(ctx, requestedByUserID, errorID, OpsRetryModeUpstreamEvent, OpsRetryModeUpstream, &pinned, &override)
}
func (s *OpsService) retryWithErrorLog(ctx context.Context, requestedByUserID int64, errorID int64, mode string, execMode string, pinnedAccountID *int64, errorLog *OpsErrorLogDetail) (*OpsRetryResult, error) {
latest, err := s.opsRepo.GetLatestRetryAttemptForError(ctx, errorID) latest, err := s.opsRepo.GetLatestRetryAttemptForError(ctx, errorID)
if err != nil && !errors.Is(err, sql.ErrNoRows) { if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, infraerrors.InternalServer("OPS_RETRY_LOAD_LATEST_FAILED", "Failed to check retry status").WithCause(err) return nil, infraerrors.InternalServer("OPS_RETRY_LOAD_LATEST_FAILED", "Failed to check retry status").WithCause(err)
...@@ -144,22 +223,18 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er ...@@ -144,22 +223,18 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er
} }
} }
errorLog, err := s.GetErrorLogByID(ctx, errorID) if errorLog == nil || strings.TrimSpace(errorLog.RequestBody) == "" {
if err != nil {
return nil, err
}
if strings.TrimSpace(errorLog.RequestBody) == "" {
return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry") return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry")
} }
var pinned *int64 var pinned *int64
if mode == OpsRetryModeUpstream { if execMode == OpsRetryModeUpstream {
if pinnedAccountID != nil && *pinnedAccountID > 0 { if pinnedAccountID != nil && *pinnedAccountID > 0 {
pinned = pinnedAccountID pinned = pinnedAccountID
} else if errorLog.AccountID != nil && *errorLog.AccountID > 0 { } else if errorLog.AccountID != nil && *errorLog.AccountID > 0 {
pinned = errorLog.AccountID pinned = errorLog.AccountID
} else { } else {
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "pinned_account_id is required for upstream retry") return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "account_id is required for upstream retry")
} }
} }
...@@ -196,7 +271,7 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er ...@@ -196,7 +271,7 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er
execCtx, cancel := context.WithTimeout(ctx, opsRetryTimeout) execCtx, cancel := context.WithTimeout(ctx, opsRetryTimeout)
defer cancel() defer cancel()
execRes := s.executeRetry(execCtx, errorLog, mode, pinned) execRes := s.executeRetry(execCtx, errorLog, execMode, pinned)
finishedAt := time.Now() finishedAt := time.Now()
result.FinishedAt = finishedAt result.FinishedAt = finishedAt
...@@ -220,27 +295,40 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er ...@@ -220,27 +295,40 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er
msg := result.ErrorMessage msg := result.ErrorMessage
updateErrMsg = &msg updateErrMsg = &msg
} }
// Keep legacy result_request_id empty; use upstream_request_id instead.
var resultRequestID *string var resultRequestID *string
if strings.TrimSpace(result.UpstreamRequestID) != "" {
v := result.UpstreamRequestID
resultRequestID = &v
}
finalStatus := result.Status finalStatus := result.Status
if strings.TrimSpace(finalStatus) == "" { if strings.TrimSpace(finalStatus) == "" {
finalStatus = opsRetryStatusFailed finalStatus = opsRetryStatusFailed
} }
success := strings.EqualFold(finalStatus, opsRetryStatusSucceeded)
httpStatus := result.HTTPStatusCode
upstreamReqID := result.UpstreamRequestID
usedAccountID := result.UsedAccountID
preview := result.ResponsePreview
truncated := result.ResponseTruncated
if err := s.opsRepo.UpdateRetryAttempt(updateCtx, &OpsUpdateRetryAttemptInput{ if err := s.opsRepo.UpdateRetryAttempt(updateCtx, &OpsUpdateRetryAttemptInput{
ID: attemptID, ID: attemptID,
Status: finalStatus, Status: finalStatus,
FinishedAt: finishedAt, FinishedAt: finishedAt,
DurationMs: result.DurationMs, DurationMs: result.DurationMs,
ResultRequestID: resultRequestID, Success: &success,
ErrorMessage: updateErrMsg, HTTPStatusCode: &httpStatus,
UpstreamRequestID: &upstreamReqID,
UsedAccountID: usedAccountID,
ResponsePreview: &preview,
ResponseTruncated: &truncated,
ResultRequestID: resultRequestID,
ErrorMessage: updateErrMsg,
}); err != nil { }); err != nil {
// Best-effort: retry itself already executed; do not fail the API response.
log.Printf("[Ops] UpdateRetryAttempt failed: %v", err) log.Printf("[Ops] UpdateRetryAttempt failed: %v", err)
} else if success {
if err := s.opsRepo.UpdateErrorResolution(updateCtx, errorID, true, &requestedByUserID, &attemptID, &finishedAt); err != nil {
log.Printf("[Ops] UpdateErrorResolution failed: %v", err)
}
} }
return result, nil return result, nil
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment