@Override
public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                               Spliterator<P_IN> spliterator,
                                               IntFunction<Integer[]> generator) {
    if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
        return helper.evaluate(spliterator, false, generator);
    } else {
        Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
        int[] content = n.asPrimitiveArray();
        Arrays.parallelSort(content);
        return Nodes.node(content);
    }
}
@Override
public Spliterator<E> trySplit() {
    Node<E> p;
    int s = getEst();
    if (s > 1 && (p = current) != null) {
        int n = batch + BATCH_UNIT;
        if (n > s) {
            n = s;
        }
        if (n > MAX_BATCH) {
            n = MAX_BATCH;
        }
        Object[] a = new Object[n];
        int j = 0;
        do {
            a[j++] = p.item;
        } while ((p = p.next) != null && j < n);
        current = p;
        batch = j;
        est = s - j;
        return Spliterators.spliterator(a, 0, j, Spliterator.ORDERED);
    }
    return null;
}
static <U> void mixedTraverseAndSplit(Consumer<U> b, Spliterator<U> splTop) {
    Spliterator<U> spl1, spl2, spl3;
    splTop.tryAdvance(b);
    spl2 = splTop.trySplit();
    if (spl2 != null) {
        spl2.tryAdvance(b);
        spl1 = spl2.trySplit();
        if (spl1 != null) {
            spl1.tryAdvance(b);
            spl1.forEachRemaining(b);
        }
        spl2.tryAdvance(b);
        spl2.forEachRemaining(b);
    }
    splTop.tryAdvance(b);
    spl3 = splTop.trySplit();
    if (spl3 != null) {
        spl3.tryAdvance(b);
        spl3.forEachRemaining(b);
    }
    splTop.tryAdvance(b);
    splTop.forEachRemaining(b);
}
@Test
public void shouldSplitPartOfTask() {
    List<String> lines = asList("one", "two");
    TextSpliterator ts = new TextSpliterator(lines);

    Spliterator<String> fork = ts.trySplit();
    assertNotNull(fork);

    StringBuilder sb = new StringBuilder();
    assertTrue(ts.tryAdvance(sb::append));
    assertEquals("one", sb.toString());
    assertFalse(ts.tryAdvance(sb::append));
    assertEquals("one", sb.toString());

    sb = new StringBuilder();
    assertTrue(fork.tryAdvance(sb::append));
    assertEquals("two", sb.toString());
    assertFalse(fork.tryAdvance(sb::append));
    assertEquals("two", sb.toString());
}
private static <T, S extends Spliterator<T>> void testSplitOnce(
        Collection<T> exp,
        Supplier<S> supplier,
        UnaryOperator<Consumer<T>> boxingAdapter) {
    S spliterator = supplier.get();
    long sizeIfKnown = spliterator.getExactSizeIfKnown();
    boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED);

    ArrayList<T> fromSplit = new ArrayList<>();
    Spliterator<T> s1 = supplier.get();
    Spliterator<T> s2 = s1.trySplit();
    long s1Size = s1.getExactSizeIfKnown();
    long s2Size = (s2 != null) ? s2.getExactSizeIfKnown() : 0;
    Consumer<T> addToFromSplit = boxingAdapter.apply(fromSplit::add);
    if (s2 != null)
        s2.forEachRemaining(addToFromSplit);
    s1.forEachRemaining(addToFromSplit);

    if (sizeIfKnown >= 0) {
        assertEquals(sizeIfKnown, fromSplit.size());
        if (s1Size >= 0 && s2Size >= 0)
            assertEquals(sizeIfKnown, s1Size + s2Size);
    }
    assertContents(fromSplit, exp, isOrdered);
}
@Override
public Stream<URL> resources(String name) {
    Objects.requireNonNull(name);
    // ordering not specified
    int characteristics = (Spliterator.NONNULL | Spliterator.IMMUTABLE |
                           Spliterator.SIZED | Spliterator.SUBSIZED);
    Supplier<Spliterator<URL>> supplier = () -> {
        try {
            List<URL> urls = findResourcesAsList(name);
            return Spliterators.spliterator(urls, characteristics);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    };
    Stream<URL> s1 = StreamSupport.stream(supplier, characteristics, false);
    Stream<URL> s2 = parent.resources(name);
    return Stream.concat(s1, s2);
}
@Override
public Set<Entry<K, V>> entrySet() {
    return new EntrySet<K, V>() {
        @Override
        Map<K, V> map() {
            return IteratorBasedAbstractMap.this;
        }

        @Override
        public Iterator<Entry<K, V>> iterator() {
            return entryIterator();
        }

        @Override
        public Spliterator<Entry<K, V>> spliterator() {
            return entrySpliterator();
        }

        @Override
        public void forEach(Consumer<? super Entry<K, V>> action) {
            forEachEntry(action);
        }
    };
}
ForEachTask(PipelineHelper<T> helper, Spliterator<S> spliterator, Sink<S> sink) {
    super(null);
    this.sink = sink;
    this.helper = helper;
    this.spliterator = spliterator;
    this.targetSize = 0L;
}
SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
          PipelineHelper<P_OUT> helper,
          Spliterator<P_IN> spliterator,
          IntFunction<P_OUT[]> generator,
          long offset, long size) {
    super(helper, spliterator);
    this.op = op;
    this.generator = generator;
    this.targetOffset = offset;
    this.targetSize = size;
}
@Test(dataProvider = "HashableIntSpliteratorWithNull") void testNullPointerExceptionWithNull(String description, Collection<HashableInteger> exp, Supplier<Spliterator<HashableInteger>> s) { assertThrowsNPE(() -> s.get().forEachRemaining(null)); assertThrowsNPE(() -> s.get().tryAdvance(null)); }
public static OfDouble ofNode(String name, Node.OfDouble node) {
    int characteristics = Spliterator.SIZED | Spliterator.ORDERED;
    return new AbstractTestData.DoubleTestData<>(name, node,
            n -> StreamSupport.doubleStream(n::spliterator, characteristics, false),
            n -> StreamSupport.doubleStream(n::spliterator, characteristics, true),
            Node.OfDouble::spliterator,
            n -> (int) n.count());
}
private static void assertSpliterator(Spliterator<?> s, int rootCharacteristics) {
    if ((rootCharacteristics & Spliterator.SUBSIZED) != 0) {
        assertTrue(s.hasCharacteristics(Spliterator.SUBSIZED),
                   "Child split is not SUBSIZED when root split is SUBSIZED");
    }
    assertSpliterator(s);
}
@Override
public Spliterator<P_OUT> trySplit() {
    if (isParallel && !finished) {
        init();

        Spliterator<P_IN> split = spliterator.trySplit();
        return (split == null) ? null : wrap(split);
    }
    else
        return null;
}
/**
 * Decides whether or not to split a task further or compute it
 * directly. If computing directly, calls {@code doLeaf} and passes
 * the result to {@code setRawResult}. Otherwise splits off
 * subtasks, forking one and continuing as the other.
 *
 * <p> The method is structured to conserve resources across a
 * range of uses. The loop continues with one of the child tasks
 * when split, to avoid deep recursion. To cope with spliterators
 * that may be systematically biased toward left-heavy or
 * right-heavy splits, we alternate which child is forked versus
 * continued in the loop.
 */
@Override
public void compute() {
    Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked")
    K task = (K) this;
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        task.leftChild = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    task.setLocalResult(task.doLeaf());
    task.tryComplete();
}
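/*
 * A minimal, self-contained sketch (not the JDK class above) of the same idea:
 * split with trySplit in a loop, fork one half, keep working on the other, and
 * alternate which side is forked to tolerate biased splits. SumTask and
 * THRESHOLD are invented names used only for illustration.
 */
import java.util.ArrayDeque;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

class SumTask extends RecursiveTask<Long> {
    private static final long THRESHOLD = 1_000;   // assumed leaf size, illustrative
    private final Spliterator.OfLong spliterator;

    SumTask(Spliterator.OfLong spliterator) {
        this.spliterator = spliterator;
    }

    @Override
    protected Long compute() {
        Spliterator.OfLong rs = spliterator, ls;
        boolean forkRight = false;
        ArrayDeque<SumTask> forked = new ArrayDeque<>();
        while (rs.estimateSize() > THRESHOLD && (ls = rs.trySplit()) != null) {
            // Alternate which half is forked and which half this thread keeps,
            // mirroring the forkRight flag in the compute() method above.
            if (forkRight) {
                forked.push(new SumTask(rs));   // fork the right half
                rs = ls;                        // continue with the left half
            } else {
                forked.push(new SumTask(ls));   // fork the left half
            }
            forked.peek().fork();
            forkRight = !forkRight;
        }
        long[] acc = {0};
        rs.forEachRemaining((long v) -> acc[0] += v);   // leaf work
        long sum = acc[0];
        while (!forked.isEmpty()) {
            sum += forked.pop().join();
        }
        return sum;
    }

    public static void main(String[] args) {
        Spliterator.OfLong s = LongStream.rangeClosed(1, 100_000).spliterator();
        System.out.println(ForkJoinPool.commonPool().invoke(new SumTask(s))); // 5000050000
    }
}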
/**
 * Returns a {@code Stream}, the elements of which are lines read from
 * this {@code BufferedReader}. The {@link Stream} is lazily populated,
 * i.e., reads only occur during the
 * <a href="../util/stream/package-summary.html#StreamOps">terminal
 * stream operation</a>.
 *
 * <p> The reader must not be operated on during the execution of the
 * terminal stream operation. Otherwise, the result of the terminal stream
 * operation is undefined.
 *
 * <p> After execution of the terminal stream operation there are no
 * guarantees that the reader will be at a specific position from which to
 * read the next character or line.
 *
 * <p> If an {@link IOException} is thrown when accessing the underlying
 * {@code BufferedReader}, it is wrapped in an {@link
 * UncheckedIOException} which will be thrown from the {@code Stream}
 * method that caused the read to take place. This method will return a
 * Stream if invoked on a BufferedReader that is closed. Any operation on
 * that stream that requires reading from the BufferedReader after it is
 * closed will cause an UncheckedIOException to be thrown.
 *
 * @return a {@code Stream<String>} providing the lines of text
 *         described by this {@code BufferedReader}
 *
 * @since 1.8
 */
public Stream<String> lines() {
    Iterator<String> iter = new Iterator<>() {
        String nextLine = null;

        @Override
        public boolean hasNext() {
            if (nextLine != null) {
                return true;
            } else {
                try {
                    nextLine = readLine();
                    return (nextLine != null);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }

        @Override
        public String next() {
            if (nextLine != null || hasNext()) {
                String line = nextLine;
                nextLine = null;
                return line;
            } else {
                throw new NoSuchElementException();
            }
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            iter, Spliterator.ORDERED | Spliterator.NONNULL), false);
}
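/*
 * Usage sketch for the lines() method above: the stream only pulls from the
 * reader once the terminal operation runs, and any IOException raised during
 * that traversal surfaces as an UncheckedIOException. The file name "data.txt"
 * is an illustrative placeholder.
 */
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class LinesExample {
    public static void main(String[] args) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(Path.of("data.txt"))) {
            long nonBlank = reader.lines()              // lazy: nothing is read yet
                                  .filter(line -> !line.isBlank())
                                  .count();             // terminal op drives the reads
            System.out.println(nonBlank);
        } catch (UncheckedIOException e) {
            // An IOException raised while the stream was reading is wrapped here.
            throw e.getCause();
        }
    }
}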
@Override
Spliterator<Entry<K, V>> entrySpliterator() {
    return CollectSpliterators.flatMap(
        map.entrySet().spliterator(),
        keyToValueCollectionEntry -> {
            K key = keyToValueCollectionEntry.getKey();
            Collection<V> valueCollection = keyToValueCollectionEntry.getValue();
            return CollectSpliterators.map(
                valueCollection.spliterator(), (V value) -> Maps.immutableEntry(key, value));
        },
        Spliterator.SIZED,
        size());
}
/**
 * Constructor for the head of a stream pipeline.
 *
 * @param source {@code Spliterator} describing the stream source
 * @param sourceFlags the source flags for the stream source, described in
 *        {@link StreamOpFlag}
 * @param parallel {@code true} if the pipeline is parallel
 */
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}
@Override
public int characteristics() {
    if (proxyEstimateSize)
        return sp.characteristics();
    else
        return sp.characteristics() & ~(Spliterator.SUBSIZED | Spliterator.SIZED);
}
/**
 * Overrides AbstractTask version to include checks for early
 * exits while splitting or computing.
 */
@Override
public void compute() {
    Spliterator<P_IN> rs = spliterator, ls;
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked")
    K task = (K) this;
    AtomicReference<R> sr = sharedResult;
    R result;
    while ((result = sr.get()) == null) {
        if (task.taskCanceled()) {
            result = task.getEmptyResult();
            break;
        }
        if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {
            result = task.doLeaf();
            break;
        }
        K leftChild, rightChild, taskToFork;
        task.leftChild = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    task.setLocalResult(result);
    task.tryComplete();
}
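/*
 * Context for the early-exit checks above (illustrative, separate from the
 * class): short-circuiting terminal operations such as findFirst() can finish
 * without processing every element; a shared result like sharedResult above is
 * what lets sibling subtasks notice that and stop splitting or computing early.
 */
import java.util.Optional;
import java.util.stream.IntStream;

class ShortCircuitExample {
    public static void main(String[] args) {
        Optional<Integer> first = IntStream.range(0, 1_000_000)
                .parallel()
                .filter(i -> i % 99_991 == 0 && i > 0)   // sparse matches
                .boxed()
                .findFirst();                            // early exit once a match is found
        System.out.println(first.orElse(-1));            // 99991
    }
}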
@CollectionFeature.Require(SUPPORTS_ADD)
public void testSpliteratorNotImmutable_CollectionAllowsAdd() {
    // If add is supported, verify that IMMUTABLE is not reported.
    synchronized (collection) { // for Collections.synchronized
        assertFalse(collection.spliterator().hasCharacteristics(Spliterator.IMMUTABLE));
    }
}
@Override
final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
                                        Spliterator<P_IN> spliterator,
                                        boolean flattenTree,
                                        IntFunction<P_OUT[]> generator) {
    return Nodes.collect(helper, spliterator, flattenTree, generator);
}
/**
 * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}.
 *
 * @implNote
 * The implementation attempts to cast to a Spliterator.OfInt, and throws an
 * exception if this cast is not possible.
 */
private static Spliterator.OfInt adapt(Spliterator<Integer> s) {
    if (s instanceof Spliterator.OfInt) {
        return (Spliterator.OfInt) s;
    } else {
        if (Tripwire.ENABLED)
            Tripwire.trip(AbstractPipeline.class,
                          "using IntStream.adapt(Spliterator<Integer> s)");
        throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)");
    }
}
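/*
 * Why the cast above can succeed: Spliterator.OfInt is a subtype of
 * Spliterator<Integer>, so a primitive spliterator may arrive behind a boxed
 * reference. A small standalone check (variable and class names are illustrative):
 */
import java.util.Spliterator;
import java.util.Spliterators;

class AdaptCheck {
    public static void main(String[] args) {
        // Spliterators.spliterator(int[], characteristics) returns a Spliterator.OfInt,
        // yet it can be held as a Spliterator<Integer> without boxing the elements.
        Spliterator<Integer> boxedView = Spliterators.spliterator(new int[] {1, 2, 3}, 0);
        if (boxedView instanceof Spliterator.OfInt) {
            Spliterator.OfInt ints = (Spliterator.OfInt) boxedView;
            ints.forEachRemaining((int i) -> System.out.println(i)); // traversed without boxing
        }
    }
}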
protected ForEachOrderedTask(PipelineHelper<T> helper,
                             Spliterator<S> spliterator,
                             Sink<T> action) {
    super(null);
    this.helper = helper;
    this.spliterator = spliterator;
    this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
    // Size map to avoid concurrent re-sizes
    this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.LEAF_TARGET << 1));
    this.action = action;
    this.leftPredecessor = null;
}
SizedCollectorTask(Spliterator<P_IN> spliterator,
                   PipelineHelper<P_OUT> helper,
                   int arrayLength) {
    assert spliterator.hasCharacteristics(Spliterator.SUBSIZED);
    this.spliterator = spliterator;
    this.helper = helper;
    this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
    this.offset = 0;
    this.length = arrayLength;
}
@Override
public int characteristics() {
    if (beforeSplit) {
        // Concatenation loses DISTINCT and SORTED characteristics
        return aSpliterator.characteristics() & bSpliterator.characteristics()
               & ~(Spliterator.DISTINCT | Spliterator.SORTED
                   | (unsized ? Spliterator.SIZED | Spliterator.SUBSIZED : 0));
    }
    else {
        return bSpliterator.characteristics();
    }
}
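/*
 * Why concatenation clears DISTINCT and SORTED: even when both inputs report
 * those characteristics, the combined sequence generally does not have them.
 * A quick check using Stream.concat over two TreeSet-backed streams
 * (illustrative values only):
 */
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;
import java.util.stream.Stream;

class ConcatCharacteristics {
    public static void main(String[] args) {
        // A TreeSet spliterator reports SORTED and DISTINCT on its own.
        Spliterator<Integer> single = new TreeSet<>(List.of(1, 3, 5)).spliterator();
        System.out.println(single.hasCharacteristics(Spliterator.SORTED));   // true

        // The concatenated sequence 1,3,5,2,3,4 is neither sorted nor distinct,
        // so those characteristics are dropped by the concat spliterator.
        Spliterator<Integer> concat = Stream.concat(
                new TreeSet<>(List.of(1, 3, 5)).stream(),
                new TreeSet<>(List.of(2, 3, 4)).stream()).spliterator();
        System.out.println(concat.hasCharacteristics(Spliterator.SORTED));   // false
        System.out.println(concat.hasCharacteristics(Spliterator.DISTINCT)); // false
    }
}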
@Override
public Spliterator.OfLong trySplit() {
    long size = estimateSize();
    return size <= 1
           ? null
           // Left split always has a half-open range
           : new RangeLongSpliterator(from, from = from + splitPoint(size), 0);
}
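/*
 * Illustration of the split above (values are examples): the returned prefix
 * covers the first part of the range and the original spliterator keeps the
 * rest, so for a sized range the two estimates add up to the original count.
 */
import java.util.Spliterator;
import java.util.stream.LongStream;

class RangeSplitExample {
    public static void main(String[] args) {
        Spliterator.OfLong right = LongStream.range(0, 100).spliterator();
        long before = right.estimateSize();          // 100
        Spliterator.OfLong left = right.trySplit();  // prefix half
        System.out.println(left.estimateSize());     // roughly half, e.g. 50
        System.out.println(left.estimateSize() + right.estimateSize() == before); // true
    }
}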
@Override public Spliterator spliterator() { return stream.spliterator(); }
public Spliterator<K> spliterator() {
    return (m instanceof ConcurrentSkipListMap)
        ? ((ConcurrentSkipListMap<K,V>) m).keySpliterator()
        : ((SubMap<K,V>) m).new SubMapKeyIterator();
}
@Override public Spliterator.OfInt spliterator() { assert !building : "during building"; return super.spliterator(); }
@Override public Spliterator<Entry<K, V>> spliterator() { return Spliterators.spliterator(entries, ImmutableSet.SPLITERATOR_CHARACTERISTICS); }
OfLong(Spliterator.OfLong aSpliterator, Spliterator.OfLong bSpliterator) { super(aSpliterator, bSpliterator); }
@Test(dataProvider = "Spliterator.OfLong") public void testLongSplitAfterFullTraversal(String description, Collection<Long> exp, Supplier<Spliterator.OfLong> s) { testSplitAfterFullTraversal(s, longBoxingConsumer()); }
Taking(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) { super(s, parent); }
OfRef(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) { super(s, noSplitting); this.p = p; }
public Spliterator<Map.Entry<K, V>> spliterator() { return null; }
@Override
protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s,
                                            long sliceOrigin, long sliceFence,
                                            long origin, long fence) {
    return new SliceSpliterator.OfInt(s, sliceOrigin, sliceFence, origin, fence);
}
public static <T> Stream<T> of(Iterator<T> iterator) {
    // Use the unknown-size factory: passing an explicit size of 0 would make the
    // spliterator report SIZED with zero elements even for a non-empty iterator.
    return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
}
@Override public Spliterator.OfInt trySplit() { return (Spliterator.OfInt) super.trySplit(); }
public Spliterator<K> trySplit() { return null; }