claude-code/learn/phase-2-conversation-loop.md

774 lines
30 KiB
Markdown
Raw Normal View History

# 第二阶段:核心对话循环详解
> 用户发一句话后,如何变成 API 请求、如何处理流式响应和工具调用
## 对话循环总览
```
用户输入 "帮我读取 README.md"
REPL.tsx: onSubmit → onQuery → onQueryImpl
├── 1. 并行加载上下文:
│ getSystemPrompt() + getUserContext() + getSystemContext()
├── 2. buildEffectiveSystemPrompt() — 合成最终系统提示
├── 3. for await (const event of query({...})) ★ 核心循环
│ │
│ │ query.ts: queryLoop()
│ │ ├── while (true) {
│ │ │ ├── autocompact / microcompact 处理
│ │ │ ├── deps.callModel() → claude.ts 流式 API 调用
│ │ │ │ └── for await (message of stream) { yield message }
│ │ │ │
│ │ │ ├── 收集 assistant 消息中的 tool_use 块
│ │ │ │
│ │ │ ├── needsFollowUp?
│ │ │ │ ├── true → 执行工具 → 收集结果 → state = next → continue
│ │ │ │ └── false → 检查错误恢复 → return { reason: 'completed' }
│ │ │ }
│ │
│ └── onQueryEvent(event) — 更新 UI 状态
└── 4. 收尾: resetLoadingState(), onTurnComplete()
```
### 两条数据路径
| 路径 | 调用方 | 说明 |
|------|--------|------|
| **交互式REPL** | REPL.tsx → `query()` | 直接调用 `query()` AsyncGenerator |
| **非交互式SDK/print** | print.ts → `QueryEngine.submitMessage()``query()` | 通过 QueryEngine 包装增加了会话持久化、usage 跟踪等 |
---
## 1. query.ts1732 行)— 核心查询循环
**文件路径**: `src/query.ts`
### 1.1 文件结构
```
query.ts (1732 行)
├── [0-120] Import 区 + feature flag 条件模块加载
├── [122-148] yieldMissingToolResultBlocks() — 为未配对的 tool_use 生成错误 tool_result
├── [150-178] 常量与辅助函数 (MAX_OUTPUT_TOKENS_RECOVERY_LIMIT, isWithheldMaxOutputTokens)
├── [180-198] QueryParams 类型定义
├── [200-216] State 类型 — 循环迭代间的可变状态
├── [218-238] query() — 导出的 AsyncGenerator委托给 queryLoop()
├── [240-1732] queryLoop() — 核心 while(true) 循环
│ ├── [241-306] 初始化 State + 内存预取
│ ├── [307-448] 循环开头:解构 state、消息预处理snip/microcompact/context collapse
│ ├── [449-578] 系统提示构建(第449行) + autocompact(第453行) + StreamingToolExecutor 初始化(第562行)
│ ├── [650-866] ★ deps.callModel()(第659行) + 流式响应处理 + tool_use 收集
│ ├── [896-956] 错误处理FallbackTriggeredError、通用错误
│ ├── [1002-1054] 中断处理abortController.signal.aborted
│ ├── [1065-1360] 无 followUp 时的终止/恢复逻辑
│ │ ├── prompt-too-long 恢复
│ │ ├── max_output_tokens 恢复(升级 + 多轮)
│ │ ├── stop hooks 执行
│ │ └── return { reason: 'completed' }
│ └── [1360-1732] 有 followUp 时的工具执行 + 下一轮准备
│ ├── 工具执行streaming 或 sequential
│ ├── attachment 注入(排队命令、内存预取、技能发现)
│ ├── maxTurns 检查
│ └── state = next → continue
```
### 1.2 入口query() 函数(第 219 行)
```ts
export async function* query(params: QueryParams):
AsyncGenerator<StreamEvent | Message | ..., Terminal> {
const consumedCommandUuids: string[] = []
const terminal = yield* queryLoop(params, consumedCommandUuids)
// 通知所有消费的排队命令已完成
for (const uuid of consumedCommandUuids) {
notifyCommandLifecycle(uuid, 'completed')
}
return terminal
}
```
`query()` 本身很薄,只做两件事:
1. 委托给 `queryLoop()` 执行实际逻辑
2. 在正常返回后通知排队命令的生命周期
### 1.3 QueryParams第 181 行)
```ts
type QueryParams = {
messages: Message[] // 当前对话消息
systemPrompt: SystemPrompt // 系统提示
userContext: { [k: string]: string } // 用户上下文CLAUDE.md 等)
systemContext: { [k: string]: string } // 系统上下文git 状态等)
canUseTool: CanUseToolFn // 工具权限检查函数
toolUseContext: ToolUseContext // 工具执行上下文
fallbackModel?: string // 备用模型
querySource: QuerySource // 查询来源标识
maxTurns?: number // 最大轮次限制
taskBudget?: { total: number } // 令牌预算
}
```
### 1.4 State — 循环迭代间的可变状态(第 204 行)
```ts
type State = {
messages: Message[] // 累积的消息列表
toolUseContext: ToolUseContext // 工具执行上下文
autoCompactTracking: ... // 自动压缩跟踪
maxOutputTokensRecoveryCount: number // 输出令牌恢复尝试次数
hasAttemptedReactiveCompact: boolean // 是否已尝试响应式压缩
maxOutputTokensOverride: number | undefined // 输出令牌覆盖
pendingToolUseSummary: Promise<...> // 待处理的工具使用摘要
stopHookActive: boolean | undefined // stop hook 是否活跃
turnCount: number // 当前轮次
transition: Continue | undefined // 上一次迭代为何 continue
}
```
**设计关键**:每次 `continue` 时通过 `state = { ... }` 一次性更新所有状态,而不是分散的 9 个赋值。`transition` 字段记录了为什么要继续循环(便于调试和测试)。
### 1.5 queryLoop() 核心流程(第 241 行)
`while (true)` 循环(第 307 行)的每次迭代代表一次 API 调用。循环直到:
- 模型不需要工具调用 → `return { reason: 'completed' }`
- 被用户中断 → `return { reason: 'aborted_*' }`
- 达到最大轮次 → `return { reason: 'max_turns' }`
- 遇到不可恢复的错误 → `return { reason: 'model_error' }`
#### 步骤 1消息预处理
```
每次迭代开头:
├── 解构 state → messages, toolUseContext, tracking, ...
├── getMessagesAfterCompactBoundary() — 只保留压缩边界后的消息
├── snip 处理feature flag跳过
├── microcompact 处理feature flag跳过
└── autocompact 检查 — 消息过长时自动压缩
```
#### 步骤 2系统提示构建第 449 行)
```ts
const fullSystemPrompt = asSystemPrompt(
appendSystemContext(systemPrompt, systemContext),
)
```
将系统上下文git 状态、日期等追加到系统提示。注意用户上下文CLAUDE.md 等)不在这里注入,而是在 `deps.callModel()` 调用时通过 `prependUserContext(messagesForQuery, userContext)` 注入到消息数组的最前面(第 660 行)。
#### 步骤 3Autocompact第 454-543 行)
当消息历史过长时自动压缩:
```
autocompact 流程:
├── 检查 token 数量是否超过阈值
├── 超过 → 调用 compact API用 Haiku 总结历史)
│ ├── yield compactBoundaryMessage ← 标记压缩边界
│ └── 更新 messages 为压缩后的版本
└── 未超过 → 继续
```
#### 步骤 4调用 API第 559-708 行)— 核心
StreamingToolExecutor 在第 562 行初始化API 调用在第 659 行开始:
```ts
// 第 562 行:初始化流式工具执行器
let streamingToolExecutor = useStreamingToolExecution
? new StreamingToolExecutor(
toolUseContext.options.tools, canUseTool, toolUseContext,
)
: null
// 第 659 行:调用 API
for await (const message of deps.callModel({
messages: prependUserContext(messagesForQuery, userContext), // ← 用户上下文注入到消息最前面
systemPrompt: fullSystemPrompt,
thinkingConfig: toolUseContext.options.thinkingConfig,
tools: toolUseContext.options.tools,
signal: toolUseContext.abortController.signal,
options: { model: currentModel, querySource, fallbackModel, ... }
})) {
// 处理每条流式消息(第 708-866 行)
}
```
`deps.callModel()` 最终调用 `claude.ts``queryModelWithStreaming()`
#### 步骤 5流式响应处理第 708-866 行)
处理逻辑在 `for await` 循环体内(第 708 行的 `})` 之后到第 866 行):
```
for await (const message of stream):
├── message.type === 'assistant'?
│ ├── 记录到 assistantMessages[]
│ ├── 提取 tool_use 块 → toolUseBlocks[]
│ ├── needsFollowUp = true如果有 tool_use
│ └── streamingToolExecutor.addTool() ← 流式工具并行执行
├── withheld? (prompt-too-long / max_output_tokens)
│ └── 暂扣不 yield等后面恢复逻辑处理
└── yield message ← 正常 yield 给上层REPL/QueryEngine
```
**StreamingToolExecutor**:在 API 流式返回的同时就开始执行工具(如读文件),不等流结束。通过 `addTool()` 添加待执行工具,`getCompletedResults()` 获取已完成的结果。
#### 步骤 6A无 followUp — 终止/恢复(第 1065-1360 行)
当模型没有请求工具调用时(`needsFollowUp === false`
```
无 followUp:
├── prompt-too-long 恢复?
│ ├── context collapse drainfeature flag跳过
│ ├── reactive compact → 压缩消息重试
│ └── 都失败 → yield 错误 + return
├── max_output_tokens 恢复?
│ ├── 第一次 → 升级到 64k token 限制continue
│ ├── 后续 → 注入恢复消息("继续,别道歉"continue
│ └── 超过 3 次 → yield 错误 + return
├── stop hooks 执行
│ ├── preventContinuation? → return
│ └── blockingErrors? → 将错误加入消息continue
└── return { reason: 'completed' } ★ 正常结束
```
**恢复消息内容(第 1229 行)**
```
"Output token limit hit. Resume directly — no apology, no recap of what
you were doing. Pick up mid-thought if that is where the cut happened.
Break remaining work into smaller pieces."
```
#### 步骤 6B有 followUp — 工具执行 + 下一轮(第 1363-1731 行)
当模型请求了工具调用时(`needsFollowUp === true`
```
有 followUp:
├── 工具执行(两种模式)
│ ├── streamingToolExecutor? → getRemainingResults()(流式已启动)
│ └── 否 → runTools()(传统顺序执行)
├── for await (const update of toolUpdates):
│ ├── yield update.message ← 工具结果消息
│ └── toolResults.push(...) ← 收集工具结果
├── 中断检查abortController.signal.aborted
│ └── return { reason: 'aborted_tools' }
├── attachment 注入
│ ├── 排队命令(其他线程提交的消息)
│ ├── 内存预取(相关记忆文件)
│ └── 技能发现预取
├── maxTurns 检查
│ └── 超过 → yield max_turns_reached + return
└── state = { messages: [...old, ...assistant, ...toolResults], turnCount: +1 }
→ continue ★ 回到循环顶部,发起下一次 API 调用
```
### 1.6 错误处理与模型降级(第 897-956 行)
```
API 调用出错:
├── FallbackTriggeredError529 过载)?
│ ├── 切换到 fallbackModel
│ ├── 清空本轮 assistant/tool 消息
│ ├── yield 系统消息 "Switched to X due to high demand for Y"
│ └── continue重试整个请求
└── 其他错误
├── ImageSizeError/ImageResizeError → yield 友好错误 + return
├── yieldMissingToolResultBlocks() — 补全未配对的 tool_result
└── yield API 错误消息 + return
```
### 1.7 关键设计思想
| 设计 | 说明 |
|------|------|
| **AsyncGenerator 模式** | `query()``async function*`,通过 `yield` 逐条产出事件,调用者用 `for await` 消费 |
| **while(true) + state 对象** | 每次 `continue` 构建新 State 对象,避免分散的状态修改 |
| **transition 字段** | 记录为什么要 continue`next_turn``max_output_tokens_recovery``reactive_compact_retry`...),便于调试 |
| **StreamingToolExecutor** | API 流式返回时就并行执行工具,不等流结束 |
| **Withheld 消息** | 可恢复错误先暂扣,恢复成功则不 yield 错误,失败才 yield |
---
## 2. QueryEngine.ts1320 行)— 高层编排器
**文件路径**: `src/QueryEngine.ts`
### 2.1 定位
QueryEngine 是 `query()` 的**上层包装**,主要用于:
- **print 模式**`claude -p`):通过 `ask()``QueryEngine.submitMessage()`
- **SDK 模式**:外部程序通过 SDK 调用
- **REPL 不用它**REPL 直接调用 `query()`
### 2.2 文件结构
```
QueryEngine.ts (1320 行)
├── [0-130] Import 区 + feature flag 条件模块
├── [131-174] QueryEngineConfig 类型定义
├── [185-1202] QueryEngine 类
│ ├── [185-208] 成员变量 + constructor
│ ├── [210-1181] submitMessage() — 核心方法(~970 行)
│ │ ├── [210-400] 参数解析 + processUserInputContext 构建
│ │ ├── [400-465] 用户输入处理 + 会话持久化
│ │ ├── [465-660] 斜杠命令处理 + 无需查询的快速返回
│ │ ├── [660-690] 文件历史快照
│ │ ├── [679-1074] ★ for await (const message of query({...})) — 消费 query()
│ │ └── [1074-1181] 结果提取 + yield result
│ ├── [1183-1202] interrupt() / getMessages() / setModel() 辅助方法
├── [1210-1320] ask() — 便捷包装函数
```
### 2.3 QueryEngineConfig
```ts
type QueryEngineConfig = {
cwd: string // 工作目录
tools: Tools // 工具列表
commands: Command[] // 斜杠命令
mcpClients: MCPServerConnection[] // MCP 服务器连接
agents: AgentDefinition[] // Agent 定义
canUseTool: CanUseToolFn // 权限检查
getAppState / setAppState // 全局状态存取
initialMessages?: Message[] // 初始消息(恢复对话)
readFileCache: FileStateCache // 文件读取缓存
customSystemPrompt?: string // 自定义系统提示
thinkingConfig?: ThinkingConfig // 思考模式配置
maxTurns?: number // 最大轮次
maxBudgetUsd?: number // USD 预算上限
jsonSchema?: Record<...> // 结构化输出 schema
// ... 更多配置
}
```
### 2.4 submitMessage() 核心流程
```
submitMessage(prompt)
├── 1. 参数准备
│ ├── 解构 config 获取 tools, commands, model, ...
│ ├── 构建 wrappedCanUseTool包装权限检查跟踪拒绝
│ ├── fetchSystemPromptParts() — 获取系统提示各部分
│ └── 构建 processUserInputContext
├── 2. 用户输入处理
│ ├── processUserInput(prompt) — 解析斜杠命令 / 普通文本
│ ├── mutableMessages.push(...messagesFromUserInput)
│ └── recordTranscript(messages) — 持久化到 JSONL
├── 3. yield buildSystemInitMessage() — SDK 初始化消息
├── 4. shouldQuery === false?(斜杠命令的本地执行结果)
│ ├── yield 命令输出
│ ├── yield { type: 'result', subtype: 'success' }
│ └── return
├── 5. ★ for await (const message of query({...}))
│ │ 消费 query() 产出的每条消息
│ │
│ ├── message.type === 'assistant'
│ │ ├── mutableMessages.push(msg)
│ │ ├── recordTranscript() ← fire-and-forget
│ │ ├── yield* normalizeMessage(msg) — 转换为 SDK 格式
│ │ └── 捕获 stop_reason
│ │
│ ├── message.type === 'user'(工具结果)
│ │ ├── mutableMessages.push(msg)
│ │ ├── turnCount++
│ │ └── yield* normalizeMessage(msg)
│ │
│ ├── message.type === 'stream_event'
│ │ ├── 跟踪 usagemessage_start/delta/stop
│ │ └── includePartialMessages? → yield 流事件
│ │
│ ├── message.type === 'system'
│ │ ├── compact_boundary → GC 旧消息 + yield 给 SDK
│ │ └── api_error → yield 重试信息
│ │
│ └── maxBudgetUsd 检查 → 超预算则 yield error + return
└── 6. yield { type: 'result', subtype: 'success', result: textResult }
```
### 2.5 ask() 便捷函数(第 1211 行)
```ts
export async function* ask({ prompt, tools, ... }) {
const engine = new QueryEngine({ ... })
try {
yield* engine.submitMessage(prompt)
} finally {
setReadFileCache(engine.getReadFileState())
}
}
```
`ask()``QueryEngine` 的一次性包装,创建 engine → 提交消息 → 清理。用于 `print.ts``--print` 模式。
### 2.6 QueryEngine vs REPL 直接调用 query()
| 特性 | QueryEngine (SDK/print) | REPL 直接调用 query() |
|------|------------------------|---------------------|
| 会话持久化 | 自动 recordTranscript | 由 useLogMessages 处理 |
| Usage 跟踪 | 内部 totalUsage 累积 | 由外层 cost-tracker 处理 |
| 权限拒绝跟踪 | 记录 permissionDenials[] | 直接 UI 交互 |
| 结果格式 | yield SDKMessage 格式 | 原始 Message 格式 |
| 消息 GC | compact_boundary 后释放旧消息 | UI 需要保留完整历史 |
---
## 3. claude.ts3420 行)— API 客户端
**文件路径**: `src/services/api/claude.ts`
### 3.1 文件结构
```
claude.ts (3420 行)
├── [0-260] Import 区(大量 SDK 类型、工具函数)
├── [272-331] getExtraBodyParams() — 构建额外请求体参数
├── [333-502] 缓存相关getPromptCachingEnabled, getCacheControl, should1hCacheTTL, configureEffortParams, configureTaskBudgetParams
├── [504-587] verifyApiKey() — API 密钥验证
├── [589-675] 消息转换userMessageToMessageParam, assistantMessageToMessageParam
├── [677-708] Options 类型定义
├── [710-781] queryModelWithoutStreaming / queryModelWithStreaming — 公开的两个入口
├── [783-813] 辅助函数shouldDeferLspTool, getNonstreamingFallbackTimeoutMs
├── [819-918] executeNonStreamingRequest() — 非流式请求辅助
├── [920-999] 更多辅助函数getPreviousRequestIdFromMessages, stripExcessMediaItems
├── [1018-3420] ★ queryModel() — 核心私有函数2400 行)
│ ├── [1018-1370] 前置检查 + 工具 schema 构建 + 消息归一化 + 系统提示组装
│ ├── [1539-1730] paramsFromContext() — 构建 API 请求参数
│ ├── [1777-2100] withRetry + 流式 API 调用anthropic.beta.messages.create + stream
│ ├── [1941-2300] 流式事件处理for await of stream
│ └── [2300-3420] 非流式降级 + 日志、分析、清理
```
### 3.2 两个公开入口
```ts
// 入口 1流式主要路径
export async function* queryModelWithStreaming({
messages, systemPrompt, thinkingConfig, tools, signal, options
}) {
yield* withStreamingVCR(messages, async function* () {
yield* queryModel(messages, systemPrompt, thinkingConfig, tools, signal, options)
})
}
// 入口 2非流式compact 等内部用途)
export async function queryModelWithoutStreaming({
messages, systemPrompt, thinkingConfig, tools, signal, options
}) {
let assistantMessage
for await (const message of ...) {
if (message.type === 'assistant') assistantMessage = message
}
return assistantMessage
}
```
两者都委托给内部的 `queryModel()``withStreamingVCR` 是一个 VCR录像/回放)包装器,用于调试。
### 3.3 Options 类型(第 677 行)
```ts
type Options = {
getToolPermissionContext: () => Promise<ToolPermissionContext>
model: string // 模型名称
toolChoice?: BetaToolChoiceTool // 强制使用特定工具
isNonInteractiveSession: boolean // 是否非交互模式
fallbackModel?: string // 备用模型
querySource: QuerySource // 查询来源
agents: AgentDefinition[] // Agent 定义
enablePromptCaching?: boolean // 启用提示缓存
effortValue?: EffortValue // 推理努力级别
mcpTools: Tools // MCP 工具
fastMode?: boolean // 快速模式
taskBudget?: { total: number; remaining?: number } // 令牌预算
}
```
### 3.4 queryModel() 核心流程(第 1018 行)
这是整个 API 调用的核心2400 行。关键步骤:
#### 阶段 1前置准备1018-1400 行)
```
queryModel()
├── off-switch 检查Opus 过载时的全局关闭开关)
├── beta headers 组装getMergedBetas
│ ├── 基础 betas
│ ├── advisor beta如果启用
│ ├── tool search beta如果启用
│ ├── cache scope beta
│ └── effort / task budget betas
├── 工具过滤
│ ├── tool search 启用 → 只包含已发现的 deferred tools
│ └── tool search 未启用 → 过滤掉 ToolSearchTool
├── toolToAPISchema() — 每个工具转为 API 格式
├── normalizeMessagesForAPI() — 消息转换为 API 格式
│ ├── UserMessage → { role: 'user', content: ... }
│ ├── AssistantMessage → { role: 'assistant', content: ... }
│ └── 跳过 system/attachment/progress 等内部消息类型
└── 系统提示最终组装
├── getAttributionHeader(fingerprint)
├── getCLISyspromptPrefix()
├── ...systemPrompt
└── advisor 指令(如果启用)
```
#### 阶段 2构建请求参数 — paramsFromContext()(第 1539-1730 行)
```ts
const paramsFromContext = (retryContext: RetryContext) => {
// ... 动态 beta headers、effort、task budget 配置 ...
// 思考模式配置adaptive 或 enabled + budget
let thinking = undefined
if (hasThinking && modelSupportsThinking(options.model)) {
if (modelSupportsAdaptiveThinking(options.model)) {
thinking = { type: 'adaptive' }
} else {
thinking = { type: 'enabled', budget_tokens: thinkingBudget }
}
}
return {
model: normalizeModelStringForAPI(options.model),
messages: addCacheBreakpoints(messagesForAPI, ...), // 带缓存标记的消息
system, // 系统提示块(已构建好)
tools: allTools, // 工具 schema
tool_choice: options.toolChoice,
max_tokens: maxOutputTokens,
thinking,
...(temperature !== undefined && { temperature }),
...(useBetas && { betas: betasParams }),
metadata: getAPIMetadata(),
...extraBodyParams,
...(speed !== undefined && { speed }), // 快速模式
}
}
```
#### 阶段 3流式 API 调用(第 1779-1858 行)
```ts
// 使用 withRetry 包装,自动处理重试
const generator = withRetry(
() => getAnthropicClient({ maxRetries: 0, model, source: querySource }),
async (anthropic, attempt, context) => {
const params = paramsFromContext(context)
// ★ 核心 API 调用(第 1823 行)
// 使用 .create() + stream: true而非 .stream()
// 避免 BetaMessageStream 的 O(n²) partial JSON 解析开销
const result = await anthropic.beta.messages
.create(
{ ...params, stream: true },
{ signal, ...(clientRequestId && { headers: { ... } }) },
)
.withResponse()
return result.data // Stream<BetaRawMessageStreamEvent>
},
{ model, fallbackModel, thinkingConfig, signal, querySource }
)
// 消费 withRetry 的系统错误消息(重试通知等)
let e
do {
e = await generator.next()
if (!('controller' in e.value)) yield e.value // yield API 错误消息
} while (!e.done)
stream = e.value // 获取最终的 Stream 对象
// 处理流式事件(第 1941 行)
for await (const part of stream) {
switch (part.type) {
case 'message_start': // 记录 request_id、usage
case 'content_block_start': // 新的内容块开始text/thinking/tool_use
case 'content_block_delta': // 增量内容 → yield stream_event 给 UI
case 'content_block_stop': // 内容块完成 → yield AssistantMessage
case 'message_delta': // stop_reason、usage 更新
case 'message_stop': // 整条消息完成
}
}
```
#### 阶段 4withRetry 重试策略
```
withRetry 逻辑:
├── 429 (Rate Limit) → 等待 Retry-After 后重试
├── 529 (Overloaded) → 切换到 fallbackModelthrow FallbackTriggeredError
├── 500 (Server Error) → 指数退避重试
├── 408 (Timeout) → 重试
├── 其他错误 → 不重试,直接抛出
└── 最大重试次数: 根据模型和错误类型动态计算
```
#### 阶段 5非流式降级
当流式请求中途失败时,可能降级为非流式请求:
```
流式失败(部分响应已收到):
├── 已接收的内容 → yield 给上层
├── 剩余部分 → 降级为非流式请求anthropic.beta.messages.create
└── 非流式结果 → 转换格式 yield
```
### 3.5 消息转换函数
```ts
// UserMessage → API 格式
userMessageToMessageParam(message, addCache, enablePromptCaching, querySource)
→ { role: 'user', content: [...] }
// addCache=true 时最后一个 content block 添加 cache_control
// AssistantMessage → API 格式
assistantMessageToMessageParam(message, addCache, enablePromptCaching, querySource)
→ { role: 'assistant', content: [...] }
// thinking/redacted_thinking 块不加 cache_control
```
### 3.6 Prompt Caching 策略
```
缓存策略:
├── cache_control: { type: 'ephemeral' } — 默认5 分钟 TTL
├── cache_control: { type: 'ephemeral', ttl: '1h' } — 订阅用户/Ant1 小时
├── cache_control: { ..., scope: 'global' } — 跨会话共享(无 MCP 工具时)
└── 禁用条件:
├── DISABLE_PROMPT_CACHING 环境变量
├── DISABLE_PROMPT_CACHING_HAIKU仅 Haiku
└── DISABLE_PROMPT_CACHING_SONNET仅 Sonnet
```
### 3.7 多 Provider 支持
`getAnthropicClient()` 根据配置返回不同的 SDK 客户端:
| Provider | 入口 | 说明 |
|----------|------|------|
| Anthropic | 直接 API | 默认,`api.anthropic.com` |
| AWS Bedrock | 通过 Bedrock | 使用 `@anthropic-ai/bedrock-sdk` |
| Google Vertex | 通过 Vertex | 使用 `@anthropic-ai/vertex-sdk` |
| Azure | 通过 Azure | 类似 Bedrock 的包装 |
Provider 选择逻辑在 `src/utils/model/providers.ts``getAPIProvider()` 中。
---
## 完整数据流:一次工具调用的生命周期
以用户输入 "读取 README.md" 为例:
```
1. REPL.tsx: 用户按回车
onSubmit("读取 README.md")
└── handlePromptSubmit()
└── onQuery([userMessage])
2. REPL.tsx: onQueryImpl()
├── getSystemPrompt() + getUserContext() + getSystemContext()
└── for await (event of query({messages, systemPrompt, ...}))
3. query.ts: queryLoop() — 第 1 次迭代
├── messagesForQuery = [...messages] // 包含用户消息
├── deps.callModel({...})
│ └── claude.ts: queryModel()
│ ├── 构建 API 参数
│ └── anthropic.beta.messages.create({ ...params, stream: true })
├── API 流式返回:
│ content_block_start: { type: 'tool_use', name: 'Read', id: 'toolu_123' }
│ content_block_delta: { input: '{"file_path": "/path/to/README.md"}' }
│ content_block_stop
│ message_delta: { stop_reason: 'tool_use' }
├── 收集: toolUseBlocks = [{ name: 'Read', id: 'toolu_123', input: {...} }]
├── needsFollowUp = true
├── 工具执行:
│ streamingToolExecutor.getRemainingResults()
│ └── Read 工具执行 → 返回文件内容
│ yield toolResultMessage ← 包含文件内容
└── state = { messages: [...old, assistantMsg, toolResultMsg], turnCount: 2 }
→ continue
4. query.ts: queryLoop() — 第 2 次迭代
├── messagesForQuery 现在包含:
│ [userMsg, assistantMsg(tool_use), userMsg(tool_result)]
├── deps.callModel({...}) ← 再次调用 API
├── API 返回:
│ content_block_start: { type: 'text' }
│ content_block_delta: { text: "README.md 的内容是..." }
│ content_block_stop
│ message_delta: { stop_reason: 'end_turn' }
├── toolUseBlocks = [] ← 没有工具调用
├── needsFollowUp = false
└── return { reason: 'completed' } ★ 循环结束
5. REPL.tsx: onQueryEvent(event)
├── 更新 streamingText打字机效果
├── 更新 messages 数组
└── 重新渲染 UI
```
---
## 关键设计模式总结
| 模式 | 位置 | 说明 |
|------|------|------|
| AsyncGenerator 链式传递 | query.ts → claude.ts | `yield*` 将底层事件透传给上层,形成事件流管道 |
| while(true) + State 对象 | query.ts queryLoop | 循环迭代间通过不可变 State 传递transition 字段记录原因 |
| StreamingToolExecutor | query.ts | API 流式返回时并行执行工具,不等流结束 |
| Withheld 消息 | query.ts | 可恢复错误先暂扣不 yield恢复成功则吞掉错误 |
| withRetry 重试 | claude.ts | 429/500/529 自动重试529 触发模型降级 |
| Prompt Caching | claude.ts | 缓存系统提示和历史消息,减少 API token 消耗 |
| 非流式降级 | claude.ts | 流式请求中途失败时降级为非流式完成剩余部分 |
| QueryEngine 包装 | QueryEngine.ts | 为 SDK/print 提供会话管理、持久化、usage 跟踪 |
## 需要忽略的代码
| 模式 | 说明 |
|------|------|
| `feature('REACTIVE_COMPACT')` / `feature('CONTEXT_COLLAPSE')` 等 | 所有 feature flag 保护的代码 — 全部是死代码 |
| `feature('CACHED_MICROCOMPACT')` | 缓存微压缩 — 死代码 |
| `feature('HISTORY_SNIP')` / `snipModule` | 历史截断 — 死代码 |
| `feature('TOKEN_BUDGET')` / `budgetTracker` | 令牌预算 — 死代码 |
| `feature('BG_SESSIONS')` / `taskSummaryModule` | 后台会话 — 死代码 |
| `process.env.USER_TYPE === 'ant'` | Anthropic 内部专用代码 |
| VCR (withStreamingVCR/withVCR) | 调试录像/回放包装器,不影响正常流程 |