1. Graph Base Class Architecture
1.1 DSL Data Structure
Source location: agent/canvas.py:40-77
```python
class Graph:
    """
    dsl = {
        "components": {
            "begin": {
                "obj": {
                    "component_name": "Begin",
                    "params": {},
                },
                "downstream": ["answer_0"],
                "upstream": [],
            },
            "retrieval_0": {
                "obj": {
                    "component_name": "Retrieval",
                    "params": {}
                },
                "downstream": ["generate_0"],
                "upstream": ["answer_0"],
            },
            "generate_0": {
                "obj": {
                    "component_name": "Generate",
                    "params": {}
                },
                "downstream": ["answer_0"],
                "upstream": ["retrieval_0"],
            }
        },
        "history": [],
        "path": ["begin"],
        "retrieval": {"chunks": [], "doc_aggs": []},
        "globals": {
            "sys.query": "",
            "sys.files": []
        }
    }
    """
```
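DSL structure breakdown:

| Field | Example | Meaning |
|---|---|---|
| components | {"begin": {...}, "retrieval_0": {...}} | All components, keyed by component ID |
| components[id].obj | {"component_name": "Begin", "params": {}} | Component type name and its parameters |
| components[id].downstream | ["answer_0"] | IDs of the components that run after this one |
| components[id].upstream | ["retrieval_0"] | IDs of the components that feed into this one |
| history | [{"role": "user", "content": "..."}] | Conversation history |
| path | ["begin", "retrieval_0"] | Execution path: component IDs in the order they ran |
| retrieval | {"chunks": [], "doc_aggs": []} | Retrieval results |
| globals | {"sys.query": "", "sys.files": []} | Global variables such as the user query and uploaded files |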
[Figure: Example component relationship diagram]
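The docstring example is abbreviated: answer_0 appears in several upstream/downstream lists but has no entry under components. A small consistency check over the DSL dict makes such dangling edges easy to spot before Graph.load runs. This is an illustrative sketch; find_dangling_refs and find_asymmetric_edges are hypothetical helpers, not part of RAGFlow.

```python
def find_dangling_refs(dsl: dict) -> list:
    """Return (component_id, direction, missing_id) for every upstream/downstream
    reference that points at an ID not defined under dsl["components"]."""
    components = dsl["components"]
    missing = []
    for cpn_id, cpn in components.items():
        for direction in ("upstream", "downstream"):
            for ref in cpn.get(direction, []):
                if ref not in components:
                    missing.append((cpn_id, direction, ref))
    return missing


def find_asymmetric_edges(dsl: dict) -> list:
    """Return (a, b) pairs where a lists an existing component b as downstream,
    but b does not list a as upstream."""
    components = dsl["components"]
    bad = []
    for a, cpn in components.items():
        for b in cpn.get("downstream", []):
            if b in components and a not in components[b].get("upstream", []):
                bad.append((a, b))
    return bad


# The example from the Graph docstring (params left empty)
DSL = {
    "components": {
        "begin": {"obj": {"component_name": "Begin", "params": {}},
                  "downstream": ["answer_0"], "upstream": []},
        "retrieval_0": {"obj": {"component_name": "Retrieval", "params": {}},
                        "downstream": ["generate_0"], "upstream": ["answer_0"]},
        "generate_0": {"obj": {"component_name": "Generate", "params": {}},
                       "downstream": ["answer_0"], "upstream": ["retrieval_0"]},
    },
    "path": ["begin"],
}

print(find_dangling_refs(DSL))
# → [('begin', 'downstream', 'answer_0'), ('retrieval_0', 'upstream', 'answer_0'), ('generate_0', 'downstream', 'answer_0')]
print(find_asymmetric_edges(DSL))  # → []
```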

1.2 Graph Initialization and Loading
Source location: agent/canvas.py:79-106
```python
def __init__(self, dsl: str, tenant_id=None, task_id=None, custom_header=None):
    self.path = []                                     # line 80: execution path
    self.components = {}                               # line 81: component dict
    self.error = ""                                    # line 82: error message
    self.dsl = json.loads(dsl)                         # line 83: parse the DSL JSON string
    self._tenant_id = tenant_id                        # line 84: tenant ID
    self.task_id = task_id if task_id else get_uuid()  # line 85: task ID (a UUID is generated if absent)
    self.custom_header = custom_header                 # line 86: custom HTTP headers
    self._thread_pool = ThreadPoolExecutor(max_workers=5)  # line 87: thread pool, at most 5 workers
    self.load()                                        # line 88: load the components

def load(self):
    self.components = self.dsl["components"]           # line 91: component dict
    cpn_nms = set([])                                  # line 92: set of component names
    # lines 93-103: iterate over the components and instantiate them
    for k, cpn in self.components.items():
        cpn_nms.add(cpn["obj"]["component_name"])      # line 94: record the component name
        # lines 95-97: create the parameter object and fill it in
        param = component_class(cpn["obj"]["component_name"] + "Param")()  # line 95: dynamic Param class
        cpn["obj"]["params"]["custom_header"] = self.custom_header
        param.update(cpn["obj"]["params"])             # line 97: update the parameters
        try:
            param.check()                              # line 99: validate the parameters
        except Exception as e:
            raise ValueError(self.get_component_name(k) + f": {e}")  # line 101: raise a validation error
        # line 103: instantiate the component object
        cpn["obj"] = component_class(cpn["obj"]["component_name"])(self, k, param)
    self.path = self.dsl["path"]                       # line 105: restore the execution path
```
[Figure: Component loading flowchart]

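Key technical points:
- Dynamic parameter class creation (line 95): component_class(cpn["obj"]["component_name"] + "Param")() resolves the parameter class from the component name (e.g. BeginParam, RetrievalParam) and instantiates it.
- Parameter validation (lines 99-101): param.check() validates the parameters; on failure it raises a ValueError whose message is prefixed with the component name.
- Component factory (line 103): component_class(component_name) selects the component class by name, so the loader does not depend on any concrete component class.
- Thread pool (line 87): ThreadPoolExecutor(max_workers=5) creates a pool capped at 5 worker threads for running components concurrently. The executor starts threads lazily as tasks are submitted and then reuses them; it does not start all 5 up front.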
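The load steps can be mirrored in a self-contained sketch. How RAGFlow's component_class actually resolves names is not shown in this article, so this version assumes a simple dict registry filled by a class decorator; register, instantiate, and the prologue parameter are hypothetical names used only for illustration.

```python
from typing import Any, Dict


class ComponentParamBase:
    def __init__(self) -> None:
        self.custom_header = None

    def update(self, params: Dict[str, Any]) -> None:
        for k, v in params.items():
            if hasattr(self, k):  # unknown keys are ignored
                setattr(self, k, v)

    def check(self) -> None:
        pass


# Hypothetical registry: class name -> class
_REGISTRY: Dict[str, type] = {}


def register(cls: type) -> type:
    """Class decorator that makes a class resolvable by its __name__."""
    _REGISTRY[cls.__name__] = cls
    return cls


def component_class(name: str) -> type:
    """Resolve 'Begin' or 'BeginParam' to the registered class."""
    try:
        return _REGISTRY[name]
    except KeyError:
        raise ValueError(f"Unknown component class: {name}") from None


@register
class BeginParam(ComponentParamBase):
    def __init__(self) -> None:
        super().__init__()
        self.prologue = ""  # hypothetical parameter

    def check(self) -> None:
        if not isinstance(self.prologue, str):
            raise ValueError("prologue must be a string")


@register
class Begin:
    def __init__(self, canvas: Any, cpn_id: str, param: BeginParam) -> None:
        self._canvas, self._id, self._param = canvas, cpn_id, param


def instantiate(canvas: Any, cpn_id: str, obj: Dict[str, Any]) -> Any:
    """Same steps as Graph.load: build the Param, update it, check it, build the component."""
    name = obj["component_name"]
    param = component_class(name + "Param")()
    param.update(obj["params"])
    try:
        param.check()
    except Exception as e:
        raise ValueError(f"{cpn_id}: {e}") from e
    return component_class(name)(canvas, cpn_id, param)


begin = instantiate(None, "begin", {"component_name": "Begin", "params": {"prologue": "Hi!"}})
print(type(begin).__name__, begin._param.prologue)  # → Begin Hi!
```

The name + "Param" convention is what lets a single lookup function serve both a component and its parameter class, and the loader never imports concrete classes directly.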
1.3 Variable System Design
Source location: agent/canvas.py:162-265
```python
def get_value_with_variable(self, value: str) -> Any:
    # line 163: match variable expressions such as {{name}} or {name}
    pat = re.compile(r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*")
    out_parts = []
    last = 0
    # lines 167-184: walk over all matches
    for m in pat.finditer(value):
        out_parts.append(value[last:m.start()])       # line 168: text before the match
        key = m.group(1)                               # line 169: variable name
        v = self.get_variable_value(key)               # line 170: look up its value
        if v is None:
            rep = ""
        elif isinstance(v, partial):
            # lines 173-177: a partial is a deferred (streaming) producer; run it and join the chunks
            buf = []
            for chunk in v():
                buf.append(chunk)
            rep = "".join(buf)
        elif isinstance(v, str):
            rep = v
        else:
            rep = json.dumps(v, ensure_ascii=False)    # line 181: JSON-serialize everything else
        out_parts.append(rep)                          # line 183: append the replacement
        last = m.end()
    out_parts.append(value[last:])                     # line 186: remaining text
    return "".join(out_parts)

def get_variable_value(self, exp: str) -> Any:
    exp = exp.strip("{").strip("}").strip(" ").strip("{").strip("}")  # line 190: strip the braces
    # lines 191-192: global variable (no "@")
    if exp.find("@") < 0:
        return self.globals[exp]
    # lines 193-204: component variable in cpn_id@var_nm form
    cpn_id, var_nm = exp.split("@")                    # line 193: split component ID and variable name
    cpn = self.get_component(cpn_id)
    if not cpn:
        raise Exception(f"Can't find variable: '{cpn_id}@{var_nm}'")
    parts = var_nm.split(".", 1)                       # line 197: split root key and remaining path
    root_key = parts[0]
    rest = parts[1] if len(parts) > 1 else ""
    root_val = cpn["obj"].output(root_key)             # line 200: component output
    if not rest:
        return root_val
    return self.get_variable_param_value(root_val, rest)  # line 204: resolve the nested path

def get_variable_param_value(self, obj: Any, path: str) -> Any:
    cur = obj
    if not path:
        return cur
    # lines 210-232: walk the path keys
    for key in path.split('.'):
        if cur is None:
            return None
        # lines 214-218: try to parse strings as JSON
        if isinstance(cur, str):
            try:
                cur = json.loads(cur)
            except Exception:
                return None
        # lines 220-222: dict
        if isinstance(cur, dict):
            cur = cur.get(key)
            continue
        # lines 224-230: list/tuple (index access)
        if isinstance(cur, (list, tuple)):
            try:
                idx = int(key)
                cur = cur[idx]
            except Exception:
                return None
            continue
        cur = getattr(cur, key, None)                  # line 232: attribute access
    return cur
```
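Variable expression syntax:

| Expression | Meaning |
|---|---|
| {{sys.query}} | The user's query (global variable) |
| {{sys.files}} | The uploaded file list (global variable) |
| {{env.API_KEY}} | An env.* variable; like every expression without "@", it is looked up in globals |
| {{retrieval_0@chunks}} | The chunks output of component retrieval_0 |
| {{retrieval_0@chunks.0.content}} | The content field of the first chunk |
| {{generate_0@output.text}} | The text field inside the output named "output" of generate_0 |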
[Figure: Variable resolution flowchart]
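To see what the pattern from line 163 actually accepts, the sketch below reuses it verbatim with a simplified renderer that reads values from a plain dict instead of the canvas (render and extract_variables are hypothetical helpers). One detail worth noticing: the component-ID part [a-zA-Z:0-9]+ does not include the underscore, so inside a template string an ID such as retrieval_0 is not matched at all, while IDs made of letters, digits and colons (for example Retrieval:abc) are. Here the pattern is compiled once at module level.

```python
import json
import re
from typing import Any, Dict, List

# Same pattern as line 163, compiled once at import time
VAR_PATTERN = re.compile(
    r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*"
)


def extract_variables(text: str) -> List[str]:
    """Return the variable names the pattern finds in text."""
    return [m.group(1) for m in VAR_PATTERN.finditer(text)]


def render(text: str, values: Dict[str, Any]) -> str:
    """Simplified get_value_with_variable: look names up in a dict,
    keep strings as-is, JSON-encode everything else, use "" for missing."""
    out, last = [], 0
    for m in VAR_PATTERN.finditer(text):
        out.append(text[last:m.start()])
        v = values.get(m.group(1))
        if v is None:
            out.append("")
        elif isinstance(v, str):
            out.append(v)
        else:
            out.append(json.dumps(v, ensure_ascii=False))
        last = m.end()
    out.append(text[last:])
    return "".join(out)


print(extract_variables("{{sys.query}} {sys.files} {{env.API_KEY}}"))  # → ['sys.query', 'sys.files', 'env.API_KEY']
print(extract_variables("{{Retrieval:abc@chunks.0.content}}"))         # → ['Retrieval:abc@chunks.0.content']
print(extract_variables("{{retrieval_0@chunks}}"))                     # → []
print(render("Q: {{sys.query}}, files={sys.files}", {"sys.query": "hi", "sys.files": ["a.pdf"]}))
# → Q: hi, files=["a.pdf"]
```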

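Key technical points:
- Regular expression matching (line 163): the pattern \{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}* accepts both the single-brace ({sys.query}) and double-brace ({{sys.query}}) forms.
- Deferred partial execution (lines 173-177): if a variable's value is a functools.partial, it is called at substitution time and its chunks are joined; this is how streamed output is fed into templates.
- Uniform path access (lines 210-232): one dotted-path syntax covers dict keys, list/tuple indices, and object attributes.
- JSON strings (lines 214-218): a string met along the path is parsed as JSON, so fields inside JSON-encoded strings can be addressed as well.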
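The path walk in get_variable_param_value can be exercised on its own. The function below is a standalone copy of that logic (resolve_path is a hypothetical name) applied to sample data; it shows a JSON string being decoded mid-path, list indexing (negative indices work too, because the key goes through int()), attribute access, and the silent None returned on any miss.

```python
import json
from types import SimpleNamespace
from typing import Any


def resolve_path(obj: Any, path: str) -> Any:
    """Standalone copy of the get_variable_param_value walk."""
    cur = obj
    if not path:
        return cur
    for key in path.split("."):
        if cur is None:
            return None
        if isinstance(cur, str):              # JSON text is decoded before indexing
            try:
                cur = json.loads(cur)
            except Exception:
                return None
        if isinstance(cur, dict):             # dict key
            cur = cur.get(key)
            continue
        if isinstance(cur, (list, tuple)):    # numeric index, negatives included
            try:
                cur = cur[int(key)]
            except Exception:
                return None
            continue
        cur = getattr(cur, key, None)         # object attribute
    return cur


chunks = [{"content": "hello", "meta": '{"page": 3}'}]
print(resolve_path(chunks, "0.content"))                 # → hello
print(resolve_path(chunks, "0.meta.page"))               # → 3
print(resolve_path(chunks, "-1.content"))                # → hello
print(resolve_path(chunks, "5.content"))                 # → None
print(resolve_path(SimpleNamespace(text="ok"), "text"))  # → ok
```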
1.4 Component Access and Type Lookup
Source location: agent/canvas.py:147-160
```python
def get_component(self, cpn_id) -> Union[None, dict[str, Any]]:
    return self.components.get(cpn_id)                          # line 148: component dict (None if unknown)

# note: the accessors below raise TypeError for an unknown cpn_id,
# because .get() returns None and None["obj"] is not subscriptable
def get_component_obj(self, cpn_id) -> ComponentBase:
    return self.components.get(cpn_id)["obj"]                   # line 151: component instance

def get_component_type(self, cpn_id) -> str:
    return self.components.get(cpn_id)["obj"].component_name    # line 154: component type name

def get_component_input_form(self, cpn_id) -> dict:
    return self.components.get(cpn_id)["obj"].get_input_form()  # line 157: component input form

def get_tenant_id(self):
    return self._tenant_id                                      # line 160: tenant ID
```
[Figure: Component access interface design]

2. Canvas System
2.1 Canvas Inheritance
Source location: agent/canvas.py:279-300
```python
class Canvas(Graph):  # line 279: subclass of Graph
    def __init__(self, dsl: str, tenant_id=None, task_id=None, canvas_id=None, custom_header=None):
        # lines 282-285: initialize the global variables
        self.globals = {
            "sys.query": "",   # user query
            "sys.files": []    # file list
        }
        self.variables = {}    # line 286: local variables
        # line 287: call the parent initializer (Graph.__init__ calls self.load(), i.e. Canvas.load)
        super().__init__(dsl, tenant_id, task_id, custom_header=custom_header)
        self._id = canvas_id   # line 288: canvas ID

    def load(self):
        super().load()         # line 291: run Graph.load first
        # lines 292-299: load history and globals
        self.history = self.dsl["history"]
        if "globals" in self.dsl:
            self.globals = self.dsl["globals"]
        else:
            self.globals = {
                "sys.query": "",
                "sys.files": []
            }
        # lines 300-305: load variables and retrieval results
        if "variables" in self.dsl:
            self.variables = self.dsl["variables"]
        else:
            self.variables = {}
        self.retrieval = self.dsl["retrieval"]    # line 307: retrieval results
        self.memory = self.dsl.get("memory", [])  # line 308: memory list
```
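Canvas fields:

| Field | Meaning | Defined in |
|---|---|---|
| globals | Global variables (sys.query, sys.files, ...) | Canvas |
| variables | Local variables | Canvas |
| history | Conversation history | Canvas |
| retrieval | Retrieval results | Canvas |
| memory | Memory list | Canvas |
| _id | Canvas ID | Canvas |
| components | Component dict | Graph (inherited) |
| path | Execution path | Graph (inherited) |
| error | Error message | Graph (inherited) |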
[Figure: Canvas inheritance diagram]

2.2 Canvas Reset Mechanism
Source location: agent/canvas.py:128-137 (Graph.reset) + Canvas.reset
```python
# Graph.reset (lines 128-136)
def reset(self):
    self.path = []                                   # line 129: clear the execution path
    for k, cpn in self.components.items():
        self.components[k]["obj"].reset()            # line 131: reset every component
    try:
        REDIS_CONN.delete(f"{self.task_id}-logs")    # line 133: delete the Redis log key
        REDIS_CONN.delete(f"{self.task_id}-cancel")  # line 134: delete the cancel flag
    except Exception as e:
        logging.exception(e)

# Canvas.reset (inferred implementation)
def reset(self, mem=False):
    super().reset()                     # reset the Graph state first
    if not mem:                         # unless memory should be kept
        self.history = []               # clear the conversation history
        self.retrieval = []             # clear retrieval results (note: load() sets this to a dict)
        self.memory = []                # clear memory
    print(self.variables)               # debug print
    # reset the system globals (sys.*) to an empty value of the same type
    for k in self.globals.keys():
        if k.startswith("sys."):
            if isinstance(self.globals[k], str):
                self.globals[k] = ""    # strings -> ""
            elif isinstance(self.globals[k], int):
                self.globals[k] = 0     # ints -> 0 (bool values also land here)
            elif isinstance(self.globals[k], list):
                self.globals[k] = []    # lists -> []
```
[Figure: Reset flowchart]
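The sys.* reset loop relies on isinstance checks, and because bool is a subclass of int in Python, a boolean system variable would be reset to 0 rather than False. The sketch below is a variation (reset_sys_globals is a hypothetical name, and the keys in the usage are illustrative) that tests bool before int; values of other types, such as dicts, are left untouched, matching the original branches.

```python
from typing import Any, Dict


def reset_sys_globals(globals_: Dict[str, Any]) -> None:
    """Reset sys.* entries in place to an empty value of the same type."""
    for k, v in list(globals_.items()):
        if not k.startswith("sys."):
            continue                  # env.* and other keys are kept
        if isinstance(v, str):
            globals_[k] = ""
        elif isinstance(v, bool):     # must come before int: bool subclasses int
            globals_[k] = False
        elif isinstance(v, int):
            globals_[k] = 0
        elif isinstance(v, list):
            globals_[k] = []
        # other types (e.g. dict) are left unchanged, as in the original branches


g = {"sys.query": "hi", "sys.turns": 3, "sys.files": ["a.pdf"],
     "sys.debug": True, "env.API_KEY": "k"}
reset_sys_globals(g)
print(g)
# → {'sys.query': '', 'sys.turns': 0, 'sys.files': [], 'sys.debug': False, 'env.API_KEY': 'k'}
```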

3. Component Base Class Design
3.1 The ComponentBase Abstract Base Class
Source location: agent/component/base.py (inferred structure)
```python
import json


class ComponentBase:
    component_name = "Base"  # component name (class attribute)

    def __init__(self, canvas, id, param):
        self._canvas = canvas  # reference to the owning canvas
        self._id = id          # component ID
        self._param = param    # parameter object
        self._output = {}      # output dict
        self._error = ""       # error message

    def reset(self):
        """Reset the component state."""
        self._output = {}
        self._error = ""

    def output(self, key=None):
        """Return the whole output dict, or a single output by key."""
        if key is None:
            return self._output
        return self._output.get(key)

    def set_output(self, key, value):
        """Set one output value."""
        self._output[key] = value

    def error(self):
        """Return the error message."""
        return self._error

    def set_error(self, msg):
        """Set the error message."""
        self._error = msg

    async def invoke(self, **kwargs):
        """Run the component (subclasses must implement this)."""
        raise NotImplementedError("Subclasses must implement invoke method")

    def get_downstream(self):
        """Return the IDs of the downstream components."""
        cpn = self._canvas.get_component(self._id)
        return cpn.get("downstream", [])

    def get_upstream(self):
        """Return the IDs of the upstream components."""
        cpn = self._canvas.get_component(self._id)
        return cpn.get("upstream", [])

    def get_input_form(self):
        """Return the input form definition."""
        return {}

    def __str__(self):
        """Serialize to a JSON string."""
        return json.dumps({
            "component_name": self.component_name,
            "params": self._param.to_dict() if hasattr(self._param, "to_dict") else {}
        }, ensure_ascii=False)
```
[Figure: Component lifecycle state diagram]
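The inferred ComponentBase declares invoke as a plain async def, while LLM.invoke contains yield and is therefore an async generator; Switch, Loop, Categorize and Retrieval, as sketched in this article, do not yield. Canvas.run drives every component with async for, which raises TypeError when invoke returns a coroutine. One way to reconcile the two styles, assuming components may use either, is a small driver that checks which kind of object invoke returned (drive, Streaming and NonStreaming are hypothetical names):

```python
import asyncio
import inspect
from typing import Any, List


async def drive(result: Any) -> List[Any]:
    """Run whatever invoke() returned: iterate an async generator (collecting
    its chunks) or await a coroutine (nothing to collect)."""
    chunks: List[Any] = []
    if inspect.isasyncgen(result):
        async for chunk in result:
            chunks.append(chunk)
    elif inspect.isawaitable(result):
        await result
    return chunks


class Streaming:
    async def invoke(self, **kwargs):
        for part in ("Hel", "lo"):
            yield part                 # async generator


class NonStreaming:
    def __init__(self) -> None:
        self.out = None

    async def invoke(self, **kwargs):
        self.out = "done"              # plain coroutine, no yield


print(asyncio.run(drive(Streaming().invoke())))  # → ['Hel', 'lo']
ns = NonStreaming()
print(asyncio.run(drive(ns.invoke())), ns.out)   # → [] done
```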

3.2 Component Parameter Classes
Parameter base class example:
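```python
class ComponentParamBase:
    def __init__(self):
        self.custom_header = None  # custom HTTP headers

    def update(self, params):
        """Update parameters; keys that are not existing attributes are ignored."""
        for k, v in params.items():
            if hasattr(self, k):
                setattr(self, k, v)

    def check(self):
        """Validate parameters (subclasses may override)."""
        pass

    def to_dict(self):
        """Convert to a dict, skipping private (underscore-prefixed) attributes."""
        return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
```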
Concrete parameter class example (LLMParam):
```python
class LLMParam(ComponentParamBase):
    def __init__(self):
        super().__init__()
        self.llm_id = ""               # LLM model ID
        self.prompt = ""               # prompt template
        self.temperature = 0.1         # sampling temperature
        self.max_tokens = 2048         # maximum number of tokens
        self.max_retries = 5           # maximum number of retries
        self.delay_after_error = 2.0   # delay after an error
        self.max_rounds = 5            # maximum number of rounds
        self.tools = []                # tool list
        self.mcp = []                  # MCP server list

    def check(self):
        """Validate the parameters."""
        if not self.llm_id:
            raise ValueError("llm_id cannot be empty")
        if self.temperature < 0 or self.temperature > 2:
            raise ValueError("temperature must be between 0 and 2")
```
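Because update() only assigns keys that already exist as attributes, a misspelled key is dropped silently and check() never sees it. The sketch copies the two parameter classes above in trimmed form and adds a hypothetical variation in which update returns the ignored keys, so that a caller could warn about them.

```python
from typing import Any, Dict, List


class ComponentParamBase:
    def __init__(self) -> None:
        self.custom_header = None

    def update(self, params: Dict[str, Any]) -> List[str]:
        """Assign known keys; return the keys that were ignored (hypothetical addition)."""
        ignored = []
        for k, v in params.items():
            if hasattr(self, k):
                setattr(self, k, v)
            else:
                ignored.append(k)
        return ignored

    def check(self) -> None:
        pass

    def to_dict(self) -> Dict[str, Any]:
        return {k: v for k, v in self.__dict__.items() if not k.startswith("_")}


class LLMParam(ComponentParamBase):  # trimmed to three fields
    def __init__(self) -> None:
        super().__init__()
        self.llm_id = ""
        self.temperature = 0.1
        self.max_tokens = 2048

    def check(self) -> None:
        if not self.llm_id:
            raise ValueError("llm_id cannot be empty")
        if self.temperature < 0 or self.temperature > 2:
            raise ValueError("temperature must be between 0 and 2")


p = LLMParam()
ignored = p.update({"llm_id": "some-model", "temprature": 1.5})  # note the typo
p.check()                      # passes: the typo never reached temperature
print(ignored, p.temperature)  # → ['temprature'] 0.1
```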
4. Core Component Implementations
4.1 LLM Component
Source location: agent/component/llm.py (inferred structure)
```python
class LLM(ComponentBase):
    component_name = "LLM"

    def __init__(self, canvas, id, param: LLMParam):
        super().__init__(canvas, id, param)
        # create the LLM bundle
        self.chat_mdl = LLMBundle(
            canvas.get_tenant_id(),
            TenantLLMService.llm_id2llm_type(param.llm_id),
            param.llm_id,
            max_retries=param.max_retries,
            retry_interval=param.delay_after_error,
            max_rounds=param.max_rounds
        )

    async def invoke(self, **kwargs):
        """Call the LLM; yields text deltas, which makes invoke an async generator."""
        # upstream output (not used further in this sketch)
        upstream_output = kwargs.get("input", {})
        # build the prompt, substituting variables
        prompt = self._canvas.get_value_with_variable(self._param.prompt)
        # build the message history
        history = self._canvas.history.copy()
        history.append({"role": "user", "content": prompt})
        # call the LLM
        gen_conf = {
            "temperature": self._param.temperature,
            "max_tokens": self._param.max_tokens
        }
        ans = ""
        async for delta in self.chat_mdl.async_chat_streamly("", history, gen_conf):
            if isinstance(delta, str):
                ans += delta
                yield delta  # stream the delta to the caller
        # set the output
        self.set_output("text", ans)
        self._canvas.history.append({"role": "assistant", "content": ans})
```
[Figure: LLM component execution flowchart]
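The streaming loop in LLM.invoke can be tried without a real model. FakeChatModel below is a hypothetical stand-in, assumed to yield text deltas followed by one non-string item (such as a token count), which is the case the isinstance(delta, str) filter guards against. MiniLLM (also hypothetical) mirrors the inferred invoke, including the detail that only the assistant turn is written back to the shared history.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Union


class FakeChatModel:
    """Hypothetical stand-in for a streaming chat model."""

    def __init__(self, deltas: List[str]) -> None:
        self._deltas = deltas

    async def async_chat_streamly(self, system: str, history: List[Dict[str, str]],
                                  gen_conf: Dict[str, Any]) -> AsyncIterator[Union[str, int]]:
        for d in self._deltas:
            await asyncio.sleep(0)                # hand control back, like a network stream
            yield d
        yield sum(len(d) for d in self._deltas)  # trailing non-str item (assumed)


class MiniLLM:
    def __init__(self, chat_mdl: FakeChatModel, history: List[Dict[str, str]]) -> None:
        self.chat_mdl = chat_mdl
        self.history = history                    # shared conversation history
        self.outputs: Dict[str, Any] = {}

    async def invoke(self, prompt: str):
        messages = self.history + [{"role": "user", "content": prompt}]  # copy, then add the user turn
        ans = ""
        async for delta in self.chat_mdl.async_chat_streamly("", messages, {"temperature": 0.1}):
            if isinstance(delta, str):            # skip the non-text trailer
                ans += delta
                yield delta
        self.outputs["text"] = ans                # set only after the stream is fully consumed
        self.history.append({"role": "assistant", "content": ans})


async def collect(agen) -> List[Any]:
    return [c async for c in agen]


history: List[Dict[str, str]] = []
llm = MiniLLM(FakeChatModel(["Hel", "lo"]), history)
chunks = asyncio.run(collect(llm.invoke("hi")))
print(chunks, llm.outputs["text"])  # → ['Hel', 'lo'] Hello
```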

4.2 Agent Component (with Tools)
Source location: agent/component/agent_with_tools.py:83
```python
class Agent(LLM, ToolBase):
    component_name = "Agent"

    def __init__(self, canvas, id, param: LLMParam):
        LLM.__init__(self, canvas, id, param)
        # lines 89-93: load the tools
        self.tools = {}
        for idx, cpn in enumerate(self._param.tools):
            cpn = self._load_tool_obj(cpn)
            original_name = cpn.get_meta()["function"]["name"]
            indexed_name = f"{original_name}_{idx}"  # suffix the tool name with its index
            self.tools[indexed_name] = cpn
        # lines 96-102: create the LLM bundle
        self.chat_mdl = LLMBundle(
            self._canvas.get_tenant_id(),
            TenantLLMService.llm_id2llm_type(self._param.llm_id),
            self._param.llm_id,
            max_retries=self._param.max_retries,
            retry_interval=self._param.delay_after_error,
            max_rounds=self._param.max_rounds,
            verbose_tool_use=True  # verbose tool-use logging
        )
        # lines 103-107: build the tool metadata
        self.tool_meta = []
        for indexed_name, tool_obj in self.tools.items():
            original_meta = tool_obj.get_meta()
            indexed_meta = deepcopy(original_meta)
            indexed_meta["function"]["name"] = indexed_name
            self.tool_meta.append(indexed_meta)
        # lines 109-117: load MCP tools
        for mcp in self._param.mcp:
            _, mcp_server = MCPServerService.get_by_id(mcp["mcp_id"])
            custom_header = self._param.custom_header
            tool_call_session = MCPToolCallSession(mcp_server, mcp_server.variables, custom_header)
            for tnm, meta in mcp["tools"].items():
                # add the MCP tool to tool_meta
                # ...
                pass
```
[Figure: Agent tool-calling flowchart]

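Key technical points:
- Indexed tool names (line 92): indexed_name = f"{original_name}_{idx}" prevents name collisions, so the same tool can be configured more than once.
- MCP tool integration (lines 109-117): tools can be loaded dynamically from MCP servers, extending the set of available tools.
- verbose_tool_use (line 101): enables detailed tool-use logging, which helps when debugging tool calls.
- Multiple inheritance (line 83): class Agent(LLM, ToolBase) inherits from both LLM and ToolBase, reusing the LLM-calling and tool-management logic.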
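The indexing scheme from lines 89-107 can be tried in isolation: each tool name gets its position in the tools list as a suffix, and the metadata is deep-copied so that renaming does not touch the tool's own metadata. index_tools and the reverse name map are hypothetical additions; a reverse map is one way to recover the original tool name when the model calls, say, web_search_1. Note that the suffix is the position in the whole list, not a per-name counter.

```python
from copy import deepcopy
from typing import Dict, List, Tuple


def index_tools(metas: List[dict]) -> Tuple[List[dict], Dict[str, str]]:
    """Suffix each tool name with its position; return the renamed metadata
    and a map from indexed name back to the original name."""
    indexed_metas: List[dict] = []
    name_map: Dict[str, str] = {}
    for idx, meta in enumerate(metas):
        original = meta["function"]["name"]
        indexed = f"{original}_{idx}"
        m = deepcopy(meta)          # a shallow copy would share the nested "function" dict
        m["function"]["name"] = indexed
        indexed_metas.append(m)
        name_map[indexed] = original
    return indexed_metas, name_map


def meta(name: str) -> dict:
    return {"type": "function", "function": {"name": name, "parameters": {"type": "object"}}}


tools = [meta("web_search"), meta("web_search"), meta("calculator")]
renamed, name_map = index_tools(tools)
print([m["function"]["name"] for m in renamed])  # → ['web_search_0', 'web_search_1', 'calculator_2']
print(tools[0]["function"]["name"])              # → web_search
```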
4.3 Retrieval Component
Inferred implementation:
```python
class Retrieval(ComponentBase):
    component_name = "Retrieval"

    def __init__(self, canvas, id, param: RetrievalParam):
        super().__init__(canvas, id, param)
        self.retrieval_mdl = LLMBundle(
            canvas.get_tenant_id(),
            LLMType.EMBEDDING,
            param.embd_id
        )

    async def invoke(self, **kwargs):
        """Run retrieval."""
        # get the query, substituting variables
        query = self._canvas.get_value_with_variable(self._param.query)
        # call the retriever
        from rag.nlp import search
        dealer = search.Dealer(settings.docStoreConn)
        ranks = await dealer.retrieval(
            question=query,
            embd_mdl=self.retrieval_mdl,
            tenant_ids=[self._canvas.get_tenant_id()],
            kb_ids=self._param.kb_ids,
            page=1,
            page_size=self._param.top_n,
            similarity_threshold=self._param.similarity_threshold,
            vector_similarity_weight=self._param.vector_similarity_weight
        )
        # set the outputs
        self.set_output("chunks", ranks["chunks"])
        self.set_output("doc_aggs", ranks["doc_aggs"])
        # cache the result on the canvas
        self._canvas.retrieval = ranks
```
4.4 Categorize Component
Inferred implementation:
```python
class Categorize(ComponentBase):
    component_name = "Categorize"

    def __init__(self, canvas, id, param: CategorizeParam):
        super().__init__(canvas, id, param)
        self.chat_mdl = LLMBundle(
            canvas.get_tenant_id(),
            LLMType.CHAT,
            param.llm_id
        )

    async def invoke(self, **kwargs):
        """Classify the input."""
        # get the input text
        input_text = self._canvas.get_value_with_variable(self._param.input)
        # build the classification prompt
        categories = self._param.categories  # list of category names
        prompt = (f"Classify the following text into exactly one of these categories: "
                  f"{', '.join(categories)}\n\nText: {input_text}\n\nCategory:")
        # call the LLM
        history = [{"role": "user", "content": prompt}]
        ans = ""
        async for delta in self.chat_mdl.async_chat_streamly("", history, {}):
            if isinstance(delta, str):
                ans += delta
        # parse the answer (_parse_category is a helper not shown here)
        category = self._parse_category(ans, categories)
        self.set_output("category", category)
        # route to the downstream component mapped to this category;
        # without a mapping, keep the existing downstream list unchanged
        # (wrapping the default list in [...] would produce a nested list)
        target = self._param.category_to_downstream.get(category)
        if target is not None:
            cpn = self._canvas.get_component(self._id)
            cpn["downstream"] = [target]
```
[Figure: Categorize component flowchart]
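Categorize.invoke calls self._parse_category, which is not shown. Below is a minimal sketch of such a parser, assuming the model may wrap the label in extra words or punctuation: first try an exact case-insensitive match after trimming, then fall back to the category mentioned earliest in the answer (preferring the longer name when two start at the same position), else a default. parse_category is hypothetical, not RAGFlow's implementation; plain substring search can still be fooled (for example "another" contains "other").

```python
from typing import List, Optional

_TRIM = " \t\n\"'`.:"


def parse_category(answer: str, categories: List[str], default: Optional[str] = None) -> Optional[str]:
    """Map a free-form model answer to one of the categories."""
    text = answer.strip().lower()
    # 1) exact match once surrounding quotes/punctuation are removed
    cleaned = text.strip(_TRIM)
    for c in categories:
        if cleaned == c.lower():
            return c
    # 2) otherwise the category mentioned earliest (the longer name wins a tie)
    best, best_pos = None, len(text) + 1
    for c in categories:
        pos = text.find(c.lower())
        if pos == -1:
            continue
        if pos < best_pos or (pos == best_pos and len(c) > len(best)):
            best, best_pos = c, pos
    return best if best is not None else default


cats = ["billing", "technical", "other"]
print(parse_category("Technical", cats))                 # → technical
print(parse_category("Category: billing.", cats))        # → billing
print(parse_category("It's other, not billing", cats))   # → other
print(parse_category("no idea", cats, default="other"))  # → other
```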

4.5 Switch Component
Inferred implementation:
```python
import re


class Switch(ComponentBase):
    component_name = "Switch"

    def __init__(self, canvas, id, param: SwitchParam):
        super().__init__(canvas, id, param)

    async def invoke(self, **kwargs):
        """Evaluate the branches."""
        # get the value to test
        condition_var = self._canvas.get_value_with_variable(self._param.condition)
        # try each case in order
        for case in self._param.cases:
            if self._evaluate_condition(condition_var, case["condition"]):
                # first match wins: route to this case's downstream component
                cpn = self._canvas.get_component(self._id)
                cpn["downstream"] = [case["downstream"]]
                self.set_output("matched", case["condition"])
                return
        # no match: fall back to the default branch
        if self._param.default_downstream:
            cpn = self._canvas.get_component(self._id)
            cpn["downstream"] = [self._param.default_downstream]
            self.set_output("matched", "default")

    def _evaluate_condition(self, value, condition):
        """Evaluate one condition: equality, substring, or regex (re.match anchors at the start)."""
        if condition.startswith("=="):
            return value == condition[2:].strip()
        elif condition.startswith("contains:"):
            return condition[9:].strip() in str(value)
        elif condition.startswith("regex:"):
            return bool(re.match(condition[6:].strip(), str(value)))
        else:
            return value == condition
```
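The condition mini-language of _evaluate_condition can be checked directly. Below is a standalone copy plus a first-match-wins branch picker (pick_branch and the component IDs in the usage are hypothetical). Note that regex: uses re.match, which only matches at the start of the string; re.search would be needed to match anywhere.

```python
import re
from typing import Any, Dict, List, Optional


def evaluate_condition(value: Any, condition: str) -> bool:
    """Standalone copy of Switch._evaluate_condition."""
    if condition.startswith("=="):
        return value == condition[2:].strip()
    elif condition.startswith("contains:"):
        return condition[9:].strip() in str(value)
    elif condition.startswith("regex:"):
        return bool(re.match(condition[6:].strip(), str(value)))  # anchored at the start
    return value == condition


def pick_branch(value: Any, cases: List[Dict[str, str]], default: Optional[str] = None) -> Optional[str]:
    """First matching case wins; otherwise the default downstream."""
    for case in cases:
        if evaluate_condition(value, case["condition"]):
            return case["downstream"]
    return default


cases = [
    {"condition": "contains:refund", "downstream": "Refund:A"},
    {"condition": r"regex:\d+", "downstream": "Order:B"},
]
print(pick_branch("refund please", cases, "Fallback:C"))  # → Refund:A
print(pick_branch("42 items", cases, "Fallback:C"))       # → Order:B
print(pick_branch("order 42", cases, "Fallback:C"))       # → Fallback:C
```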
4.6 Loop Component
Inferred implementation:
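```python
class Loop(ComponentBase):
    component_name = "Loop"

    def __init__(self, canvas, id, param: LoopParam):
        super().__init__(canvas, id, param)

    async def invoke(self, **kwargs):
        """Set up the loop."""
        # get the items to iterate over; get_variable_value returns the raw value,
        # whereas get_value_with_variable always returns a string (lists are
        # JSON-serialized), so the list check below would never pass
        items = self._canvas.get_variable_value(self._param.items)
        if not isinstance(items, (list, tuple)):
            items = [items]
        # set up the loop context
        self._canvas.variables[f"{self._id}_index"] = 0
        self._canvas.variables[f"{self._id}_items"] = items
        # route to the first component of the loop body
        cpn = self._canvas.get_component(self._id)
        cpn["downstream"] = [self._param.loop_body]
        self.set_output("count", len(items))
```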
5. Tool System Design
5.1 The ToolBase Class
Source location: agent/tools/base.py:126 (inferred)
```python
class ToolBase:
    def __init__(self, canvas, id, param):
        self._canvas = canvas
        self._id = id
        self._param = param

    def get_meta(self) -> dict:
        """Return the tool metadata (OpenAI function-calling format)."""
        raise NotImplementedError("Subclasses must implement get_meta")

    async def invoke(self, arguments: dict):
        """Run the tool."""
        raise NotImplementedError("Subclasses must implement invoke")

    def _load_tool_obj(self, tool_config):
        """Instantiate a tool from its config (same Param + class lookup as Graph.load)."""
        tool_name = tool_config["component_name"]
        tool_param = component_class(tool_name + "Param")()
        tool_param.update(tool_config["params"])
        return component_class(tool_name)(self._canvas, self._id, tool_param)
```
Tool metadata format:
```json
{
  "type": "function",
  "function": {
    "name": "search_web",
    "description": "Search the web for information",
    "parameters": {
      "type": "object",
      "properties": {
        "query": {
          "type": "string",
          "description": "Search query"
        },
        "top_k": {
          "type": "integer",
          "description": "Number of results to return",
          "default": 5
        }
      },
      "required": ["query"]
    }
  }
}
```
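Before a tool is invoked, the model-supplied arguments can be checked against the parameters schema in this metadata. The sketch below (prepare_arguments is hypothetical) enforces required keys, rejects unknown keys, fills declared defaults, and checks a small subset of JSON Schema types. It is not a full validator; for that, a library such as jsonschema would be the usual choice.

```python
from typing import Any, Dict

# JSON Schema type name -> Python type(s); a small subset only
_JSON_TYPES = {"string": str, "integer": int, "number": (int, float),
               "boolean": bool, "array": list, "object": dict}


def prepare_arguments(meta: Dict[str, Any], arguments: Dict[str, Any]) -> Dict[str, Any]:
    params = meta["function"]["parameters"]
    props = params.get("properties", {})
    missing = [k for k in params.get("required", []) if k not in arguments]
    if missing:
        raise ValueError(f"missing required argument(s): {missing}")
    unknown = [k for k in arguments if k not in props]
    if unknown:
        raise ValueError(f"unexpected argument(s): {unknown}")
    # declared defaults first, then the caller's values on top
    out = {k: spec["default"] for k, spec in props.items() if "default" in spec}
    out.update(arguments)
    for k, v in out.items():
        expected = _JSON_TYPES.get(props[k].get("type"))
        if expected is None:
            continue
        # bool is a subclass of int, so reject it unless a boolean is expected
        if (isinstance(v, bool) and expected is not bool) or not isinstance(v, expected):
            raise ValueError(f"argument {k!r} should be of type {props[k]['type']}")
    return out


META = {"type": "function", "function": {
    "name": "search_web",
    "description": "Search the web for information",
    "parameters": {"type": "object",
                   "properties": {"query": {"type": "string", "description": "Search query"},
                                  "top_k": {"type": "integer", "description": "Number of results to return",
                                            "default": 5}},
                   "required": ["query"]}}}

print(prepare_arguments(META, {"query": "ragflow"}))  # → {'top_k': 5, 'query': 'ragflow'}
```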
5.2 Built-in Tool Example
WebSearch tool:
```python
class WebSearch(ToolBase):
    def get_meta(self) -> dict:
        return {
            "type": "function",
            "function": {
                "name": "web_search",
                "description": "Search the web for up-to-date information",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "Search query"},
                        "top_k": {"type": "integer", "default": 5}
                    },
                    "required": ["query"]
                }
            }
        }

    async def invoke(self, arguments: dict):
        query = arguments["query"]
        top_k = arguments.get("top_k", 5)
        # call a search API (_search is a helper not shown here)
        results = await self._search(query, top_k)
        return {
            "results": results,
            "count": len(results)
        }
```
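In the OpenAI function-calling format, a tool call names the function and passes its arguments as a JSON-encoded string. Below is a sketch of dispatching such a call to a tool object with an async invoke(arguments) method, like WebSearch above. EchoSearch is a hypothetical stand-in that returns placeholder strings instead of searching, and dispatch is likewise hypothetical. Returning errors as data rather than raising is a design choice that lets the model see a bad call and retry.

```python
import asyncio
import json
from typing import Any, Dict


class EchoSearch:
    """Hypothetical stand-in for WebSearch: returns one placeholder line per requested result."""

    async def invoke(self, arguments: dict) -> dict:
        query = arguments["query"]
        top_k = arguments.get("top_k", 5)
        results = [f"{query} result {i}" for i in range(top_k)]
        return {"results": results, "count": len(results)}


async def dispatch(tools: Dict[str, Any], tool_call: Dict[str, Any]) -> dict:
    """Route one OpenAI-style tool call to the matching tool object."""
    fn = tool_call["function"]
    raw = fn.get("arguments") or "{}"
    try:
        args = json.loads(raw) if isinstance(raw, str) else dict(raw)
    except json.JSONDecodeError as e:
        return {"error": f"invalid JSON arguments: {e}"}
    tool = tools.get(fn["name"])
    if tool is None:
        return {"error": f"unknown tool: {fn['name']}"}
    return await tool.invoke(args)


tools = {"web_search_0": EchoSearch()}
call = {"type": "function",
        "function": {"name": "web_search_0", "arguments": '{"query": "rag", "top_k": 2}'}}
print(asyncio.run(dispatch(tools, call)))
# → {'results': ['rag result 0', 'rag result 1'], 'count': 2}
```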
6. End-to-End Agent Execution
6.1 Canvas.run Execution Flow
Inferred implementation:
```python
async def run(self, **kwargs):
    """Run the canvas."""
    # initialize the execution path
    if not self.path:
        self.path.append("begin")
    # run the first component
    cpn_obj = self.get_component_obj(self.path[0])
    async for _ in cpn_obj.invoke(**kwargs):
        pass  # drain the streamed output
    if cpn_obj.error():
        self.error = "[ERROR]" + cpn_obj.error()
        return {}
    # then run the downstream components in order
    idx = len(self.path) - 1
    cpn_obj = self.get_component_obj(self.path[idx])
    idx += 1
    self.path.extend(cpn_obj.get_downstream())
    while idx < len(self.path) and not self.error:
        last_cpn = self.get_component_obj(self.path[idx - 1])
        cpn_obj = self.get_component_obj(self.path[idx])
        # run the component, passing the previous component's output as kwargs
        async for _ in cpn_obj.invoke(**last_cpn.output()):
            pass
        if cpn_obj.error():
            self.error = "[ERROR]" + cpn_obj.error()
            break
        idx += 1
        self.path.extend(cpn_obj.get_downstream())
    # return the final output
    if not self.error:
        return self.get_component_obj(self.path[-1]).output()
    return {}
```
[Figure: End-to-end execution flowchart]
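A toy version of the run loop, runnable without RAGFlow: components are hypothetical Node objects whose invoke is an async generator, the path grows with each component's downstream list, and each component receives the previous component's output as keyword arguments, as in the inferred run. Unlike the inferred code, this sketch skips component IDs that already ran, which stops A→B→A cycles and keeps a join node from running twice, at the cost of also disabling intentional loops.

```python
import asyncio
from typing import Any, Dict, Iterable, List, Tuple


class Node:
    """Toy component: records that it ran and appends its ID to the data it received."""

    def __init__(self, cpn_id: str, trace: List[str], fail: bool = False) -> None:
        self.id, self.trace, self.fail = cpn_id, trace, fail
        self._output: Dict[str, Any] = {}
        self._error = ""

    async def invoke(self, **kwargs):
        self.trace.append(self.id)
        if self.fail:
            self._error = f"{self.id} failed"
            return                                # ends the generator without yielding
        self._output = {"seen": kwargs.get("seen", []) + [self.id]}
        yield self.id                             # one streamed chunk

    def output(self) -> Dict[str, Any]:
        return self._output

    def error(self) -> str:
        return self._error


def build(edges: Dict[str, List[str]], trace: List[str], failing: Iterable[str] = ()) -> Dict[str, dict]:
    fail_set = set(failing)
    return {cid: {"obj": Node(cid, trace, cid in fail_set), "downstream": ds}
            for cid, ds in edges.items()}


async def run(components: Dict[str, dict], start: str = "begin") -> Tuple[List[str], Dict[str, Any], str]:
    path, executed, idx = [start], [], 0
    last_out: Dict[str, Any] = {}
    while idx < len(path):
        cpn_id = path[idx]
        idx += 1
        if cpn_id in executed:                    # cycle or join node: run each component once
            continue
        obj = components[cpn_id]["obj"]
        async for _ in obj.invoke(**last_out):    # drain the stream
            pass
        executed.append(cpn_id)
        if obj.error():
            return executed, {}, "[ERROR]" + obj.error()
        last_out = obj.output()                   # becomes the next component's kwargs
        path.extend(components[cpn_id].get("downstream", []))
    return executed, last_out, ""


trace: List[str] = []
comps = build({"begin": ["retrieval"], "retrieval": ["generate"], "generate": []}, trace)
print(asyncio.run(run(comps)))
# → (['begin', 'retrieval', 'generate'], {'seen': ['begin', 'retrieval', 'generate']}, '')
```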

7. Summary of Design Patterns and Technical Decisions
7.1 Core Design Patterns
- Template Method
- Factory
- Strategy
- Iterator
- Multiple inheritance
- Decorator
- Observer
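7.2 Technical Decisions

| Aspect | Choice in this codebase |
|---|---|
| DSL format | JSON, parsed with json.loads in Graph.__init__ |
| Variable syntax | {{cpn_id@var.path}}, {{sys.*}}, {{env.*}} |
| Component communication | Shared canvas: each component stores its outputs, which are read via output() or variable references; run() also passes the previous component's output as kwargs |
| Tool calling | OpenAI function-calling metadata (get_meta) |
| Streaming output | Async generators (yield) and functools.partial |
| Error handling | Components record errors with set_error; run() checks error() after each component and stops |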
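7.3 Key Performance Points
Canvas:
- Thread pool (line 87): ThreadPoolExecutor(max_workers=5) reuses up to 5 worker threads instead of creating a new thread for every task.
- Component instance reuse (line 103): components are instantiated once in load() and reused by later runs, avoiding repeated initialization.
- Redis key cleanup (lines 133-134): reset() deletes the task's Redis log key and cancel flag, freeing memory in Redis.
Variable system:
- Regex compilation (line 163): the pattern is built with re.compile, but inside get_value_with_variable, so it is rebuilt on every call. Python's re module caches recently compiled patterns, which keeps this cheap; hoisting the pattern to module level would make it a true one-time compilation.
- Deferred partial execution (lines 173-177): streamed values are stored as partials and only evaluated when a template actually references them.
- Nested path resolution (lines 210-232): a whole dotted path is resolved in a single call rather than one lookup call per level.
Component execution:
- Dynamic downstream expansion (inferred): self.path.extend(cpn_obj.get_downstream()) appends downstream components at run time, which lets branching components decide where execution goes next.
- Draining streamed output (inferred): async for _ in cpn_obj.invoke() drives each component to completion while discarding the chunks, so they do not accumulate in memory.
- Early abort on error (inferred): when a component reports an error the loop breaks, so no downstream component runs.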
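8. Source Line Index
8.1 Core Graph/Canvas Methods

| Method | Lines cited in the annotated code above (agent/canvas.py) |
|---|---|
| Graph.__init__ | 80-88 |
| Graph.load | 91-105 |
| Graph.__str__ | |
| Graph.reset | 128-136 |
| Graph.get_value_with_variable | 163-186 |
| Graph.get_variable_value | 190-204 |
| Graph.get_variable_param_value | 210-232 |
| Canvas.__init__ | 282-288 |
| Canvas.load | 291-308 |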
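8.2 Core Component Methods (inferred)

| Method | Source location cited above |
|---|---|
| ComponentBase.__init__ | agent/component/base.py |
| ComponentBase.reset | agent/component/base.py |
| ComponentBase.invoke | agent/component/base.py |
| ComponentBase.get_downstream | agent/component/base.py |
| LLM.invoke | agent/component/llm.py |
| Agent.__init__ | agent/component/agent_with_tools.py:83 (class definition) |
| Retrieval.invoke | |
| Categorize.invoke | |
| Switch.invoke | |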
9. Study Suggestions and Practice Path
9.1 Suggested Reading Order

9.2 Practical Debugging Tips
Debugging variable resolution:
```python
# In agent/canvas.py at line 170, log every resolved variable
v = self.get_variable_value(key)
print(f"[DEBUG] variable resolved: key={key}, value={v}, type={type(v)}")
```
Debugging component execution:
```python
# At the start of a component's invoke method
async def invoke(self, **kwargs):
    print(f"[DEBUG] running component: id={self._id}, name={self.component_name}, kwargs={kwargs}")
    # ...
```
Debugging tool calls:
```python
# At the tool-call site in agent/component/agent_with_tools.py
tool_result = await tool.invoke(arguments)
print(f"[DEBUG] tool call: tool={tool_name}, args={arguments}, result={tool_result}")
```
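10. Common Problems and Solutions
10.1 Variable Resolution
Problem 1: variable-not-found error
Cause: the component ID in the variable expression does not exist, which raises an exception, or the output name is misspelled, which does not raise: the value resolves to None and renders as an empty string.
Solution: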
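```python
# Check the variable expression format
# correct: {{retrieval_0@chunks}}
# wrong:   {{retrieval0@chunks}}   (unknown component ID: raises "Can't find variable")
# wrong:   {{retrieval_0@chunk}}   (misspelled output name: no error, resolves to None and renders as "")

# In canvas.py at line 196, make the error list the available component IDs
if not cpn:
    available_cpn_ids = list(self.components.keys())
    raise Exception(f"Can't find variable: '{cpn_id}@{var_nm}'. Available components: {available_cpn_ids}")
```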
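Problem 2: nested path resolution fails
Cause: a key in the path does not exist, or a value along the path has an unexpected type. get_variable_param_value returns None in these cases instead of raising, so the failure is silent.
Solution: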
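```python
# In canvas.py at line 232, log when attribute lookup fails.
# Dict and list lookups (lines 220-230) return or continue before reaching this
# line, so log those branches separately if needed.
parent = cur
cur = getattr(cur, key, None)
if cur is None:
    logging.warning(
        f"Path resolution failed: path={path}, current_key={key}, "
        f"available_attributes={dir(parent)}"
    )
```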
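10.2 Component Execution
Problem 1: components loop forever
Cause: the downstream links form a cycle, e.g. A→B→A; the run loop keeps appending downstream IDs to the path, so it never reaches the end.
Solution: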
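```python
# Detect revisits in the execution loop
visited = set()
while idx < len(self.path) and not self.error:
    cpn_id = self.path[idx]
    if cpn_id in visited:
        # note: this also rejects intentional revisits (e.g. a Loop body)
        # and join nodes that are reached from two branches
        raise Exception(f"Cycle detected: component {cpn_id} has already run")
    visited.add(cpn_id)
    # ...
```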
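Problem 2: streamed output is never consumed
Cause: the component's invoke yields (it is an async generator), but the caller never iterates it. Calling an async generator only creates the generator object; its body, including any set_output call, never runs.
Solution: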
```python
# Always drain the async generator with async for
async for _ in cpn_obj.invoke(**kwargs):
    pass  # required: without iteration the component's body never runs
```
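This failure mode is easy to reproduce: calling an async-generator method only creates the generator object, and none of its body runs until something iterates it. Recorder is a hypothetical component used to show this.

```python
import asyncio


class Recorder:
    def __init__(self) -> None:
        self.output = None

    async def invoke(self):
        self.output = "computed"   # runs only once iteration starts
        yield "chunk"


async def main():
    skipped = Recorder()
    skipped.invoke()               # creates an async generator object; its body does not run
    drained = Recorder()
    async for _ in drained.invoke():
        pass
    return skipped.output, drained.output


print(asyncio.run(main()))  # → (None, 'computed')
```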
10.3 Tool Calls
Problem 1: tool name collision
Cause: several tools share the same name, so calls are routed to the wrong tool.
Solution:
```python
# Already implemented: indexed tool names (agent_with_tools.py:92)
indexed_name = f"{original_name}_{idx}"
# e.g. search_web_0, search_web_1
```
Problem 2: MCP tools fail to load
Cause: the MCP server is not running, or it is misconfigured.
Solution:
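```bash
# Check that the MCP server is reachable from the RAGFlow container
# (container name, host, port and /health path are placeholders; whether a
#  health endpoint exists depends on the MCP server)
docker exec ragflow-container curl http://mcp-server:port/health
# Inspect the MCP configuration (the config path depends on the deployment)
docker exec ragflow-container cat /ragflow/conf/mcp_config.yaml
```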
11. Further Reading and References
11.1 Official Documentation
- OpenAI Function Calling: https://platform.openai.com/docs/guides/function-calling
- LangChain Agents: https://python.langchain.com/docs/modules/agents/
- MCP (Model Context Protocol): https://modelcontextprotocol.io/

11.2 Related Source Files
- agent/component/__init__.py
- agent/tools/__init__.py
- api/apps/canvas_app.py
- api/db/services/canvas_service.py
- rag/llm/chat_model.py
夜雨聆风