@Nonnull @Override public Map<K, List<M>> putAll(@Nonnull Map<K, List<M>> values) { Map<K, ICompletableFuture<List<M>>> futureMap = new HashMap<>(); values.forEach((key, message) -> futureMap.put(key, hazelcastMap.putAsync(key, message))); Map<K, List<M>> ret = new HashMap<>(); futureMap.forEach((key, future) -> { try { List<M> value = future.get(); if (value != null) { ret.put(key, value); } } catch (ExecutionException | InterruptedException e) { // TODO: Figure out if we timed out or were interrupted... throw new RuntimeException(e.getMessage(), e); } }); return ret; }
@Nonnull @Override public Map<K, List<M>> removeAll(Collection<K> keys) { Map<K, ICompletableFuture<List<M>>> futureMap = new HashMap<>(); keys.forEach(key -> futureMap.put(key, hazelcastMap.removeAsync(key))); Map<K, List<M>> ret = new HashMap<>(); futureMap.forEach((key, future) -> { try { List<M> value = future.get(); if (value != null) { ret.put(key, value); } } catch (ExecutionException | InterruptedException e) { // TODO: Figure out if we timed out or were interrupted... throw new RuntimeException(e.getMessage(), e); } }); return ret; }
@Nonnull @Override @SuppressWarnings("unchecked") public <B extends PMessageBuilder<Message, Field>> Map<Key, B> putAllBuilders(@Nonnull Map<Key, B> builders) { Map<Key, ICompletableFuture<Builder>> futureMap = new HashMap<>(); builders.forEach((key, builder) -> futureMap.put(key, hazelcastMap.putAsync(key, (Builder) builder))); Map<Key, B> ret = new HashMap<>(); futureMap.forEach((key, future) -> { try { Builder value = future.get(); if (value != null) { ret.put(key, (B) value); } } catch (ExecutionException | InterruptedException e) { // TODO: Figure out if we timed out or were interrupted... throw new RuntimeException(e.getMessage(), e); } }); return ret; }
@Nonnull @Override public Map<Key, Message> removeAll(Collection<Key> keys) { Map<Key, ICompletableFuture<Builder>> futureMap = new HashMap<>(); keys.forEach(key -> futureMap.put(key, hazelcastMap.removeAsync(key))); Map<Key, Message> ret = new HashMap<>(); futureMap.forEach((key, builder) -> { try { Builder value = builder.get(); if (value != null) { ret.put(key, value.build()); } } catch (ExecutionException | InterruptedException e) { // TODO: Figure out if we timed out or were interrupted... throw new RuntimeException(e.getMessage(), e); } }); return ret; }
@Nonnull @Override public Map<Key, Message> putAll(@Nonnull Map<Key, Message> values) { Map<Key, ICompletableFuture<Message>> futureMap = new HashMap<>(); values.forEach((key, message) -> futureMap.put(key, hazelcastMap.putAsync(key, message))); Map<Key, Message> ret = new HashMap<>(); futureMap.forEach((key, future) -> { try { Message value = future.get(); if (value != null) { ret.put(key, value); } } catch (ExecutionException | InterruptedException e) { // TODO: Figure out if we timed out or were interrupted... throw new RuntimeException(e.getMessage(), e); } }); return ret; }
@Nonnull @Override public Map<Key, Message> removeAll(Collection<Key> keys) { Map<Key, ICompletableFuture<Message>> futureMap = new HashMap<>(); keys.forEach(key -> futureMap.put(key, hazelcastMap.removeAsync(key))); Map<Key, Message> ret = new HashMap<>(); futureMap.forEach((key, future) -> { try { Message value = future.get(); if (value != null) { ret.put(key, value); } } catch (ExecutionException | InterruptedException e) { // TODO: Figure out if we timed out or were interrupted... throw new RuntimeException(e.getMessage(), e); } }); return ret; }
@Override public void execute(HazelcastInstance hazelcastInstance) throws Exception { JobTracker jobTracker = hazelcastInstance.getJobTracker("default"); IList<Person> list = hazelcastInstance.getList("persons"); KeyValueSource<String, Person> source = KeyValueSource.fromList(list); Job<String, Person> job = jobTracker.newJob(source); ICompletableFuture future = job.mapper(new SalaryMapper()) // .combiner(new SalaryCombinerFactory()) // .reducer(new SalaryReducerFactory()) // .submit(); System.out.println(ToStringPrettyfier.toString(future.get())); }
@Override public void execute(HazelcastInstance hazelcastInstance) throws Exception { JobTracker jobTracker = hazelcastInstance.getJobTracker("default"); IList<Person> list = hazelcastInstance.getList("persons"); KeyValueSource<String, Person> source = KeyValueSource.fromList(list); Job<String, Person> job = jobTracker.newJob(source); // Collect all people by state ICompletableFuture future = job.mapper(new StateBasedCountMapper()).submit(); // Count people by state // ICompletableFuture future = job.mapper(new StateBasedCountMapper()).reducer(new CountReducerFactory()).submit(); // Same as above but with precalculation per node // ICompletableFuture future = job.mapper(new StateBasedCountMapper()).combiner(new CountCombinerFactory()) // .reducer(new CountReducerFactory()).submit(); System.out.println(ToStringPrettyfier.toString(future.get())); }
private <T> ScheduledFuture<T> submitToPartitionOwner(Callable<T> task, int partitionId, long delay, long period, boolean fixedRate) { if (task == null) { throw new NullPointerException("task can't be null"); } if (isShutdown()) { throw new RejectedExecutionException(getRejectionMessage()); } NodeEngine nodeEngine = getNodeEngine(); Data taskData = nodeEngine.toData(task); String uuid = buildRandomUuidString(); String name = getName(); ScheduledCallableTaskOperation op = new ScheduledCallableTaskOperation(name, uuid, taskData, delay, period, fixedRate); ICompletableFuture future = invoke(partitionId, op); return new ScheduledDelegatingFuture<T>(future, nodeEngine.getSerializationService(), delay); // return new CancellableDelegatingFuture<T>(future, nodeEngine, uuid, partitionId); }
private static Map<String, Long> mapReduce(HazelcastInstance hazelcastInstance) throws Exception { // Retrieving the JobTracker by name JobTracker jobTracker = hazelcastInstance.getJobTracker("default"); // Creating the KeyValueSource for a Hazelcast IMap IMap<String, String> map = hazelcastInstance.getMap("articles"); KeyValueSource<String, String> source = KeyValueSource.fromMap(map); Job<String, String> job = jobTracker.newJob(source); // Creating a new Job ICompletableFuture<Map<String, Long>> future = job // returned future .mapper(new TokenizerMapper()) // adding a mapper .combiner(new WordCountCombinerFactory()) // adding a combiner through the factory .reducer(new WordCountReducerFactory()) // adding a reducer through the factory .submit(); // submit the task // Attach a callback listener future.andThen(buildCallback()); // Wait and retrieve the result return future.get(); }
private static long mapReduceCollate(HazelcastInstance hazelcastInstance) throws Exception { // Retrieving the JobTracker by name JobTracker jobTracker = hazelcastInstance.getJobTracker("default"); // Creating the KeyValueSource for a Hazelcast IMap IMap<String, String> map = hazelcastInstance.getMap("articles"); KeyValueSource<String, String> source = KeyValueSource.fromMap(map); // Creating a new Job Job<String, String> job = jobTracker.newJob(source); ICompletableFuture<Long> future = job // returned future .mapper(new TokenizerMapper()) // adding a mapper .combiner(new WordCountCombinerFactory()) // adding a combiner through the factory .reducer(new WordCountReducerFactory()) // adding a reducer through the factory .submit(new WordCountCollator()); // submit the task and supply a collator // Wait and retrieve the result return future.get(); }
/**
 * Adds {@code future} to the current batch and, once the batch holds {@code batchSize}
 * futures, waits for every one of them before emptying the batch.
 *
 * <p>Does nothing when {@code batchSize} is zero or negative.
 *
 * <p>The batch is emptied even when waiting fails. Previously a failure left the batch
 * full, so its size moved past {@code batchSize}, the equality check never matched again,
 * and the batch grew without ever being awaited.
 *
 * @param future the future to track
 * @throws RuntimeException whatever {@code rethrow} produces for the first failure met
 *         while waiting on the batch
 */
void add(ICompletableFuture<Long> future) {
    if (batchSize <= 0) {
        return;
    }

    batch.add(future);
    // ">=" rather than "==" so an over-full batch can never skip the sync.
    if (batch.size() < batchSize) {
        return;
    }

    try {
        for (ICompletableFuture<?> batchFuture : batch) {
            batchFuture.get();
        }
    } catch (Exception e) {
        throw rethrow(e);
    } finally {
        batch.clear();
    }
}
/**
 * Stores one entry asynchronously under a permit.
 *
 * <p>Fails immediately if {@code storedException} already holds a value. Otherwise acquires
 * one permit, starts the store via {@code storeAsync} and registers {@code callback} on the
 * resulting future. If starting the store or registering the callback throws, the permit is
 * released again and the exception is rethrown through {@code rethrow}.
 *
 * @param key   the key to store
 * @param value the value to store
 * @throws RuntimeException if an earlier problem was recorded in {@code storedException}
 */
@Override
@SuppressWarnings("unchecked")
public void pushEntry(K key, V value) {
    boolean problemRecorded = storedException.get() != null;
    if (problemRecorded) {
        throw new RuntimeException("Aborting pushEntry; problems are detected. Please check the cause",
                storedException.get());
    }

    acquirePermit(1);
    try {
        ICompletableFuture<V> pendingStore = storeAsync(key, value);
        pendingStore.andThen(callback);
    } catch (Exception e) {
        // On this path the permit is handed back here.
        releasePermit(1);
        throw rethrow(e);
    }
}
/**
 * Subscribes to the event journal of every partition in {@code partitionIds} and seeds
 * {@code readOffsets} and {@code emitOffsets} from the subscription results.
 *
 * <p>All subscriptions are started first; only afterwards are their futures awaited, in
 * index order.
 *
 * @param context not used by this method
 * @throws Exception whatever waiting on a subscription future throws
 */
@Override
protected void init(@Nonnull Context context) throws Exception {
    final int partitionCount = partitionIds.length;
    ICompletableFuture<EventJournalInitialSubscriberState>[] subscriptions =
            new ICompletableFuture[partitionCount];

    for (int i = 0; i < partitionCount; i++) {
        subscriptions[i] = eventJournalReader.subscribeToEventJournal(partitionIds[i]);
    }

    for (int i = 0; i < partitionCount; i++) {
        // Both offsets start from the same sequence.
        readOffsets[i] = getSequence(subscriptions[i].get());
        emitOffsets[i] = readOffsets[i];
    }
}
/**
 * Creates a {@code ClientInvocation} for {@code request} on the given partition and
 * invokes it.
 *
 * @param partitionId the partition the invocation targets
 * @param request     the message to send
 * @return the future returned by the invocation
 * @throws RuntimeException whatever {@code ExceptionUtil.rethrow} produces if creating or
 *         invoking the invocation throws
 */
@Nonnull
@Override
public ICompletableFuture<ClientMessage> invoke(@Nonnegative int partitionId, @Nonnull ClientMessage request) {
    try {
        return new ClientInvocation(client, request, partitionId).invoke();
    } catch (Exception e) {
        throw ExceptionUtil.rethrow(e);
    }
}
@Override public void execute(HazelcastInstance hazelcastInstance) throws Exception { JobTracker jobTracker = hazelcastInstance.getJobTracker("default"); IList<Person> list = hazelcastInstance.getList("persons"); KeyValueSource<String, Person> source = KeyValueSource.fromList(list); Job<String, Person> job = jobTracker.newJob(source); ICompletableFuture<List<Map.Entry<String, Integer>>> future = // job.mapper(new SalaryMapper()) // .combiner(new SalaryCombinerFactory()) // .reducer(new SalaryReducerFactory()) // .submit(new SalaryCollator()); // Intermediate result List<Map.Entry<String, Integer>> orderedSalariesByState = future.get(); Map.Entry<String, Integer> topSalary = orderedSalariesByState.get(0); IList<Crime> crimesList = hazelcastInstance.getList("crimes"); KeyValueSource<String, Crime> crimeSource = KeyValueSource.fromList(crimesList); Job<String, Crime> crimeJob = jobTracker.newJob(crimeSource); ICompletableFuture<Map<CrimeCategory, Integer>> crimeFuture = // crimeJob.mapper(new CrimeMapper(topSalary.getKey())) // .reducer(new CrimeReducerFactory()) // .submit(); System.out.println(ToStringPrettyfier.toString(crimeFuture.get())); }
public static void main(String[] args) throws Exception { // Prepare Hazelcast cluster HazelcastInstance hazelcastInstance = buildCluster(3); try { // Read data fillMapWithData(hazelcastInstance); JobTracker tracker = hazelcastInstance.getJobTracker("default"); IMap<String, String> map = hazelcastInstance.getMap(MAP_NAME); KeyValueSource<String, String> source = KeyValueSource.fromMap(map); Job<String, String> job = tracker.newJob(source); ICompletableFuture<Map<String, Integer>> future = job .mapper(new TokenizerMapper()) // Activate Combiner to add combining phase! // .combiner(new WordcountCombinerFactory()) .reducer(new WordcountReducerFactory()) .submit(); System.out.println(ToStringPrettyfier.toString(future.get())); } finally { // Shutdown cluster Hazelcast.shutdownAll(); } }
/**
 * Runs a mapper/reducer job over the {@code GEOMAP} map and writes the result to
 * {@code filename} as CSV with the header {@code ID,POPULATION}.
 *
 * <p>The job is submitted before the output file is opened; the header is written before
 * the method blocks on the job's future.
 *
 * @param instance the instance providing the map and the job tracker
 * @param filename path of the CSV file to create
 * @throws ExecutionException   if the job fails
 * @throws InterruptedException if the thread is interrupted while waiting for the job
 * @throws IOException          if the file cannot be opened or closed, or if writing to
 *         it failed ({@link PrintStream} does not throw on write errors, so its error
 *         state is checked explicitly)
 */
private static void doMapReduce(
        final HazelcastInstance instance,
        final String filename) throws ExecutionException, InterruptedException, IOException {
    final IMap<Integer, SpatialPoint> geomap = instance.getMap(GEOMAP);
    final JobTracker tracker = instance.getJobTracker("default");
    final Job<Integer, SpatialPoint> job = tracker.newJob(KeyValueSource.fromMap(geomap));
    final ICompletableFuture<Map<Integer, Integer>> future = job.
            mapper(new FeatureMapper(HEXMAP)).
            reducer(new FeatureReducerFactory()).
            submit();

    final FileOutputStream fileOutputStream = new FileOutputStream(filename);
    try {
        final PrintStream printStream = new PrintStream(fileOutputStream);
        printStream.format("ID,POPULATION%n");
        final Map<Integer, Integer> result = future.get();
        for (final Map.Entry<Integer, Integer> entry : result.entrySet()) {
            printStream.format("%d,%d%n", entry.getKey(), entry.getValue());
        }
        // checkError() flushes the stream and reports any write failure that
        // PrintStream swallowed; without it a truncated file would go unnoticed.
        if (printStream.checkError()) {
            throw new IOException("Failed to write map-reduce result to " + filename);
        }
    } finally {
        fileOutputStream.close();
    }
}
/**
 * Registers {@code resultHandler} on {@code future} through a {@code HandlerCallBackAdapter},
 * using the executor obtained from {@code VertxExecutorAdapter} for the current (or newly
 * created) Vert.x context.
 *
 * @param future        the future to observe
 * @param resultHandler the handler wrapped by the adapter
 */
private <T> void executeAsync(ICompletableFuture<T> future, Handler<AsyncResult<T>> resultHandler) {
    HandlerCallBackAdapter adapter = new HandlerCallBackAdapter(resultHandler);
    future.andThen(adapter, VertxExecutorAdapter.getOrCreate(vertx.getOrCreateContext()));
}
/**
 * Converts key and value with {@code convertParam}, starts an asynchronous put with a
 * time-to-live given in milliseconds, and hands the resulting future to
 * {@code executeAsyncVoid} together with {@code completionHandler}.
 *
 * @param k                 the key
 * @param v                 the value
 * @param ttl               time-to-live, passed on with {@link TimeUnit#MILLISECONDS}
 * @param completionHandler passed to {@code executeAsyncVoid}
 */
@Override
public void put(K k, V v, long ttl, Handler<AsyncResult<Void>> completionHandler) {
    K convertedKey = convertParam(k);
    V convertedValue = convertParam(v);
    ICompletableFuture<Void> pendingPut =
            (ICompletableFuture<Void>) map.putAsync(convertedKey, convertedValue, ttl, TimeUnit.MILLISECONDS);
    executeAsyncVoid(pendingPut, completionHandler);
}
/**
 * Converts the key with {@code convertParam}, starts an asynchronous remove, and hands the
 * resulting future to {@code executeAsync} together with {@code resultHandler}.
 *
 * @param k             the key to remove
 * @param resultHandler passed to {@code executeAsync}
 */
@Override
public void remove(K k, Handler<AsyncResult<V>> resultHandler) {
    K convertedKey = convertParam(k);
    ICompletableFuture<V> pendingRemove = (ICompletableFuture<V>) map.removeAsync(convertedKey);
    executeAsync(pendingRemove, resultHandler);
}
/**
 * Registers {@code resultHandler} on {@code future} through a
 * {@code VoidHandlerCallBackAdapter}, using the executor obtained from
 * {@code VertxExecutorAdapter} for the current (or newly created) Vert.x context.
 *
 * @param future        the future to observe
 * @param resultHandler the handler wrapped by the adapter
 */
private void executeAsyncVoid(ICompletableFuture<Void> future, Handler<AsyncResult<Void>> resultHandler) {
    VoidHandlerCallBackAdapter adapter = new VoidHandlerCallBackAdapter(resultHandler);
    future.andThen(adapter, VertxExecutorAdapter.getOrCreate(vertx.getOrCreateContext()));
}
/**
 * Time step: puts a random value under a random key (bounded by {@code keyCount})
 * asynchronously and bumps {@code count.putAsyncCount}.
 *
 * @param state source of the random key and value
 * @return the future returned by {@code map.putAsync}
 */
@TimeStep(prob = 0.2)
public ICompletableFuture<Object> putAsync(ThreadState state) {
    int randomKey = state.randomInt(keyCount);
    Object randomValue = state.randomInt();

    // The counter is bumped before the put is issued.
    count.putAsyncCount.incrementAndGet();

    return map.putAsync(randomKey, randomValue);
}
/**
 * Time step: puts a random value under a random key (bounded by {@code keyCount})
 * asynchronously with a random time-to-live and bumps {@code count.putAsyncTTLCount}.
 *
 * <p>The time-to-live is {@code 1 + state.randomInt(maxTTLExpirySeconds)} seconds.
 *
 * @param state source of the random key, value and time-to-live
 * @return the future returned by {@code map.putAsync}
 */
@TimeStep(prob = 0.2)
public ICompletableFuture<Object> putAsyncTTL(ThreadState state) {
    int randomKey = state.randomInt(keyCount);
    Object randomValue = state.randomInt();
    int ttlSeconds = 1 + state.randomInt(maxTTLExpirySeconds);

    // The counter is bumped before the put is issued.
    count.putAsyncTTLCount.incrementAndGet();

    return map.putAsync(randomKey, randomValue, ttlSeconds, TimeUnit.SECONDS);
}
/**
 * Time step: runs a mapper/combiner/reducer job over {@code map}, checks every employee in
 * the result and increments the thread's map-reduce operation counter.
 *
 * <p>The job tracker name is the current thread's name followed by {@code name}. Every
 * employee id in the result must be even, lie within 10..30 inclusive, and differ from
 * 10, 20 and 30.
 *
 * @param state holds the operation counter to increment
 * @throws Exception whatever waiting on the job's future throws
 */
@TimeStep(prob = 0.5)
public void mapReduce(ThreadState state) throws Exception {
    String trackerName = Thread.currentThread().getName() + name;
    JobTracker jobTracker = targetInstance.getJobTracker(trackerName);
    KeyValueSource<Integer, Employee> employeeSource = KeyValueSource.fromMap(map);
    Job<Integer, Employee> employeeJob = jobTracker.newJob(employeeSource);

    ICompletableFuture<Map<Integer, Set<Employee>>> pending = employeeJob
            .mapper(new ModIdMapper(2))
            .combiner(new RangeIdCombinerFactory(10, 30))
            .reducer(new IdReducerFactory(10, 20, 30))
            .submit();

    Map<Integer, Set<Employee>> grouped = pending.get();
    for (Set<Employee> group : grouped.values()) {
        for (Employee member : group) {
            assertTrue(member.getId() % 2 == 0);
            assertTrue(member.getId() >= 10 && member.getId() <= 30);
            assertTrue(member.getId() != 10);
            assertTrue(member.getId() != 20);
            assertTrue(member.getId() != 30);
        }
    }

    state.operationCounter.mapReduce++;
}
/**
 * Time step: acquires one permit from the thread's semaphore, then asynchronously puts an
 * empty string under a randomly chosen key and registers {@code state} as the callback.
 *
 * @param state provides the semaphore and the random index, and is passed to
 *              {@code andThen} as the callback
 * @throws TestException if no permit could be acquired within {@code acquireTimeoutMs}
 *         milliseconds
 * @throws Exception whatever acquiring the permit throws
 */
@TimeStep
public void timeStep(ThreadState state) throws Exception {
    boolean permitAcquired = state.semaphore.tryAcquire(1, acquireTimeoutMs, TimeUnit.MILLISECONDS);
    if (!permitAcquired) {
        throw new TestException("Failed to acquire a license from the semaphore within the given timeout");
    }

    String randomKey = keys[state.randomInt(keyCount)];
    ICompletableFuture<String> pendingPut = (ICompletableFuture<String>) map.putAsync(randomKey, "");
    pendingPut.andThen(state);
}
/**
 * Time step: asynchronously puts a random value under a random key (bounded by
 * {@code keyCount}), records the future in the thread's list, and lets the thread state
 * decide whether to sync.
 *
 * @param state provides the random numbers, the future list and the iteration counter
 * @throws Exception whatever {@code syncIfNecessary} throws
 */
@TimeStep(prob = 0.1)
public void write(ThreadState state) throws Exception {
    Integer randomKey = state.randomInt(keyCount);
    Integer randomValue = state.randomInt();

    ICompletableFuture<?> pendingPut = cache.putAsync(randomKey, randomValue);
    state.futureList.add(pendingPut);

    // The counter is post-incremented: the sync check sees the value before the bump.
    state.syncIfNecessary(state.iteration++);
}
/**
 * Time step: asynchronously reads a random key (bounded by {@code keyCount}), records the
 * future in the thread's list, and lets the thread state decide whether to sync.
 *
 * @param state provides the random number, the future list and the iteration counter
 * @throws Exception whatever {@code syncIfNecessary} throws
 */
@TimeStep(prob = -1)
public void get(ThreadState state) throws Exception {
    Integer randomKey = state.randomInt(keyCount);

    ICompletableFuture<?> pendingGet = cache.getAsync(randomKey);
    state.futureList.add(pendingGet);

    // The counter is post-incremented: the sync check sees the value before the bump.
    state.syncIfNecessary(state.iteration++);
}
/**
 * Waits for every future in {@code futureList} and then empties the list, but only when
 * {@code iteration} is a multiple of {@code batchSize}.
 *
 * @param iteration the iteration number to test against {@code batchSize}
 * @throws Exception whatever waiting on a future throws; the list is not cleared in
 *         that case
 */
private void syncIfNecessary(long iteration) throws Exception {
    boolean batchBoundary = iteration % batchSize == 0;
    if (!batchBoundary) {
        return;
    }

    for (ICompletableFuture<?> pending : futureList) {
        pending.get();
    }
    futureList.clear();
}
/**
 * Time step: asynchronously increments a randomly chosen counter, counts the increment on
 * the thread state, tracks the future via {@code state.add}, and registers a
 * {@code LongExecutionCallback} built from {@code probe} and {@code startNanos}.
 *
 * @param state      provides the counter, the increment tally and future tracking
 * @param probe      passed to the callback
 * @param startNanos passed to the callback
 */
@TimeStep
public void write(ThreadState state, Probe probe, @StartNanos long startNanos) {
    AsyncAtomicLong target = state.getRandomCounter();
    state.increments++;

    ICompletableFuture<Long> pendingIncrement = target.asyncIncrementAndGet();
    // Tracking happens before the callback is attached.
    state.add(pendingIncrement);
    pendingIncrement.andThen(new LongExecutionCallback(probe, startNanos));
}
/**
 * Time step: asynchronously reads a randomly chosen counter, tracks the future via
 * {@code state.add}, and registers a {@code LongExecutionCallback} built from
 * {@code probe} and {@code startNanos}.
 *
 * @param state      provides the counter and future tracking
 * @param probe      passed to the callback
 * @param startNanos passed to the callback
 */
@TimeStep(prob = -1)
public void get(ThreadState state, Probe probe, @StartNanos long startNanos) {
    AsyncAtomicLong target = state.getRandomCounter();

    ICompletableFuture<Long> pendingRead = target.asyncGet();
    // Tracking happens before the callback is attached.
    state.add(pendingRead);
    pendingRead.andThen(new LongExecutionCallback(probe, startNanos));
}
/**
 * Forwards to {@code cache.getAsync(key)} and returns the future it produces, unmodified.
 *
 * @param key passed through to the underlying cache as-is
 * @return the future returned by the underlying cache's {@code getAsync(key)}
 */
@Override public ICompletableFuture<V> getAsync(K key) { return cache.getAsync(key); }
/**
 * Forwards to {@code cache.getAsync(key, expiryPolicy)} and returns the future it produces,
 * unmodified.
 *
 * @param key          passed through to the underlying cache as-is
 * @param expiryPolicy passed through to the underlying cache as-is
 * @return the future returned by the underlying cache's {@code getAsync(key, expiryPolicy)}
 */
@Override public ICompletableFuture<V> getAsync(K key, ExpiryPolicy expiryPolicy) { return cache.getAsync(key, expiryPolicy); }
/**
 * Forwards to {@code cache.putAsync(key, value)} and returns the future it produces,
 * unmodified.
 *
 * @param key   passed through to the underlying cache as-is
 * @param value passed through to the underlying cache as-is
 * @return the future returned by the underlying cache's {@code putAsync(key, value)}
 */
@Override public ICompletableFuture<Void> putAsync(K key, V value) { return cache.putAsync(key, value); }
/**
 * Forwards to {@code cache.putAsync(key, value, expiryPolicy)} and returns the future it
 * produces, unmodified.
 *
 * @param key          passed through to the underlying cache as-is
 * @param value        passed through to the underlying cache as-is
 * @param expiryPolicy passed through to the underlying cache as-is
 * @return the future returned by the underlying cache's
 *         {@code putAsync(key, value, expiryPolicy)}
 */
@Override public ICompletableFuture<Void> putAsync(K key, V value, ExpiryPolicy expiryPolicy) { return cache.putAsync(key, value, expiryPolicy); }
/**
 * Forwards to {@code cache.putIfAbsentAsync(key, value)} and returns the future it produces,
 * unmodified.
 *
 * @param key   passed through to the underlying cache as-is
 * @param value passed through to the underlying cache as-is
 * @return the future returned by the underlying cache's {@code putIfAbsentAsync(key, value)}
 */
@Override public ICompletableFuture<Boolean> putIfAbsentAsync(K key, V value) { return cache.putIfAbsentAsync(key, value); }
/**
 * Forwards to {@code cache.putIfAbsentAsync(key, value, expiryPolicy)} and returns the
 * future it produces, unmodified.
 *
 * @param key          passed through to the underlying cache as-is
 * @param value        passed through to the underlying cache as-is
 * @param expiryPolicy passed through to the underlying cache as-is
 * @return the future returned by the underlying cache's
 *         {@code putIfAbsentAsync(key, value, expiryPolicy)}
 */
@Override public ICompletableFuture<Boolean> putIfAbsentAsync(K key, V value, ExpiryPolicy expiryPolicy) { return cache.putIfAbsentAsync(key, value, expiryPolicy); }
/**
 * Forwards to {@code cache.getAndPutAsync(key, value)} and returns the future it produces,
 * unmodified.
 *
 * @param key   passed through to the underlying cache as-is
 * @param value passed through to the underlying cache as-is
 * @return the future returned by the underlying cache's {@code getAndPutAsync(key, value)}
 */
@Override public ICompletableFuture<V> getAndPutAsync(K key, V value) { return cache.getAndPutAsync(key, value); }
/**
 * Forwards to {@code cache.getAndPutAsync(key, value, expiryPolicy)} and returns the future
 * it produces, unmodified.
 *
 * @param key          passed through to the underlying cache as-is
 * @param value        passed through to the underlying cache as-is
 * @param expiryPolicy passed through to the underlying cache as-is
 * @return the future returned by the underlying cache's
 *         {@code getAndPutAsync(key, value, expiryPolicy)}
 */
@Override public ICompletableFuture<V> getAndPutAsync(K key, V value, ExpiryPolicy expiryPolicy) { return cache.getAndPutAsync(key, value, expiryPolicy); }