@Override public boolean tryAdvance(LongConsumer consumer) { Objects.requireNonNull(consumer); final long i = from; if (i < upTo) { from++; consumer.accept(i); return true; } else if (last > 0) { last = 0; consumer.accept(i); return true; } return false; }
@Override public final LongStream peek(LongConsumer action) { Objects.requireNonNull(action); return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, 0) { @Override Sink<Long> opWrapSink(int flags, Sink<Long> sink) { return new Sink.ChainedLong<Long>(sink) { @Override public void accept(long t) { action.accept(t); downstream.accept(t); } }; } }; }
/** * Returns an infinite sequential ordered {@code LongStream} produced by iterative * application of a function {@code f} to an initial element {@code seed}, * producing a {@code Stream} consisting of {@code seed}, {@code f(seed)}, * {@code f(f(seed))}, etc. * * <p>The first element (position {@code 0}) in the {@code LongStream} will * be the provided {@code seed}. For {@code n > 0}, the element at position * {@code n}, will be the result of applying the function {@code f} to the * element at position {@code n - 1}. * * <p>The action of applying {@code f} for one element * <a href="../concurrent/package-summary.html#MemoryVisibility"><i>happens-before</i></a> * the action of applying {@code f} for subsequent elements. For any given * element the action may be performed in whatever thread the library * chooses. * * @param seed the initial element * @param f a function to be applied to the previous element to produce * a new element * @return a new sequential {@code LongStream} */ public static LongStream iterate(final long seed, final LongUnaryOperator f) { Objects.requireNonNull(f); Spliterator.OfLong spliterator = new Spliterators.AbstractLongSpliterator(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.NONNULL) { long prev; boolean started; @Override public boolean tryAdvance(LongConsumer action) { Objects.requireNonNull(action); long t; if (started) t = f.applyAsLong(prev); else { t = seed; started = true; } action.accept(prev = t); return true; } }; return StreamSupport.longStream(spliterator, false); }
@Override public Array<T> forEachLong(LongConsumer consumer) { final int length = length(); if (isParallel() && length > 0) { final int processors = Runtime.getRuntime().availableProcessors(); final int splitThreshold = parallel ? Math.max(length() / processors, 10000) : Integer.MAX_VALUE; final ForEach action = new ForEach(0, length - 1, splitThreshold, consumer); ForkJoinPool.commonPool().invoke(action); } else { for (int i=0; i<length; ++i) { final long value = getLong(i); consumer.accept(value); } } return this; }
@Override protected final void computeUserRanking(final Set<RunFile> runs, final long userID, final LongSet candidateItems, final TreeMap<Double, LongSet> ranking) { candidateItems.forEach((LongConsumer) itemID -> { final MutableInt n = new MutableInt(0); final MutableDouble accum = new MutableDouble(0.0); runs.forEach(run -> { final double score = run.getScore(userID, itemID, Double.NaN); if (!Double.isNaN(score)) { n.increment(); accum.add(score); } }); if (n.get() > 0) { saveScore(ranking, itemID, computeScore(n.get(), accum.get())); } }); }
public void testLongForEachRemainingWithNull() { PrimitiveIterator.OfLong i = new PrimitiveIterator.OfLong() { @Override public long nextLong() { return 0; } @Override public boolean hasNext() { return false; } }; executeAndCatch(() -> i.forEachRemaining((LongConsumer) null)); executeAndCatch(() -> i.forEachRemaining((Consumer<Long>) null)); }
public void testLongForEachRemainingWithNull() { PrimitiveIterator.OfLong i = new PrimitiveIterator.OfLong() { @Override public long nextLong() { return 0; } @Override public boolean hasNext() { return false; } }; assertThrowsNPE(() -> i.forEachRemaining((LongConsumer) null)); assertThrowsNPE(() -> i.forEachRemaining((Consumer<Long>) null)); }
@Override public void forEachRemaining(LongConsumer consumer) { Objects.requireNonNull(consumer); long i = from; final long hUpTo = upTo; int hLast = last; from = upTo; last = 0; while (i < hUpTo) { consumer.accept(i++); } if (hLast > 0) { // Last element of closed range consumer.accept(i); } }
@Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class) public void testLongOps(String name, final TestData.OfLong data) { class RecordingConsumer extends AbstractRecordingConsumer<Long> implements LongConsumer { public void accept(long t) { list.add(t); } } final RecordingConsumer b = new RecordingConsumer(); withData(data) .stream(s -> s.peek(b)) .before(b::before) .after(b::after) .exercise(); }
@Override public boolean tryAdvance(LongConsumer action) { Objects.requireNonNull(action); action.accept(s.getAsLong()); return true; }
/** * Fuse the specified combination of runs and print the result. * * @param fold * the fold * @param runs * the runs of the current combination * @param allUsers * all the users in the current combination * @param outputFile * the path to the output file */ protected final void fuseAndPrint(final int fold, final Set<RunFile> runs, final LongSet allUsers, final Path outputFile) { try (final PrintWriter writer = new PrintWriter(Files.newBufferedWriter(outputFile))) { final LongSet all = HashLongSets.newUpdatableSet(maxRank); runs.forEach(run -> { all.addAll(run.getItems()); }); allUsers.forEach((LongConsumer) userID -> { final TreeMap<Double, LongSet> ranking = new TreeMap<Double, LongSet>( Collections.reverseOrder()); final LongSet candidateItems = HashLongSets.newUpdatableSet(maxRank); runs.forEach(run -> { candidateItems.addAll(run.getRanking(userID).keySet()); }); computeUserRanking(runs, userID, candidateItems, ranking); printRanking(userID, ranking, writer); }); } catch (final IOException e) { throw new RuntimeException(e); } }
@Override public boolean tryAdvance(LongConsumer action) { Objects.requireNonNull(action); if (count == -2) { action.accept(first); count = -1; return true; } else { return false; } }
@Override public void forEachRemaining(LongConsumer action) { Objects.requireNonNull(action); if (count == -2) { action.accept(first); count = -1; } }
/** * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply * by casting. */ private static LongConsumer adapt(Sink<Long> sink) { if (sink instanceof LongConsumer) { return (LongConsumer) sink; } else { if (Tripwire.ENABLED) Tripwire.trip(AbstractPipeline.class, "using LongStream.adapt(Sink<Long> s)"); return sink::accept; } }
@Override public void forEach(LongConsumer action) { if (!isParallel()) { adapt(sourceStageSpliterator()).forEachRemaining(action); } else { super.forEach(action); } }
@Override public void forEachOrdered(LongConsumer action) { if (!isParallel()) { adapt(sourceStageSpliterator()).forEachRemaining(action); } else { super.forEachOrdered(action); } }
@Override public void forEach(Consumer<? super Long> consumer) { if (consumer instanceof LongConsumer) { forEach((LongConsumer) consumer); } else { if (Tripwire.ENABLED) Tripwire.trip(getClass(), "{0} calling SpinedBuffer.OfLong.forEach(Consumer)"); spliterator().forEachRemaining(consumer); } }
@Override protected void arrayForEach(long[] array, int from, int to, LongConsumer consumer) { for (int i = from; i < to; i++) consumer.accept(array[i]); }
@Override public boolean tryAdvance(LongConsumer consumer) { Objects.requireNonNull(consumer); boolean hasNext = doAdvance(); if (hasNext) consumer.accept(buffer.get(nextToConsume)); return hasNext; }
@Override public void forEachRemaining(LongConsumer consumer) { if (buffer == null && !finished) { Objects.requireNonNull(consumer); init(); ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator); finished = true; } else { do { } while (tryAdvance(consumer)); } }
@Override public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) { Objects.requireNonNull(mapper); // We can do better than this, by polling cancellationRequested when stream is infinite return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { return new Sink.ChainedReference<P_OUT, Long>(sink) { LongConsumer downstreamAsLong = downstream::accept; @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { try (LongStream result = mapper.apply(u)) { // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it if (result != null) result.sequential().forEach(downstreamAsLong); } } }; } }; }
/** * {@inheritDoc} * * @param consumer A {@code Consumer} that is to be invoked with each * element in this {@code Node}. If this is an * {@code LongConsumer}, it is cast to {@code LongConsumer} so * the elements may be processed without boxing. */ @Override default void forEach(Consumer<? super Long> consumer) { if (consumer instanceof LongConsumer) { forEach((LongConsumer) consumer); } else { if (Tripwire.ENABLED) Tripwire.trip(getClass(), "{0} calling Node.OfLong.forEachRemaining(Consumer)"); spliterator().forEachRemaining(consumer); } }
public void testDiscardReturnBound() throws IOException, ClassNotFoundException { List<String> list = new ArrayList<>(); Consumer<String> c = (Consumer<String> & Serializable) list::add; assertSerial(c, cc -> { assertTrue(cc instanceof Consumer); }); AtomicLong a = new AtomicLong(); LongConsumer lc = (LongConsumer & Serializable) a::addAndGet; assertSerial(lc, plc -> { plc.accept(3); }); }
public void forEachRemaining(LongConsumer consumer) { if (consumer == null) throw new NullPointerException(); long i = index, f = fence; if (i < f) { index = f; Random r = rng; long o = origin, b = bound; do { consumer.accept(r.internalNextLong(o, b)); } while (++i < f); } }
public boolean tryAdvance(LongConsumer consumer) { if (consumer == null) throw new NullPointerException(); long i = index, f = fence; if (i < f) { consumer.accept(ThreadLocalRandom.current().internalNextLong(origin, bound)); index = i + 1; return true; } return false; }
public void forEachRemaining(LongConsumer consumer) { if (consumer == null) throw new NullPointerException(); long i = index, f = fence; if (i < f) { index = f; long o = origin, b = bound; ThreadLocalRandom rng = ThreadLocalRandom.current(); do { consumer.accept(rng.internalNextLong(o, b)); } while (++i < f); } }
/** * Creates an {@code PrimitiveIterator.OfLong} from a * {@code Spliterator.OfLong}. * * <p>Traversal of elements should be accomplished through the iterator. * The behaviour of traversal is undefined if the spliterator is operated * after the iterator is returned. * * @param spliterator The spliterator * @return An iterator * @throws NullPointerException if the given spliterator is {@code null} */ public static PrimitiveIterator.OfLong iterator(Spliterator.OfLong spliterator) { Objects.requireNonNull(spliterator); class Adapter implements PrimitiveIterator.OfLong, LongConsumer { boolean valueReady = false; long nextElement; @Override public void accept(long t) { valueReady = true; nextElement = t; } @Override public boolean hasNext() { if (!valueReady) spliterator.tryAdvance(this); return valueReady; } @Override public long nextLong() { if (!valueReady && !hasNext()) throw new NoSuchElementException(); else { valueReady = false; return nextElement; } } } return new Adapter(); }
@Override final boolean forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) { Spliterator.OfLong spl = adapt(spliterator); LongConsumer adaptedSink = adapt(sink); boolean cancelled; do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink)); return cancelled; }
public boolean tryAdvance(LongConsumer consumer) { if (consumer == null) throw new NullPointerException(); long i = index, f = fence; if (i < f) { consumer.accept(rng.internalNextLong(origin, bound)); index = i + 1; return true; } return false; }