Appearance
事件驱动架构
概述
事件驱动架构(Event-Driven Architecture,EDA)是一种以事件为核心的设计模式,系统组件通过产生、检测和消费事件进行通信。RabbitMQ 作为事件总线,实现服务间的松耦合通信,提升系统的可扩展性和响应能力。
业务背景与需求
场景描述
某电商平台采用微服务架构,服务间通信需求:
| 服务 | 产生事件 | 消费事件 |
|---|---|---|
| 用户服务 | 用户注册、信息变更 | 订单创建、积分变更 |
| 订单服务 | 订单创建、状态变更 | 用户验证、库存扣减 |
| 支付服务 | 支付成功、退款 | 订单状态更新 |
| 库存服务 | 库存预警、补货 | 订单创建、退货 |
| 通知服务 | - | 所有业务事件 |
需求分析
| 需求项 | 描述 |
|---|---|
| 松耦合 | 服务间无直接依赖 |
| 异步通信 | 非阻塞式消息传递 |
| 事件溯源 | 支持事件回放和审计 |
| 可扩展 | 新服务可随时订阅事件 |
| 可靠性 | 事件不丢失,保证送达 |
架构设计
整体架构图
mermaid
graph TB
subgraph "事件生产者"
A[用户服务]
B[订单服务]
C[支付服务]
D[库存服务]
end
subgraph "事件总线 - RabbitMQ"
E[事件交换机<br/>event.exchange]
subgraph "事件通道"
F[用户事件通道<br/>user.events]
G[订单事件通道<br/>order.events]
H[支付事件通道<br/>payment.events]
I[库存事件通道<br/>inventory.events]
end
J[事件存储<br/>event.store]
end
subgraph "事件消费者"
K[通知服务]
L[分析服务]
M[搜索服务]
N[审计服务]
end
subgraph "事件管理"
O[事件注册表]
P[Schema注册]
Q[事件监控]
end
A --> E
B --> E
C --> E
D --> E
E --> F
E --> G
E --> H
E --> I
F --> J
G --> J
H --> J
I --> J
F --> K
F --> L
G --> K
G --> M
H --> K
I --> K
I --> N
O --> E
P --> E
Q --> J事件流转流程
mermaid
sequenceDiagram
participant Service as 业务服务
participant EventBus as 事件总线
participant Store as 事件存储
participant Consumer as 事件消费者
participant Monitor as 监控系统
Service->>Service: 执行业务逻辑
Service->>EventBus: 发布领域事件
EventBus->>Store: 持久化事件
EventBus->>EventBus: 路由到订阅者
par 并行投递
EventBus->>Consumer: 投递给消费者A
EventBus->>Consumer: 投递给消费者B
end
Consumer->>Consumer: 处理事件
Consumer-->>EventBus: ACK确认
EventBus->>Monitor: 记录事件指标
alt 处理失败
Consumer-->>EventBus: NACK
EventBus->>EventBus: 重试或进入死信
end事件生命周期
mermaid
stateDiagram-v2
[*] --> created: 创建事件
created --> published: 发布到总线
published --> persisted: 持久化
persisted --> routed: 路由分发
routed --> delivered: 投递消费者
delivered --> processing: 开始处理
processing --> completed: 处理成功
processing --> failed: 处理失败
failed --> retrying: 重试中
retrying --> delivered: 重新投递
retrying --> dead_lettered: 重试耗尽
completed --> [*]
dead_lettered --> [*]
note right of created: 事件生成
note right of published: 进入总线
note right of delivered: 到达消费者
note right of completed: 处理完成PHP 代码实现
事件基类定义
php
<?php
namespace App\Event;
abstract class DomainEvent
{
public string $eventId;
public string $eventType;
public string $aggregateType;
public string $aggregateId;
public int $version;
public int $timestamp;
public array $metadata;
public function __construct(
string $aggregateType,
string $aggregateId,
int $version = 1
) {
$this->eventId = $this->generateEventId();
$this->eventType = $this->getEventType();
$this->aggregateType = $aggregateType;
$this->aggregateId = $aggregateId;
$this->version = $version;
$this->timestamp = time();
$this->metadata = [
'source' => get_class($this),
'correlation_id' => $this->generateCorrelationId(),
];
}
private function generateEventId(): string
{
return sprintf('evt_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
}
private function generateCorrelationId(): string
{
return bin2hex(random_bytes(16));
}
abstract protected function getEventType(): string;
abstract public function getPayload(): array;
public function addMetadata(string $key, $value): self
{
$this->metadata[$key] = $value;
return $this;
}
public function toArray(): array
{
return [
'event_id' => $this->eventId,
'event_type' => $this->eventType,
'aggregate_type' => $this->aggregateType,
'aggregate_id' => $this->aggregateId,
'version' => $this->version,
'timestamp' => $this->timestamp,
'metadata' => $this->metadata,
'payload' => $this->getPayload(),
];
}
public function toJson(): string
{
return json_encode($this->toArray(), JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
}
}具体事件定义
php
<?php
namespace App\Event;
class UserRegisteredEvent extends DomainEvent
{
private string $userId;
private string $email;
private string $name;
public function __construct(string $userId, string $email, string $name)
{
parent::__construct('user', $userId, 1);
$this->userId = $userId;
$this->email = $email;
$this->name = $name;
}
protected function getEventType(): string
{
return 'user.registered';
}
public function getPayload(): array
{
return [
'user_id' => $this->userId,
'email' => $this->email,
'name' => $this->name,
];
}
}
class OrderCreatedEvent extends DomainEvent
{
private string $orderId;
private string $userId;
private array $items;
private float $totalAmount;
public function __construct(
string $orderId,
string $userId,
array $items,
float $totalAmount,
int $version = 1
) {
parent::__construct('order', $orderId, $version);
$this->orderId = $orderId;
$this->userId = $userId;
$this->items = $items;
$this->totalAmount = $totalAmount;
}
protected function getEventType(): string
{
return 'order.created';
}
public function getPayload(): array
{
return [
'order_id' => $this->orderId,
'user_id' => $this->userId,
'items' => $this->items,
'total_amount' => $this->totalAmount,
];
}
}
class PaymentCompletedEvent extends DomainEvent
{
private string $paymentId;
private string $orderId;
private float $amount;
private string $paymentMethod;
public function __construct(
string $paymentId,
string $orderId,
float $amount,
string $paymentMethod
) {
parent::__construct('payment', $paymentId, 1);
$this->paymentId = $paymentId;
$this->orderId = $orderId;
$this->amount = $amount;
$this->paymentMethod = $paymentMethod;
}
protected function getEventType(): string
{
return 'payment.completed';
}
public function getPayload(): array
{
return [
'payment_id' => $this->paymentId,
'order_id' => $this->orderId,
'amount' => $this->amount,
'payment_method' => $this->paymentMethod,
];
}
}
class InventoryDeductedEvent extends DomainEvent
{
private string $productId;
private string $skuId;
private int $quantity;
private int $remainingStock;
public function __construct(
string $productId,
string $skuId,
int $quantity,
int $remainingStock
) {
parent::__construct('inventory', $productId, 1);
$this->productId = $productId;
$this->skuId = $skuId;
$this->quantity = $quantity;
$this->remainingStock = $remainingStock;
}
protected function getEventType(): string
{
return 'inventory.deducted';
}
public function getPayload(): array
{
return [
'product_id' => $this->productId,
'sku_id' => $this->skuId,
'quantity' => $this->quantity,
'remaining_stock' => $this->remainingStock,
];
}
}事件总线实现
php
<?php
namespace App\Event;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class EventBus
{
private AMQPStreamConnection $connection;
private $channel;
private EventStore $eventStore;
private string $exchangeName = 'event.exchange';
private array $eventRouting = [];
public function __construct(
AMQPStreamConnection $connection,
EventStore $eventStore
) {
$this->connection = $connection;
$this->channel = $connection->channel();
$this->eventStore = $eventStore;
$this->setupInfrastructure();
}
private function setupInfrastructure(): void
{
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::TOPIC,
false,
true,
false
);
$this->setupEventChannels();
}
private function setupEventChannels(): void
{
$channels = [
'user.events' => ['user.*'],
'order.events' => ['order.*'],
'payment.events' => ['payment.*'],
'inventory.events' => ['inventory.*'],
'all.events' => ['#'],
];
foreach ($channels as $queueName => $routingKeys) {
$args = [
'x-message-ttl' => ['I', 604800000],
'x-max-length' => ['I', 1000000],
];
$this->channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
$args
);
foreach ($routingKeys as $routingKey) {
$this->channel->queue_bind(
$queueName,
$this->exchangeName,
$routingKey
);
}
}
}
public function publish(DomainEvent $event): bool
{
$this->eventStore->append($event);
$routingKey = $this->getRoutingKey($event);
$message = new AMQPMessage(
$event->toJson(),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => $event->eventId,
'timestamp' => $event->timestamp,
'type' => $event->eventType,
'headers' => [
'aggregate_type' => $event->aggregateType,
'aggregate_id' => $event->aggregateId,
'version' => $event->version,
'correlation_id' => $event->metadata['correlation_id'] ?? '',
],
]
);
$this->channel->basic_publish(
$message,
$this->exchangeName,
$routingKey
);
return true;
}
public function publishBatch(array $events): int
{
$count = 0;
foreach ($events as $event) {
if ($event instanceof DomainEvent) {
$this->publish($event);
$count++;
}
}
return $count;
}
private function getRoutingKey(DomainEvent $event): string
{
return sprintf('%s.%s', $event->aggregateType, $event->eventType);
}
public function subscribe(
string $queueName,
string $eventType,
callable $handler
): void {
$this->eventRouting[$queueName][$eventType] = $handler;
}
public function close(): void
{
if ($this->channel) {
$this->channel->close();
}
}
}事件存储实现
php
<?php
namespace App\Event;
class EventStore
{
private $pdo;
public function __construct(\PDO $pdo)
{
$this->pdo = $pdo;
$this->initTable();
}
private function initTable(): void
{
$sql = "
CREATE TABLE IF NOT EXISTS event_store (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
event_id VARCHAR(64) NOT NULL UNIQUE,
event_type VARCHAR(128) NOT NULL,
aggregate_type VARCHAR(64) NOT NULL,
aggregate_id VARCHAR(64) NOT NULL,
version INT NOT NULL,
payload JSON NOT NULL,
metadata JSON,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_aggregate (aggregate_type, aggregate_id),
INDEX idx_event_type (event_type),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
";
$this->pdo->exec($sql);
}
public function append(DomainEvent $event): bool
{
$sql = "
INSERT INTO event_store
(event_id, event_type, aggregate_type, aggregate_id, version, payload, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?)
";
$stmt = $this->pdo->prepare($sql);
return $stmt->execute([
$event->eventId,
$event->eventType,
$event->aggregateType,
$event->aggregateId,
$event->version,
json_encode($event->getPayload()),
json_encode($event->metadata),
]);
}
public function getEvents(string $aggregateType, string $aggregateId): array
{
$sql = "
SELECT * FROM event_store
WHERE aggregate_type = ? AND aggregate_id = ?
ORDER BY version ASC
";
$stmt = $this->pdo->prepare($sql);
$stmt->execute([$aggregateType, $aggregateId]);
return $stmt->fetchAll(\PDO::FETCH_ASSOC);
}
public function getEventsByType(string $eventType, int $limit = 100): array
{
$sql = "
SELECT * FROM event_store
WHERE event_type = ?
ORDER BY created_at DESC
LIMIT ?
";
$stmt = $this->pdo->prepare($sql);
$stmt->execute([$eventType, $limit]);
return $stmt->fetchAll(\PDO::FETCH_ASSOC);
}
public function getEventsAfter(int $timestamp, int $limit = 1000): array
{
$sql = "
SELECT * FROM event_store
WHERE created_at > FROM_UNIXTIME(?)
ORDER BY created_at ASC
LIMIT ?
";
$stmt = $this->pdo->prepare($sql);
$stmt->execute([$timestamp, $limit]);
return $stmt->fetchAll(\PDO::FETCH_ASSOC);
}
}事件处理器实现
php
<?php
namespace App\Event;
abstract class EventHandler
{
protected array $supportedEvents = [];
abstract public function handle(DomainEvent $event): void;
public function supports(string $eventType): bool
{
return in_array($eventType, $this->supportedEvents);
}
public function getSupportedEvents(): array
{
return $this->supportedEvents;
}
}
class NotificationEventHandler extends EventHandler
{
protected array $supportedEvents = [
'user.registered',
'order.created',
'payment.completed',
];
private NotificationService $notificationService;
public function __construct(NotificationService $notificationService)
{
$this->notificationService = $notificationService;
}
public function handle(DomainEvent $event): void
{
switch ($event->eventType) {
case 'user.registered':
$this->handleUserRegistered($event);
break;
case 'order.created':
$this->handleOrderCreated($event);
break;
case 'payment.completed':
$this->handlePaymentCompleted($event);
break;
}
}
private function handleUserRegistered(DomainEvent $event): void
{
$payload = $event->getPayload();
$this->notificationService->send([
'type' => 'email',
'to' => $payload['email'],
'template' => 'welcome',
'data' => ['name' => $payload['name']],
]);
}
private function handleOrderCreated(DomainEvent $event): void
{
$payload = $event->getPayload();
$this->notificationService->send([
'type' => 'sms',
'to' => $payload['user_id'],
'template' => 'order_created',
'data' => ['order_id' => $payload['order_id']],
]);
}
private function handlePaymentCompleted(DomainEvent $event): void
{
$payload = $event->getPayload();
$this->notificationService->send([
'type' => 'push',
'to' => $payload['order_id'],
'template' => 'payment_success',
'data' => ['amount' => $payload['amount']],
]);
}
}
class AnalyticsEventHandler extends EventHandler
{
protected array $supportedEvents = ['#'];
private AnalyticsService $analyticsService;
public function __construct(AnalyticsService $analyticsService)
{
$this->analyticsService = $analyticsService;
}
public function handle(DomainEvent $event): void
{
$this->analyticsService->track([
'event_id' => $event->eventId,
'event_type' => $event->eventType,
'aggregate_type' => $event->aggregateType,
'aggregate_id' => $event->aggregateId,
'timestamp' => $event->timestamp,
'payload' => $event->getPayload(),
]);
}
}事件消费者实现
php
<?php
namespace App\Event;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class EventConsumer
{
private AMQPStreamConnection $connection;
private $channel;
private array $handlers = [];
private bool $running = true;
public function __construct(AMQPStreamConnection $connection)
{
$this->connection = $connection;
$this->channel = $connection->channel();
}
public function registerHandler(EventHandler $handler): void
{
foreach ($handler->getSupportedEvents() as $eventType) {
$this->handlers[$eventType][] = $handler;
}
}
public function consume(string $queueName, int $prefetchCount = 10): void
{
$this->channel->basic_qos(null, $prefetchCount, null);
$this->channel->basic_consume(
$queueName,
'',
false,
false,
false,
false,
[$this, 'processMessage']
);
while ($this->running && count($this->channel->callbacks)) {
$this->channel->wait(null, true);
if (!$this->running) break;
usleep(100000);
}
}
public function processMessage(AMQPMessage $message): void
{
$eventData = json_decode($message->body, true);
$eventType = $eventData['event_type'];
try {
$event = $this->reconstructEvent($eventData);
$handlers = $this->getHandlers($eventType);
foreach ($handlers as $handler) {
$handler->handle($event);
}
$message->ack();
} catch (\Exception $e) {
$this->handleError($message, $eventData, $e);
}
}
private function reconstructEvent(array $data): DomainEvent
{
$eventClass = $this->getEventClass($data['event_type']);
if (!class_exists($eventClass)) {
throw new \RuntimeException("Unknown event type: {$data['event_type']}");
}
return $eventClass::fromArray($data);
}
private function getEventClass(string $eventType): string
{
$map = [
'user.registered' => UserRegisteredEvent::class,
'order.created' => OrderCreatedEvent::class,
'payment.completed' => PaymentCompletedEvent::class,
'inventory.deducted' => InventoryDeductedEvent::class,
];
return $map[$eventType] ?? '';
}
private function getHandlers(string $eventType): array
{
$handlers = [];
if (isset($this->handlers[$eventType])) {
$handlers = array_merge($handlers, $this->handlers[$eventType]);
}
if (isset($this->handlers['#'])) {
$handlers = array_merge($handlers, $this->handlers['#']);
}
return $handlers;
}
private function handleError(AMQPMessage $message, array $eventData, \Exception $e): void
{
error_log(sprintf(
'[EventConsumer] Error processing event %s: %s',
$eventData['event_id'],
$e->getMessage()
));
$message->nack(false, true);
}
public function stop(): void
{
$this->running = false;
}
public function close(): void
{
$this->stop();
if ($this->channel) {
$this->channel->close();
}
}
}完整使用示例
php
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Event\{
DomainEvent,
EventBus,
EventStore,
EventConsumer,
UserRegisteredEvent,
OrderCreatedEvent,
PaymentCompletedEvent,
NotificationEventHandler,
AnalyticsEventHandler
};
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$pdo = new PDO('mysql:host=localhost;dbname=event_store', 'root', 'password');
$eventStore = new EventStore($pdo);
$eventBus = new EventBus($connection, $eventStore);
$userEvent = new UserRegisteredEvent(
'user_001',
'user@example.com',
'张三'
);
$eventBus->publish($userEvent);
echo "发布用户注册事件: {$userEvent->eventId}\n";
$orderEvent = new OrderCreatedEvent(
'order_001',
'user_001',
[['product_id' => 'P001', 'quantity' => 2, 'price' => 99.00]],
198.00
);
$eventBus->publish($orderEvent);
echo "发布订单创建事件: {$orderEvent->eventId}\n";
$paymentEvent = new PaymentCompletedEvent(
'payment_001',
'order_001',
198.00,
'alipay'
);
$eventBus->publish($paymentEvent);
echo "发布支付完成事件: {$paymentEvent->eventId}\n";
$notificationHandler = new NotificationEventHandler(new NotificationService());
$analyticsHandler = new AnalyticsEventHandler(new AnalyticsService());
$consumer = new EventConsumer($connection);
$consumer->registerHandler($notificationHandler);
$consumer->registerHandler($analyticsHandler);
echo "\n开始消费事件...\n";
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use ($consumer) {
echo "收到终止信号\n";
$consumer->stop();
});
pcntl_signal(SIGINT, function () use ($consumer) {
echo "收到中断信号\n";
$consumer->stop();
});
$consumer->consume('all.events');
$eventBus->close();
$consumer->close();
$connection->close();关键技术点解析
1. 事件溯源
所有事件持久化存储,支持:
- 事件回放重建状态
- 审计追踪
- 调试分析
2. 事件版本控制
php
public function __construct(
string $aggregateType,
string $aggregateId,
int $version = 1
) {
$this->version = $version;
}3. 幂等性处理
php
public function handle(DomainEvent $event): void
{
if ($this->isProcessed($event->eventId)) {
return;
}
$this->doHandle($event);
$this->markProcessed($event->eventId);
}4. 事件关联
通过 correlation_id 追踪关联事件:
php
'metadata' => [
'correlation_id' => $this->generateCorrelationId(),
]性能优化建议
| 优化项 | 建议 | 说明 |
|---|---|---|
| 批量发布 | 合并事件批量发布 | 减少网络开销 |
| 异步处理 | 消费者异步处理 | 提高吞吐量 |
| 事件压缩 | 大事件压缩传输 | 减少带宽占用 |
| 分区存储 | 按聚合根分区存储 | 提高查询效率 |
常见问题与解决方案
1. 事件顺序
问题: 同一聚合的事件顺序错乱
解决方案: 使用版本号保证顺序,单消费者处理
2. 事件丢失
问题: 事件发布后丢失
解决方案: 事件存储 + 消息持久化 + 确认机制
3. 事件重复
问题: 事件被重复消费
解决方案: 幂等性处理,记录已处理事件ID
4. 事件膨胀
问题: 事件存储快速增长
解决方案: 定期归档,快照机制
