Appearance
RPC 通信模式
概述
RPC(Remote Procedure Call,远程过程调用)通信模式是一种允许程序调用另一台计算机上的函数或方法,就像调用本地函数一样的通信模式。在 RabbitMQ 中,RPC 模式通过临时回复队列和关联标识(correlation_id)来实现请求与响应的匹配。
核心知识点
架构图
mermaid
graph TB
subgraph 客户端
C[RPC Client]
RQ[Reply Queue]
end
subgraph RabbitMQ
R[RPC Queue]
end
subgraph 服务端
S[RPC Server]
end
C -->|1. 发送请求<br/>reply_to, correlation_id| R
R -->|2. 投递请求| S
S -->|3. 处理请求| S
S -->|4. 发送响应<br/>correlation_id| RQ
RQ -->|5. 投递响应| C
style C fill:#e1f5fe
style S fill:#e8f5e9
style RQ fill:#fff3e0工作流程
mermaid
sequenceDiagram
participant C as 客户端
participant R as RPC队列
participant S as 服务端
participant Q as 回复队列
Note over C: 创建临时回复队列
C->>Q: queue_declare(exclusive=true)
Q-->>C: 返回队列名
Note over C: 发送请求
C->>R: 消息(reply_to, correlation_id)
R->>S: 投递请求
Note over S: 处理请求
S->>S: 执行业务逻辑
Note over S: 发送响应
S->>Q: 消息(correlation_id)
Q->>C: 投递响应
Note over C: 验证响应
C->>C: 匹配 correlation_id核心概念
| 概念 | 说明 |
|---|---|
| RPC Queue | 服务端监听的请求队列 |
| Reply Queue | 客户端创建的临时回复队列 |
| correlation_id | 关联标识,用于匹配请求和响应 |
| reply_to | 消息属性,指定回复队列名称 |
PHP 代码示例
RPC 客户端
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RpcClient
{
private $connection;
private $channel;
private $callbackQueue;
private $response;
private $corrId;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
list($this->callbackQueue, ,) = $this->channel->queue_declare(
'',
false,
false,
true,
true
);
$this->channel->basic_consume(
$this->callbackQueue,
'',
false,
true,
false,
false,
[$this, 'onResponse']
);
}
public function onResponse(AMQPMessage $response)
{
if ($response->get('correlation_id') === $this->corrId) {
$this->response = $response->body;
}
}
public function call($queueName, $payload)
{
$this->response = null;
$this->corrId = uniqid();
$msg = new AMQPMessage(
json_encode($payload),
[
'correlation_id' => $this->corrId,
'reply_to' => $this->callbackQueue
]
);
$this->channel->basic_publish($msg, '', $queueName);
while (!$this->response) {
$this->channel->wait();
}
return json_decode($this->response, true);
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}RPC 服务端
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RpcServer
{
private $connection;
private $channel;
private $queueName;
private $handlers = [];
public function __construct($queueName)
{
$this->queueName = $queueName;
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->queue_declare($queueName, false, false, false, false);
}
public function registerHandler($action, callable $handler)
{
$this->handlers[$action] = $handler;
}
public function start()
{
$callback = function (AMQPMessage $message) {
$payload = json_decode($message->body, true);
$action = $payload['action'] ?? 'default';
echo sprintf(
" [%s] 收到请求: %s\n",
date('Y-m-d H:i:s'),
$action
);
try {
if (!isset($this->handlers[$action])) {
throw new RuntimeException("未知的操作: {$action}");
}
$result = call_user_func($this->handlers[$action], $payload);
$response = [
'success' => true,
'data' => $result
];
} catch (Exception $e) {
$response = [
'success' => false,
'error' => [
'message' => $e->getMessage(),
'code' => $e->getCode()
]
];
}
$replyMessage = new AMQPMessage(
json_encode($response),
['correlation_id' => $message->get('correlation_id')]
);
$message->delivery_info['channel']->basic_publish(
$replyMessage,
'',
$message->get('reply_to')
);
$message->ack();
};
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
echo " [x] RPC 服务端已启动,等待请求...\n";
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}带超时的 RPC 客户端
php
<?php
class RpcClientWithTimeout
{
private $connection;
private $channel;
private $callbackQueue;
private $responses = [];
private $defaultTimeout = 5;
public function __construct($timeout = 5)
{
$this->defaultTimeout = $timeout;
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
list($this->callbackQueue, ,) = $this->channel->queue_declare(
'',
false,
false,
true,
true
);
$this->channel->basic_consume(
$this->callbackQueue,
'',
false,
true,
false,
false,
[$this, 'onResponse']
);
}
public function onResponse(AMQPMessage $message)
{
$corrId = $message->get('correlation_id');
$this->responses[$corrId] = [
'body' => $message->body,
'timestamp' => microtime(true)
];
}
public function call($queueName, $payload, $timeout = null)
{
$timeout = $timeout ?? $this->defaultTimeout;
$corrId = $this->generateCorrelationId();
$msg = new AMQPMessage(
json_encode($payload),
[
'correlation_id' => $corrId,
'reply_to' => $this->callbackQueue,
'timestamp' => time()
]
);
$this->channel->basic_publish($msg, '', $queueName);
$startTime = microtime(true);
while (!isset($this->responses[$corrId])) {
$elapsed = microtime(true) - $startTime;
if ($elapsed > $timeout) {
throw new RuntimeException("RPC 调用超时 ({$timeout}秒)");
}
$remainingTime = $timeout - $elapsed;
try {
$this->channel->wait(null, false, $remainingTime);
} catch (PhpAmqpLib\Exception\AMQPTimeoutException $e) {
throw new RuntimeException("等待响应超时");
}
}
$response = $this->responses[$corrId];
unset($this->responses[$corrId]);
return json_decode($response['body'], true);
}
private function generateCorrelationId()
{
return sprintf(
'%s-%s-%s',
getmypid(),
microtime(true),
bin2hex(random_bytes(8))
);
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}完整示例:用户服务 RPC
php
<?php
class UserServiceRpcServer
{
private $server;
private $users = [
1 => ['id' => 1, 'name' => '张三', 'email' => 'zhangsan@example.com'],
2 => ['id' => 2, 'name' => '李四', 'email' => 'lisi@example.com'],
3 => ['id' => 3, 'name' => '王五', 'email' => 'wangwu@example.com'],
];
public function __construct()
{
$this->server = new RpcServer('user_service');
$this->registerHandlers();
}
private function registerHandlers()
{
$this->server->registerHandler('get', [$this, 'getUser']);
$this->server->registerHandler('create', [$this, 'createUser']);
$this->server->registerHandler('update', [$this, 'updateUser']);
$this->server->registerHandler('delete', [$this, 'deleteUser']);
$this->server->registerHandler('list', [$this, 'listUsers']);
}
public function getUser($payload)
{
$userId = $payload['user_id'] ?? null;
if (!$userId) {
throw new InvalidArgumentException('缺少 user_id 参数');
}
if (!isset($this->users[$userId])) {
throw new RuntimeException("用户不存在: {$userId}");
}
return $this->users[$userId];
}
public function createUser($payload)
{
$name = $payload['name'] ?? null;
$email = $payload['email'] ?? null;
if (!$name || !$email) {
throw new InvalidArgumentException('缺少必要参数');
}
$userId = max(array_keys($this->users)) + 1;
$this->users[$userId] = [
'id' => $userId,
'name' => $name,
'email' => $email
];
return $this->users[$userId];
}
public function updateUser($payload)
{
$userId = $payload['user_id'] ?? null;
if (!$userId || !isset($this->users[$userId])) {
throw new RuntimeException("用户不存在: {$userId}");
}
if (isset($payload['name'])) {
$this->users[$userId]['name'] = $payload['name'];
}
if (isset($payload['email'])) {
$this->users[$userId]['email'] = $payload['email'];
}
return $this->users[$userId];
}
public function deleteUser($payload)
{
$userId = $payload['user_id'] ?? null;
if (!$userId || !isset($this->users[$userId])) {
throw new RuntimeException("用户不存在: {$userId}");
}
unset($this->users[$userId]);
return ['deleted' => true];
}
public function listUsers($payload)
{
$page = $payload['page'] ?? 1;
$pageSize = $payload['page_size'] ?? 10;
$users = array_slice(
array_values($this->users),
($page - 1) * $pageSize,
$pageSize
);
return [
'users' => $users,
'total' => count($this->users),
'page' => $page,
'page_size' => $pageSize
];
}
public function start()
{
$this->server->start();
}
}
class UserServiceRpcClient
{
private $client;
public function __construct()
{
$this->client = new RpcClientWithTimeout(10);
}
public function getUser($userId)
{
$response = $this->client->call('user_service', [
'action' => 'get',
'user_id' => $userId
]);
if (!$response['success']) {
throw new RuntimeException($response['error']['message']);
}
return $response['data'];
}
public function createUser($name, $email)
{
$response = $this->client->call('user_service', [
'action' => 'create',
'name' => $name,
'email' => $email
]);
if (!$response['success']) {
throw new RuntimeException($response['error']['message']);
}
return $response['data'];
}
public function updateUser($userId, $data)
{
$response = $this->client->call('user_service', array_merge([
'action' => 'update',
'user_id' => $userId
], $data));
if (!$response['success']) {
throw new RuntimeException($response['error']['message']);
}
return $response['data'];
}
public function deleteUser($userId)
{
$response = $this->client->call('user_service', [
'action' => 'delete',
'user_id' => $userId
]);
if (!$response['success']) {
throw new RuntimeException($response['error']['message']);
}
return $response['data'];
}
public function listUsers($page = 1, $pageSize = 10)
{
$response = $this->client->call('user_service', [
'action' => 'list',
'page' => $page,
'page_size' => $pageSize
]);
if (!$response['success']) {
throw new RuntimeException($response['error']['message']);
}
return $response['data'];
}
public function close()
{
$this->client->close();
}
}实际应用场景
1. 微服务间同步调用
php
<?php
class OrderServiceRpcClient
{
private $userClient;
private $productClient;
private $inventoryClient;
public function __construct()
{
$this->userClient = new RpcClientWithTimeout(5);
$this->productClient = new RpcClientWithTimeout(5);
$this->inventoryClient = new RpcClientWithTimeout(5);
}
public function createOrder($orderData)
{
$userResponse = $this->userClient->call('user_service', [
'action' => 'get',
'user_id' => $orderData['user_id']
]);
if (!$userResponse['success']) {
throw new RuntimeException('用户不存在');
}
$productResponse = $this->productClient->call('product_service', [
'action' => 'get',
'product_id' => $orderData['product_id']
]);
if (!$productResponse['success']) {
throw new RuntimeException('商品不存在');
}
$inventoryResponse = $this->inventoryClient->call('inventory_service', [
'action' => 'check',
'product_id' => $orderData['product_id'],
'quantity' => $orderData['quantity']
]);
if (!$inventoryResponse['success'] || !$inventoryResponse['data']['available']) {
throw new RuntimeException('库存不足');
}
return $this->saveOrder($orderData);
}
private function saveOrder($orderData)
{
return ['order_id' => uniqid(), 'status' => 'created'];
}
}2. 分布式事务协调
php
<?php
class DistributedTransactionCoordinator
{
private $participants = [];
public function addParticipant($name, $client, $queueName)
{
$this->participants[$name] = [
'client' => $client,
'queue' => $queueName
];
}
public function execute($transactionData)
{
$prepared = [];
try {
foreach ($this->participants as $name => $config) {
$response = $config['client']->call($config['queue'], [
'action' => 'prepare',
'transaction_id' => $transactionData['transaction_id'],
'data' => $transactionData
]);
if (!$response['success']) {
throw new RuntimeException("参与者 {$name} 准备失败");
}
$prepared[] = $name;
}
foreach ($this->participants as $name => $config) {
$config['client']->call($config['queue'], [
'action' => 'commit',
'transaction_id' => $transactionData['transaction_id']
]);
}
return ['success' => true];
} catch (Exception $e) {
foreach ($prepared as $name) {
$config = $this->participants[$name];
try {
$config['client']->call($config['queue'], [
'action' => 'rollback',
'transaction_id' => $transactionData['transaction_id']
]);
} catch (Exception $rollbackError) {
error_log("回滚失败: {$name} - " . $rollbackError->getMessage());
}
}
return ['success' => false, 'error' => $e->getMessage()];
}
}
}3. 实时数据聚合
php
<?php
class DataAggregationRpcClient
{
private $clients = [];
public function addSource($name, $client, $queueName)
{
$this->clients[$name] = [
'client' => $client,
'queue' => $queueName
];
}
public function aggregate($query)
{
$results = [];
$errors = [];
foreach ($this->clients as $name => $config) {
try {
$response = $config['client']->call($config['queue'], [
'action' => 'query',
'query' => $query
]);
if ($response['success']) {
$results[$name] = $response['data'];
} else {
$errors[$name] = $response['error'];
}
} catch (Exception $e) {
$errors[$name] = $e->getMessage();
}
}
return [
'results' => $results,
'errors' => $errors,
'summary' => $this->summarize($results)
];
}
private function summarize($results)
{
$summary = [];
foreach ($results as $source => $data) {
if (is_array($data) && isset($data['count'])) {
$summary[$source] = $data['count'];
}
}
return $summary;
}
}常见问题与解决方案
问题 1:请求响应不匹配
解决方案:
php
<?php
class CorrelationIdManager
{
private static $counter = 0;
public static function generate()
{
self::$counter++;
return sprintf(
'%s-%d-%d-%s',
getmypid(),
time(),
self::$counter,
bin2hex(random_bytes(4))
);
}
public static function validate($correlationId)
{
if (empty($correlationId)) {
throw new InvalidArgumentException("correlation_id 不能为空");
}
return true;
}
}
class ThreadSafeRpcClient
{
private $responses = [];
private $lock;
public function __construct()
{
$this->lock = new \Swoole\Lock(SWOOLE_MUTEX);
}
public function onResponse(AMQPMessage $message)
{
$this->lock->lock();
try {
$this->responses[$message->get('correlation_id')] = $message->body;
} finally {
$this->lock->unlock();
}
}
public function getResponse($corrId)
{
$this->lock->lock();
try {
return $this->responses[$corrId] ?? null;
} finally {
$this->lock->unlock();
}
}
}问题 2:服务端处理缓慢
解决方案:
php
<?php
class AsyncRpcServer
{
public function processRequest(AMQPMessage $message)
{
$payload = json_decode($message->body, true);
if ($this->isLongRunningTask($payload)) {
$taskId = $this->submitToWorker($payload);
$this->sendPendingResponse($message, $taskId);
} else {
$result = $this->processSync($payload);
$this->sendResponse($message, $result);
}
}
private function isLongRunningTask($payload)
{
return isset($payload['estimated_time']) && $payload['estimated_time'] > 5;
}
private function submitToWorker($payload)
{
return uniqid('task_');
}
private function sendPendingResponse($message, $taskId)
{
$response = new AMQPMessage(
json_encode([
'success' => true,
'status' => 'pending',
'task_id' => $taskId
]),
['correlation_id' => $message->get('correlation_id')]
);
$message->delivery_info['channel']->basic_publish(
$response,
'',
$message->get('reply_to')
);
}
}问题 3:服务不可用降级
解决方案:
php
<?php
class ResilientRpcClient
{
private $client;
private $circuitBreaker;
private $fallbackHandler;
public function call($queueName, $payload)
{
if (!$this->circuitBreaker->isAvailable($queueName)) {
return $this->handleFallback($queueName, $payload);
}
try {
$response = $this->client->call($queueName, $payload);
$this->circuitBreaker->recordSuccess($queueName);
return $response;
} catch (RuntimeException $e) {
$this->circuitBreaker->recordFailure($queueName);
return $this->handleFallback($queueName, $payload);
}
}
private function handleFallback($queueName, $payload)
{
if ($this->fallbackHandler) {
return call_user_func($this->fallbackHandler, $queueName, $payload);
}
return [
'success' => false,
'error' => '服务暂时不可用',
'fallback' => true
];
}
}
class CircuitBreaker
{
private $states = [];
private $failureThreshold = 5;
private $recoveryTimeout = 60;
public function isAvailable($service)
{
if (!isset($this->states[$service])) {
return true;
}
$state = $this->states[$service];
if ($state['status'] === 'open') {
if (time() - $state['last_failure'] > $this->recoveryTimeout) {
$this->states[$service]['status'] = 'half-open';
return true;
}
return false;
}
return true;
}
public function recordSuccess($service)
{
$this->states[$service] = [
'status' => 'closed',
'failures' => 0
];
}
public function recordFailure($service)
{
if (!isset($this->states[$service])) {
$this->states[$service] = ['failures' => 0];
}
$this->states[$service]['failures']++;
$this->states[$service]['last_failure'] = time();
if ($this->states[$service]['failures'] >= $this->failureThreshold) {
$this->states[$service]['status'] = 'open';
}
}
}最佳实践建议
1. 统一响应格式
php
<?php
class RpcResponse
{
public static function success($data = null, $message = 'success')
{
return [
'success' => true,
'message' => $message,
'data' => $data,
'timestamp' => time()
];
}
public static function error($message, $code = 0, $details = null)
{
return [
'success' => false,
'error' => [
'message' => $message,
'code' => $code,
'details' => $details
],
'timestamp' => time()
];
}
}2. 请求日志追踪
php
<?php
class TracedRpcClient
{
private $client;
private $logger;
public function call($queueName, $payload)
{
$traceId = $this->generateTraceId();
$startTime = microtime(true);
$this->logger->info('RPC 请求开始', [
'trace_id' => $traceId,
'queue' => $queueName,
'payload' => $payload
]);
try {
$response = $this->client->call($queueName, $payload);
$this->logger->info('RPC 请求成功', [
'trace_id' => $traceId,
'duration_ms' => round((microtime(true) - $startTime) * 1000, 2)
]);
return $response;
} catch (Exception $e) {
$this->logger->error('RPC 请求失败', [
'trace_id' => $traceId,
'duration_ms' => round((microtime(true) - $startTime) * 1000, 2),
'error' => $e->getMessage()
]);
throw $e;
}
}
private function generateTraceId()
{
return bin2hex(random_bytes(16));
}
}3. 服务注册与发现
php
<?php
class RpcServiceRegistry
{
private $services = [];
public function register($serviceName, $queueName, $metadata = [])
{
$this->services[$serviceName] = [
'queue' => $queueName,
'metadata' => $metadata,
'registered_at' => time()
];
}
public function getQueueName($serviceName)
{
if (!isset($this->services[$serviceName])) {
throw new RuntimeException("服务未注册: {$serviceName}");
}
return $this->services[$serviceName]['queue'];
}
public function listServices()
{
return array_keys($this->services);
}
}
class ServiceAwareRpcClient
{
private $registry;
private $clients = [];
public function call($serviceName, $payload)
{
$queueName = $this->registry->getQueueName($serviceName);
if (!isset($this->clients[$serviceName])) {
$this->clients[$serviceName] = new RpcClientWithTimeout();
}
return $this->clients[$serviceName]->call($queueName, $payload);
}
}