跳到主要内容

SSE(Server-Sent Events)

1. SSE 总体介绍

SSE 是一种基于 HTTP 的单向实时推送技术,服务端可以持续向客户端发送事件流,客户端通过 EventSourcefetch 读取流式响应来接收数据。

核心功能

  1. 单向实时推送:服务端持续推送,客户端被动接收。
  2. 自动重连:断线后浏览器会自动重连,或由前端自定义重连策略。
  3. 轻量协议:基于纯文本流 text/event-stream,部署和排障成本低。
  4. 简化跨网关:相比 WebSocket,通常更容易穿透代理和负载均衡。

关键知识点

  1. 响应头:Content-Type: text/event-stream; charset=utf-8
  2. 事件格式:每个事件由多行组成,以空行结尾。
  3. 事件字段:eventdataidretry
  4. 心跳保活:用注释行 : 或空事件保持连接。
  5. 重连续传:使用 Last-Event-ID 或自定义 token 缓存策略。

常用场景

  1. AI 对话流式输出。
  2. 实时日志与监控告警。
  3. 任务进度通知与状态更新。
  4. 股票/行情推送。
  5. 后台运营事件流、审核进度等。

2. 原生 EventSource 前端接入

说明:EventSource 原生支持自动重连,但不支持自定义请求头。鉴权常用 Cookie 或 URL token。

import { useEffect, useRef, useState } from "react";

type SseState = {
buffer: string;
lastEventId: string;
retryCount: number;
maxRetry: number;
heartbeatTimeoutMs: number;
};

export function SseEventSourceDemo() {
const [text, setText] = useState("");
const [isStopped, setIsStopped] = useState(false);
const esRef = useRef<EventSource | null>(null);
const heartbeatTimerRef = useRef<number | null>(null);
const stopRef = useRef(false);

const stateRef = useRef<SseState>({
buffer: "",
lastEventId: localStorage.getItem("sse:lastEventId") || "",
retryCount: 0,
maxRetry: 5,
heartbeatTimeoutMs: 15000,
});

function typeAppend(chunk: string) {
let i = 0;
const timer = window.setInterval(() => {
setText((prev) => prev + chunk[i]);
i += 1;
if (i >= chunk.length) window.clearInterval(timer);
}, 20);
}

function resetHeartbeat() {
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
heartbeatTimerRef.current = window.setTimeout(() => {
if (stopRef.current) return;
safeClose();
reconnect();
}, stateRef.current.heartbeatTimeoutMs);
}

function safeClose() {
if (esRef.current) {
esRef.current.close();
esRef.current = null;
}
}

function connect() {
const url = new URL("/api/chat/stream", window.location.origin);
if (stateRef.current.lastEventId) {
url.searchParams.set("lastEventId", stateRef.current.lastEventId);
}
const es = new EventSource(url.toString(), { withCredentials: true });
esRef.current = es;

es.onopen = () => {
stateRef.current.retryCount = 0;
resetHeartbeat();
};

es.onmessage = (e) => {
if (stopRef.current) return;
stateRef.current.buffer += e.data;
localStorage.setItem("sse:buffer", stateRef.current.buffer);
if (e.lastEventId) {
stateRef.current.lastEventId = e.lastEventId;
localStorage.setItem("sse:lastEventId", stateRef.current.lastEventId);
}
typeAppend(e.data);
resetHeartbeat();
};

es.addEventListener("heartbeat", () => {
if (stopRef.current) return;
resetHeartbeat();
});

es.onerror = () => {
if (stopRef.current) return;
safeClose();
reconnect();
};
}

function reconnect() {
if (stopRef.current) return;
const { retryCount, maxRetry } = stateRef.current;
if (retryCount >= maxRetry) return;
const backoff = Math.min(1000 * Math.pow(2, retryCount), 8000);
stateRef.current.retryCount += 1;
window.setTimeout(connect, backoff);
}

useEffect(() => {
connect();
return () => {
stopRef.current = true;
safeClose();
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
};
}, []);

function handleStop() {
stopRef.current = true;
setIsStopped(true);
safeClose();
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
}

return (
<div>
<button type="button" onClick={handleStop} disabled={isStopped}>
Stop
</button>
<pre>{text}</pre>
</div>
);
}

EventSource 重点:

  1. 断线重连:浏览器会自动重连,但仍建议加自定义重试策略。
  2. 心跳:服务端定期发送 event: heartbeat: ping
  3. token 缓存:保存 lastEventId 和已拼接文本。

3. fetch stream 前端接入

说明:fetch 可以读取 ReadableStream,可自定义请求头,但需要自己解析 SSE 格式。

import { useEffect, useRef, useState } from "react";

type SseEvent = {
event: string;
data: string;
id: string;
};

type FetchState = {
buffer: string;
lastEventId: string;
retryCount: number;
maxRetry: number;
heartbeatTimeoutMs: number;
};

export function SseFetchStreamDemo() {
const [text, setText] = useState("");
const [isStopped, setIsStopped] = useState(false);
const heartbeatTimerRef = useRef<number | null>(null);
const abortRef = useRef<AbortController | null>(null);
const stopRef = useRef(false);

const stateRef = useRef<FetchState>({
buffer: localStorage.getItem("sse:buffer") || "",
lastEventId: localStorage.getItem("sse:lastEventId") || "",
retryCount: 0,
maxRetry: 5,
heartbeatTimeoutMs: 15000,
});

function typeAppend(chunk: string) {
let i = 0;
const timer = window.setInterval(() => {
setText((prev) => prev + chunk[i]);
i += 1;
if (i >= chunk.length) window.clearInterval(timer);
}, 20);
}

function resetHeartbeat() {
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
heartbeatTimerRef.current = window.setTimeout(() => {
if (stopRef.current) return;
if (abortRef.current) abortRef.current.abort();
reconnect();
}, stateRef.current.heartbeatTimeoutMs);
}

function parseSSE(chunk: string): { events: SseEvent[]; tail: string } {
const events: SseEvent[] = [];
const parts = chunk.split("\n\n");
const tail = parts.pop() || "";

for (const part of parts) {
const lines = part.split("\n");
const evt: SseEvent = { event: "message", data: "", id: "" };
for (const line of lines) {
if (line.startsWith("event:")) evt.event = line.slice(6).trim();
if (line.startsWith("data:")) evt.data += line.slice(5).trim();
if (line.startsWith("id:")) evt.id = line.slice(3).trim();
if (line.startsWith(":")) {
evt.event = "heartbeat";
evt.data = "";
}
}
events.push(evt);
}
return { events, tail };
}

async function connect() {
if (stopRef.current) return;
const url = new URL("/api/chat/stream", window.location.origin);
if (stateRef.current.lastEventId) {
url.searchParams.set("lastEventId", stateRef.current.lastEventId);
}

const controller = new AbortController();
abortRef.current = controller;

const res = await fetch(url.toString(), {
method: "GET",
headers: {
Authorization: "Bearer YOUR_TOKEN",
},
signal: controller.signal,
});

if (!res.ok || !res.body) throw new Error("SSE stream failed");

const reader = res.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";

resetHeartbeat();
stateRef.current.retryCount = 0;

while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });

const parsed = parseSSE(buffer);
buffer = parsed.tail;

for (const evt of parsed.events) {
if (stopRef.current) return;
if (evt.event === "heartbeat") {
resetHeartbeat();
continue;
}
if (evt.id) {
stateRef.current.lastEventId = evt.id;
localStorage.setItem("sse:lastEventId", stateRef.current.lastEventId);
}
stateRef.current.buffer += evt.data;
localStorage.setItem("sse:buffer", stateRef.current.buffer);
typeAppend(evt.data);
resetHeartbeat();
}
}
}

function reconnect() {
if (stopRef.current) return;
const { retryCount, maxRetry } = stateRef.current;
if (retryCount >= maxRetry) return;
const backoff = Math.min(1000 * Math.pow(2, retryCount), 8000);
stateRef.current.retryCount += 1;
window.setTimeout(() => {
connect().catch(() => reconnect());
}, backoff);
}

useEffect(() => {
connect().catch(() => reconnect());
return () => {
stopRef.current = true;
if (abortRef.current) abortRef.current.abort();
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
};
}, []);

function handleStop() {
stopRef.current = true;
setIsStopped(true);
if (abortRef.current) abortRef.current.abort();
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
}

return (
<div>
<button type="button" onClick={handleStop} disabled={isStopped}>
Stop
</button>
<pre>{text}</pre>
</div>
);
}

fetch stream 重点:

  1. 支持自定义 Header(适合 token 鉴权)。
  2. 需要手动解析 SSE 协议。
  3. 断线重连与心跳超时必须自己处理。

4. Node 后端完整实现

下面示例基于 Express,包含:

  1. Last-Event-ID / lastEventId 查询参数续传。
  2. 心跳保活。
  3. 简单的 token 缓存与断线续传逻辑(内存示例,生产建议换持久化存储)。
import express from "express";
import type { Request, Response } from "express";

type TokenEvent = {
id: string;
data: string;
};

const app = express();

// 简单内存缓存:按会话缓存已发送 token
const conversationCache = new Map<string, TokenEvent[]>();

function writeEvent(res: Response, evt: TokenEvent, eventName = "message") {
res.write(`id: ${evt.id}\n`);
res.write(`event: ${eventName}\n`);
res.write(`data: ${evt.data}\n\n`);
}

function writeHeartbeat(res: Response) {
res.write(`event: heartbeat\ndata: ok\n\n`);
}

app.get("/api/chat/stream", async (req: Request, res: Response) => {
res.set({
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
});
res.flushHeaders();

const conversationId = String(req.query.cid || "default");
const lastEventId =
String(req.headers["last-event-id"] || "") ||
String(req.query.lastEventId || "");

// 断线续传:补发 lastEventId 之后的事件
const cached = conversationCache.get(conversationId) || [];
if (lastEventId) {
const lastIndex = cached.findIndex((e) => e.id === lastEventId);
const replay = lastIndex >= 0 ? cached.slice(lastIndex + 1) : cached;
for (const evt of replay) writeEvent(res, evt);
}

// 心跳定时器
const heartbeatTimer = setInterval(() => {
writeHeartbeat(res);
}, 10000);

// 模拟 AI token 流
let counter = cached.length;
const tokens = ["你好,", "这是 ", "SSE ", "流式 ", "输出。"];

const streamTimer = setInterval(() => {
if (counter >= tokens.length) {
clearInterval(streamTimer);
res.write(`event: done\ndata: ok\n\n`);
res.end();
return;
}
const evt = { id: String(counter + 1), data: tokens[counter] };
const list = conversationCache.get(conversationId) || [];
list.push(evt);
conversationCache.set(conversationId, list);
writeEvent(res, evt);
counter += 1;
}, 700);

req.on("close", () => {
clearInterval(streamTimer);
clearInterval(heartbeatTimer);
});
});

app.listen(3000);

Nginx 配置示例(关闭缓冲)

location /api/chat/stream {
proxy_pass http://127.0.0.1:3000;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
chunked_transfer_encoding off;
}

后端要点:

  1. 必须设置 text/event-stream 并关闭缓冲。
  2. Last-Event-ID 续传可以用 header 或 query。
  3. 心跳避免中间设备断连。
  4. 生产环境建议持久化 token(如 Redis)。

5. 用户主动中断会话的处理

前端:

  1. 用户点击“停止生成”时,立即 close()(EventSource)或 abort()(fetch)。
  2. 设置 stopRef,避免触发自动重连与重试。
  3. 保留已接收内容与 lastEventId,方便用户“继续生成”或重连。

后端:

  1. req.on("close") 会触发,及时清理定时器与流式资源。
  2. 对于 AI 流式接口,建议在 close 时中断上游流,避免资源浪费。

6. SSE 与 WebSocket 对比

维度SSEWebSocket
方向与能力单向(服务端 -> 客户端)双向(客户端 <-> 服务端)
协议与部署基于 HTTP,穿透代理更容易需要协议升级,网关配置更复杂
重连与可靠性浏览器原生自动重连,支持 Last-Event-ID需自行实现重连与状态恢复
浏览器支持支持良好,但不支持自定义请求头支持广泛,可自定义鉴权策略
适用场景AI 流式输出、日志流、进度通知聊天、协作编辑、游戏控制