4. LangChain4j Advanced: Document Loading, Splitting, and Vectorization in Practice

Overview

When building a RAG (retrieval-augmented generation) system, document processing is the critical first step. This article takes a close look at how to use LangChain4j to load documents of various types, including PDF, Word, and Excel, and how to split and vectorize the text effectively.
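
Before examining each stage in isolation, it helps to see how they fit together. The sketch below chains load, split, embed, and store in a few lines using LangChain4j's EmbeddingStoreIngestor; the file path is a placeholder and the in-memory store is for demonstration only. The rest of the article unpacks and customizes each of these stages.

import java.nio.file.Paths;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;
import dev.langchain4j.data.document.splitter.DocumentSplitters;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.onnx.allminilml6v2q.AllMiniLmL6V2QuantizedEmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingStoreIngestor;
import dev.langchain4j.store.embedding.inmemory.InMemoryEmbeddingStore;

public class RagIngestionSketch {

    public static void main(String[] args) {
        // 1. Load (default parser; a specific parser can be passed explicitly)
        Document document = FileSystemDocumentLoader.loadDocument(Paths.get("docs/sample.txt"));

        // 2-4. Split, embed, and store in a single ingestor
        InMemoryEmbeddingStore<TextSegment> store = new InMemoryEmbeddingStore<>();
        EmbeddingStoreIngestor.builder()
                .documentSplitter(DocumentSplitters.recursive(500, 50))
                .embeddingModel(new AllMiniLmL6V2QuantizedEmbeddingModel())
                .embeddingStore(store)
                .build()
                .ingest(document);
    }
}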

Document Loaders in Detail

1. Loading PDF Documents

import java.nio.file.Paths;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;
import dev.langchain4j.data.document.parser.apache.pdfbox.ApachePdfBoxDocumentParser;

public class PDFLoaderExample {

    public static void main(String[] args) {
        // Parse the PDF with Apache PDFBox
        Document pdfDoc = FileSystemDocumentLoader.loadDocument(
                Paths.get("manuals/product_manual.pdf"),
                new ApachePdfBoxDocumentParser()
        );

        System.out.println("PDF content: " + pdfDoc.text());
        System.out.println("Metadata: " + pdfDoc.metadata());
    }
}

2. Loading Word Documents

import java.nio.file.Paths;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;
import dev.langchain4j.data.document.parser.apache.poi.ApachePoiDocumentParser;

public class WordLoaderExample {

    public Document loadWordDocument(String filePath) {
        return FileSystemDocumentLoader.loadDocument(
                Paths.get(filePath),
                new ApachePoiDocumentParser()  // handles both .doc and .docx
        );
    }
}

3. Batch Loading Multiple Formats

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentParser;
import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;
import dev.langchain4j.data.document.parser.TextDocumentParser;
import dev.langchain4j.data.document.parser.apache.pdfbox.ApachePdfBoxDocumentParser;
import dev.langchain4j.data.document.parser.apache.poi.ApachePoiDocumentParser;
import dev.langchain4j.data.document.parser.apache.tika.ApacheTikaDocumentParser;

public class MultiFormatLoader {

    public List<Document> loadDocumentsFromDirectory(String directoryPath) {
        Path dir = Paths.get(directoryPath);

        // File extensions we know how to parse
        Set<String> supportedExtensions = Set.of(".pdf", ".doc", ".docx", ".txt", ".html");

        List<Document> documents = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.filter(Files::isRegularFile)
                 .filter(path -> supportedExtensions.contains(getExtension(path)))
                 .forEach(path -> {
                     DocumentParser parser = getParserForExtension(getExtension(path));
                     Document doc = FileSystemDocumentLoader.loadDocument(path, parser);
                     documents.add(doc);
                 });
        } catch (IOException e) {
            throw new RuntimeException("Error loading documents", e);
        }
        return documents;
    }

    private String getExtension(Path path) {
        String fileName = path.getFileName().toString();
        int dotIndex = fileName.lastIndexOf('.');
        return dotIndex > 0 ? fileName.substring(dotIndex).toLowerCase() : "";
    }

    private DocumentParser getParserForExtension(String extension) {
        switch (extension) {
            case ".pdf":
                return new ApachePdfBoxDocumentParser();
            case ".doc":
            case ".docx":
                return new ApachePoiDocumentParser();
            case ".html":
                return new ApacheTikaDocumentParser();
            default:
                return new TextDocumentParser();
        }
    }
}
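
When per-extension parser control is not needed, recent LangChain4j releases also provide directory-level convenience loaders. A minimal sketch; the "docs" path is a placeholder, and the default parser is resolved from whatever parser module is on the classpath (falling back to plain text):

import java.nio.file.Paths;
import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;

public class DirectoryLoaderSketch {

    public static void main(String[] args) {
        // Loads every regular file in the directory with the default parser
        List<Document> docs = FileSystemDocumentLoader.loadDocuments(Paths.get("docs"));

        // Recursive variant that also descends into subdirectories
        List<Document> all = FileSystemDocumentLoader.loadDocumentsRecursively(Paths.get("docs"));

        System.out.println(docs.size() + " top-level, " + all.size() + " total documents loaded");
    }
}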

Text Splitting Strategies

1. Recursive Character Splitter

import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentSplitter;
import dev.langchain4j.data.document.splitter.DocumentSplitters;
import dev.langchain4j.data.segment.TextSegment;

public class RecursiveSplitterExample {

    public List<TextSegment> splitDocument(Document document) {
        // Recursive splitting: by paragraph first, then sentence, then character
        DocumentSplitter splitter = DocumentSplitters.recursive(
                500,  // maximum segment length
                50    // overlap in characters
        );
        return splitter.split(document);
    }
}

2. Custom Splitter

import java.util.ArrayList;
import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentSplitter;
import dev.langchain4j.data.segment.TextSegment;

public class CustomDocumentSplitter implements DocumentSplitter {

    private final int maxLength;
    private final int overlap;

    public CustomDocumentSplitter(int maxLength, int overlap) {
        this.maxLength = maxLength;
        this.overlap = overlap;
    }

    @Override
    public List<TextSegment> split(Document document) {
        String text = document.text();
        List<TextSegment> segments = new ArrayList<>();

        // Split on chapter headings such as "\n1. " using a lookahead,
        // so each heading stays attached to its chapter
        String[] chapters = text.split("(?=\\n\\d+\\.\\s)");

        for (String chapter : chapters) {
            if (chapter.trim().isEmpty()) continue;

            // Re-split chapters that are still too long
            if (chapter.length() > maxLength) {
                segments.addAll(splitByLength(chapter));
            } else {
                segments.add(TextSegment.from(chapter, document.metadata()));
            }
        }
        return segments;
    }

    private List<TextSegment> splitByLength(String text) {
        List<TextSegment> segments = new ArrayList<>();
        int start = 0;
        while (start < text.length()) {
            int end = Math.min(start + maxLength, text.length());
            String segmentText = text.substring(start, end);

            // Avoid cutting in the middle of a word
            if (end < text.length()) {
                int lastSpace = segmentText.lastIndexOf(' ');
                if (lastSpace > 0) {
                    end = start + lastSpace;
                    segmentText = text.substring(start, end);
                }
            }
            segments.add(TextSegment.from(segmentText));

            if (end == text.length()) {
                break; // reached the end of the text
            }
            // Step back by the overlap, but always make forward progress
            start = Math.max(end - overlap, start + 1);
        }
        return segments;
    }
}

3. Semantics-Aware Splitting

import java.util.ArrayList;
import java.util.List;

import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.model.embedding.onnx.allminilml6v2q.AllMiniLmL6V2QuantizedEmbeddingModel;

public class SemanticSplitter {

    private final EmbeddingModel embeddingModel;

    public SemanticSplitter() {
        this.embeddingModel = new AllMiniLmL6V2QuantizedEmbeddingModel();
    }

    public List<TextSegment> semanticSplit(List<TextSegment> initialSegments) {
        List<TextSegment> refinedSegments = new ArrayList<>();

        for (int i = 0; i < initialSegments.size(); i++) {
            TextSegment current = initialSegments.get(i);

            if (i < initialSegments.size() - 1) {
                TextSegment next = initialSegments.get(i + 1);

                // Compare the semantic similarity of adjacent segments
                Embedding currentEmbedding = embeddingModel.embed(current.text()).content();
                Embedding nextEmbedding = embeddingModel.embed(next.text()).content();
                double similarity = cosineSimilarity(currentEmbedding.vector(), nextEmbedding.vector());

                // Low similarity suggests a topic change, so keep the split
                if (similarity < 0.3) {
                    refinedSegments.add(current);
                } else {
                    // Merge semantically similar segments
                    String mergedText = current.text() + " " + next.text();
                    refinedSegments.add(TextSegment.from(mergedText));
                    i++; // skip the next segment, it has already been merged
                }
            } else {
                refinedSegments.add(current);
            }
        }
        return refinedSegments;
    }

    private double cosineSimilarity(float[] vec1, float[] vec2) {
        double dotProduct = 0.0;
        double norm1 = 0.0;
        double norm2 = 0.0;
        for (int i = 0; i < vec1.length; i++) {
            dotProduct += vec1[i] * vec2[i];
            norm1 += vec1[i] * vec1[i];
            norm2 += vec2[i] * vec2[i];
        }
        return dotProduct / (Math.sqrt(norm1) * Math.sqrt(norm2));
    }
}
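
A natural way to use this splitter is as a refinement pass over an initial mechanical split. A minimal sketch; the 200/0 splitter parameters and the sample file path are illustrative only:

import java.nio.file.Paths;
import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;
import dev.langchain4j.data.document.splitter.DocumentSplitters;
import dev.langchain4j.data.segment.TextSegment;

public class SemanticSplitDemo {

    public static void main(String[] args) {
        Document doc = FileSystemDocumentLoader.loadDocument(Paths.get("docs/sample.txt"));

        // First pass: small mechanical segments with no overlap
        List<TextSegment> rough = DocumentSplitters.recursive(200, 0).split(doc);

        // Second pass: merge adjacent segments that are semantically similar
        List<TextSegment> refined = new SemanticSplitter().semanticSplit(rough);
        System.out.println(rough.size() + " rough -> " + refined.size() + " refined segments");
    }
}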

Vectorization and Storage

1. Choosing an Embedding Model

import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.model.embedding.onnx.allminilml6v2q.AllMiniLmL6V2QuantizedEmbeddingModel;

public class EmbeddingModelSelector {

    public enum EmbeddingType {
        ALL_MINI_LM_L6_V2_Q,  // lightweight, suits resource-constrained environments
        BGE_SMALL_ZH,         // optimized for Chinese text
        MPNET_BASE_V2         // higher accuracy, higher resource usage
    }

    public EmbeddingModel selectModel(EmbeddingType type) {
        switch (type) {
            case ALL_MINI_LM_L6_V2_Q:
                return new AllMiniLmL6V2QuantizedEmbeddingModel();
            case BGE_SMALL_ZH:
                // requires a matching Chinese-optimized model implementation
                return createChineseOptimizedModel();
            case MPNET_BASE_V2:
                // requires a matching high-accuracy model implementation
                return createHighPrecisionModel();
            default:
                return new AllMiniLmL6V2QuantizedEmbeddingModel();
        }
    }

    private EmbeddingModel createChineseOptimizedModel() {
        // Placeholder: plug in a Chinese-optimized embedding model here
        return new AllMiniLmL6V2QuantizedEmbeddingModel();
    }

    private EmbeddingModel createHighPrecisionModel() {
        // Placeholder: plug in a high-accuracy embedding model here
        return new AllMiniLmL6V2QuantizedEmbeddingModel();
    }
}
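
Whichever model you choose, verify its vector dimension up front, because the vector store must be configured for the same size. A quick sanity check (all-MiniLM-L6-v2 is expected to produce 384-dimensional vectors):

import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.model.embedding.onnx.allminilml6v2q.AllMiniLmL6V2QuantizedEmbeddingModel;

public class EmbeddingDimensionCheck {

    public static void main(String[] args) {
        EmbeddingModel model = new AllMiniLmL6V2QuantizedEmbeddingModel();

        // Embed a sample sentence and inspect the resulting vector
        Embedding embedding = model.embed("LangChain4j embedding sanity check").content();
        System.out.println("Dimension: " + embedding.vector().length); // 384 for all-MiniLM-L6-v2
    }
}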

2. Implementing Vector Storage

import java.util.List;

import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.model.embedding.onnx.allminilml6v2q.AllMiniLmL6V2QuantizedEmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingMatch;
import dev.langchain4j.store.embedding.EmbeddingStore;
import dev.langchain4j.store.embedding.inmemory.InMemoryEmbeddingStore;

public class VectorStorageService {

    private final EmbeddingModel embeddingModel;
    private final EmbeddingStore<TextSegment> embeddingStore;

    public VectorStorageService() {
        this.embeddingModel = new AllMiniLmL6V2QuantizedEmbeddingModel();
        this.embeddingStore = new InMemoryEmbeddingStore<>();
    }

    // Add document segments to the vector store one by one
    public void addDocuments(List<TextSegment> segments) {
        for (TextSegment segment : segments) {
            Embedding embedding = embeddingModel.embed(segment.text()).content();
            embeddingStore.add(embedding, segment);
        }
    }

    // Retrieve the most relevant segments for a query
    public List<EmbeddingMatch<TextSegment>> findRelevantDocuments(String query, int maxResults) {
        Embedding queryEmbedding = embeddingModel.embed(query).content();
        return embeddingStore.findRelevant(queryEmbedding, maxResults);
    }

    // Batch insert (suitable for large volumes of data)
    public void addDocumentsBatch(List<TextSegment> segments) {
        // embedAll takes the segments directly and embeds them in one call
        List<Embedding> embeddings = embeddingModel.embedAll(segments).content();
        embeddingStore.addAll(embeddings, segments);
    }
}
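
A minimal sketch of how the service above might be exercised; the sample segments and the query are illustrative only:

import java.util.List;

import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.store.embedding.EmbeddingMatch;

public class VectorStorageDemo {

    public static void main(String[] args) {
        VectorStorageService service = new VectorStorageService();

        service.addDocuments(List.of(
                TextSegment.from("LangChain4j supports PDF, Word, and HTML parsers."),
                TextSegment.from("Vector stores enable semantic retrieval for RAG.")
        ));

        List<EmbeddingMatch<TextSegment>> matches =
                service.findRelevantDocuments("Which file formats can be parsed?", 2);

        // Each match carries a similarity score alongside the stored segment
        for (EmbeddingMatch<TextSegment> match : matches) {
            System.out.printf("score=%.3f text=%s%n", match.score(), match.embedded().text());
        }
    }
}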

Performance Optimization

1. Batch Processing Optimization

import java.util.List;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentSplitter;
import dev.langchain4j.data.document.splitter.DocumentSplitters;
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingStore;

public class OptimizedDocumentProcessor {

    private final EmbeddingModel embeddingModel;
    private final EmbeddingStore<TextSegment> embeddingStore;
    private final int batchSize;

    public OptimizedDocumentProcessor(EmbeddingModel embeddingModel,
                                      EmbeddingStore<TextSegment> embeddingStore,
                                      int batchSize) {
        this.embeddingModel = embeddingModel;
        this.embeddingStore = embeddingStore;
        this.batchSize = batchSize;
    }

    public void processDocumentsInBatches(List<Document> documents) {
        // Guava's Lists.partition splits the list into fixed-size batches
        List<List<Document>> batches = Lists.partition(documents, batchSize);
        for (List<Document> batch : batches) {
            processBatch(batch);
        }
    }

    private void processBatch(List<Document> batch) {
        // Split the documents in this batch in parallel
        List<TextSegment> allSegments = batch.parallelStream()
                .flatMap(doc -> {
                    DocumentSplitter splitter = DocumentSplitters.recursive(500, 50);
                    return splitter.split(doc).stream();
                })
                .collect(Collectors.toList());

        // Generate embeddings for the whole batch in one call
        List<Embedding> embeddings = embeddingModel.embedAll(allSegments).content();

        // Store the batch in the vector store
        embeddingStore.addAll(embeddings, allSegments);
    }
}
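
Wiring the processor together might look like the following sketch; the batch size of 16 and the in-memory store are arbitrary demonstration choices (note that the constructor above takes the embedding store explicitly):

import java.nio.file.Paths;

import dev.langchain4j.data.document.loader.FileSystemDocumentLoader;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.model.embedding.onnx.allminilml6v2q.AllMiniLmL6V2QuantizedEmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingStore;
import dev.langchain4j.store.embedding.inmemory.InMemoryEmbeddingStore;

public class BatchProcessingDemo {

    public static void main(String[] args) {
        EmbeddingModel model = new AllMiniLmL6V2QuantizedEmbeddingModel();
        EmbeddingStore<TextSegment> store = new InMemoryEmbeddingStore<>();

        OptimizedDocumentProcessor processor = new OptimizedDocumentProcessor(model, store, 16);
        processor.processDocumentsInBatches(FileSystemDocumentLoader.loadDocuments(Paths.get("docs")));
    }
}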

2. Memory Management

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentSplitter;
import dev.langchain4j.data.document.splitter.DocumentSplitters;
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingStore;

public class MemoryEfficientProcessor {

    private final EmbeddingModel embeddingModel;
    private final EmbeddingStore<TextSegment> embeddingStore;

    public MemoryEfficientProcessor(EmbeddingModel embeddingModel,
                                    EmbeddingStore<TextSegment> embeddingStore) {
        this.embeddingModel = embeddingModel;
        this.embeddingStore = embeddingStore;
    }

    public void processLargeDocument(String filePath) {
        // Read the file in chunks to avoid running out of memory
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(filePath))) {
            StringBuilder chunk = new StringBuilder();
            String line;
            int linesProcessed = 0;
            final int chunkSize = 1000; // process every 1000 lines

            while ((line = reader.readLine()) != null) {
                chunk.append(line).append("\n");
                linesProcessed++;

                if (linesProcessed >= chunkSize) {
                    processChunk(chunk.toString());
                    chunk.setLength(0); // reset the StringBuilder
                    linesProcessed = 0;
                }
            }

            // Process any remaining content
            if (chunk.length() > 0) {
                processChunk(chunk.toString());
            }
        } catch (IOException e) {
            throw new RuntimeException("Error processing large document", e);
        }
    }

    private void processChunk(String chunk) {
        Document doc = Document.from(chunk);
        DocumentSplitter splitter = DocumentSplitters.recursive(500, 50);
        List<TextSegment> segments = splitter.split(doc);

        // Embed and store each segment
        for (TextSegment segment : segments) {
            Embedding embedding = embeddingModel.embed(segment.text()).content();
            embeddingStore.add(embedding, segment);
        }
    }
}

Error Handling and Monitoring

1. Exception Handling

import java.util.List;

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.document.DocumentSplitter;
import dev.langchain4j.data.document.splitter.DocumentSplitters;
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingStore;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RobustDocumentProcessor {

    // Maximum accepted document size in characters; tune for your workload
    private static final int MAX_DOCUMENT_SIZE = 10_000_000;

    private final EmbeddingModel embeddingModel;
    private final EmbeddingStore<TextSegment> embeddingStore;

    public RobustDocumentProcessor(EmbeddingModel embeddingModel,
                                   EmbeddingStore<TextSegment> embeddingStore) {
        this.embeddingModel = embeddingModel;
        this.embeddingStore = embeddingStore;
    }

    public ProcessResult processDocumentSafely(Document document) {
        try {
            // Validate the document
            if (document == null || document.text().trim().isEmpty()) {
                return ProcessResult.failure("Document is empty or null");
            }

            // Enforce the size limit
            if (document.text().length() > MAX_DOCUMENT_SIZE) {
                return ProcessResult.failure("Document exceeds maximum size limit");
            }

            // Split, embed, and store
            DocumentSplitter splitter = DocumentSplitters.recursive(500, 50);
            List<TextSegment> segments = splitter.split(document);

            List<Embedding> embeddings = embeddingModel.embedAll(segments).content();
            embeddingStore.addAll(embeddings, segments);

            return ProcessResult.success(segments.size());
        } catch (Exception e) {
            log.error("Error processing document", e);
            return ProcessResult.failure("Processing error: " + e.getMessage());
        }
    }

    public static class ProcessResult {

        private final boolean success;
        private final String message;
        private final int processedCount;

        private ProcessResult(boolean success, String message, int processedCount) {
            this.success = success;
            this.message = message;
            this.processedCount = processedCount;
        }

        public static ProcessResult success(int count) {
            return new ProcessResult(true, "Success", count);
        }

        public static ProcessResult failure(String message) {
            return new ProcessResult(false, message, 0);
        }
    }
}

2. Monitoring Processing Metrics

import dev.langchain4j.data.document.Document;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;

@Component
public class ProcessingMetrics {

    private final MeterRegistry meterRegistry;
    private final Timer documentProcessingTimer;
    private final Counter documentsProcessedCounter;
    private final Gauge averageDocumentSizeGauge;

    private volatile double averageDocumentSize = 0;
    private volatile int documentCount = 0;

    public ProcessingMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;

        this.documentProcessingTimer = Timer.builder("document.processing.duration")
                .description("Time taken to process documents")
                .register(meterRegistry);

        this.documentsProcessedCounter = Counter.builder("documents.processed")
                .description("Number of documents processed")
                .register(meterRegistry);

        // Gauge.builder takes the observed object and a value-extractor function
        this.averageDocumentSizeGauge = Gauge
                .builder("document.average.size", this, ProcessingMetrics::getAverageDocumentSize)
                .description("Average size of processed documents")
                .register(meterRegistry);
    }

    public void recordProcessing(Document document) {
        Timer.Sample sample = Timer.start(meterRegistry);

        // Process the document
        processDocument(document);

        // Record the metrics
        sample.stop(documentProcessingTimer);
        documentsProcessedCounter.increment();
        updateAverageDocumentSize(document.text().length());
    }

    private void processDocument(Document document) {
        // Hook for the actual processing pipeline (split, embed, store)
    }

    private synchronized void updateAverageDocumentSize(int size) {
        documentCount++;
        averageDocumentSize = ((averageDocumentSize * (documentCount - 1)) + size) / documentCount;
    }

    public double getAverageDocumentSize() {
        return averageDocumentSize;
    }
}

Summary

Document loading, splitting, and vectorization are the foundation of a high-quality RAG system. Choosing the right document loaders, splitting strategy, and embedding model is critical to system performance. With sensible batching, memory management, and error handling, you can build an efficient and reliable document processing pipeline.
