Commit e83f0ee3 authored by yangjianbo's avatar yangjianbo
Browse files

Merge branch 'main' into test-dev

parents bff3c66d 942c3e15
package antigravity
import (
"bytes"
"encoding/json"
"fmt"
"strings"
)
// BlockType 内容块类型
type BlockType int
const (
BlockTypeNone BlockType = iota
BlockTypeText
BlockTypeThinking
BlockTypeFunction
)
// StreamingProcessor 流式响应处理器
type StreamingProcessor struct {
blockType BlockType
blockIndex int
messageStartSent bool
messageStopSent bool
usedTool bool
pendingSignature string
trailingSignature string
originalModel string
// 累计 usage
inputTokens int
outputTokens int
}
// NewStreamingProcessor 创建流式响应处理器
func NewStreamingProcessor(originalModel string) *StreamingProcessor {
return &StreamingProcessor{
blockType: BlockTypeNone,
originalModel: originalModel,
}
}
// ProcessLine 处理 SSE 行,返回 Claude SSE 事件
func (p *StreamingProcessor) ProcessLine(line string) []byte {
line = strings.TrimSpace(line)
if line == "" || !strings.HasPrefix(line, "data:") {
return nil
}
data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
if data == "" || data == "[DONE]" {
return nil
}
// 解包 v1internal 响应
var v1Resp V1InternalResponse
if err := json.Unmarshal([]byte(data), &v1Resp); err != nil {
// 尝试直接解析为 GeminiResponse
var directResp GeminiResponse
if err2 := json.Unmarshal([]byte(data), &directResp); err2 != nil {
return nil
}
v1Resp.Response = directResp
v1Resp.ResponseID = directResp.ResponseID
v1Resp.ModelVersion = directResp.ModelVersion
}
geminiResp := &v1Resp.Response
var result bytes.Buffer
// 发送 message_start
if !p.messageStartSent {
_, _ = result.Write(p.emitMessageStart(&v1Resp))
}
// 更新 usage
if geminiResp.UsageMetadata != nil {
p.inputTokens = geminiResp.UsageMetadata.PromptTokenCount
p.outputTokens = geminiResp.UsageMetadata.CandidatesTokenCount
}
// 处理 parts
if len(geminiResp.Candidates) > 0 && geminiResp.Candidates[0].Content != nil {
for _, part := range geminiResp.Candidates[0].Content.Parts {
_, _ = result.Write(p.processPart(&part))
}
}
// 检查是否结束
if len(geminiResp.Candidates) > 0 {
finishReason := geminiResp.Candidates[0].FinishReason
if finishReason != "" {
_, _ = result.Write(p.emitFinish(finishReason))
}
}
return result.Bytes()
}
// Finish 结束处理,返回最终事件和用量
func (p *StreamingProcessor) Finish() ([]byte, *ClaudeUsage) {
var result bytes.Buffer
if !p.messageStopSent {
_, _ = result.Write(p.emitFinish(""))
}
usage := &ClaudeUsage{
InputTokens: p.inputTokens,
OutputTokens: p.outputTokens,
}
return result.Bytes(), usage
}
// emitMessageStart 发送 message_start 事件
func (p *StreamingProcessor) emitMessageStart(v1Resp *V1InternalResponse) []byte {
if p.messageStartSent {
return nil
}
usage := ClaudeUsage{}
if v1Resp.Response.UsageMetadata != nil {
usage.InputTokens = v1Resp.Response.UsageMetadata.PromptTokenCount
usage.OutputTokens = v1Resp.Response.UsageMetadata.CandidatesTokenCount
}
responseID := v1Resp.ResponseID
if responseID == "" {
responseID = v1Resp.Response.ResponseID
}
if responseID == "" {
responseID = "msg_" + generateRandomID()
}
message := map[string]any{
"id": responseID,
"type": "message",
"role": "assistant",
"content": []any{},
"model": p.originalModel,
"stop_reason": nil,
"stop_sequence": nil,
"usage": usage,
}
event := map[string]any{
"type": "message_start",
"message": message,
}
p.messageStartSent = true
return p.formatSSE("message_start", event)
}
// processPart 处理单个 part
func (p *StreamingProcessor) processPart(part *GeminiPart) []byte {
var result bytes.Buffer
signature := part.ThoughtSignature
// 1. FunctionCall 处理
if part.FunctionCall != nil {
// 先处理 trailingSignature
if p.trailingSignature != "" {
_, _ = result.Write(p.endBlock())
_, _ = result.Write(p.emitEmptyThinkingWithSignature(p.trailingSignature))
p.trailingSignature = ""
}
_, _ = result.Write(p.processFunctionCall(part.FunctionCall, signature))
return result.Bytes()
}
// 2. Text 处理
if part.Text != "" || part.Thought {
if part.Thought {
_, _ = result.Write(p.processThinking(part.Text, signature))
} else {
_, _ = result.Write(p.processText(part.Text, signature))
}
}
// 3. InlineData (Image) 处理
if part.InlineData != nil && part.InlineData.Data != "" {
markdownImg := fmt.Sprintf("![image](data:%s;base64,%s)",
part.InlineData.MimeType, part.InlineData.Data)
_, _ = result.Write(p.processText(markdownImg, ""))
}
return result.Bytes()
}
// processThinking 处理 thinking
func (p *StreamingProcessor) processThinking(text, signature string) []byte {
var result bytes.Buffer
// 处理之前的 trailingSignature
if p.trailingSignature != "" {
_, _ = result.Write(p.endBlock())
_, _ = result.Write(p.emitEmptyThinkingWithSignature(p.trailingSignature))
p.trailingSignature = ""
}
// 开始或继续 thinking 块
if p.blockType != BlockTypeThinking {
_, _ = result.Write(p.startBlock(BlockTypeThinking, map[string]any{
"type": "thinking",
"thinking": "",
}))
}
if text != "" {
_, _ = result.Write(p.emitDelta("thinking_delta", map[string]any{
"thinking": text,
}))
}
// 暂存签名
if signature != "" {
p.pendingSignature = signature
}
return result.Bytes()
}
// processText 处理普通 text
func (p *StreamingProcessor) processText(text, signature string) []byte {
var result bytes.Buffer
// 空 text 带签名 - 暂存
if text == "" {
if signature != "" {
p.trailingSignature = signature
}
return nil
}
// 处理之前的 trailingSignature
if p.trailingSignature != "" {
_, _ = result.Write(p.endBlock())
_, _ = result.Write(p.emitEmptyThinkingWithSignature(p.trailingSignature))
p.trailingSignature = ""
}
// 非空 text 带签名 - 特殊处理
if signature != "" {
_, _ = result.Write(p.startBlock(BlockTypeText, map[string]any{
"type": "text",
"text": "",
}))
_, _ = result.Write(p.emitDelta("text_delta", map[string]any{
"text": text,
}))
_, _ = result.Write(p.endBlock())
_, _ = result.Write(p.emitEmptyThinkingWithSignature(signature))
return result.Bytes()
}
// 普通 text (无签名)
if p.blockType != BlockTypeText {
_, _ = result.Write(p.startBlock(BlockTypeText, map[string]any{
"type": "text",
"text": "",
}))
}
_, _ = result.Write(p.emitDelta("text_delta", map[string]any{
"text": text,
}))
return result.Bytes()
}
// processFunctionCall 处理 function call
func (p *StreamingProcessor) processFunctionCall(fc *GeminiFunctionCall, signature string) []byte {
var result bytes.Buffer
p.usedTool = true
toolID := fc.ID
if toolID == "" {
toolID = fmt.Sprintf("%s-%s", fc.Name, generateRandomID())
}
toolUse := map[string]any{
"type": "tool_use",
"id": toolID,
"name": fc.Name,
"input": map[string]any{},
}
if signature != "" {
toolUse["signature"] = signature
}
_, _ = result.Write(p.startBlock(BlockTypeFunction, toolUse))
// 发送 input_json_delta
if fc.Args != nil {
argsJSON, _ := json.Marshal(fc.Args)
_, _ = result.Write(p.emitDelta("input_json_delta", map[string]any{
"partial_json": string(argsJSON),
}))
}
_, _ = result.Write(p.endBlock())
return result.Bytes()
}
// startBlock 开始新的内容块
func (p *StreamingProcessor) startBlock(blockType BlockType, contentBlock map[string]any) []byte {
var result bytes.Buffer
if p.blockType != BlockTypeNone {
_, _ = result.Write(p.endBlock())
}
event := map[string]any{
"type": "content_block_start",
"index": p.blockIndex,
"content_block": contentBlock,
}
_, _ = result.Write(p.formatSSE("content_block_start", event))
p.blockType = blockType
return result.Bytes()
}
// endBlock 结束当前内容块
func (p *StreamingProcessor) endBlock() []byte {
if p.blockType == BlockTypeNone {
return nil
}
var result bytes.Buffer
// Thinking 块结束时发送暂存的签名
if p.blockType == BlockTypeThinking && p.pendingSignature != "" {
_, _ = result.Write(p.emitDelta("signature_delta", map[string]any{
"signature": p.pendingSignature,
}))
p.pendingSignature = ""
}
event := map[string]any{
"type": "content_block_stop",
"index": p.blockIndex,
}
_, _ = result.Write(p.formatSSE("content_block_stop", event))
p.blockIndex++
p.blockType = BlockTypeNone
return result.Bytes()
}
// emitDelta 发送 delta 事件
func (p *StreamingProcessor) emitDelta(deltaType string, deltaContent map[string]any) []byte {
delta := map[string]any{
"type": deltaType,
}
for k, v := range deltaContent {
delta[k] = v
}
event := map[string]any{
"type": "content_block_delta",
"index": p.blockIndex,
"delta": delta,
}
return p.formatSSE("content_block_delta", event)
}
// emitEmptyThinkingWithSignature 发送空 thinking 块承载签名
func (p *StreamingProcessor) emitEmptyThinkingWithSignature(signature string) []byte {
var result bytes.Buffer
_, _ = result.Write(p.startBlock(BlockTypeThinking, map[string]any{
"type": "thinking",
"thinking": "",
}))
_, _ = result.Write(p.emitDelta("thinking_delta", map[string]any{
"thinking": "",
}))
_, _ = result.Write(p.emitDelta("signature_delta", map[string]any{
"signature": signature,
}))
_, _ = result.Write(p.endBlock())
return result.Bytes()
}
// emitFinish 发送结束事件
func (p *StreamingProcessor) emitFinish(finishReason string) []byte {
var result bytes.Buffer
// 关闭最后一个块
_, _ = result.Write(p.endBlock())
// 处理 trailingSignature
if p.trailingSignature != "" {
_, _ = result.Write(p.emitEmptyThinkingWithSignature(p.trailingSignature))
p.trailingSignature = ""
}
// 确定 stop_reason
stopReason := "end_turn"
if p.usedTool {
stopReason = "tool_use"
} else if finishReason == "MAX_TOKENS" {
stopReason = "max_tokens"
}
usage := ClaudeUsage{
InputTokens: p.inputTokens,
OutputTokens: p.outputTokens,
}
deltaEvent := map[string]any{
"type": "message_delta",
"delta": map[string]any{
"stop_reason": stopReason,
"stop_sequence": nil,
},
"usage": usage,
}
_, _ = result.Write(p.formatSSE("message_delta", deltaEvent))
if !p.messageStopSent {
stopEvent := map[string]any{
"type": "message_stop",
}
_, _ = result.Write(p.formatSSE("message_stop", stopEvent))
p.messageStopSent = true
}
return result.Bytes()
}
// formatSSE 格式化 SSE 事件
func (p *StreamingProcessor) formatSSE(eventType string, data any) []byte {
jsonData, err := json.Marshal(data)
if err != nil {
return nil
}
return []byte(fmt.Sprintf("event: %s\ndata: %s\n\n", eventType, string(jsonData)))
}
// Package ctxkey 定义用于 context.Value 的类型安全 key
package ctxkey
// Key 定义 context key 的类型,避免使用内置 string 类型(staticcheck SA1029)
type Key string
const (
// ForcePlatform 强制平台(用于 /antigravity 路由),由 middleware.ForcePlatform 设置
ForcePlatform Key = "ctx_force_platform"
)
......@@ -464,6 +464,56 @@ func (r *accountRepository) ListSchedulableByGroupIDAndPlatform(ctx context.Cont
})
}
func (r *accountRepository) ListSchedulableByPlatforms(ctx context.Context, platforms []string) ([]service.Account, error) {
if len(platforms) == 0 {
return nil, nil
}
var accounts []accountModel
now := time.Now()
err := r.db.WithContext(ctx).
Where("platform IN ?", platforms).
Where("status = ? AND schedulable = ?", service.StatusActive, true).
Where("(overload_until IS NULL OR overload_until <= ?)", now).
Where("(rate_limit_reset_at IS NULL OR rate_limit_reset_at <= ?)", now).
Preload("Proxy").
Order("priority ASC").
Find(&accounts).Error
if err != nil {
return nil, err
}
outAccounts := make([]service.Account, 0, len(accounts))
for i := range accounts {
outAccounts = append(outAccounts, *accountModelToService(&accounts[i]))
}
return outAccounts, nil
}
func (r *accountRepository) ListSchedulableByGroupIDAndPlatforms(ctx context.Context, groupID int64, platforms []string) ([]service.Account, error) {
if len(platforms) == 0 {
return nil, nil
}
var accounts []accountModel
now := time.Now()
err := r.db.WithContext(ctx).
Joins("JOIN account_groups ON account_groups.account_id = accounts.id").
Where("account_groups.group_id = ?", groupID).
Where("accounts.platform IN ?", platforms).
Where("accounts.status = ? AND accounts.schedulable = ?", service.StatusActive, true).
Where("(accounts.overload_until IS NULL OR accounts.overload_until <= ?)", now).
Where("(accounts.rate_limit_reset_at IS NULL OR accounts.rate_limit_reset_at <= ?)", now).
Preload("Proxy").
Order("account_groups.priority ASC, accounts.priority ASC").
Find(&accounts).Error
if err != nil {
return nil, err
}
outAccounts := make([]service.Account, 0, len(accounts))
for i := range accounts {
outAccounts = append(outAccounts, *accountModelToService(&accounts[i]))
}
return outAccounts, nil
}
func (r *accountRepository) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error {
now := time.Now()
_, err := r.client.Account.Update().
......
//go:build integration
package repository
import (
"context"
"testing"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/stretchr/testify/suite"
"gorm.io/datatypes"
"gorm.io/gorm"
)
// GatewayRoutingSuite 测试网关路由相关的数据库查询
// 验证账户选择和分流逻辑在真实数据库环境下的行为
type GatewayRoutingSuite struct {
suite.Suite
ctx context.Context
db *gorm.DB
accountRepo *accountRepository
}
func (s *GatewayRoutingSuite) SetupTest() {
s.ctx = context.Background()
s.db = testTx(s.T())
s.accountRepo = NewAccountRepository(s.db).(*accountRepository)
}
func TestGatewayRoutingSuite(t *testing.T) {
suite.Run(t, new(GatewayRoutingSuite))
}
// TestListSchedulableByPlatforms_GeminiAndAntigravity 验证多平台账户查询
func (s *GatewayRoutingSuite) TestListSchedulableByPlatforms_GeminiAndAntigravity() {
// 创建各平台账户
geminiAcc := mustCreateAccount(s.T(), s.db, &accountModel{
Name: "gemini-oauth",
Platform: service.PlatformGemini,
Type: service.AccountTypeOAuth,
Status: service.StatusActive,
Schedulable: true,
Priority: 1,
})
antigravityAcc := mustCreateAccount(s.T(), s.db, &accountModel{
Name: "antigravity-oauth",
Platform: service.PlatformAntigravity,
Type: service.AccountTypeOAuth,
Status: service.StatusActive,
Schedulable: true,
Priority: 2,
Credentials: datatypes.JSONMap{
"access_token": "test-token",
"refresh_token": "test-refresh",
"project_id": "test-project",
},
})
// 创建不应被选中的 anthropic 账户
mustCreateAccount(s.T(), s.db, &accountModel{
Name: "anthropic-oauth",
Platform: service.PlatformAnthropic,
Type: service.AccountTypeOAuth,
Status: service.StatusActive,
Schedulable: true,
Priority: 0,
})
// 查询 gemini + antigravity 平台
accounts, err := s.accountRepo.ListSchedulableByPlatforms(s.ctx, []string{
service.PlatformGemini,
service.PlatformAntigravity,
})
s.Require().NoError(err)
s.Require().Len(accounts, 2, "应返回 gemini 和 antigravity 两个账户")
// 验证返回的账户平台
platforms := make(map[string]bool)
for _, acc := range accounts {
platforms[acc.Platform] = true
}
s.Require().True(platforms[service.PlatformGemini], "应包含 gemini 账户")
s.Require().True(platforms[service.PlatformAntigravity], "应包含 antigravity 账户")
s.Require().False(platforms[service.PlatformAnthropic], "不应包含 anthropic 账户")
// 验证账户 ID 匹配
ids := make(map[int64]bool)
for _, acc := range accounts {
ids[acc.ID] = true
}
s.Require().True(ids[geminiAcc.ID])
s.Require().True(ids[antigravityAcc.ID])
}
// TestListSchedulableByGroupIDAndPlatforms_WithGroupBinding 验证按分组过滤
func (s *GatewayRoutingSuite) TestListSchedulableByGroupIDAndPlatforms_WithGroupBinding() {
// 创建 gemini 分组
group := mustCreateGroup(s.T(), s.db, &groupModel{
Name: "gemini-group",
Platform: service.PlatformGemini,
Status: service.StatusActive,
})
// 创建账户
boundAcc := mustCreateAccount(s.T(), s.db, &accountModel{
Name: "bound-antigravity",
Platform: service.PlatformAntigravity,
Status: service.StatusActive,
Schedulable: true,
})
unboundAcc := mustCreateAccount(s.T(), s.db, &accountModel{
Name: "unbound-antigravity",
Platform: service.PlatformAntigravity,
Status: service.StatusActive,
Schedulable: true,
})
// 只绑定一个账户到分组
mustBindAccountToGroup(s.T(), s.db, boundAcc.ID, group.ID, 1)
// 查询分组内的账户
accounts, err := s.accountRepo.ListSchedulableByGroupIDAndPlatforms(s.ctx, group.ID, []string{
service.PlatformGemini,
service.PlatformAntigravity,
})
s.Require().NoError(err)
s.Require().Len(accounts, 1, "应只返回绑定到分组的账户")
s.Require().Equal(boundAcc.ID, accounts[0].ID)
// 确认未绑定的账户不在结果中
for _, acc := range accounts {
s.Require().NotEqual(unboundAcc.ID, acc.ID, "不应包含未绑定的账户")
}
}
// TestListSchedulableByPlatform_Antigravity 验证单平台查询
func (s *GatewayRoutingSuite) TestListSchedulableByPlatform_Antigravity() {
// 创建多种平台账户
mustCreateAccount(s.T(), s.db, &accountModel{
Name: "gemini-1",
Platform: service.PlatformGemini,
Status: service.StatusActive,
Schedulable: true,
})
antigravity := mustCreateAccount(s.T(), s.db, &accountModel{
Name: "antigravity-1",
Platform: service.PlatformAntigravity,
Status: service.StatusActive,
Schedulable: true,
})
// 只查询 antigravity 平台
accounts, err := s.accountRepo.ListSchedulableByPlatform(s.ctx, service.PlatformAntigravity)
s.Require().NoError(err)
s.Require().Len(accounts, 1)
s.Require().Equal(antigravity.ID, accounts[0].ID)
s.Require().Equal(service.PlatformAntigravity, accounts[0].Platform)
}
// TestSchedulableFilter_ExcludesInactive 验证不可调度账户被过滤
func (s *GatewayRoutingSuite) TestSchedulableFilter_ExcludesInactive() {
// 创建可调度账户
activeAcc := mustCreateAccount(s.T(), s.db, &accountModel{
Name: "active-antigravity",
Platform: service.PlatformAntigravity,
Status: service.StatusActive,
Schedulable: true,
})
// 创建不可调度账户(需要先创建再更新,因为 fixture 默认设置 Schedulable=true)
inactiveAcc := mustCreateAccount(s.T(), s.db, &accountModel{
Name: "inactive-antigravity",
Platform: service.PlatformAntigravity,
Status: service.StatusActive,
})
s.Require().NoError(s.db.Model(&accountModel{}).Where("id = ?", inactiveAcc.ID).Update("schedulable", false).Error)
// 创建错误状态账户
mustCreateAccount(s.T(), s.db, &accountModel{
Name: "error-antigravity",
Platform: service.PlatformAntigravity,
Status: service.StatusError,
Schedulable: true,
})
accounts, err := s.accountRepo.ListSchedulableByPlatform(s.ctx, service.PlatformAntigravity)
s.Require().NoError(err)
s.Require().Len(accounts, 1, "应只返回可调度的 active 账户")
s.Require().Equal(activeAcc.ID, accounts[0].ID)
}
// TestPlatformRoutingDecision 验证平台路由决策
// 这个测试模拟 Handler 层在选择账户后的路由决策逻辑
func (s *GatewayRoutingSuite) TestPlatformRoutingDecision() {
// 创建两种平台的账户
geminiAcc := mustCreateAccount(s.T(), s.db, &accountModel{
Name: "gemini-route-test",
Platform: service.PlatformGemini,
Status: service.StatusActive,
Schedulable: true,
})
antigravityAcc := mustCreateAccount(s.T(), s.db, &accountModel{
Name: "antigravity-route-test",
Platform: service.PlatformAntigravity,
Status: service.StatusActive,
Schedulable: true,
})
tests := []struct {
name string
accountID int64
expectedService string
}{
{
name: "Gemini账户路由到ForwardNative",
accountID: geminiAcc.ID,
expectedService: "GeminiMessagesCompatService.ForwardNative",
},
{
name: "Antigravity账户路由到ForwardGemini",
accountID: antigravityAcc.ID,
expectedService: "AntigravityGatewayService.ForwardGemini",
},
}
for _, tt := range tests {
s.Run(tt.name, func() {
// 从数据库获取账户
account, err := s.accountRepo.GetByID(s.ctx, tt.accountID)
s.Require().NoError(err)
// 模拟 Handler 层的路由决策
var routedService string
if account.Platform == service.PlatformAntigravity {
routedService = "AntigravityGatewayService.ForwardGemini"
} else {
routedService = "GeminiMessagesCompatService.ForwardNative"
}
s.Require().Equal(tt.expectedService, routedService)
})
}
}
package middleware
import "github.com/gin-gonic/gin"
import (
"context"
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
"github.com/gin-gonic/gin"
)
// ContextKey 定义上下文键类型
type ContextKey string
......@@ -14,8 +19,39 @@ const (
ContextKeyApiKey ContextKey = "api_key"
// ContextKeySubscription 订阅上下文键
ContextKeySubscription ContextKey = "subscription"
// ContextKeyForcePlatform 强制平台(用于 /antigravity 路由)
ContextKeyForcePlatform ContextKey = "force_platform"
)
// ForcePlatform 返回设置强制平台的中间件
// 同时设置 request.Context(供 Service 使用)和 gin.Context(供 Handler 快速检查)
func ForcePlatform(platform string) gin.HandlerFunc {
return func(c *gin.Context) {
// 设置到 request.Context,使用 ctxkey.ForcePlatform 供 Service 层读取
ctx := context.WithValue(c.Request.Context(), ctxkey.ForcePlatform, platform)
c.Request = c.Request.WithContext(ctx)
// 同时设置到 gin.Context,供 Handler 快速检查
c.Set(string(ContextKeyForcePlatform), platform)
c.Next()
}
}
// HasForcePlatform 检查是否有强制平台(用于 Handler 跳过分组检查)
func HasForcePlatform(c *gin.Context) bool {
_, exists := c.Get(string(ContextKeyForcePlatform))
return exists
}
// GetForcePlatformFromContext 从 gin.Context 获取强制平台
func GetForcePlatformFromContext(c *gin.Context) (string, bool) {
value, exists := c.Get(string(ContextKeyForcePlatform))
if !exists {
return "", false
}
platform, ok := value.(string)
return platform, ok
}
// ErrorResponse 标准错误响应结构
type ErrorResponse struct {
Code string `json:"code"`
......
......@@ -34,6 +34,9 @@ func RegisterAdminRoutes(
// Gemini OAuth
registerGeminiOAuthRoutes(admin, h)
// Antigravity OAuth
registerAntigravityOAuthRoutes(admin, h)
// 代理管理
registerProxyRoutes(admin, h)
......@@ -148,6 +151,14 @@ func registerGeminiOAuthRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
}
}
func registerAntigravityOAuthRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
antigravity := admin.Group("/antigravity")
{
antigravity.POST("/oauth/auth-url", h.Admin.AntigravityOAuth.GenerateAuthURL)
antigravity.POST("/oauth/exchange-code", h.Admin.AntigravityOAuth.ExchangeCode)
}
}
func registerProxyRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
proxies := admin.Group("/proxies")
{
......
......@@ -42,4 +42,24 @@ func RegisterGatewayRoutes(
// OpenAI Responses API(不带v1前缀的别名)
r.POST("/responses", gin.HandlerFunc(apiKeyAuth), h.OpenAIGateway.Responses)
// Antigravity 专用路由(仅使用 antigravity 账户,不混合调度)
antigravityV1 := r.Group("/antigravity/v1")
antigravityV1.Use(middleware.ForcePlatform(service.PlatformAntigravity))
antigravityV1.Use(gin.HandlerFunc(apiKeyAuth))
{
antigravityV1.POST("/messages", h.Gateway.Messages)
antigravityV1.POST("/messages/count_tokens", h.Gateway.CountTokens)
antigravityV1.GET("/models", h.Gateway.Models)
antigravityV1.GET("/usage", h.Gateway.Usage)
}
antigravityV1Beta := r.Group("/antigravity/v1beta")
antigravityV1Beta.Use(middleware.ForcePlatform(service.PlatformAntigravity))
antigravityV1Beta.Use(middleware.ApiKeyAuthWithSubscriptionGoogle(apiKeyService, subscriptionService, cfg))
{
antigravityV1Beta.GET("/models", h.Gateway.GeminiV1BetaListModels)
antigravityV1Beta.GET("/models/:model", h.Gateway.GeminiV1BetaGetModel)
antigravityV1Beta.POST("/models/*modelAction", h.Gateway.GeminiV1BetaModels)
}
}
......@@ -346,3 +346,20 @@ func (a *Account) IsOpenAITokenExpired() bool {
}
return time.Now().Add(60 * time.Second).After(*expiresAt)
}
// IsMixedSchedulingEnabled 检查 antigravity 账户是否启用混合调度
// 启用后可参与 anthropic/gemini 分组的账户调度
func (a *Account) IsMixedSchedulingEnabled() bool {
if a.Platform != PlatformAntigravity {
return false
}
if a.Extra == nil {
return false
}
if v, ok := a.Extra["mixed_scheduling"]; ok {
if enabled, ok := v.(bool); ok {
return enabled
}
}
return false
}
......@@ -40,6 +40,8 @@ type AccountRepository interface {
ListSchedulableByGroupID(ctx context.Context, groupID int64) ([]Account, error)
ListSchedulableByPlatform(ctx context.Context, platform string) ([]Account, error)
ListSchedulableByGroupIDAndPlatform(ctx context.Context, groupID int64, platform string) ([]Account, error)
ListSchedulableByPlatforms(ctx context.Context, platforms []string) ([]Account, error)
ListSchedulableByGroupIDAndPlatforms(ctx context.Context, groupID int64, platforms []string) ([]Account, error)
SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error
SetOverloaded(ctx context.Context, id int64, until time.Time) error
......
This diff is collapsed.
//go:build unit
package service
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestIsAntigravityModelSupported(t *testing.T) {
tests := []struct {
name string
model string
expected bool
}{
// 直接支持的模型
{"直接支持 - claude-sonnet-4-5", "claude-sonnet-4-5", true},
{"直接支持 - claude-opus-4-5-thinking", "claude-opus-4-5-thinking", true},
{"直接支持 - claude-sonnet-4-5-thinking", "claude-sonnet-4-5-thinking", true},
{"直接支持 - gemini-2.5-flash", "gemini-2.5-flash", true},
{"直接支持 - gemini-2.5-flash-lite", "gemini-2.5-flash-lite", true},
{"直接支持 - gemini-3-pro-high", "gemini-3-pro-high", true},
// 可映射的模型
{"可映射 - claude-3-5-sonnet-20241022", "claude-3-5-sonnet-20241022", true},
{"可映射 - claude-3-5-sonnet-20240620", "claude-3-5-sonnet-20240620", true},
{"可映射 - claude-opus-4", "claude-opus-4", true},
{"可映射 - claude-haiku-4", "claude-haiku-4", true},
{"可映射 - claude-3-haiku-20240307", "claude-3-haiku-20240307", true},
// Gemini 前缀透传
{"Gemini前缀 - gemini-1.5-pro", "gemini-1.5-pro", true},
{"Gemini前缀 - gemini-unknown-model", "gemini-unknown-model", true},
{"Gemini前缀 - gemini-future-version", "gemini-future-version", true},
// Claude 前缀兜底
{"Claude前缀 - claude-unknown-model", "claude-unknown-model", true},
{"Claude前缀 - claude-3-opus-20240229", "claude-3-opus-20240229", true},
{"Claude前缀 - claude-future-version", "claude-future-version", true},
// 不支持的模型
{"不支持 - gpt-4", "gpt-4", false},
{"不支持 - gpt-4o", "gpt-4o", false},
{"不支持 - llama-3", "llama-3", false},
{"不支持 - mistral-7b", "mistral-7b", false},
{"不支持 - 空字符串", "", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := IsAntigravityModelSupported(tt.model)
require.Equal(t, tt.expected, got, "model: %s", tt.model)
})
}
}
func TestAntigravityGatewayService_GetMappedModel(t *testing.T) {
svc := &AntigravityGatewayService{}
tests := []struct {
name string
requestedModel string
accountMapping map[string]string
expected string
}{
// 1. 账户级映射优先(注意:model_mapping 在 credentials 中存储为 map[string]any)
{
name: "账户映射优先",
requestedModel: "claude-3-5-sonnet-20241022",
accountMapping: map[string]string{"claude-3-5-sonnet-20241022": "custom-model"},
expected: "custom-model",
},
{
name: "账户映射覆盖系统映射",
requestedModel: "claude-opus-4",
accountMapping: map[string]string{"claude-opus-4": "my-opus"},
expected: "my-opus",
},
// 2. 系统默认映射
{
name: "系统映射 - claude-3-5-sonnet-20241022",
requestedModel: "claude-3-5-sonnet-20241022",
accountMapping: nil,
expected: "claude-sonnet-4-5",
},
{
name: "系统映射 - claude-3-5-sonnet-20240620",
requestedModel: "claude-3-5-sonnet-20240620",
accountMapping: nil,
expected: "claude-sonnet-4-5",
},
{
name: "系统映射 - claude-opus-4",
requestedModel: "claude-opus-4",
accountMapping: nil,
expected: "claude-opus-4-5-thinking",
},
{
name: "系统映射 - claude-opus-4-5-20251101",
requestedModel: "claude-opus-4-5-20251101",
accountMapping: nil,
expected: "claude-opus-4-5-thinking",
},
{
name: "系统映射 - claude-haiku-4 → gemini-3-flash",
requestedModel: "claude-haiku-4",
accountMapping: nil,
expected: "gemini-3-flash",
},
{
name: "系统映射 - claude-haiku-4-5 → gemini-3-flash",
requestedModel: "claude-haiku-4-5",
accountMapping: nil,
expected: "gemini-3-flash",
},
{
name: "系统映射 - claude-3-haiku-20240307 → gemini-3-flash",
requestedModel: "claude-3-haiku-20240307",
accountMapping: nil,
expected: "gemini-3-flash",
},
{
name: "系统映射 - claude-haiku-4-5-20251001 → gemini-3-flash",
requestedModel: "claude-haiku-4-5-20251001",
accountMapping: nil,
expected: "gemini-3-flash",
},
{
name: "系统映射 - claude-sonnet-4-5-20250929",
requestedModel: "claude-sonnet-4-5-20250929",
accountMapping: nil,
expected: "claude-sonnet-4-5-thinking",
},
// 3. Gemini 透传
{
name: "Gemini透传 - gemini-2.5-flash",
requestedModel: "gemini-2.5-flash",
accountMapping: nil,
expected: "gemini-2.5-flash",
},
{
name: "Gemini透传 - gemini-1.5-pro",
requestedModel: "gemini-1.5-pro",
accountMapping: nil,
expected: "gemini-1.5-pro",
},
{
name: "Gemini透传 - gemini-future-model",
requestedModel: "gemini-future-model",
accountMapping: nil,
expected: "gemini-future-model",
},
// 4. 直接支持的模型
{
name: "直接支持 - claude-sonnet-4-5",
requestedModel: "claude-sonnet-4-5",
accountMapping: nil,
expected: "claude-sonnet-4-5",
},
{
name: "直接支持 - claude-opus-4-5-thinking",
requestedModel: "claude-opus-4-5-thinking",
accountMapping: nil,
expected: "claude-opus-4-5-thinking",
},
{
name: "直接支持 - claude-sonnet-4-5-thinking",
requestedModel: "claude-sonnet-4-5-thinking",
accountMapping: nil,
expected: "claude-sonnet-4-5-thinking",
},
// 5. 默认值 fallback(未知 claude 模型)
{
name: "默认值 - claude-unknown",
requestedModel: "claude-unknown",
accountMapping: nil,
expected: "claude-sonnet-4-5",
},
{
name: "默认值 - claude-3-opus-20240229",
requestedModel: "claude-3-opus-20240229",
accountMapping: nil,
expected: "claude-sonnet-4-5",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
account := &Account{
Platform: PlatformAntigravity,
}
if tt.accountMapping != nil {
// GetModelMapping 期望 model_mapping 是 map[string]any 格式
mappingAny := make(map[string]any)
for k, v := range tt.accountMapping {
mappingAny[k] = v
}
account.Credentials = map[string]any{
"model_mapping": mappingAny,
}
}
got := svc.getMappedModel(account, tt.requestedModel)
require.Equal(t, tt.expected, got, "model: %s", tt.requestedModel)
})
}
}
func TestAntigravityGatewayService_GetMappedModel_EdgeCases(t *testing.T) {
svc := &AntigravityGatewayService{}
tests := []struct {
name string
requestedModel string
expected string
}{
// 空字符串回退到默认值
{"空字符串", "", "claude-sonnet-4-5"},
// 非 claude/gemini 前缀回退到默认值
{"非claude/gemini前缀 - gpt", "gpt-4", "claude-sonnet-4-5"},
{"非claude/gemini前缀 - llama", "llama-3", "claude-sonnet-4-5"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
account := &Account{Platform: PlatformAntigravity}
got := svc.getMappedModel(account, tt.requestedModel)
require.Equal(t, tt.expected, got)
})
}
}
func TestAntigravityGatewayService_IsModelSupported(t *testing.T) {
svc := &AntigravityGatewayService{}
tests := []struct {
name string
model string
expected bool
}{
// 直接支持
{"直接支持 - claude-sonnet-4-5", "claude-sonnet-4-5", true},
{"直接支持 - gemini-3-flash", "gemini-3-flash", true},
// 可映射
{"可映射 - claude-opus-4", "claude-opus-4", true},
// 前缀透传
{"Gemini前缀", "gemini-unknown", true},
{"Claude前缀", "claude-unknown", true},
// 不支持
{"不支持 - gpt-4", "gpt-4", false},
{"不支持 - 空字符串", "", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := svc.IsModelSupported(tt.model)
require.Equal(t, tt.expected, got)
})
}
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
package service
import (
"context"
"strconv"
"time"
)
// AntigravityTokenRefresher 实现 TokenRefresher 接口
type AntigravityTokenRefresher struct {
antigravityOAuthService *AntigravityOAuthService
}
func NewAntigravityTokenRefresher(antigravityOAuthService *AntigravityOAuthService) *AntigravityTokenRefresher {
return &AntigravityTokenRefresher{
antigravityOAuthService: antigravityOAuthService,
}
}
// CanRefresh 检查是否可以刷新此账户
func (r *AntigravityTokenRefresher) CanRefresh(account *Account) bool {
return account.Platform == PlatformAntigravity && account.Type == AccountTypeOAuth
}
// NeedsRefresh 检查账户是否需要刷新
func (r *AntigravityTokenRefresher) NeedsRefresh(account *Account, refreshWindow time.Duration) bool {
if !r.CanRefresh(account) {
return false
}
expiresAtStr := account.GetCredential("expires_at")
if expiresAtStr == "" {
return false
}
expiresAt, err := strconv.ParseInt(expiresAtStr, 10, 64)
if err != nil {
return false
}
expiryTime := time.Unix(expiresAt, 0)
return time.Until(expiryTime) < refreshWindow
}
// Refresh 执行 token 刷新
func (r *AntigravityTokenRefresher) Refresh(ctx context.Context, account *Account) (map[string]any, error) {
tokenInfo, err := r.antigravityOAuthService.RefreshAccountToken(ctx, account)
if err != nil {
return nil, err
}
newCredentials := r.antigravityOAuthService.BuildAccountCredentials(tokenInfo)
for k, v := range account.Credentials {
if _, exists := newCredentials[k]; !exists {
newCredentials[k] = v
}
}
return newCredentials, nil
}
......@@ -21,6 +21,7 @@ const (
PlatformAnthropic = "anthropic"
PlatformOpenAI = "openai"
PlatformGemini = "gemini"
PlatformAntigravity = "antigravity"
)
// Account type constants
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment