Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
陈曦
sub2api
Commits
1e51de88
Unverified
Commit
1e51de88
authored
Mar 12, 2026
by
Wesley Liddick
Committed by
GitHub
Mar 12, 2026
Browse files
Merge pull request #937 from lxohi/fix/anthropic-stream-keepalive
fix: 为 Anthropic Messages API 流式转发添加下游 keepalive ping
parents
30995b53
6e90ec61
Changes
1
Hide whitespace changes
Inline
Side-by-side
backend/internal/service/gateway_service.go
View file @
1e51de88
...
@@ -5998,6 +5998,22 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
...
@@ -5998,6 +5998,22 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
intervalCh
=
intervalTicker
.
C
intervalCh
=
intervalTicker
.
C
}
}
// 下游 keepalive:防止代理/Cloudflare Tunnel 因连接空闲而断开
keepaliveInterval
:=
time
.
Duration
(
0
)
if
s
.
cfg
!=
nil
&&
s
.
cfg
.
Gateway
.
StreamKeepaliveInterval
>
0
{
keepaliveInterval
=
time
.
Duration
(
s
.
cfg
.
Gateway
.
StreamKeepaliveInterval
)
*
time
.
Second
}
var
keepaliveTicker
*
time
.
Ticker
if
keepaliveInterval
>
0
{
keepaliveTicker
=
time
.
NewTicker
(
keepaliveInterval
)
defer
keepaliveTicker
.
Stop
()
}
var
keepaliveCh
<-
chan
time
.
Time
if
keepaliveTicker
!=
nil
{
keepaliveCh
=
keepaliveTicker
.
C
}
lastDataAt
:=
time
.
Now
()
// 仅发送一次错误事件,避免多次写入导致协议混乱(写失败时尽力通知客户端)
// 仅发送一次错误事件,避免多次写入导致协议混乱(写失败时尽力通知客户端)
errorEventSent
:=
false
errorEventSent
:=
false
sendErrorEvent
:=
func
(
reason
string
)
{
sendErrorEvent
:=
func
(
reason
string
)
{
...
@@ -6187,6 +6203,7 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
...
@@ -6187,6 +6203,7 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
break
break
}
}
flusher
.
Flush
()
flusher
.
Flush
()
lastDataAt
=
time
.
Now
()
}
}
if
data
!=
""
{
if
data
!=
""
{
if
firstTokenMs
==
nil
&&
data
!=
"[DONE]"
{
if
firstTokenMs
==
nil
&&
data
!=
"[DONE]"
{
...
@@ -6220,6 +6237,22 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
...
@@ -6220,6 +6237,22 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
}
}
sendErrorEvent
(
"stream_timeout"
)
sendErrorEvent
(
"stream_timeout"
)
return
&
streamingResult
{
usage
:
usage
,
firstTokenMs
:
firstTokenMs
},
fmt
.
Errorf
(
"stream data interval timeout"
)
return
&
streamingResult
{
usage
:
usage
,
firstTokenMs
:
firstTokenMs
},
fmt
.
Errorf
(
"stream data interval timeout"
)
case
<-
keepaliveCh
:
if
clientDisconnected
{
continue
}
if
time
.
Since
(
lastDataAt
)
<
keepaliveInterval
{
continue
}
// SSE ping 事件:Anthropic 原生格式,客户端会正确处理,
// 同时保持连接活跃防止 Cloudflare Tunnel 等代理断开
if
_
,
werr
:=
fmt
.
Fprint
(
w
,
"event: ping
\n
data: {
\"
type
\"
:
\"
ping
\"
}
\n\n
"
);
werr
!=
nil
{
clientDisconnected
=
true
logger
.
LegacyPrintf
(
"service.gateway"
,
"Client disconnected during keepalive ping, continuing to drain upstream for billing"
)
continue
}
flusher
.
Flush
()
}
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment