@AstVisitor(nodes=AstNodes.EXPRESSIONS, minVersion=8) public void visit(Expression expr, MethodContext mc) { if(expr.getCode() == AstCode.InvokeInterface) { MethodReference mr = (MethodReference) expr.getOperand(); if(mr.getReturnType().getPackageName().equals("java.util.stream") && Types.isBaseStream(mr.getReturnType())) { // intermediate stream operation if(mc.isAnnotated() && !Inf.BACKLINK.findTransitiveUsages(expr, true).findAny().isPresent()) { // .parallel()/.sequential()/.onClose()/.unordered() excluded as may return itself if(Types.is(mr.getReturnType(), BaseStream.class)) { mc.report("StreamMethodMayNotReturnItself", 0, expr); } else { mc.report("AbandonedStream", 0, expr); } } } } }
@Override public Stream<T> getCollectionStream(Repository repository) { Stream<T> stream = Stream.empty(); List<Stream<T>> streams = new ArrayList<>(); Set<UUID> seen = new HashSet<>(); for (ModelCollectionQuery<T> query : queries) { Stream<T> queryStream = query.getCollectionStream(repository) .filter(m -> !seen.contains(m.getId())) .map(m -> { seen.add(m.getId()); return m; }); streams.add(queryStream); stream = Stream.concat(stream, queryStream); } return stream.onClose(() -> { streams.forEach(BaseStream::close); }); }
private static < T, T_SPLITR extends Spliterator<T>, T_STREAM extends BaseStream<T, T_STREAM>> T_STREAM concatInternal(T_STREAM[] streams, IntFunction<T_SPLITR[]> arrayFunction, Function<T_STREAM, T_SPLITR> spliteratorFunction, Function<T_SPLITR[], T_SPLITR> concatFunction, BiFunction<T_SPLITR, Boolean, T_STREAM> streamFunction) { T_SPLITR[] spliterators = arrayFunction.apply(streams.length); boolean parallel = false; for (int i = 0; i < streams.length; i++) { T_STREAM inStream = streams[i]; T_SPLITR inSpliterator = spliteratorFunction.apply(inStream); spliterators[i] = inSpliterator; parallel = parallel || inStream.isParallel(); } T_SPLITR outSpliterator = concatFunction.apply(spliterators); T_STREAM outStream = streamFunction.apply(outSpliterator, parallel); return outStream.onClose(new ComposedClose(streams)); }
@Override public void run() { int i = 0; BaseStream<?, ?> stream; while (i < streams.length) { stream = streams[i++]; try { stream.close(); } catch (Throwable e1) { while (i < streams.length) { stream = streams[i++]; try { stream.close(); } catch (Throwable e2) { // TODO: Should we wrap this in a try/catch too? e1.addSuppressed(e2); } } throw e1; } } }
private static<T> Class<?> standardizeClass(Class<T> initialClass){ if(JsonValue.class.isAssignableFrom(initialClass)) return JsonValue.class; else if( List.class.isAssignableFrom(initialClass) ) return List.class; else if( BaseStream.class.isAssignableFrom(initialClass) ) return BaseStream.class; else if( Set.class.isAssignableFrom(initialClass) ) return Set.class; else if( initialClass.isArray() ) return Array.class; else if( JsonNode.class.isAssignableFrom(initialClass) ) return JsonNode.class; else if( Queue.class.isAssignableFrom(initialClass) ) return Queue.class; else if( Iterator.class.isAssignableFrom(initialClass) ) return Iterator.class; else if( Map.class.isAssignableFrom(initialClass) ) return Map.class; else if(Iterable.class.isAssignableFrom(initialClass) ) return Iterable.class; else return initialClass; }
@Test public void sequential() { BaseStream<?, ?> stream = this.parallelStreamSupportMock.sequential(); verify(this.delegateMock).sequential(); assertSame(this.parallelStreamSupportMock, stream); }
@Test public void parallel() { BaseStream<?, ?> stream = this.parallelStreamSupportMock.parallel(); verify(this.delegateMock).parallel(); assertSame(this.parallelStreamSupportMock, stream); }
@Test public void unordered() { BaseStream<?, ?> stream = this.parallelStreamSupportMock.unordered(); verify(this.delegateMock).unordered(); assertSame(this.parallelStreamSupportMock, stream); }
@Test public void onClose() { Runnable r = () -> {}; BaseStream<?, ?> stream = this.parallelStreamSupportMock.onClose(r); verify(this.delegateMock).onClose(r); assertSame(this.parallelStreamSupportMock, stream); }
/** * @param stream * a stream which creates new instances of type <code>T</code>. */ public StreamProducer(final BaseStream<T, ?> stream) { if (stream == null) { throw new IllegalArgumentException("stream may not be null"); } this.stream = stream; }
StreamContext combine(BaseStream<?, ?> other) { if (other == null) return this; StreamContext otherStrategy = of(other); StreamContext result = this; if (other.isParallel() && !parallel) result = parallel(); if (otherStrategy.closeHandler != null) result = result.onClose(otherStrategy.closeHandler); return result; }
static StreamContext of(BaseStream<?, ?> stream) { if (stream instanceof BaseStreamEx) return ((BaseStreamEx<?, ?, ?, ?>) stream).context; if (mustCloseStream(stream)) return new StreamContext(stream.isParallel()).onClose(stream::close); return stream.isParallel() ? PARALLEL : SEQUENTIAL; }
static boolean mustCloseStream(BaseStream<?, ?> target) { try { if (SOURCE_STAGE != null && SOURCE_CLOSE_ACTION != null && SOURCE_CLOSE_ACTION.get(SOURCE_STAGE.get(target)) == null) return false; } catch (IllegalArgumentException | IllegalAccessException e) { // ignore } return true; }
AbstractStreamBuilder(PipelineImpl<?> pipeline, StreamTerminator streamTerminator, Set<BaseStream<?, ?>> streamSet) { this.pipeline = requireNonNull(pipeline); this.streamTerminator = requireNonNull(streamTerminator); this.closeHandlers = new ArrayList<>(); this.streamSet = streamSet; this.linkedOrConsumed = false; }
private ReferencePipeline<MockEntity> createPipeline(Action<?, ?> action) { @SuppressWarnings("unchecked") final Supplier<Stream<MockEntity>> supplier = mock(Supplier.class); final Stream<MockEntity> stream = MockEntityUtil.stream((int) SQL_COUNT_RESULT); when(supplier.get()).thenReturn(stream); @SuppressWarnings("unchecked") final ReferencePipeline<MockEntity> pipeline = new PipelineImpl<>((Supplier<BaseStream<?, ?>>) (Object) supplier); pipeline.add(action); return pipeline; }
@Override public BaseStream onClose(Runnable arg0) { return stream.onClose(arg0); }
@Override public BaseStream parallel() { return stream.parallel(); }
@Override public BaseStream sequential() { return stream.sequential(); }
@Override public BaseStream unordered() { return stream.unordered(); }
private <T extends BaseStream<?, T>> T maybeParallel(final T in) { return useParallelStreams() ? in.parallel() : in; }
void assertUnsized(BaseStream<?, ?> s) { Spliterator<?> sp = s.spliterator(); assertFalse(sp.hasCharacteristics(Spliterator.SIZED | Spliterator.SUBSIZED)); assertEquals(sp.estimateSize(), Long.MAX_VALUE); }
void assertSized(BaseStream<?, ?> s) { Spliterator<?> sp = s.spliterator(); assertTrue(sp.hasCharacteristics(Spliterator.SIZED | Spliterator.SUBSIZED)); assertTrue(sp.estimateSize() < Long.MAX_VALUE); }
@Override public <Q, QS extends BaseStream<Q, QS>> HasNext<Q, QS> append(Action<T, TS, Q, QS> next) { @SuppressWarnings("unchecked") // An empty stream is an empty stream. final HasNext<Q, QS> casted = (HasNext<Q, QS>) this; return casted; }
@Override public <Q, QS extends BaseStream<Q, QS>> HasNext<Q, QS> append(Action<T, Stream<T>, Q, QS> next) { return next; }
@Override public <Q, QS extends BaseStream<Q, QS>> HasNext<Q, QS> append(Action<Long, LongStream, Q, QS> next) { return next; }
@Override public <Q, QS extends BaseStream<Q, QS>> HasNext<Q, QS> append(Action<T, TS, Q, QS> next) { return next; }
@Override public <Q, QS extends BaseStream<Q, QS>> HasNext<Q, QS> append(Action<R, Stream<R>, Q, QS> next) { return next; }
@Override public <Q, QS extends BaseStream<Q, QS>> HasNext<Q, QS> append(Action<Double, DoubleStream, Q, QS> next) { return next; }
@Override public <Q, QS extends BaseStream<Q, QS>> HasNext<Q, QS> append(Action<Integer, IntStream, Q, QS> next) { return next; }