Appearance
消息过滤
概述
消息过滤是 RabbitMQ 中实现消息选择性消费的重要机制。通过设置过滤条件,消费者可以只接收感兴趣的消息,避免处理无关消息,提高系统效率和资源利用率。
核心知识点
消息过滤方式
RabbitMQ 提供多种消息过滤方式:
| 方式 | 说明 | 适用场景 |
|---|---|---|
| Routing Key 过滤 | 基于路由键精确或模式匹配 | 日志级别、消息类型分类 |
| Headers 过滤 | 基于消息头部属性匹配 | 复杂条件过滤 |
| Topic 模式过滤 | 基于通配符匹配 | 多维度消息分类 |
| 应用层过滤 | 消费者端代码过滤 | 业务逻辑过滤 |
过滤机制架构
mermaid
graph TB
subgraph 生产者
P[消息生产者]
end
subgraph RabbitMQ
E[Exchange]
Q1[队列1<br/>过滤条件: level=error]
Q2[队列2<br/>过滤条件: level=info]
Q3[队列3<br/>过滤条件: level=*]
end
subgraph 消费者
C1[错误日志处理器]
C2[信息日志处理器]
C3[全部日志处理器]
end
P -->|level=error| E
P -->|level=info| E
P -->|level=debug| E
E -->|匹配| Q1
E -->|匹配| Q2
E -->|匹配| Q3
Q1 --> C1
Q2 --> C2
Q3 --> C3Headers 过滤原理
mermaid
sequenceDiagram
participant P as 生产者
participant E as Headers Exchange
participant Q1 as 队列A<br/>x-match: all<br/>type=log, level=error
participant Q2 as 队列B<br/>x-match: any<br/>type=log, level=info
participant C as 消费者
Note over P: 发送消息<br/>headers: type=log, level=error
P->>E: 消息
E->>Q1: 匹配成功(all条件)
E->>Q2: 不匹配(any条件需要level=info)
Q1->>C: 消费消息
Note over P: 发送消息<br/>headers: type=log, level=info
P->>E: 消息
E->>Q1: 不匹配
E->>Q2: 匹配成功(any条件)
Q2->>C: 消费消息PHP 代码示例
基于 Headers 的消息过滤
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class HeadersFilterPublisher
{
private $connection;
private $channel;
private $exchangeName = 'headers_filter_exchange';
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'headers',
false,
true,
false
);
}
public function publish($message, array $headers)
{
$msg = new AMQPMessage(
json_encode([
'content' => $message,
'headers' => $headers,
'timestamp' => time()
]),
[
'content_type' => 'application/json',
'application_headers' => new AMQPTable($headers)
]
);
$this->channel->basic_publish(
$msg,
$this->exchangeName
);
echo "消息已发送,过滤条件: " . json_encode($headers) . "\n";
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}
class HeadersFilterConsumer
{
private $connection;
private $channel;
private $exchangeName = 'headers_filter_exchange';
private $queueName;
public function __construct(array $filterHeaders, $xMatch = 'all')
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'headers',
false,
true,
false
);
list($this->queueName, ,) = $this->channel->queue_declare(
'',
false,
false,
true,
true
);
$bindHeaders = array_merge($filterHeaders, ['x-match' => $xMatch]);
$this->channel->queue_bind(
$this->queueName,
$this->exchangeName,
'',
false,
new AMQPTable($bindHeaders)
);
echo "消费者已创建,过滤条件: " . json_encode($filterHeaders) . " (x-match: {$xMatch})\n";
}
public function consume()
{
$callback = function (AMQPMessage $message) {
$body = json_decode($message->body, true);
$headers = $message->get('application_headers');
echo sprintf(
" [%s] 收到消息: %s\n 头部: %s\n",
date('H:i:s'),
$body['content'],
json_encode($headers ? $headers->getNativeData() : [])
);
$message->ack();
};
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}基于 Topic 的消息过滤
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class TopicFilterPublisher
{
private $connection;
private $channel;
private $exchangeName = 'topic_filter_exchange';
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'topic',
false,
true,
false
);
}
public function publish($routingKey, $message)
{
$msg = new AMQPMessage(
json_encode([
'content' => $message,
'routing_key' => $routingKey,
'timestamp' => time()
]),
['content_type' => 'application/json']
);
$this->channel->basic_publish(
$msg,
$this->exchangeName,
$routingKey
);
echo "消息已发送,路由键: {$routingKey}\n";
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}
class TopicFilterConsumer
{
private $connection;
private $channel;
private $exchangeName = 'topic_filter_exchange';
private $queueName;
private $bindingKeys;
public function __construct(array $bindingKeys)
{
$this->bindingKeys = $bindingKeys;
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'topic',
false,
true,
false
);
list($this->queueName, ,) = $this->channel->queue_declare(
'',
false,
false,
true,
true
);
foreach ($bindingKeys as $bindingKey) {
$this->channel->queue_bind(
$this->queueName,
$this->exchangeName,
$bindingKey
);
}
echo "消费者已创建,绑定模式: " . implode(', ', $bindingKeys) . "\n";
}
public function consume()
{
$callback = function (AMQPMessage $message) {
$body = json_decode($message->body, true);
$routingKey = $message->delivery_info['routing_key'];
echo sprintf(
" [%s] 路由键: %s, 消息: %s\n",
date('H:i:s'),
$routingKey,
$body['content']
);
$message->ack();
};
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}完整示例:日志过滤系统
php
<?php
class LogFilterSystem
{
private $publisher;
public function __construct()
{
$this->publisher = new HeadersFilterPublisher();
}
public function sendLog($level, $source, $message, array $extra = [])
{
$headers = array_merge([
'level' => $level,
'source' => $source
], $extra);
$this->publisher->publish($message, $headers);
}
}
class ErrorLogConsumer
{
public function __construct()
{
$this->consumer = new HeadersFilterConsumer(
['level' => 'error'],
'all'
);
}
public function start()
{
echo "错误日志消费者启动,只接收 level=error 的消息\n";
$this->consumer->consume();
}
}
class SystemLogConsumer
{
public function __construct()
{
$this->consumer = new HeadersFilterConsumer(
['source' => 'system'],
'all'
);
}
public function start()
{
echo "系统日志消费者启动,只接收 source=system 的消息\n";
$this->consumer->consume();
}
}
class CriticalLogConsumer
{
public function __construct()
{
$this->consumer = new HeadersFilterConsumer(
['level' => 'error', 'priority' => 'high'],
'all'
);
}
public function start()
{
echo "关键日志消费者启动,接收 level=error 且 priority=high 的消息\n";
$this->consumer->consume();
}
}
class AnyErrorOrWarningConsumer
{
public function __construct()
{
$this->consumer = new HeadersFilterConsumer(
['level' => 'error'],
'any'
);
}
public function start()
{
echo "任意错误或警告消费者启动\n";
$this->consumer->consume();
}
}应用层消息过滤
php
<?php
class ApplicationLayerFilter
{
private $filters = [];
public function addFilter($name, callable $filter)
{
$this->filters[$name] = $filter;
}
public function matches($message, $filterName = null)
{
if ($filterName !== null) {
if (!isset($this->filters[$filterName])) {
return false;
}
return call_user_func($this->filters[$filterName], $message);
}
foreach ($this->filters as $filter) {
if (call_user_func($filter, $message)) {
return true;
}
}
return false;
}
public function filter(array $messages)
{
return array_filter($messages, function ($message) {
return $this->matches($message);
});
}
}
class SmartLogConsumer
{
private $connection;
private $channel;
private $filter;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->filter = new ApplicationLayerFilter();
$this->setupFilters();
}
private function setupFilters()
{
$this->filter->addFilter('high_priority', function ($message) {
$data = json_decode($message->body, true);
return isset($data['priority']) && $data['priority'] === 'high';
});
$this->filter->addFilter('error_only', function ($message) {
$data = json_decode($message->body, true);
return isset($data['level']) && $data['level'] === 'error';
});
$this->filter->addFilter('recent_only', function ($message) {
$data = json_decode($message->body, true);
if (!isset($data['timestamp'])) {
return false;
}
return (time() - $data['timestamp']) < 3600;
});
}
public function consume($queueName, $filterName = null)
{
$callback = function (AMQPMessage $message) use ($filterName) {
if ($this->filter->matches($message, $filterName)) {
$body = json_decode($message->body, true);
echo "处理消息: " . json_encode($body, JSON_UNESCAPED_UNICODE) . "\n";
} else {
echo "跳过不匹配的消息\n";
}
$message->ack();
};
$this->channel->basic_consume(
$queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
}复合条件过滤
php
<?php
class CompositeFilter
{
private $conditions = [];
public function addCondition($field, $operator, $value)
{
$this->conditions[] = [
'field' => $field,
'operator' => $operator,
'value' => $value
];
}
public function evaluate(array $data)
{
foreach ($this->conditions as $condition) {
if (!$this->evaluateCondition($data, $condition)) {
return false;
}
}
return true;
}
private function evaluateCondition($data, $condition)
{
$field = $condition['field'];
$operator = $condition['operator'];
$value = $condition['value'];
if (!isset($data[$field])) {
return false;
}
$fieldValue = $data[$field];
switch ($operator) {
case '=':
return $fieldValue === $value;
case '!=':
return $fieldValue !== $value;
case '>':
return $fieldValue > $value;
case '>=':
return $fieldValue >= $value;
case '<':
return $fieldValue < $value;
case '<=':
return $fieldValue <= $value;
case 'in':
return in_array($fieldValue, (array)$value);
case 'not_in':
return !in_array($fieldValue, (array)$value);
case 'contains':
return strpos($fieldValue, $value) !== false;
case 'regex':
return preg_match($value, $fieldValue) === 1;
default:
return false;
}
}
}
class FilterBuilder
{
public static function create()
{
return new CompositeFilter();
}
public static function level($level)
{
$filter = new CompositeFilter();
$filter->addCondition('level', '=', $level);
return $filter;
}
public static function levelIn(array $levels)
{
$filter = new CompositeFilter();
$filter->addCondition('level', 'in', $levels);
return $filter;
}
public static function sourceAndLevel($source, $level)
{
$filter = new CompositeFilter();
$filter->addCondition('source', '=', $source);
$filter->addCondition('level', '=', $level);
return $filter;
}
}实际应用场景
1. 多租户消息过滤
php
<?php
class TenantMessageFilter
{
private $publisher;
public function __construct()
{
$this->publisher = new HeadersFilterPublisher();
}
public function sendToTenant($tenantId, $messageType, $message)
{
$this->publisher->publish($message, [
'tenant_id' => $tenantId,
'message_type' => $messageType
]);
}
public function sendToAllTenants($messageType, $message)
{
$this->publisher->publish($message, [
'message_type' => $messageType,
'broadcast' => true
]);
}
}
class TenantConsumer
{
private $tenantId;
public function __construct($tenantId)
{
$this->tenantId = $tenantId;
$this->consumer = new HeadersFilterConsumer(
['tenant_id' => $tenantId],
'all'
);
}
public function start()
{
echo "租户 {$this->tenantId} 消费者启动\n";
$this->consumer->consume();
}
}
class BroadcastConsumer
{
public function __construct()
{
$this->consumer = new HeadersFilterConsumer(
['broadcast' => true],
'all'
);
}
public function start()
{
echo "广播消息消费者启动\n";
$this->consumer->consume();
}
}2. 优先级消息过滤
php
<?php
class PriorityMessageFilter
{
private $publisher;
const PRIORITY_LOW = 'low';
const PRIORITY_NORMAL = 'normal';
const PRIORITY_HIGH = 'high';
const PRIORITY_CRITICAL = 'critical';
public function __construct()
{
$this->publisher = new HeadersFilterPublisher();
}
public function send($message, $priority, $category = 'general')
{
$this->publisher->publish($message, [
'priority' => $priority,
'category' => $category
]);
}
public function sendCritical($message, $category = 'general')
{
$this->send($message, self::PRIORITY_CRITICAL, $category);
}
public function sendHigh($message, $category = 'general')
{
$this->send($message, self::PRIORITY_HIGH, $category);
}
}
class CriticalMessageHandler
{
public function __construct()
{
$this->consumer = new HeadersFilterConsumer(
['priority' => PriorityMessageFilter::PRIORITY_CRITICAL],
'all'
);
}
public function start()
{
echo "关键消息处理器启动\n";
$this->consumer->consume();
}
}
class HighPriorityHandler
{
public function __construct()
{
$this->consumer = new HeadersFilterConsumer(
['priority' => PriorityMessageFilter::PRIORITY_HIGH],
'all'
);
}
public function start()
{
echo "高优先级消息处理器启动\n";
$this->consumer->consume();
}
}3. 地理位置消息过滤
php
<?php
class GeoMessageFilter
{
private $publisher;
public function __construct()
{
$this->publisher = new HeadersFilterPublisher();
}
public function sendToRegion($region, $country, $message)
{
$this->publisher->publish($message, [
'region' => $region,
'country' => $country
]);
}
public function sendToCountry($country, $message)
{
$this->publisher->publish($message, [
'country' => $country
]);
}
}
class RegionConsumer
{
public function __construct($region)
{
$this->consumer = new HeadersFilterConsumer(
['region' => $region],
'all'
);
}
public function start()
{
$this->consumer->consume();
}
}
class CountryConsumer
{
public function __construct($country)
{
$this->consumer = new HeadersFilterConsumer(
['country' => $country],
'all'
);
}
public function start()
{
$this->consumer->consume();
}
}常见问题与解决方案
问题 1:过滤条件过于复杂导致性能问题
解决方案:
php
<?php
class OptimizedFilter
{
private $indexedFields = ['level', 'source', 'priority'];
private $cache = [];
public function matches($message, array $conditions)
{
$cacheKey = $this->generateCacheKey($message, $conditions);
if (isset($this->cache[$cacheKey])) {
return $this->cache[$cacheKey];
}
$result = $this->evaluateConditions($message, $conditions);
if (count($this->cache) > 10000) {
$this->cache = array_slice($this->cache, -5000, null, true);
}
$this->cache[$cacheKey] = $result;
return $result;
}
private function generateCacheKey($message, $conditions)
{
$data = json_decode($message->body, true);
$keyParts = [];
foreach ($this->indexedFields as $field) {
if (isset($data[$field])) {
$keyParts[] = "{$field}={$data[$field]}";
}
}
$keyParts[] = md5(json_encode($conditions));
return implode('|', $keyParts);
}
private function evaluateConditions($message, $conditions)
{
$data = json_decode($message->body, true);
foreach ($conditions as $field => $value) {
if (!isset($data[$field]) || $data[$field] !== $value) {
return false;
}
}
return true;
}
}问题 2:过滤条件变更需要重建队列
解决方案:
php
<?php
class DynamicFilterManager
{
private $connection;
private $channel;
private $currentFilters = [];
private $queueName;
private $exchangeName;
public function updateFilters(array $newFilters)
{
$this->unbindOldFilters();
$this->bindNewFilters($newFilters);
$this->currentFilters = $newFilters;
echo "过滤条件已更新\n";
}
private function unbindOldFilters()
{
foreach ($this->currentFilters as $filter) {
try {
$this->channel->queue_unbind(
$this->queueName,
$this->exchangeName,
$filter
);
} catch (Exception $e) {
error_log("解绑失败: " . $e->getMessage());
}
}
}
private function bindNewFilters(array $filters)
{
foreach ($filters as $filter) {
$this->channel->queue_bind(
$this->queueName,
$this->exchangeName,
$filter
);
}
}
public function getCurrentFilters()
{
return $this->currentFilters;
}
}问题 3:消息无法匹配任何过滤条件
解决方案:
php
<?php
class FallbackFilterConsumer
{
private $connection;
private $channel;
private $mainQueue;
private $fallbackQueue;
private $exchangeName;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->setupQueues();
}
private function setupQueues()
{
$this->channel->exchange_declare(
$this->exchangeName,
'headers',
false,
true,
false
);
list($this->mainQueue, ,) = $this->channel->queue_declare(
'main_filtered_queue',
false,
true,
false,
false
);
list($this->fallbackQueue, ,) = $this->channel->queue_declare(
'fallback_queue',
false,
true,
false,
false
);
$this->channel->queue_bind(
$this->fallbackQueue,
$this->exchangeName,
'',
false,
new AMQPTable(['x-match' => 'all'])
);
}
public function consumeFallback()
{
$callback = function (AMQPMessage $message) {
echo "收到未匹配的消息: " . $message->body . "\n";
$this->handleUnmatched($message);
$message->ack();
};
$this->channel->basic_consume(
$this->fallbackQueue,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
private function handleUnmatched(AMQPMessage $message)
{
error_log("未匹配消息: " . $message->body);
}
}最佳实践建议
1. 合理设计过滤条件
php
<?php
class FilterDesignGuidelines
{
public static function validateFilter(array $filter)
{
if (empty($filter)) {
throw new InvalidArgumentException("过滤条件不能为空");
}
if (count($filter) > 10) {
throw new InvalidArgumentException("过滤条件过多,建议不超过10个");
}
foreach ($filter as $key => $value) {
if (!is_string($key) || empty($key)) {
throw new InvalidArgumentException("过滤键必须是非空字符串");
}
if (!is_scalar($value)) {
throw new InvalidArgumentException("过滤值必须是标量类型");
}
}
return true;
}
public static function suggestFilterType(array $filter)
{
$count = count($filter);
if ($count === 1) {
return 'direct';
} elseif ($count <= 3) {
return 'headers';
} else {
return 'application_layer';
}
}
}2. 过滤条件监控
php
<?php
class FilterMonitor
{
private $stats = [];
public function recordMatch($filterName, $matched)
{
if (!isset($this->stats[$filterName])) {
$this->stats[$filterName] = [
'total' => 0,
'matched' => 0,
'unmatched' => 0
];
}
$this->stats[$filterName]['total']++;
if ($matched) {
$this->stats[$filterName]['matched']++;
} else {
$this->stats[$filterName]['unmatched']++;
}
}
public function getMatchRate($filterName)
{
if (!isset($this->stats[$filterName])) {
return 0;
}
$total = $this->stats[$filterName]['total'];
if ($total === 0) {
return 0;
}
return $this->stats[$filterName]['matched'] / $total;
}
public function getStats()
{
return $this->stats;
}
public function getLowMatchFilters($threshold = 0.1)
{
$lowMatch = [];
foreach ($this->stats as $filterName => $data) {
$rate = $this->getMatchRate($filterName);
if ($rate < $threshold) {
$lowMatch[$filterName] = [
'match_rate' => $rate,
'stats' => $data
];
}
}
return $lowMatch;
}
}3. 过滤条件版本管理
php
<?php
class VersionedFilter
{
private $version;
private $filters = [];
public function __construct($version = 'v1')
{
$this->version = $version;
}
public function addFilter($name, array $conditions, $version = null)
{
$version = $version ?? $this->version;
if (!isset($this->filters[$version])) {
$this->filters[$version] = [];
}
$this->filters[$version][$name] = $conditions;
}
public function getFilter($name, $version = null)
{
$version = $version ?? $this->version;
if (!isset($this->filters[$version][$name])) {
throw new RuntimeException("过滤器不存在: {$name}@{$version}");
}
return $this->filters[$version][$name];
}
public function migrate($fromVersion, $toVersion)
{
if (!isset($this->filters[$fromVersion])) {
throw new RuntimeException("源版本不存在: {$fromVersion}");
}
$this->filters[$toVersion] = $this->filters[$fromVersion];
$this->version = $toVersion;
echo "过滤条件已从 {$fromVersion} 迁移到 {$toVersion}\n";
}
}