Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
陈曦
sub2api
Commits
68f67198
Commit
68f67198
authored
Apr 10, 2026
by
陈曦
Browse files
解决前端在生产环境构建失败的问题
parent
058c3bd8
Pipeline
#82019
passed with stage
in 3 minutes and 54 seconds
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
frontend/src/assets/trafficapi_ai_call_documentation.md
0 → 100644
View file @
68f67198
# TrafficAPI AI 模型调用文档
## 概述
TrafficAPI 是一个 AI API 网关平台,用于分发和管理 AI 产品订阅的 API 配额。它支持 Claude、OpenAI、Gemini 等多种 AI 模型,提供统一的 API 接口和智能调度功能。
## 接入地址
| 项目 | 值 |
|------|----|
| Base URL |
`https://trafficapi.fcluadecodex.xyz`
|
| Claude 端点 |
`https://trafficapi.fcluadecodex.xyz/v1/messages`
|
| OpenAI 端点 |
`https://trafficapi.fcluadecodex.xyz/v1/chat/completions`
|
| Gemini 端点 |
`https://trafficapi.fcluadecodex.xyz/v1beta/models/{model}%3AgenerateContent`
|
> **注意**:Gemini 端点路径中的 `%3A` 是冒号 `:` 的 URL 编码,curl 及各语言 HTTP 客户端需使用编码后的形式。
## 环境变量约定
文档中所有调用示例均通过以下环境变量引用凭据,请在运行前预先配置:
```
bash
export
TRAFFICAPI_BASE_URL
=
"https://trafficapi.fcluadecodex.xyz"
export
CLAUDE_API_KEY
=
"your-claude-api-key"
export
OPENAI_API_KEY
=
"your-openai-api-key"
export
GEMINI_API_KEY
=
"your-gemini-api-key"
```
## 目录
1.
[
TrafficAPI 基本原理
](
#TrafficAPI-基本原理
)
2.
[
认证和 API Key 获取方式
](
#认证和-api-key-获取方式
)
3.
[
Claude 模型调用
](
#claude-模型调用
)
-
[
HTTP API 调用
](
#http-api-调用
)
-
[
SDK 调用
](
#sdk-调用
)
4.
[
OpenAI 模型调用
](
#openai-模型调用
)
-
[
HTTP API 调用
](
#http-api-调用-1
)
-
[
SDK 调用
](
#sdk-调用-1
)
5.
[
Gemini 模型调用
](
#gemini-模型调用
)
-
[
HTTP API 调用
](
#http-api-调用-2
)
-
[
SDK 调用
](
#sdk-调用-2
)
6.
[
流式响应处理
](
#流式响应处理
)
7.
[
错误处理和重试机制
](
#错误处理和重试机制
)
8.
[
性能优化建议
](
#性能优化建议
)
## TrafficAPI 基本原理
### 架构设计
TrafficAPI 作为 AI API 中转服务,其核心原理如下:
1.
**统一网关层**
:提供标准化的 API 端点,兼容 Claude、OpenAI、Gemini 等不同厂商的 API 协议
2.
**智能调度**
:根据账号状态、负载情况、成本等因素智能选择上游账号
3.
**粘性会话**
:支持会话级别的账号绑定,确保同一会话的请求路由到同一上游账号
4.
**配额管理**
:精确追踪 Token 使用量,实现按量计费
5.
**并发控制**
:用户级和账号级的并发限制,防止资源滥用
### 核心功能
-
**多账号管理**
:支持多种上游账号类型(OAuth、API Key)
-
**API Key 分发**
:为用户生成和管理 API Key
-
**精确计费**
:Token 级别的用量追踪和成本计算
-
**智能调度**
:智能账号选择,支持粘性会话
-
**并发控制**
:用户级和账号级并发限制
-
**速率限制**
:可配置的请求和 Token 速率限制
## 认证和 API Key 获取方式
### 1. 管理员获取 API Key
1.
登录 TrafficAPI 管理后台
2.
进入「API Keys」管理页面
3.
点击「Create API Key」生成新的 API Key
4.
复制生成的 API Key(格式:
`sk-xxxxxxxxxxxxxxxx`
)
### 2. 用户获取 API Key
1.
用户注册并登录 TrafficAPI 平台
2.
在用户面板中查看或生成 API Key
3.
管理员可以为用户分配 API Key
### 3. API Key 使用方式
在 HTTP 请求头中添加:
```
http
Authorization: Bearer $YOUR_API_KEY
```
或者使用
`x-api-key`
头:
```
http
x-api-key: $YOUR_API_KEY
```
## HTTP 请求参数详解
### 请求头参数
| 参数名 | 是否必填 | 说明 | 示例值 |
|--------|----------|------|--------|
| Authorization | 是 | Bearer Token 认证 |
`Bearer sk-xxx`
|
| Content-Type | 是 | 请求体格式 |
`application/json`
|
| Accept | 否 | 响应格式 |
`application/json`
或
`text/event-stream`
|
| User-Agent | 否 | 客户端标识 |
`MyApp/1.0`
|
| X-Request-ID | 否 | 请求唯一标识 |
`req_123456`
|
### 请求体参数(通用)
| 参数名 | 类型 | 是否必填 | 说明 | 示例值 |
|--------|------|----------|------|--------|
| model | string | 是 | 模型名称 |
`claude-3-sonnet-20241022`
|
| messages | array | 是 | 消息数组 |
`[{"role": "user", "content": "Hello"}]`
|
| max_tokens | integer | 否 | 最大生成 token 数 |
`1000`
|
| temperature | number | 否 | 温度参数(0.0-2.0) |
`0.7`
|
| stream | boolean | 否 | 是否启用流式响应 |
`true`
|
| top_p | number | 否 | 核采样参数(0.0-1.0) |
`0.9`
|
| frequency_penalty | number | 否 | 频率惩罚(-2.0-2.0) |
`0.0`
|
| presence_penalty | number | 否 | 存在惩罚(-2.0-2.0) |
`0.0`
|
### Claude 特有参数
| 参数名 | 类型 | 是否必填 | 说明 | 示例值 |
|--------|------|----------|------|--------|
| system | string | 否 | 系统提示 |
`"You are a helpful assistant."`
|
| max_tokens | integer | 否 | 最大生成 token 数 |
`4096`
|
### OpenAI 特有参数
| 参数名 | 类型 | 是否必填 | 说明 | 示例值 |
|--------|------|----------|------|--------|
| n | integer | 否 | 生成多个选择 |
`1`
|
| stop | string/array | 否 | 停止序列 |
`["\n", "Human:"]`
|
| logprobs | integer | 否 | 返回 token 对数概率 |
`null`
|
### Gemini 特有参数
| 参数名 | 类型 | 是否必填 | 说明 | 示例值 |
|--------|------|----------|------|--------|
| safetySettings | array | 否 | 安全设置 |
`[{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}]`
|
| generationConfig | object | 否 | 生成配置 |
`{"temperature": 0.7, "maxOutputTokens": 1000}`
|
## 响应参数详解
### 成功响应
```
json
{
"id"
:
"chatcmpl-123"
,
"object"
:
"chat.completion"
,
"created"
:
1677652288
,
"model"
:
"claude-3-sonnet-20241022"
,
"choices"
:
[
{
"index"
:
0
,
"message"
:
{
"role"
:
"assistant"
,
"content"
:
"Hello! How can I help you today?"
},
"finish_reason"
:
"stop"
}
],
"usage"
:
{
"prompt_tokens"
:
10
,
"completion_tokens"
:
15
,
"total_tokens"
:
25
}
}
```
### 响应字段说明
| 字段名 | 类型 | 说明 |
|--------|------|------|
| id | string | 请求唯一标识 |
| object | string | 对象类型 |
| created | integer | 创建时间戳 |
| model | string | 使用的模型 |
| choices | array | 生成结果数组 |
| usage | object | Token 使用统计 |
| choices[].index | integer | 选择索引 |
| choices[].message | object | 消息内容 |
| choices[].finish_reason | string | 完成原因 |
| usage.prompt_tokens | integer | 提示 token 数 |
| usage.completion_tokens | integer | 生成 token 数 |
| usage.total_tokens | integer | 总 token 数 |
## Claude 模型调用
### HTTP API 调用
#### 基础信息
-
**端点**
:
`/v1/messages`
-
**协议**
:兼容 Anthropic Claude API
-
**Content-Type**
:
`application/json`
#### curl 示例
```
bash
# 非流式调用
curl
-X
POST
"https://trafficapi.fcluadecodex.xyz/v1/messages"
\
-H
"Authorization: Bearer
$CLAUDE_API_KEY
"
\
-H
"Content-Type: application/json"
\
-d
'{
"model": "claude-sonnet-4-6",
"max_tokens": 1024,
"messages": [
{
"role": "user",
"content": "Hello, how are you?"
}
]
}'
# 流式调用
curl
-X
POST
"https://trafficapi.fcluadecodex.xyz/v1/messages"
\
-H
"Authorization: Bearer
$CLAUDE_API_KEY
"
\
-H
"Content-Type: application/json"
\
-H
"anthropic-version: 2023-06-01"
\
-d
'{
"model": "claude-sonnet-4-6",
"max_tokens": 1024,
"messages": [
{
"role": "user",
"content": "Hello, how are you?"
}
],
"stream": true
}'
```
#### Python 示例
```
python
import
requests
import
json
class
TrafficAPIClient
:
def
__init__
(
self
,
base_url
,
api_key
):
self
.
base_url
=
base_url
self
.
api_key
=
api_key
self
.
headers
=
{
"Authorization"
:
f
"Bearer
{
api_key
}
"
,
"Content-Type"
:
"application/json"
,
"anthropic-version"
:
"2023-06-01"
}
def
create_message
(
self
,
model
,
messages
,
max_tokens
=
1024
,
stream
=
False
):
url
=
f
"
{
self
.
base_url
}
/v1/messages"
payload
=
{
"model"
:
model
,
"max_tokens"
:
max_tokens
,
"messages"
:
messages
,
"stream"
:
stream
}
response
=
requests
.
post
(
url
,
headers
=
self
.
headers
,
json
=
payload
,
stream
=
stream
)
if
stream
:
return
self
.
_handle_stream_response
(
response
)
else
:
return
response
.
json
()
def
_handle_stream_response
(
self
,
response
):
for
line
in
response
.
iter_lines
():
if
line
:
line
=
line
.
decode
(
'utf-8'
)
if
line
.
startswith
(
'data: '
):
data
=
line
[
6
:]
if
data
==
'[DONE]'
:
break
try
:
yield
json
.
loads
(
data
)
except
json
.
JSONDecodeError
:
continue
# 使用示例
import
os
client
=
TrafficAPIClient
(
base_url
=
os
.
environ
.
get
(
"TRAFFICAPI_BASE_URL"
,
"https://trafficapi.fcluadecodex.xyz"
),
api_key
=
os
.
environ
[
"CLAUDE_API_KEY"
]
)
# 非流式调用
response
=
client
.
create_message
(
model
=
"claude-sonnet-4-6"
,
messages
=
[
{
"role"
:
"user"
,
"content"
:
"Hello, how are you?"
}
],
max_tokens
=
1024
,
stream
=
False
)
print
(
response
)
# 流式调用
for
chunk
in
client
.
create_message
(
model
=
"claude-sonnet-4-6"
,
messages
=
[
{
"role"
:
"user"
,
"content"
:
"Hello, how are you?"
}
],
max_tokens
=
1024
,
stream
=
True
):
print
(
chunk
)
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
#### Go 示例
```
go
package
main
import
(
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
)
type
Message
struct
{
Role
string
`json:"role"`
Content
string
`json:"content"`
}
type
ClaudeRequest
struct
{
Model
string
`json:"model"`
MaxTokens
int
`json:"max_tokens"`
Messages
[]
Message
`json:"messages"`
Stream
bool
`json:"stream"`
}
type
ClaudeResponse
struct
{
ID
string
`json:"id"`
Type
string
`json:"type"`
Role
string
`json:"role"`
Content
[]
struct
{
Type
string
`json:"type"`
Text
string
`json:"text"`
}
`json:"content"`
Model
string
`json:"model"`
StopReason
string
`json:"stop_reason"`
StopSequence
string
`json:"stop_sequence"`
Usage
struct
{
InputTokens
int
`json:"input_tokens"`
OutputTokens
int
`json:"output_tokens"`
}
`json:"usage"`
}
type
TrafficAPIClient
struct
{
BaseURL
string
APIKey
string
}
func
NewTrafficAPIClient
(
baseURL
,
apiKey
string
)
*
TrafficAPIClient
{
return
&
TrafficAPIClient
{
BaseURL
:
baseURL
,
APIKey
:
apiKey
,
}
}
func
(
c
*
TrafficAPIClient
)
CreateMessage
(
req
ClaudeRequest
)
(
*
ClaudeResponse
,
error
)
{
url
:=
c
.
BaseURL
+
"/v1/messages"
reqBody
,
err
:=
json
.
Marshal
(
req
)
if
err
!=
nil
{
return
nil
,
err
}
httpReq
,
err
:=
http
.
NewRequest
(
"POST"
,
url
,
bytes
.
NewBuffer
(
reqBody
))
if
err
!=
nil
{
return
nil
,
err
}
httpReq
.
Header
.
Set
(
"Authorization"
,
"Bearer "
+
c
.
APIKey
)
httpReq
.
Header
.
Set
(
"Content-Type"
,
"application/json"
)
httpReq
.
Header
.
Set
(
"anthropic-version"
,
"2023-06-01"
)
client
:=
&
http
.
Client
{}
resp
,
err
:=
client
.
Do
(
httpReq
)
if
err
!=
nil
{
return
nil
,
err
}
defer
resp
.
Body
.
Close
()
if
resp
.
StatusCode
!=
http
.
StatusOK
{
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
nil
,
fmt
.
Errorf
(
"API error: %s, body: %s"
,
resp
.
Status
,
string
(
body
))
}
var
claudeResp
ClaudeResponse
if
err
:=
json
.
NewDecoder
(
resp
.
Body
)
.
Decode
(
&
claudeResp
);
err
!=
nil
{
return
nil
,
err
}
return
&
claudeResp
,
nil
}
func
(
c
*
TrafficAPIClient
)
CreateMessageStream
(
req
ClaudeRequest
)
(
<-
chan
string
,
error
)
{
req
.
Stream
=
true
url
:=
c
.
BaseURL
+
"/v1/messages"
reqBody
,
err
:=
json
.
Marshal
(
req
)
if
err
!=
nil
{
return
nil
,
err
}
httpReq
,
err
:=
http
.
NewRequest
(
"POST"
,
url
,
bytes
.
NewBuffer
(
reqBody
))
if
err
!=
nil
{
return
nil
,
err
}
httpReq
.
Header
.
Set
(
"Authorization"
,
"Bearer "
+
c
.
APIKey
)
httpReq
.
Header
.
Set
(
"Content-Type"
,
"application/json"
)
httpReq
.
Header
.
Set
(
"anthropic-version"
,
"2023-06-01"
)
client
:=
&
http
.
Client
{}
resp
,
err
:=
client
.
Do
(
httpReq
)
if
err
!=
nil
{
return
nil
,
err
}
if
resp
.
StatusCode
!=
http
.
StatusOK
{
resp
.
Body
.
Close
()
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
nil
,
fmt
.
Errorf
(
"API error: %s, body: %s"
,
resp
.
Status
,
string
(
body
))
}
ch
:=
make
(
chan
string
)
go
func
()
{
defer
resp
.
Body
.
Close
()
defer
close
(
ch
)
reader
:=
bufio
.
NewReader
(
resp
.
Body
)
for
{
line
,
err
:=
reader
.
ReadString
(
'\n'
)
if
err
!=
nil
{
if
err
==
io
.
EOF
{
break
}
return
}
line
=
strings
.
TrimSpace
(
line
)
if
strings
.
HasPrefix
(
line
,
"data: "
)
{
data
:=
line
[
6
:
]
if
data
==
"[DONE]"
{
break
}
ch
<-
data
}
}
}()
return
ch
,
nil
}
// 使用示例
func
main
()
{
baseURL
:=
os
.
Getenv
(
"TRAFFICAPI_BASE_URL"
)
if
baseURL
==
""
{
baseURL
=
"https://trafficapi.fcluadecodex.xyz"
}
client
:=
NewTrafficAPIClient
(
baseURL
,
os
.
Getenv
(
"CLAUDE_API_KEY"
))
// 非流式调用
req
:=
ClaudeRequest
{
Model
:
"claude-sonnet-4-6"
,
MaxTokens
:
1024
,
Messages
:
[]
Message
{
{
Role
:
"user"
,
Content
:
"Hello, how are you?"
},
},
Stream
:
false
,
}
resp
,
err
:=
client
.
CreateMessage
(
req
)
if
err
!=
nil
{
fmt
.
Printf
(
"Error: %v
\n
"
,
err
)
return
}
fmt
.
Printf
(
"Response: %+v
\n
"
,
resp
)
// 流式调用
req
.
Stream
=
true
ch
,
err
:=
client
.
CreateMessageStream
(
req
)
if
err
!=
nil
{
fmt
.
Printf
(
"Error: %v
\n
"
,
err
)
return
}
for
chunk
:=
range
ch
{
fmt
.
Printf
(
"Chunk: %s
\n
"
,
chunk
)
}
}
```
**Go SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址
-
`http_client`
: 可选,自定义 HTTP 客户端
**聊天完成参数:**
-
`Model`
: 必填,模型名称
-
`Messages`
: 必填,消息数组
-
`MaxTokens`
: 可选,最大生成 token 数
-
`Temperature`
: 可选,温度参数
-
`Stream`
: 可选,是否启用流式响应
-
`TopP`
: 可选,核采样参数
-
`FrequencyPenalty`
: 可选,频率惩罚
-
`PresencePenalty`
: 可选,存在惩罚
### SDK 调用
#### 使用官方 Anthropic SDK
```
python
import
anthropic
from
anthropic
import
Anthropic
# 配置 TrafficAPI 作为代理
import
os
client
=
Anthropic
(
api_key
=
os
.
environ
[
"CLAUDE_API_KEY"
],
base_url
=
os
.
environ
.
get
(
"TRAFFICAPI_BASE_URL"
,
"https://trafficapi.fcluadecodex.xyz"
)
# 无需 /v1 后缀,SDK 会自动添加
)
# 调用 Claude
response
=
client
.
messages
.
create
(
model
=
"claude-sonnet-4-6"
,
max_tokens
=
1024
,
messages
=
[
{
"role"
:
"user"
,
"content"
:
"Hello, how are you?"
}
]
)
print
(
response
.
content
[
0
].
text
)
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
#### 使用 LangChain
```
python
from
langchain.chat_models
import
ChatAnthropic
from
langchain.schema
import
HumanMessage
# 配置 LangChain
import
os
chat
=
ChatAnthropic
(
anthropic_api_key
=
os
.
environ
[
"CLAUDE_API_KEY"
],
anthropic_api_url
=
os
.
environ
.
get
(
"TRAFFICAPI_BASE_URL"
,
"https://trafficapi.fcluadecodex.xyz"
),
model
=
"claude-sonnet-4-6"
)
# 调用
messages
=
[
HumanMessage
(
content
=
"Hello, how are you?"
)]
response
=
chat
(
messages
)
print
(
response
.
content
)
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
## OpenAI 模型调用
### HTTP API 调用
#### 基础信息
-
**端点**
:
`/v1/chat/completions`
-
**协议**
:兼容 OpenAI Chat Completions API
-
**Content-Type**
:
`application/json`
#### curl 示例
```
bash
# 非流式调用
curl
-X
POST
"https://trafficapi.fcluadecodex.xyz/v1/chat/completions"
\
-H
"Authorization: Bearer
$OPENAI_API_KEY
"
\
-H
"Content-Type: application/json"
\
-d
'{
"model": "gpt-5.2",
"messages": [
{
"role": "user",
"content": "Hello, how are you?"
}
],
"max_tokens": 1024
}'
# 流式调用
curl
-X
POST
"https://trafficapi.fcluadecodex.xyz/v1/chat/completions"
\
-H
"Authorization: Bearer
$OPENAI_API_KEY
"
\
-H
"Content-Type: application/json"
\
-d
'{
"model": "gpt-5.2",
"messages": [
{
"role": "user",
"content": "Hello, how are you?"
}
],
"max_tokens": 1024,
"stream": true
}'
```
#### Python 示例
```
python
import
requests
import
json
class
TrafficAPIOpenAIClient
:
def
__init__
(
self
,
base_url
,
api_key
):
self
.
base_url
=
base_url
self
.
api_key
=
api_key
self
.
headers
=
{
"Authorization"
:
f
"Bearer
{
api_key
}
"
,
"Content-Type"
:
"application/json"
}
def
create_chat_completion
(
self
,
model
,
messages
,
max_tokens
=
1024
,
stream
=
False
):
url
=
f
"
{
self
.
base_url
}
/v1/chat/completions"
payload
=
{
"model"
:
model
,
"messages"
:
messages
,
"max_tokens"
:
max_tokens
,
"stream"
:
stream
}
response
=
requests
.
post
(
url
,
headers
=
self
.
headers
,
json
=
payload
,
stream
=
stream
)
if
stream
:
return
self
.
_handle_stream_response
(
response
)
else
:
return
response
.
json
()
def
_handle_stream_response
(
self
,
response
):
for
line
in
response
.
iter_lines
():
if
line
:
line
=
line
.
decode
(
'utf-8'
)
if
line
.
startswith
(
'data: '
):
data
=
line
[
6
:]
if
data
==
'[DONE]'
:
break
try
:
yield
json
.
loads
(
data
)
except
json
.
JSONDecodeError
:
continue
# 使用示例
import
os
client
=
TrafficAPIOpenAIClient
(
base_url
=
os
.
environ
.
get
(
"TRAFFICAPI_BASE_URL"
,
"https://trafficapi.fcluadecodex.xyz"
),
api_key
=
os
.
environ
[
"OPENAI_API_KEY"
]
)
# 非流式调用
response
=
client
.
create_chat_completion
(
model
=
"gpt-5.2"
,
messages
=
[
{
"role"
:
"user"
,
"content"
:
"Hello, how are you?"
}
],
max_tokens
=
1024
,
stream
=
False
)
print
(
response
[
"choices"
][
0
][
"message"
][
"content"
])
# 流式调用
for
chunk
in
client
.
create_chat_completion
(
model
=
"gpt-5.2"
,
messages
=
[
{
"role"
:
"user"
,
"content"
:
"Hello, how are you?"
}
],
max_tokens
=
1024
,
stream
=
True
):
if
"choices"
in
chunk
and
chunk
[
"choices"
]:
delta
=
chunk
[
"choices"
][
0
].
get
(
"delta"
,
{})
if
"content"
in
delta
:
print
(
delta
[
"content"
],
end
=
""
,
flush
=
True
)
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
#### Go 示例
```
go
package
main
import
(
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
)
type
OpenAIMessage
struct
{
Role
string
`json:"role"`
Content
string
`json:"content"`
}
type
OpenAIRequest
struct
{
Model
string
`json:"model"`
Messages
[]
OpenAIMessage
`json:"messages"`
MaxTokens
int
`json:"max_tokens"`
Stream
bool
`json:"stream"`
}
type
OpenAIResponse
struct
{
ID
string
`json:"id"`
Object
string
`json:"object"`
Created
int64
`json:"created"`
Model
string
`json:"model"`
Choices
[]
struct
{
Index
int
`json:"index"`
Message
struct
{
Role
string
`json:"role"`
Content
string
`json:"content"`
}
`json:"message"`
FinishReason
string
`json:"finish_reason"`
}
`json:"choices"`
Usage
struct
{
PromptTokens
int
`json:"prompt_tokens"`
CompletionTokens
int
`json:"completion_tokens"`
TotalTokens
int
`json:"total_tokens"`
}
`json:"usage"`
}
type
TrafficAPIOpenAIClient
struct
{
BaseURL
string
APIKey
string
}
func
NewTrafficAPIOpenAIClient
(
baseURL
,
apiKey
string
)
*
TrafficAPIOpenAIClient
{
return
&
TrafficAPIOpenAIClient
{
BaseURL
:
baseURL
,
APIKey
:
apiKey
,
}
}
func
(
c
*
TrafficAPIOpenAIClient
)
CreateChatCompletion
(
req
OpenAIRequest
)
(
*
OpenAIResponse
,
error
)
{
url
:=
c
.
BaseURL
+
"/v1/chat/completions"
reqBody
,
err
:=
json
.
Marshal
(
req
)
if
err
!=
nil
{
return
nil
,
err
}
httpReq
,
err
:=
http
.
NewRequest
(
"POST"
,
url
,
bytes
.
NewBuffer
(
reqBody
))
if
err
!=
nil
{
return
nil
,
err
}
httpReq
.
Header
.
Set
(
"Authorization"
,
"Bearer "
+
c
.
APIKey
)
httpReq
.
Header
.
Set
(
"Content-Type"
,
"application/json"
)
client
:=
&
http
.
Client
{}
resp
,
err
:=
client
.
Do
(
httpReq
)
if
err
!=
nil
{
return
nil
,
err
}
defer
resp
.
Body
.
Close
()
if
resp
.
StatusCode
!=
http
.
StatusOK
{
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
nil
,
fmt
.
Errorf
(
"API error: %s, body: %s"
,
resp
.
Status
,
string
(
body
))
}
var
openaiResp
OpenAIResponse
if
err
:=
json
.
NewDecoder
(
resp
.
Body
)
.
Decode
(
&
openaiResp
);
err
!=
nil
{
return
nil
,
err
}
return
&
openaiResp
,
nil
}
func
(
c
*
TrafficAPIOpenAIClient
)
CreateChatCompletionStream
(
req
OpenAIRequest
)
(
<-
chan
string
,
error
)
{
req
.
Stream
=
true
url
:=
c
.
BaseURL
+
"/v1/chat/completions"
reqBody
,
err
:=
json
.
Marshal
(
req
)
if
err
!=
nil
{
return
nil
,
err
}
httpReq
,
err
:=
http
.
NewRequest
(
"POST"
,
url
,
bytes
.
NewBuffer
(
reqBody
))
if
err
!=
nil
{
return
nil
,
err
}
httpReq
.
Header
.
Set
(
"Authorization"
,
"Bearer "
+
c
.
APIKey
)
httpReq
.
Header
.
Set
(
"Content-Type"
,
"application/json"
)
client
:=
&
http
.
Client
{}
resp
,
err
:=
client
.
Do
(
httpReq
)
if
err
!=
nil
{
return
nil
,
err
}
if
resp
.
StatusCode
!=
http
.
StatusOK
{
resp
.
Body
.
Close
()
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
nil
,
fmt
.
Errorf
(
"API error: %s, body: %s"
,
resp
.
Status
,
string
(
body
))
}
ch
:=
make
(
chan
string
)
go
func
()
{
defer
resp
.
Body
.
Close
()
defer
close
(
ch
)
reader
:=
bufio
.
NewReader
(
resp
.
Body
)
for
{
line
,
err
:=
reader
.
ReadString
(
'\n'
)
if
err
!=
nil
{
if
err
==
io
.
EOF
{
break
}
return
}
line
=
strings
.
TrimSpace
(
line
)
if
strings
.
HasPrefix
(
line
,
"data: "
)
{
data
:=
line
[
6
:
]
if
data
==
"[DONE]"
{
break
}
ch
<-
data
}
}
}()
return
ch
,
nil
}
// 使用示例
func
main
()
{
baseURL
:=
os
.
Getenv
(
"TRAFFICAPI_BASE_URL"
)
if
baseURL
==
""
{
baseURL
=
"https://trafficapi.fcluadecodex.xyz"
}
client
:=
NewTrafficAPIOpenAIClient
(
baseURL
,
os
.
Getenv
(
"OPENAI_API_KEY"
))
// 非流式调用
req
:=
OpenAIRequest
{
Model
:
"gpt-5.2"
,
Messages
:
[]
OpenAIMessage
{
{
Role
:
"user"
,
Content
:
"Hello, how are you?"
},
},
MaxTokens
:
1024
,
Stream
:
false
,
}
resp
,
err
:=
client
.
CreateChatCompletion
(
req
)
if
err
!=
nil
{
fmt
.
Printf
(
"Error: %v
\n
"
,
err
)
return
}
fmt
.
Printf
(
"Response: %+v
\n
"
,
resp
)
// 流式调用
req
.
Stream
=
true
ch
,
err
:=
client
.
CreateChatCompletionStream
(
req
)
if
err
!=
nil
{
fmt
.
Printf
(
"Error: %v
\n
"
,
err
)
return
}
for
chunk
:=
range
ch
{
fmt
.
Printf
(
"Chunk: %s
\n
"
,
chunk
)
}
}
```
**Go SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址
-
`http_client`
: 可选,自定义 HTTP 客户端
**聊天完成参数:**
-
`Model`
: 必填,模型名称
-
`Messages`
: 必填,消息数组
-
`MaxTokens`
: 可选,最大生成 token 数
-
`Temperature`
: 可选,温度参数
-
`Stream`
: 可选,是否启用流式响应
-
`TopP`
: 可选,核采样参数
-
`FrequencyPenalty`
: 可选,频率惩罚
-
`PresencePenalty`
: 可选,存在惩罚
### SDK 调用
#### 使用官方 OpenAI SDK
```
python
import
openai
# 配置 TrafficAPI 作为代理
import
os
client
=
openai
.
OpenAI
(
api_key
=
os
.
environ
[
"OPENAI_API_KEY"
],
base_url
=
os
.
environ
.
get
(
"TRAFFICAPI_BASE_URL"
,
"https://trafficapi.fcluadecodex.xyz"
)
+
"/v1"
)
# 调用 GPT
response
=
client
.
chat
.
completions
.
create
(
model
=
"gpt-5.2"
,
messages
=
[
{
"role"
:
"user"
,
"content"
:
"Hello, how are you?"
}
],
max_tokens
=
1024
)
print
(
response
.
choices
[
0
].
message
.
content
)
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
#### 使用 LangChain
```
python
from
langchain.chat_models
import
ChatOpenAI
from
langchain.schema
import
HumanMessage
# 配置 LangChain
import
os
chat
=
ChatOpenAI
(
openai_api_key
=
os
.
environ
[
"OPENAI_API_KEY"
],
openai_api_base
=
os
.
environ
.
get
(
"TRAFFICAPI_BASE_URL"
,
"https://trafficapi.fcluadecodex.xyz"
)
+
"/v1"
,
model_name
=
"gpt-5.2"
)
# 调用
messages
=
[
HumanMessage
(
content
=
"Hello, how are you?"
)]
response
=
chat
(
messages
)
print
(
response
.
content
)
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
## Gemini 模型调用
### HTTP API 调用
#### 基础信息
-
**端点**
:
`/v1beta/models/{model}:generateContent`
-
**协议**
:兼容 Google Gemini API
-
**Content-Type**
:
`application/json`
#### curl 示例
```
bash
# 非流式调用
curl
-X
POST
"https://trafficapi.fcluadecodex.xyz/v1beta/models/gemini-3.1-flash-lite-preview%3AgenerateContent"
\
-H
"Authorization: Bearer
$GEMINI_API_KEY
"
\
-H
"Content-Type: application/json"
\
-d
'{
"contents": [
{
"parts": [
{
"text": "Hello, how are you?"
}
]
}
]
}'
# 流式调用(使用 streamGenerateContent 端点,注意 %3A 是 : 的 URL 编码)
curl
-X
POST
"https://trafficapi.fcluadecodex.xyz/v1beta/models/gemini-3.1-flash-lite-preview%3AstreamGenerateContent"
\
-H
"Authorization: Bearer
$GEMINI_API_KEY
"
\
-H
"Content-Type: application/json"
\
-d
'{
"contents": [
{
"parts": [
{
"text": "Hello, how are you?"
}
]
}
]
}'
```
#### Python 示例
```
python
import
requests
import
json
class
TrafficAPIGeminiClient
:
def
__init__
(
self
,
base_url
,
api_key
):
self
.
base_url
=
base_url
self
.
api_key
=
api_key
self
.
headers
=
{
"Authorization"
:
f
"Bearer
{
api_key
}
"
,
"Content-Type"
:
"application/json"
}
def
generate_content
(
self
,
model
,
contents
):
# 注意:路径中的 : 需要 URL 编码为 %3A
url
=
f
"
{
self
.
base_url
}
/v1beta/models/
{
model
}
%3AgenerateContent"
payload
=
{
"contents"
:
contents
}
response
=
requests
.
post
(
url
,
headers
=
self
.
headers
,
json
=
payload
)
return
response
.
json
()
def
generate_content_stream
(
self
,
model
,
contents
):
# 注意:路径中的 : 需要 URL 编码为 %3A
url
=
f
"
{
self
.
base_url
}
/v1beta/models/
{
model
}
%3AstreamGenerateContent"
payload
=
{
"contents"
:
contents
}
response
=
requests
.
post
(
url
,
headers
=
self
.
headers
,
json
=
payload
,
stream
=
True
)
return
self
.
_handle_stream_response
(
response
)
def
_handle_stream_response
(
self
,
response
):
# streamGenerateContent 返回 SSE 格式,每行以 "data: " 开头
for
line
in
response
.
iter_lines
():
if
line
:
line
=
line
.
decode
(
'utf-8'
)
if
line
.
startswith
(
'data: '
):
data
=
line
[
6
:]
if
data
==
'[DONE]'
:
break
try
:
yield
json
.
loads
(
data
)
except
json
.
JSONDecodeError
:
continue
# 使用示例
import
os
client
=
TrafficAPIGeminiClient
(
base_url
=
os
.
environ
.
get
(
"TRAFFICAPI_BASE_URL"
,
"https://trafficapi.fcluadecodex.xyz"
),
api_key
=
os
.
environ
[
"GEMINI_API_KEY"
]
)
# 非流式调用
response
=
client
.
generate_content
(
model
=
"gemini-3.1-flash-lite-preview"
,
contents
=
[
{
"parts"
:
[
{
"text"
:
"Hello, how are you?"
}
]
}
],
stream
=
False
)
print
(
response
[
"candidates"
][
0
][
"content"
][
"parts"
][
0
][
"text"
])
# 流式调用(使用 streamGenerateContent 端点)
for
chunk
in
client
.
generate_content_stream
(
model
=
"gemini-3.1-flash-lite-preview"
,
contents
=
[
{
"parts"
:
[
{
"text"
:
"Hello, how are you?"
}
]
}
]
):
if
"candidates"
in
chunk
and
chunk
[
"candidates"
]:
for
candidate
in
chunk
[
"candidates"
]:
if
"content"
in
candidate
and
"parts"
in
candidate
[
"content"
]:
for
part
in
candidate
[
"content"
][
"parts"
]:
if
"text"
in
part
:
print
(
part
[
"text"
],
end
=
""
,
flush
=
True
)
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
#### Go 示例
```
go
package
main
import
(
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
)
type
GeminiPart
struct
{
Text
string
`json:"text"`
}
type
GeminiContent
struct
{
Parts
[]
GeminiPart
`json:"parts"`
}
type
GeminiRequest
struct
{
Contents
[]
GeminiContent
`json:"contents"`
GenerationConfig
*
struct
{
CandidateCount
int
`json:"candidateCount"`
}
`json:"generationConfig,omitempty"`
}
type
GeminiResponse
struct
{
Candidates
[]
struct
{
Content
struct
{
Parts
[]
GeminiPart
`json:"parts"`
}
`json:"content"`
FinishReason
string
`json:"finishReason"`
}
`json:"candidates"`
UsageMetadata
*
struct
{
PromptTokenCount
int
`json:"promptTokenCount"`
CandidatesTokenCount
int
`json:"candidatesTokenCount"`
TotalTokenCount
int
`json:"totalTokenCount"`
}
`json:"usageMetadata"`
}
type
TrafficAPIGeminiClient
struct
{
BaseURL
string
APIKey
string
}
func
NewTrafficAPIGeminiClient
(
baseURL
,
apiKey
string
)
*
TrafficAPIGeminiClient
{
return
&
TrafficAPIGeminiClient
{
BaseURL
:
baseURL
,
APIKey
:
apiKey
,
}
}
func
(
c
*
TrafficAPIGeminiClient
)
GenerateContent
(
model
string
,
req
GeminiRequest
)
(
*
GeminiResponse
,
error
)
{
// 注意:路径中的 : 需要 URL 编码为 %3A(%%3A 在 Sprintf 中输出字面量 %3A)
url
:=
fmt
.
Sprintf
(
"%s/v1beta/models/%s%%3AgenerateContent"
,
c
.
BaseURL
,
model
)
reqBody
,
err
:=
json
.
Marshal
(
req
)
if
err
!=
nil
{
return
nil
,
err
}
httpReq
,
err
:=
http
.
NewRequest
(
"POST"
,
url
,
bytes
.
NewBuffer
(
reqBody
))
if
err
!=
nil
{
return
nil
,
err
}
httpReq
.
Header
.
Set
(
"Authorization"
,
"Bearer "
+
c
.
APIKey
)
httpReq
.
Header
.
Set
(
"Content-Type"
,
"application/json"
)
client
:=
&
http
.
Client
{}
resp
,
err
:=
client
.
Do
(
httpReq
)
if
err
!=
nil
{
return
nil
,
err
}
defer
resp
.
Body
.
Close
()
if
resp
.
StatusCode
!=
http
.
StatusOK
{
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
nil
,
fmt
.
Errorf
(
"API error: %s, body: %s"
,
resp
.
Status
,
string
(
body
))
}
var
geminiResp
GeminiResponse
if
err
:=
json
.
NewDecoder
(
resp
.
Body
)
.
Decode
(
&
geminiResp
);
err
!=
nil
{
return
nil
,
err
}
return
&
geminiResp
,
nil
}
func
(
c
*
TrafficAPIGeminiClient
)
GenerateContentStream
(
model
string
,
req
GeminiRequest
)
(
<-
chan
string
,
error
)
{
url
:=
fmt
.
Sprintf
(
"%s/v1beta/models/%s%%3AstreamGenerateContent"
,
c
.
BaseURL
,
model
)
reqBody
,
err
:=
json
.
Marshal
(
req
)
if
err
!=
nil
{
return
nil
,
err
}
httpReq
,
err
:=
http
.
NewRequest
(
"POST"
,
url
,
bytes
.
NewBuffer
(
reqBody
))
if
err
!=
nil
{
return
nil
,
err
}
httpReq
.
Header
.
Set
(
"Authorization"
,
"Bearer "
+
c
.
APIKey
)
httpReq
.
Header
.
Set
(
"Content-Type"
,
"application/json"
)
client
:=
&
http
.
Client
{}
resp
,
err
:=
client
.
Do
(
httpReq
)
if
err
!=
nil
{
return
nil
,
err
}
if
resp
.
StatusCode
!=
http
.
StatusOK
{
resp
.
Body
.
Close
()
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
nil
,
fmt
.
Errorf
(
"API error: %s, body: %s"
,
resp
.
Status
,
string
(
body
))
}
ch
:=
make
(
chan
string
)
go
func
()
{
defer
resp
.
Body
.
Close
()
defer
close
(
ch
)
// streamGenerateContent 返回 SSE 格式,每行以 "data: " 开头
scanner
:=
bufio
.
NewScanner
(
resp
.
Body
)
for
scanner
.
Scan
()
{
line
:=
scanner
.
Text
()
if
strings
.
HasPrefix
(
line
,
"data: "
)
{
data
:=
line
[
6
:
]
if
data
==
"[DONE]"
{
break
}
if
data
!=
""
{
ch
<-
data
}
}
}
}()
return
ch
,
nil
}
// 使用示例
func
main
()
{
baseURL
:=
os
.
Getenv
(
"TRAFFICAPI_BASE_URL"
)
if
baseURL
==
""
{
baseURL
=
"https://trafficapi.fcluadecodex.xyz"
}
client
:=
NewTrafficAPIGeminiClient
(
baseURL
,
os
.
Getenv
(
"GEMINI_API_KEY"
))
// 非流式调用
req
:=
GeminiRequest
{
Contents
:
[]
GeminiContent
{
{
Parts
:
[]
GeminiPart
{
{
Text
:
"Hello, how are you?"
},
},
},
},
}
resp
,
err
:=
client
.
GenerateContent
(
"gemini-3.1-flash-lite-preview"
,
req
)
if
err
!=
nil
{
fmt
.
Printf
(
"Error: %v
\n
"
,
err
)
return
}
fmt
.
Printf
(
"Response: %+v
\n
"
,
resp
)
// 流式调用
ch
,
err
:=
client
.
GenerateContentStream
(
"gemini-3.1-flash-lite-preview"
,
req
)
if
err
!=
nil
{
fmt
.
Printf
(
"Error: %v
\n
"
,
err
)
return
}
for
chunk
:=
range
ch
{
fmt
.
Printf
(
"Chunk: %s
\n
"
,
chunk
)
}
}
```
**Go SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址
-
`http_client`
: 可选,自定义 HTTP 客户端
**聊天完成参数:**
-
`Model`
: 必填,模型名称
-
`Messages`
: 必填,消息数组
-
`MaxTokens`
: 可选,最大生成 token 数
-
`Temperature`
: 可选,温度参数
-
`Stream`
: 可选,是否启用流式响应
-
`TopP`
: 可选,核采样参数
-
`FrequencyPenalty`
: 可选,频率惩罚
-
`PresencePenalty`
: 可选,存在惩罚
### SDK 调用
#### 使用官方 Google Generative AI SDK
```
python
import
google.generativeai
as
genai
# 配置 TrafficAPI 作为代理
import
os
_base
=
os
.
environ
.
get
(
"TRAFFICAPI_BASE_URL"
,
"https://trafficapi.fcluadecodex.xyz"
)
genai
.
configure
(
api_key
=
os
.
environ
[
"GEMINI_API_KEY"
],
transport
=
"rest"
,
client_options
=
{
"api_endpoint"
:
f
"
{
_base
}
/v1beta"
}
)
# 创建模型
model
=
genai
.
GenerativeModel
(
"gemini-3.1-flash-lite-preview"
)
# 调用
response
=
model
.
generate_content
(
"Hello, how are you?"
)
print
(
response
.
text
)
# 流式调用
response
=
model
.
generate_content
(
"Hello, how are you?"
,
stream
=
True
)
for
chunk
in
response
:
print
(
chunk
.
text
,
end
=
""
,
flush
=
True
)
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
#### 使用 LangChain
```
python
from
langchain_google_genai
import
ChatGoogleGenerativeAI
from
langchain.schema
import
HumanMessage
# 配置 LangChain
import
os
chat
=
ChatGoogleGenerativeAI
(
google_api_key
=
os
.
environ
[
"GEMINI_API_KEY"
],
google_api_base
=
os
.
environ
.
get
(
"TRAFFICAPI_BASE_URL"
,
"https://trafficapi.fcluadecodex.xyz"
)
+
"/v1beta"
,
model
=
"gemini-3.1-flash-lite-preview"
)
# 调用
messages
=
[
HumanMessage
(
content
=
"Hello, how are you?"
)]
response
=
chat
(
messages
)
print
(
response
.
content
)
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
## 流式响应处理
### 通用流式响应处理模式
TrafficAPI 支持所有模型的流式响应,处理模式类似但略有不同:
#### Claude 流式响应格式
```
event: message_start
data: {"type":"message_start","message":{"model":"claude-sonnet-4-6","id":"msg_xxx","type":"message","role":"assistant","content":[],...}}
event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" there"}}
event: message_stop
data: {"type":"message_stop"}
```
#### OpenAI 流式响应格式
```
data: {"id": "chatcmpl-123", "object": "chat.completion.chunk", "created": 1677652288, "model": "gpt-5.2", "choices": [{"index": 0, "delta": {"content": "Hello"}, "finish_reason": null}]}
data: {"id": "chatcmpl-123", "object": "chat.completion.chunk", "created": 1677652288, "model": "gpt-5.2", "choices": [{"index": 0, "delta": {"content": " there"}, "finish_reason": null}]}
data: [DONE]
```
#### Gemini 流式响应格式(streamGenerateContent 端点,SSE 格式)
```
data: {"candidates":[{"content":{"role":"model","parts":[{"text":"Hello"}]},"finishReason":"","index":0}],"usageMetadata":{...},"modelVersion":"gemini-3.1-flash-lite-preview"}
data: {"candidates":[{"content":{"role":"model","parts":[{"text":" there"}]},"finishReason":"STOP","index":0}],"usageMetadata":{...}}
```
### 流式处理最佳实践
1.
**缓冲区管理**
:适当缓冲流式数据,避免频繁的 UI 更新
2.
**错误处理**
:流式响应中可能包含错误信息,需要实时检测
3.
**连接保持**
:确保网络连接稳定,必要时实现重连机制
4.
**资源清理**
:及时关闭流式连接,释放资源
### Python 通用流式处理函数
```
python
def
handle_stream_response
(
response
,
callback
):
"""
通用流式响应处理函数
Args:
response: requests.Response 对象
callback: 处理每个数据块的函数,接收解析后的 JSON 数据
"""
buffer
=
""
for
line
in
response
.
iter_lines
():
if
line
:
line
=
line
.
decode
(
'utf-8'
)
# 处理 SSE 格式(Claude/OpenAI)
if
line
.
startswith
(
'data: '
):
data
=
line
[
6
:]
if
data
==
'[DONE]'
:
break
try
:
json_data
=
json
.
loads
(
data
)
callback
(
json_data
)
except
json
.
JSONDecodeError
:
# 可能是 Gemini 的纯 JSON 流
buffer
+=
data
try
:
json_data
=
json
.
loads
(
buffer
)
callback
(
json_data
)
buffer
=
""
except
json
.
JSONDecodeError
:
continue
else
:
# 处理 Gemini 的纯 JSON 流
buffer
+=
line
try
:
json_data
=
json
.
loads
(
buffer
)
callback
(
json_data
)
buffer
=
""
except
json
.
JSONDecodeError
:
continue
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
## 错误处理和重试机制
### 常见错误码
| 状态码 | 错误类型 | 描述 | 处理建议 |
|--------|----------|------|----------|
| 400 | Bad Request | 请求参数错误 | 检查请求格式和参数 |
| 401 | Unauthorized | API Key 无效或过期 | 检查 API Key 有效性 |
| 403 | Forbidden | 权限不足 | 检查用户权限和配额 |
| 429 | Too Many Requests | 请求频率超限 | 降低请求频率,实现退避重试 |
| 500 | Internal Server Error | 服务器内部错误 | 等待后重试,联系管理员 |
| 502 | Bad Gateway | 上游服务不可用 | 等待后重试 |
| 503 | Service Unavailable | 服务暂时不可用 | 等待后重试 |
### 重试策略实现
#### Python 重试装饰器
```
python
import
time
import
random
from
functools
import
wraps
def
retry_with_exponential_backoff
(
max_retries
=
5
,
initial_delay
=
1
,
exponential_base
=
2
,
jitter
=
True
,
retry_status_codes
=
[
429
,
500
,
502
,
503
,
504
]
):
"""指数退避重试装饰器"""
def
decorator
(
func
):
@
wraps
(
func
)
def
wrapper
(
*
args
,
**
kwargs
):
delay
=
initial_delay
for
attempt
in
range
(
max_retries
):
try
:
return
func
(
*
args
,
**
kwargs
)
except
Exception
as
e
:
# 检查是否需要重试
should_retry
=
False
if
hasattr
(
e
,
'status_code'
):
should_retry
=
e
.
status_code
in
retry_status_codes
elif
hasattr
(
e
,
'response'
)
and
hasattr
(
e
.
response
,
'status_code'
):
should_retry
=
e
.
response
.
status_code
in
retry_status_codes
if
not
should_retry
or
attempt
==
max_retries
-
1
:
raise
# 计算延迟时间
delay
*=
exponential_base
**
attempt
if
jitter
:
delay
=
random
.
uniform
(
0
,
delay
)
print
(
f
"Attempt
{
attempt
+
1
}
failed, retrying in
{
delay
:.
2
f
}
seconds..."
)
time
.
sleep
(
delay
)
return
func
(
*
args
,
**
kwargs
)
return
wrapper
return
decorator
# 使用示例
@
retry_with_exponential_backoff
(
max_retries
=
3
)
def
call_api_with_retry
():
response
=
requests
.
post
(...)
response
.
raise_for_status
()
return
response
.
json
()
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
#### Go 重试实现
```
go
package
main
import
(
"fmt"
"math/rand"
"time"
)
type
RetryConfig
struct
{
MaxRetries
int
InitialDelay
time
.
Duration
ExponentialBase
float64
Jitter
bool
RetryStatusCodes
[]
int
}
func
NewDefaultRetryConfig
()
*
RetryConfig
{
return
&
RetryConfig
{
MaxRetries
:
5
,
InitialDelay
:
time
.
Second
,
ExponentialBase
:
2.0
,
Jitter
:
true
,
RetryStatusCodes
:
[]
int
{
429
,
500
,
502
,
503
,
504
},
}
}
func
RetryWithExponentialBackoff
(
config
*
RetryConfig
,
fn
func
()
error
)
error
{
var
lastErr
error
delay
:=
config
.
InitialDelay
for
attempt
:=
0
;
attempt
<
config
.
MaxRetries
;
attempt
++
{
err
:=
fn
()
if
err
==
nil
{
return
nil
}
lastErr
=
err
// 检查是否需要重试
shouldRetry
:=
false
if
apiErr
,
ok
:=
err
.
(
*
APIError
);
ok
{
for
_
,
code
:=
range
config
.
RetryStatusCodes
{
if
apiErr
.
StatusCode
==
code
{
shouldRetry
=
true
break
}
}
}
if
!
shouldRetry
||
attempt
==
config
.
MaxRetries
-
1
{
break
}
// 计算延迟时间
delay
=
time
.
Duration
(
float64
(
delay
)
*
config
.
ExponentialBase
)
if
config
.
Jitter
{
delay
=
time
.
Duration
(
float64
(
delay
)
*
(
0.5
+
rand
.
Float64
()))
}
fmt
.
Printf
(
"Attempt %d failed, retrying in %v...
\n
"
,
attempt
+
1
,
delay
)
time
.
Sleep
(
delay
)
}
return
lastErr
}
// 使用示例
func
callAPIWithRetry
()
error
{
config
:=
NewDefaultRetryConfig
()
return
RetryWithExponentialBackoff
(
config
,
func
()
error
{
resp
,
err
:=
http
.
Post
(
...
)
if
err
!=
nil
{
return
err
}
defer
resp
.
Body
.
Close
()
if
resp
.
StatusCode
!=
http
.
StatusOK
{
return
&
APIError
{
StatusCode
:
resp
.
StatusCode
,
Message
:
"API request failed"
,
}
}
return
nil
})
}
```
**Go SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址
-
`http_client`
: 可选,自定义 HTTP 客户端
**聊天完成参数:**
-
`Model`
: 必填,模型名称
-
`Messages`
: 必填,消息数组
-
`MaxTokens`
: 可选,最大生成 token 数
-
`Temperature`
: 可选,温度参数
-
`Stream`
: 可选,是否启用流式响应
-
`TopP`
: 可选,核采样参数
-
`FrequencyPenalty`
: 可选,频率惩罚
-
`PresencePenalty`
: 可选,存在惩罚
### 错误监控和告警
1.
**错误率监控**
:监控 API 调用错误率,设置阈值告警
2.
**延迟监控**
:监控 API 响应时间,识别性能问题
3.
**配额监控**
:监控用户配额使用情况,提前预警
4.
**健康检查**
:定期进行健康检查,确保服务可用性
## 性能优化建议
### 1. 连接池管理
#### Python 连接池配置
```
python
import
requests
from
requests.adapters
import
HTTPAdapter
from
urllib3.util.retry
import
Retry
# 创建会话并配置连接池
session
=
requests
.
Session
()
# 配置重试策略
retry_strategy
=
Retry
(
total
=
3
,
backoff_factor
=
1
,
status_forcelist
=
[
429
,
500
,
502
,
503
,
504
],
allowed_methods
=
[
"POST"
,
"GET"
]
)
# 配置适配器
adapter
=
HTTPAdapter
(
max_retries
=
retry_strategy
,
pool_connections
=
10
,
# 连接池大小
pool_maxsize
=
10
,
# 最大连接数
pool_block
=
False
# 是否阻塞等待连接
)
session
.
mount
(
"http://"
,
adapter
)
session
.
mount
(
"https://"
,
adapter
)
# 使用会话调用 API
response
=
session
.
post
(
"https://trafficapi.fcluadecodex.xyz/v1/chat/completions"
,
headers
=
headers
,
json
=
payload
,
timeout
=
30
)
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
#### Go 连接池配置
```
go
package
main
import
(
"net/http"
"time"
)
func
createHTTPClient
()
*
http
.
Client
{
transport
:=
&
http
.
Transport
{
MaxIdleConns
:
100
,
// 最大空闲连接数
MaxIdleConnsPerHost
:
10
,
// 每个主机的最大空闲连接数
MaxConnsPerHost
:
10
,
// 每个主机的最大连接数
IdleConnTimeout
:
90
*
time
.
Second
,
// 空闲连接超时时间
TLSHandshakeTimeout
:
10
*
time
.
Second
,
// TLS 握手超时时间
}
return
&
http
.
Client
{
Transport
:
transport
,
Timeout
:
30
*
time
.
Second
,
// 请求超时时间
}
}
// 使用共享的 HTTP 客户端
var
httpClient
=
createHTTPClient
()
func
callAPI
()
(
*
http
.
Response
,
error
)
{
req
,
err
:=
http
.
NewRequest
(
"POST"
,
"https://trafficapi.fcluadecodex.xyz/v1/chat/completions"
,
nil
)
if
err
!=
nil
{
return
nil
,
err
}
return
httpClient
.
Do
(
req
)
}
```
**Go SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址
-
`http_client`
: 可选,自定义 HTTP 客户端
**聊天完成参数:**
-
`Model`
: 必填,模型名称
-
`Messages`
: 必填,消息数组
-
`MaxTokens`
: 可选,最大生成 token 数
-
`Temperature`
: 可选,温度参数
-
`Stream`
: 可选,是否启用流式响应
-
`TopP`
: 可选,核采样参数
-
`FrequencyPenalty`
: 可选,频率惩罚
-
`PresencePenalty`
: 可选,存在惩罚
### 2. 请求批量化
对于多个独立的请求,可以考虑批量处理:
```
python
from
concurrent.futures
import
ThreadPoolExecutor
,
as_completed
def
batch_process_requests
(
requests_data
,
max_workers
=
5
):
"""批量处理请求"""
results
=
[]
with
ThreadPoolExecutor
(
max_workers
=
max_workers
)
as
executor
:
# 提交所有任务
future_to_request
=
{
executor
.
submit
(
process_single_request
,
data
):
data
for
data
in
requests_data
}
# 收集结果
for
future
in
as_completed
(
future_to_request
):
try
:
result
=
future
.
result
()
results
.
append
(
result
)
except
Exception
as
e
:
print
(
f
"Request failed:
{
e
}
"
)
results
.
append
(
None
)
return
results
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
### 3. 缓存策略
对于重复的查询,实现缓存机制:
```
python
import
hashlib
import
pickle
from
datetime
import
datetime
,
timedelta
class
APICache
:
def
__init__
(
self
,
ttl_seconds
=
300
):
self
.
cache
=
{}
self
.
ttl
=
ttl_seconds
def
get_cache_key
(
self
,
endpoint
,
params
):
"""生成缓存键"""
key_data
=
f
"
{
endpoint
}
:
{
json
.
dumps
(
params
,
sort_keys
=
True
)
}
"
return
hashlib
.
md5
(
key_data
.
encode
()).
hexdigest
()
def
get
(
self
,
endpoint
,
params
):
"""获取缓存"""
key
=
self
.
get_cache_key
(
endpoint
,
params
)
if
key
in
self
.
cache
:
cached_data
,
timestamp
=
self
.
cache
[
key
]
if
datetime
.
now
()
-
timestamp
<
timedelta
(
seconds
=
self
.
ttl
):
return
cached_data
return
None
def
set
(
self
,
endpoint
,
params
,
data
):
"""设置缓存"""
key
=
self
.
get_cache_key
(
endpoint
,
params
)
self
.
cache
[
key
]
=
(
data
,
datetime
.
now
())
def
clear
(
self
):
"""清空缓存"""
self
.
cache
.
clear
()
# 使用缓存
cache
=
APICache
(
ttl_seconds
=
600
)
def
cached_api_call
(
endpoint
,
params
):
# 检查缓存
cached_result
=
cache
.
get
(
endpoint
,
params
)
if
cached_result
:
return
cached_result
# 调用 API
result
=
call_api
(
endpoint
,
params
)
# 缓存结果
cache
.
set
(
endpoint
,
params
,
result
)
return
result
```
**Python SDK 参数说明:**
**初始化参数:**
-
`api_key`
: 必填,TrafficAPI 的 API Key
-
`base_url`
: 可选,TrafficAPI 服务地址,默认为官方地址
-
`timeout`
: 可选,请求超时时间(秒)
-
`max_retries`
: 可选,最大重试次数
**聊天完成参数:**
-
`model`
: 必填,模型名称
-
`messages`
: 必填,消息列表,格式为
`[{"role": "user", "content": "..."}]`
-
`max_tokens`
: 可选,最大生成 token 数
-
`temperature`
: 可选,温度参数(0.0-2.0)
-
`stream`
: 可选,是否启用流式响应
-
`top_p`
: 可选,核采样参数
-
`frequency_penalty`
: 可选,频率惩罚
-
`presence_penalty`
: 可选,存在惩罚
### 4. 安全最佳实践
1.
**API Key 管理**
:
-
定期轮换 API Key
-
使用环境变量存储敏感信息
-
实现 Key 的访问控制
2.
**请求验证**
:
-
验证输入参数
-
限制请求大小
-
实现速率限制
3.
**日志和审计**
:
-
记录所有 API 调用
-
实现操作审计
-
监控异常行为
## 注意事项
### 1. 服务条款合规性
使用 TrafficAPI 调用 AI 模型服务时,请注意:
-
遵守上游服务提供商(Anthropic、OpenAI、Google)的服务条款
-
了解并遵守相关法律法规
-
合理使用 API 资源,避免滥用
### 2. 安全性考虑
1.
**API Key 保护**
:
-
不要在客户端代码中硬编码 API Key
-
使用环境变量或密钥管理服务
-
定期轮换 API Key
2.
**请求安全**
:
-
验证用户输入,防止注入攻击
-
实现请求速率限制
-
监控异常请求模式
3.
**数据隐私**
:
-
避免传输敏感个人信息
-
了解数据在传输和存储中的加密情况
-
遵守数据保护法规(如 GDPR、CCPA)
### 3. 成本控制
1.
**用量监控**
:
-
实时监控 Token 使用量
-
设置用量告警阈值
-
定期分析成本趋势
2.
**优化策略**
:
-
使用缓存减少重复请求
-
优化提示词减少 Token 消耗
-
考虑使用成本更低的模型
3.
**预算管理**
:
-
设置月度预算限制
-
实现自动化的成本控制
-
定期审查成本效益
frontend/src/views/user/DocsView.vue
View file @
68f67198
...
...
@@ -38,7 +38,7 @@ import { onMounted, onUnmounted, ref, nextTick } from 'vue'
import
{
Marked
,
Renderer
}
from
'
marked
'
import
DOMPurify
from
'
dompurify
'
import
AppLayout
from
'
@/components/layout/AppLayout.vue
'
import
docRaw
from
'
../../../..
/trafficapi_ai_call_documentation.md?raw
'
import
docRaw
from
'
@/assets
/trafficapi_ai_call_documentation.md?raw
'
// ─── Slug ─────────────────────────────────────────────────────────────────────
function
slugify
(
text
:
string
):
string
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment