Skip to content

RabbitMQ OAuth 2.0 认证

概述

OAuth 2.0 是现代应用中广泛使用的授权框架,RabbitMQ 通过 rabbitmq_auth_backend_oauth2 插件支持 OAuth 2.0 认证。这使得 RabbitMQ 能够与 Keycloak、Auth0、Okta 等身份提供商集成,实现现代化的身份认证和授权管理。

核心知识点

OAuth 2.0 认证架构

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────▶│  RabbitMQ   │────▶│    OAuth    │
│  (应用程序)  │◀────│   Server    │◀────│   Provider  │
└─────────────┘     └─────────────┘     └─────────────┘
       │                  │                    │
       │                  │                    │
       │  1. 获取 Token   │                    │
       │─────────────────────────────────────▶│
       │                  │                    │
       │  2. 返回 JWT     │                    │
       │◀─────────────────────────────────────│
       │                  │                    │
       │  3. 连接 RabbitMQ (携带 Token)        │
       │─────────────────▶│                    │
       │                  │  4. 验证 Token     │
       │                  │───────────────────▶│
       │                  │  5. 返回用户信息    │
       │                  │◀───────────────────│
       │  6. 连接建立     │                    │
       │◀─────────────────│                    │
       └──────────────────┴────────────────────┘

JWT Token 结构

RabbitMQ OAuth 2.0 使用 JWT (JSON Web Token) 进行认证:

┌─────────────────────────────────────────────────────────────┐
│                        JWT Token                            │
├─────────────────────────────────────────────────────────────┤
│  Header                                                     │
│  {                                                          │
│    "alg": "RS256",                                          │
│    "typ": "JWT",                                            │
│    "kid": "key-id"                                          │
│  }                                                          │
├─────────────────────────────────────────────────────────────┤
│  Payload (Claims)                                           │
│  {                                                          │
│    "sub": "user-123",                                       │
│    "iss": "https://auth.example.com",                       │
│    "aud": "rabbitmq",                                       │
│    "exp": 1234567890,                                       │
│    "scope": "rabbitmq.read:* rabbitmq.write:*",             │
│    "rabbitmq": {                                            │
│      "vhost": "/",                                          │
│      "resource": "queue",                                   │
│      "tags": ["management"]                                 │
│    }                                                        │
│  }                                                          │
├─────────────────────────────────────────────────────────────┤
│  Signature                                                  │
│  HMACSHA256(                                                │
│    base64UrlEncode(header) + "." +                          │
│    base64UrlEncode(payload),                                │
│    secret                                                   │
│  )                                                          │
└─────────────────────────────────────────────────────────────┘

支持的 OAuth 2.0 提供商

提供商类型说明
Keycloak开源Red Hat 开源身份管理
Auth0商业通用身份平台
Okta商业企业身份管理
Azure AD商业微软企业目录
Google OAuth商业Google 身份服务

配置示例

安装 OAuth 2.0 插件

bash
# 启用 OAuth 2.0 认证插件
rabbitmq-plugins enable rabbitmq_auth_backend_oauth2

# 验证插件已启用
rabbitmq-plugins list | grep oauth2

基础配置

rabbitmq.conf 中配置 OAuth 2.0:

conf
# 启用 OAuth 2.0 认证后端
auth_backends.1 = oauth2

# OAuth 2.0 提供商配置
auth_oauth2.resource_server_id = rabbitmq

# JWT 签名验证密钥(对称密钥)
auth_oauth2.signing_keys.default = your-secret-key-here

# 或使用 JWK Set URL(推荐)
auth_oauth2.jwk_set_url = https://auth.example.com/.well-known/jwks.json

# Token 端点
auth_oauth2.token_endpoint = https://auth.example.com/oauth/token

# Issuer URL
auth_oauth2.issuer = https://auth.example.com

# 资源服务器 ID
auth_oauth2.resource_server_id = rabbitmq-api

Keycloak 配置

conf
# Keycloak 配置示例
auth_oauth2.resource_server_id = rabbitmq
auth_oauth2.issuer = https://keycloak.example.com/realms/myrealm
auth_oauth2.jwk_set_url = https://keycloak.example.com/realms/myrealm/protocol/openid-connect/certs

# 可选:配置客户端 ID
auth_oauth2.client_id = rabbitmq-client

# 可选:配置作用域
auth_oauth2.scope = openid profile rabbitmq

Auth0 配置

conf
# Auth0 配置示例
auth_oauth2.resource_server_id = rabbitmq-api
auth_oauth2.issuer = https://your-tenant.auth0.com/
auth_oauth2.jwk_set_url = https://your-tenant.auth0.com/.well-known/jwks.json

# Auth0 API 配置
auth_oauth2.audience = https://rabbitmq.example.com

Azure AD 配置

conf
# Azure AD 配置示例
auth_oauth2.resource_server_id = api://rabbitmq-api
auth_oauth2.issuer = https://login.microsoftonline.com/{tenant-id}/v2.0
auth_oauth2.jwk_set_url = https://login.microsoftonline.com/{tenant-id}/discovery/v2.0/keys

# Azure AD 特定配置
auth_oauth2.azure_tenant_id = your-tenant-id

高级配置

conf
# Token 验证配置
auth_oauth2.verify_aud = true
auth_oauth2.verify_iss = true

# 时钟偏差容忍(秒)
auth_oauth2.clock_skew = 60

# 默认 vhost
auth_oauth2.default_vhost = /

# 默认权限
auth_oauth2.default_permissions.configure = .*
auth_oauth2.default_permissions.write = .*
auth_oauth2.default_permissions.read = .*

# 作用域前缀
auth_oauth2.scope_prefix = rabbitmq.

# 静态作用域映射
auth_oauth2.scope_aliases.administrator = rabbitmq.tag:administrator
auth_oauth2.scope_aliases.monitoring = rabbitmq.tag:monitoring
auth_oauth2.scope_aliases.management = rabbitmq.tag:management

PHP 代码示例

OAuth 2.0 客户端

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

class OAuth2RabbitMQClient
{
    private $oauthConfig;
    private $rabbitmqConfig;
    private $tokenCache = [];

    public function __construct(array $oauthConfig, array $rabbitmqConfig)
    {
        $this->oauthConfig = $oauthConfig;
        $this->rabbitmqConfig = $rabbitmqConfig;
    }

    public function connectWithClientCredentials(): ?AMQPStreamConnection
    {
        $token = $this->getClientCredentialsToken();
        
        if (!$token) {
            throw new RuntimeException('获取访问令牌失败');
        }

        return $this->connectWithToken($token);
    }

    public function connectWithPassword(string $username, string $password): ?AMQPStreamConnection
    {
        $token = $this->getPasswordGrantToken($username, $password);
        
        if (!$token) {
            throw new RuntimeException('获取访问令牌失败');
        }

        return $this->connectWithToken($token);
    }

    public function connectWithToken(string $token): ?AMQPStreamConnection
    {
        try {
            return new AMQPStreamConnection(
                $this->rabbitmqConfig['host'],
                $this->rabbitmqConfig['port'],
                $token,
                '',
                $this->rabbitmqConfig['vhost'] ?? '/'
            );
        } catch (Exception $e) {
            throw new RuntimeException('RabbitMQ 连接失败: ' . $e->getMessage());
        }
    }

    private function getClientCredentialsToken(): ?string
    {
        $cacheKey = 'client_credentials';
        
        if (isset($this->tokenCache[$cacheKey])) {
            $cached = $this->tokenCache[$cacheKey];
            if ($cached['expires_at'] > time() + 60) {
                return $cached['token'];
            }
        }

        $response = $this->requestToken([
            'grant_type' => 'client_credentials',
            'client_id' => $this->oauthConfig['client_id'],
            'client_secret' => $this->oauthConfig['client_secret'],
            'scope' => $this->oauthConfig['scope'] ?? 'rabbitmq.read:* rabbitmq.write:*'
        ]);

        if ($response && isset($response['access_token'])) {
            $this->tokenCache[$cacheKey] = [
                'token' => $response['access_token'],
                'expires_at' => time() + ($response['expires_in'] ?? 3600)
            ];
            return $response['access_token'];
        }

        return null;
    }

    private function getPasswordGrantToken(string $username, string $password): ?string
    {
        $response = $this->requestToken([
            'grant_type' => 'password',
            'client_id' => $this->oauthConfig['client_id'],
            'client_secret' => $this->oauthConfig['client_secret'],
            'username' => $username,
            'password' => $password,
            'scope' => $this->oauthConfig['scope'] ?? 'openid profile rabbitmq'
        ]);

        return $response['access_token'] ?? null;
    }

    private function requestToken(array $params): ?array
    {
        $ch = curl_init($this->oauthConfig['token_url']);
        
        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => http_build_query($params),
            CURLOPT_HTTPHEADER => [
                'Content-Type: application/x-www-form-urlencoded',
                'Accept: application/json'
            ]
        ]);

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($httpCode === 200) {
            return json_decode($response, true);
        }

        error_log("Token request failed: HTTP {$httpCode} - {$response}");
        return null;
    }

    public function refreshToken(string $refreshToken): ?string
    {
        $response = $this->requestToken([
            'grant_type' => 'refresh_token',
            'client_id' => $this->oauthConfig['client_id'],
            'client_secret' => $this->oauthConfig['client_secret'],
            'refresh_token' => $refreshToken
        ]);

        return $response['access_token'] ?? null;
    }
}

// 使用示例
$client = new OAuth2RabbitMQClient(
    [
        'token_url' => 'https://auth.example.com/oauth/token',
        'client_id' => 'rabbitmq-client',
        'client_secret' => 'client-secret',
        'scope' => 'rabbitmq.read:* rabbitmq.write:*'
    ],
    [
        'host' => 'localhost',
        'port' => 5672,
        'vhost' => '/'
    ]
);

// 使用客户端凭证连接
$connection = $client->connectWithClientCredentials();
echo "OAuth 2.0 认证连接成功\n";

JWT Token 生成器

php
<?php

class RabbitMQJWTGenerator
{
    private $secretKey;
    private $issuer;
    private $audience;
    private $algorithm = 'HS256';

    public function __construct(string $secretKey, string $issuer, string $audience = 'rabbitmq')
    {
        $this->secretKey = $secretKey;
        $this->issuer = $issuer;
        $this->audience = $audience;
    }

    public function generateToken(
        string $subject,
        array $permissions,
        int $expiresIn = 3600,
        array $tags = []
    ): string {
        $now = time();
        
        $header = [
            'alg' => $this->algorithm,
            'typ' => 'JWT'
        ];

        $payload = [
            'sub' => $subject,
            'iss' => $this->issuer,
            'aud' => $this->audience,
            'iat' => $now,
            'exp' => $now + $expiresIn,
            'jti' => bin2hex(random_bytes(16))
        ];

        // 添加 RabbitMQ 特定声明
        $rabbitmqClaims = [];
        
        if (!empty($permissions)) {
            $rabbitmqClaims['permissions'] = $permissions;
        }
        
        if (!empty($tags)) {
            $rabbitmqClaims['tags'] = $tags;
        }

        if (!empty($rabbitmqClaims)) {
            $payload['rabbitmq'] = $rabbitmqClaims;
        }

        // 编码 Header 和 Payload
        $headerEncoded = $this->base64UrlEncode(json_encode($header));
        $payloadEncoded = $this->base64UrlEncode(json_encode($payload));

        // 生成签名
        $signature = $this->sign($headerEncoded . '.' . $payloadEncoded);
        $signatureEncoded = $this->base64UrlEncode($signature);

        return $headerEncoded . '.' . $payloadEncoded . '.' . $signatureEncoded;
    }

    public function generateScopedToken(
        string $subject,
        string $vhost = '/',
        array $resources = []
    ): string {
        $scopes = [];
        
        foreach ($resources as $resource) {
            $type = $resource['type'] ?? 'queue';
            $name = $resource['name'] ?? '*';
            $permissions = $resource['permissions'] ?? ['read', 'write', 'configure'];
            
            foreach ($permissions as $perm) {
                $scopes[] = "rabbitmq.{$perm}:{$type}/{$vhost}/{$name}";
            }
        }

        $payload = [
            'sub' => $subject,
            'iss' => $this->issuer,
            'aud' => $this->audience,
            'iat' => time(),
            'exp' => time() + 3600,
            'scope' => implode(' ', $scopes)
        ];

        $headerEncoded = $this->base64UrlEncode(json_encode(['alg' => $this->algorithm, 'typ' => 'JWT']));
        $payloadEncoded = $this->base64UrlEncode(json_encode($payload));
        $signatureEncoded = $this->base64UrlEncode($this->sign($headerEncoded . '.' . $payloadEncoded));

        return $headerEncoded . '.' . $payloadEncoded . '.' . $signatureEncoded;
    }

    private function sign(string $data): string
    {
        return hash_hmac('sha256', $data, $this->secretKey, true);
    }

    private function base64UrlEncode(string $data): string
    {
        return rtrim(strtr(base64_encode($data), '+/', '-_'), '=');
    }

    public function verifyToken(string $token): ?array
    {
        $parts = explode('.', $token);
        
        if (count($parts) !== 3) {
            return null;
        }

        [$headerEncoded, $payloadEncoded, $signatureEncoded] = $parts;

        // 验证签名
        $expectedSignature = $this->base64UrlEncode($this->sign($headerEncoded . '.' . $payloadEncoded));
        
        if (!hash_equals($expectedSignature, $signatureEncoded)) {
            return null;
        }

        // 解码 Payload
        $payload = json_decode(base64_decode(strtr($payloadEncoded, '-_', '+/')), true);

        // 验证过期时间
        if (isset($payload['exp']) && $payload['exp'] < time()) {
            return null;
        }

        return $payload;
    }
}

// 使用示例
$generator = new RabbitMQJWTGenerator(
    'your-secret-key',
    'https://your-app.example.com',
    'rabbitmq'
);

// 生成带权限的 Token
$token = $generator->generateToken(
    'user-123',
    [
        ['vhost' => '/', 'resource' => 'queue', 'name' => 'myapp_.*', 'permissions' => ['configure', 'write', 'read']]
    ],
    3600,
    ['management']
);

echo "生成的 Token: {$token}\n";

// 生成带作用域的 Token
$scopedToken = $generator->generateScopedToken(
    'service-account',
    '/',
    [
        ['type' => 'queue', 'name' => 'orders_.*', 'permissions' => ['read', 'write']],
        ['type' => 'exchange', 'name' => 'events', 'permissions' => ['write']]
    ]
);

echo "作用域 Token: {$scopedToken}\n";

Keycloak 集成

php
<?php

class KeycloakRabbitMQIntegration
{
    private $keycloakUrl;
    private $realm;
    private $clientId;
    private $clientSecret;

    public function __construct(
        string $keycloakUrl,
        string $realm,
        string $clientId,
        string $clientSecret
    ) {
        $this->keycloakUrl = rtrim($keycloakUrl, '/');
        $this->realm = $realm;
        $this->clientId = $clientId;
        $this->clientSecret = $clientSecret;
    }

    public function getServiceAccountToken(): ?string
    {
        $url = "{$this->keycloakUrl}/realms/{$this->realm}/protocol/openid-connect/token";

        $ch = curl_init($url);
        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => http_build_query([
                'grant_type' => 'client_credentials',
                'client_id' => $this->clientId,
                'client_secret' => $this->clientSecret
            ])
        ]);

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($httpCode === 200) {
            $data = json_decode($response, true);
            return $data['access_token'] ?? null;
        }

        return null;
    }

    public function getUserToken(string $username, string $password): ?array
    {
        $url = "{$this->keycloakUrl}/realms/{$this->realm}/protocol/openid-connect/token";

        $ch = curl_init($url);
        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => http_build_query([
                'grant_type' => 'password',
                'client_id' => $this->clientId,
                'client_secret' => $this->clientSecret,
                'username' => $username,
                'password' => $password,
                'scope' => 'openid profile rabbitmq'
            ])
        ]);

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($httpCode === 200) {
            return json_decode($response, true);
        }

        return null;
    }

    public function introspectToken(string $token): ?array
    {
        $url = "{$this->keycloakUrl}/realms/{$this->realm}/protocol/openid-connect/token/introspect";

        $ch = curl_init($url);
        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            CURLOPT_USERPWD => $this->clientId . ':' . $this->clientSecret,
            CURLOPT_POSTFIELDS => http_build_query([
                'token' => $token
            ])
        ]);

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($httpCode === 200) {
            return json_decode($response, true);
        }

        return null;
    }

    public function createRabbitMQMapper(string $clientId): bool
    {
        $adminToken = $this->getServiceAccountToken();
        
        if (!$adminToken) {
            return false;
        }

        $url = "{$this->keycloakUrl}/admin/realms/{$this->realm}/clients/{$clientId}/protocol-mappers/models";

        $mapper = [
            'name' => 'rabbitmq-permissions',
            'protocol' => 'openid-connect',
            'protocolMapper' => 'oidc-usermodel-attribute-mapper',
            'consentRequired' => false,
            'config' => [
                'userinfo.token.claim' => 'true',
                'user.attribute' => 'rabbitmq_permissions',
                'id.token.claim' => 'true',
                'access.token.claim' => 'true',
                'claim.name' => 'rabbitmq.permissions',
                'jsonType.label' => 'String'
            ]
        ];

        $ch = curl_init($url);
        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            CURLOPT_HTTPHEADER => [
                'Authorization: Bearer ' . $adminToken,
                'Content-Type: application/json'
            ],
            CURLOPT_POSTFIELDS => json_encode($mapper)
        ]);

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        return $httpCode === 201;
    }
}

// 使用示例
$keycloak = new KeycloakRabbitMQIntegration(
    'https://keycloak.example.com',
    'myrealm',
    'rabbitmq-client',
    'client-secret'
);

// 获取服务账户 Token
$token = $keycloak->getServiceAccountToken();
echo "服务账户 Token: {$token}\n";

// 用户登录
$tokenData = $keycloak->getUserToken('john.doe', 'password');
if ($tokenData) {
    echo "用户 Token: {$tokenData['access_token']}\n";
    echo "刷新 Token: {$tokenData['refresh_token']}\n";
}

实际应用场景

场景一:微服务认证

php
<?php

class MicroserviceAuth
{
    private $oauthClient;
    private $tokenStore;

    public function getRabbitMQConnection(): AMQPStreamConnection
    {
        $token = $this->getValidToken();
        
        return new AMQPStreamConnection(
            getenv('RABBITMQ_HOST'),
            (int)getenv('RABBITMQ_PORT'),
            $token,
            '',
            getenv('RABBITMQ_VHOST')
        );
    }

    private function getValidToken(): string
    {
        $cached = $this->tokenStore->get('rabbitmq_token');
        
        if ($cached && $cached['expires_at'] > time() + 300) {
            return $cached['token'];
        }

        $token = $this->oauthClient->getClientCredentialsToken();
        
        $this->tokenStore->set('rabbitmq_token', [
            'token' => $token,
            'expires_at' => time() + 3600
        ]);

        return $token;
    }
}

场景二:用户权限委托

php
<?php

class UserPermissionDelegation
{
    private $jwtGenerator;

    public function delegateQueueAccess(string $userId, string $queueName, int $duration = 3600): string
    {
        return $this->jwtGenerator->generateToken(
            $userId,
            [
                [
                    'vhost' => '/',
                    'resource' => 'queue',
                    'name' => $queueName,
                    'permissions' => ['read', 'write']
                ]
            ],
            $duration
        );
    }

    public function delegateTemporaryAccess(string $userId, array $resources): string
    {
        return $this->jwtGenerator->generateScopedToken(
            $userId,
            '/',
            $resources
        );
    }
}

常见问题与解决方案

问题 1:Token 验证失败

错误信息

OAuth2 token validation failed: invalid signature

解决方案

conf
# 确保签名密钥配置正确
auth_oauth2.signing_keys.default = your-correct-secret-key

# 或使用 JWK Set URL
auth_oauth2.jwk_set_url = https://auth.example.com/.well-known/jwks.json

问题 2:Token 过期

解决方案

php
<?php
// 实现 Token 刷新机制
class TokenRefreshHandler
{
    public function ensureValidToken(): string
    {
        $token = $this->getCachedToken();
        
        if ($this->isTokenExpiringSoon($token)) {
            return $this->refreshToken();
        }
        
        return $token;
    }
    
    private function isTokenExpiringSoon(string $token): bool
    {
        $parts = explode('.', $token);
        $payload = json_decode(base64_decode($parts[1]), true);
        
        return ($payload['exp'] ?? 0) < time() + 300;
    }
}

问题 3:权限不足

解决方案

php
<?php
// 确保 Token 包含正确的权限声明
$token = $generator->generateToken(
    'user-123',
    [
        [
            'vhost' => '/',
            'resource' => 'queue',
            'name' => '.*',
            'permissions' => ['configure', 'write', 'read']
        ]
    ],
    3600,
    ['management', 'monitoring']
);

最佳实践建议

1. Token 生命周期管理

php
<?php

class TokenLifecycleManager
{
    private $cache;
    private $oauthClient;

    public function getToken(): string
    {
        $cached = $this->cache->get('access_token');
        
        if ($cached && $cached['expires_at'] > time() + 60) {
            return $cached['token'];
        }

        if (isset($cached['refresh_token'])) {
            $newToken = $this->oauthClient->refreshToken($cached['refresh_token']);
            if ($newToken) {
                $this->cacheToken($newToken);
                return $newToken['access_token'];
            }
        }

        $newToken = $this->oauthClient->getClientCredentialsToken();
        $this->cacheToken($newToken);
        
        return $newToken;
    }

    private function cacheToken(array $tokenData): void
    {
        $this->cache->set('access_token', [
            'token' => $tokenData['access_token'],
            'refresh_token' => $tokenData['refresh_token'] ?? null,
            'expires_at' => time() + ($tokenData['expires_in'] ?? 3600)
        ]);
    }
}

2. 最小权限原则

php
<?php

class LeastPrivilegeTokenGenerator
{
    public function generateTokenForConsumer(string $queueName): string
    {
        return $this->generator->generateScopedToken(
            'consumer-service',
            '/',
            [
                ['type' => 'queue', 'name' => $queueName, 'permissions' => ['read']]
            ]
        );
    }

    public function generateTokenForProducer(string $exchangeName): string
    {
        return $this->generator->generateScopedToken(
            'producer-service',
            '/',
            [
                ['type' => 'exchange', 'name' => $exchangeName, 'permissions' => ['write']]
            ]
        );
    }
}

安全注意事项

安全警告

  1. 使用 HTTPS:所有 OAuth 2.0 通信必须使用 TLS 加密
  2. 保护客户端密钥:不要在客户端代码中暴露 client_secret
  3. 短期 Token:使用短期有效的 Token(建议 1 小时内)
  4. Token 刷新:实现安全的 Token 刷新机制
  5. 审计日志:记录所有 Token 使用情况

相关链接