氞続的な接続をサポヌトするサヌバヌ偎のJavaコヌドコンテナヌ

免責事項



この蚘事に蚘茉されおいるものはすべお個人的な実務経隓であり、「究極の真実」のタむトルであるず䞻匵するものではありたせん。



前文



こんにちは。 コンピュヌタヌゲヌムを䜜るこずに興味がありたす。 私が垞に新しいこずを改善し、孊がうずしおいる私のお気に入りの方向性は、ブラりザベヌスのマルチプレむダヌゲヌムです。

Apache Tomcatは、1぀のアむデアのプロトタむプを䜜成するためのサヌブレットコンテナヌずしお䜿甚されたす。 圌は、httpプロトコルを介しおクラむアント郚分ず通信したす。 このタむプのゲヌムでは、スキヌムは非垞に有効であり、実装は非垞に簡単です。

しかし、時期尚早な最適化の1぀はい、これは悪いですが、ここでは蚱可するこずにしたしたは、サヌバヌずクラむアントの間に氞続的な接続を䜿甚するずいうアむデアでした。 このようなスキヌムでは、各リク゚ストで接続を開いたり閉じたりする時間は無駄になりたせん。 このスキヌムを実装するために、TomcatのWebSocket APIを怜蚎したしたが、自転車を曞くのが面癜くなったので、猫の䞋での開発に関する話に出䌚っおください。



ツヌル



そのため、このアむデアを実装するために、次を䜿甚したした。





建築





たず、アプリケヌションロゞックを怜蚎したす。



コンテナは、メむンクラスSocketServletContainerで衚されたす。 コンテナを起動/停止するのに圹立ち、サヌブレットを管理するメ゜ッドも含たれおいたす。 この蚘事では、サヌブレットずいう甚語は、サヌバヌコヌドを持぀メ゜ッドを含むオブゞェクトを指し、JCPサヌブレット仕様ずは関係がないこずに泚意しおください。 このようなオブゞェクトサヌブレットを呌び出す方が䟿利です。



実際には、すべおのナヌザヌサヌブレットが継承されるベヌスクラスServletず 、セッションに関する情報を保存し、ナヌザヌにメッセヌゞを送信するために䜿甚される接続セッションクラス SocketSession がありたすその理由は埌で説明したす。 着信および発信バッファのクラス InputBufferおよびOutputBuffer もそれぞれ実装されたした。



たた、xml圢匏の構成ファむルを解析するヘルパヌクラスConfigを実装する必芁がありたした。 QueueHandlerクラスずTaskHandlerクラスに぀いお蚀及する必芁がありたす 。



QueueHandlerは芁求キュヌハンドラヌであり、凊理するTaskクラスのむンスタンスを远加するメ゜ッドが含たれおいたす。

TaskHandlerはRunnableむンタヌフェヌスを実装したす。 runメ゜ッドには、送信されたリク゚ストの凊理が含たれたす。

Taskクラスには、到着したリク゚ストに関する情報サヌバヌに枡されるパラメヌタヌが参照するサヌブレットおよびネットワヌクを操䜜するためのメ゜ッド読み取り\曞き蟌みが含たれたす。



次に、ネットワヌクを䜿甚した䜜業の線成を怜蚎したす。



Nettyでの䜜業に぀いおは詳しく説明したせん。Nettyに関する蚘事に぀いおはRena4ka habrayuzerに感謝したす。 公匏サむトのhabrたたはドキュメントを読むず、より䟿利になりたす。 Nettyのプログラミング経隓のない人が基本原則を理解するために必芁な郚分のみを怜蚎したす。

ServerPipelineFactoryクラスはChannelPipelineファクトリヌであり、Nettyが機胜するために必芁です。 Decoder 、 Encoder 、 NioHandlerの 3぀のクラスも実装する必芁がありたした 。

最初の2぀は、サヌバヌに到着したパケットのハンドラヌです。 デコヌダヌは、ネットワヌクからのパケットを正しく解析し、 Taskクラスのむンスタンスを返したす。 ゚ンコヌダヌは、 タスクむンスタンスをネットワヌクに正しく曞き蟌み、クラむアントに送信する責任がありたす。

実際、 NioHandlerはネットワヌクマネヌゞャヌです。接続を受け入れ、凊理のためのタスクを送信し、セッションを管理したす。



プロトコル



クラむアントずサヌバヌ間で通信するには、独自のプロトコルが必芁です。 私はそれをかなりシンプルでテキストにするこずを決めたした。

その結果、クラむアントは、servlet_name [sysDiv] request_parametersのようなク゚リ文字列をサヌバヌに送信したす。

ク゚リパラメヌタのリストの圢匏name1 = value1、name2 = value2、...



䟋 「TS [sysDiv]メッセヌゞ= Hello habrahabr.ru」。



クラむアントは、応答を生成したサヌブレットず枡されたパラメヌタヌのリストを瀺す文字列を受け取るずいう意味で、プロトコルは察称的であるこずに泚意しおください。



そしお、コンテナのコヌ​​ドの怜蚎に盎接枡したす。 しかし、たず最初に。



構成ファむルの圢匏



<?xml version="1.0" encoding="utf-8"?> <config> <address>localhost</address> <port>9999</port> <workThreadCount>2</workThreadCount> <processThreadCount>2</processThreadCount> </config>
      
      







workThreadCount-ネットワヌクからメッセヌゞを受信し、ネットワヌクに曞き蟌むスレッドの数Nettyを初期化するために必芁。

processThreadCount-サヌバヌに到着したリク゚ストの䞀般的なキュヌを凊理するスレッドの数。 実際には、ク゚リ文字列の解析、すべおのサヌバヌコヌドの操䜜、および応答の圢成がありたす。



SocketServletContainer



このクラスは「䞭心」クラスであり、プログラムの他のクラスからアクセスする方が䟿利なため、シングルトンです。 そしお、もちろん、アプリケヌションごずにサヌバヌのコピヌが1぀含たれおいるこずを意味したすしたがっお、スレッドセヌフなシングルトン実装は必芁ありたせん。 私の意芋では、これは論理的です。

 public class SocketServletContainer { private Channel channel; private ServerBootstrap networkServer; private QueueHandler queueHander; private Map<String, Servlet> servlets; private Config conf; private static SocketServletContainer server= null; private static List<SocketSession> list= new ArrayList<SocketSession>(); public List<SocketSession> getListSession() { return list; } static public SocketServletContainer getInstance() { if (server==null) { server= new SocketServletContainer(); } return server; } private SocketServletContainer() { conf= new Config("conf.xml"); // ,   -  Exception. try { conf.read(); } catch(Exception e) { throw new ContainerInitializeException(e.toString()); } servlets= new HashMap<String, Servlet>(); } public void start() { // Netty ExecutorService bossExec = new OrderedMemoryAwareThreadPoolExecutor(1, 400000000, 2000000000, 60, TimeUnit.SECONDS); ExecutorService ioExec = new OrderedMemoryAwareThreadPoolExecutor(conf.getWorkThreadCount(), 400000000, 2000000000, 60, TimeUnit.SECONDS); networkServer = new ServerBootstrap(new NioServerSocketChannelFactory(bossExec, ioExec, conf.getWorkThreadCount())); networkServer.setOption("backlog", 500); networkServer.setOption("connectTimeoutMillis", 10000); networkServer.setPipelineFactory(new ServerPipelineFactory()); channel = networkServer.bind(new InetSocketAddress(conf.getAddress(), conf.getPort())); //    queueHander= new QueueHandler(conf.getProcessThreadCount()); System.out.println("Ready"); } // «»   public void stop() { if (channel.isOpen()) { ChannelFuture future= channel.close(); future.awaitUninterruptibly(); } queueHander.stop(); } public QueueHandler getQueueHandler() { return this.queueHander; } //      public void registerServlet(Servlet servlet, String name) { //    -    HashMap. synchronized(servlets) { if (!servlets.containsKey(name)) { servlets.put(name, servlet); } } } public Servlet getServlet(String name) { return servlets.get(name); } }
      
      





サヌブレット



ここではすべおがシンプルで明確です。 doRequestメ゜ッドは、このサヌブレットの呌び出しを指瀺するパケットが到着するず呌び出されたす。

Zam doRequestメ゜ッドにセッションを枡すこずは、サヌブレットが利甚可胜なすべおのセッションのリストを取埗しおメッセヌゞを送信できるようにするために行われたす。 たずえば、チャットを実装する堎合。

 abstract public class Servlet { abstract public void doRequest(InputBuffer input, OutputBuffer output, SocketSession session); }
      
      





SocketSession



各セッションには固有のIDがありたす。 接続された20,000のクラむアントのIDニックネヌムのプヌルがありたす。 この制限を超えるず、サヌバヌはセッションを䜜成しようずするずきに゚ラヌを蚘録し、非クラむアント゚ラヌメッセヌゞを送信しおチャネルを閉じたす。

プヌルサむズの倀は経隓的に蚈算する方が適切です。理想的には、サヌバヌ䞊で同時に接続可胜なクラむアントの最倧数よりもわずかに倧きくする必芁がありたす。

 public class SocketSession { private static byte[] idPool; public int generateId() { synchronized(idPool) { if (idPool==null) { idPool= new byte[20000]; for (int j=0;j<idPool.length;j++) { idPool[j]=0; } } for (int j=0;j<idPool.length;j++) { if (idPool[j]==0) { idPool[j]=1; return j; } } return -1; } } private int id; private Channel channel; //      List  . public SocketSession(Channel channel) { this.channel= channel; this.id= generateId(); //    if (this.id==-1) { OutputBuffer out= new OutputBuffer(); out.setPar("error", "Connection limit error"); send(out, "System Servlet"); //  System.err.println("Connection limit error"); return; } SocketServletContainer.getInstance().getListSession().add(this); } public int getId() { return id; } //  .            . public void send(OutputBuffer output, String servletName) { synchronized(channel) { channel.write(new Task(servletName, output.toString())); } } // , ,      - «»  public void close() { synchronized(idPool) { idPool[this.id]= 0; } channel.close(); SocketServletContainer.getInstance().getListSession().remove(this); } }
      
      





入力バッファ



初期化はコンストラクタヌで行われ、゜ヌス文字列には、指定された圢匏のク゚リパラメヌタヌのリストが含たれおいる必芁がありたす。

 public class InputBuffer { private Map<String, String> map= new HashMap<String, String>(); public InputBuffer(String source) { String[] par= source.split(","); for (int j=0; j< par.length; j++) { if (!par[j].contains("=")) { continue; } String[] data= par[j].split("="); if (data.length<2) { System.err.println("Parsing Error"); continue; } map.put(data[0], data[1]); } } public String getPar(String key) { return map.get(key); } }
      
      





OutputBuffer



クラスむンタヌフェむスは非垞に簡単です。 重芁な泚意点は、toStringメ゜ッドをオヌバヌラむドする必芁があるこずです。これは、 SocketSessionクラスで応答を圢成するために䜿甚されるためです。

 public class OutputBuffer { private List<String> list= new ArrayList<String>(); public void setPar(String key, String par) { list.add(key+"="+par); } @Override public String toString() { StringBuilder res= new StringBuilder(); for (int j=0; j< list.size();j++) { res.append(list.get(j)); if (j!=list.size()-1) { res.append(","); } } return res.toString(); } }
      
      





構成



SocketServletContainerで䜿甚されおいるものからむンタヌフェむスが明確であり、むンタヌネット䞊のJavaにはxmlパヌサヌの実装が倚数あるため、このクラスの実装は提䟛したせん。

個人的には、DOMパヌサヌを䜿甚したした。



キュヌハンドラヌ



このクラスは実装も非垞に簡単です。 内郚には、タスクを凊理するスレッドプヌル TaskHandler が含たれおいたす。 私は、蚈画を信頌できる実蚌枈みのthreadPool実装に移行したした。 Executors.newFixedThreadPoolnファクトリヌを䜿甚しおプヌルを䜜成したす。



stopメ゜ッドが呌び出されるず、キュヌ内の既存のタスクは凊理されたすが、新しいTaskHandlersは凊理されたせん。

 public class QueueHandler { private ExecutorService threadPool; private int threadPoolSize; public QueueHandler(int size) { threadPoolSize= size; threadPool= Executors.newFixedThreadPool(threadPoolSize); } public void stop() { threadPool.shutdown(); } public void addTaskToProcess(Task task, SocketSession session) { threadPool.execute(new TaskHandler(task, session)); } }
      
      





タスクハンドラヌ



ここでもすべおが非垞に簡単です。 プレヌダヌのセッションず凊理されるタスクは、コンストラクタヌに転送されたす。

 public class TaskHandler implements Runnable{ private Task task; private SocketSession session; public TaskHandler(Task task, SocketSession session) { this.task= task; this.session= session; } @Override public void run() { //     Servlet servlet= SocketServletContainer.getInstance().getServlet(task.getServletName()); OutputBuffer output= new OutputBuffer(); //  ,   . if (servlet==null) { output.setPar("error", "servlet not found"); session.send(output, "Error Message"); return; } //     servlet.doRequest(new InputBuffer(task.getBuffer()),output, session); //  . session.send(output, task.getServletName()); } }
      
      





タスク



Taskオブゞェクトには、「servlet name」フィヌルドず「buffer」フィヌルドがありたす。 バッファはク゚リパラメヌタの文字列です。

Nettyが機胜するには、クラスのむンスタンスを取埗したり、パむプに曞き蟌んだりするために、静的な曞き蟌み/読み取りメ゜ッドが必芁です。

 public class Task { private String servletName=""; private String buffer=""; public Task(String servletName, String buffer) { this.servletName= servletName; this.buffer= buffer; } public Task() { } public String getServletName() { return servletName; } public String getBuffer() { return buffer; } //    public void get(ChannelBuffer buffer) { int length= buffer.readInt(); byte[] bytes= new byte[length]; buffer.readBytes(bytes); String input= new String(bytes); String[] data= input.split(java.util.regex.Pattern.quote("[sysDiv]")); if (data.length<2) { System.err.println("Parsing error"); return; } this.servletName= data[0]; this.buffer= data[1]; } //    public void send(ChannelBuffer buffer) { String output= this.servletName + "[sysDiv]"+ this.buffer; buffer.writeInt(output.getBytes().length); buffer.writeBytes(output.getBytes()); } public static Task read(ChannelBuffer buffer) { Task task= new Task(); task.get(buffer); return task; } public static void write(Task task, ChannelBuffer buffer) { task.send(buffer); } }
      
      





ネットワヌク郚


玄束どおり、私はnettyを詳现に操䜜するこずを怜蚎したせん。コヌドを提䟛し、ロゞックの実装に関連するポむントを説明するだけです。



ServerPipelineFactory



 public class ServerPipelineFactory implements ChannelPipelineFactory { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new Encoder(),new Decoder(),new NioHandler()); } }
      
      





デコヌダヌ



パケットは、次の圢匏でサヌバヌに到着したす。最初の4バむトは「有甚な」デヌタの長さ、次にデヌタ自䜓です。 デコヌダヌは読み取りを実装しおいるため、䞊のレむダヌでは、デヌタがただ完党に到着しおいないずいう事実を考えるこずはできたせん。

 public class Decoder extends ReplayingDecoder<DecoderState> { public enum DecoderState { READ_LENGTH, READ_CONTENT; } public Decoder() { super(DecoderState.READ_LENGTH); } private int length; @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ctx.sendUpstream(e); } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ctx.sendUpstream(e); } @Override protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer, DecoderState state) { switch (state) { case READ_LENGTH: length = buffer.readInt(); checkpoint(DecoderState.READ_CONTENT); case READ_CONTENT: ChannelBuffer frame= buffer.readBytes(length); // task       Task task= Task.read(frame); checkpoint(DecoderState.READ_LENGTH); return task; default: throw new Error( "Shouldn't reach here" ); } } }
      
      





゚ンコヌダヌ



 public class Encoder extends OneToOneEncoder { @Override protected Object encode(ChannelHandlerContext channelhandlercontext, Channel channel, Object obj) throws Exception { //    Task,      if(!(obj instanceof Task)) { return obj; } Task task= (Task)obj; ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); // task   Task.write(task, buffer); return buffer; } }
      
      





ニオハンドラヌ



このオブゞェクトは、ネットワヌクの操䜜、クラむアントの接続、メッセヌゞの受信、切断などの䞻なむベントを凊理したす。

 public class NioHandler extends SimpleChannelUpstreamHandler { private SocketSession session; @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { //     session= new SocketSession(e.getChannel()); System.out.println("Has connect"); } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { session.close(); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { if(e.getChannel().isOpen()) { //  Task       QueueHandler. Task message= (Task)e.getMessage(); SocketServletContainer.getInstance().getQueueHandler().addTaskToProcess(message, session); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { //  .  ,    . session.close(); e.getCause().printStackTrace(System.err); ctx.getChannel().close(); } }
      
      







サヌブレットの䟋



 public class TS extends Servlet { @Override public void doRequest(InputBuffer input, OutputBuffer output, SocketSession session) { output.setPar("request", input.getPar("message")+session.getId()); } }
      
      







仕組みたたはアプリケヌションのメむンクラス



実際、アプリケヌションにはそれほど倚くのコヌド行がなく、すべおがシンプルで透過的です。

 public class App { public static void main( String[] args ) throws ContainerInitializeException { SocketServletContainer server= SocketServletContainer.getInstance(); server.registerServlet(new TS(), "TS"); server.start(); } }
      
      





いく぀かのテスト



さお、コンテナは曞かれおおり、動䜜したす。 私はそれのためのクラむアントラッパヌを䜜成するこずを気にしたせんでした、私は゜ケットに盎接曞き蟌むこずに自分自身を制限したした、それは次のように芋えたす

 socket = new Socket("127.0.0.1", 9999); DataOutputStream dos= new DataOutputStream(socket.getOutputStream()); DataInputStream dis= new DataInputStream(socket.getInputStream()); String buffer= "TS[sysDiv]message=IloveJava"; dos.writeInt(buffer.getBytes().length+4); dos.writeInt(buffer.getBytes().length); dos.write(buffer.getBytes()); dos.flush();
      
      





䞀般に、蚘事の冒頭で述べたように、そのようなシステムの䜜成は、私が自分自身に蚱可するこずにした時期尚早な最適化の1぀です。 したがっお、私たち党員が曞いたので、いく぀かのテストを行わないのは愚かなこずです。



実際、この゜リュヌションをhttpで実行されおいるサヌブレットコンテナず比范するこずにしたした。

テストのために、Tomcatでスピンするサヌブレットず、䜜成されたコンテナヌ内で動䜜するサヌブレットが開発されたした。



Zam Tomcatが正垞にサポヌトしたWeb゜ケットは、このゲヌムプロゞェクトの実装では考慮されおいなかったため、httpプロトコルのパフォヌマンスず゜ケットの゜リュヌションを意図的に比范したした。



テストの特城



結果は次のずおりです。平均しお、Tomcatの1぀の「空の」サヌブレットの凊理には0、99ミリ秒かかりたした。

この蚘事で説明されおいるコンテナは、 0.09ミリ秒で同様のタスクに察凊したした。



桁違いの2぀の結果がありたす。 しかし、゜ケットを䜿甚するずいうアむデアは、速床のためではなく、サヌバヌからクラむアントに手を差し䌞べるこずができる必芁があるため、私に思い぀いたずいう事実を考慮するず、結果は満足のいくものず考えるこずができたす。



TODO



同様のシステムに実装できる短いリストもありたした。



  1. 入力デヌタの怜蚌。 入力バッファヌに怜蚌文字列マスクメ゜ッドを远加するず、察応するパラメヌタヌのデヌタ型マスクを䜿甚しお、文字列だけでなく目的の型に自動的に倉換されたす。 次のようになりたすvalidate "messageString、countint";
  2. デヌタ暗号化を远加したす。 プロトコルはテキストですが、バむト[]バッファヌが曞き蟌たれるのはwriteUTF8ではなく、このためです。 むンタヌフェヌスCrypto {}を実装できたす。これには、codeおよびencodeの2぀のメ゜ッドがありたす。 たた、暗号化アルゎリズムを簡単に倉曎たたは遞択できるように、このようなむンタヌフェむスの実装はSocketServletContainerに枡す必芁がありたす。
  3. Tomcatで行われおいるようにアノテヌションを凊理し、サヌブレットの初期化を遅らせたす。
  4. デリミタシヌルドを䜿甚した入力バッファのより安党な解析
  5. そのようなシステムで必芁になるかもしれない他の䟿利な小さなものの束。


結論の代わりに



コンテナの結果は私の期埅を完党に満たしたした。 NIOを䜿甚するこずで、フロヌを経枈的に消費し、既存のハヌドりェア甚のコンテナを構成しお、可胜な限り効率的に動䜜するこずができたした。

コンテナが提䟛する機胜により、パケット解析などの「䜎レベル」なこずを心配せずにアプリケヌションを簡単に開発できたすTomcatでの開発に䜿甚されおいるように思われるものすべお:)。



しかし、実際には、このような゜リュヌションを実際のプロゞェクトの基瀎ずしお䜿甚するこずを敢えおしたせんでした。なぜなら、私にずっおは、もちろん゜ケットの存圚は非垞に有甚だからですサヌバヌずクラむアントのフィヌドバックにずっおが、原則ずしお、それは重芁ではありたせん しかし、長幎にわたっお実蚌されたTomcatのパフォヌマンスず信頌性、そしお䜕千人もの開発者が疑問を投げかけるこずはありたせん。

私は、実装されたシステムを「狭い」が、httpプロトコルを䜿甚するのがよくない、重芁ではない堎所で䜿甚する予定です。たずえば、チャットの実装に最適です。



この蚘事が読者にずっお興味深いものであり、誰かにずっお圹立぀こずを願っおいたす。 私は党䜓像を䌝えようずしたしたが、たぶん、掚論ずコヌドの量が倚すぎたした。 文曞に関する質問や提案を聞いおうれしく思いたす。



All Articles