
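Introduction to the Java Client Library

1. Overview

The RabbitMQ Java client library is the official AMQP 0-9-1 client for Java, maintained by the RabbitMQ team. It is the standard library for talking to RabbitMQ from Java applications.

1.1 Library Overview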

text
┌─────────────────────────────────────────────────────────────────────┐
│                        RabbitMQ Java Client                         │
├─────────────────────────────────────────────────────────────────────┤
│  Name:           RabbitMQ Java Client                               │
│  Maintainer:     RabbitMQ (VMware)                                  │
│  Protocol:       AMQP 0-9-1                                         │
│  License:        ASL 2.0 / MPL 2.0 / GPL v2                         │
│  Repository:     https://github.com/rabbitmq/rabbitmq-java-client   │
│  Latest version: 5.x series                                         │
└─────────────────────────────────────────────────────────────────────┘

1.2 Version Requirements

Java Client version    Required Java version    RabbitMQ version
5.x                    Java 8+                  3.x
4.x                    Java 6+                  3.x
3.x                    Java 6+                  2.x

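1.3 Key Features

text
Core features:
├── Full AMQP 0-9-1 protocol support
├── Automatic connection recovery
├── Channel-level flow control
├── Publisher confirms
├── Consumer cancellation notifications
├── TLS/SSL support
├── SASL authentication
└── Topology recovery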

2. Core Concepts

2.1 Dependency Configuration

2.1.1 Maven Configuration

xml
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.18.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.7</version>
    </dependency>
    
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.4.8</version>
    </dependency>
</dependencies>

2.1.2 Gradle Configuration

groovy
dependencies {
    implementation 'com.rabbitmq:amqp-client:5.18.0'
    implementation 'org.slf4j:slf4j-api:2.0.7'
    implementation 'ch.qos.logback:logback-classic:1.4.8'
}

2.2 Core Classes

2.2.1 Class Diagram

text
┌─────────────────────────────────────────────────────────────────────┐
│                      Java Client Core Classes                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────┐                                           │
│  │  ConnectionFactory  │  Connection factory; creates connections   │
│  └──────────┬──────────┘                                           │
│             │                                                       │
│             ▼                                                       │
│  ┌─────────────────────┐                                           │
│  │     Connection      │  TCP connection; creates channels          │
│  └──────────┬──────────┘                                           │
│             │                                                       │
│             ▼                                                       │
│  ┌─────────────────────┐                                           │
│  │      Channel        │  Channel; performs AMQP operations         │
│  └──────────┬──────────┘                                           │
│             │                                                       │
│     ┌───────┴───────┬───────────────┐                              │
│     ▼               ▼               ▼                              │
│  ┌────────┐   ┌──────────┐   ┌──────────┐                         │
│  │Exchange│   │  Queue   │   │ Consumer │                         │
│  └────────┘   └──────────┘   └──────────┘                         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.2.2 ConnectionFactory

java
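import com.rabbitmq.client.ConnectionFactory;

ConnectionFactory factory = new ConnectionFactory();

// Broker address and credentials
factory.setHost("localhost");
factory.setPort(5672);               // default AMQP port without TLS
factory.setUsername("guest");        // by default "guest" may only connect from localhost
factory.setPassword("guest");
factory.setVirtualHost("/");

factory.setConnectionTimeout(30000);        // TCP connect timeout, in milliseconds
factory.setHandshakeTimeout(10000);         // AMQP handshake timeout, in milliseconds
factory.setRequestedHeartbeat(60);          // requested heartbeat interval, in seconds
factory.setAutomaticRecoveryEnabled(true);  // reconnect after network failures
factory.setNetworkRecoveryInterval(5000);   // delay between recovery attempts, in milliseconds
factory.setTopologyRecoveryEnabled(true);   // re-declare exchanges, queues, bindings, and consumers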
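
The settings above are often supplied as a single AMQP URI instead (ConnectionFactory also offers a setUri method for this). The following JDK-only sketch shows how such a URI maps onto host, port, credentials, and virtual host. The class name `AmqpUriParts` and its fields are hypothetical, and the defaults assume the usual RabbitMQ URI conventions: port 5672 for `amqp`, port 5671 for `amqps`, and a missing path meaning "use the client's default virtual host".

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;

/** Hypothetical helper: splits an AMQP URI into the values passed to ConnectionFactory setters. */
final class AmqpUriParts {
    final String host;
    final int port;
    final String username;     // null when the URI carries no user info
    final String password;     // null when the URI carries no password
    final String virtualHost;  // null when the URI has no path
    final boolean tls;

    private AmqpUriParts(String host, int port, String username, String password,
                         String virtualHost, boolean tls) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
        this.tls = tls;
    }

    static AmqpUriParts parse(String uriString) {
        URI uri;
        try {
            uri = new URI(uriString);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Not a valid URI: " + uriString, e);
        }
        boolean tls;
        if ("amqp".equalsIgnoreCase(uri.getScheme())) {
            tls = false;
        } else if ("amqps".equalsIgnoreCase(uri.getScheme())) {
            tls = true;
        } else {
            throw new IllegalArgumentException("Unsupported scheme: " + uri.getScheme());
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("URI has no host: " + uriString);
        }
        // Fall back to the conventional port for the scheme when none is given
        int port = uri.getPort() != -1 ? uri.getPort() : (tls ? 5671 : 5672);

        String username = null;
        String password = null;
        String userInfo = uri.getRawUserInfo();
        if (userInfo != null) {
            String[] parts = userInfo.split(":", 2);
            username = decode(parts[0]);
            if (parts.length == 2) {
                password = decode(parts[1]);
            }
        }

        // The virtual host is the percent-decoded path without its leading slash,
        // so "/%2f" denotes the virtual host named "/"
        String virtualHost = null;
        String rawPath = uri.getRawPath();
        if (rawPath != null && !rawPath.isEmpty()) {
            virtualHost = decode(rawPath.substring(1));
        }
        return new AmqpUriParts(uri.getHost(), port, username, password, virtualHost, tls);
    }

    private static String decode(String s) {
        try {
            // URLDecoder turns '+' into a space, so protect literal plus signs first
            return URLDecoder.decode(s.replace("+", "%2B"), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }
}
```

For example, `AmqpUriParts.parse("amqp://guest:guest@localhost:5672/%2f")` yields the same host, port, credentials, and virtual host that the setter calls above configure one by one.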

2.2.3 Connection

java
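import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Map;

// Option 1: open a connection without a name
// Connection connection = factory.newConnection();

// Option 2: give the connection a client-provided name, which makes it easy to identify
String clientProvidedName = "my-app-producer";
Connection connection = factory.newConnection(clientProvidedName);

// Channels are created from an open connection
Channel channel = connection.createChannel();

boolean isOpen = connection.isOpen();
Map<String, Object> serverProps = connection.getServerProperties();
connection.close();  // also closes the channels opened on this connection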

2.2.4 Channel

java
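import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;

Channel channel = connection.createChannel();

// Declare a durable direct exchange
channel.exchangeDeclare("my.exchange", BuiltinExchangeType.DIRECT, true);

// Declare a queue: durable = true, exclusive = false, autoDelete = false, no extra arguments
channel.queueDeclare("my.queue", true, false, false, null);

// Bind the queue to the exchange with a routing key
channel.queueBind("my.queue", "my.exchange", "routing.key");

// Publish with default (null) message properties
String message = "Hello, RabbitMQ";
channel.basicPublish("my.exchange", "routing.key", null, message.getBytes(StandardCharsets.UTF_8));

channel.close();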

2.3 Message Properties

2.3.1 AMQP.BasicProperties

java
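import com.rabbitmq.client.AMQP;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

// Custom headers (a HashMap instead of Map.of keeps the example compatible with Java 8)
Map<String, Object> headers = new HashMap<>();
headers.put("key1", "value1");
headers.put("key2", "value2");

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .contentType("application/json")
    .contentEncoding("UTF-8")
    .headers(headers)
    .deliveryMode(2)               // 2 = persistent, 1 = transient
    .priority(1)
    .correlationId("corr-id-123")
    .replyTo("reply.queue")
    .expiration("60000")           // per-message TTL in milliseconds, passed as a string
    .messageId("msg-id-123")
    .timestamp(new Date())
    .type("order.created")
    .userId("guest")               // must match the connection's user name, or the broker rejects the publish
    .appId("app-001")
    .build();

channel.basicPublish(exchange, routingKey, props, messageBody.getBytes(StandardCharsets.UTF_8));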
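
The `expiration` property is a string that holds the message TTL in milliseconds, which is easy to get wrong when durations are handled as numbers elsewhere in the code. A minimal sketch of a conversion helper follows; the class `MessageTtl` and its method are hypothetical names, not part of the client library.

```java
import java.time.Duration;

/** Hypothetical helper for building the value passed to the expiration property. */
final class MessageTtl {
    private MessageTtl() {}

    /** Converts a TTL into the decimal millisecond string that the expiration property carries. */
    static String toExpiration(Duration ttl) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative: " + ttl);
        }
        return Long.toString(ttl.toMillis());
    }
}
```

With this helper, `MessageTtl.toExpiration(Duration.ofSeconds(60))` produces the same `"60000"` value used in the builder call above.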

2.3.2 Predefined Properties

java
import com.rabbitmq.client.MessageProperties;

// Persistent, content type text/plain
channel.basicPublish(exchange, routingKey, 
    MessageProperties.PERSISTENT_TEXT_PLAIN, 
    messageBody.getBytes());

// Persistent, content type application/octet-stream
channel.basicPublish(exchange, routingKey, 
    MessageProperties.PERSISTENT_BASIC, 
    messageBody.getBytes());

// Persistent, no other properties set
channel.basicPublish(exchange, routingKey, 
    MessageProperties.MINIMAL_PERSISTENT_BASIC, 
    messageBody.getBytes());

// No properties set at all (the message is transient)
channel.basicPublish(exchange, routingKey, 
    MessageProperties.MINIMAL_BASIC, 
    messageBody.getBytes());

2.4 Consumer Modes

2.4.1 Push Mode

java
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

// Invoked by the client library for every message the broker pushes to this consumer
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println("Received message: " + message);
    
    // Acknowledge this single message (multiple = false)
    long deliveryTag = delivery.getEnvelope().getDeliveryTag();
    channel.basicAck(deliveryTag, false);
};

boolean autoAck = false;  // use manual acknowledgements
// The cancel callback's parameter must not reuse the name of the variable being declared
String consumerTag = channel.basicConsume(queueName, autoAck, deliverCallback, cancelledTag -> {});

// Stop consuming
channel.basicCancel(consumerTag);

2.4.2 Pull Mode

java
import com.rabbitmq.client.GetResponse;
import java.nio.charset.StandardCharsets;

boolean autoAck = false;
// basicGet fetches at most one message and returns null when the queue is empty
GetResponse response = channel.basicGet(queueName, autoAck);

if (response != null) {
    String message = new String(response.getBody(), StandardCharsets.UTF_8);
    System.out.println("Fetched message: " + message);
    
    long deliveryTag = response.getEnvelope().getDeliveryTag();
    channel.basicAck(deliveryTag, false);
} else {
    System.out.println("Queue is empty");
}

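2.5 Automatic Recovery

text
Automatic recovery:
┌─────────────────────────────────────────────────────────────────────┐
│                                                                     │
│  Connection recovery:                                               │
│  ├── Network failure detection                                      │
│  ├── Automatic reconnection                                         │
│  ├── Configurable retry interval                                    │
│  └── Connection state listeners                                     │
│                                                                     │
│  Topology recovery:                                                 │
│  ├── Exchanges re-declared                                          │
│  ├── Queues re-declared                                             │
│  ├── Bindings restored                                              │
│  └── Consumers re-registered                                        │
│                                                                     │
│  Configuration:                                                     │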
│  ├── setAutomaticRecoveryEnabled(true)                             │
│  ├── setNetworkRecoveryInterval(5000)                              │
│  └── setTopologyRecoveryEnabled(true)                              │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
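
Automatic recovery only applies to a connection that was established successfully; if the very first connection attempt fails, the application has to retry on its own. The sketch below shows one way to do that with a fixed delay between attempts, mirroring the fixed recovery interval configured above. The class `FixedDelayRetry` is a hypothetical helper that uses only the JDK, and the attempt count and delay are illustrative values.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries an action a limited number of times with a fixed delay. */
final class FixedDelayRetry {
    private FixedDelayRetry() {}

    /**
     * Calls the action until it succeeds or maxAttempts is reached.
     * The last failure is attached as the cause when every attempt fails.
     */
    static <T> T call(Callable<T> action, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(delayMillis);  // wait before the next attempt
                }
            }
        }
        throw new IllegalStateException("All " + maxAttempts + " attempts failed", last);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and stop retrying
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

Assuming the `factory` configured earlier, the initial connection could then be opened with `FixedDelayRetry.call(() -> factory.newConnection(), 5, 5000)`; once that call returns, the client's own recovery takes over.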

3. Code Examples

3.1 Basic Connection Example

java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;

public class RabbitMQConnectionDemo {
    
    private Connection connection;
    private Channel channel;
    
    public void connect() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        
        factory.setConnectionTimeout(30000);
        factory.setHandshakeTimeout(10000);
        factory.setRequestedHeartbeat(60);
        
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setTopologyRecoveryEnabled(true);
        
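        // The connection name helps to identify this client on the broker side
        connection = factory.newConnection("demo-connection");
        channel = connection.createChannel();
        
        System.out.println("Connection established");
        System.out.println("Server properties: " + connection.getServerProperties());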
    }
    
    public void close() throws Exception {
        // Close the channel first, then the connection that owns it
        if (channel != null && channel.isOpen()) {
            channel.close();
        }
        if (connection != null && connection.isOpen()) {
            connection.close();
        }
        System.out.println("Connection closed");
    }
    
    public static void main(String[] args) {
        RabbitMQConnectionDemo demo = new RabbitMQConnectionDemo();
        try {
            demo.connect();
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                demo.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

3.2 TLS Connection Example

java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.security.KeyStore;

public class TLSConnectionDemo {
    
    public Connection createTLSConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        factory.setPort(5671);
        factory.setUsername("guest");
        factory.setPassword("guest");
        
        // Client certificate and private key (PKCS#12)
        char[] keyPassphrase = "password".toCharArray();
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        try (FileInputStream in = new FileInputStream("/path/to/client-key.p12")) {
            keyStore.load(in, keyPassphrase);
        }
        
        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
        kmf.init(keyStore, keyPassphrase);
        
        // Trusted CA certificates used to verify the broker
        char[] trustPassphrase = "password".toCharArray();
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream("/path/to/trust-store.jks")) {
            trustStore.load(in, trustPassphrase);
        }
        
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
        tmf.init(trustStore);
        
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        
        factory.useSslProtocol(sslContext);
        
        // Takes no arguments; checks that the server certificate matches the host name
        factory.enableHostnameVerification();
        
        return factory.newConnection();
    }
    
    public Connection createSimpleTLSConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        factory.setPort(5671);
        factory.setUsername("guest");
        factory.setPassword("guest");
        
        // Development only: the no-argument form trusts every server certificate
        factory.useSslProtocol();
        
        return factory.newConnection();
    }
}

3.3 Connection State Listeners

java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.Channel;

public class ConnectionListenerDemo {
    
    private Connection connection;
    private Channel channel;
    
    public void connectWithListeners() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        
        connection = factory.newConnection();
        channel = connection.createChannel();
        
        // Notified when the connection is closed, whatever the reason
        connection.addShutdownListener(new ShutdownListener() {
            @Override
            public void shutdownCompleted(ShutdownSignalException cause) {
                System.out.println("Connection closed: " + cause.getReason());
                if (cause.isInitiatedByApplication()) {
                    System.out.println("Closed by the application");
                } else {
                    System.out.println("Closed by the server or because of a network problem");
                }
            }
        });
        
        // Connections created with automatic recovery enabled implement Recoverable
        if (connection instanceof Recoverable) {
            Recoverable recoverable = (Recoverable) connection;
            recoverable.addRecoveryListener(new RecoveryListener() {
                @Override
                public void handleRecovery(Recoverable recoverable) {
                    System.out.println("Connection recovered");
                }
                
                @Override
                public void handleRecoveryStarted(Recoverable recoverable) {
                    System.out.println("Connection recovery started...");
                }
            });
        }
        
        channel.addShutdownListener(cause -> {
            System.out.println("Channel closed: " + cause.getReason());
        });
        
        System.out.println("Connection established, listeners registered");
    }
    
    public void close() throws Exception {
        if (channel != null && channel.isOpen()) {
            channel.close();
        }
        if (connection != null && connection.isOpen()) {
            connection.close();
        }
    }
    
    public static void main(String[] args) throws Exception {
        ConnectionListenerDemo demo = new ConnectionListenerDemo();
        demo.connectWithListeners();
        
        Thread.sleep(5000);
        
        demo.close();
    }
}

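4. Practical Use Cases

4.1 Microservice Communication

text
Scenario: asynchronous communication between microservices

Architecture:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│Order Service│     │  Inventory  │     │Notification │
│ (Producer)  │     │ (Consumer)  │     │ (Consumer)  │
└──────┬──────┘     └──────┬──────┘     └──────┬──────┘
       │                   │                   │
       │             AMQP protocol             │
       └───────────────────┼───────────────────┘
                           │
                  ┌────────┴────────┐
                  │    RabbitMQ     │
                  └─────────────────┘

Java client roles:
├── Order service: producer; publishes order messages
├── Inventory service: consumer; processes inventory changes
└── Notification service: consumer; sends notifications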

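4.2 Task Queues

text
Scenario: asynchronous task processing

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Web App   │ ──► │ Task Queue  │ ──► │   Worker    │
│ (Producer)  │     │   (Queue)   │     │ (Consumer)  │
└─────────────┘     └─────────────┘     └─────────────┘

Characteristics:
├── Decoupling: the web app can respond quickly
├── Peak shaving: traffic spikes are smoothed out
├── Scalability: add workers to increase throughput
└── Reliability: message persistence and acknowledgements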

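5. Common Problems and Solutions

5.1 Connection Timeouts

Problem: connecting to RabbitMQ times out.

Solution

java
ConnectionFactory factory = new ConnectionFactory();
factory.setConnectionTimeout(30000);  // TCP connect timeout, in milliseconds
factory.setHandshakeTimeout(10000);   // AMQP handshake timeout, in milliseconds
// ConnectionFactory has no setSocketTimeout method; the timeout for synchronous
// channel operations is set with setChannelRpcTimeout (in milliseconds)
factory.setChannelRpcTimeout(30000);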
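
Before raising timeouts, it can help to check whether the broker port is reachable at all, since a firewall or a wrong host name produces the same symptom. The following is a minimal JDK-only probe; `TcpProbe` is a hypothetical diagnostic helper and does not speak AMQP, so a successful result only shows that something accepts TCP connections on that port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostic helper, independent of the RabbitMQ client. */
final class TcpProbe {
    private TcpProbe() {}

    /** Returns true if a TCP connection to host:port can be opened within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host name could not be resolved
            return false;
        }
    }
}
```

A call such as `TcpProbe.canConnect("localhost", 5672, 3000)` separates network-level problems from slow AMQP handshakes: if it returns false, adjusting the client timeouts is unlikely to help.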

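5.2 Connection Leaks

Problem: connections that are not closed properly leak resources.

Solution

java
// try-with-resources closes the channel first and then the connection,
// even when the body throws an exception
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
    // declare, publish, or consume here
} catch (Exception e) {
    e.printStackTrace();
}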

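5.3 Thread Safety

Problem: sharing one Channel between threads is unsafe; concurrent publishing on a shared channel can interleave frames on the wire.

Solution

java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class ChannelManager {
    private final Connection connection;
    // One Channel per thread, so a channel is never used by two threads at once
    private final ThreadLocal<Channel> channelThreadLocal = new ThreadLocal<>();
    
    public ChannelManager(Connection connection) {
        this.connection = connection;
    }
    
    public Channel getChannel() throws Exception {
        Channel channel = channelThreadLocal.get();
        // Create the channel lazily and replace one that has been closed
        if (channel == null || !channel.isOpen()) {
            channel = connection.createChannel();
            channelThreadLocal.set(channel);
        }
        return channel;
    }
}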
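
An alternative to one channel per thread is to confine a single channel to one dedicated worker thread and route every operation through it. The sketch below shows the confinement pattern with JDK classes only; `SingleThreadConfined` and its nested `Task` interface are hypothetical names, and the wrapped resource is generic so the pattern can be tried without a broker.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical wrapper: every use of the wrapped resource runs on one dedicated thread. */
final class SingleThreadConfined<R> implements AutoCloseable {

    /** A unit of work against the confined resource; may throw checked exceptions. */
    interface Task<R, T> {
        T run(R resource) throws Exception;
    }

    private final R resource;
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    SingleThreadConfined(R resource) {
        this.resource = resource;
    }

    /** Runs the task on the worker thread and blocks until its result is available. */
    <T> T call(Task<R, T> task) {
        try {
            return worker.submit(() -> task.run(resource)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the worker", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        }
    }

    /** Stops accepting new tasks; tasks already submitted still run. */
    @Override
    public void close() {
        worker.shutdown();
    }
}
```

With the real client, the resource would be a Channel, for example `new SingleThreadConfined<>(connection.createChannel())`, and a publish would look like `confined.call(ch -> { ch.basicPublish(exchange, routingKey, null, body); return null; })`. The trade-off is that all publishing is serialized through one thread, which keeps ordering simple but caps throughput.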

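6. Best Practices

6.1 Connection Management

text
Best practices:
├── Manage connections with a connection pool or a singleton
├── Use a separate Channel for each thread
├── Enable automatic recovery
├── Set a reasonable heartbeat interval
└── Use try-with-resources to make sure resources are released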

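6.2 Performance Tuning

text
Performance recommendations:
├── Publish messages in batches
├── Use asynchronous publisher confirms
├── Set a sensible QoS prefetch count
├── Avoid opening and closing connections frequently
└── Process messages with a thread pool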
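
Asynchronous publisher confirms require the application to remember which messages are still unconfirmed, because the broker may confirm a single message or every message up to a sequence number at once. The sketch below shows that bookkeeping with a sorted concurrent map. `OutstandingConfirms` is a hypothetical class; its `confirm` method mirrors the (sequence number, multiple) pair that a confirm callback receives.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical bookkeeping for asynchronous publisher confirms. */
final class OutstandingConfirms {
    // Sorted by sequence number so that "everything up to N" can be cleared in one step
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Records a message under its publish sequence number, before it is published. */
    void track(long sequenceNumber, String body) {
        pending.put(sequenceNumber, body);
    }

    /** Handles a confirm: one message, or all messages up to and including sequenceNumber. */
    void confirm(long sequenceNumber, boolean multiple) {
        if (multiple) {
            pending.headMap(sequenceNumber, true).clear();
        } else {
            pending.remove(sequenceNumber);
        }
    }

    /** Returns true while the given sequence number is still waiting for a confirm. */
    boolean isPending(long sequenceNumber) {
        return pending.containsKey(sequenceNumber);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With the real client, the sequence number would come from `channel.getNextPublishSeqNo()` after `channel.confirmSelect()`, and `confirm` would be called from the callbacks registered through `channel.addConfirmListener`. Messages that are negatively acknowledged could be looked up in the same map and republished.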

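6.3 Exception Handling

text
Exception handling recommendations:
├── Catch and handle every exception that can occur
├── Implement a retry mechanism
├── Log failures in detail
├── Monitor connection state
└── Implement graceful shutdown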
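
Graceful shutdown mostly comes down to closing resources in the reverse order of their creation (channels before the connection) and not letting one failed close prevent the others. The sketch below is one possible shape for that, using only the JDK; `GracefulCloser` is a hypothetical helper and works with any AutoCloseable.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical helper: closes registered resources in reverse order of registration. */
final class GracefulCloser {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    /** Registers a resource and returns it, so the call can wrap the creation expression. */
    synchronized <T extends AutoCloseable> T register(T resource) {
        resources.push(resource);
        return resource;
    }

    /** Closes everything, newest first, and returns the failures instead of stopping at the first. */
    synchronized List<Exception> closeAll() {
        List<Exception> failures = new ArrayList<>();
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                failures.add(e);
            }
        }
        return failures;
    }

    /** Arranges for closeAll() to run when the JVM shuts down. */
    void installShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::closeAll, "graceful-closer"));
    }
}
```

In an application, the connection would be registered first and its channels afterwards, so that `closeAll()` closes the channels before the connection. The returned list gives the caller a place to log close failures.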

7. Related Links