Skip to content

Headers Exchange

概述

Headers Exchange(头部交换机)是一种根据消息头(headers)属性进行路由的交换机类型。与 Direct 和 Topic Exchange 不同,它不依赖 routing key,而是通过消息头的键值对进行匹配。

核心原理

Headers Exchange 通过消息的 headers 属性进行路由匹配,支持以下匹配规则:

  • x-match: all - 所有指定的 header 都必须匹配(AND 逻辑)
  • x-match: any - 任意一个 header 匹配即可(OR 逻辑)
mermaid
graph LR
    P[生产者] -->|headers: type=order, priority=high| E[Headers Exchange]
    
    E -->|x-match: all<br/>type=order, priority=high| Q1[高优先级订单队列]
    E -->|x-match: any<br/>type=order| Q2[订单队列]
    E -->|x-match: all<br/>type=payment| Q3[支付队列]
    
    style E fill:#f9f,stroke:#333

匹配规则详解

绑定参数消息 Headers匹配结果
x-match: all, type=ordertype=order✓ 匹配
x-match: all, type=order, priority=hightype=order, priority=high✓ 匹配
x-match: all, type=order, priority=hightype=order, priority=low✗ 不匹配
x-match: any, type=order, type=paymenttype=order✓ 匹配
x-match: any, type=order, type=paymenttype=refund✗ 不匹配

PHP 代码示例

生产者 - 发送带 headers 的消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchangeName = 'headers_logs';
$channel->exchange_declare($exchangeName, AMQPExchangeType::HEADERS, false, true, false);

// 发送不同类型的消息
$messages = [
    [
        'body' => json_encode(['event' => 'order_created', 'order_id' => 'ORD-001']),
        'headers' => [
            'type' => 'order',
            'priority' => 'high',
            'source' => 'web'
        ]
    ],
    [
        'body' => json_encode(['event' => 'payment_completed', 'payment_id' => 'PAY-001']),
        'headers' => [
            'type' => 'payment',
            'priority' => 'normal',
            'source' => 'api'
        ]
    ],
    [
        'body' => json_encode(['event' => 'notification_sent', 'notification_id' => 'NOT-001']),
        'headers' => [
            'type' => 'notification',
            'priority' => 'low',
            'source' => 'system'
        ]
    ]
];

foreach ($messages as $msgData) {
    $message = new AMQPMessage(
        $msgData['body'],
        [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'application_headers' => new \PhpAmqpLib\Wire\AMQPTable($msgData['headers'])
        ]
    );
    
    $channel->basic_publish($message, $exchangeName);
    
    echo "消息已发送 - Headers: " . json_encode($msgData['headers']) . "\n";
}

$channel->close();
$connection->close();

消费者 - 绑定 headers 匹配规则

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchangeName = 'headers_logs';
$channel->exchange_declare($exchangeName, AMQPExchangeType::HEADERS, false, true, false);

$queueName = 'high-priority-orders-queue';
$channel->queue_declare($queueName, false, true, false, false);

// 绑定规则:type=order AND priority=high
$bindingArgs = new AMQPTable([
    'x-match' => 'all',
    'type' => 'order',
    'priority' => 'high'
]);

$channel->queue_bind($queueName, $exchangeName, '', false, $bindingArgs);

echo "队列已绑定 - 匹配规则: x-match=all, type=order, priority=high\n";
echo "等待消息...\n";

$callback = function (AMQPMessage $msg) {
    $headers = $msg->get('application_headers')->getNativeData();
    
    echo "收到消息:\n";
    echo "  Headers: " . json_encode($headers) . "\n";
    echo "  Body: " . $msg->getBody() . "\n";
    echo "-------------------\n";
    
    $msg->ack();
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

多条件 OR 匹配

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchangeName = 'headers_logs';
$channel->exchange_declare($exchangeName, AMQPExchangeType::HEADERS, false, true, false);

$queueName = 'all-orders-and-payments-queue';
$channel->queue_declare($queueName, false, true, false, false);

// 绑定规则:type=order OR type=payment
$bindingArgs = new AMQPTable([
    'x-match' => 'any',
    'type' => 'order',
    'type' => 'payment'
]);

// 注意:PHP 数组不允许重复键,需要分开绑定
$channel->queue_bind($queueName, $exchangeName, '', false, new AMQPTable([
    'x-match' => 'all',
    'type' => 'order'
]));

$channel->queue_bind($queueName, $exchangeName, '', false, new AMQPTable([
    'x-match' => 'all',
    'type' => 'payment'
]));

echo "队列已绑定 - 匹配 type=order 或 type=payment\n";

$channel->close();
$connection->close();

完整的消息处理器类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

class HeadersExchangePublisher
{
    private $channel;
    private $exchangeName;
    
    public function __construct($channel, $exchangeName)
    {
        $this->channel = $channel;
        $this->exchangeName = $exchangeName;
        
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::HEADERS,
            false,
            true,
            false
        );
    }
    
    public function publish($body, array $headers)
    {
        $message = new AMQPMessage(
            json_encode($body),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'application_headers' => new AMQPTable($headers)
            ]
        );
        
        $this->channel->basic_publish($message, $this->exchangeName);
    }
    
    public function publishOrder($orderData, $priority = 'normal')
    {
        $this->publish($orderData, [
            'type' => 'order',
            'priority' => $priority,
            'timestamp' => time()
        ]);
    }
    
    public function publishPayment($paymentData, $priority = 'normal')
    {
        $this->publish($paymentData, [
            'type' => 'payment',
            'priority' => $priority,
            'timestamp' => time()
        ]);
    }
}

class HeadersExchangeConsumer
{
    private $channel;
    private $exchangeName;
    
    public function __construct($channel, $exchangeName)
    {
        $this->channel = $channel;
        $this->exchangeName = $exchangeName;
        
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::HEADERS,
            false,
            true,
            false
        );
    }
    
    public function subscribe($queueName, array $matchHeaders, $xMatch = 'all', $callback)
    {
        $this->channel->queue_declare($queueName, false, true, false, false);
        
        $bindingArgs = new AMQPTable(array_merge(
            ['x-match' => $xMatch],
            $matchHeaders
        ));
        
        $this->channel->queue_bind($queueName, $this->exchangeName, '', false, $bindingArgs);
        
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
        
        echo "已订阅队列: {$queueName}\n";
        echo "匹配规则: x-match={$xMatch}, headers=" . json_encode($matchHeaders) . "\n";
    }
    
    public function wait()
    {
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
}

实际应用场景

1. 多维度消息路由

php
<?php

class MultiDimensionalRouter
{
    private $publisher;
    
    public function __construct($channel)
    {
        $this->publisher = new HeadersExchangePublisher($channel, 'multi_dim_events');
    }
    
    public function routeEvent($event, $region, $env, $priority)
    {
        $this->publisher->publish($event, [
            'region' => $region,        // cn, us, eu
            'env' => $env,              // prod, staging, dev
            'priority' => $priority,    // high, normal, low
            'timestamp' => time()
        ]);
    }
}

// 消费者订阅特定区域和环境的事件
$consumer = new HeadersExchangeConsumer($channel, 'multi_dim_events');

// 订阅中国生产环境的高优先级事件
$consumer->subscribe(
    'cn-prod-high-priority-queue',
    ['region' => 'cn', 'env' => 'prod', 'priority' => 'high'],
    'all',
    $callback
);

2. 复杂过滤订阅

php
<?php

class EventSubscriptionManager
{
    private $channel;
    private $exchangeName = 'filtered_events';
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::HEADERS,
            false,
            true,
            false
        );
    }
    
    public function createSubscription($subscriberId, array $filters)
    {
        $queueName = "subscriber_{$subscriberId}_queue";
        $this->channel->queue_declare($queueName, false, true, false, false);
        
        // 根据过滤器类型创建绑定
        if (isset($filters['require_all'])) {
            $bindingArgs = new AMQPTable(array_merge(
                ['x-match' => 'all'],
                $filters['require_all']
            ));
            $this->channel->queue_bind($queueName, $this->exchangeName, '', false, $bindingArgs);
        }
        
        if (isset($filters['require_any'])) {
            $bindingArgs = new AMQPTable(array_merge(
                ['x-match' => 'any'],
                $filters['require_any']
            ));
            $this->channel->queue_bind($queueName, $this->exchangeName, '', false, $bindingArgs);
        }
        
        return $queueName;
    }
    
    public function publishEvent($event, array $attributes)
    {
        $message = new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'application_headers' => new AMQPTable($attributes)
            ]
        );
        
        $this->channel->basic_publish($message, $this->exchangeName);
    }
}

// 使用示例
$manager = new EventSubscriptionManager($channel);

// 创建订阅:只接收 type=alert 且 severity=critical 的消息
$queueName = $manager->createSubscription('alert-handler', [
    'require_all' => [
        'type' => 'alert',
        'severity' => 'critical'
    ]
]);

// 发布事件
$manager->publishEvent(
    ['message' => 'Database connection failed'],
    ['type' => 'alert', 'severity' => 'critical', 'source' => 'monitoring']
);

常见问题与解决方案

问题 1: Headers 匹配不生效

症状: 消息发送成功但队列未收到

原因: headers 名称或值不匹配

解决方案: 检查 headers 格式

php
<?php

// 正确设置 headers
$message = new AMQPMessage(
    $body,
    [
        'application_headers' => new AMQPTable([
            'type' => 'order',
            'priority' => 'high'
        ])
    ]
);

// 绑定时也要匹配
$bindingArgs = new AMQPTable([
    'x-match' => 'all',
    'type' => 'order',
    'priority' => 'high'
]);

问题 2: 性能问题

症状: Headers Exchange 路由性能较差

解决方案: Headers Exchange 匹配开销较大,建议:

php
<?php

// 如果只需要简单路由,使用 Direct 或 Topic Exchange
// Headers Exchange 适合复杂的多维度匹配场景

// 优化:减少 headers 数量
$headers = [
    'type' => 'order',    // 只保留必要的 headers
    'priority' => 'high'
];

最佳实践建议

  1. 合理使用场景: Headers Exchange 适合多维度、复杂条件的路由场景
  2. 简化 headers: 只包含必要的 headers,减少匹配开销
  3. 避免过度绑定: 大量绑定会影响性能
  4. 文档化 headers: 维护 headers 字典,明确每个 header 的含义和取值
  5. 考虑替代方案: 简单场景优先使用 Direct 或 Topic Exchange

相关链接