跳到主要内容

RAG Agent 代码实现示例

如果说 Tool-Using Agent 代码实现示例 更偏回答:

一个会调工具的 Agent,最小代码骨架应该怎么搭

Research Agent 代码实现示例 更偏回答:

一个会持续补资料、整理研究结论的 Agent,循环应该怎么组织

那么这篇文档要解决的是更贴近真实知识系统的一个问题:

当任务既依赖私有知识检索,又需要多步判断、补充检索、读取细节和最终有依据输出时,RAG Agent 的代码到底该怎么写?

很多人第一次做 RAG Agent 时,概念上已经知道这些词:

  • retrieval
  • rerank
  • tool use
  • state
  • decision loop
  • grounded answer

但一落到工程实现,还是会卡在几个很实际的问题上:

  • 普通 RAGRAG Agent 的边界到底在哪里
  • 第一轮检索以后,系统怎么判断“还要不要继续查”
  • retrieval / rerank / read / synthesis 应该怎么分层
  • state 里到底要存哪些东西,后面才不会乱
  • 怎样让最终答案不是“看起来像总结”,而是“真的有依据”
  • 日志和最小评估要记录什么,后面才知道系统到底有没有变好

所以这篇文档不讲大而全框架,也不讲复杂多 Agent 编排,而是聚焦一个很实用的目标:

给你一个足够完整、但仍然轻量的 TypeScript 伪代码骨架,帮助你把 RAG、tool use、state 和多步决策 loop 接成一个真正能落地的系统。

1. 什么任务适合做 RAG Agent

不是所有知识问答都要做成 RAG Agent

如果任务只是:

  • 单轮问答
  • 目标非常明确
  • 第一轮检索通常就够
  • 不需要额外工具
  • 结果主要是“找到相关段落并组织回答”

那普通 RAG 往往已经够用了。

更适合 RAG Agent 的,是下面这类任务:

  • 需要围绕一个目标做多轮信息补全
  • 第一批检索结果通常不够,或者彼此冲突
  • 需要在“继续检索”和“开始总结”之间做判断
  • 需要混合多种工具,而不只是向量检索
  • 最终输出必须能说明依据、缺口和不确定性

比如:

  • 企业内部助手:从产品文档、会议纪要、历史事故复盘里整理某功能的约束和风险
  • 合规知识助手:围绕一个问题持续检索政策、制度、执行记录,再给出带出处的结论
  • 技术调研助手:先搜内部 ADR,再读设计文档,再查接口定义,最后形成方案判断
  • 客服升级助手:先查知识库,再读工单历史,再检索相关变更记录,最后判断是否需要人工升级

这些任务的共性不是“知识量大”,而是:

系统必须围绕一个目标,判断下一步该查什么、该不该继续查,以及什么时候可以收束。

2. RAG Agent 和普通 RAG 的区别

很多团队第一版系统其实做的是:

用户问题 -> 检索 top-k -> 拼上下文 -> 一次生成答案

这就是典型的普通 RAG

它的优点是:

  • 简单
  • 成本低
  • 延迟可控
  • 很适合 FAQ、知识问答、说明型场景

但它的天然限制也很明显:

  • 检索 query 往往只有一次
  • 没有显式状态来记录缺口和冲突
  • 不会主动决定“需要再查一次”
  • 也不会主动读某篇命中的长文档细节

RAG Agent 则更像下面这条循环:

目标 -> 检索 -> 判断证据是否足够 -> 必要时改写 query / 调其他工具 / 深读文档 -> 更新 state -> 继续或停止 -> 形成有依据输出

所以它和普通 RAG 的核心区别不是“有没有向量库”,而是:

  • 普通 RAG:检索是一次性上下文准备
  • RAG Agent:检索是决策循环里的一步动作

再说得更具体一点:

  • 普通 RAG 主要在解决“把相关内容拿给模型看”
  • RAG Agent 主要在解决“围绕目标,持续判断信息是否足够”

3. 最小架构应该长什么样

第一版 RAG Agent 不需要很重。

一个够用的最小架构,建议先拆成 6 层:

  1. Goal Understanding
  2. Decision Loop
  3. Retrieval Pipeline
  4. Reading / Tool Layer
  5. State Store
  6. Final Synthesis

可以先把它理解成下面这张图:

这里最重要的是:

  • Retrieve 负责广撒网
  • Rerank 负责把有限注意力给更相关的候选
  • Read 负责在需要时做深入阅读
  • Decision Loop 负责决定下一步是继续查、改查法,还是停下来总结
  • State 负责让系统记住自己已经查过什么、确认了什么、还缺什么

第一版不要把所有事都塞进一个 prompt。

只要你把“动作选择”和“证据累积”这两个职责拆出来,系统就已经比单轮 RAG 稳定很多了。

4. retrieval / rerank / read / synthesis 到底是什么关系

这 4 个词经常被一起提,但它们解决的是不同问题。

4.1 Retrieval:先把候选找出来

retrieval 的目标不是直接给最终答案,而是尽量不要漏掉相关候选。

它更像一个高召回阶段,常见做法包括:

  • 向量检索
  • BM25 / keyword 检索
  • hybrid search
  • 基于 metadata 的过滤

这一层的关键不是“最准”,而是:

先把可能有用的材料找进来。

4.2 Rerank:把有限阅读预算花在更值的地方

检索回来的 top-k 往往很杂。

rerank 的作用是:

  • 更贴近当前子问题排序
  • 压掉表面相关、实则无关的片段
  • 帮系统决定先读谁

如果你不做 rerank,Agent 也能跑,但当知识库稍微复杂一些时,就很容易出现:

  • 明明召回到了关键片段,却没被真正读到
  • 上下文被次优材料占满
  • 最终结论建立在边缘信息上

4.3 Read:命中片段不等于真正读懂文档

很多第一版 RAG 会直接把 chunk 拼给模型,然后希望模型自己补全一切。

问题是,chunk 常常只够告诉你:

  • 这份文档可能相关
  • 某个说法可能存在

但还不够支撑:

  • 具体前提是什么
  • 结论适用范围是什么
  • 同一文档里还有没有补充条件或例外

所以 read 阶段通常要做的是:

  • 读取命中文档的相邻片段
  • 拉取章节级上下文
  • 读取文档摘要或全文
  • 必要时调其他结构化工具补细节

也就是说:

retrieval 告诉你该看哪里,read 才真正帮你把证据读完整。

4.4 Synthesis:不是“复述检索结果”,而是“基于证据形成输出”

synthesis 的目标不是把搜到的内容重新写一遍,而是把证据组织成:

  • 结论
  • 支撑依据
  • 未解决缺口
  • 不确定性说明

如果没有前面的结构化证据,synthesis 很容易退化成“流畅但无依据”的总结。

所以这 4 层更准确的关系是:

retrieval 负责找候选
rerank 负责排序优先级
read 负责补足上下文
synthesis 负责基于证据收束输出

5. state 应该怎么设计

RAG Agentstate 往往决定了系统能不能稳定进化。

因为这类系统最怕的不是没搜到,而是:

  • 搜过什么自己忘了
  • 哪些结论已经有依据,哪些还只是猜测,说不清
  • 缺口没有被显式记录
  • 最后模型把“部分信息”说成“完整结论”

第一版比较推荐的做法是,让 state 明确覆盖 5 类信息:

  • 当前任务目标
  • 当前检索计划和子问题
  • 已沉淀证据
  • 信息缺口和冲突
  • 执行轨迹与停止条件

下面是一套比较实用的状态结构:

type RetrievalQuery = {
id: string;
text: string;
intent: "broad_recall" | "targeted_followup" | "conflict_check";
createdAtStep: number;
};

type Evidence = {
id: string;
question: string;
claim: string;
answerSpan: string;
sourceId: string;
sourceTitle: string;
chunkId: string;
score: number;
grounded: boolean;
notes?: string;
};

type Gap = {
question: string;
reason:
| "no_relevant_results"
| "evidence_too_weak"
| "source_conflict"
| "need_more_context";
suggestedAction: "retrieve_more" | "read_doc" | "use_other_tool" | "stop_with_uncertainty";
};

type ActionRecord = {
step: number;
action:
| "retrieve"
| "rerank"
| "read_chunk"
| "read_document"
| "use_tool"
| "synthesize"
| "stop";
inputSummary: string;
outputSummary: string;
latencyMs: number;
};

type RagAgentState = {
userGoal: string;
normalizedTask: string;
subQuestions: string[];
pendingQueries: RetrievalQuery[];
evidence: Evidence[];
gaps: Gap[];
answeredQuestions: string[];
visitedSourceIds: string[];
actionHistory: ActionRecord[];
step: number;
maxSteps: number;
done: boolean;
stopReason?:
| "enough_evidence"
| "max_steps_reached"
| "no_better_action"
| "insufficient_grounding";
};

这套结构的重点不是“字段很多”,而是每个字段都直接服务于循环决策:

  • subQuestions:把大问题拆成可检索、可验证的子问题
  • pendingQueries:记录接下来准备怎么查,而不是每轮临时想到什么查什么
  • evidence:存可引用的证据单元,而不是只存原始文本
  • gaps:显式记录为什么还不能停
  • visitedSourceIds:避免反复读同一份材料
  • stopReason:让系统停得可解释,而不是“感觉差不多了”

6. evidence / grounding 结构为什么要单独设计

很多系统做完检索后,只是在内存里保留一堆 chunk 文本。

这会带来一个很实际的问题:

模型能看见文本,不等于系统拥有了“可引用的依据”。

RAG Agent 来说,更稳的做法是显式维护 evidencegrounding 结构。

一个最小的 grounded evidence,至少要回答这些问题:

  • 它在支持哪个子问题
  • 它具体支持了什么 claim
  • 来自哪份来源
  • 对应原文的哪一段
  • 这是强依据还是弱依据

例如:

type Grounding = {
claimId: string;
sourceId: string;
sourceTitle: string;
sourceUrl?: string;
chunkId: string;
quote: string;
relevanceScore: number;
support: "strong" | "partial" | "weak";
};

type FinalClaim = {
id: string;
statement: string;
supportedBy: Grounding[];
unresolvedRisks: string[];
};

这样做的收益很直接:

  • 最终答案可以按 claim 挂证据
  • 能显式识别“这个说法其实只有弱支持”
  • 能把“不确定”和“没有证据”区分开
  • 后续做评估时,也能检查 claim 是否真的有 grounding

很多“看起来像幻觉”的问题,本质上不是模型乱说,而是系统从头到尾没有把“证据单元”当成一等公民。

7. 什么时候继续检索,什么时候停止

这是 RAG Agent 和普通 RAG 最不一样的地方之一。

普通 RAG 的默认行为往往是:

  • 先查一次
  • 然后直接答

RAG Agent 必须显式回答一个问题:

当前证据到底够不够支撑输出?

7.1 适合继续检索的信号

遇到下面这些情况,通常应该继续:

  • 关键子问题仍然没有证据
  • 检索结果只给出高度概括的片段,没有上下文
  • 不同来源出现冲突说法
  • 当前证据全都来自单一来源,可信度不足
  • 模型在总结时频繁出现“可能、推测、似乎”这类弱措辞

7.2 适合停止的信号

遇到下面这些情况,通常可以停:

  • 每个关键子问题都至少有 1 到 2 条有效依据
  • 新一轮检索和上一轮相比没有新增信息
  • 再继续查的边际收益已经很低
  • 已经可以清楚写出“结论 + 依据 + 不确定项”

7.3 不要把“停止”理解成“百分之百确定”

很多知识任务本来就不可能做到完全确定。

更健康的停止条件不是:

我已经知道一切

而是:

我已经有足够依据支撑当前结论,并能明确说明剩余不确定性。

所以第一版很值得加一个简单的停止函数:

function shouldStop(state: RagAgentState): boolean {
const unresolvedQuestions = state.subQuestions.filter(
(q) => !state.answeredQuestions.includes(q),
);

if (unresolvedQuestions.length === 0 && state.evidence.length >= 4) {
state.stopReason = "enough_evidence";
return true;
}

if (state.step >= state.maxSteps) {
state.stopReason = "max_steps_reached";
return true;
}

const tooManyWeakGaps = state.gaps.every(
(gap) => gap.suggestedAction === "stop_with_uncertainty",
);

if (state.gaps.length > 0 && tooManyWeakGaps) {
state.stopReason = "insufficient_grounding";
return true;
}

return false;
}

这不是万能规则,但它已经比“固定检索一次”更接近真实系统需要的控制能力了。

8. 一个较完整的 TypeScript 伪代码实现

下面这份伪代码故意不引入重框架。

目标不是直接运行,而是让你看清楚一个 RAG Agent 的最小闭环到底由哪些部分组成。

type SearchHit = {
chunkId: string;
sourceId: string;
sourceTitle: string;
text: string;
retrievalScore: number;
};

type RankedHit = SearchHit & {
rerankScore: number;
};

type ReadResult = {
sourceId: string;
sourceTitle: string;
chunkId: string;
excerpt: string;
expandedContext: string;
};

type Tool<TArgs, TResult> = {
name: string;
run(args: TArgs): Promise<TResult>;
};

const retrieveTool: Tool<{ query: string; topK: number }, SearchHit[]> = {
name: "retrieve_chunks",
async run({ query, topK }) {
return hybridSearch(query, topK);
},
};

const rerankTool: Tool<{ query: string; hits: SearchHit[] }, RankedHit[]> = {
name: "rerank_hits",
async run({ query, hits }) {
return rerankByCrossEncoder(query, hits);
},
};

const readTool: Tool<{ sourceId: string; chunkId: string }, ReadResult> = {
name: "read_context",
async run({ sourceId, chunkId }) {
return readChunkWithNeighbors(sourceId, chunkId);
},
};

function initializeState(userGoal: string): RagAgentState {
const subQuestions = decomposeGoal(userGoal);

return {
userGoal,
normalizedTask: normalizeGoal(userGoal),
subQuestions,
pendingQueries: subQuestions.map((question, index) => ({
id: `q_${index + 1}`,
text: question,
intent: "broad_recall",
createdAtStep: 0,
})),
evidence: [],
gaps: [],
answeredQuestions: [],
visitedSourceIds: [],
actionHistory: [],
step: 0,
maxSteps: 8,
done: false,
};
}

function logAction(
state: RagAgentState,
record: Omit<ActionRecord, "step">,
): void {
state.actionHistory.push({
step: state.step,
...record,
});
}

async function retrieveAndRead(
state: RagAgentState,
query: RetrievalQuery,
): Promise<void> {
const startedAt = Date.now();

const hits = await retrieveTool.run({ query: query.text, topK: 12 });
logAction(state, {
action: "retrieve",
inputSummary: query.text,
outputSummary: `hits=${hits.length}`,
latencyMs: Date.now() - startedAt,
});

if (hits.length === 0) {
state.gaps.push({
question: query.text,
reason: "no_relevant_results",
suggestedAction: "retrieve_more",
});
return;
}

const rerankStartedAt = Date.now();
const rankedHits = await rerankTool.run({ query: query.text, hits });
logAction(state, {
action: "rerank",
inputSummary: `query=${query.text}, hits=${hits.length}`,
outputSummary: `ranked=${rankedHits.length}`,
latencyMs: Date.now() - rerankStartedAt,
});

const topHits = rankedHits.slice(0, 3);

for (const hit of topHits) {
const readStartedAt = Date.now();
const detail = await readTool.run({
sourceId: hit.sourceId,
chunkId: hit.chunkId,
});

logAction(state, {
action: "read_chunk",
inputSummary: `${hit.sourceTitle}:${hit.chunkId}`,
outputSummary: `read source=${detail.sourceId}`,
latencyMs: Date.now() - readStartedAt,
});

state.visitedSourceIds.push(hit.sourceId);

const extracted = extractEvidence({
question: query.text,
content: detail.expandedContext,
sourceId: detail.sourceId,
sourceTitle: detail.sourceTitle,
chunkId: detail.chunkId,
score: hit.rerankScore,
});

state.evidence.push(...extracted);
}
}

function assessCoverage(state: RagAgentState): void {
state.gaps = [];
state.answeredQuestions = [];

for (const question of state.subQuestions) {
const supportingEvidence = state.evidence.filter(
(item) => item.question === question && item.grounded,
);

if (supportingEvidence.length >= 1) {
state.answeredQuestions.push(question);
continue;
}

state.gaps.push({
question,
reason: "evidence_too_weak",
suggestedAction: "retrieve_more",
});
}
}

function decideNextAction(
state: RagAgentState,
):
| { type: "run_query"; query: RetrievalQuery }
| { type: "synthesize" }
| { type: "stop" } {
if (shouldStop(state)) {
return { type: "stop" };
}

const nextQuery = state.pendingQueries.shift();
if (nextQuery) {
return { type: "run_query", query: nextQuery };
}

if (state.gaps.length === 0) {
return { type: "synthesize" };
}

const followupGap = state.gaps[0];
state.pendingQueries.push({
id: `q_followup_${state.step}`,
text: rewriteQueryForGap(followupGap.question, state.evidence),
intent: "targeted_followup",
createdAtStep: state.step,
});

return {
type: "run_query",
query: state.pendingQueries.shift()!,
};
}

function buildGroundedAnswer(state: RagAgentState): string {
const claims = synthesizeClaimsFromEvidence(state.evidence, state.subQuestions);

const supportedClaims = claims.map((claim) => {
const support = state.evidence.filter((item) => item.claim === claim.statement);

return {
statement: claim.statement,
evidence: support.map((item) => ({
sourceTitle: item.sourceTitle,
answerSpan: item.answerSpan,
})),
};
});

return renderAnswer({
goal: state.userGoal,
claims: supportedClaims,
unresolvedGaps: state.gaps.map((gap) => gap.question),
stopReason: state.stopReason ?? "enough_evidence",
});
}

async function runRagAgent(userGoal: string): Promise<string> {
const state = initializeState(userGoal);

while (!state.done) {
state.step += 1;
assessCoverage(state);

const decision = decideNextAction(state);

if (decision.type === "run_query") {
await retrieveAndRead(state, decision.query);
continue;
}

if (decision.type === "synthesize") {
logAction(state, {
action: "synthesize",
inputSummary: `evidence=${state.evidence.length}`,
outputSummary: "building grounded answer",
latencyMs: 0,
});

state.done = true;
return buildGroundedAnswer(state);
}

state.done = true;
}

logAction(state, {
action: "stop",
inputSummary: `reason=${state.stopReason ?? "unknown"}`,
outputSummary: `evidence=${state.evidence.length}, gaps=${state.gaps.length}`,
latencyMs: 0,
});

return buildGroundedAnswer(state);
}

这段代码里有几个特别值得注意的点:

  • 检索不是终点,而是 retrieveAndRead() 里的第一步
  • rerankread 是显式阶段,不是隐含在同一个黑盒里
  • assessCoverage() 每轮都会重新判断覆盖度
  • decideNextAction() 决定的是“下一步动作”,而不是直接生成最终答案
  • 最终输出来自 buildGroundedAnswer(),它只基于已沉淀证据组织结果

这就是 RAG Agent 最核心的工程思路:

不是把更多文本塞给模型,而是把“查、读、判断、收束”组织成一个可解释循环。

9. 怎样避免幻觉和无依据总结

很多团队做知识 Agent,真正栽跟头的地方不是检索不到,而是:

模型把“看过一些材料”误表现成“已经确认结论”。

要降低这个风险,至少可以做下面几件事。

9.1 没有依据的 claim 不允许进入最终答案

最简单也最有效的一条规则就是:

  • 每个最终 claim 必须绑定至少一条 grounding
  • 没有 grounding 的说法只能进“推测”或“不确定项”,不能进确定结论

9.2 区分“无结果”和“无证据支持”

这两个状态完全不同:

  • 无结果:系统没搜到相关内容
  • 无证据支持:搜到内容了,但不足以支撑结论

如果不区分,模型就很容易把“我没看到”写成“这件事不存在”。

9.3 让模型输出不确定性,而不是强行补全

最终答案里非常值得显式保留:

  • 已确认结论
  • 证据来源
  • 尚未确认的问题
  • 为什么未确认

也就是说,不要只允许系统输出“答案”,也要允许它输出“目前还不能确定”。

9.4 控制证据摘要,不让摘要替代原文

一个常见问题是:

  • 前几轮把证据摘要得越来越短
  • 最后一轮模型其实只在读摘要,而不是原始依据

这会导致语义漂移。

更稳的做法是:

  • evidence 保留原始 answerSpan
  • claimsummary 只是为了组织,不替代引用内容
  • 合成最终答案时,尽量从 grounding 出发,而不是只从二手摘要出发

10. 日志应该记录什么

如果你后面想调优 RAG Agent,日志几乎是必需品。

因为这类系统出问题时,根因可能在很多层:

  • query 写差了
  • 检索召回差了
  • rerank 把关键片段压下去了
  • read 读得不够深
  • 停止太早
  • synthesis 过度扩展

所以第一版至少建议记录这些字段:

type RagRunLog = {
sessionId: string;
userGoal: string;
step: number;
action: string;
query?: string;
selectedSourceIds?: string[];
evidenceCount: number;
gapCount: number;
stopReason?: string;
latencyMs: number;
};

实践里比较值得重点观察的指标有:

  • 每轮 query 长什么样
  • 每轮召回了多少候选
  • rerank 后真正进入阅读的是哪些来源
  • 最终有多少条 evidence 被用进答案
  • 系统是因为“证据足够”停,还是因为“步数到了”停

如果这些日志没有,你后面会很难判断:

系统回答差,到底是检索差,还是决策差,还是最后总结差。

11. 最小 eval 应该怎么做

第一版不需要上非常重的评估平台,但至少应该有一套最小 eval

一个很实用的最小集合可以包括:

11.1 Answer Grounding Rate

看最终答案里的 claim,有多少比例真正带了依据。

可以简单定义为:

grounded_claims / total_claims

11.2 Coverage of Key Questions

看关键子问题里,有多少被有效回答了。

比如:

answered_sub_questions / total_sub_questions

11.3 Unnecessary Retrieval Rate

看系统有没有在证据已经足够后还继续乱查。

这类指标有助于发现:

  • decision loop 太保守
  • 停止条件过弱
  • query rewrite 质量不稳定

11.4 Citation Correctness Spot Check

随机抽查若干答案,检查:

  • 引用的出处是否真的支持该 claim
  • 有没有“引用在,但其实不支持结论”的情况

这个指标非常重要,因为很多系统表面看“有引用”,其实只是“带了链接”,并不是真的 grounded。

12. 第一版实现时最常见的坑

最后收一下第一版最容易踩的坑。

12.1 把 Agent 做成“一次大 prompt”

这样通常会导致:

  • 没有显式状态
  • 没有可观察日志
  • 没法稳定控制继续还是停止

12.2 把 chunk 当 evidence

chunk 只是候选材料,不等于已经能支撑某个结论。

真正对最终输出有价值的是:

  • 哪个 claim
  • 被哪段原文支持
  • 支持强度如何

12.3 没有区分 broad recall 和 targeted follow-up

第一轮检索通常适合广召回。

后续检索更适合围绕缺口做有针对性的补查。

如果所有 query 都一种写法,系统很容易在“搜很多”和“搜得准”之间两头不到岸。

12.4 停止条件只有 maxSteps

如果系统只会“跑满步数再停”,那本质上还没有真正学会决策。

RAG Agent 的一个核心能力,就是知道什么时候已经有足够依据,什么时候应该明确承认仍有不确定性。

13. 可以怎样继续演进

如果你已经把这套最小骨架跑通,后面比较自然的演进方向通常是:

  • 给不同子问题分配不同检索策略
  • 引入来源可信度权重
  • 对冲突来源做显式 resolution loop
  • read 扩展成章节级或文档级阅读
  • 为最终答案增加 claim-level 引用标注

但在这之前,更重要的是先把第一版的主闭环做稳:

围绕目标检索 -> 读出证据 -> 判断是否够用 -> 不够就继续 -> 够了再输出,并且输出必须带依据

这才是 RAG Agent 和“会检索的聊天机器人”之间真正的分水岭。