private ZooKeeperCommandExecutor(String replicaId, CommandExecutor delegate, CuratorFramework curator, String zkPath, boolean createPathIfNotExist, File revisionFile, int numWorkers, int maxLogCount, long minLogAgeMillis) { super(replicaId); this.delegate = delegate; this.revisionFile = revisionFile; this.curator = curator; this.zkPath = zkPath; this.createPathIfNotExist = createPathIfNotExist; this.maxLogCount = maxLogCount; this.minLogAgeMillis = minLogAgeMillis; final ThreadPoolExecutor executor = new ThreadPoolExecutor( numWorkers, numWorkers, 60, TimeUnit.SECONDS, new LinkedTransferQueue<>(), new DefaultThreadFactory("zookeeper-command-executor", true)); executor.allowCoreThreadTimeOut(true); this.executor = executor; logWatcher = new PathChildrenCache(curator, absolutePath(LOG_PATH), true); logWatcher.getListenable().addListener(this, MoreExecutors.directExecutor()); oldLogRemover = new OldLogRemover(); leaderSelector = new LeaderSelector(curator, absolutePath(LEADER_PATH), oldLogRemover); leaderSelector.autoRequeue(); }
Collection<Queue<Boolean>> concurrentQueues() { List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>(); queues.add(new ConcurrentLinkedDeque<Boolean>()); queues.add(new ConcurrentLinkedQueue<Boolean>()); queues.add(new ArrayBlockingQueue<Boolean>(count, false)); queues.add(new ArrayBlockingQueue<Boolean>(count, true)); queues.add(new LinkedBlockingQueue<Boolean>()); queues.add(new LinkedBlockingDeque<Boolean>()); queues.add(new LinkedTransferQueue<Boolean>()); // Following additional implementations are available from: // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html // queues.add(new SynchronizedLinkedListQueue<Boolean>()); // Avoid "first fast, second slow" benchmark effect. Collections.shuffle(queues); return queues; }
/**
 * Runs {@code test} against a range of queue implementations and reports the totals.
 *
 * <p>The priority-based queues are ordered by the first character of each string.
 *
 * @param args ignored
 * @throws Error if at least one check failed
 */
public static void main(String[] args) {
    // Orders strings by their first character only.
    final Comparator<String> firstChar = (x, y) -> x.charAt(0) - y.charAt(0);

    test(new PriorityQueue<String>(firstChar));
    test(new PriorityQueue<String>(10, firstChar));
    test(new PriorityBlockingQueue<String>(10, firstChar));
    test(new ArrayBlockingQueue<String>(10));
    test(new LinkedBlockingQueue<String>(10));
    test(new LinkedBlockingDeque<String>(10));
    test(new LinkedTransferQueue<String>());
    test(new ArrayDeque<String>(10));

    System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
    if (failed > 0) {
        throw new Error("Some tests failed");
    }
}
public static void main(String[] args) throws Exception { final int maxConsumers = (args.length > 0) ? Integer.parseInt(args[0]) : 5; pool = Executors.newCachedThreadPool(); for (int i = 1; i <= maxConsumers; i += (i+1) >>> 1) { // Adjust iterations to limit typical single runs to <= 10 ms; // Notably, fair queues get fewer iters. // Unbounded queues can legitimately OOME if iterations // high enough, but we have a sufficiently low limit here. run(new ArrayBlockingQueue<Integer>(100), i, 1000); run(new LinkedBlockingQueue<Integer>(100), i, 1000); run(new LinkedBlockingDeque<Integer>(100), i, 1000); run(new LinkedTransferQueue<Integer>(), i, 700); run(new PriorityBlockingQueue<Integer>(), i, 1000); run(new SynchronousQueue<Integer>(), i, 300); run(new SynchronousQueue<Integer>(true), i, 200); run(new ArrayBlockingQueue<Integer>(100, true), i, 100); } pool.shutdown(); if (! pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS)) throw new Error(); pool = null; }
public static void main(String[] args) throws Exception { final int maxPairs = (args.length > 0) ? Integer.parseInt(args[0]) : 5; int iters = 10000; pool = Executors.newCachedThreadPool(); for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) { // Adjust iterations to limit typical single runs to <= 10 ms; // Notably, fair queues get fewer iters. // Unbounded queues can legitimately OOME if iterations // high enough, but we have a sufficiently low limit here. run(new ArrayBlockingQueue<Integer>(100), i, 500); run(new LinkedBlockingQueue<Integer>(100), i, 1000); run(new LinkedBlockingDeque<Integer>(100), i, 1000); run(new LinkedTransferQueue<Integer>(), i, 1000); run(new PriorityBlockingQueue<Integer>(), i, 1000); run(new SynchronousQueue<Integer>(), i, 400); run(new SynchronousQueue<Integer>(true), i, 300); run(new ArrayBlockingQueue<Integer>(100, true), i, 100); } pool.shutdown(); if (! pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS)) throw new Error(); pool = null; }
Collection<Queue<Integer>> concurrentQueues() { List<Queue<Integer>> queues = new ArrayList<>(); queues.add(new ConcurrentLinkedDeque<Integer>()); queues.add(new ConcurrentLinkedQueue<Integer>()); queues.add(new ArrayBlockingQueue<Integer>(items, false)); //queues.add(new ArrayBlockingQueue<Integer>(count, true)); queues.add(new LinkedBlockingQueue<Integer>()); queues.add(new LinkedBlockingDeque<Integer>()); queues.add(new LinkedTransferQueue<Integer>()); // Following additional implementations are available from: // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html // queues.add(new SynchronizedLinkedListQueue<Integer>()); // Avoid "first fast, second slow" benchmark effect. Collections.shuffle(queues); return queues; }
Collection<Queue<Boolean>> concurrentQueues() { List<Queue<Boolean>> queues = new ArrayList<>(); queues.add(new ConcurrentLinkedDeque<Boolean>()); queues.add(new ConcurrentLinkedQueue<Boolean>()); queues.add(new ArrayBlockingQueue<Boolean>(count, false)); queues.add(new ArrayBlockingQueue<Boolean>(count, true)); queues.add(new LinkedBlockingQueue<Boolean>()); queues.add(new LinkedBlockingDeque<Boolean>()); queues.add(new LinkedTransferQueue<Boolean>()); // Following additional implementations are available from: // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html // queues.add(new SynchronizedLinkedListQueue<Boolean>()); // Avoid "first fast, second slow" benchmark effect. Collections.shuffle(queues); return queues; }
/**
 * Verifies that a traversal operation collapses an arbitrary pattern of dead
 * nodes, a pattern that could normally only arise from a race.
 *
 * <p>Up to five elements are added, a random subset of nodes has its item nulled
 * out directly, and the traversal action is applied. Afterwards the node count must
 * equal the queue size, plus exactly one extra node when the last-added element is
 * no longer present. Every element must be either nulled out or still contained,
 * never both.
 *
 * @param traversalAction the traversal operation under test
 */
@Test(dataProvider = "traversalActions")
public void traversalOperationsCollapseRandomNodes(
        Consumer<LinkedTransferQueue> traversalAction) {
    LinkedTransferQueue q = new LinkedTransferQueue();
    int n = rnd.nextInt(6);
    for (int i = 0; i < n; i++) {
        q.add(i);
    }

    // Kill a random subset of nodes behind the queue's back.
    ArrayList nulledOut = new ArrayList();
    Object p = head(q);
    while (p != null) {
        if (rnd.nextBoolean()) {
            nulledOut.add(item(p));
            ITEM.setVolatile(p, null);
        }
        p = next(p);
    }

    traversalAction.accept(q);

    int c = nodeCount(q);
    // Evaluate size() before contains(), as the original expression did.
    int size = q.size();
    int extraNodes = q.contains(n - 1) ? 0 : 1;
    assertEquals(size, c - extraNodes);

    for (int i = 0; i < n; i++) {
        assertTrue(nulledOut.contains(i) ^ q.contains(i));
    }
}
/**
 * Verifies that poll-style actions leave at most one node of slack at the head.
 *
 * <p>After each action the node count must be 1 when the queue is empty; otherwise
 * it drops by two if the head's item was already null before the action, and stays
 * unchanged if it was not.
 *
 * @param pollAction the removal operation under test
 */
@Test(dataProvider = "pollActions")
public void pollActionsOneNodeSlack(
        Consumer<LinkedTransferQueue> pollAction) {
    LinkedTransferQueue q = new LinkedTransferQueue();
    int n = 1 + rnd.nextInt(5);
    for (int i = 0; i < n; i++) {
        q.add(i);
    }
    assertEquals(nodeCount(q), n + 1);

    for (int i = 0; i < n; i++) {
        int before = nodeCount(q);
        boolean slack = (item(head(q)) == null);
        if (slack) {
            assertNotNull(item(next(head(q))));
        }

        pollAction.accept(q);

        // Count nodes before asking isEmpty(), matching the original evaluation order.
        int after = nodeCount(q);
        int expected;
        if (q.isEmpty()) {
            expected = 1;
        } else if (slack) {
            expected = before - 2;
        } else {
            expected = before;
        }
        assertEquals(after, expected);
    }
    assertInvariants(q);
}
/**
 * Verifies that add-style actions leave at most one node of slack at the tail.
 *
 * <p>If the tail had a successor before the action, it must have none afterwards;
 * otherwise it must have exactly one successor afterwards.
 *
 * @param addAction the insertion operation under test
 */
@Test(dataProvider = "addActions")
public void addActionsOneNodeSlack(
        Consumer<LinkedTransferQueue> addAction) {
    LinkedTransferQueue q = new LinkedTransferQueue();
    int n = 1 + rnd.nextInt(9);
    for (int round = 0; round < n; round++) {
        boolean slack = (next(tail(q)) != null);

        addAction.accept(q);

        if (!slack) {
            // The tail was exact: it now lags by exactly one node.
            assertNotNull(next(tail(q)));
            assertNull(next(next(tail(q))));
        } else {
            // The tail was lagging: it has caught up.
            assertNull(next(tail(q)));
        }
        assertInvariants(q);
    }
}
/**
 * A queue constructed from a collection contains all of that collection's
 * elements, in the same order.
 */
public void testConstructor5() {
    Integer[] ints = new Integer[SIZE];
    for (int idx = 0; idx < SIZE; ++idx) {
        ints[idx] = idx;
    }
    List intList = Arrays.asList(ints);
    LinkedTransferQueue q = new LinkedTransferQueue(intList);

    assertEquals(q.size(), intList.size());
    assertEquals(q.toString(), intList.toString());

    // All three toArray flavours must agree with the source list.
    assertTrue(Arrays.equals(q.toArray(),
                             intList.toArray()));
    assertTrue(Arrays.equals(q.toArray(new Object[0]),
                             intList.toArray(new Object[0])));
    assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
                             intList.toArray(new Object[SIZE])));

    // Elements come back out in insertion order.
    for (Integer expected : ints) {
        assertEquals(expected, q.poll());
    }
}
/**
 * retainAll(c) keeps only the elements that are also in c, and returns true
 * exactly when the queue was modified.
 */
public void testRetainAll() {
    LinkedTransferQueue q = populatedQueue(SIZE);
    LinkedTransferQueue p = populatedQueue(SIZE);
    for (int round = 0; round < SIZE; ++round) {
        boolean changed = q.retainAll(p);
        // Only the first round starts with identical contents.
        if (round > 0) {
            assertTrue(changed);
        } else {
            assertFalse(changed);
        }
        assertTrue(q.containsAll(p));
        assertEquals(SIZE - round, q.size());
        p.remove();
    }
}
/**
 * The iterator visits every element: once while the queue is left intact, and
 * once while each visited element is also removed with take().
 */
public void testIterator() throws InterruptedException {
    LinkedTransferQueue q = populatedQueue(SIZE);

    // First pass: every iterated element is contained in the queue.
    Iterator it = q.iterator();
    int visited = 0;
    while (it.hasNext()) {
        assertTrue(q.contains(it.next()));
        visited++;
    }
    assertEquals(visited, SIZE);
    assertIteratorExhausted(it);

    // Second pass: iteration order matches removal order.
    it = q.iterator();
    visited = 0;
    while (it.hasNext()) {
        assertEquals(it.next(), q.take());
        visited++;
    }
    assertEquals(visited, SIZE);
    assertIteratorExhausted(it);
}
/**
 * iterator.remove() removes the element most recently returned by next().
 */
public void testIteratorRemove() {
    final LinkedTransferQueue q = new LinkedTransferQueue();
    q.add(two);
    q.add(one);
    q.add(three);

    // Drop the first element through an iterator.
    Iterator remover = q.iterator();
    remover.next();
    remover.remove();

    // A fresh iterator sees only the two remaining elements, in order.
    Iterator survivors = q.iterator();
    assertSame(survivors.next(), one);
    assertSame(survivors.next(), three);
    assertFalse(survivors.hasNext());
}
/**
 * A timed offer in one executor task hands its element to a take in another.
 */
public void testOfferInExecutor() {
    final LinkedTransferQueue q = new LinkedTransferQueue();
    final CheckedBarrier threadsStarted = new CheckedBarrier(2);
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    try (PoolCleaner cleaner = cleaner(executor)) {
        // Producer: offers once both tasks are running.
        CheckedRunnable producer = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadsStarted.await();
                long startTime = System.nanoTime();
                assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
            }};
        executor.execute(producer);

        // Consumer: takes the offered element and expects the queue to be empty.
        CheckedRunnable consumer = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadsStarted.await();
                assertSame(one, q.take());
                checkEmpty(q);
            }};
        executor.execute(consumer);
    }
}
/**
 * A timed poll in one executor task receives the element put by another.
 */
public void testPollInExecutor() {
    final LinkedTransferQueue q = new LinkedTransferQueue();
    final CheckedBarrier threadsStarted = new CheckedBarrier(2);
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    try (PoolCleaner cleaner = cleaner(executor)) {
        // Consumer: sees nothing at first, then receives the element via timed poll.
        CheckedRunnable consumer = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                assertNull(q.poll());
                threadsStarted.await();
                long startTime = System.nanoTime();
                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
                checkEmpty(q);
            }};
        executor.execute(consumer);

        // Producer: puts the element once both tasks are running.
        CheckedRunnable producer = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadsStarted.await();
                q.put(one);
            }};
        executor.execute(producer);
    }
}
/**
 * drainTo(c) moves every element of the queue into collection c, in order.
 */
public void testDrainTo() {
    LinkedTransferQueue q = populatedQueue(SIZE);
    ArrayList drained = new ArrayList();

    // Drain a fully populated queue.
    q.drainTo(drained);
    assertEquals(0, q.size());
    assertEquals(SIZE, drained.size());
    for (int idx = 0; idx < SIZE; ++idx) {
        assertEquals(idx, drained.get(idx));
    }

    // Refill with two elements and drain again.
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));
    drained.clear();
    q.drainTo(drained);
    assertEquals(0, q.size());
    assertEquals(2, drained.size());
    for (int idx = 0; idx < 2; ++idx) {
        assertEquals(idx, drained.get(idx));
    }
}
/**
 * drainTo(c, n) moves the first min(n, size) elements of the queue into c.
 */
public void testDrainToN() {
    LinkedTransferQueue q = new LinkedTransferQueue();
    for (int limit = 0; limit < SIZE + 2; ++limit) {
        for (int j = 0; j < SIZE; j++) {
            assertTrue(q.offer(j));
        }

        ArrayList drained = new ArrayList();
        q.drainTo(drained, limit);

        int k = Math.min(limit, SIZE);
        assertEquals(k, drained.size());
        assertEquals(SIZE - k, q.size());
        for (int j = 0; j < k; ++j) {
            assertEquals(j, drained.get(j));
        }

        // Empty the queue before the next round.
        while (q.poll() != null) {
        }
    }
}
/**
 * transfer waits until a poll occurs. The transferred element is returned by the
 * associated poll.
 */
public void testTransfer2() throws InterruptedException {
    final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
    final CountDownLatch threadStarted = new CountDownLatch(1);

    Thread transferrer = newStartedThread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            threadStarted.countDown();
            q.transfer(five);
            checkEmpty(q);
        }});

    threadStarted.await();

    // Wait until the transferring thread is parked with its single element visible.
    Callable<Boolean> oneElement = () -> !q.isEmpty() && q.size() == 1;
    waitForThreadToEnterWaitState(transferrer, oneElement);

    assertSame(five, q.poll());
    checkEmpty(q);
    awaitTermination(transferrer);
}
/**
 * transfer waits until a poll occurs, at which point the polling thread receives
 * the transferred element.
 */
public void testTransfer4() throws InterruptedException {
    final LinkedTransferQueue q = new LinkedTransferQueue();

    Thread transferrer = newStartedThread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            q.transfer(four);
            assertFalse(q.contains(four));
            assertSame(three, q.poll());
        }});

    // Spin until the transferred element shows up in the queue.
    while (q.isEmpty()) {
        Thread.yield();
    }
    assertFalse(q.isEmpty());
    assertEquals(1, q.size());

    assertTrue(q.offer(three));
    assertSame(four, q.poll());
    awaitTermination(transferrer);
}
/**
 * transfer waits until a take occurs. The transferred element is returned by the
 * associated take.
 */
public void testTransfer5() throws InterruptedException {
    final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();

    Thread transferrer = newStartedThread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            q.transfer(four);
            checkEmpty(q);
        }});

    // Spin until the transferred element shows up in the queue.
    while (q.isEmpty()) {
        Thread.yield();
    }
    assertFalse(q.isEmpty());
    assertEquals(1, q.size());

    assertSame(four, q.take());
    checkEmpty(q);
    awaitTermination(transferrer);
}
/**
 * If a consumer is waiting in take, tryTransfer returns true and hands the object
 * over to it.
 */
public void testTryTransfer4() throws InterruptedException {
    final Object hotPotato = new Object();
    final LinkedTransferQueue q = new LinkedTransferQueue();

    Thread producer = newStartedThread(new CheckedRunnable() {
        public void realRun() {
            // Spin until the main thread is blocked in take().
            while (!q.hasWaitingConsumer()) {
                Thread.yield();
            }
            assertTrue(q.hasWaitingConsumer());
            checkEmpty(q);
            assertTrue(q.tryTransfer(hotPotato));
        }});

    assertSame(q.take(), hotPotato);
    checkEmpty(q);
    awaitTermination(producer);
}
/**
 * A timed tryTransfer with no consumer gives up after the timeout and returns
 * false, leaving the queue empty.
 */
public void testTryTransfer6() throws InterruptedException {
    final LinkedTransferQueue q = new LinkedTransferQueue();

    Thread producer = newStartedThread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            long startTime = System.nanoTime();
            boolean transferred =
                q.tryTransfer(new Object(), timeoutMillis(), MILLISECONDS);
            assertFalse(transferred);
            // The call must have waited for at least the full timeout.
            assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
            checkEmpty(q);
        }});

    awaitTermination(producer);
    checkEmpty(q);
}
/**
 * tryTransfer waits for any elements already in the queue to be removed before
 * transferring its own element to a poll or take.
 */
public void testTryTransfer7() throws InterruptedException {
    final LinkedTransferQueue q = new LinkedTransferQueue();
    assertTrue(q.offer(four));

    Thread producer = newStartedThread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            long startTime = System.nanoTime();
            assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
            assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
            checkEmpty(q);
        }});

    // Spin until the transferred element is queued behind the offered one.
    while (q.size() != 2) {
        Thread.yield();
    }
    assertEquals(2, q.size());

    // Elements leave in FIFO order: the offered one first, then the transferred one.
    assertSame(four, q.poll());
    assertSame(five, q.poll());
    checkEmpty(q);
    awaitTermination(producer);
}
/**
 * Creates a helper bound to the given queue and role flags.
 *
 * @param queueIn the queue this helper operates on
 * @param isConsumerIn value stored in {@code isConsumer}
 * @param isProducerIn value stored in {@code isProducer}
 * @param expectInterruptIn value stored in {@code expectInterrupt}
 */
public LinkedTransferQueueTestHelper(LinkedTransferQueue<GenericTestObject> queueIn,
                                     boolean isConsumerIn,
                                     boolean isProducerIn,
                                     boolean expectInterruptIn) {
    // Role flags.
    isProducer = isProducerIn;
    isConsumer = isConsumerIn;
    expectInterrupt = expectInterruptIn;

    // Shared queue.
    queue = queueIn;
}
/**
 * Creates a handler configured from the given options.
 *
 * <p>When the options request parallel processing, a latch and a transfer queue are
 * created and this handler is started on a daemon thread named
 * {@code DataFrameCsvReaderThread}.
 *
 * @param options the CSV source options
 */
CsvRequestHandler(CsvSourceOptions<R> options) {
    this.options = options;
    this.rowPredicate = options.getRowPredicate().orElse(null);
    this.rowKeyParser = options.getRowKeyParser().orElse(null);
    this.logBatchSize = options.getLogBatchSize();

    if (!options.isParallel()) {
        return;
    }

    this.countDownLatch = new CountDownLatch(1);
    this.queue = new LinkedTransferQueue<>();

    final Thread reader = new Thread(this, "DataFrameCsvReaderThread");
    reader.setDaemon(true);
    reader.start();
}
/**
 * Runs the superclass initialization, creates a fresh, empty buffer queue, and
 * prints a confirmation message to standard output.
 */
@Override
public void initialize() {
    super.initialize();

    queue = new LinkedTransferQueue<String>();

    System.out.printf("Test: The test has been initialized\n");
}
@Test(dataProvider = "spliteratorTraversers") public void testQueue(String desc, Consumer<Queue<String>> c) throws InterruptedException { AtomicBoolean done = new AtomicBoolean(false); Queue<String> msgs = new LinkedTransferQueue<>(); CompletableFuture<Void> traversalTask = CompletableFuture.runAsync(() -> { while (!done.get()) { // Traversal will fail if self-linked nodes of // LinkedTransferQueue are erroneously reported c.accept(msgs); } }); CompletableFuture<Void> addAndRemoveTask = CompletableFuture.runAsync(() -> { while (!traversalTask.isDone()) { msgs.add("msg"); msgs.remove("msg"); } }); Thread.sleep(TimeUnit.SECONDS.toMillis(1)); done.set(true); addAndRemoveTask.join(); Assert.assertTrue(traversalTask.isDone()); traversalTask.join(); }
/**
 * Creates a handler configured from the given options.
 *
 * <p>When the options request parallel processing, a latch and a transfer queue are
 * created and this handler is started on a daemon thread named
 * {@code DataFrameExcelReaderThread}.
 *
 * @param options the Excel source options
 */
ExcelSheetContentHandler(ExcelSourceOptions<R> options) {
    this.options = options;
    this.rowPredicate = options.getRowPredicate().orElse(null);
    this.rowKeyParser = options.getRowKeyParser().orElse(null);
    this.logBatchSize = options.getLogBatchSize();
    this.output = new ParserOutput(new CsvParserSettings());

    if (!options.isParallel()) {
        return;
    }

    this.countDownLatch = new CountDownLatch(1);
    this.queue = new LinkedTransferQueue<>();

    final Thread reader = new Thread(this, "DataFrameExcelReaderThread");
    reader.setDaemon(true);
    reader.start();
}
/**
 * Runs the single-queue {@code test} overload against each blocking queue
 * implementation, using both unbounded and capacity-2000 variants where available.
 *
 * @param args ignored
 * @throws Throwable if any individual test throws
 */
void test(String[] args) throws Throwable {
    // Linked queues: unbounded, then bounded.
    test(new LinkedBlockingQueue());
    test(new LinkedBlockingQueue(2000));

    // Linked deques: unbounded, then bounded.
    test(new LinkedBlockingDeque());
    test(new LinkedBlockingDeque(2000));

    test(new ArrayBlockingQueue(2000));
    test(new LinkedTransferQueue());
}
/**
 * Runs {@code test} against each blocking queue implementation in turn.
 *
 * @throws Throwable if any individual test throws
 */
void main() throws Throwable {
    // Bounded (capacity 10) and unbounded linked/array queues.
    test(new LinkedBlockingDeque(10));
    test(new LinkedBlockingQueue(10));
    test(new LinkedTransferQueue());
    test(new ArrayBlockingQueue(10));
    test(new PriorityBlockingQueue());

    // Synchronous queues: default constructor, then fair mode.
    test(new SynchronousQueue());
    test(new SynchronousQueue(true));
}
/**
 * Runs {@code testQueue} against each blocking queue implementation in turn.
 *
 * @param args ignored
 * @throws Throwable if any individual test throws
 */
void test(String[] args) throws Throwable {
    testQueue(new LinkedBlockingQueue<Integer>());
    testQueue(new LinkedBlockingDeque<Integer>());

    // Array-backed queue of capacity 10: fair first, then non-fair.
    testQueue(new ArrayBlockingQueue<Integer>(10, true));
    testQueue(new ArrayBlockingQueue<Integer>(10, false));

    testQueue(new LinkedTransferQueue<Integer>());
    testQueue(new PriorityBlockingQueue<Integer>());
}
/**
 * Runs the single-queue {@code test} overload against each concurrent queue
 * implementation, using both unbounded and capacity-20 variants where available.
 *
 * @param args ignored
 * @throws Throwable if any individual test throws
 */
void test(String[] args) throws Throwable {
    // Blocking linked queues and deques: unbounded, then bounded.
    test(new LinkedBlockingQueue());
    test(new LinkedBlockingQueue(20));
    test(new LinkedBlockingDeque());
    test(new LinkedBlockingDeque(20));

    // Non-blocking linked structures.
    test(new ConcurrentLinkedDeque());
    test(new ConcurrentLinkedQueue());

    test(new LinkedTransferQueue());
    test(new ArrayBlockingQueue(20));
}
/**
 * Runs {@code testQueue} against each concurrent queue implementation in turn.
 *
 * @param args ignored
 * @throws Throwable if any individual test throws
 */
void test(String[] args) throws Throwable {
    // Blocking linked queues and deques: capacity 10, then unbounded.
    testQueue(new LinkedBlockingQueue(10));
    testQueue(new LinkedBlockingQueue());
    testQueue(new LinkedBlockingDeque(10));
    testQueue(new LinkedBlockingDeque());

    testQueue(new ArrayBlockingQueue(10));
    testQueue(new PriorityBlockingQueue(10));

    // Non-blocking linked structures.
    testQueue(new ConcurrentLinkedDeque());
    testQueue(new ConcurrentLinkedQueue());

    testQueue(new LinkedTransferQueue());
}
/**
 * Counts the nodes visited while walking the queue's internal chain from its head.
 *
 * <p>Whenever a node turns out to be its own successor, the walk continues from
 * the current head; that node has already been counted.
 *
 * @param q the queue to inspect
 * @return the number of nodes visited before reaching a null successor
 */
int nodeCount(LinkedTransferQueue q) {
    int visited = 0;
    Object p = head(q);
    while (p != null) {
        visited++;
        Object succ = next(p);
        // A self-linked node sends the walk back to the current head.
        p = (succ == p) ? head(q) : succ;
    }
    return visited;
}
/**
 * Counts the nodes visited while walking the queue's internal chain from its tail.
 *
 * <p>Whenever a node turns out to be its own successor, the walk continues from
 * the current head; that node has already been counted.
 *
 * @param q the queue to inspect
 * @return the number of nodes visited before reaching a null successor
 */
int tailCount(LinkedTransferQueue q) {
    int visited = 0;
    Object p = tail(q);
    while (p != null) {
        visited++;
        Object succ = next(p);
        // A self-linked node sends the walk back to the current head.
        p = (succ == p) ? head(q) : succ;
    }
    return visited;
}
/**
 * Finds the first node, walking from the head, whose item is non-null and equal
 * to {@code e}.
 *
 * <p>Whenever a node turns out to be its own successor, the walk continues from
 * the current head.
 *
 * @param q the queue to search
 * @param e the element to look for; must not be null
 * @return the matching node
 * @throws AssertionError if the walk ends without finding a match
 */
Object findNode(LinkedTransferQueue q, Object e) {
    Object p = head(q);
    while (p != null) {
        if (item(p) != null && e.equals(item(p))) {
            return p;
        }
        Object succ = next(p);
        // A self-linked node sends the walk back to the current head.
        p = (succ == p) ? head(q) : succ;
    }
    throw new AssertionError("not found");
}
/**
 * Checks the node chain of a queue across one add followed by one remove:
 * a fresh queue has a single item-less node, adding an element yields two nodes,
 * and removing it brings the count back to one.
 */
@Test
public void addRemove() {
    LinkedTransferQueue q = new LinkedTransferQueue();

    // Fresh queue: a lone head node with no item and no successor.
    assertInvariants(q);
    assertNull(next(head(q)));
    assertNull(item(head(q)));

    // After one add.
    q.add(1);
    assertEquals(nodeCount(q), 2);
    assertInvariants(q);

    // After removing that element again.
    q.remove(1);
    assertEquals(nodeCount(q), 1);
    assertInvariants(q);
}
/**
 * Supplies traversal actions that visit every node without changing the queue's
 * contents, but have the side effect of squeezing out dead nodes.
 *
 * @return one single-element row per action, for use as a TestNG data provider
 */
@DataProvider
public Object[][] traversalActions() {
    List<Consumer<LinkedTransferQueue>> actions =
        List.<Consumer<LinkedTransferQueue>>of(
            q -> q.forEach(e -> {}),
            q -> assertFalse(q.contains(new Object())),
            q -> assertFalse(q.remove(new Object())),
            q -> q.spliterator().forEachRemaining(e -> {}),
            q -> q.stream().collect(toList()),
            q -> assertFalse(q.removeIf(e -> false)),
            q -> assertFalse(q.removeAll(List.of())));

    // Wrap each action in its own argument row.
    return actions.stream()
        .map(x -> new Object[]{ x })
        .toArray(Object[][]::new);
}
/**
 * Verifies that a traversal operation collapses the leading node: after adding
 * {@code n} elements the chain has {@code n + 1} nodes, and after the traversal it
 * has {@code n}, with the previous head self-linked.
 *
 * @param traversalAction the traversal operation under test
 */
@Test(dataProvider = "traversalActions")
public void traversalOperationsCollapseLeadingNodes(
        Consumer<LinkedTransferQueue> traversalAction) {
    LinkedTransferQueue q = new LinkedTransferQueue();
    int n = 1 + rnd.nextInt(5);
    for (int i = 0; i < n; i++) {
        q.add(i);
    }
    assertEquals(nodeCount(q), n + 1);

    // Remember the head so we can check it was unlinked.
    Object oldHead = head(q);

    traversalAction.accept(q);

    assertInvariants(q);
    assertEquals(nodeCount(q), n);
    assertIsSelfLinked(oldHead);
}