V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
echo404
V2EX  ›  PHP

熟悉 swoole 和 socket 编程的大佬能进来帮忙看看这个问题么?

  •  
  •   echo404 · 2019-01-29 13:54:10 +08:00 · 3926 次点击
    这是一个创建于 2130 天前的主题,其中的信息可能已经有所发展或是发生改变。

    1.背景

    我司最近将消息中间件改为了 mqtt,因为 mqtt 的特性(同一个 client_id 不能进行多次连接),导致客户端与 mqtt 服务器之间必须使用一个长连接。一开始我是用队列 while 循环拉取来实现发消息这个操作的。后来闲的蛋疼加上想学点新的东西,就想用 swoole 写个 tcp 服务器,然后就入了坑了.....

    2、问题

    现在,我在每个 task 进程中都启动了一个 mqtt 连接,在压测时,这个 mqtt 连接经常会出现 errno=11 (资源不可用)的错误,我查了下相关资料,这个错误出现的原因是因为 mqtt 连接的 socket 写缓冲区满了,消息无法写进写缓冲区。关于这块我很疑惑,task 进程不应该是个单线程的进程么?这样 mqtt 连接一次只处理一个消息,那为什么还会出现写缓冲区满的错误呢? 另外一个问题就是在测试性能时,我用 swoole 写的这个服务器性能竟然和 while 循环拉取队列的性能差不多,差不多都是每秒发送 380 条消息左右。我都要疯了,究竟是我什么地方写的不对,我现在都怀疑人生了!!!!

    3、代码

    <?php
    namespace swoole;
    
    //如果以守护进程启动后,必须使用绝对地址
    include_once __DIR__."/mqtt/MessageId.php";
    include_once __DIR__."/mqtt/phpMQTT.php";
    
    class TcpServer
    {
        private $serv;
        private $mqtt_config;
        private $mqtt_connect;
        const PROCESS_NAME = 'swoole';
    
        public function __construct()
        {
            //创建 Server 对象,监听 127.0.0.1:9501 端口
            $this->serv = new \swoole_server('0.0.0.0', 9501);
    
            //设置属性
            $this->serv->set(array(
                //守护进程化。设置 daemonize => 1 时,程序将转入后台作为守护进程运行。长时间运行的服务器端程序必须启用此项。
                //启用守护进程后,CWD (当前目录)环境变量的值会发生变更,相对路径的文件读写会出错。PHP 程序中必须使用绝对路径
                'daemonize' => true,
                'worker_num' => 2, //异步非阻塞代码一般设为 CPU 的 1-4 倍。
                'max_request' => 10000, //一个 worker 进程在处理完超过此数值的任务后将自动退出,主要作用是解决 PHP 进程内存溢出问题
                'max_conn' => 1024, //进程最大连接数
                'task_worker_num' => 20, //Task 进程最大值不得超过 cpu_num*1000,该进程是同步阻塞的,里面不得调用异步 IO 函数
                'task_ipc_mode' => 3, //worker 进程与 task 进程之间的通信模式,3 为队列通信并且设置为了争抢模式,使用消息队列通信,如果 Task 进程处理能力低于投递速度,可能会引起 Worker 进程阻塞。
                'message_queue_key' => 0x72000100, //指定消息队列的 key
                'task_max_request' => 10000, //task 进程最大任务数
                'log_file' => '/tmp/swoole.log', //日志文件
                'log_level' => 4, //需要记录的错误级别
                'tcp_fastopen' => true, //开启 TCP 快连接
            ));
    
            //监听服务启动事件
            $this->serv->on('Start', array($this, 'onStart'));
            //监听管理进程启动事件
            $this->serv->on('ManagerStart', array($this, 'onManagerStart'));
            //监听工作进程启动事件
            $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
            //监听工作进程异常退出事件
            $this->serv->on('WorkerError', array($this, 'onWorkerError'));
            //监听工作检测停止事件
            $this->serv->on('WorkerStop', array($this, 'onWorkerStop'));
            //监听连接进入事件
            $this->serv->on('Connect', array($this, 'onConnect'));
            //监听数据接受事件
            $this->serv->on('Receive', array($this, 'onReceive'));
            //监听连接关闭事件
            $this->serv->on('Close', array($this, 'onClose'));
            //监听 task 进程接收任务事件
            $this->serv->on('Task', array($this, 'onTask'));
            //监听 Task 进程完成任务事件
            $this->serv->on('Finish', array($this, 'onFinish'));
    
            //启动服务器
            $this->serv->start();
        }
    
        //onStart 回调中,仅允许 echo、打印 Log、修改进程名称。不得执行其他操作
        public function onStart($serv)
        {
            swoole_set_process_name(self::PROCESS_NAME.'_master');
        }
    
        //在这个回调函数中可以修改管理进程的名称
        public function onManagerStart($serv)
        {
            swoole_set_process_name(self::PROCESS_NAME.'_manager');
        }
    
        //此事件在 Worker 进程 /Task 进程启动时发生,这里创建的对象可以在进程生命周期内使用
        public function onWorkerStart($serv, $worker_id)
        {
            //引入常用函数文件,由于可能会发送更改,所以在 worker 进程开始时引用
            include_once __DIR__.'/mqtt/__cron.php';
            $jobType = $serv->taskworker ? 'Tasker' : 'Worker';
            swoole_set_process_name(self::PROCESS_NAME.'_'.$jobType.'_'.$worker_id);
            //在 task 进程中启动 mqtt 连接
            if ($serv->taskworker) {
                echo "Task 进程({$worker_id})启动\r\n";
                //获取配置
                $this->mqtt_config = get_mqtt_config($worker_id);
                //连接服务器(这里为了以后能加入多个 mqtt 实例,所以我们将连接放入一个数组中)
                foreach ($this->mqtt_config as $key=>$item) {
                    $mqtt = get_mqtt($item, 'o2o_mqtt', []);
                    //如果没有连接上 mqtt 服务器,关闭当前进程
                    if (!$mqtt) {
                        $serv->stop($worker_id, true);
                    }
                    $mqtt_arr[$key] = $mqtt;
                }
                $this->mqtt_connect = $mqtt_arr;
                //30S 发送一次心跳包
                $serv->tick(30000, function () use ($serv, $worker_id) {
                    //发送心跳包
                    foreach ($this->mqtt_config as $key=>$item) {
                        if (!$this->mqtt_connect[$key]['obj']->ping()) {
                            //如果 ping 失败就重新连接
                            echo "{$item['addr']} ping 失败,退出当前 task 进程($worker_id)\r\n";
                            $mqtt = get_mqtt($item, 'o2o_mqtt', []);
                            if (!$mqtt) {
                                $serv->stop($worker_id, true);
                            }
                            $this->mqtt_connect[$key] = $mqtt;
                        }
                    }
                });
            }
    
            //在 worker 进程判断文件是否更新
            if (!$serv->taskworker) {
                //清除文件状态缓存,这个是为了防止下面 filemtime 从缓存中读取
                clearstatcache();
                $filemtime = filemtime(__FILE__);
                //30S 检测一次文件更新
                $serv->tick(30000, function () use ($serv, $worker_id, $filemtime) {
                    //检查文件更新
                    clearstatcache();
                    //如果文件变化,则重启所有的 work 进程
                    if ($filemtime != filemtime(__FILE__)) {
                        echo "文件更新,重启所有 woker/task 进程\r\n";
                        $serv->reload();
                    }
                });
            }
        }
    
        public function onWorkerError($serv, $worker_id, $worker_pid, $exit_code, $signal)
        {
            echo "{$worker_id} Error\r\n";
        }
    
        //此函数在 Worker 进程中执行
        public function onWorkerStop($serv,$worker_id)
        {
            //zend_opcache 的 opcache 清理函数,防止某些服务器开启了 opcache
            opcache_reset();
        }
    
        //此函数在 Worker 进程中执行
        public function onConnect($serv, $fd)
        {
            //echo "Client: connect.\n";
        }
    
        //此函数在 Worker 进程中执行
        public function onReceive($serv, $fd, $from_id, $data)
        {
            $param['data'] = json_decode($data,true);
            $param['fd'] = $fd;
            //向 task 进程投递任务
            $serv->task(json_encode($param));
        }
    
        //此函数在 Task 进程中执行
        public function onTask($serv, $task_id, $src_worker_id, $data)
        {
            $st = microtime(true);
            $param = json_decode($data, true);
            $data = $param['data'];
            $fd = $param['fd'];
            $return = ['code' => 2, 'msg' => 'mqtt 消息发送失败'];
            foreach ($this->mqtt_connect as $key=>$value) {
                if ($value['minDevNo'] < $data['device_id'] && $value['maxDevNo'] > $data['device_id']) {
                    $res = send_message($value['obj'], $data['mqtt_topic'], $data['message']);
                    if ($res) {
                        $return['code'] = 1;
                        $return['msg'] = 'mqtt 消息发送成功';
                        echo "接收到数据".$data['message'].', 发往'.$data['mqtt_topic']."成功".time()."\r\n";
                    }else{
                        //断线重连
                        echo "接收到数据".$data['message'].', 发往'.$data['mqtt_topic']."失败,重新连接 mqtt\r\n";
                        foreach ($this->mqtt_config as $k=>$item) {
                            $mqtt = get_mqtt($item, 'o2o_mqtt', []);
                            if (!$mqtt) {
                                $serv->stop($serv->worker_id, true);
                            }
                            $mqtt_arr[$k] = $mqtt;
                        }
                        $this->mqtt_connect = $mqtt_arr;
                    }
                }
            }
            $res = json_encode($return);
            $serv->send($fd, $res);
            $et = microtime(true);
            echo "任务{$src_worker_id}-{$serv->worker_id}-{$task_id}完成,花费时间".($et-$st)."S\r\n";
            return $res;
        }
    
        //此函数在 worker 进程中执行
        public function onFinish($serv, $task_id, $data)
        {
            //echo "{$task_id}回调完成\r\n";
        }
    
        public function onClose($server, $fd, $reactorId)
        {
            //echo "Client: close.\n";
        }
    }
    
    $serv = new TcpServer();
    
    14 条回复    2019-04-03 21:58:00 +08:00
    wo642436249
        1
    wo642436249  
       2019-01-29 16:05:09 +08:00
    干嘛不用协程
    echo404
        2
    echo404  
    OP
       2019-01-29 16:12:46 +08:00
    @wo642436249 刚接触 swoole,异步非阻塞就已经写得很费力了,暂时没有能力去写个协程版的,而且问题应该也不是在这块吧
    puritania
        3
    puritania  
       2019-01-29 16:21:20 +08:00
    所以我选择 golang
    AngryPanda
        4
    AngryPanda  
       2019-01-29 16:22:01 +08:00
    [task 进程不应该是个单线程的进程么?]

    我的理解不是。比如你只开了 2 个 worker,难道只能有 2 个请求被并行处理?那并发数怎么可能上的去呢?
    triptipstop
        5
    triptipstop  
       2019-01-29 16:23:25 +08:00
    写不好 PHP 的才用 Go
    AngryPanda
        6
    AngryPanda  
       2019-01-29 16:27:11 +08:00
    我最近也是第一次用 swoole 来写了一个应用,使用的协程接口。

    性能上的确提升很大,然而写法让人很不习惯。这点比 golang 难用多了。
    echo404
        7
    echo404  
    OP
       2019-01-29 16:34:16 +08:00
    @AngryPanda 测试服务器是我自己的一个 1 核 2G 的小水管,所以按文档中所说,worker 进程是 CPU 数的 1-4 倍,我只开了 2 个进程,每个 worker 进程处理的最大连接数为 1024,2 个进程就同时接收 2048 个请求。超出这个数值之后,如果再有请求去连接这个 TCP 服务器应该会报错误,但是我这边的压测日志中,并没有记录到对应的错误,所以应该还没有到达最大并发量才对
    ferock
        8
    ferock  
       2019-01-29 16:37:01 +08:00
    @AngryPanda swoole 研发成本其实并不低,比较起来,还不如用其他语言带来的性能提升来的 “核算”
    AngryPanda
        9
    AngryPanda  
       2019-01-29 16:37:27 +08:00
    @echo404 单线程怎么同时处理这些多连接的请求呢。
    AngryPanda
        10
    AngryPanda  
       2019-01-29 16:39:30 +08:00
    @ferock 的确如此。swoole 的协程还需要配合很多协程客户端来用,这点限制非常大。且和原来的 php 写法差异比较大。
    echo404
        11
    echo404  
    OP
       2019-01-29 16:49:15 +08:00
    @AngryPanda 就我的理解:worker 进程是多线程的,task 进程是单线程的,worker 进程接收到 reactor 进程传递过来的请求之后,将请求投递到 linux 系统自带的队列中去(这个过程是异步的),task 进程就一直读取这个队列中的消息进行处理
    liuxu
        12
    liuxu  
       2019-01-29 17:24:27 +08:00   ❤️ 1
    1 核 2G,不会 1M 带宽吧,压测带宽跑满了吧
    echo404
        13
    echo404  
    OP
       2019-01-30 10:34:25 +08:00
    @liuxu 还真是,把网络这块给忘了
    chdahuzi
        14
    chdahuzi  
       2019-04-03 21:58:00 +08:00
    @AngryPanda swoole4 内置了协程,即便不显示得用协程,每个请求都用到了协程
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1020 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 21:07 · PVG 05:07 · LAX 13:07 · JFK 16:07
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.