Appearance
RabbitMQ 生产者(Producer)
概述
生产者(Producer)是 RabbitMQ 消息模型中的消息创建者和发送者。它负责创建消息并将其发送到 Exchange,是整个消息流转过程的起点。生产者不需要知道消息最终会被哪个消费者处理,这种解耦设计是消息队列的核心优势之一。
生产者的核心职责
mermaid
graph LR
A[业务数据] --> B[创建消息]
B --> C[设置属性]
C --> D[选择 Exchange]
D --> E[设置 Routing Key]
E --> F[发送消息]
F --> G[处理确认]
生产者的主要职责包括:
- 消息创建:将业务数据封装成消息
- 属性设置:配置消息的元数据(持久化、优先级、TTL 等)
- 路由选择:选择目标 Exchange 和 Routing Key
- 消息发送:将消息发送到 RabbitMQ
- 确认处理:处理消息发送结果(成功/失败)
核心知识点
1. 消息结构
一条完整的 RabbitMQ 消息包含两部分:
mermaid
graph TB
subgraph 消息结构
A[Message] --> B[Body 消息体]
A --> C[Properties 消息属性]
C --> D[content_type]
C --> E[content_encoding]
C --> F[delivery_mode]
C --> G[priority]
C --> H[correlation_id]
C --> I[reply_to]
C --> J[expiration]
C --> K[message_id]
C --> L[timestamp]
C --> M[type]
C --> N[user_id]
C --> O[app_id]
C --> P[headers]
end
消息属性详解
| 属性 | 类型 | 说明 |
|---|---|---|
| content_type | string | 消息体的 MIME 类型,如 application/json |
| content_encoding | string | 消息体的编码方式,如 utf-8 |
| delivery_mode | int | 1=非持久化,2=持久化 |
| priority | int | 消息优先级(0-9) |
| correlation_id | string | 关联 ID,用于 RPC 场景 |
| reply_to | string | 回复队列名称 |
| expiration | string | 消息过期时间(毫秒) |
| message_id | string | 消息唯一标识 |
| timestamp | int | 消息创建时间戳 |
| type | string | 消息类型标识 |
| user_id | string | 用户 ID(需验证) |
| app_id | string | 应用标识 |
| headers | array | 自定义消息头(在 php-amqplib 中对应的属性名为 application_headers,值需为 AMQPTable 对象) |
2. 消息持久化
消息持久化确保消息在 RabbitMQ 重启后不会丢失,需要三个条件同时满足:
mermaid
graph TB
A[消息持久化] --> B[Exchange 持久化]
A --> C[Queue 持久化]
A --> D[Message 持久化]
B --> B1[durable = true]
C --> C1[durable = true]
D --> D1[delivery_mode = 2]
3. 消息确认机制(Publisher Confirms)
Publisher Confirms 是生产者确认消息已被 RabbitMQ 接收的机制:
mermaid
sequenceDiagram
participant P as Producer
participant B as RabbitMQ Broker
P->>B: 发送消息
Note over P: 开启 confirm 模式
B->>P: 返回 basic.ack
Note over P: 收到确认
确认模式有三种:
- 同步确认:发送后等待确认
- 批量确认:发送一批后等待确认
- 异步确认:注册回调处理确认
4. 事务机制
RabbitMQ 支持事务,但性能较差,不推荐在高吞吐场景使用:
mermaid
sequenceDiagram
participant P as Producer
participant B as RabbitMQ Broker
P->>B: tx.select
B->>P: tx.select-ok
P->>B: basic.publish
P->>B: basic.publish
P->>B: tx.commit
B->>P: tx.commit-ok
代码示例
基础生产者实现
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
/**
 * Minimal publisher: opens one connection and one channel in the constructor
 * and sends messages without waiting for any broker confirmation.
 */
class BasicProducer
{
    /** @var AMQPStreamConnection Connection opened in the constructor. */
    private $connection;

    /** @var mixed Channel obtained from the connection. */
    private $channel;

    /**
     * Connects to the broker and opens a channel.
     *
     * @param array $config Optional overrides for host, port, user, password and vhost.
     */
    public function __construct(array $config = [])
    {
        $defaults = [
            'host' => 'localhost',
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'vhost' => '/',
        ];

        // Caller-supplied keys win; anything missing falls back to the defaults.
        $settings = array_merge($defaults, $config);

        $this->connection = new AMQPStreamConnection(
            $settings['host'],
            $settings['port'],
            $settings['user'],
            $settings['password'],
            $settings['vhost']
        );
        $this->channel = $this->connection->channel();
    }

    /**
     * Wraps the body in an AMQPMessage and publishes it.
     *
     * @param string $exchange   Target exchange name.
     * @param string $routingKey Routing key passed to basic_publish.
     * @param string $body       Message payload.
     * @param array  $properties Message properties handed to AMQPMessage as-is.
     */
    public function publish(
        string $exchange,
        string $routingKey,
        string $body,
        array $properties = []
    ): void {
        $this->channel->basic_publish(
            new AMQPMessage($body, $properties),
            $exchange,
            $routingKey
        );
    }

    /**
     * Closes the channel first and then the connection, skipping whichever is unset.
     */
    public function close(): void
    {
        foreach ([$this->channel, $this->connection] as $resource) {
            if ($resource) {
                $resource->close();
            }
        }
    }
}
// Usage example: publish one JSON order event through BasicProducer.
$producer = new BasicProducer();
try {
    $producer->publish(
        'orders_exchange',
        'order.created',
        json_encode(['order_id' => 1001, 'amount' => 299.99]),
        ['content_type' => 'application/json']
    );
} finally {
    // Release the channel and connection even when publish() throws.
    $producer->close();
}
带确认机制的生产者
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Publisher that puts its channel into confirm mode and waits for the
 * broker's ack/nack after every message.
 */
class ConfirmProducer
{
    private $connection;
    private $channel;

    /** @var array<int, bool> Delivery tags published but not yet confirmed. */
    private $pendingConfirms = [];

    /** @var array<int, bool> Delivery tags the broker negatively acknowledged. */
    private $nackedConfirms = [];

    /**
     * Connects, enables publisher confirms and registers the confirm handlers.
     *
     * @param array $config Optional host, port, user, password and vhost.
     */
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
        $this->channel->confirm_select();

        // php-amqplib calls confirm handlers with the AMQPMessage that was
        // published (once per message, also for "multiple" confirms), not with
        // a ($deliveryTag, $multiple) pair, so the handlers take the message.
        $this->channel->set_ack_handler(function (AMQPMessage $message) {
            $deliveryTag = $message->getDeliveryTag();
            unset($this->pendingConfirms[$deliveryTag]);
            echo "Message acknowledged: delivery_tag = {$deliveryTag}\n";
        });
        $this->channel->set_nack_handler(function (AMQPMessage $message) {
            $deliveryTag = $message->getDeliveryTag();
            // Record the rejection so publishWithConfirm() can report it.
            unset($this->pendingConfirms[$deliveryTag]);
            $this->nackedConfirms[$deliveryTag] = true;
            echo "Message nacked: delivery_tag = {$deliveryTag}\n";
        });
    }

    /**
     * Publishes one message and waits for the broker's confirmation.
     *
     * @param string $exchange   Target exchange name.
     * @param string $routingKey Routing key.
     * @param string $body       Message payload.
     * @param array  $properties Overrides for the default JSON/persistent properties.
     * @param int    $timeout    Seconds to wait for the confirmation.
     *
     * @return bool True only when the broker acked the message; false when it
     *              was nacked or no confirmation arrived within $timeout.
     */
    public function publishWithConfirm(
        string $exchange,
        string $routingKey,
        string $body,
        array $properties = [],
        int $timeout = 5
    ): bool {
        $defaultProperties = [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ];
        $message = new AMQPMessage($body, array_merge($defaultProperties, $properties));

        $this->channel->basic_publish($message, $exchange, $routingKey);

        // In confirm mode the library stamps the message with its delivery tag
        // during basic_publish(); use it as the tracking key.
        $deliveryTag = $message->getDeliveryTag();
        $this->pendingConfirms[$deliveryTag] = true;

        try {
            $this->channel->wait_for_pending_acks_returns($timeout);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // No verdict in time: fall through and report "not confirmed".
        }

        $confirmed = !isset($this->pendingConfirms[$deliveryTag])
            && !isset($this->nackedConfirms[$deliveryTag]);

        // Drop the bookkeeping so neither map grows without bound.
        unset($this->pendingConfirms[$deliveryTag], $this->nackedConfirms[$deliveryTag]);

        return $confirmed;
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}
// Usage example: publish one order event and report whether it was confirmed.
$producer = null;
try {
    $producer = new ConfirmProducer([
        'host' => 'localhost',
        'user' => 'guest',
        'password' => 'guest'
    ]);
    $success = $producer->publishWithConfirm(
        'orders_exchange',
        'order.created',
        json_encode(['order_id' => 1001])
    );
    if ($success) {
        echo "Message published and confirmed successfully\n";
    } else {
        echo "Message was not confirmed\n";
    }
} catch (Exception $e) {
    echo "Error: " . $e->getMessage() . "\n";
} finally {
    // Close in finally so a failed publish does not leak the connection.
    if ($producer !== null) {
        try {
            $producer->close();
        } catch (Exception $e) {
            echo "Error: " . $e->getMessage() . "\n";
        }
    }
}
带重试机制的生产者
php
<?php
/**
 * Publisher with publisher confirms, automatic reconnect and bounded retries.
 *
 * NOTE: a retry after a timeout can deliver the same message twice, because
 * the first attempt may have reached the broker; consumers should be idempotent.
 */
class RobustProducer
{
    private $config;
    private $connection;
    private $channel;
    private $maxRetries = 3;

    /** @var int Base retry delay in milliseconds; multiplied by the attempt number. */
    private $retryDelay = 1000;

    /** @var bool Set by the nack handler when the broker rejects a message. */
    private $nackReceived = false;

    /**
     * @param array $config Connection settings; host, port, user, password and
     *                      vhost fall back to local defaults when omitted.
     */
    public function __construct(array $config)
    {
        // Without defaults a partial config such as ['host' => '...'] would
        // hit undefined indexes in connect().
        $this->config = array_merge([
            'host' => 'localhost',
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'vhost' => '/',
        ], $config);
        $this->connect();
    }

    /**
     * Opens the connection and a channel in confirm mode.
     */
    private function connect(): void
    {
        $this->connection = new AMQPStreamConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['password'],
            $this->config['vhost']
        );
        $this->channel = $this->connection->channel();
        $this->channel->confirm_select();
        // wait_for_pending_acks_returns() returns normally on a nack, so the
        // rejection has to be captured here. The handler ignores its argument.
        $this->channel->set_nack_handler(function () {
            $this->nackReceived = true;
        });
    }

    /**
     * Best-effort teardown: close errors are ignored because the link is
     * usually already broken when this runs.
     */
    private function disconnect(): void
    {
        try {
            if ($this->channel) {
                $this->channel->close();
            }
        } catch (Exception $e) {
            // Intentionally ignored; see method comment.
        }
        try {
            if ($this->connection) {
                $this->connection->close();
            }
        } catch (Exception $e) {
            // Intentionally ignored; see method comment.
        }
        $this->channel = null;
        $this->connection = null;
    }

    /**
     * Drops the current connection and opens a fresh one.
     */
    private function reconnect(): void
    {
        $this->disconnect();
        $this->connect();
    }

    /**
     * Publishes a persistent message, retrying with a growing delay.
     *
     * @param string $exchange   Target exchange name.
     * @param string $routingKey Routing key.
     * @param string $body       Message payload.
     * @param array  $properties Extra message properties; they override the defaults.
     *
     * @return bool True once the broker acked the message, false after all
     *              attempts failed (connection error, timeout or nack).
     */
    public function publish(
        string $exchange,
        string $routingKey,
        string $body,
        array $properties = []
    ): bool {
        $message = new AMQPMessage($body, array_merge([
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ], $properties));

        for ($attempt = 1; $attempt <= $this->maxRetries; $attempt++) {
            try {
                // Reconnect inside the try so a failed reconnect counts as a
                // failed attempt instead of escaping the retry loop.
                if ($this->channel === null) {
                    $this->reconnect();
                }
                $this->nackReceived = false;
                $this->channel->basic_publish($message, $exchange, $routingKey);
                $this->channel->wait_for_pending_acks_returns(5);
                if ($this->nackReceived) {
                    throw new RuntimeException('Broker negatively acknowledged the message');
                }
                return true;
            } catch (Exception $e) {
                echo "Publish attempt {$attempt} failed: " . $e->getMessage() . "\n";
                if ($attempt < $this->maxRetries) {
                    usleep($this->retryDelay * 1000 * $attempt);
                }
                // Drop the possibly broken link; the next attempt (or the next
                // publish() call) reconnects lazily.
                $this->disconnect();
            }
        }
        return false;
    }

    /**
     * Closes channel and connection, ignoring close errors.
     */
    public function close(): void
    {
        $this->disconnect();
    }
}
批量发送消息
php
<?php
/**
 * Publishes messages in confirm mode and waits for confirmations once per batch.
 */
class BatchProducer
{
    private $channel;
    private $connection;
    private $batchSize = 100;

    /** @var int Messages published since the last confirmation wait. */
    private $currentBatch = 0;

    /** @var int Nacks received since the last confirmation wait. */
    private $nackedInBatch = 0;

    /**
     * @param array $config    Optional host, port, user, password and vhost.
     * @param int   $batchSize Messages to publish before waiting for confirms (minimum 1).
     */
    public function __construct(array $config, int $batchSize = 100)
    {
        $this->batchSize = max(1, $batchSize);
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
        $this->channel->confirm_select();
        // The wait call returns normally on nacks, so count them here;
        // the handler runs once per rejected message.
        $this->channel->set_nack_handler(function () {
            $this->nackedInBatch++;
        });
    }

    /**
     * JSON-encodes and publishes every entry, confirming in batches.
     *
     * @param string $exchange   Target exchange name.
     * @param string $routingKey Routing key used for every message.
     * @param array  $messages   Payloads; each one is passed to json_encode().
     *
     * @return array Counters with keys 'success', 'failed' and 'total'.
     */
    public function publishBatch(
        string $exchange,
        string $routingKey,
        array $messages
    ): array {
        $results = [
            'success' => 0,
            'failed' => 0,
            'total' => count($messages)
        ];

        // Start clean: an earlier call must not leak its counters into this one.
        $this->currentBatch = 0;
        $this->nackedInBatch = 0;

        foreach ($messages as $data) {
            $body = json_encode($data);
            if ($body === false) {
                // Unencodable payload: count it as failed instead of publishing garbage.
                $results['failed']++;
                continue;
            }

            $message = new AMQPMessage(
                $body,
                [
                    'content_type' => 'application/json',
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                ]
            );
            $this->channel->basic_publish($message, $exchange, $routingKey);
            $this->currentBatch++;

            if ($this->currentBatch >= $this->batchSize) {
                [$confirmed, $rejected] = $this->settleBatch();
                $results['success'] += $confirmed;
                $results['failed'] += $rejected;
            }
        }

        if ($this->currentBatch > 0) {
            [$confirmed, $rejected] = $this->settleBatch();
            $results['success'] += $confirmed;
            $results['failed'] += $rejected;
        }

        return $results;
    }

    /**
     * Waits for the open batch and resets the per-batch counters.
     *
     * @return int[] Two-element list: [confirmed count, failed count].
     */
    private function settleBatch(): array
    {
        $published = $this->currentBatch;

        if ($this->waitForConfirms()) {
            $failed = min($published, $this->nackedInBatch);
        } else {
            // Outcome unknown (timeout or connection error): treat all as failed.
            $failed = $published;
        }

        $this->currentBatch = 0;
        $this->nackedInBatch = 0;

        return [$published - $failed, $failed];
    }

    /**
     * @param int $timeout Seconds to wait for outstanding confirmations.
     *
     * @return bool False when waiting raised an exception, true otherwise.
     */
    private function waitForConfirms(int $timeout = 30): bool
    {
        try {
            $this->channel->wait_for_pending_acks_returns($timeout);
            return true;
        } catch (Exception $e) {
            return false;
        }
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}
// Usage example: build 500 sample orders and publish them in batches.
$producer = new BatchProducer(['host' => 'localhost']);
try {
    $messages = [];
    for ($i = 1; $i <= 500; $i++) {
        $messages[] = [
            'order_id' => $i,
            'product' => "Product {$i}",
            'amount' => rand(100, 1000) / 10
        ];
    }
    $results = $producer->publishBatch('orders_exchange', 'order.created', $messages);
    echo "Published: {$results['success']} success, {$results['failed']} failed\n";
} finally {
    // Release the connection even when publishing throws.
    $producer->close();
}
设置消息属性示例
php
<?php
/**
 * Factory methods showing how to populate AMQPMessage properties.
 */
class MessagePropertiesExample
{
    /**
     * Builds a message that sets every standard property.
     *
     * @return AMQPMessage
     */
    public function createMessageWithAllProperties(): AMQPMessage
    {
        return new AMQPMessage(
            json_encode(['data' => 'example']),
            [
                'content_type' => 'application/json',
                'content_encoding' => 'utf-8',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'priority' => 5,
                'correlation_id' => uniqid('corr_', true),
                'reply_to' => 'reply_queue',
                'expiration' => '60000',
                'message_id' => uniqid('msg_', true),
                'timestamp' => time(),
                'type' => 'order_created',
                // NOTE(review): the broker validates user_id, so this presumably
                // must match the connection's user - confirm before reuse.
                'user_id' => 'app_user',
                'app_id' => 'order_service',
                // php-amqplib's property for custom headers is
                // "application_headers" and takes an AMQPTable; an unknown key
                // such as "headers" is dropped when the message is built.
                'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
                    'trace_id' => 'abc123',
                    'span_id' => 'def456',
                    'source' => 'web_frontend',
                    'version' => '1.0.0'
                ])
            ]
        );
    }

    /**
     * Builds a persistent message with its priority clamped to 0..9.
     *
     * @param int    $priority Requested priority.
     * @param string $body     Message payload.
     *
     * @return AMQPMessage
     */
    public function createPriorityMessage(int $priority, string $body): AMQPMessage
    {
        return new AMQPMessage($body, [
            'priority' => min(9, max(0, $priority)),
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]);
    }

    /**
     * Builds a message with a per-message TTL.
     *
     * @param string $body  Message payload.
     * @param int    $ttlMs Time to live in milliseconds; sent as a string.
     *
     * @return AMQPMessage
     */
    public function createTTLMessage(string $body, int $ttlMs): AMQPMessage
    {
        return new AMQPMessage($body, [
            'expiration' => (string)$ttlMs,
        ]);
    }

    /**
     * Builds an RPC request carrying the reply queue and correlation id.
     *
     * @param string $body          Request payload.
     * @param string $replyTo       Queue name for the reply.
     * @param string $correlationId Id used to match the reply to this request.
     *
     * @return AMQPMessage
     */
    public function createRPCRequest(string $body, string $replyTo, string $correlationId): AMQPMessage
    {
        return new AMQPMessage($body, [
            'reply_to' => $replyTo,
            'correlation_id' => $correlationId,
            'content_type' => 'application/json',
        ]);
    }
}
实际应用场景
1. 订单系统消息发送
php
<?php
/**
 * Publishes order lifecycle events to the "orders_exchange" exchange.
 */
class OrderMessageProducer
{
    private $producer;

    /**
     * @param RobustProducer $producer Producer used for the actual delivery.
     */
    public function __construct(RobustProducer $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Publishes an "order.created" event.
     *
     * @param array $order Order data embedded unchanged under the "data" key.
     *
     * @return bool Result of the underlying publish; false if encoding failed.
     */
    public function publishOrderCreated(array $order): bool
    {
        // One id for both the JSON envelope and the AMQP property; previously
        // the two carried different values (and the property a non-string).
        $messageId = uniqid('order_', true);

        return $this->publishEvent(
            'order.created',
            [
                'event' => 'order.created',
                'data' => $order,
                'timestamp' => time(),
                'message_id' => $messageId
            ],
            [
                'message_id' => $messageId,
                // php-amqplib only sends custom headers given as an AMQPTable
                // under "application_headers"; a "headers" key is dropped.
                'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
                    'version' => '1.0',
                    'source' => 'order_service'
                ])
            ]
        );
    }

    /**
     * Publishes an "order.paid" event.
     *
     * @param int   $orderId Order identifier.
     * @param float $amount  Paid amount.
     *
     * @return bool Result of the underlying publish; false if encoding failed.
     */
    public function publishOrderPaid(int $orderId, float $amount): bool
    {
        return $this->publishEvent('order.paid', [
            'event' => 'order.paid',
            'data' => [
                'order_id' => $orderId,
                'amount' => $amount,
                'paid_at' => date('Y-m-d H:i:s')
            ]
        ]);
    }

    /**
     * Publishes an "order.cancelled" event.
     *
     * @param int    $orderId Order identifier.
     * @param string $reason  Cancellation reason.
     *
     * @return bool Result of the underlying publish; false if encoding failed.
     */
    public function publishOrderCancelled(int $orderId, string $reason): bool
    {
        return $this->publishEvent('order.cancelled', [
            'event' => 'order.cancelled',
            'data' => [
                'order_id' => $orderId,
                'reason' => $reason,
                'cancelled_at' => date('Y-m-d H:i:s')
            ]
        ]);
    }

    /**
     * Encodes the envelope and publishes it with the event name as routing key.
     *
     * @param string $event      Event name; also used as the "type" property.
     * @param array  $envelope   Data to JSON-encode as the message body.
     * @param array  $properties Extra message properties; override the defaults.
     *
     * @return bool False when the envelope cannot be encoded or publishing failed.
     */
    private function publishEvent(string $event, array $envelope, array $properties = []): bool
    {
        $body = json_encode($envelope);
        if ($body === false) {
            return false;
        }

        return $this->producer->publish(
            'orders_exchange',
            $event,
            $body,
            array_merge([
                'content_type' => 'application/json',
                'type' => $event,
            ], $properties)
        );
    }
}
2. 日志收集系统
php
<?php
/**
 * Sends structured log records to "logs_exchange" with routing key "log.<level>".
 */
class LogProducer
{
    private $producer;
    private $serviceName;

    /**
     * @param RobustProducer $producer    Producer used for the actual delivery.
     * @param string         $serviceName Name recorded in every log entry.
     */
    public function __construct(RobustProducer $producer, string $serviceName)
    {
        $this->producer = $producer;
        $this->serviceName = $serviceName;
    }

    /**
     * Publishes one log record.
     *
     * @param string $level   Log level; becomes part of the routing key.
     * @param string $message Log message.
     * @param array  $context Extra data; "trace_id" is also copied to the top level.
     *
     * @return bool Result of the underlying publish; false if the record
     *              cannot be JSON-encoded.
     */
    public function log(string $level, string $message, array $context = []): bool
    {
        $body = json_encode([
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'service' => $this->serviceName,
            'hostname' => gethostname(),
            'timestamp' => date('Y-m-d H:i:s'),
            'trace_id' => $context['trace_id'] ?? null
        ]);
        if ($body === false) {
            // Unencodable context: report failure rather than send an empty body.
            return false;
        }

        return $this->producer->publish(
            'logs_exchange',
            "log.{$level}",
            $body,
            [
                'content_type' => 'application/json',
                // php-amqplib only sends custom headers given as an AMQPTable
                // under "application_headers"; a "headers" key is dropped.
                'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
                    'level' => $level,
                    'service' => $this->serviceName
                ])
            ]
        );
    }

    /** Logs at "emergency" level. */
    public function emergency(string $message, array $context = []): bool
    {
        return $this->log('emergency', $message, $context);
    }

    /** Logs at "error" level. */
    public function error(string $message, array $context = []): bool
    {
        return $this->log('error', $message, $context);
    }

    /** Logs at "warning" level. */
    public function warning(string $message, array $context = []): bool
    {
        return $this->log('warning', $message, $context);
    }

    /** Logs at "info" level. */
    public function info(string $message, array $context = []): bool
    {
        return $this->log('info', $message, $context);
    }

    /** Logs at "debug" level. */
    public function debug(string $message, array $context = []): bool
    {
        return $this->log('debug', $message, $context);
    }
}
3. 异步任务调度
php
<?php
/**
 * Dispatches background tasks to "tasks_exchange" with routing key "task.<name>".
 */
class TaskProducer
{
    private $producer;

    /**
     * @param RobustProducer $producer Producer used for the actual delivery.
     */
    public function __construct(RobustProducer $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Publishes a task message.
     *
     * NOTE(review): "delay" is implemented as a per-message expiration. That
     * only delays processing if the target queue has no consumers and
     * dead-letters expired messages to the real work queue - confirm the
     * queue topology before relying on it.
     *
     * @param string $task    Task name; used in the routing key and headers.
     * @param array  $payload Task arguments.
     * @param array  $options Overrides for delay (seconds), priority,
     *                        max_retries and timeout.
     *
     * @return bool Result of the underlying publish; false if the message
     *              cannot be JSON-encoded.
     */
    public function dispatch(string $task, array $payload, array $options = []): bool
    {
        $options = array_merge([
            'delay' => 0,
            'priority' => 5,
            'max_retries' => 3,
            'timeout' => 300
        ], $options);

        $body = json_encode([
            'task' => $task,
            'payload' => $payload,
            'options' => $options,
            'attempts' => 0,
            'created_at' => time()
        ]);
        if ($body === false) {
            return false;
        }

        $properties = [
            'content_type' => 'application/json',
            'priority' => $options['priority'],
            // php-amqplib only sends custom headers given as an AMQPTable
            // under "application_headers"; a "headers" key is dropped.
            'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
                'task_type' => $task,
                'max_retries' => $options['max_retries']
            ])
        ];

        if ($options['delay'] > 0) {
            // Expiration must be a whole number of milliseconds, so round
            // fractional delays instead of emitting e.g. "1.5".
            $properties['expiration'] = (string) (int) round($options['delay'] * 1000);
        }

        return $this->producer->publish(
            'tasks_exchange',
            "task.{$task}",
            $body,
            $properties
        );
    }

    /**
     * Queues a "send_email" task.
     *
     * @param string $to      Recipient address.
     * @param string $subject Mail subject.
     * @param string $body    Mail body.
     * @param int    $delay   Delay in seconds, passed through as the delay option.
     */
    public function sendEmail(string $to, string $subject, string $body, int $delay = 0): bool
    {
        return $this->dispatch('send_email', [
            'to' => $to,
            'subject' => $subject,
            'body' => $body
        ], ['delay' => $delay]);
    }

    /**
     * Queues a "generate_report" task.
     *
     * @param int    $reportId Report identifier.
     * @param string $format   Output format.
     * @param int    $priority Message priority.
     */
    public function generateReport(int $reportId, string $format, int $priority = 5): bool
    {
        return $this->dispatch('generate_report', [
            'report_id' => $reportId,
            'format' => $format
        ], ['priority' => $priority]);
    }

    /**
     * Queues a low-priority "cleanup_expired" task with a 3600-second delay option.
     */
    public function cleanupExpiredData(): bool
    {
        return $this->dispatch('cleanup_expired', [], [
            'priority' => 1,
            'delay' => 3600
        ]);
    }
}
常见问题与解决方案
1. 消息发送失败
问题原因:
- 连接断开
- Exchange 不存在
- 权限不足
- 消息过大
解决方案:
php
<?php
/**
 * Wraps a producer so that publishing never throws Exception and always
 * returns a structured result.
 */
class SafeProducer
{
    private $producer;

    /**
     * The wrapped producer has to be injected; without it the property stays
     * null and safePublish() would fail on a null method call.
     *
     * @param RobustProducer $producer Producer used for the actual delivery.
     */
    public function __construct(RobustProducer $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Publishes a message and reports the outcome instead of throwing.
     *
     * @param string $exchange   Target exchange name.
     * @param string $routingKey Routing key.
     * @param string $body       Message payload; rejected above 128 MiB.
     * @param array  $properties Message properties.
     *
     * @return array Keys: 'success' (bool), 'error' (string|null) and
     *               'message_id' (string|null). The id is the caller-supplied
     *               property or, failing that, the md5 of the body.
     */
    public function safePublish(
        string $exchange,
        string $routingKey,
        string $body,
        array $properties = []
    ): array {
        $result = [
            'success' => false,
            'error' => null,
            'message_id' => null
        ];

        $maxSize = 128 * 1024 * 1024;
        if (strlen($body) > $maxSize) {
            $result['error'] = 'Message too large';
            return $result;
        }

        try {
            $success = $this->producer->publish($exchange, $routingKey, $body, $properties);
            if ($success) {
                $result['success'] = true;
                $result['message_id'] = $properties['message_id'] ?? md5($body);
            } else {
                $result['error'] = 'Publish failed after retries';
            }
        } catch (Exception $e) {
            $result['error'] = $e->getMessage();
        }

        return $result;
    }
}
2. 消息顺序问题
问题原因:
- 多个消费者并行处理
- 消息重试导致顺序错乱
解决方案:
php
<?php
/**
 * Publishes a list of messages across routing-key shards, tagging each one
 * with its sequence number and shard id.
 */
class OrderedProducer
{
    private $producer;

    /**
     * The wrapped producer has to be injected; without it the property stays
     * null and publishOrdered() would fail on a null method call.
     *
     * @param RobustProducer $producer Producer used for the actual delivery.
     */
    public function __construct(RobustProducer $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Publishes messages to "<baseRoutingKey>.shard.<n>".
     *
     * Ordering can only hold within one shard, so related messages must land
     * on the same shard. Pass $shardKeyField to shard by a business key (for
     * example an order id); without it messages are spread round-robin by
     * position, as before.
     *
     * @param string      $exchange       Target exchange name.
     * @param string      $baseRoutingKey Routing key prefix.
     * @param array       $messages       List of array payloads.
     * @param string|null $shardKeyField  Payload field whose value selects the shard.
     * @param int         $shardCount     Number of shards (minimum 1).
     *
     * @return bool True when every message was published; false at the first
     *              failure, after which no further messages are sent.
     */
    public function publishOrdered(
        string $exchange,
        string $baseRoutingKey,
        array $messages,
        ?string $shardKeyField = null,
        int $shardCount = 10
    ): bool {
        $shardCount = max(1, $shardCount);
        $position = 0;

        foreach ($messages as $index => $message) {
            // Integer keys keep their original meaning as the sequence number;
            // string keys fall back to the running position.
            $sequence = is_int($index) ? $index : $position;
            $position++;

            if ($shardKeyField !== null && isset($message[$shardKeyField])) {
                $shardId = crc32((string) $message[$shardKeyField]) % $shardCount;
            } else {
                $shardId = abs($sequence) % $shardCount;
            }

            $message['sequence'] = $sequence;
            $message['shard_id'] = $shardId;

            $body = json_encode($message);
            if ($body === false) {
                return false;
            }

            $published = $this->producer->publish(
                $exchange,
                "{$baseRoutingKey}.shard.{$shardId}",
                $body
            );
            if (!$published) {
                // Stop here: sending later messages would leave a gap.
                return false;
            }
        }

        return true;
    }
}
3. 消息丢失问题
问题原因:
- 未启用持久化
- 未使用确认机制
- Exchange 或 Queue 未声明
解决方案:
php
<?php
/**
 * Publisher combining a durable exchange, persistent messages and publisher
 * confirms.
 */
class ReliableProducer
{
    private $channel;
    private $connection;

    /** @var bool Set by the nack handler when the broker rejects a message. */
    private $nackReceived = false;

    /**
     * @param array $config Connection settings; host, port, user, password and
     *                      vhost fall back to local defaults when omitted.
     */
    public function __construct(array $config)
    {
        $config = array_merge([
            'host' => 'localhost',
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'vhost' => '/',
        ], $config);

        $this->connection = new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['user'],
            $config['password'],
            $config['vhost']
        );
        $this->channel = $this->connection->channel();
        $this->channel->confirm_select();
        // The wait call returns normally on a nack, so capture it here.
        $this->channel->set_nack_handler(function () {
            $this->nackReceived = true;
        });
    }

    /**
     * Declares the exchange as durable, publishes a persistent message and
     * waits up to 5 seconds for the broker's confirmation.
     *
     * @param string $exchange     Exchange name.
     * @param string $exchangeType Exchange type passed to exchange_declare.
     * @param string $routingKey   Routing key.
     * @param string $body         Message payload.
     *
     * @return bool True when the broker acked the message; false on a nack or
     *              when no confirmation arrived in time.
     */
    public function publishReliable(
        string $exchange,
        string $exchangeType,
        string $routingKey,
        string $body
    ): bool {
        // Arguments: passive = false, durable = true, auto_delete = false.
        $this->channel->exchange_declare(
            $exchange,
            $exchangeType,
            false,
            true,
            false
        );

        $message = new AMQPMessage($body, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'content_type' => 'application/json'
        ]);

        $this->nackReceived = false;
        $this->channel->basic_publish($message, $exchange, $routingKey);

        try {
            $this->channel->wait_for_pending_acks_returns(5);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // Unconfirmed within the timeout: report failure to the caller.
            return false;
        }

        return !$this->nackReceived;
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}
最佳实践建议
1. 连接池管理
php
<?php
/**
 * Named registry of pools; each pool caches one RobustProducer per
 * distinct configuration.
 */
class ProducerPool
{
    /** @var self[] Pools keyed by name. */
    private static $instances = [];

    /** @var RobustProducer[] Producers keyed by a hash of their configuration. */
    private $connections = [];

    /** @var array|null Configuration used by getProducer(); null until setConfig(). */
    private $config;

    /**
     * Returns the pool registered under $name, creating it on first use.
     *
     * @param string $name Pool name.
     */
    public static function getInstance(string $name = 'default'): self
    {
        if (!isset(self::$instances[$name])) {
            self::$instances[$name] = new self();
        }
        return self::$instances[$name];
    }

    /**
     * Sets the connection configuration used for subsequent getProducer() calls.
     *
     * @param array $config Configuration passed to RobustProducer.
     */
    public function setConfig(array $config): void
    {
        $this->config = $config;
    }

    /**
     * Returns the cached producer for the current configuration, creating it
     * on first use.
     *
     * @throws LogicException When setConfig() has not been called yet.
     */
    public function getProducer(): RobustProducer
    {
        if (!is_array($this->config)) {
            // Fail with a clear message instead of a TypeError from the
            // RobustProducer constructor.
            throw new LogicException('ProducerPool::setConfig() must be called before getProducer()');
        }

        $key = md5(serialize($this->config));
        if (!isset($this->connections[$key])) {
            $this->connections[$key] = new RobustProducer($this->config);
        }
        return $this->connections[$key];
    }

    /**
     * Closes every cached producer and empties the cache.
     */
    public function closeAll(): void
    {
        foreach ($this->connections as $producer) {
            $producer->close();
        }
        $this->connections = [];
    }
}
2. 消息封装最佳实践
php
<?php
/**
 * Fluent builder for AMQPMessage instances.
 */
class MessageBuilder
{
    /** @var string|null Encoded message body. */
    private $body;

    /** @var array Message properties collected so far. */
    private $properties = [];

    /** @var array Custom headers collected so far. */
    private $headers = [];

    /**
     * Starts a new builder.
     */
    public static function create(): self
    {
        return new self();
    }

    /**
     * Sets the body; strings are used as-is, anything else is JSON-encoded.
     *
     * @param mixed $data Body data.
     *
     * @throws InvalidArgumentException When the data cannot be JSON-encoded.
     */
    public function withBody($data): self
    {
        if (is_string($data)) {
            $this->body = $data;
            return $this;
        }

        $encoded = json_encode($data);
        if ($encoded === false) {
            // Refuse to carry "false" forward as the message body.
            throw new InvalidArgumentException('Message body cannot be JSON-encoded: ' . json_last_error_msg());
        }
        $this->body = $encoded;
        return $this;
    }

    /** Sets the content_type property. */
    public function withContentType(string $type): self
    {
        $this->properties['content_type'] = $type;
        return $this;
    }

    /** Marks the message as persistent. */
    public function persistent(): self
    {
        $this->properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        return $this;
    }

    /** Sets the priority, clamped to 0..9. */
    public function withPriority(int $priority): self
    {
        $this->properties['priority'] = min(9, max(0, $priority));
        return $this;
    }

    /** Sets the per-message TTL in milliseconds (stored as a string). */
    public function withTTL(int $milliseconds): self
    {
        $this->properties['expiration'] = (string)$milliseconds;
        return $this;
    }

    /** Sets the message_id property. */
    public function withMessageId(string $id): self
    {
        $this->properties['message_id'] = $id;
        return $this;
    }

    /**
     * Adds one custom header.
     *
     * @param string $key   Header name.
     * @param mixed  $value Header value.
     */
    public function withHeader(string $key, $value): self
    {
        $this->headers[$key] = $value;
        return $this;
    }

    /** Adds trace_id and span_id headers. */
    public function withTraceContext(string $traceId, string $spanId): self
    {
        $this->headers['trace_id'] = $traceId;
        $this->headers['span_id'] = $spanId;
        return $this;
    }

    /**
     * Creates the message from the collected body, properties and headers.
     */
    public function build(): AMQPMessage
    {
        $properties = $this->properties;
        if (!empty($this->headers)) {
            // php-amqplib only sends custom headers given as an AMQPTable
            // under "application_headers"; a "headers" key is dropped.
            $properties['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable($this->headers);
        }
        // An unset body becomes the empty string rather than null.
        return new AMQPMessage((string) $this->body, $properties);
    }
}
// Usage example: configure the builder step by step, then build the message.
// Every with*() call returns the same builder, so the calls need not be chained.
$builder = MessageBuilder::create();
$builder->withBody(['order_id' => 1001]);
$builder->withContentType('application/json');
$builder->persistent();
$builder->withPriority(5);
$builder->withTTL(60000);
$builder->withMessageId(uniqid('msg_', true));
$builder->withHeader('source', 'order_service');
$builder->withTraceContext('trace123', 'span456');
$message = $builder->build();
3. 错误处理与日志
php
<?php
/**
 * Decorator that logs every publish attempt, its duration and its outcome.
 */
class LoggedProducer
{
    private $producer;
    private $logger;

    /**
     * @param RobustProducer $producer Producer used for the actual delivery.
     * @param mixed          $logger   Object exposing info() and error(),
     *                                 each taking a message and a context array.
     */
    public function __construct(RobustProducer $producer, $logger)
    {
        $this->producer = $producer;
        $this->logger = $logger;
    }

    /**
     * Publishes a message and logs start, success or failure.
     *
     * @param string $exchange   Target exchange name.
     * @param string $routingKey Routing key.
     * @param string $body       Message payload.
     * @param array  $properties Message properties; a message_id is generated
     *                           when none is supplied.
     *
     * @return bool Result of the wrapped producer.
     *
     * @throws Exception Rethrown after being logged.
     */
    public function publish(
        string $exchange,
        string $routingKey,
        string $body,
        array $properties = []
    ): bool {
        $startTime = microtime(true);
        $messageId = $properties['message_id'] ?? uniqid();

        // Attach the generated id to the message itself; otherwise the id in
        // the log lines could never be matched to the published message.
        if (!isset($properties['message_id'])) {
            $properties['message_id'] = $messageId;
        }

        $this->logger->info('Publishing message', [
            'message_id' => $messageId,
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'body_size' => strlen($body)
        ]);

        try {
            $success = $this->producer->publish($exchange, $routingKey, $body, $properties);
            $duration = (microtime(true) - $startTime) * 1000;

            $outcome = [
                'message_id' => $messageId,
                'duration_ms' => round($duration, 2)
            ];
            if ($success) {
                $this->logger->info('Message published successfully', $outcome);
            } else {
                $this->logger->error('Message publish failed', $outcome);
            }

            return $success;
        } catch (Exception $e) {
            $this->logger->error('Message publish exception', [
                'message_id' => $messageId,
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString()
            ]);
            throw $e;
        }
    }
}