Skip to content

Java 连接管理

一、概述

连接管理是 RabbitMQ Java 客户端编程的基础。正确管理连接对于构建高性能、可靠的消息系统至关重要。本文详细介绍 Java 客户端的连接创建、配置、恢复和关闭等操作。

1.1 连接层次结构

text
┌─────────────────────────────────────────────────────────────────────┐
│                      连接层次结构                                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Application                               │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │               ConnectionFactory                              │   │
│  │                    (连接工厂)                                 │   │
│  └──────────────────────────────┬──────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                   Connection                                 │   │
│  │                  (TCP 连接)                                   │   │
│  │                                                              │   │
│  │   ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐      │   │
│  │   │ Channel1 │ │ Channel2 │ │ Channel3 │ │ ChannelN │      │   │
│  │   └──────────┘ └──────────┘ └──────────┘ └──────────┘      │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                 │                                   │
│                                 ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                   RabbitMQ Server                            │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 连接生命周期

text
连接生命周期:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   创建工厂   │ ──► │   建立连接   │ ──► │   使用连接   │ ──► │   关闭连接   │
│ Factory     │     │ Connect     │     │   Active    │     │   Close     │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │                   │
       ▼                   ▼                   ▼                   ▼
 配置参数            TCP 握手            创建 Channel        释放资源
 设置认证            SASL 认证           执行操作            关闭 Channel
 启用恢复            协商参数            收发消息            断开 TCP

二、核心知识点

2.1 ConnectionFactory 配置

2.1.1 基础配置

java
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionConfig {
    
    public static ConnectionFactory createBasicFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        
        return factory;
    }
    
    public static ConnectionFactory createFactoryFromUri() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        
        String uri = "amqp://guest:guest@localhost:5672/my_vhost";
        factory.setUri(uri);
        
        return factory;
    }
    
    public static ConnectionFactory createFactoryFromProperties() {
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.load("rabbitmq.properties");
        
        return factory;
    }
}

2.1.2 连接参数配置

java
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionParameters {
    
    public static ConnectionFactory configureParameters() {
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setConnectionTimeout(30000);
        factory.setHandshakeTimeout(10000);
        factory.setShutdownTimeout(10000);
        factory.setSocketTimeout(30000);
        
        factory.setRequestedFrameMax(131072);
        factory.setRequestedChannelMax(2047);
        factory.setRequestedHeartbeat(60);
        
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setTopologyRecoveryEnabled(true);
        
        factory.setExceptionHandler(new CustomExceptionHandler());
        
        return factory;
    }
}

2.1.3 配置参数说明

text
┌─────────────────────────────────────────────────────────────────────┐
│                    ConnectionFactory 配置参数                        │
├────────────────────────┬────────────────────────────────────────────┤
│         参数           │                    说明                     │
├────────────────────────┼────────────────────────────────────────────┤
│  host                  │ RabbitMQ 服务器地址                         │
│  port                  │ RabbitMQ 服务器端口(默认 5672)            │
│  username              │ 用户名                                     │
│  password              │ 密码                                       │
│  virtualHost           │ 虚拟主机(默认 /)                         │
├────────────────────────┼────────────────────────────────────────────┤
│  connectionTimeout     │ 连接超时时间(毫秒)                        │
│  handshakeTimeout      │ 握手超时时间(毫秒)                        │
│  shutdownTimeout       │ 关闭超时时间(毫秒)                        │
│  socketTimeout         │ Socket 超时时间(毫秒)                    │
├────────────────────────┼────────────────────────────────────────────┤
│  requestedFrameMax     │ 最大帧大小(字节)                         │
│  requestedChannelMax   │ 最大通道数                                 │
│  requestedHeartbeat    │ 心跳间隔(秒)                             │
├────────────────────────┼────────────────────────────────────────────┤
│  automaticRecovery     │ 自动恢复开关                               │
│  networkRecoveryInterval│ 网络恢复间隔(毫秒)                      │
│  topologyRecovery      │ 拓扑恢复开关                               │
└────────────────────────┴────────────────────────────────────────────┘

2.2 创建连接

2.2.1 单连接创建

java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SingleConnectionDemo {
    
    private Connection connection;
    
    public void connect() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        connection = factory.newConnection();
        
        connection = factory.newConnection("my-application");
        
        System.out.println("连接已建立: " + connection);
        System.out.println("服务端属性: " + connection.getServerProperties());
    }
    
    public void close() throws Exception {
        if (connection != null && connection.isOpen()) {
            connection.close();
            System.out.println("连接已关闭");
        }
    }
}

2.2.2 多地址连接

java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Arrays;
import java.util.List;

public class MultiHostConnectionDemo {
    
    public Connection connectToCluster() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        
        List<Address> addresses = Arrays.asList(
            new Address("rabbitmq1.example.com", 5672),
            new Address("rabbitmq2.example.com", 5672),
            new Address("rabbitmq3.example.com", 5672)
        );
        
        Connection connection = factory.newConnection(addresses, "cluster-client");
        
        System.out.println("连接到: " + connection.getAddress());
        
        return connection;
    }
}

2.2.3 使用 Address 数组

java
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class AddressArrayDemo {
    
    public Connection connectWithAddressResolver() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        
        Address[] addresses = {
            new Address("node1.rabbitmq.local", 5672),
            new Address("node2.rabbitmq.local", 5672),
            new Address("node3.rabbitmq.local", 5672)
        };
        
        return factory.newConnection(addresses);
    }
    
    public Connection connectWithResolver() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        
        AddressResolver resolver = new ListAddressResolver(
            Arrays.asList(
                new Address("node1.rabbitmq.local", 5672),
                new Address("node2.rabbitmq.local", 5672)
            )
        );
        
        return factory.newConnection(resolver);
    }
}

2.3 自动恢复机制

2.3.1 配置自动恢复

java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;

public class AutoRecoveryDemo {
    
    private Connection connection;
    
    public void setupAutoRecovery() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setTopologyRecoveryEnabled(true);
        
        connection = factory.newConnection();
        
        if (connection instanceof Recoverable) {
            Recoverable recoverable = (Recoverable) connection;
            
            recoverable.addRecoveryListener(new RecoveryListener() {
                @Override
                public void handleRecoveryStarted(Recoverable recoverable) {
                    System.out.println("开始恢复连接...");
                }
                
                @Override
                public void handleRecovery(Recoverable recoverable) {
                    System.out.println("连接恢复完成");
                }
            });
        }
    }
}

2.3.2 恢复机制详解

text
自动恢复机制:

┌─────────────────────────────────────────────────────────────────────┐
│                      自动恢复流程                                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  正常状态 ──────► 连接断开 ──────► 检测断开 ──────► 开始恢复        │
│     │                │               │                │             │
│     │                │               │                ▼             │
│     │                │               │        ┌──────────────┐      │
│     │                │               │        │ 重连 TCP     │      │
│     │                │               │        └──────┬───────┘      │
│     │                │               │               │              │
│     │                │               │               ▼              │
│     │                │               │        ┌──────────────┐      │
│     │                │               │        │ 重新认证     │      │
│     │                │               │        └──────┬───────┘      │
│     │                │               │               │              │
│     │                │               │               ▼              │
│     │                │               │        ┌──────────────┐      │
│     │                │               │        │ 恢复通道     │      │
│     │                │               │        └──────┬───────┘      │
│     │                │               │               │              │
│     │                │               │               ▼              │
│     │                │               │        ┌──────────────┐      │
│     │                │               │        │ 恢复消费者   │      │
│     │                │               │        └──────┬───────┘      │
│     │                │               │               │              │
│     │                │               │               ▼              │
│     │                │               │        ┌──────────────┐      │
│     │                │               │        │ 恢复完成     │      │
│     │                │               │        └──────┬───────┘      │
│     │                │               │               │              │
│     ◄────────────────────────────────────────────────┘              │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.4 连接状态监听

2.4.1 ShutdownListener

java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

public class ShutdownListenerDemo {
    
    public void setupShutdownListener() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        Connection connection = factory.newConnection();
        
        connection.addShutdownListener(new ShutdownListener() {
            @Override
            public void shutdownCompleted(ShutdownSignalException cause) {
                System.out.println("连接关闭原因: " + cause.getReason());
                System.out.println("是否由应用发起: " + cause.isInitiatedByApplication());
                System.out.println("是否为硬错误: " + cause.isHardError());
                
                if (!cause.isInitiatedByApplication()) {
                    System.err.println("异常关闭,需要处理!");
                }
            }
        });
    }
    
    public void setupChannelShutdownListener(Channel channel) {
        channel.addShutdownListener(cause -> {
            System.out.println("通道关闭: " + cause.getMessage());
            System.out.println("通道编号: " + cause.getChannelNumber());
        });
    }
}

2.4.2 BlockedListener

java
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class BlockedListenerDemo {
    
    public void setupBlockedListener() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        Connection connection = factory.newConnection();
        
        connection.addBlockedListener(new BlockedListener() {
            @Override
            public void handleBlocked(String reason) throws IOException {
                System.out.println("连接被阻塞: " + reason);
            }
            
            @Override
            public void handleUnblocked() throws IOException {
                System.out.println("连接解除阻塞");
            }
        });
    }
}

2.5 优雅关闭

java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownDemo {
    
    private Connection connection;
    private volatile boolean running = true;
    
    public void start() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            shutdown();
        }));
    }
    
    public void shutdown() {
        System.out.println("开始优雅关闭...");
        running = false;
        
        try {
            if (connection != null && connection.isOpen()) {
                connection.close(2000);
                System.out.println("连接已关闭");
            }
        } catch (Exception e) {
            System.err.println("关闭连接时出错: " + e.getMessage());
            try {
                connection.abort();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
    
    public void closeWithTimeout(Channel channel) throws Exception {
        channel.close();
        connection.close(5000);
    }
    
    public void closeAbruptly() {
        try {
            connection.abort();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

三、代码示例

3.1 连接管理器

java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RabbitMQConnectionManager {
    
    private final ConnectionFactory factory;
    private Connection connection;
    private final ConcurrentHashMap<Long, Channel> channels = new ConcurrentHashMap<>();
    private final AtomicInteger channelCounter = new AtomicInteger(0);
    
    public RabbitMQConnectionManager(String host, int port, String username, String password, String vhost) {
        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(vhost);
        
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setTopologyRecoveryEnabled(true);
        factory.setRequestedHeartbeat(60);
    }
    
    public synchronized Connection getConnection() throws Exception {
        if (connection == null || !connection.isOpen()) {
            connection = factory.newConnection("connection-manager");
            setupConnectionListeners();
        }
        return connection;
    }
    
    private void setupConnectionListeners() {
        connection.addShutdownListener(new ShutdownListener() {
            @Override
            public void shutdownCompleted(ShutdownSignalException cause) {
                System.out.println("[Connection] 关闭: " + cause.getReason());
                if (!cause.isInitiatedByApplication()) {
                    System.err.println("[Connection] 异常关闭!");
                }
            }
        });
        
        if (connection instanceof Recoverable) {
            Recoverable recoverable = (Recoverable) connection;
            recoverable.addRecoveryListener(new RecoveryListener() {
                @Override
                public void handleRecoveryStarted(Recoverable recoverable) {
                    System.out.println("[Connection] 开始恢复...");
                }
                
                @Override
                public void handleRecovery(Recoverable recoverable) {
                    System.out.println("[Connection] 恢复完成");
                }
            });
        }
    }
    
    public Channel createChannel() throws Exception {
        Connection conn = getConnection();
        Channel channel = conn.createChannel();
        
        long channelId = channelCounter.incrementAndGet();
        channels.put(channelId, channel);
        
        channel.addShutdownListener(cause -> {
            channels.remove(channelId);
            System.out.println("[Channel-" + channelId + "] 关闭");
        });
        
        return channel;
    }
    
    public void closeChannel(Channel channel) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (Exception e) {
                System.err.println("关闭通道失败: " + e.getMessage());
            }
        }
    }
    
    public synchronized void close() {
        for (Channel channel : channels.values()) {
            closeChannel(channel);
        }
        channels.clear();
        
        if (connection != null && connection.isOpen()) {
            try {
                connection.close(5000);
                System.out.println("连接管理器已关闭");
            } catch (Exception e) {
                System.err.println("关闭连接失败: " + e.getMessage());
                try {
                    connection.abort();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
    
    public boolean isConnected() {
        return connection != null && connection.isOpen();
    }
    
    public int getActiveChannelCount() {
        return channels.size();
    }
}

3.2 连接池实现

java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RabbitMQChannelPool {
    
    private final Connection connection;
    private final BlockingQueue<Channel> pool;
    private final int maxPoolSize;
    private final int prefetchCount;
    
    public RabbitMQChannelPool(ConnectionFactory factory, int maxPoolSize, int prefetchCount) throws Exception {
        this.maxPoolSize = maxPoolSize;
        this.prefetchCount = prefetchCount;
        this.pool = new LinkedBlockingQueue<>(maxPoolSize);
        
        factory.setAutomaticRecoveryEnabled(true);
        factory.setTopologyRecoveryEnabled(true);
        this.connection = factory.newConnection("channel-pool");
        
        for (int i = 0; i < maxPoolSize; i++) {
            pool.offer(createChannel());
        }
        
        System.out.println("通道池初始化完成,大小: " + maxPoolSize);
    }
    
    private Channel createChannel() throws Exception {
        Channel channel = connection.createChannel();
        channel.basicQos(prefetchCount);
        return channel;
    }
    
    public Channel borrowChannel() throws Exception {
        Channel channel = pool.poll();
        if (channel == null || !channel.isOpen()) {
            channel = createChannel();
        }
        return channel;
    }
    
    public Channel borrowChannel(long timeout, TimeUnit unit) throws Exception {
        Channel channel = pool.poll(timeout, unit);
        if (channel != null && !channel.isOpen()) {
            channel = createChannel();
        }
        return channel;
    }
    
    public void returnChannel(Channel channel) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.basicCancel("");
            } catch (Exception e) {
            }
            
            if (pool.size() < maxPoolSize) {
                pool.offer(channel);
            } else {
                try {
                    channel.close();
                } catch (Exception e) {
                }
            }
        }
    }
    
    public void close() {
        Channel channel;
        while ((channel = pool.poll()) != null) {
            try {
                if (channel.isOpen()) {
                    channel.close();
                }
            } catch (Exception e) {
            }
        }
        
        try {
            if (connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
        }
        
        System.out.println("通道池已关闭");
    }
    
    public int getAvailableChannels() {
        return pool.size();
    }
}

四、实际应用场景

4.1 高可用集群连接

java
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ClusterConnectionDemo {
    
    private final List<Address> addresses;
    private final ConnectionFactory factory;
    
    public ClusterConnectionDemo() {
        addresses = new ArrayList<>();
        addresses.add(new Address("rabbitmq1.local", 5672));
        addresses.add(new Address("rabbitmq2.local", 5672));
        addresses.add(new Address("rabbitmq3.local", 5672));
        
        factory = new ConnectionFactory();
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
    }
    
    public Connection connect() throws Exception {
        Collections.shuffle(addresses);
        
        return factory.newConnection(addresses, "ha-client");
    }
}

4.2 多租户连接

java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiTenantConnectionManager {
    
    private final ConnectionFactory factoryTemplate;
    private final Map<String, Connection> connections = new ConcurrentHashMap<>();
    
    public MultiTenantConnectionManager(String host, int port) {
        factoryTemplate = new ConnectionFactory();
        factoryTemplate.setHost(host);
        factoryTemplate.setPort(port);
        factoryTemplate.setAutomaticRecoveryEnabled(true);
    }
    
    public Connection getConnection(String tenantId, String username, String password, String vhost) 
            throws Exception {
        String key = tenantId + ":" + vhost;
        
        return connections.computeIfAbsent(key, k -> {
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost(factoryTemplate.getHost());
                factory.setPort(factoryTemplate.getPort());
                factory.setUsername(username);
                factory.setPassword(password);
                factory.setVirtualHost(vhost);
                factory.setAutomaticRecoveryEnabled(true);
                
                return factory.newConnection("tenant-" + tenantId);
            } catch (Exception e) {
                throw new RuntimeException("创建连接失败: " + e.getMessage(), e);
            }
        });
    }
    
    public void closeAll() {
        connections.forEach((key, conn) -> {
            try {
                if (conn.isOpen()) {
                    conn.close();
                }
            } catch (Exception e) {
                System.err.println("关闭连接失败 [" + key + "]: " + e.getMessage());
            }
        });
        connections.clear();
    }
}

五、常见问题与解决方案

5.1 连接泄漏

问题描述: 连接未正确关闭导致资源泄漏。

解决方案

java
public class ConnectionLeakPrevention {
    
    public void executeWithConnection(ConnectionFactory factory, Consumer<Connection> action) {
        try (Connection connection = factory.newConnection()) {
            action.accept(connection);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
    public void executeWithChannel(ConnectionFactory factory, Consumer<Channel> action) {
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            action.accept(channel);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

5.2 连接超时

问题描述: 连接建立超时。

解决方案

java
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("remote-host");
factory.setConnectionTimeout(30000);
factory.setHandshakeTimeout(15000);
factory.setSocketTimeout(30000);

5.3 心跳超时

问题描述: 连接因心跳超时断开。

解决方案

java
ConnectionFactory factory = new ConnectionFactory();
factory.setRequestedHeartbeat(60);

factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);

六、最佳实践建议

6.1 连接管理

text
最佳实践:
├── 使用单例或连接池管理连接
├── 启用自动恢复机制
├── 设置合理的超时时间
├── 实现优雅关闭
└── 监控连接状态

6.2 性能优化

text
性能建议:
├── 复用连接,避免频繁创建
├── 每个线程独立 Channel
├── 合理设置心跳间隔
├── 使用异步操作
└── 批量处理消息

6.3 安全配置

text
安全建议:
├── 生产环境使用 TLS
├── 使用专用用户账号
├── 限制用户权限
├── 定期轮换密码
└── 启用审计日志

七、相关链接