@Test
public void testDropWhile() {
    assertArrayEquals(new long[] { 5, 6, 7, 8, 9, 10, 11, 12, 13, 14 },
        LongStreamEx.range(100).dropWhile(i -> i % 10 < 5).limit(10).toArray());
    assertEquals(100, LongStreamEx.range(100).dropWhile(i -> i % 10 < 0).count());
    assertEquals(0, LongStreamEx.range(100).dropWhile(i -> i % 10 < 10).count());
    assertEquals(OptionalLong.of(0), LongStreamEx.range(100).dropWhile(i -> i % 10 < 0).findFirst());
    assertEquals(OptionalLong.empty(), LongStreamEx.range(100).dropWhile(i -> i % 10 < 10).findFirst());

    java.util.Spliterator.OfLong spltr = LongStreamEx.range(100).dropWhile(i -> i % 10 < 1).spliterator();
    assertTrue(spltr.tryAdvance((long x) -> assertEquals(1, x)));
    Builder builder = LongStream.builder();
    spltr.forEachRemaining(builder);
    assertArrayEquals(LongStreamEx.range(2, 100).toArray(), builder.build().toArray());
}
@Override
public void run(String... args) throws Exception {
    getCloudEnvProperties();
    loadPoSCounties();
    if (!skipSetup) {
        runSetup();
    }
    if (numberOfTransactions < 0) {
        numberOfTransactions = Integer.MAX_VALUE;
    }
    logger.info(">>>>> RUNNING SIMULATION");
    logger.info("--------------------------------------");
    logger.info(">>> Geode rest endpoint: " + geodeURL);
    logger.info("--------------------------------------");
    logger.info(">>> Posting " + numberOfTransactions + " transactions ...");

    int numberOfDevices = counties.size();
    OfLong deviceIDs = new Random().longs(0, numberOfDevices).iterator();
    OfLong accountIDs = new Random().longs(0, numberOfAccounts).iterator();
    Random random = new Random();
    long mean = 100; // mean transaction value
    long variance = 40; // variance of the transaction value
    DecimalFormat df = new DecimalFormat();
    df.setMaximumFractionDigits(2);

    for (int i = 0; i < numberOfTransactions; i++) {
        Transaction t = new Transaction();
        t.setId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));
        long accountId = accountIDs.next();
        t.setAccountId(accountId);
        // 90% of the time, transact this account from its single "home location"
        if (Math.random() < 0.9) {
            t.setDeviceId(getHomePoS(accountId));
        } else {
            t.setDeviceId(deviceIDs.next());
        }
        t.setTimestamp(System.currentTimeMillis());
        double value = Double.parseDouble(df.format(Math.abs(mean + random.nextGaussian() * variance)));
        t.setValue(value);
        try {
            restTemplate.postForObject(geodeURL + RegionName.Transaction, t, Transaction.class);
        } catch (Exception e) {
            logger.warning("Failed to connect to Geode using URL " + geodeURL);
            e.printStackTrace();
        }
        Thread.sleep(delay);
    }
    logger.info("done");
}
@Override
public OfLong iterator() {
    return this.delegate.iterator();
}
@Override
public java.util.Spliterator.OfLong spliterator() {
    return this.delegate.spliterator();
}
LongStreamEx(Spliterator.OfLong spliterator, StreamContext context) {
    super(spliterator, context);
}
final LongStreamEx delegate(Spliterator.OfLong spliterator) {
    return new LongStreamEx(spliterator, context);
}
@Override
public OfLong iterator() {
    return Spliterators.iterator(spliterator());
}
@Test
public void testBasics() {
    assertFalse(LongStreamEx.of(1).isParallel());
    assertTrue(LongStreamEx.of(1).parallel().isParallel());
    assertFalse(LongStreamEx.of(1).parallel().sequential().isParallel());

    AtomicInteger i = new AtomicInteger();
    try (LongStreamEx s = LongStreamEx.of(1).onClose(i::incrementAndGet)) {
        assertEquals(1, s.count());
    }
    assertEquals(1, i.get());

    assertEquals(6, LongStreamEx.range(0, 4).sum());
    assertEquals(3, LongStreamEx.range(0, 4).max().getAsLong());
    assertEquals(0, LongStreamEx.range(0, 4).min().getAsLong());
    assertEquals(1.5, LongStreamEx.range(0, 4).average().getAsDouble(), 0.000001);
    assertEquals(4, LongStreamEx.range(0, 4).summaryStatistics().getCount());
    assertArrayEquals(new long[] { 1, 2, 3 }, LongStreamEx.range(0, 5).skip(1).limit(3).toArray());
    assertArrayEquals(new long[] { 1, 2, 3 }, LongStreamEx.of(3, 1, 2).sorted().toArray());
    assertArrayEquals(new long[] { 1, 2, 3 }, LongStreamEx.of(1, 2, 1, 3, 2).distinct().toArray());
    assertArrayEquals(new int[] { 2, 4, 6 }, LongStreamEx.range(1, 4).mapToInt(x -> (int) x * 2).toArray());
    assertArrayEquals(new long[] { 2, 4, 6 }, LongStreamEx.range(1, 4).map(x -> x * 2).toArray());
    assertArrayEquals(new double[] { 2, 4, 6 }, LongStreamEx.range(1, 4).mapToDouble(x -> x * 2).toArray(), 0.0);
    assertArrayEquals(new long[] { 1, 3 }, LongStreamEx.range(0, 5).filter(x -> x % 2 == 1).toArray());
    assertEquals(6, LongStreamEx.of(1, 2, 3).reduce(Long::sum).getAsLong());
    assertEquals(Long.MAX_VALUE, LongStreamEx.rangeClosed(1, Long.MAX_VALUE).spliterator().getExactSizeIfKnown());
    assertTrue(LongStreamEx.of(1, 2, 3).spliterator().hasCharacteristics(Spliterator.ORDERED));
    assertFalse(LongStreamEx.of(1, 2, 3).unordered().spliterator().hasCharacteristics(Spliterator.ORDERED));

    OfLong iterator = LongStreamEx.of(1, 2, 3).iterator();
    assertEquals(1L, iterator.nextLong());
    assertEquals(2L, iterator.nextLong());
    assertEquals(3L, iterator.nextLong());
    assertFalse(iterator.hasNext());

    AtomicInteger idx = new AtomicInteger();
    long[] result = new long[500];
    LongStreamEx.range(1000).atLeast(500).parallel().forEachOrdered(val -> result[idx.getAndIncrement()] = val);
    assertArrayEquals(LongStreamEx.range(500, 1000).toArray(), result);

    assertTrue(LongStreamEx.empty().noneMatch(x -> true));
    assertFalse(LongStreamEx.of(1).noneMatch(x -> true));
    assertTrue(LongStreamEx.of(1).noneMatch(x -> false));
}
@Override
public OfLong iterator() {
    return ThrowingBridge.of(getDelegate().iterator(), getExceptionClass());
}
@Override
public Spliterator.OfLong spliterator() {
    return ThrowingBridge.of(getDelegate().spliterator(), getExceptionClass());
}
/**
 * Produces an array containing cumulative results of applying the
 * accumulation function going left to right.
 *
 * <p>
 * This is a terminal operation.
 *
 * <p>
 * For parallel streams it's not guaranteed that the accumulator will always
 * be executed in the same thread.
 *
 * <p>
 * This method cannot take full advantage of parallel streams as it must
 * process elements strictly left to right.
 *
 * @param accumulator a
 *        <a href="package-summary.html#NonInterference">non-interfering</a>,
 *        <a href="package-summary.html#Statelessness">stateless</a>
 *        function for incorporating an additional element into a result
 * @return the array where the first element is the first element of this
 *         stream and every successive element is the result of applying the
 *         accumulator function to the previous array element and the
 *         corresponding stream element. The resulting array has the same
 *         length as this stream.
 * @see #foldLeft(LongBinaryOperator)
 * @since 0.5.1
 */
public long[] scanLeft(LongBinaryOperator accumulator) {
    Spliterator.OfLong spliterator = spliterator();
    long size = spliterator.getExactSizeIfKnown();
    LongBuffer buf = new LongBuffer(size >= 0 && size <= Integer.MAX_VALUE ? (int) size : INITIAL_SIZE);
    delegate(spliterator).forEachOrdered(
        i -> buf.add(buf.size == 0 ? i : accumulator.applyAsLong(buf.data[buf.size - 1], i)));
    return buf.toArray();
}
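// Illustrative usage sketch (not part of the original source): scanLeft keeps
// the first element as-is and then accumulates running results left to right.
// Assumes StreamEx's LongStreamEx is on the classpath.
@Test
public void scanLeftExample() {
    assertArrayEquals(new long[] { 1, 3, 6, 10 },
        LongStreamEx.of(1, 2, 3, 4).scanLeft(Long::sum));
    // An empty stream yields an empty array.
    assertArrayEquals(new long[0], LongStreamEx.empty().scanLeft(Long::sum));
}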
/**
 * Creates a <strong>parallel</strong> {@code long} stream from the given Spliterator. This operation is similar to
 * calling {@code StreamSupport.longStream(spliterator, true)} with the difference that a parallel
 * <a href="https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps">terminal
 * operation</a> will be executed in the given {@link ForkJoinPool}.
 *
 * @param spliterator A {@code Spliterator.OfLong} describing the stream elements. Must not be {@code null}.
 * @param workerPool Thread pool for parallel execution of a terminal operation. Must not be {@code null}.
 * @return A parallel {@code long} stream that executes a terminal operation in the given {@link ForkJoinPool}.
 * @see StreamSupport#longStream(Spliterator.OfLong, boolean)
 */
public static LongStream parallelStream(Spliterator.OfLong spliterator, ForkJoinPool workerPool) {
    requireNonNull(spliterator, "Spliterator must not be null");
    return new ParallelLongStreamSupport(longStream(spliterator, true), workerPool);
}
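// Illustrative usage sketch (not part of the original source): the terminal
// operation runs in a dedicated ForkJoinPool instead of the common pool.
// The enclosing class name ParallelLongStreamSupport is assumed here.
static long sumInDedicatedPool(long[] values) {
    ForkJoinPool workerPool = new ForkJoinPool(4);
    try {
        return ParallelLongStreamSupport
            .parallelStream(Arrays.spliterator(values), workerPool)
            .sum();
    } finally {
        workerPool.shutdown();
    }
}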
/**
 * Creates a <strong>parallel</strong> {@code long} stream from the given Spliterator supplier. This operation is
 * similar to calling {@code StreamSupport.longStream(supplier, characteristics, true)} with the difference that a
 * parallel
 * <a href="https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps">terminal
 * operation</a> will be executed in the given {@link ForkJoinPool}.
 *
 * @param supplier A {@code Supplier} of a {@code Spliterator.OfLong}. Must not be {@code null}.
 * @param characteristics Spliterator characteristics of the supplied {@code Spliterator}. The characteristics must
 *        be equal to {@code supplier.get().characteristics()}, otherwise undefined behavior may occur when the
 *        terminal operation commences.
 * @param workerPool Thread pool for parallel execution of a terminal operation. Must not be {@code null}.
 * @return A parallel {@code long} stream that executes a terminal operation in the given {@link ForkJoinPool}.
 * @see StreamSupport#longStream(Supplier, int, boolean)
 */
public static LongStream parallelStream(Supplier<? extends Spliterator.OfLong> supplier, int characteristics,
        ForkJoinPool workerPool) {
    requireNonNull(supplier, "Supplier must not be null");
    return new ParallelLongStreamSupport(longStream(supplier, characteristics, true), workerPool);
}
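// Illustrative usage sketch (not part of the original source): the supplied
// characteristics must match those of the spliterator the supplier produces,
// so they are taken from a sample spliterator here. ParallelLongStreamSupport
// is an assumed class name.
static long maxInDedicatedPool(long[] values, ForkJoinPool workerPool) {
    int characteristics = Arrays.spliterator(values).characteristics();
    return ParallelLongStreamSupport
        .parallelStream(() -> Arrays.spliterator(values), characteristics, workerPool)
        .max()
        .orElse(Long.MIN_VALUE);
}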
/**
 * Returns a stream consisting of the remaining elements of this stream
 * after discarding the first {@code n} elements of the stream. If this
 * stream contains fewer than {@code n} elements then an empty stream will
 * be returned.
 *
 * <p>
 * This is a stateful quasi-intermediate operation. Unlike
 * {@link #skip(long)} it skips the first elements even if the stream is
 * unordered. The main purpose of this method is to work around the problem
 * of skipping the first elements from a non-sized source with further
 * parallel processing and an unordered terminal operation (such as
 * {@link #forEach(LongConsumer)}). This problem was fixed in OracleJDK
 * 8u60.
 *
 * <p>
 * It also behaves much better with infinite streams processed in parallel.
 * For example,
 * {@code LongStreamEx.iterate(0L, i->i+1).skip(1).limit(100).parallel().toArray()}
 * will likely fail with an {@code OutOfMemoryError}, but will work nicely
 * if {@code skip} is replaced with {@code skipOrdered}.
 *
 * <p>
 * For sequential streams this method behaves exactly like
 * {@link #skip(long)}.
 *
 * @param n the number of leading elements to skip
 * @return the new stream
 * @throws IllegalArgumentException if {@code n} is negative
 * @see #skip(long)
 * @since 0.3.2
 */
public LongStreamEx skipOrdered(long n) {
    Spliterator.OfLong spliterator = (isParallel() ? StreamSupport.longStream(spliterator(), false) : stream())
            .skip(n).spliterator();
    return delegate(spliterator);
}
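// Illustrative usage sketch (not part of the original source), mirroring the
// Javadoc example above: skipOrdered drops leading elements of an infinite
// source before parallel processing, avoiding the buffering that plain skip()
// may trigger.
@Test
public void skipOrderedExample() {
    long[] first100 = LongStreamEx.iterate(0L, i -> i + 1)
        .skipOrdered(1)
        .limit(100)
        .parallel()
        .toArray();
    assertArrayEquals(LongStreamEx.range(1, 101).toArray(), first100);
}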
/**
 * Returns a stream containing cumulative results of applying the
 * accumulation function going left to right.
 *
 * <p>
 * This is a stateful
 * <a href="package-summary.html#StreamOps">quasi-intermediate</a>
 * operation.
 *
 * <p>
 * This operation resembles {@link #scanLeft(LongBinaryOperator)}, but
 * unlike {@code scanLeft} this operation is intermediate and the
 * accumulation function must be associative.
 *
 * <p>
 * This method cannot take full advantage of parallel streams as it must
 * process elements strictly left to right. Using an unordered source or
 * removing the ordering constraint with {@link #unordered()} may improve
 * the parallel processing speed.
 *
 * @param op an <a href="package-summary.html#Associativity">associative</a>,
 *        <a href="package-summary.html#NonInterference">non-interfering</a>,
 *        <a href="package-summary.html#Statelessness">stateless</a>
 *        function for computing the next element based on the previous one
 * @return the new stream
 * @see #scanLeft(LongBinaryOperator)
 * @since 0.6.1
 */
public LongStreamEx prefix(LongBinaryOperator op) {
    return delegate(new PrefixOps.OfLong(spliterator(), op));
}
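// Illustrative usage sketch (not part of the original source): prefix is the
// intermediate counterpart of scanLeft, so further stream operations can be
// chained after the running totals.
@Test
public void prefixExample() {
    assertArrayEquals(new long[] { 1, 3, 6, 10 },
        LongStreamEx.of(1, 2, 3, 4).prefix(Long::sum).toArray());
    // Running totals can be filtered like any other stream elements.
    assertArrayEquals(new long[] { 6, 10 },
        LongStreamEx.of(1, 2, 3, 4).prefix(Long::sum).atLeast(5).toArray());
}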
/**
 * Returns a sequential {@code LongStreamEx} containing a single element.
 *
 * @param element the single element
 * @return a singleton sequential stream
 */
public static LongStreamEx of(long element) {
    return of(new ConstSpliterator.OfLong(element, 1, true));
}
/**
 * Returns a sequential {@link LongStreamEx} created from given
 * {@link java.util.Spliterator.OfLong}.
 *
 * @param spliterator a spliterator to create the stream from.
 * @return the new stream
 * @since 0.3.4
 */
public static LongStreamEx of(Spliterator.OfLong spliterator) {
    return new LongStreamEx(spliterator, StreamContext.SEQUENTIAL);
}
/**
 * Returns a sequential, ordered {@link LongStreamEx} created from given
 * {@link java.util.PrimitiveIterator.OfLong}.
 *
 * <p>
 * This method is roughly equivalent to
 * {@code LongStreamEx.of(Spliterators.spliteratorUnknownSize(iterator, ORDERED))},
 * but may show better performance for parallel processing.
 *
 * <p>
 * Use this method only if you cannot provide a better stream source.
 *
 * @param iterator an iterator to create the stream from.
 * @return the new stream
 * @since 0.5.1
 */
public static LongStreamEx of(PrimitiveIterator.OfLong iterator) {
    return of(new UnknownSizeSpliterator.USOfLong(iterator));
}
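// Illustrative usage sketch (not part of the original source): wrap a
// PrimitiveIterator.OfLong, for example one obtained from another stream,
// back into a LongStreamEx.
@Test
public void ofIteratorExample() {
    PrimitiveIterator.OfLong iterator = LongStream.of(10, 20, 30).iterator();
    assertArrayEquals(new long[] { 10, 20, 30 }, LongStreamEx.of(iterator).toArray());
}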
/**
 * Returns a sequential unordered {@code LongStreamEx} of the given length
 * whose elements are all equal to the supplied value.
 *
 * @param value the constant value
 * @param length the length of the stream
 * @return a new {@code LongStreamEx}
 * @since 0.1.2
 */
public static LongStreamEx constant(long value, long length) {
    return of(new ConstSpliterator.OfLong(value, length, false));
}
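// Illustrative usage sketch (not part of the original source): a constant
// stream repeats one value for the requested number of elements.
@Test
public void constantExample() {
    assertArrayEquals(new long[] { 42, 42, 42 }, LongStreamEx.constant(42, 3).toArray());
    assertEquals(1_000_000L, LongStreamEx.constant(7, 1_000_000).count());
}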
/**
 * Returns the spliterator which covers all the elements emitted by this
 * emitter.
 *
 * @return the new spliterator
 */
default Spliterator.OfLong spliterator() {
    return new EmitterSpliterator.OfLong(this);
}
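// Illustrative sketch (not part of the original source), assuming the
// enclosing interface follows the StreamEx-style LongEmitter contract:
// next(LongConsumer) pushes one element and returns the emitter for the
// remaining elements, or null when the sequence is finished.
static LongEmitter countdown(long n) {
    return action -> {
        action.accept(n);
        return n > 1 ? countdown(n - 1) : null;
    };
}

// The default spliterator() then drives the standard stream machinery, e.g.:
// StreamSupport.longStream(countdown(3).spliterator(), false).toArray() -> { 3, 2, 1 }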