Commit 5906f9ab authored by yangjianbo's avatar yangjianbo
Browse files

fix(数据层): 修复数据完整性与仓储一致性问题

## 数据完整性修复 (fix-critical-data-integrity)
- 添加 error_translate.go 统一错误转换层
- 修复 nil 输入和 NotFound 错误处理
- 增强仓储层错误一致性

## 仓储一致性修复 (fix-high-repository-consistency)
- Group schema 添加 default_validity_days 字段
- Account schema 添加 proxy edge 关联
- 新增 UsageLog ent schema 定义
- 修复 UpdateBalance/UpdateConcurrency 受影响行数校验

## 数据卫生修复 (fix-medium-data-hygiene)
- UserSubscription 添加软删除支持 (SoftDeleteMixin)
- RedeemCode/Setting 添加硬删除策略文档
- account_groups/user_allowed_groups 的 created_at 声明 timestamptz
- 停止写入 legacy users.allowed_groups 列
- 新增迁移: 011-014 (索引优化、软删除、孤立数据审计、列清理)

## 测试补充
- 添加 UserSubscription 软删除测试
- 添加迁移回归测试
- 添加 NotFound 错误测试

🤖 Generated with [Claude Code](https://claude.com/claude-code

)
Co-Authored-By: default avatarClaude Opus 4.5 <noreply@anthropic.com>
parent 820bb16c
......@@ -11,10 +11,11 @@ type Group struct {
IsExclusive bool
Status string
SubscriptionType string
DailyLimitUSD *float64
WeeklyLimitUSD *float64
MonthlyLimitUSD *float64
SubscriptionType string
DailyLimitUSD *float64
WeeklyLimitUSD *float64
MonthlyLimitUSD *float64
DefaultValidityDays int
CreatedAt time.Time
UpdatedAt time.Time
......
......@@ -9,6 +9,7 @@ import (
"strings"
"time"
dbent "github.com/Wei-Shaw/sub2api/ent"
infraerrors "github.com/Wei-Shaw/sub2api/internal/infrastructure/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
)
......@@ -72,6 +73,7 @@ type RedeemService struct {
subscriptionService *SubscriptionService
cache RedeemCache
billingCacheService *BillingCacheService
entClient *dbent.Client
}
// NewRedeemService 创建兑换码服务实例
......@@ -81,6 +83,7 @@ func NewRedeemService(
subscriptionService *SubscriptionService,
cache RedeemCache,
billingCacheService *BillingCacheService,
entClient *dbent.Client,
) *RedeemService {
return &RedeemService{
redeemRepo: redeemRepo,
......@@ -88,6 +91,7 @@ func NewRedeemService(
subscriptionService: subscriptionService,
cache: cache,
billingCacheService: billingCacheService,
entClient: entClient,
}
}
......@@ -248,9 +252,19 @@ func (s *RedeemService) Redeem(ctx context.Context, userID int64, code string) (
}
_ = user // 使用变量避免未使用错误
// 使用数据库事务保证兑换码标记与权益发放的原子性
tx, err := s.entClient.Tx(ctx)
if err != nil {
return nil, fmt.Errorf("begin transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
// 将事务放入 context,使 repository 方法能够使用同一事务
txCtx := dbent.NewTxContext(ctx, tx)
// 【关键】先标记兑换码为已使用,确保并发安全
// 利用数据库乐观锁(WHERE status = 'unused')保证原子性
if err := s.redeemRepo.Use(ctx, redeemCode.ID, userID); err != nil {
if err := s.redeemRepo.Use(txCtx, redeemCode.ID, userID); err != nil {
if errors.Is(err, ErrRedeemCodeNotFound) || errors.Is(err, ErrRedeemCodeUsed) {
return nil, ErrRedeemCodeUsed
}
......@@ -261,21 +275,13 @@ func (s *RedeemService) Redeem(ctx context.Context, userID int64, code string) (
switch redeemCode.Type {
case RedeemTypeBalance:
// 增加用户余额
if err := s.userRepo.UpdateBalance(ctx, userID, redeemCode.Value); err != nil {
if err := s.userRepo.UpdateBalance(txCtx, userID, redeemCode.Value); err != nil {
return nil, fmt.Errorf("update user balance: %w", err)
}
// 失效余额缓存
if s.billingCacheService != nil {
go func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.billingCacheService.InvalidateUserBalance(cacheCtx, userID)
}()
}
case RedeemTypeConcurrency:
// 增加用户并发数
if err := s.userRepo.UpdateConcurrency(ctx, userID, int(redeemCode.Value)); err != nil {
if err := s.userRepo.UpdateConcurrency(txCtx, userID, int(redeemCode.Value)); err != nil {
return nil, fmt.Errorf("update user concurrency: %w", err)
}
......@@ -284,7 +290,7 @@ func (s *RedeemService) Redeem(ctx context.Context, userID int64, code string) (
if validityDays <= 0 {
validityDays = 30
}
_, _, err := s.subscriptionService.AssignOrExtendSubscription(ctx, &AssignSubscriptionInput{
_, _, err := s.subscriptionService.AssignOrExtendSubscription(txCtx, &AssignSubscriptionInput{
UserID: userID,
GroupID: *redeemCode.GroupID,
ValidityDays: validityDays,
......@@ -294,20 +300,19 @@ func (s *RedeemService) Redeem(ctx context.Context, userID int64, code string) (
if err != nil {
return nil, fmt.Errorf("assign or extend subscription: %w", err)
}
// 失效订阅缓存
if s.billingCacheService != nil {
groupID := *redeemCode.GroupID
go func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID)
}()
}
default:
return nil, fmt.Errorf("unsupported redeem type: %s", redeemCode.Type)
}
// 提交事务
if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("commit transaction: %w", err)
}
// 事务提交成功后失效缓存
s.invalidateRedeemCaches(ctx, userID, redeemCode)
// 重新获取更新后的兑换码
redeemCode, err = s.redeemRepo.GetByID(ctx, redeemCode.ID)
if err != nil {
......@@ -317,6 +322,31 @@ func (s *RedeemService) Redeem(ctx context.Context, userID int64, code string) (
return redeemCode, nil
}
// invalidateRedeemCaches 失效兑换相关的缓存
func (s *RedeemService) invalidateRedeemCaches(ctx context.Context, userID int64, redeemCode *RedeemCode) {
if s.billingCacheService == nil {
return
}
switch redeemCode.Type {
case RedeemTypeBalance:
go func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.billingCacheService.InvalidateUserBalance(cacheCtx, userID)
}()
case RedeemTypeSubscription:
if redeemCode.GroupID != nil {
groupID := *redeemCode.GroupID
go func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.billingCacheService.InvalidateSubscription(cacheCtx, userID, groupID)
}()
}
}
}
// GetByID 根据ID获取兑换码
func (s *RedeemService) GetByID(ctx context.Context, id int64) (*RedeemCode, error) {
code, err := s.redeemRepo.GetByID(ctx, id)
......
......@@ -26,6 +26,7 @@ var (
ErrDailyLimitExceeded = infraerrors.TooManyRequests("DAILY_LIMIT_EXCEEDED", "daily usage limit exceeded")
ErrWeeklyLimitExceeded = infraerrors.TooManyRequests("WEEKLY_LIMIT_EXCEEDED", "weekly usage limit exceeded")
ErrMonthlyLimitExceeded = infraerrors.TooManyRequests("MONTHLY_LIMIT_EXCEEDED", "monthly usage limit exceeded")
ErrSubscriptionNilInput = infraerrors.BadRequest("SUBSCRIPTION_NIL_INPUT", "subscription input cannot be nil")
)
// SubscriptionService 订阅服务
......
-- 011_remove_duplicate_unique_indexes.sql
-- 移除重复的唯一索引
-- 这些字段在 ent schema 的 Fields() 中已声明 .Unique(),
-- 因此在 Indexes() 中再次声明 index.Fields("x").Unique() 会创建重复索引。
-- 本迁移脚本清理这些冗余索引。
-- 重复索引命名约定(由 Ent 自动生成/历史迁移遗留):
-- - 字段级 Unique() 创建的索引名: <table>_<field>_key
-- - Indexes() 中的 Unique() 创建的索引名: <table>_<field>
-- - 初始化迁移中的非唯一索引: idx_<table>_<field>
-- 仅当索引存在时才删除(幂等操作)
-- api_keys 表: key 字段
DROP INDEX IF EXISTS apikey_key;
DROP INDEX IF EXISTS api_keys_key;
DROP INDEX IF EXISTS idx_api_keys_key;
-- users 表: email 字段
DROP INDEX IF EXISTS user_email;
DROP INDEX IF EXISTS users_email;
DROP INDEX IF EXISTS idx_users_email;
-- settings 表: key 字段
DROP INDEX IF EXISTS settings_key;
DROP INDEX IF EXISTS idx_settings_key;
-- redeem_codes 表: code 字段
DROP INDEX IF EXISTS redeemcode_code;
DROP INDEX IF EXISTS redeem_codes_code;
DROP INDEX IF EXISTS idx_redeem_codes_code;
-- groups 表: name 字段
DROP INDEX IF EXISTS group_name;
DROP INDEX IF EXISTS groups_name;
DROP INDEX IF EXISTS idx_groups_name;
-- 注意: 每个字段的唯一约束仍由字段级 Unique() 创建的约束保留,
-- 如 api_keys_key_key、users_email_key 等。
-- 012: 为 user_subscriptions 表添加软删除支持
-- 任务:fix-medium-data-hygiene 1.1
-- 添加 deleted_at 字段
ALTER TABLE user_subscriptions
ADD COLUMN IF NOT EXISTS deleted_at TIMESTAMPTZ DEFAULT NULL;
-- 添加 deleted_at 索引以优化软删除查询
CREATE INDEX IF NOT EXISTS usersubscription_deleted_at
ON user_subscriptions (deleted_at);
-- 注释:与其他使用软删除的实体保持一致
COMMENT ON COLUMN user_subscriptions.deleted_at IS '软删除时间戳,NULL 表示未删除';
-- 013: 记录 users.allowed_groups 中的孤立 group_id
-- 任务:fix-medium-data-hygiene 3.1
--
-- 目的:在删除 legacy allowed_groups 列前,记录所有引用了不存在 group 的孤立记录
-- 这些记录可用于审计或后续数据修复
-- 创建审计表存储孤立的 allowed_groups 记录
CREATE TABLE IF NOT EXISTS orphan_allowed_groups_audit (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
group_id BIGINT NOT NULL,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (user_id, group_id)
);
-- 记录孤立的 group_id(存在于 users.allowed_groups 但不存在于 groups 表)
INSERT INTO orphan_allowed_groups_audit (user_id, group_id)
SELECT u.id, x.group_id
FROM users u
CROSS JOIN LATERAL unnest(u.allowed_groups) AS x(group_id)
LEFT JOIN groups g ON g.id = x.group_id
WHERE u.allowed_groups IS NOT NULL
AND g.id IS NULL
ON CONFLICT (user_id, group_id) DO NOTHING;
-- 添加索引便于查询
CREATE INDEX IF NOT EXISTS idx_orphan_allowed_groups_audit_user_id
ON orphan_allowed_groups_audit(user_id);
-- 记录迁移完成信息
COMMENT ON TABLE orphan_allowed_groups_audit IS
'审计表:记录 users.allowed_groups 中引用的不存在的 group_id,用于数据清理前的审计';
-- 014: 删除 legacy users.allowed_groups 列
-- 任务:fix-medium-data-hygiene 3.3
--
-- 前置条件:
-- - 迁移 007 已将数据回填到 user_allowed_groups 联接表
-- - 迁移 013 已记录所有孤立的 group_id 到审计表
-- - 应用代码已停止写入该列(3.2 完成)
--
-- 该列现已废弃,所有读写操作均使用 user_allowed_groups 联接表。
-- 删除 allowed_groups 列
ALTER TABLE users DROP COLUMN IF EXISTS allowed_groups;
-- 添加注释记录删除原因
COMMENT ON TABLE users IS '用户表。注:原 allowed_groups BIGINT[] 列已迁移至 user_allowed_groups 联接表';
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment