開発がどのように進んでいるかに非常に満足しているので、私はブログ投稿に来た理由を説明することにしました。
Rayonの目標は、シリアルコードに並列処理を簡単に追加できるようにすることです。これにより、
for
ループまたは反復子を複数のスレッドで動作させることができます。 たとえば、このようなイテレータのチェーンがある場合:
let total_price = stores.iter() .map(|store| store.compute_price(&list)) .sum()
次に、通常の「シリアルイテレータ」をレーヨンから「パラレルイテレータ」に変更するだけで、作業を並列化できます。
let total_price = stores.par_iter() .map(|store| store.compute_price(&list)) .sum()
もちろん、並列処理を単純にするだけでは十分ではなく、安全にする必要もあります。 Rayonは、APIを使用してもデータの競合が発生しないようにします。
この投稿では、Rayonの仕組みについて説明します。 最初に、基本的なレーヨン(
join
)プリミティブについて説明し、次にその実装方法について説明します。
Rustの多くの機能を組み合わせることにより、プログラムの実行中に非常に低いオーバーヘッドで
join
を実装しつつ、厳密なセキュリティを確保できることに注意を払いたいと思います。 次に、
join
基づいて並列イテレータの抽象化がどのように構築されるかを簡単に説明します。
ただし、レーヨンはさらに開発中であることを強調したいと思います。 現在の実装は私が望むほど柔軟ではないので、並列イテレーターの設計は、より多くの、例えば反復(しゃれなし)を経ることになると期待しています。 さらに、特にパニックの広がりやリソースのクリーンアップなど、正しく処理されない特殊なケースがいくつかあります。 それにもかかわらず、Rayonは現在、特定のタスクに役立つことがあります。 あなたも幸せになることを願って、私はとても幸せです!
プライマリレーヨン:プリミティブに参加
投稿の冒頭で、map-reduce操作に並列イテレーターを使用する例を示しました。
let total_price = stores.par_iter() .map(|store| store.compute_price(&list)) .sum()
ただし、実際には並列イテレータは、より基本的なプリミティブ
join
基づいて構築された単なるライブラリです。
join
使用
join
非常に簡単です。 以下に示すように、2つのクロージャーを引き起こし、それらを並列に実行
join
可能性があります。 両方が完了するとすぐに、結果が返されます。
// `do_something` `do_something_else` ** join(|| do_something(), || do_something_else())
ここでの主なポイントは、2つのクロージャーを潜在的に並行して実行できることです: 並列スレッドを使用するかどうかの決定は、空きカーネルがあるかどうかに応じて動的に行われます。
join
使用して、プログラム内で並列処理が役立つ場所
join
マークし、ライブラリで実行時にそれを使用するかどうかを決定できるように
join
という考え方です。
潜在的な並行性のアプローチは、レーヨンを限られたクロスビームフローと区別する基本的な考え方です。 クロスビームで、2つの制限されたスレッド間で作業を分散する場合、常に異なるスレッドで並行して実行されます。 同時に、Rayonで
join
を呼び出しても、必ずしも並列コードが実行されるとは限りません。 その結果、よりシンプルなAPIだけでなく、リソースをより効率的に使用できます。 これは、並列化が有益になる時期を事前に予測することが非常に難しいためです。 これには常にグローバルコンテキストの知識が必要です。たとえば、コンピューターには無料のカーネルがあり、現在実行されている他の並列操作は何ですか? 実際、この投稿の主な目標の1つは、先ほど見た保証された同時実行性とは対照的に、 Rustのデータ同時実行性のライブラリの基礎として潜在的な同時実行性を促進することです。
これは、クロスビームが提供する保証された並列性のための別個の役割がないことは言うまでもありません。 潜在的な並行性のセマンティクスは、並列クロージャーができることにもいくつかの制限を課します。 たとえば、チャネルを使用して
join
2つのクロージャ間で通信しようとすると、ほとんどの場合デッドロックが発生します。
join
、通常は順次アルゴリズムで並列処理を使用するためのヒントとして考える価値があります。 時にはこれはあなたが望むものではない-いくつかのアルゴリズムは最初は並列である 。 (ただし、
join
内から
Mutex
、
AtomicU32
などの型を使用することは完全に通常であることに注意してください。1つのクロージャーが別のクロージャーを待機するのをブロックしたくないだけです。)
結合例:並列クイックソート
join
プリミティブは、アルゴリズムを分割して征服するのに理想的です 。 これらのアルゴリズムは、作業をほぼ等しい部分に分割し、再帰的に実行します。 たとえば、quicksortの並列バージョンを実装できます。
fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { if v.len() > 1 { let mid = partition(v); let (lo, hi) = v.split_at_mut(mid); rayon::join(|| quick_sort(lo), || quick_sort(hi)); } } fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize { // . https://en.wikipedia.org/wiki/Quicksort#Lomuto_partition_scheme }
実際、このバージョンのクイックソートとシリアルバージョンの唯一の違いは、
rayon::join
呼び出す
rayon::join
ます。
結合の実装方法:盗用
内部的に、
join
ジョブ代行受信と呼ばれる手法を使用して実装されます。 私の知る限り、作業の傍受はCilkプロジェクトの一部として最初に導入され、それ以来、かなり標準的な手法になりました(実際、レーヨン(「ビスコース」、「シルク」、「シルク」、 -約Transl。) -Cilkへのオマージュ。
主なアイデアは、
join(a, b)
呼び出すたびに
join(a, b)
安全に並行して実行できる2つのタスク
a
と
b
を定義することですが、このための空きスレッドがあるかどうかはまだわかりません。 現在のスレッドが行うことは、 「スケジュールされたジョブ」キューに
b
を追加するだけで、その後
b
を取得してすぐに実行
a
ます。 同時に、他のアクティブなスレッドのプールがあります(通常、CPUコアごとに1つのスレッド、またはそのようなもの)。 スレッドの1つが解放されるとすぐに、他のスレッドの「計画された作業」のキューに入り、タスクがあれば、フリースレッドがそれをキャプチャしてそれ自体を実行します。 したがって、この場合、最初のスレッドが
a
実行でビジーで
a
、別のスレッドが
b
実行を開始できます。
最初のスレッドが
a
終了
a
とすぐに、他の誰かが
b
実行を開始しましたか? そうでない場合、彼は自分でそれを行います。 その場合、別のスレッドが終了するまで待機する必要があります。 しかし、最初のスレッドが待機している間、別のスレッドから作業を奪い取り、それによって全体の作業プロセスの完了に貢献できます。
Rustのような擬似コードの形式では、
join
は次のようになります( 実際のコードは少し異なります。たとえば、各操作で結果を得ることができます)。
fn join<A,B>(oper_a: A, oper_b: B) where A: FnOnce() + Send, B: FnOnce() + Send, { // `oper_b` , : let job = push_onto_local_queue(oper_b); // `oper_a` : oper_a(); // Check whether anybody stole `oper_b`: if pop_from_local_queue(oper_b) { // , . oper_b(); } else { // , . // : while not_yet_complete(job) { steal_from_others(); } result_b = job.result(); } }
ジョブキャプチャを非常にエレガントにしているのは、CPU負荷への自然な適応です。 つまり、すべてのワーカースレッドがビジーの場合、
join(a, b)
はすべてのクロージャーを順番に実行し始めます(つまり
a(); b();
)、これはシリアルコードより悪くありません。 しかし、フリースレッドがある場合は、並列実行になります。
パフォーマンス測定
レーヨンはまだ非常に若いため、テストプログラムはあまりありません(まだ最適化していません)。 これにもかかわらず、すでに顕著な加速を得ることができますが、このためには、デバッグに少し時間をかける必要があります。 たとえば、クイックソートの更新バージョンでは、4コアMacbook Proでの並列実行による次の加速が見られます(したがって、4倍の加速が期待できる最大値です)。
配列の長さ | 加速 |
---|---|
1K | 0.95x |
32K | 2.19x |
64K | 3.09x |
128K | 3.52倍 |
512K | 3.84x |
1024K | 4.01x |
元のバージョンと比較して行った変更- 移行をシーケンシャルアルゴリズムに追加しました。 一番下の行は、入力配列が十分に小さい場合(私のコードでは5000エレメント未満)、
join
呼び出しを拒否して、アルゴリズムのシーケンシャルバージョンに移動する
join
です。 これは、私の例のコードからわかるように、コードを特性とまったく重複させることなく行うことができます。 (興味があれば、この記事の最後にある付録でアイデアを説明します。)
いくつかの最適化の後、シーケンシャル実行への移行がそれほど頻繁に必要とされないことを願っていますが、高レベルのAPI(前述の並列イテレーターなど)がシーケンシャル実行への移行も行うことができるため、常に考える必要はありません。
いずれにせよ、シーケンシャル実行に移行しないと、結果はそれほど良くありませんが、はるかに悪い結果になる可能性があります。
配列の長さ | 加速 |
---|---|
1K | 0.41x |
32K | 2.05x |
64K | 2.42x |
128K | 2.75x |
512K | 3.02x |
1024K | 3.10x |
特に、このバージョンのコードは、 並列処理のためにすべてのサブ配列をユニット長まで送信することに注意してください。 配列が512Kまたは1024Kの長さである場合、多くのサブアレイが作成されます。これは多くのタスクを意味しますが、最大3.10倍の加速が得られます。 コードが非常にうまく実行される理由は、次の部分で説明するように、 基本的なアプローチが正しいためだと思います。Rayonはメモリの割り当てと仮想ディスパッチを回避します。 それでも、1Kアレイでは0.41xよりも優れたパフォーマンスを望みます(そしてそれは可能だと思います)。
Rust機能を使用してオーバーヘッドを最小限に抑える
上記からわかるように、このスキームを動作可能にするには、タスクをローカルキューに配置するオーバーヘッドコストを可能な限り削減する必要があります。 最終的に、プロセッサの数はタスクの数よりもはるかに少ないため、ほとんどのタスクはインターセプトされないことが予想されます。 Rayon APIは、Rustのいくつかの機能を使用してこのオーバーヘッドを削減するように設計されています。
-
join
、その引数のクロージャータイプに関してポリモーフィックです。 そしてこれは、 単相化の過程で、特定の各呼び出しに特化したjoin
個別のコピーが作成されることを意味します。 これにより、join
がoper_a()
およびoper_b()
呼び出すoper_a()
それらがインターセプトされる比較的まれなケースとは異なります)、呼び出しは静的にディスパッチされるという事実につながります。つまり、インラインになります。 はい。クロージャを作成するためにメモリを割り当てる必要はありません。 -
join
は両方のクロージャーが完了するまで実行をブロックするため、スタック上の配置を最大限に活用できます 。 これは、APIユーザーと実装の両方に適しています。たとえば、上記のクイックソートの例は、入力に渡される&mut [T]
スライスへのアクセスに基づいています。これは、join
ロックにより可能です。 同時に、join
実装は、ヒープからのメモリの割り当てを完全に回避し 、スタックのみを使用できます(たとえば、ローカルタスクキューに配置されたクロージャーオブジェクトはスタックに配置されます)。
上記からわかるように、投稿タスクのオーバーヘッドは非常に低くなっていますが、私が望むほどではありません。 それらをさらに減らすにはいくつかの方法があります。
- 多くの作業代行受信の実装では、並列処理のためにタスクをタスクキューに配置することをスキップするかどうかを決定するときに、ヒューリスティックを使用します。 たとえば、作業Lazy Tzannes Planning では 、作業をインターセプトできる空きワーカースレッド( 「空腹」スレッドと呼ばれる)がない場合、キューにタスクを配置しないようにしました。
- そして、もちろん、古き良き最適化が役立ちます。 たとえば、
join
コンパイルjoin
ときに取得したLLVMビットコードやアセンブラコードを調べたことがなく、そこで最適化するのが最も簡単なようです。
データレースの自由
レーヨンはデータレースからの自由を保証することを先に述べました。 これは、再現するのが難しい奇妙なバグを心配することなく、以前のシーケンシャルコードに並列性を追加できることを意味します。
心配する必要があるエラーには2つのタイプがあります。 まず、2つのフォールトが同じ可変状態を使用できるため、1つのスレッドで行われた変更が他のスレッドに影響を与える可能性があります。 たとえば、両方のクロージャーで
lo
パラメーターを指定して
quick_sort
を(誤って)
quick_sort
ように上記の例を変更した場合、コードがコンパイルされないことを願っています。
fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { if v.len() > 1 { let mid = partition(v); let (lo, hi) = v.split_at_mut(mid); rayon::join(|| quick_sort(lo), || quick_sort(lo)); // <-- ! } }
実際、このエラーが表示されます。
test.rs:14:10: 14:27 error: closure requires unique access to `lo` but it is already borrowed [E0500] test.rs:14 || quick_sort(lo)); ^~~~~~~~~~~~~~~~~
一方のクロージャーで
lo
(または
hi
)を処理し、もう一方のクロージャーで
v
を処理しようとすると、同様のエラーが発生します。これは両方のスライスと重なります。
注:この例は人為的なように見えますが、実際には、並列イテレーターを実装するときに一度許可した(または許可する)実際のバグであり、これについては後で説明します。 コピーと貼り付けでこのような間違いを犯すのは非常に簡単で、Rustがそれらを不可能なイベントに変え、プログラムのクラッシュによるバグに変えないことは非常に良いことです。
キャッチできる別の種類のバグは、
join
のクロージャーの1つからのスレッドセーフでないタイプの使用です。 たとえば、Rustは
Rc
と呼ばれる非原子参照カウンターを持つ型を提供します。
Rc
は非アトミック命令を使用して参照カウントを更新するため、
Rc
を異なるスレッド間で分割することは安全ではありません。 次の例のように誰かがやろうとすると、参照カウンタが簡単に不正になり、メモリが二重に解放されるか、さらに悪化する可能性があります。
fn share_rc<T:PartialOrd+Send>(rc: Rc<i32> { // `clone` . // . // ! rayon::join(|| something(rc.clone()), || something(rc.clone())); }
しかし、もちろん、この例をコンパイルしようとすると、エラーが発生します。
test.rs:14:5: 14:9 error: the trait `core::marker::Sync` is not implemented for the type `alloc::rc::Rc<i32>` [E0277] test.rs:14 rayon::join(|| something(rc.clone()), ^~~~~~~~~~~ test.rs:14:5: 14:9 help: run `rustc --explain E0277` to see a detailed explanation test.rs:14:5: 14:9 note: `alloc::rc::Rc<i32>` cannot be shared between threads safely
ご覧のとおり、 「メモ」の後の最後の投稿で、コンパイラは、異なるスレッド間で
Rc
へのアクセスを共有できないことを示しています。
join
関数がこれらの不変条件の両方をサポートできるのはどのようなダークマジックですか? 実際、答えは驚くほど簡単です。 同じ
&mut
スライスを2つの異なるクロージャーに転送しようとしたときに受け取った最初のエラーは、基本型システムRustに由来します:同時に存在し、同時に同じ
&mut
アクセスできる2つのクロージャーを持つことはできませんカットします。 これは、
&mut
データへのアクセスが一意である必要があるためです。つまり、同じ
&mut
値への一意のアクセスを持つ2つのクロージャがある場合、値はそれほど一意ではありません。
(実際、これはRust型システムで作業する際の最大の洞察の 1つでした。それ以前は、シーケンシャルプログラムと「データレース」の「ハンギングポインター 」はまったく異なる種類のバグであると考えていましたが、 Hydra単独では、基本的に、両方のタイプのバグにはエイリアスとデータ変更が横行しており、両方ともテニュアシステムと借用システムを使用して解決できます。
では、スレッド間で
Rc
を転送しようとした2番目のエラーはどうでしょうか。
join
関数では、両方のクロージャー引数が
Send
タイプを満たす必要があるために発生しました。 Rustの
Send
は、スレッド間でデータを安全に転送できることを意味します。 したがって、
join
が両方のクロージャーが
Send
タイプを満たす必要があることを宣言すると、 「クロージャーがアクセスできるデータの場合、あるスレッドから別のスレッドに切り替えても安全であるはずです。 」
並列イテレーター
投稿の冒頭で、並列イテレーターを使用した例を示しました。
let total_price = stores.par_iter() .map(|store| store.compute_price(&list)) .sum();
しかしそれ以来、私は
join
専念しています。 先ほど言ったように、並列イテレーターAPIは実際には
join
単純なラッパーです。 現時点では、それは何よりも濃縮物のように見えます。 しかし、本当にエレガントなのは 、並行性に関連する安全でないコードを必要としないことです。 つまり、並列イテレーターAPIは単純に
join
に基づいて構築され、安全でないコードをすべて隠します。 (より正確には、ベクトルの構築時に初期化されていないメモリの管理に関連する安全でないコードはまだほとんどありません。しかし、このコードは並列処理とは関係ありません。同様のコードは
Vec
実装にあります、適切に書く時間がなかったので。)
並列イテレータの実装の詳細に深く入り込みたくありません。私の計画によると、それはまだ変更されるためです。 しかし、高レベルでの考え方は、次の基本的なメソッドを持つ
ParallelIterator
があるということです。
pub trait ParallelIterator { type Item; type Shared: Sync; type State: ParallelIteratorState<Shared=Self::Shared, Item=Self::Item> + Send; fn state(self) -> (Self::Shared, Self::State); ... // , `map` . . }
この考え方は、メソッド
state
がイテレータを一般的な状態と個々のスレッドの状態に分割するというものです。一般的な状態は、すべてのワーカースレッドで(潜在的に)利用できるため、タイプである必要があります
Sync
(同時に複数のスレッドからの共有をサポートします)。個々のスレッドの状態は
join
、への呼び出しごとに分離されるため、タイプにのみ応答する
Send
必要があります(別のスレッドに安全に転送できます)。
タイプ
ParallelIteratorState
は、残りの作業の一部を表します(たとえば、処理用のサブスライス)。彼には3つの方法があります。
pub trait ParallelIteratorState: Sized { type Item; type Shared: Sync; fn len(&mut self) -> ParallelLen; fn split_at(self, index: usize) -> (Self, Self); fn for_each<OP>(self, shared: &Self::Shared, op: OP) where OP: FnMut(Self::Item); }
このメソッド
len
は、残っている作業量のアイデアを提供します。メソッド
split_at
はこの状態を2つの部分に分割します。このメソッド
for_each
は、指定された反復子からのすべての値を処理します。したがって、たとえば、スライスの並列イテレータ
&[T]
は次のことを行う必要があります。
len
スライスの長さを返すだけの実装、split_at
スライスを2つのサブスライスに分割する実装、- そして実装します
for_each
。これは配列を通り抜けて各要素の操作を引き起こしますop
。
これらの両方の特性を備えているため、同じパターンに従ってコレクションの並列処理を実装できます。作業がどれだけ残っているかを確認し、多すぎる場合は、2つの部分に分割します。それ以外の場合は、コレクションを順番に処理します(これには、前に見た順番処理への移行が自動的に含まれることに注意してください)。
fn process(shared, state) { if state.len() is too big { // let midpoint = state.len() / 2; let (state1, state2) = state.split_at(midpoint); rayon::join(|| process(shared, state1), || process(shared, state2)); } else { // state.for_each(|item| { // process item }) } }
ここでは、例を参照することができ、リンクされているベクトルの要素のコレクションをして1つの値にデータ・ストリームを最小限に抑えるには。
結論と歴史的背景
Rayonの最新バージョンに非常に満足しています。彼女は非常に使いやすく、非常に表現力があり、非常に効果的になる可能性が高いと思います。
また、Rustでデータの並列処理がエレガントになったことも嬉しく思います。これは、長い進化と多くの開発の繰り返しの結果です。たとえば、Rustは初期の頃、共有メモリを使用せずに並列タスクがパイプを介して通信する厳密なアーランのようなアプローチを使用していました。このアプローチは、高レベルのアプリケーションには適していますが、並列クイックソートソートの記述には適していません。ただし、型システムを徐々に変更して、パラレルクイックソートのシンプルでクイックなバージョンにどんどん近づけるようにしました。
私の初期の デザインを見ると、現在の反復
Rayon
が現時点で最適であることは明らかです。私が特に気に入っているのは、ユーザーだけでなく開発者にとってもシンプルなことです。つまり、Rust型システムでクレイジーなトリックを使用したり、セキュリティを実現するためにパズル型を使用したりする必要はありません。これは、言語の2つの主なソリューションのおかげで可能だと思います。
- , . , ,
&mut
,const
- ( , ) ( , Rust , , - ,const
— . .) . , Rust , . - Send , RFC458.
Send
, . RFCSend
-:'static
, . Erlang, , , . - , .
,'static
Send
, !
アプリケーション:コードの重複を伴わない順次実行への移行の実装
クイックソートの例でパフォーマンスを向上させるには、配列が十分に小さい場合にシリアルコードへのジャンプを使用する必要があることを前述しました。これらのケースに2つのクイックソート実装があると、非常にイライラします。幸い、Rust特性を使用して、同じソースコードから2つのバージョンのコードを自動的に生成できます。このアプリは、この例で使用したトリックを説明しています。
最初に、
Joiner
関数から抽象化される型が定義されます
join
:
trait Joiner { /// , . fn is_parallel() -> bool; /// `rayon::join`, `oper_a(); oper_b();`. fn join<A,R_A,B,R_B>(oper_a: A, oper_b: B) -> (R_A, R_B) where A: FnOnce() -> R_A + Send, B: FnOnce() -> R_B + Send; }
このタイプには、シリアルおよびパラレル操作モードに対応する2つの実装があります。
struct Parallel; impl Joiner for Parallel { .. } struct Sequential; impl Joiner for Sequential { .. }
これで、選択した実装(シーケンシャルまたはパラレル)を定義する
quick_sort
型に関連するポリモーフィック関数として書き換えることができます
J: Joiner
。小さな配列の並列バージョンはシリアルになります:
fn quick_sort<J:Joiner, T:PartialOrd+Send>(v: &mut [T]) { if v.len() > 1 { // , 5K: if J::is_parallel() && v.len() <= 5*1024 { return quick_sort::<Sequential, T>(v); } let mid = partition(v); let (lo, hi) = v.split_at_mut(mid); J::join(|| quick_sort::<J,T>(lo), || quick_sort::<J,T>(hi)); }