Skip to content

Pika 库介绍

一、概述

Pika 是 Python 中最流行的 RabbitMQ 客户端库,实现了 AMQP 0-9-1 协议。它提供了同步和异步两种编程模式,适合各种应用场景。

1.1 库简介

text
┌─────────────────────────────────────────────────────────────────────┐
│                        Pika 库简介                                   │
├─────────────────────────────────────────────────────────────────────┤
│  名称:Pika                                                         │
│  语言:Python                                                       │
│  协议:AMQP 0-9-1                                                   │
│  许可:BSD 3-Clause                                                 │
│  仓库:https://github.com/pika/pika                                 │
│  最新版本:1.x 系列                                                 │
└─────────────────────────────────────────────────────────────────────┘

1.2 安装方式

bash
pip install pika

pip install pika==1.3.2

pip install pika[ssl]

1.3 主要特性

text
核心特性:
├── 完整的 AMQP 0-9-1 协议支持
├── 同步和异步两种连接模式
├── 线程安全的连接
├── SSL/TLS 支持
├── 心跳机制
├── 自动重连
└── 消费者取消通知

二、核心知识点

2.1 连接模式

text
┌─────────────────────────────────────────────────────────────────────┐
│                     Pika 连接模式                                    │
├───────────────────────┬─────────────────────────────────────────────┤
│         模式          │                    说明                      │
├───────────────────────┼─────────────────────────────────────────────┤
│ BlockingConnection    │ 同步阻塞连接                                │
│                       │ 适合简单脚本和命令行工具                     │
│                       │ 使用简单,但不适合高并发                     │
├───────────────────────┼─────────────────────────────────────────────┤
│ SelectConnection      │ 异步连接,基于 select                        │
│                       │ 适合事件驱动应用                            │
│                       │ 单线程,高性能                              │
├───────────────────────┼─────────────────────────────────────────────┤
│ LibevConnection       │ 异步连接,基于 libev                         │
│                       │ 需要安装额外依赖                            │
├───────────────────────┼─────────────────────────────────────────────┤
│ TornadoConnection     │ 异步连接,集成 Tornado                       │
│                       │ 适合 Tornado 应用                           │
├───────────────────────┼─────────────────────────────────────────────┤
│ AsyncioConnection     │ 异步连接,集成 asyncio                       │
│                       │ Python 3.5+ 推荐                           │
└───────────────────────┴─────────────────────────────────────────────┘

2.2 基础连接

2.2.1 同步连接

python
import pika

def basic_connection():
    credentials = pika.PlainCredentials('guest', 'guest')
    
    parameters = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=credentials,
        heartbeat=60,
        connection_attempts=3,
        retry_delay=5
    )
    
    connection = pika.BlockingConnection(parameters)
    
    channel = connection.channel()
    
    print("连接建立成功")
    
    connection.close()

def simple_connection():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    print("简单连接建立成功")
    
    connection.close()

if __name__ == '__main__':
    basic_connection()
    simple_connection()

2.2.2 URL 连接

python
import pika

def url_connection():
    url = 'amqp://guest:guest@localhost:5672/%2F'
    
    parameters = pika.URLParameters(url)
    
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    print("URL 连接建立成功")
    
    connection.close()

def url_with_options():
    url = 'amqp://guest:guest@localhost:5672/%2F?heartbeat=60&connection_attempts=3'
    
    parameters = pika.URLParameters(url)
    
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    print("带选项的 URL 连接建立成功")
    
    connection.close()

2.3 核心类介绍

2.3.1 ConnectionParameters

python
import pika

def connection_parameters_demo():
    parameters = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=pika.PlainCredentials('guest', 'guest'),
        
        heartbeat=60,
        blocked_connection_timeout=300,
        
        connection_attempts=3,
        retry_delay=5,
        
        socket_timeout=10,
        stack_timeout=15,
        
        locale='en_US',
        client_properties={
            'app': 'my-application',
            'version': '1.0.0'
        }
    )
    
    return parameters

2.3.2 Channel

python
import pika

class ChannelDemo:
    
    def __init__(self):
        self.connection = None
        self.channel = None
    
    def connect(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        print("通道创建成功")
    
    def declare_exchange(self, exchange_name, exchange_type='direct'):
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=exchange_type,
            durable=True
        )
        print(f"交换器 {exchange_name} 声明成功")
    
    def declare_queue(self, queue_name, arguments=None):
        result = self.channel.queue_declare(
            queue=queue_name,
            durable=True,
            arguments=arguments
        )
        print(f"队列 {queue_name} 声明成功")
        return result.method.message_count
    
    def bind_queue(self, queue_name, exchange_name, routing_key):
        self.channel.queue_bind(
            queue=queue_name,
            exchange=exchange_name,
            routing_key=routing_key
        )
        print(f"绑定成功: {exchange_name} -> {queue_name} ({routing_key})")
    
    def close(self):
        if self.channel:
            self.channel.close()
        if self.connection:
            self.connection.close()
        print("连接已关闭")

if __name__ == '__main__':
    demo = ChannelDemo()
    demo.connect()
    demo.declare_exchange('test.exchange')
    demo.declare_queue('test.queue')
    demo.bind_queue('test.queue', 'test.exchange', 'test.key')
    demo.close()

2.4 消息属性

python
import pika
import json
import time

def publish_with_properties():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    properties = pika.BasicProperties(
        content_type='application/json',
        content_encoding='utf-8',
        headers={
            'x-custom-header': 'value',
            'x-timestamp': int(time.time())
        },
        delivery_mode=2,
        priority=5,
        correlation_id='corr-123',
        reply_to='reply.queue',
        expiration='60000',
        message_id='msg-456',
        timestamp=int(time.time()),
        type='order.created',
        user_id='guest',
        app_id='my-app'
    )
    
    message = json.dumps({'order_id': 'ORD-001', 'amount': 99.99})
    
    channel.basic_publish(
        exchange='',
        routing_key='test.queue',
        body=message,
        properties=properties
    )
    
    print("消息发布成功")
    connection.close()

2.5 异步连接

python
import pika
import functools

class AsyncConsumer:
    
    def __init__(self, queue_name):
        self.queue_name = queue_name
        self.connection = None
        self.channel = None
    
    def connect(self):
        parameters = pika.ConnectionParameters('localhost')
        
        self.connection = pika.SelectConnection(
            parameters,
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed
        )
    
    def on_connection_open(self, unused_connection):
        print("连接已建立")
        self.connection.channel(on_open_callback=self.on_channel_open)
    
    def on_connection_open_error(self, unused_connection, error):
        print(f"连接建立失败: {error}")
        self.connection.ioloop.stop()
    
    def on_connection_closed(self, unused_connection, reason):
        print(f"连接已关闭: {reason}")
        self.connection.ioloop.stop()
    
    def on_channel_open(self, channel):
        print("通道已打开")
        self.channel = channel
        self.channel.add_on_close_callback(self.on_channel_closed)
        
        self.channel.queue_declare(
            queue=self.queue_name,
            durable=True,
            callback=self.on_queue_declared
        )
    
    def on_channel_closed(self, channel, reason):
        print(f"通道已关闭: {reason}")
    
    def on_queue_declared(self, method_frame):
        print(f"队列已声明: {method_frame.method.queue}")
        
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self.on_message,
            auto_ack=False
        )
    
    def on_message(self, channel, basic_deliver, properties, body):
        print(f"收到消息: {body.decode()}")
        
        channel.basic_ack(
            delivery_tag=basic_deliver.delivery_tag
        )
    
    def run(self):
        self.connect()
        self.connection.ioloop.start()
    
    def stop(self):
        if self.connection:
            self.connection.close()

if __name__ == '__main__':
    consumer = AsyncConsumer('test.queue')
    
    try:
        consumer.run()
    except KeyboardInterrupt:
        consumer.stop()

三、代码示例

3.1 完整生产者示例

python
import pika
import json
import uuid
import time
from typing import Optional, Dict, Any

class ReliableProducer:
    
    def __init__(self, host: str = 'localhost', port: int = 5672,
                 username: str = 'guest', password: str = 'guest',
                 virtual_host: str = '/'):
        
        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(
            host=host,
            port=port,
            virtual_host=virtual_host,
            credentials=credentials,
            heartbeat=60,
            connection_attempts=3,
            retry_delay=5
        )
        
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        
        self.confirm_enabled = False
        self.pending_confirms: Dict[str, bool] = {}
    
    def enable_confirms(self):
        self.channel.confirm_delivery()
        self.confirm_enabled = True
        print("发布确认已启用")
    
    def declare_exchange(self, exchange: str, exchange_type: str = 'direct',
                        durable: bool = True):
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type=exchange_type,
            durable=durable
        )
        print(f"交换器 {exchange} 已声明")
    
    def declare_queue(self, queue: str, durable: bool = True,
                     arguments: Optional[Dict] = None):
        self.channel.queue_declare(
            queue=queue,
            durable=durable,
            arguments=arguments
        )
        print(f"队列 {queue} 已声明")
    
    def bind_queue(self, queue: str, exchange: str, routing_key: str):
        self.channel.queue_bind(
            queue=queue,
            exchange=exchange,
            routing_key=routing_key
        )
        print(f"绑定: {exchange} -> {queue} ({routing_key})")
    
    def publish(self, exchange: str, routing_key: str, message: Any,
                content_type: str = 'application/json',
                delivery_mode: int = 2,
                priority: int = 0,
                headers: Optional[Dict] = None) -> bool:
        
        message_id = str(uuid.uuid4())
        
        if isinstance(message, (dict, list)):
            body = json.dumps(message).encode('utf-8')
        elif isinstance(message, str):
            body = message.encode('utf-8')
        else:
            body = message
        
        properties = pika.BasicProperties(
            content_type=content_type,
            delivery_mode=delivery_mode,
            priority=priority,
            message_id=message_id,
            timestamp=int(time.time()),
            headers=headers or {}
        )
        
        try:
            if self.confirm_enabled:
                self.channel.basic_publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    body=body,
                    properties=properties,
                    mandatory=True
                )
                print(f"消息发布成功: {message_id}")
                return True
            else:
                self.channel.basic_publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    body=body,
                    properties=properties
                )
                return True
        except pika.exceptions.UnroutableError:
            print(f"消息无法路由: {message_id}")
            return False
        except Exception as e:
            print(f"发布失败: {e}")
            return False
    
    def close(self):
        if self.channel:
            self.channel.close()
        if self.connection:
            self.connection.close()
        print("连接已关闭")


if __name__ == '__main__':
    producer = ReliableProducer()
    producer.enable_confirms()
    
    producer.declare_exchange('order.exchange', 'topic')
    producer.declare_queue('order.queue')
    producer.bind_queue('order.queue', 'order.exchange', 'order.#')
    
    for i in range(5):
        producer.publish(
            exchange='order.exchange',
            routing_key='order.created',
            message={
                'order_id': f'ORD-{i:04d}',
                'user_id': f'USER-{i % 3 + 1}',
                'amount': 99.99 * (i + 1),
                'created_at': time.strftime('%Y-%m-%d %H:%M:%S')
            },
            headers={'x-source': 'producer-demo'}
        )
    
    producer.close()

3.2 完整消费者示例

python
import pika
import json
import functools
from typing import Optional, Callable

class ReliableConsumer:
    
    def __init__(self, queue: str, host: str = 'localhost',
                 port: int = 5672, username: str = 'guest',
                 password: str = 'guest', virtual_host: str = '/'):
        
        self.queue = queue
        
        credentials = pika.PlainCredentials(username, password)
        self.parameters = pika.ConnectionParameters(
            host=host,
            port=port,
            virtual_host=virtual_host,
            credentials=credentials,
            heartbeat=60
        )
        
        self.connection: Optional[pika.BlockingConnection] = None
        self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None
        self.consumer_tag: Optional[str] = None
        self.message_handler: Optional[Callable] = None
    
    def connect(self):
        self.connection = pika.BlockingConnection(self.parameters)
        self.channel = self.connection.channel()
        
        self.channel.basic_qos(prefetch_count=10)
        
        print("消费者已连接")
    
    def set_message_handler(self, handler: Callable):
        self.message_handler = handler
    
    def on_message(self, channel, method, properties, body):
        try:
            message = body.decode('utf-8')
            
            if properties.content_type == 'application/json':
                message = json.loads(message)
            
            print(f"收到消息: {message}")
            print(f"  消息ID: {properties.message_id}")
            print(f"  内容类型: {properties.content_type}")
            
            if self.message_handler:
                self.message_handler(message, properties)
            
            channel.basic_ack(delivery_tag=method.delivery_tag)
            print("消息已确认")
            
        except Exception as e:
            print(f"处理失败: {e}")
            channel.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=False
            )
    
    def start_consuming(self):
        if not self.channel:
            self.connect()
        
        self.consumer_tag = self.channel.basic_consume(
            queue=self.queue,
            on_message_callback=self.on_message,
            auto_ack=False
        )
        
        print(f"开始消费队列: {self.queue}")
        
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop_consuming()
    
    def stop_consuming(self):
        if self.channel and self.consumer_tag:
            self.channel.basic_cancel(self.consumer_tag)
            print("停止消费")
    
    def close(self):
        if self.channel:
            self.channel.close()
        if self.connection:
            self.connection.close()
        print("连接已关闭")


def message_handler(message, properties):
    print(f"自定义处理: {message}")


if __name__ == '__main__':
    consumer = ReliableConsumer('order.queue')
    consumer.set_message_handler(message_handler)
    
    try:
        consumer.start_consuming()
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()

四、实际应用场景

4.1 任务队列

python
import pika
import json
import time

class TaskQueue:
    
    def __init__(self, queue_name: str):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        self.queue_name = queue_name
        
        self.channel.queue_declare(
            queue=queue_name,
            durable=True,
            arguments={
                'x-message-ttl': 86400000,
                'x-dead-letter-exchange': 'dlx.exchange'
            }
        )
    
    def submit_task(self, task_type: str, payload: dict):
        message = {
            'task_id': str(uuid.uuid4()),
            'task_type': task_type,
            'payload': payload,
            'created_at': time.time()
        }
        
        self.channel.basic_publish(
            exchange='',
            routing_key=self.queue_name,
            body=json.dumps(message).encode(),
            properties=pika.BasicProperties(
                delivery_mode=2,
                content_type='application/json'
            )
        )
        
        print(f"任务已提交: {message['task_id']}")
    
    def process_tasks(self, handler: callable):
        def callback(ch, method, properties, body):
            task = json.loads(body)
            
            try:
                handler(task)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                print(f"任务处理失败: {e}")
                ch.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=False
                )
        
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=callback
        )
        
        print("开始处理任务...")
        self.channel.start_consuming()
    
    def close(self):
        self.connection.close()

4.2 发布订阅

python
import pika
import json

class PubSubDemo:
    
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
    
    def setup(self):
        self.channel.exchange_declare(
            exchange='pubsub.exchange',
            exchange_type='fanout',
            durable=True
        )
    
    def publish(self, message: dict):
        self.channel.basic_publish(
            exchange='pubsub.exchange',
            routing_key='',
            body=json.dumps(message).encode(),
            properties=pika.BasicProperties(
                delivery_mode=2
            )
        )
        print("消息已发布")
    
    def subscribe(self, callback: callable):
        result = self.channel.queue_declare(queue='', exclusive=True)
        queue_name = result.method.queue
        
        self.channel.queue_bind(
            queue=queue_name,
            exchange='pubsub.exchange'
        )
        
        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback,
            auto_ack=True
        )
        
        print("开始订阅...")
        self.channel.start_consuming()
    
    def close(self):
        self.connection.close()

五、常见问题与解决方案

5.1 连接断开

问题描述: 连接因心跳超时断开。

解决方案

python
parameters = pika.ConnectionParameters(
    host='localhost',
    heartbeat=60,
    blocked_connection_timeout=300
)

5.2 消息丢失

问题描述: 消息发送后丢失。

解决方案

python
channel.confirm_delivery()

channel.basic_publish(
    exchange='',
    routing_key='queue',
    body=message,
    properties=pika.BasicProperties(delivery_mode=2),
    mandatory=True
)

5.3 消费阻塞

问题描述: 消费者处理缓慢导致消息堆积。

解决方案

python
channel.basic_qos(prefetch_count=10)

六、最佳实践建议

6.1 连接管理

text
连接建议:
├── 使用连接池或单例模式
├── 设置合理的心跳间隔
├── 实现自动重连机制
├── 正确处理连接关闭
└── 使用 with 语句管理资源

6.2 性能优化

text
性能建议:
├── 使用异步连接模式
├── 合理设置预取数量
├── 批量处理消息
├── 复用连接和通道
└── 控制消息大小

6.3 可靠性保障

text
可靠性建议:
├── 启用发布确认
├── 使用手动确认模式
├── 消息持久化
├── 配置死信队列
└── 实现重试机制

七、相关链接