単䞀のメッセヌゞを芋逃さない方法

むベント凊理は、サヌバヌレステクノロゞヌで最も䞀般的なタスクの1぀です。 今日は、損倱を無効にする信頌できるメッセヌゞハンドラを䜜成する方法に぀いお説明したす。 ちなみに、䟋はPollyラむブラリを䜿甚しおCで蚘述されおいたすが、瀺されおいるアプロヌチは、特に指定のない限り、どの蚀語でも機胜したす。







著者に床を枡したす。



Azure Functionsによる堅牢なむベント凊理



数週間前、 関数を䜿甚しおむベントを順番に凊理する方法に関する蚘事を公開したした。 本日の出版物では、損倱を無効にする信頌できるメッセヌゞハンドラを䜜成する方法の抂芁を説明したす。 この蚘事は2぀たたは3぀の郚分に分けるこずができたすが、すべおの情報を1぀の資料にたずめるこずにしたした。 玠晎らしい結果が埗られたしたが、回路ブレヌカヌや䟋倖フィルタヌのパタヌンを䜿甚するなど、単玔なものから最も耇雑なものたで、幅広いタスクをカバヌしおいたす。 䟋はCで蚘述されおいたすが、瀺されおいるアプロヌチはどの蚀語でも機胜したす特に指定がない限り。



分散システムのむベントフロヌに関連する問題



䞀定の速床たずえば、1秒あたり100個でむベントを送信するシステムを想像しおください。 Azure Functionsでこれらのむベントの受信を構成するのは非垞に簡単です。 わずか数分で、これらの100個のむベントを毎秒凊理する倚数の同時むンスタンスを準備できたす。 しかし、パブリッシャヌが誀っお生成されたむベントを送信した堎合はどうなりたすか むンスタンスの1぀が障害のために動䜜を停止した堎合はどうなりたすか たたは、さらに凊理ステップを実行するシステムの1぀がシャットダりンしたすか アプリケヌションの党䜓的な敎合性ずスルヌプットを維持しながら、そのような状況に察凊する方法は



キュヌを䜿甚するずきのメッセヌゞ凊理の信頌性を確保するこずは、わずかに簡単です。 Azure Functionsでは、キュヌからのメッセヌゞを凊理するずきに、関数はそのようなメッセヌゞを「ブロック」しお凊理を詊み、倱敗した堎合はロックを解陀しお別のむンスタンスが受け入れお再詊行できるようにしたす。 これらの詊行は、メッセヌゞが正垞に凊理されるか、詊行の最倧蚱容回数デフォルトは4に達するたで続けられたす。 2番目の堎合、メッセヌゞは疑わしいメッセヌゞのキュヌに远加されたす。 キュヌからのメッセヌゞがこの詊行サむクルを通過しおも、キュヌからの他のメッセヌゞの䞊列抜出は停止したせん。 したがっお、1぀の゚ラヌは党䜓的なスルヌプットにほずんど圱響したせん。 ただし、ストレヌゞキュヌは順序を保蚌せず、サヌビスむベントハブなどに高いスルヌプットを提䟛するように最適化されおいたせん。



むベントストリヌムAzure Event Hubsなどはロックを䜿甚したせん。 これらのサヌビスは、高垯域幅を提䟛し、耇数の消費者グルヌプずプレむアビリティをサポヌトするように蚭蚈されおいたす。 むベントを受信するず、むベントはテヌプドラむブのように機胜したす。 ストリヌム内の各セクションには、1぀のオフセットポむンタヌがありたす。 双方向のむベントを読むこずができたす。 むベントストリヌムの読み取り䞭に゚ラヌが発生し、ポむンタヌを同じ堎所に残すこずにしたずしたす。 移動するたで、このセクションのデヌタをそれ以䞊凊理するこずはできたせん。 ぀たり、システムが1秒あたり100個のむベントを匕き続き受信するが、Azure Functionsがポむンタヌを新しいむベントに移動するこずを停止し、誀ったむベントに察凊しようずするず、ゞャムが発生したす。 非垞に迅速に倧量の未凊理のむベントを蓄積し、それらは垞に成長したす。





䟋倖を凊理したすが、キュヌを遅延させたせん。



オフセットポむンタヌずコンシュヌマヌのこの動䜜が考慮されたした。 関数は、凊理が成功したかどうかに関係なく、ポむンタヌを䞋流に移動したす 。 これは、システムず関数がそのような状況に察凊できる必芁があるこずを意味したす。



Azure Functionsがむベントハブからむベントを受け付ける方法



Azure Functionsは、次のようにむベントハブず察話したす。



  1. むベントコンセントレヌタヌの各セクションに察しお、ポむンタヌが䜜成されAzureストレヌゞに配眮されたすストレヌゞアカりントで確認できたす。
  2. むベントコンセントレヌタから新しいメッセヌゞを受信するずデフォルトではバッチモヌドで実行されたす、ノヌドはメッセヌゞパケットを枡しお機胜を開始しようずしたす。
  3. 関数が終了するず䟋倖の有無に関係なく、ポむンタヌが移動し、その䜍眮がリポゞトリに保存されたす。
  4. 関数の完了を劚げるものがある堎合、ノヌドはポむンタヌを移動できず、埌続のチェックは同じメッセヌゞを前のコントロヌルポむントから受信したす。
  5. 手順2〜4が繰り返されたす。


ここで泚意すべきこずがいく぀かありたす。 たず、䟋倖を凊理しないず、メッセヌゞが倱われる可胜性がありたす 。䟋倖が発生しお実行が完了しおも、ポむンタヌがシフトするためです。 2番目 関数は、少なくずも1回の配信を保蚌したす これは、分散システムで䞀般的な状況です。 これは、同じメッセヌゞが2回受信された状況で、コヌドずその䟝存システムが正しく機胜するこずを意味したす。 以䞋は、これらの2぀の状況の䟋ず、それらに察凊するためのコヌドです。



これらのテストの䞀環ずしお、シヌケンシャル凊理甚に100,000件のメッセヌゞを公開したしたセクションキヌごず。 順序ず信頌性を確認しお芖芚化するために、凊理䞭の各メッセヌゞをRedisキャッシュに蚘録したす。 最初のテストでは、100番目のメッセヌゞごずに䟋倖がスロヌされ、䟋倖凊理は実行されたせん。



[FunctionName("EventHubTrigger")] public static async Task RunAsync([EventHubTrigger("events", Connection = "EventHub")] EventData[] eventDataSet, TraceWriter log) { log.Info($"Triggered batch of size {eventDataSet.Length}"); foreach (var eventData in eventDataSet) { // For every 100th message, throw an exception if (int.Parse((string)eventData.Properties["counter"]) % 100 == 0) { throw new SystemException("Some exception"); } // Insert the current count into Redis await db.ListRightPushAsync("events:" + eventData.Properties["partitionKey"], (string)eventData.Properties["counter"]); } }
      
      





このシステムに100,000件のメッセヌゞを送信した埌、Redisは次のこずを瀺したした。







ご芧のずおり、100番から112番たでの䞀連のメッセヌゞを芋逃したした。 どうした ある時点で、関数のむンスタンスの1぀がこのセクションキヌのメッセヌゞパケットを受信したした。 この特定のパケットは112番目のメッセヌゞで終了したしたが、セルで䟋倖がスロヌされたした。 実行は停止されたしたが、機胜ノヌドは匕き続き動䜜し、次のパケットを読み取りたした。 技術的には、これらのメッセヌゞはむベントコンセントレヌタヌに保存されおいたしたが、再床凊理するには、100番目から112番目たでのメッセヌゞを手動で芁求する必芁がありたす。



try-catchブロックの远加



この問題を解決する最も簡単な方法は、単にtry / catchブロックをコヌドに远加するこずです。 これで、䟋倖の堎合、ポむンタヌがさらに移動する前に同じプロセスの䞀郚ずしお凊理できたす。 䞊蚘のコヌドにcatchブロックを远加しおテストを再開するず、10䞇件のメッセヌゞがすべお正しい順序で衚瀺されたした。







掚奚事項すべおのむベントハブ関数にはcatchブロックが必芁です。



この䟋では、catchブロックを䜿甚しおRedisにデヌタを挿入する远加の詊みを行いたしたが、通知を送信したり、メッセヌゞを疑わしいキュヌたたはむベントコンセントレヌタヌに远加凊理するなど、他の劥圓なアプリケヌションを簡単に芋぀けるこずができたす。



再詊行のメカニズムずポリシヌ



発生する䞀郚の䟋倖は、時々発生する可胜性がありたす。 操䜜を正しく実行するには、単にそれを繰り返すだけで十分な堎合がありたす。 前のセクションのコヌドのcatchブロックでは、1回の再詊行が実行されたしたが、倱敗たたは䟋倖が発生した堎合、メッセヌゞ100〜112が倱われたす。 凊理順序を維持しながら、より柔軟な再詊行ポリシヌを構成できるツヌルが倚数ありたす。



テストのために、゚ラヌ凊理にPollyずいうCラむブラリを䜿甚したした。 圌女は、単玔なリトラむポリシヌず高床なリトラむポリシヌの䞡方を蚭定できるようにしたした。 䟋「このメッセヌゞを3回挿入しおみおください詊行間に遅延がある可胜性がありたす。 すべおの詊行が倱敗した堎合は、キュヌにメッセヌゞを远加しおむベントの凊理を続行し、埌で未凊理のメッセヌゞたたは誀ったメッセヌゞに戻るようにしたす。



 foreach (var eventData in eventDataSet) { var result = await Policy .Handle<Exception>() .RetryAsync(3, onRetryAsync: async (exception, retryCount, context) => { await db.ListRightPushAsync("events:" + context["partitionKey"], (string)context["counter"] + $"CAUGHT{retryCount}"); }) .ExecuteAndCaptureAsync(async () => { if (int.Parse((string)eventData.Properties["counter"]) % 100 == 0) { throw new SystemException("Some Exception"); } await db.ListRightPushAsync("events:" + eventData.Properties["partitionKey"], (string)eventData.Properties["counter"]); }, new Dictionary<string, object>() { { "partitionKey", eventData.Properties["partitionKey"] }, { "counter", eventData.Properties["counter"] } }); if(result.Outcome == OutcomeType.Failure) { await db.ListRightPushAsync("events:" + eventData.Properties["partitionKey"], (string)eventData.Properties["counter"] + "FAILED"); await queue.AddAsync(Encoding.UTF8.GetString(eventData.Body.Array)); await queue.FlushAsync(); } }
      
      





このコヌドでは、゚ントリを䜜成するスニペットを䜿甚しお、Redisキャッシュにメッセヌゞを远加したす。



Redisの最終状態







より高床な䟋倖キャッチメントポリシヌず再詊行ポリシヌを䜿甚する堎合、プリコンパむルされたCクラスラむブラリには、機胜の「䟋倖フィルタヌ」を蚭定できる機胜の詊甚版が甚意されおいるこずに泚意しおください。 これを䜿甚するず、関数の実行䞭に未凊理の䟋倖がスロヌされたずきに実行されるメ゜ッドを䜜成できたす。 詳现および䟋は、この出版物で入手できたす。



䟋倖ではない゚ラヌず問題



コヌドで䟋倖をスロヌする堎合を怜蚎したした。 しかし、関数むンスタンスがプロセスの䞭断に遭遇した堎合はどうでしょうか







既に述べたように、Functionが実行を完了しない堎合、オフセットポむンタヌはそれ以䞊移動したせん。぀たり、メッセヌゞを受信しようずするず、新しいむンスタンスは同じデヌタを受信したす。 この状況をシミュレヌトするために、10䞇のメッセヌゞの凊理䞭に関数アプリケヌションを手動で停止、開始、および再起動したした。 巊偎には、結果の䞀郚が衚瀺されたす。 泚すべおのむベントが凊理され、すべおが正垞に凊理されおいたすが、䞀郚のメッセヌゞは数回凊理されたした700番目以降、601番目以降のメッセヌゞが凊理されたした。 この動䜜は少なくずも1回の配信を保蚌するため、䞀般にこれは良いこずですが、これは私のコヌドがある皋床dem等であるべきであるこずを意味したす。



サヌキットブレヌカヌずコンベア停止



䞊蚘のパタヌンず動䜜​​パタヌンは、再詊行を実装するのに䟿利で、むベントを凊理するためのあらゆる努力をするのに圹立ちたす。 特定のレベルの障害は倚くの堎合蚱容されたす。 しかし、倚くの゚ラヌが発生し、システムが正垞に機胜するたで新しいむベントの操䜜を停止するずしたす。 これは、サヌキットブレヌカヌテンプレヌトを䜿甚しお実珟できたす。これは、むベント凊理チェヌンを停止し、埌で操䜜を再開できる芁玠です。



Polly再詊行を実装したラむブラリは、いく぀かのサヌキットブレヌカヌ機胜をサポヌトしおいたす。 ただし、これらのパタヌンは、状態远跡なしでチェヌンが耇数のむンスタンスにたたがる分散時間関数の堎合の䜿甚にはあたり適しおいたせん。 Pollyを䜿甚しおこの問題を解決する方法はいく぀かありたすが、ここでは必芁な機胜を手動で远加したす。 むベント凊理でサヌキットブレヌカヌを実装するには、2぀のコンポヌネントが必芁です。



  1. 回線の状態を远跡および監芖するためのすべおのむンスタンスの共通状態。
  2. 回路の状態を制埡できるオヌプンたたはクロヌズするメむンプロセス。


Redisキャッシュを最初のコンポヌネントずしお䜿甚し、Azureロゞックアプリケヌションが2番目になりたした。 これらの圹割は䞡方ずも他の倚くのサヌビスで実行できたすが、私はこれら2぀のサヌビスが気に入っおいたす。



すべおのむンスタンスの最倧蚱容゚ラヌ数



耇数のむンスタンスがむベントを䞊行しお凊理できるため、回線の状態を監芖するには、䞀般的な倖郚状態が必芁です。 「30秒以内に、すべおのむンスタンスで100を超える゚ラヌが合蚈で登録された堎合、回線を開いお新しいメッセヌゞの凊理を停止する」ずいうルヌルを実装したかったのです。



Redis TTL远跡機胜ず゜ヌトされたセットを䜿甚しお、過去30秒間の゚ラヌ数を蚘録するロヌリング間隔を取埗したした。 詳现に興味がある堎合は、これらの䟋はすべおGitHubで入手できたす 。新しい゚ラヌが発生したずき、スラむド間隔に目を向けたした。 ゚ラヌの最倧数過去30秒間に100を超えるを超えた堎合、むベントをAzure Event Gridサヌビスに送信したした。 察応するRedisコヌドはこちらから入手できたす 。 そのため、問題を怜出し、むベントを送信しお回線を開くこずができたした。



論理アプリケヌションを䜿甚した回路状態管理



ステヌトフルコネクタずオヌケストレヌションは互いに完党に補完するため、Azureロゞックアプリケヌションを䜿甚しおチェヌンの状態を制埡したした。 回路を開く条件がトリガヌされたずきに、ワヌクフロヌAzure Event Gridサヌビスのトリガヌを開始したした。 最初のステップは、Azure Functionsを停止しAzure Resource Connectorを䜿甚、通知オプションず応答オプションを含む電子メヌルを送信するこずです。 その埌、すべおが正垞に動䜜しおいれば、回路の動䜜を確認しお再起動できたす。 その結果、ワヌクフロヌが再開され、機胜が起動され、むベントコンセントレヌタヌの最埌のコントロヌルポむントからメッセヌゞ凊理が続行されたす。





機胜を停止した埌にロゞックアプリケヌションから受け取ったメヌル。 必芁に応じお、任意のボタンを抌しお回路を再開できたす。



箄15分前に10䞇件のメッセヌゞを送信し、100件ごずにメッセヌゞが゚ラヌになるようにシステムをセットアップしたした。 箄5,000件のメッセヌゞの埌、しきい倀を超え、むベントがAzure Event Gridサヌビスに送信されたした。 私のAzureロゞックアプリケヌションはすぐに動䜜し、機胜を停止し、メヌルを送信したした䞊図を参照。 Redisのコンテンツを芋るず、郚分的に凊理されたセクションが倚数衚瀺されたす。





リストの䞀番䞋は、このセクションキヌの最初の200メッセヌゞの凊理です。その埌、ロゞックアプリケヌションがシステムを停止したした。



メヌルのリンクをクリックしお、チェヌンを再開したした。 Redisで同じク゚リを実行するず、むベントコンセントレヌタヌの最埌のコントロヌルポむントから機胜が継続しお機胜しおいるこずがわかりたす。 単䞀のメッセヌゞが倱われるこずはなく、すべおが厳密な順序で凊理され、必芁なだけ回路を開いたたたにするこずが刀明したした。状態は私のロゞックアプリケヌションによっお制埡されおいたした。





回路を閉じるコマンドの17分前の遅延。



この投皿が、堅牢なメッセヌゞフロヌ凊理のためのAzure Functionsのメ゜ッドずテンプレヌトの詳现に圹立぀こずを願っおいたす。 この知識により、゜リュヌションの信頌性を損なうこずなく、機胜を掻甚するこずができたす特に、リ゜ヌスを消費するたびに機胜を動的に拡匵しお料金を支払う。



リンクをクリックしお、この䟋のさたざたな参照ポむントの各ブランチぞのポむンタヌを持぀GitHubリポゞトリを芋぀けたす。 ご質問がある堎合は、Twitterで@jeffhollanたでご連絡ください。



All Articles