RxJavaを使用したリアクティブストリームログ処理-パート1

画像

RxJavaによるリアクティブログストリーム処理-パートl







以前の投稿で、著者はELKスタックを使用してログを収集するケースを検討しました。

マイクロサービスとアプリケーションのコンテナ化への動きを考慮すると、ログとそのストレージの集中処理が事実上の標準になりつつあります。







次のステップに進んで、受け取った情報をより積極的に使用して、多くの問題が発生するずっと前に原因を見つける必要があるかもしれません。







脚注-この翻訳のストリームとデータストリームは交換可能な単語です。 また、ログという言葉はログを意味する場合がありますが、ほとんどの場合、テキストでは別の意味を使用しています







イベントログをシステムでリアルタイムで発生しているデータストリームと見なす場合、データリアルタイムで使用可能なすべてのオプションを分析することは非常に興味深いでしょう。たとえば、さまざまな情報ストリームを直接集約して不正な動作を検出する「攻撃」中、攻撃者を即座にブロックします。 「従来」データログを収集し、インシデント後に調査するのではありません。







または別の例では、特定のタイプのイベントに対応するイベントのみをフィルター処理( フィルター )し、userIDとして共通キーでグループ化( グループ化 )し、時間ウィンドウで合計数を計算し、ユーザーが特定のイベントで実行するこのタイプのイベント数を取得します期間。







failedLogStream() .window(5,TimeUnit.SECONDS) .flatMap(window -> window .groupBy(propertyStringValue("remoteIP")) .flatMap(grouped -> grouped .count() .map( failedLoginsCount -> { final String remoteIp = grouped.getKey(); return new Pair<>(remoteIp, failedLoginsCount); })) ) .filter(pair -> pair.get > 10) .forEach(System.out::println);
      
      





他のシステムで要求を開始し、その応答をデータストリームとして処理できます。これをサブスクライブし、 リアクティブストリームフレームワークで提示されるストリーム (データストリーム)を処理するために使い慣れた演算子を使用できます。







新しい開発パラダイムを学ぶ



ストリームのリアクティブプログラミングが何であるかを理解しておくといいでしょう。このため、 Kafka Streams SparkFlinkなどの大きなものをデプロイする必要はありません。







リアクティブプログラミングはノンブロッキング イベントであり、カウンターロード(メーカーからのデータ量が消費者が受信したデータ量を超えないフィードバックメカニズム)を備えた少数のスレッドでもスケーリングするアプリケーションです。







Spring5がもたらす最大のトピックは、 Jetプログラミングのサポートです 。 新しいspring-web-reactiveモジュールspring-web-mvcに似たフレームワークで、RESTサービスとリアクティブWebクライアントの非同期(非ブロッキング)応答を送信できます。これは、このソリューションをマイクロサービスアーキテクチャに使用できる可能性を示唆しています。 リアクティブストリームの概念は、Springに固有ではありません。これは、ほとんどのリアクティブフレームワークで合意されたリアクティブストリーム-jvmという共通の仕様があるためです(当面は、同じ名前はないかもしれませんが、フレームワークの代替になるほどコンセプトはシンプルである必要があります)。







歴史的に、 ジェットストリームモデルはRx.NETによって導入され、Netflixを使用してRxJavaと呼ばれるjavaに移植されました。 同時に、この概念はReactive EXtensionsと呼ばれる他の言語でも正常に実装されました。 それ以来、企業はジェットストリームの仕様と同じ方向に向かっています。 RxJavaはパイオニアであったため、大幅なリファクタリング (コードの書き直し)が必要です。したがって、バージョン2.xは仕様により適しています。Springリアクタはまだ新しいものですが、会社が仕様に従って実装を書き直すことは難しくありません。 それらがどのように関連しているかについてもっと読むことをお勧めします。







Doug Leaは、java.util.concurrent.Flowオブジェクトにジェットストリームを含めたいと言いました。これは、ジェットストリームがJava 9の一部として配信されることを意味します。







パフォーマンスの利点



また、別の流行語は、多くの異なるサービスを要求する必須の機能を持つマイクロサービスアーキテクチャです。 理想的には、完全な応答が次の要求を完了するのを待たずに、非ブロッキング要求を実行することが最善です。 サービスが結果の大きなリストを返す瞬間を待つ代わりに、最初のフラグメントを受信したときに別のシステムに新しいリクエストを送信する価値があると考えてください。







ブロックシない







リモート要求からの応答をストリーム(ストリームデータストリーム)、応答を受信したときにアクション(アクション)を起動するサブスクリプションと見なす場合、応答を待機しているストリームをブロックする代わりに、一般的に少数のストリームを使用できます。 、リソースのコストを削減します(たとえば、スレッドのスタックごとにスレッドとメモリ間でコンテキストを切り替えるためのプロセッサ時間)。







したがって、リアクティブプログラミングを使用すると、標準のハードウェアで通常より多くのイベントログを処理できます。







例:Gmailなどのサービスでは、ユーザーのメールを表示する必要があります。 ただし、メールには多くの人が写っている可能性があります(CC)。 連絡先にいるユーザーに写真を表示するのは楽しいでしょう。つまり、REST-ContactServiceを呼び出します。







次のようになります。







 Future<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails(); List<Mail> emails = emailsFuture.get(); //   //  ,        //    ,      ? Future<List<Contacts>> contacts = getContactsForEmails(emails); for(Mail mail : emails) { streamRenderEmails(mail, contacts); //push() emails  }
      
      





CompletableFutureを使用したJava 8のリアクティブプログラミング(thenCompose、thenCombine、thenAccept、さらに50個のメソッドを使用)で問題の一部が解決されましたが、これはそれらが行うすべてを覚えておく必要があるという事実を否定するものではありませんが、これは読むのには役立ちませんコード)。







 CompletableFuture<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails(); CompletableFuture<List<Contact>> emailsFuture .thenCompose(emails -> getContactsForEmails(emails)) //     List<Mail> .thenAccept(emailsContactsPair -> streamRenderEmails(emailsContactsPair.getKey(), emailsContactsPair.getValue()))
      
      





Listの代わりにIteratorに切り替えることができ、同時に新しい値が表示されたときにアクションを実行するように指示するメソッドはありません。 SQLには、そのような可能性があります。たとえば、すべてのデータをメモリにロードする代わりに、ResultSet(rs.next()を実行できます)です。







 public interface Iterator<E> { /** *  {@code true},     . */ boolean hasNext(); /** *    . */ E next(); }
      
      





しかし、「新しい意味がありますか?」と常に尋ねる必要があります。







 Iterable<Mail> emails = mailstoreService.getUnreadEmails(); Iterator<Mail> emailsIt = emails.iterator(); while(emailsIt.hasNext()) { Mail mail = emailsIt.next(); //            if(mail != null) { .... } }
      
      





必要なのは、リアクティブイテレータです。これは、新しい値が受信されるとすぐにサブスクライブしてアクションを実行できるタイプのデータです。 リアクティブストリームプログラミングはここから始まります。







それでは、ストリームとは何ですか?



すべてがストリームです







ストリームは、単に時間順に並べられたイベントのシーケンスですイベント XはイベントYの後に発生するため、 イベントは互いに競合しません )。







ストリームは、 0..Nイベントと2つの端末操作のいずれかを生成するようにモデル化されてます。









これを「 大理石の図表 」を使用して視覚的に説明できます。







オブザーバブルの大理石図







したがって、ストリームは単なるイベントログではなく、すべてであると想像できます。 単一の値であっても、値を解放するストリームとそれに続く完了イベントとして表現できます







無限ストリーム-イベントを発行しますが、単一の端末イベントはありません(完了|エラー)。







RxJavaは、タイプのストリームイベントをモデル化するためのObservableデータタイプを定義します。 Spring Reactorでは、 Fluxタイプと同等です。





観測可能なのは、さまざまな間隔で取られた温度の流れです。





Observableは、Webストアから購入した製品のストリームです。





Observableは、データベースへの要求に応じて返された1人のユーザー(ユーザー)を表します。





  public Observable<User> findByUserId(String userId) {...} //  Single    public Single<User> findByUserId(String userId) {...}
      
      





ただし、 Observableは単なるデータ型であるため、パブリッシュ/サブスクライバーのデザインパターンと同様に、3種類のイベントを処理するサブスクライバー(サブスクライバー)が必要です。





  Observable<CartItem> cartItemsStream = ...; Subscriber<CartItem> subscriber = new Subscriber<CartItem>() { @Override public void onNext(CartItem cartItem) { System.out.println("Cart Item added " + cartItem); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.printStackTrace(); } }; cartItemsStream.subscribe(subscriber);
      
      





リアクティブオペレーター



ただし、これはStream-aの一部に過ぎず、これまでのところ、従来のObserverデザインパターンのみを使用して、異常なものは使用していません。







リアクティブ部分は、ストリームがイベントを発生させたときに実行されるいくつかの関数(演算子-関数)を定義できることを意味します。







これは、別のストリーム( 不変のストリーム)が作成され、別のストリームを作成できることを意味します。







 Observable<CartItem> filteredCartStream = cartStream.filter(new Func1<CartItem, Boolean>() { @Override public Boolean call(CartItem cartItem) { return cartItem.isLaptop(); } }); Observable<Long> laptopCartItemsPriceStream = filteredCartStream.map(new Func1<CartItem, Long>() { @Override public Long call(CartItem cartItem) { try { return priceService.getPrice(cartItem.getId()); } catch(PriceServiceException e) { thrown new RuntimeException(e); } } });
      
      





Observableクラス(filter、map、groupBy、...)の演算子(メソッド)はObservableを返すため、演算子のチェーンを使用してラムダ構文と組み合わせて美しいものを書くことができます。







 Observable<BigDecimal> priceStream = cartStream .filter((cartItem) -> cartItem.isLaptop()). .map((laptop) -> { try { return priceService.getPrice(cartItem.getId()); } catch(PriceServiceException e) { thrown new RuntimeException(e); } });
      
      





上記では、 priceStream



が作成されても何も起こらないことに注意してくださいpriceService.getPrice()



、ステートメントのチェーンを通過する要素があるまで呼び出されません。 これは、rx演算子を使用して計画の外観を作成したことを意味します。管理されたデータがチェーンを下る方法(署名が記録されます)。







リアクティブプログラミングの説明を求められたとき、彼らは通常、Excelシートで冗談めかして例を示します。そこでは、セルが更新されるときに呼び出される数式が列に書き込まれ、それが順番に別のセルを更新し、そのセルがチェーン内の別のセルを更新します。







何もしないrx-operatorのように、これらの式は単にデータを制御するだけであり、それぞれがデータが連鎖するまで何かをするチャンスを得ます。







イベントがオペレーターのチェーンとともにどのように移動するかをよりよく理解するために、ある家から別の家に移動する例では、ムーバーはオペレーターとして機能し、あなたの家からのものが移動する-Thomas Nildが言ったように、便利なアナロジーを見つけました。







彼のサンプルコードは次のとおりです。







 Observable<Item> mover1 = Observable.create(s -> { while (house.hasItems()) { s.onNext(house.getItem()); } s.onCompleted(); }); Observable<Item> mover2 = mover1.map(item -> putInBox(item)); Subscription mover3 = mover2.subscribe(box -> putInTruck(box), () -> closeTruck()); //    OnCompleted()
      
      











「ローダー1は、 Observable



ソースの一方です。家から物を取り出すことで異常値を作成しますonNext()



map()



操作を実行するonNext()



メソッドでローダー2を呼び出しますonNext()



メソッドが呼び出されると、そして、それをボックスに転送します。それから、マシンにボックスをロードするonNext()



メソッドを使用して、最後のSubscriber



(サブスクライバー)であるローダー3を呼び出します。







RxJavaの魔法は利用可能な演算子の大きなセットであり、あなたの仕事はそれらをすべて組み合わせてデータの流れを制御することです。













多くのStreamオペレーターは、R​​eactiveXフレームワーク(Reactive Extensions)の中から一般的な言語(RxJava、RxJS、Rx.NETなど)で実装できるストリームで実行されるアクションを示す用語集を作成するのに役立ちます。







Spring Reactorなどのリアクティブストリームを操作するためにさまざまなフレームワークを使用する場合でも、これらの概念を知っておく必要があります(これらのフレームワークに共通の演算子があることを期待して)。







これまで、フィルタリングなどの単純な演算子のみを見てきました。







**フィルター**







フィルター条件に該当する要素のみをスキップします(1つのローダーは、すべてをすぐに別のローダーに転送するのではなく、100ドル未満のものを転送します)







ただし、ストリームを多くの個別のストリームに分割できる演算子がありますgroupBy



Observable<Observable<T>>



(ストリームストリーム)-これらはgroupBy



などの演算子groupBy









> ** **によるぐるープ化







  Observable<Integer> values = Observable.just(1,4,5,7,8,9,10); Observable<GroupedObservable<String, Integer>> oddEvenStream = values.groupBy((number) -> number % 2 == 0 ? "odd":"even"); Observable<Integer> remergedStream = Observable.concat(oddEvenStream); remergedStream.subscribe(number -> System.out.print(number +" "));
      
      





 // //1 5 7 9 4 8 10
      
      





かなり単純なconcat



演算子。偶数および奇数のストリームから単一のストリームを作成し、そのサブスクリプションをセットアップします。

>連結**







concat



は、ストリームが完了するのを待ってから別のストリームを追加し、再び1つのストリームを作成することがわかります。 したがって、奇数が最初に表示されます。







また、たとえばzip



演算子が行うように、多くのストリームを結合する機能もあります

> **ジップ演算子**







Zip



は、アーカイバとして機能するため、それほど名付けられていませんが、ジッパーのように(ジャケットに)、2つのストリームからのイベントを結合するためです。







>ジッパー(ラッチ)







1つのストリームから1つのイベントを取得し、別のストリームからイベントに接続します(ペアを作成します)。 これが完了したら、チェーンを下に移動する前に接着オペレータを適用します。







PS:これはより多くのストリームでも機能します。







そのため、1つのストリームがイベントをより速く発行したとしても、リスナーには、より遅いストリームからリリースされる複合イベントのみが表示されます。







ストリームから受信する多くのリモート呼び出しからの応答を「待機」する機能を持つことは、実際には非常に便利です。







一方、 combineLatest



オペレーターcombineLatest



、イベントのペアのリリースをcombineLatest



んが、代わりに、より遅いストリームから発行された最新のイベントを使用してから、それをチェーンのさらに下流に接着および転送する機能を適用します。







>最新の組み合わせ







プッシュアプ​​ローチに基づく思考に向かっています



Observable



が実際に作成される方法の例をいくつか見てみましょう。 最長の作成オプション:







  log("Before create Observable"); Observable<Integer> someIntStream = Observable .create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { log("Create"); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onComplete(); log("Completed"); } }); log("After create Observable"); log("Subscribing 1st"); someIntStream.subscribe((val) -> LOGGER.info("received " + val)); //    // (for onError and onComplete)     , -  log("Subscribing 2nd"); someIntStream.subscribe((val) -> LOGGER.info("received " + val));
      
      





サブスクライバーが購読するとすぐにイベントがサブスクライバーに送信されます

この設計を使用しているわけではなく、新しいObservableOnSubscribe



オブジェクトを渡しました。これは、誰かがサブスクライブしたときに何をすべきかを示しています。







Observable



にサブスクライブするまで、出力はなく、何も起こりません。データは移動しません。







誰かがサインアップすると、 call()



メソッドがcall()



、3つのメッセージがチェーンにプッシュダウンされ、その後にストリームが終了したというシグナルが続きます。







2回サインアップするcall(...)



call(...)



メソッド内のコードも2回呼び出されます。 したがって、他の誰かがサブスクライブし、出力用に次の値を受け取るとすぐに、同じ値を効果的に転送します。







 mainThread: Before create Observable mainThread: After create Observable mainThread: Subscribing 1st mainThread: Create mainThread: received 3 mainThread: received 4 mainThread: received 5 mainThread: Completed mainThread: Subscribing 2nd mainThread: Create mainThread: received 3 mainThread: received 4 mainThread: received 5 mainThread: Completed
      
      





rx演算子は必ずしもマルチスレッドを意味するわけではないことに注意することが重要です。 RxJavaObservableSubscriberの間のデフォルトの競合を実装しません。 したがって、すべての呼び出しは「 メイン 」スレッドで発生します。







誰かがサインアップされたときに広がり始めるObservable



のタイプは、 cold observables



と呼ばれます。 別のビューはhot observables



、誰もフォローしていない場合でもイベントを発行できます。









Subjects



はそのような特別な種類のObservable



であり、 Observer



Subscriber



ように( onNext()



呼び出してonNext()



データをプッシュできることを決定する) Observer



でもあり、ホットObservables



実装を容易にします。 ReplaySubject



などの多くの実装もあり、選択したイベントをバッファーに保存してサブスクリプションで再生します(もちろん、 OutOfMemory



エラーを防ぐためにバッファーのサイズを指定できます)が、 PublishSubject



は署名後に発生したイベントのみをスキップします。

そしてもちろん、他のソースからObservables



を作成するための多くの静的メソッドがあります。







 Observable.just("This", "is", "something") Observable.from(Iterable<T> collection) Observable.from(Future<T> future) -    ,  `future` 
      
      





プッシュで送信されたデータのELKスタックRabbitMQエミッターへの追加



伝統的に、ELKスタックを使用する場合、ElasticSearchを使用してイベントログからデータを要求するため、プルベースのポーリングスタイルであると言えます。







代わりに、イベントが発生した瞬間から処理を開始するまでのイベントへの応答時間をさらに短縮するために、イベントがジャーナルに表示されたときに「即時」に通知するプッシュベースを使用できますか?







多数の可能なソリューションの1つはRabbitMqです。これは、膨大な数のメッセージを処理できることから、パフォーマンスで非常に高い評価を得ている戦闘での経験豊富なソリューションです。 それにもかかわらず、 Logstashは既にRabbitMQプラグインをサポートしています( FluentDには別のプラグインもあります)。これを既存のELKスタックに簡単に統合し、ElasticSearchとRabbitMQにログを書き込むことができます。







おそらく、 Logstashはコントローラーのように振る舞うことができ、それがどのように機能するか、ログに記録されたイベントを送信/保存する場所を選択できることを覚えているでしょう。 これは、処理したいイベントを除外したり、他のRabbitMQキューなどへの送信先を示したりできることを意味します。







Logstashの使用を省略したい場合は、Logback Appenderを介してRabbitMQにデータを直接送信するオプションもあります。







ところで:いわゆるAmqpAppender



は、 RabbitMQ AMQPの特定の実装です(プロトコルバージョンAMQP 0-9-1、0-9)。







たとえば、ActiveMQ(AMQPコネクタもサポートしている)はプロトコルバージョンAMQP 1.0を実装しているようですが、spring-amqpライブラリにはプロトコルバージョン0-9-1、0-9があり、1.0とはまったく異なります)タイプ'org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from client using unsupported AMQP attempted'









ただし、私たちのソリューションは、 logstash-logback-encoderを使用し、フォーマットされたJSONをイベントログとともにLogstashに送信することでした 。 logstashの出力を交換ポイントRabbitMQ(交換)にリダイレクトします。







docker- composeを使用してlogstash-rabbitmqクラスターを開始します

リポジトリを複製できます







docker-compose -f docker-compose-rabbitmq.yml up





そして、あなたは使用することができます

./event-generate.sh





logstashに送信される多数のランダムイベントを生成します







, , , logstash . rabbitmq-output-plugin , :







 output { rabbitmq { exchange => logstash exchange_type => direct host => rabbitmq key => my_app } }
      
      





RabbitMQ JMS , AMQP , .







amqp







(exchange) .







'routing-key', , . , ' logstash. ''







AMQP



. Spring



c RabbitMq









  @Bean ConnectionFactory connectionFactory() { return new CachingConnectionFactory(host, port); } @Bean RabbitAdmin rabbitAdmin() { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); rabbitAdmin.declareQueue(queue()); rabbitAdmin.declareBinding(bindQueueFromExchange(queue(), exchange())); return rabbitAdmin; } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; } @Bean Queue queue() { return new Queue(queueName, false); } DirectExchange exchange() { return new DirectExchange("logstash"); } private Binding bindQueueFromExchange(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("my_app"); } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, new MessageConverter() { public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException { throw new RuntimeException("Unsupported"); } public String fromMessage(Message message) throws MessageConversionException { try { return new String(message.getBody(), "UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException("UnsupportedEncodingException"); } } }); messageListenerAdapter.setDefaultListenerMethod("receive"); //the method in our Receiver class return messageListenerAdapter; } @Bean Receiver receiver() { return new Receiver(); }
      
      





'logstash', 'my_app'. MessageListenerAdapter , 'receive' Receiver



, .







, , hot observable



, , , PublishSubject .







 public class Receiver { private PublishSubject<JsonObject> publishSubject = PublishSubject.create(); public Receiver() { } /** * Method invoked by Spring whenever a new message arrives * @param message amqp message */ public void receive(Object message) { log.info("Received remote message {}", message); JsonElement remoteJsonElement = gson.fromJson ((String) message, JsonElement.class); JsonObject jsonObj = remoteJsonElement.getAsJsonObject(); publishSubject.onNext(jsonObj); } public PublishSubject<JsonObject> getPublishSubject() { return publishSubject; } }
      
      





, SimpleMessageListenerContainer , ( ). Observable , ( onNext



, onComplete



, onError ):







 //     Observable.create(s -> { // Thread A new Thread(() -> { s.onNext("one"); s.onNext("two"); }).start(); // Thread B new Thread(() -> { s.onNext("three"); s.onNext("four"); }).start(); }); //     //  Observable<String> obs1 = Observable.create(s -> { // Thread A new Thread(() -> { s.onNext("one"); s.onNext("two"); }).start(); }); Observable<String> obs2 = Observable.create(s -> { // Thread B new Thread(() -> { s.onNext("three"); s.onNext("four"); }).start(); }); Observable<String> c = Observable.merge(obs1, obs2);
      
      





Observable.serialize()



Subject.toSerialized()



, 1 Thread



ListenerContainer



, . , Subjects



, . .







この長いパートII投稿(パート2)の続きとしてコードとリポジトリを確認するか、Rx Playgroundアクセスして、さらに多くの例を見つけることができます。翻訳者のサイトへのリンク




























All Articles