レーヨン:Rustのデータの同時実行性

過去数週間、私はRustでのデータ並列化のための実験ライブラリであるRayonの更新に取り組んでいます。



開発がどのように進んでいるかに非常に満足しているので、私はブログ投稿に来た理由を説明することにしました。

Rayonの目標は、シリアルコードに並列処理を簡単に追加できるようにすることです。これにより、 for



ループまたは反復子を複数のスレッドで動作させることができます。 たとえば、このようなイテレータのチェーンがある場合:



 let total_price = stores.iter() .map(|store| store.compute_price(&list)) .sum()
      
      





次に、通常の「シリアルイテレータ」をレーヨンから「パラレルイテレータ」に変更するだけで、作業を並列化できます。



 let total_price = stores.par_iter() .map(|store| store.compute_price(&list)) .sum()
      
      







もちろん、並列処理を単純にするだけでは十分ではなく、安全にする必要もあります。 Rayonは、APIを使用してもデータの競合が発生しないようにします。

この投稿では、Rayonの仕組みについて説明します。 最初に、基本的なレーヨン( join



)プリミティブについて説明し、次にその実装方法について説明します。

Rustの多くの機能を組み合わせることにより、プログラムの実行中に非常に低いオーバーヘッドでjoin



を実装しつつ、厳密なセキュリティを確保できることに注意を払いたいと思います。 次に、 join



基づいて並列イテレータの抽象化がどのように構築されるかを簡単に説明します。

ただし、レーヨンはさらに開発中であることを強調したいと思います。 現在の実装は私が望むほど柔軟ではないので、並列イテレーターの設計は、より多くの、例えば反復(しゃれなし)を経ることになると期待しています。 さらに、特にパニックの広がりやリソースのクリーンアップなど、正しく処理されない特殊なケースがいくつかあります。 それにもかかわらず、Rayonは現在、特定のタスクに役立つことがあります。 あなたも幸せになることを願って、私はとても幸せです!



プライマリレーヨン:プリミティブに参加



投稿の冒頭で、map-reduce操作に並列イテレーターを使用する例を示しました。



 let total_price = stores.par_iter() .map(|store| store.compute_price(&list)) .sum()
      
      





ただし、実際には並列イテレータは、より基本的なプリミティブjoin



基づいて構築された単なるライブラリです。 join



使用join



非常に簡単です。 以下に示すように、2つのクロージャーを引き起こし、それらを並列に実行join



可能性があります。 両方が完了するとすぐに、結果が返されます。



 // `do_something`  `do_something_else` **   join(|| do_something(), || do_something_else())
      
      





ここでの主なポイントは、2つのクロージャーを潜在的に並行して実行できることです: 並列スレッドを使用するかどうか決定は、空きカーネルがあるかどうかに応じて動的に行われます。 join



使用して、プログラム内で並列処理が役立つ場所join



マークし、ライブラリで実行時にそれを使用するかどうかを決定できるようにjoin



という考え方です。

潜在的な並行性のアプローチは、レーヨンを限られたクロスビームフローと区別する基本的な考え方です。 クロスビームで、2つの制限されたスレッド間で作業を分散する場合、常に異なるスレッドで並行して実行されます。 同時に、Rayonでjoin



を呼び出しても、必ずしも並列コードが実行されるとは限りません。 その結果、よりシンプルなAPIだけでなく、リソースをより効率的に使用できます。 これは、並列化が有益になる時期を事前に予測することが非常に難しいためです。 これには常にグローバルコンテキストの知識が必要です。たとえば、コンピューターには無料のカーネルがあり、現在実行されている他の並列操作は何ですか? 実際、この投稿の主な目標の1つは、先ほど見た保証された同時実行性とは対照的に、 Rustのデータ同時実行性のライブラリの基礎として潜在的な同時実行性を促進することです。

これは、クロスビームが提供する保証された並列性のための別個の役割がないことは言うまでもありません。 潜在的な並行性のセマンティクスは、並列クロージャーができることにもいくつかの制限を課します。 たとえば、チャネルを使用してjoin



2つのクロージャ間で通信しようとすると、ほとんどの場合デッドロックが発生します。 join



、通常は順次アルゴリズムで並列処理を使用するためのヒントとして考える価値があります。 時にはこれはあなたが望むものではない-いくつかのアルゴリズムは最初は並列である 。 (ただし、 join



内からMutex



AtomicU32



などの型を使用することは完全に通常であることに注意してください。1つのクロージャーが別のクロージャーを待機するのをブロックしたくないだけです。)



結合例:並列クイックソート



join



プリミティブは、アルゴリズム分割して征服するのに理想的です 。 これらのアルゴリズムは、作業をほぼ等しい部分に分割し、再帰的に実行します。 たとえば、quicksortの並列バージョンを実装できます。



 fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { if v.len() > 1 { let mid = partition(v); let (lo, hi) = v.split_at_mut(mid); rayon::join(|| quick_sort(lo), || quick_sort(hi)); } } fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize { // . https://en.wikipedia.org/wiki/Quicksort#Lomuto_partition_scheme }
      
      





実際、このバージョンのクイックソートとシリアルバージョンの唯一の違いは、 rayon::join



呼び出すrayon::join



ます。



結合の実装方法:盗用



内部的に、 join



ジョブ代行受信と呼ばれる手法を使用して実装されます。 私の知る限り、作業の傍受はCilkプロジェクトの一部として最初に導入され、それ以来、かなり標準的な手法になりました(実際、レーヨン(「ビスコース」、「シルク」、「シルク」、 -約Transl。) -Cilkへのオマージュ。

主なアイデアは、 join(a, b)



呼び出すたびにjoin(a, b)



安全に並行して実行できる2つのタスクa



b



を定義することですが、このための空きスレッドがあるかどうかはまだわかりません。 現在のスレッドが行うことは、 「スケジュールされたジョブ」キューにb



を追加するだけで、その後b



を取得してすぐに実行a



ます。 同時に、他のアクティブなスレッドのプールがあります(通常、CPUコアごとに1つのスレッド、またはそのようなもの)。 スレッドの1つが解放されるとすぐに他のスレッドの「計画された作業」のキューに入り、タスクがあれば、フリースレッドがそれをキャプチャしてそれ自体を実行します。 したがって、この場合、最初のスレッドがa



実行でビジーでa



、別のスレッドがb



実行を開始できます。

最初のスレッドがa



終了a



とすぐに、他の誰かがb



実行を開始しましたか? そうでない場合、彼は自分でそれを行います。 その場合、別のスレッドが終了するまで待機する必要があります。 しかし、最初のスレッドが待機している間、別のスレッドから作業を奪い取り、それによって全体の作業プロセスの完了に貢献できます。

Rustのような擬似コードの形式では、 join



は次のようになります( 実際のコードは少し異なります。たとえば、各操作で結果を得ることができます)。



 fn join<A,B>(oper_a: A, oper_b: B) where A: FnOnce() + Send, B: FnOnce() + Send, { //  `oper_b`  ,      : let job = push_onto_local_queue(oper_b); //  `oper_a` : oper_a(); // Check whether anybody stole `oper_b`: if pop_from_local_queue(oper_b) { //  ,  . oper_b(); } else { // ,    . //         : while not_yet_complete(job) { steal_from_others(); } result_b = job.result(); } }
      
      





ジョブキャプチャを非常にエレガントにしているのは、CPU負荷への自然な適応です。 つまり、すべてのワーカースレッドがビジーの場合、 join(a, b)



はすべてのクロージャーを順番に実行し始めます(つまりa(); b();



)、これはシリアルコードより悪くありません。 しかし、フリースレッドがある場合は、並列実行になります。



パフォーマンス測定



レーヨンはまだ非常に若いため、テストプログラムはあまりありません(まだ最適化していません)。 これにもかかわらず、すでに顕著な加速を得ることができますが、このためには、デバッグに少し時間をかける必要があります。 たとえば、クイックソート更新バージョンでは、4コアMacbook Proでの並列実行による次の加速が見られます(したがって、4倍の加速が期待できる最大値です)。

配列の長さ 加速
1K 0.95x
32K 2.19x
64K 3.09x
128K 3.52倍
512K 3.84x
1024K 4.01x


元のバージョンと比較して行った変更- 移行をシーケンシャルアルゴリズムに追加しました。 一番下の行は、入力配列が十分に小さい場合(私のコードでは5000エレメント未満)、 join



呼び出しを拒否して、アルゴリズムのシーケンシャルバージョンに移動するjoin



です。 これは、私の例のコードからわかるように、コードを特性とまったく重複させることなく行うことができます。 (興味があれば、この記事の最後にある付録でアイデアを説明します。)

いくつかの最適化の後、シーケンシャル実行への移行がそれほど頻繁に必要とされないことを願っていますが、高レベルのAPI(前述の並列イテレーターなど)がシーケンシャル実行への移行も行うことができるため、常に考える必要はありません。

いずれにせよ、シーケンシャル実行に移行しないと、結果はそれほど良くありませんが、はるかに悪い結果になる可能性があります。

配列の長さ 加速
1K 0.41x
32K 2.05x
64K 2.42x
128K 2.75x
512K 3.02x
1024K 3.10x


特に、このバージョンのコードは、 並列処理のためにすべてのサブ配列をユニット長まで送信することに注意してください。 配列が512Kまたは1024Kの長さである場合、多くのサブアレイが作成されます。これは多くのタスクを意味しますが、最大3.10倍の加速が得られます。 コードが非常にうまく実行される理由は、次の部分で説明するように、 基本的なアプローチが正しいためだと思います。Rayonはメモリの割り当てと仮想ディスパッチを回避します。 それでも、1Kアレイでは0.41xよりも優れたパフォーマンスを望みます(そしてそれは可能だと思います)。



Rust機能を使用してオーバーヘッドを最小限に抑える



上記からわかるように、このスキームを動作可能にするには、タスクをローカルキューに配置するオーバーヘッドコストを可能な限り削減する必要があります。 最終的に、プロセッサの数はタスクの数よりもはるかに少ないため、ほとんどのタスクはインターセプトされないことが予想されます。 Rayon APIは、Rustのいくつかの機能を使用してこのオーバーヘッドを削減するように設計されています。



上記からわかるように、投稿タスクのオーバーヘッドは非常に低くなっていますが、私が望むほどではありません。 それらをさらに減らすにはいくつかの方法があります。





データレースの自由



レーヨンはデータレースからの自由を保証することを先に述べました。 これは、再現するのが難しい奇妙なバグを心配することなく、以前のシーケンシャルコードに並列性を追加できることを意味します。

心配する必要があるエラーには2つのタイプがあります。 まず、2つのフォールトが同じ可変状態を使用できるため、1つのスレッドで行われた変更が他のスレッドに影響を与える可能性があります。 たとえば、両方のクロージャーでlo



パラメーターを指定してquick_sort



を(誤って) quick_sort



ように上記の例を変更した場合、コードがコンパイルされないことを願っています。



 fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { if v.len() > 1 { let mid = partition(v); let (lo, hi) = v.split_at_mut(mid); rayon::join(|| quick_sort(lo), || quick_sort(lo)); // <-- ! } }
      
      





実際、このエラーが表示されます。



 test.rs:14:10: 14:27 error: closure requires unique access to `lo` but it is already borrowed [E0500] test.rs:14 || quick_sort(lo)); ^~~~~~~~~~~~~~~~~
      
      





一方のクロージャーでlo



(またはhi



)を処理し、もう一方のクロージャーでv



を処理しようとすると、同様のエラーが発生します。これは両方のスライスと重なります。

注:この例は人為的なように見えますが、実際には、並列イテレーターを実装するときに一度許可した(または許可する)実際のバグであり、これについては後で説明します。 コピーと貼り付けでこのような間違いを犯すのは非常に簡単で、Rustがそれらを不可能なイベントに変え、プログラムのクラッシュによるバグに変えないことは非常に良いことです。

キャッチできる別の種類のバグは、 join



のクロージャーの1つからのスレッドセーフでないタイプの使用です。 たとえば、RustはRc



と呼ばれる非原子参照カウンターを持つ型を提供します。 Rc



は非アトミック命令を使用して参照カウントを更新するため、 Rc



を異なるスレッド間で分割することは安全ではありません。 次の例のように誰かがやろうとすると、参照カウンタが簡単に不正になり、メモリが二重に解放されるか、さらに悪化する可能性があります。



 fn share_rc<T:PartialOrd+Send>(rc: Rc<i32> { //     `clone`   . //     . //     ! rayon::join(|| something(rc.clone()), || something(rc.clone())); }
      
      





しかし、もちろん、この例をコンパイルしようとすると、エラーが発生します。



 test.rs:14:5: 14:9 error: the trait `core::marker::Sync` is not implemented for the type `alloc::rc::Rc<i32>` [E0277] test.rs:14 rayon::join(|| something(rc.clone()), ^~~~~~~~~~~ test.rs:14:5: 14:9 help: run `rustc --explain E0277` to see a detailed explanation test.rs:14:5: 14:9 note: `alloc::rc::Rc<i32>` cannot be shared between threads safely
      
      





ご覧のとおり、 「メモ」の後の最後の投稿で、コンパイラは、異なるスレッド間でRc



へのアクセスを共有できないことを示しています。

join



関数がこれらの不変条件の両方をサポートできるのはどのようなダークマジックですか? 実際、答えは驚くほど簡単です。 同じ&mut



スライスを2つの異なるクロージャーに転送しようとしたときに受け取った最初のエラーは、基本型システムRustに由来します:同時に存在し、同時に同じ&mut



アクセスできる2つのクロージャーを持つことはできませんカットします。 これは、 &mut



データへのアクセスが一意である必要があるためです。つまり、同じ&mut



値への一意のアクセスを持つ2つのクロージャがある場合、値はそれほど一意ではありません。

(実際、これはRust型システムで作業する際の最大の洞察の 1つでした。それ以前は、シーケンシャルプログラムと「データレース」「ハンギングポインター はまったく異なる種類のバグであると考えていましたが、 Hydra単独では、基本的に、両方のタイプのバグにはエイリアスとデータ変更が横行しており、両方ともテニュアシステムと借用システムを使用して解決できます。

では、スレッド間でRc



を転送しようとした2番目のエラーはどうでしょうか。 join



関数では、両方のクロージャー引数がSend



タイプを満たす必要があるために発生しました。 RustのSend



は、スレッド間でデータを安全に転送できることを意味します。 したがって、 join



が両方のクロージャーがSend



タイプを満たす必要があることを宣言すると、 「クロージャーがアクセスできるデータの場合、あるスレッドから別のスレッドに切り替えても安全であるはずです。



並列イテレーター



投稿の冒頭で、並列イテレーターを使用した例を示しました。



 let total_price = stores.par_iter() .map(|store| store.compute_price(&list)) .sum();
      
      





しかしそれ以来、私はjoin



専念しています。 先ほど言ったように、並列イテレーターAPIは実際にはjoin



単純なラッパーです。 現時点では、それは何よりも濃縮物のように見えます。 しかし、本当にエレガントなのは 、並行性に関連する安全でないコードを必要としないことです。 つまり、並列イテレーターAPIは単純にjoin



に基づいて構築され、安全でないコードをすべて隠します。 (より正確には、ベクトルの構築時に初期化されていないメモリの管理に関連する安全でないコードはまだほとんどありません。しかし、このコードは並列処理とは関係ありません。同様のコードはVec



実装にあります、適切に書く時間がなかったので。)

並列イテレータの実装の詳細に深く入り込みたくありません。私の計画によると、それはまだ変更されるためです。 しかし、高レベルでの考え方は、次の基本的なメソッドを持つParallelIterator



があるということです。



 pub trait ParallelIterator { type Item; type Shared: Sync; type State: ParallelIteratorState<Shared=Self::Shared, Item=Self::Item> + Send; fn state(self) -> (Self::Shared, Self::State); ... //    ,  `map`  . . }
      
      





この考え方は、メソッドstate



がイテレータを一般的な状態と個々のスレッドの状態に分割するというものです。一般的な状態は、すべてのワーカースレッドで(潜在的に)利用できるため、タイプである必要がありますSync



(同時に複数のスレッドからの共有をサポートします)。個々のスレッドの状態はjoin



への呼び出しごとに分離されるため、タイプにのみ応答するSend



必要があります(別のスレッドに安全に転送できます)。

タイプ ParallelIteratorState



は、残りの作業の一部を表します(たとえば、処理用のサブスライス)。彼には3つの方法があります。



 pub trait ParallelIteratorState: Sized { type Item; type Shared: Sync; fn len(&mut self) -> ParallelLen; fn split_at(self, index: usize) -> (Self, Self); fn for_each<OP>(self, shared: &Self::Shared, op: OP) where OP: FnMut(Self::Item); }
      
      





このメソッドlen



は、残っている作業量のアイデアを提供します。メソッドsplit_at



はこの状態を2つの部分に分割します。このメソッドfor_each



は、指定された反復子からのすべての値を処理します。したがって、たとえば、スライスの並列イテレータ&[T]



は次のことを行う必要があります。



これらの両方の特性を備えているため、同じパターンに従ってコレクションの並列処理を実装できます。作業がどれだけ残っているかを確認し、多すぎる場合は、2つの部分に分割します。それ以外の場合は、コレクションを順番に処理します(これには、前に見た順番処理への移行が自動的に含まれることに注意してください)。



 fn process(shared, state) { if state.len() is too big { //     let midpoint = state.len() / 2; let (state1, state2) = state.split_at(midpoint); rayon::join(|| process(shared, state1), || process(shared, state2)); } else { //     state.for_each(|item| { // process item }) } }
      
      





ここでは、例を参照することができ、リンクされているベクトルの要素のコレクションをして1つの値にデータ・ストリームを最小限に抑えるには



結論と歴史的背景



Rayonの最新バージョンに非常に満足しています。彼女は非常に使いやすく、非常に表現力があり、非常に効果的になる可能性が高いと思います。

また、Rustでデータの並列処理がエレガントになったことも嬉しく思います。これは、長い進化と多くの開発の繰り返しの結果です。たとえば、Rustは初期の頃、共有メモリを使用せずに並列タスクがパイプを介して通信する厳密なアーランのようなアプローチを使用していました。このアプローチは、高レベルのアプリケーションには適していますが、並列クイックソートソートの記述には適していません。ただし、型システムを徐々に変更して、パラレルクイックソートのシンプルでクイックなバージョンにどんどん近づけるようにしました。

私の初期の デザインを見ると、現在の反復Rayon



が現時点で最適であることは明らかです。私が特に気に入っているのは、ユーザーだけでなく開発者にとってもシンプルなことです。つまり、Rust型システムでクレイジーなトリックを使用したり、セキュリティを実現するためにパズル型を使用したりする必要はありません。これは、言語の2つの主なソリューションのおかげで可能だと思います。





アプリケーション:コードの重複を伴​​わない順次実行への移行の実装



クイックソートの例でパフォーマンスを向上させるには、配列が十分に小さい場合にシリアルコードへのジャンプを使用する必要があることを前述しました。これらのケースに2つのクイックソート実装があると、非常にイライラします。幸い、Rust特性を使用して、同じソースコードから2つのバージョンのコードを自動的に生成できます。このアプリは、この例で使用したトリックを説明しています。

最初に、Joiner



関数から抽象化される型が定義されますjoin







 trait Joiner { ///    ,    . fn is_parallel() -> bool; ///   `rayon::join`,   `oper_a(); oper_b();`. fn join<A,R_A,B,R_B>(oper_a: A, oper_b: B) -> (R_A, R_B) where A: FnOnce() -> R_A + Send, B: FnOnce() -> R_B + Send; }
      
      





このタイプには、シリアルおよびパラレル操作モードに対応する2つの実装があります。



 struct Parallel; impl Joiner for Parallel { .. } struct Sequential; impl Joiner for Sequential { .. }
      
      





これで、選択した実装(シーケンシャルまたはパラレル)を定義するquick_sort



型に関連するポリモーフィック関数として書き換えることができますJ: Joiner



小さな配列の並列バージョンはシリアルになります:



 fn quick_sort<J:Joiner, T:PartialOrd+Send>(v: &mut [T]) { if v.len() > 1 { //     ,    5K: if J::is_parallel() && v.len() <= 5*1024 { return quick_sort::<Sequential, T>(v); } let mid = partition(v); let (lo, hi) = v.split_at_mut(mid); J::join(|| quick_sort::<J,T>(lo), || quick_sort::<J,T>(hi)); }
      
      






All Articles