Java class org.apache.spark.api.java.function.MapFunction example source code
Project: bunsen
File: Loinc.java
/**
* Reads the LOINC multiaxial hierarchy file and converts it to a {@link HierarchicalElement}
* dataset.
*
* @param spark the Spark session
* @param loincHierarchyPath path to the multiaxial hierarchy CSV
* @return a dataset of {@link HierarchicalElement} representing the hierarchical relationship.
*/
public static Dataset<HierarchicalElement> readMultiaxialHierarchyFile(SparkSession spark,
String loincHierarchyPath) {
return spark.read()
.option("header", true)
.csv(loincHierarchyPath)
.select(col("IMMEDIATE_PARENT"), col("CODE"))
.where(col("IMMEDIATE_PARENT").isNotNull()
.and(col("IMMEDIATE_PARENT").notEqual(lit(""))))
.where(col("CODE").isNotNull()
.and(col("CODE").notEqual(lit(""))))
.map((MapFunction<Row, HierarchicalElement>) row -> {
HierarchicalElement element = new HierarchicalElement();
element.setAncestorSystem(LOINC_CODE_SYSTEM_URI);
element.setAncestorValue(row.getString(0));
element.setDescendantSystem(LOINC_CODE_SYSTEM_URI);
element.setDescendantValue(row.getString(1));
return element;
}, Hierarchies.getHierarchicalElementEncoder());
}
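A minimal usage sketch of the method above; the session setup and CSV path are hypothetical placeholders, not part of the bunsen source:
// Hypothetical usage: load the LOINC multiaxial hierarchy and count its edges.
SparkSession spark = SparkSession.builder()
    .appName("loinc-hierarchy-example")
    .getOrCreate();
Dataset<HierarchicalElement> hierarchy =
    Loinc.readMultiaxialHierarchyFile(spark, "/path/to/loinc_multiaxial_hierarchy.csv");
System.out.println("Hierarchy entries: " + hierarchy.count());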
Project: bunsen
File: ConceptMaps.java
/**
* Returns a new ConceptMaps instance that includes the given maps.
*
* @param conceptMaps concept maps to add to the returned collection.
* @return a new ConceptMaps instance with the values added.
*/
public ConceptMaps withConceptMaps(Dataset<ConceptMap> conceptMaps) {
Dataset<UrlAndVersion> newMembers = getUrlAndVersions(conceptMaps);
if (hasDuplicateUrlAndVersions(newMembers) || conceptMaps.count() != newMembers.count()) {
throw new IllegalArgumentException(
"Cannot add concept maps having duplicate conceptMapUri and conceptMapVersion");
}
// Remove the concept contents for persistence. This is most easily done in the ConceptMap
// object by setting the group to an empty list.
Dataset<ConceptMap> withoutConcepts = conceptMaps
.map((MapFunction<ConceptMap, ConceptMap>) conceptMap -> {
// Remove the elements rather than the groups to preserve the
// "unmapped" structure in a group that can refer to other
// concept maps.
ConceptMap withoutElements = conceptMap.copy();
List<ConceptMapGroupComponent> updatedGroups = new ArrayList<>();
for (ConceptMapGroupComponent group: withoutElements.getGroup()) {
group.setElement(new ArrayList<>());
updatedGroups.add(group);
}
withoutElements.setGroup(updatedGroups);
return withoutElements;
}, CONCEPT_MAP_ENCODER);
Dataset<Mapping> newMappings = conceptMaps.flatMap(ConceptMaps::expandMappingsIterator,
MAPPING_ENCODER);
return withConceptMaps(withoutConcepts, newMappings);
}
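Note the explicit (MapFunction<ConceptMap, ConceptMap>) cast: Dataset.map is overloaded for both Scala functions and Java MapFunction, so a bare lambda is ambiguous. A self-contained sketch of the same map-with-encoder pattern, with illustrative data (assumes a SparkSession named spark plus java.util.Arrays and org.apache.spark.sql.Encoders imports):
// Illustrative only: mapping a Dataset with a Java lambda requires the
// MapFunction cast plus an explicit Encoder for the result type.
Dataset<String> names = spark.createDataset(
    Arrays.asList("alice", "bob"), Encoders.STRING());
Dataset<String> upper = names.map(
    (MapFunction<String, String>) s -> s.toUpperCase(), Encoders.STRING());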
Project: gatk
File: SparkSharder.java
/**
* Join an RDD of locatables with a set of intervals, and apply a function to process the locatables that overlap each interval.
* @param ctx the Spark Context
* @param locatables the locatables RDD, must be coordinate sorted
* @param locatableClass the class of the locatables, must be a subclass of {@link Locatable}
* @param sequenceDictionary the sequence dictionary to use to find contig lengths
* @param intervals the collection of intervals to apply the function to
* @param maxLocatableLength the maximum length of a {@link Locatable}; if any is larger than this size, an exception will be thrown
* @param f the function to process intervals and overlapping locatables with
* @param <L> the {@link Locatable} type
* @param <I> the interval type
* @param <T> the return type of <code>f</code>
* @return an RDD of the results of applying <code>f</code> to each interval and its overlapping locatables
*/
private static <L extends Locatable, I extends Locatable, T> JavaRDD<T> joinOverlapping(JavaSparkContext ctx, JavaRDD<L> locatables, Class<L> locatableClass,
SAMSequenceDictionary sequenceDictionary, List<I> intervals,
int maxLocatableLength, MapFunction<Tuple2<I, Iterable<L>>, T> f) {
return joinOverlapping(ctx, locatables, locatableClass, sequenceDictionary, intervals, maxLocatableLength,
(FlatMapFunction2<Iterator<L>, Iterator<I>, T>) (locatablesIterator, shardsIterator) ->
    Iterators.transform(
        locatablesPerShard(locatablesIterator, shardsIterator, sequenceDictionary, maxLocatableLength),
        new Function<Tuple2<I, Iterable<L>>, T>() {
@Nullable
@Override
public T apply(@Nullable Tuple2<I, Iterable<L>> input) {
try {
return f.call(input);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}));
}
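A hedged sketch of a call site for this method; the GATKRead and SimpleInterval types, the Guava Iterables helper, and the counting logic are assumptions for illustration, not taken from SparkSharder itself:
// Hypothetical call: count the reads overlapping each interval.
JavaRDD<Integer> countsPerInterval = joinOverlapping(ctx, reads, GATKRead.class,
    sequenceDictionary, intervals, maxReadLength,
    (MapFunction<Tuple2<SimpleInterval, Iterable<GATKRead>>, Integer>)
        pair -> Iterables.size(pair._2()));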
Project: bunsen
File: Snomed.java
/**
* Reads a Snomed relationship file and converts it to a {@link HierarchicalElement} dataset.
*
* @param spark the Spark session
* @param snomedRelationshipPath path to the SNOMED relationship file
* @return a dataset of {@link HierarchicalElement} representing the hierarchical relationship.
*/
public static Dataset<HierarchicalElement> readRelationshipFile(SparkSession spark,
String snomedRelationshipPath) {
return spark.read()
.option("header", true)
.option("delimiter", "\t")
.csv(snomedRelationshipPath)
.where(col("typeId").equalTo(lit(SNOMED_ISA_RELATIONSHIP_ID)))
.where(col("active").equalTo(lit("1")))
.select(col("destinationId"), col("sourceId"))
.where(col("destinationId").isNotNull()
.and(col("destinationId").notEqual(lit(""))))
.where(col("sourceId").isNotNull()
.and(col("sourceId").notEqual(lit(""))))
.map((MapFunction<Row, HierarchicalElement>) row -> {
HierarchicalElement element = new HierarchicalElement();
element.setAncestorSystem(SNOMED_CODE_SYSTEM_URI);
element.setAncestorValue(row.getString(0));
element.setDescendantSystem(SNOMED_CODE_SYSTEM_URI);
element.setDescendantValue(row.getString(1));
return element;
}, Hierarchies.getHierarchicalElementEncoder());
}
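A hypothetical usage sketch (the relationship-file path is a placeholder):
// Hypothetical usage: load SNOMED is-a relationships and preview a few rows.
Dataset<HierarchicalElement> relationships =
    Snomed.readRelationshipFile(spark, "/path/to/sct2_Relationship_Snapshot.txt");
relationships.show(5);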
Project: video-stream-analytics
File: VideoStreamProcessor.java
public static void main(String[] args) throws Exception {
//Read properties
Properties prop = PropertyFileReader.readPropertyFile();
//SparkSession
SparkSession spark = SparkSession
.builder()
.appName("VideoStreamProcessor")
.master(prop.getProperty("spark.master.url"))
.getOrCreate();
//directory to save image files with motion detected
final String processedImageDir = prop.getProperty("processed.output.dir");
logger.warn("Output directory for saving processed images is set to "+processedImageDir+". This is configured in processed.output.dir key of property file.");
//create schema for json message
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("cameraId", DataTypes.StringType, true),
DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
DataTypes.createStructField("rows", DataTypes.IntegerType, true),
DataTypes.createStructField("cols", DataTypes.IntegerType, true),
DataTypes.createStructField("type", DataTypes.IntegerType, true),
DataTypes.createStructField("data", DataTypes.StringType, true)
});
//Create a Dataset from Kafka stream messages
Dataset<VideoEventData> ds = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
.option("subscribe", prop.getProperty("kafka.topic"))
.option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
.option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
.load()
.selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"), schema).as("json"))
.select("json.*")
.as(Encoders.bean(VideoEventData.class));
//key-value pair of cameraId-VideoEventData
KeyValueGroupedDataset<String, VideoEventData> kvDataset = ds.groupByKey(new MapFunction<VideoEventData, String>() {
@Override
public String call(VideoEventData value) throws Exception {
return value.getCameraId();
}
}, Encoders.STRING());
//process
Dataset<VideoEventData> processedDataset = kvDataset.mapGroupsWithState(new MapGroupsWithStateFunction<String, VideoEventData, VideoEventData, VideoEventData>() {
@Override
public VideoEventData call(String key, Iterator<VideoEventData> values, GroupState<VideoEventData> state) throws Exception {
logger.warn("CameraId="+key+" PartitionId="+TaskContext.getPartitionId());
VideoEventData existing = null;
//check previous state
if (state.exists()) {
existing = state.get();
}
//detect motion
VideoEventData processed = VideoMotionDetector.detectMotion(key, values, processedImageDir, existing);
//update last processed
if (processed != null) {
state.update(processed);
}
return processed;
}}, Encoders.bean(VideoEventData.class), Encoders.bean(VideoEventData.class));
//start
StreamingQuery query = processedDataset.writeStream()
.outputMode("update")
.format("console")
.start();
//await
query.awaitTermination();
}
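Since Java 8, the anonymous MapFunction in the groupByKey call above can be collapsed into a method reference; a sketch of the equivalent grouping:
// Equivalent grouping with a method reference; the MapFunction cast
// disambiguates between the Scala and Java groupByKey overloads.
KeyValueGroupedDataset<String, VideoEventData> kvDataset = ds.groupByKey(
    (MapFunction<VideoEventData, String>) VideoEventData::getCameraId,
    Encoders.STRING());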
Project: bunsen
File: ValueSets.java
/**
* Returns a new ValueSets instance that includes the given value sets.
*
* @param valueSets the value sets to add to the returned collection.
* @return a new ValueSets instance with the added value sets.
*/
public ValueSets withValueSets(Dataset<ValueSet> valueSets) {
Dataset<UrlAndVersion> newMembers = getUrlAndVersions(valueSets);
// Ensure that there are no duplicates among the value sets
if (hasDuplicateUrlAndVersions(newMembers) || valueSets.count() != newMembers.count()) {
throw new IllegalArgumentException(
"Cannot add value sets having duplicate valueSetUri and valueSetVersion");
}
// The value set concepts will be stored in the values table for persistence, so we remove
// them from the individual value sets. This can be done most easily by setting concepts to an
// empty list.
Dataset<ValueSet> withoutConcepts = valueSets.map((MapFunction<ValueSet, ValueSet>) valueSet -> {
ValueSet valueSetWithoutConcepts = valueSet.copy();
List<ConceptSetComponent> updatedInclusions = new ArrayList<>();
for (ConceptSetComponent inclusion: valueSet.getCompose().getInclude()) {
ConceptSetComponent inclusionWithoutConcepts = inclusion.copy();
inclusionWithoutConcepts.setConcept(new ArrayList<>());
updatedInclusions.add(inclusionWithoutConcepts);
}
valueSetWithoutConcepts.getCompose().setInclude(updatedInclusions);
return valueSetWithoutConcepts;
}, VALUE_SET_ENCODER);
Dataset<Value> newValues = valueSets.flatMap(ValueSets::expandValuesIterator, VALUE_ENCODER);
return withValueSets(withoutConcepts, newValues);
}
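A brief, hypothetical call site (current and parsedValueSets are illustrative names, not from the bunsen source):
// Merging newly parsed value sets into an existing collection; duplicate
// valueSetUri/valueSetVersion pairs trigger the IllegalArgumentException above.
ValueSets updated = current.withValueSets(parsedValueSets);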