- それは本当に中間的なものでしょうか、つまり、端末操作が完了するまでソースに触れませんか?
- 彼女は怠け者で、ソースから必要以上のデータを引き出すことはありませんか?
- 無限のストリームで動作しますか? 限られた量のメモリが必要ですか?
- それはうまく並列しますか?
これらのポイントのマイナスは、そのような操作を追加するかどうかを真剣に考えさせます。 最初のマイナス-それはすぐではありません。 たとえば、jOOλの競合他社にはshuffle()操作があり、これは中間のように見えますが、実際にはリストのストリーム全体を直接消費し、それを混合して新しいストリームを作成します。 私はそれを尊重しません。
残りの項目のマイナスは、それらに違反する標準的な操作があるため、すぐに「いいえ」を意味するわけではありません。 2番目のアイテムは
flatMap()
、3番目-
sorted()
、4番目-すべての種類の
limit()
および
takeWhile()
(JDK-9で)に
takeWhile()
ます。 それでも、私はこれを避けようとします。 しかし、先日、私は自分では並列処理がうまくいかず、使用方法によっては無限のストリームで動作しない可能性があることを発見しましたが、それでもあまりにも優れています。 これにより、文字通り数行で、既存の中間操作と、存在しない操作の束をほぼすべて表現することができます。 操作headTail()を呼び出しました。
操作メソッドは、2つの関数パラメーターを受け入れます(以下では、 PECSを省略します)。
<R> StreamEx<R> headTail(BiFunction<T, StreamEx<T>, Stream<R>> mapper, Supplier<Stream<R>> supplier)
最初の関数は、元のストリームの最初の要素と他のすべての要素を含むストリームの2つの引数を取ります。 関数はこれで何でもでき、新しいストリームを返すことができます。それは次の操作に転送されます。 2番目の引数は、引数を取らず、最初の関数と同じタイプのストリームを返す関数です。 元のストリームが空であることが判明した場合に呼び出されます。 実際、関数全体の1つだけが呼び出され、ストリーム全体の端末操作中に1回だけ呼び出されます。
多くの場合、2番目の関数は空のストリームのみを返す必要があります(元のストリームが空の場合、結果は空になります)。したがって、省略できます。
<R> StreamEx<R> headTail(BiFunction<T, StreamEx<T>, Stream<R>> mapper)
これで何ができるか見てみましょう。 単純な使用例は次のようになります。
StreamEx.of("name", "John", "Mary", "Lucy") .headTail((head, tail) -> tail.map(str -> head+": "+str)) .forEach(System.out::println);
結論:
name: John name: Mary name: Lucy
ここでは、ストリームの最初の要素を削除し、それを使用して後続の要素と連結します。 したがって、最初の行にヘッダーがあるテキストファイルを解析できます。 しかし、それはかなり退屈です。 この方法で遊ぶと、はるかに強力であることがわかりました。 それを通して、JDKからの主要な中間操作を表現してみましょう。
Stream.map
マップ操作は、指定された関数を元のストリームのすべての要素に適用します。 これは、headTail()を通してどのように見えるかです:
public static <T, R> StreamEx<R> map(StreamEx<T> input, Function<T, R> mapper) { return input.headTail((head, tail) -> map(tail, mapper).prepend(mapper.apply(head))); }
ここでは、別の単純なプリペンド操作を使用しますが、それなしでは何も起こりません。 これは、2つのストリームの連結に関するトピックのバリエーションです(Stream.concatは標準APIにあります)。 ここでは、末尾を呼び出してから、関数をストリームの先頭にあるhead要素に適用した結果を追加します。
これは再帰に似ていますが、誰もが再帰がスタックを消費していることを知っています。 関数型プログラミング言語では、末尾再帰の最適化によって節約される場合がありますが、Javaではそうではなく、予期されていません。 ただし、この場合、これは完全な再帰ではありません。自分自身の中でmapメソッドを呼び出すのではなく、後で呼び出される関数を作成するだけです。 この場合、個々の
headTail()
への変更がストリームの先頭のみに影響し、テールは変更されないままであれば、呼び出しの深さを制御できることが判明しました。 この機能を「テールストリームの最適化」とはあまり考えていませんでした。 中間操作
prepend
(ストリームの先頭に何かを追加する)、
mapFirst
(残りの部分に触れることなくストリームの最初の要素を変更する)、および
headTail
自体と
headTail
ます。 原則として、標準のskipおよびdropWhile(JDK-9を使用)に拡張できますが、私のライブラリは標準操作が元のStream APIと完全に互換性があることを約束しており、ここで微妙な違いが生じます。
何らかの方法で、上記のマップ操作は、一定サイズよりも大きいスタックまたはメモリを消費せず、任意の長さのストリームにまったく適用できます。 他の操作を見てみましょう。
Stream.limit
ストリームを指定された長さに制限します。 それを1つの要素に制限する場合は、単純にヘッドからストリームを作成します。そうでない場合は、制限を減らしてテールを呼び出します(ハンドルn <= 0-読者のための演習)。
public static <T> StreamEx<T> limit(StreamEx<T> input, int n) { return input.headTail((head, tail) -> n > 1 ? limit(tail, n - 1).prepend(head) : Stream.of(head)); }
最初は少し違う方法で書きました(flatMap引数のように、headTail引数は空のストリームの代わりにnullを返すことができます):
public static <T> StreamEx<T> limit(StreamEx<T> input, int n) { return input.headTail((head, tail) -> n > 0 ? limit(tail, n - 1).prepend(head) : null); }
ただし、この実装には欠点があります。ソースから必要以上に1つの要素を考慮します(n = 0の場合、head引数が読み取られますが、使用されません)。 これは重大な場合があります。 たとえば、このようなコードは機能するはずです。
limit(StreamEx.of(new Random().ints(0, 1000).boxed().distinct()), 1000).forEach(System.out::println);
0から999までの乱数の無限のストリームから、一意の乱数を選択します。 1000個のユニークな番号がありますが、1001個はありません。そのため、ソースから1001番目の番号を取得しようとすると、すべてがフリーズします。
Stream.skip
最初のn個の要素を捨てます。 n = 0の場合、ヘッドを接着したままテールを返します。それ以外の場合は、引数を減らして呼び出します。
static <T> StreamEx<T> skip(StreamEx<T> input, int n) { return input.headTail((head, tail) -> n > 0 ? skip(tail, n - 1) : tail.prepend(head)); }
Stream.flatMap
ストリーム上の各要素を表示し、それらから共通のストリームを作成します。 この場合、実装はマップと同じです。
public static <T, R> StreamEx<R> flatMap(StreamEx<T> input, Function<T, Stream<R>> mapper) { return input.headTail((head, tail) -> flatMap(tail, mapper).prepend(mapper.apply(head))); }
ここでの唯一の違いは、ストリームを受け入れる別のプリペンドが使用されることです(実際、最初のプリペンドはこの特殊なケースです)。
Stream.peek
ストリームの各要素に対して追加のアクションを実行し、ストリームをそのまま返します。 アクションを実行し、頭を尻尾に接着します。
public static <T> StreamEx<T> peek(StreamEx<T> input, Consumer<T> consumer) { return input.headTail((head, tail) -> { consumer.accept(head); return peek(tail, consumer).prepend(head); }); }
Stream.filter
述部を満たす要素を残します。 述部が満たされている場合にのみ頭を接着します。
public static <T> StreamEx<T> filter(StreamEx<T> input, Predicate<T> predicate) { return input.<T> headTail((head, tail) -> predicate.test(head) ? filter(tail, predicate).prepend(head) : filter(tail, predicate)); }
Stream.distinct
ユニークなアイテムを残す。 明らかに追加のメモリが必要になります。 単純な実装では、フィルター(標準または上記で宣言されたもの)を使用します。
public static <T> StreamEx<T> distinct(StreamEx<T> input) { return input.headTail((head, tail) -> distinct(tail.filter(n -> !Objects.equals(head, n))).prepend(head)); }
しかし、そのようなコードは依然としてスタックを使い果たし、テールストリームの最適化はありません。 さらに、各要素はフィルターチェーンによって線形にチェックされますが、最適化したいと思います。 これを行うには、HashSetパラメーターを保持します。
private static <T> StreamEx<T> distinct(StreamEx<T> input, Set<T> observed) { return input.headTail((head, tail) -> observed.add(head) ? distinct(tail, observed).prepend(head) : distinct(tail, observed)); }
要素が既にセットに含まれている場合、
Set.add
は
false
返すことを忘れないでください。 この場合、頭を刺さないでください。 そのような実装はスタックを使い果たしず、標準のメモリよりも劣りません。 ここでは、実行するメソッドを追加する価値があります(再帰関数では、実行するために別のパブリックメソッドが必要になることがよくあります)。
public static <T> StreamEx<T> distinct(StreamEx<T> input) { return distinct(input, new HashSet<>()); }
Stream.sorted
ストリームを並べ替えます。 操作は特別です。ソースが完全に読み取られるまで、結果に何も与えることはできません。 すべてをバッファリングする必要があり(たとえば、
ArrayList
)、ここでは初めて2番目の引数
headTail
を使用します。
public static <T> StreamEx<T> sorted(StreamEx<T> input) { return sorted(input, new ArrayList<>()); } private static <T> StreamEx<T> sorted(StreamEx<T> input, List<T> buf) { return input.headTail((head, tail) -> { buf.add(head); return sorted(tail, buf); }, () -> { buf.sort(null); return buf.stream(); }); }
ソースストリーム全体が終了したら、バッファを並べ替え、そこからストリームを返します。 このような
sorted
は標準のものと同様に機能し、上記の
shuffle
よりも優れていることに注意してください。 たとえば、2つのソートされたストリームを連結すると、最初のストリームを完全に読み取るまで、2番目のストリームはソートされません。 ちなみに、
buf.sort(null)
を
Collections.shuffle(buf)
に置き換えることで、あなたと
shuffle
はほぼ正常に動作します。
Collections.reverse(buf)
を使用すると、ストリームを反転できます。
JDK-9では、これまでに2つの新しい中間操作が追加されています。 また、それらを実現します。
Stream.takeWhile
述部がfalseを返したらすぐにストリームをトリミングします。 制限のように見えます:
public static <T> StreamEx<T> takeWhile(StreamEx<T> input, Predicate<T> predicate) { return input.headTail((head, tail) -> predicate.test(head) ? takeWhile(tail, predicate).prepend(head) : null); }
Stream.dropWhile
述部が
false
返すまで、ストリームから要素をスローし
false
。
skip
似ています:
public static <T> StreamEx<T> dropWhile(StreamEx<T> input, Predicate<T> predicate) { return input.headTail((head, tail) -> predicate.test(head) ? dropWhile(tail, predicate) : tail.prepend(head)); }
さて、車輪の再発明は退屈です。 Stream APIにはない新しい操作を実装してみましょう。
鏡
コンテンツをストリームの最後に逆順で追加します(1、2、3からのストリームが1、2、3、3、2、1になります)。 簡単に実行できますが、テールの最適化は行われません。
public static <T> StreamEx<T> mirror(StreamEx<T> input) { return input.headTail((head, tail) -> mirror(tail).append(head).prepend(head)); }
末尾から、バッファが必要です。
public static <T> StreamEx<T> mirror(StreamEx<T> input) { return mirror(input, new ArrayDeque<>()); } private static <T> StreamEx<T> mirror(StreamEx<T> input, Deque<T> buf) { return input.headTail((head, tail) -> { buf.addFirst(head); return mirror(tail, buf).prepend(head); }, buf::stream); }
両方の実装は、必要以上のものを取りません:
mirror(StreamEx.of(1,2,3,4,5)).limit(3)
は反射点に到達せず、ソースから3つの要素のみを減算します。
scanLeft
ストリームを順次変更し、特定の操作を実行します。 たとえば、
scanLeft(StreamEx.of(1,2,3,4,5), Integer::sum)
は要素を連続して合計し、ストリーム
scanLeft(StreamEx.of(1,2,3,4,5), Integer::sum)
を作成する必要があります。
public static <T> StreamEx<T> scanLeft(StreamEx<T> input, BinaryOperator<T> operator) { return input.headTail((head, tail) -> scanLeft(tail.mapFirst(cur -> operator.apply(head, cur)), operator).prepend(head)); }
ここでは、すでにStreamExにあるmapFirstメソッドを使用しました。 しかし、もしなければ、再帰がなくても簡単に書くことができます。
public static <T> StreamEx<T> mapFirst(StreamEx<T> input, UnaryOperator<T> operator) { return input.headTail((head, tail) -> tail.prepend(operator.apply(head))); }
いずれの場合も、mapFirstと既存のテールの両方でテールが最適化されます。
takeWhileClosed
名前はあまり成功しないかもしれません。 場合によっては、述語を満たす要素だけでなく、それに違反する最初の要素も含まれるように、
takeWhile
を変更することがあります。 既存の操作と通常の
takeWhile
を介してこれを表現するのは普通ではありません。
headTail
を
headTail
と簡単です。
public static <T> StreamEx<T> takeWhileClosed(StreamEx<T> input, Predicate<T> predicate) { return input.headTail((head, tail) -> predicate.test(head) ? takeWhileClosed(tail, predicate).prepend(head) : Stream.of(head)); }
毎回
最初から始めて、指定された間隔(たとえば、10分ごと)でストリームから要素を取得します。 ここで
skip
操作と組み合わせると便利ですが、標準の
skip
はテールが最適化されないため、オーバーライドされた
skip
を使用します。
public static <T> StreamEx<T> every(StreamEx<T> input, int n) { return input.headTail((head, tail) -> every(skip(tail, n - 1), n).prepend(head)); }
カップル
指定された関数をそれらに適用することにより、ストリームを要素の互いに素なペアに分割します(要素の数が奇数の場合、最後の要素をスローします)。 ここでは、
headTail
2回呼び出すと便利です。
public static <T, R> StreamEx<R> couples(StreamEx<T> input, BiFunction<T, T, R> mapper) { return input.headTail((left, tail1) -> tail1.headTail((right, tail2) -> couples(tail2, mapper).prepend(mapper.apply(left, right)))); }
pairMap
交差するペアでも同じことが必要ですか? 簡単です。再帰呼び出し中に適切な要素をストリームに返すだけです。
public static <T, R> StreamEx<R> pairMap(StreamEx<T> input, BiFunction<T, T, R> mapper) { return input.headTail((left, tail1) -> tail1.headTail((right, tail2) -> pairMap(tail2.prepend(right), mapper).prepend(mapper.apply(left, right)))); }
そのような操作は既にStreamExにあり、私はそれについて書きました。 もちろん、
headTail()
による実装とは異なり、通常は並列化されます。
バッチ
さて、ペアで、私はわかります。 そして、ストリームを固定長の断片に(リストの形式で)分割し、最後に整数以外の断片を失いたくない場合はどうでしょうか? たとえば、
batches(StreamEx(1,2,3,4,5,6,7), 3)
は、リスト
[1,2,3], [4,5,6], [7]
からストリームを作成する必要があります。 中間バッファを含む引数はここで役立ちます:
public static <T> StreamEx<List<T>> batches(StreamEx<T> input, int size) { return batches(input, size, Collections.emptyList()); } private static <T> StreamEx<List<T>> batches(StreamEx<T> input, int size, List<T> cur) { return input.headTail((head, tail) -> cur.size() >= size ? batches(tail, size, Collections.singletonList(head)).prepend(cur) // : batches(tail, size, StreamEx.of(cur).append(head).toList()), // () -> Stream.of(cur)); }
ソースが使い果たされた場合、最後に蓄積されたバッファを
() -> Stream.of(cur)
を使用して結果に戻し、テールが失われないようにします。 ここでは、実装
StreamEx.of(cur).append(head).toList()
美しさのために、
StreamEx.of(cur).append(head).toList()
を使用して新しいリストを作成するたびに、既存のリストを変更しません。 ただし、パフォーマンスが重要な場合は、変更可能なリストを簡単に挿入できます。
withIndices
ストリーム内の要素のインデックスを知る必要がありますか? これが可能です。 インデックスと要素のペアのような特別な型を開始しないために、
BiFunction<Integer, T, R>
型の抽象関数を使用します。これは、インデックスと要素で必要な処理を実行できます。
public static <T, R> StreamEx<R> withIndices(StreamEx<T> input, BiFunction<Integer, T, R> mapper) { return withIndices(input, 0, mapper); } private static <T, R> StreamEx<R> withIndices(StreamEx<T> input, int idx, BiFunction<Integer, T, R> mapper) { return input.headTail((head, tail) -> withIndices(tail, idx + 1, mapper).prepend(mapper.apply(idx, head))); }
支配者
よりエキゾチックなタスク:指定された要素が「支配する」特定の要素に続く要素をスローします。 優位性は、2つの要素から述語を定義します。 たとえば、
dominators(numbers, (a, b) -> a >= b)
は、元の数値のサブセットを増やします。 実装はすべてに似ていますが、dropWhileをスキップする代わりに使用されます:
public static <T> StreamEx<T> dominators(StreamEx<T> input, BiPredicate<T, T> isDominator) { return input.headTail((head, tail) -> dominators(dropWhile(tail, e -> isDominator.test(head, e)), isDominator) .prepend(head)); }
appendReduction
ストリームの最後に別の要素を追加します-与えられた操作での削減の結果。 たとえば、
appendReduction(numbers, 0, Integer::sum)
、その要素の合計を数値のストリームに追加します。
public static <T> StreamEx<T> appendReduction(StreamEx<T> input, T identity, BinaryOperator<T> op) { return input.headTail((head, tail) -> appendReduction(tail, op.apply(identity, head), op).prepend(head), () -> Stream.of(identity)); }
いつものように、すべてが怠zyであり、テールが最適化されています。
素数
むしろ、学習タスク。 エラトステネスのふるいを作成します。すでに見たものに分割されている素数をスローする素数の怠zyなストリーム:
public static StreamEx<Integer> sieve(StreamEx<Integer> input) { return sieve(StreamEx.iterate(2, x -> x+1)); } private static StreamEx<Integer> sieve(StreamEx<Integer> input) { return input.headTail((head, tail) -> sieve(tail.filter(n -> n % head != 0)).prepend(head)); }
ここでは、関数型言語の同様のものももちろん最適化されていませんが、テールの最適化は取得されません。 しかし、それは簡単に見えます。 標準設定では、JVMはStackOverflowErrorでクラッシュするまで、最大200,000以上の素数を発行します。
他の便利な操作を考え出すことができます。 たとえば、ストリームの内容を指定された回数だけループで繰り返します。 または、2つの異なるフィルターでフィルター処理してストリームを複製します(同時に、2番目のフィルターが通過しなかったメモリに保存しないでください)。 実行中のウィンドウを作成できます(バッチに似ていますが、重複しています)。 実際、何を思いついたとしても、これをheadTailで非常に短時間で実装することができました(私のテストはこちらです )。 いずれにせよ、私にとって、headTailは
Iterator
や
Spliterator
書くよりも明らかに短く理解しやすい
Spliterator
。 私が理解しているように、関数型プログラミングの世界では、そのようなことは当たり前です。 Javaでそれが可能であることは素晴らしいことです。
喜んでプログラム!