Unverified Commit 3ff2ca8d authored by Wesley Liddick's avatar Wesley Liddick Committed by GitHub
Browse files

Merge pull request #303 from IanShaw027/feature/ops-account-health-score

feat(ops): 运维监控功能增强与优化
parents b36f3db9 b2ff326c
...@@ -296,9 +296,10 @@ INSERT INTO ops_job_heartbeats ( ...@@ -296,9 +296,10 @@ INSERT INTO ops_job_heartbeats (
last_error_at, last_error_at,
last_error, last_error,
last_duration_ms, last_duration_ms,
last_result,
updated_at updated_at
) VALUES ( ) VALUES (
$1,$2,$3,$4,$5,$6,NOW() $1,$2,$3,$4,$5,$6,$7,NOW()
) )
ON CONFLICT (job_name) DO UPDATE SET ON CONFLICT (job_name) DO UPDATE SET
last_run_at = COALESCE(EXCLUDED.last_run_at, ops_job_heartbeats.last_run_at), last_run_at = COALESCE(EXCLUDED.last_run_at, ops_job_heartbeats.last_run_at),
...@@ -312,6 +313,10 @@ ON CONFLICT (job_name) DO UPDATE SET ...@@ -312,6 +313,10 @@ ON CONFLICT (job_name) DO UPDATE SET
ELSE COALESCE(EXCLUDED.last_error, ops_job_heartbeats.last_error) ELSE COALESCE(EXCLUDED.last_error, ops_job_heartbeats.last_error)
END, END,
last_duration_ms = COALESCE(EXCLUDED.last_duration_ms, ops_job_heartbeats.last_duration_ms), last_duration_ms = COALESCE(EXCLUDED.last_duration_ms, ops_job_heartbeats.last_duration_ms),
last_result = CASE
WHEN EXCLUDED.last_success_at IS NOT NULL THEN COALESCE(EXCLUDED.last_result, ops_job_heartbeats.last_result)
ELSE ops_job_heartbeats.last_result
END,
updated_at = NOW()` updated_at = NOW()`
_, err := r.db.ExecContext( _, err := r.db.ExecContext(
...@@ -323,6 +328,7 @@ ON CONFLICT (job_name) DO UPDATE SET ...@@ -323,6 +328,7 @@ ON CONFLICT (job_name) DO UPDATE SET
opsNullTime(input.LastErrorAt), opsNullTime(input.LastErrorAt),
opsNullString(input.LastError), opsNullString(input.LastError),
opsNullInt(input.LastDurationMs), opsNullInt(input.LastDurationMs),
opsNullString(input.LastResult),
) )
return err return err
} }
...@@ -340,6 +346,7 @@ SELECT ...@@ -340,6 +346,7 @@ SELECT
last_error_at, last_error_at,
last_error, last_error,
last_duration_ms, last_duration_ms,
last_result,
updated_at updated_at
FROM ops_job_heartbeats FROM ops_job_heartbeats
ORDER BY job_name ASC` ORDER BY job_name ASC`
...@@ -359,6 +366,8 @@ ORDER BY job_name ASC` ...@@ -359,6 +366,8 @@ ORDER BY job_name ASC`
var lastError sql.NullString var lastError sql.NullString
var lastDuration sql.NullInt64 var lastDuration sql.NullInt64
var lastResult sql.NullString
if err := rows.Scan( if err := rows.Scan(
&item.JobName, &item.JobName,
&lastRun, &lastRun,
...@@ -366,6 +375,7 @@ ORDER BY job_name ASC` ...@@ -366,6 +375,7 @@ ORDER BY job_name ASC`
&lastErrorAt, &lastErrorAt,
&lastError, &lastError,
&lastDuration, &lastDuration,
&lastResult,
&item.UpdatedAt, &item.UpdatedAt,
); err != nil { ); err != nil {
return nil, err return nil, err
...@@ -391,6 +401,10 @@ ORDER BY job_name ASC` ...@@ -391,6 +401,10 @@ ORDER BY job_name ASC`
v := lastDuration.Int64 v := lastDuration.Int64
item.LastDurationMs = &v item.LastDurationMs = &v
} }
if lastResult.Valid {
v := lastResult.String
item.LastResult = &v
}
out = append(out, &item) out = append(out, &item)
} }
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"errors" "errors"
"fmt"
"log" "log"
"strings" "strings"
"sync" "sync"
...@@ -235,11 +236,13 @@ func (s *OpsAggregationService) aggregateHourly() { ...@@ -235,11 +236,13 @@ func (s *OpsAggregationService) aggregateHourly() {
successAt := finishedAt successAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second) hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer hbCancel() defer hbCancel()
result := truncateString(fmt.Sprintf("window=%s..%s", start.Format(time.RFC3339), end.Format(time.RFC3339)), 2048)
_ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{ _ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsAggHourlyJobName, JobName: opsAggHourlyJobName,
LastRunAt: &runAt, LastRunAt: &runAt,
LastSuccessAt: &successAt, LastSuccessAt: &successAt,
LastDurationMs: &dur, LastDurationMs: &dur,
LastResult: &result,
}) })
} }
...@@ -331,11 +334,13 @@ func (s *OpsAggregationService) aggregateDaily() { ...@@ -331,11 +334,13 @@ func (s *OpsAggregationService) aggregateDaily() {
successAt := finishedAt successAt := finishedAt
hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second) hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer hbCancel() defer hbCancel()
result := truncateString(fmt.Sprintf("window=%s..%s", start.Format(time.RFC3339), end.Format(time.RFC3339)), 2048)
_ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{ _ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
JobName: opsAggDailyJobName, JobName: opsAggDailyJobName,
LastRunAt: &runAt, LastRunAt: &runAt,
LastSuccessAt: &successAt, LastSuccessAt: &successAt,
LastDurationMs: &dur, LastDurationMs: &dur,
LastResult: &result,
}) })
} }
......
...@@ -190,6 +190,13 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) { ...@@ -190,6 +190,13 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) {
return return
} }
rulesTotal := len(rules)
rulesEnabled := 0
rulesEvaluated := 0
eventsCreated := 0
eventsResolved := 0
emailsSent := 0
now := time.Now().UTC() now := time.Now().UTC()
safeEnd := now.Truncate(time.Minute) safeEnd := now.Truncate(time.Minute)
if safeEnd.IsZero() { if safeEnd.IsZero() {
...@@ -205,6 +212,7 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) { ...@@ -205,6 +212,7 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) {
if rule == nil || !rule.Enabled || rule.ID <= 0 { if rule == nil || !rule.Enabled || rule.ID <= 0 {
continue continue
} }
rulesEnabled++
scopePlatform, scopeGroupID, scopeRegion := parseOpsAlertRuleScope(rule.Filters) scopePlatform, scopeGroupID, scopeRegion := parseOpsAlertRuleScope(rule.Filters)
...@@ -220,6 +228,7 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) { ...@@ -220,6 +228,7 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) {
s.resetRuleState(rule.ID, now) s.resetRuleState(rule.ID, now)
continue continue
} }
rulesEvaluated++
breachedNow := compareMetric(metricValue, rule.Operator, rule.Threshold) breachedNow := compareMetric(metricValue, rule.Operator, rule.Threshold)
required := requiredSustainedBreaches(rule.SustainedMinutes, interval) required := requiredSustainedBreaches(rule.SustainedMinutes, interval)
...@@ -278,8 +287,11 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) { ...@@ -278,8 +287,11 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) {
continue continue
} }
eventsCreated++
if created != nil && created.ID > 0 { if created != nil && created.ID > 0 {
s.maybeSendAlertEmail(ctx, runtimeCfg, rule, created) if s.maybeSendAlertEmail(ctx, runtimeCfg, rule, created) {
emailsSent++
}
} }
continue continue
} }
...@@ -289,11 +301,14 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) { ...@@ -289,11 +301,14 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) {
resolvedAt := now resolvedAt := now
if err := s.opsRepo.UpdateAlertEventStatus(ctx, activeEvent.ID, OpsAlertStatusResolved, &resolvedAt); err != nil { if err := s.opsRepo.UpdateAlertEventStatus(ctx, activeEvent.ID, OpsAlertStatusResolved, &resolvedAt); err != nil {
log.Printf("[OpsAlertEvaluator] resolve event failed (event=%d): %v", activeEvent.ID, err) log.Printf("[OpsAlertEvaluator] resolve event failed (event=%d): %v", activeEvent.ID, err)
} else {
eventsResolved++
} }
} }
} }
s.recordHeartbeatSuccess(runAt, time.Since(startedAt)) result := truncateString(fmt.Sprintf("rules=%d enabled=%d evaluated=%d created=%d resolved=%d emails_sent=%d", rulesTotal, rulesEnabled, rulesEvaluated, eventsCreated, eventsResolved, emailsSent), 2048)
s.recordHeartbeatSuccess(runAt, time.Since(startedAt), result)
} }
func (s *OpsAlertEvaluatorService) pruneRuleStates(rules []*OpsAlertRule) { func (s *OpsAlertEvaluatorService) pruneRuleStates(rules []*OpsAlertRule) {
...@@ -585,32 +600,32 @@ func buildOpsAlertDescription(rule *OpsAlertRule, value float64, windowMinutes i ...@@ -585,32 +600,32 @@ func buildOpsAlertDescription(rule *OpsAlertRule, value float64, windowMinutes i
) )
} }
func (s *OpsAlertEvaluatorService) maybeSendAlertEmail(ctx context.Context, runtimeCfg *OpsAlertRuntimeSettings, rule *OpsAlertRule, event *OpsAlertEvent) { func (s *OpsAlertEvaluatorService) maybeSendAlertEmail(ctx context.Context, runtimeCfg *OpsAlertRuntimeSettings, rule *OpsAlertRule, event *OpsAlertEvent) bool {
if s == nil || s.emailService == nil || s.opsService == nil || event == nil || rule == nil { if s == nil || s.emailService == nil || s.opsService == nil || event == nil || rule == nil {
return return false
} }
if event.EmailSent { if event.EmailSent {
return return false
} }
if !rule.NotifyEmail { if !rule.NotifyEmail {
return return false
} }
emailCfg, err := s.opsService.GetEmailNotificationConfig(ctx) emailCfg, err := s.opsService.GetEmailNotificationConfig(ctx)
if err != nil || emailCfg == nil || !emailCfg.Alert.Enabled { if err != nil || emailCfg == nil || !emailCfg.Alert.Enabled {
return return false
} }
if len(emailCfg.Alert.Recipients) == 0 { if len(emailCfg.Alert.Recipients) == 0 {
return return false
} }
if !shouldSendOpsAlertEmailByMinSeverity(strings.TrimSpace(emailCfg.Alert.MinSeverity), strings.TrimSpace(rule.Severity)) { if !shouldSendOpsAlertEmailByMinSeverity(strings.TrimSpace(emailCfg.Alert.MinSeverity), strings.TrimSpace(rule.Severity)) {
return return false
} }
if runtimeCfg != nil && runtimeCfg.Silencing.Enabled { if runtimeCfg != nil && runtimeCfg.Silencing.Enabled {
if isOpsAlertSilenced(time.Now().UTC(), rule, event, runtimeCfg.Silencing) { if isOpsAlertSilenced(time.Now().UTC(), rule, event, runtimeCfg.Silencing) {
return return false
} }
} }
...@@ -639,6 +654,7 @@ func (s *OpsAlertEvaluatorService) maybeSendAlertEmail(ctx context.Context, runt ...@@ -639,6 +654,7 @@ func (s *OpsAlertEvaluatorService) maybeSendAlertEmail(ctx context.Context, runt
if anySent { if anySent {
_ = s.opsRepo.UpdateAlertEventEmailSent(context.Background(), event.ID, true) _ = s.opsRepo.UpdateAlertEventEmailSent(context.Background(), event.ID, true)
} }
return anySent
} }
func buildOpsAlertEmailBody(rule *OpsAlertRule, event *OpsAlertEvent) string { func buildOpsAlertEmailBody(rule *OpsAlertRule, event *OpsAlertEvent) string {
...@@ -806,7 +822,7 @@ func (s *OpsAlertEvaluatorService) maybeLogSkip(key string) { ...@@ -806,7 +822,7 @@ func (s *OpsAlertEvaluatorService) maybeLogSkip(key string) {
log.Printf("[OpsAlertEvaluator] leader lock held by another instance; skipping (key=%q)", key) log.Printf("[OpsAlertEvaluator] leader lock held by another instance; skipping (key=%q)", key)
} }
func (s *OpsAlertEvaluatorService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) { func (s *OpsAlertEvaluatorService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration, result string) {
if s == nil || s.opsRepo == nil { if s == nil || s.opsRepo == nil {
return return
} }
...@@ -814,11 +830,17 @@ func (s *OpsAlertEvaluatorService) recordHeartbeatSuccess(runAt time.Time, durat ...@@ -814,11 +830,17 @@ func (s *OpsAlertEvaluatorService) recordHeartbeatSuccess(runAt time.Time, durat
durMs := duration.Milliseconds() durMs := duration.Milliseconds()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
msg := strings.TrimSpace(result)
if msg == "" {
msg = "ok"
}
msg = truncateString(msg, 2048)
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ _ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsAlertEvaluatorJobName, JobName: opsAlertEvaluatorJobName,
LastRunAt: &runAt, LastRunAt: &runAt,
LastSuccessAt: &now, LastSuccessAt: &now,
LastDurationMs: &durMs, LastDurationMs: &durMs,
LastResult: &msg,
}) })
} }
......
...@@ -149,7 +149,7 @@ func (s *OpsCleanupService) runScheduled() { ...@@ -149,7 +149,7 @@ func (s *OpsCleanupService) runScheduled() {
log.Printf("[OpsCleanup] cleanup failed: %v", err) log.Printf("[OpsCleanup] cleanup failed: %v", err)
return return
} }
s.recordHeartbeatSuccess(runAt, time.Since(startedAt)) s.recordHeartbeatSuccess(runAt, time.Since(startedAt), counts)
log.Printf("[OpsCleanup] cleanup complete: %s", counts) log.Printf("[OpsCleanup] cleanup complete: %s", counts)
} }
...@@ -330,12 +330,13 @@ func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), b ...@@ -330,12 +330,13 @@ func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), b
return release, true return release, true
} }
func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) { func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration, counts opsCleanupDeletedCounts) {
if s == nil || s.opsRepo == nil { if s == nil || s.opsRepo == nil {
return return
} }
now := time.Now().UTC() now := time.Now().UTC()
durMs := duration.Milliseconds() durMs := duration.Milliseconds()
result := truncateString(counts.String(), 2048)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ _ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
...@@ -343,6 +344,7 @@ func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration tim ...@@ -343,6 +344,7 @@ func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration tim
LastRunAt: &runAt, LastRunAt: &runAt,
LastSuccessAt: &now, LastSuccessAt: &now,
LastDurationMs: &durMs, LastDurationMs: &durMs,
LastResult: &result,
}) })
} }
......
...@@ -235,6 +235,9 @@ type OpsUpsertJobHeartbeatInput struct { ...@@ -235,6 +235,9 @@ type OpsUpsertJobHeartbeatInput struct {
LastErrorAt *time.Time LastErrorAt *time.Time
LastError *string LastError *string
LastDurationMs *int64 LastDurationMs *int64
// LastResult is an optional human-readable summary of the last successful run.
LastResult *string
} }
type OpsJobHeartbeat struct { type OpsJobHeartbeat struct {
...@@ -245,6 +248,7 @@ type OpsJobHeartbeat struct { ...@@ -245,6 +248,7 @@ type OpsJobHeartbeat struct {
LastErrorAt *time.Time `json:"last_error_at"` LastErrorAt *time.Time `json:"last_error_at"`
LastError *string `json:"last_error"` LastError *string `json:"last_error"`
LastDurationMs *int64 `json:"last_duration_ms"` LastDurationMs *int64 `json:"last_duration_ms"`
LastResult *string `json:"last_result"`
UpdatedAt time.Time `json:"updated_at"` UpdatedAt time.Time `json:"updated_at"`
} }
......
...@@ -177,6 +177,10 @@ func (s *OpsScheduledReportService) runOnce() { ...@@ -177,6 +177,10 @@ func (s *OpsScheduledReportService) runOnce() {
return return
} }
reportsTotal := len(reports)
reportsDue := 0
sentAttempts := 0
for _, report := range reports { for _, report := range reports {
if report == nil || !report.Enabled { if report == nil || !report.Enabled {
continue continue
...@@ -184,14 +188,18 @@ func (s *OpsScheduledReportService) runOnce() { ...@@ -184,14 +188,18 @@ func (s *OpsScheduledReportService) runOnce() {
if report.NextRunAt.After(now) { if report.NextRunAt.After(now) {
continue continue
} }
reportsDue++
if err := s.runReport(ctx, report, now); err != nil { attempts, err := s.runReport(ctx, report, now)
if err != nil {
s.recordHeartbeatError(runAt, time.Since(startedAt), err) s.recordHeartbeatError(runAt, time.Since(startedAt), err)
return return
} }
sentAttempts += attempts
} }
s.recordHeartbeatSuccess(runAt, time.Since(startedAt)) result := truncateString(fmt.Sprintf("reports=%d due=%d send_attempts=%d", reportsTotal, reportsDue, sentAttempts), 2048)
s.recordHeartbeatSuccess(runAt, time.Since(startedAt), result)
} }
type opsScheduledReport struct { type opsScheduledReport struct {
...@@ -297,9 +305,9 @@ func (s *OpsScheduledReportService) listScheduledReports(ctx context.Context, no ...@@ -297,9 +305,9 @@ func (s *OpsScheduledReportService) listScheduledReports(ctx context.Context, no
return out return out
} }
func (s *OpsScheduledReportService) runReport(ctx context.Context, report *opsScheduledReport, now time.Time) error { func (s *OpsScheduledReportService) runReport(ctx context.Context, report *opsScheduledReport, now time.Time) (int, error) {
if s == nil || s.opsService == nil || s.emailService == nil || report == nil { if s == nil || s.opsService == nil || s.emailService == nil || report == nil {
return nil return 0, nil
} }
if ctx == nil { if ctx == nil {
ctx = context.Background() ctx = context.Background()
...@@ -310,11 +318,11 @@ func (s *OpsScheduledReportService) runReport(ctx context.Context, report *opsSc ...@@ -310,11 +318,11 @@ func (s *OpsScheduledReportService) runReport(ctx context.Context, report *opsSc
content, err := s.generateReportHTML(ctx, report, now) content, err := s.generateReportHTML(ctx, report, now)
if err != nil { if err != nil {
return err return 0, err
} }
if strings.TrimSpace(content) == "" { if strings.TrimSpace(content) == "" {
// Skip sending when the report decides not to emit content (e.g., digest below min count). // Skip sending when the report decides not to emit content (e.g., digest below min count).
return nil return 0, nil
} }
recipients := report.Recipients recipients := report.Recipients
...@@ -325,22 +333,24 @@ func (s *OpsScheduledReportService) runReport(ctx context.Context, report *opsSc ...@@ -325,22 +333,24 @@ func (s *OpsScheduledReportService) runReport(ctx context.Context, report *opsSc
} }
} }
if len(recipients) == 0 { if len(recipients) == 0 {
return nil return 0, nil
} }
subject := fmt.Sprintf("[Ops Report] %s", strings.TrimSpace(report.Name)) subject := fmt.Sprintf("[Ops Report] %s", strings.TrimSpace(report.Name))
attempts := 0
for _, to := range recipients { for _, to := range recipients {
addr := strings.TrimSpace(to) addr := strings.TrimSpace(to)
if addr == "" { if addr == "" {
continue continue
} }
attempts++
if err := s.emailService.SendEmail(ctx, addr, subject, content); err != nil { if err := s.emailService.SendEmail(ctx, addr, subject, content); err != nil {
// Ignore per-recipient failures; continue best-effort. // Ignore per-recipient failures; continue best-effort.
continue continue
} }
} }
return nil return attempts, nil
} }
func (s *OpsScheduledReportService) generateReportHTML(ctx context.Context, report *opsScheduledReport, now time.Time) (string, error) { func (s *OpsScheduledReportService) generateReportHTML(ctx context.Context, report *opsScheduledReport, now time.Time) (string, error) {
...@@ -650,7 +660,7 @@ func (s *OpsScheduledReportService) setLastRunAt(ctx context.Context, reportType ...@@ -650,7 +660,7 @@ func (s *OpsScheduledReportService) setLastRunAt(ctx context.Context, reportType
_ = s.redisClient.Set(ctx, key, strconv.FormatInt(t.UTC().Unix(), 10), 14*24*time.Hour).Err() _ = s.redisClient.Set(ctx, key, strconv.FormatInt(t.UTC().Unix(), 10), 14*24*time.Hour).Err()
} }
func (s *OpsScheduledReportService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) { func (s *OpsScheduledReportService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration, result string) {
if s == nil || s.opsService == nil || s.opsService.opsRepo == nil { if s == nil || s.opsService == nil || s.opsService.opsRepo == nil {
return return
} }
...@@ -658,11 +668,17 @@ func (s *OpsScheduledReportService) recordHeartbeatSuccess(runAt time.Time, dura ...@@ -658,11 +668,17 @@ func (s *OpsScheduledReportService) recordHeartbeatSuccess(runAt time.Time, dura
durMs := duration.Milliseconds() durMs := duration.Milliseconds()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
msg := strings.TrimSpace(result)
if msg == "" {
msg = "ok"
}
msg = truncateString(msg, 2048)
_ = s.opsService.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ _ = s.opsService.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsScheduledReportJobName, JobName: opsScheduledReportJobName,
LastRunAt: &runAt, LastRunAt: &runAt,
LastSuccessAt: &now, LastSuccessAt: &now,
LastDurationMs: &durMs, LastDurationMs: &durMs,
LastResult: &msg,
}) })
} }
......
-- Add last_result to ops_job_heartbeats for UI job details.
ALTER TABLE IF EXISTS ops_job_heartbeats
ADD COLUMN IF NOT EXISTS last_result TEXT;
COMMENT ON COLUMN ops_job_heartbeats.last_result IS 'Last successful run result summary (human readable).';
...@@ -293,6 +293,7 @@ export interface OpsJobHeartbeat { ...@@ -293,6 +293,7 @@ export interface OpsJobHeartbeat {
last_error_at?: string | null last_error_at?: string | null
last_error?: string | null last_error?: string | null
last_duration_ms?: number | null last_duration_ms?: number | null
last_result?: string | null
updated_at: string updated_at: string
} }
......
...@@ -1925,7 +1925,7 @@ export default { ...@@ -1925,7 +1925,7 @@ export default {
errors: 'Errors', errors: 'Errors',
errorRate: 'error_rate:', errorRate: 'error_rate:',
upstreamRate: 'upstream_rate:', upstreamRate: 'upstream_rate:',
latencyDuration: 'Request Duration (ms)', latencyDuration: 'Request Duration',
ttftLabel: 'TTFT (first_token_ms)', ttftLabel: 'TTFT (first_token_ms)',
p50: 'p50:', p50: 'p50:',
p90: 'p90:', p90: 'p90:',
...@@ -2590,7 +2590,7 @@ export default { ...@@ -2590,7 +2590,7 @@ export default {
errors: 'Error statistics, including total errors, error rate, and upstream error rate.', errors: 'Error statistics, including total errors, error rate, and upstream error rate.',
upstreamErrors: 'Upstream error statistics, excluding rate limit errors (429/529).', upstreamErrors: 'Upstream error statistics, excluding rate limit errors (429/529).',
latency: 'Request duration statistics, including p50, p90, p95, p99 percentiles.', latency: 'Request duration statistics, including p50, p90, p95, p99 percentiles.',
ttft: 'Time To First Token, measuring the speed of first byte return in streaming responses.', ttft: 'Time To First Token, measuring the speed of first token return in streaming responses.',
health: 'System health score (0-100), considering SLA, error rate, and resource usage.' health: 'System health score (0-100), considering SLA, error rate, and resource usage.'
}, },
charts: { charts: {
......
...@@ -2048,6 +2048,7 @@ export default { ...@@ -2048,6 +2048,7 @@ export default {
lastRun: '最近运行', lastRun: '最近运行',
lastSuccess: '最近成功', lastSuccess: '最近成功',
lastError: '最近错误', lastError: '最近错误',
result: '结果',
noData: '暂无数据', noData: '暂无数据',
loadingText: '加载中...', loadingText: '加载中...',
ready: '就绪', ready: '就绪',
...@@ -2062,7 +2063,7 @@ export default { ...@@ -2062,7 +2063,7 @@ export default {
avgQps: '平均 QPS', avgQps: '平均 QPS',
avgTps: '平均 TPS', avgTps: '平均 TPS',
avgLatency: '平均请求时长', avgLatency: '平均请求时长',
avgTtft: '平均首延迟', avgTtft: '平均首 Token 延迟',
exceptions: '异常数', exceptions: '异常数',
requestErrors: '请求错误', requestErrors: '请求错误',
errorCount: '错误数', errorCount: '错误数',
...@@ -2073,8 +2074,8 @@ export default { ...@@ -2073,8 +2074,8 @@ export default {
errors: '错误', errors: '错误',
errorRate: '错误率:', errorRate: '错误率:',
upstreamRate: '上游错误率:', upstreamRate: '上游错误率:',
latencyDuration: '请求时长(毫秒)', latencyDuration: '请求时长',
ttftLabel: '延迟(毫秒)', ttftLabel: ' Token 延迟(毫秒)',
p50: 'p50', p50: 'p50',
p90: 'p90', p90: 'p90',
p95: 'p95', p95: 'p95',
...@@ -2117,7 +2118,12 @@ export default { ...@@ -2117,7 +2118,12 @@ export default {
'6h': '近6小时', '6h': '近6小时',
'24h': '近24小时', '24h': '近24小时',
'7d': '近7天', '7d': '近7天',
'30d': '近30天' '30d': '近30天',
custom: '自定义'
},
customTimeRange: {
startTime: '开始时间',
endTime: '结束时间'
}, },
fullscreen: { fullscreen: {
enter: '进入全屏' enter: '进入全屏'
...@@ -2146,7 +2152,7 @@ export default { ...@@ -2146,7 +2152,7 @@ export default {
memoryHigh: '内存使用率偏高 ({usage}%)', memoryHigh: '内存使用率偏高 ({usage}%)',
memoryHighImpact: '内存压力较大,需要关注', memoryHighImpact: '内存压力较大,需要关注',
memoryHighAction: '监控内存趋势,检查是否有内存泄漏', memoryHighAction: '监控内存趋势,检查是否有内存泄漏',
ttftHigh: '字节时间偏高 ({ttft}ms)', ttftHigh: ' Token 时间偏高 ({ttft}ms)',
ttftHighImpact: '用户感知时长增加', ttftHighImpact: '用户感知时长增加',
ttftHighAction: '优化请求处理流程,减少前置逻辑耗时', ttftHighAction: '优化请求处理流程,减少前置逻辑耗时',
// Error rate diagnostics // Error rate diagnostics
...@@ -2738,7 +2744,7 @@ export default { ...@@ -2738,7 +2744,7 @@ export default {
sla: '服务等级协议达成率,排除业务限制(如余额不足、配额超限)的成功请求占比。', sla: '服务等级协议达成率,排除业务限制(如余额不足、配额超限)的成功请求占比。',
errors: '错误统计,包括总错误数、错误率和上游错误率。', errors: '错误统计,包括总错误数、错误率和上游错误率。',
latency: '请求时长统计,包括 p50、p90、p95、p99 等百分位数。', latency: '请求时长统计,包括 p50、p90、p95、p99 等百分位数。',
ttft: '首Token延迟(Time To First Token),衡量流式响应的首字节返回速度。', ttft: ' Token 延迟(Time To First Token),衡量流式响应的首 Token 返回速度。',
health: '系统健康评分(0-100),综合考虑 SLA、错误率和资源使用情况。' health: '系统健康评分(0-100),综合考虑 SLA、错误率和资源使用情况。'
}, },
charts: { charts: {
......
...@@ -414,7 +414,17 @@ const handleScroll = () => { ...@@ -414,7 +414,17 @@ const handleScroll = () => {
menu.show = false menu.show = false
} }
onMounted(async () => { load(); try { const [p, g] = await Promise.all([adminAPI.proxies.getAll(), adminAPI.groups.getAll()]); proxies.value = p; groups.value = g } catch (error) { console.error('Failed to load proxies/groups:', error) }; window.addEventListener('scroll', handleScroll, true) }) onMounted(async () => {
load()
try {
const [p, g] = await Promise.all([adminAPI.proxies.getAll(), adminAPI.groups.getAll()])
proxies.value = p
groups.value = g
} catch (error) {
console.error('Failed to load proxies/groups:', error)
}
window.addEventListener('scroll', handleScroll, true)
})
onUnmounted(() => { onUnmounted(() => {
window.removeEventListener('scroll', handleScroll, true) window.removeEventListener('scroll', handleScroll, true)
......
...@@ -23,10 +23,13 @@ ...@@ -23,10 +23,13 @@
:auto-refresh-enabled="autoRefreshEnabled" :auto-refresh-enabled="autoRefreshEnabled"
:auto-refresh-countdown="autoRefreshCountdown" :auto-refresh-countdown="autoRefreshCountdown"
:fullscreen="isFullscreen" :fullscreen="isFullscreen"
:custom-start-time="customStartTime"
:custom-end-time="customEndTime"
@update:time-range="onTimeRangeChange" @update:time-range="onTimeRangeChange"
@update:platform="onPlatformChange" @update:platform="onPlatformChange"
@update:group="onGroupChange" @update:group="onGroupChange"
@update:query-mode="onQueryModeChange" @update:query-mode="onQueryModeChange"
@update:custom-time-range="onCustomTimeRangeChange"
@refresh="fetchData" @refresh="fetchData"
@open-request-details="handleOpenRequestDetails" @open-request-details="handleOpenRequestDetails"
@open-error-details="openErrorDetails" @open-error-details="openErrorDetails"
...@@ -39,7 +42,7 @@ ...@@ -39,7 +42,7 @@
<!-- Row: Concurrency + Throughput --> <!-- Row: Concurrency + Throughput -->
<div v-if="opsEnabled && !(loading && !hasLoadedOnce)" class="grid grid-cols-1 gap-6 lg:grid-cols-3"> <div v-if="opsEnabled && !(loading && !hasLoadedOnce)" class="grid grid-cols-1 gap-6 lg:grid-cols-3">
<div class="lg:col-span-1 min-h-[360px]"> <div class="lg:col-span-1 min-h-[360px]">
<OpsConcurrencyCard :platform-filter="platform" :group-id-filter="groupId" /> <OpsConcurrencyCard :platform-filter="platform" :group-id-filter="groupId" :refresh-token="dashboardRefreshToken" />
</div> </div>
<div class="lg:col-span-2 min-h-[360px]"> <div class="lg:col-span-2 min-h-[360px]">
<OpsThroughputTrendChart <OpsThroughputTrendChart
...@@ -148,8 +151,8 @@ const { t } = useI18n() ...@@ -148,8 +151,8 @@ const { t } = useI18n()
const opsEnabled = computed(() => adminSettingsStore.opsMonitoringEnabled) const opsEnabled = computed(() => adminSettingsStore.opsMonitoringEnabled)
type TimeRange = '5m' | '30m' | '1h' | '6h' | '24h' type TimeRange = '5m' | '30m' | '1h' | '6h' | '24h' | 'custom'
const allowedTimeRanges = new Set<TimeRange>(['5m', '30m', '1h', '6h', '24h']) const allowedTimeRanges = new Set<TimeRange>(['5m', '30m', '1h', '6h', '24h', 'custom'])
type QueryMode = 'auto' | 'raw' | 'preagg' type QueryMode = 'auto' | 'raw' | 'preagg'
const allowedQueryModes = new Set<QueryMode>(['auto', 'raw', 'preagg']) const allowedQueryModes = new Set<QueryMode>(['auto', 'raw', 'preagg'])
...@@ -163,6 +166,8 @@ const timeRange = ref<TimeRange>('1h') ...@@ -163,6 +166,8 @@ const timeRange = ref<TimeRange>('1h')
const platform = ref<string>('') const platform = ref<string>('')
const groupId = ref<number | null>(null) const groupId = ref<number | null>(null)
const queryMode = ref<QueryMode>('auto') const queryMode = ref<QueryMode>('auto')
const customStartTime = ref<string | null>(null)
const customEndTime = ref<string | null>(null)
const QUERY_KEYS = { const QUERY_KEYS = {
timeRange: 'tr', timeRange: 'tr',
...@@ -347,23 +352,24 @@ const autoRefreshEnabled = ref(false) ...@@ -347,23 +352,24 @@ const autoRefreshEnabled = ref(false)
const autoRefreshIntervalMs = ref(30000) // default 30 seconds const autoRefreshIntervalMs = ref(30000) // default 30 seconds
const autoRefreshCountdown = ref(0) const autoRefreshCountdown = ref(0)
// Auto refresh timer // Used to trigger child component refreshes in a single shared cadence.
const { pause: pauseAutoRefresh, resume: resumeAutoRefresh } = useIntervalFn( const dashboardRefreshToken = ref(0)
() => {
if (autoRefreshEnabled.value && opsEnabled.value && !loading.value) {
fetchData()
}
},
autoRefreshIntervalMs,
{ immediate: false }
)
// Countdown timer (updates every second) // Countdown timer (drives auto refresh; updates every second)
const { pause: pauseCountdown, resume: resumeCountdown } = useIntervalFn( const { pause: pauseCountdown, resume: resumeCountdown } = useIntervalFn(
() => { () => {
if (autoRefreshEnabled.value && autoRefreshCountdown.value > 0) { if (!autoRefreshEnabled.value) return
autoRefreshCountdown.value-- if (!opsEnabled.value) return
if (loading.value) return
if (autoRefreshCountdown.value <= 0) {
// Fetch immediately when the countdown reaches 0.
// fetchData() will reset the countdown to the full interval.
fetchData()
return
} }
autoRefreshCountdown.value -= 1
}, },
1000, 1000,
{ immediate: false } { immediate: false }
...@@ -420,6 +426,11 @@ function onTimeRangeChange(v: string | number | boolean | null) { ...@@ -420,6 +426,11 @@ function onTimeRangeChange(v: string | number | boolean | null) {
timeRange.value = v as TimeRange timeRange.value = v as TimeRange
} }
function onCustomTimeRangeChange(startTime: string, endTime: string) {
customStartTime.value = startTime
customEndTime.value = endTime
}
function onSettingsSaved() { function onSettingsSaved() {
loadThresholds() loadThresholds()
fetchData() fetchData()
...@@ -458,18 +469,32 @@ function openError(id: number) { ...@@ -458,18 +469,32 @@ function openError(id: number) {
showErrorModal.value = true showErrorModal.value = true
} }
async function refreshOverviewWithCancel(fetchSeq: number, signal: AbortSignal) { function buildApiParams() {
if (!opsEnabled.value) return const params: any = {
try {
const data = await opsAPI.getDashboardOverview(
{
time_range: timeRange.value,
platform: platform.value || undefined, platform: platform.value || undefined,
group_id: groupId.value ?? undefined, group_id: groupId.value ?? undefined,
mode: queryMode.value mode: queryMode.value
}, }
{ signal }
) if (timeRange.value === 'custom') {
if (customStartTime.value && customEndTime.value) {
params.start_time = customStartTime.value
params.end_time = customEndTime.value
} else {
// Safety fallback: avoid sending time_range=custom (backend may not support it)
params.time_range = '1h'
}
} else {
params.time_range = timeRange.value
}
return params
}
async function refreshOverviewWithCancel(fetchSeq: number, signal: AbortSignal) {
if (!opsEnabled.value) return
try {
const data = await opsAPI.getDashboardOverview(buildApiParams(), { signal })
if (fetchSeq !== dashboardFetchSeq) return if (fetchSeq !== dashboardFetchSeq) return
overview.value = data overview.value = data
} catch (err: any) { } catch (err: any) {
...@@ -483,15 +508,7 @@ async function refreshThroughputTrendWithCancel(fetchSeq: number, signal: AbortS ...@@ -483,15 +508,7 @@ async function refreshThroughputTrendWithCancel(fetchSeq: number, signal: AbortS
if (!opsEnabled.value) return if (!opsEnabled.value) return
loadingTrend.value = true loadingTrend.value = true
try { try {
const data = await opsAPI.getThroughputTrend( const data = await opsAPI.getThroughputTrend(buildApiParams(), { signal })
{
time_range: timeRange.value,
platform: platform.value || undefined,
group_id: groupId.value ?? undefined,
mode: queryMode.value
},
{ signal }
)
if (fetchSeq !== dashboardFetchSeq) return if (fetchSeq !== dashboardFetchSeq) return
throughputTrend.value = data throughputTrend.value = data
} catch (err: any) { } catch (err: any) {
...@@ -509,15 +526,7 @@ async function refreshLatencyHistogramWithCancel(fetchSeq: number, signal: Abort ...@@ -509,15 +526,7 @@ async function refreshLatencyHistogramWithCancel(fetchSeq: number, signal: Abort
if (!opsEnabled.value) return if (!opsEnabled.value) return
loadingLatency.value = true loadingLatency.value = true
try { try {
const data = await opsAPI.getLatencyHistogram( const data = await opsAPI.getLatencyHistogram(buildApiParams(), { signal })
{
time_range: timeRange.value,
platform: platform.value || undefined,
group_id: groupId.value ?? undefined,
mode: queryMode.value
},
{ signal }
)
if (fetchSeq !== dashboardFetchSeq) return if (fetchSeq !== dashboardFetchSeq) return
latencyHistogram.value = data latencyHistogram.value = data
} catch (err: any) { } catch (err: any) {
...@@ -535,15 +544,7 @@ async function refreshErrorTrendWithCancel(fetchSeq: number, signal: AbortSignal ...@@ -535,15 +544,7 @@ async function refreshErrorTrendWithCancel(fetchSeq: number, signal: AbortSignal
if (!opsEnabled.value) return if (!opsEnabled.value) return
loadingErrorTrend.value = true loadingErrorTrend.value = true
try { try {
const data = await opsAPI.getErrorTrend( const data = await opsAPI.getErrorTrend(buildApiParams(), { signal })
{
time_range: timeRange.value,
platform: platform.value || undefined,
group_id: groupId.value ?? undefined,
mode: queryMode.value
},
{ signal }
)
if (fetchSeq !== dashboardFetchSeq) return if (fetchSeq !== dashboardFetchSeq) return
errorTrend.value = data errorTrend.value = data
} catch (err: any) { } catch (err: any) {
...@@ -561,15 +562,7 @@ async function refreshErrorDistributionWithCancel(fetchSeq: number, signal: Abor ...@@ -561,15 +562,7 @@ async function refreshErrorDistributionWithCancel(fetchSeq: number, signal: Abor
if (!opsEnabled.value) return if (!opsEnabled.value) return
loadingErrorDistribution.value = true loadingErrorDistribution.value = true
try { try {
const data = await opsAPI.getErrorDistribution( const data = await opsAPI.getErrorDistribution(buildApiParams(), { signal })
{
time_range: timeRange.value,
platform: platform.value || undefined,
group_id: groupId.value ?? undefined,
mode: queryMode.value
},
{ signal }
)
if (fetchSeq !== dashboardFetchSeq) return if (fetchSeq !== dashboardFetchSeq) return
errorDistribution.value = data errorDistribution.value = data
} catch (err: any) { } catch (err: any) {
...@@ -612,7 +605,12 @@ async function fetchData() { ...@@ -612,7 +605,12 @@ async function fetchData() {
refreshErrorDistributionWithCancel(fetchSeq, dashboardFetchController.signal) refreshErrorDistributionWithCancel(fetchSeq, dashboardFetchController.signal)
]) ])
if (fetchSeq !== dashboardFetchSeq) return if (fetchSeq !== dashboardFetchSeq) return
lastUpdated.value = new Date() lastUpdated.value = new Date()
// Trigger child component refreshes using the same cadence as the header.
dashboardRefreshToken.value += 1
// Reset auto refresh countdown after successful fetch // Reset auto refresh countdown after successful fetch
if (autoRefreshEnabled.value) { if (autoRefreshEnabled.value) {
autoRefreshCountdown.value = Math.floor(autoRefreshIntervalMs.value / 1000) autoRefreshCountdown.value = Math.floor(autoRefreshIntervalMs.value / 1000)
...@@ -686,15 +684,14 @@ onMounted(async () => { ...@@ -686,15 +684,14 @@ onMounted(async () => {
// Start auto refresh if enabled // Start auto refresh if enabled
if (autoRefreshEnabled.value) { if (autoRefreshEnabled.value) {
resumeAutoRefresh()
resumeCountdown() resumeCountdown()
} }
}) })
async function loadThresholds() { async function loadThresholds() {
try { try {
const settings = await opsAPI.getAlertRuntimeSettings() const thresholds = await opsAPI.getMetricThresholds()
metricThresholds.value = settings.thresholds || null metricThresholds.value = thresholds || null
} catch (err) { } catch (err) {
console.warn('[OpsDashboard] Failed to load thresholds', err) console.warn('[OpsDashboard] Failed to load thresholds', err)
metricThresholds.value = null metricThresholds.value = null
...@@ -704,7 +701,6 @@ async function loadThresholds() { ...@@ -704,7 +701,6 @@ async function loadThresholds() {
onUnmounted(() => { onUnmounted(() => {
window.removeEventListener('keydown', handleKeydown) window.removeEventListener('keydown', handleKeydown)
abortDashboardFetch() abortDashboardFetch()
pauseAutoRefresh()
pauseCountdown() pauseCountdown()
}) })
...@@ -712,10 +708,8 @@ onUnmounted(() => { ...@@ -712,10 +708,8 @@ onUnmounted(() => {
watch(autoRefreshEnabled, (enabled) => { watch(autoRefreshEnabled, (enabled) => {
if (enabled) { if (enabled) {
autoRefreshCountdown.value = Math.floor(autoRefreshIntervalMs.value / 1000) autoRefreshCountdown.value = Math.floor(autoRefreshIntervalMs.value / 1000)
resumeAutoRefresh()
resumeCountdown() resumeCountdown()
} else { } else {
pauseAutoRefresh()
pauseCountdown() pauseCountdown()
autoRefreshCountdown.value = 0 autoRefreshCountdown.value = 0
} }
......
<script setup lang="ts"> <script setup lang="ts">
import { computed, onMounted, onUnmounted, ref, watch } from 'vue' import { computed, ref, watch } from 'vue'
import { useI18n } from 'vue-i18n' import { useI18n } from 'vue-i18n'
import { useIntervalFn } from '@vueuse/core'
import { opsAPI, type OpsAccountAvailabilityStatsResponse, type OpsConcurrencyStatsResponse } from '@/api/admin/ops' import { opsAPI, type OpsAccountAvailabilityStatsResponse, type OpsConcurrencyStatsResponse } from '@/api/admin/ops'
interface Props { interface Props {
platformFilter?: string platformFilter?: string
groupIdFilter?: number | null groupIdFilter?: number | null
refreshToken: number
} }
const props = withDefaults(defineProps<Props>(), { const props = withDefaults(defineProps<Props>(), {
...@@ -233,15 +233,13 @@ async function loadData() { ...@@ -233,15 +233,13 @@ async function loadData() {
} }
} }
// 定期刷新(5秒) // 刷新节奏由父组件统一控制(OpsDashboard Header 的刷新状态/倒计时)
const { pause: pauseRefresh, resume: resumeRefresh } = useIntervalFn( watch(
() => props.refreshToken,
() => { () => {
if (realtimeEnabled.value) { if (!realtimeEnabled.value) return
loadData() loadData()
} }
},
5000,
{ immediate: false }
) )
function getLoadBarClass(loadPct: number): string { function getLoadBarClass(loadPct: number): string {
...@@ -271,23 +269,15 @@ function formatDuration(seconds: number): string { ...@@ -271,23 +269,15 @@ function formatDuration(seconds: number): string {
return `${hours}h` return `${hours}h`
} }
onMounted(() => { watch(
loadData() () => realtimeEnabled.value,
resumeRefresh() async (enabled) => {
}) if (enabled) {
onUnmounted(() => {
pauseRefresh()
})
watch(realtimeEnabled, async (enabled) => {
if (!enabled) {
pauseRefresh()
} else {
resumeRefresh()
await loadData() await loadData()
} }
}) },
{ immediate: true }
)
</script> </script>
<template> <template>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment