/**
 * Returns a one-for-one strategy created with a retry limit of 10 and a one-minute window.
 *
 * <p>Decision table of the decider:
 * <ul>
 *   <li>{@link IllegalArgumentException}: restart</li>
 *   <li>{@link ArithmeticException}: resume</li>
 *   <li>{@link NullPointerException}: stop</li>
 *   <li>anything else: escalate</li>
 * </ul>
 */
@Override
public SupervisorStrategy supervisorStrategy() {
    final Function<Throwable, SupervisorStrategy.Directive> decider =
        new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            public SupervisorStrategy.Directive apply(Throwable failure) throws Exception {
                if (failure instanceof IllegalArgumentException) {
                    return SupervisorStrategy.restart();
                } else if (failure instanceof ArithmeticException) {
                    return SupervisorStrategy.resume();
                } else if (failure instanceof NullPointerException) {
                    return SupervisorStrategy.stop();
                }
                return SupervisorStrategy.escalate();
            }
        };
    return new OneForOneStrategy(10, Duration.create(1, TimeUnit.MINUTES), decider);
}
/**
 * Creates a one-for-one strategy whose decider stops the child for every {@link Exception}.
 *
 * <p>{@code false} is passed as the first constructor argument and the decider logs the
 * failure itself: an {@code ActorKilledException} at debug level, any other exception at
 * error level.
 */
@Override
public SupervisorStrategy create() {
    return new OneForOneStrategy(
        false,
        new PFBuilder<Throwable, SupervisorStrategy.Directive>()
            .match(
                Exception.class,
                (Exception failure) -> {
                    final boolean killed = failure instanceof ActorKilledException;
                    if (!killed) {
                        LOG.error("Actor failed with exception. Stopping it now.", failure);
                    } else {
                        LOG.debug("Actor was killed. Stopping it now.", failure);
                    }
                    return SupervisorStrategy.Stop$.MODULE$;
                })
            .build());
}
/**
 * Builds the supervisor strategy described by the strategy info registered for the
 * wrapped javactor's class.
 *
 * <p>Retry limit, time window and logging flag are read from that info. The info's type
 * selects between a {@link OneForOneStrategy} and an {@link AllForOneStrategy}; both use
 * the decider returned by {@code myDecider()}.
 */
@Override
public SupervisorStrategy supervisorStrategy() {
    final SupervisorStrategyInfo strategyInfo =
        javactorInfoByJavactorType.get(javactor.getClass()).getSupervisorStrategyInfo().getInfo();
    final Duration window = toDuration(strategyInfo.getTimeRange(), strategyInfo.getTimeUnit());
    final int retryLimit = strategyInfo.getMaxNumRetries();
    final boolean logFailures = strategyInfo.isLoggingEnabled();

    if (strategyInfo.getType().equals(SupervisorStrategyType.ONE_FOR_ONE)) {
        return new OneForOneStrategy(retryLimit, window, myDecider(), logFailures);
    }
    return new AllForOneStrategy(retryLimit, window, myDecider(), logFailures);
}
/**
 * Builds a one-for-one strategy (retry limit {@code -1}, window {@code Duration.Inf()})
 * that logs every failure via {@code logException} and then decides:
 * {@link Error}: escalate; {@link RuntimeException}: resume; anything else: restart.
 */
private static SupervisorStrategy buildResumeOnRuntimeErrorStrategy() {
    final Function<Throwable, SupervisorStrategy.Directive> decider =
        new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            public Directive apply(Throwable failure) throws Exception {
                logException(failure);
                if (failure instanceof Error) {
                    return OneForOneStrategy.escalate();
                }
                if (failure instanceof RuntimeException) {
                    return OneForOneStrategy.resume();
                }
                return OneForOneStrategy.restart();
            }
        };
    return new OneForOneStrategy(-1, Duration.Inf(), decider);
}
/**
 * Returns a one-for-one strategy (retry limit 10, window "1 minute") whose decider logs
 * the failure as a warning and always resumes the child.
 */
@Override
public SupervisorStrategy supervisorStrategy() {
    final Function<Throwable, Directive> resumeAlways = failure -> {
        LOG.warn("Supervisor Strategy caught unexpected exception - resuming", failure);
        return SupervisorStrategy.resume();
    };
    return new OneForOneStrategy(10, Duration.create("1 minute"), resumeAlways);
}
/**
 * Returns a one-for-one strategy (retry limit 10, window "1 minute") whose decider logs
 * the failure as an error and always resumes the child.
 */
@Override
public SupervisorStrategy supervisorStrategy() {
    final int retryLimit = 10;
    final Duration window = Duration.create("1 minute");
    return new OneForOneStrategy(
        retryLimit,
        window,
        failure -> {
            LOG.error("An exception happened actor will be resumed", failure);
            return SupervisorStrategy.resume();
        });
}
/**
 * Returns a one-for-one strategy (retry limit {@code -1}, window {@code Duration.Inf()}).
 *
 * <p>Every failure is logged as an error. An {@link Error} is escalated; any other
 * throwable resumes the child.
 */
@Override
public SupervisorStrategy supervisorStrategy() {
    return new OneForOneStrategy(
        -1,
        Duration.Inf(),
        failure -> {
            logger.error(failure, "Unknown session error");
            if (!(failure instanceof Error)) {
                return OneForOneStrategy.resume();
            }
            return OneForOneStrategy.escalate();
        });
}
/**
 * Logs the failure as an error, then restarts the child for a {@link RuntimeException}
 * and stops it for any other throwable.
 */
@Override
public Directive apply(Throwable t) {
    logger.error(t, "Unknown failure");
    final boolean restartable = t instanceof RuntimeException;
    if (!restartable) {
        return SupervisorStrategy.stop();
    }
    return SupervisorStrategy.restart();
}
/**
 * Returns an all-for-one strategy created with a retry limit of 10 and a one-hour window.
 *
 * <p>{@link IllegalArgumentException}, {@link ArithmeticException} and
 * {@link NullPointerException} are escalated; every other throwable yields {@code stop()}.
 */
@Override
public SupervisorStrategy supervisorStrategy() {
    final Function<Throwable, Directive> decider = new Function<Throwable, Directive>() {
        @Override
        public Directive apply(Throwable failure) throws Exception {
            final boolean escalated = failure instanceof IllegalArgumentException
                || failure instanceof ArithmeticException
                || failure instanceof NullPointerException;
            if (escalated) {
                return escalate();
            }
            return stop();
        }
    };
    return new AllForOneStrategy(10, Duration.create(1, TimeUnit.HOURS), decider);
}
/**
 * Returns an all-for-one strategy created with a retry limit of 10 and a one-hour window.
 *
 * <p>The decider escalates {@link IllegalArgumentException}, {@link ArithmeticException}
 * and {@link NullPointerException}; for anything else it returns {@code stop()}.
 */
@Override
public SupervisorStrategy supervisorStrategy() {
    final Function<Throwable, SupervisorStrategy.Directive> decider =
        new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            public SupervisorStrategy.Directive apply(Throwable failure) throws Exception {
                if (failure instanceof IllegalArgumentException
                        || failure instanceof ArithmeticException
                        || failure instanceof NullPointerException) {
                    return escalate();
                }
                return stop();
            }
        };
    return new AllForOneStrategy(10, Duration.create(1, TimeUnit.HOURS), decider);
}
/**
 * Returns an all-for-one strategy created with a retry limit of 10 and a one-hour window.
 *
 * <p>Decision table of the decider:
 * <ul>
 *   <li>{@link IllegalArgumentException}: stop</li>
 *   <li>{@link ArithmeticException}: resume</li>
 *   <li>{@link NullPointerException}: restart</li>
 *   <li>anything else: escalate</li>
 * </ul>
 */
@Override
public SupervisorStrategy supervisorStrategy() {
    final Function<Throwable, SupervisorStrategy.Directive> decider =
        new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            public SupervisorStrategy.Directive apply(Throwable failure) throws Exception {
                if (failure instanceof IllegalArgumentException) {
                    return SupervisorStrategy.stop();
                } else if (failure instanceof ArithmeticException) {
                    return SupervisorStrategy.resume();
                } else if (failure instanceof NullPointerException) {
                    return SupervisorStrategy.restart();
                }
                return SupervisorStrategy.escalate();
            }
        };
    return new AllForOneStrategy(10, Duration.create(1, TimeUnit.HOURS), decider);
}
/**
 * Decides how this actor reacts to a fault raised by a child actor.
 *
 * <p>For an {@link Exception}, a {@link Status.Failure} message is sent to this actor and
 * the child is stopped. The failure carries the exception itself if it already is an
 * {@link InterpreterException}; otherwise it carries a new one that wraps the exception
 * and names the sender. Any other {@link Throwable} is escalated, so this actor fails
 * itself.
 *
 * <p>This method performs its own logging. Exceptions are logged with their stack trace
 * at debug level, and only if debug is enabled; other throwables are logged at error
 * level. The surrounding strategy is expected to switch off Akka's built-in failure
 * logging (configured outside this method).
 */
private SupervisorStrategy.Directive supervisorDirective(Throwable throwable) {
    if (!(throwable instanceof Exception)) {
        log.error(throwable, "Error in root-module interpreter. Escalating... The JVM may not survive.");
        return SupervisorStrategy.escalate();
    }

    final InterpreterException failure;
    if (throwable instanceof InterpreterException) {
        failure = (InterpreterException) throwable;
    } else {
        failure = new InterpreterException(
            ExecutionTrace.empty(),
            String.format("Failure of root-module interpreter %s.", getSender()),
            throwable
        );
    }

    if (log.isDebugEnabled()) {
        // Render the header line and the stack trace into a single debug message.
        final StringWriter buffer = new StringWriter();
        final PrintWriter printer = new PrintWriter(buffer);
        printer.println(
            "Exception thrown in root-module interpreter and caught in top-level interpreter.");
        throwable.printStackTrace(printer);
        log.debug(buffer.toString());
    }

    getSelf().tell(new Status.Failure(failure), getSender());
    return SupervisorStrategy.stop();
}
/**
 * Stops the child for any {@link Exception}; escalates every other {@link Throwable}.
 */
@Override
public SupervisorStrategy.Directive apply(Throwable throwable) {
    if (!(throwable instanceof Exception)) {
        return SupervisorStrategy.escalate();
    }
    return SupervisorStrategy.stop();
}
/**
 * Decides how this actor reacts to a fault raised by a child actor.
 *
 * <p>An {@link InterpreterException}, and any throwable that is not an {@link Exception},
 * is escalated. Every other exception is converted with {@code mapChildException}, sent
 * to this actor itself wrapped in a {@code ChildActorFailed} message, and the child is
 * stopped.
 */
private SupervisorStrategy.Directive supervisorDirective(Throwable throwable) {
    final boolean escalated =
        throwable instanceof InterpreterException || !(throwable instanceof Exception);
    if (escalated) {
        return SupervisorStrategy.escalate();
    }

    final InterpreterException mapped = mapChildException((Exception) throwable);
    getSelf().tell(new ChildActorFailed(mapped), getSelf());
    return SupervisorStrategy.stop();
}
private SupervisorStrategy.Directive supervisorDirective(Throwable throwable) { // We cannot just re-throw the exception here because it would be caught by the actor system (which would // just restart the actor). We therefore schedule a deferred exception directly in the // CallingThreadExecutor. asyncTaskExecutor.execute(() -> { throw new UncaughtThrowableException(throwable); }); return SupervisorStrategy.stop(); }
/**
 * Returns a one-for-one strategy (retry limit {@code -1}, window {@code Duration.Inf()}).
 *
 * <p>Every failure is logged first. The decider then behaves as follows:
 * <ul>
 *   <li>{@code ActorInitializationException} / {@code DeathPactException}: {@code stop()}</li>
 *   <li>any other {@link Exception}: if a job is in progress ({@code currentJobId} is
 *       non-null) the master is told the work failed; this actor then switches to its
 *       idle behavior and the child is restarted</li>
 *   <li>any non-exception throwable: logged again and escalated</li>
 * </ul>
 */
@Override
public SupervisorStrategy supervisorStrategy() {
    return new OneForOneStrategy(-1, Duration.Inf(), failure -> {
        log.info("Throwable, Work is failed for1 "+ failure);

        if (failure instanceof ActorInitializationException || failure instanceof DeathPactException) {
            return stop();
        }

        if (failure instanceof Exception) {
            // Runtime and checked exceptions are handled identically; only the log prefix differs.
            final String logPrefix = failure instanceof RuntimeException
                ? "RuntimeException, Work is failed for "
                : "Exception, Work is failed for ";
            if (currentJobId != null) {
                log.info(logPrefix + currentJobId);
                sendToMaster(new MasterWorkerProtocol.WorkFailed(workerId, jobId(), new Result(-1, "", "", "", null)));
            }
            getContext().become(receiveBuilder()
                .matchAny(p -> idle.apply(p))
                .build());
            return restart();
        }

        log.info("Throwable, Work is failed for "+ failure);
        return escalate();
    });
}
@Override public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(-1, Duration.create("1 minute"), t -> { log.error(t, "DroneActor failure caught by supervisor."); System.err.println(t.getMessage()); return SupervisorStrategy.resume(); // Continue on all exceptions! }); }
/**
 * Creates the one-for-one supervisor strategy used for the notification processors.
 *
 * <p>The decider logs every failure as an error and returns {@code resume()}.
 *
 * @param notificationRetryNumber
 *            retry limit passed to the strategy
 * @param notificationRetryDuration
 *            time window for the retry limit, in the textual form accepted by
 *            {@code Duration.create(String)}
 * @return the configured strategy
 */
private static SupervisorStrategy getSupervisorStrategy(int notificationRetryNumber, String notificationRetryDuration) {
    final Function<Throwable, Directive> resumeAlways = new Function<Throwable, Directive>() {
        @Override
        public Directive apply(Throwable failure) {
            log.error("An notification processor reported an exception, retry", failure);
            return resume();
        }
    };
    return new OneForOneStrategy(notificationRetryNumber, Duration.create(notificationRetryDuration), resumeAlways);
}
/**
 * Creates the supervisor actor of the notification system and stores it in
 * {@code supervisorActor}.
 *
 * <p>The actor is a round-robin pool of {@code getPoolSize()} routees, each created by a
 * {@code NotificationMessageProcessingActorCreator}. The pool is supervised with the
 * strategy built from the configured retry number and retry duration.
 *
 * @param actorSystem
 *            the actor system in which the pool is created
 */
private void createActors(ActorSystem actorSystem) {
    final SupervisorStrategy strategy =
        getSupervisorStrategy(getNotificationRetryNumber(), getNotificationRetryDuration());

    final RoundRobinPool pool = (new RoundRobinPool(getPoolSize())).withSupervisorStrategy(strategy);
    final Props routeeProps = Props.create(new NotificationMessageProcessingActorCreator(
        getConfiguration(), getPreferenceManagerPlugin(), getEmailService(), this.getI18nMessagesPlugin()));
    final Props poolProps = pool.props(routeeProps);

    this.supervisorActor = actorSystem.actorOf(poolProps, SUPERVISOR_ACTOR_NAME);
    log.info("Actor based notification system is started");
}
/**
 * Creates a {@link OneForOneStrategy} whose decider logs every failure as an error and
 * returns {@code resume()}.
 *
 * @param numberOfRetry
 *            retry limit passed to the strategy
 * @param withinTimeRange
 *            time window for the retry limit
 * @param pluginConfigurationId
 *            the unique id of the plugin configuration; only used in the log message
 * @return the configured strategy
 */
private static SupervisorStrategy getSupervisorStrategy(int numberOfRetry, Duration withinTimeRange, Long pluginConfigurationId) {
    // Formatted once here so that the decider does not rebuild it for every failure.
    final String errorMessage =
        String.format("An provisioning processor of the plugin %d reported an exception, retry", pluginConfigurationId);
    final Function<Throwable, Directive> resumeAlways = new Function<Throwable, Directive>() {
        @Override
        public Directive apply(Throwable failure) {
            log.error(errorMessage, failure);
            return resume();
        }
    };
    return new OneForOneStrategy(numberOfRetry, withinTimeRange, resumeAlways);
}
/**
 * Checks that an {@code AllForOneTestJavactor} yields an {@link AllForOneStrategy} with a
 * retry limit of 10 and a time range of "1 minute".
 */
@Test
public void test_sup_strategy_all_for_one_type() throws Throwable {
    javactor = new AllForOneTestJavactor();
    final TestActorRef<JavactorUntypedActor> actorRef =
        TestActorRef.create(system, Props.create(new MyCreator(javactor)), "testA");

    final SupervisorStrategy strategy = actorRef.underlyingActor().supervisorStrategy();

    assertTrue(strategy instanceof AllForOneStrategy);
    final AllForOneStrategy allForOne = (AllForOneStrategy) strategy;
    assertEquals(10, allForOne.maxNrOfRetries());
    assertEquals(Duration.create("1 minute"), allForOne.withinTimeRange());
}
/**
 * Maps a child failure to a directive: {@link ArithmeticException} resumes,
 * {@link NullPointerException} and {@link IllegalArgumentException} stop, and everything
 * else is escalated.
 */
@Override
public Directive apply(Throwable t) {
    if (t instanceof ArithmeticException) {
        return SupervisorStrategy.resume();
    }
    if (t instanceof NullPointerException || t instanceof IllegalArgumentException) {
        return SupervisorStrategy.stop();
    }
    return SupervisorStrategy.escalate();
}
/**
 * Builds a one-for-one strategy (retry limit {@code -1}, window {@code Duration.Inf()})
 * that logs every failure via {@code logException}, escalates an {@link Error} and
 * resumes the child for anything else.
 */
private static SupervisorStrategy buildResumeOrEscalateStrategy() {
    final Function<Throwable, SupervisorStrategy.Directive> decider =
        new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            public Directive apply(Throwable failure) throws Exception {
                logException(failure);
                if (!(failure instanceof Error)) {
                    return OneForOneStrategy.resume();
                }
                return OneForOneStrategy.escalate();
            }
        };
    return new OneForOneStrategy(-1, Duration.Inf(), decider);
}
/**
 * Builds a one-for-one strategy (retry limit {@code -1}, window {@code Duration.Inf()})
 * that logs every failure via {@code logException}, escalates an {@link Error} and
 * restarts the child for anything else.
 */
private static SupervisorStrategy buildRestartOrEscalateStrategy() {
    final Function<Throwable, SupervisorStrategy.Directive> decider =
        new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            public Directive apply(Throwable failure) throws Exception {
                logException(failure);
                if (!(failure instanceof Error)) {
                    return OneForOneStrategy.restart();
                }
                return OneForOneStrategy.escalate();
            }
        };
    return new OneForOneStrategy(-1, Duration.Inf(), decider);
}
/**
 * Returns the supervisor strategy held in {@code strategy}; no new strategy is built here.
 */
@Override public SupervisorStrategy supervisorStrategy() { return strategy; }
/**
 * Returns the supervisor strategy held in {@code SUPERVISOR_STRATEGY}; no new strategy is
 * built here.
 */
@Override public SupervisorStrategy supervisorStrategy() { return SUPERVISOR_STRATEGY; }
/**
 * Returns the supervisor strategy held in {@code supervisorStrategy}; no new strategy is
 * built here.
 */
@Override public SupervisorStrategy supervisorStrategy() { return supervisorStrategy; }
/**
 * Supervises children with the predefined strategy obtained from
 * {@code SupervisorStrategy.stoppingStrategy()}.
 */
@Override public SupervisorStrategy supervisorStrategy() { return SupervisorStrategy.stoppingStrategy(); }
/**
 * Returns the result of {@code stop()} for every failure; the throwable is not inspected.
 * NOTE(review): {@code stop()} is presumably a static import of
 * {@code SupervisorStrategy.stop()} - confirm against the file's imports.
 */
@Override public SupervisorStrategy.Directive apply(Throwable throwable) throws Exception { return stop(); }
/**
 * Creates the decider used by this actor's supervisor strategy.
 *
 * <p>Decision order:
 * <ol>
 *   <li>{@code ActorInitializationException}, {@code ActorKilledException} and
 *       {@code DeathPactException}: stop.</li>
 *   <li>Any throwable that is not an {@link Exception}: escalate.</li>
 *   <li>Any other exception: the first registered on-exception method whose key type is
 *       assignable from the exception's class is invoked on the javactor, and its
 *       {@code SupervisorDirective} result is mapped to the returned directive. An
 *       exception raised while doing so is rethrown wrapped in a
 *       {@link RuntimeException}.</li>
 *   <li>No matching method: restart.</li>
 * </ol>
 */
private Function<Throwable, Directive> myDecider() {
    return new Function<Throwable, Directive>() {
        @Override
        public Directive apply(Throwable t) {
            final boolean lifecycleFailure = t instanceof ActorInitializationException
                || t instanceof ActorKilledException
                || t instanceof DeathPactException;
            if (lifecycleFailure) {
                return SupervisorStrategy.stop();
            }
            if (!(t instanceof Exception)) {
                return SupervisorStrategy.escalate();
            }

            final Class<? extends Throwable> failureType = t.getClass();
            final ImmutableSet<Entry<Class<?>, Method>> handlers = javactorInfoByJavactorType
                .get(javactor.getClass())
                .getSupervisorStrategyInfo()
                .getOnExceptionMethods()
                .entrySet();
            for (Entry<Class<?>, Method> handler : handlers) {
                if (!handler.getKey().isAssignableFrom(failureType)) {
                    continue;
                }
                final Method handlerMethod = handler.getValue();
                try {
                    return map((SupervisorDirective) methodInvoke(handlerMethod, javactor, t));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return SupervisorStrategy.restart();
        }
    };
}
/**
 * Returns the supervisor strategy held in {@code strategy}, writing a debug log line on
 * every call.
 */
@Override public SupervisorStrategy supervisorStrategy() { LOG.debug("Custom supervisorStrategy is used for SetupDocumentTypeWorkerActor!"); return strategy; }
public static final SupervisorStrategy defaultStrategy() { final ExceptionElement exceptionElement=matchExceptionToErrorHadling(); Function<Throwable, Directive> behavior=new Function<Throwable, Directive>(){ @Override public Directive apply(Throwable t) throws Exception { ProcessorException e=(ProcessorException)t; if ( exceptionElement.getStategy()==ErrorStrategy.ONE && exceptionElement.getAction()==Action.SKIP) { stepexcmanager.tell(new MasterWorkerProtocol.WorkFailed(e.getWorkerId(), e.getWorkId()), ActorRef.noSender()); return SupervisorStrategy.resume(); } else if( exceptionElement.getStategy()==ErrorStrategy.ONE && exceptionElement.getAction()==Action.RETRY){ if(currentrestrart < exceptionElement.getTrynumber()-1){ executor.tell(new StepExecutionManager.Work(UUID.randomUUID().toString(),3), stepexcmanager); return SupervisorStrategy.restart(); }else{ stepexcmanager.tell(new MasterWorkerProtocol.WorkFailed(e.getWorkerId(), e.getWorkId()), ActorRef.noSender()); return SupervisorStrategy.resume(); } } else if(exceptionElement.getStategy()==ErrorStrategy.ALL && exceptionElement.getAction()==Action.SKIP){ stepexcmanager.tell(new OrchestratorMasterProtocol.BatchFail(Action.SKIP), ActorRef.noSender()); return SupervisorStrategy.resume(); } else if(exceptionElement.getStategy()==ErrorStrategy.ALL && exceptionElement.getAction()==Action.RETRY){ stepexcmanager.tell(new OrchestratorMasterProtocol.BatchFail(Action.RETRY), ActorRef.noSender()); } return SupervisorStrategy.escalate(); } }; if(exceptionElement!=null){ //AllForOneStrategy: The strategy is applied to all the children if(exceptionElement.getStategy()==ErrorStrategy.ALL){ return new AllForOneStrategy(exceptionElement.getTrynumber(),Duration.create(5,TimeUnit.SECONDS),behavior); } //OneForOneStrategy: The strategy is applied to only the children that fail else if(exceptionElement.getStategy()==ErrorStrategy.ONE){ return new OneForOneStrategy(exceptionElement.getTrynumber(), 
Duration.create(5,TimeUnit.SECONDS),behavior); } } // The Manager does not know how to handle this error return SupervisorStrategy.defaultStrategy(); }
/**
 * Creates the supervisor strategy for the IO router by delegating to
 * {@code buildResumeOrEscalateStrategy()}.
 *
 * @param context currently unused by this method
 * @return the strategy returned by {@code buildResumeOrEscalateStrategy()}
 */
public static SupervisorStrategy createIoRouterStrategy(AkkaContext context) { return buildResumeOrEscalateStrategy(); }