@Test
public void testMajorityPropose() throws Exception {
    final List<TestPriest> majorityTestPriests = Stream.generate(this::testPriest)
            .limit(PRIESTS_COUNT - MINORITY)
            .collect(toList());
    final List<TestPriest> minorityTestPriests = Stream.generate(this::testPriest)
            .limit(MINORITY)
            .collect(toList());
    final Set<ActorPath> priestsPaths = Stream
            .concat(majorityTestPriests.stream(), minorityTestPriests.stream())
            .map(p -> p.path)
            .collect(toSet());
    final List<TestKit> majorityKits = majorityTestPriests.stream().map(p -> p.kit).collect(toList());

    minorityTestPriests.forEach(p -> p.priest.tell(PoisonPill.getInstance(), ActorRef.noSender()));

    final ActorRef leader = system.actorOf(DecreePresident.props(new Cluster(priestsPaths), 1));
    leader.tell(new PaxosAPI.Propose("VALUE", 1), ActorRef.noSender());

    majorityKits.forEach(kit -> kit.expectMsg(new PaxosAPI.Decide("VALUE", 1)));
}
@Test
public void testMinorityPropose() throws Exception {
    final List<TestPriest> majorityTestPriests = Stream.generate(this::testPriest)
            .limit(PRIESTS_COUNT - MINORITY)
            .collect(toList());
    final List<TestPriest> minorityTestPriests = Stream.generate(this::testPriest)
            .limit(MINORITY)
            .collect(toList());
    final Set<ActorPath> priestsPaths = Stream
            .concat(majorityTestPriests.stream(), minorityTestPriests.stream())
            .map(p -> p.path)
            .collect(toSet());
    final List<TestKit> majorityKits = majorityTestPriests.stream().map(p -> p.kit).collect(toList());

    majorityTestPriests.forEach(p -> p.priest.tell(PoisonPill.getInstance(), ActorRef.noSender()));

    final ActorRef leader = system.actorOf(DecreePresident.props(new Cluster(priestsPaths), 1));
    leader.tell(new PaxosAPI.Propose("VALUE", 1), ActorRef.noSender());

    majorityKits.forEach(kit -> kit.expectNoMsg(Duration.create(1, SECONDS)));
}
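The testPriest() factory used by both tests is not shown here. A minimal sketch of what it presumably constructs, assuming a TestPriest value class and a DecreePriest actor that reports decisions to its probe (both names and fields are assumptions, not taken from the tests above):

// Hypothetical helper, for illustration only: TestPriest, DecreePriest and the
// field names (priest, path, kit) are inferred from how the tests use them.
private TestPriest testPriest() {
    final TestKit kit = new TestKit(system);                          // probe that asserts on Decide messages
    final ActorRef priest = system.actorOf(DecreePriest.props(kit.getRef()));
    return new TestPriest(priest, priest.path(), kit);                // actor, its path, and the observing probe
}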
@Override
public void onReceive(Object message) {
    if (message instanceof CaptureSnapshotReply) {
        Snapshot snapshot = Snapshot.create(
                ((CaptureSnapshotReply) message).getSnapshotState(),
                params.captureSnapshot.getUnAppliedEntries(),
                params.captureSnapshot.getLastIndex(),
                params.captureSnapshot.getLastTerm(),
                params.captureSnapshot.getLastAppliedIndex(),
                params.captureSnapshot.getLastAppliedTerm(),
                params.electionTerm.getCurrentTerm(),
                params.electionTerm.getVotedFor(),
                params.peerInformation);

        LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot);

        params.replyToActor.tell(new GetSnapshotReply(params.id, snapshot), getSelf());
        getSelf().tell(PoisonPill.getInstance(), getSelf());
    } else if (message instanceof ReceiveTimeout) {
        LOG.warn("{}: Got ReceiveTimeout for inactivity - did not receive CaptureSnapshotReply within {} ms",
                params.id, params.receiveTimeout.toMillis());

        params.replyToActor.tell(new akka.actor.Status.Failure(new TimeoutException(String.format(
                "Timed out after %d ms while waiting for CaptureSnapshotReply",
                params.receiveTimeout.toMillis()))), getSelf());
        getSelf().tell(PoisonPill.getInstance(), getSelf());
    }
}
void removeEntityOwnershipListener(String entityType, DOMEntityOwnershipListener listener) {
    LOG.debug("{}: Removing EntityOwnershipListener {} for entity type {}", logId, listener, entityType);

    listenerLock.writeLock().lock();
    try {
        if (entityTypeListenerMap.remove(entityType, listener)) {
            ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);

            LOG.debug("{}: Found {}", logId, listenerEntry);

            listenerEntry.referenceCount--;
            if (listenerEntry.referenceCount <= 0) {
                listenerActorMap.remove(listener);

                if (listenerEntry.actorRef != null) {
                    LOG.debug("Killing EntityOwnershipListenerActor {}", listenerEntry.actorRef);
                    listenerEntry.actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
                }
            }
        }
    } finally {
        listenerLock.writeLock().unlock();
    }
}
@Override
public void close() {
    boolean sendCloseMessage;
    synchronized (this) {
        sendCloseMessage = !closed && listenerRegistrationActor != null;
        closed = true;
    }

    if (sendCloseMessage) {
        listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
                ActorRef.noSender());
        listenerRegistrationActor = null;
    }

    if (dataChangeListenerActor != null) {
        dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        dataChangeListenerActor = null;
    }
}
@Override
public void onReceive(Object message) {
    if (message instanceof GetSnapshotReply) {
        onGetSnapshotReply((GetSnapshotReply) message);
    } else if (message instanceof Failure) {
        LOG.debug("{}: Received {}", params.id, message);

        params.replyToActor.tell(message, getSelf());
        getSelf().tell(PoisonPill.getInstance(), getSelf());
    } else if (message instanceof ReceiveTimeout) {
        String msg = String.format(
                "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s "
                + "did not respond.", params.receiveTimeout.toMillis(), params.shardNames.size(),
                remainingShardNames.size(), remainingShardNames);

        LOG.warn("{}: {}", params.id, msg);

        params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf());
        getSelf().tell(PoisonPill.getInstance(), getSelf());
    }
}
private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) {
    LOG.debug("{}: Received {}", params.id, getSnapshotReply);

    ShardIdentifier shardId = ShardIdentifier.fromShardIdString(getSnapshotReply.getId());
    shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));

    remainingShardNames.remove(shardId.getShardName());
    if (remainingShardNames.isEmpty()) {
        LOG.debug("{}: All shard snapshots received", params.id);

        DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType,
                params.shardManagerSnapshot, shardSnapshots);
        params.replyToActor.tell(datastoreSnapshot, getSelf());
        getSelf().tell(PoisonPill.getInstance(), getSelf());
    }
}
@SuppressWarnings("checkstyle:IllegalCatch") private Entry<DataStoreClient, ActorRef> createDatastoreClient( final String shardName, final ActorContext actorContext) throws DOMDataTreeShardCreationFailedException { LOG.debug("{}: Creating distributed datastore client for shard {}", memberName, shardName); final Props distributedDataStoreClientProps = SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorContext, shardName); final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps); try { return new SimpleEntry<>(SimpleDataStoreClientActor .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor); } catch (final Exception e) { LOG.error("{}: Failed to get actor for {}", distributedDataStoreClientProps, memberName, e); clientActor.tell(PoisonPill.getInstance(), noSender()); throw new DOMDataTreeShardCreationFailedException( "Unable to create datastore client for shard{" + shardName + "}", e); } }
private void handleSnapshotMessage(final Object message) {
    if (message instanceof SaveSnapshotFailure) {
        LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
        persisting = false;
        self().tell(PoisonPill.getInstance(), ActorRef.noSender());
    } else if (message instanceof SaveSnapshotSuccess) {
        LOG.debug("{}: got command: {}", persistenceId(), message);
        SaveSnapshotSuccess saved = (SaveSnapshotSuccess) message;
        deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
                saved.metadata().timestamp() - 1, 0L, 0L));
        persisting = false;
        unstash();
    } else {
        LOG.debug("{}: stashing command {}", persistenceId(), message);
        stash();
    }
}
void cancel(long executionId, Throwable throwable) {
    ActorRef child = getContext().getChild(String.valueOf(executionId));
    if (child != null) {
        if (!scheduledTerminations.containsKey(child)) {
            getContext().watch(child);
            child.tell(new Status.Failure(throwable), getSelf());

            // Give the top-level interpreter some time to finish. Otherwise, we will terminate it after a timeout.
            Cancellable scheduledTermination = getContext().system().scheduler().scheduleOnce(
                    Duration.create(1, TimeUnit.MINUTES),
                    child,
                    PoisonPill.getInstance(),
                    getContext().dispatcher(),
                    getSelf()
            );
            scheduledTerminations.put(child, scheduledTermination);
        }
    } else {
        log.warning("Request to cancel unknown execution {} because of: {}", executionId, throwable);
    }
}
@AfterClass
public static void tearDownCluster() throws Exception {
    if (highAvailabilityServices != null) {
        highAvailabilityServices.closeAndCleanupAllData();
    }

    if (actorSystem != null) {
        actorSystem.shutdown();
    }

    if (archiver != null) {
        archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    }

    if (jobManager != null) {
        jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    }

    if (taskManager != null) {
        taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    }
}
@Override
public void run() {
    while (true) {
        try {
            Serializable message = function.apply(null);
            ObjectNode node = Json.newObject();
            node.put("type", messageType);
            node.put("id", id);
            node.put("value", Json.toJson(message));
            out.tell(node.toString(), self);
            Thread.sleep(interval);
        } catch (Exception e) {
            Logger.error(e.getMessage(), e);
            // Stop the owning actor and leave the loop; without the return the
            // loop would keep spinning after the actor has been poisoned.
            self.tell(PoisonPill.getInstance(), self);
            return;
        }
    }
}
/**
 * This method is called on receipt of data of type I from upstream pipes.
 * It ingests the message to produce an object of type O and sends it downstream.
 * Null messages are handled here, so the 'ingest' method does not need to
 * implement its own null handling.
 * @param message The object that is received for processing.
 */
@Override
@SuppressWarnings("unchecked")
public final void onReceive(Object message) {
    if (message != null) {
        if (message instanceof InitializationMessage) {
            initializePipe((InitializationMessage) message);
        } else if (message instanceof StopMessage) {
            receivedStopMessages++;
            if (receivedStopMessages.equals(upstreamPipeCount)) {
                downstreamPipes.forEach(x -> x.tell(new StopMessage(), this.getSelf()));
                this.getSelf().tell(PoisonPill.getInstance(), this.getSelf());
            }
        } else {
            I inbound = (I) message;
            O outbound = ingest(inbound);
            send(outbound);
        }
    }
}
private void handleRollback() throws SQLException {
    log.debug("rolling back transaction");

    ActorRef sender = getSender(), self = getSelf();
    executorService.execute(() -> {
        try {
            connection.rollback();
            sender.tell(new Ack(), self);
        } catch (Exception e) {
            log.error("rollback failed: {}", e);
            sender.tell(new Failure(e), self);
        }

        self.tell(PoisonPill.getInstance(), self);
    });
}
private void handleCommit() throws SQLException {
    log.debug("committing transaction");

    ActorRef sender = getSender(), self = getSelf();
    executorService.execute(() -> {
        try {
            connection.commit();
            sender.tell(new Ack(), self);
        } catch (Exception e) {
            log.error("commit failed: {}", e);
            sender.tell(new Failure(e), self);
        }

        self.tell(PoisonPill.getInstance(), self);
    });
}
protected void handleFinalizeSession(FinalizeSession finalizeSession) {
    JobState state = finalizeSession.getJobState();

    log.debug("finalizing session: {}", state);

    ActorRef self = getSelf();
    f.ask(jobContext, new UpdateJobState(state)).whenComplete((msg0, t0) -> {
        if (t0 != null) {
            log.error("couldn't change job state: {}", t0);
        }

        f.ask(loader, new SessionFinished(importJob)).whenComplete((msg1, t1) -> {
            if (t1 != null) {
                log.error("couldn't finish import session: {}", t1);
            }

            log.debug("session finalized");
            self.tell(PoisonPill.getInstance(), self);
        });
    });
}
@Override
public void produceRouter(ActorSystem system, String role) {

    ClusterSingletonManagerSettings settings =
            ClusterSingletonManagerSettings.create(system).withRole("io");

    system.actorOf(ClusterSingletonManager.props(
            Props.create(GuiceActorProducer.class, QueueActorRouter.class),
            PoisonPill.getInstance(), settings), "queueActorRouter");

    ClusterSingletonProxySettings proxySettings =
            ClusterSingletonProxySettings.create(system).withRole(role);

    system.actorOf(
            ClusterSingletonProxy.props("/user/queueActorRouter", proxySettings),
            "queueActorRouterProxy");
}
@Override
public void produceRouter(ActorSystem system, String role) {

    ClusterSingletonManagerSettings settings =
            ClusterSingletonManagerSettings.create(system).withRole("io");

    system.actorOf(ClusterSingletonManager.props(
            Props.create(GuiceActorProducer.class, QueueWriterRouter.class),
            PoisonPill.getInstance(), settings), "queueWriterRouter");

    ClusterSingletonProxySettings proxySettings =
            ClusterSingletonProxySettings.create(system).withRole(role);

    system.actorOf(
            ClusterSingletonProxy.props("/user/queueWriterRouter", proxySettings),
            "queueWriterRouterProxy");
}
@Override
public void produceRouter(ActorSystem system, String role) {

    ClusterSingletonManagerSettings settings =
            ClusterSingletonManagerSettings.create(system).withRole("io");

    system.actorOf(ClusterSingletonManager.props(
            Props.create(GuiceActorProducer.class, QueueSenderRouter.class),
            PoisonPill.getInstance(), settings), "queueSenderRouter");

    ClusterSingletonProxySettings proxySettings =
            ClusterSingletonProxySettings.create(system).withRole(role);

    system.actorOf(
            ClusterSingletonProxy.props("/user/queueSenderRouter", proxySettings),
            "queueSenderRouterProxy");
}
@Override
public void produceRouter(ActorSystem system, String role) {

    ClusterSingletonManagerSettings settings =
            ClusterSingletonManagerSettings.create(system).withRole("io");

    system.actorOf(ClusterSingletonManager.props(
            Props.create(GuiceActorProducer.class, UniqueValuesRouter.class),
            PoisonPill.getInstance(), settings), "uvRouter");

    ClusterSingletonProxySettings proxySettings =
            ClusterSingletonProxySettings.create(system).withRole(role);

    system.actorOf(ClusterSingletonProxy.props("/user/uvRouter", proxySettings), "uvProxy");

    subscribeToReservations(system);
}
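In all four variants, PoisonPill is passed to ClusterSingletonManager.props as the termination message: when the singleton must hand over to another node, the manager sends this message to the router actor so it can stop cleanly. Callers never address the singleton directly; they go through the proxy registered above. A hedged usage sketch, in which the message type is made up for illustration and only the proxy path matches the code above:

// Sketch only: ReservationStatusRequest is an assumed message type, not part of the code above.
ActorSelection uvProxy = system.actorSelection("/user/uvProxy");
uvProxy.tell(new ReservationStatusRequest(), ActorRef.noSender());  // forwarded to the current singleton instance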
public MyPriorityMailBox(ActorSystem.Settings settings, Config config) {
    // Create a new PriorityGenerator; lower returned values are dequeued first.
    super(new PriorityGenerator() {
        @Override
        public int gen(Object message) {
            if (message.equals("DISPLAY_LIST"))
                return 2; // DISPLAY_LIST messages should be treated last if possible
            else if (message.equals(PoisonPill.getInstance()))
                return 3; // PoisonPill when nothing else is left
            else
                return 0; // By default messages go with high priority
        }
    });
}
public MyUnboundedPriorityMailbox(ActorSystem.Settings settings, Config config) {
    // Create a new PriorityGenerator; lower returned values are dequeued first.
    super(new PriorityGenerator() {
        @Override
        public int gen(Object message) {
            if (message instanceof Address)
                return 0; // worker registration messages should be treated with highest priority
            else if (message.equals(PoisonPill.getInstance()))
                return 3; // PoisonPill when nothing else is left
            else
                return 1; // By default messages go with medium priority
        }
    });
}
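Both mailboxes only take effect once they are registered in configuration and assigned to an actor. A minimal sketch, assuming the mailbox class lives in com.example and is registered under the id prio-mailbox (the id, package, and WorkerActor are placeholders):

// application.conf (assumed id and package):
//   prio-mailbox {
//     mailbox-type = "com.example.MyUnboundedPriorityMailbox"
//   }
ActorRef worker = system.actorOf(
        Props.create(WorkerActor.class).withMailbox("prio-mailbox"),  // attach the priority mailbox
        "worker");

Because PriorityGenerator treats lower values as higher priority, giving PoisonPill the largest value ensures it is processed only after all other queued messages.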
@Test
public void killOneByOneTest() {
    final String prefix = "killOneByOneTest";
    final Set<ActorPath> broadcastPaths = LongStream.range(0, PRIESTS_COUNT)
            .boxed()
            .map(l -> system.child(prefix + l))
            .collect(Collectors.toSet());
    final List<TestBroadcast> testPriests = LongStream.range(0, PRIESTS_COUNT)
            .boxed()
            .map(l -> testBroadcast(prefix, l, new Cluster(broadcastPaths)))
            .collect(toList());

    while (testPriests.size() > PRIESTS_COUNT / 2) {
        final TestBroadcast currentLeader = testPriests.get(0);
        final List<String> decrees = Stream
                .generate(UUID::randomUUID)
                .map(UUID::toString)
                .limit(1000)
                .collect(toList());

        for (String v : decrees) {
            currentLeader.broadcast.tell(new AtomicBroadcastAPI.Broadcast(v), ActorRef.noSender());
            testPriests.forEach(p -> p.kit.expectMsg(new AtomicBroadcastAPI.Deliver(v)));
        }

        currentLeader.broadcast.tell(PoisonPill.getInstance(), ActorRef.noSender());
        testPriests.remove(currentLeader);
    }
}
protected void killActor(TestActorRef<TestRaftActor> actor) {
    JavaTestKit testkit = new JavaTestKit(getSystem());
    testkit.watch(actor);

    actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
    testkit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);

    testkit.unwatch(actor);
}
private void killActor(ActorRef actor, JavaTestKit kit, boolean remove) {
    LOG.info("Killing actor {}", actor);
    kit.watch(actor);
    actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
    kit.expectTerminated(JavaTestKit.duration("5 seconds"), actor);

    if (remove) {
        createdActors.remove(actor);
    }
}
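Both helpers pair a watch with a PoisonPill and then block until Terminated arrives. Outside of a test kit, akka.pattern.Patterns.gracefulStop achieves the same effect by returning a future that completes once the target has terminated. A short sketch; the timeouts are arbitrary and 'actor' stands for any ActorRef:

// gracefulStop sends the given stop message and completes the future when the target terminates,
// or fails it if the timeout elapses first.
scala.concurrent.Future<Boolean> stopped = akka.pattern.Patterns.gracefulStop(
        actor, Duration.create(5, TimeUnit.SECONDS), PoisonPill.getInstance());
Boolean terminated = scala.concurrent.Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));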
@SuppressWarnings("checkstyle:IllegalCatch") private void batchedModifications(BatchedModifications batched) { if (checkClosed()) { if (batched.isReady()) { getSelf().tell(PoisonPill.getInstance(), getSelf()); } return; } try { for (Modification modification: batched.getModifications()) { modification.apply(transaction.getSnapshot()); } totalBatchedModificationsReceived++; if (batched.isReady()) { if (lastBatchedModificationsException != null) { throw lastBatchedModificationsException; } if (totalBatchedModificationsReceived != batched.getTotalMessagesSent()) { throw new IllegalStateException(String.format( "The total number of batched messages received %d does not match the number sent %d", totalBatchedModificationsReceived, batched.getTotalMessagesSent())); } readyTransaction(false, batched.isDoCommitOnReady(), batched.getVersion()); } else { getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); } } catch (Exception e) { lastBatchedModificationsException = e; getSender().tell(new akka.actor.Status.Failure(e), getSelf()); if (batched.isReady()) { getSelf().tell(PoisonPill.getInstance(), getSelf()); } } }
private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit, short clientTxVersion) {
    TransactionIdentifier transactionID = getTransactionId();

    LOG.debug("readyTransaction : {}", transactionID);

    getShardActor().forward(new ForwardedReadyTransaction(transactionID, clientTxVersion,
            transaction, doImmediateCommit), getContext());

    // The shard will handle the commit from here so we're no longer needed - self-destruct.
    getSelf().tell(PoisonPill.getInstance(), getSelf());
}
@Override
protected synchronized void removeRegistration() {
    if (listenerRegistrationActor != null) {
        listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
                ActorRef.noSender());
        listenerRegistrationActor = null;
    }

    dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
private void closeListenerRegistration() {
    closed = true;

    if (registration != null) {
        registration.close();
        onClose.run();
        registration = null;

        if (killSchedule == null) {
            killSchedule = getContext().system().scheduler().scheduleOnce(
                    Duration.create(killDelay, TimeUnit.MILLISECONDS), getSelf(),
                    PoisonPill.getInstance(), getContext().dispatcher(), ActorRef.noSender());
        }
    }
}
private void closeTransaction(final boolean sendReply) {
    getDOMStoreTransaction().abortFromTransactionActor();
    shardActor.tell(new PersistAbortTransactionPayload(transactionId), ActorRef.noSender());

    if (sendReply && returnCloseTransactionReply()) {
        getSender().tell(new CloseTransactionReply(), getSelf());
    }

    getSelf().tell(PoisonPill.getInstance(), getSelf());
}
void removeCommitCohort(final ActorRef sender, final RemoveCohort message) {
    final ActorRef cohort = message.getCohort();
    final RegistrationTreeNode<ActorRef> node = cohortToNode.get(cohort);
    if (node != null) {
        removeRegistration(node, cohort);
        cohortToNode.remove(cohort);
    }

    sender.tell(new Status.Success(null), ActorRef.noSender());
    cohort.tell(PoisonPill.getInstance(), cohort);
}
private void onAddServerFailure(final String shardName, final String message, final Throwable failure,
        final ActorRef sender, final boolean removeShardOnFailure) {
    shardReplicaOperationsInProgress.remove(shardName);

    if (removeShardOnFailure) {
        ShardInformation shardInfo = localShards.remove(shardName);
        if (shardInfo.getActor() != null) {
            shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
        }
    }

    sender.tell(new Status.Failure(message == null ? failure
            : new RuntimeException(message, failure)), getSelf());
}
@Override
public void close() {
    // TODO should we also remove all listeners?
    LOG.debug("Closing {} ShardAccess", prefix);
    closed = true;

    if (roleChangeListenerActor != null) {
        // stop RoleChangeListenerActor
        roleChangeListenerActor.tell(PoisonPill.getInstance(), noSender());
        roleChangeListenerActor = null;
    }
}
@After
public void tearDown() {
    InMemoryJournal.clear();
    InMemorySnapshotStore.clear();

    mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
    mockShardActor = null;

    actorFactory.close();
}
@Override
public void close() {
    if (rpcManager != null) {
        LOG.info("Stopping RPC Manager at {}", rpcManager);
        rpcManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
        rpcManager = null;
    }
}
private void switchBehavior(final AbstractClientActorBehavior<?> nextBehavior) {
    if (!currentBehavior.equals(nextBehavior)) {
        if (nextBehavior == null) {
            LOG.debug("{}: shutting down", persistenceId());
            self().tell(PoisonPill.getInstance(), ActorRef.noSender());
        } else {
            LOG.debug("{}: switched from {} to {}", persistenceId(), currentBehavior, nextBehavior);
        }

        currentBehavior.close();
        currentBehavior = nextBehavior;
    }
}
@Override
public void stop() {
    if (_actor != null) {
        _actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        _actor = null;
    }
}
private CompletionStage<HttpResponse> getHttpResponseForTelemetry(
        final HttpRequest request,
        final MessageProcessorsFactory messageProcessorsFactory) {
    final Optional<HttpHeader> upgradeToWebSocketHeader = request.getHeader("UpgradeToWebSocket");
    if (upgradeToWebSocketHeader.orElse(null) instanceof akka.http.impl.engine.ws.UpgradeToWebSocketLowLevel) {
        final akka.http.impl.engine.ws.UpgradeToWebSocketLowLevel lowLevelUpgradeToWebSocketHeader =
                (akka.http.impl.engine.ws.UpgradeToWebSocketLowLevel) upgradeToWebSocketHeader.get();
        final ActorRef connection = _actorSystem.actorOf(Connection.props(_metrics, messageProcessorsFactory));
        final Sink<Message, ?> inChannel = Sink.actorRef(connection, PoisonPill.getInstance());
        final Source<Message, ActorRef> outChannel =
                Source.<Message>actorRef(TELEMETRY_BUFFER_SIZE, OverflowStrategy.dropBuffer())
                        .<ActorRef>mapMaterializedValue(channel -> {
                            _actorSystem.actorSelection("/user/telemetry")
                                    .resolveOne(Timeout.apply(1, TimeUnit.SECONDS))
                                    .onSuccess(
                                            new JavaPartialFunction<ActorRef, Object>() {
                                                @Override
                                                public Object apply(final ActorRef telemetry, final boolean isCheck)
                                                        throws Exception {
                                                    final Connect connectMessage =
                                                            new Connect(telemetry, connection, channel);
                                                    connection.tell(connectMessage, ActorRef.noSender());
                                                    telemetry.tell(connectMessage, ActorRef.noSender());
                                                    return null;
                                                }
                                            },
                                            _actorSystem.dispatcher());
                            return channel;
                        });
        return CompletableFuture.completedFuture(
                lowLevelUpgradeToWebSocketHeader.handleMessagesWith(inChannel, outChannel));
    }
    return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.BAD_REQUEST));
}
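Here PoisonPill is used as the completion message of Sink.actorRef: when the client's half of the websocket stream completes, the sink delivers PoisonPill to the connection actor, stopping it along with the stream. A reduced sketch of the same wiring, with EchoActor and the buffer size as assumed placeholders rather than parts of the route above:

// Minimal sketch only: EchoActor stands in for the real Connection actor.
ActorRef echo = _actorSystem.actorOf(Props.create(EchoActor.class));
Sink<Message, ?> in = Sink.actorRef(echo, PoisonPill.getInstance());           // stream completion stops the actor
Source<Message, ActorRef> out = Source.actorRef(16, OverflowStrategy.dropHead());
Flow<Message, Message, ?> handler = Flow.fromSinkAndSource(in, out);           // both halves as one handler Flow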
@Override public void close() { LOG.info("Closing source"); if (receiverActorSystem != null) { receiverActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); receiverActorSystem.shutdown(); } }
@Override
void onEmptySetOfAsynchronousActions() {
    if (state == State.ALL_OUTPUTS) {
        state = State.DONE;
        getSelf().tell(PoisonPill.getInstance(), getSelf());
    }
}