Skip to content

Pika 连接管理

一、概述

本文档详细介绍 Pika 库的连接管理,包括连接创建、配置、生命周期管理和重连机制。

1.1 连接架构

┌─────────────────────────────────────────────────────────────────────┐
│                       Pika 连接架构                                  │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Application Layer                         │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                  ConnectionFactory                            │   │
│  │                  (连接工厂)                                   │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                     Connection                               │   │
│  │                    (TCP 连接)                                 │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐   │   │
│  │  │ Channel1 │ │ Channel2 │ │ Channel3 │ │ ChannelN │   │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘   │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 连接类型

text
连接类型:
├── BlockingConnection - 同步阻塞连接
├── SelectConnection - 异步选择连接
├── LibevConnection - 基于 libev 的异步连接
├── TornadoConnection - Tornado 框架集成
└── AsyncioConnection - asyncio 异步连接

二、核心知识点

2.1 连接参数

python
import pika

def create_connection_parameters():
    return pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=pika.PlainCredentials('guest', 'guest'),
        
        heartbeat=60,
        blocked_connection_timeout=300,
        
        connection_attempts=3,
        retry_delay=5,
        
        socket_timeout=10,
        stack_timeout=15,
        
        locale='en_US',
        
        client_properties={
            'product': 'MyApp',
            'version': '1.0.0'
        }
    )

2.2 同步连接

python
import pika

class SyncConnection:
    
    def __init__(self, host='localhost', port=5672,
                 username='guest', password='guest'):
        
        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(
            host=host,
            port=port,
            credentials=credentials,
            heartbeat=60
        )
        
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
    
    def publish(self, queue, message):
        self.channel.basic_publish(
            exchange='',
            routing_key=queue,
            body=message.encode()
        )
    
    def consume(self, queue, callback):
        self.channel.basic_consume(
            queue=queue,
            on_message_callback=callback,
            auto_ack=True
        )
        self.channel.start_consuming()
    
    def close(self):
        if self.connection and self.connection.is_open:
            self.connection.close()

with SyncConnection() as conn:
    conn.publish('test.queue', 'Hello World!')

2.3 异步连接

python
import pika
import asyncio

class AsyncConnection:
    
    def __init__(self, host='localhost'):
        self.host = host
        self.connection = None
        self.channel = None
    
    async def connect(self):
        loop = asyncio.get_event_loop()
        
        parameters = pika.ConnectionParameters(
            host=self.host,
            connection_attempts=3,
            retry_delay=5
        )
        
        self.connection = pika.AsyncioConnection(
            parameters,
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_error,
            on_close_callback=self.on_connection_close
        )
        
        await asyncio.sleep(0.1)
    
    def on_connection_open(self, connection):
        print("连接已打开")
        self.channel = connection.channel(
            on_open_callback=self.on_channel_open
        )
    
    def on_connection_error(self, connection, error):
        print(f"连接错误: {error}")
    
    def on_connection_close(self, connection, reason):
        print(f"连接已关闭: {reason}")
    
    def on_channel_open(self, channel):
        print("通道已打开")
        self.channel = channel
        
        channel.queue_declare(
            queue='test',
            durable=True,
            callback=self.on_queue_declared
        )
    
    def on_queue_declared(self, method_frame):
        print("队列已声明")
        
        channel.basic_consume(
            queue='test',
            on_message_callback=self.on_message
        )
    
    def on_message(self, channel, method, properties, body):
        print(f"收到消息: {body.decode()}")
        channel.basic_ack(delivery_tag=method.delivery_tag)
    
    async def publish(self, queue, message):
        def publish_callback():
            self.channel.basic_publish(
                exchange='',
                routing_key=queue,
                body=message.encode()
            )
        
        await asyncio.get_event_loop().run_in_executor(None, publish_callback)
    
    async def close(self):
        if self.connection and self.connection.is_open:
            self.connection.close()

2.4 连接生命周期

python
import pika

class ConnectionLifecycle:
    
    def __init__(self):
        self.connection = None
        self.channel = None
    
    def setup_connection(self):
        credentials = pika.PlainCredentials('guest', 'guest')
        parameters = pika.ConnectionParameters(
            host='localhost',
            credentials=credentials,
            heartbeat=60,
            blocked_connection_timeout=300
        )
        
        self.connection = pika.BlockingConnection(parameters)
        
        if self.connection.is_open:
            print("连接已建立")
        
        self.channel = self.connection.channel()
        
        if self.channel.is_open:
            print("通道已打开")
    
    def add_close_callback(self):
        self.connection.add_close_callback(self.on_connection_close)
        self.channel.add_close_callback(self.on_channel_close)
    
    def on_connection_close(self, connection, reason):
        print(f"连接关闭: {reason}")
    
    def on_channel_close(self, channel, reason):
        print(f"通道关闭: {reason}")
    
    def close_gracefully(self):
        if self.channel and self.channel.is_open:
            self.channel.close()
            print("通道已关闭")
        
        if self.connection and self.connection.is_open:
            self.connection.close()
            print("连接已关闭")
    
    def force_close(self):
        if self.connection:
            self.connection.close()
            print("强制关闭连接")

2.5 自动重连

python
import pika
import time

class ReconnectingConnection:
    
    def __init__(self, host='localhost', max_retries=5, retry_delay=5):
        self.host = host
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.connection = None
        self.channel = None
    
    def connect_with_retry(self):
        for attempt in range(self.max_retries):
            try:
                credentials = pika.PlainCredentials('guest', 'guest')
                parameters = pika.ConnectionParameters(
                    host=self.host,
                    credentials=credentials,
                    heartbeat=60
                )
                
                self.connection = pika.BlockingConnection(parameters)
                self.channel = self.connection.channel()
                
                print(f"连接成功 (尝试 {attempt + 1}/{self.max_retries})")
                return True
                
            except pika.exceptions.AMQPConnectionError as e:
                print(f"连接失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
                
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
        
        print("达到最大重试次数,连接失败")
        return False
    
    def reconnect_on_failure(self):
        while True:
            if self.connect_with_retry():
                break
            
            time.sleep(self.retry_delay)
    
    def close(self):
        if self.connection and self.connection.is_open:
            self.connection.close()

2.6 连接池

python
import pika
from queue import Queue, Empty
import threading

class ConnectionPool:
    
    def __init__(self, host='localhost', pool_size=5):
        self.host = host
        self.pool_size = pool_size
        self.pool = Queue(maxsize=pool_size)
        self.lock = threading.Lock()
        
        self._initialize_pool()
    
    def _initialize_pool(self):
        for _ in range(self.pool_size):
            connection = self._create_connection()
            self.pool.put(connection)
    
    def _create_connection(self):
        credentials = pika.PlainCredentials('guest', 'guest')
        parameters = pika.ConnectionParameters(
            host=self.host,
            credentials=credentials,
            heartbeat=60
        )
        
        connection = pika.BlockingConnection(parameters)
        return connection
    
    def get_connection(self, timeout=10):
        try:
            connection = self.pool.get(timeout=timeout)
            
            if not connection.is_open:
                connection = self._create_connection()
            
            return connection
            
        except Empty:
            with self.lock:
                if self.pool.qsize() < self.pool_size:
                    connection = self._create_connection()
                    return connection
            
            raise Exception("获取连接超时")
    
    def return_connection(self, connection):
        if connection and connection.is_open:
            try:
                self.pool.put_nowait(connection)
            except:
                connection.close()
        else:
            try:
                connection = self._create_connection()
                self.pool.put_nowait(connection)
            except:
                pass
    
    def close_all(self):
        while not self.pool.empty():
            try:
                connection = self.pool.get_nowait()
                connection.close()
            except:
                pass
    
    def execute(self, operation):
        connection = self.get_connection()
        channel = connection.channel()
        
        try:
            result = operation(channel)
            channel.close()
            self.return_connection(connection)
            return result
        except Exception as e:
            channel.close()
            self.return_connection(connection)
            raise e

三、代码示例

3.1 完整连接管理示例

python
import pika
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ConnectionManager:
    
    def __init__(self, host='localhost', port=5672,
                 username='guest', password='guest',
                 virtual_host='/',
                 heartbeat=60,
                 connection_timeout=30,
                 max_retries=3,
                 retry_delay=5):
        
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.virtual_host = virtual_host
        self.heartbeat = heartbeat
        self.connection_timeout = connection_timeout
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        
        self.connection = None
        self.channel = None
    
    def create_parameters(self):
        credentials = pika.PlainCredentials(self.username, self.password)
        
        return pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.virtual_host,
            credentials=credentials,
            heartbeat=self.heartbeat,
            connection_timeout=self.connection_timeout,
            connection_attempts=self.max_retries,
            retry_delay=self.retry_delay,
            blocked_connection_timeout=300,
            client_properties={
                'product': 'RabbitMQ-Python-Client',
                'version': '1.0.0',
                'platform': 'Python'
            }
        )
    
    def connect(self):
        parameters = self.create_parameters()
        
        for attempt in range(self.max_retries):
            try:
                self.connection = pika.BlockingConnection(parameters)
                self.channel = self.connection.channel()
                
                self.channel.basic_qos(prefetch_count=10)
                
                logger.info(f"连接建立成功 (尝试 {attempt + 1})")
                return True
                
            except pika.exceptions.AMQPConnectionError as e:
                logger.warning(f"连接失败: {e}")
                
                if attempt < self.max_retries - 1:
                    logger.info(f"等待 {self.retry_delay} 秒后重试...")
                    time.sleep(self.retry_delay)
        
        logger.error("达到最大重试次数,连接失败")
        return False
    
    def ensure_connection(self):
        if not self.connection or not self.connection.is_open:
            return self.connect()
        
        if not self.channel or not self.channel.is_open:
            self.channel = self.connection.channel()
        
        return True
    
    def close(self):
        if self.channel and self.channel.is_open:
            try:
                self.channel.close()
                logger.info("通道已关闭")
            except Exception as e:
                logger.error(f"关闭通道失败: {e}")
        
        if self.connection and self.connection.is_open:
            try:
                self.connection.close()
                logger.info("连接已关闭")
            except Exception as e:
                logger.error(f"关闭连接失败: {e}")
    
    def __enter__(self):
        self.connect()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
    
    def publish(self, exchange, routing_key, body, properties=None):
        self.ensure_connection()
        
        if not properties:
            properties = pika.BasicProperties(
                delivery_mode=2,
                content_type='application/json'
            )
        
        self.channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=body.encode() if isinstance(body, str) else body,
            properties=properties
        )
    
    def consume(self, queue, callback, auto_ack=True):
        self.ensure_connection()
        
        self.channel.basic_consume(
            queue=queue,
            on_message_callback=callback,
            auto_ack=auto_ack
        )
        
        self.channel.start_consuming()


if __name__ == '__main__':
    with ConnectionManager('localhost') as manager:
        manager.publish('', 'test.queue', 'Hello World!')
        print("消息发布成功")

3.2 TLS 连接

python
import pika
import ssl

def create_ssl_connection():
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    
    ssl_context.load_verify_locations('ca_certificate.pem')
    ssl_context.load_cert_chain(
        'client_certificate.pem',
        'client_key.pem'
    )
    
    ssl_options = pika.SSLOptions(ssl_context, 'localhost')
    
    credentials = pika.PlainCredentials('guest', 'guest')
    
    parameters = pika.ConnectionParameters(
        host='localhost',
        port=5671,
        credentials=credentials,
        ssl_options=ssl_options,
        heartbeat=60
    )
    
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    print("TLS 连接建立成功")
    
    return connection, channel

3.3 连接监控

python
import pika
import threading
import time

class ConnectionMonitor:
    
    def __init__(self, connection):
        self.connection = connection
        self.monitoring = False
        self.monitor_thread = None
    
    def start_monitoring(self, interval=30):
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor,
            args=(interval,),
            daemon=True
        )
        self.monitor_thread.start()
    
    def _monitor(self, interval):
        while self.monitoring:
            time.sleep(interval)
            
            if not self.connection.is_open:
                print("警告: 连接已断开!")
            
            if self.connection.protocol_state in ['CLOSED', 'CLOSING']:
                print("警告: 协议状态异常!")
    
    def stop_monitoring(self):
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
    
    def get_status(self):
        return {
            'is_open': self.connection.is_open,
            'protocol_state': self.connection.protocol_state,
            'server_properties': self.connection.server_properties,
            'bytes_sent': self.connection.bytes_sent,
            'bytes_received': self.connection.bytes_received
        }

四、实际应用场景

4.1 Web 应用连接管理

python
import pika
from flask import Flask

app = Flask(__name__)

connection = None
channel = None

@app.before_request
def get_channel():
    global connection, channel
    
    if not connection or not connection.is_open:
        credentials = pika.PlainCredentials('guest', 'guest')
        parameters = pika.ConnectionParameters(
            host='localhost',
            credentials=credentials
        )
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
    
    return channel

@app.teardown_appcontext
def close_connection(exception=None):
    pass

4.2 定时任务连接

python
import pika
import schedule
import time

def send_heartbeat():
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters(
        host='localhost',
        credentials=credentials
    )
    
    with pika.BlockingConnection(parameters) as connection:
        channel = connection.channel()
        
        channel.basic_publish(
            exchange='',
            routing_key='heartbeat.queue',
            body=f'Heartbeat at {time.time()}'.encode()
        )

schedule.every(1).minutes.do(send_heartbeat)

while True:
    schedule.run_pending()
    time.sleep(1)

五、常见问题与解决方案

5.1 连接超时

问题描述: 连接建立超时。

解决方案

python
parameters = pika.ConnectionParameters(
    host='localhost',
    connection_timeout=30,
    socket_timeout=30
)

5.2 心跳超时

问题描述: 连接因心跳超时断开。

解决方案

python
parameters = pika.ConnectionParameters(
    heartbeat=60,
    blocked_connection_timeout=300
)

5.3 连接泄漏

问题描述: 连接未正确关闭。

解决方案

python
with pika.BlockingConnection(parameters) as connection:
    channel = connection.channel()
    # 使用连接
# 自动关闭

六、最佳实践建议

6.1 连接管理

text
建议:
├── 使用 with 语句管理连接生命周期
├── 设置合理的心跳间隔
├── 实现重连机制
├── 监控连接状态
└── 正确处理异常

6.2 性能优化

text
建议:
├── 复用连接,避免频繁创建
├── 使用线程安全的连接池
├── 合理设置超时时间
└── 使用异步连接模式

6.3 安全配置

text
建议:
├── 生产环境使用 TLS
├── 使用强密码
├── 限制用户权限
└── 定期更新证书

七、相关链接