生产里最难受的不是“流式返回慢”,而是“流式返回断了还重复”,用户看到半句、重连后又从中间重喷一遍。
这篇给一套可落地的恢复闭环:delta 持久化 + resume token + 幂等去重,目标是“断线可续,重放不重字”。

先定失败模型:你到底在修什么

OpenAI Responses 流式链路里,常见失败有 4 类:

  1. 网络抖动断流:客户端 TCP/代理短断,服务端任务还在跑。
  2. 客户端超时主动取消:上层超时预算太紧,生成没完就切了。
  3. 重复消费:重连后从错误偏移继续,导致内容重复拼接。
  4. 多副本竞争写入:同一个 request_id 被多个 worker 同时处理。

如果不把这 4 类区分开,后面全是玄学。

架构最小闭环(保守但稳)

建议先用这套三层状态:

  • L1 内存缓冲(进程内):当前连接的低延迟拼接。
  • L2 Redis Stream(会话级):按 response_id 追加 delta 事件。
  • L3 Postgres(最终快照):落地去重后的最终文本和元数据。

关键字段(每个 delta 事件都要有):

{
  "response_id": "resp_xxx",
  "segment_seq": 128,
  "chunk_id": "ck_8f2...",
  "delta_text": "...",
  "delta_hash": "sha1(...)",
  "created_at": "2026-03-23T01:13:09Z"
}

其中 segment_seq + delta_hash 是去重命门。

Go 侧:流式消费与持久化模板

下面这个模板做三件事:

  • 每条 delta 立即写 Redis(可重放)
  • 同时做内存去重(低成本)
  • 定时 checkpoint(可恢复)
package stream

import (
	"context"
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

type DeltaEvent struct {
	ResponseID string
	SegmentSeq int64
	ChunkID    string
	Text       string
	At         time.Time
}

type CheckpointStore interface {
	Save(ctx context.Context, responseID string, seq int64, resumeToken string) error
	Load(ctx context.Context, responseID string) (seq int64, resumeToken string, err error)
}

type Deduper struct {
	seen map[string]struct{}
}

func NewDeduper() *Deduper { return &Deduper{seen: make(map[string]struct{}, 4096)} }

func (d *Deduper) SeenOrSet(seq int64, text string) bool {
	h := sha1.Sum([]byte(fmt.Sprintf("%d|%s", seq, text)))
	k := hex.EncodeToString(h[:])
	if _, ok := d.seen[k]; ok {
		return true
	}
	d.seen[k] = struct{}{}
	return false
}

func PersistDelta(ctx context.Context, rdb *redis.Client, ev DeltaEvent) error {
	key := fmt.Sprintf("resp:%s:deltas", ev.ResponseID)
	return rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: key,
		Values: map[string]any{
			"response_id": ev.ResponseID,
			"segment_seq": ev.SegmentSeq,
			"chunk_id":    ev.ChunkID,
			"delta_text":  ev.Text,
			"ts":          ev.At.UnixMilli(),
		},
	}).Err()
}

resume token 怎么用才不翻车

建议策略:

  1. 每收到 N 条(建议 20)或每 1 秒写一次 checkpoint。
  2. checkpoint 保存:last_seqresume_tokenupdated_at
  3. 重连时先读 checkpoint,只请求 last_seq+1 后的数据。

伪代码:

on reconnect:
  cp = loadCheckpoint(response_id)
  stream = openStream(resume_token=cp.resume_token)
  for delta in stream:
     if delta.seq <= cp.last_seq: skip
     if dedupe(delta.seq, delta.text): skip
     append(delta)
     if should_checkpoint(): save(delta.seq, delta.resume_token)

别偷懒用“按时间戳恢复”,时钟漂移会害你。

重复片段去重:别只看文本相等

只用 delta_text 去重会误杀(例如“。”、“\n”重复很常见)。

更稳的 Key:

  • 主键:response_id + segment_seq
  • 兜底:response_id + delta_hash(text)

SQL 侧建议唯一约束:

create table if not exists response_deltas (
  response_id text not null,
  segment_seq bigint not null,
  delta_hash  text not null,
  delta_text  text not null,
  created_at  timestamptz not null default now(),
  primary key (response_id, segment_seq)
);

create unique index if not exists uq_response_hash
on response_deltas(response_id, delta_hash);

超时预算:三层要对齐

很多“断流”其实是预算互相打架:

  • API 请求超时:45s
  • 上游网关超时:30s
  • 客户端 read timeout:20s

这会导致客户端先死,网关后死,服务端还在跑,最后重复恢复。

保守配置建议:

  • 客户端 read timeout >= 90s
  • 连接 keepalive 开启
  • 网关 idle timeout >= 客户端 read timeout + 15s
  • 任务总预算单独控制,不和 socket 超时绑死

故障演练清单(上线前必须过)

  1. 中途断网 3 秒:验证可续写且不重复。
  2. 代理返回 502:验证自动重连 + checkpoint 生效。
  3. 两个 worker 同时恢复同一 response_id:验证唯一约束挡住重复。
  4. Redis 短暂不可用:验证降级到内存缓存并补偿写回。

可直接跑的网络抖动演练(macOS):

# 模拟 25% 丢包 + 200ms 延迟(测试后记得恢复)
sudo dnctl pipe 1 config delay 200ms plr 0.25
sudo pfctl -E
printf "dummynet out quick proto tcp from any to any pipe 1\n" | sudo pfctl -f -

# 恢复
sudo pfctl -d
sudo dnctl -q flush

监控指标:盯这 6 个就够用

  • stream_resume_success_rate
  • stream_duplicate_drop_count
  • checkpoint_lag_ms
  • delta_persist_p99_ms
  • resume_from_stale_checkpoint_count
  • same_response_multi_worker_conflict

阈值建议:

  • 恢复成功率 < 99% 直接告警
  • 重复丢弃率 > 3% 触发排查
  • checkpoint 延迟 p99 > 1500ms 触发容量扩展

总结

这套方案的核心不是“更快”,而是可解释地稳定

  • 有 checkpoint,断了能续;
  • 有唯一约束,重了能挡;
  • 有指标,坏了能定位。

先把恢复闭环跑稳,再追求极限时延。生产里,稳定就是吞吐的一部分。