/** * 版本3.0.9以下存在多线程初始化问题,这个类作为一个样例 */ public static void main(String[] args) throws InterruptedException, BrokenBarrierException { System.out.println(Arrays.asList(splitWord)); for (int j = 0; j < 1000; j++) { CyclicBarrier barrier = new CyclicBarrier(11, null); for (int i = 0; i < 10; i++) { Thread t = new Thread(new Worker(barrier), "t" + i); t.start(); } Thread.sleep(500); barrier.await(); while (barrier.getNumberWaiting() > 0) { Thread.sleep(1000); } Thread.sleep(1000); System.out.println(Arrays.asList(splitWord)); } }
/** * A reset before threads enter barrier does not throw * BrokenBarrierException */ public void testReset_NoBrokenBarrier() throws Exception { final CyclicBarrier c = new CyclicBarrier(3); c.reset(); Thread t1 = newStartedThread(new CheckedRunnable() { public void realRun() throws Exception { c.await(); }}); Thread t2 = newStartedThread(new CheckedRunnable() { public void realRun() throws Exception { c.await(); }}); c.await(); awaitTermination(t1); awaitTermination(t2); }
@Test(timeout=20000) public void testCommitJobFailsJob() throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); AsyncDispatcher dispatcher = new AsyncDispatcher(); dispatcher.init(conf); dispatcher.start(); CyclicBarrier syncBarrier = new CyclicBarrier(2); OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false); CommitterEventHandler commitHandler = createCommitterEventHandler(dispatcher, committer); commitHandler.init(conf); commitHandler.start(); JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); completeJobTasks(job); assertJobState(job, JobStateInternal.COMMITTING); // let the committer fail and verify the job fails syncBarrier.await(); assertJobState(job, JobStateInternal.FAILED); dispatcher.stop(); commitHandler.stop(); }
public static void main(String[] args) throws InterruptedException { final int N=10; Thread[] allSoldier=new Thread[N]; boolean flag=false; CyclicBarrier cyclic=new CyclicBarrier(N,new BarrierRun(flag,N)); System.out.println("集合队伍!"); for(int i=0;i<N;++i){ System.out.println("士兵 "+i+"报道!"); allSoldier[i]=new Thread(new Soldier(cyclic,"士兵"+i)); allSoldier[i].start(); // if(i==5) {这个中断会引起一个interrupt()和n个BrokenBarrierException异常。意思是栅栏破坏掉了, // 线程永远无法完成栅栏 // allSoldier[1].interrupt(); // } } }
/** * Blocks the named executor by getting its only thread running a task blocked on a CyclicBarrier and fills the queue with a noop task. * So requests to use this queue should get {@link EsRejectedExecutionException}s. */ private CyclicBarrier blockExecutor(String name) throws Exception { ThreadPool threadPool = getInstanceFromNode(ThreadPool.class); CyclicBarrier barrier = new CyclicBarrier(2); logger.info("Blocking the [{}] executor", name); threadPool.executor(name).execute(() -> { try { threadPool.executor(name).execute(() -> {}); barrier.await(); logger.info("Blocked the [{}] executor", name); barrier.await(); logger.info("Unblocking the [{}] executor", name); } catch (Exception e) { throw new RuntimeException(e); } }); barrier.await(); blockedExecutors.add(barrier); return barrier; }
public void testCustomScheduler_deadlock() throws InterruptedException, BrokenBarrierException { final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2); // This will flakily deadlock, so run it multiple times to increase the flake likelihood for (int i = 0; i < 1000; i++) { Service service = new AbstractScheduledService() { @Override protected void runOneIteration() {} @Override protected Scheduler scheduler() { return new CustomScheduler() { @Override protected Schedule getNextSchedule() throws Exception { if (state() != State.STARTING) { inGetNextSchedule.await(); Thread.yield(); throw new RuntimeException("boom"); } return new Schedule(0, TimeUnit.NANOSECONDS); } }; } }; service.startAsync().awaitRunning(); inGetNextSchedule.await(); service.stopAsync(); } }
static void test(int i, int nkeys, String[] key, Class mapClass) throws Exception { System.out.print("Threads: " + i + "\t:"); Map map = (Map) mapClass.newInstance(); // Uncomment to start with a non-empty table // for (int j = 0; j < nkeys; j += 4) // start 1/4 occupied // map.put(key[j], key[j]); LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); CyclicBarrier barrier = new CyclicBarrier(i + 1, timer); for (int t = 0; t < i; ++t) pool.execute(new Runner(t, map, key, barrier)); barrier.await(); barrier.await(); long time = timer.getTime(); long tpo = time / (i * (long) nops); System.out.print(LoopHelpers.rightJustify(tpo) + " ns per op"); double secs = (double) (time) / 1000000000.0; System.out.println("\t " + secs + "s run time"); map.clear(); }
private SomeFeed(int threadCount, boolean barriered) { this.threadCount = threadCount; this.barriered = barriered; if (barriered) { barrier = new CyclicBarrier(threadCount, System.out::println); } launchPublishers(); SomeListener tickOutputter = new SomeListener() { @Override public void priceTick(PriceTick event) { System.out.println("."); } @Override public void error(Throwable throwable) { } }; // register(tickOutputter); }
public SimpleProcessorBenchmarkClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout, CyclicBarrier barrier, CountDownLatch latch, long startTime, long endTime){ this.targetIP = targetIP; this.targetPort = targetPort; this.clientNums = clientNums; this.rpcTimeout = rpcTimeout; this.barrier = barrier; this.latch = latch; this.startTime = startTime; this.endTime = endTime; maxRange = (Integer.parseInt(String.valueOf((endTime - startTime))) / 1000) + 1; errorTPS = new long[maxRange]; errorResponseTimes = new long[maxRange]; tps = new long[maxRange]; responseTimes = new long[maxRange]; // init for (int i = 0; i < maxRange; i++) { errorTPS[i] = 0; errorResponseTimes[i] = 0; tps[i] = 0; responseTimes[i] = 0; } }
/** * adds by multiple threads produce correct sum */ public void testAddAndSumMT() throws Throwable { final int incs = 1000000; final int nthreads = 4; final ExecutorService pool = Executors.newCachedThreadPool(); LongAdder a = new LongAdder(); CyclicBarrier barrier = new CyclicBarrier(nthreads + 1); for (int i = 0; i < nthreads; ++i) pool.execute(new AdderTask(a, barrier, incs)); barrier.await(); barrier.await(); long total = (long)nthreads * incs; long sum = a.sum(); assertEquals(sum, total); pool.shutdown(); }
@Test public void eachThreadGetsDifferentGlobalTxId() throws Exception { CyclicBarrier barrier = new CyclicBarrier(2); Runnable runnable = exceptionalRunnable(() -> { String txId = UUID.randomUUID().toString(); omegaContext.setGlobalTxId(txId); barrier.await(); assertThat(omegaContext.globalTxId(), is(txId)); }); CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable); CompletableFuture<Void> future2 = CompletableFuture.runAsync(runnable); CompletableFuture.allOf(future1, future2).join(); }
@Test(timeout = 5000) public void invokeEitherPostInterceptOrOnTimeoutConcurrently() throws Exception { List<Future<?>> futures = new LinkedList<>(); for (int i = 0; i < runningCounts; i++) { TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying); CyclicBarrier cyclicBarrier = new CyclicBarrier(2); futures.add(executorService.submit(() -> { waitForSignal(cyclicBarrier); interceptor.postIntercept(localTxId, signature); })); futures.add(executorService.submit(() -> { waitForSignal(cyclicBarrier); interceptor.onTimeout(localTxId, signature, timeoutException); })); } waitTillAllDone(futures); assertThat(postInterceptInvoked.get() + onTimeoutInvoked.get(), is(runningCounts)); }
@Test(timeout = 5000) public void invokeEitherOnErrorOrOnTimeoutConcurrently() throws Exception { RuntimeException oops = new RuntimeException("oops"); List<Future<?>> futures = new LinkedList<>(); for (int i = 0; i < runningCounts; i++) { TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying); CyclicBarrier cyclicBarrier = new CyclicBarrier(2); futures.add(executorService.submit(() -> { waitForSignal(cyclicBarrier); interceptor.onError(localTxId, signature, oops); })); futures.add(executorService.submit(() -> { waitForSignal(cyclicBarrier); interceptor.onTimeout(localTxId, signature, timeoutException); })); } waitTillAllDone(futures); assertThat(onErrorInvoked.get() + onTimeoutInvoked.get(), is(runningCounts)); }
public AbstractClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout, CyclicBarrier barrier, CountDownLatch latch, long startTime, long endTime){ this.barrier = barrier; this.latch = latch; this.startTime = startTime; this.endTime = endTime; serviceFactory.setTargetIP(targetIP); serviceFactory.setClientNums(clientNums); serviceFactory.setTargetPort(targetPort); serviceFactory.setConnectTimeout(rpcTimeout); maxRange = (Integer.parseInt(String.valueOf((endTime - startTime))) / 1000000) + 1; errorTPS = new long[maxRange]; errorResponseTimes = new long[maxRange]; tps = new long[maxRange]; responseTimes = new long[maxRange]; // init for (int i = 0; i < maxRange; i++) { errorTPS[i] = 0; errorResponseTimes[i] = 0; tps[i] = 0; responseTimes[i] = 0; } }
public BidClientRunnable(String protocol, String serialization, String targetIP, int targetPort, int clientNums, int rpcTimeout, CyclicBarrier barrier, CountDownLatch latch, long startTime, long endTime){ super(protocol, serialization, targetIP, targetPort, clientNums, rpcTimeout, barrier, latch, startTime, endTime); Impression imp = new Impression(); imp.setBidFloor(1.1); imp.setId("abc"); List<Impression> imps = new ArrayList<Impression>(1); imps.add(imp); request.setImpressions(imps); Geo geo = new Geo(); geo.setCity("beijing"); geo.setCountry("china"); geo.setLat(100.1f); geo.setLon(100.1f); Device device = new Device(); device.setMake("apple"); device.setOs("ios"); device.setVersion("7.0"); device.setLang("zh_CN"); device.setModel("iphone"); device.setGeo(geo); request.setDevice(device); }
public AbstractClientRunnable(String protocol, String serialization, String targetIP, int targetPort, int clientNums, int rpcTimeout, CyclicBarrier barrier, CountDownLatch latch, long startTime, long endTime){ this.barrier = barrier; this.latch = latch; this.startTime = startTime; this.endTime = endTime; serviceFactory.setProtocol(protocol); serviceFactory.setTargetIP(targetIP); serviceFactory.setClientNums(clientNums); serviceFactory.setTargetPort(targetPort); serviceFactory.setConnectTimeout(rpcTimeout); serviceFactory.setSerialization(serialization); maxRange = (Integer.parseInt(String.valueOf((endTime - startTime))) / 1000000) + 1; errorTPS = new long[maxRange]; errorResponseTimes = new long[maxRange]; tps = new long[maxRange]; responseTimes = new long[maxRange]; // init for (int i = 0; i < maxRange; i++) { errorTPS[i] = 0; errorResponseTimes[i] = 0; tps[i] = 0; responseTimes[i] = 0; } }
public void await() { if(0 == delay) { log.info("No pause between retry"); return; } final Timer wakeup = new Timer(); final CyclicBarrier wait = new CyclicBarrier(2); // Schedule for immediate execution with an interval of 1s wakeup.scheduleAtFixedRate(new PauserTimerTask(wait), 0, 1000); try { // Wait for notify from wakeup timer wait.await(); } catch(InterruptedException | BrokenBarrierException e) { log.error(e.getMessage(), e); } }
synchronized void makeBarrier() { if (numPlayers < 1) { log.warning("cannot make barrier for " + numPlayers + " viewers - something is wrong"); log.warning("disabling sychronized playback because probably multiple viewers are active but we are not playing set of sychronized files"); outer.getToggleSyncEnabledAction().actionPerformed(null); // toggle all the viewers syncenabled menu item // JOptionPane.showMessageDialog(null,"Disabled sychronized playback because files are not part of sychronized set"); return; } log.info("making barrier for " + this); barrier = new CyclicBarrier(numPlayers, new Runnable() { public void run() { // this is run after await synchronization; it updates the time to read events from each AEInputStream // log.info(Thread.currentThread()+" resetting barrier"); barrier.reset(); setTime(getTime() + getTimesliceUs()); } }); }
static void oneRun(int nthreads, int iters) throws Exception { LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); CyclicBarrier barrier = new CyclicBarrier(nthreads + 1, timer); Exchanger<Int> l = null; Exchanger<Int> r = new Exchanger<>(); for (int i = 0; i < nthreads; ++i) { pool.execute(new Stage(l, r, barrier, iters)); l = r; r = (i+2 < nthreads) ? new Exchanger<Int>() : null; } barrier.await(); barrier.await(); long time = timer.getTime(); if (print) System.out.println(LoopHelpers.rightJustify(time / (iters * nthreads + iters * (nthreads-2))) + " ns per transfer"); }
@Test public void startsNewChildSpan() throws Exception { CyclicBarrier cyclicBarrier = new CyclicBarrier(nThreads); CompletableFuture<?>[] futures = new CompletableFuture[nThreads]; for (int i = 0; i < nThreads; i++) { futures[i] = CompletableFuture.runAsync(() -> { Span currentSpan = tracing.tracer().newTrace().start(); waitTillAllAreReady(cyclicBarrier); try (SpanInScope spanInScope = tracing.tracer().withSpanInScope(currentSpan)) { assertThat(tracingAdviser.invoke(spanName, path, supplier), is(expected)); } catch (Throwable throwable) { fail(throwable.getMessage()); } finally { currentSpan.finish(); } }, Executors.newFixedThreadPool(nThreads)); } CompletableFuture.allOf(futures).join(); assertThat(traces.size(), is(nThreads)); for (Queue<zipkin2.Span> queue : traces.values()) { zipkin2.Span child = queue.poll(); assertThat(child.name(), is(spanName)); zipkin2.Span parent = queue.poll(); assertThat(child.parentId(), is(parent.id())); assertThat(child.traceId(), is(parent.traceId())); assertThat(tracedValues(child), contains(this.getClass().getCanonicalName())); } }
public void testRebuildNoDeadlock() throws Exception { CyclicBarrier barrier= new CyclicBarrier(2); final Task1 task1 = new Task1(); Thread t1 = new Thread(task1, "Thread 1"); final Task2 task2 = new Task2(barrier); Thread t2 = new Thread(task2, "Thread 2"); handler.setBarrier(barrier); t1.start(); t2.start(); t1.join(60000); t2.join(60000); // wait max 1 min for the test to finish assertTrue(task1.done); assertTrue(task2.done); }
private static Awaiter awaiter(final CyclicBarrier barrier, final long millis) { return new Awaiter() { public void run() { toTheStartingGate(); try { barrier.await(millis, MILLISECONDS); } catch (Throwable result) { result(result); }}}; }
public static void main(String[] args) { int num = 10; CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() { @Override public void run() { // TODO Auto-generated method stub System.out.println("go on together!"); } }); for (int i = 1; i <= num; i++) { new Thread(new CyclicBarrierWorker(i, barrier)).start(); } }
@Test public void getId32() throws Exception { int nThreads = 1000; int size = 1000; CyclicBarrier cyclicBarrier = new CyclicBarrier(nThreads + 1); StopWatch stopWatch = new StopWatch(); ExecutorService executorService = Executors.newFixedThreadPool(nThreads); stopWatch.start(); for (int i = 0; i < nThreads; i++) { int port = (8800 + (i % 10)); executorService.execute(new Runnable() { @Override public void run() { try { cyclicBarrier.await(); for (int j = 0; j < size; j++) { mockMvc.perform( get("/api/snowflake/get-id32?partnerKey=A" + port) .header("Content-Type", "application/json;charset=UTF-8")) .andExpect(status().isOk()) .andExpect(jsonPath("$.code").value(0)) .andExpect(jsonPath("$.data.id").isNumber()); } cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } }); } cyclicBarrier.await(); cyclicBarrier.await(); stopWatch.stop(); System.out.println(stopWatch.prettyPrint()); executorService.shutdown(); }
public void testTaskThrowsError() throws Exception { class MyError extends Error {} final CyclicBarrier barrier = new CyclicBarrier(2); // we need to make sure the error gets thrown on a different thread. ExecutorService service = Executors.newSingleThreadExecutor(); try { final SerializingExecutor executor = new SerializingExecutor(service); Runnable errorTask = new Runnable() { @Override public void run() { throw new MyError(); } }; Runnable barrierTask = new Runnable() { @Override public void run() { try { barrier.await(); } catch (Exception e) { throw new RuntimeException(e); } } }; executor.execute(errorTask); service.execute(barrierTask); // submit directly to the service // the barrier task runs after the error task so we know that the error has been observed by // SerializingExecutor by the time the barrier is satified barrier.await(10, TimeUnit.SECONDS); executor.execute(barrierTask); // timeout means the second task wasn't even tried barrier.await(10, TimeUnit.SECONDS); } finally { service.shutdown(); } }
private void waitForBarrier(CyclicBarrier b) { try { b.await(); } catch (InterruptedException | BrokenBarrierException e) { Assert.fail("Test error: Caught unexpected exception:", e); } }
public void testConcurrentPrimary() throws InterruptedException { Thread[] threads = new Thread[randomIntBetween(2, 5)]; final int opsPerThread = randomIntBetween(10, 20); final int maxOps = opsPerThread * threads.length; final long unFinishedSeq = randomIntBetween(0, maxOps - 2); // make sure we always index the last seqNo to simplify maxSeq checks logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinishedSeq); final CyclicBarrier barrier = new CyclicBarrier(threads.length); for (int t = 0; t < threads.length; t++) { final int threadId = t; threads[t] = new Thread(new AbstractRunnable() { @Override public void onFailure(Exception e) { throw new ElasticsearchException("failure in background thread", e); } @Override protected void doRun() throws Exception { barrier.await(); for (int i = 0; i < opsPerThread; i++) { long seqNo = tracker.generateSeqNo(); logger.info("[t{}] started [{}]", threadId, seqNo); if (seqNo != unFinishedSeq) { tracker.markSeqNoAsCompleted(seqNo); logger.info("[t{}] completed [{}]", threadId, seqNo); } } } }, "testConcurrentPrimary_" + threadId); threads[t].start(); } for (Thread thread : threads) { thread.join(); } assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L)); assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L)); tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1)); assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); }
public void testConcurrentGetAndFlush() throws Exception { ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); engine.index(indexForDoc(doc)); final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>(); latestGetResult.set(engine.get(new Engine.Get(true, newUid(doc)))); final AtomicBoolean flushFinished = new AtomicBoolean(false); final CyclicBarrier barrier = new CyclicBarrier(2); Thread getThread = new Thread(() -> { try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException(e); } while (flushFinished.get() == false) { Engine.GetResult previousGetResult = latestGetResult.get(); if (previousGetResult != null) { previousGetResult.release(); } latestGetResult.set(engine.get(new Engine.Get(true, newUid(doc)))); if (latestGetResult.get().exists() == false) { break; } } }); getThread.start(); barrier.await(); engine.flush(); flushFinished.set(true); getThread.join(); assertTrue(latestGetResult.get().exists()); latestGetResult.get().release(); }
@Override public ClientRunnable getClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout, CyclicBarrier barrier, CountDownLatch latch, long endTime, long startTime) { return new SimpleProcessorBenchmarkClientRunnable(targetIP, targetPort, clientNums, rpcTimeout, barrier, latch, startTime, endTime); }
@SuppressWarnings("rawtypes") @Override public ClientRunnable getClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout, CyclicBarrier barrier, CountDownLatch latch, long startTime, long endTime) throws IllegalArgumentException, SecurityException, InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException { String runnable = properties.getProperty("classname"); Class[] parameterTypes = new Class[] { String.class, int.class, int.class, int.class, CyclicBarrier.class, CountDownLatch.class, long.class, long.class }; Object[] parameters = new Object[] { targetIP, targetPort, clientNums, rpcTimeout, barrier, latch, startTime, endTime }; return (ClientRunnable) Class.forName(runnable).getConstructor(parameterTypes).newInstance(parameters); }
public AbstractBenchmarkClientRunnable(BenchmarkService benchmarkService, CyclicBarrier barrier, CountDownLatch latch, long startTime, long endTime) { this.cyclicBarrier = barrier; this.countDownLatch = latch; this.startTime = startTime; this.endTime = endTime; this.benchmarkService = benchmarkService; statisticTime = (int) ((endTime - startTime) / 1000000); statistics = new RunnableStatistics(statisticTime); }
/** * A 1-party barrier triggers after single await */ public void testSingleParty() throws Exception { CyclicBarrier b = new CyclicBarrier(1); assertEquals(1, b.getParties()); assertEquals(0, b.getNumberWaiting()); b.await(); b.await(); assertEquals(0, b.getNumberWaiting()); }
private void waitForSignal(CyclicBarrier cyclicBarrier) { try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { fail(e.getMessage()); } }
/** * Main method of the example * @param args */ public static void main(String[] args) { /* * Initializes the bi-dimensional array of data * 10000 rows * 1000 numbers in each row * Looking for number 5 */ final int ROWS=10000; final int NUMBERS=1000; final int SEARCH=5; final int PARTICIPANTS=5; final int LINES_PARTICIPANT=2000; MatrixMock mock=new MatrixMock(ROWS, NUMBERS,SEARCH); // Initializes the object for the results Results results=new Results(ROWS); // Creates an Grouper object Grouper grouper=new Grouper(results); // Creates the CyclicBarrier object. It has 5 participants and, when // they finish, the CyclicBarrier will execute the grouper object CyclicBarrier barrier=new CyclicBarrier(PARTICIPANTS,grouper); // Creates, initializes and starts 5 Searcher objects Searcher searchers[]=new Searcher[PARTICIPANTS]; for (int i=0; i<PARTICIPANTS; i++){ searchers[i]=new Searcher(i*LINES_PARTICIPANT, (i*LINES_PARTICIPANT)+LINES_PARTICIPANT, mock, results, 5,barrier); Thread thread=new Thread(searchers[i]); thread.start(); } System.out.printf("Main: The main thread has finished.\n"); }
/** * A timeout in timed await throws TimeoutException */ public void testAwait3_TimeoutException() throws InterruptedException { final CyclicBarrier c = new CyclicBarrier(2); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws Exception { long startTime = System.nanoTime(); try { c.await(timeoutMillis(), MILLISECONDS); shouldThrow(); } catch (TimeoutException success) {} assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); }}); awaitTermination(t); }