Commit 5baa8b56 authored by IanShaw027's avatar IanShaw027
Browse files

feat(service): 实现运维监控业务逻辑层

- 新增 ops 主服务(ops_service.go)和端口定义(ops_port.go)
- 实现账号可用性检查服务(ops_account_availability.go)
- 实现数据聚合服务(ops_aggregation_service.go)
- 实现告警评估服务(ops_alert_evaluator_service.go)
- 实现告警管理服务(ops_alerts.go)
- 实现数据清理服务(ops_cleanup_service.go)
- 实现并发控制服务(ops_concurrency.go)
- 实现仪表板服务(ops_dashboard.go)
- 实现错误处理服务(ops_errors.go)
- 实现直方图服务(ops_histograms.go)
- 实现指标采集服务(ops_metrics_collector.go)
- 实现查询模式服务(ops_query_mode.go)
- 实现实时监控服务(ops_realtime.go)
- 实现请求详情服务(ops_request_details.go)
- 实现重试机制服务(ops_retry.go)
- 实现配置管理服务(ops_settings.go)
- 实现趋势分析服务(ops_trends.go)
- 实现窗口统计服务(ops_window_stats.go)
- 添加 ops 相关领域常量
- 注册 service 依赖注入
parent bb530327
...@@ -105,6 +105,28 @@ const ( ...@@ -105,6 +105,28 @@ const (
// Request identity patch (Claude -> Gemini systemInstruction injection) // Request identity patch (Claude -> Gemini systemInstruction injection)
SettingKeyEnableIdentityPatch = "enable_identity_patch" SettingKeyEnableIdentityPatch = "enable_identity_patch"
SettingKeyIdentityPatchPrompt = "identity_patch_prompt" SettingKeyIdentityPatchPrompt = "identity_patch_prompt"
// =========================
// Ops Monitoring (vNext)
// =========================
// SettingKeyOpsMonitoringEnabled is a DB-backed soft switch to enable/disable ops module at runtime.
SettingKeyOpsMonitoringEnabled = "ops_monitoring_enabled"
// SettingKeyOpsRealtimeMonitoringEnabled controls realtime features (e.g. WS/QPS push).
SettingKeyOpsRealtimeMonitoringEnabled = "ops_realtime_monitoring_enabled"
// SettingKeyOpsQueryModeDefault controls the default query mode for ops dashboard (auto/raw/preagg).
SettingKeyOpsQueryModeDefault = "ops_query_mode_default"
// SettingKeyOpsEmailNotificationConfig stores JSON config for ops email notifications.
SettingKeyOpsEmailNotificationConfig = "ops_email_notification_config"
// SettingKeyOpsAlertRuntimeSettings stores JSON config for ops alert evaluator runtime settings.
SettingKeyOpsAlertRuntimeSettings = "ops_alert_runtime_settings"
// SettingKeyOpsMetricsIntervalSeconds controls the ops metrics collector interval (>=60).
SettingKeyOpsMetricsIntervalSeconds = "ops_metrics_interval_seconds"
) )
// AdminAPIKeyPrefix is the prefix for admin API keys (distinct from user "sk-" keys). // AdminAPIKeyPrefix is the prefix for admin API keys (distinct from user "sk-" keys).
......
package service
import (
"context"
"time"
)
// GetAccountAvailabilityStats returns current account availability stats.
//
// Query-level filtering is intentionally limited to platform/group to match the dashboard scope.
func (s *OpsService) GetAccountAvailabilityStats(ctx context.Context, platformFilter string, groupIDFilter *int64) (
map[string]*PlatformAvailability,
map[int64]*GroupAvailability,
map[int64]*AccountAvailability,
*time.Time,
error,
) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, nil, nil, nil, err
}
accounts, err := s.listAllAccountsForOps(ctx, platformFilter)
if err != nil {
return nil, nil, nil, nil, err
}
if groupIDFilter != nil && *groupIDFilter > 0 {
filtered := make([]Account, 0, len(accounts))
for _, acc := range accounts {
for _, grp := range acc.Groups {
if grp != nil && grp.ID == *groupIDFilter {
filtered = append(filtered, acc)
break
}
}
}
accounts = filtered
}
now := time.Now()
collectedAt := now
platform := make(map[string]*PlatformAvailability)
group := make(map[int64]*GroupAvailability)
account := make(map[int64]*AccountAvailability)
for _, acc := range accounts {
if acc.ID <= 0 {
continue
}
isTempUnsched := false
if acc.TempUnschedulableUntil != nil && now.Before(*acc.TempUnschedulableUntil) {
isTempUnsched = true
}
isRateLimited := acc.RateLimitResetAt != nil && now.Before(*acc.RateLimitResetAt)
isOverloaded := acc.OverloadUntil != nil && now.Before(*acc.OverloadUntil)
hasError := acc.Status == StatusError
// Normalize exclusive status flags so the UI doesn't show conflicting badges.
if hasError {
isRateLimited = false
isOverloaded = false
}
isAvailable := acc.Status == StatusActive && acc.Schedulable && !isRateLimited && !isOverloaded && !isTempUnsched
if acc.Platform != "" {
if _, ok := platform[acc.Platform]; !ok {
platform[acc.Platform] = &PlatformAvailability{
Platform: acc.Platform,
}
}
p := platform[acc.Platform]
p.TotalAccounts++
if isAvailable {
p.AvailableCount++
}
if isRateLimited {
p.RateLimitCount++
}
if hasError {
p.ErrorCount++
}
}
for _, grp := range acc.Groups {
if grp == nil || grp.ID <= 0 {
continue
}
if _, ok := group[grp.ID]; !ok {
group[grp.ID] = &GroupAvailability{
GroupID: grp.ID,
GroupName: grp.Name,
Platform: grp.Platform,
}
}
g := group[grp.ID]
g.TotalAccounts++
if isAvailable {
g.AvailableCount++
}
if isRateLimited {
g.RateLimitCount++
}
if hasError {
g.ErrorCount++
}
}
displayGroupID := int64(0)
displayGroupName := ""
if len(acc.Groups) > 0 && acc.Groups[0] != nil {
displayGroupID = acc.Groups[0].ID
displayGroupName = acc.Groups[0].Name
}
item := &AccountAvailability{
AccountID: acc.ID,
AccountName: acc.Name,
Platform: acc.Platform,
GroupID: displayGroupID,
GroupName: displayGroupName,
Status: acc.Status,
IsAvailable: isAvailable,
IsRateLimited: isRateLimited,
IsOverloaded: isOverloaded,
HasError: hasError,
ErrorMessage: acc.ErrorMessage,
}
if isRateLimited && acc.RateLimitResetAt != nil {
item.RateLimitResetAt = acc.RateLimitResetAt
remainingSec := int64(time.Until(*acc.RateLimitResetAt).Seconds())
if remainingSec > 0 {
item.RateLimitRemainingSec = &remainingSec
}
}
if isOverloaded && acc.OverloadUntil != nil {
item.OverloadUntil = acc.OverloadUntil
remainingSec := int64(time.Until(*acc.OverloadUntil).Seconds())
if remainingSec > 0 {
item.OverloadRemainingSec = &remainingSec
}
}
if isTempUnsched && acc.TempUnschedulableUntil != nil {
item.TempUnschedulableUntil = acc.TempUnschedulableUntil
}
account[acc.ID] = item
}
return platform, group, account, &collectedAt, nil
}
package service
import (
"context"
"database/sql"
"errors"
"log"
"strings"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)
const (
opsAggHourlyJobName = "ops_preaggregation_hourly"
opsAggDailyJobName = "ops_preaggregation_daily"
opsAggHourlyInterval = 10 * time.Minute
opsAggDailyInterval = 1 * time.Hour
// Keep in sync with ops retention target (vNext default 30d).
opsAggBackfillWindow = 30 * 24 * time.Hour
// Recompute overlap to absorb late-arriving rows near boundaries.
opsAggHourlyOverlap = 2 * time.Hour
opsAggDailyOverlap = 48 * time.Hour
opsAggHourlyChunk = 24 * time.Hour
opsAggDailyChunk = 7 * 24 * time.Hour
// Delay around boundaries (e.g. 10:00..10:05) to avoid aggregating buckets
// that may still receive late inserts.
opsAggSafeDelay = 5 * time.Minute
opsAggMaxQueryTimeout = 3 * time.Second
opsAggHourlyTimeout = 5 * time.Minute
opsAggDailyTimeout = 2 * time.Minute
opsAggHourlyLeaderLockKey = "ops:aggregation:hourly:leader"
opsAggDailyLeaderLockKey = "ops:aggregation:daily:leader"
opsAggHourlyLeaderLockTTL = 15 * time.Minute
opsAggDailyLeaderLockTTL = 10 * time.Minute
)
// OpsAggregationService periodically backfills ops_metrics_hourly / ops_metrics_daily
// for stable long-window dashboard queries.
//
// It is safe to run in multi-replica deployments when Redis is available (leader lock).
type OpsAggregationService struct {
opsRepo OpsRepository
settingRepo SettingRepository
cfg *config.Config
db *sql.DB
redisClient *redis.Client
instanceID string
stopCh chan struct{}
startOnce sync.Once
stopOnce sync.Once
hourlyMu sync.Mutex
dailyMu sync.Mutex
skipLogMu sync.Mutex
skipLogAt time.Time
}
func NewOpsAggregationService(
opsRepo OpsRepository,
settingRepo SettingRepository,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
) *OpsAggregationService {
return &OpsAggregationService{
opsRepo: opsRepo,
settingRepo: settingRepo,
cfg: cfg,
db: db,
redisClient: redisClient,
instanceID: uuid.NewString(),
}
}
func (s *OpsAggregationService) Start() {
if s == nil {
return
}
s.startOnce.Do(func() {
if s.stopCh == nil {
s.stopCh = make(chan struct{})
}
go s.hourlyLoop()
go s.dailyLoop()
})
}
func (s *OpsAggregationService) Stop() {
if s == nil {
return
}
s.stopOnce.Do(func() {
if s.stopCh != nil {
close(s.stopCh)
}
})
}
func (s *OpsAggregationService) hourlyLoop() {
// First run immediately.
s.aggregateHourly()
ticker := time.NewTicker(opsAggHourlyInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.aggregateHourly()
case <-s.stopCh:
return
}
}
}
func (s *OpsAggregationService) dailyLoop() {
// First run immediately.
s.aggregateDaily()
ticker := time.NewTicker(opsAggDailyInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.aggregateDaily()
case <-s.stopCh:
return
}
}
}
func (s *OpsAggregationService) aggregateHourly() {
if s == nil || s.opsRepo == nil {
return
}
if s.cfg != nil {
if !s.cfg.Ops.Enabled {
return
}
if !s.cfg.Ops.Aggregation.Enabled {
return
}
}
ctx, cancel := context.WithTimeout(context.Background(), opsAggHourlyTimeout)
defer cancel()
if !s.isMonitoringEnabled(ctx) {
return
}
release, ok := s.tryAcquireLeaderLock(ctx, opsAggHourlyLeaderLockKey, opsAggHourlyLeaderLockTTL, "[OpsAggregation][hourly]")
if !ok {
return
}
if release != nil {
defer release()
}
s.hourlyMu.Lock()
defer s.hourlyMu.Unlock()
startedAt := time.Now().UTC()
runAt := startedAt
// Aggregate stable full hours only.
end := utcFloorToHour(time.Now().UTC().Add(-opsAggSafeDelay))
start := end.Add(-opsAggBackfillWindow)
// Resume from the latest bucket with overlap.
{
ctxMax, cancelMax := context.WithTimeout(context.Background(), opsAggMaxQueryTimeout)
latest, ok, err := s.opsRepo.GetLatestHourlyBucketStart(ctxMax)
cancelMax()
if err != nil {
log.Printf("[OpsAggregation][hourly] failed to read latest bucket: %v", err)
} else if ok {
candidate := latest.Add(-opsAggHourlyOverlap)
if candidate.After(start) {
start = candidate
}
}
}
start = utcFloorToHour(start)
if !start.Before(end) {
return
}
var aggErr error
for cursor := start; cursor.Before(end); cursor = cursor.Add(opsAggHourlyChunk) {
chunkEnd := minTime(cursor.Add(opsAggHourlyChunk), end)
if err := s.opsRepo.UpsertHourlyMetrics(ctx, cursor, chunkEnd); err != nil {
aggErr = err
log.Printf("[OpsAggregation][hourly] upsert failed (%s..%s): %v", cursor.Format(time.RFC3339), chunkEnd.Format(time.RFC3339), err)
break
}
}
finishedAt := time.Now().UTC()
durationMs := finishedAt.Sub(startedAt).Milliseconds()
dur := durationMs
if aggErr != nil {
msg := truncateString(aggErr.Error(), 2048)
errAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer hbCancel()
_ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsAggHourlyJobName,
LastRunAt: &runAt,
LastErrorAt: &errAt,
LastError: &msg,
LastDurationMs: &dur,
})
return
}
successAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer hbCancel()
_ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsAggHourlyJobName,
LastRunAt: &runAt,
LastSuccessAt: &successAt,
LastDurationMs: &dur,
})
}
func (s *OpsAggregationService) aggregateDaily() {
if s == nil || s.opsRepo == nil {
return
}
if s.cfg != nil {
if !s.cfg.Ops.Enabled {
return
}
if !s.cfg.Ops.Aggregation.Enabled {
return
}
}
ctx, cancel := context.WithTimeout(context.Background(), opsAggDailyTimeout)
defer cancel()
if !s.isMonitoringEnabled(ctx) {
return
}
release, ok := s.tryAcquireLeaderLock(ctx, opsAggDailyLeaderLockKey, opsAggDailyLeaderLockTTL, "[OpsAggregation][daily]")
if !ok {
return
}
if release != nil {
defer release()
}
s.dailyMu.Lock()
defer s.dailyMu.Unlock()
startedAt := time.Now().UTC()
runAt := startedAt
end := utcFloorToDay(time.Now().UTC())
start := end.Add(-opsAggBackfillWindow)
{
ctxMax, cancelMax := context.WithTimeout(context.Background(), opsAggMaxQueryTimeout)
latest, ok, err := s.opsRepo.GetLatestDailyBucketDate(ctxMax)
cancelMax()
if err != nil {
log.Printf("[OpsAggregation][daily] failed to read latest bucket: %v", err)
} else if ok {
candidate := latest.Add(-opsAggDailyOverlap)
if candidate.After(start) {
start = candidate
}
}
}
start = utcFloorToDay(start)
if !start.Before(end) {
return
}
var aggErr error
for cursor := start; cursor.Before(end); cursor = cursor.Add(opsAggDailyChunk) {
chunkEnd := minTime(cursor.Add(opsAggDailyChunk), end)
if err := s.opsRepo.UpsertDailyMetrics(ctx, cursor, chunkEnd); err != nil {
aggErr = err
log.Printf("[OpsAggregation][daily] upsert failed (%s..%s): %v", cursor.Format("2006-01-02"), chunkEnd.Format("2006-01-02"), err)
break
}
}
finishedAt := time.Now().UTC()
durationMs := finishedAt.Sub(startedAt).Milliseconds()
dur := durationMs
if aggErr != nil {
msg := truncateString(aggErr.Error(), 2048)
errAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer hbCancel()
_ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsAggDailyJobName,
LastRunAt: &runAt,
LastErrorAt: &errAt,
LastError: &msg,
LastDurationMs: &dur,
})
return
}
successAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer hbCancel()
_ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsAggDailyJobName,
LastRunAt: &runAt,
LastSuccessAt: &successAt,
LastDurationMs: &dur,
})
}
func (s *OpsAggregationService) isMonitoringEnabled(ctx context.Context) bool {
if s == nil {
return false
}
if s.cfg != nil && !s.cfg.Ops.Enabled {
return false
}
if s.settingRepo == nil {
return true
}
if ctx == nil {
ctx = context.Background()
}
value, err := s.settingRepo.GetValue(ctx, SettingKeyOpsMonitoringEnabled)
if err != nil {
if errors.Is(err, ErrSettingNotFound) {
return true
}
return true
}
switch strings.ToLower(strings.TrimSpace(value)) {
case "false", "0", "off", "disabled":
return false
default:
return true
}
}
var opsAggReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
func (s *OpsAggregationService) tryAcquireLeaderLock(ctx context.Context, key string, ttl time.Duration, logPrefix string) (func(), bool) {
if s == nil || s.redisClient == nil {
return nil, true
}
if ctx == nil {
ctx = context.Background()
}
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err != nil {
// Fail-open: do not block single-instance deployments.
return nil, true
}
if !ok {
s.maybeLogSkip(logPrefix)
return nil, false
}
release := func() {
ctx2, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, _ = opsAggReleaseScript.Run(ctx2, s.redisClient, []string{key}, s.instanceID).Result()
}
return release, true
}
func (s *OpsAggregationService) maybeLogSkip(prefix string) {
s.skipLogMu.Lock()
defer s.skipLogMu.Unlock()
now := time.Now()
if !s.skipLogAt.IsZero() && now.Sub(s.skipLogAt) < time.Minute {
return
}
s.skipLogAt = now
if prefix == "" {
prefix = "[OpsAggregation]"
}
log.Printf("%s leader lock held by another instance; skipping", prefix)
}
func utcFloorToHour(t time.Time) time.Time {
return t.UTC().Truncate(time.Hour)
}
func utcFloorToDay(t time.Time) time.Time {
u := t.UTC()
y, m, d := u.Date()
return time.Date(y, m, d, 0, 0, 0, 0, time.UTC)
}
func minTime(a, b time.Time) time.Time {
if a.Before(b) {
return a
}
return b
}
This diff is collapsed.
package service
import (
"context"
"database/sql"
"errors"
"strings"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func (s *OpsService) ListAlertRules(ctx context.Context) ([]*OpsAlertRule, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return []*OpsAlertRule{}, nil
}
return s.opsRepo.ListAlertRules(ctx)
}
func (s *OpsService) CreateAlertRule(ctx context.Context, rule *OpsAlertRule) (*OpsAlertRule, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if rule == nil {
return nil, infraerrors.BadRequest("INVALID_RULE", "invalid rule")
}
created, err := s.opsRepo.CreateAlertRule(ctx, rule)
if err != nil {
return nil, err
}
return created, nil
}
func (s *OpsService) UpdateAlertRule(ctx context.Context, rule *OpsAlertRule) (*OpsAlertRule, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if rule == nil || rule.ID <= 0 {
return nil, infraerrors.BadRequest("INVALID_RULE", "invalid rule")
}
updated, err := s.opsRepo.UpdateAlertRule(ctx, rule)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, infraerrors.NotFound("OPS_ALERT_RULE_NOT_FOUND", "alert rule not found")
}
return nil, err
}
return updated, nil
}
func (s *OpsService) DeleteAlertRule(ctx context.Context, id int64) error {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return err
}
if s.opsRepo == nil {
return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if id <= 0 {
return infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
if err := s.opsRepo.DeleteAlertRule(ctx, id); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return infraerrors.NotFound("OPS_ALERT_RULE_NOT_FOUND", "alert rule not found")
}
return err
}
return nil
}
func (s *OpsService) ListAlertEvents(ctx context.Context, filter *OpsAlertEventFilter) ([]*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return []*OpsAlertEvent{}, nil
}
return s.opsRepo.ListAlertEvents(ctx, filter)
}
func (s *OpsService) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if ruleID <= 0 {
return nil, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
return s.opsRepo.GetActiveAlertEvent(ctx, ruleID)
}
func (s *OpsService) GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if ruleID <= 0 {
return nil, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
return s.opsRepo.GetLatestAlertEvent(ctx, ruleID)
}
func (s *OpsService) CreateAlertEvent(ctx context.Context, event *OpsAlertEvent) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if event == nil {
return nil, infraerrors.BadRequest("INVALID_EVENT", "invalid event")
}
created, err := s.opsRepo.CreateAlertEvent(ctx, event)
if err != nil {
return nil, err
}
return created, nil
}
func (s *OpsService) UpdateAlertEventStatus(ctx context.Context, eventID int64, status string, resolvedAt *time.Time) error {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return err
}
if s.opsRepo == nil {
return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if eventID <= 0 {
return infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id")
}
if strings.TrimSpace(status) == "" {
return infraerrors.BadRequest("INVALID_STATUS", "invalid status")
}
return s.opsRepo.UpdateAlertEventStatus(ctx, eventID, status, resolvedAt)
}
func (s *OpsService) UpdateAlertEventEmailSent(ctx context.Context, eventID int64, emailSent bool) error {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return err
}
if s.opsRepo == nil {
return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if eventID <= 0 {
return infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id")
}
return s.opsRepo.UpdateAlertEventEmailSent(ctx, eventID, emailSent)
}
package service
import (
"context"
"database/sql"
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
const (
opsCleanupJobName = "ops_cleanup"
opsCleanupLeaderLockKeyDefault = "ops:cleanup:leader"
opsCleanupLeaderLockTTLDefault = 30 * time.Minute
)
var opsCleanupCronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
var opsCleanupReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
// OpsCleanupService periodically deletes old ops data to prevent unbounded DB growth.
//
// - Scheduling: 5-field cron spec (minute hour dom month dow).
// - Multi-instance: best-effort Redis leader lock so only one node runs cleanup.
// - Safety: deletes in batches to avoid long transactions.
type OpsCleanupService struct {
opsRepo OpsRepository
db *sql.DB
redisClient *redis.Client
cfg *config.Config
instanceID string
cron *cron.Cron
entryID cron.EntryID
startOnce sync.Once
stopOnce sync.Once
warnNoRedisOnce sync.Once
}
func NewOpsCleanupService(
opsRepo OpsRepository,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
) *OpsCleanupService {
return &OpsCleanupService{
opsRepo: opsRepo,
db: db,
redisClient: redisClient,
cfg: cfg,
instanceID: uuid.NewString(),
}
}
func (s *OpsCleanupService) Start() {
if s == nil {
return
}
if s.cfg != nil && !s.cfg.Ops.Enabled {
return
}
if s.cfg != nil && !s.cfg.Ops.Cleanup.Enabled {
log.Printf("[OpsCleanup] not started (disabled)")
return
}
if s.opsRepo == nil || s.db == nil {
log.Printf("[OpsCleanup] not started (missing deps)")
return
}
s.startOnce.Do(func() {
schedule := "0 2 * * *"
if s.cfg != nil && strings.TrimSpace(s.cfg.Ops.Cleanup.Schedule) != "" {
schedule = strings.TrimSpace(s.cfg.Ops.Cleanup.Schedule)
}
loc := time.Local
if s.cfg != nil && strings.TrimSpace(s.cfg.Timezone) != "" {
if parsed, err := time.LoadLocation(strings.TrimSpace(s.cfg.Timezone)); err == nil && parsed != nil {
loc = parsed
}
}
c := cron.New(cron.WithParser(opsCleanupCronParser), cron.WithLocation(loc))
id, err := c.AddFunc(schedule, func() { s.runScheduled() })
if err != nil {
log.Printf("[OpsCleanup] not started (invalid schedule=%q): %v", schedule, err)
return
}
s.cron = c
s.entryID = id
s.cron.Start()
log.Printf("[OpsCleanup] started (schedule=%q tz=%s)", schedule, loc.String())
})
}
func (s *OpsCleanupService) Stop() {
if s == nil {
return
}
s.stopOnce.Do(func() {
if s.cron != nil {
ctx := s.cron.Stop()
select {
case <-ctx.Done():
case <-time.After(3 * time.Second):
log.Printf("[OpsCleanup] cron stop timed out")
}
}
})
}
func (s *OpsCleanupService) runScheduled() {
if s == nil || s.db == nil || s.opsRepo == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
release, ok := s.tryAcquireLeaderLock(ctx)
if !ok {
return
}
if release != nil {
defer release()
}
startedAt := time.Now().UTC()
runAt := startedAt
counts, err := s.runCleanupOnce(ctx)
if err != nil {
s.recordHeartbeatError(runAt, time.Since(startedAt), err)
log.Printf("[OpsCleanup] cleanup failed: %v", err)
return
}
s.recordHeartbeatSuccess(runAt, time.Since(startedAt))
log.Printf("[OpsCleanup] cleanup complete: %s", counts)
}
type opsCleanupDeletedCounts struct {
errorLogs int64
retryAttempts int64
alertEvents int64
systemMetrics int64
hourlyPreagg int64
dailyPreagg int64
}
func (c opsCleanupDeletedCounts) String() string {
return fmt.Sprintf(
"error_logs=%d retry_attempts=%d alert_events=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d",
c.errorLogs,
c.retryAttempts,
c.alertEvents,
c.systemMetrics,
c.hourlyPreagg,
c.dailyPreagg,
)
}
func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDeletedCounts, error) {
out := opsCleanupDeletedCounts{}
if s == nil || s.db == nil || s.cfg == nil {
return out, nil
}
batchSize := 5000
now := time.Now().UTC()
// Error-like tables: error logs / retry attempts / alert events.
if days := s.cfg.Ops.Cleanup.ErrorLogRetentionDays; days > 0 {
cutoff := now.AddDate(0, 0, -days)
n, err := deleteOldRowsByID(ctx, s.db, "ops_error_logs", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.errorLogs = n
n, err = deleteOldRowsByID(ctx, s.db, "ops_retry_attempts", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.retryAttempts = n
n, err = deleteOldRowsByID(ctx, s.db, "ops_alert_events", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.alertEvents = n
}
// Minute-level metrics snapshots.
if days := s.cfg.Ops.Cleanup.MinuteMetricsRetentionDays; days > 0 {
cutoff := now.AddDate(0, 0, -days)
n, err := deleteOldRowsByID(ctx, s.db, "ops_system_metrics", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.systemMetrics = n
}
// Pre-aggregation tables (hourly/daily).
if days := s.cfg.Ops.Cleanup.HourlyMetricsRetentionDays; days > 0 {
cutoff := now.AddDate(0, 0, -days)
n, err := deleteOldRowsByID(ctx, s.db, "ops_metrics_hourly", "bucket_start", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.hourlyPreagg = n
n, err = deleteOldRowsByID(ctx, s.db, "ops_metrics_daily", "bucket_date", cutoff, batchSize, true)
if err != nil {
return out, err
}
out.dailyPreagg = n
}
return out, nil
}
func deleteOldRowsByID(
ctx context.Context,
db *sql.DB,
table string,
timeColumn string,
cutoff time.Time,
batchSize int,
castCutoffToDate bool,
) (int64, error) {
if db == nil {
return 0, nil
}
if batchSize <= 0 {
batchSize = 5000
}
where := fmt.Sprintf("%s < $1", timeColumn)
if castCutoffToDate {
where = fmt.Sprintf("%s < $1::date", timeColumn)
}
q := fmt.Sprintf(`
WITH batch AS (
SELECT id FROM %s
WHERE %s
ORDER BY id
LIMIT $2
)
DELETE FROM %s
WHERE id IN (SELECT id FROM batch)
`, table, where, table)
var total int64
for {
res, err := db.ExecContext(ctx, q, cutoff, batchSize)
if err != nil {
// If ops tables aren't present yet (partial deployments), treat as no-op.
if strings.Contains(strings.ToLower(err.Error()), "does not exist") && strings.Contains(strings.ToLower(err.Error()), "relation") {
return total, nil
}
return total, err
}
affected, err := res.RowsAffected()
if err != nil {
return total, err
}
total += affected
if affected == 0 {
break
}
}
return total, nil
}
func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
if s == nil {
return nil, false
}
// In simple run mode, assume single instance.
if s.cfg != nil && s.cfg.RunMode == config.RunModeSimple {
return nil, true
}
if s.redisClient == nil {
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsCleanup] redis not configured; running without distributed lock")
})
return nil, true
}
key := opsCleanupLeaderLockKeyDefault
ttl := opsCleanupLeaderLockTTLDefault
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err != nil {
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsCleanup] leader lock SetNX failed; running without lock: %v", err)
})
return nil, true
}
if !ok {
return nil, false
}
return func() {
_, _ = opsCleanupReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
}, true
}
func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) {
if s == nil || s.opsRepo == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsCleanupJobName,
LastRunAt: &runAt,
LastSuccessAt: &now,
LastDurationMs: &durMs,
})
}
func (s *OpsCleanupService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
if s == nil || s.opsRepo == nil || err == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
msg := truncateString(err.Error(), 2048)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsCleanupJobName,
LastRunAt: &runAt,
LastErrorAt: &now,
LastError: &msg,
LastDurationMs: &durMs,
})
}
package service
import (
"context"
"log"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
)
const (
opsAccountsPageSize = 100
opsConcurrencyBatchChunkSize = 200
)
func (s *OpsService) listAllAccountsForOps(ctx context.Context, platformFilter string) ([]Account, error) {
if s == nil || s.accountRepo == nil {
return []Account{}, nil
}
out := make([]Account, 0, 128)
page := 1
for {
accounts, pageInfo, err := s.accountRepo.ListWithFilters(ctx, pagination.PaginationParams{
Page: page,
PageSize: opsAccountsPageSize,
}, platformFilter, "", "", "")
if err != nil {
return nil, err
}
if len(accounts) == 0 {
break
}
out = append(out, accounts...)
if pageInfo != nil && int64(len(out)) >= pageInfo.Total {
break
}
if len(accounts) < opsAccountsPageSize {
break
}
page++
if page > 10_000 {
log.Printf("[Ops] listAllAccountsForOps: aborting after too many pages (platform=%q)", platformFilter)
break
}
}
return out, nil
}
func (s *OpsService) getAccountsLoadMapBestEffort(ctx context.Context, accounts []Account) map[int64]*AccountLoadInfo {
if s == nil || s.concurrencyService == nil {
return map[int64]*AccountLoadInfo{}
}
if len(accounts) == 0 {
return map[int64]*AccountLoadInfo{}
}
// De-duplicate IDs (and keep the max concurrency to avoid under-reporting).
unique := make(map[int64]int, len(accounts))
for _, acc := range accounts {
if acc.ID <= 0 {
continue
}
if prev, ok := unique[acc.ID]; !ok || acc.Concurrency > prev {
unique[acc.ID] = acc.Concurrency
}
}
batch := make([]AccountWithConcurrency, 0, len(unique))
for id, maxConc := range unique {
batch = append(batch, AccountWithConcurrency{
ID: id,
MaxConcurrency: maxConc,
})
}
out := make(map[int64]*AccountLoadInfo, len(batch))
for i := 0; i < len(batch); i += opsConcurrencyBatchChunkSize {
end := i + opsConcurrencyBatchChunkSize
if end > len(batch) {
end = len(batch)
}
part, err := s.concurrencyService.GetAccountsLoadBatch(ctx, batch[i:end])
if err != nil {
// Best-effort: return zeros rather than failing the ops UI.
log.Printf("[Ops] GetAccountsLoadBatch failed: %v", err)
continue
}
for k, v := range part {
out[k] = v
}
}
return out
}
// GetConcurrencyStats returns real-time concurrency usage aggregated by platform/group/account.
//
// Optional filters:
// - platformFilter: only include accounts in that platform (best-effort reduces DB load)
// - groupIDFilter: only include accounts that belong to that group
func (s *OpsService) GetConcurrencyStats(
ctx context.Context,
platformFilter string,
groupIDFilter *int64,
) (map[string]*PlatformConcurrencyInfo, map[int64]*GroupConcurrencyInfo, map[int64]*AccountConcurrencyInfo, *time.Time, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, nil, nil, nil, err
}
accounts, err := s.listAllAccountsForOps(ctx, platformFilter)
if err != nil {
return nil, nil, nil, nil, err
}
collectedAt := time.Now()
loadMap := s.getAccountsLoadMapBestEffort(ctx, accounts)
platform := make(map[string]*PlatformConcurrencyInfo)
group := make(map[int64]*GroupConcurrencyInfo)
account := make(map[int64]*AccountConcurrencyInfo)
for _, acc := range accounts {
if acc.ID <= 0 {
continue
}
var matchedGroup *Group
if groupIDFilter != nil && *groupIDFilter > 0 {
for _, grp := range acc.Groups {
if grp == nil || grp.ID <= 0 {
continue
}
if grp.ID == *groupIDFilter {
matchedGroup = grp
break
}
}
// Group filter provided: skip accounts not in that group.
if matchedGroup == nil {
continue
}
}
load := loadMap[acc.ID]
currentInUse := int64(0)
waiting := int64(0)
if load != nil {
currentInUse = int64(load.CurrentConcurrency)
waiting = int64(load.WaitingCount)
}
// Account-level view picks one display group (the first group).
displayGroupID := int64(0)
displayGroupName := ""
if matchedGroup != nil {
displayGroupID = matchedGroup.ID
displayGroupName = matchedGroup.Name
} else if len(acc.Groups) > 0 && acc.Groups[0] != nil {
displayGroupID = acc.Groups[0].ID
displayGroupName = acc.Groups[0].Name
}
if _, ok := account[acc.ID]; !ok {
info := &AccountConcurrencyInfo{
AccountID: acc.ID,
AccountName: acc.Name,
Platform: acc.Platform,
GroupID: displayGroupID,
GroupName: displayGroupName,
CurrentInUse: currentInUse,
MaxCapacity: int64(acc.Concurrency),
WaitingInQueue: waiting,
}
if info.MaxCapacity > 0 {
info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100
}
account[acc.ID] = info
}
// Platform aggregation.
if acc.Platform != "" {
if _, ok := platform[acc.Platform]; !ok {
platform[acc.Platform] = &PlatformConcurrencyInfo{
Platform: acc.Platform,
}
}
p := platform[acc.Platform]
p.MaxCapacity += int64(acc.Concurrency)
p.CurrentInUse += currentInUse
p.WaitingInQueue += waiting
}
// Group aggregation (one account may contribute to multiple groups).
if matchedGroup != nil {
grp := matchedGroup
if _, ok := group[grp.ID]; !ok {
group[grp.ID] = &GroupConcurrencyInfo{
GroupID: grp.ID,
GroupName: grp.Name,
Platform: grp.Platform,
}
}
g := group[grp.ID]
if g.GroupName == "" && grp.Name != "" {
g.GroupName = grp.Name
}
if g.Platform != "" && grp.Platform != "" && g.Platform != grp.Platform {
// Groups are expected to be platform-scoped. If mismatch is observed, avoid misleading labels.
g.Platform = ""
}
g.MaxCapacity += int64(acc.Concurrency)
g.CurrentInUse += currentInUse
g.WaitingInQueue += waiting
} else {
for _, grp := range acc.Groups {
if grp == nil || grp.ID <= 0 {
continue
}
if _, ok := group[grp.ID]; !ok {
group[grp.ID] = &GroupConcurrencyInfo{
GroupID: grp.ID,
GroupName: grp.Name,
Platform: grp.Platform,
}
}
g := group[grp.ID]
if g.GroupName == "" && grp.Name != "" {
g.GroupName = grp.Name
}
if g.Platform != "" && grp.Platform != "" && g.Platform != grp.Platform {
// Groups are expected to be platform-scoped. If mismatch is observed, avoid misleading labels.
g.Platform = ""
}
g.MaxCapacity += int64(acc.Concurrency)
g.CurrentInUse += currentInUse
g.WaitingInQueue += waiting
}
}
}
for _, info := range platform {
if info.MaxCapacity > 0 {
info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100
}
}
for _, info := range group {
if info.MaxCapacity > 0 {
info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100
}
}
return platform, group, account, &collectedAt, nil
}
package service
import (
"context"
"database/sql"
"errors"
"log"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func (s *OpsService) GetDashboardOverview(ctx context.Context, filter *OpsDashboardFilter) (*OpsDashboardOverview, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if filter == nil {
return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
}
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
}
if filter.StartTime.After(filter.EndTime) {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
}
// Resolve query mode (requested via query param, or DB default).
filter.QueryMode = s.resolveOpsQueryMode(ctx, filter.QueryMode)
overview, err := s.opsRepo.GetDashboardOverview(ctx, filter)
if err != nil {
if errors.Is(err, ErrOpsPreaggregatedNotPopulated) {
return nil, infraerrors.Conflict("OPS_PREAGG_NOT_READY", "Pre-aggregated ops metrics are not populated yet")
}
return nil, err
}
// Best-effort system health + jobs; dashboard metrics should still render if these are missing.
if metrics, err := s.opsRepo.GetLatestSystemMetrics(ctx, 1); err == nil {
overview.SystemMetrics = metrics
} else if err != nil && !errors.Is(err, sql.ErrNoRows) {
log.Printf("[Ops] GetLatestSystemMetrics failed: %v", err)
}
if heartbeats, err := s.opsRepo.ListJobHeartbeats(ctx); err == nil {
overview.JobHeartbeats = heartbeats
} else {
log.Printf("[Ops] ListJobHeartbeats failed: %v", err)
}
return overview, nil
}
func (s *OpsService) resolveOpsQueryMode(ctx context.Context, requested OpsQueryMode) OpsQueryMode {
if requested.IsValid() {
// Allow "auto" to be disabled via config until preagg is proven stable in production.
// Forced `preagg` via query param still works.
if requested == OpsQueryModeAuto && s != nil && s.cfg != nil && !s.cfg.Ops.UsePreaggregatedTables {
return OpsQueryModeRaw
}
return requested
}
mode := OpsQueryModeAuto
if s != nil && s.settingRepo != nil {
if raw, err := s.settingRepo.GetValue(ctx, SettingKeyOpsQueryModeDefault); err == nil {
mode = ParseOpsQueryMode(raw)
}
}
if mode == OpsQueryModeAuto && s != nil && s.cfg != nil && !s.cfg.Ops.UsePreaggregatedTables {
return OpsQueryModeRaw
}
return mode
}
package service
import (
"context"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func (s *OpsService) GetErrorTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsErrorTrendResponse, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if filter == nil {
return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
}
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
}
if filter.StartTime.After(filter.EndTime) {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
}
return s.opsRepo.GetErrorTrend(ctx, filter, bucketSeconds)
}
func (s *OpsService) GetErrorDistribution(ctx context.Context, filter *OpsDashboardFilter) (*OpsErrorDistributionResponse, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if filter == nil {
return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
}
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
}
if filter.StartTime.After(filter.EndTime) {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
}
return s.opsRepo.GetErrorDistribution(ctx, filter)
}
package service
import (
"context"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func (s *OpsService) GetLatencyHistogram(ctx context.Context, filter *OpsDashboardFilter) (*OpsLatencyHistogramResponse, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if filter == nil {
return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
}
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
}
if filter.StartTime.After(filter.EndTime) {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
}
return s.opsRepo.GetLatencyHistogram(ctx, filter)
}
This diff is collapsed.
package service
import (
"context"
"time"
)
type OpsRepository interface {
InsertErrorLog(ctx context.Context, input *OpsInsertErrorLogInput) (int64, error)
ListErrorLogs(ctx context.Context, filter *OpsErrorLogFilter) (*OpsErrorLogList, error)
GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLogDetail, error)
ListRequestDetails(ctx context.Context, filter *OpsRequestDetailFilter) ([]*OpsRequestDetail, int64, error)
InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error)
UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error
GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error)
// Lightweight window stats (for realtime WS / quick sampling).
GetWindowStats(ctx context.Context, filter *OpsDashboardFilter) (*OpsWindowStats, error)
GetDashboardOverview(ctx context.Context, filter *OpsDashboardFilter) (*OpsDashboardOverview, error)
GetThroughputTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsThroughputTrendResponse, error)
GetLatencyHistogram(ctx context.Context, filter *OpsDashboardFilter) (*OpsLatencyHistogramResponse, error)
GetErrorTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsErrorTrendResponse, error)
GetErrorDistribution(ctx context.Context, filter *OpsDashboardFilter) (*OpsErrorDistributionResponse, error)
InsertSystemMetrics(ctx context.Context, input *OpsInsertSystemMetricsInput) error
GetLatestSystemMetrics(ctx context.Context, windowMinutes int) (*OpsSystemMetricsSnapshot, error)
UpsertJobHeartbeat(ctx context.Context, input *OpsUpsertJobHeartbeatInput) error
ListJobHeartbeats(ctx context.Context) ([]*OpsJobHeartbeat, error)
// Alerts (rules + events)
ListAlertRules(ctx context.Context) ([]*OpsAlertRule, error)
CreateAlertRule(ctx context.Context, input *OpsAlertRule) (*OpsAlertRule, error)
UpdateAlertRule(ctx context.Context, input *OpsAlertRule) (*OpsAlertRule, error)
DeleteAlertRule(ctx context.Context, id int64) error
ListAlertEvents(ctx context.Context, filter *OpsAlertEventFilter) ([]*OpsAlertEvent, error)
GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error)
GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error)
CreateAlertEvent(ctx context.Context, event *OpsAlertEvent) (*OpsAlertEvent, error)
UpdateAlertEventStatus(ctx context.Context, eventID int64, status string, resolvedAt *time.Time) error
UpdateAlertEventEmailSent(ctx context.Context, eventID int64, emailSent bool) error
// Pre-aggregation (hourly/daily) used for long-window dashboard performance.
UpsertHourlyMetrics(ctx context.Context, startTime, endTime time.Time) error
UpsertDailyMetrics(ctx context.Context, startTime, endTime time.Time) error
GetLatestHourlyBucketStart(ctx context.Context) (time.Time, bool, error)
GetLatestDailyBucketDate(ctx context.Context) (time.Time, bool, error)
}
type OpsInsertErrorLogInput struct {
RequestID string
ClientRequestID string
UserID *int64
APIKeyID *int64
AccountID *int64
GroupID *int64
ClientIP *string
Platform string
Model string
RequestPath string
Stream bool
UserAgent string
ErrorPhase string
ErrorType string
Severity string
StatusCode int
IsBusinessLimited bool
ErrorMessage string
ErrorBody string
ErrorSource string
ErrorOwner string
UpstreamStatusCode *int
UpstreamErrorMessage *string
UpstreamErrorDetail *string
DurationMs *int
TimeToFirstTokenMs *int64
RequestBodyJSON *string // sanitized json string (not raw bytes)
RequestBodyTruncated bool
RequestBodyBytes *int
RequestHeadersJSON *string // optional json string
IsRetryable bool
RetryCount int
CreatedAt time.Time
}
type OpsInsertRetryAttemptInput struct {
RequestedByUserID int64
SourceErrorID int64
Mode string
PinnedAccountID *int64
// running|queued etc.
Status string
StartedAt time.Time
}
type OpsUpdateRetryAttemptInput struct {
ID int64
// succeeded|failed
Status string
FinishedAt time.Time
DurationMs int64
// Optional correlation
ResultRequestID *string
ResultErrorID *int64
ErrorMessage *string
}
type OpsInsertSystemMetricsInput struct {
CreatedAt time.Time
WindowMinutes int
Platform *string
GroupID *int64
SuccessCount int64
ErrorCountTotal int64
BusinessLimitedCount int64
ErrorCountSLA int64
UpstreamErrorCountExcl429529 int64
Upstream429Count int64
Upstream529Count int64
TokenConsumed int64
QPS *float64
TPS *float64
DurationP50Ms *int
DurationP90Ms *int
DurationP95Ms *int
DurationP99Ms *int
DurationAvgMs *float64
DurationMaxMs *int
TTFTP50Ms *int
TTFTP90Ms *int
TTFTP95Ms *int
TTFTP99Ms *int
TTFTAvgMs *float64
TTFTMaxMs *int
CPUUsagePercent *float64
MemoryUsedMB *int64
MemoryTotalMB *int64
MemoryUsagePercent *float64
DBOK *bool
RedisOK *bool
DBConnActive *int
DBConnIdle *int
DBConnWaiting *int
GoroutineCount *int
ConcurrencyQueueDepth *int
}
type OpsSystemMetricsSnapshot struct {
ID int64 `json:"id"`
CreatedAt time.Time `json:"created_at"`
WindowMinutes int `json:"window_minutes"`
CPUUsagePercent *float64 `json:"cpu_usage_percent"`
MemoryUsedMB *int64 `json:"memory_used_mb"`
MemoryTotalMB *int64 `json:"memory_total_mb"`
MemoryUsagePercent *float64 `json:"memory_usage_percent"`
DBOK *bool `json:"db_ok"`
RedisOK *bool `json:"redis_ok"`
DBConnActive *int `json:"db_conn_active"`
DBConnIdle *int `json:"db_conn_idle"`
DBConnWaiting *int `json:"db_conn_waiting"`
GoroutineCount *int `json:"goroutine_count"`
ConcurrencyQueueDepth *int `json:"concurrency_queue_depth"`
}
type OpsUpsertJobHeartbeatInput struct {
JobName string
LastRunAt *time.Time
LastSuccessAt *time.Time
LastErrorAt *time.Time
LastError *string
LastDurationMs *int64
}
type OpsJobHeartbeat struct {
JobName string `json:"job_name"`
LastRunAt *time.Time `json:"last_run_at"`
LastSuccessAt *time.Time `json:"last_success_at"`
LastErrorAt *time.Time `json:"last_error_at"`
LastError *string `json:"last_error"`
LastDurationMs *int64 `json:"last_duration_ms"`
UpdatedAt time.Time `json:"updated_at"`
}
type OpsWindowStats struct {
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
SuccessCount int64 `json:"success_count"`
ErrorCountTotal int64 `json:"error_count_total"`
TokenConsumed int64 `json:"token_consumed"`
}
package service
import (
"errors"
"strings"
)
type OpsQueryMode string
const (
OpsQueryModeAuto OpsQueryMode = "auto"
OpsQueryModeRaw OpsQueryMode = "raw"
OpsQueryModePreagg OpsQueryMode = "preagg"
)
// ErrOpsPreaggregatedNotPopulated indicates that raw logs exist for a window, but the
// pre-aggregation tables are not populated yet. This is primarily used to implement
// the forced `preagg` mode UX.
var ErrOpsPreaggregatedNotPopulated = errors.New("ops pre-aggregated tables not populated")
func ParseOpsQueryMode(raw string) OpsQueryMode {
v := strings.ToLower(strings.TrimSpace(raw))
switch v {
case string(OpsQueryModeRaw):
return OpsQueryModeRaw
case string(OpsQueryModePreagg):
return OpsQueryModePreagg
default:
return OpsQueryModeAuto
}
}
func (m OpsQueryMode) IsValid() bool {
switch m {
case OpsQueryModeAuto, OpsQueryModeRaw, OpsQueryModePreagg:
return true
default:
return false
}
}
package service
import (
"context"
"errors"
"strings"
)
// IsRealtimeMonitoringEnabled returns true when realtime ops features are enabled.
//
// This is a soft switch controlled by the DB setting `ops_realtime_monitoring_enabled`,
// and it is also gated by the hard switch/soft switch of overall ops monitoring.
func (s *OpsService) IsRealtimeMonitoringEnabled(ctx context.Context) bool {
if !s.IsMonitoringEnabled(ctx) {
return false
}
if s.settingRepo == nil {
return true
}
value, err := s.settingRepo.GetValue(ctx, SettingKeyOpsRealtimeMonitoringEnabled)
if err != nil {
// Default enabled when key is missing; fail-open on transient errors.
if errors.Is(err, ErrSettingNotFound) {
return true
}
return true
}
switch strings.ToLower(strings.TrimSpace(value)) {
case "false", "0", "off", "disabled":
return false
default:
return true
}
}
package service
import (
"context"
"time"
)
type OpsRequestKind string
const (
OpsRequestKindSuccess OpsRequestKind = "success"
OpsRequestKindError OpsRequestKind = "error"
)
// OpsRequestDetail is a request-level view across success (usage_logs) and error (ops_error_logs).
// It powers "request drilldown" UIs without exposing full request bodies for successful requests.
type OpsRequestDetail struct {
Kind OpsRequestKind `json:"kind"`
CreatedAt time.Time `json:"created_at"`
RequestID string `json:"request_id"`
Platform string `json:"platform,omitempty"`
Model string `json:"model,omitempty"`
DurationMs *int `json:"duration_ms,omitempty"`
StatusCode *int `json:"status_code,omitempty"`
// When Kind == "error", ErrorID links to /admin/ops/errors/:id.
ErrorID *int64 `json:"error_id,omitempty"`
Phase string `json:"phase,omitempty"`
Severity string `json:"severity,omitempty"`
Message string `json:"message,omitempty"`
UserID *int64 `json:"user_id,omitempty"`
APIKeyID *int64 `json:"api_key_id,omitempty"`
AccountID *int64 `json:"account_id,omitempty"`
GroupID *int64 `json:"group_id,omitempty"`
Stream bool `json:"stream"`
}
type OpsRequestDetailFilter struct {
StartTime *time.Time
EndTime *time.Time
// kind: success|error|all
Kind string
Platform string
GroupID *int64
UserID *int64
APIKeyID *int64
AccountID *int64
Model string
RequestID string
Query string
MinDurationMs *int
MaxDurationMs *int
// Sort: created_at_desc (default) or duration_desc.
Sort string
Page int
PageSize int
}
func (f *OpsRequestDetailFilter) Normalize() (page, pageSize int, startTime, endTime time.Time) {
page = 1
pageSize = 50
endTime = time.Now()
startTime = endTime.Add(-1 * time.Hour)
if f == nil {
return page, pageSize, startTime, endTime
}
if f.Page > 0 {
page = f.Page
}
if f.PageSize > 0 {
pageSize = f.PageSize
}
if pageSize > 100 {
pageSize = 100
}
if f.EndTime != nil {
endTime = *f.EndTime
}
if f.StartTime != nil {
startTime = *f.StartTime
} else if f.EndTime != nil {
startTime = endTime.Add(-1 * time.Hour)
}
if startTime.After(endTime) {
startTime, endTime = endTime, startTime
}
return page, pageSize, startTime, endTime
}
type OpsRequestDetailList struct {
Items []*OpsRequestDetail `json:"items"`
Total int64 `json:"total"`
Page int `json:"page"`
PageSize int `json:"page_size"`
}
func (s *OpsService) ListRequestDetails(ctx context.Context, filter *OpsRequestDetailFilter) (*OpsRequestDetailList, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return &OpsRequestDetailList{
Items: []*OpsRequestDetail{},
Total: 0,
Page: 1,
PageSize: 50,
}, nil
}
page, pageSize, startTime, endTime := filter.Normalize()
filterCopy := &OpsRequestDetailFilter{}
if filter != nil {
*filterCopy = *filter
}
filterCopy.Page = page
filterCopy.PageSize = pageSize
filterCopy.StartTime = &startTime
filterCopy.EndTime = &endTime
items, total, err := s.opsRepo.ListRequestDetails(ctx, filterCopy)
if err != nil {
return nil, err
}
if items == nil {
items = []*OpsRequestDetail{}
}
return &OpsRequestDetailList{
Items: items,
Total: total,
Page: page,
PageSize: pageSize,
}, nil
}
package service
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"strings"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/gin-gonic/gin"
"github.com/lib/pq"
)
const (
OpsRetryModeClient = "client"
OpsRetryModeUpstream = "upstream"
)
const (
opsRetryStatusRunning = "running"
opsRetryStatusSucceeded = "succeeded"
opsRetryStatusFailed = "failed"
)
const (
opsRetryTimeout = 60 * time.Second
opsRetryCaptureBytesLimit = 64 * 1024
opsRetryResponsePreviewMax = 8 * 1024
opsRetryMinIntervalPerError = 10 * time.Second
opsRetryMaxAccountSwitches = 3
)
var opsRetryRequestHeaderAllowlist = map[string]bool{
"anthropic-beta": true,
"anthropic-version": true,
}
type opsRetryRequestType string
const (
opsRetryTypeMessages opsRetryRequestType = "messages"
opsRetryTypeOpenAI opsRetryRequestType = "openai_responses"
opsRetryTypeGeminiV1B opsRetryRequestType = "gemini_v1beta"
)
type limitedResponseWriter struct {
header http.Header
status int
wroteHeader bool
limit int
totalWritten int64
buf bytes.Buffer
}
func newLimitedResponseWriter(limit int) *limitedResponseWriter {
if limit <= 0 {
limit = 1
}
return &limitedResponseWriter{
header: make(http.Header),
status: http.StatusOK,
limit: limit,
}
}
func (w *limitedResponseWriter) Header() http.Header {
return w.header
}
func (w *limitedResponseWriter) WriteHeader(statusCode int) {
if w.wroteHeader {
return
}
w.wroteHeader = true
w.status = statusCode
}
func (w *limitedResponseWriter) Write(p []byte) (int, error) {
if !w.wroteHeader {
w.WriteHeader(http.StatusOK)
}
w.totalWritten += int64(len(p))
if w.buf.Len() < w.limit {
remaining := w.limit - w.buf.Len()
if len(p) > remaining {
_, _ = w.buf.Write(p[:remaining])
} else {
_, _ = w.buf.Write(p)
}
}
// Pretend we wrote everything to avoid upstream/client code treating it as an error.
return len(p), nil
}
func (w *limitedResponseWriter) Flush() {}
func (w *limitedResponseWriter) bodyBytes() []byte {
return w.buf.Bytes()
}
func (w *limitedResponseWriter) truncated() bool {
return w.totalWritten > int64(w.limit)
}
func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, errorID int64, mode string, pinnedAccountID *int64) (*OpsRetryResult, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
mode = strings.ToLower(strings.TrimSpace(mode))
switch mode {
case OpsRetryModeClient, OpsRetryModeUpstream:
default:
return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_MODE", "mode must be client or upstream")
}
latest, err := s.opsRepo.GetLatestRetryAttemptForError(ctx, errorID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, infraerrors.InternalServer("OPS_RETRY_LOAD_LATEST_FAILED", "Failed to check retry status").WithCause(err)
}
if latest != nil {
if strings.EqualFold(latest.Status, opsRetryStatusRunning) || strings.EqualFold(latest.Status, "queued") {
return nil, infraerrors.Conflict("OPS_RETRY_IN_PROGRESS", "A retry is already in progress for this error")
}
lastAttemptAt := latest.CreatedAt
if latest.FinishedAt != nil && !latest.FinishedAt.IsZero() {
lastAttemptAt = *latest.FinishedAt
} else if latest.StartedAt != nil && !latest.StartedAt.IsZero() {
lastAttemptAt = *latest.StartedAt
}
if time.Since(lastAttemptAt) < opsRetryMinIntervalPerError {
return nil, infraerrors.Conflict("OPS_RETRY_TOO_FREQUENT", "Please wait before retrying this error again")
}
}
errorLog, err := s.GetErrorLogByID(ctx, errorID)
if err != nil {
return nil, err
}
if strings.TrimSpace(errorLog.RequestBody) == "" {
return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry")
}
var pinned *int64
if mode == OpsRetryModeUpstream {
if pinnedAccountID != nil && *pinnedAccountID > 0 {
pinned = pinnedAccountID
} else if errorLog.AccountID != nil && *errorLog.AccountID > 0 {
pinned = errorLog.AccountID
} else {
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "pinned_account_id is required for upstream retry")
}
}
startedAt := time.Now()
attemptID, err := s.opsRepo.InsertRetryAttempt(ctx, &OpsInsertRetryAttemptInput{
RequestedByUserID: requestedByUserID,
SourceErrorID: errorID,
Mode: mode,
PinnedAccountID: pinned,
Status: opsRetryStatusRunning,
StartedAt: startedAt,
})
if err != nil {
var pqErr *pq.Error
if errors.As(err, &pqErr) && string(pqErr.Code) == "23505" {
return nil, infraerrors.Conflict("OPS_RETRY_IN_PROGRESS", "A retry is already in progress for this error")
}
return nil, infraerrors.InternalServer("OPS_RETRY_CREATE_ATTEMPT_FAILED", "Failed to create retry attempt").WithCause(err)
}
result := &OpsRetryResult{
AttemptID: attemptID,
Mode: mode,
Status: opsRetryStatusFailed,
PinnedAccountID: pinned,
HTTPStatusCode: 0,
UpstreamRequestID: "",
ResponsePreview: "",
ResponseTruncated: false,
ErrorMessage: "",
StartedAt: startedAt,
}
execCtx, cancel := context.WithTimeout(ctx, opsRetryTimeout)
defer cancel()
execRes := s.executeRetry(execCtx, errorLog, mode, pinned)
finishedAt := time.Now()
result.FinishedAt = finishedAt
result.DurationMs = finishedAt.Sub(startedAt).Milliseconds()
if execRes != nil {
result.Status = execRes.status
result.UsedAccountID = execRes.usedAccountID
result.HTTPStatusCode = execRes.httpStatusCode
result.UpstreamRequestID = execRes.upstreamRequestID
result.ResponsePreview = execRes.responsePreview
result.ResponseTruncated = execRes.responseTruncated
result.ErrorMessage = execRes.errorMessage
}
updateCtx, updateCancel := context.WithTimeout(context.Background(), 3*time.Second)
defer updateCancel()
var updateErrMsg *string
if strings.TrimSpace(result.ErrorMessage) != "" {
msg := result.ErrorMessage
updateErrMsg = &msg
}
var resultRequestID *string
if strings.TrimSpace(result.UpstreamRequestID) != "" {
v := result.UpstreamRequestID
resultRequestID = &v
}
finalStatus := result.Status
if strings.TrimSpace(finalStatus) == "" {
finalStatus = opsRetryStatusFailed
}
if err := s.opsRepo.UpdateRetryAttempt(updateCtx, &OpsUpdateRetryAttemptInput{
ID: attemptID,
Status: finalStatus,
FinishedAt: finishedAt,
DurationMs: result.DurationMs,
ResultRequestID: resultRequestID,
ErrorMessage: updateErrMsg,
}); err != nil {
// Best-effort: retry itself already executed; do not fail the API response.
log.Printf("[Ops] UpdateRetryAttempt failed: %v", err)
}
return result, nil
}
type opsRetryExecution struct {
status string
usedAccountID *int64
httpStatusCode int
upstreamRequestID string
responsePreview string
responseTruncated bool
errorMessage string
}
func (s *OpsService) executeRetry(ctx context.Context, errorLog *OpsErrorLogDetail, mode string, pinnedAccountID *int64) *opsRetryExecution {
if errorLog == nil {
return &opsRetryExecution{
status: opsRetryStatusFailed,
errorMessage: "missing error log",
}
}
reqType := detectOpsRetryType(errorLog.RequestPath)
bodyBytes := []byte(errorLog.RequestBody)
switch reqType {
case opsRetryTypeMessages:
bodyBytes = FilterThinkingBlocksForRetry(bodyBytes)
case opsRetryTypeOpenAI, opsRetryTypeGeminiV1B:
// No-op
}
switch strings.ToLower(strings.TrimSpace(mode)) {
case OpsRetryModeUpstream:
if pinnedAccountID == nil || *pinnedAccountID <= 0 {
return &opsRetryExecution{
status: opsRetryStatusFailed,
errorMessage: "pinned_account_id required for upstream retry",
}
}
return s.executePinnedRetry(ctx, reqType, errorLog, bodyBytes, *pinnedAccountID)
case OpsRetryModeClient:
return s.executeClientRetry(ctx, reqType, errorLog, bodyBytes)
default:
return &opsRetryExecution{
status: opsRetryStatusFailed,
errorMessage: "invalid retry mode",
}
}
}
func detectOpsRetryType(path string) opsRetryRequestType {
p := strings.ToLower(strings.TrimSpace(path))
switch {
case strings.Contains(p, "/responses"):
return opsRetryTypeOpenAI
case strings.Contains(p, "/v1beta/"):
return opsRetryTypeGeminiV1B
default:
return opsRetryTypeMessages
}
}
func (s *OpsService) executePinnedRetry(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte, pinnedAccountID int64) *opsRetryExecution {
if s.accountRepo == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account repository not available"}
}
account, err := s.accountRepo.GetByID(ctx, pinnedAccountID)
if err != nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: fmt.Sprintf("account not found: %v", err)}
}
if account == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account not found"}
}
if !account.IsSchedulable() {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account is not schedulable"}
}
if errorLog.GroupID != nil && *errorLog.GroupID > 0 {
if !containsInt64(account.GroupIDs, *errorLog.GroupID) {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "pinned account is not in the same group as the original request"}
}
}
var release func()
if s.concurrencyService != nil {
acq, err := s.concurrencyService.AcquireAccountSlot(ctx, account.ID, account.Concurrency)
if err != nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: fmt.Sprintf("acquire account slot failed: %v", err)}
}
if acq == nil || !acq.Acquired {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account concurrency limit reached"}
}
release = acq.ReleaseFunc
}
if release != nil {
defer release()
}
usedID := account.ID
exec := s.executeWithAccount(ctx, reqType, errorLog, body, account)
exec.usedAccountID = &usedID
if exec.status == "" {
exec.status = opsRetryStatusFailed
}
return exec
}
func (s *OpsService) executeClientRetry(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte) *opsRetryExecution {
groupID := errorLog.GroupID
if groupID == nil || *groupID <= 0 {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "group_id missing; cannot reselect account"}
}
model, stream, parsedErr := extractRetryModelAndStream(reqType, errorLog, body)
if parsedErr != nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: parsedErr.Error()}
}
_ = stream
excluded := make(map[int64]struct{})
switches := 0
for {
if switches >= opsRetryMaxAccountSwitches {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "retry failed after exhausting account failovers"}
}
selection, selErr := s.selectAccountForRetry(ctx, reqType, groupID, model, excluded)
if selErr != nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: selErr.Error()}
}
if selection == nil || selection.Account == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "no available accounts"}
}
account := selection.Account
if !selection.Acquired || selection.ReleaseFunc == nil {
excluded[account.ID] = struct{}{}
switches++
continue
}
exec := func() *opsRetryExecution {
defer selection.ReleaseFunc()
return s.executeWithAccount(ctx, reqType, errorLog, body, account)
}()
if exec != nil {
if exec.status == opsRetryStatusSucceeded {
usedID := account.ID
exec.usedAccountID = &usedID
return exec
}
// If the gateway services ask for failover, try another account.
if s.isFailoverError(exec.errorMessage) {
excluded[account.ID] = struct{}{}
switches++
continue
}
usedID := account.ID
exec.usedAccountID = &usedID
return exec
}
excluded[account.ID] = struct{}{}
switches++
}
}
func (s *OpsService) selectAccountForRetry(ctx context.Context, reqType opsRetryRequestType, groupID *int64, model string, excludedIDs map[int64]struct{}) (*AccountSelectionResult, error) {
switch reqType {
case opsRetryTypeOpenAI:
if s.openAIGatewayService == nil {
return nil, fmt.Errorf("openai gateway service not available")
}
return s.openAIGatewayService.SelectAccountWithLoadAwareness(ctx, groupID, "", model, excludedIDs)
case opsRetryTypeGeminiV1B, opsRetryTypeMessages:
if s.gatewayService == nil {
return nil, fmt.Errorf("gateway service not available")
}
return s.gatewayService.SelectAccountWithLoadAwareness(ctx, groupID, "", model, excludedIDs)
default:
return nil, fmt.Errorf("unsupported retry type: %s", reqType)
}
}
func extractRetryModelAndStream(reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte) (model string, stream bool, err error) {
switch reqType {
case opsRetryTypeMessages:
parsed, parseErr := ParseGatewayRequest(body)
if parseErr != nil {
return "", false, fmt.Errorf("failed to parse messages request body: %w", parseErr)
}
return parsed.Model, parsed.Stream, nil
case opsRetryTypeOpenAI:
var v struct {
Model string `json:"model"`
Stream bool `json:"stream"`
}
if err := json.Unmarshal(body, &v); err != nil {
return "", false, fmt.Errorf("failed to parse openai request body: %w", err)
}
return strings.TrimSpace(v.Model), v.Stream, nil
case opsRetryTypeGeminiV1B:
if strings.TrimSpace(errorLog.Model) == "" {
return "", false, fmt.Errorf("missing model for gemini v1beta retry")
}
return strings.TrimSpace(errorLog.Model), errorLog.Stream, nil
default:
return "", false, fmt.Errorf("unsupported retry type: %s", reqType)
}
}
func (s *OpsService) executeWithAccount(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte, account *Account) *opsRetryExecution {
if account == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "missing account"}
}
c, w := newOpsRetryContext(ctx, errorLog)
var err error
switch reqType {
case opsRetryTypeOpenAI:
if s.openAIGatewayService == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "openai gateway service not available"}
}
_, err = s.openAIGatewayService.Forward(ctx, c, account, body)
case opsRetryTypeGeminiV1B:
if s.geminiCompatService == nil || s.antigravityGatewayService == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gemini services not available"}
}
modelName := strings.TrimSpace(errorLog.Model)
action := "generateContent"
if errorLog.Stream {
action = "streamGenerateContent"
}
if account.Platform == PlatformAntigravity {
_, err = s.antigravityGatewayService.ForwardGemini(ctx, c, account, modelName, action, errorLog.Stream, body)
} else {
_, err = s.geminiCompatService.ForwardNative(ctx, c, account, modelName, action, errorLog.Stream, body)
}
case opsRetryTypeMessages:
switch account.Platform {
case PlatformAntigravity:
if s.antigravityGatewayService == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "antigravity gateway service not available"}
}
_, err = s.antigravityGatewayService.Forward(ctx, c, account, body)
case PlatformGemini:
if s.geminiCompatService == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gemini gateway service not available"}
}
_, err = s.geminiCompatService.Forward(ctx, c, account, body)
default:
if s.gatewayService == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gateway service not available"}
}
parsedReq, parseErr := ParseGatewayRequest(body)
if parseErr != nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "failed to parse request body"}
}
_, err = s.gatewayService.Forward(ctx, c, account, parsedReq)
}
default:
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "unsupported retry type"}
}
statusCode := http.StatusOK
if c != nil && c.Writer != nil {
statusCode = c.Writer.Status()
}
upstreamReqID := extractUpstreamRequestID(c)
preview, truncated := extractResponsePreview(w)
exec := &opsRetryExecution{
status: opsRetryStatusFailed,
httpStatusCode: statusCode,
upstreamRequestID: upstreamReqID,
responsePreview: preview,
responseTruncated: truncated,
errorMessage: "",
}
if err == nil && statusCode < 400 {
exec.status = opsRetryStatusSucceeded
return exec
}
if err != nil {
exec.errorMessage = err.Error()
} else {
exec.errorMessage = fmt.Sprintf("upstream returned status %d", statusCode)
}
return exec
}
func newOpsRetryContext(ctx context.Context, errorLog *OpsErrorLogDetail) (*gin.Context, *limitedResponseWriter) {
w := newLimitedResponseWriter(opsRetryCaptureBytesLimit)
c, _ := gin.CreateTestContext(w)
path := "/"
if errorLog != nil && strings.TrimSpace(errorLog.RequestPath) != "" {
path = errorLog.RequestPath
}
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "http://localhost"+path, bytes.NewReader(nil))
req.Header.Set("content-type", "application/json")
if errorLog != nil && strings.TrimSpace(errorLog.UserAgent) != "" {
req.Header.Set("user-agent", errorLog.UserAgent)
}
// Restore a minimal, whitelisted subset of request headers to improve retry fidelity
// (e.g. anthropic-beta / anthropic-version). Never replay auth credentials.
if errorLog != nil && strings.TrimSpace(errorLog.RequestHeaders) != "" {
var stored map[string]string
if err := json.Unmarshal([]byte(errorLog.RequestHeaders), &stored); err == nil {
for k, v := range stored {
key := strings.TrimSpace(k)
if key == "" {
continue
}
if !opsRetryRequestHeaderAllowlist[strings.ToLower(key)] {
continue
}
val := strings.TrimSpace(v)
if val == "" {
continue
}
req.Header.Set(key, val)
}
}
}
c.Request = req
return c, w
}
func extractUpstreamRequestID(c *gin.Context) string {
if c == nil || c.Writer == nil {
return ""
}
h := c.Writer.Header()
if h == nil {
return ""
}
for _, key := range []string{"x-request-id", "X-Request-Id", "X-Request-ID"} {
if v := strings.TrimSpace(h.Get(key)); v != "" {
return v
}
}
return ""
}
func extractResponsePreview(w *limitedResponseWriter) (preview string, truncated bool) {
if w == nil {
return "", false
}
b := bytes.TrimSpace(w.bodyBytes())
if len(b) == 0 {
return "", w.truncated()
}
if len(b) > opsRetryResponsePreviewMax {
return string(b[:opsRetryResponsePreviewMax]), true
}
return string(b), w.truncated()
}
func containsInt64(items []int64, needle int64) bool {
for _, v := range items {
if v == needle {
return true
}
}
return false
}
func (s *OpsService) isFailoverError(message string) bool {
msg := strings.ToLower(strings.TrimSpace(message))
if msg == "" {
return false
}
return strings.Contains(msg, "upstream error:") && strings.Contains(msg, "failover")
}
package service
import (
"context"
"database/sql"
"encoding/json"
"errors"
"log"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
var ErrOpsDisabled = infraerrors.NotFound("OPS_DISABLED", "Ops monitoring is disabled")
const (
opsMaxStoredRequestBodyBytes = 10 * 1024
opsMaxStoredErrorBodyBytes = 20 * 1024
)
// OpsService provides ingestion and query APIs for the Ops monitoring module.
type OpsService struct {
opsRepo OpsRepository
settingRepo SettingRepository
cfg *config.Config
accountRepo AccountRepository
concurrencyService *ConcurrencyService
gatewayService *GatewayService
openAIGatewayService *OpenAIGatewayService
geminiCompatService *GeminiMessagesCompatService
antigravityGatewayService *AntigravityGatewayService
}
func NewOpsService(
opsRepo OpsRepository,
settingRepo SettingRepository,
cfg *config.Config,
accountRepo AccountRepository,
concurrencyService *ConcurrencyService,
gatewayService *GatewayService,
openAIGatewayService *OpenAIGatewayService,
geminiCompatService *GeminiMessagesCompatService,
antigravityGatewayService *AntigravityGatewayService,
) *OpsService {
return &OpsService{
opsRepo: opsRepo,
settingRepo: settingRepo,
cfg: cfg,
accountRepo: accountRepo,
concurrencyService: concurrencyService,
gatewayService: gatewayService,
openAIGatewayService: openAIGatewayService,
geminiCompatService: geminiCompatService,
antigravityGatewayService: antigravityGatewayService,
}
}
func (s *OpsService) RequireMonitoringEnabled(ctx context.Context) error {
if s.IsMonitoringEnabled(ctx) {
return nil
}
return ErrOpsDisabled
}
func (s *OpsService) IsMonitoringEnabled(ctx context.Context) bool {
// Hard switch: disable ops entirely.
if s.cfg != nil && !s.cfg.Ops.Enabled {
return false
}
if s.settingRepo == nil {
return true
}
value, err := s.settingRepo.GetValue(ctx, SettingKeyOpsMonitoringEnabled)
if err != nil {
// Default enabled when key is missing, and fail-open on transient errors
// (ops should never block gateway traffic).
if errors.Is(err, ErrSettingNotFound) {
return true
}
return true
}
switch strings.ToLower(strings.TrimSpace(value)) {
case "false", "0", "off", "disabled":
return false
default:
return true
}
}
func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogInput, rawRequestBody []byte) error {
if entry == nil {
return nil
}
if !s.IsMonitoringEnabled(ctx) {
return nil
}
if s.opsRepo == nil {
return nil
}
// Ensure timestamps are always populated.
if entry.CreatedAt.IsZero() {
entry.CreatedAt = time.Now()
}
// Ensure required fields exist (DB has NOT NULL constraints).
entry.ErrorPhase = strings.TrimSpace(entry.ErrorPhase)
entry.ErrorType = strings.TrimSpace(entry.ErrorType)
if entry.ErrorPhase == "" {
entry.ErrorPhase = "internal"
}
if entry.ErrorType == "" {
entry.ErrorType = "api_error"
}
// Sanitize + trim request body (errors only).
if len(rawRequestBody) > 0 {
sanitized, truncated, bytesLen := sanitizeAndTrimRequestBody(rawRequestBody, opsMaxStoredRequestBodyBytes)
if sanitized != "" {
entry.RequestBodyJSON = &sanitized
}
entry.RequestBodyTruncated = truncated
entry.RequestBodyBytes = &bytesLen
}
// Sanitize + truncate error_body to avoid storing sensitive data.
if strings.TrimSpace(entry.ErrorBody) != "" {
sanitized, _ := sanitizeErrorBodyForStorage(entry.ErrorBody, opsMaxStoredErrorBodyBytes)
entry.ErrorBody = sanitized
}
if _, err := s.opsRepo.InsertErrorLog(ctx, entry); err != nil {
// Never bubble up to gateway; best-effort logging.
log.Printf("[Ops] RecordError failed: %v", err)
return err
}
return nil
}
func (s *OpsService) GetErrorLogs(ctx context.Context, filter *OpsErrorLogFilter) (*OpsErrorLogList, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return &OpsErrorLogList{Errors: []*OpsErrorLog{}, Total: 0, Page: 1, PageSize: 20}, nil
}
return s.opsRepo.ListErrorLogs(ctx, filter)
}
func (s *OpsService) GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLogDetail, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found")
}
detail, err := s.opsRepo.GetErrorLogByID(ctx, id)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found")
}
return nil, infraerrors.InternalServer("OPS_ERROR_LOAD_FAILED", "Failed to load ops error log").WithCause(err)
}
return detail, nil
}
func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, truncated bool, bytesLen int) {
bytesLen = len(raw)
if len(raw) == 0 {
return "", false, 0
}
var decoded any
if err := json.Unmarshal(raw, &decoded); err != nil {
// If it's not valid JSON, don't store (retry would not be reliable anyway).
return "", false, bytesLen
}
decoded = redactSensitiveJSON(decoded)
encoded, err := json.Marshal(decoded)
if err != nil {
return "", false, bytesLen
}
if len(encoded) <= maxBytes {
return string(encoded), false, bytesLen
}
// Trim conversation history to keep the most recent context.
if root, ok := decoded.(map[string]any); ok {
if trimmed, ok := trimConversationArrays(root, maxBytes); ok {
encoded2, err2 := json.Marshal(trimmed)
if err2 == nil && len(encoded2) <= maxBytes {
return string(encoded2), true, bytesLen
}
// Fallthrough: keep shrinking.
decoded = trimmed
}
essential := shrinkToEssentials(root)
encoded3, err3 := json.Marshal(essential)
if err3 == nil && len(encoded3) <= maxBytes {
return string(encoded3), true, bytesLen
}
}
// Last resort: store a minimal placeholder (still valid JSON).
placeholder := map[string]any{
"request_body_truncated": true,
}
if model := extractString(decoded, "model"); model != "" {
placeholder["model"] = model
}
encoded4, err4 := json.Marshal(placeholder)
if err4 != nil {
return "", true, bytesLen
}
return string(encoded4), true, bytesLen
}
func redactSensitiveJSON(v any) any {
switch t := v.(type) {
case map[string]any:
out := make(map[string]any, len(t))
for k, vv := range t {
if isSensitiveKey(k) {
out[k] = "[REDACTED]"
continue
}
out[k] = redactSensitiveJSON(vv)
}
return out
case []any:
out := make([]any, 0, len(t))
for _, vv := range t {
out = append(out, redactSensitiveJSON(vv))
}
return out
default:
return v
}
}
func isSensitiveKey(key string) bool {
k := strings.ToLower(strings.TrimSpace(key))
if k == "" {
return false
}
// Exact matches (common credential fields).
switch k {
case "authorization",
"proxy-authorization",
"x-api-key",
"api_key",
"apikey",
"access_token",
"refresh_token",
"id_token",
"session_token",
"token",
"password",
"passwd",
"passphrase",
"secret",
"client_secret",
"private_key",
"jwt",
"signature",
"accesskeyid",
"secretaccesskey":
return true
}
// Suffix matches.
for _, suffix := range []string{
"_secret",
"_token",
"_id_token",
"_session_token",
"_password",
"_passwd",
"_passphrase",
"_key",
"secret_key",
"private_key",
} {
if strings.HasSuffix(k, suffix) {
return true
}
}
// Substring matches (conservative, but errs on the side of privacy).
for _, sub := range []string{
"secret",
"token",
"password",
"passwd",
"passphrase",
"privatekey",
"private_key",
"apikey",
"api_key",
"accesskeyid",
"secretaccesskey",
"bearer",
"cookie",
"credential",
"session",
"jwt",
"signature",
} {
if strings.Contains(k, sub) {
return true
}
}
return false
}
func trimConversationArrays(root map[string]any, maxBytes int) (map[string]any, bool) {
// Supported: anthropic/openai: messages; gemini: contents.
if out, ok := trimArrayField(root, "messages", maxBytes); ok {
return out, true
}
if out, ok := trimArrayField(root, "contents", maxBytes); ok {
return out, true
}
return root, false
}
func trimArrayField(root map[string]any, field string, maxBytes int) (map[string]any, bool) {
raw, ok := root[field]
if !ok {
return nil, false
}
arr, ok := raw.([]any)
if !ok || len(arr) == 0 {
return nil, false
}
// Keep at least the last message/content. Use binary search so we don't marshal O(n) times.
// We are dropping from the *front* of the array (oldest context first).
lo := 0
hi := len(arr) - 1 // inclusive; hi ensures at least one item remains
var best map[string]any
found := false
for lo <= hi {
mid := (lo + hi) / 2
candidateArr := arr[mid:]
if len(candidateArr) == 0 {
lo = mid + 1
continue
}
next := shallowCopyMap(root)
next[field] = candidateArr
encoded, err := json.Marshal(next)
if err != nil {
// If marshal fails, try dropping more.
lo = mid + 1
continue
}
if len(encoded) <= maxBytes {
best = next
found = true
// Try to keep more context by dropping fewer items.
hi = mid - 1
continue
}
// Need to drop more.
lo = mid + 1
}
if found {
return best, true
}
// Nothing fit (even with only one element); return the smallest slice and let the
// caller fall back to shrinkToEssentials().
next := shallowCopyMap(root)
next[field] = arr[len(arr)-1:]
return next, true
}
func shrinkToEssentials(root map[string]any) map[string]any {
out := make(map[string]any)
for _, key := range []string{"model", "stream", "max_tokens", "temperature", "top_p", "top_k"} {
if v, ok := root[key]; ok {
out[key] = v
}
}
// Keep only the last element of the conversation array.
if v, ok := root["messages"]; ok {
if arr, ok := v.([]any); ok && len(arr) > 0 {
out["messages"] = []any{arr[len(arr)-1]}
}
}
if v, ok := root["contents"]; ok {
if arr, ok := v.([]any); ok && len(arr) > 0 {
out["contents"] = []any{arr[len(arr)-1]}
}
}
return out
}
func shallowCopyMap(m map[string]any) map[string]any {
out := make(map[string]any, len(m))
for k, v := range m {
out[k] = v
}
return out
}
func sanitizeErrorBodyForStorage(raw string, maxBytes int) (sanitized string, truncated bool) {
raw = strings.TrimSpace(raw)
if raw == "" {
return "", false
}
// Prefer JSON-safe sanitization when possible.
if out, trunc, _ := sanitizeAndTrimRequestBody([]byte(raw), maxBytes); out != "" {
return out, trunc
}
// Non-JSON: best-effort truncate.
if maxBytes > 0 && len(raw) > maxBytes {
return truncateString(raw, maxBytes), true
}
return raw, false
}
func extractString(v any, key string) string {
root, ok := v.(map[string]any)
if !ok {
return ""
}
s, _ := root[key].(string)
return strings.TrimSpace(s)
}
This diff is collapsed.
package service
import (
"context"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func (s *OpsService) GetThroughputTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsThroughputTrendResponse, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if filter == nil {
return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
}
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
}
if filter.StartTime.After(filter.EndTime) {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
}
return s.opsRepo.GetThroughputTrend(ctx, filter, bucketSeconds)
}
package service
import (
"context"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
// GetWindowStats returns lightweight request/token counts for the provided window.
// It is intended for realtime sampling (e.g. WebSocket QPS push) without computing percentiles/peaks.
func (s *OpsService) GetWindowStats(ctx context.Context, startTime, endTime time.Time) (*OpsWindowStats, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
filter := &OpsDashboardFilter{
StartTime: startTime,
EndTime: endTime,
}
return s.opsRepo.GetWindowStats(ctx, filter)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment