/**
 * Creates a {@link SubmissionPublisher}, registers two subscriptions through
 * {@code subscribe(...)} (one for {@code Process.AVERAGE_SPEED}, one for
 * {@code Process.TRAFFIC_DENSITY}) and submits {@code trafficUnitsNumber} as the
 * single published item.
 *
 * <p>The publisher is closed by try-with-resources. Afterwards the method waits up
 * to one second on the executor before returning.
 *
 * <p>NOTE(review): the executor is {@link ForkJoinPool#commonPool()}; according to
 * the ForkJoinPool documentation {@code shutdown()} and {@code shutdownNow()} have
 * no effect on the common pool, so only the bounded wait is meaningful here.
 *
 * @param trafficUnitsNumber the item submitted to the publisher
 * @param timeSec            forwarded unchanged to {@code subscribe}
 * @param dateLocation       forwarded unchanged to {@code subscribe}
 * @param speedLimitByLane   forwarded unchanged to {@code subscribe}
 */
public static void dispatch(int trafficUnitsNumber, double timeSec,
                            DateLocation dateLocation, double[] speedLimitByLane) {
    ExecutorService execService = ForkJoinPool.commonPool();
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
        subscribe(publisher, execService, Process.AVERAGE_SPEED, timeSec, dateLocation, speedLimitByLane);
        subscribe(publisher, execService, Process.TRAFFIC_DENSITY, timeSec, dateLocation, speedLimitByLane);
        publisher.submit(trafficUnitsNumber);
    } finally {
        try {
            execService.shutdown();
            execService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName());
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName());
        } finally {
            execService.shutdownNow();
        }
    }
}
private static void demo4_Flow_submissionPublisher() { System.out.println(); ExecutorService execService = ForkJoinPool.commonPool();//Executors.newFixedThreadPool(3); try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()){//execService, 1)){ demoSubscribe(publisher, execService, "One"); demoSubscribe(publisher, execService, "Two"); demoSubscribe(publisher, execService, "Three"); IntStream.range(1, 5).forEach(publisher::submit); } finally { try { execService.shutdown(); int shutdownDelaySec = 1; System.out.println("Waiting for " + shutdownDelaySec + " sec before shutting down service..."); execService.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS); } catch (Exception ex) { System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName()); } finally { System.out.println("Calling execService.shutdownNow()..."); List<Runnable> l = execService.shutdownNow(); System.out.println(l.size() + " tasks were waiting to be executed. Service stopped."); } } }
/**
 * Submits the same order ten times to a publisher that has a
 * {@code StockMaintain} subscriber attached to a stock holding 40 units of one
 * product. Each order contains a single item with amount 10.
 *
 * <p>After submitting, the test sleeps ten times for 50 ms (presumably to give
 * the subscriber time to process the orders) and then closes the publisher.
 * The test makes no assertions.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
@Test
public void teststockRemoval() throws InterruptedException {
    Stock stock = new Stock();
    // try-with-resources closes the publisher even when the body throws,
    // e.g. when one of the sleeps below is interrupted.
    try (SubmissionPublisher<Order> p = new SubmissionPublisher<>()) {
        p.subscribe(new StockMaintain(stock));

        Product product = new Product();
        stock.store(product, 40);

        OrderItem item = new OrderItem();
        item.setProduct(product);
        item.setAmount(10);

        Order order = new Order();
        List<OrderItem> items = new LinkedList<>();
        items.add(item);
        order.setItems(items);

        for (int i = 0; i < 10; i++) {
            p.submit(order);
        }
        log.info("所有订单已经提交完毕");

        for (int j = 0; j < 10; j++) {
            log.info("Sleeping a bit...");
            Thread.sleep(50);
        }
    }
    log.info("Publisher已关闭");
}
@Test public void testInventoryRemoval() throws InterruptedException { Inventory inventory = new Inventory(); SubmissionPublisher<Order> p = new SubmissionPublisher<>();//Executors.newFixedThreadPool(6), 20); p.subscribe(new InventoryKeeper(inventory)); Product product = new Product(); inventory.store(product, 20); OrderItem item = new OrderItem(); item.setProduct(product); item.setAmount(10); Order order = new Order(); List<OrderItem> items = new LinkedList<>(); items.add(item); order.setItems(items); for (int i = 0; i < 10; i++) p.submit(order); log.info("All orders were submitted"); for (int j = 0; j < 10; j++) { log.info("Sleeping a bit..."); Thread.sleep(50); } p.close(); log.info("Publisher was closed"); }
/**
 * Demo entry point: attaches three consumers to one publisher, then publishes
 * ten items ("Item 0" .. "Item 9") with a one-second pause after each, and
 * finally closes the publisher.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Consumer1 first = new Consumer1();
    Consumer2 second = new Consumer2();
    Consumer3 third = new Consumer3();

    SubmissionPublisher<Item> itemPublisher = new SubmissionPublisher<>();
    itemPublisher.subscribe(first);
    itemPublisher.subscribe(second);
    itemPublisher.subscribe(third);

    int index = 0;
    while (index < 10) {
        Item next = new Item();
        next.setTitle("Item " + index);
        next.setContent("This is the item " + index);
        itemPublisher.submit(next);

        // Pause between submissions; an interruption is reported and the loop goes on.
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
        index++;
    }

    itemPublisher.close();
}
/**
 * After close(), a publisher reports isClosed with a null
 * closedException and rejects submit with IllegalStateException;
 * a later closeExceptionally changes neither of these.
 */
public void testClose() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    checkInitialState(publisher);

    publisher.close();
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());

    try {
        publisher.submit(1);
        shouldThrow();
    } catch (IllegalStateException expected) {
        // submission to a closed publisher must be rejected
    }

    // Closing again, this time exceptionally, must not record the exception.
    Throwable lateFailure = new SPException();
    publisher.closeExceptionally(lateFailure);
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());
}
/**
 * A publisher closedExceptionally reports isClosed with the
 * closedException and throws IllegalStateException upon attempted
 * submission; a subsequent close or closeExceptionally has no
 * additional effect.
 */
public void testCloseExceptionally() {
    SubmissionPublisher<Integer> p = basicPublisher();
    checkInitialState(p);

    Throwable ex = new SPException();
    p.closeExceptionally(ex);
    assertTrue(p.isClosed());
    // Expected value first, so a failure message reports the two values the right way round.
    assertSame(ex, p.getClosedException());

    try {
        p.submit(1);
        shouldThrow();
    } catch (IllegalStateException success) {}

    // A plain close afterwards must keep the originally recorded exception.
    p.close();
    assertTrue(p.isClosed());
    assertSame(ex, p.getClosedException());
}
/**
 * Subscribing an already-subscribed subscriber a second time invokes
 * its onError and leaves it no longer subscribed to the publisher.
 */
public void testSubscribe4() {
    TestSubscriber subscriber = new TestSubscriber();
    SubmissionPublisher<Integer> publisher = basicPublisher();

    // First subscription is accepted and visible through every query method.
    publisher.subscribe(subscriber);
    assertTrue(publisher.hasSubscribers());
    assertEquals(1, publisher.getNumberOfSubscribers());
    assertTrue(publisher.getSubscribers().contains(subscriber));
    assertTrue(publisher.isSubscribed(subscriber));
    subscriber.awaitSubscribe();
    assertNotNull(subscriber.sn);
    assertEquals(0, subscriber.nexts);
    assertEquals(0, subscriber.errors);
    assertEquals(0, subscriber.completes);

    // Second subscription of the same instance is reported as an error.
    publisher.subscribe(subscriber);
    subscriber.awaitError();
    assertEquals(0, subscriber.nexts);
    assertEquals(1, subscriber.errors);
    assertFalse(publisher.isSubscribed(subscriber));
}
/**
 * close() delivers onComplete to every subscriber, after the item
 * submitted before the close.
 */
public void testCloseCompletes() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    publisher.submit(1);
    publisher.close();
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());

    first.awaitComplete();
    assertEquals(1, first.nexts);
    assertEquals(1, first.completes);

    second.awaitComplete();
    assertEquals(1, second.nexts);
    assertEquals(1, second.completes);
}
/**
 * closeExceptionally() delivers onError to each subscriber once that
 * subscriber has been subscribed; at most the one submitted item is
 * seen beforehand.
 */
public void testCloseExceptionallyError() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    publisher.submit(1);
    publisher.closeExceptionally(new SPException());
    assertTrue(publisher.isClosed());

    first.awaitSubscribe();
    first.awaitError();
    assertTrue(first.nexts <= 1);
    assertEquals(1, first.errors);

    second.awaitSubscribe();
    second.awaitError();
    assertTrue(second.nexts <= 1);
    assertEquals(1, second.errors);
}
/**
 * Once a subscription is cancelled, its subscriber eventually stops
 * receiving onNext while the other subscriber still gets every item.
 */
public void testCancel() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber cancelled = new TestSubscriber();
    TestSubscriber active = new TestSubscriber();
    publisher.subscribe(cancelled);
    publisher.subscribe(active);
    cancelled.awaitSubscribe();

    publisher.submit(1);
    cancelled.sn.cancel();
    for (int item = 2; item <= 20; item++) {
        publisher.submit(item);
    }
    publisher.close();

    active.awaitComplete();
    assertEquals(20, active.nexts);
    assertEquals(1, active.completes);
    assertTrue(cancelled.nexts < 20);
    assertFalse(publisher.isSubscribed(cancelled));
}
/**
 * Throwing an exception in onNext causes onError for that subscriber,
 * while the other subscriber still receives both items and completes.
 */
public void testThrowOnNext() {
    SubmissionPublisher<Integer> p = basicPublisher();
    TestSubscriber s1 = new TestSubscriber();
    TestSubscriber s2 = new TestSubscriber();
    p.subscribe(s1);
    p.subscribe(s2);
    s1.awaitSubscribe();

    p.submit(1);
    // From here on s1 throws from its callbacks.
    s1.throwOnCall = true;
    p.submit(2);
    p.close();

    s2.awaitComplete();
    assertEquals(2, s2.nexts);

    // s1 is expected to terminate with an error, so wait for the error signal
    // (as testThrowOnNextHandler does) rather than for completion.
    s1.awaitError();
    assertEquals(1, s1.errors);
}
/**
 * A handler passed to the constructor is invoked once when a
 * subscriber throws from onNext.
 */
public void testThrowOnNextHandler() {
    AtomicInteger handlerCalls = new AtomicInteger();
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(
        basicExecutor, 8,
        (subscriber, failure) -> handlerCalls.getAndIncrement());
    TestSubscriber thrower = new TestSubscriber();
    TestSubscriber healthy = new TestSubscriber();
    publisher.subscribe(thrower);
    publisher.subscribe(healthy);
    thrower.awaitSubscribe();

    publisher.submit(1);
    thrower.throwOnCall = true;
    publisher.submit(2);
    publisher.close();

    healthy.awaitComplete();
    assertEquals(2, healthy.nexts);
    assertEquals(1, healthy.completes);

    thrower.awaitError();
    assertEquals(1, thrower.errors);
    assertEquals(1, handlerCalls.get());
}
/**
 * Every subscriber receives all twenty submitted items followed by a
 * single onComplete.
 */
public void testOrder() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    int item = 1;
    while (item <= 20) {
        publisher.submit(item);
        ++item;
    }
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();
    assertEquals(20, second.nexts);
    assertEquals(1, second.completes);
    assertEquals(20, first.nexts);
    assertEquals(1, first.completes);
}
/**
 * A subscriber that has not requested anything receives no onNext
 * until it issues a request.
 */
public void testRequest1() {
    SubmissionPublisher<Integer> publisher = basicPublisher();

    // This subscriber is configured not to request items on its own.
    TestSubscriber passive = new TestSubscriber();
    passive.request = false;
    publisher.subscribe(passive);
    passive.awaitSubscribe();
    assertEquals(0, publisher.estimateMinimumDemand());

    TestSubscriber eager = new TestSubscriber();
    publisher.subscribe(eager);
    publisher.submit(1);
    publisher.submit(2);
    eager.awaitNext(1);
    assertEquals(0, passive.nexts);

    // Explicit demand from the passive subscriber lets items flow to it.
    passive.sn.request(3);
    publisher.submit(3);
    publisher.close();

    eager.awaitComplete();
    assertEquals(3, eager.nexts);
    assertEquals(1, eager.completes);

    passive.awaitComplete();
    assertTrue(passive.nexts > 0);
    assertEquals(1, passive.completes);
}
/**
 * A subscriber whose outstanding requests drop to zero receives no
 * further onNext.
 */
public void testRequest2() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber limited = new TestSubscriber();
    TestSubscriber unlimited = new TestSubscriber();
    publisher.subscribe(limited);
    publisher.subscribe(unlimited);
    unlimited.awaitSubscribe();
    limited.awaitSubscribe();

    // Switch off re-requesting only after the subscription is established.
    limited.request = false;
    publisher.submit(1);
    publisher.submit(2);
    publisher.close();

    unlimited.awaitComplete();
    assertEquals(2, unlimited.nexts);
    assertEquals(1, unlimited.completes);

    limited.awaitNext(1);
    assertEquals(1, limited.nexts);
}
/**
 * submit returns number of lagged items, compatible with result
 * of estimateMaximumLag.
 */
public void testLaggedSubmit() {
    SubmissionPublisher<Integer> p = basicPublisher();
    // Neither subscriber requests on its own, so submitted items accumulate.
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();

    assertEquals(1, p.submit(1));
    assertTrue(p.estimateMaximumLag() >= 1);
    assertTrue(p.submit(2) >= 2);
    assertTrue(p.estimateMaximumLag() >= 2);
    s1.sn.request(4);
    assertTrue(p.submit(3) >= 3);
    assertTrue(p.estimateMaximumLag() >= 3);
    s2.sn.request(4);
    p.submit(4);
    p.close();

    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // Check s1 here: the count for s2 was already verified above.
    assertEquals(4, s1.nexts);
}
/**
 * With a buffer capacity of 1, submit still eventually delivers all
 * twenty items to both subscribers.
 */
public void testCap1Submit() {
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 1);
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    for (int item = 1; item <= 20; item++) {
        assertTrue(publisher.estimateMinimumDemand() <= 1);
        assertTrue(publisher.submit(item) >= 0);
    }
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();
    assertEquals(20, second.nexts);
    assertEquals(1, second.completes);
    assertEquals(20, first.nexts);
    assertEquals(1, first.completes);
}
/**
 * offer returns number of lagged items if not saturated
 */
public void testLaggedOffer() {
    SubmissionPublisher<Integer> p = basicPublisher();
    // Neither subscriber requests on its own, so offered items accumulate.
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();

    assertTrue(p.offer(1, null) >= 1);
    assertTrue(p.offer(2, null) >= 2);
    s1.sn.request(4);
    assertTrue(p.offer(3, null) >= 3);
    s2.sn.request(4);
    p.offer(4, null);
    p.close();

    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // Check s1 here: the count for s2 was already verified above.
    assertEquals(4, s1.nexts);
}
/**
 * When the buffers are saturated, offer signals drops through a
 * negative return value.
 */
public void testDroppedOffer() {
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 4);

    // Neither subscriber requests on its own, so the buffers fill up.
    TestSubscriber first = new TestSubscriber();
    first.request = false;
    TestSubscriber second = new TestSubscriber();
    second.request = false;
    publisher.subscribe(first);
    publisher.subscribe(second);
    second.awaitSubscribe();
    first.awaitSubscribe();

    for (int item = 1; item <= 4; item++) {
        assertTrue(publisher.offer(item, null) >= 0);
    }
    publisher.offer(5, null);
    assertTrue(publisher.offer(6, null) < 0);
    first.sn.request(64);
    assertTrue(publisher.offer(7, null) < 0);
    second.sn.request(64);
    publisher.close();

    second.awaitComplete();
    assertTrue(second.nexts >= 4);
    first.awaitComplete();
    assertTrue(first.nexts >= 4);
}
/**
 * offer invokes drop handler if saturated
 */
public void testHandledDroppedOffer() {
    AtomicInteger calls = new AtomicInteger();
    SubmissionPublisher<Integer> p = new SubmissionPublisher<>(basicExecutor, 4);
    // Neither subscriber requests on its own, so the buffers fill up.
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();

    for (int i = 1; i <= 4; ++i) {
        assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
    }
    // Fifth item continues the 1..7 sequence (as in testDroppedOffer);
    // its result is deliberately not asserted.
    p.offer(5, (s, x) -> noopHandle(calls));
    assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
    s1.sn.request(64);
    assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
    s2.sn.request(64);
    p.close();

    s2.awaitComplete();
    s1.awaitComplete();
    assertTrue(calls.get() >= 4);
}
/**
 * An offer goes through when the drop handler reacts by forcing a
 * request; the total number of delivered items matches what the
 * offer results predict.
 */
public void testRecoveredHandledDroppedOffer() {
    AtomicInteger handlerCalls = new AtomicInteger();
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 4);

    TestSubscriber first = new TestSubscriber();
    first.request = false;
    TestSubscriber second = new TestSubscriber();
    second.request = false;
    publisher.subscribe(first);
    publisher.subscribe(second);
    second.awaitSubscribe();
    first.awaitSubscribe();

    // Each offer targets two subscribers; a negative result is added to the
    // expected delivery count, lowering it.
    int expectedDeliveries = 0;
    for (int item = 1; item <= 8; item++) {
        int result = publisher.offer(
            item, (subscriber, dropped) -> reqHandle(handlerCalls, subscriber));
        expectedDeliveries += 2;
        if (result < 0) {
            expectedDeliveries += result;
        }
    }
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();
    assertEquals(expectedDeliveries, first.nexts + second.nexts);
    assertTrue(handlerCalls.get() >= 2);
}
/**
 * Timed offer returns number of lagged items if not saturated
 */
public void testLaggedTimedOffer() {
    SubmissionPublisher<Integer> p = basicPublisher();
    // Neither subscriber requests on its own, so offered items accumulate.
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();

    long startTime = System.nanoTime();
    assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
    assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
    s1.sn.request(4);
    assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
    s2.sn.request(4);
    p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
    p.close();

    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // Check s1 here: the count for s2 was already verified above.
    assertEquals(4, s1.nexts);
    // None of the offers should have come close to using its timeout.
    assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
}
/**
 * After close(), a publisher reports isClosed with a null
 * closedException and rejects submit with ISE; a later
 * closeExceptionally changes neither of these.
 */
public void testClose() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    checkInitialState(publisher);

    publisher.close();
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());

    try {
        publisher.submit(1);
        shouldThrow();
    } catch (IllegalStateException expected) {
        // submission to a closed publisher must be rejected
    }

    // Closing again, this time exceptionally, must not record the exception.
    Throwable lateFailure = new SPException();
    publisher.closeExceptionally(lateFailure);
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());
}
/**
 * A publisher closedExceptionally reports isClosed with the
 * closedException and throws ISE upon attempted submission; a
 * subsequent close or closeExceptionally has no additional
 * effect.
 */
public void testCloseExceptionally() {
    SubmissionPublisher<Integer> p = basicPublisher();
    checkInitialState(p);

    Throwable ex = new SPException();
    p.closeExceptionally(ex);
    assertTrue(p.isClosed());
    // Expected value first, so a failure message reports the two values the right way round.
    assertSame(ex, p.getClosedException());

    try {
        p.submit(1);
        shouldThrow();
    } catch (IllegalStateException success) {}

    // A plain close afterwards must keep the originally recorded exception.
    p.close();
    assertTrue(p.isClosed());
    assertSame(ex, p.getClosedException());
}
/**
 * closeExceptionally() delivers onError to every subscriber; at most
 * the one submitted item is seen beforehand.
 */
public void testCloseExceptionallyError() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    publisher.submit(1);
    publisher.closeExceptionally(new SPException());
    assertTrue(publisher.isClosed());

    first.awaitError();
    assertTrue(first.nexts <= 1);
    assertEquals(1, first.errors);

    second.awaitError();
    assertTrue(second.nexts <= 1);
    assertEquals(1, second.errors);
}
/**
 * A handler passed to the constructor is invoked once when a
 * subscriber throws from onNext.
 */
public void testThrowOnNextHandler() {
    AtomicInteger handlerCalls = new AtomicInteger();
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(
        basicExecutor, 8,
        (subscriber, failure) -> handlerCalls.getAndIncrement());
    TestSubscriber thrower = new TestSubscriber();
    TestSubscriber healthy = new TestSubscriber();
    publisher.subscribe(thrower);
    publisher.subscribe(healthy);
    thrower.awaitSubscribe();

    publisher.submit(1);
    thrower.throwOnCall = true;
    publisher.submit(2);
    publisher.close();

    healthy.awaitComplete();
    assertEquals(2, healthy.nexts);
    assertEquals(1, healthy.completes);

    thrower.awaitError();
    assertEquals(1, thrower.errors);
    assertEquals(1, handlerCalls.get());
}
/**
 * onNext is issued only if requested
 */
public void testRequest1() {
    SubmissionPublisher<Integer> p = basicPublisher();
    // s1 is configured not to request items on its own.
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    p.subscribe(s1);
    s1.awaitSubscribe();
    // assertEquals reports the actual demand on failure, unlike assertTrue(x == 0).
    assertEquals(0, p.estimateMinimumDemand());

    TestSubscriber s2 = new TestSubscriber();
    p.subscribe(s2);
    p.submit(1);
    p.submit(2);
    s2.awaitNext(1);
    assertEquals(0, s1.nexts);

    // Explicit demand from s1 lets items flow to it.
    s1.sn.request(3);
    p.submit(3);
    p.close();

    s2.awaitComplete();
    assertEquals(3, s2.nexts);
    assertEquals(1, s2.completes);
    s1.awaitComplete();
    assertTrue(s1.nexts > 0);
    assertEquals(1, s1.completes);
}
/**
 * Requesting a negative number of items produces an onError carrying
 * an IllegalArgumentException for that subscriber only.
 */
public void testRequest3() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber offender = new TestSubscriber();
    TestSubscriber bystander = new TestSubscriber();
    publisher.subscribe(offender);
    publisher.subscribe(bystander);
    bystander.awaitSubscribe();
    offender.awaitSubscribe();

    offender.sn.request(-1L);
    publisher.submit(1);
    publisher.submit(2);
    publisher.close();

    bystander.awaitComplete();
    assertEquals(2, bystander.nexts);
    assertEquals(1, bystander.completes);

    offender.awaitError();
    assertEquals(1, offender.errors);
    assertTrue(offender.lastError instanceof IllegalArgumentException);
}
/**
 * With a buffer capacity of 1, submit still eventually delivers all
 * twenty items to both subscribers.
 */
public void testCap1Submit() {
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 1);
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    int item = 1;
    while (item <= 20) {
        assertTrue(publisher.estimateMinimumDemand() <= 1);
        assertTrue(publisher.submit(item) >= 0);
        ++item;
    }
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();
    assertEquals(20, second.nexts);
    assertEquals(1, second.completes);
    assertEquals(20, first.nexts);
    assertEquals(1, first.completes);
}