Appearance
Java 连接池管理
一、概述
连接池是管理 RabbitMQ 连接和通道的重要机制,通过复用连接和通道,可以显著提高系统性能和资源利用率。本文档详细介绍 Java 客户端的连接池实现和管理策略。
1.1 连接池架构
┌─────────────────────────────────────────────────────────────────────┐
│ 连接池架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Application │ │
│ └──────────────────────────────┬───────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Connection Pool │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Connection1 │ │ Connection2 │ │ ConnectionN │ │ │
│ │ │ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │ │
│ │ │ │Ch 1 │ │ │ │Ch 1 │ │ │ │Ch 1 │ │ │ │
│ │ │ │Ch 2 │ │ │ │Ch 2 │ │ │ │Ch 2 │ │ │ │
│ │ │ │Ch N │ │ │ │Ch N │ │ │ │Ch N │ │ │ │
│ │ │ └─────┘ │ │ └─────┘ │ │ └─────┘ │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ RabbitMQ Server │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘1.2 连接池优势
text
连接池优势:
├── 性能提升:避免频繁创建销毁连接
├── 资源复用:复用 TCP 连接和通道
├── 连接管理:统一管理连接生命周期
├── 流量控制:限制最大连接数
└── 故障恢复:自动重连和恢复二、核心知识点
2.1 连接池设计
2.1.1 基础连接池
java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class SimpleConnectionPool {
private final BlockingQueue<Connection> connectionPool;
private final ConnectionFactory factory;
private final int maxPoolSize;
private final AtomicInteger createdCount = new AtomicInteger(0);
public SimpleConnectionPool(String host, int port, String username,
String password, String vhost, int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
this.connectionPool = new LinkedBlockingQueue<>(maxPoolSize);
factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vhost);
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
}
public Connection borrowConnection() throws Exception {
Connection connection = connectionPool.poll();
if (connection == null) {
if (createdCount.get() < maxPoolSize) {
connection = createConnection();
} else {
connection = connectionPool.take();
}
}
if (!connection.isOpen()) {
connection = createConnection();
}
return connection;
}
public void returnConnection(Connection connection) {
if (connection != null && connection.isOpen()) {
connectionPool.offer(connection);
} else {
createdCount.decrementAndGet();
}
}
private synchronized Connection createConnection() throws Exception {
if (createdCount.get() >= maxPoolSize) {
throw new IllegalStateException("连接池已满");
}
Connection connection = factory.newConnection("pool-connection-" + createdCount.get());
createdCount.incrementAndGet();
connection.addShutdownListener(cause -> {
createdCount.decrementAndGet();
connectionPool.remove(connection);
});
return connection;
}
public void close() {
Connection connection;
while ((connection = connectionPool.poll()) != null) {
try {
if (connection.isOpen()) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
createdCount.set(0);
}
public int getPoolSize() {
return connectionPool.size();
}
public int getCreatedCount() {
return createdCount.get();
}
}2.1.2 通道池实现
java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ChannelPool {
private final Connection connection;
private final BlockingQueue<Channel> channelPool;
private final int maxPoolSize;
private final AtomicInteger createdCount = new AtomicInteger(0);
private final int prefetchCount;
public ChannelPool(Connection connection, int maxPoolSize, int prefetchCount) {
this.connection = connection;
this.maxPoolSize = maxPoolSize;
this.prefetchCount = prefetchCount;
this.channelPool = new LinkedBlockingQueue<>(maxPoolSize);
}
public Channel borrowChannel() throws Exception {
Channel channel = channelPool.poll();
if (channel == null) {
if (createdCount.get() < maxPoolSize) {
channel = createChannel();
} else {
channel = channelPool.take();
}
}
if (!channel.isOpen()) {
channel = createChannel();
}
return channel;
}
public Channel borrowChannel(long timeout, TimeUnit unit) throws Exception {
Channel channel = channelPool.poll(timeout, unit);
if (channel == null) {
return null;
}
if (!channel.isOpen()) {
channel = createChannel();
}
return channel;
}
public void returnChannel(Channel channel) {
if (channel != null && channel.isOpen()) {
try {
channel.basicCancel("");
} catch (Exception e) {
}
if (!channelPool.offer(channel)) {
closeChannel(channel);
}
} else {
createdCount.decrementAndGet();
}
}
private Channel createChannel() throws Exception {
if (!connection.isOpen()) {
throw new IllegalStateException("连接已关闭");
}
Channel channel = connection.createChannel();
channel.basicQos(prefetchCount);
createdCount.incrementAndGet();
channel.addShutdownListener(cause -> {
createdCount.decrementAndGet();
channelPool.remove(channel);
});
return channel;
}
private void closeChannel(Channel channel) {
try {
if (channel.isOpen()) {
channel.close();
}
} catch (Exception e) {
e.printStackTrace();
}
createdCount.decrementAndGet();
}
public void close() {
Channel channel;
while ((channel = channelPool.poll()) != null) {
closeChannel(channel);
}
createdCount.set(0);
}
public int getAvailableChannels() {
return channelPool.size();
}
public int getCreatedCount() {
return createdCount.get();
}
}2.2 连接池管理器
java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class RabbitMQPoolManager {
private final ConnectionFactory factory;
private final int maxConnections;
private final int maxChannelsPerConnection;
private final int prefetchCount;
private final BlockingQueue<PooledConnection> connectionPool;
private final AtomicLong totalBorrows = new AtomicLong(0);
private final AtomicLong totalReturns = new AtomicLong(0);
public RabbitMQPoolManager(String host, int port, String username,
String password, String vhost,
int maxConnections, int maxChannelsPerConnection,
int prefetchCount) {
this.factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vhost);
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setTopologyRecoveryEnabled(true);
this.maxConnections = maxConnections;
this.maxChannelsPerConnection = maxChannelsPerConnection;
this.prefetchCount = prefetchCount;
this.connectionPool = new LinkedBlockingQueue<>(maxConnections);
}
public void initialize() throws Exception {
for (int i = 0; i < maxConnections; i++) {
PooledConnection pooledConn = createPooledConnection();
connectionPool.offer(pooledConn);
}
System.out.println("连接池初始化完成,连接数: " + maxConnections);
}
public PooledChannel borrowChannel() throws Exception {
totalBorrows.incrementAndGet();
PooledConnection pooledConn = connectionPool.take();
try {
Channel channel = pooledConn.borrowChannel();
return new PooledChannel(pooledConn, channel);
} finally {
connectionPool.offer(pooledConn);
}
}
public PooledChannel borrowChannel(long timeout, TimeUnit unit) throws Exception {
PooledConnection pooledConn = connectionPool.poll(timeout, unit);
if (pooledConn == null) {
return null;
}
try {
Channel channel = pooledConn.borrowChannel(timeout, unit);
if (channel == null) {
return null;
}
return new PooledChannel(pooledConn, channel);
} finally {
connectionPool.offer(pooledConn);
}
}
public void returnChannel(PooledChannel pooledChannel) {
totalReturns.incrementAndGet();
if (pooledChannel != null) {
pooledChannel.getPooledConnection().returnChannel(pooledChannel.getChannel());
}
}
private PooledConnection createPooledConnection() throws Exception {
Connection connection = factory.newConnection("pooled-connection");
connection.addShutdownListener(cause -> {
System.err.println("连接关闭: " + cause.getMessage());
});
return new PooledConnection(connection, maxChannelsPerConnection, prefetchCount);
}
public void close() {
PooledConnection pooledConn;
while ((pooledConn = connectionPool.poll()) != null) {
pooledConn.close();
}
System.out.println("连接池已关闭");
System.out.println("统计: 借出=" + totalBorrows.get() +
", 归还=" + totalReturns.get());
}
public PoolStatistics getStatistics() {
return new PoolStatistics(
connectionPool.size(),
maxConnections,
totalBorrows.get(),
totalReturns.get()
);
}
}
class PooledConnection {
private final Connection connection;
private final ChannelPool channelPool;
public PooledConnection(Connection connection, int maxChannels, int prefetchCount) {
this.connection = connection;
this.channelPool = new ChannelPool(connection, maxChannels, prefetchCount);
}
public Channel borrowChannel() throws Exception {
return channelPool.borrowChannel();
}
public Channel borrowChannel(long timeout, TimeUnit unit) throws Exception {
return channelPool.borrowChannel(timeout, unit);
}
public void returnChannel(Channel channel) {
channelPool.returnChannel(channel);
}
public void close() {
channelPool.close();
try {
if (connection.isOpen()) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public int getAvailableChannels() {
return channelPool.getAvailableChannels();
}
}
class PooledChannel {
private final PooledConnection pooledConnection;
private final Channel channel;
public PooledChannel(PooledConnection pooledConnection, Channel channel) {
this.pooledConnection = pooledConnection;
this.channel = channel;
}
public Channel getChannel() {
return channel;
}
public PooledConnection getPooledConnection() {
return pooledConnection;
}
}
class PoolStatistics {
private final int availableConnections;
private final int maxConnections;
private final long totalBorrows;
private final long totalReturns;
public PoolStatistics(int availableConnections, int maxConnections,
long totalBorrows, long totalReturns) {
this.availableConnections = availableConnections;
this.maxConnections = maxConnections;
this.totalBorrows = totalBorrows;
this.totalReturns = totalReturns;
}
public int getAvailableConnections() { return availableConnections; }
public int getMaxConnections() { return maxConnections; }
public long getTotalBorrows() { return totalBorrows; }
public long getTotalReturns() { return totalReturns; }
@Override
public String toString() {
return String.format("PoolStatistics{available=%d, max=%d, borrows=%d, returns=%d}",
availableConnections, maxConnections, totalBorrows, totalReturns);
}
}2.3 连接池配置
java
import com.rabbitmq.client.*;
public class PoolConfig {
private String host = "localhost";
private int port = 5672;
private String username = "guest";
private String password = "guest";
private String virtualHost = "/";
private int maxConnections = 5;
private int maxChannelsPerConnection = 10;
private int prefetchCount = 10;
private boolean automaticRecovery = true;
private long networkRecoveryInterval = 5000;
private boolean topologyRecovery = true;
private int requestedHeartbeat = 60;
private long borrowTimeout = 5000;
private long connectionTimeout = 30000;
public static PoolConfig defaultConfig() {
return new PoolConfig();
}
public static PoolConfig productionConfig() {
PoolConfig config = new PoolConfig();
config.maxConnections = 10;
config.maxChannelsPerConnection = 20;
config.prefetchCount = 20;
config.requestedHeartbeat = 30;
return config;
}
public static PoolConfig highThroughputConfig() {
PoolConfig config = new PoolConfig();
config.maxConnections = 20;
config.maxChannelsPerConnection = 50;
config.prefetchCount = 50;
return config;
}
public ConnectionFactory createConnectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setAutomaticRecoveryEnabled(automaticRecovery);
factory.setNetworkRecoveryInterval(networkRecoveryInterval);
factory.setTopologyRecoveryEnabled(topologyRecovery);
factory.setRequestedHeartbeat(requestedHeartbeat);
factory.setConnectionTimeout((int) connectionTimeout);
return factory;
}
public String getHost() { return host; }
public void setHost(String host) { this.host = host; }
public int getPort() { return port; }
public void setPort(int port) { this.port = port; }
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getPassword() { return password; }
public void setPassword(String password) { this.password = password; }
public String getVirtualHost() { return virtualHost; }
public void setVirtualHost(String virtualHost) { this.virtualHost = virtualHost; }
public int getMaxConnections() { return maxConnections; }
public void setMaxConnections(int maxConnections) { this.maxConnections = maxConnections; }
public int getMaxChannelsPerConnection() { return maxChannelsPerConnection; }
public void setMaxChannelsPerConnection(int maxChannelsPerConnection) {
this.maxChannelsPerConnection = maxChannelsPerConnection;
}
public int getPrefetchCount() { return prefetchCount; }
public void setPrefetchCount(int prefetchCount) { this.prefetchCount = prefetchCount; }
public long getBorrowTimeout() { return borrowTimeout; }
public void setBorrowTimeout(long borrowTimeout) { this.borrowTimeout = borrowTimeout; }
}三、代码示例
3.1 完整连接池示例
java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
public class CompletePoolDemo {
private final RabbitMQPoolManager poolManager;
public CompletePoolDemo(PoolConfig config) throws Exception {
this.poolManager = new RabbitMQPoolManager(
config.getHost(),
config.getPort(),
config.getUsername(),
config.getPassword(),
config.getVirtualHost(),
config.getMaxConnections(),
config.getMaxChannelsPerConnection(),
config.getPrefetchCount()
);
this.poolManager.initialize();
}
public void publish(String exchange, String routingKey, byte[] body) throws Exception {
PooledChannel pooledChannel = null;
try {
pooledChannel = poolManager.borrowChannel(5, TimeUnit.SECONDS);
if (pooledChannel == null) {
throw new RuntimeException("获取通道超时");
}
Channel channel = pooledChannel.getChannel();
channel.basicPublish(exchange, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN, body);
} finally {
if (pooledChannel != null) {
poolManager.returnChannel(pooledChannel);
}
}
}
public void consume(String queueName, Consumer<Delivery> messageHandler) throws Exception {
PooledChannel pooledChannel = poolManager.borrowChannel();
try {
Channel channel = pooledChannel.getChannel();
DeliverCallback callback = (consumerTag, delivery) -> {
messageHandler.accept(delivery);
};
channel.basicConsume(queueName, false, callback, consumerTag -> {});
} finally {
poolManager.returnChannel(pooledChannel);
}
}
public void executeWithChannel(ChannelOperation operation) throws Exception {
PooledChannel pooledChannel = null;
try {
pooledChannel = poolManager.borrowChannel();
operation.execute(pooledChannel.getChannel());
} finally {
if (pooledChannel != null) {
poolManager.returnChannel(pooledChannel);
}
}
}
public void close() {
poolManager.close();
}
public PoolStatistics getStatistics() {
return poolManager.getStatistics();
}
@FunctionalInterface
public interface ChannelOperation {
void execute(Channel channel) throws Exception;
}
public static void main(String[] args) throws Exception {
PoolConfig config = PoolConfig.productionConfig();
config.setHost("localhost");
CompletePoolDemo demo = new CompletePoolDemo(config);
try {
for (int i = 0; i < 100; i++) {
final int index = i;
demo.executeWithChannel(channel -> {
channel.basicPublish("", "test.queue", null,
("消息 " + index).getBytes());
});
}
System.out.println("统计: " + demo.getStatistics());
} finally {
demo.close();
}
}
}3.2 带监控的连接池
java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class MonitoredConnectionPool {
private final RabbitMQPoolManager poolManager;
private final ScheduledExecutorService monitorExecutor;
private final AtomicLong totalMessages = new AtomicLong(0);
private final AtomicLong failedMessages = new AtomicLong(0);
private final AtomicLong totalLatency = new AtomicLong(0);
public MonitoredConnectionPool(PoolConfig config) throws Exception {
this.poolManager = new RabbitMQPoolManager(
config.getHost(), config.getPort(),
config.getUsername(), config.getPassword(),
config.getVirtualHost(),
config.getMaxConnections(),
config.getMaxChannelsPerConnection(),
config.getPrefetchCount()
);
this.poolManager.initialize();
this.monitorExecutor = Executors.newSingleThreadScheduledExecutor();
startMonitoring();
}
private void startMonitoring() {
monitorExecutor.scheduleAtFixedRate(() -> {
PoolStatistics stats = poolManager.getStatistics();
long msgs = totalMessages.get();
long failed = failedMessages.get();
long latency = msgs > 0 ? totalLatency.get() / msgs : 0;
System.out.println("=== 连接池监控 ===");
System.out.println("可用连接: " + stats.getAvailableConnections() +
"/" + stats.getMaxConnections());
System.out.println("总消息数: " + msgs);
System.out.println("失败消息: " + failed);
System.out.println("平均延迟: " + latency + " ms");
}, 10, 10, TimeUnit.SECONDS);
}
public void publishWithMetrics(String exchange, String routingKey, byte[] body)
throws Exception {
long startTime = System.currentTimeMillis();
try {
poolManager.borrowChannel();
PooledChannel pooledChannel = poolManager.borrowChannel();
try {
pooledChannel.getChannel().basicPublish(exchange, routingKey,
null, body);
totalMessages.incrementAndGet();
totalLatency.addAndGet(System.currentTimeMillis() - startTime);
} finally {
poolManager.returnChannel(pooledChannel);
}
} catch (Exception e) {
failedMessages.incrementAndGet();
throw e;
}
}
public PoolMetrics getMetrics() {
long msgs = totalMessages.get();
return new PoolMetrics(
msgs,
failedMessages.get(),
msgs > 0 ? totalLatency.get() / msgs : 0,
poolManager.getStatistics()
);
}
public void close() {
monitorExecutor.shutdown();
poolManager.close();
}
}
class PoolMetrics {
private final long totalMessages;
private final long failedMessages;
private final long averageLatency;
private final PoolStatistics poolStats;
public PoolMetrics(long totalMessages, long failedMessages,
long averageLatency, PoolStatistics poolStats) {
this.totalMessages = totalMessages;
this.failedMessages = failedMessages;
this.averageLatency = averageLatency;
this.poolStats = poolStats;
}
public long getTotalMessages() { return totalMessages; }
public long getFailedMessages() { return failedMessages; }
public long getAverageLatency() { return averageLatency; }
public PoolStatistics getPoolStats() { return poolStats; }
@Override
public String toString() {
return String.format("PoolMetrics{total=%d, failed=%d, avgLatency=%dms, pool=%s}",
totalMessages, failedMessages, averageLatency, poolStats);
}
}3.3 动态扩容连接池
java
import com.rabbitmq.client.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class DynamicConnectionPool {
private final ConnectionFactory factory;
private final int minConnections;
private final int maxConnections;
private final int maxChannelsPerConnection;
private final BlockingQueue<PooledConnection> availableConnections;
private final ConcurrentHashMap<PooledConnection, Boolean> allConnections;
private final AtomicInteger currentSize = new AtomicInteger(0);
private final ScheduledExecutorService scaler;
private volatile boolean scaling = false;
public DynamicConnectionPool(ConnectionFactory factory, int minConnections,
int maxConnections, int maxChannelsPerConnection) {
this.factory = factory;
this.minConnections = minConnections;
this.maxConnections = maxConnections;
this.maxChannelsPerConnection = maxChannelsPerConnection;
this.availableConnections = new LinkedBlockingQueue<>();
this.allConnections = new ConcurrentHashMap<>();
this.scaler = Executors.newSingleThreadScheduledExecutor();
}
public void initialize() throws Exception {
for (int i = 0; i < minConnections; i++) {
PooledConnection conn = createConnection();
availableConnections.offer(conn);
}
scaler.scheduleAtFixedRate(this::scale, 30, 30, TimeUnit.SECONDS);
System.out.println("动态连接池初始化完成,最小连接数: " + minConnections);
}
public PooledConnection borrowConnection() throws Exception {
PooledConnection conn = availableConnections.poll();
if (conn == null) {
if (currentSize.get() < maxConnections) {
synchronized (this) {
if (currentSize.get() < maxConnections) {
conn = createConnection();
}
}
}
if (conn == null) {
conn = availableConnections.take();
}
}
return conn;
}
public void returnConnection(PooledConnection conn) {
if (conn != null && conn.isOpen()) {
availableConnections.offer(conn);
} else {
removeConnection(conn);
}
}
private PooledConnection createConnection() throws Exception {
Connection connection = factory.newConnection("dynamic-pool-" + currentSize.get());
PooledConnection pooledConn = new PooledConnection(connection, maxChannelsPerConnection, 10);
allConnections.put(pooledConn, true);
currentSize.incrementAndGet();
System.out.println("创建新连接,当前连接数: " + currentSize.get());
return pooledConn;
}
private void removeConnection(PooledConnection conn) {
allConnections.remove(conn);
availableConnections.remove(conn);
currentSize.decrementAndGet();
try {
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("移除连接,当前连接数: " + currentSize.get());
}
private void scale() {
int available = availableConnections.size();
int current = currentSize.get();
if (available < minConnections / 2 && current < maxConnections) {
scaleUp();
} else if (available > minConnections && current > minConnections) {
scaleDown();
}
}
private void scaleUp() {
if (scaling) return;
scaling = true;
try {
if (currentSize.get() < maxConnections) {
PooledConnection conn = createConnection();
availableConnections.offer(conn);
System.out.println("扩容连接池");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
scaling = false;
}
}
private void scaleDown() {
if (scaling) return;
scaling = true;
try {
PooledConnection conn = availableConnections.poll();
if (conn != null && currentSize.get() > minConnections) {
removeConnection(conn);
System.out.println("缩容连接池");
}
} finally {
scaling = false;
}
}
public void close() {
scaler.shutdown();
for (PooledConnection conn : allConnections.keySet()) {
try {
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
availableConnections.clear();
allConnections.clear();
currentSize.set(0);
System.out.println("动态连接池已关闭");
}
public int getCurrentSize() {
return currentSize.get();
}
public int getAvailableSize() {
return availableConnections.size();
}
}四、实际应用场景
4.1 Web 应用集成
java
import com.rabbitmq.client.*;
import javax.servlet.*;
import java.util.concurrent.*;
public class RabbitMQPoolFilter implements Filter {
private RabbitMQPoolManager poolManager;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
String host = filterConfig.getInitParameter("host");
int port = Integer.parseInt(filterConfig.getInitParameter("port"));
String username = filterConfig.getInitParameter("username");
String password = filterConfig.getInitParameter("password");
PoolConfig config = PoolConfig.productionConfig();
config.setHost(host);
config.setPort(port);
config.setUsername(username);
config.setPassword(password);
try {
poolManager = new RabbitMQPoolManager(
config.getHost(), config.getPort(),
config.getUsername(), config.getPassword(),
config.getVirtualHost(),
config.getMaxConnections(),
config.getMaxChannelsPerConnection(),
config.getPrefetchCount()
);
poolManager.initialize();
filterConfig.getServletContext().setAttribute("rabbitmq.pool", poolManager);
} catch (Exception e) {
throw new ServletException("初始化 RabbitMQ 连接池失败", e);
}
}
@Override
public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain) throws IOException, ServletException {
chain.doFilter(request, response);
}
@Override
public void destroy() {
if (poolManager != null) {
poolManager.close();
}
}
}4.2 Spring 集成
java
import com.rabbitmq.client.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Component
public class RabbitMQPoolComponent {
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private int port;
@Value("${rabbitmq.username}")
private String username;
@Value("${rabbitmq.password}")
private String password;
@Value("${rabbitmq.pool.max-connections:5}")
private int maxConnections;
@Value("${rabbitmq.pool.max-channels:10}")
private int maxChannels;
private RabbitMQPoolManager poolManager;
@PostConstruct
public void init() throws Exception {
PoolConfig config = new PoolConfig();
config.setHost(host);
config.setPort(port);
config.setUsername(username);
config.setPassword(password);
config.setMaxConnections(maxConnections);
config.setMaxChannelsPerConnection(maxChannels);
poolManager = new RabbitMQPoolManager(
config.getHost(), config.getPort(),
config.getUsername(), config.getPassword(),
config.getVirtualHost(),
config.getMaxConnections(),
config.getMaxChannelsPerConnection(),
config.getPrefetchCount()
);
poolManager.initialize();
}
@PreDestroy
public void destroy() {
if (poolManager != null) {
poolManager.close();
}
}
public void execute(ChannelOperation operation) throws Exception {
PooledChannel pooledChannel = null;
try {
pooledChannel = poolManager.borrowChannel();
operation.execute(pooledChannel.getChannel());
} finally {
if (pooledChannel != null) {
poolManager.returnChannel(pooledChannel);
}
}
}
@FunctionalInterface
public interface ChannelOperation {
void execute(Channel channel) throws Exception;
}
}五、常见问题与解决方案
5.1 连接泄漏
问题描述: 连接借出后未归还。
解决方案:
java
public void safeExecute(ChannelOperation operation) throws Exception {
PooledChannel pooledChannel = null;
try {
pooledChannel = poolManager.borrowChannel();
operation.execute(pooledChannel.getChannel());
} finally {
if (pooledChannel != null) {
poolManager.returnChannel(pooledChannel);
}
}
}5.2 连接池耗尽
问题描述: 连接池中无可用连接。
解决方案:
java
PooledChannel pooledChannel = poolManager.borrowChannel(5, TimeUnit.SECONDS);
if (pooledChannel == null) {
throw new RuntimeException("获取连接超时,连接池可能已耗尽");
}5.3 连接失效
问题描述: 池中连接已失效。
解决方案:
java
public Channel borrowChannel() throws Exception {
Channel channel = channelPool.poll();
if (channel == null || !channel.isOpen()) {
channel = createChannel();
}
return channel;
}六、最佳实践建议
6.1 连接池配置
text
配置建议:
├── 连接数:根据并发量设置,通常 5-20
├── 通道数:每个连接 10-50 个通道
├── 预取数量:根据处理能力设置
├── 超时时间:合理设置借出超时
└── 心跳间隔:生产环境 30-60 秒6.2 资源管理
text
管理建议:
├── 使用 try-finally 确保归还
├── 监控连接池使用率
├── 实现健康检查
├── 配置合理的超时
└── 优雅关闭连接池6.3 性能优化
text
性能建议:
├── 复用连接和通道
├── 批量操作减少借还
├── 异步处理提高吞吐
├── 合理设置池大小
└── 监控和调优