Java 类akka.actor.ActorSelection 实例源码

项目:hashsdn-controller    文件:TestActorFactory.java   
@SuppressWarnings("checkstyle:IllegalCatch")
private void verifyActorReady(ActorRef actorRef) {
    // Sometimes we see messages go to dead letters soon after creation - it seems the actor isn't quite
    // in a state yet to receive messages or isn't actually created yet. This seems to happen with
    // actorSelection so, to alleviate it, we use an actorSelection and send an Identify message with
    // retries to ensure it's ready.

    Timeout timeout = new Timeout(100, TimeUnit.MILLISECONDS);
    Throwable lastError = null;
    Stopwatch sw = Stopwatch.createStarted();
    while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
        try {
            ActorSelection actorSelection = system.actorSelection(actorRef.path().toString());
            Future<Object> future = Patterns.ask(actorSelection, new Identify(""), timeout);
            ActorIdentity reply = (ActorIdentity)Await.result(future, timeout.duration());
            Assert.assertNotNull("Identify returned null", reply.getRef());
            return;
        } catch (Exception | AssertionError e) {
            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
            lastError = e;
        }
    }

    throw new RuntimeException(lastError);
}
项目:hashsdn-controller    文件:TransactionProxyTest.java   
private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
        throws Exception {
    ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));

    if (exToThrow instanceof PrimaryNotFoundException) {
        doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
    } else {
        doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
                .findPrimaryShardAsync(anyString());
    }

    doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
            any(ActorSelection.class), any(), any(Timeout.class));

    TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);

    propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
}
项目:hashsdn-controller    文件:AbstractLeader.java   
/**
 * Initiates a snapshot capture to install on a follower.
 *
 * <p>
 * Install Snapshot works as follows
 *   1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
 *   2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
 *      the Leader's handleMessage with a SendInstallSnapshot message.
 *   3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
 *      the Follower via InstallSnapshot messages.
 *   4. For each chunk, the Follower sends back an InstallSnapshotReply.
 *   5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
 *      follower.
 *   6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
 *      then send the existing snapshot in chunks to the follower.
 *
 * @param followerId the id of the follower.
 * @return true if capture was initiated, false otherwise.
 */
public boolean initiateCaptureSnapshot(final String followerId) {
    FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
    if (snapshotHolder.isPresent()) {
        // If a snapshot is present in the memory, most likely another install is in progress no need to capture
        // snapshot. This could happen if another follower needs an install when one is going on.
        final ActorSelection followerActor = context.getPeerActorSelection(followerId);

        // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
        sendSnapshotChunk(followerActor, followerLogInfo);
        return true;
    }

    boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
        this.getReplicatedToAllIndex(), followerId);
    if (captureInitiated) {
        followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
            context.getConfigParams().getSnapshotChunkSize(), logName()));
    }

    return captureInitiated;
}
项目:hashsdn-controller    文件:AbstractLeader.java   
private void sendInstallSnapshot() {
    log.debug("{}: sendInstallSnapshot", logName());
    for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
        String followerId = e.getKey();
        ActorSelection followerActor = context.getPeerActorSelection(followerId);
        FollowerLogInformation followerLogInfo = e.getValue();

        if (followerActor != null) {
            long nextIndex = followerLogInfo.getNextIndex();
            if (followerLogInfo.getInstallSnapshotState() != null
                    || context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED
                    || canInstallSnapshot(nextIndex)) {
                sendSnapshotChunk(followerActor, followerLogInfo);
            }
        }
    }
}
项目:hashsdn-controller    文件:DataTreeChangeListenerSupportTest.java   
@Test
public void testInitialChangeListenerEventWithContainerPath() throws DataValidationFailedException {
    writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));

    Entry<MockDataTreeChangeListener, ActorSelection> entry = registerChangeListener(TEST_PATH, 1);
    MockDataTreeChangeListener listener = entry.getKey();

    listener.waitForChangeEvents();
    listener.verifyNotifiedData(TEST_PATH);

    listener.reset(1);

    writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
    listener.waitForChangeEvents();
    listener.verifyNotifiedData(TEST_PATH);

    listener.reset(1);
    JavaTestKit kit = new JavaTestKit(getSystem());
    entry.getValue().tell(CloseDataTreeNotificationListenerRegistration.getInstance(), kit.getRef());
    kit.expectMsgClass(JavaTestKit.duration("5 seconds"), CloseDataTreeNotificationListenerRegistrationReply.class);

    writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
    listener.verifyNoNotifiedData(TEST_PATH);
}
项目:hashsdn-controller    文件:Shard.java   
private void onMakeLeaderLocal() {
    LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
    if (isLeader()) {
        getSender().tell(new Status.Success(null), getSelf());
        return;
    }

    final ActorSelection leader = getLeader();

    if (leader == null) {
        // Leader is not present. The cluster is most likely trying to
        // elect a leader and we should let that run its normal course

        // TODO we can wait for the election to complete and retry the
        // request. We can also let the caller retry by sending a flag
        // in the response indicating the request is "reTryable".
        getSender().tell(new Failure(
                new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
                        + "Currently there is no leader for " + persistenceId())),
                getSelf());
        return;
    }

    leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
}
项目:hashsdn-controller    文件:Shard.java   
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
    LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionId());

    boolean isLeaderActive = isLeaderActive();
    if (isLeader() && isLeaderActive) {
        try {
            commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
        } catch (Exception e) {
            LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
                    message.getTransactionId(), e);
            getSender().tell(new Failure(e), getSelf());
        }
    } else {
        ActorSelection leader = getLeader();
        if (!isLeaderActive || leader == null) {
            messageRetrySupport.addMessageToRetry(message, getSender(),
                    "Could not process ready local transaction " + message.getTransactionId());
        } else {
            LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
            message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
            leader.forward(message, getContext());
        }
    }
}
项目:hashsdn-controller    文件:Shard.java   
private void handleForwardedReadyTransaction(final ForwardedReadyTransaction forwardedReady) {
    LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionId());

    boolean isLeaderActive = isLeaderActive();
    if (isLeader() && isLeaderActive) {
        commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
    } else {
        ActorSelection leader = getLeader();
        if (!isLeaderActive || leader == null) {
            messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
                    "Could not process forwarded ready transaction " + forwardedReady.getTransactionId());
        } else {
            LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);

            ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
                    forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
            readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
            leader.forward(readyLocal, getContext());
        }
    }
}
项目:hashsdn-controller    文件:AbstractDataListenerSupport.java   
protected ActorSelection processListenerRegistrationMessage(M message) {
    final ActorSelection listenerActor = selectActor(message.getListenerActorPath());

    // We have a leader so enable the listener.
    listenerActor.tell(new EnableNotification(true, persistenceId()), getSelf());

    if (!message.isRegisterOnAllInstances()) {
        // This is a leader-only registration so store a reference to the listener actor so it can be notified
        // at a later point if notifications should be enabled or disabled.
        leaderOnlyListenerActors.add(listenerActor);
    }

    allListenerActors.add(listenerActor);

    return listenerActor;
}
项目:hashsdn-controller    文件:DataTreeChangeListenerProxy.java   
private void setListenerRegistrationActor(final ActorSelection actor) {
    if (actor == null) {
        LOG.debug("{}: Ignoring null actor on {}", logContext(), this);
        return;
    }

    synchronized (this) {
        if (!isClosed()) {
            this.listenerRegistrationActor = actor;
            return;
        }
    }

    // This registration has already been closed, notify the actor
    actor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), null);
}
项目:hashsdn-controller    文件:LocalThreePhaseCommitCohort.java   
Future<ActorSelection> initiateCoordinatedCommit() {
    final Future<Object> messageFuture = initiateCommit(false);
    final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext,
            transaction.getIdentifier());
    ret.onComplete(new OnComplete<ActorSelection>() {
        @Override
        public void onComplete(final Throwable failure, final ActorSelection success) throws Throwable {
            if (failure != null) {
                LOG.info("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
                transactionAborted(transaction);
                return;
            }

            LOG.debug("Transaction {} resolved to actor {}", transaction.getIdentifier(), success);
        }
    }, actorContext.getClientDispatcher());

    return ret;
}
项目:hashsdn-controller    文件:EntityOwnershipShard.java   
void tryCommitModifications(final BatchedModifications modifications) {
    if (isLeader()) {
        LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(),
                modifications.getTransactionId());

        // Note that it's possible the commit won't get consensus and will timeout and not be applied
        // to the state. However we don't need to retry it in that case b/c it will be committed to
        // the journal first and, once a majority of followers come back on line and it is replicated,
        // it will be applied at that point.
        handleBatchedModificationsLocal(modifications, self());
    } else {
        final ActorSelection leader = getLeader();
        if (leader != null) {
            possiblyRemoveAllInitialCandidates(leader);

            LOG.debug("{}: Sending BatchedModifications {} to leader {}", persistenceId(),
                    modifications.getTransactionId(), leader);

            Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
                    getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));

            Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
        }
    }
}
项目:hashsdn-controller    文件:EntityOwnershipShard.java   
void possiblyRemoveAllInitialCandidates(final ActorSelection leader) {
    // The following handles removing all candidates on startup when re-joining with a remote leader. When a
    // follower is detected as down, the leader will re-assign new owners to entities that were owned by the
    // down member but doesn't remove the down member as a candidate, as the down node may actually be isolated
    // and still running. Therefore on startup we send an initial message to the remote leader to remove any
    // potential stale candidates we had previously registered, as it's possible a candidate may not be
    // registered by a client in the new incarnation. We have to send the RemoveAllCandidates message prior to any
    // pending registrations.
    if (removeAllInitialCandidates && leader != null) {
        removeAllInitialCandidates = false;
        if (!isLeader()) {
            LOG.debug("{} - got new leader {} on startup - sending RemoveAllCandidates", persistenceId(), leader);

            leader.tell(new RemoveAllCandidates(localMemberName), ActorRef.noSender());
        }
    }
}
项目:hashsdn-controller    文件:ActorContextTest.java   
@Test
@SuppressWarnings("checkstyle:IllegalCatch")
public void testExecuteRemoteOperationAsync() {
    new JavaTestKit(getSystem()) {
        {
            ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));

            ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));

            ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
                    mock(ClusterWrapper.class), mock(Configuration.class));

            ActorSelection actor = actorContext.actorSelection(shardActorRef.path());

            Future<Object> future = actorContext.executeOperationAsync(actor, "hello");

            try {
                Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
                assertEquals("Result", "hello", result);
            } catch (Exception e) {
                throw new AssertionError(e);
            }
        }
    };
}
项目:hashsdn-controller    文件:RemoteTransactionContextSupport.java   
/**
 * Sets the target primary shard and initiates a CreateTransaction try.
 */
void setPrimaryShard(PrimaryShardInfo primaryShardInfo) {
    this.primaryShardInfo = primaryShardInfo;

    if (getTransactionType() == TransactionType.WRITE_ONLY
            && getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
        ActorSelection primaryShard = primaryShardInfo.getPrimaryShardActor();

        LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
            getIdentifier(), primaryShard);

        // For write-only Tx's we prepare the transaction modifications directly on the shard actor
        // to avoid the overhead of creating a separate transaction actor.
        transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(
                primaryShard, String.valueOf(primaryShard.path()), primaryShardInfo.getPrimaryShardVersion()));
    } else {
        tryCreateTransaction();
    }
}
项目:hashsdn-controller    文件:TransactionProxy.java   
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
        final Set<Entry<String, TransactionContextWrapper>> txContextWrapperEntries) {

    final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrapperEntries.size());
    for (Entry<String, TransactionContextWrapper> e : txContextWrapperEntries) {
        LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());

        final TransactionContextWrapper wrapper = e.getValue();

        // The remote tx version is obtained the via TransactionContext which may not be available yet so
        // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
        // TransactionContext is available.
        Supplier<Short> txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion();

        cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier));
    }

    return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts, getIdentifier());
}
项目:commelina    文件:AbstractServiceActor.java   
protected ActorSelection selectFrontend() {
    if (frontend == null) {
        BackendFindFrontend backendFindFrontend = (BackendFindFrontend) PatternsCS.ask(getContext().getSystem()
                .actorSelection(Constants.CLUSTER_BACKEND_PATH), BackendFindEvent.getDefaultInstance(), getAskForFrontendTimeout())
                .toCompletableFuture().join();

        frontend = getContext().getSystem().actorSelection(backendFindFrontend.getFrontendAddress());
    }
    return frontend;
}
项目:hashsdn-controller    文件:AbstractDataStoreClientBehaviorTest.java   
private static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
    final ActorContext mock = mock(ActorContext.class);
    final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
    final ActorSelection selection = system.actorSelection(actor.path());
    final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
    promise.success(shardInfo);
    when(mock.findPrimaryShardAsync(SHARD)).thenReturn(promise.future());
    return mock;
}
项目:hashsdn-controller    文件:ShardTest.java   
@Test
public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
    new ShardTestKit(getSystem()) {
        {
            final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
            dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());

            final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
                    TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));

            setupInMemorySnapshotStore();

            final TestActorRef<Shard> shard = actorFactory.createTestActor(
                    newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                    actorFactory.generateActorId(testName + "-shard"));

            waitUntilNoLeader(shard);

            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
            final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
                    RegisterDataTreeNotificationListenerReply.class);
            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());

            final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
            regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
            expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);

            shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
                    .customRaftPolicyImplementation(null).build(), ActorRef.noSender());

            listener.expectNoMoreChanges("Received unexpected change after close");
        }
    };
}
项目:hashsdn-controller    文件:ThreePhaseCommitCohortProxy.java   
private ListenableFuture<Void> resolveCohorts() {
    if (cohortsResolvedFuture.isDone()) {
        return cohortsResolvedFuture;
    }

    final AtomicInteger completed = new AtomicInteger(cohorts.size());
    final Object lock = new Object();
    for (final CohortInfo info: cohorts) {
        info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
            @Override
            public void onComplete(final Throwable failure, final ActorSelection actor)  {
                synchronized (lock) {
                    boolean done = completed.decrementAndGet() == 0;
                    if (failure != null) {
                        LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
                        cohortsResolvedFuture.setException(failure);
                    } else if (!cohortsResolvedFuture.isDone()) {
                        LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);

                        info.setResolvedActor(actor);
                        if (done) {
                            LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
                            cohortsResolvedFuture.set(null);
                        }
                    }
                }
            }
        }, actorContext.getClientDispatcher());
    }

    return cohortsResolvedFuture;
}
项目:DHIS2-fhir-lab-app    文件:DefaultOrchestrator.java   
private void queryDHIS2FhirRepositoryResources(MediatorHTTPRequest request)
{
    originalRequest = request;
    ActorSelection httpConnector = getContext().actorSelection(config.userPathFor("http-connector"));
    Map <String, String> headers = new HashMap<>();
    headers.put("Accept", "application/json");
    String resourceInformation="FHIR Trackers resources";
    log.info("Querying the DHIS2 tracker server");



    //builtRequestPath="/baseDstu3/Practitioner?_lastUpdated=>=2016-10-11T09:12:37&_lastUpdated=<=2016-10-13T09:12:45&_pretty=true";

    String ServerApp=mediatorConfiguration.getServerSourceAppName().equals("null")?null:mediatorConfiguration.getServerSourceAppName();

    baseServerRepoURI=FhirMediatorUtilities.buidServerRepoBaseUri(
            this.mediatorConfiguration.getSourceServerScheme(),
            this.mediatorConfiguration.getSourceServerURI(),
            this.mediatorConfiguration.getSourceServerPort(),
            ServerApp,
            this.mediatorConfiguration.getSourceServerFhirDataModel()
    );
    String uriRepServer=baseServerRepoURI;

    String encodedUriSourceServer=FhirMediatorUtilities.encodeUrlToHttpFormat(uriRepServer);
    MediatorHTTPRequest serviceRequest = new MediatorHTTPRequest(
            request.getRequestHandler(),
            getSelf(),
            resourceInformation,
            "GET",
            encodedUriSourceServer,
            null,
            headers,
            null);

    resultOutPutHeader+="requestDateTime:"+new Date().toString()+",";
    httpConnector.tell(serviceRequest, getSelf());

}
项目:hashsdn-controller    文件:TransactionProxyTest.java   
@Test
public void testReadThrottlingWhenShardNotFound() {

    completeOperation(transactionProxy -> {
        doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
                any(ActorSelection.class), eqReadData());

        transactionProxy.read(TestModel.TEST_PATH);

        transactionProxy.read(TestModel.TEST_PATH);
    }, false);
}
项目:sunbird-lms-service    文件:ActorSystemTest.java   
@SuppressWarnings("deprecation")
//@Test
public void testActorRef(){
  Object obj = ActorSystemFactory.getActorSystem().initializeActorSystem();
   if(provider.equalsIgnoreCase("local")){
     assertTrue(obj instanceof ActorRef);
   } else {
     assertTrue(obj instanceof ActorSelection);
   }
}
项目:hashsdn-controller    文件:RaftActorLeadershipTransferCohort.java   
void init() {
    RaftActorContext context = raftActor.getRaftActorContext();
    RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior();

    transferTimer.start();

    Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
    if (roleChangeNotifier.isPresent()) {
        roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
                currentBehavior.getLeaderPayloadVersion()), raftActor.self());
    }

    for (String peerId: context.getPeerIds()) {
        ActorSelection followerActor = context.getPeerActorSelection(peerId);
        if (followerActor != null) {
            followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor());
        }
    }

    raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
        @Override
        protected void doRun() {
            LOG.debug("{}: pauseLeader successfully completed - doing transfer", raftActor.persistenceId());
            doTransfer();
        }

        @Override
        protected void doCancel() {
            LOG.debug("{}: pauseLeader timed out - continuing with transfer", raftActor.persistenceId());
            doTransfer();
        }
    });
}
项目:hashsdn-controller    文件:Candidate.java   
private void startNewTerm() {


        // set voteCount back to 1 (that is voting for self)
        voteCount = 1;

        // Increment the election term and vote for self
        long currentTerm = context.getTermInformation().getCurrentTerm();
        long newTerm = currentTerm + 1;
        context.getTermInformation().updateAndPersist(newTerm, context.getId());

        log.info("{}: Starting new election term {}", logName(), newTerm);

        // Request for a vote
        // TODO: Retry request for vote if replies do not arrive in a reasonable
        // amount of time TBD
        for (String peerId : votingPeers) {
            ActorSelection peerActor = context.getPeerActorSelection(peerId);
            if (peerActor != null) {
                RequestVote requestVote = new RequestVote(
                        context.getTermInformation().getCurrentTerm(),
                        context.getId(),
                        context.getReplicatedLog().lastIndex(),
                        context.getReplicatedLog().lastTerm());

                log.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);

                peerActor.tell(requestVote, context.getActor());
            }
        }
    }
项目:hashsdn-controller    文件:AbstractLeader.java   
private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
        FollowerLogInformation followerLogInformation) {
    // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
    // possibly committing and applying conflicting entries (those with same index, different term) from a prior
    // term that weren't replicated to a majority, which would be a violation of raft.
    //     - if the follower isn't active. In this case we don't know the state of the follower and we send an
    //       empty AppendEntries as a heart beat to prevent election.
    //     - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
    //       need to send AppendEntries to prevent election.
    //     - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we
    //       need to send an empty AppendEntries to prevent election.
    boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
    long leaderCommitIndex = isInstallingSnaphot || followerLogInformation.isLogEntrySlicingInProgress()
            || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex();

    long followerNextIndex = followerLogInformation.getNextIndex();
    AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
        getLogEntryIndex(followerNextIndex - 1),
        getLogEntryTerm(followerNextIndex - 1), entries,
        leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());

    if (!entries.isEmpty() || log.isTraceEnabled()) {
        log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
                appendEntries);
    }

    followerActor.tell(appendEntries, actor());
}
项目:hashsdn-controller    文件:RaftActorContextImpl.java   
@Override public ActorSelection getPeerActorSelection(String peerId) {
    String peerAddress = getPeerAddress(peerId);
    if (peerAddress != null) {
        return actorSelection(peerAddress);
    }
    return null;
}
项目:hashsdn-controller    文件:RaftActorServerConfigurationSupport.java   
private void onNewOperation(ServerOperationContext<?> operationContext) {
    if (raftActor.isLeader()) {
        currentOperationState.onNewOperation(operationContext);
    } else {
        ActorSelection leader = raftActor.getLeader();
        if (leader != null) {
            LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
            leader.tell(operationContext.getOperation(), operationContext.getClientRequestor());
        } else {
            LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
            operationContext.getClientRequestor().tell(operationContext.newReply(
                    ServerChangeStatus.NO_LEADER, null), raftActor.self());
        }
    }
}
项目:hashsdn-controller    文件:RaftActor.java   
/**
 * Derived actor can call getLeader if they need a reference to the Leader.
 * This would be useful for example in forwarding a request to an actor
 * which is the leader
 *
 * @return A reference to the leader if known, null otherwise
 */
public ActorSelection getLeader() {
    String leaderAddress = getLeaderAddress();

    if (leaderAddress == null) {
        return null;
    }

    return context.actorSelection(leaderAddress);
}
项目:hashsdn-controller    文件:MockRaftActorContext.java   
@Override public ActorSelection getPeerActorSelection(final String peerId) {
    String peerAddress = getPeerAddress(peerId);
    if (peerAddress != null) {
        return actorSelection(peerAddress);
    }
    return null;
}
项目:hashsdn-controller    文件:Shard.java   
private void handleBatchedModifications(final BatchedModifications batched) {
    // This message is sent to prepare the modifications transaction directly on the Shard as an
    // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
    // BatchedModifications message, the caller sets the ready flag in the message indicating
    // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
    // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
    // ReadyTransaction message.

    // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
    // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
    // the primary/leader shard. However with timing and caching on the front-end, there's a small
    // window where it could have a stale leader during leadership transitions.
    //
    boolean isLeaderActive = isLeaderActive();
    if (isLeader() && isLeaderActive) {
        handleBatchedModificationsLocal(batched, getSender());
    } else {
        ActorSelection leader = getLeader();
        if (!isLeaderActive || leader == null) {
            messageRetrySupport.addMessageToRetry(batched, getSender(),
                    "Could not process BatchedModifications " + batched.getTransactionId());
        } else {
            // If this is not the first batch and leadership changed in between batched messages,
            // we need to reconstruct previous BatchedModifications from the transaction
            // DataTreeModification, honoring the max batched modification count, and forward all the
            // previous BatchedModifications to the new leader.
            Collection<BatchedModifications> newModifications = commitCoordinator
                    .createForwardedBatchedModifications(batched,
                            datastoreContext.getShardBatchedModificationCount());

            LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
                    newModifications.size(), leader);

            for (BatchedModifications bm : newModifications) {
                leader.forward(bm, getContext());
            }
        }
    }
}
项目:sunbird-lms-mw    文件:ActorSystemTest.java   
@SuppressWarnings("deprecation")
@Test
public void testActorRef(){
  Object obj = ActorSystemFactory.getActorSystem().initializeActorSystem(ActorOperations.CREATE_USER.getValue());
   if(provider.equalsIgnoreCase("local")){
     Assert.assertTrue(obj instanceof ActorRef);
   } else {
     Assert.assertTrue(obj instanceof ActorSelection);
   }
}
项目:hashsdn-controller    文件:RemoteTransactionContext.java   
protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
        ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
    super(identifier, remoteTransactionVersion);
    this.limiter = Preconditions.checkNotNull(limiter);
    this.actor = actor;
    this.actorContext = actorContext;
}
项目:hashsdn-controller    文件:ThreePhaseCommitCohortProxy.java   
@Override
List<Future<ActorSelection>> getCohortFutures() {
    List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
    for (CohortInfo info: cohorts) {
        cohortFutures.add(info.getActorFuture());
    }

    return cohortFutures;
}
项目:hashsdn-controller    文件:LocalThreePhaseCommitCohort.java   
protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
        final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
        final DataTreeModification modification) {
    this.actorContext = Preconditions.checkNotNull(actorContext);
    this.leader = Preconditions.checkNotNull(leader);
    this.transaction = Preconditions.checkNotNull(transaction);
    this.modification = Preconditions.checkNotNull(modification);
    this.operationError = null;
}
项目:hashsdn-controller    文件:LocalThreePhaseCommitCohort.java   
protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
        final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final Exception operationError) {
    this.actorContext = Preconditions.checkNotNull(actorContext);
    this.leader = Preconditions.checkNotNull(leader);
    this.transaction = Preconditions.checkNotNull(transaction);
    this.operationError = Preconditions.checkNotNull(operationError);
    this.modification = null;
}
项目:hashsdn-controller    文件:TransactionProxyTest.java   
@Test
public void testExistsCompletion() {
    completeOperation(transactionProxy -> {
        doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
                any(ActorSelection.class), eqDataExists(), any(Timeout.class));

        transactionProxy.exists(TestModel.TEST_PATH);

        transactionProxy.exists(TestModel.TEST_PATH);
    });

}
项目:hashsdn-controller    文件:ConnectClientSuccess.java   
ConnectClientSuccess(final ClientIdentifier target, final long sequence, final ActorRef backend,
    final List<ActorSelection> alternates, final Optional<DataTree> dataTree, final int maxMessages) {
    super(target, sequence);
    this.backend = Preconditions.checkNotNull(backend);
    this.alternates = ImmutableList.copyOf(alternates);
    this.dataTree = dataTree.orElse(null);
    Preconditions.checkArgument(maxMessages > 0, "Maximum messages has to be positive, not %s", maxMessages);
    this.maxMessages = maxMessages;
}
项目:hashsdn-controller    文件:AbstractClientHistoryTest.java   
protected static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
    final ActorContext mock = mock(ActorContext.class);
    final Promise<PrimaryShardInfo> promise = new DefaultPromise<>();
    final ActorSelection selection = system.actorSelection(actor.path());
    final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
    promise.success(shardInfo);
    when(mock.findPrimaryShardAsync(any())).thenReturn(promise.future());
    return mock;
}