Java Producer Implementation

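1. Overview

A producer is the client application that publishes messages to RabbitMQ. This document describes how to implement a message producer with the RabbitMQ Java client, covering basic publishing, message properties, publisher confirms, and related topics.

1.1 Producer Architecture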

┌─────────────────────────────────────────────────────────────────────┐
│                        Producer Architecture                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────┐                                                    │
│  │ Application │                                                    │
│  └──────┬──────┘                                                    │
│         │                                                           │
│         ▼                                                           │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐            │
│  │   Channel   │ ──► │  Exchange   │ ──► │    Queue    │            │
│  └─────────────┘     └─────────────┘     └─────────────┘            │
│                                                                     │
│  Operations:                                                        │
│  ├── basicPublish: publish a message                                │
│  ├── exchangeDeclare: declare an exchange                           │
│  ├── queueDeclare: declare a queue                                  │
│  └── queueBind: bind a queue to an exchange                         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

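1.2 Publishing Modes

text
Publishing modes:
├── Synchronous publishing: block until the broker confirms
├── Asynchronous publishing: handle confirms in callbacks
├── Batch publishing: higher throughput
└── Transactional publishing: atomic commit or rollback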

2. Core Concepts

2.1 Basic Message Publishing

2.1.1 The Simplest Publish

java
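import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;

public class SimpleProducer {

    public void publishSimpleMessage(Channel channel, String exchange,
                                     String routingKey, String message) throws Exception {

        byte[] body = message.getBytes(StandardCharsets.UTF_8);

        // null properties: the message is sent with default (non-persistent) properties
        channel.basicPublish(exchange, routingKey, null, body);

        // Returning from basicPublish does not prove that the broker stored the message
        System.out.println("Message sent: " + message);
    }
}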

2.1.2 Complete Publishing Flow

java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;

public class CompleteProducer {

    public void publishWithDeclaration(Channel channel) throws Exception {
        String exchangeName = "my.exchange";
        String queueName = "my.queue";
        String routingKey = "my.routing.key";
        String message = "Hello RabbitMQ!";

        // Declare a durable direct exchange
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);

        // durable=true, exclusive=false, autoDelete=false, no extra arguments
        channel.queueDeclare(queueName, true, false, false, null);

        // Route messages published with routingKey from the exchange to the queue
        channel.queueBind(queueName, exchangeName, routingKey);

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .contentType("text/plain")
            .deliveryMode(2)   // 2 = persistent, 1 = transient
            .priority(0)
            .build();

        channel.basicPublish(exchangeName, routingKey, properties,
                           message.getBytes(StandardCharsets.UTF_8));

        System.out.println("Message published");
    }
}

2.2 Message Properties

2.2.1 Message Properties in Detail

java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class MessagePropertiesDemo {

    public void demonstrateProperties(Channel channel, String exchange,
                                       String routingKey) throws Exception {

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .contentType("application/json")   // MIME type of the body
            .contentEncoding("UTF-8")
            .headers(createHeaders())           // custom application headers
            .deliveryMode(2)                    // 2 = persistent, 1 = transient
            .priority(5)                        // only honored by priority queues
            .correlationId("corr-123")          // pairs a reply with its request
            .replyTo("reply-queue")             // queue the consumer should reply to
            .expiration("60000")                // per-message TTL in milliseconds, as a string
            .messageId("msg-456")
            .timestamp(new Date())
            .type("order.created")
            // The broker validates userId: it must equal the connection's username,
            // otherwise the publish is refused. "guest" assumes the default user.
            .userId("guest")
            .appId("app-order-service")
            .clusterId("cluster-001")           // legacy field; not used by RabbitMQ
            .build();

        String message = "{\"orderId\": \"ORD-001\", \"amount\": 99.99}";

        channel.basicPublish(exchange, routingKey, props,
                             message.getBytes(StandardCharsets.UTF_8));
    }

    private Map<String, Object> createHeaders() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-correlation-id", "corr-123");
        headers.put("x-trace-id", "trace-456");
        headers.put("x-source-system", "order-service");
        headers.put("x-timestamp", System.currentTimeMillis());
        return headers;
    }
}
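
The `expiration` property is easy to get wrong because it is a string holding a millisecond count, not a number. The following is a minimal sketch of a helper for building it; `MessageTtl` and its methods are hypothetical names, not part of the RabbitMQ client. It also models the rule that when a queue-level `x-message-ttl` and a per-message expiration are both set, the lower of the two applies.

```java
import java.time.Duration;

// Hypothetical helper, not part of the RabbitMQ Java client.
final class MessageTtl {

    private MessageTtl() {}

    // Formats a TTL the way the expiration property expects it:
    // a non-negative whole number of milliseconds, as a string.
    static String expiration(Duration ttl) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative: " + ttl);
        }
        return Long.toString(ttl.toMillis());
    }

    // When both a queue TTL and a message TTL are set, the lower one wins.
    static long effectiveTtlMillis(long queueTtlMillis, String expiration) {
        return Math.min(queueTtlMillis, Long.parseLong(expiration));
    }
}
```

With this in place, the builder call could read `.expiration(MessageTtl.expiration(Duration.ofMinutes(1)))` instead of a hand-written `"60000"`.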

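2.2.2 Predefined Property Constants

java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;

public class PredefinedProperties {

    public void usePredefinedProperties(Channel channel, String exchange,
                                         String routingKey) throws Exception {

        // text/plain, deliveryMode=2 (persistent), priority=0
        channel.basicPublish(exchange, routingKey,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            "persistent plain-text message".getBytes(StandardCharsets.UTF_8));

        // application/octet-stream, deliveryMode=2 (persistent), priority=0
        channel.basicPublish(exchange, routingKey,
            MessageProperties.PERSISTENT_BASIC,
            "persistent basic message".getBytes(StandardCharsets.UTF_8));

        // deliveryMode=2 only, no other properties set
        channel.basicPublish(exchange, routingKey,
            MessageProperties.MINIMAL_PERSISTENT_BASIC,
            "minimal persistent message".getBytes(StandardCharsets.UTF_8));

        // no properties set at all
        channel.basicPublish(exchange, routingKey,
            MessageProperties.MINIMAL_BASIC,
            "minimal non-persistent message".getBytes(StandardCharsets.UTF_8));

        // application/octet-stream, deliveryMode=1 (transient), priority=0
        channel.basicPublish(exchange, routingKey,
            MessageProperties.BASIC,
            "non-persistent basic message".getBytes(StandardCharsets.UTF_8));
    }
}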

2.3 Publisher Confirms

2.3.1 Synchronous Confirms

java
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

public class SyncConfirmProducer {

    private final Channel channel;
    // Sequence numbers published but not yet confirmed (single-threaded use only)
    private final SortedSet<Long> pendingConfirms = new TreeSet<>();

    public SyncConfirmProducer(Channel channel) throws Exception {
        this.channel = channel;
        channel.confirmSelect();   // put the channel into confirm mode
    }

    public void publishWithSyncConfirm(String exchange, String routingKey,
                                        String message) throws Exception {

        long sequenceNumber = channel.getNextPublishSeqNo();
        pendingConfirms.add(sequenceNumber);

        channel.basicPublish(exchange, routingKey, null,
                             message.getBytes(StandardCharsets.UTF_8));

        try {
            // Returns false if the broker nacked; throws TimeoutException on timeout
            if (channel.waitForConfirms(5000)) {
                pendingConfirms.remove(sequenceNumber);
                System.out.println("Message confirmed: " + sequenceNumber);
            } else {
                System.err.println("Message nacked by the broker: " + sequenceNumber);
            }
        } catch (TimeoutException e) {
            System.err.println("Confirm timed out: " + sequenceNumber);
        }
    }

    public void publishBatch(String exchange, String routingKey,
                             List<String> messages) throws Exception {

        for (String message : messages) {
            long seq = channel.getNextPublishSeqNo();
            pendingConfirms.add(seq);
            channel.basicPublish(exchange, routingKey, null,
                                 message.getBytes(StandardCharsets.UTF_8));
        }

        // One blocking wait covers every message published since the last wait
        boolean allConfirmed = channel.waitForConfirms();

        if (allConfirmed) {
            pendingConfirms.clear();
            System.out.println("All messages in the batch confirmed");
        } else {
            // The return value does not say which message was nacked
            System.err.println("At least one message was nacked; pending: " + pendingConfirms);
        }
    }
}

2.3.2 Asynchronous Confirms

java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class AsyncConfirmProducer {

    private final Channel channel;
    // Sequence number -> message for everything published but not yet confirmed.
    // Confirm callbacks run on a client thread, so the map must be thread-safe.
    private final ConcurrentNavigableMap<Long, String> outstanding =
        new ConcurrentSkipListMap<>();

    public AsyncConfirmProducer(Channel channel) throws Exception {
        this.channel = channel;
        channel.confirmSelect();

        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    // multiple=true confirms every sequence number <= deliveryTag
                    ConcurrentNavigableMap<Long, String> confirmed =
                        outstanding.headMap(deliveryTag, true);
                    System.out.println("Batch ack up to " + deliveryTag +
                        " (" + confirmed.size() + " messages)");
                    confirmed.clear();
                } else {
                    String msg = outstanding.remove(deliveryTag);
                    System.out.println("Message acked: " + msg);
                }
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> nacked =
                        outstanding.headMap(deliveryTag, true);
                    System.err.println("Batch nack up to " + deliveryTag +
                        ": " + nacked.values());
                    nacked.clear();
                } else {
                    System.err.println("Message nacked: " + outstanding.remove(deliveryTag));
                }
            }
        });
    }

    public void publishAsync(String exchange, String routingKey, String message)
            throws Exception {

        // Record the message before publishing so the callback can always find it
        long seq = channel.getNextPublishSeqNo();
        outstanding.put(seq, message);

        channel.basicPublish(exchange, routingKey, null,
                             message.getBytes(StandardCharsets.UTF_8));

        System.out.println("Message published: " + message);
    }
}
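
The sequence-number bookkeeping behind asynchronous confirms can be exercised without a broker. The sketch below isolates it in a hypothetical `OutstandingConfirms` class (the name and methods are assumptions for illustration, not part of the RabbitMQ client). It shows the one rule that matters: a frame with `multiple=true` settles every sequence number up to and including the delivery tag, while `multiple=false` settles exactly one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper, not part of the RabbitMQ Java client: tracks published
// messages by sequence number until the broker acks or nacks them.
final class OutstandingConfirms {

    private final ConcurrentNavigableMap<Long, String> outstanding =
        new ConcurrentSkipListMap<>();

    // Call right before basicPublish, passing channel.getNextPublishSeqNo().
    void track(long seqNo, String message) {
        outstanding.put(seqNo, message);
    }

    // Call from handleAck or handleNack; returns the messages this frame settled.
    List<String> settle(long deliveryTag, boolean multiple) {
        List<String> settled = new ArrayList<>();
        if (multiple) {
            // multiple=true covers every sequence number <= deliveryTag
            ConcurrentNavigableMap<Long, String> head =
                outstanding.headMap(deliveryTag, true);
            settled.addAll(head.values());
            head.clear();
        } else {
            String message = outstanding.remove(deliveryTag);
            if (message != null) {
                settled.add(message);
            }
        }
        return settled;
    }

    int pendingCount() {
        return outstanding.size();
    }
}
```

A sorted concurrent map is used here because the publishing thread and the confirm callback thread touch the structure at the same time, and the `multiple` case needs an ordered "everything up to this key" view.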

2.4 The Mandatory Flag

java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ReturnListener;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class MandatoryProducer {

    private final Channel channel;

    public MandatoryProducer(Channel channel) throws Exception {
        this.channel = channel;

        // Called when a mandatory message cannot be routed to any queue
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties,
                                     byte[] body) throws IOException {

                System.err.println("Message returned:");
                System.err.println("  Reason: " + replyText);
                System.err.println("  Exchange: " + exchange);
                System.err.println("  Routing key: " + routingKey);
                System.err.println("  Body: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }

    public void publishMandatory(String exchange, String routingKey, String message)
            throws Exception {

        // mandatory=true: an unroutable message is returned to the listener above
        channel.basicPublish(exchange, routingKey, true, null,
                           message.getBytes(StandardCharsets.UTF_8));

        System.out.println("Message published (mandatory=true)");
    }

    public void publishNonMandatory(String exchange, String routingKey,
                                     String message) throws Exception {

        // mandatory=false: an unroutable message is silently dropped by the broker
        channel.basicPublish(exchange, routingKey, false, null,
                           message.getBytes(StandardCharsets.UTF_8));

        System.out.println("Message published (mandatory=false)");
    }
}

2.5 Batch Publishing

java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BatchProducer {

    private final Channel channel;
    private final int batchSize;
    private final List<String> messageBuffer = new ArrayList<>();

    public BatchProducer(Channel channel, int batchSize) throws IOException {
        this.channel = channel;
        this.batchSize = batchSize;
        // Enable confirm mode once, rather than on every flush
        channel.confirmSelect();
    }

    public void addMessage(String message) throws Exception {
        messageBuffer.add(message);

        // Flush automatically when the buffer reaches the batch size
        if (messageBuffer.size() >= batchSize) {
            flush();
        }
    }

    public void flush() throws Exception {
        if (messageBuffer.isEmpty()) {
            return;
        }

        for (String msg : messageBuffer) {
            channel.basicPublish("batch.exchange", "batch.key", null,
                               msg.getBytes(StandardCharsets.UTF_8));
        }

        // One confirm wait per batch; false means the broker nacked something.
        // The buffer is kept on failure so the caller can retry the flush.
        if (!channel.waitForConfirms()) {
            throw new IOException("Broker nacked at least one message in the batch");
        }

        System.out.println("Batch sent: " + messageBuffer.size() + " messages");

        messageBuffer.clear();
    }

    public void close() throws Exception {
        // Send whatever is left in a final, partial batch
        flush();
    }
}

class BatchProducerDemo {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare("batch.exchange", BuiltinExchangeType.DIRECT, true);
            channel.queueDeclare("batch.queue", true, false, false, null);
            channel.queueBind("batch.queue", "batch.exchange", "batch.key");

            BatchProducer producer = new BatchProducer(channel, 100);

            for (int i = 0; i < 500; i++) {
                producer.addMessage("Message #" + i);
            }

            producer.close();

            System.out.println("Batch publishing finished");
        }
    }
}
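
To see how a batch size translates into confirm round trips, the buffering rule can be modeled on its own. This is a small sketch under the assumption that messages arrive as one list; `Batches.partition` is a hypothetical helper, not part of the RabbitMQ client. It mirrors what `BatchProducer` does: every full batch is flushed as soon as it fills, and the remainder is flushed by `close()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the RabbitMQ Java client.
final class Batches {

    private Batches() {}

    // Splits items into consecutive batches of at most batchSize elements.
    // Each returned batch corresponds to one publish loop plus one confirm wait.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```

For 250 messages and a batch size of 100 this yields three batches of 100, 100, and 50 messages, so three confirm waits instead of 250. Larger batches mean fewer waits but more messages to republish when a batch fails.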

3. Code Examples

3.1 Complete Producer Example

java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
import java.util.*;

// AutoCloseable is required for the try-with-resources block in main()
public class CompleteProducer implements AutoCloseable {

    // ObjectMapper is thread-safe and costly to create, so one instance is shared
    // (requires the jackson-databind dependency)
    private static final com.fasterxml.jackson.databind.ObjectMapper MAPPER =
        new com.fasterxml.jackson.databind.ObjectMapper();

    private final Connection connection;
    private final Channel channel;
    private final String exchangeName;
    private final String queueName;
    private final String routingKey;

    public CompleteProducer(String host, int port, String username,
                           String password, String vhost) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(vhost);

        // Reconnect automatically after a network failure, retrying every 5 seconds
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);

        connection = factory.newConnection("producer-app");
        channel = connection.createChannel();

        channel.confirmSelect();

        exchangeName = "orders.exchange";
        queueName = "orders.queue";
        routingKey = "order.created";

        setupInfrastructure();
        setupConfirmListener();
    }

    private void setupInfrastructure() throws Exception {
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true);

        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 86400000);                  // 24 hours, in milliseconds
        args.put("x-dead-letter-exchange", "dlx.exchange");   // must be declared separately

        channel.queueDeclare(queueName, true, false, false, args);
        channel.queueBind(queueName, exchangeName, routingKey);

        System.out.println("Infrastructure declared");
    }

    private void setupConfirmListener() {
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) {
                System.out.println("Message acked: " + deliveryTag +
                    (multiple ? " (multiple)" : ""));
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) {
                System.err.println("Message nacked: " + deliveryTag);
            }
        });

        // Fires for messages published with mandatory=true that match no binding
        channel.addReturnListener(returnInfo -> {
            System.err.println("Message returned: " + returnInfo.getReplyText());
        });
    }

    public void publishOrderCreated(String orderId, String userId,
                                    double amount) throws Exception {

        Map<String, Object> orderData = new HashMap<>();
        orderData.put("orderId", orderId);
        orderData.put("userId", userId);
        orderData.put("amount", amount);
        orderData.put("status", "CREATED");
        orderData.put("timestamp", System.currentTimeMillis());

        String message = MAPPER.writeValueAsString(orderData);

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .contentType("application/json")
            .deliveryMode(2)
            .priority(5)
            .messageId(orderId)
            .timestamp(new Date())
            .type("order.created")
            .headers(createHeaders())
            .build();

        channel.basicPublish(exchangeName, routingKey, true, properties,
                           message.getBytes(StandardCharsets.UTF_8));

        System.out.println("Order message published: " + orderId);
    }

    private Map<String, Object> createHeaders() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-event-type", "ORDER_CREATED");
        headers.put("x-service-name", "order-service");
        headers.put("x-trace-id", UUID.randomUUID().toString());
        return headers;
    }

    @Override
    public void close() throws Exception {
        try {
            if (channel != null && channel.isOpen()) {
                // Give outstanding confirms up to 5 seconds to arrive before closing
                channel.waitForConfirms(5000);
                channel.close();
            }
        } finally {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) {
        try (CompleteProducer producer = new CompleteProducer(
                "localhost", 5672, "guest", "guest", "/")) {

            for (int i = 1; i <= 10; i++) {
                producer.publishOrderCreated(
                    "ORD-" + String.format("%05d", i),
                    "USER-" + (i % 10 + 1),
                    99.99 * i
                );
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.2 Producer with Retry

java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class RetryProducer {

    private final Channel channel;
    private final String exchange;
    private final int maxRetries;
    private final long retryDelay;

    public RetryProducer(Channel channel, String exchange,
                        int maxRetries, long retryDelay) {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be >= 1");
        }
        this.channel = channel;
        this.exchange = exchange;
        this.maxRetries = maxRetries;
        this.retryDelay = retryDelay;
    }

    // Linear backoff: waits retryDelay * attempt milliseconds between attempts
    public void publishWithRetry(String routingKey, String message) {
        int attempt = 0;
        Exception lastException = null;

        while (attempt < maxRetries) {
            try {
                channel.basicPublish(exchange, routingKey,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(StandardCharsets.UTF_8));

                // No exception only means the client accepted the message;
                // combine this with publisher confirms for a delivery guarantee
                System.out.println("Message published: " + message);
                return;

            } catch (IOException | AlreadyClosedException e) {
                // AlreadyClosedException is thrown when the channel is already closed
                lastException = e;
                attempt++;

                System.err.println("Publish failed (attempt " + attempt + "/" +
                    maxRetries + "): " + e.getMessage());

                if (attempt < maxRetries) {
                    try {
                        Thread.sleep(retryDelay * attempt);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }

        throw new RuntimeException("Publish failed after " + attempt + " attempt(s)",
            lastException);
    }

    // Exponential backoff: the delay doubles after every failed attempt
    public void publishWithRetryExponential(String routingKey, String message) {
        int attempt = 0;
        long delay = retryDelay;

        while (attempt < maxRetries) {
            try {
                channel.basicPublish(exchange, routingKey,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(StandardCharsets.UTF_8));

                return;

            } catch (IOException | AlreadyClosedException e) {
                attempt++;

                if (attempt >= maxRetries) {
                    throw new RuntimeException("Retries exhausted", e);
                }

                try {
                    Thread.sleep(delay);
                    delay *= 2;
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Retry interrupted", ie);
                }
            }
        }
    }
}
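
Unbounded doubling can produce very long sleeps, and many producers retrying in lockstep can hit a recovering broker at the same instant. One common refinement, shown here as a sketch rather than as part of the original producer, is to cap the delay and add random jitter. `Backoff` and its method names are hypothetical and do not come from the RabbitMQ client.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of the RabbitMQ Java client.
final class Backoff {

    private final long baseMillis;
    private final long maxMillis;

    Backoff(long baseMillis, long maxMillis) {
        if (baseMillis < 1 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("need 1 <= baseMillis <= maxMillis");
        }
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    // Delay before retry number `attempt` (1-based):
    // baseMillis * 2^(attempt - 1), capped at maxMillis.
    long delayFor(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        long delay = baseMillis;
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            // Doubling is checked against the cap first, which also avoids overflow
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return delay;
    }

    // Same delay with jitter: a random value between half the delay and the delay.
    long jitteredDelayFor(int attempt) {
        long delay = delayFor(attempt);
        long half = delay / 2;
        return half + ThreadLocalRandom.current().nextLong(delay - half + 1);
    }
}
```

In `publishWithRetryExponential`, the sleep could then become `Thread.sleep(backoff.jitteredDelayFor(attempt))`, with `backoff` created once, for example as `new Backoff(retryDelay, 30_000)`. The 30-second cap is an arbitrary illustrative value.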

4. Practical Scenarios

4.1 Publishing Order Events

java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class OrderEventProducer {

    // Shared, thread-safe JSON serializer (requires the jackson-databind dependency)
    private static final com.fasterxml.jackson.databind.ObjectMapper MAPPER =
        new com.fasterxml.jackson.databind.ObjectMapper();

    private final Channel channel;
    private final String exchange;

    public OrderEventProducer(Channel channel) throws Exception {
        this.channel = channel;
        this.exchange = "order.events";

        channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
    }

    public void publishOrderCreated(Map<String, Object> order) throws Exception {
        publish("order.created", order);
    }

    public void publishOrderPaid(Map<String, Object> order) throws Exception {
        publish("order.paid", order);
    }

    public void publishOrderShipped(Map<String, Object> order) throws Exception {
        publish("order.shipped", order);
    }

    public void publishOrderCancelled(Map<String, Object> order) throws Exception {
        publish("order.cancelled", order);
    }

    // The event type doubles as the routing key on the topic exchange
    private void publish(String eventType, Map<String, Object> data) throws Exception {
        String messageId = UUID.randomUUID().toString();

        // Wrap the payload in an event envelope
        Map<String, Object> event = new HashMap<>();
        event.put("eventId", messageId);
        event.put("eventType", eventType);
        event.put("data", data);
        event.put("timestamp", System.currentTimeMillis());

        String json = MAPPER.writeValueAsString(event);

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .contentType("application/json")
            .messageId(messageId)
            .type(eventType)
            .timestamp(new Date())
            .headers(createEventHeaders(eventType))
            .deliveryMode(2)
            .priority(5)
            .build();

        channel.basicPublish(exchange, eventType, props,
                             json.getBytes(StandardCharsets.UTF_8));

        System.out.println("Order event published: " + eventType);
    }

    private Map<String, Object> createEventHeaders(String eventType) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-event-type", eventType);
        headers.put("x-service", "order-service");
        headers.put("x-version", "1.0");
        headers.put("x-trace-id", UUID.randomUUID().toString());
        return headers;
    }
}

4.2 Publishing Log Messages

java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class LogProducer {

    // Shared, thread-safe JSON serializer (requires the jackson-databind dependency)
    private static final com.fasterxml.jackson.databind.ObjectMapper MAPPER =
        new com.fasterxml.jackson.databind.ObjectMapper();

    private final Channel channel;
    private final String exchange;

    public LogProducer(Channel channel) throws Exception {
        this.channel = channel;
        this.exchange = "application.logs";

        channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
    }

    public void logDebug(String service, String message) {
        publishLog("DEBUG", service, message);
    }

    public void logInfo(String service, String message) {
        publishLog("INFO", service, message);
    }

    public void logWarn(String service, String message) {
        publishLog("WARN", service, message);
    }

    public void logError(String service, String message, Throwable throwable) {
        String fullMessage = message + "\n" + getStackTrace(throwable);
        publishLog("ERROR", service, fullMessage);
    }

    private void publishLog(String level, String service, String message) {
        try {
            Map<String, Object> logData = new HashMap<>();
            logData.put("level", level);
            logData.put("service", service);
            logData.put("message", message);
            logData.put("timestamp", System.currentTimeMillis());
            logData.put("host", java.net.InetAddress.getLocalHost().getHostName());

            String json = MAPPER.writeValueAsString(logData);

            // Routing key format: <service>.<level>, for example "order-service.error"
            String routingKey = service + "." + level.toLowerCase();

            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .contentType("application/json")
                .priority(getPriority(level))   // only honored by priority queues
                .build();

            channel.basicPublish(exchange, routingKey, props,
                                 json.getBytes(StandardCharsets.UTF_8));

        } catch (Exception e) {
            // Logging must never break the application, so failures are only reported
            System.err.println("Failed to publish log message: " + e.getMessage());
        }
    }

    // Switch expressions require Java 14 or later
    private int getPriority(String level) {
        return switch (level) {
            case "ERROR" -> 9;
            case "WARN" -> 5;
            case "INFO" -> 1;
            default -> 0;
        };
    }

    private String getStackTrace(Throwable throwable) {
        // Start with the exception class and message, then one line per frame
        StringBuilder sb = new StringBuilder(String.valueOf(throwable)).append("\n");
        if (throwable != null) {
            for (StackTraceElement element : throwable.getStackTrace()) {
                sb.append(element).append("\n");
            }
        }
        return sb.toString();
    }
}
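
Because the log producer publishes to a topic exchange with routing keys of the form `<service>.<level>`, consumers select logs through binding patterns, where `*` matches exactly one word and `#` matches zero or more words. The sketch below is a standalone model of that matching rule, useful for checking a binding pattern before deploying it. `TopicMatcher` is a hypothetical class written for illustration; it is not the broker's implementation and not part of the RabbitMQ client.

```java
// Hypothetical model of topic-exchange matching, not part of the RabbitMQ client.
final class TopicMatcher {

    private TopicMatcher() {}

    // Returns true if a binding pattern would select the given routing key.
    static boolean matches(String pattern, String routingKey) {
        // Words are separated by dots; -1 keeps trailing empty words
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                  // pattern still needs a word, key has none
        }
        // '*' matches any single word; anything else must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }
}
```

For example, a queue bound with `*.error` receives error logs from every service, while `order-service.#` receives every level from one service. A service name that itself contains dots produces extra words, so `*.error` would no longer match its keys.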

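5. Common Problems and Solutions

5.1 Message Routing Failure

Problem: A published message cannot be routed to any queue.

Solution: Publish with mandatory=true and register a return listener. The broker returns unroutable messages only when the mandatory flag is set; otherwise it drops them silently.

java
channel.addReturnListener(returnInfo -> {
    System.err.println("Message returned: " + returnInfo.getReplyText());

    // 312 = NO_ROUTE: no queue is bound with a matching routing key
    if (returnInfo.getReplyCode() == 312) {
        System.err.println("Check the bindings and the routing key");
    }
});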

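5.2 Publisher Confirm Timeout

Problem: Waiting for a publisher confirm times out.

Solution: Use a timeout that fits the broker's load and handle both outcomes. `waitForConfirms(timeout)` returns false when the broker nacks a message and throws `java.util.concurrent.TimeoutException` when no confirm arrives in time.

java
try {
    // Returns false if the broker nacked any message published since the last wait
    boolean confirmed = channel.waitForConfirms(10000);

    if (!confirmed) {
        System.err.println("Broker nacked; republish or record the affected messages");
    }
} catch (TimeoutException e) {
    // No confirm arrived within 10 seconds
    System.err.println("Confirm timed out; handle the unconfirmed messages");
}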

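5.3 Queue Full

Problem: The queue has reached its maximum length and new messages are rejected.

Solution: Declare the length limit and the overflow policy explicitly. With `x-overflow` set to `reject-publish`, the broker nacks publishes that arrive while the queue is full, so a producer with confirms enabled can detect the rejection and back off or retry. The default policy, `drop-head`, discards the oldest messages instead.

java
Map<String, Object> args = new HashMap<>();
args.put("x-max-length", 10000);              // at most 10,000 messages
args.put("x-overflow", "reject-publish");     // reject new messages when full

channel.queueDeclare("my.queue", true, false, false, args);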
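
The difference between the two overflow policies is easiest to see side by side. The following toy model is only an in-memory illustration, not how the broker is implemented, and `BoundedQueueModel` is a hypothetical name. It returns `true` where a producer with confirms enabled would receive an ack and `false` where it would receive a nack.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a length-limited queue; not part of the RabbitMQ Java client.
final class BoundedQueueModel {

    enum Overflow { DROP_HEAD, REJECT_PUBLISH }

    private final Deque<String> messages = new ArrayDeque<>();
    private final int maxLength;
    private final Overflow overflow;

    BoundedQueueModel(int maxLength, Overflow overflow) {
        if (maxLength < 1) {
            throw new IllegalArgumentException("maxLength must be >= 1");
        }
        this.maxLength = maxLength;
        this.overflow = overflow;
    }

    // Returns true if the message is accepted (ack), false if rejected (nack).
    boolean publish(String message) {
        if (messages.size() >= maxLength) {
            if (overflow == Overflow.REJECT_PUBLISH) {
                return false;              // queue unchanged, publisher is told
            }
            messages.pollFirst();          // drop-head: the oldest message is discarded
        }
        messages.addLast(message);
        return true;
    }

    List<String> contents() {
        return new ArrayList<>(messages);
    }
}
```

With a limit of 2 and messages a, b, c, the `DROP_HEAD` model ends up holding b and c and accepts all three publishes, while the `REJECT_PUBLISH` model keeps a and b and rejects c. Under `drop-head` the producer never learns that a message was lost, which is why `reject-publish` pairs naturally with publisher confirms.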

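6. Best Practices

6.1 Publishing Best Practices

text
Best practices:
├── Use publisher confirms to make publishing reliable
├── Set mandatory=true to detect routing failures
├── Use message properties deliberately
├── Use batch publishing to raise throughput
├── Implement retries to handle transient failures
└── Monitor the publish failure rate

6.2 Performance Tuning

text
Performance tuning:
├── Publish messages in batches
├── Reuse connections and channels
├── Set a sensible prefetch count (consumer side)
├── Use asynchronous confirms
└── Keep messages small

6.3 Reliability

text
Reliability:
├── Message persistence
├── Publisher confirms
├── Alternate exchanges
├── Dead letter queues
└── Message tracing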
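
Alternate exchanges and dead lettering are both configured through declaration arguments. As a sketch, the helper below builds those argument maps using the argument keys RabbitMQ recognizes (`alternate-exchange`, `x-dead-letter-exchange`, `x-dead-letter-routing-key`). The class name `ReliabilityArguments` and the exchange names in the usage note are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the RabbitMQ Java client.
final class ReliabilityArguments {

    private ReliabilityArguments() {}

    // Exchange argument: unroutable messages are forwarded to this exchange.
    static Map<String, Object> alternateExchange(String alternateExchangeName) {
        Map<String, Object> args = new HashMap<>();
        args.put("alternate-exchange", alternateExchangeName);
        return args;
    }

    // Queue arguments: expired or rejected messages are republished to the
    // dead letter exchange with the given routing key.
    static Map<String, Object> deadLetter(String deadLetterExchange, String routingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", routingKey);
        return args;
    }
}
```

The maps would be passed as the last argument of the declarations, for example `channel.exchangeDeclare("orders.exchange", BuiltinExchangeType.TOPIC, true, false, ReliabilityArguments.alternateExchange("unrouted.exchange"))` and `channel.queueDeclare("orders.queue", true, false, false, ReliabilityArguments.deadLetter("dlx.exchange", "orders.dead"))`. The alternate and dead letter exchanges themselves still need to be declared and bound to a queue. A message that is routed through an alternate exchange counts as routed, so it is not returned to a mandatory publisher.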

7. Related Links