Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
陈曦
sub2api
Commits
4190293b
Unverified
Commit
4190293b
authored
Mar 07, 2026
by
Wesley Liddick
Committed by
GitHub
Mar 07, 2026
Browse files
Merge pull request #823 from StarryKira/fix/empty-stream-failover
Fix/empty streamfix issue #791
parents
421b4c0a
65a10679
Changes
3
Hide whitespace changes
Inline
Side-by-side
backend/internal/pkg/antigravity/stream_transformer.go
View file @
4190293b
...
@@ -119,23 +119,33 @@ func (p *StreamingProcessor) ProcessLine(line string) []byte {
...
@@ -119,23 +119,33 @@ func (p *StreamingProcessor) ProcessLine(line string) []byte {
return
result
.
Bytes
()
return
result
.
Bytes
()
}
}
// Finish 结束处理,返回最终事件和用量
// Finish 结束处理,返回最终事件和用量。
// 若整个流未收到任何可解析的上游数据(messageStartSent == false),
// 则不补发任何结束事件,防止客户端收到没有 message_start 的残缺流。
func
(
p
*
StreamingProcessor
)
Finish
()
([]
byte
,
*
ClaudeUsage
)
{
func
(
p
*
StreamingProcessor
)
Finish
()
([]
byte
,
*
ClaudeUsage
)
{
var
result
bytes
.
Buffer
if
!
p
.
messageStopSent
{
_
,
_
=
result
.
Write
(
p
.
emitFinish
(
""
))
}
usage
:=
&
ClaudeUsage
{
usage
:=
&
ClaudeUsage
{
InputTokens
:
p
.
inputTokens
,
InputTokens
:
p
.
inputTokens
,
OutputTokens
:
p
.
outputTokens
,
OutputTokens
:
p
.
outputTokens
,
CacheReadInputTokens
:
p
.
cacheReadTokens
,
CacheReadInputTokens
:
p
.
cacheReadTokens
,
}
}
if
!
p
.
messageStartSent
{
return
nil
,
usage
}
var
result
bytes
.
Buffer
if
!
p
.
messageStopSent
{
_
,
_
=
result
.
Write
(
p
.
emitFinish
(
""
))
}
return
result
.
Bytes
(),
usage
return
result
.
Bytes
(),
usage
}
}
// MessageStartSent 报告流中是否已发出过 message_start 事件(即是否收到过有效的上游数据)
func
(
p
*
StreamingProcessor
)
MessageStartSent
()
bool
{
return
p
.
messageStartSent
}
// emitMessageStart 发送 message_start 事件
// emitMessageStart 发送 message_start 事件
func
(
p
*
StreamingProcessor
)
emitMessageStart
(
v1Resp
*
V1InternalResponse
)
[]
byte
{
func
(
p
*
StreamingProcessor
)
emitMessageStart
(
v1Resp
*
V1InternalResponse
)
[]
byte
{
if
p
.
messageStartSent
{
if
p
.
messageStartSent
{
...
...
backend/internal/service/antigravity_gateway_service.go
View file @
4190293b
...
@@ -3696,6 +3696,15 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
...
@@ -3696,6 +3696,15 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
finalEvents
,
agUsage
:=
processor
.
Finish
()
finalEvents
,
agUsage
:=
processor
.
Finish
()
if
len
(
finalEvents
)
>
0
{
if
len
(
finalEvents
)
>
0
{
cw
.
Write
(
finalEvents
)
cw
.
Write
(
finalEvents
)
}
else
if
!
processor
.
MessageStartSent
()
&&
!
cw
.
Disconnected
()
{
// 整个流未收到任何可解析的上游数据(全部 SSE 行均无法被 JSON 解析),
// 触发 failover 在同账号重试,避免向客户端发出缺少 message_start 的残缺流
logger
.
LegacyPrintf
(
"service.antigravity_gateway"
,
"[antigravity-Claude-Stream] empty stream response (no valid events parsed), triggering failover"
)
return
nil
,
&
UpstreamFailoverError
{
StatusCode
:
http
.
StatusBadGateway
,
ResponseBody
:
[]
byte
(
`{"error":"empty stream response from upstream"}`
),
RetryableOnSameAccount
:
true
,
}
}
}
return
&
antigravityStreamResult
{
usage
:
convertUsage
(
agUsage
),
firstTokenMs
:
firstTokenMs
,
clientDisconnect
:
cw
.
Disconnected
()},
nil
return
&
antigravityStreamResult
{
usage
:
convertUsage
(
agUsage
),
firstTokenMs
:
firstTokenMs
,
clientDisconnect
:
cw
.
Disconnected
()},
nil
}
}
...
...
backend/internal/service/antigravity_gateway_service_test.go
View file @
4190293b
...
@@ -998,6 +998,46 @@ func TestHandleClaudeStreamingResponse_ClientDisconnect(t *testing.T) {
...
@@ -998,6 +998,46 @@ func TestHandleClaudeStreamingResponse_ClientDisconnect(t *testing.T) {
require
.
True
(
t
,
result
.
clientDisconnect
)
require
.
True
(
t
,
result
.
clientDisconnect
)
}
}
// TestHandleClaudeStreamingResponse_EmptyStream
// 验证:上游只返回无法解析的 SSE 行时,触发 UpstreamFailoverError 而不是向客户端发出残缺流
func
TestHandleClaudeStreamingResponse_EmptyStream
(
t
*
testing
.
T
)
{
gin
.
SetMode
(
gin
.
TestMode
)
svc
:=
newAntigravityTestService
(
&
config
.
Config
{
Gateway
:
config
.
GatewayConfig
{
MaxLineSize
:
defaultMaxLineSize
},
})
rec
:=
httptest
.
NewRecorder
()
c
,
_
:=
gin
.
CreateTestContext
(
rec
)
c
.
Request
=
httptest
.
NewRequest
(
http
.
MethodPost
,
"/"
,
nil
)
pr
,
pw
:=
io
.
Pipe
()
resp
:=
&
http
.
Response
{
StatusCode
:
http
.
StatusOK
,
Body
:
pr
,
Header
:
http
.
Header
{}}
go
func
()
{
defer
func
()
{
_
=
pw
.
Close
()
}()
// 所有行均为无法 JSON 解析的内容,ProcessLine 全部返回 nil
fmt
.
Fprintln
(
pw
,
"data: not-valid-json"
)
fmt
.
Fprintln
(
pw
,
""
)
fmt
.
Fprintln
(
pw
,
"data: also-invalid"
)
fmt
.
Fprintln
(
pw
,
""
)
}()
_
,
err
:=
svc
.
handleClaudeStreamingResponse
(
c
,
resp
,
time
.
Now
(),
"claude-sonnet-4-5"
)
_
=
pr
.
Close
()
// 应当返回 UpstreamFailoverError 而非 nil,以便上层触发 failover
require
.
Error
(
t
,
err
)
var
failoverErr
*
UpstreamFailoverError
require
.
ErrorAs
(
t
,
err
,
&
failoverErr
)
require
.
True
(
t
,
failoverErr
.
RetryableOnSameAccount
)
// 客户端不应收到任何 SSE 事件(既无 message_start 也无 message_stop)
body
:=
rec
.
Body
.
String
()
require
.
NotContains
(
t
,
body
,
"event: message_start"
)
require
.
NotContains
(
t
,
body
,
"event: message_stop"
)
require
.
NotContains
(
t
,
body
,
"event: message_delta"
)
}
// TestHandleClaudeStreamingResponse_ContextCanceled
// TestHandleClaudeStreamingResponse_ContextCanceled
// 验证:context 取消时不注入错误事件
// 验证:context 取消时不注入错误事件
func
TestHandleClaudeStreamingResponse_ContextCanceled
(
t
*
testing
.
T
)
{
func
TestHandleClaudeStreamingResponse_ContextCanceled
(
t
*
testing
.
T
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment