Java 类org.apache.commons.lang3.mutable.MutableLong 实例源码
项目:cloud-meter
文件:StatCalculator.java
/**
* Get the value which %percent% of the values are less than. This works
* just like median (where median represents the 50% point). A typical
* desire is to see the 90% point - the value that 90% of the data points
* are below, the remaining 10% are above.
*
* @param percent
* number representing the wished percent (between <code>0</code>
* and <code>1.0</code>)
* @return the value which %percent% of the values are less than
*/
public T getPercentPoint(double percent) {
if (count <= 0) {
return ZERO;
}
if (percent >= 1.0) {
return getMax();
}
// use Math.round () instead of simple (long) to provide correct value rounding
long target = Math.round (count * percent);
try {
for (Entry<T, MutableLong> val : valuesMap.entrySet()) {
target -= val.getValue().longValue();
if (target <= 0){
return val.getKey();
}
}
} catch (ConcurrentModificationException ignored) {
// ignored. May happen occasionally, but no harm done if so.
}
return ZERO; // TODO should this be getMin()?
}
项目:apex-malhar
文件:ApexWindowedStreamImpl.java
@Override
public <STREAM extends WindowedStream<Tuple.WindowedTuple<Long>>> STREAM count(Option... opts)
{
Function.MapFunction<T, Tuple<Long>> kVMap = new Function.MapFunction<T, Tuple<Long>>()
{
@Override
public Tuple<Long> f(T input)
{
if (input instanceof Tuple.TimestampedTuple) {
return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)input).getTimestamp(), 1L);
} else {
return new Tuple.TimestampedTuple<>(System.currentTimeMillis(), 1L);
}
}
};
WindowedStream<Tuple<Long>> innerstream = map(kVMap);
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new SumLong());
return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
}
项目:apex-malhar
文件:StateTracker.java
void bucketAccessed(long bucketId)
{
long now = System.currentTimeMillis();
if (accessedBucketIds.add(bucketId) || now - lastUpdateAccessTime > updateAccessTimeInterval) {
synchronized (bucketLastAccess) {
for (long id : accessedBucketIds) {
MutableLong lastAccessTime = bucketLastAccess.get(id);
if (lastAccessTime != null) {
lastAccessTime.setValue(now);
} else {
bucketLastAccess.put(id, new MutableLong(now));
}
}
}
accessedBucketIds.clear();
lastUpdateAccessTime = now;
}
}
项目:apex-malhar
文件:WEQueryQueueManagerTest.java
@Test
public void testSimpleRemoveEmpty()
{
WindowEndQueueManager<Query, Void> wqqm = new WindowEndQueueManager<>();
wqqm.setup(null);
wqqm.beginWindow(0);
QueryBundle<Query, Void, MutableLong> qb = wqqm.dequeue();
Query queryD = qb == null ? null : qb.getQuery();
Assert.assertEquals("The queries must match.", null, queryD);
qb = wqqm.dequeue();
queryD = qb == null ? null : qb.getQuery();
Assert.assertEquals("The queries must match.", null, queryD);
wqqm.endWindow();
wqqm.teardown();
}
项目:apex-malhar
文件:WEQueryQueueManagerTest.java
@Test
public void testSimpleAddOneRemove()
{
WindowEndQueueManager<Query, Void> wqqm = new WindowEndQueueManager<>();
wqqm.setup(null);
wqqm.beginWindow(0);
Query query = new MockQuery("1");
wqqm.enqueue(query, null, new MutableLong(1L));
Query queryD = wqqm.dequeue().getQuery();
QueryBundle<Query, Void, MutableLong> qb = wqqm.dequeue();
Query queryD1 = qb == null ? null : qb.getQuery();
wqqm.endWindow();
wqqm.teardown();
Assert.assertEquals("The queries must match.", query, queryD);
Assert.assertEquals("The queries must match.", null, queryD1);
}
项目:apex-malhar
文件:QueryManagerAsynchronousTest.java
@Override
public void run()
{
int numLoops = totalTuples / batchSize;
for (int loopCounter = 0, tupleCounter = 0; loopCounter < numLoops; loopCounter++, tupleCounter++) {
for (int batchCounter = 0; batchCounter < batchSize; batchCounter++, tupleCounter++) {
queueManager.enqueue(new MockQuery(tupleCounter + ""), null, new MutableLong(1L));
if (rand.nextDouble() < waitMillisProb) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
}
项目:apex-malhar
文件:WindowedOperatorTest.java
private WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator()
{
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>();
if (useSpillable) {
sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore);
// TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys.
windowStateStorage = new InMemoryWindowedStorage<>();
SpillableWindowedPlainStorage<MutableLong> pds = new SpillableWindowedPlainStorage<>();
pds.setSpillableComplexComponent(sccImpl);
plainDataStorage = pds;
SpillableWindowedPlainStorage<Long> prs = new SpillableWindowedPlainStorage<>();
prs.setSpillableComplexComponent(sccImpl);
plainRetractionStorage = prs;
windowedOperator.addComponent("SpillableComplexComponent", sccImpl);
} else {
windowStateStorage = new InMemoryWindowedStorage<>();
plainDataStorage = new InMemoryWindowedStorage<>();
plainRetractionStorage = new InMemoryWindowedStorage<>();
}
windowedOperator.setDataStorage(plainDataStorage);
windowedOperator.setRetractionStorage(plainRetractionStorage);
windowedOperator.setWindowStateStorage(windowStateStorage);
windowedOperator.setAccumulation(new SumAccumulation());
return windowedOperator;
}
项目:apex-malhar
文件:WindowedOperatorTest.java
@Test
public void testValidation() throws Exception
{
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>();
verifyValidationFailure(windowedOperator, "nothing is configured");
windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
verifyValidationFailure(windowedOperator, "data storage is not set");
windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutableLong>());
verifyValidationFailure(windowedOperator, "accumulation is not set");
windowedOperator.setAccumulation(new SumAccumulation());
windowedOperator.validate();
windowedOperator.setTriggerOption(new TriggerOption().accumulatingAndRetractingFiredPanes());
verifyValidationFailure(windowedOperator, "retracting storage is not set for ACCUMULATING_AND_RETRACTING");
windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<Long>());
windowedOperator.validate();
windowedOperator.setTriggerOption(new TriggerOption().discardingFiredPanes().firingOnlyUpdatedPanes());
verifyValidationFailure(windowedOperator, "DISCARDING is not valid for option firingOnlyUpdatedPanes");
windowedOperator.setTriggerOption(new TriggerOption().accumulatingFiredPanes().firingOnlyUpdatedPanes());
windowedOperator.setRetractionStorage(null);
verifyValidationFailure(windowedOperator, "retracting storage is not set for option firingOnlyUpdatedPanes");
}
项目:apex-malhar
文件:WindowedOperatorTest.java
@Test
public void testSlidingWindowAssignment()
{
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200)));
windowedOperator.setup(testMeta.operatorContext);
Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1600L, 2L));
Collection<? extends Window> windows = windowedValue.getWindows();
Window[] winArray = windows.toArray(new Window[]{});
Assert.assertEquals(5, winArray.length);
Assert.assertEquals(BASE + 800, winArray[0].getBeginTimestamp());
Assert.assertEquals(1000, winArray[0].getDurationMillis());
Assert.assertEquals(BASE + 1000, winArray[1].getBeginTimestamp());
Assert.assertEquals(1000, winArray[1].getDurationMillis());
Assert.assertEquals(BASE + 1200, winArray[2].getBeginTimestamp());
Assert.assertEquals(1000, winArray[2].getDurationMillis());
Assert.assertEquals(BASE + 1400, winArray[3].getBeginTimestamp());
Assert.assertEquals(1000, winArray[3].getDurationMillis());
Assert.assertEquals(BASE + 1600, winArray[4].getBeginTimestamp());
Assert.assertEquals(1000, winArray[4].getDurationMillis());
windowedOperator.teardown();
}
项目:apex-malhar
文件:WindowedOperatorTest.java
@Test
public void testKeyedAccumulation()
{
KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(false);
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, new KeyValPair<>("a", 2L)));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, new KeyValPair<>("a", 3L)));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("b", 4L)));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 150L, new KeyValPair<>("b", 5L)));
windowedOperator.endWindow();
Assert.assertEquals(1, keyedDataStorage.size());
Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "a").longValue());
Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "b").longValue());
windowedOperator.teardown();
}
项目:apex-malhar
文件:Application.java
@Override
public void populateDAG(DAG dag, Configuration configuration)
{
WordGenerator inputOperator = new WordGenerator();
KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>();
Accumulation<Long, MutableLong, Long> sum = new SumAccumulation();
windowedOperator.setAccumulation(sum);
windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableLong>());
windowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, Long>());
windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1)));
windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingAndRetractingFiredPanes());
//windowedOperator.setAllowedLateness(Duration.millis(14000));
ConsoleOutputOperator outputOperator = new ConsoleOutputOperator();
dag.addOperator("inputOperator", inputOperator);
dag.addOperator("windowedOperator", windowedOperator);
dag.addOperator("outputOperator", outputOperator);
dag.addStream("input_windowed", inputOperator.output, windowedOperator.input);
dag.addStream("windowed_output", windowedOperator.output, outputOperator.input);
}
项目:apex-malhar
文件:Application.java
@Override
public void populateDAG(DAG dag, Configuration configuration)
{
RandomNumberPairGenerator inputOperator = new RandomNumberPairGenerator();
WindowedOperatorImpl<MutablePair<Double, Double>, MutablePair<MutableLong, MutableLong>, Double> windowedOperator = new WindowedOperatorImpl<>();
Accumulation<MutablePair<Double, Double>, MutablePair<MutableLong, MutableLong>, Double> piAccumulation = new PiAccumulation();
windowedOperator.setAccumulation(piAccumulation);
windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutablePair<MutableLong, MutableLong>>());
windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
windowedOperator.setWindowOption(new WindowOption.GlobalWindow());
windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingFiredPanes());
ConsoleOutputOperator outputOperator = new ConsoleOutputOperator();
dag.addOperator("inputOperator", inputOperator);
dag.addOperator("windowedOperator", windowedOperator);
dag.addOperator("outputOperator", outputOperator);
dag.addStream("input_windowed", inputOperator.output, windowedOperator.input);
dag.addStream("windowed_output", windowedOperator.output, outputOperator.input);
}
项目:apex-malhar
文件:SumTest.java
@Test
public void SumTest()
{
SumInt si = new SumInt();
SumLong sl = new SumLong();
SumFloat sf = new SumFloat();
SumDouble sd = new SumDouble();
Assert.assertEquals(new MutableInt(10), si.accumulate(si.defaultAccumulatedValue(), 10));
Assert.assertEquals(new MutableInt(11), si.accumulate(new MutableInt(1), 10));
Assert.assertEquals(new MutableInt(22), si.merge(new MutableInt(1), new MutableInt(21)));
Assert.assertEquals(new MutableLong(10L), sl.accumulate(sl.defaultAccumulatedValue(), 10L));
Assert.assertEquals(new MutableLong(22L), sl.accumulate(new MutableLong(2L), 20L));
Assert.assertEquals(new MutableLong(41L), sl.merge(new MutableLong(32L), new MutableLong(9L)));
Assert.assertEquals(new MutableFloat(9.0F), sf.accumulate(sf.defaultAccumulatedValue(), 9.0F));
Assert.assertEquals(new MutableFloat(22.5F), sf.accumulate(new MutableFloat(2.5F), 20F));
Assert.assertEquals(new MutableFloat(41.0F), sf.merge(new MutableFloat(33.1F), new MutableFloat(7.9F)));
Assert.assertEquals(new MutableDouble(9.0), sd.accumulate(sd.defaultAccumulatedValue(), 9.0));
Assert.assertEquals(new MutableDouble(22.5), sd.accumulate(new MutableDouble(2.5), 20.0));
Assert.assertEquals(new MutableDouble(41.0), sd.merge(new MutableDouble(33.1), new MutableDouble(7.9)));
}
项目:ij-ridgedetection
文件:Convol.java
/**
* Compute gauss mask 0.
*
* @param num
* the num
* @param sigma
* the sigma
* @return the double[]
*/
/*
* num ist eigentlich pointer - aufrufende Funkion nimmt an, dass num geändert
* wird. Übergebe es deswegen als MutableDouble aus CommonsLang
*/
public double[] compute_gauss_mask_0(MutableLong num, double sigma) {
int i, n;
double limit;
double[] h;
limit = LinesUtil.MASK_SIZE(LinesUtil.MAX_SIZE_MASK_0, sigma); /* Error < 0.001 on each side */
n = (int) limit;
h = new double[2 * n + 1];
for (i = -n + 1; i <= n - 1; i++)
h[n + i] = phi0(-i + 0.5, sigma) - phi0(-i - 0.5, sigma);
h[0] = 1.0 - phi0(n - 0.5, sigma);
h[2 * n] = phi0(-n + 0.5, sigma);
num.setValue(n);
return h;
}
项目:ij-ridgedetection
文件:Convol.java
/**
* Compute gauss mask 1.
*
* @param num
* the num
* @param sigma
* the sigma
* @return the double[]
*/
/*
* num ist eigentlich pointer - aufrufende Funkion nimmt an, dass num geändert
* wird. Übergebe es deswegen als MutableDouble aus CommonsLang
*/
public double[] compute_gauss_mask_1(MutableLong num, double sigma) {
int i, n;
double limit;
double[] h;
limit = LinesUtil.MASK_SIZE(LinesUtil.MAX_SIZE_MASK_1, sigma); /* Error < 0.001 on each side */
n = (int) limit;
h = new double[2 * n + 1];
for (i = -n + 1; i <= n - 1; i++)
h[n + i] = phi1(-i + 0.5, sigma) - phi1(-i - 0.5, sigma);
h[0] = -phi1(n - 0.5, sigma);
h[2 * n] = phi1(-n + 0.5, sigma);
num.setValue(n);
return h;
}
项目:ij-ridgedetection
文件:Convol.java
/**
* Compute gauss mask 2.
*
* @param num
* the num
* @param sigma
* the sigma
* @return the double[]
*/
/*
* num ist eigentlich pointer - aufrufende Funkion nimmt an, dass num geändert
* wird. Übergebe es deswegen als MutableDouble aus CommonsLang
*/
public double[] compute_gauss_mask_2(MutableLong num, double sigma) {
int i, n;
double limit;
double[] h;
limit = LinesUtil.MASK_SIZE(LinesUtil.MAX_SIZE_MASK_2, sigma); /* Error < 0.001 on each side */
n = (int) limit;
h = new double[2 * n + 1];
for (i = -n + 1; i <= n - 1; i++)
h[n + i] = phi2(-i + 0.5, sigma) - phi2(-i - 0.5, sigma);
h[0] = -phi2(n - 0.5, sigma);
h[2 * n] = phi2(-n + 0.5, sigma);
num.setValue(n);
return h;
}
项目:hbase
文件:AbstractFSWAL.java
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
if (this.closed) {
throw new IOException(
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
}
MutableLong txidHolder = new MutableLong();
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
txidHolder.setValue(ringBuffer.next());
});
long txid = txidHolder.longValue();
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
ringBuffer.publish(txid);
}
return txid;
}
项目:count-db
文件:FileDataInterface.java
@Override
public long freeMemory() {
MutableLong totalBytesReleased = new MutableLong(0);
ifNotClosed(() -> {
for (FileBucket bucket : fileBuckets) {
bucket.lockRead();
for (FileInfo fileInfo : bucket.getFiles()) {
long bytesReleased = fileInfo.discardFileContents();
updateSizeOfCachedFileContents(-bytesReleased);
totalBytesReleased.add(bytesReleased);
}
bucket.unlockRead();
}
});
return totalBytesReleased.longValue();
}
项目:apache-jmeter-2.10
文件:StatCalculator.java
/**
* Get the value which %percent% of the values are less than. This works
* just like median (where median represents the 50% point). A typical
* desire is to see the 90% point - the value that 90% of the data points
* are below, the remaining 10% are above.
*
* @param percent
* @return the value which %percent% of the values are less than
*/
public T getPercentPoint(double percent) {
if (count <= 0) {
return ZERO;
}
if (percent >= 1.0) {
return getMax();
}
// use Math.round () instead of simple (long) to provide correct value rounding
long target = Math.round (count * percent);
try {
for (Entry<T, MutableLong> val : valuesMap.entrySet()) {
target -= val.getValue().longValue();
if (target <= 0){
return val.getKey();
}
}
} catch (ConcurrentModificationException ignored) {
// ignored. May happen occasionally, but no harm done if so.
}
return ZERO; // TODO should this be getMin()?
}
项目:timely
文件:SortedStringAccumulator.java
@Override
public void add(String value) {
if (values.containsKey(value)) {
values.get(value).increment();
} else {
values.put(value, new MutableLong(1));
}
}
项目:timely
文件:SortedStringAccumulator.java
@Override
public void merge(Accumulator<String, ConcurrentSkipListMap<String, MutableLong>> other) {
other.getLocalValue().forEach((k, v) -> {
if (values.containsKey(k)) {
values.get(k).add(v.longValue());
} else {
values.put(k, v);
}
});
}
项目:Net2Plan
文件:NetPlan.java
/**
* <p>Default constructor. Creates an empty design</p>
*
* @since 0.4.0
*/
public NetPlan()
{
super(null, 0, 0, new AttributeMap());
this.netPlan = this;
DEFAULT_ROUTING_TYPE = RoutingType.SOURCE_ROUTING;
isModifiable = true;
networkDescription = "";
networkName = "";
nextElementId = new MutableLong(1);
layers = new ArrayList<NetworkLayer>();
nodes = new ArrayList<Node>();
srgs = new ArrayList<SharedRiskGroup>();
resources = new ArrayList<Resource>();
cache_nodesDown = new HashSet<Node>();
this.cache_type2Resources = new HashMap<String, Set<Resource>>();
this.cache_id2NodeMap = new HashMap<Long, Node>();
this.cache_id2ResourceMap = new HashMap<Long, Resource>();
this.cache_id2LayerMap = new HashMap<Long, NetworkLayer>();
this.cache_id2srgMap = new HashMap<Long, SharedRiskGroup>();
this.cache_id2LinkMap = new HashMap<Long, Link>();
this.cache_id2DemandMap = new HashMap<Long, Demand>();
this.cache_id2MulticastDemandMap = new HashMap<Long, MulticastDemand>();
this.cache_id2RouteMap = new HashMap<Long, Route>();
this.cache_id2MulticastTreeMap = new HashMap<Long, MulticastTree>();
this.cache_taggedElements = new HashMap<> ();
this.cache_nodesPerSiteName = new HashMap<> ();
this.cache_planningDomain2nodes = new HashMap<> ();
interLayerCoupling = new DirectedAcyclicGraph<NetworkLayer, DemandLinkMapping>(DemandLinkMapping.class);
defaultLayer = addLayer("Layer 0", null, null, null, null, null);
}
项目:cloud-meter
文件:StatCalculator.java
/**
* Returns the distribution of the values in the list.
*
* @return map containing either Integer or Long keys; entries are a Number array containing the key and the [Integer] count.
* TODO - why is the key value also stored in the entry array? See Bug 53825
*/
public Map<Number, Number[]> getDistribution() {
Map<Number, Number[]> items = new HashMap<>();
for (Entry<T, MutableLong> entry : valuesMap.entrySet()) {
Number[] dis = new Number[2];
dis[0] = entry.getKey();
dis[1] = entry.getValue();
items.put(entry.getKey(), dis);
}
return items;
}
项目:cloud-meter
文件:StatCalculator.java
private void updateValueCount(T actualValue, long sampleCount) {
MutableLong count = valuesMap.get(actualValue);
if (count != null) {
count.add(sampleCount);
} else {
// insert new value
valuesMap.put(actualValue, new MutableLong(sampleCount));
}
}
项目:apex-malhar
文件:ApexWindowedStreamImpl.java
@Override
public <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts)
{
WindowedStream<Tuple<KeyValPair<K, Long>>> kvstream = map(convertToKeyValue);
KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new SumLong());
return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
}
项目:apex-malhar
文件:WindowEndQueueManager.java
@Override
public void endWindow()
{
for (QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, MutableLong>> tempNode = queryQueue.getHead();
tempNode != null; tempNode = tempNode.getNext()) {
MutableLong qc = tempNode.getPayload().getQueueContext();
qc.decrement();
}
}
项目:apex-malhar
文件:AppDataWindowEndQueueManager.java
@Override
public boolean enqueue(QUERY query, META_QUERY metaQuery, MutableLong context)
{
if (context != null) {
query.setCountdown(context.getValue());
}
if (query.isOneTime()) {
return super.enqueue(query, metaQuery, new MutableLong(1L));
} else {
return super.enqueue(query, metaQuery, new MutableLong(query.getCountdown()));
}
}
项目:apex-malhar
文件:AppDataWindowEndQueueManager.java
@Override
public boolean addingFilter(QueryBundle<QUERY, META_QUERY, MutableLong> queryBundle)
{
QueueListNode<QueryBundle<QUERY, META_QUERY, MutableLong>> queryNode =
queryIDToNode.get(queryBundle.getQuery().getId());
if (queryNode == null) {
return true;
}
queryNode.setPayload(queryBundle);
return false;
}
项目:apex-malhar
文件:AbstractAppDataSnapshotServer.java
@Override
public Result executeQuery(Query query, Void metaQuery, MutableLong queueContext)
{
return new DataResultSnapshot(query,
currentData,
queueContext.getValue());
}
项目:apex-malhar
文件:WEQueryQueueManagerTest.java
@Test
public void testSimpleAddRemove2()
{
WindowEndQueueManager<Query, Void> wqqm = new WindowEndQueueManager<>();
wqqm.setup(null);
wqqm.beginWindow(0);
Query query = new MockQuery("1");
wqqm.enqueue(query, null, new MutableLong(1L));
Query queryD = wqqm.dequeue().getQuery();
QueryBundle<Query, Void, MutableLong> qb = wqqm.dequeue();
Query queryD1 = qb == null ? null : qb.getQuery();
Query query1 = new MockQuery("2");
wqqm.enqueue(query1, null, new MutableLong(1L));
Query query1D = wqqm.dequeue().getQuery();
qb = wqqm.dequeue();
Query query1D1 = qb == null ? null : qb.getQuery();
wqqm.endWindow();
wqqm.teardown();
Assert.assertEquals("The queries must match.", query, queryD);
Assert.assertEquals("The queries must match.", null, queryD1);
Assert.assertEquals("The queries must match.", query1, query1D);
Assert.assertEquals("The queries must match.", null, query1D1);
}
项目:apex-malhar
文件:WEQueryQueueManagerTest.java
@Test
public void testSimpleAddAfterStarted()
{
WindowEndQueueManager<Query, Void> wqqm = new WindowEndQueueManager<>();
wqqm.setup(null);
wqqm.beginWindow(0);
Query query = new MockQuery("0");
wqqm.enqueue(query, null, new MutableLong(1L));
Query query1 = new MockQuery("1");
wqqm.enqueue(query1, null, new MutableLong(1L));
Query queryD = wqqm.dequeue().getQuery();
Query query2 = new MockQuery("2");
wqqm.enqueue(query2, null, new MutableLong(1L));
Query query1D = wqqm.dequeue().getQuery();
Query query2D = wqqm.dequeue().getQuery();
QueryBundle<Query, Void, MutableLong> qb = wqqm.dequeue();
Query query3D = qb == null ? null : qb.getQuery();
wqqm.endWindow();
wqqm.teardown();
Assert.assertEquals("The queries must match.", query, queryD);
Assert.assertEquals("The queries must match.", query1, query1D);
Assert.assertEquals("The queries must match.", query2, query2D);
Assert.assertEquals("The queries must match.", null, query3D);
}
项目:apex-malhar
文件:QueryManagerAsynchronousTest.java
@Override
public MockResult executeQuery(MockQuery query, Void metaQuery, MutableLong queueContext)
{
if (rand.nextDouble() < waitMillisProb) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
return new MockResult(query);
}
项目:apex-malhar
文件:MovingBoundaryTimeBucketAssignerTest.java
@Test
public void testTimeBucketKeyExpiry()
{
final MutableLong purgeLessThanEqualTo = new MutableLong(-2);
testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1));
testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1));
testMeta.timeBucketAssigner.setPurgeListener(new TimeBucketAssigner.PurgeListener()
{
@Override
public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
{
purgeLessThanEqualTo.setValue(timeBucket);
}
});
long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
Assert.assertEquals("purgeLessThanEqualTo", -2L, purgeLessThanEqualTo.longValue());
long time0 = Duration.standardSeconds(0).getMillis() + referenceTime;
Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucket(time0) );
testMeta.timeBucketAssigner.endWindow();
Assert.assertEquals("purgeLessThanEqualTo", -1, purgeLessThanEqualTo.longValue());
long time1 = Duration.standardSeconds(9).getMillis() + referenceTime;
Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucket(time1) );
testMeta.timeBucketAssigner.endWindow();
Assert.assertEquals("purgeLessThanEqualTo", 8, purgeLessThanEqualTo.longValue());
long time2 = Duration.standardSeconds(10).getMillis() + referenceTime;
Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucket(time2) );
testMeta.timeBucketAssigner.endWindow();
Assert.assertEquals("purgeLessThanEqualTo", 9, purgeLessThanEqualTo.longValue());
//Check for expiry of time1 now
Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(time1) );
testMeta.timeBucketAssigner.endWindow();
Assert.assertEquals("purgeLessThanEqualTo", 9, purgeLessThanEqualTo.longValue());
testMeta.timeBucketAssigner.teardown();
}
项目:apex-malhar
文件:WindowedOperatorTest.java
private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator(boolean forSession)
{
KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>();
if (useSpillable) {
sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore);
// TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys.
windowStateStorage = new InMemoryWindowedStorage<>();
if (forSession) {
SpillableSessionWindowedStorage<String, MutableLong> sws = new SpillableSessionWindowedStorage<>();
sws.setSpillableComplexComponent(sccImpl);
keyedDataStorage = sws;
} else {
SpillableWindowedKeyedStorage<String, MutableLong> kds = new SpillableWindowedKeyedStorage<>();
kds.setSpillableComplexComponent(sccImpl);
keyedDataStorage = kds;
}
SpillableWindowedKeyedStorage<String, Long> krs = new SpillableWindowedKeyedStorage<>();
krs.setSpillableComplexComponent(sccImpl);
keyedRetractionStorage = krs;
windowedOperator.addComponent("SpillableComplexComponent", sccImpl);
} else {
windowStateStorage = new InMemoryWindowedStorage<>();
if (forSession) {
keyedDataStorage = new InMemorySessionWindowedStorage<>();
} else {
keyedDataStorage = new InMemoryWindowedKeyedStorage<>();
}
keyedRetractionStorage = new InMemoryWindowedKeyedStorage<>();
}
windowedOperator.setDataStorage(keyedDataStorage);
windowedOperator.setRetractionStorage(keyedRetractionStorage);
windowedOperator.setWindowStateStorage(windowStateStorage);
windowedOperator.setAccumulation(new SumAccumulation());
return windowedOperator;
}
项目:apex-malhar
文件:WindowedOperatorTest.java
@Test
public void testImplicitWatermarks()
{
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
CollectorTestSink controlSink = new CollectorTestSink();
windowedOperator.controlOutput.setSink(controlSink);
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
windowedOperator.setAllowedLateness(Duration.millis(1000));
windowedOperator.setImplicitWatermarkGenerator(new FixedDiffEventTimeWatermarkGen(100));
windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
windowedOperator.endWindow();
Assert.assertEquals("We should get no watermark tuple", 0, controlSink.getCount(false));
windowedOperator.beginWindow(2);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L));
windowedOperator.endWindow();
Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false));
Assert.assertEquals("Check Watermark value",
((ControlTuple.Watermark)controlSink.collectedTuples.get(0)).getTimestamp(), BASE);
windowedOperator.beginWindow(3);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 900L, 4L));
windowedOperator.endWindow();
Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false));
Assert.assertEquals("Check Watermark value",
((ControlTuple.Watermark)controlSink.collectedTuples.get(1)).getTimestamp(), BASE + 800);
}
项目:apex-malhar
文件:WindowedOperatorTest.java
@Test
public void testGlobalWindowAssignment()
{
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
windowedOperator.setWindowOption(new WindowOption.GlobalWindow());
windowedOperator.setup(testMeta.operatorContext);
Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1100L, 2L));
Collection<? extends Window> windows = windowedValue.getWindows();
Assert.assertEquals(1, windows.size());
Assert.assertEquals(Window.GlobalWindow.INSTANCE, windows.iterator().next());
windowedOperator.teardown();
}
项目:apex-malhar
文件:WindowedOperatorTest.java
@Test
public void testTimeWindowAssignment()
{
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
windowedOperator.setup(testMeta.operatorContext);
Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1100L, 2L));
Collection<? extends Window> windows = windowedValue.getWindows();
Assert.assertEquals(1, windows.size());
Window window = windows.iterator().next();
Assert.assertEquals(BASE + 1000, window.getBeginTimestamp());
Assert.assertEquals(1000, window.getDurationMillis());
}
项目:apex-malhar
文件:Application.java
@Override
public MutablePair<MutableLong, MutableLong> accumulate(MutablePair<MutableLong, MutableLong> accumulatedValue, MutablePair<Double, Double> input)
{
if (input.getLeft() * input.getLeft() + input.getRight() * input.getRight() < 1) {
accumulatedValue.getLeft().increment();
}
accumulatedValue.getRight().increment();
return accumulatedValue;
}
项目:apex-malhar
文件:Application.java
@Override
public MutablePair<MutableLong, MutableLong> merge(MutablePair<MutableLong, MutableLong> accumulatedValue1, MutablePair<MutableLong, MutableLong> accumulatedValue2)
{
accumulatedValue1.getLeft().add(accumulatedValue2.getLeft());
accumulatedValue1.getRight().add(accumulatedValue2.getRight());
return accumulatedValue1;
}
项目:apex-samples
文件:Application.java
@Override
public void populateDAG(DAG dag, Configuration configuration)
{
WordGenerator inputOperator = new WordGenerator();
KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator =
new KeyedWindowedOperatorImpl<>();
Accumulation<Long, MutableLong, Long> sum = new SumLong();
windowedOperator.setAccumulation(sum);
windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableLong>());
windowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, Long>());
windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1)));
windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(
Duration.millis(1000)).accumulatingAndRetractingFiredPanes());
ConsoleOutputOperator consoleOutput = new ConsoleOutputOperator();
GenericFileOutputOperator<Object> fileOutput = new GenericFileOutputOperator<>();
fileOutput.setConverter(new ToStringConverter());
dag.addOperator("inputOperator", inputOperator);
dag.addOperator("windowedOperator", windowedOperator);
dag.addOperator("consoleOutput", consoleOutput);
dag.addOperator("output", fileOutput);
dag.addStream("input_windowed", inputOperator.output, windowedOperator.input);
dag.addStream("windowed_output", windowedOperator.output, consoleOutput.input, fileOutput.input);
}