前言
欢迎来到《Spring AI 零基础到实战》的第 25 节!
在上一节Java 程序员的 AI转型第二十四课:Spring AI 个人知识库实战(三)中,我们不仅基于Spring AI的内置组件解耦了 RAG 检索链路,并且通过 SSE 与响应式编程,构建了一个多轮流式对话接口。
然而,当 AI 知识库产品给到用户使用的时候,可能会出现这种问题:当询问“年假与调休的合并规则”,大模型洋洋洒洒地输出了专业解答。那么用户凭什么相信这段话是公司规定的原话,而不是大模型基于概率算出来的“幻觉”?
因此我们需要在AI回答的最后附带几个类似 [1]、[2] 的上标,点击即可跳转到具体的原始文档——这就是企业级 RAG 的底线:来源追溯(Citations)。
本节课,我们将移除纯文本提取流。深入 ChatResponse 的底层数据结构,从 SSE 数据流中,剥离出被 RAG 命中的 Document 元数据。让我们的系统做到“字字有出处,句句有回音”。
本节章节目标
底层透视:深入 ChatResponse的数据结构,洞悉 RAG 检索文档是如何被框架挂载的。末端帧劫持:摒弃的字符串流封装,掌控 Flux<ChatResponse>对象流的高阶转换技巧。架构契约:在 SSE 协议的生命周期末端(EOF),优雅追加 JSON 格式的元数据,确立前后端无缝联调的溯源防线。
知识库功能展示
登录页面

RAG对话

联网对话
大模型响应元数据 (Metadata)
当 QuestionAnswerAdvisor 从向量库捞出 4 块高相关度的切片并喂给大模型时,它并没有将这些原始对象丢弃。相反,它将这 4 块文档(包含我们在入库时打入的 source_filename 等标签),打包成了一个的“装箱单”,附着在了大模型返回的最终响应体中。如下图所示:

重构ChatServiceImpl提取引文
在上一节的代码中,我们为了追求快速跑通,使用了 chatClient.prompt()....stream().content()。这个语法糖简单,但代价却是极大:它在底层进行了一次字符串提取,把装着参考文献和 Token 消耗的 ChatResponse 整个包裹直接丢弃了。
为了追溯来源,我们需要舍弃便捷的 .content(),改用 .chatResponse() 获取原始的 Flux<ChatResponse> 流,并在响应式管道中手动实施拦截与拆解。
为了让前端能够区分“大模型正在生成的正文”与“参考资料”,我们的设计如下:
流的前段:保持纯净,只推送生成的正文(如 data: 调休...)。流的末尾(EOF):当模型推理结束时,追加一段由特殊标记包裹的 JSON(如 [CITATIONS_START]{"sources":["手册.pdf"]}[CITATIONS_END])。前端侦测到这个特殊标记后,即可拦截并渲染为引用卡片。
我们切入 ChatServiceImpl.java,进行逻辑重构:
publicclassChatServiceImplimplementsChatService{
/** 1. 制定溯源契约*/
privatestaticfinal String CITATIONS_START = "[CITATIONS_START]";
privatestaticfinal String CITATIONS_END = "[CITATIONS_END]";
public Flux<String> streamChatWithCitations(String chatId, String message){
Flux<ChatResponse> responseFlux = this.chatClient.prompt()
// ...
// 切换为chatResponse
.chatResponse()
.cache();
// 3. 数据包解析
// 提取征文
Flux<String> textFlux = responseFlux.map(chatClientResponse -> {
return chatClientResponse.chatResponse().getResult().getOutput().getText();
});
// 追加元数据
Flux<String> citationsFlux = responseFlux.last()
.mapNotNull(this::extractCitationsFromResponse)
.filter(c -> !c.isEmpty())
.flux();
return Flux.concat(textFlux, citationsFlux);
}
/**
* 从 ChatResponse 元数据中提取 RAG 命中文档的来源文件名
*
* QuestionAnswerAdvisor 在流结束时将检索到的 Document 列表挂载到
* response.getMetadata() 中,key 为 QuestionAnswerAdvisor.RETRIEVED_DOCUMENTS。
*/
private String extractCitationsFromResponse(ChatClientResponse chatClientResponse){
// 获取RAG文档
Object documentsObj = chatClientResponse.context()
.get(QuestionAnswerAdvisor.RETRIEVED_DOCUMENTS);
// 按文件名聚合页码:filename -> 有序去重页码集合
Map<String, TreeSet<Object>> filePageMap = new LinkedHashMap<>();
for (Object obj : docs) {
if (obj instanceof Document doc) {
String filename = (String) doc.getMetadata().get("source_filename");
if (filename == null) continue;
filePageMap.computeIfAbsent(filename, k -> new TreeSet<>());
// page_number 由 PagePdfDocumentReader 注入,非 PDF 文件无此字段
Object pageObj = doc.getMetadata().get("page_number");
if (pageObj != null) {
filePageMap.get(filename).add(pageObj);
}
}
}
// 构造结构化溯源列表:[{file, pages}, ...]
List<Map<String, Object>> sources = new ArrayList<>();
for (Map.Entry<String, TreeSet<Object>> entry : filePageMap.entrySet()) {
Map<String, Object> item = new LinkedHashMap<>();
item.put("file", entry.getKey());
// 无页码信息(非 PDF)时 pages 为空列表
item.put("pages", new ArrayList<>(entry.getValue()));
sources.add(item);
}
String jsonStr = objectMapper.writeValueAsString(Map.of("sources", sources));
return CITATIONS_START + jsonStr + CITATIONS_END;
}
}
底层细节剖析
Spring AI 框架的底层设计极其克制,在 SSE 的响应式流中,前面的数百个数据包只包含大模型生成的只言片语,其元数据是残缺的。只有当框架探测到 FinishReason(即大模型宣告推理完毕)时,整个请求的完整生命周期报告(包含消耗的 Token、完整的 RAG 检索命中结果)才会被一并下发,作为“装箱单”统一塞进 Metadata 中。我们的responseFlux.last()利用了这末端帧,完成了引文劫持。
详细可以查看org.springframework.ai.chat.client.advisor.api.BaseAdvisor的源码,片段如下:
default Flux<ChatClientResponse> adviseStream(){
return chatClientResponseFlux.map((response) -> {
// 检测是否完成,如果完整调用after追加扩展内容,在QuestionAnswerAdvisor的体现就是追加Metadata
if (AdvisorUtils.onFinishReason().test(response)) {
response = this.after(response, streamAdvisorChain);
}
return response;
}).onErrorResume((error) -> Flux.error(new IllegalStateException("Stream processing failed", error)));
}
流转验证
我们在上一节构建好的 ChatController 由于具备解耦性,此时我们无需修改HTTP 通信层逻辑。启动 Spring Boot 服务,浏览器测试:
http://localhost:8080/api/chat/stream?chatId=春风不晚&message=java开发手册&model=deepseek
【结果验证】: 在浏览器中,前半截依然是打字机流,但在数据流的最后,系统返回我们封装好的溯源契约:
data: 按照提供的文档信息
...
data:[CITATIONS_START]{"sources":[{"file":"阿里巴巴Java开发手册(终极版).pdf","pages":["1","7"]}]}[CITATIONS_END]
对于前端的开发而言,我们只需在合并文本流时增加一行正则匹配,将 [CITATIONS_START] 与 [CITATIONS_END] 之间的内容进行 JSON.parse,即可在对话下方渲染出类似 [引用来源:阿里巴巴Java开发手册(终极版).pdf] 的标签。整个 AI 产品的可解释性与商业公信力,瞬间拉满。
响应式缓存,防止重复计费
上文中,我们加了如下代码:
Flux<ChatResponse> responseFlux = this.chatClient.prompt()
// 多次订阅共享同一次 AI 请求,避免重复调用
.cache();
这是一个 Spring WebFlux的底层操作符。
为什么必须加? 因为这是一个数据流 Flux。在后面的代码逻辑中,我们需要消费这个流两次:第一次消费: 把流转换成 SSE 格式,通过 HTTP 实时推送给前端浏览器(让用户看到打字机效果)。 第二次消费: 等流结束(回答完毕)后,我们需要追加溯源标记。 不加 .cache(): Flux 是“冷流”,每订阅一次就会重新执行一次整个链路。这样不仅回复会变慢,还要花双倍的价钱。加了 .cache(): 第一次调用 API 拿到的流数据会被缓存下来。第二次会直接从内存缓存中读取刚才的完整文本,就不会触发第二次大模型网络调用。
在 Spring AI 结合 WebFlux 的开发中,如果业务场景需要同时满足“流式响应给用户”和“后台抓取完整内容处理(如存库、敏感词审核、触发另一个 AI 任务)”,那么就必须在最后加上 .cache()。它能减少费用损耗和加快响应速度。
总结
在本节中,我们完成了一次向底层架构的潜跃。
我们移除框架提供的傻瓜式语法糖,通过 Flux<ChatResponse>,掌控了流式响应的微观生命周期;拦截响应流 EOF 末端帧,拿到封装的 Document 元数据,并最终确立了前后端契约防线。
在看似黑盒的 AI 生成迷雾中,我们死死拽住了数据追根溯源的准绳。
下期预告
【第 26 节:Spring AI 个人知识库实战(五)- 增强联网搜索能力】。
至此,我们的本地知识库 RAG 主线已全部通关。但即便如此,它依然是一座“数据孤岛”。 如果用户问:“今天的北京天气?”,本地的 Redis 向量库里显然没有这份数据,AI 只能抱歉地说不知道。
一个智能系统,绝不应被禁锢在四面墙内! 下节课,我们将对 ChatClient 注入最后的灵魂:大模型函数调用(Function Calling)。当本地找不到答案时,让 AI 自动唤醒外部工具(如联网搜索引擎、实时天气 API),赋予它主动探索真实物理世界的能力!
精彩继续,我们下节见!
公众号回复:【源码】获取知识库代码
系列专栏:
SpringAI + RAG + MCP + Agent 零基础实战专栏
往期内容:
告别 API 付费!Java + Ollama 本地部署 Qwen 3.5:3分钟打造你的私人视觉 AI
性能狂飙 300%!Spring Boot + Vue 单体整合 Qwen 3.5:3秒搞定本地识图

夜雨聆风