さまざまなプロジェクトをサポートする過程で、 Promise
不適切な作業のために生産Promise
問題が発生する状況に何度か遭遇しました。 さらに、この非常に間違った作業のパターンは常に同じでしたが、異なる装いに隠れていました。 さらに、誤ったコードはさまざまな人々によって書かれました。 さらに、 Promise
での作業に関する記事では、強調したい問題の言及を本当に見つけました。 だから私は多くの人が私が話す問題を忘れていると思います。
約束、未来、そして役者とともに、Scalaで非同期コードの多くの例を読むのは面白いですか? 猫へようこそ!
未来について少し
手始めに、 Future
についてのちょっとした理論を紹介します。 Scala標準ライブラリのこのタイプに精通している場合、この部分は安全にスキップできます。
ScalaはFuture[T]
型を使用して非同期計算を表します。 DBMSからキー値を抽出する必要があるとします。 このようなリクエストの同期関数の署名は次のようになります。
trait SyncKVStore[K, V] { def get(key: K): V }
その後、次のように使用できます。
val store: SyncKVStore[String, String] = ... val value = store.get("1234") // value String
このアプローチには欠点があります: get()
メソッドはブロックする可能性があり、比較的長い時間ネットワーク上でデータが送信される可能性があるためです。 ノンブロッキングにしようとすると、特性は次のようになります。
import scala.concurrent.Future trait KVStore[K, V] { def get(key: K): Future[V] }
実装を正しく書き換えるには、使用するDBMSの非同期ドライバーを使用する必要がある可能性があります。 例として、マップ上にメモリ内実装を記述します。
import scala.concurrent.ExecutionContext class DummyKVStore(implicit ec: ExecutionContext) extends KVStore[String, String] { // Future(...) // ExecutionContext def get(key: String): Future[String] = Future(map(key)) private val map = Map("1234" -> "42", "42 -> 13", "1a" -> "b3") }
取得した値は、たとえば次のように使用できます。
// - ExecutionContext import scala.concurrent.ExecutionContext.Implicits.global val store = new DummyKVStore // map ExecutionContext // store.get("1234").map { value => log.info(value) // DummyKVStore 42 }
Futureには、値を処理するための便利な非同期メソッドがいくつかあります;それらのいくつかを簡単に説明します。
-
map(f: A => B)
-先物の終わりに、関数f
を使用して成功した結果を変換します -
flatMap(f: A => Future[B])
-成功した結果も変換しますが、実際には非同期関数を受け入れます -
foreach(f: A => Unit)
-正常に完了すると、関数f
実行し、先物の結果を渡します -
onComplete(f: Try[A] => Unit)
と同じですが、任意の完了時に機能を実行します。 エラーあり
名前から推測できるように、これらすべてのメソッドはimplicit ec: ExecutionContext
パラメーターも使用することを明確にすることも価値があります。これには、先物の実行のコンテキストに関する情報が含まれます。
先物の詳細については、例えばこちらをご覧ください 。
約束:理論のさらに2つの段落
Promise
とは何ですか? 実際、これは先物を含む型指定された追記型コンテナです。
val p = Promise[Int]() p.success(42) p.future // Future 42
先物が完了するための多くの方法があります。例えば:
-
success(t: T)
-結果t
フューチャーを終了します -
failure(e: Throwable)
e
先物をファイルする -
complete(try: Try[T])
-try
がSuccess
場合、先物を終了します。 失敗したら失敗 -
completeWith(future: Future[T])
-future
終了したときに内部future
完了します
したがって、Promiseは先物を作成するために使用できます。
約束:使用の実践に没頭する
コールバックで既製の非同期Java APIの上に独自の非同期APIを実装することを想像してください。 先物で返したい結果はコールバックでのみ使用できるため、 Future.apply()
メソッドを直接使用することはできません。 ここで、 Promise
が役立ちます。 SOへのこの答えには、実世界でPromise
を使用した一見素晴らしい例があります。
def makeHTTPCall(request: Request): Future[Response] = { val p = Promise[Response]() registerOnCompleteCallback { buffer => val response = makeResponse(buffer) p.success(response) } p.future }
さて、Akka-HTTPなどの新しいWebサービスでこの関数を使用します。 開始するには、SBTで依存関係を接続します。
libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.10"
そして、サービスコードを記述します。
import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.marshalling.GenericMarshallers.futureMarshaller import scala.concurrent.ExecutionContext.Implicits.global object WebService extends App { // Akka-HTTP implicit val system = ActorSystem() val store = new DummyKVStore val route = // GET /value/$id (get & path("value" / IntNumber)) { id => complete { for { value <- store.get(id) response <- makeHTTPCall(new RequestImpl(value)) } yield response } } Http().bindAndHandle(route, "localhost", 80) }
注:Akka-HTTPのcomplete()
メソッドはFuture[T]
受け入れることができ、このためにfutureMarshaller
インポートされます。 先物の完了後にHTTPリクエストに応答します。
また、HTTP APIを介してデータベースからのすべてのメールに特定のキーで値を送信するタスクを冗談にすることも決定しました。 そして、彼はサイクルでそれをします:配布の完了後、すべてのクライアントは再びそれをやり始めます。
def allEmails: Seq[String] = ... def sendEmails(implicit ec: ExecutionContext): Future[Unit] = { Future.sequence { for { email <- allEmails } yield for { value <- store.get("42") response <- makeHTTPCall(new SendMailRequest(email, value)) } yield response }.flatMap(_ => sendEmails) // }
すべてを運用環境に配置します。 ただし、数日後、APIコンシェルジュが私たちのところに来て、タイムアウトによるサービスの定期的なロールアウトについて苦情を申し立てます。 そして3日後、タスクがメールの送信を停止したことがわかりました! 問題は何ですか?
ログには、次のトレースがあります。
some.package.SomeException at some.package.makeResponse(...) at some.package.$anonfun$makeHTTPCall$1(...) ...
makeResponse()
メソッドが実行をmakeResponse()
したことがmakeResponse()
。 makeHTTPCall()
ソースを見ると、この場合、それが返す先物は決して終わらないことがわかります!
val p = Promise[Response] registerOnCompleteCallback(buffer => { val response = makeResponse(buffer) // p.success(response) // success() }) p.future
これが、タイムアウトによってAPIが落ち、メール送信サイクルが機能しなくなった理由です。 残念ながら、Scalaでは、 多くの人が望むように、どの関数も実行を返すことができると考えずにプログラミングすることはできません ...
そのため、 Try.apply()
メソッドは実行をインターセプトし、値でSuccess
を返すか、例外をスローしてFailure
を返すことができることを思い出してTry.apply()
。 ラムダ本体を単純な方法で修正し、コードにレビューを送信します。
import scala.util._ Try(makeResponse(buffer)) match { case Success(r) => p.success(r) case Failure(e) => p.failure(e) }
ただし、レビューでは、promiseにはcomplete()
メソッドがあり、それ自体が手で書いたものと同じことを行うことがわかります。
p.complete(Try(makeResponse(buffer))
Promise
について学んだこと:
-
Promise
メソッドの最初に宣言され、そのfuturesが最後に返される場合、これはこのfuturesが終了することを意味するものではありません。 -
Promise
を常に閉じなければならないリソースと考えると便利です。 ただし、プロミスは通常、異なるスレッドで宣言されて閉じられるため、標準言語構成(try-finally
)またはライブラリ(scala-arm
)を使用してリソースとして使用することには問題があります。
おそらく誰かが、これは人為的な例であり、実生活では誰も約束を忘れることを忘れないと言うでしょうか? まあ、そのような懐疑論者のために、私はAkkaのいくつかの本当のバグ/ PRの形で答えを持っています。
- QueueSourceは、突然の終了時にonComplete futureを完了しません
- IgnoreSinkは、突然の終了時にmat futureを完了しません
- 完了したキューのオファーを呼び出すと、決して完了できない未来が返されます
さらに、すべてがこの例のように単純で明白であるとは限りません。
理論の最後の部分:俳優への小さな紹介
アクターに精通している人は、このパートをスキップできます。
この記事の後半で、Akkaアクターについて少し知識が必要になります。 akka-actorモジュールをプロジェクトに接続します(SBTの例):
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.7"
Akkaのアクターは、非同期的にメッセージを受信するいくつかの動作を持つオブジェクトです。 デフォルトでは、動作はreceive
メソッドで決定されreceive
。
import akka.actor._ import akka.actor.Actor.Receive val log: Logger = ourLogObject() case object HelloRequest class HelloActor extends Actor { // Receive -- def receive: Receive = { // HelloRequest, "hello!" case HelloRequest => log.info("hello!") } } // val system = ActorSystem() // val actor: ActorRef = system.actorOf(Props(new HelloActor)) // "hello!" actor ! HelloRequest
作成後、アクターは直接アクセスできませんが、 ActorRef
と呼ばれるプロキシを介してアクセスできます。 このプロキシを介して、メソッドを使用して非同期でメッセージを送信できます!
( tell
エイリアス)、これらのメッセージは、アクターの動作を決定するメソッドによって処理されます。 メッセージはシリアライズ可能でなければならないため、多くのcase object
、メッセージ用のcase object
作成されます(メッセージのパラメーターがないcase class
)。 アクターは一度に1つのメッセージしか処理できないため、同期プリミティブとしても使用できます。
もう1つの重要なポイントがあります。アクターはその動作機能を変更できる、つまり実際にメッセージを処理できます。 これを行うには、アクターはcontext.become(newReceive)
メソッドを呼び出す必要がありますnewReceive
はReceive
パラメーターです。 その後、次のメッセージから開始して、デフォルトのreceive
代わりにnewReceive
関数による処理が開始されます。
パターンの各部分を接続する:約束を俳優に渡す
それでは、次の例に移りましょう。
何らかのサービスのクライアントを作成する必要があります。 たとえば、予約用。 idでホテル情報を受信できるようにしたいとします。
case class Hotel(id: Long, name: String, country: String) // trait BookingClient { def getHotel(id: Long): Future[Hotel] }
次に、予約APIにアクセスして応答を処理するメソッドを定義する必要があります。 これを行うには、Akka-HTTPライブラリの非同期HTTPクライアントを使用します。 依存関係で接続します:
libraryDependencies ++= "com.typesafe.akka" %% "akka-http" % "10.0.10"
彼らは、比較的大きなRPSで短時間メソッドを実行したいのですが、応答時間はそれほど重要ではありません。 Akka-HTTPクライアントには特異性があります: akka.http.host-connection-pool.max-connections要求以上の並列実行を許可しません 。 非常に単純なソリューションを使用します。すべてのリクエストがアクターを通過するようにします。つまり、1つのストリームになります(実際のソリューションはもう少し複雑でしたが、これはこの例では重要ではありません)。 先物を返したいので、約束を作成してアクターに渡します。すでにアクターでコミットします。
// implicit system materializer Akka-HTTP; ec Future.foreach() class HttpBookingClient(baseUri: String)(implicit system: ActorSystem, materializer: ActorMaterializer, ec: ExecutionContext) { override def getHotel(id: Long): Future[Hotel] = { val p = Promise[Hotel]() actor ! Request(id, p) p.future } private val actor = system.actorOf(Props(new ClientActor)) private case class Request(id: Long, p: Promise[Hotel]) private case object Completed private class ClientActor extends Actor { // , override def receive: Receive = { case Request(id, p) => val uri = Uri(baseUri).withQuery(Query("id" -> id.toString)) val eventual = Http().singleRequest(HttpRequest(uri = uri)) // API eventual.foreach { response => p.completeWith { val unmarshalled = Unmarshal(response.entity) // 200, response.status match { case StatusCodes.OK => // Unmarshaller[Hotel], , akka-http-spray-json // unmarshalled.to[Hotel] case _ => unmarshalled.to[String].flatMap(error => Future.failed(new Exception(error))) } }) } p.future.onComplete(_ => self ! Completed) // running context.become(running) } // , // private def running: Receive = { case request: Request => // , ; self ! request case Completed => // context.become(receive) } } }
繰り返しになりますが、すべてを出した後、最初はすべてうまくいきましたが、「 getHotel()
Method Returns Incomplete Futures」という見出しのバグレポートがありました。 なぜこれが起こったのですか? 私たち全員が想像したように見えますが、ラムダ本体全体に対してcompleteWith()
メソッドを使用しました...それにもかかわらず、特定の条件下では、先物はまだ残っています。
問題は、 foreach()
メソッドに渡されるラムダは、最終的なeventual
正常に完了したときにのみ開始されることです。 したがって、この先物が失敗した場合(たとえば、ネットが落ちた場合)、約束は決してneverめられません!
修正は比較的簡単であると想定できますonComplete()
代わりにonComplete()
を使用し、渡されたラムダのエラーを処理する必要があります。 このようなもの:
eventual.onComplete { case Success(response) => // , foreach... case Failure(e) => p.failure(e) }
これにより、特にフュージョンのフュージョンによるフューチャー先物の問題が解決されますが、この方法で送信されたフューチャー先物の問題をすべて解決できるわけではありません。
さらなる推論を単純化するために、将来を完了するためにアクターに約束を転送する、より単純な、同時により一般的な例を実装します。
case class Request(arg: String, p: Promise[String]) trait GimmeActor { val actor: ActorRef def function(arg: String): Future[String] = { val p = Promise[String]() actor ! Request(arg, p) p.future } }
ちなみに、ボディfuncion()
の構築は、たとえばAkkaや他のライブラリのソースコードによく見られます。 同じAkkaには、このパターンに従って書かれたPromise
数十の使用法があります。
-
Promise
作成 - 非同期関数への引数の1つとして渡します(Akkaのソースでは、これは多くの場合、単に
actor.tell()
呼び出しです) -
Promise
によって作成されたfuture
フィールドを返します。
明確にするためのいくつかの例:
このパターンの使用には、少なくともいくつかの問題があります。
- 2つのサンプルクラス:
class DoesntEvenKnowAboutRequest extends Actor { def receive: Receive = { case PoisonPill => } } class HandlesRequestWrongWay extends Actor { def receive: Receive = { case Request(arg, p) => } }
どちらの場合も、約束は明らかに完了しません。
一方では、これらの例はあまりにも合成的であると言えます。 一方、コンパイラはこのようなエラーから私たちを保護しません。 さらに、この問題はアクターだけでなく、Promiseを通常の非同期関数に転送する場合にも覚えておく必要があります。
- アクターの実装を正しく記述し、リクエストを受け取ったときに常に約束を完了させます。
class AlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(arg, p) => p.success("42") }
私たちはすべて順調で、このアクターを使用したサービスは完璧に機能します。 ただし、最初に1つのサーバーでサービスを開始することにしました。
しかし、今では会社も成長しており、ユーザーの数も増えています。それに伴い、アクターへのリクエストの数も増えています。 負荷のピーク時に、アクターは必要な時間内にメッセージフローに対処する時間がなくなったため、水平スケーリングを行うことにしました。クラスター内の異なるノードでAlwaysCompletesRequest
を実行します。 クラスターを整理するには、akka-clusterを使用する必要がありますが、簡単にするためにクラスターを整理せず、1つのリモートアクターAlwaysCompletesRequest
見てAlwaysCompletesRequest
。
両方のJVMでakka-remoteをサポートするActorSystem
を作成する必要があります:アクターへのアクセスとホスト。 これを行うにはapplication.conf
両方のサービスのapplication.conf
次の構成を追加します。
akka { actor { provider = remote } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "some-hostname" port = 2552 # ; - } } }
SBTの例のように、両方のサービスにakka-remote依存関係を追加する必要もあります。
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.5.7"
サーバー上でアクターを作成します:
val system = ActorSystem("server") system.actorOf(Props(new AlwaysCompletesRequest), "always-complete-actor")
次に、クライアントで取得します。
val system = ActorSystem("client") val actor = system.actorSelection("akka.tcp://server@some-hostname:2552/user/promise-actor") val gimme = new GimmeActor { override val actor: ActorRef = actor } val future = gimme.function("give me 42, please!")
そして、クライアントとともにサーバーを起動しました...そしてすぐに行き詰まりました。
ログに次の例外があります。
akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.actor.ActorSelectionMessage] using serializer [class akka.remote.serialization.MessageContainerSerializer]. Caused by: java.io.NotSerializableException: scala.concurrent.impl.CallbackRunnable
したがって、リモートアクターにプロミスを送信しようとすると、プロミスのシリアル化エラーが非常に予想どおりに発生しました。 実際、たとえシリアル化して約束を渡すことができたとしても、リモートJVMでジャムになるだけで、JVMではスタックしたままになります。 したがって、アクターにプロミスを転送することはローカルメッセージングでのみ機能します。つまり、アクターにプロミスを送信するこのパターンはうまくスケーリングしません。
パターンの問題を解決するためのオプション
タイムアウト完了
問題を解決する最も明白な方法は、タイムアウト付きでプロミスを提出することです。 Akkaを使用すると、たとえば次のようにこれを行うことができます。
// .seconds import scala.concurrent.duration._ val system: ActorSystem = ... val timeout = 2.seconds // system.scheduler.scheduleOnce(timeout)(p.tryFailure(new SomeTimeoutException))
前に確認したPromise
メソッドは、 Promise
まだ完了しPromise
いない場合にのみ成功します。 これらのメソッドがプロミスの完了後に呼び出されると、それらはIllegalStateException
をスローします。 約束がすでに完了している可能性がある場合に、約束の完了を試みる必要がある場合、考えられるものと似た方法がありますが、名前にtry
プレフィックスがあります。 約束を自分で完了した場合、 true
を返しtrue
。 このメソッドを呼び出す前にプロミスが完了した場合はfalse
。
また、オプションとして、アクター内で直接タイムアウトすることで先物を失敗させることができます。
class AlwaysAlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(str, p) => p.completeWith(someFuture()) context.scheduler.scheduleOnce(timeout)(self ! Timeout(p)) case Timeout(p) => p.tryFailure(new SomeTimeoutException) } }
もちろん、このオプションはスケーラビリティの問題を解決しません。
質問パターン
もちろん、別の方法で行うこともできます。 たとえば、タイムアウトが必要なaskパターンを使用します。
import akka.pattern.ask import akka.util.Timeout case class Request(arg: String) // def function(arg: String): Future[String] = { implicit val timeout = Timeout(2.seconds) val any: Future[Any] = actor ? Request(arg) // timeout ? any.mapTo[String] }
この場合、アクターの実装はわずかに異なる必要があります。約束を完了する代わりに、メッセージに返信する必要があります。
class AlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(arg) => // : sender() ! "42" }
ただし、実装の容易さには危険が伴います。
- 現在、アクターを「終了」する先物はタイプセーフではありません。 間違った型の値でプロミスをクローズしようとすると、コンパイラーは手をつかむことができます。 ただし、
Any
からの型変換の場合は保護されません。 - この議論はやや主観的に見えるかもしれませんが、私はそれに反対することはできません。
tell
を使用した設計tell
、通常、俳優モデルにとって理解しやすいと考えられています。 -
ask
" " , akka-remote, , ""ActorRef
(, remote, ). Akkatell
,ask
, .
Akka Typed
, ask- , , . Akka ask- Future[Any]
. , , :
(actor ? Request(arg)).mapTo[String]
, , String
, . Akka Typed. :
libraryDependencies += "com.typesafe.akka" %% "akka-typed" % "2.5.7"
, :
import akka.typed._ import akka.typed.scaladsl.Actor import akka.typed.scaladsl.AskPattern._ import akka.util.Timeout import scala.concurrent.Future import scala.concurrent.duration._ case class Request(arg: String, replyTo: ActorRef[String]) // val typeSafeActor = Actor.immutable[Request] { (_, msg) => msg.replyTo ! "42" // : Actor.same } val system: ActorSystem[Request] = ActorSystem(typeSafeActor, "type-safe-actor") def function(arg: String): Future[String] = { implicit val timeout: Timeout = Timeout(2.seconds) system ? (Request(arg, _)) }
. , , 2. , - API " ".
, .. ,
, , - , ActorSystem
. :
case class Request(id: Long) case class Response(hotel: Hotel) // API class CallingActor(actor: ActorRef) extends Actor { def receive: Receive = { case response: String => doSomethingWithActorResponse(response) case request: Request => actor ! request // } } // class AlwaysCompletesRequest extends Actor { def receive: Receive = { case Request(arg) => // sender() ! "42" }
, . API , .
,
Akka , . , , , : 100%-, 100%- , , . , , . , , , .
, , , . , . , :
override def receive: Receive = { case Request(id, p) => p.completeWith(doRequest(id)) p.future.onComplete(self ! Completed) context.become(running) } def doRequest(id: Long): Future[Hotel] = { val uri = Uri(baseUri).withQuery(Query("id" -> id.toString)) val eventual = Http().singleRequest(HttpRequest(Uri(uri = uri))) eventual.flatMap { response => val unmarshalled = Unmarshal(response.entity) response.code match { case StatusCodes.OK => unmarshalled.to[Hotel] case _ => Future.failed(new Exception(unmarshalled.to[String])) } } }
:
- ;
- API
flatMap
,onComplete
; - - Akka-HTTP.
, actor.tell()
.
:
- — , JVM.
- ask/tell — Akka, .
- :
+
— ;
± — , , , ; ;
-
— . - — ( ).
- — API, typed ask.
- API — : , , .
ask | typed ask | promise | |||
---|---|---|---|---|---|
- | + | + | + | - | |
ask/tell | tell | ask | ask | tell | tell |
± | - | + | ± | ± | |
+ | + | + | + | - | |
+ | + | - | + | + | |
API | + | + | + | - | + |
, , .
, , , , . , , :
- , ;
- , , , ;
- .
, .
- , !