claude-code/learn/phase-2-conversation-loop.md
mingyangxu46-prog b6f37082cf
Learn/20260401 (#39)
* docs: 添加 Claude Code 源码学习笔记(第一、二阶段)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-02 21:47:49 +08:00

30 KiB
Raw Blame History

第二阶段:核心对话循环详解

用户发一句话后,如何变成 API 请求、如何处理流式响应和工具调用

对话循环总览

用户输入 "帮我读取 README.md"
  │
  ▼
REPL.tsx: onSubmit → onQuery → onQueryImpl
  │
  ├── 1. 并行加载上下文:
  │     getSystemPrompt() + getUserContext() + getSystemContext()
  │
  ├── 2. buildEffectiveSystemPrompt() — 合成最终系统提示
  │
  ├── 3. for await (const event of query({...}))  ★ 核心循环
  │     │
  │     │  query.ts: queryLoop()
  │     │    ├── while (true) {
  │     │    │     ├── autocompact / microcompact 处理
  │     │    │     ├── deps.callModel() → claude.ts 流式 API 调用
  │     │    │     │     └── for await (message of stream) { yield message }
  │     │    │     │
  │     │    │     ├── 收集 assistant 消息中的 tool_use 块
  │     │    │     │
  │     │    │     ├── needsFollowUp?
  │     │    │     │     ├── true → 执行工具 → 收集结果 → state = next → continue
  │     │    │     │     └── false → 检查错误恢复 → return { reason: 'completed' }
  │     │    │     }
  │     │
  │     └── onQueryEvent(event) — 更新 UI 状态
  │
  └── 4. 收尾: resetLoadingState(), onTurnComplete()

两条数据路径

路径 调用方 说明
交互式REPL REPL.tsx → query() 直接调用 query() AsyncGenerator
非交互式SDK/print print.ts → QueryEngine.submitMessage()query() 通过 QueryEngine 包装增加了会话持久化、usage 跟踪等

1. query.ts1732 行)— 核心查询循环

文件路径: src/query.ts

1.1 文件结构

query.ts (1732 行)
├── [0-120]      Import 区 + feature flag 条件模块加载
├── [122-148]    yieldMissingToolResultBlocks() — 为未配对的 tool_use 生成错误 tool_result
├── [150-178]    常量与辅助函数 (MAX_OUTPUT_TOKENS_RECOVERY_LIMIT, isWithheldMaxOutputTokens)
├── [180-198]    QueryParams 类型定义
├── [200-216]    State 类型 — 循环迭代间的可变状态
├── [218-238]    query() — 导出的 AsyncGenerator委托给 queryLoop()
├── [240-1732]   queryLoop() — 核心 while(true) 循环
│   ├── [241-306]    初始化 State + 内存预取
│   ├── [307-448]    循环开头:解构 state、消息预处理snip/microcompact/context collapse
│   ├── [449-578]    系统提示构建(第449行) + autocompact(第453行) + StreamingToolExecutor 初始化(第562行)
│   ├── [650-866]    ★ deps.callModel()(第659行) + 流式响应处理 + tool_use 收集
│   ├── [896-956]    错误处理FallbackTriggeredError、通用错误
│   ├── [1002-1054]  中断处理abortController.signal.aborted
│   ├── [1065-1360]  无 followUp 时的终止/恢复逻辑
│   │   ├── prompt-too-long 恢复
│   │   ├── max_output_tokens 恢复(升级 + 多轮)
│   │   ├── stop hooks 执行
│   │   └── return { reason: 'completed' }
│   └── [1360-1732]  有 followUp 时的工具执行 + 下一轮准备
│       ├── 工具执行streaming 或 sequential
│       ├── attachment 注入(排队命令、内存预取、技能发现)
│       ├── maxTurns 检查
│       └── state = next → continue

1.2 入口query() 函数(第 219 行)

export async function* query(params: QueryParams):
  AsyncGenerator<StreamEvent | Message | ..., Terminal> {
  const consumedCommandUuids: string[] = []
  const terminal = yield* queryLoop(params, consumedCommandUuids)
  // 通知所有消费的排队命令已完成
  for (const uuid of consumedCommandUuids) {
    notifyCommandLifecycle(uuid, 'completed')
  }
  return terminal
}

query() 本身很薄,只做两件事:

  1. 委托给 queryLoop() 执行实际逻辑
  2. 在正常返回后通知排队命令的生命周期

1.3 QueryParams第 181 行)

type QueryParams = {
  messages: Message[]           // 当前对话消息
  systemPrompt: SystemPrompt    // 系统提示
  userContext: { [k: string]: string }  // 用户上下文CLAUDE.md 等)
  systemContext: { [k: string]: string }  // 系统上下文git 状态等)
  canUseTool: CanUseToolFn      // 工具权限检查函数
  toolUseContext: ToolUseContext // 工具执行上下文
  fallbackModel?: string        // 备用模型
  querySource: QuerySource      // 查询来源标识
  maxTurns?: number             // 最大轮次限制
  taskBudget?: { total: number }  // 令牌预算
}

1.4 State — 循环迭代间的可变状态(第 204 行)

type State = {
  messages: Message[]               // 累积的消息列表
  toolUseContext: ToolUseContext     // 工具执行上下文
  autoCompactTracking: ...          // 自动压缩跟踪
  maxOutputTokensRecoveryCount: number  // 输出令牌恢复尝试次数
  hasAttemptedReactiveCompact: boolean  // 是否已尝试响应式压缩
  maxOutputTokensOverride: number | undefined  // 输出令牌覆盖
  pendingToolUseSummary: Promise<...>   // 待处理的工具使用摘要
  stopHookActive: boolean | undefined   // stop hook 是否活跃
  turnCount: number                     // 当前轮次
  transition: Continue | undefined      // 上一次迭代为何 continue
}

设计关键:每次 continue 时通过 state = { ... } 一次性更新所有状态,而不是分散的 9 个赋值。transition 字段记录了为什么要继续循环(便于调试和测试)。

1.5 queryLoop() 核心流程(第 241 行)

while (true) 循环(第 307 行)的每次迭代代表一次 API 调用。循环直到:

  • 模型不需要工具调用 → return { reason: 'completed' }
  • 被用户中断 → return { reason: 'aborted_*' }
  • 达到最大轮次 → return { reason: 'max_turns' }
  • 遇到不可恢复的错误 → return { reason: 'model_error' }

步骤 1消息预处理

每次迭代开头:
  ├── 解构 state → messages, toolUseContext, tracking, ...
  ├── getMessagesAfterCompactBoundary() — 只保留压缩边界后的消息
  ├── snip 处理feature flag跳过
  ├── microcompact 处理feature flag跳过
  └── autocompact 检查 — 消息过长时自动压缩

步骤 2系统提示构建第 449 行)

const fullSystemPrompt = asSystemPrompt(
  appendSystemContext(systemPrompt, systemContext),
)

将系统上下文git 状态、日期等追加到系统提示。注意用户上下文CLAUDE.md 等)不在这里注入,而是在 deps.callModel() 调用时通过 prependUserContext(messagesForQuery, userContext) 注入到消息数组的最前面(第 660 行)。

步骤 3Autocompact第 454-543 行)

当消息历史过长时自动压缩:

autocompact 流程:
  ├── 检查 token 数量是否超过阈值
  ├── 超过 → 调用 compact API用 Haiku 总结历史)
  │   ├── yield compactBoundaryMessage  ← 标记压缩边界
  │   └── 更新 messages 为压缩后的版本
  └── 未超过 → 继续

步骤 4调用 API第 559-708 行)— 核心

StreamingToolExecutor 在第 562 行初始化API 调用在第 659 行开始:

// 第 562 行:初始化流式工具执行器
let streamingToolExecutor = useStreamingToolExecution
  ? new StreamingToolExecutor(
      toolUseContext.options.tools, canUseTool, toolUseContext,
    )
  : null

// 第 659 行:调用 API
for await (const message of deps.callModel({
  messages: prependUserContext(messagesForQuery, userContext),  // ← 用户上下文注入到消息最前面
  systemPrompt: fullSystemPrompt,
  thinkingConfig: toolUseContext.options.thinkingConfig,
  tools: toolUseContext.options.tools,
  signal: toolUseContext.abortController.signal,
  options: { model: currentModel, querySource, fallbackModel, ... }
})) {
  // 处理每条流式消息(第 708-866 行)
}

deps.callModel() 最终调用 claude.tsqueryModelWithStreaming()

步骤 5流式响应处理第 708-866 行)

处理逻辑在 for await 循环体内(第 708 行的 }) 之后到第 866 行):

for await (const message of stream):
  ├── message.type === 'assistant'?
  │   ├── 记录到 assistantMessages[]
  │   ├── 提取 tool_use 块 → toolUseBlocks[]
  │   ├── needsFollowUp = true如果有 tool_use
  │   └── streamingToolExecutor.addTool()  ← 流式工具并行执行
  │
  ├── withheld? (prompt-too-long / max_output_tokens)
  │   └── 暂扣不 yield等后面恢复逻辑处理
  │
  └── yield message  ← 正常 yield 给上层REPL/QueryEngine

StreamingToolExecutor:在 API 流式返回的同时就开始执行工具(如读文件),不等流结束。通过 addTool() 添加待执行工具,getCompletedResults() 获取已完成的结果。

步骤 6A无 followUp — 终止/恢复(第 1065-1360 行)

当模型没有请求工具调用时(needsFollowUp === false

无 followUp:
  ├── prompt-too-long 恢复?
  │   ├── context collapse drainfeature flag跳过
  │   ├── reactive compact → 压缩消息重试
  │   └── 都失败 → yield 错误 + return
  │
  ├── max_output_tokens 恢复?
  │   ├── 第一次 → 升级到 64k token 限制continue
  │   ├── 后续 → 注入恢复消息("继续,别道歉"continue
  │   └── 超过 3 次 → yield 错误 + return
  │
  ├── stop hooks 执行
  │   ├── preventContinuation? → return
  │   └── blockingErrors? → 将错误加入消息continue
  │
  └── return { reason: 'completed' }  ★ 正常结束

恢复消息内容(第 1229 行)

"Output token limit hit. Resume directly — no apology, no recap of what
you were doing. Pick up mid-thought if that is where the cut happened.
Break remaining work into smaller pieces."

步骤 6B有 followUp — 工具执行 + 下一轮(第 1363-1731 行)

当模型请求了工具调用时(needsFollowUp === true

有 followUp:
  ├── 工具执行(两种模式)
  │   ├── streamingToolExecutor? → getRemainingResults()(流式已启动)
  │   └── 否 → runTools()(传统顺序执行)
  │
  ├── for await (const update of toolUpdates):
  │   ├── yield update.message  ← 工具结果消息
  │   └── toolResults.push(...)  ← 收集工具结果
  │
  ├── 中断检查abortController.signal.aborted
  │   └── return { reason: 'aborted_tools' }
  │
  ├── attachment 注入
  │   ├── 排队命令(其他线程提交的消息)
  │   ├── 内存预取(相关记忆文件)
  │   └── 技能发现预取
  │
  ├── maxTurns 检查
  │   └── 超过 → yield max_turns_reached + return
  │
  └── state = { messages: [...old, ...assistant, ...toolResults], turnCount: +1 }
      → continue  ★ 回到循环顶部,发起下一次 API 调用

1.6 错误处理与模型降级(第 897-956 行)

API 调用出错:
  ├── FallbackTriggeredError529 过载)?
  │   ├── 切换到 fallbackModel
  │   ├── 清空本轮 assistant/tool 消息
  │   ├── yield 系统消息 "Switched to X due to high demand for Y"
  │   └── continue重试整个请求
  │
  └── 其他错误
      ├── ImageSizeError/ImageResizeError → yield 友好错误 + return
      ├── yieldMissingToolResultBlocks() — 补全未配对的 tool_result
      └── yield API 错误消息 + return

1.7 关键设计思想

设计 说明
AsyncGenerator 模式 query()async function*,通过 yield 逐条产出事件,调用者用 for await 消费
while(true) + state 对象 每次 continue 构建新 State 对象,避免分散的状态修改
transition 字段 记录为什么要 continuenext_turnmax_output_tokens_recoveryreactive_compact_retry...),便于调试
StreamingToolExecutor API 流式返回时就并行执行工具,不等流结束
Withheld 消息 可恢复错误先暂扣,恢复成功则不 yield 错误,失败才 yield

2. QueryEngine.ts1320 行)— 高层编排器

文件路径: src/QueryEngine.ts

2.1 定位

QueryEngine 是 query()上层包装,主要用于:

  • print 模式claude -p):通过 ask()QueryEngine.submitMessage()
  • SDK 模式:外部程序通过 SDK 调用
  • REPL 不用它REPL 直接调用 query()

2.2 文件结构

QueryEngine.ts (1320 行)
├── [0-130]      Import 区 + feature flag 条件模块
├── [131-174]    QueryEngineConfig 类型定义
├── [185-1202]   QueryEngine 类
│   ├── [185-208]    成员变量 + constructor
│   ├── [210-1181]   submitMessage() — 核心方法(~970 行)
│   │   ├── [210-400]    参数解析 + processUserInputContext 构建
│   │   ├── [400-465]    用户输入处理 + 会话持久化
│   │   ├── [465-660]    斜杠命令处理 + 无需查询的快速返回
│   │   ├── [660-690]    文件历史快照
│   │   ├── [679-1074]   ★ for await (const message of query({...})) — 消费 query()
│   │   └── [1074-1181]  结果提取 + yield result
│   ├── [1183-1202]  interrupt() / getMessages() / setModel() 辅助方法
├── [1210-1320]  ask() — 便捷包装函数

2.3 QueryEngineConfig

type QueryEngineConfig = {
  cwd: string                    // 工作目录
  tools: Tools                   // 工具列表
  commands: Command[]            // 斜杠命令
  mcpClients: MCPServerConnection[]  // MCP 服务器连接
  agents: AgentDefinition[]      // Agent 定义
  canUseTool: CanUseToolFn       // 权限检查
  getAppState / setAppState      // 全局状态存取
  initialMessages?: Message[]    // 初始消息(恢复对话)
  readFileCache: FileStateCache  // 文件读取缓存
  customSystemPrompt?: string    // 自定义系统提示
  thinkingConfig?: ThinkingConfig // 思考模式配置
  maxTurns?: number              // 最大轮次
  maxBudgetUsd?: number          // USD 预算上限
  jsonSchema?: Record<...>       // 结构化输出 schema
  // ... 更多配置
}

2.4 submitMessage() 核心流程

submitMessage(prompt)
  │
  ├── 1. 参数准备
  │   ├── 解构 config 获取 tools, commands, model, ...
  │   ├── 构建 wrappedCanUseTool包装权限检查跟踪拒绝
  │   ├── fetchSystemPromptParts() — 获取系统提示各部分
  │   └── 构建 processUserInputContext
  │
  ├── 2. 用户输入处理
  │   ├── processUserInput(prompt) — 解析斜杠命令 / 普通文本
  │   ├── mutableMessages.push(...messagesFromUserInput)
  │   └── recordTranscript(messages) — 持久化到 JSONL
  │
  ├── 3. yield buildSystemInitMessage() — SDK 初始化消息
  │
  ├── 4. shouldQuery === false?(斜杠命令的本地执行结果)
  │   ├── yield 命令输出
  │   ├── yield { type: 'result', subtype: 'success' }
  │   └── return
  │
  ├── 5. ★ for await (const message of query({...}))
  │   │   消费 query() 产出的每条消息
  │   │
  │   ├── message.type === 'assistant'
  │   │   ├── mutableMessages.push(msg)
  │   │   ├── recordTranscript()  ← fire-and-forget
  │   │   ├── yield* normalizeMessage(msg) — 转换为 SDK 格式
  │   │   └── 捕获 stop_reason
  │   │
  │   ├── message.type === 'user'(工具结果)
  │   │   ├── mutableMessages.push(msg)
  │   │   ├── turnCount++
  │   │   └── yield* normalizeMessage(msg)
  │   │
  │   ├── message.type === 'stream_event'
  │   │   ├── 跟踪 usagemessage_start/delta/stop
  │   │   └── includePartialMessages? → yield 流事件
  │   │
  │   ├── message.type === 'system'
  │   │   ├── compact_boundary → GC 旧消息 + yield 给 SDK
  │   │   └── api_error → yield 重试信息
  │   │
  │   └── maxBudgetUsd 检查 → 超预算则 yield error + return
  │
  └── 6. yield { type: 'result', subtype: 'success', result: textResult }

2.5 ask() 便捷函数(第 1211 行)

export async function* ask({ prompt, tools, ... }) {
  const engine = new QueryEngine({ ... })
  try {
    yield* engine.submitMessage(prompt)
  } finally {
    setReadFileCache(engine.getReadFileState())
  }
}

ask()QueryEngine 的一次性包装,创建 engine → 提交消息 → 清理。用于 print.ts--print 模式。

2.6 QueryEngine vs REPL 直接调用 query()

特性 QueryEngine (SDK/print) REPL 直接调用 query()
会话持久化 自动 recordTranscript 由 useLogMessages 处理
Usage 跟踪 内部 totalUsage 累积 由外层 cost-tracker 处理
权限拒绝跟踪 记录 permissionDenials[] 直接 UI 交互
结果格式 yield SDKMessage 格式 原始 Message 格式
消息 GC compact_boundary 后释放旧消息 UI 需要保留完整历史

3. claude.ts3420 行)— API 客户端

文件路径: src/services/api/claude.ts

3.1 文件结构

claude.ts (3420 行)
├── [0-260]      Import 区(大量 SDK 类型、工具函数)
├── [272-331]    getExtraBodyParams() — 构建额外请求体参数
├── [333-502]    缓存相关getPromptCachingEnabled, getCacheControl, should1hCacheTTL, configureEffortParams, configureTaskBudgetParams
├── [504-587]    verifyApiKey() — API 密钥验证
├── [589-675]    消息转换userMessageToMessageParam, assistantMessageToMessageParam
├── [677-708]    Options 类型定义
├── [710-781]    queryModelWithoutStreaming / queryModelWithStreaming — 公开的两个入口
├── [783-813]    辅助函数shouldDeferLspTool, getNonstreamingFallbackTimeoutMs
├── [819-918]    executeNonStreamingRequest() — 非流式请求辅助
├── [920-999]    更多辅助函数getPreviousRequestIdFromMessages, stripExcessMediaItems
├── [1018-3420]  ★ queryModel() — 核心私有函数2400 行)
│   ├── [1018-1370]   前置检查 + 工具 schema 构建 + 消息归一化 + 系统提示组装
│   ├── [1539-1730]   paramsFromContext() — 构建 API 请求参数
│   ├── [1777-2100]   withRetry + 流式 API 调用anthropic.beta.messages.create + stream
│   ├── [1941-2300]   流式事件处理for await of stream
│   └── [2300-3420]   非流式降级 + 日志、分析、清理

3.2 两个公开入口

// 入口 1流式主要路径
export async function* queryModelWithStreaming({
  messages, systemPrompt, thinkingConfig, tools, signal, options
}) {
  yield* withStreamingVCR(messages, async function* () {
    yield* queryModel(messages, systemPrompt, thinkingConfig, tools, signal, options)
  })
}

// 入口 2非流式compact 等内部用途)
export async function queryModelWithoutStreaming({
  messages, systemPrompt, thinkingConfig, tools, signal, options
}) {
  let assistantMessage
  for await (const message of ...) {
    if (message.type === 'assistant') assistantMessage = message
  }
  return assistantMessage
}

两者都委托给内部的 queryModel()withStreamingVCR 是一个 VCR录像/回放)包装器,用于调试。

3.3 Options 类型(第 677 行)

type Options = {
  getToolPermissionContext: () => Promise<ToolPermissionContext>
  model: string                      // 模型名称
  toolChoice?: BetaToolChoiceTool    // 强制使用特定工具
  isNonInteractiveSession: boolean   // 是否非交互模式
  fallbackModel?: string             // 备用模型
  querySource: QuerySource           // 查询来源
  agents: AgentDefinition[]          // Agent 定义
  enablePromptCaching?: boolean      // 启用提示缓存
  effortValue?: EffortValue          // 推理努力级别
  mcpTools: Tools                    // MCP 工具
  fastMode?: boolean                 // 快速模式
  taskBudget?: { total: number; remaining?: number }  // 令牌预算
}

3.4 queryModel() 核心流程(第 1018 行)

这是整个 API 调用的核心2400 行。关键步骤:

阶段 1前置准备1018-1400 行)

queryModel()
  ├── off-switch 检查Opus 过载时的全局关闭开关)
  ├── beta headers 组装getMergedBetas
  │   ├── 基础 betas
  │   ├── advisor beta如果启用
  │   ├── tool search beta如果启用
  │   ├── cache scope beta
  │   └── effort / task budget betas
  │
  ├── 工具过滤
  │   ├── tool search 启用 → 只包含已发现的 deferred tools
  │   └── tool search 未启用 → 过滤掉 ToolSearchTool
  │
  ├── toolToAPISchema() — 每个工具转为 API 格式
  │
  ├── normalizeMessagesForAPI() — 消息转换为 API 格式
  │   ├── UserMessage → { role: 'user', content: ... }
  │   ├── AssistantMessage → { role: 'assistant', content: ... }
  │   └── 跳过 system/attachment/progress 等内部消息类型
  │
  └── 系统提示最终组装
      ├── getAttributionHeader(fingerprint)
      ├── getCLISyspromptPrefix()
      ├── ...systemPrompt
      └── advisor 指令(如果启用)

阶段 2构建请求参数 — paramsFromContext()(第 1539-1730 行)

const paramsFromContext = (retryContext: RetryContext) => {
  // ... 动态 beta headers、effort、task budget 配置 ...
  
  // 思考模式配置adaptive 或 enabled + budget
  let thinking = undefined
  if (hasThinking && modelSupportsThinking(options.model)) {
    if (modelSupportsAdaptiveThinking(options.model)) {
      thinking = { type: 'adaptive' }
    } else {
      thinking = { type: 'enabled', budget_tokens: thinkingBudget }
    }
  }

  return {
    model: normalizeModelStringForAPI(options.model),
    messages: addCacheBreakpoints(messagesForAPI, ...),  // 带缓存标记的消息
    system,                           // 系统提示块(已构建好)
    tools: allTools,                  // 工具 schema
    tool_choice: options.toolChoice,
    max_tokens: maxOutputTokens,
    thinking,
    ...(temperature !== undefined && { temperature }),
    ...(useBetas && { betas: betasParams }),
    metadata: getAPIMetadata(),
    ...extraBodyParams,
    ...(speed !== undefined && { speed }),  // 快速模式
  }
}

阶段 3流式 API 调用(第 1779-1858 行)

// 使用 withRetry 包装,自动处理重试
const generator = withRetry(
  () => getAnthropicClient({ maxRetries: 0, model, source: querySource }),
  async (anthropic, attempt, context) => {
    const params = paramsFromContext(context)

    // ★ 核心 API 调用(第 1823 行)
    // 使用 .create() + stream: true而非 .stream()
    // 避免 BetaMessageStream 的 O(n²) partial JSON 解析开销
    const result = await anthropic.beta.messages
      .create(
        { ...params, stream: true },
        { signal, ...(clientRequestId && { headers: { ... } }) },
      )
      .withResponse()

    return result.data  // Stream<BetaRawMessageStreamEvent>
  },
  { model, fallbackModel, thinkingConfig, signal, querySource }
)

// 消费 withRetry 的系统错误消息(重试通知等)
let e
do {
  e = await generator.next()
  if (!('controller' in e.value)) yield e.value  // yield API 错误消息
} while (!e.done)
stream = e.value  // 获取最终的 Stream 对象

// 处理流式事件(第 1941 行)
for await (const part of stream) {
  switch (part.type) {
    case 'message_start':    // 记录 request_id、usage
    case 'content_block_start':  // 新的内容块开始text/thinking/tool_use
    case 'content_block_delta':  // 增量内容 → yield stream_event 给 UI
    case 'content_block_stop':   // 内容块完成 → yield AssistantMessage
    case 'message_delta':    // stop_reason、usage 更新
    case 'message_stop':     // 整条消息完成
  }
}

阶段 4withRetry 重试策略

withRetry 逻辑:
  ├── 429 (Rate Limit) → 等待 Retry-After 后重试
  ├── 529 (Overloaded) → 切换到 fallbackModelthrow FallbackTriggeredError
  ├── 500 (Server Error) → 指数退避重试
  ├── 408 (Timeout) → 重试
  ├── 其他错误 → 不重试,直接抛出
  └── 最大重试次数: 根据模型和错误类型动态计算

阶段 5非流式降级

当流式请求中途失败时,可能降级为非流式请求:

流式失败(部分响应已收到):
  ├── 已接收的内容 → yield 给上层
  ├── 剩余部分 → 降级为非流式请求anthropic.beta.messages.create
  └── 非流式结果 → 转换格式 yield

3.5 消息转换函数

// UserMessage → API 格式
userMessageToMessageParam(message, addCache, enablePromptCaching, querySource)
   { role: 'user', content: [...] }
  // addCache=true 时最后一个 content block 添加 cache_control

// AssistantMessage → API 格式
assistantMessageToMessageParam(message, addCache, enablePromptCaching, querySource)
   { role: 'assistant', content: [...] }
  // thinking/redacted_thinking 块不加 cache_control

3.6 Prompt Caching 策略

缓存策略:
  ├── cache_control: { type: 'ephemeral' }  — 默认5 分钟 TTL
  ├── cache_control: { type: 'ephemeral', ttl: '1h' }  — 订阅用户/Ant1 小时
  ├── cache_control: { ..., scope: 'global' }  — 跨会话共享(无 MCP 工具时)
  └── 禁用条件:
      ├── DISABLE_PROMPT_CACHING 环境变量
      ├── DISABLE_PROMPT_CACHING_HAIKU仅 Haiku
      └── DISABLE_PROMPT_CACHING_SONNET仅 Sonnet

3.7 多 Provider 支持

getAnthropicClient() 根据配置返回不同的 SDK 客户端:

Provider 入口 说明
Anthropic 直接 API 默认,api.anthropic.com
AWS Bedrock 通过 Bedrock 使用 @anthropic-ai/bedrock-sdk
Google Vertex 通过 Vertex 使用 @anthropic-ai/vertex-sdk
Azure 通过 Azure 类似 Bedrock 的包装

Provider 选择逻辑在 src/utils/model/providers.tsgetAPIProvider() 中。


完整数据流:一次工具调用的生命周期

以用户输入 "读取 README.md" 为例:

1. REPL.tsx: 用户按回车
   onSubmit("读取 README.md")
     └── handlePromptSubmit()
           └── onQuery([userMessage])

2. REPL.tsx: onQueryImpl()
   ├── getSystemPrompt() + getUserContext() + getSystemContext()
   └── for await (event of query({messages, systemPrompt, ...}))

3. query.ts: queryLoop() — 第 1 次迭代
   ├── messagesForQuery = [...messages]  // 包含用户消息
   ├── deps.callModel({...})
   │     └── claude.ts: queryModel()
   │           ├── 构建 API 参数
   │           └── anthropic.beta.messages.create({ ...params, stream: true })
   │
   ├── API 流式返回:
   │   content_block_start: { type: 'tool_use', name: 'Read', id: 'toolu_123' }
   │   content_block_delta: { input: '{"file_path": "/path/to/README.md"}' }
   │   content_block_stop
   │   message_delta: { stop_reason: 'tool_use' }
   │
   ├── 收集: toolUseBlocks = [{ name: 'Read', id: 'toolu_123', input: {...} }]
   ├── needsFollowUp = true
   │
   ├── 工具执行:
   │   streamingToolExecutor.getRemainingResults()
   │     └── Read 工具执行 → 返回文件内容
   │   yield toolResultMessage  ← 包含文件内容
   │
   └── state = { messages: [...old, assistantMsg, toolResultMsg], turnCount: 2 }
       → continue

4. query.ts: queryLoop() — 第 2 次迭代
   ├── messagesForQuery 现在包含:
   │   [userMsg, assistantMsg(tool_use), userMsg(tool_result)]
   │
   ├── deps.callModel({...})  ← 再次调用 API
   │
   ├── API 返回:
   │   content_block_start: { type: 'text' }
   │   content_block_delta: { text: "README.md 的内容是..." }
   │   content_block_stop
   │   message_delta: { stop_reason: 'end_turn' }
   │
   ├── toolUseBlocks = []  ← 没有工具调用
   ├── needsFollowUp = false
   │
   └── return { reason: 'completed' }  ★ 循环结束

5. REPL.tsx: onQueryEvent(event)
   ├── 更新 streamingText打字机效果
   ├── 更新 messages 数组
   └── 重新渲染 UI

关键设计模式总结

模式 位置 说明
AsyncGenerator 链式传递 query.ts → claude.ts yield* 将底层事件透传给上层,形成事件流管道
while(true) + State 对象 query.ts queryLoop 循环迭代间通过不可变 State 传递transition 字段记录原因
StreamingToolExecutor query.ts API 流式返回时并行执行工具,不等流结束
Withheld 消息 query.ts 可恢复错误先暂扣不 yield恢复成功则吞掉错误
withRetry 重试 claude.ts 429/500/529 自动重试529 触发模型降级
Prompt Caching claude.ts 缓存系统提示和历史消息,减少 API token 消耗
非流式降级 claude.ts 流式请求中途失败时降级为非流式完成剩余部分
QueryEngine 包装 QueryEngine.ts 为 SDK/print 提供会话管理、持久化、usage 跟踪

需要忽略的代码

模式 说明
feature('REACTIVE_COMPACT') / feature('CONTEXT_COLLAPSE') 所有 feature flag 保护的代码 — 全部是死代码
feature('CACHED_MICROCOMPACT') 缓存微压缩 — 死代码
feature('HISTORY_SNIP') / snipModule 历史截断 — 死代码
feature('TOKEN_BUDGET') / budgetTracker 令牌预算 — 死代码
feature('BG_SESSIONS') / taskSummaryModule 后台会话 — 死代码
process.env.USER_TYPE === 'ant' Anthropic 内部专用代码
VCR (withStreamingVCR/withVCR) 调试录像/回放包装器,不影响正常流程