Skip to content

RPC 模式实现

概述

RPC(Remote Procedure Call,远程过程调用)模式是一种允许程序调用另一台计算机上的函数或方法,就像调用本地函数一样的通信模式。在 RabbitMQ 中,RPC 模式通过临时回复队列和关联标识(correlation_id)来实现请求与响应的匹配。

核心知识点

RPC 模式架构

mermaid
graph TB
    subgraph 客户端
        A[RPC Client] --> B[创建临时回复队列]
        B --> C[生成 correlation_id]
        C --> D[发送请求消息]
        D --> E[等待响应]
    end

    subgraph RabbitMQ
        F[请求队列] --> G[消息路由]
        H[临时回复队列] --> I[响应消息]
    end

    subgraph 服务端
        J[RPC Server] --> K[监听请求队列]
        K --> L[处理请求]
        L --> M[发送响应]
    end

    D --> F
    G --> K
    M --> H
    I --> E

核心组件说明

组件作用
请求队列服务端监听的队列,用于接收客户端请求
回复队列临时队列,用于接收服务端的响应
correlation_id关联标识,用于匹配请求和响应
reply_to消息属性,指定回复队列名称

工作流程

mermaid
sequenceDiagram
    participant C as 客户端
    participant R as RabbitMQ
    participant S as 服务端

    Note over C: 1. 创建临时回复队列
    C->>R: queue_declare(exclusive=true)
    R-->>C: 返回队列名 amq.gen-xxx

    Note over C: 2. 发送请求
    C->>R: publish(reply_to, correlation_id)
    R->>S: 投递请求消息

    Note over S: 3. 处理请求
    S->>S: 执行业务逻辑

    Note over S: 4. 发送响应
    S->>R: publish(to reply_to, correlation_id)
    R->>C: 投递响应消息

    Note over C: 5. 验证并处理响应
    C->>C: 匹配 correlation_id

PHP 代码示例

基础 RPC 客户端

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RpcClient
{
    private $connection;
    private $channel;
    private $callbackQueue;
    private $response;
    private $corrId;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        list($this->callbackQueue, ,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );

        $this->channel->basic_consume(
            $this->callbackQueue,
            '',
            false,
            true,
            false,
            false,
            [$this, 'onResponse']
        );
    }

    public function onResponse(AMQPMessage $response)
    {
        if ($response->get('correlation_id') === $this->corrId) {
            $this->response = $response->body;
        }
    }

    public function call($queueName, $payload)
    {
        $this->response = null;
        $this->corrId = uniqid();

        $message = new AMQPMessage(
            json_encode($payload),
            [
                'correlation_id' => $this->corrId,
                'reply_to' => $this->callbackQueue
            ]
        );

        $this->channel->basic_publish($message, '', $queueName);

        while (!$this->response) {
            $this->channel->wait();
        }

        return json_decode($this->response, true);
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

基础 RPC 服务端

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RpcServer
{
    private $connection;
    private $channel;
    private $queueName;
    private $handlers = [];

    public function __construct($queueName)
    {
        $this->queueName = $queueName;

        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->queue_declare($queueName, false, false, false, false);
    }

    public function registerHandler($action, callable $handler)
    {
        $this->handlers[$action] = $handler;
    }

    public function start()
    {
        $callback = function (AMQPMessage $message) {
            $payload = json_decode($message->body, true);
            $action = $payload['action'] ?? 'default';

            echo sprintf(
                " [%s] 收到请求: %s\n",
                date('Y-m-d H:i:s'),
                $action
            );

            try {
                if (!isset($this->handlers[$action])) {
                    throw new RuntimeException("未知的操作: {$action}");
                }

                $result = call_user_func($this->handlers[$action], $payload);

                $response = [
                    'success' => true,
                    'data' => $result
                ];
            } catch (Exception $e) {
                $response = [
                    'success' => false,
                    'error' => [
                        'message' => $e->getMessage(),
                        'code' => $e->getCode()
                    ]
                ];
            }

            $replyMessage = new AMQPMessage(
                json_encode($response),
                ['correlation_id' => $message->get('correlation_id')]
            );

            $message->delivery_info['channel']->basic_publish(
                $replyMessage,
                '',
                $message->get('reply_to')
            );

            $message->ack();
        };

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        echo " [x] RPC 服务端已启动,等待请求...\n";

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

完整示例:用户服务 RPC

php
<?php

class UserServiceServer
{
    private $users = [
        1 => ['id' => 1, 'name' => '张三', 'email' => 'zhangsan@example.com'],
        2 => ['id' => 2, 'name' => '李四', 'email' => 'lisi@example.com'],
        3 => ['id' => 3, 'name' => '王五', 'email' => 'wangwu@example.com'],
    ];

    public function getUser($payload)
    {
        $userId = $payload['user_id'] ?? null;

        if (!$userId) {
            throw new InvalidArgumentException('缺少 user_id 参数');
        }

        if (!isset($this->users[$userId])) {
            throw new RuntimeException("用户不存在: {$userId}");
        }

        return $this->users[$userId];
    }

    public function createUser($payload)
    {
        $name = $payload['name'] ?? null;
        $email = $payload['email'] ?? null;

        if (!$name || !$email) {
            throw new InvalidArgumentException('缺少必要参数');
        }

        $userId = max(array_keys($this->users)) + 1;
        $this->users[$userId] = [
            'id' => $userId,
            'name' => $name,
            'email' => $email
        ];

        return $this->users[$userId];
    }

    public function listUsers($payload)
    {
        $page = $payload['page'] ?? 1;
        $pageSize = $payload['page_size'] ?? 10;

        $users = array_slice(
            array_values($this->users),
            ($page - 1) * $pageSize,
            $pageSize
        );

        return [
            'users' => $users,
            'total' => count($this->users),
            'page' => $page,
            'page_size' => $pageSize
        ];
    }
}

$server = new RpcServer('user_service');
$service = new UserServiceServer();

$server->registerHandler('get', [$service, 'getUser']);
$server->registerHandler('create', [$service, 'createUser']);
$server->registerHandler('list', [$service, 'listUsers']);

try {
    $server->start();
} catch (Exception $e) {
    echo "服务异常: " . $e->getMessage() . "\n";
} finally {
    $server->close();
}

用户服务客户端

php
<?php

class UserServiceClient
{
    private $rpcClient;

    public function __construct()
    {
        $this->rpcClient = new RpcClient();
    }

    public function getUser($userId)
    {
        $response = $this->rpcClient->call('user_service', [
            'action' => 'get',
            'user_id' => $userId
        ]);

        if (!$response['success']) {
            throw new RuntimeException($response['error']['message']);
        }

        return $response['data'];
    }

    public function createUser($name, $email)
    {
        $response = $this->rpcClient->call('user_service', [
            'action' => 'create',
            'name' => $name,
            'email' => $email
        ]);

        if (!$response['success']) {
            throw new RuntimeException($response['error']['message']);
        }

        return $response['data'];
    }

    public function listUsers($page = 1, $pageSize = 10)
    {
        $response = $this->rpcClient->call('user_service', [
            'action' => 'list',
            'page' => $page,
            'page_size' => $pageSize
        ]);

        if (!$response['success']) {
            throw new RuntimeException($response['error']['message']);
        }

        return $response['data'];
    }

    public function close()
    {
        $this->rpcClient->close();
    }
}

$client = new UserServiceClient();

try {
    $user = $client->getUser(1);
    echo "用户信息: " . json_encode($user, JSON_UNESCAPED_UNICODE) . "\n";

    $newUser = $client->createUser('赵六', 'zhaoliu@example.com');
    echo "创建用户: " . json_encode($newUser, JSON_UNESCAPED_UNICODE) . "\n";

    $result = $client->listUsers(1, 5);
    echo "用户列表: " . json_encode($result, JSON_UNESCAPED_UNICODE) . "\n";
} catch (Exception $e) {
    echo "错误: " . $e->getMessage() . "\n";
} finally {
    $client->close();
}

带超时和重试的高级客户端

php
<?php

class AdvancedRpcClient
{
    private $connection;
    private $channel;
    private $callbackQueue;
    private $responses = [];
    private $defaultTimeout = 5;
    private $maxRetries = 3;

    public function __construct($timeout = 5, $maxRetries = 3)
    {
        $this->defaultTimeout = $timeout;
        $this->maxRetries = $maxRetries;

        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        list($this->callbackQueue, ,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );

        $this->channel->basic_consume(
            $this->callbackQueue,
            '',
            false,
            true,
            false,
            false,
            [$this, 'onResponse']
        );
    }

    public function onResponse(AMQPMessage $message)
    {
        $corrId = $message->get('correlation_id');
        $this->responses[$corrId] = [
            'body' => $message->body,
            'timestamp' => microtime(true)
        ];
    }

    public function call($queueName, $payload, $timeout = null)
    {
        $timeout = $timeout ?? $this->defaultTimeout;
        $lastException = null;

        for ($attempt = 1; $attempt <= $this->maxRetries; $attempt++) {
            try {
                return $this->executeCall($queueName, $payload, $timeout);
            } catch (TimeoutException $e) {
                $lastException = $e;
                $this->log("第 {$attempt} 次尝试超时,准备重试...");

                if ($attempt < $this->maxRetries) {
                    usleep(500000);
                }
            } catch (Exception $e) {
                throw $e;
            }
        }

        throw $lastException;
    }

    private function executeCall($queueName, $payload, $timeout)
    {
        $corrId = $this->generateCorrelationId();

        $message = new AMQPMessage(
            json_encode($payload),
            [
                'correlation_id' => $corrId,
                'reply_to' => $this->callbackQueue,
                'timestamp' => time()
            ]
        );

        $this->channel->basic_publish($message, '', $queueName);

        $startTime = microtime(true);

        while (!isset($this->responses[$corrId])) {
            $elapsed = microtime(true) - $startTime;

            if ($elapsed > $timeout) {
                throw new TimeoutException("RPC 调用超时 ({$timeout}秒)");
            }

            $remainingTime = $timeout - $elapsed;

            try {
                $this->channel->wait(null, false, $remainingTime);
            } catch (PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                throw new TimeoutException("等待响应超时");
            }
        }

        $response = $this->responses[$corrId];
        unset($this->responses[$corrId]);

        return json_decode($response['body'], true);
    }

    private function generateCorrelationId()
    {
        return sprintf(
            '%s-%s-%s',
            getmypid(),
            microtime(true),
            bin2hex(random_bytes(8))
        );
    }

    private function log($message)
    {
        echo sprintf("[%s] %s\n", date('Y-m-d H:i:s'), $message);
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

class TimeoutException extends RuntimeException
{
}

实际应用场景

1. 微服务同步调用

php
<?php

class OrderService
{
    private $userClient;
    private $productClient;
    private $inventoryClient;

    public function createOrder($orderData)
    {
        $user = $this->userClient->call('user_service', [
            'action' => 'get',
            'user_id' => $orderData['user_id']
        ]);

        if (!$user['success']) {
            throw new RuntimeException('用户不存在');
        }

        $product = $this->productClient->call('product_service', [
            'action' => 'get',
            'product_id' => $orderData['product_id']
        ]);

        if (!$product['success']) {
            throw new RuntimeException('商品不存在');
        }

        $inventory = $this->inventoryClient->call('inventory_service', [
            'action' => 'check',
            'product_id' => $orderData['product_id'],
            'quantity' => $orderData['quantity']
        ]);

        if (!$inventory['success'] || !$inventory['data']['available']) {
            throw new RuntimeException('库存不足');
        }

        return $this->saveOrder($orderData);
    }

    private function saveOrder($orderData)
    {
        return ['order_id' => uniqid(), 'status' => 'created'];
    }
}

2. 分布式事务协调

php
<?php

class DistributedTransaction
{
    private $participants = [];

    public function addParticipant($name, $rpcClient, $queueName)
    {
        $this->participants[$name] = [
            'client' => $rpcClient,
            'queue' => $queueName
        ];
    }

    public function execute($transactionData)
    {
        $prepared = [];

        try {
            foreach ($this->participants as $name => $config) {
                $response = $config['client']->call($config['queue'], [
                    'action' => 'prepare',
                    'transaction_id' => $transactionData['transaction_id'],
                    'data' => $transactionData
                ]);

                if (!$response['success']) {
                    throw new RuntimeException("参与者 {$name} 准备失败");
                }

                $prepared[] = $name;
            }

            foreach ($this->participants as $name => $config) {
                $config['client']->call($config['queue'], [
                    'action' => 'commit',
                    'transaction_id' => $transactionData['transaction_id']
                ]);
            }

            return ['success' => true];
        } catch (Exception $e) {
            foreach ($prepared as $name) {
                $config = $this->participants[$name];
                try {
                    $config['client']->call($config['queue'], [
                        'action' => 'rollback',
                        'transaction_id' => $transactionData['transaction_id']
                    ]);
                } catch (Exception $rollbackError) {
                    error_log("回滚失败: {$name} - " . $rollbackError->getMessage());
                }
            }

            return ['success' => false, 'error' => $e->getMessage()];
        }
    }
}

3. 实时数据聚合

php
<?php

class DataAggregator
{
    private $clients = [];

    public function addSource($name, $client, $queueName)
    {
        $this->clients[$name] = [
            'client' => $client,
            'queue' => $queueName
        ];
    }

    public function aggregate($query)
    {
        $results = [];
        $errors = [];

        foreach ($this->clients as $name => $config) {
            try {
                $response = $config['client']->call($config['queue'], [
                    'action' => 'query',
                    'query' => $query
                ]);

                if ($response['success']) {
                    $results[$name] = $response['data'];
                } else {
                    $errors[$name] = $response['error'];
                }
            } catch (Exception $e) {
                $errors[$name] = $e->getMessage();
            }
        }

        return [
            'results' => $results,
            'errors' => $errors,
            'summary' => $this->summarize($results)
        ];
    }

    private function summarize($results)
    {
        $summary = [];

        foreach ($results as $source => $data) {
            if (is_array($data) && isset($data['count'])) {
                $summary[$source] = $data['count'];
            }
        }

        return $summary;
    }
}

常见问题与解决方案

问题 1:请求响应不匹配

原因:并发请求时 correlation_id 管理不当

解决方案

php
<?php

class ConcurrentRpcClient
{
    private $responses = [];
    private $lock;

    public function __construct()
    {
        $this->lock = new \Swoole\Lock(SWOOLE_MUTEX);
    }

    public function onResponse(AMQPMessage $message)
    {
        $this->lock->lock();
        try {
            $this->responses[$message->get('correlation_id')] = $message->body;
        } finally {
            $this->lock->unlock();
        }
    }

    public function getResponse($corrId)
    {
        $this->lock->lock();
        try {
            return $this->responses[$corrId] ?? null;
        } finally {
            $this->lock->unlock();
        }
    }
}

问题 2:服务端处理缓慢导致超时

原因:业务逻辑执行时间过长

解决方案

php
<?php

class AsyncRpcServer
{
    private $server;

    public function processRequest(AMQPMessage $message)
    {
        $payload = json_decode($message->body, true);

        if ($this->isLongRunningTask($payload)) {
            $taskId = $this->submitToWorker($payload);

            $this->sendPendingResponse($message, $taskId);
        } else {
            $result = $this->processSync($payload);
            $this->sendResponse($message, $result);
        }
    }

    private function isLongRunningTask($payload)
    {
        return isset($payload['estimated_time']) && $payload['estimated_time'] > 5;
    }

    private function submitToWorker($payload)
    {
        return uniqid('task_');
    }

    private function sendPendingResponse($message, $taskId)
    {
        $response = new AMQPMessage(
            json_encode([
                'status' => 'pending',
                'task_id' => $taskId
            ]),
            ['correlation_id' => $message->get('correlation_id')]
        );

        $message->delivery_info['channel']->basic_publish(
            $response,
            '',
            $message->get('reply_to')
        );
    }
}

问题 3:服务不可用时的降级处理

解决方案

php
<?php

class ResilientRpcClient
{
    private $circuitBreaker;
    private $fallbackHandler;

    public function call($queueName, $payload)
    {
        if (!$this->circuitBreaker->isAvailable($queueName)) {
            return $this->handleFallback($queueName, $payload);
        }

        try {
            $response = $this->executeCall($queueName, $payload);
            $this->circuitBreaker->recordSuccess($queueName);
            return $response;
        } catch (TimeoutException $e) {
            $this->circuitBreaker->recordFailure($queueName);
            return $this->handleFallback($queueName, $payload);
        }
    }

    private function handleFallback($queueName, $payload)
    {
        if ($this->fallbackHandler) {
            return call_user_func($this->fallbackHandler, $queueName, $payload);
        }

        return [
            'success' => false,
            'error' => '服务暂时不可用',
            'fallback' => true
        ];
    }
}

class CircuitBreaker
{
    private $states = [];
    private $failureThreshold = 5;
    private $recoveryTimeout = 60;

    public function isAvailable($service)
    {
        if (!isset($this->states[$service])) {
            return true;
        }

        $state = $this->states[$service];

        if ($state['status'] === 'open') {
            if (time() - $state['last_failure'] > $this->recoveryTimeout) {
                $this->states[$service]['status'] = 'half-open';
                return true;
            }
            return false;
        }

        return true;
    }

    public function recordSuccess($service)
    {
        $this->states[$service] = [
            'status' => 'closed',
            'failures' => 0
        ];
    }

    public function recordFailure($service)
    {
        if (!isset($this->states[$service])) {
            $this->states[$service] = ['failures' => 0];
        }

        $this->states[$service]['failures']++;
        $this->states[$service]['last_failure'] = time();

        if ($this->states[$service]['failures'] >= $this->failureThreshold) {
            $this->states[$service]['status'] = 'open';
        }
    }
}

最佳实践建议

1. 统一响应格式

php
<?php

class RpcResponse
{
    public static function success($data = null, $message = 'success')
    {
        return [
            'success' => true,
            'message' => $message,
            'data' => $data,
            'timestamp' => time()
        ];
    }

    public static function error($message, $code = 0, $details = null)
    {
        return [
            'success' => false,
            'error' => [
                'message' => $message,
                'code' => $code,
                'details' => $details
            ],
            'timestamp' => time()
        ];
    }
}

2. 请求日志追踪

php
<?php

class TracedRpcClient
{
    private $logger;

    public function call($queueName, $payload)
    {
        $traceId = $this->generateTraceId();
        $startTime = microtime(true);

        $this->logger->info('RPC 请求开始', [
            'trace_id' => $traceId,
            'queue' => $queueName,
            'payload' => $payload
        ]);

        try {
            $response = $this->executeCall($queueName, $payload, $traceId);

            $this->logger->info('RPC 请求成功', [
                'trace_id' => $traceId,
                'duration_ms' => round((microtime(true) - $startTime) * 1000, 2)
            ]);

            return $response;
        } catch (Exception $e) {
            $this->logger->error('RPC 请求失败', [
                'trace_id' => $traceId,
                'duration_ms' => round((microtime(true) - $startTime) * 1000, 2),
                'error' => $e->getMessage()
            ]);

            throw $e;
        }
    }

    private function generateTraceId()
    {
        return bin2hex(random_bytes(16));
    }
}

3. 服务注册与发现

php
<?php

class RpcServiceRegistry
{
    private $services = [];

    public function register($serviceName, $queueName, $metadata = [])
    {
        $this->services[$serviceName] = [
            'queue' => $queueName,
            'metadata' => $metadata,
            'registered_at' => time()
        ];
    }

    public function getQueueName($serviceName)
    {
        if (!isset($this->services[$serviceName])) {
            throw new RuntimeException("服务未注册: {$serviceName}");
        }

        return $this->services[$serviceName]['queue'];
    }

    public function listServices()
    {
        return array_keys($this->services);
    }
}

class ServiceAwareRpcClient
{
    private $registry;
    private $clients = [];

    public function call($serviceName, $payload)
    {
        $queueName = $this->registry->getQueueName($serviceName);

        if (!isset($this->clients[$serviceName])) {
            $this->clients[$serviceName] = new AdvancedRpcClient();
        }

        return $this->clients[$serviceName]->call($queueName, $payload);
    }
}

相关链接