Skip to content

CQRS 实现

概述

CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种将命令(写操作)和查询(读操作)分离的架构模式。命令端完成写入后发布领域事件,通过 RabbitMQ 将这些事件异步同步到查询端的读模型,可以显著提升系统的性能和可扩展性。

业务背景与需求

场景描述

某电商系统读写特征分析:

| 操作类型 | 特点 | 频率 | 性能要求 |
| --- | --- | --- | --- |
| 商品查询 | 只读、高并发 | 10000 QPS | < 50ms |
| 订单查询 | 只读、中并发 | 5000 QPS | < 100ms |
| 订单创建 | 写入、事务性 | 100 TPS | < 500ms |
| 库存更新 | 写入、原子性 | 200 TPS | < 200ms |

传统架构问题

单一模型问题:
1. 读写操作竞争同一数据源
2. 复杂查询影响写入性能
3. 难以针对读写分别优化
4. 数据模型过于复杂

CQRS 优势

| 优势 | 说明 |
| --- | --- |
| 独立优化 | 读写模型可独立设计和优化 |
| 性能提升 | 读写分离,互不影响 |
| 扩展灵活 | 读写端可独立扩展 |
| 复杂度降低 | 各端模型更简单清晰 |

架构设计

CQRS 架构图

mermaid
graph TB
    subgraph "客户端"
        A[查询请求]
        B[命令请求]
    end
    
    subgraph "查询端"
        C[查询API]
        D[读模型<br/>Redis/Elasticsearch]
    end
    
    subgraph "命令端"
        E[命令API]
        F[命令处理器]
        G[写模型<br/>MySQL]
        H[事件发布器]
    end
    
    subgraph "RabbitMQ"
        I[事件交换机<br/>cqrs.exchange]
        J[同步队列<br/>cqrs.sync]
    end
    
    subgraph "同步服务"
        K[事件处理器]
        L[投影构建器]
    end
    
    A --> C
    C --> D
    
    B --> E
    E --> F
    F --> G
    F --> H
    H --> I
    
    I --> J
    J --> K
    K --> L
    L --> D

数据同步流程

mermaid
sequenceDiagram
    participant Client as 客户端
    participant Command as 命令端
    participant WriteDB as 写数据库
    participant MQ as RabbitMQ
    participant Sync as 同步服务
    participant ReadDB as 读数据库
    
    Client->>Command: 发送命令
    Command->>Command: 验证命令
    Command->>WriteDB: 执行写入
    WriteDB-->>Command: 返回结果
    Command->>MQ: 发布领域事件
    Command-->>Client: 返回成功
    
    MQ->>Sync: 投递事件
    Sync->>Sync: 构建投影
    Sync->>ReadDB: 更新读模型
    Sync-->>MQ: ACK确认
    
    Note over Client,ReadDB: 查询端异步更新

读写模型对比

mermaid
graph LR
    subgraph "写模型"
        A[订单表]
        B[订单明细表]
        C[状态机]
    end
    
    subgraph "读模型"
        D[订单视图]
        E[订单列表]
        F[统计报表]
    end
    
    A --> D
    B --> D
    A --> E
    A --> F

PHP 代码实现

命令定义

php
<?php

namespace App\CQRS;

/**
 * Base class for write-side commands.
 *
 * On construction every command is given a generated id, the type string
 * supplied by the concrete subclass, the current Unix timestamp and an empty
 * metadata array.
 */
abstract class Command
{
    /** @var string Generated id of the form "cmd_<YmdHis>_<16 hex chars>". */
    public string $commandId;

    /** @var string Value returned by getCommandType() at construction time. */
    public string $commandType;

    /** @var int Unix timestamp captured in the constructor. */
    public int $timestamp;

    /** @var array Free-form metadata; initialised to an empty array. */
    public array $metadata;

    public function __construct()
    {
        $this->metadata = [];
        $this->timestamp = time();
        $this->commandType = $this->getCommandType();
        $this->commandId = $this->generateCommandId();
    }

    /**
     * Type string identifying the concrete command.
     */
    abstract protected function getCommandType(): string;

    /**
     * Command-specific data carried by the concrete command.
     */
    abstract public function getPayload(): array;

    /**
     * Flattens the command into an array envelope.
     *
     * @return array Keys, in order: command_id, command_type, timestamp,
     *               metadata, payload.
     */
    public function toArray(): array
    {
        $envelope = [
            'command_id' => $this->commandId,
            'command_type' => $this->commandType,
            'timestamp' => $this->timestamp,
            'metadata' => $this->metadata,
        ];
        $envelope['payload'] = $this->getPayload();

        return $envelope;
    }

    /**
     * Builds an id from the current date/time and 8 random bytes (hex-encoded).
     */
    private function generateCommandId(): string
    {
        $stamp = date('YmdHis');
        $suffix = bin2hex(random_bytes(8));

        return sprintf('cmd_%s_%s', $stamp, $suffix);
    }
}

/**
 * Command asking the write side to create an order.
 *
 * Carries the user id, the ordered items, the total amount and the shipping
 * address; its command type is "order.create".
 */
class CreateOrderCommand extends Command
{
    private string $userId;
    private array $items;
    private float $totalAmount;
    private string $shippingAddress;

    /**
     * @param string $userId          Id of the ordering user.
     * @param array  $items           Ordered items, passed through unchanged.
     * @param float  $totalAmount     Order total.
     * @param string $shippingAddress Delivery address.
     */
    public function __construct(
        string $userId,
        array $items,
        float $totalAmount,
        string $shippingAddress
    ) {
        // The base constructor assigns the id, type, timestamp and metadata.
        parent::__construct();

        $this->shippingAddress = $shippingAddress;
        $this->totalAmount = $totalAmount;
        $this->items = $items;
        $this->userId = $userId;
    }

    /**
     * @return array Keys: user_id, items, total_amount, shipping_address.
     */
    public function getPayload(): array
    {
        $payload = [];
        $payload['user_id'] = $this->userId;
        $payload['items'] = $this->items;
        $payload['total_amount'] = $this->totalAmount;
        $payload['shipping_address'] = $this->shippingAddress;

        return $payload;
    }

    protected function getCommandType(): string
    {
        return 'order.create';
    }
}

/**
 * Command asking the write side to change an order's status.
 *
 * Its command type is "order.update_status"; the remark is optional.
 */
class UpdateOrderStatusCommand extends Command
{
    private string $orderId;
    private string $status;
    private ?string $remark;

    /**
     * @param string      $orderId Id of the order to update.
     * @param string      $status  New status value.
     * @param string|null $remark  Optional note; null when omitted.
     */
    public function __construct(string $orderId, string $status, ?string $remark = null)
    {
        // The base constructor assigns the id, type, timestamp and metadata.
        parent::__construct();

        $this->remark = $remark;
        $this->status = $status;
        $this->orderId = $orderId;
    }

    /**
     * @return array Keys: order_id, status, remark (remark may be null).
     */
    public function getPayload(): array
    {
        $payload = [];
        $payload['order_id'] = $this->orderId;
        $payload['status'] = $this->status;
        $payload['remark'] = $this->remark;

        return $payload;
    }

    protected function getCommandType(): string
    {
        return 'order.update_status';
    }
}

查询定义

php
<?php

namespace App\CQRS;

/**
 * Base class for read-side queries.
 *
 * On construction every query is given a generated id, the type string
 * supplied by the concrete subclass and the current Unix timestamp.
 */
abstract class Query
{
    /** @var string Generated id of the form "qry_<YmdHis>_<8 hex chars>". */
    public string $queryId;

    /** @var string Value returned by getQueryType() at construction time. */
    public string $queryType;

    /** @var int Unix timestamp captured in the constructor. */
    public int $timestamp;

    public function __construct()
    {
        $this->timestamp = time();
        $this->queryType = $this->getQueryType();
        $this->queryId = $this->generateQueryId();
    }

    /**
     * Type string identifying the concrete query.
     */
    abstract protected function getQueryType(): string;

    /**
     * Query-specific criteria supplied by the concrete query.
     */
    abstract public function getCriteria(): array;

    /**
     * Builds an id from the current date/time and 4 random bytes (hex-encoded).
     */
    private function generateQueryId(): string
    {
        $stamp = date('YmdHis');
        $suffix = bin2hex(random_bytes(4));

        return sprintf('qry_%s_%s', $stamp, $suffix);
    }
}

/**
 * Query for a single order by id; its query type is "order.get".
 */
class GetOrderQuery extends Query
{
    private string $orderId;

    /**
     * @param string $orderId Id of the order to look up.
     */
    public function __construct(string $orderId)
    {
        // The base constructor assigns the id, type and timestamp.
        parent::__construct();

        $this->orderId = $orderId;
    }

    /**
     * @return array Single key: order_id.
     */
    public function getCriteria(): array
    {
        $criteria = [];
        $criteria['order_id'] = $this->orderId;

        return $criteria;
    }

    protected function getQueryType(): string
    {
        return 'order.get';
    }
}

/**
 * Paged query for one user's orders; its query type is "order.list".
 *
 * The status filter is optional and is passed through as null when omitted.
 */
class ListOrdersQuery extends Query
{
    private string $userId;
    private int $page;
    private int $pageSize;
    private ?string $status;

    /**
     * @param string      $userId   Id of the user whose orders are listed.
     * @param int         $page     Page number; defaults to 1.
     * @param int         $pageSize Page size; defaults to 20.
     * @param string|null $status   Optional status filter.
     */
    public function __construct(
        string $userId,
        int $page = 1,
        int $pageSize = 20,
        ?string $status = null
    ) {
        // The base constructor assigns the id, type and timestamp.
        parent::__construct();

        $this->status = $status;
        $this->pageSize = $pageSize;
        $this->page = $page;
        $this->userId = $userId;
    }

    /**
     * @return array Keys: user_id, page, page_size, status.
     */
    public function getCriteria(): array
    {
        $criteria = [];
        $criteria['user_id'] = $this->userId;
        $criteria['page'] = $this->page;
        $criteria['page_size'] = $this->pageSize;
        $criteria['status'] = $this->status;

        return $criteria;
    }

    protected function getQueryType(): string
    {
        return 'order.list';
    }
}

命令处理器

php
<?php

namespace App\CQRS;

/**
 * Contract for objects that execute commands on the write side.
 */
interface CommandHandlerInterface
{
    /**
     * Executes the given command.
     *
     * @param Command $command The command to execute.
     *
     * @return CommandResult Outcome of the execution.
     */
    public function handle(Command $command): CommandResult;

    /**
     * Tells whether this handler is responsible for the given command type.
     *
     * @param string $commandType Command type string to test.
     */
    public function supports(string $commandType): bool;
}

/**
 * Outcome of handling a command.
 *
 * Instances are created only through the success() and failure() factories.
 * A successful result may carry arbitrary data and a list of events; a failed
 * result carries an error message, null data and no events.
 */
class CommandResult
{
    /** @var bool True for results built by success(). */
    private bool $success;

    /** @var mixed Data attached by success(); null for failures. */
    private $data;

    /** @var string|null Error message for failures; null on success. */
    private ?string $error;

    /** @var array Events attached by success(); empty for failures. */
    private array $events;

    private function __construct(bool $success, $data = null, ?string $error = null, array $events = [])
    {
        $this->events = $events;
        $this->error = $error;
        $this->data = $data;
        $this->success = $success;
    }

    /**
     * Builds a successful result.
     *
     * @param mixed $data   Optional result data.
     * @param array $events Events to attach to the result.
     */
    public static function success($data = null, array $events = []): self
    {
        $result = new self(true, $data, null, $events);

        return $result;
    }

    /**
     * Builds a failed result carrying the given error message.
     */
    public static function failure(string $error): self
    {
        $result = new self(false, null, $error);

        return $result;
    }

    public function isSuccess(): bool
    {
        return $this->success;
    }

    /**
     * @return mixed Whatever was passed to success(), or null.
     */
    public function getData()
    {
        return $this->data;
    }

    public function getError(): ?string
    {
        return $this->error;
    }

    public function getEvents(): array
    {
        return $this->events;
    }
}

/**
 * Handles "order.create" commands.
 *
 * Reserves inventory for every item, stores the order through the write
 * repository and returns a successful result carrying the new order id and an
 * OrderCreatedEvent.
 *
 * NOTE(review): when reserving a later item fails, the reservations already
 * made for earlier items are not released here, and nothing is rolled back if
 * the repository write throws — confirm whether InventoryService compensates
 * elsewhere.
 */
class CreateOrderHandler implements CommandHandlerInterface
{
    private OrderRepository $orderRepository;
    private InventoryService $inventoryService;

    public function __construct(
        OrderRepository $orderRepository,
        InventoryService $inventoryService
    ) {
        $this->inventoryService = $inventoryService;
        $this->orderRepository = $orderRepository;
    }

    public function supports(string $commandType): bool
    {
        return 'order.create' === $commandType;
    }

    /**
     * Creates the order described by the command payload.
     *
     * @param Command $command Expected to expose user_id, items, total_amount
     *                         and shipping_address in its payload.
     *
     * @return CommandResult Failure as soon as one reservation is refused;
     *                       otherwise success with ['order_id' => ...] and
     *                       one OrderCreatedEvent.
     */
    public function handle(Command $command): CommandResult
    {
        $data = $command->getPayload();
        $newOrderId = $this->generateOrderId();

        // Stop at the first item whose reservation is refused.
        foreach ($data['items'] as $item) {
            $reserved = $this->inventoryService->reserve(
                $item['product_id'],
                $item['sku_id'],
                $item['quantity']
            );

            if ($reserved) {
                continue;
            }

            return CommandResult::failure("Insufficient inventory for product: {$item['product_id']}");
        }

        $this->orderRepository->create([
            'order_id' => $newOrderId,
            'user_id' => $data['user_id'],
            'items' => $data['items'],
            'total_amount' => $data['total_amount'],
            'shipping_address' => $data['shipping_address'],
            'status' => 'pending',
            'created_at' => date('Y-m-d H:i:s'),
        ]);

        $created = new OrderCreatedEvent(
            $newOrderId,
            $data['user_id'],
            $data['items'],
            $data['total_amount']
        );

        return CommandResult::success(['order_id' => $newOrderId], [$created]);
    }

    /**
     * Builds an id of the form "ORD<YmdHis><8 uppercase hex chars>".
     */
    private function generateOrderId(): string
    {
        $stamp = date('YmdHis');
        $suffix = strtoupper(bin2hex(random_bytes(4)));

        return sprintf('ORD%s%s', $stamp, $suffix);
    }
}

/**
 * Routes a command to the first registered handler that supports its type and
 * publishes the events of a successful result.
 */
class CommandBus
{
    /** @var CommandHandlerInterface[] Handlers in registration order. */
    private array $handlers = [];

    private EventPublisher $eventPublisher;

    public function __construct(EventPublisher $eventPublisher)
    {
        $this->eventPublisher = $eventPublisher;
    }

    /**
     * Appends a handler; earlier registrations win when several support a type.
     */
    public function register(CommandHandlerInterface $handler): void
    {
        $this->handlers[] = $handler;
    }

    /**
     * Dispatches the command.
     *
     * @return CommandResult The handler's result, or a failure result when no
     *                       registered handler supports the command type.
     */
    public function dispatch(Command $command): CommandResult
    {
        $target = $this->findHandler($command->commandType);

        if ($target === null) {
            return CommandResult::failure("No handler found for command: {$command->commandType}");
        }

        $outcome = $target->handle($command);

        // Events are published only after the handler reported success.
        if ($outcome->isSuccess()) {
            foreach ($outcome->getEvents() as $event) {
                $this->eventPublisher->publish($event);
            }
        }

        return $outcome;
    }

    /**
     * Returns the first registered handler supporting the type, or null.
     */
    private function findHandler(string $commandType): ?CommandHandlerInterface
    {
        foreach ($this->handlers as $candidate) {
            if ($candidate->supports($commandType)) {
                return $candidate;
            }
        }

        return null;
    }
}

查询处理器

php
<?php

namespace App\CQRS;

/**
 * Contract for objects that answer queries on the read side.
 */
interface QueryHandlerInterface
{
    /**
     * Answers the given query.
     *
     * @param Query $query The query to answer.
     *
     * @return QueryResult Outcome of the query.
     */
    public function handle(Query $query): QueryResult;

    /**
     * Tells whether this handler is responsible for the given query type.
     *
     * @param string $queryType Query type string to test.
     */
    public function supports(string $queryType): bool;
}

/**
 * Outcome of handling a query.
 *
 * Instances are created only through the success() and failure() factories.
 * A successful result carries the query data; a failed result carries an error
 * message and null data.
 */
class QueryResult
{
    /** @var bool True for results built by success(). */
    private bool $success;

    /** @var mixed Data passed to success(); null for failures. */
    private $data;

    /** @var string|null Error message for failures; null on success. */
    private ?string $error;

    private function __construct(bool $success, $data = null, ?string $error = null)
    {
        $this->success = $success;
        $this->data = $data;
        $this->error = $error;
    }

    /**
     * Builds a successful result.
     *
     * @param mixed $data Data to return to the caller.
     */
    public static function success($data): self
    {
        return new self(true, $data);
    }

    /**
     * Builds a failed result carrying the given error message.
     */
    public static function failure(string $error): self
    {
        return new self(false, null, $error);
    }

    public function isSuccess(): bool
    {
        return $this->success;
    }

    /**
     * @return mixed Whatever was passed to success(), or null for failures.
     */
    public function getData()
    {
        return $this->data;
    }

    /**
     * Returns the failure message so callers can report why a query failed.
     *
     * Mirrors CommandResult::getError(); without it the stored error could
     * never be read.
     *
     * @return string|null The message passed to failure(), or null on success.
     */
    public function getError(): ?string
    {
        return $this->error;
    }
}

/**
 * Answers "order.get" queries from the order read repository.
 */
class GetOrderHandler implements QueryHandlerInterface
{
    private OrderReadRepository $readRepository;

    public function __construct(OrderReadRepository $readRepository)
    {
        $this->readRepository = $readRepository;
    }

    public function supports(string $queryType): bool
    {
        return 'order.get' === $queryType;
    }

    /**
     * Looks up the order named by the query's order_id criterion.
     *
     * @return QueryResult Success with the repository's value, or a failure
     *                     result when the repository returns a falsy value.
     */
    public function handle(Query $query): QueryResult
    {
        $orderId = $query->getCriteria()['order_id'];
        $found = $this->readRepository->findById($orderId);

        return $found
            ? QueryResult::success($found)
            : QueryResult::failure("Order not found: {$orderId}");
    }
}

/**
 * Answers "order.list" queries from the order read repository.
 */
class ListOrdersHandler implements QueryHandlerInterface
{
    private OrderReadRepository $readRepository;

    public function __construct(OrderReadRepository $readRepository)
    {
        $this->readRepository = $readRepository;
    }

    public function supports(string $queryType): bool
    {
        return 'order.list' === $queryType;
    }

    /**
     * Fetches one page of a user's orders plus the matching total count.
     *
     * @return QueryResult Always a success result with keys items, total,
     *                     page and page_size.
     */
    public function handle(Query $query): QueryResult
    {
        [
            'user_id' => $userId,
            'page' => $page,
            'page_size' => $pageSize,
            'status' => $status,
        ] = $query->getCriteria();

        // The page is fetched before the count, as two separate repository calls.
        $pageItems = $this->readRepository->findByUser($userId, $page, $pageSize, $status);
        $matching = $this->readRepository->countByUser($userId, $status);

        return QueryResult::success([
            'items' => $pageItems,
            'total' => $matching,
            'page' => $page,
            'page_size' => $pageSize,
        ]);
    }
}

/**
 * Routes a query to the first registered handler that supports its type.
 */
class QueryBus
{
    /** @var QueryHandlerInterface[] Handlers in registration order. */
    private array $handlers = [];

    /**
     * Appends a handler; earlier registrations win when several support a type.
     */
    public function register(QueryHandlerInterface $handler): void
    {
        $this->handlers[] = $handler;
    }

    /**
     * Dispatches the query.
     *
     * @return QueryResult The handler's result, or a failure result when no
     *                     registered handler supports the query type.
     */
    public function dispatch(Query $query): QueryResult
    {
        $target = $this->findHandler($query->queryType);

        if ($target === null) {
            return QueryResult::failure("No handler found for query: {$query->queryType}");
        }

        return $target->handle($query);
    }

    /**
     * Returns the first registered handler supporting the type, or null.
     */
    private function findHandler(string $queryType): ?QueryHandlerInterface
    {
        foreach ($this->handlers as $candidate) {
            if ($candidate->supports($queryType)) {
                return $candidate;
            }
        }

        return null;
    }
}

读模型同步服务

php
<?php

namespace App\CQRS;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Consumes domain events from RabbitMQ and projects them into the order read
 * model.
 *
 * The service binds a durable queue to the "cqrs.exchange" exchange with the
 * "#" routing key, consumes with manual acknowledgements and maps each event
 * type to a projector method that writes to the read repository.
 */
class ReadModelSyncService
{
    private AMQPStreamConnection $connection;

    /** @var mixed Channel opened on the injected connection. */
    private $channel;

    private OrderReadRepository $readRepository;

    /** @var array<string, callable> Projector callables keyed by event type. */
    private array $projectors = [];

    /** @var bool Loop flag; cleared by stop(). */
    private bool $running = true;

    public function __construct(
        AMQPStreamConnection $connection,
        OrderReadRepository $readRepository
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->readRepository = $readRepository;
        $this->setupProjectors();
    }

    /**
     * Registers the projector method for every supported event type.
     */
    private function setupProjectors(): void
    {
        $this->projectors = [
            'order.created' => [$this, 'projectOrderCreated'],
            'order.status_changed' => [$this, 'projectOrderStatusChanged'],
            'order.cancelled' => [$this, 'projectOrderCancelled'],
        ];
    }

    /**
     * Declares and binds the queue, then consumes until stop() is called or
     * the channel has no consumer callbacks left.
     *
     * @param string $queueName Queue to consume from.
     */
    public function start(string $queueName = 'cqrs.sync'): void
    {
        $this->channel->queue_declare($queueName, false, true, false, false);
        $this->channel->queue_bind($queueName, 'cqrs.exchange', '#');

        $this->channel->basic_qos(null, 10, null);
        $this->channel->basic_consume($queueName, '', false, false, false, false, [$this, 'handleEvent']);

        while ($this->running && count($this->channel->callbacks)) {
            try {
                // Block for at most one second so messages are handled as soon
                // as they arrive (a fixed sleep per iteration would throttle
                // throughput), while the running flag is still re-checked
                // regularly when the queue is idle.
                $this->channel->wait(null, false, 1);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // Nothing arrived within the timeout; loop to re-check the flag.
            }
        }
    }

    /**
     * Consumer callback: decodes one message and runs the matching projector.
     *
     * - Bodies that are not a JSON object/array with a string "event_type"
     *   are logged and rejected without requeue, because redelivering them
     *   could never succeed.
     * - Events without a registered projector are acknowledged and skipped.
     * - When a projector throws, the error is logged and the message is
     *   requeued.
     *
     * NOTE(review): requeueing is unconditional, so an event whose projection
     * fails deterministically will be redelivered repeatedly — consider a
     * retry limit or a dead-letter exchange.
     */
    public function handleEvent(AMQPMessage $message): void
    {
        $event = json_decode($message->body, true);

        if (!is_array($event) || !isset($event['event_type']) || !is_string($event['event_type'])) {
            error_log('Discarding malformed event message: ' . json_last_error_msg());
            $message->nack(false, false);
            return;
        }

        $eventType = $event['event_type'];

        if (!isset($this->projectors[$eventType])) {
            $message->ack();
            return;
        }

        try {
            $this->projectors[$eventType]($event);
            $message->ack();
        } catch (\Throwable $e) {
            // Throwable (not just Exception) so that a TypeError raised by an
            // incomplete payload does not terminate the consumer process.
            error_log("Projection error: " . $e->getMessage());
            $message->nack(false, true);
        }
    }

    /**
     * Upserts the order view for an "order.created" event.
     *
     * Reads order_id, user_id, items and total_amount from the payload and
     * the event timestamp for both created_at and updated_at.
     */
    private function projectOrderCreated(array $event): void
    {
        $payload = $event['payload'];

        $orderView = [
            'order_id' => $payload['order_id'],
            'user_id' => $payload['user_id'],
            'items' => json_encode($payload['items']),
            'total_amount' => $payload['total_amount'],
            'status' => 'pending',
            'item_count' => count($payload['items']),
            'created_at' => date('Y-m-d H:i:s', $event['timestamp']),
            'updated_at' => date('Y-m-d H:i:s', $event['timestamp']),
        ];

        $this->readRepository->upsert($orderView);
    }

    /**
     * Applies the status carried by an "order.status_changed" event.
     */
    private function projectOrderStatusChanged(array $event): void
    {
        $payload = $event['payload'];

        $this->readRepository->updateStatus(
            $payload['order_id'],
            $payload['status'],
            date('Y-m-d H:i:s', $event['timestamp'])
        );
    }

    /**
     * Marks the order as "cancelled" for an "order.cancelled" event.
     */
    private function projectOrderCancelled(array $event): void
    {
        $payload = $event['payload'];

        $this->readRepository->updateStatus(
            $payload['order_id'],
            'cancelled',
            date('Y-m-d H:i:s', $event['timestamp'])
        );
    }

    /**
     * Asks the consume loop in start() to exit at its next check.
     */
    public function stop(): void
    {
        $this->running = false;
    }
}

完整使用示例

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\CQRS\{
    Command,
    Query,
    CommandBus,
    QueryBus,
    CreateOrderCommand,
    GetOrderQuery,
    ListOrdersQuery,
    CreateOrderHandler,
    GetOrderHandler,
    ListOrdersHandler,
    ReadModelSyncService,
    EventPublisher,
    InventoryService,
    OrderReadRepository,
    OrderRepository
};

// End-to-end demo: wires both buses, runs the read-model sync service in a
// forked child, creates an order through the command bus and reads it back
// through the query bus.

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$redis = new Redis();
$redis->connect('localhost', 6379);

$eventPublisher = new EventPublisher($connection);
$commandBus = new CommandBus($eventPublisher);
$queryBus = new QueryBus();

$orderWriteRepo = new OrderRepository();
$orderReadRepo = new OrderReadRepository($redis);
$inventoryService = new InventoryService();

$commandBus->register(new CreateOrderHandler($orderWriteRepo, $inventoryService));
$queryBus->register(new GetOrderHandler($orderReadRepo));
$queryBus->register(new ListOrdersHandler($orderReadRepo));

$syncService = new ReadModelSyncService($connection, $orderReadRepo);

// NOTE(review): the AMQP and Redis connections are opened before the fork, so
// parent and child share the same sockets — consider opening separate
// connections inside the child.
$syncPid = pcntl_fork();
if ($syncPid === -1) {
    // The fork failed; without the sync service the read model is never updated.
    fwrite(STDERR, "无法创建读模型同步子进程\n");
    exit(1);
}
if ($syncPid === 0) {
    echo "启动读模型同步服务...\n";
    $syncService->start();
    exit(0);
}

// Give the child a moment to declare its queue and start consuming.
sleep(1);

echo "=== CQRS 示例 ===\n\n";

echo "1. 执行命令 - 创建订单\n";
$command = new CreateOrderCommand(
    'user_001',
    [
        ['product_id' => 'P001', 'sku_id' => 'SKU001', 'quantity' => 2, 'price' => 99.00],
        ['product_id' => 'P002', 'sku_id' => 'SKU002', 'quantity' => 1, 'price' => 199.00],
    ],
    397.00,
    '北京市朝阳区xxx'
);

$result = $commandBus->dispatch($command);

// Stays null when the command fails, so the detail query below can be skipped
// instead of reading an undefined variable.
$orderId = null;

if ($result->isSuccess()) {
    $orderId = $result->getData()['order_id'];
    echo "订单创建成功: {$orderId}\n";
} else {
    echo "订单创建失败: {$result->getError()}\n";
}

// The read model is updated asynchronously; wait before querying it.
sleep(2);

echo "\n2. 执行查询 - 获取订单详情\n";
if ($orderId !== null) {
    $query = new GetOrderQuery($orderId);
    $result = $queryBus->dispatch($query);

    if ($result->isSuccess()) {
        echo "订单详情:\n";
        print_r($result->getData());
    }
} else {
    echo "订单未创建,跳过订单详情查询\n";
}

echo "\n3. 执行查询 - 订单列表\n";
$query = new ListOrdersQuery('user_001', 1, 10);
$result = $queryBus->dispatch($query);

if ($result->isSuccess()) {
    $data = $result->getData();
    echo "订单列表 (共 {$data['total']} 条):\n";
    foreach ($data['items'] as $order) {
        echo "- {$order['order_id']}: {$order['status']}\n";
    }
}

// Terminate the sync child and reap it before closing the shared resources.
posix_kill($syncPid, SIGTERM);
pcntl_wait($status);

$eventPublisher->close();
$syncService->stop();
$connection->close();

echo "\n=== 示例完成 ===\n";

关键技术点解析

1. 读写分离

php
class CommandBus { /* 命令处理 */ }
class QueryBus { /* 查询处理 */ }

2. 事件驱动同步

php
$this->eventPublisher->publish($event);

3. 投影构建

php
private function projectOrderCreated(array $event): void
{
    $orderView = [...];
    $this->readRepository->upsert($orderView);
}

4. 最终一致性

读模型异步更新,保证最终一致:

命令执行 → 事件发布 → 投影构建 → 读模型更新

性能优化建议

| 优化项 | 建议 | 说明 |
| --- | --- | --- |
| 读模型缓存 | Redis 缓存热点数据 | 提升查询性能 |
| 批量同步 | 合并多个事件 | 减少写入次数 |
| 分库分表 | 读写分离存储 | 独立扩展 |
| 索引优化 | 针对查询优化索引 | 提升查询效率 |

常见问题与解决方案

1. 数据延迟

问题: 读模型更新延迟

解决方案:

  • 优化同步速度
  • 关键场景直接查写库
  • 提示用户刷新

2. 事件丢失

问题: 同步事件丢失

解决方案:

  • 消息持久化
  • 事件存储
  • 定期校验

3. 投影错误

问题: 投影构建失败

解决方案:

  • 错误日志记录
  • 重试机制
  • 手动修复工具

相关链接