/**
 * Materializes the wrapped stream as a primitive {@code double} array.
 * The elements are first collected into a temporary, uniquely named {@code IList};
 * that list is destroyed before this method returns, even if copying fails.
 *
 * @return a newly allocated array filled in the list's iteration order
 */
@Override
public double[] toArray() {
    IList<Double> collected = inner.collect(DistributedCollectors.toIList(uniqueListName()));
    try {
        double[] values = new double[collected.size()];
        int next = 0;
        for (Double value : collected) {
            // auto-unboxing copies the boxed element into the primitive array
            values[next] = value;
            next++;
        }
        return values;
    } finally {
        collected.destroy();
    }
}
/**
 * Builds the upstream part of the DAG and attaches a side branch that writes the
 * upstream output into a temporary, uniquely named list. A stream listener is
 * registered on the context which passes every element of that list to
 * {@code consumer} and then destroys the list.
 *
 * @param dag the DAG being assembled
 * @return the vertex produced by the upstream pipe (not the list-writer vertex)
 */
@Override
public Vertex buildDAG(DAG dag) {
    String listName = uniqueListName();
    IList<T> list = context.getJetInstance().getList(listName);
    Vertex previous = upstream.buildDAG(dag);
    Vertex writer = dag.newVertex("write-list-" + listName, SinkProcessors.writeListP(listName));
    if (upstream.isOrdered()) {
        // NOTE(review): presumably a single writer keeps the upstream ordering - confirm
        writer.localParallelism(1);
    }
    // the writer is fed from output ordinal 1 of the upstream vertex
    dag.edge(from(previous, 1).to(writer, 0));
    context.addStreamListener(() -> {
        try {
            list.forEach(consumer);
        } finally {
            // destroy the temporary list even when the consumer throws
            list.destroy();
        }
    });
    return previous;
}
/**
 * Runs a job that feeds the upstream output through an {@code AnyMatchP} processor
 * and writes that processor's output to a temporary, uniquely named list, then
 * reports whether any of the written values is {@code true}.
 * The temporary list is destroyed in all cases, including job or read failure.
 *
 * @param context  the stream context used to execute the job and obtain the list
 * @param upstream the pipe whose output is tested
 * @return {@code true} if at least one value in the result list is {@code true}
 */
@Override
public Boolean reduce(StreamContext context, Pipe<? extends T> upstream) {
    String listName = uniqueListName();
    DAG dag = new DAG();
    Vertex previous = upstream.buildDAG(dag);
    Vertex anyMatch = dag.newVertex("any-match", () -> new AnyMatchP<>(predicate));
    Vertex writer = dag.newVertex("write-" + listName, SinkProcessors.writeListP(listName));
    dag.edge(between(previous, anyMatch))
       .edge(between(anyMatch, writer));

    IList<Boolean> results = context.getJetInstance().getList(listName);
    try {
        executeJob(context, dag);
        return anyMatch(results);
    } finally {
        // never leave the temporary list behind, even when the job or the read fails
        results.destroy();
    }
}
/**
 * Returns a non-parallelized meta-supplier of processors that buffer incoming items
 * in an {@code ArrayList} and flush each buffer into the {@code IList} with the
 * given name using {@code addAll}.
 *
 * @param name         name of the list to write to
 * @param clientConfig client configuration, or {@code null}; a {@code null} value is
 *                     reported to {@code handleInstanceNotActive} as the "local" case
 * @return the meta-supplier wrapped by {@code dontParallelize}
 */
@Nonnull
public static ProcessorMetaSupplier writeListP(@Nonnull String name, @Nullable ClientConfig clientConfig) {
    boolean noClientConfig = clientConfig == null;
    return dontParallelize(new HazelcastWriterSupplier<>(
            serializableConfig(clientConfig),
            ignoredIndex -> new ArrayList<>(),
            ArrayList::add,
            hz -> {
                IList<Object> target = hz.getList(name);
                return pending -> {
                    try {
                        target.addAll(pending);
                    } catch (HazelcastInstanceNotActiveException inactive) {
                        handleInstanceNotActive(hz, inactive, noClientConfig);
                    }
                    // the buffer is emptied whether or not the flush succeeded
                    pending.clear();
                };
            },
            noopConsumer()
    ));
}
/**
 * Collects a filled map's stream into an {@code IList} and checks that, once both
 * sides are sorted by key, the list holds exactly the map's entries.
 */
@Test
public void ilistCollect_whenSourceMap() throws Exception {
    IStreamMap<String, Integer> source = getMap();
    fillMap(source);

    IList<Entry<String, Integer>> sink = source.stream().collect(toIList(randomString()));

    Comparator<Entry<String, Integer>> byKey = Comparator.comparing(Entry::getKey);
    Entry<String, Integer>[] expected = source.entrySet().toArray(new Entry[0]);
    Entry<String, Integer>[] actual = sink.toArray(new Entry[0]);
    // neither side guarantees an order, so normalize both before comparing
    Arrays.sort(expected, byKey);
    Arrays.sort(actual, byKey);

    assertArrayEquals(expected, actual);
}
/**
 * Collects a filled cache's stream into an {@code IList} and checks that, once both
 * sides are sorted by key, keys and values match pairwise.
 */
@Test
public void ilistCollect_whenSourceCache() throws Exception {
    IStreamCache<String, Integer> source = getCache();
    fillCache(source);

    IList<Entry<String, Integer>> sink = source.stream().collect(toIList(randomString()));

    // snapshot the cache entries by iterating the cache itself
    Cache.Entry<String, Integer>[] expected = new Cache.Entry[source.size()];
    int filled = 0;
    for (Cache.Entry<String, Integer> cacheEntry : source) {
        expected[filled] = cacheEntry;
        filled++;
    }
    Map.Entry<String, Integer>[] actual = sink.toArray(new Map.Entry[0]);

    Arrays.sort(expected, Comparator.comparing(Cache.Entry::getKey));
    Arrays.sort(actual, Comparator.comparing(Map.Entry::getKey));

    assertEquals(expected.length, actual.length);
    // the two arrays use different entry types, so compare key and value separately
    for (int pos = 0; pos < expected.length; pos++) {
        assertEquals(expected[pos].getKey(), actual[pos].getKey());
        assertEquals(expected[pos].getValue(), actual[pos].getValue());
    }
}
/**
 * Filters a filled list down to the values below 100 and checks that the collected
 * result has 100 elements and that position {@code i} holds the value {@code i}.
 */
@Test
public void sourceList() throws InterruptedException {
    IStreamList<Integer> source = getList();
    fillList(source);

    IList<Integer> filtered = source
            .stream()
            .filter(value -> value < 100)
            .collect(DistributedCollectors.toIList(randomString()));

    assertEquals(100, filtered.size());
    int expected = 0;
    while (expected < 100) {
        int actual = filtered.get(expected);
        assertEquals(expected, actual);
        expected++;
    }
}
/**
 * Fills a list with values reduced modulo {@code modulus}, runs {@code distinct()}
 * over its stream and checks that the collected result contains each residue
 * {@code 0..modulus-1} and has exactly {@code modulus} elements.
 */
@Test
public void sourceList() {
    IStreamList<Integer> list = getList();
    int modulus = 10;
    fillList(list, i -> i % modulus);

    IList<Integer> result = list
            .stream()
            .distinct()
            .collect(DistributedCollectors.toIList(randomString()));

    assertEquals(modulus, result.size());
    // iterate up to modulus rather than a repeated literal so the two cannot drift apart
    for (int i = 0; i < modulus; i++) {
        assertTrue(Integer.toString(i), result.contains(i));
    }
}
/**
 * Runs a two-vertex job that reads from HDFS with local parallelism 4 and writes to
 * the list named "sink" with local parallelism 1, then checks the list's size against
 * {@code expectedSinkSize()} and that its first element's string form contains "value".
 */
@Test
public void testReadHdfs() {
    DAG dag = new DAG();

    Vertex source = dag.newVertex("source", readHdfsP(jobConf, mapperType.mapper))
                       .localParallelism(4);
    Vertex sink = dag.newVertex("sink", writeListP("sink"))
                     .localParallelism(1);
    dag.edge(between(source, sink));

    Future<Void> future = instance.newJob(dag).getFuture();
    assertCompletesEventually(future);

    // parameterized instead of raw: only size() and toString() of the elements are used
    IList<Object> list = instance.getList("sink");
    assertEquals(expectedSinkSize(), list.size());
    assertTrue(list.get(0).toString().contains("value"));
}
@Override public void execute(HazelcastInstance hazelcastInstance) throws Exception { JobTracker jobTracker = hazelcastInstance.getJobTracker("default"); IList<Person> list = hazelcastInstance.getList("persons"); KeyValueSource<String, Person> source = KeyValueSource.fromList(list); Job<String, Person> job = jobTracker.newJob(source); ICompletableFuture future = job.mapper(new SalaryMapper()) // .combiner(new SalaryCombinerFactory()) // .reducer(new SalaryReducerFactory()) // .submit(); System.out.println(ToStringPrettyfier.toString(future.get())); }
@Override public void execute(HazelcastInstance hazelcastInstance) throws Exception { JobTracker jobTracker = hazelcastInstance.getJobTracker("default"); IList<Person> list = hazelcastInstance.getList("persons"); KeyValueSource<String, Person> source = KeyValueSource.fromList(list); Job<String, Person> job = jobTracker.newJob(source); // Collect all people by state ICompletableFuture future = job.mapper(new StateBasedCountMapper()).submit(); // Count people by state // ICompletableFuture future = job.mapper(new StateBasedCountMapper()).reducer(new CountReducerFactory()).submit(); // Same as above but with precalculation per node // ICompletableFuture future = job.mapper(new StateBasedCountMapper()).combiner(new CountCombinerFactory()) // .reducer(new CountReducerFactory()).submit(); System.out.println(ToStringPrettyfier.toString(future.get())); }
/**
 * Registers an item listener, performs two adds and two removes, unregisters the
 * listener, repeats the same operations and then expects both latches (initially 4)
 * to stand at 2.
 */
@Test
@Ignore
public void addRemoveItemListener() throws InterruptedException {
    HazelcastClient client = getHazelcastClient();
    final IList<String> items = client.getList("addRemoveItemListenerList");
    final CountDownLatch added = new CountDownLatch(4);
    final CountDownLatch removed = new CountDownLatch(4);
    ItemListener<String> countingListener = new CountDownItemListener<String>(added, removed);

    // first round: performed while the listener is registered
    items.addItemListener(countingListener, true);
    items.add("hello");
    items.add("hello");
    items.remove("hello");
    items.remove("hello");

    // second round: performed after the listener has been removed
    items.removeItemListener(countingListener);
    items.add("hello");
    items.add("hello");
    items.remove("hello");
    items.remove("hello");

    Thread.sleep(10);
    assertEquals(2, added.getCount());
    assertEquals(2, removed.getCount());
}
/**
 * Checks {@code isEmpty()} and {@code size()} on the list named "size": empty at the
 * start, 100 after the first batch of adds, and 150 after re-adding the first 50 values.
 */
@Test
public void size() {
    HazelcastClient client = getHazelcastClient();
    IList<Integer> numbers = client.getList("size");
    int count = 100;
    int half = count / 2;

    assertTrue(numbers.isEmpty());

    for (int value = 0; value < count; value++) {
        assertTrue(numbers.add(value));
    }
    assertEquals(count, numbers.size());

    // re-adding the first half is asserted to succeed and to grow the list
    for (int value = 0; value < half; value++) {
        assertTrue(numbers.add(value));
    }
    assertFalse(numbers.isEmpty());
    assertEquals(count + half, numbers.size());
}
/**
 * Adds 100 integers to the list named "remove", removes each by value, and then
 * checks that removing values that were never added returns {@code false}.
 */
@Test
public void remove() {
    HazelcastClient client = getHazelcastClient();
    IList<Integer> numbers = client.getList("remove");
    int count = 100;

    assertTrue(numbers.isEmpty());
    for (int value = 0; value < count; value++) {
        assertTrue(numbers.add(value));
    }
    assertEquals(count, numbers.size());

    for (int value = 0; value < count; value++) {
        // pass a boxed Integer so that remove(Object) is selected, not remove(int index)
        assertTrue(numbers.remove(Integer.valueOf(value)));
    }
    assertTrue(numbers.isEmpty());

    for (int absent = count; absent < 2 * count; absent++) {
        assertFalse(numbers.remove(Integer.valueOf(absent)));
    }
}
/**
 * Iterates the list named "iterate" after adding 1, 2, 2, 3, counting each value
 * down from its expected multiplicity and removing it through the iterator; all
 * counters must end at zero and the list must end up empty.
 */
@Test
public void iterate() {
    HazelcastClient client = getHazelcastClient();
    IList<Integer> numbers = client.getList("iterate");
    numbers.add(1);
    numbers.add(2);
    numbers.add(2);
    numbers.add(3);
    assertEquals(4, numbers.size());

    // expected number of occurrences per value
    Map<Integer, Integer> remaining = new HashMap<Integer, Integer>();
    remaining.put(1, 1);
    remaining.put(2, 2);
    remaining.put(3, 1);

    Iterator<Integer> cursor = numbers.iterator();
    while (cursor.hasNext()) {
        Integer seen = cursor.next();
        remaining.put(seen, remaining.get(seen) - 1);
        cursor.remove();
    }

    assertEquals(Integer.valueOf(0), remaining.get(1));
    assertEquals(Integer.valueOf(0), remaining.get(2));
    assertEquals(Integer.valueOf(0), remaining.get(3));
    assertTrue(numbers.isEmpty());
}
/**
 * Opens the list named "default"; when the first argument is "master" it adds the
 * integers 1 through 10 to it. It then enters {@code readConsoleWhile} with an action
 * that shows every element and a supplier of the list size.
 *
 * @param args command-line arguments; only the first one is inspected
 */
@Override
protected void execute(String... args) {
    withHazelcast(hazelcast -> {
        String name = "default";
        IList<Integer> numbers = hazelcast.getList(name);

        boolean runAsMaster = args.length > 0 && "master".equals(args[0]);
        if (runAsMaster) {
            IntStream.rangeClosed(1, 10).forEach(numbers::add);
        }

        readConsoleWhile(hazelcast, name, () -> {
            numbers.forEach(element -> show("element = %d.", element));
            return null;
        }, numbers::size);
    });
}
/**
 * Logs a summary of the run: the largest value found in the {@code name + "max"}
 * list next to the configured and estimated sizes, the sum of all counters found in
 * the {@code name + "counter"} list, and the size of {@code putAllMap}.
 */
@Verify
public void globalVerify() {
    IList<Integer> observedSizes = targetInstance.getList(name + "max");
    int observedMaxSize = 0;
    for (int observed : observedSizes) {
        observedMaxSize = Math.max(observedMaxSize, observed);
    }
    logger.info(name + ": cache " + cache.getName()
            + " toleranceFactor=" + toleranceFactor
            + " configuredMaxSize=" + configuredMaxSize
            + " estimatedMaxSize=" + estimatedMaxSize
            + " observedMaxSize=" + observedMaxSize
            + " size=" + cache.size()
    );

    IList<Counter> perWorker = targetInstance.getList(name + "counter");
    Counter sum = new Counter();
    for (Counter partial : perWorker) {
        sum.add(partial);
    }
    logger.info(name + ": " + sum);
    logger.info(name + ": putAllMap size=" + putAllMap.size());
}
/**
 * Global verification: asserts that none of the {@code maxAccounts} locks is still
 * held, that the sum of all account values in the {@code name} list equals
 * {@code totalInitialValue}, and logs the sum of all counters found in the
 * {@code name + "count"} list.
 */
@Verify(global = true)
public void verify() {
    for (int i = 0; i < maxAccounts; i++) {
        ILock lock = targetInstance.getLock(name + i);
        assertFalse(name + ": Lock should be unlocked", lock.isLocked());
    }

    long totalValue = 0;
    IList<Long> accounts = targetInstance.getList(name);
    for (long value : accounts) {
        totalValue += value;
    }
    // prefix with the test name, like the assertion messages in this method
    logger.info(name + ": totalValue=" + totalValue);
    assertEquals(name + ": totalInitialValue != totalValue ", totalInitialValue, totalValue);

    Counter total = new Counter();
    IList<Counter> totals = targetInstance.getList(name + "count");
    for (Counter count : totals) {
        total.add(count);
    }
    logger.info("total count " + total);
}
/**
 * Creates a client-side map-reduce task for the given list. The list is cast to
 * {@code ClientListProxy} and its client context is obtained by invoking
 * {@code GET_CLIENTCONTEXT_METHOD} on it.
 *
 * @param list the list to run the task on; cast to {@code ClientListProxy} here
 * @return the task proxy, or {@code null} if {@code ExceptionUtil.rethrow} returns normally
 */
@Override
public MapReduceTask<KeyIn, ValueIn, KeyOut, ValueOut> build(IList<ValueIn> list) {
    try {
        ClientListProxy<ValueIn> listProxy = (ClientListProxy<ValueIn>) list;
        Object rawContext = GET_CLIENTCONTEXT_METHOD.invoke(listProxy);
        ClientContext clientContext = (ClientContext) rawContext;
        return new IListClientMapReduceTaskProxy<KeyIn, ValueIn, KeyOut, ValueOut>(
                listProxy.getName(), clientContext, hazelcastInstance);
    } catch (Throwable failure) {
        // any failure, including a failed cast, is handed to ExceptionUtil
        ExceptionUtil.rethrow(failure);
    }
    return null;
}
/**
 * Applies a configuration change to the backing Hazelcast map; does nothing when this
 * source is read-only.
 * <p>
 * Each added property is stored in the map. If the added properties also contain a
 * value under {@code "_" + key + ".ttl"} that parses as a {@code long}, the entry is
 * stored with that time-to-live in milliseconds; an unparsable value is logged and the
 * entry is stored without a time-to-live. Removed properties are deleted. Finally the
 * transaction id is appended to the "_tamaya.transactions" list, transaction metadata
 * is stored with a one-day time-to-live, the map is flushed and {@code refresh()} is
 * called.
 *
 * @param configChange the change request to apply
 */
@Override
public void applyChange(ConfigChangeRequest configChange) {
    if (readOnly) {
        return;
    }
    IMap<String, String> config = hazelcastInstance.getMap(mapReference);
    Map<String, String> added = configChange.getAddedProperties();
    for (Map.Entry<String, String> en : added.entrySet()) {
        String metaVal = added.get("_" + en.getKey() + ".ttl");
        boolean hasTtl = false;
        long ttlMillis = 0L;
        if (metaVal != null) {
            // Only the parse is guarded: a failing put must not be reported as a
            // parse error, nor silently retried without the requested TTL.
            try {
                ttlMillis = Long.parseLong(metaVal);
                hasTtl = true;
            } catch (NumberFormatException e) {
                LOG.log(Level.WARNING, "Failed to parse TTL in millis: " + metaVal
                        + " for '" + en.getKey() + "'", e);
            }
        }
        if (hasTtl) {
            config.put(en.getKey(), en.getValue(), ttlMillis, TimeUnit.MILLISECONDS);
        } else {
            config.put(en.getKey(), en.getValue());
        }
    }
    for (String key : configChange.getRemovedProperties()) {
        config.remove(key);
    }
    IList<String> taList = hazelcastInstance.getList("_tamaya.transactions");
    taList.add(configChange.getTransactionID());
    config.put("_tamaya.transaction.lastId", configChange.getTransactionID(), 1, TimeUnit.DAYS);
    config.put("_tamaya.transaction.startedAt", String.valueOf(configChange.getStartedAt()), 1, TimeUnit.DAYS);
    config.flush();
    refresh();
}
/**
 * Returns the list registered under the given name: the result of
 * {@code getBeanSafely} when it is non-null, otherwise the list obtained from
 * {@code hz()}.
 *
 * @param name the list name; must not be {@code null}
 * @param <T>  element type of the list
 * @return the matching list
 * @throws NullPointerException if {@code name} is {@code null}
 */
@Override
@SuppressWarnings("unchecked")
public <T> IList<T> getIList(String name) {
    name = Objects.requireNonNull(name);
    final IList<T> bean = getBeanSafely(name, IList.class);
    if (bean == null) {
        return hz().getList(name);
    }
    return bean;
}
/**
 * Materializes the wrapped stream as a primitive {@code long} array.
 * The elements are first collected into a temporary, uniquely named {@code IList};
 * that list is destroyed before this method returns, even if copying fails.
 *
 * @return a newly allocated array filled in the list's iteration order
 */
@Override
public long[] toArray() {
    IList<Long> collected = inner.collect(DistributedCollectors.toIList(uniqueListName()));
    try {
        long[] values = new long[collected.size()];
        int next = 0;
        for (Long value : collected) {
            // auto-unboxing copies the boxed element into the primitive array
            values[next] = value;
            next++;
        }
        return values;
    } finally {
        collected.destroy();
    }
}
/**
 * Collects this stream into a temporary, uniquely named {@code IList} and returns
 * its contents as an array. The temporary list is always destroyed, even when
 * copying it fails.
 *
 * @return an array holding the collected elements
 */
@Override
public Object[] toArray() {
    IList<E_OUT> list = collect(toIList(uniqueListName()));
    try {
        return list.toArray();
    } finally {
        // do not leak the temporary list if toArray() throws
        list.destroy();
    }
}
/**
 * Collects this stream into a temporary, uniquely named {@code IList} and copies its
 * contents into an array obtained from {@code generator}, which is called with the
 * list's size. The temporary list is always destroyed, even when the generator or
 * the copy fails.
 *
 * @param generator creates the target array for a given length
 * @param <A>       component type of the returned array
 * @return the array returned by the list's {@code toArray(A[])}
 */
@Override
public <A> A[] toArray(IntFunction<A[]> generator) {
    IList<E_OUT> list = collect(toIList(uniqueListName()));
    try {
        A[] array = generator.apply(list.size());
        return list.toArray(array);
    } finally {
        // do not leak the temporary list if the generator or the copy throws
        list.destroy();
    }
}
/**
 * Limits this stream to one element, collects it into a temporary, uniquely named
 * {@code IList} and returns that element, if any. The temporary list is always
 * destroyed, even when reading it fails.
 *
 * @return the single collected element, or an empty {@code Optional} if the list is empty
 * @throws NullPointerException if the collected element is {@code null}
 */
@Override
public Optional<E_OUT> findFirst() {
    IList<E_OUT> first = this.limit(1).collect(toIList(uniqueListName()));
    try {
        if (first.size() == 0) {
            return Optional.empty();
        }
        return Optional.of(first.get(0));
    } finally {
        // do not leak the temporary list if the read or Optional.of throws
        first.destroy();
    }
}
/**
 * Materializes the wrapped stream as a primitive {@code int} array.
 * The elements are first collected into a temporary, uniquely named {@code IList};
 * that list is destroyed before this method returns, even if copying fails.
 *
 * @return a newly allocated array filled in the list's iteration order
 */
@Override
public int[] toArray() {
    IList<Integer> collected = inner.collect(DistributedCollectors.toIList(uniqueListName()));
    try {
        int[] values = new int[collected.size()];
        int next = 0;
        for (Integer value : collected) {
            // auto-unboxing copies the boxed element into the primitive array
            values[next] = value;
            next++;
        }
        return values;
    } finally {
        collected.destroy();
    }
}
/**
 * Tells whether the given list contains at least one {@code true} value.
 * Iteration stops at the first {@code true} element.
 *
 * @param results the values to scan
 * @return {@code true} if a {@code true} element was found, {@code false} otherwise
 */
private static boolean anyMatch(IList<Boolean> results) {
    boolean matched = false;
    for (Boolean flag : results) {
        if (flag) {
            matched = true;
            break;
        }
    }
    return matched;
}
/**
 * Connects {@code combiner} to a writer of a temporary, uniquely named list, executes
 * the job and returns the first element of that list, if any. The temporary list is
 * always destroyed, even when the job or the read fails.
 *
 * @param context  the stream context used to execute the job and obtain the list
 * @param dag      the DAG to extend and run
 * @param combiner the vertex whose output is written to the list
 * @param <T>      type of the result
 * @return the first list element, or an empty {@code Optional} if the list is empty
 */
private static <T> Optional<T> execute(StreamContext context, DAG dag, Vertex combiner) {
    String listName = uniqueListName();
    Vertex writeList = dag.newVertex("write-" + listName, SinkProcessors.writeListP(listName));
    dag.edge(between(combiner, writeList));

    IList<T> list = context.getJetInstance().getList(listName);
    try {
        executeJob(context, dag);
        if (list.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(list.get(0));
    } finally {
        // single cleanup point covering the empty, non-empty and failure paths
        list.destroy();
    }
}
/**
 * Connects {@code combiner} to a writer of a temporary, uniquely named list, executes
 * the job, reads the first element of that list and returns the result of applying
 * {@code finisher} to it. The temporary list is always destroyed, even when the job
 * fails or the list turns out to be empty.
 *
 * @param context  the stream context used to execute the job and obtain the list
 * @param dag      the DAG to extend and run
 * @param combiner the vertex whose output is written to the list
 * @param finisher converts the accumulated value into the final result
 * @param <A>      type of the accumulated value
 * @param <R>      type of the final result
 * @return the finisher's result for the first list element
 */
static <A, R> R execute(StreamContext context, DAG dag, Vertex combiner, Function<A, R> finisher) {
    String listName = uniqueListName();
    Vertex writer = dag.newVertex("write-" + listName, SinkProcessors.writeListP(listName));
    dag.edge(between(combiner, writer));

    IList<A> list = context.getJetInstance().getList(listName);
    A result;
    try {
        executeJob(context, dag);
        result = list.get(0);
    } finally {
        // do not leak the temporary list if the job fails or get(0) throws
        list.destroy();
    }
    // as before, the finisher runs only after the list has been destroyed
    return finisher.apply(result);
}
/**
 * Fills a list with the values {@code COUNT - 1} down to {@code 0}, sorts its stream
 * with the natural order and hands the collected result to {@code assertList}.
 */
@Test
public void sourceList() {
    IStreamList<Integer> source = getList();
    // generate the input in descending order
    fillList(source, IntStream.range(0, COUNT).map(idx -> COUNT - idx - 1).limit(COUNT).iterator());

    IList<Integer> sorted = source
            .stream()
            .sorted()
            .collect(DistributedCollectors.toIList(randomString()));

    assertList(sorted);
}
/**
 * Sorts the values of the streamed map with the natural order and hands the
 * collected result to {@code assertList}.
 */
@Test
public void sourceMap() {
    IList<Integer> sorted = streamMap()
            .map(entry -> entry.getValue())
            .sorted()
            .collect(DistributedCollectors.toIList(randomString()));

    assertList(sorted);
}
/**
 * Sorts the values of the streamed cache with the natural order and hands the
 * collected result to {@code assertList}.
 */
@Test
public void sourceCache() {
    IList<Integer> sorted = streamCache()
            .map(entry -> entry.getValue())
            .sorted()
            .collect(DistributedCollectors.toIList(randomString()));

    assertList(sorted);
}
/**
 * Sorts the values of the streamed map with a reversing comparator and hands the
 * collected result to {@code assertListDescending}.
 */
@Test
public void sourceMap_withComparator() {
    IList<Integer> descending = streamMap()
            .map(entry -> entry.getValue())
            .sorted((first, second) -> second.compareTo(first))
            .collect(DistributedCollectors.toIList(randomString()));

    assertListDescending(descending);
}
/**
 * Sorts the values of the streamed cache with a reversing comparator and hands the
 * collected result to {@code assertListDescending}.
 */
@Test
public void sourceCache_withComparator() {
    IList<Integer> descending = streamCache()
            .map(entry -> entry.getValue())
            .sorted((first, second) -> second.compareTo(first))
            .collect(DistributedCollectors.toIList(randomString()));

    assertListDescending(descending);
}
/**
 * Sorts the values of the streamed map, squares each value after the sort and hands
 * the collected result to {@code assertListSquare}.
 */
@Test
public void operationsAfterSort_sourceMap() {
    IList<Integer> squares = streamMap()
            .map(entry -> entry.getValue())
            .sorted((first, second) -> first.compareTo(second))
            .map(value -> value * value)
            .collect(DistributedCollectors.toIList(randomString()));

    assertListSquare(squares);
}
/**
 * Sorts the values of the streamed cache, squares each value after the sort and hands
 * the collected result to {@code assertListSquare}.
 */
@Test
public void operationsAfterSort_sourceCache() {
    IList<Integer> squares = streamCache()
            .map(entry -> entry.getValue())
            .sorted((first, second) -> first.compareTo(second))
            .map(value -> value * value)
            .collect(DistributedCollectors.toIList(randomString()));

    assertListSquare(squares);
}