時間制限のある並列データ読み込み

いくつかのリモートソースからデータを取得する必要があるが、待機時間が長すぎない場合があります。 たとえば、天気や通貨のデータを読み込むときに、いくつかのサービスに問い合わせて、一定期間回答したすべての人の結果を表示できます。







この期間中に十分な数のサービスが応答しなかった場合は、ダウンロードを待つ時間を追加できます。



合計で、3つの基本的なパラメーターで動作します。





物事を簡単にするために、ローダークラスを作成します。 すべてが非常にシンプルで、最初にリストし、次に説明します:

AsyncDataLoader
public sealed class AsyncDataLoader<T> { /// <summary> ///   . /// </summary> public AsyncDataLoader() { EmergencyPeriod = TimeSpan.Zero; MinResultsCount = 0; } /// <summary> ///   . /// </summary> /// <param name="dataLoaders">,  .</param> /// <param name="loadDataPeriod">,       .</param> public AsyncDataLoader(IEnumerable<Func<T>> dataLoaders, TimeSpan loadDataPeriod) : this(dataLoaders, loadDataPeriod, 0, TimeSpan.Zero) { } /// <summary> ///   . /// </summary> /// <param name="dataLoaders">,  .</param> /// <param name="loadDataPeriod">,       .</param> /// <param name="minimalResultsCount">    .</param> /// <param name="emergencyPeriod">,       .</param> public AsyncDataLoader(IEnumerable<Func<T>> dataLoaders, TimeSpan loadDataPeriod, int minimalResultsCount, TimeSpan emergencyPeriod) { DataLoaders = dataLoaders; LoadDataPeriod = loadDataPeriod; EmergencyPeriod = emergencyPeriod; MinResultsCount = minimalResultsCount; } /// <summary> ///    ,        ,     . /// </summary> public TimeSpan EmergencyPeriod { get; set; } /// <summary> ///       . /// </summary> public int MinResultsCount { get; set; } /// <summary> ///    ,  . /// </summary> public IEnumerable<Func<T>> DataLoaders { get; set; } /// <summary> ///    ,       . /// </summary> public TimeSpan LoadDataPeriod { get; set; } /// <summary> ///       . /// </summary> public bool SkipDefaultResults { get; set; } /// <summary> ///      . /// </summary> /// <returns> .</returns> public async Task<T[]> GetResultsAsync() { BlockingCollection<T> results = new BlockingCollection<T>(); List<Task> tasks = new List<Task>(); tasks.AddRange(DataLoaders.Select(handler => Task.Factory.StartNew(() => { T result = handler.Invoke(); results.Add(result); }, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness, TaskScheduler.Default))); bool isAllCompleted = true; try { CancellationTokenSource source = new CancellationTokenSource(LoadDataPeriod); CancellationToken token = source.Token; #if DEBUG token = CancellationToken.None; //    #endif await Task.Factory.ContinueWhenAll(tasks.ToArray(), (t) => { }, token); } catch (TaskCanceledException ex) //  ?  . { isAllCompleted = false; } if (!isAllCompleted && EmergencyPeriod > TimeSpan.Zero) //     { Func<bool> isReadyHandler = () => results.Count >= MinResultsCount; //,  ,     . await WaitWhenReady(isReadyHandler, EmergencyPeriod); } if (SkipDefaultResults) return results.Where(r => !object.Equals(r, default(T))).ToArray(); return results.ToArray(); } /// <summary> ///    . /// </summary> /// <param name="isReadyValidator">, ,   .</param> /// <param name="totalDelay"> .</param> /// <param name="iterationsCount">   .</param> private async Task WaitWhenReady(Func<bool> isReadyValidator, TimeSpan totalDelay, int iterationsCount = 7) { if (isReadyValidator()) return; double milliseconds = totalDelay.TotalMilliseconds / iterationsCount; TimeSpan delay = TimeSpan.FromMilliseconds(milliseconds); for (int i = 0; i < iterationsCount; i++) { if (isReadyValidator()) return; await Task.Delay(delay); } } }
      
      







GetResultsAsyncの本文:

  1. 結果を保存するコレクションを作成します。 BlockingCollectionクラスは、異なるスレッドと対話するときに安全です。
  2. 各ハンドラーを個別のタスクに配置します。 すべてのタスクをリストにグループ化し、スケジューラに長時間実行について警告し(TaskCreationOptions.LongRunning)、優先度を追加するように依頼します(TaskCreationOptions.PreferFairness)。
  3. 実行時にすべてのタスクを開始し、時間制限を設定します。
  4. 必要に応じて、データをダウンロードするための追加の時間を与えます。
  5. SkipDefaultResults == trueフラグの場合、戻る前に空の結果をスキップします。


デバッグバージョンの場合、デバッグされた関数のコードをウォークスルーできるように、時間制限を強制的に無効にします。

参照:




All Articles