In production, the painful part is not “streaming is slow.” It’s “streaming breaks and then duplicates output after reconnect.”
This guide gives you a practical recovery loop: delta persistence + resume token + idempotent dedup, so reconnection does not replay garbage.
Failure model first: what are you actually fixing?
For OpenAI Responses streaming pipelines, the common failures are:
- Transient network break: client socket dies, server job continues.
- Client timeout cancellation: app timeout is shorter than generation budget.
- Duplicate consumption: reconnect resumes from wrong offset and repeats text.
- Multi-worker race: same request_id processed by multiple workers.
If you don’t separate these modes, your “fixes” will be random.
Minimal stable architecture (conservative mode)
Use a three-layer state model:
- L1 in-memory buffer: low-latency append for active connections.
- L2 Redis Stream: append-only delta events keyed by response_id.
- L3 Postgres snapshot: final deduplicated text + metadata.
Required fields per delta event:
{
"response_id": "resp_xxx",
"segment_seq": 128,
"chunk_id": "ck_8f2...",
"delta_text": "...",
"delta_hash": "sha1(...)",
"created_at": "2026-03-23T01:13:09Z"
}
segment_seq + delta_hash is your anti-dup backbone. Hash the sequence number together with the text (as the Go Deduper below does), so identical fragments at different positions never collide.
Go template: consume, persist, checkpoint
This template does three things:
- persists every delta to Redis (replayable),
- deduplicates in-memory (cheap),
- supports checkpoint-based recovery.
package stream

import (
    "context"
    "crypto/sha1"
    "encoding/hex"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

// DeltaEvent is one streamed text fragment plus the metadata
// needed to replay and deduplicate it.
type DeltaEvent struct {
    ResponseID string
    SegmentSeq int64
    ChunkID    string
    Text       string
    At         time.Time
}

// CheckpointStore persists (last_seq, resume_token) so a reconnect
// can request only events after the last acknowledged sequence.
type CheckpointStore interface {
    Save(ctx context.Context, responseID string, seq int64, resumeToken string) error
    Load(ctx context.Context, responseID string) (seq int64, resumeToken string, err error)
}

// Deduper is an in-memory duplicate filter keyed by sha1(seq|text).
// It is not goroutine-safe; give each consumer its own instance and
// drop it when the response completes.
type Deduper struct {
    seen map[string]struct{}
}

func NewDeduper() *Deduper { return &Deduper{seen: make(map[string]struct{}, 4096)} }

// SeenOrSet reports whether this (seq, text) pair was already observed,
// recording it otherwise. Hashing seq together with text keeps tiny
// repeated fragments (".", "\n"), which legitimately recur, from colliding.
func (d *Deduper) SeenOrSet(seq int64, text string) bool {
    h := sha1.Sum([]byte(fmt.Sprintf("%d|%s", seq, text)))
    k := hex.EncodeToString(h[:])
    if _, ok := d.seen[k]; ok {
        return true
    }
    d.seen[k] = struct{}{}
    return false
}

// PersistDelta appends one delta to the per-response Redis Stream,
// giving you an ordered, replayable log keyed by response_id.
func PersistDelta(ctx context.Context, rdb *redis.Client, ev DeltaEvent) error {
    key := fmt.Sprintf("resp:%s:deltas", ev.ResponseID)
    return rdb.XAdd(ctx, &redis.XAddArgs{
        Stream: key,
        Values: map[string]any{
            "response_id": ev.ResponseID,
            "segment_seq": ev.SegmentSeq,
            "chunk_id":    ev.ChunkID,
            "delta_text":  ev.Text,
            "ts":          ev.At.UnixMilli(),
        },
    }).Err()
}
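If a worker dies before the Postgres snapshot lands, the Redis Stream is your replay source. A minimal rebuild sketch under that assumption; ReplayDeltas is illustrative, not part of the template, and additionally needs strings and strconv imported:

// ReplayDeltas rebuilds the full text for a response from its Redis
// Stream log, deduplicating as it goes.
func ReplayDeltas(ctx context.Context, rdb *redis.Client, responseID string) (string, error) {
    key := fmt.Sprintf("resp:%s:deltas", responseID)
    entries, err := rdb.XRange(ctx, key, "-", "+").Result() // full range, oldest first
    if err != nil {
        return "", err
    }
    d := NewDeduper()
    var b strings.Builder
    for _, e := range entries {
        // Stream field values come back from Redis as strings.
        seqStr, _ := e.Values["segment_seq"].(string)
        seq, _ := strconv.ParseInt(seqStr, 10, 64)
        text, _ := e.Values["delta_text"].(string)
        if d.SeenOrSet(seq, text) {
            continue // duplicate replay, drop it
        }
        b.WriteString(text)
    }
    return b.String(), nil
}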
Resume token usage without foot-guns
Recommended policy:
- checkpoint every N events (e.g. N=20) or every 1 second, whichever comes first (a small helper for this is sketched below).
- checkpoint fields: last_seq, resume_token, updated_at.
- on reconnect, request only events from last_seq + 1 onward.
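A minimal Go helper for that policy; the type name and defaults are illustrative:

// CheckpointPolicy says "checkpoint now" every EveryN events or every
// Interval, whichever comes first. A zero lastAt forces an early first
// checkpoint, which is what you want after a fresh connect.
type CheckpointPolicy struct {
    EveryN   int64
    Interval time.Duration
    count    int64
    lastAt   time.Time
}

func (p *CheckpointPolicy) Should(now time.Time) bool {
    p.count++
    if p.count >= p.EveryN || now.Sub(p.lastAt) >= p.Interval {
        p.count = 0
        p.lastAt = now
        return true
    }
    return false
}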
Pseudo logic:
on reconnect:
cp = loadCheckpoint(response_id)
stream = openStream(resume_token=cp.resume_token)
for delta in stream:
if delta.seq <= cp.last_seq: skip
if dedupe(delta.seq, delta.text): skip
append(delta)
if should_checkpoint(): save(delta.seq, delta.resume_token)
Do not recover by wall-clock time; clock skew will burn you.
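The same loop in Go, wired to the template above. ResumableDelta and openStream are assumed shapes for your transport, not an SDK API:

// ResumableDelta pairs a delta with the token needed to resume the
// stream just after it; adapt the wrapper to your transport.
type ResumableDelta struct {
    DeltaEvent
    ResumeToken string
}

// Resume reattaches to an in-flight response after a disconnect,
// skipping anything at or before the checkpointed sequence.
func Resume(
    ctx context.Context,
    cp CheckpointStore,
    responseID string,
    openStream func(resumeToken string) (<-chan ResumableDelta, error),
    appendText func(string),
) error {
    lastSeq, token, err := cp.Load(ctx, responseID)
    if err != nil {
        return err
    }
    events, err := openStream(token)
    if err != nil {
        return err
    }
    d := NewDeduper()
    pol := &CheckpointPolicy{EveryN: 20, Interval: time.Second}
    for ev := range events {
        if ev.SegmentSeq <= lastSeq {
            continue // already checkpointed before the disconnect
        }
        if d.SeenOrSet(ev.SegmentSeq, ev.Text) {
            continue // duplicate within this session
        }
        appendText(ev.Text)
        if pol.Should(time.Now()) {
            if err := cp.Save(ctx, responseID, ev.SegmentSeq, ev.ResumeToken); err != nil {
                return err
            }
        }
    }
    return nil
}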
Dedup strategy: text equality is not enough
If you dedup only by delta_text, you will misfire on tiny repeated fragments (".", "\n", etc.).
Safer keys:
- primary: response_id + segment_seq
- fallback: response_id + delta_hash, where the hash covers segment_seq and the text (as in the Deduper above); hashing the text alone reintroduces the repeated-fragment misfire.
Suggested SQL constraints:
create table if not exists response_deltas (
response_id text not null,
segment_seq bigint not null,
delta_hash text not null,
delta_text text not null,
created_at timestamptz not null default now(),
primary key (response_id, segment_seq)
);
create unique index if not exists uq_response_hash
on response_deltas(response_id, delta_hash);
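With those constraints in place, writers can insert idempotently and let Postgres drop replays, including the multi-worker race. A database/sql sketch (needs database/sql imported; db wiring omitted):

// InsertDelta is idempotent: a replayed delta hits the primary key or
// the unique hash index, and ON CONFLICT silently drops it. Using
// ON CONFLICT without a target covers both constraints.
func InsertDelta(ctx context.Context, db *sql.DB, ev DeltaEvent, deltaHash string) error {
    _, err := db.ExecContext(ctx, `
        insert into response_deltas (response_id, segment_seq, delta_hash, delta_text)
        values ($1, $2, $3, $4)
        on conflict do nothing`,
        ev.ResponseID, ev.SegmentSeq, deltaHash, ev.Text)
    return err
}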
Timeout budget alignment
Many “stream failures” are timeout mismatches:
- API timeout: 45s
- upstream gateway timeout: 30s
- client read timeout: 20s
Result: client dies first, server still runs, reconnect duplicates output.
Conservative baseline:
- client read timeout >= 90s
- keepalive on
- gateway idle timeout >= client timeout + 15s
- generation budget controlled separately from socket timeout
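In Go terms this means loose socket limits plus a separate context deadline for the generation budget. A sketch with the baseline values above (openWithBudget is illustrative; needs io and net/http imported):

// openWithBudget opens a streaming request with generous socket limits
// and a separate 90s generation budget. The caller must Close the
// response body and call cancel when done.
func openWithBudget(streamURL string, body io.Reader) (*http.Response, context.CancelFunc, error) {
    client := &http.Client{
        // Deliberately no client.Timeout: it would cap the whole stream.
        Transport: &http.Transport{
            ResponseHeaderTimeout: 10 * time.Second,  // time to first byte
            IdleConnTimeout:       120 * time.Second, // >= client read timeout + slack
        },
    }
    // Generation budget, controlled separately from socket timeouts.
    ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
    req, err := http.NewRequestWithContext(ctx, http.MethodPost, streamURL, body)
    if err != nil {
        cancel()
        return nil, nil, err
    }
    resp, err := client.Do(req)
    if err != nil {
        cancel()
        return nil, nil, err
    }
    return resp, cancel, nil
}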
Failure drills before production
- 3s network break: verify resume without duplicate output.
- forced 502 from proxy: verify reconnect + checkpoint correctness.
- two workers recover the same response_id: verify uniqueness constraints block duplicates.
- temporary Redis outage: verify memory fallback + later backfill.
Useful macOS network chaos drill:
# 25% packet loss + 200ms delay (remember to reset)
sudo dnctl pipe 1 config delay 200ms plr 0.25
sudo pfctl -E
printf "dummynet out quick proto tcp from any to any pipe 1\n" | sudo pfctl -f -
# reset
sudo pfctl -d
sudo dnctl -q flush
Metrics that actually matter
Track at least these six:
- stream_resume_success_rate
- stream_duplicate_drop_count
- checkpoint_lag_ms
- delta_persist_p99_ms
- resume_from_stale_checkpoint_count
- same_response_multi_worker_conflict
Threshold examples:
- resume success < 99% => page the team
- duplicate drop rate > 3% => investigate replay path
- checkpoint lag p99 > 1500ms => scale persistence path
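A registration sketch using prometheus/client_golang (rates and p99s are derived at query time, so the raw names differ slightly from the list above; the two remaining metrics are plain counters registered the same way; needs the prometheus and promauto packages imported):

var (
    // Resume success rate = success / total, computed in your query layer.
    resumeAttempts = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "stream_resume_attempts_total",
        Help: "Resume attempts by outcome.",
    }, []string{"outcome"}) // "success" | "failure"

    duplicateDrops = promauto.NewCounter(prometheus.CounterOpts{
        Name: "stream_duplicate_drop_count",
        Help: "Deltas dropped by dedup during replay or resume.",
    })

    checkpointLag = promauto.NewHistogram(prometheus.HistogramOpts{
        Name:    "checkpoint_lag_ms",
        Help:    "Delay between a delta arriving and its checkpoint being saved.",
        Buckets: prometheus.ExponentialBuckets(10, 2, 10), // 10ms .. ~5s
    })

    persistLatency = promauto.NewHistogram(prometheus.HistogramOpts{
        Name:    "delta_persist_ms",
        Help:    "PersistDelta latency; alert on the p99.",
        Buckets: prometheus.ExponentialBuckets(1, 2, 12),
    })
)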
Summary
This design optimizes for explainable stability:
- checkpoint makes reconnection deterministic,
- uniqueness constraints block duplicate replay,
- metrics make failures diagnosable.
Get recovery correctness first. Then chase lower latency.