Skip to content

消息过滤

概述

消息过滤是 RabbitMQ 中实现消息选择性消费的重要机制。通过设置过滤条件,消费者可以只接收感兴趣的消息,避免处理无关消息,提高系统效率和资源利用率。

核心知识点

消息过滤方式

RabbitMQ 提供多种消息过滤方式:

| 方式 | 说明 | 适用场景 |
| --- | --- | --- |
| Routing Key 过滤 | 基于路由键精确或模式匹配 | 日志级别、消息类型分类 |
| Headers 过滤 | 基于消息头部属性匹配 | 复杂条件过滤 |
| Topic 模式过滤 | 基于通配符匹配 | 多维度消息分类 |
| 应用层过滤 | 消费者端代码过滤 | 业务逻辑过滤 |

过滤机制架构

mermaid
graph TB
    subgraph 生产者
        P[消息生产者]
    end

    subgraph RabbitMQ
        E[Exchange]
        Q1[队列1<br/>过滤条件: level=error]
        Q2[队列2<br/>过滤条件: level=info]
        Q3[队列3<br/>过滤条件: level=*]
    end

    subgraph 消费者
        C1[错误日志处理器]
        C2[信息日志处理器]
        C3[全部日志处理器]
    end

    P -->|level=error| E
    P -->|level=info| E
    P -->|level=debug| E

    E -->|匹配| Q1
    E -->|匹配| Q2
    E -->|匹配| Q3

    Q1 --> C1
    Q2 --> C2
    Q3 --> C3

Headers 过滤原理

mermaid
sequenceDiagram
    participant P as 生产者
    participant E as Headers Exchange
    participant Q1 as 队列A<br/>x-match: all<br/>type=log, level=error
    participant Q2 as 队列B<br/>x-match: any<br/>type=log, level=info
    participant C as 消费者

    Note over P: 发送消息<br/>headers: type=log, level=error
    P->>E: 消息
    E->>Q1: 匹配成功(all条件)
    E->>Q2: 匹配成功(any条件: type=log 已满足)
    Q1->>C: 消费消息
    Q2->>C: 消费消息

    Note over P: 发送消息<br/>headers: type=log, level=info
    P->>E: 消息
    E->>Q1: 不匹配
    E->>Q2: 匹配成功(any条件)
    Q2->>C: 消费消息

PHP 代码示例

基于 Headers 的消息过滤

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Publishes JSON messages to the `headers_filter_exchange` headers exchange.
 *
 * The headers handed to publish() are attached to the message as AMQP
 * application headers and are also embedded in the JSON body.
 */
class HeadersFilterPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'headers_filter_exchange';

    /**
     * Connects to localhost:5672 as guest/guest, opens a channel and declares
     * the headers exchange (not passive, durable, not auto-deleted).
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        $passive = false;
        $durable = true;
        $autoDelete = false;
        $this->channel->exchange_declare($this->exchangeName, 'headers', $passive, $durable, $autoDelete);
    }

    /**
     * Publishes one message carrying the given headers.
     *
     * @param mixed $message Value stored under the `content` key of the JSON body.
     * @param array $headers Header name => value pairs used as application headers.
     */
    public function publish($message, array $headers)
    {
        $payload = [
            'content' => $message,
            'headers' => $headers,
            'timestamp' => time()
        ];
        $properties = [
            'content_type' => 'application/json',
            'application_headers' => new AMQPTable($headers)
        ];

        // No routing key is passed: only the exchange name is given.
        $this->channel->basic_publish(
            new AMQPMessage(json_encode($payload), $properties),
            $this->exchangeName
        );

        echo sprintf("消息已发送,过滤条件: %s\n", json_encode($headers));
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

/**
 * Consumes from a server-named, exclusive, auto-deleted queue that is bound to
 * the `headers_filter_exchange` headers exchange with a caller-supplied
 * header filter.
 */
class HeadersFilterConsumer
{
    private $connection;
    private $channel;
    private $exchangeName = 'headers_filter_exchange';
    private $queueName;

    /**
     * Connects, declares the exchange and a temporary queue, and binds the
     * queue with the given header filter.
     *
     * @param array  $filterHeaders Header name => value pairs a message must carry.
     * @param string $xMatch        Value sent as the `x-match` binding argument
     *                              (callers in this file pass 'all' or 'any').
     */
    public function __construct(array $filterHeaders, $xMatch = 'all')
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        // Exchange: not passive, durable, not auto-deleted.
        $this->channel->exchange_declare($this->exchangeName, 'headers', false, true, false);

        // Queue: empty name (the returned name is kept), not passive,
        // not durable, exclusive, auto-deleted.
        $declared = $this->channel->queue_declare('', false, false, true, true);
        list($this->queueName) = $declared;

        // The match mode is added last so it overrides any `x-match` key in the filter.
        $bindingArguments = new AMQPTable(
            array_merge($filterHeaders, ['x-match' => $xMatch])
        );
        $this->channel->queue_bind($this->queueName, $this->exchangeName, '', false, $bindingArguments);

        echo sprintf(
            "消费者已创建,过滤条件: %s (x-match: %s)\n",
            json_encode($filterHeaders),
            $xMatch
        );
    }

    /**
     * Registers a manual-ack callback that prints each message with its
     * headers, then waits on the channel for as long as it stays open.
     */
    public function consume()
    {
        $onMessage = function (AMQPMessage $message) {
            $payload = json_decode($message->body, true);
            $headerTable = $message->get('application_headers');

            printf(
                " [%s] 收到消息: %s\n     头部: %s\n",
                date('H:i:s'),
                $payload['content'],
                json_encode($headerTable ? $headerTable->getNativeData() : [])
            );

            $message->ack();
        };

        // Arguments after the queue name: consumer tag '', no_local, no_ack,
        // exclusive, nowait (all false), then the callback.
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, $onMessage);

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

基于 Topic 的消息过滤

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Publishes JSON messages to the `topic_filter_exchange` topic exchange using
 * a caller-supplied routing key.
 */
class TopicFilterPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'topic_filter_exchange';

    /**
     * Connects to localhost:5672 as guest/guest, opens a channel and declares
     * the topic exchange (not passive, durable, not auto-deleted).
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        $passive = false;
        $durable = true;
        $autoDelete = false;
        $this->channel->exchange_declare($this->exchangeName, 'topic', $passive, $durable, $autoDelete);
    }

    /**
     * Publishes one message under the given routing key.
     *
     * @param string $routingKey Routing key; also copied into the JSON body.
     * @param mixed  $message    Value stored under the `content` key of the JSON body.
     */
    public function publish($routingKey, $message)
    {
        $payload = json_encode([
            'content' => $message,
            'routing_key' => $routingKey,
            'timestamp' => time()
        ]);
        $amqpMessage = new AMQPMessage($payload, ['content_type' => 'application/json']);

        $this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);

        echo "消息已发送,路由键: " . $routingKey . "\n";
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

/**
 * Consumes from a server-named, exclusive, auto-deleted queue that is bound to
 * the `topic_filter_exchange` topic exchange once per binding key.
 */
class TopicFilterConsumer
{
    private $connection;
    private $channel;
    private $exchangeName = 'topic_filter_exchange';
    private $queueName;
    private $bindingKeys;

    /**
     * Connects, declares the exchange and a temporary queue, and binds the
     * queue with every given key.
     *
     * @param array $bindingKeys Binding keys passed one by one to queue_bind().
     */
    public function __construct(array $bindingKeys)
    {
        $this->bindingKeys = $bindingKeys;

        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        // Exchange: not passive, durable, not auto-deleted.
        $this->channel->exchange_declare($this->exchangeName, 'topic', false, true, false);

        // Queue: empty name (the returned name is kept), not passive,
        // not durable, exclusive, auto-deleted.
        $declared = $this->channel->queue_declare('', false, false, true, true);
        list($this->queueName) = $declared;

        foreach ($bindingKeys as $pattern) {
            $this->channel->queue_bind($this->queueName, $this->exchangeName, $pattern);
        }

        echo sprintf("消费者已创建,绑定模式: %s\n", implode(', ', $bindingKeys));
    }

    /**
     * Registers a manual-ack callback that prints each message with its
     * routing key, then waits on the channel for as long as it stays open.
     */
    public function consume()
    {
        $onMessage = function (AMQPMessage $message) {
            $payload = json_decode($message->body, true);
            $deliveredKey = $message->delivery_info['routing_key'];

            printf(
                " [%s] 路由键: %s, 消息: %s\n",
                date('H:i:s'),
                $deliveredKey,
                $payload['content']
            );

            $message->ack();
        };

        // Arguments after the queue name: consumer tag '', no_local, no_ack,
        // exclusive, nowait (all false), then the callback.
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, $onMessage);

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Closes the channel first, then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

完整示例:日志过滤系统

php
<?php

/**
 * Sends log messages through a HeadersFilterPublisher, tagging each one with
 * `level` and `source` headers.
 */
class LogFilterSystem
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new HeadersFilterPublisher();
    }

    /**
     * Publishes a log message.
     *
     * @param mixed $level   Value of the `level` header.
     * @param mixed $source  Value of the `source` header.
     * @param mixed $message Message content handed to the publisher.
     * @param array $extra   Additional headers; string keys that repeat
     *                       `level` or `source` replace those values.
     */
    public function sendLog($level, $source, $message, array $extra = [])
    {
        $baseHeaders = ['level' => $level, 'source' => $source];

        $this->publisher->publish($message, array_merge($baseHeaders, $extra));
    }
}

/**
 * Consumer wrapper bound with the header filter `level=error` (x-match: all).
 */
class ErrorLogConsumer
{
    /**
     * Underlying headers-exchange consumer.
     *
     * Declared explicitly: assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var HeadersFilterConsumer
     */
    private $consumer;

    public function __construct()
    {
        $this->consumer = new HeadersFilterConsumer(
            ['level' => 'error'],
            'all'
        );
    }

    /**
     * Prints a start-up banner and hands control to the wrapped consumer's
     * consume() loop.
     */
    public function start()
    {
        echo "错误日志消费者启动,只接收 level=error 的消息\n";
        $this->consumer->consume();
    }
}

/**
 * Consumer wrapper bound with the header filter `source=system` (x-match: all).
 */
class SystemLogConsumer
{
    /**
     * Underlying headers-exchange consumer.
     *
     * Declared explicitly: assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var HeadersFilterConsumer
     */
    private $consumer;

    public function __construct()
    {
        $this->consumer = new HeadersFilterConsumer(
            ['source' => 'system'],
            'all'
        );
    }

    /**
     * Prints a start-up banner and hands control to the wrapped consumer's
     * consume() loop.
     */
    public function start()
    {
        echo "系统日志消费者启动,只接收 source=system 的消息\n";
        $this->consumer->consume();
    }
}

/**
 * Consumer wrapper bound with the header filter `level=error` AND
 * `priority=high` (x-match: all).
 */
class CriticalLogConsumer
{
    /**
     * Underlying headers-exchange consumer.
     *
     * Declared explicitly: assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var HeadersFilterConsumer
     */
    private $consumer;

    public function __construct()
    {
        $this->consumer = new HeadersFilterConsumer(
            ['level' => 'error', 'priority' => 'high'],
            'all'
        );
    }

    /**
     * Prints a start-up banner and hands control to the wrapped consumer's
     * consume() loop.
     */
    public function start()
    {
        echo "关键日志消费者启动,接收 level=error 且 priority=high 的消息\n";
        $this->consumer->consume();
    }
}

/**
 * Consumer wrapper bound with the header filter `level=error` using
 * x-match: any.
 *
 * NOTE(review): despite the class name, only `level=error` is bound here; no
 * `warning` value appears in the filter. A single headers binding cannot hold
 * two values for the same key, so receiving warnings as well would presumably
 * need a second binding on the same queue — confirm the intended behaviour.
 */
class AnyErrorOrWarningConsumer
{
    /**
     * Underlying headers-exchange consumer.
     *
     * Declared explicitly: assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var HeadersFilterConsumer
     */
    private $consumer;

    public function __construct()
    {
        $this->consumer = new HeadersFilterConsumer(
            ['level' => 'error'],
            'any'
        );
    }

    /**
     * Prints a start-up banner and hands control to the wrapped consumer's
     * consume() loop.
     */
    public function start()
    {
        echo "任意错误或警告消费者启动\n";
        $this->consumer->consume();
    }
}

应用层消息过滤

php
<?php

/**
 * Registry of named predicate callables used to filter messages in
 * application code.
 */
class ApplicationLayerFilter
{
    private $filters = [];

    /**
     * Registers (or replaces) the predicate stored under $name.
     *
     * @param string   $name   Key the predicate is stored under.
     * @param callable $filter Invoked with the message as its only argument.
     */
    public function addFilter($name, callable $filter)
    {
        $this->filters[$name] = $filter;
    }

    /**
     * Tests a message against one named predicate, or against all of them.
     *
     * @param mixed       $message    Value handed to the predicate(s).
     * @param string|null $filterName Predicate to use; null means "any registered predicate".
     *
     * @return mixed With a name: the predicate's own return value, or false
     *               when no predicate has that name. Without a name: true as
     *               soon as one predicate returns a truthy value, else false.
     */
    public function matches($message, $filterName = null)
    {
        if ($filterName === null) {
            foreach ($this->filters as $predicate) {
                if ($predicate($message)) {
                    return true;
                }
            }

            return false;
        }

        if (isset($this->filters[$filterName])) {
            $predicate = $this->filters[$filterName];

            return $predicate($message);
        }

        return false;
    }

    /**
     * Keeps the messages accepted by at least one registered predicate.
     *
     * @param array $messages Messages to test.
     *
     * @return array Accepted messages with their original keys preserved.
     */
    public function filter(array $messages)
    {
        $accepted = [];

        foreach ($messages as $key => $candidate) {
            if ($this->matches($candidate)) {
                $accepted[$key] = $candidate;
            }
        }

        return $accepted;
    }
}

/**
 * Consumer that filters messages in application code: every delivery is
 * acknowledged, but only those accepted by the ApplicationLayerFilter are
 * printed as processed.
 */
class SmartLogConsumer
{
    private $connection;
    private $channel;
    private $filter;

    /**
     * Connects to localhost:5672 as guest/guest, opens a channel and registers
     * the built-in filters.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        $this->filter = new ApplicationLayerFilter();
        $this->setupFilters();
    }

    /**
     * Registers the `high_priority`, `error_only` and `recent_only` filters,
     * each of which inspects the JSON-decoded message body.
     */
    private function setupFilters()
    {
        $this->filter->addFilter('high_priority', $this->bodyFieldEquals('priority', 'high'));
        $this->filter->addFilter('error_only', $this->bodyFieldEquals('level', 'error'));

        // Accepts messages whose `timestamp` is less than 3600 seconds old.
        $this->filter->addFilter('recent_only', function ($message) {
            $payload = json_decode($message->body, true);

            return isset($payload['timestamp'])
                && (time() - $payload['timestamp']) < 3600;
        });
    }

    /**
     * Builds a predicate that is true when the decoded body has $field set
     * and strictly equal to $expected.
     */
    private function bodyFieldEquals($field, $expected)
    {
        return function ($message) use ($field, $expected) {
            $payload = json_decode($message->body, true);

            return isset($payload[$field]) && $payload[$field] === $expected;
        };
    }

    /**
     * Consumes from $queueName, printing accepted messages and a skip notice
     * for rejected ones; both kinds are acknowledged.
     *
     * @param string      $queueName  Queue to consume from.
     * @param string|null $filterName Named filter to apply; null applies all
     *                                registered filters (any match accepts).
     */
    public function consume($queueName, $filterName = null)
    {
        $onMessage = function (AMQPMessage $message) use ($filterName) {
            $accepted = $this->filter->matches($message, $filterName);

            if (!$accepted) {
                echo "跳过不匹配的消息\n";
            } else {
                $payload = json_decode($message->body, true);
                echo "处理消息: " . json_encode($payload, JSON_UNESCAPED_UNICODE) . "\n";
            }

            $message->ack();
        };

        // Arguments after the queue name: consumer tag '', no_local, no_ack,
        // exclusive, nowait (all false), then the callback.
        $this->channel->basic_consume($queueName, '', false, false, false, false, $onMessage);

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }
}

复合条件过滤

php
<?php

/**
 * A conjunction of field conditions evaluated against an associative array.
 *
 * Supported operators: =, !=, >, >=, <, <=, in, not_in, contains, regex.
 */
class CompositeFilter
{
    private $conditions = [];

    /**
     * Appends one condition; all conditions must hold for evaluate() to pass.
     *
     * @param string $field    Key looked up in the evaluated data.
     * @param string $operator One of the supported operators.
     * @param mixed  $value    Operand; a list for in/not_in, a substring for
     *                         contains, a PCRE pattern for regex.
     */
    public function addCondition($field, $operator, $value)
    {
        $this->conditions[] = [
            'field' => $field,
            'operator' => $operator,
            'value' => $value
        ];
    }

    /**
     * Checks the data against every registered condition.
     *
     * @param array $data Field name => value pairs.
     *
     * @return bool True when every condition holds (also when there are none).
     */
    public function evaluate(array $data)
    {
        foreach ($this->conditions as $condition) {
            if (!$this->evaluateCondition($data, $condition)) {
                return false;
            }
        }

        return true;
    }

    /**
     * Evaluates a single condition.
     *
     * A field that is missing or null fails every operator, and so does an
     * unknown operator.
     */
    private function evaluateCondition($data, $condition)
    {
        $field = $condition['field'];
        $operator = $condition['operator'];
        $value = $condition['value'];

        if (!isset($data[$field])) {
            return false;
        }

        $fieldValue = $data[$field];

        switch ($operator) {
            case '=':
                return $fieldValue === $value;
            case '!=':
                return $fieldValue !== $value;
            case '>':
                return $fieldValue > $value;
            case '>=':
                return $fieldValue >= $value;
            case '<':
                return $fieldValue < $value;
            case '<=':
                return $fieldValue <= $value;
            case 'in':
                // Strict membership, consistent with the strict '=' operator.
                return in_array($fieldValue, (array)$value, true);
            case 'not_in':
                return !in_array($fieldValue, (array)$value, true);
            case 'contains':
                // Non-string operands (or an empty needle) would warn or throw
                // in strpos(); treat them as "does not contain".
                if (!is_string($fieldValue) || !is_string($value) || $value === '') {
                    return false;
                }
                return strpos($fieldValue, $value) !== false;
            case 'regex':
                // preg_match() needs string operands; anything else cannot match.
                if (!is_string($fieldValue) || !is_string($value)) {
                    return false;
                }
                return preg_match($value, $fieldValue) === 1;
            default:
                return false;
        }
    }
}

/**
 * Static factory shortcuts for commonly used CompositeFilter set-ups.
 */
class FilterBuilder
{
    /**
     * @return CompositeFilter A filter with no conditions.
     */
    public static function create()
    {
        return new CompositeFilter();
    }

    /**
     * @param mixed $level Value the `level` field must equal.
     *
     * @return CompositeFilter Filter with the single condition level = $level.
     */
    public static function level($level)
    {
        return self::withConditions([['level', '=', $level]]);
    }

    /**
     * @param array $levels Accepted values for the `level` field.
     *
     * @return CompositeFilter Filter with the single condition level in $levels.
     */
    public static function levelIn(array $levels)
    {
        return self::withConditions([['level', 'in', $levels]]);
    }

    /**
     * @param mixed $source Value the `source` field must equal.
     * @param mixed $level  Value the `level` field must equal.
     *
     * @return CompositeFilter Filter requiring source = $source, then level = $level.
     */
    public static function sourceAndLevel($source, $level)
    {
        return self::withConditions([
            ['source', '=', $source],
            ['level', '=', $level],
        ]);
    }

    /**
     * Creates a new filter and adds the given [field, operator, value]
     * triples in order.
     */
    private static function withConditions(array $triples)
    {
        $built = new CompositeFilter();

        foreach ($triples as $triple) {
            $built->addCondition($triple[0], $triple[1], $triple[2]);
        }

        return $built;
    }
}

实际应用场景

1. 多租户消息过滤

php
<?php

/**
 * Publishes tenant-scoped and broadcast messages through a
 * HeadersFilterPublisher.
 */
class TenantMessageFilter
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new HeadersFilterPublisher();
    }

    /**
     * Publishes a message tagged with `tenant_id` and `message_type` headers.
     *
     * @param mixed $tenantId    Value of the `tenant_id` header.
     * @param mixed $messageType Value of the `message_type` header.
     * @param mixed $message     Message content handed to the publisher.
     */
    public function sendToTenant($tenantId, $messageType, $message)
    {
        $tenantHeaders = [
            'tenant_id' => $tenantId,
            'message_type' => $messageType
        ];

        $this->publisher->publish($message, $tenantHeaders);
    }

    /**
     * Publishes a message tagged with `message_type` and `broadcast=true`
     * headers; no `tenant_id` header is set.
     *
     * @param mixed $messageType Value of the `message_type` header.
     * @param mixed $message     Message content handed to the publisher.
     */
    public function sendToAllTenants($messageType, $message)
    {
        $broadcastHeaders = [
            'message_type' => $messageType,
            'broadcast' => true
        ];

        $this->publisher->publish($message, $broadcastHeaders);
    }
}

/**
 * Consumer wrapper bound with the header filter `tenant_id=<tenantId>`
 * (x-match: all).
 */
class TenantConsumer
{
    private $tenantId;

    /**
     * Underlying headers-exchange consumer.
     *
     * Declared explicitly: assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var HeadersFilterConsumer
     */
    private $consumer;

    /**
     * @param mixed $tenantId Tenant whose messages this consumer receives.
     */
    public function __construct($tenantId)
    {
        $this->tenantId = $tenantId;
        $this->consumer = new HeadersFilterConsumer(
            ['tenant_id' => $tenantId],
            'all'
        );
    }

    /**
     * Prints a start-up banner and hands control to the wrapped consumer's
     * consume() loop.
     */
    public function start()
    {
        echo "租户 {$this->tenantId} 消费者启动\n";
        $this->consumer->consume();
    }
}

/**
 * Consumer wrapper bound with the header filter `broadcast=true`
 * (x-match: all).
 */
class BroadcastConsumer
{
    /**
     * Underlying headers-exchange consumer.
     *
     * Declared explicitly: assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var HeadersFilterConsumer
     */
    private $consumer;

    public function __construct()
    {
        $this->consumer = new HeadersFilterConsumer(
            ['broadcast' => true],
            'all'
        );
    }

    /**
     * Prints a start-up banner and hands control to the wrapped consumer's
     * consume() loop.
     */
    public function start()
    {
        echo "广播消息消费者启动\n";
        $this->consumer->consume();
    }
}

2. 优先级消息过滤

php
<?php

/**
 * Publishes messages tagged with `priority` and `category` headers through a
 * HeadersFilterPublisher.
 */
class PriorityMessageFilter
{
    private $publisher;

    const PRIORITY_LOW = 'low';
    const PRIORITY_NORMAL = 'normal';
    const PRIORITY_HIGH = 'high';
    const PRIORITY_CRITICAL = 'critical';

    public function __construct()
    {
        $this->publisher = new HeadersFilterPublisher();
    }

    /**
     * Publishes a message with the given priority and category.
     *
     * @param mixed  $message  Message content handed to the publisher.
     * @param string $priority Value of the `priority` header (see PRIORITY_* constants).
     * @param string $category Value of the `category` header.
     */
    public function send($message, $priority, $category = 'general')
    {
        $headers = ['priority' => $priority, 'category' => $category];

        $this->publisher->publish($message, $headers);
    }

    /**
     * Shortcut for send() with PRIORITY_CRITICAL.
     */
    public function sendCritical($message, $category = 'general')
    {
        $priority = self::PRIORITY_CRITICAL;

        $this->send($message, $priority, $category);
    }

    /**
     * Shortcut for send() with PRIORITY_HIGH.
     */
    public function sendHigh($message, $category = 'general')
    {
        $priority = self::PRIORITY_HIGH;

        $this->send($message, $priority, $category);
    }
}

/**
 * Consumer wrapper bound with the header filter `priority=critical`
 * (x-match: all).
 */
class CriticalMessageHandler
{
    /**
     * Underlying headers-exchange consumer.
     *
     * Declared explicitly: assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var HeadersFilterConsumer
     */
    private $consumer;

    public function __construct()
    {
        $this->consumer = new HeadersFilterConsumer(
            ['priority' => PriorityMessageFilter::PRIORITY_CRITICAL],
            'all'
        );
    }

    /**
     * Prints a start-up banner and hands control to the wrapped consumer's
     * consume() loop.
     */
    public function start()
    {
        echo "关键消息处理器启动\n";
        $this->consumer->consume();
    }
}

/**
 * Consumer wrapper bound with the header filter `priority=high`
 * (x-match: all).
 */
class HighPriorityHandler
{
    /**
     * Underlying headers-exchange consumer.
     *
     * Declared explicitly: assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var HeadersFilterConsumer
     */
    private $consumer;

    public function __construct()
    {
        $this->consumer = new HeadersFilterConsumer(
            ['priority' => PriorityMessageFilter::PRIORITY_HIGH],
            'all'
        );
    }

    /**
     * Prints a start-up banner and hands control to the wrapped consumer's
     * consume() loop.
     */
    public function start()
    {
        echo "高优先级消息处理器启动\n";
        $this->consumer->consume();
    }
}

3. 地理位置消息过滤

php
<?php

/**
 * Publishes messages tagged with `region` and/or `country` headers through a
 * HeadersFilterPublisher.
 */
class GeoMessageFilter
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new HeadersFilterPublisher();
    }

    /**
     * Publishes a message carrying both `region` and `country` headers.
     *
     * @param mixed $region  Value of the `region` header.
     * @param mixed $country Value of the `country` header.
     * @param mixed $message Message content handed to the publisher.
     */
    public function sendToRegion($region, $country, $message)
    {
        $geoHeaders = ['region' => $region, 'country' => $country];

        $this->publisher->publish($message, $geoHeaders);
    }

    /**
     * Publishes a message carrying only a `country` header.
     *
     * @param mixed $country Value of the `country` header.
     * @param mixed $message Message content handed to the publisher.
     */
    public function sendToCountry($country, $message)
    {
        $geoHeaders = ['country' => $country];

        $this->publisher->publish($message, $geoHeaders);
    }
}

/**
 * Consumer wrapper bound with the header filter `region=<region>`
 * (x-match: all).
 */
class RegionConsumer
{
    /**
     * Underlying headers-exchange consumer.
     *
     * Declared explicitly: assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var HeadersFilterConsumer
     */
    private $consumer;

    /**
     * @param mixed $region Value the `region` header must have.
     */
    public function __construct($region)
    {
        $this->consumer = new HeadersFilterConsumer(
            ['region' => $region],
            'all'
        );
    }

    /**
     * Hands control to the wrapped consumer's consume() loop.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}

/**
 * Consumer wrapper bound with the header filter `country=<country>`
 * (x-match: all).
 */
class CountryConsumer
{
    /**
     * Underlying headers-exchange consumer.
     *
     * Declared explicitly: assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var HeadersFilterConsumer
     */
    private $consumer;

    /**
     * @param mixed $country Value the `country` header must have.
     */
    public function __construct($country)
    {
        $this->consumer = new HeadersFilterConsumer(
            ['country' => $country],
            'all'
        );
    }

    /**
     * Hands control to the wrapped consumer's consume() loop.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}

常见问题与解决方案

问题 1:过滤条件过于复杂导致性能问题

解决方案

php
<?php

/**
 * Equality filter over a message's JSON body with a bounded in-memory cache of
 * previous results.
 */
class OptimizedFilter
{
    private $cache = [];

    /**
     * Tells whether every condition field is set in the decoded body and
     * strictly equal to the expected value.
     *
     * @param object $message    Object exposing the raw JSON in a `body` property.
     * @param array  $conditions Field name => expected value pairs.
     *
     * @return bool True when all conditions hold (also when there are none).
     */
    public function matches($message, array $conditions)
    {
        // Decode once and reuse for both the cache key and the evaluation.
        $data = json_decode($message->body, true);
        if (!is_array($data)) {
            // Invalid JSON or a scalar body: no field can be set.
            $data = [];
        }

        $cacheKey = $this->generateCacheKey($data, $conditions);

        if (isset($this->cache[$cacheKey])) {
            return $this->cache[$cacheKey];
        }

        $result = $this->evaluateConditions($data, $conditions);

        // Bound memory use: once past 10000 entries keep only the 5000 most
        // recently inserted ones.
        if (count($this->cache) > 10000) {
            $this->cache = array_slice($this->cache, -5000, null, true);
        }

        $this->cache[$cacheKey] = $result;

        return $result;
    }

    /**
     * Builds a key from the conditions and from the message's values for
     * exactly the fields those conditions inspect.
     *
     * The key must cover every inspected field: keying on a fixed subset of
     * fields lets two messages that differ in another conditioned field share
     * a cache entry and receive the wrong result.
     */
    private function generateCacheKey(array $data, array $conditions)
    {
        $inspected = [];

        foreach (array_keys($conditions) as $field) {
            // Missing and null are equivalent for the isset()-based evaluation.
            $inspected[$field] = isset($data[$field]) ? $data[$field] : null;
        }

        // JSON keeps 1 and "1" distinct, matching the strict comparison below.
        return md5(json_encode([$conditions, $inspected]));
    }

    /**
     * Strict equality check of every condition against the decoded body.
     */
    private function evaluateConditions(array $data, array $conditions)
    {
        foreach ($conditions as $field => $value) {
            if (!isset($data[$field]) || $data[$field] !== $value) {
                return false;
            }
        }

        return true;
    }
}

问题 2:过滤条件变更需要重建队列

解决方案

php
<?php

/**
 * Swaps the set of binding keys of one queue on one exchange at runtime.
 */
class DynamicFilterManager
{
    private $connection;
    private $channel;
    private $currentFilters = [];
    private $queueName;
    private $exchangeName;

    /**
     * All arguments are optional so that existing `new DynamicFilterManager()`
     * calls keep working; without a channel, queue and exchange the manager
     * can only handle empty filter lists.
     *
     * @param object|null $channel      Channel exposing queue_bind() and queue_unbind().
     * @param string|null $queueName    Queue whose bindings are managed.
     * @param string|null $exchangeName Exchange the queue is bound to.
     * @param object|null $connection   Connection owning the channel; only stored.
     */
    public function __construct($channel = null, $queueName = null, $exchangeName = null, $connection = null)
    {
        $this->channel = $channel;
        $this->queueName = $queueName;
        $this->exchangeName = $exchangeName;
        $this->connection = $connection;
    }

    /**
     * Replaces the current binding keys with $newFilters.
     *
     * New keys are bound before stale ones are removed, so the queue is never
     * left without bindings in between, and keys present in both sets are
     * never unbound.
     *
     * @param array $newFilters Binding keys (strings) that should be active afterwards.
     */
    public function updateFilters(array $newFilters)
    {
        // Only keys that are no longer wanted need to be removed.
        $staleFilters = array_diff($this->currentFilters, $newFilters);

        $this->bindNewFilters($newFilters);
        $this->unbindOldFilters($staleFilters);
        $this->currentFilters = $newFilters;

        echo "过滤条件已更新\n";
    }

    /**
     * Unbinds each given key; a failure is logged and the remaining keys are
     * still attempted.
     */
    private function unbindOldFilters(array $filters)
    {
        foreach ($filters as $filter) {
            try {
                $this->channel->queue_unbind(
                    $this->queueName,
                    $this->exchangeName,
                    $filter
                );
            } catch (Exception $e) {
                error_log("解绑失败: " . $e->getMessage());
            }
        }
    }

    /**
     * Binds the queue to the exchange once per key; exceptions propagate to
     * the caller.
     */
    private function bindNewFilters(array $filters)
    {
        foreach ($filters as $filter) {
            $this->channel->queue_bind(
                $this->queueName,
                $this->exchangeName,
                $filter
            );
        }
    }

    /**
     * @return array Binding keys recorded by the last successful updateFilters() call.
     */
    public function getCurrentFilters()
    {
        return $this->currentFilters;
    }
}

问题 3:消息无法匹配任何过滤条件

解决方案

php
<?php

/**
 * Consumes messages that a headers exchange could not route to any queue.
 *
 * The main exchange is declared with an `alternate-exchange` argument pointing
 * at a fanout exchange; the broker re-publishes unroutable messages there, and
 * the fallback queue is bound to it. A catch-all headers binding on the main
 * exchange is not used because it would receive every message, matched or not.
 *
 * The main exchange has its own name: the `alternate-exchange` argument has to
 * be given when the exchange is declared, and re-declaring an existing
 * exchange with different arguments is rejected by the broker. Publishers must
 * therefore publish to this exchange for the fallback to apply.
 */
class FallbackFilterConsumer
{
    private $connection;
    private $channel;
    private $mainQueue;
    private $fallbackQueue;
    private $exchangeName = 'headers_filter_with_fallback_exchange';
    private $fallbackExchangeName = 'headers_filter_fallback_exchange';

    /**
     * Connects to localhost:5672 as guest/guest, opens a channel and declares
     * the exchanges and queues.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->setupQueues();
    }

    /**
     * Declares the fallback and main exchanges plus both durable queues, and
     * binds the fallback queue to the fallback exchange.
     */
    private function setupQueues()
    {
        // The alternate exchange is declared first so that it exists when the
        // main exchange starts referring to it. Fanout: every message that
        // arrives here goes to all bound queues.
        $this->channel->exchange_declare(
            $this->fallbackExchangeName,
            'fanout',
            false,
            true,
            false
        );

        // Main headers exchange: not passive, durable, not auto-deleted,
        // not internal, not nowait, with the alternate exchange attached.
        $this->channel->exchange_declare(
            $this->exchangeName,
            'headers',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(['alternate-exchange' => $this->fallbackExchangeName])
        );

        // Durable, non-exclusive, non-auto-deleted queue. It is only declared
        // here; no binding for it is created in this class.
        list($this->mainQueue, ,) = $this->channel->queue_declare(
            'main_filtered_queue',
            false,
            true,
            false,
            false
        );

        list($this->fallbackQueue, ,) = $this->channel->queue_declare(
            'fallback_queue',
            false,
            true,
            false,
            false
        );

        $this->channel->queue_bind(
            $this->fallbackQueue,
            $this->fallbackExchangeName
        );
    }

    /**
     * Registers a manual-ack callback on the fallback queue that prints,
     * logs and acknowledges each unmatched message, then waits on the channel
     * for as long as it stays open.
     */
    public function consumeFallback()
    {
        $callback = function (AMQPMessage $message) {
            echo "收到未匹配的消息: " . $message->body . "\n";

            $this->handleUnmatched($message);

            $message->ack();
        };

        $this->channel->basic_consume(
            $this->fallbackQueue,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Writes the body of an unmatched message to the PHP error log.
     */
    private function handleUnmatched(AMQPMessage $message)
    {
        error_log("未匹配消息: " . $message->body);
    }
}

最佳实践建议

1. 合理设计过滤条件

php
<?php

/**
 * Static helpers for validating a filter definition and suggesting a
 * filtering mechanism based on its size.
 */
class FilterDesignGuidelines
{
    /**
     * Validates a filter definition.
     *
     * @param array $filter Key => value pairs describing the filter.
     *
     * @return bool Always true when validation passes.
     *
     * @throws InvalidArgumentException When the filter is empty, has more than
     *                                  10 entries, has a key that is not a
     *                                  non-empty string, or has a non-scalar value.
     */
    public static function validateFilter(array $filter)
    {
        $size = count($filter);

        if ($size === 0) {
            throw new InvalidArgumentException("过滤条件不能为空");
        }

        if ($size > 10) {
            throw new InvalidArgumentException("过滤条件过多,建议不超过10个");
        }

        // Key and value are checked entry by entry, key first.
        foreach ($filter as $name => $expected) {
            $keyIsValid = is_string($name) && !empty($name);
            if (!$keyIsValid) {
                throw new InvalidArgumentException("过滤键必须是非空字符串");
            }

            if (is_scalar($expected) === false) {
                throw new InvalidArgumentException("过滤值必须是标量类型");
            }
        }

        return true;
    }

    /**
     * Suggests a mechanism from the number of conditions.
     *
     * @param array $filter Key => value pairs describing the filter.
     *
     * @return string 'direct' for exactly one entry, 'headers' for any other
     *                count up to three (including zero), 'application_layer'
     *                for more.
     */
    public static function suggestFilterType(array $filter)
    {
        $size = count($filter);

        if ($size === 1) {
            return 'direct';
        }

        return $size <= 3 ? 'headers' : 'application_layer';
    }
}

2. 过滤条件监控

php
<?php

/**
 * Collects per-filter hit/miss counters and derives match rates from them.
 */
class FilterMonitor
{
    private $stats = [];

    /**
     * Records one evaluation of a filter.
     *
     * @param string $filterName Filter the evaluation belongs to.
     * @param mixed  $matched    Truthy counts as a match, falsy as a miss.
     */
    public function recordMatch($filterName, $matched)
    {
        // Start from zeroed counters the first time a filter is seen.
        $counters = $this->stats[$filterName] ?? [
            'total' => 0,
            'matched' => 0,
            'unmatched' => 0
        ];

        $counters['total']++;
        $counters[$matched ? 'matched' : 'unmatched']++;

        $this->stats[$filterName] = $counters;
    }

    /**
     * @param string $filterName Filter to look up.
     *
     * @return int|float matched / total, or 0 when the filter has no recorded
     *                   evaluations.
     */
    public function getMatchRate($filterName)
    {
        $total = isset($this->stats[$filterName])
            ? $this->stats[$filterName]['total']
            : 0;

        return $total === 0
            ? 0
            : $this->stats[$filterName]['matched'] / $total;
    }

    /**
     * @return array Filter name => ['total', 'matched', 'unmatched'] counters.
     */
    public function getStats()
    {
        return $this->stats;
    }

    /**
     * Lists the filters whose match rate is strictly below the threshold.
     *
     * @param float $threshold Exclusive upper bound for the match rate.
     *
     * @return array Filter name => ['match_rate' => ..., 'stats' => counters].
     */
    public function getLowMatchFilters($threshold = 0.1)
    {
        $report = [];

        foreach ($this->stats as $name => $counters) {
            $rate = $this->getMatchRate($name);

            if ($rate >= $threshold) {
                continue;
            }

            $report[$name] = ['match_rate' => $rate, 'stats' => $counters];
        }

        return $report;
    }
}

3. 过滤条件版本管理

php
<?php

/**
 * Stores named filter definitions grouped by version label.
 */
class VersionedFilter
{
    private $version;
    private $filters = [];

    /**
     * @param string $version Version used when a method call does not name one.
     */
    public function __construct($version = 'v1')
    {
        $this->version = $version;
    }

    /**
     * Stores (or replaces) a filter definition.
     *
     * @param string      $name       Filter name within the version.
     * @param array       $conditions Filter definition to store.
     * @param string|null $version    Target version; null means the current one.
     */
    public function addFilter($name, array $conditions, $version = null)
    {
        $target = $version ?? $this->version;

        $versionFilters = isset($this->filters[$target]) ? $this->filters[$target] : [];
        $versionFilters[$name] = $conditions;

        $this->filters[$target] = $versionFilters;
    }

    /**
     * Fetches a stored filter definition.
     *
     * @param string      $name    Filter name within the version.
     * @param string|null $version Version to read; null means the current one.
     *
     * @return array The stored conditions.
     *
     * @throws RuntimeException When no such filter exists in that version.
     */
    public function getFilter($name, $version = null)
    {
        $version = $version ?? $this->version;

        if (isset($this->filters[$version][$name])) {
            return $this->filters[$version][$name];
        }

        throw new RuntimeException("过滤器不存在: {$name}@{$version}");
    }

    /**
     * Copies all filters of one version onto another and makes the target
     * the current version. Filters already stored under the target version
     * are replaced by the copy.
     *
     * @param string $fromVersion Version to copy from.
     * @param string $toVersion   Version to copy to and switch to.
     *
     * @throws RuntimeException When the source version has no filters.
     */
    public function migrate($fromVersion, $toVersion)
    {
        $sourceExists = isset($this->filters[$fromVersion]);

        if (!$sourceExists) {
            throw new RuntimeException("源版本不存在: {$fromVersion}");
        }

        $copied = $this->filters[$fromVersion];
        $this->filters[$toVersion] = $copied;
        $this->version = $toVersion;

        echo "过滤条件已从 " . $fromVersion . " 迁移到 " . $toVersion . "\n";
    }
}

相关链接