Skip to content

事件驱动架构

概述

事件驱动架构(Event-Driven Architecture,EDA)是一种以事件为核心的设计模式,系统组件通过产生、检测和消费事件进行通信。RabbitMQ 作为事件总线,实现服务间的松耦合通信,提升系统的可扩展性和响应能力。

业务背景与需求

场景描述

某电商平台采用微服务架构,服务间通信需求:

服务产生事件消费事件
用户服务用户注册、信息变更订单创建、积分变更
订单服务订单创建、状态变更用户验证、库存扣减
支付服务支付成功、退款订单状态更新
库存服务库存预警、补货订单创建、退货
通知服务-所有业务事件

需求分析

需求项描述
松耦合服务间无直接依赖
异步通信非阻塞式消息传递
事件溯源支持事件回放和审计
可扩展新服务可随时订阅事件
可靠性事件不丢失,保证送达

架构设计

整体架构图

mermaid
graph TB
    subgraph "事件生产者"
        A[用户服务]
        B[订单服务]
        C[支付服务]
        D[库存服务]
    end
    
    subgraph "事件总线 - RabbitMQ"
        E[事件交换机<br/>event.exchange]
        
        subgraph "事件通道"
            F[用户事件通道<br/>user.events]
            G[订单事件通道<br/>order.events]
            H[支付事件通道<br/>payment.events]
            I[库存事件通道<br/>inventory.events]
        end
        
        J[事件存储<br/>event.store]
    end
    
    subgraph "事件消费者"
        K[通知服务]
        L[分析服务]
        M[搜索服务]
        N[审计服务]
    end
    
    subgraph "事件管理"
        O[事件注册表]
        P[Schema注册]
        Q[事件监控]
    end
    
    A --> E
    B --> E
    C --> E
    D --> E
    
    E --> F
    E --> G
    E --> H
    E --> I
    
    F --> J
    G --> J
    H --> J
    I --> J
    
    F --> K
    F --> L
    G --> K
    G --> M
    H --> K
    I --> K
    I --> N
    
    O --> E
    P --> E
    Q --> J

事件流转流程

mermaid
sequenceDiagram
    participant Service as 业务服务
    participant EventBus as 事件总线
    participant Store as 事件存储
    participant Consumer as 事件消费者
    participant Monitor as 监控系统
    
    Service->>Service: 执行业务逻辑
    Service->>EventBus: 发布领域事件
    
    EventBus->>Store: 持久化事件
    EventBus->>EventBus: 路由到订阅者
    
    par 并行投递
        EventBus->>Consumer: 投递给消费者A
        EventBus->>Consumer: 投递给消费者B
    end
    
    Consumer->>Consumer: 处理事件
    Consumer-->>EventBus: ACK确认
    
    EventBus->>Monitor: 记录事件指标
    
    alt 处理失败
        Consumer-->>EventBus: NACK
        EventBus->>EventBus: 重试或进入死信
    end

事件生命周期

mermaid
stateDiagram-v2
    [*] --> created: 创建事件
    created --> published: 发布到总线
    published --> persisted: 持久化
    persisted --> routed: 路由分发
    routed --> delivered: 投递消费者
    delivered --> processing: 开始处理
    processing --> completed: 处理成功
    processing --> failed: 处理失败
    failed --> retrying: 重试中
    retrying --> delivered: 重新投递
    retrying --> dead_lettered: 重试耗尽
    completed --> [*]
    dead_lettered --> [*]
    
    note right of created: 事件生成
    note right of published: 进入总线
    note right of delivered: 到达消费者
    note right of completed: 处理完成

PHP 代码实现

事件基类定义

php
<?php

namespace App\Event;

abstract class DomainEvent
{
    public string $eventId;
    public string $eventType;
    public string $aggregateType;
    public string $aggregateId;
    public int $version;
    public int $timestamp;
    public array $metadata;

    public function __construct(
        string $aggregateType,
        string $aggregateId,
        int $version = 1
    ) {
        $this->eventId = $this->generateEventId();
        $this->eventType = $this->getEventType();
        $this->aggregateType = $aggregateType;
        $this->aggregateId = $aggregateId;
        $this->version = $version;
        $this->timestamp = time();
        $this->metadata = [
            'source' => get_class($this),
            'correlation_id' => $this->generateCorrelationId(),
        ];
    }

    private function generateEventId(): string
    {
        return sprintf('evt_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    private function generateCorrelationId(): string
    {
        return bin2hex(random_bytes(16));
    }

    abstract protected function getEventType(): string;

    abstract public function getPayload(): array;

    public function addMetadata(string $key, $value): self
    {
        $this->metadata[$key] = $value;
        return $this;
    }

    public function toArray(): array
    {
        return [
            'event_id' => $this->eventId,
            'event_type' => $this->eventType,
            'aggregate_type' => $this->aggregateType,
            'aggregate_id' => $this->aggregateId,
            'version' => $this->version,
            'timestamp' => $this->timestamp,
            'metadata' => $this->metadata,
            'payload' => $this->getPayload(),
        ];
    }

    public function toJson(): string
    {
        return json_encode($this->toArray(), JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
    }
}

具体事件定义

php
<?php

namespace App\Event;

class UserRegisteredEvent extends DomainEvent
{
    private string $userId;
    private string $email;
    private string $name;

    public function __construct(string $userId, string $email, string $name)
    {
        parent::__construct('user', $userId, 1);
        $this->userId = $userId;
        $this->email = $email;
        $this->name = $name;
    }

    protected function getEventType(): string
    {
        return 'user.registered';
    }

    public function getPayload(): array
    {
        return [
            'user_id' => $this->userId,
            'email' => $this->email,
            'name' => $this->name,
        ];
    }
}

class OrderCreatedEvent extends DomainEvent
{
    private string $orderId;
    private string $userId;
    private array $items;
    private float $totalAmount;

    public function __construct(
        string $orderId,
        string $userId,
        array $items,
        float $totalAmount,
        int $version = 1
    ) {
        parent::__construct('order', $orderId, $version);
        $this->orderId = $orderId;
        $this->userId = $userId;
        $this->items = $items;
        $this->totalAmount = $totalAmount;
    }

    protected function getEventType(): string
    {
        return 'order.created';
    }

    public function getPayload(): array
    {
        return [
            'order_id' => $this->orderId,
            'user_id' => $this->userId,
            'items' => $this->items,
            'total_amount' => $this->totalAmount,
        ];
    }
}

class PaymentCompletedEvent extends DomainEvent
{
    private string $paymentId;
    private string $orderId;
    private float $amount;
    private string $paymentMethod;

    public function __construct(
        string $paymentId,
        string $orderId,
        float $amount,
        string $paymentMethod
    ) {
        parent::__construct('payment', $paymentId, 1);
        $this->paymentId = $paymentId;
        $this->orderId = $orderId;
        $this->amount = $amount;
        $this->paymentMethod = $paymentMethod;
    }

    protected function getEventType(): string
    {
        return 'payment.completed';
    }

    public function getPayload(): array
    {
        return [
            'payment_id' => $this->paymentId,
            'order_id' => $this->orderId,
            'amount' => $this->amount,
            'payment_method' => $this->paymentMethod,
        ];
    }
}

class InventoryDeductedEvent extends DomainEvent
{
    private string $productId;
    private string $skuId;
    private int $quantity;
    private int $remainingStock;

    public function __construct(
        string $productId,
        string $skuId,
        int $quantity,
        int $remainingStock
    ) {
        parent::__construct('inventory', $productId, 1);
        $this->productId = $productId;
        $this->skuId = $skuId;
        $this->quantity = $quantity;
        $this->remainingStock = $remainingStock;
    }

    protected function getEventType(): string
    {
        return 'inventory.deducted';
    }

    public function getPayload(): array
    {
        return [
            'product_id' => $this->productId,
            'sku_id' => $this->skuId,
            'quantity' => $this->quantity,
            'remaining_stock' => $this->remainingStock,
        ];
    }
}

事件总线实现

php
<?php

namespace App\Event;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class EventBus
{
    private AMQPStreamConnection $connection;
    private $channel;
    private EventStore $eventStore;
    private string $exchangeName = 'event.exchange';
    private array $eventRouting = [];

    public function __construct(
        AMQPStreamConnection $connection,
        EventStore $eventStore
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->eventStore = $eventStore;
        $this->setupInfrastructure();
    }

    private function setupInfrastructure(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );

        $this->setupEventChannels();
    }

    private function setupEventChannels(): void
    {
        $channels = [
            'user.events' => ['user.*'],
            'order.events' => ['order.*'],
            'payment.events' => ['payment.*'],
            'inventory.events' => ['inventory.*'],
            'all.events' => ['#'],
        ];

        foreach ($channels as $queueName => $routingKeys) {
            $args = [
                'x-message-ttl' => ['I', 604800000],
                'x-max-length' => ['I', 1000000],
            ];

            $this->channel->queue_declare(
                $queueName,
                false,
                true,
                false,
                false,
                false,
                $args
            );

            foreach ($routingKeys as $routingKey) {
                $this->channel->queue_bind(
                    $queueName,
                    $this->exchangeName,
                    $routingKey
                );
            }
        }
    }

    public function publish(DomainEvent $event): bool
    {
        $this->eventStore->append($event);

        $routingKey = $this->getRoutingKey($event);

        $message = new AMQPMessage(
            $event->toJson(),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $event->eventId,
                'timestamp' => $event->timestamp,
                'type' => $event->eventType,
                'headers' => [
                    'aggregate_type' => $event->aggregateType,
                    'aggregate_id' => $event->aggregateId,
                    'version' => $event->version,
                    'correlation_id' => $event->metadata['correlation_id'] ?? '',
                ],
            ]
        );

        $this->channel->basic_publish(
            $message,
            $this->exchangeName,
            $routingKey
        );

        return true;
    }

    public function publishBatch(array $events): int
    {
        $count = 0;
        foreach ($events as $event) {
            if ($event instanceof DomainEvent) {
                $this->publish($event);
                $count++;
            }
        }
        return $count;
    }

    private function getRoutingKey(DomainEvent $event): string
    {
        return sprintf('%s.%s', $event->aggregateType, $event->eventType);
    }

    public function subscribe(
        string $queueName,
        string $eventType,
        callable $handler
    ): void {
        $this->eventRouting[$queueName][$eventType] = $handler;
    }

    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

事件存储实现

php
<?php

namespace App\Event;

class EventStore
{
    private $pdo;

    public function __construct(\PDO $pdo)
    {
        $this->pdo = $pdo;
        $this->initTable();
    }

    private function initTable(): void
    {
        $sql = "
            CREATE TABLE IF NOT EXISTS event_store (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                event_id VARCHAR(64) NOT NULL UNIQUE,
                event_type VARCHAR(128) NOT NULL,
                aggregate_type VARCHAR(64) NOT NULL,
                aggregate_id VARCHAR(64) NOT NULL,
                version INT NOT NULL,
                payload JSON NOT NULL,
                metadata JSON,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                INDEX idx_aggregate (aggregate_type, aggregate_id),
                INDEX idx_event_type (event_type),
                INDEX idx_created_at (created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ";
        $this->pdo->exec($sql);
    }

    public function append(DomainEvent $event): bool
    {
        $sql = "
            INSERT INTO event_store 
            (event_id, event_type, aggregate_type, aggregate_id, version, payload, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ";

        $stmt = $this->pdo->prepare($sql);
        return $stmt->execute([
            $event->eventId,
            $event->eventType,
            $event->aggregateType,
            $event->aggregateId,
            $event->version,
            json_encode($event->getPayload()),
            json_encode($event->metadata),
        ]);
    }

    public function getEvents(string $aggregateType, string $aggregateId): array
    {
        $sql = "
            SELECT * FROM event_store 
            WHERE aggregate_type = ? AND aggregate_id = ?
            ORDER BY version ASC
        ";

        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$aggregateType, $aggregateId]);

        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }

    public function getEventsByType(string $eventType, int $limit = 100): array
    {
        $sql = "
            SELECT * FROM event_store 
            WHERE event_type = ?
            ORDER BY created_at DESC
            LIMIT ?
        ";

        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$eventType, $limit]);

        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }

    public function getEventsAfter(int $timestamp, int $limit = 1000): array
    {
        $sql = "
            SELECT * FROM event_store 
            WHERE created_at > FROM_UNIXTIME(?)
            ORDER BY created_at ASC
            LIMIT ?
        ";

        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$timestamp, $limit]);

        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }
}

事件处理器实现

php
<?php

namespace App\Event;

abstract class EventHandler
{
    protected array $supportedEvents = [];

    abstract public function handle(DomainEvent $event): void;

    public function supports(string $eventType): bool
    {
        return in_array($eventType, $this->supportedEvents);
    }

    public function getSupportedEvents(): array
    {
        return $this->supportedEvents;
    }
}

class NotificationEventHandler extends EventHandler
{
    protected array $supportedEvents = [
        'user.registered',
        'order.created',
        'payment.completed',
    ];

    private NotificationService $notificationService;

    public function __construct(NotificationService $notificationService)
    {
        $this->notificationService = $notificationService;
    }

    public function handle(DomainEvent $event): void
    {
        switch ($event->eventType) {
            case 'user.registered':
                $this->handleUserRegistered($event);
                break;
            case 'order.created':
                $this->handleOrderCreated($event);
                break;
            case 'payment.completed':
                $this->handlePaymentCompleted($event);
                break;
        }
    }

    private function handleUserRegistered(DomainEvent $event): void
    {
        $payload = $event->getPayload();
        $this->notificationService->send([
            'type' => 'email',
            'to' => $payload['email'],
            'template' => 'welcome',
            'data' => ['name' => $payload['name']],
        ]);
    }

    private function handleOrderCreated(DomainEvent $event): void
    {
        $payload = $event->getPayload();
        $this->notificationService->send([
            'type' => 'sms',
            'to' => $payload['user_id'],
            'template' => 'order_created',
            'data' => ['order_id' => $payload['order_id']],
        ]);
    }

    private function handlePaymentCompleted(DomainEvent $event): void
    {
        $payload = $event->getPayload();
        $this->notificationService->send([
            'type' => 'push',
            'to' => $payload['order_id'],
            'template' => 'payment_success',
            'data' => ['amount' => $payload['amount']],
        ]);
    }
}

class AnalyticsEventHandler extends EventHandler
{
    protected array $supportedEvents = ['#'];

    private AnalyticsService $analyticsService;

    public function __construct(AnalyticsService $analyticsService)
    {
        $this->analyticsService = $analyticsService;
    }

    public function handle(DomainEvent $event): void
    {
        $this->analyticsService->track([
            'event_id' => $event->eventId,
            'event_type' => $event->eventType,
            'aggregate_type' => $event->aggregateType,
            'aggregate_id' => $event->aggregateId,
            'timestamp' => $event->timestamp,
            'payload' => $event->getPayload(),
        ]);
    }
}

事件消费者实现

php
<?php

namespace App\Event;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class EventConsumer
{
    private AMQPStreamConnection $connection;
    private $channel;
    private array $handlers = [];
    private bool $running = true;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
    }

    public function registerHandler(EventHandler $handler): void
    {
        foreach ($handler->getSupportedEvents() as $eventType) {
            $this->handlers[$eventType][] = $handler;
        }
    }

    public function consume(string $queueName, int $prefetchCount = 10): void
    {
        $this->channel->basic_qos(null, $prefetchCount, null);

        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            [$this, 'processMessage']
        );

        while ($this->running && count($this->channel->callbacks)) {
            $this->channel->wait(null, true);
            if (!$this->running) break;
            usleep(100000);
        }
    }

    public function processMessage(AMQPMessage $message): void
    {
        $eventData = json_decode($message->body, true);
        $eventType = $eventData['event_type'];

        try {
            $event = $this->reconstructEvent($eventData);

            $handlers = $this->getHandlers($eventType);

            foreach ($handlers as $handler) {
                $handler->handle($event);
            }

            $message->ack();

        } catch (\Exception $e) {
            $this->handleError($message, $eventData, $e);
        }
    }

    private function reconstructEvent(array $data): DomainEvent
    {
        $eventClass = $this->getEventClass($data['event_type']);
        
        if (!class_exists($eventClass)) {
            throw new \RuntimeException("Unknown event type: {$data['event_type']}");
        }

        return $eventClass::fromArray($data);
    }

    private function getEventClass(string $eventType): string
    {
        $map = [
            'user.registered' => UserRegisteredEvent::class,
            'order.created' => OrderCreatedEvent::class,
            'payment.completed' => PaymentCompletedEvent::class,
            'inventory.deducted' => InventoryDeductedEvent::class,
        ];

        return $map[$eventType] ?? '';
    }

    private function getHandlers(string $eventType): array
    {
        $handlers = [];

        if (isset($this->handlers[$eventType])) {
            $handlers = array_merge($handlers, $this->handlers[$eventType]);
        }

        if (isset($this->handlers['#'])) {
            $handlers = array_merge($handlers, $this->handlers['#']);
        }

        return $handlers;
    }

    private function handleError(AMQPMessage $message, array $eventData, \Exception $e): void
    {
        error_log(sprintf(
            '[EventConsumer] Error processing event %s: %s',
            $eventData['event_id'],
            $e->getMessage()
        ));

        $message->nack(false, true);
    }

    public function stop(): void
    {
        $this->running = false;
    }

    public function close(): void
    {
        $this->stop();
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

完整使用示例

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Event\{
    DomainEvent,
    EventBus,
    EventStore,
    EventConsumer,
    UserRegisteredEvent,
    OrderCreatedEvent,
    PaymentCompletedEvent,
    NotificationEventHandler,
    AnalyticsEventHandler
};

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$pdo = new PDO('mysql:host=localhost;dbname=event_store', 'root', 'password');
$eventStore = new EventStore($pdo);

$eventBus = new EventBus($connection, $eventStore);

$userEvent = new UserRegisteredEvent(
    'user_001',
    'user@example.com',
    '张三'
);
$eventBus->publish($userEvent);
echo "发布用户注册事件: {$userEvent->eventId}\n";

$orderEvent = new OrderCreatedEvent(
    'order_001',
    'user_001',
    [['product_id' => 'P001', 'quantity' => 2, 'price' => 99.00]],
    198.00
);
$eventBus->publish($orderEvent);
echo "发布订单创建事件: {$orderEvent->eventId}\n";

$paymentEvent = new PaymentCompletedEvent(
    'payment_001',
    'order_001',
    198.00,
    'alipay'
);
$eventBus->publish($paymentEvent);
echo "发布支付完成事件: {$paymentEvent->eventId}\n";

$notificationHandler = new NotificationEventHandler(new NotificationService());
$analyticsHandler = new AnalyticsEventHandler(new AnalyticsService());

$consumer = new EventConsumer($connection);
$consumer->registerHandler($notificationHandler);
$consumer->registerHandler($analyticsHandler);

echo "\n开始消费事件...\n";

pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use ($consumer) {
    echo "收到终止信号\n";
    $consumer->stop();
});
pcntl_signal(SIGINT, function () use ($consumer) {
    echo "收到中断信号\n";
    $consumer->stop();
});

$consumer->consume('all.events');

$eventBus->close();
$consumer->close();
$connection->close();

关键技术点解析

1. 事件溯源

所有事件持久化存储,支持:

  • 事件回放重建状态
  • 审计追踪
  • 调试分析

2. 事件版本控制

php
public function __construct(
    string $aggregateType,
    string $aggregateId,
    int $version = 1
) {
    $this->version = $version;
}

3. 幂等性处理

php
public function handle(DomainEvent $event): void
{
    if ($this->isProcessed($event->eventId)) {
        return;
    }
    
    $this->doHandle($event);
    $this->markProcessed($event->eventId);
}

4. 事件关联

通过 correlation_id 追踪关联事件:

php
'metadata' => [
    'correlation_id' => $this->generateCorrelationId(),
]

性能优化建议

优化项建议说明
批量发布合并事件批量发布减少网络开销
异步处理消费者异步处理提高吞吐量
事件压缩大事件压缩传输减少带宽占用
分区存储按聚合根分区存储提高查询效率

常见问题与解决方案

1. 事件顺序

问题: 同一聚合的事件顺序错乱

解决方案: 使用版本号保证顺序,单消费者处理

2. 事件丢失

问题: 事件发布后丢失

解决方案: 事件存储 + 消息持久化 + 确认机制

3. 事件重复

问题: 事件被重复消费

解决方案: 幂等性处理,记录已处理事件ID

4. 事件膨胀

问题: 事件存储快速增长

解决方案: 定期归档,快照机制

相关链接