Appearance
多Agent协作系统
单个Agent能力有限,协作才能解决复杂问题
为什么需要多Agent协作?
单个Agent在处理复杂任务时面临诸多挑战,多Agent协作系统通过专业化分工和协同工作,能够更有效地解决复杂问题。
单Agent的局限性:
────────────────────────────
• 能力瓶颈:一个Agent难以精通所有领域
• 效率问题:串行处理,无法并行
• 容错性差:单点故障风险
• 上下文限制:难以处理超长任务
多Agent的优势:
────────────────────────────
• 专业分工:每个Agent专注擅长领域
• 并行处理:多个Agent同时工作
• 高容错:一个Agent失败可由其他补救
• 可扩展:按需增加Agent数量多Agent协作模式
1. 主从模式(Master-Worker)
最简单的协作模式,主Agent负责任务分配,从Agent负责执行。
主从模式架构:
┌─────────────────────────────────────────┐
│ │
│ 主Agent │
│ (任务分配、结果汇总) │
│ ↓↓↓↓↓ │
│ ┌─────────┬─────────┬─────────┐ │
│ ↓ ↓ ↓ ↓ │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │Worker│ │Worker│ │Worker│ │Worker│ │
│ │ 1 │ │ 2 │ │ 3 │ │ 4 │ │
│ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ │
│ └─────────┴─────────┴─────────┘ │
│ ↓↓↓↓↓ │
│ 结果汇总 │
│ │
└─────────────────────────────────────────┘适用场景:
- 批量数据处理
- 并行搜索任务
- 分布式计算
php
<?php
/**
* 主从工作系统类
* 实现主Agent分配任务,从Agent执行任务的模式
*/
class MasterWorkerSystem
{
// 主Agent实例
private $master;
// 工作Agent列表
private array $workers;
/**
* 构造函数
* @param mixed $masterAgent 主Agent实例
* @param array $workerAgents 工作Agent列表
*/
public function __construct($masterAgent, array $workerAgents)
{
// 初始化主Agent
$this->master = $masterAgent;
// 初始化工作Agent列表
$this->workers = $workerAgents;
}
/**
* 处理任务
* @param string $task 任务描述
* @return string 处理结果
*/
public function process(string $task): string
{
// 主Agent分解任务
$subtasks = $this->master->decompose($task);
// 初始化结果数组
$results = [];
// 遍历子任务
foreach ($subtasks as $i => $subtask) {
// 轮询选择工作Agent
$worker = $this->workers[$i % count($this->workers)];
// 执行子任务
$result = $worker->execute($subtask);
// 收集结果
$results[] = $result;
}
// 主Agent聚合结果并返回
return $this->master->aggregate($results);
}
}2. 对等模式(Peer-to-Peer)
所有Agent地位平等,通过协商完成任务。
对等模式架构:
────────────────────────────
Agent A ←──→ Agent B
↑↓ ↑↓
↑↓ ↑↓
Agent D ←──→ Agent C
特点:
• 无中心节点
• 自主协商
• 动态协作适用场景:
- 去中心化决策
- 共识达成
- 资源共享
php
<?php
/**
* 对等Agent类
* 实现无中心节点的对等协作模式
*/
class PeerAgent
{
// Agent名称
private string $name;
// 能力列表
private array $capabilities;
// 对等Agent列表
private array $peers = [];
/**
* 构造函数
* @param string $name Agent名称
* @param array $capabilities 能力列表
*/
public function __construct(string $name, array $capabilities)
{
// 初始化名称
$this->name = $name;
// 初始化能力列表
$this->capabilities = $capabilities;
}
/**
* 添加对等Agent
* @param PeerAgent $peer 对等Agent实例
*/
public function addPeer(PeerAgent $peer): void
{
// 添加到对等列表
$this->peers[] = $peer;
}
/**
* 请求帮助
* @param string $task 任务描述
* @return string|null 执行结果
*/
public function requestHelp(string $task): ?string
{
// 遍历对等Agent
foreach ($this->peers as $peer) {
// 检查是否能处理
if ($peer->canHandle($task)) {
// 执行并返回结果
return $peer->execute($task);
}
}
// 无可用Agent返回null
return null;
}
/**
* 检查是否能处理任务
* @param string $task 任务描述
* @return bool 是否能处理
*/
public function canHandle(string $task): bool
{
// 遍历能力列表
foreach ($this->capabilities as $cap) {
// 检查任务是否包含能力关键词
if (str_contains($task, $cap)) {
return true;
}
}
return false;
}
/**
* 执行任务
* @param string $task 任务描述
* @return string 执行结果
*/
public function execute(string $task): string
{
// 返回执行结果
return "Executed: {$task}";
}
}3. 层级模式(Hierarchical)
分层管理,上层负责决策,下层负责执行。
层级模式架构:
────────────────────────────
┌──────────┐
│ 管理Agent │
└────┬─────┘
│
┌───────────┼───────────┐
↓ ↓ ↓
┌───────┐ ┌───────┐ ┌───────┐
│协调器A│ │协调器B│ │协调器C│
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
┌──┴──┐ ┌──┴──┐ ┌──┴──┐
│执行器│ │执行器│ │执行器│
└─────┘ └─────┘ └─────┘适用场景:
- 大型项目
- 企业级应用
- 复杂决策流程
php
<?php
class HierarchicalSystem
{
private $manager;
private array $coordinators = [];
private array $executors = [];
public function __construct()
{
$this->manager = new ManagerAgent();
$this->coordinators = [
'research' => new CoordinatorAgent('research'),
'analysis' => new CoordinatorAgent('analysis'),
'writing' => new CoordinatorAgent('writing')
];
$this->executors = [
'research' => array_fill(0, 3, new ExecutorAgent()),
'analysis' => array_fill(0, 2, new ExecutorAgent()),
'writing' => array_fill(0, 2, new ExecutorAgent())
];
}
public function process(string $task): string
{
$plan = $this->manager->createPlan($task);
$results = [];
foreach ($plan['phases'] as $phase) {
$coordinator = $this->coordinators[$phase['type']];
$subtasks = $coordinator->split($phase);
$executors = $this->executors[$phase['type']];
$phaseResults = [];
foreach ($subtasks as $i => $subtask) {
$result = $executors[$i % count($executors)]->execute($subtask);
$phaseResults[] = $result;
}
$results[$phase['type']] = $coordinator->merge($phaseResults);
}
return $this->manager->finalize($results);
}
}4. 黑板模式(Blackboard)
多个Agent通过共享黑板进行协作。
黑板模式架构:
────────────────────────────
┌─────────────────────────────────────┐
│ 黑板 (Blackboard) │
│ ┌─────────────────────────────┐ │
│ │ 任务状态 │ │
│ │ 中间结果 │ │
│ │ 知识库 │ │
│ │ 控制信息 │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
↑↓ ↑↓ ↑↓
Agent A Agent B Agent C
工作流程:
1. Agent读取黑板状态
2. 判断是否能贡献
3. 更新黑板内容
4. 重复直到任务完成适用场景:
- 知识密集型任务
- 专家系统
- 复杂问题求解
php
<?php
class Blackboard
{
private array $data = [];
private string $status = 'pending';
private array $history = [];
public function read(?string $key = null): mixed
{
if ($key !== null) {
return $this->data[$key] ?? null;
}
return $this->data;
}
public function write(string $key, mixed $value): void
{
$this->data[$key] = $value;
$this->history[] = [
'action' => 'write',
'key' => $key,
'value' => $value,
'timestamp' => time()
];
}
public function updateStatus(string $status): void
{
$this->status = $status;
}
public function getStatus(): string
{
return $this->status;
}
}
class KnowledgeAgent
{
private string $name;
private string $expertise;
private Blackboard $blackboard;
public function __construct(string $name, string $expertise, Blackboard $blackboard)
{
$this->name = $name;
$this->expertise = $expertise;
$this->blackboard = $blackboard;
}
public function canContribute(): bool
{
$currentState = $this->blackboard->read();
return $this->expertiseInNeeds($currentState);
}
public function contribute(): void
{
if ($this->canContribute()) {
$result = $this->process();
$this->blackboard->write($this->expertise, $result);
}
}
private function expertiseInNeeds(array $state): bool
{
return !isset($state[$this->expertise]);
}
private function process(): mixed
{
return "Result from {$this->name}";
}
}Agent通信机制
消息传递
Agent之间通过消息进行通信,消息包含发送者、接收者、内容和元数据。
php
<?php
enum MessageType: string
{
case REQUEST = 'request';
case RESPONSE = 'response';
case INFORM = 'inform';
case QUERY = 'query';
case COMMAND = 'command';
}
class AgentMessage
{
public string $sender;
public string $receiver;
public MessageType $messageType;
public mixed $content;
public ?string $conversationId = null;
public \DateTime $timestamp;
public function __construct(
string $sender,
string $receiver,
MessageType $messageType,
mixed $content,
?string $conversationId = null
) {
$this->sender = $sender;
$this->receiver = $receiver;
$this->messageType = $messageType;
$this->content = $content;
$this->conversationId = $conversationId;
$this->timestamp = new \DateTime();
}
}
class MessageBus
{
private array $queues = [];
private array $handlers = [];
public function register(string $agentId, callable $handler): void
{
$this->queues[$agentId] = [];
$this->handlers[$agentId] = $handler;
}
public function send(AgentMessage $message): void
{
if (isset($this->queues[$message->receiver])) {
$this->queues[$message->receiver][] = $message;
}
}
public function broadcast(AgentMessage $message, array $exclude = []): void
{
foreach ($this->queues as $agentId => $queue) {
if (!in_array($agentId, $exclude) && $agentId !== $message->sender) {
$this->send(new AgentMessage(
$message->sender,
$agentId,
$message->messageType,
$message->content
));
}
}
}
public function processMessages(): void
{
foreach ($this->queues as $agentId => &$queue) {
while (!empty($queue)) {
$message = array_shift($queue);
call_user_func($this->handlers[$agentId], $message);
}
}
}
}通信协议
标准通信协议:
┌─────────────────────────────────────────┐
│ 字段 │ 说明 │
├─────────────────────────────────────────┤
│ sender │ 发送者ID │
│ receiver │ 接收者ID │
│ type │ 消息类型 │
│ content │ 消息内容 │
│ conversation │ 会话ID(用于追踪对话) │
│ timestamp │ 时间戳 │
│ priority │ 优先级 │
│ ttl │ 消息有效期 │
└─────────────────────────────────────────┘同步与异步通信
php
<?php
class SynchronousCommunication
{
private MessageBus $bus;
public function __construct(MessageBus $bus)
{
$this->bus = $bus;
}
public function request(
string $sender,
string $receiver,
mixed $content,
float $timeout = 30.0
): mixed {
$conversationId = uniqid('conv_', true);
$message = new AgentMessage(
$sender,
$receiver,
MessageType::REQUEST,
$content,
$conversationId
);
$this->bus->send($message);
$startTime = microtime(true);
while (microtime(true) - $startTime < $timeout) {
$response = $this->checkResponse($conversationId);
if ($response !== null) {
return $response->content;
}
usleep(100000);
}
throw new \RuntimeException('Request timed out');
}
private function checkResponse(string $conversationId): ?AgentMessage
{
return null;
}
}
class AsynchronousCommunication
{
private MessageBus $bus;
private array $callbacks = [];
public function __construct(MessageBus $bus)
{
$this->bus = $bus;
}
public function requestAsync(
string $sender,
string $receiver,
mixed $content,
callable $callback
): string {
$conversationId = uniqid('conv_', true);
$this->callbacks[$conversationId] = $callback;
$message = new AgentMessage(
$sender,
$receiver,
MessageType::REQUEST,
$content,
$conversationId
);
$this->bus->send($message);
return $conversationId;
}
public function handleResponse(AgentMessage $response): void
{
if (isset($this->callbacks[$response->conversationId])) {
$callback = $this->callbacks[$response->conversationId];
unset($this->callbacks[$response->conversationId]);
$callback($response);
}
}
}任务协调策略
任务分解
将复杂任务分解为可管理的子任务。
php
<?php
class TaskDecomposer
{
private $llm;
public function __construct($llm)
{
$this->llm = $llm;
}
public function decompose(string $task, array $availableAgents): array
{
$prompt = <<<PROMPT
任务:{$task}
可用的Agent及其能力:
{$this->formatAgents($availableAgents)}
请将任务分解为子任务,每个子任务指定:
1. 子任务描述
2. 负责的Agent类型
3. 依赖的其他子任务
4. 预期输出
以JSON格式输出。
PROMPT;
$response = $this->llm->generate($prompt);
return $this->parseDecomposition($response);
}
private function formatAgents(array $agents): string
{
$lines = [];
foreach ($agents as $agent) {
$lines[] = "- {$agent['type']}: {$agent['capabilities']}";
}
return implode("\n", $lines);
}
private function parseDecomposition(string $response): array
{
$decomposition = json_decode($response, true);
return array_map(function ($task) {
return [
'id' => $task['id'],
'description' => $task['description'],
'agent_type' => $task['agent_type'],
'dependencies' => $task['dependencies'] ?? [],
'expected_output' => $task['expected_output']
];
}, $decomposition['subtasks'] ?? []);
}
}任务调度
php
<?php
class TaskScheduler
{
private array $taskQueue = [];
private array $completed = [];
private array $results = [];
public function addTasks(array $tasks): void
{
foreach ($tasks as $task) {
$this->taskQueue[] = $task;
}
}
public function getReadyTasks(): array
{
$ready = [];
foreach ($this->taskQueue as $task) {
if ($this->dependenciesMet($task)) {
$ready[] = $task;
}
}
return $ready;
}
public function dependenciesMet(array $task): bool
{
foreach ($task['dependencies'] ?? [] as $dep) {
if (!in_array($dep, $this->completed)) {
return false;
}
}
return true;
}
public function markCompleted(string $taskId, mixed $result): void
{
$this->completed[] = $taskId;
$this->results[$taskId] = $result;
}
public function executeParallel(array $agents): void
{
while (!empty($this->taskQueue)) {
$readyTasks = $this->getReadyTasks();
if (empty($readyTasks)) {
if (!empty($this->taskQueue)) {
usleep(100000);
continue;
}
break;
}
$futures = [];
foreach ($readyTasks as $key => $task) {
$index = array_search($task, $this->taskQueue, true);
if ($index !== false) {
array_splice($this->taskQueue, $index, 1);
}
$agent = $agents[$task['agent_type']] ?? null;
if ($agent !== null) {
$futures[$key] = $agent->execute($task);
}
}
foreach ($futures as $key => $result) {
$this->markCompleted($readyTasks[$key]['id'], $result);
}
}
}
}冲突解决
php
<?php
class ConflictResolver
{
private string $strategy;
public function __construct(string $strategy = 'voting')
{
$this->strategy = $strategy;
}
public function resolve(array $results): mixed
{
return match ($this->strategy) {
'voting' => $this->votingResolve($results),
'priority' => $this->priorityResolve($results),
'merge' => $this->mergeResolve($results),
default => $results[0] ?? null
};
}
private function votingResolve(array $results): mixed
{
if (empty($results)) {
return null;
}
if (is_string($results[0])) {
$counts = array_count_values($results);
arsort($counts);
return array_key_first($counts);
}
return $results[0];
}
private function priorityResolve(array $results): mixed
{
if (empty($results)) {
return null;
}
usort($results, function ($a, $b) {
$priorityA = $a['priority'] ?? 0;
$priorityB = $b['priority'] ?? 0;
return $priorityB <=> $priorityA;
});
return $results[0];
}
private function mergeResolve(array $results): mixed
{
if (empty($results)) {
return null;
}
$merged = [];
foreach ($results as $r) {
if (is_array($r)) {
$merged = array_merge($merged, $r);
}
}
return $merged;
}
}协作模式实例
研究团队协作
php
<?php
class ResearchTeam
{
private $llm;
private array $agents = [];
private MessageBus $messageBus;
public function __construct($llm)
{
$this->llm = $llm;
$this->messageBus = new MessageBus();
$this->agents = [
'researcher' => new ResearcherAgent($llm),
'analyst' => new AnalystAgent($llm),
'writer' => new WriterAgent($llm),
'reviewer' => new ReviewerAgent($llm)
];
foreach ($this->agents as $name => $agent) {
$this->messageBus->register($name, [$agent, 'handleMessage']);
}
}
public function conductResearch(string $topic): string
{
$workflow = [
['researcher', 'gather_information', ['topic' => $topic]],
['analyst', 'analyze_data', []],
['writer', 'write_report', []],
['reviewer', 'review_report', []]
];
$context = ['topic' => $topic];
foreach ($workflow as [$agentName, $task, $params]) {
$agent = $this->agents[$agentName];
$params = array_merge($params, $context);
$result = $agent->execute($task, $params);
$context = array_merge($context, $result);
}
return $context['final_report'] ?? '';
}
}
class ResearcherAgent
{
private $llm;
public function __construct($llm)
{
$this->llm = $llm;
}
public function execute(string $task, array $params): array
{
if ($task === 'gather_information') {
return $this->gatherInformation($params['topic']);
}
return [];
}
private function gatherInformation(string $topic): array
{
return [
'raw_data' => "关于{$topic}的收集数据...",
'sources' => ['source1', 'source2']
];
}
public function handleMessage(AgentMessage $message): void
{
// 处理消息
}
}软件开发协作
php
<?php
class DevTeam
{
private $llm;
private Blackboard $blackboard;
private array $agents = [];
public function __construct($llm)
{
$this->llm = $llm;
$this->blackboard = new Blackboard();
$this->agents = [
'architect' => new ArchitectAgent($llm, $this->blackboard),
'developer' => new DeveloperAgent($llm, $this->blackboard),
'tester' => new TesterAgent($llm, $this->blackboard),
'reviewer' => new CodeReviewerAgent($llm, $this->blackboard)
];
}
public function developFeature(string $requirements): array
{
$this->blackboard->write('requirements', $requirements);
$this->blackboard->write('status', 'started');
$maxIterations = 20;
for ($iteration = 0; $iteration < $maxIterations; $iteration++) {
foreach ($this->agents as $agent) {
if ($agent->canContribute()) {
$agent->contribute();
}
}
if ($this->blackboard->getStatus() === 'completed') {
break;
}
}
return [
'code' => $this->blackboard->read('final_code'),
'tests' => $this->blackboard->read('test_results'),
'review' => $this->blackboard->read('review_comments')
];
}
}性能优化
并行执行优化
php
<?php
use React\Promise\Promise;
use React\EventLoop\Loop;
class ParallelCoordinator
{
private int $maxWorkers;
public function __construct(int $maxWorkers = 4)
{
$this->maxWorkers = $maxWorkers;
}
public function executeParallel(array $agents, array $tasks): array
{
$results = [];
$chunks = array_chunk(array_map(null, $agents, $tasks), $this->maxWorkers);
foreach ($chunks as $chunk) {
$promises = [];
foreach ($chunk as [$agent, $task]) {
$promises[] = $this->executeAsync($agent, $task);
}
foreach ($promises as $promise) {
$results[] = $promise->wait();
}
}
return $results;
}
private function executeAsync($agent, $task): Promise
{
return new Promise(function ($resolve) use ($agent, $task) {
Loop::addTimer(0, function () use ($agent, $task, $resolve) {
$result = $agent->execute($task);
$resolve($result);
});
});
}
}资源管理
php
<?php
class ResourceManager
{
private int $maxConcurrent;
private int $currentCount = 0;
private array $resourcePool = [];
private array $waitQueue = [];
public function __construct(int $maxConcurrent = 10)
{
$this->maxConcurrent = $maxConcurrent;
}
public function acquire(string $resourceType): mixed
{
while ($this->currentCount >= $this->maxConcurrent) {
usleep(10000);
}
$this->currentCount++;
return $this->resourcePool[$resourceType] ?? null;
}
public function release(string $resourceType): void
{
$this->currentCount = max(0, $this->currentCount - 1);
}
public function executeWithResource($agent, $task, string $resourceType): mixed
{
$resource = $this->acquire($resourceType);
try {
return $agent->execute($task, $resource);
} finally {
$this->release($resourceType);
}
}
}监控与调试
协作追踪
php
<?php
class CollaborationTracker
{
private array $events = [];
private array $agentStates = [];
public function logEvent(string $eventType, string $agent, array $details): void
{
$this->events[] = [
'timestamp' => new \DateTime(),
'type' => $eventType,
'agent' => $agent,
'details' => $details
];
}
public function logMessage(string $sender, string $receiver, string $content): void
{
$this->logEvent('message', $sender, [
'receiver' => $receiver,
'content' => mb_substr($content, 0, 100)
]);
}
public function logTaskStart(string $agent, string $task): void
{
$this->logEvent('task_start', $agent, ['task' => $task]);
}
public function logTaskComplete(string $agent, string $task, string $result): void
{
$this->logEvent('task_complete', $agent, [
'task' => $task,
'result' => mb_substr($result, 0, 100)
]);
}
public function getAgentTimeline(string $agent): array
{
return array_filter($this->events, fn($e) => $e['agent'] === $agent);
}
public function getCollaborationGraph(): array
{
$graph = [];
foreach ($this->events as $event) {
if ($event['type'] === 'message') {
$sender = $event['agent'];
$receiver = $event['details']['receiver'];
if (!isset($graph[$sender])) {
$graph[$sender] = [];
}
if (!isset($graph[$sender][$receiver])) {
$graph[$sender][$receiver] = 0;
}
$graph[$sender][$receiver]++;
}
}
return $graph;
}
}最佳实践
设计原则
1. 清晰的角色定义
────────────────────────────
每个Agent应该有明确的职责边界:
• 研究Agent:负责信息收集
• 分析Agent:负责数据处理
• 写作Agent:负责内容生成
2. 最小化通信
────────────────────────────
减少不必要的消息传递:
• 只在必要时通信
• 批量发送消息
• 使用黑板共享状态
3. 容错设计
────────────────────────────
处理Agent失败的情况:
• 超时机制
• 重试策略
• 备用Agent
4. 可观测性
────────────────────────────
能够追踪协作过程:
• 详细日志
• 状态监控
• 性能指标常见陷阱
避免这些陷阱
- 过度协作:简单任务不需要多Agent
- 通信爆炸:消息过多导致性能下降
- 死锁:循环依赖导致任务无法推进
- 状态不一致:共享状态管理混乱
- 忽略失败:没有处理Agent失败的情况
学习检验
概念理解
- 多Agent协作的四种主要模式各有什么特点?
- 如何设计Agent之间的通信机制?
- 任务调度器如何处理任务依赖关系?
实践任务
- 设计一个三人研究团队的协作系统
- 实现一个基于黑板的协作模式
- 添加协作追踪和可视化功能
下一步学习
💡 记住:多Agent协作的关键是清晰的分工、高效的通信和可靠的协调。
