claude-code/learn/phase-2-conversation-loop.md
mingyangxu46-prog b6f37082cf
Learn/20260401 (#39)
* docs: 添加 Claude Code 源码学习笔记(第一、二阶段)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-02 21:47:49 +08:00

774 lines
30 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 第二阶段:核心对话循环详解
> 用户发一句话后,如何变成 API 请求、如何处理流式响应和工具调用
## 对话循环总览
```
用户输入 "帮我读取 README.md"
REPL.tsx: onSubmit → onQuery → onQueryImpl
├── 1. 并行加载上下文:
│ getSystemPrompt() + getUserContext() + getSystemContext()
├── 2. buildEffectiveSystemPrompt() — 合成最终系统提示
├── 3. for await (const event of query({...})) ★ 核心循环
│ │
│ │ query.ts: queryLoop()
│ │ ├── while (true) {
│ │ │ ├── autocompact / microcompact 处理
│ │ │ ├── deps.callModel() → claude.ts 流式 API 调用
│ │ │ │ └── for await (message of stream) { yield message }
│ │ │ │
│ │ │ ├── 收集 assistant 消息中的 tool_use 块
│ │ │ │
│ │ │ ├── needsFollowUp?
│ │ │ │ ├── true → 执行工具 → 收集结果 → state = next → continue
│ │ │ │ └── false → 检查错误恢复 → return { reason: 'completed' }
│ │ │ }
│ │
│ └── onQueryEvent(event) — 更新 UI 状态
└── 4. 收尾: resetLoadingState(), onTurnComplete()
```
### 两条数据路径
| 路径 | 调用方 | 说明 |
|------|--------|------|
| **交互式REPL** | REPL.tsx → `query()` | 直接调用 `query()` AsyncGenerator |
| **非交互式SDK/print** | print.ts → `QueryEngine.submitMessage()``query()` | 通过 QueryEngine 包装增加了会话持久化、usage 跟踪等 |
---
## 1. query.ts1732 行)— 核心查询循环
**文件路径**: `src/query.ts`
### 1.1 文件结构
```
query.ts (1732 行)
├── [0-120] Import 区 + feature flag 条件模块加载
├── [122-148] yieldMissingToolResultBlocks() — 为未配对的 tool_use 生成错误 tool_result
├── [150-178] 常量与辅助函数 (MAX_OUTPUT_TOKENS_RECOVERY_LIMIT, isWithheldMaxOutputTokens)
├── [180-198] QueryParams 类型定义
├── [200-216] State 类型 — 循环迭代间的可变状态
├── [218-238] query() — 导出的 AsyncGenerator委托给 queryLoop()
├── [240-1732] queryLoop() — 核心 while(true) 循环
│ ├── [241-306] 初始化 State + 内存预取
│ ├── [307-448] 循环开头:解构 state、消息预处理snip/microcompact/context collapse
│ ├── [449-578] 系统提示构建(第449行) + autocompact(第453行) + StreamingToolExecutor 初始化(第562行)
│ ├── [650-866] ★ deps.callModel()(第659行) + 流式响应处理 + tool_use 收集
│ ├── [896-956] 错误处理FallbackTriggeredError、通用错误
│ ├── [1002-1054] 中断处理abortController.signal.aborted
│ ├── [1065-1360] 无 followUp 时的终止/恢复逻辑
│ │ ├── prompt-too-long 恢复
│ │ ├── max_output_tokens 恢复(升级 + 多轮)
│ │ ├── stop hooks 执行
│ │ └── return { reason: 'completed' }
│ └── [1360-1732] 有 followUp 时的工具执行 + 下一轮准备
│ ├── 工具执行streaming 或 sequential
│ ├── attachment 注入(排队命令、内存预取、技能发现)
│ ├── maxTurns 检查
│ └── state = next → continue
```
### 1.2 入口query() 函数(第 219 行)
```ts
export async function* query(params: QueryParams):
AsyncGenerator<StreamEvent | Message | ..., Terminal> {
const consumedCommandUuids: string[] = []
const terminal = yield* queryLoop(params, consumedCommandUuids)
// 通知所有消费的排队命令已完成
for (const uuid of consumedCommandUuids) {
notifyCommandLifecycle(uuid, 'completed')
}
return terminal
}
```
`query()` 本身很薄,只做两件事:
1. 委托给 `queryLoop()` 执行实际逻辑
2. 在正常返回后通知排队命令的生命周期
### 1.3 QueryParams第 181 行)
```ts
type QueryParams = {
messages: Message[] // 当前对话消息
systemPrompt: SystemPrompt // 系统提示
userContext: { [k: string]: string } // 用户上下文CLAUDE.md 等)
systemContext: { [k: string]: string } // 系统上下文git 状态等)
canUseTool: CanUseToolFn // 工具权限检查函数
toolUseContext: ToolUseContext // 工具执行上下文
fallbackModel?: string // 备用模型
querySource: QuerySource // 查询来源标识
maxTurns?: number // 最大轮次限制
taskBudget?: { total: number } // 令牌预算
}
```
### 1.4 State — 循环迭代间的可变状态(第 204 行)
```ts
type State = {
messages: Message[] // 累积的消息列表
toolUseContext: ToolUseContext // 工具执行上下文
autoCompactTracking: ... // 自动压缩跟踪
maxOutputTokensRecoveryCount: number // 输出令牌恢复尝试次数
hasAttemptedReactiveCompact: boolean // 是否已尝试响应式压缩
maxOutputTokensOverride: number | undefined // 输出令牌覆盖
pendingToolUseSummary: Promise<...> // 待处理的工具使用摘要
stopHookActive: boolean | undefined // stop hook 是否活跃
turnCount: number // 当前轮次
transition: Continue | undefined // 上一次迭代为何 continue
}
```
**设计关键**:每次 `continue` 时通过 `state = { ... }` 一次性更新所有状态,而不是分散的 9 个赋值。`transition` 字段记录了为什么要继续循环(便于调试和测试)。
### 1.5 queryLoop() 核心流程(第 241 行)
`while (true)` 循环(第 307 行)的每次迭代代表一次 API 调用。循环直到:
- 模型不需要工具调用 → `return { reason: 'completed' }`
- 被用户中断 → `return { reason: 'aborted_*' }`
- 达到最大轮次 → `return { reason: 'max_turns' }`
- 遇到不可恢复的错误 → `return { reason: 'model_error' }`
#### 步骤 1消息预处理
```
每次迭代开头:
├── 解构 state → messages, toolUseContext, tracking, ...
├── getMessagesAfterCompactBoundary() — 只保留压缩边界后的消息
├── snip 处理feature flag跳过
├── microcompact 处理feature flag跳过
└── autocompact 检查 — 消息过长时自动压缩
```
#### 步骤 2系统提示构建第 449 行)
```ts
const fullSystemPrompt = asSystemPrompt(
appendSystemContext(systemPrompt, systemContext),
)
```
将系统上下文git 状态、日期等追加到系统提示。注意用户上下文CLAUDE.md 等)不在这里注入,而是在 `deps.callModel()` 调用时通过 `prependUserContext(messagesForQuery, userContext)` 注入到消息数组的最前面(第 660 行)。
#### 步骤 3Autocompact第 454-543 行)
当消息历史过长时自动压缩:
```
autocompact 流程:
├── 检查 token 数量是否超过阈值
├── 超过 → 调用 compact API用 Haiku 总结历史)
│ ├── yield compactBoundaryMessage ← 标记压缩边界
│ └── 更新 messages 为压缩后的版本
└── 未超过 → 继续
```
#### 步骤 4调用 API第 559-708 行)— 核心
StreamingToolExecutor 在第 562 行初始化API 调用在第 659 行开始:
```ts
// 第 562 行:初始化流式工具执行器
let streamingToolExecutor = useStreamingToolExecution
? new StreamingToolExecutor(
toolUseContext.options.tools, canUseTool, toolUseContext,
)
: null
// 第 659 行:调用 API
for await (const message of deps.callModel({
messages: prependUserContext(messagesForQuery, userContext), // ← 用户上下文注入到消息最前面
systemPrompt: fullSystemPrompt,
thinkingConfig: toolUseContext.options.thinkingConfig,
tools: toolUseContext.options.tools,
signal: toolUseContext.abortController.signal,
options: { model: currentModel, querySource, fallbackModel, ... }
})) {
// 处理每条流式消息(第 708-866 行)
}
```
`deps.callModel()` 最终调用 `claude.ts``queryModelWithStreaming()`
#### 步骤 5流式响应处理第 708-866 行)
处理逻辑在 `for await` 循环体内(第 708 行的 `})` 之后到第 866 行):
```
for await (const message of stream):
├── message.type === 'assistant'?
│ ├── 记录到 assistantMessages[]
│ ├── 提取 tool_use 块 → toolUseBlocks[]
│ ├── needsFollowUp = true如果有 tool_use
│ └── streamingToolExecutor.addTool() ← 流式工具并行执行
├── withheld? (prompt-too-long / max_output_tokens)
│ └── 暂扣不 yield等后面恢复逻辑处理
└── yield message ← 正常 yield 给上层REPL/QueryEngine
```
**StreamingToolExecutor**:在 API 流式返回的同时就开始执行工具(如读文件),不等流结束。通过 `addTool()` 添加待执行工具,`getCompletedResults()` 获取已完成的结果。
#### 步骤 6A无 followUp — 终止/恢复(第 1065-1360 行)
当模型没有请求工具调用时(`needsFollowUp === false`
```
无 followUp:
├── prompt-too-long 恢复?
│ ├── context collapse drainfeature flag跳过
│ ├── reactive compact → 压缩消息重试
│ └── 都失败 → yield 错误 + return
├── max_output_tokens 恢复?
│ ├── 第一次 → 升级到 64k token 限制continue
│ ├── 后续 → 注入恢复消息("继续,别道歉"continue
│ └── 超过 3 次 → yield 错误 + return
├── stop hooks 执行
│ ├── preventContinuation? → return
│ └── blockingErrors? → 将错误加入消息continue
└── return { reason: 'completed' } ★ 正常结束
```
**恢复消息内容(第 1229 行)**
```
"Output token limit hit. Resume directly — no apology, no recap of what
you were doing. Pick up mid-thought if that is where the cut happened.
Break remaining work into smaller pieces."
```
#### 步骤 6B有 followUp — 工具执行 + 下一轮(第 1363-1731 行)
当模型请求了工具调用时(`needsFollowUp === true`
```
有 followUp:
├── 工具执行(两种模式)
│ ├── streamingToolExecutor? → getRemainingResults()(流式已启动)
│ └── 否 → runTools()(传统顺序执行)
├── for await (const update of toolUpdates):
│ ├── yield update.message ← 工具结果消息
│ └── toolResults.push(...) ← 收集工具结果
├── 中断检查abortController.signal.aborted
│ └── return { reason: 'aborted_tools' }
├── attachment 注入
│ ├── 排队命令(其他线程提交的消息)
│ ├── 内存预取(相关记忆文件)
│ └── 技能发现预取
├── maxTurns 检查
│ └── 超过 → yield max_turns_reached + return
└── state = { messages: [...old, ...assistant, ...toolResults], turnCount: +1 }
→ continue ★ 回到循环顶部,发起下一次 API 调用
```
### 1.6 错误处理与模型降级(第 897-956 行)
```
API 调用出错:
├── FallbackTriggeredError529 过载)?
│ ├── 切换到 fallbackModel
│ ├── 清空本轮 assistant/tool 消息
│ ├── yield 系统消息 "Switched to X due to high demand for Y"
│ └── continue重试整个请求
└── 其他错误
├── ImageSizeError/ImageResizeError → yield 友好错误 + return
├── yieldMissingToolResultBlocks() — 补全未配对的 tool_result
└── yield API 错误消息 + return
```
### 1.7 关键设计思想
| 设计 | 说明 |
|------|------|
| **AsyncGenerator 模式** | `query()``async function*`,通过 `yield` 逐条产出事件,调用者用 `for await` 消费 |
| **while(true) + state 对象** | 每次 `continue` 构建新 State 对象,避免分散的状态修改 |
| **transition 字段** | 记录为什么要 continue`next_turn``max_output_tokens_recovery``reactive_compact_retry`...),便于调试 |
| **StreamingToolExecutor** | API 流式返回时就并行执行工具,不等流结束 |
| **Withheld 消息** | 可恢复错误先暂扣,恢复成功则不 yield 错误,失败才 yield |
---
## 2. QueryEngine.ts1320 行)— 高层编排器
**文件路径**: `src/QueryEngine.ts`
### 2.1 定位
QueryEngine 是 `query()` 的**上层包装**,主要用于:
- **print 模式**`claude -p`):通过 `ask()``QueryEngine.submitMessage()`
- **SDK 模式**:外部程序通过 SDK 调用
- **REPL 不用它**REPL 直接调用 `query()`
### 2.2 文件结构
```
QueryEngine.ts (1320 行)
├── [0-130] Import 区 + feature flag 条件模块
├── [131-174] QueryEngineConfig 类型定义
├── [185-1202] QueryEngine 类
│ ├── [185-208] 成员变量 + constructor
│ ├── [210-1181] submitMessage() — 核心方法(~970 行)
│ │ ├── [210-400] 参数解析 + processUserInputContext 构建
│ │ ├── [400-465] 用户输入处理 + 会话持久化
│ │ ├── [465-660] 斜杠命令处理 + 无需查询的快速返回
│ │ ├── [660-690] 文件历史快照
│ │ ├── [679-1074] ★ for await (const message of query({...})) — 消费 query()
│ │ └── [1074-1181] 结果提取 + yield result
│ ├── [1183-1202] interrupt() / getMessages() / setModel() 辅助方法
├── [1210-1320] ask() — 便捷包装函数
```
### 2.3 QueryEngineConfig
```ts
type QueryEngineConfig = {
cwd: string // 工作目录
tools: Tools // 工具列表
commands: Command[] // 斜杠命令
mcpClients: MCPServerConnection[] // MCP 服务器连接
agents: AgentDefinition[] // Agent 定义
canUseTool: CanUseToolFn // 权限检查
getAppState / setAppState // 全局状态存取
initialMessages?: Message[] // 初始消息(恢复对话)
readFileCache: FileStateCache // 文件读取缓存
customSystemPrompt?: string // 自定义系统提示
thinkingConfig?: ThinkingConfig // 思考模式配置
maxTurns?: number // 最大轮次
maxBudgetUsd?: number // USD 预算上限
jsonSchema?: Record<...> // 结构化输出 schema
// ... 更多配置
}
```
### 2.4 submitMessage() 核心流程
```
submitMessage(prompt)
├── 1. 参数准备
│ ├── 解构 config 获取 tools, commands, model, ...
│ ├── 构建 wrappedCanUseTool包装权限检查跟踪拒绝
│ ├── fetchSystemPromptParts() — 获取系统提示各部分
│ └── 构建 processUserInputContext
├── 2. 用户输入处理
│ ├── processUserInput(prompt) — 解析斜杠命令 / 普通文本
│ ├── mutableMessages.push(...messagesFromUserInput)
│ └── recordTranscript(messages) — 持久化到 JSONL
├── 3. yield buildSystemInitMessage() — SDK 初始化消息
├── 4. shouldQuery === false?(斜杠命令的本地执行结果)
│ ├── yield 命令输出
│ ├── yield { type: 'result', subtype: 'success' }
│ └── return
├── 5. ★ for await (const message of query({...}))
│ │ 消费 query() 产出的每条消息
│ │
│ ├── message.type === 'assistant'
│ │ ├── mutableMessages.push(msg)
│ │ ├── recordTranscript() ← fire-and-forget
│ │ ├── yield* normalizeMessage(msg) — 转换为 SDK 格式
│ │ └── 捕获 stop_reason
│ │
│ ├── message.type === 'user'(工具结果)
│ │ ├── mutableMessages.push(msg)
│ │ ├── turnCount++
│ │ └── yield* normalizeMessage(msg)
│ │
│ ├── message.type === 'stream_event'
│ │ ├── 跟踪 usagemessage_start/delta/stop
│ │ └── includePartialMessages? → yield 流事件
│ │
│ ├── message.type === 'system'
│ │ ├── compact_boundary → GC 旧消息 + yield 给 SDK
│ │ └── api_error → yield 重试信息
│ │
│ └── maxBudgetUsd 检查 → 超预算则 yield error + return
└── 6. yield { type: 'result', subtype: 'success', result: textResult }
```
### 2.5 ask() 便捷函数(第 1211 行)
```ts
export async function* ask({ prompt, tools, ... }) {
const engine = new QueryEngine({ ... })
try {
yield* engine.submitMessage(prompt)
} finally {
setReadFileCache(engine.getReadFileState())
}
}
```
`ask()``QueryEngine` 的一次性包装,创建 engine → 提交消息 → 清理。用于 `print.ts``--print` 模式。
### 2.6 QueryEngine vs REPL 直接调用 query()
| 特性 | QueryEngine (SDK/print) | REPL 直接调用 query() |
|------|------------------------|---------------------|
| 会话持久化 | 自动 recordTranscript | 由 useLogMessages 处理 |
| Usage 跟踪 | 内部 totalUsage 累积 | 由外层 cost-tracker 处理 |
| 权限拒绝跟踪 | 记录 permissionDenials[] | 直接 UI 交互 |
| 结果格式 | yield SDKMessage 格式 | 原始 Message 格式 |
| 消息 GC | compact_boundary 后释放旧消息 | UI 需要保留完整历史 |
---
## 3. claude.ts3420 行)— API 客户端
**文件路径**: `src/services/api/claude.ts`
### 3.1 文件结构
```
claude.ts (3420 行)
├── [0-260] Import 区(大量 SDK 类型、工具函数)
├── [272-331] getExtraBodyParams() — 构建额外请求体参数
├── [333-502] 缓存相关getPromptCachingEnabled, getCacheControl, should1hCacheTTL, configureEffortParams, configureTaskBudgetParams
├── [504-587] verifyApiKey() — API 密钥验证
├── [589-675] 消息转换userMessageToMessageParam, assistantMessageToMessageParam
├── [677-708] Options 类型定义
├── [710-781] queryModelWithoutStreaming / queryModelWithStreaming — 公开的两个入口
├── [783-813] 辅助函数shouldDeferLspTool, getNonstreamingFallbackTimeoutMs
├── [819-918] executeNonStreamingRequest() — 非流式请求辅助
├── [920-999] 更多辅助函数getPreviousRequestIdFromMessages, stripExcessMediaItems
├── [1018-3420] ★ queryModel() — 核心私有函数2400 行)
│ ├── [1018-1370] 前置检查 + 工具 schema 构建 + 消息归一化 + 系统提示组装
│ ├── [1539-1730] paramsFromContext() — 构建 API 请求参数
│ ├── [1777-2100] withRetry + 流式 API 调用anthropic.beta.messages.create + stream
│ ├── [1941-2300] 流式事件处理for await of stream
│ └── [2300-3420] 非流式降级 + 日志、分析、清理
```
### 3.2 两个公开入口
```ts
// 入口 1流式主要路径
export async function* queryModelWithStreaming({
messages, systemPrompt, thinkingConfig, tools, signal, options
}) {
yield* withStreamingVCR(messages, async function* () {
yield* queryModel(messages, systemPrompt, thinkingConfig, tools, signal, options)
})
}
// 入口 2非流式compact 等内部用途)
export async function queryModelWithoutStreaming({
messages, systemPrompt, thinkingConfig, tools, signal, options
}) {
let assistantMessage
for await (const message of ...) {
if (message.type === 'assistant') assistantMessage = message
}
return assistantMessage
}
```
两者都委托给内部的 `queryModel()``withStreamingVCR` 是一个 VCR录像/回放)包装器,用于调试。
### 3.3 Options 类型(第 677 行)
```ts
type Options = {
getToolPermissionContext: () => Promise<ToolPermissionContext>
model: string // 模型名称
toolChoice?: BetaToolChoiceTool // 强制使用特定工具
isNonInteractiveSession: boolean // 是否非交互模式
fallbackModel?: string // 备用模型
querySource: QuerySource // 查询来源
agents: AgentDefinition[] // Agent 定义
enablePromptCaching?: boolean // 启用提示缓存
effortValue?: EffortValue // 推理努力级别
mcpTools: Tools // MCP 工具
fastMode?: boolean // 快速模式
taskBudget?: { total: number; remaining?: number } // 令牌预算
}
```
### 3.4 queryModel() 核心流程(第 1018 行)
这是整个 API 调用的核心2400 行。关键步骤:
#### 阶段 1前置准备1018-1400 行)
```
queryModel()
├── off-switch 检查Opus 过载时的全局关闭开关)
├── beta headers 组装getMergedBetas
│ ├── 基础 betas
│ ├── advisor beta如果启用
│ ├── tool search beta如果启用
│ ├── cache scope beta
│ └── effort / task budget betas
├── 工具过滤
│ ├── tool search 启用 → 只包含已发现的 deferred tools
│ └── tool search 未启用 → 过滤掉 ToolSearchTool
├── toolToAPISchema() — 每个工具转为 API 格式
├── normalizeMessagesForAPI() — 消息转换为 API 格式
│ ├── UserMessage → { role: 'user', content: ... }
│ ├── AssistantMessage → { role: 'assistant', content: ... }
│ └── 跳过 system/attachment/progress 等内部消息类型
└── 系统提示最终组装
├── getAttributionHeader(fingerprint)
├── getCLISyspromptPrefix()
├── ...systemPrompt
└── advisor 指令(如果启用)
```
#### 阶段 2构建请求参数 — paramsFromContext()(第 1539-1730 行)
```ts
const paramsFromContext = (retryContext: RetryContext) => {
// ... 动态 beta headers、effort、task budget 配置 ...
// 思考模式配置adaptive 或 enabled + budget
let thinking = undefined
if (hasThinking && modelSupportsThinking(options.model)) {
if (modelSupportsAdaptiveThinking(options.model)) {
thinking = { type: 'adaptive' }
} else {
thinking = { type: 'enabled', budget_tokens: thinkingBudget }
}
}
return {
model: normalizeModelStringForAPI(options.model),
messages: addCacheBreakpoints(messagesForAPI, ...), // 带缓存标记的消息
system, // 系统提示块(已构建好)
tools: allTools, // 工具 schema
tool_choice: options.toolChoice,
max_tokens: maxOutputTokens,
thinking,
...(temperature !== undefined && { temperature }),
...(useBetas && { betas: betasParams }),
metadata: getAPIMetadata(),
...extraBodyParams,
...(speed !== undefined && { speed }), // 快速模式
}
}
```
#### 阶段 3流式 API 调用(第 1779-1858 行)
```ts
// 使用 withRetry 包装,自动处理重试
const generator = withRetry(
() => getAnthropicClient({ maxRetries: 0, model, source: querySource }),
async (anthropic, attempt, context) => {
const params = paramsFromContext(context)
// ★ 核心 API 调用(第 1823 行)
// 使用 .create() + stream: true而非 .stream()
// 避免 BetaMessageStream 的 O(n²) partial JSON 解析开销
const result = await anthropic.beta.messages
.create(
{ ...params, stream: true },
{ signal, ...(clientRequestId && { headers: { ... } }) },
)
.withResponse()
return result.data // Stream<BetaRawMessageStreamEvent>
},
{ model, fallbackModel, thinkingConfig, signal, querySource }
)
// 消费 withRetry 的系统错误消息(重试通知等)
let e
do {
e = await generator.next()
if (!('controller' in e.value)) yield e.value // yield API 错误消息
} while (!e.done)
stream = e.value // 获取最终的 Stream 对象
// 处理流式事件(第 1941 行)
for await (const part of stream) {
switch (part.type) {
case 'message_start': // 记录 request_id、usage
case 'content_block_start': // 新的内容块开始text/thinking/tool_use
case 'content_block_delta': // 增量内容 → yield stream_event 给 UI
case 'content_block_stop': // 内容块完成 → yield AssistantMessage
case 'message_delta': // stop_reason、usage 更新
case 'message_stop': // 整条消息完成
}
}
```
#### 阶段 4withRetry 重试策略
```
withRetry 逻辑:
├── 429 (Rate Limit) → 等待 Retry-After 后重试
├── 529 (Overloaded) → 切换到 fallbackModelthrow FallbackTriggeredError
├── 500 (Server Error) → 指数退避重试
├── 408 (Timeout) → 重试
├── 其他错误 → 不重试,直接抛出
└── 最大重试次数: 根据模型和错误类型动态计算
```
#### 阶段 5非流式降级
当流式请求中途失败时,可能降级为非流式请求:
```
流式失败(部分响应已收到):
├── 已接收的内容 → yield 给上层
├── 剩余部分 → 降级为非流式请求anthropic.beta.messages.create
└── 非流式结果 → 转换格式 yield
```
### 3.5 消息转换函数
```ts
// UserMessage → API 格式
userMessageToMessageParam(message, addCache, enablePromptCaching, querySource)
{ role: 'user', content: [...] }
// addCache=true 时最后一个 content block 添加 cache_control
// AssistantMessage → API 格式
assistantMessageToMessageParam(message, addCache, enablePromptCaching, querySource)
{ role: 'assistant', content: [...] }
// thinking/redacted_thinking 块不加 cache_control
```
### 3.6 Prompt Caching 策略
```
缓存策略:
├── cache_control: { type: 'ephemeral' } — 默认5 分钟 TTL
├── cache_control: { type: 'ephemeral', ttl: '1h' } — 订阅用户/Ant1 小时
├── cache_control: { ..., scope: 'global' } — 跨会话共享(无 MCP 工具时)
└── 禁用条件:
├── DISABLE_PROMPT_CACHING 环境变量
├── DISABLE_PROMPT_CACHING_HAIKU仅 Haiku
└── DISABLE_PROMPT_CACHING_SONNET仅 Sonnet
```
### 3.7 多 Provider 支持
`getAnthropicClient()` 根据配置返回不同的 SDK 客户端:
| Provider | 入口 | 说明 |
|----------|------|------|
| Anthropic | 直接 API | 默认,`api.anthropic.com` |
| AWS Bedrock | 通过 Bedrock | 使用 `@anthropic-ai/bedrock-sdk` |
| Google Vertex | 通过 Vertex | 使用 `@anthropic-ai/vertex-sdk` |
| Azure | 通过 Azure | 类似 Bedrock 的包装 |
Provider 选择逻辑在 `src/utils/model/providers.ts``getAPIProvider()` 中。
---
## 完整数据流:一次工具调用的生命周期
以用户输入 "读取 README.md" 为例:
```
1. REPL.tsx: 用户按回车
onSubmit("读取 README.md")
└── handlePromptSubmit()
└── onQuery([userMessage])
2. REPL.tsx: onQueryImpl()
├── getSystemPrompt() + getUserContext() + getSystemContext()
└── for await (event of query({messages, systemPrompt, ...}))
3. query.ts: queryLoop() — 第 1 次迭代
├── messagesForQuery = [...messages] // 包含用户消息
├── deps.callModel({...})
│ └── claude.ts: queryModel()
│ ├── 构建 API 参数
│ └── anthropic.beta.messages.create({ ...params, stream: true })
├── API 流式返回:
│ content_block_start: { type: 'tool_use', name: 'Read', id: 'toolu_123' }
│ content_block_delta: { input: '{"file_path": "/path/to/README.md"}' }
│ content_block_stop
│ message_delta: { stop_reason: 'tool_use' }
├── 收集: toolUseBlocks = [{ name: 'Read', id: 'toolu_123', input: {...} }]
├── needsFollowUp = true
├── 工具执行:
│ streamingToolExecutor.getRemainingResults()
│ └── Read 工具执行 → 返回文件内容
│ yield toolResultMessage ← 包含文件内容
└── state = { messages: [...old, assistantMsg, toolResultMsg], turnCount: 2 }
→ continue
4. query.ts: queryLoop() — 第 2 次迭代
├── messagesForQuery 现在包含:
│ [userMsg, assistantMsg(tool_use), userMsg(tool_result)]
├── deps.callModel({...}) ← 再次调用 API
├── API 返回:
│ content_block_start: { type: 'text' }
│ content_block_delta: { text: "README.md 的内容是..." }
│ content_block_stop
│ message_delta: { stop_reason: 'end_turn' }
├── toolUseBlocks = [] ← 没有工具调用
├── needsFollowUp = false
└── return { reason: 'completed' } ★ 循环结束
5. REPL.tsx: onQueryEvent(event)
├── 更新 streamingText打字机效果
├── 更新 messages 数组
└── 重新渲染 UI
```
---
## 关键设计模式总结
| 模式 | 位置 | 说明 |
|------|------|------|
| AsyncGenerator 链式传递 | query.ts → claude.ts | `yield*` 将底层事件透传给上层,形成事件流管道 |
| while(true) + State 对象 | query.ts queryLoop | 循环迭代间通过不可变 State 传递transition 字段记录原因 |
| StreamingToolExecutor | query.ts | API 流式返回时并行执行工具,不等流结束 |
| Withheld 消息 | query.ts | 可恢复错误先暂扣不 yield恢复成功则吞掉错误 |
| withRetry 重试 | claude.ts | 429/500/529 自动重试529 触发模型降级 |
| Prompt Caching | claude.ts | 缓存系统提示和历史消息,减少 API token 消耗 |
| 非流式降级 | claude.ts | 流式请求中途失败时降级为非流式完成剩余部分 |
| QueryEngine 包装 | QueryEngine.ts | 为 SDK/print 提供会话管理、持久化、usage 跟踪 |
## 需要忽略的代码
| 模式 | 说明 |
|------|------|
| `feature('REACTIVE_COMPACT')` / `feature('CONTEXT_COLLAPSE')` 等 | 所有 feature flag 保护的代码 — 全部是死代码 |
| `feature('CACHED_MICROCOMPACT')` | 缓存微压缩 — 死代码 |
| `feature('HISTORY_SNIP')` / `snipModule` | 历史截断 — 死代码 |
| `feature('TOKEN_BUDGET')` / `budgetTracker` | 令牌预算 — 死代码 |
| `feature('BG_SESSIONS')` / `taskSummaryModule` | 后台会话 — 死代码 |
| `process.env.USER_TYPE === 'ant'` | Anthropic 内部专用代码 |
| VCR (withStreamingVCR/withVCR) | 调试录像/回放包装器,不影响正常流程 |