SSE(Server-Sent Events)
1. SSE 总体介绍
SSE 是一种基于 HTTP 的单向实时推送技术,服务端可以持续向客户端发送事件流,客户端通过 EventSource 或 fetch 读取流式响应来接收数据。
核心功能
- 单向实时推送:服务端持续推送,客户端被动接收。
- 自动重连:断线后浏览器会自动重连,或由前端自定义重连策略。
- 轻量协议:基于纯文本流
text/event-stream,部署和排障成本低。 - 简化跨网关:相比 WebSocket,通常更容易穿透代理和负载均衡。
关键知识点
- 响应头:
Content-Type: text/event-stream; charset=utf-8。 - 事件格式: 每个事件由多行组成,以空行结尾。
- 事件字段:
event、data、id、retry。 - 心跳保活:用注释行
:或空事件保持连接。 - 重连续传:使用
Last-Event-ID或自定义 token 缓存策略。
常用场景
- AI 对话流式输出。
- 实时日志与监控告警。
- 任务进度通知与状态更新。
- 股票/行情推送。
- 后台运营事件流、审核进度等。
2. 原生 EventSource 前端接入
说明:EventSource 原生支持自动重连,但不支持自定义请求头。鉴权常用 Cookie 或 URL token。
import { useEffect, useRef, useState } from "react";
type SseState = {
buffer: string;
lastEventId: string;
retryCount: number;
maxRetry: number;
heartbeatTimeoutMs: number;
};
export function SseEventSourceDemo() {
const [text, setText] = useState("");
const [isStopped, setIsStopped] = useState(false);
const esRef = useRef<EventSource | null>(null);
const heartbeatTimerRef = useRef<number | null>(null);
const stopRef = useRef(false);
const stateRef = useRef<SseState>({
buffer: "",
lastEventId: localStorage.getItem("sse:lastEventId") || "",
retryCount: 0,
maxRetry: 5,
heartbeatTimeoutMs: 15000,
});
function typeAppend(chunk: string) {
let i = 0;
const timer = window.setInterval(() => {
setText((prev) => prev + chunk[i]);
i += 1;
if (i >= chunk.length) window.clearInterval(timer);
}, 20);
}
function resetHeartbeat() {
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
heartbeatTimerRef.current = window.setTimeout(() => {
if (stopRef.current) return;
safeClose();
reconnect();
}, stateRef.current.heartbeatTimeoutMs);
}
function safeClose() {
if (esRef.current) {
esRef.current.close();
esRef.current = null;
}
}
function connect() {
const url = new URL("/api/chat/stream", window.location.origin);
if (stateRef.current.lastEventId) {
url.searchParams.set("lastEventId", stateRef.current.lastEventId);
}
const es = new EventSource(url.toString(), { withCredentials: true });
esRef.current = es;
es.onopen = () => {
stateRef.current.retryCount = 0;
resetHeartbeat();
};
es.onmessage = (e) => {
if (stopRef.current) return;
stateRef.current.buffer += e.data;
localStorage.setItem("sse:buffer", stateRef.current.buffer);
if (e.lastEventId) {
stateRef.current.lastEventId = e.lastEventId;
localStorage.setItem("sse:lastEventId", stateRef.current.lastEventId);
}
typeAppend(e.data);
resetHeartbeat();
};
es.addEventListener("heartbeat", () => {
if (stopRef.current) return;
resetHeartbeat();
});
es.onerror = () => {
if (stopRef.current) return;
safeClose();
reconnect();
};
}
function reconnect() {
if (stopRef.current) return;
const { retryCount, maxRetry } = stateRef.current;
if (retryCount >= maxRetry) return;
const backoff = Math.min(1000 * Math.pow(2, retryCount), 8000);
stateRef.current.retryCount += 1;
window.setTimeout(connect, backoff);
}
useEffect(() => {
connect();
return () => {
stopRef.current = true;
safeClose();
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
};
}, []);
function handleStop() {
stopRef.current = true;
setIsStopped(true);
safeClose();
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
}
return (
<div>
<button type="button" onClick={handleStop} disabled={isStopped}>
Stop
</button>
<pre>{text}</pre>
</div>
);
}
EventSource 重点:
- 断线重连:浏览器会自动重连,但仍建议加自定义重试策略。
- 心跳:服务端定期发送
event: heartbeat或: ping。 - token 缓存:保存
lastEventId和已拼接文本。
3. fetch stream 前端接入
说明:fetch 可以读取 ReadableStream,可自定义请求头,但需要自己解析 SSE 格式。
import { useEffect, useRef, useState } from "react";
type SseEvent = {
event: string;
data: string;
id: string;
};
type FetchState = {
buffer: string;
lastEventId: string;
retryCount: number;
maxRetry: number;
heartbeatTimeoutMs: number;
};
export function SseFetchStreamDemo() {
const [text, setText] = useState("");
const [isStopped, setIsStopped] = useState(false);
const heartbeatTimerRef = useRef<number | null>(null);
const abortRef = useRef<AbortController | null>(null);
const stopRef = useRef(false);
const stateRef = useRef<FetchState>({
buffer: localStorage.getItem("sse:buffer") || "",
lastEventId: localStorage.getItem("sse:lastEventId") || "",
retryCount: 0,
maxRetry: 5,
heartbeatTimeoutMs: 15000,
});
function typeAppend(chunk: string) {
let i = 0;
const timer = window.setInterval(() => {
setText((prev) => prev + chunk[i]);
i += 1;
if (i >= chunk.length) window.clearInterval(timer);
}, 20);
}
function resetHeartbeat() {
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
heartbeatTimerRef.current = window.setTimeout(() => {
if (stopRef.current) return;
if (abortRef.current) abortRef.current.abort();
reconnect();
}, stateRef.current.heartbeatTimeoutMs);
}
function parseSSE(chunk: string): { events: SseEvent[]; tail: string } {
const events: SseEvent[] = [];
const parts = chunk.split("\n\n");
const tail = parts.pop() || "";
for (const part of parts) {
const lines = part.split("\n");
const evt: SseEvent = { event: "message", data: "", id: "" };
for (const line of lines) {
if (line.startsWith("event:")) evt.event = line.slice(6).trim();
if (line.startsWith("data:")) evt.data += line.slice(5).trim();
if (line.startsWith("id:")) evt.id = line.slice(3).trim();
if (line.startsWith(":")) {
evt.event = "heartbeat";
evt.data = "";
}
}
events.push(evt);
}
return { events, tail };
}
async function connect() {
if (stopRef.current) return;
const url = new URL("/api/chat/stream", window.location.origin);
if (stateRef.current.lastEventId) {
url.searchParams.set("lastEventId", stateRef.current.lastEventId);
}
const controller = new AbortController();
abortRef.current = controller;
const res = await fetch(url.toString(), {
method: "GET",
headers: {
Authorization: "Bearer YOUR_TOKEN",
},
signal: controller.signal,
});
if (!res.ok || !res.body) throw new Error("SSE stream failed");
const reader = res.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";
resetHeartbeat();
stateRef.current.retryCount = 0;
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const parsed = parseSSE(buffer);
buffer = parsed.tail;
for (const evt of parsed.events) {
if (stopRef.current) return;
if (evt.event === "heartbeat") {
resetHeartbeat();
continue;
}
if (evt.id) {
stateRef.current.lastEventId = evt.id;
localStorage.setItem("sse:lastEventId", stateRef.current.lastEventId);
}
stateRef.current.buffer += evt.data;
localStorage.setItem("sse:buffer", stateRef.current.buffer);
typeAppend(evt.data);
resetHeartbeat();
}
}
}
function reconnect() {
if (stopRef.current) return;
const { retryCount, maxRetry } = stateRef.current;
if (retryCount >= maxRetry) return;
const backoff = Math.min(1000 * Math.pow(2, retryCount), 8000);
stateRef.current.retryCount += 1;
window.setTimeout(() => {
connect().catch(() => reconnect());
}, backoff);
}
useEffect(() => {
connect().catch(() => reconnect());
return () => {
stopRef.current = true;
if (abortRef.current) abortRef.current.abort();
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
};
}, []);
function handleStop() {
stopRef.current = true;
setIsStopped(true);
if (abortRef.current) abortRef.current.abort();
if (heartbeatTimerRef.current) {
window.clearTimeout(heartbeatTimerRef.current);
}
}
return (
<div>
<button type="button" onClick={handleStop} disabled={isStopped}>
Stop
</button>
<pre>{text}</pre>
</div>
);
}
fetch stream 重点:
- 支持自定义 Header(适合 token 鉴权)。
- 需要手动解析 SSE 协议。
- 断线重连与心跳超时必须自己处理。
4. Node 后端完整实现
下面示例基于 Express,包含:
Last-Event-ID/lastEventId查询参数续传。- 心跳保活。
- 简单的 token 缓存与断线续传逻辑(内存示例,生产建议换持久化存储)。
import express from "express";
import type { Request, Response } from "express";
type TokenEvent = {
id: string;
data: string;
};
const app = express();
// 简单内存缓存:按会话缓存已发送 token
const conversationCache = new Map<string, TokenEvent[]>();
function writeEvent(res: Response, evt: TokenEvent, eventName = "message") {
res.write(`id: ${evt.id}\n`);
res.write(`event: ${eventName}\n`);
res.write(`data: ${evt.data}\n\n`);
}
function writeHeartbeat(res: Response) {
res.write(`event: heartbeat\ndata: ok\n\n`);
}
app.get("/api/chat/stream", async (req: Request, res: Response) => {
res.set({
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
});
res.flushHeaders();
const conversationId = String(req.query.cid || "default");
const lastEventId =
String(req.headers["last-event-id"] || "") ||
String(req.query.lastEventId || "");
// 断线续传:补发 lastEventId 之后的事件
const cached = conversationCache.get(conversationId) || [];
if (lastEventId) {
const lastIndex = cached.findIndex((e) => e.id === lastEventId);
const replay = lastIndex >= 0 ? cached.slice(lastIndex + 1) : cached;
for (const evt of replay) writeEvent(res, evt);
}
// 心跳定时器
const heartbeatTimer = setInterval(() => {
writeHeartbeat(res);
}, 10000);
// 模拟 AI token 流
let counter = cached.length;
const tokens = ["你好,", "这是 ", "SSE ", "流式 ", "输出。"];
const streamTimer = setInterval(() => {
if (counter >= tokens.length) {
clearInterval(streamTimer);
res.write(`event: done\ndata: ok\n\n`);
res.end();
return;
}
const evt = { id: String(counter + 1), data: tokens[counter] };
const list = conversationCache.get(conversationId) || [];
list.push(evt);
conversationCache.set(conversationId, list);
writeEvent(res, evt);
counter += 1;
}, 700);
req.on("close", () => {
clearInterval(streamTimer);
clearInterval(heartbeatTimer);
});
});
app.listen(3000);
Nginx 配置示例(关闭缓冲)
location /api/chat/stream {
proxy_pass http://127.0.0.1:3000;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
chunked_transfer_encoding off;
}
后端要点:
- 必须设置
text/event-stream并关闭缓冲。 Last-Event-ID续传可以用 header 或 query。- 心跳避免中间设备断连。
- 生产环境建议持久化 token(如 Redis)。
5. 用户主动中断会话的处理
前端:
- 用户点击“ 停止生成”时,立即
close()(EventSource)或abort()(fetch)。 - 设置
stopRef,避免触发自动重连与重试。 - 保留已接收内容与
lastEventId,方便用户“继续生成”或重连。
后端:
req.on("close")会触发,及时清理定时器与流式资源。- 对于 AI 流式接口,建议在 close 时中断上游流,避免资源浪费。
6. SSE 与 WebSocket 对比
| 维度 | SSE | WebSocket |
|---|---|---|
| 方向与能力 | 单向(服务端 -> 客户端) | 双向(客户端 <-> 服务端) |
| 协议与部署 | 基于 HTTP,穿透代理更容易 | 需要协议升级,网关配置更复杂 |
| 重连与可靠性 | 浏览器原生自动重连,支持 Last-Event-ID | 需自行实现重连与状态恢复 |
| 浏览器支持 | 支持良好,但不支持自定义请求头 | 支持广泛,可自定义鉴权策略 |
| 适用场景 | AI 流式输出、日志流、进度通知 | 聊天、协作编辑、游戏控制 |