Hangfire Queueのサポート

Hangfireは.net(コア)のライブラリで、「fire and forget」の原則に基づいて一部のコードを非同期で実行できます。 このようなコードの例としては、電子メールの送信、ビデオ処理、別のシステムとの同期などがあります。 「ファイアアンドフォーゲット」に加えて、Cron形式のスケジュールされたタスクだけでなく、遅延タスクもサポートされています。







現在、このようなライブラリが多数あります。 Hangfireの利点のいくつかは次のとおりです。









Hangfireとその使用方法に関する多くの優れた記事があるので、あまり詳細に説明しません。 この記事では、いくつかのキュー(またはタスクプール)のサポートを使用する方法、標準の再試行機能を修正する方法、および各キューに個別の構成を持たせる方法について説明します。







(疑似)キューの既存のサポート



重要な注意:タイトルでは、Hangfireはタスクが特定の順序で実行されることを保証しないため、疑似キューという用語を使用しました。 つまり 「先入れ先出し」の原則は適用されず、これに依存しません。 さらに、ライブラリの作成者は、タスクをべき等にすることを推奨しています。 予期しない複数の実行に対して安定しています。 さらに、「キュー」という言葉だけを使用します。 Hangfireは「キュー」という用語を使用します。







Hangfireは単純なキューをサポートしています。 rabbitMQやAzure Service Busなどのメッセージキューシステムの柔軟性はありませんが、多くの場合、幅広いタスクを解決するのに十分です。







各タスクには「キュー」プロパティ、つまり、実行する必要があるキューの名前があります。 デフォルトでは、特に指定しない限り、タスクは「default」という名前でキューに送信されます。 異なるタイプのタスクの実行を個別に管理するには、複数のキューのサポートが必要です。 たとえば、ビデオ処理タスクを「video_queue」キューに入れ、電子メールを「email_queue」キューに配信することができます。 したがって、これら2つのタイプのタスクを個別に実行できます。 ビデオ処理を専用サーバーに移動する場合は、「video_queue」キューを処理するコンソールアプリケーションとして別のHangfireサーバーを実行することにより、これを簡単に行うことができます。







練習に移りましょう



asp.netコアでのHangfireサーバーのセットアップは次のとおりです。







public void Configure(IApplicationBuilder app) { app.UseHangfireServer(new BackgroundJobServerOptions { WorkerCount = 2, Queues = new[] { "email_queue", "video_queue" } }); }
      
      





問題1-再生タスクが「デフォルト」キューに入る



上で述べたように、Hangfireには「デフォルト」と呼ばれるデフォルトのキューがあります。 「video_queue」などのキューに置かれたタスクが失敗し、再試行する必要がある場合、「video_queue」ではなく、再実行のためにデフォルトのキューに送信されます。その結果、タスクはまったく実行されません。必要に応じて、Hangfireサーバーのインスタンス。 この動作は実験的に確立されたもので、おそらくHangfire自体のバグです。







ジョブフィルター



Hangfireでは、ASP.NET MVCのアクションフィルターに原則的に似ている、いわゆるフィルター( ジョブフィルター )の助けを借りて機能を拡張することができます。 実際には、Hangfireの内部ロジックはステートマシンとして実装されています。 これは、プール内のタスクをある状態から別の状態に順次転送するエンジンです(たとえば、作成->エンキュー->処理->成功)。フィルターにより、状態が変わるたびに実行されるタスクを「インターセプト」して操作できます。 フィルターは、単一のメソッド、クラス、またはグローバルに適用できる属性として実装されます。







ジョブパラメータ



ElectStateContextオブジェクトは、引数としてフィルターメソッドに渡されます。 このオブジェクトには、現在のタスクに関する完全な情報が含まれています。 特に、GetJobParameter <>(...)およびSettJobParameter <>(...)メソッドがあります。 ジョブパラメータを使用すると、タスク関連の情報をデータベースに保存できます。 タスクが最初に送信されたキューの名前が保存されるのはジョブパラメータにあります。何らかの理由で、この情報は次の再試行時に無視されます。







解決策



したがって、エラーで終了したタスクがあり、(最初の作成時に割り当てられたものと同じ)適切なキューで再実行するために送信する必要があります。 エラーで完了したタスクの繰り返しは、失敗状態からエンキュー状態への移行です。 問題を解決するには、タスクが「エンキュー」状態になると、タスクが最初に送信されたキューをチェックし、「QueueName」パラメーターを目的の値に入れるフィルターを作成します。







 public class HangfireUseCorrectQueueFilter : JobFilterAttribute, IElectStateFilter { public void OnStateElection(ElectStateContext context) { if (context.CandidateState is EnqueuedState enqueuedState) { var queueName = context.GetJobParameter<string>("QueueName"); if (string.IsNullOrWhiteSpace(queueName)) { context.SetJobParameter("QueueName", enqueuedState.Queue); } else { enqueuedState.Queue = queueName; } } } }
      
      





デフォルトのフィルターをすべてのタスクに(つまりグローバルに)適用するには、次のコードを構成に追加します。







 GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });
      
      





もう1つの小さな問題は、GlobalJobFiltersコレクションにデフォルトでAutomaticRetryAttributeクラスのインスタンスが含まれていることです。 これは、失敗したタスクの再実行を担当する標準フィルターです。 また、元のキューを無視して、タスクを「デフォルト」キューに送信します。 自転車に乗るには、このフィルターをコレクションから削除し、フィルターに繰り返しのタスクを任せる必要があります。 その結果、構成コードは次のようになります。







 var defaultRetryFilter = GlobalJobFilters.Filters .FirstOrDefault(f => f.Instance is AutomaticRetryAttribute); if (defaultRetryFilter != null && defaultRetryFilter.Instance != null) { GlobalJobFilters.Filters.Remove(defaultRetryFilter.Instance); } GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });
      
      





AutomaticRetryAttributeは、試行と試行の間の間隔を自動的に増加させるロジック(次の試行ごとに間隔が増加するロジック)を実装し、GlobalJobFiltersコレクションからAutomaticRetryAttributeを削除することに注意する必要があります( ScheduleAgainLaterメソッドの実装を参照)







そのため、タスクを異なるキューで実行できるようになりました。これにより、異なるマシンで異なるキューを処理するなど、実装を個別に管理できます。 フィルターコレクションからAutomaticRetryAttributeを削除したため、エラーが発生した場合にタスクが繰り返される回数と間隔はわかりません。







問題2-各キューの個別設定



キューごとに間隔と繰り返し回数を個別に構成できるようにしたいだけでなく、一部のキューについて明示的に値を指定しなかった場合は、デフォルト値を適用する必要があります。 これを行うには、別のフィルターを実装し、それをHangfireRetryJobFilter



と呼びます。







理想的には、構成コードは次のようになります。







 GlobalJobFilters.Filters.Add(new HangfireRetryJobFilter { Order = 2, ["email_queue"] = new HangfireQueueSettings { DelayInSeconds = 120, RetryAttempts = 3 }, ["video_queue"] = new HangfireQueueSettings { DelayInSeconds = 60, RetryAttempts = 5 } });
      
      





解決策



これを行うには、最初にHangfireQueueSettings



クラスを追加します。これは、設定のコンテナーとして機能します。







 public sealed class HangfireQueueSettings { public int RetryAttempts { get; set; } public int DelayInSeconds { get; set; } }
      
      





次に、フィルター自体の実装を追加します。エラーの後にタスクが繰り返されると、キューの構成に応じて設定が適用され、再試行回数が監視されます。







 public class HangfireRetryJobFilter : JobFilterAttribute, IElectStateFilter, IApplyStateFilter { private readonly HangfireQueueSettings _defaultQueueSettings = new HangfireQueueSettings { RetryAttempts = 3, DelayInSeconds = 10 }; private readonly IDictionary<string, HangfireQueueSettings> _settings = new Dictionary<string, HangfireQueueSettings>(); public HangfireQueueSettings this[string queueName] { get { return _settings.TryGetValue(queueName, out HangfireQueueSettings queueSettings) ? queueSettings : _defaultQueueSettings; } set { _settings[queueName] = value; } } public void OnStateElection(ElectStateContext context) { if (!(context.CandidateState is FailedState failedState)) { // This filter accepts only failed job state. return; } var retryAttempt = context.GetJobParameter<int>("RetryCount") + 1; var queueName = context.GetJobParameter<string>("QueueName"); if (retryAttempt <= this[queueName].RetryAttempts) { ScheduleAgainLater(context, retryAttempt, failedState, queueName); } else { TransitionToDeleted(context, failedState, queueName); } } public void OnStateApplied( ApplyStateContext context, IWriteOnlyTransaction transaction) { if (context.NewState is ScheduledState && context.NewState.Reason != null && context.NewState.Reason.StartsWith("Retry attempt")) { transaction.AddToSet("retries", context.BackgroundJob.Id); } } public void OnStateUnapplied( ApplyStateContext context, IWriteOnlyTransaction transaction) { if (context.OldStateName == ScheduledState.StateName) { transaction.RemoveFromSet("retries", context.BackgroundJob.Id); } } private void ScheduleAgainLater( ElectStateContext context, int retryAttempt, FailedState failedState, string queueName) { context.SetJobParameter("RetryCount", retryAttempt); var delay = TimeSpan.FromSeconds(this[queueName].DelayInSeconds); const int maxMessageLength = 50; var exceptionMessage = failedState.Exception.Message.Length > maxMessageLength ? failedState.Exception.Message.Substring(0, maxMessageLength - 1) + "…" : failedState.Exception.Message; // If attempt number is less than max attempts, we should // schedule the job to run again later. var reason = $"Retry attempt {retryAttempt} of {this[queueName].RetryAttempts}: {exceptionMessage}"; context.CandidateState = delay == TimeSpan.Zero ? (IState)new EnqueuedState { Reason = reason } : new ScheduledState(delay) { Reason = reason }; } private void TransitionToDeleted( ElectStateContext context, FailedState failedState, string queueName) { context.CandidateState = new DeletedState { Reason = this[queueName].RetryAttempts > 0 ? "Exceeded the maximum number of retry attempts." : "Retries were disabled for this job." }; } }
      
      





コードへの注意: HangfireRetryJobFilter



クラスを実装するとき、 HangfireRetryJobFilter



AutomaticRetryAttribute



クラスが基礎として使用されたため、一部のメソッドの実装はこのクラスの対応するメソッドと部分的に一致します。


問題3-特定のキューにタスクを送信する方法は?



私は、タスクをキューに割り当てる2つの方法を見つけることができました:文書化された方法と-いいえ。







最初の方法 - メソッドの対応する属性をハングアップします







 [Queue("video_queue")] public void SomeMethod() { } BackgroundJob.Enqueue(() => SomeMethod());
      
      





http://docs.hangfire.io/en/latest/background-processing/configuring-queues.html







2番目のメソッド (文書化されていない) BackgroundJobClient



クラスを使用する







 var client = new BackgroundJobClient(); client.Create(() => MyMethod(), new EnqueuedState("video_queue"));
      
      





2番目の方法の利点は、Hangfireに不必要な依存関係を作成せず、どのプロセスでタスクを実行するかを決定できることです。 残念ながら、公式ドキュメントでは、 BackgroundJobClient



クラスとその適用方法についての言及は見つかりませんでした。 ソリューションでは2番目の方法を使用したため、実際にテストされています。







おわりに



この記事では、Hangfireの複数のキューのサポートを使用して、異なるタイプのタスクの処理を分離しました。 キューごとに個別の構成が可能な失敗したタスクを繰り返し、ジョブフィルターを使用してHangfireの機能を拡張するメカニズムを実装し、実行するタスクを目的のキューに送信する方法も学習しました。







この記事が誰かに役立つことを願っています。 コメントさせていただきます。







便利なリンク



Hangfireドキュメント

Hangfireソースコード

Scott Hanselman-ASP.NETでバックグラウンドタスクを実行する方法








All Articles