
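Service Decoupling in Practice

Overview

Service decoupling is one of the core goals of a microservice architecture. Using RabbitMQ for asynchronous communication between services reduces their direct dependencies on one another and improves the system's maintainability, scalability, and fault tolerance.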

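Business Background and Requirements

Scenario

Service dependencies in an e-commerce system:

Dependency chain in the traditional architecture:
User places order → Order service → Inventory service → Payment service → Notification service → Points service
                          ↓
                  Logistics service

Problems:
1. A failure in any service blocks order placement
2. Upgrading a service requires coordination across several teams
3. Performance bottlenecks are hard to locate
4. The code is tightly coupled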

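Decoupling Goals

| Goal | Description |
| --- | --- |
| Independent deployment | Each service can be released on its own without affecting the others |
| Fault isolation | A failure in a single service does not bring down the overall business flow |
| Elastic scaling | Each service can scale out or in independently |
| Technology heterogeneity | Different services can use different technology stacks |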

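Architecture Design

Before and After Decoupling

mermaid
graph TB
    subgraph "Before decoupling: tight coupling"
        A1[Order service] --> B1[Inventory service]
        A1 --> C1[Payment service]
        A1 --> D1[Notification service]
        A1 --> E1[Points service]
        A1 --> F1[Logistics service]
    end

    subgraph "After decoupling: loose coupling"
        A2[Order service] --> G2[RabbitMQ]
        G2 --> B2[Inventory service]
        G2 --> C2[Payment service]
        G2 --> D2[Notification service]
        G2 --> E2[Points service]
        G2 --> F2[Logistics service]
    end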

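Event-Driven Decoupled Architecture

mermaid
graph TB
    subgraph "Business service layer"
        A[Order service]
        B[User service]
        C[Product service]
    end

    subgraph "Message middleware"
        D[RabbitMQ<br/>Event bus]

        subgraph "Event channels"
            E[Order events]
            F[User events]
            G[Product events]
        end
    end

    subgraph "Downstream service layer"
        H[Inventory service]
        I[Payment service]
        J[Notification service]
        K[Points service]
        L[Analytics service]
    end

    A --> D
    B --> D
    C --> D

    D --> E
    D --> F
    D --> G

    E --> H
    E --> I
    E --> J
    E --> K
    F --> J
    F --> K
    G --> L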

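Service Boundaries

mermaid
graph LR
    subgraph "Core domain"
        A[Order service]
        B[Payment service]
    end

    subgraph "Supporting domain"
        C[Inventory service]
        D[User service]
        E[Product service]
    end

    subgraph "Generic domain"
        F[Notification service]
        G[Points service]
        H[Search service]
    end

    A -->|Order events| C
    A -->|Order events| F
    A -->|Order events| G
    B -->|Payment events| A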

PHP Implementation

Event Publisher Wrapper

php
<?php

namespace App\ServiceDecoupling;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class EventPublisher
{
    private AMQPStreamConnection $connection;
    private $channel;
    private string $exchangeName = 'domain.events';
    private string $serviceName;

    public function __construct(
        AMQPStreamConnection $connection,
        string $serviceName
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->serviceName = $serviceName;
        $this->setupExchange();
    }

    private function setupExchange(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );
    }

    public function publish(string $eventType, array $payload, array $metadata = []): string
    {
        $eventId = $this->generateEventId();

        $event = [
            'event_id' => $eventId,
            'event_type' => $eventType,
            'source' => $this->serviceName,
            'timestamp' => time(),
            'payload' => $payload,
            'metadata' => array_merge([
                'version' => '1.0',
                'correlation_id' => $this->generateCorrelationId(),
            ], $metadata),
        ];

        $routingKey = $this->buildRoutingKey($eventType);

        $message = new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $eventId,
                'timestamp' => time(),
                'type' => $eventType,
            ]
        );

        $this->channel->basic_publish(
            $message,
            $this->exchangeName,
            $routingKey
        );

        return $eventId;
    }

    private function buildRoutingKey(string $eventType): string
    {
        // Event types such as 'order.created' are already valid topic routing keys
        return $eventType;
    }

    private function generateEventId(): string
    {
        return sprintf('evt_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    private function generateCorrelationId(): string
    {
        return bin2hex(random_bytes(16));
    }

    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

Event Subscriber Wrapper

php
<?php

namespace App\ServiceDecoupling;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class EventSubscriber
{
    private AMQPStreamConnection $connection;
    private $channel;
    private string $serviceName;
    private string $exchangeName = 'domain.events';
    private array $subscriptions = [];
    private bool $running = true;

    public function __construct(
        AMQPStreamConnection $connection,
        string $serviceName
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->serviceName = $serviceName;
        $this->setupExchange();
    }

    private function setupExchange(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );
    }

    public function subscribe(string $eventType, callable $handler, array $options = []): void
    {
        $this->subscriptions[$eventType] = [
            'handler' => $handler,
            'options' => array_merge([
                'prefetch' => 10,
                'retry' => 3,
                'dead_letter' => true,
            ], $options),
        ];
    }

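    public function start(): void
    {
        if ($this->subscriptions === []) {
            throw new \LogicException('No subscriptions registered; call subscribe() first');
        }

        // Declare the dead-letter exchange and a queue behind it. Without them,
        // rejected messages routed to 'domain.dlx' would be dropped silently.
        // The queue name 'queue.dead_letter' is an assumption for this example.
        $this->channel->exchange_declare('domain.dlx', AMQPExchangeType::FANOUT, false, true, false);
        $this->channel->queue_declare('queue.dead_letter', false, true, false, false);
        $this->channel->queue_bind('queue.dead_letter', 'domain.dlx');

        $queueName = "queue.{$this->serviceName}";
        $args = [
            'x-dead-letter-exchange' => ['S', 'domain.dlx'],
            'x-message-ttl' => ['I', 604800000], // 7 days, in milliseconds
        ];

        // Durable, non-exclusive, non-auto-delete queue owned by this service
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            $args
        );

        foreach (array_keys($this->subscriptions) as $eventType) {
            // The event type doubles as the topic routing key
            $this->channel->queue_bind($queueName, $this->exchangeName, $eventType);
        }

        // One queue serves every subscription, so use the largest requested prefetch
        $prefetch = max(array_column(array_column($this->subscriptions, 'options'), 'prefetch'));
        $this->channel->basic_qos(0, $prefetch, false);

        // no_ack = false: messages are acknowledged manually in handleMessage()
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            [$this, 'handleMessage']
        );

        // Poll without blocking so that stop() takes effect promptly
        while ($this->running && $this->channel->is_consuming()) {
            $this->channel->wait(null, true);
            usleep(100000);
        }
    }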

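    public function handleMessage(AMQPMessage $message): void
    {
        $event = json_decode($message->body, true);

        // A malformed payload can never succeed, so dead-letter it right away
        if (!is_array($event) || !isset($event['event_type'], $event['event_id'])) {
            error_log('[EventSubscriber] Rejecting malformed message');
            $message->nack(false, false);
            return;
        }

        $eventType = $event['event_type'];

        if (!isset($this->subscriptions[$eventType])) {
            // No handler registered for this type: acknowledge and skip
            $message->ack();
            return;
        }

        $config = $this->subscriptions[$eventType];
        $handler = $config['handler'];

        try {
            $handler($event);
            $message->ack();
        } catch (\Throwable $e) {
            // \Throwable also covers \Error (for example TypeError), which \Exception misses
            $this->handleError($message, $event, $e, $config['options']);
        }
    }

    private function handleError(
        AMQPMessage $message,
        array $event,
        \Throwable $e,
        array $options
    ): void {
        $retryCount = $this->getRetryCount($message);

        if ($retryCount < $options['retry']) {
            error_log(sprintf(
                '[EventSubscriber] Retry %d for event %s: %s',
                $retryCount + 1,
                $event['event_id'],
                $e->getMessage()
            ));
            // A requeued message keeps its original headers, so the counter would
            // never advance. Republish a copy with x-retry-count + 1 directly to
            // this service's queue (default exchange), then ack the original.
            $headers = $message->has('application_headers')
                ? $message->get('application_headers')->getNativeData()
                : [];
            $headers['x-retry-count'] = $retryCount + 1;
            $properties = $message->get_properties();
            $properties['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable($headers);
            $this->channel->basic_publish(
                new AMQPMessage($message->body, $properties),
                '',
                "queue.{$this->serviceName}"
            );
            $message->ack();
        } else {
            error_log(sprintf(
                '[EventSubscriber] Event %s failed after %d retries: %s',
                $event['event_id'],
                $retryCount,
                $e->getMessage()
            ));
            // requeue = false: the broker routes the message to the dead-letter exchange
            $message->nack(false, false);
        }
    }

    private function getRetryCount(AMQPMessage $message): int
    {
        // get() throws when the property is absent, so check with has() first
        if (!$message->has('application_headers')) {
            return 0;
        }
        $headers = $message->get('application_headers')->getNativeData();
        return (int) ($headers['x-retry-count'] ?? 0);
    }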

    public function stop(): void
    {
        $this->running = false;
    }

    public function close(): void
    {
        $this->stop();
        if ($this->channel) {
            $this->channel->close();
        }
    }
}
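
The subscriber binds each event type as a topic routing key but looks up handlers with an exact `isset()` match, so a wildcard subscription such as `order.*` would receive messages that never reach a handler. The following is a minimal sketch of topic-pattern matching in plain PHP, assuming the standard AMQP topic rules (`*` matches exactly one word, `#` matches zero or more). The helper names `topicMatches`, `matchWords`, and `findHandler` are hypothetical and not part of the classes above.

```php
<?php

/**
 * Returns true when $routingKey matches the AMQP topic $pattern.
 * '*' matches exactly one dot-separated word, '#' matches zero or more words.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    $patternWords = $pattern === '' ? [] : explode('.', $pattern);
    $keyWords = $routingKey === '' ? [] : explode('.', $routingKey);

    return matchWords($patternWords, 0, $keyWords, 0);
}

/** Recursive matcher over word lists; $i indexes the pattern, $j the routing key. */
function matchWords(array $p, int $i, array $k, int $j): bool
{
    if ($i === count($p)) {
        // Pattern exhausted: match only if the key is exhausted too
        return $j === count($k);
    }
    if ($p[$i] === '#') {
        // '#' may consume zero or more words: try every split point
        for ($n = $j; $n <= count($k); $n++) {
            if (matchWords($p, $i + 1, $k, $n)) {
                return true;
            }
        }
        return false;
    }
    if ($j === count($k)) {
        return false;
    }
    if ($p[$i] === '*' || $p[$i] === $k[$j]) {
        return matchWords($p, $i + 1, $k, $j + 1);
    }
    return false;
}

/**
 * Hypothetical replacement for the exact-match lookup: returns the handler of
 * the first subscription whose pattern matches the event type, or null.
 */
function findHandler(array $subscriptions, string $eventType): ?callable
{
    foreach ($subscriptions as $pattern => $config) {
        if (topicMatches((string) $pattern, $eventType)) {
            return $config['handler'];
        }
    }
    return null;
}
```

With a lookup like this, a service could subscribe once to `order.*` instead of registering every order event separately; whether that is desirable depends on how strictly each handler wants to control which events it sees.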

Order Service Implementation

php
<?php

namespace App\Services;

use App\ServiceDecoupling\EventPublisher;

class OrderService
{
    private EventPublisher $eventPublisher;
    private OrderRepository $orderRepository;

    public function __construct(
        EventPublisher $eventPublisher,
        OrderRepository $orderRepository
    ) {
        $this->eventPublisher = $eventPublisher;
        $this->orderRepository = $orderRepository;
    }

    public function createOrder(array $orderData): array
    {
        $orderId = $this->generateOrderId();

        $order = [
            'order_id' => $orderId,
            'user_id' => $orderData['user_id'],
            'items' => $orderData['items'],
            'total_amount' => $orderData['total_amount'],
            'status' => 'pending',
            'created_at' => date('Y-m-d H:i:s'),
        ];

        $this->orderRepository->create($order);

        $this->eventPublisher->publish('order.created', [
            'order_id' => $orderId,
            'user_id' => $orderData['user_id'],
            'items' => $orderData['items'],
            'total_amount' => $orderData['total_amount'],
        ], [
            'trace_id' => $orderData['trace_id'] ?? null,
        ]);

        return [
            'success' => true,
            'order_id' => $orderId,
        ];
    }

    public function updateOrderStatus(string $orderId, string $status, array $extra = []): void
    {
        $this->orderRepository->update($orderId, [
            'status' => $status,
            'updated_at' => date('Y-m-d H:i:s'),
        ]);

        $this->eventPublisher->publish('order.status_changed', [
            'order_id' => $orderId,
            'status' => $status,
            'extra' => $extra,
        ]);
    }

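    public function cancelOrder(string $orderId, string $reason): void
    {
        $order = $this->orderRepository->findById($orderId);

        // Assumes findById() returns a falsy value when the order does not exist
        if (!$order) {
            throw new \RuntimeException("Order {$orderId} not found");
        }

        $this->orderRepository->update($orderId, [
            'status' => 'cancelled',
            'cancel_reason' => $reason,
            'cancelled_at' => date('Y-m-d H:i:s'),
        ]);

        // Include the items so the inventory service can release the reserved stock
        $this->eventPublisher->publish('order.cancelled', [
            'order_id' => $orderId,
            'user_id' => $order['user_id'],
            'reason' => $reason,
            'items' => $order['items'],
        ]);
    }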

    private function generateOrderId(): string
    {
        return sprintf('ORD%s%s', date('YmdHis'), strtoupper(bin2hex(random_bytes(4))));
    }
}

Inventory Service Implementation

php
<?php

namespace App\Services;

use App\ServiceDecoupling\EventSubscriber;
use App\ServiceDecoupling\EventPublisher;

class InventoryService
{
    private EventSubscriber $eventSubscriber;
    private EventPublisher $eventPublisher;
    private InventoryRepository $inventoryRepository;

    public function __construct(
        EventSubscriber $eventSubscriber,
        EventPublisher $eventPublisher,
        InventoryRepository $inventoryRepository
    ) {
        $this->eventSubscriber = $eventSubscriber;
        $this->eventPublisher = $eventPublisher;
        $this->inventoryRepository = $inventoryRepository;
        $this->setupSubscriptions();
    }

    private function setupSubscriptions(): void
    {
        $this->eventSubscriber->subscribe('order.created', [$this, 'handleOrderCreated']);
        $this->eventSubscriber->subscribe('order.cancelled', [$this, 'handleOrderCancelled']);
        $this->eventSubscriber->subscribe('payment.completed', [$this, 'handlePaymentCompleted']);
    }

    public function handleOrderCreated(array $event): void
    {
        $payload = $event['payload'];
        $reserved = [];

        foreach ($payload['items'] as $item) {
            $success = $this->inventoryRepository->reserve(
                $item['product_id'],
                $item['sku_id'],
                $item['quantity'],
                $payload['order_id']
            );

            if (!$success) {
                // Release the items reserved so far; otherwise a partially
                // reserved order would keep its stock locked.
                foreach ($reserved as $done) {
                    $this->inventoryRepository->release(
                        $done['product_id'],
                        $done['sku_id'],
                        $done['quantity'],
                        $payload['order_id']
                    );
                }

                $this->eventPublisher->publish('inventory.reserve_failed', [
                    'order_id' => $payload['order_id'],
                    'product_id' => $item['product_id'],
                    'sku_id' => $item['sku_id'],
                    'reason' => 'Insufficient stock',
                ]);
                return;
            }

            $reserved[] = $item;
        }

        $this->eventPublisher->publish('inventory.reserved', [
            'order_id' => $payload['order_id'],
            'items' => $payload['items'],
        ]);
    }

    public function handleOrderCancelled(array $event): void
    {
        $payload = $event['payload'];

        foreach ($payload['items'] as $item) {
            $this->inventoryRepository->release(
                $item['product_id'],
                $item['sku_id'],
                $item['quantity'],
                $payload['order_id']
            );
        }

        $this->eventPublisher->publish('inventory.released', [
            'order_id' => $payload['order_id'],
            'items' => $payload['items'],
        ]);
    }

    public function handlePaymentCompleted(array $event): void
    {
        $payload = $event['payload'];

        $this->inventoryRepository->confirmReservation($payload['order_id']);

        $this->eventPublisher->publish('inventory.confirmed', [
            'order_id' => $payload['order_id'],
        ]);
    }

    public function start(): void
    {
        $this->eventSubscriber->start();
    }

    public function stop(): void
    {
        $this->eventSubscriber->stop();
    }
}

Notification Service Implementation

php
<?php

namespace App\Services;

use App\ServiceDecoupling\EventSubscriber;

class NotificationService
{
    private EventSubscriber $eventSubscriber;
    private array $notifiers;

    public function __construct(
        EventSubscriber $eventSubscriber,
        array $notifiers = []
    ) {
        $this->eventSubscriber = $eventSubscriber;
        $this->notifiers = $notifiers;
        $this->setupSubscriptions();
    }

    private function setupSubscriptions(): void
    {
        $this->eventSubscriber->subscribe('order.created', [$this, 'handleOrderCreated']);
        $this->eventSubscriber->subscribe('order.status_changed', [$this, 'handleOrderStatusChanged']);
        $this->eventSubscriber->subscribe('order.cancelled', [$this, 'handleOrderCancelled']);
        $this->eventSubscriber->subscribe('user.registered', [$this, 'handleUserRegistered']);
        $this->eventSubscriber->subscribe('payment.completed', [$this, 'handlePaymentCompleted']);
    }

    public function handleOrderCreated(array $event): void
    {
        $payload = $event['payload'];

        $this->sendNotification([
            'type' => 'order_created',
            'user_id' => $payload['user_id'],
            'channels' => ['sms', 'email'],
            'data' => [
                'order_id' => $payload['order_id'],
                'total_amount' => $payload['total_amount'],
            ],
        ]);
    }

    public function handleOrderStatusChanged(array $event): void
    {
        $payload = $event['payload'];
        $status = $payload['status'];

        $templates = [
            'paid' => 'order_paid',
            'shipped' => 'order_shipped',
            'delivered' => 'order_delivered',
        ];

        if (!isset($templates[$status])) {
            return;
        }

        $this->sendNotification([
            'type' => $templates[$status],
            'user_id' => $payload['extra']['user_id'] ?? null,
            'channels' => ['sms', 'push'],
            'data' => [
                'order_id' => $payload['order_id'],
                'status' => $status,
            ],
        ]);
    }

    public function handleOrderCancelled(array $event): void
    {
        $payload = $event['payload'];

        $this->sendNotification([
            'type' => 'order_cancelled',
            'user_id' => $payload['user_id'],
            'channels' => ['sms', 'email'],
            'data' => [
                'order_id' => $payload['order_id'],
                'reason' => $payload['reason'],
            ],
        ]);
    }

    public function handleUserRegistered(array $event): void
    {
        $payload = $event['payload'];

        $this->sendNotification([
            'type' => 'welcome',
            'user_id' => $payload['user_id'],
            'channels' => ['email'],
            'data' => [
                'name' => $payload['name'],
                'email' => $payload['email'],
            ],
        ]);
    }

    public function handlePaymentCompleted(array $event): void
    {
        $payload = $event['payload'];

        $this->sendNotification([
            'type' => 'payment_success',
            'user_id' => $payload['extra']['user_id'] ?? null,
            'channels' => ['sms', 'push'],
            'data' => [
                'order_id' => $payload['order_id'],
                'amount' => $payload['amount'],
            ],
        ]);
    }

    private function sendNotification(array $notification): void
    {
        foreach ($notification['channels'] as $channel) {
            if (isset($this->notifiers[$channel])) {
                $this->notifiers[$channel]->send($notification);
            }
        }
    }

    public function start(): void
    {
        $this->eventSubscriber->start();
    }

    public function stop(): void
    {
        $this->eventSubscriber->stop();
    }
}

Complete Usage Example

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\ServiceDecoupling\{EventPublisher, EventSubscriber};
use App\Services\{OrderService, InventoryService, NotificationService};

// OrderRepository, InventoryRepository, SmsNotifier, EmailNotifier and PushNotifier
// are application classes that this article does not show.

// Every process opens its own connection. Sharing one AMQP socket across
// pcntl_fork() lets parent and child interleave frames on the same stream.
function connect(): AMQPStreamConnection
{
    return new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
}

// Forks and aborts on failure, so that -1 is never passed to posix_kill() later
function forkOrFail(): int
{
    $pid = pcntl_fork();
    if ($pid === -1) {
        fwrite(STDERR, "pcntl_fork() failed\n");
        exit(1);
    }
    return $pid;
}

echo "=== Service decoupling example ===\n\n";

echo "1. Start the downstream consumers\n";

$inventoryPid = forkOrFail();
if ($inventoryPid === 0) {
    $connection = connect();
    $inventoryService = new InventoryService(
        new EventSubscriber($connection, 'inventory-service'),
        new EventPublisher($connection, 'inventory-service'),
        new InventoryRepository()
    );
    $inventoryService->start();
    exit(0);
}

$notificationPid = forkOrFail();
if ($notificationPid === 0) {
    $notificationService = new NotificationService(
        new EventSubscriber(connect(), 'notification-service'),
        [
            'sms' => new SmsNotifier(),
            'email' => new EmailNotifier(),
            'push' => new PushNotifier(),
        ]
    );
    $notificationService->start();
    exit(0);
}

// Wait until the consumers have declared and bound their queues: the topic
// exchange drops events that no queue is bound to receive.
sleep(2);

$connection = connect();
$orderPublisher = new EventPublisher($connection, 'order-service');
$orderService = new OrderService($orderPublisher, new OrderRepository());

echo "\n2. Create an order\n";
$result = $orderService->createOrder([
    'user_id' => 'user_001',
    'items' => [
        ['product_id' => 'P001', 'sku_id' => 'SKU001', 'quantity' => 2, 'price' => 99.00],
    ],
    'total_amount' => 198.00,
]);
echo "Order creation result: " . json_encode($result) . "\n";

sleep(1);

echo "\n3. Update the order status\n";
$orderService->updateOrderStatus($result['order_id'], 'paid', [
    'user_id' => 'user_001',
]);

sleep(1);

echo "\n4. Cancel the order\n";
$orderService->cancelOrder($result['order_id'], 'Cancelled by the user');

sleep(2);

// Stop the consumers and reap both child processes
posix_kill($inventoryPid, SIGTERM);
posix_kill($notificationPid, SIGTERM);
pcntl_wait($status);
pcntl_wait($status);

$orderPublisher->close();
$connection->close();

echo "\n=== Example complete ===\n";

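Key Techniques Explained

1. Event-driven decoupling

Services communicate by publishing events rather than calling each other directly:

php
$this->eventPublisher->publish('order.created', $payload);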
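
To make the decoupling concrete without a running broker, here is a minimal in-memory sketch. `InMemoryEventBus` is a hypothetical stand-in for RabbitMQ used only for illustration: the publisher names an event type and knows nothing about who consumes it, and subscribers can be added without touching the publishing code.

```php
<?php

/** In-memory stand-in for the broker: maps event types to subscriber callbacks. */
final class InMemoryEventBus
{
    /** @var array<string, callable[]> */
    private array $handlers = [];

    public function subscribe(string $eventType, callable $handler): void
    {
        $this->handlers[$eventType][] = $handler;
    }

    public function publish(string $eventType, array $payload): void
    {
        // The publisher only names the event; it never references a subscriber
        foreach ($this->handlers[$eventType] ?? [] as $handler) {
            $handler($payload);
        }
    }
}

$bus = new InMemoryEventBus();
$log = [];

// Two independent "services" react to the same event
$bus->subscribe('order.created', function (array $payload) use (&$log): void {
    $log[] = "inventory: reserve stock for {$payload['order_id']}";
});
$bus->subscribe('order.created', function (array $payload) use (&$log): void {
    $log[] = "notification: confirm {$payload['order_id']}";
});

$bus->publish('order.created', ['order_id' => 'ORD1']);
```

Unlike this synchronous sketch, a real broker also decouples the services in time: the order service returns as soon as the message is accepted, and consumers process it later.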

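2. Eventual consistency

Event propagation brings the data held by each service into a consistent state over time, rather than within a single transaction:

Order created → Inventory reserved → Payment completed → Inventory confirmed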
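
The chain above can be read as a small state machine driven by events. The sketch below is one possible way to model it; the state names (`pending`, `reserved`, `paid`, `confirmed`, `cancelled`) and the helper functions are assumptions made for illustration, while the event names follow the ones used in this article.

```php
<?php

/**
 * Allowed order-state transitions, keyed by current state and incoming event.
 * State names are illustrative assumptions; event names come from the article.
 */
const TRANSITIONS = [
    'pending'  => [
        'inventory.reserved'       => 'reserved',
        'inventory.reserve_failed' => 'cancelled',
    ],
    'reserved' => [
        'payment.completed' => 'paid',
        'order.cancelled'   => 'cancelled',
    ],
    'paid'     => [
        'inventory.confirmed' => 'confirmed',
    ],
];

/** Returns the next state; unknown or out-of-order events leave the state unchanged. */
function applyEvent(string $state, string $eventType): string
{
    return TRANSITIONS[$state][$eventType] ?? $state;
}

/** Replays a sequence of event types starting from $state. */
function replay(array $eventTypes, string $state = 'pending'): string
{
    foreach ($eventTypes as $eventType) {
        $state = applyEvent($state, $eventType);
    }
    return $state;
}
```

Because events that do not apply to the current state are ignored, replaying a duplicated or early event does not corrupt the order state, which is one of the properties an eventually consistent flow relies on.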

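3. Fault isolation

A failure in one service does not spread to the others:

  • The message queue acts as a buffer
  • Failed messages go to the dead-letter queue
  • Each service retries independently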
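
The retry-or-dead-letter decision behind these points can be isolated as a pure function, which makes it easy to test without a broker. This is a sketch under assumptions: the function name `decideRetry` and the exponential backoff parameters are illustrative additions, not something the subscriber above implements.

```php
<?php

/**
 * Decides what to do with a failed message.
 *
 * Returns ['action' => 'retry', 'delay_ms' => int] while retries remain,
 * or ['action' => 'dead_letter'] once $maxRetries has been reached.
 */
function decideRetry(
    int $retryCount,
    int $maxRetries,
    int $baseDelayMs = 1000,
    int $maxDelayMs = 30000
): array {
    if ($retryCount >= $maxRetries) {
        return ['action' => 'dead_letter'];
    }

    // Exponential backoff: base * 2^retryCount, capped at $maxDelayMs
    $delay = min($baseDelayMs * (2 ** $retryCount), $maxDelayMs);

    return ['action' => 'retry', 'delay_ms' => $delay];
}
```

Spacing retries out gives a struggling downstream dependency time to recover, while the cap keeps a message from waiting indefinitely before it is dead-lettered.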

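4. Independent scaling

Each service can be deployed and scaled on its own, for example:

Order service: 3 instances
Inventory service: 2 instances
Notification service: 5 instances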

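Performance Optimization Recommendations

| Optimization | Recommendation | Effect |
| --- | --- | --- |
| Lean events | Include only the necessary data | Less network traffic |
| Batch processing | Merge events of the same kind | Higher throughput |
| Local caching | Cache hot data | Fewer remote calls |
| Asynchronous processing | Run non-core flows asynchronously | Faster responses |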
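
As an illustration of the batch-processing recommendation, the sketch below buffers events and hands them to a sink in groups. `EventBatcher` and its parameters are hypothetical names for this example; in practice the sink might perform one bulk database write or one bulk API call per batch.

```php
<?php

/** Buffers events and passes them to a sink callback in groups of $batchSize. */
final class EventBatcher
{
    /** @var callable receives one array of buffered events per call */
    private $sink;
    private int $batchSize;
    private array $buffer = [];

    public function __construct(callable $sink, int $batchSize = 100)
    {
        $this->sink = $sink;
        $this->batchSize = $batchSize;
    }

    public function add(array $event): void
    {
        $this->buffer[] = $event;
        if (count($this->buffer) >= $this->batchSize) {
            $this->flush();
        }
    }

    /** Sends whatever is buffered; call this on shutdown or on a timer as well. */
    public function flush(): void
    {
        if ($this->buffer === []) {
            return;
        }
        $batch = $this->buffer;
        $this->buffer = [];
        ($this->sink)($batch);
    }
}
```

When batching consumed messages, acknowledgements should only be sent after the batch has been flushed successfully; otherwise a crash between receipt and flush loses the buffered events.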

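Common Problems and Solutions

1. Event loss

Solution: message persistence plus an event store, so that events that were not delivered can be published again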
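
The article does not specify how the event store works. One common reading is an outbox: the service records each event alongside its own data and a relay publishes pending events until they succeed. The sketch below uses an in-memory array as a stand-in for that table; the `Outbox` class and its methods are assumptions made for illustration.

```php
<?php

/** In-memory stand-in for an outbox table; a real one lives in the service's database. */
final class Outbox
{
    /** @var array<string, array{event: array, sent: bool}> */
    private array $rows = [];

    /** Stores the event as pending. In a real system this shares a transaction with the business write. */
    public function record(array $event): void
    {
        $this->rows[$event['event_id']] = ['event' => $event, 'sent' => false];
    }

    /** Tries to publish every pending event; returns how many were sent in this run. */
    public function relay(callable $publish): int
    {
        $sent = 0;
        foreach ($this->rows as $id => $row) {
            if ($row['sent']) {
                continue;
            }
            try {
                $publish($row['event']);
            } catch (\Throwable $e) {
                // Leave the row pending; the next relay run retries it
                continue;
            }
            $this->rows[$id]['sent'] = true;
            $sent++;
        }
        return $sent;
    }

    public function pendingCount(): int
    {
        return count(array_filter($this->rows, fn (array $row): bool => !$row['sent']));
    }
}
```

A relay like this can publish an event more than once (for example, if it crashes after publishing but before marking the row as sent), so consumers still need to handle duplicates.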

2. Event ordering

Solution: a single consumer per queue, plus a version number on each event
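
A version check on the consumer side might look like the sketch below. It assumes each event carries a monotonically increasing integer version per aggregate (for example, per order); the `VersionedProjection` class and that version field are hypothetical, since the event envelope in this article only carries a schema `version` in its metadata.

```php
<?php

/** Applies an event only if its version is newer than the last one applied for that aggregate. */
final class VersionedProjection
{
    /** @var array<string, int> last applied version per aggregate ID */
    private array $versions = [];
    /** @var array<string, array> latest data per aggregate ID */
    private array $state = [];

    /** Returns true if the event was applied, false if it was stale or a duplicate. */
    public function apply(string $aggregateId, int $version, array $data): bool
    {
        if ($version <= ($this->versions[$aggregateId] ?? 0)) {
            return false;
        }
        $this->versions[$aggregateId] = $version;
        $this->state[$aggregateId] = $data;
        return true;
    }

    public function get(string $aggregateId): ?array
    {
        return $this->state[$aggregateId] ?? null;
    }
}
```

This is a last-writer-wins check: an older event that arrives late is discarded rather than applied out of order. If every intermediate event must be processed, the consumer would instead need to hold back events until the missing versions arrive.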

3. Duplicate events

Solution: idempotent handlers
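
One way to make a handler idempotent is to remember which `event_id` values have already been processed and skip repeats. The wrapper below is a minimal sketch; `IdempotentHandler` is a hypothetical name, and the in-memory set stands in for a shared store (such as a database table with a unique key) that a multi-instance service would need.

```php
<?php

/** Wraps a handler so that each event_id is processed at most once per instance. */
final class IdempotentHandler
{
    /** @var callable the wrapped event handler */
    private $handler;
    /** @var array<string, bool> event IDs that were processed successfully */
    private array $seen = [];

    public function __construct(callable $handler)
    {
        $this->handler = $handler;
    }

    /** Returns true if the handler ran, false if the event was a duplicate. */
    public function __invoke(array $event): bool
    {
        $id = $event['event_id'];
        if (isset($this->seen[$id])) {
            return false;
        }
        ($this->handler)($event);
        // Mark as seen only after success, so a failed event can be retried
        $this->seen[$id] = true;
        return true;
    }
}
```

Because the wrapper is itself callable, it could be passed to `subscribe()` in place of the raw handler.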

4. Cascading failure (service avalanche)

Solution: circuit breaking, rate limiting, and graceful degradation
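
The circuit-breaking and degradation parts can be sketched together: after a number of consecutive failures the breaker stops calling the dependency and returns a fallback until a cooldown has passed. The `CircuitBreaker` class, its threshold, and its cooldown are illustrative assumptions; rate limiting is not covered here.

```php
<?php

/** Minimal circuit breaker: opens after $threshold consecutive failures, retries after a cooldown. */
final class CircuitBreaker
{
    private int $threshold;
    private float $cooldown;
    private int $failures = 0;
    private ?float $openedAt = null;

    public function __construct(int $threshold = 3, float $cooldownSeconds = 30.0)
    {
        $this->threshold = $threshold;
        $this->cooldown = $cooldownSeconds;
    }

    /**
     * Runs $operation and returns its result. Returns $fallback() instead when
     * the circuit is open or the operation throws. $now is injectable for tests.
     */
    public function call(callable $operation, callable $fallback, ?float $now = null)
    {
        $now = $now ?? microtime(true);

        // Open and still cooling down: fail fast without touching the dependency
        if ($this->openedAt !== null && $now - $this->openedAt < $this->cooldown) {
            return $fallback();
        }

        try {
            // Closed, or half-open after the cooldown: let the call through
            $result = $operation();
        } catch (\Throwable $e) {
            $this->failures++;
            if ($this->failures >= $this->threshold) {
                $this->openedAt = $now; // open, or reopen after a failed trial call
            }
            return $fallback();
        }

        // Success closes the circuit and resets the failure count
        $this->failures = 0;
        $this->openedAt = null;
        return $result;
    }
}
```

The fallback is where degradation happens: for example, returning cached data or skipping a non-essential step instead of waiting on a dependency that is already failing.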

Related Links