Appearance
Java 消费者实现
一、概述
消费者(Consumer)是 RabbitMQ 中接收和处理消息的组件。本文档详细介绍如何使用 Java 客户端实现消息消费者,包括订阅消费、消息确认、消费模式等内容。
1.1 消费者架构
┌─────────────────────────────────────────────────────────────────────┐
│ 消费者架构图 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Queue │ ──► │ Channel │ ──► │ Consumer │ │
│ └─────────────┘ └─────────────┘ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Application │ │
│ │ (处理消息) │ │
│ └─────────────┘ │
│ │
│ 消费模式: │
│ ├── Push 模式:服务端主动推送消息 │
│ └── Pull 模式:客户端主动拉取消息 │
│ │
│ 消息确认: │
│ ├── 自动确认:autoAck = true │
│ └── 手动确认:autoAck = false │
│ │
└─────────────────────────────────────────────────────────────────────┘
1.2 消费方式对比
text
┌─────────────────────────────────────────────────────────────────────┐
│ 消费方式对比 │
├───────────────┬─────────────────────────────────────────────────────┤
│ 方式 │ 说明 │
├───────────────┼─────────────────────────────────────────────────────┤
│ Push 模式 │ 服务端主动推送消息到消费者 │
│ (basicConsume)│ 实时性好,推荐使用 │
│ │ 需要注册回调函数 │
├───────────────┼─────────────────────────────────────────────────────┤
│ Pull 模式 │ 客户端主动从队列获取消息 │
│ (basicGet) │ 适合低频消费场景 │
│ │ 每次只获取一条消息 │
└───────────────┴─────────────────────────────────────────────────────┘
二、核心知识点
2.1 Push 模式消费
2.1.1 基础消费者
java
import com.rabbitmq.client.*;
public class BasicConsumer {
private final Channel channel;
private String consumerTag;
public BasicConsumer(Channel channel) {
this.channel = channel;
}
/**
 * Registers a push-mode consumer on the given queue with manual acknowledgements.
 *
 * <p>Every delivery is printed together with its exchange, routing key and delivery
 * tag and is then acknowledged individually. The consumer tag returned by
 * {@code basicConsume} is stored in the {@code consumerTag} field so the subscription
 * can be cancelled later.
 *
 * @param queueName name of the queue to consume from
 * @throws Exception if the subscription cannot be registered on the channel
 */
public void startConsuming(String queueName) throws Exception {
    DeliverCallback deliverCallback = (tag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        // Read the tag once into a local: it is needed for the log line and for the ack.
        long deliveryTag = delivery.getEnvelope().getDeliveryTag();
        System.out.println("收到消息: " + message);
        System.out.println("交换器: " + delivery.getEnvelope().getExchange());
        System.out.println("路由键: " + delivery.getEnvelope().getRoutingKey());
        System.out.println("投递标签: " + deliveryTag);
        // multiple=false: acknowledge only this delivery.
        channel.basicAck(deliveryTag, false);
    };
    CancelCallback cancelCallback = tag -> {
        System.out.println("消费者被取消: " + tag);
    };
    boolean autoAck = false;
    consumerTag = channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback);
    System.out.println("消费者已启动,标签: " + consumerTag);
}
/**
 * Cancels the subscription recorded in {@code consumerTag}.
 *
 * <p>Does nothing when no consumer tag has been stored yet.
 *
 * @throws Exception if the cancel request fails on the channel
 */
public void stopConsuming() throws Exception {
    if (consumerTag == null) {
        return;
    }
    channel.basicCancel(consumerTag);
    System.out.println("消费者已停止");
}
}
2.1.2 完整消费者实现
java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class CompleteConsumer {
private final Channel channel;
private String consumerTag;
public CompleteConsumer(Channel channel) {
this.channel = channel;
}
/**
 * Starts a manually-acknowledged consumer that logs message metadata, processes the
 * body and settles each delivery.
 *
 * <p>A delivery is acked after {@code processMessage} returns. If anything in the
 * handler throws, the delivery is nacked and {@code shouldRequeue} decides whether it
 * goes back to the queue. The prefetch count is set to 10 before subscribing.
 *
 * @param queueName name of the queue to consume from
 * @throws Exception if QoS cannot be applied or the subscription cannot be registered
 */
public void startCompleteConsumer(String queueName) throws Exception {
    DeliverCallback deliverCallback = (tag, delivery) -> {
        long deliveryTag = delivery.getEnvelope().getDeliveryTag();
        try {
            String message = new String(delivery.getBody(), "UTF-8");
            AMQP.BasicProperties props = delivery.getProperties();
            Map<String, Object> headers = props.getHeaders();
            System.out.println("=== 收到消息 ===");
            System.out.println("消息ID: " + props.getMessageId());
            System.out.println("消息类型: " + props.getType());
            System.out.println("时间戳: " + props.getTimestamp());
            System.out.println("内容类型: " + props.getContentType());
            System.out.println("消息内容: " + message);
            if (headers != null) {
                System.out.println("消息头: " + headers);
            }
            processMessage(message);
            channel.basicAck(deliveryTag, false);
            System.out.println("消息处理完成并确认");
        } catch (Exception e) {
            System.err.println("消息处理失败: " + e.getMessage());
            try {
                channel.basicNack(deliveryTag, false, shouldRequeue(e));
            } finally {
                // processMessage may be interrupted; keep the interrupt visible to the
                // thread that runs this callback instead of silently clearing it.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    };
    CancelCallback cancelCallback = tag -> {
        System.out.println("消费者被取消: " + tag);
    };
    ConsumerShutdownSignalCallback shutdownCallback = (tag, sig) -> {
        System.out.println("消费者关闭: " + sig.getMessage());
    };
    channel.basicQos(10);
    boolean autoAck = false;
    consumerTag = channel.basicConsume(
            queueName,
            autoAck,
            "my-consumer-tag",
            false, // noLocal
            false, // exclusive
            null,  // arguments
            deliverCallback,
            cancelCallback,
            shutdownCallback
    );
}
private void processMessage(String message) throws Exception {
Thread.sleep(100);
System.out.println("处理消息: " + message);
}
/**
 * Decides whether a failed delivery should be put back on the queue.
 *
 * @param e the exception raised while handling the delivery
 * @return {@code false} for a {@code BusinessException}, {@code true} for anything else
 */
private boolean shouldRequeue(Exception e) {
    if (e instanceof BusinessException) {
        return false;
    }
    return true;
}
/**
 * Cancels the consumer recorded in {@code consumerTag}; a no-op when none is stored.
 *
 * @throws Exception if the cancel request fails on the channel
 */
public void stop() throws Exception {
    if (consumerTag == null) {
        return;
    }
    channel.basicCancel(consumerTag);
}
}
class BusinessException extends Exception {
public BusinessException(String message) {
super(message);
}
}
2.2 Pull 模式消费
java
import com.rabbitmq.client.*;
public class PullConsumer {
private final Channel channel;
public PullConsumer(Channel channel) {
this.channel = channel;
}
/**
 * Polls the queue with {@code basicGet} and settles each message manually.
 *
 * <p>When the queue is empty the method sleeps for one second before polling again.
 * A message is acked after {@code processMessage} returns, and nacked with
 * {@code requeue=true} if processing throws. The loop runs until the calling thread
 * is interrupted, so a caller can stop it with {@code Thread.interrupt()}.
 *
 * @param queueName name of the queue to poll
 * @throws Exception if a channel operation fails or the idle sleep is interrupted
 */
public void consumeWithGet(String queueName) throws Exception {
    // Checking the interrupt flag gives callers a way to stop the loop even while the
    // queue is non-empty (the sleep below is only reached on an empty queue).
    while (!Thread.currentThread().isInterrupted()) {
        GetResponse response = channel.basicGet(queueName, false);
        if (response == null) {
            System.out.println("队列为空,等待...");
            Thread.sleep(1000);
            continue;
        }
        long deliveryTag = response.getEnvelope().getDeliveryTag();
        String message = new String(response.getBody(), "UTF-8");
        System.out.println("获取消息: " + message);
        System.out.println("消息数量: " + response.getMessageCount());
        try {
            processMessage(message);
            channel.basicAck(deliveryTag, false);
            System.out.println("消息已确认");
        } catch (Exception e) {
            System.err.println("处理失败: " + e.getMessage());
            channel.basicNack(deliveryTag, false, true);
        }
    }
}
private void processMessage(String message) throws Exception {
System.out.println("处理: " + message);
}
}
2.3 消息确认
2.3.1 确认模式
java
import com.rabbitmq.client.*;
public class AckModes {
private final Channel channel;
public AckModes(Channel channel) {
this.channel = channel;
}
/**
 * Subscribes to the queue in automatic-acknowledgement mode and prints each message.
 *
 * @param queueName name of the queue to consume from
 * @throws Exception if the subscription cannot be registered
 */
public void autoAck(String queueName) throws Exception {
    final boolean autoAcknowledge = true;
    DeliverCallback onDelivery = (tag, delivery) -> {
        byte[] payload = delivery.getBody();
        System.out.println("收到消息(自动确认): " + new String(payload, "UTF-8"));
    };
    CancelCallback onCancel = tag -> {};
    channel.basicConsume(queueName, autoAcknowledge, onDelivery, onCancel);
}
/**
 * Subscribes to the queue in manual-acknowledgement mode; each message is printed
 * and then acknowledged on its own.
 *
 * @param queueName name of the queue to consume from
 * @throws Exception if the subscription cannot be registered
 */
public void manualAck(String queueName) throws Exception {
    final boolean autoAcknowledge = false;
    DeliverCallback onDelivery = (tag, delivery) -> {
        byte[] payload = delivery.getBody();
        System.out.println("收到消息: " + new String(payload, "UTF-8"));
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };
    CancelCallback onCancel = tag -> {};
    channel.basicConsume(queueName, autoAcknowledge, onDelivery, onCancel);
}
/**
 * Lists the call shapes of the three settlement methods on the channel.
 *
 * NOTE(review): all six calls target the same deliveryTag. Presumably this is a
 * catalogue of call shapes rather than a sequence meant to run as-is; settling one
 * delivery more than once is expected to be rejected by the broker - confirm before
 * invoking this method against a live channel.
 *
 * @param deliveryTag tag of the delivery to settle
 * @throws Exception if a channel operation fails
 */
public void demonstrateAckTypes(long deliveryTag) throws Exception {
// basicAck(tag, multiple): multiple=false settles only this delivery.
channel.basicAck(deliveryTag, false);
// multiple=true settles this delivery and all earlier unsettled ones.
channel.basicAck(deliveryTag, true);
// basicNack(tag, multiple, requeue): single delivery, not requeued.
channel.basicNack(deliveryTag, false, false);
// Batch negative acknowledgement, requeued.
channel.basicNack(deliveryTag, true, true);
// basicReject(tag, requeue): single delivery, not requeued.
channel.basicReject(deliveryTag, false);
// Single delivery, requeued.
channel.basicReject(deliveryTag, true);
}
}
2.3.2 确认策略
text
┌─────────────────────────────────────────────────────────────────────┐
│ 消息确认策略 │
├───────────────┬─────────────────────────────────────────────────────┤
│ 方法 │ 说明 │
├───────────────┼─────────────────────────────────────────────────────┤
│ basicAck │ 确认消息成功处理 │
│ │ multiple=false: 确认单条消息 │
│ │ multiple=true: 确认该标签及之前的所有消息 │
├───────────────┼─────────────────────────────────────────────────────┤
│ basicNack │ 否定确认消息 │
│ │ multiple: 是否批量 │
│ │ requeue: 是否重新入队 │
├───────────────┼─────────────────────────────────────────────────────┤
│ basicReject │ 拒绝单条消息 │
│ │ requeue: 是否重新入队 │
│ │ 比 basicNack 更轻量 │
├───────────────┼─────────────────────────────────────────────────────┤
│ basicRecover │ 重新投递所有未确认的消息 │
│ │ requeue=true: 重新入队后投递 │
│ │ requeue=false: 尝试投递给同一消费者 │
└───────────────┴─────────────────────────────────────────────────────┘
2.4 QoS 设置
java
import com.rabbitmq.client.*;
public class QosDemo {
private final Channel channel;
public QosDemo(Channel channel) {
this.channel = channel;
}
/**
 * Demonstrates the three {@code basicQos} overloads.
 *
 * <p>The three-argument form takes {@code (prefetchSize, prefetchCount, global)}.
 * {@code prefetchSize} is passed as 0 (no byte limit) because RabbitMQ does not
 * implement a non-zero prefetch size and rejects such a request.
 *
 * @throws Exception if a QoS request fails on the channel
 */
public void setQos() throws Exception {
    // Prefetch count only.
    channel.basicQos(10);
    // Prefetch count with the global flag set.
    channel.basicQos(20, true);
    // prefetchSize=0 (unlimited bytes), prefetchCount=10, global=false.
    channel.basicQos(0, 10, false);
    System.out.println("QoS 设置完成");
}
/**
 * Subscribes with a prefetch count of 5 and simulates slow processing, printing how
 * many messages are currently being handled.
 *
 * @param queueName name of the queue to consume from
 * @throws Exception if QoS cannot be applied or the subscription cannot be registered
 */
public void demonstrateQosEffect(String queueName) throws Exception {
    channel.basicQos(5);
    // Single-element array so the lambda can mutate the counter.
    final int[] inFlight = new int[1];
    DeliverCallback onDelivery = (tag, delivery) -> {
        inFlight[0] += 1;
        System.out.println("处理中消息数: " + inFlight[0]);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        inFlight[0] -= 1;
    };
    CancelCallback onCancel = tag -> {};
    channel.basicConsume(queueName, false, onDelivery, onCancel);
}
}
2.5 消费者标签
java
import com.rabbitmq.client.*;
public class ConsumerTagDemo {
private final Channel channel;
public ConsumerTagDemo(Channel channel) {
this.channel = channel;
}
/**
 * Subscribes with a caller-chosen consumer tag and prints the tag returned by
 * {@code basicConsume}.
 *
 * <p>The tag is built from the prefix {@code "consumer-"} and the current time in
 * milliseconds. Message bodies are decoded as UTF-8, matching the other consumers in
 * this document, instead of relying on the platform default charset.
 *
 * @param queueName name of the queue to consume from
 * @throws Exception if the subscription cannot be registered
 */
public void useCustomTag(String queueName) throws Exception {
    String customTag = "consumer-" + System.currentTimeMillis();
    DeliverCallback callback = (tag, delivery) -> {
        System.out.println("消费者标签: " + tag);
        System.out.println("消息: " + new String(delivery.getBody(), "UTF-8"));
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };
    String returnedTag = channel.basicConsume(
            queueName,
            false,
            customTag,
            callback,
            tag -> System.out.println("取消: " + tag)
    );
    System.out.println("返回的标签: " + returnedTag);
}
/**
 * Cancels the consumer identified by the given tag and logs the cancellation.
 *
 * @param consumerTag tag of the consumer to cancel
 * @throws Exception if the cancel request fails on the channel
 */
public void cancelConsumer(String consumerTag) throws Exception {
    channel.basicCancel(consumerTag);
    String notice = "消费者已取消: " + consumerTag;
    System.out.println(notice);
}
}
2.6 消费者取消通知
java
import com.rabbitmq.client.*;
public class ConsumerCancelDemo {
/**
 * Subscribes to the queue with a cancel callback that reports when the consumer is
 * cancelled.
 *
 * <p>Deliveries are decoded as UTF-8, printed and acknowledged individually.
 *
 * @param channel   channel to subscribe on
 * @param queueName name of the queue to consume from
 * @throws Exception if the subscription cannot be registered
 */
public void setupCancelNotification(Channel channel, String queueName)
        throws Exception {
    DeliverCallback deliverCallback = (tag, delivery) -> {
        System.out.println("收到消息: " + new String(delivery.getBody(), "UTF-8"));
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };
    CancelCallback cancelCallback = tag -> {
        System.out.println("消费者被取消,标签: " + tag);
        System.out.println("可能原因:队列被删除");
    };
    // The returned consumer tag is not needed here, so it is not stored.
    channel.basicConsume(
            queueName,
            false,
            deliverCallback,
            cancelCallback
    );
}
}
三、代码示例
3.1 完整消费者示例
java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CompleteConsumerDemo {
private final Connection connection;
private final ExecutorService executorService;
private final AtomicInteger processedCount = new AtomicInteger(0);
private volatile boolean running = true;
public CompleteConsumerDemo(String host, int port, String username,
String password, String vhost) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vhost);
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
executorService = Executors.newFixedThreadPool(10);
factory.setSharedExecutor(executorService);
connection = factory.newConnection("consumer-app");
}
/**
 * Opens a channel, applies the prefetch limit and starts a manually-acknowledged
 * consumer whose deliveries are handled on {@code executorService}.
 *
 * @param queueName     name of the queue to consume from
 * @param prefetchCount prefetch count passed to {@code basicQos} on the new channel
 * @throws Exception if the channel cannot be created or the subscription fails
 */
public void startConsumer(String queueName, int prefetchCount) throws Exception {
    Channel channel = connection.createChannel();
    channel.basicQos(prefetchCount);
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        executorService.submit(() -> handleDelivery(channel, delivery));
    };
    CancelCallback cancelCallback = consumerTag -> {
        System.out.println("消费者被取消: " + consumerTag);
    };
    ConsumerShutdownSignalCallback shutdownCallback = (consumerTag, sig) -> {
        System.err.println("消费者关闭: " + sig.getMessage());
        if (!sig.isInitiatedByApplication()) {
            System.err.println("异常关闭,尝试恢复...");
        }
    };
    channel.basicConsume(
            queueName,
            false,
            "consumer-" + System.currentTimeMillis(),
            deliverCallback,
            cancelCallback,
            shutdownCallback
    );
    System.out.println("消费者已启动,队列: " + queueName);
}

/** Processes one delivery, then acks it on success or nacks it on failure. */
private void handleDelivery(Channel channel, Delivery delivery) {
    long deliveryTag = delivery.getEnvelope().getDeliveryTag();
    try {
        processMessage(delivery);
        channel.basicAck(deliveryTag, false);
        int count = processedCount.incrementAndGet();
        if (count % 100 == 0) {
            System.out.println("已处理消息数: " + count);
        }
    } catch (Exception e) {
        System.err.println("处理失败: " + e.getMessage());
        nackAndReport(channel, deliveryTag, shouldRequeue(e));
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to the pool thread running this task.
            Thread.currentThread().interrupt();
        }
    }
}

/** Nacks a single delivery and reports any failure of the nack itself. */
private void nackAndReport(Channel channel, long deliveryTag, boolean requeue) {
    try {
        channel.basicNack(deliveryTag, false, requeue);
    } catch (IOException | RuntimeException ex) {
        // This runs inside a task whose Future is discarded; an unchecked failure
        // (e.g. the channel is already closed) would otherwise vanish unreported.
        ex.printStackTrace();
    }
}
/**
 * Logs the id, type and body of a delivery and dispatches it by message type.
 *
 * <p>A missing type is routed to the default handler.
 *
 * @param delivery the delivery to process
 * @throws Exception if the body cannot be decoded
 */
private void processMessage(Delivery delivery) throws Exception {
    String body = new String(delivery.getBody(), "UTF-8");
    AMQP.BasicProperties props = delivery.getProperties();
    String messageId = props.getMessageId();
    String messageType = props.getType();
    System.out.println("处理消息:");
    System.out.println(" ID: " + messageId);
    System.out.println(" 类型: " + messageType);
    System.out.println(" 内容: " + body);
    // A switch on a null String throws, so substitute a value that hits the default arm.
    switch (messageType != null ? messageType : "default") {
        case "order.created":
            processOrderCreated(body);
            break;
        case "order.cancelled":
            processOrderCancelled(body);
            break;
        default:
            processDefault(body);
    }
}
private void processOrderCreated(String body) {
System.out.println("处理订单创建: " + body);
}
private void processOrderCancelled(String body) {
System.out.println("处理订单取消: " + body);
}
private void processDefault(String body) {
System.out.println("处理默认消息: " + body);
}
/**
 * Decides whether a failed delivery should be requeued.
 *
 * @param e the exception raised while handling the delivery
 * @return {@code true} only when {@code e} is a {@code RetryableException}
 */
private boolean shouldRequeue(Exception e) {
    return e instanceof RetryableException;
}
/**
 * Stops the demo: clears the running flag, closes the connection (5000 ms timeout)
 * and shuts down the executor, forcing termination if tasks do not finish within
 * 10 seconds.
 */
public void shutdown() {
    running = false;
    try {
        if (connection != null && connection.isOpen()) {
            connection.close(5000);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    executorService.shutdown();
    try {
        if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
            executorService.shutdownNow();
        }
    } catch (InterruptedException e) {
        executorService.shutdownNow();
        // Restore the flag so the caller can still observe the interruption.
        Thread.currentThread().interrupt();
    }
    System.out.println("消费者已关闭");
}
/**
 * Entry point: connects to a local broker with the guest account, registers a JVM
 * shutdown hook that calls {@code shutdown()}, starts consuming
 * {@code orders.queue} with a prefetch of 10 and then parks the main thread.
 *
 * @param args unused
 * @throws Exception if the connection or subscription fails
 */
public static void main(String[] args) throws Exception {
    final CompleteConsumerDemo consumer =
            new CompleteConsumerDemo("localhost", 5672, "guest", "guest", "/");
    Thread shutdownHook = new Thread(consumer::shutdown);
    Runtime.getRuntime().addShutdownHook(shutdownHook);
    consumer.startConsumer("orders.queue", 10);
    // Keep the main thread parked; consumer callbacks run on other threads.
    Thread.sleep(Long.MAX_VALUE);
}
}
class RetryableException extends Exception {
public RetryableException(String message) {
super(message);
}
}
3.2 多线程消费者
java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
public class MultiThreadConsumer {
private final Connection connection;
private final ExecutorService executorService;
public MultiThreadConsumer(Connection connection, int threadCount) {
this.connection = connection;
this.executorService = Executors.newFixedThreadPool(threadCount);
}
/**
 * Submits {@code consumerCount} registration tasks to the executor; each task opens
 * its own channel and subscribes a manually-acknowledged consumer.
 *
 * @param queueName     name of the queue to consume from
 * @param consumerCount number of consumers (and channels) to create
 * @param prefetchCount prefetch count applied to every channel
 * @throws Exception declared for callers; registration failures are printed inside
 *                   the submitted task
 */
public void startConsumers(String queueName, int consumerCount, int prefetchCount)
        throws Exception {
    int next = 0;
    while (next < consumerCount) {
        final int consumerIndex = next++;
        executorService.submit(() -> registerConsumer(queueName, consumerIndex, prefetchCount));
    }
}

/** Opens a channel and subscribes one consumer; any failure is printed. */
private void registerConsumer(String queueName, int consumerIndex, int prefetchCount) {
    try {
        Channel channel = connection.createChannel();
        channel.basicQos(prefetchCount);
        // The tag combines the consumer index with the id of the registering thread.
        String consumerTag = "consumer-" + consumerIndex + "-" +
                Thread.currentThread().getId();
        DeliverCallback onDelivery = (tag, delivery) -> {
            String text = new String(delivery.getBody(), "UTF-8");
            System.out.println("[Consumer-" + consumerIndex +
                    "] 处理消息: " + text);
            try {
                Thread.sleep(100);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback onCancel = tag -> {};
        channel.basicConsume(queueName, false, consumerTag, onDelivery, onCancel);
        System.out.println("消费者 " + consumerIndex + " 已启动");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Shuts down the executor, waiting up to 30 seconds for submitted tasks before
 * forcing termination.
 */
public void shutdown() {
    executorService.shutdown();
    try {
        if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
            executorService.shutdownNow();
        }
    } catch (InterruptedException e) {
        executorService.shutdownNow();
        // Restore the flag so the caller can still observe the interruption.
        Thread.currentThread().interrupt();
    }
}
}
3.3 优先级消费者
java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
public class PriorityConsumer {
private final Channel channel;
private final PriorityBlockingQueue<PriorityMessage> messageQueue;
private final ExecutorService executorService;
/**
 * Creates a consumer that buffers deliveries in a local priority queue (highest
 * priority first) and processes them on a fixed pool of {@code workerCount} threads.
 *
 * @param channel     channel used to consume and acknowledge
 * @param workerCount size of the worker thread pool
 */
public PriorityConsumer(Channel channel, int workerCount) {
    this.channel = channel;
    // Integer.compare avoids the overflow that "b.priority - a.priority" can hit
    // when the two priorities are far apart; operands are reversed for descending order.
    this.messageQueue = new PriorityBlockingQueue<>(1000,
            (a, b) -> Integer.compare(b.priority, a.priority));
    this.executorService = Executors.newFixedThreadPool(workerCount);
}
/**
 * Subscribes to the queue, buffers deliveries by their {@code "priority"} header and
 * starts the worker loops that drain the buffer.
 *
 * <p>Deliveries without a numeric {@code "priority"} header get priority 0. The
 * prefetch count is set to 100 before subscribing.
 *
 * @param queueName name of the queue to consume from
 * @throws Exception if QoS cannot be applied or the subscription cannot be registered
 */
public void start(String queueName) throws Exception {
    channel.basicQos(100);
    DeliverCallback callback = (tag, delivery) -> {
        int priority = 0;
        // Fully qualified: this snippet's imports do not include java.util.Map.
        java.util.Map<String, Object> headers = delivery.getProperties().getHeaders();
        Object rawPriority = headers != null ? headers.get("priority") : null;
        // Guard the cast so a missing or non-numeric header falls back to 0.
        if (rawPriority instanceof Number) {
            priority = ((Number) rawPriority).intValue();
        }
        PriorityMessage msg = new PriorityMessage(
                delivery,
                priority,
                delivery.getEnvelope().getDeliveryTag()
        );
        messageQueue.offer(msg);
    };
    channel.basicConsume(queueName, false, callback, tag -> {});
    // Start one loop per pool thread. The pool is built with newFixedThreadPool in the
    // constructor, so its core size equals the workerCount given there; 5 is kept as
    // the fallback for any other executor type.
    int workers = executorService instanceof ThreadPoolExecutor
            ? ((ThreadPoolExecutor) executorService).getCorePoolSize()
            : 5;
    for (int i = 0; i < workers; i++) {
        executorService.submit(this::processMessages);
    }
    System.out.println("优先级消费者已启动");
}
/**
 * Worker loop: takes the highest-priority buffered message, prints it and
 * acknowledges it, until the thread is interrupted.
 *
 * <p>Bodies are decoded as UTF-8 rather than with the platform default charset.
 */
private void processMessages() {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            PriorityMessage msg = messageQueue.take();
            String body = new String(msg.delivery.getBody(),
                    java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("处理消息 [优先级: " + msg.priority +
                    "]: " + body);
            channel.basicAck(msg.deliveryTag, false);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Stops the worker loops.
 *
 * <p>The workers block in {@code messageQueue.take()} and exit only when interrupted.
 * {@code shutdown()} does not interrupt running tasks, so {@code shutdownNow()} is
 * used to deliver the interrupt.
 */
public void shutdown() {
    executorService.shutdownNow();
}
/** Immutable holder pairing a delivery with its priority and delivery tag. */
static class PriorityMessage {
// The received delivery.
final Delivery delivery;
// Priority used to order messages in the local buffer.
final int priority;
// Tag used to acknowledge the delivery on the channel.
final long deliveryTag;
/** Stores the three values as given; no validation is performed. */
PriorityMessage(Delivery delivery, int priority, long deliveryTag) {
this.delivery = delivery;
this.priority = priority;
this.deliveryTag = deliveryTag;
}
}
}
四、实际应用场景
4.1 订单处理消费者
java
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
public class OrderConsumer {
private final Channel channel;
private final ObjectMapper objectMapper = new ObjectMapper();
public OrderConsumer(Channel channel) {
this.channel = channel;
}
/**
 * Subscribes to the order queue and dispatches each JSON message by its type.
 *
 * <p>A message with a missing or unknown type is logged and acknowledged. If handling
 * throws, the delivery is nacked; it is requeued only when it is not already a
 * redelivery, so a message that fails repeatedly cannot loop forever.
 *
 * @param queueName name of the queue to consume from
 * @throws Exception if QoS cannot be applied or the subscription cannot be registered
 */
public void start(String queueName) throws Exception {
    channel.basicQos(10);
    DeliverCallback callback = (tag, delivery) -> {
        long deliveryTag = delivery.getEnvelope().getDeliveryTag();
        try {
            String messageType = delivery.getProperties().getType();
            String body = new String(delivery.getBody(), "UTF-8");
            @SuppressWarnings("unchecked") // payload is deserialized into a generic map
            Map<String, Object> order = objectMapper.readValue(body, Map.class);
            // A switch on a null String throws; "" sends untyped messages to default.
            switch (messageType != null ? messageType : "") {
                case "order.created":
                    handleOrderCreated(order);
                    break;
                case "order.paid":
                    handleOrderPaid(order);
                    break;
                case "order.shipped":
                    handleOrderShipped(order);
                    break;
                case "order.cancelled":
                    handleOrderCancelled(order);
                    break;
                default:
                    System.err.println("未知消息类型: " + messageType);
            }
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            System.err.println("处理订单失败: " + e.getMessage());
            // Give the message one retry only.
            boolean requeue = !delivery.getEnvelope().isRedeliver();
            channel.basicNack(deliveryTag, false, requeue);
        }
    };
    channel.basicConsume(queueName, false, callback, tag -> {});
}
private void handleOrderCreated(Map<String, Object> order) {
System.out.println("处理订单创建: " + order.get("orderId"));
}
private void handleOrderPaid(Map<String, Object> order) {
System.out.println("处理订单支付: " + order.get("orderId"));
}
private void handleOrderShipped(Map<String, Object> order) {
System.out.println("处理订单发货: " + order.get("orderId"));
}
private void handleOrderCancelled(Map<String, Object> order) {
System.out.println("处理订单取消: " + order.get("orderId"));
}
}
4.2 日志处理消费者
java
import com.rabbitmq.client.*;
import java.util.Map;
public class LogConsumer {
private final Channel channel;
public LogConsumer(Channel channel) {
this.channel = channel;
}
/**
 * Subscribes to the log queue and processes each message, deriving the service and
 * level from the routing key ({@code service.level}).
 *
 * <p>The callback acknowledges each delivery itself, so the subscription is
 * registered with {@code autoAck=false}. With automatic acknowledgement the explicit
 * {@code basicAck} would settle the same delivery a second time.
 *
 * @param queueName name of the queue to consume from
 * @throws Exception if the subscription cannot be registered
 */
public void start(String queueName) throws Exception {
    DeliverCallback callback = (tag, delivery) -> {
        String body = new String(delivery.getBody(), "UTF-8");
        String routingKey = delivery.getEnvelope().getRoutingKey();
        String[] parts = routingKey.split("\\.");
        String service = parts.length > 0 ? parts[0] : "unknown";
        String level = parts.length > 1 ? parts[1] : "info";
        processLog(service, level, body);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };
    channel.basicConsume(queueName, false, callback, tag -> {});
}
/**
 * Prints one formatted log line and raises an alert for error-level entries.
 *
 * @param service name of the originating service
 * @param level   log level; compared case-insensitively against {@code "error"}
 * @param message log message text
 */
private void processLog(String service, String level, String message) {
    String timestamp = java.time.LocalDateTime.now().toString();
    // Locale.ROOT keeps the upper-casing independent of the JVM default locale
    // (e.g. "info" would become "İNFO" under a Turkish locale).
    System.out.printf("[%s] [%s] [%s] %s%n",
            timestamp, level.toUpperCase(java.util.Locale.ROOT), service, message);
    if ("error".equalsIgnoreCase(level)) {
        sendAlert(service, message);
    }
}
private void sendAlert(String service, String message) {
System.err.println("发送告警: " + service + " - " + message);
}
}
五、常见问题与解决方案
5.1 消息重复消费
问题描述: 消息被重复处理。
解决方案:
java
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
public class DeduplicationConsumer {
private final Set<String> processedMessages = ConcurrentHashMap.newKeySet();
/**
 * Processes a delivery unless a message with the same id has already been claimed.
 *
 * <p>The id is claimed with a single {@code Set.add} call before processing, so two
 * threads that receive the same id at the same time cannot both process it. The
 * claim is released again if processing fails, so a later redelivery can retry.
 * Messages without an id are always processed.
 *
 * @param delivery the delivery to process
 * @throws Exception whatever {@code processMessage} throws
 */
public void processWithDeduplication(Delivery delivery) throws Exception {
    String messageId = delivery.getProperties().getMessageId();
    if (messageId == null) {
        processMessage(delivery);
        return;
    }
    // add() returns false when the id is already present: check and claim in one step.
    if (!processedMessages.add(messageId)) {
        System.out.println("重复消息,跳过: " + messageId);
        return;
    }
    boolean processed = false;
    try {
        processMessage(delivery);
        processed = true;
    } finally {
        if (!processed) {
            processedMessages.remove(messageId);
        }
    }
}
}
5.2 消息处理超时
问题描述: 消息处理时间过长。
解决方案:
java
import java.util.concurrent.*;
public class TimeoutConsumer {
private final ExecutorService executor = Executors.newCachedThreadPool();
/**
 * Runs {@code processMessage} on the executor and waits at most
 * {@code timeoutSeconds} for it to finish.
 *
 * <p>On timeout the task is cancelled with interruption and a
 * {@code RuntimeException} carrying the original {@code TimeoutException} as its
 * cause is thrown.
 *
 * @param delivery       the delivery to process
 * @param timeoutSeconds maximum time to wait, in seconds
 * @throws Exception if processing fails or the wait is interrupted
 */
public void processWithTimeout(Delivery delivery, long timeoutSeconds)
        throws Exception {
    // Returning a value makes the lambda a Callable, which may throw checked
    // exceptions from processMessage; a Runnable lambda could not.
    Future<?> future = executor.submit(() -> {
        processMessage(delivery);
        return null;
    });
    try {
        future.get(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        future.cancel(true);
        throw new RuntimeException("处理超时", e);
    }
}
}
5.3 消费者阻塞
问题描述: 消费者处理速度慢导致消息堆积。
解决方案:
java
/**
 * Processes a delivery, first pausing for 100 ms when the queue reports more than
 * 10000 messages.
 *
 * @param channel  channel used to query the queue depth
 * @param delivery the delivery to process
 * @throws Exception if the depth query or processing fails, or the pause is interrupted
 */
public void handleBackpressure(Channel channel, Delivery delivery) throws Exception {
    boolean backlogTooLarge = getQueueMessageCount(channel) > 10000;
    if (backlogTooLarge) {
        Thread.sleep(100);
    }
    processMessage(delivery);
}
private int getQueueMessageCount(Channel channel) throws Exception {
AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive("my.queue");
return declareOk.getMessageCount();
}
六、最佳实践建议
6.1 消费者设计
text
最佳实践:
├── 使用手动确认模式
├── 合理设置 QoS 预取数量
├── 实现幂等性处理
├── 处理异常情况
├── 监控消费速率
└── 实现优雅关闭
6.2 性能优化
text
性能建议:
├── 使用多线程处理
├── 批量确认消息
├── 异步处理消息
├── 控制预取数量
└── 避免阻塞操作
6.3 可靠性保障
text
可靠性建议:
├── 正确处理确认和拒绝
├── 实现重试机制
├── 使用死信队列
├── 监控消费者状态
└── 处理网络中断