/**
 * One-time fixture setup: registers the exception rules on the shared handler,
 * builds the two steps and adds them to the model, fills the input data with
 * the values 1..10 and creates the typed actor named "orchestrator".
 */
@BeforeClass
public static void setup() {
    // Both exception rules go on the handler that is passed to each step below.
    handler.addExceptionElement(e1);
    handler.addExceptionElement(e2);

    // step2 is built first because step1 receives it as a constructor argument.
    s2 = new ActorStep("step2", 5, null, impl2, handler);
    s1 = new ActorStep("step1", 5, s2, impl1, handler);

    s1.setWorkRef(0);
    model.add(s1);
    s2.setWorkRef(1);
    model.add(s2);

    // Input data: the integers 1 through 10.
    for (int value = 1; value <= 10; value++) {
        data.add(value);
    }

    // Factory handed to TypedProps to build the orchestrator implementation.
    final Creator<OrchestratorImpl> orchestratorFactory = new Creator<OrchestratorImpl>() {
        private static final long serialVersionUID = 1L;

        public OrchestratorImpl create() {
            return new OrchestratorImpl(model, batchproducer);
        }
    };

    orchestrator = TypedActor.get(_system).typedActorOf(
            new TypedProps<OrchestratorImpl>(Orchestrator.class, orchestratorFactory),
            "orchestrator");
}
/**
 * Stores the chart builder and registers an event receiver with the event
 * manager. The receiver calls {@code cleanup} on this typed actor for every
 * event whose type starts with {@code "file:"}.
 *
 * @param chartBuilder builder kept on this instance
 * @param eventManager target of the receiver-registration message
 */
public ChartCacheImpl(DefaultChartBuilder chartBuilder, EventManager eventManager) {
    this.chartBuilder = chartBuilder;

    // Reference obtained from TypedActor.self(); the receiver calls back through it.
    final ChartCache selfRef = TypedActor.<ChartCache>self();

    final EventReceiver fileEventReceiver = new EventReceiver() {
        @Override
        public void end() {
            // no action on normal end
        }

        @Override
        public void end(Throwable e) {
            // no action on failure end
        }

        @Override
        public void push(OrderedEvent oe) {
            final boolean isFileEvent = oe.event().type.startsWith("file:");
            if (isFileEvent) {
                // Ask the cache to clean up the entry named by the event's "id" info.
                selfRef.cleanup(oe.event().info("id"));
            }
        }
    };

    eventManager.tell(EventReceiverMessage.add(fileEventReceiver, null));
}
protected void attachEventReceiver() { final Notifier n = TypedActor.<Notifier>self(); final EventReceiver er = new EventReceiver() { @Override public void end() {} @Override public void end(Throwable e) {} @Override public void push(OrderedEvent oe) { // Skip out-of-date messages if (oe.event().type.equals("outofdate")) return; // Trigger notification for event n.handleEvent(oe); } }; fileStore.getEventManager().tell(EventReceiverMessage.add(er, null)); }
public static void main(String[] args) throws Exception { ActorSystem _system = ActorSystem.create("TypedActorsExample"); CalculatorInt calculator1 = TypedActor.get(_system).typedActorOf( new TypedProps<Calculator>(CalculatorInt.class, Calculator.class)); CalculatorInt calculator2 = TypedActor.get(_system).typedActorOf( new TypedProps<Calculator>(CalculatorInt.class, Calculator.class)); // Create a router with Typed Actors ActorRef actor1 = TypedActor.get(_system).getActorRefFor(calculator1); ActorRef actor2 = TypedActor.get(_system).getActorRefFor(calculator2); Iterable<ActorRef> routees = Arrays.asList(new ActorRef[] { actor1, actor2 }); ActorRef router = _system.actorOf(new Props() .withRouter(BroadcastRouter.create(routees))); router.tell("Hello there"); _system.shutdown(); }
/**
 * Marks the batch as STARTING and creates one StepExecutionManager child
 * actor per configured step, recording each in {@code managers} under the
 * step's name with an initial Idle state.
 */
@Override
public void preStart() {
    this.batchstatus = BatchStatus.STARTING;

    for (ActorStep step : steps) {
        // Each manager is built from this actor's ref plus the step's own settings.
        final Props managerProps = Props.create(
                StepExecutionManager.class,
                getSelf(),
                step.getName(),
                step.getCapacity(),
                step.getImplementation(),
                step.getErrorsHandler());
        final ActorRef manager = TypedActor.context().actorOf(managerProps);

        managers.put(step.getName(), new ManagerState(manager, Idle.instance, step));
    }
}
private F.Promise<List<Chart>> getPromiseOfCharts(final String id) { // are the charts currently fetched by an already created promise? // if so reuse that promise to avoid multiple datasources to be opened. F.Promise<List<Chart>> p = pmap.get(id); if(p == null) { // Get reference to ourself, so we can update the cache asynchronously final ChartCache self = TypedActor.self(); // Create a promise based on an asynchronous operation p = F.Promise.promise(new F.Function0<List<Chart>>() { @Override public List<Chart> apply() throws Throwable { try { // Get the charts (non-modifying operation) final List<Chart> charts = actuallyGetCharts(id); // Schedule update of self with new cache value self.update(id, charts); // Fulfil promise with charts return charts; } finally { pmap.remove(id); } } }, TypedActor.dispatcher()); // Execute on our own thread-pool pmap.put(id, p); } return p; }
/**
 * Creates a typed actor for the given interface whose implementation instance
 * is obtained from the injector.
 *
 * @param injector source of the implementation instance
 * @param iface    interface passed to TypedProps
 * @param impl     implementation class requested from the injector
 * @return the value returned by {@code typedActorOf} for these props
 */
private <T> T typedActor(final Injector injector, final Class<T> iface,
        final Class<? extends T> impl) {
    // The instance is only requested from the injector when create() is invoked.
    final Creator<T> injectedCreator = new Creator<T>() {
        @Override
        public T create() {
            return injector.getInstance(impl);
        }
    };
    final TypedProps<T> props = new TypedProps<T>(iface, injectedCreator);
    return TypedActor.get(Akka.system()).typedActorOf(props);
}
/**
 * Stores the actor system and coordinator, then creates the typed actor named
 * "entrypoint" backed by a RequestHandlerActor built with the coordinator.
 *
 * @param system      actor system in which the typed actor is created
 * @param coordinator reference passed to the RequestHandlerActor constructor
 */
public HttpHandler(final ActorSystem system, final ActorRef coordinator) {
    super();
    this.coordinator = coordinator;
    this.system = system;

    final Creator<RequestHandlerActor> handlerFactory = new Creator<RequestHandlerActor>() {
        public RequestHandlerActor create() {
            return new RequestHandlerActor(coordinator);
        }
    };
    final TypedProps<RequestHandlerActor> handlerProps =
            new TypedProps<RequestHandlerActor>(RequestHandlerActor.class, handlerFactory);

    requester = TypedActor.get(system).typedActorOf(handlerProps, "entrypoint");
}
public static void main(String[] args) throws Exception { ActorSystem _system = ActorSystem.create("TypedActorsExample", ConfigFactory.load().getConfig("TypedActorExample")); CalculatorInt calculator = TypedActor.get(_system).typedActorOf( new TypedProps<SupervisorActor>(CalculatorInt.class, SupervisorActor.class),"supervisorActor"); // Get access to the ActorRef ActorRef calActor = TypedActor.get(_system).getActorRefFor(calculator); // call actor with a message calActor.tell("Hi there",calActor); //wait for child actor to get restarted Thread.sleep(500); // Invoke the method and wait for result Timeout timeout = new Timeout(Duration.parse("5 seconds")); Future<Object> future = Patterns.ask(calActor, Integer.valueOf(10), timeout); Integer result = (Integer) Await.result(future, timeout.duration()); System.out.println("Result from child actor->" + result); //wait before shutting down the system Thread.sleep(500); _system.shutdown(); }
public static void main(String[] args) throws Exception { ActorSystem _system = ActorSystem.create("TypedActorsExample"); Timeout timeout = new Timeout(Duration.parse("5 seconds")); CalculatorInt calculator = TypedActor.get(_system).typedActorOf( new TypedProps<Calculator>(CalculatorInt.class, Calculator.class)); // calling a fire and forget method calculator.incrementCount(); // Invoke the method and wait for result Future<Integer> future = calculator.add(Integer.valueOf(14), Integer.valueOf(6)); Integer result = Await.result(future, timeout.duration()); System.out.println("Result is " + result); Option<Integer> counterResult = calculator.incrementAndReturn(); System.out.println("Result is " + counterResult.get()); counterResult = calculator.incrementAndReturn(); System.out.println("Result is " + counterResult.get()); // Get access to the ActorRef ActorRef calActor = TypedActor.get(_system).getActorRefFor(calculator); // call actor with a message calActor.tell("Hi there"); _system.shutdown(); }
public static void main(String[] args) throws Exception { ActorSystem _system = ActorSystem.create("TypedActorsExample", ConfigFactory.load().getConfig("TypedActorExample")); Timeout timeout = new Timeout(Duration.parse("5 seconds")); CalculatorInt calculator = TypedActor.get(_system).typedActorOf( new TypedProps<Calculator>(CalculatorInt.class, Calculator.class).withDispatcher("defaultDispatcher")); // calling a fire and forget method calculator.incrementCount(); // Invoke the method and wait for result Future<Integer> future = calculator.add(Integer.valueOf(14), Integer.valueOf(6)); Integer result = Await.result(future, timeout.duration()); System.out.println("Result is " + result); Option<Integer> counterResult = calculator.incrementAndReturn(); System.out.println("Result is " + counterResult.get()); counterResult = calculator.incrementAndReturn(); System.out.println("Result is " + counterResult.get()); // Get access to the ActorRef ActorRef calActor = TypedActor.get(_system).getActorRefFor(calculator); // call actor with a message calActor.tell("Hi there"); _system.shutdown(); }
@SuppressWarnings("serial") public static void main(String[] args) { // instanciation of the actor model ActorSystem _system = ActorSystem.create("Karajan"); // Data to be processed final List<Integer> data=new ArrayList<Integer>(); for(int i=0;i<10;i++){ data.add(i+1); } // Some implementations String impl1="com.wordline.awltech.karajan.orchestrator.masterslavepullpatterntest.Implementation1"; String impl2="com.wordline.awltech.karajan.orchestrator.masterslavepullpatterntest.Implementation2"; // Somme Error Handling ExceptionElement e1=new ExceptionElement("ProcessorException", ErrorStrategy.ONE, Action.SKIP, 5); ExceptionElement e2=new ExceptionElement("ArithmeticException", ErrorStrategy.ONE, Action.SKIP, 5); ErrorHandling handler=new ErrorHandling(); handler.addExceptionElement(e1); handler.addExceptionElement(e2); // instanciation of some steps ActorStep s2=new ActorStep("step2",5, null,impl2,handler); ActorStep s1=new ActorStep("step1",5, s2,impl1,handler); //Model final List<ActorStep> model=new ArrayList<ActorStep>(); s1.setWorkRef(0);model.add(s1); s2.setWorkRef(1);model.add(s2); final ActorRef batchproducer =_system.actorOf(Props.create(BatchProducer.class,data.iterator(),5)); // instanciation of the orchestration Orchestrator orchestrator = TypedActor.get(_system).typedActorOf( new TypedProps<OrchestratorImpl>(Orchestrator.class, new Creator<OrchestratorImpl>() { public OrchestratorImpl create() { return new OrchestratorImpl(model,batchproducer); } }), "orchestrator"); while(orchestrator.getBatchStatus()!=BatchStatus.COMPLETED){ System.out.println(orchestrator.getBatchStatus()); } if(orchestrator.getBatchStatus()==BatchStatus.COMPLETED){ System.out.println("STATUS AT THE END: "+orchestrator.getBatchStatus()); System.out.println("PROCESSED: "+orchestrator.getStepMetrics("step1").PROCESSED); System.out.println("RECEIVED: "+orchestrator.getStepMetrics("step1").RECEIVED); _system.shutdown(); } }
/**
 * Adds the two operands and wraps the sum with {@code Futures.successful},
 * using the dispatcher returned by {@code TypedActor.dispatcher()}.
 *
 * @param first  left operand
 * @param second right operand
 * @return a future carrying {@code first + second}
 */
public Future<Integer> add(Integer first, Integer second) {
    final Integer sum = first + second;
    return Futures.successful(sum, TypedActor.dispatcher());
}
public OrchestratorImpl(List<ActorStep> steps,ActorRef b) { this.steps=steps; this.bathcproducer=b; memory=TypedActor.context().actorOf(Props.create(OrchestrationMemory.class,steps.size())); }
/**
 * Private constructor: keeps the given actor system and looks up the typed
 * actor extension for it.
 *
 * @param actorSystem the actor system to wrap.
 */
private Akka(ActorSystem actorSystem) {
    this.system = actorSystem;
    this.typedActorExtension = TypedActor.get(actorSystem);
}
/**
 * Returns the ActorRef given by {@code TypedActor.context().self()}, so it
 * can be used to send a message or be passed on to a receiver.
 *
 * @return the self reference of the current typed actor context
 */
ActorRef getSelf() {
    final ActorRef selfRef = TypedActor.context().self();
    return selfRef;
}
/**
 * Subtracts the second operand from the first and wraps the difference with
 * {@code Futures.successful}, using the dispatcher returned by
 * {@code TypedActor.dispatcher()}.
 *
 * @param first  value to subtract from
 * @param second value to subtract
 * @return a future carrying {@code first - second}
 */
public Future<Integer> subtract(Integer first, Integer second) {
    final Integer difference = first - second;
    return Futures.successful(difference, TypedActor.dispatcher());
}