Commit b9b4db3d authored by song's avatar song
Browse files

Merge upstream/main

parents 5a6f60a9 dae0d532
package service
import (
"context"
"database/sql"
"errors"
"strings"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func (s *OpsService) ListAlertRules(ctx context.Context) ([]*OpsAlertRule, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return []*OpsAlertRule{}, nil
}
return s.opsRepo.ListAlertRules(ctx)
}
func (s *OpsService) CreateAlertRule(ctx context.Context, rule *OpsAlertRule) (*OpsAlertRule, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if rule == nil {
return nil, infraerrors.BadRequest("INVALID_RULE", "invalid rule")
}
created, err := s.opsRepo.CreateAlertRule(ctx, rule)
if err != nil {
return nil, err
}
return created, nil
}
func (s *OpsService) UpdateAlertRule(ctx context.Context, rule *OpsAlertRule) (*OpsAlertRule, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if rule == nil || rule.ID <= 0 {
return nil, infraerrors.BadRequest("INVALID_RULE", "invalid rule")
}
updated, err := s.opsRepo.UpdateAlertRule(ctx, rule)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, infraerrors.NotFound("OPS_ALERT_RULE_NOT_FOUND", "alert rule not found")
}
return nil, err
}
return updated, nil
}
func (s *OpsService) DeleteAlertRule(ctx context.Context, id int64) error {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return err
}
if s.opsRepo == nil {
return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if id <= 0 {
return infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
if err := s.opsRepo.DeleteAlertRule(ctx, id); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return infraerrors.NotFound("OPS_ALERT_RULE_NOT_FOUND", "alert rule not found")
}
return err
}
return nil
}
func (s *OpsService) ListAlertEvents(ctx context.Context, filter *OpsAlertEventFilter) ([]*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return []*OpsAlertEvent{}, nil
}
return s.opsRepo.ListAlertEvents(ctx, filter)
}
func (s *OpsService) GetAlertEventByID(ctx context.Context, eventID int64) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if eventID <= 0 {
return nil, infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id")
}
ev, err := s.opsRepo.GetAlertEventByID(ctx, eventID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, infraerrors.NotFound("OPS_ALERT_EVENT_NOT_FOUND", "alert event not found")
}
return nil, err
}
if ev == nil {
return nil, infraerrors.NotFound("OPS_ALERT_EVENT_NOT_FOUND", "alert event not found")
}
return ev, nil
}
func (s *OpsService) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if ruleID <= 0 {
return nil, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
return s.opsRepo.GetActiveAlertEvent(ctx, ruleID)
}
func (s *OpsService) CreateAlertSilence(ctx context.Context, input *OpsAlertSilence) (*OpsAlertSilence, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if input == nil {
return nil, infraerrors.BadRequest("INVALID_SILENCE", "invalid silence")
}
if input.RuleID <= 0 {
return nil, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
if strings.TrimSpace(input.Platform) == "" {
return nil, infraerrors.BadRequest("INVALID_PLATFORM", "invalid platform")
}
if input.Until.IsZero() {
return nil, infraerrors.BadRequest("INVALID_UNTIL", "invalid until")
}
created, err := s.opsRepo.CreateAlertSilence(ctx, input)
if err != nil {
return nil, err
}
return created, nil
}
func (s *OpsService) IsAlertSilenced(ctx context.Context, ruleID int64, platform string, groupID *int64, region *string, now time.Time) (bool, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return false, err
}
if s.opsRepo == nil {
return false, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if ruleID <= 0 {
return false, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
if strings.TrimSpace(platform) == "" {
return false, nil
}
return s.opsRepo.IsAlertSilenced(ctx, ruleID, platform, groupID, region, now)
}
func (s *OpsService) GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if ruleID <= 0 {
return nil, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id")
}
return s.opsRepo.GetLatestAlertEvent(ctx, ruleID)
}
func (s *OpsService) CreateAlertEvent(ctx context.Context, event *OpsAlertEvent) (*OpsAlertEvent, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if event == nil {
return nil, infraerrors.BadRequest("INVALID_EVENT", "invalid event")
}
created, err := s.opsRepo.CreateAlertEvent(ctx, event)
if err != nil {
return nil, err
}
return created, nil
}
func (s *OpsService) UpdateAlertEventStatus(ctx context.Context, eventID int64, status string, resolvedAt *time.Time) error {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return err
}
if s.opsRepo == nil {
return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if eventID <= 0 {
return infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id")
}
status = strings.TrimSpace(status)
if status == "" {
return infraerrors.BadRequest("INVALID_STATUS", "invalid status")
}
if status != OpsAlertStatusResolved && status != OpsAlertStatusManualResolved {
return infraerrors.BadRequest("INVALID_STATUS", "invalid status")
}
return s.opsRepo.UpdateAlertEventStatus(ctx, eventID, status, resolvedAt)
}
func (s *OpsService) UpdateAlertEventEmailSent(ctx context.Context, eventID int64, emailSent bool) error {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return err
}
if s.opsRepo == nil {
return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if eventID <= 0 {
return infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id")
}
return s.opsRepo.UpdateAlertEventEmailSent(ctx, eventID, emailSent)
}
package service
import (
"context"
"database/sql"
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
const (
opsCleanupJobName = "ops_cleanup"
opsCleanupLeaderLockKeyDefault = "ops:cleanup:leader"
opsCleanupLeaderLockTTLDefault = 30 * time.Minute
)
var opsCleanupCronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
var opsCleanupReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
// OpsCleanupService periodically deletes old ops data to prevent unbounded DB growth.
//
// - Scheduling: 5-field cron spec (minute hour dom month dow).
// - Multi-instance: best-effort Redis leader lock so only one node runs cleanup.
// - Safety: deletes in batches to avoid long transactions.
type OpsCleanupService struct {
opsRepo OpsRepository
db *sql.DB
redisClient *redis.Client
cfg *config.Config
instanceID string
cron *cron.Cron
startOnce sync.Once
stopOnce sync.Once
warnNoRedisOnce sync.Once
}
func NewOpsCleanupService(
opsRepo OpsRepository,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
) *OpsCleanupService {
return &OpsCleanupService{
opsRepo: opsRepo,
db: db,
redisClient: redisClient,
cfg: cfg,
instanceID: uuid.NewString(),
}
}
func (s *OpsCleanupService) Start() {
if s == nil {
return
}
if s.cfg != nil && !s.cfg.Ops.Enabled {
return
}
if s.cfg != nil && !s.cfg.Ops.Cleanup.Enabled {
log.Printf("[OpsCleanup] not started (disabled)")
return
}
if s.opsRepo == nil || s.db == nil {
log.Printf("[OpsCleanup] not started (missing deps)")
return
}
s.startOnce.Do(func() {
schedule := "0 2 * * *"
if s.cfg != nil && strings.TrimSpace(s.cfg.Ops.Cleanup.Schedule) != "" {
schedule = strings.TrimSpace(s.cfg.Ops.Cleanup.Schedule)
}
loc := time.Local
if s.cfg != nil && strings.TrimSpace(s.cfg.Timezone) != "" {
if parsed, err := time.LoadLocation(strings.TrimSpace(s.cfg.Timezone)); err == nil && parsed != nil {
loc = parsed
}
}
c := cron.New(cron.WithParser(opsCleanupCronParser), cron.WithLocation(loc))
_, err := c.AddFunc(schedule, func() { s.runScheduled() })
if err != nil {
log.Printf("[OpsCleanup] not started (invalid schedule=%q): %v", schedule, err)
return
}
s.cron = c
s.cron.Start()
log.Printf("[OpsCleanup] started (schedule=%q tz=%s)", schedule, loc.String())
})
}
func (s *OpsCleanupService) Stop() {
if s == nil {
return
}
s.stopOnce.Do(func() {
if s.cron != nil {
ctx := s.cron.Stop()
select {
case <-ctx.Done():
case <-time.After(3 * time.Second):
log.Printf("[OpsCleanup] cron stop timed out")
}
}
})
}
func (s *OpsCleanupService) runScheduled() {
if s == nil || s.db == nil || s.opsRepo == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
release, ok := s.tryAcquireLeaderLock(ctx)
if !ok {
return
}
if release != nil {
defer release()
}
startedAt := time.Now().UTC()
runAt := startedAt
counts, err := s.runCleanupOnce(ctx)
if err != nil {
s.recordHeartbeatError(runAt, time.Since(startedAt), err)
log.Printf("[OpsCleanup] cleanup failed: %v", err)
return
}
s.recordHeartbeatSuccess(runAt, time.Since(startedAt), counts)
log.Printf("[OpsCleanup] cleanup complete: %s", counts)
}
type opsCleanupDeletedCounts struct {
errorLogs int64
retryAttempts int64
alertEvents int64
systemMetrics int64
hourlyPreagg int64
dailyPreagg int64
}
func (c opsCleanupDeletedCounts) String() string {
return fmt.Sprintf(
"error_logs=%d retry_attempts=%d alert_events=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d",
c.errorLogs,
c.retryAttempts,
c.alertEvents,
c.systemMetrics,
c.hourlyPreagg,
c.dailyPreagg,
)
}
func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDeletedCounts, error) {
out := opsCleanupDeletedCounts{}
if s == nil || s.db == nil || s.cfg == nil {
return out, nil
}
batchSize := 5000
now := time.Now().UTC()
// Error-like tables: error logs / retry attempts / alert events.
if days := s.cfg.Ops.Cleanup.ErrorLogRetentionDays; days > 0 {
cutoff := now.AddDate(0, 0, -days)
n, err := deleteOldRowsByID(ctx, s.db, "ops_error_logs", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.errorLogs = n
n, err = deleteOldRowsByID(ctx, s.db, "ops_retry_attempts", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.retryAttempts = n
n, err = deleteOldRowsByID(ctx, s.db, "ops_alert_events", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.alertEvents = n
}
// Minute-level metrics snapshots.
if days := s.cfg.Ops.Cleanup.MinuteMetricsRetentionDays; days > 0 {
cutoff := now.AddDate(0, 0, -days)
n, err := deleteOldRowsByID(ctx, s.db, "ops_system_metrics", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.systemMetrics = n
}
// Pre-aggregation tables (hourly/daily).
if days := s.cfg.Ops.Cleanup.HourlyMetricsRetentionDays; days > 0 {
cutoff := now.AddDate(0, 0, -days)
n, err := deleteOldRowsByID(ctx, s.db, "ops_metrics_hourly", "bucket_start", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.hourlyPreagg = n
n, err = deleteOldRowsByID(ctx, s.db, "ops_metrics_daily", "bucket_date", cutoff, batchSize, true)
if err != nil {
return out, err
}
out.dailyPreagg = n
}
return out, nil
}
func deleteOldRowsByID(
ctx context.Context,
db *sql.DB,
table string,
timeColumn string,
cutoff time.Time,
batchSize int,
castCutoffToDate bool,
) (int64, error) {
if db == nil {
return 0, nil
}
if batchSize <= 0 {
batchSize = 5000
}
where := fmt.Sprintf("%s < $1", timeColumn)
if castCutoffToDate {
where = fmt.Sprintf("%s < $1::date", timeColumn)
}
q := fmt.Sprintf(`
WITH batch AS (
SELECT id FROM %s
WHERE %s
ORDER BY id
LIMIT $2
)
DELETE FROM %s
WHERE id IN (SELECT id FROM batch)
`, table, where, table)
var total int64
for {
res, err := db.ExecContext(ctx, q, cutoff, batchSize)
if err != nil {
// If ops tables aren't present yet (partial deployments), treat as no-op.
if strings.Contains(strings.ToLower(err.Error()), "does not exist") && strings.Contains(strings.ToLower(err.Error()), "relation") {
return total, nil
}
return total, err
}
affected, err := res.RowsAffected()
if err != nil {
return total, err
}
total += affected
if affected == 0 {
break
}
}
return total, nil
}
func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
if s == nil {
return nil, false
}
// In simple run mode, assume single instance.
if s.cfg != nil && s.cfg.RunMode == config.RunModeSimple {
return nil, true
}
key := opsCleanupLeaderLockKeyDefault
ttl := opsCleanupLeaderLockTTLDefault
// Prefer Redis leader lock when available, but avoid stampeding the DB when Redis is flaky by
// falling back to a DB advisory lock.
if s.redisClient != nil {
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err == nil {
if !ok {
return nil, false
}
return func() {
_, _ = opsCleanupReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
}, true
}
// Redis error: fall back to DB advisory lock.
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsCleanup] leader lock SetNX failed; falling back to DB advisory lock: %v", err)
})
} else {
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsCleanup] redis not configured; using DB advisory lock")
})
}
release, ok := tryAcquireDBAdvisoryLock(ctx, s.db, hashAdvisoryLockID(key))
if !ok {
return nil, false
}
return release, true
}
func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration, counts opsCleanupDeletedCounts) {
if s == nil || s.opsRepo == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
result := truncateString(counts.String(), 2048)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsCleanupJobName,
LastRunAt: &runAt,
LastSuccessAt: &now,
LastDurationMs: &durMs,
LastResult: &result,
})
}
func (s *OpsCleanupService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
if s == nil || s.opsRepo == nil || err == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
msg := truncateString(err.Error(), 2048)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsCleanupJobName,
LastRunAt: &runAt,
LastErrorAt: &now,
LastError: &msg,
LastDurationMs: &durMs,
})
}
package service
import (
"context"
"log"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
)
const (
opsAccountsPageSize = 100
opsConcurrencyBatchChunkSize = 200
)
func (s *OpsService) listAllAccountsForOps(ctx context.Context, platformFilter string) ([]Account, error) {
if s == nil || s.accountRepo == nil {
return []Account{}, nil
}
out := make([]Account, 0, 128)
page := 1
for {
accounts, pageInfo, err := s.accountRepo.ListWithFilters(ctx, pagination.PaginationParams{
Page: page,
PageSize: opsAccountsPageSize,
}, platformFilter, "", "", "")
if err != nil {
return nil, err
}
if len(accounts) == 0 {
break
}
out = append(out, accounts...)
if pageInfo != nil && int64(len(out)) >= pageInfo.Total {
break
}
if len(accounts) < opsAccountsPageSize {
break
}
page++
if page > 10_000 {
log.Printf("[Ops] listAllAccountsForOps: aborting after too many pages (platform=%q)", platformFilter)
break
}
}
return out, nil
}
func (s *OpsService) getAccountsLoadMapBestEffort(ctx context.Context, accounts []Account) map[int64]*AccountLoadInfo {
if s == nil || s.concurrencyService == nil {
return map[int64]*AccountLoadInfo{}
}
if len(accounts) == 0 {
return map[int64]*AccountLoadInfo{}
}
// De-duplicate IDs (and keep the max concurrency to avoid under-reporting).
unique := make(map[int64]int, len(accounts))
for _, acc := range accounts {
if acc.ID <= 0 {
continue
}
if prev, ok := unique[acc.ID]; !ok || acc.Concurrency > prev {
unique[acc.ID] = acc.Concurrency
}
}
batch := make([]AccountWithConcurrency, 0, len(unique))
for id, maxConc := range unique {
batch = append(batch, AccountWithConcurrency{
ID: id,
MaxConcurrency: maxConc,
})
}
out := make(map[int64]*AccountLoadInfo, len(batch))
for i := 0; i < len(batch); i += opsConcurrencyBatchChunkSize {
end := i + opsConcurrencyBatchChunkSize
if end > len(batch) {
end = len(batch)
}
part, err := s.concurrencyService.GetAccountsLoadBatch(ctx, batch[i:end])
if err != nil {
// Best-effort: return zeros rather than failing the ops UI.
log.Printf("[Ops] GetAccountsLoadBatch failed: %v", err)
continue
}
for k, v := range part {
out[k] = v
}
}
return out
}
// GetConcurrencyStats returns real-time concurrency usage aggregated by platform/group/account.
//
// Optional filters:
// - platformFilter: only include accounts in that platform (best-effort reduces DB load)
// - groupIDFilter: only include accounts that belong to that group
func (s *OpsService) GetConcurrencyStats(
ctx context.Context,
platformFilter string,
groupIDFilter *int64,
) (map[string]*PlatformConcurrencyInfo, map[int64]*GroupConcurrencyInfo, map[int64]*AccountConcurrencyInfo, *time.Time, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, nil, nil, nil, err
}
accounts, err := s.listAllAccountsForOps(ctx, platformFilter)
if err != nil {
return nil, nil, nil, nil, err
}
collectedAt := time.Now()
loadMap := s.getAccountsLoadMapBestEffort(ctx, accounts)
platform := make(map[string]*PlatformConcurrencyInfo)
group := make(map[int64]*GroupConcurrencyInfo)
account := make(map[int64]*AccountConcurrencyInfo)
for _, acc := range accounts {
if acc.ID <= 0 {
continue
}
var matchedGroup *Group
if groupIDFilter != nil && *groupIDFilter > 0 {
for _, grp := range acc.Groups {
if grp == nil || grp.ID <= 0 {
continue
}
if grp.ID == *groupIDFilter {
matchedGroup = grp
break
}
}
// Group filter provided: skip accounts not in that group.
if matchedGroup == nil {
continue
}
}
load := loadMap[acc.ID]
currentInUse := int64(0)
waiting := int64(0)
if load != nil {
currentInUse = int64(load.CurrentConcurrency)
waiting = int64(load.WaitingCount)
}
// Account-level view picks one display group (the first group).
displayGroupID := int64(0)
displayGroupName := ""
if matchedGroup != nil {
displayGroupID = matchedGroup.ID
displayGroupName = matchedGroup.Name
} else if len(acc.Groups) > 0 && acc.Groups[0] != nil {
displayGroupID = acc.Groups[0].ID
displayGroupName = acc.Groups[0].Name
}
if _, ok := account[acc.ID]; !ok {
info := &AccountConcurrencyInfo{
AccountID: acc.ID,
AccountName: acc.Name,
Platform: acc.Platform,
GroupID: displayGroupID,
GroupName: displayGroupName,
CurrentInUse: currentInUse,
MaxCapacity: int64(acc.Concurrency),
WaitingInQueue: waiting,
}
if info.MaxCapacity > 0 {
info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100
}
account[acc.ID] = info
}
// Platform aggregation.
if acc.Platform != "" {
if _, ok := platform[acc.Platform]; !ok {
platform[acc.Platform] = &PlatformConcurrencyInfo{
Platform: acc.Platform,
}
}
p := platform[acc.Platform]
p.MaxCapacity += int64(acc.Concurrency)
p.CurrentInUse += currentInUse
p.WaitingInQueue += waiting
}
// Group aggregation (one account may contribute to multiple groups).
if matchedGroup != nil {
grp := matchedGroup
if _, ok := group[grp.ID]; !ok {
group[grp.ID] = &GroupConcurrencyInfo{
GroupID: grp.ID,
GroupName: grp.Name,
Platform: grp.Platform,
}
}
g := group[grp.ID]
if g.GroupName == "" && grp.Name != "" {
g.GroupName = grp.Name
}
if g.Platform != "" && grp.Platform != "" && g.Platform != grp.Platform {
// Groups are expected to be platform-scoped. If mismatch is observed, avoid misleading labels.
g.Platform = ""
}
g.MaxCapacity += int64(acc.Concurrency)
g.CurrentInUse += currentInUse
g.WaitingInQueue += waiting
} else {
for _, grp := range acc.Groups {
if grp == nil || grp.ID <= 0 {
continue
}
if _, ok := group[grp.ID]; !ok {
group[grp.ID] = &GroupConcurrencyInfo{
GroupID: grp.ID,
GroupName: grp.Name,
Platform: grp.Platform,
}
}
g := group[grp.ID]
if g.GroupName == "" && grp.Name != "" {
g.GroupName = grp.Name
}
if g.Platform != "" && grp.Platform != "" && g.Platform != grp.Platform {
// Groups are expected to be platform-scoped. If mismatch is observed, avoid misleading labels.
g.Platform = ""
}
g.MaxCapacity += int64(acc.Concurrency)
g.CurrentInUse += currentInUse
g.WaitingInQueue += waiting
}
}
}
for _, info := range platform {
if info.MaxCapacity > 0 {
info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100
}
}
for _, info := range group {
if info.MaxCapacity > 0 {
info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100
}
}
return platform, group, account, &collectedAt, nil
}
package service
import (
"context"
"database/sql"
"errors"
"log"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func (s *OpsService) GetDashboardOverview(ctx context.Context, filter *OpsDashboardFilter) (*OpsDashboardOverview, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if filter == nil {
return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
}
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
}
if filter.StartTime.After(filter.EndTime) {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
}
// Resolve query mode (requested via query param, or DB default).
filter.QueryMode = s.resolveOpsQueryMode(ctx, filter.QueryMode)
overview, err := s.opsRepo.GetDashboardOverview(ctx, filter)
if err != nil {
if errors.Is(err, ErrOpsPreaggregatedNotPopulated) {
return nil, infraerrors.Conflict("OPS_PREAGG_NOT_READY", "Pre-aggregated ops metrics are not populated yet")
}
return nil, err
}
// Best-effort system health + jobs; dashboard metrics should still render if these are missing.
if metrics, err := s.opsRepo.GetLatestSystemMetrics(ctx, 1); err == nil {
// Attach config-derived limits so the UI can show "current / max" for connection pools.
// These are best-effort and should never block the dashboard rendering.
if s != nil && s.cfg != nil {
if s.cfg.Database.MaxOpenConns > 0 {
metrics.DBMaxOpenConns = intPtr(s.cfg.Database.MaxOpenConns)
}
if s.cfg.Redis.PoolSize > 0 {
metrics.RedisPoolSize = intPtr(s.cfg.Redis.PoolSize)
}
}
overview.SystemMetrics = metrics
} else if err != nil && !errors.Is(err, sql.ErrNoRows) {
log.Printf("[Ops] GetLatestSystemMetrics failed: %v", err)
}
if heartbeats, err := s.opsRepo.ListJobHeartbeats(ctx); err == nil {
overview.JobHeartbeats = heartbeats
} else {
log.Printf("[Ops] ListJobHeartbeats failed: %v", err)
}
overview.HealthScore = computeDashboardHealthScore(time.Now().UTC(), overview)
return overview, nil
}
func (s *OpsService) resolveOpsQueryMode(ctx context.Context, requested OpsQueryMode) OpsQueryMode {
if requested.IsValid() {
// Allow "auto" to be disabled via config until preagg is proven stable in production.
// Forced `preagg` via query param still works.
if requested == OpsQueryModeAuto && s != nil && s.cfg != nil && !s.cfg.Ops.UsePreaggregatedTables {
return OpsQueryModeRaw
}
return requested
}
mode := OpsQueryModeAuto
if s != nil && s.settingRepo != nil {
if raw, err := s.settingRepo.GetValue(ctx, SettingKeyOpsQueryModeDefault); err == nil {
mode = ParseOpsQueryMode(raw)
}
}
if mode == OpsQueryModeAuto && s != nil && s.cfg != nil && !s.cfg.Ops.UsePreaggregatedTables {
return OpsQueryModeRaw
}
return mode
}
package service
import "time"
type OpsDashboardFilter struct {
StartTime time.Time
EndTime time.Time
Platform string
GroupID *int64
// QueryMode controls whether dashboard queries should use raw logs or pre-aggregated tables.
// Expected values: auto/raw/preagg (see OpsQueryMode).
QueryMode OpsQueryMode
}
type OpsRateSummary struct {
Current float64 `json:"current"`
Peak float64 `json:"peak"`
Avg float64 `json:"avg"`
}
type OpsPercentiles struct {
P50 *int `json:"p50_ms"`
P90 *int `json:"p90_ms"`
P95 *int `json:"p95_ms"`
P99 *int `json:"p99_ms"`
Avg *int `json:"avg_ms"`
Max *int `json:"max_ms"`
}
type OpsDashboardOverview struct {
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Platform string `json:"platform"`
GroupID *int64 `json:"group_id"`
// HealthScore is a backend-computed overall health score (0-100).
// It is derived from the monitored metrics in this overview, plus best-effort system metrics/job heartbeats.
HealthScore int `json:"health_score"`
// Latest system-level snapshot (window=1m, global).
SystemMetrics *OpsSystemMetricsSnapshot `json:"system_metrics"`
// Background jobs health (heartbeats).
JobHeartbeats []*OpsJobHeartbeat `json:"job_heartbeats"`
SuccessCount int64 `json:"success_count"`
ErrorCountTotal int64 `json:"error_count_total"`
BusinessLimitedCount int64 `json:"business_limited_count"`
ErrorCountSLA int64 `json:"error_count_sla"`
RequestCountTotal int64 `json:"request_count_total"`
RequestCountSLA int64 `json:"request_count_sla"`
TokenConsumed int64 `json:"token_consumed"`
SLA float64 `json:"sla"`
ErrorRate float64 `json:"error_rate"`
UpstreamErrorRate float64 `json:"upstream_error_rate"`
UpstreamErrorCountExcl429529 int64 `json:"upstream_error_count_excl_429_529"`
Upstream429Count int64 `json:"upstream_429_count"`
Upstream529Count int64 `json:"upstream_529_count"`
QPS OpsRateSummary `json:"qps"`
TPS OpsRateSummary `json:"tps"`
Duration OpsPercentiles `json:"duration"`
TTFT OpsPercentiles `json:"ttft"`
}
type OpsLatencyHistogramBucket struct {
Range string `json:"range"`
Count int64 `json:"count"`
}
// OpsLatencyHistogramResponse is a coarse latency distribution histogram (success requests only).
// It is used by the Ops dashboard to quickly identify tail latency regressions.
type OpsLatencyHistogramResponse struct {
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Platform string `json:"platform"`
GroupID *int64 `json:"group_id"`
TotalRequests int64 `json:"total_requests"`
Buckets []*OpsLatencyHistogramBucket `json:"buckets"`
}
package service
import (
"context"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func (s *OpsService) GetErrorTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsErrorTrendResponse, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if filter == nil {
return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
}
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
}
if filter.StartTime.After(filter.EndTime) {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
}
return s.opsRepo.GetErrorTrend(ctx, filter, bucketSeconds)
}
func (s *OpsService) GetErrorDistribution(ctx context.Context, filter *OpsDashboardFilter) (*OpsErrorDistributionResponse, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if filter == nil {
return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
}
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
}
if filter.StartTime.After(filter.EndTime) {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
}
return s.opsRepo.GetErrorDistribution(ctx, filter)
}
package service
import (
"math"
"time"
)
// computeDashboardHealthScore computes a 0-100 health score from the metrics returned by the dashboard overview.
//
// Design goals:
// - Backend-owned scoring (UI only displays).
// - Layered scoring: Business Health (70%) + Infrastructure Health (30%)
// - Avoids double-counting (e.g., DB failure affects both infra and business metrics)
// - Conservative + stable: penalize clear degradations; avoid overreacting to missing/idle data.
func computeDashboardHealthScore(now time.Time, overview *OpsDashboardOverview) int {
if overview == nil {
return 0
}
// Idle/no-data: avoid showing a "bad" score when there is no traffic.
// UI can still render a gray/idle state based on QPS + error rate.
if overview.RequestCountSLA <= 0 && overview.RequestCountTotal <= 0 && overview.ErrorCountTotal <= 0 {
return 100
}
businessHealth := computeBusinessHealth(overview)
infraHealth := computeInfraHealth(now, overview)
// Weighted combination: 70% business + 30% infrastructure
score := businessHealth*0.7 + infraHealth*0.3
return int(math.Round(clampFloat64(score, 0, 100)))
}
// computeBusinessHealth calculates business health score (0-100)
// Components: Error Rate (50%) + TTFT (50%)
func computeBusinessHealth(overview *OpsDashboardOverview) float64 {
// Error rate score: 1% → 100, 10% → 0 (linear)
// Combines request errors and upstream errors
errorScore := 100.0
errorPct := clampFloat64(overview.ErrorRate*100, 0, 100)
upstreamPct := clampFloat64(overview.UpstreamErrorRate*100, 0, 100)
combinedErrorPct := math.Max(errorPct, upstreamPct) // Use worst case
if combinedErrorPct > 1.0 {
if combinedErrorPct <= 10.0 {
errorScore = (10.0 - combinedErrorPct) / 9.0 * 100
} else {
errorScore = 0
}
}
// TTFT score: 1s → 100, 3s → 0 (linear)
// Time to first token is critical for user experience
ttftScore := 100.0
if overview.TTFT.P99 != nil {
p99 := float64(*overview.TTFT.P99)
if p99 > 1000 {
if p99 <= 3000 {
ttftScore = (3000 - p99) / 2000 * 100
} else {
ttftScore = 0
}
}
}
// Weighted combination: 50% error rate + 50% TTFT
return errorScore*0.5 + ttftScore*0.5
}
// computeInfraHealth calculates infrastructure health score (0-100)
// Components: Storage (40%) + Compute Resources (30%) + Background Jobs (30%)
func computeInfraHealth(now time.Time, overview *OpsDashboardOverview) float64 {
// Storage score: DB critical, Redis less critical
storageScore := 100.0
if overview.SystemMetrics != nil {
if overview.SystemMetrics.DBOK != nil && !*overview.SystemMetrics.DBOK {
storageScore = 0 // DB failure is critical
} else if overview.SystemMetrics.RedisOK != nil && !*overview.SystemMetrics.RedisOK {
storageScore = 50 // Redis failure is degraded but not critical
}
}
// Compute resources score: CPU + Memory
computeScore := 100.0
if overview.SystemMetrics != nil {
cpuScore := 100.0
if overview.SystemMetrics.CPUUsagePercent != nil {
cpuPct := clampFloat64(*overview.SystemMetrics.CPUUsagePercent, 0, 100)
if cpuPct > 80 {
if cpuPct <= 100 {
cpuScore = (100 - cpuPct) / 20 * 100
} else {
cpuScore = 0
}
}
}
memScore := 100.0
if overview.SystemMetrics.MemoryUsagePercent != nil {
memPct := clampFloat64(*overview.SystemMetrics.MemoryUsagePercent, 0, 100)
if memPct > 85 {
if memPct <= 100 {
memScore = (100 - memPct) / 15 * 100
} else {
memScore = 0
}
}
}
computeScore = (cpuScore + memScore) / 2
}
// Background jobs score
jobScore := 100.0
failedJobs := 0
totalJobs := 0
for _, hb := range overview.JobHeartbeats {
if hb == nil {
continue
}
totalJobs++
if hb.LastErrorAt != nil && (hb.LastSuccessAt == nil || hb.LastErrorAt.After(*hb.LastSuccessAt)) {
failedJobs++
} else if hb.LastSuccessAt != nil && now.Sub(*hb.LastSuccessAt) > 15*time.Minute {
failedJobs++
}
}
if totalJobs > 0 && failedJobs > 0 {
jobScore = (1 - float64(failedJobs)/float64(totalJobs)) * 100
}
// Weighted combination
return storageScore*0.4 + computeScore*0.3 + jobScore*0.3
}
func clampFloat64(v float64, min float64, max float64) float64 {
if v < min {
return min
}
if v > max {
return max
}
return v
}
//go:build unit
package service
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestComputeDashboardHealthScore_IdleReturns100(t *testing.T) {
t.Parallel()
score := computeDashboardHealthScore(time.Now().UTC(), &OpsDashboardOverview{})
require.Equal(t, 100, score)
}
func TestComputeDashboardHealthScore_DegradesOnBadSignals(t *testing.T) {
t.Parallel()
ov := &OpsDashboardOverview{
RequestCountTotal: 100,
RequestCountSLA: 100,
SuccessCount: 90,
ErrorCountTotal: 10,
ErrorCountSLA: 10,
SLA: 0.90,
ErrorRate: 0.10,
UpstreamErrorRate: 0.08,
Duration: OpsPercentiles{P99: intPtr(20_000)},
TTFT: OpsPercentiles{P99: intPtr(2_000)},
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(false),
RedisOK: boolPtr(false),
CPUUsagePercent: float64Ptr(98.0),
MemoryUsagePercent: float64Ptr(97.0),
DBConnWaiting: intPtr(3),
ConcurrencyQueueDepth: intPtr(10),
},
JobHeartbeats: []*OpsJobHeartbeat{
{
JobName: "job-a",
LastErrorAt: timePtr(time.Now().UTC().Add(-1 * time.Minute)),
LastError: stringPtr("boom"),
},
},
}
score := computeDashboardHealthScore(time.Now().UTC(), ov)
require.Less(t, score, 80)
require.GreaterOrEqual(t, score, 0)
}
func TestComputeDashboardHealthScore_Comprehensive(t *testing.T) {
t.Parallel()
tests := []struct {
name string
overview *OpsDashboardOverview
wantMin int
wantMax int
}{
{
name: "nil overview returns 0",
overview: nil,
wantMin: 0,
wantMax: 0,
},
{
name: "perfect health",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
RequestCountSLA: 1000,
SLA: 1.0,
ErrorRate: 0,
UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)},
TTFT: OpsPercentiles{P99: intPtr(100)},
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(true),
RedisOK: boolPtr(true),
CPUUsagePercent: float64Ptr(30),
MemoryUsagePercent: float64Ptr(40),
},
},
wantMin: 100,
wantMax: 100,
},
{
name: "good health - SLA 99.8%",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
RequestCountSLA: 1000,
SLA: 0.998,
ErrorRate: 0.003,
UpstreamErrorRate: 0.001,
Duration: OpsPercentiles{P99: intPtr(800)},
TTFT: OpsPercentiles{P99: intPtr(200)},
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(true),
RedisOK: boolPtr(true),
CPUUsagePercent: float64Ptr(50),
MemoryUsagePercent: float64Ptr(60),
},
},
wantMin: 95,
wantMax: 100,
},
{
name: "medium health - SLA 96%",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
RequestCountSLA: 1000,
SLA: 0.96,
ErrorRate: 0.02,
UpstreamErrorRate: 0.01,
Duration: OpsPercentiles{P99: intPtr(3000)},
TTFT: OpsPercentiles{P99: intPtr(600)},
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(true),
RedisOK: boolPtr(true),
CPUUsagePercent: float64Ptr(70),
MemoryUsagePercent: float64Ptr(75),
},
},
wantMin: 96,
wantMax: 97,
},
{
name: "DB failure",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
RequestCountSLA: 1000,
SLA: 0.995,
ErrorRate: 0,
UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)},
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(false),
RedisOK: boolPtr(true),
CPUUsagePercent: float64Ptr(30),
MemoryUsagePercent: float64Ptr(40),
},
},
wantMin: 70,
wantMax: 90,
},
{
name: "Redis failure",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
RequestCountSLA: 1000,
SLA: 0.995,
ErrorRate: 0,
UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)},
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(true),
RedisOK: boolPtr(false),
CPUUsagePercent: float64Ptr(30),
MemoryUsagePercent: float64Ptr(40),
},
},
wantMin: 85,
wantMax: 95,
},
{
name: "high CPU usage",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
RequestCountSLA: 1000,
SLA: 0.995,
ErrorRate: 0,
UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)},
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(true),
RedisOK: boolPtr(true),
CPUUsagePercent: float64Ptr(95),
MemoryUsagePercent: float64Ptr(40),
},
},
wantMin: 85,
wantMax: 100,
},
{
name: "combined failures - business degraded + infra healthy",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
RequestCountSLA: 1000,
SLA: 0.90,
ErrorRate: 0.05,
UpstreamErrorRate: 0.02,
Duration: OpsPercentiles{P99: intPtr(10000)},
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(true),
RedisOK: boolPtr(true),
CPUUsagePercent: float64Ptr(20),
MemoryUsagePercent: float64Ptr(30),
},
},
wantMin: 84,
wantMax: 85,
},
{
name: "combined failures - business healthy + infra degraded",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
RequestCountSLA: 1000,
SLA: 0.998,
ErrorRate: 0.001,
UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(600)},
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(false),
RedisOK: boolPtr(false),
CPUUsagePercent: float64Ptr(95),
MemoryUsagePercent: float64Ptr(95),
},
},
wantMin: 70,
wantMax: 90,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
score := computeDashboardHealthScore(time.Now().UTC(), tt.overview)
require.GreaterOrEqual(t, score, tt.wantMin, "score should be >= %d", tt.wantMin)
require.LessOrEqual(t, score, tt.wantMax, "score should be <= %d", tt.wantMax)
require.GreaterOrEqual(t, score, 0, "score must be >= 0")
require.LessOrEqual(t, score, 100, "score must be <= 100")
})
}
}
func TestComputeBusinessHealth(t *testing.T) {
t.Parallel()
tests := []struct {
name string
overview *OpsDashboardOverview
wantMin float64
wantMax float64
}{
{
name: "perfect metrics",
overview: &OpsDashboardOverview{
SLA: 1.0,
ErrorRate: 0,
UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)},
},
wantMin: 100,
wantMax: 100,
},
{
name: "SLA boundary 99.5%",
overview: &OpsDashboardOverview{
SLA: 0.995,
ErrorRate: 0,
UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)},
},
wantMin: 100,
wantMax: 100,
},
{
name: "SLA boundary 95%",
overview: &OpsDashboardOverview{
SLA: 0.95,
ErrorRate: 0,
UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)},
},
wantMin: 100,
wantMax: 100,
},
{
name: "error rate boundary 1%",
overview: &OpsDashboardOverview{
SLA: 0.99,
ErrorRate: 0.01,
UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)},
},
wantMin: 100,
wantMax: 100,
},
{
name: "error rate 5%",
overview: &OpsDashboardOverview{
SLA: 0.95,
ErrorRate: 0.05,
UpstreamErrorRate: 0,
Duration: OpsPercentiles{P99: intPtr(500)},
},
wantMin: 77,
wantMax: 78,
},
{
name: "TTFT boundary 2s",
overview: &OpsDashboardOverview{
SLA: 0.99,
ErrorRate: 0,
UpstreamErrorRate: 0,
TTFT: OpsPercentiles{P99: intPtr(2000)},
},
wantMin: 75,
wantMax: 75,
},
{
name: "upstream error dominates",
overview: &OpsDashboardOverview{
SLA: 0.995,
ErrorRate: 0.001,
UpstreamErrorRate: 0.03,
Duration: OpsPercentiles{P99: intPtr(500)},
},
wantMin: 88,
wantMax: 90,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
score := computeBusinessHealth(tt.overview)
require.GreaterOrEqual(t, score, tt.wantMin, "score should be >= %.1f", tt.wantMin)
require.LessOrEqual(t, score, tt.wantMax, "score should be <= %.1f", tt.wantMax)
require.GreaterOrEqual(t, score, 0.0, "score must be >= 0")
require.LessOrEqual(t, score, 100.0, "score must be <= 100")
})
}
}
func TestComputeInfraHealth(t *testing.T) {
t.Parallel()
now := time.Now().UTC()
tests := []struct {
name string
overview *OpsDashboardOverview
wantMin float64
wantMax float64
}{
{
name: "all infrastructure healthy",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(true),
RedisOK: boolPtr(true),
CPUUsagePercent: float64Ptr(30),
MemoryUsagePercent: float64Ptr(40),
},
},
wantMin: 100,
wantMax: 100,
},
{
name: "DB down",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(false),
RedisOK: boolPtr(true),
CPUUsagePercent: float64Ptr(30),
MemoryUsagePercent: float64Ptr(40),
},
},
wantMin: 50,
wantMax: 70,
},
{
name: "Redis down",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(true),
RedisOK: boolPtr(false),
CPUUsagePercent: float64Ptr(30),
MemoryUsagePercent: float64Ptr(40),
},
},
wantMin: 80,
wantMax: 95,
},
{
name: "CPU at 90%",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(true),
RedisOK: boolPtr(true),
CPUUsagePercent: float64Ptr(90),
MemoryUsagePercent: float64Ptr(40),
},
},
wantMin: 85,
wantMax: 95,
},
{
name: "failed background job",
overview: &OpsDashboardOverview{
RequestCountTotal: 1000,
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(true),
RedisOK: boolPtr(true),
CPUUsagePercent: float64Ptr(30),
MemoryUsagePercent: float64Ptr(40),
},
JobHeartbeats: []*OpsJobHeartbeat{
{
JobName: "test-job",
LastErrorAt: &now,
},
},
},
wantMin: 70,
wantMax: 90,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
score := computeInfraHealth(now, tt.overview)
require.GreaterOrEqual(t, score, tt.wantMin, "score should be >= %.1f", tt.wantMin)
require.LessOrEqual(t, score, tt.wantMax, "score should be <= %.1f", tt.wantMax)
require.GreaterOrEqual(t, score, 0.0, "score must be >= 0")
require.LessOrEqual(t, score, 100.0, "score must be <= 100")
})
}
}
func timePtr(v time.Time) *time.Time { return &v }
func stringPtr(v string) *string { return &v }
package service
import (
"context"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func (s *OpsService) GetLatencyHistogram(ctx context.Context, filter *OpsDashboardFilter) (*OpsLatencyHistogramResponse, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if filter == nil {
return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
}
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
}
if filter.StartTime.After(filter.EndTime) {
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
}
return s.opsRepo.GetLatencyHistogram(ctx, filter)
}
package service
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"math"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/mem"
)
const (
opsMetricsCollectorJobName = "ops_metrics_collector"
opsMetricsCollectorMinInterval = 60 * time.Second
opsMetricsCollectorMaxInterval = 1 * time.Hour
opsMetricsCollectorTimeout = 10 * time.Second
opsMetricsCollectorLeaderLockKey = "ops:metrics:collector:leader"
opsMetricsCollectorLeaderLockTTL = 90 * time.Second
opsMetricsCollectorHeartbeatTimeout = 2 * time.Second
bytesPerMB = 1024 * 1024
)
var opsMetricsCollectorAdvisoryLockID = hashAdvisoryLockID(opsMetricsCollectorLeaderLockKey)
type OpsMetricsCollector struct {
opsRepo OpsRepository
settingRepo SettingRepository
cfg *config.Config
accountRepo AccountRepository
concurrencyService *ConcurrencyService
db *sql.DB
redisClient *redis.Client
instanceID string
lastCgroupCPUUsageNanos uint64
lastCgroupCPUSampleAt time.Time
stopCh chan struct{}
startOnce sync.Once
stopOnce sync.Once
skipLogMu sync.Mutex
skipLogAt time.Time
}
func NewOpsMetricsCollector(
opsRepo OpsRepository,
settingRepo SettingRepository,
accountRepo AccountRepository,
concurrencyService *ConcurrencyService,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
) *OpsMetricsCollector {
return &OpsMetricsCollector{
opsRepo: opsRepo,
settingRepo: settingRepo,
cfg: cfg,
accountRepo: accountRepo,
concurrencyService: concurrencyService,
db: db,
redisClient: redisClient,
instanceID: uuid.NewString(),
}
}
func (c *OpsMetricsCollector) Start() {
if c == nil {
return
}
c.startOnce.Do(func() {
if c.stopCh == nil {
c.stopCh = make(chan struct{})
}
go c.run()
})
}
func (c *OpsMetricsCollector) Stop() {
if c == nil {
return
}
c.stopOnce.Do(func() {
if c.stopCh != nil {
close(c.stopCh)
}
})
}
func (c *OpsMetricsCollector) run() {
// First run immediately so the dashboard has data soon after startup.
c.collectOnce()
for {
interval := c.getInterval()
timer := time.NewTimer(interval)
select {
case <-timer.C:
c.collectOnce()
case <-c.stopCh:
timer.Stop()
return
}
}
}
func (c *OpsMetricsCollector) getInterval() time.Duration {
interval := opsMetricsCollectorMinInterval
if c.settingRepo == nil {
return interval
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
raw, err := c.settingRepo.GetValue(ctx, SettingKeyOpsMetricsIntervalSeconds)
if err != nil {
return interval
}
raw = strings.TrimSpace(raw)
if raw == "" {
return interval
}
seconds, err := strconv.Atoi(raw)
if err != nil {
return interval
}
if seconds < int(opsMetricsCollectorMinInterval.Seconds()) {
seconds = int(opsMetricsCollectorMinInterval.Seconds())
}
if seconds > int(opsMetricsCollectorMaxInterval.Seconds()) {
seconds = int(opsMetricsCollectorMaxInterval.Seconds())
}
return time.Duration(seconds) * time.Second
}
func (c *OpsMetricsCollector) collectOnce() {
if c == nil {
return
}
if c.cfg != nil && !c.cfg.Ops.Enabled {
return
}
if c.opsRepo == nil {
return
}
if c.db == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), opsMetricsCollectorTimeout)
defer cancel()
if !c.isMonitoringEnabled(ctx) {
return
}
release, ok := c.tryAcquireLeaderLock(ctx)
if !ok {
return
}
if release != nil {
defer release()
}
startedAt := time.Now().UTC()
err := c.collectAndPersist(ctx)
finishedAt := time.Now().UTC()
durationMs := finishedAt.Sub(startedAt).Milliseconds()
dur := durationMs
runAt := startedAt
if err != nil {
msg := truncateString(err.Error(), 2048)
errAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), opsMetricsCollectorHeartbeatTimeout)
defer hbCancel()
_ = c.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsMetricsCollectorJobName,
LastRunAt: &runAt,
LastErrorAt: &errAt,
LastError: &msg,
LastDurationMs: &dur,
})
log.Printf("[OpsMetricsCollector] collect failed: %v", err)
return
}
successAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), opsMetricsCollectorHeartbeatTimeout)
defer hbCancel()
_ = c.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsMetricsCollectorJobName,
LastRunAt: &runAt,
LastSuccessAt: &successAt,
LastDurationMs: &dur,
})
}
func (c *OpsMetricsCollector) isMonitoringEnabled(ctx context.Context) bool {
if c == nil {
return false
}
if c.cfg != nil && !c.cfg.Ops.Enabled {
return false
}
if c.settingRepo == nil {
return true
}
if ctx == nil {
ctx = context.Background()
}
value, err := c.settingRepo.GetValue(ctx, SettingKeyOpsMonitoringEnabled)
if err != nil {
if errors.Is(err, ErrSettingNotFound) {
return true
}
// Fail-open: collector should not become a hard dependency.
return true
}
switch strings.ToLower(strings.TrimSpace(value)) {
case "false", "0", "off", "disabled":
return false
default:
return true
}
}
func (c *OpsMetricsCollector) collectAndPersist(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
}
// Align to stable minute boundaries to avoid partial buckets and to maximize cache hits.
now := time.Now().UTC()
windowEnd := now.Truncate(time.Minute)
windowStart := windowEnd.Add(-1 * time.Minute)
sys, err := c.collectSystemStats(ctx)
if err != nil {
// Continue; system stats are best-effort.
log.Printf("[OpsMetricsCollector] system stats error: %v", err)
}
dbOK := c.checkDB(ctx)
redisOK := c.checkRedis(ctx)
active, idle := c.dbPoolStats()
redisTotal, redisIdle, redisStatsOK := c.redisPoolStats()
successCount, tokenConsumed, err := c.queryUsageCounts(ctx, windowStart, windowEnd)
if err != nil {
return fmt.Errorf("query usage counts: %w", err)
}
duration, ttft, err := c.queryUsageLatency(ctx, windowStart, windowEnd)
if err != nil {
return fmt.Errorf("query usage latency: %w", err)
}
errorTotal, businessLimited, errorSLA, upstreamExcl, upstream429, upstream529, err := c.queryErrorCounts(ctx, windowStart, windowEnd)
if err != nil {
return fmt.Errorf("query error counts: %w", err)
}
windowSeconds := windowEnd.Sub(windowStart).Seconds()
if windowSeconds <= 0 {
windowSeconds = 60
}
requestTotal := successCount + errorTotal
qps := float64(requestTotal) / windowSeconds
tps := float64(tokenConsumed) / windowSeconds
goroutines := runtime.NumGoroutine()
concurrencyQueueDepth := c.collectConcurrencyQueueDepth(ctx)
input := &OpsInsertSystemMetricsInput{
CreatedAt: windowEnd,
WindowMinutes: 1,
SuccessCount: successCount,
ErrorCountTotal: errorTotal,
BusinessLimitedCount: businessLimited,
ErrorCountSLA: errorSLA,
UpstreamErrorCountExcl429529: upstreamExcl,
Upstream429Count: upstream429,
Upstream529Count: upstream529,
TokenConsumed: tokenConsumed,
QPS: float64Ptr(roundTo1DP(qps)),
TPS: float64Ptr(roundTo1DP(tps)),
DurationP50Ms: duration.p50,
DurationP90Ms: duration.p90,
DurationP95Ms: duration.p95,
DurationP99Ms: duration.p99,
DurationAvgMs: duration.avg,
DurationMaxMs: duration.max,
TTFTP50Ms: ttft.p50,
TTFTP90Ms: ttft.p90,
TTFTP95Ms: ttft.p95,
TTFTP99Ms: ttft.p99,
TTFTAvgMs: ttft.avg,
TTFTMaxMs: ttft.max,
CPUUsagePercent: sys.cpuUsagePercent,
MemoryUsedMB: sys.memoryUsedMB,
MemoryTotalMB: sys.memoryTotalMB,
MemoryUsagePercent: sys.memoryUsagePercent,
DBOK: boolPtr(dbOK),
RedisOK: boolPtr(redisOK),
RedisConnTotal: func() *int {
if !redisStatsOK {
return nil
}
return intPtr(redisTotal)
}(),
RedisConnIdle: func() *int {
if !redisStatsOK {
return nil
}
return intPtr(redisIdle)
}(),
DBConnActive: intPtr(active),
DBConnIdle: intPtr(idle),
GoroutineCount: intPtr(goroutines),
ConcurrencyQueueDepth: concurrencyQueueDepth,
}
return c.opsRepo.InsertSystemMetrics(ctx, input)
}
func (c *OpsMetricsCollector) collectConcurrencyQueueDepth(parentCtx context.Context) *int {
if c == nil || c.accountRepo == nil || c.concurrencyService == nil {
return nil
}
if parentCtx == nil {
parentCtx = context.Background()
}
// Best-effort: never let concurrency sampling break the metrics collector.
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
defer cancel()
accounts, err := c.accountRepo.ListSchedulable(ctx)
if err != nil {
return nil
}
if len(accounts) == 0 {
zero := 0
return &zero
}
batch := make([]AccountWithConcurrency, 0, len(accounts))
for _, acc := range accounts {
if acc.ID <= 0 {
continue
}
maxConc := acc.Concurrency
if maxConc < 0 {
maxConc = 0
}
batch = append(batch, AccountWithConcurrency{
ID: acc.ID,
MaxConcurrency: maxConc,
})
}
if len(batch) == 0 {
zero := 0
return &zero
}
loadMap, err := c.concurrencyService.GetAccountsLoadBatch(ctx, batch)
if err != nil {
return nil
}
var total int64
for _, info := range loadMap {
if info == nil || info.WaitingCount <= 0 {
continue
}
total += int64(info.WaitingCount)
}
if total < 0 {
total = 0
}
maxInt := int64(^uint(0) >> 1)
if total > maxInt {
total = maxInt
}
v := int(total)
return &v
}
type opsCollectedPercentiles struct {
p50 *int
p90 *int
p95 *int
p99 *int
avg *float64
max *int
}
func (c *OpsMetricsCollector) queryUsageCounts(ctx context.Context, start, end time.Time) (successCount int64, tokenConsumed int64, err error) {
q := `
SELECT
COALESCE(COUNT(*), 0) AS success_count,
COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) AS token_consumed
FROM usage_logs
WHERE created_at >= $1 AND created_at < $2`
var tokens sql.NullInt64
if err := c.db.QueryRowContext(ctx, q, start, end).Scan(&successCount, &tokens); err != nil {
return 0, 0, err
}
if tokens.Valid {
tokenConsumed = tokens.Int64
}
return successCount, tokenConsumed, nil
}
func (c *OpsMetricsCollector) queryUsageLatency(ctx context.Context, start, end time.Time) (duration opsCollectedPercentiles, ttft opsCollectedPercentiles, err error) {
{
q := `
SELECT
percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) AS p50,
percentile_cont(0.90) WITHIN GROUP (ORDER BY duration_ms) AS p90,
percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) AS p95,
percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) AS p99,
AVG(duration_ms) AS avg_ms,
MAX(duration_ms) AS max_ms
FROM usage_logs
WHERE created_at >= $1 AND created_at < $2
AND duration_ms IS NOT NULL`
var p50, p90, p95, p99 sql.NullFloat64
var avg sql.NullFloat64
var max sql.NullInt64
if err := c.db.QueryRowContext(ctx, q, start, end).Scan(&p50, &p90, &p95, &p99, &avg, &max); err != nil {
return opsCollectedPercentiles{}, opsCollectedPercentiles{}, err
}
duration.p50 = floatToIntPtr(p50)
duration.p90 = floatToIntPtr(p90)
duration.p95 = floatToIntPtr(p95)
duration.p99 = floatToIntPtr(p99)
if avg.Valid {
v := roundTo1DP(avg.Float64)
duration.avg = &v
}
if max.Valid {
v := int(max.Int64)
duration.max = &v
}
}
{
q := `
SELECT
percentile_cont(0.50) WITHIN GROUP (ORDER BY first_token_ms) AS p50,
percentile_cont(0.90) WITHIN GROUP (ORDER BY first_token_ms) AS p90,
percentile_cont(0.95) WITHIN GROUP (ORDER BY first_token_ms) AS p95,
percentile_cont(0.99) WITHIN GROUP (ORDER BY first_token_ms) AS p99,
AVG(first_token_ms) AS avg_ms,
MAX(first_token_ms) AS max_ms
FROM usage_logs
WHERE created_at >= $1 AND created_at < $2
AND first_token_ms IS NOT NULL`
var p50, p90, p95, p99 sql.NullFloat64
var avg sql.NullFloat64
var max sql.NullInt64
if err := c.db.QueryRowContext(ctx, q, start, end).Scan(&p50, &p90, &p95, &p99, &avg, &max); err != nil {
return opsCollectedPercentiles{}, opsCollectedPercentiles{}, err
}
ttft.p50 = floatToIntPtr(p50)
ttft.p90 = floatToIntPtr(p90)
ttft.p95 = floatToIntPtr(p95)
ttft.p99 = floatToIntPtr(p99)
if avg.Valid {
v := roundTo1DP(avg.Float64)
ttft.avg = &v
}
if max.Valid {
v := int(max.Int64)
ttft.max = &v
}
}
return duration, ttft, nil
}
func (c *OpsMetricsCollector) queryErrorCounts(ctx context.Context, start, end time.Time) (
errorTotal int64,
businessLimited int64,
errorSLA int64,
upstreamExcl429529 int64,
upstream429 int64,
upstream529 int64,
err error,
) {
q := `
SELECT
COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400), 0) AS error_total,
COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND is_business_limited), 0) AS business_limited,
COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND NOT is_business_limited), 0) AS error_sla,
COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) NOT IN (429, 529)), 0) AS upstream_excl,
COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 429), 0) AS upstream_429,
COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 529), 0) AS upstream_529
FROM ops_error_logs
WHERE created_at >= $1 AND created_at < $2`
if err := c.db.QueryRowContext(ctx, q, start, end).Scan(
&errorTotal,
&businessLimited,
&errorSLA,
&upstreamExcl429529,
&upstream429,
&upstream529,
); err != nil {
return 0, 0, 0, 0, 0, 0, err
}
return errorTotal, businessLimited, errorSLA, upstreamExcl429529, upstream429, upstream529, nil
}
type opsCollectedSystemStats struct {
cpuUsagePercent *float64
memoryUsedMB *int64
memoryTotalMB *int64
memoryUsagePercent *float64
}
func (c *OpsMetricsCollector) collectSystemStats(ctx context.Context) (*opsCollectedSystemStats, error) {
out := &opsCollectedSystemStats{}
if ctx == nil {
ctx = context.Background()
}
sampleAt := time.Now().UTC()
// Prefer cgroup (container) metrics when available.
if cpuPct := c.tryCgroupCPUPercent(sampleAt); cpuPct != nil {
out.cpuUsagePercent = cpuPct
}
cgroupUsed, cgroupTotal, cgroupOK := readCgroupMemoryBytes()
if cgroupOK {
usedMB := int64(cgroupUsed / bytesPerMB)
out.memoryUsedMB = &usedMB
if cgroupTotal > 0 {
totalMB := int64(cgroupTotal / bytesPerMB)
out.memoryTotalMB = &totalMB
pct := roundTo1DP(float64(cgroupUsed) / float64(cgroupTotal) * 100)
out.memoryUsagePercent = &pct
}
}
// Fallback to host metrics if cgroup metrics are unavailable (or incomplete).
if out.cpuUsagePercent == nil {
if cpuPercents, err := cpu.PercentWithContext(ctx, 0, false); err == nil && len(cpuPercents) > 0 {
v := roundTo1DP(cpuPercents[0])
out.cpuUsagePercent = &v
}
}
// If total memory isn't available from cgroup (e.g. memory.max = "max"), fill total from host.
if out.memoryUsedMB == nil || out.memoryTotalMB == nil || out.memoryUsagePercent == nil {
if vm, err := mem.VirtualMemoryWithContext(ctx); err == nil && vm != nil {
if out.memoryUsedMB == nil {
usedMB := int64(vm.Used / bytesPerMB)
out.memoryUsedMB = &usedMB
}
if out.memoryTotalMB == nil {
totalMB := int64(vm.Total / bytesPerMB)
out.memoryTotalMB = &totalMB
}
if out.memoryUsagePercent == nil {
if out.memoryUsedMB != nil && out.memoryTotalMB != nil && *out.memoryTotalMB > 0 {
pct := roundTo1DP(float64(*out.memoryUsedMB) / float64(*out.memoryTotalMB) * 100)
out.memoryUsagePercent = &pct
} else {
pct := roundTo1DP(vm.UsedPercent)
out.memoryUsagePercent = &pct
}
}
}
}
return out, nil
}
func (c *OpsMetricsCollector) tryCgroupCPUPercent(now time.Time) *float64 {
usageNanos, ok := readCgroupCPUUsageNanos()
if !ok {
return nil
}
// Initialize baseline sample.
if c.lastCgroupCPUSampleAt.IsZero() {
c.lastCgroupCPUUsageNanos = usageNanos
c.lastCgroupCPUSampleAt = now
return nil
}
elapsed := now.Sub(c.lastCgroupCPUSampleAt)
if elapsed <= 0 {
c.lastCgroupCPUUsageNanos = usageNanos
c.lastCgroupCPUSampleAt = now
return nil
}
prev := c.lastCgroupCPUUsageNanos
c.lastCgroupCPUUsageNanos = usageNanos
c.lastCgroupCPUSampleAt = now
if usageNanos < prev {
// Counter reset (container restarted).
return nil
}
deltaUsageSec := float64(usageNanos-prev) / 1e9
elapsedSec := elapsed.Seconds()
if elapsedSec <= 0 {
return nil
}
cores := readCgroupCPULimitCores()
if cores <= 0 {
// Can't reliably normalize; skip and fall back to gopsutil.
return nil
}
pct := (deltaUsageSec / (elapsedSec * cores)) * 100
if pct < 0 {
pct = 0
}
// Clamp to avoid noise/jitter showing impossible values.
if pct > 100 {
pct = 100
}
v := roundTo1DP(pct)
return &v
}
func readCgroupMemoryBytes() (usedBytes uint64, totalBytes uint64, ok bool) {
// cgroup v2 (most common in modern containers)
if used, ok1 := readUintFile("/sys/fs/cgroup/memory.current"); ok1 {
usedBytes = used
rawMax, err := os.ReadFile("/sys/fs/cgroup/memory.max")
if err == nil {
s := strings.TrimSpace(string(rawMax))
if s != "" && s != "max" {
if v, err := strconv.ParseUint(s, 10, 64); err == nil {
totalBytes = v
}
}
}
return usedBytes, totalBytes, true
}
// cgroup v1 fallback
if used, ok1 := readUintFile("/sys/fs/cgroup/memory/memory.usage_in_bytes"); ok1 {
usedBytes = used
if limit, ok2 := readUintFile("/sys/fs/cgroup/memory/memory.limit_in_bytes"); ok2 {
// Some environments report a very large number when unlimited.
if limit > 0 && limit < (1<<60) {
totalBytes = limit
}
}
return usedBytes, totalBytes, true
}
return 0, 0, false
}
func readCgroupCPUUsageNanos() (usageNanos uint64, ok bool) {
// cgroup v2: cpu.stat has usage_usec
if raw, err := os.ReadFile("/sys/fs/cgroup/cpu.stat"); err == nil {
lines := strings.Split(string(raw), "\n")
for _, line := range lines {
fields := strings.Fields(line)
if len(fields) != 2 {
continue
}
if fields[0] != "usage_usec" {
continue
}
v, err := strconv.ParseUint(fields[1], 10, 64)
if err != nil {
continue
}
return v * 1000, true
}
}
// cgroup v1: cpuacct.usage is in nanoseconds
if v, ok := readUintFile("/sys/fs/cgroup/cpuacct/cpuacct.usage"); ok {
return v, true
}
return 0, false
}
func readCgroupCPULimitCores() float64 {
// cgroup v2: cpu.max => "<quota> <period>" or "max <period>"
if raw, err := os.ReadFile("/sys/fs/cgroup/cpu.max"); err == nil {
fields := strings.Fields(string(raw))
if len(fields) >= 2 && fields[0] != "max" {
quota, err1 := strconv.ParseFloat(fields[0], 64)
period, err2 := strconv.ParseFloat(fields[1], 64)
if err1 == nil && err2 == nil && quota > 0 && period > 0 {
return quota / period
}
}
}
// cgroup v1: cpu.cfs_quota_us / cpu.cfs_period_us
quota, okQuota := readIntFile("/sys/fs/cgroup/cpu/cpu.cfs_quota_us")
period, okPeriod := readIntFile("/sys/fs/cgroup/cpu/cpu.cfs_period_us")
if okQuota && okPeriod && quota > 0 && period > 0 {
return float64(quota) / float64(period)
}
return 0
}
func readUintFile(path string) (uint64, bool) {
raw, err := os.ReadFile(path)
if err != nil {
return 0, false
}
s := strings.TrimSpace(string(raw))
if s == "" {
return 0, false
}
v, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return 0, false
}
return v, true
}
func readIntFile(path string) (int64, bool) {
raw, err := os.ReadFile(path)
if err != nil {
return 0, false
}
s := strings.TrimSpace(string(raw))
if s == "" {
return 0, false
}
v, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, false
}
return v, true
}
func (c *OpsMetricsCollector) checkDB(ctx context.Context) bool {
if c == nil || c.db == nil {
return false
}
if ctx == nil {
ctx = context.Background()
}
var one int
if err := c.db.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil {
return false
}
return one == 1
}
func (c *OpsMetricsCollector) checkRedis(ctx context.Context) bool {
if c == nil || c.redisClient == nil {
return false
}
if ctx == nil {
ctx = context.Background()
}
return c.redisClient.Ping(ctx).Err() == nil
}
func (c *OpsMetricsCollector) redisPoolStats() (total int, idle int, ok bool) {
if c == nil || c.redisClient == nil {
return 0, 0, false
}
stats := c.redisClient.PoolStats()
if stats == nil {
return 0, 0, false
}
return int(stats.TotalConns), int(stats.IdleConns), true
}
func (c *OpsMetricsCollector) dbPoolStats() (active int, idle int) {
if c == nil || c.db == nil {
return 0, 0
}
stats := c.db.Stats()
return stats.InUse, stats.Idle
}
var opsMetricsCollectorReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
func (c *OpsMetricsCollector) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
if c == nil || c.redisClient == nil {
return nil, true
}
if ctx == nil {
ctx = context.Background()
}
ok, err := c.redisClient.SetNX(ctx, opsMetricsCollectorLeaderLockKey, c.instanceID, opsMetricsCollectorLeaderLockTTL).Result()
if err != nil {
// Prefer fail-closed to avoid stampeding the database when Redis is flaky.
// Fallback to a DB advisory lock when Redis is present but unavailable.
release, ok := tryAcquireDBAdvisoryLock(ctx, c.db, opsMetricsCollectorAdvisoryLockID)
if !ok {
c.maybeLogSkip()
return nil, false
}
return release, true
}
if !ok {
c.maybeLogSkip()
return nil, false
}
release := func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, _ = opsMetricsCollectorReleaseScript.Run(ctx, c.redisClient, []string{opsMetricsCollectorLeaderLockKey}, c.instanceID).Result()
}
return release, true
}
func (c *OpsMetricsCollector) maybeLogSkip() {
c.skipLogMu.Lock()
defer c.skipLogMu.Unlock()
now := time.Now()
if !c.skipLogAt.IsZero() && now.Sub(c.skipLogAt) < time.Minute {
return
}
c.skipLogAt = now
log.Printf("[OpsMetricsCollector] leader lock held by another instance; skipping")
}
func floatToIntPtr(v sql.NullFloat64) *int {
if !v.Valid {
return nil
}
n := int(math.Round(v.Float64))
return &n
}
func roundTo1DP(v float64) float64 {
return math.Round(v*10) / 10
}
func truncateString(s string, max int) string {
if max <= 0 {
return ""
}
if len(s) <= max {
return s
}
cut := s[:max]
for len(cut) > 0 && !utf8.ValidString(cut) {
cut = cut[:len(cut)-1]
}
return cut
}
func boolPtr(v bool) *bool {
out := v
return &out
}
func intPtr(v int) *int {
out := v
return &out
}
func float64Ptr(v float64) *float64 {
out := v
return &out
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment