Skip to content

ThinkPHP与Workerman结合,通过 websocket 实现即时通讯消息推送

整合方式

使用 Workerman, GatewayWorker时开发者最关心的是如何与现有mvc框架(ThinkPHP Yii laravel等)整合,以下是官方推荐的整合方式。见示意图:

work-with-other-mvc-framework

总体原则

  • 现有mvc框架项目与GatewayWorker独立部署互不干扰
  • 所有的业务逻辑都由http服务post/get到mvc框架中完成
  • 当mvc框架需要向客户端主动推送数据时才在mvc框架中调用Gateway的API GatewayClient完成推送。

在thinkphp中使用

1.安装think-worker扩展

composer require topthink/think-worker

2.新建自定义events处理类

php
<?php
namespace app\common\gatewayWorker;
 
use Exception;
use GatewayWorker\Lib\Gateway;
use think\worker\Application;
use Workerman\Worker;
 
class Events
{
    /**
     * onWorkerStart 事件回调
     * 当businessWorker进程启动时触发。每个进程生命周期内都只会触发一次
     * @access public
     * @param Worker $businessWorker
     * @return void
     */
    public static function onWorkerStart(Worker $businessWorker): void
    {
        $app = new Application;
        $app->initialize();
    }
 
    /**
     * onConnect 事件回调
     * 当客户端连接上gateway进程时(TCP三次握手完毕时)触发
     * @param string $client_id
     * @return void
     */
    public static function onConnect(string $client_id): void
    {
        Gateway::sendToCurrentClient(json_encode(['type'=>'onConnect','client_id'=>$client_id]));
    }
 
    /**
     * onWebSocketConnect 事件回调
     * 当客户端连接上gateway完成websocket握手时触发
     * @param string $client_id 断开连接的客户端client_id
     * @param  mixed $data
     * @return void
     */
    public static function onWebSocketConnect(string $client_id, $data): void
    {
    }
 
    /**
     * onMessage 事件回调
     * 当客户端发来数据(Gateway进程收到数据)后触发
     * @access public
     * @param string $client_id
     * @param mixed $data
     * @return void
     * @throws Exception
     */
    public static function onMessage(string $client_id, mixed $data): void
    {
    }
 
    /**
     * onClose 事件回调 当用户断开连接时触发的方法
     * @param string $client_id 断开连接的客户端client_id
     * @return void
     * @throws Exception
     */
    public static function onClose(string $client_id): void
    {
    }
 
    /**
     * onWorkerStop 事件回调
     * 当businessWorker进程退出时触发。每个进程生命周期内都只会触发一次。
     * @param Worker $businessWorker
     * @return void
     */
    public static function onWorkerStop(Worker $businessWorker)
    {
    }
}

3.修改配置文件 gateway_worker.phpeventHandler 配置项,指定自定义的events处理类

php
'eventHandler' => \app\common\gatewayWorker\Events::class,

4.启动 Gateway

php
php think worker:gateway

5.客户端逻辑代码

javascript
// 这里的ws链接地址,在tp配置文件 gateway_worker.php 中
ws = new WebSocket("ws://127.0.0.1:2348");
// 服务端主动推送消息时会触发这里的onmessage
ws.onmessage = function (e) {
    // json数据转换成js对象
    let data = JSON.parse(e.data);
    // 获取类型
    let type = data.type || '';
    switch (type) {
        // Events.php中返回的onConnect类型的消息,将client_id发给后台进行uid绑定
        case 'onConnect':
            // 利用jquery发起ajax请求,将client_id发给后端进行uid绑定
            $.post('/index/bind', {client_id: data.client_id}, function (data) {
            }, 'json');
            break;
        // 当mvc框架调用GatewayClient发消息时直接alert出来
        default :
            alert(e.data);
    }
};

6.clientId与用户ID的绑定

假设前提:用户已经登录成功可以拿到userID,然后通过前端html页面使用ws协议 连接后端ws服务,拿到client_id后,使用http请求,发送clientId后端接口,进行绑定

!>📢 注意:以下代码只是为了api功能演示 ,正常的业务逻辑应该是由前端通过API接口获取一个绑定口令,再通过ws发送口令至后端完成绑定,防止前端发送错误的clientId至后端进行绑定

php
public function bind()
{
    // 前端通过链接ws获得的client_id
    $client_id = input('client_id');
    // 假设用户ID为1
    $userId = 1;
    // 假设用户加入了群组:Group:123
    $userGroup = 'group:123';
    // client_id 与 已登录用户的 userid 绑定
    \GatewayWorker\Lib\Gateway::bindUid($client_id, $userId);
    // 加入某个群组(可调用多次加入多个群组)
    \GatewayWorker\Lib\Gateway::joinGroup($client_id, $userGroup);
}

7.进行消息推送

http服务器进行ws消息推送,前提是用户ID与clienId绑定成功

php
public function push()
{
    // 假设用户ID为1
    $userId = 1;
    // 单个群组:Group:123
    $userGroup = 'group:123';
    // 多个群组
    $userGroups = ['group:123','group:456'];
 
    // 发送个人消息
    \GatewayWorker\Lib\Gateway::sendToUid($userId, json_encode([
        'type'=>'sendToUid',
        'time'=>time(),
    ]));
     
    // 向单个群组的网站页面发送数据
    \GatewayWorker\Lib\Gateway::sendToGroup($userGroup, json_encode([
        'type'=>'sendToGroup',
        'time'=>time(),
    ]));
 
    // 向多个群组的网站页面发送数据
    \GatewayWorker\Lib\Gateway::sendToGroup($userGroups, json_encode([
        'type'=>'sendToGroup',
        'time'=>time(),
    ]));
 
    // 向所有用户发送消息
    \GatewayWorker\Lib\Gateway::sendToAll(json_encode([
        'type'=>'sendToAll',
        'time'=>time(),
    ]));
}

8.断线重连

  • 在 Events 类中,监听 onClose 断开连接回调,当用户断开链接时清理userId与clientId之间的关系以及用户断连的其他业务逻辑处理
  • 客户端监听断开事件,重新发起http请求至绑定接口进行重链