Appearance
Pika 生产者实现
一、概述
本文档详细介绍如何使用 Pika 库实现消息生产者,包括基础发布、消息属性、发布确认等内容。
1.1 生产者架构
┌─────────────────────────────────────────────────────────────────────┐
│ Pika 生产者架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Application │ ──► │ Channel │ ──► │ Exchange │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Queue │ ◄── │ Binding │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘

1.2 消息发布流程
python
发布流程:
1. 创建连接
2. 创建通道
3. 声明交换器
4. 声明队列
5. 绑定队列
6. 发布消息
7. 确认消息

二、核心知识点
2.1 基础发布
python
import pika
def simple_publish():
    """Publish a single persistent "Hello World!" message to the ``hello`` queue.

    Opens a blocking connection to the broker on localhost, declares the
    durable queue, publishes through the default exchange (``exchange=''``)
    using the queue name as the routing key, prints a success line and
    closes the connection.
    """
    params = pika.ConnectionParameters('localhost')
    conn = pika.BlockingConnection(params)
    ch = conn.channel()

    queue_name = 'hello'
    ch.queue_declare(queue=queue_name, durable=True)

    # delivery_mode=2 asks the broker to persist the message.
    persistent = pika.BasicProperties(delivery_mode=2)
    ch.basic_publish(
        exchange='',
        routing_key=queue_name,
        body='Hello World!',
        properties=persistent
    )

    print("消息发布成功")
    conn.close()
simple_publish()

2.2 消息属性
python
import pika
import json
import time
def publish_with_properties():
    """Publish a JSON order message that sets a wide range of AMQP properties.

    Connects to localhost, builds a ``pika.BasicProperties`` carrying content
    metadata, custom headers, persistence, priority, correlation/reply
    fields, an expiration and identification fields, then publishes the
    JSON-encoded order to the ``orders`` exchange with routing key
    ``order.created``.

    NOTE(review): the ``orders`` exchange is not declared in this function,
    so it is presumably expected to exist on the broker already -- confirm.
    """
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    ch = conn.channel()

    custom_headers = {
        'x-custom-header': 'value',
        'x-source': 'producer'
    }
    props = pika.BasicProperties(
        content_type='application/json',
        content_encoding='utf-8',
        headers=custom_headers,
        delivery_mode=2,
        priority=5,
        correlation_id='corr-123',
        reply_to='reply-queue',
        expiration='60000',
        message_id='msg-456',
        timestamp=int(time.time()),
        type='order.created',
        user_id='guest',
        app_id='my-app'
    )

    order = {
        'order_id': 'ORD-001',
        'amount': 99.99
    }
    ch.basic_publish(
        exchange='orders',
        routing_key='order.created',
        body=json.dumps(order),
        properties=props
    )

    print("消息发布成功")
    conn.close()

# 2.3 发布确认
python
import pika
def publish_with_confirm():
    """Publish one persistent message in publisher-confirm mode.

    The channel is switched to confirm mode, so ``basic_publish`` blocks
    until the broker responds. Per the pika ``BlockingChannel`` API it then
    raises ``UnroutableError`` when a mandatory message is returned and
    ``NackError`` when the broker negatively acknowledges it; both outcomes
    are reported here. The connection is closed even when an unexpected
    exception propagates.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    try:
        channel = connection.channel()
        channel.confirm_delivery()
        channel.queue_declare(queue='confirm.queue', durable=True)
        try:
            channel.basic_publish(
                exchange='',
                routing_key='confirm.queue',
                body='Message with confirmation',
                properties=pika.BasicProperties(delivery_mode=2),
                mandatory=True
            )
        except pika.exceptions.UnroutableError:
            print("消息无法路由")
        except pika.exceptions.NackError:
            # The broker refused responsibility for the message.
            print("消息被代理拒绝")
        else:
            print("消息确认成功")
    finally:
        # Never leak the connection, whatever happened above.
        if connection.is_open:
            connection.close()

# 2.4 批量发布
python
import pika
def batch_publish():
    """Publish 100 persistent messages to ``batch.queue`` over one channel.

    Connects to localhost, declares the durable queue, sends
    ``Message 0`` .. ``Message 99`` through the default exchange, prints a
    completion line and closes the connection.
    """
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    ch = conn.channel()

    target_queue = 'batch.queue'
    ch.queue_declare(queue=target_queue, durable=True)

    for seq in range(100):
        ch.basic_publish(
            exchange='',
            routing_key=target_queue,
            body=f'Message {seq}',
            properties=pika.BasicProperties(delivery_mode=2)
        )

    print("批量发布完成")
    conn.close()

# 2.5 返回回调
python
import pika
def publish_with_returns():
    """Demonstrate the return callback for an unroutable mandatory message.

    A return callback is registered on the channel, then a ``mandatory``
    message is published through the default exchange with a routing key
    that matches no queue, so the broker hands it back via ``Basic.Return``.
    ``process_data_events`` gives the blocking connection a chance to
    dispatch that return to the callback before the connection is closed.

    The message is deliberately NOT sent to a non-existent exchange: that
    is a channel-level error (the broker closes the channel) rather than a
    returned message, so no return callback would ever run.
    """
    def on_message_returned(ch, method, properties, body):
        # ``method`` is the Basic.Return frame describing why it came back.
        print(f"消息被退回: {method.reply_code} {method.reply_text}")

    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    try:
        channel = connection.channel()
        channel.add_on_return_callback(on_message_returned)
        channel.queue_declare(queue='test.queue', durable=True)
        channel.basic_publish(
            exchange='',
            # No queue is named 'test.key', so the message is unroutable.
            routing_key='test.key',
            body='Test message',
            properties=pika.BasicProperties(delivery_mode=2),
            mandatory=True
        )
        # Let pending broker frames (the Basic.Return) reach the callback.
        connection.process_data_events(time_limit=1)
    finally:
        if connection.is_open:
            connection.close()

# 三、代码示例
3.1 完整生产者类
python
import pika
import json
import uuid
import time
from typing import Any, Dict, Optional
class PikaProducer:
    """Blocking RabbitMQ producer that owns one pika connection and channel.

    The connection is opened by the constructor. Instances work as context
    managers: leaving the ``with`` block calls :meth:`close`.
    """

    def __init__(self, host: str = 'localhost', port: int = 5672,
                 username: str = 'guest', password: str = 'guest',
                 virtual_host: str = '/'):
        """Store the connection settings and connect immediately."""
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.virtual_host = virtual_host
        self.connection = None
        self.channel = None
        # Remembered so that a channel reopened during a retry can be put
        # back into confirm mode.
        self._confirms_enabled = False
        self._connect()

    def _connect(self):
        """Open a new connection and channel from the stored settings."""
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.virtual_host,
            credentials=credentials,
            heartbeat=60,
            connection_attempts=3,
            retry_delay=5
        )
        self.connection = pika.BlockingConnection(parameters)
        self._open_channel()
        print(f"连接到 {self.host}:{self.port}")

    def _open_channel(self):
        """Open a channel on the current connection, restoring confirm mode."""
        self.channel = self.connection.channel()
        if self._confirms_enabled:
            self.channel.confirm_delivery()

    def _reopen_if_closed(self):
        """Reopen the connection or just the channel if either is closed."""
        if self.connection is None or not self.connection.is_open:
            self._connect()
        elif self.channel is None or not self.channel.is_open:
            # The connection is still usable; only the channel needs
            # replacing.
            self._open_channel()

    def declare_exchange(self, exchange: str, exchange_type: str = 'direct',
                         durable: bool = True):
        """Declare ``exchange`` with the given type and durability."""
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type=exchange_type,
            durable=durable
        )
        print(f"交换器 {exchange} 已声明")

    def declare_queue(self, queue: str, durable: bool = True,
                      arguments: Optional[Dict] = None):
        """Declare ``queue`` with optional extra queue arguments."""
        self.channel.queue_declare(
            queue=queue,
            durable=durable,
            arguments=arguments
        )
        print(f"队列 {queue} 已声明")

    def bind_queue(self, queue: str, exchange: str, routing_key: str):
        """Bind ``queue`` to ``exchange`` under ``routing_key``."""
        self.channel.queue_bind(
            queue=queue,
            exchange=exchange,
            routing_key=routing_key
        )
        print(f"绑定: {exchange} -> {queue} ({routing_key})")

    def enable_confirms(self):
        """Put the channel into publisher-confirm mode."""
        self.channel.confirm_delivery()
        self._confirms_enabled = True
        print("发布确认已启用")

    def publish(self, exchange: str, routing_key: str, body: Any,
                content_type: str = 'application/json',
                delivery_mode: int = 2,
                priority: int = 0,
                headers: Optional[Dict] = None,
                mandatory: bool = True) -> bool:
        """Publish one message and report whether the publish succeeded.

        ``dict``/``list`` bodies are JSON-encoded and ``str`` bodies are
        UTF-8 encoded; any other body is passed to pika unchanged. Each
        message gets a fresh UUID ``message_id`` and the current Unix time
        as ``timestamp``.

        Returns:
            ``True`` when ``basic_publish`` returned normally, ``False``
            when it raised ``UnroutableError`` or any other exception
            (the failure is printed, not re-raised).
        """
        if isinstance(body, (dict, list)):
            body = json.dumps(body).encode('utf-8')
        elif isinstance(body, str):
            body = body.encode('utf-8')
        message_id = str(uuid.uuid4())
        properties = pika.BasicProperties(
            content_type=content_type,
            delivery_mode=delivery_mode,
            priority=priority,
            message_id=message_id,
            timestamp=int(time.time()),
            headers=headers or {}
        )
        try:
            self.channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=body,
                properties=properties,
                mandatory=mandatory
            )
            print(f"消息发布成功: {message_id}")
            return True
        except pika.exceptions.UnroutableError:
            print(f"消息无法路由: {message_id}")
            return False
        except Exception as e:
            print(f"发布失败: {e}")
            return False

    def publish_with_retry(self, exchange: str, routing_key: str,
                           body: Any, max_retries: int = 3,
                           retry_delay: float = 1.0,
                           **publish_kwargs: Any) -> bool:
        """Call :meth:`publish` up to ``max_retries`` times.

        Between attempts the method sleeps ``retry_delay * k`` seconds
        (``k`` = number of failed attempts so far) and reopens the channel
        or connection if it has been closed, so a retry is not doomed to
        fail on the same dead channel.

        Args:
            max_retries: Total number of publish attempts.
            retry_delay: Base delay in seconds, scaled linearly per attempt.
            **publish_kwargs: Extra keyword options forwarded unchanged to
                :meth:`publish` (``content_type``, ``headers``, ...).

        Returns:
            ``True`` as soon as one attempt succeeds, otherwise ``False``.
        """
        for attempt in range(max_retries):
            if attempt:
                time.sleep(retry_delay * attempt)
                try:
                    self._reopen_if_closed()
                except pika.exceptions.AMQPError as e:
                    # Could not reopen; count this as a failed attempt.
                    print(f"发布失败: {e}")
                    continue
            if self.publish(exchange, routing_key, body, **publish_kwargs):
                return True
        return False

    def close(self):
        """Close the channel and the connection if they are still open."""
        if self.channel and self.channel.is_open:
            self.channel.close()
        if self.connection and self.connection.is_open:
            self.connection.close()
        print("连接已关闭")

    def __enter__(self):
        """Return the producer itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the producer; exceptions from the block are not suppressed."""
        self.close()
if __name__ == '__main__':
    # Demo: declare a topic exchange and a queue, bind them, enable
    # confirms, then publish five sample order messages.
    with PikaProducer() as producer:
        producer.declare_exchange('orders.exchange', 'topic')
        producer.declare_queue('orders.queue')
        producer.bind_queue('orders.queue', 'orders.exchange', 'order.#')
        producer.enable_confirms()

        for i in range(5):
            order = {
                'order_id': f'ORD-{i:04d}',
                'user_id': f'USER-{i % 3 + 1}',
                'amount': 99.99 * (i + 1),
                'created_at': time.strftime('%Y-%m-%d %H:%M:%S')
            }
            producer.publish(
                exchange='orders.exchange',
                routing_key='order.created',
                body=order,
                headers={'x-source': 'producer-demo'}
            )

# 3.2 订单消息生产者
python
import pika
import json
import time
from datetime import datetime
class OrderProducer:
    """Publishes order lifecycle events to the ``order.events`` topic exchange.

    One durable queue per event type (``order.created``, ``order.paid``) is
    declared and bound with its own name as the routing key, and the
    channel runs in publisher-confirm mode.

    NOTE(review): the queues dead-letter to ``dlx.exchange``, which this
    class does not declare -- presumably it is created elsewhere; confirm.
    """

    def __init__(self):
        """Connect to the broker on localhost and declare the topology."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        self._setup_infrastructure()

    def _setup_infrastructure(self):
        """Declare the exchange and one bound queue per event type."""
        self.channel.exchange_declare(
            exchange='order.events',
            exchange_type='topic',
            durable=True
        )
        # Every routing key this class publishes with needs a bound queue;
        # otherwise the broker confirms the message and then discards it.
        for queue in ('order.created', 'order.paid'):
            self.channel.queue_declare(
                queue=queue,
                durable=True,
                arguments={
                    'x-message-ttl': 86400000,
                    'x-dead-letter-exchange': 'dlx.exchange'
                }
            )
            self.channel.queue_bind(
                queue=queue,
                exchange='order.events',
                routing_key=queue
            )
        self.channel.confirm_delivery()

    def _publish_event(self, event: dict, success_message: str) -> bool:
        """Publish ``event`` as persistent JSON, routed by its event type.

        Returns ``True`` on success; on any exception the error is printed
        and ``False`` is returned.
        """
        try:
            self.channel.basic_publish(
                exchange='order.events',
                routing_key=event['event_type'],
                body=json.dumps(event).encode('utf-8'),
                properties=pika.BasicProperties(
                    content_type='application/json',
                    delivery_mode=2,
                    message_id=event['event_id'],
                    type=event['event_type'],
                    timestamp=int(time.time())
                ),
                # In confirm mode an unroutable mandatory message raises
                # instead of being silently dropped.
                mandatory=True
            )
        except Exception as e:
            print(f"发布失败: {e}")
            return False
        print(success_message)
        return True

    def publish_order_created(self, order_id: str, user_id: str,
                              amount: float, items: list) -> bool:
        """Publish an ``order.created`` event; return whether it succeeded."""
        event = {
            'event_type': 'order.created',
            'event_id': f'evt-{order_id}',
            'order_id': order_id,
            'user_id': user_id,
            'amount': amount,
            'items': items,
            'timestamp': datetime.now().isoformat()
        }
        return self._publish_event(event, f"订单创建事件已发布: {order_id}")

    def publish_order_paid(self, order_id: str, payment_id: str,
                           payment_method: str) -> bool:
        """Publish an ``order.paid`` event; return whether it succeeded."""
        event = {
            'event_type': 'order.paid',
            'event_id': f'evt-pay-{order_id}',
            'order_id': order_id,
            'payment_id': payment_id,
            'payment_method': payment_method,
            'timestamp': datetime.now().isoformat()
        }
        return self._publish_event(event, f"订单支付事件已发布: {order_id}")

    def close(self):
        """Close the underlying connection."""
        self.connection.close()
if __name__ == '__main__':
    # Demo: publish a created event followed by a paid event for one order.
    order_items = [
        {'product_id': 'PROD-001', 'quantity': 2, 'price': 29.99},
        {'product_id': 'PROD-002', 'quantity': 1, 'price': 49.99}
    ]

    producer = OrderProducer()
    producer.publish_order_created('ORD-001', 'USER-001', 109.97, order_items)
    producer.publish_order_paid('ORD-001', 'PAY-001', 'credit_card')
    producer.close()

# 四、实际应用场景
4.1 事件发布
python
import json
import uuid
from datetime import datetime

import pika
class EventPublisher:
    """Publishes JSON event envelopes to a single durable exchange.

    The constructor connects to localhost, declares the exchange and puts
    the channel into publisher-confirm mode.
    """

    def __init__(self, exchange: str, exchange_type: str = 'topic'):
        """Connect and declare ``exchange`` with the given type."""
        self.exchange = exchange
        self.exchange_type = exchange_type
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = channel = self.connection.channel()
        channel.exchange_declare(
            exchange=exchange,
            exchange_type=exchange_type,
            durable=True
        )
        channel.confirm_delivery()

    def publish_event(self, event_type: str, routing_key: str,
                      payload: dict) -> bool:
        """Wrap ``payload`` in an event envelope and publish it.

        The envelope carries a generated id (event type plus eight hex
        characters of a UUID4), the event type, the payload, an ISO
        timestamp and a fixed source name.

        Returns ``True`` on success; on any exception the error is printed
        and ``False`` is returned.
        """
        short_id = uuid.uuid4().hex[:8]
        envelope = {
            'event_id': f'{event_type}-{short_id}',
            'event_type': event_type,
            'payload': payload,
            'timestamp': datetime.now().isoformat(),
            'source': 'my-service'
        }
        try:
            props = pika.BasicProperties(
                content_type='application/json',
                delivery_mode=2,
                message_id=envelope['event_id'],
                type=event_type
            )
            self.channel.basic_publish(
                exchange=self.exchange,
                routing_key=routing_key,
                body=json.dumps(envelope).encode('utf-8'),
                properties=props
            )
        except Exception as err:
            print(f"发布失败: {err}")
            return False
        print(f"事件已发布: {event_type}")
        return True

    def close(self):
        """Close the underlying connection."""
        self.connection.close()
# Demo: publish two sample events; the connection is closed even if a
# publish raises unexpectedly.
publisher = EventPublisher('events', 'topic')
try:
    publisher.publish_event('user.registered', 'user.registered', {
        'user_id': 'USER-001',
        'email': 'user@example.com'
    })
    publisher.publish_event('order.created', 'order.created', {
        'order_id': 'ORD-001',
        'total': 99.99
    })
finally:
    publisher.close()

# 4.2 延迟消息
python
import pika
import json
import time
class DelayedPublisher:
    """Publishes messages to an ``x-delayed-message`` exchange.

    NOTE(review): the ``x-delayed-message`` exchange type presumably needs
    the RabbitMQ delayed-message-exchange plugin on the broker -- confirm.
    """

    def __init__(self):
        """Connect to localhost and declare ``delayed.exchange``."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = channel = self.connection.channel()
        channel.exchange_declare(
            exchange='delayed.exchange',
            exchange_type='x-delayed-message',
            durable=True,
            arguments={
                'x-delayed-type': 'direct'
            }
        )

    def publish_delayed(self, routing_key: str, message: dict,
                        delay_ms: int) -> bool:
        """Publish ``message`` as JSON with an ``x-delay`` header of ``delay_ms``.

        Returns ``True`` on success; on any exception the error is printed
        and ``False`` is returned.
        """
        try:
            props = pika.BasicProperties(
                delivery_mode=2,
                headers={'x-delay': delay_ms}
            )
            self.channel.basic_publish(
                exchange='delayed.exchange',
                routing_key=routing_key,
                body=json.dumps(message).encode('utf-8'),
                properties=props
            )
        except Exception as err:
            print(f"发布失败: {err}")
            return False
        print(f"延迟消息已发布,延迟 {delay_ms}ms")
        return True

    def close(self):
        """Close the underlying connection."""
        self.connection.close()
if __name__ == '__main__':
    # Demo: send one notification with a 5000 ms delay header.
    notification = {
        'message': 'Hello after delay!'
    }
    publisher = DelayedPublisher()
    publisher.publish_delayed('notification', notification, 5000)
    publisher.close()

# 五、常见问题与解决方案
5.1 消息无法路由
问题描述: 消息发布后无法路由到任何队列。
解决方案:
python
# BlockingChannel.basic_publish only raises UnroutableError when the
# channel is in publisher-confirm mode, so enable it first; without this
# the except branch below can never run.
channel.confirm_delivery()
try:
    channel.basic_publish(
        exchange='exchange',
        routing_key='routing.key',
        body=message,
        mandatory=True
    )
except pika.exceptions.UnroutableError:
    print("消息无法路由,处理失败情况")

# 5.2 消息丢失
问题描述: 消息发布后丢失。
解决方案:
python
# Combine publisher confirms with a persistent message (delivery_mode=2).
persistent_props = pika.BasicProperties(delivery_mode=2)
channel.confirm_delivery()
channel.basic_publish(
    exchange='',
    routing_key='queue',
    body=message,
    properties=persistent_props
)

# 5.3 消息过大
问题描述: 消息体过大导致性能问题。
解决方案:
python
import gzip
import json

# Serialize to JSON, gzip the UTF-8 bytes, and label the body with
# content_encoding='gzip'.
message = json.dumps(data)
compressed = gzip.compress(message.encode('utf-8'))
channel.basic_publish(
    exchange='',
    routing_key='queue',
    body=compressed,
    properties=pika.BasicProperties(
        content_encoding='gzip'
    )
)

# 六、最佳实践建议
6.1 发布建议
text
建议:
├── 使用发布确认确保可靠性
├── 消息持久化
├── 设置 mandatory 标志
├── 实现重试机制
└── 监控发布失败

6.2 性能优化
text
建议:
├── 批量发布消息
├── 复用连接和通道
├── 合理设置预取数量
└── 控制消息大小

6.3 错误处理
text
建议:
├── 捕获 UnroutableError
├── 实现重试机制
├── 记录详细日志
└── 监控消息发送状态