Skip to content

事件溯源实现

概述

事件溯源(Event Sourcing)是一种将应用状态变化存储为事件序列的模式。通过 RabbitMQ 持久化和传播这些事件,可以实现完整的状态重建、时间旅行查询和审计追踪。

业务背景与需求

场景描述

某金融系统需要完整记录账户状态变化:

操作事件数据
开户AccountOpened账户ID、用户ID、初始余额
存款MoneyDeposited账户ID、金额、时间
取款MoneyWithdrawn账户ID、金额、时间
转账MoneyTransferred源账户、目标账户、金额
冻结AccountFrozen账户ID、原因、时间

传统存储问题

状态覆盖问题:
1. 只保留最终状态,历史丢失
2. 无法回溯历史状态
3. 审计追踪困难
4. Bug 难以定位和修复

事件溯源优势

优势说明
完整历史所有状态变化都有记录
时间旅行可重建任意时间点的状态
审计追踪天然支持审计需求
调试友好可重放事件定位问题

架构设计

事件溯源架构图

mermaid
graph TB
    subgraph "命令端"
        A[命令处理]
        B[聚合根]
        C[事件生成]
    end
    
    subgraph "事件存储"
        D[事件日志]
        E[快照存储]
    end
    
    subgraph "RabbitMQ"
        F[事件交换机<br/>es.exchange]
        G[事件队列]
    end
    
    subgraph "投影端"
        H[投影构建器]
        I[读模型]
    end
    
    subgraph "重建服务"
        J[状态重建]
        K[快照生成]
    end
    
    A --> B
    B --> C
    C --> D
    C --> F
    
    D --> E
    D --> J
    
    F --> G
    G --> H
    H --> I
    
    J --> K
    K --> E

事件流转流程

mermaid
sequenceDiagram
    participant Client as 客户端
    participant Command as 命令处理器
    participant Aggregate as 聚合根
    participant Store as 事件存储
    participant MQ as RabbitMQ
    participant Projector as 投影器
    
    Client->>Command: 发送命令
    Command->>Store: 加载历史事件
    Store-->>Command: 返回事件列表
    Command->>Aggregate: 重放事件
    Aggregate-->>Command: 返回当前状态
    Command->>Aggregate: 执行命令
    Aggregate->>Aggregate: 生成新事件
    Aggregate-->>Command: 返回新事件
    Command->>Store: 追加事件
    Command->>MQ: 发布事件
    Command-->>Client: 返回结果
    
    MQ->>Projector: 投递事件
    Projector->>Projector: 构建投影
    Projector->>Projector: 更新读模型

状态重建流程

mermaid
graph LR
    A[事件1<br/>开户] --> B[事件2<br/>存款100]
    B --> C[事件3<br/>取款50]
    C --> D[事件4<br/>存款200]
    D --> E[当前状态<br/>余额250]
    
    style A fill:#e1f5fe
    style B fill:#e1f5fe
    style C fill:#e1f5fe
    style D fill:#e1f5fe
    style E fill:#c8e6c9

PHP 代码实现

事件基类

php
<?php

namespace App\EventSourcing;

abstract class DomainEvent
{
    public string $eventId;
    public string $eventType;
    public string $aggregateType;
    public string $aggregateId;
    public int $version;
    public int $timestamp;
    public array $metadata;

    public function __construct(
        string $aggregateType,
        string $aggregateId,
        int $version
    ) {
        $this->eventId = $this->generateEventId();
        $this->eventType = $this->getEventType();
        $this->aggregateType = $aggregateType;
        $this->aggregateId = $aggregateId;
        $this->version = $version;
        $this->timestamp = time();
        $this->metadata = [];
    }

    private function generateEventId(): string
    {
        return sprintf('evt_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    abstract protected function getEventType(): string;

    abstract public function getPayload(): array;

    public function toArray(): array
    {
        return [
            'event_id' => $this->eventId,
            'event_type' => $this->eventType,
            'aggregate_type' => $this->aggregateType,
            'aggregate_id' => $this->aggregateId,
            'version' => $this->version,
            'timestamp' => $this->timestamp,
            'metadata' => $this->metadata,
            'payload' => $this->getPayload(),
        ];
    }

    public static function fromArray(array $data): self
    {
        $class = static::class;
        $event = new $class(
            $data['aggregate_id'],
            $data['version']
        );
        $event->eventId = $data['event_id'];
        $event->timestamp = $data['timestamp'];
        $event->metadata = $data['metadata'] ?? [];
        return $event;
    }
}

账户事件定义

php
<?php

namespace App\EventSourcing\Events;

use App\EventSourcing\DomainEvent;

class AccountOpenedEvent extends DomainEvent
{
    private string $userId;
    private string $currency;
    private float $initialBalance;

    public function __construct(
        string $accountId,
        string $userId,
        string $currency = 'CNY',
        float $initialBalance = 0.0
    ) {
        parent::__construct('account', $accountId, 1);
        $this->userId = $userId;
        $this->currency = $currency;
        $this->initialBalance = $initialBalance;
    }

    protected function getEventType(): string
    {
        return 'account.opened';
    }

    public function getPayload(): array
    {
        return [
            'user_id' => $this->userId,
            'currency' => $this->currency,
            'initial_balance' => $this->initialBalance,
        ];
    }
}

class MoneyDepositedEvent extends DomainEvent
{
    private float $amount;
    private float $balanceAfter;
    private string $description;

    public function __construct(
        string $accountId,
        int $version,
        float $amount,
        float $balanceAfter,
        string $description = ''
    ) {
        parent::__construct('account', $accountId, $version);
        $this->amount = $amount;
        $this->balanceAfter = $balanceAfter;
        $this->description = $description;
    }

    protected function getEventType(): string
    {
        return 'account.money_deposited';
    }

    public function getPayload(): array
    {
        return [
            'amount' => $this->amount,
            'balance_after' => $this->balanceAfter,
            'description' => $this->description,
        ];
    }
}

class MoneyWithdrawnEvent extends DomainEvent
{
    private float $amount;
    private float $balanceAfter;
    private string $description;

    public function __construct(
        string $accountId,
        int $version,
        float $amount,
        float $balanceAfter,
        string $description = ''
    ) {
        parent::__construct('account', $accountId, $version);
        $this->amount = $amount;
        $this->balanceAfter = $balanceAfter;
        $this->description = $description;
    }

    protected function getEventType(): string
    {
        return 'account.money_withdrawn';
    }

    public function getPayload(): array
    {
        return [
            'amount' => $this->amount,
            'balance_after' => $this->balanceAfter,
            'description' => $this->description,
        ];
    }
}

class MoneyTransferredEvent extends DomainEvent
{
    private string $toAccountId;
    private float $amount;
    private float $balanceAfter;

    public function __construct(
        string $accountId,
        int $version,
        string $toAccountId,
        float $amount,
        float $balanceAfter
    ) {
        parent::__construct('account', $accountId, $version);
        $this->toAccountId = $toAccountId;
        $this->amount = $amount;
        $this->balanceAfter = $balanceAfter;
    }

    protected function getEventType(): string
    {
        return 'account.money_transferred';
    }

    public function getPayload(): array
    {
        return [
            'to_account_id' => $this->toAccountId,
            'amount' => $this->amount,
            'balance_after' => $this->balanceAfter,
        ];
    }
}

class AccountFrozenEvent extends DomainEvent
{
    private string $reason;

    public function __construct(
        string $accountId,
        int $version,
        string $reason
    ) {
        parent::__construct('account', $accountId, $version);
        $this->reason = $reason;
    }

    protected function getEventType(): string
    {
        return 'account.frozen';
    }

    public function getPayload(): array
    {
        return [
            'reason' => $this->reason,
        ];
    }
}

聚合根实现

php
<?php

namespace App\EventSourcing;

abstract class AggregateRoot
{
    protected string $aggregateId;
    protected int $version = 0;
    protected array $uncommittedEvents = [];

    public function getAggregateId(): string
    {
        return $this->aggregateId;
    }

    public function getVersion(): int
    {
        return $this->version;
    }

    public function getUncommittedEvents(): array
    {
        return $this->uncommittedEvents;
    }

    public function markEventsAsCommitted(): void
    {
        $this->uncommittedEvents = [];
    }

    public function loadFromHistory(array $events): void
    {
        foreach ($events as $event) {
            $this->apply($event);
            $this->version = $event->version;
        }
    }

    protected function raise(DomainEvent $event): void
    {
        $this->uncommittedEvents[] = $event;
        $this->apply($event);
        $this->version = $event->version;
    }

    abstract protected function apply(DomainEvent $event): void;
}

class Account extends AggregateRoot
{
    private string $userId;
    private string $currency;
    private float $balance = 0.0;
    private bool $frozen = false;

    public static function open(string $accountId, string $userId, string $currency = 'CNY'): self
    {
        $account = new self();
        $account->aggregateId = $accountId;

        $event = new AccountOpenedEvent($accountId, $userId, $currency);
        $account->raise($event);

        return $account;
    }

    public function deposit(float $amount, string $description = ''): void
    {
        if ($this->frozen) {
            throw new \RuntimeException('Account is frozen');
        }

        if ($amount <= 0) {
            throw new \InvalidArgumentException('Amount must be positive');
        }

        $newBalance = $this->balance + $amount;

        $event = new MoneyDepositedEvent(
            $this->aggregateId,
            $this->version + 1,
            $amount,
            $newBalance,
            $description
        );

        $this->raise($event);
    }

    public function withdraw(float $amount, string $description = ''): void
    {
        if ($this->frozen) {
            throw new \RuntimeException('Account is frozen');
        }

        if ($amount <= 0) {
            throw new \InvalidArgumentException('Amount must be positive');
        }

        if ($this->balance < $amount) {
            throw new \RuntimeException('Insufficient balance');
        }

        $newBalance = $this->balance - $amount;

        $event = new MoneyWithdrawnEvent(
            $this->aggregateId,
            $this->version + 1,
            $amount,
            $newBalance,
            $description
        );

        $this->raise($event);
    }

    public function transfer(string $toAccountId, float $amount): void
    {
        if ($this->frozen) {
            throw new \RuntimeException('Account is frozen');
        }

        if ($amount <= 0) {
            throw new \InvalidArgumentException('Amount must be positive');
        }

        if ($this->balance < $amount) {
            throw new \RuntimeException('Insufficient balance');
        }

        $newBalance = $this->balance - $amount;

        $event = new MoneyTransferredEvent(
            $this->aggregateId,
            $this->version + 1,
            $toAccountId,
            $amount,
            $newBalance
        );

        $this->raise($event);
    }

    public function freeze(string $reason): void
    {
        if ($this->frozen) {
            return;
        }

        $event = new AccountFrozenEvent(
            $this->aggregateId,
            $this->version + 1,
            $reason
        );

        $this->raise($event);
    }

    public function getBalance(): float
    {
        return $this->balance;
    }

    public function isFrozen(): bool
    {
        return $this->frozen;
    }

    protected function apply(DomainEvent $event): void
    {
        $method = 'apply' . $this->getEventClassName($event);

        if (method_exists($this, $method)) {
            $this->$method($event);
        }
    }

    private function getEventClassName(DomainEvent $event): string
    {
        $parts = explode('\\', get_class($event));
        return end($parts);
    }

    private function applyAccountOpenedEvent(AccountOpenedEvent $event): void
    {
        $this->userId = $event->getPayload()['user_id'];
        $this->currency = $event->getPayload()['currency'];
        $this->balance = $event->getPayload()['initial_balance'];
    }

    private function applyMoneyDepositedEvent(MoneyDepositedEvent $event): void
    {
        $this->balance = $event->getPayload()['balance_after'];
    }

    private function applyMoneyWithdrawnEvent(MoneyWithdrawnEvent $event): void
    {
        $this->balance = $event->getPayload()['balance_after'];
    }

    private function applyMoneyTransferredEvent(MoneyTransferredEvent $event): void
    {
        $this->balance = $event->getPayload()['balance_after'];
    }

    private function applyAccountFrozenEvent(AccountFrozenEvent $event): void
    {
        $this->frozen = true;
    }
}

事件存储实现

php
<?php

namespace App\EventSourcing;

class EventStore
{
    private $pdo;
    private EventPublisher $publisher;

    public function __construct(\PDO $pdo, EventPublisher $publisher)
    {
        $this->pdo = $pdo;
        $this->publisher = $publisher;
        $this->initTable();
    }

    private function initTable(): void
    {
        $sql = "
            CREATE TABLE IF NOT EXISTS event_store (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                event_id VARCHAR(64) NOT NULL UNIQUE,
                event_type VARCHAR(128) NOT NULL,
                aggregate_type VARCHAR(64) NOT NULL,
                aggregate_id VARCHAR(64) NOT NULL,
                version INT NOT NULL,
                payload JSON NOT NULL,
                metadata JSON,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                UNIQUE KEY uk_aggregate_version (aggregate_type, aggregate_id, version),
                INDEX idx_aggregate (aggregate_type, aggregate_id),
                INDEX idx_created_at (created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ";
        $this->pdo->exec($sql);
    }

    public function append(array $events): void
    {
        $this->pdo->beginTransaction();

        try {
            foreach ($events as $event) {
                $this->insertEvent($event);
            }

            $this->pdo->commit();

            foreach ($events as $event) {
                $this->publisher->publish($event);
            }

        } catch (\Exception $e) {
            $this->pdo->rollBack();
            throw $e;
        }
    }

    private function insertEvent(DomainEvent $event): void
    {
        $sql = "
            INSERT INTO event_store 
            (event_id, event_type, aggregate_type, aggregate_id, version, payload, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ";

        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([
            $event->eventId,
            $event->eventType,
            $event->aggregateType,
            $event->aggregateId,
            $event->version,
            json_encode($event->getPayload()),
            json_encode($event->metadata),
        ]);
    }

    public function getEvents(string $aggregateType, string $aggregateId): array
    {
        $sql = "
            SELECT * FROM event_store 
            WHERE aggregate_type = ? AND aggregate_id = ?
            ORDER BY version ASC
        ";

        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$aggregateType, $aggregateId]);

        $events = [];
        while ($row = $stmt->fetch(\PDO::FETCH_ASSOC)) {
            $events[] = $this->deserializeEvent($row);
        }

        return $events;
    }

    public function getEventsFromVersion(string $aggregateType, string $aggregateId, int $version): array
    {
        $sql = "
            SELECT * FROM event_store 
            WHERE aggregate_type = ? AND aggregate_id = ? AND version > ?
            ORDER BY version ASC
        ";

        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$aggregateType, $aggregateId, $version]);

        $events = [];
        while ($row = $stmt->fetch(\PDO::FETCH_ASSOC)) {
            $events[] = $this->deserializeEvent($row);
        }

        return $events;
    }

    private function deserializeEvent(array $row): DomainEvent
    {
        $eventClass = $this->getEventClass($row['event_type']);

        $eventData = [
            'event_id' => $row['event_id'],
            'event_type' => $row['event_type'],
            'aggregate_type' => $row['aggregate_type'],
            'aggregate_id' => $row['aggregate_id'],
            'version' => (int) $row['version'],
            'timestamp' => strtotime($row['created_at']),
            'metadata' => json_decode($row['metadata'], true) ?? [],
            'payload' => json_decode($row['payload'], true),
        ];

        return $eventClass::fromArray($eventData);
    }

    private function getEventClass(string $eventType): string
    {
        $map = [
            'account.opened' => AccountOpenedEvent::class,
            'account.money_deposited' => MoneyDepositedEvent::class,
            'account.money_withdrawn' => MoneyWithdrawnEvent::class,
            'account.money_transferred' => MoneyTransferredEvent::class,
            'account.frozen' => AccountFrozenEvent::class,
        ];

        return $map[$eventType] ?? DomainEvent::class;
    }
}

仓储实现

php
<?php

namespace App\EventSourcing;

class AccountRepository
{
    private EventStore $eventStore;
    private SnapshotRepository $snapshotRepository;

    public function __construct(
        EventStore $eventStore,
        SnapshotRepository $snapshotRepository
    ) {
        $this->eventStore = $eventStore;
        $this->snapshotRepository = $snapshotRepository;
    }

    public function load(string $accountId): Account
    {
        $snapshot = $this->snapshotRepository->get('account', $accountId);

        if ($snapshot) {
            $account = Account::fromSnapshot($snapshot);
            $events = $this->eventStore->getEventsFromVersion(
                'account',
                $accountId,
                $snapshot['version']
            );
            $account->loadFromHistory($events);
        } else {
            $events = $this->eventStore->getEvents('account', $accountId);
            $account = new Account();
            $account->loadFromHistory($events);
        }

        return $account;
    }

    public function save(Account $account): void
    {
        $events = $account->getUncommittedEvents();

        if (empty($events)) {
            return;
        }

        $this->eventStore->append($events);
        $account->markEventsAsCommitted();

        if ($account->getVersion() % 100 === 0) {
            $this->snapshotRepository->save($account->toSnapshot());
        }
    }
}

快照实现

php
<?php

namespace App\EventSourcing;

class SnapshotRepository
{
    private $redis;

    public function __construct($redis)
    {
        $this->redis = $redis;
    }

    public function get(string $aggregateType, string $aggregateId): ?array
    {
        $key = $this->getKey($aggregateType, $aggregateId);
        $data = $this->redis->get($key);

        if ($data) {
            return json_decode($data, true);
        }

        return null;
    }

    public function save(array $snapshot): void
    {
        $key = $this->getKey($snapshot['aggregate_type'], $snapshot['aggregate_id']);
        $this->redis->set($key, json_encode($snapshot));
    }

    private function getKey(string $aggregateType, string $aggregateId): string
    {
        return "snapshot:{$aggregateType}:{$aggregateId}";
    }
}

class Account extends AggregateRoot
{
    public function toSnapshot(): array
    {
        return [
            'aggregate_type' => 'account',
            'aggregate_id' => $this->aggregateId,
            'version' => $this->version,
            'user_id' => $this->userId,
            'currency' => $this->currency,
            'balance' => $this->balance,
            'frozen' => $this->frozen,
            'snapshot_at' => time(),
        ];
    }

    public static function fromSnapshot(array $snapshot): self
    {
        $account = new self();
        $account->aggregateId = $snapshot['aggregate_id'];
        $account->version = $snapshot['version'];
        $account->userId = $snapshot['user_id'];
        $account->currency = $snapshot['currency'];
        $account->balance = $snapshot['balance'];
        $account->frozen = $snapshot['frozen'];
        return $account;
    }
}

完整使用示例

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\EventSourcing\{
    EventStore,
    EventPublisher,
    AccountRepository,
    SnapshotRepository,
    Account
};

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$pdo = new PDO('mysql:host=localhost;dbname=event_sourcing', 'root', 'password');
$redis = new Redis();
$redis->connect('localhost', 6379);

$eventPublisher = new EventPublisher($connection);
$eventStore = new EventStore($pdo, $eventPublisher);
$snapshotRepository = new SnapshotRepository($redis);
$accountRepository = new AccountRepository($eventStore, $snapshotRepository);

echo "=== 事件溯源示例 ===\n\n";

echo "1. 创建账户\n";
$account = Account::open('ACC001', 'user_001', 'CNY');
$accountRepository->save($account);
echo "账户已创建: ACC001\n";

echo "\n2. 存款操作\n";
$account = $accountRepository->load('ACC001');
$account->deposit(1000.00, '首次存款');
$accountRepository->save($account);
echo "存款 1000.00,当前余额: {$account->getBalance()}\n";

echo "\n3. 取款操作\n";
$account = $accountRepository->load('ACC001');
$account->withdraw(200.00, 'ATM取款');
$accountRepository->save($account);
echo "取款 200.00,当前余额: {$account->getBalance()}\n";

echo "\n4. 转账操作\n";
$account = $accountRepository->load('ACC001');
$account->transfer('ACC002', 300.00);
$accountRepository->save($account);
echo "转账 300.00 到 ACC002,当前余额: {$account->getBalance()}\n";

echo "\n5. 冻结账户\n";
$account = $accountRepository->load('ACC001');
$account->freeze('可疑交易');
$accountRepository->save($account);
echo "账户已冻结\n";

echo "\n6. 查询事件历史\n";
$events = $eventStore->getEvents('account', 'ACC001');
echo "账户 ACC001 事件历史:\n";
foreach ($events as $event) {
    echo "- [v{$event->version}] {$event->eventType}\n";
}

echo "\n7. 重建历史状态\n";
$account = new Account();
$account->loadFromHistory(array_slice($events, 0, 2));
echo "重建到版本2时的余额: {$account->getBalance()}\n";

$eventPublisher->close();
$connection->close();

echo "\n=== 示例完成 ===\n";

关键技术点解析

1. 事件追加

事件只能追加,不能修改或删除:

php
$this->eventStore->append($events);

2. 状态重建

通过重放事件重建状态:

php
public function loadFromHistory(array $events): void
{
    foreach ($events as $event) {
        $this->apply($event);
    }
}

3. 快照优化

定期保存快照,加速重建:

php
if ($account->getVersion() % 100 === 0) {
    $this->snapshotRepository->save($account->toSnapshot());
}

4. 版本控制

通过版本号保证事件顺序:

php
UNIQUE KEY uk_aggregate_version (aggregate_type, aggregate_id, version)

性能优化建议

优化项建议说明
快照策略每 N 个事件保存快照减少重建时间
事件压缩大事件压缩存储减少存储空间
分区存储按聚合根分区提高查询效率
异步投影异步构建读模型提升写入性能

常见问题与解决方案

1. 事件膨胀

问题: 事件数量快速增长

解决方案:

  • 快照机制
  • 事件归档
  • 事件压缩

2. 重建性能

问题: 大量事件重建慢

解决方案:

  • 快照优化
  • 增量加载
  • 并行处理

3. 事件格式变更

问题: 事件结构需要升级

解决方案:

  • 事件版本化
  • 向上转换器
  • 多版本支持

相关链接