Harness 工程之拆解 AI 编程助手(三):深入内核——那条指令背后,六个模块各司其职(1)
LLM 客户端(llm.py,200 行)
是什么
为什么需要”薄封装”
-
流式 tool_calls 需要手动拼装
-
重试逻辑不应散落在业务代码中
-
费用追踪需要统一入口
核心功能一:流式响应处理
# corecoder/llm.py:105-127def chat(self,messages: list[dict],tools: list[dict] | None = None,on_token=None,) -> LLMResponse:params: dict = {"model": self.model,"messages": messages,"stream": True, # 关键:始终使用流式**self.extra,}if tools:params["tools"] = tools# stream_options 是 OpenAI 的扩展字段,不是所有 provider 都支持try:params["stream_options"] = {"include_usage": True}stream = self._call_with_retry(params)except Exception:params.pop("stream_options", None)stream = self._call_with_retry(params)
# corecoder/llm.py:129-162content_parts: list[str] = []tc_map: dict[int, dict] = {} # index -> {id, name, arguments_str}prompt_tok = 0completion_tok = 0for chunk in stream:# token 用量在最后一个 chunk 中if chunk.usage:prompt_tok = chunk.usage.prompt_tokenscompletion_tok = chunk.usage.completion_tokensif not chunk.choices:continuedelta = chunk.choices[0].delta# 累积文本内容if delta.content:content_parts.append(delta.content)if on_token:on_token(delta.content) # 实时回调给 CLI 层做流式显示# 累积 tool_calls —— 这是最精妙的部分if delta.tool_calls:for tc_delta in delta.tool_calls:idx = tc_delta.index # 用 index 区分不同的 tool_callif idx not in tc_map:tc_map[idx] = {"id": "", "name": "", "args": ""}if tc_delta.id:tc_map[idx]["id"] = tc_delta.idif tc_delta.function:if tc_delta.function.name:tc_map[idx]["name"] = tc_delta.function.nameif tc_delta.function.arguments:tc_map[idx]["args"] += tc_delta.function.arguments # 注意:这里是 += 拼接
chunk 1: tc_delta.id = "call_abc", tc_delta.function.name = "edit_file"chunk 2: tc_delta.function.arguments = '{"file_path": "main'chunk 3: tc_delta.function.arguments = '.py", "old_string": "from ut'chunk 4: tc_delta.function.arguments = 'ils import halper", "new_string"...'chunk 5: tc_delta.function.arguments = ': "from utils import helper"}'
# corecoder/llm.py:164-172parsed: list[ToolCall] = []for idx insorted(tc_map):raw = tc_map[idx]try:args = json.loads(raw["args"])except (json.JSONDecodeError, KeyError):args = {} # 解析失败就传空参数,让工具自己报错parsed.append(ToolCall(id=raw["id"], name=raw["name"], arguments=args))
核心功能二:指数退避重试
# corecoder/llm.py:184-199def _call_with_retry(self, params: dict, max_retries: int = 3):"""Retry on transient errors with exponential backoff."""for attempt in range(max_retries):try:return self.client.chat.completions.create(**params)except (RateLimitError, APITimeoutError, APIConnectionError) as e:if attempt == max_retries - 1:raise # 最后一次还失败就不忍了,直接抛异常wait = 2 ** attempt # 1秒 → 2秒 → 4秒time.sleep(wait)except APIError as e:# 5xx = 服务端错误,重试;4xx = 客户端错误,不重试if e.status_code and e.status_code >= 500 and attempt < max_retries - 1:time.sleep(2 ** attempt)else:raise

核心功能三:费用估算
# corecoder/llm.py:48-76 (定价表)_PRICING = {# OpenAI"gpt-5.4": (2.5, 15), # (input $/M tokens, output $/M tokens)"gpt-4o": (2.5, 10),"gpt-4o-mini": (0.15, 0.6),# DeepSeek"deepseek-chat": (0.27, 1.10),# Anthropic Claude"claude-opus-4-6": (5, 25),# Alibaba Qwen"qwen-max": (0.78, 3.9),# Moonshot Kimi"kimi-k2.5": (0.6, 3),# ... 共 15+ 个模型}
# corecoder/llm.py:93-103@propertydef estimated_cost(self) -> float | None:pricing = _PRICING.get(self.model)if not pricing:return None # 未知模型返回 None,不会瞎猜input_rate, output_rate = pricingreturn (self.total_prompt_tokens * input_rate / 1_000_000+ self.total_completion_tokens * output_rate / 1_000_000)
工具系统(tools/,共 478 行)
工具基类(base.py,28 行)
# corecoder/tools/base.py(完整代码,仅 28 行)from abc import ABC, abstractmethodclass Tool(ABC):"""Minimal tool interface. Subclass this to add new capabilities."""name: str # 工具名称,如 "bash"、"edit_file"description: str # 给 LLM 看的自然语言描述parameters: dict # JSON Schema,定义工具接受的参数@abstractmethoddef execute(self, **kwargs) -> str:"""Run the tool and return a text result."""...def schema(self) -> dict:"""OpenAI function-calling schema."""return {"type": "function","function": {"name": self.name,"description": self.description,"parameters": self.parameters,},}
工具注册表(init.py,28 行)
# corecoder/tools/__init__.py(完整代码)from .bash import BashToolfrom .read import ReadFileToolfrom .write import WriteFileToolfrom .edit import EditFileToolfrom .glob_tool import GlobToolfrom .grep import GrepToolfrom .agent import AgentToolALL_TOOLS = [BashTool(),ReadFileTool(),WriteFileTool(),EditFileTool(),GlobTool(),GrepTool(),AgentTool(),]def get_tool(name: str):"""Look up a tool by name."""for t in ALL_TOOLS:if t.name == name:return treturn None
搜索替换编辑(edit.py,90 行)——最重要的工具

# corecoder/tools/edit.py:44-73def execute(self, file_path: str, old_string: str, new_string: str) -> str:try:p = Path(file_path).expanduser().resolve()if not p.exists():return f"Error: {file_path} not found"content = p.read_text()occurrences = content.count(old_string)if occurrences == 0:# 没找到 → 报错 + 显示文件开头(帮助 LLM 纠正)preview = content[:500] + ("..." if len(content) > 500 else "")return (f"Error: old_string not found in {file_path}.\n"f"File starts with:\n{preview}")if occurrences > 1:# 找到多个 → 报错 + 提示加更多上下文return (f"Error: old_string appears {occurrences} times in {file_path}. "f"Include more surrounding lines to make it unique.")# 恰好找到 1 次 → 替换new_content = content.replace(old_string, new_string, 1)p.write_text(new_content)_changed_files.add(str(p)) # 记录修改过的文件# 生成 unified diff 供用户审查diff = _unified_diff(content, new_content, str(p))return f"Edited {file_path}\n{diff}"except Exception as e:return f"Error: {e}"
-
找到 0 次:不是简单报错,而是把文件的前 500 个字符返回给 LLM。这相当于告诉 LLM:”你看,文件长这样,你再仔细找找。”
-
找到 >1 次:明确告诉 LLM 有几处匹配,并建议”加更多上下文”让匹配唯一。
-
找到恰好 1 次:替换成功,并生成标准 unified diff。
# corecoder/tools/edit.py:76-89def _unified_diff(old: str, new: str, filename: str, context: int = 3) -> str:"""Generate a compact unified diff between old and new file content."""old_lines = old.splitlines(keepends=True)new_lines = new.splitlines(keepends=True)diff = difflib.unified_diff(old_lines, new_lines,fromfile=f"a/{filename}", tofile=f"b/{filename}",n=context,)result = "".join(diff)# 过大的 diff 截断,避免撑爆上下文if len(result) > 3000:result = result[:2500] + "\n... (diff truncated)\n"return result
Shell 执行(bash.py,116 行)
# corecoder/tools/bash.py:19-29_DANGEROUS_PATTERNS = [(r"\brm\s+(-\w*)?-r\w*\s+(/|~|\$HOME)", "recursive delete on home/root"),(r"\brm\s+(-\w*)?-rf\s", "force recursive delete"),(r"\bmkfs\b", "format filesystem"),(r"\bdd\s+.*of=/dev/", "raw disk write"),(r">\s*/dev/sd[a-z]", "overwrite block device"),(r"\bchmod\s+(-R\s+)?777\s+/", "chmod 777 on root"),(r":\(\)\s*\{.*:\|:.*\}", "fork bomb"),(r"\bcurl\b.*\|\s*(sudo\s+)?bash", "pipe curl to bash"),(r"\bwget\b.*\|\s*(sudo\s+)?bash", "pipe wget to bash"),]
# corecoder/tools/bash.py:82-87iflen(out) > 15_000:out = (out[:6000]+ f"\n\n... truncated ({len(out)} chars total) ...\n\n"+ out[-3000:])
# corecoder/tools/bash.py:103-116def _update_cwd(command: str, current_cwd: str):"""Track directory changes from cd commands."""global _cwdparts = command.split("&&")for part in parts:part = part.strip()if part.startswith("cd "):target = part[3:].strip().strip("'\"")if target:new_dir = os.path.normpath(os.path.join(current_cwd, os.path.expanduser(target)))if os.path.isdir(new_dir):_cwd = new_dir
文件读取(read.py,54 行)
# corecoder/tools/read.py:32-53def execute(self, file_path: str, offset: int = 1, limit: int = 2000) -> str:try:p = Path(file_path).expanduser().resolve()if not p.exists():return f"Error: {file_path} not found"if not p.is_file():return f"Error: {file_path} is a directory, not a file"text = p.read_text(errors="replace")lines = text.splitlines()total = len(lines)start = max(0, offset - 1) # 1-based → 0-basedchunk = lines[start : start + limit]numbered = [f"{start + i + 1}\t{ln}" for i, ln in enumerate(chunk)]result = "\n".join(numbered)if total > start + limit:result += f"\n... ({total} lines total, showing {start+1}-{start+len(chunk)})"return result or "(empty file)"except Exception as e:return f"Error: {e}"
-
行号显示:每行前面加 {行号}\t{内容}。这让 LLM 在后续的 edit_file调用中能精确定位(虽然 edit 不用行号,但行号帮助 LLM 理解上下文) -
分页机制:offset+ limit参数支持大文件的分段读取,默认一次最多 2000 行
文件写入(write.py,39 行)
# corecoder/tools/write.py(完整代码,仅 39 行)from pathlib import Pathfrom .base import Toolfrom .edit import _changed_filesclass WriteFileTool(Tool):name = "write_file"description = ("Create a new file or completely overwrite an existing one. ""For small edits to existing files, prefer edit_file instead.")parameters = {"type": "object","properties": {"file_path": {"type": "string", "description": "Path for the file"},"content": {"type": "string", "description": "Full file content to write"},},"required": ["file_path", "content"],}def execute(self, file_path: str, content: str) -> str:try:p = Path(file_path).expanduser().resolve()p.parent.mkdir(parents=True, exist_ok=True) # 自动创建嵌套目录p.write_text(content)_changed_files.add(str(p)) # 共享修改记录n_lines = content.count("\n") + (1 if content and not content.endswith("\n") else 0)return f"Wrote {n_lines} lines to {file_path}"except Exception as e:return f"Error: {e}"
文件搜索(glob_tool.py+grep.py)
# corecoder/tools/glob_tool.py:28-47(核心逻辑)def execute(self, pattern: str, path: str = ".") -> str:base = Path(path).expanduser().resolve()if not base.is_dir():return f"Error: {path} is not a directory"hits = list(base.glob(pattern))# 按修改时间排序,最新的在前hits.sort(key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True)total = len(hits)shown = hits[:100] # 最多返回 100 条...
# corecoder/tools/grep.py:8_SKIP_DIRS = {".git", "node_modules", "__pycache__", ".venv", "venv", ".tox", "dist", "build"}
-
跳过 8 个”噪音目录”(.git、node_modules 等),避免搜索结果被无关文件淹没
-
最多返回 200 条匹配、遍历 5000 个文件,防止搜索超时
子代理(agent.py,59 行)
# corecoder/tools/agent.py:36-58def execute(self, task: str) -> str:if self._parent_agent is None:return "Error: agent tool not initialized (no parent agent)"from ..agent import Agent # 延迟 import,避免循环依赖parent = self._parent_agentsub = Agent(llm=parent.llm, # ① 共享 LLMtools=[t for t in parent.tools if t.name != "agent"], # ② 排除自身max_context_tokens=parent.context.max_tokens,max_rounds=20, # ③ 最多 20 轮)try:result = sub.chat(task)# 结果太长时截断,防止撑爆父级上下文if len(result) > 5000:result = result[:4500] + "\n... (sub-agent output truncated)"return f"[Sub-agent completed]\n{result}"except Exception as e:return f"Sub-agent error: {e}"
-
共享 LLM(llm=parent.llm):子代理使用同一个 LLM 实例(同一个 API key、同一个模型),不产生额外的连接开销 -
排除自身(if t.name != “agent”):防止递归——子代理不能再生成子代理,避免”俄罗斯套娃”式的无限嵌套 -
独立上下文(sub = Agent(…)创建全新实例):子代理有自己独立的messages列表,任务结束后直接销毁,不影响主 Agent -
结果截断(5000 字符上限):子代理可能产出很长的分析报告,全部塞进主 Agent 的上下文窗口得不偿失,只保留摘要
夜雨聆风