Skip to content

微服务间通信

概述

微服务架构中,服务间通信是核心问题之一。RabbitMQ 提供了异步消息通信机制,实现服务间的解耦、可靠传输和流量控制,是微服务通信的理想选择。

业务背景与需求

场景描述

某电商平台微服务通信需求:

通信场景调用方式特点量级
用户认证同步需要即时响应
库存查询同步需要即时响应
订单创建异步可延迟处理
支付通知异步可靠性优先
数据同步异步最终一致性

通信模式对比

同步通信 (HTTP/gRPC):
优点:简单直观,即时响应
缺点:紧耦合,级联失败,性能瓶颈

异步通信 (RabbitMQ):
优点:松耦合,削峰填谷,容错性好
缺点:复杂度增加,调试困难

需求目标

目标指标
解耦度服务独立部署,无直接依赖
可靠性消息不丢失,99.99%
延迟P99 < 100ms
吞吐量支持 10万 QPS

架构设计

整体架构图

mermaid
graph TB
    subgraph "API网关"
        A[API Gateway]
    end
    
    subgraph "同步服务"
        B[用户服务]
        C[商品服务]
    end
    
    subgraph "异步服务"
        D[订单服务]
        E[支付服务]
        F[库存服务]
        G[通知服务]
    end
    
    subgraph "RabbitMQ"
        H[服务交换机<br/>service.exchange]
        
        subgraph "请求队列"
            I[用户请求队列]
            J[商品请求队列]
            K[订单请求队列]
        end
        
        subgraph "响应队列"
            L[响应队列]
        end
        
        M[事件交换机<br/>event.exchange]
        N[死信队列]
    end
    
    subgraph "服务注册"
        O[服务发现]
        P[负载均衡]
    end
    
    A --> B
    A --> C
    A --> H
    
    H --> I
    H --> J
    H --> K
    
    I --> B
    J --> C
    K --> D
    
    B --> L
    C --> L
    D --> M
    
    M --> E
    M --> F
    M --> G
    
    O --> B
    O --> C
    O --> D
    
    I -.-> N
    J -.-> N
    K -.-> N

通信模式流程

mermaid
sequenceDiagram
    participant Gateway as API网关
    participant MQ as RabbitMQ
    participant Service as 目标服务
    participant Cache as 响应缓存
    
    alt 同步请求-响应模式
        Gateway->>MQ: 发送请求(带reply_to)
        MQ->>Service: 投递请求
        Service->>Service: 处理请求
        Service->>MQ: 发送响应到reply_to队列
        MQ->>Gateway: 投递响应
        Gateway-->>Gateway: 返回结果
    else 异步事件模式
        Gateway->>MQ: 发布事件
        MQ->>Service: 投递事件
        Service->>Service: 异步处理
        Service-->>MQ: ACK确认
    end

服务通信拓扑

mermaid
graph LR
    subgraph "核心服务"
        A[用户服务]
        B[订单服务]
        C[商品服务]
    end
    
    subgraph "支撑服务"
        D[支付服务]
        E[库存服务]
        F[物流服务]
    end
    
    subgraph "辅助服务"
        G[通知服务]
        H[分析服务]
        I[搜索服务]
    end
    
    A -->|用户事件| B
    A -->|用户事件| G
    B -->|订单事件| D
    B -->|订单事件| E
    B -->|订单事件| F
    B -->|订单事件| G
    C -->|商品事件| I
    D -->|支付事件| B
    E -->|库存事件| H

PHP 代码实现

服务消息基类

php
<?php

namespace App\Messaging;

abstract class ServiceMessage
{
    public string $messageId;
    public string $sourceService;
    public string $targetService;
    public string $action;
    public int $timestamp;
    public string $correlationId;
    public ?string $replyTo;
    public array $headers;

    public function __construct(
        string $sourceService,
        string $targetService,
        string $action
    ) {
        $this->messageId = $this->generateMessageId();
        $this->sourceService = $sourceService;
        $this->targetService = $targetService;
        $this->action = $action;
        $this->timestamp = time();
        $this->correlationId = $this->generateCorrelationId();
        $this->replyTo = null;
        $this->headers = [];
    }

    private function generateMessageId(): string
    {
        return sprintf('msg_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    private function generateCorrelationId(): string
    {
        return bin2hex(random_bytes(16));
    }

    public function setReplyTo(string $replyTo): self
    {
        $this->replyTo = $replyTo;
        return $this;
    }

    public function addHeader(string $key, $value): self
    {
        $this->headers[$key] = $value;
        return $this;
    }

    abstract public function getPayload(): array;

    public function toArray(): array
    {
        return [
            'message_id' => $this->messageId,
            'source_service' => $this->sourceService,
            'target_service' => $this->targetService,
            'action' => $this->action,
            'timestamp' => $this->timestamp,
            'correlation_id' => $this->correlationId,
            'reply_to' => $this->replyTo,
            'headers' => $this->headers,
            'payload' => $this->getPayload(),
        ];
    }
}

服务客户端实现

php
<?php

namespace App\Messaging;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class ServiceClient
{
    private AMQPStreamConnection $connection;
    private $channel;
    private string $serviceName;
    private string $exchangeName = 'service.exchange';
    private array $pendingResponses = [];
    private string $replyQueue;
    private float $defaultTimeout = 5.0;

    public function __construct(
        AMQPStreamConnection $connection,
        string $serviceName
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->serviceName = $serviceName;
        $this->setupInfrastructure();
    }

    private function setupInfrastructure(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        [$this->replyQueue] = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );

        $this->channel->basic_consume(
            $this->replyQueue,
            '',
            false,
            true,
            false,
            false,
            [$this, 'handleResponse']
        );
    }

    public function call(
        string $targetService,
        string $action,
        array $payload,
        float $timeout = null
    ): array {
        $message = new ServiceRequest(
            $this->serviceName,
            $targetService,
            $action,
            $payload
        );

        $message->setReplyTo($this->replyQueue);

        $amqpMessage = new AMQPMessage(
            json_encode($message->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $message->messageId,
                'correlation_id' => $message->correlationId,
                'reply_to' => $this->replyQueue,
            ]
        );

        $this->pendingResponses[$message->correlationId] = [
            'message_id' => $message->messageId,
            'response' => null,
            'received' => false,
        ];

        $routingKey = "request.{$targetService}";
        $this->channel->basic_publish(
            $amqpMessage,
            $this->exchangeName,
            $routingKey
        );

        return $this->waitForResponse(
            $message->correlationId,
            $timeout ?? $this->defaultTimeout
        );
    }

    public function callAsync(
        string $targetService,
        string $action,
        array $payload,
        callable $callback
    ): void {
        $message = new ServiceRequest(
            $this->serviceName,
            $targetService,
            $action,
            $payload
        );

        $message->setReplyTo($this->replyQueue);
        $message->addHeader('async_callback', true);

        $this->pendingResponses[$message->correlationId] = [
            'message_id' => $message->messageId,
            'callback' => $callback,
            'response' => null,
            'received' => false,
        ];

        $amqpMessage = new AMQPMessage(
            json_encode($message->toArray()),
            [
                'content_type' => 'application/json',
                'message_id' => $message->messageId,
                'correlation_id' => $message->correlationId,
                'reply_to' => $this->replyQueue,
            ]
        );

        $routingKey = "request.{$targetService}";
        $this->channel->basic_publish(
            $amqpMessage,
            $this->exchangeName,
            $routingKey
        );
    }

    public function handleResponse(AMQPMessage $message): void
    {
        $correlationId = $message->get('correlation_id');
        $response = json_decode($message->body, true);

        if (isset($this->pendingResponses[$correlationId])) {
            $this->pendingResponses[$correlationId]['response'] = $response;
            $this->pendingResponses[$correlationId]['received'] = true;

            if (isset($this->pendingResponses[$correlationId]['callback'])) {
                $callback = $this->pendingResponses[$correlationId]['callback'];
                $callback($response);
                unset($this->pendingResponses[$correlationId]);
            }
        }
    }

    private function waitForResponse(string $correlationId, float $timeout): array
    {
        $startTime = microtime(true);

        while (true) {
            $this->channel->wait(null, true);

            if ($this->pendingResponses[$correlationId]['received']) {
                $response = $this->pendingResponses[$correlationId]['response'];
                unset($this->pendingResponses[$correlationId]);
                return $response;
            }

            if ((microtime(true) - $startTime) > $timeout) {
                unset($this->pendingResponses[$correlationId]);
                throw new \RuntimeException("Service call timeout");
            }

            usleep(10000);
        }
    }

    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

服务端实现

php
<?php

namespace App\Messaging;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class ServiceServer
{
    private AMQPStreamConnection $connection;
    private $channel;
    private string $serviceName;
    private string $exchangeName = 'service.exchange';
    private array $handlers = [];
    private bool $running = true;

    public function __construct(
        AMQPStreamConnection $connection,
        string $serviceName
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->serviceName = $serviceName;
        $this->setupInfrastructure();
    }

    private function setupInfrastructure(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        $queueName = "service.{$this->serviceName}.request";
        $routingKey = "request.{$this->serviceName}";

        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false
        );

        $this->channel->queue_bind($queueName, $this->exchangeName, $routingKey);
    }

    public function registerHandler(string $action, callable $handler): void
    {
        $this->handlers[$action] = $handler;
    }

    public function start(): void
    {
        $queueName = "service.{$this->serviceName}.request";

        $this->channel->basic_qos(null, 10, null);

        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            [$this, 'handleRequest']
        );

        while ($this->running && count($this->channel->callbacks)) {
            $this->channel->wait(null, true);
            if (!$this->running) break;
            usleep(100000);
        }
    }

    public function handleRequest(AMQPMessage $message): void
    {
        $requestData = json_decode($message->body, true);
        $action = $requestData['action'];
        $payload = $requestData['payload'];
        $replyTo = $requestData['reply_to'];
        $correlationId = $message->get('correlation_id');

        try {
            if (!isset($this->handlers[$action])) {
                throw new \RuntimeException("Unknown action: {$action}");
            }

            $result = $this->handlers[$action]($payload);

            $response = [
                'success' => true,
                'data' => $result,
                'message_id' => $requestData['message_id'],
            ];

        } catch (\Exception $e) {
            $response = [
                'success' => false,
                'error' => $e->getMessage(),
                'message_id' => $requestData['message_id'],
            ];
        }

        if ($replyTo) {
            $responseMessage = new AMQPMessage(
                json_encode($response),
                [
                    'content_type' => 'application/json',
                    'correlation_id' => $correlationId,
                ]
            );

            $this->channel->basic_publish(
                $responseMessage,
                '',
                $replyTo
            );
        }

        $message->ack();
    }

    public function stop(): void
    {
        $this->running = false;
    }

    public function close(): void
    {
        $this->stop();
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

服务代理实现

php
<?php

namespace App\Messaging;

class ServiceProxy
{
    private ServiceClient $client;
    private string $targetService;

    public function __construct(ServiceClient $client, string $targetService)
    {
        $this->client = $client;
        $this->targetService = $targetService;
    }

    public function __call(string $method, array $arguments): array
    {
        $action = $this->camelToSnake($method);
        $payload = $arguments[0] ?? [];

        return $this->client->call(
            $this->targetService,
            $action,
            $payload
        );
    }

    private function camelToSnake(string $input): string
    {
        return strtolower(preg_replace('/(?<!^)[A-Z]/', '_$0', $input));
    }
}

class ServiceProxyFactory
{
    private ServiceClient $client;

    public function __construct(ServiceClient $client)
    {
        $this->client = $client;
    }

    public function create(string $serviceName): ServiceProxy
    {
        return new ServiceProxy($this->client, $serviceName);
    }
}

服务发现集成

php
<?php

namespace App\Messaging;

class ServiceRegistry
{
    private $redis;
    private array $localCache = [];
    private int $cacheTtl = 60;

    public function __construct($redis)
    {
        $this->redis = $redis;
    }

    public function register(string $serviceName, string $instanceId, array $metadata = []): void
    {
        $key = "service:registry:{$serviceName}";
        $instance = array_merge([
            'instance_id' => $instanceId,
            'registered_at' => time(),
            'last_heartbeat' => time(),
        ], $metadata);

        $this->redis->hSet($key, $instanceId, json_encode($instance));
        $this->redis->expire($key, 300);
    }

    public function deregister(string $serviceName, string $instanceId): void
    {
        $key = "service:registry:{$serviceName}";
        $this->redis->hDel($key, $instanceId);
    }

    public function heartbeat(string $serviceName, string $instanceId): void
    {
        $key = "service:registry:{$serviceName}";
        $instance = $this->redis->hGet($key, $instanceId);

        if ($instance) {
            $data = json_decode($instance, true);
            $data['last_heartbeat'] = time();
            $this->redis->hSet($key, $instanceId, json_encode($data));
        }
    }

    public function getInstances(string $serviceName): array
    {
        if (isset($this->localCache[$serviceName])) {
            return $this->localCache[$serviceName];
        }

        $key = "service:registry:{$serviceName}";
        $instances = $this->redis->hGetAll($key);

        $result = [];
        foreach ($instances as $instanceId => $instance) {
            $data = json_decode($instance, true);
            if (time() - $data['last_heartbeat'] < 60) {
                $result[] = $data;
            }
        }

        $this->localCache[$serviceName] = $result;

        return $result;
    }

    public function getHealthyInstance(string $serviceName): ?array
    {
        $instances = $this->getInstances($serviceName);

        if (empty($instances)) {
            return null;
        }

        return $instances[array_rand($instances)];
    }
}

完整使用示例

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Messaging\{ServiceClient, ServiceServer, ServiceProxyFactory, ServiceRegistry};

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$redis = new Redis();
$redis->connect('localhost', 6379);

$registry = new ServiceRegistry($redis);

$registry->register('user-service', 'instance-1', [
    'host' => 'localhost',
    'port' => 8080,
]);

echo "用户服务已注册\n";

$userServer = new ServiceServer($connection, 'user-service');

$userServer->registerHandler('get_user', function (array $payload) {
    $userId = $payload['user_id'];
    
    return [
        'user_id' => $userId,
        'name' => '张三',
        'email' => 'user@example.com',
        'created_at' => '2024-01-01',
    ];
});

$userServer->registerHandler('validate_token', function (array $payload) {
    $token = $payload['token'];
    
    if (strlen($token) < 10) {
        throw new \RuntimeException('Invalid token');
    }
    
    return [
        'valid' => true,
        'user_id' => 'user_001',
        'expires_at' => time() + 3600,
    ];
});

echo "用户服务处理器已注册\n";

$pid = pcntl_fork();

if ($pid === 0) {
    echo "启动用户服务...\n";
    $userServer->start();
    exit(0);
}

sleep(1);

$orderClient = new ServiceClient($connection, 'order-service');
$proxyFactory = new ServiceProxyFactory($orderClient);

$userProxy = $proxyFactory->create('user-service');

echo "\n调用用户服务 - 获取用户信息:\n";
$result = $userProxy->getUser(['user_id' => 'user_001']);
echo json_encode($result, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";

echo "\n调用用户服务 - 验证Token:\n";
$result = $userProxy->validateToken(['token' => 'valid_token_12345']);
echo json_encode($result, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";

echo "\n异步调用用户服务:\n";
$orderClient->callAsync('user-service', 'get_user', ['user_id' => 'user_002'], function ($response) {
    echo "异步响应: " . json_encode($response, JSON_UNESCAPED_UNICODE) . "\n";
});

sleep(2);

posix_kill($pid, SIGTERM);
pcntl_wait($status);

$registry->deregister('user-service', 'instance-1');

$orderClient->close();
$userServer->close();
$connection->close();

echo "\n服务已关闭\n";

关键技术点解析

1. 请求-响应模式

使用 reply_to 和 correlation_id 实现同步调用:

php
$message->setReplyTo($this->replyQueue);
$amqpMessage->set('correlation_id', $correlationId);

2. 服务发现

通过 Redis 实现简单的服务注册与发现:

php
$registry->register('user-service', 'instance-1', $metadata);
$instance = $registry->getHealthyInstance('user-service');

3. 超时控制

php
private function waitForResponse(string $correlationId, float $timeout): array
{
    $startTime = microtime(true);
    while ((microtime(true) - $startTime) < $timeout) {
        // 等待响应
    }
    throw new \RuntimeException("Service call timeout");
}

4. 负载均衡

php
public function getHealthyInstance(string $serviceName): ?array
{
    $instances = $this->getInstances($serviceName);
    return $instances[array_rand($instances)];
}

性能优化建议

优化项建议说明
连接复用保持长连接减少连接开销
批量调用合并多个请求减少网络往返
本地缓存缓存服务实例减少注册中心压力
异步调用非阻塞处理提高并发能力

常见问题与解决方案

1. 服务超时

问题: 服务调用超时

解决方案:

  • 设置合理超时时间
  • 实现重试机制
  • 熔断降级

2. 服务不可用

问题: 目标服务宕机

解决方案:

  • 服务健康检查
  • 自动摘除不健康实例
  • 降级处理

3. 消息堆积

问题: 请求队列积压

解决方案:

  • 增加消费者实例
  • 动态扩容
  • 背压控制

4. 服务雪崩

问题: 级联失败

解决方案:

  • 熔断器模式
  • 限流保护
  • 降级策略

相关链接