使用 webman + GatewayWorker 实现websocket服务

Time : 2024-01-13 / View : 111

这里使用 webman [ GatewayWorker插件 ] 来实现

创建 webman 项目  如果已经安装过 webman 可直接跳过

composer create-project workerman/webman

进入 webman 目录进行插件安装

composer require webman/gateway-worker

配置及业务目录

配置文件在 config/plugin/webman/gateway-worker/ 目录

业务目录在 plugin/webman/gateway 目录

消息交互数据格式

首先约定客户端与服务端交互的消息体格式

客户端 => 服务端

let msg = {
    // 执行方法 例:user@info
    'cmd':'user@info',
    // 业务数据
    'data':data,
    // 时间戳
    'timestamp': Date.now().toString().substring(0,10)
};
console.debug('您向服务端发送了消息:', msg)
socket.send(JSON.stringify(msg));

服务端 => 客户端

{
    // 消息类型 如 'type':'ping' 代表是心跳检测类型消息
    'type':'ping',
    // 业务数据
    'data':{},
    // 时间戳
    'timestamp': 0
}

webman消息处理

通过业务目录中的 Events.php 文件 onMessage 方法,监听用户发送过的来消息体,进行业务实现

public static function onMessage($client_id, $message)
    {
        // 消息体json转数组
        $message = json_decode($message, true);

        // 判断消息格式是否正确
        if (!is_array($message) and !isset($message['cmd'])) {
            // 消息格式不正确,主动关闭链接
            Gateway::closeClient($client_id);
        }

        // 异常处理
        try {
            // 数据
            $data = $message['data'] ?? [];
            // 登录token
            $token = $message['token'] ?? [];

            // 解析 cmd 字段,得到业务逻辑的控制器与方法
            list($class, $action) = explode('@', $message['cmd']);
            // 控制器类名
            $classNamespace = 'app\\webSocket\\'.ucwords($class);
            // 状态码
            $errorType = 404;
            // 判断控制器是否存在
            if (class_exists($classNamespace)) {
                $class = new $classNamespace;
            } else { // 控制器不存在,直接调用基类
                $class = new WebSocket();
                $errorType = 405;
            }

            // 给控制器属性分配数据
            $class->setData($data)->setClientId($client_id)->setToken($token);
            
            // 方法是否存在
            if (method_exists($class, $action)) {
                // 执行方法
                $class->$action();
            } else {
                // 方法不存在,调用自定义错误方法 
                $class->error($errorType);
            }

        }catch (\Exception $exception) {
            // 出线异常,关闭链接
            Gateway::closeClient($client_id);
        }
    }

websocket 控制器基类

<?php
namespace app\common\base;

use GatewayWorker\Lib\Gateway;

class WebSocket
{
    public $data = [];
    public $token = '';
    public $clientId = '';

    /**
     * 设置数据
     * @param array $data
     * @return $this
     */
    public function setData(array $data): WebSocket
    {
        $this->data = $data;
        return $this;
    }

    /**
     * 设置登录态token
     * @param string $token
     * @return $this
     */
    public function setToken(string $token): WebSocket
    {
        $this->token = $token;
        return $this;
    }

    /**
     * 设置当前客户端ID
     * @param string $clientId
     * @return $this
     */
    public function setClientId(string $clientId): WebSocket
    {
        $this->clientId = $clientId;
        return $this;
    }

    /**
     * 手动返回异常
     * @param int $code 状态码
     * @param string $msg 消息
     * @return void
     */
    public function error(int $code, string $msg = 'error')
    {
        $this->sendToClient('error',[
            'code'=>$code,
            'msg'=>$msg
        ]);
    }

    /**
     * 关闭客户端连接
     * @param string $clientId 客户端ID
     * @return void
     */
    public function closeClient(string $clientId = '')
    {
        if (empty($clientId)) {
            $clientId = $this->clientId;
        }
        Gateway::closeClient($clientId);
    }

    /**
     * 向客户端发送信息
     * @param string $type 消息类型
     * @param array $data 数据
     * @param string $clientId 客户端ID
     * @return void
     */
    public function sendToClient(string $type, array $data, string $clientId = '')
    {
        if (empty($clientId)) {
            $clientId = $this->clientId;
        }

        Gateway::sendToClient($clientId,json_encode([
            'type'=>$type,
            'timestamp'=>time(),
            'data'=>$data,
        ],JSON_UNESCAPED_UNICODE|JSON_UNESCAPED_SLASHES));
    }
}

H5客户端测试代码

<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>websocket - 客户端测试</title>
</head>
<body>
<input type="text" id="msg" value="">
<span onclick="push()">发送</span>
<script>javascript方法</script>
</body>
</html>

javascript方法

    // 创建一个Socket实例 注意端口与服务端一致
    socket = new WebSocket('ws://localhost:7272');
    // 打开Socket
    socket.onopen = function(event){
        // 发送一个初始化消息,可以是本地化已登录的token,可用于断线重连后刷新客户端ID
        let cmd = 'index@init'
        let data = {};
        let token = Date.now().toString()
        sendMsg(cmd, data, token)
    }
    // 监听消息
    socket.onmessage = function(event){
        try {
            let msg = JSON.parse(event.data);
            console.debug('收到服务端消息:', msg);
            // 分析服务端消息类型
            switch (msg.type) {
                // 心跳检测
                case 'ping':
                    // 发送指定消息体
                    sendMsg('index@ping',{})
                    break;
                // 其他方法自行实现
                default:
                    // console.log(event);
                    break;
            }
        }catch (e) {
            console.debug('服务端非格式化数据:',event);
        }
    }
    // 监听Socket的关闭
    socket.onclose = function(event){
        console.debug('链接被关闭',event);
    }
    // 关闭Socket....
    //socket.close()};

    // 发送input框类的值到服务端
    function push() {
        let msg = document.getElementById('msg').value;
        sendMsg('index@test',{msg:msg})
    }

    // 发送消息封装
    function sendMsg(cmd, data = {}, token = '') {
        let msg = {
            // 执行方法 例:user@info
            'cmd':cmd,
            // 业务数据
            'data':data,
            // 登录态token
            'token':token,
            // 时间戳
            'timestamp': Date.now().toString().substring(0,10)
        };
        console.debug('您向服务端发送了消息:', msg)
        socket.send(JSON.stringify(msg));
    }