public static void verifyShardState(final AbstractDataStore datastore, final String shardName, final Consumer<OnDemandShardState> verifier) throws Exception { ActorContext actorContext = datastore.getActorContext(); Future<ActorRef> future = actorContext.findLocalShardAsync(shardName); ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS)); AssertionError lastError = null; Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 5) { OnDemandShardState shardState = (OnDemandShardState)actorContext .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE); try { verifier.accept(shardState); return; } catch (AssertionError e) { lastError = e; Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } } throw lastError; }
@SuppressWarnings("checkstyle:IllegalCatch") private void verifyActorReady(ActorRef actorRef) { // Sometimes we see messages go to dead letters soon after creation - it seems the actor isn't quite // in a state yet to receive messages or isn't actually created yet. This seems to happen with // actorSelection so, to alleviate it, we use an actorSelection and send an Identify message with // retries to ensure it's ready. Timeout timeout = new Timeout(100, TimeUnit.MILLISECONDS); Throwable lastError = null; Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 10) { try { ActorSelection actorSelection = system.actorSelection(actorRef.path().toString()); Future<Object> future = Patterns.ask(actorSelection, new Identify(""), timeout); ActorIdentity reply = (ActorIdentity)Await.result(future, timeout.duration()); Assert.assertNotNull("Identify returned null", reply.getRef()); return; } catch (Exception | AssertionError e) { Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); lastError = e; } } throw new RuntimeException(lastError); }
@Test public void testWaitTillReadyCountDown() { try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) { doReturn(datastoreContext).when(actorContext).getDatastoreContext(); doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout(); doReturn(FiniteDuration.apply(5000, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration(); Executors.newSingleThreadExecutor().submit(() -> { Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); distributedDataStore.getWaitTillReadyCountDownLatch().countDown(); }); long start = System.currentTimeMillis(); distributedDataStore.waitTillReady(); long end = System.currentTimeMillis(); assertTrue("Expected to be released in 500 millis", end - start < 5000); } }
public static void main(String[] args) { Flux<LocalDateTime> flux = Flux.<LocalDateTime>create(e -> { Schedulers.newSingle("brc", true) .schedulePeriodically( () -> { LOGGER.info("calculating..."); e.next(LocalDateTime.now(ZoneOffset.UTC)); }, 0, 100, TimeUnit.MILLISECONDS); }, OverflowStrategy.LATEST).cache(1); flux.blockFirst(); while (true) { LOGGER.info("{}", flux.blockFirst(Duration.ofMillis(0))); Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } }
public static void main(String[] args) { Flux<Integer> flux = Flux.<Integer>push(e -> { // imagine reading from DB row by row or from file line by line for (int fi = 0; fi < 30; fi++) { Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); e.next(fi); } }) .log() .subscribeOn(emitter) .log(); flux.flatMap( // could be some other IO like reading from a second database i -> Mono.fromSupplier(() -> i + " - " + i * 2) .log() .subscribeOn(transformer) .log() .publishOn(consumer)) .log() .collectList() .log().block(); }
@Override public void write(Message message) { Uninterruptibles.awaitUninterruptibly(transportInitialized); LOG.debug("Sending message: {}", message); try { final GelfMessageBuilder messageBuilder = new GelfMessageBuilder(message.getMessage(), message.getSource()) .timestamp(message.getTimestamp().getMillis() / 1000.0) .additionalFields(message.getFields().asMap()); if (message.getLevel() != null) { messageBuilder.level(GelfMessageLevel.valueOf(message.getLevel().toString())); } else { messageBuilder.level(null); } transport.send(messageBuilder.build()); } catch (InterruptedException e) { LOG.error("Failed to send message", e); } }
/** * Close the DomainSocketWatcher and wait for its thread to terminate. * * If there is more than one close, all but the first will be ignored. */ @Override public void close() throws IOException { lock.lock(); try { if (closed) return; if (LOG.isDebugEnabled()) { LOG.debug(this + ": closing"); } closed = true; } finally { lock.unlock(); } // Close notificationSockets[0], so that notificationSockets[1] gets an EOF // event. This will wake up the thread immediately if it is blocked inside // the select() system call. notificationSockets[0].close(); // Wait for the select thread to terminate. Uninterruptibles.joinUninterruptibly(watcherThread); }
/** * Refreshes the value associated with {@code key}, unless another thread is already doing so. * Returns the newly refreshed value associated with {@code key} if it was refreshed inline, or * {@code null} if another thread is performing the refresh or if an error occurs during * refresh. */ @Nullable V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) { final LoadingValueReference<K, V> loadingValueReference = insertLoadingValueReference(key, hash, checkTime); if (loadingValueReference == null) { return null; } ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader); if (result.isDone()) { try { return Uninterruptibles.getUninterruptibly(result); } catch (Throwable t) { // don't let refresh exceptions propagate; error was already logged } } return null; }
@Override public T call() { T result; while (true) { try { result = callable.call(); break; } catch (Throwable e) { if (retryOn.stream().noneMatch(c -> getAllCauses(e).anyMatch(c::isInstance))) { throw Throwables.propagate(e); } log.warn(errorMessage + ", retry in " + delaySec + " sec: " + e.toString()); Uninterruptibles.sleepUninterruptibly(delaySec, TimeUnit.SECONDS); } } return result; }
public void waitForMembersUp(final String... otherMembers) { Set<String> otherMembersSet = Sets.newHashSet(otherMembers); Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 10) { CurrentClusterState state = Cluster.get(getSystem()).state(); for (Member m: state.getMembers()) { if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) && otherMembersSet.isEmpty()) { return; } } Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } fail("Member(s) " + otherMembersSet + " are not Up"); }
private void verifyEmptyBucket(final JavaTestKit testKit, final ActorRef registry, final Address address) throws AssertionError { Map<Address, Bucket<RoutingTable>> buckets; int numTries = 0; while (true) { buckets = retrieveBuckets(registry1, testKit, address); try { verifyBucket(buckets.get(address), Collections.emptyList()); break; } catch (AssertionError e) { if (++numTries >= 50) { throw e; } } Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } }
@Test public void testAssembledMessageStateExpiration() throws IOException { final int expiryDuration = 200; try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration") .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) { final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1); final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2, SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref()); assembler.handleMessage(messageSlice, testProbe.ref()); final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class); assertSuccessfulMessageSliceReply(reply, IDENTIFIER, 1); assertTrue("MessageAssembler should have remove state for " + identifier, assembler.hasState(identifier)); Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS); assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier)); verify(mockFiledBackedStream).cleanup(); } }
@VisibleForTesting AsyncCopier( InputStream source, OutputStream sink, Supplier<Level> ioExceptionLogLevel, CopyStrategy copyStrategy, ExecutorService executorService) { this.source = source; this.sink = sink; this.ioExceptionLogLevel = ioExceptionLogLevel; this.copyStrategy = copyStrategy; // Submit the copy task and wait uninterruptibly, but very briefly, for it to actually start. copyFuture = executorService.submit(new Runnable() { @Override public void run() { copy(); } }); Uninterruptibles.awaitUninterruptibly(copyStarted); }
/** * Stops and removes a volume scanner.<p/> * * This function will block until the volume scanner has stopped. * * @param volume The volume to remove. */ public synchronized void removeVolumeScanner(FsVolumeSpi volume) { if (!isEnabled()) { LOG.debug("Not removing volume scanner for {}, because the block " + "scanner is disabled.", volume.getStorageID()); return; } VolumeScanner scanner = scanners.get(volume.getStorageID()); if (scanner == null) { LOG.warn("No scanner found to remove for volumeId {}", volume.getStorageID()); return; } LOG.info("Removing scanner for volume {} (StorageID {})", volume.getBasePath(), volume.getStorageID()); scanner.shutdown(); scanners.remove(volume.getStorageID()); Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES); }
public static void verifyShardStats(final AbstractDataStore datastore, final String shardName, final ShardStatsVerifier verifier) throws Exception { ActorContext actorContext = datastore.getActorContext(); Future<ActorRef> future = actorContext.findLocalShardAsync(shardName); ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS)); AssertionError lastError = null; Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 5) { ShardStats shardStats = (ShardStats)actorContext .executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE); try { verifier.verify(shardStats); return; } catch (AssertionError e) { lastError = e; Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } } throw lastError; }
@Test public void testSendAsync() { CountDownLatch latch1 = new CountDownLatch(1); CompletableFuture<Void> response = netty1.sendAsync(ep2, "test-subject", "hello world".getBytes()); response.whenComplete((r, e) -> { assertNull(e); latch1.countDown(); }); Uninterruptibles.awaitUninterruptibly(latch1); CountDownLatch latch2 = new CountDownLatch(1); response = netty1.sendAsync(invalidEndPoint, "test-subject", "hello world".getBytes()); response.whenComplete((r, e) -> { assertNotNull(e); latch2.countDown(); }); Uninterruptibles.awaitUninterruptibly(latch2); }
@SuppressWarnings("checkstyle:IllegalCatch") public static <T> List<T> expectMatching(final ActorRef actor, final Class<T> clazz, final int count, final Predicate<T> matcher) { int timeout = 5000; Exception lastEx = null; List<T> messages = Collections.emptyList(); for (int i = 0; i < timeout / 50; i++) { try { messages = getAllMatching(actor, clazz); Iterables.removeIf(messages, Predicates.not(matcher)); if (messages.size() >= count) { return messages; } lastEx = null; } catch (Exception e) { lastEx = e; } Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } throw new AssertionError(String.format("Expected %d messages of type %s. Actual received was %d: %s", count, clazz, messages.size(), messages), lastEx); }
@SuppressWarnings("checkstyle:IllegalCatch") public static void waitUntilLeader(ActorRef actorRef) { FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS); for (int i = 0; i < 20 * 5; i++) { Future<Object> future = Patterns.ask(actorRef, FindLeader.INSTANCE, new Timeout(duration)); try { final Optional<String> maybeLeader = ((FindLeaderReply)Await.result(future, duration)).getLeaderActor(); if (maybeLeader.isPresent()) { return; } } catch (Exception e) { LOG.error("FindLeader failed", e); } Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } Assert.fail("Leader not found for actorRef " + actorRef.path()); }
protected TestActorRef<TestRaftActor> newTestRaftActor(String id, TestRaftActor.Builder builder) { builder.collectorActor(factory.createActor( MessageCollectorActor.props(), factory.generateActorId(id + "-collector"))).id(id); InvalidActorNameException lastEx = null; for (int i = 0; i < 10; i++) { try { return factory.createTestActor(builder.props().withDispatcher(Dispatchers.DefaultDispatcherId()) .withMailbox(Mailboxes.DefaultMailboxId()), id); } catch (InvalidActorNameException e) { lastEx = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } } assertNotNull(lastEx); throw lastEx; }
/** * Create a test actor with the passed in name. * * @param props the actor Props * @param actorId name of actor * @param <T> the actor type * @return the ActorRef */ @SuppressWarnings("unchecked") public <T extends Actor> TestActorRef<T> createTestActor(Props props, String actorId) { InvalidActorNameException lastError = null; for (int i = 0; i < 10; i++) { try { TestActorRef<T> actorRef = TestActorRef.create(system, props, actorId); return (TestActorRef<T>) addActor(actorRef, true); } catch (InvalidActorNameException e) { lastError = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } } throw lastError; }
@Test public void testCheckExpiredSlicedMessageState() throws IOException { doReturn(1).when(mockInputStream).read(any(byte[].class)); final int expiryDuration = 200; try (MessageSlicer slicer = MessageSlicer.builder().messageSliceSize(1) .logContext("testCheckExpiredSlicedMessageState") .fileBackedStreamFactory(mockFiledBackedStreamFactory) .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) { slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(), mockOnFailureCallback); Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS); slicer.checkExpiredSlicedMessageState(); assertFailureCallback(RuntimeException.class); verify(mockFiledBackedStream).cleanup(); } }
@Test public void testOkToReplicate() { MockRaftActorContext context = new MockRaftActorContext(); context.setCommitIndex(0); FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(new PeerInfo("follower1", null, VotingState.VOTING), 10, context); assertTrue(followerLogInformation.okToReplicate()); assertFalse(followerLogInformation.okToReplicate()); // wait for 150 milliseconds and it should work again Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); assertTrue(followerLogInformation.okToReplicate()); //increment next index and try immediately and it should work again followerLogInformation.incrNextIndex(); assertTrue(followerLogInformation.okToReplicate()); }
@SuppressWarnings("checkstyle:IllegalCatch") private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator, final String shardDispatcher, final String shardManagerId) { Exception lastException = null; for (int i = 0; i < 100; i++) { try { return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher), shardManagerId); } catch (Exception e) { lastException = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying " + "(retry count = {})", shardManagerId, e.getMessage(), i); } } throw new IllegalStateException("Failed to create Shard Manager", lastException); }
@SuppressWarnings("checkstyle:IllegalCatch") private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem, final ShardedDataTreeActorCreator creator, final String shardDataTreeActorId) { Exception lastException = null; for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) { try { return actorSystem.actorOf(creator.props(), shardDataTreeActorId); } catch (final Exception e) { lastException = e; Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT); LOG.debug("Could not create actor {} because of {} -" + " waiting for sometime before retrying (retry count = {})", shardDataTreeActorId, e.getMessage(), i); } } throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException); }
public static void verifyRaftState(final AbstractDataStore datastore, final String shardName, final RaftStateVerifier verifier) throws Exception { ActorContext actorContext = datastore.getActorContext(); Future<ActorRef> future = actorContext.findLocalShardAsync(shardName); ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS)); AssertionError lastError = null; Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 5) { OnDemandRaftState raftState = (OnDemandRaftState)actorContext .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE); try { verifier.verify(raftState); return; } catch (AssertionError e) { lastError = e; Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } } throw lastError; }
static void waitForMembersUp(final ActorSystem node, final UniqueAddress... addresses) { Set<UniqueAddress> otherMembersSet = Sets.newHashSet(addresses); Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 10) { CurrentClusterState state = Cluster.get(node).state(); for (Member m : state.getMembers()) { if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress()) && otherMembersSet.isEmpty()) { return; } } Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } fail("Member(s) " + otherMembersSet + " are not Up"); }
private static void verifyCandidates(final AbstractDataStore dataStore, final DOMEntity entity, final String... expCandidates) throws Exception { AssertionError lastError = null; Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.MILLISECONDS) <= 10000) { Optional<NormalizedNode<?, ?>> possible = dataStore.newReadOnlyTransaction() .read(entityPath(entity.getType(), entity.getIdentifier()).node(Candidate.QNAME)) .get(5, TimeUnit.SECONDS); try { assertEquals("Candidates not found for " + entity, true, possible.isPresent()); Collection<String> actual = new ArrayList<>(); for (MapEntryNode candidate: ((MapNode)possible.get()).getValue()) { actual.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString()); } assertEquals("Candidates for " + entity, Arrays.asList(expCandidates), actual); return; } catch (AssertionError e) { lastError = e; Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); } } throw lastError; }
protected void verifyEntityCandidate(String entityType, YangInstanceIdentifier entityId, String candidateName, Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader, boolean expectPresent) { AssertionError lastError = null; Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) { NormalizedNode<?, ?> node = reader.apply(ENTITY_OWNERS_PATH); try { verifyEntityCandidate(node, entityType, entityId, candidateName, expectPresent); return; } catch (AssertionError e) { lastError = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } } throw lastError; }
protected void verifyNodeRemoved(YangInstanceIdentifier path, Function<YangInstanceIdentifier,NormalizedNode<?,?>> reader) { AssertionError lastError = null; Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) { try { NormalizedNode<?, ?> node = reader.apply(path); Assert.assertNull("Node was not removed at path: " + path, node); return; } catch (AssertionError e) { lastError = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } } throw lastError; }
@Override public void write(Message message) { Uninterruptibles.awaitUninterruptibly(transportInitialized); LOG.debug("Sending message: {}", message); try { Producer<String, String> producer = getProducer(configuration); producer.send(new ProducerRecord<>(String.valueOf(configuration.getTopic()), "message", message.getMessage())); } catch (Exception e) { LOG.error("Failed to send message", e); } }
@Test(timeout=60000) public void testStatisticsOperations() throws Exception { final Statistics stats = new Statistics("file"); Assert.assertEquals(0L, stats.getBytesRead()); Assert.assertEquals(0L, stats.getBytesWritten()); Assert.assertEquals(0, stats.getWriteOps()); stats.incrementBytesWritten(1000); Assert.assertEquals(1000L, stats.getBytesWritten()); Assert.assertEquals(0, stats.getWriteOps()); stats.incrementWriteOps(123); Assert.assertEquals(123, stats.getWriteOps()); Thread thread = new Thread() { @Override public void run() { stats.incrementWriteOps(1); } }; thread.start(); Uninterruptibles.joinUninterruptibly(thread); Assert.assertEquals(124, stats.getWriteOps()); // Test copy constructor and reset function Statistics stats2 = new Statistics(stats); stats.reset(); Assert.assertEquals(0, stats.getWriteOps()); Assert.assertEquals(0L, stats.getBytesWritten()); Assert.assertEquals(0L, stats.getBytesRead()); Assert.assertEquals(124, stats2.getWriteOps()); Assert.assertEquals(1000L, stats2.getBytesWritten()); Assert.assertEquals(0L, stats2.getBytesRead()); }
/** * Test that a java interruption can stop the watcher thread */ @Test(timeout=60000) public void testInterruption() throws Exception { final DomainSocketWatcher watcher = newDomainSocketWatcher(10); watcher.watcherThread.interrupt(); Uninterruptibles.joinUninterruptibly(watcher.watcherThread); watcher.close(); }
/** * Test that domain sockets are closed when the watcher is closed. */ @Test(timeout=300000) public void testCloseSocketOnWatcherClose() throws Exception { final DomainSocketWatcher watcher = newDomainSocketWatcher(10000000); DomainSocket pair[] = DomainSocket.socketpair(); watcher.add(pair[1], new DomainSocketWatcher.Handler() { @Override public boolean handle(DomainSocket sock) { return true; } }); watcher.close(); Uninterruptibles.joinUninterruptibly(watcher.watcherThread); assertFalse(pair[1].isOpen()); }
@Test public void BasicTickTest() throws IOException { System.out.println("==> Starting BasicTickTest"); String spoutId = "feeder.spout"; String boltId = "tick.bolt"; String topoId = "TestTopology"; TopologyBuilder builder = new TopologyBuilder(); FeederSpout spout = TickBoltTest.createFeeder(); builder.setSpout(spoutId, spout); SimpleStatefulTick tickBolt = new SimpleStatefulTick(); builder.setBolt(boltId, tickBolt).shuffleGrouping(spoutId); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topoId, TestUtils.stormConfig(), builder.createTopology()); /* Let's Submit Stuff! */ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); spout.feed(Arrays.asList(new String[]{"key1", "msg1"})); /* And sleep some more */ Uninterruptibles.sleepUninterruptibly(6, TimeUnit.SECONDS); // TODO: this test isn't great .. the number of lines in the file from ticks could vary int expectedLines = 3; Assert.assertTrue("We should have at least " + expectedLines + " lines in the test file.", expectedLines <= tickBolt.tickFile.numLines()); Assert.assertEquals(1, tickBolt.workFile.numLines()); cluster.killTopology(topoId); Utils.sleep(4 * 1000); }
/** * Get the result of the given {@link Future}, retrying if interrupted using * {@link Uninterruptibles#getUninterruptibly}, and wrap any checked exceptions * other than the one given using {@link Throwables#propagate(Throwable)}. * * Use this when you only need to handle a particular checked exception, * and want any others can be handled generically. */ public static <T, E extends Throwable> T getUncheckedExcept(Future<T> future, Class<E> except) throws E { try { return Uninterruptibles.getUninterruptibly(future); } catch(ExecutionException e) { if(except.isInstance(e.getCause())) { throw except.cast(e.getCause()); } else { throw Throwables.propagate(e); } } }
private static Map<Address, Bucket<RoutingTable>> retrieveBuckets(final ActorRef bucketStore, final JavaTestKit testKit, final Address... addresses) { int numTries = 0; while (true) { bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef()); @SuppressWarnings("unchecked") Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), Map.class); boolean foundAll = true; for (Address addr : addresses) { Bucket<RoutingTable> bucket = buckets.get(addr); if (bucket == null) { foundAll = false; break; } } if (foundAll) { return buckets; } if (++numTries >= 50) { Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses) + ", Actual: " + buckets); } Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } }
/** * Put a dummy task into the queue and wait for it to be run. Because it's single threaded, this means all * tasks submitted before this point are now completed. Usually you won't want to use this method - it's a * convenience primarily used in unit testing. If you want to wait for an event to be called the right thing * to do is usually to create a {@link com.google.common.util.concurrent.SettableFuture} and then call set * on it. You can then either block on that future, compose it, add listeners to it and so on. */ public static void waitForUserCode() { final CountDownLatch latch = new CountDownLatch(1); USER_THREAD.execute(new Runnable() { @Override public void run() { latch.countDown(); } }); Uninterruptibles.awaitUninterruptibly(latch); }
public static void waitUntilShardIsDown(final ActorContext actorContext, final String shardName) { for (int i = 0; i < 20 * 5 ; i++) { LOG.debug("Waiting for shard down {}", shardName); Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName); if (!shardReply.isPresent()) { return; } } throw new IllegalStateException("Shard[" + shardName + " did not shutdown in time"); }