Skip to content

安全加固实践

概述

RabbitMQ 安全加固是保障消息系统数据安全和访问控制的重要环节。本文档介绍 RabbitMQ 安全配置的最佳实践,包括认证、授权、加密、网络安全等方面。

安全架构层次

┌─────────────────────────────────────────────────────────────────────────┐
│                        安全架构层次                                      │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                        网络安全层                                │   │
│  │                                                                  │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │   │
│  │  │   防火墙    │  │   VPC隔离   │  │   端口限制  │             │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘             │   │
│  │                                                                  │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                             │                                           │
│                             ▼                                           │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                        传输安全层                                │   │
│  │                                                                  │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │   │
│  │  │  TLS/SSL    │  │  证书验证   │  │  加密传输   │             │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘             │   │
│  │                                                                  │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                             │                                           │
│                             ▼                                           │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                        认证授权层                                │   │
│  │                                                                  │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │   │
│  │  │  用户认证   │  │  权限控制   │  │  vhost隔离  │             │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘             │   │
│  │                                                                  │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                             │                                           │
│                             ▼                                           │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                        应用安全层                                │   │
│  │                                                                  │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │   │
│  │  │  消息加密   │  │  敏感数据   │  │  审计日志   │             │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘             │   │
│  │                                                                  │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

PHP 代码示例

正确做法:TLS 安全连接

php
<?php

namespace App\Messaging\Security;

use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class SecureConnectionFactory
{
    private array $config;
    
    public function __construct(array $config)
    {
        $this->config = array_merge([
            'host' => 'localhost',
            'port' => 5671,
            'user' => 'guest',
            'password' => 'guest',
            'vhost' => '/',
            'ssl' => [
                'enabled' => true,
                'ca_file' => '/etc/ssl/certs/ca-bundle.crt',
                'cert_file' => '/etc/ssl/certs/client.crt',
                'key_file' => '/etc/ssl/private/client.key',
                'verify_peer' => true,
                'verify_peer_name' => true,
            ],
            'connection_timeout' => 3.0,
            'read_write_timeout' => 10.0,
            'heartbeat' => 60,
        ], $config);
    }
    
    public function createConnection()
    {
        if ($this->config['ssl']['enabled']) {
            return $this->createSSLConnection();
        }
        
        return $this->createPlainConnection();
    }
    
    private function createSSLConnection(): AMQPSSLConnection
    {
        $sslOptions = [
            'cafile' => $this->config['ssl']['ca_file'],
            'local_cert' => $this->config['ssl']['cert_file'],
            'local_pk' => $this->config['ssl']['key_file'],
            'verify_peer' => $this->config['ssl']['verify_peer'],
            'verify_peer_name' => $this->config['ssl']['verify_peer_name'],
            'allow_self_signed' => $this->config['ssl']['allow_self_signed'] ?? false,
        ];
        
        return new AMQPSSLConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['password'],
            $this->config['vhost'],
            $sslOptions,
            [
                'connection_timeout' => $this->config['connection_timeout'],
                'read_write_timeout' => $this->config['read_write_timeout'],
                'heartbeat' => $this->config['heartbeat'],
            ]
        );
    }
    
    private function createPlainConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['password'],
            $this->config['vhost'],
            false,
            'AMQPLAIN',
            null,
            'en_US',
            $this->config['connection_timeout'],
            $this->config['read_write_timeout'],
            null,
            true,
            $this->config['heartbeat']
        );
    }
}

用户权限管理

php
<?php

namespace App\Messaging\Security;

class UserManager
{
    private $apiClient;
    private string $baseUrl;
    
    public function __construct(string $host, string $user, string $password)
    {
        $this->baseUrl = "http://{$host}:15672/api";
        $this->apiClient = new ApiClient($user, $password);
    }
    
    public function createUser(
        string $username,
        string $password,
        array $tags = []
    ): bool {
        $url = "{$this->baseUrl}/users/{$username}";
        
        $data = [
            'password' => $password,
            'tags' => implode(',', $tags),
        ];
        
        $response = $this->apiClient->put($url, $data);
        
        return $response['status'] === 204;
    }
    
    public function deleteUser(string $username): bool
    {
        $url = "{$this->baseUrl}/users/{$username}";
        
        $response = $this->apiClient->delete($url);
        
        return $response['status'] === 204;
    }
    
    public function setPermissions(
        string $username,
        string $vhost,
        array $permissions
    ): bool {
        $url = "{$this->baseUrl}/permissions/{$vhost}/{$username}";
        
        $data = [
            'configure' => $permissions['configure'] ?? '.*',
            'write' => $permissions['write'] ?? '.*',
            'read' => $permissions['read'] ?? '.*',
        ];
        
        $response = $this->apiClient->put($url, $data);
        
        return $response['status'] === 204;
    }
    
    public function setTopicPermissions(
        string $username,
        string $vhost,
        string $exchange,
        array $permissions
    ): bool {
        $url = "{$this->baseUrl}/topic-permissions/{$vhost}/{$username}";
        
        $data = [
            'exchange' => $exchange,
            'write' => $permissions['write'] ?? '.*',
            'read' => $permissions['read'] ?? '.*',
        ];
        
        $response = $this->apiClient->put($url, $data);
        
        return $response['status'] === 204;
    }
    
    public function createVhost(string $name): bool
    {
        $url = "{$this->baseUrl}/vhosts/{$name}";
        
        $response = $this->apiClient->put($url, []);
        
        return $response['status'] === 204;
    }
    
    public function deleteVhost(string $name): bool
    {
        $url = "{$this->baseUrl}/vhosts/{$name}";
        
        $response = $this->apiClient->delete($url);
        
        return $response['status'] === 204;
    }
    
    public function listUsers(): array
    {
        $url = "{$this->baseUrl}/users";
        
        return $this->apiClient->get($url);
    }
    
    public function listPermissions(string $vhost = null): array
    {
        $url = $vhost 
            ? "{$this->baseUrl}/vhosts/{$vhost}/permissions"
            : "{$this->baseUrl}/permissions";
        
        return $this->apiClient->get($url);
    }
}

class ApiClient
{
    private string $user;
    private string $password;
    
    public function __construct(string $user, string $password)
    {
        $this->user = $user;
        $this->password = $password;
    }
    
    public function get(string $url): array
    {
        return $this->request('GET', $url);
    }
    
    public function put(string $url, array $data): array
    {
        return $this->request('PUT', $url, $data);
    }
    
    public function delete(string $url): array
    {
        return $this->request('DELETE', $url);
    }
    
    private function request(string $method, string $url, array $data = null): array
    {
        $ch = curl_init();
        
        $options = [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ];
        
        if ($method === 'PUT') {
            $options[CURLOPT_CUSTOMREQUEST] = 'PUT';
            $options[CURLOPT_POSTFIELDS] = json_encode($data);
        } elseif ($method === 'DELETE') {
            $options[CURLOPT_CUSTOMREQUEST] = 'DELETE';
        }
        
        curl_setopt_array($ch, $options);
        
        $response = curl_exec($ch);
        $status = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        
        curl_close($ch);
        
        return [
            'status' => $status,
            'data' => json_decode($response, true),
        ];
    }
}

消息加密

php
<?php

namespace App\Messaging\Security;

use PhpAmqpLib\Message\AMQPMessage;

class MessageEncryption
{
    private string $encryptionKey;
    private string $cipher = 'aes-256-gcm';
    
    public function __construct(string $encryptionKey)
    {
        $this->encryptionKey = $encryptionKey;
    }
    
    public function encrypt(array $data): string
    {
        $plaintext = json_encode($data, JSON_UNESCAPED_UNICODE);
        $iv = random_bytes(openssl_cipher_iv_length($this->cipher));
        $tag = '';
        
        $ciphertext = openssl_encrypt(
            $plaintext,
            $this->cipher,
            $this->encryptionKey,
            OPENSSL_RAW_DATA,
            $iv,
            $tag
        );
        
        return base64_encode($iv . $tag . $ciphertext);
    }
    
    public function decrypt(string $encrypted): array
    {
        $data = base64_decode($encrypted);
        
        $ivLength = openssl_cipher_iv_length($this->cipher);
        $tagLength = 16;
        
        $iv = substr($data, 0, $ivLength);
        $tag = substr($data, $ivLength, $tagLength);
        $ciphertext = substr($data, $ivLength + $tagLength);
        
        $plaintext = openssl_decrypt(
            $ciphertext,
            $this->cipher,
            $this->encryptionKey,
            OPENSSL_RAW_DATA,
            $iv,
            $tag
        );
        
        return json_decode($plaintext, true);
    }
}

class SecureMessageProducer
{
    private $channel;
    private MessageEncryption $encryption;
    
    public function __construct($channel, string $encryptionKey)
    {
        $this->channel = $channel;
        $this->encryption = new MessageEncryption($encryptionKey);
    }
    
    public function publish(
        string $exchange,
        string $routingKey,
        array $data,
        bool $encrypt = true
    ): void {
        $payload = $encrypt ? $this->encryption->encrypt($data) : json_encode($data);
        
        $properties = [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'headers' => [
                'encrypted' => $encrypt,
            ],
        ];
        
        $message = new AMQPMessage($payload, $properties);
        
        $this->channel->basic_publish($message, $exchange, $routingKey);
    }
}

class SecureMessageConsumer
{
    private MessageEncryption $encryption;
    
    public function __construct(string $encryptionKey)
    {
        $this->encryption = new MessageEncryption($encryptionKey);
    }
    
    public function process(AMQPMessage $message): array
    {
        $headers = $message->get('application_headers');
        $encrypted = $headers ? ($headers->getNativeData()['encrypted'] ?? false) : false;
        
        if ($encrypted) {
            return $this->encryption->decrypt($message->body);
        }
        
        return json_decode($message->body, true);
    }
}

敏感数据处理

php
<?php

namespace App\Messaging\Security;

class SensitiveDataHandler
{
    private array $sensitiveFields = [
        'password',
        'credit_card',
        'card_number',
        'cvv',
        'ssn',
        'api_key',
        'secret',
        'token',
    ];
    
    public function mask(array $data): array
    {
        return $this->processRecursive($data, [$this, 'maskValue']);
    }
    
    public function redact(array $data): array
    {
        return $this->processRecursive($data, [$this, 'redactValue']);
    }
    
    private function processRecursive(array $data, callable $processor): array
    {
        $result = [];
        
        foreach ($data as $key => $value) {
            if (is_array($value)) {
                $result[$key] = $this->processRecursive($value, $processor);
            } elseif ($this->isSensitiveField($key)) {
                $result[$key] = $processor($value);
            } else {
                $result[$key] = $value;
            }
        }
        
        return $result;
    }
    
    private function isSensitiveField(string $field): bool
    {
        $field = strtolower($field);
        
        foreach ($this->sensitiveFields as $sensitive) {
            if (strpos($field, $sensitive) !== false) {
                return true;
            }
        }
        
        return false;
    }
    
    private function maskValue($value): string
    {
        if (!is_string($value)) {
            return '***';
        }
        
        $length = strlen($value);
        
        if ($length <= 4) {
            return str_repeat('*', $length);
        }
        
        return substr($value, 0, 2) . str_repeat('*', $length - 4) . substr($value, -2);
    }
    
    private function redactValue(): string
    {
        return '[REDACTED]';
    }
    
    public function addSensitiveField(string $field): void
    {
        $this->sensitiveFields[] = strtolower($field);
    }
}

审计日志

php
<?php

namespace App\Messaging\Security;

use PhpAmqpLib\Message\AMQPMessage;

class AuditLogger
{
    private $logger;
    private SensitiveDataHandler $dataHandler;
    
    public function __construct($logger, SensitiveDataHandler $dataHandler)
    {
        $this->logger = $logger;
        $this->dataHandler = $dataHandler;
    }
    
    public function logPublish(
        string $exchange,
        string $routingKey,
        AMQPMessage $message,
        array $context = []
    ): void {
        $this->log('publish', [
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'message_id' => $message->get('message_id'),
            'user_id' => $message->get('user_id'),
            'app_id' => $message->get('app_id'),
            'body_preview' => $this->getBodyPreview($message->body),
        ], $context);
    }
    
    public function logConsume(
        string $queue,
        AMQPMessage $message,
        array $context = []
    ): void {
        $this->log('consume', [
            'queue' => $queue,
            'message_id' => $message->get('message_id'),
            'consumer_tag' => $message->getConsumerTag(),
            'delivery_tag' => $message->getDeliveryTag(),
            'body_preview' => $this->getBodyPreview($message->body),
        ], $context);
    }
    
    public function logAck(
        string $queue,
        AMQPMessage $message,
        array $context = []
    ): void {
        $this->log('ack', [
            'queue' => $queue,
            'message_id' => $message->get('message_id'),
            'delivery_tag' => $message->getDeliveryTag(),
        ], $context);
    }
    
    public function logNack(
        string $queue,
        AMQPMessage $message,
        string $reason,
        array $context = []
    ): void {
        $this->log('nack', [
            'queue' => $queue,
            'message_id' => $message->get('message_id'),
            'delivery_tag' => $message->getDeliveryTag(),
            'reason' => $reason,
        ], $context);
    }
    
    public function logConnection(
        string $event,
        array $details,
        array $context = []
    ): void {
        $this->log('connection', array_merge([
            'event' => $event,
        ], $details), $context);
    }
    
    private function log(string $action, array $data, array $context): void
    {
        $logEntry = [
            'timestamp' => date('c'),
            'action' => $action,
            'data' => $this->dataHandler->mask($data),
            'context' => $context,
            'ip' => $_SERVER['REMOTE_ADDR'] ?? null,
            'user_agent' => $_SERVER['HTTP_USER_AGENT'] ?? null,
        ];
        
        $this->logger->info('rabbitmq_audit', $logEntry);
    }
    
    private function getBodyPreview(string $body, int $maxLength = 200): string
    {
        $decoded = json_decode($body, true);
        
        if ($decoded) {
            $decoded = $this->dataHandler->mask($decoded);
            $body = json_encode($decoded);
        }
        
        if (strlen($body) > $maxLength) {
            return substr($body, 0, $maxLength) . '...';
        }
        
        return $body;
    }
}

错误做法:不安全配置

php
<?php

class InsecureConnection
{
    public function connect()
    {
        // 错误1:使用默认凭证
        $connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        
        // 错误2:无 TLS 加密
        // 错误3:无证书验证
        // 错误4:使用默认 vhost
        
        return $connection;
    }
}

class InsecureMessage
{
    public function publish(array $data)
    {
        // 错误5:明文传输敏感信息
        $message = new AMQPMessage(json_encode([
            'user_id' => $data['user_id'],
            'password' => $data['password'],
            'credit_card' => $data['credit_card'],
        ]));
        
        // 错误6:无消息加密
        // 错误7:无审计日志
        
        $this->channel->basic_publish($message, '', 'queue');
    }
}

class InsecurePermissions
{
    public function setup()
    {
        // 错误8:所有用户都有全部权限
        // 错误9:未使用 vhost 隔离
        // 错误10:未限制队列访问
    }
}

RabbitMQ 安全配置

配置文件安全设置

# rabbitmq.conf

# 禁用 guest 用户远程访问
loopback_users.guest = true

# 启用 TLS
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ssl/ca.crt
ssl_options.certfile = /etc/rabbitmq/ssl/server.crt
ssl_options.keyfile = /etc/rabbitmq/ssl/server.key
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = false

# 管理界面 TLS
management.ssl.port = 15671
management.ssl.cacertfile = /etc/rabbitmq/ssl/ca.crt
management.ssl.certfile = /etc/rabbitmq/ssl/server.crt
management.ssl.keyfile = /etc/rabbitmq/ssl/server.key

# 认证机制
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN

# 密码策略
password_checking.check = true

# 连接限制
connection_max = 10000
channel_max = 2048

# 心跳
heartbeat = 60

# 日志
log.console.level = info
log.file.level = info

用户权限配置脚本

bash
#!/bin/bash

# 创建管理员用户
rabbitmqctl add_user admin <secure_password>
rabbitmqctl set_user_tags admin administrator

# 创建应用用户
rabbitmqctl add_user app_user <secure_password>
rabbitmqctl set_user_tags app_user monitoring

# 创建 vhost
rabbitmqctl add_vhost /production
rabbitmqctl add_vhost /development

# 设置权限
rabbitmqctl set_permissions -p /production app_user "^app\..*" "^app\..*" "^app\..*"
rabbitmqctl set_permissions -p /development app_user ".*" ".*" ".*"

# 删除默认用户
rabbitmqctl delete_user guest

# 列出用户
rabbitmqctl list_users
rabbitmqctl list_permissions -p /production

防火墙配置

bash
# iptables 配置

# AMQP 端口 (5672/5671)
iptables -A INPUT -p tcp --dport 5672 -s 10.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 5671 -s 10.0.0.0/8 -j ACCEPT

# 管理界面 (15672/15671)
iptables -A INPUT -p tcp --dport 15672 -s 10.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 15671 -s 10.0.0.0/8 -j ACCEPT

# 集群通信 (4369, 25672)
iptables -A INPUT -p tcp --dport 4369 -s 10.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 25672 -s 10.0.0.0/8 -j ACCEPT

# Prometheus 监控 (15692)
iptables -A INPUT -p tcp --dport 15692 -s 10.0.0.0/8 -j ACCEPT

# 拒绝其他访问
iptables -A INPUT -p tcp --dport 5672:15692 -j DROP

最佳实践建议清单

认证安全

  • [ ] 禁用 guest 用户远程访问
  • [ ] 使用强密码策略
  • [ ] 定期轮换密码
  • [ ] 使用最小权限原则

传输安全

  • [ ] 启用 TLS 加密
  • [ ] 配置证书验证
  • [ ] 使用可信 CA 证书
  • [ ] 定期更新证书

网络安全

  • [ ] 配置防火墙规则
  • [ ] 限制端口访问
  • [ ] 使用 VPC 隔离
  • [ ] 配置安全组

应用安全

  • [ ] 敏感数据加密
  • [ ] 配置审计日志
  • [ ] 数据脱敏处理
  • [ ] 安全编码实践

生产环境注意事项

  1. 凭证管理

    • 使用密钥管理系统
    • 定期轮换凭证
    • 避免硬编码凭证
  2. 证书管理

    • 使用可信 CA
    • 设置证书过期提醒
    • 准备证书更新流程
  3. 权限审计

    • 定期审计用户权限
    • 清理无用账户
    • 记录权限变更
  4. 安全监控

    • 监控异常登录
    • 监控权限变更
    • 监控敏感操作

相关链接