生产里最难受的不是“流式返回慢”,而是“流式返回断了还重复”,用户看到半句、重连后又从中间重喷一遍。
这篇给一套可落地的恢复闭环:delta 持久化 + resume token + 幂等去重,目标是“断线可续,重放不重字”。
先定失败模型:你到底在修什么
OpenAI Responses 流式链路里,常见失败有 4 类:
- 网络抖动断流:客户端 TCP/代理短断,服务端任务还在跑。
- 客户端超时主动取消:上层超时预算太紧,生成没完就切了。
- 重复消费:重连后从错误偏移继续,导致内容重复拼接。
- 多副本竞争写入:同一个 request_id 被多个 worker 同时处理。
如果不把这 4 类区分开,后面全是玄学。
架构最小闭环(保守但稳)
建议先用这套三层状态:
- L1 内存缓冲(进程内):当前连接的低延迟拼接。
- L2 Redis Stream(会话级):按 response_id 追加 delta 事件。
- L3 Postgres(最终快照):落地去重后的最终文本和元数据。
关键字段(每个 delta 事件都要有):
{
"response_id": "resp_xxx",
"segment_seq": 128,
"chunk_id": "ck_8f2...",
"delta_text": "...",
"delta_hash": "sha1(...)",
"created_at": "2026-03-23T01:13:09Z"
}
其中 segment_seq + delta_hash 是去重命门。
Go 侧:流式消费与持久化模板
下面这个模板做三件事:
- 每条 delta 立即写 Redis(可重放)
- 同时做内存去重(低成本)
- 定时 checkpoint(可恢复)
package stream
import (
"context"
"crypto/sha1"
"encoding/hex"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
type DeltaEvent struct {
ResponseID string
SegmentSeq int64
ChunkID string
Text string
At time.Time
}
type CheckpointStore interface {
Save(ctx context.Context, responseID string, seq int64, resumeToken string) error
Load(ctx context.Context, responseID string) (seq int64, resumeToken string, err error)
}
type Deduper struct {
seen map[string]struct{}
}
func NewDeduper() *Deduper { return &Deduper{seen: make(map[string]struct{}, 4096)} }
func (d *Deduper) SeenOrSet(seq int64, text string) bool {
h := sha1.Sum([]byte(fmt.Sprintf("%d|%s", seq, text)))
k := hex.EncodeToString(h[:])
if _, ok := d.seen[k]; ok {
return true
}
d.seen[k] = struct{}{}
return false
}
func PersistDelta(ctx context.Context, rdb *redis.Client, ev DeltaEvent) error {
key := fmt.Sprintf("resp:%s:deltas", ev.ResponseID)
return rdb.XAdd(ctx, &redis.XAddArgs{
Stream: key,
Values: map[string]any{
"response_id": ev.ResponseID,
"segment_seq": ev.SegmentSeq,
"chunk_id": ev.ChunkID,
"delta_text": ev.Text,
"ts": ev.At.UnixMilli(),
},
}).Err()
}
resume token 怎么用才不翻车
建议策略:
- 每收到 N 条(建议 20)或每 1 秒写一次 checkpoint。
- checkpoint 保存:
last_seq、resume_token、updated_at。 - 重连时先读 checkpoint,只请求
last_seq+1后的数据。
伪代码:
on reconnect:
cp = loadCheckpoint(response_id)
stream = openStream(resume_token=cp.resume_token)
for delta in stream:
if delta.seq <= cp.last_seq: skip
if dedupe(delta.seq, delta.text): skip
append(delta)
if should_checkpoint(): save(delta.seq, delta.resume_token)
别偷懒用“按时间戳恢复”,时钟漂移会害你。
重复片段去重:别只看文本相等
只用 delta_text 去重会误杀(例如“。”、“\n”重复很常见)。
更稳的 Key:
- 主键:
response_id + segment_seq - 兜底:
response_id + delta_hash(text)
SQL 侧建议唯一约束:
create table if not exists response_deltas (
response_id text not null,
segment_seq bigint not null,
delta_hash text not null,
delta_text text not null,
created_at timestamptz not null default now(),
primary key (response_id, segment_seq)
);
create unique index if not exists uq_response_hash
on response_deltas(response_id, delta_hash);
超时预算:三层要对齐
很多“断流”其实是预算互相打架:
- API 请求超时:
45s - 上游网关超时:
30s - 客户端 read timeout:
20s
这会导致客户端先死,网关后死,服务端还在跑,最后重复恢复。
保守配置建议:
- 客户端 read timeout >= 90s
- 连接 keepalive 开启
- 网关 idle timeout >= 客户端 read timeout + 15s
- 任务总预算单独控制,不和 socket 超时绑死
故障演练清单(上线前必须过)
- 中途断网 3 秒:验证可续写且不重复。
- 代理返回 502:验证自动重连 + checkpoint 生效。
- 两个 worker 同时恢复同一 response_id:验证唯一约束挡住重复。
- Redis 短暂不可用:验证降级到内存缓存并补偿写回。
可直接跑的网络抖动演练(macOS):
# 模拟 25% 丢包 + 200ms 延迟(测试后记得恢复)
sudo dnctl pipe 1 config delay 200ms plr 0.25
sudo pfctl -E
printf "dummynet out quick proto tcp from any to any pipe 1\n" | sudo pfctl -f -
# 恢复
sudo pfctl -d
sudo dnctl -q flush
监控指标:盯这 6 个就够用
stream_resume_success_ratestream_duplicate_drop_countcheckpoint_lag_msdelta_persist_p99_msresume_from_stale_checkpoint_countsame_response_multi_worker_conflict
阈值建议:
- 恢复成功率 < 99% 直接告警
- 重复丢弃率 > 3% 触发排查
- checkpoint 延迟 p99 > 1500ms 触发容量扩展
总结
这套方案的核心不是“更快”,而是可解释地稳定:
- 有 checkpoint,断了能续;
- 有唯一约束,重了能挡;
- 有指标,坏了能定位。
先把恢复闭环跑稳,再追求极限时延。生产里,稳定就是吞吐的一部分。