Yii2用のシンプルなデーモンシステム

この記事では、PHPのデーモンシステムの実装の主なニュアンスを明らかにし、Yii2コン​​ソールコマンドを悪魔化するように教えます。



過去3年間、私は1つのグループの企業向けに十分な規模の企業ポータルを開発および開発してきました。 私は、多くの人と同じように、ビジネスが必要とするタスクを解決するときにタイムアウトが収まらないという問題に遭遇しました。 30万行のExcelでレポートを作成し、1500文字のメーリングリストを送信します。 当然、そのようなタスクはバックグラウンドタスク、デーモン、およびcrontabで解決する必要があります。 記事の枠組みでは、王冠と悪魔の比較は行いません。このような問題を解決するために悪魔を選びました。 同時に、私たちにとって重要な要件は、それぞれバックエンド用にすでに記述されたすべてのものにアクセスできることでした。デーモンはYii2フレームワークの継続である必要があります。 同じ理由で、phpDaemonのような既製のソリューションは私たちに適合しませんでした。



カットの下で、私が得たYii2に悪魔を実装するための既製のソリューション。



PHPのデーモンのテーマは、うらやましい規則性で上昇します(1、2、3、 およびbadooの連中は、接続を失うことなくそれらを再起動します )。 おそらく私の自転車は 、人気のあるフレームワークで悪魔を実行する簡単な方法が役立つでしょう。



いくつかの基本



プロセスが悪魔になるためには、次のものが必要です。

  1. コンソールおよび標準入出力ストリームからスクリプトをアンバインドします。
  2. メインコードの実行を無限ループでラップします。
  3. プロセス制御メカニズムを実装します。


コンソールから取り外します


開始するには、標準ストリームSTDIN、STOUT、STDERRを閉じます。 しかし、PHPを使用しないと、最初のオープンストリーム標準になるため、/ dev / nullで開きます。



if (is_resource(STDIN)) { fclose(STDIN); $stdIn = fopen('/dev/null', 'r'); } if (is_resource(STDOUT)) { fclose(STDOUT); $stdOut = fopen('/dev/null', 'ab'); } if (is_resource(STDERR)) { fclose(STDERR); $stdErr = fopen('/dev/null', 'ab'); }
      
      





次に、プロセスをforkし、forkをメインプロセスにします。 ドナープロセス-完了。

 $pid = pcntl_fork(); if ($pid == -1) { $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error'); } elseif ($pid) { $this->halt(self::EXIT_CODE_NORMAL); } else { posix_setsid(); }
      
      





無限ループと制御


私はすべてがサイクルで理解されていると思います。 ただし、必要な制御メカニズムをより詳細に検討する必要があります。



すでに実行中のプロセスを修正する


ここではすべてが簡単です-デーモンを起動すると、PIDがその名前のファイルに格納され、作業の最後にこのファイルが爆発します。



POSIX信号処理


デーモンは、オペレーティングシステムからの信号を正しく処理する必要があります。 シグナルを受信すると、SIGTERMはスムーズにシャットダウンします。 これはいくつかのことで実現されます。まず、受信した信号を処理する関数を定義します。



 pcntl_signal(SIGTERM, ['MyClassName', 'mySignalHandlerFunction']);
      
      





次に、信号処理関数で、クラスの静的プロパティの割り当てをtrueに設定します。

 static function signalHandler($signo, $pid = null, $status = null) { self::$stopFlag = true; }
      
      





そして第三に、無限ループはそれほど無限ではないはずです。

 while (!self::$stopFlag) { pcntl_signal_dispatch(); }
      
      





PHPの異なるバージョンでの信号処理の機能
PHP <5.3.0では、特別な宣言ディレクティブ(ticks = N)が信号の配信使用されました。 tickは、declareブロック内のパーサーによって実行される低レベルの操作ごとに発生するイベントです。 信号の分配は設定に従って行われました。 値が小さすぎるとパフォーマンスが低下し、値が大きすぎると信号処理がタイミングよく行われません。



PHP> = 5.3.0では、 pcntl_signal_dispatch()関数が登場しました。この関数は、手動でシグナルを配信するために呼び出すことができます。これは各反復後に行います。

そして最後に、PHP 7.1では、シグナルの非同期配信が利用可能になり、オーバーヘッドや手動で関数を呼び出すことなく、ほぼ瞬時にシグナルを受信できるようになります。



これで、オペレーティングシステムからコマンドを受信すると、スクリプトは現在の反復を静かに終了し、ループを終了します。



メモリリーク監視


残念ながら、デーモンが再起動せずに長時間動作すると、メモリが流れ始めます。 リーク率は、使用する機能によって異なります。 私たちの実践から、最もひどく流れていたのは、標準のSoapClientクラスを介してリモートSOAPサービスと連携するデーモンでした。 したがって、これを監視し、定期的に再起動する必要があります。 サイクルを漏れ制御条件で補完します。



 while (!self::$stopFlag) { if (memory_get_usage() > $this->memoryLimit) { break; } pcntl_signal_dispatch(); }
      
      





Yiiのコードはどこにありますか?



ソースはGithub- yii2-daemonに投稿されており、パッケージはcomposerを介してインストールすることもできます。



パッケージは、基本クラスDaemonControllerとクラスWatcherDaemonControllerの2つの抽象クラスのみで構成されています。



デーモンコントローラー
 <?php namespace vyants\daemon; use yii\base\NotSupportedException; use yii\console\Controller; use yii\helpers\Console; /** * Class DaemonController * * @author Vladimir Yants <vladimir.yants@gmail.com> */ abstract class DaemonController extends Controller { const EVENT_BEFORE_JOB = "beforeJob"; const EVENT_AFTER_JOB = "afterJob"; const EVENT_BEFORE_ITERATION = "beforeIteration"; const EVENT_AFTER_ITERATION = "afterIteration"; /** * @var $demonize boolean Run controller as Daemon * @default false */ public $demonize = false; /** * @var $isMultiInstance boolean allow daemon create a few instances * @see $maxChildProcesses * @default false */ public $isMultiInstance = false; /** * @var $parentPID int main procces pid */ protected $parentPID; /** * @var $maxChildProcesses int max daemon instances * @default 10 */ public $maxChildProcesses = 10; /** * @var $currentJobs [] array of running instances */ protected static $currentJobs = []; /** * @var int Memory limit for daemon, must bee less than php memory_limit * @default 32M */ protected $memoryLimit = 268435456; /** * @var boolean used for soft daemon stop, set 1 to stop */ private static $stopFlag = false; /** * @var int Delay between task list checking * @default 5sec */ protected $sleep = 5; protected $pidDir = "@runtime/daemons/pids"; protected $logDir = "@runtime/daemons/logs"; private $stdIn; private $stdOut; private $stdErr; /** * Init function */ public function init() { parent::init(); //set PCNTL signal handlers pcntl_signal(SIGTERM, ['vyants\daemon\DaemonController', 'signalHandler']); pcntl_signal(SIGINT, ['vyants\daemon\DaemonController', 'signalHandler']); pcntl_signal(SIGHUP, ['vyants\daemon\DaemonController', 'signalHandler']); pcntl_signal(SIGUSR1, ['vyants\daemon\DaemonController', 'signalHandler']); pcntl_signal(SIGCHLD, ['vyants\daemon\DaemonController', 'signalHandler']); } function __destruct() { $this->deletePid(); } /** * Adjusting logger. You can override it. */ protected function initLogger() { $targets = \Yii::$app->getLog()->targets; foreach ($targets as $name => $target) { $target->enabled = false; } $config = [ 'levels' => ['error', 'warning', 'trace', 'info'], 'logFile' => \Yii::getAlias($this->logDir) . DIRECTORY_SEPARATOR . $this->getProcessName() . '.log', 'logVars' => [], 'except' => [ 'yii\db\*', // Don't include messages from db ], ]; $targets['daemon'] = new \yii\log\FileTarget($config); \Yii::$app->getLog()->targets = $targets; \Yii::$app->getLog()->init(); } /** * Daemon worker body * * @param $job * * @return boolean */ abstract protected function doJob($job); /** * Base action, you can\t override or create another actions * @return bool * @throws NotSupportedException */ final public function actionIndex() { if ($this->demonize) { $pid = pcntl_fork(); if ($pid == -1) { $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error'); } elseif ($pid) { $this->cleanLog(); $this->halt(self::EXIT_CODE_NORMAL); } else { posix_setsid(); $this->closeStdStreams(); } } $this->changeProcessName(); //run loop return $this->loop(); } /** * Set new process name */ protected function changeProcessName() { //rename process if (version_compare(PHP_VERSION, '5.5.0') >= 0) { cli_set_process_title($this->getProcessName()); } else { if (function_exists('setproctitle')) { setproctitle($this->getProcessName()); } else { \Yii::error('Can\'t find cli_set_process_title or setproctitle function'); } } } /** * Close std streams and open to /dev/null * need some class properties */ protected function closeStdStreams() { if (is_resource(STDIN)) { fclose(STDIN); $this->stdIn = fopen('/dev/null', 'r'); } if (is_resource(STDOUT)) { fclose(STDOUT); $this->stdOut = fopen('/dev/null', 'ab'); } if (is_resource(STDERR)) { fclose(STDERR); $this->stdErr = fopen('/dev/null', 'ab'); } } /** * Prevent non index action running * * @param \yii\base\Action $action * * @return bool * @throws NotSupportedException */ public function beforeAction($action) { if (parent::beforeAction($action)) { $this->initLogger(); if ($action->id != "index") { throw new NotSupportedException( "Only index action allowed in daemons. So, don't create and call another" ); } return true; } else { return false; } } /** *    * * @param string $actionID * * @return array */ public function options($actionID) { return [ 'demonize', 'taskLimit', 'isMultiInstance', 'maxChildProcesses', ]; } /** * Extract current unprocessed jobs * You can extract jobs from DB (DataProvider will be great), queue managers (ZMQ, RabbiMQ etc), redis and so on * * @return array with jobs */ abstract protected function defineJobs(); /** * Fetch one task from array of tasks * * @param Array * * @return mixed one task */ protected function defineJobExtractor(&$jobs) { return array_shift($jobs); } /** * Main Loop * * * @return boolean 0|1 */ final private function loop() { if (file_put_contents($this->getPidPath(), getmypid())) { $this->parentPID = getmypid(); \Yii::trace('Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' started.'); while (!self::$stopFlag) { if (memory_get_usage() > $this->memoryLimit) { \Yii::trace('Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' used ' . memory_get_usage() . ' bytes on ' . $this->memoryLimit . ' bytes allowed by memory limit'); break; } $this->trigger(self::EVENT_BEFORE_ITERATION); $this->renewConnections(); $jobs = $this->defineJobs(); if ($jobs && !empty($jobs)) { while (($job = $this->defineJobExtractor($jobs)) !== null) { //if no free workers, wait if ($this->isMultiInstance && (count(static::$currentJobs) >= $this->maxChildProcesses)) { \Yii::trace('Reached maximum number of child processes. Waiting...'); while (count(static::$currentJobs) >= $this->maxChildProcesses) { sleep(1); pcntl_signal_dispatch(); } \Yii::trace( 'Free workers found: ' . ($this->maxChildProcesses - count(static::$currentJobs)) . ' worker(s). Delegate tasks.' ); } pcntl_signal_dispatch(); $this->runDaemon($job); } } else { sleep($this->sleep); } pcntl_signal_dispatch(); $this->trigger(self::EVENT_AFTER_ITERATION); } \Yii::info('Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' is stopped.'); return self::EXIT_CODE_NORMAL; } $this->halt(self::EXIT_CODE_ERROR, 'Can\'t create pid file ' . $this->getPidPath()); } /** * Delete pid file */ protected function deletePid() { $pid = $this->getPidPath(); if (file_exists($pid)) { if (file_get_contents($pid) == getmypid()) { unlink($this->getPidPath()); } } else { \Yii::error('Can\'t unlink pid file ' . $this->getPidPath()); } } /** * PCNTL signals handler * * @param $signo * @param null $pid * @param null $status */ final static function signalHandler($signo, $pid = null, $status = null) { switch ($signo) { case SIGINT: case SIGTERM: //shutdown self::$stopFlag = true; break; case SIGHUP: //restart, not implemented break; case SIGUSR1: //user signal, not implemented break; case SIGCHLD: if (!$pid) { $pid = pcntl_waitpid(-1, $status, WNOHANG); } while ($pid > 0) { if ($pid && isset(static::$currentJobs[$pid])) { unset(static::$currentJobs[$pid]); } $pid = pcntl_waitpid(-1, $status, WNOHANG); } break; } } /** * Tasks runner * * @param string $job * * @return boolean */ final public function runDaemon($job) { if ($this->isMultiInstance) { $this->flushLog(); $pid = pcntl_fork(); if ($pid == -1) { return false; } elseif ($pid !== 0) { static::$currentJobs[$pid] = true; return true; } else { $this->cleanLog(); $this->renewConnections(); //child process must die $this->trigger(self::EVENT_BEFORE_JOB); $status = $this->doJob($job); $this->trigger(self::EVENT_AFTER_JOB); if ($status) { $this->halt(self::EXIT_CODE_NORMAL); } else { $this->halt(self::EXIT_CODE_ERROR, 'Child process #' . $pid . ' return error.'); } } } else { $this->trigger(self::EVENT_BEFORE_JOB); $status = $this->doJob($job); $this->trigger(self::EVENT_AFTER_JOB); return $status; } } /** * Stop process and show or write message * * @param $code int -1|0|1 * @param $message string */ protected function halt($code, $message = null) { if ($message !== null) { if ($code == self::EXIT_CODE_ERROR) { \Yii::error($message); if (!$this->demonize) { $message = Console::ansiFormat($message, [Console::FG_RED]); } } else { \Yii::trace($message); } if (!$this->demonize) { $this->writeConsole($message); } } if ($code !== -1) { \Yii::$app->end($code); } } /** * Renew connections * @throws \yii\base\InvalidConfigException * @throws \yii\db\Exception */ protected function renewConnections() { if (isset(\Yii::$app->db)) { \Yii::$app->db->close(); \Yii::$app->db->open(); } } /** * Show message in console * * @param $message */ private function writeConsole($message) { $out = Console::ansiFormat('[' . date('dmY H:i:s') . '] ', [Console::BOLD]); $this->stdout($out . $message . "\n"); } /** * @param string $daemon * * @return string */ public function getPidPath($daemon = null) { $dir = \Yii::getAlias($this->pidDir); if (!file_exists($dir)) { mkdir($dir, 0744, true); } $daemon = $this->getProcessName($daemon); return $dir . DIRECTORY_SEPARATOR . $daemon; } /** * @return string */ public function getProcessName($route = null) { if (is_null($route)) { $route = \Yii::$app->requestedRoute; } return str_replace(['/index', '/'], ['', '.'], $route); } /** * If in daemon mode - no write to console * * @param string $string * * @return bool|int */ public function stdout($string) { if (!$this->demonize && is_resource(STDOUT)) { return parent::stdout($string); } else { return false; } } /** * If in daemon mode - no write to console * * @param string $string * * @return int */ public function stderr($string) { if (!$this->demonize && is_resource(\STDERR)) { return parent::stderr($string); } else { return false; } } /** * Empty log queue */ protected function cleanLog() { \Yii::$app->log->logger->messages = []; } /** * Empty log queue */ protected function flushLog($final = false) { \Yii::$app->log->logger->flush($final); } }
      
      







WatcherDaemonController
 <?php namespace vyants\daemon\controllers; use vyants\daemon\DaemonController; /** * watcher-daemon - check another daemons and run it if need * * @author Vladimir Yants <vladimir.yants@gmail.com> */ abstract class WatcherDaemonController extends DaemonController { /** * @var string subfolder in console/controllers */ public $daemonFolder = 'daemons'; /** * @var boolean flag for first iteration */ protected $firstIteration = true; /** * Prevent double start */ public function init() { $pid_file = $this->getPidPath(); if (file_exists($pid_file) && ($pid = file_get_contents($pid_file)) && file_exists("/proc/$pid")) { $this->halt(self::EXIT_CODE_ERROR, 'Another Watcher is already running.'); } parent::init(); } /** * Job processing body * * @param $job array * * @return boolean */ protected function doJob($job) { $pid_file = $this->getPidPath($job['daemon']); \Yii::trace('Check daemon ' . $job['daemon']); if (file_exists($pid_file)) { $pid = file_get_contents($pid_file); if ($this->isProcessRunning($pid)) { if ($job['enabled']) { \Yii::trace('Daemon ' . $job['daemon'] . ' running and working fine'); return true; } else { \Yii::warning('Daemon ' . $job['daemon'] . ' running, but disabled in config. Send SIGTERM signal.'); if (isset($job['hardKill']) && $job['hardKill']) { posix_kill($pid, SIGKILL); } else { posix_kill($pid, SIGTERM); } return true; } } } \Yii::error('Daemon pid not found.'); if ($job['enabled']) { \Yii::trace('Try to run daemon ' . $job['daemon'] . '.'); $command_name = $job['daemon'] . DIRECTORY_SEPARATOR . 'index'; //flush log before fork $this->flushLog(true); //run daemon $pid = pcntl_fork(); if ($pid === -1) { $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() returned error'); } elseif ($pid === 0) { $this->cleanLog(); \Yii::$app->requestedRoute = $command_name; \Yii::$app->runAction("$command_name", ['demonize' => 1]); $this->halt(0); } else { $this->initLogger(); \Yii::trace('Daemon ' . $job['daemon'] . ' is running with pid ' . $pid); } } \Yii::trace('Daemon ' . $job['daemon'] . ' is checked.'); return true; } /** * @return array */ protected function defineJobs() { if ($this->firstIteration) { $this->firstIteration = false; } else { sleep($this->sleep); } return $this->getDaemonsList(); } /** * Daemons for check. Better way - get it from database * [ * ['daemon' => 'one-daemon', 'enabled' => true] * ... * ['daemon' => 'another-daemon', 'enabled' => false] * ] * @return array */ abstract protected function getDaemonsList(); /** * @param $pid * * @return bool */ public function isProcessRunning($pid) { return file_exists("/proc/$pid"); } }
      
      







デーモンコントローラー


これは、すべてのデーモンの親クラスです。 最小限の悪魔の例を次に示します。



 <?php namespace console\controllers\daemons; use vyants\daemon\DaemonController; class TestController extends DaemonController { /** * @param $job * * @return boolean */ protected function doJob($job) { //do some job return true; } /** * @return array */ protected function defineJobs() { return []; } }
      
      





defineJobs()関数は、実行する一連のタスクを返す必要があります。 デフォルトでは、配列を返すことが期待されています。 たとえばMongoCursorを返したい場合は、defineJobExtractor()をオーバーライドする必要があります。 doJob()関数は、入力用の1つのタスクを受け取り、それを使用して必要な操作を実行し、ソース内の特定のタスクを2回目に落ちないように満たす必要があります。



可能なパラメーターと設定:





接続損失の問題


fork()操作中に、親プロセスで確立された接続が子プロセスで機能しなくなります。 この問題を回避するために、すべての分岐の後、renewConnections()関数呼び出しが行われます。 デフォルトでは、この関数はYii :: $ app-> dbのみを再接続しますが、これをオーバーライドして、子プロセスで接続する必要がある他のソースを追加できます。



ロギング


デーモンは、標準のYiiロガーを再構成します。 デフォルトの動作に満足できない場合は、initLogger()関数をオーバーライドします。



WatcherDaemonController


これはほぼ完成した悪魔のオブザーバーです。 このデーモンのタスクは、他のデーモンを監視し、必要に応じてそれらを開始および停止することです。 2回起動することはできないため、crontabで安全に実行できます。 使用を開始するには、コンソール/コントローラーにデーモンフォルダーを作成し、次の形式のクラスを配置する必要があります。



 <?php namespace console\controllers\daemons; use vyants\daemon\controllers\WatcherDaemonController; /** * Class WatcherController */ class WatcherController extends WatcherDaemonController { protected $sleep = 10; /** * @return array */ protected function getDaemonsList() { return [ ['daemon' => 'daemons/test', 'enabled' => true] ]; } }
      
      





定義する必要がある関数は、getDaemonsList()の1つだけです。この関数は、監視するデーモンのリストを返します。 最も単純な形式では、コードに配線された配列ですが、この場合、その場でリストを変更することはできません。 デーモンのリストをデータベースまたは別のファイルに入れて、そこから毎回取得します。 この場合、ウォッチャーは自身を再起動せずにデーモンをオンまたはオフにできます。



おわりに



現在、電子メールメッセージの送信からレポートの生成、異なるシステム間でのデータの更新まで、さまざまなタスクを実行する50以上のデーモンがあります。



デーモンは、MySQL、RabbitMQ、リモートWebサービスなど、さまざまなタスクのソースで動作します。 フライトは正常です。

もちろん、PHPのデーモンをGoの同じデーモンと比較することはできません。 しかし、開発速度が速いこと、すでに記述されたコードを再利用できること、および別の言語でチームを学ぶ必要がないことは、短所を上回ります。



All Articles