stream_select()を使用してタスクの実行を並列化します

PHPの一部のタスクを並行して実行できることを知っている人は多くありません。このため、フォークに頼る必要はありません。 PHP5にはストリーム関数があり、その中にはstream_select()があります。



Cameron Lairdの記事(http://www.ibm.com/developerworks/opensource/library/os-php-multitask/index.html?S_TACT=105AGX44&S_CMP=EDU)を読むと、これを行っていないすべての人がこのテクニックを学ぶことができます。 、このトピックでは、並列並列化に関与する単純な小さなクラスのParastreamsと、ストリームから受信したデータをどう処理するかについて、データハンドラーを指定して自分で決定します。







技術の範囲:

複数のソケットからネットワーク経由でデータを取得する必要があります。 stream_select()を使用すると、最も遅いソケットからデータを受信する時間に等しい時間ですべてのソケットからデータを受信します(従来のアプローチでは、合計時間は各ソケットからデータを受信する時間の合計に等しくなります)。

pomで検索を使用するとします。 スフィンクス 。 stream_select()を使用すると、検索デーモンへの複数のクエリを強制的に並行して実行できます(もちろん、sphinxapiに負担をかける必要がありますが、それほど複雑なことはありません)。 これは、検索が検索デーモンへの2つのクエリにつながる場合に役立ちます(たとえば、投稿とコメントで検索します):これら2つのクエリはそれぞれ、インデックスが並行して実行されます-つまり、検索の最適化と高速化が行われます。



クラスコードは次のとおりです。



<?php



/**

* Parastreams PHP class:

* a simple tool for performing multiple tasks with PHP - simultaneously (in parallel).

*

* example of usage:

* $ps = new Parastreams();

*

* function parastreams_callback($data) {

* echo $data."\n";

* }

*

* $s = stream_socket_client("localhost:80", $errno,

* $errstr, 10,

* STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);

* fwrite($s, "GET /sleep.php?delay=1 HTTP/1.0\r\nHost: localhost\r\n\r\n");

* $ps->add($s, 'parastreams_callback');

* ... // repeat the above 5 lines as many times as you wish to, adding new streams to $ps.

* $ps->run();// process the streams

*

* Author: Victor Bolshov ( crocodile2u ( the at symbol here ) yandex.ru )

*

* License: use this script without any retrictions.

*

* Based on code by Cameron Laird, you may find his code here:

* www.ibm.com/developerworks/opensource/library/os-php-multitask/index.html?S_TACT=105AGX44&S_CMP=EDU

*

* PHP version used: PHP 5.3.0alpha1 (should be compatible with older versions of PHP5)

*/



class Parastreams {

/**

* streams served by this instance

* @var resource[]

*/

private $streams = array();

/**

* stream events listeners

* @var array

*/

private $listeners = array();

/**

* @var int

*/

private $timeout = 10;

/**

* Constructor

* @param array $arg when specified, add() is called and $arg is passed to add()

* @see add()

*/

function __construct($arg = null )

{

if ($arg)

{

$ this ->add($arg);

}

}

/**

* add new stream(s)

* @param array | resource $arg either a stream resource or an array like this:

* array(

* array(stream1, listener1),

* array(stream2, listener2),..

*)

* where streamN is a stream resource created with stream_socket_client(),

* and listenerN is a Closure object which is called once the stream becomes readable,

* with the only argument: string $data (the data read from the stream)

* @param callable $arg2 the listener to stream; matters only in case when the first arg is not an array

* @return void

* @throws ParastreamsException

*/

function add($arg1, $arg2 = null )

{

if (is_array($arg1))

{

foreach ($arg1 as $offset => $s)

{

if (! is_array($s))

{

throw new ParastreamsException( "Illegal input at offset " . $offset . " (not an array)" );

} elseif (count($s = array_values($s)) < 2) {

throw new ParastreamsException( "Illegal input at offset " . $offset . " (length is less then 2)" );

} elseif (! is_resource($s[0])) {

throw new ParastreamsException( "Illegal input at offset " . $offset . " (not a stream resource)" );

} elseif (! is_callable($s[1])) {

throw new ParastreamsException( "Illegal input at offset " . $offset . " (not a callable)" );

}



$ this ->addOne($s[0], $s[1]);

}

} elseif (is_resource($arg1)) {

if (! is_callable($arg2))

{

throw new ParastreamsException( "Argument 2 is expected to be a callable, " . gettype($arg2) . " given" );

}

$ this ->addOne($arg1, $arg2);

} else {

throw new ParastreamsException( "Argument 1 is expected to be a resource or an array, " . gettype($arg1) . " given" );

}

}

/**

* Start listening to stream events

* @return void

* @throws ParastreamsException

*/

function run()

{

while (count($ this ->streams))

{

$events = $ this ->streams;

if ( false === stream_select($events, $w = null , $e = null , $ this ->timeout))

{

throw new ParastreamsException( "stream_select() failed!" );

} elseif (count($events)) {

$ this ->processStreamEvents($events);

} else {

throw new ParastreamsException( "Time out!" );

}

}

}



/* Starting private methods */



private function processStreamEvents($events)

{

foreach ($events as $fp) {

$id = array_search($fp, $ this ->streams);



$ this ->invokeListener($fp);



fclose($fp);

unset($ this ->streams[$id]);

}

}

private function invokeListener($fp)

{

foreach ($ this ->listeners as $index => $spec) {

if ($spec[0] == $fp)

{

$data = "" ;

while (! feof($fp))

{

$data .= fread($fp, 1024);

}

call_user_func($spec[1], $data);

unset($ this ->listeners[$index]);

return ;

}

}

}

private function addOne($stream, $listener)

{

$ this ->streams[] = $stream;

$ this ->listeners[] = array($stream, $listener);

}

}



class ParastreamsException extends RuntimeException {}




* This source code was highlighted with Source Code Highlighter .








使用例(コメントにありますが、それでも):



test.php:

<?php



require_once 'Parastreams.php' ;



function parastreams_callback($data) {

echo $data. "\n" ;

};



$streams = array();

for ($i = 1; $i <= 3; ++$i) {

$s = stream_socket_client( "localhost:80" , $errno,

$errstr, 10,

STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);

fwrite($s, "GET /sleep.php?delay=" . $i . " HTTP/1.0\r\nHost: localhost\r\n\r\n" );

$streams[$i] = array($s, 'parastreams_callback' );

}



$ps = new Parastreams($streams);

$ps->run();




* This source code was highlighted with Source Code Highlighter .








この例では、完全を期すためにsleep.phpを使用しています。これは次のとおりです。



<?php



$delay = filter_input(INPUT_GET, 'delay' , FILTER_VALIDATE_INT);

if ($delay <= 0) {

$delay = 1;

}



sleep($delay);



echo "was sleeping for $delay seconds\n" ;




* This source code was highlighted with Source Code Highlighter .







All Articles