Unverified Commit ac114738 authored by Wesley Liddick's avatar Wesley Liddick Committed by GitHub
Browse files

Merge pull request #1850 from touwaeriol/feat/channel-insights

feat(monitor): channel monitor with available channels & feature flags
parents 0a80ec80 09fd83ab
package service
import (
"context"
"fmt"
"regexp"
"strings"
)
// ChannelMonitorRequestTemplateRepository 模板数据访问接口。
type ChannelMonitorRequestTemplateRepository interface {
Create(ctx context.Context, t *ChannelMonitorRequestTemplate) error
GetByID(ctx context.Context, id int64) (*ChannelMonitorRequestTemplate, error)
Update(ctx context.Context, t *ChannelMonitorRequestTemplate) error
Delete(ctx context.Context, id int64) error
List(ctx context.Context, params ChannelMonitorRequestTemplateListParams) ([]*ChannelMonitorRequestTemplate, error)
// ApplyToMonitors 把模板当前的 extra_headers / body_override_mode / body_override
// 批量覆盖到指定 monitorIDs 的监控上(同时还要求这些监控当前 template_id = id,
// 防止误覆盖未关联的监控)。monitorIDs 必须非空;空列表直接返回 0 不写库。
// 返回被覆盖的监控数量。
ApplyToMonitors(ctx context.Context, id int64, monitorIDs []int64) (int64, error)
// CountAssociatedMonitors 统计 template_id = id 的监控数(用于 UI 展示「应用到 N 个配置」)。
CountAssociatedMonitors(ctx context.Context, id int64) (int64, error)
// ListAssociatedMonitors 列出所有 template_id = id 的监控简略信息(id/name/provider/enabled)
// 给 apply picker UI 用,避免前端再做一次 list+filter。
ListAssociatedMonitors(ctx context.Context, id int64) ([]*AssociatedMonitorBrief, error)
}
// AssociatedMonitorBrief 模板关联监控的简略信息(picker / 列表展示用)。
type AssociatedMonitorBrief struct {
ID int64
Name string
Provider string
Enabled bool
}
// ChannelMonitorRequestTemplateService 模板管理 service。
type ChannelMonitorRequestTemplateService struct {
repo ChannelMonitorRequestTemplateRepository
}
// NewChannelMonitorRequestTemplateService 创建模板 service。
func NewChannelMonitorRequestTemplateService(repo ChannelMonitorRequestTemplateRepository) *ChannelMonitorRequestTemplateService {
return &ChannelMonitorRequestTemplateService{repo: repo}
}
// ---------- CRUD ----------
// List 按 provider 过滤(空串 = 全部),不分页(模板量级小)。
func (s *ChannelMonitorRequestTemplateService) List(ctx context.Context, params ChannelMonitorRequestTemplateListParams) ([]*ChannelMonitorRequestTemplate, error) {
if params.Provider != "" {
if err := validateProvider(params.Provider); err != nil {
return nil, err
}
}
return s.repo.List(ctx, params)
}
// Get 返回单个模板。
func (s *ChannelMonitorRequestTemplateService) Get(ctx context.Context, id int64) (*ChannelMonitorRequestTemplate, error) {
return s.repo.GetByID(ctx, id)
}
// Create 创建模板(会校验 headers 黑名单和 body 模式匹配)。
func (s *ChannelMonitorRequestTemplateService) Create(ctx context.Context, p ChannelMonitorRequestTemplateCreateParams) (*ChannelMonitorRequestTemplate, error) {
if err := validateTemplateCreateParams(p); err != nil {
return nil, err
}
t := &ChannelMonitorRequestTemplate{
Name: strings.TrimSpace(p.Name),
Provider: p.Provider,
Description: strings.TrimSpace(p.Description),
ExtraHeaders: emptyHeadersIfNil(p.ExtraHeaders),
BodyOverrideMode: defaultBodyMode(p.BodyOverrideMode),
BodyOverride: p.BodyOverride,
}
if err := s.repo.Create(ctx, t); err != nil {
return nil, fmt.Errorf("create template: %w", err)
}
return t, nil
}
// Update 更新模板(provider 不可改)。
func (s *ChannelMonitorRequestTemplateService) Update(ctx context.Context, id int64, p ChannelMonitorRequestTemplateUpdateParams) (*ChannelMonitorRequestTemplate, error) {
existing, err := s.repo.GetByID(ctx, id)
if err != nil {
return nil, err
}
if err := applyTemplateUpdate(existing, p); err != nil {
return nil, err
}
if err := s.repo.Update(ctx, existing); err != nil {
return nil, fmt.Errorf("update template: %w", err)
}
return existing, nil
}
// Delete 删除模板。关联监控的 template_id 会被 SET NULL,监控保留快照继续跑。
func (s *ChannelMonitorRequestTemplateService) Delete(ctx context.Context, id int64) error {
if err := s.repo.Delete(ctx, id); err != nil {
return fmt.Errorf("delete template: %w", err)
}
return nil
}
// ApplyToMonitors 把模板当前配置应用到 monitorIDs 列表里的关联监控。
// monitorIDs 必须非空且每个 id 都必须当前 template_id = id;不满足条件的会被 SQL WHERE 过滤掉。
// 返回实际被覆盖的监控数。
func (s *ChannelMonitorRequestTemplateService) ApplyToMonitors(ctx context.Context, id int64, monitorIDs []int64) (int64, error) {
if _, err := s.repo.GetByID(ctx, id); err != nil {
return 0, err
}
if len(monitorIDs) == 0 {
return 0, ErrChannelMonitorTemplateApplyEmpty
}
affected, err := s.repo.ApplyToMonitors(ctx, id, monitorIDs)
if err != nil {
return 0, fmt.Errorf("apply template to monitors: %w", err)
}
return affected, nil
}
// CountAssociatedMonitors 返回关联监控数。
func (s *ChannelMonitorRequestTemplateService) CountAssociatedMonitors(ctx context.Context, id int64) (int64, error) {
return s.repo.CountAssociatedMonitors(ctx, id)
}
// ListAssociatedMonitors 返回模板关联的所有监控简略信息。
// 给前端 apply picker 用,handler 直接吐 JSON 不再做 join。
func (s *ChannelMonitorRequestTemplateService) ListAssociatedMonitors(ctx context.Context, id int64) ([]*AssociatedMonitorBrief, error) {
if _, err := s.repo.GetByID(ctx, id); err != nil {
return nil, err
}
return s.repo.ListAssociatedMonitors(ctx, id)
}
// ---------- 校验 & 工具 ----------
// validateTemplateCreateParams 聚合 create 入参校验,避免函数超过 30 行。
func validateTemplateCreateParams(p ChannelMonitorRequestTemplateCreateParams) error {
if strings.TrimSpace(p.Name) == "" {
return ErrChannelMonitorTemplateMissingName
}
if err := validateProvider(p.Provider); err != nil {
return ErrChannelMonitorTemplateInvalidProvider
}
if err := validateBodyModeParams(p.BodyOverrideMode, p.BodyOverride); err != nil {
return err
}
if err := validateExtraHeaders(p.ExtraHeaders); err != nil {
return err
}
return nil
}
// applyTemplateUpdate 把 update params 中非 nil 字段应用到 existing 上。
func applyTemplateUpdate(existing *ChannelMonitorRequestTemplate, p ChannelMonitorRequestTemplateUpdateParams) error {
if p.Name != nil {
name := strings.TrimSpace(*p.Name)
if name == "" {
return ErrChannelMonitorTemplateMissingName
}
existing.Name = name
}
if p.Description != nil {
existing.Description = strings.TrimSpace(*p.Description)
}
if p.ExtraHeaders != nil {
if err := validateExtraHeaders(*p.ExtraHeaders); err != nil {
return err
}
existing.ExtraHeaders = emptyHeadersIfNil(*p.ExtraHeaders)
}
// BodyOverrideMode / BodyOverride 联合校验:任一变化都用「更新后的值」做校验。
newMode := existing.BodyOverrideMode
newBody := existing.BodyOverride
if p.BodyOverrideMode != nil {
newMode = *p.BodyOverrideMode
}
if p.BodyOverride != nil {
newBody = *p.BodyOverride
}
if err := validateBodyModeParams(newMode, newBody); err != nil {
return err
}
existing.BodyOverrideMode = defaultBodyMode(newMode)
existing.BodyOverride = newBody
return nil
}
// validateBodyModeParams 校验 body_override_mode 合法,且 merge/replace 模式下 body_override 非空。
func validateBodyModeParams(mode string, body map[string]any) error {
switch mode {
case "", MonitorBodyOverrideModeOff:
return nil
case MonitorBodyOverrideModeMerge, MonitorBodyOverrideModeReplace:
if len(body) == 0 {
return ErrChannelMonitorTemplateBodyRequired
}
return nil
default:
return ErrChannelMonitorTemplateInvalidBodyMode
}
}
// headerNameRegex 合法 header 名:RFC 7230 token(ASCII 可见字符减特殊符号)。
var headerNameRegex = regexp.MustCompile(`^[A-Za-z0-9!#$%&'*+\-.^_` + "`" + `|~]+$`)
// forbiddenHeaderNames hop-by-hop + HTTP 客户端自管的 header;禁止用户覆盖,
// 否则会让 Go http.Client 行为异常(双重 Content-Length、连接复用错乱等)。
var forbiddenHeaderNames = map[string]bool{
"host": true,
"content-length": true,
"content-encoding": true,
"transfer-encoding": true,
"connection": true,
}
// IsForbiddenHeaderName 对外暴露,checker 运行时也会再过滤一次做兜底。
func IsForbiddenHeaderName(name string) bool {
return forbiddenHeaderNames[strings.ToLower(strings.TrimSpace(name))]
}
// validateExtraHeaders 校验 header 名字格式 + 黑名单。保存时就拒绝非法 header,早失败。
func validateExtraHeaders(h map[string]string) error {
for k := range h {
if !headerNameRegex.MatchString(k) {
return ErrChannelMonitorTemplateHeaderInvalidName
}
if IsForbiddenHeaderName(k) {
return ErrChannelMonitorTemplateHeaderForbidden
}
}
return nil
}
// emptyHeadersIfNil 把 nil map 归一成空 map(repo 层写库时 JSONB 需要非 nil)。
func emptyHeadersIfNil(h map[string]string) map[string]string {
if h == nil {
return map[string]string{}
}
return h
}
// defaultBodyMode 空串归一为 off。
func defaultBodyMode(mode string) string {
if mode == "" {
return MonitorBodyOverrideModeOff
}
return mode
}
package service
import (
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"time"
)
// ChannelMonitorRequestTemplate 请求模板(service 层模型)。
// 作用:把一组可复用的 headers + 可选 body 覆盖配置抽出来管理,
// 被监控「应用」时以快照方式拷贝到监控本身的同名字段。
type ChannelMonitorRequestTemplate struct {
ID int64
Name string
Provider string
Description string
ExtraHeaders map[string]string
BodyOverrideMode string
BodyOverride map[string]any
CreatedAt time.Time
UpdatedAt time.Time
}
// ChannelMonitorRequestTemplateListParams 列表过滤。
type ChannelMonitorRequestTemplateListParams struct {
Provider string // 空 = 全部;非空则按 provider 过滤
}
// ChannelMonitorRequestTemplateCreateParams 创建参数。
type ChannelMonitorRequestTemplateCreateParams struct {
Name string
Provider string
Description string
ExtraHeaders map[string]string
BodyOverrideMode string
BodyOverride map[string]any
}
// ChannelMonitorRequestTemplateUpdateParams 更新参数(指针字段 = 不修改)。
// 注意 Provider 不可修改:改 provider 会让已关联监控的 body 黑名单语义错乱。
type ChannelMonitorRequestTemplateUpdateParams struct {
Name *string
Description *string
ExtraHeaders *map[string]string
BodyOverrideMode *string
BodyOverride *map[string]any
}
// 模板相关错误(命名与现有 ErrChannelMonitor* 风格保持一致)。
var (
ErrChannelMonitorTemplateNotFound = infraerrors.NotFound(
"CHANNEL_MONITOR_TEMPLATE_NOT_FOUND", "channel monitor request template not found",
)
ErrChannelMonitorTemplateInvalidProvider = infraerrors.BadRequest(
"CHANNEL_MONITOR_TEMPLATE_INVALID_PROVIDER", "template provider must be one of openai/anthropic/gemini",
)
ErrChannelMonitorTemplateMissingName = infraerrors.BadRequest(
"CHANNEL_MONITOR_TEMPLATE_MISSING_NAME", "template name is required",
)
ErrChannelMonitorTemplateInvalidBodyMode = infraerrors.BadRequest(
"CHANNEL_MONITOR_TEMPLATE_INVALID_BODY_MODE", "body_override_mode must be one of off/merge/replace",
)
ErrChannelMonitorTemplateBodyRequired = infraerrors.BadRequest(
"CHANNEL_MONITOR_TEMPLATE_BODY_REQUIRED", "body_override is required when body_override_mode is merge or replace",
)
ErrChannelMonitorTemplateHeaderForbidden = infraerrors.BadRequest(
"CHANNEL_MONITOR_TEMPLATE_HEADER_FORBIDDEN", "header name is forbidden (hop-by-hop or computed by HTTP client)",
)
ErrChannelMonitorTemplateHeaderInvalidName = infraerrors.BadRequest(
"CHANNEL_MONITOR_TEMPLATE_HEADER_INVALID_NAME", "header name contains invalid characters",
)
ErrChannelMonitorTemplateProviderMismatch = infraerrors.BadRequest(
"CHANNEL_MONITOR_TEMPLATE_PROVIDER_MISMATCH", "monitor provider does not match template provider",
)
ErrChannelMonitorTemplateApplyEmpty = infraerrors.BadRequest(
"CHANNEL_MONITOR_TEMPLATE_APPLY_EMPTY", "monitor_ids must be a non-empty array",
)
)
package service
import "time"
// MonitorBodyOverrideMode 自定义请求体处理模式。
//
// - off 使用 adapter 默认 body(忽略 BodyOverride)
// - merge adapter 默认 body 与 BodyOverride 浅合并(用户优先;
// model/messages/contents 等关键字段在 checker 黑名单内会被静默丢弃)
// - replace 完全用 BodyOverride 作为 body;跳过 challenge 校验,
// 改成 HTTP 2xx + 响应非空即视为可用(用户负责构造 body)
const (
MonitorBodyOverrideModeOff = "off"
MonitorBodyOverrideModeMerge = "merge"
MonitorBodyOverrideModeReplace = "replace"
)
// ChannelMonitor 渠道监控配置(service 层模型,不直接暴露 ent 类型)。
type ChannelMonitor struct {
ID int64
Name string
Provider string
Endpoint string
APIKey string // 解密后的明文 API Key(仅在 service 内部使用,handler 层不应直接序列化返回)
PrimaryModel string
ExtraModels []string
GroupName string
Enabled bool
IntervalSeconds int
LastCheckedAt *time.Time
CreatedBy int64
CreatedAt time.Time
UpdatedAt time.Time
// 请求自定义快照(来自模板拷贝 or 用户手填,运行时直接读取)
TemplateID *int64 // 仅用于 UI 分组 + 一键应用,运行时不用
ExtraHeaders map[string]string // 与 adapter 默认 headers 合并,用户优先
BodyOverrideMode string // off / merge / replace
BodyOverride map[string]any // 仅 mode != off 时使用
// APIKeyDecryptFailed 表示 APIKey 字段无法解密(密钥不一致或损坏)。
// 此时 APIKey 为空字符串,runner / RunCheck 必须跳过该监控并提示重填。
APIKeyDecryptFailed bool
}
// ChannelMonitorListParams 列表查询过滤参数。
type ChannelMonitorListParams struct {
Page int
PageSize int
Provider string
Enabled *bool
Search string
}
// ChannelMonitorCreateParams 创建参数。
type ChannelMonitorCreateParams struct {
Name string
Provider string
Endpoint string
APIKey string
PrimaryModel string
ExtraModels []string
GroupName string
Enabled bool
IntervalSeconds int
CreatedBy int64
TemplateID *int64
ExtraHeaders map[string]string
BodyOverrideMode string
BodyOverride map[string]any
}
// ChannelMonitorUpdateParams 更新参数(指针字段表示"未提供则不更新")。
type ChannelMonitorUpdateParams struct {
Name *string
Provider *string
Endpoint *string
APIKey *string // 空字符串表示不修改;非空字符串覆盖
PrimaryModel *string
ExtraModels *[]string
GroupName *string
Enabled *bool
IntervalSeconds *int
// 自定义快照字段:指针为 nil 表示不更新,非 nil 覆盖
// TemplateID *(*int64):用 ** 表达三态:nil=不更新;&nil=清空;&&id=设为 id。
// 简化处理:用 ClearTemplate 显式标志 + TemplateID(普通指针)
TemplateID *int64
ClearTemplate bool // true 时无视 TemplateID,把监控的 template_id 置空
ExtraHeaders *map[string]string
BodyOverrideMode *string
BodyOverride *map[string]any
}
// CheckResult 单个模型一次检测的结果。
type CheckResult struct {
Model string
Status string // operational / degraded / failed / error
LatencyMs *int
PingLatencyMs *int
Message string
CheckedAt time.Time
}
// UserMonitorView 用户只读视图:监控概览(含主模型最近状态 + 7d 可用率 + 附加模型最近状态)。
type UserMonitorView struct {
ID int64
Name string
Provider string
GroupName string
PrimaryModel string
PrimaryStatus string
PrimaryLatencyMs *int
PrimaryPingLatencyMs *int // 主模型最近一次 ping 延迟
Availability7d float64 // 0-100
ExtraModels []ExtraModelStatus
Timeline []UserMonitorTimelinePoint // 主模型最近 N 个历史点(按 checked_at DESC,最新在前)
}
// UserMonitorTimelinePoint 用户视图 timeline 单点数据(去除 message 以减小响应体)。
type UserMonitorTimelinePoint struct {
Status string `json:"status"`
LatencyMs *int `json:"latency_ms"`
PingLatencyMs *int `json:"ping_latency_ms"`
CheckedAt time.Time `json:"checked_at"`
}
// ExtraModelStatus 附加模型最近一次状态。
type ExtraModelStatus struct {
Model string
Status string
LatencyMs *int
}
// UserMonitorDetail 用户只读视图:监控详情(含全部模型 7d/15d/30d 可用率与平均延迟)。
type UserMonitorDetail struct {
ID int64
Name string
Provider string
GroupName string
Models []ModelDetail
}
// ModelDetail 单个模型的可用率/延迟统计。
type ModelDetail struct {
Model string
LatestStatus string
LatestLatencyMs *int
Availability7d float64 // 0-100
Availability15d float64
Availability30d float64
AvgLatency7dMs *int
}
// ChannelMonitorHistoryRow 历史记录入库行(service 层向 repository 提交的数据)。
type ChannelMonitorHistoryRow struct {
MonitorID int64
Model string
Status string
LatencyMs *int
PingLatencyMs *int
Message string
CheckedAt time.Time
}
// ChannelMonitorHistoryEntry 历史记录查询返回行(含 ent 主键 ID)。
type ChannelMonitorHistoryEntry struct {
ID int64
Model string
Status string
LatencyMs *int
PingLatencyMs *int
Message string
CheckedAt time.Time
}
// ChannelMonitorLatest 最近一次检测的简明信息(用于 UserMonitorView 聚合)。
type ChannelMonitorLatest struct {
Model string
Status string
LatencyMs *int
PingLatencyMs *int
CheckedAt time.Time
}
// ChannelMonitorAvailability 单个模型在某窗口内的可用率与平均延迟(用于 UserMonitorDetail 聚合)。
type ChannelMonitorAvailability struct {
Model string
WindowDays int
TotalChecks int
OperationalChecks int // operational + degraded 视为可用
AvailabilityPct float64
AvgLatencyMs *int
}
// MonitorStatusSummary 监控状态聚合(admin list 用,单次 repo 查询消除前端 N+1)。
// PrimaryStatus / PrimaryLatencyMs 描述主模型最近状态;Availability7d 是主模型 7 天可用率;
// ExtraModels 描述附加模型最近状态(用于 hover 展示)。
type MonitorStatusSummary struct {
PrimaryStatus string // 空字符串表示无历史
PrimaryLatencyMs *int
Availability7d float64 // 0-100,无历史时为 0
ExtraModels []ExtraModelStatus
}
package service
import (
"context"
"net/url"
"strings"
)
// 渠道监控参数校验与归一化辅助函数。
// 校验失败一律返回 channel_monitor_const.go 中预定义的 Err* 错误,错误信息不含具体 IP/hostname,避免泄露内网拓扑。
// validateProvider 校验 provider 字符串。
// 唯一来源于 providerAdapters:新增 provider 只需要在 channel_monitor_checker.go 注册 adapter。
func validateProvider(p string) error {
if !isSupportedProvider(p) {
return ErrChannelMonitorInvalidProvider
}
return nil
}
// validateInterval 校验 interval_seconds 范围。
func validateInterval(sec int) error {
if sec < monitorMinIntervalSeconds || sec > monitorMaxIntervalSeconds {
return ErrChannelMonitorInvalidInterval
}
return nil
}
// validateEndpoint 校验 endpoint:
// - scheme 强制 https(拒绝 http,避免明文凭证 + 部分 SSRF 利用面)
// - 必须为 origin(无 path/query/fragment),防止用户填 https://api.openai.com/v1
// 导致 joinURL 拼出 /v1/v1/chat/completions
// - hostname 不能是 localhost/metadata 等已知元数据 hostname
// - 解析所有 IP,任一落在 loopback/RFC1918/link-local/ULA 段即拒绝(防 SSRF)
//
// 错误信息不暴露具体 IP / hostname,避免泄露内网拓扑。
func validateEndpoint(ep string) error {
ep = strings.TrimSpace(ep)
if ep == "" {
return ErrChannelMonitorInvalidEndpoint
}
u, err := url.Parse(ep)
if err != nil {
return ErrChannelMonitorInvalidEndpoint
}
if u.Scheme != "https" {
return ErrChannelMonitorEndpointScheme
}
if u.Host == "" {
return ErrChannelMonitorInvalidEndpoint
}
if u.Path != "" && u.Path != "/" {
return ErrChannelMonitorEndpointPath
}
if u.RawQuery != "" || u.Fragment != "" {
return ErrChannelMonitorEndpointPath
}
hostname := u.Hostname()
ctx, cancel := context.WithTimeout(context.Background(), monitorEndpointResolveTimeout)
defer cancel()
blocked, err := isPrivateOrLoopbackHost(ctx, hostname)
if err != nil {
return ErrChannelMonitorEndpointUnreachable
}
if blocked {
return ErrChannelMonitorEndpointPrivate
}
return nil
}
// normalizeEndpoint 去除前后空白与末尾 `/`,保证存储统一为 origin。
// validateEndpoint 已确保格式合法(仅 origin),这里只做最终归一化。
func normalizeEndpoint(ep string) string {
ep = strings.TrimSpace(ep)
ep = strings.TrimRight(ep, "/")
return ep
}
// normalizeModels 去除空白、重复模型名。保留输入顺序(map 的迭代顺序无关)。
func normalizeModels(in []string) []string {
if len(in) == 0 {
return []string{}
}
seen := make(map[string]struct{}, len(in))
out := make([]string, 0, len(in))
for _, m := range in {
m = strings.TrimSpace(m)
if m == "" {
continue
}
if _, ok := seen[m]; ok {
continue
}
seen[m] = struct{}{}
out = append(out, m)
}
return out
}
......@@ -141,17 +141,23 @@ const (
// ChannelService 渠道管理服务
type ChannelService struct {
repo ChannelRepository
groupRepo GroupRepository
authCacheInvalidator APIKeyAuthCacheInvalidator
pricingService *PricingService // 用于「可用渠道」展示时回落到全局定价;可为 nil(测试场景)
cache atomic.Value // *channelCache
cacheSF singleflight.Group
}
// NewChannelService 创建渠道服务实例
func NewChannelService(repo ChannelRepository, authCacheInvalidator APIKeyAuthCacheInvalidator) *ChannelService {
// NewChannelService 创建渠道服务实例。
// pricingService 仅供 ListAvailable 在渠道未配置定价时回落到全局 LiteLLM 数据;
// 计费热路径走独立的 ModelPricingResolver,与此参数无关。可传 nil。
func NewChannelService(repo ChannelRepository, groupRepo GroupRepository, authCacheInvalidator APIKeyAuthCacheInvalidator, pricingService *PricingService) *ChannelService {
s := &ChannelService{
repo: repo,
groupRepo: groupRepo,
authCacheInvalidator: authCacheInvalidator,
pricingService: pricingService,
}
return s
}
......@@ -299,6 +305,9 @@ func (s *ChannelService) fetchChannelData(ctx context.Context) ([]Channel, map[i
}
// populateChannelCache 将渠道列表和分组平台映射填充到缓存快照中。
// 装填时对每个 Channel 统一归一化 BillingModelSource,让缓存命中的所有下游
// (gateway routing / billing / 未来任何 cache-backed 读路径)都拿到已归一化的实体,
// 避免"每个出口各自记得 normalize"反模式。
func populateChannelCache(channels []Channel, groupPlatforms map[int64]string) *channelCache {
cache := newEmptyChannelCache()
cache.groupPlatform = groupPlatforms
......@@ -306,6 +315,7 @@ func populateChannelCache(channels []Channel, groupPlatforms map[int64]string) *
cache.loadedAt = time.Now()
for i := range channels {
channels[i].normalizeBillingModelSource()
ch := &channels[i]
cache.byID[ch.ID] = ch
for _, gid := range ch.GroupIDs {
......@@ -516,14 +526,13 @@ func (s *ChannelService) ResolveChannelMappingAndRestrict(ctx context.Context, g
// resolveMapping 基于已查找的渠道信息解析模型映射。
// antigravity 分组依次尝试所有匹配平台,确保跨平台同名映射各自独立。
func resolveMapping(lk *channelLookup, groupID int64, model string) ChannelMappingResult {
// lk.channel 来自已装填的缓存,BillingModelSource 已在 populateChannelCache 阶段归一化,
// 这里无需重复兜底。
result := ChannelMappingResult{
MappedModel: model,
ChannelID: lk.channel.ID,
BillingModelSource: lk.channel.BillingModelSource,
}
if result.BillingModelSource == "" {
result.BillingModelSource = BillingModelSourceChannelMapped
}
modelLower := strings.ToLower(model)
if mapped := lookupMappingAcrossPlatforms(lk.cache, groupID, lk.platform, modelLower); mapped != "" {
......@@ -684,9 +693,7 @@ func (s *ChannelService) Create(ctx context.Context, input *CreateChannelInput)
ApplyPricingToAccountStats: input.ApplyPricingToAccountStats,
AccountStatsPricingRules: input.AccountStatsPricingRules,
}
if channel.BillingModelSource == "" {
channel.BillingModelSource = BillingModelSourceChannelMapped
}
channel.normalizeBillingModelSource()
if err := validateChannelConfig(channel.ModelPricing, channel.ModelMapping); err != nil {
return nil, err
......@@ -702,12 +709,23 @@ func (s *ChannelService) Create(ctx context.Context, input *CreateChannelInput)
}
s.invalidateCache()
return s.repo.GetByID(ctx, channel.ID)
created, err := s.repo.GetByID(ctx, channel.ID)
if err != nil {
return nil, err
}
created.normalizeBillingModelSource()
return created, nil
}
// GetByID 获取渠道详情
// GetByID 获取渠道详情。返回前统一把空 BillingModelSource 回填为 ChannelMapped,
// 让所有 handler 无需重复处理历史空值。
func (s *ChannelService) GetByID(ctx context.Context, id int64) (*Channel, error) {
return s.repo.GetByID(ctx, id)
ch, err := s.repo.GetByID(ctx, id)
if err != nil {
return nil, err
}
ch.normalizeBillingModelSource()
return ch, nil
}
// Update 更新渠道
......@@ -739,7 +757,12 @@ func (s *ChannelService) Update(ctx context.Context, id int64, input *UpdateChan
s.invalidateCache()
s.invalidateAuthCacheForGroups(ctx, oldGroupIDs, channel.GroupIDs)
return s.repo.GetByID(ctx, id)
updated, err := s.repo.GetByID(ctx, id)
if err != nil {
return nil, err
}
updated.normalizeBillingModelSource()
return updated, nil
}
// applyUpdateInput 将更新请求的字段应用到渠道实体上。
......@@ -857,7 +880,14 @@ func (s *ChannelService) Delete(ctx context.Context, id int64) error {
// List 获取渠道列表
func (s *ChannelService) List(ctx context.Context, params pagination.PaginationParams, status, search string) ([]Channel, *pagination.PaginationResult, error) {
return s.repo.List(ctx, params, status, search)
channels, res, err := s.repo.List(ctx, params, status, search)
if err != nil {
return nil, nil, err
}
for i := range channels {
channels[i].normalizeBillingModelSource()
}
return channels, res, nil
}
// modelEntry 表示一个模型模式条目(用于冲突检测)
......@@ -884,12 +914,7 @@ func conflictsBetween(a, b modelEntry) bool {
// toModelEntry 将模型名转换为 modelEntry
func toModelEntry(pattern string) modelEntry {
lower := strings.ToLower(pattern)
isWild := strings.HasSuffix(lower, "*")
prefix := lower
if isWild {
prefix = strings.TrimSuffix(lower, "*")
}
prefix, isWild := splitWildcardSuffix(strings.ToLower(pattern))
return modelEntry{pattern: pattern, prefix: prefix, wildcard: isWild}
}
......
......@@ -189,11 +189,11 @@ func (m *mockChannelAuthCacheInvalidator) InvalidateAuthCacheByGroupID(_ context
// ---------------------------------------------------------------------------
func newTestChannelService(repo *mockChannelRepository) *ChannelService {
return NewChannelService(repo, nil)
return NewChannelService(repo, nil, nil, nil)
}
func newTestChannelServiceWithAuth(repo *mockChannelRepository, auth *mockChannelAuthCacheInvalidator) *ChannelService {
return NewChannelService(repo, auth)
return NewChannelService(repo, nil, auth, nil)
}
// makeStandardRepo returns a repo that serves one active channel with anthropic pricing
......
......@@ -433,3 +433,296 @@ func TestValidateIntervals_UnboundedNotLast(t *testing.T) {
require.Contains(t, err.Error(), "unbounded")
require.Contains(t, err.Error(), "last")
}
func TestSupportedModels_ExactKeysAndPricing(t *testing.T) {
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 10, Platform: "anthropic", Models: []string{"claude-sonnet-4-6"}, InputPrice: testPtrFloat64(3e-6)},
{ID: 11, Platform: "anthropic", Models: []string{"claude-opus-4-6"}, InputPrice: testPtrFloat64(1.5e-5)},
},
ModelMapping: map[string]map[string]string{
"anthropic": {
"claude-sonnet-4-6": "claude-sonnet-4-6",
"claude-opus-4-6": "claude-opus-4-6",
},
},
}
got := ch.SupportedModels()
require.Len(t, got, 2)
require.Equal(t, "anthropic", got[0].Platform)
require.Equal(t, "claude-opus-4-6", got[0].Name)
require.NotNil(t, got[0].Pricing)
require.Equal(t, int64(11), got[0].Pricing.ID)
require.Equal(t, "claude-sonnet-4-6", got[1].Name)
require.Equal(t, int64(10), got[1].Pricing.ID)
}
func TestSupportedModels_WildcardExpandedFromPricing(t *testing.T) {
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "anthropic", Models: []string{"claude-sonnet-4-6", "claude-sonnet-4-5"}},
{ID: 2, Platform: "anthropic", Models: []string{"claude-opus-4-6"}},
},
ModelMapping: map[string]map[string]string{
"anthropic": {
"claude-sonnet-*": "claude-sonnet-4-6",
},
},
}
got := ch.SupportedModels()
names := make([]string, 0, len(got))
for _, m := range got {
names = append(names, m.Name)
}
require.ElementsMatch(t, []string{"claude-sonnet-4-5", "claude-sonnet-4-6", "claude-opus-4-6"}, names)
for _, m := range got {
require.NotContains(t, m.Name, "*")
}
}
func TestSupportedModels_MissingPricingKeepsNilPricing(t *testing.T) {
ch := &Channel{
ModelMapping: map[string]map[string]string{
"anthropic": {"claude-sonnet-4-6": "claude-sonnet-4-6"},
},
}
got := ch.SupportedModels()
require.Len(t, got, 1)
require.Equal(t, "claude-sonnet-4-6", got[0].Name)
require.Nil(t, got[0].Pricing)
}
func TestSupportedModels_DedupAndSort(t *testing.T) {
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "anthropic", Models: []string{"claude-sonnet-4-6", "claude-sonnet-4-5"}},
{ID: 2, Platform: "openai", Models: []string{"gpt-4o"}},
},
ModelMapping: map[string]map[string]string{
"anthropic": {
"claude-sonnet-4-6": "upstream-a",
"claude-sonnet-*": "upstream-a",
},
"openai": {"gpt-4o": "gpt-4o"},
},
}
got := ch.SupportedModels()
require.Len(t, got, 3)
require.Equal(t, "anthropic", got[0].Platform)
require.Equal(t, "claude-sonnet-4-5", got[0].Name)
require.Equal(t, "anthropic", got[1].Platform)
require.Equal(t, "claude-sonnet-4-6", got[1].Name)
require.Equal(t, "openai", got[2].Platform)
require.Equal(t, "gpt-4o", got[2].Name)
}
func TestSupportedModels_NilChannelAndEmpty(t *testing.T) {
var nilCh *Channel
require.Nil(t, nilCh.SupportedModels())
empty := &Channel{}
require.Nil(t, empty.SupportedModels())
}
func TestGetModelPricingByPlatform(t *testing.T) {
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "anthropic", Models: []string{"claude-sonnet-4-6"}, InputPrice: testPtrFloat64(3e-6)},
{ID: 2, Platform: "openai", Models: []string{"claude-sonnet-4-6"}, InputPrice: testPtrFloat64(1e-6)},
},
}
ant := ch.GetModelPricingByPlatform("anthropic", "claude-sonnet-4-6")
require.NotNil(t, ant)
require.Equal(t, int64(1), ant.ID)
oa := ch.GetModelPricingByPlatform("openai", "claude-sonnet-4-6")
require.NotNil(t, oa)
require.Equal(t, int64(2), oa.ID)
require.Nil(t, ch.GetModelPricingByPlatform("gemini", "claude-sonnet-4-6"))
}
func TestSupportedModels_WildcardOnlyPricingRowsSkipped(t *testing.T) {
// 定价中含通配符条目(pattern),不应被当作具体模型名展开。
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "anthropic", Models: []string{"claude-sonnet-*", "claude-sonnet-4-6"}},
},
ModelMapping: map[string]map[string]string{
"anthropic": {"claude-sonnet-*": "claude-sonnet-4-6"},
},
}
got := ch.SupportedModels()
require.Len(t, got, 1)
require.Equal(t, "claude-sonnet-4-6", got[0].Name)
for _, m := range got {
require.NotContains(t, m.Name, "*")
}
}
func TestSupportedModels_WildcardPrefixMatchesNothing(t *testing.T) {
// 通配符模式无任何对应定价模型时,该平台 mapping 路不产出;
// 但其他平台的 pricing-only 模型仍会通过 Pass B 出现。
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "openai", Models: []string{"gpt-4o"}},
},
ModelMapping: map[string]map[string]string{
"anthropic": {"gpt-foo-*": "gpt-foo-1"},
},
}
got := ch.SupportedModels()
require.Len(t, got, 1)
require.Equal(t, "openai", got[0].Platform)
require.Equal(t, "gpt-4o", got[0].Name)
}
func TestSupportedModels_CrossPlatformPricingDoesNotBleed(t *testing.T) {
// anthropic 的通配符不应把 openai 定价行拉到 anthropic 平台下;
// openai 的 pricing-only 模型则正常通过 Pass B 暴露在 openai 平台下。
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "openai", Models: []string{"claude-sonnet-4-6"}},
},
ModelMapping: map[string]map[string]string{
"anthropic": {"claude-sonnet-*": "x"},
},
}
got := ch.SupportedModels()
require.Len(t, got, 1)
require.Equal(t, "openai", got[0].Platform, "不能把 openai 定价标记为 anthropic 模型")
require.Equal(t, "claude-sonnet-4-6", got[0].Name)
}
func TestSupportedModels_CaseInsensitiveDedup(t *testing.T) {
// 两行定价用不同大小写定义了同一模型,结果应去重为 1 条;首次出现的原始大小写保留。
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "openai", Models: []string{"GPT-4o"}},
{ID: 2, Platform: "openai", Models: []string{"gpt-4o"}},
},
ModelMapping: map[string]map[string]string{
"openai": {"gpt-*": "x"},
},
}
got := ch.SupportedModels()
require.Len(t, got, 1)
require.Equal(t, "GPT-4o", got[0].Name)
}
func TestSupportedModels_EmptyPlatformMapping(t *testing.T) {
// ModelMapping 平台 key 存在但 value 为空 map:mapping 路跳过该平台,
// 但 pricing 路仍会把该平台的定价模型补齐(关键修复:azcc 这种"只配定价不配映射"渠道)。
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "anthropic", Models: []string{"claude-sonnet-4-6"}},
},
ModelMapping: map[string]map[string]string{
"anthropic": {},
},
}
got := ch.SupportedModels()
require.Len(t, got, 1)
require.Equal(t, "anthropic", got[0].Platform)
require.Equal(t, "claude-sonnet-4-6", got[0].Name)
require.NotNil(t, got[0].Pricing)
}
func TestSupportedModels_ExactKeyUsesPricedCaseWhenAvailable(t *testing.T) {
// mapping key uses uppercase, pricing uses lowercase — pricing's case should win.
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "openai", Models: []string{"gpt-4o"}},
},
ModelMapping: map[string]map[string]string{
"openai": {"GPT-4o": "gpt-4o"},
},
}
got := ch.SupportedModels()
require.Len(t, got, 1)
require.Equal(t, "gpt-4o", got[0].Name) // pricing's case wins
}
func TestSupportedModels_AsteriskOnlyMappingExpandsAllPriced(t *testing.T) {
// 映射 key 为单独的 "*":前缀为空 → 命中该平台所有定价模型(透传场景)。
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "openai", Models: []string{"gpt-4o", "gpt-4o-mini"}},
},
ModelMapping: map[string]map[string]string{
"openai": {"*": "gpt-4o"},
},
}
got := ch.SupportedModels()
require.Len(t, got, 2)
names := []string{got[0].Name, got[1].Name}
require.ElementsMatch(t, []string{"gpt-4o", "gpt-4o-mini"}, names)
}
func TestSupportedModels_PricingOnlyNoMapping(t *testing.T) {
// 渠道完全没配 mapping,只配了定价 —— 应该把所有定价模型作为支持模型返回。
// 这是修复前的核心 bug 场景(前端显示"未配置模型")。
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "anthropic", Models: []string{"claude-opus-4-6"}, InputPrice: testPtrFloat64(1.5e-5)},
{ID: 2, Platform: "anthropic", Models: []string{"claude-haiku-4-5"}, InputPrice: testPtrFloat64(3e-7)},
},
}
got := ch.SupportedModels()
require.Len(t, got, 2)
require.Equal(t, "claude-haiku-4-5", got[0].Name)
require.NotNil(t, got[0].Pricing)
require.Equal(t, int64(2), got[0].Pricing.ID)
require.Equal(t, "claude-opus-4-6", got[1].Name)
require.Equal(t, int64(1), got[1].Pricing.ID)
}
func TestSupportedModels_ExactMappingUsesTargetPricing(t *testing.T) {
// 精确 mapping `src → target`:定价应按 target 查(实际计费的是 target),
// 而不是按 src 自查。
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 100, Platform: "anthropic", Models: []string{"req-model"}, InputPrice: testPtrFloat64(3e-6)},
{ID: 200, Platform: "anthropic", Models: []string{"served-model"}, InputPrice: testPtrFloat64(1.5e-5)},
},
ModelMapping: map[string]map[string]string{
"anthropic": {
"req-model": "served-model",
},
},
}
got := ch.SupportedModels()
require.Len(t, got, 2)
require.Equal(t, "req-model", got[0].Name)
require.NotNil(t, got[0].Pricing)
require.Equal(t, int64(200), got[0].Pricing.ID, "req-model 显示但定价是 served-model 的(mapping target)")
require.Equal(t, "served-model", got[1].Name)
require.Equal(t, int64(200), got[1].Pricing.ID)
}
func TestSupportedModels_ExactMappingTargetMissingFromPricing(t *testing.T) {
// `src → target` 但 target 不在渠道定价里 —— 结果中 src 的 Pricing 为 nil
// (等待 ListAvailable 阶段的全局 LiteLLM 回落填充)。
ch := &Channel{
ModelPricing: []ChannelModelPricing{
{ID: 1, Platform: "anthropic", Models: []string{"some-priced-model"}, InputPrice: testPtrFloat64(1.5e-5)},
},
ModelMapping: map[string]map[string]string{
"anthropic": {
"missing-src": "missing-target",
},
},
}
got := ch.SupportedModels()
require.Len(t, got, 2)
require.Equal(t, "missing-src", got[0].Name)
require.Nil(t, got[0].Pricing, "target 在渠道定价中缺失时不虚假填充,留给 ListAvailable 走 LiteLLM 回落")
require.Equal(t, "some-priced-model", got[1].Name)
require.NotNil(t, got[1].Pricing)
}
......@@ -243,6 +243,23 @@ const (
// SettingKeyOpsRuntimeLogConfig stores JSON config for runtime log settings.
SettingKeyOpsRuntimeLogConfig = "ops_runtime_log_config"
// =========================
// Channel Monitor (渠道监控)
// =========================
// SettingKeyChannelMonitorEnabled is a DB-backed soft switch for the channel monitor feature.
// When false: runner skips scheduling and user-facing endpoints return an empty list.
SettingKeyChannelMonitorEnabled = "channel_monitor_enabled"
// SettingKeyChannelMonitorDefaultIntervalSeconds controls the default interval (seconds)
// pre-filled when creating a new channel monitor from the admin UI. Range: [15, 3600].
SettingKeyChannelMonitorDefaultIntervalSeconds = "channel_monitor_default_interval_seconds"
// SettingKeyAvailableChannelsEnabled is a DB-backed soft switch for the "Available Channels"
// user-facing aggregate view. When false: user endpoint returns an empty list and the
// sidebar entry is hidden. Defaults to false (opt-in feature).
SettingKeyAvailableChannelsEnabled = "available_channels_enabled"
// =========================
// Overload Cooldown (529)
// =========================
......
......@@ -184,7 +184,7 @@ func newResolverWithChannel(t *testing.T, pricing []ChannelModelPricing) *ModelP
return map[int64]string{groupID: "anthropic"}, nil
},
}
cs := NewChannelService(repo, nil)
cs := NewChannelService(repo, nil, nil, nil)
bs := newTestBillingServiceForResolver()
return NewModelPricingResolver(cs, bs)
}
......@@ -517,7 +517,7 @@ func TestResolve_WithChannelOverride_CacheError(t *testing.T) {
return nil, errors.New("database unavailable")
},
}
cs := NewChannelService(repo, nil)
cs := NewChannelService(repo, nil, nil, nil)
bs := newTestBillingServiceForResolver()
r := NewModelPricingResolver(cs, bs)
......
......@@ -36,11 +36,15 @@ return 0
// - Scheduling: 5-field cron spec (minute hour dom month dow).
// - Multi-instance: best-effort Redis leader lock so only one node runs cleanup.
// - Safety: deletes in batches to avoid long transactions.
//
// 附带:在 runCleanupOnce 末尾调用 ChannelMonitorService.RunDailyMaintenance,
// 统一共享 cron schedule + leader lock + heartbeat,避免再引一套调度。
type OpsCleanupService struct {
opsRepo OpsRepository
db *sql.DB
redisClient *redis.Client
cfg *config.Config
opsRepo OpsRepository
db *sql.DB
redisClient *redis.Client
cfg *config.Config
channelMonitorSvc *ChannelMonitorService
instanceID string
......@@ -57,13 +61,15 @@ func NewOpsCleanupService(
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
channelMonitorSvc *ChannelMonitorService,
) *OpsCleanupService {
return &OpsCleanupService{
opsRepo: opsRepo,
db: db,
redisClient: redisClient,
cfg: cfg,
instanceID: uuid.NewString(),
opsRepo: opsRepo,
db: db,
redisClient: redisClient,
cfg: cfg,
channelMonitorSvc: channelMonitorSvc,
instanceID: uuid.NewString(),
}
}
......@@ -248,6 +254,15 @@ func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDelet
out.dailyPreagg = n
}
// Channel monitor 每日维护(聚合昨日明细 + 软删过期明细/聚合)。
// 失败只记日志,不影响 ops 清理的成功状态(与 ops 各步骤风格一致);
// 维护本身已经把每步错误打到 slog,heartbeat result 不再分项记录。
if s.channelMonitorSvc != nil {
if err := s.channelMonitorSvc.RunDailyMaintenance(ctx); err != nil {
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] channel monitor maintenance failed: %v", err)
}
}
return out, nil
}
......
......@@ -2,6 +2,7 @@ package service
import (
"context"
"errors"
"fmt"
"log/slog"
"math"
......@@ -16,6 +17,14 @@ import (
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
// ErrOrderNotFound is returned by HandlePaymentNotification when the webhook
// references an out_trade_no that does not exist in our DB. Callers (webhook
// handlers) should treat this as a terminal, non-retryable condition and still
// respond with a 2xx success to the provider — otherwise the provider will keep
// retrying forever (e.g. when a foreign environment's webhook endpoint is
// misconfigured to point at us, or when our orders table has been wiped).
var ErrOrderNotFound = errors.New("payment order not found")
// --- Payment Notification & Fulfillment ---
func (s *PaymentService) HandlePaymentNotification(ctx context.Context, n *payment.PaymentNotification, pk string) error {
......@@ -30,7 +39,10 @@ func (s *PaymentService) HandlePaymentNotification(ctx context.Context, n *payme
if oid, ok := parseLegacyPaymentOrderID(n.OrderID, err); ok {
return s.confirmPayment(ctx, oid, n.TradeNo, n.Amount, pk, n.Metadata)
}
return fmt.Errorf("order not found for out_trade_no: %s", n.OrderID)
if dbent.IsNotFound(err) {
return fmt.Errorf("%w: out_trade_no=%s", ErrOrderNotFound, n.OrderID)
}
return fmt.Errorf("lookup order failed for out_trade_no %s: %w", n.OrderID, err)
}
return s.confirmPayment(ctx, order.ID, n.TradeNo, n.Amount, pk, n.Metadata)
}
......
//go:build unit
package service
import (
"context"
"database/sql"
"errors"
"testing"
"entgo.io/ent/dialect"
entsql "entgo.io/ent/dialect/sql"
_ "modernc.org/sqlite"
dbent "github.com/Wei-Shaw/sub2api/ent"
"github.com/Wei-Shaw/sub2api/ent/enttest"
"github.com/Wei-Shaw/sub2api/internal/payment"
"github.com/stretchr/testify/require"
)
// newOrderNotFoundTestClient wires an in-memory sqlite-backed ent.Client so
// tests can exercise HandlePaymentNotification's real DB lookup path without
// standing up a service stack.
func newOrderNotFoundTestClient(t *testing.T) *dbent.Client {
t.Helper()
db, err := sql.Open("sqlite", "file:payment_order_not_found?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)
t.Cleanup(func() { _ = db.Close() })
_, err = db.Exec("PRAGMA foreign_keys = ON")
require.NoError(t, err)
drv := entsql.OpenDB(dialect.SQLite, db)
client := enttest.NewClient(t, enttest.WithOptions(dbent.Driver(drv)))
t.Cleanup(func() { _ = client.Close() })
return client
}
// TestHandlePaymentNotification_UnknownOrder_ReturnsSentinel exercises the
// happy-path of the webhook 404 fix: when the notification references an
// out_trade_no that does not exist in our DB, HandlePaymentNotification must
// return an error that errors.Is(err, ErrOrderNotFound) recognizes. The
// webhook handler relies on that contract to ack with a 2xx so the provider
// stops retrying.
func TestHandlePaymentNotification_UnknownOrder_ReturnsSentinel(t *testing.T) {
ctx := context.Background()
client := newOrderNotFoundTestClient(t)
svc := &PaymentService{
entClient: client,
providersLoaded: true,
}
notification := &payment.PaymentNotification{
OrderID: "sub2_does_not_exist_12345",
TradeNo: "stripe_evt_test_xyz",
Status: payment.NotificationStatusSuccess,
Amount: 1000,
}
err := svc.HandlePaymentNotification(ctx, notification, payment.TypeStripe)
require.Error(t, err, "unknown out_trade_no should surface an error")
require.ErrorIs(t, err, ErrOrderNotFound,
"webhook handler relies on errors.Is(err, ErrOrderNotFound) to downgrade to 200")
// Sanity: the wrapped error message should still include the out_trade_no
// for operator diagnostics.
require.Contains(t, err.Error(), notification.OrderID)
}
// TestHandlePaymentNotification_NonSuccessStatus_Skips documents the
// short-circuit that precedes the DB lookup: when the notification is not a
// success event (e.g. Stripe non-payment events that reach us via the webhook
// route), we return nil without touching the DB and the handler responds 200.
func TestHandlePaymentNotification_NonSuccessStatus_Skips(t *testing.T) {
ctx := context.Background()
client := newOrderNotFoundTestClient(t)
svc := &PaymentService{
entClient: client,
providersLoaded: true,
}
notification := &payment.PaymentNotification{
OrderID: "sub2_does_not_exist_12345",
Status: "failed", // any value other than NotificationStatusSuccess
}
err := svc.HandlePaymentNotification(ctx, notification, payment.TypeStripe)
require.NoError(t, err,
"non-success notifications must short-circuit before the DB lookup")
}
// TestErrOrderNotFound_DistinctFromOtherErrors guards against an accidental
// collapse where a generic wrapped error would start matching ErrOrderNotFound
// (which would silently mask real DB failures).
func TestErrOrderNotFound_DistinctFromOtherErrors(t *testing.T) {
genericErr := errors.New("some other failure")
require.False(t, errors.Is(genericErr, ErrOrderNotFound))
require.False(t, errors.Is(ErrOrderNotFound, genericErr))
wrappedLookupErr := errors.New("lookup order failed for out_trade_no sub2_42: connection refused")
require.False(t, errors.Is(wrappedLookupErr, ErrOrderNotFound),
"DB connection failures must not masquerade as order-not-found")
}
......@@ -450,6 +450,9 @@ func (s *SettingService) GetPublicSettings(ctx context.Context) (*PublicSettings
SettingKeyBalanceLowNotifyThreshold,
SettingKeyBalanceLowNotifyRechargeURL,
SettingKeyAccountQuotaNotifyEnabled,
SettingKeyChannelMonitorEnabled,
SettingKeyChannelMonitorDefaultIntervalSeconds,
SettingKeyAvailableChannelsEnabled,
}
settings, err := s.settingRepo.GetMultiple(ctx, keys)
......@@ -532,9 +535,88 @@ func (s *SettingService) GetPublicSettings(ctx context.Context) (*PublicSettings
AccountQuotaNotifyEnabled: settings[SettingKeyAccountQuotaNotifyEnabled] == "true",
BalanceLowNotifyThreshold: balanceLowNotifyThreshold,
BalanceLowNotifyRechargeURL: settings[SettingKeyBalanceLowNotifyRechargeURL],
ChannelMonitorEnabled: !isFalseSettingValue(settings[SettingKeyChannelMonitorEnabled]),
ChannelMonitorDefaultIntervalSeconds: parseChannelMonitorInterval(settings[SettingKeyChannelMonitorDefaultIntervalSeconds]),
AvailableChannelsEnabled: settings[SettingKeyAvailableChannelsEnabled] == "true",
}, nil
}
// channelMonitorIntervalMin / channelMonitorIntervalMax bound the default interval
// (mirrors the monitor-level constraint but lives here so setting_service stays decoupled).
const (
channelMonitorIntervalMin = 15
channelMonitorIntervalMax = 3600
channelMonitorIntervalFallback = 60
)
// parseChannelMonitorInterval parses the stored string and clamps to [15, 3600].
// Empty / invalid input falls back to channelMonitorIntervalFallback.
func parseChannelMonitorInterval(raw string) int {
v, err := strconv.Atoi(strings.TrimSpace(raw))
if err != nil {
return channelMonitorIntervalFallback
}
return clampChannelMonitorInterval(v)
}
// clampChannelMonitorInterval clamps v to the allowed range. 0 means "not provided".
func clampChannelMonitorInterval(v int) int {
if v <= 0 {
return 0
}
if v < channelMonitorIntervalMin {
return channelMonitorIntervalMin
}
if v > channelMonitorIntervalMax {
return channelMonitorIntervalMax
}
return v
}
// ChannelMonitorRuntime is the lightweight view of the channel monitor feature
// consumed by the runner and user-facing handlers.
type ChannelMonitorRuntime struct {
Enabled bool
DefaultIntervalSeconds int
}
// GetChannelMonitorRuntime reads the channel monitor feature flags directly from
// the settings store. Fail-open: on error returns Enabled=true with the default interval.
func (s *SettingService) GetChannelMonitorRuntime(ctx context.Context) ChannelMonitorRuntime {
vals, err := s.settingRepo.GetMultiple(ctx, []string{
SettingKeyChannelMonitorEnabled,
SettingKeyChannelMonitorDefaultIntervalSeconds,
})
if err != nil {
return ChannelMonitorRuntime{Enabled: true, DefaultIntervalSeconds: channelMonitorIntervalFallback}
}
return ChannelMonitorRuntime{
Enabled: !isFalseSettingValue(vals[SettingKeyChannelMonitorEnabled]),
DefaultIntervalSeconds: parseChannelMonitorInterval(vals[SettingKeyChannelMonitorDefaultIntervalSeconds]),
}
}
// AvailableChannelsRuntime is the lightweight view of the available-channels feature
// switch consumed by the user-facing handler.
type AvailableChannelsRuntime struct {
Enabled bool
}
// GetAvailableChannelsRuntime reads the available-channels feature switch directly
// from the settings store. Fail-closed: on error returns Enabled=false, matching
// the opt-in default (unknown ↔ disabled).
func (s *SettingService) GetAvailableChannelsRuntime(ctx context.Context) AvailableChannelsRuntime {
vals, err := s.settingRepo.GetMultiple(ctx, []string{SettingKeyAvailableChannelsEnabled})
if err != nil {
return AvailableChannelsRuntime{Enabled: false}
}
return AvailableChannelsRuntime{
Enabled: vals[SettingKeyAvailableChannelsEnabled] == "true",
}
}
// SetOnUpdateCallback sets a callback function to be called when settings are updated
// This is used for cache invalidation (e.g., HTML cache in frontend server)
func (s *SettingService) SetOnUpdateCallback(callback func()) {
......@@ -546,54 +628,75 @@ func (s *SettingService) SetVersion(version string) {
s.version = version
}
// GetPublicSettingsForInjection returns public settings in a format suitable for HTML injection
// This implements the web.PublicSettingsProvider interface
// PublicSettingsInjectionPayload is the JSON shape embedded into HTML as
// `window.__APP_CONFIG__` so the frontend can hydrate feature flags & site
// config before the first XHR finishes.
//
// INVARIANT: every `json` tag here MUST also exist on handler/dto.PublicSettings.
// If you forget a feature-flag field here, the frontend's
// `cachedPublicSettings.xxx_enabled` will be `undefined` on refresh until the
// async `/api/v1/settings/public` call returns — which causes opt-in menus
// (strict `=== true`) to flicker off/on. See
// frontend/src/utils/featureFlags.ts for the matching registry.
//
// A unit test diffs this struct's JSON keys against dto.PublicSettings to catch
// drift automatically (see setting_service_injection_test.go).
type PublicSettingsInjectionPayload struct {
RegistrationEnabled bool `json:"registration_enabled"`
EmailVerifyEnabled bool `json:"email_verify_enabled"`
RegistrationEmailSuffixWhitelist []string `json:"registration_email_suffix_whitelist"`
PromoCodeEnabled bool `json:"promo_code_enabled"`
PasswordResetEnabled bool `json:"password_reset_enabled"`
InvitationCodeEnabled bool `json:"invitation_code_enabled"`
TotpEnabled bool `json:"totp_enabled"`
TurnstileEnabled bool `json:"turnstile_enabled"`
TurnstileSiteKey string `json:"turnstile_site_key"`
SiteName string `json:"site_name"`
SiteLogo string `json:"site_logo"`
SiteSubtitle string `json:"site_subtitle"`
APIBaseURL string `json:"api_base_url"`
ContactInfo string `json:"contact_info"`
DocURL string `json:"doc_url"`
HomeContent string `json:"home_content"`
HideCcsImportButton bool `json:"hide_ccs_import_button"`
PurchaseSubscriptionEnabled bool `json:"purchase_subscription_enabled"`
PurchaseSubscriptionURL string `json:"purchase_subscription_url"`
TableDefaultPageSize int `json:"table_default_page_size"`
TablePageSizeOptions []int `json:"table_page_size_options"`
CustomMenuItems json.RawMessage `json:"custom_menu_items"`
CustomEndpoints json.RawMessage `json:"custom_endpoints"`
LinuxDoOAuthEnabled bool `json:"linuxdo_oauth_enabled"`
WeChatOAuthEnabled bool `json:"wechat_oauth_enabled"`
WeChatOAuthOpenEnabled bool `json:"wechat_oauth_open_enabled"`
WeChatOAuthMPEnabled bool `json:"wechat_oauth_mp_enabled"`
WeChatOAuthMobileEnabled bool `json:"wechat_oauth_mobile_enabled"`
OIDCOAuthEnabled bool `json:"oidc_oauth_enabled"`
OIDCOAuthProviderName string `json:"oidc_oauth_provider_name"`
BackendModeEnabled bool `json:"backend_mode_enabled"`
PaymentEnabled bool `json:"payment_enabled"`
Version string `json:"version"`
BalanceLowNotifyEnabled bool `json:"balance_low_notify_enabled"`
AccountQuotaNotifyEnabled bool `json:"account_quota_notify_enabled"`
BalanceLowNotifyThreshold float64 `json:"balance_low_notify_threshold"`
BalanceLowNotifyRechargeURL string `json:"balance_low_notify_recharge_url"`
// Feature flags — MUST match the opt-in/opt-out registry in
// frontend/src/utils/featureFlags.ts. Missing a field here is the bug
// that hid the "可用渠道" menu on page refresh.
ChannelMonitorEnabled bool `json:"channel_monitor_enabled"`
ChannelMonitorDefaultIntervalSeconds int `json:"channel_monitor_default_interval_seconds"`
AvailableChannelsEnabled bool `json:"available_channels_enabled"`
}
// GetPublicSettingsForInjection returns public settings in a format suitable for HTML injection.
// This implements the web.PublicSettingsProvider interface.
func (s *SettingService) GetPublicSettingsForInjection(ctx context.Context) (any, error) {
settings, err := s.GetPublicSettings(ctx)
if err != nil {
return nil, err
}
// Return a struct that matches the frontend's expected format
return &struct {
RegistrationEnabled bool `json:"registration_enabled"`
EmailVerifyEnabled bool `json:"email_verify_enabled"`
RegistrationEmailSuffixWhitelist []string `json:"registration_email_suffix_whitelist"`
PromoCodeEnabled bool `json:"promo_code_enabled"`
PasswordResetEnabled bool `json:"password_reset_enabled"`
InvitationCodeEnabled bool `json:"invitation_code_enabled"`
TotpEnabled bool `json:"totp_enabled"`
TurnstileEnabled bool `json:"turnstile_enabled"`
TurnstileSiteKey string `json:"turnstile_site_key,omitempty"`
SiteName string `json:"site_name"`
SiteLogo string `json:"site_logo,omitempty"`
SiteSubtitle string `json:"site_subtitle,omitempty"`
APIBaseURL string `json:"api_base_url,omitempty"`
ContactInfo string `json:"contact_info,omitempty"`
DocURL string `json:"doc_url,omitempty"`
HomeContent string `json:"home_content,omitempty"`
HideCcsImportButton bool `json:"hide_ccs_import_button"`
PurchaseSubscriptionEnabled bool `json:"purchase_subscription_enabled"`
PurchaseSubscriptionURL string `json:"purchase_subscription_url,omitempty"`
TableDefaultPageSize int `json:"table_default_page_size"`
TablePageSizeOptions []int `json:"table_page_size_options"`
CustomMenuItems json.RawMessage `json:"custom_menu_items"`
CustomEndpoints json.RawMessage `json:"custom_endpoints"`
LinuxDoOAuthEnabled bool `json:"linuxdo_oauth_enabled"`
WeChatOAuthEnabled bool `json:"wechat_oauth_enabled"`
WeChatOAuthOpenEnabled bool `json:"wechat_oauth_open_enabled"`
WeChatOAuthMPEnabled bool `json:"wechat_oauth_mp_enabled"`
WeChatOAuthMobileEnabled bool `json:"wechat_oauth_mobile_enabled"`
BackendModeEnabled bool `json:"backend_mode_enabled"`
PaymentEnabled bool `json:"payment_enabled"`
OIDCOAuthEnabled bool `json:"oidc_oauth_enabled"`
OIDCOAuthProviderName string `json:"oidc_oauth_provider_name"`
Version string `json:"version,omitempty"`
BalanceLowNotifyEnabled bool `json:"balance_low_notify_enabled"`
AccountQuotaNotifyEnabled bool `json:"account_quota_notify_enabled"`
BalanceLowNotifyThreshold float64 `json:"balance_low_notify_threshold"`
BalanceLowNotifyRechargeURL string `json:"balance_low_notify_recharge_url"`
}{
return &PublicSettingsInjectionPayload{
RegistrationEnabled: settings.RegistrationEnabled,
EmailVerifyEnabled: settings.EmailVerifyEnabled,
RegistrationEmailSuffixWhitelist: settings.RegistrationEmailSuffixWhitelist,
......@@ -622,15 +725,19 @@ func (s *SettingService) GetPublicSettingsForInjection(ctx context.Context) (any
WeChatOAuthOpenEnabled: settings.WeChatOAuthOpenEnabled,
WeChatOAuthMPEnabled: settings.WeChatOAuthMPEnabled,
WeChatOAuthMobileEnabled: settings.WeChatOAuthMobileEnabled,
BackendModeEnabled: settings.BackendModeEnabled,
PaymentEnabled: settings.PaymentEnabled,
OIDCOAuthEnabled: settings.OIDCOAuthEnabled,
OIDCOAuthProviderName: settings.OIDCOAuthProviderName,
BackendModeEnabled: settings.BackendModeEnabled,
PaymentEnabled: settings.PaymentEnabled,
Version: s.version,
BalanceLowNotifyEnabled: settings.BalanceLowNotifyEnabled,
AccountQuotaNotifyEnabled: settings.AccountQuotaNotifyEnabled,
BalanceLowNotifyThreshold: settings.BalanceLowNotifyThreshold,
BalanceLowNotifyRechargeURL: settings.BalanceLowNotifyRechargeURL,
ChannelMonitorEnabled: settings.ChannelMonitorEnabled,
ChannelMonitorDefaultIntervalSeconds: settings.ChannelMonitorDefaultIntervalSeconds,
AvailableChannelsEnabled: settings.AvailableChannelsEnabled,
}, nil
}
......@@ -1086,6 +1193,15 @@ func (s *SettingService) buildSystemSettingsUpdates(ctx context.Context, setting
updates[SettingKeyOpsMetricsIntervalSeconds] = strconv.Itoa(settings.OpsMetricsIntervalSeconds)
}
// Channel monitor feature switch
updates[SettingKeyChannelMonitorEnabled] = strconv.FormatBool(settings.ChannelMonitorEnabled)
if v := clampChannelMonitorInterval(settings.ChannelMonitorDefaultIntervalSeconds); v > 0 {
updates[SettingKeyChannelMonitorDefaultIntervalSeconds] = strconv.Itoa(v)
}
// Available channels feature switch
updates[SettingKeyAvailableChannelsEnabled] = strconv.FormatBool(settings.AvailableChannelsEnabled)
// Claude Code version check
updates[SettingKeyMinClaudeCodeVersion] = settings.MinClaudeCodeVersion
updates[SettingKeyMaxClaudeCodeVersion] = settings.MaxClaudeCodeVersion
......@@ -1644,6 +1760,13 @@ func (s *SettingService) InitializeDefaultSettings(ctx context.Context) error {
SettingKeyOpsQueryModeDefault: "auto",
SettingKeyOpsMetricsIntervalSeconds: "60",
// Channel monitor defaults (enabled, 60s)
SettingKeyChannelMonitorEnabled: "true",
SettingKeyChannelMonitorDefaultIntervalSeconds: "60",
// Available channels feature (default disabled; opt-in)
SettingKeyAvailableChannelsEnabled: "false",
// Claude Code version check (default: empty = disabled)
SettingKeyMinClaudeCodeVersion: "",
SettingKeyMaxClaudeCodeVersion: "",
......@@ -1950,6 +2073,15 @@ func (s *SettingService) parseSettings(settings map[string]string) *SystemSettin
}
}
// Channel monitor feature (default: enabled, 60s)
result.ChannelMonitorEnabled = !isFalseSettingValue(settings[SettingKeyChannelMonitorEnabled])
result.ChannelMonitorDefaultIntervalSeconds = parseChannelMonitorInterval(
settings[SettingKeyChannelMonitorDefaultIntervalSeconds],
)
// Available channels feature (default: disabled; strict true)
result.AvailableChannelsEnabled = settings[SettingKeyAvailableChannelsEnabled] == "true"
// Claude Code version check
result.MinClaudeCodeVersion = settings[SettingKeyMinClaudeCodeVersion]
result.MaxClaudeCodeVersion = settings[SettingKeyMaxClaudeCodeVersion]
......
......@@ -126,6 +126,13 @@ type SystemSettings struct {
OpsQueryModeDefault string
OpsMetricsIntervalSeconds int
// Channel Monitor feature
ChannelMonitorEnabled bool `json:"channel_monitor_enabled"`
ChannelMonitorDefaultIntervalSeconds int `json:"channel_monitor_default_interval_seconds"`
// Available Channels feature (user-facing aggregate view)
AvailableChannelsEnabled bool `json:"available_channels_enabled"`
// Claude Code version check
MinClaudeCodeVersion string
MaxClaudeCodeVersion string
......@@ -210,6 +217,13 @@ type PublicSettings struct {
AccountQuotaNotifyEnabled bool
BalanceLowNotifyThreshold float64
BalanceLowNotifyRechargeURL string
// Channel Monitor feature
ChannelMonitorEnabled bool `json:"channel_monitor_enabled"`
ChannelMonitorDefaultIntervalSeconds int `json:"channel_monitor_default_interval_seconds"`
// Available Channels feature (user-facing aggregate view)
AvailableChannelsEnabled bool `json:"available_channels_enabled"`
}
type WeChatConnectOAuthConfig struct {
......
......@@ -269,13 +269,16 @@ func ProvideOpsAlertEvaluatorService(
}
// ProvideOpsCleanupService creates and starts OpsCleanupService (cron scheduled).
// channelMonitorSvc 让维护任务(聚合 + 历史/聚合软删)跟随 ops 清理 cron 一起跑,
// 共享 leader lock + heartbeat。
func ProvideOpsCleanupService(
opsRepo OpsRepository,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
channelMonitorSvc *ChannelMonitorService,
) *OpsCleanupService {
svc := NewOpsCleanupService(opsRepo, db, redisClient, cfg)
svc := NewOpsCleanupService(opsRepo, db, redisClient, cfg, channelMonitorSvc)
svc.Start()
return svc
}
......@@ -487,6 +490,9 @@ var ProviderSet = wire.NewSet(
NewPaymentService,
ProvidePaymentOrderExpiryService,
ProvideBalanceNotifyService,
ProvideChannelMonitorService,
ProvideChannelMonitorRunner,
NewChannelMonitorRequestTemplateService,
)
// ProvidePaymentConfigService wraps NewPaymentConfigService to accept the named
......@@ -506,3 +512,23 @@ func ProvidePaymentOrderExpiryService(paymentSvc *PaymentService) *PaymentOrderE
svc.Start()
return svc
}
// ProvideChannelMonitorService 创建渠道监控服务(CRUD + RunCheck + 用户视图聚合)。
// 加密器复用 wire 中已注入的 SecretEncryptor(AES-256-GCM)。
func ProvideChannelMonitorService(
repo ChannelMonitorRepository,
encryptor SecretEncryptor,
) *ChannelMonitorService {
return NewChannelMonitorService(repo, encryptor)
}
// ProvideChannelMonitorRunner 创建并启动渠道监控调度器。
// 通过 SetScheduler 注入回 service 后再 Start,确保启动时加载所有 enabled monitor,
// 后续 CRUD 也能即时同步任务表。Runner.Stop 由 cleanup function 调用。
// settingService 用于 runner 每次 fire 读取功能开关。
func ProvideChannelMonitorRunner(svc *ChannelMonitorService, settingService *SettingService) *ChannelMonitorRunner {
r := NewChannelMonitorRunner(svc, settingService)
svc.SetScheduler(r)
r.Start()
return r
}
-- Migration: 125_add_channel_monitors
-- 渠道监控 MVP:周期性对外部 provider/endpoint/api_key 做模型心跳测试。
--
-- 表结构说明:
-- - channel_monitors 渠道配置表(一行 = 一个监控对象)
-- - channel_monitor_histories 检测历史明细表(一次检测一个模型 = 一行)
--
-- 设计要点:
-- - api_key_encrypted 列存放 AES-256-GCM 密文(base64),由 service 层加密。
-- - extra_models 用 JSONB 存储字符串数组,便于扩展(后续可加权重等元数据)。
-- - history 表通过 ON DELETE CASCADE 自动清理已删除监控的历史。
-- - (enabled, last_checked_at) 索引服务于调度器扫描“到期需要检测”的监控。
-- - histories 上 (monitor_id, model, checked_at DESC) 服务用户视图聚合查询;
-- 单独的 (checked_at) 索引服务定期清理 30 天前数据的 DELETE。
CREATE TABLE IF NOT EXISTS channel_monitors (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
provider VARCHAR(20) NOT NULL, -- openai / anthropic / gemini
endpoint VARCHAR(500) NOT NULL, -- base origin
api_key_encrypted TEXT NOT NULL, -- AES-256-GCM (base64)
primary_model VARCHAR(200) NOT NULL,
extra_models JSONB NOT NULL DEFAULT '[]'::jsonb,
group_name VARCHAR(100) NOT NULL DEFAULT '',
enabled BOOLEAN NOT NULL DEFAULT TRUE,
interval_seconds INT NOT NULL,
last_checked_at TIMESTAMPTZ,
created_by BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT channel_monitors_provider_check CHECK (provider IN ('openai', 'anthropic', 'gemini')),
CONSTRAINT channel_monitors_interval_check CHECK (interval_seconds BETWEEN 15 AND 3600)
);
CREATE INDEX IF NOT EXISTS idx_channel_monitors_enabled_last_checked
ON channel_monitors (enabled, last_checked_at);
CREATE INDEX IF NOT EXISTS idx_channel_monitors_provider
ON channel_monitors (provider);
CREATE INDEX IF NOT EXISTS idx_channel_monitors_group_name
ON channel_monitors (group_name);
CREATE TABLE IF NOT EXISTS channel_monitor_histories (
id BIGSERIAL PRIMARY KEY,
monitor_id BIGINT NOT NULL REFERENCES channel_monitors(id) ON DELETE CASCADE,
model VARCHAR(200) NOT NULL,
status VARCHAR(20) NOT NULL,
latency_ms INT,
ping_latency_ms INT,
message VARCHAR(500) NOT NULL DEFAULT '',
checked_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT channel_monitor_histories_status_check
CHECK (status IN ('operational', 'degraded', 'failed', 'error'))
);
CREATE INDEX IF NOT EXISTS idx_channel_monitor_histories_monitor_model_checked
ON channel_monitor_histories (monitor_id, model, checked_at DESC);
CREATE INDEX IF NOT EXISTS idx_channel_monitor_histories_checked_at
ON channel_monitor_histories (checked_at);
-- Migration: 126_add_channel_monitor_aggregation
-- 渠道监控日聚合:把 channel_monitor_histories 的明细按天聚合,明细只保留 1 天,
-- 聚合保留 30 天。明细和聚合表都用软删除(deleted_at),由 ops cleanup 任务每天
-- 凌晨随运维监控清理一起跑(共享 cron)。
--
-- 设计要点:
-- - channel_monitor_histories 加 deleted_at 软删除字段(SoftDeleteMixin 全局
-- Hook 会把 DELETE 自动改写成 UPDATE deleted_at = NOW())。
-- - channel_monitor_daily_rollups 按 (monitor_id, model, bucket_date) 唯一,
-- 用 ON CONFLICT DO UPDATE 实现幂等回填,状态分布和延迟分子分母都保留,
-- 方便后续按窗口任意求加权可用率和均值。
-- - watermark 表只有一行(id=1),记录最近一次聚合到达的日期,避免重启后重复
-- 扫全表。
-- - rollup 上 (bucket_date) 索引服务清理任务的 DELETE WHERE bucket_date < cutoff。
-- 1) 给历史明细表加软删除字段
ALTER TABLE channel_monitor_histories
ADD COLUMN IF NOT EXISTS deleted_at TIMESTAMPTZ;
CREATE INDEX IF NOT EXISTS idx_channel_monitor_histories_deleted_at
ON channel_monitor_histories (deleted_at);
-- 2) 创建日聚合表
CREATE TABLE IF NOT EXISTS channel_monitor_daily_rollups (
id BIGSERIAL PRIMARY KEY,
monitor_id BIGINT NOT NULL REFERENCES channel_monitors(id) ON DELETE CASCADE,
model VARCHAR(200) NOT NULL,
bucket_date DATE NOT NULL,
total_checks INT NOT NULL DEFAULT 0,
ok_count INT NOT NULL DEFAULT 0,
operational_count INT NOT NULL DEFAULT 0,
degraded_count INT NOT NULL DEFAULT 0,
failed_count INT NOT NULL DEFAULT 0,
error_count INT NOT NULL DEFAULT 0,
sum_latency_ms BIGINT NOT NULL DEFAULT 0,
count_latency INT NOT NULL DEFAULT 0,
sum_ping_latency_ms BIGINT NOT NULL DEFAULT 0,
count_ping_latency INT NOT NULL DEFAULT 0,
computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_channel_monitor_daily_rollups_unique
ON channel_monitor_daily_rollups (monitor_id, model, bucket_date);
CREATE INDEX IF NOT EXISTS idx_channel_monitor_daily_rollups_bucket
ON channel_monitor_daily_rollups (bucket_date);
CREATE INDEX IF NOT EXISTS idx_channel_monitor_daily_rollups_deleted_at
ON channel_monitor_daily_rollups (deleted_at);
-- 3) 创建 watermark 表(单行:id=1)
CREATE TABLE IF NOT EXISTS channel_monitor_aggregation_watermark (
id INT PRIMARY KEY DEFAULT 1,
last_aggregated_date DATE,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT channel_monitor_aggregation_watermark_singleton CHECK (id = 1)
);
INSERT INTO channel_monitor_aggregation_watermark (id, last_aggregated_date, updated_at)
VALUES (1, NULL, NOW())
ON CONFLICT (id) DO NOTHING;
-- Migration: 127_drop_channel_monitor_deleted_at
-- 纠正 110 引入的 SoftDeleteMixin:日志/聚合表无恢复需求,软删会让行和索引只增不减,
-- 徒增磁盘和查询开销。改回分批物理删(由 OpsCleanupService 每天凌晨统一调度,
-- deleteOldRowsByID 模板,batch=5000)。
--
-- 110 尚未跑过聚合/清理(首次 maintenance 在次日 02:00),所以此处不担心业务数据。
-- 直接 DROP 列 + 索引;对应的 Go 侧 ent schema 已移除 SoftDeleteMixin、repo 的
-- raw SQL 已移除 deleted_at IS NULL 过滤。
DROP INDEX IF EXISTS idx_channel_monitor_histories_deleted_at;
ALTER TABLE channel_monitor_histories
DROP COLUMN IF EXISTS deleted_at;
DROP INDEX IF EXISTS idx_channel_monitor_daily_rollups_deleted_at;
ALTER TABLE channel_monitor_daily_rollups
DROP COLUMN IF EXISTS deleted_at;
-- Migration: 128_add_channel_monitor_request_templates
-- 加请求模板表 + 给 channel_monitors 加 4 个快照字段(template_id 关联引用 + extra_headers /
-- body_override_mode / body_override 三个真正运行时使用的快照)。
--
-- 设计要点:
-- 1) 模板与监控之间是「应用即拷贝」的快照语义,运行时 checker 不再回查模板表。
-- 模板 UPDATE 不会自动影响监控;只有用户主动「应用到关联监控」才会刷新快照。
-- 2) ON DELETE SET NULL:模板删除不级联清理监控;监控保留快照继续工作。
-- 3) extra_headers / body_override 都是 JSONB;body_override_mode 用 varchar(不是 enum)
-- 便于将来加新模式无需 ALTER TYPE。
-- 4) 同一 provider 内模板 name 唯一(允许 Anthropic + OpenAI 重名 "伪装官方客户端")。
CREATE TABLE IF NOT EXISTS channel_monitor_request_templates (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
provider VARCHAR(20) NOT NULL,
description VARCHAR(500) NOT NULL DEFAULT '',
extra_headers JSONB NOT NULL DEFAULT '{}'::jsonb,
body_override_mode VARCHAR(10) NOT NULL DEFAULT 'off',
body_override JSONB NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT channel_monitor_request_templates_provider_check
CHECK (provider IN ('openai', 'anthropic', 'gemini')),
CONSTRAINT channel_monitor_request_templates_body_mode_check
CHECK (body_override_mode IN ('off', 'merge', 'replace'))
);
CREATE UNIQUE INDEX IF NOT EXISTS channel_monitor_request_templates_provider_name
ON channel_monitor_request_templates (provider, name);
-- channel_monitors 加 4 列(ADD COLUMN IF NOT EXISTS 需要 PG 9.6+,生产使用 PG 16)
ALTER TABLE channel_monitors
ADD COLUMN IF NOT EXISTS template_id BIGINT NULL;
ALTER TABLE channel_monitors
ADD COLUMN IF NOT EXISTS extra_headers JSONB NOT NULL DEFAULT '{}'::jsonb;
ALTER TABLE channel_monitors
ADD COLUMN IF NOT EXISTS body_override_mode VARCHAR(10) NOT NULL DEFAULT 'off';
ALTER TABLE channel_monitors
ADD COLUMN IF NOT EXISTS body_override JSONB NULL;
-- 约束 + 外键(DO 块里 IF NOT EXISTS 判断,保证幂等)
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE constraint_name = 'channel_monitors_body_mode_check'
AND table_name = 'channel_monitors'
) THEN
ALTER TABLE channel_monitors
ADD CONSTRAINT channel_monitors_body_mode_check
CHECK (body_override_mode IN ('off', 'merge', 'replace'));
END IF;
IF NOT EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE constraint_name = 'channel_monitors_template_id_fkey'
AND table_name = 'channel_monitors'
) THEN
ALTER TABLE channel_monitors
ADD CONSTRAINT channel_monitors_template_id_fkey
FOREIGN KEY (template_id)
REFERENCES channel_monitor_request_templates (id)
ON DELETE SET NULL;
END IF;
END $$;
CREATE INDEX IF NOT EXISTS idx_channel_monitors_template_id
ON channel_monitors (template_id)
WHERE template_id IS NOT NULL;
-- Migration: 129_seed_claude_code_template
-- 内置「Claude Code 伪装」请求模板,覆盖 Anthropic 上游对官方 CLI 客户端的所有验证项:
-- 1) User-Agent / X-App / anthropic-beta / anthropic-version 等头
-- 2) system 数组首项与官方 system prompt 字面一致(Dice >= 0.5)
-- 3) metadata.user_id 满足 ParseMetadataUserID — 这里用 legacy 格式(user_<64hex>_account_<uuid>_session_<36char>)
-- 避免新版 JSON 字符串内嵌 JSON 在编辑器里出现一长串 \" 转义,便于用户阅读。
--
-- ON CONFLICT DO NOTHING:已部署环境(手动建过模板)跑此 migration 不会重复 / 覆盖。
-- 用户可自行编辑后续覆盖此 seed;CC 升大版时再起一条 migration 提供新模板,不动用户的旧模板。
INSERT INTO channel_monitor_request_templates (
name, provider, description, extra_headers, body_override_mode, body_override
)
VALUES (
'Claude Code 伪装',
'anthropic',
'完整模拟 Claude Code 2.1.114 客户端:UA + anthropic-beta + system + metadata.user_id 全部对齐,绕过 Anthropic 上游 ''Claude Code only'' 限制(如 Max 套餐)。',
'{
"User-Agent": "claude-cli/2.1.114 (external, sdk-cli)",
"X-App": "cli",
"anthropic-version": "2023-06-01",
"anthropic-beta": "claude-code-20250219,interleaved-thinking-2025-05-14,context-management-2025-06-27,prompt-caching-scope-2026-01-05,advisor-tool-2026-03-01",
"anthropic-dangerous-direct-browser-access": "true"
}'::jsonb,
'merge',
'{
"system": [
{
"type": "text",
"text": "You are Claude Code, Anthropic''s official CLI for Claude."
}
],
"metadata": {
"user_id": "user_0000000000000000000000000000000000000000000000000000000000000000_account_00000000-0000-0000-0000-000000000000_session_00000000-0000-0000-0000-000000000000"
}
}'::jsonb
)
ON CONFLICT (provider, name) DO NOTHING;
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment