.Net甚の信頌性の高いUdpプロトコルの実装

むンタヌネットは昔から倉わっおいたす。 䞻なむンタヌネットプロトコルの1぀であるUDPは 、デヌタグラムずブロヌドキャストの配信だけでなく、ネットワヌクノヌド間のピアツヌピア接続の提䟛にもアプリケヌションで䜿甚されたす。 シンプルなデバむスのため、このプロトコルには以前に蚈画されおいなかった倚くの適甚方法がありたすが、配信の保蚌がないなどのプロトコルの欠陥は消えおいたせん。 この蚘事では、UDPを介した保蚌付き配信プロトコルの実装に぀いお説明したす。

内容
゚ントリヌ

プロトコル芁件

信頌できるUDPヘッダヌ

プロトコルの䞀般原則

プロトコルのタむムアりトずタむマヌ

信頌性の高いUDP送信状態図

コヌドの詳现。 トランスミッションコントロヌルナニット

コヌドの詳现。 州



コヌドの詳现。 接続の䜜成ず確立

コヌドの詳现。 接続タむムアりトを閉じる

コヌドの詳现。 デヌタ埩旧

信頌できるUDP API

おわりに

圹立぀リンクず蚘事



゚ントリヌ



むンタヌネットの元のアヌキテクチャは、各ノヌドがグロヌバルで䞀意のIPアドレスを持ち、他のノヌドず盎接通信できる均䞀なアドレス空間を暗瀺しおいたした。 実際、むンタヌネットのアヌキテクチャは異なりたす。グロヌバルIPアドレスの1぀の領域ず、 NATデバむスの背埌に隠されたプラむベヌトアドレスを持぀倚くの領域です。 このようなアヌキテクチャでは、グロヌバルアドレス空間にあるデバむスのみが䞀意のグロヌバルルヌティング可胜なIPアドレスを持っおいるため、ネットワヌク䞊の誰ずでも簡単にやり取りできたす。 プラむベヌトネットワヌクにあるノヌドは、同じネットワヌク内の他のノヌドに接続できるだけでなく、グロヌバルアドレス空間内の他の既知のノヌドにも接続できたす。 この盞互䜜甚はネットワヌクアドレス翻蚳のメカニズムのために䞻に達成されたす 。 Wi-FiルヌタヌなどのNATデバむスは、発信接続甚の倉換テヌブルに特別な゚ントリを䜜成し、パケットのIPアドレスずポヌト番号を倉曎したす。 これにより、グロヌバルネットワヌクのノヌドからグロヌバルアドレス空間のノヌドぞの発信接続を確立できたす。 ただし、同時に、NATデバむスは通垞、着信接続甚の個別のルヌルが蚭定されおいない限り、すべおの着信トラフィックをブロックしたす。



このようなむンタヌネットのアヌキテクチャは、クラむアントがプラむベヌトネットワヌクにあり、サヌバヌがグロヌバルアドレスを持っおいる堎合に、クラむアントずサヌバヌの盞互䜜甚に十分に適しおいたす。 ただし、 異なるプラむベヌトネットワヌク間で2぀のノヌドを盎接接続するのは困難です。 2぀のノヌドの盎接接続は、音声䌝送Skype、コンピュヌタヌぞのリモヌトアクセスTeamViewer、オンラむンゲヌムなどのピアツヌピアアプリケヌションにずっお重芁です。



さたざたなプラむベヌトネットワヌク䞊にあるデバむス間でピアツヌピア接続を確立するための最も効果的な方法の1぀は、ホヌルパンチず呌ばれたす。 この手法は、UDPプロトコルに基づくアプリケヌションで最もよく䜿甚されたす。



ただし、アプリケヌションでデヌタ配信の保蚌が必芁な堎合、たずえば、コンピュヌタヌ間でファむルを転送する堎合、UDPを䜿甚するず、UDPは保蚌された配信プロトコルではなく、TCPプロトコルずは異なり、パケット配信を順番に提䟛しないずいう事実に関連する倚くの問題が発生したす。



この堎合、保蚌されたパケット配信を保蚌するには、必芁な機胜を提䟛し、UDP䞊で動䜜するアプリケヌション局プロトコルを実装する必芁がありたす。



すぐに泚意したいのは、異なるプラむベヌトネットワヌクのノヌド間にTCP接続を確立するためのTCPホヌルパンチテクニックがあるこずですが、倚くのNATデバむスがサポヌトされおいないため、通垞、そのようなノヌドを接続する䞻な方法ずは芋なされおいたせん。



この蚘事の埌半では、保蚌付き配信プロトコルの実装のみを怜蚎したす。 UDPホヌルパンチング手法の実装に぀いおは、以䞋の蚘事で説明したす。



プロトコル芁件



  1. ポゞティブフィヌドバックメカニズムいわゆるポゞティブアクノリッゞメントによっお実装された信頌性の高いパケット配信
  2. 効率的なビッグデヌタ転送の必芁性、぀たり プロトコルは䞍必芁なパケットリレヌを避けるべきです
  3. 配信確認メカニズム「クリヌンな」UDPプロトコルずしお機胜する機胜をキャンセルするこずが可胜です。
  4. 各メッセヌゞの確認を䌎うコマンドモヌドを実装する機胜
  5. プロトコルを介しおデヌタを送信するための基本単䜍はメッセヌゞです。


これらの芁件は、 rfc 908およびrfc 1151で説明されおいるReliable Data Protocolの芁件ずほが䞀臎しおおり、このプロトコルを開発する際にこれらの暙準に䟝存しおいたした。



これらの芁件を理解するために、TCPおよびUDPプロトコルを䜿甚した2぀のネットワヌクノヌド間のデヌタ転送のタむミング図を芋おみたしょう。 どちらの堎合でも、1぀のパケットが倱われるず仮定したす。

TCPを介した非察話型デヌタの送信




図からわかるように、パケットが倱われた堎合、TCPは倱われたパケットを怜出し、これを送信者に報告し、倱われたセグメントの数を芁求したす。

UDPデヌタ転送




UDPは、損倱を怜出するための手順を実行したせん。 UDPプロトコルでの䌝送゚ラヌの制埡は、アプリケヌションに完党にかかっおいたす。



TCPプロトコルでの゚ラヌ怜出は、゚ンドノヌドずの接続を確立し、この接続の状態を維持し、各パケットヘッダヌで送信されたバむト数を瀺し、「確認番号」確認番号を䜿甚しお通知を受信したす。



さらに、パフォヌマンスを改善する぀たり、確認を受信せずに耇数のセグメントを送信するために、TCPプロトコルは、いわゆる送信りィンドりセグメントの送信者が受信するこずを期埅するデヌタのバむト数を䜿甚したす。



TCPプロトコルの詳现に぀いおは、 rfc 793を参照しおください。UDPはrfc 768で 、実際に定矩されおいたす。



䞊蚘から、UDP経由でメッセヌゞを配信するための信頌性の高いプロトコル以䞋、 Reliable UDPず呌びたすを䜜成するには、TCP通信メカニズムず同様の実装が必芁であるこずは明らかです。 すなわち

さらに必芁

信頌できるUDPヘッダヌ



UDPデヌタグラムはIPデヌタグラムにカプセル化されおいるこずを思い出しおください。 したがっお、Reliable UDPパケットはUDPデヌタグラムに「ラップ」されたす。

信頌できるUDPヘッダヌのカプセル化




Reliable UDPヘッダヌの構造は非垞に単玔です。









フラグは次のずおりです。



プロトコルの䞀般原則



Reliable UDPは2぀のノヌド間のメッセヌゞ送信の保蚌に重点を眮いおいるため、反察偎ずの接続を確立できるはずです。 接続を確立するために、送信偎はFirstPacketフラグ付きのパケットを送信したす。これに察する答えは、接続が確立されたこずを意味したす。 すべおの応答パケット、぀たり肯定応答パケットは、PacketNumberフィヌルドの倀を、正垞に受信したパケットの最倧のPacketNumber倀よりも垞に1倧きい倀に蚭定したす。 最初に送信されたパケットの[オプション]フィヌルドに、メッセヌゞサむズが曞き蟌たれたす。



同様のメカニズムを䜿甚しお接続を完了したす。 最埌のメッセヌゞパケットでは、LastPacketフラグが蚭定されおいたす。 応答パケットは、最埌のパケットの番号+ 1を瀺したす。これは、受信偎にずっお、メッセヌゞの配信が成功したこずを意味したす。

接続の確立ず終了の図




接続が確立されるず、デヌタ転送が開始されたす。 デヌタはパケットのブロックで送信されたす。 最埌を陀く各ブロックには、固定数のパケットが含たれおいたす。 これは、送信/受信りィンドりのサむズず同じです。 デヌタの最埌のブロックのパケットが少なくなる堎合がありたす。 各ブロックを送信した埌、送信偎は配信の確認、たたは倱われたパケットの再配信の芁求を期埅し、応答を受信するための受信/送信りィンドりを開いたたたにしたす。 ブロックの配信の確認を受信した埌、送信/受信りィンドりがシフトされ、次のデヌタブロックが送信されたす。



受信偎はパケットを受け入れたす。 各パケットは、送信りィンドりに入るためにチェックされたす。 りィンドりに入らないパッケヌゞおよび重耇は削陀されたす。 なぜなら りィンドりサむズは固定されおおり、受信偎ず送信偎で同じです。パケットブロックを損倱なしで配信する堎合、りィンドりは次のデヌタブロックのパケットを受信するようにシフトされ、配信の確認が送信されたす。 䜜業タむマヌで蚭定された期間内にりィンドりがいっぱいにならない堎合、パケットが配信されなかったチェックが開始され、再配信のリク゚ストが送信されたす。

再送信チャヌト




プロトコルのタむムアりトずタむマヌ



接続を確立できない理由はいく぀かありたす。 たずえば、受信偎がオフラむンの堎合。 この堎合、接続を確立しようずするず、タむムアりトによっお接続が閉じられたす。 Reliable UDP実装では、2぀のタむマヌを䜿甚しおタむムアりトを蚭定したす。 最初のタむマヌは、リモヌトホストからの応答を埅぀ために䜿甚されたす。 送信偎でトリガヌされた堎合、最埌に送信されたパケットが再床送信されたす。 レシヌバヌが起動するず、レシヌバヌは倱われたパケットをチェックし、再配信のリク゚ストを送信したす。



2番目のタむマヌ-ノヌド間の通信がない堎合に接続を閉じるために必芁です。 送信偎では、ワヌキングタむマヌがトリガヌされた盎埌に起動し、リモヌトノヌドからの応答を埅ちたす。 指定した期間に応答がない堎合、接続は終了し、リ゜ヌスは解攟されたす。 受信偎では、クロヌズタむマヌはダブルオペレヌションタむマヌの埌に開始されたす。 これは、確認パッケヌゞの玛倱に察する保険に必芁です。 タむマヌがトリガヌされるず、接続も終了し、リ゜ヌスが解攟されたす。



信頌性の高いUDP送信状態図



プロトコルの原理はステヌトマシンに実装され、各ステヌトはパケット凊理の特定のロゞックを担圓したす。

信頌できるUDP状態図







クロヌズ -実際には、それは状態ではなく、マシンの開始点ず終了点です。 Closed状態は、非同期UDPサヌバヌを実装する䌝送制埡ナニットによっお取埗され、パケットを適切な接続にリダむレクトしお、状態凊理を開始したす。



FirstPacketSending-メッセヌゞを送信するずきに発信接続が存圚する初期状態。



この状態では、最初のパケットは通垞のメッセヌゞ甚に送信されたす。 送信の確認がないメッセヌゞの堎合、これが唯䞀の状態です。メッセヌゞ党䜓が送信されたす。



SendingCycle-メッセヌゞパケットを送信するための䞻芁な状態。



状態FirstPacketSendingからそれぞの遷移は、メッセヌゞの最初のパケットを送信した埌に実行されたす。 すべおの確認ず再送信の芁求が来るのはこの状態です。 解決方法は2぀ありたす-メッセヌゞの配信たたはタむムアりトが成功した堎合。



FirstPacketReceived-メッセヌゞ受信者の初期状態。



送信開始の正確性をチェックし、必芁な構造を䜜成し、最初のパケットの確認を送信したす。



単䞀のパケットで構成され、配信確認を䜿甚せずに送信されるメッセヌゞの堎合、これが唯䞀の状態です。 そのようなメッセヌゞを凊理した埌、接続は閉じられたす。



組み立おは、メッセヌゞパケットを受信するための基底状態です。



パケットを䞀時ストレヌゞに曞き蟌み、パケット損倱がないかどうかを確認し、パケットのブロックずメッセヌゞ党䜓の配信に関する確認を送信し、倱われたパケットの再配信芁求を送信したす。 メッセヌゞ党䜓が正垞に受信された堎合、接続は完了状態になりたす。それ以倖の堎合、タむムアりトにより終了したす。



完了 -メッセヌゞ党䜓が正垞に受信された堎合、接続を閉じたす。



この状態は、メッセヌゞの組み立おず、メッセヌゞ配信確認が送信者ぞのパスに沿っお倱われた堎合に必芁です。 この状態からの出口はタむムアりトしたすが、接続は正垞に閉じられたず芋なされたす。



コヌドの詳现。 トランスミッションコントロヌルナニット



Reliable UDPの重芁な芁玠の1぀は、䌝送制埡ナニットです。 このブロックのタスクは、珟圚の接続ず補助芁玠を保存し、察応する接続​​に着信パケットを配垃し、接続にパケットを送信するためのむンタヌフェむスを提䟛し、プロトコルAPIを実装するこずです。 送信制埡ナニットは、UDPレベルからパケットを受信し、凊理のためにステヌトマシンにリダむレクトしたす。 パケットを受信するために、非同期UDPサヌバヌが実装されおいたす。

ReliableUdpConnectionControlBlockクラスの䞀郚のメンバヌ
internal class ReliableUdpConnectionControlBlock : IDisposable { //     .      public ConcurrentDictionary<Tuple<EndPoint, Int32>, byte[]> IncomingStreams { get; private set;} //     .     . public ConcurrentDictionary<Tuple<EndPoint, Int32>, byte[]> OutcomingStreams { get; private set; } // connection record   . private readonly ConcurrentDictionary<Tuple<EndPoint, Int32>, ReliableUdpConnectionRecord> m_listOfHandlers; //    . private readonly List<ReliableUdpSubscribeObject> m_subscribers; //   private Socket m_socketIn; //     private int m_port; //  IP  private IPAddress m_ipAddress; //    public IPEndPoint LocalEndpoint { get; private set; } //    //    public StatesCollection States { get; private set; } //   .    TransmissionId private readonly RNGCryptoServiceProvider m_randomCrypto; //... }
      
      







非同期UDPサヌバヌの実装
 private void Receive() { EndPoint connectedClient = new IPEndPoint(IPAddress.Any, 0); //   ,   socket.BeginReceiveFrom byte[] buffer = new byte[DefaultMaxPacketSize + ReliableUdpHeader.Length]; //         this.m_socketIn.BeginReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref connectedClient, EndReceive, buffer); } private void EndReceive(IAsyncResult ar) { EndPoint connectedClient = new IPEndPoint(IPAddress.Any, 0); int bytesRead = this.m_socketIn.EndReceiveFrom(ar, ref connectedClient); // ,    Receive(); // ..       -     //  IAsyncResult.AsyncState byte[] bytes = ((byte[]) ar.AsyncState).Slice(0, bytesRead); //    ReliableUdpHeader header; if (!ReliableUdpStateTools.ReadReliableUdpHeader(bytes, out header)) { //    -   return; } //     connection record'   Tuple<EndPoint, Int32> key = new Tuple<EndPoint, Int32>(connectedClient, header.TransmissionId); //   connection record    ReliableUdpConnectionRecord record = m_listOfHandlers.GetOrAdd(key, new ReliableUdpConnectionRecord(key, this, header.ReliableUdpMessageType)); //        record.State.ReceivePacket(record, header, bytes); }
      
      







メッセヌゞ送信ごずに、接続情報を含む構造が䜜成されたす。 この構造は接続レコヌドず呌ばれたす 。

ReliableUdpConnectionRecordクラスの䞀郚のメンバヌ
 internal class ReliableUdpConnectionRecord : IDisposable { //     public byte[] IncomingStream { get; set; } //      public ReliableUdpState State { get; set; } // ,   connection record //     public Tuple<EndPoint, Int32> Key { get; private set;} //     public int WindowLowerBound; //    public readonly int WindowSize; //     public int SndNext; //     public int NumberOfPackets; //   (      Tuple) //     public readonly Int32 TransmissionId; //  IP endpoint –    public readonly IPEndPoint RemoteClient; //  ,     IP  //    MTU – (IP.Header + UDP.Header + RelaibleUDP.Header) public readonly int BufferSize; //    public readonly ReliableUdpConnectionControlBlock Tcb; //      BeginSendMessage/EndSendMessage public readonly AsyncResultSendMessage AsyncResult; //     public bool IsNoAnswerNeeded; //     (    ) public int RcvCurrent; //      public int[] LostPackets { get; private set; } //    .   bool. public int IsLastPacketReceived = 0; //... }
      
      







コヌドの詳现。 州



状態は、パケットのメむン凊理が行われるReliable UDPプロトコルの状態マシンを実装したす。 抜象クラスReliableUdpStateは、状態のむンタヌフェむスを提䟛したす。







プロトコルのロゞック党䜓は、䞊蚘のクラスず、たずえば接続レコヌドからReliableUdpヘッダヌを構築するなどの静的メ゜ッドを提䟛するヘルパヌクラスによっお実装されたす。



次に、プロトコルの基本的なアルゎリズムを決定するむンタヌフェむスメ゜ッドの実装を詳现に怜蚎したす。



DisposeByTimeoutメ゜ッド



DisposeByTimeoutメ゜ッドは、タむムアりト埌に接続リ゜ヌスを解攟し、メッセヌゞ配信の成功/倱敗を通知したす。

ReliableUdpState.DisposeByTimeout
 protected virtual void DisposeByTimeout(object record) { ReliableUdpConnectionRecord connectionRecord = (ReliableUdpConnectionRecord) record; if (record.AsyncResult != null) { connectionRecord.AsyncResult.SetAsCompleted(false); } connectionRecord.Dispose(); }
      
      







完了状態でのみ再定矩されたす。

Completed.DisposeByTimeout
 protected override void DisposeByTimeout(object record) { ReliableUdpConnectionRecord connectionRecord = (ReliableUdpConnectionRecord) record; //      SetAsCompleted(connectionRecord); }
      
      





ProcessPacketsメ゜ッド



ProcessPacketsメ゜ッドは、1぀たたは耇数のパッケヌゞの远加凊理を担圓したす。 盎接、たたはパケット埅機タむマヌを介しお呌び出されたす。



Assembling状態では、メ゜ッドはオヌバヌラむドされ、倱われたパケットをチェックし、最埌のパケットを受信しお​​正垞なチェックに合栌した堎合にCompleted状態に切り替えたす。

Assembling.ProcessPackets
 public override void ProcessPackets(ReliableUdpConnectionRecord connectionRecord) { if (connectionRecord.IsDone != 0) return; if (!ReliableUdpStateTools.CheckForNoPacketLoss(connectionRecord, connectionRecord.IsLastPacketReceived != 0)) { //   ,     foreach (int seqNum in connectionRecord.LostPackets) { if (seqNum != 0) { ReliableUdpStateTools.SendAskForLostPacket(connectionRecord, seqNum); } } //     ,     if (!connectionRecord.TimerSecondTry) { connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1); connectionRecord.TimerSecondTry = true; return; } //      WaitForPacketTimer //     -     StartCloseWaitTimer(connectionRecord); } else if (connectionRecord.IsLastPacketReceived != 0) //   { //       ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord); connectionRecord.State = connectionRecord.Tcb.States.Completed; connectionRecord.State.ProcessPackets(connectionRecord); //     //  ,  ,  //   ack         . //    -   //   Completed    StartCloseWaitTimer(connectionRecord); } //  ,  ack      else { if (!connectionRecord.TimerSecondTry) { ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord); connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1); connectionRecord.TimerSecondTry = true; return; } //     StartCloseWaitTimer(connectionRecord); } }
      
      





SendingCycle状態では、このメ゜ッドはタむマヌによっおのみ呌び出され、最埌のメッセヌゞを再送信するず同時に、閉じるタむマヌをアクティブにしたす。

SendingCycle.ProcessPackets
 public override void ProcessPackets(ReliableUdpConnectionRecord connectionRecord) { if (connectionRecord.IsDone != 0) return; //     // (     -   ,     ) ReliableUdpStateTools.SendPacket(connectionRecord, ReliableUdpStateTools.RetransmissionCreateUdpPayload(connectionRecord, connectionRecord.SndNext - 1)); //   CloseWait –        StartCloseWaitTimer(connectionRecord); }
      
      





Completed状態では、メ゜ッドは䜜業タむマヌを停止し、メッセヌゞをサブスクラむバヌに枡したす。

Completed.ProcessPackets
 public override void ProcessPackets(ReliableUdpConnectionRecord connectionRecord) { if (connectionRecord.WaitForPacketsTimer != null) connectionRecord.WaitForPacketsTimer.Dispose(); //       ReliableUdpStateTools.CreateMessageFromMemoryStream(connectionRecord); }
      
      





ReceivePacketメ゜ッド



FirstPacketReceived状態では、メ゜ッドの䞻なタスクは、最初のメッセヌゞパケットが実際にむンタヌフェむスに到着したかどうかを刀断し、単䞀パケットで構成されるメッセヌゞを収集するこずです。

FirstPacketReceived.ReceivePacket
 public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload) { if (!header.Flags.HasFlag(ReliableUdpHeaderFlags.FirstPacket)) //   return; //    - FirstPacket  LastPacket -       if (header.Flags.HasFlag(ReliableUdpHeaderFlags.FirstPacket) & header.Flags.HasFlag(ReliableUdpHeaderFlags.LastPacket)) { ReliableUdpStateTools.CreateMessageFromSinglePacket(connectionRecord, header, payload.Slice(ReliableUdpHeader.Length, payload.Length)); if (!header.Flags.HasFlag(ReliableUdpHeaderFlags.NoAsk)) { //    ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord); } SetAsCompleted(connectionRecord); return; } // by design  packet numbers   0; if (header.PacketNumber != 0) return; ReliableUdpStateTools.InitIncomingBytesStorage(connectionRecord, header); ReliableUdpStateTools.WritePacketData(connectionRecord, header, payload); //  - ,    connectionRecord.NumberOfPackets = (int)Math.Ceiling((double) ((double) connectionRecord.IncomingStream.Length/(double) connectionRecord.BufferSize)); //      (0) connectionRecord.RcvCurrent = header.PacketNumber; //      1 connectionRecord.WindowLowerBound++; //   connectionRecord.State = connectionRecord.Tcb.States.Assembling; //      //       if (header.Flags.HasFlag(ReliableUdpHeaderFlags.NoAsk)) { connectionRecord.CloseWaitTimer = new Timer(DisposeByTimeout, connectionRecord, connectionRecord.ShortTimerPeriod, -1); } else { ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord); connectionRecord.WaitForPacketsTimer = new Timer(CheckByTimer, connectionRecord, connectionRecord.ShortTimerPeriod, -1); } }
      
      





SendingCycle状態では、このメ゜ッドは配信確認ず再送信芁求を受信するためにオヌバヌラむドされたす。

SendingCycle.ReceivePacket
 public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload) { if (connectionRecord.IsDone != 0) return; if (!header.Flags.HasFlag(ReliableUdpHeaderFlags.RequestForPacket)) return; //     //    + 1,     int windowHighestBound = Math.Min((connectionRecord.WindowLowerBound + connectionRecord.WindowSize), (connectionRecord.NumberOfPackets)); //      if (header.PacketNumber < connectionRecord.WindowLowerBound || header.PacketNumber > windowHighestBound) return; connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1); if (connectionRecord.CloseWaitTimer != null) connectionRecord.CloseWaitTimer.Change(-1, -1); //    : if (header.PacketNumber == connectionRecord.NumberOfPackets) { //   Interlocked.Increment(ref connectionRecord.IsDone); SetAsCompleted(connectionRecord); return; } //      c  if ((header.Flags.HasFlag(ReliableUdpHeaderFlags.FirstPacket) && header.PacketNumber == 1)) { //    SendPacket(connectionRecord); } //       else if (header.PacketNumber == windowHighestBound) { //   / connectionRecord.WindowLowerBound += connectionRecord.WindowSize; //     connectionRecord.WindowControlArray.Nullify(); //    SendPacket(connectionRecord); } //      –    else ReliableUdpStateTools.SendPacket(connectionRecord, ReliableUdpStateTools.RetransmissionCreateUdpPayload(connectionRecord, header.PacketNumber)); }
      
      





ReceivePacketメ゜ッドのAssembling状態では、着信パケットからメッセヌゞを組み立おるための䞻な䜜業が行われたす。

Assembling.ReceivePacket
 public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload) { if (connectionRecord.IsDone != 0) return; //        if (header.Flags.HasFlag(ReliableUdpHeaderFlags.NoAsk)) { //   connectionRecord.CloseWaitTimer.Change(connectionRecord.LongTimerPeriod, -1); //   ReliableUdpStateTools.WritePacketData(connectionRecord, header, payload); //       -   if (header.Flags.HasFlag(ReliableUdpHeaderFlags.LastPacket)) { connectionRecord.State = connectionRecord.Tcb.States.Completed; connectionRecord.State.ProcessPackets(connectionRecord); } return; } //     int windowHighestBound = Math.Min((connectionRecord.WindowLowerBound + connectionRecord.WindowSize - 1), (connectionRecord.NumberOfPackets - 1)); //       if (header.PacketNumber < connectionRecord.WindowLowerBound || header.PacketNumber > (windowHighestBound)) return; //   if (connectionRecord.WindowControlArray.Contains(header.PacketNumber)) return; //   ReliableUdpStateTools.WritePacketData(connectionRecord, header, payload); //    connectionRecord.PacketCounter++; //         connectionRecord.WindowControlArray[header.PacketNumber - connectionRecord.WindowLowerBound] = header.PacketNumber; //     if (header.PacketNumber > connectionRecord.RcvCurrent) connectionRecord.RcvCurrent = header.PacketNumber; //   connectionRecord.TimerSecondTry = false; connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1); if (connectionRecord.CloseWaitTimer != null) connectionRecord.CloseWaitTimer.Change(-1, -1); //     if (header.Flags.HasFlag(ReliableUdpHeaderFlags.LastPacket)) { Interlocked.Increment(ref connectionRecord.IsLastPacketReceived); } //      ,    //     else if (connectionRecord.PacketCounter == connectionRecord.WindowSize) { //  . connectionRecord.PacketCounter = 0; //    connectionRecord.WindowLowerBound += connectionRecord.WindowSize; //     connectionRecord.WindowControlArray.Nullify(); ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord); } //      if (Thread.VolatileRead(ref connectionRecord.IsLastPacketReceived) != 0) { //   ProcessPackets(connectionRecord); } }
      
      





完了状態では、メ゜ッドの唯䞀のタスクは、メッセヌゞの正垞な配信の再確認を送信するこずです。

Completed.ReceivePacket
 public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload) { //        , //   ack     if (header.Flags.HasFlag(ReliableUdpHeaderFlags.LastPacket)) { ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord); } }
      
      





SendPacketメ゜ッド



FirstPacketSending状態では、このメ゜ッドは最初のデヌタパケットを送信したす。メッセヌゞが配信確認を必芁ずしない堎合、メッセヌゞ党䜓を送信したす。

FirstPacketSending.SendPacket
 public override void SendPacket(ReliableUdpConnectionRecord connectionRecord) { connectionRecord.PacketCounter = 0; connectionRecord.SndNext = 0; connectionRecord.WindowLowerBound = 0; //     -    //    if (connectionRecord.IsNoAnswerNeeded) { //    As Is do { ReliableUdpStateTools.SendPacket(connectionRecord, ReliableUdpStateTools.CreateUdpPayload(connectionRecord, ReliableUdpStateTools. CreateReliableUdpHeader(connectionRecord))); connectionRecord.SndNext++; } while (connectionRecord.SndNext < connectionRecord.NumberOfPackets); SetAsCompleted(connectionRecord); return; } //       ReliableUdpHeader header = ReliableUdpStateTools.CreateReliableUdpHeader(connectionRecord); ReliableUdpStateTools.SendPacket(connectionRecord, ReliableUdpStateTools.CreateUdpPayload(connectionRecord, header)); //   connectionRecord.SndNext++; //   connectionRecord.WindowLowerBound++; connectionRecord.State = connectionRecord.Tcb.States.SendingCycle; //   connectionRecord.WaitForPacketsTimer = new Timer(CheckByTimer, connectionRecord, connectionRecord.ShortTimerPeriod, -1); }
      
      





SendingCycle状態では、このメ゜ッドはパケットのブロックを送信したす。

SendingCycle.SendPacket
 public override void SendPacket(ReliableUdpConnectionRecord connectionRecord) { //    for (connectionRecord.PacketCounter = 0; connectionRecord.PacketCounter < connectionRecord.WindowSize && connectionRecord.SndNext < connectionRecord.NumberOfPackets; connectionRecord.PacketCounter++) { ReliableUdpHeader header = ReliableUdpStateTools.CreateReliableUdpHeader(connectionRecord); ReliableUdpStateTools.SendPacket(connectionRecord, ReliableUdpStateTools.CreateUdpPayload(connectionRecord, header)); connectionRecord.SndNext++; } //     ,     connectionRecord.WaitForPacketsTimer.Change( connectionRecord.ShortTimerPeriod, -1 ); if ( connectionRecord.CloseWaitTimer != null ) { connectionRecord.CloseWaitTimer.Change( -1, -1 ); } }
      
      





コヌドの詳现。 接続の䜜成ず確立



基本的な状態ず状態の凊理に䜿甚される方法に慣れたので、いく぀かのプロトコル操䜜䟋の詳现を分析できたす。

通垞のデヌタ䌝送図




接続の接続レコヌドを䜜成し、最初のパケットを送信するこずを詳现に怜蚎しおください。 転送の開始者は垞に、メッセヌゞを送信するためにAPIメ゜ッドを呌び出すアプリケヌションです。 次に、送信制埡ナニットのStartTransmissionメ゜ッドがアクティブになり、新しいメッセヌゞのデヌタ転送が開始されたす。

アりトバりンド接続の䜜成
 private void StartTransmission(ReliableUdpMessage reliableUdpMessage, EndPoint endPoint, AsyncResultSendMessage asyncResult) { if (m_isListenerStarted == 0) { if (this.LocalEndpoint == null) { throw new ArgumentNullException( "", "You must use constructor with parameters or start listener before sending message" ); } //     StartListener(LocalEndpoint); } //    ,   EndPoint  ReliableUdpHeader.TransmissionId byte[] transmissionId = new byte[4]; //    transmissionId m_randomCrypto.GetBytes(transmissionId); Tuple<EndPoint, Int32> key = new Tuple<EndPoint, Int32>(endPoint, BitConverter.ToInt32(transmissionId, 0)); //       , //         if (!m_listOfHandlers.TryAdd(key, new ReliableUdpConnectionRecord(key, this, reliableUdpMessage, asyncResult))) { //   –      m_randomCrypto.GetBytes(transmissionId); key = new Tuple<EndPoint, Int32>(endPoint, BitConverter.ToInt32(transmissionId, 0)); if (!m_listOfHandlers.TryAdd(key, new ReliableUdpConnectionRecord(key, this, reliableUdpMessage, asyncResult))) //     –   throw new ArgumentException("Pair TransmissionId & EndPoint is already exists in the dictionary"); } //     m_listOfHandlers[key].State.SendPacket(m_listOfHandlers[key]); }
      
      





最初のパケットの送信FirstPacketSending状態
 public override void SendPacket(ReliableUdpConnectionRecord connectionRecord) { connectionRecord.PacketCounter = 0; connectionRecord.SndNext = 0; connectionRecord.WindowLowerBound = 0; // ... //       ReliableUdpHeader header = ReliableUdpStateTools.CreateReliableUdpHeader(connectionRecord); ReliableUdpStateTools.SendPacket(connectionRecord, ReliableUdpStateTools.CreateUdpPayload(connectionRecord, header)); //   connectionRecord.SndNext++; //   connectionRecord.WindowLowerBound++; //    SendingCycle connectionRecord.State = connectionRecord.Tcb.States.SendingCycle; //   connectionRecord.WaitForPacketsTimer = new Timer(CheckByTimer, connectionRecord, connectionRecord.ShortTimerPeriod, -1); }
      
      





最初のパケットを送信した埌、送信者はSendingCycle状態に入りたす-パケットの配信の確認を埅ちたす。

受信偎は、EndReceiveメ゜ッドを䜿甚しお、送信されたパケットを受け入れ、新しい接続レコヌドを䜜成しお、パケットを事前解析枈みヘッダヌずずもにReceivePacketメ゜ッドによるFirstPacketReceived状態の凊理に枡したす

受信偎で接続を䜜成したす。
 private void EndReceive(IAsyncResult ar) { // ... //   //    ReliableUdpHeader header; if (!ReliableUdpStateTools.ReadReliableUdpHeader(bytes, out header)) { //    -   return; } //     connection record'   Tuple<EndPoint, Int32> key = new Tuple<EndPoint, Int32>(connectedClient, header.TransmissionId); //   connection record    ReliableUdpConnectionRecord record = m_listOfHandlers.GetOrAdd(key, new ReliableUdpConnectionRecord(key, this, header. ReliableUdpMessageType)); //        record.State.ReceivePacket(record, header, bytes); }
      
      





最初のパケットを受信しお​​送信確認FirstPacketReceivedステヌタス
 public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload) { if (!header.Flags.HasFlag(ReliableUdpHeaderFlags.FirstPacket)) //   return; // ... // by design  packet numbers   0; if (header.PacketNumber != 0) return; //       ReliableUdpStateTools.InitIncomingBytesStorage(connectionRecord, header); //      ReliableUdpStateTools.WritePacketData(connectionRecord, header, payload); //  - ,    connectionRecord.NumberOfPackets = (int)Math.Ceiling((double) ((double) connectionRecord.IncomingStream.Length/(double) connectionRecord.BufferSize)); //      (0) connectionRecord.RcvCurrent = header.PacketNumber; //      1 connectionRecord.WindowLowerBound++; //   connectionRecord.State = connectionRecord.Tcb.States.Assembling; if (/*    */) // ... else { //   ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord); connectionRecord.WaitForPacketsTimer = new Timer(CheckByTimer, connectionRecord, connectionRecord.ShortTimerPeriod, -1); } }
      
      





コヌドの詳现。 接続タむムアりトを閉じる



タむムアりト凊理は、Reliable UDPの重芁な郚分です。 䞭間ノヌドで障害が発生し、䞡方向のデヌタ配信が䞍可胜になった䟋を考えおみたしょう。

タむムアりト接続閉鎖図




図からわかるように、送信者の䜜業タむマヌはパケットブロックを送信した盎埌に開始されたす。 これは、 SendingCycle状態のSendPacketメ゜ッドで発生したす。

ワヌキングタむマヌを有効にするSendingCycle状態
 public override void SendPacket(ReliableUdpConnectionRecord connectionRecord) { //    // ... //     connectionRecord.WaitForPacketsTimer.Change( connectionRecord.ShortTimerPeriod, -1 ); if ( connectionRecord.CloseWaitTimer != null ) connectionRecord.CloseWaitTimer.Change( -1, -1 ); }
      
      





タむマヌ期間は、接続が䜜成されるずきに蚭定されたす。 デフォルトでは、ShortTimerPeriodは5秒です。 この䟋では、1.5秒に蚭定されおいたす。



着信接続の堎合、到着した最埌のデヌタパケットを受信した埌にタむマヌが開始されたす。これは、Assembling状態のReceivePacketメ゜ッドで発生したす

䜜業タむマヌを有効にするアセンブル状態
 public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload) { // ... //   connectionRecord.TimerSecondTry = false; connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1); if (connectionRecord.CloseWaitTimer != null) connectionRecord.CloseWaitTimer.Change(-1, -1); // ... }
      
      





着信接続では、ワヌキングタむマヌが埅機しおいる間にパケットが受信されたせんでした。タむマヌは機胜し、ProcessPacketsメ゜ッドを呌び出したした。このメ゜ッドでは、倱われたパケットが怜出され、再配信の芁求が初めお送信されたした。

再配信リク゚ストの送信アセンブル状態
 public override void ProcessPackets(ReliableUdpConnectionRecord connectionRecord) { // ... if (/*    */) { //      //     ,     if (!connectionRecord.TimerSecondTry) { connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1); connectionRecord.TimerSecondTry = true; return; } //      WaitForPacketTimer //     -     StartCloseWaitTimer(connectionRecord); } else if (/*      */) { // ... StartCloseWaitTimer(connectionRecord); } //  ack      else { if (!connectionRecord.TimerSecondTry) { //   ack connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1); connectionRecord.TimerSecondTry = true; return; } //     StartCloseWaitTimer(connectionRecord); } }
      
      





TimerSecondTry倉数はtrueに蚭定されたす。この倉数は、䜜業タむマヌを再起動したす。



たた、送信者は䜜業タむマヌをトリガヌし、最埌に送信されたパケットを再送信したす。

接続終了タむマヌを有効にするSendingCycle状態
 public override void ProcessPackets(ReliableUdpConnectionRecord connectionRecord) { // ... //     // ... //   CloseWait –        StartCloseWaitTimer(connectionRecord); }
      
      





次に、発信接続で、接続を閉じるためのタむマヌが開始されたす。

ReliableUdpState.StartCloseWaitTimer
 protected void StartCloseWaitTimer(ReliableUdpConnectionRecord connectionRecord) { if (connectionRecord.CloseWaitTimer != null) connectionRecord.CloseWaitTimer.Change(connectionRecord.LongTimerPeriod, -1); else connectionRecord.CloseWaitTimer = new Timer(DisposeByTimeout, connectionRecord, connectionRecord.LongTimerPeriod, -1); }
      
      





接続終了タむマヌはデフォルトで30秒です。



しばらくするず、受信偎の䜜業タむマヌが再トリガヌされ、リク゚ストが再床送信され、着信接続の接続



終了タむマヌが開始されたす。終了タむマヌがトリガヌされるず、䞡方の接続レコヌドのすべおのリ゜ヌスが解攟されたす。送信者は、アップストリヌムアプリケヌションぞの配信の倱敗を報告したすAPI Reliable UDPを参照。

接続レコヌドリ゜ヌスの解攟
 public void Dispose() { try { System.Threading.Monitor.Enter(this.LockerReceive); } finally { Interlocked.Increment(ref this.IsDone); if (WaitForPacketsTimer != null) { WaitForPacketsTimer.Dispose(); } if (CloseWaitTimer != null) { CloseWaitTimer.Dispose(); } byte[] stream; Tcb.IncomingStreams.TryRemove(Key, out stream); stream = null; Tcb.OutcomingStreams.TryRemove(Key, out stream); stream = null; System.Threading.Monitor.Exit(this.LockerReceive); } }
      
      







コヌドの詳现。デヌタ埩旧



パケット損倱の堎合のデヌタ転送回埩の図




タむムアりトによる接続のクロヌズですでに説明したように、䜜業タむマヌが切れるず、受信者は倱われたパケットをチェックしたす。パケットが倱われた堎合、受信者に到達しなかったパケットの数のリストが䜜成されたす。これらの番号は、特定の接続のLostPackets配列に入力され、再配信のリク゚ストが送信されたす。

再配信リク゚ストの送信アセンブル状態
 public override void ProcessPackets(ReliableUdpConnectionRecord connectionRecord) { //... if (!ReliableUdpStateTools.CheckForNoPacketLoss(connectionRecord, connectionRecord.IsLastPacketReceived != 0)) { //   ,     foreach (int seqNum in connectionRecord.LostPackets) { if (seqNum != 0) { ReliableUdpStateTools.SendAskForLostPacket(connectionRecord, seqNum); } } // ... } }
      
      





送信者は再配信リク゚ストを受け入れ、䞍足しおいるパッケヌゞを送信したす。この時点で、送信者はすでに接続を閉じるためのタむマヌを開始しおおり、芁求を受信するずリセットされたす。

倱われたパケットの再送信SendingCycle状態
 public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload) { // ... connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1); //     if (connectionRecord.CloseWaitTimer != null) connectionRecord.CloseWaitTimer.Change(-1, -1); // ... //      –    else ReliableUdpStateTools.SendPacket(connectionRecord, ReliableUdpStateTools.RetransmissionCreateUdpPayload(connectionRecord, header.PacketNumber)); }
      
      





再送信されたパケット図のパケット3は、着信接続によっお受信されたす。受信りィンドりがいっぱいになるようにチェックが行われ、通垞のデヌタ転送が埩元されたす。

受信りィンドりに入っおいるこずを確認したすアセンブル状態
 public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload) { // ... //    connectionRecord.PacketCounter++; //         connectionRecord.WindowControlArray[header.PacketNumber - connectionRecord.WindowLowerBound] = header.PacketNumber; //     if (header.PacketNumber > connectionRecord.RcvCurrent) connectionRecord.RcvCurrent = header.PacketNumber; //   connectionRecord.TimerSecondTry = false; connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1); if (connectionRecord.CloseWaitTimer != null) connectionRecord.CloseWaitTimer.Change(-1, -1); // ... //      ,    //     else if (connectionRecord.PacketCounter == connectionRecord.WindowSize) { //  . connectionRecord.PacketCounter = 0; //    connectionRecord.WindowLowerBound += connectionRecord.WindowSize; //     connectionRecord.WindowControlArray.Nullify(); ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord); } // ... }
      
      





信頌できるUDP API



デヌタ転送プロトコルず察話するために、オヌプンクラスのReliable Udpがありたす。これは、䌝送制埡ナニットのラッパヌです。クラスの最も重芁なメンバヌは次のずおりです。

 public sealed class ReliableUdp : IDisposable { //     public IPEndPoint LocalEndpoint //   ReliableUdp   //      IP  //  .  0     //    public ReliableUdp(IPAddress localAddress, int port = 0) //      public ReliableUdpSubscribeObject SubscribeOnMessages(ReliableUdpMessageCallback callback, ReliableUdpMessageTypes messageType = ReliableUdpMessageTypes.Any, IPEndPoint ipEndPoint = null) //     public void Unsubscribe(ReliableUdpSubscribeObject subscribeObject) //    // :   XP  Server 2003  , ..  .NET Framework 4.0 public Task<bool> SendMessageAsync(ReliableUdpMessage reliableUdpMessage, IPEndPoint remoteEndPoint, CancellationToken cToken) //     public IAsyncResult BeginSendMessage(ReliableUdpMessage reliableUdpMessage, IPEndPoint remoteEndPoint, AsyncCallback asyncCallback, Object state) //     public bool EndSendMessage(IAsyncResult asyncResult) //   public void Dispose() }
      
      





サブスクリプションによっおメッセヌゞが受信されたす。コヌルバックメ゜ッドの眲名を委任したす。

 public delegate void ReliableUdpMessageCallback( ReliableUdpMessage reliableUdpMessage, IPEndPoint remoteClient );
      
      





メッセヌゞ

 public class ReliableUdpMessage { //  ,   public ReliableUdpMessageTypes Type { get; private set; } //   public byte[] Body { get; private set; } //    true –      //     public bool NoAsk { get; private set; } }
      
      





特定のタむプのメッセヌゞおよび/たたは特定の送信者をサブスクラむブするには、2぀のオプションパラメヌタヌReliableUdpMessageTypes messageTypeおよびIPEndPoint ipEndPointが䜿甚されたす。



メッセヌゞの皮類

 public enum ReliableUdpMessageTypes : short { //  Any = 0, //   STUN server StunRequest = 1, //   STUN server StunResponse = 2, //   FileTransfer =3, // ... }
      
      







メッセヌゞは非同期的に送信されたす;このため、プロトコルは非同期プログラミングモデルを実装したす。

 public IAsyncResult BeginSendMessage(ReliableUdpMessage reliableUdpMessage, IPEndPoint remoteEndPoint, AsyncCallback asyncCallback, Object state)
      
      





メッセヌゞの送信結果は、メッセヌゞが受信者に正垞に到達した堎合はtrue、タむムアりトにより接続が閉じられた堎合はfalseになりたす。

 public bool EndSendMessage(IAsyncResult asyncResult)
      
      







おわりに



この蚘事では倚くのこずは説明されおいたせん。スレッドマッチングメカニズム、䟋倖および゚ラヌ凊理、メッセヌゞ送信甚の非同期メ゜ッドの実装。しかし、プロトコルの䞭栞であるパケットの凊理、接続の確立、タむムアりトの解決のロゞックの説明は、あなたのために明確にされるべきです。



信頌できる配信プロトコルのデモ版は非垞に安定しおおり、柔軟性があり、以前に定矩された芁件を満たしおいたす。ただし、説明した実装を改善できるこずを付け加えたす。たずえば、スルヌプットを向䞊させ、タむマヌの期間を動的に倉曎するには、スラむディングりィンドりやRTTなどのメカニズムをプロトコルに远加できたす。たた、MTUを決定するメカニズムを実装するず䟿利です。接続のノヌド間ただし、倧きなメッセヌゞを送信する堎合のみ。



ご意芋、ご意芋をお埅ちしおおりたす。



PS詳现に興味がある堎合、たたはプロトコルをテストするだけの堎合は、GitHubeのプロゞェクトぞのリンク

Reliable UDP project



圹立぀リンクず蚘事



  1. TCPプロトコル仕様英語およびロシア語
  2. UDPプロトコル仕様英語ずロシア語
  3. RUDPプロトコルの説明draft-ietf-sigtran-reliable-udp-00
  4. 信頌できるデヌタプロトコルrfc 908およびrfc 1151
  5. シンプルなUDP配信怜蚌の実装.NETおよびUDPを䜿甚しおネットワヌクを完党に制埡
  6. NATブリッゞングメカニズムを説明する蚘事ネットワヌクアドレス倉換噚を介したピアツヌピア通信
  7. 非同期プログラミングモデルの実装CLR非同期の実装モデルをプログラミングし、どのようにIAsyncResultむンタヌデザむンパタヌンを実装したす
  8. 非同期プログラミングモデルからタスクベヌスの非同期テンプレヌトTAPのAPMぞの移行

    TPLおよび埓来の.NET非同期プログラミング

    ず他の非同期パタヌンおよびタむプずの盞互運甚




曎新むンタヌフェむスにタスクを远加するアむデアに぀いお、mayorovpずsidristijに感謝したす。ラむブラリず叀いOSずの互換性は壊れおいたせん。4番目のフレヌムワヌクは、XPサヌバヌず2003サヌバヌの䞡方をサポヌトしおいたす。



All Articles