claude-code/learn/phase-2-qa.md
mingyangxu46-prog b6f37082cf
Learn/20260401 (#39)
* docs: 添加 Claude Code 源码学习笔记(第一、二阶段)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-02 21:47:49 +08:00

12 KiB
Raw Blame History

第二阶段 Q&A

Q1query.ts 的流式消息处理具体是怎样的?

核心问题deps.callModel() yield 出的每一条消息,在 queryLoop()for await 循环体L659-866中具体经历了什么处理

场景

用户说:"帮我看看 package.json 的内容"

模型回复:一段文字 "我来读取文件。" + 一个 Read 工具调用。

callModel yield 的完整消息序列

claude.ts 的 queryModel() 会 yield 两种类型的消息:

类型标记 含义 产出时机
stream_event 原始 SSE 事件包装 每个 SSE 事件都产出一条
assistant 完整的 AssistantMessage 仅在 content_block_stop 时产出

本例中 callModel 依次 yield 共 13 条消息

#1  { type: 'stream_event', event: { type: 'message_start', ... }, ttftMs: 342 }
#2  { type: 'stream_event', event: { type: 'content_block_start', index: 0, content_block: { type: 'text' } } }
#3  { type: 'stream_event', event: { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: '我来' } } }
#4  { type: 'stream_event', event: { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: '读取文件。' } } }
#5  { type: 'stream_event', event: { type: 'content_block_stop', index: 0 } }
#6  { type: 'assistant', uuid: 'uuid-1', message: { content: [{ type: 'text', text: '我来读取文件。' }], stop_reason: null } }
#7  { type: 'stream_event', event: { type: 'content_block_start', index: 1, content_block: { type: 'tool_use', id: 'toolu_001', name: 'Read' } } }
#8  { type: 'stream_event', event: { type: 'content_block_delta', index: 1, delta: { type: 'input_json_delta', partial_json: '{"file_path":' } } }
#9  { type: 'stream_event', event: { type: 'content_block_delta', index: 1, delta: { type: 'input_json_delta', partial_json: '"/path/package.json"}' } } }
#10 { type: 'stream_event', event: { type: 'content_block_stop', index: 1 } }
#11 { type: 'assistant', uuid: 'uuid-2', message: { content: [{ type: 'tool_use', id: 'toolu_001', name: 'Read', input: { file_path: '/path/package.json' } }], stop_reason: null } }
#12 { type: 'stream_event', event: { type: 'message_delta', delta: { stop_reason: 'tool_use' }, usage: { output_tokens: 87 } } }
#13 { type: 'stream_event', event: { type: 'message_stop' } }

注意 #6#11assistant 类型content_block_stop 时由 claude.ts 组装),其余全是 stream_event 类型

循环体结构

循环体在 L708-866结构如下

for await (const message of deps.callModel({...})) {   // L659
    // A. 降级检查 (L712)
    // B. backfill (L747-789)
    // C. withheld 检查 (L801-824)
    // D. yield (L825-827)
    // E. assistant 收集 + addTool (L828-848)
    // F. getCompletedResults (L850-865)
}

逐条走循环体

#1 stream_event (message_start)

A. L712: streamingFallbackOccured = false → 跳过

B. L748: message.type === 'assistant'?
   → 'stream_event' !== 'assistant' → 跳过整个 backfill 块

C. L801-824: withheld 检查
   → 不是 assistant 类型,各项检查均为 false → withheld = false

D. L825: yield message  ✅ → 透传给 REPLREPL 记录 ttftMs

E. L828: message.type === 'assistant'? → 否 → 跳过

F. L850-854: streamingToolExecutor.getCompletedResults()
   → tools 数组为空 → 无结果

净效果yield 透传。


#2 stream_event (content_block_start, type: text)

A-C. 同 #1
D.   yield message  ✅ → REPL 设置 spinner 为 "Responding..."
E-F. 同 #1

净效果yield 透传。


#3 stream_event (text_delta: "我来")

A-C. 同 #1
D.   yield message  ✅ → REPL 追加 streamingText += "我来"(打字机效果)
E-F. 同 #1

净效果yield 透传。


#4 stream_event (text_delta: "读取文件。")

同 #3
D. yield message  ✅ → REPL streamingText += "读取文件。"

净效果yield 透传。


#5 stream_event (content_block_stop, index:0)

同 #2
D. yield message  ✅ → REPL 无特殊操作(真正的 AssistantMessage 在下一条 #6

净效果yield 透传。


#6 assistant (text block 完整消息) ★

第一条 type: 'assistant' 的消息,走完全不同的路径

A. L712: streamingFallbackOccured = false → 跳过

B. L748: message.type === 'assistant'? → ✅ 进入 backfill
   L750: contentArr = [{ type: 'text', text: '我来读取文件。' }]
   L752: for i=0: block.type === 'text'
   L754: block.type === 'tool_use'? → 否 → 跳过
   L783: clonedContent 为 undefined → yieldMessage = message原样不变

C. L801: let withheld = false
   L802: feature('CONTEXT_COLLAPSE') → false → 跳过
   L813: reactiveCompact?.isWithheldPromptTooLong(message) → 否 → false
   L822: isWithheldMaxOutputTokens(message)
         → message.message.stop_reason === null → false
   → withheld = false

D. L825: yield message  ✅ → REPL 清除 streamingText添加完整 text 消息到列表

E. L828: message.type === 'assistant'? → ✅
   L830: assistantMessages.push(message)
         → assistantMessages = [uuid-1(text)]

   L832-834: msgToolUseBlocks = content.filter(type === 'tool_use')
             → [](这是 text block没有 tool_use

   L835: length > 0? → 否 → 不设 needsFollowUp
   L844: msgToolUseBlocks 为空 → 不调用 addTool

F. L854: getCompletedResults() → 空

净效果yield 消息 + assistantMessages 增加一条。needsFollowUp 仍为 false


#7 stream_event (content_block_start, tool_use: Read)

A-C. 同 stream_event 通用路径
D.   yield message  ✅ → REPL 设置 spinner 为 "tool-input",添加 streamingToolUse
E.   不是 assistant → 跳过
F.   getCompletedResults() → 空

#8 stream_event (input_json_delta: '{"file_path":')

D. yield message  ✅ → REPL 追加工具输入 JSON 碎片
F. getCompletedResults() → 空

#9 stream_event (input_json_delta: '"/path/package.json"}')

D. yield message  ✅
F. getCompletedResults() → 空

#10 stream_event (content_block_stop, index:1)

D. yield message  ✅
F. getCompletedResults() → 空

#11 assistant (tool_use block 完整消息) ★★

这条是最关键的——触发工具执行:

A. L712: streamingFallbackOccured = false → 跳过

B. L748: message.type === 'assistant'? → ✅ 进入 backfill
   L750: contentArr = [{ type: 'tool_use', id: 'toolu_001', name: 'Read',
                          input: { file_path: '/path/package.json' } }]
   L752: for i=0:
   L754: block.type === 'tool_use'? → ✅
   L756: typeof block.input === 'object' && !== null? → ✅
   L759: tool = findToolByName(tools, 'Read') → Read 工具定义
   L763: tool.backfillObservableInput 存在? → 假设存在
   L764-766: inputCopy = { file_path: '/path/package.json' }
             tool.backfillObservableInput(inputCopy)
             → 可能添加 absolutePath 字段
   L773-776: addedFields? → 假设有新增字段
             clonedContent = [...contentArr]
             clonedContent[0] = { ...block, input: inputCopy }
   L783-788: yieldMessage = {
               ...message,                 // uuid, type, timestamp 不变
               message: {
                 ...message.message,        // stop_reason, usage 不变
                 content: clonedContent      // ★ 替换为带 absolutePath 的副本
               }
             }
             // ★ 原始 message 保持不变(回传 API 保证缓存一致)

C. L801-824: withheld 检查 → 全部 false → withheld = false

D. L825: yield yieldMessage  ✅
         → yield 的是克隆版(带 backfill 字段),给 REPL 和 SDK 用
         → 原始 message 下面存进 assistantMessages回传 API 保证缓存一致

E. L828: message.type === 'assistant'? → ✅
   L830: assistantMessages.push(message)   // ★ push 原始 message不是 yieldMessage
         → assistantMessages = [uuid-1(text), uuid-2(tool_use)]

   L832-834: msgToolUseBlocks = content.filter(type === 'tool_use')
             → [{ type: 'tool_use', id: 'toolu_001', name: 'Read', input: {...} }]

   L835: length > 0? → ✅
   L836: toolUseBlocks.push(...msgToolUseBlocks)
         → toolUseBlocks = [Read_block]
   L837: needsFollowUp = true          // ★★★ 决定 while(true) 不会终止

   L840-842: streamingToolExecutor 存在 ✓ && !aborted ✓
   L844-846: for (const toolBlock of msgToolUseBlocks):
             streamingToolExecutor.addTool(Read_block, uuid-2消息)
             // ★★★ 工具开始执行!
             // → StreamingToolExecutor 内部:
             //   isConcurrencySafe = trueRead 是安全的)
             //   queued → processQueue() → canExecuteTool() → true
             //   → executeTool() → runToolUse() → 后台异步读文件

F. L850-854: getCompletedResults()
   → Read 刚开始执行status = 'executing' → 无完成结果

净效果

  • yield 克隆消息(带 backfill 字段)
  • assistantMessages push 原始消息
  • needsFollowUp = true
  • Read 工具在后台异步开始执行

#12 stream_event (message_delta, stop_reason: 'tool_use')

A-C. 同 stream_event 通用路径
D.   yield message  ✅

E.   不是 assistant → 跳过

F. L854: getCompletedResults()
   → ★ 此时 Read 可能已经完成了!(读文件通常 <1ms
   → 如果完成: status = 'completed', results 有值
     L428(StreamingToolExecutor): tool.status = 'yielded'
     L431-432: yield { message: UserMsg(tool_result) }
   → 回到 query.ts:
     L855: result.message 存在
     L856: yield result.message  ✅ → REPL 显示工具结果
     L857-862: toolResults.push(normalizeMessagesForAPI([result.message])...)
               → toolResults = [Read 的 tool_result]

净效果yield stream_event + 可能 yield 工具结果(如果工具已完成)。


#13 stream_event (message_stop)

D. yield message  ✅
F. getCompletedResults()
   → 如果 Read 在 #12 已被收割 → 空
   → 如果 Read 此时才完成 → yield 工具结果(同 #12 的 F 逻辑)

for await 循环退出后

L1018: aborted? → false → 跳过

L1065: if (!needsFollowUp)
       → needsFollowUp = true → 不进入 → 跳过终止逻辑

L1383: toolUpdates = streamingToolExecutor.getRemainingResults()
       → 如果 Read 已在 #12/#13 被收割 → 立即返回空
       → 如果 Read 还没完成 → 阻塞等待 → 完成后 yield 结果

L1387-1404: for await (const update of toolUpdates) {
              yield update.message        → REPL 显示
              toolResults.push(...)        → 收集
            }

L1718-1730: 构建 next State:
  state = {
    messages: [
      ...messagesForQuery,     // [UserMessage("帮我看看...")]
      ...assistantMessages,    // [AssistantMsg(text), AssistantMsg(tool_use)]
      ...toolResults,          // [UserMsg(tool_result)]
    ],
    turnCount: 1,
    transition: { reason: 'next_turn' },
  }
  → continue → while(true) 第 2 次迭代 → 带着工具结果再次调 API

循环体判定树总结

for await (const message of deps.callModel(...)) {
    │
    ├─ message.type === 'stream_event'?
    │   │
    │   └─ YES → 几乎零操作
    │        ├─ yield message透传给 REPL 做实时 UI
    │        └─ getCompletedResults()(顺便检查有没有完成的工具)
    │
    └─ message.type === 'assistant'?
        │
        ├─ B. backfill: 有 tool_use + backfillObservableInput?
        │   ├─ YES → 克隆消息yield 克隆版(原始消息保留给 API
        │   └─ NO  → yield 原始消息
        │
        ├─ C. withheld: prompt_too_long / max_output_tokens?
        │   ├─ YES → 不 yield暂扣等后面恢复逻辑处理
        │   └─ NO  → yield
        │
        ├─ E. assistantMessages.push(原始 message)
        │
        ├─ E. 有 tool_use block?
        │   ├─ YES → toolUseBlocks.push()
        │   │         + needsFollowUp = true
        │   │         + streamingToolExecutor.addTool() → ★ 立即开始执行工具
        │   └─ NO  → 什么都不做
        │
        └─ F. getCompletedResults() → 收割已完成的工具结果
}

一句话总结stream_event 透传不处理assistant 消息才是"真正的货"——收集起来、判断要不要暂扣、有工具就立即开始执行、顺便收割已完成的工具结果。