Commit 0e237326 authored by Elysia's avatar Elysia
Browse files

fix(gateway): 防止流式 failover 拼接腐化导致客户端收到双 message_start



当上游在 SSE 流中途返回 event:error 时,handleStreamingResponse 已将
部分 SSE 事件写入客户端,但原先的 failover 逻辑仍会切换到下一个账号
并写入完整流,导致客户端收到两个 message_start 进而产生 400 错误。

修复方案:在每次 Forward 调用前记录 c.Writer.Size(),若 Forward 返回
UpstreamFailoverError 后 writer 字节数增加,说明 SSE 内容已不可撤销地
发送给客户端,此时直接调用 handleFailoverExhausted 发送 SSE error 事件
终止流,而非继续 failover。

Ping-only 场景不受影响:slot 等待期的 ping 字节在 Forward 前后相等,
正常 failover 流程照常进行。
Co-Authored-By: default avatarClaude Sonnet 4.6 <noreply@anthropic.com>
parent 6da5fa01
...@@ -391,6 +391,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) { ...@@ -391,6 +391,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if fs.SwitchCount > 0 { if fs.SwitchCount > 0 {
requestCtx = service.WithAccountSwitchCount(requestCtx, fs.SwitchCount, h.metadataBridgeEnabled()) requestCtx = service.WithAccountSwitchCount(requestCtx, fs.SwitchCount, h.metadataBridgeEnabled())
} }
// 记录 Forward 前已写入字节数,Forward 后若增加则说明 SSE 内容已发,禁止 failover
writerSizeBeforeForward := c.Writer.Size()
if account.Platform == service.PlatformAntigravity { if account.Platform == service.PlatformAntigravity {
result, err = h.antigravityGatewayService.ForwardGemini(requestCtx, c, account, reqModel, "generateContent", reqStream, body, hasBoundSession) result, err = h.antigravityGatewayService.ForwardGemini(requestCtx, c, account, reqModel, "generateContent", reqStream, body, hasBoundSession)
} else { } else {
...@@ -402,6 +404,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) { ...@@ -402,6 +404,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if err != nil { if err != nil {
var failoverErr *service.UpstreamFailoverError var failoverErr *service.UpstreamFailoverError
if errors.As(err, &failoverErr) { if errors.As(err, &failoverErr) {
// 流式内容已写入客户端,无法撤销,禁止 failover 以防止流拼接腐化
if c.Writer.Size() != writerSizeBeforeForward {
h.handleFailoverExhausted(c, failoverErr, account.Platform, true)
return
}
action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr) action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr)
switch action { switch action {
case FailoverContinue: case FailoverContinue:
...@@ -637,6 +644,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) { ...@@ -637,6 +644,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if fs.SwitchCount > 0 { if fs.SwitchCount > 0 {
requestCtx = service.WithAccountSwitchCount(requestCtx, fs.SwitchCount, h.metadataBridgeEnabled()) requestCtx = service.WithAccountSwitchCount(requestCtx, fs.SwitchCount, h.metadataBridgeEnabled())
} }
// 记录 Forward 前已写入字节数,Forward 后若增加则说明 SSE 内容已发,禁止 failover
writerSizeBeforeForward := c.Writer.Size()
if account.Platform == service.PlatformAntigravity && account.Type != service.AccountTypeAPIKey { if account.Platform == service.PlatformAntigravity && account.Type != service.AccountTypeAPIKey {
result, err = h.antigravityGatewayService.Forward(requestCtx, c, account, body, hasBoundSession) result, err = h.antigravityGatewayService.Forward(requestCtx, c, account, body, hasBoundSession)
} else { } else {
...@@ -706,6 +715,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) { ...@@ -706,6 +715,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
} }
var failoverErr *service.UpstreamFailoverError var failoverErr *service.UpstreamFailoverError
if errors.As(err, &failoverErr) { if errors.As(err, &failoverErr) {
// 流式内容已写入客户端,无法撤销,禁止 failover 以防止流拼接腐化
if c.Writer.Size() != writerSizeBeforeForward {
h.handleFailoverExhausted(c, failoverErr, account.Platform, true)
return
}
action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr) action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr)
switch action { switch action {
case FailoverContinue: case FailoverContinue:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment