Appearance
事件溯源实现
概述
事件溯源(Event Sourcing)是一种将应用状态变化存储为事件序列的模式。通过 RabbitMQ 持久化和传播这些事件,可以实现完整的状态重建、时间旅行查询和审计追踪。
业务背景与需求
场景描述
某金融系统需要完整记录账户状态变化:
| 操作 | 事件 | 数据 |
|---|---|---|
| 开户 | AccountOpened | 账户ID、用户ID、初始余额 |
| 存款 | MoneyDeposited | 账户ID、金额、时间 |
| 取款 | MoneyWithdrawn | 账户ID、金额、时间 |
| 转账 | MoneyTransferred | 源账户、目标账户、金额 |
| 冻结 | AccountFrozen | 账户ID、原因、时间 |
传统存储问题
状态覆盖问题:
1. 只保留最终状态,历史丢失
2. 无法回溯历史状态
3. 审计追踪困难
4. Bug 难以定位和修复事件溯源优势
| 优势 | 说明 |
|---|---|
| 完整历史 | 所有状态变化都有记录 |
| 时间旅行 | 可重建任意时间点的状态 |
| 审计追踪 | 天然支持审计需求 |
| 调试友好 | 可重放事件定位问题 |
架构设计
事件溯源架构图
mermaid
graph TB
subgraph "命令端"
A[命令处理]
B[聚合根]
C[事件生成]
end
subgraph "事件存储"
D[事件日志]
E[快照存储]
end
subgraph "RabbitMQ"
F[事件交换机<br/>es.exchange]
G[事件队列]
end
subgraph "投影端"
H[投影构建器]
I[读模型]
end
subgraph "重建服务"
J[状态重建]
K[快照生成]
end
A --> B
B --> C
C --> D
C --> F
D --> E
D --> J
F --> G
G --> H
H --> I
J --> K
K --> E事件流转流程
mermaid
sequenceDiagram
participant Client as 客户端
participant Command as 命令处理器
participant Aggregate as 聚合根
participant Store as 事件存储
participant MQ as RabbitMQ
participant Projector as 投影器
Client->>Command: 发送命令
Command->>Store: 加载历史事件
Store-->>Command: 返回事件列表
Command->>Aggregate: 重放事件
Aggregate-->>Command: 返回当前状态
Command->>Aggregate: 执行命令
Aggregate->>Aggregate: 生成新事件
Aggregate-->>Command: 返回新事件
Command->>Store: 追加事件
Command->>MQ: 发布事件
Command-->>Client: 返回结果
MQ->>Projector: 投递事件
Projector->>Projector: 构建投影
Projector->>Projector: 更新读模型状态重建流程
mermaid
graph LR
A[事件1<br/>开户] --> B[事件2<br/>存款100]
B --> C[事件3<br/>取款50]
C --> D[事件4<br/>存款200]
D --> E[当前状态<br/>余额250]
style A fill:#e1f5fe
style B fill:#e1f5fe
style C fill:#e1f5fe
style D fill:#e1f5fe
style E fill:#c8e6c9PHP 代码实现
事件基类
php
<?php
namespace App\EventSourcing;
abstract class DomainEvent
{
public string $eventId;
public string $eventType;
public string $aggregateType;
public string $aggregateId;
public int $version;
public int $timestamp;
public array $metadata;
public function __construct(
string $aggregateType,
string $aggregateId,
int $version
) {
$this->eventId = $this->generateEventId();
$this->eventType = $this->getEventType();
$this->aggregateType = $aggregateType;
$this->aggregateId = $aggregateId;
$this->version = $version;
$this->timestamp = time();
$this->metadata = [];
}
private function generateEventId(): string
{
return sprintf('evt_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
}
abstract protected function getEventType(): string;
abstract public function getPayload(): array;
public function toArray(): array
{
return [
'event_id' => $this->eventId,
'event_type' => $this->eventType,
'aggregate_type' => $this->aggregateType,
'aggregate_id' => $this->aggregateId,
'version' => $this->version,
'timestamp' => $this->timestamp,
'metadata' => $this->metadata,
'payload' => $this->getPayload(),
];
}
public static function fromArray(array $data): self
{
$class = static::class;
$event = new $class(
$data['aggregate_id'],
$data['version']
);
$event->eventId = $data['event_id'];
$event->timestamp = $data['timestamp'];
$event->metadata = $data['metadata'] ?? [];
return $event;
}
}账户事件定义
php
<?php
namespace App\EventSourcing\Events;
use App\EventSourcing\DomainEvent;
class AccountOpenedEvent extends DomainEvent
{
private string $userId;
private string $currency;
private float $initialBalance;
public function __construct(
string $accountId,
string $userId,
string $currency = 'CNY',
float $initialBalance = 0.0
) {
parent::__construct('account', $accountId, 1);
$this->userId = $userId;
$this->currency = $currency;
$this->initialBalance = $initialBalance;
}
protected function getEventType(): string
{
return 'account.opened';
}
public function getPayload(): array
{
return [
'user_id' => $this->userId,
'currency' => $this->currency,
'initial_balance' => $this->initialBalance,
];
}
}
class MoneyDepositedEvent extends DomainEvent
{
private float $amount;
private float $balanceAfter;
private string $description;
public function __construct(
string $accountId,
int $version,
float $amount,
float $balanceAfter,
string $description = ''
) {
parent::__construct('account', $accountId, $version);
$this->amount = $amount;
$this->balanceAfter = $balanceAfter;
$this->description = $description;
}
protected function getEventType(): string
{
return 'account.money_deposited';
}
public function getPayload(): array
{
return [
'amount' => $this->amount,
'balance_after' => $this->balanceAfter,
'description' => $this->description,
];
}
}
class MoneyWithdrawnEvent extends DomainEvent
{
private float $amount;
private float $balanceAfter;
private string $description;
public function __construct(
string $accountId,
int $version,
float $amount,
float $balanceAfter,
string $description = ''
) {
parent::__construct('account', $accountId, $version);
$this->amount = $amount;
$this->balanceAfter = $balanceAfter;
$this->description = $description;
}
protected function getEventType(): string
{
return 'account.money_withdrawn';
}
public function getPayload(): array
{
return [
'amount' => $this->amount,
'balance_after' => $this->balanceAfter,
'description' => $this->description,
];
}
}
class MoneyTransferredEvent extends DomainEvent
{
private string $toAccountId;
private float $amount;
private float $balanceAfter;
public function __construct(
string $accountId,
int $version,
string $toAccountId,
float $amount,
float $balanceAfter
) {
parent::__construct('account', $accountId, $version);
$this->toAccountId = $toAccountId;
$this->amount = $amount;
$this->balanceAfter = $balanceAfter;
}
protected function getEventType(): string
{
return 'account.money_transferred';
}
public function getPayload(): array
{
return [
'to_account_id' => $this->toAccountId,
'amount' => $this->amount,
'balance_after' => $this->balanceAfter,
];
}
}
class AccountFrozenEvent extends DomainEvent
{
private string $reason;
public function __construct(
string $accountId,
int $version,
string $reason
) {
parent::__construct('account', $accountId, $version);
$this->reason = $reason;
}
protected function getEventType(): string
{
return 'account.frozen';
}
public function getPayload(): array
{
return [
'reason' => $this->reason,
];
}
}聚合根实现
php
<?php
namespace App\EventSourcing;
abstract class AggregateRoot
{
protected string $aggregateId;
protected int $version = 0;
protected array $uncommittedEvents = [];
public function getAggregateId(): string
{
return $this->aggregateId;
}
public function getVersion(): int
{
return $this->version;
}
public function getUncommittedEvents(): array
{
return $this->uncommittedEvents;
}
public function markEventsAsCommitted(): void
{
$this->uncommittedEvents = [];
}
public function loadFromHistory(array $events): void
{
foreach ($events as $event) {
$this->apply($event);
$this->version = $event->version;
}
}
protected function raise(DomainEvent $event): void
{
$this->uncommittedEvents[] = $event;
$this->apply($event);
$this->version = $event->version;
}
abstract protected function apply(DomainEvent $event): void;
}
class Account extends AggregateRoot
{
private string $userId;
private string $currency;
private float $balance = 0.0;
private bool $frozen = false;
public static function open(string $accountId, string $userId, string $currency = 'CNY'): self
{
$account = new self();
$account->aggregateId = $accountId;
$event = new AccountOpenedEvent($accountId, $userId, $currency);
$account->raise($event);
return $account;
}
public function deposit(float $amount, string $description = ''): void
{
if ($this->frozen) {
throw new \RuntimeException('Account is frozen');
}
if ($amount <= 0) {
throw new \InvalidArgumentException('Amount must be positive');
}
$newBalance = $this->balance + $amount;
$event = new MoneyDepositedEvent(
$this->aggregateId,
$this->version + 1,
$amount,
$newBalance,
$description
);
$this->raise($event);
}
public function withdraw(float $amount, string $description = ''): void
{
if ($this->frozen) {
throw new \RuntimeException('Account is frozen');
}
if ($amount <= 0) {
throw new \InvalidArgumentException('Amount must be positive');
}
if ($this->balance < $amount) {
throw new \RuntimeException('Insufficient balance');
}
$newBalance = $this->balance - $amount;
$event = new MoneyWithdrawnEvent(
$this->aggregateId,
$this->version + 1,
$amount,
$newBalance,
$description
);
$this->raise($event);
}
public function transfer(string $toAccountId, float $amount): void
{
if ($this->frozen) {
throw new \RuntimeException('Account is frozen');
}
if ($amount <= 0) {
throw new \InvalidArgumentException('Amount must be positive');
}
if ($this->balance < $amount) {
throw new \RuntimeException('Insufficient balance');
}
$newBalance = $this->balance - $amount;
$event = new MoneyTransferredEvent(
$this->aggregateId,
$this->version + 1,
$toAccountId,
$amount,
$newBalance
);
$this->raise($event);
}
public function freeze(string $reason): void
{
if ($this->frozen) {
return;
}
$event = new AccountFrozenEvent(
$this->aggregateId,
$this->version + 1,
$reason
);
$this->raise($event);
}
public function getBalance(): float
{
return $this->balance;
}
public function isFrozen(): bool
{
return $this->frozen;
}
protected function apply(DomainEvent $event): void
{
$method = 'apply' . $this->getEventClassName($event);
if (method_exists($this, $method)) {
$this->$method($event);
}
}
private function getEventClassName(DomainEvent $event): string
{
$parts = explode('\\', get_class($event));
return end($parts);
}
private function applyAccountOpenedEvent(AccountOpenedEvent $event): void
{
$this->userId = $event->getPayload()['user_id'];
$this->currency = $event->getPayload()['currency'];
$this->balance = $event->getPayload()['initial_balance'];
}
private function applyMoneyDepositedEvent(MoneyDepositedEvent $event): void
{
$this->balance = $event->getPayload()['balance_after'];
}
private function applyMoneyWithdrawnEvent(MoneyWithdrawnEvent $event): void
{
$this->balance = $event->getPayload()['balance_after'];
}
private function applyMoneyTransferredEvent(MoneyTransferredEvent $event): void
{
$this->balance = $event->getPayload()['balance_after'];
}
private function applyAccountFrozenEvent(AccountFrozenEvent $event): void
{
$this->frozen = true;
}
}事件存储实现
php
<?php
namespace App\EventSourcing;
class EventStore
{
private $pdo;
private EventPublisher $publisher;
public function __construct(\PDO $pdo, EventPublisher $publisher)
{
$this->pdo = $pdo;
$this->publisher = $publisher;
$this->initTable();
}
private function initTable(): void
{
$sql = "
CREATE TABLE IF NOT EXISTS event_store (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
event_id VARCHAR(64) NOT NULL UNIQUE,
event_type VARCHAR(128) NOT NULL,
aggregate_type VARCHAR(64) NOT NULL,
aggregate_id VARCHAR(64) NOT NULL,
version INT NOT NULL,
payload JSON NOT NULL,
metadata JSON,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_aggregate_version (aggregate_type, aggregate_id, version),
INDEX idx_aggregate (aggregate_type, aggregate_id),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
";
$this->pdo->exec($sql);
}
public function append(array $events): void
{
$this->pdo->beginTransaction();
try {
foreach ($events as $event) {
$this->insertEvent($event);
}
$this->pdo->commit();
foreach ($events as $event) {
$this->publisher->publish($event);
}
} catch (\Exception $e) {
$this->pdo->rollBack();
throw $e;
}
}
private function insertEvent(DomainEvent $event): void
{
$sql = "
INSERT INTO event_store
(event_id, event_type, aggregate_type, aggregate_id, version, payload, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?)
";
$stmt = $this->pdo->prepare($sql);
$stmt->execute([
$event->eventId,
$event->eventType,
$event->aggregateType,
$event->aggregateId,
$event->version,
json_encode($event->getPayload()),
json_encode($event->metadata),
]);
}
public function getEvents(string $aggregateType, string $aggregateId): array
{
$sql = "
SELECT * FROM event_store
WHERE aggregate_type = ? AND aggregate_id = ?
ORDER BY version ASC
";
$stmt = $this->pdo->prepare($sql);
$stmt->execute([$aggregateType, $aggregateId]);
$events = [];
while ($row = $stmt->fetch(\PDO::FETCH_ASSOC)) {
$events[] = $this->deserializeEvent($row);
}
return $events;
}
public function getEventsFromVersion(string $aggregateType, string $aggregateId, int $version): array
{
$sql = "
SELECT * FROM event_store
WHERE aggregate_type = ? AND aggregate_id = ? AND version > ?
ORDER BY version ASC
";
$stmt = $this->pdo->prepare($sql);
$stmt->execute([$aggregateType, $aggregateId, $version]);
$events = [];
while ($row = $stmt->fetch(\PDO::FETCH_ASSOC)) {
$events[] = $this->deserializeEvent($row);
}
return $events;
}
private function deserializeEvent(array $row): DomainEvent
{
$eventClass = $this->getEventClass($row['event_type']);
$eventData = [
'event_id' => $row['event_id'],
'event_type' => $row['event_type'],
'aggregate_type' => $row['aggregate_type'],
'aggregate_id' => $row['aggregate_id'],
'version' => (int) $row['version'],
'timestamp' => strtotime($row['created_at']),
'metadata' => json_decode($row['metadata'], true) ?? [],
'payload' => json_decode($row['payload'], true),
];
return $eventClass::fromArray($eventData);
}
private function getEventClass(string $eventType): string
{
$map = [
'account.opened' => AccountOpenedEvent::class,
'account.money_deposited' => MoneyDepositedEvent::class,
'account.money_withdrawn' => MoneyWithdrawnEvent::class,
'account.money_transferred' => MoneyTransferredEvent::class,
'account.frozen' => AccountFrozenEvent::class,
];
return $map[$eventType] ?? DomainEvent::class;
}
}仓储实现
php
<?php
namespace App\EventSourcing;
class AccountRepository
{
private EventStore $eventStore;
private SnapshotRepository $snapshotRepository;
public function __construct(
EventStore $eventStore,
SnapshotRepository $snapshotRepository
) {
$this->eventStore = $eventStore;
$this->snapshotRepository = $snapshotRepository;
}
public function load(string $accountId): Account
{
$snapshot = $this->snapshotRepository->get('account', $accountId);
if ($snapshot) {
$account = Account::fromSnapshot($snapshot);
$events = $this->eventStore->getEventsFromVersion(
'account',
$accountId,
$snapshot['version']
);
$account->loadFromHistory($events);
} else {
$events = $this->eventStore->getEvents('account', $accountId);
$account = new Account();
$account->loadFromHistory($events);
}
return $account;
}
public function save(Account $account): void
{
$events = $account->getUncommittedEvents();
if (empty($events)) {
return;
}
$this->eventStore->append($events);
$account->markEventsAsCommitted();
if ($account->getVersion() % 100 === 0) {
$this->snapshotRepository->save($account->toSnapshot());
}
}
}快照实现
php
<?php
namespace App\EventSourcing;
class SnapshotRepository
{
private $redis;
public function __construct($redis)
{
$this->redis = $redis;
}
public function get(string $aggregateType, string $aggregateId): ?array
{
$key = $this->getKey($aggregateType, $aggregateId);
$data = $this->redis->get($key);
if ($data) {
return json_decode($data, true);
}
return null;
}
public function save(array $snapshot): void
{
$key = $this->getKey($snapshot['aggregate_type'], $snapshot['aggregate_id']);
$this->redis->set($key, json_encode($snapshot));
}
private function getKey(string $aggregateType, string $aggregateId): string
{
return "snapshot:{$aggregateType}:{$aggregateId}";
}
}
class Account extends AggregateRoot
{
public function toSnapshot(): array
{
return [
'aggregate_type' => 'account',
'aggregate_id' => $this->aggregateId,
'version' => $this->version,
'user_id' => $this->userId,
'currency' => $this->currency,
'balance' => $this->balance,
'frozen' => $this->frozen,
'snapshot_at' => time(),
];
}
public static function fromSnapshot(array $snapshot): self
{
$account = new self();
$account->aggregateId = $snapshot['aggregate_id'];
$account->version = $snapshot['version'];
$account->userId = $snapshot['user_id'];
$account->currency = $snapshot['currency'];
$account->balance = $snapshot['balance'];
$account->frozen = $snapshot['frozen'];
return $account;
}
}完整使用示例
php
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\EventSourcing\{
EventStore,
EventPublisher,
AccountRepository,
SnapshotRepository,
Account
};
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$pdo = new PDO('mysql:host=localhost;dbname=event_sourcing', 'root', 'password');
$redis = new Redis();
$redis->connect('localhost', 6379);
$eventPublisher = new EventPublisher($connection);
$eventStore = new EventStore($pdo, $eventPublisher);
$snapshotRepository = new SnapshotRepository($redis);
$accountRepository = new AccountRepository($eventStore, $snapshotRepository);
echo "=== 事件溯源示例 ===\n\n";
echo "1. 创建账户\n";
$account = Account::open('ACC001', 'user_001', 'CNY');
$accountRepository->save($account);
echo "账户已创建: ACC001\n";
echo "\n2. 存款操作\n";
$account = $accountRepository->load('ACC001');
$account->deposit(1000.00, '首次存款');
$accountRepository->save($account);
echo "存款 1000.00,当前余额: {$account->getBalance()}\n";
echo "\n3. 取款操作\n";
$account = $accountRepository->load('ACC001');
$account->withdraw(200.00, 'ATM取款');
$accountRepository->save($account);
echo "取款 200.00,当前余额: {$account->getBalance()}\n";
echo "\n4. 转账操作\n";
$account = $accountRepository->load('ACC001');
$account->transfer('ACC002', 300.00);
$accountRepository->save($account);
echo "转账 300.00 到 ACC002,当前余额: {$account->getBalance()}\n";
echo "\n5. 冻结账户\n";
$account = $accountRepository->load('ACC001');
$account->freeze('可疑交易');
$accountRepository->save($account);
echo "账户已冻结\n";
echo "\n6. 查询事件历史\n";
$events = $eventStore->getEvents('account', 'ACC001');
echo "账户 ACC001 事件历史:\n";
foreach ($events as $event) {
echo "- [v{$event->version}] {$event->eventType}\n";
}
echo "\n7. 重建历史状态\n";
$account = new Account();
$account->loadFromHistory(array_slice($events, 0, 2));
echo "重建到版本2时的余额: {$account->getBalance()}\n";
$eventPublisher->close();
$connection->close();
echo "\n=== 示例完成 ===\n";关键技术点解析
1. 事件追加
事件只能追加,不能修改或删除:
php
$this->eventStore->append($events);2. 状态重建
通过重放事件重建状态:
php
public function loadFromHistory(array $events): void
{
foreach ($events as $event) {
$this->apply($event);
}
}3. 快照优化
定期保存快照,加速重建:
php
if ($account->getVersion() % 100 === 0) {
$this->snapshotRepository->save($account->toSnapshot());
}4. 版本控制
通过版本号保证事件顺序:
php
UNIQUE KEY uk_aggregate_version (aggregate_type, aggregate_id, version)性能优化建议
| 优化项 | 建议 | 说明 |
|---|---|---|
| 快照策略 | 每 N 个事件保存快照 | 减少重建时间 |
| 事件压缩 | 大事件压缩存储 | 减少存储空间 |
| 分区存储 | 按聚合根分区 | 提高查询效率 |
| 异步投影 | 异步构建读模型 | 提升写入性能 |
常见问题与解决方案
1. 事件膨胀
问题: 事件数量快速增长
解决方案:
- 快照机制
- 事件归档
- 事件压缩
2. 重建性能
问题: 大量事件重建慢
解决方案:
- 快照优化
- 增量加载
- 并行处理
3. 事件格式变更
问题: 事件结构需要升级
解决方案:
- 事件版本化
- 向上转换器
- 多版本支持
