您的位置:首页 > 编程语言 > PHP开发

[thrift]PHP版多进程服务器

2015-08-21 17:51 537 查看
<?php
/**
 * 多进程形式的server.
 * @package thrift.server
 * @author flynetcn
 */
namespace Thrift\Server;

use Thrift\Server\TServer;
use Thrift\Transport\TTransport;
use Thrift\Exception\TException;
use Thrift\Exception\TTransportException;

/**
 * Pre-forking Thrift server.
 *
 * A manager process opens the listening transport, forks a fixed number of
 * worker processes and re-forks a worker whenever one exits. Every worker
 * accepts connections on the shared listening transport, serves all calls of
 * one connection, and exits after it has accepted maxWorkRequestNum
 * connections.
 */
class TMultiProcessServer extends TServer
{
	/**
	 * Number of the last quit-type signal caught by sig_handler();
	 * 0 while no such signal has been received.
	 */
	public static $catchQuitSignal = 0;

	/**
	 * Number of worker processes that still have to be forked.
	 * Starts as the configured worker count, is decremented per successful
	 * fork and incremented again when an exited worker is collected.
	 */
	private $workProcessNum = 4;

	/**
	 * Maximum number of accepted connections a worker handles before it exits.
	 */
	private $maxWorkRequestNum = 2000;

	/**
	 * Number of connections the current worker has accepted so far.
	 */
	private $currentWorkRequestNum = 0;

	/**
	 * Number of calls processed on the connection currently being served.
	 */
	private $currentConnectCallNum = 0;

	/**
	 * Send timeout in seconds (converted to milliseconds for the transport).
	 */
	private $sendTimeoutSec = 1;

	/**
	 * Receive timeout in seconds (converted to milliseconds for the transport).
	 */
	private $recvTimeoutSec = 1;

	/**
	 * Pid of the current process.
	 */
	private $pid = 0;

	/**
	 * Flag that ends the main serving loop (manager and worker).
	 */
	private $stop_ = false;

	/**
	 * Live workers of the manager process: pid => worker number.
	 */
	protected $childrens = array();

	/**
	 * Log files indexed by log type (the code in this class uses 0 for
	 * informational and 1 for error messages). Types without an entry are
	 * written to stdout.
	 */
	protected static $logFiles;

	/**
	 * Path of the pid file; empty/null when none is configured.
	 */
	protected static $pidFile;

	/**
	 * Runs the server: applies $config, optionally daemonizes, then
	 * supervises the worker processes until a quit signal is caught.
	 *
	 * Recognised $config keys: workProcessNum, maxWorkRequestNum,
	 * sendTimeoutSec, recvTimeoutSec and - only honoured when $daemon is
	 * true - logFiles (array) and pidFile (string).
	 *
	 * @param bool  $daemon detach from the terminal and install signal handlers
	 * @param array $config see above
	 * @return void
	 * @throws TException when workProcessNum or maxWorkRequestNum is below 1
	 */
	public function serve($daemon=false, array $config=array())
	{
		$this->applyConfig($config);
		if ($daemon) {
			$this->daemon();
			$this->registerSignalHandler();
			self::$logFiles = isset($config['logFiles']) && is_array($config['logFiles']) ? $config['logFiles'] : array();
			self::$pidFile = isset($config['pidFile']) ? $config['pidFile'] : '';
			// Compile-time directive: the code following this line in the
			// file is tick-instrumented so pending signals get dispatched.
			declare(ticks=3);
		}
		$this->pid = posix_getpid();
		self::createPidFile($this->pid);
		self::log(0, "manage process({$this->pid}) has started");
		$this->transport_->listen();
		while (!$this->stop_) {
			$this->spawnWorkers();
			$this->collectChildren();
			sleep(2);
			if (self::$catchQuitSignal) {
				$this->stop();
			}
		}
	}

	/**
	 * Copies the numeric options from $config into the instance and
	 * validates the two counters.
	 *
	 * @param array $config
	 * @return void
	 * @throws TException when workProcessNum or maxWorkRequestNum is below 1
	 */
	private function applyConfig(array $config)
	{
		if (isset($config['workProcessNum'])) {
			$this->workProcessNum = intval($config['workProcessNum']);
		}
		if ($this->workProcessNum < 1) {
			self::log(1, "child workProcessNum can not be less than 1");
			throw new TException('child workProcessNum can not be less than 1');
		}
		if (isset($config['maxWorkRequestNum'])) {
			$this->maxWorkRequestNum = intval($config['maxWorkRequestNum']);
		}
		if ($this->maxWorkRequestNum < 1) {
			self::log(1, "child maxWorkRequestNum can not be less than 1");
			throw new TException('child maxWorkRequestNum can not be less than 1');
		}
		if (isset($config['sendTimeoutSec'])) {
			$this->sendTimeoutSec = intval($config['sendTimeoutSec']);
		}
		if (isset($config['recvTimeoutSec'])) {
			$this->recvTimeoutSec = intval($config['recvTimeoutSec']);
		}
	}

	/**
	 * Forks workers until none is outstanding.
	 *
	 * A failed fork is logged and ends this round; the outstanding counter is
	 * left untouched so the caller's loop retries after its sleep instead of
	 * spinning or taking the manager (and its supervision of the already
	 * running workers) down.
	 *
	 * @return void
	 */
	private function spawnWorkers()
	{
		while ($this->workProcessNum > 0) {
			$pid = pcntl_fork();
			if ($pid > 0) {
				$this->handleParent($pid, $this->workProcessNum);
			} else if ($pid === 0) {
				$this->pid = posix_getpid();
				// Never returns: handleChild() ends with exit().
				$this->handleChild($this->workProcessNum);
			} else {
				self::log(1, "Failed to fork");
				break;
			}
			$this->workProcessNum--;
		}
	}

	/**
	 * @return int connections accepted so far by the current worker
	 */
	public function getCurrentWorkRequestNum()
	{
		return $this->currentWorkRequestNum;
	}

	/**
	 * @return int calls processed so far on the current connection
	 */
	public function getCurrentConnectCallNum()
	{
		return $this->currentConnectCallNum;
	}

	/**
	 * Code run by the parent after a successful fork: records the worker.
	 *
	 * @param int $pid pid of the new worker
	 * @param int $num worker number
	 * @return void
	 */
	private function handleParent($pid, $num)
	{
		$this->childrens[$pid] = $num;
	}

	/**
	 * Code run by the child: accept/serve loop. Terminates the process
	 * with exit(0) when the loop ends, so this method never returns.
	 *
	 * @param int $num worker number (currently unused)
	 * @return void
	 */
	private function handleChild($num)
	{
		self::log(0, "child process($this->pid) has started");
		// The worker has no children of its own; this also keeps stop()
		// from signalling its siblings.
		$this->childrens = array();
		while (!$this->stop_) {
			$transport = null;
			try {
				$transport = $this->transport_->accept();
				if ($transport != null) {
					$transport->setSendTimeout($this->sendTimeoutSec * 1000);
					$transport->setRecvTimeout($this->recvTimeoutSec * 1000);
					$this->currentWorkRequestNum++;
					$this->currentConnectCallNum = 0;
					$inputTransport = $this->inputTransportFactory_->getTransport($transport);
					$outputTransport = $this->outputTransportFactory_->getTransport($transport);
					$inputProtocol = $this->inputProtocolFactory_->getProtocol($inputTransport);
					$outputProtocol = $this->outputProtocolFactory_->getProtocol($outputTransport);
					while ($this->processor_->process($inputProtocol, $outputProtocol)) {
						$this->currentConnectCallNum++;
					}
				}
			} catch (TTransportException $e) {
				// Transport-level failures are deliberately not logged.
			} catch (\Exception $e) {
				// Fully qualified on purpose: an unqualified "Exception" would
				// resolve to Thrift\Server\Exception and never match.
				self::log(1, $e->getMessage().'('.$e->getCode().')');
			}
			// Close the connection on every path, including when process()
			// threw, so the socket is not left open.
			if ($transport != null) {
				try {
					@$transport->close();
				} catch (\Exception $e) {
					// Nothing useful can be done about a failing close.
				}
			}
			if (self::$catchQuitSignal) {
				$this->stop();
			}
			if ($this->currentWorkRequestNum >= $this->maxWorkRequestNum) {
				self::log(0, "child process($this->pid) has processe {$this->currentWorkRequestNum} requests will be exit");
				$this->stop();
				break;
			}
		}
		exit(0);
	}

	/**
	 * Reaps exited workers without blocking and schedules one replacement
	 * fork for each of them.
	 *
	 * @return void
	 */
	private function collectChildren()
	{
		foreach ($this->childrens as $pid => $num) {
			if (pcntl_waitpid($pid, $status, WNOHANG) > 0) {
				unset($this->childrens[$pid]);
				$this->workProcessNum++;
			}
		}
	}

	/**
	 * Closes the listening transport, ends the serving loop and sends
	 * SIGTERM to every recorded worker.
	 *
	 * @return void
	 */
	public function stop()
	{
		$this->transport_->close();
		$this->stop_ = true;
		foreach ($this->childrens as $pid => $num) {
			if (!posix_kill($pid, SIGTERM)) {
				self::log(1, "posix_kill($pid, SIGTERM) failed: ".posix_strerror(posix_get_last_error()));
			}
		}
	}

	/**
	 * Signal handler: remembers the quit-type signal so the serving loops
	 * can shut down at their next check. Other signals are ignored.
	 *
	 * @param int $signo
	 * @return void
	 */
	public static function sig_handler($signo)
	{
		switch ($signo) {
			case SIGTERM:
			case SIGHUP:
			case SIGQUIT:
			case SIGTSTP:
				$pid = posix_getpid();
				self::log(0, "process($pid) catch signo: $signo");
				self::$catchQuitSignal = $signo;
				break;
			default:
		}
	}

	/**
	 * Installs sig_handler() for SIGTERM, SIGHUP, SIGQUIT and SIGTSTP.
	 *
	 * @return void
	 */
	private function registerSignalHandler()
	{
		$handler = '\Thrift\Server\TMultiProcessServer::sig_handler';
		foreach (array(SIGTERM, SIGHUP, SIGQUIT, SIGTSTP) as $signo) {
			pcntl_signal($signo, $handler);
		}
		declare(ticks=3);
	}

	/**
	 * Detaches the process: fork, become session leader, fork again.
	 * Does nothing when posix_setsid() is unavailable.
	 *
	 * @return void
	 */
	private function daemon()
	{
		if (!function_exists('posix_setsid')) {
			return;
		}
		$this->forkAndExitParent();
		if (posix_setsid() < 0) {
			self::log(1, "posix_setsid failed");
		}
		$this->forkAndExitParent();
	}

	/**
	 * Forks once; the parent exits with status 0 and only the child returns.
	 * A failed fork is logged and ends the process with status 1 instead of
	 * silently exiting as if daemonizing had succeeded.
	 *
	 * @return void
	 */
	private function forkAndExitParent()
	{
		$pid = pcntl_fork();
		if ($pid < 0) {
			self::log(1, "Failed to fork");
			exit(1);
		}
		if ($pid > 0) {
			exit(0);
		}
	}

	/**
	 * Writes one timestamped line to the log file configured for $type, or
	 * to stdout when none is configured or the file cannot be opened.
	 *
	 * The stream is cached per type and reopened when the configured target
	 * changes or the log file has disappeared (e.g. was rotated away).
	 *
	 * @param int    $type log type, used as index into the logFiles config
	 * @param string $msg
	 * @return bool always true
	 */
	public static function log($type, $msg)
	{
		static $fds = array();     // type => open stream
		static $targets = array(); // type => path the stream was opened for ('' = stdout)

		$line = date('Y-m-d H:i:s')." $type {$msg}\n";
		$file = (isset(self::$logFiles[$type]) && self::$logFiles[$type]) ? self::$logFiles[$type] : '';

		$stale = empty($fds[$type])
			|| $targets[$type] !== $file
			|| ($file !== '' && !file_exists($file));
		if ($stale) {
			if (!empty($fds[$type])) {
				fclose($fds[$type]);
			}
			$fds[$type] = self::openLogStream($file);
			$targets[$type] = $file;
		}
		if (!$fds[$type]) {
			return true;
		}
		$ret = fwrite($fds[$type], $line);
		if (!$ret && $file !== '') {
			// The cached handle may have gone bad: reopen the file and retry once.
			fclose($fds[$type]);
			$fds[$type] = fopen($file, 'a');
			if ($fds[$type]) {
				fwrite($fds[$type], $line);
			}
		}
		return true;
	}

	/**
	 * Opens the stream for a log target.
	 *
	 * @param string $file log file path, or '' for stdout
	 * @return resource|bool stream for $file in append mode; a stdout stream
	 *                       (after printing a WARNING line to it) when the
	 *                       file or its directory cannot be created
	 */
	private static function openLogStream($file)
	{
		if ($file === '') {
			return fopen('php://stdout', 'w');
		}
		$dir = dirname($file);
		// The second is_dir() covers another process creating the directory
		// between our check and our mkdir().
		if (!is_dir($dir) && !mkdir($dir, 0755, true) && !is_dir($dir)) {
			$failed = 'mkdir';
		} else {
			$fd = fopen($file, 'a');
			if ($fd) {
				return $fd;
			}
			$failed = 'fopen';
		}
		$fd = fopen('php://stdout', 'w');
		if ($fd) {
			fwrite($fd, date('Y-m-d H:i:s')." WARNING {$failed}(".$file.") failed\n");
		}
		return $fd;
	}

	/**
	 * Writes $pid to the configured pid file, creating its directory when
	 * needed.
	 *
	 * @param int $pid pid to record; 0 means the current process
	 * @return bool true when the pid was written; false when no pid file is
	 *              configured or it could not be written
	 */
	public static function createPidFile($pid=0)
	{
		if (!self::$pidFile) {
			// No pid file configured (e.g. non-daemon mode): nothing to write.
			return false;
		}
		if (!$pid) {
			$pid = posix_getpid();
		}
		$dir = dirname(self::$pidFile);
		if (!file_exists(self::$pidFile) && !is_dir($dir) && !mkdir($dir, 0755, true)) {
			self::log(1, "mkdir(".self::$pidFile.") failed");
			return false;
		}
		$fd = fopen(self::$pidFile, 'w');
		if (!$fd) {
			self::log(1, "fopen(".self::$pidFile.") failed");
			return false;
		}
		$written = fwrite($fd, "$pid");
		// Close before reporting so a failed write does not leak the handle.
		fclose($fd);
		if (!$written) {
			self::log(1, "fwrite(".self::$pidFile.",$pid) failed");
			return false;
		}
		return  true;
	}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: