乐于分享
好东西不私藏

Datus-Agent 深度分析文档 – 模块 2:CLI 交互系统设计

Datus-Agent 深度分析文档 – 模块 2:CLI 交互系统设计

Datus-Agent 深度分析文档 – 模块 2:CLI 交互系统设计

本文档深入分析 Datus-CLI 的交互系统设计,包括 REPL 架构、命令系统、Subagent Wizard、交互式 UI 组件。


目录

  1. CLI 架构概览
  2. REPL 引擎设计
  3. 命令系统设计
  4. Subagent Wizard
  5. 交互式 UI 组件
  6. 执行状态管理

CLI 架构概览

整体架构图

┌─────────────────────────────────────────────────────────────────────────┐│                            Datus-CLI                                     │├─────────────────────────────────────────────────────────────────────────┤│                                                                          ││  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐            ││  │  main.py     │────▶│   repl.py    │────▶│  Commands    │            ││  │  Entry Point │     │  REPL Loop   │     │  Handlers    │            ││  └──────────────┘     └──────────────┘     └──────────────┘            ││         │                   │                   │                        ││         ▼                   ▼                   ▼                        ││  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐            ││  │  Argument    │     │  Prompt      │     │  Action      │            ││  │  Parser      │     │  Session     │     │  Display     │            ││  └──────────────┘     └──────────────┘     └──────────────┘            ││                                                                          │└─────────────────────────────────────────────────────────────────────────┘                              │         ┌────────────────────┼────────────────────┐         ▼                    ▼                    ▼┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐│  Chat Commands  │  │  Agent Commands │  │  SubAgent       ││  - /chat        │  │  - !sl          │  │  Commands       ││  - /reason      │  │  - !sm          │  │  - .subagent    ││  - /fix         │  │  - !sd          │  │  - add/edit     │└─────────────────┘  └─────────────────┘  └─────────────────┘

核心组件职责

组件
文件位置
核心职责
main.py datus/cli/main.py
CLI 入口点,参数解析,模式选择
repl.py datus/cli/repl.py
REPL 主循环,提示符管理,命令分发
chat_commands.py datus/cli/chat_commands.py
聊天命令处理,Node 管理,流式输出
agent_commands.py datus/cli/agent_commands.py
Agent/Workflow/Node 命令
sub_agent_commands.py datus/cli/sub_agent_commands.py
Subagent 管理命令
sub_agent_wizard.py datus/cli/sub_agent_wizard.py
Subagent 创建向导(TUI)
execution_state.py datus/cli/execution_state.py
执行状态管理,中断处理

REPL 引擎设计

REPL 核心类结构

classDatusCLI:"""Main REPL for the Datus CLI application."""def__init__(self, args, interactive: bool = True):        self.args = args        self.interactive = interactive        self.console = Console(log_path=False)# Agent 管理        self.agent = None        self.agent_initializing = False        self.agent_ready = False        self._workflow_runner: WorkflowRunner | None = None# 配置管理        self.agent_config = load_agent_config(**vars(self.args))        self.configuration_manager = configuration_manager()# 历史记录        self.history = FileHistory(str(history_file))        self.session: PromptSession | None = None# 子 Agent 管理        self.available_subagents = set(SYS_SUB_AGENTS)# CLI 上下文        self.cli_context = CliContext(            current_db_name=getattr(args, "database"""),            current_catalog=getattr(args, "catalog"""),            current_schema=getattr(args, "schema"""),        )# 命令处理器        self.agent_commands = AgentCommands(self, self.cli_context)        self.chat_commands = ChatCommands(self)        self.context_commands = ContextCommands(self)        self.metadata_commands = MetadataCommands(self)        self.sub_agent_commands = SubAgentCommands(self)        self.bi_dashboard_commands = BiDashboardCommands(self)

REPL 主循环

defrun(self):"""Run the REPL loop."""    self._print_welcome()whileTrue:try:# 1. 获取动态提示符            prompt_text = self._get_prompt_text()# 2. 获取用户输入(支持预填充)            prefill = self._prefill_input or""            user_input_raw = self.session.prompt(                message=prompt_text,                default=prefill,            )# 3. 回显用户输入(语法高亮)            self._echo_user_input(prompt_text, user_input)# 4. 解析命令类型            cmd_type, cmd, args = self._parse_command(user_input)# 5. 根据类型执行命令if cmd_type == CommandType.SQL:                self._execute_sql(user_input)elif cmd_type == CommandType.TOOL:                self._execute_tool_command(cmd, args)elif cmd_type == CommandType.CONTEXT:                self._execute_context_command(cmd, args)elif cmd_type == CommandType.CHAT:                self._execute_chat_command(args, subagent_name=cmd)elif cmd_type == CommandType.INTERNAL:                self._execute_internal_command(cmd, args)except KeyboardInterrupt:continueexcept EOFError:return0

命令类型解析

classCommandType(Enum):"""Type of command entered by the user."""    SQL = "sql"# 普通 SQL 语句    TOOL = "tool"# !command (工具/工作流命令)    CONTEXT = "context"# @command (上下文探索命令)    CHAT = "chat"# /command (聊天命令)    INTERNAL = "internal"# .command (CLI 控制命令)    EXIT = "exit"# exit/quit 命令def_parse_command(self, user_input: str) -> Tuple[CommandType, strstr]:"""Parse user input into command type, command, and arguments."""    user_input = user_input.strip()# 检查退出命令if user_input.lower() in ["exit""quit"]:return CommandType.EXIT, """"# 检查命令前缀if user_input.startswith("!"):# Tool command: !command args        parts = user_input[1:].split(None1)return CommandType.TOOL, parts[0], parts[1iflen(parts) > 1else""elif user_input.startswith("@"):# Context command: @command args        parts = user_input[1:].split(None1)return CommandType.CONTEXT, parts[0], parts[1iflen(parts) > 1else""elif user_input.startswith("/"):# Chat command: /command args 或 /subagent_name args        parts = user_input[1:].split(None1)return CommandType.CHAT, parts[0], parts[1iflen(parts) > 1else""elif user_input.startswith("."):# Internal command: .command args        parts = user_input[1:].split(None1)return CommandType.INTERNAL, parts[0], parts[1iflen(parts) > 1else""else:# 默认为 SQL 语句return CommandType.SQL, "", user_input

Prompt Session 配置

def_init_prompt_session(self):"""初始化 prompt_toolkit session"""    self.session = PromptSession(        history=self.history,                    # 文件历史存储        auto_suggest=AutoSuggestFromHistory(),   # 历史建议        lexer=PygmentsLexer(CustomSqlLexer),     # SQL 语法高亮        completer=self.create_combined_completer(),  # 自动补全        multiline=True,                          # 多行输入        key_bindings=self._create_custom_key_bindings(),  # 自定义键绑定        enable_history_search=True,              # 历史搜索        search_ignore_case=True,                 # 忽略大小写        erase_when_done=True,                    # 完成后清空        style=merge_styles([            style_from_pygments_cls(CustomPygmentsStyle),            Style.from_dict({"prompt""ansigreen bold"}),        ]),        complete_while_typing=True,              # 输入时补全    )

自定义键绑定

def_create_custom_key_bindings(self):"""创建自定义键绑定"""    kb = KeyBindings()    @kb.add("tab")def_(event):"""Tab 键:触发补全"""        buffer = event.app.current_bufferif buffer.complete_state:            buffer.complete_next()  # 菜单已打开,选择下一项else:            buffer.start_completion(select_first=False)  # 打开菜单    @kb.add("s-tab")def_(event):"""Shift+Tab:切换 Plan 模式"""        self.plan_mode_active = not self.plan_mode_active        buffer = event.app.current_buffer        buffer.reset()  # 清空输入        event.app.exit()  # 退出当前 prompt,触发重新显示    @kb.add("enter")def_(event):"""Enter 键:应用补全或执行"""        buffer = event.app.current_bufferif buffer.complete_state:            comp = cs.current_completionif comp isnotNone:                buffer.apply_completion(comp)  # 应用高亮项else:                buffer.cancel_completion()  # 关闭菜单                buffer.validate_and_handle()  # 执行return        buffer.validate_and_handle()  # 正常执行    @kb.add("c-o")def_(event):"""Ctrl+O:显示详细追踪信息"""        event.app.exit(result="_open_chat_sql_details")return kb

命令系统设计

命令注册表

# 在 DatusCLI.__init__() 中注册self.commands = {# Agent 工具命令 (!)"!sl": self.agent_commands.cmd_schema_linking,"!sm": self.agent_commands.cmd_search_metrics,"!sq": self.agent_commands.cmd_search_reference_sql,"!sd": self.agent_commands.cmd_doc_search,"!save": self.agent_commands.cmd_save,"!bash": self._cmd_bash,# 上下文命令 (@)"@catalog": self.context_commands.cmd_catalog,"@subject": self.context_commands.cmd_subject,# 内部命令 (.)".clear": self.chat_commands.cmd_clear_chat,".chat_info": self.chat_commands.cmd_chat_info,".compact": self.chat_commands.cmd_compact,".sessions": self.chat_commands.cmd_list_sessions,".resume": self.chat_commands.cmd_resume,".rewind": self.chat_commands.cmd_rewind,".databases": self.metadata_commands.cmd_list_databases,".database": self.metadata_commands.cmd_switch_database,".tables": self.metadata_commands.cmd_tables,".schemas": self.metadata_commands.cmd_schemas,".schema": self.metadata_commands.cmd_switch_schema,".table_schema": self.metadata_commands.cmd_table_schema,".indexes": self.metadata_commands.cmd_indexes,".namespace": self._cmd_switch_namespace,".subagent": self.sub_agent_commands.cmd,".mcp": self._cmd_mcp,".skill": self._cmd_skill,".bootstrap-bi": self.bi_dashboard_commands.cmd,".help": self._cmd_help,".exit": self._cmd_exit,".quit": self._cmd_exit,}

命令执行流程

用户输入: /chat 查询销售额         │         ▼┌─────────────────┐│ _parse_command()│  解析为 (CHAT, "chat", "查询销售额")└────────┬────────┘         │         ▼┌─────────────────┐│ _execute_chat_  │  分发到 chat_commands│    command()    │└────────┬────────┘         │         ▼┌─────────────────┐│ ChatCommands.   │  创建/恢复 ChatAgenticNode│   cmd_chat()    │└────────┬────────┘         │         ▼┌─────────────────┐│ _execute_chat() │  执行聊天命令└────────┬────────┘         │         ├─────────────────┐         │                 │         ▼                 ▼┌─────────────────┐ ┌─────────────────┐│  创建 Node 输入   │ │  流式执行 Node   ││  create_node_   │ │  execute_stream ││     input()     │ │                 │└─────────────────┘ └────────┬────────┘                             │                             ▼                    ┌─────────────────┐                    │  ActionHistory  │                    │  显示输出       │                    └─────────────────┘

ChatCommands 核心方法

classChatCommands:"""Handles all chat-related commands and functionality."""def__init__(self, cli_instance: "DatusCLI"):        self.cli = cli_instance        self.console = cli_instance.console# Node 管理        self.current_node: ChatAgenticNode | None = None        self.chat_node: ChatAgenticNode | None = None        self.current_subagent_name: str | None = None        self.chat_history = []        self.last_actions = []        self.all_turn_actions: List[Tuple[strList[ActionHistory]]] = []def_should_create_new_node(self, subagent_name: str = None) -> bool:"""判断是否需要创建新 Node"""if self.current_node isNone:returnTrueif subagent_name:# 切换到 subagent 或 subagent 变化时创建新节点return self.current_subagent_name != subagent_nameelse:# 从 subagent 切换回 regular 时创建新节点returnbool(self.current_subagent_name)def_create_new_node(self, subagent_name: str = None):"""创建新 Node"""from datus.agent.node.node_factory import create_interactive_node        label = subagent_name or"chat"        self.console.print(f"[dim]Creating new {label} session...[/]")return create_interactive_node(subagent_name, self.cli.agent_config, node_id_suffix="_cli")defexecute_chat_command(        self,        message: str,        plan_mode: bool = False,        subagent_name: Optional[str] = None,        compact_when_new_subagent: bool = True,):"""执行聊天命令"""        self._execute_chat(            message,            plan_mode=plan_mode,            subagent_name=subagent_name,            compact_when_new_subagent=compact_when_new_subagent,            interactive=True,        )

流式执行与显示

def_execute_chat(    self,    message: str,    plan_mode: bool = False,    subagent_name: Optional[str] = None,    compact_when_new_subagent: bool = True,    interactive: bool = True,):"""执行聊天命令的核心逻辑"""# 1. 判断是否需要创建新 Nodeif self._should_create_new_node(subagent_name):# 切换 subagent 时压缩当前会话if compact_when_new_subagent:            self._trigger_compact_for_current_node()# 创建新 Node        self.current_node = self._create_new_node(subagent_name)        self.current_subagent_name = subagent_name# 2. 解析 @ 引用    at_tables, at_metrics, at_sqls = self._parse_at_references(message)# 3. 创建 Node 输入    node_input, node_type = self.create_node_input(        user_message=message,        current_node=self.current_node,        at_tables=at_tables,        at_metrics=at_metrics,        at_sqls=at_sqls,        plan_mode=plan_mode,    )# 4. 流式执行asyncdefrun_stream():asyncfor action in self.current_node.execute_stream(            input_data=node_input,            action_history_manager=self.cli.actions,        ):yield action# 5. 显示输出    actions = asyncio.run(self._run_and_display(run_stream()))    self.last_actions = actions

Subagent Wizard

向导架构

Subagent Wizard 是一个基于 prompt_toolkit 的终端用户界面 (TUI),提供分步式配置创建体验。

┌──────────────────────────────────────────────────────────────────┐│                    SubAgent Wizard                                ││                                                                   ││  Step 1: 基本信息                                                  ││  ┌────────────────────────────────────────────────────────────┐  ││  │ Agent Name: [sql-analyst____________]                       │  ││  │ Description: [用于 SQL 查询分析的智能助手________________]    │  ││  └────────────────────────────────────────────────────────────┘  ││                                                                   ││  Step 2: 系统提示                                                  ││  ┌────────────────────────────────────────────────────────────┐  ││  │ System Prompt:                                              │  ││  │ ┌────────────────────────────────────────────────────────┐ │  ││  │ │ You are a SQL analyst...                               │ │  ││  │ │                                                         │ │  ││  │ │                                                         │ │  ││  │ └────────────────────────────────────────────────────────┘ │  ││  └────────────────────────────────────────────────────────────┘  ││                                                                   ││  Step 3: 工具选择                                                  ││  ┌────────────────────────────────────────────────────────────┐  ││  │ Available Tools:                                            │  ││  │ ☑ db_query       - Execute SQL queries                     │  ││  │ ☑ schema_search  - Search table schemas                   │  ││  │ ☐ metric_search  - Search metrics                          │  ││  │ ☐ doc_search     - Search documentation                    │  ││  └────────────────────────────────────────────────────────────┘  ││                                                                   ││  Step 4: 规则配置                                                  ││  ┌────────────────────────────────────────────────────────────┐  ││  │ Rules:                                                      │  ││  │ 1. Always use LIMIT 100 for initial queries                │  ││  │ 2. Explain complex SQL before executing                    │  ││  │ [+ Add Rule]                                                │  ││  └────────────────────────────────────────────────────────────┘  ││                                                                   ││                    [Cancel]  [< Back]  [Next >]  [Finish]        │└──────────────────────────────────────────────────────────────────┘

Wizard 类结构

classSubAgentWizard:"""交互式 Subagent 创建向导"""def__init__(self, cli_instance: "DatusCLI", data: Optional[Union[SubAgentConfig, Dict]] = None):        self.cli_instance = cli_instance# 配置数据ifnot data:            self.prompt_template_name = "sql_system"            self.data = SubAgentConfig(                system_prompt="",                agent_description="",                scoped_context=ScopedContext(),            )else:            self.data = data            self.prompt_template_name = f"{self.data.system_prompt}_system"# 步骤管理        self.step = 0        self.done = False        self.error_dialog = None# UI 组件        self._init_components()        self._init_key_bindings()        self._init_layout()# 应用        self.app = Application(            layout=self.layout,            key_bindings=self.kb,            style=self.style,            full_screen=True,            mouse_support=True,        )

组件初始化

def_init_components(self):"""初始化 UI 组件"""# Step 1: 基本信息    self.name_input = TextArea(        text="",        placeholder="Enter agent name (e.g., sql-analyst)",        multiline=False,    )    self.description_input = TextArea(        text="",        placeholder="Enter agent description",        multiline=False,    )# Step 2: 系统提示    self.system_prompt_input = TextArea(        text="",        placeholder="Enter system prompt...",        lexer=PygmentsLexer(YamlLexer),    )    self.preview_panel = Frame(        Label("Preview will appear here"),        title="YAML Preview",    )# Step 3: 工具选择    self.tool_checkbox = CheckboxList(        values=[            ("db_query""db_query - Execute SQL queries"),            ("schema_search""schema_search - Search table schemas"),            ("metric_search""metric_search - Search metrics"),            ("doc_search""doc_search - Search documentation"),        ]    )# Step 4: 规则配置    self.rules_list = []    self.selected_rule_index = 0# 导航按钮    self.prev_button = Button("< Back", handler=self._prev_step)    self.next_button = Button("Next >", handler=self._next_step)    self.finish_button = Button("Finish", handler=self._finish)    self.cancel_button = Button("Cancel", handler=self._cancel)

布局系统

def_init_layout(self):"""初始化布局"""# 顶部标题    header = Label("SubAgent Wizard - Create a new agent", style="class:title")# 步骤指示器    step_indicator = self._create_step_indicator()# 主内容区域(动态切换)    self.main_content = self._create_step_content(0)# 底部按钮    button_container = VSplit([        self.cancel_button,        self.prev_button,        self.next_button,        self.finish_button,    ], align=WindowAlign.RIGHT)# 完整布局    self.layout = Layout(        HSplit([            header,            step_indicator,            self.main_content,            button_container,        ])    )

分步逻辑

def_next_step(self):"""进入下一步"""# 验证当前步骤ifnot self._validate_current_step():        self._show_error("Please fill in all required fields")returnif self.step < 4:        self.step += 1        self.main_content = self._create_step_content(self.step)        self.layout.reload()if self.step == 4:        self.next_button.text = "Finish"        self.next_button.handler = self._finishdef_prev_step(self):"""返回上一步"""if self.step > 0:        self.step -= 1        self.main_content = self._create_step_content(self.step)        self.layout.reload()if self.step < 4:        self.next_button.text = "Next >"        self.next_button.handler = self._next_stepdef_create_step_content(self, step: int):"""创建步骤内容"""if step == 0:# Step 1: 基本信息return VSplit([            Frame(self.name_input, title="Agent Name"),            Frame(self.description_input, title="Description"),        ])elif step == 1:# Step 2: 系统提示return VSplit([            Frame(self.system_prompt_input, title="System Prompt"),            Frame(self.preview_panel, title="YAML Preview"),        ])elif step == 2:# Step 3: 工具选择return Frame(self.tool_checkbox, title="Available Tools")elif step == 3:# Step 4: 规则配置return Frame(self._create_rules_list(), title="Rules")

完成与保存

def_finish(self):"""完成向导并保存配置"""# 最终验证ifnot self._validate_all_steps():        self._show_error("Please complete all required fields")return# 构建配置    config = SubAgentConfig(        system_prompt=self.name_input.text,        agent_description=self.description_input.text,        tools=self._get_selected_tools(),        rules=self.rules_list,        scoped_context=ScopedContext(            tables=self.selected_tables,            metrics=self.selected_metrics,            sqls=self.selected_sqls,        ),    )# 保存到配置文件    config_path = self.cli_instance.agent_config.path_manager.subagent_config_path(        config.system_prompt    )withopen(config_path, "w"as f:        yaml.dump(config.model_dump(), f, default_flow_style=False)    self.console.print(f"[green]✓ Subagent '{config.system_prompt}' created successfully![/]")# 退出应用    self.done = True    self.app.exit()

交互式 UI 组件

自动补全系统

classAtReferenceCompleter(Completer):"""@ 引用补全器:@table, @metric, @sql"""def__init__(self, agent_config, available_subagents: Set[str]):        self.agent_config = agent_config        self.available_subagents = available_subagents        self._cache = {}defget_completions(self, document: Document, complete_event):        text = document.text        cursor_pos = document.cursor_position# 查找 @ 模式match = re.search(r'@(\w*)$', text[:cursor_pos])ifmatch:            prefix = match.group(1)# 补全 subagent 名称for subagent in self.available_subagents:if subagent.startswith(prefix):yield Completion(                        subagent,                        start_position=-len(prefix),                        display=subagent,                    )# 补全表引用for table in self._get_available_tables():if table.startswith(prefix):yield Completion(f"table:{table}",                        start_position=-len(prefix),                        display=f"📊 {table}",                    )classSQLCompleter(Completer):"""SQL 关键字补全"""    KEYWORDS = ["SELECT""FROM""WHERE""GROUP BY""ORDER BY""LIMIT","JOIN""LEFT JOIN""INNER JOIN""RIGHT JOIN","COUNT""SUM""AVG""MAX""MIN","CASE""WHEN""THEN""ELSE""END",    ]defget_completions(self, document: Document, complete_event):        text = document.text        cursor_pos = document.cursor_position        word = self._get_word_before_cursor(text, cursor_pos)for keyword in self.KEYWORDS:if keyword.upper().startswith(word.upper()):yield Completion(                    keyword,                    start_position=-len(word),                    display=keyword,                )

语法高亮

classCustomSqlLexer(PygmentsLexer):"""自定义 SQL 语法高亮"""def__init__(self):super().__init__(SqlLexer)defhighlight(self, text: str) -> str:"""高亮 SQL 文本"""return highlight(text, SqlLexer(), TerminalTrueColorFormatter())classCustomPygmentsStyle(Style):"""自定义 Pygments 样式"""    styles = {        Token.Keyword: "ansiblue",        Token.Name: "ansigreen",        Token.String: "ansiyellow",        Token.Number: "ansimagenta",        Token.Operator: "ansiwhite",        Token.Comment: "ansibrightblack italic",    }

智能表格显示

def_smart_display_table(    self,    data: List[Dict[strAny]],    columns: Optional[List[str]] = None,) -> None:"""智能表格显示,处理宽表"""ifnot data:        self.console.print("[yellow]No data to display[/]")return# 计算最大列数    max_columns = max(4, self.console.width // self.console_column_width)# 智能列选择:前部 + 后部 + 省略号iflen(all_columns_list) > max_columns:        show_back = max_columns // 2        show_front = max_columns - show_back        display_columns = front_columns + ["..."] + back_columnselse:        display_columns = all_columns_list# 动态列宽    num_display_columns = len([col for col in display_columns if col != "..."])if num_display_columns <= 2:        dynamic_column_width = max(25, self.console.width // max(2, num_display_columns) - 4)elif num_display_columns <= 4:        dynamic_column_width = max(20, self.console.width // num_display_columns - 3)else:        dynamic_column_width = self.console_column_width# 创建表格    table = Table(show_header=True, header_style="bold green")for col in display_columns:if col == "...":            table.add_column(col, width=5, justify="center")else:            table.add_column(col, width=dynamic_column_width, overflow="fold", no_wrap=False)# 添加行for row in data:        row_values = []for col in display_columns:if col == "...":                row_values.append("...")else:                value = row.get(col)ifisinstance(value, datetime):                    value = value.strftime("%Y-%m-%d %H:%M:%S")else:                    value = str(value)                row_values.append(value)        table.add_row(*row_values)    self.console.print(table)

执行状态管理

ExecutionState 类

classExecutionState:"""管理执行状态和中断处理"""def__init__(self):        self.is_running = False        self.is_paused = False        self.should_stop = False        self.current_action = None        self.progress = 0.0        self.error = Nonedefstart(self):"""开始执行"""        self.is_running = True        self.is_paused = False        self.should_stop = False        self.error = Nonedefpause(self):"""暂停执行"""        self.is_paused = Truedefresume(self):"""恢复执行"""        self.is_paused = Falsedefstop(self):"""停止执行"""        self.should_stop = True        self.is_running = Falsedefupdate_progress(self, action: str, progress: float):"""更新进度"""        self.current_action = action        self.progress = progressclassInterruptController:"""中断控制器"""def__init__(self):        self._interrupt_event = asyncio.Event()        self._handlers = []defregister_handler(self, handler: Callable):"""注册中断处理器"""        self._handlers.append(handler)asyncdeftrigger(self, reason: str):"""触发中断"""for handler in self._handlers:await handler(reason)        self._interrupt_event.set()defreset(self):"""重置中断状态"""        self._interrupt_event.clear()

InteractionBroker 类

classInteractionBroker:"""交互代理,处理用户交互请求"""def__init__(self):        self._pending_questions = []        self._responses = {}        self._lock = asyncio.Lock()asyncdefask_user(self, question: str, options: List[str] = None) -> str:"""向用户提问"""asyncwith self._lock:            question_id = str(uuid.uuid4())            self._pending_questions.append({"id": question_id,"question": question,"options": options,            })# 等待响应while question_id notin self._responses:await asyncio.sleep(0.1)return self._responses.pop(question_id)defsubmit_response(self, question_id: str, response: str):"""提交响应"""        self._responses[question_id] = response

自动提交交互

@asynccontextmanagerasyncdefauto_submit_interaction(broker: InteractionBroker, console: Console):"""自动提交交互的上下文管理器"""asyncdefhandle_question(question_data: dict):        question_id = question_data["id"]        question = question_data["question"]        options = question_data["options"]# 显示问题        console.print(f"\n[bold blue]{question}[/]")if options:# 显示选项for i, option inenumerate(options, 1):                console.print(f"  {i}{option}")# 获取选择            choice = await asyncio.get_event_loop().run_in_executor(None,lambdainput("Enter choice number: ")            )            response = options[int(choice) - 1]else:# 获取文本输入            response = await asyncio.get_event_loop().run_in_executor(None,lambdainput("Enter response: ")            )# 提交响应        broker.submit_response(question_id, response)# 启动监听    task = asyncio.create_task(_listen_for_questions(broker, handle_question))try:yieldfinally:        task.cancel()try:await taskexcept asyncio.CancelledError:pass

总结

Datus-CLI 的交互系统设计体现了以下原则:

  1. 用户友好
    :REPL + 自动补全 + 语法高亮
  2. 模块化
    :命令处理器分离,职责单一
  3. 可扩展
    :命令注册表模式,易于添加新命令
  4. 交互式
    :Subagent Wizard 提供向导式配置体验
  5. 流式输出
    :实时显示执行进度和结果
  6. 状态管理
    :完善的执行状态和中断处理机制

上一模块:Agent 核心架构下一模块:工具与连接器架构