Appearance
RabbitMQ OAuth 2.0 认证
概述
OAuth 2.0 是现代应用中广泛使用的授权框架,RabbitMQ 通过 rabbitmq_auth_backend_oauth2 插件支持 OAuth 2.0 认证。这使得 RabbitMQ 能够与 Keycloak、Auth0、Okta 等身份提供商集成,实现现代化的身份认证和授权管理。
核心知识点
OAuth 2.0 认证架构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │────▶│ RabbitMQ │────▶│ OAuth │
│ (应用程序) │◀────│ Server │◀────│ Provider │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
│ │ │
│ 1. 获取 Token │ │
│─────────────────────────────────────▶│
│ │ │
│ 2. 返回 JWT │ │
│◀─────────────────────────────────────│
│ │ │
│ 3. 连接 RabbitMQ (携带 Token) │
│─────────────────▶│ │
│ │ 4. 验证 Token │
│ │───────────────────▶│
│ │ 5. 返回用户信息 │
│ │◀───────────────────│
│ 6. 连接建立 │ │
│◀─────────────────│ │
└──────────────────┴────────────────────┘
JWT Token 结构
RabbitMQ OAuth 2.0 使用 JWT (JSON Web Token) 进行认证:
┌─────────────────────────────────────────────────────────────┐
│ JWT Token │
├─────────────────────────────────────────────────────────────┤
│ Header │
│ { │
│ "alg": "RS256", │
│ "typ": "JWT", │
│ "kid": "key-id" │
│ } │
├─────────────────────────────────────────────────────────────┤
│ Payload (Claims) │
│ { │
│ "sub": "user-123", │
│ "iss": "https://auth.example.com", │
│ "aud": "rabbitmq", │
│ "exp": 1234567890, │
│ "scope": "rabbitmq.read:* rabbitmq.write:*", │
│ "rabbitmq": { │
│ "vhost": "/", │
│ "resource": "queue", │
│ "tags": ["management"] │
│ } │
│ } │
├─────────────────────────────────────────────────────────────┤
│ Signature │
│ RSASHA256( │
│ base64UrlEncode(header) + "." + │
│ base64UrlEncode(payload), │
│ privateKey │
│ ) │
└─────────────────────────────────────────────────────────────┘
支持的 OAuth 2.0 提供商
| 提供商 | 类型 | 说明 |
|---|---|---|
| Keycloak | 开源 | Red Hat 开源身份管理 |
| Auth0 | 商业 | 通用身份平台 |
| Okta | 商业 | 企业身份管理 |
| Azure AD | 商业 | 微软企业目录 |
| Google OAuth | 商业 | Google 身份服务 |
配置示例
安装 OAuth 2.0 插件
bash
# 启用 OAuth 2.0 认证插件
rabbitmq-plugins enable rabbitmq_auth_backend_oauth2
# 验证插件已启用
rabbitmq-plugins list | grep oauth2
基础配置
在 rabbitmq.conf 中配置 OAuth 2.0:
conf
# 启用 OAuth 2.0 认证后端
auth_backends.1 = oauth2
# OAuth 2.0 提供商配置
auth_oauth2.resource_server_id = rabbitmq
# JWT 签名验证密钥(对称密钥)
auth_oauth2.signing_keys.default = your-secret-key-here
# 或使用 JWK Set URL(推荐)
auth_oauth2.jwk_set_url = https://auth.example.com/.well-known/jwks.json
# Token 端点
auth_oauth2.token_endpoint = https://auth.example.com/oauth/token
# Issuer URL
auth_oauth2.issuer = https://auth.example.com
# 资源服务器 ID(注意:上文已将 resource_server_id 设为 rabbitmq,同一配置项只应保留一处)
auth_oauth2.resource_server_id = rabbitmq-api
Keycloak 配置
conf
# Keycloak 配置示例
auth_oauth2.resource_server_id = rabbitmq
auth_oauth2.issuer = https://keycloak.example.com/realms/myrealm
auth_oauth2.jwk_set_url = https://keycloak.example.com/realms/myrealm/protocol/openid-connect/certs
# 可选:配置客户端 ID
auth_oauth2.client_id = rabbitmq-client
# 可选:配置作用域
auth_oauth2.scope = openid profile rabbitmq
Auth0 配置
conf
# Auth0 配置示例
auth_oauth2.resource_server_id = rabbitmq-api
auth_oauth2.issuer = https://your-tenant.auth0.com/
auth_oauth2.jwk_set_url = https://your-tenant.auth0.com/.well-known/jwks.json
# Auth0 API 配置
auth_oauth2.audience = https://rabbitmq.example.com
Azure AD 配置
conf
# Azure AD 配置示例
auth_oauth2.resource_server_id = api://rabbitmq-api
auth_oauth2.issuer = https://login.microsoftonline.com/{tenant-id}/v2.0
auth_oauth2.jwk_set_url = https://login.microsoftonline.com/{tenant-id}/discovery/v2.0/keys
# Azure AD 特定配置
auth_oauth2.azure_tenant_id = your-tenant-id
高级配置
conf
# Token 验证配置
auth_oauth2.verify_aud = true
auth_oauth2.verify_iss = true
# 时钟偏差容忍(秒)
auth_oauth2.clock_skew = 60
# 默认 vhost
auth_oauth2.default_vhost = /
# 默认权限
auth_oauth2.default_permissions.configure = .*
auth_oauth2.default_permissions.write = .*
auth_oauth2.default_permissions.read = .*
# 作用域前缀
auth_oauth2.scope_prefix = rabbitmq.
# 静态作用域映射
auth_oauth2.scope_aliases.administrator = rabbitmq.tag:administrator
auth_oauth2.scope_aliases.monitoring = rabbitmq.tag:monitoring
auth_oauth2.scope_aliases.management = rabbitmq.tag:management
PHP 代码示例
OAuth 2.0 客户端
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
/**
 * OAuth 2.0 client that fetches access tokens from a token endpoint and opens
 * RabbitMQ connections authenticated with them.
 *
 * $oauthConfig keys read by this class: token_url, client_id, client_secret,
 * scope (optional) and timeout (optional, seconds, defaults to 10).
 * $rabbitmqConfig keys read by this class: host, port and vhost (optional,
 * defaults to "/").
 */
class OAuth2RabbitMQClient
{
    /** @var array OAuth 2.0 settings (token_url, client_id, client_secret, scope, timeout). */
    private $oauthConfig;

    /** @var array RabbitMQ settings (host, port, vhost). */
    private $rabbitmqConfig;

    /** @var array<string, array{token: string, expires_at: int}> In-memory token cache keyed by grant type. */
    private $tokenCache = [];

    /**
     * @param array $oauthConfig    Token endpoint settings.
     * @param array $rabbitmqConfig Broker connection settings.
     */
    public function __construct(array $oauthConfig, array $rabbitmqConfig)
    {
        $this->oauthConfig = $oauthConfig;
        $this->rabbitmqConfig = $rabbitmqConfig;
    }

    /**
     * Connects using a token obtained through the client_credentials grant.
     *
     * @throws RuntimeException When no token could be obtained or the connection fails.
     */
    public function connectWithClientCredentials(): ?AMQPStreamConnection
    {
        $token = $this->getClientCredentialsToken();
        if (!$token) {
            throw new RuntimeException('获取访问令牌失败');
        }

        return $this->connectWithToken($token);
    }

    /**
     * Connects using a token obtained through the password grant.
     *
     * @throws RuntimeException When no token could be obtained or the connection fails.
     */
    public function connectWithPassword(string $username, string $password): ?AMQPStreamConnection
    {
        $token = $this->getPasswordGrantToken($username, $password);
        if (!$token) {
            throw new RuntimeException('获取访问令牌失败');
        }

        return $this->connectWithToken($token);
    }

    /**
     * Opens a RabbitMQ connection authenticated with the given access token.
     *
     * @param string $token JWT access token.
     *
     * @throws RuntimeException When the connection cannot be established.
     */
    public function connectWithToken(string $token): ?AMQPStreamConnection
    {
        try {
            return new AMQPStreamConnection(
                $this->rabbitmqConfig['host'],
                $this->rabbitmqConfig['port'],
                // The OAuth 2 backend reads the token from the password field
                // and ignores the username, so the username is left empty.
                '',
                $token,
                $this->rabbitmqConfig['vhost'] ?? '/'
            );
        } catch (Exception $e) {
            // Chain the original exception so the root cause stays visible.
            throw new RuntimeException('RabbitMQ 连接失败: ' . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Returns a client_credentials token, reusing the cached one while it is
     * still valid for more than 60 seconds.
     */
    private function getClientCredentialsToken(): ?string
    {
        $cacheKey = 'client_credentials';
        $cached = $this->tokenCache[$cacheKey] ?? null;
        if ($cached !== null && $cached['expires_at'] > time() + 60) {
            return $cached['token'];
        }

        $response = $this->requestToken([
            'grant_type' => 'client_credentials',
            'client_id' => $this->oauthConfig['client_id'],
            'client_secret' => $this->oauthConfig['client_secret'],
            'scope' => $this->oauthConfig['scope'] ?? 'rabbitmq.read:* rabbitmq.write:*'
        ]);
        if ($response === null || !isset($response['access_token'])) {
            return null;
        }

        $this->tokenCache[$cacheKey] = [
            'token' => $response['access_token'],
            // Fall back to one hour when the provider omits expires_in.
            'expires_at' => time() + (int) ($response['expires_in'] ?? 3600)
        ];

        return $response['access_token'];
    }

    /**
     * Requests a token through the resource-owner password grant (not cached).
     */
    private function getPasswordGrantToken(string $username, string $password): ?string
    {
        $response = $this->requestToken([
            'grant_type' => 'password',
            'client_id' => $this->oauthConfig['client_id'],
            'client_secret' => $this->oauthConfig['client_secret'],
            'username' => $username,
            'password' => $password,
            'scope' => $this->oauthConfig['scope'] ?? 'openid profile rabbitmq'
        ]);

        return $response['access_token'] ?? null;
    }

    /**
     * POSTs a form-encoded token request and returns the decoded JSON body.
     *
     * @param array $params Form fields of the token request.
     *
     * @return array|null Decoded response, or null on transport failure,
     *                    non-200 status or a body that is not a JSON object.
     */
    private function requestToken(array $params): ?array
    {
        $ch = curl_init($this->oauthConfig['token_url']);
        if ($ch === false) {
            error_log('Token request failed: unable to initialise cURL');
            return null;
        }

        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => http_build_query($params),
            // Without a timeout an unresponsive provider blocks the caller forever.
            CURLOPT_TIMEOUT => $this->oauthConfig['timeout'] ?? 10,
            CURLOPT_HTTPHEADER => [
                'Content-Type: application/x-www-form-urlencoded',
                'Accept: application/json'
            ]
        ]);
        $response = curl_exec($ch);
        $transportError = $response === false ? curl_error($ch) : '';
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($response === false) {
            error_log("Token request failed: {$transportError}");
            return null;
        }
        if ($httpCode !== 200) {
            error_log("Token request failed: HTTP {$httpCode} - {$response}");
            return null;
        }

        $decoded = json_decode($response, true);
        if (!is_array($decoded)) {
            // A scalar or invalid body would otherwise violate the ?array return type.
            error_log('Token request failed: response body is not a JSON object');
            return null;
        }

        return $decoded;
    }

    /**
     * Exchanges a refresh token for a new access token.
     *
     * @return string|null The new access token, or null when the exchange fails.
     */
    public function refreshToken(string $refreshToken): ?string
    {
        $response = $this->requestToken([
            'grant_type' => 'refresh_token',
            'client_id' => $this->oauthConfig['client_id'],
            'client_secret' => $this->oauthConfig['client_secret'],
            'refresh_token' => $refreshToken
        ]);

        return $response['access_token'] ?? null;
    }
}
// Usage example: the first array configures the OAuth 2.0 token endpoint and
// client credentials, the second the RabbitMQ broker to connect to.
$client = new OAuth2RabbitMQClient(
[
'token_url' => 'https://auth.example.com/oauth/token',
'client_id' => 'rabbitmq-client',
'client_secret' => 'client-secret',
'scope' => 'rabbitmq.read:* rabbitmq.write:*'
],
[
'host' => 'localhost',
'port' => 5672,
'vhost' => '/'
]
);
// Connect using the client_credentials grant; throws RuntimeException on failure.
$connection = $client->connectWithClientCredentials();
echo "OAuth 2.0 认证连接成功\n";
JWT Token 生成器
php
<?php
/**
 * Issues and verifies HS256-signed JWTs carrying RabbitMQ claims.
 *
 * Tokens are signed with HMAC-SHA256 using the shared secret given to the
 * constructor; verifyToken() checks the signature and the "exp" claim only.
 */
class RabbitMQJWTGenerator
{
    /** @var string Shared HMAC secret. */
    private $secretKey;

    /** @var string Value of the "iss" claim. */
    private $issuer;

    /** @var string Value of the "aud" claim. */
    private $audience;

    /** @var string Value of the "alg" header. */
    private $algorithm = 'HS256';

    public function __construct(string $secretKey, string $issuer, string $audience = 'rabbitmq')
    {
        $this->secretKey = $secretKey;
        $this->issuer = $issuer;
        $this->audience = $audience;
    }

    /**
     * Builds a token whose "rabbitmq" claim carries the given permissions and tags.
     *
     * @param string $subject     Value of the "sub" claim.
     * @param array  $permissions Stored under rabbitmq.permissions when non-empty.
     * @param int    $expiresIn   Lifetime in seconds, relative to now.
     * @param array  $tags        Stored under rabbitmq.tags when non-empty.
     *
     * @throws RuntimeException When the claims cannot be JSON-encoded.
     */
    public function generateToken(
        string $subject,
        array $permissions,
        int $expiresIn = 3600,
        array $tags = []
    ): string {
        $now = time();
        $payload = [
            'sub' => $subject,
            'iss' => $this->issuer,
            'aud' => $this->audience,
            'iat' => $now,
            'exp' => $now + $expiresIn,
            'jti' => bin2hex(random_bytes(16))
        ];

        // RabbitMQ-specific claims are only added when there is something to add.
        $rabbitmqClaims = [];
        if (!empty($permissions)) {
            $rabbitmqClaims['permissions'] = $permissions;
        }
        if (!empty($tags)) {
            $rabbitmqClaims['tags'] = $tags;
        }
        if (!empty($rabbitmqClaims)) {
            $payload['rabbitmq'] = $rabbitmqClaims;
        }

        return $this->encodeToken($payload);
    }

    /**
     * Builds a token whose "scope" claim lists one scope per resource permission.
     *
     * Each resource may define "type" (default "queue"), "name" (default "*")
     * and "permissions" (default read, write, configure).
     *
     * NOTE(review): the RabbitMQ OAuth 2 guide documents scopes as
     * "<permission>:<vhost>/<name>[/<routing-key>]"; confirm the broker accepts
     * the extra resource-type segment emitted here.
     *
     * @param string $subject   Value of the "sub" claim.
     * @param string $vhost     Virtual host placed into every scope.
     * @param array  $resources List of resource descriptors.
     * @param int    $expiresIn Lifetime in seconds (previously fixed at 3600).
     *
     * @throws RuntimeException When the claims cannot be JSON-encoded.
     */
    public function generateScopedToken(
        string $subject,
        string $vhost = '/',
        array $resources = [],
        int $expiresIn = 3600
    ): string {
        $scopes = [];
        foreach ($resources as $resource) {
            $type = $resource['type'] ?? 'queue';
            $name = $resource['name'] ?? '*';
            $permissions = $resource['permissions'] ?? ['read', 'write', 'configure'];
            foreach ($permissions as $perm) {
                $scopes[] = "rabbitmq.{$perm}:{$type}/{$vhost}/{$name}";
            }
        }

        // A single timestamp keeps exp - iat exactly equal to $expiresIn.
        $now = time();

        return $this->encodeToken([
            'sub' => $subject,
            'iss' => $this->issuer,
            'aud' => $this->audience,
            'iat' => $now,
            'exp' => $now + $expiresIn,
            'scope' => implode(' ', $scopes)
        ]);
    }

    /**
     * Verifies signature and expiry and returns the decoded claims.
     *
     * @return array|null The payload, or null when the token is malformed,
     *                    carries a wrong signature, or has expired.
     */
    public function verifyToken(string $token): ?array
    {
        $parts = explode('.', $token);
        if (count($parts) !== 3) {
            return null;
        }
        [$headerEncoded, $payloadEncoded, $signatureEncoded] = $parts;

        // Constant-time comparison of the recomputed signature.
        $expectedSignature = $this->base64UrlEncode($this->sign($headerEncoded . '.' . $payloadEncoded));
        if (!hash_equals($expectedSignature, $signatureEncoded)) {
            return null;
        }

        $payloadJson = $this->base64UrlDecode($payloadEncoded);
        if ($payloadJson === null) {
            return null;
        }
        $payload = json_decode($payloadJson, true);
        if (!is_array($payload)) {
            // A scalar payload would otherwise violate the ?array return type.
            return null;
        }

        if (isset($payload['exp']) && $payload['exp'] < time()) {
            return null;
        }

        return $payload;
    }

    /**
     * Serialises header and payload, signs them and returns the compact JWT.
     *
     * @throws RuntimeException When header or payload cannot be JSON-encoded.
     */
    private function encodeToken(array $payload): string
    {
        $headerEncoded = $this->base64UrlEncode($this->toJson(['alg' => $this->algorithm, 'typ' => 'JWT']));
        $payloadEncoded = $this->base64UrlEncode($this->toJson($payload));
        $signatureEncoded = $this->base64UrlEncode($this->sign($headerEncoded . '.' . $payloadEncoded));

        return $headerEncoded . '.' . $payloadEncoded . '.' . $signatureEncoded;
    }

    /**
     * JSON-encodes a value, failing loudly instead of emitting an empty segment.
     *
     * @throws RuntimeException When json_encode() fails (e.g. invalid UTF-8).
     */
    private function toJson(array $value): string
    {
        $json = json_encode($value);
        if ($json === false) {
            throw new RuntimeException('JWT encoding failed: ' . json_last_error_msg());
        }

        return $json;
    }

    /** Computes the raw HMAC-SHA256 of the signing input. */
    private function sign(string $data): string
    {
        return hash_hmac('sha256', $data, $this->secretKey, true);
    }

    /** Encodes binary data as unpadded base64url. */
    private function base64UrlEncode(string $data): string
    {
        return rtrim(strtr(base64_encode($data), '+/', '-_'), '=');
    }

    /** Decodes unpadded base64url; returns null when the input is not valid. */
    private function base64UrlDecode(string $data): ?string
    {
        $base64 = strtr($data, '-_', '+/');
        // Restore the padding stripped by base64UrlEncode().
        $base64 .= str_repeat('=', (4 - strlen($base64) % 4) % 4);
        $decoded = base64_decode($base64, true);

        return $decoded === false ? null : $decoded;
    }
}
// Usage example: secret key, issuer and audience of the generated tokens.
$generator = new RabbitMQJWTGenerator(
'your-secret-key',
'https://your-app.example.com',
'rabbitmq'
);
// Generate a token carrying a permissions list and the "management" tag,
// valid for 3600 seconds.
$token = $generator->generateToken(
'user-123',
[
['vhost' => '/', 'resource' => 'queue', 'name' => 'myapp_.*', 'permissions' => ['configure', 'write', 'read']]
],
3600,
['management']
);
echo "生成的 Token: {$token}\n";
// Generate a token whose "scope" claim is built from the listed resources.
$scopedToken = $generator->generateScopedToken(
'service-account',
'/',
[
['type' => 'queue', 'name' => 'orders_.*', 'permissions' => ['read', 'write']],
['type' => 'exchange', 'name' => 'events', 'permissions' => ['write']]
]
);
echo "作用域 Token: {$scopedToken}\n";
Keycloak 集成
php
<?php
/**
 * Thin client for the Keycloak endpoints used to authenticate against RabbitMQ:
 * token issuance, token introspection and protocol-mapper creation.
 */
class KeycloakRabbitMQIntegration
{
    /** @var string Keycloak base URL without a trailing slash. */
    private $keycloakUrl;

    /** @var string Realm name. */
    private $realm;

    /** @var string OAuth client id used for all requests. */
    private $clientId;

    /** @var string OAuth client secret used for all requests. */
    private $clientSecret;

    public function __construct(
        string $keycloakUrl,
        string $realm,
        string $clientId,
        string $clientSecret
    ) {
        $this->keycloakUrl = rtrim($keycloakUrl, '/');
        $this->realm = $realm;
        $this->clientId = $clientId;
        $this->clientSecret = $clientSecret;
    }

    /**
     * Requests a token for the client itself (client_credentials grant).
     *
     * @return string|null The access token, or null when the request fails.
     */
    public function getServiceAccountToken(): ?string
    {
        $data = $this->postForm($this->realmUrl('protocol/openid-connect/token'), [
            'grant_type' => 'client_credentials',
            'client_id' => $this->clientId,
            'client_secret' => $this->clientSecret
        ]);
        $token = $data['access_token'] ?? null;

        return is_string($token) ? $token : null;
    }

    /**
     * Requests tokens on behalf of a user (password grant).
     *
     * @return array|null The decoded token response, or null when the request fails.
     */
    public function getUserToken(string $username, string $password): ?array
    {
        return $this->postForm($this->realmUrl('protocol/openid-connect/token'), [
            'grant_type' => 'password',
            'client_id' => $this->clientId,
            'client_secret' => $this->clientSecret,
            'username' => $username,
            'password' => $password,
            'scope' => 'openid profile rabbitmq'
        ]);
    }

    /**
     * Introspects a token, authenticating with the client credentials via HTTP Basic.
     *
     * @return array|null The decoded introspection response, or null on failure.
     */
    public function introspectToken(string $token): ?array
    {
        return $this->postForm(
            $this->realmUrl('protocol/openid-connect/token/introspect'),
            ['token' => $token],
            [CURLOPT_USERPWD => $this->clientId . ':' . $this->clientSecret]
        );
    }

    /**
     * Creates a protocol mapper that exposes the "rabbitmq_permissions" user
     * attribute as the "rabbitmq.permissions" claim.
     *
     * NOTE(review): the Keycloak Admin REST API addresses clients by their
     * internal id here; confirm callers do not pass the OAuth client_id.
     *
     * @param string $clientId Client identifier placed in the admin URL.
     *
     * @return bool True when Keycloak answers 201 Created.
     */
    public function createRabbitMQMapper(string $clientId): bool
    {
        $adminToken = $this->getServiceAccountToken();
        if (!$adminToken) {
            return false;
        }

        $url = $this->keycloakUrl
            . '/admin/realms/' . rawurlencode($this->realm)
            . '/clients/' . rawurlencode($clientId)
            . '/protocol-mappers/models';
        $mapper = [
            'name' => 'rabbitmq-permissions',
            'protocol' => 'openid-connect',
            'protocolMapper' => 'oidc-usermodel-attribute-mapper',
            'consentRequired' => false,
            'config' => [
                'userinfo.token.claim' => 'true',
                'user.attribute' => 'rabbitmq_permissions',
                'id.token.claim' => 'true',
                'access.token.claim' => 'true',
                'claim.name' => 'rabbitmq.permissions',
                'jsonType.label' => 'String'
            ]
        ];

        [$status] = $this->send($url, (string) json_encode($mapper), [
            CURLOPT_HTTPHEADER => [
                'Authorization: Bearer ' . $adminToken,
                'Content-Type: application/json'
            ]
        ]);

        return $status === 201;
    }

    /** Builds a URL below this realm; the realm name is URL-encoded. */
    private function realmUrl(string $path): string
    {
        return $this->keycloakUrl . '/realms/' . rawurlencode($this->realm) . '/' . $path;
    }

    /**
     * POSTs form fields and decodes a 200 JSON response.
     *
     * @return array|null Decoded body, or null on transport failure, non-200
     *                    status or a body that is not a JSON object/array.
     */
    private function postForm(string $url, array $fields, array $options = []): ?array
    {
        [$status, $body] = $this->send($url, http_build_query($fields), $options);
        if ($status !== 200 || $body === null) {
            return null;
        }
        $decoded = json_decode($body, true);

        return is_array($decoded) ? $decoded : null;
    }

    /**
     * Performs a POST request and returns [HTTP status, body or null].
     *
     * @param array $options Extra cURL options merged over the POST defaults.
     *
     * @return array{0: int, 1: string|null} Status is 0 when no response was received.
     */
    private function send(string $url, string $body, array $options = []): array
    {
        $ch = curl_init($url);
        if ($ch === false) {
            return [0, null];
        }
        // "+" keeps the integer option keys intact (array_merge would renumber them).
        curl_setopt_array($ch, $options + [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $body
        ]);
        $response = curl_exec($ch);
        $status = (int) curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        return [$status, $response === false ? null : $response];
    }
}
// Usage example: Keycloak base URL, realm, client id and client secret.
$keycloak = new KeycloakRabbitMQIntegration(
'https://keycloak.example.com',
'myrealm',
'rabbitmq-client',
'client-secret'
);
// Fetch a service-account token (null when the request fails).
$token = $keycloak->getServiceAccountToken();
echo "服务账户 Token: {$token}\n";
// Log a user in with the password grant; the result is the decoded token response or null.
$tokenData = $keycloak->getUserToken('john.doe', 'password');
if ($tokenData) {
echo "用户 Token: {$tokenData['access_token']}\n";
echo "刷新 Token: {$tokenData['refresh_token']}\n";
}
实际应用场景
场景一:微服务认证
php
<?php
class MicroserviceAuth
{
private $oauthClient;
private $tokenStore;
/**
 * Opens a RabbitMQ connection authenticated with a valid access token.
 *
 * Host, port and vhost are read from RABBITMQ_HOST, RABBITMQ_PORT and
 * RABBITMQ_VHOST; unset or empty variables fall back to localhost, 5672
 * and "/" instead of passing getenv()'s false through to the connection.
 */
public function getRabbitMQConnection(): AMQPStreamConnection
{
    $token = $this->getValidToken();

    $host = getenv('RABBITMQ_HOST');
    $port = getenv('RABBITMQ_PORT');
    $vhost = getenv('RABBITMQ_VHOST');

    return new AMQPStreamConnection(
        $host !== false && $host !== '' ? $host : 'localhost',
        $port !== false && $port !== '' ? (int) $port : 5672,
        // The OAuth 2 backend reads the token from the password field and
        // ignores the username, so the username is left empty.
        '',
        $token,
        $vhost !== false && $vhost !== '' ? $vhost : '/'
    );
}
/**
 * Returns a token that stays valid for at least five more minutes.
 *
 * A cached token is reused while its recorded expiry is more than 300 seconds
 * away; otherwise a new one is requested and cached with a one-hour expiry.
 *
 * @throws RuntimeException When the OAuth client yields no token.
 */
private function getValidToken(): string
{
    $cached = $this->tokenStore->get('rabbitmq_token');
    if ($cached && $cached['expires_at'] > time() + 300) {
        return $cached['token'];
    }

    $token = $this->oauthClient->getClientCredentialsToken();
    if (!is_string($token) || $token === '') {
        // Fail explicitly instead of caching an unusable value and tripping
        // the string return type.
        throw new RuntimeException('获取访问令牌失败');
    }

    $this->tokenStore->set('rabbitmq_token', [
        'token' => $token,
        'expires_at' => time() + 3600
    ]);

    return $token;
}
}
场景二:用户权限委托
php
<?php
class UserPermissionDelegation
{
private $jwtGenerator;
/**
 * Issues a token granting read and write access to one queue on vhost "/".
 *
 * @param string $userId    Subject of the token.
 * @param string $queueName Queue name placed in the permission entry.
 * @param int    $duration  Token lifetime in seconds.
 */
public function delegateQueueAccess(string $userId, string $queueName, int $duration = 3600): string
{
    $queueGrant = [
        'vhost' => '/',
        'resource' => 'queue',
        'name' => $queueName,
        'permissions' => ['read', 'write'],
    ];

    return $this->jwtGenerator->generateToken($userId, [$queueGrant], $duration);
}
/**
 * Issues a scope-based token for the given resources on vhost "/".
 *
 * @param string $userId    Subject of the token.
 * @param array  $resources Resource descriptors forwarded to the generator unchanged.
 */
public function delegateTemporaryAccess(string $userId, array $resources): string
{
    $vhost = '/';
    $scopedToken = $this->jwtGenerator->generateScopedToken($userId, $vhost, $resources);

    return $scopedToken;
}
}
常见问题与解决方案
问题 1:Token 验证失败
错误信息:
OAuth2 token validation failed: invalid signature
解决方案:
conf
# 确保签名密钥配置正确
auth_oauth2.signing_keys.default = your-correct-secret-key
# 或使用 JWK Set URL
auth_oauth2.jwk_set_url = https://auth.example.com/.well-known/jwks.json
问题 2:Token 过期
解决方案:
php
<?php
// 实现 Token 刷新机制
class TokenRefreshHandler
{
/**
 * Returns the cached token, or a refreshed one when the cached token is
 * about to expire.
 */
public function ensureValidToken(): string
{
    $current = $this->getCachedToken();

    return $this->isTokenExpiringSoon($current)
        ? $this->refreshToken()
        : $current;
}
/**
 * Tells whether a JWT expires within the next five minutes.
 *
 * Only the "exp" claim is read; the signature is not verified here. Tokens
 * that are malformed or carry no usable "exp" claim count as expiring so the
 * caller refreshes them.
 *
 * @param string $token Compact JWT (header.payload.signature).
 */
private function isTokenExpiringSoon(string $token): bool
{
    $parts = explode('.', $token);
    if (!isset($parts[1])) {
        return true;
    }

    // JWT segments are base64url: map "-" and "_" back to "+" and "/" first,
    // otherwise base64_decode() silently skips them and corrupts the JSON.
    $payloadJson = base64_decode(strtr($parts[1], '-_', '+/'));
    $payload = $payloadJson === false ? null : json_decode($payloadJson, true);
    if (!is_array($payload) || !isset($payload['exp']) || !is_numeric($payload['exp'])) {
        return true;
    }

    return $payload['exp'] < time() + 300;
}
}
问题 3:权限不足
解决方案:
php
<?php
// 确保 Token 包含正确的权限声明
$token = $generator->generateToken(
'user-123',
[
[
'vhost' => '/',
'resource' => 'queue',
'name' => '.*',
'permissions' => ['configure', 'write', 'read']
]
],
3600,
['management', 'monitoring']
);
最佳实践建议
1. Token 生命周期管理
php
<?php
class TokenLifecycleManager
{
private $cache;
private $oauthClient;
/**
 * Returns a usable access token, refreshing or re-issuing it when needed.
 *
 * Order of preference: the cached token while it is valid for more than 60
 * seconds, then a refresh with the cached refresh token, then a new
 * client-credentials token. The OAuth client is expected to return token
 * responses as arrays (access_token, refresh_token, expires_in), which is
 * what cacheToken() consumes.
 *
 * @throws RuntimeException When no access token can be obtained.
 */
public function getToken(): string
{
    $cached = $this->cache->get('access_token');
    if ($cached && $cached['expires_at'] > time() + 60) {
        return $cached['token'];
    }

    if (isset($cached['refresh_token'])) {
        $refreshed = $this->oauthClient->refreshToken($cached['refresh_token']);
        if (is_array($refreshed) && isset($refreshed['access_token'])) {
            $this->cacheToken($refreshed);
            return $refreshed['access_token'];
        }
    }

    $issued = $this->oauthClient->getClientCredentialsToken();
    if (!is_array($issued) || !isset($issued['access_token'])) {
        throw new RuntimeException('获取访问令牌失败');
    }
    $this->cacheToken($issued);

    // Return the token string, not the whole response array.
    return $issued['access_token'];
}
/**
 * Stores a token response under the "access_token" cache key.
 *
 * @param array $tokenData Token response; "access_token" is required,
 *                         "refresh_token" defaults to null and "expires_in"
 *                         to 3600 seconds.
 */
private function cacheToken(array $tokenData): void
{
    $lifetime = $tokenData['expires_in'] ?? 3600;
    $entry = [
        'token' => $tokenData['access_token'],
        'refresh_token' => $tokenData['refresh_token'] ?? null,
        'expires_at' => time() + $lifetime,
    ];

    $this->cache->set('access_token', $entry);
}
}
2. 最小权限原则
php
<?php
class LeastPrivilegeTokenGenerator
{
/**
 * Issues a token for "consumer-service" limited to reading one queue on vhost "/".
 *
 * @param string $queueName Queue the consumer may read from.
 */
public function generateTokenForConsumer(string $queueName): string
{
    $readOnlyQueue = ['type' => 'queue', 'name' => $queueName, 'permissions' => ['read']];

    return $this->generator->generateScopedToken('consumer-service', '/', [$readOnlyQueue]);
}
/**
 * Issues a token for "producer-service" limited to writing to one exchange on vhost "/".
 *
 * @param string $exchangeName Exchange the producer may publish to.
 */
public function generateTokenForProducer(string $exchangeName): string
{
    $writeOnlyExchange = ['type' => 'exchange', 'name' => $exchangeName, 'permissions' => ['write']];

    return $this->generator->generateScopedToken('producer-service', '/', [$writeOnlyExchange]);
}
}
安全注意事项
安全警告
- 使用 HTTPS:所有 OAuth 2.0 通信必须使用 TLS 加密
- 保护客户端密钥:不要在客户端代码中暴露 client_secret
- 短期 Token:使用短期有效的 Token(建议 1 小时内)
- Token 刷新:实现安全的 Token 刷新机制
- 审计日志:记录所有 Token 使用情况
