Skip to content

使用 webman + GatewayWorker 实现websocket服务

这里使用 webman GatewayWorker插件 来实现

创建 webman 项目

如果已经安装过 webman 可直接跳过

bash
composer create-project workerman/webman

安装 gateway-worker 插件

进入 webman 目录进行插件安装

bash
composer require webman/gateway-worker

配置及业务目录

  • 配置文件在 config/plugin/webman/gateway-worker/ 目录
  • 业务目录在 plugin/webman/gateway 目录

消息交互数据格式约定

客户端 => 服务端

javascript
let msg = {
    // 执行方法 例:user@info
    'cmd': 'user@info',
    // 业务数据
    'data': data,
    // 时间戳
    'timestamp': Date.now().toString().substring(0, 10)
};
console.debug('您向服务端发送了消息:', msg)
socket.send(JSON.stringify(msg));

服务端 => 客户端

json
{
  "type": "ping",
  "data": {},
  "timestamp": 0
}
  • type:消息类型 如 'type':'ping' 代表是心跳检测类型消息
  • data:业务数据
  • timestamp:时间戳

webman消息处理

通过业务目录中的 Events.php 文件 onMessage 方法,监听用户发送过的来消息体,进行业务实现

php
public static function onMessage($client_id, $message)
{
    // 消息体json转数组
    $message = json_decode($message, true);

    // 判断消息格式是否正确
    if (!is_array($message) and !isset($message['cmd'])) {
        // 消息格式不正确,主动关闭链接
        Gateway::closeClient($client_id);
    }

    // 异常处理
    try {
        // 数据
        $data = $message['data'] ?? [];
        // 登录token
        $token = $message['token'] ?? [];

        // 解析 cmd 字段,得到业务逻辑的控制器与方法
        list($class, $action) = explode('@', $message['cmd']);
        // 控制器类名
        $classNamespace = 'app\\webSocket\\'.ucwords($class);
        // 状态码
        $errorType = 404;
        // 判断控制器是否存在
        if (class_exists($classNamespace)) {
            $class = new $classNamespace;
        } else { // 控制器不存在,直接调用基类
            $class = new WebSocket();
            $errorType = 405;
        }

        // 给控制器属性分配数据
        $class->setData($data)->setClientId($client_id)->setToken($token);
         
        // 方法是否存在
        if (method_exists($class, $action)) {
            // 执行方法
            $class->$action();
        } else {
            // 方法不存在,调用自定义错误方法 
            $class->error($errorType);
        }

    }catch (\Exception $exception) {
        // 出线异常,关闭链接
        Gateway::closeClient($client_id);
    }
}

websocket 控制器基类

php
<?php
namespace app\common\base;
 
use GatewayWorker\Lib\Gateway;
 
class WebSocket
{
    public $data = [];
    public $token = '';
    public $clientId = '';
 
    /**
     * 设置数据
     * @param array $data
     * @return $this
     */
    public function setData(array $data): WebSocket
    {
        $this->data = $data;
        return $this;
    }
 
    /**
     * 设置登录态token
     * @param string $token
     * @return $this
     */
    public function setToken(string $token): WebSocket
    {
        $this->token = $token;
        return $this;
    }
 
    /**
     * 设置当前客户端ID
     * @param string $clientId
     * @return $this
     */
    public function setClientId(string $clientId): WebSocket
    {
        $this->clientId = $clientId;
        return $this;
    }
 
    /**
     * 手动返回异常
     * @param int $code 状态码
     * @param string $msg 消息
     * @return void
     */
    public function error(int $code, string $msg = 'error')
    {
        $this->sendToClient('error',[
            'code'=>$code,
            'msg'=>$msg
        ]);
    }
 
    /**
     * 关闭客户端连接
     * @param string $clientId 客户端ID
     * @return void
     */
    public function closeClient(string $clientId = '')
    {
        if (empty($clientId)) {
            $clientId = $this->clientId;
        }
        Gateway::closeClient($clientId);
    }
 
    /**
     * 向客户端发送信息
     * @param string $type 消息类型
     * @param array $data 数据
     * @param string $clientId 客户端ID
     * @return void
     */
    public function sendToClient(string $type, array $data, string $clientId = '')
    {
        if (empty($clientId)) {
            $clientId = $this->clientId;
        }
 
        Gateway::sendToClient($clientId,json_encode([
            'type'=>$type,
            'timestamp'=>time(),
            'data'=>$data,
        ],JSON_UNESCAPED_UNICODE|JSON_UNESCAPED_SLASHES));
    }
}

H5客户端测试代码

html
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>websocket - 客户端测试</title>
</head>
<body>
<input type="text" id="msg" value="">
<span onclick="push()">发送</span>
<script>
    // 创建一个Socket实例 注意端口与服务端一致
    socket = new WebSocket('ws://localhost:7272');
    // 打开Socket
    socket.onopen = function (event) {
        // 发送一个初始化消息,可以是本地化已登录的token,可用于断线重连后刷新客户端ID
        let cmd = 'index@init'
        let data = {};
        let token = Date.now().toString()
        sendMsg(cmd, data, token)
    }
    // 监听消息
    socket.onmessage = function (event) {
        try {
            let msg = JSON.parse(event.data);
            console.debug('收到服务端消息:', msg);
            // 分析服务端消息类型
            switch (msg.type) {
                    // 心跳检测
                case 'ping':
                    // 发送指定消息体
                    sendMsg('index@ping', {})
                    break;
                    // 其他方法自行实现
                default:
                    // console.log(event);
                    break;
            }
        } catch (e) {
            console.debug('服务端非格式化数据:', event);
        }
    }
    // 监听Socket的关闭
    socket.onclose = function (event) {
        console.debug('链接被关闭', event);
    }
    // 关闭Socket....
    //socket.close()};

    // 发送input框类的值到服务端
    function push() {
        let msg = document.getElementById('msg').value;
        sendMsg('index@test', {msg: msg})
    }

    // 发送消息封装
    function sendMsg(cmd, data = {}, token = '') {
        let msg = {
            // 执行方法 例:user@info
            'cmd': cmd,
            // 业务数据
            'data': data,
            // 登录态token
            'token': token,
            // 时间戳
            'timestamp': Date.now().toString().substring(0, 10)
        };
        console.debug('您向服务端发送了消息:', msg)
        socket.send(JSON.stringify(msg));
    }
</script>
</body>
</html>