Appearance
CQRS 实现
概述
CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种将命令(写操作)和查询(读操作)分离的架构模式。通过 RabbitMQ 把写模型产生的领域事件异步同步到读模型,可以显著提升系统的性能和可扩展性。
业务背景与需求
场景描述
某电商系统读写特征分析:
| 操作类型 | 特点 | 频率 | 性能要求 |
|---|---|---|---|
| 商品查询 | 只读、高并发 | 10000 QPS | < 50ms |
| 订单查询 | 只读、中并发 | 5000 QPS | < 100ms |
| 订单创建 | 写入、事务性 | 100 TPS | < 500ms |
| 库存更新 | 写入、原子性 | 200 TPS | < 200ms |
传统架构问题
单一模型问题:
1. 读写操作竞争同一数据源
2. 复杂查询影响写入性能
3. 难以针对读写分别优化
4. 数据模型过于复杂
CQRS 优势
| 优势 | 说明 |
|---|---|
| 独立优化 | 读写模型可独立设计和优化 |
| 性能提升 | 读写分离,互不影响 |
| 扩展灵活 | 读写端可独立扩展 |
| 复杂度降低 | 各端模型更简单清晰 |
架构设计
CQRS 架构图
mermaid
graph TB
subgraph "客户端"
A[查询请求]
B[命令请求]
end
subgraph "查询端"
C[查询API]
D[读模型<br/>Redis/Elasticsearch]
end
subgraph "命令端"
E[命令API]
F[命令处理器]
G[写模型<br/>MySQL]
H[事件发布器]
end
subgraph "RabbitMQ"
I[事件交换机<br/>cqrs.exchange]
J[同步队列<br/>cqrs.sync]
end
subgraph "同步服务"
K[事件处理器]
L[投影构建器]
end
A --> C
C --> D
B --> E
E --> F
F --> G
F --> H
H --> I
I --> J
J --> K
K --> L
L --> D
数据同步流程
mermaid
sequenceDiagram
participant Client as 客户端
participant Command as 命令端
participant WriteDB as 写数据库
participant MQ as RabbitMQ
participant Sync as 同步服务
participant ReadDB as 读数据库
Client->>Command: 发送命令
Command->>Command: 验证命令
Command->>WriteDB: 执行写入
WriteDB-->>Command: 返回结果
Command->>MQ: 发布领域事件
Command-->>Client: 返回成功
MQ->>Sync: 投递事件
Sync->>Sync: 构建投影
Sync->>ReadDB: 更新读模型
Sync-->>MQ: ACK确认
Note over Client,ReadDB: 查询端异步更新
读写模型对比
mermaid
graph LR
subgraph "写模型"
A[订单表]
B[订单明细表]
C[状态机]
end
subgraph "读模型"
D[订单视图]
E[订单列表]
F[统计报表]
end
A --> D
B --> D
A --> E
A --> F
PHP 代码实现
命令定义
php
<?php
namespace App\CQRS;
/**
 * Base class for write-side commands.
 *
 * Construction stamps the instance with a generated id, the type string
 * supplied by the concrete subclass, the current Unix time and an empty
 * metadata array.
 */
abstract class Command
{
    /** Generated as "cmd_" + date('YmdHis') + "_" + 16 random hex characters. */
    public string $commandId;

    /** Value returned by the subclass's getCommandType(). */
    public string $commandType;

    /** Unix timestamp taken when the command object was created. */
    public int $timestamp;

    /** Free-form metadata; starts out empty. */
    public array $metadata;

    public function __construct()
    {
        $this->commandId = $this->generateCommandId();
        $this->commandType = $this->getCommandType();
        $this->metadata = [];
        $this->timestamp = time();
    }

    /** Type string identifying the concrete command. */
    abstract protected function getCommandType(): string;

    /** Command-specific data; exposed under the "payload" key of toArray(). */
    abstract public function getPayload(): array;

    /**
     * Flattens the command into a plain array.
     *
     * @return array keys: command_id, command_type, timestamp, metadata, payload
     */
    public function toArray(): array
    {
        $envelope = [
            'command_id' => $this->commandId,
            'command_type' => $this->commandType,
            'timestamp' => $this->timestamp,
            'metadata' => $this->metadata,
        ];
        $envelope['payload'] = $this->getPayload();

        return $envelope;
    }

    /** Builds the id from the current date/time and 8 random bytes. */
    private function generateCommandId(): string
    {
        return 'cmd_' . date('YmdHis') . '_' . bin2hex(random_bytes(8));
    }
}
/**
 * Command asking the write side to create an order ("order.create").
 */
class CreateOrderCommand extends Command
{
    private string $userId;
    private array $items;
    private float $totalAmount;
    private string $shippingAddress;

    /**
     * @param string $userId          id of the ordering user
     * @param array  $items           line items, stored and returned untouched
     * @param float  $totalAmount     order total
     * @param string $shippingAddress delivery address
     */
    public function __construct(
        string $userId,
        array $items,
        float $totalAmount,
        string $shippingAddress
    ) {
        // The base constructor runs first, exactly as before, to stamp id/type/timestamp.
        parent::__construct();

        $this->shippingAddress = $shippingAddress;
        $this->totalAmount = $totalAmount;
        $this->items = $items;
        $this->userId = $userId;
    }

    protected function getCommandType(): string
    {
        return 'order.create';
    }

    /**
     * @return array keys: user_id, items, total_amount, shipping_address
     */
    public function getPayload(): array
    {
        $payload = [];
        $payload['user_id'] = $this->userId;
        $payload['items'] = $this->items;
        $payload['total_amount'] = $this->totalAmount;
        $payload['shipping_address'] = $this->shippingAddress;

        return $payload;
    }
}
class UpdateOrderStatusCommand extends Command
{
private string $orderId;
private string $status;
private ?string $remark;
/**
 * @param string      $orderId id of the order to update
 * @param string      $status  new status value
 * @param string|null $remark  optional free-text remark
 */
public function __construct(string $orderId, string $status, ?string $remark = null)
{
    // Base constructor first: it stamps the command id, type and timestamp.
    parent::__construct();

    $this->remark = $remark;
    $this->status = $status;
    $this->orderId = $orderId;
}
/** Type string for this command. */
protected function getCommandType(): string
{
    return 'order.update_status';
}
/**
 * @return array keys: order_id, status, remark (remark may be null)
 */
public function getPayload(): array
{
    $payload = [];
    $payload['order_id'] = $this->orderId;
    $payload['status'] = $this->status;
    $payload['remark'] = $this->remark;

    return $payload;
}
}
查询定义
php
<?php
namespace App\CQRS;
/**
 * Base class for read-side queries.
 *
 * Construction stamps the instance with a generated id, the type string
 * supplied by the concrete subclass and the current Unix time.
 */
abstract class Query
{
    /** Generated as "qry_" + date('YmdHis') + "_" + 8 random hex characters. */
    public string $queryId;

    /** Value returned by the subclass's getQueryType(). */
    public string $queryType;

    /** Unix timestamp taken when the query object was created. */
    public int $timestamp;

    public function __construct()
    {
        $this->queryId = $this->generateQueryId();
        $this->queryType = $this->getQueryType();
        $this->timestamp = time();
    }

    /** Type string identifying the concrete query. */
    abstract protected function getQueryType(): string;

    /** Query-specific filter values. */
    abstract public function getCriteria(): array;

    /** Builds the id from the current date/time and 4 random bytes. */
    private function generateQueryId(): string
    {
        return 'qry_' . date('YmdHis') . '_' . bin2hex(random_bytes(4));
    }
}
/**
 * Query for a single order by id ("order.get").
 */
class GetOrderQuery extends Query
{
    private string $orderId;

    /**
     * @param string $orderId id of the order to look up
     */
    public function __construct(string $orderId)
    {
        // Base constructor first: it stamps the query id, type and timestamp.
        parent::__construct();
        $this->orderId = $orderId;
    }

    protected function getQueryType(): string
    {
        return 'order.get';
    }

    /**
     * @return array single key: order_id
     */
    public function getCriteria(): array
    {
        $criteria = [];
        $criteria['order_id'] = $this->orderId;

        return $criteria;
    }
}
class ListOrdersQuery extends Query
{
private string $userId;
private int $page;
private int $pageSize;
private ?string $status;
/**
 * @param string      $userId   user whose orders are listed
 * @param int         $page     page number (defaults to 1)
 * @param int         $pageSize orders per page (defaults to 20)
 * @param string|null $status   optional status filter; null is passed through as-is
 */
public function __construct(
    string $userId,
    int $page = 1,
    int $pageSize = 20,
    ?string $status = null
) {
    // Base constructor first: it stamps the query id, type and timestamp.
    parent::__construct();

    $this->status = $status;
    $this->pageSize = $pageSize;
    $this->page = $page;
    $this->userId = $userId;
}
/** Type string for this query. */
protected function getQueryType(): string
{
    return 'order.list';
}
/**
 * @return array keys: user_id, page, page_size, status (status may be null)
 */
public function getCriteria(): array
{
    $criteria = [];
    $criteria['user_id'] = $this->userId;
    $criteria['page'] = $this->page;
    $criteria['page_size'] = $this->pageSize;
    $criteria['status'] = $this->status;

    return $criteria;
}
}
命令处理器
php
<?php
namespace App\CQRS;
/**
 * Contract for a write-side handler that processes one kind of command.
 */
interface CommandHandlerInterface
{
    /**
     * Tells whether this handler is responsible for the given command type string.
     */
    public function supports(string $commandType): bool;

    /**
     * Executes the command and reports the outcome.
     */
    public function handle(Command $command): CommandResult;
}
/**
 * Outcome of a command: a success flag, optional data, an optional error
 * message and the list of events produced while handling the command.
 *
 * Instances are created only through success() and failure().
 */
class CommandResult
{
    private bool $success;
    private $data;
    private ?string $error;
    private array $events;

    private function __construct(bool $success, $data = null, ?string $error = null, array $events = [])
    {
        $this->events = $events;
        $this->error = $error;
        $this->data = $data;
        $this->success = $success;
    }

    /**
     * Successful result carrying optional data and the events to publish.
     */
    public static function success($data = null, array $events = []): self
    {
        $noError = null;

        return new self(true, $data, $noError, $events);
    }

    /**
     * Failed result carrying only an error message (no data, no events).
     */
    public static function failure(string $error): self
    {
        $noData = null;

        return new self(false, $noData, $error);
    }

    public function isSuccess(): bool
    {
        return $this->success;
    }

    /** Data passed to success(); null for failures. */
    public function getData()
    {
        return $this->data;
    }

    /** Error message passed to failure(); null for successes. */
    public function getError(): ?string
    {
        return $this->error;
    }

    /** Events passed to success(); an empty array for failures. */
    public function getEvents(): array
    {
        return $this->events;
    }
}
/**
 * Handles "order.create" commands: reserves stock for every line item,
 * persists the order through the write repository and returns an
 * OrderCreatedEvent for the caller to publish.
 */
class CreateOrderHandler implements CommandHandlerInterface
{
    private OrderRepository $orderRepository;
    private InventoryService $inventoryService;

    public function __construct(
        OrderRepository $orderRepository,
        InventoryService $inventoryService
    ) {
        $this->orderRepository = $orderRepository;
        $this->inventoryService = $inventoryService;
    }

    /**
     * Creates an order from the command payload.
     *
     * @param Command $command payload is read for user_id, items, total_amount and shipping_address
     *
     * @return CommandResult success with ['order_id' => <id>] and one OrderCreatedEvent,
     *                       or a failure describing the invalid item / missing stock
     */
    public function handle(Command $command): CommandResult
    {
        $payload = $command->getPayload();
        $items = $payload['items'] ?? null;

        // Validate the whole item list before touching inventory, so an empty
        // or malformed order never leaves a partial reservation behind.
        if (!is_array($items) || $items === []) {
            return CommandResult::failure('Order must contain at least one item');
        }
        foreach ($items as $item) {
            if (!$this->isValidItem($item)) {
                return CommandResult::failure(
                    'Invalid order item: product_id, sku_id and a positive quantity are required'
                );
            }
        }

        $orderId = $this->generateOrderId();

        foreach ($items as $item) {
            $reserved = $this->inventoryService->reserve(
                $item['product_id'],
                $item['sku_id'],
                $item['quantity']
            );
            if (!$reserved) {
                // NOTE(review): reservations made for earlier items in this loop are not
                // released here; no release/compensation API of InventoryService is
                // visible from this file — confirm how they are rolled back.
                return CommandResult::failure("Insufficient inventory for product: {$item['product_id']}");
            }
        }

        $order = [
            'order_id' => $orderId,
            'user_id' => $payload['user_id'],
            'items' => $items,
            'total_amount' => $payload['total_amount'],
            'shipping_address' => $payload['shipping_address'],
            'status' => 'pending',
            'created_at' => date('Y-m-d H:i:s'),
        ];
        $this->orderRepository->create($order);

        $event = new OrderCreatedEvent(
            $orderId,
            $payload['user_id'],
            $items,
            $payload['total_amount']
        );

        return CommandResult::success(['order_id' => $orderId], [$event]);
    }

    public function supports(string $commandType): bool
    {
        return $commandType === 'order.create';
    }

    /**
     * True when the item is an array carrying product_id, sku_id and a numeric quantity above zero.
     *
     * @param mixed $item one entry of the payload's items list
     */
    private function isValidItem($item): bool
    {
        return is_array($item)
            && isset($item['product_id'], $item['sku_id'], $item['quantity'])
            && is_numeric($item['quantity'])
            && $item['quantity'] > 0;
    }

    /** Order id of the form "ORD" + date('YmdHis') + 8 upper-case random hex characters. */
    private function generateOrderId(): string
    {
        return sprintf('ORD%s%s', date('YmdHis'), strtoupper(bin2hex(random_bytes(4))));
    }
}
class CommandBus
{
private array $handlers = [];
private EventPublisher $eventPublisher;
/**
 * @param EventPublisher $eventPublisher used by dispatch() to publish events of successful results
 */
public function __construct(EventPublisher $eventPublisher)
{
    $this->eventPublisher = $eventPublisher;
}
/**
 * Appends a handler; dispatch() consults handlers in registration order.
 */
public function register(CommandHandlerInterface $handler): void
{
    array_push($this->handlers, $handler);
}
/**
 * Routes the command to the first registered handler that supports its type.
 *
 * When the handler reports success, every event attached to the result is
 * published before the result is returned. A failed result is returned
 * without publishing anything.
 *
 * @return CommandResult the handler's result, or a failure when no handler matches
 */
public function dispatch(Command $command): CommandResult
{
    foreach ($this->handlers as $candidate) {
        if (!$candidate->supports($command->commandType)) {
            continue;
        }

        $outcome = $candidate->handle($command);
        if (!$outcome->isSuccess()) {
            return $outcome;
        }

        foreach ($outcome->getEvents() as $domainEvent) {
            $this->eventPublisher->publish($domainEvent);
        }

        return $outcome;
    }

    return CommandResult::failure("No handler found for command: {$command->commandType}");
}
}
查询处理器
php
<?php
namespace App\CQRS;
/**
 * Contract for a read-side handler that answers one kind of query.
 */
interface QueryHandlerInterface
{
    /**
     * Tells whether this handler is responsible for the given query type string.
     */
    public function supports(string $queryType): bool;

    /**
     * Answers the query and reports the outcome.
     */
    public function handle(Query $query): QueryResult;
}
/**
 * Outcome of a query: a success flag, the data on success, or an error
 * message on failure.
 *
 * Instances are created only through success() and failure().
 */
class QueryResult
{
    private bool $success;
    private $data;
    private ?string $error;

    private function __construct(bool $success, $data = null, ?string $error = null)
    {
        $this->success = $success;
        $this->data = $data;
        $this->error = $error;
    }

    /** Successful result carrying the query data. */
    public static function success($data): self
    {
        return new self(true, $data);
    }

    /** Failed result carrying only an error message. */
    public static function failure(string $error): self
    {
        return new self(false, null, $error);
    }

    public function isSuccess(): bool
    {
        return $this->success;
    }

    /** Data passed to success(); null for failures. */
    public function getData()
    {
        return $this->data;
    }

    /**
     * Error message passed to failure(); null for successes.
     *
     * Mirrors CommandResult::getError() so callers can report why a query failed.
     */
    public function getError(): ?string
    {
        return $this->error;
    }
}
/**
 * Answers "order.get" queries from the read repository.
 */
class GetOrderHandler implements QueryHandlerInterface
{
    private OrderReadRepository $readRepository;

    public function __construct(OrderReadRepository $readRepository)
    {
        $this->readRepository = $readRepository;
    }

    public function supports(string $queryType): bool
    {
        return $queryType === 'order.get';
    }

    /**
     * Looks the order up by the criteria's order_id.
     *
     * @return QueryResult success with whatever findById() returned, or a
     *                     failure when that value is falsy
     */
    public function handle(Query $query): QueryResult
    {
        $orderId = $query->getCriteria()['order_id'];
        $order = $this->readRepository->findById($orderId);

        return $order
            ? QueryResult::success($order)
            : QueryResult::failure("Order not found: {$orderId}");
    }
}
/**
 * Answers "order.list" queries: one page of a user's orders plus the total count.
 */
class ListOrdersHandler implements QueryHandlerInterface
{
    private OrderReadRepository $readRepository;

    public function __construct(OrderReadRepository $readRepository)
    {
        $this->readRepository = $readRepository;
    }

    public function supports(string $queryType): bool
    {
        return $queryType === 'order.list';
    }

    /**
     * Reads user_id, page, page_size and status from the query criteria.
     *
     * @return QueryResult success with keys items, total, page, page_size
     */
    public function handle(Query $query): QueryResult
    {
        $criteria = $query->getCriteria();
        $userId = $criteria['user_id'];
        $page = $criteria['page'];
        $pageSize = $criteria['page_size'];
        $status = $criteria['status'];

        // Page first, then the total, in the same order as before.
        $pageOfOrders = $this->readRepository->findByUser($userId, $page, $pageSize, $status);
        $matching = $this->readRepository->countByUser($userId, $status);

        $listing = [
            'items' => $pageOfOrders,
            'total' => $matching,
            'page' => $page,
            'page_size' => $pageSize,
        ];

        return QueryResult::success($listing);
    }
}
class QueryBus
{
private array $handlers = [];
/**
 * Appends a handler; dispatch() consults handlers in registration order.
 */
public function register(QueryHandlerInterface $handler): void
{
    array_push($this->handlers, $handler);
}
/**
 * Routes the query to the first registered handler that supports its type.
 *
 * @return QueryResult the handler's result, or a failure when no handler matches
 */
public function dispatch(Query $query): QueryResult
{
    foreach ($this->handlers as $candidate) {
        if (!$candidate->supports($query->queryType)) {
            continue;
        }

        return $candidate->handle($query);
    }

    return QueryResult::failure("No handler found for query: {$query->queryType}");
}
}
读模型同步服务
php
<?php
namespace App\CQRS;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class ReadModelSyncService
{
private AMQPStreamConnection $connection;
private $channel;
private OrderReadRepository $readRepository;
private array $projectors = [];
private bool $running = true;
/**
 * Opens a channel on the given connection and registers the event projectors.
 *
 * @param AMQPStreamConnection $connection     broker connection the channel is opened on
 * @param OrderReadRepository  $readRepository read-model store the projectors write to
 */
public function __construct(
    AMQPStreamConnection $connection,
    OrderReadRepository $readRepository
) {
    $this->readRepository = $readRepository;
    $this->connection = $connection;
    $this->channel = $this->connection->channel();
    $this->setupProjectors();
}
/**
 * Fills the event-type => projector-callable map consulted by handleEvent().
 */
private function setupProjectors(): void
{
    $map = [];
    $map['order.created'] = [$this, 'projectOrderCreated'];
    $map['order.status_changed'] = [$this, 'projectOrderStatusChanged'];
    $map['order.cancelled'] = [$this, 'projectOrderCancelled'];

    $this->projectors = $map;
}
/**
 * Declares and binds the sync queue, then consumes events until stop() is
 * called or the channel has no consumer callbacks left.
 *
 * The queue is declared durable, bound to "cqrs.exchange" with routing key
 * "#", consumed with manual acknowledgements and a prefetch count of 10.
 *
 * NOTE(review): the exchange itself is not declared here; presumably the
 * publisher side declares "cqrs.exchange" — confirm, since binding to a
 * missing exchange fails.
 *
 * @param string $queueName queue to consume from
 */
public function start(string $queueName = 'cqrs.sync'): void
{
    $this->channel->queue_declare($queueName, false, true, false, false);
    $this->channel->queue_bind($queueName, 'cqrs.exchange', '#');
    $this->channel->basic_qos(null, 10, null);
    $this->channel->basic_consume($queueName, '', false, false, false, false, [$this, 'handleEvent']);

    while ($this->running && count($this->channel->callbacks)) {
        try {
            // Block for up to one second waiting for a delivery. The previous
            // non-blocking poll followed by a fixed 100 ms sleep limited the
            // consumer to roughly ten messages per second.
            $this->channel->wait(null, false, 1);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // Nothing arrived within the timeout; loop to re-check the running flag.
        }
    }
}
/**
 * Consumer callback: decodes the message body as JSON and runs the projector
 * registered for its event_type.
 *
 * - Bodies that are not a JSON object with a string event_type are logged and acknowledged.
 * - Events with no registered projector are acknowledged without processing.
 * - When a projector throws, the message is requeued once; if it fails again
 *   after redelivery it is rejected without requeue so a poison message
 *   cannot loop forever.
 *
 * @param AMQPMessage $message delivery received from the sync queue
 */
public function handleEvent(AMQPMessage $message): void
{
    $event = json_decode($message->body, true);

    if (!is_array($event) || !isset($event['event_type']) || !is_string($event['event_type'])) {
        // Redelivering an undecodable message can never succeed, so settle it.
        error_log('Projection error: discarding malformed event message');
        $message->ack();
        return;
    }

    $eventType = $event['event_type'];
    if (!isset($this->projectors[$eventType])) {
        $message->ack();
        return;
    }

    try {
        $this->projectors[$eventType]($event);
        $message->ack();
    } catch (\Throwable $e) {
        // \Throwable rather than \Exception: a payload with missing fields can
        // raise TypeError inside a projector, which is an \Error.
        error_log("Projection error: " . $e->getMessage());

        // AMQPMessage::nack() takes ($requeue, $multiple). The previous call
        // nack(false, true) therefore dropped the event instead of retrying it.
        $retry = !$message->isRedelivered();
        $message->nack($retry);
    }
}
/**
 * Projects an order-created event into the read model via upsert().
 *
 * Items are stored JSON-encoded alongside their count; status is set to
 * "pending"; created_at and updated_at are both formatted from the event timestamp.
 *
 * @param array $event decoded event; reads $event['payload'] and $event['timestamp']
 */
private function projectOrderCreated(array $event): void
{
    $payload = $event['payload'];
    $items = $payload['items'];
    $occurredAt = date('Y-m-d H:i:s', $event['timestamp']);

    $this->readRepository->upsert([
        'order_id' => $payload['order_id'],
        'user_id' => $payload['user_id'],
        'items' => json_encode($items),
        'total_amount' => $payload['total_amount'],
        'status' => 'pending',
        'item_count' => count($items),
        'created_at' => $occurredAt,
        'updated_at' => $occurredAt,
    ]);
}
/**
 * Projects a status-changed event: writes the payload's status for the order,
 * with the update time formatted from the event timestamp.
 *
 * @param array $event decoded event; reads $event['payload'] and $event['timestamp']
 */
private function projectOrderStatusChanged(array $event): void
{
    $payload = $event['payload'];
    $changedAt = date('Y-m-d H:i:s', $event['timestamp']);

    $this->readRepository->updateStatus($payload['order_id'], $payload['status'], $changedAt);
}
/**
 * Projects a cancelled event: sets the order's status to "cancelled",
 * with the update time formatted from the event timestamp.
 *
 * @param array $event decoded event; reads $event['payload'] and $event['timestamp']
 */
private function projectOrderCancelled(array $event): void
{
    $orderId = $event['payload']['order_id'];
    $cancelledAt = date('Y-m-d H:i:s', $event['timestamp']);

    $this->readRepository->updateStatus($orderId, 'cancelled', $cancelledAt);
}
/**
 * Clears the running flag so the consume loop in start() exits at its next check.
 */
public function stop(): void
{
    $this->running = false;
}
}
完整使用示例
php
<?php
// End-to-end demo: forks a read-model sync worker, creates an order through
// the command bus, then reads it back through the query bus.

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\CQRS\{
    Command,
    Query,
    CommandBus,
    QueryBus,
    CreateOrderCommand,
    GetOrderQuery,
    ListOrdersQuery,
    CreateOrderHandler,
    GetOrderHandler,
    ListOrdersHandler,
    ReadModelSyncService,
    EventPublisher,
    InventoryService,
    OrderReadRepository,
    OrderRepository
};

// Fork BEFORE opening any broker/Redis connection. A socket opened first would
// be shared by parent and child, and both would write protocol frames to it.
$syncPid = pcntl_fork();
if ($syncPid === -1) {
    fwrite(STDERR, "无法创建读模型同步进程\n");
    exit(1);
}

if ($syncPid === 0) {
    // Child: owns its own connections and runs the sync loop until the parent
    // terminates it with SIGTERM.
    $syncConnection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $syncRedis = new Redis();
    $syncRedis->connect('localhost', 6379);
    $syncService = new ReadModelSyncService($syncConnection, new OrderReadRepository($syncRedis));

    echo "启动读模型同步服务...\n";
    $syncService->start();
    exit(0);
}

// Parent: command and query side.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$redis = new Redis();
$redis->connect('localhost', 6379);

$eventPublisher = new EventPublisher($connection);
$commandBus = new CommandBus($eventPublisher);
$queryBus = new QueryBus();

$orderWriteRepo = new OrderRepository();
$orderReadRepo = new OrderReadRepository($redis);
$inventoryService = new InventoryService();

$commandBus->register(new CreateOrderHandler($orderWriteRepo, $inventoryService));
$queryBus->register(new GetOrderHandler($orderReadRepo));
$queryBus->register(new ListOrdersHandler($orderReadRepo));

// Give the worker a moment to declare and bind its queue.
sleep(1);

echo "=== CQRS 示例 ===\n\n";
echo "1. 执行命令 - 创建订单\n";

$command = new CreateOrderCommand(
    'user_001',
    [
        ['product_id' => 'P001', 'sku_id' => 'SKU001', 'quantity' => 2, 'price' => 99.00],
        ['product_id' => 'P002', 'sku_id' => 'SKU002', 'quantity' => 1, 'price' => 199.00],
    ],
    397.00,
    '北京市朝阳区xxx'
);

$orderId = null;
$result = $commandBus->dispatch($command);
if ($result->isSuccess()) {
    $orderId = $result->getData()['order_id'];
    echo "订单创建成功: {$orderId}\n";
} else {
    echo "订单创建失败: {$result->getError()}\n";
}

// The read model is updated asynchronously; wait for the projection to land.
sleep(2);

// Only look the order up when it was actually created; previously a failed
// command left $orderId undefined and the query constructor crashed.
if ($orderId !== null) {
    echo "\n2. 执行查询 - 获取订单详情\n";
    $query = new GetOrderQuery($orderId);
    $result = $queryBus->dispatch($query);
    if ($result->isSuccess()) {
        echo "订单详情:\n";
        print_r($result->getData());
    }
}

echo "\n3. 执行查询 - 订单列表\n";
$query = new ListOrdersQuery('user_001', 1, 10);
$result = $queryBus->dispatch($query);
if ($result->isSuccess()) {
    $data = $result->getData();
    echo "订单列表 (共 {$data['total']} 条):\n";
    foreach ($data['items'] as $order) {
        echo "- {$order['order_id']}: {$order['status']}\n";
    }
}

// Shut the worker down and reap it before releasing the parent's resources.
posix_kill($syncPid, SIGTERM);
pcntl_wait($status);

$eventPublisher->close();
$connection->close();
echo "\n=== 示例完成 ===\n";
关键技术点解析
1. 读写分离
php
class CommandBus { /* 命令处理 */ }
class QueryBus { /* 查询处理 */ }
2. 事件驱动同步
php
$this->eventPublisher->publish($event);
3. 投影构建
php
private function projectOrderCreated(array $event): void
{
$orderView = [...];
$this->readRepository->upsert($orderView);
}
4. 最终一致性
读模型异步更新,保证最终一致:
命令执行 → 事件发布 → 投影构建 → 读模型更新
性能优化建议
| 优化项 | 建议 | 说明 |
|---|---|---|
| 读模型缓存 | Redis 缓存热点数据 | 提升查询性能 |
| 批量同步 | 合并多个事件 | 减少写入次数 |
| 分库分表 | 读写分离存储 | 独立扩展 |
| 索引优化 | 针对查询优化索引 | 提升查询效率 |
常见问题与解决方案
1. 数据延迟
问题: 读模型更新延迟
解决方案:
- 优化同步速度
- 关键场景直接查写库
- 提示用户刷新
2. 事件丢失
问题: 同步事件丢失
解决方案:
- 消息持久化
- 事件存储
- 定期校验
3. 投影错误
问题: 投影构建失败
解决方案:
- 错误日志记录
- 重试机制
- 手动修复工具
