乐于分享
好东西不私藏

Claude Code 源码深度拆解③ | QueryEngine:46000行的TAOR循环如何运转

Claude Code 源码深度拆解③ | QueryEngine:46000行的TAOR循环如何运转

一个只有50行核心逻辑的循环,如何驱动整个Agent系统?

一、引言:从“智能体”到“循环体”

在上一期拆解工具系统时,我们留下了一个悬念:模型调用工具的决策是谁做出的?工具执行后的结果又是如何反馈给模型的?

答案藏在Claude Code源码中最大的单文件——QueryEngine.ts里。这个文件有46000行代码,是整个Agent系统的“心脏”。但讽刺的是,它的核心循环逻辑只有大约50行。

本期我们将深入TAOR循环的每一行关键代码,不仅展示源码,更要解释为什么这样设计好在哪里有哪些坑。让我们从一个最基本的问题开始:为什么Agent需要一个循环?

二、TAOR循环的哲学:让运行时“变笨”

2.1 两种Agent架构路线

在Agent系统设计中,存在两条截然不同的路线:

路线A:框架层编排(LangChain为代表)

框架代码决定执行流程:1. 调用模型获取意图2. 根据意图选择工具3. 执行工具4. 将结果格式化后再次调用模型5. 判断是否达到终止条件6. 如果未终止,回到步骤2

这种设计的问题是:编排逻辑写死在框架里。当模型能力升级时,框架的编排逻辑可能成为瓶颈——模型想做的事,框架不允许;框架让做的事,模型不理解。

路线B:模型层自主(Claude Code为代表)

运行时只做一件事:while (未终止) {    让模型决定下一步做什么    执行模型的选择    把结果告诉模型}

这种设计的核心是:运行时不知道自己在做什么,它只是跑循环。所有决策权交给模型。

2.2 TAOR循环的定义

TAOR是一个极其简单的概念:

┌─────────────────────────────────────────────────────────┐│                     TAOR 循环                           ││                                                         ││   Think ──→ Act ──→ Observe ──→ Repeat                ││    思考       行动      观察         重复                 ││                                                         ││   - 模型思考:生成下一步的决策                           ││   - 系统行动:执行模型选择的工具                         ││   - 系统观察:收集执行结果                               ││   - 注入上下文,回到思考                                 │└─────────────────────────────────────────────────────────┘

源码中的简化表示:

// QueryEngine.ts 中的核心循环(简化自源码)async function runTAORLoop(  initialMessages: Message[],  tools: ToolRegistry,  context: ExecutionContext): Promise<LoopResult> {  let messages = [...initialMessages];  let iterationCount = 0;  let shouldContinue = true;  while (shouldContinue && iterationCount < context.maxIterations) {    // ============ THINK 阶段 ============    // 调用模型,传入当前所有消息    const response = await callModel({      messages,      tools: tools.getDefinitions(),      systemPrompt: context.systemPrompt,    });    // 分析模型的响应    const analysis = analyzeModelResponse(response);    // ============ 决策分支 ============    if (analysis.type === 'text') {      // 模型返回纯文本,说明任务完成或需要澄清      messages.push({ role: 'assistant', content: response.text });      shouldContinue = false;  // 终止循环      continue;    }    if (analysis.type === 'tool_use') {      // ============ ACT 阶段 ============      const toolCall = analysis.toolCall;      // 将模型的工具调用记录到消息历史      messages.push({        role: 'assistant',        content: null,        tool_calls: [toolCall]      });      // 执行工具      const toolResult = await executeTool(        toolCall.name,        toolCall.input,        context      );      // ============ OBSERVE 阶段 ============      // 将工具执行结果作为工具响应消息加入历史      messages.push({        role: 'tool',        tool_call_id: toolCall.id,        content: formatToolResult(toolResult)      });      // 继续循环,进入下一轮 THINK      iterationCount++;      continue;    }    if (analysis.type === 'stop') {      // 模型主动停止      shouldContinue = false;    }  }  return { messages, iterations: iterationCount };}

2.3 为什么“运行时越笨,架构越稳定”?

这个设计哲学值得深入解读:

第一层理解:职责分离

  • 运行时:负责消息传递、工具执行、状态维护(机械劳动)
  • 模型:负责决策、推理、判断何时停止(智力劳动)

这种分离意味着:当Anthropic发布更强大的Claude模型时,Claude Code的运行时代码几乎不需要修改。模型自己会变得更聪明,知道如何使用工具、何时停止。

第二层理解:避免“聪明”的陷阱LangChain早期版本有一个“智能路由”功能——框架根据用户输入自动选择调用哪个链。但实践中经常出现框架选错链、用户困惑的情况。因为框架的“智能”是基于规则的,远不如模型的理解能力。

Claude Code的做法是:我放弃在框架层做任何“智能”决策,全部交给模型。这反而让系统更可靠——因为只有一方在做决策。

第三层理解:可调试性当Agent出错时,你需要回答一个问题:是模型决策错了,还是执行出错了?

  • 在框架层编排的系统中,这两者纠缠在一起,很难定位
  • 在TAOR循环中,每一步都有明确的消息记录:模型说了什么、工具执行了什么、结果是什么。问题出在哪一环,一目了然

三、QueryEngine的核心组件深度解析

3.1 流式响应处理:边生成边执行

Claude Code的一个关键体验是“实时感”——模型生成工具调用的同时,UI就在渲染;工具执行的同时,结果就在流式返回。这背后是精细的流式处理逻辑。

// 流式处理的核心代码(基于源码推断)async function* streamModelResponse(  messages: Message[],  tools: ToolDefinition[],  context: ExecutionContext): AsyncGenerator<StreamEvent> {  const stream = await client.messages.stream({    model: context.model,    messages,    tools,    max_tokens: context.maxTokens,  });  let currentToolCall: Partial<ToolCall> | null = null;  let textBuffer = '';  for await (const event of stream) {    switch (event.type) {      case 'text_delta':        // 文本增量:立即推送给UI        textBuffer += event.delta;        yield { type: 'text', delta: event.delta };        break;      case 'tool_call_start':        // 工具调用开始:创建新的工具调用记录        currentToolCall = {          id: event.id,          name: event.name,          input: '',        };        yield { type: 'tool_start', id: event.id, name: event.name };        break;      case 'tool_call_delta':        // 工具参数增量:逐步累积JSON        if (currentToolCall) {          currentToolCall.input += event.delta;          yield {             type: 'tool_delta',             id: currentToolCall.id,             delta: event.delta           };        }        break;      case 'tool_call_end':        // 工具调用结束:解析完整参数,准备执行        if (currentToolCall) {          try {            currentToolCall.parsedInput = JSON.parse(currentToolCall.input);            yield {               type: 'tool_complete',               toolCall: currentToolCall as ToolCall             };          } catch (e) {            yield {               type: 'error',               error: `Failed to parse tool input: ${e}`             };          }        }        currentToolCall = null;        break;      case 'message_stop':        // 消息结束        yield { type: 'done' };        break;    }  }}

这个设计好在哪里?

  1. 渐进式渲染
    :用户看到文本逐字出现,工具调用逐步构建,体验流畅
  2. 提前执行
    :不需要等整个响应完成,工具调用一结束就可以立即执行
  3. 错误隔离
    :JSON解析失败不会导致整个流崩溃,可以优雅降级
  4. 背压处理
    :使用AsyncGenerator,消费端可以控制读取速度

3.2 自动重试机制:处理瞬时故障

大模型API偶尔会返回5xx错误或超时。Claude Code内置了智能重试逻辑:

// 自动重试的实现(基于源码推断)async function callModelWithRetry(  params: ModelCallParams,  context: ExecutionContext): Promise<ModelResponse> {  let lastError: Error | null = null;  let delay = context.retryConfig.initialDelay; // 通常1秒  for (let attempt = 1; attempt <= context.retryConfig.maxAttempts; attempt++) {    try {      const response = await callModelOnce(params);      // 记录成功(用于遥测)      context.telemetry.recordModelCall({        attempts: attempt,        success: true,      });      return response;    } catch (error) {      lastError = error as Error;      // 判断是否应该重试      if (!isRetryableError(error)) {        // 4xx错误通常不应重试        throw error;      }      // 记录失败(用于遥测)      context.telemetry.recordModelCall({        attempts: attempt,        success: false,        error: error.message,      });      if (attempt < context.retryConfig.maxAttempts) {        // 指数退避 + 抖动        const jitter = Math.random() * 0.3 * delay;        await sleep(delay + jitter);        delay *= context.retryConfig.backoffFactor; // 通常为2      }    }  }  throw new Error(`Model call failed after ${context.retryConfig.maxAttempts} attempts: ${lastError?.message}`);}// 判断错误是否可重试function isRetryableError(error: any): boolean {  // 5xx 服务器错误  if (error.status >= 500 && error.status < 600) return true;  // 429 速率限制  if (error.status === 429) return true;  // 网络错误  if (error.code === 'ECONNRESET' || error.code === 'ETIMEDOUT') return true;  // overloaded 错误  if (error.message?.includes('overloaded')) return true;  return false;}

这个设计好在哪里?

  1. 分类处理
    :只重试可恢复的错误,4xx错误直接失败(避免浪费配额)
  2. 指数退避+抖动
    :防止多个并发请求同时重试造成“惊群效应”
  3. 遥测记录
    :每次调用的尝试次数都被记录,用于监控API质量
  4. 明确的失败信息
    :抛出异常时包含所有尝试的历史

3.3 Token计数与预算管理

每次模型调用都需要精确控制token使用量,Claude Code实现了精细的计数逻辑:

// Token计数与预算管理(基于源码推断)class TokenBudgetManager {  private usedTokens: number = 0;  private readonly maxTokens: number;  private readonly warningThreshold: number;  constructor(maxTokens: number, warningThreshold: number = 0.8) {    this.maxTokens = maxTokens;    this.warningThreshold = maxTokens * warningThreshold;  }  // 估算消息的token数  estimateMessageTokens(messages: Message[]): number {    // 使用tiktoken或模型特定的tokenizer    let total = 0;    for (const msg of messages) {      total += this.estimateTextTokens(msg.content || '');      if (msg.tool_calls) {        total += this.estimateToolCallTokens(msg.tool_calls);      }    }    return total;  }  // 检查是否接近上限  checkBudget(estimatedTokens: number): BudgetStatus {    const projectedTotal = this.usedTokens + estimatedTokens;    if (projectedTotal > this.maxTokens) {      return {        status: 'exceeded',        current: this.usedTokens,        projected: projectedTotal,        max: this.maxTokens,        recommendation: 'trigger_compaction',      };    }    if (projectedTotal > this.maxTokens * this.warningThreshold) {      return {        status: 'warning',        current: this.usedTokens,        projected: projectedTotal,        max: this.maxTokens,        recommendation: 'prepare_compaction',      };    }    return {      status: 'ok',      current: this.usedTokens,      projected: projectedTotal,      max: this.maxTokens,    };  }  // 预留缓冲空间  reserveBuffer(tokens: number): number {    // 始终保留一定比例给模型响应    const reserved = Math.min(tokens, this.maxTokens - this.usedTokens - 500);    this.usedTokens += reserved;    return reserved;  }}

这个设计好在哪里?

  1. 预警而非硬限
    :达到80%时警告,而非等到100%才报错
  2. 预留响应空间
    :不仅要计算输入token,还要为模型输出预留空间
  3. 触发压缩的决策依据
    :budget检查结果直接作为是否触发AutoCompact的依据

3.4 思考模式(Thinking Mode)的实现

Claude Code支持让模型在回答前进行“思考”——生成内部推理token,这些token不计入最终上下文:

// 思考模式的实现(基于源码推断)interface ThinkingConfig {  enabled: boolean;  budget: number;  // 思考可用的token数  mode: 'auto' | 'forced' | 'disabled';}async function handleThinkingMode(  messages: Message[],  config: ThinkingConfig,  context: ExecutionContext): Promise<ThinkingResult> {  if (!config.enabled) {    return { messages, thinkingUsed: 0 };  }  // 构建包含thinking指令的系统提示  const thinkingPrompt = `You are allowed to think step-by-step before responding.Your thinking will not be shown to the user and will not count against the context limit.Thinking budget: ${config.budget} tokens.Use this space to:- Break down complex problems- Plan multi-step tool sequences- Verify assumptions before acting`;  const response = await client.messages.create({    model: context.model,    messages,    system: context.systemPrompt + '\n\n' + thinkingPrompt,    thinking: {      type: 'enabled',      budget_tokens: config.budget,    },    max_tokens: context.maxTokens,  });  // thinking token不计入消息历史  // 只将最终响应加入上下文  messages.push({    role: 'assistant',    content: response.content,  });  return {    messages,    thinkingUsed: response.thinking?.tokens_used || 0,  };}

这个设计好在哪里?

  1. 零污染
    :思考内容不计入上下文,不占用宝贵的context window
  2. 按需启用
    :只有复杂任务才开启,简单任务跳过(节省延迟和成本)
  3. 预算控制
    :思考也有token预算,防止无限思考

四、消息历史的精细管理

4.1 消息数组的结构

Claude Code的消息历史管理非常精细,每一轮交互都被精确记录:

// 消息历史的结构示例const messageHistory: Message[] = [  // 第一条:用户输入  {     role: 'user',     content: '帮我找出项目中所有未使用的依赖'   },  // 模型决定使用工具  {     role: 'assistant',     content: null,    tool_calls: [{      id: 'call_123',      type: 'function',      function: {        name: 'Bash',        arguments: '{"command": "npx depcheck --json"}'      }    }]  },  // 工具执行结果  {     role: 'tool',     tool_call_id: 'call_123',    content: '{"unused": ["lodash", "moment", "axios"]}'  },  // 模型分析结果并给出回答  {     role: 'assistant',     content: '发现3个未使用的依赖:lodash、moment、axios。建议运行 npm uninstall 移除。'  },];

4.2 消息裁剪策略

随着对话增长,消息历史会超过context window。QueryEngine实现了智能裁剪:

// 消息裁剪策略(基于源码推断)interface TrimStrategy {  // 必须保留的消息索引  protectedIndices: Set<number>;  // 最大保留消息数  maxMessages: number;  // 摘要生成器  summarizer?: (messages: Message[]) => Promise<string>;}function trimMessageHistory(  messages: Message[],  strategy: TrimStrategy): Message[] {  if (messages.length <= strategy.maxMessages) {    return messages;  }  const trimmed: Message[] = [];  const protectedSet = strategy.protectedIndices;  // 规则1:系统消息始终保留(如果有)  // 规则2:最近N条消息优先保留  // 规则3:标记为protected的消息必须保留  // 规则4:包含关键决策的tool_call不能丢弃  const recentCount = Math.floor(strategy.maxMessages * 0.6);  const olderCount = strategy.maxMessages - recentCount;  // 保留最近的recentCount条  for (let i = messages.length - recentCount; i < messages.length; i++) {    trimmed.push(messages[i]);  }  // 从前面挑选olderCount条重要的  const olderMessages = messages.slice(0, messages.length - recentCount);  const importantOlder = selectImportantMessages(olderMessages, olderCount, protectedSet);  // 按时间顺序合并  return [...importantOlder, ...trimmed];}function selectImportantMessages(  messages: Message[],  count: number,  protectedIndices: Set<number>): Message[] {  const scored: Array<{ index: number; score: number }> = [];  for (let i = 0; i < messages.length; i++) {    const msg = messages[i];    let score = 0;    // 保护标记    if (protectedIndices.has(i)) {      score += 1000;    }    // 用户消息更重要    if (msg.role === 'user') {      score += 10;    }    // 包含错误的消息更重要    if (msg.content?.includes('error') || msg.content?.includes('Error')) {      score += 5;    }    // 工具调用结果更重要(包含执行结果)    if (msg.role === 'tool') {      score += 3;    }    scored.push({ index: i, score });  }  // 按分数排序,取前count个,再按原始顺序排序  return scored    .sort((a, b) => b.score - a.score)    .slice(0, count)    .sort((a, b) => a.index - b.index)    .map(item => messages[item.index]);}

这个设计好在哪里?

  1. 分区域保留
    :近期消息全保留(上下文连贯),远期消息精选保留(关键信息不丢)
  2. 重要性评分
    :不是简单截断,而是根据消息类型和内容评分
  3. 保护机制
    :允许上层标记“这条消息绝对不能丢”

五、自愈查询循环:Context压缩的触发机制

5.1 为什么需要自愈?

在第四期我们将详细拆解三层压缩策略,这里先看QueryEngine如何触发压缩:

// 自愈循环的触发逻辑(基于源码推断)async function runWithSelfHealing(  messages: Message[],  context: ExecutionContext): Promise<LoopResult> {  const budgetManager = new TokenBudgetManager(context.maxContextTokens);  while (true) {    // 检查token预算    const estimated = budgetManager.estimateMessageTokens(messages);    const budgetStatus = budgetManager.checkBudget(estimated);    if (budgetStatus.status === 'exceeded') {      // 触发AutoCompact      console.log(`[QueryEngine] Context budget exceeded, triggering compaction...`);      const compactResult = await context.compactor.compact(messages, {        type: 'auto',        reserveTokens: 13000,  // 预留缓冲      });      messages = compactResult.messages;      continue;  // 压缩后重新进入循环    }    if (budgetStatus.status === 'warning') {      // 接近上限,提醒但继续      console.log(`[QueryEngine] Context budget warning: ${estimated}/${context.maxContextTokens}`);    }    // 正常的TAOR循环    try {      return await runTAORLoop(messages, context.tools, context);    } catch (error) {      // 检查是否是可恢复的错误      if (isContextLengthError(error)) {        // Token超限,触发压缩后重试        const compactResult = await context.compactor.compact(messages, {          type: 'emergency',        });        messages = compactResult.messages;        continue;      }      throw error;    }  }}

5.2 熔断器设计:防止无限压缩循环

源码中一个值得注意的设计是熔断器:

// 熔断器实现(基于源码推断)class CompactionCircuitBreaker {  private failureCount: number = 0;  private readonly maxFailures: number = 3;  private lastFailureTime: number = 0;  private readonly cooldownMs: number = 60000; // 1分钟  async compact(    messages: Message[],    compactor: Compactor  ): Promise<Message[]> {    // 检查熔断状态    if (this.failureCount >= this.maxFailures) {      const timeSinceLastFailure = Date.now() - this.lastFailureTime;      if (timeSinceLastFailure < this.cooldownMs) {        throw new Error(          `Compaction circuit breaker is OPEN. ` +          `${this.failureCount} failures in a row. ` +          `Try again in ${Math.ceil((this.cooldownMs - timeSinceLastFailure) / 1000)}s.`        );      }      // 冷却期已过,半开状态      this.failureCount = 0;    }    try {      const result = await compactor.compact(messages);      // 成功,重置计数器      this.failureCount = 0;      return result;    } catch (error) {      // 失败,增加计数      this.failureCount++;      this.lastFailureTime = Date.now();      console.warn(        `[CircuitBreaker] Compaction failed (${this.failureCount}/${this.maxFailures}): ${error}`      );      throw error;    }  }}

这个熔断器的价值(用数据说话):

源码注释显示:2026年3月10日之前,压缩功能没有重试上限。一个bug导致压缩持续失败时,系统会无限重试。记录显示单会话最高3272次连续失败,全局每天浪费约25万次API调用。加入熔断器后,这个问题被彻底解决。

六、对Agent开发的核心启示

6.1 四条可迁移的设计原则

原则一:循环要薄,决策要沉

// ❌ 不要在循环里写业务逻辑while (true) {  if (intent === 'write_code') {    await writeCode();  } else if (intent === 'run_test') {    await runTest();  }  // 永远加不完的if-else...}// ✅ 只驱动循环,决策交给模型while (true) {  const decision = await model.decide(messages, tools);  await execute(decision);  messages.push(decision.result);}

原则二:消息历史是Agent的记忆体

  • 每一条消息都要仔细设计结构
  • 工具调用和工具结果必须成对出现
  • 考虑消息的重要性评分和裁剪策略

原则三:流式处理决定用户体验

  • 边生成边渲染 > 等待完整响应
  • 工具调用可以提前执行
  • 错误不应该中断整个流

原则四:自愈能力是生产级的标志

  • 检测异常状态(如token超限)
  • 自动触发修复流程(如压缩)
  • 加上熔断器防止雪崩

6.2 一个最小化的TAOR实现

// 你可以立刻使用的简化版TAOR循环async function simpleTAOR(  userQuery: string,  tools: Tool[],  model: ModelClient): Promise<string> {  const messages: Message[] = [    { role: 'user', content: userQuery }  ];  let iterations = 0;  const MAX_ITERATIONS = 10;  while (iterations < MAX_ITERATIONS) {    // Think    const response = await model.chat({      messages,      tools: tools.map(t => t.definition),    });    // 检查是否需要调用工具    if (response.toolCalls && response.toolCalls.length > 0) {      // 记录模型的工具调用      messages.push({        role: 'assistant',        content: null,        tool_calls: response.toolCalls,      });      // Act & Observe      for (const toolCall of response.toolCalls) {        const tool = tools.find(t => t.name === toolCall.name);        if (!tool) {          throw new Error(`Unknown tool: ${toolCall.name}`);        }        const result = await tool.execute(toolCall.arguments);        messages.push({          role: 'tool',          tool_call_id: toolCall.id,          content: JSON.stringify(result),        });      }      iterations++;      continue;  // 回到Think    }    // 没有工具调用,说明任务完成    messages.push({      role: 'assistant',      content: response.content,    });    return response.content;  }  throw new Error(`Exceeded max iterations (${MAX_ITERATIONS})`);}

七、小结与下一期预告

QueryEngine的46000行代码,核心只有50行的TAOR循环。这个设计告诉我们:

  1. 运行时越“笨”,系统越稳定
    ——把智能下沉给模型,把确定性留给框架
  2. 循环只是骨架,周边组件才是血肉
    ——流式处理、重试、token管理、自愈压缩,这些才是生产级系统的标志
  3. 消息历史的设计是隐形的架构决策
    ——它决定了Agent能记住什么、忘记什么

下一期,我们将深入Context压缩系统,看Claude Code如何用三层策略(MicroCompact、AutoCompact、Full Compact)把Context Window从一个“限制”变成可管理的“资源”。那个3272次失败的熔断器故事,我们会在第四期展开讲。


上一篇回顾:Claude Code 源码深度拆解② | 40+工具模块:Bash作为通用适配器的设计哲学

下一篇预告:Claude Code 源码深度拆解④ | 三层压缩策略:把Context Window变成可管理资源


延伸思考:如果你的Agent循环需要支持“人在回路”(执行危险操作前等待用户确认),TAOR循环应该如何修改?欢迎在评论区分享你的设计。