Skip to content

RabbitMQ 连接与通道(Connection & Channel)

概述

连接(Connection)和通道(Channel)是 RabbitMQ 客户端与 Broker 通信的基础设施。Connection 是客户端与 RabbitMQ 之间的 TCP 连接,Channel 是建立在 Connection 之上的轻量级连接。理解它们的工作机制对于构建高性能、可靠的消息系统至关重要。

连接与通道的关系

mermaid
graph TB
    subgraph 应用程序
        A[Application]
    end
    
    subgraph 连接层
        C1[Channel 1]
        C2[Channel 2]
        C3[Channel 3]
        C4[Channel N]
    end
    
    subgraph 传输层
        CONN[TCP Connection]
    end
    
    subgraph RabbitMQ Broker
        R[Broker]
    end
    
    A --> C1
    A --> C2
    A --> C3
    A --> C4
    C1 --> CONN
    C2 --> CONN
    C3 --> CONN
    C4 --> CONN
    CONN -->|AMQP Protocol| R

核心知识点

1. Connection(连接)

Connection 是客户端与 RabbitMQ Broker 之间的 TCP 连接:

mermaid
graph LR
    subgraph Connection 生命周期
        A[创建连接] --> B[握手认证]
        B --> C[建立通道]
        C --> D[通信]
        D --> E[心跳检测]
        E --> F[关闭连接]
    end

Connection 特点

  • 基于 TCP 协议
  • 需要认证(用户名、密码、Vhost)
  • 支持心跳检测
  • 资源消耗较大(建议复用)

Connection 参数

参数说明默认值
hostRabbitMQ 主机地址localhost
port端口号5672
user用户名guest
password密码guest
vhost虚拟主机/
heartbeat心跳间隔(秒)60
connection_timeout连接超时(秒)3.0
read_write_timeout读写超时(秒)130.0

2. Channel(通道)

Channel 是建立在 Connection 之上的虚拟连接:

mermaid
graph TB
    subgraph 单 Connection 多 Channel
        CONN[Connection]
        CONN --> CH1[Channel 1 - 生产者]
        CONN --> CH2[Channel 2 - 消费者]
        CONN --> CH3[Channel 3 - 管理]
        CONN --> CH4[Channel 4 - 事务]
    end

Channel 特点

  • 轻量级,资源消耗小
  • 每个 Channel 有独立的 ID
  • 支持并发操作
  • 同一 Channel 上的操作是串行的

Channel 用途

  • 发布消息
  • 消费消息
  • 声明交换机和队列
  • 创建绑定
  • 事务操作

3. 连接生命周期

mermaid
sequenceDiagram
    participant C as Client
    participant B as Broker
    
    C->>B: 1. TCP 连接
    B->>C: 2. 连接确认
    C->>B: 3. 协议头
    B->>C: 4. 协议响应
    C->>B: 5. 认证请求
    B->>C: 6. 认证成功
    C->>B: 7. 打开 Vhost
    B->>C: 8. Vhost 打开确认
    C->>B: 9. 心跳检测
    B->>C: 10. 心跳响应
    C->>B: 11. 关闭连接
    B->>C: 12. 关闭确认

4. 心跳机制

mermaid
graph TB
    subgraph 心跳检测
        A[发送心跳帧] --> B{收到响应?}
        B -->|是| C[重置计数器]
        B -->|否| D[计数器+1]
        D --> E{超过阈值?}
        E -->|是| F[断开连接]
        E -->|否| A
        C --> A
    end

心跳机制的作用:

  • 检测连接是否存活
  • 防止防火墙断开空闲连接
  • 及时发现网络问题

5. 连接池化

mermaid
graph TB
    subgraph 连接池
        P[Pool]
        P --> C1[Connection 1]
        P --> C2[Connection 2]
        P --> C3[Connection 3]
        
        C1 --> CH1[Channels]
        C2 --> CH2[Channels]
        C3 --> CH3[Channels]
    end
    
    subgraph 应用
        A1[Thread 1]
        A2[Thread 2]
        A3[Thread 3]
    end
    
    A1 --> P
    A2 --> P
    A3 --> P

代码示例

基础连接创建

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Connection\AMQPSocketConnection;

class ConnectionFactory
{
    public static function createStreamConnection(array $config = []): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/',
            $config['insist'] ?? false,
            $config['login_method'] ?? 'AMQPLAIN',
            $config['login_response'] ?? null,
            $config['locale'] ?? 'en_US',
            $config['connection_timeout'] ?? 3.0,
            $config['read_write_timeout'] ?? 130.0,
            $config['context'] ?? null,
            $config['keepalive'] ?? true,
            $config['heartbeat'] ?? 60
        );
    }
    
    public static function createSocketConnection(array $config = []): AMQPSocketConnection
    {
        return new AMQPSocketConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/',
            $config['insist'] ?? false,
            $config['login_method'] ?? 'AMQPLAIN',
            $config['login_response'] ?? null,
            $config['locale'] ?? 'en_US',
            $config['read_timeout'] ?? 130,
            $config['write_timeout'] ?? 130,
            $config['heartbeat'] ?? 60
        );
    }
}

try {
    $connection = ConnectionFactory::createStreamConnection([
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'heartbeat' => 60
    ]);
    
    echo "Connection established successfully\n";
    echo "Server properties: " . json_encode($connection->getServerProperties()) . "\n";
    
    $connection->close();
} catch (Exception $e) {
    echo "Connection failed: " . $e->getMessage() . "\n";
}

Channel 操作

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class ChannelManager
{
    private $connection;
    private $channels = [];
    
    public function __construct(AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
    }
    
    public function createChannel(int $id = null)
    {
        $channel = $this->connection->channel($id);
        
        $channelId = $channel->getChannelId();
        $this->channels[$channelId] = $channel;
        
        echo "Channel {$channelId} created\n";
        
        return $channel;
    }
    
    public function getChannel(int $id)
    {
        return $this->channels[$id] ?? null;
    }
    
    public function closeChannel(int $id): void
    {
        if (isset($this->channels[$id])) {
            $this->channels[$id]->close();
            unset($this->channels[$id]);
            
            echo "Channel {$id} closed\n";
        }
    }
    
    public function closeAllChannels(): void
    {
        foreach ($this->channels as $id => $channel) {
            $channel->close();
            echo "Channel {$id} closed\n";
        }
        
        $this->channels = [];
    }
    
    public function getActiveChannelCount(): int
    {
        return count($this->channels);
    }
}

$connection = ConnectionFactory::createStreamConnection();
$manager = new ChannelManager($connection);

$channel1 = $manager->createChannel();
$channel2 = $manager->createChannel();

echo "Active channels: " . $manager->getActiveChannelCount() . "\n";

$manager->closeAllChannels();
$connection->close();

连接池实现

php
<?php

class ConnectionPool
{
    private static $instance = null;
    private $connections = [];
    private $config;
    private $maxConnections = 5;
    private $currentConnections = 0;
    
    private function __construct(array $config)
    {
        $this->config = $config;
    }
    
    public static function getInstance(array $config = []): self
    {
        if (self::$instance === null) {
            self::$instance = new self($config);
        }
        return self::$instance;
    }
    
    public function getConnection(): AMQPStreamConnection
    {
        foreach ($this->connections as $conn) {
            if ($conn['in_use'] === false && $conn['connection']->isConnected()) {
                $conn['in_use'] = true;
                return $conn['connection'];
            }
        }
        
        if ($this->currentConnections < $this->maxConnections) {
            $connection = $this->createNewConnection();
            $this->connections[] = [
                'connection' => $connection,
                'in_use' => true
            ];
            $this->currentConnections++;
            return $connection;
        }
        
        throw new RuntimeException('Connection pool exhausted');
    }
    
    public function releaseConnection(AMQPStreamConnection $connection): void
    {
        foreach ($this->connections as &$conn) {
            if ($conn['connection'] === $connection) {
                $conn['in_use'] = false;
                break;
            }
        }
    }
    
    private function createNewConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $this->config['host'] ?? 'localhost',
            $this->config['port'] ?? 5672,
            $this->config['user'] ?? 'guest',
            $this->config['password'] ?? 'guest',
            $this->config['vhost'] ?? '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3.0,
            130.0,
            null,
            true,
            60
        );
    }
    
    public function closeAll(): void
    {
        foreach ($this->connections as $conn) {
            try {
                $conn['connection']->close();
            } catch (Exception $e) {
            }
        }
        
        $this->connections = [];
        $this->currentConnections = 0;
    }
    
    public function getStats(): array
    {
        return [
            'total_connections' => $this->currentConnections,
            'active_connections' => count(array_filter($this->connections, fn($c) => $c['in_use'])),
            'max_connections' => $this->maxConnections
        ];
    }
}

$pool = ConnectionPool::getInstance([
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest'
]);

$conn1 = $pool->getConnection();
$conn2 = $pool->getConnection();

print_r($pool->getStats());

$pool->releaseConnection($conn1);

print_r($pool->getStats());

$pool->closeAll();

心跳配置

php
<?php

class HeartbeatConfiguration
{
    public static function createConnectionWithHeartbeat(int $heartbeatSeconds = 60): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest',
            '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3.0,
            $heartbeatSeconds * 2 + 10,
            null,
            true,
            $heartbeatSeconds
        );
    }
    
    public static function setupHeartbeatHandler($connection, callable $onTimeout = null): void
    {
        $connection->set_close_handler(function ($replyCode, $replyText) use ($onTimeout) {
            echo "Connection closed: {$replyCode} - {$replyText}\n";
            
            if ($onTimeout) {
                $onTimeout($replyCode, $replyText);
            }
        });
    }
    
    public static function checkConnectionHealth(AMQPStreamConnection $connection): array
    {
        return [
            'is_connected' => $connection->isConnected(),
            'server_properties' => $connection->getServerProperties(),
            'vhost' => $connection->getVhost()
        ];
    }
}

$connection = HeartbeatConfiguration::createConnectionWithHeartbeat(30);

HeartbeatConfiguration::setupHeartbeatHandler($connection, function ($code, $text) {
    echo "Heartbeat timeout detected: {$code} - {$text}\n";
});

print_r(HeartbeatConfiguration::checkConnectionHealth($connection));

$connection->close();

自动重连机制

php
<?php

class ReconnectingConnection
{
    private $config;
    private $connection = null;
    private $channel = null;
    private $maxRetries = 5;
    private $retryDelay = 1000;
    private $listeners = [];
    
    public function __construct(array $config)
    {
        $this->config = $config;
        $this->connect();
    }
    
    private function connect(): void
    {
        $attempt = 0;
        
        while ($attempt < $this->maxRetries) {
            try {
                $this->connection = new AMQPStreamConnection(
                    $this->config['host'],
                    $this->config['port'],
                    $this->config['user'],
                    $this->config['password'],
                    $this->config['vhost'],
                    false,
                    'AMQPLAIN',
                    null,
                    'en_US',
                    3.0,
                    130.0,
                    null,
                    true,
                    $this->config['heartbeat'] ?? 60
                );
                
                $this->setupCloseHandler();
                
                echo "Connected successfully\n";
                return;
            } catch (Exception $e) {
                $attempt++;
                echo "Connection attempt {$attempt} failed: {$e->getMessage()}\n";
                
                if ($attempt < $this->maxRetries) {
                    usleep($this->retryDelay * 1000 * $attempt);
                }
            }
        }
        
        throw new RuntimeException("Failed to connect after {$this->maxRetries} attempts");
    }
    
    private function setupCloseHandler(): void
    {
        $this->connection->set_close_handler(function ($replyCode, $replyText) {
            echo "Connection closed: {$replyCode} - {$replyText}\n";
            $this->handleDisconnect();
        });
    }
    
    private function handleDisconnect(): void
    {
        $this->connection = null;
        $this->channel = null;
        
        $this->notifyListeners('disconnect');
        
        echo "Attempting to reconnect...\n";
        
        try {
            $this->connect();
            $this->notifyListeners('reconnect');
        } catch (Exception $e) {
            echo "Reconnection failed: {$e->getMessage()}\n";
        }
    }
    
    public function getChannel()
    {
        if ($this->channel === null || !$this->isConnected()) {
            $this->channel = $this->connection->channel();
        }
        
        return $this->channel;
    }
    
    public function isConnected(): bool
    {
        return $this->connection !== null && $this->connection->isConnected();
    }
    
    public function addListener(string $event, callable $callback): void
    {
        $this->listeners[$event][] = $callback;
    }
    
    private function notifyListeners(string $event): void
    {
        if (isset($this->listeners[$event])) {
            foreach ($this->listeners[$event] as $callback) {
                $callback();
            }
        }
    }
    
    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
        if ($this->connection) {
            $this->connection->close();
        }
    }
}

$reconnecting = new ReconnectingConnection([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
    'vhost' => '/',
    'heartbeat' => 30
]);

$reconnecting->addListener('disconnect', function () {
    echo "Disconnected from RabbitMQ\n";
});

$reconnecting->addListener('reconnect', function () {
    echo "Reconnected to RabbitMQ\n";
});

$channel = $reconnecting->getChannel();

echo "Is connected: " . ($reconnecting->isConnected() ? 'Yes' : 'No') . "\n";

$reconnecting->close();

多线程安全连接

php
<?php

class ThreadSafeConnectionManager
{
    private static $connections = [];
    private static $locks = [];
    
    public static function getConnection(string $name = 'default', array $config = []): AMQPStreamConnection
    {
        if (!isset(self::$connections[$name])) {
            self::$connections[$name] = self::createConnection($config);
        }
        
        $connection = self::$connections[$name];
        
        if (!$connection->isConnected()) {
            self::$connections[$name] = self::createConnection($config);
        }
        
        return self::$connections[$name];
    }
    
    public static function getChannel(string $connectionName = 'default')
    {
        $connection = self::getConnection($connectionName);
        return $connection->channel();
    }
    
    private static function createConnection(array $config): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
    }
    
    public static function closeAll(): void
    {
        foreach (self::$connections as $name => $connection) {
            try {
                $connection->close();
                echo "Closed connection: {$name}\n";
            } catch (Exception $e) {
                echo "Error closing connection {$name}: {$e->getMessage()}\n";
            }
        }
        
        self::$connections = [];
    }
    
    public static function getStats(): array
    {
        $stats = [];
        
        foreach (self::$connections as $name => $connection) {
            $stats[$name] = [
                'connected' => $connection->isConnected()
            ];
        }
        
        return $stats;
    }
}

实际应用场景

1. Web 应用连接管理

php
<?php

class WebAppConnectionManager
{
    private static $instance = null;
    private $connection;
    private $channels = [];
    
    private function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['user'],
            $config['password'],
            $config['vhost']
        );
    }
    
    public static function getInstance(array $config = []): self
    {
        if (self::$instance === null) {
            self::$instance = new self($config);
        }
        return self::$instance;
    }
    
    public function getProducerChannel()
    {
        if (!isset($this->channels['producer'])) {
            $this->channels['producer'] = $this->connection->channel();
            $this->channels['producer']->confirm_select();
        }
        return $this->channels['producer'];
    }
    
    public function getConsumerChannel()
    {
        if (!isset($this->channels['consumer'])) {
            $this->channels['consumer'] = $this->connection->channel();
        }
        return $this->channels['consumer'];
    }
    
    public function getAdminChannel()
    {
        if (!isset($this->channels['admin'])) {
            $this->channels['admin'] = $this->connection->channel();
        }
        return $this->channels['admin'];
    }
    
    public function __destruct()
    {
        foreach ($this->channels as $channel) {
            try {
                $channel->close();
            } catch (Exception $e) {
            }
        }
        
        try {
            $this->connection->close();
        } catch (Exception $e) {
        }
    }
}

$manager = WebAppConnectionManager::getInstance([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
]);

$producerChannel = $manager->getProducerChannel();
$consumerChannel = $manager->getConsumerChannel();

2. 后台任务连接管理

php
<?php

class WorkerConnectionManager
{
    private $connection;
    private $channel;
    private $config;
    
    public function __construct(array $config)
    {
        $this->config = $config;
        $this->connect();
    }
    
    private function connect(): void
    {
        $this->connection = new AMQPStreamConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['password'],
            $this->config['vhost'],
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3.0,
            0,
            null,
            false,
            0
        );
        
        $this->channel = $this->connection->channel();
        
        $this->channel->basic_qos(null, $this->config['prefetch_count'] ?? 1, null);
        
        echo "Worker connected to RabbitMQ\n";
    }
    
    public function getChannel()
    {
        if (!$this->connection->isConnected()) {
            $this->connect();
        }
        
        return $this->channel;
    }
    
    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
        if ($this->connection) {
            $this->connection->close();
        }
    }
}

3. 高可用连接配置

php
<?php

class HighAvailabilityConnection
{
    private $hosts;
    private $currentHostIndex = 0;
    private $connection = null;
    private $config;
    
    public function __construct(array $hosts, array $config = [])
    {
        $this->hosts = $hosts;
        $this->config = $config;
        $this->connect();
    }
    
    private function connect(): void
    {
        $connected = false;
        $attempts = 0;
        $maxAttempts = count($this->hosts) * 3;
        
        while (!$connected && $attempts < $maxAttempts) {
            $host = $this->hosts[$this->currentHostIndex];
            
            try {
                $this->connection = new AMQPStreamConnection(
                    $host['host'],
                    $host['port'] ?? 5672,
                    $this->config['user'] ?? 'guest',
                    $this->config['password'] ?? 'guest',
                    $this->config['vhost'] ?? '/',
                    false,
                    'AMQPLAIN',
                    null,
                    'en_US',
                    3.0,
                    130.0,
                    null,
                    true,
                    $this->config['heartbeat'] ?? 60
                );
                
                $this->setupFailover();
                $connected = true;
                
                echo "Connected to {$host['host']}\n";
            } catch (Exception $e) {
                echo "Failed to connect to {$host['host']}: {$e->getMessage()}\n";
                
                $this->currentHostIndex = ($this->currentHostIndex + 1) % count($this->hosts);
                $attempts++;
                
                if ($attempts < $maxAttempts) {
                    sleep(1);
                }
            }
        }
        
        if (!$connected) {
            throw new RuntimeException('Failed to connect to any RabbitMQ host');
        }
    }
    
    private function setupFailover(): void
    {
        $this->connection->set_close_handler(function ($code, $text) {
            echo "Connection lost: {$code} - {$text}\n";
            $this->handleFailover();
        });
    }
    
    private function handleFailover(): void
    {
        echo "Attempting failover...\n";
        
        $this->connection = null;
        
        $this->currentHostIndex = ($this->currentHostIndex + 1) % count($this->hosts);
        
        try {
            $this->connect();
        } catch (Exception $e) {
            echo "Failover failed: {$e->getMessage()}\n";
        }
    }
    
    public function getConnection(): AMQPStreamConnection
    {
        if (!$this->connection || !$this->connection->isConnected()) {
            $this->connect();
        }
        
        return $this->connection;
    }
    
    public function getChannel()
    {
        return $this->getConnection()->channel();
    }
    
    public function close(): void
    {
        if ($this->connection) {
            $this->connection->close();
        }
    }
}

$haConnection = new HighAvailabilityConnection(
    [
        ['host' => 'rabbitmq1.example.com', 'port' => 5672],
        ['host' => 'rabbitmq2.example.com', 'port' => 5672],
        ['host' => 'rabbitmq3.example.com', 'port' => 5672]
    ],
    [
        'user' => 'admin',
        'password' => 'admin123',
        'vhost' => '/production',
        'heartbeat' => 30
    ]
);

$channel = $haConnection->getChannel();

$haConnection->close();

常见问题与解决方案

1. 连接超时

问题原因

  • 网络问题
  • Broker 未启动
  • 防火墙阻止

解决方案

php
<?php

class TimeoutResilientConnection
{
    public static function connectWithTimeout(array $config, float $timeout = 5.0): ?AMQPStreamConnection
    {
        $startTime = microtime(true);
        
        while (true) {
            try {
                return new AMQPStreamConnection(
                    $config['host'],
                    $config['port'],
                    $config['user'],
                    $config['password'],
                    $config['vhost'],
                    false,
                    'AMQPLAIN',
                    null,
                    'en_US',
                    min($timeout, 3.0),
                    $timeout * 2
                );
            } catch (Exception $e) {
                if (microtime(true) - $startTime >= $timeout) {
                    throw new RuntimeException("Connection timeout after {$timeout} seconds");
                }
                
                usleep(500000);
            }
        }
    }
}

2. Channel 泄漏

问题原因

  • Channel 未正确关闭
  • 异常导致 Channel 未释放

解决方案

php
<?php

class ChannelPool
{
    private $connection;
    private $availableChannels = [];
    private $usedChannels = [];
    private $maxChannels = 100;
    
    public function __construct(AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
    }
    
    public function acquire()
    {
        if (!empty($this->availableChannels)) {
            $channel = array_pop($this->availableChannels);
            $this->usedChannels[] = $channel;
            return $channel;
        }
        
        if (count($this->usedChannels) < $this->maxChannels) {
            $channel = $this->connection->channel();
            $this->usedChannels[] = $channel;
            return $channel;
        }
        
        throw new RuntimeException('Channel pool exhausted');
    }
    
    public function release($channel): void
    {
        $index = array_search($channel, $this->usedChannels, true);
        
        if ($index !== false) {
            unset($this->usedChannels[$index]);
            $this->availableChannels[] = $channel;
        }
    }
    
    public function __destruct()
    {
        foreach ($this->usedChannels as $channel) {
            $channel->close();
        }
        
        foreach ($this->availableChannels as $channel) {
            $channel->close();
        }
    }
}

3. 内存泄漏

问题原因

  • 长时间运行的消费者
  • 消息未正确处理

解决方案

php
<?php

class MemoryManagedConsumer
{
    private $connection;
    private $channel;
    private $messageCount = 0;
    private $restartThreshold = 10000;
    
    public function __construct(AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
    }
    
    public function consume(string $queue, callable $processor): void
    {
        $this->channel->basic_qos(null, 1, null);
        
        $this->channel->basic_consume($queue, '', false, false, false, false, function ($message) use ($processor, $queue) {
            $processor($message);
            $message->ack();
            
            $this->messageCount++;
            
            if ($this->messageCount >= $this->restartThreshold) {
                $this->restartConsumer($queue, $processor);
            }
        });
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function restartConsumer(string $queue, callable $processor): void
    {
        echo "Restarting consumer after {$this->messageCount} messages\n";
        
        $this->channel->close();
        $this->channel = $this->connection->channel();
        $this->messageCount = 0;
        
        gc_collect_cycles();
    }
}

最佳实践建议

1. 连接配置优化

php
<?php

class OptimalConnectionConfig
{
    public static function forWebApplication(): array
    {
        return [
            'connection_timeout' => 3.0,
            'read_write_timeout' => 130.0,
            'heartbeat' => 60,
            'keepalive' => true
        ];
    }
    
    public static function forLongRunningWorker(): array
    {
        return [
            'connection_timeout' => 3.0,
            'read_write_timeout' => 0,
            'heartbeat' => 0,
            'keepalive' => true
        ];
    }
    
    public static function forHighThroughput(): array
    {
        return [
            'connection_timeout' => 3.0,
            'read_write_timeout' => 60.0,
            'heartbeat' => 30,
            'keepalive' => true
        ];
    }
}

2. Channel 使用规范

php
<?php

class ChannelBestPractices
{
    public static function createProducerChannel($connection)
    {
        $channel = $connection->channel();
        $channel->confirm_select();
        return $channel;
    }
    
    public static function createConsumerChannel($connection, int $prefetchCount = 1)
    {
        $channel = $connection->channel();
        $channel->basic_qos(null, $prefetchCount, null);
        return $channel;
    }
    
    public static function createTransactionalChannel($connection)
    {
        $channel = $connection->channel();
        $channel->tx_select();
        return $channel;
    }
}

3. 连接监控

php
<?php

class ConnectionMonitor
{
    private $apiUrl;
    private $credentials;
    
    public function __construct(string $host, string $user, string $password)
    {
        $this->apiUrl = "http://{$host}:15672/api";
        $this->credentials = base64_encode("{$user}:{$password}");
    }
    
    public function getConnectionCount(): int
    {
        $url = "{$this->apiUrl}/connections";
        
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_HTTPHEADER, [
            "Authorization: Basic {$this->credentials}"
        ]);
        
        $response = curl_exec($ch);
        curl_close($ch);
        
        $connections = json_decode($response, true);
        
        return is_array($connections) ? count($connections) : 0;
    }
    
    public function getChannelCount(): int
    {
        $url = "{$this->apiUrl}/channels";
        
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_HTTPHEADER, [
            "Authorization: Basic {$this->credentials}"
        ]);
        
        $response = curl_exec($ch);
        curl_close($ch);
        
        $channels = json_decode($response, true);
        
        return is_array($channels) ? count($channels) : 0;
    }
}

相关链接