プロローグ
最近、Windowsアプリケーションでソケットを効果的に使用する必要に直面しました。 このタスクは、ビジーなサーバーの典型的なものです。 ここでは、実装言語であるDelphiのみが非定型のように見えます。
I / O完了ポートを使用して、多数のソケットで大量の非同期作業を行う方法を説明したいと思います。 マイクロソフトでは、この技術を使用することを推奨しています。 多くの人がそれに慣れていると思いますが、 念のため、MSDNへのリンクを示します。 このテクノロジーの本質は、システムがイベントの非常に効率的なキューを編成し、プログラムがスレッドプールからそれを処理することです。スレッドプールのサイズは、コンピューティングコアの数によって選択されます。 このアプローチには、さまざまなエンドポイントに対して多数の非同期入出力操作が同時に実行されるという利点があります。 完成したソースはすぐに(より良い) ここを見ることができます 。 すべてが完璧というわけではありませんが、実験のためにはそれで十分です。
ロードマップ
私はある意味で、オブジェクトの編成と入出力操作に関連するすべての点でNode.Jsのイデオロギーを遵守します。
サーバー側の場合、次を実装する必要があります。
- ソケットを聞いています。 新しい化合物の受け入れまたは拒否。
- 追跡クライアントソケットクローズ信号。
クライアントの場合、このリストの最初の項目は関係ありませんが、サーバーへの非同期接続を実装する必要があります。 両方のクラスで、1つのエンドポイントの読み取りと書き込みを同時に行うことができます。
作成されたすべてのクライアントおよびサーバーソケットインスタンスは、1つの共通メッセージキューと1つのスレッドプールを使用します。 これは、1つのアプリケーションで両方のタイプのソケットを最適な方法で使用できるようにするために必要です。
実装
始めましょう。 まず、構築の完全に非同期のイベントモデルに関連して、クラスではなくインターフェイスを実装することに注意してください。 この場合、割り当てられたメモリに対する責任が最終的なプログラマから削除されるため、これは非常に便利です。 とにかく、ここで別の方法でその使用を追跡することは、非常にコストがかかるか、完全に不可能です。 モジュールの初期化中に多くの作業が発生するはずです。
- さまざまなタイプのソケットのリストを作成します。
- ソケットサブシステムの初期化。
- メッセージキューを作成します。
- キューを処理するプールを作成します。
- ソケットのイベントを作成します。
- ソケットイベントを監視するストリームの作成(たとえば、新しいクライアントの接続)。
したがって、初期化セクションには、リストをポイントごとに実装する次の手順が含まれています。
procedure Init; var WSAData: TWsaData; i: Integer; begin gClients := TProtoStore.Create; gListeners := TProtoStore.Create; gServerClients := TProtoStore.Create; if WSAStartup(MAKEWORD(2, 2), WSAData) <> 0 then raise IOCPClientException.Create(sErrorInit_WSAtartup); gIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, CPUCount * 2); if gIOCP = INVALID_HANDLE_VALUE then raise IOCPClientException.Create(sErrorInit_CreateIoCompletionPort); for i := 1 to CPUCount * 2 do begin SetLength(gWorkers, Length(gWorkers) + 1); gWorkers[Length(gWorkers) - 1] := TWorkerThread.Create(); end; gListenerAcceptEvent := WSACreateEvent; if gListenerAcceptEvent = WSA_INVALID_EVENT then raise IOCPClientException.Create(sErrorInit_WSACreateEvent); gServerClientsCloseEvent := WSACreateEvent; if gServerClientsCloseEvent = WSA_INVALID_EVENT then raise IOCPClientException.Create(sErrorInit_WSACreateEvent); gClisentsConnectAndCloseEvents := WSACreateEvent; if gClisentsConnectAndCloseEvents = WSA_INVALID_EVENT then raise IOCPClientException.Create(sErrorInit_WSACreateEvent); gClientSocketEventThread := TSocketEventThread.Create (gClisentsConnectAndCloseEvents, gClients, ET_EVENT_SIGNALED); gClientSocketEventThread.Start; gServerClientsSocketEventThread := TSocketEventThread.Create (gServerClientsCloseEvent, gServerClients, ET_EVENT_SIGNALED); gServerClientsSocketEventThread.Start; gServerSocketEventThread := TSocketEventThread.Create(gListenerAcceptEvent, gListeners, ET_EVENT_SIGNALED); gServerSocketEventThread.Start; end;
この場合のCreateIoCompletionPort関数は、特別なメッセージキューを作成します。
同じTSocketEventThreadストリームクラスが、異なる目的でソケットのイベントを追跡するために使用されていることがわかります。 このクラスのストリームは、ソケットイベントを予期するプロシージャを実行し、イベントの発生に関するメッセージ(このストリームが提供するタイプの各ソケット)をすぐにキューに入れます。
procedure TSocketEventThread.WaitForClientsEvents; var WaitResult: DWORD; const TimeOut: DWORD = 100; begin WaitResult := WSAWaitForMultipleEvents(1, @fEvent, FALSE, TimeOut, FALSE); if WaitResult = WSA_WAIT_FAILED then raise IOCPClientException.Create (sErrorWaitForClientsEvents_WSAWaitForMultipleEvents); if WaitResult = WSA_WAIT_EVENT_0 then begin if not WSAResetEvent(fEvent) then raise IOCPClientException.Create (sErrorWaitForClientsEvents_WSAResetEvent); fStore.Post(fKey); end; end;
メソッドfStore.Post(fKey)は次のとおりです。 また、メッセージをキューに送信します。
procedure TProtoStore.Post(CompletionKey: DWORD); var i: Integer; begin fLock.Enter; try for i := 0 to Length(ProtoArray) - 1 do begin ProtoArray[i]._AddRef; if not PostQueuedCompletionStatus(gIOCP, 0, CompletionKey, POverlapped(ProtoArray[i])) then begin ProtoArray[i]._Release; raise IOCPClientException.Create(sErrorPost_PostQueuedCompletionStatus); end; end; finally fLock.Leave; end; end;
特に注意すべき点は、インターフェースを備えたオブジェクトの使用です。
_AddRefメソッドは、オブジェクトが「キュー内」にあり、破棄すべきではないことを示すために使用されます。 (後で、処理後、_Releaseが呼び出されます)。 PostQueuedCompletionStatusプロシージャは、メッセージを直接キューに入れます。
プールは各メッセージを非同期的に処理します。
これを行うには、彼は次の手順を実行します。
procedure TWorkerThread.ProcessIOCP; var NumberOfBytes: DWORD; CompletionKey: NativeUInt; Overlapped: POverlapped; Proto: TIOCPSocketProto; begin if not((not GetQueuedCompletionStatus(gIOCP, NumberOfBytes, CompletionKey, Overlapped, INFINITE)) and (Overlapped = nil)) then begin if CompletionKey = ET_EVENT_SIGNALED then begin Proto := TIOCPSocketProto(Overlapped); with Proto do begin IOCPProcessEventsProc(); _Release; end end else if CompletionKey <> 0 then begin Proto := TIOCPSocketProto(CompletionKey); if Proto.IOCPProcessIOProc(NumberOfBytes, Overlapped) then Proto._Release; end; end end;
GetQueuedCompletionStatusプロシージャは、キューからメッセージを受信するために使用されます。 次に、このメッセージが入力/出力の完了に関するメッセージであるか、イベントに関するこのメッセージであるかが判別されます。 ここでは、キューを介して情報を渡す2つのメソッド、この場合はソケットクラスの特定のインスタンスへのリンクを示します。
処理は、すべてのタイプのソケットに対して統合されています。これは、共通ハンドラーを含む共通祖先からの継承を使用して実現され、それらの再定義が許可されます。
ソケットイベントを処理するメカニズムを検討してください。
procedure TIOCPSocketProto.IOCPProcessEventsProc(); var WSAEvents: TWsaNetworkEvents; AcceptedSocket: TSocket; RemoteAddress: string; begin if fStateLock <> CLI_SOCKET_LOCK_CLOSED then begin fClosingLock.BeginRead; try if (fStateLock <> CLI_SOCKET_LOCK_CLOSED) then if WSAEnumNetworkEvents(fSocket, 0, WSAEvents) <> SOCKET_ERROR then begin if ((WSAEvents.lNetworkEvents and FD_CONNECT) <> 0) then begin if 0 = WSAEvents.iErrorCode[FD_CONNECT_BIT] then InterlockedExchange(fStateLock, CLI_SOCKET_LOCK_CONNECTED); CallOnConnect; end; if ((WSAEvents.lNetworkEvents and FD_CLOSE) <> 0) and (0 = WSAEvents.iErrorCode[FD_CLOSE_BIT]) then CallOnClose; if ((WSAEvents.lNetworkEvents and FD_ACCEPT) <> 0) and (0 = WSAEvents.iErrorCode[FD_ACCEPT_BIT]) then begin AcceptedSocket := DoAccept(RemoteAddress); if AcceptedSocket <> INVALID_SOCKET then begin fClientClass.Create(AcceptedSocket, fOnConnect, fOnClose, RemoteAddress).Prepare; end; end; end finally fClosingLock.EndRead; end; end; end;
TMultiReadExclusiveWriteSynchronizerクラスは、ここで興味深いことに適用されます。 ソケットを閉じて、プール内の別のスレッドからオブジェクトを破棄する試みを防ぐために使用されます(fClosingLock.BeginRead)。 ソケットの作成および終了操作を除き、ソケットを使用したすべての操作は、この同期オブジェクトの読み取り操作として実行されます。書き込み操作であるため、リソースの排他的所有権でのみ実行できます。
他のすべての点では、この手順でソケットを操作することは完全に普通です。
この手順で検討する価値がある唯一のことは、新しいクライアントをサーバーに接続するDoAcceptメソッドです。
function TIOCPSocketProto.DoAccept(var RemoteAddress: string): TSocket; var addr: TSockAddr; addrlen: Integer; dwCallbackData: NativeUInt; RemoteAddrLen: DWORD; begin dwCallbackData := NativeUInt(self); addrlen := SizeOf(addr); Result := WSAAccept(fSocket, @addr, @addrlen, ServerAcceptCallBack, dwCallbackData); if Result <> INVALID_SOCKET then begin SetLength(RemoteAddress, 255); RemoteAddrLen := Length(RemoteAddress); if WSAAddressToString(addr, addrlen, nil, PChar(@RemoteAddress[1]), RemoteAddrLen) = SOCKET_ERROR then raise IOCPClientException.Create(sErrorAccept_WSAAddressToString); SetLength(RemoteAddress, RemoteAddrLen - 1) end end;
ここで重要なのは、WSAAcceptを使用することです。 この関数を使用すると、クライアントが実際にFD_CONNECTイベントを受け取るように、クライアント接続を拒否できます。
これは、いわゆるブラックリストを整理するための好ましい方法です。
さらに進みます。 入出力の構成を検討してください。 これは、読み取り操作を例として使用して行います。
procedure TIOCPSocketProto.Read(Length: DWORD; OnRead, OnReadProcess: TOnReadEvent); var Bytes, Flags: DWORD; WsaBuf: TWsaBuf; begin fClosingLock.BeginRead; try if fStateLock = CLI_SOCKET_LOCK_CONNECTED then begin if InterlockedCompareExchange(fReadLock, IO_PROCESS, IO_IDLE) = IO_IDLE then begin fOnRead := OnRead; fOnReadProcess := OnReadProcess; fReaded := 0; fReadBufLength := Length; fReadBuffer := nil; GetMem(fReadBuffer, Length); if fReadBuffer <> nil then begin Bytes := 0; FillChar(fOverlappedRead, SizeOf(fOverlappedRead), 0); WsaBuf.buf := fReadBuffer; WsaBuf.len := fReadBufLength; Flags := 0; Bytes := 0; _AddRef; if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead, nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then begin FreeMem(fReadBuffer, Length); InterlockedExchange(fReadLock, IO_IDLE); _Release; raise IOCPClientException.Create(sErrorRead_WSARecv); end; end else raise IOCPClientException.Create(sErrorRead_GetMem); end else raise IOCPClientException.Create(sErrorRead_InProcess); end else raise IOCPClientException.Create(sErrorRead_NotConnected); finally fClosingLock.EndRead; end; end;
ここでは、インターロックロックを使用する必要がありました。 これは非常に高速で、I / Oオプションをリコールする試みを遮断する必要性を満たします。 メモリは、操作ごとに1回、バッファの下に割り当てられます。 次に、非同期モードでのソケットからの読み取りが呼び出されます。 オブジェクトはAddRefで「マーク」され、キュー内で削除されないようにします。 パケットの読み取りが完了すると、これに関するメッセージは自動的にキューに入れられます。
完了した入力/出力に関するメッセージがキューから選択されるとどうなるかを考えてください。
function TIOCPSocketProto.IOCPProcessIOProc(NumberOfBytes: DWORD; Overlapped: POverlapped): Boolean; var Bytes, Flags: DWORD; WsaBuf: TWsaBuf; begin Result := FALSE; fClosingLock.BeginRead; try if Overlapped = @fOverlappedRead then begin if NumberOfBytes <> 0 then begin if fReadLock = IO_PROCESS then begin inc(fReaded, NumberOfBytes); if fReaded < fReadBufLength then begin CallOnReadProcess; WsaBuf.buf := fReadBuffer; inc(WsaBuf.buf, fReaded); WsaBuf.len := fReadBufLength; dec(WsaBuf.len, fReaded); Flags := 0; Bytes := 0; if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead, nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then begin CallOnRead; Result := True; end end else begin CallOnReadProcess; CallOnRead; Result := True; end; end end else begin CallOnRead; Result := True; end; end else if Overlapped = @fOverlappedWrite then begin if NumberOfBytes <> 0 then begin if fWriteLock = IO_PROCESS then begin inc(fWrited, NumberOfBytes); if fWrited < fWriteBufLength then begin CallOnWriteProcess; WsaBuf.buf := fWriteBuffer; inc(WsaBuf.buf, fWrited); WsaBuf.len := fWriteBufLength; dec(WsaBuf.len, fWrited); Flags := 0; Bytes := 0; if (WSASend(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedWrite, nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then begin CallOnWrite; Result := True; end end else begin CallOnWriteProcess; CallOnWrite; Result := True; end; end end else begin CallOnWrite; Result := True; end; end finally fClosingLock.EndRead; end; end;
この手順の本質は、割り当てられたバッファが一杯にならない時点まで、ソケットの読み取りまたは書き込みが発生することです。 この場合の興味深い点は、オーバーレイされた構造を参照して操作のタイプを決定することです。 このリンクはキューによって提供され、読み取りおよび書き込みの構造が格納されているクラスの対応するフィールドと比較する必要があるだけです。
また、読み取り/書き込み操作が即座に実行された場合、それはまだキューに入ることは注目に値しますが、これはAPIを介して構成できます。
ソケットクラスの作成とキューイングも検討する価値があります。
constructor TIOCPClientSocket.Create(RemoteAddress: string; OnConnect, OnClose: TOnSimpleSocketEvenet); var lRemoteAddress: TSockAddr; lRemoteAddressLength: Integer; begin inherited Create(); fStore := gClients; fOnConnect := OnConnect; fOnClose := OnClose; fStateLock := 0; fRemoteAddressStr := RemoteAddress; fSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED); if fSocket = INVALID_SOCKET then raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSASocket); if (WSAEventSelect(fSocket, gClisentsConnectAndCloseEvents, FD_CONNECT or FD_CLOSE) = SOCKET_ERROR) then raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAEventSelect); if CreateIoCompletionPort(fSocket, gIOCP, NativeUInt(self), 0) = 0 then raise IOCPClientException.Create (sErrorTIOCPClientSocket_CreateIoCompletionPort); fStateLock := CLI_SOCKET_LOCK_CREATED; fStore.Add(self); lRemoteAddressLength := SizeOf(lRemoteAddress); lRemoteAddress.sa_family := AF_INET; if WSAStringToAddress(PChar(@fRemoteAddressStr[1]), AF_INET, nil, lRemoteAddress, lRemoteAddressLength) = SOCKET_ERROR then raise IOCPClientException.Create (sErrorTIOCPClientSocket_WSAStringToAddress); if (WSAConnect(fSocket, lRemoteAddress, lRemoteAddressLength, nil, nil, nil, nil) = SOCKET_ERROR) and (WSAGetLastError <> WSAEWOULDBLOCK) then raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAConnect); end;
クライアントソケットコンストラクターでは、ソケット(WSASocket)が即座に作成され、キューに登録され(CreateIoCompletionPort)、イベントに関連付けられ、非同期接続関数(WSAConnect)を呼び出します。 接続の事実は、最初に考慮されたストリーム(ソケットでイベントを待機するストリーム)で予想されます。 これにより、このイベントがキューに入れられます。
エピローグ
私の意見では、この記事では、イベントプログラミング用のクラスを作成するための成功したテクニックについて簡単に説明します。
Delphiのソケットを使用して、高パフォーマンスの作業用のクラスを作成しました。 このトピックのカバーは一般的に非常に不十分であり、インターフェイスを使用し、IOCPを使用して安全な接続を作成するソケットコンテキスト(暗号化プロバイダーおよびWinsock Secure Socket Extensions)のトピックに関する別の2-3投稿でこの出版物を続ける予定です。 完全なサンプルコードはこちらです。