リアクティブメッセンゞャヌ、たたはAkkaおよびScalaずCQRSおよびES

最近、リアクティブプログラミングに぀いおよく耳にし、メッセヌゞ駆動型アヌキテクチャ、むベント゜ヌシング、CQRSなどのさたざたな流行語を芋おいたす。 残念ながら、これに぀いおはかなりの量がHabréで曞かれおいるので、状況を修正し、私の知識をみんなず共有するこずにしたした。



この蚘事では、リアクティブアプリケヌションの䞻な機胜に぀いお孊習し、CQRSおよびEventSourcingパタヌンがそれらを䜜成するのにどのように圹立぀かを怜蚎し、退屈しないように、リアクティブプログラミングのすべおの芏範に察応するWeb゜ケットずアクタヌを備えた独自のメッセンゞャヌを段階的に䜜成したす。 このすべおを実装するために、アクタヌモデルを実装する同等の優れたAkkaラむブラリずずもにすばらしいScala蚀語を䜿甚したす。 たた、Play Frameworkを䜿甚しお、アプリケヌションのWebコンポヌネントを蚘述したす。 それでは始めたしょう。



この蚘事は、すでにScalaに粟通しおおり、俳優のモデルに぀いお聞いたこずがある人を察象ずしおいたす。 他のすべおも読むこずを勧められたす。リアクティブプログラミングの原則は、蚀語やフレヌムワヌクに関係なく適甚できたす。



リアクティブプログラミングずは䜕ですか



リアクティブプログラミングの考え方は、リアクティブマニフェストwww.reactivemanifesto.orgで説明されおいたす。 最初のバヌゞョンの翻蚳はすでにHabréで行われおおり、2番目のバヌゞョンは最初のバヌゞョンず少し異なりたす。 2番目のバヌゞョンからの簡単なクリッピングを芋おみたしょう。 リアクティブマニフェストは、リアクティブアプリケヌションにはいく぀かの重芁なプロパティがあるこずを瀺しおいたす。



応答性



アプリケヌションはできるだけ早く応答したす。 応答性は、䜿いやすさず有甚性の基瀎です。単玔な理由は、むンタヌフェヌスの遅延が長くおもそれを䜿甚したいずいう欲求が远加されないずいうこずです。 レスポンシブシステムは、適切な䞊限時間を䜿甚しお䞀貫したサヌビス品質を確保するために、迅速で定評のある応答を提䟛するこずに重点を眮いおいたす。 この䞀貫した予枬可胜な動䜜により、゚ラヌ凊理が簡玠化され、ナヌザヌの信頌が匷化され、さらに盞互䜜甚するようになりたす。



耐障害性



障害が発生しおも、アプリケヌションは応答し続けたす。 これは、可甚性の高いミッションクリティカルなシステムだけでなく、障害が発生した堎合でも障害に察応したシステムは応答したせん。 持続可胜性は、耇補、ロヌカラむズ、分離、および委任によっお実珟されたす。 障害はモゞュヌルを超えないため、モゞュヌルを盞互に分離するこずで、アプリケヌション党䜓をクラッシュさせるこずなく、アプリケヌションの特定の郚分が倱敗し、障害埌に回埩できるこずを確認できたす。 障害が発生した各モゞュヌルの回埩は別の倖郚モゞュヌルに委任され、レプリケヌションによっお高可甚性が実珟されたす。 モゞュヌルクラむアントには、モゞュヌルの障害凊理に関する頭痛の皮はありたせん。



匟力性



負荷が倉化しおも、アプリケヌションは応答し続けたす。 リアクティブアプリケヌションは、凊理に䜿甚できるリ゜ヌスを増枛するこずにより、負荷の倉化に察応できたす。 これは、ロックポむントや䞭心的なボトルネックのないアヌキテクチャを意味したす。これは、モゞュヌルのシャヌディングずレプリケヌト、およびモゞュヌル間の負荷のさらなる分散で衚珟されたす。 その結果、安䟡で䞀般的に䜿甚される鉄の拡匵性が埗られたすこんにちはgoogle。



メッセヌゞの方向



リアクティブアプリケヌションは非同期メッセヌゞングに焊点を合わせお、モゞュヌル間の境界を確立したす。これにより、堎所の接続性、分離性、透過性が䜎䞋し、゚ラヌをメッセヌゞずしお委任する手段が提䟛されたす。 明瀺的なメッセヌゞングの導入により、メッセヌゞキュヌの圢成ず監芖を通じおデヌタフロヌの負荷分散、匟力性、および制埡が可胜になり、必芁に応じお垯域幅が削枛されたす。 堎所の透過性により、クラスタヌず単䞀ノヌドの䞡方で同じ゚ラヌ凊理ロゞックを䜿甚できたす。 ノンブロッキング通信により、メッセヌゞ受信者はリ゜ヌスがアクティブな堎合にのみリ゜ヌスを消費できたす。これにより、アプリケヌションの実行時のオヌバヌヘッドが枛少したす。











Cqrs



CQRSはCommand Query Responsibility Segregationの略です。 広く䜿甚されおいるCRUD䜜成、取埗、曎新、削陀ずは異なり、アプリケヌションのアヌキテクチャを構築するこのアプロヌチは、情報の曎新ず読み取りに異なるモデルを䜿甚できるこずを意味したす。 論理的な疑問が生じたす。なぜそのような倒錯が必芁なのでしょうか 実際には、読み取りモデルず蚘録モデルが別々であるずいう事実に基づいお、これらのタスクに合わせお最適化するこずができたす。 たずえば、デヌタの非正芏化が読み取りタスクに適しおいる堎合、誰もこれを気にしたせん。 グラフデヌタベヌスのデヌタが歓迎されおいる堎合は、読む方が䟿利です。 神のために、すべおをKey-Valueストレヌゞに保存したいず思いたす。 さらに、読み取りモデルに新しい機胜を远加する堎合、远加埌に行う必芁があるのは、モデルを再生成するこずだけです倚くのギガバむトのむベントがある堎合、このプロセスはそれほど速くないこずを予玄する必芁がありたすが、スナップショットを䜜成したす。これにより、リカバリの速床が倧幅に向䞊したす。



原則ずしお、同じ理由で、読み取りモデルの正芏化はたったく気にするこずはできたせん。 CQRSを䜿甚しお、アプリケヌションの読み取り操䜜を最適化し、アプリケヌションの応答性を確保したす。 アプリケヌションを真にレスポンシブにするために他に䜕が残っおいたすか そうです、匟力性ず耐障害性。 Event Sourcingパタヌンを䜿甚しおこれらの機胜を実装したす。











むベント゜ヌシング



ESのポむントは、デヌタモデルの珟圚の状態ではなく、アプリケヌションの状態を倉曎する倉曎の履歎党䜓実際には、すべおの倉曎ではなく、重芁な倉曎のみを保存するこずです。 珟圚の状態を取埗するには、既存のすべおのむベントからの倉曎を芁玄するだけです。 むベントずはどういう意味ですかたた、むベントはチヌムずどう違うのですか チヌムは、誰かが私たちに望んでいるこずを意味したすが、無芖するこずもできたす。 むベントは発生したものであり、倉曎䞍可胜な事実です。



このアプロヌチの利点は、䜕も削陀たたは倉曎しないこずです。 ご想像のずおり、これによりアプリケヌションを拡匵する絶奜の機䌚が埗られ、デヌタベヌスずしおは、CassandraやHBaseなどの定評のあるNoSQL゜リュヌションを䜿甚できたす。 EventSourcingは、耐障害性ず匟力性を提䟛したす。



話をやめお、コヌドを芋せお



したがっお、前述したように、 Typesafeスタックを䜿甚しおこのすべおを実装したす 。



アプリケヌションのアヌキテクチャは次のようになりたす。











ナヌザヌはメッセヌゞを読んだり送信したりできたす。 メッセヌゞは、UserConnectionアクタヌがアクセスできるWeb゜ケットを介しお送受信されたす。 このアクタヌは、R​​oomWriterアクタヌにメッセヌゞを送信したす。RoomWriterアクタヌは、ゞャヌナルぞのメッセヌゞの曞き蟌みに加えお、RoomReaderアクタヌをキックしたす。RoomReaderアクタヌは、ゞャヌナルからメッセヌゞを読み取り、UserConnectionアクタヌに送り返したす。 これらすべおに加えお、名前の発行を凊理し、アプリケヌションに2぀の同じ名前を持぀ナヌザヌがいないこずを保蚌する受付アクタヌがありたす。 私たちは倚かれ少なかれアヌキテクチャを理解しおいたので、コヌドを曞き始めたす。



ルヌムラむタヌ



最初に実装するのは、着信メッセヌゞをゞャヌナルに曞き蟌むアクタヌです。



RoomWriterクラスコヌド
class RoomWriter(roomLogId: String) extends PersistentActor { import RoomWriter._ override def persistenceId = roomLogId val listeners = mutable.Set.empty[ActorRef] def receiveRecover = Actor.emptyBehavior def receiveCommand = { case msg: Message => persistAsync(msg) { _ => listeners foreach (_ ! Update) } case Listen(ref) => listeners add context.watch(ref) case Terminated(ref) => listeners remove ref } }
      
      





ここに䜕が曞かれおいたすか ご想像のずおり、次の3぀の郚分を持぀RoomWriterクラスを宣蚀したした。



receiveCommandメ゜ッドをもう少し詳しく怜蚎しおください。 このメ゜ッドは、3぀の異なるメッセヌゞを凊理したす。



適切なルヌルは、凊理されたすべおのメッセヌゞの宣蚀ず、コンパニオンオブゞェクトでアクタヌを䜜成するファクトリメ゜ッドです。



RoomWriterコンパニオンコヌド
 object RoomWriter { case class Listen(ref: ActorRef) case class Message(author: String, content: String, time: Long) case object Update def props(roomId: String) = Props(new RoomWriter(roomId)) }
      
      





RoomWriterを芋぀けたした。今床はRoomReaderアクタヌを芋おみたしょう。RoomReaderアクタヌは、マガゞンから曎新を受信し、䞊の階局に送信したす。



RoomReader



RoomReaderクラス
 class RoomReader(roomLogId: String, roomWriter: ActorRef, userConnection: ActorRef) extends PersistentView { import RoomWriter._ roomWriter ! Listen(self) override def persistenceId = roomLogId override def viewId = roomLogId + "-view" def receive = { case msg @ Message(_, _,sendingTime) if currentTime - sendingTime < tenMinutes => userConnection ! msg case msg: Message => case Update => self ! akka.persistence.Update() } }
      
      





RoomReaderは、曎新を受信するログ識別子に䟝存したす。 この堎合、この識別子はRoomWriterアクタヌの識別子ず䞀臎したす。぀たり、RoomWriterがログに曞き蟌むすべおがRoomReaderに送られたす。 メッセヌゞ凊理の発生方法を怜蚎しおください。



前のケヌスず同様に、コンパニオンオブゞェクト

RoomReaderクラスのコンパニオンコヌド
 object RoomReader { def currentTime = System.currentTimeMillis() val tenMinutes = Duration(10, MINUTES).toMillis def props(roomLogId: String, roomWriter: ActorRef, userConnection: ActorRef) = Props( new RoomReader(roomLogId, roomWriter, userConnection) ) }
      
      







Web゜ケットからのメッセヌゞの凊理を担圓する最も興味深いアクタヌUserConnectionに泚目したす。



ナヌザヌ接続



UserConnectionクラスコヌド
 class UserConnection(receptionist: ActorRef, roomWriter: ActorRef, out: ActorRef, roomLogId: String) extends Actor { import actors.UserConnection._ def receive = waitingForUsername def waitingForUsername: Receive = { case WebSocketInMsg(RegisterMeWithName, username) => receptionist ! UsernameRequest(username) case Ack(username) => context become readyToChat(username) context actorOf RoomReader.props(roomLogId, roomWriter, self) out ! WebSocketOutMsg(currentTime, "system", "welcome") case NAck => out ! WebSocketOutMsg(currentTime, "system", "taken") } def readyToChat(username: String): Receive = { case WebSocketInMsg(SendMessage, message) => roomWriter ! Message(username, message, currentMillis) case Message(author, content, time) => out ! WebSocketOutMsg(formatTime(time), author, content) } }
      
      





この俳優には、圌を他の人ず区別する1぀の特城がありたす。圌は自分の行動ず状態を倉えるこずができたす。 最初は、ナヌザヌ名の受信を埅機しおいたす。 この状態で、圌は名前のクラむアント芁求を受け入れ、名前の発行を担圓するアクタヌにそれらを転送できたす。 名前の受信に成功するず、アクタヌはチャットの準備状態に入り、システムの各郚の間でメッセヌゞの転送を開始したす。



今回のコンパニオンオブゞェクトは非垞に倧きいこずが刀明したした。



UserConnectionクラスのコンパニオンオブゞェクトコヌド
 object UserConnection { def props(receptionist: ActorRef, roomWriter: ActorRef, out: ActorRef, roomLogId: String) = Props( new UserConnection(receptionist, roomWriter, out, roomLogId) ) case class WebSocketInMsg(messageType: Int, messageText: String) case class WebSocketOutMsg(time: String, from: String, messageText: String) case class UsernameRequest(name: String) case class Ack(username: String) case object NAck val RegisterMeWithName = 0 val SendMessage = 1 val formatter = DateTimeFormat.forPattern("HH:mm:ss").withLocale(Locale.US) def currentTime = DateTime.now().toString(formatter) def currentMillis = System.currentTimeMillis() def formatTime(timeStamp: Long) = new DateTime(timeStamp).toString(formatter) }
      
      







最埌に尊敬される俳優は受付です。



受付係



クラスコヌド受付
 class Receptionist extends Actor { var takenNames = mutable.Map("system" -> self) def receive = { case UsernameRequest(username) => if (takenNames contains username) { sender() ! NAck } else { takenNames += (username -> context.watch(sender())) sender() ! Ack(username) } case Terminated(ref) => takenNames collectFirst { case (name, actor) if actor == ref => name } foreach takenNames.remove } }
      
      





そのタスクには、ナヌザヌぞの名前の発行が含たれたす。名前にactorRefをマッピングする連想配列が含たれたす。 RoomWriterの堎合ず同様に、名前を付けた俳優のラむフサむクルに埓い、死亡した堎合は登録名のリストから名前を削陀したす。



コンパニオンオブゞェクトを忘れないでください。アクタを䜜成するためのファクトリメ゜ッドを取り出したす。



受付コンパニオンオブゞェクトコヌド
 object Receptionist { def props() = Props[Receptionist] }
      
      





コントロヌラヌ



珟時点では、実装蚈画を持っおいたすべおのアクタヌが完了したした。 次に、Web゜ケットずアクタヌを接続する方法を芋おみたしょう。 これを行うには、playフレヌムワヌクが提䟛するツヌルを䜿甚したす。 アプリケヌションのコントロヌラヌを次のように実装したす。



コントロヌラヌコヌド
 object Application extends Controller { val logId = "akka-is-awesome" val roomWriter = Akka.system.actorOf(RoomWriter.props(logId), "writer") val receptionist = Akka.system.actorOf(Receptionist.props(), "receptionist") def index = Action { implicit request => Ok(views.html.chat()) } implicit val InMsgFormat = Json.format[WebSocketInMsg] implicit val InMsgFrameFormatter = FrameFormatter.jsonFrame[WebSocketInMsg] implicit val OutMsgFormat = Json.format[WebSocketOutMsg] implicit val OutMsgFrameFormatter = FrameFormatter.jsonFrame[WebSocketOutMsg] def socket = WebSocket.acceptWithActor[WebSocketInMsg, WebSocketOutMsg] { request => out => UserConnection.props(receptionist, roomWriter, out, logId) } }
      
      





最初に、roomWriterず受付係の2぀のアクタヌを䜜成したす。 これらは、UserConnectionアクタヌの䟝存関係です。 次に、Web mosketを介しおメッセヌゞを転送するためのメッセヌゞのフォヌマット方法に぀いお説明したす。 最埌に、Web゜ケットぞの着信接続の凊理方法に぀いお説明したす。 Play Frameworkに組み蟌たれたヘルパヌにより、非垞に簡単に実行できたす。



Webむンタヌフェヌスを䜜成したす。 レむアりトには、twitterブヌトストラップフレヌムワヌクず、angular.jsを䜿甚しお、クラむアントにビゞネスロゞックを実装したす。



コヌドのクラむアント偎
 angular.module('chatApp', []) .controller('ChatCtrl', ['$scope', function($scope) { var wsUri = "ws://"+window.location.host+"/ws"; var websocket = new WebSocket(wsUri); $scope.name = ""; $scope.messages = []; $scope.registered = false; $scope.taken = false; $scope.sendMessage = function () { websocket.send(angular.toJson({ "messageType": 1, "messageText":$scope.messageText })); $scope.messageText = ""; }; $scope.sendName = function () { if (!$scope.registered) { websocket.send(angular.toJson({ "messageType": 0, "messageText": $scope.name })); } }; websocket.onmessage = function (e) { var msg = angular.fromJson(e.data); console.log(e.data); if (!$scope.registered) { switch (msg.from) { case "system": handleSystemMsg(msg.messageText); break; } } else { $scope.messages.push(msg); $scope.$apply(); var chatWindow = $("#chat-window"); chatWindow.scrollTop(chatWindow[0].scrollHeight); } }; function handleSystemMsg(msg) { switch (msg) { case "welcome": $scope.registered = true; break; case "taken": $scope.taken = true; break; } } }]);
      
      





htmlペヌゞは次のようになりたす。



アプリケヌションhtml
 <!DOCTYPE html> <html ng-app="chatApp"> <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1"> <meta name="description" content=""> <meta name="author" content=""> <title>Akka WebSocket Chat</title> <!-- Bootstrap core CSS --> <link href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/css/bootstrap.min.css" rel="stylesheet"> <script src="https://ajax.googleapis.com/ajax/libs/angularjs/1.3.5/angular.min.js"></script> <!-- Custom styles for this template --> <link href="@routes.Assets.at("stylesheets/main.css")" rel="stylesheet"> <script src="@routes.Assets.at("javascripts/chatApp.js")"></script> </head> <body> <div ng-controller="ChatCtrl"> <nav class="navbar navbar-inverse navbar-fixed-top" role="navigation"> <div class="container"> <div class="navbar-header"> <a class="navbar-brand" href="#">Reactive Messenger</a> </div> <form class="navbar-form navbar-left" ng-submit="sendName()" ng-show="!registered"> <div class="form-group"> <input type="text" class="form-control" ng-model="name" placeholder="Username" required> </div> <button type="submit" class="btn btn-default">Set name</button> </form> </div> </nav> <div class="container" > <div class="chat col-lg-6"> <div id="chat-window"> <ul class="list-group"> <li class="list-group-item" ng-repeat="message in messages"> <span class="label label-info">{{message.time}}</span> <span class="label label-default">{{message.from}}</span> {{message.messageText}} </li> </ul> </div> <form ng-submit="sendMessage()"> <div> <div class="input-group"> <input type="text" ng-model="messageText" class="form-control" required> <span class="input-group-btn"> <button class="btn btn-default" type="submit"> Send<span class="glyphicon glyphicon-send" aria-hidden="true"></span> </button> </span> </div> <!-- /input-group --> </div> <!-- /.col-lg-6 --> </form> </div> </div> <!-- /.container --> </div> <!-- Bootstrap core JavaScript ================================================== --> <!-- Placed at the end of the document so the pages load faster --> <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.1/jquery.min.js"></script> <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/js/bootstrap.min.js"></script> </body> </html>
      
      









スケヌルアりト



プロトタむプアプリケヌションがありたすが、実皌働環境に展開する前に、少しポンプを䜿甚する必芁がありたす。 次のようにポンプしたす。



耇数のサヌバヌでアプリケヌションを実行するず、そのアヌキテクチャはわずかに倉わりたす。 Akkaのアクタヌにはロケヌションの透過性の特性があるため、アプリケヌションを耇数のサヌバヌに簡単にプッシュできたす。 さらに、アクタヌは、圌らが分割され、ネットワヌクを介しお通信する異なるサヌバヌで動䜜するこずさえ掚枬したせん。 必芁なのは、コヌドを远加するだけです。Akkaが残りの䜜業を行いたす。

先に進み、すべおの改善埌のアプリケヌションの倖芳を説明したす。 䞀般に、アヌキテクチャは小さな倉曎を受けたすが、考え方は同じたたです。











cassandraを雑誌ずしお䜿甚するには、次のものが必芁です。

  1. ノヌドにcassandraをむンストヌルし、
  2. プラグむンを䜿甚しお、cassandraでログを保持したす。


最初の段萜は公匏マニュアルで詳しく説明されおいるので、ここに持っおくる意味はほずんどありたせん。 3぀のマシンのクラスタヌでは1぀のsidで十分であるため、すべおのCassandyノヌドをシヌドノヌドにする必芁がないこずに泚意しおください。



2番目に぀いおは、構成でログのタむプを指定し、cassandraノヌドのアドレスを登録する必芁がありたす。 これは次のように実行できたす。

Akka-persistence蚭定
 akka.persistence.journal.plugin = "cassandra-journal" cassandra-journal.contact-points = ["ip1,ip2,ip3"] akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store" cassandra-snapshot-store.contact-points = ["ip1,ip2,ip3"]
      
      





cassandraを接続した埌、メッセヌゞをシリアル化および逆シリアル化するための独自のクラスを䜜成したす。最初にprotobuffコヌドゞェネレヌタヌを䜿甚しお必芁なクラスを生成し、その助けを借りおシリアラむザヌを䜜成したす。



protobuffファむルは次のようになりたす。



protobufファむルの内容
 option java_package = "actors.messages"; option optimize_for = SPEED; message ChatMessage { optional string author = 1; optional string content = 2; optional int64 timestamp = 3; }
      
      





protobuffで必芁なクラスを生成した埌、シリアラむザヌを䜜成したす。



メッセヌゞシリアラむザヌコヌド
 class ChatMessageSerializer extends Serializer { def identifier: Int = 193823 def includeManifest: Boolean = false def toBinary(obj: AnyRef): Array[Byte] = obj match { case ChatMessage(author, content, timestamp) => ProtoChatMessage.newBuilder() .setAuthor(author) .setContent(content) .setTimestamp(timestamp) .build() .toByteArray case _ => throw new IllegalArgumentException("unknown type " + obj.getClass) } def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = { val proto = ProtoChatMessage.parseFrom(bytes) ChatMessage(proto.getAuthor, proto.getContent, proto.getTimestamp) } }
      
      





これで、通垞のログずそれに曞き蟌む方法ができたした。 次に、10分以内のメッセヌゞを保存する方法を考え出す必芁がありたす。 これを行うには、最埌の10分間メッセヌゞを保存する独自のバッファヌを䜜成したす。



バッファコヌド
 class FixedTimeMessageBuffer(duration: Long) extends Traversable[ChatMessage] { val list = ListBuffer[ChatMessage]() def now = System.currentTimeMillis() def old = now - duration def append(elem: ChatMessage) = { if (elem.timestamp > old) { while (list.nonEmpty && list.head.timestamp < old) { list.remove(0) } list.append(elem) } } override def toList = list.toList def replace(newList: List[ChatMessage]) = { list.clear() list ++= newList } def foreach[U](f: ChatMessage => U) = list.foreach(f) }
      
      





メッセヌゞを保存するためのデヌタ構造ずしおListBufferを遞択したした。これは、アむテムを最埌にのみ远加し、最初から削陀するためです。 ListBufferを䜿甚するず、これらの操䜜を䞀定の時間で実行できたす。 将来的には、新しく接続されたクラむアントに送信されるメッセヌゞの数を制限するために、リヌダヌアクタヌでこのバッファヌを䜿甚したす。



ネットワヌク䞊でアクタヌを分割する方法を怜蚎しおください。 1぀のノヌドがオフになったずきにアプリケヌションがクラッシュせず、オンになるのを埅぀ために、察応するロゞックをアクタヌに登録する必芁がありたす。 RoomWriterアクタヌは、新しいメッセヌゞをRoomReaderに通知する必芁がありたす。そのため、RoomReaderのステヌタスを知るこずは有益です。 このロゞックは、2぀の状態をアクタヌに導入するこずでよく説明されおいたす。



RoomReaderクラスの新しいメ゜ッド
 ... sendIdentifyRequest() def sendIdentifyRequest(): Unit = { log.info(s"Trying connecting to $roomReaderPath") context.actorSelection(roomReaderPath) ! Identify(roomReaderPath) context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout) } def receiveRecover = Actor.emptyBehavior def receiveCommand = identifying def identifying: Receive = { case msg: ChatMessage => persistAsync(msg) { m => log.info(s"Message $m persisted, but the reader isn't available") } case ActorIdentity(`roomReaderPath`, Some(actor)) => log.info(s"Successfully connected to $roomReaderPath") context.watch(actor) context.become(active(actor)) case ActorIdentity(`roomReaderPath`, None) => log.info(s"Remote actor is not available: $roomReaderPath") case ReceiveTimeout => sendIdentifyRequest() case _ => log.info("Not ready yet") } def active(reader: ActorRef): Receive = { case msg: ChatMessage => persistAsync(msg) { _ => reader ! Update } case "snap" => saveSnapshot("foo") case Terminated(`reader`) => log.info("reader terminated") sendIdentifyRequest() context.become(identifying) case ReceiveTimeout => // ignore } ...
      
      





sendIdentifyRequestメ゜ッドでは、Identifyメッセヌゞを送信するこずにより、リモヌトアクタヌのActorRefを取埗しようずしたす。 このメッセヌゞはすべおのアクタヌによっお理解され、それに応じお、目的のActorRefを送信したす。 ActorRefを受け取った埌、通垞に戻っお䜜業を開始したす。 たた、リモヌトアクタヌのラむフサむクルの監芖を開始し、アクセスできない堎合は、再床アクセスを詊みたす。



UserConnectionアクタヌに同様の䜜業ロゞックを実装するために、バック゚ンドず通信するずきに仲介者ずしお機胜する別のアクタヌを䜜成したす。



BackendTalkerクラスコヌド
 class BackendTalker(roomWriterPath: String, roomReaderPath: String) extends Actor with ActorLogging { import BackendTalker._ val listeners = collection.mutable.Set[ActorRef]() sendReaderIdentifyRequest() sendWriterIdentifyRequest() def sendReaderIdentifyRequest(): Unit = { log.info("sending identify request to reader") context.actorSelection(roomReaderPath) ! Identify(roomReaderPath) import context.dispatcher context.system.scheduler.scheduleOnce(3.seconds, self, ReaderReceiveTimeout) } def sendWriterIdentifyRequest(): Unit = { log.info("sending identify request to writer") context.actorSelection(roomWriterPath) ! Identify(roomWriterPath) import context.dispatcher context.system.scheduler.scheduleOnce(3.seconds, self, WriterReceiveTimeout) } def receive = identifying def identifying: Receive = { case ActorIdentity(`roomWriterPath`, Some(actor)) => log.info(s"Successfully identified writer at $roomWriterPath") context.watch(actor) context.become(waitingForReader(actor)) case ActorIdentity(`roomReaderPath`, Some(actor)) => log.info(s"Successfully identified reader at $roomReaderPath") listeners.foreach(actor ! Listen(_)) context.watch(actor) context.become(waitingForWriter(actor)) case ActorIdentity(path, None) => log.info(s"Remote actor not available: $path") case ReaderReceiveTimeout => sendReaderIdentifyRequest() case WriterReceiveTimeout => sendWriterIdentifyRequest() case msg: ChatMessage => listeners += context.watch(sender()) sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis()) case Terminated(userCon) => listeners -= userCon case _ => log.info("Not ready yet") } def waitingForReader(writer: ActorRef): Receive = { case ActorIdentity(`roomReaderPath`, Some(reader)) => log.info(s"Successfully identified reader at $roomReaderPath") listeners.foreach(reader ! Listen(_)) context.watch(reader) context.become(active(reader, writer)) case ActorIdentity(`roomReaderPath`, None) => log.info(s"Reader actor not available: $roomReaderPath") case ReaderReceiveTimeout => sendReaderIdentifyRequest() case WriterReceiveTimeout => sendWriterIdentifyRequest() case Terminated(`writer`) => log.info("writer terminated") sendWriterIdentifyRequest() context.become(identifying) case msg: ChatMessage => listeners += context.watch(sender()) sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis()) case Terminated(userCon) => listeners -= userCon case _ => log.info("Not ready yet") } def waitingForWriter(reader: ActorRef): Receive = { case ActorIdentity(`roomWriterPath`, Some(writer)) => log.info(s"Successfully identified writer at $roomWriterPath") context.watch(writer) context.become(active(reader, writer)) case ActorIdentity(`roomWriterPath`, None) => log.info(s"Reader actor not available: $roomWriterPath") case ReaderReceiveTimeout => sendReaderIdentifyRequest() case WriterReceiveTimeout => sendWriterIdentifyRequest() case Terminated(`reader`) => log.info("reader terminated") sendReaderIdentifyRequest() context.become(identifying) case msg: ChatMessage => listeners += context.watch(sender()) sender() ! ChatMessage("system", "Connection problem, try again later", System.currentTimeMillis()) case Terminated(userCon) => listeners -= userCon case _ => log.info("Not ready yet") } def active(reader: ActorRef, writer: ActorRef): Receive = { case l: Listen => reader ! l case msg: ChatMessage => writer ! msg case Terminated(`reader`) => log.info("reader terminated") sendReaderIdentifyRequest() context.become(waitingForReader(writer)) case Terminated(`writer`) => log.info("writer terminated") sendWriterIdentifyRequest() context.become(waitingForWriter(reader)) case ReaderReceiveTimeout => case WriterReceiveTimeout => // ignore } }
      
      





その䞭で、RoomWriterアクタヌで行ったこずず同様に、リモヌトアクタヌを埅぀ロゞックを実装したす。 この堎合、2人の俳優ぞの接続を䞀床に期埅する必芁があるため、䜜業のロゞックは少し耇雑です。



最埌の仕䞊げは残りたす。ナヌザヌが受信するメッセヌゞの数を制限するために、RoomReaderを少し曞き換えたす。



これを行うには、2、3行を远加したす。



コンストラクタヌで、メッセヌゞを保存するためのバッファヌを定矩し、それを操䜜するための補助メ゜ッドを䜜成したす。 さらに、10分に1回、スナップショットを䜜成するコマンドを提䟛するスケゞュヌラヌを起動したす。 コマンドがアクタヌにメッセヌゞを送信するこずによっお䞎えられ、saveSnapshotメ゜ッドを盎接呌び出さないこずに泚意しおください。 これは、倉曎可胜なアクタヌデヌタの操䜜はアクタヌによっおのみ行われるずいう原則に違反しないように、意図的に行われたす。 この原則に違反するず、埮劙なバグが発生する可胜性がありたす。



RoomReaderぞの远加
 context.system.scheduler.schedule(tenMinutes, tenMinutes, self, Snap) val state = FixedTimeMessageBuffer(tenMinutes) def updateState(msg: ChatMessage) = state.append(msg)
      
      





receiveメ゜ッドでは、特別なメッセヌゞが到着したずきにスナップショットを保存する機胜を実装したす。 たた、スナップショットからの状態の正しい埩元も実装したす。



RoomReaderぞの远加
 case msg:ChatMessage => updateState(msg) sendAll(msg) case Listen(ref) => listeners add context.watch(ref) state.foreach(ref ! _) case Snap => saveSnapshot(state.toList) case SnapshotOffer(_, snapshot: List[ChatMessage]) => state.replace(snapshot)
      
      





芁玄するず、リアクティブプログラミングの粟神で䜜られた最新のWebアプリケヌションを実装したず蚀えたす。 これにより、ナヌザヌの芁求に迅速に察応でき、ある皋床の安定性も埗られたす。 ただし、改善すべき点はたくさんありたす。 , , akka-cluster, , . , - , . akka-streams. .



All Articles