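Decoupling the Order System

Overview

In an e-commerce system, order creation is a core business flow that touches several downstream systems: inventory deduction, payment processing, logistics notification, and points calculation. Decoupling the order system with RabbitMQ lets the downstream work run asynchronously from the main flow, which improves maintainability and scalability.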

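Business Background and Requirements

Scenario

Order creation on an e-commerce platform involves the following subsystems:

| Subsystem | Function | Latency | Dependency |
|---|---|---|---|
| Inventory | Deduct stock, lock items | 50-100ms | Must succeed |
| Payment | Create the payment order, call the payment gateway | 100-300ms | Must succeed |
| Points | Calculate and credit user points | 20-50ms | Can be asynchronous |
| Coupon | Mark the coupon as used | 20-50ms | Can be asynchronous |
| Notification | Send the order confirmation | 50-100ms | Can be asynchronous |
| Analytics | Record user behavior data | 10-30ms | Can be asynchronous |
| Logistics | Book shipping, generate the waybill | 100-200ms | Can be asynchronous |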

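Pain Points

Traditional synchronous call chain:
User places order → inventory deduction → payment processing → points calculation → coupon handling → notification → analytics → logistics booking
Total latency: 350-830ms

Problems:
1. A failure in any subsystem fails the whole order
2. The systems are tightly coupled and hard to scale independently
3. Response time is long, which hurts the user experience
4. A single failure has a wide blast radius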
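
The 350-830ms total follows directly from the subsystem table. The sketch below sums the per-subsystem latency ranges for the full synchronous chain and, for comparison, for the two must-succeed steps only. The array layout and the helper name `sumRange` are illustrative and not part of the original system.

```php
<?php

// Latency ranges in milliseconds, copied from the subsystem table.
// 'required' marks the steps listed as "must succeed".
$subsystems = [
    'inventory'    => ['min' => 50,  'max' => 100, 'required' => true],
    'payment'      => ['min' => 100, 'max' => 300, 'required' => true],
    'points'       => ['min' => 20,  'max' => 50,  'required' => false],
    'coupon'       => ['min' => 20,  'max' => 50,  'required' => false],
    'notification' => ['min' => 50,  'max' => 100, 'required' => false],
    'analytics'    => ['min' => 10,  'max' => 30,  'required' => false],
    'logistics'    => ['min' => 100, 'max' => 200, 'required' => false],
];

// Returns [sum of minimums, sum of maximums] for steps executed one after another.
function sumRange(array $subsystems): array
{
    return [
        array_sum(array_column($subsystems, 'min')),
        array_sum(array_column($subsystems, 'max')),
    ];
}

[$syncMin, $syncMax] = sumRange($subsystems);
$required = array_filter($subsystems, fn (array $s): bool => $s['required']);
[$coreMin, $coreMax] = sumRange($required);

printf("All steps in sequence: %d-%dms\n", $syncMin, $syncMax);   // 350-830ms
printf("Must-succeed steps only: %d-%dms\n", $coreMin, $coreMax); // 150-400ms
```

If the two must-succeed steps stayed on the request path and ran back to back, their worst case alone would be 400ms, so moving only the optional steps off the request path may not be enough for a tight latency budget.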

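Goals

| Goal | Target |
|---|---|
| Order response time | < 300ms |
| System availability | 99.9% |
| Message reliability | 99.99% |
| Degree of decoupling | Core flow fully decoupled from auxiliary flows |

Architecture Design

Overall Architecture Diagram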

mermaid
graph TB
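    subgraph "Clients"
        A[User client]
        B[Admin client]
    end

    subgraph "Order Service"
        C[Order API]
        D[Order Core Service]
        E[Message Publisher]
    end

    subgraph "RabbitMQ"
        F[Order exchange<br/>order.exchange]

        subgraph "Core queues"
            G[Inventory queue<br/>order.inventory]
            H[Payment queue<br/>order.payment]
        end

        subgraph "Auxiliary queues"
            I[Points queue<br/>order.points]
            J[Coupon queue<br/>order.coupon]
            K[Notification queue<br/>order.notification]
            L[Analytics queue<br/>order.analytics]
            M[Logistics queue<br/>order.logistics]
        end

        N[Dead-letter queue<br/>order.dlq]
    end

    subgraph "Downstream Services"
        O[Inventory Service]
        P[Payment Service]
        Q[Points Service]
        R[Coupon Service]
        S[Notification Service]
        T[Analytics Service]
        U[Logistics Service]
    end

    subgraph "Compensation"
        V[Compensation Service]
        W[Manual handling]
    end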
    
    A --> C
    B --> C
    C --> D
    D --> E
    E --> F
    F --> G
    F --> H
    F --> I
    F --> J
    F --> K
    F --> L
    F --> M
    
    G --> O
    H --> P
    I --> Q
    J --> R
    K --> S
    L --> T
    M --> U
    
    G -.-> N
    H -.-> N
    I -.-> N
    J -.-> N
    K -.-> N
    L -.-> N
    M -.-> N
    
    N --> V
    V --> W

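Order Creation Flow

mermaid
sequenceDiagram
    participant User as User
    participant Order as Order Service
    participant MQ as RabbitMQ
    participant Inventory as Inventory Service
    participant Payment as Payment Service
    participant Points as Points Service
    participant Notify as Notification Service

    User->>Order: Submit order
    Order->>Order: Create order record
    Order->>MQ: Publish inventory deduction event
    Order->>MQ: Publish payment creation event
    Order-->>User: Return order ID (immediate response)

    par Parallel processing
        MQ->>Inventory: Consume inventory event
        Inventory->>Inventory: Deduct stock
        Inventory-->>MQ: ACK

        MQ->>Payment: Consume payment event
        Payment->>Payment: Create payment order
        Payment-->>MQ: ACK
    end

    Order->>MQ: Publish points event
    Order->>MQ: Publish notification event

    par Asynchronous processing
        MQ->>Points: Consume points event
        Points->>Points: Credit points

        MQ->>Notify: Consume notification event
        Notify->>Notify: Send notification
    end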

Event-Driven Model

mermaid
stateDiagram-v2
    [*] --> OrderCreated: User places order
    OrderCreated --> InventoryReserved: Inventory reserved
    OrderCreated --> OrderCancelled: Inventory reservation failed
    InventoryReserved --> PaymentPending: Awaiting payment
    PaymentPending --> PaymentSuccess: Payment succeeded
    PaymentPending --> PaymentFailed: Payment failed or timed out
    PaymentSuccess --> OrderConfirmed: Order confirmed
    PaymentFailed --> InventoryReleased: Release inventory
    InventoryReleased --> OrderCancelled: Order cancelled
    OrderConfirmed --> [*]
    OrderCancelled --> [*]
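
The state diagram can be enforced in code with a plain transition table. This is a minimal sketch: the state names are taken from the diagram, while the constant `ORDER_TRANSITIONS` and the functions `canTransition` and `transition` are hypothetical helpers, not part of the implementation shown later.

```php
<?php

// Allowed transitions, transcribed from the state diagram.
const ORDER_TRANSITIONS = [
    'OrderCreated'      => ['InventoryReserved', 'OrderCancelled'],
    'InventoryReserved' => ['PaymentPending'],
    'PaymentPending'    => ['PaymentSuccess', 'PaymentFailed'],
    'PaymentSuccess'    => ['OrderConfirmed'],
    'PaymentFailed'     => ['InventoryReleased'],
    'InventoryReleased' => ['OrderCancelled'],
    'OrderConfirmed'    => [], // terminal
    'OrderCancelled'    => [], // terminal
];

function canTransition(string $from, string $to): bool
{
    return in_array($to, ORDER_TRANSITIONS[$from] ?? [], true);
}

// Returns the new state, or throws when the diagram has no such edge.
function transition(string $from, string $to): string
{
    if (!canTransition($from, $to)) {
        throw new DomainException("Illegal transition: {$from} -> {$to}");
    }

    return $to;
}

// Walk the payment-failure path from the diagram.
$state = 'OrderCreated';
$path = ['InventoryReserved', 'PaymentPending', 'PaymentFailed', 'InventoryReleased', 'OrderCancelled'];
foreach ($path as $next) {
    $state = transition($state, $next);
}

echo $state, PHP_EOL; // OrderCancelled
```

Rejecting unknown edges in one place keeps a late or duplicated event from moving an order backwards, for example from `OrderCancelled` to `PaymentPending`.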

PHP Implementation

Order Event Definitions

php
<?php

namespace App\Messaging\Order;

abstract class OrderEvent
{
    public string $eventId;
    public string $eventType;
    public string $orderId;
    public int $timestamp;
    public array $metadata;

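    public function __construct(string $orderId, array $metadata = [])
    {
        $this->eventId = $this->generateEventId();
        // Initialize the typed property so reading $event->eventType cannot fail.
        $this->eventType = $this->getEventType();
        $this->orderId = $orderId;
        $this->timestamp = time();
        $this->metadata = $metadata;
    }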

    private function generateEventId(): string
    {
        return sprintf('evt_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    abstract public function getEventType(): string;

    public function toArray(): array
    {
        return [
            'event_id' => $this->eventId,
            'event_type' => $this->getEventType(),
            'order_id' => $this->orderId,
            'timestamp' => $this->timestamp,
            'metadata' => $this->metadata,
            'payload' => $this->getPayload(),
        ];
    }

    abstract protected function getPayload(): array;
}

class OrderCreatedEvent extends OrderEvent
{
    private array $orderData;

    public function __construct(string $orderId, array $orderData, array $metadata = [])
    {
        parent::__construct($orderId, $metadata);
        $this->orderData = $orderData;
    }

    public function getEventType(): string
    {
        return 'order.created';
    }

    protected function getPayload(): array
    {
        return $this->orderData;
    }

    public function getOrderData(): array
    {
        return $this->orderData;
    }
}

class OrderPaidEvent extends OrderEvent
{
    private string $transactionId;
    private string $paymentMethod;
    private float $paidAmount;

    public function __construct(
        string $orderId,
        string $transactionId,
        string $paymentMethod,
        float $paidAmount,
        array $metadata = []
    ) {
        parent::__construct($orderId, $metadata);
        $this->transactionId = $transactionId;
        $this->paymentMethod = $paymentMethod;
        $this->paidAmount = $paidAmount;
    }

    public function getEventType(): string
    {
        return 'order.paid';
    }

    protected function getPayload(): array
    {
        return [
            'transaction_id' => $this->transactionId,
            'payment_method' => $this->paymentMethod,
            'paid_amount' => $this->paidAmount,
        ];
    }
}

class OrderCancelledEvent extends OrderEvent
{
    private string $reason;
    private array $refundInfo;

    public function __construct(
        string $orderId,
        string $reason,
        array $refundInfo = [],
        array $metadata = []
    ) {
        parent::__construct($orderId, $metadata);
        $this->reason = $reason;
        $this->refundInfo = $refundInfo;
    }

    public function getEventType(): string
    {
        return 'order.cancelled';
    }

    protected function getPayload(): array
    {
        return [
            'reason' => $this->reason,
            'refund_info' => $this->refundInfo,
        ];
    }
}

class OrderShippedEvent extends OrderEvent
{
    private string $trackingNumber;
    private string $carrier;
    private string $estimatedDelivery;

    public function __construct(
        string $orderId,
        string $trackingNumber,
        string $carrier,
        string $estimatedDelivery,
        array $metadata = []
    ) {
        parent::__construct($orderId, $metadata);
        $this->trackingNumber = $trackingNumber;
        $this->carrier = $carrier;
        $this->estimatedDelivery = $estimatedDelivery;
    }

    public function getEventType(): string
    {
        return 'order.shipped';
    }

    protected function getPayload(): array
    {
        return [
            'tracking_number' => $this->trackingNumber,
            'carrier' => $this->carrier,
            'estimated_delivery' => $this->estimatedDelivery,
        ];
    }
}
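
Consumers receive these events as JSON, so it can help to check the envelope before dispatching on `event_type`. The sketch below validates the six keys that `OrderEvent::toArray()` emits. The constant `ENVELOPE_FIELDS`, the function `validateEnvelope`, and the sample values are assumptions made for illustration.

```php
<?php

// Keys produced by OrderEvent::toArray(), with the gettype() name each should decode to.
const ENVELOPE_FIELDS = [
    'event_id'   => 'string',
    'event_type' => 'string',
    'order_id'   => 'string',
    'timestamp'  => 'integer',
    'metadata'   => 'array',
    'payload'    => 'array',
];

/** Returns a list of problems; an empty list means the envelope is well formed. */
function validateEnvelope(string $json): array
{
    $data = json_decode($json, true);
    if (!is_array($data)) {
        return ['body is not a JSON object'];
    }

    $problems = [];
    foreach (ENVELOPE_FIELDS as $field => $type) {
        if (!array_key_exists($field, $data)) {
            $problems[] = "missing field: {$field}";
        } elseif (gettype($data[$field]) !== $type) {
            $problems[] = "field {$field} should be {$type}";
        }
    }

    return $problems;
}

// Hypothetical sample in the shape toArray() produces.
$json = json_encode([
    'event_id'   => 'evt_20240101120000_0123456789abcdef',
    'event_type' => 'order.created',
    'order_id'   => 'ORD20240101120000ABCD1234',
    'timestamp'  => 1704110400,
    'metadata'   => ['source' => 'web'],
    'payload'    => ['user_id' => 42, 'total_amount' => 99.5],
]);

var_dump(validateEnvelope($json) === []); // bool(true)
```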

Order Event Publisher

php
<?php

namespace App\Messaging\Order;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class OrderEventPublisher
{
    private AMQPStreamConnection $connection;
    private $channel;
    private string $exchangeName = 'order.exchange';
    private string $deadLetterExchange = 'order.dlx';
    private bool $confirmed = false;

    private const ROUTING_KEYS = [
        'order.created' => [
            'order.inventory.reserve',
            'order.payment.create',
            'order.points.calculate',
            'order.coupon.use',
            'order.notification.send',
            'order.analytics.record',
        ],
        'order.paid' => [
            'order.inventory.confirm',
            'order.points.credit',
            'order.notification.send',
            'order.logistics.create',
        ],
        'order.cancelled' => [
            'order.inventory.release',
            'order.coupon.release',
            'order.points.rollback',
            'order.notification.send',
        ],
        'order.shipped' => [
            'order.notification.send',
            'order.analytics.record',
        ],
    ];

    public function __construct(AMQPStreamConnection $connection, bool $enableConfirm = true)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->setupInfrastructure();
        
        if ($enableConfirm) {
            $this->enablePublisherConfirms();
        }
    }

    private function setupInfrastructure(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );

        $this->channel->exchange_declare(
            $this->deadLetterExchange,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        $this->setupQueues();
    }

    private function setupQueues(): void
    {
        $queueConfigs = [
            'order.inventory' => [
                'routing_keys' => ['order.inventory.reserve', 'order.inventory.confirm', 'order.inventory.release'],
                'priority' => 10,
            ],
            'order.payment' => [
                'routing_keys' => ['order.payment.create'],
                'priority' => 10,
            ],
            'order.points' => [
                'routing_keys' => ['order.points.calculate', 'order.points.credit', 'order.points.rollback'],
                'priority' => 5,
            ],
            'order.coupon' => [
                'routing_keys' => ['order.coupon.use', 'order.coupon.release'],
                'priority' => 5,
            ],
            'order.notification' => [
                'routing_keys' => ['order.notification.send'],
                'priority' => 3,
            ],
            'order.analytics' => [
                'routing_keys' => ['order.analytics.record'],
                'priority' => 1,
            ],
            'order.logistics' => [
                'routing_keys' => ['order.logistics.create'],
                'priority' => 5,
            ],
        ];

        foreach ($queueConfigs as $queueName => $config) {
            $args = [
                'x-dead-letter-exchange' => ['S', $this->deadLetterExchange],
                'x-dead-letter-routing-key' => ['S', 'order.failed'],
                'x-max-priority' => ['I', 10],
                'x-message-ttl' => ['I', 604800000],
            ];

            $this->channel->queue_declare(
                $queueName,
                false,
                true,
                false,
                false,
                false,
                $args
            );

            foreach ($config['routing_keys'] as $routingKey) {
                $this->channel->queue_bind($queueName, $this->exchangeName, $routingKey);
            }
        }

        $this->channel->queue_declare('order.dlq', false, true, false, false);
        $this->channel->queue_bind('order.dlq', $this->deadLetterExchange, 'order.failed');
    }

    private function enablePublisherConfirms(): void
    {
        // An ack needs no action: publish() assumes success unless a nack arrives,
        // so a later ack cannot mask an earlier nack.
        $this->channel->set_ack_handler(function (AMQPMessage $message) {
        });

        $this->channel->set_nack_handler(function (AMQPMessage $message) {
            $this->confirmed = false;
            error_log('Message NACKed: ' . $message->getBody());
        });

        $this->channel->confirm_select();
    }

    public function publish(OrderEvent $event): bool
    {
        $routingKeys = self::ROUTING_KEYS[$event->getEventType()] ?? [];
        
        if (empty($routingKeys)) {
            throw new \InvalidArgumentException('Unknown event type: ' . $event->getEventType());
        }

        $message = new AMQPMessage(
            json_encode($event->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $event->eventId,
                'timestamp' => $event->timestamp,
                'type' => $event->getEventType(),
                // Custom headers go in application_headers, the property name
                // php-amqplib recognizes for the AMQP headers table.
                'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
                    'order_id' => $event->orderId,
                ]),
            ]
        );

        // Reset per call so the result of an earlier publish is never reused.
        $this->confirmed = true;

        foreach ($routingKeys as $routingKey) {
            $this->channel->basic_publish(
                $message,
                $this->exchangeName,
                $routingKey
            );
        }

        try {
            // Waits for broker confirms; when confirm mode is off nothing is
            // pending, so this returns immediately.
            $this->channel->wait_for_pending_acks(5.0);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            $this->confirmed = false;
        }

        return $this->confirmed;
    }

    public function publishAsync(OrderEvent $event): void
    {
        $routingKeys = self::ROUTING_KEYS[$event->getEventType()] ?? [];
        
        $message = new AMQPMessage(
            json_encode($event->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $event->eventId,
                'timestamp' => $event->timestamp,
                'type' => $event->getEventType(),
            ]
        );

        foreach ($routingKeys as $routingKey) {
            $this->channel->basic_publish(
                $message,
                $this->exchangeName,
                $routingKey
            );
        }
    }

    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
    }
}
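
The publisher declares `order.exchange` as a topic exchange but binds every queue with exact routing keys. A topic exchange also accepts wildcard bindings, where `*` stands for exactly one word and `#` for zero or more words. The sketch below re-implements that matching rule in plain PHP to show which keys a pattern such as `order.inventory.*` would cover. It is an illustration of the rule, not RabbitMQ's own code, and the function names are hypothetical.

```php
<?php

// True when a topic binding pattern matches a routing key.
function topicMatches(string $pattern, string $routingKey): bool
{
    return matchWords(explode('.', $pattern), explode('.', $routingKey));
}

// Compares the dot-separated words of the pattern against those of the key.
function matchWords(array $pattern, array $key): bool
{
    if ($pattern === []) {
        return $key === [];
    }

    $head = array_shift($pattern);

    if ($head === '#') {
        // '#' may absorb zero or more words; try every possible split.
        for ($i = 0; $i <= count($key); $i++) {
            if (matchWords($pattern, array_slice($key, $i))) {
                return true;
            }
        }
        return false;
    }

    if ($key === []) {
        return false;
    }

    $word = array_shift($key);

    // '*' matches exactly one word; anything else must match literally.
    return ($head === '*' || $head === $word) && matchWords($pattern, $key);
}

// Routing keys the publisher binds to order.inventory one by one.
$inventoryKeys = ['order.inventory.reserve', 'order.inventory.confirm', 'order.inventory.release'];

foreach ($inventoryKeys as $key) {
    printf("%s -> %s\n", $key, topicMatches('order.inventory.*', $key) ? 'match' : 'no match');
}
```

With a wildcard binding, a new inventory routing key would reach the `order.inventory` queue without a change to the queue setup. The trade-off is that a mistyped key is delivered too, so exact bindings remain the stricter choice.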

Order Service Core Implementation

php
<?php

namespace App\Services;

use App\Messaging\Order\{OrderCreatedEvent, OrderPaidEvent, OrderCancelledEvent, OrderEventPublisher};
use App\Repository\OrderRepository;
use App\Repository\InventoryRepository;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class OrderService
{
    private OrderRepository $orderRepository;
    private InventoryRepository $inventoryRepository;
    private OrderEventPublisher $eventPublisher;
    private $connection;

    public function __construct(
        OrderRepository $orderRepository,
        InventoryRepository $inventoryRepository,
        AMQPStreamConnection $connection
    ) {
        $this->orderRepository = $orderRepository;
        $this->inventoryRepository = $inventoryRepository;
        $this->connection = $connection;
        $this->eventPublisher = new OrderEventPublisher($connection);
    }

    public function createOrder(array $orderData): array
    {
        $orderId = $this->generateOrderId();
        
        $order = [
            'order_id' => $orderId,
            'user_id' => $orderData['user_id'],
            'items' => $orderData['items'],
            'total_amount' => $orderData['total_amount'],
            'status' => 'pending',
            'created_at' => date('Y-m-d H:i:s'),
            'updated_at' => date('Y-m-d H:i:s'),
            'shipping_address' => $orderData['shipping_address'],
            'coupon_id' => $orderData['coupon_id'] ?? null,
            'remark' => $orderData['remark'] ?? '',
        ];

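        $this->orderRepository->beginTransaction();

        try {
            $this->orderRepository->create($order);

            foreach ($orderData['items'] as $item) {
                $reserved = $this->inventoryRepository->reserve(
                    $item['product_id'],
                    $item['sku_id'],
                    $item['quantity'],
                    $orderId
                );

                if (!$reserved) {
                    throw new \Exception("Insufficient inventory for product: {$item['product_id']}");
                }
            }

            $this->orderRepository->commit();
        } catch (\Exception $e) {
            // Only database work is undone here; nothing has been published yet.
            $this->orderRepository->rollback();

            return [
                'success' => false,
                'message' => $e->getMessage(),
            ];
        }

        // Publish only after the commit. A publish failure must not trigger a
        // rollback (the transaction is already committed), so it is logged
        // for later replay instead.
        try {
            $event = new OrderCreatedEvent($orderId, $order, [
                'source' => 'web',
                'user_id' => $orderData['user_id'],
            ]);

            if (!$this->eventPublisher->publish($event)) {
                error_log("order.created was not confirmed by the broker for order {$orderId}");
            }
        } catch (\Exception $e) {
            error_log("Failed to publish order.created for order {$orderId}: " . $e->getMessage());
        }

        return [
            'success' => true,
            'order_id' => $orderId,
            'message' => 'Order created successfully',
        ];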
    }

    public function confirmPayment(string $orderId, array $paymentData): array
    {
        $order = $this->orderRepository->findById($orderId);
        
        if (!$order) {
            return ['success' => false, 'message' => 'Order not found'];
        }

        if ($order['status'] !== 'pending') {
            return ['success' => false, 'message' => 'Invalid order status'];
        }

        $this->orderRepository->update($orderId, [
            'status' => 'paid',
            'payment_id' => $paymentData['payment_id'],
            'paid_at' => date('Y-m-d H:i:s'),
        ]);

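        $event = new OrderPaidEvent(
            $orderId,
            $paymentData['transaction_id'],
            $paymentData['payment_method'],
            $paymentData['amount'],
            [
                'source' => 'payment_callback',
                // The order.paid payload has no user_id, but the points and
                // notification consumers need one, so carry it in metadata.
                'user_id' => $order['user_id'],
            ]
        );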

        $this->eventPublisher->publish($event);

        return ['success' => true, 'message' => 'Payment confirmed'];
    }

    public function cancelOrder(string $orderId, string $reason, array $refundInfo = []): array
    {
        $order = $this->orderRepository->findById($orderId);
        
        if (!$order) {
            return ['success' => false, 'message' => 'Order not found'];
        }

        if (!in_array($order['status'], ['pending', 'paid'])) {
            return ['success' => false, 'message' => 'Order cannot be cancelled'];
        }

        $this->orderRepository->update($orderId, [
            'status' => 'cancelled',
            'cancelled_at' => date('Y-m-d H:i:s'),
            'cancel_reason' => $reason,
        ]);

        foreach ($order['items'] as $item) {
            $this->inventoryRepository->release(
                $item['product_id'],
                $item['sku_id'],
                $item['quantity'],
                $orderId
            );
        }

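        $event = new OrderCancelledEvent($orderId, $reason, $refundInfo, [
            'source' => 'user_cancel',
            // The order.cancelled payload has no user_id; consumers read it from metadata.
            'user_id' => $order['user_id'],
        ]);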

        $this->eventPublisher->publish($event);

        return ['success' => true, 'message' => 'Order cancelled'];
    }

    private function generateOrderId(): string
    {
        return sprintf('ORD%s%s', date('YmdHis'), strtoupper(bin2hex(random_bytes(4))));
    }
}

Downstream Consumer Implementation

php
<?php

namespace App\Consumers;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

abstract class OrderEventConsumer
{
    protected AMQPStreamConnection $connection;
    protected $channel;
    protected string $queueName;
    protected int $prefetchCount = 10;
    protected bool $running = true;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->setup();
    }

    protected function setup(): void
    {
        $this->channel->basic_qos(null, $this->prefetchCount, null);
    }

    public function consume(): void
    {
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            [$this, 'processMessage']
        );

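        while ($this->running && $this->channel->is_consuming()) {
            try {
                // Block for up to one second instead of polling with a fixed sleep,
                // which would cap throughput at roughly ten messages per second.
                $this->channel->wait(null, false, 1.0);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // No delivery within the timeout; loop again so stop() takes effect.
            }
        }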
    }

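    public function processMessage(AMQPMessage $message): void
    {
        $eventData = json_decode($message->getBody(), true);

        if (!is_array($eventData)) {
            // A malformed body can never succeed, so dead-letter it without retrying.
            error_log(sprintf('[%s] Malformed message body, dead-lettering', static::class));
            $message->nack(false, false);
            return;
        }

        try {
            $result = $this->handleEvent($eventData);
            
            if ($result['success']) {
                $message->ack();
                $this->logSuccess($eventData, $result);
            } else {
                $this->handleFailure($message, $eventData, $result['error']);
            }
            
        } catch (\Exception $e) {
            $this->handleException($message, $eventData, $e);
        }
    }

    abstract protected function handleEvent(array $eventData): array;

    protected function handleFailure(AMQPMessage $message, array $eventData, \Exception $error): void
    {
        $retryCount = $this->getRetryCount($message);
        
        if ($retryCount < 3) {
            $this->logRetry($eventData, $retryCount, $error);
            // A plain requeue redelivers the message unchanged, so the counter would
            // never advance. Republish a copy with x-retry-count incremented (through
            // the default exchange, straight back to this queue) and ack the original.
            $headers = $message->has('application_headers')
                ? $message->get('application_headers')->getNativeData()
                : [];
            $headers['x-retry-count'] = $retryCount + 1;
            $properties = $message->get_properties();
            $properties['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable($headers);
            $this->channel->basic_publish(
                new AMQPMessage($message->getBody(), $properties),
                '',
                $this->queueName
            );
            $message->ack();
        } else {
            $this->logFailure($eventData, $error);
            // Reject without requeue so the broker routes the message to the dead-letter exchange.
            $message->nack(false, false);
        }
    }

    protected function handleException(AMQPMessage $message, array $eventData, \Exception $e): void
    {
        $this->logError($eventData, $e);
        // Apply the same bounded retry as a reported failure; an unconditional
        // requeue would redeliver a poison message forever.
        $this->handleFailure($message, $eventData, $e);
    }

    protected function getRetryCount(AMQPMessage $message): int
    {
        // get() throws when the property is absent, so check with has() first.
        if (!$message->has('application_headers')) {
            return 0;
        }

        $headers = $message->get('application_headers')->getNativeData();

        return (int) ($headers['x-retry-count'] ?? 0);
    }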

    public function stop(): void
    {
        $this->running = false;
    }

    public function close(): void
    {
        $this->stop();
        if ($this->channel) {
            $this->channel->close();
        }
    }

    protected function logSuccess(array $eventData, array $result): void
    {
        error_log(sprintf(
            '[%s] Event processed: %s - %s',
            static::class,
            $eventData['event_id'],
            $eventData['event_type']
        ));
    }

    protected function logRetry(array $eventData, int $retryCount, \Exception $error): void
    {
        error_log(sprintf(
            '[%s] Retry %d: %s - %s',
            static::class,
            $retryCount,
            $eventData['event_id'],
            $error->getMessage()
        ));
    }

    protected function logFailure(array $eventData, \Exception $error): void
    {
        error_log(sprintf(
            '[%s] Failed: %s - %s',
            static::class,
            $eventData['event_id'],
            $error->getMessage()
        ));
    }

    protected function logError(array $eventData, \Exception $e): void
    {
        error_log(sprintf(
            '[%s] Error: %s - %s',
            static::class,
            $eventData['event_id'],
            $e->getMessage()
        ));
    }
}

class InventoryConsumer extends OrderEventConsumer
{
    protected string $queueName = 'order.inventory';
    private InventoryService $inventoryService;

    public function __construct(AMQPStreamConnection $connection, InventoryService $inventoryService)
    {
        parent::__construct($connection);
        $this->inventoryService = $inventoryService;
    }

    protected function handleEvent(array $eventData): array
    {
        $eventType = $eventData['event_type'];
        $orderId = $eventData['order_id'];
        $payload = $eventData['payload'];

        switch ($eventType) {
            case 'order.created':
                return $this->handleOrderCreated($orderId, $payload);
                
            case 'order.paid':
                return $this->handleOrderPaid($orderId, $payload);
                
            case 'order.cancelled':
                return $this->handleOrderCancelled($orderId, $payload);
                
            default:
                return ['success' => true];
        }
    }

    private function handleOrderCreated(string $orderId, array $payload): array
    {
        foreach ($payload['items'] as $item) {
            $confirmed = $this->inventoryService->confirmReservation(
                $item['product_id'],
                $item['sku_id'],
                $item['quantity'],
                $orderId
            );

            if (!$confirmed) {
                return [
                    'success' => false,
                    'error' => new \Exception("Failed to confirm inventory for order: {$orderId}"),
                ];
            }
        }

        return ['success' => true];
    }

    private function handleOrderPaid(string $orderId, array $payload): array
    {
        return ['success' => true];
    }

    private function handleOrderCancelled(string $orderId, array $payload): array
    {
        foreach ($payload['items'] ?? [] as $item) {
            $this->inventoryService->releaseReservation(
                $item['product_id'],
                $item['sku_id'],
                $item['quantity'],
                $orderId
            );
        }

        return ['success' => true];
    }
}

class PointsConsumer extends OrderEventConsumer
{
    protected string $queueName = 'order.points';
    private PointsService $pointsService;

    public function __construct(AMQPStreamConnection $connection, PointsService $pointsService)
    {
        parent::__construct($connection);
        $this->pointsService = $pointsService;
    }

    protected function handleEvent(array $eventData): array
    {
        $eventType = $eventData['event_type'];
        $orderId = $eventData['order_id'];
        $payload = $eventData['payload'];
        // The order.paid and order.cancelled payloads carry no user_id, so it is
        // read from the event metadata, which the publisher must populate.
        $userId = $eventData['metadata']['user_id'] ?? null;

        if ($userId === null && in_array($eventType, ['order.paid', 'order.cancelled'], true)) {
            return [
                'success' => false,
                'error' => new \Exception("Missing user_id in metadata for order: {$orderId}"),
            ];
        }

        switch ($eventType) {
            case 'order.paid':
                return $this->handleOrderPaid($orderId, $userId, $payload);
                
            case 'order.cancelled':
                return $this->handleOrderCancelled($orderId, $userId);
                
            default:
                return ['success' => true];
        }
    }

    private function handleOrderPaid(string $orderId, $userId, array $payload): array
    {
        // OrderPaidEvent exposes the amount as paid_amount, not total_amount.
        $points = $this->calculatePoints((float) $payload['paid_amount']);

        $this->pointsService->credit(
            $userId,
            $points,
            "Order #{$orderId} reward",
            $orderId
        );

        return ['success' => true, 'points_credited' => $points];
    }

    private function handleOrderCancelled(string $orderId, $userId): array
    {
        $this->pointsService->rollback(
            $userId,
            $orderId,
            "Order #{$orderId} cancelled"
        );

        return ['success' => true];
    }

    private function calculatePoints(float $amount): int
    {
        return (int) floor($amount);
    }
}

class NotificationConsumer extends OrderEventConsumer
{
    protected string $queueName = 'order.notification';
    private NotificationService $notificationService;

    public function __construct(AMQPStreamConnection $connection, NotificationService $notificationService)
    {
        parent::__construct($connection);
        $this->notificationService = $notificationService;
    }

    protected function handleEvent(array $eventData): array
    {
        $eventType = $eventData['event_type'];
        $orderId = $eventData['order_id'];
        $payload = $eventData['payload'];

        $templateMap = [
            'order.created' => 'ORDER_CREATED',
            'order.paid' => 'ORDER_PAID',
            'order.cancelled' => 'ORDER_CANCELLED',
            'order.shipped' => 'ORDER_SHIPPED',
        ];

        $template = $templateMap[$eventType] ?? null;
        
        if (!$template) {
            return ['success' => true];
        }

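        // Only order.created carries user_id in its payload; other events are
        // expected to carry it in metadata.
        $userId = $payload['user_id'] ?? $eventData['metadata']['user_id'] ?? null;

        if ($userId === null) {
            return [
                'success' => false,
                'error' => new \Exception("Missing user_id for order: {$orderId}"),
            ];
        }

        $this->notificationService->send([
            'user_id' => $userId,
            'template' => $template,
            'data' => [
                'order_id' => $orderId,
                'order_data' => $payload,
            ],
            'channels' => ['sms', 'email', 'push'],
        ]);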

        return ['success' => true];
    }
}
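
The acknowledgement policy in `OrderEventConsumer` comes down to a three-way decision. The sketch below isolates it as a pure function so the rule can be tested without a broker. The limit of 3 is the one used in `handleFailure()`; the function name `decideAction` and the action labels are hypothetical.

```php
<?php

const MAX_RETRIES = 3; // Same limit as handleFailure() in the consumer above.

/**
 * Maps a handler outcome to the action taken on the delivery:
 * 'ack' (done), 'retry' (process again), or 'dead-letter' (reject without requeue).
 */
function decideAction(bool $handled, int $retryCount, int $maxRetries = MAX_RETRIES): string
{
    if ($handled) {
        return 'ack';
    }

    return $retryCount < $maxRetries ? 'retry' : 'dead-letter';
}

// A message that keeps failing is retried three times, then dead-lettered.
foreach ([0, 1, 2, 3] as $retryCount) {
    printf("retry count %d -> %s\n", $retryCount, decideAction(false, $retryCount));
}
```

The decision only works if the retry count actually increases between deliveries, which is why the count has to travel with the message rather than live in consumer memory.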

Compensation Service Implementation

php
<?php

namespace App\Services;

use App\Repository\OrderRepository;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class CompensationService
{
    private AMQPStreamConnection $connection;
    private $channel;
    private OrderRepository $orderRepository;
    private string $dlqName = 'order.dlq';

    public function __construct(
        AMQPStreamConnection $connection,
        OrderRepository $orderRepository
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->orderRepository = $orderRepository;
    }

    public function processDeadLetterQueue(): void
    {
        $this->channel->basic_consume(
            $this->dlqName,
            '',
            false,
            false,
            false,
            false,
            [$this, 'handleDeadLetter']
        );

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    public function handleDeadLetter(AMQPMessage $message): void
    {
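        $data = json_decode($message->getBody(), true);
        if (!is_array($data)) {
            // Malformed body: continue with empty data so it is still logged and acked.
            $data = [];
        }
        $eventData = $data['original_task'] ?? $data;
        $error = $data['error'] ?? [];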

        $this->logDeadLetter($eventData, $error);

        $compensationResult = $this->executeCompensation($eventData, $error);

        if ($compensationResult['success']) {
            $message->ack();
            $this->logCompensationSuccess($eventData);
        } else {
            $this->logCompensationFailure($eventData, $compensationResult['error']);
            $this->alertManualIntervention($eventData, $error);
            $message->ack();
        }
    }

    private function executeCompensation(array $eventData, array $error): array
    {
        $eventType = $eventData['event_type'] ?? '';
        $orderId = $eventData['order_id'] ?? '';

        switch ($eventType) {
            case 'order.created':
                return $this->compensateOrderCreated($orderId, $eventData);
                
            case 'order.paid':
                return $this->compensateOrderPaid($orderId, $eventData);
                
            default:
                return ['success' => true];
        }
    }

    private function compensateOrderCreated(string $orderId, array $eventData): array
    {
        try {
            $order = $this->orderRepository->findById($orderId);
            
            if ($order && $order['status'] === 'pending') {
                $this->orderRepository->update($orderId, [
                    'status' => 'failed',
                    'failed_at' => date('Y-m-d H:i:s'),
                    'failure_reason' => 'Downstream service failure',
                ]);

                return ['success' => true];
            }

            return ['success' => true];
            
        } catch (\Exception $e) {
            return ['success' => false, 'error' => $e];
        }
    }

    private function compensateOrderPaid(string $orderId, array $eventData): array
    {
        return ['success' => true];
    }

    private function logDeadLetter(array $eventData, array $error): void
    {
        error_log(sprintf(
            '[Compensation] Dead letter received: %s - %s',
            $eventData['event_id'] ?? 'unknown',
            $error['message'] ?? 'unknown error'
        ));
    }

    private function logCompensationSuccess(array $eventData): void
    {
        error_log(sprintf(
            '[Compensation] Success: %s',
            $eventData['event_id'] ?? 'unknown'
        ));
    }

    private function logCompensationFailure(array $eventData, \Exception $error): void
    {
        error_log(sprintf(
            '[Compensation] Failed: %s - %s',
            $eventData['event_id'] ?? 'unknown',
            $error->getMessage()
        ));
    }

    private function alertManualIntervention(array $eventData, array $error): void
    {
        error_log(sprintf(
            '[Compensation] MANUAL INTERVENTION REQUIRED: %s',
            json_encode([
                'event' => $eventData,
                'error' => $error,
                'timestamp' => date('Y-m-d H:i:s'),
            ])
        ));
    }
}
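
When the broker dead-letters a message it leaves the body untouched and records the cause in the `x-death` header, so a body field such as `error` is present only if a consumer put it there. The sketch below reads the failure details from that header instead. It takes the headers as a plain PHP array (with php-amqplib, the result of `getNativeData()` on `application_headers`); the function name `summarizeDeath` and the sample headers are assumptions for illustration.

```php
<?php

/**
 * Summarizes why a message was dead-lettered.
 * Returns null when the headers contain no usable x-death entry.
 */
function summarizeDeath(array $headers): ?array
{
    $deaths = $headers['x-death'] ?? null;
    if (!is_array($deaths) || !isset($deaths[0]) || !is_array($deaths[0])) {
        return null;
    }

    // RabbitMQ documents the x-death array as ordered most recent first.
    $latest = $deaths[0];

    return [
        'queue'  => $latest['queue'] ?? 'unknown',
        'reason' => $latest['reason'] ?? 'unknown',
        'count'  => (int) ($latest['count'] ?? 0),
    ];
}

// Hypothetical headers for a message rejected from order.points.
$headers = [
    'x-death' => [[
        'queue'        => 'order.points',
        'reason'       => 'rejected',
        'count'        => 1,
        'exchange'     => 'order.exchange',
        'routing-keys' => ['order.points.credit'],
    ]],
];

$summary = summarizeDeath($headers);
printf("%s from %s (count %d)\n", $summary['reason'], $summary['queue'], $summary['count']);
// rejected from order.points (count 1)
```

Knowing the source queue lets the compensation step act per subsystem, for example treating a dead-lettered notification differently from a dead-lettered inventory message.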

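Key Technical Points

1. Event sourcing design

Every order state change produces a corresponding event, which makes it possible to:

  • Trace the complete lifecycle of an order
  • Replay events and restore state
  • Support audit and compliance requirements

2. Idempotency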

php
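abstract class IdempotentEventHandler
{
    // Assumes the phpredis extension; adapt the set() options for other clients.
    private \Redis $redis;

    public function __construct(\Redis $redis)
    {
        $this->redis = $redis;
    }

    public function handle(array $eventData): array
    {
        $eventId = $eventData['event_id'];
        $lockKey = "event:processed:{$eventId}";

        // SET NX claims the key atomically, so two consumers that receive the same
        // event at once cannot both pass the check. The short TTL frees the claim
        // if the process dies before finishing.
        if (!$this->redis->set($lockKey, 'processing', ['nx', 'ex' => 300])) {
            return ['success' => true, 'status' => 'already_processed'];
        }

        $result = $this->doHandle($eventData);

        if ($result['success']) {
            $this->redis->setex($lockKey, 86400 * 7, json_encode($result));
        } else {
            // Release the claim so a redelivery can try again.
            $this->redis->del($lockKey);
        }

        return $result;
    }

    // Subclasses implement the actual business logic here.
    abstract protected function doHandle(array $eventData): array;
}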
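
The same claim-then-handle idea can be exercised without Redis. The sketch below uses an in-memory stand-in to show that a redelivered event runs the business logic only once. `InMemoryProcessedStore` and `handleOnce` are hypothetical names; the key format follows the `event:processed:{event_id}` convention used above.

```php
<?php

/** Minimal in-memory stand-in for the Redis key store (illustration only). */
final class InMemoryProcessedStore
{
    private array $keys = [];

    /** Records the key and returns true only the first time it is seen. */
    public function claim(string $key): bool
    {
        if (isset($this->keys[$key])) {
            return false;
        }
        $this->keys[$key] = true;

        return true;
    }

    /** Forgets the key so the event can be attempted again. */
    public function release(string $key): void
    {
        unset($this->keys[$key]);
    }
}

function handleOnce(InMemoryProcessedStore $store, array $eventData, callable $handler): array
{
    $key = "event:processed:{$eventData['event_id']}";

    if (!$store->claim($key)) {
        return ['success' => true, 'status' => 'already_processed'];
    }

    $result = $handler($eventData);
    if (!$result['success']) {
        $store->release($key); // A failed attempt must stay retryable.
    }

    return $result;
}

// Deliver the same event twice; the handler body runs once.
$store = new InMemoryProcessedStore();
$creditedTimes = 0;
$handler = function (array $event) use (&$creditedTimes): array {
    $creditedTimes++;
    return ['success' => true];
};

$event = ['event_id' => 'evt_demo_1'];
$first = handleOnce($store, $event, $handler);
$second = handleOnce($store, $event, $handler);

echo $creditedTimes, PHP_EOL;    // 1
echo $second['status'], PHP_EOL; // already_processed
```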

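3. Eventual consistency

Compensation keeps the data eventually consistent:

  • The dead-letter queue collects failed messages
  • The compensation service handles the exceptional cases
  • Manual intervention is the last resort

4. Message ordering

Events for the same order must be processed in order. The queue arguments below declare a quorum queue with a single active consumer, so only one consumer receives messages from the queue at a time: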

php
$args = [
    'x-single-active-consumer' => ['t', true],
    'x-queue-type' => ['S', 'quorum'],
];

Performance Optimization Recommendations

1. Batch processing

php
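abstract class BatchOrderProcessor
{
    private array $batch = [];
    private int $batchSize = 100;
    private float $batchTimeout = 1.0;
    private float $lastFlush;

    public function __construct()
    {
        $this->lastFlush = microtime(true);
    }

    public function addEvent(array $event): void
    {
        $this->batch[] = $event;

        // Flush when the batch is full or batchTimeout seconds have passed since
        // the last flush. The timeout is only checked when an event arrives, so
        // call flush() periodically from the consume loop to cover idle periods.
        if (count($this->batch) >= $this->batchSize
            || microtime(true) - $this->lastFlush >= $this->batchTimeout) {
            $this->flush();
        }
    }

    public function flush(): void
    {
        $this->lastFlush = microtime(true);

        if (empty($this->batch)) {
            return;
        }

        // Clear the buffer first so a failing batch is not processed twice.
        $batch = $this->batch;
        $this->batch = [];
        $this->processBatch($batch);
    }

    // Subclasses implement the bulk operation, for example a multi-row insert.
    abstract protected function processBatch(array $events): void;
}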

2. Scaling consumers horizontally

ini
; Supervisor configuration
[program:order_consumer]
command=php /var/www/bin/consumer.php order.inventory
process_name=%(program_name)s_%(process_num)02d
numprocs=5
autostart=true
autorestart=true

3. Database optimization

  • Partition the orders table by time
  • Separate reads from writes
  • Add the necessary indexes

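Common Problems and Solutions

1. Message loss

Problem: Messages are lost in transit.

Solutions:

  • Enable persistence (durable queues and persistent messages)
  • Use publisher confirms
  • Replicate with quorum queues (classic mirrored queues were removed in RabbitMQ 4.0)

2. Duplicate messages

Problem: A message is consumed more than once.

Solutions:

  • Make handlers idempotent
  • Deduplicate on the unique event ID
  • Record processed events in Redis

3. Message backlog

Problem: Messages pile up in a queue.

Solutions:

  • Add consumers
  • Optimize the consumer's processing logic
  • Process in batches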
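
How many consumers to add can be estimated from the arrival rate and the per-message handling time. The sketch below is back-of-the-envelope arithmetic only: the function names, the 70% utilization target, and the sample numbers (200 messages per second, 50ms per message, a backlog of 10,000) are assumptions, not measurements from the system described here.

```php
<?php

/** Consumers needed to keep up with arrivals at the given target utilization. */
function requiredConsumers(float $arrivalsPerSecond, float $secondsPerMessage, float $targetUtilization = 0.7): int
{
    if ($targetUtilization <= 0 || $targetUtilization > 1) {
        throw new InvalidArgumentException('targetUtilization must be in (0, 1]');
    }

    return max(1, (int) ceil($arrivalsPerSecond * $secondsPerMessage / $targetUtilization));
}

/** Seconds to drain a backlog, or null when the consumers cannot outpace arrivals. */
function drainSeconds(int $backlog, int $consumers, float $secondsPerMessage, float $arrivalsPerSecond): ?float
{
    $spareCapacity = $consumers / $secondsPerMessage - $arrivalsPerSecond;

    return $spareCapacity > 0 ? $backlog / $spareCapacity : null;
}

$consumers = requiredConsumers(200.0, 0.05);
echo $consumers, PHP_EOL; // 15

$drain = drainSeconds(10000, $consumers, 0.05, 200.0);
printf("Backlog drained in about %.0f seconds\n", $drain);

// Ten consumers only match the arrival rate, so the backlog never shrinks.
var_dump(drainSeconds(10000, 10, 0.05, 200.0)); // NULL
```

The resulting number maps onto `numprocs` in the Supervisor configuration. Each consumer's prefetch count also matters, because a large prefetch can leave messages waiting in one process while others sit idle.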

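4. Out-of-order processing

Problem: Events for the same order are processed out of order.

Solutions:

  • Use a single consumer per queue
  • Route by order ID so that one order always lands in the same queue
  • Use message grouping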
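
Routing by order ID can be as simple as hashing the ID into a fixed number of shards and appending the shard number to the routing key. The sketch below shows the idea. The shard count, the `order.inventory.<n>` key layout, and the function names are hypothetical; each shard would still need its own queue, binding, and single consumer.

```php
<?php

/** Maps an order ID to a shard number in [0, $shardCount). */
function shardFor(string $orderId, int $shardCount): int
{
    if ($shardCount < 1) {
        throw new InvalidArgumentException('shardCount must be at least 1');
    }

    // Mask to 31 bits so the result is non-negative on 32-bit builds too.
    return (crc32($orderId) & 0x7fffffff) % $shardCount;
}

/** Builds a routing key such as "order.inventory.3" for the order's shard. */
function shardedRoutingKey(string $base, string $orderId, int $shardCount): string
{
    return sprintf('%s.%d', $base, shardFor($orderId, $shardCount));
}

// Every event for one order gets the same key, so it lands in the same queue.
$orderId = 'ORD20240101120000ABCD1234';
$created = shardedRoutingKey('order.inventory', $orderId, 8);
$cancelled = shardedRoutingKey('order.inventory', $orderId, 8);

var_dump($created === $cancelled); // bool(true)
```

Changing the shard count remaps most orders to different queues, so it is safest to drain the existing queues before resizing.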

Related Links