PHP並列プログラム
以前は、トピックタイトルは「マルチスレッドPHPプログラムの作成」でした。 PHPには、複数のコア/プロセッサを使用するアプリケーションを記述する「通常の」方法が1つあります。これはfork()です。 PHPでのfork()システムコールのアプリケーションとpcntl拡張機能について説明します。 例として、grepのかなり高速な並列実装を作成します(
find . -type f -print0 | xargs -0 -P $NUM_PROCS grep $EXPR
と同様の速度
find . -type f -print0 | xargs -0 -P $NUM_PROCS grep $EXPR
)。
実装
PHPでこのシステムコールを実装するのは非常に簡単です。
PHP_FUNCTION(pcntl_fork) { pid_t id; id = fork(); if (id == -1) { PCNTL_G(last_error) = errno; php_error_docref(NULL TSRMLS_CC, E_WARNING, "Error %d", errno); } RETURN_LONG((long) id); }
fork()システムコールとは
* nixシステムのfork()システムコールは、現在のプロセスの完全なコピーを作成するシステムコールです。 fork()システムコールはその値を2回返します。親は子のPIDを受け取り、子は0を受け取ります。奇妙なことに、多くの場合、これは複数のCPUを使用するアプリケーションを書くのに十分です。
$ php -r '$pid = pcntl_fork(); echo posix_getpid() . ": Fork returned $pid\n";' 9545: Fork returned 9546 9546: Fork returned 0
fork()使用時の落とし穴
実際、fork()は、ユーザープロセスがメモリに持っているものを考えずに仕事をします。たとえば、atexit(register_shutdown_function)を介して登録された関数など、すべてをコピーします。 例:
$ php -r 'register_shutdown_function(function() { echo "Exited!\n"; }); pcntl_fork();' Exited! Exited!
残念ながら、スクリプトの最後で、PHPはデストラクタ(データベース接続リソースの内部デストラクタを含む)を呼び出します。 mysqli拡張の例:
<?php /* test.php */ $conn = new mysqli(..., "mysql") or die("Cannot connect\n"); $pid = pcntl_fork(); if ($pid > 0) { echo "Parent exiting\n"; exit(0); } echo "Sending query\n"; $res = $conn->query("SHOW TABLES") or die("Cannot get query result\n"); print_r($res->fetch_all()); /* $ php test.php Parent exiting Sending query Warning: mysqli::query(): MySQL server has gone away in test.php on line 9 Warning: mysqli::query(): Error reading result set's header in test.php on line 9 Cannot get query result */
プログラムの出力は、必ずしも記述されているとおりではありません。 子は、親の接続を閉じる手順が実行される前に「時間があり」、すべてが正常に機能する場合があります。
関数/デストラクタの遅延実行の処理
実際、遅延実行の問題は、必要なものを正確に知っていれば解決できます。 たとえば、Cには関数_exit()があり 、インストールされたハンドラーを開始せずに終了します。 残念ながら、PHPにはそのような関数はありませんが、その動作は信号を使用して部分的にエミュレートできます。
function _exit() { posix_kill(posix_getpid(), SIGTERM); }
この「ハック」は、2つのPHPプロセスでデータベースへの接続を同時にアクティブに保つのに十分ですが、もちろん実際にはこれを行わない方が良いでしょう:):
<?php /* test.php */ $conn = new mysqli(..., "mysql") or die("Cannot connect\n"); function _exit() { posix_kill(posix_getpid(), SIGTERM); } function show_tables() { global $conn; echo "Sending query\n"; $res = $conn->query("SHOW TABLES") or die("Cannot get query result\n"); echo "Tables count: " . $res->num_rows . "\n"; } $pid = pcntl_fork(); if ($pid > 0) { show_tables(); _exit(); } sleep(1); show_tables(); /* $ php test.php Sending query Tables count: 24 Terminated: 15 <--- $ Sending query Tables count: 24 */
grepを書く
たとえば、現在のディレクトリでマスクで検索するgrepの簡単なバージョンを作成しましょう。
<?php /* : $ php grep.php argv ./grep.php:$pattern = "/$argv[1]/m"; */ exec("find . -type f", $files, $retval); // $pattern = "/$argv[1]/m"; foreach($files as $file) { $fp = fopen($file, "rb"); // , $is_binary = strpos(fread($fp, 1024), "\0") !== false; fseek($fp, 0); if ($is_binary) { if (preg_match($pattern, file_get_contents($file))) echo "$file: binary matches\n"; } else { while (false !== ($ln = fgets($fp))) if (preg_match($pattern, $ln)) echo "$file:$ln"; } fclose($fp); }
並列バージョンのgrepの作成
次に、このプログラムを並列化することにより、このプログラムを高速化する方法について考えてみましょう。 $ files配列(ファイルリスト)をいくつかの部分に分割し、これらの部分を個別に処理できることは簡単にわかります。 そして、ある種の大きなタスクのリストがある場合はすべての場合にこれを行うことができます。対応するプロセスの各N番目を取得して処理するだけです。 したがって、これについては多かれ少なかれ一般的な関数を記述します。
define('PROCESSES_NUM', 2); // function parallelForeach($arr, $func) { for ($proc_num = 0; $proc_num < PROCESSES_NUM; $proc_num++) { $pid = pcntl_fork(); if ($pid == 0) break; } if ($pid) { for ($i = 0; $i < PROCESSES_NUM; $i++) pcntl_wait($status); return; } // PROCESSES_NUM $l = count($arr); for ($i = $proc_num; $i < $l; $i += PROCESSES_NUM) $func($arr[$i]); exit(0); }
foreach()をparallelForeach関数の使用に置き換え、エラー処理を追加します:
完全なソース
<?php /* parallel-grep.php */ define('PROCESSES_NUM', 2); if ($argc != 2) { fwrite(STDERR, "Usage: $argv[0] <pattern>\n"); exit(1); } grep($argv[1]); function grep($pattern) { exec("find . -type f", $files, $retval); if ($retval) exit($retval); $pattern = "/$pattern/m"; if (false === preg_match($pattern, '123')) { fwrite(STDERR, "Incorrect regular expression\n"); exit(1); } parallelForeach($files, function($f) use ($pattern) { grepFile($pattern, $f); }); exit(0); } function grepFile($pattern, $file) { $fp = fopen($file, "rb"); if (!$fp) { fwrite(STDERR, "Cannot read $file\n"); return; } $binary = strpos(fread($fp, 1024), "\0") !== false; fseek($fp, 0); if ($binary) { if (preg_match($pattern, file_get_contents($file))) echo "$file: binary matches\n"; } else { while (false !== ($ln = fgets($fp))) { if (preg_match($pattern, $ln)) echo "$file:$ln"; } } fclose($fp); } function parallelForeach($arr, $func) { for ($proc_num = 0; $proc_num < PROCESSES_NUM; $proc_num++) { $pid = pcntl_fork(); if ($pid < 0) { fwrite(STDERR, "Cannot fork\n"); exit(1); } if ($pid == 0) break; } if ($pid) { for ($i = 0; $i < PROCESSES_NUM; $i++) { pcntl_wait($status); $exitcode = pcntl_wexitstatus($status); if ($exitcode) exit(1); } return; } $l = count($arr); for ($i = $proc_num; $i < $l; $i += PROCESSES_NUM) $func($arr[$i]); exit(0); }
PHP 5.3.10のソースコードでgrepの動作を確認します。
$ php ~/parallel-grep.php '^PHP_FUNCTION' | head ./ext/calendar/calendar.c:PHP_FUNCTION(cal_info) ./ext/calendar/calendar.c:PHP_FUNCTION(cal_days_in_month) ./ext/calendar/calendar.c:PHP_FUNCTION(cal_to_jd) ./ext/calendar/calendar.c:PHP_FUNCTION(cal_from_jd) ./ext/calendar/calendar.c:PHP_FUNCTION(jdtogregorian) ./ext/calendar/calendar.c:PHP_FUNCTION(gregoriantojd) ./ext/calendar/calendar.c:PHP_FUNCTION(jdtojulian) ./ext/calendar/calendar.c:PHP_FUNCTION(juliantojd) ./ext/calendar/calendar.c:PHP_FUNCTION(jdtojewish) ./ext/calendar/calendar.c:PHP_FUNCTION(jewishtojd) $ time php ~/parallel-grep.php '^PHP_FUNCTION' | wc -l 4056 real 0m2.073s user 0m3.265s sys 0m0.550s $ time grep -R '^PHP_FUNCTION' . | wc -l 4056 real 0m3.646s user 0m3.415s sys 0m0.209s $ time find . -type f -print0 | xargs -0 -P 2 grep '^PHP_FUNCTION' | wc -l 4056 real 0m1.895s user 0m3.247s sys 0m0.249s
動作します! PHPの並列プログラミングで最も一般的に使用されるパターンの1つ、タスクからのキューの並列処理について説明しました。 タスクがgrepの例のような分解を許可している場合、私の記事がマルチスレッドPHPアプリケーションの作成を恐れないようにするのに役立つことを願っています。 ありがとう