您的位置:首页 > 编程语言 > PHP开发

thinkPHP3.2.3集成swoole扩展

2016-04-05 16:02 661 查看


swoole.php

#!/bin/env php
<?php
/**
* 默认时区定义
*/
date_default_timezone_set('Asia/Shanghai');

/**
* 设置错误报告模式
*/
error_reporting(0);

/**
* 设置默认区域
*/
setlocale(LC_ALL, "zh_CN.utf-8");

/**
* 检测 PDO_MYSQL
*/
if (!extension_loaded('pdo_mysql')) {
exit('PDO_MYSQL extension is not installed' . PHP_EOL);
}
/**
* 检查exec 函数是否启用
*/
if (!function_exists('exec')) {
exit('exec function is disabled' . PHP_EOL);
}
/**
* 检查命令 lsof 命令是否存在
*/
exec("whereis lsof", $out);
if ($out[0] == 'lsof:') {
exit('lsof is not found' . PHP_EOL);
}
/**
* 定义项目根目录&swoole-task pid
*/
define('SWOOLE_PATH', __DIR__);
define('SWOOLE_TASK_PID_PATH', SWOOLE_PATH . DIRECTORY_SEPARATOR . 'Swoole' . DIRECTORY_SEPARATOR . 'tmp' . DIRECTORY_SEPARATOR . 'swoole-task.pid');
define('SWOOLE_TASK_NAME_PRE', 'swooleServ');

/**
* 加载 swoole server
*/
include SWOOLE_PATH . DIRECTORY_SEPARATOR . 'Swoole' . DIRECTORY_SEPARATOR . 'SwooleServer.php';

function portBind($port) {
$ret = [];
$cmd = "lsof -i :{$port}|awk '$1 != \"COMMAND\"  {print $1, $2, $9}'";
exec($cmd, $out);
if ($out) {
foreach ($out as $v) {
$a = explode(' ', $v);
list($ip, $p) = explode(':', $a[2]);
$ret[$a[1]] = [
'cmd' => $a[0],
'ip' => $ip,
'port' => $p,
];
}
}

return $ret;
}

function servStart($host, $port, $daemon, $name) {
echo "正在启动 swoole-task 服务" . PHP_EOL;
if (!is_writable(dirname(SWOOLE_TASK_PID_PATH))) {
exit("swoole-task-pid文件需要目录的写入权限:" . dirname(SWOOLE_TASK_PID_PATH) . PHP_EOL);
}
if (file_exists(SWOOLE_TASK_PID_PATH)) {
$pid = explode("\n", file_get_contents(SWOOLE_TASK_PID_PATH));
$cmd = "ps ax | awk '{ print $1 }' | grep -e \"^{$pid[0]}$\"";
exec($cmd, $out);
if (!empty($out)) {
exit("swoole-task pid文件 " . SWOOLE_TASK_PID_PATH . " 存在,swoole-task 服务器已经启动,进程pid为:{$pid[0]}" . PHP_EOL);
} else {
echo "警告:swoole-task pid文件 " . SWOOLE_TASK_PID_PATH . " 存在,可能swoole-task服务上次异常退出(非守护模式ctrl+c终止造成是最大可能)" . PHP_EOL;
unlink(SWOOLE_TASK_PID_PATH);
}
}
$bind = portBind($port);
if ($bind) {
foreach ($bind as $k => $v) {
if ($v['ip'] == '*' || $v['ip'] == $host) {
exit("端口已经被占用 {$host}:$port, 占用端口进程ID {$k}" . PHP_EOL);
}
}
}
unset($_SERVER['argv']);
$_SERVER['argc'] = 0;
echo "启动 swoole-task 服务成功" . PHP_EOL;
$server = new SwooleServer('127.0.0.1', 9501);
$server->run();
//确保服务器启动后swoole-task-pid文件必须生成
/*if (!empty(portBind($port)) && !file_exists(SWOOLE_TASK_PID_PATH)) {
exit("swoole-task pid文件生成失败( " . SWOOLE_TASK_PID_PATH . ") ,请手动关闭当前启动的swoole-task服务检查原因" . PHP_EOL);
}*/
}

function servStop($host, $port, $isRestart = false) {
echo "正在停止 swoole-task 服务" . PHP_EOL;
if (!file_exists(SWOOLE_TASK_PID_PATH)) {
exit('swoole-task-pid文件:' . SWOOLE_TASK_PID_PATH . '不存在' . PHP_EOL);
}
$pid = explode("\n", file_get_contents(SWOOLE_TASK_PID_PATH));
$bind = portBind($port);
if (empty($bind) || !isset($bind[$pid[0]])) {
exit("指定端口占用进程不存在 port:{$port}, pid:{$pid[0]}" . PHP_EOL);
}
$cmd = "kill {$pid[0]}";
exec($cmd);
do {
$out = [];
$c = "ps ax | awk '{ print $1 }' | grep -e \"^{$pid[0]}$\"";
exec($c, $out);
if (empty($out)) {
break;
}
} while (true);
//确保停止服务后swoole-task-pid文件被删除
if (file_exists(SWOOLE_TASK_PID_PATH)) {
unlink(SWOOLE_TASK_PID_PATH);
}
$msg = "执行命令 {$cmd} 成功,端口 {$host}:{$port} 进程结束" . PHP_EOL;
if ($isRestart) {
echo $msg;
} else {
exit($msg);
}
}

function servReload($host, $port, $isRestart = false) {
echo "正在平滑重启 swoole-task 服务" . PHP_EOL;
try {
$client = new \swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$ret = $client->connect($host, $port);
if (empty($ret)) {
exit("{$host}:{$port} swoole-task服务不存在或者已经关闭" . PHP_EOL);
} else {
$client->send(json_encode(array('action' => 'reload')));
}
$msg = "执行命令reload成功,端口 {$host}:{$port} 进程重启" . PHP_EOL;
if ($isRestart) {
echo $msg;
} else {
exit($msg);
}
} catch (Exception $e) {
exit($e->getMessage() . PHP_EOL . $e->getTraceAsString());
}
}

function servClose($host, $port, $isRestart = false) {
echo "正在关闭 swoole-task 服务" . PHP_EOL;
try {
$client = new \swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$ret = $client->connect($host, $port);
if (empty($ret)) {
exit("{$host}:{$port} swoole-task服务不存在或者已经关闭" . PHP_EOL);
} else {
$client->send(json_encode(array('action' => 'close')));
}
//确保停止服务后swoole-task-pid文件被删除
if (file_exists(SWOOLE_TASK_PID_PATH)) {
unlink(SWOOLE_TASK_PID_PATH);
}
$msg = "执行命令close成功,端口 {$host}:{$port} 进程结束" . PHP_EOL;
if ($isRestart) {
echo $msg;
} else {
exit($msg);
}
} catch (\Exception $e) {
exit($e->getMessage() . PHP_EOL . $e->getTraceAsString());
}
}

function servStatus($host, $port) {
echo "swoole-task {$host}:{$port} 运行状态" . PHP_EOL;
$pid = explode("\n", file_get_contents(SWOOLE_TASK_PID_PATH));
$bind = portBind($port);
if (empty($bind) || !isset($bind[$pid[0]])) {
exit("指定端口占用进程不存在 port:{$port}, pid:{$pid[0]}" . PHP_EOL);
}
$client = new \swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$ret = $client->connect($host, $port);
if (empty($ret)) {
exit("{$host}:{$port} swoole-task服务不存在或者已经停止" . PHP_EOL);
} else {
$client->send(json_encode(array('action' => 'status')));
$out = $client->recv();
$a = json_decode($out);
$b = array(
'start_time' => '服务器启动的时间',
'connection_num' => '当前连接的数量',
'accept_count' => '接受的连接数量',
'close_count' => '关闭的连接数量',
'tasking_num' => '当前正在排队的任务数',
'request_count' => '请求的连接数量',
'worker_request_count' => 'worker连接数量',
'task_process_num' => '任务进程数量'
);
foreach ($a as $k1 => $v1) {
if ($k1 == 'start_time') {
$v1 = date("Y-m-d H:i:s", $v1);
}
echo $b[$k1] . ":\t$v1" . PHP_EOL;
}
}
exit();
}

function servList() {
echo "本机运行的swoole-task服务进程" . PHP_EOL;
$cmd = "ps aux|grep " . SWOOLE_TASK_NAME_PRE . "|grep -v grep|awk '{print $1, $2, $6, $8, $9, $11}'";
exec($cmd, $out);
if (empty($out)) {
exit("没有发现正在运行的swoole-task服务" . PHP_EOL);
}
echo "USER PID RSS(kb) STAT START COMMAND" . PHP_EOL;
foreach ($out as $v) {
echo $v . PHP_EOL;
}
exit();
}

//可执行命令
$cmds = [
'start',
'stop',
'restart',
'reload',
'close',
'status',
'list',
];
$shortopts = "dDh:p:n:";
$longopts = [
'help',
'daemon',
'nondaemon',
'host:',
'port:',
'name:',
];
$opts = getopt($shortopts, $longopts);

if (isset($opts['help']) || $argc < 2) {
echo <<<HELP
用法:php swoole.php 选项 ... 命令[start|stop|restart|reload|close|status|list]
管理swoole-task服务,确保系统 lsof 命令有效
如果不指定监听host或者port,使用配置参数

参数说明
--help  显示本帮助说明
-d, --daemon    指定此参数,以守护进程模式运行,不指定则读取配置文件值
-D, --nondaemon 指定此参数,以非守护进程模式运行,不指定则读取配置文件值
-h, --host  指定监听ip,例如 php swoole.php -h127.0.0.1
-p, --port  指定监听端口port, 例如 php swoole.php -h127.0.0.1 -p9520
-n, --name  指定服务进程名称,例如 php swoole.php -ntest start, 则进程名称为SWOOLE_TASK_NAME_PRE-name
启动swoole-task 如果不指定 host和port,读取默认配置
强制关闭swoole-task 必须指定port,没有指定host,关闭的监听端口是  *:port,指定了host,关闭 host:port端口
平滑关闭swoole-task 必须指定port,没有指定host,关闭的监听端口是  *:port,指定了host,关闭 host:port端口
强制重启swoole-task 必须指定端口
平滑重启swoole-task 必须指定端口
获取swoole-task 状态,必须指定port(不指定host默认127.0.0.1), tasking_num是正在处理的任务数量(0表示没有待处理任务)

HELP;
exit;
}
//参数检查
foreach ($opts as $k => $v) {
if (($k == 'h' || $k == 'host')) {
if (empty($v)) {
exit("参数 -h --host 必须指定值\n");
}
}
if (($k == 'p' || $k == 'port')) {
if (empty($v)) {
exit("参数 -p --port 必须指定值\n");
}
}
if (($k == 'n' || $k == 'name')) {
if (empty($v)) {
exit("参数 -n --name 必须指定值\n");
}
}
}

//命令检查
$cmd = $argv[$argc - 1];
if (!in_array($cmd, $cmds)) {
exit("输入命令有误 : {$cmd}, 请查看帮助文档\n");
}

//监听ip 127.0.0.1,空读取配置文件
$host = '127.0.0.1';
if (!empty($opts['h'])) {
$host = $opts['h'];
if (!filter_var($host, FILTER_VALIDATE_IP)) {
exit("输入host有误:{$host}");
}
}
if (!empty($opts['host'])) {
$host = $opts['host'];
if (!filter_var($host, FILTER_VALIDATE_IP)) {
exit("输入host有误:{$host}");
}
}
//监听端口,9501 读取配置文件
$port = 9501;
if (!empty($opts['p'])) {
$port = (int)$opts['p'];
if ($port <= 0) {
exit("输入port有误:{$port}");
}
}
if (!empty($opts['port'])) {
$port = (int)$opts['port'];
if ($port <= 0) {
exit("输入port有误:{$port}");
}
}
//进程名称 没有默认为 SWOOLE_TASK_NAME_PRE;
$name = SWOOLE_TASK_NAME_PRE;
if (!empty($opts['n'])) {
$name = $opts['n'];
}
if (!empty($opts['name'])) {
$name = $opts['n'];
}
//是否守护进程 -1 读取配置文件
$isdaemon = -1;
if (isset($opts['D']) || isset($opts['nondaemon'])) {
$isdaemon = 0;
}
if (isset($opts['d']) || isset($opts['daemon'])) {
$isdaemon = 1;
}
//启动swoole-task服务
if ($cmd == 'start') {
servStart($host, $port, $isdaemon, $name);
}
//强制停止swoole-task服务
if ($cmd == 'stop') {
if (empty($port)) {
exit("停止swoole-task服务必须指定port" . PHP_EOL);
}
servStop($host, $port);
}
//关闭swoole-task服务
if ($cmd == 'close') {
if (empty($port)) {
exit("停止swoole-task服务必须指定port" . PHP_EOL);
}
servClose($host, $port);
}
//强制重启swoole-task服务
if ($cmd == 'restart') {
if (empty($port)) {
exit("重启swoole-task服务必须指定port" . PHP_EOL);
}
echo "重启swoole-task服务" . PHP_EOL;
servStop($host, $port, true);
servStart($host, $port, $isdaemon, $name);
}
//平滑重启swoole-task服务
if ($cmd == 'reload') {
if (empty($port)) {
exit("平滑重启swoole-task服务必须指定port" . PHP_EOL);
}
echo "平滑重启swoole-task服务" . PHP_EOL;
servReload($host, $port, true);
}
//查看swoole-task服务状态
if ($cmd == 'status') {
if (empty($host)) {
$host = '127.0.0.1';
}
if (empty($port)) {
exit("查看swoole-task服务必须指定port(host不指定默认使用127.0.0.1)" . PHP_EOL);
}
servStatus($host, $port);
}
//查看swoole-task服务进程列表
if ($cmd == 'list') {
servList();
}


SwooleServer.php

<?php

/**
* Swoole服务端
*/
class SwooleServer {

private $_serv = null;
private $_setting = array();

public function __construct($host = '0.0.0.0', $port = 9501) {
$this->_setting = array(
'host' => $host,
'port' => $port,
'env' => 'dev', //环境 dev|test|prod
'process_name' => SWOOLE_TASK_NAME_PRE,  //swoole 进程名称
'worker_num' => 4, //一般设置为服务器CPU数的1-4倍
'task_worker_num' => 4,  //task进程的数量
'task_ipc_mode' => 3,  //使用消息队列通信,并设置为争抢模式
'task_max_request' => 10000,  //task进程的最大任务数
'daemonize' => 1, //以守护进程执行
'max_request' => 10000,
'dispatch_mode' => 2,
'log_file' => SWOOLE_PATH . DIRECTORY_SEPARATOR . 'App' . DIRECTORY_SEPARATOR . 'Runtime' . DIRECTORY_SEPARATOR . 'Logs' . DIRECTORY_SEPARATOR . 'Swoole' . date('Ymd') . '.log',  //日志
);
}

/**
* 运行swoole服务
*/
public function run() {
$this->_serv = new \swoole_server($this->_setting['host'], $this->_setting['port']);
$this->_serv->set(array(
'worker_num' => $this->_setting['worker_num'],
'task_worker_num' => $this->_setting['task_worker_num'],
'task_ipc_mode ' => $this->_setting['task_ipc_mode'],
'task_max_request' => $this->_setting['task_max_request'],
'daemonize' => $this->_setting['daemonize'],
'max_request' => $this->_setting['max_request'],
'dispatch_mode' => $this->_setting['dispatch_mode'],
'log_file' => $this->_setting['log_file']
));
$this->_serv->on('Start', array($this, 'onStart'));
$this->_serv->on('Connect', array($this, 'onConnect'));
$this->_serv->on('WorkerStart', array($this, 'onWorkerStart'));
$this->_serv->on('ManagerStart', array($this, 'onManagerStart'));
$this->_serv->on('WorkerStop', array($this, 'onWorkerStop'));
$this->_serv->on('Receive', array($this, 'onReceive'));
$this->_serv->on('Task', array($this, 'onTask'));
$this->_serv->on('Finish', array($this, 'onFinish'));
$this->_serv->on('Shutdown', array($this, 'onShutdown'));
$this->_serv->on('Close', array($this, 'onClose'));
$this->_serv->start();
}

/**
* 设置swoole进程名称
* @param string $name swoole进程名称
*/
private function setProcessName($name) {
if (function_exists('cli_set_process_title')) {
cli_set_process_title($name);
} else {
if (function_exists('swoole_set_process_name')) {
swoole_set_process_name($name);
} else {
trigger_error(__METHOD__ . " failed. require cli_set_process_title or swoole_set_process_name.");
}
}
}

/**
* Server启动在主进程的主线程回调此函数
* @param $serv
*/
public function onStart($serv) {
if (!$this->_setting['daemonize']) {
echo 'Date:' . date('Y-m-d H:i:s') . "\t swoole_server master worker start\n";
}
$this->setProcessName($this->_setting['process_name'] . '-master');
//记录进程id,脚本实现自动重启
$pid = "{$serv->master_pid}\n{$serv->manager_pid}";
file_put_contents(SWOOLE_TASK_PID_PATH, $pid);
}

/**
* worker start 加载业务脚本常驻内存
* @param $server
* @param $workerId
*/
public function onWorkerStart($serv, $workerId) {
if ($workerId >= $this->_setting['worker_num']) {
$this->setProcessName($this->_setting['process_name'] . '-task');
} else {
$this->setProcessName($this->_setting['process_name'] . '-event');
}
// 开启调试模式 建议开发阶段开启 部署阶段注释或者设为false
define('APP_DEBUG', True);
// 定义应用目录
define('APP_PATH', SWOOLE_PATH . DIRECTORY_SEPARATOR . 'Application' . DIRECTORY_SEPARATOR);
// 定义应用模式
define('APP_MODE', 'cli');
// 引入ThinkPHP入口文件
require SWOOLE_PATH . DIRECTORY_SEPARATOR . 'ThinkPHP' . DIRECTORY_SEPARATOR . 'ThinkPHP.php';
}

/**
* 监听连接进入事件
* @param $serv
* @param $fd
*/
public function onConnect($serv, $fd) {
if (!$this->_setting['daemonize']) {
echo 'Date:' . date('Y-m-d H:i:s') . "\t swoole_server connect[" . $fd . "]\n";
}
}

/**
* worker 进程停止
* @param $server
* @param $workerId
*/
public function onWorkerStop($serv, $workerId) {
if (!$this->_setting['daemonize']) {
echo 'Date:' . date('Y-m-d H:i:s') . "\t swoole_server[{$serv->setting['process_name']}  worker:{$workerId} shutdown\n";
}
}

/**
* 当管理进程启动时调用
* @param $serv
*/
public function onManagerStart($serv) {
if (!$this->_setting['daemonize']) {
echo 'Date:' . date('Y-m-d H:i:s') . "\t swoole_server manager worker start\n";
}
$this->setProcessName($this->_setting['process_name'] . '-manager');
}

/**
* 此事件在Server结束时发生
*/
public function onShutdown($serv) {
if (file_exists(SWOOLE_TASK_PID_PATH)) {
unlink(SWOOLE_TASK_PID_PATH);
}
if (!$this->_setting['daemonize']) {
echo 'Date:' . date('Y-m-d H:i:s') . "\t swoole_server shutdown\n";
}
}

/**
* 监听数据发送事件
* @param $serv
* @param $fd
* @param $from_id
* @param $data
*/
public function onReceive($serv, $fd, $from_id, $data) {
if (!$this->_setting['daemonize']) {
echo "Get Message From Client {$fd}:{$data}\n\n";
}
$result = json_decode($data, true);
switch ($result['action']) {
case 'reload':  //重启
$serv->reload();
break;
case 'close':  //关闭
$serv->shutdown();
break;
case 'status':  //状态
$serv->send($fd, json_encode($serv->stats()));
break;
default:
$serv->task($data);
break;
}
}

/**
* 监听连接Task事件
* @param $serv
* @param $task_id
* @param $from_id
* @param $data
*/
public function onTask($serv, $task_id, $from_id, $data) {
$result = json_decode($data, true);
//用TP处理各种逻辑
$serv->finish($data);
}

/**
* 监听连接Finish事件
* @param $serv
* @param $task_id
* @param $data
*/
public function onFinish($serv, $task_id, $data) {
if (!$this->_setting['daemonize']) {
echo "Task {$task_id} finish\n\n";
echo "Result: {$data}\n\n";
}
}

/**
* 监听连接关闭事件
* @param $serv
* @param $fd
*/
public function onClose($serv, $fd) {
if (!$this->_setting['daemonize']) {
echo 'Date:' . date('Y-m-d H:i:s') . "\t swoole_server close[" . $fd . "]\n";
}
}

}


相关命令:

1、服务启动
#启动服务,不指定绑定端口和ip,则使用默认配置
php swoole.php start
#启动服务 指定ip 和 port
php swoole.php -h127.0.0.1 -p9501 start
#启动服务 守护进程模式
php swoole.php -h127.0.0.1 -p9501 -d start
#启动服务 非守护进程模式
php swoole.php -h127.0.0.1 -p9501 -D start
#启动服务 指定进程名称(显示进程名为 swooleServ-9510-[master|manager|event|task]
php swoole.php -h127.0.0.1 -p9501 -n 9501 start

2、强制服务停止
php swoole.php stop
php swoole.php -p9501 stop
php swoole.php -h127.0.0.1 -p9501 stop

3、关闭服务
php swoole.php close
php swoole.php -p9501 close
php swoole.php -h127.0.0.1 -p9501 close

4、强制服务重启
php swoole.php restart
php swoole.php -p9501 restart
php swoole.php -h127.0.0.1 -p9501 restart

5、平滑服务重启
php swoole.php reload
php swoole.php -p9501 reload
php swoole.php -h127.0.0.1 -p9501 reload

6、服务状态
php swoole.php status
php swoole.php -h127.0.0.1 -p9501 status

7、swoole-task所有启动实例进程列表(一台服务器swoole-task可以有多个端口绑定的实例)
php swoole.php list

Swoole-ThinkPHP.zip ( 1.14 MB )
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: