ストリームAPI:ユニバーサル中間操作

私は、標準のJava 8ストリームAPIを拡張する無料のStreamExライブラリを開発し、そこに新しい操作、コレクター、およびストリームソースを追加しています。 通常、すべてを連続して追加するわけではありませんが、可能性のある各機能を包括的に検討します。 たとえば、新しい中間操作を追加すると、次のような疑問が生じます。



  1. それは本当に中間的なものでしょうか、つまり、端末操作が完了するまでソースに触れませんか?
  2. 彼女は怠け者で、ソースから必要以上のデータを引き出すことはありませんか?
  3. 無限のストリームで動作しますか? 限られた量のメモリが必要ですか?
  4. それはうまく並列しますか?


これらのポイントのマイナスは、そのような操作を追加するかどうかを真剣に考えさせます。 最初のマイナス-それはすぐではありません。 たとえば、jOOλの競合他社にはshuffle()操作があり、これは中間のように見えますが、実際にはリストのストリーム全体を直接消費し、それを混合して新しいストリームを作成します。 私はそれを尊重しません。



残りの項目のマイナスは、それらに違反する標準的な操作があるため、すぐに「いいえ」を意味するわけではありません。 2番目のアイテムはflatMap()



、3番目- sorted()



、4番目-すべての種類のlimit()



およびtakeWhile()



(JDK-9で)にtakeWhile()



ます。 それでも、私はこれを避けようとします。 しかし、先日、私は自分では並列処理がうまくいかず、使用方法によっては無限のストリームで動作しない可能性があることを発見しましたが、それでもあまりにも優れています。 これにより、文字通り数行で、既存の中間操作と、存在しない操作の束をほぼすべて表現することができます。 操作headTail()を呼び出しました。



操作メソッドは、2つの関数パラメーターを受け入れます(以下では、 PECSを省略します)。



 <R> StreamEx<R> headTail(BiFunction<T, StreamEx<T>, Stream<R>> mapper, Supplier<Stream<R>> supplier)
      
      





最初の関数は、元のストリームの最初の要素と他のすべての要素を含むストリームの2つの引数を取ります。 関数はこれで何でもでき、新しいストリームを返すことができます。それは次の操作に転送されます。 2番目の引数は、引数を取らず、最初の関数と同じタイプのストリームを返す関数です。 元のストリームが空であることが判明した場合に呼び出されます。 実際、関数全体の1つだけが呼び出され、ストリーム全体の端末操作中に1回だけ呼び出されます。



多くの場合、2番目の関数は空のストリームのみを返す必要があります(元のストリームが空の場合、結果は空になります)。したがって、省略できます。



 <R> StreamEx<R> headTail(BiFunction<T, StreamEx<T>, Stream<R>> mapper)
      
      





これで何ができるか見てみましょう。 単純な使用例は次のようになります。



 StreamEx.of("name", "John", "Mary", "Lucy") .headTail((head, tail) -> tail.map(str -> head+": "+str)) .forEach(System.out::println);
      
      





結論:



 name: John name: Mary name: Lucy
      
      





ここでは、ストリームの最初の要素を削除し、それを使用して後続の要素と連結します。 したがって、最初の行にヘッダーがあるテキストファイルを解析できます。 しかし、それはかなり退屈です。 この方法で遊ぶと、はるかに強力であることがわかりました。 それを通して、JDKからの主要な中間操作を表現してみましょう。



Stream.map



マップ操作は、指定された関数を元のストリームのすべての要素に適用します。 これは、headTail()を通してどのように見えるかです:

 public static <T, R> StreamEx<R> map(StreamEx<T> input, Function<T, R> mapper) { return input.headTail((head, tail) -> map(tail, mapper).prepend(mapper.apply(head))); }
      
      





ここでは、別の単純なプリペンド操作を使用しますが、それなしでは何も起こりません。 これは、2つのストリームの連結に関するトピックのバリエーションです(Stream.concatは標準APIにあります)。 ここでは、末尾を呼び出してから、関数をストリームの先頭にあるhead要素に適用した結果を追加します。



これは再帰に似ていますが、誰もが再帰がスタックを消費していることを知っています。 関数型プログラミング言語では、末尾再帰の最適化によって節約される場合がありますが、Javaではそうではなく、予期されていません。 ただし、この場合、これは完全な再帰ではありません。自分自身の中でmapメソッドを呼び出すのではなく、後で呼び出される関数を作成するだけです。 この場合、個々のheadTail()



への変更がストリームの先頭のみに影響し、テールは変更されないままであれば、呼び出しの深さを制御できることが判明しました。 この機能を「テールストリームの最適化」とはあまり考えていませんでした。 中間操作prepend



(ストリームの先頭に何かを追加する)、 mapFirst



(残りの部分に触れることなくストリームの最初の要素を変更する)、およびheadTail



自体とheadTail



ます。 原則として、標準のskipおよびdropWhile(JDK-9を使用)に拡張できますが、私のライブラリは標準操作が元のStream APIと完全に互換性があることを約束しており、ここで微妙な違いが生じます。



何らかの方法で、上記のマップ操作は、一定サイズよりも大きいスタックまたはメモリを消費せず、任意の長さのストリームにまったく適用できます。 他の操作を見てみましょう。



Stream.limit



ストリームを指定された長さに制限します。 それを1つの要素に制限する場合は、単純にヘッドからストリームを作成します。そうでない場合は、制限を減らしてテールを呼び出します(ハンドルn <= 0-読者のための演習)。



 public static <T> StreamEx<T> limit(StreamEx<T> input, int n) { return input.headTail((head, tail) -> n > 1 ? limit(tail, n - 1).prepend(head) : Stream.of(head)); }
      
      





最初は少し違う方法で書きました(flatMap引数のように、headTail引数は空のストリームの代わりにnullを返すことができます):



 public static <T> StreamEx<T> limit(StreamEx<T> input, int n) { return input.headTail((head, tail) -> n > 0 ? limit(tail, n - 1).prepend(head) : null); }
      
      





ただし、この実装には欠点があります。ソースから必要以上に1つの要素を考慮します(n = 0の場合、head引数が読み取られますが、使用されません)。 これは重大な場合があります。 たとえば、このようなコードは機能するはずです。



 limit(StreamEx.of(new Random().ints(0, 1000).boxed().distinct()), 1000).forEach(System.out::println);
      
      





0から999までの乱数の無限のストリームから、一意の乱数を選択します。 1000個のユニークな番号がありますが、1001個はありません。そのため、ソースから1001番目の番号を取得しようとすると、すべてがフリーズします。



Stream.skip



最初のn個の要素を捨てます。 n = 0の場合、ヘッドを接着したままテールを返します。それ以外の場合は、引数を減らして呼び出します。



 static <T> StreamEx<T> skip(StreamEx<T> input, int n) { return input.headTail((head, tail) -> n > 0 ? skip(tail, n - 1) : tail.prepend(head)); }
      
      







Stream.flatMap



ストリーム上の各要素を表示し、それらから共通のストリームを作成します。 この場合、実装はマップと同じです。



 public static <T, R> StreamEx<R> flatMap(StreamEx<T> input, Function<T, Stream<R>> mapper) { return input.headTail((head, tail) -> flatMap(tail, mapper).prepend(mapper.apply(head))); }
      
      





ここでの唯一の違いは、ストリームを受け入れる別のプリペンドが使用されることです(実際、最初のプリペンドはこの特殊なケースです)。



Stream.peek



ストリームの各要素に対して追加のアクションを実行し、ストリームをそのまま返します。 アクションを実行し、頭を尻尾に接着します。

 public static <T> StreamEx<T> peek(StreamEx<T> input, Consumer<T> consumer) { return input.headTail((head, tail) -> { consumer.accept(head); return peek(tail, consumer).prepend(head); }); }
      
      







Stream.filter



述部を満たす要素を残します。 述部が満たされている場合にのみ頭を接着します。

 public static <T> StreamEx<T> filter(StreamEx<T> input, Predicate<T> predicate) { return input.<T> headTail((head, tail) -> predicate.test(head) ? filter(tail, predicate).prepend(head) : filter(tail, predicate)); }
      
      





Stream.distinct



ユニークなアイテムを残す。 明らかに追加のメモリが必要になります。 単純な実装では、フィルター(標準または上記で宣言されたもの)を使用します。



 public static <T> StreamEx<T> distinct(StreamEx<T> input) { return input.headTail((head, tail) -> distinct(tail.filter(n -> !Objects.equals(head, n))).prepend(head)); }
      
      





しかし、そのようなコードは依然としてスタックを使い果たし、テールストリームの最適化はありません。 さらに、各要素はフィルターチェーンによって線形にチェックされますが、最適化したいと思います。 これを行うには、HashSetパラメーターを保持します。



 private static <T> StreamEx<T> distinct(StreamEx<T> input, Set<T> observed) { return input.headTail((head, tail) -> observed.add(head) ? distinct(tail, observed).prepend(head) : distinct(tail, observed)); }
      
      





要素が既にセットに含まれている場合、 Set.add



false



返すことを忘れないでください。 この場合、頭を刺さないでください。 そのような実装はスタックを使い果たしず、標準のメモリよりも劣りません。 ここでは、実行するメソッドを追加する価値があります(再帰関数では、実行するために別のパブリックメソッドが必要になることがよくあります)。



 public static <T> StreamEx<T> distinct(StreamEx<T> input) { return distinct(input, new HashSet<>()); }
      
      





Stream.sorted



ストリームを並べ替えます。 操作は特別です。ソースが完全に読み取られるまで、結果に何も与えることはできません。 すべてをバッファリングする必要があり(たとえば、 ArrayList



)、ここでは初めて2番目の引数headTail



を使用します。



 public static <T> StreamEx<T> sorted(StreamEx<T> input) { return sorted(input, new ArrayList<>()); } private static <T> StreamEx<T> sorted(StreamEx<T> input, List<T> buf) { return input.headTail((head, tail) -> { buf.add(head); return sorted(tail, buf); }, () -> { buf.sort(null); return buf.stream(); }); }
      
      





ソースストリーム全体が終了したら、バッファを並べ替え、そこからストリームを返します。 このようなsorted



は標準のものと同様に機能し、上記のshuffle



よりも優れていることに注意してください。 たとえば、2つのソートされたストリームを連結すると、最初のストリームを完全に読み取るまで、2番目のストリームはソートされません。 ちなみに、 buf.sort(null)



Collections.shuffle(buf)



に置き換えることで、あなたとshuffle



はほぼ正常に動作します。 Collections.reverse(buf)



を使用すると、ストリームを反転できます。



JDK-9では、これまでに2つの新しい中間操作が追加されています。 また、それらを実現します。



Stream.takeWhile



述部がfalseを返したらすぐにストリームをトリミングします。 制限のように見えます:



 public static <T> StreamEx<T> takeWhile(StreamEx<T> input, Predicate<T> predicate) { return input.headTail((head, tail) -> predicate.test(head) ? takeWhile(tail, predicate).prepend(head) : null); }
      
      





Stream.dropWhile



述部がfalse



返すまで、ストリームから要素をスローしfalse



skip



似ています:



 public static <T> StreamEx<T> dropWhile(StreamEx<T> input, Predicate<T> predicate) { return input.headTail((head, tail) -> predicate.test(head) ? dropWhile(tail, predicate) : tail.prepend(head)); }
      
      





さて、車輪の再発明は退屈です。 Stream APIにはない新しい操作を実装してみましょう。





コンテンツをストリームの最後に逆順で追加します(1、2、3からのストリームが1、2、3、3、2、1になります)。 簡単に実行できますが、テールの最適化は行われません。



 public static <T> StreamEx<T> mirror(StreamEx<T> input) { return input.headTail((head, tail) -> mirror(tail).append(head).prepend(head)); }
      
      





末尾から、バッファが必要です。



 public static <T> StreamEx<T> mirror(StreamEx<T> input) { return mirror(input, new ArrayDeque<>()); } private static <T> StreamEx<T> mirror(StreamEx<T> input, Deque<T> buf) { return input.headTail((head, tail) -> { buf.addFirst(head); return mirror(tail, buf).prepend(head); }, buf::stream); }
      
      





両方の実装は、必要以上のものを取りません: mirror(StreamEx.of(1,2,3,4,5)).limit(3)



は反射点に到達せず、ソースから3つの要素のみを減算します。



scanLeft



ストリームを順次変更し、特定の操作を実行します。 たとえば、 scanLeft(StreamEx.of(1,2,3,4,5), Integer::sum)



は要素を連続して合計し、ストリームscanLeft(StreamEx.of(1,2,3,4,5), Integer::sum)



を作成する必要があります。



 public static <T> StreamEx<T> scanLeft(StreamEx<T> input, BinaryOperator<T> operator) { return input.headTail((head, tail) -> scanLeft(tail.mapFirst(cur -> operator.apply(head, cur)), operator).prepend(head)); }
      
      





ここでは、すでにStreamExにあるmapFirstメソッドを使用しました。 しかし、もしなければ、再帰がなくても簡単に書くことができます。



 public static <T> StreamEx<T> mapFirst(StreamEx<T> input, UnaryOperator<T> operator) { return input.headTail((head, tail) -> tail.prepend(operator.apply(head))); }
      
      





いずれの場合も、mapFirstと既存のテールの両方でテールが最適化されます。



takeWhileClosed



名前はあまり成功しないかもしれません。 場合によっては、述語を満たす要素だけでなく、それに違反する最初の要素も含まれるように、 takeWhile



を変更することがあります。 既存の操作と通常のtakeWhile



を介してこれを表現するのは普通ではありません。 headTail



headTail



と簡単です。



 public static <T> StreamEx<T> takeWhileClosed(StreamEx<T> input, Predicate<T> predicate) { return input.headTail((head, tail) -> predicate.test(head) ? takeWhileClosed(tail, predicate).prepend(head) : Stream.of(head)); }
      
      





毎回



最初から始めて、指定された間隔(たとえば、10分ごと)でストリームから要素を取得します。 ここでskip



操作と組み合わせると便利ですが、標準のskip



はテールが最適化されないため、オーバーライドされたskip



を使用します。



 public static <T> StreamEx<T> every(StreamEx<T> input, int n) { return input.headTail((head, tail) -> every(skip(tail, n - 1), n).prepend(head)); }
      
      





カップル



指定された関数をそれらに適用することにより、ストリームを要素の互いに素なペアに分割します(要素の数が奇数の場合、最後の要素をスローします)。 ここでは、 headTail



2回呼び出すと便利です。



 public static <T, R> StreamEx<R> couples(StreamEx<T> input, BiFunction<T, T, R> mapper) { return input.headTail((left, tail1) -> tail1.headTail((right, tail2) -> couples(tail2, mapper).prepend(mapper.apply(left, right)))); }
      
      





pairMap



交差するペアでも同じことが必要ですか? 簡単です。再帰呼び出し中に適切な要素をストリームに返すだけです。



 public static <T, R> StreamEx<R> pairMap(StreamEx<T> input, BiFunction<T, T, R> mapper) { return input.headTail((left, tail1) -> tail1.headTail((right, tail2) -> pairMap(tail2.prepend(right), mapper).prepend(mapper.apply(left, right)))); }
      
      





そのような操作は既にStreamExにあり、私それについて書きました。 もちろん、 headTail()



による実装とは異なり、通常は並列化されます。



バッチ



さて、ペアで、私はわかります。 そして、ストリームを固定長の断片に(リストの形式で)分割し、最後に整数以外の断片を失いたくない場合はどうでしょうか? たとえば、 batches(StreamEx(1,2,3,4,5,6,7), 3)



は、リスト[1,2,3], [4,5,6], [7]



からストリームを作成する必要があります。 中間バッファを含む引数はここで役立ちます:



 public static <T> StreamEx<List<T>> batches(StreamEx<T> input, int size) { return batches(input, size, Collections.emptyList()); } private static <T> StreamEx<List<T>> batches(StreamEx<T> input, int size, List<T> cur) { return input.headTail((head, tail) -> cur.size() >= size ? batches(tail, size, Collections.singletonList(head)).prepend(cur) //         : batches(tail, size, StreamEx.of(cur).append(head).toList()), //     () -> Stream.of(cur)); }
      
      





ソースが使い果たされた場合、最後に蓄積されたバッファを() -> Stream.of(cur)



を使用して結果に戻し、テールが失われないようにします。 ここでは、実装StreamEx.of(cur).append(head).toList()



美しさのために、 StreamEx.of(cur).append(head).toList()



を使用して新しいリストを作成するたびに、既存のリストを変更しません。 ただし、パフォーマンスが重要な場合は、変更可能なリストを簡単に挿入できます。



withIndices



ストリーム内の要素のインデックスを知る必要がありますか? これが可能です。 インデックスと要素のペアのような特別な型を開始しないために、 BiFunction<Integer, T, R>



型の抽象関数を使用します。これは、インデックスと要素で必要な処理を実行できます。



 public static <T, R> StreamEx<R> withIndices(StreamEx<T> input, BiFunction<Integer, T, R> mapper) { return withIndices(input, 0, mapper); } private static <T, R> StreamEx<R> withIndices(StreamEx<T> input, int idx, BiFunction<Integer, T, R> mapper) { return input.headTail((head, tail) -> withIndices(tail, idx + 1, mapper).prepend(mapper.apply(idx, head))); }
      
      







支配者



よりエキゾチックなタスク:指定された要素が「支配する」特定の要素に続く要素をスローします。 優位性は、2つの要素から述語を定義します。 たとえば、 dominators(numbers, (a, b) -> a >= b)



は、元の数値のサブセットを増やします。 実装はすべてに似ていますが、dropWhileをスキップする代わりに使用されます:



 public static <T> StreamEx<T> dominators(StreamEx<T> input, BiPredicate<T, T> isDominator) { return input.headTail((head, tail) -> dominators(dropWhile(tail, e -> isDominator.test(head, e)), isDominator) .prepend(head)); }
      
      







appendReduction



ストリームの最後に別の要素を追加します-与えられた操作での削減の結果。 たとえば、 appendReduction(numbers, 0, Integer::sum)



、その要素の合計を数値のストリームに追加します。



 public static <T> StreamEx<T> appendReduction(StreamEx<T> input, T identity, BinaryOperator<T> op) { return input.headTail((head, tail) -> appendReduction(tail, op.apply(identity, head), op).prepend(head), () -> Stream.of(identity)); }
      
      





いつものように、すべてが怠zyであり、テールが最適化されています。



素数



むしろ、学習タスク。 エラトステネスのふるいを作成します。すでに見たものに分割されている素数をスローする素数の怠zyなストリーム:



 public static StreamEx<Integer> sieve(StreamEx<Integer> input) { return sieve(StreamEx.iterate(2, x -> x+1)); } private static StreamEx<Integer> sieve(StreamEx<Integer> input) { return input.headTail((head, tail) -> sieve(tail.filter(n -> n % head != 0)).prepend(head)); }
      
      





ここでは、関数型言語の同様のものももちろん最適化されていませんが、テールの最適化は取得されません。 しかし、それは簡単に見えます。 標準設定では、JVMはStackOverflowErrorでクラッシュするまで、最大200,000以上の素数を発行します。



他の便利な操作を考え出すことができます。 たとえば、ストリームの内容を指定された回数だけループで繰り返します。 または、2つの異なるフィルターでフィルター処理してストリームを複製します(同時に、2番目のフィルターが通過しなかったメモリに保存しないでください)。 実行中のウィンドウを作成できます(バッチに似ていますが、重複しています)。 実際、何を思いついたとしても、これをheadTailで非常に短時間で実装することができました(私のテストはこちらです )。 いずれにせよ、私にとって、headTailはIterator



Spliterator



書くよりも明らかに短く理解しやすいSpliterator



。 私が理解しているように、関数型プログラミングの世界では、そのようなことは当たり前です。 Javaでそれが可能であることは素晴らしいことです。



喜んでプログラム!



All Articles