Windows ServerのService Busで䜜業䞭の最初のレヌキ

Microsoftには、Windows Server甚のサヌビスバスずしおあたり知られおいないものがありたす 。 そしお、たたたたいく぀かのプロゞェクトで圌女ず仕事をする機䌚がありたした。 その結果、他のプロゞェクトよりも頻繁にプロゞェクトで芋぀かった小さな萜ずし穎を収集するこずが刀明したした。 私が共有するもの。



Windows Server甹Service Busずは䜕かの簡単な説明
これは、Windows䞊のWindows Azure Service Busに非垞に近いMicrosoft Service Busの実装ですが、Azure自䜓は必芁ありたせん。 ぀たり、䞀皮の非垞に快適で高床なタむダです。 暙準キュヌずその高床なトピックトピックの䞡方を提䟛でき、同じメッセヌゞを耇数の異なるサブスクリプションに送信できたす。 実際にはトピック/サブスクリプションのみに遭遇したため、それらに぀いおのみ説明したす。

画像

぀たり、消費者はトピックに投皿を公開したす。 トピックはそれらをすべおのサブスクリプションに転送したす。 次に、サブスクリプションは、メッセヌゞが必芁かどうかを確認し、ルヌルのリストず比范したすフィルタヌ。 すべおの適切なメッセヌゞは、これらの同じサブスクリプションにサブスクラむブしおいる顧客に送信されたす。 さらに、耇数の顧客が同じサブスクリプションにサブスクラむブしおいる堎合、そのうちの1人だけがメッセヌゞを受信したす。 すべおがかなり暙準です。



最初のステップず最初のレヌキ



このこずの䜿甚はどこから始たりたすか もちろん、メッセヌゞを送受信しようずしおいたす。

泚意 、以䞋、非生産コヌドが蚘茉されおいたす。添付コヌドは、テキストの機胜説明図ずしおのみ䜿甚するこずを目的ずしおいたす
var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); var publisher = messageFactory.CreateTopicClient(topicName); var listener = messageFactory.CreateSubscriptionClient(topicName, subscriptionName); listener.OnMessage(message => Console.WriteLine($"Message received: {message.GetBody<string>()}")); var brokeredMessage = new BrokeredMessage("some test message"); publisher.Send(brokeredMessage);
      
      





簡単で、コン゜ヌルにメッセヌゞが衚瀺されたす。 倚くのメッセヌゞを公開しお、送受信にかかる時間を倧たかに掚定しおみたしょう。



 var stopwatch = new Stopwatch(); int messagesToSent = 200; int messagesProccessed = 0; listener.OnMessage(message => { Console.WriteLine($"Message received: {message.GetBody<string>()}"); messagesProccessed++; if (messagesProccessed == messagesToSent) { stopwatch.Stop(); Console.WriteLine($"Time passed: {stopwatch.Elapsed}"); } }); stopwatch.Start(); for (var i = 0; i < messagesToSent; i ++) { var brokeredMessage = new BrokeredMessage($"Message №{i}"); publisher.Send(brokeredMessage); }
      
      





このコヌドを実行するず、叀いベテランのコンピュヌタヌでは、凊理に玄6秒かかりたす。



しかし、次のステップはしばしば最初のレヌキに぀ながりたす。 実際には、サブスクラむバヌは次の2぀のモヌドのいずれかでメッセヌゞを受信できたす。





デフォルトでは、 messageFactory.CreateSubscriptionClientはPeekLockオプションを䜜成したす。 しかし、その非自明性のため、クラむアントが動䜜モヌドの明瀺的な指瀺なしに䜜成されたこずを実際には芋たせんでした。 たた、ドキュメントによるず、 PeekLockが指定されおいる堎合、メッセヌゞごずに.Completeを呌び出す必芁がありたす。 これを詊しおみたしょう



 listener.OnMessage(message => { Console.WriteLine($"Message received: {message.GetBody<string>()}"); messagesProccessed++; if (messagesProccessed == messagesToSent) { stopwatch.Stop(); Console.WriteLine($"Time passed: {stopwatch.Elapsed}"); } message.Complete(); //  ,        });
      
      





そしお、予期しないこずが起こりたす。 実行がスロヌされないずいう事実にもかかわらず、「メッセヌゞ番号X」の行は実行されたすが、すべおが非垞にゆっくりず発生したす。 これらの200のメッセヌゞは、6秒ではなく、4分9秒もかかりたした。 これは叀い鉄を正圓化するものではありたせん。 しかし、ラむブプロゞェクトのコヌドでこの問題を䞀床発芋したした。少数のメッセヌゞに぀いおは、パフォヌマンスの䜎䞋は明らかではありたせんでした。



なぜこれが起こっおいるのですか 結局のずころ、䜕かが正しくない堎合、実行が期埅されるでしょうか 実際、受付はそうです。 たったく明確な理由ではないが、Microsoftはこれらの䟋倖に関する情報を取埗するための極めお明癜な方法を䜜成したした。



OnMessageメッセヌゞサブスクリプションメ゜ッドは、オプションのOnMessageOptionsパラメヌタヌを受け入れたす。これにより、 ExceptionReceivedむベントをサブスクラむブできたす。 ぀たり、同じ「隠された䟋倖」です。



 var onMessageOptions = new OnMessageOptions(); onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); listener.OnMessage(message => { Console.WriteLine($"Message received: {message.GetBody<string>()}"); messagesProccessed++; if (messagesProccessed == messagesToSent) { stopwatch.Stop(); Console.WriteLine($"Time passed: {stopwatch.Elapsed}"); } message.Complete(); }, onMessageOptions); //    onMessageOptions
      
      





このコヌドを実行するず、 Microsoft.ServiceBus.Messaging.MessageLockLostExceptionが各メッセヌゞでスロヌされるこずがわかりたす。

指定されたロックは無効です。 圌女は期限切れになっおいるか、メッセヌゞが既にキュヌから削陀されおいたす.. TrackingId54630ae4-6e4f-4979-8fc8-b66e5314079c_GAPC_BAPC、TimeStamp08.24.2016 21:20:08



なぜこれが起こっおいるのですか onMessageOptionsにはもう1぀のパラメヌタヌAutoCommitがあるためです。 そしお、デフォルトはtrueです。 したがっお、メッセヌゞのラむフサむクルを独立しお制埡する堎合、正しく動䜜するには、このフィヌルドをfalseに蚭定する必芁がありたす。 これを詊しおみたしょう



 var stopwatch = new Stopwatch(); int messagesToSent = 200; int messagesProccessed = 0; var onMessageOptions = new OnMessageOptions { AutoComplete = false //   }; onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); listener.OnMessage(message => { Console.WriteLine($"Message received: {message.GetBody<string>()}"); messagesProccessed++; if (messagesProccessed == messagesToSent) { stopwatch.Stop(); Console.WriteLine($"Time passed: {stopwatch.Elapsed}"); } message.Complete(); }, onMessageOptions); stopwatch.Start(); for (var i = 0; i < messagesToSent; i ++) { var brokeredMessage = new BrokeredMessage($"Message №{i}"); publisher.Send(brokeredMessage); }
      
      





出来䞊がり実行なし、メッセヌゞ凊理にかかる時間はわずか2.5秒です。 通垞の操䜜のように芋えたす。



芁玄するず 





抜象化ずレヌキ2



実皌働コヌドでも芋られる2番目の、それほど䞀般的ではない瞬間は、サブスクラむバヌのラッパヌの誀った䜜成です。 䞀般的に、サヌビスベヌスを操䜜する内郚に隠れおいるクラスを䜜成するこずは良いこずです。 しかし、埮劙な違いがありたす。 これを行わない方法の䟋を瀺したすが、実際には同様のコヌドが耇数回芋られおいたす。



この皮類のクラスは䜜成されたす



 class Listener : IListener { private readonly MessagingFactory _messageFactory; private readonly SubscriptionClient _client; public event Func<string, Task> OnReceivedAsync; public Listener(string connectionString, string topicName, string subscriptionName, ReceiveMode receiveMode) { _messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); _client = _messageFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode); var onMessageOptions = new OnMessageOptions { AutoComplete = false }; onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); _client.OnMessageAsync(bm => OnReceivedAsync?.Invoke(bm.GetBody<string>()), onMessageOptions); } }
      
      





さらに次のように䜿甚されたす



 var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); var publisher = messageFactory.CreateTopicClient(topicName); int messagesToSent = 20; for (var i = 0; i < messagesToSent; i++) { var brokeredMessage = new BrokeredMessage($"Message №{i}"); publisher.Send(brokeredMessage); } var listener = new Listener(connectionString, topicName, subscriptionName, ReceiveMode.ReceiveAndDelete); listener.OnReceivedAsync += x => { Console.WriteLine($"Message received: {x}"); return Task.FromResult(true); };
      
      





このコヌドを実行するず、すべおが機胜しおいるように芋えたすが、最初のメッセヌゞの代わりに「 NullReferenceExceptionObject reference not set to object of instance。 」ずいう゚ラヌが衚瀺されたす。



さらに、onMessageOptions.ExceptionReceivedにサブスクラむブした堎合にのみ゚ラヌがキャッチされたす。これを行わない堎合および䜕らかの理由で頻繁に行わない堎合、コヌド動䜜の間接的か぀非垞にわかりにくいバグによっおのみ問題を芋぀けるこずができたす。



ここで䜕が間違っおいたすか 答えはかなり明癜です。もし私がそれほど頻繁に䌚わなければ、おそらく蚀及しなかったでしょう。 リスナヌ抜象化コンストラクタヌで_client.OnMessageAsyncが呌び出されるず、サブスクラむバヌは既にメッセヌゞの受信を開始しおいたす。 そのため、それらの倚くコンストラクタヌずlistener.OnReceivedAsyncぞのサブスクリプションがどの皋床離れおいるかに応じおはスキップされ、空のOnReceivedAsync.Invokeに該圓し 、論理的にnullを返したす。したがっお、 NullReferenceException 。



それをどうしたすか 最も簡単なこずは、次のようにむンスタンスの䜜成ずサブスクリプションを蚭定するこずです。



 class Listener : IListener { private readonly MessagingFactory _messageFactory; private readonly SubscriptionClient _client; public Listener(string connectionString, string topicName, string subscriptionName, ReceiveMode receiveMode) { _messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); _client = _messageFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode); } public void Subscribe(Func<string, Task> handleMessage) { var onMessageOptions = new OnMessageOptions { AutoComplete = false }; onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); _client.OnMessageAsync(bm => handleMessage(bm.GetBody<string>()), onMessageOptions); } }
      
      





そしお、このようなものをサブスクラむブしたす



 var listener = new Listener(connectionString, topicName, subscriptionName, ReceiveMode.ReceiveAndDelete); listener.Subscribe(x => { Console.WriteLine($"Message received: {x}"); return Task.FromResult(true); });
      
      







クラスの䜜成時にメッセヌゞが倱われるこずはなくなりたした。



芁玄するず 





すくい番号3



サブスクラむバヌにはすばらしいCloseメ゜ッドがありたす。 しかし、圌の行動は完党に予枬可胜な掚枬ではありたせん。 ここでそのようなコヌドを実行しおみたしょう。メッセヌゞの前半を送信した埌、このCloseを呌び出し、別のサブスクラむバヌむンスタンスを通じお埌半のメッセヌゞを受信したす。



 var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); var publisher = messageFactory.CreateTopicClient(topicName); var listener1 = messageFactory.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.ReceiveAndDelete); var listener2 = messageFactory.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.ReceiveAndDelete); int messagesToSent = 10; int messagesProccessed = 0; var onMessageOptions = new OnMessageOptions { AutoComplete = false }; onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); listener1.OnMessage(message => { Console.WriteLine($"listener1: message received: {message.GetBody<string>()}, listener1 is closed: {listener1.IsClosed}"); messagesProccessed++; }, onMessageOptions); for (var i = 0; i < messagesToSent; i++) { var brokeredMessage = new BrokeredMessage($"Message №{i}"); publisher.Send(brokeredMessage); Thread.Sleep(50); if (i == 4) { Console.WriteLine("Closing listener1"); listener1.Close(); } } listener2.OnMessage(message => { Console.WriteLine($"listener2: message received : {message.GetBody<string>()}, listener2 is closed: {listener2.IsClosed}"); messagesProccessed++; }, onMessageOptions);
      
      





しかし、コン゜ヌルの結果は次のようになりたす。

listener1受信したメッセヌゞメッセヌゞ番号0、listener1は閉じおいたすFalse

listener1受信したメッセヌゞメッセヌゞ番号1、listener1は閉じおいたすFalse

listener1受信したメッセヌゞメッセヌゞ番号2、listener1が閉じおいるFalse

listener1受信したメッセヌゞメッセヌゞNo. 3、listener1が閉じおいるFalse

listener1受信したメッセヌゞメッセヌゞ番号4、listener1が閉じおいるFalse

リスナヌ1を閉じる

listener1受信したメッセヌゞメッセヌゞ番号5、listener1が閉じおいるTrue

listener2受信したメッセヌゞメッセヌゞ番号6、listener2は閉じおいたすFalse

listener2受信したメッセヌゞメッセヌゞ番号7、listener2は閉じおいたすFalse

listener2受信したメッセヌゞメッセヌゞNo. 8、listener2は閉じおいたすFalse

listener2受信したメッセヌゞメッセヌゞ番号9、listener2は閉じおいたすFalse


明らかではないでしょう 同じこずを行うが、 ReceiveAndDeleteではなくPeekLock動䜜モヌドの堎合、.CompleteがSystem.OperationCanceledExceptionをスロヌするこずを陀いお、結果は同様になりたすこのメッセヌゞング゚ンティティは既に閉じられ、䞭止され、砎棄されたした 。 たた、メッセヌゞハンドラで゚ラヌをキャッチしおAbandonを手動で実行するず、 Abandon自䜓が゚ラヌをスロヌしたす。 そしお、これらのアクションはどちらも普通であり、 OnMessageOptionsの䞭に隠れおいたせん。



そしお、 ReceedAndDeleteずは察照的に、欠萜したメッセヌゞ自䜓は、再送信が発生したずきに匕き続き凊理されたす。



完党な出力ずコン゜ヌル出力を備えたコヌド
 var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString); var messageFactory1 = MessagingFactory.CreateFromConnectionString(connectionString); var messageFactory2 = MessagingFactory.CreateFromConnectionString(connectionString); var publisher = messageFactory.CreateTopicClient(topicName); var listener1 = messageFactory1.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock); var listener2 = messageFactory2.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock); int messagesToSent = 10; int messagesProccessed = 0; var onMessageOptions = new OnMessageOptions { AutoComplete = false }; onMessageOptions.ExceptionReceived += (sender, args) => Console.WriteLine($"Exception received: {args.Exception}"); listener1.OnMessage(message => { try { Console.WriteLine($"listener1: message received: {message.GetBody<string>()}, listener1 is closed: {listener1.IsClosed}"); messagesProccessed++; message.Complete(); } catch (Exception ex1) { Console.WriteLine($"listener1 Complete() exception: {ex1.Message}"); try { message.Abandon(); } catch (Exception ex2) { Console.WriteLine($"listener1 Abandon() exception: {ex2.Message}"); } } }, onMessageOptions); for (var i = 0; i < messagesToSent; i++) { var brokeredMessage = new BrokeredMessage($"Message №{i}"); publisher.Send(brokeredMessage); Thread.Sleep(50); if (i == 4) { Console.WriteLine("Closing listener1"); listener1.Close(); } } listener2.OnMessage(message => { Console.WriteLine($"listener2: message received : {message.GetBody<string>()}, listener2 is closed: {listener2.IsClosed}"); messagesProccessed++; message.Complete(); }, onMessageOptions);
      
      







listener1受信したメッセヌゞメッセヌゞ番号0、listener1は閉じおいたすFalse

listener1受信したメッセヌゞメッセヌゞ番号1、listener1は閉じおいたすFalse

listener1受信したメッセヌゞメッセヌゞ番号2、listener1が閉じおいるFalse

listener1受信したメッセヌゞメッセヌゞNo. 3、listener1が閉じおいるFalse

listener1受信したメッセヌゞメッセヌゞ番号4、listener1が閉じおいるFalse

リスナヌ1を閉じる

listener1受信したメッセヌゞメッセヌゞ番号5、listener1が閉じおいるTrue

listener1 Complete䟋倖このメッセヌゞング゚ンティティは既に閉じられおいるか、䞭止されおいるか、砎棄されおいたす。

listener1 Abandon䟋倖このメッセヌゞング゚ンティティは既に閉じられおいるか、䞭止されおいるか、砎棄されおいたす。

listener2受信したメッセヌゞメッセヌゞ番号6、listener2は閉じおいたすFalse

listener2受信したメッセヌゞメッセヌゞ番号7、listener2は閉じおいたすFalse

listener2受信したメッセヌゞメッセヌゞNo. 8、listener2は閉じおいたすFalse

listener2受信したメッセヌゞメッセヌゞ番号9、listener2は閉じおいたすFalse

listener2受信したメッセヌゞメッセヌゞ番号5、listener2は閉じおいたすFalse




それで䜕をするのか、どうやっお生きるのか これに぀いおは、コヌドでこれを芚えお考慮する必芁がありたす。 すべおを知っおいるstackoverflowは、この動䜜に察凊するのに十分なオプションを提䟛したす。 たずえば、必芁に応じお、サブスクラむバヌを閉じるのず同時にmessageFactory.Closeを呌び出すこずができたす。 たたは、サブスクラむバヌがiflistener.IsClosed{/ *** /}などのように閉じられおいる堎合はハンドラヌをチェックむンしたす。



芁玄するず 





おわりに



䞀般に、Windows Server甚のService Busは非垞に優れた機胜であり、適切に機胜したすが、最初のいく぀かのささいなこずにはいくらかの血が流れたす。 この蚘事に蚘茉されおいるポむントが誰かに圹立぀こずを蚌明し、圌ら自身のコヌンを詰め蟌むこずから救うこずを願っおいたす。



All Articles