Skip to content

Pika 消费者实现

一、概述

消费者(Consumer)是从 RabbitMQ 队列中接收和处理消息的组件。本文档详细介绍如何使用 Pika 库实现消息消费者,包括同步消费、异步消费、消息确认等内容。

1.1 消费者架构

┌─────────────────────────────────────────────────────────────────────┐
│                       消费者架构                                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐          │
│  │   Queue     │ ──► │   Channel   │ ──► │  Consumer   │          │
│  └─────────────┘     └─────────────┘     └──────┬──────┘          │
│                                                  │                  │
│                                                  ▼                  │
│                                          ┌─────────────┐          │
│                                          │ Application │          │
│                                          │  (处理消息)  │          │
│                                          └─────────────┘          │
│                                                                     │
│  消费模式:                                                         │
│  ├── Push 模式:服务端主动推送消息                                  │
│  └── Pull 模式:客户端主动拉取消息                                  │
│                                                                     │
│  消息确认:                                                         │
│  ├── 自动确认:auto_ack = True                                      │
│  └── 手动确认:basic_ack/basic_nack/basic_reject                    │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 消费方式对比

text
┌─────────────────────────────────────────────────────────────────────┐
│                     消费方式对比                                     │
├───────────────┬─────────────────────────────────────────────────────┤
│     方式      │                    说明                              │
├───────────────┼─────────────────────────────────────────────────────┤
│  Push 模式    │ 服务端主动推送消息到消费者                          │
│  (basic_consume)│ 实时性好,推荐使用                                 │
│               │ 需要注册回调函数                                    │
├───────────────┼─────────────────────────────────────────────────────┤
│  Pull 模式    │ 客户端主动从队列获取消息                            │
│  (basic_get)  │ 适合低频消费场景                                    │
│               │ 每次只获取一条消息                                  │
└───────────────┴─────────────────────────────────────────────────────┘

二、核心知识点

2.1 Push 模式消费

2.1.1 基础消费者

python
import pika

def basic_consumer():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    def callback(ch, method, properties, body):
        print(f"收到消息: {body.decode()}")
    
    channel.basic_consume(
        queue='hello',
        on_message_callback=callback,
        auto_ack=True
    )
    
    print('开始消费,按 CTRL+C 退出')
    channel.start_consuming()

basic_consumer()

2.1.2 手动确认消费

python
import pika

def manual_ack_consumer():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    
    channel.basic_qos(prefetch_count=1)
    
    def callback(ch, method, properties, body):
        try:
            print(f"处理消息: {body.decode()}")
            
            import time
            time.sleep(1)
            
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print("消息已确认")
            
        except Exception as e:
            print(f"处理失败: {e}")
            ch.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=False
            )
    
    channel.basic_consume(
        queue='task_queue',
        on_message_callback=callback,
        auto_ack=False
    )
    
    print('开始消费,按 CTRL+C 退出')
    channel.start_consuming()

manual_ack_consumer()

2.2 Pull 模式消费

python
import pika

def pull_consumer():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    channel.queue_declare(queue='pull_queue')
    
    while True:
        method, properties, body = channel.basic_get(
            queue='pull_queue',
            auto_ack=False
        )
        
        if method:
            print(f"获取消息: {body.decode()}")
            
            try:
                process_message(body)
                channel.basic_ack(method.delivery_tag)
                print("消息已确认")
            except Exception as e:
                print(f"处理失败: {e}")
                channel.basic_nack(method.delivery_tag, requeue=False)
        else:
            print("队列为空,等待...")
            import time
            time.sleep(1)

def process_message(body):
    print(f"处理: {body.decode()}")

pull_consumer()

2.3 消息确认

2.3.1 确认方式

python
import pika

def demonstrate_ack_modes():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    def auto_ack_callback(ch, method, properties, body):
        print(f"自动确认: {body.decode()}")
    
    channel.basic_consume(
        queue='auto_ack_queue',
        on_message_callback=auto_ack_callback,
        auto_ack=True
    )
    
    def manual_ack_callback(ch, method, properties, body):
        print(f"手动确认: {body.decode()}")
        
        ch.basic_ack(method.delivery_tag)
        
        ch.basic_ack(method.delivery_tag, multiple=True)
        
        ch.basic_nack(method.delivery_tag, requeue=True)
        
        ch.basic_nack(method.delivery_tag, multiple=True, requeue=False)
        
        ch.basic_reject(method.delivery_tag, requeue=False)

2.3.2 确认策略

text
┌─────────────────────────────────────────────────────────────────────┐
│                     消息确认策略                                     │
├───────────────┬─────────────────────────────────────────────────────┤
│     方法      │                    说明                              │
├───────────────┼─────────────────────────────────────────────────────┤
│  basic_ack    │ 确认消息成功处理                                    │
│               │ multiple=False: 确认单条消息                        │
│               │ multiple=True: 确认该标签及之前的所有消息           │
├───────────────┼─────────────────────────────────────────────────────┤
│  basic_nack   │ 否定确认消息                                        │
│               │ multiple: 是否批量                                  │
│               │ requeue: 是否重新入队                               │
├───────────────┼─────────────────────────────────────────────────────┤
│  basic_reject │ 拒绝单条消息                                        │
│               │ requeue: 是否重新入队                               │
│               │ 比 basic_nack 更轻量                                 │
├───────────────┼─────────────────────────────────────────────────────┤
│  basic_recover│ 重新投递所有未确认的消息                            │
│               │ requeue=True: 重新入队后投递                        │
│               │ requeue=False: 尝试投递给同一消费者                 │
└───────────────┴─────────────────────────────────────────────────────┘

2.4 QoS 设置

python
import pika

def qos_consumer():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    channel.queue_declare(queue='qos_queue', durable=True)
    
    channel.basic_qos(
        prefetch_count=10,
        global_qos=False
    )
    
    def callback(ch, method, properties, body):
        print(f"处理消息: {body.decode()}")
        
        import time
        time.sleep(1)
        
        ch.basic_ack(method.delivery_tag)
    
    channel.basic_consume(
        queue='qos_queue',
        on_message_callback=callback,
        auto_ack=False
    )
    
    print('开始消费,预取数量: 10')
    channel.start_consuming()

2.5 消费者标签

python
import pika

def consumer_tag_demo():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    channel.queue_declare(queue='tag_queue')
    
    def callback(ch, method, properties, body):
        print(f"收到消息: {body.decode()}")
        ch.basic_ack(method.delivery_tag)
    
    consumer_tag = channel.basic_consume(
        queue='tag_queue',
        on_message_callback=callback,
        consumer_tag='my-consumer-tag',
        auto_ack=False
    )
    
    print(f"消费者标签: {consumer_tag}")
    
    import threading
    import time
    
    def cancel_after_delay():
        time.sleep(10)
        channel.basic_cancel(consumer_tag)
        print("消费者已取消")
    
    cancel_thread = threading.Thread(target=cancel_after_delay)
    cancel_thread.start()
    
    channel.start_consuming()

2.6 消费者取消

python
import pika

def cancel_callback_demo():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    channel.queue_declare(queue='cancel_queue')
    
    def on_cancel(method_frame):
        print(f"消费者被取消: {method_frame}")
    
    def callback(ch, method, properties, body):
        print(f"收到消息: {body.decode()}")
        ch.basic_ack(method.delivery_tag)
    
    consumer_tag = channel.basic_consume(
        queue='cancel_queue',
        on_message_callback=callback,
        on_cancel_callback=on_cancel,
        auto_ack=False
    )
    
    channel.start_consuming()

三、代码示例

3.1 完整消费者类

python
import pika
import json
import logging
import signal
import sys
from typing import Callable, Optional, Dict, Any

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PikaConsumer:
    
    def __init__(self, host: str = 'localhost', port: int = 5672,
                 username: str = 'guest', password: str = 'guest',
                 virtual_host: str = '/'):
        
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.virtual_host = virtual_host
        
        self.connection: Optional[pika.BlockingConnection] = None
        self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None
        self.consumer_tag: Optional[str] = None
        
        self.message_handler: Optional[Callable] = None
        self.running = False
        
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)
    
    def _signal_handler(self, signum, frame):
        logger.info(f"收到信号 {signum},准备关闭...")
        self.stop()
        sys.exit(0)
    
    def connect(self):
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.virtual_host,
            credentials=credentials,
            heartbeat=60,
            connection_attempts=3,
            retry_delay=5
        )
        
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        
        logger.info(f"连接到 {self.host}:{self.port}")
    
    def set_message_handler(self, handler: Callable):
        self.message_handler = handler
    
    def declare_queue(self, queue: str, durable: bool = True,
                     arguments: Optional[Dict] = None):
        if not self.channel:
            self.connect()
        
        self.channel.queue_declare(
            queue=queue,
            durable=durable,
            arguments=arguments
        )
        logger.info(f"队列 {queue} 已声明")
    
    def set_qos(self, prefetch_count: int = 10):
        if not self.channel:
            self.connect()
        
        self.channel.basic_qos(prefetch_count=prefetch_count)
        logger.info(f"QoS 设置: prefetch_count={prefetch_count}")
    
    def _on_message(self, channel, method, properties, body):
        try:
            message = body.decode('utf-8')
            
            if properties.content_type == 'application/json':
                message = json.loads(message)
            
            logger.debug(f"收到消息: {properties.message_id}")
            
            if self.message_handler:
                self.message_handler(message, properties.headers or {})
            
            channel.basic_ack(delivery_tag=method.delivery_tag)
            logger.debug("消息已确认")
            
        except Exception as e:
            logger.error(f"处理消息失败: {e}")
            channel.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=False
            )
    
    def start_consuming(self, queue: str, consumer_tag: str = ''):
        if not self.channel:
            self.connect()
        
        self.consumer_tag = self.channel.basic_consume(
            queue=queue,
            on_message_callback=self._on_message,
            auto_ack=False,
            consumer_tag=consumer_tag
        )
        
        logger.info(f"开始消费队列: {queue}")
        self.running = True
        
        try:
            self.channel.start_consuming()
        except Exception as e:
            logger.error(f"消费异常: {e}")
            self.stop()
    
    def stop(self):
        self.running = False
        
        if self.channel and self.consumer_tag:
            try:
                self.channel.basic_cancel(self.consumer_tag)
                logger.info("消费者已取消")
            except Exception as e:
                logger.error(f"取消消费者失败: {e}")
        
        self.close()
    
    def close(self):
        if self.channel and self.channel.is_open:
            self.channel.close()
        
        if self.connection and self.connection.is_open:
            self.connection.close()
        
        logger.info("连接已关闭")
    
    def __enter__(self):
        self.connect()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


if __name__ == '__main__':
    def handle_message(message: Any, headers: Dict):
        print(f"处理消息: {message}")
        print(f"消息头: {headers}")
    
    with PikaConsumer() as consumer:
        consumer.set_message_handler(handle_message)
        consumer.declare_queue('test.queue')
        consumer.set_qos(prefetch_count=10)
        consumer.start_consuming('test.queue')

3.2 多线程消费者

python
import pika
import threading
import logging
from queue import Queue
from typing import Callable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MultiThreadConsumer:
    
    def __init__(self, queue_name: str, num_workers: int = 5,
                 prefetch_count: int = 10):
        self.queue_name = queue_name
        self.num_workers = num_workers
        self.prefetch_count = prefetch_count
        
        self.connection = None
        self.channel = None
        
        self.message_queue = Queue(maxsize=100)
        self.workers = []
        self.running = False
    
    def connect(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        self.channel.queue_declare(queue=self.queue_name, durable=True)
        self.channel.basic_qos(prefetch_count=self.prefetch_count)
        
        logger.info("连接已建立")
    
    def start_workers(self, handler: Callable):
        for i in range(self.num_workers):
            worker = threading.Thread(
                target=self._worker_loop,
                args=(handler, i),
                daemon=True
            )
            worker.start()
            self.workers.append(worker)
        
        logger.info(f"启动 {self.num_workers} 个工作线程")
    
    def _worker_loop(self, handler: Callable, worker_id: int):
        while self.running:
            try:
                message_data = self.message_queue.get(timeout=1)
                
                channel, method, properties, body = message_data
                
                try:
                    handler(body.decode('utf-8'))
                    channel.basic_ack(method.delivery_tag)
                    logger.debug(f"Worker {worker_id}: 消息已确认")
                except Exception as e:
                    logger.error(f"Worker {worker_id}: 处理失败: {e}")
                    channel.basic_nack(method.delivery_tag, requeue=False)
                
            except:
                pass
    
    def _on_message(self, channel, method, properties, body):
        self.message_queue.put((channel, method, properties, body))
    
    def start(self, handler: Callable):
        self.connect()
        self.running = True
        self.start_workers(handler)
        
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self._on_message,
            auto_ack=False
        )
        
        logger.info("开始消费...")
        
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop()
    
    def stop(self):
        self.running = False
        
        for worker in self.workers:
            worker.join(timeout=5)
        
        if self.connection:
            self.connection.close()
        
        logger.info("消费者已停止")


if __name__ == '__main__':
    def process_message(message: str):
        import time
        time.sleep(0.1)
        print(f"处理: {message}")
    
    consumer = MultiThreadConsumer('test.queue', num_workers=5)
    
    try:
        consumer.start(process_message)
    except KeyboardInterrupt:
        consumer.stop()

3.3 优先级消费者

python
import pika
import threading
from queue import PriorityQueue
from typing import Tuple, Any

class PriorityConsumer:
    
    def __init__(self, queue_name: str):
        self.queue_name = queue_name
        self.connection = None
        self.channel = None
        
        self.priority_queue = PriorityQueue()
        self.running = False
    
    def connect(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        args = {'x-max-priority': 10}
        self.channel.queue_declare(
            queue=self.queue_name,
            durable=True,
            arguments=args
        )
        
        self.channel.basic_qos(prefetch_count=10)
    
    def _on_message(self, channel, method, properties, body):
        priority = properties.priority or 0
        
        self.priority_queue.put(
            (-priority, channel, method, body),
            block=False
        )
    
    def _process_messages(self, handler):
        while self.running:
            try:
                neg_priority, channel, method, body = self.priority_queue.get(timeout=1)
                priority = -neg_priority
                
                try:
                    handler(body.decode('utf-8'))
                    channel.basic_ack(method.delivery_tag)
                    print(f"处理优先级 {priority} 消息")
                except Exception as e:
                    print(f"处理失败: {e}")
                    channel.basic_nack(method.delivery_tag, requeue=False)
                
            except:
                pass
    
    def start(self, handler):
        self.connect()
        self.running = True
        
        process_thread = threading.Thread(
            target=self._process_messages,
            args=(handler,),
            daemon=True
        )
        process_thread.start()
        
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self._on_message,
            auto_ack=False
        )
        
        print("开始消费优先级消息...")
        
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop()
    
    def stop(self):
        self.running = False
        
        if self.connection:
            self.connection.close()


if __name__ == '__main__':
    def handle_priority_message(message: str):
        print(f"处理: {message}")
    
    consumer = PriorityConsumer('priority.queue')
    
    try:
        consumer.start(handle_priority_message)
    except KeyboardInterrupt:
        consumer.stop()

四、实际应用场景

4.1 订单处理消费者

python
import pika
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class OrderConsumer:
    
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        self._setup_infrastructure()
    
    def _setup_infrastructure(self):
        self.channel.exchange_declare(
            exchange='order.events',
            exchange_type='topic',
            durable=True
        )
        
        self.channel.queue_declare(
            queue='order.processing',
            durable=True,
            arguments={
                'x-message-ttl': 86400000,
                'x-dead-letter-exchange': 'dlx.exchange'
            }
        )
        
        self.channel.queue_bind(
            queue='order.processing',
            exchange='order.events',
            routing_key='order.#'
        )
        
        self.channel.basic_qos(prefetch_count=10)
    
    def _on_order_message(self, channel, method, properties, body):
        try:
            event = json.loads(body.decode('utf-8'))
            event_type = properties.type or 'unknown'
            
            logger.info(f"收到订单事件: {event_type}")
            
            if event_type == 'order.created':
                self._handle_order_created(event)
            elif event_type == 'order.paid':
                self._handle_order_paid(event)
            elif event_type == 'order.cancelled':
                self._handle_order_cancelled(event)
            else:
                logger.warning(f"未知事件类型: {event_type}")
            
            channel.basic_ack(method.delivery_tag)
            logger.info(f"事件处理完成: {event_type}")
            
        except Exception as e:
            logger.error(f"处理失败: {e}")
            channel.basic_nack(method.delivery_tag, requeue=False)
    
    def _handle_order_created(self, event: dict):
        order_id = event.get('order_id')
        logger.info(f"处理订单创建: {order_id}")
    
    def _handle_order_paid(self, event: dict):
        order_id = event.get('order_id')
        logger.info(f"处理订单支付: {order_id}")
    
    def _handle_order_cancelled(self, event: dict):
        order_id = event.get('order_id')
        logger.info(f"处理订单取消: {order_id}")
    
    def start(self):
        self.channel.basic_consume(
            queue='order.processing',
            on_message_callback=self._on_order_message,
            auto_ack=False
        )
        
        logger.info("开始消费订单事件...")
        
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop()
    
    def stop(self):
        self.connection.close()
        logger.info("消费者已停止")


if __name__ == '__main__':
    consumer = OrderConsumer()
    consumer.start()

4.2 日志处理消费者

python
import pika
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class LogConsumer:
    
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        self._setup_infrastructure()
    
    def _setup_infrastructure(self):
        self.channel.exchange_declare(
            exchange='logs',
            exchange_type='topic',
            durable=True
        )
        
        self.channel.queue_declare(
            queue='logs.error',
            durable=True
        )
        
        self.channel.queue_declare(
            queue='logs.all',
            durable=True
        )
        
        self.channel.queue_bind(
            queue='logs.error',
            exchange='logs',
            routing_key='*.error'
        )
        
        self.channel.queue_bind(
            queue='logs.all',
            exchange='logs',
            routing_key='#'
        )
        
        self.channel.basic_qos(prefetch_count=20)
    
    def _on_error_log(self, channel, method, properties, body):
        try:
            log_entry = json.loads(body.decode('utf-8'))
            
            logger.error(
                f"[{log_entry.get('service')}] "
                f"{log_entry.get('message')}"
            )
            
            self._send_alert(log_entry)
            
            channel.basic_ack(method.delivery_tag)
            
        except Exception as e:
            logger.error(f"处理错误日志失败: {e}")
            channel.basic_nack(method.delivery_tag, requeue=False)
    
    def _on_all_logs(self, channel, method, properties, body):
        try:
            log_entry = json.loads(body.decode('utf-8'))
            
            level = log_entry.get('level', 'INFO')
            service = log_entry.get('service', 'unknown')
            message = log_entry.get('message', '')
            
            logger.info(f"[{level}] {service}: {message}")
            
            self._store_log(log_entry)
            
            channel.basic_ack(method.delivery_tag)
            
        except Exception as e:
            logger.error(f"处理日志失败: {e}")
            channel.basic_nack(method.delivery_tag, requeue=False)
    
    def _send_alert(self, log_entry: dict):
        logger.warning(f"发送告警: {log_entry.get('message')}")
    
    def _store_log(self, log_entry: dict):
        pass
    
    def start(self):
        self.channel.basic_consume(
            queue='logs.error',
            on_message_callback=self._on_error_log,
            auto_ack=False
        )
        
        self.channel.basic_consume(
            queue='logs.all',
            on_message_callback=self._on_all_logs,
            auto_ack=False
        )
        
        logger.info("开始消费日志...")
        
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop()
    
    def stop(self):
        self.connection.close()


if __name__ == '__main__':
    consumer = LogConsumer()
    consumer.start()

五、常见问题与解决方案

5.1 消息重复消费

问题描述: 消息被重复处理。

解决方案

python
import pika
import redis

redis_client = redis.Redis()

def callback(ch, method, properties, body):
    message_id = properties.message_id
    
    if redis_client.exists(f"processed:{message_id}"):
        print(f"消息已处理,跳过: {message_id}")
        ch.basic_ack(method.delivery_tag)
        return
    
    process_message(body)
    
    redis_client.setex(f"processed:{message_id}", 86400, "1")
    ch.basic_ack(method.delivery_tag)

5.2 消息处理超时

问题描述: 消息处理时间过长。

解决方案

python
import pika
import threading

def callback(ch, method, properties, body):
    def process_with_timeout():
        try:
            process_message(body)
            ch.basic_ack(method.delivery_tag)
        except Exception as e:
            ch.basic_nack(method.delivery_tag, requeue=False)
    
    thread = threading.Thread(target=process_with_timeout)
    thread.start()
    thread.join(timeout=30)
    
    if thread.is_alive():
        print("处理超时")
        ch.basic_nack(method.delivery_tag, requeue=False)

5.3 消费者阻塞

问题描述: 消费者处理缓慢导致消息堆积。

解决方案

python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

channel.basic_qos(prefetch_count=10)

def callback(ch, method, properties, body):
    queue_info = ch.queue_declare(
        queue='my_queue',
        passive=True
    )
    message_count = queue_info.method.message_count
    
    if message_count > 1000:
        import time
        time.sleep(0.1)
    
    process_message(body)
    ch.basic_ack(method.delivery_tag)

六、最佳实践建议

6.1 消费者设计

text
设计建议:
├── 使用手动确认模式
├── 设置合理的预取数量
├── 实现幂等性处理
├── 处理异常情况
├── 监控消费速率
└── 实现优雅关闭

6.2 性能优化

text
性能建议:
├── 使用多线程处理
├── 批量确认消息
├── 异步处理消息
├── 控制预取数量
└── 避免阻塞操作

6.3 可靠性保障

text
可靠性建议:
├── 正确处理确认和拒绝
├── 实现重试机制
├── 使用死信队列
├── 监控消费者状态
└── 处理网络中断

七、相关链接