Skip to content

消息去重

概述

在分布式消息系统中,由于网络波动、服务重启等原因,消息可能会被重复发送或消费。消息去重是确保消息只被处理一次的重要机制。

核心原理

消息重复产生的原因

```mermaid
graph TD
    subgraph 生产者重复
        P1[网络超时] --> R1[生产者重试]
        P2[服务重启] --> R1
        R1 --> D1[重复消息]
    end

    subgraph 消费者重复
        C1[处理超时] --> R2[消息重新投递]
        C2[确认失败] --> R2
        R2 --> D2[重复消费]
    end

    style D1 fill:#FF6B6B
    style D2 fill:#FF6B6B
```

去重策略

```mermaid
graph LR
    subgraph 去重策略
        M[消息] --> ID[唯一标识]
        ID --> S[存储检查]
        S --> E{是否存在?}
        E -->|是| SKIP[跳过处理]
        E -->|否| PROC[处理消息]
        PROC --> STORE[存储标识]
    end
```

去重方案对比

| 方案 | 优点 | 缺点 | 适用场景 |
|------|------|------|----------|
| Redis SET | 性能高 | 依赖 Redis | 高并发场景 |
| 数据库唯一索引 | 可靠性高 | 性能较低 | 低并发场景 |
| 消息 ID 表 | 灵活 | 需要维护 | 通用场景 |
| Bloom Filter | 内存占用小 | 有误判率 | 海量数据 |

PHP 代码示例

基于 Redis 的去重

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Consumer-side deduplicator backed by Redis.
 *
 * isDuplicate() atomically claims a message ID. The same key later serves as the
 * "processed" marker. If processing fails, the claim must be dropped with release().
 * Otherwise the requeued copy hits the claim, is acked as a duplicate, and is lost.
 */
class RedisDeduplicator
{
    private $redis;
    private $ttl;

    /**
     * @param Redis $redis connected phpredis client
     * @param int   $ttl   lifetime of a dedup key, in seconds
     */
    public function __construct($redis, $ttl = 86400)
    {
        $this->redis = $redis;
        $this->ttl = $ttl;
    }

    /**
     * Try to claim $messageId.
     *
     * @return bool true if the ID was already claimed/processed (duplicate),
     *              false if this call claimed it
     */
    public function isDuplicate($messageId)
    {
        // SET ... NX EX writes the value and its expiry in one atomic command. The
        // previous SETNX + EXPIRE pair could leave a key with no TTL if the process
        // died between the two calls.
        $claimed = $this->redis->set($this->key($messageId), 1, ['nx', 'ex' => $this->ttl]);

        return !$claimed;
    }

    /**
     * Record $messageId as processed, refreshing the key's TTL.
     */
    public function markProcessed($messageId)
    {
        $this->redis->setex($this->key($messageId), $this->ttl, 1);
    }

    /**
     * Drop a claim taken by isDuplicate() so that a redelivery can be processed.
     */
    public function release($messageId)
    {
        $this->redis->del($this->key($messageId));
    }

    private function key($messageId)
    {
        return "msg:dedup:{$messageId}";
    }
}

function processMessage($data)
{
    // Business logic goes here.
}

// Usage example
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$deduplicator = new RedisDeduplicator($redis);

$queueName = 'dedup-queue';
$channel->queue_declare($queueName, false, true, false, false);

$callback = function (AMQPMessage $msg) use ($deduplicator) {
    $data = json_decode($msg->getBody(), true);
    $messageId = $data['message_id'] ?? null;

    if (!$messageId) {
        echo "消息缺少 ID,跳过\n";
        $msg->ack();
        return;
    }

    // Atomically claim the ID; an already-claimed ID means a duplicate delivery.
    if ($deduplicator->isDuplicate($messageId)) {
        echo "重复消息,跳过: {$messageId}\n";
        $msg->ack();
        return;
    }

    try {
        processMessage($data);

        // Refresh the marker now that processing has succeeded.
        $deduplicator->markProcessed($messageId);

        $msg->ack();
        echo "消息处理成功: {$messageId}\n";
    } catch (Throwable $e) {
        // Release the claim before requeueing. Otherwise the redelivered copy would
        // be acked as a "duplicate" without ever being processed.
        $deduplicator->release($messageId);

        echo "处理失败: " . $e->getMessage() . "\n";
        // NOTE: nack(true) requeues; a message that always fails will loop forever.
        // Consider a dead-letter exchange or a retry counter in production.
        $msg->nack(true);
    }
};

// Prefetch one message at a time.
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

基于数据库的去重

php
<?php

use PhpAmqpLib\Message\AMQPMessage;

/**
 * Deduplicator backed by a relational table keyed on message_id.
 *
 * The table is created on construction when missing. markProcessed() leans on the
 * primary key: inserting an ID that is already stored reports false instead of throwing.
 */
class DatabaseDeduplicator
{
    private $pdo;
    private $tableName;

    /**
     * @param PDO    $pdo       connection used for all statements
     * @param string $tableName dedup table name (interpolated into SQL; must be trusted)
     */
    public function __construct($pdo, $tableName = 'message_dedup')
    {
        $this->pdo = $pdo;
        $this->tableName = $tableName;
        $this->createTable();
    }

    private function createTable()
    {
        $ddl = "CREATE TABLE IF NOT EXISTS {$this->tableName} (
            message_id VARCHAR(128) PRIMARY KEY,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            status TINYINT DEFAULT 1
        )";

        $this->pdo->exec($ddl);
    }

    /**
     * @return bool true when a row for $messageId already exists
     */
    public function isDuplicate($messageId)
    {
        $statement = $this->pdo->prepare("SELECT COUNT(*) FROM {$this->tableName} WHERE message_id = ?");
        $statement->execute([$messageId]);
        $rowCount = $statement->fetchColumn();

        return $rowCount > 0;
    }

    /**
     * Insert $messageId.
     *
     * @return bool true if inserted, false if it was already present (SQLSTATE 23000)
     * @throws PDOException for any other database error
     */
    public function markProcessed($messageId)
    {
        $insert = "INSERT INTO {$this->tableName} (message_id) VALUES (?)";

        try {
            $this->pdo->prepare($insert)->execute([$messageId]);
        } catch (PDOException $e) {
            // SQLSTATE 23000 is an integrity-constraint violation: the ID is already stored.
            if ($e->getCode() != 23000) {
                throw $e;
            }
            return false;
        }

        return true;
    }

    /**
     * Delete rows older than $days days (uses MySQL DATE_SUB syntax).
     */
    public function cleanup($days = 7)
    {
        $this->pdo
            ->prepare("DELETE FROM {$this->tableName} WHERE created_at < DATE_SUB(NOW(), INTERVAL ? DAY)")
            ->execute([$days]);
    }
}

生产者端去重

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Producer that refuses to publish the same message ID twice within 24 hours.
 *
 * Suppression only helps retries that pass the same $messageId. When no ID is given,
 * a fresh random one is generated on every call.
 */
class DedupMessageProducer
{
    private $channel;
    private $redis;

    /**
     * @param object $channel php-amqplib channel (basic_publish is called on it)
     * @param Redis  $redis   phpredis client used for the "sent" markers
     */
    public function __construct($channel, $redis)
    {
        $this->channel = $channel;
        $this->redis = $redis;
    }

    /**
     * Publish $data (with message_id merged in) unless $messageId was already sent.
     *
     * @return string the message ID that was published, or that had been published before
     * @throws JsonException when $data cannot be JSON-encoded
     * @throws Throwable     rethrown from basic_publish after the claim is released
     */
    public function sendWithDedup($exchange, $routingKey, $data, $messageId = null)
    {
        $messageId = $messageId ?? $this->generateMessageId($data);
        $key = "msg:sent:{$messageId}";

        // Claim the ID atomically *before* publishing. The previous EXISTS check plus
        // SETEX after publishing let two concurrent senders both publish the message.
        if (!$this->redis->set($key, 1, ['nx', 'ex' => 86400])) {
            echo "消息已发送过,跳过: {$messageId}\n";
            return $messageId;
        }

        try {
            $message = new AMQPMessage(
                // Throw instead of silently publishing the body "false" on encode failure.
                json_encode(array_merge($data, ['message_id' => $messageId]), JSON_THROW_ON_ERROR),
                [
                    'content_type' => 'application/json',
                    'message_id' => $messageId,
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
                ]
            );

            // NOTE: without publisher confirms, basic_publish returning does not prove
            // the broker stored the message.
            $this->channel->basic_publish($message, $exchange, $routingKey);
        } catch (Throwable $e) {
            // Nothing was published: drop the claim so a retry with the same ID can go through.
            $this->redis->del($key);
            throw $e;
        }

        echo "消息已发送: {$messageId}\n";

        return $messageId;
    }

    /**
     * Generate a random 32-char hex ID (same shape as the previous md5 value).
     *
     * The old md5(json_encode($data) . microtime(true)) collided for identical payloads
     * sent within ~100µs. Float-to-string keeps only 14 significant digits, so the
     * second message was silently skipped as "already sent".
     */
    private function generateMessageId($data)
    {
        return bin2hex(random_bytes(16));
    }
}

完整的去重解决方案

php
<?php

use PhpAmqpLib\Message\AMQPMessage;

/**
 * Two-tier consumer-side deduplication.
 *
 * A Redis marker ("msg:processed:{id}") gives a fast check. A row in the
 * `message_dedup` table is the durable record and is consulted when Redis has
 * no marker, provided use_double_check is enabled.
 */
class MessageDeduplicationService
{
    private $redis;
    private $pdo;
    private $config;

    /**
     * @param Redis $redis  phpredis client
     * @param PDO   $pdo    connection holding the `message_dedup` table
     * @param array $config redis_ttl (seconds); db_cleanup_days (not used by this class);
     *                      use_double_check (fall back to the database on a Redis miss)
     */
    public function __construct($redis, $pdo, $config = [])
    {
        $this->redis = $redis;
        $this->pdo = $pdo;
        $this->config = array_merge([
            'redis_ttl' => 86400,
            'db_cleanup_days' => 7,
            'use_double_check' => true
        ], $config);
    }

    /**
     * Run $processor on the decoded message body unless its ID was already handled.
     *
     * @return mixed false for a duplicate (message acked), otherwise $processor's result
     * @throws InvalidArgumentException when no ID can be derived from the message
     * @throws Exception rethrown from processing/marking after the message is nacked with requeue
     */
    public function checkAndProcess(AMQPMessage $msg, callable $processor)
    {
        $data = json_decode($msg->getBody(), true);
        $messageId = $this->extractMessageId($msg, $data);

        if (!$messageId) {
            throw new InvalidArgumentException('消息缺少唯一标识');
        }

        // Layer 1: fast Redis check.
        if ($this->isInRedis($messageId)) {
            echo "[Redis] 重复消息: {$messageId}\n";
            $msg->ack();
            return false;
        }

        // Layer 2: durable check. This covers Redis markers that expired or were lost.
        if ($this->config['use_double_check'] && $this->isInDatabase($messageId)) {
            // Backfill so the next duplicate is caught by the cheaper Redis check.
            $this->markInRedis($messageId);
            echo "[DB] 重复消息: {$messageId}\n";
            $msg->ack();
            return false;
        }

        try {
            $result = $processor($data);

            // Mark as processed (Redis first, then database).
            $this->markInRedis($messageId);
            $this->markInDatabase($messageId);

            $msg->ack();
            echo "消息处理成功: {$messageId}\n";

            return $result;
        } catch (Exception $e) {
            echo "处理失败: " . $e->getMessage() . "\n";
            $msg->nack(true);
            throw $e;
        }
    }

    /**
     * Derive the dedup ID: message_id property, then body message_id, then body order_id.
     */
    private function extractMessageId($msg, $data)
    {
        // php-amqplib's get() throws OutOfBoundsException for a missing property,
        // which made the fallbacks below unreachable. Check with has() first.
        $messageId = $msg->has('message_id') ? $msg->get('message_id') : null;

        if (!$messageId && isset($data['message_id'])) {
            $messageId = $data['message_id'];
        }

        if (!$messageId && isset($data['order_id'])) {
            $messageId = "order:{$data['order_id']}";
        }

        return $messageId;
    }

    private function isInRedis($messageId)
    {
        return $this->redis->exists("msg:processed:{$messageId}");
    }

    private function isInDatabase($messageId)
    {
        $stmt = $this->pdo->prepare('SELECT 1 FROM message_dedup WHERE message_id = ?');
        $stmt->execute([$messageId]);

        return $stmt->fetchColumn() !== false;
    }

    private function markInRedis($messageId)
    {
        $this->redis->setex(
            "msg:processed:{$messageId}",
            $this->config['redis_ttl'],
            1
        );
    }

    private function markInDatabase($messageId)
    {
        try {
            $sql = "INSERT INTO message_dedup (message_id) VALUES (?)";
            $stmt = $this->pdo->prepare($sql);
            $stmt->execute([$messageId]);
        } catch (PDOException $e) {
            // A unique-key conflict (SQLSTATE 23000) means the row already exists; ignore it.
            if ((string) $e->getCode() !== '23000') {
                throw $e;
            }
        }
    }
}

实际应用场景

1. 订单处理去重

php
<?php

/**
 * Order consumer that dedups by message ID and additionally guards on order status,
 * so the business step stays idempotent even if the dedup marker is missing.
 */
class OrderDeduplicationHandler
{
    private $dedupService;

    public function __construct($redis, $pdo)
    {
        $this->dedupService = new MessageDeduplicationService($redis, $pdo);
    }

    /**
     * Handle one order message.
     *
     * The type is fully qualified because this snippet has no `use` import for it;
     * an unqualified `AMQPMessage` would resolve to the global namespace.
     *
     * @return mixed the value returned by MessageDeduplicationService::checkAndProcess()
     */
    public function handleOrder(\PhpAmqpLib\Message\AMQPMessage $msg)
    {
        return $this->dedupService->checkAndProcess($msg, function ($data) {
            $orderId = $data['order_id'] ?? null;

            // Re-check order state so processing is idempotent on its own.
            $order = $orderId !== null ? $this->getOrder($orderId) : null;

            if ($order === null) {
                // Avoid "array offset on null" warnings for unknown/missing orders.
                echo "订单 {$orderId} 不存在,跳过\n";
                return;
            }

            if ($order['status'] !== 'pending') {
                echo "订单 {$orderId} 已处理,状态: {$order['status']}\n";
                return;
            }

            $this->processOrder($order);
        });
    }

    private function getOrder($orderId)
    {
        // Fetch the order (stub).
    }

    private function processOrder($order)
    {
        // Order processing logic (stub).
    }
}

2. 支付回调去重

php
<?php

/**
 * Deduplicates payment-gateway callbacks per payment ID for 7 days.
 */
class PaymentCallbackDeduplication
{
    private $redis;

    /**
     * @param Redis $redis phpredis client
     */
    public function __construct($redis)
    {
        $this->redis = $redis;
    }

    /**
     * Process a callback once per $paymentId.
     *
     * @return mixed processPayment()'s result on first delivery; on a duplicate, the JSON
     *               string stored by the first delivery ({"data":..., "time":...})
     * @throws Throwable rethrown from processPayment() after the claim is released
     */
    public function handleCallback($paymentId, $callbackData)
    {
        $messageId = "payment:callback:{$paymentId}";
        $key = "payment:callback:processed:{$messageId}";

        // SET ... NX EX claims the key and sets the 7-day expiry atomically. The
        // previous SETNX + EXPIRE could leave a never-expiring key if the process died in between.
        $claimed = $this->redis->set(
            $key,
            json_encode([
                'data' => $callbackData,
                'time' => time()
            ]),
            ['nx', 'ex' => 86400 * 7]
        );

        if (!$claimed) {
            echo "支付回调已处理: {$paymentId}\n";
            return $this->redis->get($key);
        }

        try {
            return $this->processPayment($paymentId, $callbackData);
        } catch (Throwable $e) {
            // Processing failed: release the claim so the gateway's retry is not
            // mistaken for an already-handled callback.
            $this->redis->del($key);
            throw $e;
        }
    }

    private function processPayment($paymentId, $data)
    {
        // Payment handling logic (stub).
    }
}

3. 分布式任务去重

php
<?php

/**
 * Prevents the same (type, payload) task from being submitted more than once per hour.
 */
class DistributedTaskDeduplication
{
    private $redis;

    /**
     * @param Redis $redis phpredis client (used for EVAL and DEL)
     */
    public function __construct($redis)
    {
        $this->redis = $redis;
    }

    /**
     * Submit a task unless an identical one was submitted within the last hour.
     *
     * @return mixed null if the task already exists, otherwise doSubmitTask()'s result
     * @throws Throwable rethrown from doSubmitTask() after the claim is released
     */
    public function submitTask($taskType, $taskData)
    {
        $taskId = $this->generateTaskId($taskType, $taskData);
        $key = "task:submitted:{$taskId}";

        // Lua runs atomically on the server: check-and-set in one step.
        $lua = "
            if redis.call('exists', KEYS[1]) == 1 then
                return 0
            end
            redis.call('setex', KEYS[1], ARGV[1], ARGV[2])
            return 1
        ";

        $result = $this->redis->eval($lua, [$key, 3600, json_encode($taskData)], 1);

        if (!$result) {
            echo "任务已存在: {$taskId}\n";
            return null;
        }

        try {
            return $this->doSubmitTask($taskId, $taskType, $taskData);
        } catch (Throwable $e) {
            // Submission failed: release the claim so the task is not locked out for an hour.
            $this->redis->del($key);
            throw $e;
        }
    }

    /**
     * Deterministic ID from type and payload (note: sensitive to array key order).
     */
    private function generateTaskId($taskType, $taskData)
    {
        return md5($taskType . ':' . json_encode($taskData));
    }

    private function doSubmitTask($taskId, $taskType, $taskData)
    {
        // Actual task submission (stub).
    }
}

常见问题与解决方案

问题 1: Redis 宕机导致去重失效

解决方案:

php
<?php

// 使用双重检查:Redis + 数据库
/**
 * Duplicate check that survives Redis outages: Redis first, then the database.
 *
 * Both dependencies are optional in the constructor so `new ReliableDeduplicator()`
 * keeps working. The database check requires a PDO connection, though.
 */
class ReliableDeduplicator
{
    private $redis;
    private $pdo;
    private $tableName;

    /**
     * @param Redis|null $redis     phpredis client; null skips the Redis layer
     * @param PDO|null   $pdo       connection holding the dedup table
     * @param string     $tableName dedup table name (interpolated into SQL; must be trusted)
     */
    public function __construct($redis = null, $pdo = null, $tableName = 'message_dedup')
    {
        $this->redis = $redis;
        $this->pdo = $pdo;
        $this->tableName = $tableName;
    }

    /**
     * @return bool true if $messageId is marked in Redis or present in the database
     */
    public function isDuplicate($messageId)
    {
        // Layer 1: fast Redis check.
        if ($this->redis !== null) {
            try {
                if ($this->redis->exists("msg:{$messageId}")) {
                    return true;
                }
            } catch (Exception $e) {
                // Redis is unavailable: log and degrade to the database check.
                error_log("Redis 异常: " . $e->getMessage());
            }
        }

        // Layer 2: authoritative database check.
        return $this->isInDatabase($messageId);
    }

    /**
     * @throws LogicException when no PDO connection was provided
     */
    private function isInDatabase($messageId)
    {
        if ($this->pdo === null) {
            throw new LogicException('ReliableDeduplicator requires a PDO connection for the database check');
        }

        $stmt = $this->pdo->prepare("SELECT 1 FROM {$this->tableName} WHERE message_id = ?");
        $stmt->execute([$messageId]);

        return $stmt->fetchColumn() !== false;
    }
}

问题 2: 消息 ID 生成重复

解决方案:

php
<?php

// 使用更可靠的 ID 生成方式
class MessageIdGenerator
{
    /**
     * Build an ID from the current timestamp, host name, PID, uniqid() and, when
     * $data is truthy, an md5 of its JSON encoding. Falsy parts are dropped and the
     * remainder is joined with '-'.
     */
    public static function generate($data = null)
    {
        $segments = array_filter([
            date('YmdHis'),
            gethostname(),
            getmypid(),
            uniqid(),
            $data ? md5(json_encode($data)) : '',
        ]);

        return implode('-', $segments);
    }

    /**
     * Deterministic business ID: "{entity}:{id}:{action}".
     */
    public static function generateFromBusiness($entity, $id, $action)
    {
        return implode(':', [$entity, $id, $action]);
    }
}

问题 3: 去重数据膨胀

解决方案:

php
<?php

// 定期清理过期数据
/**
 * Periodic pruning of the `message_dedup` table.
 *
 * Redis markers need no cleanup because they carry a TTL.
 */
class DedupCleanupService
{
    private $pdo;
    private $days;

    /**
     * @param PDO|null $pdo  connection holding `message_dedup` (MySQL syntax is used)
     * @param int      $days default retention in days
     */
    public function __construct($pdo = null, $days = 7)
    {
        $this->pdo = $pdo;
        $this->days = $days;
    }

    /**
     * Delete dedup rows older than the retention window.
     *
     * @param int|null $days override the retention for this run
     * @return int number of rows deleted
     * @throws LogicException when no PDO connection was provided
     */
    public function cleanup($days = null)
    {
        if ($this->pdo === null) {
            throw new LogicException('DedupCleanupService requires a PDO connection');
        }

        $stmt = $this->pdo->prepare(
            'DELETE FROM message_dedup WHERE created_at < DATE_SUB(NOW(), INTERVAL ? DAY)'
        );
        $stmt->execute([(int) ($days ?? $this->days)]);

        return $stmt->rowCount();
    }
}

最佳实践建议

  1. 生成唯一 ID: 每条消息都有唯一标识
  2. 双重检查: Redis + 数据库确保可靠性
  3. 设置过期: 定期清理去重数据
  4. 幂等设计: 业务逻辑本身支持幂等
  5. 监控告警: 监控重复消息比例

相关链接