Appearance
RPC 模式实现
概述
RPC(Remote Procedure Call,远程过程调用)模式是一种允许程序调用另一台计算机上的函数或方法,就像调用本地函数一样的通信模式。在 RabbitMQ 中,RPC 模式通过临时回复队列和关联标识(correlation_id)来实现请求与响应的匹配。
核心知识点
RPC 模式架构
mermaid
graph TB
subgraph 客户端
A[RPC Client] --> B[创建临时回复队列]
B --> C[生成 correlation_id]
C --> D[发送请求消息]
D --> E[等待响应]
end
subgraph RabbitMQ
F[请求队列] --> G[消息路由]
H[临时回复队列] --> I[响应消息]
end
subgraph 服务端
J[RPC Server] --> K[监听请求队列]
K --> L[处理请求]
L --> M[发送响应]
end
D --> F
G --> K
M --> H
I --> E核心组件说明
| 组件 | 作用 |
|---|---|
| 请求队列 | 服务端监听的队列,用于接收客户端请求 |
| 回复队列 | 临时队列,用于接收服务端的响应 |
| correlation_id | 关联标识,用于匹配请求和响应 |
| reply_to | 消息属性,指定回复队列名称 |
工作流程
mermaid
sequenceDiagram
participant C as 客户端
participant R as RabbitMQ
participant S as 服务端
Note over C: 1. 创建临时回复队列
C->>R: queue_declare(exclusive=true)
R-->>C: 返回队列名 amq.gen-xxx
Note over C: 2. 发送请求
C->>R: publish(reply_to, correlation_id)
R->>S: 投递请求消息
Note over S: 3. 处理请求
S->>S: 执行业务逻辑
Note over S: 4. 发送响应
S->>R: publish(to reply_to, correlation_id)
R->>C: 投递响应消息
Note over C: 5. 验证并处理响应
C->>C: 匹配 correlation_idPHP 代码示例
基础 RPC 客户端
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RpcClient
{
private $connection;
private $channel;
private $callbackQueue;
private $response;
private $corrId;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
list($this->callbackQueue, ,) = $this->channel->queue_declare(
'',
false,
false,
true,
true
);
$this->channel->basic_consume(
$this->callbackQueue,
'',
false,
true,
false,
false,
[$this, 'onResponse']
);
}
public function onResponse(AMQPMessage $response)
{
if ($response->get('correlation_id') === $this->corrId) {
$this->response = $response->body;
}
}
public function call($queueName, $payload)
{
$this->response = null;
$this->corrId = uniqid();
$message = new AMQPMessage(
json_encode($payload),
[
'correlation_id' => $this->corrId,
'reply_to' => $this->callbackQueue
]
);
$this->channel->basic_publish($message, '', $queueName);
while (!$this->response) {
$this->channel->wait();
}
return json_decode($this->response, true);
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}基础 RPC 服务端
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RpcServer
{
private $connection;
private $channel;
private $queueName;
private $handlers = [];
public function __construct($queueName)
{
$this->queueName = $queueName;
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->queue_declare($queueName, false, false, false, false);
}
public function registerHandler($action, callable $handler)
{
$this->handlers[$action] = $handler;
}
public function start()
{
$callback = function (AMQPMessage $message) {
$payload = json_decode($message->body, true);
$action = $payload['action'] ?? 'default';
echo sprintf(
" [%s] 收到请求: %s\n",
date('Y-m-d H:i:s'),
$action
);
try {
if (!isset($this->handlers[$action])) {
throw new RuntimeException("未知的操作: {$action}");
}
$result = call_user_func($this->handlers[$action], $payload);
$response = [
'success' => true,
'data' => $result
];
} catch (Exception $e) {
$response = [
'success' => false,
'error' => [
'message' => $e->getMessage(),
'code' => $e->getCode()
]
];
}
$replyMessage = new AMQPMessage(
json_encode($response),
['correlation_id' => $message->get('correlation_id')]
);
$message->delivery_info['channel']->basic_publish(
$replyMessage,
'',
$message->get('reply_to')
);
$message->ack();
};
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
echo " [x] RPC 服务端已启动,等待请求...\n";
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}完整示例:用户服务 RPC
php
<?php
class UserServiceServer
{
private $users = [
1 => ['id' => 1, 'name' => '张三', 'email' => 'zhangsan@example.com'],
2 => ['id' => 2, 'name' => '李四', 'email' => 'lisi@example.com'],
3 => ['id' => 3, 'name' => '王五', 'email' => 'wangwu@example.com'],
];
public function getUser($payload)
{
$userId = $payload['user_id'] ?? null;
if (!$userId) {
throw new InvalidArgumentException('缺少 user_id 参数');
}
if (!isset($this->users[$userId])) {
throw new RuntimeException("用户不存在: {$userId}");
}
return $this->users[$userId];
}
public function createUser($payload)
{
$name = $payload['name'] ?? null;
$email = $payload['email'] ?? null;
if (!$name || !$email) {
throw new InvalidArgumentException('缺少必要参数');
}
$userId = max(array_keys($this->users)) + 1;
$this->users[$userId] = [
'id' => $userId,
'name' => $name,
'email' => $email
];
return $this->users[$userId];
}
public function listUsers($payload)
{
$page = $payload['page'] ?? 1;
$pageSize = $payload['page_size'] ?? 10;
$users = array_slice(
array_values($this->users),
($page - 1) * $pageSize,
$pageSize
);
return [
'users' => $users,
'total' => count($this->users),
'page' => $page,
'page_size' => $pageSize
];
}
}
$server = new RpcServer('user_service');
$service = new UserServiceServer();
$server->registerHandler('get', [$service, 'getUser']);
$server->registerHandler('create', [$service, 'createUser']);
$server->registerHandler('list', [$service, 'listUsers']);
try {
$server->start();
} catch (Exception $e) {
echo "服务异常: " . $e->getMessage() . "\n";
} finally {
$server->close();
}用户服务客户端
php
<?php
class UserServiceClient
{
private $rpcClient;
public function __construct()
{
$this->rpcClient = new RpcClient();
}
public function getUser($userId)
{
$response = $this->rpcClient->call('user_service', [
'action' => 'get',
'user_id' => $userId
]);
if (!$response['success']) {
throw new RuntimeException($response['error']['message']);
}
return $response['data'];
}
public function createUser($name, $email)
{
$response = $this->rpcClient->call('user_service', [
'action' => 'create',
'name' => $name,
'email' => $email
]);
if (!$response['success']) {
throw new RuntimeException($response['error']['message']);
}
return $response['data'];
}
public function listUsers($page = 1, $pageSize = 10)
{
$response = $this->rpcClient->call('user_service', [
'action' => 'list',
'page' => $page,
'page_size' => $pageSize
]);
if (!$response['success']) {
throw new RuntimeException($response['error']['message']);
}
return $response['data'];
}
public function close()
{
$this->rpcClient->close();
}
}
$client = new UserServiceClient();
try {
$user = $client->getUser(1);
echo "用户信息: " . json_encode($user, JSON_UNESCAPED_UNICODE) . "\n";
$newUser = $client->createUser('赵六', 'zhaoliu@example.com');
echo "创建用户: " . json_encode($newUser, JSON_UNESCAPED_UNICODE) . "\n";
$result = $client->listUsers(1, 5);
echo "用户列表: " . json_encode($result, JSON_UNESCAPED_UNICODE) . "\n";
} catch (Exception $e) {
echo "错误: " . $e->getMessage() . "\n";
} finally {
$client->close();
}带超时和重试的高级客户端
php
<?php
class AdvancedRpcClient
{
private $connection;
private $channel;
private $callbackQueue;
private $responses = [];
private $defaultTimeout = 5;
private $maxRetries = 3;
public function __construct($timeout = 5, $maxRetries = 3)
{
$this->defaultTimeout = $timeout;
$this->maxRetries = $maxRetries;
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
list($this->callbackQueue, ,) = $this->channel->queue_declare(
'',
false,
false,
true,
true
);
$this->channel->basic_consume(
$this->callbackQueue,
'',
false,
true,
false,
false,
[$this, 'onResponse']
);
}
public function onResponse(AMQPMessage $message)
{
$corrId = $message->get('correlation_id');
$this->responses[$corrId] = [
'body' => $message->body,
'timestamp' => microtime(true)
];
}
public function call($queueName, $payload, $timeout = null)
{
$timeout = $timeout ?? $this->defaultTimeout;
$lastException = null;
for ($attempt = 1; $attempt <= $this->maxRetries; $attempt++) {
try {
return $this->executeCall($queueName, $payload, $timeout);
} catch (TimeoutException $e) {
$lastException = $e;
$this->log("第 {$attempt} 次尝试超时,准备重试...");
if ($attempt < $this->maxRetries) {
usleep(500000);
}
} catch (Exception $e) {
throw $e;
}
}
throw $lastException;
}
private function executeCall($queueName, $payload, $timeout)
{
$corrId = $this->generateCorrelationId();
$message = new AMQPMessage(
json_encode($payload),
[
'correlation_id' => $corrId,
'reply_to' => $this->callbackQueue,
'timestamp' => time()
]
);
$this->channel->basic_publish($message, '', $queueName);
$startTime = microtime(true);
while (!isset($this->responses[$corrId])) {
$elapsed = microtime(true) - $startTime;
if ($elapsed > $timeout) {
throw new TimeoutException("RPC 调用超时 ({$timeout}秒)");
}
$remainingTime = $timeout - $elapsed;
try {
$this->channel->wait(null, false, $remainingTime);
} catch (PhpAmqpLib\Exception\AMQPTimeoutException $e) {
throw new TimeoutException("等待响应超时");
}
}
$response = $this->responses[$corrId];
unset($this->responses[$corrId]);
return json_decode($response['body'], true);
}
private function generateCorrelationId()
{
return sprintf(
'%s-%s-%s',
getmypid(),
microtime(true),
bin2hex(random_bytes(8))
);
}
private function log($message)
{
echo sprintf("[%s] %s\n", date('Y-m-d H:i:s'), $message);
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}
class TimeoutException extends RuntimeException
{
}实际应用场景
1. 微服务同步调用
php
<?php
class OrderService
{
private $userClient;
private $productClient;
private $inventoryClient;
public function createOrder($orderData)
{
$user = $this->userClient->call('user_service', [
'action' => 'get',
'user_id' => $orderData['user_id']
]);
if (!$user['success']) {
throw new RuntimeException('用户不存在');
}
$product = $this->productClient->call('product_service', [
'action' => 'get',
'product_id' => $orderData['product_id']
]);
if (!$product['success']) {
throw new RuntimeException('商品不存在');
}
$inventory = $this->inventoryClient->call('inventory_service', [
'action' => 'check',
'product_id' => $orderData['product_id'],
'quantity' => $orderData['quantity']
]);
if (!$inventory['success'] || !$inventory['data']['available']) {
throw new RuntimeException('库存不足');
}
return $this->saveOrder($orderData);
}
private function saveOrder($orderData)
{
return ['order_id' => uniqid(), 'status' => 'created'];
}
}2. 分布式事务协调
php
<?php
class DistributedTransaction
{
private $participants = [];
public function addParticipant($name, $rpcClient, $queueName)
{
$this->participants[$name] = [
'client' => $rpcClient,
'queue' => $queueName
];
}
public function execute($transactionData)
{
$prepared = [];
try {
foreach ($this->participants as $name => $config) {
$response = $config['client']->call($config['queue'], [
'action' => 'prepare',
'transaction_id' => $transactionData['transaction_id'],
'data' => $transactionData
]);
if (!$response['success']) {
throw new RuntimeException("参与者 {$name} 准备失败");
}
$prepared[] = $name;
}
foreach ($this->participants as $name => $config) {
$config['client']->call($config['queue'], [
'action' => 'commit',
'transaction_id' => $transactionData['transaction_id']
]);
}
return ['success' => true];
} catch (Exception $e) {
foreach ($prepared as $name) {
$config = $this->participants[$name];
try {
$config['client']->call($config['queue'], [
'action' => 'rollback',
'transaction_id' => $transactionData['transaction_id']
]);
} catch (Exception $rollbackError) {
error_log("回滚失败: {$name} - " . $rollbackError->getMessage());
}
}
return ['success' => false, 'error' => $e->getMessage()];
}
}
}3. 实时数据聚合
php
<?php
class DataAggregator
{
private $clients = [];
public function addSource($name, $client, $queueName)
{
$this->clients[$name] = [
'client' => $client,
'queue' => $queueName
];
}
public function aggregate($query)
{
$results = [];
$errors = [];
foreach ($this->clients as $name => $config) {
try {
$response = $config['client']->call($config['queue'], [
'action' => 'query',
'query' => $query
]);
if ($response['success']) {
$results[$name] = $response['data'];
} else {
$errors[$name] = $response['error'];
}
} catch (Exception $e) {
$errors[$name] = $e->getMessage();
}
}
return [
'results' => $results,
'errors' => $errors,
'summary' => $this->summarize($results)
];
}
private function summarize($results)
{
$summary = [];
foreach ($results as $source => $data) {
if (is_array($data) && isset($data['count'])) {
$summary[$source] = $data['count'];
}
}
return $summary;
}
}常见问题与解决方案
问题 1:请求响应不匹配
原因:并发请求时 correlation_id 管理不当
解决方案:
php
<?php
class ConcurrentRpcClient
{
private $responses = [];
private $lock;
public function __construct()
{
$this->lock = new \Swoole\Lock(SWOOLE_MUTEX);
}
public function onResponse(AMQPMessage $message)
{
$this->lock->lock();
try {
$this->responses[$message->get('correlation_id')] = $message->body;
} finally {
$this->lock->unlock();
}
}
public function getResponse($corrId)
{
$this->lock->lock();
try {
return $this->responses[$corrId] ?? null;
} finally {
$this->lock->unlock();
}
}
}问题 2:服务端处理缓慢导致超时
原因:业务逻辑执行时间过长
解决方案:
php
<?php
class AsyncRpcServer
{
private $server;
public function processRequest(AMQPMessage $message)
{
$payload = json_decode($message->body, true);
if ($this->isLongRunningTask($payload)) {
$taskId = $this->submitToWorker($payload);
$this->sendPendingResponse($message, $taskId);
} else {
$result = $this->processSync($payload);
$this->sendResponse($message, $result);
}
}
private function isLongRunningTask($payload)
{
return isset($payload['estimated_time']) && $payload['estimated_time'] > 5;
}
private function submitToWorker($payload)
{
return uniqid('task_');
}
private function sendPendingResponse($message, $taskId)
{
$response = new AMQPMessage(
json_encode([
'status' => 'pending',
'task_id' => $taskId
]),
['correlation_id' => $message->get('correlation_id')]
);
$message->delivery_info['channel']->basic_publish(
$response,
'',
$message->get('reply_to')
);
}
}问题 3:服务不可用时的降级处理
解决方案:
php
<?php
class ResilientRpcClient
{
private $circuitBreaker;
private $fallbackHandler;
public function call($queueName, $payload)
{
if (!$this->circuitBreaker->isAvailable($queueName)) {
return $this->handleFallback($queueName, $payload);
}
try {
$response = $this->executeCall($queueName, $payload);
$this->circuitBreaker->recordSuccess($queueName);
return $response;
} catch (TimeoutException $e) {
$this->circuitBreaker->recordFailure($queueName);
return $this->handleFallback($queueName, $payload);
}
}
private function handleFallback($queueName, $payload)
{
if ($this->fallbackHandler) {
return call_user_func($this->fallbackHandler, $queueName, $payload);
}
return [
'success' => false,
'error' => '服务暂时不可用',
'fallback' => true
];
}
}
class CircuitBreaker
{
private $states = [];
private $failureThreshold = 5;
private $recoveryTimeout = 60;
public function isAvailable($service)
{
if (!isset($this->states[$service])) {
return true;
}
$state = $this->states[$service];
if ($state['status'] === 'open') {
if (time() - $state['last_failure'] > $this->recoveryTimeout) {
$this->states[$service]['status'] = 'half-open';
return true;
}
return false;
}
return true;
}
public function recordSuccess($service)
{
$this->states[$service] = [
'status' => 'closed',
'failures' => 0
];
}
public function recordFailure($service)
{
if (!isset($this->states[$service])) {
$this->states[$service] = ['failures' => 0];
}
$this->states[$service]['failures']++;
$this->states[$service]['last_failure'] = time();
if ($this->states[$service]['failures'] >= $this->failureThreshold) {
$this->states[$service]['status'] = 'open';
}
}
}最佳实践建议
1. 统一响应格式
php
<?php
class RpcResponse
{
public static function success($data = null, $message = 'success')
{
return [
'success' => true,
'message' => $message,
'data' => $data,
'timestamp' => time()
];
}
public static function error($message, $code = 0, $details = null)
{
return [
'success' => false,
'error' => [
'message' => $message,
'code' => $code,
'details' => $details
],
'timestamp' => time()
];
}
}2. 请求日志追踪
php
<?php
class TracedRpcClient
{
private $logger;
public function call($queueName, $payload)
{
$traceId = $this->generateTraceId();
$startTime = microtime(true);
$this->logger->info('RPC 请求开始', [
'trace_id' => $traceId,
'queue' => $queueName,
'payload' => $payload
]);
try {
$response = $this->executeCall($queueName, $payload, $traceId);
$this->logger->info('RPC 请求成功', [
'trace_id' => $traceId,
'duration_ms' => round((microtime(true) - $startTime) * 1000, 2)
]);
return $response;
} catch (Exception $e) {
$this->logger->error('RPC 请求失败', [
'trace_id' => $traceId,
'duration_ms' => round((microtime(true) - $startTime) * 1000, 2),
'error' => $e->getMessage()
]);
throw $e;
}
}
private function generateTraceId()
{
return bin2hex(random_bytes(16));
}
}3. 服务注册与发现
php
<?php
class RpcServiceRegistry
{
private $services = [];
public function register($serviceName, $queueName, $metadata = [])
{
$this->services[$serviceName] = [
'queue' => $queueName,
'metadata' => $metadata,
'registered_at' => time()
];
}
public function getQueueName($serviceName)
{
if (!isset($this->services[$serviceName])) {
throw new RuntimeException("服务未注册: {$serviceName}");
}
return $this->services[$serviceName]['queue'];
}
public function listServices()
{
return array_keys($this->services);
}
}
class ServiceAwareRpcClient
{
private $registry;
private $clients = [];
public function call($serviceName, $payload)
{
$queueName = $this->registry->getQueueName($serviceName);
if (!isset($this->clients[$serviceName])) {
$this->clients[$serviceName] = new AdvancedRpcClient();
}
return $this->clients[$serviceName]->call($queueName, $payload);
}
}