コロと別のラウズコールバックの実装

CPANには、このような素晴らしいモジュールファミリ- コロがあります。 これらのモジュールを使用すると、コルチンを使用して真珠でプログラミングできます。



小さな紹介


いつでも、プログラム内のどこでも(たとえば、関数本体内またはサイクルの次の反復で)、現在の状態を完全に保存し、プログラム内の別のポイントに一時的に「切り替える」ことができると想像してください。 この「他の」ポイントでいくつかの有用な作業を行った後、戻って保存された状態を復元すると、すべての作業はこの「スイッチ」がまったくなかったかのように行われます。 もちろん、新しいポイントで発生した一般データのこれらの変更はカウントしません。 それぞれが残りの結果に依存しないいくつかの「重い」機能を備えており、同様の「スイッチ」はそれらの並列実行をシミュレートできます。 つまり、外部からは、関数が並列に実行されているように見えますが、実際には、各瞬間に、そのうちの1つだけの「ピース」が実行され、この「ピース」のサイズを決定します。 言い換えると、各関数は独自のスレッドで実行され、すべてのスレッドは(システム内の数に関係なく)1つのプロセッサコアのみを使用し、各スレッドが独自のプロセッサ時間を取得するために、すべてがこの時間を共有する必要があります。 真の並列性の欠如により、スレッドで発生した共有データのすべての変更は他のすべてのスレッドですぐに利用可能になり、スレッド間の切り替えの瞬間を指定するため、同期の必要性が大幅に削減されます(基本的に、外部リソースを操作する必要があります)

このすべておよびその他の多くは、Coroと呼ばれるモジュールファミリを使用して真珠に実装できます。 このファミリのメインモジュールを使用すると、別のスレッド(以下これらのスレッドをコロスレッドと呼びます)で機能またはコードブロックを実行できます。また、補助モジュールは同期ツール、メッセージキュー、イベントループとの統合などを追加します。



コロスレッドの作成


コロストリームを作成するには、次のいずれかの構造を使用する必要があります。



Coro::async { ... } @args;
      
      





ブロックとその引数の間にコンマがないことに注意してください。または



 my $coro = new Coro(\&code_ref, @args); $coro->ready();
      
      





最初のケースでは、ストリームはCoro::async



関数に渡されるブロックになり、ブロックの直後に指定された引数はブロック内で関数引数として( @_



を介し@_



)利用可能になります。 2番目のケースでは、関数へのリンクとこの関数の引数を使用してストリームを作成します。 2番目の構成は、作成されたスレッドへの参照を返し、 ready()



メソッドが呼び出されます。 これはまさに、2番目の構造と1番目の構造の主な違いです。作成されたスレッドは、準備キューに配置されるまで非アクティブになります(これについては以下で詳しく説明します)。

どちらの場合も、対応する関数またはコードブロックが実行される限り、スレッドは「存続」します。 ちなみに、プログラム自体も別のコロスレッド(メインスレッド)で実行されます。



コロストリームの切り替え


オペレーティングシステムの腸内のどこかで切り替えが行われるシステムスレッドとは異なり、コロスレッドを手動で切り替える必要があります。 最も明白な切り替えポイント(多かれ少なかれ明白な切り替えポイントを思いつくことができます):



2番目の場合、プロセッサは、データがネットワーク経由で到着するか、ディスクから読み取られない(およびネットワーク経由で転送されるか、ディスクに書き込まれる)まで使用されません。

Coroでコントロールを転送する方法は? 現在の状態を保存し、現在のコロストリームの実行を中断するには、静的schedule()



メソッドを使用する必要があります。さらに、このメソッドは、レディキューから次のコロストリームを抽出し、実行を開始します。 したがって、coroスレッド呼び出しschedule()



将来プロセッサ時間を再び取得できるようにするには、まずready()



メソッドを使用してready()



キューの最後に自分自身を配置する必要があります(または他のスレッドがそれを行う必要があります) ) 中断されたスレッドは、準備完了キューの最後に配置されるまでブロックされたままです(プロセッサ時間を受け取りません)。 他のアクティブなスレッドが作業を完了するまでにこれが発生しない場合、Coroはこれを検出してプログラムをクラッシュさせます。 ready()



schedule()



の呼び出しは非常に頻繁に使用されるため、Coroモジュールは便宜上、次の行のペアに相当するcede()



呼び出しを提供します。



 $Coro::current->ready(); Coro::schedule;
      
      





例を見てみましょう。
 #!/usr/bin/perl $| = 1; use strict; use warnings; use Coro; #  coro-   Coro::async Coro::async { my $thread_id = shift; #    coro- $Coro::current->desc("Thread #$thread_id"); for (my $i = 0; $i < 1_000_000; $i++) { if ($i % 1000 == 0) { print "$Coro::current->{desc} - Processed: $i items\n"; #   coro-   ready- $Coro::current->ready(); #      ready- Coro::schedule(); } } } 0; #       coro- sub my_thread { my $thread_id = shift; $Coro::current->desc("Thread #$thread_id"); for (my $i = 0; $i < 1_000_000; $i++) { if ($i % 1000 == 0) { print "$Coro::current->{desc} - Processed: $i items\n"; #     coro- Coro::cede(); } } } my @threads = (); for (my $thread_id = 1; $thread_id < 5; $thread_id++) { #   coro-   Coro::new() my $thread = new Coro(\&my_thread, $thread_id); #   coro-   ready- $thread->ready(); push @threads, $thread; } while (my $thread = shift @threads) { #   coro-   ,   coro-   $thread->join(); }
      
      





結果:

スレッド#0-処理済み:0アイテム
スレッド#1-処理済み:0アイテム
スレッド#2-処理済み:0アイテム
スレッド#3-処理済み:0アイテム
スレッド#4-処理済み:0アイテム
スレッド#0-処理済み:1000アイテム
スレッド#1-処理済み:1000アイテム
スレッド#2-処理済み:1000アイテム
スレッド#3-処理済み:1000アイテム
スレッド#4-処理済み:1000アイテム
 ...
スレッド#0-処理済み:999000アイテム
スレッド#1-処理済み:999000アイテム
スレッド#2-処理済み:999000アイテム
スレッド#3-処理済み:999000アイテム
スレッド#4-処理済み:999000アイテム




この例では、コロスレッドはさまざまな方法で作成され、プロセッサ時間をさまざまな方法で相互に送信します。 すべてのコロストリームは同じジョブを実行します-1000回の繰り返しごとに進捗状況を報告し、実行を中断します。最初にレディキューの最後に(明示的にまたはcede()



を使用してcede()



残りのコロストリームを処理する機会を与えます。 プログラムは、メインコロストリームが完了するまで動作を続け、メインコロストリームは、作成された5つのコロストリームのうち4つが終了するのを待っています( join()



メソッドを呼び出すと、コールが行われるコロストリームがブロックされますjoin()



このメソッドが呼び出されたコロスレッドが完了するまで)。



イベントループ統合


上記の例は、コロスレッドがプロセッサー時間を共有し、時間のかかる作業から休憩を取る方法を示しています。 上記のように、プロセッサ時間を共有する正当な理由は、ブロッキング操作(通常はI / O)を実行することでもあります。

多くのブロッキング操作を伴う効果的なプログラム操作の問題に直面した場合、通常、イベントループを使用してこの問題を解決します。 たとえば、ソケットを非ブロックモードにして「ウォッチャー」を「ハング」させ、ソケットの書き込みまたは読み取りの準備状況を監視し、タイムアウト操作を中断するタイマーを作成します。 関心のあるイベントが発生すると、イベントサイクルの腸から、対応する「ウォッチ」に関連付けられたコールバックが呼び出されます。 プロジェクトがより複雑になるにつれて、どのコールバック、いつ、なぜコールされているかを理解することがますます難しくなっています。 Coroを使用すると、状況は著しく改善され、プログラムコードはより線形で理解しやすくなります(純粋に私の意見)。

まず、Coroモジュールファミリには、コロストリームをイベントループに統合するための3つのモジュールがあります。これらは、 Coro :: AnyEventCoro :: EventおよびCoro :: EVです (以下のコードはCoro :: EV用です)。 イベントループをプログラムに統合するには、ループ自体(たとえば、メインスレッド)でループ自体を実行するだけです。



 Coro::async { EV::run() };
      
      





イベント処理の利便性のために、Coroモジュールは2つの便利な関数rouse_cb()



およびrouse_wait()



提供します。



したがって、以下のコードは同等です。



 # 1.   rouse_cb()  rouse_wait() my $timer = EV::timer(5, 5, sub { my ($watcher, $revents) = @_; print "Timer $wathcer: timeout\n"; }); #2.   rouse_cb()  rouse_wait() my $timer = EV::timer(5, 5, rouse_cb()); my ($watcher, $revents) = rouse_wait(); print "Timer $wathcer: timeout\n";
      
      







ラウズコールバックの別の実装


上記のコードはrouse_cb()



rouse_wait()



の完全な力を伝えませんが、実際のプロジェクトで作業するときにその理解が得られます。 それにもかかわらず、私にとっては、組み込みのrouse_cb()



コールバックのメインマイナスを見つけましたrouse_cb()



関数によって返されたコールバックを保存し、それをもう一度使用しようとすると(これは繰り返し操作で論理的です同じ仕事ですか?)、何も起こりません。 少なくとも1回呼び出されると、コールバックはその状態を保持し、このコールバックのその後のrouse_wait()



へのすべての呼び出しは、以前に保存された引数をすぐに返します。

したがって、私は、ラウズコールバックの実装を記述することにしました。 この実装では、コールバックはオブジェクトであり、 rouse_wait()



関数の代わりに、 rouse_wait()



wait()



メソッドが使用されます。



 my $cb = new My::RouseCallback; my $timer = EV::timer(5, 5, $cb); my ($watcher, $revents) = $cb->wait(); print "Timer $wathcer: timeout\n";
      
      





My :: RouseCallbackの実装
 package My::RouseCallback; use strict; use warnings; use Coro; # ""      My::RouseCallback my %STORAGE = (); #  : my $cb = new My::RouseCallback; sub new { my ($class) = @_; my $context = {args => [], done => 0, coro => undef}; my $self = bless sub { #     $context->{args} = \@_; #   ,     $context->{done} = 1; if ($context->{coro}) { #   coro- $context->{coro}->ready(); } }, $class; $STORAGE{"$self"} = $context; return $self; }; #   : $cb->wait(); sub wait { my $self = shift; my $context = $STORAGE{"$self"}; #   ,  coro-   $context->{coro} = $Coro::current; #   coro-   ,      while ($context->{done} == 0) { Coro::schedule(); } #        my @args = @{ $context->{args} }; $context->{args} = []; $context->{done} = 0; return @args; } sub DESTROY { my $self = shift; $self->(); delete $STORAGE{"$self"}; }; 1; __END__
      
      









タスクでCoroを使用する可能性がある場合は、試してみてください。おそらく気に入るでしょう。 ドキュメントを研究し、実際に得た知識を共有します。



PS。 EVファミリとCoroファミリのモジュールを一緒に使用する場合は、注意してください。 最初と2番目の両方は、デフォルトでasync()関数をエクスポートします。 したがって、コロスレッドを作成するときは、常に明示的にCoro :: asyncを指定することをお勧めします。



All Articles