Unverified Commit 0f8d42c5 authored by Wesley Liddick's avatar Wesley Liddick Committed by GitHub
Browse files

Merge pull request #327 from mt21625457/main

feat(usage): 添加清理任务与统计过滤
parents 03c75787 2a94cc76
......@@ -1254,11 +1254,11 @@ func (r *stubUsageLogRepo) GetDashboardStats(ctx context.Context) (*usagestats.D
return nil, errors.New("not implemented")
}
func (r *stubUsageLogRepo) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool) ([]usagestats.TrendDataPoint, error) {
func (r *stubUsageLogRepo) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool, billingType *int8) ([]usagestats.TrendDataPoint, error) {
return nil, errors.New("not implemented")
}
func (r *stubUsageLogRepo) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool) ([]usagestats.ModelStat, error) {
func (r *stubUsageLogRepo) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool, billingType *int8) ([]usagestats.ModelStat, error) {
return nil, errors.New("not implemented")
}
......
......@@ -354,6 +354,9 @@ func registerUsageRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
usage.GET("/stats", h.Admin.Usage.Stats)
usage.GET("/search-users", h.Admin.Usage.SearchUsers)
usage.GET("/search-api-keys", h.Admin.Usage.SearchAPIKeys)
usage.GET("/cleanup-tasks", h.Admin.Usage.ListCleanupTasks)
usage.POST("/cleanup-tasks", h.Admin.Usage.CreateCleanupTask)
usage.POST("/cleanup-tasks/:id/cancel", h.Admin.Usage.CancelCleanupTask)
}
}
......
......@@ -32,8 +32,8 @@ type UsageLogRepository interface {
// Admin dashboard stats
GetDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error)
GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool) ([]usagestats.TrendDataPoint, error)
GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool) ([]usagestats.ModelStat, error)
GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool, billingType *int8) ([]usagestats.TrendDataPoint, error)
GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool, billingType *int8) ([]usagestats.ModelStat, error)
GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error)
GetUserUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.UserUsageTrendPoint, error)
GetBatchUserUsageStats(ctx context.Context, userIDs []int64) (map[int64]*usagestats.BatchUserUsageStats, error)
......@@ -272,7 +272,7 @@ func (s *AccountUsageService) getGeminiUsage(ctx context.Context, account *Accou
}
dayStart := geminiDailyWindowStart(now)
stats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, dayStart, now, 0, 0, account.ID, 0, nil)
stats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, dayStart, now, 0, 0, account.ID, 0, nil, nil)
if err != nil {
return nil, fmt.Errorf("get gemini usage stats failed: %w", err)
}
......@@ -294,7 +294,7 @@ func (s *AccountUsageService) getGeminiUsage(ctx context.Context, account *Accou
// Minute window (RPM) - fixed-window approximation: current minute [truncate(now), truncate(now)+1m)
minuteStart := now.Truncate(time.Minute)
minuteResetAt := minuteStart.Add(time.Minute)
minuteStats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, minuteStart, now, 0, 0, account.ID, 0, nil)
minuteStats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, minuteStart, now, 0, 0, account.ID, 0, nil, nil)
if err != nil {
return nil, fmt.Errorf("get gemini minute usage stats failed: %w", err)
}
......
......@@ -20,12 +20,16 @@ var (
// ErrDashboardBackfillDisabled 当配置禁用回填时返回。
ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用")
// ErrDashboardBackfillTooLarge 当回填跨度超过限制时返回。
ErrDashboardBackfillTooLarge = errors.New("回填时间跨度过大")
ErrDashboardBackfillTooLarge = errors.New("回填时间跨度过大")
errDashboardAggregationRunning = errors.New("聚合作业正在运行")
)
// DashboardAggregationRepository 定义仪表盘预聚合仓储接口。
type DashboardAggregationRepository interface {
AggregateRange(ctx context.Context, start, end time.Time) error
// RecomputeRange 重新计算指定时间范围内的聚合数据(包含活跃用户等派生表)。
// 设计目的:当 usage_logs 被批量删除/回滚后,确保聚合表可恢复一致性。
RecomputeRange(ctx context.Context, start, end time.Time) error
GetAggregationWatermark(ctx context.Context) (time.Time, error)
UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error
CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error
......@@ -112,6 +116,41 @@ func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) erro
return nil
}
// TriggerRecomputeRange 触发指定范围的重新计算(异步)。
// 与 TriggerBackfill 不同:
// - 不依赖 backfill_enabled(这是内部一致性修复)
// - 不更新 watermark(避免影响正常增量聚合游标)
func (s *DashboardAggregationService) TriggerRecomputeRange(start, end time.Time) error {
if s == nil || s.repo == nil {
return errors.New("聚合服务未初始化")
}
if !s.cfg.Enabled {
return errors.New("聚合服务已禁用")
}
if !end.After(start) {
return errors.New("重新计算时间范围无效")
}
go func() {
const maxRetries = 3
for i := 0; i < maxRetries; i++ {
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
err := s.recomputeRange(ctx, start, end)
cancel()
if err == nil {
return
}
if !errors.Is(err, errDashboardAggregationRunning) {
log.Printf("[DashboardAggregation] 重新计算失败: %v", err)
return
}
time.Sleep(5 * time.Second)
}
log.Printf("[DashboardAggregation] 重新计算放弃: 聚合作业持续占用")
}()
return nil
}
func (s *DashboardAggregationService) recomputeRecentDays() {
days := s.cfg.RecomputeDays
if days <= 0 {
......@@ -128,6 +167,24 @@ func (s *DashboardAggregationService) recomputeRecentDays() {
}
}
func (s *DashboardAggregationService) recomputeRange(ctx context.Context, start, end time.Time) error {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return errDashboardAggregationRunning
}
defer atomic.StoreInt32(&s.running, 0)
jobStart := time.Now().UTC()
if err := s.repo.RecomputeRange(ctx, start, end); err != nil {
return err
}
log.Printf("[DashboardAggregation] 重新计算完成 (start=%s end=%s duration=%s)",
start.UTC().Format(time.RFC3339),
end.UTC().Format(time.RFC3339),
time.Since(jobStart).String(),
)
return nil
}
func (s *DashboardAggregationService) runScheduledAggregation() {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return
......@@ -179,7 +236,7 @@ func (s *DashboardAggregationService) runScheduledAggregation() {
func (s *DashboardAggregationService) backfillRange(ctx context.Context, start, end time.Time) error {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return errors.New("聚合作业正在运行")
return errDashboardAggregationRunning
}
defer atomic.StoreInt32(&s.running, 0)
......
......@@ -27,6 +27,10 @@ func (s *dashboardAggregationRepoTestStub) AggregateRange(ctx context.Context, s
return s.aggregateErr
}
func (s *dashboardAggregationRepoTestStub) RecomputeRange(ctx context.Context, start, end time.Time) error {
return s.AggregateRange(ctx, start, end)
}
func (s *dashboardAggregationRepoTestStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
return s.watermark, nil
}
......
......@@ -124,16 +124,16 @@ func (s *DashboardService) GetDashboardStats(ctx context.Context) (*usagestats.D
return stats, nil
}
func (s *DashboardService) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool) ([]usagestats.TrendDataPoint, error) {
trend, err := s.usageRepo.GetUsageTrendWithFilters(ctx, startTime, endTime, granularity, userID, apiKeyID, accountID, groupID, model, stream)
func (s *DashboardService) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool, billingType *int8) ([]usagestats.TrendDataPoint, error) {
trend, err := s.usageRepo.GetUsageTrendWithFilters(ctx, startTime, endTime, granularity, userID, apiKeyID, accountID, groupID, model, stream, billingType)
if err != nil {
return nil, fmt.Errorf("get usage trend with filters: %w", err)
}
return trend, nil
}
func (s *DashboardService) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool) ([]usagestats.ModelStat, error) {
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, startTime, endTime, userID, apiKeyID, accountID, groupID, stream)
func (s *DashboardService) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool, billingType *int8) ([]usagestats.ModelStat, error) {
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, startTime, endTime, userID, apiKeyID, accountID, groupID, stream, billingType)
if err != nil {
return nil, fmt.Errorf("get model stats with filters: %w", err)
}
......
......@@ -101,6 +101,10 @@ func (s *dashboardAggregationRepoStub) AggregateRange(ctx context.Context, start
return nil
}
func (s *dashboardAggregationRepoStub) RecomputeRange(ctx context.Context, start, end time.Time) error {
return nil
}
func (s *dashboardAggregationRepoStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
if s.err != nil {
return time.Time{}, s.err
......
......@@ -190,7 +190,7 @@ func (s *RateLimitService) PreCheckUsage(ctx context.Context, account *Account,
start := geminiDailyWindowStart(now)
totals, ok := s.getGeminiUsageTotals(account.ID, start, now)
if !ok {
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, start, now, 0, 0, account.ID, 0, nil)
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, start, now, 0, 0, account.ID, 0, nil, nil)
if err != nil {
return true, err
}
......@@ -237,7 +237,7 @@ func (s *RateLimitService) PreCheckUsage(ctx context.Context, account *Account,
if limit > 0 {
start := now.Truncate(time.Minute)
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, start, now, 0, 0, account.ID, 0, nil)
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, start, now, 0, 0, account.ID, 0, nil, nil)
if err != nil {
return true, err
}
......
package service
import (
"context"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
)
const (
UsageCleanupStatusPending = "pending"
UsageCleanupStatusRunning = "running"
UsageCleanupStatusSucceeded = "succeeded"
UsageCleanupStatusFailed = "failed"
UsageCleanupStatusCanceled = "canceled"
)
// UsageCleanupFilters 定义清理任务过滤条件
// 时间范围为必填,其他字段可选
// JSON 序列化用于存储任务参数
//
// start_time/end_time 使用 RFC3339 时间格式
// 以 UTC 或用户时区解析后的时间为准
//
// 说明:
// - nil 表示未设置该过滤条件
// - 过滤条件均为精确匹配
type UsageCleanupFilters struct {
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
UserID *int64 `json:"user_id,omitempty"`
APIKeyID *int64 `json:"api_key_id,omitempty"`
AccountID *int64 `json:"account_id,omitempty"`
GroupID *int64 `json:"group_id,omitempty"`
Model *string `json:"model,omitempty"`
Stream *bool `json:"stream,omitempty"`
BillingType *int8 `json:"billing_type,omitempty"`
}
// UsageCleanupTask 表示使用记录清理任务
// 状态包含 pending/running/succeeded/failed/canceled
type UsageCleanupTask struct {
ID int64
Status string
Filters UsageCleanupFilters
CreatedBy int64
DeletedRows int64
ErrorMsg *string
CanceledBy *int64
CanceledAt *time.Time
StartedAt *time.Time
FinishedAt *time.Time
CreatedAt time.Time
UpdatedAt time.Time
}
// UsageCleanupRepository 定义清理任务持久层接口
type UsageCleanupRepository interface {
CreateTask(ctx context.Context, task *UsageCleanupTask) error
ListTasks(ctx context.Context, params pagination.PaginationParams) ([]UsageCleanupTask, *pagination.PaginationResult, error)
// ClaimNextPendingTask 抢占下一条可执行任务:
// - 优先 pending
// - 若 running 超过 staleRunningAfterSeconds(可能由于进程退出/崩溃/超时),允许重新抢占继续执行
ClaimNextPendingTask(ctx context.Context, staleRunningAfterSeconds int64) (*UsageCleanupTask, error)
// GetTaskStatus 查询任务状态;若不存在返回 sql.ErrNoRows
GetTaskStatus(ctx context.Context, taskID int64) (string, error)
// UpdateTaskProgress 更新任务进度(deleted_rows)用于断点续跑/展示
UpdateTaskProgress(ctx context.Context, taskID int64, deletedRows int64) error
// CancelTask 将任务标记为 canceled(仅允许 pending/running)
CancelTask(ctx context.Context, taskID int64, canceledBy int64) (bool, error)
MarkTaskSucceeded(ctx context.Context, taskID int64, deletedRows int64) error
MarkTaskFailed(ctx context.Context, taskID int64, deletedRows int64, errorMsg string) error
DeleteUsageLogsBatch(ctx context.Context, filters UsageCleanupFilters, limit int) (int64, error)
}
package service
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
)
const (
usageCleanupWorkerName = "usage_cleanup_worker"
)
// UsageCleanupService 负责创建与执行使用记录清理任务
type UsageCleanupService struct {
repo UsageCleanupRepository
timingWheel *TimingWheelService
dashboard *DashboardAggregationService
cfg *config.Config
running int32
startOnce sync.Once
stopOnce sync.Once
workerCtx context.Context
workerCancel context.CancelFunc
}
func NewUsageCleanupService(repo UsageCleanupRepository, timingWheel *TimingWheelService, dashboard *DashboardAggregationService, cfg *config.Config) *UsageCleanupService {
workerCtx, workerCancel := context.WithCancel(context.Background())
return &UsageCleanupService{
repo: repo,
timingWheel: timingWheel,
dashboard: dashboard,
cfg: cfg,
workerCtx: workerCtx,
workerCancel: workerCancel,
}
}
func describeUsageCleanupFilters(filters UsageCleanupFilters) string {
var parts []string
parts = append(parts, "start="+filters.StartTime.UTC().Format(time.RFC3339))
parts = append(parts, "end="+filters.EndTime.UTC().Format(time.RFC3339))
if filters.UserID != nil {
parts = append(parts, fmt.Sprintf("user_id=%d", *filters.UserID))
}
if filters.APIKeyID != nil {
parts = append(parts, fmt.Sprintf("api_key_id=%d", *filters.APIKeyID))
}
if filters.AccountID != nil {
parts = append(parts, fmt.Sprintf("account_id=%d", *filters.AccountID))
}
if filters.GroupID != nil {
parts = append(parts, fmt.Sprintf("group_id=%d", *filters.GroupID))
}
if filters.Model != nil {
parts = append(parts, "model="+strings.TrimSpace(*filters.Model))
}
if filters.Stream != nil {
parts = append(parts, fmt.Sprintf("stream=%t", *filters.Stream))
}
if filters.BillingType != nil {
parts = append(parts, fmt.Sprintf("billing_type=%d", *filters.BillingType))
}
return strings.Join(parts, " ")
}
func (s *UsageCleanupService) Start() {
if s == nil {
return
}
if s.cfg != nil && !s.cfg.UsageCleanup.Enabled {
log.Printf("[UsageCleanup] not started (disabled)")
return
}
if s.repo == nil || s.timingWheel == nil {
log.Printf("[UsageCleanup] not started (missing deps)")
return
}
interval := s.workerInterval()
s.startOnce.Do(func() {
s.timingWheel.ScheduleRecurring(usageCleanupWorkerName, interval, s.runOnce)
log.Printf("[UsageCleanup] started (interval=%s max_range_days=%d batch_size=%d task_timeout=%s)", interval, s.maxRangeDays(), s.batchSize(), s.taskTimeout())
})
}
func (s *UsageCleanupService) Stop() {
if s == nil {
return
}
s.stopOnce.Do(func() {
if s.workerCancel != nil {
s.workerCancel()
}
if s.timingWheel != nil {
s.timingWheel.Cancel(usageCleanupWorkerName)
}
log.Printf("[UsageCleanup] stopped")
})
}
func (s *UsageCleanupService) ListTasks(ctx context.Context, params pagination.PaginationParams) ([]UsageCleanupTask, *pagination.PaginationResult, error) {
if s == nil || s.repo == nil {
return nil, nil, fmt.Errorf("cleanup service not ready")
}
return s.repo.ListTasks(ctx, params)
}
func (s *UsageCleanupService) CreateTask(ctx context.Context, filters UsageCleanupFilters, createdBy int64) (*UsageCleanupTask, error) {
if s == nil || s.repo == nil {
return nil, fmt.Errorf("cleanup service not ready")
}
if s.cfg != nil && !s.cfg.UsageCleanup.Enabled {
return nil, infraerrors.New(http.StatusServiceUnavailable, "USAGE_CLEANUP_DISABLED", "usage cleanup is disabled")
}
if createdBy <= 0 {
return nil, infraerrors.BadRequest("USAGE_CLEANUP_INVALID_CREATOR", "invalid creator")
}
log.Printf("[UsageCleanup] create_task requested: operator=%d %s", createdBy, describeUsageCleanupFilters(filters))
sanitizeUsageCleanupFilters(&filters)
if err := s.validateFilters(filters); err != nil {
log.Printf("[UsageCleanup] create_task rejected: operator=%d err=%v %s", createdBy, err, describeUsageCleanupFilters(filters))
return nil, err
}
task := &UsageCleanupTask{
Status: UsageCleanupStatusPending,
Filters: filters,
CreatedBy: createdBy,
}
if err := s.repo.CreateTask(ctx, task); err != nil {
log.Printf("[UsageCleanup] create_task persist failed: operator=%d err=%v %s", createdBy, err, describeUsageCleanupFilters(filters))
return nil, fmt.Errorf("create cleanup task: %w", err)
}
log.Printf("[UsageCleanup] create_task persisted: task=%d operator=%d status=%s deleted_rows=%d %s", task.ID, createdBy, task.Status, task.DeletedRows, describeUsageCleanupFilters(filters))
go s.runOnce()
return task, nil
}
func (s *UsageCleanupService) runOnce() {
svc := s
if svc == nil {
return
}
if !atomic.CompareAndSwapInt32(&svc.running, 0, 1) {
log.Printf("[UsageCleanup] run_once skipped: already_running=true")
return
}
defer atomic.StoreInt32(&svc.running, 0)
parent := context.Background()
if svc.workerCtx != nil {
parent = svc.workerCtx
}
ctx, cancel := context.WithTimeout(parent, svc.taskTimeout())
defer cancel()
task, err := svc.repo.ClaimNextPendingTask(ctx, int64(svc.taskTimeout().Seconds()))
if err != nil {
log.Printf("[UsageCleanup] claim pending task failed: %v", err)
return
}
if task == nil {
log.Printf("[UsageCleanup] run_once done: no_task=true")
return
}
log.Printf("[UsageCleanup] task claimed: task=%d status=%s created_by=%d deleted_rows=%d %s", task.ID, task.Status, task.CreatedBy, task.DeletedRows, describeUsageCleanupFilters(task.Filters))
svc.executeTask(ctx, task)
}
func (s *UsageCleanupService) executeTask(ctx context.Context, task *UsageCleanupTask) {
if task == nil {
return
}
batchSize := s.batchSize()
deletedTotal := task.DeletedRows
start := time.Now()
log.Printf("[UsageCleanup] task started: task=%d batch_size=%d deleted_rows=%d %s", task.ID, batchSize, deletedTotal, describeUsageCleanupFilters(task.Filters))
var batchNum int
for {
if ctx != nil && ctx.Err() != nil {
log.Printf("[UsageCleanup] task interrupted: task=%d err=%v", task.ID, ctx.Err())
return
}
canceled, err := s.isTaskCanceled(ctx, task.ID)
if err != nil {
s.markTaskFailed(task.ID, deletedTotal, err)
return
}
if canceled {
log.Printf("[UsageCleanup] task canceled: task=%d deleted_rows=%d duration=%s", task.ID, deletedTotal, time.Since(start))
return
}
batchNum++
deleted, err := s.repo.DeleteUsageLogsBatch(ctx, task.Filters, batchSize)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// 任务被中断(例如服务停止/超时),保持 running 状态,后续通过 stale reclaim 续跑。
log.Printf("[UsageCleanup] task interrupted: task=%d err=%v", task.ID, err)
return
}
s.markTaskFailed(task.ID, deletedTotal, err)
return
}
deletedTotal += deleted
if deleted > 0 {
updateCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
if err := s.repo.UpdateTaskProgress(updateCtx, task.ID, deletedTotal); err != nil {
log.Printf("[UsageCleanup] task progress update failed: task=%d deleted_rows=%d err=%v", task.ID, deletedTotal, err)
}
cancel()
}
if batchNum <= 3 || batchNum%20 == 0 || deleted < int64(batchSize) {
log.Printf("[UsageCleanup] task batch done: task=%d batch=%d deleted=%d deleted_total=%d", task.ID, batchNum, deleted, deletedTotal)
}
if deleted == 0 || deleted < int64(batchSize) {
break
}
}
updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := s.repo.MarkTaskSucceeded(updateCtx, task.ID, deletedTotal); err != nil {
log.Printf("[UsageCleanup] update task succeeded failed: task=%d err=%v", task.ID, err)
} else {
log.Printf("[UsageCleanup] task succeeded: task=%d deleted_rows=%d duration=%s", task.ID, deletedTotal, time.Since(start))
}
if s.dashboard != nil {
if err := s.dashboard.TriggerRecomputeRange(task.Filters.StartTime, task.Filters.EndTime); err != nil {
log.Printf("[UsageCleanup] trigger dashboard recompute failed: task=%d err=%v", task.ID, err)
} else {
log.Printf("[UsageCleanup] trigger dashboard recompute: task=%d start=%s end=%s", task.ID, task.Filters.StartTime.UTC().Format(time.RFC3339), task.Filters.EndTime.UTC().Format(time.RFC3339))
}
}
}
func (s *UsageCleanupService) markTaskFailed(taskID int64, deletedRows int64, err error) {
msg := strings.TrimSpace(err.Error())
if len(msg) > 500 {
msg = msg[:500]
}
log.Printf("[UsageCleanup] task failed: task=%d deleted_rows=%d err=%s", taskID, deletedRows, msg)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if updateErr := s.repo.MarkTaskFailed(ctx, taskID, deletedRows, msg); updateErr != nil {
log.Printf("[UsageCleanup] update task failed failed: task=%d err=%v", taskID, updateErr)
}
}
func (s *UsageCleanupService) isTaskCanceled(ctx context.Context, taskID int64) (bool, error) {
if s == nil || s.repo == nil {
return false, fmt.Errorf("cleanup service not ready")
}
checkCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
status, err := s.repo.GetTaskStatus(checkCtx, taskID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
return false, err
}
if status == UsageCleanupStatusCanceled {
log.Printf("[UsageCleanup] task cancel detected: task=%d", taskID)
}
return status == UsageCleanupStatusCanceled, nil
}
func (s *UsageCleanupService) validateFilters(filters UsageCleanupFilters) error {
if filters.StartTime.IsZero() || filters.EndTime.IsZero() {
return infraerrors.BadRequest("USAGE_CLEANUP_MISSING_RANGE", "start_date and end_date are required")
}
if filters.EndTime.Before(filters.StartTime) {
return infraerrors.BadRequest("USAGE_CLEANUP_INVALID_RANGE", "end_date must be after start_date")
}
maxDays := s.maxRangeDays()
if maxDays > 0 {
delta := filters.EndTime.Sub(filters.StartTime)
if delta > time.Duration(maxDays)*24*time.Hour {
return infraerrors.BadRequest("USAGE_CLEANUP_RANGE_TOO_LARGE", fmt.Sprintf("date range exceeds %d days", maxDays))
}
}
return nil
}
func (s *UsageCleanupService) CancelTask(ctx context.Context, taskID int64, canceledBy int64) error {
if s == nil || s.repo == nil {
return fmt.Errorf("cleanup service not ready")
}
if s.cfg != nil && !s.cfg.UsageCleanup.Enabled {
return infraerrors.New(http.StatusServiceUnavailable, "USAGE_CLEANUP_DISABLED", "usage cleanup is disabled")
}
if canceledBy <= 0 {
return infraerrors.BadRequest("USAGE_CLEANUP_INVALID_CANCELLER", "invalid canceller")
}
status, err := s.repo.GetTaskStatus(ctx, taskID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return infraerrors.New(http.StatusNotFound, "USAGE_CLEANUP_TASK_NOT_FOUND", "cleanup task not found")
}
return err
}
log.Printf("[UsageCleanup] cancel_task requested: task=%d operator=%d status=%s", taskID, canceledBy, status)
if status != UsageCleanupStatusPending && status != UsageCleanupStatusRunning {
return infraerrors.New(http.StatusConflict, "USAGE_CLEANUP_CANCEL_CONFLICT", "cleanup task cannot be canceled in current status")
}
ok, err := s.repo.CancelTask(ctx, taskID, canceledBy)
if err != nil {
return err
}
if !ok {
// 状态可能并发改变
return infraerrors.New(http.StatusConflict, "USAGE_CLEANUP_CANCEL_CONFLICT", "cleanup task cannot be canceled in current status")
}
log.Printf("[UsageCleanup] cancel_task done: task=%d operator=%d", taskID, canceledBy)
return nil
}
func sanitizeUsageCleanupFilters(filters *UsageCleanupFilters) {
if filters == nil {
return
}
if filters.UserID != nil && *filters.UserID <= 0 {
filters.UserID = nil
}
if filters.APIKeyID != nil && *filters.APIKeyID <= 0 {
filters.APIKeyID = nil
}
if filters.AccountID != nil && *filters.AccountID <= 0 {
filters.AccountID = nil
}
if filters.GroupID != nil && *filters.GroupID <= 0 {
filters.GroupID = nil
}
if filters.Model != nil {
model := strings.TrimSpace(*filters.Model)
if model == "" {
filters.Model = nil
} else {
filters.Model = &model
}
}
if filters.BillingType != nil && *filters.BillingType < 0 {
filters.BillingType = nil
}
}
func (s *UsageCleanupService) maxRangeDays() int {
if s == nil || s.cfg == nil {
return 31
}
if s.cfg.UsageCleanup.MaxRangeDays > 0 {
return s.cfg.UsageCleanup.MaxRangeDays
}
return 31
}
func (s *UsageCleanupService) batchSize() int {
if s == nil || s.cfg == nil {
return 5000
}
if s.cfg.UsageCleanup.BatchSize > 0 {
return s.cfg.UsageCleanup.BatchSize
}
return 5000
}
func (s *UsageCleanupService) workerInterval() time.Duration {
if s == nil || s.cfg == nil {
return 10 * time.Second
}
if s.cfg.UsageCleanup.WorkerIntervalSeconds > 0 {
return time.Duration(s.cfg.UsageCleanup.WorkerIntervalSeconds) * time.Second
}
return 10 * time.Second
}
func (s *UsageCleanupService) taskTimeout() time.Duration {
if s == nil || s.cfg == nil {
return 30 * time.Minute
}
if s.cfg.UsageCleanup.TaskTimeoutSeconds > 0 {
return time.Duration(s.cfg.UsageCleanup.TaskTimeoutSeconds) * time.Second
}
return 30 * time.Minute
}
package service
import (
"context"
"database/sql"
"errors"
"net/http"
"strings"
"sync"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
"github.com/stretchr/testify/require"
)
type cleanupDeleteResponse struct {
deleted int64
err error
}
type cleanupDeleteCall struct {
filters UsageCleanupFilters
limit int
}
type cleanupMarkCall struct {
taskID int64
deletedRows int64
errMsg string
}
type cleanupRepoStub struct {
mu sync.Mutex
created []*UsageCleanupTask
createErr error
listTasks []UsageCleanupTask
listResult *pagination.PaginationResult
listErr error
claimQueue []*UsageCleanupTask
claimErr error
deleteQueue []cleanupDeleteResponse
deleteCalls []cleanupDeleteCall
markSucceeded []cleanupMarkCall
markFailed []cleanupMarkCall
statusByID map[int64]string
statusErr error
progressCalls []cleanupMarkCall
updateErr error
cancelCalls []int64
cancelErr error
cancelResult *bool
markFailedErr error
}
type dashboardRepoStub struct {
recomputeErr error
}
func (s *dashboardRepoStub) AggregateRange(ctx context.Context, start, end time.Time) error {
return nil
}
func (s *dashboardRepoStub) RecomputeRange(ctx context.Context, start, end time.Time) error {
return s.recomputeErr
}
func (s *dashboardRepoStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
return time.Time{}, nil
}
func (s *dashboardRepoStub) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error {
return nil
}
func (s *dashboardRepoStub) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error {
return nil
}
func (s *dashboardRepoStub) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error {
return nil
}
func (s *dashboardRepoStub) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error {
return nil
}
func (s *cleanupRepoStub) CreateTask(ctx context.Context, task *UsageCleanupTask) error {
if task == nil {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
if s.createErr != nil {
return s.createErr
}
if task.ID == 0 {
task.ID = int64(len(s.created) + 1)
}
if task.CreatedAt.IsZero() {
task.CreatedAt = time.Now().UTC()
}
if task.UpdatedAt.IsZero() {
task.UpdatedAt = task.CreatedAt
}
clone := *task
s.created = append(s.created, &clone)
return nil
}
func (s *cleanupRepoStub) ListTasks(ctx context.Context, params pagination.PaginationParams) ([]UsageCleanupTask, *pagination.PaginationResult, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.listTasks, s.listResult, s.listErr
}
func (s *cleanupRepoStub) ClaimNextPendingTask(ctx context.Context, staleRunningAfterSeconds int64) (*UsageCleanupTask, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.claimErr != nil {
return nil, s.claimErr
}
if len(s.claimQueue) == 0 {
return nil, nil
}
task := s.claimQueue[0]
s.claimQueue = s.claimQueue[1:]
if s.statusByID == nil {
s.statusByID = map[int64]string{}
}
s.statusByID[task.ID] = UsageCleanupStatusRunning
return task, nil
}
func (s *cleanupRepoStub) GetTaskStatus(ctx context.Context, taskID int64) (string, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.statusErr != nil {
return "", s.statusErr
}
if s.statusByID == nil {
return "", sql.ErrNoRows
}
status, ok := s.statusByID[taskID]
if !ok {
return "", sql.ErrNoRows
}
return status, nil
}
func (s *cleanupRepoStub) UpdateTaskProgress(ctx context.Context, taskID int64, deletedRows int64) error {
s.mu.Lock()
defer s.mu.Unlock()
s.progressCalls = append(s.progressCalls, cleanupMarkCall{taskID: taskID, deletedRows: deletedRows})
if s.updateErr != nil {
return s.updateErr
}
return nil
}
func (s *cleanupRepoStub) CancelTask(ctx context.Context, taskID int64, canceledBy int64) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.cancelCalls = append(s.cancelCalls, taskID)
if s.cancelErr != nil {
return false, s.cancelErr
}
if s.cancelResult != nil {
ok := *s.cancelResult
if ok {
if s.statusByID == nil {
s.statusByID = map[int64]string{}
}
s.statusByID[taskID] = UsageCleanupStatusCanceled
}
return ok, nil
}
if s.statusByID == nil {
s.statusByID = map[int64]string{}
}
status := s.statusByID[taskID]
if status != UsageCleanupStatusPending && status != UsageCleanupStatusRunning {
return false, nil
}
s.statusByID[taskID] = UsageCleanupStatusCanceled
return true, nil
}
func (s *cleanupRepoStub) MarkTaskSucceeded(ctx context.Context, taskID int64, deletedRows int64) error {
s.mu.Lock()
defer s.mu.Unlock()
s.markSucceeded = append(s.markSucceeded, cleanupMarkCall{taskID: taskID, deletedRows: deletedRows})
if s.statusByID == nil {
s.statusByID = map[int64]string{}
}
s.statusByID[taskID] = UsageCleanupStatusSucceeded
return nil
}
func (s *cleanupRepoStub) MarkTaskFailed(ctx context.Context, taskID int64, deletedRows int64, errorMsg string) error {
s.mu.Lock()
defer s.mu.Unlock()
s.markFailed = append(s.markFailed, cleanupMarkCall{taskID: taskID, deletedRows: deletedRows, errMsg: errorMsg})
if s.statusByID == nil {
s.statusByID = map[int64]string{}
}
s.statusByID[taskID] = UsageCleanupStatusFailed
if s.markFailedErr != nil {
return s.markFailedErr
}
return nil
}
func (s *cleanupRepoStub) DeleteUsageLogsBatch(ctx context.Context, filters UsageCleanupFilters, limit int) (int64, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.deleteCalls = append(s.deleteCalls, cleanupDeleteCall{filters: filters, limit: limit})
if len(s.deleteQueue) == 0 {
return 0, nil
}
resp := s.deleteQueue[0]
s.deleteQueue = s.deleteQueue[1:]
return resp.deleted, resp.err
}
func TestUsageCleanupServiceCreateTaskSanitizeFilters(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, MaxRangeDays: 31}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
end := start.Add(24 * time.Hour)
userID := int64(-1)
apiKeyID := int64(10)
model := " gpt-4 "
billingType := int8(-2)
filters := UsageCleanupFilters{
StartTime: start,
EndTime: end,
UserID: &userID,
APIKeyID: &apiKeyID,
Model: &model,
BillingType: &billingType,
}
task, err := svc.CreateTask(context.Background(), filters, 9)
require.NoError(t, err)
require.Equal(t, UsageCleanupStatusPending, task.Status)
require.Nil(t, task.Filters.UserID)
require.NotNil(t, task.Filters.APIKeyID)
require.Equal(t, apiKeyID, *task.Filters.APIKeyID)
require.NotNil(t, task.Filters.Model)
require.Equal(t, "gpt-4", *task.Filters.Model)
require.Nil(t, task.Filters.BillingType)
require.Equal(t, int64(9), task.CreatedBy)
}
func TestUsageCleanupServiceCreateTaskInvalidCreator(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
filters := UsageCleanupFilters{
StartTime: time.Now(),
EndTime: time.Now().Add(24 * time.Hour),
}
_, err := svc.CreateTask(context.Background(), filters, 0)
require.Error(t, err)
require.Equal(t, "USAGE_CLEANUP_INVALID_CREATOR", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCreateTaskDisabled(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: false}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
filters := UsageCleanupFilters{
StartTime: time.Now(),
EndTime: time.Now().Add(24 * time.Hour),
}
_, err := svc.CreateTask(context.Background(), filters, 1)
require.Error(t, err)
require.Equal(t, http.StatusServiceUnavailable, infraerrors.Code(err))
require.Equal(t, "USAGE_CLEANUP_DISABLED", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCreateTaskRangeTooLarge(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, MaxRangeDays: 1}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
end := start.Add(48 * time.Hour)
filters := UsageCleanupFilters{StartTime: start, EndTime: end}
_, err := svc.CreateTask(context.Background(), filters, 1)
require.Error(t, err)
require.Equal(t, "USAGE_CLEANUP_RANGE_TOO_LARGE", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCreateTaskMissingRange(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
_, err := svc.CreateTask(context.Background(), UsageCleanupFilters{}, 1)
require.Error(t, err)
require.Equal(t, "USAGE_CLEANUP_MISSING_RANGE", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCreateTaskRepoError(t *testing.T) {
repo := &cleanupRepoStub{createErr: errors.New("db down")}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
filters := UsageCleanupFilters{
StartTime: time.Now(),
EndTime: time.Now().Add(24 * time.Hour),
}
_, err := svc.CreateTask(context.Background(), filters, 1)
require.Error(t, err)
require.Contains(t, err.Error(), "create cleanup task")
}
func TestUsageCleanupServiceRunOnceSuccess(t *testing.T) {
start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
end := start.Add(2 * time.Hour)
repo := &cleanupRepoStub{
claimQueue: []*UsageCleanupTask{
{ID: 5, Filters: UsageCleanupFilters{StartTime: start, EndTime: end}},
},
deleteQueue: []cleanupDeleteResponse{
{deleted: 2},
{deleted: 2},
{deleted: 1},
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2, TaskTimeoutSeconds: 30}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
svc.runOnce()
repo.mu.Lock()
defer repo.mu.Unlock()
require.Len(t, repo.deleteCalls, 3)
require.Len(t, repo.markSucceeded, 1)
require.Empty(t, repo.markFailed)
require.Equal(t, int64(5), repo.markSucceeded[0].taskID)
require.Equal(t, int64(5), repo.markSucceeded[0].deletedRows)
require.Equal(t, 2, repo.deleteCalls[0].limit)
require.Equal(t, start, repo.deleteCalls[0].filters.StartTime)
require.Equal(t, end, repo.deleteCalls[0].filters.EndTime)
}
func TestUsageCleanupServiceRunOnceClaimError(t *testing.T) {
repo := &cleanupRepoStub{claimErr: errors.New("claim failed")}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
svc.runOnce()
repo.mu.Lock()
defer repo.mu.Unlock()
require.Empty(t, repo.markSucceeded)
require.Empty(t, repo.markFailed)
}
func TestUsageCleanupServiceRunOnceAlreadyRunning(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
svc.running = 1
svc.runOnce()
}
func TestUsageCleanupServiceExecuteTaskFailed(t *testing.T) {
longMsg := strings.Repeat("x", 600)
repo := &cleanupRepoStub{
deleteQueue: []cleanupDeleteResponse{
{err: errors.New(longMsg)},
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 3}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
task := &UsageCleanupTask{
ID: 11,
Filters: UsageCleanupFilters{
StartTime: time.Now(),
EndTime: time.Now().Add(24 * time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Len(t, repo.markFailed, 1)
require.Equal(t, int64(11), repo.markFailed[0].taskID)
require.Equal(t, 500, len(repo.markFailed[0].errMsg))
}
func TestUsageCleanupServiceExecuteTaskProgressError(t *testing.T) {
repo := &cleanupRepoStub{
deleteQueue: []cleanupDeleteResponse{
{deleted: 2},
{deleted: 0},
},
updateErr: errors.New("update failed"),
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
task := &UsageCleanupTask{
ID: 8,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Len(t, repo.markSucceeded, 1)
require.Empty(t, repo.markFailed)
require.Len(t, repo.progressCalls, 1)
}
func TestUsageCleanupServiceExecuteTaskDeleteCanceled(t *testing.T) {
repo := &cleanupRepoStub{
deleteQueue: []cleanupDeleteResponse{
{err: context.Canceled},
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
task := &UsageCleanupTask{
ID: 12,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Empty(t, repo.markSucceeded)
require.Empty(t, repo.markFailed)
}
func TestUsageCleanupServiceExecuteTaskContextCanceled(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
task := &UsageCleanupTask{
ID: 9,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
svc.executeTask(ctx, task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Empty(t, repo.markSucceeded)
require.Empty(t, repo.markFailed)
require.Empty(t, repo.deleteCalls)
}
func TestUsageCleanupServiceExecuteTaskMarkFailedUpdateError(t *testing.T) {
repo := &cleanupRepoStub{
deleteQueue: []cleanupDeleteResponse{
{err: errors.New("boom")},
},
markFailedErr: errors.New("update failed"),
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
task := &UsageCleanupTask{
ID: 13,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Len(t, repo.markFailed, 1)
require.Equal(t, int64(13), repo.markFailed[0].taskID)
}
func TestUsageCleanupServiceExecuteTaskDashboardRecomputeError(t *testing.T) {
repo := &cleanupRepoStub{
deleteQueue: []cleanupDeleteResponse{
{deleted: 0},
},
}
dashboard := NewDashboardAggregationService(&dashboardRepoStub{}, nil, &config.Config{
DashboardAgg: config.DashboardAggregationConfig{Enabled: false},
})
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, dashboard, cfg)
task := &UsageCleanupTask{
ID: 14,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Len(t, repo.markSucceeded, 1)
}
func TestUsageCleanupServiceExecuteTaskDashboardRecomputeSuccess(t *testing.T) {
repo := &cleanupRepoStub{
deleteQueue: []cleanupDeleteResponse{
{deleted: 0},
},
}
dashboard := NewDashboardAggregationService(&dashboardRepoStub{}, nil, &config.Config{
DashboardAgg: config.DashboardAggregationConfig{Enabled: true},
})
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, dashboard, cfg)
task := &UsageCleanupTask{
ID: 15,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Len(t, repo.markSucceeded, 1)
}
func TestUsageCleanupServiceExecuteTaskCanceled(t *testing.T) {
repo := &cleanupRepoStub{
statusByID: map[int64]string{
3: UsageCleanupStatusCanceled,
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
task := &UsageCleanupTask{
ID: 3,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Empty(t, repo.deleteCalls)
require.Empty(t, repo.markSucceeded)
require.Empty(t, repo.markFailed)
}
func TestUsageCleanupServiceCancelTaskSuccess(t *testing.T) {
repo := &cleanupRepoStub{
statusByID: map[int64]string{
5: UsageCleanupStatusPending,
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 5, 9)
require.NoError(t, err)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Equal(t, UsageCleanupStatusCanceled, repo.statusByID[5])
require.Len(t, repo.cancelCalls, 1)
}
func TestUsageCleanupServiceCancelTaskDisabled(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: false}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 1, 2)
require.Error(t, err)
require.Equal(t, http.StatusServiceUnavailable, infraerrors.Code(err))
require.Equal(t, "USAGE_CLEANUP_DISABLED", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCancelTaskNotFound(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 999, 1)
require.Error(t, err)
require.Equal(t, http.StatusNotFound, infraerrors.Code(err))
require.Equal(t, "USAGE_CLEANUP_TASK_NOT_FOUND", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCancelTaskStatusError(t *testing.T) {
repo := &cleanupRepoStub{statusErr: errors.New("status broken")}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 7, 1)
require.Error(t, err)
require.Contains(t, err.Error(), "status broken")
}
func TestUsageCleanupServiceCancelTaskConflict(t *testing.T) {
repo := &cleanupRepoStub{
statusByID: map[int64]string{
7: UsageCleanupStatusSucceeded,
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 7, 1)
require.Error(t, err)
require.Equal(t, http.StatusConflict, infraerrors.Code(err))
require.Equal(t, "USAGE_CLEANUP_CANCEL_CONFLICT", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCancelTaskRepoConflict(t *testing.T) {
shouldCancel := false
repo := &cleanupRepoStub{
statusByID: map[int64]string{
7: UsageCleanupStatusPending,
},
cancelResult: &shouldCancel,
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 7, 1)
require.Error(t, err)
require.Equal(t, http.StatusConflict, infraerrors.Code(err))
require.Equal(t, "USAGE_CLEANUP_CANCEL_CONFLICT", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCancelTaskRepoError(t *testing.T) {
repo := &cleanupRepoStub{
statusByID: map[int64]string{
7: UsageCleanupStatusPending,
},
cancelErr: errors.New("cancel failed"),
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 7, 1)
require.Error(t, err)
require.Contains(t, err.Error(), "cancel failed")
}
func TestUsageCleanupServiceCancelTaskInvalidCanceller(t *testing.T) {
repo := &cleanupRepoStub{
statusByID: map[int64]string{
7: UsageCleanupStatusRunning,
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 7, 0)
require.Error(t, err)
require.Equal(t, "USAGE_CLEANUP_INVALID_CANCELLER", infraerrors.Reason(err))
}
func TestUsageCleanupServiceListTasks(t *testing.T) {
repo := &cleanupRepoStub{
listTasks: []UsageCleanupTask{{ID: 1}, {ID: 2}},
listResult: &pagination.PaginationResult{
Total: 2,
Page: 1,
PageSize: 20,
Pages: 1,
},
}
svc := NewUsageCleanupService(repo, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}})
tasks, result, err := svc.ListTasks(context.Background(), pagination.PaginationParams{Page: 1, PageSize: 20})
require.NoError(t, err)
require.Len(t, tasks, 2)
require.Equal(t, int64(2), result.Total)
}
func TestUsageCleanupServiceListTasksNotReady(t *testing.T) {
var nilSvc *UsageCleanupService
_, _, err := nilSvc.ListTasks(context.Background(), pagination.PaginationParams{Page: 1, PageSize: 20})
require.Error(t, err)
svc := NewUsageCleanupService(nil, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}})
_, _, err = svc.ListTasks(context.Background(), pagination.PaginationParams{Page: 1, PageSize: 20})
require.Error(t, err)
}
func TestUsageCleanupServiceDefaultsAndLifecycle(t *testing.T) {
var nilSvc *UsageCleanupService
require.Equal(t, 31, nilSvc.maxRangeDays())
require.Equal(t, 5000, nilSvc.batchSize())
require.Equal(t, 10*time.Second, nilSvc.workerInterval())
require.Equal(t, 30*time.Minute, nilSvc.taskTimeout())
nilSvc.Start()
nilSvc.Stop()
repo := &cleanupRepoStub{}
cfgDisabled := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: false}}
svcDisabled := NewUsageCleanupService(repo, nil, nil, cfgDisabled)
svcDisabled.Start()
svcDisabled.Stop()
timingWheel, err := NewTimingWheelService()
require.NoError(t, err)
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, WorkerIntervalSeconds: 5}}
svc := NewUsageCleanupService(repo, timingWheel, nil, cfg)
require.Equal(t, 5*time.Second, svc.workerInterval())
svc.Start()
svc.Stop()
cfgFallback := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svcFallback := NewUsageCleanupService(repo, timingWheel, nil, cfgFallback)
require.Equal(t, 31, svcFallback.maxRangeDays())
require.Equal(t, 5000, svcFallback.batchSize())
require.Equal(t, 10*time.Second, svcFallback.workerInterval())
svcMissingDeps := NewUsageCleanupService(nil, nil, nil, cfgFallback)
svcMissingDeps.Start()
}
func TestSanitizeUsageCleanupFiltersModelEmpty(t *testing.T) {
model := " "
apiKeyID := int64(-5)
accountID := int64(-1)
groupID := int64(-2)
filters := UsageCleanupFilters{
UserID: &apiKeyID,
APIKeyID: &apiKeyID,
AccountID: &accountID,
GroupID: &groupID,
Model: &model,
}
sanitizeUsageCleanupFilters(&filters)
require.Nil(t, filters.UserID)
require.Nil(t, filters.APIKeyID)
require.Nil(t, filters.AccountID)
require.Nil(t, filters.GroupID)
require.Nil(t, filters.Model)
}
func TestDescribeUsageCleanupFiltersAllFields(t *testing.T) {
start := time.Date(2024, 2, 1, 10, 0, 0, 0, time.UTC)
end := start.Add(2 * time.Hour)
userID := int64(1)
apiKeyID := int64(2)
accountID := int64(3)
groupID := int64(4)
model := " gpt-4 "
stream := true
billingType := int8(2)
filters := UsageCleanupFilters{
StartTime: start,
EndTime: end,
UserID: &userID,
APIKeyID: &apiKeyID,
AccountID: &accountID,
GroupID: &groupID,
Model: &model,
Stream: &stream,
BillingType: &billingType,
}
desc := describeUsageCleanupFilters(filters)
require.Equal(t, "start=2024-02-01T10:00:00Z end=2024-02-01T12:00:00Z user_id=1 api_key_id=2 account_id=3 group_id=4 model=gpt-4 stream=true billing_type=2", desc)
}
func TestUsageCleanupServiceIsTaskCanceledNotFound(t *testing.T) {
repo := &cleanupRepoStub{}
svc := NewUsageCleanupService(repo, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}})
canceled, err := svc.isTaskCanceled(context.Background(), 9)
require.NoError(t, err)
require.False(t, canceled)
}
func TestUsageCleanupServiceIsTaskCanceledError(t *testing.T) {
repo := &cleanupRepoStub{statusErr: errors.New("status err")}
svc := NewUsageCleanupService(repo, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}})
_, err := svc.isTaskCanceled(context.Background(), 9)
require.Error(t, err)
require.Contains(t, err.Error(), "status err")
}
......@@ -58,6 +58,13 @@ func ProvideDashboardAggregationService(repo DashboardAggregationRepository, tim
return svc
}
// ProvideUsageCleanupService 创建并启动使用记录清理任务服务
func ProvideUsageCleanupService(repo UsageCleanupRepository, timingWheel *TimingWheelService, dashboardAgg *DashboardAggregationService, cfg *config.Config) *UsageCleanupService {
svc := NewUsageCleanupService(repo, timingWheel, dashboardAgg, cfg)
svc.Start()
return svc
}
// ProvideAccountExpiryService creates and starts AccountExpiryService.
func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpiryService {
svc := NewAccountExpiryService(accountRepo, time.Minute)
......@@ -251,6 +258,7 @@ var ProviderSet = wire.NewSet(
ProvideAccountExpiryService,
ProvideTimingWheelService,
ProvideDashboardAggregationService,
ProvideUsageCleanupService,
ProvideDeferredService,
NewAntigravityQuotaFetcher,
NewUserAttributeService,
......
-- 兼容旧库:若尚未创建 user_allowed_groups,则确保 users.allowed_groups 存在,避免 007 迁移回填失败。
DO $$
BEGIN
IF to_regclass('public.user_allowed_groups') IS NULL THEN
IF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'users'
) THEN
ALTER TABLE users
ADD COLUMN IF NOT EXISTS allowed_groups BIGINT[] DEFAULT NULL;
END IF;
END IF;
END $$;
-- 兼容缺失 users.allowed_groups 的老库,确保 007 回填可执行。
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'users'
) THEN
IF NOT EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'users'
AND column_name = 'allowed_groups'
) THEN
IF NOT EXISTS (
SELECT 1
FROM schema_migrations
WHERE filename = '014_drop_legacy_allowed_groups.sql'
) THEN
ALTER TABLE users
ADD COLUMN IF NOT EXISTS allowed_groups BIGINT[] DEFAULT NULL;
END IF;
END IF;
END IF;
END $$;
-- 042_add_usage_cleanup_tasks.sql
-- 使用记录清理任务表
CREATE TABLE IF NOT EXISTS usage_cleanup_tasks (
id BIGSERIAL PRIMARY KEY,
status VARCHAR(20) NOT NULL,
filters JSONB NOT NULL,
created_by BIGINT NOT NULL REFERENCES users(id) ON DELETE RESTRICT,
deleted_rows BIGINT NOT NULL DEFAULT 0,
error_message TEXT,
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_usage_cleanup_tasks_status_created_at
ON usage_cleanup_tasks(status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_usage_cleanup_tasks_created_at
ON usage_cleanup_tasks(created_at DESC);
-- 043_add_usage_cleanup_cancel_audit.sql
-- usage_cleanup_tasks 取消任务审计字段
ALTER TABLE usage_cleanup_tasks
ADD COLUMN IF NOT EXISTS canceled_by BIGINT REFERENCES users(id) ON DELETE SET NULL,
ADD COLUMN IF NOT EXISTS canceled_at TIMESTAMPTZ;
CREATE INDEX IF NOT EXISTS idx_usage_cleanup_tasks_canceled_at
ON usage_cleanup_tasks(canceled_at DESC);
......@@ -251,6 +251,27 @@ dashboard_aggregation:
# 日聚合保留天数
daily_days: 730
# =============================================================================
# Usage Cleanup Task Configuration
# 使用记录清理任务配置(重启生效)
# =============================================================================
usage_cleanup:
# Enable cleanup task worker
# 启用清理任务执行器
enabled: true
# Max date range (days) per task
# 单次任务最大时间跨度(天)
max_range_days: 31
# Batch delete size
# 单批删除数量
batch_size: 5000
# Worker interval (seconds)
# 执行器轮询间隔(秒)
worker_interval_seconds: 10
# Task execution timeout (seconds)
# 单次任务最大执行时长(秒)
task_timeout_seconds: 1800
# =============================================================================
# Concurrency Wait Configuration
# 并发等待配置
......
......@@ -305,6 +305,27 @@ dashboard_aggregation:
# 日聚合保留天数
daily_days: 730
# =============================================================================
# Usage Cleanup Task Configuration
# 使用记录清理任务配置(重启生效)
# =============================================================================
usage_cleanup:
# Enable cleanup task worker
# 启用清理任务执行器
enabled: true
# Max date range (days) per task
# 单次任务最大时间跨度(天)
max_range_days: 31
# Batch delete size
# 单批删除数量
batch_size: 5000
# Worker interval (seconds)
# 执行器轮询间隔(秒)
worker_interval_seconds: 10
# Task execution timeout (seconds)
# 单次任务最大执行时长(秒)
task_timeout_seconds: 1800
# =============================================================================
# Concurrency Wait Configuration
# 并发等待配置
......
......@@ -50,6 +50,7 @@ export interface TrendParams {
account_id?: number
group_id?: number
stream?: boolean
billing_type?: number | null
}
export interface TrendResponse {
......@@ -78,6 +79,7 @@ export interface ModelStatsParams {
account_id?: number
group_id?: number
stream?: boolean
billing_type?: number | null
}
export interface ModelStatsResponse {
......
......@@ -31,6 +31,46 @@ export interface SimpleApiKey {
user_id: number
}
export interface UsageCleanupFilters {
start_time: string
end_time: string
user_id?: number
api_key_id?: number
account_id?: number
group_id?: number
model?: string | null
stream?: boolean | null
billing_type?: number | null
}
export interface UsageCleanupTask {
id: number
status: string
filters: UsageCleanupFilters
created_by: number
deleted_rows: number
error_message?: string | null
canceled_by?: number | null
canceled_at?: string | null
started_at?: string | null
finished_at?: string | null
created_at: string
updated_at: string
}
export interface CreateUsageCleanupTaskRequest {
start_date: string
end_date: string
user_id?: number
api_key_id?: number
account_id?: number
group_id?: number
model?: string | null
stream?: boolean | null
billing_type?: number | null
timezone?: string
}
export interface AdminUsageQueryParams extends UsageQueryParams {
user_id?: number
}
......@@ -108,11 +148,51 @@ export async function searchApiKeys(userId?: number, keyword?: string): Promise<
return data
}
/**
* List usage cleanup tasks (admin only)
* @param params - Query parameters for pagination
* @returns Paginated list of cleanup tasks
*/
export async function listCleanupTasks(
params: { page?: number; page_size?: number },
options?: { signal?: AbortSignal }
): Promise<PaginatedResponse<UsageCleanupTask>> {
const { data } = await apiClient.get<PaginatedResponse<UsageCleanupTask>>('/admin/usage/cleanup-tasks', {
params,
signal: options?.signal
})
return data
}
/**
* Create a usage cleanup task (admin only)
* @param payload - Cleanup task parameters
* @returns Created cleanup task
*/
export async function createCleanupTask(payload: CreateUsageCleanupTaskRequest): Promise<UsageCleanupTask> {
const { data } = await apiClient.post<UsageCleanupTask>('/admin/usage/cleanup-tasks', payload)
return data
}
/**
* Cancel a usage cleanup task (admin only)
* @param taskId - Task ID to cancel
*/
export async function cancelCleanupTask(taskId: number): Promise<{ id: number; status: string }> {
const { data } = await apiClient.post<{ id: number; status: string }>(
`/admin/usage/cleanup-tasks/${taskId}/cancel`
)
return data
}
export const adminUsageAPI = {
list,
getStats,
searchUsers,
searchApiKeys
searchApiKeys,
listCleanupTasks,
createCleanupTask,
cancelCleanupTask
}
export default adminUsageAPI
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment