Datus-Agent 深度分析文档 – 模块 1:Agent 核心架构
本文档深入分析 Datus-Agent 的 Agent 核心架构,包括 Node 系统设计、Workflow 引擎、Planning & Reflection 机制。
目录
-
架构概览 -
Node 系统设计 -
Workflow 引擎 -
AgenticNode 架构 -
Planning & Reflection 机制 -
核心调用流程
架构概览
整体架构图
┌─────────────────────────────────────────────────────────────────────────┐│ Datus Agent Core │├─────────────────────────────────────────────────────────────────────────┤│ ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │ Entry Point │────▶│ Workflow │────▶│ Node │ ││ │ (Agent) │ │ Runner │ │ Executor │ ││ └──────────────┘ └──────────────┘ └──────────────┘ ││ │ │ │ ││ ▼ ▼ ▼ ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │ Config │ │ Context │ │ Tools │ ││ │ Manager │ │ Manager │ │ Registry │ ││ └──────────────┘ └──────────────┘ └──────────────┘ ││ │└─────────────────────────────────────────────────────────────────────────┘ │ ┌────────────────────┼────────────────────┐ ▼ ▼ ▼┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐│ Storage Layer │ │ Model Layer │ │ Tool Layer ││ - LanceDB │ │ - LLMBaseModel │ │ - DB Tools ││ - Schema RAG │ │ - Multi-Provider│ │ - MCP Tools ││ - Metric Store │ │ - Session Mgmt │ │ - Skill Tools │└─────────────────┘ └─────────────────┘ └─────────────────┘
核心组件职责
|
|
|
|
|---|---|---|
| Agent | datus/agent/agent.py |
|
| Workflow | datus/agent/workflow.py |
|
| Node | datus/agent/node/node.py |
|
| AgenticNode | datus/agent/node/agentic_node.py |
|
| WorkflowRunner | datus/agent/workflow_runner.py |
|
Node 系统设计
设计哲学
Datus-Agent 的 Node 系统采用命令模式 + 工厂模式的组合设计:
- 命令模式
:每个 Node 封装一个独立的执行逻辑 - 工厂模式
:通过 Node.new_instance()统一创建不同节点类型 - 模板方法模式
:基类定义执行框架,子类实现具体逻辑
Node 类层次结构
┌─────────────────────────────────────────────────────────────┐│ Node (ABC) ││ ───────────────────────────────────────────────────────── ││ + id: str ││ + description: str ││ + type: str ││ + input: BaseInput ││ + result: BaseResult ││ + status: str (pending/running/completed/failed) ││ + dependencies: List[str] ││ ───────────────────────────────────────────────────────── ││ + run(): BaseResult ││ + execute(): BaseResult (abstract) ││ + execute_stream(): AsyncGenerator (abstract) ││ + update_context(): Dict (abstract) ││ + setup_input(): Dict (abstract) ││ ───────────────────────────────────────────────────────── │└─────────────────────────────────────────────────────────────┘ │ ┌───────────────┴───────────────┐ │ │ ▼ ▼┌─────────────────────────┐ ┌─────────────────────────┐│ Standard Node │ │ AgenticNode ││ (传统节点,直接执行) │ │ (基于 OpenAI Agents SDK)││ ───────────────────── │ │ ───────────────────── ││ • SchemaLinkingNode │ │ • ChatAgenticNode ││ • GenerateSQLNode │ │ • GenSQLAgenticNode ││ • ExecuteSQLNode │ │ • GenReportAgenticNode ││ • ReasonSQLNode │ │ • ExploreAgenticNode ││ • DocSearchNode │ │ • GenMetricsAgenticNode││ • OutputNode │ │ • ... ││ • FixNode │ │ ││ • ReflectNode │ │ ││ • HitlNode │ │ ││ • BeginNode │ │ ││ • SearchMetricsNode │ │ ││ • ParallelNode │ │ ││ • SelectionNode │ │ ││ • SubworkflowNode │ │ ││ • CompareNode │ │ ││ • DateParserNode │ │ │└─────────────────────────┘ └─────────────────────────┘
Node 工厂模式实现
# datus/agent/node/node.pyclassNode(ABC): @classmethoddefnew_instance( cls, node_id: str, description: str, node_type: str, input_data: BaseInput = None, agent_config: Optional[AgentConfig] = None, tools: Optional[List[Tool]] = None, node_name: Optional[str] = None,):"""工厂方法:根据 node_type 创建具体的 Node 实例"""from datus.agent.node import ( BeginNode, ChatAgenticNode, CompareNode, DateParserNode, DocSearchNode, ExecuteSQLNode, FixNode, GenerateSQLNode, GenSQLAgenticNode, HitlNode, OutputNode, ParallelNode, ReasonSQLNode, ReflectNode, SchemaLinkingNode, SearchMetricsNode, SelectionNode, SubworkflowNode, )if node_type == NodeType.TYPE_SCHEMA_LINKING:return SchemaLinkingNode(node_id, description, node_type, input_data, agent_config)elif node_type == NodeType.TYPE_GENERATE_SQL:return GenerateSQLNode(node_id, description, node_type, input_data, agent_config, tools)elif node_type == NodeType.TYPE_EXECUTE_SQL:return ExecuteSQLNode(node_id, description, node_type, input_data, agent_config, tools)# ... 更多节点类型elif node_type == NodeType.TYPE_CHAT:return ChatAgenticNode(node_id, description, node_type, input_data, agent_config, tools)elif node_type == NodeType.TYPE_GENSQL:return GenSQLAgenticNode( node_id, description, node_type, input_data, agent_config, tools, node_name )else:raise ValueError(f"Invalid node type: {node_type}")
Node 执行生命周期
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐│ pending │────▶│ running │────▶│ completed │ │ failed │└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ ▲ ▲ │ │ │ ▼ │ │ ┌─────────────┐ │ │ │ executing │───────────┘ │ └─────────────┘ │ │ ┌─────────────────────────────────────────────┘ │ (exception caught) ▼
核心 Node 类型详解
1. SchemaLinkingNode(模式链接节点)
职责:将用户自然语言查询映射到数据库 schema
classSchemaLinkingNode(Node):defexecute(self) -> BaseResult:"""执行 schema 链接,识别相关表和列"""# 1. 从上下文中获取用户查询 user_query = self.input.query# 2. 使用 LLM 识别相关 schema schema_context = self._retrieve_schema_context(user_query)# 3. 生成 schema 链接结果return SchemaLinkingResult( success=True, tables=identified_tables, columns=identified_columns, )
2. GenerateSQLNode(SQL 生成节点)
职责:根据 schema 和用户查询生成 SQL
classGenerateSQLNode(Node):defexecute(self) -> BaseResult:"""执行 SQL 生成"""# 1. 获取 schema 链接结果 schema_context = self.workflow.context.schema_linking_result# 2. 构建 prompt prompt = self._build_sql_generation_prompt( user_query=self.input.query, schema=schema_context, )# 3. 调用 LLM 生成 SQL response = self.model.generate(prompt)return GenerateSQLResult( success=True, sql=response.sql, reasoning=response.reasoning, )
3. ExecuteSQLNode(SQL 执行节点)
职责:执行生成的 SQL 并返回结果
classExecuteSQLNode(Node):defexecute(self) -> BaseResult:"""执行 SQL 查询"""# 1. 获取待执行的 SQL sql = self.input.sql# 2. 通过连接器执行 connector = self._sql_connector(self.input.database_name) result_df = connector.execute_sql(sql)return ExecuteSQLResult( success=True, data=result_df, row_count=len(result_df), )
4. ReflectNode(反思节点)
职责:分析执行结果,提出改进建议
classReflectNode(Node):defexecute(self) -> BaseResult:"""执行反思,分析工作流执行结果"""# 1. 收集所有前置节点的结果 node_results = self._collect_node_results()# 2. 分析成功/失败原因 analysis = self.model.generate( prompt=self._build_reflection_prompt(node_results) )# 3. 生成改进建议return ReflectionResult( success=True, suggestions=analysis.suggestions, workflow_adjustments=analysis.adjustments, )
Workflow 引擎
Workflow 核心数据结构
classWorkflow:"""工作流表示和管理器"""def__init__( self, name: str, task: Optional[SqlTask] = None, agent_config: Optional[AgentConfig] = None,): self.name = name # 工作流名称 self.task = task # 任务描述 self.nodes = {} # node_id -> Node 映射 self.node_order = [] # 执行顺序 self.current_node_index = 0# 当前执行位置 self.status = "pending"# 状态 self.context = Context() # 共享上下文 self.reflection_round = 0# 反思轮次
Workflow 状态机
┌─────────────┐ │ pending │ └──────┬──────┘ │ start() ▼ ┌─────────────┐ ┌────▶│ running │────┐ │ └──────┬──────┘ │ │ │ │ pause()│ complete() │ fail() │ │ │ ▼ │ ▼ ┌────────────┐ │ ┌────────────┐ │ paused │──────┘ │ failed │ └─────┬──────┘ └────────────┘ │ resume() │ └─────────────────────────────┐ │ ┌─────────────┐ │ │ completed │◀───────┘ └─────────────┘
Workflow 执行流程
defexecute_workflow(workflow: Workflow):"""执行工作流的核心逻辑""" workflow.status = "running"whilenot workflow.is_complete(): node = workflow.get_current_node()if node isNone:break# 检查依赖ifnot _check_dependencies(node, workflow): workflow.status = "failed"break# 执行节点try: result = node.run()if result.success: node.complete(result)# 更新上下文 node.update_context(workflow)else: node.fail(result.error) workflow.status = "failed"breakexcept Exception as e: node.fail(str(e)) workflow.status = "failed"break# 移动到下一个节点 workflow.advance_to_next_node() workflow.status = "completed"return workflow.get_final_result()
上下文管理
classContext:"""工作流共享上下文"""def__init__(self): self.sql_contexts: List[SQLContext] = [] # SQL 执行历史 self.schema_linking_results: List = [] # Schema 链接结果 self.user_messages: List[str] = [] # 用户消息历史 self.assistant_messages: List[str] = [] # 助手响应历史 self.metadata: Dict = {} # 元数据defadd_sql_context(self, sql: str, result: DataFrame, success: bool):"""添加 SQL 执行上下文""" self.sql_contexts.append( SQLContext(sql=sql, result=result, success=success) )defget_last_sql_context(self) -> SQLContext:"""获取最近的 SQL 上下文"""iflen(self.sql_contexts) > 0:return self.sql_contexts[-1]raise DatusException(ErrorCode.NODE_NO_SQL_CONTEXT)
AgenticNode 架构
什么是 AgenticNode?
AgenticNode 是 Datus-Agent 引入的新一代节点架构,基于 OpenAI Agents SDK 构建,支持:
- 会话管理
:基于 SQLite 的持久化会话 - 流式交互
:实时输出工具调用和响应 - 工具集成
:原生支持 MCP 工具和函数工具 - 自动压缩
:会话 token 接近限制时自动压缩历史 - 权限控制
:统一的工具/MCP/技能权限管理
AgenticNode 类层次
┌─────────────────────────────────────────────────────────────┐│ AgenticNode ││ ───────────────────────────────────────────────────────── ││ + session: AdvancedSQLiteSession ││ + session_id: str ││ + mcp_servers: Dict[str, MCPServerStdio] ││ + actions: List[ActionHistory] ││ + permission_manager: PermissionManager ││ + skill_manager: SkillManager ││ + action_bus: ActionBus ││ ───────────────────────────────────────────────────────── ││ + _get_or_create_session(): (session, summary) ││ + _manual_compact(): dict ││ + _auto_compact(): bool ││ + execute_stream(): AsyncGenerator[ActionHistory] ││ + setup_tools(): List[Tool] ││ ───────────────────────────────────────────────────────── │└─────────────────────────────────────────────────────────────┘ │ ┌────────────────────┼────────────────────┐ │ │ │ ▼ ▼ ▼┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐│ ChatAgenticNode │ │ GenSQLAgenticNode│ │GenReportAgenticNode││ (聊天节点) │ │ (SQL 生成节点) │ │ (报告生成节点) │└─────────────────┘ └─────────────────┘ └─────────────────┘
会话管理机制
classAgenticNode(Node):def_get_or_create_session(self) -> tuple[AdvancedSQLiteSession, Optional[str]]:"""获取或创建会话 Returns: (session, summary) - summary 是之前压缩的对话摘要 """ summary = Noneif self._session isNone:if self.session_id isNone: self.session_id = self._generate_session_id()if self.ephemeral:# 临时会话(用于子 agent)- 不持久化 self._session = AdvancedSQLiteSession( session_id=self.session_id, db_path=":memory:", create_tables=True, )else:# 持久化会话 self._session = self.model.create_session(self.session_id)# 如果有之前的压缩摘要,返回用于恢复上下文if self.last_summary: summary = self.last_summary self.last_summary = Nonereturn self._session, summary
自动压缩机制
asyncdef_auto_compact(self) -> bool:"""当会话接近 token 限制 (~90%) 时自动压缩"""ifnot self.model ornot self.context_length:returnFalse current_tokens = await self._count_session_tokens()if current_tokens > (self.context_length * 0.9): logger.info(f"触发自动压缩:{current_tokens}/{self.context_length} tokens") result = await self._manual_compact()return result.get("success", False)returnFalseasyncdef_manual_compact(self) -> dict:"""手动压缩会话,生成对话摘要"""# 1. 使用 LLM 生成摘要 summarization_prompt = ("Summarize our conversation up to this point. ""The summary should be a concise yet comprehensive overview..." ) result = await self.model.generate_with_tools( prompt=summarization_prompt, session=self._session, max_turns=1, temperature=0.3, max_tokens=2000 )# 2. 存储摘要供下次会话使用 self.last_summary = result.get("content", "")# 3. 删除旧会话 self.model.delete_session(self.session_id) self.session_id = None self._session = Nonereturn {"success": True, "summary": self.last_summary}
工具调用流式处理
async def execute_stream( self, action_history_manager: Optional[ActionHistoryManager] = None) -> AsyncGenerator[ActionHistory, None]:"""流式执行,实时输出工具调用和响应""" session, conversation_summary = self._get_or_create_session()# 构建系统 prompt system_prompt = self._get_system_prompt(conversation_summary)# 设置工具 tools = self.setup_tools()# 流式执行async for action in self.model.run_stream( system_prompt=system_prompt, prompt=self.input.query, session=session, tools=tools, mcp_servers=self.mcp_servers, ):# 通过 action_bus 合并子动作async for merged_action in self.action_bus.process(action):yield merged_action# 记录到历史 self.actions.append(merged_action)if action_history_manager: action_history_manager.add_action(merged_action)# 检查是否需要压缩await self._auto_compact()
权限管理系统
classAgenticNode(Node):def_setup_permission_manager(self) -> None:"""初始化统一的权限管理器"""ifnot self.agent_config ornothasattr(self.agent_config, "permissions_config"):return permissions_config = self.agent_config.permissions_config# 获取节点特定的权限覆盖 node_permissions = self.node_config.get("permissions", {}) self.permission_manager = PermissionManager( global_config=permissions_config, node_overrides={self.get_node_name(): node_permissions} if node_permissions else {}, )
技能系统集成
classAgenticNode(Node):def_setup_skill_manager(self) -> None:"""初始化技能管理器"""ifnot self.agent_config ornothasattr(self.agent_config, "skills_config"):return skills_config = self.agent_config.skills_config self.skill_manager = SkillManager( config=skills_config, permission_manager=self.permission_manager, )def_setup_skill_func_tools(self) -> None:"""设置技能函数工具""" skill_patterns_str = self.node_config.get("skills")ifnot skill_patterns_str:return self.skill_func_tool = SkillFuncTool( manager=self.skill_manager, node_name=self.get_node_name(), )
Planning & Reflection 机制
反思工作流
Datus-Agent 实现了**自反思(Self-Reflection)**机制,通过迭代优化提高 SQL 生成准确率:
┌─────────────────────────────────────────────────────────────────┐│ Reflection Workflow │└─────────────────────────────────────────────────────────────────┘Round 1:┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐│ Begin │───▶│ Schema │───▶│ Generate │───▶│ Execute ││ Node │ │ Linking │ │ SQL │ │ SQL │└──────────┘ └──────────┘ └──────────┘ └──────────┘ │ ▼Round 2: ┌──────────┐┌──────────┐ ┌──────────┐ │ Reflect ││ Fix │───▶│ Generate │◀─────────│ Node ││ Node │ │ SQL │ │ (分析) │└──────────┘ └──────────┘ └──────────┘
ReflectNode 实现
classReflectNode(Node):defexecute(self) -> BaseResult:"""执行反思分析"""# 1. 收集所有节点执行结果 node_results = []for node_id in self.workflow.node_order[:self.workflow.current_node_index]: node = self.workflow.get_node(node_id) node_results.append({"type": node.type,"status": node.status,"result": node.result, })# 2. 构建反思 prompt prompt = f""" Analyze the workflow execution: Task: {self.workflow.get_task()} Execution Results:{self._format_results(node_results)} Please identify: 1. What went well? 2. What went wrong? 3. How to improve? """# 3. 调用 LLM 生成反思 response = self.model.generate(prompt)# 4. 解析反思结果return ReflectionResult( success=True, analysis=response.analysis, suggestions=response.suggestions, workflow_adjustments=response.adjustments, )
FixNode 实现
classFixNode(Node):defexecute(self) -> BaseResult:"""根据反思结果修复 SQL"""# 1. 获取反思建议 reflection = self.workflow.get_last_node_by_type(NodeType.TYPE_REFLECT) suggestions = reflection.result.suggestions# 2. 获取之前的 SQL 和错误 last_sql = self.workflow.get_last_sql_context().sql error_message = self.workflow.get_last_sql_context().error# 3. 构建修复 prompt prompt = f""" Fix the SQL based on reflection suggestions: Original SQL: {last_sql} Error: {error_message} Suggestions: {suggestions} Generate corrected SQL. """# 4. 生成修复后的 SQL response = self.model.generate(prompt)return FixInput( success=True, fixed_sql=response.sql, reasoning=response.reasoning, )
工作流调整
defadjust_nodes(self, suggestions: List[Dict]):"""根据反思建议调整工作流"""for suggestion in suggestions: action = suggestion.get("action") node_id = suggestion.get("node_id")if action == "add"and"node"in suggestion:# 添加新节点 node_data = suggestion["node"] node = Node.new_instance( node_id=node_data.get("id"), description=node_data.get("description"), node_type=node_data.get("type"), agent_config=self.global_config, ) self.add_node(node, suggestion.get("position"))elif action == "remove"and node_id:# 移除节点 self.remove_node(node_id)elif action == "modify"and node_id:# 修改节点 node = self.get_node(node_id)if node: modifications = suggestion["modifications"]if"description"in modifications: node.description = modifications["description"]
核心调用流程
完整调用链路
用户输入 │ ▼┌─────────────────┐│ DatusCLI.run() │ (cli/repl.py)└────────┬────────┘ │ ▼┌─────────────────┐│ _execute_chat_ │ 解析命令类型│ command() │└────────┬────────┘ │ ▼┌─────────────────┐│ ChatCommands. │ 创建/恢复 ChatAgenticNode│ cmd_chat() │└────────┬────────┘ │ ▼┌─────────────────┐│ AgenticNode. │ 流式执行│ execute_stream │└────────┬────────┘ │ ├─────────────────┐ │ │ ▼ ▼┌─────────────────┐ ┌─────────────────┐│ setup_tools() │ │ _get_system_ ││ 准备工具列表 │ │ prompt() ││ │ │ 构建系统提示 │└─────────────────┘ └────────┬────────┘ │ ▼ ┌─────────────────┐ │ _get_or_create_ │ │ session() │ │ 获取/创建会话 │ └────────┬────────┘ │ ▼ ┌─────────────────┐ │ model.run_ │ │ stream() │ │ 流式调用 LLM │ └────────┬────────┘ │ ┌──────────────┼──────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Tool │ │ Tool │ │ Final │ │ Call │ │ Result │ │ Output │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ └─────────────┴─────────────┘ │ ▼ ┌─────────────────┐ │ ActionHistory │ │ 记录到历史 │ └─────────────────┘
关键代码位置索引
|
|
|
|
|---|---|---|
|
|
datus/cli/main.py |
main()
Application.run() |
|
|
datus/cli/repl.py |
DatusCLI.run()
_execute_chat_command() |
|
|
datus/cli/chat_commands.py |
ChatCommands.cmd_chat() |
|
|
datus/agent/node/agentic_node.py |
AgenticNode.execute_stream() |
|
|
datus/agent/node/agentic_node.py |
_get_or_create_session() |
|
|
|
setup_tools() |
|
|
datus/agent/node/agentic_node.py |
_get_system_prompt() |
|
|
datus/models/base.py |
LLMBaseModel.run_stream() |
总结
Datus-Agent 的 Agent 核心架构体现了以下设计原则:
- 模块化
:Node 系统高度模块化,每个节点职责单一 - 可扩展
:工厂模式支持快速添加新节点类型 - 流式交互
:AgenticNode 支持实时输出,提升用户体验 - 自反思
:Reflection 机制实现持续改进 - 会话管理
:基于 SQLite 的持久化会话,支持跨轮次对话 - 权限控制
:统一的权限管理系统,保障安全性
下一模块:CLI 交互系统设计
夜雨聆风