Skip to content

Web STOMP 插件

概述

Web STOMP 插件(rabbitmq_web_stomp)是 RabbitMQ 的一个重要插件,它允许通过 WebSocket 协议在浏览器中直接与 RabbitMQ 进行消息交互。这使得构建实时 Web 应用变得更加简单,无需编写复杂的后端消息代理代码。

核心知识点

Web STOMP 架构

mermaid
graph TB
    subgraph 浏览器
        WS[WebSocket 客户端]
        STOMP[STOMP.js]
    end

    subgraph Web STOMP 插件
        WSHandler[WebSocket 处理器]
        STOMPHandler[STOMP 协议转换]
    end

    subgraph RabbitMQ
        AMQP[AMQP 核心]
        Exchange[Exchange]
        Queue[Queue]
    end

    WS --> STOMP
    STOMP --> WSHandler
    WSHandler --> STOMPHandler
    STOMPHandler --> AMQP
    AMQP --> Exchange
    Exchange --> Queue

    style WSHandler fill:#e1f5fe
    style STOMPHandler fill:#e1f5fe

工作流程

mermaid
sequenceDiagram
    participant B as 浏览器
    participant W as Web STOMP
    participant R as RabbitMQ
    participant Q as 队列

    B->>W: CONNECT (WebSocket)
    W->>R: 验证连接
    R-->>W: CONNECTED
    W-->>B: CONNECTED

    B->>W: SUBSCRIBE /queue/test
    W->>R: 声明队列
    R-->>W: 队列信息
    W-->>B: 确认订阅

    B->>W: SEND /queue/test
    W->>R: 发布消息
    R->>Q: 存入队列

    Q->>R: 投递消息
    R-->>W: MESSAGE
    W-->>B: MESSAGE (WebSocket)

核心概念

概念说明
STOMP面向消息的简单文本协议
WebSocket浏览器双向通信协议
/queue/*队列目的地前缀
/exchange/*交换器目的地前缀

PHP 代码示例

安装和配置 Web STOMP 插件

php
<?php

class WebStompPluginInstaller
{
    public function install()
    {
        echo "安装 Web STOMP 插件...\n";

        $commands = [
            'rabbitmq-plugins enable rabbitmq_web_stomp',
            'systemctl restart rabbitmq-server'
        ];

        foreach ($commands as $command) {
            exec($command, $output, $returnCode);

            if ($returnCode !== 0) {
                throw new RuntimeException("命令执行失败: {$command}");
            }
        }

        echo "Web STOMP 插件安装完成\n";
    }

    public function isEnabled()
    {
        exec('rabbitmq-plugins list -e', $output);

        foreach ($output as $line) {
            if (strpos($line, 'rabbitmq_web_stomp') !== false) {
                return true;
            }
        }

        return false;
    }

    public function configure($port = 15674, $wsPort = 15674)
    {
        $config = <<<CONFIG
web_stomp.listeners.http.default = $port
web_stomp.ws_frame = text
web_stomp.listeners.ws.default = $wsPort
CONFIG;

        file_put_contents(
            '/etc/rabbitmq/rabbitmq.conf',
            $config,
            FILE_APPEND
        );

        echo "Web STOMP 配置已更新\n";
    }
}

WebSocket 客户端实现(JavaScript)

javascript
// 注意:这是 JavaScript 代码,运行在浏览器中

class RabbitMQWebClient {
    constructor(url, username = 'guest', password = 'guest') {
        this.url = url;
        this.username = username;
        this.password = password;
        this.client = null;
        this.subscriptions = new Map();
        this.messageHandlers = new Map();
    }

    connect() {
        return new Promise((resolve, reject) => {
            this.client = Stomp.client(this.url);

            this.client.connect(
                this.username,
                this.password,
                (frame) => {
                    console.log('Connected:', frame);
                    resolve(frame);
                },
                (error) => {
                    console.error('Connection error:', error);
                    reject(error);
                }
            );
        });
    }

    disconnect() {
        if (this.client) {
            this.client.disconnect();
        }
    }

    subscribe(queue, callback, id = null) {
        const subscriptionId = id || `sub-${Date.now()}`;

        const subscription = this.client.subscribe(queue, (message) => {
            const body = JSON.parse(message.body);
            callback(body, message);
        }, { id: subscriptionId });

        this.subscriptions.set(subscriptionId, subscription);
        this.messageHandlers.set(queue, callback);

        return subscriptionId;
    }

    unsubscribe(subscriptionId) {
        const subscription = this.subscriptions.get(subscriptionId);
        if (subscription) {
            subscription.unsubscribe();
            this.subscriptions.delete(subscriptionId);
        }
    }

    send(queue, body, headers = {}) {
        this.client.send(queue, headers, JSON.stringify(body));
    }

    sendToQueue(queueName, message) {
        this.send(`/queue/${queueName}`, message);
    }

    sendToExchange(exchangeName, routingKey, message) {
        this.send(`/exchange/${exchangeName}/${routingKey}`, message);
    }
}

Web STOMP PHP 客户端

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class StompWebPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'stomp_exchange';

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'topic',
            false,
            true,
            false
        );
    }

    public function publishToWebClient($userId, $message)
    {
        $msg = new AMQPMessage(
            json_encode($message),
            [
                'content_type' => 'application/json',
                'delivery_mode' => 2,
                'user_id' => $userId
            ]
        );

        $this->channel->basic_publish(
            $msg,
            $this->exchangeName,
            "user.{$userId}"
        );

        echo "消息已推送给用户 {$userId}\n";
    }

    public function broadcast($message)
    {
        $msg = new AMQPMessage(
            json_encode($message),
            [
                'content_type' => 'application/json',
                'delivery_mode' => 2
            ]
        );

        $this->channel->basic_publish(
            $msg,
            $this->exchangeName,
            'broadcast'
        );

        echo "广播消息已发送\n";
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

实时通知系统

php
<?php

class RealTimeNotificationSystem
{
    private $publisher;

    const NOTIFICATION_TYPE_INFO = 'info';
    const NOTIFICATION_TYPE_SUCCESS = 'success';
    const NOTIFICATION_TYPE_WARNING = 'warning';
    const NOTIFICATION_TYPE_ERROR = 'error';

    public function __construct()
    {
        $this->publisher = new StompWebPublisher();
    }

    public function sendToUser($userId, $type, $title, $message, $data = [])
    {
        $notification = [
            'id' => uniqid('notif_'),
            'type' => $type,
            'title' => $title,
            'message' => $message,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publishToWebClient($userId, [
            'event' => 'notification',
            'notification' => $notification
        ]);
    }

    public function sendToAll($type, $title, $message, $data = [])
    {
        $notification = [
            'id' => uniqid('notif_'),
            'type' => $type,
            'title' => $title,
            'message' => $message,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->broadcast([
            'event' => 'notification',
            'notification' => $notification
        ]);
    }

    public function info($userId, $title, $message, $data = [])
    {
        $this->sendToUser($userId, self::NOTIFICATION_TYPE_INFO, $title, $message, $data);
    }

    public function success($userId, $title, $message, $data = [])
    {
        $this->sendToUser($userId, self::NOTIFICATION_TYPE_SUCCESS, $title, $message, $data);
    }

    public function warning($userId, $title, $message, $data = [])
    {
        $this->sendToUser($userId, self::NOTIFICATION_TYPE_WARNING, $title, $message, $data);
    }

    public function error($userId, $title, $message, $data = [])
    {
        $this->sendToUser($userId, self::NOTIFICATION_TYPE_ERROR, $title, $message, $data);
    }
}

在线用户状态管理

php
<?php

class OnlineUserManager
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new StompWebPublisher();
    }

    public function userOnline($userId)
    {
        $this->publishUserStatus($userId, 'online', [
            'online_at' => time(),
            'device' => 'web'
        ]);
    }

    public function userOffline($userId)
    {
        $this->publishUserStatus($userId, 'offline', [
            'offline_at' => time()
        ]);
    }

    public function userActivity($userId, $activity)
    {
        $this->publishUserStatus($userId, 'activity', [
            'activity' => $activity,
            'timestamp' => time()
        ]);
    }

    private function publishUserStatus($userId, $status, $data)
    {
        $this->publisher->publishToWebClient($userId, [
            'event' => 'user_status',
            'user_id' => $userId,
            'status' => $status,
            'data' => $data
        ]);
    }

    public function broadcastOnlineUsers($userIds)
    {
        $this->publisher->broadcast([
            'event' => 'online_users',
            'user_ids' => $userIds,
            'count' => count($userIds)
        ]);
    }
}

实际应用场景

1. 实时聊天系统

php
<?php

class ChatMessageBroker
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new StompWebPublisher();
    }

    public function sendMessage($fromUserId, $toUserId, $message)
    {
        $chatMessage = [
            'id' => uniqid('msg_'),
            'from' => $fromUserId,
            'to' => $toUserId,
            'message' => $message,
            'timestamp' => time(),
            'status' => 'sent'
        ];

        $this->publisher->publishToWebClient($toUserId, [
            'event' => 'chat_message',
            'message' => $chatMessage
        ]);

        return $chatMessage;
    }

    public function sendGroupMessage($fromUserId, $groupId, $message)
    {
        $chatMessage = [
            'id' => uniqid('msg_'),
            'from' => $fromUserId,
            'group_id' => $groupId,
            'message' => $message,
            'timestamp' => time()
        ];

        $this->publisher->publishToWebClient($groupId, [
            'event' => 'group_message',
            'message' => $chatMessage
        ]);
    }

    public function sendTypingIndicator($fromUserId, $toUserId)
    {
        $this->publisher->publishToWebClient($toUserId, [
            'event' => 'typing',
            'from' => $fromUserId,
            'is_typing' => true
        ]);
    }
}

2. 实时数据看板

php
<?php

class DashboardPublisher
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new StompWebPublisher();
    }

    public function publishMetrics($metrics)
    {
        $this->publisher->broadcast([
            'event' => 'metrics_update',
            'metrics' => $metrics,
            'timestamp' => time()
        ]);
    }

    public function publishAlert($alert)
    {
        $this->publisher->broadcast([
            'event' => 'alert',
            'alert' => $alert,
            'timestamp' => time()
        ]);
    }

    public function publishNotification($userId, $notification)
    {
        $this->publisher->publishToWebClient($userId, [
            'event' => 'notification',
            'notification' => $notification,
            'timestamp' => time()
        ]);
    }
}

3. 实时协作编辑

php
<?php

class CollaborativeEditBroker
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new StompWebPublisher();
    }

    public function broadcastEdit($documentId, $userId, $operation)
    {
        $this->publisher->publishToWebClient("doc_{$documentId}", [
            'event' => 'document_edit',
            'document_id' => $documentId,
            'user_id' => $userId,
            'operation' => $operation,
            'timestamp' => time()
        ]);
    }

    public function notifyCursorPosition($documentId, $userId, $position)
    {
        $this->publisher->publishToWebClient("doc_{$documentId}", [
            'event' => 'cursor_update',
            'document_id' => $documentId,
            'user_id' => $userId,
            'position' => $position,
            'timestamp' => time()
        ]);
    }
}

常见问题与解决方案

问题 1:WebSocket 连接失败

原因:端口未开放或配置错误

解决方案

php
<?php

class WebStompConnectionTester
{
    private $host = 'localhost';
    private $port = 15674;

    public function testConnection()
    {
        $socket = @fsockopen($this->host, $this->port, $errno, $errstr, 5);

        if (!$socket) {
            return [
                'success' => false,
                'error' => "无法连接到 {$this->host}:{$this->port} - {$errstr}"
            ];
        }

        fclose($socket);

        return ['success' => true];
    }

    public function getServerInfo()
    {
        $response = $this->makeHttpRequest("/api/overview");

        return json_decode($response, true);
    }

    private function makeHttpRequest($endpoint)
    {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, "http://{$this->host}:15672{$endpoint}");
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, "guest:guest");
        curl_setopt($ch, CURLOPT_TIMEOUT, 10);

        $response = curl_exec($ch);
        curl_close($ch);

        return $response;
    }
}

问题 2:消息丢失

原因:网络不稳定或消费者处理失败

解决方案

javascript
class ReliableWebClient extends RabbitMQWebClient {
    constructor(url, username, password) {
        super(url, username, password);
        this.pendingMessages = [];
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
    }

    async connect() {
        try {
            await super.connect();
            this.reconnectAttempts = 0;
            await this.resubscribe();
            await this.replayPendingMessages();
        } catch (error) {
            this.handleReconnect(error);
        }
    }

    handleReconnect(error) {
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
            this.reconnectAttempts++;
            const delay = Math.pow(2, this.reconnectAttempts) * 1000;
            setTimeout(() => this.connect(), delay);
        }
    }

    async resubscribe() {
        for (const [queue, callback] of this.messageHandlers) {
            this.subscribe(queue, callback);
        }
    }

    async replayPendingMessages() {
        for (const msg of this.pendingMessages) {
            this.send(msg.queue, msg.body);
        }
        this.pendingMessages = [];
    }

    send(queue, body, headers = {}) {
        try {
            super.send(queue, body, headers);
        } catch (error) {
            this.pendingMessages.push({ queue, body, headers });
        }
    }
}

问题 3:消息顺序混乱

解决方案

javascript
class OrderedWebClient extends RabbitMQWebClient {
    constructor(url, username, password) {
        super(url, username, password);
        this.messageQueue = [];
        this.processing = false;
    }

    async subscribe(queue, callback, id = null) {
        return super.subscribe(queue, async (message, rawMessage) => {
            this.messageQueue.push({ message, callback, rawMessage });
            await this.processQueue();
        }, id);
    }

    async processQueue() {
        if (this.processing) return;
        this.processing = true;

        while (this.messageQueue.length > 0) {
            const item = this.messageQueue.shift();
            await item.callback(item.message, item.rawMessage);
        }

        this.processing = false;
    }
}

最佳实践建议

1. 认证和授权

php
<?php

class WebStompAuthenticator
{
    public function authenticate($username, $password)
    {
        $user = $this->lookupUser($username);

        if (!$user || !$this->verifyPassword($password, $user['password'])) {
            return false;
        }

        return $user;
    }

    public function authorize($user, $destination)
    {
        $permissions = $this->getUserPermissions($user['id']);

        foreach ($permissions as $permission) {
            if ($this->matchDestination($destination, $permission['destination'])) {
                return $permission['access'];
            }
        }

        return 'deny';
    }

    private function matchDestination($destination, $pattern)
    {
        $pattern = str_replace('*', '.*', $pattern);

        return preg_match("/^{$pattern}$/", $destination) === 1;
    }

    private function lookupUser($username)
    {
        return ['id' => 1, 'username' => $username, 'password' => 'hashed'];
    }

    private function verifyPassword($password, $hashedPassword)
    {
        return password_verify($password, $hashedPassword);
    }

    private function getUserPermissions($userId)
    {
        return [
            ['destination' => '/queue/user.*', 'access' => 'read'],
            ['destination' => '/exchange/notifications.*', 'access' => 'write']
        ];
    }
}

2. 连接管理

php
<?php

class ConnectionPoolManager
{
    private $maxConnections = 100;
    private $activeConnections = [];
    private $connectionTimeout = 300;

    public function acquireConnection($userId)
    {
        $connectionId = $this->getConnectionId($userId);

        if (isset($this->activeConnections[$connectionId])) {
            $conn = $this->activeConnections[$connectionId];

            if ($this->isConnectionValid($conn)) {
                return $conn;
            }

            unset($this->activeConnections[$connectionId]);
        }

        if (count($this->activeConnections) >= $this->maxConnections) {
            $this->cleanupOldConnections();
        }

        $conn = $this->createConnection($userId);
        $this->activeConnections[$connectionId] = $conn;

        return $conn;
    }

    private function createConnection($userId)
    {
        return [
            'user_id' => $userId,
            'created_at' => time(),
            'last_activity' => time()
        ];
    }

    private function isConnectionValid($conn)
    {
        return (time() - $conn['last_activity']) < $this->connectionTimeout;
    }

    private function cleanupOldConnections()
    {
        $now = time();

        foreach ($this->activeConnections as $id => $conn) {
            if (($now - $conn['last_activity']) > $this->connectionTimeout) {
                unset($this->activeConnections[$id]);
            }
        }
    }
}

3. 消息压缩

php
<?php

class MessageCompressor
{
    const COMPRESSION_THRESHOLD = 1024;

    public function compress($data)
    {
        $json = json_encode($data);

        if (strlen($json) > self::COMPRESSION_THRESHOLD) {
            return gzcompress($json);
        }

        return $json;
    }

    public function decompress($data)
    {
        $decompressed = @gzuncompress($data);

        if ($decompressed === false) {
            return json_decode($data, true);
        }

        return json_decode($decompressed, true);
    }
}

相关链接