Appearance
RabbitMQ 连接与通道(Connection & Channel)
概述
连接(Connection)和通道(Channel)是 RabbitMQ 客户端与 Broker 通信的基础设施。Connection 是客户端与 RabbitMQ 之间的 TCP 连接,Channel 是建立在 Connection 之上的轻量级连接。理解它们的工作机制对于构建高性能、可靠的消息系统至关重要。
连接与通道的关系
mermaid
graph TB
subgraph 应用程序
A[Application]
end
subgraph 连接层
C1[Channel 1]
C2[Channel 2]
C3[Channel 3]
C4[Channel N]
end
subgraph 传输层
CONN[TCP Connection]
end
subgraph RabbitMQ Broker
R[Broker]
end
A --> C1
A --> C2
A --> C3
A --> C4
C1 --> CONN
C2 --> CONN
C3 --> CONN
C4 --> CONN
CONN -->|AMQP Protocol| R
核心知识点
1. Connection(连接)
Connection 是客户端与 RabbitMQ Broker 之间的 TCP 连接:
mermaid
graph LR
subgraph Connection 生命周期
A[创建连接] --> B[握手认证]
B --> C[建立通道]
C --> D[通信]
D --> E[心跳检测]
E --> F[关闭连接]
end
Connection 特点:
- 基于 TCP 协议
- 需要认证(用户名、密码、Vhost)
- 支持心跳检测
- 资源消耗较大(建议复用)
Connection 参数:
| 参数 | 说明 | 默认值(本文示例代码采用的取值) |
|---|---|---|
| host | RabbitMQ 主机地址 | localhost |
| port | 端口号 | 5672 |
| user | 用户名 | guest |
| password | 密码 | guest |
| vhost | 虚拟主机 | / |
| heartbeat | 心跳间隔(秒) | 60 |
| connection_timeout | 连接超时(秒) | 3.0 |
| read_write_timeout | 读写超时(秒) | 130.0 |
2. Channel(通道)
Channel 是建立在 Connection 之上的虚拟连接:
mermaid
graph TB
subgraph 单 Connection 多 Channel
CONN[Connection]
CONN --> CH1[Channel 1 - 生产者]
CONN --> CH2[Channel 2 - 消费者]
CONN --> CH3[Channel 3 - 管理]
CONN --> CH4[Channel 4 - 事务]
end
Channel 特点:
- 轻量级,资源消耗小
- 每个 Channel 有独立的 ID
- 支持并发操作
- 同一 Channel 上的操作是串行的
Channel 用途:
- 发布消息
- 消费消息
- 声明交换机和队列
- 创建绑定
- 事务操作
3. 连接生命周期
mermaid
sequenceDiagram
participant C as Client
participant B as Broker
C->>B: 1. TCP 连接
B->>C: 2. 连接确认
C->>B: 3. 协议头
B->>C: 4. 协议响应
C->>B: 5. 认证请求
B->>C: 6. 认证成功
C->>B: 7. 打开 Vhost
B->>C: 8. Vhost 打开确认
C->>B: 9. 心跳检测
B->>C: 10. 心跳响应
C->>B: 11. 关闭连接
B->>C: 12. 关闭确认
4. 心跳机制
mermaid
graph TB
subgraph 心跳检测
A[发送心跳帧] --> B{收到响应?}
B -->|是| C[重置计数器]
B -->|否| D[计数器+1]
D --> E{超过阈值?}
E -->|是| F[断开连接]
E -->|否| A
C --> A
end
心跳机制的作用:
- 检测连接是否存活
- 防止防火墙断开空闲连接
- 及时发现网络问题
5. 连接池化
mermaid
graph TB
subgraph 连接池
P[Pool]
P --> C1[Connection 1]
P --> C2[Connection 2]
P --> C3[Connection 3]
C1 --> CH1[Channels]
C2 --> CH2[Channels]
C3 --> CH3[Channels]
end
subgraph 应用
A1[Thread 1]
A2[Thread 2]
A3[Thread 3]
end
A1 --> P
A2 --> P
A3 --> P
代码示例
基础连接创建
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Connection\AMQPSocketConnection;
/**
 * Builds php-amqplib connections from an associative config array.
 *
 * Every key is optional; a missing key falls back to the default written
 * next to it in the corresponding factory method.
 */
class ConnectionFactory
{
    /**
     * Create a stream-based connection.
     *
     * Recognised keys: host, port, user, password, vhost, insist,
     * login_method, login_response, locale, connection_timeout,
     * read_write_timeout, context, keepalive, heartbeat.
     *
     * @param array $config Connection settings (see key list above).
     */
    public static function createStreamConnection(array $config = []): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/',
            $config['insist'] ?? false,
            $config['login_method'] ?? 'AMQPLAIN',
            $config['login_response'] ?? null,
            $config['locale'] ?? 'en_US',
            $config['connection_timeout'] ?? 3.0,
            $config['read_write_timeout'] ?? 130.0,
            $config['context'] ?? null,
            $config['keepalive'] ?? true,
            $config['heartbeat'] ?? 60
        );
    }

    /**
     * Create a socket-based connection.
     *
     * Recognised keys: host, port, user, password, vhost, insist,
     * login_method, login_response, locale, read_timeout, keepalive,
     * write_timeout, heartbeat.
     *
     * @param array $config Connection settings (see key list above).
     */
    public static function createSocketConnection(array $config = []): AMQPSocketConnection
    {
        // AMQPSocketConnection takes (..., $locale, $read_timeout, $keepalive,
        // $write_timeout, $heartbeat): keepalive sits BETWEEN the two timeouts.
        // Passing read/write/heartbeat back to back shifts write_timeout into
        // the keepalive slot and heartbeat into the write_timeout slot, which
        // leaves the heartbeat at the library default.
        // NOTE(review): confirm this order against the installed php-amqplib version.
        return new AMQPSocketConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/',
            $config['insist'] ?? false,
            $config['login_method'] ?? 'AMQPLAIN',
            $config['login_response'] ?? null,
            $config['locale'] ?? 'en_US',
            $config['read_timeout'] ?? 130,
            $config['keepalive'] ?? true,
            $config['write_timeout'] ?? 130,
            $config['heartbeat'] ?? 60
        );
    }
}
try {
$connection = ConnectionFactory::createStreamConnection([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'heartbeat' => 60
]);
echo "Connection established successfully\n";
echo "Server properties: " . json_encode($connection->getServerProperties()) . "\n";
$connection->close();
} catch (Exception $e) {
echo "Connection failed: " . $e->getMessage() . "\n";
}
Channel 操作
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
/**
 * Tracks the channels opened on a single connection so they can be looked
 * up by id and closed individually or all at once.
 */
class ChannelManager
{
    /** @var AMQPStreamConnection Connection every channel is opened on. */
    private $connection;

    /** @var array<int, object> Open channels keyed by channel id. */
    private $channels = [];

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
    }

    /**
     * Open a channel and register it under the id it reports.
     *
     * @param int|null $id Requested channel id; null is forwarded as-is to
     *                     the connection's channel() call.
     * @return object The channel returned by the connection.
     */
    public function createChannel(?int $id = null)
    {
        // Explicit ?int: relying on "= null" to make the type nullable is
        // deprecated as of PHP 8.4.
        $channel = $this->connection->channel($id);
        $channelId = $channel->getChannelId();
        $this->channels[$channelId] = $channel;
        echo "Channel {$channelId} created\n";
        return $channel;
    }

    /**
     * @return object|null The registered channel, or null when the id is unknown.
     */
    public function getChannel(int $id)
    {
        return $this->channels[$id] ?? null;
    }

    /**
     * Close and forget one channel; unknown ids are ignored.
     */
    public function closeChannel(int $id): void
    {
        if (isset($this->channels[$id])) {
            $this->channels[$id]->close();
            unset($this->channels[$id]);
            echo "Channel {$id} closed\n";
        }
    }

    /**
     * Close every registered channel and empty the registry.
     *
     * All channels are attempted even if one close() fails; the registry is
     * always emptied, and the first failure is rethrown afterwards so the
     * caller still learns about it.
     *
     * @throws Exception The first exception raised by a channel's close().
     */
    public function closeAllChannels(): void
    {
        $firstFailure = null;
        foreach ($this->channels as $id => $channel) {
            try {
                $channel->close();
                echo "Channel {$id} closed\n";
            } catch (Exception $e) {
                $firstFailure = $firstFailure ?? $e;
            }
        }
        $this->channels = [];
        if ($firstFailure !== null) {
            throw $firstFailure;
        }
    }

    public function getActiveChannelCount(): int
    {
        return count($this->channels);
    }
}
// Usage example: open two channels on one shared connection, print how many
// are registered, then close them through the manager.
// ConnectionFactory is not defined in this snippet; it must already be loaded.
$connection = ConnectionFactory::createStreamConnection();
$manager = new ChannelManager($connection);
// No id is passed, so createChannel() forwards null to the connection.
$channel1 = $manager->createChannel();
$channel2 = $manager->createChannel();
echo "Active channels: " . $manager->getActiveChannelCount() . "\n";
$manager->closeAllChannels();
$connection->close();
连接池实现
php
<?php
/**
 * Singleton pool that lends out at most $maxConnections AMQP connections.
 *
 * Callers take a connection with getConnection() and must hand it back with
 * releaseConnection() so it can be reused.
 */
class ConnectionPool
{
    private static $instance = null;

    /** @var array<int, array{connection: AMQPStreamConnection, in_use: bool}> */
    private $connections = [];

    private $config;
    private $maxConnections = 5;
    private $currentConnections = 0;

    private function __construct(array $config)
    {
        $this->config = $config;
    }

    /**
     * Return the shared pool, creating it on first use.
     *
     * $config is only read when the instance is first created; later calls
     * return the existing pool and ignore the argument.
     */
    public static function getInstance(array $config = []): self
    {
        if (self::$instance === null) {
            self::$instance = new self($config);
        }
        return self::$instance;
    }

    /**
     * Lend out an idle live connection, or open a new one if the pool has room.
     *
     * @throws RuntimeException When every slot is in use.
     */
    public function getConnection(): AMQPStreamConnection
    {
        // Iterate by key and write through $this->connections: assigning to
        // the by-value loop copy would leave the pooled entry marked idle and
        // the same connection would be handed to a second caller.
        foreach ($this->connections as $slot => $entry) {
            if ($entry['in_use']) {
                continue;
            }
            if ($entry['connection']->isConnected()) {
                $this->connections[$slot]['in_use'] = true;
                return $entry['connection'];
            }
            // An idle connection that has dropped can never be lent out again;
            // evict it so it stops occupying one of the limited slots.
            unset($this->connections[$slot]);
            $this->currentConnections--;
        }

        if ($this->currentConnections < $this->maxConnections) {
            $connection = $this->createNewConnection();
            $this->connections[] = [
                'connection' => $connection,
                'in_use' => true,
            ];
            $this->currentConnections++;
            return $connection;
        }

        throw new RuntimeException('Connection pool exhausted');
    }

    /**
     * Mark a previously lent connection as idle; unknown connections are ignored.
     */
    public function releaseConnection(AMQPStreamConnection $connection): void
    {
        foreach ($this->connections as $slot => $entry) {
            if ($entry['connection'] === $connection) {
                $this->connections[$slot]['in_use'] = false;
                break;
            }
        }
    }

    private function createNewConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $this->config['host'] ?? 'localhost',
            $this->config['port'] ?? 5672,
            $this->config['user'] ?? 'guest',
            $this->config['password'] ?? 'guest',
            $this->config['vhost'] ?? '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3.0,
            130.0,
            null,
            true,
            60
        );
    }

    /**
     * Close every pooled connection and reset the pool to empty.
     */
    public function closeAll(): void
    {
        foreach ($this->connections as $entry) {
            try {
                $entry['connection']->close();
            } catch (Exception $e) {
                // Best-effort shutdown: one failing close must not stop the
                // remaining connections from being closed.
            }
        }
        $this->connections = [];
        $this->currentConnections = 0;
    }

    /**
     * @return array{total_connections: int, active_connections: int, max_connections: int}
     */
    public function getStats(): array
    {
        return [
            'total_connections' => $this->currentConnections,
            'active_connections' => count(array_filter($this->connections, fn($c) => $c['in_use'])),
            'max_connections' => $this->maxConnections,
        ];
    }
}
// Usage example: take two connections from the shared pool, print the pool
// statistics, return one connection, and print the statistics again.
// Keys not given here (port, vhost) fall back to the defaults inside
// ConnectionPool::createNewConnection().
$pool = ConnectionPool::getInstance([
'host' => 'localhost',
'user' => 'guest',
'password' => 'guest'
]);
$conn1 = $pool->getConnection();
$conn2 = $pool->getConnection();
print_r($pool->getStats());
// Releasing only marks the connection idle; it stays open inside the pool.
$pool->releaseConnection($conn1);
print_r($pool->getStats());
$pool->closeAll();
心跳配置
php
<?php
/**
 * Helpers for opening a connection with a chosen heartbeat interval and for
 * inspecting / reacting to its state.
 */
class HeartbeatConfiguration
{
    /**
     * Open a connection to localhost:5672 as guest/guest on vhost "/" with
     * the given heartbeat.
     *
     * The read/write timeout is derived from the heartbeat as
     * (2 * heartbeat + 10) seconds so it always exceeds two heartbeat periods.
     *
     * @param int $heartbeatSeconds Heartbeat interval passed to the connection.
     */
    public static function createConnectionWithHeartbeat(int $heartbeatSeconds = 60): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest',
            '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3.0,
            $heartbeatSeconds * 2 + 10,
            null,
            true,
            $heartbeatSeconds
        );
    }

    /**
     * Register a close handler that prints the reply code/text and then
     * forwards both to $onTimeout, when one is given.
     *
     * NOTE(review): this relies on the connection object exposing
     * set_close_handler(); confirm that method exists in the installed
     * php-amqplib version.
     *
     * @param object        $connection Object providing set_close_handler().
     * @param callable|null $onTimeout  Invoked as $onTimeout($replyCode, $replyText).
     */
    public static function setupHeartbeatHandler($connection, ?callable $onTimeout = null): void
    {
        // Explicit ?callable: implicit nullability via "= null" is deprecated
        // as of PHP 8.4.
        $connection->set_close_handler(function ($replyCode, $replyText) use ($onTimeout) {
            echo "Connection closed: {$replyCode} - {$replyText}\n";
            if ($onTimeout !== null) {
                $onTimeout($replyCode, $replyText);
            }
        });
    }

    /**
     * Collect a small status snapshot from the connection.
     *
     * NOTE(review): confirm getVhost() is available on the installed
     * php-amqplib connection class.
     *
     * @return array{is_connected: mixed, server_properties: mixed, vhost: mixed}
     */
    public static function checkConnectionHealth(AMQPStreamConnection $connection): array
    {
        return [
            'is_connected' => $connection->isConnected(),
            'server_properties' => $connection->getServerProperties(),
            'vhost' => $connection->getVhost(),
        ];
    }
}
// Usage example: open a connection with a 30-second heartbeat, attach a
// close handler that logs the reply code and text, then print a health snapshot.
$connection = HeartbeatConfiguration::createConnectionWithHeartbeat(30);
// The callback runs after the handler's own "Connection closed: ..." line.
HeartbeatConfiguration::setupHeartbeatHandler($connection, function ($code, $text) {
echo "Heartbeat timeout detected: {$code} - {$text}\n";
});
print_r(HeartbeatConfiguration::checkConnectionHealth($connection));
$connection->close();
自动重连机制
php
<?php
/**
 * Connection wrapper that retries the initial connect, re-establishes the
 * connection when it is found to be down, and notifies registered listeners
 * on 'disconnect' / 'reconnect'.
 */
class ReconnectingConnection
{
    private $config;
    private $connection = null;
    private $channel = null;
    private $maxRetries = 5;

    /** @var int Base delay between attempts in milliseconds; multiplied by the attempt number. */
    private $retryDelay = 1000;

    private $listeners = [];

    /** @var bool True while close() is running, so the close handler does not reconnect. */
    private $closing = false;

    /**
     * @param array $config Keys: host, port, user, password, vhost, heartbeat (all optional).
     * @throws RuntimeException When no attempt succeeds.
     */
    public function __construct(array $config)
    {
        $this->config = $config;
        $this->connect();
    }

    /**
     * Try to connect up to $maxRetries times, sleeping retryDelay * attempt
     * milliseconds between failures.
     *
     * @throws RuntimeException After the last failed attempt; the last
     *                          underlying exception is attached as "previous".
     */
    private function connect(): void
    {
        $lastError = null;
        for ($attempt = 1; $attempt <= $this->maxRetries; $attempt++) {
            try {
                $this->connection = new AMQPStreamConnection(
                    $this->config['host'] ?? 'localhost',
                    $this->config['port'] ?? 5672,
                    $this->config['user'] ?? 'guest',
                    $this->config['password'] ?? 'guest',
                    $this->config['vhost'] ?? '/',
                    false,
                    'AMQPLAIN',
                    null,
                    'en_US',
                    3.0,
                    130.0,
                    null,
                    true,
                    $this->config['heartbeat'] ?? 60
                );
                $this->setupCloseHandler();
                echo "Connected successfully\n";
                return;
            } catch (Exception $e) {
                $lastError = $e;
                echo "Connection attempt {$attempt} failed: {$e->getMessage()}\n";
                if ($attempt < $this->maxRetries) {
                    // usleep() takes microseconds; retryDelay is in milliseconds.
                    usleep($this->retryDelay * 1000 * $attempt);
                }
            }
        }
        throw new RuntimeException("Failed to connect after {$this->maxRetries} attempts", 0, $lastError);
    }

    /**
     * NOTE(review): relies on the connection exposing set_close_handler();
     * confirm against the installed php-amqplib version.
     */
    private function setupCloseHandler(): void
    {
        $this->connection->set_close_handler(function ($replyCode, $replyText) {
            echo "Connection closed: {$replyCode} - {$replyText}\n";
            $this->handleDisconnect();
        });
    }

    private function handleDisconnect(): void
    {
        if ($this->closing) {
            // close() was requested by the caller; reconnecting here would
            // undo the shutdown.
            return;
        }
        $this->connection = null;
        $this->channel = null;
        $this->notifyListeners('disconnect');
        echo "Attempting to reconnect...\n";
        try {
            $this->connect();
            $this->notifyListeners('reconnect');
        } catch (Exception $e) {
            echo "Reconnection failed: {$e->getMessage()}\n";
        }
    }

    /**
     * Return the shared channel, reconnecting first if the connection is down.
     *
     * @return object Channel opened on the current connection.
     * @throws RuntimeException When a needed reconnect fails.
     */
    public function getChannel()
    {
        if (!$this->isConnected()) {
            // The old channel belongs to a dead (or missing) connection; drop
            // it and reconnect instead of calling channel() on that connection.
            $this->channel = null;
            $this->connect();
        }
        if ($this->channel === null) {
            $this->channel = $this->connection->channel();
        }
        return $this->channel;
    }

    public function isConnected(): bool
    {
        return $this->connection !== null && $this->connection->isConnected();
    }

    /**
     * Register a callback for an event name ('disconnect' or 'reconnect' are
     * the ones this class fires). Callbacks are invoked without arguments.
     */
    public function addListener(string $event, callable $callback): void
    {
        $this->listeners[$event][] = $callback;
    }

    private function notifyListeners(string $event): void
    {
        foreach ($this->listeners[$event] ?? [] as $callback) {
            $callback();
        }
    }

    /**
     * Close the channel and the connection, if present, and forget both.
     */
    public function close(): void
    {
        $this->closing = true;
        try {
            if ($this->channel) {
                $this->channel->close();
            }
            if ($this->connection) {
                $this->connection->close();
            }
        } finally {
            $this->channel = null;
            $this->connection = null;
            $this->closing = false;
        }
    }
}
// Usage example: connect (the constructor retries internally), subscribe to
// the two events the wrapper fires, fetch a channel and report the state.
$reconnecting = new ReconnectingConnection([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'heartbeat' => 30
]);
// Listeners are called without arguments.
$reconnecting->addListener('disconnect', function () {
echo "Disconnected from RabbitMQ\n";
});
$reconnecting->addListener('reconnect', function () {
echo "Reconnected to RabbitMQ\n";
});
$channel = $reconnecting->getChannel();
echo "Is connected: " . ($reconnecting->isConnected() ? 'Yes' : 'No') . "\n";
$reconnecting->close();
多线程安全连接
php
<?php
class ThreadSafeConnectionManager
{
private static $connections = [];
private static $locks = [];
/**
 * Return the named connection, creating it on first use and restoring it
 * when it is found to be disconnected.
 *
 * When a dropped connection has to be restored and the caller supplied no
 * $config (as getChannel() does), the existing object is reconnected with
 * the settings it was originally built with. Building a fresh connection
 * from the empty config would silently fall back to localhost/guest.
 *
 * @param string $name   Registry key of the connection.
 * @param array  $config Settings used when a new connection must be built.
 */
public static function getConnection(string $name = 'default', array $config = []): AMQPStreamConnection
{
    $connection = self::$connections[$name] ?? null;

    if ($connection === null) {
        $connection = self::createConnection($config);
    } elseif (!$connection->isConnected()) {
        if ($config === []) {
            // reconnect() reuses the parameters the connection was constructed with.
            $connection->reconnect();
        } else {
            $connection = self::createConnection($config);
        }
    }

    self::$connections[$name] = $connection;
    return $connection;
}
/**
 * Open a new channel on the named connection.
 *
 * The connection is looked up through getConnection() without a config, and
 * every call asks the connection for a channel again.
 *
 * @param string $connectionName Registry key of the connection to use.
 */
public static function getChannel(string $connectionName = 'default')
{
    return self::getConnection($connectionName)->channel();
}
/**
 * Build a stream connection from $config, falling back to
 * localhost:5672, guest/guest and vhost "/" for missing keys.
 */
private static function createConnection(array $config): AMQPStreamConnection
{
    $host = $config['host'] ?? 'localhost';
    $port = $config['port'] ?? 5672;
    $user = $config['user'] ?? 'guest';
    $password = $config['password'] ?? 'guest';
    $vhost = $config['vhost'] ?? '/';

    return new AMQPStreamConnection($host, $port, $user, $password, $vhost);
}
/**
 * Close every registered connection, reporting success or failure per
 * name, then empty the registry.
 */
public static function closeAll(): void
{
    foreach (self::$connections as $label => $link) {
        try {
            $link->close();
            echo 'Closed connection: ' . $label . "\n";
        } catch (Exception $failure) {
            // A failed close is reported but does not stop the remaining closes.
            echo 'Error closing connection ' . $label . ': ' . $failure->getMessage() . "\n";
        }
    }

    self::$connections = [];
}
/**
 * Report, per connection name, whether that connection is currently connected.
 *
 * @return array<string, array{connected: mixed}>
 */
public static function getStats(): array
{
    // array_map() over a single array keeps its keys, so the result stays
    // keyed by connection name.
    return array_map(
        function ($link) {
            return ['connected' => $link->isConnected()];
        },
        self::$connections
    );
}
}
实际应用场景
1. Web 应用连接管理
php
<?php
/**
 * Singleton that owns one connection and lazily opens one channel per role
 * (producer, consumer, admin). Channels and connection are closed when the
 * instance is destroyed.
 */
class WebAppConnectionManager
{
    private static $instance = null;
    private $connection;

    /** @var array<string, object> Open channels keyed by role name. */
    private $channels = [];

    /**
     * @param array $config Keys: host, port, user, password, vhost (all optional).
     */
    private function __construct(array $config)
    {
        // getInstance() defaults $config to [], so every key needs a fallback;
        // reading the keys directly would pass nulls to the connection.
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
    }

    /**
     * Return the shared manager, creating it on first use.
     *
     * $config is only read by the call that creates the instance.
     */
    public static function getInstance(array $config = []): self
    {
        if (self::$instance === null) {
            self::$instance = new self($config);
        }
        return self::$instance;
    }

    /**
     * Channel for publishing, with confirm_select() applied once.
     */
    public function getProducerChannel()
    {
        if (!isset($this->channels['producer'])) {
            $channel = $this->connection->channel();
            $channel->confirm_select();
            // Cache only after confirm_select() succeeded, so a failure cannot
            // leave a channel without it cached as "producer".
            $this->channels['producer'] = $channel;
        }
        return $this->channels['producer'];
    }

    public function getConsumerChannel()
    {
        return $this->channelFor('consumer');
    }

    public function getAdminChannel()
    {
        return $this->channelFor('admin');
    }

    /**
     * Return the cached channel for $role, opening it on first request.
     */
    private function channelFor(string $role)
    {
        if (!isset($this->channels[$role])) {
            $this->channels[$role] = $this->connection->channel();
        }
        return $this->channels[$role];
    }

    public function __destruct()
    {
        foreach ($this->channels as $channel) {
            try {
                $channel->close();
            } catch (Exception $e) {
                // Best-effort cleanup: a destructor must not throw.
            }
        }
        try {
            $this->connection->close();
        } catch (Exception $e) {
            // Best-effort cleanup: a destructor must not throw.
        }
    }
}
// Usage example: create the singleton with explicit settings, then fetch
// the producer channel (confirm_select() is applied to it by the manager).
$manager = WebAppConnectionManager::getInstance([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/'
]);
$producerChannel = $manager->getProducerChannel();
$consumerChannel = $manager->getConsumerChannel();
2. 后台任务连接管理
php
<?php
class WorkerConnectionManager
{
private $connection;
private $channel;
private $config;
/**
 * Store the settings and connect immediately.
 *
 * @param array $config Read by connect(): host, port, user, password, vhost,
 *                      and optionally prefetch_count.
 */
public function __construct(array $config)
{
$this->config = $config;
// Connects right away, so constructing a worker manager performs network I/O.
$this->connect();
}
/**
 * Open the worker's connection and its single channel, then apply the
 * prefetch limit from config (1 when 'prefetch_count' is absent).
 *
 * The connection is created with read/write timeout 0, keepalive false and
 * heartbeat 0.
 * NOTE(review): presumably 0 disables the timeout and heartbeat for
 * long-running workers — confirm against php-amqplib.
 */
private function connect(): void
{
    $settings = $this->config;

    $link = new AMQPStreamConnection(
        $settings['host'],
        $settings['port'],
        $settings['user'],
        $settings['password'],
        $settings['vhost'],
        false,
        'AMQPLAIN',
        null,
        'en_US',
        3.0,
        0,
        null,
        false,
        0
    );
    $this->connection = $link;

    $this->channel = $link->channel();
    $prefetch = $settings['prefetch_count'] ?? 1;
    $this->channel->basic_qos(null, $prefetch, null);

    echo "Worker connected to RabbitMQ\n";
}
/**
 * Return the worker channel, rebuilding connection and channel first when
 * the connection reports it is no longer connected.
 */
public function getChannel()
{
    $alive = $this->connection->isConnected();
    if ($alive) {
        return $this->channel;
    }

    // connect() replaces both the connection and the channel.
    $this->connect();
    return $this->channel;
}
/**
 * Close the channel first and the connection second, skipping whichever is unset.
 */
public function close(): void
{
    foreach ([$this->channel, $this->connection] as $resource) {
        if ($resource) {
            $resource->close();
        }
    }
}
}
3. 高可用连接配置
php
<?php
/**
 * Connection that rotates through a list of broker hosts, moving on to the
 * next host whenever a connect attempt fails or the connection is lost.
 */
class HighAvailabilityConnection
{
    /** @var array<int, array{host: string, port?: int}> Candidate brokers, tried in order. */
    private $hosts;

    /** @var int Index into $hosts of the broker to try next / currently in use. */
    private $currentHostIndex = 0;

    private $connection = null;
    private $config;

    /**
     * @param array $hosts  List of ['host' => ..., 'port' => ...] entries (port defaults to 5672).
     * @param array $config Shared settings: user, password, vhost, heartbeat (all optional).
     * @throws RuntimeException When no host accepts the connection.
     */
    public function __construct(array $hosts, array $config = [])
    {
        $this->config = $config;
        $this->hosts = $hosts;
        $this->connect();
    }

    /**
     * Try hosts round-robin, up to three attempts per configured host in
     * total, waiting one second between failed attempts.
     *
     * @throws RuntimeException When every attempt fails.
     */
    private function connect(): void
    {
        $hostCount = count($this->hosts);
        $attemptLimit = $hostCount * 3;
        $failures = 0;

        while ($failures < $attemptLimit) {
            $target = $this->hosts[$this->currentHostIndex];
            try {
                $this->connection = new AMQPStreamConnection(
                    $target['host'],
                    $target['port'] ?? 5672,
                    $this->config['user'] ?? 'guest',
                    $this->config['password'] ?? 'guest',
                    $this->config['vhost'] ?? '/',
                    false,
                    'AMQPLAIN',
                    null,
                    'en_US',
                    3.0,
                    130.0,
                    null,
                    true,
                    $this->config['heartbeat'] ?? 60
                );
                $this->setupFailover();
                echo "Connected to {$target['host']}\n";
                return;
            } catch (Exception $error) {
                echo "Failed to connect to {$target['host']}: {$error->getMessage()}\n";
                // Rotate to the next host, wrapping around at the end of the list.
                $this->currentHostIndex = ($this->currentHostIndex + 1) % $hostCount;
                $failures++;
                if ($failures < $attemptLimit) {
                    sleep(1);
                }
            }
        }

        throw new RuntimeException('Failed to connect to any RabbitMQ host');
    }

    /**
     * NOTE(review): relies on the connection exposing set_close_handler();
     * confirm against the installed php-amqplib version.
     */
    private function setupFailover(): void
    {
        $this->connection->set_close_handler(function ($code, $text) {
            echo "Connection lost: {$code} - {$text}\n";
            $this->handleFailover();
        });
    }

    /**
     * Drop the current connection, advance to the next host and reconnect;
     * a failure is printed, not thrown.
     */
    private function handleFailover(): void
    {
        echo "Attempting failover...\n";
        $this->connection = null;
        $nextIndex = $this->currentHostIndex + 1;
        $this->currentHostIndex = $nextIndex % count($this->hosts);

        try {
            $this->connect();
        } catch (Exception $error) {
            echo "Failover failed: {$error->getMessage()}\n";
        }
    }

    /**
     * Return a live connection, reconnecting first when there is none or it
     * reports it is disconnected.
     */
    public function getConnection(): AMQPStreamConnection
    {
        $alive = $this->connection && $this->connection->isConnected();
        if (!$alive) {
            $this->connect();
        }
        return $this->connection;
    }

    /**
     * Open a new channel on the (possibly re-established) connection.
     */
    public function getChannel()
    {
        $link = $this->getConnection();
        return $link->channel();
    }

    public function close(): void
    {
        if (!$this->connection) {
            return;
        }
        $this->connection->close();
    }
}
// Usage example: three candidate brokers sharing one set of credentials.
// The constructor connects immediately, starting with the first host.
// NOTE(review): the credentials below are placeholder literals; real
// deployments should load them from configuration, not source code.
$haConnection = new HighAvailabilityConnection(
[
['host' => 'rabbitmq1.example.com', 'port' => 5672],
['host' => 'rabbitmq2.example.com', 'port' => 5672],
['host' => 'rabbitmq3.example.com', 'port' => 5672]
],
[
'user' => 'admin',
'password' => 'admin123',
'vhost' => '/production',
'heartbeat' => 30
]
);
// getChannel() opens a new channel on the current connection.
$channel = $haConnection->getChannel();
$haConnection->close();
常见问题与解决方案
1. 连接超时
问题原因:
- 网络问题
- Broker 未启动
- 防火墙阻止
解决方案:
php
<?php
class TimeoutResilientConnection
{
/**
 * Keep trying to connect until it succeeds or $timeout seconds have elapsed.
 *
 * Each attempt uses a connection timeout of min($timeout, 3.0) seconds and
 * a read/write timeout of 2 * $timeout. Failed attempts are retried after a
 * pause of at most 0.5 s, never longer than the time left before the deadline.
 *
 * The nullable return type is kept for compatibility; the method either
 * returns a connection or throws.
 *
 * @param array $config  Required keys: host, port, user, password, vhost.
 * @param float $timeout Overall time budget in seconds.
 * @throws RuntimeException When the budget is used up; the last connection
 *                          error is attached as the previous exception.
 */
public static function connectWithTimeout(array $config, float $timeout = 5.0): ?AMQPStreamConnection
{
    $deadline = microtime(true) + $timeout;

    while (true) {
        try {
            return new AMQPStreamConnection(
                $config['host'],
                $config['port'],
                $config['user'],
                $config['password'],
                $config['vhost'],
                false,
                'AMQPLAIN',
                null,
                'en_US',
                min($timeout, 3.0),
                $timeout * 2
            );
        } catch (Exception $e) {
            $remaining = $deadline - microtime(true);
            if ($remaining <= 0) {
                // Keep the real cause (DNS failure, refused, auth, ...) reachable
                // through getPrevious() instead of discarding it.
                throw new RuntimeException("Connection timeout after {$timeout} seconds", 0, $e);
            }
            // Cap the pause so the loop does not sleep past the deadline.
            usleep((int) (min(0.5, $remaining) * 1000000));
        }
    }
}
}
2. Channel 泄漏
问题原因:
- Channel 未正确关闭
- 异常导致 Channel 未释放
解决方案:
php
<?php
class ChannelPool
{
private $connection;
private $availableChannels = [];
private $usedChannels = [];
private $maxChannels = 100;
/**
 * Bind the pool to the connection on which acquire() opens new channels.
 * No channel is opened here.
 */
public function __construct(AMQPStreamConnection $connection)
{
$this->connection = $connection;
}
/**
 * Lease a channel: reuse the most recently released one if any is idle,
 * otherwise open a new one while fewer than $maxChannels are leased.
 *
 * @return object The leased channel.
 * @throws RuntimeException When no idle channel exists and the limit is reached.
 */
public function acquire()
{
    if ($this->availableChannels !== []) {
        $leased = array_pop($this->availableChannels);
    } elseif (count($this->usedChannels) < $this->maxChannels) {
        $leased = $this->connection->channel();
    } else {
        throw new RuntimeException('Channel pool exhausted');
    }

    $this->usedChannels[] = $leased;
    return $leased;
}
/**
 * Return a leased channel to the idle list. Channels this pool did not
 * lease out (strict identity comparison) are ignored.
 *
 * @param object $channel Channel previously obtained from acquire().
 */
public function release($channel): void
{
    $slot = array_search($channel, $this->usedChannels, true);
    if ($slot === false) {
        return;
    }

    unset($this->usedChannels[$slot]);
    $this->availableChannels[] = $channel;
}
/**
 * Close every channel the pool knows about: leased ones first, then idle ones.
 */
public function __destruct()
{
    $everyChannel = array_merge(
        array_values($this->usedChannels),
        array_values($this->availableChannels)
    );

    foreach ($everyChannel as $pooled) {
        $pooled->close();
    }
}
}
3. 内存泄漏
问题原因:
- 长时间运行的消费者
- 消息未正确处理
解决方案:
php
<?php
class MemoryManagedConsumer
{
private $connection;
private $channel;
private $messageCount = 0;
private $restartThreshold = 10000;
/**
 * Keep the connection and open the first channel on it.
 */
public function __construct(AMQPStreamConnection $connection)
{
$this->connection = $connection;
// The connection is kept so a replacement channel can be opened later.
$this->channel = $connection->channel();
}
/**
 * Consume $queue until the channel stops consuming, passing each message to
 * $processor and acknowledging it afterwards.
 *
 * After every $restartThreshold messages the channel is replaced by a fresh
 * one and consumption continues on it.
 *
 * @param string   $queue     Queue name to consume from.
 * @param callable $processor Invoked as $processor($message) before the ack.
 */
public function consume(string $queue, callable $processor): void
{
    $this->subscribe($queue, $processor);

    // $this->channel is re-read on every iteration on purpose:
    // restartConsumer() swaps in a new channel while a callback is running.
    while ($this->channel->is_consuming()) {
        $this->channel->wait();
    }
}

/**
 * Apply prefetch=1 and register the message callback on the current channel.
 */
private function subscribe(string $queue, callable $processor): void
{
    $this->channel->basic_qos(null, 1, null);
    $this->channel->basic_consume($queue, '', false, false, false, false, function ($message) use ($processor, $queue) {
        $processor($message);
        $message->ack();
        $this->messageCount++;
        if ($this->messageCount >= $this->restartThreshold) {
            $this->restartConsumer($queue, $processor);
        }
    });
}

/**
 * Replace the channel with a fresh one and resume consuming on it.
 *
 * Without re-subscribing, the new channel would have no consumer, so
 * is_consuming() would be false and consume() would return silently once
 * the threshold is reached.
 *
 * NOTE(review): this runs inside the old channel's message callback;
 * confirm closing a channel from within its own callback is safe with the
 * installed php-amqplib version.
 */
private function restartConsumer(string $queue, callable $processor): void
{
    echo "Restarting consumer after {$this->messageCount} messages\n";
    $this->channel->close();
    $this->channel = $this->connection->channel();
    $this->messageCount = 0;
    gc_collect_cycles();
    $this->subscribe($queue, $processor);
}
}
最佳实践建议
1. 连接配置优化
php
<?php
class OptimalConnectionConfig
{
/**
 * Settings profile for web applications: 3 s connect timeout, 130 s
 * read/write timeout, 60 s heartbeat, keepalive on.
 *
 * @return array{connection_timeout: float, read_write_timeout: float, heartbeat: int, keepalive: bool}
 */
public static function forWebApplication(): array
{
    $profile = [];
    $profile['connection_timeout'] = 3.0;
    $profile['read_write_timeout'] = 130.0;
    $profile['heartbeat'] = 60;
    $profile['keepalive'] = true;

    return $profile;
}
/**
 * Settings profile for long-running workers: 3 s connect timeout, with
 * read/write timeout and heartbeat both set to 0, keepalive on.
 *
 * NOTE(review): 0 presumably disables the read/write timeout and the
 * heartbeat — confirm against php-amqplib before relying on it.
 *
 * @return array{connection_timeout: float, read_write_timeout: int, heartbeat: int, keepalive: bool}
 */
public static function forLongRunningWorker(): array
{
    $profile = [];
    $profile['connection_timeout'] = 3.0;
    $profile['read_write_timeout'] = 0;
    $profile['heartbeat'] = 0;
    $profile['keepalive'] = true;

    return $profile;
}
/**
 * Settings profile for high-throughput use: 3 s connect timeout, 60 s
 * read/write timeout, 30 s heartbeat, keepalive on.
 *
 * @return array{connection_timeout: float, read_write_timeout: float, heartbeat: int, keepalive: bool}
 */
public static function forHighThroughput(): array
{
    $profile = [];
    $profile['connection_timeout'] = 3.0;
    $profile['read_write_timeout'] = 60.0;
    $profile['heartbeat'] = 30;
    $profile['keepalive'] = true;

    return $profile;
}
}
2. Channel 使用规范
php
<?php
class ChannelBestPractices
{
/**
 * Open a channel for publishing and call confirm_select() on it.
 *
 * @param object $connection Object providing channel().
 * @return object The prepared channel.
 */
public static function createProducerChannel($connection)
{
    $producer = $connection->channel();
    $producer->confirm_select();

    return $producer;
}
/**
 * Open a channel for consuming and pass $prefetchCount as the prefetch
 * count to basic_qos().
 *
 * @param object $connection    Object providing channel().
 * @param int    $prefetchCount Second argument to basic_qos(); the other two are null.
 * @return object The prepared channel.
 */
public static function createConsumerChannel($connection, int $prefetchCount = 1)
{
    $consumer = $connection->channel();
    $consumer->basic_qos(null, $prefetchCount, null);

    return $consumer;
}
/**
 * Open a channel and call tx_select() on it.
 *
 * @param object $connection Object providing channel().
 * @return object The prepared channel.
 */
public static function createTransactionalChannel($connection)
{
    $transactional = $connection->channel();
    $transactional->tx_select();

    return $transactional;
}
}
3. 连接监控
php
<?php
/**
 * Counts connections and channels by querying the HTTP API at
 * http://{host}:15672/api with HTTP Basic authentication.
 */
class ConnectionMonitor
{
    /** Seconds allowed for establishing the HTTP connection. */
    private const CONNECT_TIMEOUT_SECONDS = 3;

    /** Seconds allowed for the whole HTTP request. */
    private const REQUEST_TIMEOUT_SECONDS = 10;

    private $apiUrl;
    private $credentials;

    /**
     * @param string $host     Host name of the API endpoint (port 15672 is appended).
     * @param string $user     User name for Basic authentication.
     * @param string $password Password for Basic authentication.
     */
    public function __construct(string $host, string $user, string $password)
    {
        // NOTE(review): credentials travel over plain http here; use TLS
        // outside trusted networks.
        $this->apiUrl = "http://{$host}:15672/api";
        $this->credentials = base64_encode("{$user}:{$password}");
    }

    /**
     * @return int Number of entries returned by /connections, or 0 on any failure.
     */
    public function getConnectionCount(): int
    {
        return $this->countResources('connections');
    }

    /**
     * @return int Number of entries returned by /channels, or 0 on any failure.
     */
    public function getChannelCount(): int
    {
        return $this->countResources('channels');
    }

    /**
     * GET {apiUrl}/{resource} and count the elements of the decoded JSON array.
     *
     * Returns 0 when the request fails, the status is not 200, or the body is
     * not a JSON array. The status check matters: a JSON error body would
     * otherwise be decoded to an array and its keys counted as resources.
     */
    private function countResources(string $resource): int
    {
        $handle = curl_init();
        if ($handle === false) {
            return 0;
        }

        curl_setopt_array($handle, [
            CURLOPT_URL => "{$this->apiUrl}/{$resource}",
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HTTPHEADER => ["Authorization: Basic {$this->credentials}"],
            // Without timeouts an unreachable endpoint could block the caller indefinitely.
            CURLOPT_CONNECTTIMEOUT => self::CONNECT_TIMEOUT_SECONDS,
            CURLOPT_TIMEOUT => self::REQUEST_TIMEOUT_SECONDS,
        ]);

        $response = curl_exec($handle);
        $status = curl_getinfo($handle, CURLINFO_HTTP_CODE);
        curl_close($handle);

        if (!is_string($response) || $status !== 200) {
            return 0;
        }

        $decoded = json_decode($response, true);
        return is_array($decoded) ? count($decoded) : 0;
    }
}