
Duplicate Message Consumption

Overview

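Duplicate consumption means that the same message is processed by a consumer more than once. It is a common problem in distributed systems. A message is only removed from the queue once its acknowledgement reaches the broker, so any failure between processing and that ACK (network jitter, a service restart, an acknowledgement timeout) leads to redelivery. This document analyzes the causes in detail and presents solutions.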

Symptoms

Common Symptoms

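┌─────────────────────────────────────────────────────────────┐
│          Typical symptoms of duplicate consumption          │
├─────────────────────────────────────────────────────────────┤
│  1. Duplicate records appear in the database                │
│  2. Users receive duplicate notifications                   │
│  3. Orders are processed more than once                     │
│  4. Account balances are deducted multiple times            │
│  5. Logs show the same message ID processed several times   │
│  6. Business statistics are inaccurate                      │
└─────────────────────────────────────────────────────────────┘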

Duplicate Consumption Scenarios

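                          ┌─────────────────────┐
                          │ Duplicate scenarios │
                          └──────────┬──────────┘
                                     │
       ┌──────────────┬──────────────┼──────────────┬──────────────┐
       ▼              ▼              ▼              ▼              ▼
┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
│ Network    │ │ Consumer   │ │ ACK        │ │ Producer   │ │ Cluster    │
│ jitter     │ │ restart    │ │ timeout    │ │ duplicate  │ │ failover   │
│ retry      │ │            │ │ redelivery │ │ send       │ │            │
└────────────┘ └────────────┘ └────────────┘ └────────────┘ └────────────┘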
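The ACK-timeout and consumer-restart paths above share one mechanism: the broker never receives the ACK, so it delivers the message again. The following is a minimal broker-free simulation with hypothetical function names that shows the effect on a balance deduction. The naive handler charges twice, while a handler that remembers processed message IDs charges once.

```php
<?php
// Broker-free simulation of a lost ACK; all names are hypothetical.

/**
 * Deliver the same message twice to $handler, as a broker would after the
 * first ACK was lost, and return the resulting balance.
 */
function simulateLostAck(callable $handler, int $startBalance): int
{
    $balance = $startBalance;
    $message = ['message_id' => 'MSG-1', 'amount' => 30];

    $handler($message, $balance); // first delivery: ACK never reaches the broker
    $handler($message, $balance); // redelivery of the same message

    return $balance;
}

/** Naive handler: applies the deduction on every delivery. */
function makeNaiveHandler(): Closure
{
    return function (array $msg, int &$balance): void {
        $balance -= $msg['amount'];
    };
}

/** Idempotent handler: remembers processed IDs and ignores repeats. */
function makeIdempotentHandler(): Closure
{
    $processed = [];
    return function (array $msg, int &$balance) use (&$processed): void {
        if (isset($processed[$msg['message_id']])) {
            return; // duplicate delivery: effect already applied
        }
        $balance -= $msg['amount'];
        $processed[$msg['message_id']] = true;
    };
}

echo simulateLostAck(makeNaiveHandler(), 100), "\n";      // 40: charged twice
echo simulateLostAck(makeIdempotentHandler(), 100), "\n"; // 70: charged once
```

In a real consumer the "processed" set has to outlive the process (Redis or a database table), which is what the solutions below provide.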

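Root Cause Analysis

1. Network-Level Causes

| Cause | Description |
|---|---|
| Network jitter | The ACK response is lost, so the broker redelivers |
| Connection timeout | The consumer processes slowly and the connection is dropped |
| Network partition | Communication between cluster nodes fails |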

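2. Consumer-Level Causes

| Cause | Description |
|---|---|
| Processing timeout | Business processing takes longer than the ACK timeout |
| Consumer restart | The consumer restarts mid-processing and the message is requeued |
| Crash | The consumer crashes during processing before sending the ACK |
| Excessive prefetch | Processing fails after a large batch was prefetched, so every unacknowledged message in it is redelivered |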

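3. Producer-Level Causes

| Cause | Description |
|---|---|
| Resend after confirm timeout | No confirm is received, so the producer sends again |
| Business retry | Duplicate sends caused by retries in application logic |
| Resend after transaction rollback | The message is sent again after a failed transaction |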

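4. Broker-Level Causes

| Cause | Description |
|---|---|
| Mirrored queue synchronization | Messages are duplicated during master/replica failover |
| Persistence recovery | Message state is inconsistent after a restart |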

Diagnostic Steps

Step 1: Confirm the duplicate consumption

bash
# Queue message counts
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged

# Delivery and redelivery counters per queue
# (rabbitmqctl list_queues has no message_stats columns; use the management HTTP API)
curl -s -u guest:guest http://localhost:15672/api/queues | jq '.[] | {name, deliver_get: .message_stats.deliver_get, redeliver: .message_stats.redeliver}'
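To turn those counters into a ratio per queue, a small helper can read the saved `/api/queues` JSON. This is a sketch under the assumption that each queue object may carry `message_stats.deliver_get` and `message_stats.redeliver`; the function name is hypothetical, and queues without `message_stats` (no deliveries yet) are reported as 0.0.

```php
<?php
/**
 * Per-queue cumulative redelivery ratio from the JSON returned by the
 * management API endpoint /api/queues.
 * Assumes message_stats.deliver_get and message_stats.redeliver counters;
 * missing counters are treated as 0.
 */
function redeliveryRatios(string $json): array
{
    $queues = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
    $ratios = [];
    foreach ($queues as $queue) {
        $stats = $queue['message_stats'] ?? [];
        $delivered = (int) ($stats['deliver_get'] ?? 0);
        $redelivered = (int) ($stats['redeliver'] ?? 0);
        // Divide by a float so the ratio is always a float
        $ratios[$queue['name']] = $delivered > 0 ? $redelivered / (float) $delivered : 0.0;
    }
    arsort($ratios); // highest ratio first
    return $ratios;
}

// Usage: save the API output first, e.g.
//   curl -s -u guest:guest http://localhost:15672/api/queues > queues.json
// then: print_r(redeliveryRatios(file_get_contents('queues.json')));
```

These counters are cumulative since the statistics were last reset; for a current rate, compare two snapshots or use the alerting rule under Monitoring and Alerting.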

Step 2: Analyze the consumer logs

bash
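# List message IDs that appear more than once
# (assumes the ID is the last field of each matching log line)
grep "message_id" /var/log/app/consumer.log | awk '{print $NF}' | sort | uniq -d

# Count processing attempts per message ID, top 20
grep "message_id" /var/log/app/consumer.log | awk '{print $NF}' | sort | uniq -c | sort -rn | head -20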
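When the ID is not the last field, extracting it with a pattern is more robust than `awk '{print $NF}'`. This sketch assumes, hypothetically, that log lines contain `message_id=<ID>` (adjust the regex to your format) and streams the file line by line.

```php
<?php
/**
 * Return message IDs that were processed more than once, with their counts,
 * most frequent first. Assumes lines of the form "... message_id=<ID> ...".
 */
function findDuplicateIds(iterable $lines): array
{
    $counts = [];
    foreach ($lines as $line) {
        if (preg_match('/message_id[=:]\s*([\w.-]+)/', $line, $m)) {
            $counts[$m[1]] = ($counts[$m[1]] ?? 0) + 1;
        }
    }
    $dupes = array_filter($counts, fn (int $n) => $n > 1);
    arsort($dupes);
    return $dupes;
}

// Usage: SplFileObject iterates the file line by line without loading it whole.
// print_r(findDuplicateIds(new SplFileObject('/var/log/app/consumer.log')));
```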

Step 3: Check the ACK configuration

bash
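# Show consumers with their acknowledgement mode and prefetch
rabbitmqctl list_consumers queue_name consumer_tag ack_required prefetch_count

# Show unacknowledged messages per queue
rabbitmqctl list_queues name messages_unacknowledged

# Check the configured consumer timeout
rabbitmqctl environment | grep consumer_timeout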

Step 4: Check network conditions

bash
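# Check connection state
rabbitmqctl list_connections

# Check latency and packet loss (ping reports both in its summary)
ping -c 10 rabbitmq-server

# Check per-hop packet loss along the route (requires mtr)
mtr -r -c 100 rabbitmq-server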

Solutions

1. Idempotent Message Handling

Idempotent processing based on a unique ID

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class IdempotentConsumer
{
    private $connection;
    private $channel;
    private $redis;
    private $db;
    private $processedCacheTTL = 86400;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
        
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        
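        // Throw PDOException on SQL errors (the default since PHP 8.0)
        $this->db = new PDO('mysql:host=localhost;dbname=app', 'user', 'pass', [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        ]);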
    }

    public function consume(string $queue)
    {
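        $callback = function (AMQPMessage $message) {
            $body = json_decode($message->getBody(), true) ?: [];
            // get() throws OutOfBoundsException when the property is absent,
            // so check has() first and fall back to an ID carried in the body
            $messageId = $message->has('message_id')
                ? $message->get('message_id')
                : ($body['id'] ?? null);
            
            if (!$messageId) {
                $messageId = $this->generateMessageId($body);
            }
            
            // Fast path: Redis says this ID was already handled
            if ($this->isProcessed($messageId)) {
                echo "Message {$messageId} already processed, skipping\n";
                $message->ack();
                return;
            }
            
            try {
                $this->processWithIdempotency($messageId, $body);
                $this->markAsProcessed($messageId);
                $message->ack();
                echo "Message {$messageId} processed successfully\n";
            } catch (\Exception $e) {
                echo "Message {$messageId} failed: " . $e->getMessage() . "\n";
                // nack(true) requeues the message; a dead-letter exchange keeps
                // a poison message from being redelivered forever
                $message->nack(true);
            }
        };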
        
        $this->channel->basic_consume($queue, '', false, false, false, false, $callback);
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    private function isProcessed(string $messageId): bool
    {
        return $this->redis->exists("processed:{$messageId}");
    }

    private function markAsProcessed(string $messageId): void
    {
        $this->redis->setex("processed:{$messageId}", $this->processedCacheTTL, '1');
    }

    private function processWithIdempotency(string $messageId, array $body): void
    {
        $this->db->beginTransaction();
        
        try {
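            // Requires a UNIQUE index on processed_messages.message_id: if a
            // concurrent duplicate slips past this check, the INSERT below fails
            // and the rollback discards its business changes, provided
            // doBusinessLogic() writes through this same $this->db connection
            $checkStmt = $this->db->prepare(
                "SELECT id FROM processed_messages WHERE message_id = ? FOR UPDATE"
            );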
            $checkStmt->execute([$messageId]);
            
            if ($checkStmt->fetch()) {
                $this->db->commit();
                return;
            }
            
            $this->doBusinessLogic($body);
            
            $insertStmt = $this->db->prepare(
                "INSERT INTO processed_messages (message_id, created_at) VALUES (?, NOW())"
            );
            $insertStmt->execute([$messageId]);
            
            $this->db->commit();
        } catch (\Exception $e) {
            $this->db->rollBack();
            throw $e;
        }
    }

    private function doBusinessLogic(array $body): void
    {
        // Business logic goes here; write through $this->db so it joins the transaction
    }

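    private function generateMessageId(array $body): string
    {
        // Must be deterministic: a redelivered copy has to map to the same ID,
        // so no timestamps or random values here
        ksort($body);
        return md5(json_encode($body));
    }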

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
        $this->redis->close();
    }
}

// Usage example
$consumer = new IdempotentConsumer();
$consumer->consume('orders.queue');
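The consumer above checks Redis, runs the transaction, and only then marks the ID, so two consumers holding copies of the same message can both pass the check before either marks it. One common alternative is to claim the ID atomically before processing and release the claim if processing fails. The sketch below models this with a hypothetical in-memory store; a Redis-backed version would implement `add()` with `SET key value NX EX ttl`.

```php
<?php
// Hypothetical sketch: claim-before-process with a pluggable store.

interface ProcessedStore
{
    /** Atomically record $id; return false if it was already recorded. */
    public function add(string $id): bool;
    public function remove(string $id): void;
}

final class InMemoryProcessedStore implements ProcessedStore
{
    private array $ids = [];

    public function add(string $id): bool
    {
        if (isset($this->ids[$id])) {
            return false;
        }
        $this->ids[$id] = true;
        return true;
    }

    public function remove(string $id): void
    {
        unset($this->ids[$id]);
    }
}

/**
 * Claim the ID first, run the handler, and release the claim if the handler
 * throws so that a redelivered copy can retry. Returns 'processed' or 'duplicate'.
 */
function handleOnce(ProcessedStore $store, string $messageId, callable $handler): string
{
    if (!$store->add($messageId)) {
        return 'duplicate'; // ack without running side effects again
    }
    try {
        $handler();
    } catch (\Throwable $e) {
        $store->remove($messageId); // let the redelivery retry
        throw $e;
    }
    return 'processed';
}

$store = new InMemoryProcessedStore();
echo handleOnce($store, 'MSG-1', fn () => null), "\n"; // processed
echo handleOnce($store, 'MSG-1', fn () => null), "\n"; // duplicate
```

A crash between the claim and completion leaves a stale claim, which is why a Redis-backed claim needs a TTL and the database unique index remains the final guard.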

2. Deduplication by Business Key

php
<?php

class BusinessKeyDeduplication
{
    private $redis;
    private $db;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        
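        // Throw PDOException on SQL errors (the default since PHP 8.0)
        $this->db = new PDO('mysql:host=localhost;dbname=app', 'user', 'pass', [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        ]);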
    }

    public function processOrder(array $orderData): bool
    {
        $orderNo = $orderData['order_no'];
        $dedupeKey = "order:dedup:{$orderNo}";
        
        $luaScript = <<<'LUA'
            if redis.call("EXISTS", KEYS[1]) == 1 then
                return 0
            end
            redis.call("SETEX", KEYS[1], ARGV[1], "1")
            return 1
LUA;
        
        $result = $this->redis->eval($luaScript, [$dedupeKey, 86400], 1);
        
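        if ($result === 0) {
            // Caveat: if the process dies after SETEX but before the DB commit,
            // this key makes the redelivered message look handled for 24 h;
            // the orders.order_no unique index is the real guarantee
            echo "Order {$orderNo} already processed\n";
            return true;
        }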
        
        return $this->createOrder($orderData);
    }

    private function createOrder(array $orderData): bool
    {
        $this->db->beginTransaction();
        
        try {
            $checkStmt = $this->db->prepare(
                "SELECT id FROM orders WHERE order_no = ? FOR UPDATE"
            );
            $checkStmt->execute([$orderData['order_no']]);
            
            if ($checkStmt->fetch()) {
                $this->db->commit();
                return true;
            }
            
            $insertStmt = $this->db->prepare(
                "INSERT INTO orders (order_no, user_id, amount, status, created_at) 
                 VALUES (?, ?, ?, 'pending', NOW())"
            );
            $insertStmt->execute([
                $orderData['order_no'],
                $orderData['user_id'],
                $orderData['amount'],
            ]);
            
            $this->db->commit();
            return true;
        } catch (\Exception $e) {
            $this->db->rollBack();
            $this->redis->del("order:dedup:{$orderData['order_no']}");
            throw $e;
        }
    }
}
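The Lua script above is equivalent to a single `SET key value NX EX ttl` command; with phpredis that can be written as `$redis->set($dedupeKey, '1', ['nx', 'ex' => 86400])`, which returns true only when the key was set. The hypothetical in-memory model below mirrors those semantics with an injected clock, to show what the TTL implies: once the key expires, a late redelivery is processed again, so the TTL must exceed the longest redelivery window you expect.

```php
<?php
// Hypothetical in-memory model of Redis `SET key value NX EX ttl`.
// The clock is passed in so expiry can be shown without sleeping.

final class TtlDedupe
{
    /** @var array<string, int> key => expiry timestamp in seconds */
    private array $expiresAt = [];

    /** Return true if $key was claimed now, false if a live claim exists. */
    public function claim(string $key, int $ttlSeconds, int $now): bool
    {
        if (isset($this->expiresAt[$key]) && $this->expiresAt[$key] > $now) {
            return false; // key still alive: duplicate
        }
        $this->expiresAt[$key] = $now + $ttlSeconds;
        return true;
    }
}

$d = new TtlDedupe();
$t0 = 1700000000;
var_dump($d->claim('order:dedup:ORD-1', 86400, $t0));         // bool(true)
var_dump($d->claim('order:dedup:ORD-1', 86400, $t0 + 3600));  // bool(false)
var_dump($d->claim('order:dedup:ORD-1', 86400, $t0 + 86400)); // bool(true): expired
```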

3. Producer-Side Deduplication

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class DeduplicationProducer
{
    private $connection;
    private $channel;
    private $redis;
    private $idempotentTTL = 86400;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
        $this->channel->confirm_select();
        
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    public function sendMessage(
        string $exchange,
        string $routingKey,
        array $data,
        ?string $messageId = null
    ): bool {
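        $messageId = $messageId ?? $this->generateMessageId($data);
        $dedupeKey = "msg:sent:{$messageId}";
        
        if ($this->redis->exists($dedupeKey)) {
            echo "Message {$messageId} already sent, skipping\n";
            return true;
        }
        
        $messageBody = json_encode($data);
        
        $message = new AMQPMessage($messageBody, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'content_type' => 'application/json',
            'message_id' => $messageId,
            'timestamp' => time(),
        ]);
        
        // wait_for_pending_acks() does not throw on a broker nack, so record
        // the outcome through the publisher-confirm handlers
        $confirmed = false;
        $this->channel->set_ack_handler(function () use (&$confirmed) {
            $confirmed = true;
        });
        $this->channel->set_nack_handler(function () use (&$confirmed) {
            $confirmed = false;
        });
        
        try {
            $this->channel->basic_publish($message, $exchange, $routingKey);
            $this->channel->wait_for_pending_acks(5.0); // throws on timeout
        } catch (\Exception $e) {
            echo "Message {$messageId} failed to send: " . $e->getMessage() . "\n";
            return false;
        }
        
        if (!$confirmed) {
            echo "Message {$messageId} was nacked by the broker\n";
            return false;
        }
        
        // Mark as sent only after the broker has confirmed the message
        $this->redis->setex($dedupeKey, $this->idempotentTTL, '1');
        echo "Message {$messageId} sent successfully\n";
        return true;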
    }

    private function generateMessageId(array $data): string
    {
        ksort($data);
        return md5(json_encode($data));
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
        $this->redis->close();
    }
}

// Usage example
$producer = new DeduplicationProducer();

$orderData = [
    'order_no' => 'ORD-20240101-001',
    'user_id' => 'USER-123',
    'amount' => 99.99,
];

$producer->sendMessage(
    'orders.exchange',
    'order.created',
    $orderData,
    'ORD-20240101-001'
);
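When no business key is passed, the producer derives the ID from the payload, and `ksort()` there only sorts the top-level keys. A minimal sketch of a fully canonical ID (hypothetical function names) sorts keys recursively before hashing, so logically equal payloads map to the same ID regardless of key order. Sequential lists keep their order, because their keys are already 0..n-1.

```php
<?php
// Hypothetical stable message ID: recursive key sort, then SHA-256.

function canonicalize(array $data): array
{
    foreach ($data as $key => $value) {
        if (is_array($value)) {
            $data[$key] = canonicalize($value);
        }
    }
    ksort($data); // no-op for lists, whose keys are already in order
    return $data;
}

function stableMessageId(array $data): string
{
    $json = json_encode(
        canonicalize($data),
        JSON_UNESCAPED_UNICODE | JSON_PRESERVE_ZERO_FRACTION | JSON_THROW_ON_ERROR
    );
    return hash('sha256', $json);
}

$a = ['order_no' => 'ORD-1', 'items' => ['sku' => 'A', 'qty' => 2]];
$b = ['items' => ['qty' => 2, 'sku' => 'A'], 'order_no' => 'ORD-1'];
var_dump(stableMessageId($a) === stableMessageId($b)); // bool(true)
```

A payload-derived ID also collapses two legitimately identical events into one, which is why passing a business key such as the order number, as in the usage example above, is usually the safer choice.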

4. Deduplication with Database Unique Indexes

sql
-- Message processing log table
CREATE TABLE message_processing_log (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id VARCHAR(64) NOT NULL,
    queue_name VARCHAR(128) NOT NULL,
    status ENUM('processing', 'completed', 'failed') NOT NULL DEFAULT 'processing',
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    
    UNIQUE KEY uk_message_queue (message_id, queue_name),
    KEY idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Orders table with a unique index on the business key
CREATE TABLE orders (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    order_no VARCHAR(64) NOT NULL,
    user_id VARCHAR(64) NOT NULL,
    amount DECIMAL(10, 2) NOT NULL,
    status VARCHAR(32) NOT NULL DEFAULT 'pending',
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    
    UNIQUE KEY uk_order_no (order_no),
    KEY idx_user_id (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

php
<?php

class DatabaseDeduplication
{
    private $db;

    public function __construct()
    {
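        // processMessage() relies on PDOException for duplicate-key detection
        $this->db = new PDO('mysql:host=localhost;dbname=app', 'user', 'pass', [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        ]);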
    }

    public function processMessage(string $messageId, string $queueName, callable $processor): bool
    {
        $this->db->beginTransaction();
        
        try {
            $lockStmt = $this->db->prepare(
                "INSERT INTO message_processing_log (message_id, queue_name, status) 
                 VALUES (?, ?, 'processing')"
            );
            
            try {
                $lockStmt->execute([$messageId, $queueName]);
            } catch (PDOException $e) {
                // SQLSTATE 23000: the (message_id, queue_name) row already exists
                if ($e->getCode() == 23000) {
                    $statusStmt = $this->db->prepare(
                        "SELECT status FROM message_processing_log 
                         WHERE message_id = ? AND queue_name = ?"
                    );
                    $statusStmt->execute([$messageId, $queueName]);
                    $status = $statusStmt->fetchColumn();
                    
                    if ($status === 'completed') {
                        $this->db->commit();
                        return true;
                    }
                    
                    throw new \RuntimeException("Message is still being processed: {$messageId}");
                }
                throw $e;
            }
            
            // Pass the connection so the business writes join this transaction
            // and roll back together with the log row on failure
            $result = (bool) $processor($this->db);
            
            $completeStmt = $this->db->prepare(
                "UPDATE message_processing_log SET status = 'completed' 
                 WHERE message_id = ? AND queue_name = ?"
            );
            $completeStmt->execute([$messageId, $queueName]);
            
            $this->db->commit();
            return $result;
        } catch (\Exception $e) {
            $this->db->rollBack();
            throw $e;
        }
    }

    public function cleanupOldLogs(int $daysToKeep = 7): int
    {
        $stmt = $this->db->prepare(
            "DELETE FROM message_processing_log 
             WHERE created_at < DATE_SUB(NOW(), INTERVAL ? DAY)"
        );
        $stmt->execute([$daysToKeep]);
        return $stmt->rowCount();
    }
}
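In the class above the log row and the business writes commit or roll back together, so another transaction never sees a `processing` row, and `failed` is never written. If the status row is instead committed in its own transaction (for example to record failures), a consumer that hits an existing row needs a decision rule. The following is a hypothetical sketch that treats a `processing` row older than a lease as abandoned by a crashed worker; the lease length is an assumption to tune.

```php
<?php
// Hypothetical decision rule for an existing message_processing_log row.
// Returns 'ack' (skip), 'requeue' (retry later) or 'reprocess' (handle now).

function decideOnExistingRow(string $status, int $updatedAt, int $now, int $leaseSeconds = 300): string
{
    return match ($status) {
        'completed'  => 'ack',       // already done: acknowledge and skip
        'failed'     => 'reprocess', // previous attempt rolled back: retry now
        // another worker may still hold it; after the lease, assume it crashed
        'processing' => ($now - $updatedAt) >= $leaseSeconds ? 'reprocess' : 'requeue',
        default      => throw new InvalidArgumentException("Unknown status: {$status}"),
    };
}
```

Taking over a stale `processing` row should itself be a conditional `UPDATE ... WHERE status = 'processing' AND updated_at = ?`, so that only one consumer wins the takeover.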

Preventive Measures

1. Idempotency Design Principles

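┌─────────────────────────────────────────────────────────────┐
│                Idempotency design checklist                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  □ Every message has a unique identifier (message_id)       │
│  □ Check whether a message was already processed            │
│  □ Enforce data uniqueness with database unique indexes     │
│  □ Cache processed message IDs in Redis                     │
│  □ Use optimistic or pessimistic locking in business logic  │
│  □ Design re-entrant business interfaces                    │
│  □ Log message processing for traceability                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘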
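The locking and re-entrant-interface items can often be combined in one guarded state transition: `UPDATE orders SET status = 'paid' WHERE order_no = ? AND status = 'pending'`, followed by a check of `rowCount()`. A duplicate message finds the order already moved on and affects zero rows. The hypothetical in-memory table below models that statement.

```php
<?php
// Hypothetical in-memory model of a conditional (guarded) status update.

final class OrderTable
{
    /** @var array<string, string> order_no => status */
    private array $status = [];

    public function insert(string $orderNo, string $status): void
    {
        $this->status[$orderNo] = $status;
    }

    public function status(string $orderNo): string
    {
        return $this->status[$orderNo];
    }

    /**
     * Models: UPDATE orders SET status = :to WHERE order_no = :no AND status = :from
     * Returns the affected row count (0 or 1), like PDOStatement::rowCount().
     */
    public function transition(string $orderNo, string $from, string $to): int
    {
        if (($this->status[$orderNo] ?? null) !== $from) {
            return 0; // already transitioned (or unknown): nothing to do
        }
        $this->status[$orderNo] = $to;
        return 1;
    }
}

$orders = new OrderTable();
$orders->insert('ORD-1', 'pending');
echo $orders->transition('ORD-1', 'pending', 'paid'), "\n"; // 1: first delivery
echo $orders->transition('ORD-1', 'pending', 'paid'), "\n"; // 0: duplicate, ack and skip
```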

2. Monitoring and Alerting

yaml
# Prometheus alerting rule (metric names vary by exporter; adjust to the ones you scrape)
groups:
  - name: message_duplicate
    rules:
      - alert: HighRedeliveryRate
        expr: |
          rate(rabbitmq_queue_messages_redelivered_total[5m])
          /
          rate(rabbitmq_queue_messages_delivered_total[5m])
          > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High message redelivery rate"
          description: "Redelivery rate of queue {{ $labels.queue }} is above 10%"
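Because the numerator and denominator are both rates over the same 5-minute window, the window length cancels and the expression is approximately the ratio of the two counter increases within that window. With illustrative numbers (not measurements), 30 redeliveries out of 200 deliveries give:

```latex
\frac{\texttt{rate(redelivered[5m])}}{\texttt{rate(delivered[5m])}}
\approx \frac{\Delta_{\text{redelivered}}}{\Delta_{\text{delivered}}}
= \frac{30}{200} = 0.15 > 0.1
```

so the alert fires once the ratio stays above 0.1 for the `for: 5m` duration.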

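3. Configuration Tuning

ini
# rabbitmq.conf
# Delivery acknowledgement timeout in milliseconds (30 minutes here);
# raise it only if handlers legitimately run longer
consumer_timeout = 1800000

# Heartbeat timeout in seconds (60 is the server default)
heartbeat = 60

# prefetch_count is not set here: configure it per channel in consumer
# code (basic.qos) and keep it small for slow handlers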
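Prefetch interacts with `consumer_timeout`. The acknowledgement timeout applies to each delivery, and prefetched messages are already delivered while they wait in the client buffer. For a consumer that handles one message at a time, the last of N buffered messages is therefore acknowledged roughly N × (average processing time) after it was delivered. The hypothetical helper below turns that into a heuristic upper bound; it is a rule of thumb, not a RabbitMQ formula. The result would then be applied per channel, e.g. with php-amqplib's `$channel->basic_qos(null, $prefetch, null)`.

```php
<?php
/**
 * Heuristic upper bound for prefetch_count on a sequential consumer:
 * keep N * avgProcessingMs below $safety * consumerTimeoutMs so the last
 * buffered message is still acknowledged before its delivery times out.
 * Hypothetical helper; tune $safety to the spread of your handler latency.
 */
function maxSafePrefetch(int $consumerTimeoutMs, int $avgProcessingMs, float $safety = 0.5): int
{
    if ($avgProcessingMs <= 0 || $consumerTimeoutMs <= 0) {
        throw new InvalidArgumentException('Timeout and processing time must be positive');
    }
    $budgetMs = (int) floor($consumerTimeoutMs * $safety);
    return max(1, intdiv($budgetMs, $avgProcessingMs));
}

// consumer_timeout = 1800000 (30 min), ~20 s per message, 50% headroom
echo maxSafePrefetch(1800000, 20000), "\n"; // 45
```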

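Notes

  1. Idempotency is the application's responsibility: the message queue cannot guarantee business-level idempotency.
  2. Keep message IDs stable: generate them with a consistent, deterministic rule.
  3. Let cache entries expire: set a TTL so Redis memory does not grow without bound.
  4. Design indexes deliberately: the unique index is the last line of defense.
  5. Keep logs complete: they make problems easier to trace and analyze.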

Related Links