乐于分享
好东西不私藏

RAGFlow源码解析-6、Task Executor与高级RAG特性深度解析(第四周)

RAGFlow源码解析-6、Task Executor与高级RAG特性深度解析(第四周)

一、Task Executor核心架构

1.1 全局配置与信号处理

源码位置rag/svr/task_executor.py:82-140

BATCH_SIZE = 64  # 第82行:批处理大小# 第84-101行:解析器工厂映射表FACTORY = {    "general": naive,    ParserType.NAIVE.value: naive,    ParserType.PAPER.value: paper,    ParserType.BOOK.value: book,    ParserType.PRESENTATION.value: presentation,    ParserType.MANUAL.value: manual,    ParserType.LAWS.value: laws,    ParserType.QA.value: qa,    ParserType.TABLE.value: table,    ParserType.RESUME.value: resume,    ParserType.PICTURE.value: picture,    ParserType.ONE.value: one,    ParserType.AUDIO.value: audio,    ParserType.EMAIL.value: email,    ParserType.KG.value: naive,  # 知识图谱使用naive解析器    ParserType.TAG.value: tag}# 第103-109行:任务类型到Pipeline任务类型映射TASK_TYPE_TO_PIPELINE_TASK_TYPE = {    "dataflow": PipelineTaskType.PARSE,    "raptor": PipelineTaskType.RAPTOR,    "graphrag": PipelineTaskType.GRAPH_RAG,    "mindmap": PipelineTaskType.MINDMAP,    "memory": PipelineTaskType.MEMORY,}# 第113-119行:消费者状态全局变量CONSUMER_NO = "0" if len(sys.argv) < 2 else sys.argv[1]  # 第113行:消费者编号CONSUMER_NAME = "task_executor_" + CONSUMER_NO  # 第114行:消费者名称BOOT_AT = datetime.now().astimezone().isoformat(timespec="milliseconds")  # 第115行:启动时间PENDING_TASKS = 0  # 第116行:待处理任务数LAG_TASKS = 0  # 第117行:延迟任务数DONE_TASKS = 0  # 第118行:已完成任务数FAILED_TASKS = 0  # 第119行:失败任务数# 第121行:当前执行任务字典CURRENT_TASKS = {}# 第123-131行:并发控制信号量MAX_CONCURRENT_TASKS = int(os.environ.get('MAX_CONCURRENT_TASKS', "5"))  # 第123行:最大并发任务数MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get('MAX_CONCURRENT_CHUNK_BUILDERS', "1"))  # 第124行MAX_CONCURRENT_MINIO = int(os.environ.get('MAX_CONCURRENT_MINIO', '10'))  # 第125行task_limiter = asyncio.Semaphore(MAX_CONCURRENT_TASKS)  # 第126行:任务并发限制chunk_limiter = asyncio.Semaphore(MAX_CONCURRENT_CHUNK_BUILDERS)  # 第127行:chunk构建限制embed_limiter = asyncio.Semaphore(MAX_CONCURRENT_CHUNK_BUILDERS)  # 第128行:embedding限制minio_limiter = asyncio.Semaphore(MAX_CONCURRENT_MINIO)  # 第129行:MinIO操作限制kg_limiter = asyncio.Semaphore(2)  # 第130行:知识图谱限制(固定为2)WORKER_HEARTBEAT_TIMEOUT = int(os.environ.get('WORKER_HEARTBEAT_TIMEOUT', '120'))  # 第131行stop_event = threading.Event()  # 第132行:停止事件标志# 第135-139行:信号处理器def signal_handler(sig, frame):    logging.info("Received interrupt signal, shutting down...")    stop_event.set()  # 第137行:设置停止标志    time.sleep(1)    sys.exit(0)  # 第139行:优雅退出

并发控制架构图

关键技术点

  1. 解析器工厂模式
    (第84-101行):FACTORY字典将ParserType枚举映射到具体解析器模块,支持动态扩展新解析器
  2. 多级并发控制
    (第126-130行):不同资源使用不同信号量,防止资源竞争(如MinIO限制10并发,KG限制2并发)
  3. 消费者命名机制
    (第113-114行):支持多消费者并行处理,通过命令行参数sys.argv[1]区分不同消费者实例
  4. 优雅退出机制
    (第135-139行):signal_handler捕获中断信号,设置stop_event通知所有线程停止

1.2 进度回调与任务取消检测

源码位置rag/svr/task_executor.py:142-174

def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing..."):    try:        # 第144-145行:负进度表示错误        if prog is not None and prog < 0:            msg = "[ERROR]" + msg        # 第146-150行:检测任务是否取消        cancel = has_canceled(task_id)        if cancel:            msg += " [Canceled]"            prog = -1        # 第152-157行:构建进度消息        if to_page > 0:            if msg:                if from_page < to_page:                    msg = f"Page({from_page + 1}~{to_page + 1}): " + msg        if msg:            msg = datetime.now().strftime("%H:%M:%S") + " " + msg        # 第158-162行:更新数据库进度        d = {"progress_msg": msg}        if prog is not None:            d["progress"] = prog        TaskService.update_progress(task_id, d)        # 第164-166行:关闭数据库连接并检测取消        close_connection()        if cancel:            raise TaskCanceledException(msg)  # 第166行:抛出取消异常        logging.info(f"set_progress({task_id}), progress: {prog}, progress_msg: {msg}")    except TaskCanceledException:        raise  # 第169行:重新抛出取消异常    except DoesNotExist:        logging.warning(f"set_progress({task_id}) got exception DoesNotExist")  # 第171行:任务不存在警告    except Exception as e:        logging.exception(f"set_progress({task_id}), progress: {prog}, progress_msg: {msg}, got exception: {e}")

进度回调流程图

渲染错误: Mermaid 渲染失败: Parse error on line 3: ...  B -->|是| C[msg = "[ERROR]" + msg]     -----------------------^ Expecting 'SQE', 'DOUBLECIRCLEEND', 'PE', '-)', 'STADIUMEND', 'SUBROUTINEEND', 'PIPE', 'CYLINDEREND', 'DIAMOND_STOP', 'TAGEND', 'TRAPEND', 'INVTRAPEND', 'UNICODE_TEXT', 'TEXT', 'TAGSTART', got 'STR'

设计意图分析

  1. 双取消检测
    (第146、164-166行):在进度更新前后都检测取消状态,确保任务及时终止
  2. 进度消息格式化
    (第152-157行):Page(from~to)格式便于追踪多页文档处理进度
  3. 异常分类处理
    (第168-173行):TaskCanceledException重新抛出,DoesNotExist仅警告,其他异常记录完整堆栈

1.3 Redis任务收集机制

源码位置rag/svr/task_executor.py:176-237

async def collect():    global CONSUMER_NAME, DONE_TASKS, FAILED_TASKS    global UNACKED_ITERATOR    # 第180行:获取服务队列名称列表    svr_queue_names = settings.get_svr_queue_names()    redis_msg = None    try:        # 第183-184行:获取未确认消息迭代器        if not UNACKED_ITERATOR:            UNACKED_ITERATOR = REDIS_CONN.get_unacked_iterator(                svr_queue_names,                 SVR_CONSUMER_GROUP_NAME,                 CONSUMER_NAME            )        try:            # 第186行:尝试从迭代器获取下一条消息            redis_msg = next(UNACKED_ITERATOR)        except StopIteration:            # 第188-191行:迭代器耗尽,从队列消费者获取新消息            for svr_queue_name in svr_queue_names:                redis_msg = REDIS_CONN.queue_consumer(                    svr_queue_name,                     SVR_CONSUMER_GROUP_NAME,                     CONSUMER_NAME                )                if redis_msg:                    break    except Exception as e:        logging.exception(f"collect got exception: {e}")        return NoneNone  # 第194行:异常时返回None    # 第196-202行:消息为空或内容为空,确认并返回    if not redis_msg:        return NoneNone    msg = redis_msg.get_message()    if not msg:        logging.error(f"collect got empty message of {redis_msg.get_msg_id()}")        redis_msg.ack()  # 第201行:确认空消息,防止重复处理        return NoneNone    # 第204-216行:根据doc_id和task_type获取任务详情    canceled = False    if msg.get("doc_id"""in [GRAPH_RAPTOR_FAKE_DOC_ID, CANVAS_DEBUG_DOC_ID]:        task = msg        if task["task_type"in PIPELINE_SPECIAL_PROGRESS_FREEZE_TASK_TYPES:            task = TaskService.get_task(msg["id"], msg["doc_ids"])            if task:                task["doc_id"] = msg["doc_id"]                task["doc_ids"] = msg.get("doc_ids", []) or []    elif msg.get("task_type") == PipelineTaskType.MEMORY.lower():        _, task_obj = TaskService.get_by_id(msg["id"])        task = task_obj.to_dict()    else:        task = TaskService.get_task(msg["id"])  # 第216行:标准任务获取    # 第218-225行:检测任务是否取消或不存在    if task:        canceled = has_canceled(task["id"])    if not task or canceled:        state = "is unknown" if not task else "has been cancelled"        FAILED_TASKS += 1        logging.warning(f"collect task {msg['id']}{state}")        redis_msg.ack()  # 第224行:确认已取消或不存在的任务        return NoneNone    # 第227-236行:根据task_type补充任务字段    task_type = msg.get("task_type""")    task["task_type"] = task_type    if task_type[:8] == "dataflow":        task["tenant_id"] = msg["tenant_id"]        task["dataflow_id"] = msg["dataflow_id"]        task["kb_id"] = msg.get("kb_id""")    if task_type[:6] == "memory":        task["memory_id"] = msg["memory_id"]        task["source_id"] = msg["source_id"]        task["message_dict"] = msg["message_dict"]    return redis_msg, task  # 第237行:返回Redis消息和任务对象

Redis任务收集流程图

关键技术点

  1. 未确认消息优先
    (第183-186行):UNACKED_ITERATOR优先处理未确认消息,确保消息可靠性
  2. 多队列轮询
    (第188-191行):遍历所有服务队列,确保不遗漏任何队列的任务
  3. 特殊任务处理
    (第205-216行):GRAPH_RAPTOR_FAKE_DOC_IDCANVAS_DEBUG_DOC_ID使用特殊逻辑获取任务
  4. 任务取消提前检测
    (第218-225行):在任务执行前检测取消状态,避免无效处理

1.4 Chunk构建流程

源码位置rag/svr/task_executor.py:244-515

@timeout(60 * 801)  # 第244行:80分钟超时装饰器async def build_chunks(task, progress_callback):    # 第246-249行:文件大小检查    if task["size"] > settings.DOC_MAXIMUM_SIZE:        set_progress(task["id"], prog=-1, msg="File size exceeds( <= %dMb )" %                                              (int(settings.DOC_MAXIMUM_SIZE / 1024 / 1024)))        return []    # 第251行:从工厂获取解析器    chunker = FACTORY[task["parser_id"].lower()]    try:        st = timer()        # 第254-255行:获取文件存储地址和二进制数据        bucket, name = File2DocumentService.get_storage_address(doc_id=task["doc_id"])        binary = await get_storage_binary(bucket, name)  # 第255行:异步获取文件        logging.info("From minio({}) {}/{}".format(timer() - st, task["location"], task["name"]))    except TimeoutError:        progress_callback(-1"Internal server error: Fetch file from minio timeout. Could you try it again.")        logging.exception("Minio {}/{} got timeout: Fetch file from minio timeout.".format(task["location"], task["name"]))        raise    except Exception as e:        if re.search("(No such file|not found)"str(e)):            progress_callback(-1"Can not find file <%s> from minio. Could you try it again?" % task["name"])        else:            progress_callback(-1"Get file from minio: %s" % str(e).replace("'"""))        logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))        raise    try:        # 第271-283行:异步调用chunker.chunk        async with chunk_limiter:  # 第271行:chunk并发限制            cks = await thread_pool_exec(                chunker.chunk,                task["name"],                binary=binary,                from_page=task["from_page"],                to_page=task["to_page"],                lang=task["language"],                callback=progress_callback,                kb_id=task["kb_id"],                parser_config=task["parser_config"],                tenant_id=task["tenant_id"],            )        logging.info("Chunking({}) {}/{} done".format(timer() - st, task["location"], task["name"]))    except TaskCanceledException:        raise    except Exception as e:        progress_callback(-1"Internal server error while chunking: %s" % str(e).replace("'"""))        logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))        raise    # 第292-299行:初始化文档基础信息    docs = []    doc = {        "doc_id": task["doc_id"],        "kb_id"str(task["kb_id"])    }    if task["pagerank"]:        doc[PAGERANK_FLD] = int(task["pagerank"])    st = timer()    # 第301-325行:上传图片到MinIO的异步函数    @timeout(60)    async def upload_to_minio(document, chunk):        try:            d = copy.deepcopy(document)            d.update(chunk)            # 第306-307行:生成chunk ID(xxhash)            d["id"] = xxhash.xxh64(                (chunk["content_with_weight"] + str(d["doc_id"])).encode("utf-8""surrogatepass")).hexdigest()            d["create_time"] = str(datetime.now()).replace("T"" ")[:19]            d["create_timestamp_flt"] = datetime.now().timestamp()            # 第311-319行:处理图片            if d.get("img_id"):                docs.append(d)                return            if not d.get("image"):                _ = d.pop("image"None)                d["img_id"] = ""                docs.append(d)                return            # 第320行:将图片转换为ID并上传            await image2id(d, partial(settings.STORAGE_IMPL.put, tenant_id=task["tenant_id"]), d["id"], task["kb_id"])            docs.append(d)        except Exception:            logging.exception("Saving image of chunk {}/{}/{} got exception".format(task["location"], task["name"], d["id"]))            raise    # 第327-337行:并发上传所有chunk    tasks = []    for ck in cks:        tasks.append(asyncio.create_task(upload_to_minio(doc, ck)))    try:        await asyncio.gather(*tasks, return_exceptions=False)    except Exception as e:        logging.error(f"MINIO PUT({task['name']}) got exception: {e}")        for t in tasks:            t.cancel()        await asyncio.gather(*tasks, return_exceptions=True)        raise    el = timer() - st    logging.info("MINIO PUT({}) cost {:.3f} s".format(task["name"], el))    # 第342-373行:自动关键词生成    if task["parser_config"].get("auto_keywords"0):        st = timer()        progress_callback(msg="Start to generate keywords for every chunk ...")        chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])        async def doc_keyword_extraction(chat_mdl, d, topn):            cached = get_llm_cache(chat_mdl.llm_name, d["content_with_weight"], "keywords", {"topn": topn})            if not cached:                if has_canceled(task["id"]):                    progress_callback(-1, msg="Task has been canceled.")                    return                async with chat_limiter:                    cached = await keyword_extraction(chat_mdl, d["content_with_weight"], topn)                set_llm_cache(chat_mdl.llm_name, d["content_with_weight"], cached, "keywords", {"topn": topn})            if cached:                d["important_kwd"] = cached.split(",")                d["important_tks"] = rag_tokenizer.tokenize(" ".join(d["important_kwd"]))            return        tasks = []        for d in docs:            tasks.append(asyncio.create_task(doc_keyword_extraction(chat_mdl, d, task["parser_config"]["auto_keywords"])))        try:            await asyncio.gather(*tasks, return_exceptions=False)        except Exception as e:            logging.error("Error in doc_keyword_extraction: {}".format(e))            for t in tasks:                t.cancel()            await asyncio.gather(*tasks, return_exceptions=True)            raise        progress_callback(msg="Keywords generation {} chunks completed in {:.2f}s".format(len(docs), timer() - st))    # 第375-405行:自动问题生成(类似关键词生成)    if task["parser_config"].get("auto_questions"0):        # ...省略类似逻辑...    # 第407-448行:元数据生成    if task["parser_config"].get("enable_metadata"Falseand task["parser_config"].get("metadata"):        # ...省略类似逻辑...    # 第450-513行:标签生成    if task["kb_parser_config"].get("tag_kb_ids", []):        # ...省略类似逻辑...    return docs  # 第515行:返回所有chunk文档

Chunk构建完整流程图

关键技术点

  1. 超时装饰器
    (第244行):@timeout(60 * 80, 1)设置80分钟超时,防止长时间挂起
  2. xxhash ID生成
    (第306-307行):使用xxhash算法生成唯一ID,速度快且碰撞率低
  3. 并发上传MinIO
    (第327-337行):asyncio.gather并发上传所有chunk图片,提升性能
  4. LLM缓存机制
    (第348、381、413行):get_llm_cacheset_llm_cache避免重复调用LLM,节省成本

1.5 Embedding向量化流程

源码位置rag/svr/task_executor.py:570-621

async def embedding(docs, mdl, parser_config=None, callback=None):    if parser_config is None:        parser_config = {}    # 第573-582行:准备标题和内容文本    tts, cnts = [], []    for d in docs:        tts.append(d.get("docnm_kwd""Title"))  # 第575行:标题文本        c = "\n".join(d.get("question_kwd", []))  # 第576行:问题文本优先        if not c:            c = d["content_with_weight"]  # 第578行:否则使用内容        c = re.sub(r"</?(table|td|caption|tr|th)( [^<>]{0,12})?>"" ", c)  # 第579行:去除HTML标签        if not c:            c = "None"        cnts.append(c)    tk_count = 0    # 第585-588行:标题向量化(复用第一个向量)    if len(tts) == len(cnts):        vts, c = await thread_pool_exec(mdl.encode, tts[0:1])        tts = np.tile(vts[0], (len(cnts), 1))  # 第587行:复制标题向量到所有chunk        tk_count += c    # 第590-593行:批量编码函数(带截断)    @timeout(60)    def batch_encode(txts):        nonlocal mdl        return mdl.encode([truncate(c, mdl.max_length - 10for c in txts])    # 第595-605行:分批向量化    cnts_ = np.array([])    for i in range(0len(cnts), settings.EMBEDDING_BATCH_SIZE):        async with embed_limiter:  # 第597行:embedding并发限制            vts, c = await thread_pool_exec(batch_encode, cnts[i: i + settings.EMBEDDING_BATCH_SIZE])        if len(cnts_) == 0:            cnts_ = vts        else:            cnts_ = np.concatenate((cnts_, vts), axis=0)  # 第602行:NumPy拼接        tk_count += c        callback(prog=0.7 + 0.2 * (i + 1) / len(cnts), msg="")  # 第604行:进度回调    cnts = cnts_    # 第606-613行:标题权重融合    filename_embd_weight = parser_config.get("filename_embd_weight"0.1)    if not filename_embd_weight:        filename_embd_weight = 0.1    title_w = float(filename_embd_weight)    if tts.ndim == 2 and cnts.ndim == 2 and tts.shape == cnts.shape:        vects = title_w * tts + (1 - title_w) * cnts  # 第611行:加权融合    else:        vects = cnts    # 第615-620行:将向量添加到文档    assert len(vects) == len(docs)    vector_size = 0    for i, d in enumerate(docs):        v = vects[i].tolist()        vector_size = len(v)        d["q_%d_vec" % len(v)] = v  # 第620行:动态字段名q_1024_vec    return tk_count, vector_size

Embedding向量化流程图

关键技术点

  1. 标题向量复用
    (第587行):np.tile(vts[0], (len(cnts), 1))所有chunk共享同一标题向量,节省计算
  2. 文本截断保护
    (第593行):truncate(c, mdl.max_length - 10)预留10个token防止边界溢出
  3. 加权向量融合
    (第611行):title_w * tts + (1 - title_w) * cnts标题和内容向量加权融合,默认标题权重0.1
  4. 动态字段名
    (第620行):d["q_%d_vec" % len(v)]根据向量维度动态生成字段名(如q_1024_vec)

1.6 RAPTOR递归摘要处理

源码位置rag/svr/task_executor.py:765-859

@timeout(3600)  # 第765行:1小时超时async def run_raptor_for_kb(row, kb_parser_config, chat_mdl, embd_mdl, vector_size, callback=None, doc_ids=[]):    fake_doc_id = GRAPH_RAPTOR_FAKE_DOC_ID  # 第767行:虚拟文档ID    raptor_config = kb_parser_config.get("raptor", {})    vctr_nm = "q_%d_vec" % vector_size  # 第770行:向量字段名    res = []    tk_count = 0    max_errors = int(os.environ.get("RAPTOR_MAX_ERRORS"3))  # 第774行:最大错误数    # 第776-809行:generate函数定义    async def generate(chunks, did):        nonlocal tk_count, res        raptor = Raptor(            raptor_config.get("max_cluster"64),  # 第779行:最大聚类数            chat_mdl,            embd_mdl,            raptor_config["prompt"],            raptor_config["max_token"],            raptor_config["threshold"],            max_errors=max_errors,        )        original_length = len(chunks)        chunks = await raptor(chunks, kb_parser_config["raptor"]["random_seed"], callback, row["id"])  # 第788行:调用Raptor        # 第789-797行:构建文档基础信息        doc = {            "doc_id": did,            "kb_id": [str(row["kb_id"])],            "docnm_kwd": row["name"],            "title_tks": rag_tokenizer.tokenize(row["name"]),            "raptor_kwd""raptor"  # 第794行:标记为raptor chunk        }        if row["pagerank"]:            doc[PAGERANK_FLD] = int(row["pagerank"])        # 第799-809行:添加新生成的摘要chunk        for content, vctr in chunks[original_length:]:            d = copy.deepcopy(doc)            d["id"] = xxhash.xxh64((content + str(fake_doc_id)).encode("utf-8")).hexdigest()            d["create_time"] = str(datetime.now()).replace("T"" ")[:19]            d["create_timestamp_flt"] = datetime.now().timestamp()            d[vctr_nm] = vctr.tolist()            d["content_with_weight"] = content            d["content_ltks"] = rag_tokenizer.tokenize(content)            d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])            res.append(d)            tk_count += num_tokens_from_string(content)    # 第811-834行:file scope处理(每个文档单独处理)    if raptor_config.get("scope""file") == "file":        for x, doc_id in enumerate(doc_ids):            chunks = []            skipped_chunks = 0            # 第815-823行:从文档存储获取chunks            for d in settings.retriever.chunk_list(doc_id, row["tenant_id"], [str(row["kb_id"])],                                                   fields=["content_with_weight", vctr_nm],                                                   sort_by_position=True):                if vctr_nm not in d or d[vctr_nm] is None:                    skipped_chunks += 1                    logging.warning(f"RAPTOR: Chunk missing vector field '{vctr_nm}' in doc {doc_id}, skipping")                    continue                chunks.append((d["content_with_weight"], np.array(d[vctr_nm])))            if skipped_chunks > 0:                callback(msg=f"[WARN] Skipped {skipped_chunks} chunks without vector field '{vctr_nm}' for doc {doc_id}.")            if not chunks:                logging.warning(f"RAPTOR: No valid chunks with vectors found for doc {doc_id}")                callback(msg=f"[WARN] No valid chunks with vectors found for doc {doc_id}, skipping")                continue            await generate(chunks, doc_id)            callback(prog=(x + 1.) / len(doc_ids))    # 第835-857行:knowledge base scope处理(所有文档合并处理)    else:        chunks = []        skipped_chunks = 0        for doc_id in doc_ids:            for d in settings.retriever.chunk_list(doc_id, row["tenant_id"], [str(row["kb_id"])],                                                   fields=["content_with_weight", vctr_nm],                                                   sort_by_position=True):                if vctr_nm not in d or d[vctr_nm] is None:                    skipped_chunks += 1                    continue                chunks.append((d["content_with_weight"], np.array(d[vctr_nm])))        if skipped_chunks > 0:            callback(msg=f"[WARN] Skipped {skipped_chunks} chunks without vector field '{vctr_nm}'.")        if not chunks:            logging.error(f"RAPTOR: No valid chunks with vectors found in any document for kb {row['kb_id']}")            callback(msg=f"[ERROR] No valid chunks with vectors found.")            return res, tk_count        await generate(chunks, fake_doc_id)    return res, tk_count

RAPTOR处理流程图

关键技术点

  1. 双scope模式
    (第811、835行):file scope每个文档独立处理,knowledge base scope所有文档合并处理
  2. 向量字段检查
    (第819、843行):跳过缺少向量字段的chunk,防止后续处理错误
  3. raptor_kwd标记
    (第794行):"raptor_kwd": "raptor"标识摘要chunk,便于检索时区分
  4. max_errors限制
    (第774行):RAPTOR_MAX_ERRORS环境变量控制最大错误数,防止无限重试

二、NLP检索与分词系统

2.1 RagTokenizer分词器

源码位置rag/nlp/rag_tokenizer.py:18-57

import infinity.rag_tokenizerclass RagTokenizer(infinity.rag_tokenizer.RagTokenizer):  # 第18行:继承infinity分词器    def tokenize(self, line: str) -> str:        from common import settings  # 第21行:延迟导入避免循环依赖        if settings.DOC_ENGINE_INFINITY:            return line  # 第23行:Infinity引擎直接返回原文        else:            return super().tokenize(line)  # 第25行:调用父类分词    def fine_grained_tokenize(self, tks: str) -> str:        from common import settings        if settings.DOC_ENGINE_INFINITY:            return tks  # 第30行:Infinity引擎直接返回        else:            return super().fine_grained_tokenize(tks)  # 第32行:细粒度分词# 第35-44行:工具函数包装def is_chinese(s):    return infinity.rag_tokenizer.is_chinese(s)def is_number(s):    return infinity.rag_tokenizer.is_number(s)def is_alphabet(s):    return infinity.rag_tokenizer.is_alphabet(s)def naive_qie(txt):    return infinity.rag_tokenizer.naive_qie(txt)# 第51-57行:全局实例和函数绑定tokenizer = RagTokenizer()tokenize = tokenizer.tokenize  # 第52行:绑定实例方法fine_grained_tokenize = tokenizer.fine_grained_tokenizetag = tokenizer.tagfreq = tokenizer.freqtradi2simp = tokenizer._tradi2simp  # 第56行:繁体转简体strQ2B = tokenizer._strQ2B  # 第57行:全角转半角

分词器继承关系图

设计意图分析

  1. 引擎适配
    (第22-25、29-32行):Infinity引擎内置分词功能,直接返回原文;其他引擎需要显式分词
  2. 延迟导入
    (第21、28行):from common import settings延迟导入避免循环依赖
  3. 函数绑定
    (第52-57行):将实例方法绑定到模块级函数,简化调用(如rag_tokenizer.tokenize(text)

2.2 FulltextQueryer全文查询构建

源码位置rag/nlp/query.py:27-172

class FulltextQueryer(QueryBase):    def __init__(self):        self.tw = term_weight.Dealer()  # 第29行:词权重计算器        self.syn = synonym.Dealer()  # 第30行:同义词查找器        self.query_fields = [  # 第31-39行:查询字段列表(带权重)            "title_tks^10",  # 标题权重10            "title_sm_tks^5",  # 标题细粒度权重5            "important_kwd^30",  # 重要关键词权重30            "important_tks^20",  # 重要token权重20            "question_tks^20",  # 问题token权重20            "content_ltks^2",  # 内容长token权重2            "content_sm_ltks",  # 内容细粒度token权重1        ]    def question(self, txt, tbl="qa", min_match: float = 0.6):        original_query = txt        # 第43-48行:文本预处理        txt = self.add_space_between_eng_zh(txt)  # 第43行:中英文添加空格        txt = re.sub(            r"[ :|\r\n\t,,。??/`!!&^%%()\[\]{}<>]+",            " ",            rag_tokenizer.tradi2simp(rag_tokenizer.strQ2B(txt.lower())),  # 第47行:繁体转简体、全角转半角、小写        ).strip()        otxt = txt        txt = self.rmWWW(txt)  # 第50行:去除www前缀        # 第52-86行:非中文处理        if not self.is_chinese(txt):            txt = self.rmWWW(txt)            tks = rag_tokenizer.tokenize(txt).split()            keywords = [t for t in tks if t]            tks_w = self.tw.weights(tks, preprocess=False)  # 第56行:计算词权重            tks_w = [(re.sub(r"[ \\\"'^]""", tk), w) for tk, w in tks_w]            tks_w = [(re.sub(r"^[\+-]""", tk), w) for tk, w in tks_w if tk]            tks_w = [(tk.strip(), w) for tk, w in tks_w if tk.strip()]            syns = []            for tk, w in tks_w[:256]:                syn = [rag_tokenizer.tokenize(s) for s in self.syn.lookup(tk)]  # 第62行:查找同义词                keywords.extend(syn)                syn = ["\"{}\"^{:.4f}".format(s, w / 4.for s in syn if s.strip()]                syns.append(" ".join(syn))            # 第67-80行:构建查询字符串            q = ["({}^{:.4f}".format(tk, w) + " {})".format(syn) for (tk, w), syn in zip(tks_w, syns) if                 tk and not re.match(r"[.^+\(\)-]", tk)]            for i in range(1len(tks_w)):                left, right = tks_w[i - 1][0].strip(), tks_w[i][0].0].strip()                if not left or not right:                    continue                q.append(                    '"%s %s"^%.4f'                    % (                        tks_w[i - 1][0],                        tks_w[i][0],                        max(tks_w[i - 1][1], tks_w[i][1]) * 2,  # 第78行:相邻词组合权重翻倍                    )                )            if not q:                q.append(txt)            query = " ".join(q)            return MatchTextExpr(                self.query_fields, query, 100, {"original_query": original_query}            ), keywords        # 第88-172行:中文处理(省略详细逻辑)        def need_fine_grained_tokenize(tk):            if len(tk) < 3:                return False            if re.match(r"[0-9a-z\.\+#_\*-]+$", tk):                return False            return True        txt = self.rmWWW(txt)        qs, keywords = [], []        for tt in self.tw.split(txt)[:256]:            # ...中文分词和查询构建逻辑...        if qs:            query = " OR ".join([f"({t})" for t in qs if t])            if not query:                query = otxt            return MatchTextExpr(                self.query_fields, query, 100, {"minimum_should_match": min_match, "original_query": original_query}            ), keywords        return None, keywords

查询构建流程图

关键技术点

  1. 字段权重配置
    (第31-39行):title_tks^10表示标题字段权重10,important_kwd^30重要关键词权重最高
  2. 同义词扩展
    (第62、102行):self.syn.lookup(tk)查找同义词并添加到查询,提升召回率
  3. 相邻词组合
    (第69-80行):相邻词组合权重翻倍,如"machine learning"组合权重高于单独的"machine"和"learning"
  4. minimum_should_match
    (第170行):min_match参数控制至少匹配的词数比例,防止召回过多无关文档

2.3 Dealer检索器核心

源码位置rag/nlp/search.py:36-171

class Dealer:    def __init__(self, dataStore: DocStoreConnection):        self.qryr = query.FulltextQueryer()  # 第38行:全文查询构建器        self.dataStore = dataStore  # 第39行:文档存储连接    @dataclass    class SearchResult:  # 第41-50行:搜索结果数据类        total: int        ids: list[str]        query_vector: list[float] | None = None        field: dict | None = None        highlight: dict | None = None        aggregation: list | dict | None = None        keywords: list[str] | None = None        group_docs: list[list] | None = None    async def get_vector(self, txt, emb_mdl, topk=10, similarity=0.1):        qv, _ = await thread_pool_exec(emb_mdl.encode_queries, txt)  # 第53行:异步编码查询        shape = np.array(qv).shape        if len(shape) > 1:            raise Exception(f"Dealer.get_vector returned array's shape {shape} doesn't match expectation.")        embedding_data = [get_float(v) for v in qv]        vector_column_name = f"q_{len(embedding_data)}_vec"  # 第59行:动态向量字段名        return MatchDenseExpr(vector_column_name, embedding_data, 'float''cosine', topk, {"similarity": similarity})    def get_filters(self, req):  # 第62-72行:构建过滤条件        condition = dict()        for key, field in {"kb_ids""kb_id""doc_ids""doc_id"}.items():            if key in req and req[key] is not None:                condition[field] = req[key]        for key in ["knowledge_graph_kwd""available_int""entity_kwd""from_entity_kwd""to_entity_kwd""removed_kwd"]:            if key in req and req[key] is not None:                condition[key] = req[key]        return condition    async def search(self, req, idx_names: str | list[str], kb_ids: list[str], emb_mdl=None, highlight: bool | list | None = None, rank_feature: dict | None = None):        if highlight is None:            highlight = False        filters = self.get_filters(req)  # 第83行:获取过滤条件        orderBy = OrderByExpr()        pg = int(req.get("page"1)) - 1        topk = int(req.get("topk"1024))        ps = int(req.get("size", topk))        offset, limit = pg * ps, ps        # 第91-95行:设置返回字段        src = req.get("fields", ["docnm_kwd""content_ltks""kb_id""img_id""title_tks""important_kwd""position_int",                                 "doc_id""page_num_int""top_int""create_timestamp_flt""knowledge_graph_kwd",                                 "question_kwd""question_tks""doc_type_kwd""available_int""content_with_weight""mom_id", PAGERANK_FLD, TAG_FLD])        kwds = set([])        qst = req.get("question""")        q_vec = []        if not qst:  # 第100-107行:无查询字符串,仅排序            if req.get("sort"):                orderBy.asc("page_num_int")                orderBy.asc("top_int")                orderBy.desc("create_timestamp_flt")            res = self.dataStore.search(src, [], filters, [], orderBy, offset, limit, idx_names, kb_ids)            total = self.dataStore.get_total(res)        else:            highlightFields = ["content_ltks""title_tks"]            if not highlight:                highlightFields = []            elif isinstance(highlight, list):                highlightFields = highlight            matchText, keywords = self.qryr.question(qst, min_match=0.3)  # 第114行:构建全文查询            if emb_mdl is None:  # 第115-120行:仅全文检索                matchExprs = [matchText]                res = await thread_pool_exec(self.dataStore.search, src, highlightFields, filters, matchExprs, orderBy, offset, limit, idx_names, kb_ids, rank_feature=rank_feature)                total = self.dataStore.get_total(res)            else:  # 第121-147行:混合检索(全文+向量)                matchDense = await self.get_vector(qst, emb_mdl, topk, req.get("similarity"0.1))                q_vec = matchDense.embedding_data                if not settings.DOC_ENGINE_INFINITY:                    src.append(f"q_{len(q_vec)}_vec")                fusionExpr = FusionExpr("weighted_sum", topk, {"weights""0.05,0.95"})  # 第127行:融合权重(全文5%,向量95%)                matchExprs = [matchText, matchDense, fusionExpr]                res = await thread_pool_exec(self.dataStore.search, src, highlightFields, filters, matchExprs, orderBy, offset, limit, idx_names, kb_ids, rank_feature=rank_feature)                total = self.dataStore.get_total(res)                # 第136-147行:结果为空时降低阈值重试                if total == 0:                    if filters.get("doc_id"):                        res = await thread_pool_exec(self.dataStore.search, src, [], filters, [], orderBy, offset, limit, idx_names, kb_ids)                        total = self.dataStore.get_total(res)                    else:                        matchText, _ = self.qryr.question(qst, min_match=0.1)  # 第141行:降低min_match                        matchDense.extra_options["similarity"] = 0.17  # 第142行:降低similarity                        res = await thread_pool_exec(self.dataStore.search, src, highlightFields, filters, [matchText, matchDense, fusionExpr], orderBy, offset, limit, idx_names, kb_ids, rank_feature=rank_feature)                        total = self.dataStore.get_total(res)            for k in keywords:                kwds.add(k)                for kk in rag_tokenizer.fine_grained_tokenize(k).split():                    if len(kk) < 2:                        continue                    if kk in kwds:                        continue                    kwds.add(kk)        logging.debug(f"TOTAL: {total}")        ids = self.dataStore.get_doc_ids(res)        keywords = list(kwds)        highlight = self.dataStore.get_highlight(res, keywords, "content_with_weight")        aggs = self.dataStore.get_aggregation(res, "docnm_kwd")        return self.SearchResult(            total=total,            ids=ids,            query_vector=q_vec,            aggregation=aggs,            highlight=highlight,            field=self.dataStore.get_fields(res, src + ["_score"]),            keywords=keywords        )

混合检索流程图

关键技术点

  1. 混合检索融合权重
    (第127行):"weights": "0.05,0.95"全文检索权重5%,向量检索权重95%,向量检索占主导
  2. 降级重试机制
    (第136-147行):结果为空时降低min_match从0.3到0.1,降低similarity从0.1到0.17,提升召回率
  3. 细粒度关键词扩展
    (第149-156行):对每个关键词进行细粒度分词,扩展关键词集合
  4. 动态向量字段
    (第59、125行):f"q_{len(embedding_data)}_vec"根据向量维度动态选择字段

2.4 Rerank重排序机制

源码位置rag/nlp/search.py:294-354

def rerank(self, sres, query, tkweight=0.3, vtweight=0.7, cfield="content_ltks", rank_feature: dict | None = None):    _, keywords = self.qryr.question(query)    vector_size = len(sres.query_vector)    vector_column = f"q_{vector_size}_vec"    zero_vector = [0.0] * vector_size    # 第302-307行:提取候选文档向量    ins_embd = []    for chunk_id in sres.ids:        vector = sres.field[chunk_id].get(vector_column, zero_vector)        if isinstance(vector, str):            vector = [get_float(v) for v in vector.split("\t")]  # 第306行:字符串向量解析        ins_embd.append(vector)    if not ins_embd:        return [], [], []    # 第311-321行:构建token列表(带权重)    for i in sres.ids:        if isinstance(sres.field[i].get("important_kwd", []), str):            sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]    ins_tw = []    for i in sres.ids:        content_ltks = list(OrderedDict.fromkeys(sres.field[i][cfield].split()))  # 第316行:去重保持顺序        title_tks = [t for t in sres.field[i].get("title_tks""").split() if t]        question_tks = [t for t in sres.field[i].get("question_tks""").split() if t]        important_kwd = sres.field[i].get("important_kwd", [])        tks = content_ltks + title_tks * 2 + important_kwd * 5 + question_tks * 6  # 第320行:权重乘法复制        ins_tw.append(tks)    # 第323-324行:计算rank feature分数    rank_fea = self._rank_feature_scores(rank_feature, sres)    # 第326-329行:混合相似度计算    sim, tksim, vtsim = self.qryr.hybrid_similarity(sres.query_vector, ins_embd, keywords, ins_tw, tkweight, vtweight)    return sim + rank_fea, tksim, vtsim  # 第331行:相似度加上rank featuredef rerank_by_model(self, rerank_mdl, sres, query, tkweight=0.3, vtweight=0.7, cfield="content_ltks", rank_feature: dict | None = None):    _, keywords = self.qryr.question(query)    for i in sres.ids:        if isinstance(sres.field[i].get("important_kwd", []), str):            sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]    ins_tw = []    for i in sres.ids:        content_ltks = sres.field[i][cfield].split()        title_tks = [t for t in sres.field[i].get("title_tks""").split() if t]        important_kwd = sres.field[i].get("important_kwd", [])        tks = content_ltks + title_tks + important_kwd        ins_tw.append(tks)    tksim = self.qryr.token_similarity(keywords, ins_tw)  # 第349行:token相似度    vtsim, _ = rerank_mdl.similarity(query, [remove_redundant_spaces(" ".join(tks)) for tks in ins_tw])  # 第350行:模型重排序    rank_fea = self._rank_feature_scores(rank_feature, sres)    return tkweight * np.array(tksim) + vtweight * vtsim + rank_fea, tksim, vtsim  # 第354行:加权融合

Rerank重排序流程图

关键技术点

  1. Token权重复制
    (第320行):title_tks * 2 + important_kwd * 5 + question_tks * 6通过列表乘法实现权重复制
  2. 混合相似度融合
    (第326-329行):hybrid_similarity融合向量相似度和token相似度
  3. Rank Feature加分
    (第324、331行):rank_fea包含PageRank和Tag特征,提升重要文档排名
  4. 模型重排序
    (第350行):rerank_mdl.similarity调用外部重排序模型(如Cohere、Jina)

三、GraphRAG知识图谱构建

3.1 GraphRAG文件结构

rag/graphrag/├── __init__.py├── entity_resolution.py              # 实体消歧├── entity_resolution_prompt.py       # 实体消歧提示词├── query_analyze_prompt.py           # 查询分析提示词├── search.py                         # 图谱检索├── utils.py                          # 工具函数(缓存等)├── light/                            # 轻量级图谱方法│   ├── __init__.py│   ├── graph_extractor.py            # 图提取器│   ├── graph_prompt.py               # 图提取提示词│   └── smoke.py                      # 测试脚本└── general/                          # 通用图谱方法    ├── __init__.py    ├── community_reports_extractor.py # 社区报告提取器    ├── community_report_prompt.py     # 社区报告提示词    ├── entity_embedding.py            # 实体向量化    ├── extractor.py                   # 基础提取器    ├── graph_extractor.py             # 图提取器    ├── graph_prompt.py                # 图提取提示词    ├── index.py                       # 主索引构建入口    ├── leiden.py                      # Leiden社区发现算法    ├── mind_map_extractor.py          # 思维导图提取器    ├── mind_map_prompt.py             # 思维导图提示词    └── smoke.py                       # 测试脚本

GraphRAG架构图


3.2 GraphRAG任务执行流程

源码位置rag/svr/task_executor.py:1055-1102

elif task_type == "graphrag":    ok, kb = KnowledgebaseService.get_by_id(task_dataset_id)    if not ok:        progress_callback(prog=-1.0, msg="Cannot found valid dataset for GraphRAG task")        return    kb_parser_config = kb.parser_config    # 第1062-1080行:GraphRAG配置初始化    if not kb_parser_config.get("graphrag", {}).get("use_graphrag"False):        kb_parser_config.update({            "graphrag": {                "use_graphrag"True,                "entity_types": [  # 第1067-1073行:默认实体类型                    "organization",                    "person",                    "geo",                    "event",                    "category",                ],                "method""light",  # 第1074行:默认使用light方法            }        })        if not KnowledgebaseService.update_by_id(kb.id, {"parser_config": kb_parser_config}):            progress_callback(prog=-1.0, msg="Internal error: Invalid GraphRAG configuration")            return    graphrag_conf = kb_parser_config.get("graphrag", {})    start_ts = timer()    chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=kb_task_llm_id, lang=task_language)    with_resolution = graphrag_conf.get("resolution"False)  # 第1085行:是否启用实体消歧    with_community = graphrag_conf.get("community"False)  # 第1086行:是否启用社区发现    async with kg_limiter:  # 第1087行:知识图谱并发限制        result = await run_graphrag_for_kb(            row=task,            doc_ids=task.get("doc_ids", []),            language=task_language,            kb_parser_config=kb_parser_config,            chat_model=chat_model,            embedding_model=embedding_model,            callback=progress_callback,            with_resolution=with_resolution,            with_community=with_community,        )        logging.info(f"GraphRAG task result for task {task}:\n{result}")    progress_callback(prog=1.0, msg="Knowledge Graph done ({:.2f}s)".format(timer() - start_ts))    return

GraphRAG执行流程图

关键技术点

  1. 默认实体类型
    (第1067-1073行):["organization", "person", "geo", "event", "category"]覆盖常见实体类型
  2. 双模式支持
    (第1074行):method: "light"轻量级方法,另有"general"通用方法
  3. 可选高级功能
    (第1085-1086行):resolution实体消歧和community社区发现可选启用
  4. kg_limiter限制
    (第1087行):知识图谱任务并发限制为2,防止LLM调用过载

四、Agent组件系统

4.1 Agent组件文件结构

agent/component/├── __init__.py├── base.py                    # 组件基类├── begin.py                   # 开始节点├── llm.py                     # LLM调用组件├── message.py                 # 消息组件├── categorize.py              # 分类组件├── switch.py                  # 分支组件├── loop.py                    # 循环组件├── loopitem.py                # 循环项├── exit_loop.py               # 退出循环├── iteration.py               # 迭代组件├── iterationitem.py           # 迭代项├── invoke.py                  # 调用组件├── fillup.py                  # 填充组件├── string_transform.py        # 字符串转换├── data_operations.py         # 数据操作├── list_operations.py         # 列表操作├── variable_assigner.py       # 变量赋值├── varaiable_aggregator.py    # 变量聚合├── agent_with_tools.py        # 带工具的Agent├── excel_processor.py         # Excel处理└── docs_generator.py          # 文档生成

Agent组件分类图


4.2 Agent组件基类设计

源码位置agent/component/base.py(推测结构)

class ComponentBase:    def __init__(self, component_id, component_name, params):        self._id = component_id        self.component_name = component_name        self.params = params        self._output = {}        self._error = ""    async def invoke(self, **kwargs):        raise NotImplementedError("Subclasses must implement invoke method")    def output(self):        return self._output    def error(self):        return self._error    def get_downstream(self):        # 返回下游组件ID列表        return self.params.get("downstream", [])

组件生命周期图


五、完整任务处理流程

5.1 do_handle_task主处理函数

源码位置rag/svr/task_executor.py:949-1192

@timeout(60 * 60 * 31)  # 第949行:3小时超时async def do_handle_task(task):    task_type = task.get("task_type""")    # 第953-955行:memory任务特殊处理    if task_type == "memory":        await handle_save_to_memory_task(task)        return    # 第957-959行:Canvas调试任务特殊处理    if task_type == "dataflow" and task.get("doc_id""") == CANVAS_DEBUG_DOC_ID:        await run_dataflow(task)        return    # 第961-979行:提取任务参数    task_id = task["id"]    task_from_page = task["from_page"]    task_to_page = task["to_page"]    task_tenant_id = task["tenant_id"]    task_embedding_id = task["embd_id"]    task_language = task["language"]    doc_task_llm_id = task["parser_config"].get("llm_id"or task["llm_id"]    kb_task_llm_id = task['kb_parser_config'].get("llm_id"or task["llm_id"]    task['llm_id'] = kb_task_llm_id    task_dataset_id = task["kb_id"]    task_doc_id = task["doc_id"]    task_document_name = task["name"]    task_parser_config = task["parser_config"]    task_start_ts = timer()    toc_thread = None    executor = concurrent.futures.ThreadPoolExecutor()    # 第979行:创建进度回调函数    progress_callback = partial(set_progress, task_id, task_from_page, task_to_page)    # 第981-984行:检测任务取消    task_canceled = has_canceled(task_id)    if task_canceled:        progress_callback(-1, msg="Task has been canceled.")        return    try:        # 第987-995行:绑定Embedding模型        embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)        vts, _ = embedding_model.encode(["ok"])        vector_size = len(vts[0])    except Exception as e:        error_message = f'Fail to bind embedding model: {str(e)}'        progress_callback(-1, msg=error_message)        logging.exception(error_message)        raise    init_kb(task, vector_size)  # 第997行:初始化知识库索引    # 第999-1001行:dataflow任务处理    if task_type[:len("dataflow")] == "dataflow":        await run_dataflow(task)        return    # 第1003-1053行:raptor任务处理(见前文)    if task_type == "raptor":        # ...RAPTOR处理逻辑...    # 第1055-1102行:graphrag任务处理(见前文)    elif task_type == "graphrag":        # ...GraphRAG处理逻辑...    # 第1103-1106行:mindmap任务处理    elif task_type == "mindmap":        progress_callback(1"place holder")        pass        return    # 第1107-1178行:标准文档解析任务    else:        task['llm_id'] = doc_task_llm_id        start_ts = timer()        chunks = await build_chunks(task, progress_callback)  # 第1111行:构建chunks        logging.info("Build document {}: {:.2f}s".format(task_document_name, timer() - start_ts))        if not chunks:            progress_callback(1., msg=f"No chunk built from {task_document_name}")            return        progress_callback(msg="Generate {} chunks".format(len(chunks)))        start_ts = timer()        try:            token_count, vector_size = await embedding(chunks, embedding_model, task_parser_config, progress_callback)  # 第1119行:向量化        except TaskCanceledException:            raise        except Exception as e:            error_message = "Generate embedding error:{}".format(str(e))            progress_callback(-1, error_message)            logging.exception(error_message)            token_count = 0            raise        progress_message = "Embedding chunks ({:.2f}s)".format(timer() - start_ts)        logging.info(progress_message)        progress_callback(msg=progress_message)        # 第1131-1132行:TOC提取(可选)        if task["parser_id"].lower() == "naive" and task["parser_config"].get("toc_extraction"False):            toc_thread = executor.submit(build_TOC, task, chunks, progress_callback)        chunk_count = len(set([chunk["id"for chunk in chunks]))        start_ts = timer()        # 第1137-1142行:插入chunks辅助函数        async def _maybe_insert_chunks(_chunks):            if has_canceled(task_id):                progress_callback(-1, msg="Task has been canceled.")                return False            insert_result = await insert_chunks(task_id, task_tenant_id, task_dataset_id, _chunks, progress_callback)            return bool(insert_result)        try:            if not await _maybe_insert_chunks(chunks):  # 第1145行:插入chunks                return            if has_canceled(task_id):                progress_callback(-1, msg="Task has been canceled.")                return            logging.info("Indexing doc({}), page({}-{}), chunks({}), elapsed: {:.2f}".format(                task_document_name, task_from_page, task_to_page, len(chunks), timer() - start_ts            ))            DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)  # 第1157行:更新文档统计            progress_callback(msg="Indexing done ({:.2f}s).".format(timer() - start_ts))            # 第1161-1166行:TOC插入            if toc_thread:                d = toc_thread.result()                if d:                    if not await _maybe_insert_chunks([d]):                        return                    DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, 010)            if has_canceled(task_id):                progress_callback(-1, msg="Task has been canceled.")                return            task_time_cost = timer() - task_start_ts            progress_callback(prog=1.0, msg="Task done ({:.2f}s)".format(task_time_cost))            logging.info("Chunk doc({}), page({}-{}), chunks({}), token({}), elapsed:{:.2f}".format(                task_document_name, task_from_page, task_to_page, len(chunks), token_count, task_time_cost            ))        finally:            # 第1180-1192行:清理逻辑(取消时删除已插入chunks)            if has_canceled(task_id):                # ...删除逻辑...

完整任务处理流程图


六、设计模式与技术决策总结

6.1 核心设计模式统计表

设计模式
应用位置
实现细节
目的
工厂模式
FACTORY解析器映射 (第84-101行)
字典映射ParserType到解析器模块
动态选择解析器,支持扩展
装饰器模式
@timeout超时装饰器 (第244、765、949行)
函数包装器,超时抛出异常
防止任务长时间挂起
信号量模式
asyncio.Semaphore (第126-130行)
多级信号量控制并发
防止资源竞争,限流保护
回调模式
progress_callback (第979行)
partial函数绑定参数
解耦进度更新与业务逻辑
策略模式
task_type分支 (第953-1106行)
根据task_type选择不同处理策略
支持多种任务类型
模板方法
do_handle_task (第949-1192行)
固定流程:参数提取→绑定模型→处理→清理
标准化任务生命周期
数据类
SearchResult (第41-50行)
@dataclass装饰器
简化数据结构定义

6.2 技术决策对比表

技术点
方案A
方案B
选择理由
任务队列
Redis Stream
内存队列
Redis支持消费者组、消息确认、持久化
并发控制
asyncio.Semaphore
线程池限制
Semaphore支持异步,更灵活
超时机制
@timeout装饰器
asyncio.wait_for
装饰器可复用,支持多层嵌套
向量融合权重
全文5%+向量95%
全文50%+向量50%
向量检索语义理解更强,权重更高
ID生成
xxhash
UUID
xxhash更快,碰撞率足够低
分词引擎
infinity.rag_tokenizer
jieba分词
infinity内置优化,支持多语言
GraphRAG方法
light轻量级
general通用
light更快,适合大多数场景

6.3 性能优化关键点

Task Executor优化

  1. 多级并发限制
    (第126-130行):不同资源使用不同信号量,精细化控制并发
  2. 异步任务收集
    (第176-237行):collect函数异步获取任务,避免阻塞主线程
  3. 批量Embedding
    (第596-605行):EMBEDDING_BATCH_SIZE批量向量化,减少API调用次数
  4. LLM缓存
    (第348、381、413行):get_llm_cache避免重复调用LLM,节省成本和时间

NLP检索优化

  1. 同义词扩展
    (第62、102行):扩展查询词,提升召回率
  2. 降级重试
    (第136-147行):结果为空时降低阈值重试,平衡召回率和准确率
  3. Token权重复制
    (第320行):title_tks * 2通过列表乘法实现权重,简单高效
  4. 细粒度分词
    (第109、151行):细粒度分词扩展关键词,提升匹配精度

GraphRAG优化

  1. kg_limiter限制
    (第1087行):知识图谱任务并发限制为2,防止LLM过载
  2. 可选高级功能
    (第1085-1086行):resolutioncommunity可选启用,节省计算成本
  3. 实体类型预设
    (第1067-1073行):预设常见实体类型,减少LLM识别难度

七、源码行号索引表

7.1 Task Executor核心函数

函数名
行号范围
关键功能
signal_handler
135-139
优雅退出信号处理
set_progress
142-174
进度回调与取消检测
collect
176-237
Redis任务收集
build_chunks
244-515
Chunk构建、图片上传、LLM生成
embedding
570-621
向量化与权重融合
run_raptor_for_kb
765-859
RAPTOR递归摘要
insert_chunks
871-946
批量插入文档存储
do_handle_task
949-1192
主任务处理入口

7.2 NLP检索关键函数

函数名
行号范围
关键功能
RagTokenizer.tokenize
20-25
分词器包装
FulltextQueryer.question
41-172
查询构建与同义词扩展
Dealer.get_vector
52-60
查询向量化
Dealer.search
74-171
混合检索主函数
Dealer.rerank
294-331
重排序计算
Dealer.rerank_by_model
333-354
模型重排序

八、学习建议与实践路径

8.1 理论学习顺序

8.2 实践调试建议

调试Task Executor

# 在rag/svr/task_executor.py:176添加日志async def collect():    print(f"[DEBUG] collect开始,UNACKED_ITERATOR={UNACKED_ITERATOR}")    svr_queue_names = settings.get_svr_queue_names()    print(f"[DEBUG] svr_queue_names={svr_queue_names}")    # ...

调试混合检索

# 在rag/nlp/search.py:127添加日志fusionExpr = FusionExpr("weighted_sum", topk, {"weights""0.05,0.95"})print(f"[DEBUG] 混合检索:matchText={matchText}, matchDense shape={len(q_vec)}, weights=0.05,0.95")

调试GraphRAG

# 在rag/svr/task_executor.py:1089添加日志result = await run_graphrag_for_kb(...)print(f"[DEBUG] GraphRAG结果:实体数={len(result.get('entities', []))}, 关系数={len(result.get('relationships', []))}")

九、常见问题与解决方案

9.1 Task Executor错误诊断

问题1:任务卡在"Processing"状态

原因chunk_limiterembed_limiter信号量耗尽,任务等待资源

解决方案

# 检查并发限制配置docker exec ragflow-container printenv MAX_CONCURRENT_CHUNK_BUILDERS# 建议值:1-2(根据CPU核心数调整)# 检查当前任务数docker exec ragflow-container redis-cli LLEN ragflow_svr_queue

问题2:Embedding超时

原因@timeout(60)装饰器超时,Embedding API响应慢

解决方案

# 在task_executor.py:590增加超时时间@timeout(120)  # 从60秒增加到120秒def batch_encode(txts):    # ...

9.2 NLP检索问题

问题1:检索结果为空

原因min_match阈值过高,查询词匹配不足

解决方案

# 在search.py:114降低min_matchmatchText, keywords = self.qryr.question(qst, min_match=0.1)  # 从0.3降低到0.1

问题2:向量相似度分数异常

原因:向量维度不匹配,不同Embedding模型混用

解决方案

# 在search.py:304添加维度检查vector = sres.field[chunk_id].get(vector_column, zero_vector)if len(vector) != vector_size:    logging.warning(f"向量维度不匹配:期望{vector_size},实际{len(vector)}")    vector = zero_vector

9.3 GraphRAG问题

问题1:实体提取不完整

原因entity_types配置不匹配文档内容

解决方案

# 在task_executor.py:1067扩展实体类型"entity_types": [    "organization",    "person",    "geo",    "event",    "category",    "product",      # 新增产品类型    "technology",   # 新增技术类型    "date",         # 新增日期类型],

问题2:社区发现耗时过长

原因:文档数量过多,Leiden算法计算复杂

解决方案

# 减少GraphRAG并发数export MAX_CONCURRENT_GRAPH_RAG=1  # 从默认2降低到1# 或禁用社区发现# 在知识库配置中设置:graphrag.community=false

十、扩展阅读与参考资源

10.1 官方文档

  • RAPTOR论文
    : https://arxiv.org/abs/2401.18059
  • GraphRAG
    : https://microsoft.github.io/graphrag/
  • Leiden算法
    : https://arxiv.org/abs/1810.08482
  • Redis Stream
    : https://redis.io/docs/data-types/streams/

10.2 源码关联文件

文件路径
关联内容
rag/app/*.py
解析器实现(naive, paper, book等)
rag/raptor.py
RAPTOR核心算法
rag/graphrag/general/index.py
GraphRAG主入口
agent/canvas.py
Agent画布管理
common/doc_store/doc_store_base.py
文档存储抽象基类
common/constants.py
任务类型、字段名常量
基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-05-30 19:26:28 HTTP/1.1 GET : https://www.yeyulingfeng.com/a/688075.html
  2. 运行时间 : 0.129336s [ 吞吐率:7.73req/s ] 内存消耗:4,994.33kb 文件加载:145
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=0e706d8c217ef4243cbf6f1629960077
  1. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/autoload_static.php ( 6.05 KB )
  7. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/ralouphie/getallheaders/src/getallheaders.php ( 1.60 KB )
  10. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  11. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  12. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  13. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  14. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  15. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  16. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  17. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  18. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  19. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/guzzlehttp/guzzle/src/functions_include.php ( 0.16 KB )
  21. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/guzzlehttp/guzzle/src/functions.php ( 5.54 KB )
  22. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  23. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  24. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  25. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/provider.php ( 0.19 KB )
  26. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  27. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  28. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  29. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/common.php ( 0.03 KB )
  30. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  32. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/alipay.php ( 3.59 KB )
  33. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  34. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/app.php ( 0.95 KB )
  35. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/cache.php ( 0.78 KB )
  36. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/console.php ( 0.23 KB )
  37. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/cookie.php ( 0.56 KB )
  38. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/database.php ( 2.48 KB )
  39. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/filesystem.php ( 0.61 KB )
  40. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/lang.php ( 0.91 KB )
  41. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/log.php ( 1.35 KB )
  42. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/middleware.php ( 0.19 KB )
  43. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/route.php ( 1.89 KB )
  44. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/session.php ( 0.57 KB )
  45. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/trace.php ( 0.34 KB )
  46. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/view.php ( 0.82 KB )
  47. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/event.php ( 0.25 KB )
  48. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  49. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/service.php ( 0.13 KB )
  50. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/AppService.php ( 0.26 KB )
  51. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  52. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  53. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  54. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  55. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  56. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/services.php ( 0.14 KB )
  57. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  58. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  59. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  60. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  61. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  62. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  63. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  64. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  65. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  66. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  67. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  68. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  69. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  70. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  71. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  72. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  73. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  74. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  75. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  76. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  77. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  78. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  79. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  80. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  81. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  82. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  83. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  84. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  85. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  86. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  87. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/Request.php ( 0.09 KB )
  88. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  89. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/middleware.php ( 0.25 KB )
  90. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  91. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  92. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  93. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  94. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  95. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  96. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  97. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  98. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  99. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  100. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  101. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  102. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  103. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/route/app.php ( 3.94 KB )
  104. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  105. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  106. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/controller/Index.php ( 9.87 KB )
  108. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/BaseController.php ( 2.05 KB )
  109. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  110. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  111. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  112. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  113. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  114. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  115. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  116. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  117. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  118. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  119. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  120. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  121. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  122. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  123. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  124. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  125. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  126. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  127. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  128. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  129. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  130. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  131. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  132. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  133. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  134. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  135. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/controller/Es.php ( 3.30 KB )
  136. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  137. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  138. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  139. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  140. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  141. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  142. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  143. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  144. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/runtime/temp/c935550e3e8a3a4c27dd94e439343fdf.php ( 31.50 KB )
  145. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000552s ] mysql:host=127.0.0.1;port=3306;dbname=wenku;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.000781s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000396s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.000325s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.000658s ]
  6. SELECT * FROM `set` [ RunTime:0.000283s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.000616s ]
  8. SELECT * FROM `article` WHERE `id` = 688075 LIMIT 1 [ RunTime:0.000964s ]
  9. UPDATE `article` SET `lasttime` = 1780140388 WHERE `id` = 688075 [ RunTime:0.003506s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 64 LIMIT 1 [ RunTime:0.000229s ]
  11. SELECT * FROM `article` WHERE `id` < 688075 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.000412s ]
  12. SELECT * FROM `article` WHERE `id` > 688075 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.000868s ]
  13. SELECT * FROM `article` WHERE `id` < 688075 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.000922s ]
  14. SELECT * FROM `article` WHERE `id` < 688075 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.000691s ]
  15. SELECT * FROM `article` WHERE `id` < 688075 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.001568s ]
0.133223s