乐于分享
好东西不私藏

Datus-Agent 深度分析文档 – 模块 1:Agent 核心架构

Datus-Agent 深度分析文档 – 模块 1:Agent 核心架构

本文档深入分析 Datus-Agent 的 Agent 核心架构,包括 Node 系统设计、Workflow 引擎、Planning & Reflection 机制。


目录

  1. 架构概览
  2. Node 系统设计
  3. Workflow 引擎
  4. AgenticNode 架构
  5. Planning & Reflection 机制
  6. 核心调用流程

架构概览

整体架构图

┌─────────────────────────────────────────────────────────────────────────┐│                           Datus Agent Core                               │├─────────────────────────────────────────────────────────────────────────┤│                                                                          ││  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐            ││  │  Entry Point │────▶│   Workflow   │────▶│    Node      │            ││  │   (Agent)    │     │    Runner    │     │  Executor    │            ││  └──────────────┘     └──────────────┘     └──────────────┘            ││         │                   │                   │                        ││         ▼                   ▼                   ▼                        ││  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐            ││  │    Config    │     │   Context    │     │    Tools     │            ││  │   Manager    │     │   Manager    │     │   Registry   │            ││  └──────────────┘     └──────────────┘     └──────────────┘            ││                                                                          │└─────────────────────────────────────────────────────────────────────────┘                              │         ┌────────────────────┼────────────────────┐         ▼                    ▼                    ▼┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐│  Storage Layer  │  │   Model Layer   │  │   Tool Layer    ││  - LanceDB      │  │  - LLMBaseModel │  │  - DB Tools     ││  - Schema RAG   │  │  - Multi-Provider│  │  - MCP Tools    ││  - Metric Store │  │  - Session Mgmt │  │  - Skill Tools  │└─────────────────┘  └─────────────────┘  └─────────────────┘

核心组件职责

组件
文件位置
核心职责
Agent datus/agent/agent.py
入口点,负责初始化、配置管理、工作流调度
Workflow datus/agent/workflow.py
工作流编排,Node 调度,状态管理
Node datus/agent/node/node.py
执行单元抽象,支持 20+ 节点类型
AgenticNode datus/agent/node/agentic_node.py
基于 OpenAI Agents SDK 的智能节点基类
WorkflowRunner datus/agent/workflow_runner.py
工作流执行器,支持同步/异步/流式执行

Node 系统设计

设计哲学

Datus-Agent 的 Node 系统采用命令模式 + 工厂模式的组合设计:

  1. 命令模式
    :每个 Node 封装一个独立的执行逻辑
  2. 工厂模式
    :通过 Node.new_instance() 统一创建不同节点类型
  3. 模板方法模式
    :基类定义执行框架,子类实现具体逻辑

Node 类层次结构

┌─────────────────────────────────────────────────────────────┐│                         Node (ABC)                          ││  ─────────────────────────────────────────────────────────  ││  + id: str                                                  ││  + description: str                                         ││  + type: str                                                ││  + input: BaseInput                                         ││  + result: BaseResult                                       ││  + status: str (pending/running/completed/failed)           ││  + dependencies: List[str]                                  ││  ─────────────────────────────────────────────────────────  ││  + run(): BaseResult                                        ││  + execute(): BaseResult (abstract)                         ││  + execute_stream(): AsyncGenerator (abstract)              ││  + update_context(): Dict (abstract)                        ││  + setup_input(): Dict (abstract)                           ││  ─────────────────────────────────────────────────────────  │└─────────────────────────────────────────────────────────────┘                              │              ┌───────────────┴───────────────┐              │                               │              ▼                               ▼┌─────────────────────────┐      ┌─────────────────────────┐│      Standard Node      │      │      AgenticNode        ││  (传统节点,直接执行)    │      │  (基于 OpenAI Agents SDK)││  ─────────────────────  │      │  ─────────────────────  ││  • SchemaLinkingNode    │      │  • ChatAgenticNode      ││  • GenerateSQLNode      │      │  • GenSQLAgenticNode    ││  • ExecuteSQLNode       │      │  • GenReportAgenticNode ││  • ReasonSQLNode        │      │  • ExploreAgenticNode   ││  • DocSearchNode        │      │  • GenMetricsAgenticNode││  • OutputNode           │      │  • ...                  ││  • FixNode              │      │                         ││  • ReflectNode          │      │                         ││  • HitlNode             │      │                         ││  • BeginNode            │      │                         ││  • SearchMetricsNode    │      │                         ││  • ParallelNode         │      │                         ││  • SelectionNode        │      │                         ││  • SubworkflowNode      │      │                         ││  • CompareNode          │      │                         ││  • DateParserNode       │      │                         │└─────────────────────────┘      └─────────────────────────┘

Node 工厂模式实现

# datus/agent/node/node.pyclassNode(ABC):    @classmethoddefnew_instance(        cls,        node_id: str,        description: str,        node_type: str,        input_data: BaseInput = None,        agent_config: Optional[AgentConfig] = None,        tools: Optional[List[Tool]] = None,        node_name: Optional[str] = None,):"""工厂方法:根据 node_type 创建具体的 Node 实例"""from datus.agent.node import (            BeginNode, ChatAgenticNode, CompareNode, DateParserNode,            DocSearchNode, ExecuteSQLNode, FixNode, GenerateSQLNode,            GenSQLAgenticNode, HitlNode, OutputNode, ParallelNode,            ReasonSQLNode, ReflectNode, SchemaLinkingNode,            SearchMetricsNode, SelectionNode, SubworkflowNode,        )if node_type == NodeType.TYPE_SCHEMA_LINKING:return SchemaLinkingNode(node_id, description, node_type, input_data, agent_config)elif node_type == NodeType.TYPE_GENERATE_SQL:return GenerateSQLNode(node_id, description, node_type, input_data, agent_config, tools)elif node_type == NodeType.TYPE_EXECUTE_SQL:return ExecuteSQLNode(node_id, description, node_type, input_data, agent_config, tools)# ... 更多节点类型elif node_type == NodeType.TYPE_CHAT:return ChatAgenticNode(node_id, description, node_type, input_data, agent_config, tools)elif node_type == NodeType.TYPE_GENSQL:return GenSQLAgenticNode(                node_id, description, node_type, input_data, agent_config, tools, node_name            )else:raise ValueError(f"Invalid node type: {node_type}")

Node 执行生命周期

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐│   pending   │────▶│   running   │────▶│  completed  │     │   failed    │└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘                          │                   ▲                   ▲                          │                   │                   │                          ▼                   │                   │                    ┌─────────────┐           │                   │                    │  executing  │───────────┘                   │                    └─────────────┘                               │                                                                  │                    ┌─────────────────────────────────────────────┘                    │ (exception caught)                    ▼

核心 Node 类型详解

1. SchemaLinkingNode(模式链接节点)

职责:将用户自然语言查询映射到数据库 schema

classSchemaLinkingNode(Node):defexecute(self) -> BaseResult:"""执行 schema 链接,识别相关表和列"""# 1. 从上下文中获取用户查询        user_query = self.input.query# 2. 使用 LLM 识别相关 schema        schema_context = self._retrieve_schema_context(user_query)# 3. 生成 schema 链接结果return SchemaLinkingResult(            success=True,            tables=identified_tables,            columns=identified_columns,        )

2. GenerateSQLNode(SQL 生成节点)

职责:根据 schema 和用户查询生成 SQL

classGenerateSQLNode(Node):defexecute(self) -> BaseResult:"""执行 SQL 生成"""# 1. 获取 schema 链接结果        schema_context = self.workflow.context.schema_linking_result# 2. 构建 prompt        prompt = self._build_sql_generation_prompt(            user_query=self.input.query,            schema=schema_context,        )# 3. 调用 LLM 生成 SQL        response = self.model.generate(prompt)return GenerateSQLResult(            success=True,            sql=response.sql,            reasoning=response.reasoning,        )

3. ExecuteSQLNode(SQL 执行节点)

职责:执行生成的 SQL 并返回结果

classExecuteSQLNode(Node):defexecute(self) -> BaseResult:"""执行 SQL 查询"""# 1. 获取待执行的 SQL        sql = self.input.sql# 2. 通过连接器执行        connector = self._sql_connector(self.input.database_name)        result_df = connector.execute_sql(sql)return ExecuteSQLResult(            success=True,            data=result_df,            row_count=len(result_df),        )

4. ReflectNode(反思节点)

职责:分析执行结果,提出改进建议

classReflectNode(Node):defexecute(self) -> BaseResult:"""执行反思,分析工作流执行结果"""# 1. 收集所有前置节点的结果        node_results = self._collect_node_results()# 2. 分析成功/失败原因        analysis = self.model.generate(            prompt=self._build_reflection_prompt(node_results)        )# 3. 生成改进建议return ReflectionResult(            success=True,            suggestions=analysis.suggestions,            workflow_adjustments=analysis.adjustments,        )

Workflow 引擎

Workflow 核心数据结构

classWorkflow:"""工作流表示和管理器"""def__init__(        self,        name: str,        task: Optional[SqlTask] = None,        agent_config: Optional[AgentConfig] = None,):        self.name = name                          # 工作流名称        self.task = task                          # 任务描述        self.nodes = {}                           # node_id -> Node 映射        self.node_order = []                      # 执行顺序        self.current_node_index = 0# 当前执行位置        self.status = "pending"# 状态        self.context = Context()                  # 共享上下文        self.reflection_round = 0# 反思轮次

Workflow 状态机

                    ┌─────────────┐                    │   pending   │                    └──────┬──────┘                           │ start()                           ▼                    ┌─────────────┐              ┌────▶│   running   │────┐              │     └──────┬──────┘    │              │            │           │        pause()│      complete()      │ fail()              │            │           │              ▼            │           ▼       ┌────────────┐      │    ┌────────────┐       │   paused   │──────┘    │   failed   │       └─────┬──────┘           └────────────┘             │ resume()             │             └─────────────────────────────┐                                           │                    ┌─────────────┐        │                    │  completed  │◀───────┘                    └─────────────┘

Workflow 执行流程

defexecute_workflow(workflow: Workflow):"""执行工作流的核心逻辑"""    workflow.status = "running"whilenot workflow.is_complete():        node = workflow.get_current_node()if node isNone:break# 检查依赖ifnot _check_dependencies(node, workflow):            workflow.status = "failed"break# 执行节点try:            result = node.run()if result.success:                node.complete(result)# 更新上下文                node.update_context(workflow)else:                node.fail(result.error)                workflow.status = "failed"breakexcept Exception as e:            node.fail(str(e))            workflow.status = "failed"break# 移动到下一个节点        workflow.advance_to_next_node()    workflow.status = "completed"return workflow.get_final_result()

上下文管理

classContext:"""工作流共享上下文"""def__init__(self):        self.sql_contexts: List[SQLContext] = []      # SQL 执行历史        self.schema_linking_results: List = []        # Schema 链接结果        self.user_messages: List[str] = []            # 用户消息历史        self.assistant_messages: List[str] = []       # 助手响应历史        self.metadata: Dict = {}                      # 元数据defadd_sql_context(self, sql: str, result: DataFrame, success: bool):"""添加 SQL 执行上下文"""        self.sql_contexts.append(            SQLContext(sql=sql, result=result, success=success)        )defget_last_sql_context(self) -> SQLContext:"""获取最近的 SQL 上下文"""iflen(self.sql_contexts) > 0:return self.sql_contexts[-1]raise DatusException(ErrorCode.NODE_NO_SQL_CONTEXT)

AgenticNode 架构

什么是 AgenticNode?

AgenticNode 是 Datus-Agent 引入的新一代节点架构,基于 OpenAI Agents SDK 构建,支持:

  • 会话管理
    :基于 SQLite 的持久化会话
  • 流式交互
    :实时输出工具调用和响应
  • 工具集成
    :原生支持 MCP 工具和函数工具
  • 自动压缩
    :会话 token 接近限制时自动压缩历史
  • 权限控制
    :统一的工具/MCP/技能权限管理

AgenticNode 类层次

┌─────────────────────────────────────────────────────────────┐│                      AgenticNode                            ││  ─────────────────────────────────────────────────────────  ││  + session: AdvancedSQLiteSession                           ││  + session_id: str                                          ││  + mcp_servers: Dict[str, MCPServerStdio]                   ││  + actions: List[ActionHistory]                             ││  + permission_manager: PermissionManager                    ││  + skill_manager: SkillManager                              ││  + action_bus: ActionBus                                    ││  ─────────────────────────────────────────────────────────  ││  + _get_or_create_session(): (session, summary)             ││  + _manual_compact(): dict                                  ││  + _auto_compact(): bool                                    ││  + execute_stream(): AsyncGenerator[ActionHistory]          ││  + setup_tools(): List[Tool]                                ││  ─────────────────────────────────────────────────────────  │└─────────────────────────────────────────────────────────────┘                              │         ┌────────────────────┼────────────────────┐         │                    │                    │         ▼                    ▼                    ▼┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐│ ChatAgenticNode │  │ GenSQLAgenticNode│  │GenReportAgenticNode││  (聊天节点)      │  │  (SQL 生成节点)   │  │  (报告生成节点)    │└─────────────────┘  └─────────────────┘  └─────────────────┘

会话管理机制

classAgenticNode(Node):def_get_or_create_session(self) -> tuple[AdvancedSQLiteSession, Optional[str]]:"""获取或创建会话        Returns:            (session, summary) - summary 是之前压缩的对话摘要        """        summary = Noneif self._session isNone:if self.session_id isNone:                self.session_id = self._generate_session_id()if self.ephemeral:# 临时会话(用于子 agent)- 不持久化                self._session = AdvancedSQLiteSession(                    session_id=self.session_id,                    db_path=":memory:",                    create_tables=True,                )else:# 持久化会话                self._session = self.model.create_session(self.session_id)# 如果有之前的压缩摘要,返回用于恢复上下文if self.last_summary:                summary = self.last_summary                self.last_summary = Nonereturn self._session, summary

自动压缩机制

asyncdef_auto_compact(self) -> bool:"""当会话接近 token 限制 (~90%) 时自动压缩"""ifnot self.model ornot self.context_length:returnFalse    current_tokens = await self._count_session_tokens()if current_tokens > (self.context_length * 0.9):        logger.info(f"触发自动压缩:{current_tokens}/{self.context_length} tokens")        result = await self._manual_compact()return result.get("success"False)returnFalseasyncdef_manual_compact(self) -> dict:"""手动压缩会话,生成对话摘要"""# 1. 使用 LLM 生成摘要    summarization_prompt = ("Summarize our conversation up to this point. ""The summary should be a concise yet comprehensive overview..."    )    result = await self.model.generate_with_tools(        prompt=summarization_prompt,        session=self._session,        max_turns=1,        temperature=0.3,        max_tokens=2000    )# 2. 存储摘要供下次会话使用    self.last_summary = result.get("content""")# 3. 删除旧会话    self.model.delete_session(self.session_id)    self.session_id = None    self._session = Nonereturn {"success"True"summary": self.last_summary}

工具调用流式处理

async def execute_stream(    self, action_history_manager: Optional[ActionHistoryManager] = None) -> AsyncGenerator[ActionHistory, None]:"""流式执行,实时输出工具调用和响应"""    session, conversation_summary = self._get_or_create_session()# 构建系统 prompt    system_prompt = self._get_system_prompt(conversation_summary)# 设置工具    tools = self.setup_tools()# 流式执行async for action in self.model.run_stream(        system_prompt=system_prompt,        prompt=self.input.query,        session=session,        tools=tools,        mcp_servers=self.mcp_servers,    ):# 通过 action_bus 合并子动作async for merged_action in self.action_bus.process(action):yield merged_action# 记录到历史            self.actions.append(merged_action)if action_history_manager:                action_history_manager.add_action(merged_action)# 检查是否需要压缩await self._auto_compact()

权限管理系统

classAgenticNode(Node):def_setup_permission_manager(self) -> None:"""初始化统一的权限管理器"""ifnot self.agent_config ornothasattr(self.agent_config, "permissions_config"):return        permissions_config = self.agent_config.permissions_config# 获取节点特定的权限覆盖        node_permissions = self.node_config.get("permissions", {})        self.permission_manager = PermissionManager(            global_config=permissions_config,            node_overrides={self.get_node_name(): node_permissions} if node_permissions else {},        )

技能系统集成

classAgenticNode(Node):def_setup_skill_manager(self) -> None:"""初始化技能管理器"""ifnot self.agent_config ornothasattr(self.agent_config, "skills_config"):return        skills_config = self.agent_config.skills_config        self.skill_manager = SkillManager(            config=skills_config,            permission_manager=self.permission_manager,        )def_setup_skill_func_tools(self) -> None:"""设置技能函数工具"""        skill_patterns_str = self.node_config.get("skills")ifnot skill_patterns_str:return        self.skill_func_tool = SkillFuncTool(            manager=self.skill_manager,            node_name=self.get_node_name(),        )

Planning & Reflection 机制

反思工作流

Datus-Agent 实现了**自反思(Self-Reflection)**机制,通过迭代优化提高 SQL 生成准确率:

┌─────────────────────────────────────────────────────────────────┐│                    Reflection Workflow                           │└─────────────────────────────────────────────────────────────────┘Round 1:┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐│  Begin   │───▶│ Schema   │───▶│ Generate │───▶│ Execute  ││  Node    │    │ Linking  │    │   SQL    │    │   SQL    │└──────────┘    └──────────┘    └──────────┘    └──────────┘                                              │                                              ▼Round 2:                               ┌──────────┐┌──────────┐    ┌──────────┐          │  Reflect ││  Fix     │───▶│ Generate │◀─────────│   Node   ││  Node    │    │   SQL    │          │ (分析)   │└──────────┘    └──────────┘          └──────────┘

ReflectNode 实现

classReflectNode(Node):defexecute(self) -> BaseResult:"""执行反思分析"""# 1. 收集所有节点执行结果        node_results = []for node_id in self.workflow.node_order[:self.workflow.current_node_index]:            node = self.workflow.get_node(node_id)            node_results.append({"type": node.type,"status": node.status,"result": node.result,            })# 2. 构建反思 prompt        prompt = f"""        Analyze the workflow execution:        Task: {self.workflow.get_task()}        Execution Results:{self._format_results(node_results)}        Please identify:        1. What went well?        2. What went wrong?        3. How to improve?        """# 3. 调用 LLM 生成反思        response = self.model.generate(prompt)# 4. 解析反思结果return ReflectionResult(            success=True,            analysis=response.analysis,            suggestions=response.suggestions,            workflow_adjustments=response.adjustments,        )

FixNode 实现

classFixNode(Node):defexecute(self) -> BaseResult:"""根据反思结果修复 SQL"""# 1. 获取反思建议        reflection = self.workflow.get_last_node_by_type(NodeType.TYPE_REFLECT)        suggestions = reflection.result.suggestions# 2. 获取之前的 SQL 和错误        last_sql = self.workflow.get_last_sql_context().sql        error_message = self.workflow.get_last_sql_context().error# 3. 构建修复 prompt        prompt = f"""        Fix the SQL based on reflection suggestions:        Original SQL: {last_sql}        Error: {error_message}        Suggestions: {suggestions}        Generate corrected SQL.        """# 4. 生成修复后的 SQL        response = self.model.generate(prompt)return FixInput(            success=True,            fixed_sql=response.sql,            reasoning=response.reasoning,        )

工作流调整

defadjust_nodes(self, suggestions: List[Dict]):"""根据反思建议调整工作流"""for suggestion in suggestions:        action = suggestion.get("action")        node_id = suggestion.get("node_id")if action == "add"and"node"in suggestion:# 添加新节点            node_data = suggestion["node"]            node = Node.new_instance(                node_id=node_data.get("id"),                description=node_data.get("description"),                node_type=node_data.get("type"),                agent_config=self.global_config,            )            self.add_node(node, suggestion.get("position"))elif action == "remove"and node_id:# 移除节点            self.remove_node(node_id)elif action == "modify"and node_id:# 修改节点            node = self.get_node(node_id)if node:                modifications = suggestion["modifications"]if"description"in modifications:                    node.description = modifications["description"]

核心调用流程

完整调用链路

用户输入   │   ▼┌─────────────────┐│  DatusCLI.run() │  (cli/repl.py)└────────┬────────┘         │         ▼┌─────────────────┐│ _execute_chat_  │  解析命令类型│    command()    │└────────┬────────┘         │         ▼┌─────────────────┐│ ChatCommands.   │  创建/恢复 ChatAgenticNode│   cmd_chat()    │└────────┬────────┘         │         ▼┌─────────────────┐│ AgenticNode.    │  流式执行│  execute_stream │└────────┬────────┘         │         ├─────────────────┐         │                 │         ▼                 ▼┌─────────────────┐ ┌─────────────────┐│  setup_tools()  │ │ _get_system_    ││  准备工具列表   │ │   prompt()      ││                 │ │  构建系统提示   │└─────────────────┘ └────────┬────────┘                             │                             ▼                    ┌─────────────────┐                    │ _get_or_create_ │                    │    session()    │                    │  获取/创建会话  │                    └────────┬────────┘                             │                             ▼                    ┌─────────────────┐                    │  model.run_     │                    │    stream()     │                    │  流式调用 LLM   │                    └────────┬────────┘                             │              ┌──────────────┼──────────────┐              │              │              │              ▼              ▼              ▼       ┌──────────┐  ┌──────────┐  ┌──────────┐       │  Tool    │  │  Tool    │  │  Final   │       │  Call    │  │  Result  │  │  Output  │       └────┬─────┘  └────┬─────┘  └────┬─────┘            │             │             │            └─────────────┴─────────────┘                          │                          ▼                 ┌─────────────────┐                 │  ActionHistory  │                 │  记录到历史     │                 └─────────────────┘

关键代码位置索引

功能
文件路径
关键函数/类
CLI 入口
datus/cli/main.py main()

Application.run()
REPL 循环
datus/cli/repl.py DatusCLI.run()

_execute_chat_command()
聊天命令
datus/cli/chat_commands.py ChatCommands.cmd_chat()
AgenticNode
datus/agent/node/agentic_node.py AgenticNode.execute_stream()
会话管理
datus/agent/node/agentic_node.py _get_or_create_session()
工具设置
各 AgenticNode 子类
setup_tools()
系统 Prompt
datus/agent/node/agentic_node.py _get_system_prompt()
LLM 调用
datus/models/base.py LLMBaseModel.run_stream()

总结

Datus-Agent 的 Agent 核心架构体现了以下设计原则:

  1. 模块化
    :Node 系统高度模块化,每个节点职责单一
  2. 可扩展
    :工厂模式支持快速添加新节点类型
  3. 流式交互
    :AgenticNode 支持实时输出,提升用户体验
  4. 自反思
    :Reflection 机制实现持续改进
  5. 会话管理
    :基于 SQLite 的持久化会话,支持跨轮次对话
  6. 权限控制
    :统一的权限管理系统,保障安全性

下一模块:CLI 交互系统设计