您的位置:首页 > 编程语言 > PHP开发

php多并发任务执行演示

2012-08-02 12:27 239 查看
<?php
/**
* 多任务并发执行演示.
*/

//最大并发数,用来保护服务器不被累死
define('MAX_CONCURRENCY', 50);

/**
* 多任务并发类
*/
class MultiTask
{
/**
* 任务池
* @var array $pool
*/
private $pool = array();

/**
* 并发数
* @var int $concurrency
*/
private $concurrency = 1;

/**
* 设置并发数
* @param int $concurrency
*/
public function setConcurrency($concurrency = 1)
{
if($concurrency>=1 && $concurrency<=MAX_CONCURRENCY)
{
$this->concurrency = $concurrency;
}
}

/**
* 添加一个新任务. 如果任务池满了,就先消化一个任务池内的任务.
* @param Task $new_task
* @return bool
*/
public function addTask(Task $new_task)
{
$pool = &$this->pool;
$status = $new_task->status();
if($this->isFull())
{
$this->doWork();
}
$pool[] = $new_task;
//echo "\033[1;31m+++task:{$status['command']}\033[m\r\n";
return true;
}

/**
* 消化一个任务池内的任务,本方法是本程序的核心所在
* @return bool
*/
private function doWork()
{
$pool = &$this->pool;
while($this->notEmpty())
{
foreach($pool as $tid=>$task)
{
$status = $task->status();
if($task->isRunning())
{
if($task->isTimeout())
{
$task->terminate();
echo "\033[1;35m***task:$tid:{$status['command']} timeout, force closed!\033[m\r\n";
unset($pool[$tid]);
return false;
}
//echo "checking task:$tid:{$status['command']}:".intval($status['excute_time'])."\r\n";
}
else
{
$task->close();
echo "\033[1;32m---task:$tid:{$status['command']} finished!\033[m\r\n";
unset($pool[$tid]);
return true;
}

}
usleep(1000);
}
return true;
}

/**
* 判断任务池满了
* @return bool
*/

private function isFull()
{
return count($this->pool) >= $this->concurrency;
}

/**
* 判断任务池非空
* @return bool
*/
private function notEmpty()
{
return !empty($this->pool);
}

/**
* 全部任务添加后的完成阶段
*/
public function finish()
{
//echo "\033[1;31mWaiting all tasks to be finished!\033[m\r\n";
while($this->notEmpty())
{
$this->doWork();
usleep(1000);
}
//echo "\033[1;31mAll tasks have been done!\033[m\r\n";
}
public function __destruct()
{
$this->finish();
}
}
/*
* 任务类
*/
class Task
{
/**
* 任务句柄
* @var resource $handle
*/
private $handle;

/**
* 任务开始时间
* @var time $start_time
*/
private $start_time;

/**
* 命令管道,包括 0=>输入  1=>输出  2=>错误
* @var array $pipes
*/
private $pipes = array();

/**
* 任务状态
* @var array $status
*/
private $status = array();

/**
* 指定任务运行所需的环境变量
* @var array $env
*/
private $env = array();

/**
* 指定任务运行所需的当前路径
* @var string $cwd
*/
private $cwd = '';

/**
* 超时时间,单位为秒,默认为0,值为0时,永不超时.
* @var int $timeout
*/
private $timeout = 0;

/**
* 错误日志的路径
* @var string $error_log
*/
private $error_log = './error.log';

/**
* 用构造函数来调用命令
* @param string $cmd
* @param array $param
*/
public function  __construct($cmd, $param=array())
{
$this->cwd = getcwd();
//只接受合法的参数
foreach(array(
'env',
'cwd',
'timeout',
'error_log',
) as $validparam)
{
if(isset($param[$validparam]))
{
$this->$validparam = $param[$validparam];

}
}
$desc = array(
0=>array('pipe','r'),
1=>array('pipe','w'),
2=>array('file', $this->error_log, 'a')
);
//执行命令
$this->handle = proc_open($cmd, $desc, $this->pipes, $this->cwd, $this->env);
//把输出设成非阻塞
stream_set_blocking($this->pipes[1], 0);
$this->start_time = microtime(true);
}
/**
* 任务超时的判断,调用本方法前应先调用 $task->status()方法
* @return bool
*/
public function isTimeout()
{
return $this->timeout
? $this->status['excute_time'] >= $this->timeout
: false;
}

/**
* 判断任务是否在执行,调用本方法前应先调用 $task->status()方法
* @return bool
*/
public function isRunning()
{
return $this->status['running'];
}

/**
* 正常结束任务
*/
public function close()
{
if(is_resource($this->handle))
{
//stream_set_blocking($this->pipes[1],1);
fpassthru($this->pipes[1]);
proc_close($this->handle);
}
}

/**
* 强行终止超时的任务
*/
public function terminate()
{
if(is_resource($this->handle))
{
proc_terminate($this->handle);
proc_close($this->handle);
}
}

/**
* 获取任务状态
* @return array
*/
public function status()
{
$status = & $this->status;
//获取进程句柄的状态
$status = proc_get_status($this->handle);
$status['start_time'] = $this->start_time;
$status['excute_time'] = microtime(true) - $this->start_time;
return $status;
}
}

$m = new MultiTask();
$m->setConcurrency(10);

$cmd = "cmd1";
$m->addTask(new Task($cmd));

$cmd = "cmd2";
$m->addTask(new Task($cmd));
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息