In production, the painful part is not “streaming is slow.” It’s “streaming breaks and then duplicates output after reconnect.”
This guide gives you a practical recovery loop: delta persistence + resume token + idempotent dedup, so reconnection does not replay garbage.

Failure model first: what are you actually fixing?

For OpenAI Responses streaming pipelines, the common failures are:

  1. Transient network break: client socket dies, server job continues.
  2. Client timeout cancellation: app timeout is shorter than generation budget.
  3. Duplicate consumption: reconnect resumes from wrong offset and repeats text.
  4. Multi-worker race: same request_id processed by multiple workers.

If you don’t separate these modes, your “fixes” will be random.

Minimal stable architecture (conservative mode)

Use a three-layer state model:

  • L1 in-memory buffer: low-latency append for active connections.
  • L2 Redis Stream: append-only delta events by response_id.
  • L3 Postgres snapshot: final deduplicated text + metadata.

Required fields per delta event:

{
  "response_id": "resp_xxx",
  "segment_seq": 128,
  "chunk_id": "ck_8f2...",
  "delta_text": "...",
  "delta_hash": "sha1(...)",
  "created_at": "2026-03-23T01:13:09Z"
}

segment_seq + delta_hash is your anti-dup backbone.

Go template: consume, persist, checkpoint

This template does three things:

  • persists every delta to Redis (replayable),
  • deduplicates in-memory (cheap),
  • supports checkpoint-based recovery.

package stream

import (
	"context"
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

type DeltaEvent struct {
	ResponseID string
	SegmentSeq int64
	ChunkID    string
	Text       string
	At         time.Time
}

// CheckpointStore persists (last_seq, resume_token) so a reconnect can
// ask for "everything after last_seq" instead of replaying the whole stream.
type CheckpointStore interface {
	Save(ctx context.Context, responseID string, seq int64, resumeToken string) error
	Load(ctx context.Context, responseID string) (seq int64, resumeToken string, err error)
}

// Deduper drops deltas already seen on this connection. It is not safe
// for concurrent use; keep one Deduper per stream goroutine or guard it
// with a mutex.
type Deduper struct {
	seen map[string]struct{}
}

func NewDeduper() *Deduper { return &Deduper{seen: make(map[string]struct{}, 4096)} }

// SeenOrSet reports whether (seq, text) was already observed and records
// it if not. Keying on seq+text, not text alone, is what prevents false
// positives on tiny repeated fragments.
func (d *Deduper) SeenOrSet(seq int64, text string) bool {
	h := sha1.Sum([]byte(fmt.Sprintf("%d|%s", seq, text)))
	k := hex.EncodeToString(h[:])
	if _, ok := d.seen[k]; ok {
		return true
	}
	d.seen[k] = struct{}{}
	return false
}

// PersistDelta appends one delta to the per-response Redis Stream (L2).
// XADD is append-only, so the stream stays replayable from any offset.
func PersistDelta(ctx context.Context, rdb *redis.Client, ev DeltaEvent) error {
	key := fmt.Sprintf("resp:%s:deltas", ev.ResponseID)
	return rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: key,
		Values: map[string]any{
			"response_id": ev.ResponseID,
			"segment_seq": ev.SegmentSeq,
			"chunk_id":    ev.ChunkID,
			"delta_text":  ev.Text,
			"ts":          ev.At.UnixMilli(),
		},
	}).Err()
}

Resume token usage without foot-guns

Recommended policy:

  1. checkpoint every N events (e.g., N=20) or at least once per second, whichever comes first.
  2. checkpoint fields: last_seq, resume_token, updated_at.
  3. on reconnect, request only events after last_seq + 1.

Pseudo logic:

on reconnect:
  cp = loadCheckpoint(response_id)
  stream = openStream(resume_token=cp.resume_token)
  for delta in stream:
     if delta.seq <= cp.last_seq: skip
     if dedupe(delta.seq, delta.text): skip
     append(delta)
     if should_checkpoint(): save(delta.seq, delta.resume_token)

Do not recover by wall-clock time; clock skew will burn you.

Dedup strategy: text equality is not enough

If you dedup only by delta_text, you will wrongly drop legitimate repeats of tiny fragments (".", "\n", etc.) that occur many times in normal output.

Safer keys:

  • primary: response_id + segment_seq
  • fallback: response_id + delta_hash(text)

Suggested SQL constraints:

create table if not exists response_deltas (
  response_id text not null,
  segment_seq bigint not null,
  delta_hash  text not null,
  delta_text  text not null,
  created_at  timestamptz not null default now(),
  primary key (response_id, segment_seq)
);

create unique index if not exists uq_response_hash
on response_deltas(response_id, delta_hash);

Timeout budget alignment

Many “stream failures” are timeout mismatches:

  • API timeout: 45s
  • upstream gateway timeout: 30s
  • client read timeout: 20s

Result: client dies first, server still runs, reconnect duplicates output.

Conservative baseline:

  • client read timeout >= 90s
  • keepalive on
  • gateway idle timeout >= client timeout + 15s
  • generation budget controlled separately from socket timeout

Failure drills before production

  1. 3s network break: verify resume without duplicate output.
  2. forced 502 from proxy: verify reconnect + checkpoint correctness.
  3. two workers recover same response_id: verify uniqueness constraints block duplicates.
  4. temporary Redis outage: verify memory fallback + later backfill.

Useful macOS network chaos drill:

# 25% packet loss + 200ms delay (remember to reset)
sudo dnctl pipe 1 config delay 200ms plr 0.25
sudo pfctl -E
printf "dummynet out quick proto tcp from any to any pipe 1\n" | sudo pfctl -f -

# reset
sudo pfctl -d
sudo dnctl -q flush

Metrics that actually matter

Track at least these six:

  • stream_resume_success_rate
  • stream_duplicate_drop_count
  • checkpoint_lag_ms
  • delta_persist_p99_ms
  • resume_from_stale_checkpoint_count
  • same_response_multi_worker_conflict

Threshold examples:

  • resume success < 99% => page the team
  • duplicate drop rate > 3% => investigate replay path
  • checkpoint lag p99 > 1500ms => scale persistence path

Summary

This design optimizes for explainable stability:

  • checkpoint makes reconnection deterministic,
  • uniqueness constraints block duplicate replay,
  • metrics make failures diagnosable.

Get recovery correctness first. Then chase lower latency.