一尘不染

takeWhile()与Flatmap的工作方式不同

java

我正在使用takeWhile创建片段,以探索其可能性。当与flatMap结合使用时,其行为与预期不符。请在下面找到代码片段。

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

实际输出:

Sample1
Sample2
Sample3
Sample5

预期输出:

Sample1
Sample2
Sample3

期望的原因是takeWhile应该一直执行到内部条件变为真为止。我还在平面图中添加了打印输出语句以进行调试。流仅返回两次,这与期望一致。

但是,这在链中没有平面图的情况下也可以正常工作。

String[] strArraySingle = {"Sample3", "Sample4", "Sample5"};
Arrays.stream(strArraySingle)
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

实际输出:

Sample3

此处,实际输出与预期输出匹配。

免责声明:这些摘录仅用于代码实践,不能用于任何有效的用例。

更新:
错误JDK-8193856:该修补程序将作为JDK
10的一部分提供。更改将更正whileOps Sink :: accept

@Override 
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

更改的实现:

@Override
public void accept(T t) {
    if (take && (take = predicate.test(t))) {
        downstream.accept(t);
    }
}

阅读 301

收藏
2020-12-03

共1个答案

一尘不染

这是JDK 9中的错误-从问题#8193856开始

takeWhile错误地假设上游操作支持并接受取消操作,不幸的是并非如此flatMap

说明

如果已订购流,takeWhile则应显示预期的行为。在您的代码中,情况并非完全如此,因为您使用forEach会放弃订购。如果您关心它(在本示例中这样做),则应该使用它forEachOrdered。有趣的是:那没有任何改变。🤔

因此,也许流不是一开始就订购的?)如果为从其创建的流创建一个临时变量,strArray并通过((StatefulOp) stream).isOrdered();在断点处执行表达式来检查其是否有序,那么您会发现它确实是有序的:

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Stream<String> stream = Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));

// breakpoint here
System.out.println(stream);

这意味着这很可能是实现错误。

融入守则

正如其他人所怀疑的那样,我现在也认为这 可能flatMap渴望有关。更确切地说,这两个问题可能具有相同的根本原因。

查看的来源WhileOps,我们可以看到以下方法:

@Override
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

@Override
public boolean cancellationRequested() {
    return !take || downstream.cancellationRequested();
}

此代码用于takeWhile检查给定的流元素t是否predicate满足:

  • 如果是这样,它将元素传递给downstream操作,在这种情况下为System.out::println
  • 如果不是,则将其设置take为false,因此在下次询问是否应取消管道(即完成)时,它将返回true

这涵盖了takeWhile操作。您需要知道的另一件事是forEachOrdered导致终端操作执行该方法ReferencePipeline::forEachWithCancel

@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    boolean cancelled;
    do { } while (
            !(cancelled = sink.cancellationRequested())
            && spliterator.tryAdvance(sink));
    return cancelled;
}

所有这些是:

  1. 检查管道是否被取消
  2. 如果没有,则将水槽前进一个元素
  3. 如果这是最后一个元素,则停止

看起来很有前途吧?

不带 flatMap

在“好的情况下”(没有flatMap第二个示例);forEachWithCancel直接在WhileOpas
上运行sink,您可以看到它是如何进行的:

  • ReferencePipeline::forEachWithCancel 进行循环:
    • WhileOps::accept 被赋予每个流元素
    • WhileOps::cancellationRequested 在每个元素之后被查询
  • 在某个时候"Sample4"使谓词失败,流被取消

好极了!

flatMap

在“最坏情况”(与flatMap您的第一个例子),forEachWithCancel运行在flatMap运行,虽然,这只是调用forEachRemainingArraySpliterator进行{"Sample3", "Sample4", "Sample5"},这将会:

if ((a = array).length >= (hi = fence) &&
    (i = index) >= 0 && i < (index = hi)) {
    do { action.accept((T)a[i]); } while (++i < hi);
}

忽略所有这些hifence东西(仅在将数组处理拆分为并行流时使用),这是一个简单的for循环,它将每个元素传递给takeWhile操作,但从不检查它是否被取消

2020-12-03