Skip to content

Pika 生产者实现

一、概述

本文档详细介绍如何使用 Pika 库实现消息生产者,包括基础发布、消息属性、发布确认等内容。

1.1 生产者架构

┌─────────────────────────────────────────────────────────────────────┐
│                       Pika 生产者架构                                 │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐          │
│  │ Application │ ──► │   Channel   │ ──► │   Exchange  │          │
│  └─────────────┘     └─────────────┘     └─────────────┘          │
│                              │                    │                │
│                              ▼                    ▼                │
│                        ┌─────────────┐     ┌─────────────┐        │
│                        │    Queue    │ ◄── │  Binding    │        │
│                        └─────────────┘     └─────────────┘        │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 消息发布流程

text
发布流程:
1. 创建连接
2. 创建通道
3. 声明交换器
4. 声明队列
5. 绑定队列
6. 发布消息
7. 确认消息

二、核心知识点

2.1 基础发布

python
import pika

def simple_publish():
    """Publish a single message to the durable ``hello`` queue.

    Connects to the broker on ``localhost``, declares the queue, publishes
    ``'Hello World!'`` with ``exchange=''`` and ``routing_key='hello'``,
    prints a success line and closes the connection.
    """
    params = pika.ConnectionParameters('localhost')
    conn = pika.BlockingConnection(params)
    ch = conn.channel()

    queue_name = 'hello'
    ch.queue_declare(queue=queue_name, durable=True)

    # delivery_mode=2 is AMQP's "persistent" delivery mode.
    persistent = pika.BasicProperties(delivery_mode=2)
    ch.basic_publish(
        exchange='',
        routing_key=queue_name,
        body='Hello World!',
        properties=persistent,
    )

    print("消息发布成功")
    conn.close()


simple_publish()

2.2 消息属性

python
import pika
import json
import time

def publish_with_properties():
    """Publish a JSON order message with a full set of message properties.

    The message is sent to the ``orders`` exchange with routing key
    ``order.created``. The exchange is not declared here, so it is assumed
    to exist already on the broker.
    """
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    ch = conn.channel()

    custom_headers = {
        'x-custom-header': 'value',
        'x-source': 'producer',
    }

    props = pika.BasicProperties(
        # Payload description.
        content_type='application/json',
        content_encoding='utf-8',
        type='order.created',
        # Application-defined headers.
        headers=custom_headers,
        # Delivery behaviour: persistent, priority 5, per-message TTL
        # (expiration is given as a string of milliseconds).
        delivery_mode=2,
        priority=5,
        expiration='60000',
        # Request/reply correlation.
        correlation_id='corr-123',
        reply_to='reply-queue',
        # Identification.
        message_id='msg-456',
        timestamp=int(time.time()),
        user_id='guest',
        app_id='my-app',
    )

    order = {
        'order_id': 'ORD-001',
        'amount': 99.99,
    }
    payload = json.dumps(order)

    ch.basic_publish(
        exchange='orders',
        routing_key='order.created',
        body=payload,
        properties=props,
    )

    print("消息发布成功")
    conn.close()

2.3 发布确认

python
import pika

def publish_with_confirm():
    """Publish one persistent message and wait for the broker's confirmation.

    The channel is switched to publisher-confirm mode, so ``basic_publish``
    blocks until the broker answers. Three outcomes are reported:

    * confirmed    -> prints the success line
    * returned     -> ``UnroutableError`` (``mandatory=True`` and no queue
      matched)
    * nacked       -> ``NackError`` (the broker refused the message)

    The connection is closed in ``finally`` so it is not leaked when any
    step raises.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    try:
        channel = connection.channel()

        channel.confirm_delivery()

        channel.queue_declare(queue='confirm.queue', durable=True)

        try:
            channel.basic_publish(
                exchange='',
                routing_key='confirm.queue',
                body='Message with confirmation',
                properties=pika.BasicProperties(delivery_mode=2),
                mandatory=True
            )
        except pika.exceptions.UnroutableError:
            print("消息无法路由")
        except pika.exceptions.NackError:
            # Previously uncaught: a broker nack crashed the function.
            print("消息被 Broker 拒绝")
        else:
            print("消息确认成功")
    finally:
        # close() raises on an already-closed connection, so check first.
        if connection.is_open:
            connection.close()

2.4 批量发布

python
import pika

def batch_publish(count: int = 100):
    """Publish ``count`` persistent messages to the durable ``batch.queue``.

    Args:
        count: Number of messages to publish. Defaults to 100, the value
            that was previously hard-coded. A value <= 0 publishes nothing.

    Message bodies are ``'Message 0'`` .. ``'Message {count - 1}'``. The
    connection is closed in ``finally`` so a failure mid-batch does not leak
    it.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    try:
        channel = connection.channel()

        channel.queue_declare(queue='batch.queue', durable=True)

        # The properties are identical for every message; build them once
        # instead of once per iteration.
        persistent = pika.BasicProperties(delivery_mode=2)

        for i in range(count):
            channel.basic_publish(
                exchange='',
                routing_key='batch.queue',
                body=f'Message {i}',
                properties=persistent
            )

        print("批量发布完成")
    finally:
        if connection.is_open:
            connection.close()

2.5 返回回调

python
import pika

def publish_with_returns():
    """Demonstrate handling of messages returned by the broker.

    A ``mandatory=True`` message that cannot be routed to any queue is sent
    back by the broker via ``Basic.Return``. This only happens when the
    target exchange exists; publishing to a non-existent exchange makes the
    broker close the channel instead. The message is therefore published to
    the default exchange with a routing key that is expected to match no
    queue, and a return callback is registered to receive it.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    try:
        channel = connection.channel()

        def on_message_returned(ch, method, properties, body):
            # method is the Basic.Return frame describing why and where
            # routing failed.
            print(
                f"消息被退回: {method.reply_code} {method.reply_text} "
                f"(exchange={method.exchange!r}, "
                f"routing_key={method.routing_key!r}, body={body!r})"
            )

        channel.add_on_return_callback(on_message_returned)

        channel.queue_declare(queue='test.queue', durable=True)

        channel.basic_publish(
            exchange='',
            routing_key='test.key',
            body='Test message',
            properties=pika.BasicProperties(delivery_mode=2),
            mandatory=True
        )

        # Returned messages are dispatched to the callback only while the
        # connection processes events, so give the broker a moment to reply.
        connection.process_data_events(time_limit=1)
    finally:
        if connection.is_open:
            connection.close()

三、代码示例

3.1 完整生产者类

python
import pika
import json
import uuid
import time
from typing import Any, Dict, Optional


class PikaProducer:
    """Reusable blocking RabbitMQ producer built on ``pika.BlockingConnection``.

    The constructor connects immediately. The instance can be used as a
    context manager; leaving the ``with`` block calls :meth:`close`.
    """

    def __init__(self, host: str = 'localhost', port: int = 5672,
                 username: str = 'guest', password: str = 'guest',
                 virtual_host: str = '/'):
        """Store the connection settings and open the connection.

        Args:
            host: Broker host name.
            port: Broker port.
            username: User name for plain authentication.
            password: Password for plain authentication.
            virtual_host: Virtual host to connect to.

        Any error raised by pika while connecting propagates to the caller.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.virtual_host = virtual_host

        self.connection = None
        self.channel = None
        # Remembered so a reconnect can put the new channel back into
        # publisher-confirm mode.
        self._confirms_enabled = False

        self._connect()

    def _connect(self):
        """Open a new connection and channel, replacing any previous ones."""
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.virtual_host,
            credentials=credentials,
            heartbeat=60,
            connection_attempts=3,
            retry_delay=5
        )

        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        if self._confirms_enabled:
            # Confirm mode is a per-channel setting; a fresh channel starts
            # without it.
            self.channel.confirm_delivery()

        print(f"连接到 {self.host}:{self.port}")

    def _ensure_connected(self) -> bool:
        """Reconnect if the connection or channel is missing or closed.

        Returns:
            True when an open connection and channel are available, False
            when reconnecting failed.
        """
        if (self.connection is not None and self.connection.is_open
                and self.channel is not None and self.channel.is_open):
            return True

        try:
            self._connect()
        except pika.exceptions.AMQPError as e:
            print(f"重连失败: {e}")
            return False
        return True

    def declare_exchange(self, exchange: str, exchange_type: str = 'direct',
                        durable: bool = True):
        """Declare ``exchange`` of the given type on the current channel."""
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type=exchange_type,
            durable=durable
        )
        print(f"交换器 {exchange} 已声明")

    def declare_queue(self, queue: str, durable: bool = True,
                    arguments: Optional[Dict] = None):
        """Declare ``queue``, optionally with extra queue ``arguments``."""
        self.channel.queue_declare(
            queue=queue,
            durable=durable,
            arguments=arguments
        )
        print(f"队列 {queue} 已声明")

    def bind_queue(self, queue: str, exchange: str, routing_key: str):
        """Bind ``queue`` to ``exchange`` with ``routing_key``."""
        self.channel.queue_bind(
            queue=queue,
            exchange=exchange,
            routing_key=routing_key
        )
        print(f"绑定: {exchange} -> {queue} ({routing_key})")

    def enable_confirms(self):
        """Put the channel into publisher-confirm mode."""
        self.channel.confirm_delivery()
        self._confirms_enabled = True
        print("发布确认已启用")

    def publish(self, exchange: str, routing_key: str, body: Any,
               content_type: str = 'application/json',
               delivery_mode: int = 2,
               priority: int = 0,
               headers: Optional[Dict] = None,
               mandatory: bool = True) -> bool:
        """Publish one message and report whether it succeeded.

        Args:
            exchange: Target exchange name.
            routing_key: Routing key for the message.
            body: ``dict``/``list`` values are JSON-encoded to UTF-8 bytes,
                ``str`` values are UTF-8 encoded, anything else is passed
                to pika unchanged.
            content_type: Value for the ``content_type`` property.
            delivery_mode: Value for the ``delivery_mode`` property.
            priority: Value for the ``priority`` property.
            headers: Optional message headers; ``None`` becomes ``{}``.
            mandatory: Passed through to ``basic_publish``.

        Returns:
            True if ``basic_publish`` returned without raising, otherwise
            False. Failures are printed rather than raised.

        Note:
            pika raises ``UnroutableError`` only when the channel is in
            confirm mode (see :meth:`enable_confirms`).
        """
        if isinstance(body, (dict, list)):
            body = json.dumps(body).encode('utf-8')
        elif isinstance(body, str):
            body = body.encode('utf-8')

        # A fresh id per call, used for the message and the log lines.
        message_id = str(uuid.uuid4())

        properties = pika.BasicProperties(
            content_type=content_type,
            delivery_mode=delivery_mode,
            priority=priority,
            message_id=message_id,
            timestamp=int(time.time()),
            headers=headers or {}
        )

        try:
            self.channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=body,
                properties=properties,
                mandatory=mandatory
            )
            print(f"消息发布成功: {message_id}")
            return True

        except pika.exceptions.UnroutableError:
            print(f"消息无法路由: {message_id}")
            return False

        except Exception as e:
            # Deliberately broad: this method reports failure through its
            # return value instead of raising.
            print(f"发布失败: {e}")
            return False

    def publish_with_retry(self, exchange: str, routing_key: str,
                          body: Any, max_retries: int = 3,
                          retry_delay: float = 1.0,
                          **publish_kwargs) -> bool:
        """Publish with up to ``max_retries`` attempts and linear back-off.

        Before each attempt the connection is re-established if it has been
        closed; retrying on a dead channel could never succeed.

        Args:
            exchange: Target exchange name.
            routing_key: Routing key for the message.
            body: Message body, handled as in :meth:`publish`.
            max_retries: Total number of attempts. A value <= 0 makes no
                attempt and returns False.
            retry_delay: Base delay in seconds; the wait after attempt
                ``k`` (1-based) is ``retry_delay * k``.
            **publish_kwargs: Extra keyword arguments forwarded to
                :meth:`publish` (for example ``headers`` or ``priority``).

        Returns:
            True as soon as one attempt succeeds, False if all fail.
        """
        for attempt in range(max_retries):
            if self._ensure_connected() and self.publish(
                    exchange, routing_key, body, **publish_kwargs):
                return True

            # No sleep after the final attempt.
            if attempt < max_retries - 1:
                time.sleep(retry_delay * (attempt + 1))

        return False

    def close(self):
        """Close the channel and the connection if they are open."""
        try:
            if self.channel and self.channel.is_open:
                self.channel.close()
        finally:
            # Close the connection even when closing the channel raised.
            if self.connection and self.connection.is_open:
                self.connection.close()

        print("连接已关闭")

    def __enter__(self):
        """Return the producer itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the producer on leaving the ``with`` block."""
        self.close()


if __name__ == '__main__':
    # Demo: set up a topic exchange and a queue, then publish five orders.
    exchange_name = 'orders.exchange'
    queue_name = 'orders.queue'

    with PikaProducer() as demo:
        demo.declare_exchange(exchange_name, 'topic')
        demo.declare_queue(queue_name)
        demo.bind_queue(queue_name, exchange_name, 'order.#')

        demo.enable_confirms()

        for seq in range(5):
            order = {
                'order_id': f'ORD-{seq:04d}',
                'user_id': f'USER-{seq % 3 + 1}',
                'amount': 99.99 * (seq + 1),
                'created_at': time.strftime('%Y-%m-%d %H:%M:%S'),
            }
            demo.publish(
                exchange=exchange_name,
                routing_key='order.created',
                body=order,
                headers={'x-source': 'producer-demo'},
            )

3.2 订单消息生产者

python
import pika
import json
import time
from datetime import datetime


class OrderProducer:
    """Publishes order lifecycle events to the ``order.events`` topic exchange.

    Every event type this producer emits gets a durable queue of the same
    name bound with the same routing key. Messages are published with
    ``mandatory=True`` on a confirm-mode channel, so an event that reaches
    no queue is reported as a failure instead of being dropped silently.
    """

    _EXCHANGE = 'order.events'
    # Routing keys emitted by this producer; each is also the queue name.
    _EVENT_QUEUES = ('order.created', 'order.paid')

    def __init__(self):
        """Connect to the broker on localhost and declare the topology."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        self._setup_infrastructure()

    def _setup_infrastructure(self):
        """Declare the exchange, queues and bindings, then enable confirms."""
        self.channel.exchange_declare(
            exchange=self._EXCHANGE,
            exchange_type='topic',
            durable=True
        )

        for queue in self._EVENT_QUEUES:
            self.channel.queue_declare(
                queue=queue,
                durable=True,
                arguments={
                    # 86400000 ms = 24 hours.
                    'x-message-ttl': 86400000,
                    # NOTE(review): 'dlx.exchange' is not declared in this
                    # class; confirm it is created elsewhere.
                    'x-dead-letter-exchange': 'dlx.exchange'
                }
            )

            self.channel.queue_bind(
                queue=queue,
                exchange=self._EXCHANGE,
                routing_key=queue
            )

        self.channel.confirm_delivery()

    def _publish_event(self, event: dict, success_message: str) -> bool:
        """Publish ``event`` using its ``event_type`` as the routing key.

        Args:
            event: JSON-serialisable dict containing at least
                ``event_type`` and ``event_id``.
            success_message: Line printed when the publish succeeds.

        Returns:
            True on success, False if publishing raised (the error is
            printed).
        """
        try:
            self.channel.basic_publish(
                exchange=self._EXCHANGE,
                routing_key=event['event_type'],
                body=json.dumps(event).encode('utf-8'),
                properties=pika.BasicProperties(
                    content_type='application/json',
                    delivery_mode=2,
                    message_id=event['event_id'],
                    type=event['event_type'],
                    timestamp=int(time.time())
                ),
                # With confirms enabled, an unroutable message now raises
                # instead of vanishing.
                mandatory=True
            )
        except Exception as e:
            print(f"发布失败: {e}")
            return False

        print(success_message)
        return True

    def publish_order_created(self, order_id: str, user_id: str,
                           amount: float, items: list) -> bool:
        """Publish an ``order.created`` event.

        Args:
            order_id: Order identifier; also used to build the event id.
            user_id: Identifier of the ordering user.
            amount: Order total.
            items: JSON-serialisable list of order lines.

        Returns:
            True if the event was published, False otherwise.
        """
        event = {
            'event_type': 'order.created',
            'event_id': f'evt-{order_id}',
            'order_id': order_id,
            'user_id': user_id,
            'amount': amount,
            'items': items,
            'timestamp': datetime.now().isoformat()
        }

        return self._publish_event(event, f"订单创建事件已发布: {order_id}")

    def publish_order_paid(self, order_id: str, payment_id: str,
                          payment_method: str) -> bool:
        """Publish an ``order.paid`` event.

        Args:
            order_id: Order identifier; also used to build the event id.
            payment_id: Identifier of the payment.
            payment_method: Payment method name.

        Returns:
            True if the event was published, False otherwise.
        """
        event = {
            'event_type': 'order.paid',
            'event_id': f'evt-pay-{order_id}',
            'order_id': order_id,
            'payment_id': payment_id,
            'payment_method': payment_method,
            'timestamp': datetime.now().isoformat()
        }

        return self._publish_event(event, f"订单支付事件已发布: {order_id}")

    def close(self):
        """Close the broker connection."""
        self.connection.close()


if __name__ == '__main__':
    producer = OrderProducer()

    # try/finally so the connection is closed even if a publish call raises.
    try:
        items = [
            {'product_id': 'PROD-001', 'quantity': 2, 'price': 29.99},
            {'product_id': 'PROD-002', 'quantity': 1, 'price': 49.99}
        ]

        producer.publish_order_created('ORD-001', 'USER-001', 109.97, items)
        producer.publish_order_paid('ORD-001', 'PAY-001', 'credit_card')
    finally:
        producer.close()

四、实际应用场景

4.1 事件发布

python
import json
import uuid
from datetime import datetime

import pika


class EventPublisher:
    """Publishes JSON event envelopes to a single durable exchange.

    The constructor connects to the broker on localhost, declares the
    exchange and puts the channel into publisher-confirm mode.
    """

    def __init__(self, exchange: str, exchange_type: str = 'topic'):
        """Connect and declare ``exchange`` with the given ``exchange_type``."""
        self.exchange = exchange
        self.exchange_type = exchange_type

        params = pika.ConnectionParameters('localhost')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()

        self.channel.exchange_declare(
            exchange=self.exchange,
            exchange_type=self.exchange_type,
            durable=True,
        )
        self.channel.confirm_delivery()

    def publish_event(self, event_type: str, routing_key: str,
                     payload: dict) -> bool:
        """Wrap ``payload`` in an event envelope and publish it.

        Args:
            event_type: Event name; used as the id prefix and as the
                message ``type`` property.
            routing_key: Routing key for the message.
            payload: JSON-serialisable event data.

        Returns:
            True if publishing succeeded, False if serialising or publishing
            raised (the error is printed).
        """
        # Event id: the event type plus the first 8 hex digits of a random
        # UUID. Uses the module-level name ``uuid``.
        short_id = uuid.uuid4().hex[:8]
        envelope = {
            'event_id': f'{event_type}-{short_id}',
            'event_type': event_type,
            'payload': payload,
            'timestamp': datetime.now().isoformat(),
            'source': 'my-service',
        }

        try:
            encoded = json.dumps(envelope).encode('utf-8')
            props = pika.BasicProperties(
                content_type='application/json',
                delivery_mode=2,
                message_id=envelope['event_id'],
                type=event_type,
            )
            self.channel.basic_publish(
                exchange=self.exchange,
                routing_key=routing_key,
                body=encoded,
                properties=props,
            )
            print(f"事件已发布: {event_type}")
            return True
        except Exception as err:
            print(f"发布失败: {err}")
            return False

    def close(self):
        """Close the broker connection."""
        self.connection.close()


import uuid

publisher = EventPublisher('events', 'topic')

# try/finally so the connection is closed even if a publish call raises.
try:
    publisher.publish_event('user.registered', 'user.registered', {
        'user_id': 'USER-001',
        'email': 'user@example.com'
    })

    publisher.publish_event('order.created', 'order.created', {
        'order_id': 'ORD-001',
        'total': 99.99
    })
finally:
    publisher.close()

4.2 延迟消息

python
import pika
import json
import time


class DelayedPublisher:
    """Publishes messages to ``delayed.exchange`` with a per-message delay.

    The exchange is declared with type ``x-delayed-message``.
    NOTE(review): that exchange type is presumably provided by RabbitMQ's
    delayed-message-exchange plugin; confirm it is enabled on the broker.
    """

    def __init__(self):
        """Connect to the broker on localhost and declare the exchange."""
        params = pika.ConnectionParameters('localhost')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()

        # 'x-delayed-type' is set to 'direct' for this exchange.
        exchange_args = {'x-delayed-type': 'direct'}
        self.channel.exchange_declare(
            exchange='delayed.exchange',
            exchange_type='x-delayed-message',
            durable=True,
            arguments=exchange_args,
        )

    def publish_delayed(self, routing_key: str, message: dict,
                       delay_ms: int) -> bool:
        """Publish ``message`` as JSON with an ``x-delay`` header.

        Args:
            routing_key: Routing key for the message.
            message: JSON-serialisable message body.
            delay_ms: Value sent in the ``x-delay`` header.

        Returns:
            True if publishing succeeded, False if serialising or publishing
            raised (the error is printed).
        """
        try:
            encoded = json.dumps(message).encode('utf-8')
            props = pika.BasicProperties(
                delivery_mode=2,
                headers={'x-delay': delay_ms},
            )
            self.channel.basic_publish(
                exchange='delayed.exchange',
                routing_key=routing_key,
                body=encoded,
                properties=props,
            )
            print(f"延迟消息已发布,延迟 {delay_ms}ms")
            return True
        except Exception as err:
            print(f"发布失败: {err}")
            return False

    def close(self):
        """Close the broker connection."""
        self.connection.close()


if __name__ == '__main__':
    publisher = DelayedPublisher()

    # try/finally so the connection is closed even if publishing raises.
    try:
        publisher.publish_delayed('notification', {
            'message': 'Hello after delay!'
        }, 5000)
    finally:
        publisher.close()

五、常见问题与解决方案

5.1 消息无法路由

问题描述: 消息发布后无法路由到任何队列。

解决方案

python
# pika raises UnroutableError only when the channel is in publisher-confirm
# mode; without this call the except branch below can never run. Enable it
# once per channel.
channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='exchange',
        routing_key='routing.key',
        body=message,
        # mandatory=True asks the broker to return the message when no
        # queue matches the routing key.
        mandatory=True
    )
except pika.exceptions.UnroutableError:
    print("消息无法路由,处理失败情况")

5.2 消息丢失

问题描述: 消息发布后丢失。

解决方案

python
# Confirm mode makes basic_publish wait for the broker's answer.
channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='',
        routing_key='queue',
        body=message,
        # delivery_mode=2 marks the message persistent; the target queue
        # must also be declared durable for it to survive a broker restart.
        properties=pika.BasicProperties(delivery_mode=2),
        # Without mandatory, a message that matches no queue is dropped
        # and still confirmed.
        mandatory=True
    )
except pika.exceptions.UnroutableError:
    print("消息无法路由,未进入任何队列")
except pika.exceptions.NackError:
    print("消息被 Broker 拒绝,需要重发")

5.3 消息过大

问题描述: 消息体过大导致性能问题。

解决方案

python
# The snippet uses both modules; import them so it runs as shown.
import gzip
import json

message = json.dumps(data)
compressed = gzip.compress(message.encode('utf-8'))

channel.basic_publish(
    exchange='',
    routing_key='queue',
    body=compressed,
    properties=pika.BasicProperties(
        # content_encoding says the body is gzip-compressed; content_type
        # says what the consumer gets after decompressing it.
        content_type='application/json',
        content_encoding='gzip'
    )
)

六、最佳实践建议

6.1 发布建议

text
建议:
├── 使用发布确认确保可靠性
├── 消息持久化
├── 设置 mandatory 标志
├── 实现重试机制
└── 监控发布失败

6.2 性能优化

text
建议:
├── 批量发布消息
├── 复用连接和通道
├── 合理设置预取数量(消费者端 basic_qos 设置)
└── 控制消息大小

6.3 错误处理

text
建议:
├── 捕获 UnroutableError
├── 实现重试机制
├── 记录详细日志
└── 监控消息发送状态

七、相关链接