@SuppressWarnings("checkstyle:IllegalCatch") private void verifyActorReady(ActorRef actorRef) { // Sometimes we see messages go to dead letters soon after creation - it seems the actor isn't quite // in a state yet to receive messages or isn't actually created yet. This seems to happen with // actorSelection so, to alleviate it, we use an actorSelection and send an Identify message with // retries to ensure it's ready. Timeout timeout = new Timeout(100, TimeUnit.MILLISECONDS); Throwable lastError = null; Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 10) { try { ActorSelection actorSelection = system.actorSelection(actorRef.path().toString()); Future<Object> future = Patterns.ask(actorSelection, new Identify(""), timeout); ActorIdentity reply = (ActorIdentity)Await.result(future, timeout.duration()); Assert.assertNotNull("Identify returned null", reply.getRef()); return; } catch (Exception | AssertionError e) { Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); lastError = e; } } throw new RuntimeException(lastError); }
private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
        throws Exception {
    ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));

    if (exToThrow instanceof PrimaryNotFoundException) {
        doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
    } else {
        doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
                .findPrimaryShardAsync(anyString());
    }

    doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
            any(ActorSelection.class), any(), any(Timeout.class));

    TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);

    propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
}
/**
 * Initiates a snapshot capture to install on a follower.
 *
 * <p>
 * Install Snapshot works as follows:
 * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
 * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
 *    the Leader's handleMessage with a SendInstallSnapshot message.
 * 3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
 *    the Follower via InstallSnapshot messages.
 * 4. For each chunk, the Follower sends back an InstallSnapshotReply.
 * 5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
 *    follower.
 * 6. If another follower requires a snapshot and a snapshot has already been collected (via SendInstallSnapshot),
 *    the Leader sends the existing snapshot in chunks to that follower.
 *
 * @param followerId the id of the follower.
 * @return true if capture was initiated, false otherwise.
 */
public boolean initiateCaptureSnapshot(final String followerId) {
    FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
    if (snapshotHolder.isPresent()) {
        // If a snapshot is already present in memory, most likely another install is in progress, so there
        // is no need to capture a new snapshot. This could happen if another follower needs an install while
        // one is going on.
        final ActorSelection followerActor = context.getPeerActorSelection(followerId);

        // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
        sendSnapshotChunk(followerActor, followerLogInfo);
        return true;
    }

    boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
            this.getReplicatedToAllIndex(), followerId);
    if (captureInitiated) {
        followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
                context.getConfigParams().getSnapshotChunkSize(), logName()));
    }

    return captureInitiated;
}
private void sendInstallSnapshot() {
    log.debug("{}: sendInstallSnapshot", logName());

    for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
        String followerId = e.getKey();
        ActorSelection followerActor = context.getPeerActorSelection(followerId);
        FollowerLogInformation followerLogInfo = e.getValue();

        if (followerActor != null) {
            long nextIndex = followerLogInfo.getNextIndex();
            if (followerLogInfo.getInstallSnapshotState() != null
                    || context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED
                    || canInstallSnapshot(nextIndex)) {
                sendSnapshotChunk(followerActor, followerLogInfo);
            }
        }
    }
}
@Test
public void testInitialChangeListenerEventWithContainerPath() throws DataValidationFailedException {
    writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));

    Entry<MockDataTreeChangeListener, ActorSelection> entry = registerChangeListener(TEST_PATH, 1);
    MockDataTreeChangeListener listener = entry.getKey();

    listener.waitForChangeEvents();
    listener.verifyNotifiedData(TEST_PATH);

    listener.reset(1);
    writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));

    listener.waitForChangeEvents();
    listener.verifyNotifiedData(TEST_PATH);

    listener.reset(1);
    JavaTestKit kit = new JavaTestKit(getSystem());
    entry.getValue().tell(CloseDataTreeNotificationListenerRegistration.getInstance(), kit.getRef());
    kit.expectMsgClass(JavaTestKit.duration("5 seconds"),
            CloseDataTreeNotificationListenerRegistrationReply.class);

    writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
    listener.verifyNoNotifiedData(TEST_PATH);
}
private void onMakeLeaderLocal() {
    LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
    if (isLeader()) {
        getSender().tell(new Status.Success(null), getSelf());
        return;
    }

    final ActorSelection leader = getLeader();

    if (leader == null) {
        // Leader is not present. The cluster is most likely trying to
        // elect a leader and we should let that run its normal course.

        // TODO we can wait for the election to complete and retry the
        // request. We can also let the caller retry by sending a flag
        // in the response indicating the request is "retryable".
        getSender().tell(new Failure(
                new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
                        + "Currently there is no leader for " + persistenceId())),
                getSelf());
        return;
    }

    leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
}
@SuppressWarnings("checkstyle:IllegalCatch") private void handleReadyLocalTransaction(final ReadyLocalTransaction message) { LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionId()); boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { try { commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); } catch (Exception e) { LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), message.getTransactionId(), e); getSender().tell(new Failure(e), getSelf()); } } else { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(message, getSender(), "Could not process ready local transaction " + message.getTransactionId()); } else { LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader); message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(message, getContext()); } } }
private void handleForwardedReadyTransaction(final ForwardedReadyTransaction forwardedReady) {
    LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(),
            forwardedReady.getTransactionId());

    boolean isLeaderActive = isLeaderActive();
    if (isLeader() && isLeaderActive) {
        commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
    } else {
        ActorSelection leader = getLeader();
        if (!isLeaderActive || leader == null) {
            messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
                    "Could not process forwarded ready transaction " + forwardedReady.getTransactionId());
        } else {
            LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);

            ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
                    forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
            readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
            leader.forward(readyLocal, getContext());
        }
    }
}
protected ActorSelection processListenerRegistrationMessage(M message) {
    final ActorSelection listenerActor = selectActor(message.getListenerActorPath());

    // We have a leader so enable the listener.
    listenerActor.tell(new EnableNotification(true, persistenceId()), getSelf());

    if (!message.isRegisterOnAllInstances()) {
        // This is a leader-only registration so store a reference to the listener actor so it can be notified
        // at a later point if notifications should be enabled or disabled.
        leaderOnlyListenerActors.add(listenerActor);
    }

    allListenerActors.add(listenerActor);

    return listenerActor;
}
private void setListenerRegistrationActor(final ActorSelection actor) {
    if (actor == null) {
        LOG.debug("{}: Ignoring null actor on {}", logContext(), this);
        return;
    }

    synchronized (this) {
        if (!isClosed()) {
            this.listenerRegistrationActor = actor;
            return;
        }
    }

    // This registration has already been closed, notify the actor.
    actor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), null);
}
Future<ActorSelection> initiateCoordinatedCommit() {
    final Future<Object> messageFuture = initiateCommit(false);
    final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext,
            transaction.getIdentifier());
    ret.onComplete(new OnComplete<ActorSelection>() {
        @Override
        public void onComplete(final Throwable failure, final ActorSelection success) throws Throwable {
            if (failure != null) {
                LOG.info("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
                transactionAborted(transaction);
                return;
            }

            LOG.debug("Transaction {} resolved to actor {}", transaction.getIdentifier(), success);
        }
    }, actorContext.getClientDispatcher());

    return ret;
}
void tryCommitModifications(final BatchedModifications modifications) {
    if (isLeader()) {
        LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(),
                modifications.getTransactionId());

        // Note that it's possible the commit won't get consensus and will timeout and not be applied
        // to the state. However we don't need to retry it in that case because it will be committed to
        // the journal first and, once a majority of followers come back online and it is replicated,
        // it will be applied at that point.
        handleBatchedModificationsLocal(modifications, self());
    } else {
        final ActorSelection leader = getLeader();
        if (leader != null) {
            possiblyRemoveAllInitialCandidates(leader);

            LOG.debug("{}: Sending BatchedModifications {} to leader {}", persistenceId(),
                    modifications.getTransactionId(), leader);

            Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
                    getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));

            Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
        }
    }
}
void possiblyRemoveAllInitialCandidates(final ActorSelection leader) {
    // The following handles removing all candidates on startup when re-joining with a remote leader. When a
    // follower is detected as down, the leader will re-assign new owners to entities that were owned by the
    // down member but doesn't remove the down member as a candidate, as the down node may actually be isolated
    // and still running. Therefore on startup we send an initial message to the remote leader to remove any
    // potential stale candidates we had previously registered, as it's possible a candidate may not be
    // registered by a client in the new incarnation. We have to send the RemoveAllCandidates message prior to
    // any pending registrations.
    if (removeAllInitialCandidates && leader != null) {
        removeAllInitialCandidates = false;
        if (!isLeader()) {
            LOG.debug("{} - got new leader {} on startup - sending RemoveAllCandidates", persistenceId(), leader);
            leader.tell(new RemoveAllCandidates(localMemberName), ActorRef.noSender());
        }
    }
}
@Test @SuppressWarnings("checkstyle:IllegalCatch") public void testExecuteRemoteOperationAsync() { new JavaTestKit(getSystem()) { { ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef)); ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class), mock(Configuration.class)); ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); Future<Object> future = actorContext.executeOperationAsync(actor, "hello"); try { Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); assertEquals("Result", "hello", result); } catch (Exception e) { throw new AssertionError(e); } } }; }
/**
 * Sets the target primary shard and initiates a CreateTransaction try.
 */
void setPrimaryShard(PrimaryShardInfo primaryShardInfo) {
    this.primaryShardInfo = primaryShardInfo;

    if (getTransactionType() == TransactionType.WRITE_ONLY
            && getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
        ActorSelection primaryShard = primaryShardInfo.getPrimaryShardActor();

        LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
                getIdentifier(), primaryShard);

        // For write-only Tx's we prepare the transaction modifications directly on the shard actor
        // to avoid the overhead of creating a separate transaction actor.
        transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(
                primaryShard, String.valueOf(primaryShard.path()), primaryShardInfo.getPrimaryShardVersion()));
    } else {
        tryCreateTransaction();
    }
}
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
        final Set<Entry<String, TransactionContextWrapper>> txContextWrapperEntries) {

    final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrapperEntries.size());
    for (Entry<String, TransactionContextWrapper> e : txContextWrapperEntries) {
        LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());

        final TransactionContextWrapper wrapper = e.getValue();

        // The remote tx version is obtained via the TransactionContext, which may not be available yet, so
        // we pass a Supplier to obtain it dynamically. Once the ready Future is resolved the
        // TransactionContext is available.
        Supplier<Short> txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion();

        cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier));
    }

    return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts, getIdentifier());
}
protected ActorSelection selectFrontend() {
    if (frontend == null) {
        BackendFindFrontend backendFindFrontend = (BackendFindFrontend) PatternsCS.ask(
                getContext().getSystem().actorSelection(Constants.CLUSTER_BACKEND_PATH),
                BackendFindEvent.getDefaultInstance(), getAskForFrontendTimeout())
                .toCompletableFuture().join();
        frontend = getContext().getSystem().actorSelection(backendFindFrontend.getFrontendAddress());
    }
    return frontend;
}
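// Alternative sketch (an assumption, not the project's actual code): resolve the frontend without
// blocking the actor's dispatcher. Calling join() inside an actor stalls the thread until the ask
// completes; piping the reply back to self keeps the actor responsive. The message types and
// getAskForFrontendTimeout() are reused from the snippet above; the reply handling in the trailing
// comment is hypothetical.
private void resolveFrontendAsync() {
    CompletionStage<Object> reply = PatternsCS.ask(
            getContext().getSystem().actorSelection(Constants.CLUSTER_BACKEND_PATH),
            BackendFindEvent.getDefaultInstance(), getAskForFrontendTimeout());

    // Deliver the reply to this actor as a message so the cached "frontend" field is only touched
    // on the actor's own message-processing thread.
    PatternsCS.pipe(reply, getContext().dispatcher()).to(getSelf());
}

// In the actor's receive logic (sketch):
//     .match(BackendFindFrontend.class, msg ->
//             frontend = getContext().getSystem().actorSelection(msg.getFrontendAddress()))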
private static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
    final ActorContext mock = mock(ActorContext.class);
    final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
    final ActorSelection selection = system.actorSelection(actor.path());
    final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
    promise.success(shardInfo);
    when(mock.findPrimaryShardAsync(SHARD)).thenReturn(promise.future());
    return mock;
}
@Test
public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
    new ShardTestKit(getSystem()) {
        {
            final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
            dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());

            final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
                    TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));

            setupInMemorySnapshotStore();

            final TestActorRef<Shard> shard = actorFactory.createTestActor(
                    newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                    actorFactory.generateActorId(testName + "-shard"));

            waitUntilNoLeader(shard);

            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
            final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
                    RegisterDataTreeNotificationListenerReply.class);
            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());

            final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
            regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
            expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);

            shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
                    .customRaftPolicyImplementation(null).build(), ActorRef.noSender());

            listener.expectNoMoreChanges("Received unexpected change after close");
        }
    };
}
private ListenableFuture<Void> resolveCohorts() {
    if (cohortsResolvedFuture.isDone()) {
        return cohortsResolvedFuture;
    }

    final AtomicInteger completed = new AtomicInteger(cohorts.size());
    final Object lock = new Object();
    for (final CohortInfo info : cohorts) {
        info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
            @Override
            public void onComplete(final Throwable failure, final ActorSelection actor) {
                synchronized (lock) {
                    boolean done = completed.decrementAndGet() == 0;
                    if (failure != null) {
                        LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
                        cohortsResolvedFuture.setException(failure);
                    } else if (!cohortsResolvedFuture.isDone()) {
                        LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);

                        info.setResolvedActor(actor);
                        if (done) {
                            LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
                            cohortsResolvedFuture.set(null);
                        }
                    }
                }
            }
        }, actorContext.getClientDispatcher());
    }

    return cohortsResolvedFuture;
}
private void queryDHIS2FhirRepositoryResources(MediatorHTTPRequest request) {
    originalRequest = request;
    ActorSelection httpConnector = getContext().actorSelection(config.userPathFor("http-connector"));

    Map<String, String> headers = new HashMap<>();
    headers.put("Accept", "application/json");
    String resourceInformation = "FHIR Trackers resources";
    log.info("Querying the DHIS2 tracker server");
    //builtRequestPath="/baseDstu3/Practitioner?_lastUpdated=>=2016-10-11T09:12:37&_lastUpdated=<=2016-10-13T09:12:45&_pretty=true";

    String serverApp = mediatorConfiguration.getServerSourceAppName().equals("null")
            ? null : mediatorConfiguration.getServerSourceAppName();
    baseServerRepoURI = FhirMediatorUtilities.buidServerRepoBaseUri(
            this.mediatorConfiguration.getSourceServerScheme(),
            this.mediatorConfiguration.getSourceServerURI(),
            this.mediatorConfiguration.getSourceServerPort(),
            serverApp,
            this.mediatorConfiguration.getSourceServerFhirDataModel());
    String uriRepServer = baseServerRepoURI;
    String encodedUriSourceServer = FhirMediatorUtilities.encodeUrlToHttpFormat(uriRepServer);

    MediatorHTTPRequest serviceRequest = new MediatorHTTPRequest(
            request.getRequestHandler(),
            getSelf(),
            resourceInformation,
            "GET",
            encodedUriSourceServer,
            null,
            headers,
            null);
    resultOutPutHeader += "requestDateTime:" + new Date().toString() + ",";
    httpConnector.tell(serviceRequest, getSelf());
}
@Test
public void testReadThrottlingWhenShardNotFound() {
    completeOperation(transactionProxy -> {
        doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
                any(ActorSelection.class), eqReadData());

        transactionProxy.read(TestModel.TEST_PATH);

        transactionProxy.read(TestModel.TEST_PATH);
    }, false);
}
@SuppressWarnings("deprecation") //@Test public void testActorRef(){ Object obj = ActorSystemFactory.getActorSystem().initializeActorSystem(); if(provider.equalsIgnoreCase("local")){ assertTrue(obj instanceof ActorRef); } else { assertTrue(obj instanceof ActorSelection); } }
void init() {
    RaftActorContext context = raftActor.getRaftActorContext();
    RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior();

    transferTimer.start();

    Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
    if (roleChangeNotifier.isPresent()) {
        roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
                currentBehavior.getLeaderPayloadVersion()), raftActor.self());
    }

    for (String peerId : context.getPeerIds()) {
        ActorSelection followerActor = context.getPeerActorSelection(peerId);
        if (followerActor != null) {
            followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor());
        }
    }

    raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
        @Override
        protected void doRun() {
            LOG.debug("{}: pauseLeader successfully completed - doing transfer", raftActor.persistenceId());
            doTransfer();
        }

        @Override
        protected void doCancel() {
            LOG.debug("{}: pauseLeader timed out - continuing with transfer", raftActor.persistenceId());
            doTransfer();
        }
    });
}
private void startNewTerm() {
    // Set voteCount back to 1 (that is, voting for self).
    voteCount = 1;

    // Increment the election term and vote for self.
    long currentTerm = context.getTermInformation().getCurrentTerm();
    long newTerm = currentTerm + 1;
    context.getTermInformation().updateAndPersist(newTerm, context.getId());

    log.info("{}: Starting new election term {}", logName(), newTerm);

    // Request a vote from each voting peer.
    // TODO: Retry the vote request if replies do not arrive in a reasonable amount of time (TBD).
    for (String peerId : votingPeers) {
        ActorSelection peerActor = context.getPeerActorSelection(peerId);
        if (peerActor != null) {
            RequestVote requestVote = new RequestVote(
                    context.getTermInformation().getCurrentTerm(),
                    context.getId(),
                    context.getReplicatedLog().lastIndex(),
                    context.getReplicatedLog().lastTerm());

            log.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);

            peerActor.tell(requestVote, context.getActor());
        }
    }
}
private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
        FollowerLogInformation followerLogInformation) {
    // In certain cases outlined below we don't want to send the actual commit index to prevent the follower
    // from possibly committing and applying conflicting entries (those with same index, different term) from a
    // prior term that weren't replicated to a majority, which would be a violation of raft.
    //   - if the follower isn't active. In this case we don't know the state of the follower and we send an
    //     empty AppendEntries as a heart beat to prevent election.
    //   - if we're in the process of installing a snapshot. In this case we don't send any new entries but
    //     still need to send AppendEntries to prevent election.
    //   - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we
    //     need to send an empty AppendEntries to prevent election.
    boolean isInstallingSnapshot = followerLogInformation.getInstallSnapshotState() != null;
    long leaderCommitIndex = isInstallingSnapshot || followerLogInformation.isLogEntrySlicingInProgress()
            || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex();

    long followerNextIndex = followerLogInformation.getNextIndex();
    AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
            getLogEntryIndex(followerNextIndex - 1),
            getLogEntryTerm(followerNextIndex - 1), entries,
            leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());

    if (!entries.isEmpty() || log.isTraceEnabled()) {
        log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
                appendEntries);
    }

    followerActor.tell(appendEntries, actor());
}
@Override
public ActorSelection getPeerActorSelection(String peerId) {
    String peerAddress = getPeerAddress(peerId);
    if (peerAddress != null) {
        return actorSelection(peerAddress);
    }
    return null;
}
private void onNewOperation(ServerOperationContext<?> operationContext) {
    if (raftActor.isLeader()) {
        currentOperationState.onNewOperation(operationContext);
    } else {
        ActorSelection leader = raftActor.getLeader();
        if (leader != null) {
            LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
            leader.tell(operationContext.getOperation(), operationContext.getClientRequestor());
        } else {
            LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
            operationContext.getClientRequestor().tell(operationContext.newReply(
                    ServerChangeStatus.NO_LEADER, null), raftActor.self());
        }
    }
}
/**
 * Derived actors can call getLeader if they need a reference to the Leader.
 * This is useful, for example, when forwarding a request to the actor
 * which is the leader.
 *
 * @return a reference to the leader if known, null otherwise
 */
public ActorSelection getLeader() {
    String leaderAddress = getLeaderAddress();

    if (leaderAddress == null) {
        return null;
    }

    return context.actorSelection(leaderAddress);
}
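// Usage sketch (hypothetical handler, not from the source): forward an incoming request to the known
// leader, or fail fast when no leader is available. "request" is an illustrative parameter; forward()
// preserves the original sender so the leader replies directly to the caller.
private void forwardToLeader(final Object request) {
    final ActorSelection leader = getLeader();
    if (leader == null) {
        getSender().tell(new Failure(
                new IllegalStateException("No leader is currently available for " + persistenceId())),
                getSelf());
        return;
    }

    leader.forward(request, getContext());
}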
@Override
public ActorSelection getPeerActorSelection(final String peerId) {
    String peerAddress = getPeerAddress(peerId);
    if (peerAddress != null) {
        return actorSelection(peerAddress);
    }
    return null;
}
private void handleBatchedModifications(final BatchedModifications batched) {
    // This message is sent to prepare the modifications transaction directly on the Shard as an
    // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
    // BatchedModifications message, the caller sets the ready flag in the message indicating
    // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
    // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
    // ReadyTransaction message.

    // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
    // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
    // the primary/leader shard. However with timing and caching on the front-end, there's a small
    // window where it could have a stale leader during leadership transitions.
    boolean isLeaderActive = isLeaderActive();
    if (isLeader() && isLeaderActive) {
        handleBatchedModificationsLocal(batched, getSender());
    } else {
        ActorSelection leader = getLeader();
        if (!isLeaderActive || leader == null) {
            messageRetrySupport.addMessageToRetry(batched, getSender(),
                    "Could not process BatchedModifications " + batched.getTransactionId());
        } else {
            // If this is not the first batch and leadership changed in between batched messages,
            // we need to reconstruct previous BatchedModifications from the transaction
            // DataTreeModification, honoring the max batched modification count, and forward all the
            // previous BatchedModifications to the new leader.
            Collection<BatchedModifications> newModifications = commitCoordinator
                    .createForwardedBatchedModifications(batched,
                            datastoreContext.getShardBatchedModificationCount());

            LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
                    newModifications.size(), leader);

            for (BatchedModifications bm : newModifications) {
                leader.forward(bm, getContext());
            }
        }
    }
}
@SuppressWarnings("deprecation") @Test public void testActorRef(){ Object obj = ActorSystemFactory.getActorSystem().initializeActorSystem(ActorOperations.CREATE_USER.getValue()); if(provider.equalsIgnoreCase("local")){ Assert.assertTrue(obj instanceof ActorRef); } else { Assert.assertTrue(obj instanceof ActorSelection); } }
protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
        ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
    super(identifier, remoteTransactionVersion);
    this.limiter = Preconditions.checkNotNull(limiter);
    this.actor = actor;
    this.actorContext = actorContext;
}
@Override
List<Future<ActorSelection>> getCohortFutures() {
    List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
    for (CohortInfo info : cohorts) {
        cohortFutures.add(info.getActorFuture());
    }

    return cohortFutures;
}
protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
        final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
        final DataTreeModification modification) {
    this.actorContext = Preconditions.checkNotNull(actorContext);
    this.leader = Preconditions.checkNotNull(leader);
    this.transaction = Preconditions.checkNotNull(transaction);
    this.modification = Preconditions.checkNotNull(modification);
    this.operationError = null;
}
protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
        final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
        final Exception operationError) {
    this.actorContext = Preconditions.checkNotNull(actorContext);
    this.leader = Preconditions.checkNotNull(leader);
    this.transaction = Preconditions.checkNotNull(transaction);
    this.operationError = Preconditions.checkNotNull(operationError);
    this.modification = null;
}
@Test
public void testExistsCompletion() {
    completeOperation(transactionProxy -> {
        doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
                any(ActorSelection.class), eqDataExists(), any(Timeout.class));

        transactionProxy.exists(TestModel.TEST_PATH);

        transactionProxy.exists(TestModel.TEST_PATH);
    });
}
ConnectClientSuccess(final ClientIdentifier target, final long sequence, final ActorRef backend,
        final List<ActorSelection> alternates, final Optional<DataTree> dataTree, final int maxMessages) {
    super(target, sequence);
    this.backend = Preconditions.checkNotNull(backend);
    this.alternates = ImmutableList.copyOf(alternates);
    this.dataTree = dataTree.orElse(null);
    Preconditions.checkArgument(maxMessages > 0, "Maximum messages has to be positive, not %s", maxMessages);
    this.maxMessages = maxMessages;
}
protected static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
    final ActorContext mock = mock(ActorContext.class);
    final Promise<PrimaryShardInfo> promise = new DefaultPromise<>();
    final ActorSelection selection = system.actorSelection(actor.path());
    final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
    promise.success(shardInfo);
    when(mock.findPrimaryShardAsync(any())).thenReturn(promise.future());
    return mock;
}