Appearance
Web STOMP 插件
概述
Web STOMP 插件(rabbitmq_web_stomp)是 RabbitMQ 的一个重要插件,它允许通过 WebSocket 协议在浏览器中直接与 RabbitMQ 进行消息交互。这使得构建实时 Web 应用变得更加简单,无需编写复杂的后端消息代理代码。
核心知识点
Web STOMP 架构
mermaid
graph TB
subgraph 浏览器
WS[WebSocket 客户端]
STOMP[STOMP.js]
end
subgraph Web STOMP 插件
WSHandler[WebSocket 处理器]
STOMPHandler[STOMP 协议转换]
end
subgraph RabbitMQ
AMQP[AMQP 核心]
Exchange[Exchange]
Queue[Queue]
end
WS --> STOMP
STOMP --> WSHandler
WSHandler --> STOMPHandler
STOMPHandler --> AMQP
AMQP --> Exchange
Exchange --> Queue
style WSHandler fill:#e1f5fe
style STOMPHandler fill:#e1f5fe

工作流程
mermaid
sequenceDiagram
participant B as 浏览器
participant W as Web STOMP
participant R as RabbitMQ
participant Q as 队列
B->>W: CONNECT (WebSocket)
W->>R: 验证连接
R-->>W: CONNECTED
W-->>B: CONNECTED
B->>W: SUBSCRIBE /queue/test
W->>R: 声明队列
R-->>W: 队列信息
W-->>B: 确认订阅
B->>W: SEND /queue/test
W->>R: 发布消息
R->>Q: 存入队列
Q->>R: 投递消息
R-->>W: MESSAGE
W-->>B: MESSAGE (WebSocket)

核心概念
| 概念 | 说明 |
|---|---|
| STOMP | 面向消息的简单文本协议 |
| WebSocket | 浏览器双向通信协议 |
| /queue/* | 队列目的地前缀 |
| /exchange/* | 交换器目的地前缀 |
PHP 代码示例
安装和配置 Web STOMP 插件
php
<?php
class WebStompPluginInstaller
{
/**
 * Enables the rabbitmq_web_stomp plugin and restarts the broker service.
 *
 * The shell commands run in order; the first one that exits with a
 * non-zero status aborts the installation.
 *
 * @return void
 * @throws RuntimeException When a command returns a non-zero exit code.
 */
public function install()
{
    echo "安装 Web STOMP 插件...\n";

    $steps = [
        'rabbitmq-plugins enable rabbitmq_web_stomp',
        'systemctl restart rabbitmq-server',
    ];

    foreach ($steps as $step) {
        exec($step, $output, $exitCode);
        if ($exitCode === 0) {
            continue;
        }
        throw new RuntimeException("命令执行失败: {$step}");
    }

    echo "Web STOMP 插件安装完成\n";
}
/**
 * Reports whether rabbitmq_web_stomp appears in the enabled-plugin listing.
 *
 * Runs `rabbitmq-plugins list -e` and looks for the plugin name as a
 * substring of any output line.
 *
 * @return bool True when some output line contains the plugin name.
 */
public function isEnabled()
{
    exec('rabbitmq-plugins list -e', $listing);

    $found = false;
    foreach ($listing as $entry) {
        if (strpos($entry, 'rabbitmq_web_stomp') !== false) {
            $found = true;
            break;
        }
    }

    return $found;
}
/**
 * Appends Web STOMP settings to /etc/rabbitmq/rabbitmq.conf.
 *
 * NOTE(review): the key names below are kept exactly as the original wrote
 * them; confirm them against the RabbitMQ Web STOMP documentation for the
 * broker version in use before relying on this output.
 *
 * @param int $port   Value written for web_stomp.listeners.http.default.
 * @param int $wsPort Value written for web_stomp.listeners.ws.default.
 * @return void
 * @throws RuntimeException When the configuration file cannot be written.
 */
public function configure($port = 15674, $wsPort = 15674)
{
    $configPath = '/etc/rabbitmq/rabbitmq.conf';

    $settings = [
        "web_stomp.listeners.http.default = $port",
        'web_stomp.ws_frame = text',
        "web_stomp.listeners.ws.default = $wsPort",
    ];

    // Begin on a fresh line and end with a newline so the appended keys can
    // never fuse with the file's current last line or with a later append.
    $config = "\n" . implode("\n", $settings) . "\n";

    $written = file_put_contents($configPath, $config, FILE_APPEND);
    if ($written === false) {
        // Do not report success when nothing was written.
        throw new RuntimeException("无法写入配置文件: {$configPath}");
    }

    echo "Web STOMP 配置已更新\n";
}
}

WebSocket 客户端实现(JavaScript)
javascript
// 注意:这是 JavaScript 代码,运行在浏览器中
class RabbitMQWebClient {
constructor(url, username = 'guest', password = 'guest') {
this.url = url;
this.username = username;
this.password = password;
this.client = null;
this.subscriptions = new Map();
this.messageHandlers = new Map();
}
connect() {
return new Promise((resolve, reject) => {
this.client = Stomp.client(this.url);
this.client.connect(
this.username,
this.password,
(frame) => {
console.log('Connected:', frame);
resolve(frame);
},
(error) => {
console.error('Connection error:', error);
reject(error);
}
);
});
}
disconnect() {
if (this.client) {
this.client.disconnect();
}
}
subscribe(queue, callback, id = null) {
const subscriptionId = id || `sub-${Date.now()}`;
const subscription = this.client.subscribe(queue, (message) => {
const body = JSON.parse(message.body);
callback(body, message);
}, { id: subscriptionId });
this.subscriptions.set(subscriptionId, subscription);
this.messageHandlers.set(queue, callback);
return subscriptionId;
}
unsubscribe(subscriptionId) {
const subscription = this.subscriptions.get(subscriptionId);
if (subscription) {
subscription.unsubscribe();
this.subscriptions.delete(subscriptionId);
}
}
send(queue, body, headers = {}) {
this.client.send(queue, headers, JSON.stringify(body));
}
sendToQueue(queueName, message) {
this.send(`/queue/${queueName}`, message);
}
sendToExchange(exchangeName, routingKey, message) {
this.send(`/exchange/${exchangeName}/${routingKey}`, message);
}
}

Web STOMP PHP 客户端
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class StompWebPublisher
{
private $connection;
private $channel;
private $exchangeName = 'stomp_exchange';
/**
 * Opens an AMQP connection and channel and declares the topic exchange.
 *
 * All parameters default to the values that were previously hard-coded, so
 * `new StompWebPublisher()` behaves as before.
 *
 * @param string $host     Broker host name.
 * @param int    $port     Broker AMQP port.
 * @param string $user     User name for the connection.
 * @param string $password Password for the connection.
 */
public function __construct($host = 'localhost', $port = 5672, $user = 'guest', $password = 'guest')
{
    $this->connection = new AMQPStreamConnection($host, $port, $user, $password);
    $this->channel = $this->connection->channel();

    $this->channel->exchange_declare(
        $this->exchangeName,
        'topic',
        false, // passive
        true,  // durable
        false  // auto_delete
    );
}
/**
 * Publishes a JSON message to the routing key "user.<userId>".
 *
 * The recipient id travels in the routing key and in the custom
 * "x-user-id" application header. It is deliberately NOT placed in the AMQP
 * "user_id" message property: RabbitMQ validates that property against the
 * name of the user that opened the connection and refuses the publish when
 * they differ.
 *
 * @param int|string $userId  Recipient identifier used in the routing key.
 * @param mixed      $message Payload; encoded with json_encode().
 * @return void
 */
public function publishToWebClient($userId, $message)
{
    $properties = [
        'content_type' => 'application/json',
        // 2 = persistent delivery mode.
        'delivery_mode' => 2,
        'application_headers' => new \PhpAmqpLib\Wire\AMQPTable([
            'x-user-id' => (string) $userId,
        ]),
    ];

    $msg = new AMQPMessage(json_encode($message), $properties);

    $this->channel->basic_publish(
        $msg,
        $this->exchangeName,
        "user.{$userId}"
    );

    echo "消息已推送给用户 {$userId}\n";
}
/**
 * Publishes a JSON message to the exchange with routing key "broadcast".
 *
 * @param mixed $message Payload; encoded with json_encode().
 * @return void
 */
public function broadcast($message)
{
    $properties = [
        'content_type' => 'application/json',
        'delivery_mode' => 2,
    ];
    $msg = new AMQPMessage(json_encode($message), $properties);

    $this->channel->basic_publish($msg, $this->exchangeName, 'broadcast');

    echo "广播消息已发送\n";
}
/**
 * Closes the channel first, then the connection it belongs to.
 *
 * @return void
 */
public function close()
{
    $channel = $this->channel;
    $connection = $this->connection;

    $channel->close();
    $connection->close();
}
}

实时通知系统
php
<?php
class RealTimeNotificationSystem
{
private $publisher;
const NOTIFICATION_TYPE_INFO = 'info';
const NOTIFICATION_TYPE_SUCCESS = 'success';
const NOTIFICATION_TYPE_WARNING = 'warning';
const NOTIFICATION_TYPE_ERROR = 'error';
/**
 * @param StompWebPublisher|null $publisher Publisher used to deliver
 *        notifications; a new StompWebPublisher is created when omitted,
 *        which is what the no-argument constructor always did.
 */
public function __construct($publisher = null)
{
    $this->publisher = $publisher !== null ? $publisher : new StompWebPublisher();
}
/**
 * Builds a notification record and publishes it to a single user.
 *
 * @param int|string $userId  Recipient passed to the publisher.
 * @param string     $type    Notification type (see the NOTIFICATION_TYPE_* constants).
 * @param string     $title   Notification title.
 * @param string     $message Notification text.
 * @param array      $data    Extra data attached to the notification.
 * @return void
 */
public function sendToUser($userId, $type, $title, $message, $data = [])
{
    $payload = [
        'event' => 'notification',
        'notification' => [
            'id' => uniqid('notif_'),
            'type' => $type,
            'title' => $title,
            'message' => $message,
            'data' => $data,
            'timestamp' => time(),
        ],
    ];

    $this->publisher->publishToWebClient($userId, $payload);
}
/**
 * Builds a notification record and broadcasts it through the publisher.
 *
 * @param string $type    Notification type (see the NOTIFICATION_TYPE_* constants).
 * @param string $title   Notification title.
 * @param string $message Notification text.
 * @param array  $data    Extra data attached to the notification.
 * @return void
 */
public function sendToAll($type, $title, $message, $data = [])
{
    $payload = [
        'event' => 'notification',
        'notification' => [
            'id' => uniqid('notif_'),
            'type' => $type,
            'title' => $title,
            'message' => $message,
            'data' => $data,
            'timestamp' => time(),
        ],
    ];

    $this->publisher->broadcast($payload);
}
/**
 * Sends an "info" notification to one user.
 *
 * @param int|string $userId  Recipient.
 * @param string     $title   Notification title.
 * @param string     $message Notification text.
 * @param array      $data    Extra data attached to the notification.
 * @return void
 */
public function info($userId, $title, $message, $data = [])
{
    $type = self::NOTIFICATION_TYPE_INFO;
    $this->sendToUser($userId, $type, $title, $message, $data);
}
/**
 * Sends a "success" notification to one user.
 *
 * @param int|string $userId  Recipient.
 * @param string     $title   Notification title.
 * @param string     $message Notification text.
 * @param array      $data    Extra data attached to the notification.
 * @return void
 */
public function success($userId, $title, $message, $data = [])
{
    $type = self::NOTIFICATION_TYPE_SUCCESS;
    $this->sendToUser($userId, $type, $title, $message, $data);
}
/**
 * Sends a "warning" notification to one user.
 *
 * @param int|string $userId  Recipient.
 * @param string     $title   Notification title.
 * @param string     $message Notification text.
 * @param array      $data    Extra data attached to the notification.
 * @return void
 */
public function warning($userId, $title, $message, $data = [])
{
    $type = self::NOTIFICATION_TYPE_WARNING;
    $this->sendToUser($userId, $type, $title, $message, $data);
}
/**
 * Sends an "error" notification to one user.
 *
 * @param int|string $userId  Recipient.
 * @param string     $title   Notification title.
 * @param string     $message Notification text.
 * @param array      $data    Extra data attached to the notification.
 * @return void
 */
public function error($userId, $title, $message, $data = [])
{
    $type = self::NOTIFICATION_TYPE_ERROR;
    $this->sendToUser($userId, $type, $title, $message, $data);
}
}

在线用户状态管理
php
<?php
class OnlineUserManager
{
private $publisher;
/**
 * @param StompWebPublisher|null $publisher Publisher used for status
 *        events; a new StompWebPublisher is created when omitted, which is
 *        what the no-argument constructor always did.
 */
public function __construct($publisher = null)
{
    $this->publisher = $publisher !== null ? $publisher : new StompWebPublisher();
}
/**
 * Publishes an "online" status event for the user.
 *
 * @param int|string $userId User whose status changed.
 * @return void
 */
public function userOnline($userId)
{
    $details = [
        'online_at' => time(),
        'device' => 'web',
    ];
    $this->publishUserStatus($userId, 'online', $details);
}
/**
 * Publishes an "offline" status event for the user.
 *
 * @param int|string $userId User whose status changed.
 * @return void
 */
public function userOffline($userId)
{
    $details = ['offline_at' => time()];
    $this->publishUserStatus($userId, 'offline', $details);
}
/**
 * Publishes an "activity" status event for the user.
 *
 * @param int|string $userId   User performing the activity.
 * @param mixed      $activity Activity description included in the event.
 * @return void
 */
public function userActivity($userId, $activity)
{
    $details = [
        'activity' => $activity,
        'timestamp' => time(),
    ];
    $this->publishUserStatus($userId, 'activity', $details);
}
/**
 * Wraps a status change in a "user_status" event and publishes it with the
 * user's own id as the recipient.
 *
 * @param int|string $userId User the status belongs to.
 * @param string     $status Status label.
 * @param array      $data   Status details.
 * @return void
 */
private function publishUserStatus($userId, $status, $data)
{
    $event = [
        'event' => 'user_status',
        'user_id' => $userId,
        'status' => $status,
        'data' => $data,
    ];
    $this->publisher->publishToWebClient($userId, $event);
}
/**
 * Broadcasts the list of online user ids together with its size.
 *
 * @param array $userIds Ids of the users currently online.
 * @return void
 */
public function broadcastOnlineUsers($userIds)
{
    $event = [
        'event' => 'online_users',
        'user_ids' => $userIds,
        'count' => count($userIds),
    ];
    $this->publisher->broadcast($event);
}
}

实际应用场景
1. 实时聊天系统
php
<?php
class ChatMessageBroker
{
private $publisher;
/**
 * @param StompWebPublisher|null $publisher Publisher used for chat events;
 *        a new StompWebPublisher is created when omitted, which is what the
 *        no-argument constructor always did.
 */
public function __construct($publisher = null)
{
    $this->publisher = $publisher !== null ? $publisher : new StompWebPublisher();
}
/**
 * Publishes a one-to-one chat message to the recipient.
 *
 * @param int|string $fromUserId Sender id.
 * @param int|string $toUserId   Recipient id.
 * @param string     $message    Message text.
 * @return array The chat message record that was published.
 */
public function sendMessage($fromUserId, $toUserId, $message)
{
    $chatMessage = [
        'id' => uniqid('msg_'),
        'from' => $fromUserId,
        'to' => $toUserId,
        'message' => $message,
        'timestamp' => time(),
        'status' => 'sent',
    ];

    $envelope = [
        'event' => 'chat_message',
        'message' => $chatMessage,
    ];
    $this->publisher->publishToWebClient($toUserId, $envelope);

    return $chatMessage;
}
/**
 * Publishes a chat message addressed to a group.
 *
 * NOTE(review): the group id is passed to publishToWebClient() as if it were
 * a user id, so it shares the "user.<id>" routing namespace with real users;
 * confirm that group ids and user ids cannot collide.
 *
 * @param int|string $fromUserId Sender id.
 * @param int|string $groupId    Target group id.
 * @param string     $message    Message text.
 * @return void
 */
public function sendGroupMessage($fromUserId, $groupId, $message)
{
    $envelope = [
        'event' => 'group_message',
        'message' => [
            'id' => uniqid('msg_'),
            'from' => $fromUserId,
            'group_id' => $groupId,
            'message' => $message,
            'timestamp' => time(),
        ],
    ];

    $this->publisher->publishToWebClient($groupId, $envelope);
}
/**
 * Tells the recipient whether the sender is currently typing.
 *
 * @param int|string $fromUserId User who is typing.
 * @param int|string $toUserId   User who should see the indicator.
 * @param bool       $isTyping   Pass false to signal that typing stopped;
 *                               defaults to true, the previously hard-coded value.
 * @return void
 */
public function sendTypingIndicator($fromUserId, $toUserId, $isTyping = true)
{
    $this->publisher->publishToWebClient($toUserId, [
        'event' => 'typing',
        'from' => $fromUserId,
        'is_typing' => (bool) $isTyping,
    ]);
}
}

2. 实时数据看板
php
<?php
class DashboardPublisher
{
private $publisher;
/**
 * @param StompWebPublisher|null $publisher Publisher used for dashboard
 *        events; a new StompWebPublisher is created when omitted, which is
 *        what the no-argument constructor always did.
 */
public function __construct($publisher = null)
{
    $this->publisher = $publisher !== null ? $publisher : new StompWebPublisher();
}
/**
 * Broadcasts a "metrics_update" event carrying the given metrics.
 *
 * @param mixed $metrics Metrics payload.
 * @return void
 */
public function publishMetrics($metrics)
{
    $event = [
        'event' => 'metrics_update',
        'metrics' => $metrics,
        'timestamp' => time(),
    ];
    $this->publisher->broadcast($event);
}
/**
 * Broadcasts an "alert" event carrying the given alert.
 *
 * @param mixed $alert Alert payload.
 * @return void
 */
public function publishAlert($alert)
{
    $event = [
        'event' => 'alert',
        'alert' => $alert,
        'timestamp' => time(),
    ];
    $this->publisher->broadcast($event);
}
/**
 * Publishes a "notification" event to a single user.
 *
 * @param int|string $userId       Recipient.
 * @param mixed      $notification Notification payload.
 * @return void
 */
public function publishNotification($userId, $notification)
{
    $event = [
        'event' => 'notification',
        'notification' => $notification,
        'timestamp' => time(),
    ];
    $this->publisher->publishToWebClient($userId, $event);
}
}

3. 实时协作编辑
php
<?php
class CollaborativeEditBroker
{
private $publisher;
/**
 * @param StompWebPublisher|null $publisher Publisher used for document
 *        events; a new StompWebPublisher is created when omitted, which is
 *        what the no-argument constructor always did.
 */
public function __construct($publisher = null)
{
    $this->publisher = $publisher !== null ? $publisher : new StompWebPublisher();
}
/**
 * Publishes a "document_edit" event to the recipient "doc_<documentId>".
 *
 * @param int|string $documentId Document being edited.
 * @param int|string $userId     User who made the edit.
 * @param mixed      $operation  Edit operation payload.
 * @return void
 */
public function broadcastEdit($documentId, $userId, $operation)
{
    $recipient = "doc_{$documentId}";
    $event = [
        'event' => 'document_edit',
        'document_id' => $documentId,
        'user_id' => $userId,
        'operation' => $operation,
        'timestamp' => time(),
    ];
    $this->publisher->publishToWebClient($recipient, $event);
}
/**
 * Publishes a "cursor_update" event to the recipient "doc_<documentId>".
 *
 * @param int|string $documentId Document the cursor belongs to.
 * @param int|string $userId     User whose cursor moved.
 * @param mixed      $position   Cursor position payload.
 * @return void
 */
public function notifyCursorPosition($documentId, $userId, $position)
{
    $recipient = "doc_{$documentId}";
    $event = [
        'event' => 'cursor_update',
        'document_id' => $documentId,
        'user_id' => $userId,
        'position' => $position,
        'timestamp' => time(),
    ];
    $this->publisher->publishToWebClient($recipient, $event);
}
}

常见问题与解决方案
问题 1:WebSocket 连接失败
原因:端口未开放或配置错误
解决方案:
php
<?php
class WebStompConnectionTester
{
private $host = 'localhost';
private $port = 15674;
/**
 * Checks whether a TCP connection to host:port can be opened within 5 seconds.
 *
 * @return array ['success' => true] on success, otherwise
 *               ['success' => false, 'error' => string].
 */
public function testConnection()
{
    // Warnings are suppressed; failure is reported through the return value.
    $socket = @fsockopen($this->host, $this->port, $errno, $errstr, 5);

    if ($socket) {
        fclose($socket);
        return ['success' => true];
    }

    return [
        'success' => false,
        'error' => "无法连接到 {$this->host}:{$this->port} - {$errstr}",
    ];
}
/**
 * Requests /api/overview and decodes the JSON response.
 *
 * @return mixed Decoded response as an associative array, or null when the
 *               body cannot be decoded.
 */
public function getServerInfo()
{
    $overview = $this->makeHttpRequest("/api/overview");
    $decoded = json_decode($overview, true);

    return $decoded;
}
/**
 * Performs an HTTP GET against port 15672 on the configured host using
 * basic authentication with guest:guest.
 *
 * @param string $endpoint Path appended to the base URL, e.g. "/api/overview".
 * @return string|false Response body, or false when curl_exec() fails.
 */
private function makeHttpRequest($endpoint)
{
    $url = "http://{$this->host}:15672{$endpoint}";

    $handle = curl_init();
    curl_setopt($handle, CURLOPT_URL, $url);
    curl_setopt($handle, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($handle, CURLOPT_USERPWD, "guest:guest");
    curl_setopt($handle, CURLOPT_TIMEOUT, 10);

    $body = curl_exec($handle);
    curl_close($handle);

    return $body;
}
}

问题 2:消息丢失
原因:网络不稳定或消费者处理失败
解决方案:
javascript
class ReliableWebClient extends RabbitMQWebClient {
constructor(url, username, password) {
super(url, username, password);
this.pendingMessages = [];
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
}
async connect() {
try {
await super.connect();
this.reconnectAttempts = 0;
await this.resubscribe();
await this.replayPendingMessages();
} catch (error) {
this.handleReconnect(error);
}
}
/**
 * Schedules another connect() with exponential back-off
 * (2^attempt seconds), up to maxReconnectAttempts tries.
 *
 * @param {*} error - The failure that triggered the retry (not used here).
 */
handleReconnect(error) {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
        return;
    }
    this.reconnectAttempts += 1;
    const delay = (2 ** this.reconnectAttempts) * 1000;
    setTimeout(() => this.connect(), delay);
}
async resubscribe() {
for (const [queue, callback] of this.messageHandlers) {
this.subscribe(queue, callback);
}
}
async replayPendingMessages() {
for (const msg of this.pendingMessages) {
this.send(msg.queue, msg.body);
}
this.pendingMessages = [];
}
send(queue, body, headers = {}) {
try {
super.send(queue, body, headers);
} catch (error) {
this.pendingMessages.push({ queue, body, headers });
}
}
}

问题 3:消息顺序混乱
解决方案:
javascript
class OrderedWebClient extends RabbitMQWebClient {
constructor(url, username, password) {
super(url, username, password);
this.messageQueue = [];
this.processing = false;
}
async subscribe(queue, callback, id = null) {
return super.subscribe(queue, async (message, rawMessage) => {
this.messageQueue.push({ message, callback, rawMessage });
await this.processQueue();
}, id);
}
async processQueue() {
if (this.processing) return;
this.processing = true;
while (this.messageQueue.length > 0) {
const item = this.messageQueue.shift();
await item.callback(item.message, item.rawMessage);
}
this.processing = false;
}
}

最佳实践建议
1. 认证和授权
php
<?php
class WebStompAuthenticator
{
/**
 * Looks up the user and verifies the supplied password.
 *
 * @param string $username User name to look up.
 * @param string $password Plain-text password to verify.
 * @return array|false The user record on success, false otherwise.
 */
public function authenticate($username, $password)
{
    $user = $this->lookupUser($username);

    if ($user && $this->verifyPassword($password, $user['password'])) {
        return $user;
    }

    return false;
}
/**
 * Returns the access level of the first permission whose destination
 * pattern matches, or 'deny' when none matches.
 *
 * @param array  $user        User record; only $user['id'] is read.
 * @param string $destination Destination being accessed.
 * @return string The matching permission's 'access' value, or 'deny'.
 */
public function authorize($user, $destination)
{
    foreach ($this->getUserPermissions($user['id']) as $rule) {
        if (!$this->matchDestination($destination, $rule['destination'])) {
            continue;
        }
        return $rule['access'];
    }

    return 'deny';
}
/**
 * Tests a destination against a wildcard pattern in which `*` matches any
 * run of characters and every other character is literal.
 *
 * The pattern is quoted before being turned into a regular expression, and
 * `#` is used as the delimiter: destinations such as "/queue/user.*"
 * contain "/", which would end a "/"-delimited expression early and make
 * preg_match() fail for every input.
 *
 * @param string $destination Destination to test.
 * @param string $pattern     Wildcard pattern, e.g. "/queue/user.*".
 * @return bool True when the whole destination matches the pattern.
 */
private function matchDestination($destination, $pattern)
{
    $quoted = preg_quote($pattern, '#');
    // preg_quote() turned each "*" into "\*"; restore it as a wildcard.
    $regex = '#^' . str_replace('\*', '.*', $quoted) . '$#D';

    return preg_match($regex, $destination) === 1;
}
/**
 * Placeholder lookup: returns a fixed record (id 1, password 'hashed') that
 * echoes back the requested user name.
 *
 * @param string $username User name to look up.
 * @return array Record with 'id', 'username' and 'password' keys.
 */
private function lookupUser($username)
{
    $record = [
        'id' => 1,
        'username' => $username,
        'password' => 'hashed',
    ];

    return $record;
}
/**
 * Checks a plain-text password against a stored hash with password_verify().
 *
 * @param string $password       Plain-text password.
 * @param string $hashedPassword Stored password hash.
 * @return bool True when the password matches the hash.
 */
private function verifyPassword($password, $hashedPassword)
{
    $matches = password_verify($password, $hashedPassword);

    return $matches;
}
/**
 * Placeholder permission source: returns the same two rules for every user.
 *
 * @param int $userId User id (not used by this placeholder).
 * @return array List of rules, each with 'destination' and 'access' keys.
 */
private function getUserPermissions($userId)
{
    $rules = [];
    $rules[] = ['destination' => '/queue/user.*', 'access' => 'read'];
    $rules[] = ['destination' => '/exchange/notifications.*', 'access' => 'write'];

    return $rules;
}
}

2. 连接管理
php
<?php
class ConnectionPoolManager
{
private $maxConnections = 100;
private $activeConnections = [];
private $connectionTimeout = 300;
/**
 * Returns the tracked connection record for a user, creating one if needed.
 *
 * A still-valid record is reused and its 'last_activity' is refreshed, so a
 * connection in active use does not expire. An expired record is replaced.
 * When the pool is at capacity, expired records are evicted before the new
 * one is added; note that the new record is added even if nothing could be
 * evicted.
 *
 * @param int|string $userId User the connection belongs to.
 * @return array Record with 'user_id', 'created_at' and 'last_activity'.
 */
public function acquireConnection($userId)
{
    $connectionId = $this->getConnectionId($userId);

    if (isset($this->activeConnections[$connectionId])) {
        if ($this->isConnectionValid($this->activeConnections[$connectionId])) {
            // Arrays are copied by value, so update the stored record itself.
            $this->activeConnections[$connectionId]['last_activity'] = time();
            return $this->activeConnections[$connectionId];
        }
        unset($this->activeConnections[$connectionId]);
    }

    if (count($this->activeConnections) >= $this->maxConnections) {
        $this->cleanupOldConnections();
    }

    $conn = $this->createConnection($userId);
    $this->activeConnections[$connectionId] = $conn;

    return $conn;
}

/**
 * Derives the pool key for a user's connection.
 *
 * @param int|string $userId User the connection belongs to.
 * @return string Key used in $activeConnections.
 */
private function getConnectionId($userId)
{
    return 'user_' . $userId;
}
/**
 * Builds a new connection record stamped with the current time.
 *
 * @param int|string $userId User the connection belongs to.
 * @return array Record with 'user_id', 'created_at' and 'last_activity'.
 */
private function createConnection($userId)
{
    $record = ['user_id' => $userId];
    $record['created_at'] = time();
    $record['last_activity'] = time();

    return $record;
}
/**
 * Tells whether a connection was active within the timeout window.
 *
 * @param array $conn Connection record; only 'last_activity' is read.
 * @return bool True when fewer than $connectionTimeout seconds have passed
 *              since the last activity.
 */
private function isConnectionValid($conn)
{
    $idleSeconds = time() - $conn['last_activity'];

    return $idleSeconds < $this->connectionTimeout;
}
/**
 * Evicts every tracked connection that is no longer valid.
 *
 * Validity is decided by isConnectionValid(), so a record idle for exactly
 * $connectionTimeout seconds, which acquireConnection() already refuses to
 * reuse, is evicted here as well.
 *
 * @return void
 */
private function cleanupOldConnections()
{
    foreach ($this->activeConnections as $id => $conn) {
        if (!$this->isConnectionValid($conn)) {
            unset($this->activeConnections[$id]);
        }
    }
}
}

3. 消息压缩
php
<?php
/**
 * Encodes payloads as JSON and zlib-compresses the large ones.
 */
class MessageCompressor
{
    /** JSON strings longer than this many bytes are compressed. */
    const COMPRESSION_THRESHOLD = 1024;

    /**
     * JSON-encodes the data and compresses the result when it exceeds the
     * threshold.
     *
     * @param mixed $data Value to encode.
     * @return string Plain JSON, or gzcompress() output for large payloads.
     */
    public function compress($data)
    {
        $encoded = json_encode($data);

        return strlen($encoded) > self::COMPRESSION_THRESHOLD
            ? gzcompress($encoded)
            : $encoded;
    }

    /**
     * Reverses compress(): tries to inflate the input and falls back to
     * treating it as plain JSON when inflation fails.
     *
     * @param string $data Output of compress().
     * @return mixed Decoded value (JSON objects become associative arrays).
     */
    public function decompress($data)
    {
        // Inflation warnings are suppressed; plain JSON input is expected to fail here.
        $inflated = @gzuncompress($data);
        $json = ($inflated === false) ? $data : $inflated;

        return json_decode($json, true);
    }
}