Appearance
消息重复消费
概述
消息重复消费是指同一条消息被消费者处理多次的现象。在分布式系统中,由于网络抖动、服务重启、确认超时等原因,消息重复消费是常见问题。本文档将详细分析原因并提供解决方案。
问题表现与症状
常见症状
┌─────────────────────────────────────────────────────────────┐
│ 消息重复消费典型症状 │
├─────────────────────────────────────────────────────────────┤
│ 1. 数据库中出现重复记录 │
│ 2. 用户收到重复通知 │
│ 3. 订单被重复处理 │
│ 4. 账户余额被多次扣减 │
│ 5. 日志中出现相同消息ID的多次处理记录 │
│ 6. 业务数据统计不准确 │
└─────────────────────────────────────────────────────────────┘
重复消费场景分析
┌─────────────────┐
│ 消息重复场景 │
└────────┬────────┘
│
┌────────────┬───────────┼───────────┬────────────┐
▼ ▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│网络抖动│ │消费者 │ │ACK超时 │ │生产者 │ │集群 │
│重试 │ │重启 │ │重投递 │ │重复发送│ │切换 │
└────────┘ └────────┘ └────────┘ └────────┘ └────────┘
问题原因分析
1. 网络层面原因
| 原因 | 说明 | 发生概率 |
|---|---|---|
| 网络抖动 | ACK响应丢失,Broker重投递 | 高 |
| 连接超时 | 消费者处理慢,连接断开 | 中 |
| 网络分区 | 集群节点间通信异常 | 低 |
2. 消费者层面原因
| 原因 | 说明 | 发生概率 |
|---|---|---|
| 处理超时 | 业务处理时间超过ACK超时时间 | 高 |
| 消费者重启 | 处理中重启,消息重新入队 | 中 |
| 异常崩溃 | 处理过程中崩溃未ACK | 中 |
| 预取过多 | 批量预取后处理失败 | 低 |
3. 生产者层面原因
| 原因 | 说明 | 发生概率 |
|---|---|---|
| 确认超时重发 | 未收到确认,生产者重发 | 高 |
| 业务重试 | 业务逻辑层面的重复发送 | 中 |
| 事务回滚后重发 | 事务失败后重新发送 | 低 |
4. Broker层面原因
| 原因 | 说明 | 发生概率 |
|---|---|---|
| 镜像队列同步 | 主从切换时消息重复 | 低 |
| 持久化恢复 | 重启后消息状态不一致 | 低 |
诊断步骤
步骤1:确认重复消费现象
bash
# Per-queue message counts: total, ready for delivery, delivered but not yet acked.
# rabbitmqctl names the column messages_unacknowledged; messages_unacked is rejected.
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
# Delivery / redelivery counters come from the management plugin and are not
# available as rabbitmqctl list_queues columns, so read them via rabbitmqadmin.
rabbitmqadmin list queues name message_stats.deliver message_stats.redeliver
# 查看重投递率
curl -s -u guest:guest http://localhost:15672/api/queues | jq '.[] | {name, message_stats: .message_stats.redeliver}'
步骤2:分析消费者日志
bash
# 搜索重复处理的消息ID
grep -E "message_id|处理消息" /var/log/app/consumer.log | sort | uniq -d
# 统计消息处理次数
grep "message_id" /var/log/app/consumer.log | awk '{print $NF}' | sort | uniq -c | sort -rn | head -20
步骤3:检查ACK配置
bash
# List consumers, including whether each one requires acknowledgements.
rabbitmqctl list_consumers
# Messages delivered but not yet acknowledged, per queue.
# The rabbitmqctl column is messages_unacknowledged (messages_unacked is not accepted).
rabbitmqctl list_queues name messages_unacknowledged
# 检查超时配置
rabbitmqctl environment | grep consumer_timeout
步骤4:检查网络状况
bash
# 检查连接状态
rabbitmqctl list_connections
# 检查网络延迟
ping -c 10 rabbitmq-server
# 检查丢包率
traceroute rabbitmq-server
解决方案
1. 消息幂等性设计
基于唯一ID的幂等处理
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * RabbitMQ consumer that processes each message at most once.
 *
 * Two layers of de-duplication are used:
 *  - a Redis key "processed:{id}" that expires after $processedCacheTTL seconds
 *    and serves as a cheap first check;
 *  - a row in the processed_messages table, written in the same database
 *    transaction as the business logic.
 */
class IdempotentConsumer
{
    private $connection;
    private $channel;
    private $redis;
    private $db;
    /** Lifetime, in seconds, of the Redis "already processed" marker. */
    private $processedCacheTTL = 86400;

    /**
     * Opens the AMQP, Redis and MySQL connections used by the consumer.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        // Before PHP 8 PDO fails silently by default; the rollback logic in
        // processWithIdempotency() relies on failures being thrown.
        $this->db = new PDO('mysql:host=localhost;dbname=app', 'user', 'pass', [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        ]);
    }

    /**
     * Consumes $queue with manual acknowledgements until consuming stops.
     *
     * Already-processed messages are acked and skipped. Messages whose
     * processing throws are nacked with requeue. Bodies that are not a JSON
     * object/array are nacked without requeue, since retrying cannot help.
     *
     * @param string $queue Name of the queue to consume from.
     */
    public function consume(string $queue)
    {
        $callback = function (AMQPMessage $message) {
            $body = json_decode($message->getBody(), true);
            if (!is_array($body)) {
                // Redelivering an unparseable payload would loop forever.
                echo "消息体不是合法的 JSON,已丢弃\n";
                $message->nack(false);
                return;
            }

            // AMQPMessage::get() throws when the property is not set, so the
            // presence check has to come first for the body fallback to apply.
            $messageId = $message->has('message_id')
                ? $message->get('message_id')
                : ($body['id'] ?? null);
            if (!$messageId) {
                $messageId = $this->generateMessageId($body);
            }
            $messageId = (string) $messageId;

            if ($this->isProcessed($messageId)) {
                echo "消息 {$messageId} 已处理过,跳过\n";
                $message->ack();
                return;
            }

            try {
                $this->processWithIdempotency($messageId, $body);
                $this->markAsProcessed($messageId);
                $message->ack();
                echo "消息 {$messageId} 处理成功\n";
            } catch (\Exception $e) {
                echo "消息 {$messageId} 处理失败: " . $e->getMessage() . "\n";
                $message->nack(true);
            }
        };

        $this->channel->basic_consume($queue, '', false, false, false, false, $callback);
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    /**
     * Reports whether the Redis marker for $messageId exists.
     */
    private function isProcessed(string $messageId): bool
    {
        // exists() may return a key count rather than a boolean.
        return (bool) $this->redis->exists("processed:{$messageId}");
    }

    /**
     * Writes the Redis marker for $messageId with the configured TTL.
     */
    private function markAsProcessed(string $messageId): void
    {
        $this->redis->setex("processed:{$messageId}", $this->processedCacheTTL, '1');
    }

    /**
     * Runs the business logic and records $messageId in one transaction.
     *
     * Returns without doing anything when processed_messages already holds
     * the id. Any failure rolls the transaction back and is rethrown.
     *
     * @param string $messageId Identifier used for de-duplication.
     * @param array  $body      Decoded message payload.
     */
    private function processWithIdempotency(string $messageId, array $body): void
    {
        $this->db->beginTransaction();
        try {
            $checkStmt = $this->db->prepare(
                "SELECT id FROM processed_messages WHERE message_id = ? FOR UPDATE"
            );
            $checkStmt->execute([$messageId]);
            if ($checkStmt->fetch()) {
                $this->db->commit();
                return;
            }

            $this->doBusinessLogic($body);

            $insertStmt = $this->db->prepare(
                "INSERT INTO processed_messages (message_id, created_at) VALUES (?, NOW())"
            );
            $insertStmt->execute([$messageId]);
            $this->db->commit();
        } catch (\Throwable $e) {
            // \Throwable also covers \Error raised by the business logic, so
            // the transaction is never left open.
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }
            throw $e;
        }
    }

    /**
     * Placeholder for the actual business operation.
     */
    private function doBusinessLogic(array $body): void
    {
        // Business-specific processing goes here.
    }

    /**
     * Derives a fallback id from the payload alone.
     *
     * The id must be identical on every delivery of the same message, so it
     * is a pure content hash with no time or random component.
     */
    private function generateMessageId(array $body): string
    {
        return md5(json_encode($body));
    }

    /**
     * Closes the channel and the AMQP and Redis connections.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
        $this->redis->close();
    }
}
// 使用示例
$consumer = new IdempotentConsumer();
$consumer->consume('orders.queue');
2. 业务唯一键去重
php
<?php
class BusinessKeyDeduplication
{
private $redis;
private $db;
/**
 * Connects to Redis and MySQL.
 *
 * @throws \RuntimeException When the Redis connection cannot be established.
 */
public function __construct()
{
    $this->redis = new Redis();
    if (!$this->redis->connect('127.0.0.1', 6379)) {
        throw new \RuntimeException('Unable to connect to Redis at 127.0.0.1:6379');
    }
    // Before PHP 8 PDO fails silently by default; createOrder() relies on
    // failed statements throwing so that it can roll back.
    $this->db = new PDO('mysql:host=localhost;dbname=app', 'user', 'pass', [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
    ]);
}
/**
 * Creates the order described by $orderData unless it was already handled.
 *
 * The Redis key "order:dedup:{order_no}" goes through two states:
 *  - "processing" (300 s TTL) while a worker is creating the order;
 *  - "1" (86400 s TTL) once createOrder() has returned.
 * The final marker is written only after the order exists, so a worker that
 * dies mid-way leaves a short-lived claim instead of a 24-hour "done" flag
 * that would cause the redelivered message to be skipped.
 *
 * @param array $orderData Must contain a non-empty 'order_no'.
 * @return bool True when the order exists after the call.
 * @throws \InvalidArgumentException When 'order_no' is missing or empty.
 * @throws \RuntimeException When another worker currently holds the claim;
 *                           the caller should retry the message later.
 */
public function processOrder(array $orderData): bool
{
    if (!isset($orderData['order_no']) || $orderData['order_no'] === '') {
        throw new \InvalidArgumentException('orderData must contain a non-empty order_no');
    }
    $orderNo = $orderData['order_no'];
    $dedupeKey = "order:dedup:{$orderNo}";

    // Atomically return the current marker, or claim the key as "processing".
    $luaScript = <<<'LUA'
local state = redis.call("GET", KEYS[1])
if state then
return state
end
redis.call("SETEX", KEYS[1], ARGV[1], "processing")
return "claimed"
LUA;
    $state = $this->redis->eval($luaScript, [$dedupeKey, 300], 1);

    if ($state === 'processing') {
        // Acknowledging now could lose the order if that worker has crashed.
        throw new \RuntimeException("订单 {$orderNo} 正在处理中");
    }
    if ($state !== 'claimed') {
        echo "订单 {$orderNo} 已处理过\n";
        return true;
    }

    $created = $this->createOrder($orderData);
    // Promote the claim to the long-lived "done" marker.
    $this->redis->setex($dedupeKey, 86400, '1');
    return $created;
}
/**
 * Inserts the order inside a transaction, tolerating an existing row.
 *
 * Returns true both when the row is inserted and when an order with the
 * same order_no is already present. A duplicate-key rejection from the
 * unique index on order_no (a concurrent insert won the race) is also a
 * success: the order exists, which is the outcome the caller wants.
 *
 * On any other failure the transaction is rolled back, the Redis
 * de-duplication key is removed so the message can be retried, and the
 * exception is rethrown.
 *
 * @param array $orderData Expects 'order_no', 'user_id' and 'amount'.
 * @return bool Always true when no exception is thrown.
 */
private function createOrder(array $orderData): bool
{
    $this->db->beginTransaction();
    try {
        $checkStmt = $this->db->prepare(
            "SELECT id FROM orders WHERE order_no = ? FOR UPDATE"
        );
        $checkStmt->execute([$orderData['order_no']]);
        if ($checkStmt->fetch()) {
            $this->db->commit();
            return true;
        }

        $insertStmt = $this->db->prepare(
            "INSERT INTO orders (order_no, user_id, amount, status, created_at)\n"
            . "VALUES (?, ?, ?, 'pending', NOW())"
        );
        $insertStmt->execute([
            $orderData['order_no'],
            $orderData['user_id'],
            $orderData['amount'],
        ]);
        $this->db->commit();
        return true;
    } catch (\Exception $e) {
        $this->db->rollBack();

        // MySQL driver error 1062 = duplicate entry for a unique key.
        $isDuplicateKey = $e instanceof \PDOException
            && isset($e->errorInfo[1])
            && (int) $e->errorInfo[1] === 1062;
        if ($isDuplicateKey) {
            return true;
        }

        $this->redis->del("order:dedup:{$orderData['order_no']}");
        throw $e;
    }
}
}
3. 生产者端防重
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Publisher that avoids sending the same logical message twice.
 *
 * A Redis key "msg:sent:{id}" is written after the broker confirms a publish
 * and expires after $idempotentTTL seconds; a later send with the same id is
 * skipped while the key exists.
 */
class DeduplicationProducer
{
    private $connection;
    private $channel;
    private $redis;
    /** Lifetime, in seconds, of the "already sent" marker. */
    private $idempotentTTL = 86400;
    /** Set by the nack handler when the broker refuses a published message. */
    private $brokerNacked = false;

    /**
     * Opens the AMQP channel in publisher-confirm mode and connects to Redis.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
        $this->channel->confirm_select();
        // Waiting for confirms does not by itself fail on a nack; this
        // handler records it so sendMessage() can report the failure.
        $this->channel->set_nack_handler(function (AMQPMessage $message) {
            $this->brokerNacked = true;
        });
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    /**
     * Publishes $data as a persistent JSON message unless it was already sent.
     *
     * @param string      $exchange   Target exchange.
     * @param string      $routingKey Routing key.
     * @param array       $data       Payload, JSON-encoded into the body.
     * @param string|null $messageId  Stable business id; when null, a hash of
     *                                the payload is used, so two sends with
     *                                identical payloads count as duplicates.
     * @return bool True when the message was confirmed by the broker or had
     *              already been sent; false when encoding, confirmation or
     *              recording the marker failed.
     */
    public function sendMessage(
        string $exchange,
        string $routingKey,
        array $data,
        ?string $messageId = null
    ): bool {
        $messageId = $messageId ?? $this->generateMessageId($data);
        $dedupeKey = "msg:sent:{$messageId}";
        if ($this->redis->exists($dedupeKey)) {
            echo "消息 {$messageId} 已发送过,跳过\n";
            return true;
        }

        $messageBody = json_encode($data);
        if ($messageBody === false) {
            // Publishing `false` would put an empty body on the queue.
            echo "消息 {$messageId} 发送失败: " . json_last_error_msg() . "\n";
            return false;
        }

        $message = new AMQPMessage($messageBody, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'content_type' => 'application/json',
            'message_id' => $messageId,
            'timestamp' => time(),
        ]);

        $this->brokerNacked = false;
        $this->channel->basic_publish($message, $exchange, $routingKey);
        try {
            $this->channel->wait_for_pending_acks_returns(5.0);
            if ($this->brokerNacked) {
                echo "消息 {$messageId} 发送失败: Broker 返回 nack\n";
                return false;
            }
            $this->redis->setex($dedupeKey, $this->idempotentTTL, '1');
            echo "消息 {$messageId} 发送成功\n";
            return true;
        } catch (\Exception $e) {
            echo "消息 {$messageId} 发送失败: " . $e->getMessage() . "\n";
            return false;
        }
    }

    /**
     * Builds a content-based id that does not depend on key order.
     *
     * Keys are sorted at every nesting level before hashing, so payloads
     * that differ only in the order of their keys yield the same id.
     */
    private function generateMessageId(array $data): string
    {
        return md5(json_encode($this->sortKeysRecursively($data)));
    }

    /**
     * Returns a copy of $data with keys sorted at every nesting level.
     */
    private function sortKeysRecursively(array $data): array
    {
        foreach ($data as $key => $value) {
            if (is_array($value)) {
                $data[$key] = $this->sortKeysRecursively($value);
            }
        }
        ksort($data);
        return $data;
    }

    /**
     * Closes the channel and the AMQP and Redis connections.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
        $this->redis->close();
    }
}
// 使用示例
$producer = new DeduplicationProducer();
$orderData = [
'order_no' => 'ORD-20240101-001',
'user_id' => 'USER-123',
'amount' => 99.99,
];
$producer->sendMessage(
'orders.exchange',
'order.created',
$orderData,
'ORD-20240101-001'
);
4. 数据库唯一索引去重
sql
-- 创建消息处理记录表
CREATE TABLE message_processing_log (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
message_id VARCHAR(64) NOT NULL,
queue_name VARCHAR(128) NOT NULL,
status ENUM('processing', 'completed', 'failed') NOT NULL DEFAULT 'processing',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_message_queue (message_id, queue_name),
KEY idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 订单表唯一索引
CREATE TABLE orders (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
order_no VARCHAR(64) NOT NULL,
user_id VARCHAR(64) NOT NULL,
amount DECIMAL(10, 2) NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'pending',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_order_no (order_no),
KEY idx_user_id (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
php
<?php
class DatabaseDeduplication
{
private $db;
/**
 * Opens the MySQL connection with exceptions enabled.
 *
 * processMessage() detects an already-claimed message by catching the
 * PDOException raised on a unique-key violation. Before PHP 8 PDO's default
 * error mode is silent, in which case that exception is never thrown and
 * duplicates would be processed again, so the mode is set explicitly.
 */
public function __construct()
{
    $this->db = new PDO('mysql:host=localhost;dbname=app', 'user', 'pass', [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
    ]);
}
/**
 * Runs $processor at most once per (message id, queue) pair.
 *
 * A row is inserted into message_processing_log as the claim. The unique key
 * on (message_id, queue_name) rejects a second claim; in that case the stored
 * status decides the outcome. The claim, the processor's work on this
 * connection, and the switch to 'completed' share one transaction.
 *
 * @param string   $messageId Message identifier.
 * @param string   $queueName Queue the message came from.
 * @param callable $processor Business logic; returning false signals failure.
 * @return bool True when the message is (or already was) completed; false
 *              when $processor returned false, in which case the claim is
 *              rolled back so a redelivery is processed again.
 * @throws \RuntimeException When the message is claimed but not completed.
 */
public function processMessage(string $messageId, string $queueName, callable $processor): bool
{
    $this->db->beginTransaction();
    try {
        $claimStmt = $this->db->prepare(
            "INSERT INTO message_processing_log (message_id, queue_name, status)\n"
            . "VALUES (?, ?, 'processing')"
        );
        try {
            $claimStmt->execute([$messageId, $queueName]);
        } catch (PDOException $e) {
            // SQLSTATE 23000: integrity constraint violation (duplicate claim).
            if ((string) $e->getCode() !== '23000') {
                throw $e;
            }
            $statusStmt = $this->db->prepare(
                "SELECT status FROM message_processing_log\n"
                . "WHERE message_id = ? AND queue_name = ?"
            );
            $statusStmt->execute([$messageId, $queueName]);
            if ($statusStmt->fetchColumn() === 'completed') {
                $this->db->commit();
                return true;
            }
            throw new \RuntimeException("消息正在处理中: {$messageId}");
        }

        $result = $processor();
        if ($result === false) {
            // Recording a failed run as 'completed' would make every
            // redelivery be skipped; drop the claim instead.
            $this->db->rollBack();
            return false;
        }

        $completeStmt = $this->db->prepare(
            "UPDATE message_processing_log SET status = 'completed'\n"
            . "WHERE message_id = ? AND queue_name = ?"
        );
        $completeStmt->execute([$messageId, $queueName]);
        $this->db->commit();

        // A processor without a return value counts as success; returning
        // null from a bool-typed method would raise a TypeError after commit.
        return $result === null ? true : (bool) $result;
    } catch (\Throwable $e) {
        // \Throwable also covers \Error thrown by the processor.
        if ($this->db->inTransaction()) {
            $this->db->rollBack();
        }
        throw $e;
    }
}
/**
 * Deletes processing-log rows older than the retention window.
 *
 * @param int $daysToKeep Age threshold in days; rows created earlier are removed.
 * @return int Number of rows deleted.
 */
public function cleanupOldLogs(int $daysToKeep = 7): int
{
    $purgeSql = "DELETE FROM message_processing_log\n"
        . "WHERE created_at < DATE_SUB(NOW(), INTERVAL ? DAY)";
    $purge = $this->db->prepare($purgeSql);
    $retention = [$daysToKeep];
    $purge->execute($retention);

    $deletedRows = $purge->rowCount();
    return $deletedRows;
}
}
预防措施
1. 幂等性设计原则
┌─────────────────────────────────────────────────────────────┐
│ 幂等性设计检查清单 │
├─────────────────────────────────────────────────────────────┤
│ │
│ □ 每条消息都有唯一标识符(message_id) │
│ □ 消费前检查消息是否已处理 │
│ □ 使用数据库唯一索引保证数据唯一性 │
│ □ 使用Redis缓存已处理消息ID │
│ □ 业务操作使用乐观锁或悲观锁 │
│ □ 设计可重入的业务接口 │
│ □ 记录消息处理日志便于追踪 │
│ │
└─────────────────────────────────────────────────────────────┘
2. 监控告警
yaml
# Prometheus 告警规则
groups:
- name: message_duplicate
rules:
- alert: HighRedeliveryRate
expr: |
rate(rabbitmq_queue_messages_redelivered_total[5m])
/
rate(rabbitmq_queue_messages_delivered_total[5m])
> 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "消息重投递率过高"
description: "队列 {{ $labels.queue }} 重投递率超过 10%"
3. 配置优化
bash
# rabbitmq.conf
# 增加消费者超时时间
consumer_timeout = 1800000
# 增加心跳时间
heartbeat = 60
# 优化预取设置
# 在消费者代码中设置合理的 prefetch_count
注意事项
- 幂等性是业务责任:消息队列无法保证业务层面的幂等
- 唯一ID要稳定:消息ID生成规则要一致
- 缓存要有过期:避免Redis内存无限增长
- 数据库索引要合理:唯一索引是最后一道防线
- 日志要完整:便于问题追踪和分析
