RxおよびHttpListenerを使用したC#のイベント駆動型HTTPサーバー

十分な名前ですか? え? この投稿では、 Reactive Extensionsのパワーを使用して、C#で単純なイベント指向のHTTPサーバーを作成する代替アプローチを紹介します。



はじめに


説明するのが苦手なので、イベントモデルnode.jsに関する非常に興味深いDan Yorkの記事を引用します。

Webサーバーの「従来の」モードは、常にストリームモデルに基づいています。 Apacheまたは他のWebサーバーを起動すると、接続の受け入れが開始されます。 接続を受け入れると、ページまたは別のトランザクションの処理が完了するまで接続を開いたままにします。 ディスクからページを読み込んだり、データベースに結果を書き込んだりするのに数マイクロ秒かかる場合、Webサーバーは入出力操作に対してブロックされます。 (これは「ブロッキングI / O」と呼ばれます)。 このタイプのサーバーを拡張するには、サーバー自体の追加のコピーを実行する必要があります(通常、各コピーには追加のオペレーティングシステムスレッドが必要なので、「スレッドベース」と呼ばれます)。

対照的に、Node.JSはイベント駆動型モデルを使用します。このモデルでは、Webサーバーが要求を受け入れ、すぐに処理し、次の要求のためにそれらを受け取ります。 最初の要求が完了すると、処理キューに戻り、キューの最後に達すると、結果が返されます(または、次のアクションを必要とするすべてが実行されます)。 このモデルは非常に効率的でスケーラブルです。通常、Webサーバーは常にリクエストを受け入れるためです。 単一の読み取りまたは書き込み操作が完了するまで待機しません。 (このメソッドは「ノンブロッキングI / O」または「イベント指向I / O」と呼ばれます)。



.NETの世界で何が起こっているのですか?


.NETエコシステムでは、これについて多くのことが発生します。



代替アプローチ


HttpListenerクラスとReactive Extensionsを使用して、次のようなものを作成できます。

public class HttpServer : IObservable<RequestContext>, IDisposable { private readonly HttpListener listener; private readonly IObservable<RequestContext> stream; public HttpServer(string url) { listener = new HttpListener(); listener.Prefixes.Add(url); listener.Start(); stream = ObservableHttpContext(); } private IObservable<RequestContext> ObservableHttpContext() { return Observable.Create<RequestContext>(obs => Observable.FromAsyncPattern<HttpListenerContext>(listener.BeginGetContext, listener.EndGetContext)() .Select(c => new RequestContext(c.Request, c.Response)) .Subscribe(obs)) .Repeat() .Retry() .Publish() .RefCount(); } public void Dispose() { listener.Stop(); } public IDisposable Subscribe(IObserver<RequestContext> observer) { return stream.Subscribe(observer); } }
      
      





このコードに関する注意事項:



使用例


この概念に基づいて、あらゆるタイプのWebアプリケーションを作成できます。 「hello world」レベルのアプリケーションは次のようになります。

 static void Main() { //a stream os messages var subject = new Subject<string>(); using(var server = new HttpServer("http://*:5555/")) { var handler = server.Where(ctx => ctx.Request.Url.EndsWith("/hello")) .Subscribe(ctx => ctx.Respond(new StringResponse("world"))); Console.ReadLine(); handler.Dispose(); } }
      
      





非同期のみを行うことをお勧めします。 たとえば、データベースに接続する場合、これは非同期操作である必要があり、コールバック/オブザーバブル/タスクなどを一緒に保持する必要があります。



共有したいさらに興味深いアプリケーションがあります。これはロングポーリングと呼ばれます。

ロングポーリングは、従来のポーリング手法のバリエーションであり、サーバーからクライアントへの情報の送信をエミュレートできます。 長いポーリングでは、クライアントは通常の要求と同じ方法でサーバーに情報を要求します。 ただし、サーバーがクライアントに利用可能な情報を持っていない場合、空の応答を送信する代わりに、サーバーは要求を保持し、情報が利用可能になるのを待ちます。



したがって、上記のコードを使用した長いポーリングの最も簡単な例は次のとおりです。

 class Program { static void Main() { //a stream os messages var subject = new Subject<string>(); using(var server = new HttpServer("http://*:5555/")) { //the listeners stream and subscription var listeners = server .Where(ctx => ctx.Request.HttpMethod == "GET") .Subscribe(ctx => subject.Take(1) //wait the next message to end the request .Subscribe(m => ctx.Respond(new StringResponse(m)))); //the publishing stream and subscrition var publisher = server .Where(ctx => ctx.Request.HttpMethod == "POST") .Subscribe(ctx => ctx.Request.InputStream.ReadBytes(ctx.Request.ContentLength) .Subscribe(bts => { ctx.Respond(new EmptyResponse(201)); subject.OnNext(Encoding.UTF8.GetString(bts)); })); Console.ReadLine(); listeners.Dispose(); publisher.Dispose(); } } }
      
      





ご覧のとおり、オブザーバーを機能させます...ブロッキング操作はありません。 ストリームからの読み取りも非同期操作です。



動作するコードを見たいですか?


以下は、コードがどのように機能するかを示すビデオです。

画像

そして最終的に、ステップごとに掘り下げて調査したい場合、または単に探索したい場合は、 ここでソースコードをオープンソースで公開します

Gustavo Machado、Silvio Massari、Nancyフレームワークのスタッフに、私が彼らから盗んだコードの一部とヒントに感謝します。




All Articles