Skip to content

临时回复队列

概述

临时回复队列(Temporary Reply Queue)是 RabbitMQ 中实现双向通信的重要机制。它允许消费者将处理结果返回给生产者,常用于 RPC(远程过程调用)模式和需要确认响应的场景。

核心知识点

什么是临时回复队列

临时回复队列是一种具有以下特点的队列:

  1. 自动删除(auto_delete):当最后一个消费者取消订阅或断开连接后,队列会自动删除
  2. 独占访问(exclusive):只有创建该队列的连接才能访问,该连接关闭时队列也会被删除
  3. 随机命名:由 RabbitMQ 自动生成唯一的队列名称
  4. 生命周期短:通常用于单次请求-响应场景

工作原理

mermaid
sequenceDiagram
    participant P as 生产者
    participant R as RabbitMQ
    participant C as 消费者
    participant Q as 临时回复队列

    P->>R: 声明临时回复队列
    R-->>P: 返回队列名称(如: amq.gen-xxx)
    P->>R: 发送消息(设置 reply_to 属性)
    R->>C: 投递消息到工作队列
    C->>R: 处理完成后发送响应到 reply_to
    R->>Q: 投递响应消息
    P->>Q: 从临时队列接收响应
    P->>R: 关闭连接
    R->>Q: 自动删除临时队列

关键属性说明

| 属性 | 说明 |
| --- | --- |
| exclusive | 设置为 true,表示独占队列 |
| auto_delete | 设置为 true,表示自动删除 |
| durable | 设置为 false,临时队列不需要持久化 |

PHP 代码示例

基础示例:创建临时回复队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Minimal demo: connects to a local broker and declares a server-named,
 * exclusive, auto-delete queue.
 */
class TemporaryReplyQueue
{
    private $connection;
    private $channel;

    /**
     * Connects to localhost:5672 as guest/guest and opens a channel.
     */
    public function __construct()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

        $this->connection = $connection;
        $this->channel = $connection->channel();
    }

    /**
     * Declares the temporary queue and prints its name.
     *
     * @return string the queue name reported by queue_declare()
     */
    public function createTemporaryQueue()
    {
        $declareResult = $this->channel->queue_declare(
            '',    // queue name: empty, so the broker generates one
            false, // passive
            false, // durable
            true,  // exclusive
            true   // auto_delete
        );

        // Only the first element (the queue name) is used.
        list($queueName) = $declareResult;

        echo "临时队列已创建: {$queueName}\n";

        return $queueName;
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Declare the temporary queue, then always release the channel and
// connection, even when the declaration throws.
$tempQueue = new TemporaryReplyQueue();

try {
    $queueName = $tempQueue->createTemporaryQueue();
} finally {
    $tempQueue->close();
}

完整示例:请求-响应模式

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Blocking RPC client: publishes a request to 'rpc_queue' and waits on an
 * exclusive, server-named callback queue for the matching reply.
 */
class RpcClient
{
    private $connection;
    private $channel;
    private $callbackQueue;
    private $response;
    private $corrId;

    /**
     * Connects to the local broker, declares the callback queue and starts
     * consuming from it (no_ack = true) with onResponse() as the handler.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        list($this->callbackQueue, ,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );

        $this->channel->basic_consume(
            $this->callbackQueue,
            '',
            false,
            true,
            false,
            false,
            [$this, 'onResponse']
        );
    }

    /**
     * Consumer callback: keeps the body only when the reply belongs to the
     * request currently in flight.
     */
    public function onResponse(AMQPMessage $response)
    {
        // Strict comparison: both sides are strings, no type juggling wanted.
        if ($response->get('correlation_id') === $this->corrId) {
            $this->response = $response->body;
        }
    }

    /**
     * Sends $n to 'rpc_queue' and blocks until the matching reply arrives.
     *
     * @param mixed $n value to send; cast to string for the message body
     * @return string the reply body
     */
    public function call($n)
    {
        $this->response = null;
        // Random id instead of uniqid(), which is time-based and can collide
        // across concurrently running processes.
        $this->corrId = bin2hex(random_bytes(16));

        $msg = new AMQPMessage(
            (string)$n,
            [
                'correlation_id' => $this->corrId,
                'reply_to' => $this->callbackQueue
            ]
        );

        $this->channel->basic_publish($msg, '', 'rpc_queue');

        // Compare against null rather than truthiness: a legitimate reply of
        // "0" (e.g. fib(0)) or "" is falsy and would keep the loop waiting.
        while ($this->response === null) {
            $this->channel->wait();
        }

        return $this->response;
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// One-shot usage: request fib(30), print the reply, and close the client
// whether or not the call succeeded.
$rpcClient = new RpcClient();

try {
    $response = $rpcClient->call(30);

    echo " [.] 收到响应: {$response}\n";
} catch (Exception $failure) {
    $reason = $failure->getMessage();

    echo "请求失败: " . $reason . "\n";
} finally {
    $rpcClient->close();
}

RPC 服务端示例

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * RPC server: consumes integer requests from 'rpc_queue', computes the
 * Fibonacci number and publishes it to the queue named in reply_to.
 */
class RpcServer
{
    private $connection;
    private $channel;

    /**
     * Connects to the local broker and declares 'rpc_queue'.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->queue_declare('rpc_queue', false, false, false, false);
    }

    /**
     * Returns the n-th Fibonacci number (fib(0) = 0, fib(1) = 1).
     *
     * Computed iteratively: the naive double recursion is exponential and
     * never terminates for negative input.
     *
     * @param int $n non-negative index
     * @return int|float the value; PHP promotes to float once it exceeds PHP_INT_MAX
     * @throws InvalidArgumentException when $n is negative
     */
    public function fib($n)
    {
        if ($n < 0) {
            throw new InvalidArgumentException("fib() 的参数不能为负数: {$n}");
        }

        $previous = 0;
        $current = 1;

        for ($i = 0; $i < $n; $i++) {
            $next = $previous + $current;
            $previous = $current;
            $current = $next;
        }

        return $previous;
    }

    /**
     * Registers the request handler and blocks while the channel is open.
     */
    public function start()
    {
        $callback = function (AMQPMessage $req) {
            $n = intval($req->body);
            echo " [.] 收到请求: fib({$n})\n";

            // A request that cannot be answered would otherwise throw before
            // the ack and be redelivered; reject it without requeue instead.
            if ($n < 0 || !$req->has('reply_to') || !$req->has('correlation_id')) {
                echo " [!] 丢弃无效请求: fib({$n})\n";
                $req->reject(false);
                return;
            }

            $result = $this->fib($n);

            $msg = new AMQPMessage(
                (string)$result,
                ['correlation_id' => $req->get('correlation_id')]
            );

            $req->delivery_info['channel']->basic_publish(
                $msg,
                '',
                $req->get('reply_to')
            );

            // Ack only after the reply has been published.
            $req->ack();
        };

        // Prefetch of one message per consumer.
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

        echo " [x] RPC 服务端启动,等待请求...\n";

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Run the server until it fails or its channel closes, then release the
// channel and connection in every case.
$server = new RpcServer();

try {
    $server->start();
} catch (Exception $failure) {
    $reason = $failure->getMessage();

    echo "服务异常: " . $reason . "\n";
} finally {
    $server->close();
}

带超时的请求示例

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * JSON RPC client with a per-call deadline. Replies arrive on an exclusive,
 * server-named callback queue and are matched by correlation_id.
 */
class RpcClientWithTimeout
{
    private $connection;
    private $channel;
    private $callbackQueue;
    private $responses = [];
    // Correlation ids of requests still awaiting a reply (id => true).
    private $pending = [];
    private $timeout = 5;

    /**
     * @param int|float $timeout default timeout in seconds for call()
     */
    public function __construct($timeout = 5)
    {
        $this->timeout = $timeout;

        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        list($this->callbackQueue, ,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );

        $this->channel->basic_consume(
            $this->callbackQueue,
            '',
            false,
            true,
            false,
            false,
            [$this, 'onResponse']
        );
    }

    /**
     * Consumer callback: stores the body of replies that belong to a request
     * still in flight. Anything else (late replies for timed-out requests,
     * messages without a correlation_id) is dropped so it cannot accumulate.
     */
    public function onResponse(AMQPMessage $response)
    {
        if (!$response->has('correlation_id')) {
            return;
        }

        $corrId = $response->get('correlation_id');

        if (isset($this->pending[$corrId])) {
            $this->responses[$corrId] = $response->body;
        }
    }

    /**
     * Publishes $message as JSON to $routingKey and waits for the reply.
     *
     * @param string         $routingKey target queue name
     * @param mixed          $message    payload, JSON-encoded before sending
     * @param int|float|null $timeout    seconds to wait; null uses the constructor default
     * @return mixed the JSON-decoded reply (objects decoded as arrays)
     * @throws RuntimeException when no reply arrives before the deadline
     */
    public function call($routingKey, $message, $timeout = null)
    {
        $timeout = $timeout ?? $this->timeout;
        // Random id instead of uniqid(), which is time-based and can collide
        // across concurrently running processes.
        $corrId = bin2hex(random_bytes(16));

        $msg = new AMQPMessage(
            json_encode($message),
            [
                'correlation_id' => $corrId,
                'reply_to' => $this->callbackQueue,
                'content_type' => 'application/json'
            ]
        );

        $this->pending[$corrId] = true;
        // Sub-second deadline; time() only has one-second resolution.
        $deadline = microtime(true) + $timeout;

        try {
            $this->channel->basic_publish($msg, '', $routingKey);

            while (!isset($this->responses[$corrId])) {
                $remainingTime = $deadline - microtime(true);

                // Never hand 0 to wait(): it treats a timeout of 0 as
                // "block indefinitely".
                if ($remainingTime <= 0) {
                    throw new RuntimeException("请求超时 ({$timeout}秒)");
                }

                try {
                    $this->channel->wait(null, false, $remainingTime);
                } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                    // Report the library's own timeout with the same message.
                    throw new RuntimeException("请求超时 ({$timeout}秒)", 0, $e);
                }
            }

            $response = $this->responses[$corrId];
        } finally {
            // Forget the request on every exit path.
            unset($this->pending[$corrId], $this->responses[$corrId]);
        }

        return json_decode($response, true);
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Query 'user_service' with a 10-second default timeout and print the
// decoded reply; the client is closed on every path.
$client = new RpcClientWithTimeout(10);

try {
    $response = $client->call('user_service', ['action' => 'get', 'user_id' => 123]);
    $encoded = json_encode($response, JSON_UNESCAPED_UNICODE);

    echo "响应: " . $encoded . "\n";
} catch (RuntimeException $failure) {
    $reason = $failure->getMessage();

    echo "错误: " . $reason . "\n";
} finally {
    $client->close();
}

实际应用场景

1. 微服务间同步调用

php
<?php

/**
 * Facade over RpcClientWithTimeout for the user.* routing keys.
 */
class UserServiceClient
{
    private $rpcClient;

    public function __construct()
    {
        $this->rpcClient = new RpcClientWithTimeout();
    }

    /**
     * Sends ['user_id' => $userId] to 'user.get' and returns the reply.
     */
    public function getUser($userId)
    {
        return $this->rpcClient->call('user.get', ['user_id' => $userId]);
    }

    /**
     * Sends $userData unchanged to 'user.create' and returns the reply.
     */
    public function createUser($userData)
    {
        return $this->rpcClient->call('user.create', $userData);
    }

    /**
     * Sends $userData plus the target id to 'user.update' and returns the reply.
     *
     * @param mixed $userId   id of the user to update
     * @param array $userData fields to change
     */
    public function updateUser($userId, $userData)
    {
        // Merge the explicit id last so a 'user_id' key inside $userData
        // cannot redirect the update to a different user.
        $payload = array_merge($userData, ['user_id' => $userId]);

        return $this->rpcClient->call('user.update', $payload);
    }
}

2. 分布式任务状态查询

php
<?php

/**
 * Queries the 'task.status' routing key through an RPC client constructed
 * with the argument 30.
 */
class TaskStatusChecker
{
    private $rpcClient;

    public function __construct()
    {
        $this->rpcClient = new RpcClientWithTimeout(30);
    }

    /**
     * Asks for the state of a task and normalises the reply.
     *
     * @param mixed $taskId identifier sent as 'task_id'
     * @return array keys: task_id, status (default 'unknown'),
     *               progress (default 0), result (default null)
     */
    public function checkProgress($taskId)
    {
        $reply = $this->rpcClient->call('task.status', ['task_id' => $taskId]);

        // Fill in a default for every field the reply leaves out.
        $status = $reply['status'] ?? 'unknown';
        $progress = $reply['progress'] ?? 0;
        $result = $reply['result'] ?? null;

        return [
            'task_id' => $taskId,
            'status' => $status,
            'progress' => $progress,
            'result' => $result
        ];
    }
}

3. 实时数据验证

php
<?php

/**
 * Validates values via RPC; any exception from the call counts as "invalid".
 */
class DataValidator
{
    private $rpcClient;

    public function __construct()
    {
        $this->rpcClient = new RpcClientWithTimeout(5);
    }

    /**
     * @return mixed the reply's 'valid' field, or false when it is absent
     *               or the call throws
     */
    public function validateEmail($email)
    {
        return $this->askValidator('validator.email', ['email' => $email]);
    }

    /**
     * @return mixed the reply's 'valid' field, or false when it is absent
     *               or the call throws
     */
    public function validatePhone($phone)
    {
        return $this->askValidator('validator.phone', ['phone' => $phone]);
    }

    /**
     * Shared request path for both validators.
     */
    private function askValidator($routingKey, $payload)
    {
        try {
            $reply = $this->rpcClient->call($routingKey, $payload);
        } catch (Exception $e) {
            // Deliberate best-effort: a failed call is reported as "not valid".
            return false;
        }

        return $reply['valid'] ?? false;
    }
}

常见问题与解决方案

问题 1:临时队列未正确清理

原因:连接异常断开时,RabbitMQ 要等到检测出连接已关闭(例如心跳超时)后才会删除独占队列,在此之前临时队列会暂时残留

解决方案

php
<?php

/**
 * Owns a connection, a channel and one server-named exclusive auto-delete
 * queue, and closes both handles when the object is destroyed.
 */
class SafeTemporaryQueue
{
    private $connection;
    private $channel;
    private $queueName;

    /**
     * Connects to the local broker and declares the temporary queue.
     */
    public function __construct()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

        $this->connection = $connection;
        $this->channel = $connection->channel();

        $declareResult = $this->channel->queue_declare(
            '',    // queue name: empty, so the broker generates one
            false, // passive
            false, // durable
            true,  // exclusive
            true   // auto_delete
        );

        // Only the first element (the queue name) is kept.
        list($this->queueName) = $declareResult;
    }

    /**
     * Best-effort cleanup: closes the channel, then the connection, each only
     * if it is still open. Failures are written to the error log and not
     * rethrown.
     */
    public function __destruct()
    {
        try {
            $channelOpen = $this->channel && $this->channel->is_open();
            if ($channelOpen) {
                $this->channel->close();
            }

            $connectionOpen = $this->connection && $this->connection->isConnected();
            if ($connectionOpen) {
                $this->connection->close();
            }
        } catch (Exception $failure) {
            error_log("清理临时队列失败: " . $failure->getMessage());
        }
    }

    /**
     * @return string name of the queue declared in the constructor
     */
    public function getQueueName()
    {
        return $this->queueName;
    }
}

问题 2:响应丢失或超时

原因:网络延迟或服务端处理时间过长

解决方案

php
<?php

/**
 * Retry wrapper: repeats a call with a growing timeout and a growing pause
 * between attempts.
 */
class ReliableRpcClient
{
    private $connection;
    private $channel;
    private $callbackQueue;
    private $maxRetries = 3;
    private $baseTimeout = 5;

    /**
     * Tries call() up to $maxRetries times.
     *
     * Attempt k uses a timeout of $baseTimeout * k and, when it fails and
     * another attempt remains, is followed by a pause of 0.5 s * k.
     *
     * @return mixed whatever call() returns on the first successful attempt
     * @throws RuntimeException the last failure once every attempt has failed
     */
    public function callWithRetry($routingKey, $message)
    {
        $failure = null;
        $attempt = 0;

        while ($attempt < $this->maxRetries) {
            $attempt++;

            try {
                $timeout = $this->baseTimeout * $attempt;

                return $this->call($routingKey, $message, $timeout);
            } catch (RuntimeException $e) {
                $failure = $e;
                error_log("第 {$attempt} 次尝试失败: " . $e->getMessage());

                $hasAttemptsLeft = $attempt < $this->maxRetries;
                if ($hasAttemptsLeft) {
                    usleep(500000 * $attempt);
                }
            }
        }

        throw $failure;
    }

    /**
     * Placeholder for the actual request logic.
     */
    private function call($routingKey, $message, $timeout)
    {
        // ... implement the call logic
    }
}

问题 3:并发请求时 correlation_id 冲突

原因:uniqid() 基于微秒级时间戳生成,在多进程或高并发环境下可能生成相同的 ID

解决方案

php
<?php

/**
 * Builds correlation ids of the form "<pid>-<unix time>-<counter>-<8 hex chars>".
 */
class CorrelationIdGenerator
{
    // Number of ids handed out so far; part of every generated id.
    private static $counter = 0;

    /**
     * @return string a new id; the counter part increases by one per call
     */
    public static function generate()
    {
        $sequence = ++self::$counter;
        $processId = getmypid();
        $timestamp = time();
        $randomSuffix = bin2hex(random_bytes(4));

        return sprintf('%s-%d-%d-%s', $processId, $timestamp, $sequence, $randomSuffix);
    }
}

/**
 * Sketch of a client that takes its correlation ids from
 * CorrelationIdGenerator.
 */
class ThreadSafeRpcClient
{
    /**
     * Generates a correlation id for the request; the rest of the call is
     * left as a placeholder.
     */
    public function call($routingKey, $message)
    {
        $correlationId = CorrelationIdGenerator::generate();

        // ... use the generated correlation_id
    }
}

最佳实践建议

1. 合理设置超时时间

php
<?php

/**
 * Named timeout presets, passed as the third argument of call() by
 * ConfigurableRpcClient. Presumably seconds -- confirm against the client
 * in use.
 */
class TimeoutConfig
{
    // Shortest preset.
    const FAST_OPERATION = 2;

    // Middle preset.
    const NORMAL_OPERATION = 5;

    // Preset for slow operations.
    const SLOW_OPERATION = 30;

    // Longest preset.
    const BATCH_OPERATION = 60;
}

/**
 * RPC client with one method per timeout preset.
 *
 * Extends RpcClientWithTimeout so that $this->call($routingKey, $message,
 * $timeout) resolves; without a parent the class has no call() method and
 * every preset method fails with "undefined method".
 */
class ConfigurableRpcClient extends RpcClientWithTimeout
{
    /**
     * Calls with TimeoutConfig::FAST_OPERATION as the timeout.
     */
    public function fastCall($routingKey, $message)
    {
        return $this->call($routingKey, $message, TimeoutConfig::FAST_OPERATION);
    }

    /**
     * Calls with TimeoutConfig::NORMAL_OPERATION as the timeout.
     */
    public function normalCall($routingKey, $message)
    {
        return $this->call($routingKey, $message, TimeoutConfig::NORMAL_OPERATION);
    }

    /**
     * Calls with TimeoutConfig::SLOW_OPERATION as the timeout.
     */
    public function slowCall($routingKey, $message)
    {
        return $this->call($routingKey, $message, TimeoutConfig::SLOW_OPERATION);
    }
}

2. 使用连接池管理

php
<?php

/**
 * Process-wide cache of broker connections: at most one per host and at most
 * $maxConnections hosts.
 */
class RpcConnectionPool
{
    private static $instances = [];
    private static $maxConnections = 10;

    /**
     * Returns the cached connection for $host, creating or replacing it when
     * needed.
     *
     * @param string $host broker host name
     * @return AMQPStreamConnection a connection that reported itself connected, or a fresh one
     * @throws RuntimeException when a new host would exceed the pool size
     */
    public static function getConnection($host = 'localhost')
    {
        $key = md5($host);
        $cached = self::$instances[$key] ?? null;

        if ($cached !== null && $cached->isConnected()) {
            return $cached;
        }

        // A stale entry is replaced in place and does not count against the
        // limit; only a host that is new to the pool can fill it up.
        if ($cached === null && count(self::$instances) >= self::$maxConnections) {
            throw new RuntimeException("连接池已满");
        }

        self::$instances[$key] = new AMQPStreamConnection(
            $host,
            5672,
            'guest',
            'guest'
        );

        return self::$instances[$key];
    }

    /**
     * Closes every cached connection and empties the pool. A failure to
     * close one connection is logged and does not stop the others.
     */
    public static function closeAll()
    {
        foreach (self::$instances as $connection) {
            try {
                $connection->close();
            } catch (Exception $e) {
                error_log("关闭连接失败: " . $e->getMessage());
            }
        }
        self::$instances = [];
    }
}

3. 实现优雅的错误处理

php
<?php

/**
 * Wraps an RPC client and turns both transport failures and error replies
 * into RpcException.
 */
class RobustRpcClient
{
    private $rpcClient;

    /**
     * @param object|null $rpcClient any object with call($routingKey, $message);
     *                               defaults to a new RpcClientWithTimeout
     */
    public function __construct($rpcClient = null)
    {
        // The client was previously never assigned, so call() always failed
        // on null.
        $this->rpcClient = $rpcClient ?? new RpcClientWithTimeout();
    }

    /**
     * Performs the call and unwraps the reply.
     *
     * @return mixed the reply's 'data' field when present, otherwise the whole reply
     * @throws RpcException when the reply carries an 'error' field or the
     *                      underlying call throws a RuntimeException
     */
    public function call($routingKey, $message)
    {
        try {
            $response = $this->rpcClient->call($routingKey, $message);

            if (isset($response['error'])) {
                $error = $response['error'];

                // Accept both {"error": {"message": ..., "code": ...}} and a
                // bare {"error": "text"} payload.
                if (is_array($error)) {
                    $errorMessage = (string)($error['message'] ?? '未知的 RPC 错误');
                    $errorCode = (int)($error['code'] ?? 0);
                } else {
                    $errorMessage = (string)$error;
                    $errorCode = 0;
                }

                throw new RpcException($errorMessage, $errorCode);
            }

            return $response['data'] ?? $response;
        } catch (RuntimeException $e) {
            // RpcException extends Exception, not RuntimeException, so the
            // exception thrown above is not re-wrapped here.
            throw new RpcException("RPC 调用失败: " . $e->getMessage(), 0, $e);
        }
    }
}

/**
 * Exception type thrown by RobustRpcClient for error replies and for
 * wrapped transport failures.
 */
class RpcException extends Exception
{
    /**
     * Forwards all three arguments unchanged to Exception.
     *
     * @param string         $message  error text
     * @param int            $code     error code
     * @param Throwable|null $previous underlying cause, if any
     */
    public function __construct($message, $code = 0, $previous = null)
    {
        parent::__construct(
            $message,
            $code,
            $previous
        );
    }
}

4. 监控和日志记录

php
<?php

/**
 * Decorator that logs the start, outcome and duration of every RPC call.
 */
class MonitoredRpcClient
{
    private $rpcClient;
    private $logger;

    /**
     * @param object|null $rpcClient any object with call($routingKey, $message);
     *                               defaults to a new RpcClientWithTimeout
     * @param object|null $logger    any object with info($message, $context) and
     *                               error($message, $context); null disables logging
     */
    public function __construct($rpcClient = null, $logger = null)
    {
        // Both collaborators were previously never assigned, so call()
        // always failed on null.
        $this->rpcClient = $rpcClient ?? new RpcClientWithTimeout();
        $this->logger = $logger;
    }

    /**
     * Delegates to the wrapped client and logs around the call.
     *
     * @return mixed the wrapped client's return value
     * @throws Exception whatever the wrapped client throws, after logging it
     */
    public function call($routingKey, $message)
    {
        $startTime = microtime(true);
        // Log-side id tying the entries of one call together. It is not
        // passed to the wrapped client, so it is not the id used on the wire.
        $corrId = uniqid();

        $this->log('info', "RPC 请求开始", [
            'correlation_id' => $corrId,
            'routing_key' => $routingKey,
            'message' => $message
        ]);

        try {
            $response = $this->rpcClient->call($routingKey, $message);

            $this->log('info', "RPC 请求成功", [
                'correlation_id' => $corrId,
                'duration_ms' => $this->elapsedMs($startTime)
            ]);

            return $response;
        } catch (Exception $e) {
            $this->log('error', "RPC 请求失败", [
                'correlation_id' => $corrId,
                'duration_ms' => $this->elapsedMs($startTime),
                'error' => $e->getMessage()
            ]);

            throw $e;
        }
    }

    /**
     * Milliseconds elapsed since $startTime, rounded to two decimals.
     */
    private function elapsedMs($startTime)
    {
        return round((microtime(true) - $startTime) * 1000, 2);
    }

    /**
     * Forwards one entry to the logger method named $level; no-op without a logger.
     */
    private function log($level, $message, $context)
    {
        if ($this->logger !== null) {
            $this->logger->{$level}($message, $context);
        }
    }
}

相关链接