何らかの理由で、出会ったすべての選択肢が私に合わず、自分で決断をしなければなりませんでした。
次の一連の要件がありました。
- フォークの使用;
- 必要な拡張機能がない場合にインターフェイスを保存する同期モード。
- 子プロセスの再利用。
- プロセス間での完全なデータ交換。 つまり 引数で開始し、完了時に結果を取得します。
- 子プロセス間でイベントを交換する機能-操作中の「スレッド」とメインプロセス。
- 再利用を維持し、引数を渡し、結果を取得しながら、スレッドのプールを操作します。
- 実行時エラーの処理。
- 作業の実行、スレッドによる作業の待機、初期化のタイムアウト。
- 最大のパフォーマンス。
結果はAzaThreadライブラリです(古い名前はCThreadです)。
せっかちな人のために、すぐにソースにリンクします:
github.com/Anizoptera/AzaThread
説明
AzaThreadは、スレッドクラスを作成するためのシンプルなインターフェイスを提供します。 実際には非同期操作に個別のプロセスを使用しますが、心配する必要はありません。 スレッドからイベントを送信したり、結果を返したり、1つのスレッドを何度も使用して開始引数を渡したり、作業が異なるプロセスで行われているという事実に注意を払わずに、ホットケーキのようなタスクをかき集める16スレッドのプールを作成したりできます。
さらに、最適なスレッド数と、構成に合わせてプロセス間でデータを転送するオプションを選択することにより、さまざまなモードでライブラリのパフォーマンスを簡単にテストできます。
完全に機能させるには、 libevent 、 posix 、およびpcntlの拡張機能が必要です。
ライブラリは、LibEventとペアのソケットを使用してプロセス間で通信します。 5つのデータ転送オプション(引数、結果、イベントデータ)をサポート!
パフォーマンスデータとともにオプションをすぐに提供します。 Intel Core i7 2600K 3.40 Ghz(VMware仮想マシンのUbuntu 11.04)上の8つのスレッドのプールでテストされました。 jpsでのテストの10回の繰り返しの平均結果が表示されます(1秒あたりのジョブ数-1秒あたりの引数を受け取り、データを返すタスクの数)。
いや | jps | 説明 |
---|---|---|
1 | 6501 | 同じソケットを介したシリアル化されたデータ転送。 デフォルトのオプション。 |
2 | 6625 | 同じことですが、igbinaryシリアル化を使用します(最も生産的なオプション)。 igbinaryがインストールされている場合、デフォルトで使用されます。 |
3 | 6194 | System Vメモリキュー(sysvmsg) |
4 | 6008 | System V共有メモリ(sysvshm) |
5 | 6052 | 共有メモリ(shmop) |
ソケットを操作するための拡張子が自動的に選択されます。 利用可能な場合、 ソケット拡張が使用され、パフォーマンスが向上します。 それ以外の場合、 ストリームが呼び出されます 。
子プロセスは、利用可能なすべてのシグナルをリッスンします。 デフォルトでは、それらすべて(SIGWINCHとSIGINFOを除く)をシャットダウンする必要があります。 しかし、これは、信号の名前でストリームクラスにメソッドを作成することで簡単にオーバーライドできます。 たとえば、 sigWinch 。
親プロセスでは、デフォルトで、すべての信号もキャプチャされます。 これは、クラスのlistenMasterSignalsパラメーターをfalseに設定することで変更できます。 この場合、SIGCHLDのみが処理されます。 m <signal name>という静的メソッドを作成することで、独自のハンドラーを簡単に追加できます。 たとえば、 mSigTerm 。
何らかの理由で子プロセスが停止した場合、クラスは新しいタスクを開始するときに自動的に分岐します。 これは気付かれずに発生するため、まったく考える必要はありません。 エラーが発生した場合にインスタンスを再作成する必要はありません。
子プロセスは、時々親の存在をチェックします。 彼が突然死んだ場合、子供の自動は終了します。
スレッドまたはスレッドプールによって使用されるすべてのリソースは、デストラクタが呼び出されると自動的にクリアされます。 ただし、 cleanupメソッドを呼び出すことで、それらを強制的にクリーニングできます。 この場合、スレッド/プールは使用できなくなります。
標準設定では、クラスが作成されるとすぐに、ストリームは事前に初期化されます。 preforkパラメーターをfalseに設定すると、タスクの開始時にのみフォークが発生します。
一般に、カスタマイズ可能なパラメーターはかなりあります。 フォーク後の子プロセスの名前の変更(コンストラクターpNameパラメーター)、タスク実行時間のタイムアウト( timeoutWork )、タスクが子プロセスを待機する最大時間のタイムアウト( timeoutMaxWait )、事前初期化時間のタイムアウト( timeoutInit )、読み取りソケットのバッファーサイズ( pipeReadSize 、 pipeMasterReadSize )。
スレッドのマルチタスクを無効にできます( multitask )。 この場合、タスクが完了するたびに、子プロセスは終了し、次の開始のために再び分岐します。 これにより、パフォーマンスが著しく低下します。
コードはテストでカバーされ、詳細に文書化されています。使用例はexample.phpファイルで表示および実行できます。
より複雑なエラー処理の例は、単体テストコードで見ることができます。
正確に何がどこで発生しているかについての非常に詳細な情報を表示するデバッグモードがあります。
使用例
主な機能は、最大限のシンプルさです。 別の「スレッド」で何かを実行したいだけなら、次のコードで十分です。
class ExampleThread extends Thread { protected function process() { // Some work here } } $thread = new ExampleThread(); $thread->wait()->run();
完全な作業に必要なものがすべてある場合、タスクは非同期に実行されます。 そうでない場合、すべてが引き続き動作しますが、同期モードで動作します。
パラメーターを渡して結果を取得すると、コードはもう少し複雑になります。
class ExampleThread extends Thread { protected function process() { return $this->getParam(0); } } $thread = new ExampleThread(); $thread->wait()->run(123); $result = $thread->wait()->getResult();
同様に、少し手を振って、ストリームからイベント処理を追加します。
class ExampleThread extends Thread { const EV_PROCESS = 'process'; protected function process() { $events = $this->getParam(0); for ($i = 0; $i < $events; $i++) { $event_data = $i; $this->trigger(self::EV_PROCESS, $event_data); } } } // . $additionalArgument = 123; $thread->bind(ExampleThread::EV_PROCESS, function($event_name, $event_data, $additional_arg) { // }, $additionalArgument); $events = 10; // , // , // preforkWait TRUE - $thread->wait(); $thread = new ExampleThread(); $thread->run($events)->wait();
最後に、エラー処理で8つのスレッドのプールを使用します。
$threads = 8 // $pool = new ThreadPool('ExampleThread', $threads); $num = 25; // $left = $num; // do { // // while ($pool->hasWaiting() && $left > 0) { // id $threadId = $pool->run(); $left--; } if ($results = $pool->wait($failed)) { foreach ($results as $threadId => $result) { // // // id ($threadId) $num--; } } if ($failed) { // . // // // foreach ($failed as $threadId) { $left++; } } } while ($num > 0); // . . $pool->cleanup();
性能試験結果
Ubuntu 11.04を搭載した2台のマシンでテストを実行しました。
最初-Intel Core i3 540 3.07 Ghz
2番目はIntel Core i7 2600K 3.40 Ghz(ubuntuはVMware仮想マシンにインストールされます)
生産性の成長を評価できるように、結果を簡単に示します。
繰り返しますが、これらはjpsでの10回のテストの繰り返し(1秒あたりのジョブ数-1秒あたりのタスク数)の平均結果です。
タスクとして、スレッドは次のゴミを実行します。
for ($i = 0; $i < 1000; $i++) { $r = mt_rand(0, PHP_INT_MAX) * mt_rand(0, PHP_INT_MAX); }
最初の結果は、同期操作(フォークなし)です。
すでに12のパフォーマンス低下が始まったため、最初の構成では18スレッドと20スレッドを試しませんでした。
スレッド数 | 最初の構成 | 第二 |
---|---|---|
0 | 553 | 763 |
1 | 330 | 669 |
2 | 580 | 1254 |
4 | 1015 | 2188 |
8 | 1040 | 2618 |
10 | 1027 | 2719 |
12 | 970 | 2739 |
16 | 958 | 2904 |
18 | - | 2830 |
20 | - | 2730 |
つまり、プロセッサによっては2〜4回以上パフォーマンスが向上します。
必要なパラメーターを使用して一連のテストを実行するコードは、ファイルexamples / speed_test.phpにあります。 そのため、パフォーマンスを簡単にテストし、最適なスレッド数を自分で選択できます。
さて、結論として、もう一度、ソースへのリンク、おそらく誰かが上から気づかなかった:
github.com/Anizoptera/AzaThread
ライブラリが誰にとっても有用であれば、私はとても幸せです。 機能のリクエストや検出されたバグはgithubに残しておくことができます。すぐにライブラリを修正して改善します。
UPD:
主要なライブラリの更新が発表され、CThreadからAzaThreadに名前が変更されました。 コメントでは、彼らは「C」で名前について激怒しました。 したがって、ライブラリはAzaThreadと呼ばれ、名前空間を使用し、PSR-0をサポートします:)
この点で、記事を少し修正しました-コード、名前、githubへのリンク。
UPD2:
AzaThreadにcomposerパッケージが追加されました 。 ライブラリは、Anizoptera CMFから他のオープンコンポーネントに移動しました 。 また、オープンコンポーネントの新しい開発と更新については、ブログAzaGroup.ruに書き込みます。