Skip to content

RabbitMQ 生产者(Producer)

概述

生产者(Producer)是 RabbitMQ 消息模型中的消息创建者和发送者。它负责创建消息并将其发送到 Exchange,是整个消息流转过程的起点。生产者不需要知道消息最终会被哪个消费者处理,这种解耦设计是消息队列的核心优势之一。

生产者的核心职责

mermaid
graph LR
    A[业务数据] --> B[创建消息]
    B --> C[设置属性]
    C --> D[选择 Exchange]
    D --> E[设置 Routing Key]
    E --> F[发送消息]
    F --> G[处理确认]

生产者的主要职责包括:

  • 消息创建:将业务数据封装成消息
  • 属性设置:配置消息的元数据(持久化、优先级、TTL 等)
  • 路由选择:选择目标 Exchange 和 Routing Key
  • 消息发送:将消息发送到 RabbitMQ
  • 确认处理:处理消息发送结果(成功/失败)

核心知识点

1. 消息结构

一条完整的 RabbitMQ 消息包含两部分:

mermaid
graph TB
    subgraph 消息结构
        A[Message] --> B[Body 消息体]
        A --> C[Properties 消息属性]
        C --> D[content_type]
        C --> E[content_encoding]
        C --> F[delivery_mode]
        C --> G[priority]
        C --> H[correlation_id]
        C --> I[reply_to]
        C --> J[expiration]
        C --> K[message_id]
        C --> L[timestamp]
        C --> M[type]
        C --> N[user_id]
        C --> O[app_id]
        C --> P[headers]
    end

消息属性详解

属性类型说明
content_typestring消息体的 MIME 类型,如 application/json
content_encodingstring消息体的编码方式,如 utf-8
delivery_modeint1=非持久化,2=持久化
priorityint消息优先级(0-9)
correlation_idstring关联 ID,用于 RPC 场景
reply_tostring回复队列名称
expirationstring消息过期时间(毫秒)
message_idstring消息唯一标识
timestampint消息创建时间戳
typestring消息类型标识
user_idstring用户 ID(需验证)
app_idstring应用标识
headersarray自定义消息头

2. 消息持久化

消息持久化确保消息在 RabbitMQ 重启后不会丢失,需要三个条件同时满足:

mermaid
graph TB
    A[消息持久化] --> B[Exchange 持久化]
    A --> C[Queue 持久化]
    A --> D[Message 持久化]
    
    B --> B1[durable = true]
    C --> C1[durable = true]
    D --> D1[delivery_mode = 2]

3. 消息确认机制(Publisher Confirms)

Publisher Confirms 是生产者确认消息已被 RabbitMQ 接收的机制:

mermaid
sequenceDiagram
    participant P as Producer
    participant B as RabbitMQ Broker
    
    P->>B: 发送消息
    Note over P: 开启 confirm 模式
    B->>P: 返回 basic.ack
    Note over P: 收到确认

确认模式有三种:

  • 同步确认:发送后等待确认
  • 批量确认:发送一批后等待确认
  • 异步确认:注册回调处理确认

4. 事务机制

RabbitMQ 支持事务,但性能较差,不推荐在高吞吐场景使用:

mermaid
sequenceDiagram
    participant P as Producer
    participant B as RabbitMQ Broker
    
    P->>B: tx.select
    B->>P: tx.select-ok
    P->>B: basic.publish
    P->>B: basic.publish
    P->>B: tx.commit
    B->>P: tx.commit-ok

代码示例

基础生产者实现

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class BasicProducer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $config = array_merge([
            'host' => 'localhost',
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'vhost' => '/',
        ], $config);
        
        $this->connection = new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['user'],
            $config['password'],
            $config['vhost']
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function publish(
        string $exchange,
        string $routingKey,
        string $body,
        array $properties = []
    ): void {
        $message = new AMQPMessage($body, $properties);
        
        $this->channel->basic_publish(
            $message,
            $exchange,
            $routingKey
        );
    }
    
    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
        if ($this->connection) {
            $this->connection->close();
        }
    }
}

$producer = new BasicProducer();

$producer->publish(
    'orders_exchange',
    'order.created',
    json_encode(['order_id' => 1001, 'amount' => 299.99]),
    ['content_type' => 'application/json']
);

$producer->close();

带确认机制的生产者

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class ConfirmProducer
{
    private $connection;
    private $channel;
    private $pendingConfirms = [];
    private $nextPublishSeqNo = 1;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
        
        $this->channel->confirm_select();
        
        $this->channel->set_ack_handler(function ($deliveryTag, $multiple) {
            if ($multiple) {
                $keys = array_keys($this->pendingConfirms);
                foreach ($keys as $key) {
                    if ($key <= $deliveryTag) {
                        unset($this->pendingConfirms[$key]);
                    }
                }
            } else {
                unset($this->pendingConfirms[$deliveryTag]);
            }
            echo "Message acknowledged: delivery_tag = {$deliveryTag}\n";
        });
        
        $this->channel->set_nack_handler(function ($deliveryTag, $multiple) {
            echo "Message nacked: delivery_tag = {$deliveryTag}\n";
        });
    }
    
    public function publishWithConfirm(
        string $exchange,
        string $routingKey,
        string $body,
        array $properties = [],
        int $timeout = 5
    ): bool {
        $defaultProperties = [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ];
        
        $properties = array_merge($defaultProperties, $properties);
        $message = new AMQPMessage($body, $properties);
        
        $this->channel->basic_publish($message, $exchange, $routingKey);
        
        $deliveryTag = $this->nextPublishSeqNo++;
        $this->pendingConfirms[$deliveryTag] = true;
        
        $this->channel->wait_for_pending_acks_returns($timeout);
        
        return !isset($this->pendingConfirms[$deliveryTag]);
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

try {
    $producer = new ConfirmProducer([
        'host' => 'localhost',
        'user' => 'guest',
        'password' => 'guest'
    ]);
    
    $success = $producer->publishWithConfirm(
        'orders_exchange',
        'order.created',
        json_encode(['order_id' => 1001])
    );
    
    if ($success) {
        echo "Message published and confirmed successfully\n";
    } else {
        echo "Message was not confirmed\n";
    }
    
    $producer->close();
} catch (Exception $e) {
    echo "Error: " . $e->getMessage() . "\n";
}

带重试机制的生产者

php
<?php

class RobustProducer
{
    private $config;
    private $connection;
    private $channel;
    private $maxRetries = 3;
    private $retryDelay = 1000;
    
    public function __construct(array $config)
    {
        $this->config = $config;
        $this->connect();
    }
    
    private function connect(): void
    {
        $this->connection = new AMQPStreamConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['password'],
            $this->config['vhost']
        );
        
        $this->channel = $this->connection->channel();
        $this->channel->confirm_select();
    }
    
    private function reconnect(): void
    {
        try {
            if ($this->channel) {
                $this->channel->close();
            }
            if ($this->connection) {
                $this->connection->close();
            }
        } catch (Exception $e) {
        }
        
        $this->connect();
    }
    
    public function publish(
        string $exchange,
        string $routingKey,
        string $body,
        array $properties = []
    ): bool {
        $attempt = 0;
        
        while ($attempt < $this->maxRetries) {
            try {
                $message = new AMQPMessage($body, array_merge([
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                ], $properties));
                
                $this->channel->basic_publish($message, $exchange, $routingKey);
                $this->channel->wait_for_pending_acks_returns(5);
                
                return true;
            } catch (Exception $e) {
                $attempt++;
                echo "Publish attempt {$attempt} failed: " . $e->getMessage() . "\n";
                
                if ($attempt < $this->maxRetries) {
                    usleep($this->retryDelay * 1000 * $attempt);
                    $this->reconnect();
                }
            }
        }
        
        return false;
    }
    
    public function close(): void
    {
        try {
            $this->channel->close();
            $this->connection->close();
        } catch (Exception $e) {
        }
    }
}

批量发送消息

php
<?php

class BatchProducer
{
    private $channel;
    private $connection;
    private $batchSize = 100;
    private $currentBatch = 0;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
        $this->channel->confirm_select();
    }
    
    public function publishBatch(
        string $exchange,
        string $routingKey,
        array $messages
    ): array {
        $results = [
            'success' => 0,
            'failed' => 0,
            'total' => count($messages)
        ];
        
        foreach ($messages as $data) {
            $message = new AMQPMessage(
                json_encode($data),
                [
                    'content_type' => 'application/json',
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                ]
            );
            
            $this->channel->basic_publish($message, $exchange, $routingKey);
            $this->currentBatch++;
            
            if ($this->currentBatch >= $this->batchSize) {
                if ($this->waitForConfirms()) {
                    $results['success'] += $this->currentBatch;
                } else {
                    $results['failed'] += $this->currentBatch;
                }
                $this->currentBatch = 0;
            }
        }
        
        if ($this->currentBatch > 0) {
            if ($this->waitForConfirms()) {
                $results['success'] += $this->currentBatch;
            } else {
                $results['failed'] += $this->currentBatch;
            }
        }
        
        return $results;
    }
    
    private function waitForConfirms(int $timeout = 30): bool
    {
        try {
            $this->channel->wait_for_pending_acks_returns($timeout);
            return true;
        } catch (Exception $e) {
            return false;
        }
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$producer = new BatchProducer(['host' => 'localhost']);

$messages = [];
for ($i = 1; $i <= 500; $i++) {
    $messages[] = [
        'order_id' => $i,
        'product' => "Product {$i}",
        'amount' => rand(100, 1000) / 10
    ];
}

$results = $producer->publishBatch('orders_exchange', 'order.created', $messages);
echo "Published: {$results['success']} success, {$results['failed']} failed\n";

$producer->close();

设置消息属性示例

php
<?php

class MessagePropertiesExample
{
    public function createMessageWithAllProperties(): AMQPMessage
    {
        return new AMQPMessage(
            json_encode(['data' => 'example']),
            [
                'content_type' => 'application/json',
                'content_encoding' => 'utf-8',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'priority' => 5,
                'correlation_id' => uniqid('corr_', true),
                'reply_to' => 'reply_queue',
                'expiration' => '60000',
                'message_id' => uniqid('msg_', true),
                'timestamp' => time(),
                'type' => 'order_created',
                'user_id' => 'app_user',
                'app_id' => 'order_service',
                'headers' => [
                    'trace_id' => 'abc123',
                    'span_id' => 'def456',
                    'source' => 'web_frontend',
                    'version' => '1.0.0'
                ]
            ]
        );
    }
    
    public function createPriorityMessage(int $priority, string $body): AMQPMessage
    {
        return new AMQPMessage($body, [
            'priority' => min(9, max(0, $priority)),
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]);
    }
    
    public function createTTLMessage(string $body, int $ttlMs): AMQPMessage
    {
        return new AMQPMessage($body, [
            'expiration' => (string)$ttlMs,
        ]);
    }
    
    public function createRPCRequest(string $body, string $replyTo, string $correlationId): AMQPMessage
    {
        return new AMQPMessage($body, [
            'reply_to' => $replyTo,
            'correlation_id' => $correlationId,
            'content_type' => 'application/json',
        ]);
    }
}

实际应用场景

1. 订单系统消息发送

php
<?php

class OrderMessageProducer
{
    private $producer;
    
    public function __construct(RobustProducer $producer)
    {
        $this->producer = $producer;
    }
    
    public function publishOrderCreated(array $order): bool
    {
        return $this->producer->publish(
            'orders_exchange',
            'order.created',
            json_encode([
                'event' => 'order.created',
                'data' => $order,
                'timestamp' => time(),
                'message_id' => uniqid('order_', true)
            ]),
            [
                'content_type' => 'application/json',
                'type' => 'order.created',
                'message_id' => $order['order_id'],
                'headers' => [
                    'version' => '1.0',
                    'source' => 'order_service'
                ]
            ]
        );
    }
    
    public function publishOrderPaid(int $orderId, float $amount): bool
    {
        return $this->producer->publish(
            'orders_exchange',
            'order.paid',
            json_encode([
                'event' => 'order.paid',
                'data' => [
                    'order_id' => $orderId,
                    'amount' => $amount,
                    'paid_at' => date('Y-m-d H:i:s')
                ]
            ])
        );
    }
    
    public function publishOrderCancelled(int $orderId, string $reason): bool
    {
        return $this->producer->publish(
            'orders_exchange',
            'order.cancelled',
            json_encode([
                'event' => 'order.cancelled',
                'data' => [
                    'order_id' => $orderId,
                    'reason' => $reason,
                    'cancelled_at' => date('Y-m-d H:i:s')
                ]
            ])
        );
    }
}

2. 日志收集系统

php
<?php

class LogProducer
{
    private $producer;
    private $serviceName;
    
    public function __construct(RobustProducer $producer, string $serviceName)
    {
        $this->producer = $producer;
        $this->serviceName = $serviceName;
    }
    
    public function log(string $level, string $message, array $context = []): bool
    {
        $routingKey = "log.{$level}";
        
        return $this->producer->publish(
            'logs_exchange',
            $routingKey,
            json_encode([
                'level' => $level,
                'message' => $message,
                'context' => $context,
                'service' => $this->serviceName,
                'hostname' => gethostname(),
                'timestamp' => date('Y-m-d H:i:s'),
                'trace_id' => $context['trace_id'] ?? null
            ]),
            [
                'content_type' => 'application/json',
                'headers' => [
                    'level' => $level,
                    'service' => $this->serviceName
                ]
            ]
        );
    }
    
    public function emergency(string $message, array $context = []): bool
    {
        return $this->log('emergency', $message, $context);
    }
    
    public function error(string $message, array $context = []): bool
    {
        return $this->log('error', $message, $context);
    }
    
    public function warning(string $message, array $context = []): bool
    {
        return $this->log('warning', $message, $context);
    }
    
    public function info(string $message, array $context = []): bool
    {
        return $this->log('info', $message, $context);
    }
    
    public function debug(string $message, array $context = []): bool
    {
        return $this->log('debug', $message, $context);
    }
}

3. 异步任务调度

php
<?php

class TaskProducer
{
    private $producer;
    
    public function __construct(RobustProducer $producer)
    {
        $this->producer = $producer;
    }
    
    public function dispatch(string $task, array $payload, array $options = []): bool
    {
        $options = array_merge([
            'delay' => 0,
            'priority' => 5,
            'max_retries' => 3,
            'timeout' => 300
        ], $options);
        
        $routingKey = "task.{$task}";
        
        $message = [
            'task' => $task,
            'payload' => $payload,
            'options' => $options,
            'attempts' => 0,
            'created_at' => time()
        ];
        
        $properties = [
            'content_type' => 'application/json',
            'priority' => $options['priority'],
            'headers' => [
                'task_type' => $task,
                'max_retries' => $options['max_retries']
            ]
        ];
        
        if ($options['delay'] > 0) {
            $properties['expiration'] = (string)($options['delay'] * 1000);
        }
        
        return $this->producer->publish(
            'tasks_exchange',
            $routingKey,
            json_encode($message),
            $properties
        );
    }
    
    public function sendEmail(string $to, string $subject, string $body, int $delay = 0): bool
    {
        return $this->dispatch('send_email', [
            'to' => $to,
            'subject' => $subject,
            'body' => $body
        ], ['delay' => $delay]);
    }
    
    public function generateReport(int $reportId, string $format, int $priority = 5): bool
    {
        return $this->dispatch('generate_report', [
            'report_id' => $reportId,
            'format' => $format
        ], ['priority' => $priority]);
    }
    
    public function cleanupExpiredData(): bool
    {
        return $this->dispatch('cleanup_expired', [], [
            'priority' => 1,
            'delay' => 3600
        ]);
    }
}

常见问题与解决方案

1. 消息发送失败

问题原因

  • 连接断开
  • Exchange 不存在
  • 权限不足
  • 消息过大

解决方案

php
<?php

class SafeProducer
{
    private $producer;
    
    public function safePublish(
        string $exchange,
        string $routingKey,
        string $body,
        array $properties = []
    ): array {
        $result = [
            'success' => false,
            'error' => null,
            'message_id' => null
        ];
        
        $maxSize = 128 * 1024 * 1024;
        if (strlen($body) > $maxSize) {
            $result['error'] = 'Message too large';
            return $result;
        }
        
        try {
            $success = $this->producer->publish($exchange, $routingKey, $body, $properties);
            
            if ($success) {
                $result['success'] = true;
                $result['message_id'] = $properties['message_id'] ?? md5($body);
            } else {
                $result['error'] = 'Publish failed after retries';
            }
        } catch (Exception $e) {
            $result['error'] = $e->getMessage();
        }
        
        return $result;
    }
}

2. 消息顺序问题

问题原因

  • 多个消费者并行处理
  • 消息重试导致顺序错乱

解决方案

php
<?php

class OrderedProducer
{
    private $producer;
    
    public function publishOrdered(string $exchange, string $baseRoutingKey, array $messages): bool
    {
        $shardCount = 10;
        
        foreach ($messages as $index => $message) {
            $shardId = $index % $shardCount;
            $routingKey = "{$baseRoutingKey}.shard.{$shardId}";
            
            $message['sequence'] = $index;
            $message['shard_id'] = $shardId;
            
            $this->producer->publish(
                $exchange,
                $routingKey,
                json_encode($message)
            );
        }
        
        return true;
    }
}

3. 消息丢失问题

问题原因

  • 未启用持久化
  • 未使用确认机制
  • Exchange 或 Queue 未声明

解决方案

php
<?php

class ReliableProducer
{
    private $channel;
    private $connection;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['user'],
            $config['password'],
            $config['vhost']
        );
        
        $this->channel = $this->connection->channel();
        $this->channel->confirm_select();
    }
    
    public function publishReliable(
        string $exchange,
        string $exchangeType,
        string $routingKey,
        string $body
    ): bool {
        $this->channel->exchange_declare(
            $exchange,
            $exchangeType,
            false,
            true,
            false
        );
        
        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'content_type' => 'application/json'
        ]);
        
        $this->channel->basic_publish($message, $exchange, $routingKey);
        
        $this->channel->wait_for_pending_acks_returns(5);
        
        return true;
    }
}

最佳实践建议

1. 连接池管理

php
<?php

class ProducerPool
{
    private static $instances = [];
    private $connections = [];
    private $config;
    
    public static function getInstance(string $name = 'default'): self
    {
        if (!isset(self::$instances[$name])) {
            self::$instances[$name] = new self();
        }
        return self::$instances[$name];
    }
    
    public function setConfig(array $config): void
    {
        $this->config = $config;
    }
    
    public function getProducer(): RobustProducer
    {
        $key = md5(serialize($this->config));
        
        if (!isset($this->connections[$key])) {
            $this->connections[$key] = new RobustProducer($this->config);
        }
        
        return $this->connections[$key];
    }
    
    public function closeAll(): void
    {
        foreach ($this->connections as $producer) {
            $producer->close();
        }
        $this->connections = [];
    }
}

2. 消息封装最佳实践

php
<?php

class MessageBuilder
{
    private $body;
    private $properties = [];
    private $headers = [];
    
    public static function create(): self
    {
        return new self();
    }
    
    public function withBody($data): self
    {
        $this->body = is_string($data) ? $data : json_encode($data);
        return $this;
    }
    
    public function withContentType(string $type): self
    {
        $this->properties['content_type'] = $type;
        return $this;
    }
    
    public function persistent(): self
    {
        $this->properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        return $this;
    }
    
    public function withPriority(int $priority): self
    {
        $this->properties['priority'] = min(9, max(0, $priority));
        return $this;
    }
    
    public function withTTL(int $milliseconds): self
    {
        $this->properties['expiration'] = (string)$milliseconds;
        return $this;
    }
    
    public function withMessageId(string $id): self
    {
        $this->properties['message_id'] = $id;
        return $this;
    }
    
    public function withHeader(string $key, $value): self
    {
        $this->headers[$key] = $value;
        return $this;
    }
    
    public function withTraceContext(string $traceId, string $spanId): self
    {
        $this->headers['trace_id'] = $traceId;
        $this->headers['span_id'] = $spanId;
        return $this;
    }
    
    public function build(): AMQPMessage
    {
        if (!empty($this->headers)) {
            $this->properties['headers'] = $this->headers;
        }
        
        return new AMQPMessage($this->body, $this->properties);
    }
}

$message = MessageBuilder::create()
    ->withBody(['order_id' => 1001])
    ->withContentType('application/json')
    ->persistent()
    ->withPriority(5)
    ->withTTL(60000)
    ->withMessageId(uniqid('msg_', true))
    ->withHeader('source', 'order_service')
    ->withTraceContext('trace123', 'span456')
    ->build();

3. 错误处理与日志

php
<?php

class LoggedProducer
{
    private $producer;
    private $logger;
    
    public function __construct(RobustProducer $producer, $logger)
    {
        $this->producer = $producer;
        $this->logger = $logger;
    }
    
    public function publish(
        string $exchange,
        string $routingKey,
        string $body,
        array $properties = []
    ): bool {
        $startTime = microtime(true);
        $messageId = $properties['message_id'] ?? uniqid();
        
        $this->logger->info('Publishing message', [
            'message_id' => $messageId,
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'body_size' => strlen($body)
        ]);
        
        try {
            $success = $this->producer->publish($exchange, $routingKey, $body, $properties);
            
            $duration = (microtime(true) - $startTime) * 1000;
            
            if ($success) {
                $this->logger->info('Message published successfully', [
                    'message_id' => $messageId,
                    'duration_ms' => round($duration, 2)
                ]);
            } else {
                $this->logger->error('Message publish failed', [
                    'message_id' => $messageId,
                    'duration_ms' => round($duration, 2)
                ]);
            }
            
            return $success;
        } catch (Exception $e) {
            $this->logger->error('Message publish exception', [
                'message_id' => $messageId,
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString()
            ]);
            
            throw $e;
        }
    }
}

相关链接