Appearance
Pika 库介绍
一、概述
Pika 是 Python 中最流行的 RabbitMQ 客户端库,实现了 AMQP 0-9-1 协议。它提供了同步和异步两种编程模式,适合各种应用场景。
1.1 库简介
text
┌─────────────────────────────────────────────────────────────────────┐
│ Pika 库简介 │
├─────────────────────────────────────────────────────────────────────┤
│ 名称:Pika │
│ 语言:Python │
│ 协议:AMQP 0-9-1 │
│ 许可:BSD 3-Clause │
│ 仓库:https://github.com/pika/pika │
│ 最新版本:1.x 系列 │
└─────────────────────────────────────────────────────────────────────┘
1.2 安装方式
bash
pip install pika
pip install pika==1.3.2
pip install pika[ssl]
1.3 主要特性
text
核心特性:
├── 完整的 AMQP 0-9-1 协议支持
├── 同步和异步两种连接模式
├── 线程安全的连接
├── SSL/TLS 支持
├── 心跳机制
├── 自动重连
└── 消费者取消通知
二、核心知识点
2.1 连接模式
text
┌─────────────────────────────────────────────────────────────────────┐
│ Pika 连接模式 │
├───────────────────────┬─────────────────────────────────────────────┤
│ 模式 │ 说明 │
├───────────────────────┼─────────────────────────────────────────────┤
│ BlockingConnection │ 同步阻塞连接 │
│ │ 适合简单脚本和命令行工具 │
│ │ 使用简单,但不适合高并发 │
├───────────────────────┼─────────────────────────────────────────────┤
│ SelectConnection │ 异步连接,基于 select │
│ │ 适合事件驱动应用 │
│ │ 单线程,高性能 │
├───────────────────────┼─────────────────────────────────────────────┤
│ LibevConnection │ 异步连接,基于 libev │
│ │ 需要安装额外依赖 │
├───────────────────────┼─────────────────────────────────────────────┤
│ TornadoConnection │ 异步连接,集成 Tornado │
│ │ 适合 Tornado 应用 │
├───────────────────────┼─────────────────────────────────────────────┤
│ AsyncioConnection │ 异步连接,集成 asyncio │
│ │ Python 3.5+ 推荐 │
└───────────────────────┴─────────────────────────────────────────────┘
2.2 基础连接
2.2.1 同步连接
python
import pika
def basic_connection():
    """Open a blocking connection with explicit parameters, then close it.

    Connects to ``localhost:5672`` on virtual host ``/`` as ``guest``/``guest``
    with ``heartbeat=60``, ``connection_attempts=3`` and ``retry_delay=5``,
    opens one channel, prints a confirmation and closes the connection.
    """
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=credentials,
        heartbeat=60,
        connection_attempts=3,
        retry_delay=5
    )
    connection = pika.BlockingConnection(parameters)
    try:
        # The channel is opened only to demonstrate the usual next step.
        connection.channel()
        print("连接建立成功")
    finally:
        # Release the socket even if opening the channel fails.
        connection.close()
def simple_connection():
    """Open a blocking connection to ``localhost`` with default settings.

    Opens one channel, prints a confirmation and closes the connection.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    try:
        connection.channel()
        print("简单连接建立成功")
    finally:
        # Release the socket even if opening the channel fails.
        connection.close()
if __name__ == '__main__':
    basic_connection()
    simple_connection()
2.2.2 URL 连接
python
import pika
def url_connection():
    """Connect using an AMQP URL, open a channel, then close the connection.

    ``%2F`` in the URL path is the URL-encoded form of the virtual host ``/``.
    """
    url = 'amqp://guest:guest@localhost:5672/%2F'
    parameters = pika.URLParameters(url)
    connection = pika.BlockingConnection(parameters)
    try:
        connection.channel()
        print("URL 连接建立成功")
    finally:
        # Release the socket even if opening the channel fails.
        connection.close()
def url_with_options():
url = 'amqp://guest:guest@localhost:5672/%2F?heartbeat=60&connection_attempts=3'
parameters = pika.URLParameters(url)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
print("带选项的 URL 连接建立成功")
connection.close()2.3 核心类介绍
2.3.1 ConnectionParameters
python
import pika
def connection_parameters_demo():
parameters = pika.ConnectionParameters(
host='localhost',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('guest', 'guest'),
heartbeat=60,
blocked_connection_timeout=300,
connection_attempts=3,
retry_delay=5,
socket_timeout=10,
stack_timeout=15,
locale='en_US',
client_properties={
'app': 'my-application',
'version': '1.0.0'
}
)
return parameters2.3.2 Channel
python
import pika
class ChannelDemo:
    """Small wrapper showing the basic channel operations on a blocking connection.

    Call :meth:`connect` first; the declare/bind methods use the channel it
    creates. Call :meth:`close` when finished.
    """

    def __init__(self):
        # Both stay None until connect() is called.
        self.connection = None
        self.channel = None

    def connect(self):
        """Open a blocking connection to ``localhost`` and create a channel."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        print("通道创建成功")

    def declare_exchange(self, exchange_name, exchange_type='direct'):
        """Declare a durable exchange of the given type."""
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=exchange_type,
            durable=True
        )
        print(f"交换器 {exchange_name} 声明成功")

    def declare_queue(self, queue_name, arguments=None):
        """Declare a durable queue and return its current message count.

        Args:
            queue_name: Name of the queue to declare.
            arguments: Optional queue arguments passed through to the broker.

        Returns:
            The ``message_count`` reported in the declare-ok response.
        """
        result = self.channel.queue_declare(
            queue=queue_name,
            durable=True,
            arguments=arguments
        )
        print(f"队列 {queue_name} 声明成功")
        return result.method.message_count

    def bind_queue(self, queue_name, exchange_name, routing_key):
        """Bind ``queue_name`` to ``exchange_name`` with ``routing_key``."""
        self.channel.queue_bind(
            queue=queue_name,
            exchange=exchange_name,
            routing_key=routing_key
        )
        print(f"绑定成功: {exchange_name} -> {queue_name} ({routing_key})")

    def close(self):
        """Close the channel and the connection if they are still open.

        Safe to call before :meth:`connect` and safe to call more than once:
        objects that are missing or already closed are skipped.
        """
        if self.channel and self.channel.is_open:
            self.channel.close()
        if self.connection and self.connection.is_open:
            self.connection.close()
        print("连接已关闭")
if __name__ == '__main__':
    demo = ChannelDemo()
    demo.connect()
    demo.declare_exchange('test.exchange')
    demo.declare_queue('test.queue')
    demo.bind_queue('test.queue', 'test.exchange', 'test.key')
    demo.close()
2.4 消息属性
python
import pika
import json
import time
def publish_with_properties():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
properties = pika.BasicProperties(
content_type='application/json',
content_encoding='utf-8',
headers={
'x-custom-header': 'value',
'x-timestamp': int(time.time())
},
delivery_mode=2,
priority=5,
correlation_id='corr-123',
reply_to='reply.queue',
expiration='60000',
message_id='msg-456',
timestamp=int(time.time()),
type='order.created',
user_id='guest',
app_id='my-app'
)
message = json.dumps({'order_id': 'ORD-001', 'amount': 99.99})
channel.basic_publish(
exchange='',
routing_key='test.queue',
body=message,
properties=properties
)
print("消息发布成功")
connection.close()2.5 异步连接
python
import pika
import functools
class AsyncConsumer:
    """Callback-driven consumer built on ``pika.SelectConnection``.

    The callbacks chain in this order: connection open -> channel open ->
    queue declared -> consuming. :meth:`run` blocks in the connection's
    ioloop until the loop is stopped.
    """

    def __init__(self, queue_name):
        self.queue_name = queue_name
        # Both stay None until the corresponding open callback has run.
        self.connection = None
        self.channel = None

    def connect(self):
        """Create the ``SelectConnection``; the callbacks fire once the ioloop runs."""
        parameters = pika.ConnectionParameters('localhost')
        self.connection = pika.SelectConnection(
            parameters,
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed
        )

    def on_connection_open(self, unused_connection):
        """Connection is up: request a channel."""
        print("连接已建立")
        self.connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_open_error(self, unused_connection, error):
        """Connection attempt failed: report it and stop the ioloop."""
        print(f"连接建立失败: {error}")
        self.connection.ioloop.stop()

    def on_connection_closed(self, unused_connection, reason):
        """Connection closed: report the reason and stop the ioloop."""
        print(f"连接已关闭: {reason}")
        self.connection.ioloop.stop()

    def on_channel_open(self, channel):
        """Channel is open: remember it and declare the durable queue."""
        print("通道已打开")
        self.channel = channel
        self.channel.add_on_close_callback(self.on_channel_closed)
        self.channel.queue_declare(
            queue=self.queue_name,
            durable=True,
            callback=self.on_queue_declared
        )

    def on_channel_closed(self, channel, reason):
        """Report why the channel closed."""
        print(f"通道已关闭: {reason}")

    def on_queue_declared(self, method_frame):
        """Queue exists: start consuming with manual acknowledgements."""
        print(f"队列已声明: {method_frame.method.queue}")
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self.on_message,
            auto_ack=False
        )

    def on_message(self, channel, basic_deliver, properties, body):
        """Print the decoded message body and acknowledge the delivery."""
        print(f"收到消息: {body.decode()}")
        channel.basic_ack(
            delivery_tag=basic_deliver.delivery_tag
        )

    def run(self):
        """Connect and block in the ioloop until it is stopped."""
        self.connect()
        self.connection.ioloop.start()

    def stop(self):
        """Request connection shutdown if the connection is still usable.

        Does nothing when no connection was created or when it is already
        closing or closed, so calling this after the broker dropped the
        connection does not raise.
        """
        if self.connection is None:
            return
        if self.connection.is_closing or self.connection.is_closed:
            return
        # NOTE(review): close() on a SelectConnection only finishes while the
        # ioloop is running. If stop() is called after run() has returned
        # (e.g. on KeyboardInterrupt), the caller may need to start the ioloop
        # again so on_connection_closed can fire - confirm against the pika
        # asynchronous consumer example.
        self.connection.close()
if __name__ == '__main__':
    consumer = AsyncConsumer('test.queue')
    try:
        consumer.run()
    except KeyboardInterrupt:
        consumer.stop()
三、代码示例
3.1 完整生产者示例
python
import pika
import json
import uuid
import time
from typing import Optional, Dict, Any
class ReliableProducer:
    """Blocking publisher with optional publisher confirms.

    The constructor connects immediately and opens one channel. Call
    :meth:`enable_confirms` before publishing to have unroutable messages
    reported, and :meth:`close` when finished.
    """

    def __init__(self, host: str = 'localhost', port: int = 5672,
                 username: str = 'guest', password: str = 'guest',
                 virtual_host: str = '/'):
        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(
            host=host,
            port=port,
            virtual_host=virtual_host,
            credentials=credentials,
            heartbeat=60,
            connection_attempts=3,
            retry_delay=5
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        # Set to True by enable_confirms().
        self.confirm_enabled = False
        # Not read or written by any method of this class.
        self.pending_confirms: Dict[str, bool] = {}

    def enable_confirms(self):
        """Switch the channel to confirm mode."""
        self.channel.confirm_delivery()
        self.confirm_enabled = True
        print("发布确认已启用")

    def declare_exchange(self, exchange: str, exchange_type: str = 'direct',
                         durable: bool = True):
        """Declare an exchange of the given type."""
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type=exchange_type,
            durable=durable
        )
        print(f"交换器 {exchange} 已声明")

    def declare_queue(self, queue: str, durable: bool = True,
                      arguments: Optional[Dict] = None):
        """Declare a queue, passing ``arguments`` through to the broker."""
        self.channel.queue_declare(
            queue=queue,
            durable=durable,
            arguments=arguments
        )
        print(f"队列 {queue} 已声明")

    def bind_queue(self, queue: str, exchange: str, routing_key: str):
        """Bind ``queue`` to ``exchange`` with ``routing_key``."""
        self.channel.queue_bind(
            queue=queue,
            exchange=exchange,
            routing_key=routing_key
        )
        print(f"绑定: {exchange} -> {queue} ({routing_key})")

    @staticmethod
    def _encode_body(message: Any):
        """Turn ``message`` into the payload handed to ``basic_publish``.

        dict/list values are JSON-encoded to UTF-8 bytes, ``str`` values are
        UTF-8 encoded, and anything else is passed through unchanged.
        """
        if isinstance(message, (dict, list)):
            return json.dumps(message).encode('utf-8')
        if isinstance(message, str):
            return message.encode('utf-8')
        return message

    def publish(self, exchange: str, routing_key: str, message: Any,
                content_type: str = 'application/json',
                delivery_mode: int = 2,
                priority: int = 0,
                headers: Optional[Dict] = None) -> bool:
        """Publish one message and report whether it was accepted.

        Args:
            exchange: Target exchange name.
            routing_key: Routing key for the message.
            message: dict/list (sent as JSON), str (sent as UTF-8) or a
                value passed to pika unchanged.
            content_type: Value for the ``content_type`` property.
            delivery_mode: Value for the ``delivery_mode`` property.
            priority: Value for the ``priority`` property.
            headers: Optional message headers; an empty dict is sent when
                omitted.

        Returns:
            True when ``basic_publish`` returned without raising, False when
            the message was unroutable or any other error occurred (the
            error is printed, not raised).
        """
        message_id = str(uuid.uuid4())
        body = self._encode_body(message)
        properties = pika.BasicProperties(
            content_type=content_type,
            delivery_mode=delivery_mode,
            priority=priority,
            message_id=message_id,
            timestamp=int(time.time()),
            headers=headers or {}
        )
        try:
            # mandatory is requested only in confirm mode, where the
            # except clause below reports an unroutable message.
            self.channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=body,
                properties=properties,
                mandatory=self.confirm_enabled
            )
        except pika.exceptions.UnroutableError:
            print(f"消息无法路由: {message_id}")
            return False
        except Exception as e:
            print(f"发布失败: {e}")
            return False
        if self.confirm_enabled:
            print(f"消息发布成功: {message_id}")
        return True

    def close(self):
        """Close the channel and the connection if they are still open.

        Objects that are missing or already closed are skipped, so this is
        safe to call more than once.
        """
        if self.channel and self.channel.is_open:
            self.channel.close()
        if self.connection and self.connection.is_open:
            self.connection.close()
        print("连接已关闭")
if __name__ == '__main__':
    producer = ReliableProducer()
    producer.enable_confirms()
    producer.declare_exchange('order.exchange', 'topic')
    producer.declare_queue('order.queue')
    producer.bind_queue('order.queue', 'order.exchange', 'order.#')
    for i in range(5):
        producer.publish(
            exchange='order.exchange',
            routing_key='order.created',
            message={
                'order_id': f'ORD-{i:04d}',
                'user_id': f'USER-{i % 3 + 1}',
                'amount': 99.99 * (i + 1),
                'created_at': time.strftime('%Y-%m-%d %H:%M:%S')
            },
            headers={'x-source': 'producer-demo'}
        )
    producer.close()
3.2 完整消费者示例
python
import pika
import json
import functools
from typing import Optional, Callable
class ReliableConsumer:
    """Blocking consumer with manual acknowledgements.

    Nothing is connected in the constructor; :meth:`start_consuming`
    connects on demand. A message is acked after it is processed and
    nacked without requeue when processing raises.
    """

    def __init__(self, queue: str, host: str = 'localhost',
                 port: int = 5672, username: str = 'guest',
                 password: str = 'guest', virtual_host: str = '/'):
        self.queue = queue
        credentials = pika.PlainCredentials(username, password)
        self.parameters = pika.ConnectionParameters(
            host=host,
            port=port,
            virtual_host=virtual_host,
            credentials=credentials,
            heartbeat=60
        )
        # Populated by connect().
        self.connection: Optional[pika.BlockingConnection] = None
        self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None
        # Populated by start_consuming().
        self.consumer_tag: Optional[str] = None
        # Optional user callback, called as handler(message, properties).
        self.message_handler: Optional[Callable] = None

    def connect(self):
        """Open the connection and channel and set ``prefetch_count=10``."""
        self.connection = pika.BlockingConnection(self.parameters)
        self.channel = self.connection.channel()
        self.channel.basic_qos(prefetch_count=10)
        print("消费者已连接")

    def set_message_handler(self, handler: Callable):
        """Register the callback invoked for every received message."""
        self.message_handler = handler

    def on_message(self, channel, method, properties, body):
        """Decode, dispatch and acknowledge one delivery.

        The body is decoded as UTF-8 and, when ``content_type`` is
        ``application/json``, parsed as JSON. Any exception raised while
        decoding, handling or acking is printed and the delivery is nacked
        with ``requeue=False``.
        """
        try:
            message = body.decode('utf-8')
            if properties.content_type == 'application/json':
                message = json.loads(message)
            print(f"收到消息: {message}")
            print(f"  消息ID: {properties.message_id}")
            print(f"  内容类型: {properties.content_type}")
            if self.message_handler:
                self.message_handler(message, properties)
            channel.basic_ack(delivery_tag=method.delivery_tag)
            print("消息已确认")
        except Exception as e:
            print(f"处理失败: {e}")
            channel.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=False
            )

    def start_consuming(self):
        """Connect if needed, register the consumer and block until stopped.

        A ``KeyboardInterrupt`` raised while consuming cancels the consumer
        instead of propagating.
        """
        if not self.channel:
            self.connect()
        self.consumer_tag = self.channel.basic_consume(
            queue=self.queue,
            on_message_callback=self.on_message,
            auto_ack=False
        )
        print(f"开始消费队列: {self.queue}")
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop_consuming()

    def stop_consuming(self):
        """Cancel the registered consumer, if there is one."""
        if self.channel and self.consumer_tag:
            self.channel.basic_cancel(self.consumer_tag)
            print("停止消费")

    def close(self):
        """Close the channel and the connection if they are still open.

        Objects that are missing or already closed are skipped, so this is
        safe to call before connecting and safe to call more than once.
        """
        if self.channel and self.channel.is_open:
            self.channel.close()
        if self.connection and self.connection.is_open:
            self.connection.close()
        print("连接已关闭")
def message_handler(message, properties):
    """Example handler: print the received message.

    Args:
        message: The decoded message passed in by the consumer.
        properties: The message properties; unused here.
    """
    print(f"自定义处理: {message}")
if __name__ == '__main__':
    consumer = ReliableConsumer('order.queue')
    consumer.set_message_handler(message_handler)
    try:
        consumer.start_consuming()
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
四、实际应用场景
4.1 任务队列
python
import json
import time
import uuid

import pika
class TaskQueue:
def __init__(self, queue_name: str):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = self.connection.channel()
self.queue_name = queue_name
self.channel.queue_declare(
queue=queue_name,
durable=True,
arguments={
'x-message-ttl': 86400000,
'x-dead-letter-exchange': 'dlx.exchange'
}
)
def submit_task(self, task_type: str, payload: dict):
message = {
'task_id': str(uuid.uuid4()),
'task_type': task_type,
'payload': payload,
'created_at': time.time()
}
self.channel.basic_publish(
exchange='',
routing_key=self.queue_name,
body=json.dumps(message).encode(),
properties=pika.BasicProperties(
delivery_mode=2,
content_type='application/json'
)
)
print(f"任务已提交: {message['task_id']}")
def process_tasks(self, handler: callable):
def callback(ch, method, properties, body):
task = json.loads(body)
try:
handler(task)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"任务处理失败: {e}")
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False
)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(
queue=self.queue_name,
on_message_callback=callback
)
print("开始处理任务...")
self.channel.start_consuming()
def close(self):
self.connection.close()4.2 发布订阅
python
import pika
import json
class PubSubDemo:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = self.connection.channel()
def setup(self):
self.channel.exchange_declare(
exchange='pubsub.exchange',
exchange_type='fanout',
durable=True
)
def publish(self, message: dict):
self.channel.basic_publish(
exchange='pubsub.exchange',
routing_key='',
body=json.dumps(message).encode(),
properties=pika.BasicProperties(
delivery_mode=2
)
)
print("消息已发布")
def subscribe(self, callback: callable):
result = self.channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
self.channel.queue_bind(
queue=queue_name,
exchange='pubsub.exchange'
)
self.channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
print("开始订阅...")
self.channel.start_consuming()
def close(self):
self.connection.close()五、常见问题与解决方案
5.1 连接断开
问题描述: 连接因心跳超时断开。
解决方案:
python
parameters = pika.ConnectionParameters(
    host='localhost',
    heartbeat=60,
    blocked_connection_timeout=300
)
5.2 消息丢失
问题描述: 消息发送后丢失。
解决方案:
python
channel.confirm_delivery()
channel.basic_publish(
    exchange='',
    routing_key='queue',
    body=message,
    properties=pika.BasicProperties(delivery_mode=2),
    mandatory=True
)
5.3 消费阻塞
问题描述: 消费者处理缓慢导致消息堆积。
解决方案:
python
channel.basic_qos(prefetch_count=10)
六、最佳实践建议
6.1 连接管理
text
连接建议:
├── 使用连接池或单例模式
├── 设置合理的心跳间隔
├── 实现自动重连机制
├── 正确处理连接关闭
└── 使用 with 语句管理资源
6.2 性能优化
text
性能建议:
├── 使用异步连接模式
├── 合理设置预取数量
├── 批量处理消息
├── 复用连接和通道
└── 控制消息大小
6.3 可靠性保障
text
可靠性建议:
├── 启用发布确认
├── 使用手动确认模式
├── 消息持久化
├── 配置死信队列
└── 实现重试机制