並列データインポート

並列実行の可能性を可能にする特定のタスクセットがあるとします。 たとえば、指定された期間後にすべてのフィードを更新するRSSアグリゲーターを整理する必要があります。 リモートソースからデータをダウンロードするのに、メインであると同時にかなり具体的な時間が費やされることは明らかです。 これを考えると、テープの順次ロードによるこのようなインポートの編成は無意味であるため、多数のテープの場合、インポートは割り当てられた時間に収まりません。



この問題には2つの解決策があります。 1つ目は、CURLを使用してテープの並列ロードを実装することです。 例:



//

$rMultiHandler = curl_multi_init();

$aResources = array();

foreach ( $aFeedUrls as $sFeedUrl ) {

$rResource = curl_init();



curl_setopt($rResource, CURLOPT_RETURNTRANSFER, 1);

curl_setopt($rResource, CURLOPT_URL, $sFeedUrl );

curl_setopt($rResource, CURLOPT_FOLLOWLOCATION, true );

curl_setopt($rResource, CURLOPT_TIMEOUT, 60);



curl_multi_add_handle( $multi_handler, $rResource );

$aResources[] = array(

'url' => $sFeedUrl,

'client' => $rResource

);

}



// CURL

$iRunningProcesses = null ;

do {

usleep( 1000000 );

curl_multi_exec( $rMultiHandler, $iRunningProcesses );

} while ( $iRunningProcesses > 0 );



//

foreach ( $aResources as $aResource ) {

$aHeaders = curl_getinfo( $aResource[ 'client' ] );

$sBody = curl_multi_getcontent( $aResource[ 'client' ] );

}












このオプションは、テープのロードを待機している間のダウンタイムの問題を部分的に解決しますが、テープのさらなる分析はシーケンシャルモードで行う必要があります。 さらに、対象モデルでテープが特定のクラスのオブジェクトと見なされる場合、あまり便利ではありません。



2番目のオプションは、それぞれテープ上に子プロセスのセット(プール)を作成することです。 これは、たとえばproc_ *ファミリーの関数を使用して実行できます。 サーバーの負荷を制御するために、同時に実行されるプロセスのセット(プールサイズ)を特定の数に制限することも合理的です(原則、このステートメントは最初のオプションにも当てはまります)。 これを行うには、プールのステータスを監視し、プール内のプロセスが完了すると新しいプロセスを追加するディスパッチャをエミュレートする必要があります。



以下は、RSSフィードをインポートするタスクを並列実行するためのプール実装の自己文書化された例です。



/**

*

*/

class Import {



/**

*

* @var int

*/

const POOL_SIZE = 10;



/**

*

* @var int

*/

const POOL_PROC_EXEC_TIME = 180;



/**

*

*/

public function startPool() {



file_put_contents( 'import.log' , "[*] " .

PHP_EOL, FILE_APPEND );



//

$iSuccess = 0;

$iFailure = 0;

$iUpdated = 0;



// ,

$aFeedId = array( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,

13, 14, 15 );



//

$aPool = array();

for ( $iIter = 0; $iIter < self::POOL_SIZE && !empty( $aFeedId );

$iIter++ ) {

$iFeedId = array_shift( $aFeedId );



$ this ->startProcess( $aPool, $iFeedId );

}



//

while ( !empty( $aPool ) ) {



// (1 )

usleep(1000000);



//

foreach ( $aPool as $iKey => &$aProcess ) {



//

$aProcStatus = proc_get_status( $aProcess[ 'handler' ] );



//

if ( false === $aProcStatus[ 'running' ] ) {



//

$iResponse = fgets( $aProcess[ 'pipes' ][1] );



//

fclose( $aProcess[ 'pipes' ][1] );

fclose( $aProcess[ 'pipes' ][2] );

proc_close( $aProcess[ 'handler' ] );



//

// www.php.net/manual/en/function.proc-get-status.php#92145

if ( 0 === $aProcStatus[ 'exitcode' ]

&& is_numeric( $iResponse ) ) {

$iSuccess++;

$iUpdated += $iResponse;



//

} else

$iFailure++;



//

unset( $aPool[ $iKey ] );

if ( !empty( $aFeedId ) ) {

$iFeedId = array_shift( $aFeedId );



$bIsLaunched = $ this ->startProcess( $aPool,

$iFeedId );

if ( !$bIsLaunched )

$iFailure++;

}



//

} else {



//

if ( time() - $aProcess[ 'iTimeStart' ] >

self::POOL_PROC_EXEC_TIME ) {

file_put_contents( 'import.log' , "[!] " .

" {$aProcess['iFeedId']} " .

" " . PHP_EOL, FILE_APPEND );

$iSingnalCode = 15;

proc_terminate( $aProcess[ 'handler' ], $iSingnalCode );

}

}

}

unset( $aProcess );

}



file_put_contents( 'import.log' , "[*] : " .

" {$iSuccess}, {$iFailure}, " .

" {$iUpdated}" . PHP_EOL, FILE_APPEND );

}



/**

*

* @param array $aPool

* @param int $iFeedId

*/

public function startProcess( array &$aPool, $iFeedId ) {



//

// www.php.net/manual/en/function.proc-get-status.php#93382

$sCmd = "exec php -f " . __FILE__ . " {$iFeedId}" ;



$aDescriptors = array(

1 => array( "pipe" , "w" ),

2 => array( "pipe" , "w" )

);

$aPipes = array();



//

$bSuccess = true ;

$rProcess = proc_open( $sCmd, $aDescriptors, $aPipes );

if ( is_resource( $rProcess ) ) {

$aPool[] = array(

'handler' => $rProcess,

'pipes' => $aPipes,

'iFeedId' => $iFeedId,

'iTimeStart' => time()

);

} else {

$bSuccess = false ;

file_put_contents( 'import.log' , "[!] " .

" {$iFeedId}" , FILE_APPEND );

}



return $bSuccess;

}



/**

*

* @param $iFeedId

* @return int

*/

public function doImport( $iFeedId ) {



file_put_contents( 'import.log' , "[+] {$iFeedId}" .

PHP_EOL, FILE_APPEND);



//

$iExecTime = rand( 1, 10 );

usleep( $iExecTime * 1000000 );

$iUpdated = rand( 0,10 );



file_put_contents( 'import.log' , "[-] {$iFeedId}" .

" {$iExecTime} " . PHP_EOL, FILE_APPEND);



//

echo $iUpdated;



return $iUpdated;

}

}



/**

*

*/

$oImport = new Import();



//

if ( 1 === $argc ) {

$oImport->startPool();



//

} else {

$iFeedId = $argv[1];

$oImport->doImport( $iFeedId );

}












このスクリプトの結果は次のログになります。

[*]

[+] 1

[+] 2

[+] 3

[+] 4

[+] 5

[+] 8

[+] 6

[+] 7

[+] 9

[+] 10

[-] 7 1

[+] 11

[-] 1 5

[+] 12

[-] 10 5

[-] 2 6

[-] 12 1

[+] 13

[+] 14

[-] 3 7

[-] 6 7

[+] 15

[-] 9 7

[-] 14 1

[-] 11 6

[-] 4 9

[-] 5 10

[-] 8 10

[-] 13 7

[-] 15 6

[*] : 15, 0, 89









この方法は戦闘条件でテストされており、現時点では苦情は発生していません。



All Articles