履歴を挿入するのが簡単だった理由
すべてのバンは最小限の労力と非常に小さなコード変更で達成されました-Hystrixは同期プログラミングモデルを提供します。その結果、小さなhttpクライアントコールコードのみがアプリケーションストリームからhystrixスレッドプールに転送され、問題の数は最小限に抑えられます(これらの問題は、完全に避けてください)。
単純な例を考えてみましょう。CommandHelloWorldコマンドは、非常に重要なデータとそれを使用するクライアントコードを遅延して取得します。
public class CommandHelloWorld extends HystrixCommand<String> { @Override protected String run() throws InterruptedException { // // Thread.sleep(random.nextInt(500)); return "Hello " + name + "!"; } } // String executeCommand(String str) { // , . return new CommandHelloWorld(str).execute(); }
実行時間が制御された非常にシンプルなソリューションであることが判明しました。 もちろん、時間の経過とともに、最初の利点は欠点に変わります。 200個の要素のリストを提供する必要があるクライアントがいます。各要素の形成のために、外部システムに1回(または複数回)(時には1つではない)連絡する必要があります。 すべての外部システムがバッチをサポートしているわけではなく、エラーなしでコードを簡単に並列化できるわけではありません-外部システムへの1回の旅行で20ミリ秒を費やした後、リクエスト全体に4秒かかるため、ユーザーが苦しみ、Tomcatスレッドのプールはストリームを受信しませんでした、同時に、histrixスレッドプールは実際にはリクエストで満たされていませんでした。 それを修正する時が来ました。
どのように履歴を整理しますか
ほとんどすべてのアプリケーションは、次のパターンに従って構築されました。
- ソースデータのリストを取得します
- それからストリームを形成する
- を含む履歴でフィルター/マップします
- ストリームリストから収集する
もちろん、理論的には、Javaストリームには並列操作がありますが、実際には、作業が実行されるスレッドプールの管理が非常に悪いため、使用を拒否することにしました。 元のパターンを壊さず、チームへのモスクを壊さない、ある種のわかりやすい解決策が必要です。 その結果、reactivex.ioとjavaストリームのアドオンに基づいて、2つの実用的なソリューションが提案されました。
デモはgithubにあります。すべての動作例はテストに含まれています。
オプション1.反応性を追加する
histrix内では、「非同期データソース」 reactivex.ioのリアクティブ(これは高速であるという事実ではなく、用語です)ライブラリを使用していることに注意してください。 この記事は、この手に負えないトピックへのガイドではありませんが、1つのエレガントなソリューションが示されます。 残念なことに、私はobservableという用語の確立されたロシア語の翻訳に精通していないため、ソースと呼びます。 そして、 Patternを壊さないように、このように行動します:
- ソースデータのリストを取得します
- それからソースを形成する
- histrixを含む、ソースのフィルター処理/マッピング
- ソースリストから収集する
一方で、最後の操作は見た目ほど単純ではありませんが、すべての外部接続が履歴によって制御され、したがって時間に厳密に制限されていることがわかっているため、アプリケーションの独自のコードは実際には何もしないため、指定された期間のデータソースを想定できますタイムアウトは値を上げる必要があります。そうしないと、タイムアウトが期限切れになる前(1秒)にhistrixによって例外がスローされます。 したがって、単純なコレクター関数を使用します。
static List<String> toList(Observable<String> observable) { return observable.timeout(1, TimeUnit.SECONDS).toList().toBlocking().single(); }
ネイティブAPIを使用して2つのコマンドとソースを作成します。
/** * - */ static Observable<String> executeCommand(String str) { LOG.info("Hot Hystrix command created: {}", str); return new CommandHelloWorld(str).observe(); } /** * - */ static Observable<String> executeCommandDelayed(String str) { LOG.info("Cold Hystrix command created: {}", str); return new CommandHelloWorld(str).toObservable(); }
したがって、6つの要素のリストを処理する単純なケースを考えてみましょう。
public void testNaive() { List<Integer> source = IntStream.range(1, 7).boxed().collect(Collectors.toList()); Observable<String> observable = Observable.from(source) .flatMap(elem -> executeCommand(elem.toString())); toList(observable).forEach(el ->LOG.info("List element: {}", el)); }
すべてが約500ミリ秒にわたって著しく並行して動作し、プログラムログはhistrixストリームの同時使用を確認します。 副作用として、アイテムはリストにランダムにリストされます。 これは反応性の価格です。
リストのサイズを49に増やしてみてください-そして、通常の問題が発生します:
public void testStupid() { List<Integer> source = IntStream.range(1, 50).boxed().collect(Collectors.toList()); Observable<String> observable = Observable.from(source) .flatMap(elem -> executeCommand(elem.toString())); toList(observable).forEach(el ->LOG.info("List element: {}", el)); }
以下に、このような面白いログの排出を示します。
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 1 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 2 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 3 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 4 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 5 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 6 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 7 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 8 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 9 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 10 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 11 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5 [hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2 [hystrix-ExampleGroup-9] INFO org.silentpom.CommandHelloWorld - Command start: 9 [hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6 [hystrix-ExampleGroup-10] INFO org.silentpom.CommandHelloWorld - Command start: 10 [hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3 [main] ERROR org.silentpom.RxHystrixTest - Ooops com.netflix.hystrix.exception.HystrixRuntimeException: CommandHelloWorld could not be queued for execution and no fallback available.
事実、デフォルトでは、histrixはゼロキューで10スレッドのプールを作成します。 ソースをサブスクライブすると、そのすべての要素が非常に迅速に放出され、プール全体が即座にオーバーフローします。コールドソースを作成しても効果はありませんでした。 このような履歴は必要ありません。 1人のクライアントの欲を合理的に制限する必要があります。
ソリューションは非常にシンプルであることが判明しました。
- まず、flatMapを使用しないため、ソースにサブスクライブしてもすべてのコマンドが作成されるわけではありません。 そして、mapメソッドを使用して二重ソースを作成します。
- これらのソースをウィンドウメソッドでグループ化-トリプルソースを取得します!
- トリプルソースを厳密に整理する時が来ました-concatMapメソッドでそれらを一つずつリリースしてください
- 各二重ソースは、flatMapによって並列に計算されます
コードは驚くほどコンパクトでしたが、その作業を理解するには長い時間がかかりました。
public void testWindow() { List<Integer> source = IntStream.range(1, 50).boxed().collect(Collectors.toList()); Observable<String> observable = Observable.from(source) .map(elem -> executeCommandDelayed(elem.toString())) .window(7) .concatMap(window -> window.flatMap(x -> x)); toList(observable).forEach(el ->LOG.info("List element: {}", el)); }
ログの断片を見てみましょう:
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 20 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 21 [hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3 [hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 7 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4 [hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1 [hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 3 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 6 [hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 2 [hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 7 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 1 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 5 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 4 [hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 11 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 12
アプリケーションログから、7番目のチームの起動後に新しいチームの起動プロセスが停止したことがわかります。 その結果、非常に簡単なコード変更が行われ、プール全体を詰まらせることなく、所望の並列度で外部システムへのクエリを実行できるようになりました。
オプション2.反応性を参照してください! 幹部がいます
まれな人はrxを理解しています。 彼が反対を言っても-あなた自身の言葉で上記のコードを書くように頼みなさい。 しかし、Java 8にはすでにストリームと未来があり、histrixはネイティブとして未来と連携できるようです。彼らの助けを借りて並列処理を開始してみましょう。 このような未来を創造します。
// static Future<String> executeCommandDelayed(String str) { LOG.info("Direct Hystrix command created: {}", str); return new CommandHelloWorld(str).queue(); } // , static String executeCommand(String str) { LOG.info("Direct Hystrix command created: {}", str); return new CommandHelloWorld(str).execute(); }
49個の要素のリストを処理しようとしています。
public void testStupid() { IntStream.range(1, 50).boxed().map( value -> executeCommandDelayed(value.toString()) ).collect(Collectors.toList()) .forEach(el -> LOG.info("List element (FUTURE): {}", el.toString())); }
また、よくある問題です。
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 2 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 3 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 4 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 5 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 6 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 7 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 8 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 9 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 10 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 11 [main] ERROR org.silentpom.stream.ParallelAsyncServiceTest - Ooops com.netflix.hystrix.exception.HystrixRuntimeException: CommandHelloWorld could not be queued for execution and no fallback available.
この場合、コマンドの作成順序はスプリッターによって制御されます。つまり、通常は完全に不透明で危険です。 ストリームを操作するためのかなり馴染みのある手法を維持しながら、2段階で問題を解決しようとします。
1)ソースデータストリームを先物のストリームにマッピングします。 さらに、実際に起動されたタスクを制御します。 このためには、必要な並列度を持つ中間エグゼキューターが必要です
2)先物の流れを価値の流れに変える。 各先物は最終的にhistrixによって実行され、リードタイムが保証されていることに留意してください。
最初のステップを実装するには、ユーザー定義関数を変換するための別のparallelWarp演算子を作成します。2番目のステップでは、ストリームを受信および返すwaitStream関数を作成する必要があります。
public void testSmart() { service.waitStream( IntStream.range(1, 50).boxed().map( service.parallelWarp( value -> executeCommand(value.toString()) ) ) ).collect(Collectors.toList()) .forEach(el -> LOG.info("List element: {}", el)); }
この記録は、ストリームのユーザーにはほとんど馴染みがありました。 ボンネットの下にあるものを見てみましょう。これが今日の最後のコードです。
// threadSize = 7 public ParallelAsyncService(int threadSize) { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("parallel-async-thread-%d").build(); // , executorService = Executors.newFixedThreadPool(threadSize, namedThreadFactory); } /** * Maps user function T -> Ret to function T -> Future<Ret>. Adds task to executor service * @param mapper user function * @param <T> user function argument * @param <Ret> user function result * @return function to future */ public <T, Ret> Function<T, Future<Ret>> parallelWarp(Function<T, Ret> mapper) { return (T t) -> { LOG.info("Submitting task to inner executor"); Future<Ret> future = executorService.submit(() -> { LOG.info("Sending task to hystrix"); return mapper.apply(t); }); return future; }; } /** * waits all futures in stream and rethrow exception if occured * @param futureStream stream of futures * @param <T> type * @return stream of results */ public <T> Stream<T> waitStream(Stream<Future<T>> futureStream) { List<Future<T>> futures = futureStream.collect(Collectors.toList()); // wait all futures one by one. for (Future<T> future : futures) { try { future.get(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } throw new RuntimeException(e); } } // all futures have completed, it is safe to call get return futures.stream().map( future -> { try { return future.get(); } catch (Exception e) { e.printStackTrace(); return null; // } } );
waitStreamメソッドは非常にシンプルで、エラー処理のみが台無しになりました。 parallelWarp演算子は非常に単純で、おそらく関数型プログラミングの支持者の間で特別な名前を持っています。 新しいhistrixコマンドは、必要な並列度を備えた内部エグゼキューターによってのみ作成されます。 Pruflink:
main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 18 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 19 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 20 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 21 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1 [hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2 [hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6 [hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 7 [hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 2 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 5 [hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 3 [hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 7 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 6 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 4 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 1 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 11 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 12 [hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8 [hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 13 [hystrix-ExampleGroup-9] INFO org.silentpom.CommandHelloWorld - Command start: 9
このアプローチでは、histrixスレッドの各プールに追加のスレッドプールが必要でしたが、出力リストの順序は維持されていました。 どのアプローチがアプリケーションに役立つかは、時間でわかります。
githubのテストですべての例を見ることができることを繰り返します。 私は有名なhabro-criticismにうれしいです。