Group-Co
为什么写这个框架?
- 利用协程特性以同步方式来编写异步代码,增强可读性。
- 将swoole的异步特性与传统框架的MVC相结合。
- 可以用作api也可以用作http server,rpc server.
- 目前实现了以Zookeeper、Redis、Mysql为注册中心的服务化治理.
如何使用,与传统框架的区别?
- 框架基本使用与传统框架基本一致,路由,控制器,服务层,数据层。
- 在异步调用的地方需要以yield关键词来触发协程切换
特性
- 全异步协程调度,支持高并发
- 异步TCP,HTTP客户端
- 异步日志
- 异步文件读写
- 异步Mysql
- 异步Mysql事务处理
- 异步Redis
- 支持Mysql连接池,Redis连接池
- SOA服务化调用,内部封装完整的RPC通信,服务端采用异步Task处理后合并数据并返回。
- 异步TCP客户端支持并行、串行调用
- 支持EOF结束符协议、自定义网络通信协议,支持json化、php序列化包体,支持gzip。
- Twig、Doctrine支持视图、服务数据层
- 单元测试覆盖
文档总览
- 快速开始
- 异步服务
- 服务中心
- 基础服务
- 同步服务(用于服务开发)
- 控制台
环境依赖
- hiredis(redis异步库)
- redis
- mysql
- php >5.6 或者 php > 7.0
- swoole >=1.9.17(建议升级到最新版本) (在编译swoole时加入--enable-async-redis,开启异步redis客户端, --enable-openssl开启openssl支持,--with-openssl-dir指定你的openssl目录)
注:openssl是用于http异步客户端抓取https网址时依赖的模块,可以选择性开启
启动项目(请先完成环境依赖安装)
- 克隆项目
- 执行 => composer install (如果安装很慢,可以使用国内镜像,但是镜像包会有延迟)
- 新建一个runtime目录,用于存放日志等cache文件
- 配置config中的database配置文件
- 设置config/service.php中的registry_address.目前只支持redis、mysql作为注册中心
- 启动http server => php server.php
- 访问 http://localhost:9777/ 开始异步协程之旅
Demo(将用nginx做一次反向代理资源文件)
- 修改配置nginx,见doc/nginx.md,配置hosts
- 配置config中的service配置文件
- 执行脚本 => app/console sql:migrate
- 启动user服务 => app/service user
- 使用监控Monitor服务 app/service monitor
- 还可以启动其他服务,自行配置
- 访问配置的servername => groupco.com/demo 即可
更新代码
- 执行 => composer update
使用
- 启动http server => php server.php
- 热重启htt pserver => php server.php -s reload
- 关闭http server => php server.php -s stop
- 重启http server => php server.php -s restart
- 启动某个服务 => app/service user
- 热重启某个服务 => app/service user reload
- 关闭某个服务 => app/service user stop
特别注意
- 1.额外内存释放的问题,局部静态变量,全局变量的释放。
- 2.断线重连机制内部已封装(在执行sql时如果出现长连接已失效,将尝试3次重连操作)。
异步Tcp客户端
串行发包
use AsyncTcp;
$tcp = new AsyncTcp('127.0.0.1', 9501);
$tcp->setTimeout(2);
//串行发送
$res = (yield $tcp->call('hello server!'));
$res = (yield $tcp->call('hello server!'));
并行发包
use AsyncTcp;
$tcp = new AsyncTcp('127.0.0.1', 9501);
$tcp->setTimeout(2);
//并行发送数据包
$tcp->addCall('hello server1!');
$tcp->addCall('hello server2!');
$res = (yield $tcp->multiCall());
Tips(如果使用tcp异步客户端和其他服务端通信)
- tcp客户端的数据包格式可在config/app.php中配置.
- protocol为buf时,是按包头+包体封装数据包的,包头为4个字节,存放包体的长度,解包时同样也是按包头+包体解包,所以服务端send数据时也要按同样规则封包。
- protocol为eof时,是按'\r\n'结束符封装数据包的,解包时同样也是按'\r\n'解包,所以服务端send数据时也要按'\r\n'结束符封装数据包。
- protocol为空的话,不封装数据包。在应答式响应中可以使用,否则会出现粘包现象。(框架内部封装的service为该模式)
异步Http客户端
Get方式
1.使用域名形式
use AsyncHttp;
//直接使用域名, get方式
$http = new AsyncHttp('http://groupco.com');
//设置2s超时
$http->setTimeout(2);
//$http->setCookies(['token' => 'xxxx']);
$res = (yield $http->get('/'));
2.使用ip:port形式
use AsyncHttp;
//也可以通过ip:port方式
$http = new AsyncHttp('http://127.0.0.1:80');
$http->setHost('groupco.com');
$res = (yield $http->get('/user', ['id' => 1]));
Post方式
1.使用域名形式
use AsyncHttp;
//使用https, post方式
$http = new AsyncHttp('https://groupco.com');
$res = (yield $http->post('/test', ['postId' => 52]));
2.使用ip:port形式
use AsyncHttp;
//也可以通过ip:port方式
$http = new AsyncHttp('http://127.0.0.1:80');
$http->setHost('groupco.com');
$res = (yield $http->post('/test', ['postId' => 52]));
注:若请求https地址,需要在编译swoole时开启openssl
异步Mysql客户端
AsyncMysql::query($sql, $usePool = true)
第二个参数设为false将不会使用连接池中的资源,默认都会从连接池中取,配置连接池数量 => config/database.php
具体使用
use AsyncMysql;
//设置超时时间
AsyncMysql::setTimeout(2);
$res = (yield AsyncMysql::query("INSERT INTO `user` (`id`, `mobile`, `password`)
VALUES (NULL, '18768122222', '11111')"));
//失败返回false
if ($res) {
$result = $res->getResult();
$affectedRows = $res->getAffectedRows();
$id = $res->getInsertId();
}
异步Mysql事务处理
与传统事务一样使用,只是需要加上yield关键词,以异步方式调用
use AsyncMysql;
public function test()
{
try {
yield AsyncMysql::begin();
$res = (yield $this->doTrans());
if ($res === false) {
throw new \Exception("need roll back");
}
yield AsyncMysql::commit();
} catch (\Exception $e) {
yield AsyncMysql::rollback();
}
}
public function doTrans()
{
$res = (yield AsyncMysql::query("INSERT INTO `user` (`id`, `mobile`, `password`)
VALUES (NULL, '187681343332', '11111')"));
if ($res) {
$result = $res->getResult();
$affectedRows = $res->getAffectedRows();
$id = $res->getInsertId();
$res = (yield AsyncMysql::query("SELECT * FROM `user` WHERE id = {$id}"));
$res = (yield AsyncMysql::query("SELECT * FROM `user`"));
$res = (yield AsyncMysql::query("DELETE FROM `user` WHERE id = {$id}", false));
}
yield true;
}
异步Redis客户端
连接池(连接池默认开启)
use AsyncRedis;
//关闭连接池
AsyncRedis::enablePool(false);
//开启连接池
AsyncRedis::enablePool(true);
使用AsyncRedis
use AsyncRedis;
//设置超时时间
AsyncRedis::setTimeout(2);
yield AsyncRedis::set('foo', 'bar');
dump(yield AsyncRedis::get('foo'));
$user = json_encode(['foo' => 'bar']);
yield AsyncRedis::hSet('user', 1, $user);
dump(yield AsyncRedis::hGet('user', 1));
修改配置信息config/database.php:
'redis' => [
//redis连接池数量
'maxPool' => 5,
//redis连接超时时间
'timeout' => 5,
'default' => [
'host' => '127.0.0.1',
'port' => 6379,
'prefix' => 'group_',
'auth' => '',
'connect' => 'persistence'
],
],
异步Log日志
use AsyncLog;
yield AsyncLog::info('hello world');
yield AsyncLog::debug('test debug', ['foo' => 'bar']);
yield AsyncLog::notice('hello world',[], 'group.com');
yield AsyncLog::warning('hello world');
yield AsyncLog::error('hello world');
yield AsyncLog::critical('hello world');
yield AsyncLog::alert('hello world');
yield AsyncLog::emergency('hello world');
异步文件读写
读文件
use AsyncFile;
$content = (yield AsyncFile::read(__ROOT__."runtime/test.txt"));
写文件
$res = (yield AsyncFile::write(__ROOT__."runtime/test.txt", "hello wordls!"));
$res = (yield AsyncFile::write(__ROOT__."runtime/test.txt", "hello wordls!", FILE_APPEND));
目前仅支持小于4M的文件
异常Exception
以传统的try,catch抓取异常
如果在业务层不catch,框架层会捕捉,并返回一个500的server error响应。
如果在开发环境会返回一个500的具体错误的trace响应。
try {
throw new \Exception("Error Processing Request", 1);
//yield throwException(new \Exception("Error Processing Request", 1));
} catch (\Exception $e) {
echo $e->getMessage();
}
服务治理流程
注册中心
设置注册中心
修改config/service.php中的registry_address.目前只支持Redis、Mysql注册中心
Redis注册中心
'registry_address' => 'redis://127.0.0.1:6379'
Mysql注册中心
//mysql注册中心,开启后,请执行doc/mysql-registry.sql中的sql,创建2张表
'registry_address' => 'mysql://127.0.0.1:3306?dbname=Demo&user=root&password=123',
设置依赖的服务
修改config/app.php的services.你需要将项目依赖的服务模块写入该数组,在server启动时,会单独起一个进程订阅每个服务
//依赖的服务模块
'services' => ["User", "Order", "Monitor"],
启动服务
执行命令 app/service [需要执行的server名称]
app/service user
监控服务
框架层提供了'Group\Process\HeartbeatProcess',心跳检测类来监控服务健康。你可以在服务配置中加入此进程来启动监控。
例:
'monitor' => [
//本机当前内网ip
'ip' => '127.0.0.1',
'serv' => '0.0.0.0',
'port' => 9517,
'config' => [
//忽略
],
'public' => 'Monitor',
'process' => [
//你可以使用框架封装的心跳检测进程
'Group\Process\HeartbeatProcess',
],
],
在异步HTTP SERVER中使用服务
全局方法service_center()
使用service_center($service)获取服务地址,然后使用call()方法调用公开的服务方法
$service = (yield service_center('User'));
$user = (yield $service->call("User::getUser", ['id' => $userId]));
上面的代码会调用src/Service/User/Service/Impl/UserServiceImpl中的getUser方法
串行调用
//设置2秒超时
$service = (yield service_center("User"));
$service->setTimeout(2);
$users = (yield $service->call("User::getUsersCache", ['ids' => [1, 2]]));
$users2 = (yield $service->call("User::getUsersCache", ['ids' => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]));
并行调用(只能针对同一服务模块)
$service = (yield service_center("User"));
$service->setTimeout(2);
$callId1 = $service->addCall("User::getUsersCache", ['ids' => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]);
$callId2 = $service->addCall("User::getUser", ['id' => 1]);
$res = (yield $service->multiCall());
dump($res[$callId1]);
dump($res[$callId2]);
全局方法service()
使用service($serverName)获取服务地址,然后使用call()方法调用公开的服务方法.
service()不会通过注册中心发现服务
service("user")->setTimeout(2);
$users = (yield service("user")->call("User\User::getUsersCache", ['ids' => [1, 2, 3, 4, 5]]));
串行调用
service("user")->setTimeout(2);
$users = (yield service("user")->call("User\User::getUsersCache", ['ids' => [1, 2, 3, 4]]));
$users2 = (yield service("user")->call("User\User::getUsersCache", ['ids' => [1, 2, 3, 4]]));
并行调用
service("user")->setTimeout(2);
$callId1 = service("user")->addCall("User\User::getUsersCache", ['ids' => [1, 2, 3, 4, 5]]);
$callId2 = service("user")->addCall("User\User::getUser", ['id' => 1]);
$res = (yield service("user")->multiCall());
dump($res[$callId1]);
dump($res[$callId2]);
服务调用监控
KernalEvent::SERVICE_CALL事件
在框架层,调用servcie时,会抛出KernalEvent::SERVICE_CALL事件,你可以监听该事件,做数据上报处理,请以异步方式上报
<?php
namespace src\Web\Listeners;
use Listener;
use Event;
class ServiceCallListener extends Listener
{
public function setMethod()
{
return 'onServiceCall';
}
public function onServiceCall(Event $event)
{
$data = $event->getProperty();
$cmd = $data['cmd'];
$calltime = $data['calltime'];
//上报监控平台
//do something
}
}