Appearance
消息去重
概述
在分布式消息系统中,由于网络波动、服务重启等原因,消息可能会被重复发送或消费。消息去重是确保消息只被处理一次的重要机制。
核心原理
消息重复产生的原因
mermaid
graph TD
subgraph 生产者重复
P1[网络超时] --> R1[生产者重试]
P2[服务重启] --> R1
R1 --> D1[重复消息]
end
subgraph 消费者重复
C1[处理超时] --> R2[消息重新投递]
C2[确认失败] --> R2
R2 --> D2[重复消费]
end
style D1 fill:#FF6B6B
style D2 fill:#FF6B6B
去重策略
mermaid
graph LR
subgraph 去重策略
M[消息] --> ID[唯一标识]
ID --> S[存储检查]
S --> E{是否存在?}
E -->|是| SKIP[跳过处理]
E -->|否| PROC[处理消息]
PROC --> STORE[存储标识]
end
去重方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Redis SET | 性能高 | 依赖 Redis | 高并发场景 |
| 数据库唯一索引 | 可靠性高 | 性能较低 | 低并发场景 |
| 消息 ID 表 | 灵活 | 需要维护 | 通用场景 |
| Bloom Filter | 内存占用小 | 有误判率(可能把新消息误判为重复而丢弃),且无法删除元素 | 海量数据、可容忍少量误判 |
PHP 代码示例
基于 Redis 的去重
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Redis-backed consumer-side deduplicator.
 *
 * isDuplicate() atomically *claims* a message ID, so every claim must be
 * followed by markProcessed() on success or release() on failure. Without the
 * release, a failed message that is requeued would be mistaken for a duplicate
 * on redelivery and acked without ever being processed.
 */
class RedisDeduplicator
{
    private $redis;
    private $ttl;

    /**
     * @param Redis $redis phpredis client
     * @param int   $ttl   lifetime of a dedup key in seconds (default: 1 day)
     */
    public function __construct($redis, $ttl = 86400)
    {
        $this->redis = $redis;
        $this->ttl = $ttl;
    }

    /**
     * Atomically claims $messageId.
     *
     * @return bool true if the ID is already claimed/processed (duplicate),
     *              false if this call claimed it and the caller must process it
     */
    public function isDuplicate($messageId)
    {
        // SET key 1 NX EX ttl: claim and expiry in a single command, so a crash
        // between SETNX and EXPIRE can no longer leave a key that never expires.
        $claimed = $this->redis->set($this->key($messageId), 1, ['nx', 'ex' => $this->ttl]);

        return !$claimed;
    }

    /**
     * Marks $messageId as processed, (re)setting the key's TTL.
     */
    public function markProcessed($messageId)
    {
        $this->redis->setex($this->key($messageId), $this->ttl, 1);
    }

    /**
     * Drops the claim taken by isDuplicate() so that a redelivered copy of a
     * message whose processing failed is handled again instead of skipped.
     */
    public function release($messageId)
    {
        $this->redis->del($this->key($messageId));
    }

    /** Redis key used for $messageId. */
    private function key($messageId)
    {
        return "msg:dedup:{$messageId}";
    }
}

// Usage example
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$deduplicator = new RedisDeduplicator($redis);

$queueName = 'dedup-queue';
$channel->queue_declare($queueName, false, true, false, false);

$callback = function (AMQPMessage $msg) use ($deduplicator) {
    $data = json_decode($msg->getBody(), true);
    $messageId = $data['message_id'] ?? null;

    if (!$messageId) {
        echo "消息缺少 ID,跳过\n";
        $msg->ack();
        return;
    }

    // Claims the ID; the claim must be released if processing fails below.
    if ($deduplicator->isDuplicate($messageId)) {
        echo "重复消息,跳过: {$messageId}\n";
        $msg->ack();
        return;
    }

    try {
        processMessage($data);
        $deduplicator->markProcessed($messageId);
        $msg->ack();
        echo "消息处理成功: {$messageId}\n";
    } catch (Throwable $e) {
        // Release before requeueing; otherwise the redelivered message would be
        // acked as a "duplicate" and silently lost.
        $deduplicator->release($messageId);
        echo "处理失败: " . $e->getMessage() . "\n";
        $msg->nack(true);
    }
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

/**
 * Business handler placeholder; throwing makes the consumer release the claim and requeue.
 */
function processMessage($data)
{
    // Business logic goes here
}

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();
基于数据库的去重
php
<?php
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Database-backed deduplicator relying on the PRIMARY KEY on message_id.
 *
 * markProcessed() doubles as an atomic claim: the INSERT fails with an
 * integrity-constraint violation (SQLSTATE 23000) when the ID already exists.
 * The DDL and cleanup() use MySQL syntax (TINYINT, DATE_SUB).
 */
class DatabaseDeduplicator
{
    private $pdo;
    private $tableName;

    /**
     * @param PDO    $pdo       connection; expected to use PDO::ERRMODE_EXCEPTION (the PHP 8 default)
     * @param string $tableName table name; it is interpolated into SQL, so only plain identifiers are accepted
     * @throws InvalidArgumentException if $tableName is not a plain identifier
     */
    public function __construct($pdo, $tableName = 'message_dedup')
    {
        // Identifiers cannot be bound as parameters, so validate before interpolating.
        if (!is_string($tableName) || preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', $tableName) !== 1) {
            throw new InvalidArgumentException('Invalid dedup table name');
        }

        $this->pdo = $pdo;
        $this->tableName = $tableName;
        $this->createTable();
    }

    /** Creates the dedup table if it does not exist yet. */
    private function createTable()
    {
        $sql = "CREATE TABLE IF NOT EXISTS {$this->tableName} (
            message_id VARCHAR(128) PRIMARY KEY,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            status TINYINT DEFAULT 1
        )";
        $this->pdo->exec($sql);
    }

    /**
     * @return bool true if $messageId has already been recorded
     */
    public function isDuplicate($messageId)
    {
        $stmt = $this->pdo->prepare("SELECT 1 FROM {$this->tableName} WHERE message_id = ? LIMIT 1");
        $stmt->execute([$messageId]);

        return $stmt->fetchColumn() !== false;
    }

    /**
     * Records $messageId.
     *
     * @return bool true if the ID was inserted, false if it already existed
     * @throws PDOException for any database error other than a duplicate key
     */
    public function markProcessed($messageId)
    {
        try {
            $stmt = $this->pdo->prepare("INSERT INTO {$this->tableName} (message_id) VALUES (?)");
            $stmt->execute([$messageId]);
            return true;
        } catch (PDOException $e) {
            // SQLSTATE 23000 = integrity constraint violation, i.e. the key exists.
            // getCode() returns the SQLSTATE as a string, so compare strictly as a string.
            if ((string) $e->getCode() === '23000') {
                return false;
            }
            throw $e;
        }
    }

    /**
     * Deletes records older than $days days.
     *
     * @param int $days retention period; negative values are treated as 0
     * @return int number of rows deleted
     */
    public function cleanup($days = 7)
    {
        $sql = "DELETE FROM {$this->tableName} WHERE created_at < DATE_SUB(NOW(), INTERVAL ? DAY)";
        $stmt = $this->pdo->prepare($sql);
        // A negative interval would move the cut-off into the future and wipe the table.
        $stmt->execute([max(0, (int) $days)]);

        return $stmt->rowCount();
    }
}
生产者端去重
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Publisher that suppresses re-sending the same message ID for 24 hours.
 *
 * Producer-side dedup only works when a retry reuses the same ID. Either pass
 * an explicit $messageId, or rely on the content-derived default, where
 * identical payloads map to the same ID. To deliberately send an identical
 * payload twice, pass distinct explicit IDs.
 */
class DedupMessageProducer
{
    /** Seconds a "sent" marker is kept in Redis. */
    private const SENT_TTL = 86400;

    private $channel;
    private $redis;

    public function __construct($channel, $redis)
    {
        $this->channel = $channel;
        $this->redis = $redis;
    }

    /**
     * Publishes $data as a persistent JSON message unless $messageId was already sent.
     *
     * @return string the message ID used (also returned when the send was skipped)
     * @throws Throwable whatever basic_publish() throws; the sent marker is removed first
     */
    public function sendWithDedup($exchange, $routingKey, $data, $messageId = null)
    {
        $messageId = $messageId ?? $this->generateMessageId($data);
        $key = "msg:sent:{$messageId}";

        // Atomic claim (SET NX EX) instead of EXISTS followed by SETEX, so two
        // concurrent senders cannot both pass the check and publish the same ID.
        if (!$this->redis->set($key, 1, ['nx', 'ex' => self::SENT_TTL])) {
            echo "消息已发送过,跳过: {$messageId}\n";
            return $messageId;
        }

        $message = new AMQPMessage(
            json_encode(array_merge($data, ['message_id' => $messageId])),
            [
                'content_type' => 'application/json',
                'message_id' => $messageId,
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );

        try {
            $this->channel->basic_publish($message, $exchange, $routingKey);
        } catch (Throwable $e) {
            // Publishing failed: drop the marker so that a retry can send it.
            $this->redis->del($key);
            throw $e;
        }

        echo "消息已发送: {$messageId}\n";
        return $messageId;
    }

    /**
     * Derives a deterministic ID from the payload so that retries of the same
     * payload get the same ID. The former microtime suffix made every call
     * unique, which defeated the dedup check above.
     */
    private function generateMessageId($data)
    {
        return md5(json_encode($data));
    }
}
完整的去重解决方案
php
<?php
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Two-layer (Redis + database) consumer-side dedup around a processor callable.
 *
 * Config keys:
 *  - redis_ttl        seconds a processed marker lives in Redis
 *  - db_cleanup_days  not read by this class (kept for callers that read the config)
 *  - use_double_check when true, a Redis miss is confirmed against the
 *                     message_dedup table, so dedup survives Redis eviction or restarts
 */
class MessageDeduplicationService
{
    private $redis;
    private $pdo;
    private $config;

    public function __construct($redis, $pdo, $config = [])
    {
        $this->redis = $redis;
        $this->pdo = $pdo;
        $this->config = array_merge([
            'redis_ttl' => 86400,
            'db_cleanup_days' => 7,
            'use_double_check' => true,
        ], $config);
    }

    /**
     * Runs $processor($data) once per message ID, then acks or nacks $msg.
     *
     * @return mixed false for a duplicate (acked without processing), otherwise the processor's result
     * @throws InvalidArgumentException when no ID can be derived (the message is left unacked)
     * @throws Exception rethrown from $processor after the message is nacked with requeue
     */
    public function checkAndProcess(AMQPMessage $msg, callable $processor)
    {
        $data = json_decode($msg->getBody(), true);
        $messageId = $this->extractMessageId($msg, $data);

        if (!$messageId) {
            throw new InvalidArgumentException('消息缺少唯一标识');
        }

        // Layer 1: fast Redis lookup.
        if ($this->isInRedis($messageId)) {
            echo "[Redis] 重复消息: {$messageId}\n";
            $msg->ack();
            return false;
        }

        // Layer 2: authoritative database lookup when Redis has no marker.
        if ($this->config['use_double_check'] && $this->isInDatabase($messageId)) {
            // Redis lost the marker (TTL, eviction, restart); restore it for the next lookup.
            $this->markInRedis($messageId);
            echo "[DB] 重复消息: {$messageId}\n";
            $msg->ack();
            return false;
        }

        try {
            $result = $processor($data);

            // Mark as processed (Redis first, then the database).
            $this->markInRedis($messageId);
            $this->markInDatabase($messageId);

            $msg->ack();
            echo "消息处理成功: {$messageId}\n";
            return $result;
        } catch (Exception $e) {
            echo "处理失败: " . $e->getMessage() . "\n";
            $msg->nack(true);
            throw $e;
        }
    }

    /**
     * Derives the dedup ID: the message_id property, then body message_id, then "order:{order_id}".
     */
    private function extractMessageId($msg, $data)
    {
        // AMQPMessage::get() throws OutOfBoundsException for an unset property,
        // so check has() first to let the body/business-ID fallbacks run.
        $messageId = $msg->has('message_id') ? $msg->get('message_id') : null;

        if (!$messageId && isset($data['message_id'])) {
            $messageId = $data['message_id'];
        }

        if (!$messageId && isset($data['order_id'])) {
            $messageId = "order:{$data['order_id']}";
        }

        return $messageId;
    }

    private function isInRedis($messageId)
    {
        return (bool) $this->redis->exists("msg:processed:{$messageId}");
    }

    private function isInDatabase($messageId)
    {
        $stmt = $this->pdo->prepare('SELECT 1 FROM message_dedup WHERE message_id = ? LIMIT 1');
        $stmt->execute([$messageId]);

        return $stmt->fetchColumn() !== false;
    }

    private function markInRedis($messageId)
    {
        $this->redis->setex(
            "msg:processed:{$messageId}",
            $this->config['redis_ttl'],
            1
        );
    }

    private function markInDatabase($messageId)
    {
        try {
            $stmt = $this->pdo->prepare('INSERT INTO message_dedup (message_id) VALUES (?)');
            $stmt->execute([$messageId]);
        } catch (PDOException $e) {
            // SQLSTATE 23000 (duplicate key) means the row already exists: fine.
            if ((string) $e->getCode() !== '23000') {
                throw $e;
            }
        }
    }
}
实际应用场景
1. 订单处理去重
php
<?php
/**
 * Consumer for order messages.
 *
 * Message-level dedup is delegated to MessageDeduplicationService. A
 * business-level idempotency check on the order status is added on top.
 */
class OrderDeduplicationHandler
{
    private $dedupService;

    public function __construct($redis, $pdo)
    {
        $this->dedupService = new MessageDeduplicationService($redis, $pdo);
    }

    /**
     * Processes a pending order at most once.
     *
     * The type is fully qualified because this snippet has no `use` import for
     * AMQPMessage; a bare `AMQPMessage` would resolve to a non-existent global class.
     *
     * @return mixed whatever MessageDeduplicationService::checkAndProcess() returns
     */
    public function handleOrder(\PhpAmqpLib\Message\AMQPMessage $msg)
    {
        return $this->dedupService->checkAndProcess($msg, function ($data) {
            $orderId = $data['order_id'] ?? null;
            if ($orderId === null) {
                // A retry cannot fix this; returning lets the message be acked
                // instead of being requeued forever.
                echo "消息缺少 order_id,跳过\n";
                return;
            }

            $order = $this->getOrder($orderId);
            if (!is_array($order)) {
                echo "订单 {$orderId} 不存在\n";
                return;
            }

            // Business-level idempotency: only pending orders are processed.
            if ($order['status'] !== 'pending') {
                echo "订单 {$orderId} 已处理,状态: {$order['status']}\n";
                return;
            }

            $this->processOrder($order);
        });
    }

    /**
     * Loads an order (placeholder).
     *
     * Presumably returns an array with a 'status' key, or null when not found; confirm against the real implementation.
     */
    private function getOrder($orderId)
    {
        // Load the order
    }

    private function processOrder($order)
    {
        // Order processing logic
    }
}
2. 支付回调去重
php
<?php
/**
 * Idempotent payment-callback handler.
 *
 * The first callback for a payment ID is processed. Later callbacks return the
 * record stored by the first one.
 */
class PaymentCallbackDeduplication
{
    /** Seconds a processed-callback record is kept (7 days). */
    private const TTL = 86400 * 7;

    private $redis;

    public function __construct($redis)
    {
        $this->redis = $redis;
    }

    /**
     * @return mixed processPayment()'s result for the first callback; for a repeat,
     *               the JSON record {"data": ..., "time": ...} stored by the first one
     * @throws Throwable from processPayment(), after the record is removed so that
     *                   the provider's retry is processed again
     */
    public function handleCallback($paymentId, $callbackData)
    {
        $messageId = "payment:callback:{$paymentId}";
        $key = "payment:callback:processed:{$messageId}";

        // SET NX EX claims the callback and sets its expiry in one command; with
        // SETNX + EXPIRE, a crash in between left a key that never expired.
        $claimed = $this->redis->set($key, json_encode([
            'data' => $callbackData,
            'time' => time(),
        ]), ['nx', 'ex' => self::TTL]);

        if (!$claimed) {
            echo "支付回调已处理: {$paymentId}\n";
            return $this->redis->get($key);
        }

        try {
            return $this->processPayment($paymentId, $callbackData);
        } catch (Throwable $e) {
            // Without this, a failed callback would be reported as "already
            // processed" for 7 days and the payment would never be applied.
            $this->redis->del($key);
            throw $e;
        }
    }

    private function processPayment($paymentId, $data)
    {
        // Payment processing logic
    }
}
3. 分布式任务去重
php
<?php
/**
 * Submits a task at most once per TTL window, keyed by a hash of (type, data).
 */
class DistributedTaskDeduplication
{
    private $redis;
    private $ttl;

    /**
     * @param Redis $redis phpredis client
     * @param int   $ttl   seconds during which an identical task is rejected (default: 1 hour)
     */
    public function __construct($redis, $ttl = 3600)
    {
        $this->redis = $redis;
        $this->ttl = $ttl;
    }

    /**
     * @return mixed doSubmitTask()'s result, or null when the same task was already submitted within the TTL
     * @throws Throwable from doSubmitTask(), after the marker is removed so the task can be resubmitted
     */
    public function submitTask($taskType, $taskData)
    {
        $taskId = $this->generateTaskId($taskType, $taskData);
        $key = "task:submitted:{$taskId}";

        // Check-and-set inside one Lua script, which Redis executes atomically.
        $lua = "
            if redis.call('exists', KEYS[1]) == 1 then
                return 0
            end
            redis.call('setex', KEYS[1], ARGV[1], ARGV[2])
            return 1
        ";
        $result = $this->redis->eval($lua, [$key, $this->ttl, json_encode($taskData)], 1);

        if (!$result) {
            echo "任务已存在: {$taskId}\n";
            return null;
        }

        try {
            return $this->doSubmitTask($taskId, $taskType, $taskData);
        } catch (Throwable $e) {
            // Without this, a failed submission would block resubmission for the whole TTL.
            $this->redis->del($key);
            throw $e;
        }
    }

    /** Deterministic task ID: identical type + data map to the same ID. */
    private function generateTaskId($taskType, $taskData)
    {
        return md5($taskType . ':' . json_encode($taskData));
    }

    private function doSubmitTask($taskId, $taskType, $taskData)
    {
        // Actually submit the task
    }
}
常见问题与解决方案
问题 1: Redis 宕机导致去重失效
解决方案:
php
<?php
// 使用双重检查:Redis + 数据库
class ReliableDeduplicator
{
    private $redis;
    private $pdo;
    private $ttl;

    /**
     * @param Redis $redis phpredis client (fast layer)
     * @param PDO   $pdo   connection holding the message_dedup table (authoritative layer)
     * @param int   $ttl   seconds a Redis marker restored from the database lives
     */
    public function __construct($redis, $pdo, $ttl = 86400)
    {
        $this->redis = $redis;
        $this->pdo = $pdo;
        $this->ttl = $ttl;
    }

    /**
     * @return bool true if $messageId is recorded in Redis or, failing that, in the database
     */
    public function isDuplicate($messageId)
    {
        // Layer 1: fast Redis lookup; a Redis failure degrades to the database check.
        try {
            if ($this->redis->exists("msg:{$messageId}")) {
                return true;
            }
        } catch (Exception $e) {
            error_log("Redis 异常: " . $e->getMessage());
        }

        // Layer 2: authoritative database lookup.
        if (!$this->isInDatabase($messageId)) {
            return false;
        }

        // Backfill Redis so the next lookup is served by the fast layer (best effort).
        try {
            $this->redis->setex("msg:{$messageId}", $this->ttl, 1);
        } catch (Exception $e) {
            error_log("Redis 异常: " . $e->getMessage());
        }

        return true;
    }

    /** @return bool true if message_dedup contains $messageId */
    protected function isInDatabase($messageId)
    {
        $stmt = $this->pdo->prepare('SELECT 1 FROM message_dedup WHERE message_id = ? LIMIT 1');
        $stmt->execute([$messageId]);

        return $stmt->fetchColumn() !== false;
    }
}
问题 2: 消息 ID 生成重复
解决方案:
php
<?php
// 使用更可靠的 ID 生成方式
/**
 * Builds message IDs for deduplication.
 */
class MessageIdGenerator
{
    /**
     * Generates a unique ID of the form timestamp-host-pid-uniqid-random[-payloadHash].
     *
     * The random segment keeps IDs unique even when hostname and PID repeat,
     * e.g. containers that all run as PID 1 under the same hostname.
     *
     * @param mixed $data optional payload; when truthy, its md5 is appended
     */
    public static function generate($data = null)
    {
        $parts = [
            date('YmdHis'),
            gethostname(),
            getmypid(),
            uniqid(),
            bin2hex(random_bytes(8)),
            $data ? md5(json_encode($data)) : '',
        ];

        // Drop only missing parts (gethostname()/getmypid() may return false and
        // the hash slot may be ''); a bare array_filter() would also drop "0".
        return implode('-', array_filter($parts, static function ($part) {
            return $part !== false && $part !== '';
        }));
    }

    /**
     * Deterministic business ID, e.g. ('order', 42, 'pay') => 'order:42:pay'.
     * Repeating the same action on the same entity yields the same ID, which is
     * what makes it deduplicable.
     */
    public static function generateFromBusiness($entity, $id, $action)
    {
        return "{$entity}:{$id}:{$action}";
    }
}
问题 3: 去重数据膨胀
解决方案:
php
<?php
// 定期清理过期数据
/**
 * Purges old rows from the message_dedup table.
 *
 * Redis markers are not touched here; they expire on their own via TTL.
 */
class DedupCleanupService
{
    private $pdo;
    private $days;
    private $batchSize;

    /**
     * @param PDO $pdo       MySQL connection (the query uses DATE_SUB and DELETE ... LIMIT)
     * @param int $days      rows older than this many days are deleted (negative treated as 0)
     * @param int $batchSize maximum rows deleted per statement (at least 1)
     */
    public function __construct($pdo, $days = 7, $batchSize = 1000)
    {
        $this->pdo = $pdo;
        $this->days = max(0, (int) $days);
        $this->batchSize = max(1, (int) $batchSize);
    }

    /**
     * Deletes expired rows in batches, so that one huge DELETE does not hold locks for a long time.
     *
     * @return int total number of rows deleted
     */
    public function cleanup()
    {
        // LIMIT is interpolated as a cast int: with emulated prepares a bound value
        // would be sent quoted ('1000'), which MySQL rejects in a LIMIT clause.
        $stmt = $this->pdo->prepare(
            'DELETE FROM message_dedup WHERE created_at < DATE_SUB(NOW(), INTERVAL ? DAY) LIMIT ' . $this->batchSize
        );

        $total = 0;
        do {
            $stmt->execute([$this->days]);
            $deleted = $stmt->rowCount();
            $total += $deleted;
        } while ($deleted >= $this->batchSize);

        return $total;
    }
}
最佳实践建议
- 生成唯一 ID: 每条消息都有唯一标识
- 双重检查: Redis + 数据库确保可靠性
- 设置过期: 定期清理去重数据
- 幂等设计: 业务逻辑本身支持幂等
- 监控告警: 监控重复消息比例
