Appearance
Pika 消费者实现
一、概述
消费者(Consumer)是从 RabbitMQ 队列中接收和处理消息的组件。本文档详细介绍如何使用 Pika 库实现消息消费者,包括同步消费、异步消费、消息确认等内容。
1.1 消费者架构
┌─────────────────────────────────────────────────────────────────────┐
│ 消费者架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Queue │ ──► │ Channel │ ──► │ Consumer │ │
│ └─────────────┘ └─────────────┘ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Application │ │
│ │ (处理消息) │ │
│ └─────────────┘ │
│ │
│ 消费模式: │
│ ├── Push 模式:服务端主动推送消息 │
│ └── Pull 模式:客户端主动拉取消息 │
│ │
│ 消息确认: │
│ ├── 自动确认:auto_ack = True │
│ └── 手动确认:basic_ack/basic_nack/basic_reject │
│ │
└─────────────────────────────────────────────────────────────────────┘1.2 消费方式对比
text
┌─────────────────────────────────────────────────────────────────────┐
│ 消费方式对比 │
├───────────────┬─────────────────────────────────────────────────────┤
│ 方式 │ 说明 │
├───────────────┼─────────────────────────────────────────────────────┤
│ Push 模式 │ 服务端主动推送消息到消费者 │
│ (basic_consume)│ 实时性好,推荐使用 │
│ │ 需要注册回调函数 │
├───────────────┼─────────────────────────────────────────────────────┤
│ Pull 模式 │ 客户端主动从队列获取消息 │
│ (basic_get) │ 适合低频消费场景 │
│ │ 每次只获取一条消息 │
└───────────────┴─────────────────────────────────────────────────────┘二、核心知识点
2.1 Push 模式消费
2.1.1 基础消费者
python
import pika
def basic_consumer():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f"收到消息: {body.decode()}")
channel.basic_consume(
queue='hello',
on_message_callback=callback,
auto_ack=True
)
print('开始消费,按 CTRL+C 退出')
channel.start_consuming()
basic_consumer()2.1.2 手动确认消费
python
import pika
def manual_ack_consumer():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
try:
print(f"处理消息: {body.decode()}")
import time
time.sleep(1)
ch.basic_ack(delivery_tag=method.delivery_tag)
print("消息已确认")
except Exception as e:
print(f"处理失败: {e}")
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False
)
channel.basic_consume(
queue='task_queue',
on_message_callback=callback,
auto_ack=False
)
print('开始消费,按 CTRL+C 退出')
channel.start_consuming()
manual_ack_consumer()2.2 Pull 模式消费
python
import pika
def pull_consumer():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='pull_queue')
while True:
method, properties, body = channel.basic_get(
queue='pull_queue',
auto_ack=False
)
if method:
print(f"获取消息: {body.decode()}")
try:
process_message(body)
channel.basic_ack(method.delivery_tag)
print("消息已确认")
except Exception as e:
print(f"处理失败: {e}")
channel.basic_nack(method.delivery_tag, requeue=False)
else:
print("队列为空,等待...")
import time
time.sleep(1)
def process_message(body):
print(f"处理: {body.decode()}")
pull_consumer()2.3 消息确认
2.3.1 确认方式
python
import pika
def demonstrate_ack_modes():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
def auto_ack_callback(ch, method, properties, body):
print(f"自动确认: {body.decode()}")
channel.basic_consume(
queue='auto_ack_queue',
on_message_callback=auto_ack_callback,
auto_ack=True
)
def manual_ack_callback(ch, method, properties, body):
print(f"手动确认: {body.decode()}")
ch.basic_ack(method.delivery_tag)
ch.basic_ack(method.delivery_tag, multiple=True)
ch.basic_nack(method.delivery_tag, requeue=True)
ch.basic_nack(method.delivery_tag, multiple=True, requeue=False)
ch.basic_reject(method.delivery_tag, requeue=False)2.3.2 确认策略
text
┌─────────────────────────────────────────────────────────────────────┐
│ 消息确认策略 │
├───────────────┬─────────────────────────────────────────────────────┤
│ 方法 │ 说明 │
├───────────────┼─────────────────────────────────────────────────────┤
│ basic_ack │ 确认消息成功处理 │
│ │ multiple=False: 确认单条消息 │
│ │ multiple=True: 确认该标签及之前的所有消息 │
├───────────────┼─────────────────────────────────────────────────────┤
│ basic_nack │ 否定确认消息 │
│ │ multiple: 是否批量 │
│ │ requeue: 是否重新入队 │
├───────────────┼─────────────────────────────────────────────────────┤
│ basic_reject │ 拒绝单条消息 │
│ │ requeue: 是否重新入队 │
│ │ 比 basic_nack 更轻量 │
├───────────────┼─────────────────────────────────────────────────────┤
│ basic_recover│ 重新投递所有未确认的消息 │
│ │ requeue=True: 重新入队后投递 │
│ │ requeue=False: 尝试投递给同一消费者 │
└───────────────┴─────────────────────────────────────────────────────┘2.4 QoS 设置
python
import pika
def qos_consumer():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='qos_queue', durable=True)
channel.basic_qos(
prefetch_count=10,
global_qos=False
)
def callback(ch, method, properties, body):
print(f"处理消息: {body.decode()}")
import time
time.sleep(1)
ch.basic_ack(method.delivery_tag)
channel.basic_consume(
queue='qos_queue',
on_message_callback=callback,
auto_ack=False
)
print('开始消费,预取数量: 10')
channel.start_consuming()2.5 消费者标签
python
import pika
def consumer_tag_demo():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='tag_queue')
def callback(ch, method, properties, body):
print(f"收到消息: {body.decode()}")
ch.basic_ack(method.delivery_tag)
consumer_tag = channel.basic_consume(
queue='tag_queue',
on_message_callback=callback,
consumer_tag='my-consumer-tag',
auto_ack=False
)
print(f"消费者标签: {consumer_tag}")
import threading
import time
def cancel_after_delay():
time.sleep(10)
channel.basic_cancel(consumer_tag)
print("消费者已取消")
cancel_thread = threading.Thread(target=cancel_after_delay)
cancel_thread.start()
channel.start_consuming()2.6 消费者取消
python
import pika
def cancel_callback_demo():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='cancel_queue')
def on_cancel(method_frame):
print(f"消费者被取消: {method_frame}")
def callback(ch, method, properties, body):
print(f"收到消息: {body.decode()}")
ch.basic_ack(method.delivery_tag)
consumer_tag = channel.basic_consume(
queue='cancel_queue',
on_message_callback=callback,
on_cancel_callback=on_cancel,
auto_ack=False
)
channel.start_consuming()三、代码示例
3.1 完整消费者类
python
import pika
import json
import logging
import signal
import sys
from typing import Callable, Optional, Dict, Any
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PikaConsumer:
def __init__(self, host: str = 'localhost', port: int = 5672,
username: str = 'guest', password: str = 'guest',
virtual_host: str = '/'):
self.host = host
self.port = port
self.username = username
self.password = password
self.virtual_host = virtual_host
self.connection: Optional[pika.BlockingConnection] = None
self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None
self.consumer_tag: Optional[str] = None
self.message_handler: Optional[Callable] = None
self.running = False
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
logger.info(f"收到信号 {signum},准备关闭...")
self.stop()
sys.exit(0)
def connect(self):
credentials = pika.PlainCredentials(self.username, self.password)
parameters = pika.ConnectionParameters(
host=self.host,
port=self.port,
virtual_host=self.virtual_host,
credentials=credentials,
heartbeat=60,
connection_attempts=3,
retry_delay=5
)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
logger.info(f"连接到 {self.host}:{self.port}")
def set_message_handler(self, handler: Callable):
self.message_handler = handler
def declare_queue(self, queue: str, durable: bool = True,
arguments: Optional[Dict] = None):
if not self.channel:
self.connect()
self.channel.queue_declare(
queue=queue,
durable=durable,
arguments=arguments
)
logger.info(f"队列 {queue} 已声明")
def set_qos(self, prefetch_count: int = 10):
if not self.channel:
self.connect()
self.channel.basic_qos(prefetch_count=prefetch_count)
logger.info(f"QoS 设置: prefetch_count={prefetch_count}")
def _on_message(self, channel, method, properties, body):
try:
message = body.decode('utf-8')
if properties.content_type == 'application/json':
message = json.loads(message)
logger.debug(f"收到消息: {properties.message_id}")
if self.message_handler:
self.message_handler(message, properties.headers or {})
channel.basic_ack(delivery_tag=method.delivery_tag)
logger.debug("消息已确认")
except Exception as e:
logger.error(f"处理消息失败: {e}")
channel.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False
)
def start_consuming(self, queue: str, consumer_tag: str = ''):
if not self.channel:
self.connect()
self.consumer_tag = self.channel.basic_consume(
queue=queue,
on_message_callback=self._on_message,
auto_ack=False,
consumer_tag=consumer_tag
)
logger.info(f"开始消费队列: {queue}")
self.running = True
try:
self.channel.start_consuming()
except Exception as e:
logger.error(f"消费异常: {e}")
self.stop()
def stop(self):
self.running = False
if self.channel and self.consumer_tag:
try:
self.channel.basic_cancel(self.consumer_tag)
logger.info("消费者已取消")
except Exception as e:
logger.error(f"取消消费者失败: {e}")
self.close()
def close(self):
if self.channel and self.channel.is_open:
self.channel.close()
if self.connection and self.connection.is_open:
self.connection.close()
logger.info("连接已关闭")
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if __name__ == '__main__':
def handle_message(message: Any, headers: Dict):
print(f"处理消息: {message}")
print(f"消息头: {headers}")
with PikaConsumer() as consumer:
consumer.set_message_handler(handle_message)
consumer.declare_queue('test.queue')
consumer.set_qos(prefetch_count=10)
consumer.start_consuming('test.queue')3.2 多线程消费者
python
import pika
import threading
import logging
from queue import Queue
from typing import Callable
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MultiThreadConsumer:
def __init__(self, queue_name: str, num_workers: int = 5,
prefetch_count: int = 10):
self.queue_name = queue_name
self.num_workers = num_workers
self.prefetch_count = prefetch_count
self.connection = None
self.channel = None
self.message_queue = Queue(maxsize=100)
self.workers = []
self.running = False
def connect(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=self.queue_name, durable=True)
self.channel.basic_qos(prefetch_count=self.prefetch_count)
logger.info("连接已建立")
def start_workers(self, handler: Callable):
for i in range(self.num_workers):
worker = threading.Thread(
target=self._worker_loop,
args=(handler, i),
daemon=True
)
worker.start()
self.workers.append(worker)
logger.info(f"启动 {self.num_workers} 个工作线程")
def _worker_loop(self, handler: Callable, worker_id: int):
while self.running:
try:
message_data = self.message_queue.get(timeout=1)
channel, method, properties, body = message_data
try:
handler(body.decode('utf-8'))
channel.basic_ack(method.delivery_tag)
logger.debug(f"Worker {worker_id}: 消息已确认")
except Exception as e:
logger.error(f"Worker {worker_id}: 处理失败: {e}")
channel.basic_nack(method.delivery_tag, requeue=False)
except:
pass
def _on_message(self, channel, method, properties, body):
self.message_queue.put((channel, method, properties, body))
def start(self, handler: Callable):
self.connect()
self.running = True
self.start_workers(handler)
self.channel.basic_consume(
queue=self.queue_name,
on_message_callback=self._on_message,
auto_ack=False
)
logger.info("开始消费...")
try:
self.channel.start_consuming()
except KeyboardInterrupt:
self.stop()
def stop(self):
self.running = False
for worker in self.workers:
worker.join(timeout=5)
if self.connection:
self.connection.close()
logger.info("消费者已停止")
if __name__ == '__main__':
def process_message(message: str):
import time
time.sleep(0.1)
print(f"处理: {message}")
consumer = MultiThreadConsumer('test.queue', num_workers=5)
try:
consumer.start(process_message)
except KeyboardInterrupt:
consumer.stop()3.3 优先级消费者
python
import pika
import threading
from queue import PriorityQueue
from typing import Tuple, Any
class PriorityConsumer:
def __init__(self, queue_name: str):
self.queue_name = queue_name
self.connection = None
self.channel = None
self.priority_queue = PriorityQueue()
self.running = False
def connect(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = self.connection.channel()
args = {'x-max-priority': 10}
self.channel.queue_declare(
queue=self.queue_name,
durable=True,
arguments=args
)
self.channel.basic_qos(prefetch_count=10)
def _on_message(self, channel, method, properties, body):
priority = properties.priority or 0
self.priority_queue.put(
(-priority, channel, method, body),
block=False
)
def _process_messages(self, handler):
while self.running:
try:
neg_priority, channel, method, body = self.priority_queue.get(timeout=1)
priority = -neg_priority
try:
handler(body.decode('utf-8'))
channel.basic_ack(method.delivery_tag)
print(f"处理优先级 {priority} 消息")
except Exception as e:
print(f"处理失败: {e}")
channel.basic_nack(method.delivery_tag, requeue=False)
except:
pass
def start(self, handler):
self.connect()
self.running = True
process_thread = threading.Thread(
target=self._process_messages,
args=(handler,),
daemon=True
)
process_thread.start()
self.channel.basic_consume(
queue=self.queue_name,
on_message_callback=self._on_message,
auto_ack=False
)
print("开始消费优先级消息...")
try:
self.channel.start_consuming()
except KeyboardInterrupt:
self.stop()
def stop(self):
self.running = False
if self.connection:
self.connection.close()
if __name__ == '__main__':
def handle_priority_message(message: str):
print(f"处理: {message}")
consumer = PriorityConsumer('priority.queue')
try:
consumer.start(handle_priority_message)
except KeyboardInterrupt:
consumer.stop()四、实际应用场景
4.1 订单处理消费者
python
import pika
import json
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class OrderConsumer:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = self.connection.channel()
self._setup_infrastructure()
def _setup_infrastructure(self):
self.channel.exchange_declare(
exchange='order.events',
exchange_type='topic',
durable=True
)
self.channel.queue_declare(
queue='order.processing',
durable=True,
arguments={
'x-message-ttl': 86400000,
'x-dead-letter-exchange': 'dlx.exchange'
}
)
self.channel.queue_bind(
queue='order.processing',
exchange='order.events',
routing_key='order.#'
)
self.channel.basic_qos(prefetch_count=10)
def _on_order_message(self, channel, method, properties, body):
try:
event = json.loads(body.decode('utf-8'))
event_type = properties.type or 'unknown'
logger.info(f"收到订单事件: {event_type}")
if event_type == 'order.created':
self._handle_order_created(event)
elif event_type == 'order.paid':
self._handle_order_paid(event)
elif event_type == 'order.cancelled':
self._handle_order_cancelled(event)
else:
logger.warning(f"未知事件类型: {event_type}")
channel.basic_ack(method.delivery_tag)
logger.info(f"事件处理完成: {event_type}")
except Exception as e:
logger.error(f"处理失败: {e}")
channel.basic_nack(method.delivery_tag, requeue=False)
def _handle_order_created(self, event: dict):
order_id = event.get('order_id')
logger.info(f"处理订单创建: {order_id}")
def _handle_order_paid(self, event: dict):
order_id = event.get('order_id')
logger.info(f"处理订单支付: {order_id}")
def _handle_order_cancelled(self, event: dict):
order_id = event.get('order_id')
logger.info(f"处理订单取消: {order_id}")
def start(self):
self.channel.basic_consume(
queue='order.processing',
on_message_callback=self._on_order_message,
auto_ack=False
)
logger.info("开始消费订单事件...")
try:
self.channel.start_consuming()
except KeyboardInterrupt:
self.stop()
def stop(self):
self.connection.close()
logger.info("消费者已停止")
if __name__ == '__main__':
consumer = OrderConsumer()
consumer.start()4.2 日志处理消费者
python
import pika
import json
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LogConsumer:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = self.connection.channel()
self._setup_infrastructure()
def _setup_infrastructure(self):
self.channel.exchange_declare(
exchange='logs',
exchange_type='topic',
durable=True
)
self.channel.queue_declare(
queue='logs.error',
durable=True
)
self.channel.queue_declare(
queue='logs.all',
durable=True
)
self.channel.queue_bind(
queue='logs.error',
exchange='logs',
routing_key='*.error'
)
self.channel.queue_bind(
queue='logs.all',
exchange='logs',
routing_key='#'
)
self.channel.basic_qos(prefetch_count=20)
def _on_error_log(self, channel, method, properties, body):
try:
log_entry = json.loads(body.decode('utf-8'))
logger.error(
f"[{log_entry.get('service')}] "
f"{log_entry.get('message')}"
)
self._send_alert(log_entry)
channel.basic_ack(method.delivery_tag)
except Exception as e:
logger.error(f"处理错误日志失败: {e}")
channel.basic_nack(method.delivery_tag, requeue=False)
def _on_all_logs(self, channel, method, properties, body):
try:
log_entry = json.loads(body.decode('utf-8'))
level = log_entry.get('level', 'INFO')
service = log_entry.get('service', 'unknown')
message = log_entry.get('message', '')
logger.info(f"[{level}] {service}: {message}")
self._store_log(log_entry)
channel.basic_ack(method.delivery_tag)
except Exception as e:
logger.error(f"处理日志失败: {e}")
channel.basic_nack(method.delivery_tag, requeue=False)
def _send_alert(self, log_entry: dict):
logger.warning(f"发送告警: {log_entry.get('message')}")
def _store_log(self, log_entry: dict):
pass
def start(self):
self.channel.basic_consume(
queue='logs.error',
on_message_callback=self._on_error_log,
auto_ack=False
)
self.channel.basic_consume(
queue='logs.all',
on_message_callback=self._on_all_logs,
auto_ack=False
)
logger.info("开始消费日志...")
try:
self.channel.start_consuming()
except KeyboardInterrupt:
self.stop()
def stop(self):
self.connection.close()
if __name__ == '__main__':
consumer = LogConsumer()
consumer.start()五、常见问题与解决方案
5.1 消息重复消费
问题描述: 消息被重复处理。
解决方案:
python
import pika
import redis
redis_client = redis.Redis()
def callback(ch, method, properties, body):
message_id = properties.message_id
if redis_client.exists(f"processed:{message_id}"):
print(f"消息已处理,跳过: {message_id}")
ch.basic_ack(method.delivery_tag)
return
process_message(body)
redis_client.setex(f"processed:{message_id}", 86400, "1")
ch.basic_ack(method.delivery_tag)5.2 消息处理超时
问题描述: 消息处理时间过长。
解决方案:
python
import pika
import threading
def callback(ch, method, properties, body):
def process_with_timeout():
try:
process_message(body)
ch.basic_ack(method.delivery_tag)
except Exception as e:
ch.basic_nack(method.delivery_tag, requeue=False)
thread = threading.Thread(target=process_with_timeout)
thread.start()
thread.join(timeout=30)
if thread.is_alive():
print("处理超时")
ch.basic_nack(method.delivery_tag, requeue=False)5.3 消费者阻塞
问题描述: 消费者处理缓慢导致消息堆积。
解决方案:
python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.basic_qos(prefetch_count=10)
def callback(ch, method, properties, body):
queue_info = ch.queue_declare(
queue='my_queue',
passive=True
)
message_count = queue_info.method.message_count
if message_count > 1000:
import time
time.sleep(0.1)
process_message(body)
ch.basic_ack(method.delivery_tag)六、最佳实践建议
6.1 消费者设计
text
设计建议:
├── 使用手动确认模式
├── 设置合理的预取数量
├── 实现幂等性处理
├── 处理异常情况
├── 监控消费速率
└── 实现优雅关闭6.2 性能优化
text
性能建议:
├── 使用多线程处理
├── 批量确认消息
├── 异步处理消息
├── 控制预取数量
└── 避免阻塞操作6.3 可靠性保障
text
可靠性建议:
├── 正确处理确认和拒绝
├── 实现重试机制
├── 使用死信队列
├── 监控消费者状态
└── 处理网络中断