Java Connection Pool Management

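1. Overview

A connection pool manages RabbitMQ connections and channels. Reusing them avoids paying the TCP and AMQP handshake cost on every operation and keeps the number of open connections bounded. This document describes how to implement and manage such a pool with the RabbitMQ Java client.

1.1 Connection Pool Architecture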

┌─────────────────────────────────────────────────────────────────────┐
│                    Connection Pool Architecture                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                      Application                              │  │
│  └──────────────────────────────┬───────────────────────────────┘  │
│                                 │                                   │
│                                 ▼                                   │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                    Connection Pool                            │  │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐            │  │
│  │  │ Connection1 │ │ Connection2 │ │ ConnectionN │            │  │
│  │  │  ┌─────┐    │ │  ┌─────┐    │ │  ┌─────┐    │            │  │
│  │  │  │Ch 1 │    │ │  │Ch 1 │    │ │  │Ch 1 │    │            │  │
│  │  │  │Ch 2 │    │ │  │Ch 2 │    │ │  │Ch 2 │    │            │  │
│  │  │  │Ch N │    │ │  │Ch N │    │ │  │Ch N │    │            │  │
│  │  │  └─────┘    │ │  └─────┘    │ │  └─────┘    │            │  │
│  │  └─────────────┘ └─────────────┘ └─────────────┘            │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                 │                                   │
│                                 ▼                                   │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                     RabbitMQ Server                          │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

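1.2 Benefits of Connection Pooling

text
Benefits:
├── Performance: avoids repeatedly creating and tearing down connections
├── Resource reuse: reuses TCP connections and channels
├── Lifecycle management: manages connection lifecycles in one place
├── Flow control: caps the maximum number of connections
└── Fault recovery: automatic reconnection and recovery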

2. Core Concepts

2.1 Connection Pool Design

2.1.1 Basic Connection Pool

java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class SimpleConnectionPool {
    
    private final BlockingQueue<Connection> connectionPool;
    private final ConnectionFactory factory;
    private final int maxPoolSize;
    private final AtomicInteger createdCount = new AtomicInteger(0);
    
    public SimpleConnectionPool(String host, int port, String username, 
                                String password, String vhost, int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
        this.connectionPool = new LinkedBlockingQueue<>(maxPoolSize);
        
        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(vhost);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
    }
    
    public Connection borrowConnection() throws Exception {
        Connection connection = connectionPool.poll();
        
        if (connection == null) {
            if (createdCount.get() < maxPoolSize) {
                connection = createConnection();
            } else {
                connection = connectionPool.take();
            }
        }
        
        if (!connection.isOpen()) {
            connection = createConnection();
        }
        
        return connection;
    }
    
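    public void returnConnection(Connection connection) {
        // Only open connections go back into the pool. A closed connection has already
        // been counted down by the shutdown listener registered in createConnection(),
        // so decrementing createdCount again here would undercount.
        if (connection != null && connection.isOpen()) {
            connectionPool.offer(connection);
        }
    }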
    
    private synchronized Connection createConnection() throws Exception {
        if (createdCount.get() >= maxPoolSize) {
            throw new IllegalStateException("Connection pool is full");
        }
        
        Connection connection = factory.newConnection("pool-connection-" + createdCount.get());
        createdCount.incrementAndGet();
        
        connection.addShutdownListener(cause -> {
            createdCount.decrementAndGet();
            connectionPool.remove(connection);
        });
        
        return connection;
    }
    
    public void close() {
        Connection connection;
        while ((connection = connectionPool.poll()) != null) {
            try {
                if (connection.isOpen()) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        createdCount.set(0);
    }
    
    public int getPoolSize() {
        return connectionPool.size();
    }
    
    public int getCreatedCount() {
        return createdCount.get();
    }
}
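
The borrowConnection method above checks createdCount and then creates a connection in two separate steps, so under contention a caller can pass the check and still hit the "pool is full" exception instead of waiting. One possible way to close that gap, sketched here with plain JDK types only, is to let a Semaphore hold one permit per resource that may be lent out. BoundedPool, its factory argument and the string resources in main are hypothetical stand-ins for illustration and are not part of the RabbitMQ client.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical generic pool: at most maxSize resources are lent out at any time. */
final class BoundedPool<T> {
    private final Semaphore permits;   // one permit per resource that may be in use
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    BoundedPool(int maxSize, Supplier<T> factory) {
        this.permits = new Semaphore(maxSize);
        this.factory = factory;
    }

    /** Returns a resource, or null if none became free within the timeout. */
    T borrow(long timeout, TimeUnit unit) {
        try {
            if (!permits.tryAcquire(timeout, unit)) {
                return null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        T resource = idle.poll();
        if (resource != null) {
            return resource;
        }
        try {
            // Holding a permit with nothing idle means we are still below the limit.
            return factory.get();
        } catch (RuntimeException e) {
            permits.release();   // creation failed, so hand the permit back
            throw e;
        }
    }

    void giveBack(T resource) {
        idle.offer(resource);    // make it visible before freeing the permit
        permits.release();
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        BoundedPool<String> pool = new BoundedPool<>(1, () -> "conn");
        String first = pool.borrow(100, TimeUnit.MILLISECONDS);
        String second = pool.borrow(100, TimeUnit.MILLISECONDS);
        System.out.println(first + " / " + second);
        pool.giveBack(first);
    }
}
```

Because the permit is acquired before anything is created, the limit is enforced in one atomic step and a caller that finds the pool busy waits for the timeout rather than failing.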

2.1.2 Channel Pool Implementation

java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ChannelPool {
    
    private final Connection connection;
    private final BlockingQueue<Channel> channelPool;
    private final int maxPoolSize;
    private final AtomicInteger createdCount = new AtomicInteger(0);
    private final int prefetchCount;
    
    public ChannelPool(Connection connection, int maxPoolSize, int prefetchCount) {
        this.connection = connection;
        this.maxPoolSize = maxPoolSize;
        this.prefetchCount = prefetchCount;
        this.channelPool = new LinkedBlockingQueue<>(maxPoolSize);
    }
    
    public Channel borrowChannel() throws Exception {
        Channel channel = channelPool.poll();
        
        if (channel == null) {
            if (createdCount.get() < maxPoolSize) {
                channel = createChannel();
            } else {
                channel = channelPool.take();
            }
        }
        
        if (!channel.isOpen()) {
            channel = createChannel();
        }
        
        return channel;
    }
    
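    public Channel borrowChannel(long timeout, TimeUnit unit) throws Exception {
        // Try an idle channel first. If none is idle and the pool is below its limit,
        // create one instead of waiting for a return that may never come.
        Channel channel = channelPool.poll();
        
        if (channel == null) {
            if (createdCount.get() < maxPoolSize) {
                return createChannel();
            }
            channel = channelPool.poll(timeout, unit);
            if (channel == null) {
                return null;
            }
        }
        
        if (!channel.isOpen()) {
            channel = createChannel();
        }
        
        return channel;
    }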
    
    public void returnChannel(Channel channel) {
        // A closed channel has already been counted down by its shutdown listener,
        // so only open channels need handling here.
        if (channel != null && channel.isOpen()) {
            if (!channelPool.offer(channel)) {
                closeChannel(channel);
            }
        }
    }
    
    private Channel createChannel() throws Exception {
        if (!connection.isOpen()) {
            throw new IllegalStateException("Connection is closed");
        }
        
        Channel channel = connection.createChannel();
        channel.basicQos(prefetchCount);
        
        createdCount.incrementAndGet();
        
        channel.addShutdownListener(cause -> {
            createdCount.decrementAndGet();
            channelPool.remove(channel);
        });
        
        return channel;
    }
    
    private void closeChannel(Channel channel) {
        try {
            if (channel.isOpen()) {
                // Closing fires the shutdown listener, which decrements createdCount.
                channel.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public void close() {
        Channel channel;
        while ((channel = channelPool.poll()) != null) {
            closeChannel(channel);
        }
        createdCount.set(0);
    }
    
    public int getAvailableChannels() {
        return channelPool.size();
    }
    
    public int getCreatedCount() {
        return createdCount.get();
    }
}

2.2 Connection Pool Manager

java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class RabbitMQPoolManager {
    
    private final ConnectionFactory factory;
    private final int maxConnections;
    private final int maxChannelsPerConnection;
    private final int prefetchCount;
    
    private final BlockingQueue<PooledConnection> connectionPool;
    private final AtomicLong totalBorrows = new AtomicLong(0);
    private final AtomicLong totalReturns = new AtomicLong(0);
    
    public RabbitMQPoolManager(String host, int port, String username, 
                               String password, String vhost,
                               int maxConnections, int maxChannelsPerConnection,
                               int prefetchCount) {
        
        this.factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(vhost);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setTopologyRecoveryEnabled(true);
        
        this.maxConnections = maxConnections;
        this.maxChannelsPerConnection = maxChannelsPerConnection;
        this.prefetchCount = prefetchCount;
        
        this.connectionPool = new LinkedBlockingQueue<>(maxConnections);
    }
    
    public void initialize() throws Exception {
        for (int i = 0; i < maxConnections; i++) {
            PooledConnection pooledConn = createPooledConnection();
            connectionPool.offer(pooledConn);
        }
        
        System.out.println("Connection pool initialized, connections: " + maxConnections);
    }
    
    public PooledChannel borrowChannel() throws Exception {
        totalBorrows.incrementAndGet();
        
        PooledConnection pooledConn = connectionPool.take();
        
        try {
            Channel channel = pooledConn.borrowChannel();
            return new PooledChannel(pooledConn, channel);
        } finally {
            connectionPool.offer(pooledConn);
        }
    }
    
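    public PooledChannel borrowChannel(long timeout, TimeUnit unit) throws Exception {
        // Count timed borrows as well, so the statistics match the untimed overload
        totalBorrows.incrementAndGet();
        
        PooledConnection pooledConn = connectionPool.poll(timeout, unit);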
        
        if (pooledConn == null) {
            return null;
        }
        
        try {
            Channel channel = pooledConn.borrowChannel(timeout, unit);
            if (channel == null) {
                return null;
            }
            return new PooledChannel(pooledConn, channel);
        } finally {
            connectionPool.offer(pooledConn);
        }
    }
    
    public void returnChannel(PooledChannel pooledChannel) {
        if (pooledChannel != null) {
            // Count only real returns so borrows and returns stay comparable
            totalReturns.incrementAndGet();
            pooledChannel.getPooledConnection().returnChannel(pooledChannel.getChannel());
        }
    }
    
    private PooledConnection createPooledConnection() throws Exception {
        Connection connection = factory.newConnection("pooled-connection");
        
        connection.addShutdownListener(cause -> {
            System.err.println("Connection closed: " + cause.getMessage());
        });
        
        return new PooledConnection(connection, maxChannelsPerConnection, prefetchCount);
    }
    
    public void close() {
        PooledConnection pooledConn;
        while ((pooledConn = connectionPool.poll()) != null) {
            pooledConn.close();
        }
        
        System.out.println("Connection pool closed");
        System.out.println("Statistics: borrows=" + totalBorrows.get() + 
            ", returns=" + totalReturns.get());
    }
    
    public PoolStatistics getStatistics() {
        return new PoolStatistics(
            connectionPool.size(),
            maxConnections,
            totalBorrows.get(),
            totalReturns.get()
        );
    }
}

class PooledConnection {
    
    private final Connection connection;
    private final ChannelPool channelPool;
    
    public PooledConnection(Connection connection, int maxChannels, int prefetchCount) {
        this.connection = connection;
        this.channelPool = new ChannelPool(connection, maxChannels, prefetchCount);
    }
    
    public Channel borrowChannel() throws Exception {
        return channelPool.borrowChannel();
    }
    
    public Channel borrowChannel(long timeout, TimeUnit unit) throws Exception {
        return channelPool.borrowChannel(timeout, unit);
    }
    
    public void returnChannel(Channel channel) {
        channelPool.returnChannel(channel);
    }
    
    public void close() {
        channelPool.close();
        try {
            if (connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
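    public int getAvailableChannels() {
        return channelPool.getAvailableChannels();
    }
    
    // Used by DynamicConnectionPool.returnConnection() to detect dead connections
    public boolean isOpen() {
        return connection.isOpen();
    }
}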

class PooledChannel {
    
    private final PooledConnection pooledConnection;
    private final Channel channel;
    
    public PooledChannel(PooledConnection pooledConnection, Channel channel) {
        this.pooledConnection = pooledConnection;
        this.channel = channel;
    }
    
    public Channel getChannel() {
        return channel;
    }
    
    public PooledConnection getPooledConnection() {
        return pooledConnection;
    }
}

class PoolStatistics {
    private final int availableConnections;
    private final int maxConnections;
    private final long totalBorrows;
    private final long totalReturns;
    
    public PoolStatistics(int availableConnections, int maxConnections, 
                         long totalBorrows, long totalReturns) {
        this.availableConnections = availableConnections;
        this.maxConnections = maxConnections;
        this.totalBorrows = totalBorrows;
        this.totalReturns = totalReturns;
    }
    
    public int getAvailableConnections() { return availableConnections; }
    public int getMaxConnections() { return maxConnections; }
    public long getTotalBorrows() { return totalBorrows; }
    public long getTotalReturns() { return totalReturns; }
    
    @Override
    public String toString() {
        return String.format("PoolStatistics{available=%d, max=%d, borrows=%d, returns=%d}",
            availableConnections, maxConnections, totalBorrows, totalReturns);
    }
}

2.3 Connection Pool Configuration

java
import com.rabbitmq.client.*;

public class PoolConfig {
    
    private String host = "localhost";
    private int port = 5672;
    private String username = "guest";
    private String password = "guest";
    private String virtualHost = "/";
    
    private int maxConnections = 5;
    private int maxChannelsPerConnection = 10;
    private int prefetchCount = 10;
    
    private boolean automaticRecovery = true;
    private long networkRecoveryInterval = 5000;
    private boolean topologyRecovery = true;
    private int requestedHeartbeat = 60;
    
    private long borrowTimeout = 5000;
    private long connectionTimeout = 30000;
    
    public static PoolConfig defaultConfig() {
        return new PoolConfig();
    }
    
    public static PoolConfig productionConfig() {
        PoolConfig config = new PoolConfig();
        config.maxConnections = 10;
        config.maxChannelsPerConnection = 20;
        config.prefetchCount = 20;
        config.requestedHeartbeat = 30;
        return config;
    }
    
    public static PoolConfig highThroughputConfig() {
        PoolConfig config = new PoolConfig();
        config.maxConnections = 20;
        config.maxChannelsPerConnection = 50;
        config.prefetchCount = 50;
        return config;
    }
    
    public ConnectionFactory createConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        
        factory.setAutomaticRecoveryEnabled(automaticRecovery);
        factory.setNetworkRecoveryInterval(networkRecoveryInterval);
        factory.setTopologyRecoveryEnabled(topologyRecovery);
        factory.setRequestedHeartbeat(requestedHeartbeat);
        factory.setConnectionTimeout((int) connectionTimeout);
        
        return factory;
    }
    
    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    
    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }
    
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    
    public String getVirtualHost() { return virtualHost; }
    public void setVirtualHost(String virtualHost) { this.virtualHost = virtualHost; }
    
    public int getMaxConnections() { return maxConnections; }
    public void setMaxConnections(int maxConnections) { this.maxConnections = maxConnections; }
    
    public int getMaxChannelsPerConnection() { return maxChannelsPerConnection; }
    public void setMaxChannelsPerConnection(int maxChannelsPerConnection) { 
        this.maxChannelsPerConnection = maxChannelsPerConnection; 
    }
    
    public int getPrefetchCount() { return prefetchCount; }
    public void setPrefetchCount(int prefetchCount) { this.prefetchCount = prefetchCount; }
    
    public long getBorrowTimeout() { return borrowTimeout; }
    public void setBorrowTimeout(long borrowTimeout) { this.borrowTimeout = borrowTimeout; }
}

3. Code Examples

3.1 Complete Connection Pool Example

java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class CompletePoolDemo {
    
    private final RabbitMQPoolManager poolManager;
    
    public CompletePoolDemo(PoolConfig config) throws Exception {
        this.poolManager = new RabbitMQPoolManager(
            config.getHost(),
            config.getPort(),
            config.getUsername(),
            config.getPassword(),
            config.getVirtualHost(),
            config.getMaxConnections(),
            config.getMaxChannelsPerConnection(),
            config.getPrefetchCount()
        );
        
        this.poolManager.initialize();
    }
    
    public void publish(String exchange, String routingKey, byte[] body) throws Exception {
        PooledChannel pooledChannel = null;
        
        try {
            pooledChannel = poolManager.borrowChannel(5, TimeUnit.SECONDS);
            
            if (pooledChannel == null) {
                throw new RuntimeException("Timed out waiting for a channel");
            }
            
            Channel channel = pooledChannel.getChannel();
            channel.basicPublish(exchange, routingKey, 
                MessageProperties.PERSISTENT_TEXT_PLAIN, body);
            
        } finally {
            if (pooledChannel != null) {
                poolManager.returnChannel(pooledChannel);
            }
        }
    }
    
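    public void consume(String queueName, Consumer<Delivery> messageHandler) throws Exception {
        // A consuming channel stays dedicated to its consumer, so it is not handed
        // back to the pool while the subscription is active.
        PooledChannel pooledChannel = poolManager.borrowChannel();
        Channel channel = pooledChannel.getChannel();
        
        try {
            DeliverCallback callback = (consumerTag, delivery) -> {
                messageHandler.accept(delivery);
                // autoAck is false, so acknowledge once the handler has returned
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            
            channel.basicConsume(queueName, false, callback, consumerTag -> {});
            
        } catch (Exception e) {
            // Subscribing failed, so the channel can go back to the pool
            poolManager.returnChannel(pooledChannel);
            throw e;
        }
    }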
    
    public void executeWithChannel(ChannelOperation operation) throws Exception {
        PooledChannel pooledChannel = null;
        
        try {
            pooledChannel = poolManager.borrowChannel();
            operation.execute(pooledChannel.getChannel());
        } finally {
            if (pooledChannel != null) {
                poolManager.returnChannel(pooledChannel);
            }
        }
    }
    
    public void close() {
        poolManager.close();
    }
    
    public PoolStatistics getStatistics() {
        return poolManager.getStatistics();
    }
    
    @FunctionalInterface
    public interface ChannelOperation {
        void execute(Channel channel) throws Exception;
    }
    
    public static void main(String[] args) throws Exception {
        PoolConfig config = PoolConfig.productionConfig();
        config.setHost("localhost");
        
        CompletePoolDemo demo = new CompletePoolDemo(config);
        
        try {
            for (int i = 0; i < 100; i++) {
                final int index = i;
                demo.executeWithChannel(channel -> {
                    channel.basicPublish("", "test.queue", null, 
                        ("Message " + index).getBytes(java.nio.charset.StandardCharsets.UTF_8));
                });
            }
            
            System.out.println("Statistics: " + demo.getStatistics());
            
        } finally {
            demo.close();
        }
    }
}

3.2 Connection Pool with Monitoring

java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class MonitoredConnectionPool {
    
    private final RabbitMQPoolManager poolManager;
    private final ScheduledExecutorService monitorExecutor;
    
    private final AtomicLong totalMessages = new AtomicLong(0);
    private final AtomicLong failedMessages = new AtomicLong(0);
    private final AtomicLong totalLatency = new AtomicLong(0);
    
    public MonitoredConnectionPool(PoolConfig config) throws Exception {
        this.poolManager = new RabbitMQPoolManager(
            config.getHost(), config.getPort(),
            config.getUsername(), config.getPassword(),
            config.getVirtualHost(),
            config.getMaxConnections(),
            config.getMaxChannelsPerConnection(),
            config.getPrefetchCount()
        );
        
        this.poolManager.initialize();
        this.monitorExecutor = Executors.newSingleThreadScheduledExecutor();
        
        startMonitoring();
    }
    
    private void startMonitoring() {
        monitorExecutor.scheduleAtFixedRate(() -> {
            PoolStatistics stats = poolManager.getStatistics();
            long msgs = totalMessages.get();
            long failed = failedMessages.get();
            long latency = msgs > 0 ? totalLatency.get() / msgs : 0;
            
            System.out.println("=== Connection pool monitor ===");
            System.out.println("Available connections: " + stats.getAvailableConnections() + 
                "/" + stats.getMaxConnections());
            System.out.println("Total messages: " + msgs);
            System.out.println("Failed messages: " + failed);
            System.out.println("Average latency: " + latency + " ms");
            
        }, 10, 10, TimeUnit.SECONDS);
    }
    
    public void publishWithMetrics(String exchange, String routingKey, byte[] body) 
            throws Exception {
        
        long startTime = System.currentTimeMillis();
        
        try {
            // Borrow exactly once; every borrowed channel is returned in the finally block
            PooledChannel pooledChannel = poolManager.borrowChannel();
            try {
                pooledChannel.getChannel().basicPublish(exchange, routingKey, 
                    null, body);
                
                totalMessages.incrementAndGet();
                totalLatency.addAndGet(System.currentTimeMillis() - startTime);
                
            } finally {
                poolManager.returnChannel(pooledChannel);
            }
            
        } catch (Exception e) {
            failedMessages.incrementAndGet();
            throw e;
        }
    }
    
    public PoolMetrics getMetrics() {
        long msgs = totalMessages.get();
        return new PoolMetrics(
            msgs,
            failedMessages.get(),
            msgs > 0 ? totalLatency.get() / msgs : 0,
            poolManager.getStatistics()
        );
    }
    
    public void close() {
        monitorExecutor.shutdown();
        poolManager.close();
    }
}

class PoolMetrics {
    private final long totalMessages;
    private final long failedMessages;
    private final long averageLatency;
    private final PoolStatistics poolStats;
    
    public PoolMetrics(long totalMessages, long failedMessages, 
                       long averageLatency, PoolStatistics poolStats) {
        this.totalMessages = totalMessages;
        this.failedMessages = failedMessages;
        this.averageLatency = averageLatency;
        this.poolStats = poolStats;
    }
    
    public long getTotalMessages() { return totalMessages; }
    public long getFailedMessages() { return failedMessages; }
    public long getAverageLatency() { return averageLatency; }
    public PoolStatistics getPoolStats() { return poolStats; }
    
    @Override
    public String toString() {
        return String.format("PoolMetrics{total=%d, failed=%d, avgLatency=%dms, pool=%s}",
            totalMessages, failedMessages, averageLatency, poolStats);
    }
}

3.3 Dynamically Scaling Connection Pool

java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class DynamicConnectionPool {
    
    private final ConnectionFactory factory;
    private final int minConnections;
    private final int maxConnections;
    private final int maxChannelsPerConnection;
    
    private final BlockingQueue<PooledConnection> availableConnections;
    private final ConcurrentHashMap<PooledConnection, Boolean> allConnections;
    private final AtomicInteger currentSize = new AtomicInteger(0);
    
    private final ScheduledExecutorService scaler;
    private volatile boolean scaling = false;
    
    public DynamicConnectionPool(ConnectionFactory factory, int minConnections, 
                                 int maxConnections, int maxChannelsPerConnection) {
        this.factory = factory;
        this.minConnections = minConnections;
        this.maxConnections = maxConnections;
        this.maxChannelsPerConnection = maxChannelsPerConnection;
        
        this.availableConnections = new LinkedBlockingQueue<>();
        this.allConnections = new ConcurrentHashMap<>();
        this.scaler = Executors.newSingleThreadScheduledExecutor();
    }
    
    public void initialize() throws Exception {
        for (int i = 0; i < minConnections; i++) {
            PooledConnection conn = createConnection();
            availableConnections.offer(conn);
        }
        
        scaler.scheduleAtFixedRate(this::scale, 30, 30, TimeUnit.SECONDS);
        
        System.out.println("Dynamic connection pool initialized, minimum connections: " + minConnections);
    }
    
    public PooledConnection borrowConnection() throws Exception {
        PooledConnection conn = availableConnections.poll();
        
        if (conn == null) {
            if (currentSize.get() < maxConnections) {
                synchronized (this) {
                    if (currentSize.get() < maxConnections) {
                        conn = createConnection();
                    }
                }
            }
            
            if (conn == null) {
                conn = availableConnections.take();
            }
        }
        
        return conn;
    }
    
    public void returnConnection(PooledConnection conn) {
        if (conn == null) {
            return; // ConcurrentHashMap rejects null keys, so never pass null on
        }
        if (conn.isOpen()) {
            availableConnections.offer(conn);
        } else {
            removeConnection(conn);
        }
    }
    
    private PooledConnection createConnection() throws Exception {
        Connection connection = factory.newConnection("dynamic-pool-" + currentSize.get());
        PooledConnection pooledConn = new PooledConnection(connection, maxChannelsPerConnection, 10);
        
        allConnections.put(pooledConn, true);
        currentSize.incrementAndGet();
        
        System.out.println("Created new connection, current connections: " + currentSize.get());
        
        return pooledConn;
    }
    
    private void removeConnection(PooledConnection conn) {
        allConnections.remove(conn);
        availableConnections.remove(conn);
        currentSize.decrementAndGet();
        
        try {
            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        System.out.println("Removed connection, current connections: " + currentSize.get());
    }
    
    private void scale() {
        int available = availableConnections.size();
        int current = currentSize.get();
        
        if (available < minConnections / 2 && current < maxConnections) {
            scaleUp();
        } else if (available > minConnections && current > minConnections) {
            scaleDown();
        }
    }
    
    private void scaleUp() {
        if (scaling) return;
        
        scaling = true;
        try {
            if (currentSize.get() < maxConnections) {
                PooledConnection conn = createConnection();
                availableConnections.offer(conn);
                System.out.println("Connection pool scaled up");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            scaling = false;
        }
    }
    
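    private void scaleDown() {
        if (scaling) return;
        
        scaling = true;
        try {
            PooledConnection conn = availableConnections.poll();
            if (conn == null) return;
            
            if (currentSize.get() > minConnections) {
                removeConnection(conn);
                System.out.println("Connection pool scaled down");
            } else {
                // Already at the minimum: put the idle connection back instead of losing it
                availableConnections.offer(conn);
            }
        } finally {
            scaling = false;
        }
    }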
    
    public void close() {
        scaler.shutdown();
        
        for (PooledConnection conn : allConnections.keySet()) {
            try {
                conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        availableConnections.clear();
        allConnections.clear();
        currentSize.set(0);
        
        System.out.println("Dynamic connection pool closed");
    }
    
    public int getCurrentSize() {
        return currentSize.get();
    }
    
    public int getAvailableSize() {
        return availableConnections.size();
    }
}
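
The thresholds in scale() are easier to reason about when pulled out as a pure function. The sketch below copies the two conditions from scale() unchanged; the ScaleRule class and its Decision enum are hypothetical names introduced only for this illustration.

```java
/** Hypothetical helper that isolates the scaling decision made in scale(). */
final class ScaleRule {
    enum Decision { SCALE_UP, SCALE_DOWN, HOLD }

    /** Same conditions as scale(): idle count and current size compared with min and max. */
    static Decision decide(int available, int current, int min, int max) {
        if (available < min / 2 && current < max) {
            return Decision.SCALE_UP;
        }
        if (available > min && current > min) {
            return Decision.SCALE_DOWN;
        }
        return Decision.HOLD;
    }

    public static void main(String[] args) {
        System.out.println(decide(1, 4, 4, 10)); // SCALE_UP: 1 idle is below 4 / 2
        System.out.println(decide(5, 6, 4, 10)); // SCALE_DOWN: 5 idle exceeds the minimum of 4
        System.out.println(decide(2, 6, 4, 10)); // HOLD: neither threshold is crossed
        System.out.println(decide(0, 1, 1, 10)); // HOLD: 1 / 2 is 0 in integer division
    }
}
```

The last case shows a consequence of the integer division: with a minimum of 1, the scale-up condition can never be true, so such a pool grows only on demand inside borrowConnection.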

4. Practical Application Scenarios

4.1 Web Application Integration

java
import com.rabbitmq.client.*;
import javax.servlet.*;
import java.io.IOException;
import java.util.concurrent.*;

public class RabbitMQPoolFilter implements Filter {
    
    private RabbitMQPoolManager poolManager;
    
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
        String host = filterConfig.getInitParameter("host");
        int port = Integer.parseInt(filterConfig.getInitParameter("port"));
        String username = filterConfig.getInitParameter("username");
        String password = filterConfig.getInitParameter("password");
        
        PoolConfig config = PoolConfig.productionConfig();
        config.setHost(host);
        config.setPort(port);
        config.setUsername(username);
        config.setPassword(password);
        
        try {
            poolManager = new RabbitMQPoolManager(
                config.getHost(), config.getPort(),
                config.getUsername(), config.getPassword(),
                config.getVirtualHost(),
                config.getMaxConnections(),
                config.getMaxChannelsPerConnection(),
                config.getPrefetchCount()
            );
            poolManager.initialize();
            
            filterConfig.getServletContext().setAttribute("rabbitmq.pool", poolManager);
            
        } catch (Exception e) {
            throw new ServletException("Failed to initialize the RabbitMQ connection pool", e);
        }
    }
    
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, 
                         FilterChain chain) throws IOException, ServletException {
        chain.doFilter(request, response);
    }
    
    @Override
    public void destroy() {
        if (poolManager != null) {
            poolManager.close();
        }
    }
}

4.2 Spring Integration

java
import com.rabbitmq.client.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Component
public class RabbitMQPoolComponent {
    
    @Value("${rabbitmq.host}")
    private String host;
    
    @Value("${rabbitmq.port}")
    private int port;
    
    @Value("${rabbitmq.username}")
    private String username;
    
    @Value("${rabbitmq.password}")
    private String password;
    
    @Value("${rabbitmq.pool.max-connections:5}")
    private int maxConnections;
    
    @Value("${rabbitmq.pool.max-channels:10}")
    private int maxChannels;
    
    private RabbitMQPoolManager poolManager;
    
    @PostConstruct
    public void init() throws Exception {
        PoolConfig config = new PoolConfig();
        config.setHost(host);
        config.setPort(port);
        config.setUsername(username);
        config.setPassword(password);
        config.setMaxConnections(maxConnections);
        config.setMaxChannelsPerConnection(maxChannels);
        
        poolManager = new RabbitMQPoolManager(
            config.getHost(), config.getPort(),
            config.getUsername(), config.getPassword(),
            config.getVirtualHost(),
            config.getMaxConnections(),
            config.getMaxChannelsPerConnection(),
            config.getPrefetchCount()
        );
        
        poolManager.initialize();
    }
    
    @PreDestroy
    public void destroy() {
        if (poolManager != null) {
            poolManager.close();
        }
    }
    
    public void execute(ChannelOperation operation) throws Exception {
        PooledChannel pooledChannel = null;
        try {
            pooledChannel = poolManager.borrowChannel();
            operation.execute(pooledChannel.getChannel());
        } finally {
            if (pooledChannel != null) {
                poolManager.returnChannel(pooledChannel);
            }
        }
    }
    
    @FunctionalInterface
    public interface ChannelOperation {
        void execute(Channel channel) throws Exception;
    }
}

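5. Common Problems and Solutions

5.1 Connection Leaks

Problem: a connection or channel is borrowed from the pool but never returned, so the pool gradually drains.

Solution: return the channel in a finally block so that it goes back to the pool even when the operation throws.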

java
public void safeExecute(ChannelOperation operation) throws Exception {
    PooledChannel pooledChannel = null;
    try {
        pooledChannel = poolManager.borrowChannel();
        operation.execute(pooledChannel.getChannel());
    } finally {
        if (pooledChannel != null) {
            poolManager.returnChannel(pooledChannel);
        }
    }
}
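
An alternative to a hand-written finally block is to wrap the borrowed resource in an AutoCloseable lease, so that try-with-resources performs the return. The sketch below uses strings in place of channels to stay self-contained; TinyPool, Lease and LeaseDemo are hypothetical names for illustration and do not exist in the classes shown earlier.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class LeaseDemo {
    /** Hypothetical minimal pool that holds plain strings instead of channels. */
    static final class TinyPool {
        private final Deque<String> idle = new ArrayDeque<>();

        TinyPool(String... resources) {
            for (String r : resources) {
                idle.push(r);
            }
        }

        synchronized Lease lease() {
            String r = idle.poll();
            if (r == null) {
                throw new IllegalStateException("pool exhausted");
            }
            return new Lease(this, r);
        }

        synchronized void giveBack(String r) {
            idle.push(r);
        }

        synchronized int idleCount() {
            return idle.size();
        }
    }

    /** The return happens in close(), so try-with-resources cannot forget it. */
    static final class Lease implements AutoCloseable {
        private final TinyPool pool;
        private final String resource;
        private boolean closed;

        Lease(TinyPool pool, String resource) {
            this.pool = pool;
            this.resource = resource;
        }

        String resource() {
            return resource;
        }

        @Override
        public void close() {
            if (!closed) {          // a second close() is a no-op
                closed = true;
                pool.giveBack(resource);
            }
        }
    }

    /** Simulates an operation that fails and reports how many resources are idle afterwards. */
    static int idleAfterFailingOperation(TinyPool pool) {
        try (Lease lease = pool.lease()) {
            throw new IllegalStateException("simulated failure while using " + lease.resource());
        } catch (IllegalStateException expected) {
            // The lease was already closed before this handler ran.
        }
        return pool.idleCount();
    }

    public static void main(String[] args) {
        TinyPool pool = new TinyPool("ch-1");
        System.out.println(idleAfterFailingOperation(pool)); // prints 1
    }
}
```

The same idea could be applied to PooledChannel by having it implement AutoCloseable and call returnChannel from close(), which is a design option rather than something the code above already does.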

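5.2 Pool Exhaustion

Problem: every connection or channel in the pool is in use, so new borrowers block.

Solution: borrow with a timeout and fail fast when it expires.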

java
PooledChannel pooledChannel = poolManager.borrowChannel(5, TimeUnit.SECONDS);

if (pooledChannel == null) {
    throw new RuntimeException("Timed out acquiring a channel; the pool may be exhausted");
}

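5.3 Stale Connections

Problem: a connection or channel sitting in the pool has already been closed.

Solution: validate on borrow and create a replacement when the pooled entry is no longer open.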

java
public Channel borrowChannel() throws Exception {
    Channel channel = channelPool.poll();
    
    if (channel == null || !channel.isOpen()) {
        channel = createChannel();
    }
    
    return channel;
}
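
The check above inspects a single pooled entry. A slightly more thorough variant, sketched below with plain JDK types, keeps polling until it finds a usable entry and creates a new one only when the idle queue is empty. ValidatingBorrow and its parameters are hypothetical; with the channel pool shown earlier, the predicate would be Channel::isOpen and the factory would wrap createChannel().

```java
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical helper: discard stale idle entries before falling back to creation. */
final class ValidatingBorrow {
    static <T> T borrow(Queue<T> idle, Predicate<? super T> isUsable, Supplier<? extends T> factory) {
        T candidate;
        while ((candidate = idle.poll()) != null) {
            if (isUsable.test(candidate)) {
                return candidate;
            }
            // Stale entry: drop it and keep looking.
        }
        return factory.get();
    }

    public static void main(String[] args) {
        Queue<String> idle = new ConcurrentLinkedQueue<>(Arrays.asList("dead-1", "dead-2", "live-3"));
        Predicate<String> isLive = s -> s.startsWith("live");

        System.out.println(borrow(idle, isLive, () -> "new-4")); // live-3
        System.out.println(borrow(idle, isLive, () -> "new-4")); // new-4
    }
}
```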

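6. Best Practices

6.1 Pool Configuration

text
Configuration recommendations:
├── Connections: size to the expected concurrency, typically 5-20
├── Channels: 10-50 per connection
├── Prefetch count: set according to consumer processing capacity
├── Timeouts: set a reasonable borrow timeout
└── Heartbeat interval: 30-60 seconds in production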
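
To make these numbers concrete, the following sketch multiplies out the three presets defined in PoolConfig (default 5 x 10, production 10 x 20, high throughput 20 x 50). PoolSizing is a hypothetical helper, and the unacknowledged-message bound assumes one consumer per channel, which the pool itself does not enforce.

```java
/** Hypothetical helper that derives capacity figures from pool settings. */
final class PoolSizing {
    /** Maximum number of channels the pool can hold across all connections. */
    static int channelCapacity(int maxConnections, int maxChannelsPerConnection) {
        return Math.multiplyExact(maxConnections, maxChannelsPerConnection);
    }

    /** Upper bound on in-flight unacknowledged messages, assuming one consumer per channel. */
    static long maxUnacked(int maxConnections, int maxChannelsPerConnection, int prefetchCount) {
        return (long) channelCapacity(maxConnections, maxChannelsPerConnection) * prefetchCount;
    }

    public static void main(String[] args) {
        System.out.println("default:         " + channelCapacity(5, 10) + " channels, "
            + maxUnacked(5, 10, 10) + " unacked max");
        System.out.println("production:      " + channelCapacity(10, 20) + " channels, "
            + maxUnacked(10, 20, 20) + " unacked max");
        System.out.println("high throughput: " + channelCapacity(20, 50) + " channels, "
            + maxUnacked(20, 50, 50) + " unacked max");
    }
}
```

The product grows quickly, so it is worth checking that consumers and the broker can hold that many in-flight messages before raising all three settings together.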

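6.2 Resource Management

text
Management recommendations:
├── Use try-finally to guarantee that borrowed resources are returned
├── Monitor pool utilization
├── Implement health checks
├── Configure reasonable timeouts
└── Shut the pool down gracefully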
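
For the graceful shutdown point, one common JDK pattern for stopping background executors such as the monitor and scaler threads is to call shutdown(), wait for a grace period, and only then fall back to shutdownNow(). The sketch below shows that pattern in isolation; GracefulShutdown and its stop method are hypothetical names, and the grace period is an assumption to tune per application.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for stopping a background executor before closing the pool. */
final class GracefulShutdown {
    /** Returns true if the executor terminated, either normally or after shutdownNow(). */
    static boolean stop(ExecutorService executor, long grace, TimeUnit unit) {
        executor.shutdown();                       // stop accepting new tasks
        try {
            if (executor.awaitTermination(grace, unit)) {
                return true;
            }
            executor.shutdownNow();                // interrupt tasks that are still running
            return executor.awaitTermination(grace, unit);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();    // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService monitor = Executors.newSingleThreadExecutor();
        monitor.execute(() -> System.out.println("last monitoring tick"));
        System.out.println("terminated: " + stop(monitor, 1, TimeUnit.SECONDS));
    }
}
```

Stopping the executors first means no monitoring or scaling task touches the pool while its connections are being closed.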

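6.3 Performance Optimization

text
Performance recommendations:
├── Reuse connections and channels
├── Batch operations to reduce borrow/return cycles
├── Process asynchronously to raise throughput
├── Size the pool appropriately
└── Monitor and tune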
