Commit 429f38d0 authored by shaw's avatar shaw
Browse files

Merge PR #37: Add Gemini OAuth and Messages Compat Support

parents 2d89f366 2714be99
package repository
import (
"context"
"fmt"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/geminicli"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/imroc/req/v3"
)
type geminiCliCodeAssistClient struct {
baseURL string
}
func NewGeminiCliCodeAssistClient() service.GeminiCliCodeAssistClient {
return &geminiCliCodeAssistClient{baseURL: geminicli.GeminiCliBaseURL}
}
func (c *geminiCliCodeAssistClient) LoadCodeAssist(ctx context.Context, accessToken, proxyURL string, reqBody *geminicli.LoadCodeAssistRequest) (*geminicli.LoadCodeAssistResponse, error) {
if reqBody == nil {
reqBody = defaultLoadCodeAssistRequest()
}
var out geminicli.LoadCodeAssistResponse
resp, err := createGeminiCliReqClient(proxyURL).R().
SetContext(ctx).
SetHeader("Authorization", "Bearer "+accessToken).
SetHeader("Content-Type", "application/json").
SetHeader("User-Agent", geminicli.GeminiCLIUserAgent).
SetBody(reqBody).
SetSuccessResult(&out).
Post(c.baseURL + "/v1internal:loadCodeAssist")
if err != nil {
fmt.Printf("[CodeAssist] LoadCodeAssist request error: %v\n", err)
return nil, fmt.Errorf("request failed: %w", err)
}
if !resp.IsSuccessState() {
body := geminicli.SanitizeBodyForLogs(resp.String())
fmt.Printf("[CodeAssist] LoadCodeAssist failed: status %d, body: %s\n", resp.StatusCode, body)
return nil, fmt.Errorf("loadCodeAssist failed: status %d, body: %s", resp.StatusCode, body)
}
fmt.Printf("[CodeAssist] LoadCodeAssist success: status %d, response: %+v\n", resp.StatusCode, out)
return &out, nil
}
func (c *geminiCliCodeAssistClient) OnboardUser(ctx context.Context, accessToken, proxyURL string, reqBody *geminicli.OnboardUserRequest) (*geminicli.OnboardUserResponse, error) {
if reqBody == nil {
reqBody = defaultOnboardUserRequest()
}
fmt.Printf("[CodeAssist] OnboardUser request body: %+v\n", reqBody)
var out geminicli.OnboardUserResponse
resp, err := createGeminiCliReqClient(proxyURL).R().
SetContext(ctx).
SetHeader("Authorization", "Bearer "+accessToken).
SetHeader("Content-Type", "application/json").
SetHeader("User-Agent", geminicli.GeminiCLIUserAgent).
SetBody(reqBody).
SetSuccessResult(&out).
Post(c.baseURL + "/v1internal:onboardUser")
if err != nil {
fmt.Printf("[CodeAssist] OnboardUser request error: %v\n", err)
return nil, fmt.Errorf("request failed: %w", err)
}
if !resp.IsSuccessState() {
body := geminicli.SanitizeBodyForLogs(resp.String())
fmt.Printf("[CodeAssist] OnboardUser failed: status %d, body: %s\n", resp.StatusCode, body)
return nil, fmt.Errorf("onboardUser failed: status %d, body: %s", resp.StatusCode, body)
}
fmt.Printf("[CodeAssist] OnboardUser success: status %d, response: %+v\n", resp.StatusCode, out)
return &out, nil
}
func createGeminiCliReqClient(proxyURL string) *req.Client {
client := req.C().SetTimeout(30 * time.Second)
if proxyURL != "" {
client.SetProxyURL(proxyURL)
}
return client
}
func defaultLoadCodeAssistRequest() *geminicli.LoadCodeAssistRequest {
return &geminicli.LoadCodeAssistRequest{
Metadata: geminicli.LoadCodeAssistMetadata{
IDEType: "ANTIGRAVITY",
Platform: "PLATFORM_UNSPECIFIED",
PluginType: "GEMINI",
},
}
}
func defaultOnboardUserRequest() *geminicli.OnboardUserRequest {
return &geminicli.OnboardUserRequest{
TierID: "LEGACY",
Metadata: geminicli.LoadCodeAssistMetadata{
IDEType: "ANTIGRAVITY",
Platform: "PLATFORM_UNSPECIFIED",
PluginType: "GEMINI",
},
}
}
......@@ -120,10 +120,9 @@ func (s *PricingServiceSuite) TestFetchHashText_WhitespaceOnly() {
func (s *PricingServiceSuite) TestFetchPricingJSON_ContextCancel() {
started := make(chan struct{})
block := make(chan struct{})
s.setupServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
close(started)
<-block
<-r.Context().Done()
}))
ctx, cancel := context.WithCancel(s.ctx)
......@@ -136,7 +135,6 @@ func (s *PricingServiceSuite) TestFetchPricingJSON_ContextCancel() {
<-started
cancel()
close(block)
err := <-done
require.Error(s.T(), err)
......
......@@ -25,6 +25,7 @@ var ProviderSet = wire.NewSet(
NewIdentityCache,
NewRedeemCache,
NewUpdateCache,
NewGeminiTokenCache,
// HTTP service ports (DI Strategy A: return interface directly)
NewTurnstileVerifier,
......@@ -35,4 +36,6 @@ var ProviderSet = wire.NewSet(
NewClaudeOAuthClient,
NewHTTPUpstream,
NewOpenAIOAuthClient,
NewGeminiOAuthClient,
NewGeminiCliCodeAssistClient,
)
......@@ -7,6 +7,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/handler"
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
"github.com/google/wire"
......@@ -25,6 +26,8 @@ func ProvideRouter(
jwtAuth middleware2.JWTAuthMiddleware,
adminAuth middleware2.AdminAuthMiddleware,
apiKeyAuth middleware2.ApiKeyAuthMiddleware,
apiKeyService *service.ApiKeyService,
subscriptionService *service.SubscriptionService,
) *gin.Engine {
if cfg.Server.Mode == "release" {
gin.SetMode(gin.ReleaseMode)
......@@ -33,7 +36,7 @@ func ProvideRouter(
r := gin.New()
r.Use(middleware2.Recovery())
return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth)
return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService)
}
// ProvideHTTPServer 提供 HTTP 服务器
......
......@@ -35,9 +35,24 @@ func apiKeyAuthWithSubscription(apiKeyService *service.ApiKeyService, subscripti
apiKeyString = c.GetHeader("x-api-key")
}
// 如果两个header没有API key
// 如果x-api-key header没有,尝试从x-goog-api-key header中提取(Gemini CLI兼容)
if apiKeyString == "" {
AbortWithError(c, 401, "API_KEY_REQUIRED", "API key is required in Authorization header (Bearer scheme) or x-api-key header")
apiKeyString = c.GetHeader("x-goog-api-key")
}
// 如果header中没有,尝试从query参数中提取(Google API key风格)
if apiKeyString == "" {
apiKeyString = c.Query("key")
}
// 兼容常见别名
if apiKeyString == "" {
apiKeyString = c.Query("api_key")
}
// 如果所有header都没有API key
if apiKeyString == "" {
AbortWithError(c, 401, "API_KEY_REQUIRED", "API key is required in Authorization header (Bearer scheme), x-api-key header, x-goog-api-key header, or key/api_key query parameter")
return
}
......
package middleware
import (
"errors"
"strings"
"github.com/Wei-Shaw/sub2api/internal/pkg/googleapi"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
)
// ApiKeyAuthGoogle is a Google-style error wrapper for API key auth.
func ApiKeyAuthGoogle(apiKeyService *service.ApiKeyService) gin.HandlerFunc {
return ApiKeyAuthWithSubscriptionGoogle(apiKeyService, nil)
}
// ApiKeyAuthWithSubscriptionGoogle behaves like ApiKeyAuthWithSubscription but returns Google-style errors:
// {"error":{"code":401,"message":"...","status":"UNAUTHENTICATED"}}
//
// It is intended for Gemini native endpoints (/v1beta) to match Gemini SDK expectations.
func ApiKeyAuthWithSubscriptionGoogle(apiKeyService *service.ApiKeyService, subscriptionService *service.SubscriptionService) gin.HandlerFunc {
return func(c *gin.Context) {
apiKeyString := extractAPIKeyFromRequest(c)
if apiKeyString == "" {
abortWithGoogleError(c, 401, "API key is required")
return
}
apiKey, err := apiKeyService.GetByKey(c.Request.Context(), apiKeyString)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
abortWithGoogleError(c, 401, "Invalid API key")
return
}
abortWithGoogleError(c, 500, "Failed to validate API key")
return
}
if !apiKey.IsActive() {
abortWithGoogleError(c, 401, "API key is disabled")
return
}
if apiKey.User == nil {
abortWithGoogleError(c, 401, "User associated with API key not found")
return
}
if !apiKey.User.IsActive() {
abortWithGoogleError(c, 401, "User account is not active")
return
}
isSubscriptionType := apiKey.Group != nil && apiKey.Group.IsSubscriptionType()
if isSubscriptionType && subscriptionService != nil {
subscription, err := subscriptionService.GetActiveSubscription(
c.Request.Context(),
apiKey.User.ID,
apiKey.Group.ID,
)
if err != nil {
abortWithGoogleError(c, 403, "No active subscription found for this group")
return
}
if err := subscriptionService.ValidateSubscription(c.Request.Context(), subscription); err != nil {
abortWithGoogleError(c, 403, err.Error())
return
}
_ = subscriptionService.CheckAndActivateWindow(c.Request.Context(), subscription)
_ = subscriptionService.CheckAndResetWindows(c.Request.Context(), subscription)
if err := subscriptionService.CheckUsageLimits(c.Request.Context(), subscription, apiKey.Group, 0); err != nil {
abortWithGoogleError(c, 429, err.Error())
return
}
c.Set(string(ContextKeySubscription), subscription)
} else {
if apiKey.User.Balance <= 0 {
abortWithGoogleError(c, 403, "Insufficient account balance")
return
}
}
c.Set(string(ContextKeyApiKey), apiKey)
c.Set(string(ContextKeyUser), apiKey.User)
c.Next()
}
}
func extractAPIKeyFromRequest(c *gin.Context) string {
authHeader := c.GetHeader("Authorization")
if authHeader != "" {
parts := strings.SplitN(authHeader, " ", 2)
if len(parts) == 2 && parts[0] == "Bearer" && strings.TrimSpace(parts[1]) != "" {
return strings.TrimSpace(parts[1])
}
}
if v := strings.TrimSpace(c.GetHeader("x-api-key")); v != "" {
return v
}
if v := strings.TrimSpace(c.GetHeader("x-goog-api-key")); v != "" {
return v
}
if v := strings.TrimSpace(c.Query("key")); v != "" {
return v
}
if v := strings.TrimSpace(c.Query("api_key")); v != "" {
return v
}
return ""
}
func abortWithGoogleError(c *gin.Context, status int, message string) {
c.JSON(status, gin.H{
"error": gin.H{
"code": status,
"message": message,
"status": googleapi.HTTPStatusToGoogleStatus(status),
},
})
c.Abort()
}
......@@ -4,6 +4,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/handler"
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/server/routes"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/Wei-Shaw/sub2api/internal/web"
"github.com/gin-gonic/gin"
......@@ -16,6 +17,8 @@ func SetupRouter(
jwtAuth middleware2.JWTAuthMiddleware,
adminAuth middleware2.AdminAuthMiddleware,
apiKeyAuth middleware2.ApiKeyAuthMiddleware,
apiKeyService *service.ApiKeyService,
subscriptionService *service.SubscriptionService,
) *gin.Engine {
// 应用中间件
r.Use(middleware2.Logger())
......@@ -27,7 +30,7 @@ func SetupRouter(
}
// 注册路由
registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth)
registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService)
return r
}
......@@ -39,6 +42,8 @@ func registerRoutes(
jwtAuth middleware2.JWTAuthMiddleware,
adminAuth middleware2.AdminAuthMiddleware,
apiKeyAuth middleware2.ApiKeyAuthMiddleware,
apiKeyService *service.ApiKeyService,
subscriptionService *service.SubscriptionService,
) {
// 通用路由(健康检查、状态等)
routes.RegisterCommonRoutes(r)
......@@ -50,5 +55,5 @@ func registerRoutes(
routes.RegisterAuthRoutes(v1, h, jwtAuth)
routes.RegisterUserRoutes(v1, h, jwtAuth)
routes.RegisterAdminRoutes(v1, h, adminAuth)
routes.RegisterGatewayRoutes(r, h, apiKeyAuth)
routes.RegisterGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService)
}
......@@ -31,6 +31,9 @@ func RegisterAdminRoutes(
// OpenAI OAuth
registerOpenAIOAuthRoutes(admin, h)
// Gemini OAuth
registerGeminiOAuthRoutes(admin, h)
// 代理管理
registerProxyRoutes(admin, h)
......@@ -136,6 +139,15 @@ func registerOpenAIOAuthRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
}
}
func registerGeminiOAuthRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
gemini := admin.Group("/gemini")
{
gemini.POST("/oauth/auth-url", h.Admin.GeminiOAuth.GenerateAuthURL)
gemini.POST("/oauth/exchange-code", h.Admin.GeminiOAuth.ExchangeCode)
gemini.GET("/oauth/capabilities", h.Admin.GeminiOAuth.GetCapabilities)
}
}
func registerProxyRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
proxies := admin.Group("/proxies")
{
......
......@@ -3,15 +3,18 @@ package routes
import (
"github.com/Wei-Shaw/sub2api/internal/handler"
"github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)
// RegisterGatewayRoutes 注册 API 网关路由(Claude/OpenAI 兼容)
// RegisterGatewayRoutes 注册 API 网关路由(Claude/OpenAI/Gemini 兼容)
func RegisterGatewayRoutes(
r *gin.Engine,
h *handler.Handlers,
apiKeyAuth middleware.ApiKeyAuthMiddleware,
apiKeyService *service.ApiKeyService,
subscriptionService *service.SubscriptionService,
) {
// API网关(Claude API兼容)
gateway := r.Group("/v1")
......@@ -25,6 +28,16 @@ func RegisterGatewayRoutes(
gateway.POST("/responses", h.OpenAIGateway.Responses)
}
// Gemini 原生 API 兼容层(Gemini SDK/CLI 直连)
gemini := r.Group("/v1beta")
gemini.Use(middleware.ApiKeyAuthWithSubscriptionGoogle(apiKeyService, subscriptionService))
{
gemini.GET("/models", h.Gateway.GeminiV1BetaListModels)
gemini.GET("/models/:model", h.Gateway.GeminiV1BetaGetModel)
// Gin treats ":" as a param marker, but Gemini uses "{model}:{action}" in the same segment.
gemini.POST("/models/*modelAction", h.Gateway.GeminiV1BetaModels)
}
// OpenAI Responses API(不带v1前缀的别名)
r.POST("/responses", gin.HandlerFunc(apiKeyAuth), h.OpenAIGateway.Responses)
}
......@@ -70,6 +70,10 @@ func (a *Account) IsOAuth() bool {
return a.Type == AccountTypeOAuth || a.Type == AccountTypeSetupToken
}
func (a *Account) IsGemini() bool {
return a.Platform == PlatformGemini
}
func (a *Account) CanGetUsage() bool {
return a.Type == AccountTypeOAuth
}
......
......@@ -3,6 +3,7 @@ package service
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
......@@ -16,6 +17,7 @@ import (
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
"github.com/Wei-Shaw/sub2api/internal/pkg/geminicli"
"github.com/Wei-Shaw/sub2api/internal/pkg/openai"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
......@@ -45,15 +47,23 @@ type AccountTestService struct {
accountRepo AccountRepository
oauthService *OAuthService
openaiOAuthService *OpenAIOAuthService
geminiTokenProvider *GeminiTokenProvider
httpUpstream HTTPUpstream
}
// NewAccountTestService creates a new AccountTestService
func NewAccountTestService(accountRepo AccountRepository, oauthService *OAuthService, openaiOAuthService *OpenAIOAuthService, httpUpstream HTTPUpstream) *AccountTestService {
func NewAccountTestService(
accountRepo AccountRepository,
oauthService *OAuthService,
openaiOAuthService *OpenAIOAuthService,
geminiTokenProvider *GeminiTokenProvider,
httpUpstream HTTPUpstream,
) *AccountTestService {
return &AccountTestService{
accountRepo: accountRepo,
oauthService: oauthService,
openaiOAuthService: openaiOAuthService,
geminiTokenProvider: geminiTokenProvider,
httpUpstream: httpUpstream,
}
}
......@@ -127,6 +137,10 @@ func (s *AccountTestService) TestAccountConnection(c *gin.Context, accountID int
return s.testOpenAIAccountConnection(c, account, modelID)
}
if account.IsGemini() {
return s.testGeminiAccountConnection(c, account, modelID)
}
return s.testClaudeAccountConnection(c, account, modelID)
}
......@@ -372,6 +386,252 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account
return s.processOpenAIStream(c, resp.Body)
}
// testGeminiAccountConnection tests a Gemini account's connection
func (s *AccountTestService) testGeminiAccountConnection(c *gin.Context, account *Account, modelID string) error {
ctx := c.Request.Context()
// Determine the model to use
testModelID := modelID
if testModelID == "" {
testModelID = geminicli.DefaultTestModel
}
// For API Key accounts with model mapping, map the model
if account.Type == AccountTypeApiKey {
mapping := account.GetModelMapping()
if len(mapping) > 0 {
if mappedModel, exists := mapping[testModelID]; exists {
testModelID = mappedModel
}
}
}
// Set SSE headers
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("X-Accel-Buffering", "no")
c.Writer.Flush()
// Create test payload (Gemini format)
payload := createGeminiTestPayload()
// Build request based on account type
var req *http.Request
var err error
switch account.Type {
case AccountTypeApiKey:
req, err = s.buildGeminiAPIKeyRequest(ctx, account, testModelID, payload)
case AccountTypeOAuth:
req, err = s.buildGeminiOAuthRequest(ctx, account, testModelID, payload)
default:
return s.sendErrorAndEnd(c, fmt.Sprintf("Unsupported account type: %s", account.Type))
}
if err != nil {
return s.sendErrorAndEnd(c, fmt.Sprintf("Failed to build request: %s", err.Error()))
}
// Send test_start event
s.sendEvent(c, TestEvent{Type: "test_start", Model: testModelID})
// Get proxy and execute request
proxyURL := ""
if account.ProxyID != nil && account.Proxy != nil {
proxyURL = account.Proxy.URL()
}
resp, err := s.httpUpstream.Do(req, proxyURL)
if err != nil {
return s.sendErrorAndEnd(c, fmt.Sprintf("Request failed: %s", err.Error()))
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return s.sendErrorAndEnd(c, fmt.Sprintf("API returned %d: %s", resp.StatusCode, string(body)))
}
// Process SSE stream
return s.processGeminiStream(c, resp.Body)
}
// buildGeminiAPIKeyRequest builds request for Gemini API Key accounts
func (s *AccountTestService) buildGeminiAPIKeyRequest(ctx context.Context, account *Account, modelID string, payload []byte) (*http.Request, error) {
apiKey := account.GetCredential("api_key")
if strings.TrimSpace(apiKey) == "" {
return nil, fmt.Errorf("no API key available")
}
baseURL := account.GetCredential("base_url")
if baseURL == "" {
baseURL = geminicli.AIStudioBaseURL
}
// Use streamGenerateContent for real-time feedback
fullURL := fmt.Sprintf("%s/v1beta/models/%s:streamGenerateContent?alt=sse",
strings.TrimRight(baseURL, "/"), modelID)
req, err := http.NewRequestWithContext(ctx, "POST", fullURL, bytes.NewReader(payload))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-goog-api-key", apiKey)
return req, nil
}
// buildGeminiOAuthRequest builds request for Gemini OAuth accounts
func (s *AccountTestService) buildGeminiOAuthRequest(ctx context.Context, account *Account, modelID string, payload []byte) (*http.Request, error) {
if s.geminiTokenProvider == nil {
return nil, fmt.Errorf("gemini token provider not configured")
}
// Get access token (auto-refreshes if needed)
accessToken, err := s.geminiTokenProvider.GetAccessToken(ctx, account)
if err != nil {
return nil, fmt.Errorf("failed to get access token: %w", err)
}
projectID := strings.TrimSpace(account.GetCredential("project_id"))
if projectID == "" {
// AI Studio OAuth mode (no project_id): call generativelanguage API directly with Bearer token.
baseURL := account.GetCredential("base_url")
if strings.TrimSpace(baseURL) == "" {
baseURL = geminicli.AIStudioBaseURL
}
fullURL := fmt.Sprintf("%s/v1beta/models/%s:streamGenerateContent?alt=sse", strings.TrimRight(baseURL, "/"), modelID)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL, bytes.NewReader(payload))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+accessToken)
return req, nil
}
// Wrap payload in Code Assist format
var inner map[string]any
if err := json.Unmarshal(payload, &inner); err != nil {
return nil, err
}
wrapped := map[string]any{
"model": modelID,
"project": projectID,
"request": inner,
}
wrappedBytes, _ := json.Marshal(wrapped)
fullURL := fmt.Sprintf("%s/v1internal:streamGenerateContent?alt=sse", geminicli.GeminiCliBaseURL)
req, err := http.NewRequestWithContext(ctx, "POST", fullURL, bytes.NewReader(wrappedBytes))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+accessToken)
req.Header.Set("User-Agent", geminicli.GeminiCLIUserAgent)
return req, nil
}
// createGeminiTestPayload creates a minimal test payload for Gemini API
func createGeminiTestPayload() []byte {
payload := map[string]any{
"contents": []map[string]any{
{
"role": "user",
"parts": []map[string]any{
{"text": "hi"},
},
},
},
"systemInstruction": map[string]any{
"parts": []map[string]any{
{"text": "You are a helpful AI assistant."},
},
},
}
bytes, _ := json.Marshal(payload)
return bytes
}
// processGeminiStream processes SSE stream from Gemini API
func (s *AccountTestService) processGeminiStream(c *gin.Context, body io.Reader) error {
reader := bufio.NewReader(body)
for {
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
s.sendEvent(c, TestEvent{Type: "test_complete", Success: true})
return nil
}
return s.sendErrorAndEnd(c, fmt.Sprintf("Stream read error: %s", err.Error()))
}
line = strings.TrimSpace(line)
if line == "" || !strings.HasPrefix(line, "data: ") {
continue
}
jsonStr := strings.TrimPrefix(line, "data: ")
if jsonStr == "[DONE]" {
s.sendEvent(c, TestEvent{Type: "test_complete", Success: true})
return nil
}
var data map[string]any
if err := json.Unmarshal([]byte(jsonStr), &data); err != nil {
continue
}
// Support two Gemini response formats:
// - AI Studio: {"candidates": [...]}
// - Gemini CLI: {"response": {"candidates": [...]}}
if resp, ok := data["response"].(map[string]any); ok && resp != nil {
data = resp
}
if candidates, ok := data["candidates"].([]any); ok && len(candidates) > 0 {
if candidate, ok := candidates[0].(map[string]any); ok {
// Check for completion
if finishReason, ok := candidate["finishReason"].(string); ok && finishReason != "" {
s.sendEvent(c, TestEvent{Type: "test_complete", Success: true})
return nil
}
// Extract content
if content, ok := candidate["content"].(map[string]any); ok {
if parts, ok := content["parts"].([]any); ok {
for _, part := range parts {
if partMap, ok := part.(map[string]any); ok {
if text, ok := partMap["text"].(string); ok && text != "" {
s.sendEvent(c, TestEvent{Type: "content", Text: text})
}
}
}
}
}
}
}
// Handle errors
if errData, ok := data["error"].(map[string]any); ok {
errorMsg := "Unknown error"
if msg, ok := errData["message"].(string); ok {
errorMsg = msg
}
return s.sendErrorAndEnd(c, errorMsg)
}
}
}
// createOpenAITestPayload creates a test payload for OpenAI Responses API
func createOpenAITestPayload(modelID string, isOAuth bool) map[string]any {
payload := map[string]any{
......
......@@ -9,6 +9,7 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
......@@ -18,6 +19,7 @@ type CRSSyncService struct {
proxyRepo ProxyRepository
oauthService *OAuthService
openaiOAuthService *OpenAIOAuthService
geminiOAuthService *GeminiOAuthService
}
func NewCRSSyncService(
......@@ -25,12 +27,14 @@ func NewCRSSyncService(
proxyRepo ProxyRepository,
oauthService *OAuthService,
openaiOAuthService *OpenAIOAuthService,
geminiOAuthService *GeminiOAuthService,
) *CRSSyncService {
return &CRSSyncService{
accountRepo: accountRepo,
proxyRepo: proxyRepo,
oauthService: oauthService,
openaiOAuthService: openaiOAuthService,
geminiOAuthService: geminiOAuthService,
}
}
......@@ -75,6 +79,8 @@ type crsExportResponse struct {
ClaudeConsoleAccounts []crsConsoleAccount `json:"claudeConsoleAccounts"`
OpenAIOAuthAccounts []crsOpenAIOAuthAccount `json:"openaiOAuthAccounts"`
OpenAIResponsesAccounts []crsOpenAIResponsesAccount `json:"openaiResponsesAccounts"`
GeminiOAuthAccounts []crsGeminiOAuthAccount `json:"geminiOAuthAccounts"`
GeminiAPIKeyAccounts []crsGeminiAPIKeyAccount `json:"geminiApiKeyAccounts"`
} `json:"data"`
}
......@@ -147,6 +153,37 @@ type crsOpenAIOAuthAccount struct {
Extra map[string]any `json:"extra"`
}
type crsGeminiOAuthAccount struct {
Kind string `json:"kind"`
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Platform string `json:"platform"`
AuthType string `json:"authType"` // oauth
IsActive bool `json:"isActive"`
Schedulable bool `json:"schedulable"`
Priority int `json:"priority"`
Status string `json:"status"`
Proxy *crsProxy `json:"proxy"`
Credentials map[string]any `json:"credentials"`
Extra map[string]any `json:"extra"`
}
type crsGeminiAPIKeyAccount struct {
Kind string `json:"kind"`
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Platform string `json:"platform"`
IsActive bool `json:"isActive"`
Schedulable bool `json:"schedulable"`
Priority int `json:"priority"`
Status string `json:"status"`
Proxy *crsProxy `json:"proxy"`
Credentials map[string]any `json:"credentials"`
Extra map[string]any `json:"extra"`
}
func (s *CRSSyncService) SyncFromCRS(ctx context.Context, input SyncFromCRSInput) (*SyncFromCRSResult, error) {
baseURL, err := normalizeBaseURL(input.BaseURL)
if err != nil {
......@@ -174,7 +211,7 @@ func (s *CRSSyncService) SyncFromCRS(ctx context.Context, input SyncFromCRSInput
Items: make(
[]SyncFromCRSItemResult,
0,
len(exported.Data.ClaudeAccounts)+len(exported.Data.ClaudeConsoleAccounts)+len(exported.Data.OpenAIOAuthAccounts)+len(exported.Data.OpenAIResponsesAccounts),
len(exported.Data.ClaudeAccounts)+len(exported.Data.ClaudeConsoleAccounts)+len(exported.Data.OpenAIOAuthAccounts)+len(exported.Data.OpenAIResponsesAccounts)+len(exported.Data.GeminiOAuthAccounts)+len(exported.Data.GeminiAPIKeyAccounts),
),
}
......@@ -678,6 +715,225 @@ func (s *CRSSyncService) SyncFromCRS(ctx context.Context, input SyncFromCRSInput
result.Items = append(result.Items, item)
}
// Gemini OAuth -> sub2api gemini oauth
for _, src := range exported.Data.GeminiOAuthAccounts {
item := SyncFromCRSItemResult{
CRSAccountID: src.ID,
Kind: src.Kind,
Name: src.Name,
}
refreshToken, _ := src.Credentials["refresh_token"].(string)
if strings.TrimSpace(refreshToken) == "" {
item.Action = "failed"
item.Error = "missing refresh_token"
result.Failed++
result.Items = append(result.Items, item)
continue
}
proxyID, err := s.mapOrCreateProxy(ctx, input.SyncProxies, &proxies, src.Proxy, fmt.Sprintf("crs-%s", src.Name))
if err != nil {
item.Action = "failed"
item.Error = "proxy sync failed: " + err.Error()
result.Failed++
result.Items = append(result.Items, item)
continue
}
credentials := sanitizeCredentialsMap(src.Credentials)
if v, ok := credentials["token_type"].(string); !ok || strings.TrimSpace(v) == "" {
credentials["token_type"] = "Bearer"
}
// Convert expires_at from RFC3339 to Unix seconds string (recommended to keep consistent with GetCredential())
if expiresAtStr, ok := credentials["expires_at"].(string); ok && strings.TrimSpace(expiresAtStr) != "" {
if t, err := time.Parse(time.RFC3339, expiresAtStr); err == nil {
credentials["expires_at"] = strconv.FormatInt(t.Unix(), 10)
}
}
extra := make(map[string]any)
if src.Extra != nil {
for k, v := range src.Extra {
extra[k] = v
}
}
extra["crs_account_id"] = src.ID
extra["crs_kind"] = src.Kind
extra["crs_synced_at"] = now
existing, err := s.accountRepo.GetByCRSAccountID(ctx, src.ID)
if err != nil {
item.Action = "failed"
item.Error = "db lookup failed: " + err.Error()
result.Failed++
result.Items = append(result.Items, item)
continue
}
if existing == nil {
account := &Account{
Name: defaultName(src.Name, src.ID),
Platform: PlatformGemini,
Type: AccountTypeOAuth,
Credentials: credentials,
Extra: extra,
ProxyID: proxyID,
Concurrency: 3,
Priority: clampPriority(src.Priority),
Status: mapCRSStatus(src.IsActive, src.Status),
Schedulable: src.Schedulable,
}
if err := s.accountRepo.Create(ctx, account); err != nil {
item.Action = "failed"
item.Error = "create failed: " + err.Error()
result.Failed++
result.Items = append(result.Items, item)
continue
}
if refreshedCreds := s.refreshOAuthToken(ctx, account); refreshedCreds != nil {
account.Credentials = refreshedCreds
_ = s.accountRepo.Update(ctx, account)
}
item.Action = "created"
result.Created++
result.Items = append(result.Items, item)
continue
}
existing.Extra = mergeMap(existing.Extra, extra)
existing.Name = defaultName(src.Name, src.ID)
existing.Platform = PlatformGemini
existing.Type = AccountTypeOAuth
existing.Credentials = mergeMap(existing.Credentials, credentials)
if proxyID != nil {
existing.ProxyID = proxyID
}
existing.Concurrency = 3
existing.Priority = clampPriority(src.Priority)
existing.Status = mapCRSStatus(src.IsActive, src.Status)
existing.Schedulable = src.Schedulable
if err := s.accountRepo.Update(ctx, existing); err != nil {
item.Action = "failed"
item.Error = "update failed: " + err.Error()
result.Failed++
result.Items = append(result.Items, item)
continue
}
if refreshedCreds := s.refreshOAuthToken(ctx, existing); refreshedCreds != nil {
existing.Credentials = refreshedCreds
_ = s.accountRepo.Update(ctx, existing)
}
item.Action = "updated"
result.Updated++
result.Items = append(result.Items, item)
}
// Gemini API Key -> sub2api gemini apikey
for _, src := range exported.Data.GeminiAPIKeyAccounts {
item := SyncFromCRSItemResult{
CRSAccountID: src.ID,
Kind: src.Kind,
Name: src.Name,
}
apiKey, _ := src.Credentials["api_key"].(string)
if strings.TrimSpace(apiKey) == "" {
item.Action = "failed"
item.Error = "missing api_key"
result.Failed++
result.Items = append(result.Items, item)
continue
}
proxyID, err := s.mapOrCreateProxy(ctx, input.SyncProxies, &proxies, src.Proxy, fmt.Sprintf("crs-%s", src.Name))
if err != nil {
item.Action = "failed"
item.Error = "proxy sync failed: " + err.Error()
result.Failed++
result.Items = append(result.Items, item)
continue
}
credentials := sanitizeCredentialsMap(src.Credentials)
if baseURL, ok := credentials["base_url"].(string); !ok || strings.TrimSpace(baseURL) == "" {
credentials["base_url"] = "https://generativelanguage.googleapis.com"
}
extra := make(map[string]any)
if src.Extra != nil {
for k, v := range src.Extra {
extra[k] = v
}
}
extra["crs_account_id"] = src.ID
extra["crs_kind"] = src.Kind
extra["crs_synced_at"] = now
existing, err := s.accountRepo.GetByCRSAccountID(ctx, src.ID)
if err != nil {
item.Action = "failed"
item.Error = "db lookup failed: " + err.Error()
result.Failed++
result.Items = append(result.Items, item)
continue
}
if existing == nil {
account := &Account{
Name: defaultName(src.Name, src.ID),
Platform: PlatformGemini,
Type: AccountTypeApiKey,
Credentials: credentials,
Extra: extra,
ProxyID: proxyID,
Concurrency: 3,
Priority: clampPriority(src.Priority),
Status: mapCRSStatus(src.IsActive, src.Status),
Schedulable: src.Schedulable,
}
if err := s.accountRepo.Create(ctx, account); err != nil {
item.Action = "failed"
item.Error = "create failed: " + err.Error()
result.Failed++
result.Items = append(result.Items, item)
continue
}
item.Action = "created"
result.Created++
result.Items = append(result.Items, item)
continue
}
existing.Extra = mergeMap(existing.Extra, extra)
existing.Name = defaultName(src.Name, src.ID)
existing.Platform = PlatformGemini
existing.Type = AccountTypeApiKey
existing.Credentials = mergeMap(existing.Credentials, credentials)
if proxyID != nil {
existing.ProxyID = proxyID
}
existing.Concurrency = 3
existing.Priority = clampPriority(src.Priority)
existing.Status = mapCRSStatus(src.IsActive, src.Status)
existing.Schedulable = src.Schedulable
if err := s.accountRepo.Update(ctx, existing); err != nil {
item.Action = "failed"
item.Error = "update failed: " + err.Error()
result.Failed++
result.Items = append(result.Items, item)
continue
}
item.Action = "updated"
result.Updated++
result.Items = append(result.Items, item)
}
return result, nil
}
......@@ -944,6 +1200,21 @@ func (s *CRSSyncService) refreshOAuthToken(ctx context.Context, account *Account
}
}
}
case PlatformGemini:
if s.geminiOAuthService == nil {
return nil
}
tokenInfo, refreshErr := s.geminiOAuthService.RefreshAccountToken(ctx, account)
if refreshErr != nil {
err = refreshErr
} else {
newCredentials = s.geminiOAuthService.BuildAccountCredentials(tokenInfo)
for k, v := range account.Credentials {
if _, exists := newCredentials[k]; !exists {
newCredentials[k] = v
}
}
}
default:
return nil
}
......
......@@ -320,8 +320,17 @@ func (s *GatewayService) SelectAccountForModel(ctx context.Context, groupID *int
selected = acc
} else if acc.Priority == selected.Priority {
// 优先级相同时,选最久未用的
if acc.LastUsedAt == nil || (selected.LastUsedAt != nil && acc.LastUsedAt.Before(*selected.LastUsedAt)) {
switch {
case acc.LastUsedAt == nil && selected.LastUsedAt != nil:
selected = acc
case acc.LastUsedAt != nil && selected.LastUsedAt == nil:
// keep selected (never used is preferred)
case acc.LastUsedAt == nil && selected.LastUsedAt == nil:
// keep selected (both never used)
default:
if acc.LastUsedAt.Before(*selected.LastUsedAt) {
selected = acc
}
}
}
}
......
This diff is collapsed.
package service
import (
"context"
"github.com/Wei-Shaw/sub2api/internal/pkg/geminicli"
)
// GeminiOAuthClient performs Google OAuth token exchange/refresh for Gemini integration.
type GeminiOAuthClient interface {
ExchangeCode(ctx context.Context, oauthType, code, codeVerifier, redirectURI, proxyURL string) (*geminicli.TokenResponse, error)
RefreshToken(ctx context.Context, oauthType, refreshToken, proxyURL string) (*geminicli.TokenResponse, error)
}
This diff is collapsed.
package service
import (
"context"
"time"
)
// GeminiTokenCache stores short-lived access tokens and coordinates refresh to avoid stampedes.
type GeminiTokenCache interface {
// cacheKey should be stable for the token scope; for GeminiCli OAuth we primarily use project_id.
GetAccessToken(ctx context.Context, cacheKey string) (string, error)
SetAccessToken(ctx context.Context, cacheKey string, token string, ttl time.Duration) error
AcquireRefreshLock(ctx context.Context, cacheKey string, ttl time.Duration) (bool, error)
ReleaseRefreshLock(ctx context.Context, cacheKey string) error
}
package service
import (
"context"
"errors"
"log"
"strconv"
"strings"
"time"
)
const (
geminiTokenRefreshSkew = 3 * time.Minute
geminiTokenCacheSkew = 5 * time.Minute
)
type GeminiTokenProvider struct {
accountRepo AccountRepository
tokenCache GeminiTokenCache
geminiOAuthService *GeminiOAuthService
}
func NewGeminiTokenProvider(
accountRepo AccountRepository,
tokenCache GeminiTokenCache,
geminiOAuthService *GeminiOAuthService,
) *GeminiTokenProvider {
return &GeminiTokenProvider{
accountRepo: accountRepo,
tokenCache: tokenCache,
geminiOAuthService: geminiOAuthService,
}
}
func (p *GeminiTokenProvider) GetAccessToken(ctx context.Context, account *Account) (string, error) {
if account == nil {
return "", errors.New("account is nil")
}
if account.Platform != PlatformGemini || account.Type != AccountTypeOAuth {
return "", errors.New("not a gemini oauth account")
}
cacheKey := geminiTokenCacheKey(account)
// 1) Try cache first.
if p.tokenCache != nil {
if token, err := p.tokenCache.GetAccessToken(ctx, cacheKey); err == nil && strings.TrimSpace(token) != "" {
return token, nil
}
}
// 2) Refresh if needed (pre-expiry skew).
expiresAt := parseExpiresAt(account)
needsRefresh := expiresAt == nil || time.Until(*expiresAt) <= geminiTokenRefreshSkew
if needsRefresh && p.tokenCache != nil {
locked, err := p.tokenCache.AcquireRefreshLock(ctx, cacheKey, 30*time.Second)
if err == nil && locked {
defer func() { _ = p.tokenCache.ReleaseRefreshLock(ctx, cacheKey) }()
// Re-check after lock (another worker may have refreshed).
if token, err := p.tokenCache.GetAccessToken(ctx, cacheKey); err == nil && strings.TrimSpace(token) != "" {
return token, nil
}
fresh, err := p.accountRepo.GetByID(ctx, account.ID)
if err == nil && fresh != nil {
account = fresh
}
expiresAt = parseExpiresAt(account)
if expiresAt == nil || time.Until(*expiresAt) <= geminiTokenRefreshSkew {
if p.geminiOAuthService == nil {
return "", errors.New("gemini oauth service not configured")
}
tokenInfo, err := p.geminiOAuthService.RefreshAccountToken(ctx, account)
if err != nil {
return "", err
}
newCredentials := p.geminiOAuthService.BuildAccountCredentials(tokenInfo)
for k, v := range account.Credentials {
if _, exists := newCredentials[k]; !exists {
newCredentials[k] = v
}
}
account.Credentials = newCredentials
_ = p.accountRepo.Update(ctx, account)
expiresAt = parseExpiresAt(account)
}
}
}
accessToken := account.GetCredential("access_token")
if strings.TrimSpace(accessToken) == "" {
return "", errors.New("access_token not found in credentials")
}
// project_id is optional now:
// - If present: will use Code Assist API (requires project_id)
// - If absent: will use AI Studio API with OAuth token (like regular API key mode)
// Auto-detect project_id only if explicitly enabled via a credential flag
projectID := strings.TrimSpace(account.GetCredential("project_id"))
autoDetectProjectID := account.GetCredential("auto_detect_project_id") == "true"
if projectID == "" && autoDetectProjectID {
if p.geminiOAuthService == nil {
return accessToken, nil // Fallback to AI Studio API mode
}
var proxyURL string
if account.ProxyID != nil && p.geminiOAuthService.proxyRepo != nil {
if proxy, err := p.geminiOAuthService.proxyRepo.GetByID(ctx, *account.ProxyID); err == nil && proxy != nil {
proxyURL = proxy.URL()
}
}
detected, err := p.geminiOAuthService.fetchProjectID(ctx, accessToken, proxyURL)
if err != nil {
log.Printf("[GeminiTokenProvider] Auto-detect project_id failed: %v, fallback to AI Studio API mode", err)
return accessToken, nil
}
detected = strings.TrimSpace(detected)
if detected != "" {
if account.Credentials == nil {
account.Credentials = make(map[string]any)
}
account.Credentials["project_id"] = detected
_ = p.accountRepo.Update(ctx, account)
}
}
// 3) Populate cache with TTL.
if p.tokenCache != nil {
ttl := 30 * time.Minute
if expiresAt != nil {
until := time.Until(*expiresAt)
switch {
case until > geminiTokenCacheSkew:
ttl = until - geminiTokenCacheSkew
case until > 0:
ttl = until
default:
ttl = time.Minute
}
}
_ = p.tokenCache.SetAccessToken(ctx, cacheKey, accessToken, ttl)
}
return accessToken, nil
}
func geminiTokenCacheKey(account *Account) string {
projectID := strings.TrimSpace(account.GetCredential("project_id"))
if projectID != "" {
return projectID
}
return "account:" + strconv.FormatInt(account.ID, 10)
}
func parseExpiresAt(account *Account) *time.Time {
raw := strings.TrimSpace(account.GetCredential("expires_at"))
if raw == "" {
return nil
}
if unixSec, err := strconv.ParseInt(raw, 10, 64); err == nil && unixSec > 0 {
t := time.Unix(unixSec, 0)
return &t
}
if t, err := time.Parse(time.RFC3339, raw); err == nil {
return &t
}
return nil
}
package service
import (
"context"
"strconv"
"time"
)
type GeminiTokenRefresher struct {
geminiOAuthService *GeminiOAuthService
}
func NewGeminiTokenRefresher(geminiOAuthService *GeminiOAuthService) *GeminiTokenRefresher {
return &GeminiTokenRefresher{geminiOAuthService: geminiOAuthService}
}
func (r *GeminiTokenRefresher) CanRefresh(account *Account) bool {
return account.Platform == PlatformGemini && account.Type == AccountTypeOAuth
}
func (r *GeminiTokenRefresher) NeedsRefresh(account *Account, refreshWindow time.Duration) bool {
if !r.CanRefresh(account) {
return false
}
expiresAtStr := account.GetCredential("expires_at")
if expiresAtStr == "" {
return false
}
expiresAt, err := strconv.ParseInt(expiresAtStr, 10, 64)
if err != nil {
return false
}
expiryTime := time.Unix(expiresAt, 0)
return time.Until(expiryTime) < refreshWindow
}
func (r *GeminiTokenRefresher) Refresh(ctx context.Context, account *Account) (map[string]any, error) {
tokenInfo, err := r.geminiOAuthService.RefreshAccountToken(ctx, account)
if err != nil {
return nil, err
}
newCredentials := r.geminiOAuthService.BuildAccountCredentials(tokenInfo)
for k, v := range account.Credentials {
if _, exists := newCredentials[k]; !exists {
newCredentials[k] = v
}
}
return newCredentials, nil
}
package service
import (
"context"
"github.com/Wei-Shaw/sub2api/internal/pkg/geminicli"
)
// GeminiCliCodeAssistClient calls GeminiCli internal Code Assist endpoints.
type GeminiCliCodeAssistClient interface {
LoadCodeAssist(ctx context.Context, accessToken, proxyURL string, req *geminicli.LoadCodeAssistRequest) (*geminicli.LoadCodeAssistResponse, error)
OnboardUser(ctx context.Context, accessToken, proxyURL string, req *geminicli.OnboardUserRequest) (*geminicli.OnboardUserResponse, error)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment