@Test public void workerScheduleOnceUsesHook() { final CountingRunnable newCounter = new CountingRunnable(); final AtomicReference<Runnable> runnableRef = new AtomicReference<>(); RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() { @Override public Runnable apply(Runnable runnable) { runnableRef.set(runnable); return newCounter; } }); Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); worker.schedule(counter); // Verify our runnable was passed to the schedulers hook. assertSame(counter, runnableRef.get()); runUiThreadTasks(); // Verify the scheduled runnable was the one returned from the hook. assertEquals(1, newCounter.get()); assertEquals(0, counter.get()); }
@Test public void workerScheduleOnceWithDelayUsesHook() { final CountingRunnable newCounter = new CountingRunnable(); final AtomicReference<Runnable> runnableRef = new AtomicReference<>(); RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() { @Override public Runnable apply(Runnable runnable) { runnableRef.set(runnable); return newCounter; } }); Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); worker.schedule(counter, 1, MINUTES); // Verify our runnable was passed to the schedulers hook. assertSame(counter, runnableRef.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); // Verify the scheduled runnable was the one returned from the hook. assertEquals(1, newCounter.get()); assertEquals(0, counter.get()); }
@Test @Ignore("Implementation delegated to default RxJava implementation") public void workerSchedulePeriodicallyReschedulesItself() { Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); worker.schedulePeriodically(counter, 1, 1, MINUTES); runUiThreadTasks(); assertEquals(0, counter.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(1, counter.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(2, counter.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(3, counter.get()); }
@Test @Ignore("Implementation delegated to default RxJava implementation") public void workerSchedulePeriodicallyDisposedDoesNotRun() { Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); Disposable disposable = worker.schedulePeriodically(counter, 1, 1, MINUTES); runUiThreadTasks(); assertEquals(0, counter.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(1, counter.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(2, counter.get()); disposable.dispose(); idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(2, counter.get()); }
@Test public void workerUnsubscriptionDuringSchedulingCancelsScheduledAction() { final AtomicReference<Worker> workerRef = new AtomicReference<>(); RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() { @Override public Runnable apply(Runnable runnable) { // Purposefully unsubscribe in an asinine point after the normal unsubscribed check. workerRef.get().dispose(); return runnable; } }); Worker worker = scheduler.createWorker(); workerRef.set(worker); CountingRunnable counter = new CountingRunnable(); worker.schedule(counter); runUiThreadTasks(); assertEquals(0, counter.get()); }
@Test public void workerUnsubscriptionDoesNotAffectOtherWorkers() { Worker workerA = scheduler.createWorker(); CountingRunnable counterA = new CountingRunnable(); workerA.schedule(counterA, 1, MINUTES); Worker workerB = scheduler.createWorker(); CountingRunnable counterB = new CountingRunnable(); workerB.schedule(counterB, 1, MINUTES); workerA.dispose(); runUiThreadTasksIncludingDelayedTasks(); assertEquals(0, counterA.get()); assertEquals(1, counterB.get()); }
@Test public void workerPeriodic() throws Exception { Task t = new Task(3); Worker w = SwingSchedulers.edt().createWorker(); try { Disposable d = w.schedulePeriodically(t, 100, 100, TimeUnit.MILLISECONDS); Assert.assertTrue(t.await(5, TimeUnit.SECONDS)); d.dispose(); Thread.sleep(500); Assert.assertEquals(3, t.calls); } finally { w.dispose(); } }
@Test public void workerDispose() throws Exception { Task t = new Task(1); Worker w = SwingSchedulers.edt().createWorker(); try { Disposable d = w.schedule(t, 500, TimeUnit.MILLISECONDS); Thread.sleep(100); d.dispose(); Thread.sleep(500); Assert.assertEquals(0, t.calls); } finally { w.dispose(); } }
@Test(timeout = 5000) public void futureDisposeRace() throws Exception { SharedScheduler scheduler = new SharedScheduler(Schedulers.computation()); try { Worker w = scheduler.createWorker(); for (int i = 0; i < 1000; i++) { w.schedule(this); } while (calls != 1000) { Thread.sleep(100); } } finally { scheduler.shutdown(); } }
protected void taskThrows(Scheduler s) throws InterruptedException { try { List<Throwable> errors = TestHelper.trackPluginErrors(); Worker w = s.createWorker(); w.schedule(new Runnable() { @Override public void run() { calls.getAndIncrement(); throw new IllegalStateException(); } }); while (errors.isEmpty()) { Thread.sleep(20); } TestHelper.assertError(errors, 0, IllegalStateException.class); } finally { s.shutdown(); } }
@Test public void setFutureRace() { final Scheduler s = new ParallelScheduler(2, true); try { for (int i = 0; i < 1000; i++) { final Worker w = s.createWorker(); Runnable r1 = new Runnable() { @Override public void run() { w.schedule(ParallelSchedulerTest.this); } }; Runnable r2 = new Runnable() { @Override public void run() { w.dispose(); } }; TestHelper.race(r1, r2, Schedulers.single()); } } finally { s.shutdown(); } }
@Test(timeout = 10000) public void workerCrash() { List<Throwable> errors = TestHelper.trackPluginErrors(); try { final BlockingScheduler scheduler = new BlockingScheduler(); scheduler.execute(new Action() { @Override public void run() throws Exception { final Worker worker = scheduler.createWorker(); worker.schedule(new Runnable() { @Override public void run() { worker.dispose(); scheduler.shutdown(); throw new IllegalArgumentException(); } }); } }); TestHelper.assertError(errors, 0, IllegalArgumentException.class); } finally { RxJavaPlugins.reset(); } }
public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads, long timeout, TimeUnit unit) { final CountDownLatch latch = new CountDownLatch(numThreads); for (int i = 1; i <= numThreads; i++) { final Worker worker = scheduler.createWorker(); worker.schedule(new Runnable() { @Override public void run() { worker.dispose(); latch.countDown(); } }); } try { boolean finished = latch.await(timeout, unit); if (!finished) { throw new RuntimeException("timeout occured waiting for work to finish"); } } catch (InterruptedException e) { throw new RuntimeException(e); } }
@Test public void workerScheduleOncePostsImmediately() { Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); worker.schedule(counter); runUiThreadTasks(); assertEquals(1, counter.get()); }
@Test public void workerScheduleOnceWithNegativeDelayPostsImmediately() { Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); worker.schedule(counter, -1, TimeUnit.MINUTES); runUiThreadTasks(); assertEquals(1, counter.get()); }
@Test public void workerScheduleOnceDisposedDoesNotRun() { Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); Disposable disposable = worker.schedule(counter); disposable.dispose(); runUiThreadTasks(); assertEquals(0, counter.get()); }
@Test public void workerScheduleOnceWithDelayPostsWithDelay() { Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); worker.schedule(counter, 1, MINUTES); runUiThreadTasks(); assertEquals(0, counter.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(1, counter.get()); }
@Test public void workerScheduleOnceWithDelayDisposedDoesNotRun() { Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); Disposable disposable = worker.schedule(counter, 1, MINUTES); idleMainLooper(30, SECONDS); disposable.dispose(); idleMainLooper(30, SECONDS); runUiThreadTasks(); assertEquals(0, counter.get()); }
@Test @Ignore("Implementation delegated to default RxJava implementation") public void workerSchedulePeriodicallyUsesHookOnce() { Worker worker = scheduler.createWorker(); final CountingRunnable newCounter = new CountingRunnable(); final AtomicReference<Runnable> runnableRef = new AtomicReference<>(); RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() { @Override public Runnable apply(Runnable runnable) { runnableRef.set(runnable); return newCounter; } }); CountingRunnable counter = new CountingRunnable(); worker.schedulePeriodically(counter, 1, 1, MINUTES); // Verify our action was passed to the schedulers hook. assertSame(counter, runnableRef.get()); runnableRef.set(null); idleMainLooper(1, MINUTES); runUiThreadTasks(); // Verify the scheduled action was the one returned from the hook. assertEquals(1, newCounter.get()); assertEquals(0, counter.get()); // Ensure the hook was not called again when the runnable re-scheduled itself. assertNull(runnableRef.get()); }
@Test @Ignore("Implementation delegated to default RxJava implementation") public void workerSchedulePeriodicallyDisposedDuringRunDoesNotReschedule() { Worker worker = scheduler.createWorker(); final AtomicReference<Disposable> disposableRef = new AtomicReference<>(); CountingRunnable counter = new CountingRunnable() { @Override public void run() { super.run(); if (get() == 2) { disposableRef.get().dispose(); } } }; Disposable disposable = worker.schedulePeriodically(counter, 1, 1, MINUTES); disposableRef.set(disposable); runUiThreadTasks(); assertEquals(0, counter.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(1, counter.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(2, counter.get()); // Dispose will have happened here during the last run() execution. idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(2, counter.get()); }
@Test @Ignore("Implementation delegated to default RxJava implementation") public void workerSchedulePeriodicallyThrowingDoesNotReschedule() { Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable() { @Override public void run() { super.run(); if (get() == 2) { throw new RuntimeException("Broken!"); } } }; worker.schedulePeriodically(counter, 1, 1, MINUTES); runUiThreadTasks(); assertEquals(0, counter.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(1, counter.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(2, counter.get()); // Exception will have happened here during the last run() execution. idleMainLooper(1, MINUTES); runUiThreadTasks(); assertEquals(2, counter.get()); }
@Test public void workerDisposableTracksDisposedState() { Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); Disposable disposable = worker.schedule(counter); assertFalse(disposable.isDisposed()); disposable.dispose(); assertTrue(disposable.isDisposed()); }
@Test public void workerDisposeCancelsScheduled() { Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); worker.schedule(counter, 1, MINUTES); worker.dispose(); runUiThreadTasks(); assertEquals(0, counter.get()); }
@Test public void workerTracksDisposedState() { Worker worker = scheduler.createWorker(); assertFalse(worker.isDisposed()); worker.dispose(); assertTrue(worker.isDisposed()); }
@Test public void disposedWorkerReturnsDisposedDisposables() { Worker worker = scheduler.createWorker(); worker.dispose(); Disposable disposable = worker.schedule(new CountingRunnable()); assertTrue(disposable.isDisposed()); }
@Test public void worker() throws Exception { Task t = new Task(1); Worker w = SwingSchedulers.edt().createWorker(); try { w.schedule(t); Assert.assertTrue(t.await(5, TimeUnit.SECONDS)); } finally { w.dispose(); } }
@Test public void workerDelay() throws Exception { Task t = new Task(1); Worker w = SwingSchedulers.edt().createWorker(); try { w.schedule(t, 100, TimeUnit.MILLISECONDS); Assert.assertTrue(t.await(5, TimeUnit.SECONDS)); } finally { w.dispose(); } }
private boolean tryEmit(Observers<T> obs, DecoratingMember<T> m) { // get a fresh worker each time so we jump threads to // break the stack-trace (a long-enough chain of // checkout-checkins could otherwise provoke stack // overflow) // advance counter so the next and choose an Observer to emit to (round robin) int index = obs.index; MemberSingleObserver<T> o = obs.observers[index]; MemberSingleObserver<T> oNext = o; // atomically bump up the index (if that entry has not been deleted in // the meantime by disposal) while (true) { Observers<T> x = observers.get(); if (x.index == index && x.observers[index] == o) { boolean[] active = new boolean[x.active.length]; System.arraycopy(x.active, 0, active, 0, active.length); int nextIndex = (index + 1) % active.length; while (nextIndex != index && !active[nextIndex]) { nextIndex = (nextIndex + 1) % active.length; } active[nextIndex] = false; if (observers.compareAndSet(x, new Observers<T>(x.observers, active, x.activeCount - 1, nextIndex))) { oNext = x.observers[nextIndex]; break; } } else { // checkin because no active observers m.checkin(); return false; } } Worker worker = scheduler.createWorker(); worker.schedule(new Emitter<T>(worker, oNext, m)); return true; }
OnBackpressureTimeoutSubscriber(Subscriber<? super T> actual, int maxSize, long timeout, TimeUnit unit, Worker worker, Consumer<? super T> onEvict) { this.actual = actual; this.maxSizeDouble = maxSize << 1; this.timeout = timeout; this.unit = unit; this.worker = worker; this.onEvict = onEvict; this.requested = new AtomicLong(); this.queue = new ArrayDeque<Object>(); }
@SuppressWarnings("unchecked") ZipLatestCoordinator(Subscriber<? super R> actual, int n, Worker worker, Function<? super Object[], ? extends R> combiner) { super(n); this.actual = actual; this.subscribers = new InnerSubscriber[n]; this.wip = new AtomicInteger(); this.requested = new AtomicLong(); this.errors = new AtomicThrowable(); this.worker = worker; for (int i = 0; i < n; i++) { subscribers[i] = new InnerSubscriber<T>(this, i); } this.combiner = combiner; }
TimeoutLast(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) { super(actual); this.timeout = timeout; this.unit = unit; this.worker = worker; this.task = new SequentialDisposable(); this.index = new AtomicLong(); this.value = new AtomicReference<T>(); }
SpanoutSubscriber(Subscriber<? super T> actual, long initialSpan, long betweenSpan, Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.initialSpan = initialSpan; this.betweenSpan = betweenSpan; this.worker = worker; this.delayError = delayError; this.lastEvent = -1L; this.queue = new SpscLinkedArrayQueue<T>(bufferSize); }
@Override protected void subscribeActual(Subscriber<? super T> s) { Worker worker = scheduler.createWorker(); SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(s, worker, source); s.onSubscribe(parent); DisposableHelper.replace(parent.task, worker.schedule(parent)); }
SubscribeOnSubscriber(Subscriber<? super T> actual, Worker worker, Publisher<T> source) { this.actual = actual; this.worker = worker; this.source = source; this.task = new AtomicReference<Disposable>(); this.requested = new AtomicBoolean(); }
void cancelledTask(Scheduler s) throws InterruptedException { try { Worker w = s.createWorker(); try { assertFalse(w.isDisposed()); Disposable d = w.schedule(this, 200, TimeUnit.MILLISECONDS); assertFalse(d.isDisposed()); d.dispose(); assertTrue(d.isDisposed()); Thread.sleep(300); assertEquals(0, calls.get()); w.dispose(); assertTrue(w.isDisposed()); } finally { w.dispose(); } } finally { s.shutdown(); } }
@Override protected void subscribeActual(Subscriber<? super T> child) { PagedQueue queue = new PagedQueue(options.fileFactory(), options.pageSizeBytes()); Worker worker = options.scheduler().createWorker(); if (source != null) { source.subscribe( new BufferToFileSubscriberFlowable<T>(child, queue, serializer, worker)); } else { source2.subscribe( new BufferToFileSubscriberObservable<T>(child, queue, serializer, worker)); } }
BufferToFileSubscriber(Subscriber<? super T> child, PagedQueue queue, Serializer<T> serializer, Worker worker) { this.child = child; this.queue = queue; this.serializer = serializer; this.worker = worker; }
@Test public void testDispose() { Scheduler s = SchedulerHelper.withThreadId(Schedulers.trampoline(), "boo"); Worker w = s.createWorker(); Assert.assertFalse(w.isDisposed()); w.dispose(); Assert.assertTrue(w.isDisposed()); }
@Test public void testPollQueueThrowsExceptionEmitsError() { PagedQueue queue = Mockito.mock(PagedQueue.class); RuntimeException err = new RuntimeException(); Mockito.doThrow(err).when(queue).poll(); Worker worker = Schedulers.trampoline().createWorker(); TestSubscriber<String> ts = TestSubscriber.create(1); BufferToFileSubscriberFlowable<String> b = new BufferToFileSubscriberFlowable<String>(ts, queue, Serializers.utf8(), worker); b.onSubscribe(IGNORE); b.request(1); b.run(); Mockito.verify(queue, Mockito.atLeastOnce()).poll(); ts.assertError(err); }