ブロックなしのデータブロックのアトミック処理

ブロックせずにアルゴリズムを使用することは、開発者にとって常に威圧的です。 2つ以上のスレッドが同じデータブロックを同時に処理できないように、ブロックせずにデータへのアクセスを整理することは非常に困難です。 ほとんどの開発者は、スタックやリンクリストなどの標準コンテナをブロックせずに使用しますが、それ以上は使用しません。 同じ記事で、ブロックせずにマルチスレッド環境でデータへのアクセスを整理する方法を説明したいと思います。



このメソッドの主なアイデアは、各スレッドが個別のバッファーを使用し、そこにメインバッファーからデータをコピーし、処理してから、そのバッファーへのポインターをメインバッファーへのポインターと交換することです。 次のコードを検討してください。



#include <stdio.h> struct data_s { int a; int b; }; struct data_s reader_tmp, writer_tmp, data; struct data_s *reader_tmp_ptr, *writer_tmp_ptr, *data_ptr; int done = 0; void process(struct data_s *data) { data->a++; data->b++; } void* writer(void* p) { struct data_s *tmp_ptr; int i; for(i = 0; i < 1000000; i++) { do { tmp_ptr = data_ptr; writer_tmp_ptr->a = tmp_ptr->a; writer_tmp_ptr->b = tmp_ptr->b; process(writer_tmp_ptr); } while(!__sync_compare_and_swap(&data_ptr, tmp_ptr, writer_tmp_ptr)); writer_tmp_ptr = tmp_ptr; } } void* reader(void *p) { struct data_s *tmp_ptr; int a, b; while(!done) { do { tmp_ptr = data_ptr; reader_ptr->a = tmp_ptr->a; reader_ptr->b = tmp_ptr->b; a = tmp_ptr->a; b = tmp_ptr->b; } while(!__sync_bool_compare_and_swap(&data_ptr, tmp_ptr, reader_tmp_ptr)); reader_tmp_ptr = tmp_ptr; printf(“data = {%d, %d}\n”, a, b); } } int main() { pthread_t reader_thread, writer_thread; data.a = 0; data.b = 0; data_ptr = &data; writer_tmp_ptr = &writer_tmp; reader_tmp_ptr = &reader_tmp; pthread_create(&read_thread, NULL, reader, NULL); pthread_create(&write_thread, NULL, writer, NULL); pthread_join(write_thread, NULL); done = 1; pthread_join(read_thread, NULL); return 0; }
      
      





上記のコードでは、データは、data_ptrが指すバッファーから、writer_tmp_ptrが指すバッファーに処理前にコピーされます。 そして、これらのポインターは場所を変えます。 さらに、data_ptrでは、writer_tmp_ptrはアトミック操作compare_and_swapを使用して書き込まれます。この操作は、最初の引数を2番目と比較し、一致する場合は3番目の引数を最初に書き込み、trueを返します。 それ以外の場合、falseを返します。 これは何のためですか? 例としてリーダー機能を検討してください。 この関数を実行するスレッドが行a = tmp_ptr-> aの後に一時停止したとします。 この時点で、tmp_ptrはデータを指します。 ライター関数を実行しているスレッドがすぐに動作を開始しました。 最初の反復を完了した後、彼はwriter_tmp_ptrとdata_ptrを交換し、次の反復を開始し、行data-> b ++の後に停止しました。 この状況では、writer_tmp_ptrはデータを指し、リーダー関数のtmp_ptrはデータを指します。 同じバッファの同時読み取りと変更が判明します。 ただし、data_ptrポインターとtmp_ptrポインターが一致しなくなったため、compare_and_swap操作は衝突を検出し、読み取り操作を再度実行します。 割り当てreader_tmp_ptr = tmp_ptrがこのようなチェックに失敗するのはなぜですか?



すべてがシンプルです。 reader_tmp_ptr変数は、実行されるスレッド固有の変数です。 この例では、グローバルに設定しましたが、完全に正しいわけではありません。 複数の読み取りストリームの場合、2番目のストリームに別のグローバル変数を設定し、この変数をそのストリームに固有のバッファーポインターとして使用するために、現在実行中のストリームを決定する関数内で設定する必要があります。 最良のオプションは、いわゆるを使用することです。 フロー固有の変数。 たとえば、pthreadライブラリには、pthread_getspecific / pthread_setspecificなどの優れた機能があります。 このコードを書く目標は、このアルゴリズムがどのように機能するかを読者に明確に示すことでした。 最適化なしでは、本質の概念を混乱させるだけです。



すべてが完璧であるように思われ、プログラムは画面上に同じ値のペアを表示する必要がありますが、それほど単純ではありません。 行a = tmp_ptr-> a;の後にリーダー関数を実行するスレッドが停止したことも想像してください。 その後、ライター関数を実行するスレッドは2回の反復を完了し、3回目を実行します。 プロセス機能の完了後に停止します。 次に、リーダー機能を実行するスレッドが作業を再開します。 この状況では、変数aとbの値は一致しませんが、compare_and_swap操作はtrueを返します。 data_ptrは再びデータを指します。つまり、data_ptrとtmp_ptrは再び一致します。 これはABA問題と呼ばれます 。 この問題を解決する1つの方法は、ポインターにカウンターを追加することです。これは、新しい値が割り当てられるたびに増分します。 次の例では、このような問題はありません。



 #include <stdio.h> #include <stdint.h> #include <pthread.h> struct data_s { int a; int b; }; struct data_pointer_s { union { uint64_t qw[2]; struct { struct data_s *data_ptr; uint64_t aba_counter; }; }; }; static inline char cas128bit(volatile struct data_pointer_s *a, struct data_pointer_s b, struct data_pointer_s c) { char result; __asm__ __volatile__( "lock cmpxchg16b %1\n\t" "setz %0\n" : "=q" (result) , "+m" (a->qw) : "a" (b.data_ptr), "d" (b.aba_counter) , "b" (c.data_ptr), "c" (c.aba_counter) : "cc" ); return result; } struct data_s reader_tmp, writer_tmp, data; struct data_pointer_s reader_tmp_ptr, writer_tmp_ptr, data_ptr; int done = 0; void process(struct data_s *data) { data->a++; data->b++; } void* writer(void* p) { struct data_pointer_s tmp_ptr; int i; for(i = 0; i < 1000000; i++) { do { tmp_ptr = data_ptr; writer_tmp_ptr.data_ptr->a = tmp_ptr.data_ptr->a; writer_tmp_ptr.data_ptr->b = tmp_ptr.data_ptr->b; process(writer_tmp_ptr.data_ptr); writer_tmp_ptr.aba_counter = tmp_ptr.aba_counter + 1; } while(!cas128bit(&data_ptr, tmp_ptr, writer_tmp_ptr)); writer_tmp_ptr = tmp_ptr; } } void* reader(void *p) { struct data_pointer_s tmp_ptr; int a, b; while(!done) { do { tmp_ptr = data_ptr; reader_tmp_ptr.data_ptr->a = tmp_ptr.data_ptr->a; reader_tmp_ptr.data_ptr->b = tmp_ptr.data_ptr->b; a = tmp_ptr.data_ptr->a; b = tmp_ptr.data_ptr->b; reader_tmp_ptr.aba_counter = tmp_ptr.aba_counter + 1; } while(!cas128bit(&data_ptr, tmp_ptr, reader_tmp_ptr)); reader_tmp_ptr = tmp_ptr; printf("data = {%d, %d}\n", a, b); } } int main() { pthread_t reader_thread, writer_thread; data.a = 0; data.b = 0; data_ptr.data_ptr = &data; data_ptr.aba_counter = 0; writer_tmp_ptr.data_ptr = &writer_tmp; writer_tmp_ptr.aba_counter = 0; reader_tmp_ptr.data_ptr = &reader_tmp; reader_tmp_ptr.aba_counter = 0; pthread_create(&reader_thread, NULL, reader, NULL); pthread_create(&writer_thread, NULL, writer, NULL); pthread_join(writer_thread, NULL); done = 1; pthread_join(reader_thread, NULL); return 0; }
      
      





このコードの有効性は、コピーされるデータの量とプロセス機能の複雑さに依存することに注意してください。 数十メガバイトのボリュームを持つデータブロックのアトミック処理が必要な場合、ミューテックスの使用ははるかに効率的です。 また、compare_and_swapがfalseを返した後、別のスレッドに操作を完了する機会を与えるたびに、わずかな遅延(数マイクロ秒のオーダー)を追加することを検討するとよいでしょう。 繰り返しますが、遅延の存在と時間はタスクの詳細に直接依存します。



また、このアルゴリズムがどのように機能するかを理解し、提示する際に助けてくれたユーザーvladvicに感謝します。



All Articles