4. LangChain4j Advanced: Document Loading, Splitting, and Vectorization in Practice
Overview
When building a RAG (Retrieval-Augmented Generation) system, document processing is the critical first step. This article takes a close look at how to use LangChain4j to load various document types, including PDF, Word, and Excel files, and how to split and vectorize the resulting text effectively.
Document Loaders in Detail
1. Loading PDF Documents
import java.nio.file.Paths;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;
import dev.langchain4j.data.document.parser.apache.pdfbox.ApachePdfBoxDocumentParser;

public class PDFLoaderExample {

    public static void main(String[] args) {
        // Parse the PDF document with Apache PDFBox
        Document pdfDoc = FileSystemDocumentLoader.loadDocument(
                Paths.get("manuals/product_manual.pdf"),
                new ApachePdfBoxDocumentParser()
        );

        System.out.println("PDF document content: " + pdfDoc.text());
        System.out.println("Metadata: " + pdfDoc.metadata());
    }
}
2. Loading Word Documents
import java.nio.file.Paths;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;
import dev.langchain4j.data.document.parser.apache.poi.ApachePoiDocumentParser;

public class WordLoaderExample {

    public Document loadWordDocument(String filePath) {
        return FileSystemDocumentLoader.loadDocument(
                Paths.get(filePath),
                new ApachePoiDocumentParser()  // supports both .doc and .docx
        );
    }
}
3. Batch Loading Multiple Formats
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentParser;
import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;
import dev.langchain4j.data.document.parser.TextDocumentParser;
import dev.langchain4j.data.document.parser.apache.pdfbox.ApachePdfBoxDocumentParser;
import dev.langchain4j.data.document.parser.apache.poi.ApachePoiDocumentParser;
import dev.langchain4j.data.document.parser.apache.tika.ApacheTikaDocumentParser;

public class MultiFormatLoader {

    public List<Document> loadDocumentsFromDirectory(String directoryPath) {
        Path dir = Paths.get(directoryPath);

        // Supported file extensions
        Set<String> supportedExtensions = Set.of(".pdf", ".doc", ".docx", ".txt", ".html");

        List<Document> documents = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.filter(Files::isRegularFile)
                 .filter(path -> supportedExtensions.contains(getExtension(path)))
                 .forEach(path -> {
                     DocumentParser parser = getParserForExtension(getExtension(path));
                     Document doc = FileSystemDocumentLoader.loadDocument(path, parser);
                     documents.add(doc);
                 });
        } catch (IOException e) {
            throw new RuntimeException("Error loading documents", e);
        }
        return documents;
    }

    private String getExtension(Path path) {
        String fileName = path.getFileName().toString();
        int dotIndex = fileName.lastIndexOf('.');
        return dotIndex > 0 ? fileName.substring(dotIndex).toLowerCase() : "";
    }

    private DocumentParser getParserForExtension(String extension) {
        switch (extension) {
            case ".pdf":
                return new ApachePdfBoxDocumentParser();
            case ".doc":
            case ".docx":
                return new ApachePoiDocumentParser();
            case ".html":
                return new ApacheTikaDocumentParser();
            default:
                return new TextDocumentParser();
        }
    }
}
Text Splitting Strategies
1. Recursive Character Splitter
import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentSplitter;
import dev.langchain4j.data.document.splitter.DocumentSplitters;
import dev.langchain4j.data.segment.TextSegment;

public class RecursiveSplitterExample {

    public List<TextSegment> splitDocument(Document document) {
        // Recursive splitting: tries paragraphs first, then sentences, then characters
        DocumentSplitter splitter = DocumentSplitters.recursive(
                500,  // maximum segment length in characters
                50    // overlap in characters
        );
        return splitter.split(document);
    }
}
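For reference, here is a minimal usage sketch that wires the PDF loader from above into the recursive splitter and prints the resulting segments. The file path is illustrative, and RecursiveSplitterUsage is just a hypothetical driver class.

import java.nio.file.Paths;
import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;
import dev.langchain4j.data.document.parser.apache.pdfbox.ApachePdfBoxDocumentParser;
import dev.langchain4j.data.segment.TextSegment;

public class RecursiveSplitterUsage {

    public static void main(String[] args) {
        // Load a document (path is illustrative)
        Document document = FileSystemDocumentLoader.loadDocument(
                Paths.get("manuals/product_manual.pdf"),
                new ApachePdfBoxDocumentParser());

        // Split it with the example splitter defined above
        List<TextSegment> segments = new RecursiveSplitterExample().splitDocument(document);

        // Print each segment's length and the start of its text;
        // adjacent segments share up to 50 overlapping characters
        segments.forEach(segment ->
                System.out.println(segment.text().length() + " chars: "
                        + segment.text().substring(0, Math.min(60, segment.text().length()))));
    }
}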
2. Custom Splitter
import java.util.ArrayList;
import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentSplitter;
import dev.langchain4j.data.segment.TextSegment;

public class CustomDocumentSplitter implements DocumentSplitter {

    private final int maxLength;
    private final int overlap;

    public CustomDocumentSplitter(int maxLength, int overlap) {
        this.maxLength = maxLength;
        this.overlap = overlap;
    }

    @Override
    public List<TextSegment> split(Document document) {
        String text = document.text();
        List<TextSegment> segments = new ArrayList<>();

        // Split at chapter headings such as "\n1. ..."
        String[] chapters = text.split("(?=\\n\\d+\\.\\s)");

        for (String chapter : chapters) {
            if (chapter.trim().isEmpty()) continue;

            // Split long chapters further
            if (chapter.length() > maxLength) {
                segments.addAll(splitByLength(chapter));
            } else {
                segments.add(TextSegment.from(chapter, document.metadata()));
            }
        }
        return segments;
    }

    private List<TextSegment> splitByLength(String text) {
        List<TextSegment> segments = new ArrayList<>();
        int start = 0;
        while (start < text.length()) {
            int end = Math.min(start + maxLength, text.length());
            String segmentText = text.substring(start, end);

            // Avoid splitting in the middle of a word
            if (end < text.length()) {
                int lastSpace = segmentText.lastIndexOf(' ');
                if (lastSpace > 0) {
                    end = start + lastSpace;
                    segmentText = text.substring(start, end);
                }
            }

            segments.add(TextSegment.from(segmentText));

            // Stop once the end of the text has been reached
            if (end >= text.length()) {
                break;
            }
            // The next segment starts `overlap` characters before the previous end
            start = end - overlap;
        }
        return segments;
    }
}
3. Semantic-Aware Splitting
import java.util.ArrayList;
import java.util.List;

import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.model.embedding.onnx.allminilml6v2q.AllMiniLmL6V2QuantizedEmbeddingModel;

public class SemanticSplitter {

    private final EmbeddingModel embeddingModel;

    public SemanticSplitter() {
        this.embeddingModel = new AllMiniLmL6V2QuantizedEmbeddingModel();
    }

    public List<TextSegment> semanticSplit(List<TextSegment> initialSegments) {
        List<TextSegment> refinedSegments = new ArrayList<>();

        for (int i = 0; i < initialSegments.size(); i++) {
            TextSegment current = initialSegments.get(i);

            if (i < initialSegments.size() - 1) {
                TextSegment next = initialSegments.get(i + 1);

                // Compute the semantic similarity of adjacent segments
                Embedding currentEmbedding = embeddingModel.embed(current.text()).content();
                Embedding nextEmbedding = embeddingModel.embed(next.text()).content();
                double similarity = cosineSimilarity(currentEmbedding.vector(), nextEmbedding.vector());

                // Low similarity likely indicates a topic change, so keep the split
                if (similarity < 0.3) {
                    refinedSegments.add(current);
                } else {
                    // Merge semantically similar segments
                    String mergedText = current.text() + " " + next.text();
                    refinedSegments.add(TextSegment.from(mergedText));
                    i++;  // skip the next segment, it has already been merged
                }
            } else {
                refinedSegments.add(current);
            }
        }
        return refinedSegments;
    }

    private double cosineSimilarity(float[] vec1, float[] vec2) {
        double dotProduct = 0.0;
        double norm1 = 0.0;
        double norm2 = 0.0;
        for (int i = 0; i < vec1.length; i++) {
            dotProduct += vec1[i] * vec2[i];
            norm1 += Math.pow(vec1[i], 2);
            norm2 += Math.pow(vec2[i], 2);
        }
        return dotProduct / (Math.sqrt(norm1) * Math.sqrt(norm2));
    }
}
Vectorization and Storage
1. Choosing an Embedding Model
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.model.embedding.onnx.allminilml6v2q.AllMiniLmL6V2QuantizedEmbeddingModel;

public class EmbeddingModelSelector {

    public enum EmbeddingType {
        ALL_MINI_LM_L6_V2_Q,  // lightweight, suitable for resource-constrained environments
        BGE_SMALL_ZH,         // optimized for Chinese text
        MPNET_BASE_V2         // higher accuracy, higher resource consumption
    }

    public EmbeddingModel selectModel(EmbeddingType type) {
        switch (type) {
            case ALL_MINI_LM_L6_V2_Q:
                return new AllMiniLmL6V2QuantizedEmbeddingModel();
            case BGE_SMALL_ZH:
                // Requires a corresponding Chinese-optimized model implementation
                return createChineseOptimizedModel();
            case MPNET_BASE_V2:
                // Requires a corresponding model implementation
                return createHighPrecisionModel();
            default:
                return new AllMiniLmL6V2QuantizedEmbeddingModel();
        }
    }

    private EmbeddingModel createChineseOptimizedModel() {
        // Plug in a Chinese-optimized embedding model here
        // (placeholder implementation)
        return new AllMiniLmL6V2QuantizedEmbeddingModel();
    }

    private EmbeddingModel createHighPrecisionModel() {
        // Plug in a high-precision embedding model here
        // (placeholder implementation)
        return new AllMiniLmL6V2QuantizedEmbeddingModel();
    }
}
2. Vector Store Implementation
import java.util.List;
import java.util.stream.Collectors;

import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.model.embedding.onnx.allminilml6v2q.AllMiniLmL6V2QuantizedEmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingMatch;
import dev.langchain4j.store.embedding.EmbeddingStore;
import dev.langchain4j.store.embedding.inmemory.InMemoryEmbeddingStore;

public class VectorStorageService {

    private final EmbeddingModel embeddingModel;
    private final EmbeddingStore<TextSegment> embeddingStore;

    public VectorStorageService() {
        this.embeddingModel = new AllMiniLmL6V2QuantizedEmbeddingModel();
        this.embeddingStore = new InMemoryEmbeddingStore<>();
    }

    // Add document segments to the vector store
    public void addDocuments(List<TextSegment> segments) {
        for (TextSegment segment : segments) {
            Embedding embedding = embeddingModel.embed(segment.text()).content();
            embeddingStore.add(embedding, segment);
        }
    }

    // Retrieve the most relevant document segments for a query
    public List<EmbeddingMatch<TextSegment>> findRelevantDocuments(String query, int maxResults) {
        Embedding queryEmbedding = embeddingModel.embed(query).content();
        return embeddingStore.findRelevant(queryEmbedding, maxResults);
    }

    // Add documents in bulk (suitable for large amounts of data)
    public void addDocumentsBatch(List<TextSegment> segments) {
        List<Embedding> embeddings = segments.stream()
                .map(segment -> embeddingModel.embed(segment.text()).content())
                .collect(Collectors.toList());
        embeddingStore.addAll(embeddings, segments);
    }
}
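To see how the pieces fit together, here is a minimal end-to-end sketch that loads a PDF, splits it, and uses the VectorStorageService above to index and query the segments. The file path, query string, and the IngestAndQueryExample class name are illustrative assumptions, not part of the library.

import java.nio.file.Paths;
import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentSplitter;
import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;
import dev.langchain4j.data.document.parser.apache.pdfbox.ApachePdfBoxDocumentParser;
import dev.langchain4j.data.document.splitter.DocumentSplitters;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.store.embedding.EmbeddingMatch;

public class IngestAndQueryExample {

    public static void main(String[] args) {
        // 1. Load a single PDF document (path is illustrative)
        Document document = FileSystemDocumentLoader.loadDocument(
                Paths.get("manuals/product_manual.pdf"),
                new ApachePdfBoxDocumentParser());

        // 2. Split it into overlapping segments
        DocumentSplitter splitter = DocumentSplitters.recursive(500, 50);
        List<TextSegment> segments = splitter.split(document);

        // 3. Embed and store the segments using the service defined above
        VectorStorageService storage = new VectorStorageService();
        storage.addDocuments(segments);

        // 4. Retrieve the segments most relevant to a query (query text is illustrative)
        List<EmbeddingMatch<TextSegment>> matches =
                storage.findRelevantDocuments("How do I reset the device?", 3);
        matches.forEach(match ->
                System.out.println(match.score() + " -> " + match.embedded().text()));
    }
}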
Performance Optimization
1. Batch Processing Optimization
import java.util.List;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentSplitter;
import dev.langchain4j.data.document.splitter.DocumentSplitters;
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingStore;

public class OptimizedDocumentProcessor {

    private final EmbeddingModel embeddingModel;
    private final EmbeddingStore<TextSegment> embeddingStore;
    private final int batchSize;

    public OptimizedDocumentProcessor(EmbeddingModel embeddingModel,
                                      EmbeddingStore<TextSegment> embeddingStore,
                                      int batchSize) {
        this.embeddingModel = embeddingModel;
        this.embeddingStore = embeddingStore;
        this.batchSize = batchSize;
    }

    public void processDocumentsInBatches(List<Document> documents) {
        // Guava's Lists.partition splits the list into batches of batchSize
        List<List<Document>> batches = Lists.partition(documents, batchSize);
        for (List<Document> batch : batches) {
            processBatch(batch);
        }
    }

    private void processBatch(List<Document> batch) {
        // Split the documents of one batch in parallel
        List<TextSegment> allSegments = batch.parallelStream()
                .flatMap(doc -> {
                    DocumentSplitter splitter = DocumentSplitters.recursive(500, 50);
                    return splitter.split(doc).stream();
                })
                .collect(Collectors.toList());

        // Generate the embedding vectors in bulk
        List<Embedding> embeddings = embeddingModel.embedAll(allSegments).content();

        // Store them in the vector store in bulk
        embeddingStore.addAll(embeddings, allSegments);
    }
}
2. Memory Management
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentSplitter;
import dev.langchain4j.data.document.splitter.DocumentSplitters;
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingStore;

public class MemoryEfficientProcessor {

    private final EmbeddingModel embeddingModel;
    private final EmbeddingStore<TextSegment> embeddingStore;

    public MemoryEfficientProcessor(EmbeddingModel embeddingModel,
                                    EmbeddingStore<TextSegment> embeddingStore) {
        this.embeddingModel = embeddingModel;
        this.embeddingStore = embeddingStore;
    }

    public void processLargeDocument(String filePath) {
        // Read large files in chunks to avoid running out of memory
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(filePath))) {
            StringBuilder chunk = new StringBuilder();
            String line;
            int linesProcessed = 0;
            final int chunkSize = 1000;  // process every 1000 lines

            while ((line = reader.readLine()) != null) {
                chunk.append(line).append("\n");
                linesProcessed++;

                if (linesProcessed >= chunkSize) {
                    processChunk(chunk.toString());
                    chunk.setLength(0);  // clear the StringBuilder
                    linesProcessed = 0;
                }
            }

            // Process any remaining content
            if (chunk.length() > 0) {
                processChunk(chunk.toString());
            }
        } catch (IOException e) {
            throw new RuntimeException("Error processing large document", e);
        }
    }

    private void processChunk(String chunk) {
        Document doc = Document.from(chunk);
        DocumentSplitter splitter = DocumentSplitters.recursive(500, 50);
        List<TextSegment> segments = splitter.split(doc);

        // Generate embeddings and store them
        for (TextSegment segment : segments) {
            Embedding embedding = embeddingModel.embed(segment.text()).content();
            embeddingStore.add(embedding, segment);
        }
    }
}
Error Handling and Monitoring
1. Exception Handling
import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentSplitter;
import dev.langchain4j.data.document.splitter.DocumentSplitters;
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingStore;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RobustDocumentProcessor {

    private static final int MAX_DOCUMENT_SIZE = 1_000_000;  // example limit, in characters

    private final EmbeddingModel embeddingModel;
    private final EmbeddingStore<TextSegment> embeddingStore;

    public RobustDocumentProcessor(EmbeddingModel embeddingModel,
                                   EmbeddingStore<TextSegment> embeddingStore) {
        this.embeddingModel = embeddingModel;
        this.embeddingStore = embeddingStore;
    }

    public ProcessResult processDocumentSafely(Document document) {
        try {
            // Validate the document
            if (document == null || document.text().trim().isEmpty()) {
                return ProcessResult.failure("Document is empty or null");
            }

            // Check the document size
            if (document.text().length() > MAX_DOCUMENT_SIZE) {
                return ProcessResult.failure("Document exceeds maximum size limit");
            }

            // Split the document
            DocumentSplitter splitter = DocumentSplitters.recursive(500, 50);
            List<TextSegment> segments = splitter.split(document);

            // Embed the segments and store them in the vector store
            List<Embedding> embeddings = embeddingModel.embedAll(segments).content();
            embeddingStore.addAll(embeddings, segments);

            return ProcessResult.success(segments.size());
        } catch (Exception e) {
            log.error("Error processing document", e);
            return ProcessResult.failure("Processing error: " + e.getMessage());
        }
    }

    private static class ProcessResult {
        private final boolean success;
        private final String message;
        private final int processedCount;

        private ProcessResult(boolean success, String message, int processedCount) {
            this.success = success;
            this.message = message;
            this.processedCount = processedCount;
        }

        public static ProcessResult success(int count) {
            return new ProcessResult(true, "Success", count);
        }

        public static ProcessResult failure(String message) {
            return new ProcessResult(false, message, 0);
        }
    }
}
2. Processing Metrics
import org.springframework.stereotype.Component;

import dev.langchain4j.data.document.Document;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

@Component
public class ProcessingMetrics {

    private final MeterRegistry meterRegistry;
    private final Timer documentProcessingTimer;
    private final Counter documentsProcessedCounter;
    private final Gauge averageDocumentSizeGauge;

    private volatile double averageDocumentSize = 0;
    private volatile int documentCount = 0;

    public ProcessingMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.documentProcessingTimer = Timer.builder("document.processing.duration")
                .description("Time taken to process documents")
                .register(meterRegistry);
        this.documentsProcessedCounter = Counter.builder("documents.processed")
                .description("Number of documents processed")
                .register(meterRegistry);
        this.averageDocumentSizeGauge = Gauge
                .builder("document.average.size", this, ProcessingMetrics::getAverageDocumentSize)
                .description("Average size of processed documents")
                .register(meterRegistry);
    }

    public void recordProcessing(Document document) {
        Timer.Sample sample = Timer.start(meterRegistry);

        // Process the document
        processDocument(document);

        // Record the metrics
        sample.stop(documentProcessingTimer);
        documentsProcessedCounter.increment();
        updateAverageDocumentSize(document.text().length());
    }

    private void processDocument(Document document) {
        // Actual processing pipeline (splitting, embedding, storage) goes here
    }

    private synchronized void updateAverageDocumentSize(int size) {
        documentCount++;
        averageDocumentSize = ((averageDocumentSize * (documentCount - 1)) + size) / documentCount;
    }

    public double getAverageDocumentSize() {
        return averageDocumentSize;
    }
}
Summary
Document loading, splitting, and vectorization are the foundation of a high-quality RAG system. Choosing suitable document loaders, splitting strategies, and embedding models is critical to overall performance, and with sensible batching, memory management, and error handling you can build an efficient, reliable document processing pipeline.
夜雨聆风
