Appearance
RabbitMQ 绑定(Binding)
概述
绑定(Binding)是 RabbitMQ 中连接 Exchange 和 Queue 的桥梁。它定义了消息从 Exchange 路由到 Queue 的规则。通过绑定,Exchange 可以根据 Routing Key 或其他条件将消息分发到一个或多个队列。绑定是 RabbitMQ 消息路由机制的核心组成部分。
绑定的核心作用
mermaid
graph TB
subgraph Exchange
E[Exchange]
end
subgraph Bindings
B1[Binding 1]
B2[Binding 2]
B3[Binding 3]
end
subgraph Queues
Q1[Queue 1]
Q2[Queue 2]
Q3[Queue 3]
end
E --> B1
E --> B2
E --> B3
B1 -->|routing_key: info| Q1
B2 -->|routing_key: error| Q2
B3 -->|routing_key: warning| Q3
绑定的主要作用:
- 定义路由规则:指定消息如何从 Exchange 到达 Queue
- 连接组件:将 Exchange 和 Queue 关联起来
- 灵活配置:支持多种路由策略
- 动态管理:可以动态添加或删除绑定
核心知识点
1. 绑定的组成
一个绑定包含以下要素:
mermaid
graph LR
A[Binding] --> B[Source Exchange]
A --> C[Destination Queue]
A --> D[Routing Key]
A --> E[Arguments]
| 要素 | 说明 |
|---|---|
| Source Exchange | 源交换机,消息来源 |
| Destination Queue | 目标队列,消息目的地 |
| Routing Key | 路由键,匹配规则 |
| Arguments | 额外参数,用于 Headers Exchange |
2. 绑定与交换机类型
不同类型的交换机使用绑定的方式不同:
mermaid
graph TB
subgraph Direct Exchange
D1[Binding] -->|精确匹配| D2[Routing Key]
end
subgraph Fanout Exchange
F1[Binding] -->|忽略| F2[Routing Key]
end
subgraph Topic Exchange
T1[Binding] -->|通配符匹配| T2[Routing Key]
end
subgraph Headers Exchange
H1[Binding] -->|消息头匹配| H2[Arguments]
end
Direct Exchange 绑定
Binding Key = "order.created"
消息 Routing Key = "order.created" → 匹配成功
消息 Routing Key = "order.updated" → 匹配失败
Fanout Exchange 绑定
任何 Routing Key → 所有绑定的队列都收到消息
Topic Exchange 绑定
Binding Key = "order.*"
消息 Routing Key = "order.created" → 匹配成功
消息 Routing Key = "order.updated" → 匹配成功
消息 Routing Key = "order.item.created" → 匹配失败
Binding Key = "order.#"
消息 Routing Key = "order.created" → 匹配成功
消息 Routing Key = "order.item.created" → 匹配成功
Headers Exchange 绑定
Binding Arguments = {"x-match": "all", "type": "order", "priority": "high"}
消息 Headers = {"type": "order", "priority": "high"} → 匹配成功
消息 Headers = {"type": "order"} → 匹配失败(x-match 为 all 时,缺少 priority 头)
3. 多绑定场景
一个 Exchange 可以绑定到多个 Queue,一个 Queue 也可以绑定到多个 Exchange:
mermaid
graph TB
subgraph 多对多绑定
E1[Exchange 1]
E2[Exchange 2]
Q1[Queue 1]
Q2[Queue 2]
Q3[Queue 3]
E1 --> Q1
E1 --> Q2
E2 --> Q2
E2 --> Q3
end
4. 绑定属性
| 属性 | 说明 |
|---|---|
| Source | 源交换机名称 |
| Vhost | 虚拟主机 |
| Destination | 目标队列名称 |
| Destination Type | 目标类型(queue 或 exchange) |
| Routing Key | 路由键 |
| Arguments | 额外参数 |
5. Exchange 到 Exchange 绑定
RabbitMQ 支持将一个 Exchange 绑定到另一个 Exchange:
mermaid
graph LR
P[Producer] --> E1[Exchange 1]
E1 -->|Binding| E2[Exchange 2]
E2 --> Q1[Queue 1]
E2 --> Q2[Queue 2]
这种绑定方式可以实现更复杂的路由拓扑。
代码示例
基础绑定操作
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
/**
 * Small helper that owns one AMQP connection/channel pair and uses it to
 * create and remove queue bindings.
 */
class BindingManager
{
    /** Connection opened in the constructor and closed by close(). */
    private $connection;

    /** Channel opened on $connection; every bind/unbind call goes through it. */
    private $channel;

    /**
     * Opens the connection and a channel on it.
     *
     * @param array $config Optional keys 'host', 'port', 'user', 'password' and
     *                      'vhost'; missing keys fall back to localhost, 5672,
     *                      guest, guest and "/" respectively.
     */
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    /**
     * Binds $queue to $exchange and prints a confirmation line.
     *
     * @param string $exchange   Source exchange name.
     * @param string $queue      Destination queue name.
     * @param string $routingKey Binding key; defaults to the empty string.
     */
    public function createBinding(
        string $exchange,
        string $queue,
        string $routingKey = ''
    ): void {
        $this->channel->queue_bind($queue, $exchange, $routingKey);
        echo "Binding created: {$exchange} -> {$queue} (key: {$routingKey})\n";
    }

    /**
     * Removes the binding between $queue and $exchange and prints a confirmation line.
     *
     * @param string $exchange   Source exchange name.
     * @param string $queue      Destination queue name.
     * @param string $routingKey Binding key of the binding to remove.
     */
    public function removeBinding(
        string $exchange,
        string $queue,
        string $routingKey = ''
    ): void {
        $this->channel->queue_unbind($queue, $exchange, $routingKey);
        echo "Binding removed: {$exchange} -> {$queue} (key: {$routingKey})\n";
    }

    /**
     * Closes the channel and then the connection.
     *
     * The connection is closed in a finally block so that it is released even
     * when closing the channel throws; the channel's exception still propagates.
     */
    public function close(): void
    {
        try {
            $this->channel->close();
        } finally {
            $this->connection->close();
        }
    }
}
// Example usage: open a manager with the default connection settings and bind
// two queues to logs_exchange, one per severity routing key.
// NOTE(review): the exchange and queues are not declared in this snippet —
// presumably they already exist on the broker; confirm before running.
$manager = new BindingManager();
$manager->createBinding('logs_exchange', 'info_queue', 'info');
$manager->createBinding('logs_exchange', 'error_queue', 'error');
$manager->close();Direct Exchange 绑定
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class DirectBindingExample
{
private $channel;
/**
 * Opens a channel on the given connection and keeps it for later calls.
 *
 * @param AMQPStreamConnection $connection Connection the channel is created on.
 */
public function __construct(AMQPStreamConnection $connection)
{
    $channel = $connection->channel();
    $this->channel = $channel;
}
/**
 * Declares the direct_logs exchange plus one queue per log severity and binds
 * each queue with its severity as the routing key, printing a line per binding.
 */
public function setup(): void
{
    $exchange = 'direct_logs';
    $this->channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);

    // queue name => routing key it is bound with
    $severityByQueue = [
        'logs_info' => 'info',
        'logs_warning' => 'warning',
        'logs_error' => 'error',
        'logs_debug' => 'debug'
    ];

    foreach ($severityByQueue as $queueName => $severity) {
        $this->channel->queue_declare($queueName, false, true, false, false);
        $this->channel->queue_bind($queueName, $exchange, $severity);
        printf("Bound %s to direct_logs with key: %s\n", $queueName, $severity);
    }
}
/**
 * Declares $queue and binds it to direct_logs with the given routing key.
 *
 * @param string $queue      Queue to declare and bind.
 * @param string $routingKey Routing key for the new binding.
 */
public function addDynamicBinding(string $queue, string $routingKey): void
{
    $exchange = 'direct_logs';

    $this->channel->queue_declare($queue, false, true, false, false);
    $this->channel->queue_bind($queue, $exchange, $routingKey);

    printf("Dynamic binding added: %s with key: %s\n", $queue, $routingKey);
}
/**
 * Removes the binding between $queue and direct_logs for the given routing key.
 *
 * @param string $queue      Queue whose binding is removed.
 * @param string $routingKey Routing key of the binding to remove.
 */
public function removeBinding(string $queue, string $routingKey): void
{
    $exchange = 'direct_logs';
    $this->channel->queue_unbind($queue, $exchange, $routingKey);

    printf("Binding removed: %s with key: %s\n", $queue, $routingKey);
}
}Topic Exchange 绑定
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class TopicBindingExample
{
private $channel;
/**
 * Opens a channel on the given connection and keeps it for later calls.
 *
 * @param AMQPStreamConnection $connection Connection the channel is created on.
 */
public function __construct(AMQPStreamConnection $connection)
{
    $channel = $connection->channel();
    $this->channel = $channel;
}
/**
 * Declares the topic_events exchange and a set of queues, binding each queue
 * with one topic pattern and printing a line per binding.
 */
public function setup(): void
{
    $exchange = 'topic_events';
    $this->channel->exchange_declare($exchange, AMQPExchangeType::TOPIC, false, true, false);

    // queue name => topic pattern ("*" = exactly one word, "#" = zero or more words)
    $patternByQueue = [
        'all_orders' => 'order.#',
        'order_created' => 'order.created',
        'order_updated' => 'order.updated.*',
        'all_users' => 'user.#',
        'all_errors' => '#.error',
        'all_events' => '#'
    ];

    foreach ($patternByQueue as $queueName => $topicPattern) {
        $this->channel->queue_declare($queueName, false, true, false, false);
        $this->channel->queue_bind($queueName, $exchange, $topicPattern);
        printf("Bound %s to topic_events with pattern: %s\n", $queueName, $topicPattern);
    }
}
/**
 * Returns the destinations of every topic_events binding whose pattern
 * matches the given routing key.
 *
 * @param string $routingKey Routing key to test.
 * @return array List of matching binding destinations, in binding order.
 */
public function testRouting(string $routingKey): array
{
    $matching = array_filter(
        $this->getBindings('topic_events'),
        function ($binding) use ($routingKey) {
            return $this->matchPattern($binding['routing_key'], $routingKey);
        }
    );

    return array_map(
        function ($binding) {
            return $binding['destination'];
        },
        array_values($matching)
    );
}
/**
 * Splits a binding pattern and a routing key on "." and matches them word by word.
 *
 * @param string $pattern    Binding pattern, possibly containing "*" or "#".
 * @param string $routingKey Routing key of a message.
 * @return bool True when the routing key satisfies the pattern.
 */
private function matchPattern(string $pattern, string $routingKey): bool
{
    return $this->matchParts(
        explode('.', $pattern),
        explode('.', $routingKey)
    );
}
/**
 * Recursively matches the words of a topic pattern against the words of a
 * routing key.
 *
 * "*" consumes exactly one word. "#" consumes zero or more words, after which
 * the remainder of the pattern must still match the remainder of the key
 * (so "#.error" only matches keys whose last word is "error").
 *
 * @param array $pattern Remaining pattern words.
 * @param array $key     Remaining routing-key words.
 * @return bool True when the remaining key satisfies the remaining pattern.
 */
private function matchParts(array $pattern, array $key): bool
{
    if (empty($pattern)) {
        // A fully consumed pattern matches only a fully consumed key.
        return empty($key);
    }

    $word = array_shift($pattern);

    if ($word === '#') {
        if (empty($pattern)) {
            // A trailing "#" swallows whatever is left, including nothing.
            return true;
        }
        // Let "#" absorb 0..n words and require the rest of the pattern to match.
        $remaining = count($key);
        for ($skip = 0; $skip <= $remaining; $skip++) {
            if ($this->matchParts($pattern, array_slice($key, $skip))) {
                return true;
            }
        }
        return false;
    }

    if (empty($key)) {
        return false;
    }

    $current = array_shift($key);
    if ($word === '*' || $word === $current) {
        return $this->matchParts($pattern, $key);
    }

    return false;
}
/**
 * Placeholder lookup of the bindings of an exchange.
 *
 * Always returns an empty array here, so testRouting() never finds a match
 * until this is replaced with a real lookup.
 * NOTE(review): callers read 'routing_key' and 'destination' from each entry —
 * a real implementation presumably has to return rows with those keys.
 *
 * @param string $exchange Exchange name (unused by this stub).
 * @return array Always empty.
 */
private function getBindings(string $exchange): array
{
return [];
}
}Headers Exchange 绑定
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
class HeadersBindingExample
{
private $channel;
/**
 * Opens a channel on the given connection and keeps it for later calls.
 *
 * @param AMQPStreamConnection $connection Connection the channel is created on.
 */
public function __construct(AMQPStreamConnection $connection)
{
    $channel = $connection->channel();
    $this->channel = $channel;
}
/**
 * Declares headers_exchange and three queues, binding each queue with its own
 * header-match arguments (routing key is left empty for every binding).
 */
public function setup(): void
{
    $exchange = 'headers_exchange';
    $this->channel->exchange_declare($exchange, AMQPExchangeType::HEADERS, false, true, false);

    // queue name => binding arguments; 'x-match' selects all-headers vs any-header matching
    $matchRulesByQueue = [
        'high_priority_orders' => [
            'x-match' => 'all',
            'priority' => 'high',
            'type' => 'order'
        ],
        'marketing_messages' => [
            'x-match' => 'any',
            'category' => 'marketing',
            'source' => 'campaign'
        ],
        'vip_orders' => [
            'x-match' => 'all',
            'user_type' => 'vip',
            'type' => 'order'
        ]
    ];

    foreach ($matchRulesByQueue as $queueName => $matchRules) {
        $this->channel->queue_declare($queueName, false, true, false, false);
        $this->channel->queue_bind($queueName, $exchange, '', false, new AMQPTable($matchRules));
    }

    echo "Headers exchange bindings setup completed\n";
}
/**
 * Declares $queue and binds it to headers_exchange with the given header rules.
 *
 * The 'x-match' entry is placed first and $headers is merged over it, so an
 * 'x-match' key inside $headers takes precedence over $matchType.
 *
 * @param string $queue     Queue to declare and bind.
 * @param array  $headers   Header name => expected value pairs.
 * @param string $matchType Value for 'x-match'; defaults to 'all'.
 */
public function addCustomBinding(
    string $queue,
    array $headers,
    string $matchType = 'all'
): void {
    $table = new AMQPTable(array_merge(['x-match' => $matchType], $headers));

    $this->channel->queue_declare($queue, false, true, false, false);
    $this->channel->queue_bind($queue, 'headers_exchange', '', false, $table);

    printf("Custom binding added for %s\n", $queue);
}
}Exchange 到 Exchange 绑定
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class ExchangeToExchangeBinding
{
private $channel;
/**
 * Opens a channel on the given connection and keeps it for later calls.
 *
 * @param AMQPStreamConnection $connection Connection the channel is created on.
 */
public function __construct(AMQPStreamConnection $connection)
{
    $channel = $connection->channel();
    $this->channel = $channel;
}
/**
 * Builds a two-level topology: incoming_exchange (direct) feeds
 * processing_exchange (topic) and archive_exchange (fanout), each of which
 * has one queue bound to it.
 *
 * NOTE(review): exchange-to-exchange bindings forward a message with its
 * original routing key, so a message that reaches processing_exchange via the
 * 'process' key would presumably not match the 'order.#' queue binding —
 * confirm the intended routing keys.
 */
public function setup(): void
{
    // exchange name => exchange type, declared in this order
    $exchangeTypes = [
        'incoming_exchange' => AMQPExchangeType::DIRECT,
        'processing_exchange' => AMQPExchangeType::TOPIC,
        'archive_exchange' => AMQPExchangeType::FANOUT
    ];
    foreach ($exchangeTypes as $exchangeName => $exchangeType) {
        $this->channel->exchange_declare($exchangeName, $exchangeType, false, true, false);
    }

    // destination exchange => routing key used on incoming_exchange
    $downstream = [
        'processing_exchange' => 'process',
        'archive_exchange' => 'archive'
    ];
    foreach ($downstream as $destination => $routingKey) {
        $this->channel->exchange_bind($destination, 'incoming_exchange', $routingKey);
    }

    $this->channel->queue_declare('orders_queue', false, true, false, false);
    $this->channel->queue_bind('orders_queue', 'processing_exchange', 'order.#');

    $this->channel->queue_declare('archive_queue', false, true, false, false);
    $this->channel->queue_bind('archive_queue', 'archive_exchange');

    echo "Exchange-to-exchange bindings setup completed\n";
}
/**
 * Binds the $destination exchange to the $source exchange and prints a confirmation.
 *
 * @param string $destination Exchange that receives messages.
 * @param string $source      Exchange that messages are routed from.
 * @param string $routingKey  Binding key; defaults to the empty string.
 */
public function addExchangeBinding(
    string $destination,
    string $source,
    string $routingKey = ''
): void {
    $this->channel->exchange_bind($destination, $source, $routingKey);

    printf(
        "Exchange binding created: %s -> %s (key: %s)\n",
        $source,
        $destination,
        $routingKey
    );
}
/**
 * Removes the binding from the $source exchange to the $destination exchange
 * and prints a confirmation.
 *
 * @param string $destination Exchange that was receiving messages.
 * @param string $source      Exchange that messages were routed from.
 * @param string $routingKey  Binding key of the binding to remove.
 */
public function removeExchangeBinding(
    string $destination,
    string $source,
    string $routingKey = ''
): void {
    $this->channel->exchange_unbind($destination, $source, $routingKey);

    printf("Exchange binding removed: %s -> %s\n", $source, $destination);
}
}动态绑定管理
php
<?php
class DynamicBindingManager
{
private $channel;
private $bindings = [];
/**
 * Opens a channel on the given connection and keeps it for later calls.
 *
 * @param AMQPStreamConnection $connection Connection the channel is created on.
 */
public function __construct(AMQPStreamConnection $connection)
{
    $channel = $connection->channel();
    $this->channel = $channel;
}
/**
 * Declares the exchange and queue, binds them, and records the binding in the
 * in-memory registry under "exchange:queue:routingKey".
 *
 * @param string $exchange     Exchange to declare and bind from.
 * @param string $exchangeType Exchange type passed to exchange_declare.
 * @param string $queue        Queue to declare and bind.
 * @param string $routingKey   Binding key; defaults to the empty string.
 * @param array  $arguments    Binding arguments; wrapped in an AMQPTable when
 *                             non-empty, otherwise null is sent.
 */
public function subscribe(
    string $exchange,
    string $exchangeType,
    string $queue,
    string $routingKey = '',
    array $arguments = []
): void {
    $this->channel->exchange_declare($exchange, $exchangeType, false, true, false);
    $this->channel->queue_declare($queue, false, true, false, false);

    $table = empty($arguments) ? null : new AMQPTable($arguments);
    $this->channel->queue_bind($queue, $exchange, $routingKey, false, $table);

    // The registry keeps the converted table (or null), not the raw array.
    $record = [
        'exchange' => $exchange,
        'queue' => $queue,
        'routing_key' => $routingKey,
        'arguments' => $table
    ];
    $this->bindings[$this->getBindingKey($exchange, $queue, $routingKey)] = $record;

    printf("Subscribed: %s to %s with key: %s\n", $queue, $exchange, $routingKey);
}
/**
 * Removes one binding from the broker and from the in-memory registry.
 *
 * When the binding was recorded by subscribe(), its stored arguments are
 * passed to queue_unbind as well: a binding created with arguments (e.g. on a
 * headers exchange) is only removed when the same arguments are supplied.
 *
 * @param string $exchange   Exchange the queue is bound to.
 * @param string $queue      Queue to unbind.
 * @param string $routingKey Binding key of the binding to remove.
 */
public function unsubscribe(
    string $exchange,
    string $queue,
    string $routingKey = ''
): void {
    $bindingKey = $this->getBindingKey($exchange, $queue, $routingKey);
    // Null when the binding is unknown to the registry or had no arguments.
    $arguments = $this->bindings[$bindingKey]['arguments'] ?? null;

    $this->channel->queue_unbind($queue, $exchange, $routingKey, $arguments);
    unset($this->bindings[$bindingKey]);

    echo "Unsubscribed: {$queue} from {$exchange}\n";
}
/**
 * Removes every recorded binding of $queue from the broker and the registry.
 *
 * Each binding is unbound with the arguments it was recorded with, so that
 * bindings created with arguments (e.g. on a headers exchange) are removed too.
 *
 * @param string $queue Queue whose recorded bindings are removed.
 */
public function unsubscribeAll(string $queue): void
{
    // foreach iterates over a snapshot, so unsetting entries inside the loop is safe.
    foreach ($this->bindings as $key => $binding) {
        if ($binding['queue'] !== $queue) {
            continue;
        }

        $this->channel->queue_unbind(
            $binding['queue'],
            $binding['exchange'],
            $binding['routing_key'],
            $binding['arguments'] ?? null
        );
        unset($this->bindings[$key]);
    }

    echo "All bindings removed for queue: {$queue}\n";
}
/**
 * Returns the in-memory registry of bindings recorded by subscribe().
 *
 * @return array Map of "exchange:queue:routingKey" => binding record.
 */
public function getBindings(): array
{
    $registry = $this->bindings;

    return $registry;
}
/**
 * Builds the registry key for a binding by joining its parts with ":".
 *
 * @param string $exchange   Exchange name.
 * @param string $queue      Queue name.
 * @param string $routingKey Binding key.
 * @return string "exchange:queue:routingKey".
 */
private function getBindingKey(string $exchange, string $queue, string $routingKey): string
{
    return implode(':', [$exchange, $queue, $routingKey]);
}
}实际应用场景
1. 多服务订阅模式
php
<?php
class MultiServiceSubscription
{
private $channel;
/**
 * Opens a channel on the given connection and keeps it for later calls.
 *
 * @param AMQPStreamConnection $connection Connection the channel is created on.
 */
public function __construct(AMQPStreamConnection $connection)
{
    $channel = $connection->channel();
    $this->channel = $channel;
}
/**
 * Declares the domain_events topic exchange and one "<service>_queue" per
 * service, binding every queue with each of that service's event patterns
 * and printing a line per binding.
 */
public function setupEventSystem(): void
{
    $exchange = 'domain_events';
    $this->channel->exchange_declare($exchange, 'topic', false, true, false);

    // service name => topic patterns the service subscribes to
    $subscriptions = [
        'inventory_service' => ['order.created', 'order.cancelled', 'product.updated'],
        'payment_service' => ['order.created', 'order.paid', 'payment.failed'],
        'notification_service' => ['order.#', 'user.#', 'payment.#'],
        'analytics_service' => ['#']
    ];

    foreach ($subscriptions as $serviceName => $patterns) {
        $queueName = $serviceName . '_queue';
        $this->channel->queue_declare($queueName, false, true, false, false);

        foreach ($patterns as $pattern) {
            $this->channel->queue_bind($queueName, $exchange, $pattern);
            printf("Bound %s to domain_events with: %s\n", $queueName, $pattern);
        }
    }
}
/**
 * Declares "<service>_queue" and binds it to domain_events once per pattern.
 *
 * @param string $service       Service name; the queue is named "{$service}_queue".
 * @param array  $eventPatterns Topic patterns to bind the queue with.
 */
public function addServiceSubscription(
    string $service,
    array $eventPatterns
): void {
    $queueName = $service . '_queue';
    $this->channel->queue_declare($queueName, false, true, false, false);

    foreach ($eventPatterns as $eventPattern) {
        $this->channel->queue_bind($queueName, 'domain_events', $eventPattern);
    }
}
/**
 * Removes a service's subscription by deleting its "<service>_queue".
 *
 * @param string $service Service name; the queue is named "{$service}_queue".
 */
public function removeServiceSubscription(string $service): void
{
    // Deleting the queue also removes every binding attached to it.
    $this->channel->queue_delete($service . '_queue');
}
}2. 路由规则管理
php
<?php
class RoutingRuleManager
{
private $channel;
/**
 * Opens a channel on the given connection and keeps it for later calls.
 *
 * @param AMQPStreamConnection $connection Connection the channel is created on.
 */
public function __construct(AMQPStreamConnection $connection)
{
    $channel = $connection->channel();
    $this->channel = $channel;
}
/**
 * Creates a named routing rule: binds the queue to the exchange with the
 * pattern and stores the rule's metadata with a creation timestamp.
 *
 * @param string $ruleName    Name the rule is stored under.
 * @param string $exchange    Exchange to bind from.
 * @param string $queue       Queue to bind.
 * @param string $pattern     Routing key / pattern for the binding.
 * @param string $description Optional free-text description.
 */
public function createRoutingRule(
    string $ruleName,
    string $exchange,
    string $queue,
    string $pattern,
    string $description = ''
): void {
    $this->channel->queue_bind($queue, $exchange, $pattern);

    $metadata = [
        'exchange' => $exchange,
        'queue' => $queue,
        'pattern' => $pattern,
        'description' => $description,
        'created_at' => date('Y-m-d H:i:s')
    ];
    $this->saveRuleMetadata($ruleName, $metadata);

    printf("Routing rule '%s' created\n", $ruleName);
}
/**
 * Changes the pattern of an existing routing rule.
 *
 * The new binding is created before the old one is removed, so there is no
 * moment at which the queue has neither binding and matching messages would
 * be unroutable. When the pattern is unchanged the old binding is kept,
 * because unbinding it would remove the binding that was just (re)declared.
 *
 * @param string $ruleName   Name of the rule to update.
 * @param string $newPattern Routing key / pattern that replaces the old one.
 * @throws Exception When no metadata is stored for $ruleName.
 */
public function updateRoutingRule(
    string $ruleName,
    string $newPattern
): void {
    $metadata = $this->getRuleMetadata($ruleName);
    if (!$metadata) {
        throw new Exception("Rule '{$ruleName}' not found");
    }

    $oldPattern = $metadata['pattern'];

    $this->channel->queue_bind(
        $metadata['queue'],
        $metadata['exchange'],
        $newPattern
    );

    if ($oldPattern !== $newPattern) {
        $this->channel->queue_unbind(
            $metadata['queue'],
            $metadata['exchange'],
            $oldPattern
        );
    }

    $metadata['pattern'] = $newPattern;
    $metadata['updated_at'] = date('Y-m-d H:i:s');
    $this->saveRuleMetadata($ruleName, $metadata);

    echo "Routing rule '{$ruleName}' updated\n";
}
/**
 * Deletes a routing rule: removes its binding and then its stored metadata.
 *
 * @param string $ruleName Name of the rule to delete.
 * @throws Exception When no metadata is stored for $ruleName.
 */
public function deleteRoutingRule(string $ruleName): void
{
    $rule = $this->getRuleMetadata($ruleName);
    if (!$rule) {
        throw new Exception("Rule '{$ruleName}' not found");
    }

    $queue = $rule['queue'];
    $exchange = $rule['exchange'];
    $pattern = $rule['pattern'];
    $this->channel->queue_unbind($queue, $exchange, $pattern);

    $this->deleteRuleMetadata($ruleName);

    printf("Routing rule '%s' deleted\n", $ruleName);
}
/**
 * Placeholder for persisting a rule's metadata; currently does nothing.
 *
 * @param string $name Rule name.
 * @param array  $data Rule metadata as built by the create/update methods.
 */
private function saveRuleMetadata(string $name, array $data): void
{
// TODO: implement saving of the rule metadata
}
/**
 * Placeholder for loading a rule's metadata.
 *
 * Always returns null here, so update and delete currently always report the
 * rule as not found.
 *
 * @param string $name Rule name.
 * @return array|null Stored metadata, or null when the rule is unknown.
 */
private function getRuleMetadata(string $name): ?array
{
// TODO: implement loading of the rule metadata
return null;
}
/**
 * Placeholder for deleting a rule's stored metadata; currently does nothing.
 *
 * @param string $name Rule name.
 */
private function deleteRuleMetadata(string $name): void
{
// TODO: implement deletion of the rule metadata
}
}3. 条件路由系统
php
<?php
use PhpAmqpLib\Wire\AMQPTable;
class ConditionalRoutingSystem
{
private $channel;
/**
 * Opens a channel on the given connection and keeps it for later calls.
 *
 * @param AMQPStreamConnection $connection Connection the channel is created on.
 */
public function __construct(AMQPStreamConnection $connection)
{
    $channel = $connection->channel();
    $this->channel = $channel;
}
/**
 * Declares the conditional_router headers exchange and then installs the
 * priority, region and type bindings, in that order.
 */
public function setupConditionalRouting(): void
{
    $exchange = 'conditional_router';
    $exchangeType = 'headers';
    $this->channel->exchange_declare($exchange, $exchangeType, false, true, false);

    $this->setupPriorityRouting();
    $this->setupRegionRouting();
    $this->setupTypeRouting();
}
/**
 * Declares one "priority_<level>_queue" per priority level and binds it to
 * conditional_router so that it matches messages whose 'priority' header
 * equals that level.
 */
private function setupPriorityRouting(): void
{
    foreach (['high', 'medium', 'low'] as $level) {
        $queueName = 'priority_' . $level . '_queue';
        $matchRules = new AMQPTable([
            'x-match' => 'all',
            'priority' => $level
        ]);

        $this->channel->queue_declare($queueName, false, true, false, false);
        $this->channel->queue_bind($queueName, 'conditional_router', '', false, $matchRules);
    }
}
/**
 * Declares one "region_<code>_queue" per region and binds it to
 * conditional_router with 'x-match' => 'any', so a message matches when
 * either its 'region' header equals the region or its 'fallback' header is
 * the string 'true'.
 */
private function setupRegionRouting(): void
{
    foreach (['us', 'eu', 'asia'] as $regionCode) {
        $queueName = 'region_' . $regionCode . '_queue';
        $matchRules = new AMQPTable([
            'x-match' => 'any',
            'region' => $regionCode,
            'fallback' => 'true'
        ]);

        $this->channel->queue_declare($queueName, false, true, false, false);
        $this->channel->queue_bind($queueName, 'conditional_router', '', false, $matchRules);
    }
}
/**
 * Declares one "<type>_queue" per message type and binds it to
 * conditional_router so that it matches messages carrying that type's headers.
 */
private function setupTypeRouting(): void
{
    // queue prefix => headers a message must carry
    $headersByType = [
        'order' => ['type' => 'order'],
        'payment' => ['type' => 'payment'],
        'notification' => ['type' => 'notification']
    ];

    foreach ($headersByType as $typeName => $requiredHeaders) {
        $queueName = $typeName . '_queue';
        $matchRules = new AMQPTable(array_merge(['x-match' => 'all'], $requiredHeaders));

        $this->channel->queue_declare($queueName, false, true, false, false);
        $this->channel->queue_bind($queueName, 'conditional_router', '', false, $matchRules);
    }
}
}常见问题与解决方案
1. 绑定冲突
问题原因:
- 多个绑定使用相同的 Routing Key
- 绑定规则重叠
解决方案:
php
<?php
class BindingConflictResolver
{
private $channel;
/**
 * Finds routing keys of an exchange that are bound to more than one queue.
 *
 * Each conflicting routing key is reported exactly once, with the complete
 * list of distinct queues bound with it (first-seen order). Entries are
 * ordered by the position at which the conflict was first detected.
 *
 * @param string $exchange Exchange whose bindings are analysed.
 * @return array List of ['routing_key' => string, 'queues' => string[]].
 */
public function analyzeBindings(string $exchange): array
{
    $conflicts = [];
    $firstQueueByKey = [];    // routing key => first queue seen with it
    $conflictIndexByKey = []; // routing key => index of its entry in $conflicts

    foreach ($this->getExchangeBindings($exchange) as $binding) {
        $key = $binding['routing_key'];
        $queue = $binding['destination'];

        if (!array_key_exists($key, $firstQueueByKey)) {
            $firstQueueByKey[$key] = $queue;
            continue;
        }

        if (!isset($conflictIndexByKey[$key])) {
            if ($queue === $firstQueueByKey[$key]) {
                // The same queue bound twice with one key is not a cross-queue conflict.
                continue;
            }
            $conflictIndexByKey[$key] = count($conflicts);
            $conflicts[] = [
                'routing_key' => $key,
                'queues' => [$firstQueueByKey[$key], $queue]
            ];
            continue;
        }

        $index = $conflictIndexByKey[$key];
        if (!in_array($queue, $conflicts[$index]['queues'], true)) {
            $conflicts[$index]['queues'][] = $queue;
        }
    }

    return $conflicts;
}
/**
 * Resolves a routing-key conflict by unbinding every queue except
 * $preferredQueue from the given exchange / routing key pair.
 *
 * NOTE(review): $this->channel is never assigned in the visible part of this
 * class — confirm it is initialised before this method is called.
 *
 * @param string $exchange       Exchange the conflicting bindings belong to.
 * @param string $routingKey     Routing key shared by the conflicting bindings.
 * @param string $preferredQueue Queue that keeps its binding.
 */
public function resolveConflict(string $exchange, string $routingKey, string $preferredQueue): void
{
    foreach ($this->getExchangeBindings($exchange) as $binding) {
        if ($binding['routing_key'] !== $routingKey) {
            continue;
        }
        if ($binding['destination'] === $preferredQueue) {
            continue;
        }

        $this->channel->queue_unbind($binding['destination'], $exchange, $routingKey);
    }
}
/**
 * Placeholder lookup of the bindings of an exchange.
 *
 * Always returns an empty array here, so no conflicts are ever found or
 * resolved until this is replaced with a real lookup.
 * NOTE(review): callers read 'routing_key' and 'destination' from each entry —
 * a real implementation presumably has to return rows with those keys.
 *
 * @param string $exchange Exchange name (unused by this stub).
 * @return array Always empty.
 */
private function getExchangeBindings(string $exchange): array
{
return [];
}
}2. 绑定丢失
问题原因:
- 队列或交换机被删除
- 非持久化绑定
- Broker 重启
解决方案:
php
<?php
class PersistentBindingManager
{
private $channel;
private $bindingStorage;
/**
 * Opens a channel on the given connection and keeps the binding storage.
 *
 * @param AMQPStreamConnection $connection Connection the channel is created on.
 * @param mixed                $storage    Object used to persist bindings; the
 *                                         class calls save() and getAll() on it.
 */
public function __construct(AMQPStreamConnection $connection, $storage)
{
    $this->bindingStorage = $storage;
    $this->channel = $connection->channel();
}
/**
 * Declares a durable exchange and queue, binds them, and records the binding
 * in the storage so that it can be re-created later.
 *
 * The binding arguments are sent to the broker as well as stored; previously
 * they were only stored, so the live binding never carried them.
 *
 * NOTE(review): the exchange is always declared with type 'direct' — confirm
 * this is intended for callers that pass header-match arguments.
 *
 * @param string $exchange   Exchange to declare and bind from.
 * @param string $queue      Queue to declare and bind.
 * @param string $routingKey Binding key; defaults to the empty string.
 * @param array  $arguments  Binding arguments; sent as an AMQPTable when non-empty.
 */
public function createPersistentBinding(
    string $exchange,
    string $queue,
    string $routingKey = '',
    array $arguments = []
): void {
    $this->channel->exchange_declare($exchange, 'direct', false, true, false);
    $this->channel->queue_declare($queue, false, true, false, false);

    $table = empty($arguments) ? null : new \PhpAmqpLib\Wire\AMQPTable($arguments);
    $this->channel->queue_bind($queue, $exchange, $routingKey, false, $table);

    // The raw array (not the table object) is stored so the record stays plain data.
    $this->bindingStorage->save([
        'exchange' => $exchange,
        'queue' => $queue,
        'routing_key' => $routingKey,
        'arguments' => $arguments,
        'created_at' => time()
    ]);
}
/**
 * Re-creates every binding found in the storage, printing one line per
 * restored or failed binding.
 *
 * Stored binding arguments are sent with the bind call; previously they were
 * dropped, so bindings that depend on arguments were restored without them.
 * A failure on one binding is reported and does not stop the loop.
 */
public function restoreBindings(): void
{
    foreach ($this->bindingStorage->getAll() as $binding) {
        try {
            $arguments = $binding['arguments'] ?? null;
            if (is_array($arguments)) {
                // Stored as a plain array; an empty one means "no arguments".
                $arguments = $arguments === []
                    ? null
                    : new \PhpAmqpLib\Wire\AMQPTable($arguments);
            }

            $this->channel->queue_bind(
                $binding['queue'],
                $binding['exchange'],
                $binding['routing_key'],
                false,
                $arguments
            );
            echo "Restored binding: {$binding['exchange']} -> {$binding['queue']}\n";
        } catch (Exception $e) {
            echo "Failed to restore binding: {$e->getMessage()}\n";
        }
    }
}
}3. 绑定性能问题
问题原因:
- 绑定数量过多
- 复杂的 Topic 模式
- Headers Exchange 使用
解决方案:
php
<?php
class OptimizedBindingStrategy
{
private $channel;
/**
 * Groups bindings by queue, merges each queue's patterns with mergePatterns(),
 * and returns the resulting flat list of bindings.
 *
 * @param array $bindings List of ['queue' => ..., 'routing_key' => ...] rows.
 * @return array List of ['queue' => ..., 'routing_key' => ...] rows, grouped
 *               by queue in first-seen order.
 */
public function optimizeTopicBindings(array $bindings): array
{
    $patternsByQueue = [];
    foreach ($bindings as $row) {
        $queueName = $row['queue'];
        $patternsByQueue[$queueName][] = $row['routing_key'];
    }

    $result = [];
    foreach ($patternsByQueue as $queueName => $queuePatterns) {
        foreach ($this->mergePatterns($queuePatterns) as $merged) {
            $result[] = ['queue' => $queueName, 'routing_key' => $merged];
        }
    }

    return $result;
}
/**
 * Reduces a list of topic patterns by collapsing multi-word patterns that
 * share a first word into "<word>.#".
 *
 * Lists of five or fewer patterns are returned unchanged. For longer lists:
 *  - multi-word patterns with a single shared first word become one "<word>.#";
 *  - when the number of distinct first words is less than half the number of
 *    patterns, one "<word>.#" per first word is produced;
 *  - otherwise the catch-all "#" is returned.
 * Single-word patterns cannot be expressed as "<word>.#", so they are kept
 * (de-duplicated) alongside the merged patterns instead of being dropped;
 * the merged result therefore always covers every original pattern.
 *
 * @param array $patterns Topic patterns bound to one queue.
 * @return array Reduced list of patterns.
 */
private function mergePatterns(array $patterns): array
{
    if (count($patterns) <= 5) {
        return $patterns;
    }

    $prefixes = [];   // first word of every multi-word pattern
    $standalone = []; // single-word patterns, kept as they are
    foreach ($patterns as $pattern) {
        $parts = explode('.', $pattern);
        if (count($parts) >= 2) {
            $prefixes[$parts[0]] = true;
        } else {
            $standalone[$pattern] = true;
        }
    }

    // Array keys that look numeric come back as ints; restore them to strings.
    $standalone = array_map('strval', array_keys($standalone));

    if (count($prefixes) === 0) {
        // Nothing can be collapsed; only duplicates are removed.
        return $standalone;
    }

    if (count($prefixes) === 1) {
        $merged = [array_key_first($prefixes) . '.#'];
    } elseif (count($prefixes) < count($patterns) / 2) {
        $merged = [];
        foreach (array_keys($prefixes) as $prefix) {
            $merged[] = "{$prefix}.#";
        }
    } else {
        // Too many distinct first words to be worth merging individually.
        return ['#'];
    }

    return array_merge($merged, $standalone);
}
/**
 * Suggests an exchange type for a use case.
 *
 * Returns 'fanout' when more than 100 bindings are listed, otherwise 'topic'
 * when wildcards are needed, otherwise 'direct'. A missing 'bindings' key is
 * treated as an empty list, mirroring how a missing 'needs_wildcards' key is
 * treated as false.
 *
 * @param array $useCase Optional keys: 'bindings' (array) and 'needs_wildcards' (bool).
 * @return string 'fanout', 'topic' or 'direct'.
 */
public function getRecommendedExchangeType(array $useCase): string
{
    $bindingCount = count($useCase['bindings'] ?? []);
    if ($bindingCount > 100) {
        return 'fanout';
    }

    if ($useCase['needs_wildcards'] ?? false) {
        return 'topic';
    }

    return 'direct';
}
}最佳实践建议
1. 绑定命名和组织
php
<?php
class BindingOrganization
{
/**
 * Builds a binding name by joining exchange, queue and routing key with ".".
 *
 * @param string $exchange   Exchange name.
 * @param string $queue      Queue name.
 * @param string $routingKey Binding key.
 * @return string "exchange.queue.routingKey".
 */
public static function generateBindingName(
    string $exchange,
    string $queue,
    string $routingKey
): string {
    return implode('.', [$exchange, $queue, $routingKey]);
}
/**
 * Groups bindings by the service name derived from each binding's queue name.
 *
 * @param array $bindings List of binding rows, each with a 'queue' entry.
 * @return array Map of service name => list of that service's bindings.
 */
public static function organizeBindingsByService(array $bindings): array
{
    $byService = [];

    foreach ($bindings as $row) {
        $serviceName = self::extractServiceFromQueue($row['queue']);
        // Appending to a missing key creates the list on first use.
        $byService[$serviceName][] = $row;
    }

    return $byService;
}
/**
 * Derives a service name from a queue name: the text before the first "_".
 *
 * explode() never returns an empty array, so the 'unknown' fallback has to be
 * triggered by an empty first segment (empty queue name or a leading "_").
 *
 * @param string $queue Queue name such as "inventory_service_queue".
 * @return string First "_"-separated segment, or 'unknown' when it is empty.
 */
private static function extractServiceFromQueue(string $queue): string
{
    $service = explode('_', $queue, 2)[0];

    return $service !== '' ? $service : 'unknown';
}
}2. 绑定文档化
php
<?php
class BindingDocumentation
{
/**
 * Renders a Markdown document with one section per binding.
 *
 * @param array $bindings List of rows with 'name', 'exchange', 'queue',
 *                        'routing_key', 'description' and 'created_at' entries.
 * @return string Markdown text starting with the document title.
 */
public static function generateDocumentation(array $bindings): string
{
    $sections = ["# RabbitMQ Bindings Documentation\n\n"];

    foreach ($bindings as $entry) {
        $sections[] = "## {$entry['name']}\n"
            . "- **Exchange**: {$entry['exchange']}\n"
            . "- **Queue**: {$entry['queue']}\n"
            . "- **Routing Key**: {$entry['routing_key']}\n"
            . "- **Description**: {$entry['description']}\n"
            . "- **Created**: {$entry['created_at']}\n\n";
    }

    return implode('', $sections);
}
/**
 * Serialises the bindings as pretty-printed JSON.
 *
 * Encoding failures (for example invalid UTF-8 in a value) raise a
 * JsonException instead of being silently turned into an empty result.
 *
 * @param array $bindings Data to serialise.
 * @return string Pretty-printed JSON text.
 * @throws JsonException When the data cannot be encoded.
 */
public static function exportToJson(array $bindings): string
{
    return json_encode($bindings, JSON_PRETTY_PRINT | JSON_THROW_ON_ERROR);
}
}3. 绑定监控
php
<?php
/**
 * Reads bindings from the RabbitMQ management HTTP API and compares them with
 * an expected set.
 */
class BindingMonitor
{
    /** Base URL of the management API, e.g. "http://host:15672/api". */
    private $apiUrl;

    /** Base64-encoded "user:password" used for the Basic Authorization header. */
    private $credentials;

    /**
     * @param string $host     Host name of the management API (port 15672 is appended).
     * @param string $user     API user name.
     * @param string $password API password.
     */
    public function __construct(string $host, string $user, string $password)
    {
        $this->apiUrl = "http://{$host}:15672/api";
        $this->credentials = base64_encode("{$user}:{$password}");
    }

    /**
     * Fetches every binding of a virtual host via GET /bindings/{vhost}.
     *
     * @param string $vhost Virtual host name; percent-encoded as a path segment.
     * @return array Decoded JSON response.
     * @throws \RuntimeException When the request fails, the API answers with a
     *                           non-2xx status, or the body is not a JSON array/object.
     */
    public function getBindings(string $vhost): array
    {
        // rawurlencode, not urlencode: in a path segment a space must be %20, not "+".
        $url = "{$this->apiUrl}/bindings/" . rawurlencode($vhost);

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_HTTPHEADER, [
            "Authorization: Basic {$this->credentials}"
        ]);

        try {
            $response = curl_exec($ch);
            if ($response === false) {
                throw new \RuntimeException('Management API request failed: ' . curl_error($ch));
            }
            $status = (int) curl_getinfo($ch, CURLINFO_HTTP_CODE);
        } finally {
            curl_close($ch);
        }

        if ($status < 200 || $status >= 300) {
            throw new \RuntimeException("Management API returned HTTP {$status} for {$url}");
        }

        $decoded = json_decode($response, true);
        if (!is_array($decoded)) {
            throw new \RuntimeException('Management API returned an invalid JSON body');
        }

        return $decoded;
    }

    /**
     * Returns the bindings of a virtual host whose source is the given exchange.
     *
     * @param string $vhost    Virtual host name.
     * @param string $exchange Source exchange name.
     * @return array Matching bindings; original array keys are preserved.
     */
    public function getExchangeBindings(string $vhost, string $exchange): array
    {
        $bindings = $this->getBindings($vhost);
        return array_filter($bindings, function ($binding) use ($exchange) {
            return $binding['source'] === $exchange;
        });
    }

    /**
     * Compares the bindings on the broker with the expected ones.
     *
     * 'missing' lists expected bindings that do not exist. 'extra' lists
     * existing bindings that were not expected, skipping those whose source is
     * the empty string (the default exchange). 'healthy' depends on 'missing'
     * only, so unexpected bindings never mark the result as unhealthy.
     *
     * @param string $vhost            Virtual host name.
     * @param array  $expectedBindings Rows with 'exchange', 'queue' and 'routing_key'.
     * @return array ['missing' => array, 'extra' => array, 'healthy' => bool].
     */
    public function checkBindingHealth(string $vhost, array $expectedBindings): array
    {
        $actualBindings = $this->getBindings($vhost);

        $expectedKeys = [];
        foreach ($expectedBindings as $expected) {
            $key = self::bindingKey($expected['exchange'], $expected['queue'], $expected['routing_key']);
            $expectedKeys[$key] = true;
        }

        $actualKeys = [];
        $extra = [];
        foreach ($actualBindings as $actual) {
            $key = self::bindingKey($actual['source'], $actual['destination'], $actual['routing_key']);
            $actualKeys[$key] = true;

            if ($actual['source'] !== '' && !isset($expectedKeys[$key])) {
                $extra[] = $actual;
            }
        }

        $missing = [];
        foreach ($expectedBindings as $expected) {
            $key = self::bindingKey($expected['exchange'], $expected['queue'], $expected['routing_key']);
            if (!isset($actualKeys[$key])) {
                $missing[] = $expected;
            }
        }

        return [
            'missing' => $missing,
            'extra' => $extra,
            'healthy' => empty($missing)
        ];
    }

    /**
     * Builds a lookup key for a (source, destination, routing key) triple.
     * A NUL byte separates the parts so that different triples cannot collide.
     */
    private static function bindingKey($source, $destination, $routingKey): string
    {
        return implode("\0", [$source, $destination, $routingKey]);
    }
}