サブプロセスでの非同期PHPスクリプトの実行





こんにちは、ハブロビテス。



今日は、PHP言語での非同期(並列)計算などの重要なことについてお話したいと思います。

PHP自体は、マルチスレッドであると主張したことのないスクリプト言語です。 しかし、多くの企業は単純に恐れており、これらのタスクにより適したプログラミング言語に移行したくないため、フォレストの奥に行くほど、開発者が直面する課題はより深刻になり、息を“む必要があります。 したがって、あなたは彼らが与えるもので作業する必要があります。

カットの下の詳細...





少し前に、かなり重要な作業が私の前にありました。

要するに、プロジェクトは、商品のコストを計算するために約20の非常に重いモジュールを実装しました。

これはすべていくつかのリレーショナルテーブルにかかっていました。各モジュールには独自の計算ルールなどが含まれていました。 しかし、これをすべてクライアントに提供することは単一のパッケージでした。 そして、それは迅速に行われなければなりませんでした。 とても速い。 キャッシュは節約されましたが、非常に限られた量で、技術的な要件を満たすにはまったく不十分です。



アルゴリズムは非常に単純でした。必要な引数を入力し、すべてのモジュールを配列にインスタンス化し、ループでこの全体を計算しました。 回答は単一のオブジェクトに収集され、後処理のためにクライアントに吐き出されました。



そのため、ある瞬間、チームと私は行き詰まり、新しいモジュールが処理時間を直線的に追加するのではなく、ある種の進行を増やすことに気付きました。



既に推測したかもしれませんが、何らかの方法でプロセスを並列化することが提案されました。 しかし、PHPを使用するのは簡単なことではありません。彼はこれをすぐに実行する方法を知らないからです。



さまざまな解決策が試されています。





残念ながら、結果として、彼らは何もしませんでした。 プロジェクトを削減することが決定されました。



しかし、私には解決策がなければならないので、質問は未解決のままです。 そして、それでも、メインスクリプトが生成するある種の「サブプロセス」について考えました(exec()関数に類似)。



それ以来、かなり多くの時間が経ちました。私は長い間プロジェクトを去りました。 しかし先週、非常に重要なタスクが1つありました。特定の方法で、特定のエンティティの現在の状態とその重いリレーショナル依存関係の一部を保護するスクリプトを記述することです。 このために、データを正しく準備してデータベースに保存する2つのクラスが使用されます。 問題は、そのようなオブジェクトが約2800個あることです。



PHP Fatal error: Allowed memory size of <over9000> bytes exhausted.
      
      





50エンティティのパッケージごとに、平均190 MBのメモリが消費され、新しいパッケージごとに使用されるメモリの数が増加しました。 RAMの使用に関する制限を完全にオフにすると、同じエラーとセグメンテーションフォールトが発生しました。



つまり とにかく、スクリプト内のRAMのオーバーフローを回避する方法を見つけ出し、それを「少し」高速化することが必要でした。 最初に、反復から反復までのメモリ消費が増加している理由を解明しようとしました。 シンフォニックなServiceContainerとEventDispatcherの機能から脚が成長していることが判明しました。 そこで、コンテナ全体がイベントにプッシュされ、これが再帰的に行われます。 正直に言うと、周りを回るのは面倒で、同僚はかなりエレガントなソリューションを提案しました。



Symfony2コンポーネントセットには、 Symfony Process Componentのようなすばらしいものがあります

この天才により、スクリプトの実行中にサブプロセスを生成し、CLIモードで実行できます(通常のコンソールコマンドのように)。



最初は、RAM使用量を制限するために、一度に1プロセスずつ「出芽」させようとしました。 しかし、その後、彼らはドックで、このことは非同期に動作できることを読みました。



試してみることにしました。 結果は次のようになります(以下はGitHubのサンプルリポジトリの例です。サブプロセス自体のロジックは非常に単純ですが、より重いです)。



メインコマンド
 <?php namespace Example\Command; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Process\Process; class MainCommand extends Command { protected function configure() { $this->setName('example:main') ->setDescription('Run example command with optional number of CPUs') ->addArgument('CPUs', null, 'number of working CPUs', 2); } protected function execute(InputInterface $input, OutputInterface $output) { $channels = []; $maxChannels = $input->getArgument('CPUs'); $exampleArray = $this->getExampleArray(); $output->writeln('<fg=green>Start example process</>'); while (count($exampleArray) > 0 || count($channels) > 0) { foreach ($channels as $key => $channel) { if ($channel instanceof Process && $channel->isTerminated()) { unset($channels[$key]); } } if (count($channels) >= $maxChannels) { continue; } if (!$item = array_pop($exampleArray)) { continue; } $process = new Process(sprintf('php index.php example:sub-process %s', $item), __DIR__ . '/../../../'); $process->start(); if (!$process->isStarted()) { throw new \Exception($process->getErrorOutput()); } $channels[] = $process; } $output->writeln('<bg=green;fg=black>Done.</>'); } /** * @return array */ private function getExampleArray() { $array = []; for ($i = 0; $i < 30; $i++) { $name = 'No' . $i; $x1 = rand(1, 10); $y1 = rand(1, 10); $x2 = rand(1, 10); $y2 = rand(1, 10); $array[] = $name . '.' . $x1 . '.' . $y1 . '.' . $x2 . '.' . $y2; } return $array; } }
      
      







サブプロセスコマンド
 <?php namespace Example\Command; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; class SubProcessCommand extends Command { protected function configure() { $this->setName('example:sub-process') ->setDescription('Run example sub-process command') ->addArgument('item'); } protected function execute(InputInterface $input, OutputInterface $output) { $items = explode('.', $input->getArgument('item')); $pointName = $items[0]; $x1 = $items[1]; $y1 = $items[2]; $x2 = $items[3]; $y2 = $items[4]; // Used for mocking heavy execution. $sum = 0; for ($i = 1; $i <= 30000000; $i++){ $sum += $i; } $distance = bcsqrt(pow(($x2 - $x1),2) + pow(($y2 - $y1),2)); $data = sprintf('Point %s: %s', $pointName, (string)$distance); file_put_contents(__DIR__.'/../../../output/Point'.$pointName , print_r($data, 1), FILE_APPEND); } }
      
      







index.php
 <?php require __DIR__ . '/vendor/autoload.php'; use Symfony\Component\Console\Application; $application = new Application(); $application->add(new \Example\Command\MainCommand()); $application->add(new \Example\Command\SubProcessCommand()); $application->run();
      
      









その結果、次の図のようになります。





率直に言って、私はそのような機会に非常に感銘を受けました。

この記事がお役に立てば幸いです。 追加の資料として、上記の例を実装したリポジトリへのリンクをここに残します。



リポジトリ



ご清聴ありがとうございました。 レビューとコメントをいただければ幸いです。



UPD

AlmazDelDiabloskvotに感謝します。

このソリューションは、Symfony Processコンポーネントのベースであるproc_open()



関数がプロジェクトで禁止されていない場合にのみ機能します



スクリーンショットのhtopを更新しました。 現在、プロセスに関するデータがあります。 ありがとうhell0w0rd



All Articles