Vertxの非同期プログラミングの3つのパラダイム

非同期プログラミングの3つのパラダイム-コールバック、フューチャー、Vertxフレームワーク上の単純なWebアプリケーションの例のコルーチンを示したいと思います。 コードはKotlinで記述します。



HTTPリクエストで特定の文字列を受信するアプリケーションがあり、データベースでURLを検索し、そのURLに移動して、そのコンテンツをクライアントに送り返すとします。

Vertxは、負荷の高いアプリケーション用の非同期フレームワークとして構想され、netty、新しいIO、イベントバスを使用します



Vertxの慣例に従い、1つのVerticle(Akkaを知っている場合はアクターの類似物)がリクエストを受信し、イベントバス文字列を送信して、実際に作業に従事している他のBusinessVerticleのURLを検索します。



object Main { @JvmStatic fun main(args: Array<String>) { val vertx = Vertx.vertx() vertx.deployVerticle(HttpVerticle()) vertx.deployVerticle(BusinessVerticle()) } }
      
      





 class HttpVerticle : AbstractVerticle() { @Throws(Exception::class) override fun start(startFuture: Future<Void>) { val router = createRouter() vertx.createHttpServer() .requestHandler(router) .listen(8080) { result -> if (result.succeeded()) { startFuture.complete() } else { startFuture.fail(result.cause()) } } } private fun createRouter(): Router = Router.router(vertx).apply { get("/").handler(handlerRoot) } private val handlerRoot = Handler<RoutingContext> { rc -> vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") { resp: AsyncResult<Message<String>> -> if (resp.succeeded()) { rc.response().end(resp.result().body()) } else { rc.fail(500) } } } }
      
      





標準APIでは、すべての非同期はコールバックを通じて行われるため、BusinessVerticleの初期実装は次のようになります。



 class BusinessVerticle : AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url,"/") .send { ar -> if (ar.succeeded()) { val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } } else { message.fail(500, res.cause().message) } } } }
      
      





率直に言って、まあまあ-コールバック地獄、特にエラー処理。



コールバックの第一人者が教えてくれるように、状況を改善してみましょう-個別のメソッドで各コールバックを選択することにより:



  class BusinessVerticle: AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> handleConnectionCallback(res, message) } } private fun handleConnectionCallback( res: AsyncResult<SQLConnection>, message: Message<String> ) { if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> handleQueryCallBack(res2, message) } } else { message.fail(500, res.cause().message) } } private fun handleQueryCallBack( res2: AsyncResult<ResultSet>, message: Message<String> ) { if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url, "/") .send { ar -> handleHttpCallback(ar, message) } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } private fun handleHttpCallback( ar: AsyncResult<HttpResponse<Buffer>>, message: Message<String> ) { if (ar.succeeded()) { // Obtain response val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } }
      
      





まあ、それは良くなりました。 しかし、まあまあ。



特に読みやすいコードではない多くの行では、応答のためにメッセージオブジェクトをドラッグする必要があります。エラー処理はコード全体に広がります。



Futuresを使用してこのがらくたを書き換えてみましょう

Futureは、 Future.compose()を使用して簡単に結合できるため、特に優れています。



最初に、コールバックを受け入れ、Futureを返すメソッドに何も返さない標準のVertxメソッドを変換します。



既存のクラスにメソッドを追加するKotlinの機能を利用します。



 fun JDBCClient.getConnectionF(): Future<SQLConnection> { val f = Future.future<SQLConnection>() getConnection { res -> if (res.succeeded()) { val connection = res.result() f.complete(connection) } else { f.fail(res.cause()) } } return f } fun SQLConnection.queryF(query:String): Future<ResultSet> { val f = Future.future<ResultSet>() query(query) { res -> if (res.succeeded()) { val resultSet = res.result() f.complete(resultSet) } else { f.fail(res.cause()) } } return f } fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> { val f = Future.future<HttpResponse<M>>() send() { res -> if (res.succeeded()) { val response = res.result() f.complete(response) } else { f.fail(res.cause()) } } return f }
      
      





そして、BusinessVerticle.handleMessageを次のようにします。



  private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) { // Obtain response val response = res.result() message.reply(response) } else { message.fail(500, res.cause().message) } } } private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content }
      
      





かっこいい。



シンプルで読みやすいコード。 一箇所でのエラー処理。 必要に応じて、異なる例外に対して異なる反応をさせることができます。たとえば、それを別の関数に入れることができます。



詩人の夢!



しかし、何らかの条件でFutureチェーンを終了する必要がある場合はどうなりますか?

たとえば、データベースに対応するエントリがない場合、例外(およびクライアントにコード500)をスローするのではなく、コード200で文字列「レコードなし」を返します。



Future.compose()からチェーンを終了する唯一の方法(私が知っているは、例外をスローすることです。



つまり このようなことをする必要があります:例外のタイプを決定し、データベースにエントリがなければこの例外をスローし、この例外を特別な方法で処理します。



 class NoContentException(message:String):Exception(message) private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { if (it.numRows<1) throw NoContentException("No records") it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) { // Obtain response val response = res.result() message.reply(response) } else { if (res.cause() is NoContentException) message.reply(res.cause().message) else message.fail(500, res.cause().message) } } }
      
      





うまくいく!



しかし、すでに悪化しているように見えます。例外を使用して実行フローを制御することは美しくありません。 また、個別の処理を必要とするこのようなケースが多数ある場合、コードははるかに読みにくくなります。



Kotlinコルーチンで同じことを試してみましょう。

Habré( 1、2、3 、...)を含め、コルーチンについて多くのことが書かれているので、それらについては別に書きません。



Vertxの最新バージョンは、コールバックが受け入れるすべてのメソッドのコルーチンバージョンを自動的に生成します。



図書館をつなぐ

「vertx-lang-kotlin-coroutines」

「vertx-lang-kotlin」



そして、例えば、取得



 JDBCClient.getConnectionAwait() SQLConnection.queryAwait()
      
      





など



その後、メッセージ処理メソッドは、キュートでシンプルなものに変わります。



 private suspend fun handleMessage(message: Message<String>) { try { val content = getContent(message) message.reply(content) } catch(e:Exception){ message.fail(500, e.message) } } private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
      
      





さて、コルーチンコンテキストを提供して呼び出しを変更する必要があります。



 vertx.eventBus().consumer<String>("my.addr") { message -> GlobalScope.launch(vertx.dispatcher()) { handleMessage(message)} }
      
      





ここで何が起こっていますか?



Awaitを使用したこれらのメソッドはすべて、コードを非同期的に呼び出し、その結果を待機し、待機中にスレッドが別のコルーチンの実行に切り替えます。



ボンネットの下を見ると、次のようになります。



 suspend fun SQLClient.getConnectionAwait(): SQLConnection { return awaitResult { this.getConnection(it) } } suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T { val asyncResult = awaitEvent(block) if (asyncResult.succeeded()) return asyncResult.result() else throw asyncResult.cause() } suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T { return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> try { block.invoke(Handler { t -> cont.resume(t) }) } catch (e: Exception) { cont.resumeWithException(e) } } }
      
      





Futuresによる自己記述の実装に似たもの。



しかし、ここでは通常のコードを取得します-戻り値の型として(Futureの代わりに)文字列、AsyncResultでいコールバックの代わりにtry / catch



実行チェーンを途中で停止する必要がある場合、例外なく自然に見えます。



  private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") if (resultSet.numRows<1) return "No records" val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
      
      





私の意見では、素晴らしい!



All Articles