みなさんこんにちは!
Habrには、PHPでデーモンを作成する方法やその他の分岐したものに関する記事がたくさんありました。 同様の、しかしやや異なるトピックでの私の開発を共有したいと思います- 複数のPHPプロセスを管理します 。
まず、記事で使用されている用語の小さな用語集 。
- ジョブは、別のプロセスで実行されるタスクです。 コンソールに「php test.php」と入力します-これが仕事です。
- Job ManagerまたはProcess Managerは、タスクを管理するプロセスです。 出力を収集して処理し、入力にメッセージを送信できます。
タスクの目標は 、すでに実行中の作業プロセスに影響を与え、その実装の進捗に関する情報を取得できるようにすることです。
新しいプロセスを開始するには、 proc_open関数を使用します。これにより、新しいプロセスのI / O記述子を再定義できます。 単一のプロセスを管理するために、Jobクラスが開発されました。 作業は、名前と実行されたチームによって特徴付けられます。
class Job { protected $_pid = 0; protected $_name; protected $_cmd = ''; protected $_stderr = '/dev/null'; private $_resource = NULL; private $_pipes = array(); private $_waitpid = TRUE; public function __construct($cmd, $name = 'job') { $this->_cmd = $cmd; $this->_name = $name; } public function __destruct() { // if ($this->_resource) { if ($this->_waitpid && $this->isRunning()) { echo "Waiting for job to complete "; $status = NULL; pcntl_waitpid($this->_pid, $status); /*while ($this->isRunning()) { echo '.'; sleep(1); }*/ echo "\n"; } } // if (isset($this->_pipes) && is_array($this->_pipes)) { foreach (array_keys($this->_pipes) as $index ) { if (is_resource($this->_pipes[$index])) { fflush($this->_pipes[$index]); fclose($this->_pipes[$index]); unset($this->_pipes[$index]); } } } // if ($this->_resource) { proc_close($this->_resource); unset($this->_resource); } } public function pid() { return $this->_pid; } public function name() { return $this->_name; } // "". $nohup private function readPipe($index, $nohup = FALSE) { if (!isset($this->_pipes[$index])) return FALSE; if (!is_resource($this->_pipes[$index]) || feof($this->_pipes[$index])) return FALSE; if ($nohup) { $data = ''; while ($line = fgets($this->_pipes[$index])) { $data .= $line; } return $data; } while ($data = fgets($this->_pipes[$index])) { echo $data; } } public function pipeline($nohup = FALSE) { return $this->readPipe(1, $nohup); } public function stderr($nohup = FALSE) { return $this->readPipe(2, $nohup); } // public function execute() { // $descriptorspec = array( 0 => array('pipe', 'r'), // stdin 1 => array('pipe', 'w'), // stdout 2 => array('pipe', 'w') // stderr ); $this->_resource = proc_open('exec '.$this->_cmd, $descriptorspec, $this->_pipes); // stream_set_blocking($this->_pipes[0], 0); stream_set_blocking($this->_pipes[1], 0); stream_set_blocking($this->_pipes[2], 0); if (!is_resource($this->_resource)) return FALSE; $proc_status = proc_get_status($this->_resource); $this->_pid = isset($proc_status['pid']) ? $proc_status['pid'] : 0; } public function getPipe() { return $this->_pipes[1]; } public function getStderr() { return $this->_pipes[2]; } public function isRunning() { if (!is_resource($this->_resource)) return FALSE; $proc_status = proc_get_status($this->_resource); return isset($proc_status['running']) && $proc_status['running']; } // public function signal($sig) { if (!$this->isRunning()) return FALSE; posix_kill($this->_pid, $sig); } // STDIN public function message($msg) { if (!$this->isRunning()) return FALSE; fwrite($this->_pipes[0], $msg); } }
ジョブを管理するために、Job_Managerクラスが作成されます。これは、基本的にスキーム全体のキーです。
class Job_Manager { private $_pool_size = 20; private $_pool = array(); private $_streams = array(); private $_stderr = array(); private $_is_terminated = FALSE; protected $_dispatch_function = NULL; public function __construct() { // init pool // } public function __destruct() { // destroy pool foreach (array_keys($this->_pool) as $index) { $this->stopJob($index); } } // private function checkJobs() { $running_jobs = 0; foreach ($this->_pool as $index => $job) { if (!$job->isRunning()) { echo "Stopping job ".$this->_pool[$index]->name()." ($index)" . PHP_EOL; $this->stopJob($index); } else { $running_jobs++; } } return $running_jobs; } private function getFreeIndex() { foreach ($this->_pool as $index => $job) { if (!isset($job)) return $index; } return count($this->_pool) < $this->_pool_size ? count($this->_pool) : -1; } // public function startJob($cmd, $name = 'job') { // broadcast existing jobs $this->checkJobs(); $free_pool_slots = $this->_pool_size - count($this->_pool); if ($free_pool_slots <= 0) { // output error "no free slots in the pool" return -1; } $free_slot_index = $this->getFreeIndex(); if ($free_slot_index < 0) { return -1; } echo "Starting job $name ($free_slot_index)" . PHP_EOL; $this->_pool[$free_slot_index] = new Job($cmd, $name); $this->_pool[$free_slot_index]->execute(); $this->_streams[$free_slot_index] = $this->_pool[$free_slot_index]->getPipe(); $this->_stderr[$free_slot_index] = $this->_pool[$free_slot_index]->getStderr(); return $free_slot_index; } public function stopJob($index) { if (!isset($this->_pool[$index])) return FALSE; unset($this->_streams[$index]); unset($this->_stderr[$index]); unset($this->_pool[$index]); } public function name($index) { if (!isset($this->_pool[$index])) return FALSE; return $this->_pool[$index]->name(); } public function pipeline($index, $nohup = FALSE) { if (!isset($this->_pool[$index])) return FALSE; return $this->_pool[$index]->pipeline($nohup); } public function stderr($index, $nohup = FALSE) { if (!isset($this->_pool[$index])) return FALSE; return $this->_pool[$index]->stderr($nohup); } private function broadcastMessage($msg) { // sends selected signal to all child processes foreach ($this->_pool as $pool_index => $job) { $job->message($msg); } } private function broadcastSignal($sig) { // sends selected signal to all child processes foreach ($this->_pool as $pool_index => $job) { $job->signal($sig); } } // - protected function dispatch($cmd) { if (is_callable($this->_dispatch_function)) { call_user_func($this->_dispatch_function, $cmd); } } // public function registerDispatch($callable) { if (is_callable($callable)) { $this->_dispatch_function = $callable; } else { trigger_error("$callable is not callable func", E_USER_WARNING); } } // private function dispatchMain($cmd) { $parts = explode(' ', $cmd); $arg = isset($parts[0]) ? $parts[0] : ''; $val = isset($parts[1]) ? $parts[1] : ''; switch ($arg) { case "exit": $this->broadcastSignal(SIGTERM); $this->_is_terminated = TRUE; break; case "test": echo 'sending test' . PHP_EOL; $this->broadcastMessage('test'); $this->broadcastSignal(SIGUSR1); break; case 'kill': $pool_index = $val !== '' && (int)$val >= 0 ? (int)$val : -1; if ($pool_index >= 0 && isset($this->_pool[$pool_index])) { $this->_pool[$pool_index]->signal(SIGKILL); } break; default: $this->dispatch($cmd); break; } return FALSE; } public function process() { stream_set_blocking(STDIN, 0); $write = NULL; $except = NULL; while (!$this->_is_terminated) { /* - stream_select */ $read = $this->_streams; $except = $this->_stderr; $read[$this->_pool_size] = STDIN; if (is_array($read) && count($read) > 0) { if (false === ($num_changed_streams = stream_select($read, $write, $except, 2))) { // oops } elseif ($num_changed_streams > 0) { // if (is_array($read) && count($read) > 0) { $cmp_array = $this->_streams; $cmp_array[$this->_pool_size] = STDIN; foreach ($read as $resource) { $pool_index = array_search($resource, $cmp_array, TRUE); if ($pool_index === FALSE) continue; if ($pool_index == $this->_pool_size) { // stdin $content = ''; while ($cmd = fgets(STDIN)) { if (!$cmd) break; $content .= $cmd; } $content = trim($content); if ($content) { // Process Manager - - $this->dispatchMain($content); } //echo "stdin> " . $cmd; } else { // $pool_content = $this->pipeline($pool_index, TRUE); $job_name = $this->name($pool_index); if ($pool_content) { echo $job_name ." ($pool_index)" . ': ' . $pool_content; } $pool_content = $this->stderr($pool_index, TRUE); if ($pool_content) { echo $job_name ." ($pool_index)" . ' [STDERR]: ' . $pool_content; } } } } } } $this->checkJobs(); } } }
いくつかの抽象タスクを管理する方法を既に学習しましたが、実行可能プロセス自体のクラスを実装することは残っています。
class Executable { protected $_is_terminated = FALSE; protected $_cleanup_function = NULL; public function __construct() { // pcntl_signal(SIGTERM, array('Executable', 'signalHandler')); pcntl_signal(SIGHUP, array('Executable', 'signalHandler')); pcntl_signal(SIGINT, array('Executable', 'signalHandler')); pcntl_signal(SIGUSR1, array('Executable', 'signalHandler')); pcntl_signal(SIGUSR2, array('Executable', 'signalHandler')); stream_set_blocking(STDIN, 0); stream_set_blocking(STDOUT, 0); stream_set_blocking(STDERR, 0); } public function __destruct() { //echo "destructor called in " . get_class($this) . PHP_EOL; if (!$this->_is_terminated) { $this->_is_terminated = TRUE; $this->isTerminated(); } } // - private function cleanup() { if (is_callable($this->_cleanup_function)) { call_user_func($this->_cleanup_function); } } protected function registerCleanup($callable) { if (is_callable($callable)) { $this->_cleanup_function = $callable; } else { trigger_error("$callable is not callable func", E_USER_WARNING); } } protected function isTerminated() { pcntl_signal_dispatch(); if ($this->_is_terminated) { $this->cleanup(); } return $this->_is_terminated; } protected function dispatch($cmd) { // /* switch ($cmd) { } */ } protected function checkStdin() { $read = array(STDIN); $write = NULL; $except = NULL; if (is_array($read) && count($read) > 0) { if (false === ($num_changed_streams = stream_select($read, $write, $except, 2))) { // oops } elseif ($num_changed_streams > 0) { if (is_array($read) && count($read) > 0) { // stdin $content = ''; while ($cmd = fgets(STDIN)) { if (!$cmd) break; $content .= $cmd; } $this->dispatch($content); echo "recieved $content"; //echo "stdin> " . $cmd; } } } } // protected function signalHandler ($signo) { switch ($signo) { case SIGTERM: case SIGHUP: case SIGINT: $this->_is_terminated = TRUE; //echo "exiting in ".get_class($this)."...\n"; break; case SIGUSR1: //echo "SIGUSR1 recieved\n"; $this->checkStdin(); break; case SIGUSR2: $this->_is_terminated = TRUE; echo "[SHUTDOWN] in " . get_class($this) . PHP_EOL; flush(); exit(1); break; default: // handle all other signals break; } } }
プロセスマネージャの使用例として、「スリープ」プロセスを実装します。これは、STDOUTでスリープし、サブスクライブ解除するスクリプトです。
sleep.php
class SleeperTest extends Executable { public function sleep() { for($i = 0; !$this->isTerminated() && $i < 10; $i++) { ob_start(); echo $i . "\n"; ob_end_flush(); sleep(5); } } } $s = new SleeperTest; $s->sleep();
pm.php
$pm = new Job_Manager; $pm->startJob('php sleep.php', 'sleeper1'); $pm->startJob('php sleep.php', 'sleeper2'); // $pm->process();
実装で使用されるノンブロッキング記述子とstream_select関数を使用すると、あらゆる種類のデーモンに典型的な問題(アイドルサイクルでのCPU使用率が高くなる)を回避できます。 提案された方法にはこの欠点がなく、すべてがスムーズかつ穏やかに機能します。
更新 クラスソースをgithub https://github.com/xzag/php-pmに投稿しました