2つのスレッド用のシンプルなロックフリーオブジェクト

ユニバーサルロックフリーオブジェクトに関する多くの記事がありますが、いくつかの特別な場合には、それらは不必要に面倒です。 私の場合は、それだけでした。あるストリームから別のストリームへの情報の一方向の送信を整理する必要がありました。 メインスレッドはワーカーによって開始されます。その後、彼は停止を要求することしかできなくなり、管理できなくなります。 次に、ワーカースレッドは、現在の状態(実行の進行状況)をメインに通知し、実行の中間結果を送信できます。 ワーカーからメインストリームへのデータ転送のみが必要であることがわかりました。



もちろん、おそらく私は自転車を、あるいはもっと悪いことにグリッチのある自転車を発明しました。 したがって、コメントと批判は大歓迎です!



状態オブジェクト



ワークフローの状態はクラスとして表されます。 同時に、メインスレッドは常に状態オブジェクトに保存されたデータを取得する義務はありません(たとえば、メインスレッドが進行状況の中間値をスキップするかどうかは関係ありません。現時点で関連する最新のものを取得することが重要です)



ロックフリー状態転送を実装するには、その3つのインスタンス(同じクラスの異なるオブジェクト)が必要です。



var ReadItem: TLockFreeWorkState; CurrentItem: TLockFreeWorkState; WriteItem: TLockFreeWorkState;
      
      





アイデアは次のとおりです。ワークフローはWriteItemオブジェクトに自由にアクセスできます。 すべてのデータが保存されると、InterlockedExchange操作がCurrentItemのオブジェクトで実行され、その後、メインスレッドに新しい状態の準備ができたことが通知されます(この例では、通常のPostMessageが使用されました)。 通知ハンドラーのメインスレッドは、ReadItemオブジェクトを使用してCurrentItemオブジェクトのInterlockedExchange操作を実行します。その後、ReadItemからデータを自由に読み取ることができます。



このような「バブル」が判明します。ステータスデータがWriteItemに表示され、ReadItemのCurrentItemを介して「ポップアップ」されます。 ちなみに、このような構造の基本クラスの通常の名前は思いつかなかったので、単にTLockFreeWorkStateを呼び出しました(誰かがもっと良いアイデアを持っているかもしれません)。



注意点が1つあります。メインスレッドはいつでも現在の状態に適用できます。 常にInterlockedExchangeを実行する場合は、現在の状態と以前の状態を交互に返します。



クラスの通常の最新フラグは、これを防ぐのに役立ちます。 状態を書き込むとき、ワークフローは常にWriteItem.Newest:= Trueを設定し、InterlockedExchangeの後、このフラグはCurrentItemになります。 最初のメインスレッドはCurrentItem.Newestをチェックし、Trueの場合のみ、InterlockedExchangeを実行し、ReadItem.Newestは直ちにFalseにリセットします。 メインスレッドからCurrentItem.Newestを読むのは安全だと思いましたが、正しくない場合は修正してください。



これはすべて、単純化されたコードの形式になっています(明確にするために、型のゴーストは省略されています)。



 type TLockFreeWorkState = class public Newest: Boolean; end; function Read(var CurrentItem, ReadItem: TLockFreeWorkState): Boolean; begin if CurrentItem.Newest then begin ReadItem := InterlockedExchangePointer(CurrentItem, ReadItem); ReadItem.Newest := False; Result := True; end else Result := False; end; procedure Write(var CurrentItem, WriteItem: TLockFreeWorkState); begin WriteItem.Newest := True; WriteItem := InterlockedExchangePointer(CurrentItem, WriteItem); end;
      
      





キューオブジェクト



いくつかの点でアプローチは似ていますが、実装のために最初に必要なオブジェクトは1つだけですが、それへの2つのリンクが必要です。



 var ReadQueue: TLockFreeWorkQueue; WriteQueue: TLockFreeWorkQueue;
      
      





最初に、TLockFreeWorkQueueの単一のインスタンスが作成され、ReadQueue変数とWriteQueue変数に書き込まれます。 このクラスは循環バッファーであり、次の説明があります。



  TLockFreeWorkQueue = class public Head: Integer; Tail: Integer; Items: array[0..QueueCapacity - 1] of TObject; end;
      
      





QueueCapacityは、リングバッファーの長さを決定する定数(ゼロより大きい)です。



アイテムがキューに追加されると、ワークフローはWriteQueue.Items [Tail]要素のInterlockedExchangeComparePointerを実行します。 この場合、要素はNilと比較され、成功した場合、追加される要素がNilに書き込まれます。 操作が成功した場合、Tail値は1増加し、QueueCapacityに達すると0にリセットされます。 ワーカースレッド(ライタースレッド)のみがこの変数にアクセスできるため、Tailを自由に操作できます。 また、この後、ワークフローはメインアイテムにアイテムがキューに表示されたことを通知する必要があります。 操作が失敗した場合、これはキューがいっぱいであることを意味しますが、それについては後で説明します。



メインスレッドは、ワーカーから通知されると、キューから要素を読み取るサイクルを開始します(実際、読み取りはいつでも開始できます)。 要素を取得するために、Nil値が書き込まれるReadQueue.Items [Head]要素に対してInterlockedExchangePointerが呼び出されます。 抽出されたアイテムがNilでない場合、Head値は1増加し、QueueCapacityに達すると0にリセットされます。



それでは、バッファオーバーフローのケースに対処しましょう。 新しい要素については、新しいキューオブジェクトを作成して書き込みを続けることができます。そのため、このオブジェクトをリーダースレッドで見つけられるように、現在のキューオブジェクトにリンクを渡す必要があります。 これを行うには、追加のNextQueueフィールドをクラスに追加します。



  TLockFreeWorkQueue = class public Head: Integer; Tail: Integer; Items: array[0..QueueCapacity - 1] of TObject; NextQueue: TLockFreeWorkQueue; end;
      
      





ここで、InterlockedExchangeComparePointer要素を書き込むときにNilを返す場合(キューがいっぱい)、新しいNewWriteQueueキューオブジェクトを作成します:TLockFreeWorkQueue、それに追加する要素を書き込み、InterlockedExchangePointerをWriteQueue.NextQueue変数で実行し、最後にNewWriteQueueをWriteQue変数に保存します。 したがって、この操作の後、ReadQueueおよびWriteQueueの値はすでに異なるオブジェクトを参照しています。



メインスレッドで、空のキュー処理を追加する必要があります。 ReadQueue.Items [Head]要素のInterlockedExchangePointerを読み込んでNilを返す場合、InterlockedExchangePointer(ReadQueue.NextQueue、Nil)も実行するNextQueueフィールドを追加で確認する必要があります。 Non-Nilが返された場合、オブジェクトをNewReadQueueに保存し、現在のReadQueueを削除して、この変数をNewReadQueueに設定します。



以下は、キューにアイテムを追加するための簡略化されたコードです。



 procedure AddQueueItem(var WriteQueue: TLockFreeWorkQueue; Item: TObject); var NewWriteQueue: TLockFreeWorkQueue; begin if InterlockedCompareExchangePointer(WriteQueue.Items[WriteQueue.Tail]), Item, Nil) = Nil then begin // Added successfully Inc(WriteQueue.Tail); if WriteQueue.Tail = QueueCapacity then WriteQueue.Tail := 0; end else begin // WriteQueue full. Create new chained queue. NewWriteQueue := TLockFreeWorkQueue.Create; NewWriteQueue.Items[0] := Item; Inc(NewWriteQueue.Tail); if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue NewWriteQueue.Tail := 0; InterlockedExchangePointer(WriteQueue.NextQueue, NewWriteQueue); WriteQueue := NewWriteQueue; end; end;
      
      





そして、キューからアイテムを取得します。



 function ExtractQueueItem(var ReadQueue: TLockFreeWorkQueue): TObject; var NewReadQueue: TLockFreeWorkQueue; begin Result := Nil; repeat Result := InterlockedExchangePointer(ReadQueue.Items[ReadQueue.Head], Nil); if Result = Nil then begin // No new items in this queue. Check next queue is available NewReadQueue := InterlockedExchangePointer(ReadQueue.NextQueue, Nil); if Assigned(NewReadQueue) then begin ReadQueue.Free; ReadQueue := NewReadQueue; end else // No new item in queue Exit; end; until Result <> Nil; // Item extracted successfully Inc(ReadQueue.Head); if ReadQueue.Head = QueueCapacity then ReadQueue.Head := 0; end;
      
      





このコードでは、多少安全かもしれません。 NextQueueフィールドを使用する操作で、通常InterlockedExchangePointerを使用する必要があるかどうかはわかりませんが、直接読み取りおよび書き込みを実行しても安全な場合があります。



テストケース



簡単なコンソールの例とともに、作業コードとコームコードをネタバレの下で見ることができます。



テストケース
 program LockFreeTest; {$APPTYPE CONSOLE} {$R *.res} uses SysUtils, Classes, Windows, Messages; // Lock-free work thread state //////////////////////////////////////////////// type TLockFreeWorkState = class protected FNewest: Boolean; public class function Read(var CurrentItem, ReadItem): Boolean; class procedure Write(var CurrentItem, WriteItem); property Newest: Boolean read FNewest write FNewest; end; class function TLockFreeWorkState.Read(var CurrentItem, ReadItem): Boolean; begin if TLockFreeWorkState(CurrentItem).Newest then begin pointer(ReadItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(ReadItem)); TLockFreeWorkState(ReadItem).Newest := False; Result := True; end else Result := False; end; class procedure TLockFreeWorkState.Write(var CurrentItem, WriteItem); begin TLockFreeWorkState(WriteItem).Newest := True; pointer(WriteItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(WriteItem)); end; // Lock-free work thread queue //////////////////////////////////////////////// type TLockFreeWorkQueue = class public const QueueCapacity = 4; // Small value for test purposes public type TLockFreeWorkQueueItems = array[0..QueueCapacity - 1] of TObject; public Head: Integer; // Access from main thread only Tail: Integer; // Access from work thread only NextQueue: TLockFreeWorkQueue; Items: TLockFreeWorkQueueItems; public destructor Destroy; override; class procedure Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject); static; class function Extract(var ReadQueue: TLockFreeWorkQueue): TObject; static; end; destructor TLockFreeWorkQueue.Destroy; var i: Integer; begin // Free non-extracted items for i := 0 to QueueCapacity - 1 do Items[i].Free; // Free NextQueue if exists NextQueue.Free; inherited; end; class procedure TLockFreeWorkQueue.Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject); var NewWriteQueue: TLockFreeWorkQueue; begin // Check item assigned (can't add empty items) if not Assigned(Item) or not Assigned(WriteQueue) then Exit; if InterlockedCompareExchangePointer(pointer(WriteQueue.Items[WriteQueue.Tail]), pointer(Item), Nil) = Nil then begin // Added successfully Inc(WriteQueue.Tail); if WriteQueue.Tail = QueueCapacity then WriteQueue.Tail := 0; end else begin // WriteQueue full. Create new chained queue. NewWriteQueue := TLockFreeWorkQueue.Create; NewWriteQueue.Items[0] := Item; Inc(NewWriteQueue.Tail); if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue NewWriteQueue.Tail := 0; InterlockedExchangePointer(pointer(WriteQueue.NextQueue), NewWriteQueue); WriteQueue := NewWriteQueue; end; end; class function TLockFreeWorkQueue.Extract(var ReadQueue: TLockFreeWorkQueue): TObject; var NewReadQueue: TLockFreeWorkQueue; begin Result := Nil; if not Assigned(ReadQueue) then Exit; repeat Result := InterlockedExchangePointer(pointer(ReadQueue.Items[ReadQueue.Head]), Nil); if Result = Nil then begin // No new items in this queue. Check next queue is available NewReadQueue := InterlockedExchangePointer(pointer(ReadQueue.NextQueue), Nil); if Assigned(NewReadQueue) then begin ReadQueue.Free; ReadQueue := NewReadQueue; end else // No new item in queue Exit; end; until Result <> Nil; // Item extracted successfully Inc(ReadQueue.Head); if ReadQueue.Head = QueueCapacity then ReadQueue.Head := 0; end; // Test work thread /////////////////////////////////////////////////////////// const WM_MAINNOTIFY = WM_USER + 1; type TWorkThreadState = class(TLockFreeWorkState) public Progress: Integer; end; TWorkThreadQueueItem = class public ItemData: Integer; end; TWorkThread = class(TThread) protected FMainHandle: THandle; FMainNotified: Integer; // State fields FStateRead: TWorkThreadState; FStateCurrent: TWorkThreadState; FStateWrite: TWorkThreadState; // Queue fields FQueueRead: TLockFreeWorkQueue; FQueueWrite: TLockFreeWorkQueue; // Debug (test) fiels FDebugReadQueue: Boolean; procedure Execute; override; procedure SetState; procedure AddQueueItem(Item: TWorkThreadQueueItem); procedure NotifyMain; public constructor Create(CreateSuspended: Boolean); destructor Destroy; override; function GetState: TWorkThreadState; function ExtractQueueItem: TWorkThreadQueueItem; procedure NotificationProcessed; property MainHandle: THandle read FMainHandle; end; constructor TWorkThread.Create(CreateSuspended: Boolean); begin inherited Create(CreateSuspended); // State objects FStateRead := TWorkThreadState.Create; FStateCurrent := TWorkThreadState.Create; FStateWrite := TWorkThreadState.Create; // Queue objects FQueueRead := TLockFreeWorkQueue.Create; FQueueWrite := FQueueRead; end; destructor TWorkThread.Destroy; begin inherited; FStateRead.Free; FStateCurrent.Free; FStateWrite.Free; // Always destroy read queue only: only read queue may have NextQueue reference FQueueRead.Free; end; procedure TWorkThread.NotifyMain; begin if InterlockedExchange(FMainNotified, 1) = 0 then PostMessage(FMainHandle, WM_MAINNOTIFY, 0, 0); end; procedure TWorkThread.NotificationProcessed; begin InterlockedExchange(FMainNotified, 0); end; function TWorkThread.GetState: TWorkThreadState; begin TLockFreeWorkState.Read(FStateCurrent, FStateRead); Result := FStateRead; end; procedure TWorkThread.SetState; begin TLockFreeWorkState.Write(FStateCurrent, FStateWrite); end; procedure TWorkThread.AddQueueItem(Item: TWorkThreadQueueItem); begin TLockFreeWorkQueue.Add(FQueueWrite, Item); end; function TWorkThread.ExtractQueueItem: TWorkThreadQueueItem; begin Result := TWorkThreadQueueItem(TLockFreeWorkQueue.Extract(FQueueRead)); end; procedure TWorkThread.Execute; const TestQueueCountToFlush = 10; var ProgressIndex: Integer; TestQueueCount: Integer; Item: TWorkThreadQueueItem; begin TestQueueCount := 0; ProgressIndex := 0; while not Terminated do begin // Send current progress if FStateWrite.Progress <> ProgressIndex then begin // All state object fields initialization required FStateWrite.Progress := ProgressIndex; SetState; NotifyMain; end; // Emulate calculation Sleep(500); Inc(ProgressIndex); // Put intermediate result in queue Item := TWorkThreadQueueItem.Create; Item.ItemData := ProgressIndex; AddQueueItem(Item); Inc(TestQueueCount); if TestQueueCount = TestQueueCountToFlush then begin TestQueueCount := 0; // Allow queue reading from main thread FDebugReadQueue := True; NotifyMain; end; end; end; // Test application /////////////////////////////////////////////////////////// type TMain = class protected FHandle: THandle; FThread: TWorkThread; procedure WndProc(var Message: TMessage); public constructor Create; destructor Destroy; override; function Run: Boolean; property Handle: THandle read FHandle; end; var Main: TMain; constructor TMain.Create; begin FHandle := AllocateHWnd(WndProc); FThread := TWorkThread.Create(True); FThread.FMainHandle := Handle; FThread.Start; writeln('Work thread started'); end; destructor TMain.Destroy; begin writeln('Stopping work thread...'); FThread.Free; writeln('Work thread stopped'); DeallocateHWnd(FHandle); inherited; end; procedure TMain.WndProc(var Message: TMessage); var State: TWorkThreadState; Item: TWorkThreadQueueItem; begin if Message.Msg = WM_MAINNOTIFY then begin FThread.NotificationProcessed; State := FThread.GetState; // Show current progress writeln('Work progress ', State.Progress); // Check queue reading allowed if FThread.FDebugReadQueue then begin writeln('Read queue...'); repeat Item := FThread.ExtractQueueItem; try if Assigned(Item) then writeln('Queue item: ', Item.ItemData); finally Item.Free; end; until not Assigned(Item); FThread.FDebugReadQueue := False; end; end else Message.Result := DefWindowProc(Handle, Message.Msg, Message.wParam, Message.lParam); end; function TMain.Run: Boolean; var Msg: TMsg; begin writeln('Start message loop (Ctrl+C to break)'); Result := True; while Result do case Integer(GetMessage(Msg, Handle, 0, 0)) of 0: Break; -1: Result := False; else begin TranslateMessage(Msg); DispatchMessage(Msg); end; end; end; // Console event handler ////////////////////////////////////////////////////// function ConsoleEventProc(CtrlType: DWORD): BOOL; stdcall; begin Result := False; case CtrlType of CTRL_CLOSE_EVENT, CTRL_C_EVENT, CTRL_BREAK_EVENT: if Assigned(Main) then begin PostMessage(Main.Handle, WM_QUIT, 0, 0); Result := True; end; end; end; // Main procedure ///////////////////////////////////////////////////////////// begin {$IFDEF DEBUG} ReportMemoryLeaksOnShutdown := True; {$ENDIF} try SetConsoleCtrlHandler(@ConsoleEventProc, True); Main := TMain.Create; try Main.Run; finally FreeAndNil(Main); end; except on E: Exception do Writeln(E.ClassName, ': ', E.Message); end; end.
      
      







通常の状況では、キューにアイテムが表示されると、できるだけ早くメインストリームによってアイテムを取得する必要があります。 ただし、キューのオーバーフローをテストするために、TWorkThread.FDebugReadQueueフィールドを追加しました。Falseに設定すると、メインスレッドがキューから読み取ることができなくなります(TWorkThread.Executeメソッドでは、定数TestQueueCountToFlush = 10が導入され、メインスレッドは要素を10個追加した後にのみ読み取ることができます)。



残念ながら、テストケースは単純すぎて、読み取り/書き込みユーティリティ関数内でストリームが切り替えられたときに、ストリーム間の読み取り/書き込みの衝突を生成しません。 しかし、ここでは、アルゴリズムのすべてのボトルネックをチェックすることが可能かどうか、およびコードを何に向ける必要があるかはわかりません。



All Articles