Appearance
Java 生产者实现
一、概述
生产者(Producer)是 RabbitMQ 中发送消息的组件。本文档详细介绍如何使用 Java 客户端实现消息生产者,包括基础发布、消息属性、确认模式等内容。
1.1 生产者架构
┌─────────────────────────────────────────────────────────────────────┐
│ 生产者架构图 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ Application │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Channel │ ──► │ Exchange │ ──► │ Queue │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ 操作: │
│ ├── basicPublish: 发布消息 │
│ ├── exchangeDeclare: 声明交换器 │
│ ├── queueDeclare: 声明队列 │
│ └── queueBind: 绑定队列 │
│ │
└─────────────────────────────────────────────────────────────────────┘
1.2 发布模式
text
发布模式:
├── 同步发布:等待确认
├── 异步发布:使用回调
├── 批量发布:提高吞吐量
└── 事务发布:保证原子性
二、核心知识点
2.1 基础消息发布
2.1.1 最简单的发布
java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
 * Smallest possible producer: sends one text message without any
 * message properties.
 */
public class SimpleProducer {

    /**
     * Publishes {@code message}, encoded as UTF-8, to {@code exchange}
     * using {@code routingKey}, then prints a confirmation line.
     *
     * @param channel    open channel used for publishing
     * @param exchange   target exchange name
     * @param routingKey routing key for the message
     * @param message    text payload
     * @throws Exception if encoding or publishing fails
     */
    public void publishSimpleMessage(Channel channel, String exchange,
                                     String routingKey, String message) throws Exception {
        // No BasicProperties are supplied (null), so the message carries defaults only.
        channel.basicPublish(exchange, routingKey, null, message.getBytes("UTF-8"));
        System.out.println("消息已发送: " + message);
    }
}
2.1.2 完整发布流程
java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.AMQP;
/**
 * Full publishing flow: declare exchange/queue/binding, build message
 * properties, publish one message.
 */
public class CompleteProducer {

    private static final String EXCHANGE = "my.exchange";
    private static final String QUEUE = "my.queue";
    private static final String ROUTING_KEY = "my.routing.key";
    private static final String PAYLOAD = "Hello RabbitMQ!";

    /**
     * Declares the topology on {@code channel} and publishes a single
     * persistent plain-text message through it.
     *
     * @param channel open channel used for all operations
     * @throws Exception if any broker operation or the encoding fails
     */
    public void publishWithDeclaration(Channel channel) throws Exception {
        declareTopology(channel);

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.contentType("text/plain");
        builder.deliveryMode(2); // 2 = persistent delivery mode
        builder.priority(0);
        AMQP.BasicProperties properties = builder.build();

        byte[] body = PAYLOAD.getBytes("UTF-8");
        channel.basicPublish(EXCHANGE, ROUTING_KEY, properties, body);
        System.out.println("消息发布完成");
    }

    /** Declares a durable direct exchange, a durable queue, and binds them. */
    private static void declareTopology(Channel channel) throws Exception {
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        // durable=true, exclusive=false, autoDelete=false, no extra arguments
        channel.queueDeclare(QUEUE, true, false, false, null);
        channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
    }
}
2.2 消息属性
2.2.1 消息属性详解
java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.MessageProperties;
/**
 * Demonstrates every field of {@code AMQP.BasicProperties} on a single
 * published JSON message.
 */
public class MessagePropertiesDemo {

    /**
     * Builds a fully populated property set and publishes one JSON order
     * message with it.
     *
     * <p>{@code Channel} is written fully qualified because this snippet's
     * import list does not include it.
     *
     * @param channel    open channel used for publishing
     * @param exchange   target exchange name
     * @param routingKey routing key for the message
     * @throws Exception if encoding or publishing fails
     */
    public void demonstrateProperties(com.rabbitmq.client.Channel channel, String exchange,
                                      String routingKey) throws Exception {
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .contentType("application/json")
                .contentEncoding("UTF-8")
                .headers(createHeaders())
                .deliveryMode(2)                  // 2 = persistent delivery mode
                .priority(5)
                .correlationId("corr-123")
                .replyTo("reply-queue")
                .expiration("60000")              // per-message TTL, milliseconds as a string
                .messageId("msg-456")
                .timestamp(new java.util.Date())
                .type("order.created")
                // NOTE(review): RabbitMQ validates user-id against the connection's
                // username and refuses the publish on mismatch — confirm this value
                // matches the account the connection was opened with.
                .userId("user-001")
                .appId("app-order-service")
                .clusterId("cluster-001")
                .build();

        String message = "{\"orderId\": \"ORD-001\", \"amount\": 99.99}";
        channel.basicPublish(exchange, routingKey, props, message.getBytes("UTF-8"));
    }

    /** Builds the custom application headers attached to the message. */
    private java.util.Map<String, Object> createHeaders() {
        java.util.Map<String, Object> headers = new java.util.HashMap<>();
        headers.put("x-correlation-id", "corr-123");
        headers.put("x-trace-id", "trace-456");
        headers.put("x-source-system", "order-service");
        headers.put("x-timestamp", System.currentTimeMillis());
        return headers;
    }
}
2.2.2 预定义属性常量
java
import com.rabbitmq.client.MessageProperties;
/**
 * Shows the ready-made property sets exposed by {@code MessageProperties}.
 */
public class PredefinedProperties {

    /**
     * Publishes one message per predefined property constant, in the order
     * persistent-text, persistent-basic, minimal-persistent, minimal, basic.
     *
     * <p>{@code Channel} is written fully qualified because this snippet
     * imports only {@code MessageProperties}.
     *
     * @param channel    open channel used for publishing
     * @param exchange   target exchange name
     * @param routingKey routing key for every message
     * @throws Exception if encoding or publishing fails
     */
    public void usePredefinedProperties(com.rabbitmq.client.Channel channel, String exchange,
                                        String routingKey) throws Exception {
        send(channel, exchange, routingKey,
                MessageProperties.PERSISTENT_TEXT_PLAIN, "持久化纯文本消息");
        send(channel, exchange, routingKey,
                MessageProperties.PERSISTENT_BASIC, "持久化基础消息");
        send(channel, exchange, routingKey,
                MessageProperties.MINIMAL_PERSISTENT_BASIC, "最小持久化消息");
        send(channel, exchange, routingKey,
                MessageProperties.MINIMAL_BASIC, "最小非持久化消息");
        send(channel, exchange, routingKey,
                MessageProperties.BASIC, "非持久化基础消息");
    }

    /** Publishes {@code text} as UTF-8 with the given predefined properties. */
    private static void send(com.rabbitmq.client.Channel channel, String exchange,
                             String routingKey,
                             com.rabbitmq.client.AMQP.BasicProperties props,
                             String text) throws Exception {
        channel.basicPublish(exchange, routingKey, props, text.getBytes("UTF-8"));
    }
}
2.3 发布者确认
2.3.1 同步确认模式
java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Producer that blocks after publishing until the broker has confirmed
 * the outstanding messages.
 */
public class SyncConfirmProducer {

    private final Channel channel;
    /** Sequence numbers published but not yet recorded as confirmed. */
    private final SortedSet<Long> pendingConfirms = new TreeSet<>();

    /**
     * Stores the channel and switches it into publisher-confirm mode.
     *
     * @param channel open channel used for publishing
     * @throws Exception if confirm mode cannot be enabled
     */
    public SyncConfirmProducer(Channel channel) throws Exception {
        this.channel = channel;
        this.channel.confirmSelect();
    }

    /**
     * Publishes one message and waits up to 5 seconds for its confirmation.
     *
     * @throws Exception if publishing or waiting fails
     */
    public void publishWithSyncConfirm(String exchange, String routingKey,
                                       String message) throws Exception {
        final long seqNo = channel.getNextPublishSeqNo();
        pendingConfirms.add(seqNo);

        byte[] payload = message.getBytes("UTF-8");
        channel.basicPublish(exchange, routingKey, null, payload);

        if (!channel.waitForConfirms(5000)) {
            System.err.println("消息未确认: " + seqNo);
            return;
        }
        pendingConfirms.remove(seqNo);
        System.out.println("消息确认: " + seqNo);
    }

    /**
     * Publishes every message first, then waits once for the whole batch.
     *
     * @throws Exception if publishing or waiting fails
     */
    public void publishBatch(String exchange, String routingKey,
                             java.util.List<String> messages) throws Exception {
        java.util.Iterator<String> it = messages.iterator();
        while (it.hasNext()) {
            String text = it.next();
            pendingConfirms.add(channel.getNextPublishSeqNo());
            channel.basicPublish(exchange, routingKey, null, text.getBytes("UTF-8"));
        }

        // A single wait covers everything published since the previous wait.
        if (!channel.waitForConfirms()) {
            System.err.println("部分消息未确认: " + pendingConfirms);
            return;
        }
        pendingConfirms.clear();
        System.out.println("批量消息全部确认");
    }
}
2.3.2 异步确认模式
java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
/**
 * Producer that publishes without blocking and processes broker
 * confirmations in a {@code ConfirmListener}.
 *
 * <p>Outstanding messages live in one concurrent sorted map keyed by publish
 * sequence number. The listener runs on a thread owned by the client
 * library while {@link #publishAsync} runs on the caller's thread, so the
 * structure must be safe for concurrent access, and every ack/nack must
 * remove its entries so the map does not grow without bound.
 */
public class AsyncConfirmProducer {

    private final Channel channel;

    /** Sequence number -> message body for everything not yet ack'd/nack'd. */
    private final java.util.concurrent.ConcurrentNavigableMap<Long, String> outstanding =
            new java.util.concurrent.ConcurrentSkipListMap<>();

    /**
     * Enables publisher confirms on {@code channel} and registers the
     * confirm listener.
     *
     * @param channel open channel used for publishing
     * @throws Exception if confirm mode cannot be enabled
     */
    public AsyncConfirmProducer(Channel channel) throws Exception {
        this.channel = channel;
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) {
                if (multiple) {
                    // Everything up to and including deliveryTag is confirmed.
                    java.util.concurrent.ConcurrentNavigableMap<Long, String> confirmed =
                            outstanding.headMap(deliveryTag, true);
                    // firstEntry() returns null instead of throwing when nothing is pending.
                    Map.Entry<Long, String> oldest = confirmed.firstEntry();
                    long first = (oldest != null) ? oldest.getKey() : deliveryTag;
                    System.out.println("批量确认: " + first + " 到 " + deliveryTag);
                    confirmed.clear();
                } else {
                    String msg = outstanding.remove(deliveryTag);
                    System.out.println("确认消息: " + msg);
                }
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) {
                if (multiple) {
                    System.err.println("批量未确认: 到 " + deliveryTag);
                    outstanding.headMap(deliveryTag, true).clear();
                } else {
                    // Remove while reading so nack'd bodies are not retained forever.
                    System.err.println("未确认消息: " + outstanding.remove(deliveryTag));
                }
            }
        });
    }

    /**
     * Publishes {@code message} and returns immediately; the outcome is
     * reported later through the confirm listener.
     *
     * @param exchange   target exchange name
     * @param routingKey routing key for the message
     * @param message    text payload, sent as UTF-8
     * @throws Exception if encoding or publishing fails
     */
    public void publishAsync(String exchange, String routingKey, String message)
            throws Exception {
        long seq = channel.getNextPublishSeqNo();
        // Register before publishing so a fast confirm always finds the entry.
        outstanding.put(seq, message);
        try {
            channel.basicPublish(exchange, routingKey, null, message.getBytes("UTF-8"));
        } catch (Exception e) {
            // The message never left; do not wait for a confirm that cannot arrive.
            outstanding.remove(seq);
            throw e;
        }
        System.out.println("消息发布: " + message);
    }
}
2.4 Mandatory 特性
java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.AMQP;
/**
 * Producer demonstrating the {@code mandatory} publish flag together with
 * a {@code ReturnListener} that reports returned messages.
 */
public class MandatoryProducer {

    private final Channel channel;

    /**
     * Stores the channel and registers a listener that prints the details
     * of every message handed back to {@code handleReturn}.
     *
     * @param channel open channel used for publishing
     * @throws Exception declared for compatibility with existing callers
     */
    public MandatoryProducer(Channel channel) throws Exception {
        this.channel = channel;
        channel.addReturnListener(new ReturnListener() {
            // No throws clause: decoding with a Charset constant cannot raise a
            // checked exception, and this snippet does not import IOException.
            @Override
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties,
                                     byte[] body) {
                System.err.println("消息被退回:");
                System.err.println(" 原因: " + replyText);
                System.err.println(" 交换器: " + exchange);
                System.err.println(" 路由键: " + routingKey);
                System.err.println(" 消息内容: "
                        + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        });
    }

    /**
     * Publishes with {@code mandatory=true}.
     *
     * @throws Exception if encoding or publishing fails
     */
    public void publishMandatory(String exchange, String routingKey, String message)
            throws Exception {
        channel.basicPublish(exchange, routingKey, true, null,
                message.getBytes("UTF-8"));
        System.out.println("消息已发布 (mandatory=true)");
    }

    /**
     * Publishes with {@code mandatory=false}.
     *
     * @throws Exception if encoding or publishing fails
     */
    public void publishNonMandatory(String exchange, String routingKey,
                                    String message) throws Exception {
        channel.basicPublish(exchange, routingKey, false, null,
                message.getBytes("UTF-8"));
        System.out.println("消息已发布 (mandatory=false)");
    }
}
2.5 批量发布
java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Buffers messages in memory and publishes them in batches, waiting for
 * broker confirmation once per batch.
 */
public class BatchProducer {

    private final Channel channel;
    private final int batchSize;
    private final List<String> messageBuffer = new ArrayList<>();

    /**
     * @param channel   open channel used for publishing
     * @param batchSize number of buffered messages that triggers a flush
     */
    public BatchProducer(Channel channel, int batchSize) {
        this.channel = channel;
        this.batchSize = batchSize;
    }

    /**
     * Buffers {@code message}; flushes once the buffer reaches the batch size.
     *
     * @throws Exception if the triggered flush fails
     */
    public void addMessage(String message) throws Exception {
        messageBuffer.add(message);
        if (messageBuffer.size() >= batchSize) {
            flush();
        }
    }

    /**
     * Publishes every buffered message to {@code batch.exchange} with routing
     * key {@code batch.key} and waits for the broker to confirm the batch.
     *
     * <p>The buffer is cleared only when the whole batch was ack'd. If the
     * broker nack'd any message the buffer is kept and an exception is thrown,
     * so the caller can retry; a retry re-sends the entire batch, so consumers
     * may see duplicates.
     *
     * @throws java.io.IOException if the broker nack'd at least one message
     * @throws Exception           if publishing or waiting fails
     */
    public void flush() throws Exception {
        if (messageBuffer.isEmpty()) {
            return;
        }
        // Called on every flush in the original; kept here so the constructor
        // signature (no checked exceptions) stays unchanged.
        channel.confirmSelect();
        for (String msg : messageBuffer) {
            channel.basicPublish("batch.exchange", "batch.key", null,
                    msg.getBytes("UTF-8"));
        }
        // false means at least one message in the batch was nack'd.
        if (!channel.waitForConfirms()) {
            throw new java.io.IOException(
                    "批量发送失败: " + messageBuffer.size() + " 条消息中存在未被确认的消息");
        }
        System.out.println("批量发送完成: " + messageBuffer.size() + " 条消息");
        messageBuffer.clear();
    }

    /**
     * Flushes whatever is still buffered.
     *
     * @throws Exception if the final flush fails
     */
    public void close() throws Exception {
        flush();
    }
}
/**
 * Runnable demo: declares the batch topology on a local broker and pushes
 * 500 messages through a {@code BatchProducer} with batch size 100.
 */
class BatchProducerDemo {

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        // Nested try-with-resources: the channel closes before the connection.
        try (Connection conn = connectionFactory.newConnection()) {
            try (Channel ch = conn.createChannel()) {
                ch.exchangeDeclare("batch.exchange",
                        com.rabbitmq.client.BuiltinExchangeType.DIRECT, true);
                ch.queueDeclare("batch.queue", true, false, false, null);
                ch.queueBind("batch.queue", "batch.exchange", "batch.key");

                BatchProducer batchProducer = new BatchProducer(ch, 100);
                int index = 0;
                while (index < 500) {
                    batchProducer.addMessage("消息 #" + index);
                    index++;
                }
                // Sends any remainder that did not fill a complete batch.
                batchProducer.close();
                System.out.println("批量发布完成");
            }
        }
    }
}
三、代码示例
3.1 完整生产者示例
java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.*;
import java.util.*;
/**
 * Self-contained order producer: opens its own connection and channel,
 * declares the topology, registers confirm/return listeners and publishes
 * order-created events as JSON.
 *
 * <p>Implements {@link AutoCloseable} so it can be used in the
 * try-with-resources statement in {@link #main}.
 */
public class CompleteProducer implements AutoCloseable {

    /** Shared serializer; creating one per message is needlessly expensive. */
    private static final com.fasterxml.jackson.databind.ObjectMapper JSON =
            new com.fasterxml.jackson.databind.ObjectMapper();

    private final Connection connection;
    private final Channel channel;
    private final String exchangeName;
    private final String queueName;
    private final String routingKey;

    /**
     * Connects to the broker, enables publisher confirms and prepares the
     * exchange, queue, binding and listeners.
     *
     * <p>If anything fails after the connection has been opened, the
     * connection is closed before the exception is rethrown.
     *
     * @param host     broker host
     * @param port     broker port
     * @param username login user
     * @param password login password
     * @param vhost    virtual host
     * @throws Exception if connecting or any setup step fails
     */
    public CompleteProducer(String host, int port, String username,
                            String password, String vhost) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(vhost);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);

        exchangeName = "orders.exchange";
        queueName = "orders.queue";
        routingKey = "order.created";

        connection = factory.newConnection("producer-app");
        try {
            channel = connection.createChannel();
            channel.confirmSelect();
            setupInfrastructure();
            setupConfirmListener();
        } catch (Exception e) {
            // Do not leak the connection when setup fails half-way.
            try {
                connection.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /** Declares the durable topic exchange, the queue with its arguments, and the binding. */
    private void setupInfrastructure() throws Exception {
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true);
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 86400000); // 24 hours in milliseconds
        args.put("x-dead-letter-exchange", "dlx.exchange");
        channel.queueDeclare(queueName, true, false, false, args);
        channel.queueBind(queueName, exchangeName, routingKey);
        System.out.println("基础设施创建完成");
    }

    /** Registers listeners that print acks, nacks and returned messages. */
    private void setupConfirmListener() {
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) {
                System.out.println("消息确认: " + deliveryTag +
                        (multiple ? " (批量)" : ""));
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) {
                System.err.println("消息未确认: " + deliveryTag);
            }
        });
        channel.addReturnListener(returnInfo -> {
            System.err.println("消息被退回: " + returnInfo.getReplyText());
        });
    }

    /**
     * Serializes an order-created event to JSON and publishes it with
     * {@code mandatory=true}.
     *
     * @param orderId order identifier, also used as the message id
     * @param userId  identifier of the ordering user
     * @param amount  order amount
     * @throws Exception if serialization or publishing fails
     */
    public void publishOrderCreated(String orderId, String userId,
                                    double amount) throws Exception {
        Map<String, Object> orderData = new HashMap<>();
        orderData.put("orderId", orderId);
        orderData.put("userId", userId);
        orderData.put("amount", amount);
        orderData.put("status", "CREATED");
        orderData.put("timestamp", System.currentTimeMillis());
        String message = JSON.writeValueAsString(orderData);

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentType("application/json")
                .deliveryMode(2) // 2 = persistent delivery mode
                .priority(5)
                .messageId(orderId)
                .timestamp(new Date())
                .type("order.created")
                .headers(createHeaders())
                .build();
        channel.basicPublish(exchangeName, routingKey, true, properties,
                message.getBytes("UTF-8"));
        System.out.println("订单消息已发布: " + orderId);
    }

    /** Builds the custom headers, including a fresh random trace id. */
    private Map<String, Object> createHeaders() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-event-type", "ORDER_CREATED");
        headers.put("x-service-name", "order-service");
        headers.put("x-trace-id", UUID.randomUUID().toString());
        return headers;
    }

    /**
     * Closes the channel and then the connection. The connection is closed
     * even when closing the channel throws.
     *
     * @throws Exception if closing either resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } finally {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) {
        try (CompleteProducer producer = new CompleteProducer(
                "localhost", 5672, "guest", "guest", "/")) {
            for (int i = 1; i <= 10; i++) {
                producer.publishOrderCreated(
                        "ORD-" + String.format("%05d", i),
                        "USER-" + (i % 10 + 1),
                        99.99 * i
                );
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3.2 带重试的生产者
java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.*;
/**
 * Producer that retries a failed publish, with either linearly or
 * exponentially growing delays between attempts.
 */
public class RetryProducer {

    private final Channel channel;
    private final String exchange;
    private final int maxRetries;
    private final long retryDelay;

    /**
     * @param channel    open channel used for publishing
     * @param exchange   target exchange name
     * @param maxRetries maximum number of publish attempts
     * @param retryDelay base delay between attempts, in milliseconds
     */
    public RetryProducer(Channel channel, String exchange,
                         int maxRetries, long retryDelay) {
        this.channel = channel;
        this.exchange = exchange;
        this.maxRetries = maxRetries;
        this.retryDelay = retryDelay;
    }

    /**
     * Publishes {@code message}, sleeping {@code retryDelay * attempt}
     * milliseconds between failed attempts.
     *
     * @throws RuntimeException if no attempt succeeded or the wait was
     *                          interrupted; the cause is the last
     *                          {@code IOException}, if any
     */
    public void publishWithRetry(String routingKey, String message) {
        int attempt = 0;
        Exception lastException = null;
        while (attempt < maxRetries) {
            try {
                channel.basicPublish(exchange, routingKey,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                System.out.println("消息发布成功: " + message);
                return;
            } catch (IOException e) {
                lastException = e;
                attempt++;
                System.err.println("发布失败 (尝试 " + attempt + "/" +
                        maxRetries + "): " + e.getMessage());
                if (attempt < maxRetries) {
                    try {
                        Thread.sleep(retryDelay * attempt);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        // Report the attempts actually made: after an interrupt this is
        // fewer than maxRetries.
        throw new RuntimeException("消息发布失败,已重试 " + attempt + " 次",
                lastException);
    }

    /**
     * Publishes {@code message}, doubling the delay after every failed
     * attempt, starting from {@code retryDelay}.
     *
     * @throws RuntimeException if attempts are exhausted, the wait was
     *                          interrupted, or {@code maxRetries} allows no
     *                          attempt at all
     */
    public void publishWithRetryExponential(String routingKey, String message) {
        int attempt = 0;
        long delay = retryDelay;
        while (attempt < maxRetries) {
            try {
                channel.basicPublish(exchange, routingKey,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                return;
            } catch (IOException e) {
                attempt++;
                if (attempt >= maxRetries) {
                    throw new RuntimeException("重试次数耗尽", e);
                }
                try {
                    Thread.sleep(delay);
                    // Saturate instead of overflowing into a negative sleep time.
                    delay = (delay > Long.MAX_VALUE / 2) ? Long.MAX_VALUE : delay * 2;
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("重试被中断", ie);
                }
            }
        }
        // Reached only when maxRetries <= 0: nothing was published, so fail
        // loudly (as publishWithRetry does) instead of returning as if it had.
        throw new RuntimeException("重试次数耗尽");
    }
}
四、实际应用场景
4.1 订单消息发布
java
import com.rabbitmq.client.*;
import java.util.*;
/**
 * Publishes order lifecycle events to the {@code order.events} topic
 * exchange, using the event type as the routing key.
 */
public class OrderEventProducer {

    /** Shared serializer; creating one per event is needlessly expensive. */
    private static final com.fasterxml.jackson.databind.ObjectMapper JSON =
            new com.fasterxml.jackson.databind.ObjectMapper();

    private final Channel channel;
    private final String exchange;

    /**
     * Stores the channel and declares the durable topic exchange.
     *
     * @param channel open channel used for publishing
     * @throws Exception if the exchange cannot be declared
     */
    public OrderEventProducer(Channel channel) throws Exception {
        this.channel = channel;
        this.exchange = "order.events";
        channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
    }

    /** Publishes an {@code order.created} event carrying {@code order}. */
    public void publishOrderCreated(Map<String, Object> order) throws Exception {
        publish("order.created", order);
    }

    /** Publishes an {@code order.paid} event carrying {@code order}. */
    public void publishOrderPaid(Map<String, Object> order) throws Exception {
        publish("order.paid", order);
    }

    /** Publishes an {@code order.shipped} event carrying {@code order}. */
    public void publishOrderShipped(Map<String, Object> order) throws Exception {
        publish("order.shipped", order);
    }

    /** Publishes an {@code order.cancelled} event carrying {@code order}. */
    public void publishOrderCancelled(Map<String, Object> order) throws Exception {
        publish("order.cancelled", order);
    }

    /**
     * Wraps {@code data} in an event envelope (id, type, data, timestamp),
     * serializes it to JSON and publishes it with {@code eventType} as the
     * routing key.
     */
    private void publish(String eventType, Map<String, Object> data) throws Exception {
        String messageId = UUID.randomUUID().toString();
        Map<String, Object> event = new HashMap<>();
        event.put("eventId", messageId);
        event.put("eventType", eventType);
        event.put("data", data);
        event.put("timestamp", System.currentTimeMillis());
        String json = JSON.writeValueAsString(event);

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .contentType("application/json")
                .messageId(messageId)
                .type(eventType)
                .timestamp(new Date())
                .headers(createEventHeaders(eventType))
                .deliveryMode(2) // 2 = persistent delivery mode
                .priority(5)
                .build();
        channel.basicPublish(exchange, eventType, props, json.getBytes("UTF-8"));
        System.out.println("订单事件已发布: " + eventType);
    }

    /** Builds the custom headers, including a fresh random trace id. */
    private Map<String, Object> createEventHeaders(String eventType) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-event-type", eventType);
        headers.put("x-service", "order-service");
        headers.put("x-version", "1.0");
        headers.put("x-trace-id", UUID.randomUUID().toString());
        return headers;
    }
}
4.2 日志消息发布
java
import com.rabbitmq.client.*;
import java.util.*;
/**
 * Publishes application log records as JSON to the
 * {@code application.logs} topic exchange with routing key
 * {@code <service>.<level>}.
 */
public class LogProducer {

    /** Shared serializer; creating one per log line is needlessly expensive. */
    private static final com.fasterxml.jackson.databind.ObjectMapper JSON =
            new com.fasterxml.jackson.databind.ObjectMapper();

    private final Channel channel;
    private final String exchange;

    /**
     * Stores the channel and declares the durable topic exchange.
     *
     * @param channel open channel used for publishing
     * @throws Exception if the exchange cannot be declared
     */
    public LogProducer(Channel channel) throws Exception {
        this.channel = channel;
        this.exchange = "application.logs";
        channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
    }

    /** Publishes {@code message} at DEBUG level. */
    public void logDebug(String service, String message) {
        publishLog("DEBUG", service, message);
    }

    /** Publishes {@code message} at INFO level. */
    public void logInfo(String service, String message) {
        publishLog("INFO", service, message);
    }

    /** Publishes {@code message} at WARN level. */
    public void logWarn(String service, String message) {
        publishLog("WARN", service, message);
    }

    /**
     * Publishes {@code message} at ERROR level followed by a description of
     * {@code throwable} and its stack frames.
     *
     * @param throwable the error to describe; may be {@code null}, in which
     *                  case only the message is sent
     */
    public void logError(String service, String message, Throwable throwable) {
        String fullMessage = message + "\n" + getStackTrace(throwable);
        publishLog("ERROR", service, fullMessage);
    }

    /**
     * Serializes one log record and publishes it. Failures are reported on
     * stderr and never propagated, so logging cannot break the caller.
     */
    private void publishLog(String level, String service, String message) {
        try {
            Map<String, Object> logData = new HashMap<>();
            logData.put("level", level);
            logData.put("service", service);
            logData.put("message", message);
            logData.put("timestamp", System.currentTimeMillis());
            logData.put("host", java.net.InetAddress.getLocalHost().getHostName());
            String json = JSON.writeValueAsString(logData);

            // Locale.ROOT keeps the routing key stable; a default-locale
            // toLowerCase() turns "INFO" into "ınfo" under a Turkish locale.
            String routingKey = service + "." + level.toLowerCase(Locale.ROOT);
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .contentType("application/json")
                    .priority(getPriority(level))
                    .build();
            channel.basicPublish(exchange, routingKey, props, json.getBytes("UTF-8"));
        } catch (Exception e) {
            System.err.println("日志发布失败: " + e.getMessage());
        }
    }

    /** Maps a level name to a message priority; unknown levels map to 0. */
    private int getPriority(String level) {
        return switch (level) {
            case "ERROR" -> 9;
            case "WARN" -> 5;
            case "INFO" -> 1;
            default -> 0;
        };
    }

    /**
     * Renders {@code throwable} as text: its {@code toString()} on the first
     * line (type and message), then one line per stack frame. Returns an
     * empty string for {@code null}.
     */
    private String getStackTrace(Throwable throwable) {
        if (throwable == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        // Without this line the record would not say which exception occurred.
        sb.append(throwable).append("\n");
        for (StackTraceElement element : throwable.getStackTrace()) {
            sb.append(element).append("\n");
        }
        return sb.toString();
    }
}
五、常见问题与解决方案
5.1 消息路由失败
问题描述: 消息发布后无法路由到任何队列。
解决方案:
java
channel.addReturnListener(returnInfo -> {
System.err.println("消息被退回: " + returnInfo.getReplyText());
if (returnInfo.getReplyCode() == 312) {
System.err.println("检查绑定关系和路由键");
}
});5.2 发布确认超时
问题描述: 等待确认超时。注意 waitForConfirms(timeout) 在超时时抛出 TimeoutException;返回 false 表示有消息被 Broker 否定确认 (nack),两种情况需要分别处理。
解决方案:
java
// waitForConfirms(timeout) signals the two failure modes differently:
//   - returns false            -> at least one message was nack'd by the broker
//   - throws TimeoutException  -> no answer arrived within the timeout
boolean confirmed = false;
try {
    confirmed = channel.waitForConfirms(10000);
    if (!confirmed) {
        System.err.println("消息被 Broker 否定确认 (nack),处理未确认消息");
    }
} catch (java.util.concurrent.TimeoutException e) {
    System.err.println("确认超时,处理未确认消息");
}
5.3 队列满
问题描述: 队列达到最大长度,新消息被拒绝。
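解决方案: 声明队列时用 x-max-length 限制队列长度,并将 x-overflow 设为 reject-publish;队列满时 Broker 会对新发布的消息返回 nack,因此生产者需要开启发布者确认,并对被 nack 的消息进行重试或降级处理。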
java
// Queue arguments: cap the queue length and reject publishes once it is full.
Map<String, Object> queueArgs = new HashMap<>();
queueArgs.put("x-max-length", 10000);
queueArgs.put("x-overflow", "reject-publish");
// durable=true, exclusive=false, autoDelete=false
channel.queueDeclare("my.queue", true, false, false, queueArgs);
六、最佳实践建议
6.1 发布最佳实践
text
最佳实践:
├── 使用发布者确认确保消息可靠
├── 设置 mandatory=true 处理路由失败
├── 合理使用消息属性
├── 使用批量发布提高吞吐量
├── 实现重试机制处理临时失败
└── 监控发布失败率
6.2 性能优化
text
性能优化:
├── 批量发布消息
├── 复用连接和通道
├── 合理设置预取数量(消费者端 basicQos,影响端到端吞吐)
├── 使用异步确认
└── 控制消息大小
6.3 可靠性保障
text
可靠性保障:
├── 消息持久化
├── 发布者确认
├── 备用交换器
├── 死信队列
└── 消息追踪