跳转至

第 7 篇:多 Agent 协作系统

学习目标

  • 理解多 Agent 协作的架构设计
  • 掌握 Agent 生成和通信机制
  • 学习协调器模式(Coordinator Mode)的实现
  • 了解工具过滤和权限隔离

7.1 多 Agent 架构概述

架构层次

flowchart TB
    UI["用户界面层<br/>(PromptInput / REPL)"]
    Coordinator["协调器 (Coordinator)<br/>- 接收用户请求<br/>- 规划和分配任务<br/>- 合成工人结果<br/>- 与用户通信"]
    Worker1["Worker 1<br/>(研究)"]
    Worker2["Worker 2<br/>(实现)"]
    Worker3["Worker 3<br/>(测试)"]

    UI --> Coordinator
    Coordinator --> Worker1
    Coordinator --> Worker2
    Coordinator --> Worker3

Agent 类型

// 来自 src/tools/AgentTool/constants.ts
// Tool name under which the Agent tool is exposed (used as `name` in buildTool).
export const AGENT_TOOL_NAME = 'Agent'

// Agent types that the Agent tool is allowed to spawn.
// `as const` keeps the entries as string-literal types in a readonly tuple.
export const ALLOWED_AGENT_TYPES = [
  'general-purpose',  // general-purpose agent
  'worker',           // worker agent (coordinator mode)
  'researcher',       // research agent
  'reviewer',         // review agent
] as const

7.2 Agent 工具定义

输入输出 Schema

// 来自 src/tools/AgentTool/AgentTool.tsx
import { z } from 'zod/v4'
import { lazySchema } from '../../utils/lazySchema.js'

// Base input schema: the fields accepted by every Agent tool call.
// Only `description` and `prompt` are required; the rest are optional.
// Wrapped in lazySchema, so the zod object is obtained by calling baseInputSchema().
const baseInputSchema = lazySchema(() => z.object({
  description: z.string().describe('A short (3-5 word) description of the task'),
  prompt: z.string().describe('The task for the agent to perform'),
  subagent_type: z.string().optional()
    .describe('The type of specialized agent to use'),
  model: z.enum(['sonnet', 'opus', 'haiku']).optional()
    .describe('Optional model override for this agent'),
  run_in_background: z.boolean().optional()
    .describe('Set to true to run this agent in the background'),
}))

// Multi-agent parameters (KAIROS feature). All fields are optional.
// Unlike the other schemas here, this one is built directly rather than inside lazySchema.
const multiAgentInputSchema = z.object({
  name: z.string().optional()
    .describe('Name for the spawned agent. Makes it addressable via SendMessage'),
  team_name: z.string().optional()
    .describe('Team name for spawning'),
  mode: permissionModeSchema().optional()
    .describe('Permission mode for spawned teammate'),
})

// Full schema: the base fields merged with the multi-agent fields,
// then extended with the optional `isolation` and `cwd` settings.
const fullInputSchema = lazySchema(() =>
  baseInputSchema().merge(multiAgentInputSchema).extend({
    isolation: z.enum(['worktree', 'remote']).optional()
      .describe('Isolation mode'),
    cwd: z.string().optional()
      .describe('Absolute path to run the agent in'),
  })
)

Agent 工具核心逻辑

// 来自 src/tools/AgentTool/AgentTool.tsx — 简化的 call 方法
/**
 * Agent tool (simplified): resolves an agent definition, its tools, system
 * prompt and initial messages, then runs the agent via the async lifecycle.
 */
export const AgentTool = buildTool({
  name: AGENT_TOOL_NAME,

  /**
   * Spawn and run a sub-agent for the given input.
   *
   * @param input - Tool input; `prompt`, `subagent_type` and `model` are read here.
   * @param context - Supplies app state, the available tools and the parent messages.
   * @param canUseTool - Not used in this simplified version.
   * @param parentMessage - Forwarded unchanged to the lifecycle runner.
   * @param onProgress - Forwarded unchanged to the lifecycle runner.
   * @returns `{ data }` wrapping the value resolved by `runAsyncAgentLifecycle`.
   */
  async call(input, context, canUseTool, parentMessage, onProgress) {
    const appState = context.getAppState()
    const agentId = createAgentId()

    // 1. Resolve the agent type; a missing or empty subagent_type means general-purpose.
    const agentType = input.subagent_type || 'general-purpose'
    const isForkSubagent = isForkSubagentEnabled() && agentType === FORK_AGENT

    // 2. Look up the agent definition; unknown types fall back to the general-purpose agent.
    const agentDefinitions = await getAgentDefinitionsWithOverrides(
      appState.agentDefinitions,
      input.subagent_type,
    )
    const agentDefinition = agentDefinitions.find(
      a => a.agentType === agentType
    ) || GENERAL_PURPOSE_AGENT

    // 3. Resolve the agent's tool list.
    // The third argument is resolveAgentTools' `isAsync` flag. It is passed as a
    // plain value: writing `isAsync = true` here would be an assignment to an
    // undeclared identifier, not a named argument.
    const toolResolution = resolveAgentTools(
      agentDefinition,
      context.options.tools,
      true,
    )

    // 4. Build the system prompt.
    const systemPrompt = await buildAgentSystemPrompt(
      agentDefinition,
      toolResolution.resolvedTools,
      context,
    )

    // 5. Build the message history: a fork starts from the parent's messages,
    //    any other agent starts from the prompt alone.
    const messages = isForkSubagent
      ? buildForkedMessages(context.messages, agentDefinition)
      : [createUserMessage(input.prompt)]

    // 6. Run the agent.
    const result = await runAsyncAgentLifecycle(
      {
        agentId,
        agentType: agentDefinition.agentType,
        model: getAgentModel(agentDefinition, input.model, appState),
        systemPrompt,
        messages,
        tools: toolResolution.resolvedTools,
      },
      {
        context,
        input,
        parentMessage,
        onProgress,
      },
    )

    return { data: result }
  },
})

7.3 工具过滤机制

Agent 工具过滤

// 来自 src/tools/AgentTool/agentToolUtils.ts
// Tools that no agent may use (checked by filterToolsForAgent for every agent).
const ALL_AGENT_DISALLOWED_TOOLS = new Set([
  'TaskCreateTool',
  'TaskUpdateTool',
  'TaskListTool',
  // ... further tools that agents may not use (elided)
])

// Tools additionally denied to agents that are not built-in.
const CUSTOM_AGENT_DISALLOWED_TOOLS = new Set([
  'WebBrowserTool',
  // ... further tools that custom agents may not use (elided)
])

// Allow-list applied to async agents: anything not listed here is filtered out.
const ASYNC_AGENT_ALLOWED_TOOLS = new Set([
  'Bash',
  'FileRead',
  'FileEdit',
  'FileWrite',
  'Grep',
  'Glob',
  'Agent',  // allows spawning sub-agents
  // ... further tools that async agents may use (elided)
])

/**
 * Filter the tools an agent is allowed to use.
 *
 * Each tool is checked against these rules in order; the first match decides:
 *   1. MCP tools (name starts with `mcp__`) are kept.
 *   2. The ExitPlanMode tool is kept when `permissionMode` is `'plan'`.
 *   3. Tools in ALL_AGENT_DISALLOWED_TOOLS are dropped.
 *   4. For agents that are not built-in, tools in CUSTOM_AGENT_DISALLOWED_TOOLS are dropped.
 *   5. For async agents, tools outside ASYNC_AGENT_ALLOWED_TOOLS are dropped,
 *      except the Agent tool when agent swarms are enabled and the caller is
 *      an in-process teammate.
 *   6. Everything else is kept.
 *
 * @param tools - Candidate tools.
 * @param isBuiltIn - Whether the agent is a built-in one.
 * @param isAsync - Whether the agent runs asynchronously (default `false`).
 * @param permissionMode - Current permission mode, if any.
 * @returns The subset of `tools` the agent may use, in the original order.
 */
export function filterToolsForAgent({
  tools,
  isBuiltIn,
  isAsync = false,
  permissionMode,
}: {
  tools: Tools
  isBuiltIn: boolean
  isAsync?: boolean
  permissionMode?: PermissionMode
}): Tools {
  return tools.filter(tool => {
    // MCP tools are always allowed
    if (tool.name.startsWith('mcp__')) {
      return true
    }

    // ExitPlanMode is allowed while in plan mode
    if (
      toolMatchesName(tool, EXIT_PLAN_MODE_V2_TOOL_NAME) &&
      permissionMode === 'plan'
    ) {
      return true
    }

    // Tools denied to every agent
    if (ALL_AGENT_DISALLOWED_TOOLS.has(tool.name)) {
      return false
    }

    // Tools additionally denied to custom (non-built-in) agents
    if (!isBuiltIn && CUSTOM_AGENT_DISALLOWED_TOOLS.has(tool.name)) {
      return false
    }

    // Async agents are restricted to an allow-list
    if (isAsync && !ASYNC_AGENT_ALLOWED_TOOLS.has(tool.name)) {
      // Exception: in-process teammates may still spawn synchronous sub-agents
      if (isAgentSwarmsEnabled() && isInProcessTeammate()) {
        if (toolMatchesName(tool, AGENT_TOOL_NAME)) {
          return true
        }
      }
      return false
    }

    return true
  })
}

工具解析和验证

// 来自 src/tools/AgentTool/agentToolUtils.ts
/**
 * Resolve and validate the tool configuration of an agent definition.
 *
 * Supports wildcard expansion (`tools` omitted or `['*']`) and tool specs that
 * carry rule content, which are wrapped via `createPermissionWrappedTool`.
 *
 * @param agentDefinition - The `tools`, `disallowedTools` and `source` of the agent.
 * @param availableTools - All tools the agent could draw from.
 * @param isAsync - Whether the agent runs asynchronously (default `false`).
 * @returns The resolved tools plus the valid and invalid tool specs. With a
 *   wildcard, `validTools` and `invalidTools` are empty and `resolvedTools` is
 *   every tool that survived filtering.
 */
export function resolveAgentTools(
  agentDefinition: Pick<AgentDefinition, 'tools' | 'disallowedTools' | 'source'>,
  availableTools: Tools,
  isAsync = false,
): ResolvedAgentTools {
  const { tools: agentTools, disallowedTools, source } = agentDefinition

  // 1. Apply the global agent filtering rules
  const filteredAvailableTools = filterToolsForAgent({
    tools: availableTools,
    isBuiltIn: source === 'built-in',
    isAsync,
  })

  // 2. Collect the names of tools this agent definition denies
  const disallowedToolSet = new Set(
    disallowedTools?.map(toolSpec => {
      const { toolName } = permissionRuleValueFromString(toolSpec)
      return toolName
    }) ?? []
  )

  // 3. Remove the denied tools
  const allowedAvailableTools = filteredAvailableTools.filter(
    tool => !disallowedToolSet.has(tool.name),
  )

  // 4. Wildcard: no explicit list, or the single entry '*'
  const hasWildcard =
    agentTools === undefined ||
    (agentTools.length === 1 && agentTools[0] === '*')

  if (hasWildcard) {
    return {
      hasWildcard: true,
      validTools: [],
      invalidTools: [],
      resolvedTools: allowedAvailableTools,
    }
  }

  // 5. Resolve and validate the explicit tool list
  const validTools: string[] = []
  const invalidTools: string[] = []
  const resolved: Tool[] = []

  for (const toolSpec of agentTools) {
    const { toolName, ruleContent } = permissionRuleValueFromString(toolSpec)

    // Look the tool up among the tools that survived steps 1-3. Searching the
    // unfiltered `availableTools` would let an explicitly listed tool bypass
    // both filterToolsForAgent and the agent's own `disallowedTools`.
    const tool = allowedAvailableTools.find(t => t.name === toolName)

    if (tool) {
      validTools.push(toolSpec)

      // A spec with rule content gets a permission-wrapped tool
      if (ruleContent) {
        resolved.push(createPermissionWrappedTool(tool, ruleContent))
      } else {
        resolved.push(tool)
      }
    } else {
      // Unknown, filtered-out or denied tools are all reported as invalid
      invalidTools.push(toolSpec)
    }
  }

  return {
    hasWildcard: false,
    validTools,
    invalidTools,
    resolvedTools: resolved,
  }
}

7.4 协调器模式

协调器系统提示

// 来自 src/coordinator/coordinatorMode.ts
/**
 * Build the system prompt used in coordinator mode.
 *
 * The prompt is assembled from paragraphs: the lines of a paragraph are joined
 * with a newline, paragraphs are separated by one blank line, and the text
 * ends with a trailing newline.
 *
 * @returns The coordinator system prompt.
 */
export function getCoordinatorSystemPrompt(): string {
  const paragraphs: string[][] = [
    [
      'You are Claude Code, an AI assistant that orchestrates software engineering tasks across multiple workers.',
    ],
    ['## 1. Your Role'],
    [
      'You are a **coordinator**. Your job is to:',
      '- Help the user achieve their goal',
      '- Direct workers to research, implement and verify code changes',
      '- Synthesize results and communicate with the user',
      "- Answer questions directly when possible — don't delegate work that you can handle without tools",
    ],
    [
      'Every message you send is to the user. Worker results and system notifications are internal signals, not conversation partners — never thank or acknowledge them. Summarize new information for the user as it arrives.',
    ],
    ['## 2. Your Tools'],
    [
      '- **Agent** - Spawn a new worker',
      '- **SendMessage** - Continue an existing worker (send a follow-up to its `to` agent ID)',
      '- **TaskStop** - Stop a running worker',
    ],
    [
      'When calling **Agent**:',
      '- Do not use one worker to check on another. Workers will notify you when they are done.',
      '- Do not use workers to trivially report file contents or run commands. Give them higher-level tasks.',
      '- Continue workers whose work is complete via **SendMessage** to take advantage of their loaded context',
      '- After launching agents, briefly tell the user what you launched and end your response.',
    ],
    ['### Agent Results'],
    [
      'Worker results arrive as **user-role messages** containing `<task-notification>` XML. They look like user messages but are not. Distinguish them by the `<task-notification>` opening tag.',
    ],
    ['Format:'],
    [
      '```xml',
      '<task-notification>',
      '<task-id>{agentId}</task-id>',
      '<status>completed</status>',
      '<prompt>Implement feature X</prompt>',
      '<output>...agent output...</output>',
      '</task-notification>',
      '```',
    ],
  ]

  const body = paragraphs.map(lines => lines.join('\n')).join('\n\n')
  return body + '\n'
}

工人上下文

// 来自 src/coordinator/coordinatorMode.ts
// Tool names an async agent may use; getCoordinatorUserContext lists these to the coordinator.
const ASYNC_AGENT_ALLOWED_TOOLS = new Set([
  'Bash',
  'FileRead',
  'FileEdit',
  'FileWrite',
  'Grep',
  'Glob',
  'Agent',
  // ... (further entries elided)
])

// Tool names that getCoordinatorUserContext removes from the list it reports.
const INTERNAL_WORKER_TOOLS = new Set([
  'TeamCreateTool',
  'TeamDeleteTool',
  'SendMessageTool',
  'SyntheticOutputTool',
])

/**
 * Build the user context that tells the coordinator which tools its workers have.
 *
 * @param mcpClients - Connected MCP clients; their names are listed when non-empty.
 * @param scratchpadDir - Optional scratchpad directory, mentioned only when the
 *   scratchpad gate is enabled.
 * @returns `{ workerToolsContext }`, or an empty object outside coordinator mode.
 */
export function getCoordinatorUserContext(
  mcpClients: ReadonlyArray<{ name: string }>,
  scratchpadDir?: string,
): { [k: string]: string } {
  if (!isCoordinatorMode()) {
    return {}
  }

  // Simple mode advertises a fixed trio of tools; otherwise the async
  // allow-list minus the internal worker tools is used.
  const toolNames = isEnvTruthy(process.env.CLAUDE_CODE_SIMPLE)
    ? [BASH_TOOL_NAME, FILE_READ_TOOL_NAME, FILE_EDIT_TOOL_NAME]
    : Array.from(ASYNC_AGENT_ALLOWED_TOOLS).filter(
        name => !INTERNAL_WORKER_TOOLS.has(name),
      )
  const workerTools = toolNames.sort().join(', ')

  // Each section becomes one paragraph of the final context string.
  const sections: string[] = [
    `Workers spawned via the Agent tool have access to these tools: ${workerTools}`,
  ]

  if (mcpClients.length > 0) {
    const serverNames = mcpClients.map(client => client.name).join(', ')
    sections.push(
      `Workers also have access to MCP tools from connected MCP servers: ${serverNames}`,
    )
  }

  if (scratchpadDir && isScratchpadGateEnabled()) {
    sections.push(
      `Scratchpad directory: ${scratchpadDir}\nWorkers can read and write here without permission prompts.`,
    )
  }

  return { workerToolsContext: sections.join('\n\n') }
}

7.5 Agent 生命周期管理

Agent 任务注册

// 来自 src/tasks/LocalAgentTask/LocalAgentTask.ts
/**
 * Register an async agent task in the app state.
 *
 * The task is stored under `agentId` with status `'pending'`, the prompt as
 * its description, and the current time as its start time.
 *
 * @param agentId - Id of the agent; also used as the task id and key.
 * @param params - Task parameters. Only `type` and `prompt` are read here;
 *   `model` and `agentType` are accepted but not used by this function.
 */
export async function registerAsyncAgent(
  agentId: AgentId,
  params: {
    type: 'local_agent' | 'remote_agent'
    prompt: string
    model: string
    agentType: string
  },
): Promise<void> {
  const store = getAppState()
  const { type, prompt } = params

  // Initial state of the new task
  const newTask: TaskState = {
    id: agentId,
    type,
    status: 'pending',
    description: prompt,
    startTime: Date.now(),
    outputFile: getTaskOutputPath(agentId),
    outputOffset: 0,
  }

  // Add the task to the task map without touching the other entries
  store.setAppState(prev => {
    const tasks = { ...prev.tasks, [agentId]: newTask }
    return { ...prev, tasks }
  })
}

/**
 * Update the status and/or description of a registered agent task.
 *
 * @param agentId - Id of the task to update.
 * @param update - Fields to apply. `status` and `message` replace the task's
 *   status and description when provided; `output` is accepted but not
 *   applied by this function.
 * @throws Error if no task is registered under `agentId`.
 */
export async function updateAgentProgress(
  agentId: AgentId,
  update: {
    status?: TaskStatus
    message?: string
    output?: string
  },
): Promise<void> {
  const appState = getAppState()
  const task = appState.getState().tasks[agentId]

  if (!task) {
    throw new Error(`Agent task not found: ${agentId}`)
  }

  appState.setAppState(prev => {
    // Merge into the task as it exists in `prev`, not the snapshot read above.
    // Another update may have been applied in between, and spreading the older
    // snapshot would silently revert it. The snapshot is only a fallback for
    // when the task is missing from `prev`.
    const current = prev.tasks[agentId] ?? task

    return {
      ...prev,
      tasks: {
        ...prev.tasks,
        [agentId]: {
          ...current,
          status: update.status ?? current.status,
          description: update.message ?? current.description,
        },
      },
    }
  })
}

/**
 * Mark an agent task as completed, notify about it, and log token usage.
 *
 * @param agentId - Id of the task to complete.
 * @param result - Final output and token usage of the agent. The output also
 *   replaces the task's description.
 * @throws Error if no task is registered under `agentId`.
 */
export async function completeAgentTask(
  agentId: AgentId,
  result: {
    output: string
    tokenUsage: TokenUsage
  },
): Promise<void> {
  const appState = getAppState()
  const task = appState.getState().tasks[agentId]

  if (!task) {
    throw new Error(`Agent task not found: ${agentId}`)
  }

  // Mark the task as completed
  appState.setAppState(prev => {
    // Build on the task as it exists in `prev`, not the snapshot read above,
    // so that updates applied in between are not reverted. The snapshot is
    // only a fallback for when the task is missing from `prev`.
    const current = prev.tasks[agentId] ?? task

    return {
      ...prev,
      tasks: {
        ...prev.tasks,
        [agentId]: {
          ...current,
          status: 'completed',
          endTime: Date.now(),
          description: result.output,
        },
      },
    }
  })

  // Send the completion notification
  await enqueueAgentNotification({
    agentId,
    type: 'completed',
    output: result.output,
  })

  // Record usage
  logEvent('agent_completed', {
    agentId,
    inputTokens: result.tokenUsage.inputTokens,
    outputTokens: result.tokenUsage.outputTokens,
    cost: calculateCost(result.tokenUsage),
  })
}

Agent 消息处理

// 来自 src/tasks/LocalAgentTask/LocalAgentTask.ts
/**
 * Fold one agent message into a progress tracker.
 *
 * Assistant messages add their token usage and tool-use count to the tracker.
 * For any message, the activity description is refreshed when a resolver is
 * available for it.
 *
 * @param agentId - Id of the agent the message belongs to (not read here).
 * @param message - The message to account for.
 * @param tracker - Tracker that is mutated in place.
 */
export function updateProgressFromMessage(
  agentId: AgentId,
  message: Message,
  tracker: ProgressTracker,
): void {
  if (message.type === 'assistant') {
    // Accumulate token counts; a missing counter counts as zero
    const tokenUsage = message.message.usage
    if (tokenUsage) {
      tracker.inputTokens += tokenUsage.inputTokens ?? 0
      tracker.outputTokens += tokenUsage.outputTokens ?? 0
    }

    // Count tool calls, if there are any
    const toolUseCount = message.toolUses?.length
    if (toolUseCount) {
      tracker.toolCallCount += toolUseCount
    }
  }

  // Refresh the activity description when a resolver exists
  const describeActivity = createActivityDescriptionResolver(message)
  if (!describeActivity) {
    return
  }
  tracker.activityDescription = describeActivity()
}

7.6 Agent 通信机制

SendMessage 工具

// 来自 src/tools/SendMessageTool/SendMessageTool.ts
/**
 * SendMessage tool: delivers a message to an agent addressed by its
 * registered name or by its agent id.
 */
export const SendMessageTool = buildTool({
  name: SEND_MESSAGE_TOOL_NAME,
  description: () => 'Send a message to a running or paused agent',

  /** Input schema: the recipient (`to`) and the `message` text. */
  get inputSchema() {
    const to = z.string().describe('The agent ID or name to send the message to')
    const message = z.string().describe('The message to send')
    return z.object({ to, message })
  },

  /**
   * Resolve the recipient, make sure it has a task, and send the message.
   *
   * @returns `{ data: { success: true, agentId } }` with the resolved agent id.
   * @throws Error if `to` is neither a registered name nor a valid agent id,
   *   or if the resolved agent has no task.
   */
  async call({ to, message }, context) {
    const appState = context.getAppState()

    // A registered name takes precedence over interpreting `to` as an agent id
    const resolveTarget = (): AgentId => {
      if (appState.agentNameRegistry.has(to)) {
        return appState.agentNameRegistry.get(to)!
      }
      if (isValidAgentId(to)) {
        return to as AgentId
      }
      throw new Error(`Agent not found: ${to}`)
    }
    const targetAgentId = resolveTarget()

    // The agent must have a task entry
    if (!appState.tasks[targetAgentId]) {
      throw new Error(`Task not found for agent: ${targetAgentId}`)
    }

    await sendMessageToAgent(targetAgentId, message)

    const data = { success: true, agentId: targetAgentId }
    return { data }
  },
})

任务通知

// 来自 src/utils/task/sdkProgress.ts
/**
 * Emit a task progress event for an agent.
 *
 * Builds an event from the agent id, the current time and the given progress
 * fields, then hands it to `enqueueSdkEvent`.
 *
 * @param agentId - Id of the agent; stored as `taskId` on the event.
 * @param progress - Progress kind plus optional output or error text.
 */
export function emitTaskProgress(
  agentId: AgentId,
  progress: {
    type: 'launched' | 'completed' | 'failed' | 'output'
    output?: string
    error?: string
  },
): void {
  const event: TaskProgressEvent = {
    type: 'task_progress',
    taskId: agentId,
    timestamp: Date.now(),
    // NOTE(review): `progress.type` is required and is spread last, so it
    // overwrites the `type: 'task_progress'` set above. Confirm against
    // TaskProgressEvent which field should carry the progress kind.
    ...progress,
  }

  // Send through the SDK event queue
  enqueueSdkEvent(event)
}

7.7 Fork 子 Agent

Fork 机制

// 来自 src/tools/AgentTool/forkSubagent.ts
// Agent type that selects the fork path; AgentTool.call compares the requested agent type against it.
export const FORK_AGENT = 'fork'

/**
 * Build the message history for a forked sub-agent.
 *
 * A fork shares the parent's history: every parent message is shallow-copied
 * with a fresh `uuid` and a `forkedFrom` pointing at the original's uuid, and
 * a system fork notice is appended at the end. The input array and its
 * messages are not modified.
 *
 * @param parentMessages - Messages of the parent agent.
 * @param agentDefinition - Definition of the forked agent; its `agentType`
 *   appears in the fork notice.
 * @returns The copied messages followed by the fork notice.
 */
export function buildForkedMessages(
  parentMessages: Message[],
  agentDefinition: AgentDefinition,
): Message[] {
  // 1. Copy each parent message under a new identity, remembering its origin
  const copies = parentMessages.map(parent => {
    const originUuid = parent.uuid
    return { ...parent, uuid: randomUUID(), forkedFrom: originUuid }
  })

  // 2. Append the fork notice after the copied history
  return [
    ...copies,
    {
      type: 'system',
      subtype: 'fork_notice',
      content: `Forked to ${agentDefinition.agentType} agent`,
      uuid: randomUUID(),
      timestamp: Date.now(),
    },
  ]
}

/**
 * Check whether fork sub-agents are enabled.
 *
 * Reads the `tengu_fork_subagent` feature value, with `false` as the fallback.
 * NOTE(review): the accessor's name suggests the value is cached and may be
 * stale; confirm against its definition.
 */
export function isForkSubagentEnabled(): boolean {
  return getFeatureValue_CACHED_MAY_BE_STALE('tengu_fork_subagent', false)
}

7.8 Agent 摘要和归档

// 来自 src/services/AgentSummary/agentSummary.ts
/**
 * Generate and store a summary of an agent's messages.
 * Used to compress long conversation histories.
 *
 * @param agentId - Id of the agent the summary is saved for.
 * @param messages - Messages to summarize.
 * @returns The generated summary, after it has been saved.
 */
export async function startAgentSummarization(
  agentId: AgentId,
  messages: Message[],
): Promise<string> {
  // Generate the summary with a small, fast model, capped at 1000 tokens
  const summary = await generateSummary(messages, {
    model: 'claude-haiku-4-5',
    maxTokens: 1000,
  })

  // Persist the summary
  await saveAgentSummary(agentId, summary)

  return summary
}

7.9 关键代码位置索引

| 功能 | 文件路径 | 关键函数/类型 |
| --- | --- | --- |
| Agent 工具 | src/tools/AgentTool/AgentTool.tsx | AgentTool.call() |
| 工具过滤 | src/tools/AgentTool/agentToolUtils.ts | filterToolsForAgent, resolveAgentTools |
| 协调器模式 | src/coordinator/coordinatorMode.ts | getCoordinatorSystemPrompt |
| Agent 任务 | src/tasks/LocalAgentTask/LocalAgentTask.ts | registerAsyncAgent, completeAgentTask |
| Agent 定义 | src/tools/AgentTool/loadAgentsDir.ts | getAgentDefinitionsWithOverrides |
| Fork 子 Agent | src/tools/AgentTool/forkSubagent.ts | buildForkedMessages |
| 消息发送 | src/tools/SendMessageTool/SendMessageTool.ts | SendMessageTool.call() |

课后练习

  1. 阅读代码
     • 打开 src/tools/AgentTool/agentToolUtils.ts,查看 filterToolsForAgent 函数
     • 打开 src/coordinator/coordinatorMode.ts,阅读协调器系统提示
     • 打开 src/tasks/LocalAgentTask/LocalAgentTask.ts,了解任务生命周期

  2. 思考问题
     • 为什么异步 Agent 只能使用有限的工具集?
     • 协调器模式如何避免工具爆炸?
     • Fork 子 Agent 相比普通子 Agent 有什么优势?

  3. 实践
     • 设计一个多 Agent 协作流程(如:研究→实现→测试)
     • 定义自定义 Agent 类型和可用工具

下一步:第 8 篇 — 实战:构建你自己的 Agent