Appearance
内存问题诊断
概述
RabbitMQ 内存问题是生产环境中常见的故障原因,可能导致服务变慢、触发流控甚至崩溃。本文档将详细介绍内存问题的诊断方法和解决方案。
内存使用分析
1. RabbitMQ 内存组成
┌─────────────────────────────────────────────────────────────┐
│ RabbitMQ 内存组成 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Total Memory │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Queue │ │ Connection│ │ Binary │ │ │
│ │ │ Index │ │ Buffers │ │ Data │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Message │ │ Plugin │ │ Erlang │ │ │
│ │ │ Store │ │ Memory │ │ VM │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

2. 内存使用分布
┌─────────────────────────────────────────────────────────────┐
│ 典型内存使用分布 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 组件 │ 占比 │ 说明 │
│ ───────────────────────────────────────────────────────── │
│ 消息存储 │ 30-50% │ 队列中的消息 │
│ 连接缓冲区 │ 10-20% │ 连接和通道缓冲 │
│ 二进制数据 │ 10-20% │ 消息体和元数据 │
│ 队列索引 │ 5-10% │ 队列索引数据 │
│ 插件 │ 5-10% │ 管理插件等 │
│ Erlang VM │ 5-10% │ 运行时开销 │
│ 其他 │ 5-10% │ 其他组件 │
│ │
└─────────────────────────────────────────────────────────────┘

诊断步骤
步骤1:查看整体内存状态
bash
# 查看内存使用概览
rabbitmqctl status | grep -A 30 "Memory"
# 使用 rabbitmq-diagnostics
rabbitmq-diagnostics memory_breakdown
# 查看内存水位
rabbitmqctl status | grep -A 5 "memory_high_watermark"

步骤2:分析内存分布
bash
# Full per-category memory breakdown, returned as an Erlang proplist.
# (rabbit_vm:memory/0 is the function behind `rabbitmq-diagnostics memory_breakdown`.)
rabbitmqctl eval 'rabbit_vm:memory().'
# Pick out the headline categories from the same breakdown.
rabbitmqctl eval '
M = rabbit_vm:memory(),
[{K, proplists:get_value(K, M)} ||
 K <- [total, connection_readers, connection_writers, connection_channels,
       queue_procs, binary]].'
# Queues ordered by memory footprint, largest first.
rabbitmqctl list_queues name memory | sort -k2 -n -r | head -20

步骤3:检查内存告警
bash
# Resource alarms currently in effect on any cluster node.
rabbitmq-diagnostics alarms
# Raw alarm state as reported by the alarm handler on the target node.
rabbitmqctl eval 'rabbit_alarm:get_alarms().'
# Configured memory high watermark.
rabbitmqctl eval 'vm_memory_monitor:get_vm_memory_high_watermark().'
# Memory usage line from the status output.
# NOTE(review): the exact wording of the status output varies between
# RabbitMQ versions — confirm that "Memory use" matches on your broker.
rabbitmqctl status | grep "Memory use"

步骤4:分析进程内存
bash
# Ten Erlang processes using the most memory (bytes), largest first.
# Erlang has no pipe operator, so the calls are nested. The generator pattern
# skips processes that exit between processes() and process_info/2, for which
# process_info/2 returns `undefined`.
rabbitmqctl eval '
lists:sublist(
  lists:reverse(lists:keysort(2,
    [{P, M} || P <- processes(), {memory, M} <- [process_info(P, memory)]])),
  10).'
# Ten largest ETS tables, largest first. ets:info(T, memory) is in words, not
# bytes; tables deleted mid-scan yield `undefined` and are filtered out.
rabbitmqctl eval '
lists:sublist(
  lists:reverse(lists:keysort(2,
    [{T, M} || T <- ets:all(), M <- [ets:info(T, memory)], is_integer(M)])),
  10).'

PHP 内存诊断工具
php
<?php
/**
 * Memory diagnostics for a RabbitMQ cluster, driven by the management
 * plugin's HTTP API.
 *
 * Each public analysis method issues GET requests against the API and returns
 * plain arrays; generateMemoryReport() renders them as a text report.
 */
class RabbitMQMemoryDiagnostics
{
    /**
     * Flat per-channel allowance, in bytes, added to a connection's estimate.
     * NOTE(review): heuristic value carried over unchanged; it is not measured.
     */
    private const CHANNEL_MEMORY_ESTIMATE = 10240;

    /** @var string Base URL of the management API (ends in "/api"). */
    private $apiUrl;

    /** @var string User name for HTTP basic auth. */
    private $user;

    /** @var string Password for HTTP basic auth. */
    private $password;

    /**
     * @param string $host     Management API host.
     * @param int    $port     Management API port.
     * @param string $user     User name for HTTP basic auth.
     * @param string $password Password for HTTP basic auth.
     */
    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->apiUrl = "http://{$host}:{$port}/api";
        $this->user = $user;
        $this->password = $password;
    }

    /**
     * Performs a GET request and returns the decoded JSON document.
     *
     * @param string $endpoint Path below the API root, starting with "/".
     * @return array Decoded response body.
     * @throws \RuntimeException If the transfer fails, the server answers with
     *                           a non-2xx status, or the body is not a JSON
     *                           array/object.
     */
    private function request(string $endpoint): array
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $this->apiUrl . $endpoint,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);
        $response = curl_exec($ch);
        // Read error details before the handle is closed.
        $transportError = curl_error($ch);
        $httpStatus = (int) curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($response === false) {
            throw new \RuntimeException("Request to {$endpoint} failed: {$transportError}");
        }
        // Error responses (e.g. 401) also carry a JSON body; without this check
        // the callers would iterate over the error document as if it were data.
        if ($httpStatus < 200 || $httpStatus >= 300) {
            throw new \RuntimeException("Request to {$endpoint} returned HTTP {$httpStatus}");
        }

        $decoded = json_decode($response, true);
        if (!is_array($decoded)) {
            throw new \RuntimeException("Request to {$endpoint} returned a non-JSON body");
        }
        return $decoded;
    }

    /**
     * Summarises memory usage per node and raises alerts for nodes above 80%
     * of their limit or with the memory alarm set.
     *
     * @return array Keys: timestamp, cluster_name, nodes, total_memory,
     *               total_memory_formatted, alerts.
     * @throws \RuntimeException If an API request fails.
     */
    public function getMemoryOverview(): array
    {
        $overview = $this->request('/overview');
        $nodes = $this->request('/nodes');

        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'cluster_name' => $overview['cluster_name'] ?? 'unknown',
            'nodes' => [],
            'total_memory' => 0,
            'alerts' => [],
        ];

        foreach ($nodes as $node) {
            $name = $node['name'] ?? 'unknown';
            $memUsed = (int) ($node['mem_used'] ?? 0);
            $memLimit = (int) ($node['mem_limit'] ?? 0);
            $memAlarm = (bool) ($node['mem_alarm'] ?? false);
            // A missing or zero limit would otherwise divide by zero.
            $memPercent = $memLimit > 0 ? round($memUsed / $memLimit * 100, 2) : 0.0;

            $result['nodes'][] = [
                'name' => $name,
                'mem_used' => $this->formatBytes($memUsed),
                'mem_limit' => $this->formatBytes($memLimit),
                'mem_percent' => $memPercent,
                'mem_alarm' => $memAlarm,
                'partitions' => count($node['partitions'] ?? []),
            ];
            $result['total_memory'] += $memUsed;

            if ($memPercent > 80) {
                $result['alerts'][] = [
                    'level' => $memPercent > 90 ? 'critical' : 'warning',
                    'node' => $name,
                    'message' => "节点 {$name} 内存使用率 {$memPercent}%",
                    'recommendation' => $this->getRecommendation($memPercent),
                ];
            }
            if ($memAlarm) {
                $result['alerts'][] = [
                    'level' => 'critical',
                    'node' => $name,
                    'message' => "节点 {$name} 触发内存告警",
                    'recommendation' => '立即处理,考虑扩容或清理消息',
                ];
            }
        }

        $result['total_memory_formatted'] = $this->formatBytes($result['total_memory']);
        return $result;
    }

    /**
     * Summarises queue memory: totals, the 20 largest queues, queues holding
     * messages with no consumers, and queues with more than 10000 messages.
     *
     * @return array Keys: total_queues, total_memory, total_memory_formatted,
     *               top_memory_queues, queues_without_consumers,
     *               queues_with_backlog.
     * @throws \RuntimeException If the API request fails.
     */
    public function getQueueMemoryAnalysis(): array
    {
        $queues = $this->request('/queues');

        $result = [
            'total_queues' => count($queues),
            'total_memory' => 0,
            'top_memory_queues' => [],
            'queues_without_consumers' => [],
            'queues_with_backlog' => [],
        ];

        $ranked = [];
        foreach ($queues as $queue) {
            $name = $queue['name'] ?? 'unknown';
            $memory = (int) ($queue['memory'] ?? 0);
            $messages = (int) ($queue['messages'] ?? 0);
            $consumers = (int) ($queue['consumers'] ?? 0);

            $result['total_memory'] += $memory;
            $ranked[] = [
                'name' => $name,
                'vhost' => $queue['vhost'] ?? 'unknown',
                'memory' => $memory,
                'memory_formatted' => $this->formatBytes($memory),
                'messages' => $messages,
                'consumers' => $consumers,
            ];

            $brief = [
                'name' => $name,
                'messages' => $messages,
                'memory' => $this->formatBytes($memory),
            ];
            if ($consumers === 0 && $messages > 0) {
                $result['queues_without_consumers'][] = $brief;
            }
            if ($messages > 10000) {
                $result['queues_with_backlog'][] = $brief;
            }
        }

        usort($ranked, function ($a, $b) {
            return $b['memory'] <=> $a['memory'];
        });

        $result['top_memory_queues'] = array_slice($ranked, 0, 20);
        $result['total_memory_formatted'] = $this->formatBytes($result['total_memory']);
        return $result;
    }

    /**
     * Summarises connections: counts per user and peer host plus the ten
     * connections with the largest estimated footprint.
     *
     * @return array Keys: total_connections, total_memory,
     *               total_memory_formatted, by_user, by_peer_host,
     *               top_memory_connections.
     * @throws \RuntimeException If the API request fails.
     */
    public function getConnectionMemoryAnalysis(): array
    {
        return $this->summarizeConnections($this->request('/connections'));
    }

    /**
     * Builds the connection summary from already-fetched connection records.
     *
     * The estimate is recv_oct + send_oct + channels * CHANNEL_MEMORY_ESTIMATE.
     * NOTE(review): recv_oct/send_oct look like cumulative traffic counters
     * rather than buffer sizes, so this is a rough ranking signal, not a
     * measurement — confirm before relying on the absolute numbers.
     *
     * @param array $connections Connection records as returned by the API.
     * @return array Same shape as getConnectionMemoryAnalysis().
     */
    private function summarizeConnections(array $connections): array
    {
        $result = [
            'total_connections' => count($connections),
            'total_memory' => 0,
            'by_user' => [],
            'by_peer_host' => [],
            'top_memory_connections' => [],
        ];

        $ranked = [];
        foreach ($connections as $conn) {
            $recvOct = (int) ($conn['recv_oct'] ?? 0);
            $sendOct = (int) ($conn['send_oct'] ?? 0);
            $channels = (int) ($conn['channels'] ?? 0);
            $estimate = $recvOct + $sendOct + $channels * self::CHANNEL_MEMORY_ESTIMATE;
            $user = $conn['user'] ?? 'unknown';
            $peerHost = $conn['peer_host'] ?? 'unknown';

            $result['total_memory'] += $estimate;
            $result['by_user'][$user] = ($result['by_user'][$user] ?? 0) + 1;
            $result['by_peer_host'][$peerHost] = ($result['by_peer_host'][$peerHost] ?? 0) + 1;

            $ranked[] = [
                'name' => $conn['name'] ?? 'unknown',
                'user' => $user,
                'peer_host' => $peerHost,
                'channels' => $channels,
                'recv_oct' => $this->formatBytes($recvOct),
                'send_oct' => $this->formatBytes($sendOct),
                'estimated_memory' => $this->formatBytes($estimate),
                // Raw value kept so the ranking compares numbers, not strings.
                'estimated_memory_bytes' => $estimate,
            ];
        }

        // Comparing the formatted strings would rank "9 KB" above "10 MB".
        usort($ranked, function ($a, $b) {
            return $b['estimated_memory_bytes'] <=> $a['estimated_memory_bytes'];
        });

        $result['top_memory_connections'] = array_slice($ranked, 0, 10);
        $result['total_memory_formatted'] = $this->formatBytes($result['total_memory']);
        return $result;
    }

    /**
     * Renders node, queue and connection diagnostics as a plain-text report.
     *
     * @return string The report text.
     * @throws \RuntimeException If an API request fails.
     */
    public function generateMemoryReport(): string
    {
        $overview = $this->getMemoryOverview();
        $queues = $this->getQueueMemoryAnalysis();
        $connections = $this->getConnectionMemoryAnalysis();

        $report = "=== RabbitMQ 内存诊断报告 ===\n";
        $report .= "生成时间: {$overview['timestamp']}\n\n";

        $report .= "【节点内存状态】\n";
        foreach ($overview['nodes'] as $node) {
            $report .= "节点: {$node['name']}\n";
            $report .= " 已用: {$node['mem_used']} / {$node['mem_limit']} ({$node['mem_percent']}%)\n";
            $report .= " 告警: " . ($node['mem_alarm'] ? '是' : '否') . "\n";
        }
        $report .= "\n";

        $report .= "【队列内存分析】\n";
        $report .= "总队列数: {$queues['total_queues']}\n";
        $report .= "队列总内存: {$queues['total_memory_formatted']}\n";
        $report .= "TOP5 内存占用队列:\n";
        foreach (array_slice($queues['top_memory_queues'], 0, 5) as $queue) {
            $report .= " {$queue['name']}: {$queue['memory_formatted']} ({$queue['messages']} 消息)\n";
        }
        $report .= "\n";

        $report .= "【连接内存分析】\n";
        $report .= "总连接数: {$connections['total_connections']}\n";
        $report .= "连接总内存: {$connections['total_memory_formatted']}\n";
        $report .= "按用户统计:\n";
        foreach ($connections['by_user'] as $user => $count) {
            $report .= " {$user}: {$count} 连接\n";
        }
        $report .= "\n";

        if (!empty($overview['alerts'])) {
            $report .= "【告警信息】\n";
            foreach ($overview['alerts'] as $alert) {
                $report .= "[{$alert['level']}] {$alert['message']}\n";
                $report .= " 建议: {$alert['recommendation']}\n";
            }
        }

        if (!empty($queues['queues_without_consumers'])) {
            $report .= "\n【无消费者队列】\n";
            foreach (array_slice($queues['queues_without_consumers'], 0, 5) as $queue) {
                $report .= " {$queue['name']}: {$queue['messages']} 消息, {$queue['memory']}\n";
            }
        }

        return $report;
    }

    /**
     * Formats a byte count with a binary-scaled unit (B, KB, MB, GB, TB),
     * rounded to two decimals.
     *
     * @param int $bytes Byte count.
     * @return string For example "1.5 KB".
     */
    private function formatBytes(int $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB', 'TB'];
        $lastUnit = count($units) - 1;
        $value = (float) $bytes;
        $unit = 0;
        while ($value >= 1024 && $unit < $lastUnit) {
            $value /= 1024;
            $unit++;
        }
        return round($value, 2) . ' ' . $units[$unit];
    }

    /**
     * Maps a memory usage percentage to a recommendation string.
     *
     * @param float $memPercent Memory usage as a percentage of the limit.
     * @return string Recommendation text.
     */
    private function getRecommendation(float $memPercent): string
    {
        if ($memPercent > 90) {
            return '紧急:立即清理消息或扩容内存';
        }
        if ($memPercent > 80) {
            return '警告:考虑增加消费者或清理积压消息';
        }
        return '关注:持续监控内存使用趋势';
    }
}
// 使用示例
$diagnostics = new RabbitMQMemoryDiagnostics();
echo $diagnostics->generateMemoryReport();

常见内存问题及解决方案
1. 消息积压导致内存过高
php
<?php
/**
 * Prints configuration advice for reducing queue memory usage.
 *
 * The methods only echo suggested settings; none of them changes the broker.
 * The constructor still opens an AMQP connection, which close() releases.
 */
class MemoryOptimization
{
    /** @var \PhpAmqpLib\Connection\AMQPStreamConnection */
    private $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel */
    private $channel;

    /**
     * Connects to the broker on localhost:5672 as guest/guest and opens a
     * channel.
     */
    public function __construct()
    {
        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            'localhost', 5672, 'guest', 'guest'
        );
        $this->channel = $this->connection->channel();
    }

    /**
     * Prints the rabbitmqctl command that applies a lazy-mode policy to exactly
     * one queue.
     *
     * Lazy mode is selected with the "queue-mode" policy key; "queue-type"
     * chooses between classic/quorum/stream and does not accept "lazy".
     * NOTE(review): classic queues on RabbitMQ 3.12+ reportedly ignore
     * queue-mode because they always behave lazily — confirm for your version.
     *
     * @param string $queueName Queue the policy should match.
     */
    public function convertToLazyQueue(string $queueName): void
    {
        $policy = [
            'name' => 'lazy-' . $queueName,
            'pattern' => '^' . preg_quote($queueName) . '$',
            'definition' => [
                'queue-mode' => 'lazy',
            ],
            'priority' => 1,
            'apply-to' => 'queues',
        ];

        // escapeshellarg() keeps names or patterns containing spaces, quotes,
        // "$" or backslashes intact when the command is pasted into a shell.
        $command = implode(' ', [
            'rabbitmqctl set_policy',
            escapeshellarg($policy['name']),
            escapeshellarg($policy['pattern']),
            escapeshellarg(json_encode($policy['definition'])),
            '--priority ' . $policy['priority'],
            '--apply-to ' . $policy['apply-to'],
        ]);

        echo "建议使用以下命令设置策略:\n";
        echo $command . "\n";
    }

    /**
     * Prints the queue arguments that cap a queue's length.
     *
     * @param string $queueName Queue the advice is for (not used in the output).
     * @param int    $maxLength Maximum number of messages to keep.
     */
    public function setQueueMaxLength(string $queueName, int $maxLength): void
    {
        // These are the arguments to pass when declaring the queue.
        $arguments = [
            'x-max-length' => $maxLength,
            'x-overflow' => 'reject-publish-dlx',
        ];

        echo "创建队列时使用参数:\n";
        foreach ($arguments as $key => $value) {
            echo "{$key}: {$value}\n";
        }
    }

    /**
     * Prints the queue argument that sets a per-queue message TTL.
     *
     * @param string $queueName Queue the advice is for (not used in the output).
     * @param int    $ttlMs     Message time-to-live in milliseconds.
     */
    public function setQueueTTL(string $queueName, int $ttlMs): void
    {
        echo "建议设置队列TTL: {$ttlMs}ms\n";
        echo "创建队列时使用参数 x-message-ttl: {$ttlMs}\n";
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

2. 连接数过多导致内存过高
bash
# 查看连接数
rabbitmqctl list_connections | wc -l
# 查看每个用户的连接数
rabbitmqctl list_connections user | sort | uniq -c | sort -rn
# 限制最大连接数
# 在 rabbitmq.conf 中配置
# connection_max = 10000

3. 内存告警处理
bash
# Raise the memory high watermark at runtime. The change takes effect
# immediately and lasts until the node restarts; editing the application
# environment with application:set_env does not update the running monitor.
rabbitmqctl set_vm_memory_high_watermark 0.6
# Ask every Erlang process on the node to garbage-collect.
# (erlang:garbage_collect() via eval would only collect the eval process itself.)
rabbitmqctl force_gc
# Connections whose publishers are currently blocked by an alarm.
rabbitmqctl list_connections name state | grep blocked

内存配置优化
1. 内存水位配置
bash
# rabbitmq.conf
# 内存高水位(默认0.4,即40%)
vm_memory_high_watermark.relative = 0.6
# 或使用绝对值
# vm_memory_high_watermark.absolute = 4GB
# 换页阈值(默认0.5):内存用量达到高水位的该比例时,队列开始将消息换页到磁盘以释放内存
vm_memory_high_watermark_paging_ratio = 0.75

2. 队列内存优化
bash
# Lazy mode keeps message bodies on disk to reduce RAM usage.
# The policy key is "queue-mode"; "queue-type" selects classic/quorum/stream
# and does not accept "lazy".
# NOTE(review): classic queues on RabbitMQ 3.12+ reportedly ignore queue-mode
# because they always behave lazily — confirm for your version.
rabbitmqctl set_policy lazy-queues "^lazy\." '{"queue-mode":"lazy"}' --apply-to queues
# Cap the number of messages a queue may hold.
rabbitmqctl set_policy max-length "^limited\." '{"max-length":100000}' --apply-to queues
# Expire messages after 24 hours (value is in milliseconds).
rabbitmqctl set_policy message-ttl "^ttl\." '{"message-ttl":86400000}' --apply-to queues

3. Erlang VM 内存优化
bash
# rabbitmq.conf
# Erlang VM 内存分配策略
scheduler_busy_wait_threshold = 1000
# GC 配置
# 在 advanced.config 中配置

内存监控脚本
bash
#!/bin/bash
# memory_monitor.sh
# Settings for the RabbitMQ memory monitor. Both values can be overridden from
# the environment; the defaults match the previously hard-coded values.

# Alert threshold, as a percentage of memory use.
THRESHOLD="${THRESHOLD:-80}"
# File that log lines are appended to.
LOG_FILE="${LOG_FILE:-/var/log/rabbitmq/memory_monitor.log}"
#######################################
# Append a timestamped line to the monitor log.
# Globals:   LOG_FILE (read)
# Arguments: $1 - message text
# Outputs:   one line "[YYYY-mm-dd HH:MM:SS] message" appended to LOG_FILE
#######################################
log_message() {
  # The redirect target is quoted so a path containing spaces does not fail
  # with "ambiguous redirect".
  printf '[%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$1" >> "$LOG_FILE"
}
#######################################
# Print the broker's memory usage as a bare number (percent sign stripped).
# Prints nothing when rabbitmqctl fails or no matching line is found.
# NOTE(review): assumes `rabbitmqctl status` emits a line of the form
# "Memory use: ... NN%" — confirm against the broker version in use.
# Outputs:   the first matching percentage on stdout
#######################################
get_memory_percent() {
  # Requires GNU grep for -P. Only the first match is kept so the caller
  # always receives a single value to compare.
  rabbitmqctl status 2>/dev/null \
    | grep -oP 'Memory use:.*?\K[0-9.]+(?=%)' \
    | head -n 1
}
#######################################
# Take one memory reading and log a warning, plus the five largest queues,
# when it exceeds the threshold.
# Globals:   THRESHOLD (read), LOG_FILE (read)
# Outputs:   log lines appended to LOG_FILE
# Returns:   1 when no usable reading could be obtained, 0 otherwise
#######################################
check_memory() {
  local -r number_re='^[0-9]+([.][0-9]*)?$'
  local mem_percent
  mem_percent=$(get_memory_percent)

  # An empty or malformed reading (e.g. the broker is down) cannot be compared;
  # record it instead of silently skipping the check.
  if [[ ! "$mem_percent" =~ $number_re ]]; then
    log_message "ERROR: 无法获取内存使用率"
    return 1
  fi

  # awk handles the fractional comparison without requiring bc.
  if awk -v used="$mem_percent" -v limit="$THRESHOLD" \
      'BEGIN { exit !(used + 0 > limit + 0) }'; then
    log_message "WARNING: 内存使用率 ${mem_percent}% 超过阈值 ${THRESHOLD}%"
    # Record which queues hold the most memory.
    log_message "队列内存TOP5:"
    rabbitmqctl list_queues name memory 2>/dev/null \
      | sort -k2 -rn | head -5 >> "$LOG_FILE"
    # Hook for sending an alert:
    # send_alert "RabbitMQ内存告警: ${mem_percent}%"
  fi
  return 0
}
# Main loop: take one reading per minute until the script is stopped.
# (`done` must stand alone; fused with the following heading it is no longer
# the loop terminator and the script fails to parse.)
while true; do
  check_memory
  sleep 60
done

注意事项
- 内存告警是保护机制:不要盲目提高内存限制
- 惰性队列有代价:会增加磁盘IO
- 监控要持续:内存问题往往是渐进的
- 预防优于治疗:提前设置队列限制
- 测试内存恢复:确保内存下降后服务能恢复
