Java class org.apache.spark.api.java.function.FlatMapFunction code examples
Project: incubator-sdap-mudrod
File: RDDUtil.java
/**
* getAllWordsInDoc: Extracts all unique terms from all docs.
*
* @param docwordRDD Pair RDD; each key is a doc, and each value is the term list extracted from
* that doc.
* @return unique term list
*/
public static JavaRDD<String> getAllWordsInDoc(JavaPairRDD<String, List<String>> docwordRDD) {
JavaRDD<String> wordRDD = docwordRDD.values().flatMap(new FlatMapFunction<List<String>, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Iterator<String> call(List<String> list) {
return list.iterator();
}
}).distinct();
return wordRDD;
}
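With Java 8 and the Spark 2.x API, where call() returns an Iterator, the anonymous class above collapses to a method reference. A minimal equivalent sketch:

public static JavaRDD<String> getAllWordsInDoc(JavaPairRDD<String, List<String>> docwordRDD) {
    // List::iterator satisfies FlatMapFunction<List<String>, String> directly.
    return docwordRDD.values().flatMap(List::iterator).distinct();
}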
Project: incubator-sdap-mudrod
File: SessionExtractor.java
/**
* loadClickStremFromTxt: Load click stream from a txt file
*
* @param clickthroughFile
* txt file
* @param sc
* the spark context
* @return clickstream list in JavaRDD format {@link ClickStream}
*/
public JavaRDD<ClickStream> loadClickStremFromTxt(String clickthroughFile, JavaSparkContext sc) {
return sc.textFile(clickthroughFile).flatMap(new FlatMapFunction<String, ClickStream>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@SuppressWarnings("unchecked")
@Override
public Iterator<ClickStream> call(String line) throws Exception {
List<ClickStream> clickthroughs = (List<ClickStream>) ClickStream.parseFromTextLine(line);
return clickthroughs.iterator();
}
});
}
Project: incubator-sdap-mudrod
File: CrawlerDetection.java
void checkByRateInParallel() throws InterruptedException, IOException {
JavaRDD<String> userRDD = getUserRDD(this.httpType);
LOG.info("Original User count: {}", userRDD.count());
int userCount = 0;
userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
ESDriver tmpES = new ESDriver(props);
tmpES.createBulkProcessor();
List<Integer> realUserNums = new ArrayList<>();
while (iterator.hasNext()) {
String s = iterator.next();
Integer realUser = checkByRate(tmpES, s);
realUserNums.add(realUser);
}
tmpES.destroyBulkProcessor();
tmpES.close();
return realUserNums.iterator();
}).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
LOG.info("User count: {}", Integer.toString(userCount));
}
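The mapPartitions pattern above exists to amortize expensive setup (the ESDriver connection plus its bulk processor) over a whole partition instead of paying it per element. A minimal, self-contained sketch of the same pattern, using a MessageDigest as a stand-in for the expensive resource:

static JavaRDD<String> hashPerPartition(JavaRDD<String> input) {
    return input.mapPartitions((FlatMapFunction<Iterator<String>, String>) it -> {
        // Created once per partition, not once per element.
        java.security.MessageDigest md = java.security.MessageDigest.getInstance("SHA-256");
        List<String> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(java.util.Base64.getEncoder()
                .encodeToString(md.digest(it.next().getBytes(java.nio.charset.StandardCharsets.UTF_8))));
        }
        return out.iterator();
    });
}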
Project: mudrod
File: RDDUtil.java
/**
* getAllWordsInDoc: Extracts all unique terms from all docs.
*
* @param docwordRDD Pair RDD; each key is a doc, and each value is the term list extracted from
* that doc.
* @return unique term list
*/
public static JavaRDD<String> getAllWordsInDoc(JavaPairRDD<String, List<String>> docwordRDD) {
JavaRDD<String> wordRDD = docwordRDD.values().flatMap(new FlatMapFunction<List<String>, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Iterator<String> call(List<String> list) {
return list.iterator();
}
}).distinct();
return wordRDD;
}
Project: mudrod
File: SessionExtractor.java
/**
* loadClickStremFromTxt: Load click stream from a txt file
*
* @param clickthroughFile
* txt file
* @param sc
* the spark context
* @return clickstream list in JavaRDD format {@link ClickStream}
*/
public JavaRDD<ClickStream> loadClickStremFromTxt(String clickthroughFile, JavaSparkContext sc) {
return sc.textFile(clickthroughFile).flatMap(new FlatMapFunction<String, ClickStream>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@SuppressWarnings("unchecked")
@Override
public Iterator<ClickStream> call(String line) throws Exception {
List<ClickStream> clickthroughs = (List<ClickStream>) ClickStream.parseFromTextLine(line);
return clickthroughs.iterator();
}
});
}
Project: mudrod
File: CrawlerDetection.java
void checkByRateInParallel() throws InterruptedException, IOException {
JavaRDD<String> userRDD = getUserRDD(this.httpType);
LOG.info("Original User count: {}", userRDD.count());
int userCount = 0;
userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
ESDriver tmpES = new ESDriver(props);
tmpES.createBulkProcessor();
List<Integer> realUserNums = new ArrayList<>();
while (iterator.hasNext()) {
String s = iterator.next();
Integer realUser = checkByRate(tmpES, s);
realUserNums.add(realUser);
}
tmpES.destroyBulkProcessor();
tmpES.close();
return realUserNums.iterator();
}).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
LOG.info("User count: {}", Integer.toString(userCount));
}
Project: beam
File: TranslationUtils.java
/** A pair to {@link KV} flatmap function. */
static <K, V> FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>> fromPairFlatMapFunction() {
return new FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>>() {
@Override
public Iterator<KV<K, V>> call(Iterator<Tuple2<K, V>> itr) {
final Iterator<KV<K, V>> outputItr =
Iterators.transform(
itr,
new com.google.common.base.Function<Tuple2<K, V>, KV<K, V>>() {
@Override
public KV<K, V> apply(Tuple2<K, V> t2) {
return KV.of(t2._1(), t2._2());
}
});
return outputItr;
}
};
}
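With Java 8 lambdas the same adapter shrinks considerably; a sketch equivalent to the anonymous classes above (Guava's Function is a functional interface, so the inner transform can be a lambda too):

static <K, V> FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>> fromPairFlatMapFunction() {
    return itr -> Iterators.transform(itr, t2 -> KV.of(t2._1(), t2._2()));
}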
Project: beam
File: TranslationUtils.java
/**
* A utility method that adapts {@link Function} to a {@link FlatMapFunction} with an {@link
* Iterator} input. This is particularly useful because it allows functions written for map
* operations to be reused in flatmap operations.
*
* @param func the {@link Function} to adapt.
* @param <InputT> the input type.
* @param <OutputT> the output type.
* @return a {@link FlatMapFunction} that accepts an {@link Iterator} as an input and applies the
* {@link Function} on every element.
*/
public static <InputT, OutputT>
FlatMapFunction<Iterator<InputT>, OutputT> functionToFlatMapFunction(
final Function<InputT, OutputT> func) {
return new FlatMapFunction<Iterator<InputT>, OutputT>() {
@Override
public Iterator<OutputT> call(Iterator<InputT> itr) throws Exception {
final Iterator<OutputT> outputItr =
Iterators.transform(
itr,
new com.google.common.base.Function<InputT, OutputT>() {
@Override
public OutputT apply(InputT t) {
try {
return func.call(t);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
return outputItr;
}
};
}
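A hypothetical usage sketch: lifting a per-element Spark Function into mapPartitions via this adapter (stringsRDD is an assumed JavaRDD<String>):

Function<String, Integer> length = String::length;
JavaRDD<Integer> lengths = stringsRDD.mapPartitions(functionToFlatMapFunction(length));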
Project: chronix.spark
File: ChronixSparkContext.java
/**
* Low-level chunked query.
*
* @param query Solr query
* @param zkHost Zookeeper host
* @param collection the Solr collection of chronix time series data
* @param chronixStorage a ChronixSolrCloudStorage instance
* @return ChronixRDD of time series (chunks)
* @throws SolrServerException
*/
public ChronixRDD queryChronixChunks(
final SolrQuery query,
final String zkHost,
final String collection,
final ChronixSolrCloudStorage chronixStorage) throws SolrServerException, IOException {
// first get a list of replicas to query for this collection
List<String> shards = chronixStorage.getShardList(zkHost, collection);
// parallelize the requests to the shards
JavaRDD<MetricTimeSeries> docs = jsc.parallelize(shards, shards.size()).flatMap(
(FlatMapFunction<String, MetricTimeSeries>) shardUrl -> chronixStorage.streamFromSingleNode(
zkHost, collection, shardUrl, query, new MetricTimeSeriesConverter()).iterator());
return new ChronixRDD(docs);
}
Project: chronix.spark
File: ChronixRDD.java
/**
* Transformation: Transforms the ChronixRDD into an RDD of MetricObservations (pairs of timestamp and value, plus dimensions).
*
* @return RDD of MetricObservations
*/
public JavaRDD<MetricObservation> toObservations() {
return this.flatMap((FlatMapFunction<MetricTimeSeries, MetricObservation>) ts -> ts.points().map(point -> {
//null-safe read of dimensional values
String host = ts.attributes().get(MetricDimension.HOST) == null ? null
: ts.attributes().get(MetricDimension.HOST).toString();
String series = ts.attributes().get(MetricDimension.MEASUREMENT_SERIES) == null ? null
: ts.attributes().get(MetricDimension.MEASUREMENT_SERIES).toString();
String process = ts.attributes().get(MetricDimension.PROCESS) == null ? null
: ts.attributes().get(MetricDimension.PROCESS).toString();
String group = ts.attributes().get(MetricDimension.METRIC_GROUP) == null ? null
: ts.attributes().get(MetricDimension.METRIC_GROUP).toString();
String ag = ts.attributes().get(MetricDimension.AGGREGATION_LEVEL) == null ? null
: ts.attributes().get(MetricDimension.AGGREGATION_LEVEL).toString();
//convert Point/MetricTimeSeries to MetricObservation
return new MetricObservation(
ts.getMetric(),
host, series, process, group, ag,
point.getTimestamp(),
point.getValue()
);
}).iterator());
}
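The five dimension reads above repeat one null-check pattern, which could be factored into a small helper. A sketch, assuming the same MetricTimeSeries attribute map:

private static String attributeOrNull(MetricTimeSeries ts, MetricDimension key) {
    Object value = ts.attributes().get(key);
    return value == null ? null : value.toString();
}
// e.g. String host = attributeOrNull(ts, MetricDimension.HOST);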
Project: power-java
File: HelloSpark.java
static public void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext("local", "Hello Spark");
JavaRDD<String> lines = sc.textFile("data/test1.txt");
//JavaRDD<String> tokens = lines.flatMap(line -> tokenize(line)); // worked for mllib version 1.5, not for version 2.0
JavaRDD<String> tokens = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) {
return tokenize(s).iterator();
}
});
JavaPairRDD<String, Integer> counts =
tokens.mapToPair(
token ->
new Tuple2<String, Integer>(token.toLowerCase(), 1))
.reduceByKey((count1, count2) -> count1 + count2);
Map<String, Integer> countMap = counts.collectAsMap();
System.out.println(countMap);
List<Tuple2<String, Integer>> collection = counts.collect();
System.out.println(collection);
}
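As the commented-out line notes, the bare lambda compiled against the Spark 1.x Java API, where call() returned an Iterable (the change came with the Spark 2.0 core Java API, not MLlib specifically). Under 2.x the anonymous class still collapses to a lambda by returning the iterator explicitly; a minimal sketch:

// Spark 2.x form: FlatMapFunction.call returns an Iterator, not an Iterable.
JavaRDD<String> tokens = lines.flatMap(line -> tokenize(line).iterator());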
Project: SHMACK
File: WordCount.java
@SuppressWarnings("serial")
@Override
public SortedCounts<String> execute(final JavaSparkContext spark) {
final JavaRDD<String> textFile = spark.textFile(inputFile);
final JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(final String rawJSON) throws TwitterException {
final Status tweet = TwitterObjectFactory.createStatus(rawJSON);
String text = tweet.getText();
return Arrays.asList(text.split(" "));
}
});
final JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(final String s) {
return new Tuple2<String, Integer>(s.toLowerCase(), 1);
}
});
final JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(final Integer a, final Integer b) {
return a + b;
}
});
return SortedCounts.create(counts);
}
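Note that this example targets the Spark 1.x Java API, where FlatMapFunction.call returned an Iterable. On Spark 2.x the same function returns an Iterator instead; a sketch of the 2.x form, assuming the same twitter4j types:

final JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(final String rawJSON) throws TwitterException {
        final Status tweet = TwitterObjectFactory.createStatus(rawJSON);
        return Arrays.asList(tweet.getText().split(" ")).iterator();
    }
});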
Project: GeoSpark
File: SpatialRDD.java
private JavaRDD<T> partition(final SpatialPartitioner partitioner) {
return this.rawSpatialRDD.flatMapToPair(
new PairFlatMapFunction<T, Integer, T>() {
@Override
public Iterator<Tuple2<Integer, T>> call(T spatialObject) throws Exception {
return partitioner.placeObject(spatialObject);
}
}
).partitionBy(partitioner)
.mapPartitions(new FlatMapFunction<Iterator<Tuple2<Integer, T>>, T>() {
@Override
public Iterator<T> call(final Iterator<Tuple2<Integer, T>> tuple2Iterator) throws Exception {
return new Iterator<T>() {
@Override
public boolean hasNext() {
return tuple2Iterator.hasNext();
}
@Override
public T next() {
return tuple2Iterator.next()._2();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}, true);
}
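The hand-rolled Iterator above, which unwraps each Tuple2, could also be expressed with Guava's Iterators.transform, as the beam TranslationUtils examples earlier on this page do. A sketch of an equivalent adapter (a hypothetical dropKeys helper, assuming Guava is on the classpath; one behavioral difference is that Guava's transform delegates remove() instead of throwing):

static <T> FlatMapFunction<Iterator<Tuple2<Integer, T>>, T> dropKeys() {
    return tuple2Iterator -> Iterators.transform(tuple2Iterator, tuple -> tuple._2());
}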
Project: learning-spark-examples
File: BasicFlatMap.java
public static void main(String[] args) throws Exception {
if (args.length != 2) {
throw new Exception("Usage BasicFlatMap sparkMaster inputFile");
}
JavaSparkContext sc = new JavaSparkContext(
args[0], "basicflatmap", System.getenv("SPARK_HOME"), System.getenv("JARS"));
JavaRDD<String> rdd = sc.textFile(args[1]);
JavaRDD<String> words = rdd.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}});
Map<String, Long> result = words.countByValue();
for (Entry<String, Long> entry: result.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
}
Project: gatk
File: ReadWalkerSpark.java
private static FlatMapFunction<Shard<GATKRead>, ReadWalkerContext> getReadsFunction(
Broadcast<ReferenceMultiSource> bReferenceSource, Broadcast<FeatureManager> bFeatureManager,
SAMSequenceDictionary sequenceDictionary, int readShardPadding) {
return (FlatMapFunction<Shard<GATKRead>, ReadWalkerContext>) shard -> {
// get reference bases for this shard (padded)
SimpleInterval paddedInterval = shard.getInterval().expandWithinContig(readShardPadding, sequenceDictionary);
ReferenceDataSource reference = bReferenceSource == null ? null :
new ReferenceMemorySource(bReferenceSource.getValue().getReferenceBases(paddedInterval), sequenceDictionary);
FeatureManager features = bFeatureManager == null ? null : bFeatureManager.getValue();
return StreamSupport.stream(shard.spliterator(), false)
.map(r -> {
final SimpleInterval readInterval = getReadInterval(r);
return new ReadWalkerContext(r, new ReferenceContext(reference, readInterval), new FeatureContext(features, readInterval));
}).iterator();
};
}
Project: gatk
File: VariantWalkerSpark.java
private static FlatMapFunction<Shard<VariantContext>, VariantWalkerContext> getVariantsFunction(
final Broadcast<ReferenceMultiSource> bReferenceSource,
final Broadcast<FeatureManager> bFeatureManager,
final SAMSequenceDictionary sequenceDictionary, final int variantShardPadding) {
return (FlatMapFunction<Shard<VariantContext>, VariantWalkerContext>) shard -> {
// get reference bases for this shard (padded)
SimpleInterval paddedInterval = shard.getInterval().expandWithinContig(variantShardPadding, sequenceDictionary);
ReferenceDataSource reference = bReferenceSource == null ? null :
new ReferenceMemorySource(bReferenceSource.getValue().getReferenceBases(paddedInterval), sequenceDictionary);
FeatureManager features = bFeatureManager == null ? null : bFeatureManager.getValue();
return StreamSupport.stream(shard.spliterator(), false)
.filter(v -> v.getStart() >= shard.getStart() && v.getStart() <= shard.getEnd()) // only include variants that start in the shard
.map(v -> {
final SimpleInterval variantInterval = new SimpleInterval(v);
return new VariantWalkerContext(v,
new ReadsContext(), // empty
new ReferenceContext(reference, variantInterval),
new FeatureContext(features, variantInterval));
}).iterator();
};
}
Project: gatk
File: HaplotypeCallerSpark.java
/**
* @return an RDD of {@link Tuple2<AssemblyRegion, SimpleInterval>} which pairs each AssemblyRegion with the
* interval it was generated in
*/
private static FlatMapFunction<Iterator<Shard<GATKRead>>, Tuple2<AssemblyRegion, SimpleInterval>> shardsToAssemblyRegions(
final Broadcast<ReferenceMultiSource> reference,
final Broadcast<HaplotypeCallerArgumentCollection> hcArgsBroadcast,
final ShardingArgumentCollection assemblyArgs,
final SAMFileHeader header,
final Broadcast<VariantAnnotatorEngine> annotatorEngineBroadcast) {
return shards -> {
final ReferenceMultiSource referenceMultiSource = reference.value();
final ReferenceMultiSourceAdapter referenceSource = new ReferenceMultiSourceAdapter(referenceMultiSource);
final HaplotypeCallerEngine hcEngine = new HaplotypeCallerEngine(hcArgsBroadcast.value(), false, false, header, referenceSource, annotatorEngineBroadcast.getValue());
final ReadsDownsampler readsDownsampler = assemblyArgs.maxReadsPerAlignmentStart > 0 ?
new PositionalDownsampler(assemblyArgs.maxReadsPerAlignmentStart, header) : null;
return Utils.stream(shards)
//TODO we've hacked multi interval shards here with a shim, but we should investigate as smarter approach https://github.com/broadinstitute/gatk/issues/4299
.map(shard -> new ShardToMultiIntervalShardAdapter<>(
new DownsampleableSparkReadShard(new ShardBoundary(shard.getInterval(), shard.getPaddedInterval()), shard, readsDownsampler)))
.flatMap(shardToRegion(assemblyArgs, header, referenceSource, hcEngine)).iterator();
};
}
Project: gatk
File: ReadsSparkSourceUnitTest.java
@Test(dataProvider = "readPairsAndPartitions")
public void testPutPairsInSamePartition(int numPairs, int numPartitions, int[] expectedReadsPerPartition) throws IOException {
JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
SAMFileHeader header = ArtificialReadUtils.createArtificialSamHeader();
header.setSortOrder(SAMFileHeader.SortOrder.queryname);
JavaRDD<GATKRead> reads = createPairedReads(ctx, header, numPairs, numPartitions);
ReadsSparkSource readsSparkSource = new ReadsSparkSource(ctx);
JavaRDD<GATKRead> pairedReads = readsSparkSource.putPairsInSamePartition(header, reads);
List<List<GATKRead>> partitions = pairedReads.mapPartitions((FlatMapFunction<Iterator<GATKRead>, List<GATKRead>>) it ->
Iterators.singletonIterator(Lists.newArrayList(it))).collect();
assertEquals(partitions.size(), numPartitions);
for (int i = 0; i < numPartitions; i++) {
assertEquals(partitions.get(i).size(), expectedReadsPerPartition[i]);
}
assertEquals(Arrays.stream(expectedReadsPerPartition).sum(), numPairs * 2);
}
Project: deeplearning4j
File: ParameterAveragingTrainingMaster.java
protected void doIterationPathsMDS(SparkComputationGraph graph, JavaRDD<String> split, int splitNum, int numSplits,
int dataSetObjectNumExamples) {
log.info("Starting training of split {} of {}. workerMiniBatchSize={}, averagingFreq={}, Configured for {} workers",
splitNum, numSplits, batchSizePerWorker, averagingFrequency, numWorkers);
if (collectTrainingStats)
stats.logMapPartitionsStart();
JavaRDD<String> splitData = split;
if (collectTrainingStats)
stats.logRepartitionStart();
splitData = SparkUtils.repartition(splitData, repartition, repartitionStrategy,
numObjectsEachWorker(dataSetObjectNumExamples), numWorkers);
int nPartitions = splitData.partitions().size();
if (collectTrainingStats && repartition != Repartition.Never)
stats.logRepartitionEnd();
FlatMapFunction<Iterator<String>, ParameterAveragingTrainingResult> function =
new ExecuteWorkerPathMDSFlatMap<>(getWorkerInstance(graph));
JavaRDD<ParameterAveragingTrainingResult> result = splitData.mapPartitions(function);
processResults(null, graph, result, splitNum, numSplits);
if (collectTrainingStats)
stats.logMapPartitionsEnd(nPartitions);
}
Project: deeplearning4j
File: ParameterAveragingTrainingMaster.java
protected void doIteration(SparkComputationGraph graph, JavaRDD<MultiDataSet> split, int splitNum, int numSplits) {
log.info("Starting training of split {} of {}. workerMiniBatchSize={}, averagingFreq={}, Configured for {} workers",
splitNum, numSplits, batchSizePerWorker, averagingFrequency, numWorkers);
if (collectTrainingStats)
stats.logMapPartitionsStart();
JavaRDD<MultiDataSet> splitData = split;
splitData = SparkUtils.repartition(splitData, repartition, repartitionStrategy,
numObjectsEachWorker(rddDataSetNumExamples), numWorkers);
int nPartitions = split.partitions().size();
FlatMapFunction<Iterator<MultiDataSet>, ParameterAveragingTrainingResult> function =
new ExecuteWorkerMultiDataSetFlatMap<>(getWorkerInstance(graph));
JavaRDD<ParameterAveragingTrainingResult> result = splitData.mapPartitions(function);
processResults(null, graph, result, splitNum, numSplits);
if (collectTrainingStats)
stats.logMapPartitionsEnd(nPartitions);
}
Project: deeplearning4j
File: ParameterAveragingTrainingMaster.java
protected void doIterationPDS_MDS(SparkComputationGraph graph, JavaRDD<PortableDataStream> split, int splitNum,
int numSplits) {
log.info("Starting training of split {} of {}. workerMiniBatchSize={}, averagingFreq={}, Configured for {} workers",
splitNum, numSplits, batchSizePerWorker, averagingFrequency, numWorkers);
if (collectTrainingStats)
stats.logMapPartitionsStart();
JavaRDD<PortableDataStream> splitData = split;
if (collectTrainingStats)
stats.logRepartitionStart();
splitData = SparkUtils.repartition(splitData, repartition, repartitionStrategy,
numObjectsEachWorker(rddDataSetNumExamples), numWorkers);
int nPartitions = splitData.partitions().size();
if (collectTrainingStats && repartition != Repartition.Never)
stats.logRepartitionEnd();
FlatMapFunction<Iterator<PortableDataStream>, ParameterAveragingTrainingResult> function =
new ExecuteWorkerPDSMDSFlatMap<>(getWorkerInstance(graph));
JavaRDD<ParameterAveragingTrainingResult> result = splitData.mapPartitions(function);
processResults(null, graph, result, splitNum, numSplits);
if (collectTrainingStats)
stats.logMapPartitionsEnd(nPartitions);
}
Project: AbstractRendering
File: GlyphsetRDD.java
@Override public Rectangle2D bounds() {
final JavaRDD<Rectangle2D> rects;
if (partitions) {
rects = base.mapPartitions(
new FlatMapFunction<Iterator<Glyph<G,I>>,Rectangle2D>() {
public Iterable<Rectangle2D> call(Iterator<Glyph<G, I>> glyphs) throws Exception {
ArrayList<Glyph<G,I>> glyphList = Lists.newArrayList(new IterableIterator<>(glyphs));
return Arrays.asList(Util.bounds(glyphList));
}});
} else {
rects = base.map(new Function<Glyph<G,I>,Rectangle2D>() {
public Rectangle2D call(Glyph<G,I> glyph) throws Exception {
return Util.boundOne(glyph.shape());
}});
}
return rects.reduce(new Function2<Rectangle2D, Rectangle2D,Rectangle2D>() {
public Rectangle2D call(Rectangle2D left, Rectangle2D right) throws Exception {
return Util.bounds(left, right);
}
});
}
Project: BLASpark
File: OtherOperations.java
private static CoordinateMatrix GetLU_COORD(CoordinateMatrix A) {
JavaRDD<MatrixEntry> rows = A.entries().toJavaRDD().cache();
JavaRDD<MatrixEntry> LUEntries = rows.mapPartitions(new FlatMapFunction<Iterator<MatrixEntry>, MatrixEntry>() {
@Override
public Iterator<MatrixEntry> call(Iterator<MatrixEntry> matrixEntryIterator) throws Exception {
List<MatrixEntry> newLowerEntries = new ArrayList<MatrixEntry>();
while(matrixEntryIterator.hasNext()) {
MatrixEntry currentEntry = matrixEntryIterator.next();
if(currentEntry.i() != currentEntry.j()) {
newLowerEntries.add(currentEntry);
}
else {
newLowerEntries.add(new MatrixEntry(currentEntry.i(), currentEntry.j(), 0.0));
}
}
return newLowerEntries.iterator();
}
});
CoordinateMatrix newMatrix = new CoordinateMatrix(LUEntries.rdd());
return newMatrix;
}
Project: BLASpark
File: OtherOperations.java
private static CoordinateMatrix GetD_COORD(CoordinateMatrix A, boolean inverseValues, JavaSparkContext jsc) {
JavaRDD<MatrixEntry> rows = A.entries().toJavaRDD().cache();
final Broadcast<Boolean> inverseValuesBC = jsc.broadcast(inverseValues);
JavaRDD<MatrixEntry> LUEntries = rows.mapPartitions(new FlatMapFunction<Iterator<MatrixEntry>, MatrixEntry>() {
@Override
public Iterator<MatrixEntry> call(Iterator<MatrixEntry> matrixEntryIterator) throws Exception {
List<MatrixEntry> newLowerEntries = new ArrayList<MatrixEntry>();
boolean inverseValuesValue = inverseValuesBC.getValue().booleanValue();
while(matrixEntryIterator.hasNext()) {
MatrixEntry currentEntry = matrixEntryIterator.next();
if(currentEntry.i() == currentEntry.j()) {
if(inverseValuesValue) {
newLowerEntries.add(new MatrixEntry(currentEntry.i(), currentEntry.j(), 1.0/currentEntry.value()));
}
else {
newLowerEntries.add(currentEntry);
}
}
else {
newLowerEntries.add(new MatrixEntry(currentEntry.i(), currentEntry.j(), 0.0));
}
}
return newLowerEntries.iterator();
}
});
CoordinateMatrix newMatrix = new CoordinateMatrix(LUEntries.rdd());
return newMatrix;
}
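Both methods are one-to-one transforms: every input MatrixEntry yields exactly one output entry, and there is no per-partition setup to amortize, so a plain map expresses the same thing without managing partition iterators. A minimal sketch of GetLU_COORD in that style (GetD_COORD would follow the same shape):

private static CoordinateMatrix GetLU_COORD(CoordinateMatrix A) {
    // Zero the diagonal, keep everything else.
    JavaRDD<MatrixEntry> entries = A.entries().toJavaRDD().map(e ->
            e.i() != e.j() ? e : new MatrixEntry(e.i(), e.j(), 0.0));
    return new CoordinateMatrix(entries.rdd());
}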
Project: incubator-sdap-mudrod
File: SessionGenerator.java
public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException {
JavaRDD<String> userRDD = getUserRDD(this.cleanupType);
int sessionCount = 0;
sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
ESDriver tmpES = new ESDriver(props);
tmpES.createBulkProcessor();
List<Integer> sessionNums = new ArrayList<>();
while (arg0.hasNext()) {
String s = arg0.next();
Integer sessionNum = genSessionByReferer(tmpES, s, timeThres);
sessionNums.add(sessionNum);
}
tmpES.destroyBulkProcessor();
tmpES.close();
return sessionNums.iterator();
}
}).reduce(new Function2<Integer, Integer, Integer>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer a, Integer b) {
return a + b;
}
});
LOG.info("Initial Session count: {}", Integer.toString(sessionCount));
}
Project: incubator-sdap-mudrod
File: SessionStatistic.java
public void processSessionInParallel() throws InterruptedException, IOException {
List<String> sessions = this.getSessions();
JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition);
int sessionCount = 0;
sessionCount = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
@Override
public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
ESDriver tmpES = new ESDriver(props);
tmpES.createBulkProcessor();
List<Integer> sessionNums = new ArrayList<Integer>();
sessionNums.add(0);
while (arg0.hasNext()) {
String s = arg0.next();
Integer sessionNum = processSession(tmpES, s);
sessionNums.add(sessionNum);
}
tmpES.destroyBulkProcessor();
tmpES.close();
return sessionNums.iterator();
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) {
return a + b;
}
});
LOG.info("Final Session count: {}", Integer.toString(sessionCount));
}
Project: Sparkathon
File: JavaStructuredNetworkWordCount.java
public static void main(String args[]) throws StreamingQueryException {
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCount")
.master("local")
.config("spark.sql.shuffle.partitions", 8)
.getOrCreate();
// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
// Split the lines into words
Dataset<String> words = lines
.as(Encoders.STRING())
.flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
}
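Since Dataset.flatMap is overloaded for both the Scala and Java function types, a bare lambda is ambiguous and needs a cast to FlatMapFunction to disambiguate. A minimal lambda sketch equivalent to the anonymous class above:

Dataset<String> words = lines
    .as(Encoders.STRING())
    .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
        Encoders.STRING());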
Project: mudrod
File: SessionGenerator.java
public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException {
JavaRDD<String> userRDD = getUserRDD(this.cleanupType);
int sessionCount = 0;
sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
ESDriver tmpES = new ESDriver(props);
tmpES.createBulkProcessor();
List<Integer> sessionNums = new ArrayList<>();
while (arg0.hasNext()) {
String s = arg0.next();
Integer sessionNum = genSessionByReferer(tmpES, s, timeThres);
sessionNums.add(sessionNum);
}
tmpES.destroyBulkProcessor();
tmpES.close();
return sessionNums.iterator();
}
}).reduce(new Function2<Integer, Integer, Integer>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer a, Integer b) {
return a + b;
}
});
LOG.info("Initial Session count: {}", Integer.toString(sessionCount));
}
Project: mudrod
File: SessionStatistic.java
public void processSessionInParallel() throws InterruptedException, IOException {
List<String> sessions = this.getSessions();
JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition);
int sessionCount = 0;
sessionCount = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
@Override
public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
ESDriver tmpES = new ESDriver(props);
tmpES.createBulkProcessor();
List<Integer> sessionNums = new ArrayList<Integer>();
sessionNums.add(0);
while (arg0.hasNext()) {
String s = arg0.next();
Integer sessionNum = processSession(tmpES, s);
sessionNums.add(sessionNum);
}
tmpES.destroyBulkProcessor();
tmpES.close();
return sessionNums.iterator();
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) {
return a + b;
}
});
LOG.info("Final Session count: {}", Integer.toString(sessionCount));
}
Project: incubator-pulsar
File: SparkStreamingPulsarReceiverExample.java
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
ClientConfiguration clientConf = new ClientConfiguration();
ConsumerConfiguration consConf = new ConsumerConfiguration();
String url = "pulsar://localhost:6650/";
String topic = "persistent://sample/standalone/ns1/topic1";
String subs = "sub1";
JavaReceiverInputDStream<byte[]> msgs = jssc
.receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));
JavaDStream<Integer> isContainingPulsar = msgs.flatMap(new FlatMapFunction<byte[], Integer>() {
@Override
public Iterator<Integer> call(byte[] msg) {
return Arrays.asList(((new String(msg)).indexOf("Pulsar") != -1) ? 1 : 0).iterator();
}
});
JavaDStream<Integer> numOfPulsar = isContainingPulsar.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
numOfPulsar.print();
jssc.start();
jssc.awaitTermination();
}
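The flatMap here emits exactly one Integer per message, so a map is the more natural fit; a minimal equivalent sketch:

// One output per input: map instead of flatMap.
JavaDStream<Integer> isContainingPulsar = msgs.map(msg -> new String(msg).contains("Pulsar") ? 1 : 0);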
Project: rheem
File: FunctionCompiler.java
/**
* Create an appropriate {@link Function} for deploying the given {@link MapPartitionsDescriptor}
* on Apache Spark's {@link JavaRDD#mapPartitions(FlatMapFunction)}.
*
* @param descriptor describes the function
* @param operator that executes the {@link Function}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
* @param operatorContext contains optimization information for the {@code operator}
* @param inputs that feed the {@code operator}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
*/
public <I, O> FlatMapFunction<Iterator<I>, O> compile(MapPartitionsDescriptor<I, O> descriptor,
SparkExecutionOperator operator,
OptimizationContext.OperatorContext operatorContext,
ChannelInstance[] inputs) {
final java.util.function.Function<Iterable<I>, Iterable<O>> javaImplementation = descriptor.getJavaImplementation();
if (javaImplementation instanceof FunctionDescriptor.ExtendedSerializableFunction) {
return new ExtendedMapPartitionsFunctionAdapter<>(
(FunctionDescriptor.ExtendedSerializableFunction<Iterable<I>, Iterable<O>>) javaImplementation,
new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber())
);
} else {
return new MapPartitionsFunctionAdapter<>(javaImplementation);
}
}
Project: rheem
File: FunctionCompiler.java
/**
* Create an appropriate {@link FlatMapFunction} for deploying the given {@link FlatMapDescriptor}
* on Apache Spark.
*
* @param descriptor describes the function
* @param operator that executes the {@link Function}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
* @param operatorContext contains optimization information for the {@code operator}
* @param inputs that feed the {@code operator}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
*/
public <I, O> FlatMapFunction<I, O> compile(FlatMapDescriptor<I, O> descriptor,
SparkExecutionOperator operator,
OptimizationContext.OperatorContext operatorContext,
ChannelInstance[] inputs) {
final java.util.function.Function<I, Iterable<O>> javaImplementation = descriptor.getJavaImplementation();
if (javaImplementation instanceof FunctionDescriptor.ExtendedSerializableFunction) {
return new ExtendedFlatMapFunctionAdapter<>(
(FunctionDescriptor.ExtendedSerializableFunction<I, Iterable<O>>) javaImplementation,
new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber())
);
} else {
return new FlatMapFunctionAdapter<>(javaImplementation);
}
}
Project: envelope
File: MorphlineUtils.java
@SuppressWarnings("serial")
public static FlatMapFunction<Row, Row> morphlineMapper(final String morphlineFile, final String morphlineId,
final StructType outputSchema) {
return new FlatMapFunction<Row, Row>() {
@Override
public Iterator<Row> call(Row row) throws Exception {
// Retrieve the Command pipeline via ThreadLocal
Pipeline pipeline = MorphlineUtils.getPipeline(morphlineFile, morphlineId);
if (null == pipeline) {
pipeline = MorphlineUtils.setPipeline(morphlineFile, morphlineId, new Collector(), true);
}
// Convert each Row into a Record
StructType inputSchema = row.schema();
if (null == inputSchema) {
throw new RuntimeException("Row does not have an associated StructType schema");
}
Record inputRecord = new Record();
String[] fieldNames = inputSchema.fieldNames();
// TODO : Confirm nested object conversion
for (int i = 0; i < fieldNames.length; i++) {
inputRecord.put(fieldNames[i], row.get(i));
}
// Process each Record via the Command pipeline
List<Record> outputRecords = MorphlineUtils.executePipeline(pipeline, inputRecord);
// Convert each Record into a new Row
List<Row> outputRows = Lists.newArrayListWithCapacity(outputRecords.size());
for (Record record : outputRecords) {
outputRows.add(MorphlineUtils.convertToRow(outputSchema, record));
}
return outputRows.iterator();
}
};
}
Project: envelope
File: TestMorphlineUtils.java
@Test
public void morphlineMapper(
final @Mocked MorphlineUtils.Pipeline pipeline,
final @Mocked Row row,
final @Mocked StructType schema
) throws Exception {
new Expectations(MorphlineUtils.class) {{
MorphlineUtils.getPipeline("file", "id"); result = pipeline; times = 1;
MorphlineUtils.executePipeline(pipeline, (Record) any); result = Lists.newArrayList(); times = 1;
row.schema(); result = schema;
row.get(anyInt); returns("val1", "val2"); times = 2;
schema.fieldNames(); result = new String[] { "one", "two"};
}};
FlatMapFunction<Row, Row> function = MorphlineUtils.morphlineMapper("file", "id", schema);
Iterator<Row> results = function.call(row);
assertEquals("Invalid number of Rows returned", 0, Lists.newArrayList(results).size());
new Verifications() {{
Record record;
MorphlineUtils.executePipeline(pipeline, record = withCapture());
assertEquals(2, record.getFields().size());
assertEquals("val1", record.get("one").get(0));
}};
}
Project: envelope
File: TestMorphlineUtils.java
@Test
public void morphlineMapperNoPipeline(
final @Mocked MorphlineUtils.Pipeline pipeline,
final @Mocked Row row,
final @Mocked StructType schema
) throws Exception {
new Expectations(MorphlineUtils.class) {{
MorphlineUtils.getPipeline("file", "id"); result = null; times = 1;
MorphlineUtils.setPipeline("file", "id", (MorphlineUtils.Collector) any, true); result = pipeline; times = 1;
MorphlineUtils.executePipeline(pipeline, (Record) any); result = Lists.newArrayList(); times = 1;
row.schema(); result = schema;
row.get(anyInt); returns("val1", "val2"); times = 2;
schema.fieldNames(); result = new String[] { "one", "two"};
}};
FlatMapFunction<Row, Row> function = MorphlineUtils.morphlineMapper("file", "id", schema);
Iterator<Row> results = function.call(row);
assertEquals("Invalid number of Rows returned", 0, Lists.newArrayList(results).size());
new Verifications() {{
Record record;
MorphlineUtils.executePipeline(pipeline, record = withCapture());
assertEquals(2, record.getFields().size());
assertEquals("val1", record.get("one").get(0));
}};
}
Project: envelope
File: TestMorphlineUtils.java
@Test (expected = RuntimeException.class)
public void morphlineMapperNoSchema(
final @Mocked MorphlineUtils.Pipeline pipeline,
final @Mocked Row row,
final @Mocked StructType schema
) throws Exception {
new Expectations(MorphlineUtils.class) {{
MorphlineUtils.getPipeline("file", "id"); result = pipeline; times = 1;
row.schema(); result = null;
}};
FlatMapFunction<Row, Row> function = MorphlineUtils.morphlineMapper("file", "id", schema);
function.call(row);
}
Project: federator
File: SparkStreaming.java
@Override
public String insert(Entity entity, final Set<Value> values) throws ParseException, Exception {
JavaDStream<Value> cache = lines.flatMap(new FlatMapFunction<String, Value>() {
@Override
public Iterable<Value> call(String x) {
return values;
}
});
cache.persist();
return entity.getId();
}
Project: federator
File: SparkRedisStreaming.java
@Override
public String insert(Entity entity, final Set<Value> values) throws ParseException, Exception {
JavaDStream<Value> cache = lines.flatMap(new FlatMapFunction<String, Value>() {
@Override
public Iterable<Value> call(String x) {
return values;
}
});
cache.persist();
return entity.getId();
}
Project: StreamBench
File: SparkJoinTest.java
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkJoinTest");
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
ssc.checkpoint("checkpoint");
JavaReceiverInputDStream<String> lines = ssc.socketTextStream("127.0.0.1", 9999);
JavaPairDStream<String, Long> nameStream = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String l) throws Exception {
return Arrays.asList(l.split(" "));
}
}).mapToPair(new PairFunction<String, String, Long>() {
public Tuple2<String, Long> call(String w) throws Exception {
return new Tuple2<>(w, 1L);
}
}).window(Durations.seconds(30), Durations.seconds(10));
JavaReceiverInputDStream<String> lines2 = ssc.socketTextStream("127.0.0.1", 9998);
JavaPairDStream<String, Long> nameAgeStream = lines2.mapToPair(new PairFunction<String, String, Long>() {
@Override
public Tuple2<String, Long> call(String s) throws Exception {
String[] list = s.split(" ");
String name = list[0];
long age = 0L;
if (list.length > 1)
age = Long.parseLong(list[1]);
return new Tuple2<String, Long>(name, age);
}
}).window(Durations.seconds(11), Durations.seconds(11));
// nameStream.print();
// nameAgeStream.print();
JavaPairDStream<String, Tuple2<Long, Long>> joinedStream = nameStream.join(nameAgeStream);
joinedStream.print();
ssc.start();
ssc.awaitTermination();
}
Project: gatk-protected
File: HaplotypeCallerSpark.java
/**
* Call variants from Tuples of AssemblyRegion and SimpleInterval.
* The interval should be the non-padded shard boundary for the shard that the corresponding AssemblyRegion was
* created in; it is used to eliminate redundant variant calls at the edge of shard boundaries.
*/
private static FlatMapFunction<Iterator<Tuple2<AssemblyRegion, SimpleInterval>>, VariantContext> callVariantsFromAssemblyRegions(
final AuthHolder authHolder,
final SAMFileHeader header,
final Broadcast<ReferenceMultiSource> referenceBroadcast,
final Broadcast<HaplotypeCallerArgumentCollection> hcArgsBroadcast) {
return regionAndIntervals -> {
//HaplotypeCallerEngine isn't serializable but is expensive to instantiate, so construct and reuse one for every partition
final ReferenceMultiSourceAdapter referenceReader = new ReferenceMultiSourceAdapter(referenceBroadcast.getValue(), authHolder);
final HaplotypeCallerEngine hcEngine = new HaplotypeCallerEngine(hcArgsBroadcast.value(), header, referenceReader);
return iteratorToStream(regionAndIntervals).flatMap(regionToVariants(hcEngine)).iterator();
};
}