Commit 64236361 authored by yangjianbo's avatar yangjianbo
Browse files

Merge branch 'test' into dev

parents 2d6066f9 b6aaee01
package service
import (
"context"
"encoding/json"
"errors"
"testing"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
)
// runtimeSettingRepoStub is an in-memory SettingRepository test double.
// Behavior is table-driven via the maps, with optional function hooks that
// override (getValueFn) or precede (setFn, deleteFn) the default map logic.
type runtimeSettingRepoStub struct {
	values  map[string]string // current key/value store
	deleted map[string]bool   // records keys successfully deleted
	setCalls int              // counts successful Set invocations
	getValueFn func(key string) (string, error) // if set, fully replaces GetValue
	setFn      func(key, value string) error    // if set, runs before the map write; an error aborts it
	deleteFn   func(key string) error           // if set, runs before the map delete; an error aborts it
}
// newRuntimeSettingRepoStub builds a stub with empty value and deletion maps.
func newRuntimeSettingRepoStub() *runtimeSettingRepoStub {
	stub := &runtimeSettingRepoStub{}
	stub.values = map[string]string{}
	stub.deleted = map[string]bool{}
	return stub
}
// Get looks up key via GetValue and wraps the result in a Setting.
func (s *runtimeSettingRepoStub) Get(ctx context.Context, key string) (*Setting, error) {
	v, lookupErr := s.GetValue(ctx, key)
	if lookupErr != nil {
		return nil, lookupErr
	}
	setting := &Setting{Key: key, Value: v}
	return setting, nil
}
// GetValue returns the stored value for key, delegating entirely to the
// getValueFn hook when one is installed. Missing keys yield ErrSettingNotFound.
func (s *runtimeSettingRepoStub) GetValue(_ context.Context, key string) (string, error) {
	if fn := s.getValueFn; fn != nil {
		return fn(key)
	}
	if v, present := s.values[key]; present {
		return v, nil
	}
	return "", ErrSettingNotFound
}
// Set stores key=value and bumps the call counter. When a setFn hook is
// installed it runs first; a hook error aborts the write entirely.
func (s *runtimeSettingRepoStub) Set(_ context.Context, key, value string) error {
	if fn := s.setFn; fn != nil {
		if hookErr := fn(key, value); hookErr != nil {
			return hookErr
		}
	}
	s.setCalls++
	s.values[key] = value
	return nil
}
// GetMultiple returns the subset of requested keys that exist in the store;
// missing keys are silently omitted rather than reported as errors.
func (s *runtimeSettingRepoStub) GetMultiple(_ context.Context, keys []string) (map[string]string, error) {
	found := make(map[string]string, len(keys))
	for _, k := range keys {
		v, present := s.values[k]
		if !present {
			continue
		}
		found[k] = v
	}
	return found, nil
}
// SetMultiple writes every provided pair into the store. Unlike Set, it does
// not run the setFn hook and does not count toward setCalls.
func (s *runtimeSettingRepoStub) SetMultiple(_ context.Context, settings map[string]string) error {
	for k, v := range settings {
		s.values[k] = v
	}
	return nil
}
// GetAll returns a defensive copy of the entire store.
func (s *runtimeSettingRepoStub) GetAll(_ context.Context) (map[string]string, error) {
	snapshot := make(map[string]string, len(s.values))
	for k, v := range s.values {
		snapshot[k] = v
	}
	return snapshot, nil
}
// Delete removes key from the store and records it in deleted. A deleteFn hook
// runs first and can veto the delete; a missing key yields ErrSettingNotFound.
func (s *runtimeSettingRepoStub) Delete(_ context.Context, key string) error {
	if fn := s.deleteFn; fn != nil {
		if hookErr := fn(key); hookErr != nil {
			return hookErr
		}
	}
	if _, present := s.values[key]; !present {
		return ErrSettingNotFound
	}
	s.deleted[key] = true
	delete(s.values, key)
	return nil
}
// TestUpdateRuntimeLogConfig_InvalidConfigShouldNotApply verifies that an
// invalid runtime log config (level "trace" is not a valid zap level) is
// rejected and that the live logger level stays untouched.
func TestUpdateRuntimeLogConfig_InvalidConfigShouldNotApply(t *testing.T) {
	repo := newRuntimeSettingRepoStub()
	svc := &OpsService{
		settingRepo: repo,
		cfg: &config.Config{
			Log: config.LogConfig{
				Level:           "info",
				Caller:          true,
				StacktraceLevel: "error",
				Sampling: config.LogSamplingConfig{
					Enabled:    false,
					Initial:    100,
					Thereafter: 100,
				},
			},
		},
	}
	// Initialize a real logger so CurrentLevel() reflects a known baseline.
	if err := logger.Init(logger.InitOptions{
		Level:       "info",
		Format:      "json",
		ServiceName: "sub2api",
		Environment: "test",
		Output: logger.OutputOptions{
			ToStdout: true,
			ToFile:   false,
		},
	}); err != nil {
		t.Fatalf("init logger: %v", err)
	}
	_, err := svc.UpdateRuntimeLogConfig(context.Background(), &OpsRuntimeLogConfig{
		Level:           "trace",
		EnableSampling:  true,
		SamplingInitial: 100,
		SamplingNext:    100,
		Caller:          true,
		StacktraceLevel: "error",
		RetentionDays:   30,
	}, 1)
	if err == nil {
		t.Fatalf("expected validation error")
	}
	if logger.CurrentLevel() != "info" {
		t.Fatalf("logger level changed unexpectedly: %s", logger.CurrentLevel())
	}
	if repo.setCalls != 1 {
		// GetRuntimeLogConfig() persists the default when the key is missing;
		// that should be the only Set call observed here.
		t.Fatalf("unexpected set calls: %d", repo.setCalls)
	}
}
// TestResetRuntimeLogConfig_ShouldFallbackToBaseline verifies that resetting
// removes the persisted runtime override and re-applies the baseline from the
// static config (level, retention days), including updating the live logger.
func TestResetRuntimeLogConfig_ShouldFallbackToBaseline(t *testing.T) {
	repo := newRuntimeSettingRepoStub()
	// Seed a persisted runtime override that differs from the baseline.
	existing := &OpsRuntimeLogConfig{
		Level:           "debug",
		EnableSampling:  true,
		SamplingInitial: 50,
		SamplingNext:    50,
		Caller:          true,
		StacktraceLevel: "error",
		RetentionDays:   60,
		Source:          "runtime_setting",
	}
	raw, _ := json.Marshal(existing)
	repo.values[SettingKeyOpsRuntimeLogConfig] = string(raw)
	svc := &OpsService{
		settingRepo: repo,
		cfg: &config.Config{
			Log: config.LogConfig{
				Level:           "warn",
				Caller:          false,
				StacktraceLevel: "fatal",
				Sampling: config.LogSamplingConfig{
					Enabled:    false,
					Initial:    100,
					Thereafter: 100,
				},
			},
			Ops: config.OpsConfig{
				Cleanup: config.OpsCleanupConfig{
					ErrorLogRetentionDays: 45,
				},
			},
		},
	}
	if err := logger.Init(logger.InitOptions{
		Level:       "debug",
		Format:      "json",
		ServiceName: "sub2api",
		Environment: "test",
		Output: logger.OutputOptions{
			ToStdout: true,
			ToFile:   false,
		},
	}); err != nil {
		t.Fatalf("init logger: %v", err)
	}
	resetCfg, err := svc.ResetRuntimeLogConfig(context.Background(), 9)
	if err != nil {
		t.Fatalf("ResetRuntimeLogConfig() error: %v", err)
	}
	if resetCfg.Source != "baseline" {
		t.Fatalf("source = %q, want baseline", resetCfg.Source)
	}
	if resetCfg.Level != "warn" {
		t.Fatalf("level = %q, want warn", resetCfg.Level)
	}
	if resetCfg.RetentionDays != 45 {
		t.Fatalf("retention_days = %d, want 45", resetCfg.RetentionDays)
	}
	if logger.CurrentLevel() != "warn" {
		t.Fatalf("logger level = %q, want warn", logger.CurrentLevel())
	}
	if !repo.deleted[SettingKeyOpsRuntimeLogConfig] {
		t.Fatalf("runtime setting key should be deleted")
	}
}
// TestResetRuntimeLogConfig_InvalidOperator verifies that operator id 0 is
// rejected with the exact "invalid operator id" error.
func TestResetRuntimeLogConfig_InvalidOperator(t *testing.T) {
	svc := &OpsService{settingRepo: newRuntimeSettingRepoStub()}
	_, err := svc.ResetRuntimeLogConfig(context.Background(), 0)
	switch {
	case err == nil:
		t.Fatalf("expected invalid operator error")
	case err.Error() != "invalid operator id":
		t.Fatalf("unexpected error: %v", err)
	}
}
// TestGetRuntimeLogConfig_InvalidJSONFallback verifies that a corrupt persisted
// runtime config falls back to the baseline from static config instead of
// returning a decode error.
func TestGetRuntimeLogConfig_InvalidJSONFallback(t *testing.T) {
	repo := newRuntimeSettingRepoStub()
	repo.values[SettingKeyOpsRuntimeLogConfig] = `{invalid-json}`
	svc := &OpsService{
		settingRepo: repo,
		cfg: &config.Config{
			Log: config.LogConfig{
				Level:           "warn",
				Caller:          true,
				StacktraceLevel: "error",
				Sampling: config.LogSamplingConfig{
					Enabled:    false,
					Initial:    100,
					Thereafter: 100,
				},
			},
		},
	}
	got, err := svc.GetRuntimeLogConfig(context.Background())
	if err != nil {
		t.Fatalf("GetRuntimeLogConfig() error: %v", err)
	}
	if got.Level != "warn" {
		t.Fatalf("level = %q, want warn", got.Level)
	}
}
// TestUpdateRuntimeLogConfig_PersistFailureRollback verifies that when the
// repository rejects the persist step, the already-applied logger level is
// rolled back to the previous effective level.
func TestUpdateRuntimeLogConfig_PersistFailureRollback(t *testing.T) {
	repo := newRuntimeSettingRepoStub()
	// Seed the currently-effective runtime config (level info).
	oldCfg := &OpsRuntimeLogConfig{
		Level:           "info",
		EnableSampling:  false,
		SamplingInitial: 100,
		SamplingNext:    100,
		Caller:          true,
		StacktraceLevel: "error",
		RetentionDays:   30,
	}
	raw, _ := json.Marshal(oldCfg)
	repo.values[SettingKeyOpsRuntimeLogConfig] = string(raw)
	// Fail persistence only for the runtime-log-config key.
	repo.setFn = func(key, value string) error {
		if key == SettingKeyOpsRuntimeLogConfig {
			return errors.New("db down")
		}
		return nil
	}
	svc := &OpsService{
		settingRepo: repo,
		cfg: &config.Config{
			Log: config.LogConfig{
				Level:           "info",
				Caller:          true,
				StacktraceLevel: "error",
				Sampling: config.LogSamplingConfig{
					Enabled:    false,
					Initial:    100,
					Thereafter: 100,
				},
			},
		},
	}
	if err := logger.Init(logger.InitOptions{
		Level:       "info",
		Format:      "json",
		ServiceName: "sub2api",
		Environment: "test",
		Output: logger.OutputOptions{
			ToStdout: true,
			ToFile:   false,
		},
	}); err != nil {
		t.Fatalf("init logger: %v", err)
	}
	_, err := svc.UpdateRuntimeLogConfig(context.Background(), &OpsRuntimeLogConfig{
		Level:           "debug",
		EnableSampling:  false,
		SamplingInitial: 100,
		SamplingNext:    100,
		Caller:          true,
		StacktraceLevel: "error",
		RetentionDays:   30,
	}, 5)
	if err == nil {
		t.Fatalf("expected persist error")
	}
	// Persist failure should rollback runtime level back to old effective level.
	if logger.CurrentLevel() != "info" {
		t.Fatalf("logger level should rollback to info, got %s", logger.CurrentLevel())
	}
}
// TestApplyRuntimeLogConfigOnStartup verifies that a persisted runtime config
// (level debug) is applied to the live logger during startup, overriding the
// baseline level from static config (info).
func TestApplyRuntimeLogConfigOnStartup(t *testing.T) {
	repo := newRuntimeSettingRepoStub()
	cfgRaw := `{"level":"debug","enable_sampling":false,"sampling_initial":100,"sampling_thereafter":100,"caller":true,"stacktrace_level":"error","retention_days":30}`
	repo.values[SettingKeyOpsRuntimeLogConfig] = cfgRaw
	svc := &OpsService{
		settingRepo: repo,
		cfg: &config.Config{
			Log: config.LogConfig{
				Level:           "info",
				Caller:          true,
				StacktraceLevel: "error",
				Sampling: config.LogSamplingConfig{
					Enabled:    false,
					Initial:    100,
					Thereafter: 100,
				},
			},
		},
	}
	if err := logger.Init(logger.InitOptions{
		Level:       "info",
		Format:      "json",
		ServiceName: "sub2api",
		Environment: "test",
		Output: logger.OutputOptions{
			ToStdout: true,
			ToFile:   false,
		},
	}); err != nil {
		t.Fatalf("init logger: %v", err)
	}
	svc.applyRuntimeLogConfigOnStartup(context.Background())
	if logger.CurrentLevel() != "debug" {
		t.Fatalf("expected startup apply debug, got %s", logger.CurrentLevel())
	}
}
// TestDefaultNormalizeAndValidateRuntimeLogConfig covers the three pure
// helpers: defaults are lowercased and pull retention from Ops cleanup config;
// normalization fills blank/non-positive fields from defaults; the normalized
// result passes validation.
func TestDefaultNormalizeAndValidateRuntimeLogConfig(t *testing.T) {
	defaults := defaultOpsRuntimeLogConfig(&config.Config{
		Log: config.LogConfig{
			Level:           "DEBUG",
			Caller:          false,
			StacktraceLevel: "FATAL",
			Sampling: config.LogSamplingConfig{
				Enabled:    true,
				Initial:    50,
				Thereafter: 20,
			},
		},
		Ops: config.OpsConfig{
			Cleanup: config.OpsCleanupConfig{
				ErrorLogRetentionDays: 7,
			},
		},
	})
	if defaults.Level != "debug" || defaults.StacktraceLevel != "fatal" || defaults.RetentionDays != 7 {
		t.Fatalf("unexpected defaults: %+v", defaults)
	}
	// Blank strings and non-positive numbers should be replaced by defaults.
	cfg := &OpsRuntimeLogConfig{
		Level:           " ",
		EnableSampling:  true,
		SamplingInitial: 0,
		SamplingNext:    -1,
		Caller:          true,
		StacktraceLevel: "",
		RetentionDays:   0,
	}
	normalizeOpsRuntimeLogConfig(cfg, defaults)
	if cfg.Level != "debug" || cfg.StacktraceLevel != "fatal" {
		t.Fatalf("normalize level/stacktrace failed: %+v", cfg)
	}
	if cfg.SamplingInitial != 50 || cfg.SamplingNext != 20 || cfg.RetentionDays != 7 {
		t.Fatalf("normalize numeric defaults failed: %+v", cfg)
	}
	if err := validateOpsRuntimeLogConfig(cfg); err != nil {
		t.Fatalf("validate normalized config should pass: %v", err)
	}
}
// TestValidateRuntimeLogConfigErrors exercises every invalid-field branch of
// validateOpsRuntimeLogConfig via table-driven subtests; each case must fail.
func TestValidateRuntimeLogConfigErrors(t *testing.T) {
	type testCase struct {
		name string
		cfg  *OpsRuntimeLogConfig
	}
	cases := []testCase{
		{name: "nil", cfg: nil},
		{name: "bad level", cfg: &OpsRuntimeLogConfig{Level: "trace", StacktraceLevel: "error", SamplingInitial: 1, SamplingNext: 1, RetentionDays: 1}},
		{name: "bad stack", cfg: &OpsRuntimeLogConfig{Level: "info", StacktraceLevel: "warn", SamplingInitial: 1, SamplingNext: 1, RetentionDays: 1}},
		{name: "bad initial", cfg: &OpsRuntimeLogConfig{Level: "info", StacktraceLevel: "error", SamplingInitial: 0, SamplingNext: 1, RetentionDays: 1}},
		{name: "bad next", cfg: &OpsRuntimeLogConfig{Level: "info", StacktraceLevel: "error", SamplingInitial: 1, SamplingNext: 0, RetentionDays: 1}},
		{name: "bad retention", cfg: &OpsRuntimeLogConfig{Level: "info", StacktraceLevel: "error", SamplingInitial: 1, SamplingNext: 1, RetentionDays: 0}},
	}
	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			err := validateOpsRuntimeLogConfig(tt.cfg)
			if err == nil {
				t.Fatalf("expected validation error")
			}
		})
	}
}
// TestGetRuntimeLogConfigFallbackAndErrors covers two edge cases: a nil
// receiver falls back to the built-in default ("info") without error, and a
// repository read failure (other than not-found) surfaces as an error.
func TestGetRuntimeLogConfigFallbackAndErrors(t *testing.T) {
	var nilSvc *OpsService
	cfg, err := nilSvc.GetRuntimeLogConfig(context.Background())
	if err != nil {
		t.Fatalf("nil svc should fallback default: %v", err)
	}
	if cfg.Level != "info" {
		t.Fatalf("unexpected nil svc default level: %s", cfg.Level)
	}
	repo := newRuntimeSettingRepoStub()
	// Force an unconditional read failure from the setting repository.
	repo.getValueFn = func(key string) (string, error) {
		return "", errors.New("boom")
	}
	svc := &OpsService{
		settingRepo: repo,
		cfg: &config.Config{
			Log: config.LogConfig{
				Level:           "warn",
				Caller:          true,
				StacktraceLevel: "error",
				Sampling: config.LogSamplingConfig{
					Enabled:    false,
					Initial:    100,
					Thereafter: 100,
				},
			},
		},
	}
	if _, err := svc.GetRuntimeLogConfig(context.Background()); err == nil {
		t.Fatalf("expected get value error")
	}
}
// TestUpdateRuntimeLogConfig_PreconditionErrors verifies the three guard
// clauses: missing setting repo, nil config, and non-positive operator id.
func TestUpdateRuntimeLogConfig_PreconditionErrors(t *testing.T) {
	svc := &OpsService{}
	if _, err := svc.UpdateRuntimeLogConfig(context.Background(), &OpsRuntimeLogConfig{}, 1); err == nil {
		t.Fatalf("expected setting repo not initialized")
	}
	svc = &OpsService{settingRepo: newRuntimeSettingRepoStub()}
	if _, err := svc.UpdateRuntimeLogConfig(context.Background(), nil, 1); err == nil {
		t.Fatalf("expected invalid config")
	}
	if _, err := svc.UpdateRuntimeLogConfig(context.Background(), &OpsRuntimeLogConfig{
		Level:           "info",
		StacktraceLevel: "error",
		SamplingInitial: 1,
		SamplingNext:    1,
		RetentionDays:   1,
	}, 0); err == nil {
		t.Fatalf("expected invalid operator")
	}
}
// TestUpdateRuntimeLogConfig_Success verifies the happy path: the new config
// is persisted with source/operator/timestamp metadata and applied to the live
// logger (level switches from info to debug).
func TestUpdateRuntimeLogConfig_Success(t *testing.T) {
	repo := newRuntimeSettingRepoStub()
	svc := &OpsService{
		settingRepo: repo,
		cfg: &config.Config{
			Log: config.LogConfig{
				Level:           "info",
				Caller:          true,
				StacktraceLevel: "error",
				Sampling: config.LogSamplingConfig{
					Enabled:    false,
					Initial:    100,
					Thereafter: 100,
				},
			},
		},
	}
	if err := logger.Init(logger.InitOptions{
		Level:       "info",
		Format:      "json",
		ServiceName: "sub2api",
		Environment: "test",
		Output: logger.OutputOptions{
			ToStdout: true,
			ToFile:   false,
		},
	}); err != nil {
		t.Fatalf("init logger: %v", err)
	}
	next, err := svc.UpdateRuntimeLogConfig(context.Background(), &OpsRuntimeLogConfig{
		Level:           "debug",
		EnableSampling:  false,
		SamplingInitial: 100,
		SamplingNext:    100,
		Caller:          true,
		StacktraceLevel: "error",
		RetentionDays:   30,
	}, 2)
	if err != nil {
		t.Fatalf("UpdateRuntimeLogConfig() error: %v", err)
	}
	if next.Source != "runtime_setting" || next.UpdatedByUserID != 2 || next.UpdatedAt == "" {
		t.Fatalf("unexpected metadata: %+v", next)
	}
	if logger.CurrentLevel() != "debug" {
		t.Fatalf("expected applied level debug, got %s", logger.CurrentLevel())
	}
}
// TestResetRuntimeLogConfig_IgnoreNotFoundDelete verifies that resetting when
// no runtime override is persisted (delete returns ErrSettingNotFound) is
// treated as success rather than an error.
func TestResetRuntimeLogConfig_IgnoreNotFoundDelete(t *testing.T) {
	repo := newRuntimeSettingRepoStub()
	repo.deleteFn = func(key string) error { return ErrSettingNotFound }
	svc := &OpsService{
		settingRepo: repo,
		cfg: &config.Config{
			Log: config.LogConfig{
				Level:           "info",
				Caller:          true,
				StacktraceLevel: "error",
				Sampling: config.LogSamplingConfig{
					Enabled:    false,
					Initial:    100,
					Thereafter: 100,
				},
			},
		},
	}
	if _, err := svc.ResetRuntimeLogConfig(context.Background(), 1); err != nil {
		t.Fatalf("reset should ignore ErrSettingNotFound: %v", err)
	}
}
// TestApplyRuntimeLogConfigHelpers exercises nil-safety of the helpers:
// applying a nil config must error, and normalize/startup-apply must tolerate
// nil arguments and a nil receiver without panicking.
func TestApplyRuntimeLogConfigHelpers(t *testing.T) {
	err := applyOpsRuntimeLogConfig(nil)
	if err == nil {
		t.Fatalf("expected nil config error")
	}
	// These calls only need to not panic.
	normalizeOpsRuntimeLogConfig(nil, &OpsRuntimeLogConfig{Level: "info"})
	normalizeOpsRuntimeLogConfig(&OpsRuntimeLogConfig{Level: "debug"}, nil)
	var nilSvc *OpsService
	nilSvc.applyRuntimeLogConfigOnStartup(context.Background())
}
......@@ -2,6 +2,21 @@ package service
import "time"
// OpsSystemLog is one system-log row as exposed to API consumers.
type OpsSystemLog struct {
	ID              int64          `json:"id"`
	CreatedAt       time.Time      `json:"created_at"`
	Level           string         `json:"level"`
	Component       string         `json:"component"`
	Message         string         `json:"message"`
	RequestID       string         `json:"request_id"`
	ClientRequestID string         `json:"client_request_id"`
	UserID          *int64         `json:"user_id"`    // nil when the log is not tied to a user
	AccountID       *int64         `json:"account_id"` // nil when the log is not tied to an account
	Platform        string         `json:"platform"`
	Model           string         `json:"model"`
	Extra           map[string]any `json:"extra,omitempty"` // free-form structured payload
}
type OpsErrorLog struct {
ID int64 `json:"id"`
CreatedAt time.Time `json:"created_at"`
......
......@@ -10,6 +10,10 @@ type OpsRepository interface {
ListErrorLogs(ctx context.Context, filter *OpsErrorLogFilter) (*OpsErrorLogList, error)
GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLogDetail, error)
ListRequestDetails(ctx context.Context, filter *OpsRequestDetailFilter) ([]*OpsRequestDetail, int64, error)
BatchInsertSystemLogs(ctx context.Context, inputs []*OpsInsertSystemLogInput) (int64, error)
ListSystemLogs(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error)
DeleteSystemLogs(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error)
InsertSystemLogCleanupAudit(ctx context.Context, input *OpsSystemLogCleanupAudit) error
InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error)
UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error
......@@ -205,6 +209,69 @@ type OpsInsertSystemMetricsInput struct {
ConcurrencyQueueDepth *int
}
// OpsInsertSystemLogInput carries one system-log row for batch insertion.
type OpsInsertSystemLogInput struct {
	CreatedAt       time.Time
	Level           string
	Component       string
	Message         string
	RequestID       string
	ClientRequestID string
	UserID          *int64
	AccountID       *int64
	Platform        string
	Model           string
	ExtraJSON       string // pre-serialized JSON for the extra payload column
}

// OpsSystemLogFilter selects system logs for listing; zero values mean
// "no constraint". Page/PageSize are normalized by the service layer.
type OpsSystemLogFilter struct {
	StartTime       *time.Time
	EndTime         *time.Time
	Level           string
	Component       string
	RequestID       string
	ClientRequestID string
	UserID          *int64
	AccountID       *int64
	Platform        string
	Model           string
	Query           string // free-text search term
	Page            int
	PageSize        int
}

// OpsSystemLogCleanupFilter selects system logs for deletion; it mirrors
// OpsSystemLogFilter without pagination.
type OpsSystemLogCleanupFilter struct {
	StartTime       *time.Time
	EndTime         *time.Time
	Level           string
	Component       string
	RequestID       string
	ClientRequestID string
	UserID          *int64
	AccountID       *int64
	Platform        string
	Model           string
	Query           string
}

// OpsSystemLogList is one page of system logs plus pagination metadata.
type OpsSystemLogList struct {
	Logs     []*OpsSystemLog `json:"logs"`
	Total    int             `json:"total"`
	Page     int             `json:"page"`
	PageSize int             `json:"page_size"`
}

// OpsSystemLogCleanupAudit records who deleted system logs, with which
// conditions (serialized JSON), and how many rows were removed.
type OpsSystemLogCleanupAudit struct {
	CreatedAt   time.Time
	OperatorID  int64
	Conditions  string // JSON produced by marshalSystemLogCleanupConditions
	DeletedRows int64
}
type OpsSystemMetricsSnapshot struct {
ID int64 `json:"id"`
CreatedAt time.Time `json:"created_at"`
......
package service
import (
"context"
"time"
)
// opsRepoMock is a test-only OpsRepository implementation with optional function hooks.
type opsRepoMock struct {
BatchInsertSystemLogsFn func(ctx context.Context, inputs []*OpsInsertSystemLogInput) (int64, error)
ListSystemLogsFn func(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error)
DeleteSystemLogsFn func(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error)
InsertSystemLogCleanupAuditFn func(ctx context.Context, input *OpsSystemLogCleanupAudit) error
}
// Static no-op implementations for the error-log portion of OpsRepository.

func (m *opsRepoMock) InsertErrorLog(ctx context.Context, input *OpsInsertErrorLogInput) (int64, error) {
	return 0, nil
}

func (m *opsRepoMock) ListErrorLogs(ctx context.Context, filter *OpsErrorLogFilter) (*OpsErrorLogList, error) {
	return &OpsErrorLogList{Errors: []*OpsErrorLog{}, Page: 1, PageSize: 20}, nil
}

func (m *opsRepoMock) GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLogDetail, error) {
	return &OpsErrorLogDetail{}, nil
}

func (m *opsRepoMock) ListRequestDetails(ctx context.Context, filter *OpsRequestDetailFilter) ([]*OpsRequestDetail, int64, error) {
	return []*OpsRequestDetail{}, 0, nil
}
// System-log methods delegate to the corresponding hook when installed;
// otherwise they return simple deterministic defaults.

// BatchInsertSystemLogs defaults to reporting all inputs as inserted.
func (m *opsRepoMock) BatchInsertSystemLogs(ctx context.Context, inputs []*OpsInsertSystemLogInput) (int64, error) {
	if m.BatchInsertSystemLogsFn != nil {
		return m.BatchInsertSystemLogsFn(ctx, inputs)
	}
	return int64(len(inputs)), nil
}

// ListSystemLogs defaults to an empty first page of size 50.
func (m *opsRepoMock) ListSystemLogs(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error) {
	if m.ListSystemLogsFn != nil {
		return m.ListSystemLogsFn(ctx, filter)
	}
	return &OpsSystemLogList{Logs: []*OpsSystemLog{}, Total: 0, Page: 1, PageSize: 50}, nil
}

// DeleteSystemLogs defaults to deleting zero rows.
func (m *opsRepoMock) DeleteSystemLogs(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error) {
	if m.DeleteSystemLogsFn != nil {
		return m.DeleteSystemLogsFn(ctx, filter)
	}
	return 0, nil
}

// InsertSystemLogCleanupAudit defaults to success.
func (m *opsRepoMock) InsertSystemLogCleanupAudit(ctx context.Context, input *OpsSystemLogCleanupAudit) error {
	if m.InsertSystemLogCleanupAuditFn != nil {
		return m.InsertSystemLogCleanupAuditFn(ctx, input)
	}
	return nil
}
// Remaining OpsRepository methods: static zero-value stubs with no hooks.
// They exist only to satisfy the interface for system-log tests.

func (m *opsRepoMock) InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error) {
	return 0, nil
}

func (m *opsRepoMock) UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error {
	return nil
}

func (m *opsRepoMock) GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error) {
	return nil, nil
}

func (m *opsRepoMock) ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*OpsRetryAttempt, error) {
	return []*OpsRetryAttempt{}, nil
}

func (m *opsRepoMock) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error {
	return nil
}

func (m *opsRepoMock) GetWindowStats(ctx context.Context, filter *OpsDashboardFilter) (*OpsWindowStats, error) {
	return &OpsWindowStats{}, nil
}

func (m *opsRepoMock) GetRealtimeTrafficSummary(ctx context.Context, filter *OpsDashboardFilter) (*OpsRealtimeTrafficSummary, error) {
	return &OpsRealtimeTrafficSummary{}, nil
}

func (m *opsRepoMock) GetDashboardOverview(ctx context.Context, filter *OpsDashboardFilter) (*OpsDashboardOverview, error) {
	return &OpsDashboardOverview{}, nil
}

func (m *opsRepoMock) GetThroughputTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsThroughputTrendResponse, error) {
	return &OpsThroughputTrendResponse{}, nil
}

func (m *opsRepoMock) GetLatencyHistogram(ctx context.Context, filter *OpsDashboardFilter) (*OpsLatencyHistogramResponse, error) {
	return &OpsLatencyHistogramResponse{}, nil
}

func (m *opsRepoMock) GetErrorTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsErrorTrendResponse, error) {
	return &OpsErrorTrendResponse{}, nil
}

func (m *opsRepoMock) GetErrorDistribution(ctx context.Context, filter *OpsDashboardFilter) (*OpsErrorDistributionResponse, error) {
	return &OpsErrorDistributionResponse{}, nil
}

func (m *opsRepoMock) GetOpenAITokenStats(ctx context.Context, filter *OpsOpenAITokenStatsFilter) (*OpsOpenAITokenStatsResponse, error) {
	return &OpsOpenAITokenStatsResponse{}, nil
}

func (m *opsRepoMock) InsertSystemMetrics(ctx context.Context, input *OpsInsertSystemMetricsInput) error {
	return nil
}

func (m *opsRepoMock) GetLatestSystemMetrics(ctx context.Context, windowMinutes int) (*OpsSystemMetricsSnapshot, error) {
	return &OpsSystemMetricsSnapshot{}, nil
}

func (m *opsRepoMock) UpsertJobHeartbeat(ctx context.Context, input *OpsUpsertJobHeartbeatInput) error {
	return nil
}

func (m *opsRepoMock) ListJobHeartbeats(ctx context.Context) ([]*OpsJobHeartbeat, error) {
	return []*OpsJobHeartbeat{}, nil
}

func (m *opsRepoMock) ListAlertRules(ctx context.Context) ([]*OpsAlertRule, error) {
	return []*OpsAlertRule{}, nil
}

func (m *opsRepoMock) CreateAlertRule(ctx context.Context, input *OpsAlertRule) (*OpsAlertRule, error) {
	return input, nil
}

func (m *opsRepoMock) UpdateAlertRule(ctx context.Context, input *OpsAlertRule) (*OpsAlertRule, error) {
	return input, nil
}

func (m *opsRepoMock) DeleteAlertRule(ctx context.Context, id int64) error {
	return nil
}

func (m *opsRepoMock) ListAlertEvents(ctx context.Context, filter *OpsAlertEventFilter) ([]*OpsAlertEvent, error) {
	return []*OpsAlertEvent{}, nil
}

func (m *opsRepoMock) GetAlertEventByID(ctx context.Context, eventID int64) (*OpsAlertEvent, error) {
	return &OpsAlertEvent{}, nil
}

func (m *opsRepoMock) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
	return nil, nil
}

func (m *opsRepoMock) GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
	return nil, nil
}

func (m *opsRepoMock) CreateAlertEvent(ctx context.Context, event *OpsAlertEvent) (*OpsAlertEvent, error) {
	return event, nil
}

func (m *opsRepoMock) UpdateAlertEventStatus(ctx context.Context, eventID int64, status string, resolvedAt *time.Time) error {
	return nil
}

func (m *opsRepoMock) UpdateAlertEventEmailSent(ctx context.Context, eventID int64, emailSent bool) error {
	return nil
}

func (m *opsRepoMock) CreateAlertSilence(ctx context.Context, input *OpsAlertSilence) (*OpsAlertSilence, error) {
	return input, nil
}

func (m *opsRepoMock) IsAlertSilenced(ctx context.Context, ruleID int64, platform string, groupID *int64, region *string, now time.Time) (bool, error) {
	return false, nil
}

func (m *opsRepoMock) UpsertHourlyMetrics(ctx context.Context, startTime, endTime time.Time) error {
	return nil
}

func (m *opsRepoMock) UpsertDailyMetrics(ctx context.Context, startTime, endTime time.Time) error {
	return nil
}

func (m *opsRepoMock) GetLatestHourlyBucketStart(ctx context.Context) (time.Time, bool, error) {
	return time.Time{}, false, nil
}

func (m *opsRepoMock) GetLatestDailyBucketDate(ctx context.Context) (time.Time, bool, error) {
	return time.Time{}, false, nil
}

// Compile-time check that the mock satisfies the full interface.
var _ OpsRepository = (*opsRepoMock)(nil)
......@@ -37,6 +37,7 @@ type OpsService struct {
openAIGatewayService *OpenAIGatewayService
geminiCompatService *GeminiMessagesCompatService
antigravityGatewayService *AntigravityGatewayService
systemLogSink *OpsSystemLogSink
}
func NewOpsService(
......@@ -50,8 +51,9 @@ func NewOpsService(
openAIGatewayService *OpenAIGatewayService,
geminiCompatService *GeminiMessagesCompatService,
antigravityGatewayService *AntigravityGatewayService,
systemLogSink *OpsSystemLogSink,
) *OpsService {
return &OpsService{
svc := &OpsService{
opsRepo: opsRepo,
settingRepo: settingRepo,
cfg: cfg,
......@@ -64,7 +66,10 @@ func NewOpsService(
openAIGatewayService: openAIGatewayService,
geminiCompatService: geminiCompatService,
antigravityGatewayService: antigravityGatewayService,
systemLogSink: systemLogSink,
}
svc.applyRuntimeLogConfigOnStartup(context.Background())
return svc
}
func (s *OpsService) RequireMonitoringEnabled(ctx context.Context) error {
......
......@@ -68,6 +68,20 @@ type OpsMetricThresholds struct {
UpstreamErrorRatePercentMax *float64 `json:"upstream_error_rate_percent_max,omitempty"` // 上游错误率高于此值变红
}
// OpsRuntimeLogConfig is the runtime-adjustable logging configuration that can
// be persisted as a setting and applied to the live logger without a restart.
type OpsRuntimeLogConfig struct {
	Level           string         `json:"level"`
	EnableSampling  bool           `json:"enable_sampling"`
	SamplingInitial int            `json:"sampling_initial"`
	SamplingNext    int            `json:"sampling_thereafter"` // note: JSON name differs from the field name
	Caller          bool           `json:"caller"`
	StacktraceLevel string         `json:"stacktrace_level"`
	RetentionDays   int            `json:"retention_days"`
	Source          string         `json:"source,omitempty"`             // e.g. "baseline" or "runtime_setting"
	UpdatedAt       string         `json:"updated_at,omitempty"`         // set when persisted via update
	UpdatedByUserID int64          `json:"updated_by_user_id,omitempty"` // operator who last updated
	Extra           map[string]any `json:"extra,omitempty"`
}
type OpsAlertRuntimeSettings struct {
EvaluationIntervalSeconds int `json:"evaluation_interval_seconds"`
......
package service
import (
"context"
"database/sql"
"encoding/json"
"errors"
"log"
"strings"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
// ListSystemLogs returns one page of system logs after normalizing the filter:
// page defaults to 1, page size defaults to 50 and is capped at 200. When no
// repository is wired, an empty first page is returned instead of an error.
func (s *OpsService) ListSystemLogs(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error) {
	if err := s.RequireMonitoringEnabled(ctx); err != nil {
		return nil, err
	}
	if s.opsRepo == nil {
		empty := &OpsSystemLogList{Logs: []*OpsSystemLog{}, Total: 0, Page: 1, PageSize: 50}
		return empty, nil
	}
	if filter == nil {
		filter = &OpsSystemLogFilter{}
	}
	if filter.Page <= 0 {
		filter.Page = 1
	}
	switch {
	case filter.PageSize <= 0:
		filter.PageSize = 50
	case filter.PageSize > 200:
		filter.PageSize = 200
	}
	result, listErr := s.opsRepo.ListSystemLogs(ctx, filter)
	if listErr != nil {
		return nil, infraerrors.InternalServer("OPS_SYSTEM_LOG_LIST_FAILED", "Failed to list system logs").WithCause(listErr)
	}
	return result, nil
}
// CleanupSystemLogs deletes system logs matching filter and records a cleanup
// audit entry. It requires monitoring to be enabled, a wired repository, and a
// positive operator id; a start time after the end time is rejected.
func (s *OpsService) CleanupSystemLogs(ctx context.Context, filter *OpsSystemLogCleanupFilter, operatorID int64) (int64, error) {
	if err := s.RequireMonitoringEnabled(ctx); err != nil {
		return 0, err
	}
	if s.opsRepo == nil {
		return 0, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
	}
	if operatorID <= 0 {
		return 0, infraerrors.BadRequest("OPS_SYSTEM_LOG_CLEANUP_INVALID_OPERATOR", "invalid operator")
	}
	if filter == nil {
		filter = &OpsSystemLogCleanupFilter{}
	}
	if filter.EndTime != nil && filter.StartTime != nil && filter.StartTime.After(*filter.EndTime) {
		return 0, infraerrors.BadRequest("OPS_SYSTEM_LOG_CLEANUP_INVALID_RANGE", "invalid time range")
	}
	deletedRows, err := s.opsRepo.DeleteSystemLogs(ctx, filter)
	if err != nil {
		// Nothing matched: treat as a successful no-op cleanup.
		if errors.Is(err, sql.ErrNoRows) {
			return 0, nil
		}
		// NOTE(review): matching on error text is fragile — assumes the
		// repository keeps the wording "requires at least one filter".
		if strings.Contains(strings.ToLower(err.Error()), "requires at least one filter") {
			return 0, infraerrors.BadRequest("OPS_SYSTEM_LOG_CLEANUP_FILTER_REQUIRED", "cleanup requires at least one filter condition")
		}
		return 0, infraerrors.InternalServer("OPS_SYSTEM_LOG_CLEANUP_FAILED", "Failed to cleanup system logs").WithCause(err)
	}
	if auditErr := s.opsRepo.InsertSystemLogCleanupAudit(ctx, &OpsSystemLogCleanupAudit{
		CreatedAt:   time.Now().UTC(),
		OperatorID:  operatorID,
		Conditions:  marshalSystemLogCleanupConditions(filter),
		DeletedRows: deletedRows,
	}); auditErr != nil {
		// An audit failure must not block the main flow, so ops cleanup
		// is never held hostage by the audit table.
		log.Printf("[OpsSystemLog] cleanup audit failed: %v", auditErr)
	}
	return deletedRows, nil
}
// marshalSystemLogCleanupConditions serializes the cleanup filter into a JSON
// string for the audit record. A nil filter or a marshal failure yields "{}".
// String fields are always emitted (trimmed); pointer fields only when set.
func marshalSystemLogCleanupConditions(filter *OpsSystemLogCleanupFilter) string {
	if filter == nil {
		return "{}"
	}
	conditions := make(map[string]any, 11)
	conditions["level"] = strings.TrimSpace(filter.Level)
	conditions["component"] = strings.TrimSpace(filter.Component)
	conditions["request_id"] = strings.TrimSpace(filter.RequestID)
	conditions["client_request_id"] = strings.TrimSpace(filter.ClientRequestID)
	conditions["platform"] = strings.TrimSpace(filter.Platform)
	conditions["model"] = strings.TrimSpace(filter.Model)
	conditions["query"] = strings.TrimSpace(filter.Query)
	if id := filter.UserID; id != nil {
		conditions["user_id"] = *id
	}
	if id := filter.AccountID; id != nil {
		conditions["account_id"] = *id
	}
	if ts := filter.StartTime; ts != nil && !ts.IsZero() {
		conditions["start_time"] = ts.UTC().Format(time.RFC3339Nano)
	}
	if ts := filter.EndTime; ts != nil && !ts.IsZero() {
		conditions["end_time"] = ts.UTC().Format(time.RFC3339Nano)
	}
	encoded, marshalErr := json.Marshal(conditions)
	if marshalErr != nil {
		return "{}"
	}
	return string(encoded)
}
// GetSystemLogSinkHealth reports the health of the system-log sink.
// A nil receiver or an unwired sink yields the zero-value health struct.
func (s *OpsService) GetSystemLogSinkHealth() OpsSystemLogSinkHealth {
	if s != nil && s.systemLogSink != nil {
		return s.systemLogSink.Health()
	}
	return OpsSystemLogSinkHealth{}
}
package service
import (
"context"
"database/sql"
"errors"
"strings"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
)
// TestOpsServiceListSystemLogs_DefaultClampAndSuccess verifies that a
// non-positive page is normalized to 1 and an oversized page size is clamped
// to 200 before the repository is called, and that results pass through.
func TestOpsServiceListSystemLogs_DefaultClampAndSuccess(t *testing.T) {
	var gotFilter *OpsSystemLogFilter
	repo := &opsRepoMock{
		ListSystemLogsFn: func(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error) {
			gotFilter = filter // capture the normalized filter for assertions
			return &OpsSystemLogList{
				Logs:     []*OpsSystemLog{{ID: 1, Level: "warn", Message: "x"}},
				Total:    1,
				Page:     filter.Page,
				PageSize: filter.PageSize,
			}, nil
		},
	}
	svc := NewOpsService(repo, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
	out, err := svc.ListSystemLogs(context.Background(), &OpsSystemLogFilter{
		Page:     0,
		PageSize: 999,
	})
	if err != nil {
		t.Fatalf("ListSystemLogs() error: %v", err)
	}
	if gotFilter == nil {
		t.Fatalf("expected repository to receive filter")
	}
	if gotFilter.Page != 1 || gotFilter.PageSize != 200 {
		t.Fatalf("filter normalized unexpectedly: page=%d pageSize=%d", gotFilter.Page, gotFilter.PageSize)
	}
	if out.Total != 1 || len(out.Logs) != 1 {
		t.Fatalf("unexpected result: %+v", out)
	}
}
func TestOpsServiceListSystemLogs_MonitoringDisabled(t *testing.T) {
svc := NewOpsService(
&opsRepoMock{},
nil,
&config.Config{Ops: config.OpsConfig{Enabled: false}},
nil, nil, nil, nil, nil, nil, nil, nil,
)
_, err := svc.ListSystemLogs(context.Background(), &OpsSystemLogFilter{})
if err == nil {
t.Fatalf("expected disabled error")
}
}
// TestOpsServiceListSystemLogs_NilRepoReturnsEmpty verifies that a service
// without a repository serves an empty default page rather than an error.
func TestOpsServiceListSystemLogs_NilRepoReturnsEmpty(t *testing.T) {
	svc := NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
	out, err := svc.ListSystemLogs(context.Background(), nil)
	if err != nil {
		t.Fatalf("ListSystemLogs() error: %v", err)
	}
	switch {
	case out == nil, out.Page != 1, out.PageSize != 50, out.Total != 0, len(out.Logs) != 0:
		t.Fatalf("unexpected nil-repo result: %+v", out)
	}
}
// TestOpsServiceListSystemLogs_RepoErrorMapped verifies that a repository
// failure is wrapped into the OPS_SYSTEM_LOG_LIST_FAILED internal error.
func TestOpsServiceListSystemLogs_RepoErrorMapped(t *testing.T) {
	repo := &opsRepoMock{
		ListSystemLogsFn: func(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error) {
			return nil, errors.New("db down")
		},
	}
	svc := NewOpsService(repo, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
	_, err := svc.ListSystemLogs(context.Background(), &OpsSystemLogFilter{})
	switch {
	case err == nil:
		t.Fatalf("expected mapped internal error")
	case !strings.Contains(err.Error(), "OPS_SYSTEM_LOG_LIST_FAILED"):
		t.Fatalf("unexpected error: %v", err)
	}
}
// TestOpsServiceCleanupSystemLogs_SuccessAndAudit verifies the happy path:
// the deleted-row count is returned and a cleanup audit entry is written whose
// serialized conditions include the string and pointer filter fields.
func TestOpsServiceCleanupSystemLogs_SuccessAndAudit(t *testing.T) {
	var audit *OpsSystemLogCleanupAudit
	repo := &opsRepoMock{
		DeleteSystemLogsFn: func(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error) {
			return 3, nil
		},
		InsertSystemLogCleanupAuditFn: func(ctx context.Context, input *OpsSystemLogCleanupAudit) error {
			audit = input // capture for assertions below
			return nil
		},
	}
	svc := NewOpsService(repo, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
	userID := int64(7)
	now := time.Now().UTC()
	filter := &OpsSystemLogCleanupFilter{
		StartTime:       &now,
		Level:           "warn",
		RequestID:       "req-1",
		ClientRequestID: "creq-1",
		UserID:          &userID,
		Query:           "timeout",
	}
	deleted, err := svc.CleanupSystemLogs(context.Background(), filter, 99)
	if err != nil {
		t.Fatalf("CleanupSystemLogs() error: %v", err)
	}
	if deleted != 3 {
		t.Fatalf("deleted=%d, want 3", deleted)
	}
	if audit == nil {
		t.Fatalf("expected cleanup audit")
	}
	if !strings.Contains(audit.Conditions, `"client_request_id":"creq-1"`) {
		t.Fatalf("audit conditions should include client_request_id: %s", audit.Conditions)
	}
	if !strings.Contains(audit.Conditions, `"user_id":7`) {
		t.Fatalf("audit conditions should include user_id: %s", audit.Conditions)
	}
}
func TestOpsServiceCleanupSystemLogs_RepoUnavailableAndInvalidOperator(t *testing.T) {
svc := NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
if _, err := svc.CleanupSystemLogs(context.Background(), &OpsSystemLogCleanupFilter{RequestID: "r"}, 1); err == nil {
t.Fatalf("expected repo unavailable error")
}
svc = NewOpsService(&opsRepoMock{}, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
if _, err := svc.CleanupSystemLogs(context.Background(), &OpsSystemLogCleanupFilter{RequestID: "r"}, 0); err == nil {
t.Fatalf("expected invalid operator error")
}
}
// TestOpsServiceCleanupSystemLogs_FilterRequired verifies that an empty
// filter is rejected (the repository demands at least one condition) and
// that the surfaced error mentions the filter requirement.
func TestOpsServiceCleanupSystemLogs_FilterRequired(t *testing.T) {
	repo := &opsRepoMock{
		DeleteSystemLogsFn: func(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error) {
			return 0, errors.New("cleanup requires at least one filter condition")
		},
	}
	svc := NewOpsService(repo, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
	_, err := svc.CleanupSystemLogs(context.Background(), &OpsSystemLogCleanupFilter{}, 1)
	if err == nil {
		t.Fatal("expected filter required error")
	}
	if msg := strings.ToLower(err.Error()); !strings.Contains(msg, "filter") {
		t.Fatalf("unexpected error: %v", err)
	}
}
func TestOpsServiceCleanupSystemLogs_InvalidRange(t *testing.T) {
repo := &opsRepoMock{}
svc := NewOpsService(repo, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
start := time.Now().UTC()
end := start.Add(-time.Hour)
_, err := svc.CleanupSystemLogs(context.Background(), &OpsSystemLogCleanupFilter{
StartTime: &start,
EndTime: &end,
}, 1)
if err == nil {
t.Fatalf("expected invalid range error")
}
}
// TestOpsServiceCleanupSystemLogs_NoRowsAndInternalError verifies the two
// repository-error paths: sql.ErrNoRows is treated as "nothing to delete"
// (zero count, no error), while any other failure is surfaced as an error.
func TestOpsServiceCleanupSystemLogs_NoRowsAndInternalError(t *testing.T) {
	repo := &opsRepoMock{
		DeleteSystemLogsFn: func(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error) {
			return 0, sql.ErrNoRows
		},
	}
	svc := NewOpsService(repo, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
	deleted, err := svc.CleanupSystemLogs(context.Background(), &OpsSystemLogCleanupFilter{
		RequestID: "req-1",
	}, 1)
	if err != nil || deleted != 0 {
		t.Fatalf("expected no rows shortcut, deleted=%d err=%v", deleted, err)
	}
	// Swap the mock to a generic failure; this must be mapped to an error.
	repo.DeleteSystemLogsFn = func(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error) {
		return 0, errors.New("boom")
	}
	if _, err := svc.CleanupSystemLogs(context.Background(), &OpsSystemLogCleanupFilter{
		RequestID: "req-1",
	}, 1); err == nil {
		t.Fatalf("expected internal cleanup error")
	}
}
// TestOpsServiceCleanupSystemLogs_AuditFailureIgnored verifies that a failure
// to persist the cleanup audit record is swallowed: the deletion still
// succeeds and its row count is returned to the caller.
func TestOpsServiceCleanupSystemLogs_AuditFailureIgnored(t *testing.T) {
	repo := &opsRepoMock{
		DeleteSystemLogsFn: func(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error) {
			return 5, nil
		},
		InsertSystemLogCleanupAuditFn: func(ctx context.Context, input *OpsSystemLogCleanupAudit) error {
			return errors.New("audit down")
		},
	}
	svc := NewOpsService(repo, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
	filter := &OpsSystemLogCleanupFilter{RequestID: "r1"}
	deleted, err := svc.CleanupSystemLogs(context.Background(), filter, 1)
	if err != nil || deleted != 5 {
		t.Fatalf("audit failure should not break cleanup, deleted=%d err=%v", deleted, err)
	}
}
// TestMarshalSystemLogCleanupConditions_NilAndMarshalError verifies the
// conditions serializer: a nil filter yields the empty JSON object, and a
// populated filter includes its time window and user ID fields.
func TestMarshalSystemLogCleanupConditions_NilAndMarshalError(t *testing.T) {
	if got := marshalSystemLogCleanupConditions(nil); got != "{}" {
		t.Fatalf("nil filter should return {}, got %s", got)
	}
	ts := time.Now().UTC()
	uid := int64(1)
	payload := marshalSystemLogCleanupConditions(&OpsSystemLogCleanupFilter{
		StartTime: &ts,
		EndTime:   &ts,
		UserID:    &uid,
	})
	for _, want := range []string{`"start_time"`, `"user_id":1`} {
		if !strings.Contains(payload, want) {
			t.Fatalf("unexpected marshal payload: %s", payload)
		}
	}
}
// TestOpsServiceGetSystemLogSinkHealth verifies that the service reports a
// zero-value snapshot when no sink is attached and a positive queue capacity
// once a real sink is wired in.
func TestOpsServiceGetSystemLogSinkHealth(t *testing.T) {
	withoutSink := NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
	if health := withoutSink.GetSystemLogSinkHealth(); health.QueueCapacity != 0 || health.QueueDepth != 0 {
		t.Fatalf("unexpected health for nil sink: %+v", health)
	}
	sink := NewOpsSystemLogSink(&opsRepoMock{})
	withSink := NewOpsService(&opsRepoMock{}, nil, nil, nil, nil, nil, nil, nil, nil, nil, sink)
	if health := withSink.GetSystemLogSinkHealth(); health.QueueCapacity <= 0 {
		t.Fatalf("expected non-zero queue capacity: %+v", health)
	}
}
package service
import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/util/logredact"
)
// OpsSystemLogSinkHealth is a point-in-time snapshot of the asynchronous
// system log sink: queue utilisation, cumulative write statistics and the
// most recent flush error (empty when the last flush succeeded).
type OpsSystemLogSinkHealth struct {
	QueueDepth      int64  `json:"queue_depth"`        // events currently buffered in the queue
	QueueCapacity   int64  `json:"queue_capacity"`     // total queue capacity
	DroppedCount    uint64 `json:"dropped_count"`      // events discarded because the queue was full
	WriteFailed     uint64 `json:"write_failed_count"` // events in batches whose flush failed
	WrittenCount    uint64 `json:"written_count"`      // rows successfully inserted
	AvgWriteDelayMs uint64 `json:"avg_write_delay_ms"` // total flush latency divided by written rows, in ms
	LastError       string `json:"last_error"`         // last flush error message, "" if the last flush succeeded
}
// OpsSystemLogSink asynchronously indexes selected log events into the ops
// repository. WriteLogEvent enqueues without blocking; a single background
// goroutine (run) flushes batches on size or interval triggers.
type OpsSystemLogSink struct {
	opsRepo       OpsRepository          // destination for batched inserts
	queue         chan *logger.LogEvent  // bounded buffer; a full queue drops events
	batchSize     int                    // max events per flush
	flushInterval time.Duration          // max time between flushes
	ctx           context.Context        // canceled by Stop to end the worker
	cancel        context.CancelFunc
	wg            sync.WaitGroup // tracks the run goroutine
	droppedCount  uint64         // accessed atomically
	writeFailed   uint64         // accessed atomically
	writtenCount  uint64         // accessed atomically
	totalDelayNs  uint64         // accessed atomically; sum of flush latencies
	lastError     atomic.Value   // holds a string: last flush error, "" on success
}
// NewOpsSystemLogSink builds a sink that batches log events into opsRepo.
// The queue holds up to 5000 pending events; once Start is called, batches
// of up to 200 events are flushed at least once per second.
func NewOpsSystemLogSink(opsRepo OpsRepository) *OpsSystemLogSink {
	ctx, cancel := context.WithCancel(context.Background())
	sink := &OpsSystemLogSink{
		opsRepo:       opsRepo,
		queue:         make(chan *logger.LogEvent, 5000),
		batchSize:     200,
		flushInterval: time.Second,
		ctx:           ctx,
		cancel:        cancel,
	}
	// Seed lastError so Health's string type assertion always succeeds.
	sink.lastError.Store("")
	return sink
}
// Start launches the background flush worker. It is a no-op for a nil sink
// or when no repository is configured (there is nowhere to write).
func (s *OpsSystemLogSink) Start() {
	if s == nil || s.opsRepo == nil {
		return
	}
	s.wg.Add(1)
	go s.run()
}
// Stop cancels the sink's context and blocks until the worker goroutine has
// drained the queue and exited. It is safe on a nil sink and on a sink built
// as a bare struct literal (nil cancel) — the same nil tolerance that
// WriteLogEvent already provides for s.ctx; previously such a sink would
// panic here on the nil CancelFunc.
func (s *OpsSystemLogSink) Stop() {
	if s == nil || s.cancel == nil {
		return
	}
	s.cancel()
	s.wg.Wait()
}
// WriteLogEvent enqueues an event for asynchronous indexing. Events that do
// not pass shouldIndex are ignored. The call never blocks: after the sink's
// context is canceled events are silently discarded, and when the queue is
// full the event is dropped and droppedCount incremented instead.
func (s *OpsSystemLogSink) WriteLogEvent(event *logger.LogEvent) {
	if s == nil || event == nil || !s.shouldIndex(event) {
		return
	}
	// ctx may be nil when the sink was built as a bare struct literal (as in
	// tests); only consult cancellation when it exists.
	if s.ctx != nil {
		select {
		case <-s.ctx.Done():
			return
		default:
		}
	}
	// Non-blocking send: prefer dropping over stalling the logging hot path.
	select {
	case s.queue <- event:
	default:
		atomic.AddUint64(&s.droppedCount, 1)
	}
}
// shouldIndex reports whether an event is worth persisting to the system log
// index: any warn-or-worse level always qualifies, and info-level events
// qualify when they come from an http.access or audit component. The
// component is taken from the "component" field when present (zap's logger
// name is often empty or differs from the business component), falling back
// to the event's Component.
func (s *OpsSystemLogSink) shouldIndex(event *logger.LogEvent) bool {
	switch strings.ToLower(strings.TrimSpace(event.Level)) {
	case "warn", "warning", "error", "fatal", "panic", "dpanic":
		return true
	}
	component := strings.ToLower(strings.TrimSpace(event.Component))
	if event.Fields != nil {
		if override := strings.ToLower(strings.TrimSpace(asString(event.Fields["component"]))); override != "" {
			component = override
		}
	}
	return strings.Contains(component, "http.access") || strings.Contains(component, "audit")
}
// run is the sink's single background worker. It accumulates queued events
// into a batch and flushes when the batch reaches batchSize, on every
// flushInterval tick, and one final time (draining the queue) after the
// context is canceled by Stop.
func (s *OpsSystemLogSink) run() {
	defer s.wg.Done()
	ticker := time.NewTicker(s.flushInterval)
	defer ticker.Stop()
	batch := make([]*logger.LogEvent, 0, s.batchSize)
	// flush writes the current batch and updates the health counters. On
	// failure it reports to stderr (not the logger, to avoid a feedback
	// loop); the batch is reset either way, so failed events are not retried.
	flush := func(baseCtx context.Context) {
		if len(batch) == 0 {
			return
		}
		started := time.Now()
		inserted, err := s.flushBatch(baseCtx, batch)
		delay := time.Since(started)
		if err != nil {
			atomic.AddUint64(&s.writeFailed, uint64(len(batch)))
			s.lastError.Store(err.Error())
			_, _ = fmt.Fprintf(os.Stderr, "time=%s level=WARN msg=\"ops system log sink flush failed\" err=%v batch=%d\n",
				time.Now().Format(time.RFC3339Nano), err, len(batch),
			)
		} else {
			atomic.AddUint64(&s.writtenCount, uint64(inserted))
			atomic.AddUint64(&s.totalDelayNs, uint64(delay.Nanoseconds()))
			s.lastError.Store("")
		}
		batch = batch[:0]
	}
	// drainAndFlush empties whatever remains in the queue at shutdown. It
	// uses context.Background() because s.ctx is already canceled here.
	drainAndFlush := func() {
		for {
			select {
			case item := <-s.queue:
				if item == nil {
					continue
				}
				batch = append(batch, item)
				if len(batch) >= s.batchSize {
					flush(context.Background())
				}
			default:
				flush(context.Background())
				return
			}
		}
	}
	for {
		select {
		case <-s.ctx.Done():
			drainAndFlush()
			return
		case item := <-s.queue:
			if item == nil {
				continue
			}
			batch = append(batch, item)
			if len(batch) >= s.batchSize {
				flush(s.ctx)
			}
		case <-ticker.C:
			flush(s.ctx)
		}
	}
}
// flushBatch converts a batch of log events into insert rows and writes them
// through the ops repository in a single call, returning the number of rows
// inserted. Messages and field maps are redacted before persistence, and a
// 5-second timeout bounds the database write. A nil or already-canceled
// baseCtx is replaced with context.Background() so shutdown-time flushes
// still complete.
func (s *OpsSystemLogSink) flushBatch(baseCtx context.Context, batch []*logger.LogEvent) (int, error) {
	inputs := make([]*OpsInsertSystemLogInput, 0, len(batch))
	for _, event := range batch {
		if event == nil {
			continue
		}
		createdAt := event.Time.UTC()
		if createdAt.IsZero() {
			createdAt = time.Now().UTC()
		}
		// Work on a copy so redaction never mutates the caller's field map.
		fields := copyMap(event.Fields)
		requestID := asString(fields["request_id"])
		clientRequestID := asString(fields["client_request_id"])
		platform := asString(fields["platform"])
		model := asString(fields["model"])
		// Prefer the "component" field over the zap logger name; default "app".
		component := strings.TrimSpace(event.Component)
		if fieldComponent := asString(fields["component"]); fieldComponent != "" {
			component = fieldComponent
		}
		if component == "" {
			component = "app"
		}
		userID := asInt64Ptr(fields["user_id"])
		accountID := asInt64Ptr(fields["account_id"])
		// Redact message and extra fields uniformly before indexing.
		message := logredact.RedactText(strings.TrimSpace(event.Message))
		redactedExtra := logredact.RedactMap(fields)
		extraJSONBytes, _ := json.Marshal(redactedExtra)
		extraJSON := string(extraJSONBytes)
		if strings.TrimSpace(extraJSON) == "" {
			extraJSON = "{}"
		}
		inputs = append(inputs, &OpsInsertSystemLogInput{
			CreatedAt:       createdAt,
			Level:           strings.ToLower(strings.TrimSpace(event.Level)),
			Component:       component,
			Message:         message,
			RequestID:       requestID,
			ClientRequestID: clientRequestID,
			UserID:          userID,
			AccountID:       accountID,
			Platform:        platform,
			Model:           model,
			ExtraJSON:       extraJSON,
		})
	}
	if len(inputs) == 0 {
		return 0, nil
	}
	if baseCtx == nil || baseCtx.Err() != nil {
		baseCtx = context.Background()
	}
	ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second)
	defer cancel()
	inserted, err := s.opsRepo.BatchInsertSystemLogs(ctx, inputs)
	if err != nil {
		return 0, err
	}
	return int(inserted), nil
}
// Health returns a point-in-time snapshot of the sink's queue usage and
// write counters. It is safe on a nil sink, which yields the zero snapshot.
func (s *OpsSystemLogSink) Health() OpsSystemLogSinkHealth {
	if s == nil {
		return OpsSystemLogSinkHealth{}
	}
	var health OpsSystemLogSinkHealth
	health.WrittenCount = atomic.LoadUint64(&s.writtenCount)
	if health.WrittenCount > 0 {
		// Total flush latency spread across the rows written so far.
		totalNs := atomic.LoadUint64(&s.totalDelayNs)
		health.AvgWriteDelayMs = totalNs / health.WrittenCount / uint64(time.Millisecond)
	}
	health.QueueDepth = int64(len(s.queue))
	health.QueueCapacity = int64(cap(s.queue))
	health.DroppedCount = atomic.LoadUint64(&s.droppedCount)
	health.WriteFailed = atomic.LoadUint64(&s.writeFailed)
	if msg, ok := s.lastError.Load().(string); ok {
		health.LastError = strings.TrimSpace(msg)
	}
	return health
}
// copyMap returns a shallow clone of in. A nil or empty input yields a fresh
// empty (non-nil) map so callers can index and redact it without nil checks.
func copyMap(in map[string]any) map[string]any {
	out := make(map[string]any, len(in))
	for key, value := range in {
		out[key] = value
	}
	return out
}
func asString(v any) string {
switch t := v.(type) {
case string:
return strings.TrimSpace(t)
case fmt.Stringer:
return strings.TrimSpace(t.String())
default:
return ""
}
}
func asInt64Ptr(v any) *int64 {
switch t := v.(type) {
case int:
n := int64(t)
if n <= 0 {
return nil
}
return &n
case int64:
n := t
if n <= 0 {
return nil
}
return &n
case float64:
n := int64(t)
if n <= 0 {
return nil
}
return &n
case json.Number:
if n, err := t.Int64(); err == nil {
if n <= 0 {
return nil
}
return &n
}
case string:
raw := strings.TrimSpace(t)
if raw == "" {
return nil
}
if n, err := strconv.ParseInt(raw, 10, 64); err == nil {
if n <= 0 {
return nil
}
return &n
}
}
return nil
}
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
)
// TestOpsSystemLogSink_ShouldIndex covers the indexing decision: warn-or-
// worse levels always index, and info-level events index only for
// http.access / audit components, whether the component arrives on the
// event itself or via the "component" field (the real zap path).
func TestOpsSystemLogSink_ShouldIndex(t *testing.T) {
	sink := &OpsSystemLogSink{}
	check := func(name string, event *logger.LogEvent, want bool) {
		t.Helper()
		if got := sink.shouldIndex(event); got != want {
			t.Fatalf("%s: shouldIndex()=%v, want %v", name, got, want)
		}
	}
	check("warn level", &logger.LogEvent{Level: "warn", Component: "app"}, true)
	check("error level", &logger.LogEvent{Level: "error", Component: "app"}, true)
	check("access component", &logger.LogEvent{Level: "info", Component: "http.access"}, true)
	check("access component from fields (real zap path)", &logger.LogEvent{
		Level:  "info",
		Fields: map[string]any{"component": "http.access"},
	}, true)
	check("audit component", &logger.LogEvent{Level: "info", Component: "audit.log_config_change"}, true)
	check("audit component from fields (real zap path)", &logger.LogEvent{
		Level:  "info",
		Fields: map[string]any{"component": "audit.log_config_change"},
	}, true)
	check("plain info", &logger.LogEvent{Level: "info", Component: "app"}, false)
}
// TestOpsSystemLogSink_WriteLogEvent_ShouldDropWhenQueueFull verifies the
// non-blocking enqueue: with a capacity-1 queue the second event must be
// dropped and counted instead of blocking the caller.
func TestOpsSystemLogSink_WriteLogEvent_ShouldDropWhenQueueFull(t *testing.T) {
	sink := &OpsSystemLogSink{queue: make(chan *logger.LogEvent, 1)}
	for i := 0; i < 2; i++ {
		sink.WriteLogEvent(&logger.LogEvent{Level: "warn", Component: "app"})
	}
	if depth := len(sink.queue); depth != 1 {
		t.Fatalf("queue len = %d, want 1", depth)
	}
	if dropped := atomic.LoadUint64(&sink.droppedCount); dropped != 1 {
		t.Fatalf("droppedCount = %d, want 1", dropped)
	}
}
// TestOpsSystemLogSink_Health verifies that Health reports queue depth and
// capacity plus every write counter, including the average delay derived
// from totalDelayNs / writtenCount.
func TestOpsSystemLogSink_Health(t *testing.T) {
	sink := &OpsSystemLogSink{queue: make(chan *logger.LogEvent, 10)}
	sink.lastError.Store("db timeout")
	atomic.StoreUint64(&sink.droppedCount, 3)
	atomic.StoreUint64(&sink.writeFailed, 2)
	atomic.StoreUint64(&sink.writtenCount, 5)
	atomic.StoreUint64(&sink.totalDelayNs, uint64(5000000)) // 5ms total -> avg 1ms
	for i := 0; i < 2; i++ {
		sink.queue <- &logger.LogEvent{Level: "warn", Component: "app"}
	}
	got := sink.Health()
	want := OpsSystemLogSinkHealth{
		QueueDepth:      2,
		QueueCapacity:   10,
		DroppedCount:    3,
		WriteFailed:     2,
		WrittenCount:    5,
		AvgWriteDelayMs: 1,
		LastError:       "db timeout",
	}
	if got != want {
		t.Fatalf("health = %+v, want %+v", got, want)
	}
}
// TestOpsSystemLogSink_StartStopAndFlushSuccess drives a real sink through
// Start/flush/Stop: one indexed event should become exactly one insert row
// (IDs parsed from the fields, message redacted but non-empty) and be
// reflected in the health counters.
func TestOpsSystemLogSink_StartStopAndFlushSuccess(t *testing.T) {
	done := make(chan struct{}, 1)
	var captured []*OpsInsertSystemLogInput
	repo := &opsRepoMock{
		BatchInsertSystemLogsFn: func(_ context.Context, inputs []*OpsInsertSystemLogInput) (int64, error) {
			captured = append(captured, inputs...)
			// Signal at most once; later flushes must not block on the channel.
			select {
			case done <- struct{}{}:
			default:
			}
			return int64(len(inputs)), nil
		},
	}
	sink := NewOpsSystemLogSink(repo)
	// batchSize=1 forces an immediate flush for the single test event.
	sink.batchSize = 1
	sink.flushInterval = 10 * time.Millisecond
	sink.Start()
	defer sink.Stop()
	sink.WriteLogEvent(&logger.LogEvent{
		Time:      time.Now().UTC(),
		Level:     "warn",
		Component: "http.access",
		Message:   `authorization="Bearer sk-test-123"`,
		Fields: map[string]any{
			"component":         "http.access",
			"request_id":        "req-1",
			"client_request_id": "creq-1",
			"user_id":           "12",
			"account_id":        json.Number("34"),
			"platform":          "openai",
			"model":             "gpt-5",
		},
	})
	select {
	case <-done:
	case <-time.After(2 * time.Second):
		t.Fatalf("timeout waiting for sink flush")
	}
	if len(captured) != 1 {
		t.Fatalf("captured len = %d, want 1", len(captured))
	}
	item := captured[0]
	if item.RequestID != "req-1" || item.ClientRequestID != "creq-1" {
		t.Fatalf("unexpected request ids: %+v", item)
	}
	if item.UserID == nil || *item.UserID != 12 {
		t.Fatalf("unexpected user_id: %+v", item.UserID)
	}
	if item.AccountID == nil || *item.AccountID != 34 {
		t.Fatalf("unexpected account_id: %+v", item.AccountID)
	}
	// Redaction may rewrite the bearer token but must not blank the message.
	if strings.TrimSpace(item.Message) == "" {
		t.Fatalf("message should not be empty")
	}
	health := sink.Health()
	if health.WrittenCount == 0 {
		t.Fatalf("written_count should be >0")
	}
}
// TestOpsSystemLogSink_FlushFailureUpdatesHealth verifies that a repository
// failure during flush is surfaced through Health: write_failed_count grows
// and the repository error is retained as last_error.
func TestOpsSystemLogSink_FlushFailureUpdatesHealth(t *testing.T) {
	repo := &opsRepoMock{
		BatchInsertSystemLogsFn: func(_ context.Context, inputs []*OpsInsertSystemLogInput) (int64, error) {
			return 0, errors.New("db unavailable")
		},
	}
	sink := NewOpsSystemLogSink(repo)
	sink.batchSize = 1
	sink.flushInterval = 10 * time.Millisecond
	sink.Start()
	defer sink.Stop()
	sink.WriteLogEvent(&logger.LogEvent{
		Time:      time.Now().UTC(),
		Level:     "warn",
		Component: "app",
		Message:   "boom",
		Fields:    map[string]any{},
	})
	// Poll the health snapshot until the async flush has failed at least once.
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		health := sink.Health()
		if health.WriteFailed > 0 {
			if !strings.Contains(health.LastError, "db unavailable") {
				t.Fatalf("unexpected last error: %s", health.LastError)
			}
			return
		}
		time.Sleep(20 * time.Millisecond)
	}
	t.Fatalf("write_failed_count not updated")
}
// TestOpsSystemLogSink_StopFlushUsesActiveContextAndDrainsQueue verifies the
// shutdown path: an event still queued when Stop is called must be flushed
// with a non-canceled context (the drain uses context.Background rather than
// the sink's canceled ctx) and counted as written.
func TestOpsSystemLogSink_StopFlushUsesActiveContextAndDrainsQueue(t *testing.T) {
	var inserted int64
	var canceledCtxCalls int64
	repo := &opsRepoMock{
		BatchInsertSystemLogsFn: func(ctx context.Context, inputs []*OpsInsertSystemLogInput) (int64, error) {
			if err := ctx.Err(); err != nil {
				atomic.AddInt64(&canceledCtxCalls, 1)
				return 0, err
			}
			atomic.AddInt64(&inserted, int64(len(inputs)))
			return int64(len(inputs)), nil
		},
	}
	sink := NewOpsSystemLogSink(repo)
	// Large batch plus a huge interval guarantee nothing flushes before Stop.
	sink.batchSize = 200
	sink.flushInterval = time.Hour
	sink.Start()
	sink.WriteLogEvent(&logger.LogEvent{
		Time:      time.Now().UTC(),
		Level:     "warn",
		Component: "app",
		Message:   "pending-on-shutdown",
		Fields:    map[string]any{"component": "http.access"},
	})
	sink.Stop()
	if got := atomic.LoadInt64(&inserted); got != 1 {
		t.Fatalf("inserted = %d, want 1", got)
	}
	if got := atomic.LoadInt64(&canceledCtxCalls); got != 0 {
		t.Fatalf("canceled ctx calls = %d, want 0", got)
	}
	health := sink.Health()
	if health.WrittenCount != 1 {
		t.Fatalf("written_count = %d, want 1", health.WrittenCount)
	}
}
// stringerValue is a minimal fmt.Stringer used to exercise asString.
type stringerValue string

func (s stringerValue) String() string { return string(s) }
// TestOpsSystemLogSink_HelperFunctions covers the field-extraction helpers:
// copyMap must produce an independent shallow copy, asString accepts strings
// and fmt.Stringer only, and asInt64Ptr accepts positive numerics in several
// representations while rejecting everything else.
func TestOpsSystemLogSink_HelperFunctions(t *testing.T) {
	src := map[string]any{"a": 1}
	cloned := copyMap(src)
	src["a"] = 2 // mutating the source must not affect the clone
	v, ok := cloned["a"].(int)
	if !ok || v != 1 {
		t.Fatalf("copyMap should create copy")
	}
	if got := asString(stringerValue(" hello ")); got != "hello" {
		t.Fatalf("asString stringer = %q", got)
	}
	// error exposes Error(), not String(), so it is not a fmt.Stringer here.
	if got := asString(fmt.Errorf("x")); got != "" {
		t.Fatalf("asString error should be empty, got %q", got)
	}
	if got := asString(123); got != "" {
		t.Fatalf("asString non-string should be empty, got %q", got)
	}
	cases := []struct {
		in   any
		want int64
		ok   bool
	}{
		{in: 5, want: 5, ok: true},
		{in: int64(6), want: 6, ok: true},
		{in: float64(7), want: 7, ok: true},
		{in: json.Number("8"), want: 8, ok: true},
		{in: "9", want: 9, ok: true},
		{in: "0", ok: false}, // non-positive values are rejected
		{in: -1, ok: false},
		{in: "abc", ok: false}, // unparsable string
	}
	for _, tc := range cases {
		got := asInt64Ptr(tc.in)
		if tc.ok {
			if got == nil || *got != tc.want {
				t.Fatalf("asInt64Ptr(%v) = %+v, want %d", tc.in, got, tc.want)
			}
		} else if got != nil {
			t.Fatalf("asInt64Ptr(%v) should be nil, got %d", tc.in, *got)
		}
	}
}
......@@ -6,7 +6,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"regexp"
......@@ -15,6 +14,7 @@ import (
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/pkg/openai"
"github.com/Wei-Shaw/sub2api/internal/util/urlvalidator"
)
......@@ -84,12 +84,12 @@ func NewPricingService(cfg *config.Config, remoteClient PricingRemoteClient) *Pr
func (s *PricingService) Initialize() error {
// 确保数据目录存在
if err := os.MkdirAll(s.cfg.Pricing.DataDir, 0755); err != nil {
log.Printf("[Pricing] Failed to create data directory: %v", err)
logger.LegacyPrintf("service.pricing", "[Pricing] Failed to create data directory: %v", err)
}
// 首次加载价格数据
if err := s.checkAndUpdatePricing(); err != nil {
log.Printf("[Pricing] Initial load failed, using fallback: %v", err)
logger.LegacyPrintf("service.pricing", "[Pricing] Initial load failed, using fallback: %v", err)
if err := s.useFallbackPricing(); err != nil {
return fmt.Errorf("failed to load pricing data: %w", err)
}
......@@ -98,7 +98,7 @@ func (s *PricingService) Initialize() error {
// 启动定时更新
s.startUpdateScheduler()
log.Printf("[Pricing] Service initialized with %d models", len(s.pricingData))
logger.LegacyPrintf("service.pricing", "[Pricing] Service initialized with %d models", len(s.pricingData))
return nil
}
......@@ -106,7 +106,7 @@ func (s *PricingService) Initialize() error {
func (s *PricingService) Stop() {
close(s.stopCh)
s.wg.Wait()
log.Println("[Pricing] Service stopped")
logger.LegacyPrintf("service.pricing", "%s", "[Pricing] Service stopped")
}
// startUpdateScheduler 启动定时更新调度器
......@@ -127,7 +127,7 @@ func (s *PricingService) startUpdateScheduler() {
select {
case <-ticker.C:
if err := s.syncWithRemote(); err != nil {
log.Printf("[Pricing] Sync failed: %v", err)
logger.LegacyPrintf("service.pricing", "[Pricing] Sync failed: %v", err)
}
case <-s.stopCh:
return
......@@ -135,7 +135,7 @@ func (s *PricingService) startUpdateScheduler() {
}
}()
log.Printf("[Pricing] Update scheduler started (check every %v)", hashInterval)
logger.LegacyPrintf("service.pricing", "[Pricing] Update scheduler started (check every %v)", hashInterval)
}
// checkAndUpdatePricing 检查并更新价格数据
......@@ -144,7 +144,7 @@ func (s *PricingService) checkAndUpdatePricing() error {
// 检查本地文件是否存在
if _, err := os.Stat(pricingFile); os.IsNotExist(err) {
log.Println("[Pricing] Local pricing file not found, downloading...")
logger.LegacyPrintf("service.pricing", "%s", "[Pricing] Local pricing file not found, downloading...")
return s.downloadPricingData()
}
......@@ -158,9 +158,9 @@ func (s *PricingService) checkAndUpdatePricing() error {
maxAge := time.Duration(s.cfg.Pricing.UpdateIntervalHours) * time.Hour
if fileAge > maxAge {
log.Printf("[Pricing] Local file is %v old, updating...", fileAge.Round(time.Hour))
logger.LegacyPrintf("service.pricing", "[Pricing] Local file is %v old, updating...", fileAge.Round(time.Hour))
if err := s.downloadPricingData(); err != nil {
log.Printf("[Pricing] Download failed, using existing file: %v", err)
logger.LegacyPrintf("service.pricing", "[Pricing] Download failed, using existing file: %v", err)
}
}
......@@ -175,7 +175,7 @@ func (s *PricingService) syncWithRemote() error {
// 计算本地文件哈希
localHash, err := s.computeFileHash(pricingFile)
if err != nil {
log.Printf("[Pricing] Failed to compute local hash: %v", err)
logger.LegacyPrintf("service.pricing", "[Pricing] Failed to compute local hash: %v", err)
return s.downloadPricingData()
}
......@@ -183,15 +183,15 @@ func (s *PricingService) syncWithRemote() error {
if s.cfg.Pricing.HashURL != "" {
remoteHash, err := s.fetchRemoteHash()
if err != nil {
log.Printf("[Pricing] Failed to fetch remote hash: %v", err)
logger.LegacyPrintf("service.pricing", "[Pricing] Failed to fetch remote hash: %v", err)
return nil // 哈希获取失败不影响正常使用
}
if remoteHash != localHash {
log.Println("[Pricing] Remote hash differs, downloading new version...")
logger.LegacyPrintf("service.pricing", "%s", "[Pricing] Remote hash differs, downloading new version...")
return s.downloadPricingData()
}
log.Println("[Pricing] Hash check passed, no update needed")
logger.LegacyPrintf("service.pricing", "%s", "[Pricing] Hash check passed, no update needed")
return nil
}
......@@ -205,7 +205,7 @@ func (s *PricingService) syncWithRemote() error {
maxAge := time.Duration(s.cfg.Pricing.UpdateIntervalHours) * time.Hour
if fileAge > maxAge {
log.Printf("[Pricing] File is %v old, downloading...", fileAge.Round(time.Hour))
logger.LegacyPrintf("service.pricing", "[Pricing] File is %v old, downloading...", fileAge.Round(time.Hour))
return s.downloadPricingData()
}
......@@ -218,7 +218,7 @@ func (s *PricingService) downloadPricingData() error {
if err != nil {
return err
}
log.Printf("[Pricing] Downloading from %s", remoteURL)
logger.LegacyPrintf("service.pricing", "[Pricing] Downloading from %s", remoteURL)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
......@@ -252,7 +252,7 @@ func (s *PricingService) downloadPricingData() error {
// 保存到本地文件
pricingFile := s.getPricingFilePath()
if err := os.WriteFile(pricingFile, body, 0644); err != nil {
log.Printf("[Pricing] Failed to save file: %v", err)
logger.LegacyPrintf("service.pricing", "[Pricing] Failed to save file: %v", err)
}
// 保存哈希
......@@ -260,7 +260,7 @@ func (s *PricingService) downloadPricingData() error {
hashStr := hex.EncodeToString(hash[:])
hashFile := s.getHashFilePath()
if err := os.WriteFile(hashFile, []byte(hashStr+"\n"), 0644); err != nil {
log.Printf("[Pricing] Failed to save hash: %v", err)
logger.LegacyPrintf("service.pricing", "[Pricing] Failed to save hash: %v", err)
}
// 更新内存数据
......@@ -270,7 +270,7 @@ func (s *PricingService) downloadPricingData() error {
s.localHash = hashStr
s.mu.Unlock()
log.Printf("[Pricing] Downloaded %d models successfully", len(data))
logger.LegacyPrintf("service.pricing", "[Pricing] Downloaded %d models successfully", len(data))
return nil
}
......@@ -329,7 +329,7 @@ func (s *PricingService) parsePricingData(body []byte) (map[string]*LiteLLMModel
}
if skipped > 0 {
log.Printf("[Pricing] Skipped %d invalid entries", skipped)
logger.LegacyPrintf("service.pricing", "[Pricing] Skipped %d invalid entries", skipped)
}
if len(result) == 0 {
......@@ -368,7 +368,7 @@ func (s *PricingService) loadPricingData(filePath string) error {
}
s.mu.Unlock()
log.Printf("[Pricing] Loaded %d models from %s", len(pricingData), filePath)
logger.LegacyPrintf("service.pricing", "[Pricing] Loaded %d models from %s", len(pricingData), filePath)
return nil
}
......@@ -380,7 +380,7 @@ func (s *PricingService) useFallbackPricing() error {
return fmt.Errorf("fallback file not found: %s", fallbackFile)
}
log.Printf("[Pricing] Using fallback file: %s", fallbackFile)
logger.LegacyPrintf("service.pricing", "[Pricing] Using fallback file: %s", fallbackFile)
// 复制到数据目录
data, err := os.ReadFile(fallbackFile)
......@@ -390,7 +390,7 @@ func (s *PricingService) useFallbackPricing() error {
pricingFile := s.getPricingFilePath()
if err := os.WriteFile(pricingFile, data, 0644); err != nil {
log.Printf("[Pricing] Failed to copy fallback: %v", err)
logger.LegacyPrintf("service.pricing", "[Pricing] Failed to copy fallback: %v", err)
}
return s.loadPricingData(fallbackFile)
......@@ -639,7 +639,7 @@ func (s *PricingService) matchByModelFamily(model string) *LiteLLMModelPricing {
for key, pricing := range s.pricingData {
keyLower := strings.ToLower(key)
if strings.Contains(keyLower, pattern) {
log.Printf("[Pricing] Fuzzy matched %s -> %s", model, key)
logger.LegacyPrintf("service.pricing", "[Pricing] Fuzzy matched %s -> %s", model, key)
return pricing
}
}
......@@ -660,14 +660,14 @@ func (s *PricingService) matchOpenAIModel(model string) *LiteLLMModelPricing {
for _, variant := range variants {
if pricing, ok := s.pricingData[variant]; ok {
log.Printf("[Pricing] OpenAI fallback matched %s -> %s", model, variant)
logger.LegacyPrintf("service.pricing", "[Pricing] OpenAI fallback matched %s -> %s", model, variant)
return pricing
}
}
if strings.HasPrefix(model, "gpt-5.3-codex") {
if pricing, ok := s.pricingData["gpt-5.2-codex"]; ok {
log.Printf("[Pricing] OpenAI fallback matched %s -> %s", model, "gpt-5.2-codex")
logger.LegacyPrintf("service.pricing", "[Pricing] OpenAI fallback matched %s -> %s", model, "gpt-5.2-codex")
return pricing
}
}
......@@ -675,7 +675,7 @@ func (s *PricingService) matchOpenAIModel(model string) *LiteLLMModelPricing {
// 最终回退到 DefaultTestModel
defaultModel := strings.ToLower(openai.DefaultTestModel)
if pricing, ok := s.pricingData[defaultModel]; ok {
log.Printf("[Pricing] OpenAI fallback to default model %s -> %s", model, defaultModel)
logger.LegacyPrintf("service.pricing", "[Pricing] OpenAI fallback to default model %s -> %s", model, defaultModel)
return pricing
}
......
......@@ -4,13 +4,13 @@ import (
"context"
"encoding/json"
"errors"
"log"
"log/slog"
"strconv"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
)
var (
......@@ -104,7 +104,7 @@ func (s *SchedulerSnapshotService) ListSchedulableAccounts(ctx context.Context,
if s.cache != nil {
cached, hit, err := s.cache.GetSnapshot(ctx, bucket)
if err != nil {
log.Printf("[Scheduler] cache read failed: bucket=%s err=%v", bucket.String(), err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] cache read failed: bucket=%s err=%v", bucket.String(), err)
} else if hit {
return derefAccounts(cached), useMixed, nil
}
......@@ -124,7 +124,7 @@ func (s *SchedulerSnapshotService) ListSchedulableAccounts(ctx context.Context,
if s.cache != nil {
if err := s.cache.SetSnapshot(fallbackCtx, bucket, accounts); err != nil {
log.Printf("[Scheduler] cache write failed: bucket=%s err=%v", bucket.String(), err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] cache write failed: bucket=%s err=%v", bucket.String(), err)
}
}
......@@ -138,7 +138,7 @@ func (s *SchedulerSnapshotService) GetAccount(ctx context.Context, accountID int
if s.cache != nil {
account, err := s.cache.GetAccount(ctx, accountID)
if err != nil {
log.Printf("[Scheduler] account cache read failed: id=%d err=%v", accountID, err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] account cache read failed: id=%d err=%v", accountID, err)
} else if account != nil {
return account, nil
}
......@@ -168,17 +168,17 @@ func (s *SchedulerSnapshotService) runInitialRebuild() {
defer cancel()
buckets, err := s.cache.ListBuckets(ctx)
if err != nil {
log.Printf("[Scheduler] list buckets failed: %v", err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] list buckets failed: %v", err)
}
if len(buckets) == 0 {
buckets, err = s.defaultBuckets(ctx)
if err != nil {
log.Printf("[Scheduler] default buckets failed: %v", err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] default buckets failed: %v", err)
return
}
}
if err := s.rebuildBuckets(ctx, buckets, "startup"); err != nil {
log.Printf("[Scheduler] rebuild startup failed: %v", err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] rebuild startup failed: %v", err)
}
}
......@@ -205,7 +205,7 @@ func (s *SchedulerSnapshotService) runFullRebuildWorker(interval time.Duration)
select {
case <-ticker.C:
if err := s.triggerFullRebuild("interval"); err != nil {
log.Printf("[Scheduler] full rebuild failed: %v", err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] full rebuild failed: %v", err)
}
case <-s.stopCh:
return
......@@ -222,13 +222,13 @@ func (s *SchedulerSnapshotService) pollOutbox() {
watermark, err := s.cache.GetOutboxWatermark(ctx)
if err != nil {
log.Printf("[Scheduler] outbox watermark read failed: %v", err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark read failed: %v", err)
return
}
events, err := s.outboxRepo.ListAfter(ctx, watermark, 200)
if err != nil {
log.Printf("[Scheduler] outbox poll failed: %v", err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox poll failed: %v", err)
return
}
if len(events) == 0 {
......@@ -241,14 +241,14 @@ func (s *SchedulerSnapshotService) pollOutbox() {
err := s.handleOutboxEvent(eventCtx, event)
cancel()
if err != nil {
log.Printf("[Scheduler] outbox handle failed: id=%d type=%s err=%v", event.ID, event.EventType, err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox handle failed: id=%d type=%s err=%v", event.ID, event.EventType, err)
return
}
}
lastID := events[len(events)-1].ID
if err := s.cache.SetOutboxWatermark(ctx, lastID); err != nil {
log.Printf("[Scheduler] outbox watermark write failed: %v", err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", err)
} else {
watermarkForCheck = lastID
}
......@@ -445,11 +445,11 @@ func (s *SchedulerSnapshotService) rebuildBucket(ctx context.Context, bucket Sch
accounts, err := s.loadAccountsFromDB(rebuildCtx, bucket, bucket.Mode == SchedulerModeMixed)
if err != nil {
log.Printf("[Scheduler] rebuild failed: bucket=%s reason=%s err=%v", bucket.String(), reason, err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] rebuild failed: bucket=%s reason=%s err=%v", bucket.String(), reason, err)
return err
}
if err := s.cache.SetSnapshot(rebuildCtx, bucket, accounts); err != nil {
log.Printf("[Scheduler] rebuild cache failed: bucket=%s reason=%s err=%v", bucket.String(), reason, err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] rebuild cache failed: bucket=%s reason=%s err=%v", bucket.String(), reason, err)
return err
}
slog.Debug("[Scheduler] rebuild ok", "bucket", bucket.String(), "reason", reason, "size", len(accounts))
......@@ -465,13 +465,13 @@ func (s *SchedulerSnapshotService) triggerFullRebuild(reason string) error {
buckets, err := s.cache.ListBuckets(ctx)
if err != nil {
log.Printf("[Scheduler] list buckets failed: %v", err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] list buckets failed: %v", err)
return err
}
if len(buckets) == 0 {
buckets, err = s.defaultBuckets(ctx)
if err != nil {
log.Printf("[Scheduler] default buckets failed: %v", err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] default buckets failed: %v", err)
return err
}
}
......@@ -485,7 +485,7 @@ func (s *SchedulerSnapshotService) checkOutboxLag(ctx context.Context, oldest Sc
lag := time.Since(oldest.CreatedAt)
if lagSeconds := int(lag.Seconds()); lagSeconds >= s.cfg.Gateway.Scheduling.OutboxLagWarnSeconds && s.cfg.Gateway.Scheduling.OutboxLagWarnSeconds > 0 {
log.Printf("[Scheduler] outbox lag warning: %ds", lagSeconds)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox lag warning: %ds", lagSeconds)
}
if s.cfg.Gateway.Scheduling.OutboxLagRebuildSeconds > 0 && int(lag.Seconds()) >= s.cfg.Gateway.Scheduling.OutboxLagRebuildSeconds {
......@@ -495,12 +495,12 @@ func (s *SchedulerSnapshotService) checkOutboxLag(ctx context.Context, oldest Sc
s.lagMu.Unlock()
if failures >= s.cfg.Gateway.Scheduling.OutboxLagRebuildFailures {
log.Printf("[Scheduler] outbox lag rebuild triggered: lag=%s failures=%d", lag, failures)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox lag rebuild triggered: lag=%s failures=%d", lag, failures)
s.lagMu.Lock()
s.lagFailures = 0
s.lagMu.Unlock()
if err := s.triggerFullRebuild("outbox_lag"); err != nil {
log.Printf("[Scheduler] outbox lag rebuild failed: %v", err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox lag rebuild failed: %v", err)
}
}
} else {
......@@ -518,9 +518,9 @@ func (s *SchedulerSnapshotService) checkOutboxLag(ctx context.Context, oldest Sc
return
}
if maxID-watermark >= int64(threshold) {
log.Printf("[Scheduler] outbox backlog rebuild triggered: backlog=%d", maxID-watermark)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox backlog rebuild triggered: backlog=%d", maxID-watermark)
if err := s.triggerFullRebuild("outbox_backlog"); err != nil {
log.Printf("[Scheduler] outbox backlog rebuild failed: %v", err)
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox backlog rebuild failed: %v", err)
}
}
}
......
package service
import (
"log"
"os"
"path/filepath"
"strings"
......@@ -9,6 +8,7 @@ import (
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/robfig/cron/v3"
)
......@@ -37,18 +37,18 @@ func (s *SoraMediaCleanupService) Start() {
return
}
if !s.cfg.Sora.Storage.Cleanup.Enabled {
log.Printf("[SoraCleanup] not started (disabled)")
logger.LegacyPrintf("service.sora_media_cleanup", "[SoraCleanup] not started (disabled)")
return
}
if s.storage == nil || !s.storage.Enabled() {
log.Printf("[SoraCleanup] not started (storage disabled)")
logger.LegacyPrintf("service.sora_media_cleanup", "[SoraCleanup] not started (storage disabled)")
return
}
s.startOnce.Do(func() {
schedule := strings.TrimSpace(s.cfg.Sora.Storage.Cleanup.Schedule)
if schedule == "" {
log.Printf("[SoraCleanup] not started (empty schedule)")
logger.LegacyPrintf("service.sora_media_cleanup", "[SoraCleanup] not started (empty schedule)")
return
}
loc := time.Local
......@@ -59,12 +59,12 @@ func (s *SoraMediaCleanupService) Start() {
}
c := cron.New(cron.WithParser(soraCleanupCronParser), cron.WithLocation(loc))
if _, err := c.AddFunc(schedule, func() { s.runCleanup() }); err != nil {
log.Printf("[SoraCleanup] not started (invalid schedule=%q): %v", schedule, err)
logger.LegacyPrintf("service.sora_media_cleanup", "[SoraCleanup] not started (invalid schedule=%q): %v", schedule, err)
return
}
s.cron = c
s.cron.Start()
log.Printf("[SoraCleanup] started (schedule=%q tz=%s)", schedule, loc.String())
logger.LegacyPrintf("service.sora_media_cleanup", "[SoraCleanup] started (schedule=%q tz=%s)", schedule, loc.String())
})
}
......@@ -78,7 +78,7 @@ func (s *SoraMediaCleanupService) Stop() {
select {
case <-ctx.Done():
case <-time.After(3 * time.Second):
log.Printf("[SoraCleanup] cron stop timed out")
logger.LegacyPrintf("service.sora_media_cleanup", "[SoraCleanup] cron stop timed out")
}
}
})
......@@ -90,7 +90,7 @@ func (s *SoraMediaCleanupService) runCleanup() {
}
retention := s.cfg.Sora.Storage.Cleanup.RetentionDays
if retention <= 0 {
log.Printf("[SoraCleanup] skipped (retention_days=%d)", retention)
logger.LegacyPrintf("service.sora_media_cleanup", "[SoraCleanup] skipped (retention_days=%d)", retention)
return
}
cutoff := time.Now().AddDate(0, 0, -retention)
......@@ -116,5 +116,5 @@ func (s *SoraMediaCleanupService) runCleanup() {
return nil
})
}
log.Printf("[SoraCleanup] cleanup finished, deleted=%d", deleted)
logger.LegacyPrintf("service.sora_media_cleanup", "[SoraCleanup] cleanup finished, deleted=%d", deleted)
}
......@@ -2,10 +2,10 @@ package service
import (
"fmt"
"log"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/zeromicro/go-zero/core/collection"
)
......@@ -34,21 +34,21 @@ func NewTimingWheelService() (*TimingWheelService, error) {
// Start starts the timing wheel
func (s *TimingWheelService) Start() {
log.Println("[TimingWheel] Started (auto-start by go-zero)")
logger.LegacyPrintf("service.timing_wheel", "%s", "[TimingWheel] Started (auto-start by go-zero)")
}
// Stop stops the timing wheel
func (s *TimingWheelService) Stop() {
s.stopOnce.Do(func() {
s.tw.Stop()
log.Println("[TimingWheel] Stopped")
logger.LegacyPrintf("service.timing_wheel", "%s", "[TimingWheel] Stopped")
})
}
// Schedule schedules a one-time task
func (s *TimingWheelService) Schedule(name string, delay time.Duration, fn func()) {
if err := s.tw.SetTimer(name, fn, delay); err != nil {
log.Printf("[TimingWheel] SetTimer failed for %q: %v", name, err)
logger.LegacyPrintf("service.timing_wheel", "[TimingWheel] SetTimer failed for %q: %v", name, err)
}
}
......@@ -58,11 +58,11 @@ func (s *TimingWheelService) ScheduleRecurring(name string, interval time.Durati
schedule = func() {
fn()
if err := s.tw.SetTimer(name, schedule, interval); err != nil {
log.Printf("[TimingWheel] recurring SetTimer failed for %q: %v", name, err)
logger.LegacyPrintf("service.timing_wheel", "[TimingWheel] recurring SetTimer failed for %q: %v", name, err)
}
}
if err := s.tw.SetTimer(name, schedule, interval); err != nil {
log.Printf("[TimingWheel] initial SetTimer failed for %q: %v", name, err)
logger.LegacyPrintf("service.timing_wheel", "[TimingWheel] initial SetTimer failed for %q: %v", name, err)
}
}
......
......@@ -3,7 +3,6 @@ package service
import (
"context"
"fmt"
"log"
"log/slog"
"strings"
"sync"
......@@ -70,22 +69,24 @@ func (s *TokenRefreshService) SetSoraAccountRepo(repo SoraAccountRepository) {
// Start 启动后台刷新服务
func (s *TokenRefreshService) Start() {
if !s.cfg.Enabled {
log.Println("[TokenRefresh] Service disabled by configuration")
slog.Info("token_refresh.service_disabled")
return
}
s.wg.Add(1)
go s.refreshLoop()
log.Printf("[TokenRefresh] Service started (check every %d minutes, refresh %v hours before expiry)",
s.cfg.CheckIntervalMinutes, s.cfg.RefreshBeforeExpiryHours)
slog.Info("token_refresh.service_started",
"check_interval_minutes", s.cfg.CheckIntervalMinutes,
"refresh_before_expiry_hours", s.cfg.RefreshBeforeExpiryHours,
)
}
// Stop 停止刷新服务
func (s *TokenRefreshService) Stop() {
close(s.stopCh)
s.wg.Wait()
log.Println("[TokenRefresh] Service stopped")
slog.Info("token_refresh.service_stopped")
}
// refreshLoop 刷新循环
......@@ -124,7 +125,7 @@ func (s *TokenRefreshService) processRefresh() {
// 获取所有active状态的账号
accounts, err := s.listActiveAccounts(ctx)
if err != nil {
log.Printf("[TokenRefresh] Failed to list accounts: %v", err)
slog.Error("token_refresh.list_accounts_failed", "error", err)
return
}
......@@ -153,10 +154,17 @@ func (s *TokenRefreshService) processRefresh() {
// 执行刷新
if err := s.refreshWithRetry(ctx, account, refresher); err != nil {
log.Printf("[TokenRefresh] Account %d (%s) failed: %v", account.ID, account.Name, err)
slog.Warn("token_refresh.account_refresh_failed",
"account_id", account.ID,
"account_name", account.Name,
"error", err,
)
failed++
} else {
log.Printf("[TokenRefresh] Account %d (%s) refreshed successfully", account.ID, account.Name)
slog.Info("token_refresh.account_refreshed",
"account_id", account.ID,
"account_name", account.Name,
)
refreshed++
}
......@@ -167,12 +175,17 @@ func (s *TokenRefreshService) processRefresh() {
// 无刷新活动时降级为 Debug,有实际刷新活动时保持 Info
if needsRefresh == 0 && failed == 0 {
slog.Debug("[TokenRefresh] Cycle complete",
slog.Debug("token_refresh.cycle_completed",
"total", totalAccounts, "oauth", oauthAccounts,
"needs_refresh", needsRefresh, "refreshed", refreshed, "failed", failed)
} else {
log.Printf("[TokenRefresh] Cycle complete: total=%d, oauth=%d, needs_refresh=%d, refreshed=%d, failed=%d",
totalAccounts, oauthAccounts, needsRefresh, refreshed, failed)
slog.Info("token_refresh.cycle_completed",
"total", totalAccounts,
"oauth", oauthAccounts,
"needs_refresh", needsRefresh,
"refreshed", refreshed,
"failed", failed,
)
}
}
......@@ -207,26 +220,35 @@ func (s *TokenRefreshService) refreshWithRetry(ctx context.Context, account *Acc
account.Status == StatusError &&
strings.Contains(account.ErrorMessage, "missing_project_id:") {
if clearErr := s.accountRepo.ClearError(ctx, account.ID); clearErr != nil {
log.Printf("[TokenRefresh] Failed to clear error status for account %d: %v", account.ID, clearErr)
slog.Warn("token_refresh.clear_account_error_failed",
"account_id", account.ID,
"error", clearErr,
)
} else {
log.Printf("[TokenRefresh] Account %d: cleared missing_project_id error", account.ID)
slog.Info("token_refresh.cleared_missing_project_id_error", "account_id", account.ID)
}
}
// 对所有 OAuth 账号调用缓存失效(InvalidateToken 内部根据平台判断是否需要处理)
if s.cacheInvalidator != nil && account.Type == AccountTypeOAuth {
if err := s.cacheInvalidator.InvalidateToken(ctx, account); err != nil {
log.Printf("[TokenRefresh] Failed to invalidate token cache for account %d: %v", account.ID, err)
slog.Warn("token_refresh.invalidate_token_cache_failed",
"account_id", account.ID,
"error", err,
)
} else {
log.Printf("[TokenRefresh] Token cache invalidated for account %d", account.ID)
slog.Debug("token_refresh.token_cache_invalidated", "account_id", account.ID)
}
}
// 同步更新调度器缓存,确保调度获取的 Account 对象包含最新的 credentials
// 这解决了 token 刷新后调度器缓存数据不一致的问题(#445)
if s.schedulerCache != nil {
if err := s.schedulerCache.SetAccount(ctx, account); err != nil {
log.Printf("[TokenRefresh] Failed to sync scheduler cache for account %d: %v", account.ID, err)
slog.Warn("token_refresh.sync_scheduler_cache_failed",
"account_id", account.ID,
"error", err,
)
} else {
log.Printf("[TokenRefresh] Scheduler cache synced for account %d", account.ID)
slog.Debug("token_refresh.scheduler_cache_synced", "account_id", account.ID)
}
}
return nil
......@@ -236,14 +258,21 @@ func (s *TokenRefreshService) refreshWithRetry(ctx context.Context, account *Acc
if account.Platform == PlatformAntigravity && isNonRetryableRefreshError(err) {
errorMsg := fmt.Sprintf("Token refresh failed (non-retryable): %v", err)
if setErr := s.accountRepo.SetError(ctx, account.ID, errorMsg); setErr != nil {
log.Printf("[TokenRefresh] Failed to set error status for account %d: %v", account.ID, setErr)
slog.Error("token_refresh.set_error_status_failed",
"account_id", account.ID,
"error", setErr,
)
}
return err
}
lastErr = err
log.Printf("[TokenRefresh] Account %d attempt %d/%d failed: %v",
account.ID, attempt, s.cfg.MaxRetries, err)
slog.Warn("token_refresh.retry_attempt_failed",
"account_id", account.ID,
"attempt", attempt,
"max_retries", s.cfg.MaxRetries,
"error", err,
)
// 如果还有重试机会,等待后重试
if attempt < s.cfg.MaxRetries {
......@@ -256,11 +285,18 @@ func (s *TokenRefreshService) refreshWithRetry(ctx context.Context, account *Acc
// Antigravity 账户:其他错误仅记录日志,不标记 error(可能是临时网络问题)
// 其他平台账户:重试失败后标记 error
if account.Platform == PlatformAntigravity {
log.Printf("[TokenRefresh] Account %d: refresh failed after %d retries: %v", account.ID, s.cfg.MaxRetries, lastErr)
slog.Warn("token_refresh.retry_exhausted_antigravity",
"account_id", account.ID,
"max_retries", s.cfg.MaxRetries,
"error", lastErr,
)
} else {
errorMsg := fmt.Sprintf("Token refresh failed after %d retries: %v", s.cfg.MaxRetries, lastErr)
if err := s.accountRepo.SetError(ctx, account.ID, errorMsg); err != nil {
log.Printf("[TokenRefresh] Failed to set error status for account %d: %v", account.ID, err)
slog.Error("token_refresh.set_error_status_failed",
"account_id", account.ID,
"error", err,
)
}
}
......
......@@ -3,9 +3,9 @@ package service
import (
"context"
"fmt"
"log"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
)
var (
......@@ -47,36 +47,36 @@ func NewTurnstileService(settingService *SettingService, verifier TurnstileVerif
func (s *TurnstileService) VerifyToken(ctx context.Context, token string, remoteIP string) error {
// 检查是否启用 Turnstile
if !s.settingService.IsTurnstileEnabled(ctx) {
log.Println("[Turnstile] Disabled, skipping verification")
logger.LegacyPrintf("service.turnstile", "%s", "[Turnstile] Disabled, skipping verification")
return nil
}
// 获取 Secret Key
secretKey := s.settingService.GetTurnstileSecretKey(ctx)
if secretKey == "" {
log.Println("[Turnstile] Secret key not configured")
logger.LegacyPrintf("service.turnstile", "%s", "[Turnstile] Secret key not configured")
return ErrTurnstileNotConfigured
}
// 如果 token 为空,返回错误
if token == "" {
log.Println("[Turnstile] Token is empty")
logger.LegacyPrintf("service.turnstile", "%s", "[Turnstile] Token is empty")
return ErrTurnstileVerificationFailed
}
log.Printf("[Turnstile] Verifying token for IP: %s", remoteIP)
logger.LegacyPrintf("service.turnstile", "[Turnstile] Verifying token for IP: %s", remoteIP)
result, err := s.verifier.VerifyToken(ctx, secretKey, token, remoteIP)
if err != nil {
log.Printf("[Turnstile] Request failed: %v", err)
logger.LegacyPrintf("service.turnstile", "[Turnstile] Request failed: %v", err)
return fmt.Errorf("send request: %w", err)
}
if !result.Success {
log.Printf("[Turnstile] Verification failed, error codes: %v", result.ErrorCodes)
logger.LegacyPrintf("service.turnstile", "[Turnstile] Verification failed, error codes: %v", result.ErrorCodes)
return ErrTurnstileVerificationFailed
}
log.Println("[Turnstile] Verification successful")
logger.LegacyPrintf("service.turnstile", "%s", "[Turnstile] Verification successful")
return nil
}
......
......@@ -5,7 +5,6 @@ import (
"database/sql"
"errors"
"fmt"
"log"
"log/slog"
"net/http"
"strings"
......@@ -15,6 +14,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/config"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
)
......@@ -82,18 +82,18 @@ func (s *UsageCleanupService) Start() {
return
}
if s.cfg != nil && !s.cfg.UsageCleanup.Enabled {
log.Printf("[UsageCleanup] not started (disabled)")
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] not started (disabled)")
return
}
if s.repo == nil || s.timingWheel == nil {
log.Printf("[UsageCleanup] not started (missing deps)")
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] not started (missing deps)")
return
}
interval := s.workerInterval()
s.startOnce.Do(func() {
s.timingWheel.ScheduleRecurring(usageCleanupWorkerName, interval, s.runOnce)
log.Printf("[UsageCleanup] started (interval=%s max_range_days=%d batch_size=%d task_timeout=%s)", interval, s.maxRangeDays(), s.batchSize(), s.taskTimeout())
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] started (interval=%s max_range_days=%d batch_size=%d task_timeout=%s)", interval, s.maxRangeDays(), s.batchSize(), s.taskTimeout())
})
}
......@@ -108,7 +108,7 @@ func (s *UsageCleanupService) Stop() {
if s.timingWheel != nil {
s.timingWheel.Cancel(usageCleanupWorkerName)
}
log.Printf("[UsageCleanup] stopped")
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] stopped")
})
}
......@@ -130,10 +130,10 @@ func (s *UsageCleanupService) CreateTask(ctx context.Context, filters UsageClean
return nil, infraerrors.BadRequest("USAGE_CLEANUP_INVALID_CREATOR", "invalid creator")
}
log.Printf("[UsageCleanup] create_task requested: operator=%d %s", createdBy, describeUsageCleanupFilters(filters))
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] create_task requested: operator=%d %s", createdBy, describeUsageCleanupFilters(filters))
sanitizeUsageCleanupFilters(&filters)
if err := s.validateFilters(filters); err != nil {
log.Printf("[UsageCleanup] create_task rejected: operator=%d err=%v %s", createdBy, err, describeUsageCleanupFilters(filters))
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] create_task rejected: operator=%d err=%v %s", createdBy, err, describeUsageCleanupFilters(filters))
return nil, err
}
......@@ -143,10 +143,10 @@ func (s *UsageCleanupService) CreateTask(ctx context.Context, filters UsageClean
CreatedBy: createdBy,
}
if err := s.repo.CreateTask(ctx, task); err != nil {
log.Printf("[UsageCleanup] create_task persist failed: operator=%d err=%v %s", createdBy, err, describeUsageCleanupFilters(filters))
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] create_task persist failed: operator=%d err=%v %s", createdBy, err, describeUsageCleanupFilters(filters))
return nil, fmt.Errorf("create cleanup task: %w", err)
}
log.Printf("[UsageCleanup] create_task persisted: task=%d operator=%d status=%s deleted_rows=%d %s", task.ID, createdBy, task.Status, task.DeletedRows, describeUsageCleanupFilters(filters))
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] create_task persisted: task=%d operator=%d status=%s deleted_rows=%d %s", task.ID, createdBy, task.Status, task.DeletedRows, describeUsageCleanupFilters(filters))
go s.runOnce()
return task, nil
}
......@@ -157,7 +157,7 @@ func (s *UsageCleanupService) runOnce() {
return
}
if !atomic.CompareAndSwapInt32(&svc.running, 0, 1) {
log.Printf("[UsageCleanup] run_once skipped: already_running=true")
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] run_once skipped: already_running=true")
return
}
defer atomic.StoreInt32(&svc.running, 0)
......@@ -171,7 +171,7 @@ func (s *UsageCleanupService) runOnce() {
task, err := svc.repo.ClaimNextPendingTask(ctx, int64(svc.taskTimeout().Seconds()))
if err != nil {
log.Printf("[UsageCleanup] claim pending task failed: %v", err)
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] claim pending task failed: %v", err)
return
}
if task == nil {
......@@ -179,7 +179,7 @@ func (s *UsageCleanupService) runOnce() {
return
}
log.Printf("[UsageCleanup] task claimed: task=%d status=%s created_by=%d deleted_rows=%d %s", task.ID, task.Status, task.CreatedBy, task.DeletedRows, describeUsageCleanupFilters(task.Filters))
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] task claimed: task=%d status=%s created_by=%d deleted_rows=%d %s", task.ID, task.Status, task.CreatedBy, task.DeletedRows, describeUsageCleanupFilters(task.Filters))
svc.executeTask(ctx, task)
}
......@@ -191,12 +191,12 @@ func (s *UsageCleanupService) executeTask(ctx context.Context, task *UsageCleanu
batchSize := s.batchSize()
deletedTotal := task.DeletedRows
start := time.Now()
log.Printf("[UsageCleanup] task started: task=%d batch_size=%d deleted_rows=%d %s", task.ID, batchSize, deletedTotal, describeUsageCleanupFilters(task.Filters))
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] task started: task=%d batch_size=%d deleted_rows=%d %s", task.ID, batchSize, deletedTotal, describeUsageCleanupFilters(task.Filters))
var batchNum int
for {
if ctx != nil && ctx.Err() != nil {
log.Printf("[UsageCleanup] task interrupted: task=%d err=%v", task.ID, ctx.Err())
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] task interrupted: task=%d err=%v", task.ID, ctx.Err())
return
}
canceled, err := s.isTaskCanceled(ctx, task.ID)
......@@ -205,7 +205,7 @@ func (s *UsageCleanupService) executeTask(ctx context.Context, task *UsageCleanu
return
}
if canceled {
log.Printf("[UsageCleanup] task canceled: task=%d deleted_rows=%d duration=%s", task.ID, deletedTotal, time.Since(start))
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] task canceled: task=%d deleted_rows=%d duration=%s", task.ID, deletedTotal, time.Since(start))
return
}
......@@ -214,7 +214,7 @@ func (s *UsageCleanupService) executeTask(ctx context.Context, task *UsageCleanu
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// 任务被中断(例如服务停止/超时),保持 running 状态,后续通过 stale reclaim 续跑。
log.Printf("[UsageCleanup] task interrupted: task=%d err=%v", task.ID, err)
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] task interrupted: task=%d err=%v", task.ID, err)
return
}
s.markTaskFailed(task.ID, deletedTotal, err)
......@@ -224,12 +224,12 @@ func (s *UsageCleanupService) executeTask(ctx context.Context, task *UsageCleanu
if deleted > 0 {
updateCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
if err := s.repo.UpdateTaskProgress(updateCtx, task.ID, deletedTotal); err != nil {
log.Printf("[UsageCleanup] task progress update failed: task=%d deleted_rows=%d err=%v", task.ID, deletedTotal, err)
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] task progress update failed: task=%d deleted_rows=%d err=%v", task.ID, deletedTotal, err)
}
cancel()
}
if batchNum <= 3 || batchNum%20 == 0 || deleted < int64(batchSize) {
log.Printf("[UsageCleanup] task batch done: task=%d batch=%d deleted=%d deleted_total=%d", task.ID, batchNum, deleted, deletedTotal)
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] task batch done: task=%d batch=%d deleted=%d deleted_total=%d", task.ID, batchNum, deleted, deletedTotal)
}
if deleted == 0 || deleted < int64(batchSize) {
break
......@@ -239,16 +239,16 @@ func (s *UsageCleanupService) executeTask(ctx context.Context, task *UsageCleanu
updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := s.repo.MarkTaskSucceeded(updateCtx, task.ID, deletedTotal); err != nil {
log.Printf("[UsageCleanup] update task succeeded failed: task=%d err=%v", task.ID, err)
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] update task succeeded failed: task=%d err=%v", task.ID, err)
} else {
log.Printf("[UsageCleanup] task succeeded: task=%d deleted_rows=%d duration=%s", task.ID, deletedTotal, time.Since(start))
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] task succeeded: task=%d deleted_rows=%d duration=%s", task.ID, deletedTotal, time.Since(start))
}
if s.dashboard != nil {
if err := s.dashboard.TriggerRecomputeRange(task.Filters.StartTime, task.Filters.EndTime); err != nil {
log.Printf("[UsageCleanup] trigger dashboard recompute failed: task=%d err=%v", task.ID, err)
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] trigger dashboard recompute failed: task=%d err=%v", task.ID, err)
} else {
log.Printf("[UsageCleanup] trigger dashboard recompute: task=%d start=%s end=%s", task.ID, task.Filters.StartTime.UTC().Format(time.RFC3339), task.Filters.EndTime.UTC().Format(time.RFC3339))
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] trigger dashboard recompute: task=%d start=%s end=%s", task.ID, task.Filters.StartTime.UTC().Format(time.RFC3339), task.Filters.EndTime.UTC().Format(time.RFC3339))
}
}
}
......@@ -258,11 +258,11 @@ func (s *UsageCleanupService) markTaskFailed(taskID int64, deletedRows int64, er
if len(msg) > 500 {
msg = msg[:500]
}
log.Printf("[UsageCleanup] task failed: task=%d deleted_rows=%d err=%s", taskID, deletedRows, msg)
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] task failed: task=%d deleted_rows=%d err=%s", taskID, deletedRows, msg)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if updateErr := s.repo.MarkTaskFailed(ctx, taskID, deletedRows, msg); updateErr != nil {
log.Printf("[UsageCleanup] update task failed failed: task=%d err=%v", taskID, updateErr)
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] update task failed failed: task=%d err=%v", taskID, updateErr)
}
}
......@@ -280,7 +280,7 @@ func (s *UsageCleanupService) isTaskCanceled(ctx context.Context, taskID int64)
return false, err
}
if status == UsageCleanupStatusCanceled {
log.Printf("[UsageCleanup] task cancel detected: task=%d", taskID)
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] task cancel detected: task=%d", taskID)
}
return status == UsageCleanupStatusCanceled, nil
}
......@@ -319,7 +319,7 @@ func (s *UsageCleanupService) CancelTask(ctx context.Context, taskID int64, canc
}
return err
}
log.Printf("[UsageCleanup] cancel_task requested: task=%d operator=%d status=%s", taskID, canceledBy, status)
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] cancel_task requested: task=%d operator=%d status=%s", taskID, canceledBy, status)
if status != UsageCleanupStatusPending && status != UsageCleanupStatusRunning {
return infraerrors.New(http.StatusConflict, "USAGE_CLEANUP_CANCEL_CONFLICT", "cleanup task cannot be canceled in current status")
}
......@@ -331,7 +331,7 @@ func (s *UsageCleanupService) CancelTask(ctx context.Context, taskID int64, canc
// 状态可能并发改变
return infraerrors.New(http.StatusConflict, "USAGE_CLEANUP_CANCEL_CONFLICT", "cleanup task cannot be canceled in current status")
}
log.Printf("[UsageCleanup] cancel_task done: task=%d operator=%d", taskID, canceledBy)
logger.LegacyPrintf("service.usage_cleanup", "[UsageCleanup] cancel_task done: task=%d operator=%d", taskID, canceledBy)
return nil
}
......
......@@ -6,6 +6,7 @@ import (
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/google/wire"
"github.com/redis/go-redis/v9"
)
......@@ -193,6 +194,13 @@ func ProvideOpsCleanupService(
return svc
}
// ProvideOpsSystemLogSink constructs the ops system-log sink over the given
// repository, starts it, and registers it as the global logger sink so that
// log output is routed into it. The sink is returned for wire injection.
//
// NOTE(review): Start() is assumed to be idempotent-safe for a single wire
// construction and presumably launches the sink's background processing —
// confirm in NewOpsSystemLogSink/Start. Registering via logger.SetSink is
// package-global mutable state; this provider must be constructed at most
// once per process.
func ProvideOpsSystemLogSink(opsRepo OpsRepository) *OpsSystemLogSink {
	sink := NewOpsSystemLogSink(opsRepo)
	sink.Start()
	logger.SetSink(sink)
	return sink
}
// ProvideSoraMediaStorage 初始化 Sora 媒体存储
func ProvideSoraMediaStorage(cfg *config.Config) *SoraMediaStorage {
return NewSoraMediaStorage(cfg)
......@@ -268,6 +276,7 @@ var ProviderSet = wire.NewSet(
NewAccountUsageService,
NewAccountTestService,
NewSettingService,
ProvideOpsSystemLogSink,
NewOpsService,
ProvideOpsMetricsCollector,
ProvideOpsAggregationService,
......
......@@ -7,11 +7,12 @@ import (
"database/sql"
"encoding/hex"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/repository"
"github.com/Wei-Shaw/sub2api/internal/service"
......@@ -103,6 +104,36 @@ type JWTConfig struct {
ExpireHour int `json:"expire_hour" yaml:"expire_hour"`
}
// Reasons reported by the admin-bootstrap decision, explaining why the
// initial admin account was (or was not) created.
const (
	adminBootstrapReasonEmptyDatabase          = "empty_database"            // no users at all: safe to create the admin
	adminBootstrapReasonAdminExists            = "admin_exists"              // an admin account is already present
	adminBootstrapReasonUsersExistWithoutAdmin = "users_exist_without_admin" // users exist but none has the admin role
)
// adminBootstrapDecision captures whether the initial admin user should be
// created and a machine-readable reason for that decision.
type adminBootstrapDecision struct {
	shouldCreate bool   // true only when the database contains no users at all
	reason       string // one of the adminBootstrapReason* constants
}
// decideAdminBootstrap determines whether the setup flow should create the
// initial admin account, based on how many users (total and admin-role)
// already exist. An admin is only bootstrapped into a completely empty
// database; any pre-existing user data suppresses creation so that existing
// credentials are never overwritten.
func decideAdminBootstrap(totalUsers, adminUsers int64) adminBootstrapDecision {
	switch {
	case adminUsers > 0:
		// An admin already exists — nothing to do.
		return adminBootstrapDecision{reason: adminBootstrapReasonAdminExists}
	case totalUsers > 0:
		// Users exist but none is an admin; skip rather than risk
		// clobbering an installation that manages roles differently.
		return adminBootstrapDecision{reason: adminBootstrapReasonUsersExistWithoutAdmin}
	default:
		// Empty database: safe to create the bootstrap admin.
		return adminBootstrapDecision{shouldCreate: true, reason: adminBootstrapReasonEmptyDatabase}
	}
}
// NeedsSetup checks if the system needs initial setup
// Uses multiple checks to prevent attackers from forcing re-setup by deleting config
func NeedsSetup() bool {
......@@ -137,7 +168,7 @@ func TestDatabaseConnection(cfg *DatabaseConfig) error {
return
}
if err := db.Close(); err != nil {
log.Printf("failed to close postgres connection: %v", err)
logger.LegacyPrintf("setup", "failed to close postgres connection: %v", err)
}
}()
......@@ -164,12 +195,12 @@ func TestDatabaseConnection(cfg *DatabaseConfig) error {
if err != nil {
return fmt.Errorf("failed to create database '%s': %w", cfg.DBName, err)
}
log.Printf("Database '%s' created successfully", cfg.DBName)
logger.LegacyPrintf("setup", "Database '%s' created successfully", cfg.DBName)
}
// Now connect to the target database to verify
if err := db.Close(); err != nil {
log.Printf("failed to close postgres connection: %v", err)
logger.LegacyPrintf("setup", "failed to close postgres connection: %v", err)
}
db = nil
......@@ -185,7 +216,7 @@ func TestDatabaseConnection(cfg *DatabaseConfig) error {
defer func() {
if err := targetDB.Close(); err != nil {
log.Printf("failed to close postgres connection: %v", err)
logger.LegacyPrintf("setup", "failed to close postgres connection: %v", err)
}
}()
......@@ -217,7 +248,7 @@ func TestRedisConnection(cfg *RedisConfig) error {
rdb := redis.NewClient(opts)
defer func() {
if err := rdb.Close(); err != nil {
log.Printf("failed to close redis client: %v", err)
logger.LegacyPrintf("setup", "failed to close redis client: %v", err)
}
}()
......@@ -245,7 +276,7 @@ func Install(cfg *SetupConfig) error {
return fmt.Errorf("failed to generate jwt secret: %w", err)
}
cfg.JWT.Secret = secret
log.Println("Warning: JWT secret auto-generated. Consider setting a fixed secret for production.")
logger.LegacyPrintf("setup", "%s", "Warning: JWT secret auto-generated. Consider setting a fixed secret for production.")
}
// Test connections
......@@ -262,8 +293,8 @@ func Install(cfg *SetupConfig) error {
return fmt.Errorf("database initialization failed: %w", err)
}
// Create admin user
if err := createAdminUser(cfg); err != nil {
// Create admin user (only when database is empty and no admin exists).
if _, _, err := createAdminUser(cfg); err != nil {
return fmt.Errorf("admin user creation failed: %w", err)
}
......@@ -300,7 +331,7 @@ func initializeDatabase(cfg *SetupConfig) error {
defer func() {
if err := db.Close(); err != nil {
log.Printf("failed to close postgres connection: %v", err)
logger.LegacyPrintf("setup", "failed to close postgres connection: %v", err)
}
}()
......@@ -309,7 +340,7 @@ func initializeDatabase(cfg *SetupConfig) error {
return repository.ApplyMigrations(migrationCtx, db)
}
func createAdminUser(cfg *SetupConfig) error {
func createAdminUser(cfg *SetupConfig) (bool, string, error) {
dsn := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
cfg.Database.Host, cfg.Database.Port, cfg.Database.User,
......@@ -318,12 +349,12 @@ func createAdminUser(cfg *SetupConfig) error {
db, err := sql.Open("postgres", dsn)
if err != nil {
return err
return false, "", err
}
defer func() {
if err := db.Close(); err != nil {
log.Printf("failed to close postgres connection: %v", err)
logger.LegacyPrintf("setup", "failed to close postgres connection: %v", err)
}
}()
......@@ -331,13 +362,27 @@ func createAdminUser(cfg *SetupConfig) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Check if admin already exists
var count int64
if err := db.QueryRowContext(ctx, "SELECT COUNT(1) FROM users WHERE role = $1", service.RoleAdmin).Scan(&count); err != nil {
return err
var totalUsers int64
if err := db.QueryRowContext(ctx, "SELECT COUNT(1) FROM users").Scan(&totalUsers); err != nil {
return false, "", err
}
var adminUsers int64
if err := db.QueryRowContext(ctx, "SELECT COUNT(1) FROM users WHERE role = $1", service.RoleAdmin).Scan(&adminUsers); err != nil {
return false, "", err
}
decision := decideAdminBootstrap(totalUsers, adminUsers)
if !decision.shouldCreate {
return false, decision.reason, nil
}
if strings.TrimSpace(cfg.Admin.Password) == "" {
password, genErr := generateSecret(16)
if genErr != nil {
return false, "", fmt.Errorf("failed to generate admin password: %w", genErr)
}
if count > 0 {
return nil // Admin already exists
cfg.Admin.Password = password
fmt.Printf("Generated admin password (one-time): %s\n", cfg.Admin.Password)
fmt.Println("IMPORTANT: Save this password! It will not be shown again.")
}
admin := &service.User{
......@@ -351,7 +396,7 @@ func createAdminUser(cfg *SetupConfig) error {
}
if err := admin.SetPassword(cfg.Admin.Password); err != nil {
return err
return false, "", err
}
_, err = db.ExecContext(
......@@ -367,7 +412,10 @@ func createAdminUser(cfg *SetupConfig) error {
admin.CreatedAt,
admin.UpdatedAt,
)
return err
if err != nil {
return false, "", err
}
return true, decision.reason, nil
}
func writeConfigFile(cfg *SetupConfig) error {
......@@ -476,8 +524,8 @@ func getEnvIntOrDefault(key string, defaultValue int) int {
// AutoSetupFromEnv performs automatic setup using environment variables
// This is designed for Docker deployment where all config is passed via env vars
//
// NOTE(review): this span is a merge-diff rendering, not compilable source.
// Old log.Println/log.Printf lines and their logger.LegacyPrintf replacements
// both appear, and "......@@" markers elide unchanged lines. The comments
// below describe the visible flow; confirm against the merged file.
func AutoSetupFromEnv() error {
log.Println("Auto setup enabled, configuring from environment variables...")
log.Printf("Data directory: %s", GetDataDir())
// New logging path: component-tagged legacy-style printf via the logger package.
logger.LegacyPrintf("setup", "%s", "Auto setup enabled, configuring from environment variables...")
logger.LegacyPrintf("setup", "Data directory: %s", GetDataDir())
// Get timezone from TZ or TIMEZONE env var (TZ is standard for Docker)
tz := getEnvOrDefault("TZ", "")
......@@ -525,61 +573,62 @@ func AutoSetupFromEnv() error {
return fmt.Errorf("failed to generate jwt secret: %w", err)
}
cfg.JWT.Secret = secret
log.Println("Warning: JWT secret auto-generated. Consider setting a fixed secret for production.")
}
// Generate admin password if not provided
if cfg.Admin.Password == "" {
password, err := generateSecret(16)
if err != nil {
return fmt.Errorf("failed to generate admin password: %w", err)
}
cfg.Admin.Password = password
// Printed to stdout (not the logger) so the one-time password is visible to the operator.
fmt.Printf("Generated admin password (one-time): %s\n", cfg.Admin.Password)
fmt.Println("IMPORTANT: Save this password! It will not be shown again.")
logger.LegacyPrintf("setup", "%s", "Warning: JWT secret auto-generated. Consider setting a fixed secret for production.")
}
// Test database connection
log.Println("Testing database connection...")
logger.LegacyPrintf("setup", "%s", "Testing database connection...")
if err := TestDatabaseConnection(&cfg.Database); err != nil {
return fmt.Errorf("database connection failed: %w", err)
}
log.Println("Database connection successful")
logger.LegacyPrintf("setup", "%s", "Database connection successful")
// Test Redis connection
log.Println("Testing Redis connection...")
logger.LegacyPrintf("setup", "%s", "Testing Redis connection...")
if err := TestRedisConnection(&cfg.Redis); err != nil {
return fmt.Errorf("redis connection failed: %w", err)
}
log.Println("Redis connection successful")
logger.LegacyPrintf("setup", "%s", "Redis connection successful")
// Initialize database
log.Println("Initializing database...")
logger.LegacyPrintf("setup", "%s", "Initializing database...")
if err := initializeDatabase(cfg); err != nil {
return fmt.Errorf("database initialization failed: %w", err)
}
log.Println("Database initialized successfully")
logger.LegacyPrintf("setup", "%s", "Database initialized successfully")
// Create admin user
log.Println("Creating admin user...")
if err := createAdminUser(cfg); err != nil {
logger.LegacyPrintf("setup", "%s", "Creating admin user...")
// New createAdminUser contract: (created bool, reason string, err error).
// "created" is false when bootstrap was skipped; "reason" says why.
created, reason, err := createAdminUser(cfg)
if err != nil {
return fmt.Errorf("admin user creation failed: %w", err)
}
log.Printf("Admin user created: %s", cfg.Admin.Email)
if created {
logger.LegacyPrintf("setup", "Admin user created: %s", cfg.Admin.Email)
} else {
// Map each skip reason to an operator-facing explanation.
switch reason {
case adminBootstrapReasonAdminExists:
logger.LegacyPrintf("setup", "%s", "Admin user already exists, skipping admin bootstrap")
case adminBootstrapReasonUsersExistWithoutAdmin:
logger.LegacyPrintf("setup", "%s", "Database already has user data; skipping auto admin bootstrap to avoid password overwrite")
default:
logger.LegacyPrintf("setup", "%s", "Admin bootstrap skipped")
}
}
// Write config file
log.Println("Writing configuration file...")
logger.LegacyPrintf("setup", "%s", "Writing configuration file...")
if err := writeConfigFile(cfg); err != nil {
return fmt.Errorf("config file creation failed: %w", err)
}
log.Println("Configuration file created")
logger.LegacyPrintf("setup", "%s", "Configuration file created")
// Create installation lock file
if err := createInstallLock(); err != nil {
return fmt.Errorf("failed to create install lock: %w", err)
}
log.Println("Installation lock created")
logger.LegacyPrintf("setup", "%s", "Installation lock created")
log.Println("Auto setup completed successfully!")
logger.LegacyPrintf("setup", "%s", "Auto setup completed successfully!")
return nil
}
package setup
import "testing"
// TestDecideAdminBootstrap verifies the bootstrap decision table: an admin
// is auto-created only on a completely empty database, and each skip case
// reports its distinct reason constant.
func TestDecideAdminBootstrap(t *testing.T) {
	t.Parallel()

	cases := []struct {
		name       string
		total      int64
		admins     int64
		wantCreate bool
		wantReason string
	}{
		{
			name:       "empty database should create admin",
			total:      0,
			admins:     0,
			wantCreate: true,
			wantReason: adminBootstrapReasonEmptyDatabase,
		},
		{
			name:       "admin exists should skip",
			total:      10,
			admins:     1,
			wantCreate: false,
			wantReason: adminBootstrapReasonAdminExists,
		},
		{
			name:       "users exist without admin should skip",
			total:      5,
			admins:     0,
			wantCreate: false,
			wantReason: adminBootstrapReasonUsersExistWithoutAdmin,
		},
	}

	for _, c := range cases {
		c := c // shadow for parallel subtests (pre-Go 1.22 loop-var semantics)
		t.Run(c.name, func(t *testing.T) {
			t.Parallel()
			decision := decideAdminBootstrap(c.total, c.admins)
			if decision.shouldCreate != c.wantCreate {
				t.Fatalf("shouldCreate=%v, want %v", decision.shouldCreate, c.wantCreate)
			}
			if decision.reason != c.wantReason {
				t.Fatalf("reason=%q, want %q", decision.reason, c.wantReason)
			}
		})
	}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment