Netty High Performance NIOサーバー

前文


こんにちは。 私はCISで最大のMinecraftサーバーのメイン開発者です(誰がそれを必要としているかは広告しません、彼らは知っています)。 ほぼ1年間、40人以上を対象に設計されたサーバー実装を作成しています(少なくとも500の数字が必要です)。 これまでのところ、すべてが成功していましたが、最近、システムは最も成功したネットワーク実装(入力用に1ストリーム、出力用に1 +処理用に1)、300人のプレイヤーがオンライン、980以上のフローが動作するという事実に基づいて動き始めました(+システム)これは、デフォルトのio Javaのパフォーマンスと組み合わされて、パフォーマンスが大幅に低下します。100人のプレーヤーでも、サーバーは主にネットワークとの間で読み書きを行います。



そこで、NIOに切り替えることにしました。 Nettyライブラリは偶然に手に入りましたが、その構造は既に機能しているソリューションに統合するのにぴったりのように思えました。 残念ながら、ロシア語だけでなく英語版のNettyマニュアルもほとんどないので、最善の方法を見つけるために多くのことを試し、ライブラリコードを調べなければなりませんでした。



ここでは、Nettyを介してネットワークを操作する際のサーバー部分をペイントしようとします。おそらく、それは誰かに役立つでしょう。



サーバー作成



ExecutorService bossExec = new OrderedMemoryAwareThreadPoolExecutor(1, 400000000, 2000000000, 60, TimeUnit.SECONDS); ExecutorService ioExec = new OrderedMemoryAwareThreadPoolExecutor(4 /*    */, 400000000, 2000000000, 60, TimeUnit.SECONDS); ServerBootstrap networkServer = new ServerBootstrap(new NioServerSocketChannelFactory(bossExec, ioExec, 4 /*       */)); networkServer.setOption("backlog", 500); networkServer.setOption("connectTimeoutMillis", 10000); networkServer.setPipelineFactory(new ServerPipelineFactory()); Channel channel = networkServer.bind(new InetSocketAddress(address, port));
      
      





フランスの同僚の経験によると、Nettyタスクを実行するためにOrderedMemoryAwareThreadPoolExecutorによって使用され、最も効果的です。 Executors.newFixedThreadPool(n)など、他のエグゼキューターを使用できます。 いかなる場合でもExecutors.newCachedThreadPool()を使用しないでください。それは不当に多くのスレッドを作成し、Nettyからの利益はほとんどありません。 4つ以上のワークフローを使用することは意味がありません。 それらは膨大な負荷に対処する以上のものです(4スレッドのXebia-Franceのプログラマーは、100,000を超える同時接続を引き出しました)。 ボスストリームは、リスニングポートごとに1つにする必要があります。 サーバーを後で停止できるように、バインド関数が返すチャネルServerBootsrapを保存する必要があります。



パイプライン工場



クライアントの接続とパッケージの処理方法はPipelineFactoryによって決定されます。PipelineFactoryは、クライアントとのチャネルが開かれると、そのパイプラインを作成し、そのチャネルで発生するイベントハンドラーが定義されます。 私たちの場合、これはServerPipelineFactoryです:

 public class ServerPipelineFactory implements ChannelPipelineFactory { @Override public ChannelPipeline getPipeline() throws Exception { PacketFrameDecoder decoder = new PacketFrameDecoder(); PacketFrameEncoder encoder = new PacketFrameEncoder(); return Channels.pipeline(decoder, encoder, new PlayerHandler(decoder, encoder)); } }
      
      





このコードでは、 PacketFrameDecoderPacketFrameEncoder、およびPlayerHandlerは、定義するイベントハンドラーです。 Channels.pipeline()関数は、ハンドラーが渡された新しいパイプラインを作成します。 注意:イベントは、パイプライン関数から渡された順序でハンドラーによって処理されます!



プロトコル



後で明確になるように、プロトコルについて少し説明します。



Packetクラスを拡張するクラスオブジェクトを使用してデータが交換されます。このクラスオブジェクトには、 get(ChannelBuffer入力)send(ChannelBuffer出力)の 2つの関数が定義されています。 したがって、最初の関数は必要なデータをチャネルから読み取り、2番目の関数はパケットデータをチャネルに書き込みます。

 public abstract class Packet { public static Packet read(ChannelBuffer buffer) throws IOException { int id = buffer.readUnsignedShort(); //  ID  ,  ,     Packet packet = getPacket(id); //      ID if(packet == null) throw new IOException("Bad packet ID: " + id); //         ,   packet.get(buffer); //       return packet; } public statuc Packet write(Packet packet, ChannelBuffer buffer) { buffer.writeChar(packet.getId()); //  ID  packet.send(buffer); //    } // ,       public abstract void get(ChannelBuffer buffer); public abstract void send(ChannelBuffer buffer); }
      
      





明確にするためのパッケージのペアの例:

 // ,       public class Packet1Login extends Packet { public String login; public void get(ChannelBuffer buffer) { int length = buffer.readShort(); StringBuilder builder = new StringBuilder(); for(int i = 0; i < length ++i) builder.append(buffer.readChar()); login = builder.toString(); } public void send(ChannelBuffer buffer) { //   , ..      } } // ,       ,      public class Packet255KickDisconnect extends Packet { public String reason; public void get(ChannelBuffer buffer) { int length = buffer.readShort(); StringBuilder builder = new StringBuilder(); for(int i = 0; i < length ++i) builder.append(buffer.readChar()); reason = builder.toString(); } public void send(ChannelBuffer buffer) { buffer.writeShort(reason.length()); for(int i = 0; i < reason.length(); ++i) { buffer.writeChar(reason.getCharAt(i)); } } }
      
      





ChannelBufferは 、1つにまとめられたDataInputStreamおよびDataOutputStreamに非常に似ています。 ほとんどの機能は、同じではないにしても、非常に似ています。 IOをブロックしているように、バッファに読み込むのに十分なバイトがあるかどうかをチェックする必要がないことに注意してください。 これについては後で...



カスタマーサービス



クライアントでの作業は、主にPlayerHandlerクラスによって決定されます。

 public class PlayerHandler extends SimpleChannelUpstreamHandler { private PlayerWorkerThread worker; @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { //     .    Worker  — ,      . //      ( e.getChannel()),        worker = new PlayerWorkerThread(this, e.getChannel()); } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { //   .   ,   ,    ,     .    ,     ,   ,       ,    . worker.disconnectedFromChannel(); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { //     Packet'  ,       worker.      . if(e.getChannel().isOpen()) worker.acceptPacket((Packet) e.getMessage()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { //    .  ,  . Server.logger.log(Level.WARNING, "Exception from downstream", e.getCause()); ctx.getChannel().close(); } }
      
      





ワーカーは単純にchannel.write(パケット)関数を使用してプレーヤーにデータを送信できます。ここで、channelはプレーヤーのチャンネルで、接続時に彼に送信され、packetはPacketクラスのオブジェクトです。 エンコーダーはパケットのエンコードを担当します。



デコーダーとエンコーダー



実際、システムの最も重要な部分は、ユーザーストリームからパケットパケットを生成し、同じパケットをストリームに送信することです。



エンコーダーは非常にシンプルで、プレーヤーにパケットを送信します。

 public class PacketFrameEncoder extends OneToOneEncoder { @Override protected Object encode(ChannelHandlerContext channelhandlercontext, Channel channel, Object obj) throws Exception { if(!(obj instanceof Packet)) return obj; //    ,      Packet p = (Packet) obj; ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); //          .      ,       — ChannelBuffers     ,    . Packet.write(p, buffer); //     return buffer; //  ,       } }
      
      







デコーダーはすでにはるかに複雑です。 実際には、クライアントから送信されたバッファーには、パケット全体を読み取るための十分なバイトがない場合があります。 この場合、ReplayingDecoderクラスが役立ちます。 デコード機能を実装し、そのストリームからデータを読み取るだけで、何も心配する必要はありません。

 public class PacketFrameDecoder extends ReplayingDecoder<VoidEnum> { @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ctx.sendUpstream(e); } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ctx.sendUpstream(e); } @Override protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer, VoidEnum e) throws Exception { return Packet.read(buffer); } }
      
      





問題は、それがどのように機能するかです。 非常に簡単に、デコード関数を呼び出す前に、デコーダーは現在の読み取りインデックスをマークします。バッファーから読み取るときに十分なデータがない場合、例外がスローされます。 この場合、バッファは初期位置に戻り、ユーザーからさらにデータを受信するとデコードが繰り返されます。 読み取りが成功した場合(null以外が返された場合)、デコーダーは、少なくとも1バイト以上残っている場合、既にバッファーに残っているデータに対してデコード関数を再度呼び出そうとします。



例外がスローされた場合、すべて動作が遅くなりますか? たとえば、バッファ内のデータ量をチェックして、パケットを読み取るのに十分かどうかを評価するよりも遅い。 ただし、キャッシュされた例外を使用するため、スタックトレースを埋めたり、新しい例外オブジェクトを作成する時間さえありません。 ReplayingDecoderのその他のパフォーマンス強化機能の詳細については、 こちらをご覧ください。



たとえば、IDによってパケットのサイズを事前に決定できる場合は、FrameDecoderを試すこともできます。



それがすべてのようです



結果は素晴らしかった。 まず、サーバーは1,000スレッドでストリーミングしなくなりました-4つのNettyストリーム+ 4つのデータ処理スレッドが250以上のクライアントで優れた仕事をします(テストが続行されます)。 第二に、プロセッサの負荷ははるかに小さくなり、接続の数から直線的に増加しなくなりました。 第三に、場合によっては応答時間が短くなりました。



誰かがこれが役立つことを願っています。 できるだけ多くの重要なデータを転送しようとしましたが、行き過ぎました。 結局のところ、多くの例はありませんか? あなたの答えを聞いて、厳密に判断しないでください-私が初めてHabrで書くとき。



追記:さらに便利なもの



Nettyには、特筆に値するその他の興味深い機能がいくつかあります。



まず、サーバーのシャットダウン:

 ChannelFuture future = channel.close(); future.awaitUninterruptibly();
      
      





ここで、channelは、バインド関数が最初に返したチャネルです。 future.awaitUninterruptibly()は 、チャネルが閉じてコードの実行が継続するまで待機します。



ハイライト:ChannelFuture。 channel.write(パケット)関数を使用してパケットをチャネルに送信すると、ChannelFutureが返されます。これは、実行されているアクションのステータスを監視する特別なオブジェクトです。 これにより、アクションが完了したかどうかを確認できます。



たとえば、切断パケットをクライアントに送信し、その背後のチャネルを閉じたいとします。 もしそうなら

 channel.write(new Packet255KickDisconnect("!")); channel.close();
      
      





99%の確率で、ChannelClosedExceptionが発生し、パケットはクライアントに到達しません。 しかし、あなたはこれを行うことができます:

 ChannelFuture future = channel.write(new Packet255KickDisconnect("!")); try { future.await(10000); //    10 ,    } catch(InterruptedException e) {} channel.close();
      
      





パケットがユーザーに送信されるまで実行スレッドをブロックできることを除き、すべてがスーパーになります。 したがって、リスナーをChannelFutureにハングアップさせることができます。これは、イベントが完了し、任意のアクションを実行することが通知されるオブジェクトです。 接続を閉じるために、既製のリスナーChannelFutureListener.CLOSEがあります。 使用例:

 ChannelFuture future = channel.write(new Packet255KickDisconnect("!")); furute.addListener(ChannelFutureListener.CLOSE);
      
      





効果は同じで、ロックはありません。 独自のリスナーを作成する方法を理解することは難しくありません-関数は1つだけです。 既製のクラスを開きます。ここでは例を挙げません。



より重要な情報


コメントに正しく記載されているように、ブロッキング操作を使用したり、ハンドラー(パイプラインでハングするハンドラー)で待機したりしないことをお勧めします。 そうしないと、処理の流れが永久に失われたり、他のクライアントのイベントの処理が単に遅くなるリスクがあります。



また、ハンドラーでは、「将来を待つ」ことはできません。 ChannelFutureで.await()または.awaitUninterruptibly()を実行します。 まず、あなたは成功しません、あなたはハンドラーからそれらを呼び出すことができません-システムはあなたにそのような愚かなことをさせず、例外を投げます。 第二に、そうでない場合、フローは再び停止し、他の顧客にサービスを提供できなくなります。



一般に、ChannelHandlersで実行されるすべてのアクションは、できるだけシンプルで、ブロックされないようにする必要があります。 いずれの場合も、データを直接処理しないでください-パケットをキューに入れて、別のスレッドで処理します。



All Articles