Async异步服务

  • Async服务是可以无缝接入到任务业务中的,你可以使用框架中的任何服务在实现handle方法中。 Async是基于swoole的task-server服务,用于将慢速任务丢给异步task去处理,从而解决性能问题.(建议更新到swoole最新版本 测试环境为v1.9.2)。
  • 使用场景:单进程业务复杂,可以拆分为多个子进程同时进行业务数据封装,此过程是异步阻塞的,客户端还是会阻塞等待服务端返回数据。

1.修改配置 config/async.php

  • 开启async server
app/async user_server
  • 注意如果要开启守护进程模式,不要设置swoole config的daemonize为true(相对路径会出错),应该如下:
app/async user_server &
  • 执行client查看运行结果
php src/Async/User/clent.php

2.完整流程介绍

使用注意
  • 使用全局变量,局部静态变量时,内存释放的问题。否则有可能内存泄漏!
  • 共享单个数据库对象,对数据库长连接丢失时的断线重连问题。要做好断线重连机制(Group框架内部已经做了)
  • 控制好worker与task的数量。比例。防止出现worker接受大量tcp连接时,task处理不过来,导致数据返回超时。

3.目录结构与使用

3种事件
  • onWork
  • onTask
  • onFinish

4.onWork事件

  • onWork,很明显server端在接受client数据时,触发的事件。分为两个参数cmd(命令名称)和handler(处理的类)。onWork事件旨在将任务分发到task,下面是配置文件
    'onWork' => [
        [   
            //client端发送过来的处理命令
            'cmd' => 'getUserInfo',
            //处理器
            'handler' => 'src\Async\User\Work\UserHandler',
        ],

    ],
  • src/Async/User/Work/UserHandler.php,请继承Group\Async\Handler\WorkHandler类,实现handle()即可。
  • 获取client端发送的数据
    $this -> getData();
  • 投递task任务
    $cmd = "getUserInfo";
    $this -> task($cmd, $data);
完整示例
    <?php

    namespace src\Async\User\Work;

    use Group\Async\Handler\WorkHandler;

    class UserHandler extends WorkHandler
    {
        public function handle()
        {
            $data = $this -> getData();
            foreach ($data as $value) {
                $this -> task($value);
            }
        }
    }

5.onTask事件

  • 投递完task任务,我们来看看onTask事件
    'onTask' => [
        [   
            //work传来的命令
            'cmd' => 'getUserInfo',
            //task处理器
            'handler' => 'src\Async\User\Task\UserHandler',
            //task结束时需要执行的处理器
            'onFinish' => 'src\Async\User\Finish\UserHandler',
        ],
        [   
            //work传来的命令
            'cmd' => 'getUserAddress',
            //task处理器
            'handler' => 'src\Async\User\Task\UserAddressHandler',
        ],

    ],
  • 上面的work丢过来任务后,我们可以写一个src/Async/User/Task/UserHandler.php来处理task进程要做的事情。请继承Group\Async\Handler\TaskHandler类,实现handle()即可。
  • 获取work发送的数据
    $this -> getData();
  • 返回处理好的数据
    return $this->finish($data);
完整示例
<?php

namespace src\Async\User\Task;

use Group\Async\Handler\TaskHandler;

class UserHandler extends TaskHandler
{
    public function handle()
    {    
        $data = $this -> getData();
        $userId = $data['data'];
        $user = $this->getUserService()->getUser($userId);
        if ($userId == 1) {
            $user['cmd'] = 'needAddress';
        }

        return $this->finish($user);
    }

       public function getUserService()
        {
            return $this->createService("User:User");
        }
}

6.onFinish事件

  • task任务完成之后,我们就要写最后的finish事件了(如果不定义finish事件,系统会自动返回task返回的数据)。把数据丢回给client端。
  • 我们可以写一个src/Async/User/Finish/UserHandler.php来处理task进程要做的事情。请继承Group\Async\Handler\FinishHandler类,实现handle()即可。
  • 获取task最后return回来的数据
    $this -> getData();
  • client端的demo 请看src/Async/User/client.php
  • 投递task任务(在结束一个task任务之后,你还可以继续投递给其他的task)
    $cmd = "getUserAddress";
    $this -> task($cmd, $data);
完整示例
<?php

namespace src\Async\User\Finish;

use Group\Async\Handler\FinishHandler;

class UserHandler extends FinishHandler
{
    public function handle()
    {
        $user = $this -> getData();
        if (isset($user['cmd']) && $user['cmd'] == 'needAddress') {
            unset($user['cmd']);
            $this -> task("getUserAddress", $user['name']);
        } else {
            return $user;
        }
    }
}
等到所有task执行完成时,系统会自动返回所有处理完成的数据。

7.Client使用

  • 在框架中使用,Group的Async封装了call方法作为client端与server端通信,包含了4个参数
    $server = 'user_server'; //config配置的serverName
    $cmd = "getUserInfo"; // 传给server的指令
    $data = [1,2,3,4,5,6,7,8,9,10]; // 数据
    $needRecvData = true; //默认为true,false的话server端不会返回数据

    \Async::call($server, $cmd, $data, $needRecvData);
  • 当然你也可以以socket方式来连接server, client端的demo 请看src/Async/User/client.php

results matching ""

    No results matching ""