# TrafficAPI AI 模型调用文档 ## 概述 TrafficAPI 是一个 AI API 网关平台,用于分发和管理 AI 产品订阅的 API 配额。它支持 Claude、OpenAI、Gemini 等多种 AI 模型,提供统一的 API 接口和智能调度功能。 ## 接入地址 | 项目 | 值 | |------|----| | Base URL | `https://trafficapi.fcluadecodex.xyz` | | Claude 端点 | `https://trafficapi.fcluadecodex.xyz/v1/messages` | | OpenAI 端点 | `https://trafficapi.fcluadecodex.xyz/v1/chat/completions` | | Gemini 端点 | `https://trafficapi.fcluadecodex.xyz/v1beta/models/{model}%3AgenerateContent` | > **注意**:Gemini 端点路径中的 `%3A` 是冒号 `:` 的 URL 编码,curl 及各语言 HTTP 客户端需使用编码后的形式。 ## 环境变量约定 文档中所有调用示例均通过以下环境变量引用凭据,请在运行前预先配置: ```bash export TRAFFICAPI_BASE_URL="https://trafficapi.fcluadecodex.xyz" export CLAUDE_API_KEY="your-claude-api-key" export OPENAI_API_KEY="your-openai-api-key" export GEMINI_API_KEY="your-gemini-api-key" ``` ## 目录 1. [TrafficAPI 基本原理](#TrafficAPI-基本原理) 2. [认证和 API Key 获取方式](#认证和-api-key-获取方式) 3. [Claude 模型调用](#claude-模型调用) - [HTTP API 调用](#http-api-调用) - [SDK 调用](#sdk-调用) 4. [OpenAI 模型调用](#openai-模型调用) - [HTTP API 调用](#http-api-调用-1) - [SDK 调用](#sdk-调用-1) 5. [Gemini 模型调用](#gemini-模型调用) - [HTTP API 调用](#http-api-调用-2) - [SDK 调用](#sdk-调用-2) 6. [流式响应处理](#流式响应处理) 7. [错误处理和重试机制](#错误处理和重试机制) 8. [性能优化建议](#性能优化建议) ## TrafficAPI 基本原理 ### 架构设计 TrafficAPI 作为 AI API 中转服务,其核心原理如下: 1. **统一网关层**:提供标准化的 API 端点,兼容 Claude、OpenAI、Gemini 等不同厂商的 API 协议 2. **智能调度**:根据账号状态、负载情况、成本等因素智能选择上游账号 3. **粘性会话**:支持会话级别的账号绑定,确保同一会话的请求路由到同一上游账号 4. **配额管理**:精确追踪 Token 使用量,实现按量计费 5. **并发控制**:用户级和账号级的并发限制,防止资源滥用 ### 核心功能 - **多账号管理**:支持多种上游账号类型(OAuth、API Key) - **API Key 分发**:为用户生成和管理 API Key - **精确计费**:Token 级别的用量追踪和成本计算 - **智能调度**:智能账号选择,支持粘性会话 - **并发控制**:用户级和账号级并发限制 - **速率限制**:可配置的请求和 Token 速率限制 ## 认证和 API Key 获取方式 ### 1. 管理员获取 API Key 1. 登录 TrafficAPI 管理后台 2. 进入「API Keys」管理页面 3. 点击「Create API Key」生成新的 API Key 4. 复制生成的 API Key(格式:`sk-xxxxxxxxxxxxxxxx`) ### 2. 用户获取 API Key 1. 用户注册并登录 TrafficAPI 平台 2. 在用户面板中查看或生成 API Key 3. 管理员可以为用户分配 API Key ### 3. API Key 使用方式 在 HTTP 请求头中添加: ```http Authorization: Bearer $YOUR_API_KEY ``` 或者使用 `x-api-key` 头: ```http x-api-key: $YOUR_API_KEY ``` ## HTTP 请求参数详解 ### 请求头参数 | 参数名 | 是否必填 | 说明 | 示例值 | |--------|----------|------|--------| | Authorization | 是 | Bearer Token 认证 | `Bearer sk-xxx` | | Content-Type | 是 | 请求体格式 | `application/json` | | Accept | 否 | 响应格式 | `application/json` 或 `text/event-stream` | | User-Agent | 否 | 客户端标识 | `MyApp/1.0` | | X-Request-ID | 否 | 请求唯一标识 | `req_123456` | ### 请求体参数(通用) | 参数名 | 类型 | 是否必填 | 说明 | 示例值 | |--------|------|----------|------|--------| | model | string | 是 | 模型名称 | `claude-3-sonnet-20241022` | | messages | array | 是 | 消息数组 | `[{"role": "user", "content": "Hello"}]` | | max_tokens | integer | 否 | 最大生成 token 数 | `1000` | | temperature | number | 否 | 温度参数(0.0-2.0) | `0.7` | | stream | boolean | 否 | 是否启用流式响应 | `true` | | top_p | number | 否 | 核采样参数(0.0-1.0) | `0.9` | | frequency_penalty | number | 否 | 频率惩罚(-2.0-2.0) | `0.0` | | presence_penalty | number | 否 | 存在惩罚(-2.0-2.0) | `0.0` | ### Claude 特有参数 | 参数名 | 类型 | 是否必填 | 说明 | 示例值 | |--------|------|----------|------|--------| | system | string | 否 | 系统提示 | `"You are a helpful assistant."` | | max_tokens | integer | 否 | 最大生成 token 数 | `4096` | ### OpenAI 特有参数 | 参数名 | 类型 | 是否必填 | 说明 | 示例值 | |--------|------|----------|------|--------| | n | integer | 否 | 生成多个选择 | `1` | | stop | string/array | 否 | 停止序列 | `["\n", "Human:"]` | | logprobs | integer | 否 | 返回 token 对数概率 | `null` | ### Gemini 特有参数 | 参数名 | 类型 | 是否必填 | 说明 | 示例值 | |--------|------|----------|------|--------| | safetySettings | array | 否 | 安全设置 | `[{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}]` | | generationConfig | object | 否 | 生成配置 | `{"temperature": 0.7, "maxOutputTokens": 1000}` | ## 响应参数详解 ### 成功响应 ```json { "id": "chatcmpl-123", "object": "chat.completion", "created": 1677652288, "model": "claude-3-sonnet-20241022", "choices": [ { "index": 0, "message": { "role": "assistant", "content": "Hello! How can I help you today?" }, "finish_reason": "stop" } ], "usage": { "prompt_tokens": 10, "completion_tokens": 15, "total_tokens": 25 } } ``` ### 响应字段说明 | 字段名 | 类型 | 说明 | |--------|------|------| | id | string | 请求唯一标识 | | object | string | 对象类型 | | created | integer | 创建时间戳 | | model | string | 使用的模型 | | choices | array | 生成结果数组 | | usage | object | Token 使用统计 | | choices[].index | integer | 选择索引 | | choices[].message | object | 消息内容 | | choices[].finish_reason | string | 完成原因 | | usage.prompt_tokens | integer | 提示 token 数 | | usage.completion_tokens | integer | 生成 token 数 | | usage.total_tokens | integer | 总 token 数 | ## Claude 模型调用 ### HTTP API 调用 #### 基础信息 - **端点**:`/v1/messages` - **协议**:兼容 Anthropic Claude API - **Content-Type**:`application/json` #### curl 示例 ```bash # 非流式调用 curl -X POST "https://trafficapi.fcluadecodex.xyz/v1/messages" \ -H "Authorization: Bearer $CLAUDE_API_KEY" \ -H "Content-Type: application/json" \ -d '{ "model": "claude-sonnet-4-6", "max_tokens": 1024, "messages": [ { "role": "user", "content": "Hello, how are you?" } ] }' # 流式调用 curl -X POST "https://trafficapi.fcluadecodex.xyz/v1/messages" \ -H "Authorization: Bearer $CLAUDE_API_KEY" \ -H "Content-Type: application/json" \ -H "anthropic-version: 2023-06-01" \ -d '{ "model": "claude-sonnet-4-6", "max_tokens": 1024, "messages": [ { "role": "user", "content": "Hello, how are you?" } ], "stream": true }' ``` #### Python 示例 ```python import requests import json class TrafficAPIClient: def __init__(self, base_url, api_key): self.base_url = base_url self.api_key = api_key self.headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "anthropic-version": "2023-06-01" } def create_message(self, model, messages, max_tokens=1024, stream=False): url = f"{self.base_url}/v1/messages" payload = { "model": model, "max_tokens": max_tokens, "messages": messages, "stream": stream } response = requests.post(url, headers=self.headers, json=payload, stream=stream) if stream: return self._handle_stream_response(response) else: return response.json() def _handle_stream_response(self, response): for line in response.iter_lines(): if line: line = line.decode('utf-8') if line.startswith('data: '): data = line[6:] if data == '[DONE]': break try: yield json.loads(data) except json.JSONDecodeError: continue # 使用示例 import os client = TrafficAPIClient( base_url=os.environ.get("TRAFFICAPI_BASE_URL", "https://trafficapi.fcluadecodex.xyz"), api_key=os.environ["CLAUDE_API_KEY"] ) # 非流式调用 response = client.create_message( model="claude-sonnet-4-6", messages=[ {"role": "user", "content": "Hello, how are you?"} ], max_tokens=1024, stream=False ) print(response) # 流式调用 for chunk in client.create_message( model="claude-sonnet-4-6", messages=[ {"role": "user", "content": "Hello, how are you?"} ], max_tokens=1024, stream=True ): print(chunk) ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 #### Go 示例 ```go package main import ( "bufio" "bytes" "encoding/json" "fmt" "io" "net/http" "os" "strings" ) type Message struct { Role string `json:"role"` Content string `json:"content"` } type ClaudeRequest struct { Model string `json:"model"` MaxTokens int `json:"max_tokens"` Messages []Message `json:"messages"` Stream bool `json:"stream"` } type ClaudeResponse struct { ID string `json:"id"` Type string `json:"type"` Role string `json:"role"` Content []struct { Type string `json:"type"` Text string `json:"text"` } `json:"content"` Model string `json:"model"` StopReason string `json:"stop_reason"` StopSequence string `json:"stop_sequence"` Usage struct { InputTokens int `json:"input_tokens"` OutputTokens int `json:"output_tokens"` } `json:"usage"` } type TrafficAPIClient struct { BaseURL string APIKey string } func NewTrafficAPIClient(baseURL, apiKey string) *TrafficAPIClient { return &TrafficAPIClient{ BaseURL: baseURL, APIKey: apiKey, } } func (c *TrafficAPIClient) CreateMessage(req ClaudeRequest) (*ClaudeResponse, error) { url := c.BaseURL + "/v1/messages" reqBody, err := json.Marshal(req) if err != nil { return nil, err } httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(reqBody)) if err != nil { return nil, err } httpReq.Header.Set("Authorization", "Bearer "+c.APIKey) httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("anthropic-version", "2023-06-01") client := &http.Client{} resp, err := client.Do(httpReq) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("API error: %s, body: %s", resp.Status, string(body)) } var claudeResp ClaudeResponse if err := json.NewDecoder(resp.Body).Decode(&claudeResp); err != nil { return nil, err } return &claudeResp, nil } func (c *TrafficAPIClient) CreateMessageStream(req ClaudeRequest) (<-chan string, error) { req.Stream = true url := c.BaseURL + "/v1/messages" reqBody, err := json.Marshal(req) if err != nil { return nil, err } httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(reqBody)) if err != nil { return nil, err } httpReq.Header.Set("Authorization", "Bearer "+c.APIKey) httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("anthropic-version", "2023-06-01") client := &http.Client{} resp, err := client.Do(httpReq) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { resp.Body.Close() body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("API error: %s, body: %s", resp.Status, string(body)) } ch := make(chan string) go func() { defer resp.Body.Close() defer close(ch) reader := bufio.NewReader(resp.Body) for { line, err := reader.ReadString('\n') if err != nil { if err == io.EOF { break } return } line = strings.TrimSpace(line) if strings.HasPrefix(line, "data: ") { data := line[6:] if data == "[DONE]" { break } ch <- data } } }() return ch, nil } // 使用示例 func main() { baseURL := os.Getenv("TRAFFICAPI_BASE_URL") if baseURL == "" { baseURL = "https://trafficapi.fcluadecodex.xyz" } client := NewTrafficAPIClient(baseURL, os.Getenv("CLAUDE_API_KEY")) // 非流式调用 req := ClaudeRequest{ Model: "claude-sonnet-4-6", MaxTokens: 1024, Messages: []Message{ {Role: "user", Content: "Hello, how are you?"}, }, Stream: false, } resp, err := client.CreateMessage(req) if err != nil { fmt.Printf("Error: %v\n", err) return } fmt.Printf("Response: %+v\n", resp) // 流式调用 req.Stream = true ch, err := client.CreateMessageStream(req) if err != nil { fmt.Printf("Error: %v\n", err) return } for chunk := range ch { fmt.Printf("Chunk: %s\n", chunk) } } ``` **Go SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址 - `http_client`: 可选,自定义 HTTP 客户端 **聊天完成参数:** - `Model`: 必填,模型名称 - `Messages`: 必填,消息数组 - `MaxTokens`: 可选,最大生成 token 数 - `Temperature`: 可选,温度参数 - `Stream`: 可选,是否启用流式响应 - `TopP`: 可选,核采样参数 - `FrequencyPenalty`: 可选,频率惩罚 - `PresencePenalty`: 可选,存在惩罚 ### SDK 调用 #### 使用官方 Anthropic SDK ```python import anthropic from anthropic import Anthropic # 配置 TrafficAPI 作为代理 import os client = Anthropic( api_key=os.environ["CLAUDE_API_KEY"], base_url=os.environ.get("TRAFFICAPI_BASE_URL", "https://trafficapi.fcluadecodex.xyz") # 无需 /v1 后缀,SDK 会自动添加 ) # 调用 Claude response = client.messages.create( model="claude-sonnet-4-6", max_tokens=1024, messages=[ {"role": "user", "content": "Hello, how are you?"} ] ) print(response.content[0].text) ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 #### 使用 LangChain ```python from langchain.chat_models import ChatAnthropic from langchain.schema import HumanMessage # 配置 LangChain import os chat = ChatAnthropic( anthropic_api_key=os.environ["CLAUDE_API_KEY"], anthropic_api_url=os.environ.get("TRAFFICAPI_BASE_URL", "https://trafficapi.fcluadecodex.xyz"), model="claude-sonnet-4-6" ) # 调用 messages = [HumanMessage(content="Hello, how are you?")] response = chat(messages) print(response.content) ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 ## OpenAI 模型调用 ### HTTP API 调用 #### 基础信息 - **端点**:`/v1/chat/completions` - **协议**:兼容 OpenAI Chat Completions API - **Content-Type**:`application/json` #### curl 示例 ```bash # 非流式调用 curl -X POST "https://trafficapi.fcluadecodex.xyz/v1/chat/completions" \ -H "Authorization: Bearer $OPENAI_API_KEY" \ -H "Content-Type: application/json" \ -d '{ "model": "gpt-5.2", "messages": [ { "role": "user", "content": "Hello, how are you?" } ], "max_tokens": 1024 }' # 流式调用 curl -X POST "https://trafficapi.fcluadecodex.xyz/v1/chat/completions" \ -H "Authorization: Bearer $OPENAI_API_KEY" \ -H "Content-Type: application/json" \ -d '{ "model": "gpt-5.2", "messages": [ { "role": "user", "content": "Hello, how are you?" } ], "max_tokens": 1024, "stream": true }' ``` #### Python 示例 ```python import requests import json class TrafficAPIOpenAIClient: def __init__(self, base_url, api_key): self.base_url = base_url self.api_key = api_key self.headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } def create_chat_completion(self, model, messages, max_tokens=1024, stream=False): url = f"{self.base_url}/v1/chat/completions" payload = { "model": model, "messages": messages, "max_tokens": max_tokens, "stream": stream } response = requests.post(url, headers=self.headers, json=payload, stream=stream) if stream: return self._handle_stream_response(response) else: return response.json() def _handle_stream_response(self, response): for line in response.iter_lines(): if line: line = line.decode('utf-8') if line.startswith('data: '): data = line[6:] if data == '[DONE]': break try: yield json.loads(data) except json.JSONDecodeError: continue # 使用示例 import os client = TrafficAPIOpenAIClient( base_url=os.environ.get("TRAFFICAPI_BASE_URL", "https://trafficapi.fcluadecodex.xyz"), api_key=os.environ["OPENAI_API_KEY"] ) # 非流式调用 response = client.create_chat_completion( model="gpt-5.2", messages=[ {"role": "user", "content": "Hello, how are you?"} ], max_tokens=1024, stream=False ) print(response["choices"][0]["message"]["content"]) # 流式调用 for chunk in client.create_chat_completion( model="gpt-5.2", messages=[ {"role": "user", "content": "Hello, how are you?"} ], max_tokens=1024, stream=True ): if "choices" in chunk and chunk["choices"]: delta = chunk["choices"][0].get("delta", {}) if "content" in delta: print(delta["content"], end="", flush=True) ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 #### Go 示例 ```go package main import ( "bufio" "bytes" "encoding/json" "fmt" "io" "net/http" "os" "strings" ) type OpenAIMessage struct { Role string `json:"role"` Content string `json:"content"` } type OpenAIRequest struct { Model string `json:"model"` Messages []OpenAIMessage `json:"messages"` MaxTokens int `json:"max_tokens"` Stream bool `json:"stream"` } type OpenAIResponse struct { ID string `json:"id"` Object string `json:"object"` Created int64 `json:"created"` Model string `json:"model"` Choices []struct { Index int `json:"index"` Message struct { Role string `json:"role"` Content string `json:"content"` } `json:"message"` FinishReason string `json:"finish_reason"` } `json:"choices"` Usage struct { PromptTokens int `json:"prompt_tokens"` CompletionTokens int `json:"completion_tokens"` TotalTokens int `json:"total_tokens"` } `json:"usage"` } type TrafficAPIOpenAIClient struct { BaseURL string APIKey string } func NewTrafficAPIOpenAIClient(baseURL, apiKey string) *TrafficAPIOpenAIClient { return &TrafficAPIOpenAIClient{ BaseURL: baseURL, APIKey: apiKey, } } func (c *TrafficAPIOpenAIClient) CreateChatCompletion(req OpenAIRequest) (*OpenAIResponse, error) { url := c.BaseURL + "/v1/chat/completions" reqBody, err := json.Marshal(req) if err != nil { return nil, err } httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(reqBody)) if err != nil { return nil, err } httpReq.Header.Set("Authorization", "Bearer "+c.APIKey) httpReq.Header.Set("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(httpReq) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("API error: %s, body: %s", resp.Status, string(body)) } var openaiResp OpenAIResponse if err := json.NewDecoder(resp.Body).Decode(&openaiResp); err != nil { return nil, err } return &openaiResp, nil } func (c *TrafficAPIOpenAIClient) CreateChatCompletionStream(req OpenAIRequest) (<-chan string, error) { req.Stream = true url := c.BaseURL + "/v1/chat/completions" reqBody, err := json.Marshal(req) if err != nil { return nil, err } httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(reqBody)) if err != nil { return nil, err } httpReq.Header.Set("Authorization", "Bearer "+c.APIKey) httpReq.Header.Set("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(httpReq) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { resp.Body.Close() body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("API error: %s, body: %s", resp.Status, string(body)) } ch := make(chan string) go func() { defer resp.Body.Close() defer close(ch) reader := bufio.NewReader(resp.Body) for { line, err := reader.ReadString('\n') if err != nil { if err == io.EOF { break } return } line = strings.TrimSpace(line) if strings.HasPrefix(line, "data: ") { data := line[6:] if data == "[DONE]" { break } ch <- data } } }() return ch, nil } // 使用示例 func main() { baseURL := os.Getenv("TRAFFICAPI_BASE_URL") if baseURL == "" { baseURL = "https://trafficapi.fcluadecodex.xyz" } client := NewTrafficAPIOpenAIClient(baseURL, os.Getenv("OPENAI_API_KEY")) // 非流式调用 req := OpenAIRequest{ Model: "gpt-5.2", Messages: []OpenAIMessage{ {Role: "user", Content: "Hello, how are you?"}, }, MaxTokens: 1024, Stream: false, } resp, err := client.CreateChatCompletion(req) if err != nil { fmt.Printf("Error: %v\n", err) return } fmt.Printf("Response: %+v\n", resp) // 流式调用 req.Stream = true ch, err := client.CreateChatCompletionStream(req) if err != nil { fmt.Printf("Error: %v\n", err) return } for chunk := range ch { fmt.Printf("Chunk: %s\n", chunk) } } ``` **Go SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址 - `http_client`: 可选,自定义 HTTP 客户端 **聊天完成参数:** - `Model`: 必填,模型名称 - `Messages`: 必填,消息数组 - `MaxTokens`: 可选,最大生成 token 数 - `Temperature`: 可选,温度参数 - `Stream`: 可选,是否启用流式响应 - `TopP`: 可选,核采样参数 - `FrequencyPenalty`: 可选,频率惩罚 - `PresencePenalty`: 可选,存在惩罚 ### SDK 调用 #### 使用官方 OpenAI SDK ```python import openai # 配置 TrafficAPI 作为代理 import os client = openai.OpenAI( api_key=os.environ["OPENAI_API_KEY"], base_url=os.environ.get("TRAFFICAPI_BASE_URL", "https://trafficapi.fcluadecodex.xyz") + "/v1" ) # 调用 GPT response = client.chat.completions.create( model="gpt-5.2", messages=[ {"role": "user", "content": "Hello, how are you?"} ], max_tokens=1024 ) print(response.choices[0].message.content) ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 #### 使用 LangChain ```python from langchain.chat_models import ChatOpenAI from langchain.schema import HumanMessage # 配置 LangChain import os chat = ChatOpenAI( openai_api_key=os.environ["OPENAI_API_KEY"], openai_api_base=os.environ.get("TRAFFICAPI_BASE_URL", "https://trafficapi.fcluadecodex.xyz") + "/v1", model_name="gpt-5.2" ) # 调用 messages = [HumanMessage(content="Hello, how are you?")] response = chat(messages) print(response.content) ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 ## Gemini 模型调用 ### HTTP API 调用 #### 基础信息 - **端点**:`/v1beta/models/{model}:generateContent` - **协议**:兼容 Google Gemini API - **Content-Type**:`application/json` #### curl 示例 ```bash # 非流式调用 curl -X POST "https://trafficapi.fcluadecodex.xyz/v1beta/models/gemini-3.1-flash-lite-preview%3AgenerateContent" \ -H "Authorization: Bearer $GEMINI_API_KEY" \ -H "Content-Type: application/json" \ -d '{ "contents": [ { "parts": [ { "text": "Hello, how are you?" } ] } ] }' # 流式调用(使用 streamGenerateContent 端点,注意 %3A 是 : 的 URL 编码) curl -X POST "https://trafficapi.fcluadecodex.xyz/v1beta/models/gemini-3.1-flash-lite-preview%3AstreamGenerateContent" \ -H "Authorization: Bearer $GEMINI_API_KEY" \ -H "Content-Type: application/json" \ -d '{ "contents": [ { "parts": [ { "text": "Hello, how are you?" } ] } ] }' ``` #### Python 示例 ```python import requests import json class TrafficAPIGeminiClient: def __init__(self, base_url, api_key): self.base_url = base_url self.api_key = api_key self.headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } def generate_content(self, model, contents): # 注意:路径中的 : 需要 URL 编码为 %3A url = f"{self.base_url}/v1beta/models/{model}%3AgenerateContent" payload = {"contents": contents} response = requests.post(url, headers=self.headers, json=payload) return response.json() def generate_content_stream(self, model, contents): # 注意:路径中的 : 需要 URL 编码为 %3A url = f"{self.base_url}/v1beta/models/{model}%3AstreamGenerateContent" payload = {"contents": contents} response = requests.post(url, headers=self.headers, json=payload, stream=True) return self._handle_stream_response(response) def _handle_stream_response(self, response): # streamGenerateContent 返回 SSE 格式,每行以 "data: " 开头 for line in response.iter_lines(): if line: line = line.decode('utf-8') if line.startswith('data: '): data = line[6:] if data == '[DONE]': break try: yield json.loads(data) except json.JSONDecodeError: continue # 使用示例 import os client = TrafficAPIGeminiClient( base_url=os.environ.get("TRAFFICAPI_BASE_URL", "https://trafficapi.fcluadecodex.xyz"), api_key=os.environ["GEMINI_API_KEY"] ) # 非流式调用 response = client.generate_content( model="gemini-3.1-flash-lite-preview", contents=[ { "parts": [ {"text": "Hello, how are you?"} ] } ], stream=False ) print(response["candidates"][0]["content"]["parts"][0]["text"]) # 流式调用(使用 streamGenerateContent 端点) for chunk in client.generate_content_stream( model="gemini-3.1-flash-lite-preview", contents=[ { "parts": [ {"text": "Hello, how are you?"} ] } ] ): if "candidates" in chunk and chunk["candidates"]: for candidate in chunk["candidates"]: if "content" in candidate and "parts" in candidate["content"]: for part in candidate["content"]["parts"]: if "text" in part: print(part["text"], end="", flush=True) ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 #### Go 示例 ```go package main import ( "bufio" "bytes" "encoding/json" "fmt" "io" "net/http" "os" "strings" ) type GeminiPart struct { Text string `json:"text"` } type GeminiContent struct { Parts []GeminiPart `json:"parts"` } type GeminiRequest struct { Contents []GeminiContent `json:"contents"` GenerationConfig *struct { CandidateCount int `json:"candidateCount"` } `json:"generationConfig,omitempty"` } type GeminiResponse struct { Candidates []struct { Content struct { Parts []GeminiPart `json:"parts"` } `json:"content"` FinishReason string `json:"finishReason"` } `json:"candidates"` UsageMetadata *struct { PromptTokenCount int `json:"promptTokenCount"` CandidatesTokenCount int `json:"candidatesTokenCount"` TotalTokenCount int `json:"totalTokenCount"` } `json:"usageMetadata"` } type TrafficAPIGeminiClient struct { BaseURL string APIKey string } func NewTrafficAPIGeminiClient(baseURL, apiKey string) *TrafficAPIGeminiClient { return &TrafficAPIGeminiClient{ BaseURL: baseURL, APIKey: apiKey, } } func (c *TrafficAPIGeminiClient) GenerateContent(model string, req GeminiRequest) (*GeminiResponse, error) { // 注意:路径中的 : 需要 URL 编码为 %3A(%%3A 在 Sprintf 中输出字面量 %3A) url := fmt.Sprintf("%s/v1beta/models/%s%%3AgenerateContent", c.BaseURL, model) reqBody, err := json.Marshal(req) if err != nil { return nil, err } httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(reqBody)) if err != nil { return nil, err } httpReq.Header.Set("Authorization", "Bearer "+c.APIKey) httpReq.Header.Set("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(httpReq) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("API error: %s, body: %s", resp.Status, string(body)) } var geminiResp GeminiResponse if err := json.NewDecoder(resp.Body).Decode(&geminiResp); err != nil { return nil, err } return &geminiResp, nil } func (c *TrafficAPIGeminiClient) GenerateContentStream(model string, req GeminiRequest) (<-chan string, error) { url := fmt.Sprintf("%s/v1beta/models/%s%%3AstreamGenerateContent", c.BaseURL, model) reqBody, err := json.Marshal(req) if err != nil { return nil, err } httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(reqBody)) if err != nil { return nil, err } httpReq.Header.Set("Authorization", "Bearer "+c.APIKey) httpReq.Header.Set("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(httpReq) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { resp.Body.Close() body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("API error: %s, body: %s", resp.Status, string(body)) } ch := make(chan string) go func() { defer resp.Body.Close() defer close(ch) // streamGenerateContent 返回 SSE 格式,每行以 "data: " 开头 scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { line := scanner.Text() if strings.HasPrefix(line, "data: ") { data := line[6:] if data == "[DONE]" { break } if data != "" { ch <- data } } } }() return ch, nil } // 使用示例 func main() { baseURL := os.Getenv("TRAFFICAPI_BASE_URL") if baseURL == "" { baseURL = "https://trafficapi.fcluadecodex.xyz" } client := NewTrafficAPIGeminiClient(baseURL, os.Getenv("GEMINI_API_KEY")) // 非流式调用 req := GeminiRequest{ Contents: []GeminiContent{ { Parts: []GeminiPart{ {Text: "Hello, how are you?"}, }, }, }, } resp, err := client.GenerateContent("gemini-3.1-flash-lite-preview", req) if err != nil { fmt.Printf("Error: %v\n", err) return } fmt.Printf("Response: %+v\n", resp) // 流式调用 ch, err := client.GenerateContentStream("gemini-3.1-flash-lite-preview", req) if err != nil { fmt.Printf("Error: %v\n", err) return } for chunk := range ch { fmt.Printf("Chunk: %s\n", chunk) } } ``` **Go SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址 - `http_client`: 可选,自定义 HTTP 客户端 **聊天完成参数:** - `Model`: 必填,模型名称 - `Messages`: 必填,消息数组 - `MaxTokens`: 可选,最大生成 token 数 - `Temperature`: 可选,温度参数 - `Stream`: 可选,是否启用流式响应 - `TopP`: 可选,核采样参数 - `FrequencyPenalty`: 可选,频率惩罚 - `PresencePenalty`: 可选,存在惩罚 ### SDK 调用 #### 使用官方 Google Generative AI SDK ```python import google.generativeai as genai # 配置 TrafficAPI 作为代理 import os _base = os.environ.get("TRAFFICAPI_BASE_URL", "https://trafficapi.fcluadecodex.xyz") genai.configure( api_key=os.environ["GEMINI_API_KEY"], transport="rest", client_options={ "api_endpoint": f"{_base}/v1beta" } ) # 创建模型 model = genai.GenerativeModel("gemini-3.1-flash-lite-preview") # 调用 response = model.generate_content("Hello, how are you?") print(response.text) # 流式调用 response = model.generate_content("Hello, how are you?", stream=True) for chunk in response: print(chunk.text, end="", flush=True) ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 #### 使用 LangChain ```python from langchain_google_genai import ChatGoogleGenerativeAI from langchain.schema import HumanMessage # 配置 LangChain import os chat = ChatGoogleGenerativeAI( google_api_key=os.environ["GEMINI_API_KEY"], google_api_base=os.environ.get("TRAFFICAPI_BASE_URL", "https://trafficapi.fcluadecodex.xyz") + "/v1beta", model="gemini-3.1-flash-lite-preview" ) # 调用 messages = [HumanMessage(content="Hello, how are you?")] response = chat(messages) print(response.content) ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 ## 流式响应处理 ### 通用流式响应处理模式 TrafficAPI 支持所有模型的流式响应,处理模式类似但略有不同: #### Claude 流式响应格式 ``` event: message_start data: {"type":"message_start","message":{"model":"claude-sonnet-4-6","id":"msg_xxx","type":"message","role":"assistant","content":[],...}} event: content_block_start data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" there"}} event: message_stop data: {"type":"message_stop"} ``` #### OpenAI 流式响应格式 ``` data: {"id": "chatcmpl-123", "object": "chat.completion.chunk", "created": 1677652288, "model": "gpt-5.2", "choices": [{"index": 0, "delta": {"content": "Hello"}, "finish_reason": null}]} data: {"id": "chatcmpl-123", "object": "chat.completion.chunk", "created": 1677652288, "model": "gpt-5.2", "choices": [{"index": 0, "delta": {"content": " there"}, "finish_reason": null}]} data: [DONE] ``` #### Gemini 流式响应格式(streamGenerateContent 端点,SSE 格式) ``` data: {"candidates":[{"content":{"role":"model","parts":[{"text":"Hello"}]},"finishReason":"","index":0}],"usageMetadata":{...},"modelVersion":"gemini-3.1-flash-lite-preview"} data: {"candidates":[{"content":{"role":"model","parts":[{"text":" there"}]},"finishReason":"STOP","index":0}],"usageMetadata":{...}} ``` ### 流式处理最佳实践 1. **缓冲区管理**:适当缓冲流式数据,避免频繁的 UI 更新 2. **错误处理**:流式响应中可能包含错误信息,需要实时检测 3. **连接保持**:确保网络连接稳定,必要时实现重连机制 4. **资源清理**:及时关闭流式连接,释放资源 ### Python 通用流式处理函数 ```python def handle_stream_response(response, callback): """ 通用流式响应处理函数 Args: response: requests.Response 对象 callback: 处理每个数据块的函数,接收解析后的 JSON 数据 """ buffer = "" for line in response.iter_lines(): if line: line = line.decode('utf-8') # 处理 SSE 格式(Claude/OpenAI) if line.startswith('data: '): data = line[6:] if data == '[DONE]': break try: json_data = json.loads(data) callback(json_data) except json.JSONDecodeError: # 可能是 Gemini 的纯 JSON 流 buffer += data try: json_data = json.loads(buffer) callback(json_data) buffer = "" except json.JSONDecodeError: continue else: # 处理 Gemini 的纯 JSON 流 buffer += line try: json_data = json.loads(buffer) callback(json_data) buffer = "" except json.JSONDecodeError: continue ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 ## 错误处理和重试机制 ### 常见错误码 | 状态码 | 错误类型 | 描述 | 处理建议 | |--------|----------|------|----------| | 400 | Bad Request | 请求参数错误 | 检查请求格式和参数 | | 401 | Unauthorized | API Key 无效或过期 | 检查 API Key 有效性 | | 403 | Forbidden | 权限不足 | 检查用户权限和配额 | | 429 | Too Many Requests | 请求频率超限 | 降低请求频率,实现退避重试 | | 500 | Internal Server Error | 服务器内部错误 | 等待后重试,联系管理员 | | 502 | Bad Gateway | 上游服务不可用 | 等待后重试 | | 503 | Service Unavailable | 服务暂时不可用 | 等待后重试 | ### 重试策略实现 #### Python 重试装饰器 ```python import time import random from functools import wraps def retry_with_exponential_backoff( max_retries=5, initial_delay=1, exponential_base=2, jitter=True, retry_status_codes=[429, 500, 502, 503, 504] ): """指数退避重试装饰器""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): delay = initial_delay for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: # 检查是否需要重试 should_retry = False if hasattr(e, 'status_code'): should_retry = e.status_code in retry_status_codes elif hasattr(e, 'response') and hasattr(e.response, 'status_code'): should_retry = e.response.status_code in retry_status_codes if not should_retry or attempt == max_retries - 1: raise # 计算延迟时间 delay *= exponential_base ** attempt if jitter: delay = random.uniform(0, delay) print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f} seconds...") time.sleep(delay) return func(*args, **kwargs) return wrapper return decorator # 使用示例 @retry_with_exponential_backoff(max_retries=3) def call_api_with_retry(): response = requests.post(...) response.raise_for_status() return response.json() ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 #### Go 重试实现 ```go package main import ( "fmt" "math/rand" "time" ) type RetryConfig struct { MaxRetries int InitialDelay time.Duration ExponentialBase float64 Jitter bool RetryStatusCodes []int } func NewDefaultRetryConfig() *RetryConfig { return &RetryConfig{ MaxRetries: 5, InitialDelay: time.Second, ExponentialBase: 2.0, Jitter: true, RetryStatusCodes: []int{429, 500, 502, 503, 504}, } } func RetryWithExponentialBackoff(config *RetryConfig, fn func() error) error { var lastErr error delay := config.InitialDelay for attempt := 0; attempt < config.MaxRetries; attempt++ { err := fn() if err == nil { return nil } lastErr = err // 检查是否需要重试 shouldRetry := false if apiErr, ok := err.(*APIError); ok { for _, code := range config.RetryStatusCodes { if apiErr.StatusCode == code { shouldRetry = true break } } } if !shouldRetry || attempt == config.MaxRetries-1 { break } // 计算延迟时间 delay = time.Duration(float64(delay) * config.ExponentialBase) if config.Jitter { delay = time.Duration(float64(delay) * (0.5 + rand.Float64())) } fmt.Printf("Attempt %d failed, retrying in %v...\n", attempt+1, delay) time.Sleep(delay) } return lastErr } // 使用示例 func callAPIWithRetry() error { config := NewDefaultRetryConfig() return RetryWithExponentialBackoff(config, func() error { resp, err := http.Post(...) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return &APIError{ StatusCode: resp.StatusCode, Message: "API request failed", } } return nil }) } ``` **Go SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址 - `http_client`: 可选,自定义 HTTP 客户端 **聊天完成参数:** - `Model`: 必填,模型名称 - `Messages`: 必填,消息数组 - `MaxTokens`: 可选,最大生成 token 数 - `Temperature`: 可选,温度参数 - `Stream`: 可选,是否启用流式响应 - `TopP`: 可选,核采样参数 - `FrequencyPenalty`: 可选,频率惩罚 - `PresencePenalty`: 可选,存在惩罚 ### 错误监控和告警 1. **错误率监控**:监控 API 调用错误率,设置阈值告警 2. **延迟监控**:监控 API 响应时间,识别性能问题 3. **配额监控**:监控用户配额使用情况,提前预警 4. **健康检查**:定期进行健康检查,确保服务可用性 ## 性能优化建议 ### 1. 连接池管理 #### Python 连接池配置 ```python import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry # 创建会话并配置连接池 session = requests.Session() # 配置重试策略 retry_strategy = Retry( total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["POST", "GET"] ) # 配置适配器 adapter = HTTPAdapter( max_retries=retry_strategy, pool_connections=10, # 连接池大小 pool_maxsize=10, # 最大连接数 pool_block=False # 是否阻塞等待连接 ) session.mount("http://", adapter) session.mount("https://", adapter) # 使用会话调用 API response = session.post( "https://trafficapi.fcluadecodex.xyz/v1/chat/completions", headers=headers, json=payload, timeout=30 ) ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 #### Go 连接池配置 ```go package main import ( "net/http" "time" ) func createHTTPClient() *http.Client { transport := &http.Transport{ MaxIdleConns: 100, // 最大空闲连接数 MaxIdleConnsPerHost: 10, // 每个主机的最大空闲连接数 MaxConnsPerHost: 10, // 每个主机的最大连接数 IdleConnTimeout: 90 * time.Second, // 空闲连接超时时间 TLSHandshakeTimeout: 10 * time.Second, // TLS 握手超时时间 } return &http.Client{ Transport: transport, Timeout: 30 * time.Second, // 请求超时时间 } } // 使用共享的 HTTP 客户端 var httpClient = createHTTPClient() func callAPI() (*http.Response, error) { req, err := http.NewRequest("POST", "https://trafficapi.fcluadecodex.xyz/v1/chat/completions", nil) if err != nil { return nil, err } return httpClient.Do(req) } ``` **Go SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址 - `http_client`: 可选,自定义 HTTP 客户端 **聊天完成参数:** - `Model`: 必填,模型名称 - `Messages`: 必填,消息数组 - `MaxTokens`: 可选,最大生成 token 数 - `Temperature`: 可选,温度参数 - `Stream`: 可选,是否启用流式响应 - `TopP`: 可选,核采样参数 - `FrequencyPenalty`: 可选,频率惩罚 - `PresencePenalty`: 可选,存在惩罚 ### 2. 请求批量化 对于多个独立的请求,可以考虑批量处理: ```python from concurrent.futures import ThreadPoolExecutor, as_completed def batch_process_requests(requests_data, max_workers=5): """批量处理请求""" results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交所有任务 future_to_request = { executor.submit(process_single_request, data): data for data in requests_data } # 收集结果 for future in as_completed(future_to_request): try: result = future.result() results.append(result) except Exception as e: print(f"Request failed: {e}") results.append(None) return results ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 ### 3. 缓存策略 对于重复的查询,实现缓存机制: ```python import hashlib import pickle from datetime import datetime, timedelta class APICache: def __init__(self, ttl_seconds=300): self.cache = {} self.ttl = ttl_seconds def get_cache_key(self, endpoint, params): """生成缓存键""" key_data = f"{endpoint}:{json.dumps(params, sort_keys=True)}" return hashlib.md5(key_data.encode()).hexdigest() def get(self, endpoint, params): """获取缓存""" key = self.get_cache_key(endpoint, params) if key in self.cache: cached_data, timestamp = self.cache[key] if datetime.now() - timestamp < timedelta(seconds=self.ttl): return cached_data return None def set(self, endpoint, params, data): """设置缓存""" key = self.get_cache_key(endpoint, params) self.cache[key] = (data, datetime.now()) def clear(self): """清空缓存""" self.cache.clear() # 使用缓存 cache = APICache(ttl_seconds=600) def cached_api_call(endpoint, params): # 检查缓存 cached_result = cache.get(endpoint, params) if cached_result: return cached_result # 调用 API result = call_api(endpoint, params) # 缓存结果 cache.set(endpoint, params, result) return result ``` **Python SDK 参数说明:** **初始化参数:** - `api_key`: 必填,TrafficAPI 的 API Key - `base_url`: 可选,TrafficAPI 服务地址,默认为官方地址 - `timeout`: 可选,请求超时时间(秒) - `max_retries`: 可选,最大重试次数 **聊天完成参数:** - `model`: 必填,模型名称 - `messages`: 必填,消息列表,格式为 `[{"role": "user", "content": "..."}]` - `max_tokens`: 可选,最大生成 token 数 - `temperature`: 可选,温度参数(0.0-2.0) - `stream`: 可选,是否启用流式响应 - `top_p`: 可选,核采样参数 - `frequency_penalty`: 可选,频率惩罚 - `presence_penalty`: 可选,存在惩罚 ### 4. 安全最佳实践 1. **API Key 管理**: - 定期轮换 API Key - 使用环境变量存储敏感信息 - 实现 Key 的访问控制 2. **请求验证**: - 验证输入参数 - 限制请求大小 - 实现速率限制 3. **日志和审计**: - 记录所有 API 调用 - 实现操作审计 - 监控异常行为 ## 注意事项 ### 1. 服务条款合规性 使用 TrafficAPI 调用 AI 模型服务时,请注意: - 遵守上游服务提供商(Anthropic、OpenAI、Google)的服务条款 - 了解并遵守相关法律法规 - 合理使用 API 资源,避免滥用 ### 2. 安全性考虑 1. **API Key 保护**: - 不要在客户端代码中硬编码 API Key - 使用环境变量或密钥管理服务 - 定期轮换 API Key 2. **请求安全**: - 验证用户输入,防止注入攻击 - 实现请求速率限制 - 监控异常请求模式 3. **数据隐私**: - 避免传输敏感个人信息 - 了解数据在传输和存储中的加密情况 - 遵守数据保护法规(如 GDPR、CCPA) ### 3. 成本控制 1. **用量监控**: - 实时监控 Token 使用量 - 设置用量告警阈值 - 定期分析成本趋势 2. **优化策略**: - 使用缓存减少重复请求 - 优化提示词减少 Token 消耗 - 考虑使用成本更低的模型 3. **预算管理**: - 设置月度预算限制 - 实现自动化的成本控制 - 定期审查成本效益