Java class org.apache.spark.api.java.function.Function: example source code
Project: vn.vitk
File: NGramBuilder.java
/**
* Creates an n-gram data frame from text lines.
* @param lines
* @return an n-gram data frame.
*/
DataFrame createNGramDataFrame(JavaRDD<String> lines) {
JavaRDD<Row> rows = lines.map(new Function<String, Row>(){
private static final long serialVersionUID = -4332903997027358601L;
@Override
public Row call(String line) throws Exception {
return RowFactory.create(Arrays.asList(line.split("\\s+")));
}
});
StructType schema = new StructType(new StructField[] {
new StructField("words",
DataTypes.createArrayType(DataTypes.StringType), false,
Metadata.empty()) });
DataFrame wordDF = new SQLContext(jsc).createDataFrame(rows, schema);
// build a bigram language model
NGram transformer = new NGram().setInputCol("words")
.setOutputCol("ngrams").setN(2);
DataFrame ngramDF = transformer.transform(wordDF);
ngramDF.show(10, false);
return ngramDF;
}
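With Java 8, the anonymous Function above can be written as a lambda. A minimal sketch, assuming the same imports as the snippet (JavaRDD, Row, RowFactory, Arrays):
// Lambda-based equivalent of the map step in createNGramDataFrame
JavaRDD<Row> wordRows = lines.map(line -> RowFactory.create(Arrays.asList(line.split("\\s+"))));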
Project: vn.vitk
File: DependencyParser.java
/**
* Parses a list of PoS-tagged sentences, one per line, and writes the result to an output
* file in the specified output format.
* @param jsc
* @param sentences
* @param outputFileName
* @param outputFormat
*/
public void parse(JavaSparkContext jsc, List<String> sentences, String outputFileName, OutputFormat outputFormat) {
JavaRDD<String> input = jsc.parallelize(sentences);
JavaRDD<Sentence> sents = input.map(new TaggedLineToSentenceFunction());
JavaRDD<DependencyGraph> graphs = sents.map(new ParsingFunction());
JavaRDD<Row> rows = graphs.map(new Function<DependencyGraph, Row>() {
private static final long serialVersionUID = -812004521983071103L;
public Row call(DependencyGraph graph) {
return RowFactory.create(graph.getSentence().toString(), graph.dependencies());
}
});
StructType schema = new StructType(new StructField[]{
new StructField("sentence", DataTypes.StringType, false, Metadata.empty()),
new StructField("dependency", DataTypes.StringType, false, Metadata.empty())
});
SQLContext sqlContext = new SQLContext(jsc);
DataFrame df = sqlContext.createDataFrame(rows, schema);
if (outputFormat == OutputFormat.TEXT)
df.select("dependency").write().text(outputFileName);
else
df.repartition(1).write().json(outputFileName);
}
Project: Java-Data-Science-Cookbook
File: KMeansClusteringMlib.java
public static void main( String[] args ){
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("K-means Example");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load and parse data
String path = "data/km-data.txt";
JavaRDD<String> data = sc.textFile(path);
JavaRDD<Vector> parsedData = data.map(
new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++)
values[i] = Double.parseDouble(sarray[i]);
return Vectors.dense(values);
}
}
);
parsedData.cache();
// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
}
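A hedged follow-up sketch (not part of the original example): a trained KMeansModel can assign a cluster index to a new point through its predict method; the two-element vector below is only a hypothetical sample, since the real dimensionality depends on km-data.txt.
// Score a new observation with the trained model
Vector newPoint = Vectors.dense(1.0, 2.0); // hypothetical point
int clusterIndex = clusters.predict(newPoint);
System.out.println("New point assigned to cluster " + clusterIndex);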
Project: Spark-Machine-Learning-Modules
File: PredictUnit.java
public static JavaRDD<Tuple2<Object, Object>> predictForOutput_LogisticRegressionModel(LogisticRegressionModel model, JavaRDD<LabeledPoint> data){
JavaRDD<Tuple2<Object, Object>> FeaturesAndPrediction = data.map(
new Function<LabeledPoint, Tuple2<Object, Object>>() {
private static final long serialVersionUID = 1L;
public Tuple2<Object, Object> call(LabeledPoint p) {
Double prediction = model.predict(p.features());
return new Tuple2<Object, Object>(p.features(), prediction);
}
}
);
return FeaturesAndPrediction;
}
Project: fst-bench
File: JavaSleep.java
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Usage: JavaSleep <seconds>");
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaSleep");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
Integer parallel = sparkConf.getInt("spark.default.parallelism", ctx.defaultParallelism());
Integer seconds = Integer.parseInt(args[0]);
Integer[] init_val = new Integer[parallel];
Arrays.fill(init_val, seconds);
JavaRDD<Integer> workload = ctx.parallelize(Arrays.asList(init_val), parallel).map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer s) throws InterruptedException {
Thread.sleep(s * 1000);
return 0;
}
});
List<Integer> output = workload.collect();
ctx.stop();
}
Project: SparkJNI
File: StreamUtils.java
static List<StreamVectors> performJavaStream(String appName, List<StreamVectors> input, int noIters) {
JavaRDD<StreamVectors> streamVectorsJavaRDD = ExampleUtils.getSparkContext(appName).parallelize(input);
for (int i = 0; i < noIters; i++) {
streamVectorsJavaRDD = streamVectorsJavaRDD.map(new Function<StreamVectors, StreamVectors>() {
@Override
public StreamVectors call(StreamVectors streamVectors) throws Exception {
streamVectors.setStartRun(System.nanoTime());
for (int idx = 0; idx < streamVectors.A.length; idx++) {
streamVectors.C[idx] = streamVectors.A[idx];
}
for (int idx = 0; idx < streamVectors.A.length; idx++) {
streamVectors.B[idx] = streamVectors.scaling_constant * streamVectors.C[idx];
}
for (int idx = 0; idx < streamVectors.A.length; idx++) {
streamVectors.C[idx] = streamVectors.A[idx] + streamVectors.B[idx];
}
for (int idx = 0; idx < streamVectors.A.length; idx++) {
streamVectors.A[idx] = streamVectors.B[idx] + streamVectors.scaling_constant * streamVectors.C[idx];
}
streamVectors.setEndRun(System.nanoTime());
return streamVectors;
}
});
}
return streamVectorsJavaRDD.collect();
}
Project: SparkJNI
File: StreamMain.java
private List<StreamVectors> performJavaStream(String appName, List<StreamVectors> input) {
return ExampleUtils.getSparkContext(appName).parallelize(input).map(new Function<StreamVectors, StreamVectors>() {
@Override
public StreamVectors call(StreamVectors streamVectors) throws Exception {
streamVectors.setStartRun(System.nanoTime());
for(int idx = 0; idx < streamVectors.A.length; idx++){
streamVectors.C[idx] = streamVectors.A[idx];
}
for(int idx = 0; idx < streamVectors.A.length; idx++){
streamVectors.B[idx] = streamVectors.scaling_constant * streamVectors.C[idx];
}
for(int idx = 0; idx < streamVectors.A.length; idx++){
streamVectors.C[idx] = streamVectors.A[idx] + streamVectors.B[idx];
}
for(int idx = 0; idx < streamVectors.A.length; idx++){
streamVectors.A[idx] = streamVectors.B[idx] + streamVectors.scaling_constant * streamVectors.C[idx];
}
streamVectors.setEndRun(System.nanoTime());
return streamVectors;
}
}).collect();
}
Project: splice-community-sample-code
File: SaveLogAggRDD.java
@Override
public void call(JavaPairRDD<PublisherGeoKey, AggregationLog> logsRDD) throws Exception {
if (logsRDD != null) {
LOG.info(" Data to process in RDD:" + logsRDD.count());
JavaRDD<AggregationResult> aggResRDD = logsRDD.map(new Function<Tuple2<PublisherGeoKey, AggregationLog>, AggregationResult>() {
@Override
public AggregationResult call(
Tuple2<PublisherGeoKey, AggregationLog> arg0)
throws Exception {
PublisherGeoKey p = arg0._1;
AggregationLog a = arg0._2;
return new AggregationResult(new Timestamp(a.getTimestamp()),
p.getPublisher(), p.getGeo(), a.getImps(),
(int) a.getUniquesHll().estimatedSize(),
a.getSumBids() / a.getImps());
}
});
LOG.info(" Call Data Process Partition");
aggResRDD.foreachPartition(new SaveLogAggPartition());
} else
LOG.error("Data to process:" + 0);
}
Project: Sparkathon
File: PassingFunctions.java
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Big Apple").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
class GetLength implements Function<String, Integer> {
public Integer call(String s) {
return s.length();
}
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) {
return a + b;
}
}
JavaRDD<String> lines = sc.textFile("src/main/resources/compressed.gz");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
// Printing an RDD
lineLengths.foreach(x -> System.out.println(x));
int totalLength = lineLengths.reduce(new Sum());
System.out.println(totalLength);
}
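On Java 8, the two named classes above can be replaced by lambdas or method references. A minimal sketch against the same lines RDD:
JavaRDD<Integer> lineLengths2 = lines.map(String::length); // replaces GetLength
int totalLength2 = lineLengths2.reduce((a, b) -> a + b);   // replaces Sum
System.out.println(totalLength2);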
Project: rheem
File: FunctionCompiler.java
/**
* Create an appropriate {@link Function}-based predicate for deploying the given {@link PredicateDescriptor}
* on Apache Spark.
*
* @param predicateDescriptor describes the function
* @param operator that executes the {@link Function}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
* @param operatorContext contains optimization information for the {@code operator}
* @param inputs that feed the {@code operator}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
*/
public <Type> Function<Type, Boolean> compile(
PredicateDescriptor<Type> predicateDescriptor,
SparkExecutionOperator operator,
OptimizationContext.OperatorContext operatorContext,
ChannelInstance[] inputs) {
final Predicate<Type> javaImplementation = predicateDescriptor.getJavaImplementation();
if (javaImplementation instanceof PredicateDescriptor.ExtendedSerializablePredicate) {
return new ExtendedPredicateAdapater<>(
(PredicateDescriptor.ExtendedSerializablePredicate<Type>) javaImplementation,
new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber())
);
} else {
return new PredicateAdapter<>(javaImplementation);
}
}
Project: vn.vitk
File: Tagger.java
private JavaRDD<String> toTaggedSentence(DataFrame output) {
return output.javaRDD().map(new Function<Row, String>() {
private static final long serialVersionUID = 4208643510231783579L;
@Override
public String call(Row row) throws Exception {
String[] tokens = row.getString(0).trim().split("\\s+");
String[] tags = row.getString(1).trim().split("\\s+");
if (tokens.length != tags.length) {
System.err.println("Incompatible lengths!");
return null;
}
StringBuilder sb = new StringBuilder(64);
for (int j = 0; j < tokens.length; j++) {
sb.append(tokens[j]);
sb.append('/');
sb.append(tags[j]);
sb.append(' ');
}
return sb.toString().trim();
}
});
}
Project: vn.vitk
File: Tokenizer.java
/**
* Counts the number of non-space characters in this data set. This utility method
* is used to check the tokenization result.
* @param lines
* @return number of characters
*/
int numCharacters(JavaRDD<String> lines) {
JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
private static final long serialVersionUID = -2189399343462982586L;
@Override
public Integer call(String line) throws Exception {
line = line.replaceAll("[\\s_]+", "");
return line.length();
}
});
return lengths.reduce(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = -8438072946884289401L;
@Override
public Integer call(Integer e0, Integer e1) throws Exception {
return e0 + e1;
}
});
}
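The same computation reads more compactly with Java 8 lambdas; a sketch assuming the same JavaRDD<String> input:
int numCharactersLambda(JavaRDD<String> lines) {
    // Strip whitespace and underscores, then sum the remaining line lengths
    return lines.map(line -> line.replaceAll("[\\s_]+", "").length())
        .reduce((a, b) -> a + b);
}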
Project: vn.vitk
File: DependencyParser.java
/**
* Parses all sentences in an input file, one per line, and writes the flattened
* dependency tuples to the console.
* @param jsc
* @param inputFileName
*/
public void parse(JavaSparkContext jsc, String inputFileName) {
List<String> sentences = jsc.textFile(inputFileName).collect();
JavaRDD<String> input = jsc.parallelize(sentences);
JavaRDD<Sentence> sents = input.map(new TaggedLineToSentenceFunction());
JavaRDD<DependencyGraph> graphs = sents.map(new ParsingFunction());
JavaRDD<String> rows = graphs.map(new Function<DependencyGraph, String>() {
private static final long serialVersionUID = -6021310762521034121L;
public String call(DependencyGraph graph) {
return graph.dependencies();
}
});
for (String s : rows.collect()) {
System.out.println(s);
}
}
Project: SparkToParquet
File: AppMain.java
public static void main(String[] args) throws IOException {
Flags.setFromCommandLineArgs(THE_OPTIONS, args);
// Initialize the Spark configuration.
SparkConf conf = new SparkConf().setAppName("A SECTONG Application: Apache Log Analysis with Spark");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, Flags.getInstance().getSlideInterval());
SQLContext sqlContext = new SQLContext(sc);
// Initialize parameters
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(Flags.getInstance().getKafka_topic().split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", Flags.getInstance().getKafka_broker());
// Get data from the Kafka stream
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,
StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 5266880065425088203L;
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<ApacheAccessLog> accessLogsDStream = lines.flatMap(line -> {
List<ApacheAccessLog> list = new ArrayList<>();
try {
// Map each line
list.add(ApacheAccessLog.parseFromLogLine(line));
return list;
} catch (RuntimeException e) {
return list;
}
}).cache();
accessLogsDStream.foreachRDD(rdd -> {
// rdd to DataFrame
DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);
// Write to Parquet files
df.write().partitionBy("ipAddress", "method", "responseCode").mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile());
return null;
});
// Start the streaming context
jssc.start(); // start the computation
jssc.awaitTermination(); // wait for termination
}
Project: beam
File: TranslationUtils.java
/** {@link KV} to pair flatmap function. */
public static <K, V> PairFlatMapFunction<Iterator<KV<K, V>>, K, V> toPairFlatMapFunction() {
return new PairFlatMapFunction<Iterator<KV<K, V>>, K, V>() {
@Override
public Iterator<Tuple2<K, V>> call(final Iterator<KV<K, V>> itr) {
final Iterator<Tuple2<K, V>> outputItr =
Iterators.transform(
itr,
new com.google.common.base.Function<KV<K, V>, Tuple2<K, V>>() {
@Override
public Tuple2<K, V> apply(KV<K, V> kv) {
return new Tuple2<>(kv.getKey(), kv.getValue());
}
});
return outputItr;
}
};
}
Project: beam
File: TranslationUtils.java
/** A pair to {@link KV} flatmap function. */
static <K, V> FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>> fromPairFlatMapFunction() {
return new FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>>() {
@Override
public Iterator<KV<K, V>> call(Iterator<Tuple2<K, V>> itr) {
final Iterator<KV<K, V>> outputItr =
Iterators.transform(
itr,
new com.google.common.base.Function<Tuple2<K, V>, KV<K, V>>() {
@Override
public KV<K, V> apply(Tuple2<K, V> t2) {
return KV.of(t2._1(), t2._2());
}
});
return outputItr;
}
};
}
Project: beam
File: TranslationUtils.java
/**
* A utility method that adapts {@link PairFunction} to a {@link PairFlatMapFunction} with an
* {@link Iterator} input. This is particularly useful because it allows functions written for
* mapToPair to be reused in flatMapToPair.
*
* @param pairFunction the {@link PairFunction} to adapt.
* @param <T> the input type.
* @param <K> the output key type.
* @param <V> the output value type.
* @return a {@link PairFlatMapFunction} that accepts an {@link Iterator} as an input and applies
* the {@link PairFunction} on every element.
*/
public static <T, K, V> PairFlatMapFunction<Iterator<T>, K, V> pairFunctionToPairFlatMapFunction(
final PairFunction<T, K, V> pairFunction) {
return new PairFlatMapFunction<Iterator<T>, K, V>() {
@Override
public Iterator<Tuple2<K, V>> call(Iterator<T> itr) throws Exception {
final Iterator<Tuple2<K, V>> outputItr =
Iterators.transform(
itr,
new com.google.common.base.Function<T, Tuple2<K, V>>() {
@Override
public Tuple2<K, V> apply(T t) {
try {
return pairFunction.call(t);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
return outputItr;
}
};
}
Project: beam
File: TranslationUtils.java
/**
* A utility method that adapts {@link Function} to a {@link FlatMapFunction} with an {@link
* Iterator} input. This is particularly useful because it allows functions written for map to be
* reused in flatMap.
*
* @param func the {@link Function} to adapt.
* @param <InputT> the input type.
* @param <OutputT> the output type.
* @return a {@link FlatMapFunction} that accepts an {@link Iterator} as an input and applies the
* {@link Function} on every element.
*/
public static <InputT, OutputT>
FlatMapFunction<Iterator<InputT>, OutputT> functionToFlatMapFunction(
final Function<InputT, OutputT> func) {
return new FlatMapFunction<Iterator<InputT>, OutputT>() {
@Override
public Iterator<OutputT> call(Iterator<InputT> itr) throws Exception {
final Iterator<OutputT> outputItr =
Iterators.transform(
itr,
new com.google.common.base.Function<InputT, OutputT>() {
@Override
public OutputT apply(InputT t) {
try {
return func.call(t);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
return outputItr;
}
};
}
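A hypothetical usage sketch (not from the Beam sources): the adapter lets a per-element Function run through JavaRDD.mapPartitions, which in Spark 2.x expects an Iterator-returning FlatMapFunction; someStringRdd and parseRecord are assumed names.
// Adapt a per-element Function so it can be applied partition by partition
Function<String, Integer> parseRecord = s -> s.length();
JavaRDD<Integer> lengths = someStringRdd.mapPartitions(
    TranslationUtils.functionToFlatMapFunction(parseRecord));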
Project: beam
File: CoderHelpers.java
/**
* A function wrapper for converting a byte array pair to a key-value pair, where
* values are {@link Iterable}.
*
* @param keyCoder Coder to deserialize keys.
* @param valueCoder Coder to deserialize values.
* @param <K> The type of the key being deserialized.
* @param <V> The type of the value being deserialized.
* @return A function that accepts a pair of byte arrays and returns a key-value pair.
*/
public static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>
fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) {
return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() {
@Override
public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) {
return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() {
@Override
public V apply(byte[] bytes) {
return fromByteArray(bytes, valueCoder);
}
}));
}
};
}
Project: net.jgp.labs.spark
File: RowProcessor.java
@Override
public void call(JavaRDD<String> rdd) throws Exception {
JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
private static final long serialVersionUID = 5167089361335095997L;
@Override
public Row call(String msg) {
Row row = RowFactory.create(msg);
return row;
}
});
// Create Schema
StructType schema = DataTypes.createStructType(
new StructField[] { DataTypes.createStructField("Message", DataTypes.StringType, true) });
// Get Spark 2.0 session
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);
msgDataFrame.show();
}
Project: es-hadoop-v2.2.0
File: AbstractJavaEsSparkTest.java
public void testEsRDDZReadJson() throws Exception {
String target = "spark-test/java-basic-json-read";
RestUtils.touch("spark-test");
RestUtils.postData(target, "{\"message\" : \"Hello World\",\"message_date\" : \"2014-05-25\"}".getBytes());
RestUtils.postData(target, "{\"message\" : \"Goodbye World\",\"message_date\" : \"2014-05-25\"}".getBytes());
RestUtils.refresh("spark-test");
JavaRDD<String> esRDD = JavaEsSpark.esJsonRDD(sc, target).values();
System.out.println(esRDD.collect());
JavaRDD<String> messages = esRDD.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String string) throws Exception {
return string.contains("message");
}
});
// jdk8 lambda equivalent:
// esRDD.filter(m -> m.contains("message"));
assertThat((int) messages.count(), is(2));
System.out.println(messages.take(10));
System.out.println(messages);
}
Project: spark-transformers
File: IfZeroVectorBridgeTest.java
public DataFrame createDF(JavaRDD<Tuple2<Vector, String>> rdd) {
// Generate the schema from a list of fields
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("vectorized_count", new VectorUDT(), true));
fields.add(DataTypes.createStructField("product_title", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD to Rows.
JavaRDD<Row> rowRDD = rdd.map(
new Function<Tuple2<Vector, String>, Row>() {
public Row call(Tuple2<Vector, String> record) {
return RowFactory.create(record._1(), record._2());
}
});
return sqlContext.createDataFrame(rowRDD, schema);
}
Project: spark_log_data
File: LogDataWebinar.java
private static JavaDStream<String> createDStream(JavaStreamingContext javaStreamingContext, String hostName, int port) {
JavaReceiverInputDStream<SparkFlumeEvent> flumeEventStream = FlumeUtils.createStream(javaStreamingContext, hostName, port);
// Set different storage level
// flumeEventStream.persist(StorageLevel.MEMORY_AND_DISK_SER());
JavaDStream<String> dStream = flumeEventStream.map(new Function<SparkFlumeEvent, String>() {
@Override
public String call(SparkFlumeEvent sparkFlumeEvent) throws Exception {
byte[] bodyArray = sparkFlumeEvent.event().getBody().array();
String logTxt = new String(bodyArray, "UTF-8");
logger.info(logTxt);
return logTxt;
}
});
// dStream.print();
return dStream;
}
Project: Spark-Course
File: SparkApp.java
public void accessTableWitRDD(JavaSparkContext sc){
JavaRDD<String> cassandraRDD = javaFunctions(sc).cassandraTable("todolist", "todolisttable")
.map(new Function<CassandraRow, String>() {
@Override
public String call(CassandraRow cassandraRow) throws Exception {
return cassandraRow.toString();
}
});
System.out.println("\nReading Data from todolisttable in Cassandra with a RDD: \n" + StringUtils.join(cassandraRDD.toArray(), "\n"));
// javaFunctions(cassandraRDD).writerBuilder("todolist", "todolisttable", mapToRow(String.class)).saveToCassandra();
}
Project: chronix.spark
File: ChronixRDD.java
/**
* Transformation: Joins the time series according to their identity.
*
* @return joined time series
*/
public ChronixRDD joinChunks() {
JavaPairRDD<MetricTimeSeriesKey, Iterable<MetricTimeSeries>> groupRdd
= this.groupBy(MetricTimeSeriesKey::new);
JavaPairRDD<MetricTimeSeriesKey, MetricTimeSeries> joinedRdd
= groupRdd.mapValues((Function<Iterable<MetricTimeSeries>, MetricTimeSeries>) mtsIt -> {
MetricTimeSeriesOrdering ordering = new MetricTimeSeriesOrdering();
List<MetricTimeSeries> orderedChunks = ordering.immutableSortedCopy(mtsIt);
MetricTimeSeries result = null;
for (MetricTimeSeries mts : orderedChunks) {
if (result == null) {
result = new MetricTimeSeries
.Builder(mts.getMetric())
.attributes(mts.attributes()).build();
}
result.addAll(mts.getTimestampsAsArray(), mts.getValuesAsArray());
}
return result;
});
JavaRDD<MetricTimeSeries> resultJavaRdd =
joinedRdd.map((Tuple2<MetricTimeSeriesKey, MetricTimeSeries> mtTuple) -> mtTuple._2);
return new ChronixRDD(resultJavaRdd);
}
Project: kite-apps
File: KafkaOutput.java
/**
* Writes the content of the stream to the Kafka topic
* behind this producer.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="SE_INNER_CLASS", justification="Uses state from outer class.")
public void write (JavaDStream<T> stream) {
stream.foreachRDD(new Function<JavaRDD<T>, Void>() {
@Override
public Void call(JavaRDD<T> rdd) throws Exception {
write(rdd);
return null;
}
});
}
Project: learning-spark
File: LineCountWithFiltering.java
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Line Count With Filtering");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
// Read the source file
JavaRDD<String> input = sparkContext.textFile(args[0]);
// RDD is immutable, let's create a new RDD which doesn't contain empty lines
// the function needs to return true for the records to be kept
JavaRDD<String> nonEmptyLines = input.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
if(s == null || s.trim().length() < 1) {
return false;
}
return true;
}
});
long count = nonEmptyLines.count();
System.out.println(String.format("Total lines in %s is %d",args[0],count));
}
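The same filter can be written as a Java 8 lambda; a minimal sketch against the same input RDD:
JavaRDD<String> nonEmptyLinesLambda = input.filter(s -> s != null && !s.trim().isEmpty());
System.out.println("Non-empty lines: " + nonEmptyLinesLambda.count());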
Project: spark-utils
File: SparkAvroLoaderTest.java
private void assertMapFunction(Function<Tuple2<AvroKey<Country>, NullWritable>, Country> function) throws Exception {
Country country = Country.newBuilder()
.setId(3)
.setIso("PL")
.setName("Poland")
.build();
AvroKey<Country> avroKey = new AvroKey<Country>(country);
Tuple2<AvroKey<Country>, NullWritable> pair = new Tuple2<>(avroKey, NullWritable.get());
Country retCountry = function.call(pair);
assertEquals(Integer.valueOf(3), retCountry.getId());
assertEquals("PL", retCountry.getIso());
assertEquals("Poland", retCountry.getName());
}
Project: near-image-replica-detection
File: MemoryPersistenceSystem.java
@Override
public JavaPairDStream<ImageFeature, ImageFeature> queryFeaturesStreaming(
ReplicaConnection conn, IndexingParams params, JavaPairDStream<ImageInfo, ImageFeature> sketches) {
final ReplicaConnection connF = conn;
final IndexingParams paramsF = params;
return sketches.transformToPair(new Function<JavaPairRDD<ImageInfo, ImageFeature>, JavaPairRDD<ImageFeature, ImageFeature>>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public JavaPairRDD<ImageFeature, ImageFeature> call(JavaPairRDD<ImageInfo, ImageFeature> v1) throws Exception {
return queryFeatures(connF, paramsF, v1);
}
});
}
Project: spark-dataflow
File: TransformTranslator.java
private static <T> TransformEvaluator<AvroIO.Read.Bound<T>> readAvro() {
return new TransformEvaluator<AvroIO.Read.Bound<T>>() {
@Override
public void evaluate(AvroIO.Read.Bound<T> transform, EvaluationContext context) {
String pattern = transform.getFilepattern();
JavaSparkContext jsc = context.getSparkContext();
@SuppressWarnings("unchecked")
JavaRDD<AvroKey<T>> avroFile = (JavaRDD<AvroKey<T>>) (JavaRDD<?>)
jsc.newAPIHadoopFile(pattern,
AvroKeyInputFormat.class,
AvroKey.class, NullWritable.class,
new Configuration()).keys();
JavaRDD<WindowedValue<T>> rdd = avroFile.map(
new Function<AvroKey<T>, T>() {
@Override
public T call(AvroKey<T> key) {
return key.datum();
}
}).map(WindowingHelpers.<T>windowFunction());
context.setOutputRDD(transform, rdd);
}
};
}
Project: spark-dataflow
File: TransformTranslator.java
private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() {
return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>() {
@Override
public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) {
String pattern = transform.getFilepattern();
JavaSparkContext jsc = context.getSparkContext();
@SuppressWarnings ("unchecked")
JavaPairRDD<K, V> file = jsc.newAPIHadoopFile(pattern,
transform.getFormatClass(),
transform.getKeyClass(), transform.getValueClass(),
new Configuration());
JavaRDD<WindowedValue<KV<K, V>>> rdd =
file.map(new Function<Tuple2<K, V>, KV<K, V>>() {
@Override
public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
return KV.of(t2._1(), t2._2());
}
}).map(WindowingHelpers.<KV<K, V>>windowFunction());
context.setOutputRDD(transform, rdd);
}
};
}
Project: spark-dataflow
File: StreamingTransformTranslator.java
private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
@Override
public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
JavaStreamingContext jssc = sec.getStreamingContext();
Class<K> keyClazz = transform.getKeyClass();
Class<V> valueClazz = transform.getValueClass();
Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass();
Map<String, String> kafkaParams = transform.getKafkaParams();
Set<String> topics = transform.getTopics();
JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz,
valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics);
JavaDStream<WindowedValue<KV<K, V>>> inputStream =
inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() {
@Override
public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
return KV.of(t2._1(), t2._2());
}
}).map(WindowingHelpers.<KV<K, V>>windowFunction());
sec.setStream(transform, inputStream);
}
};
}
Project: spark-dataflow
File: CoderHelpers.java
/**
* A function wrapper for converting a byte array pair to a key-value pair, where
* values are {@link Iterable}.
*
* @param keyCoder Coder to deserialize keys.
* @param valueCoder Coder to deserialize values.
* @param <K> The type of the key being deserialized.
* @param <V> The type of the value being deserialized.
* @return A function that accepts a pair of byte arrays and returns a key-value pair.
*/
static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>
fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) {
return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() {
@Override
public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) {
return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() {
@Override
public V apply(byte[] bytes) {
return fromByteArray(bytes, valueCoder);
}
}));
}
};
}
Project: predictionio-template-java-ecom-recommender
File: Algorithm.java
private List<ItemScore> similarItems(final List<double[]> recentProductFeatures, Model model, Query query) {
JavaRDD<ItemScore> itemScores = model.getIndexItemFeatures().map(new Function<Tuple2<Integer, Tuple2<String, double[]>>, ItemScore>() {
@Override
public ItemScore call(Tuple2<Integer, Tuple2<String, double[]>> element) throws Exception {
double similarity = 0.0;
for (double[] recentFeature : recentProductFeatures) {
similarity += cosineSimilarity(element._2()._2(), recentFeature);
}
return new ItemScore(element._2()._1(), similarity);
}
});
itemScores = validScores(itemScores, query.getWhitelist(), query.getBlacklist(), query.getCategories(), model.getItems(), query.getUserEntityId());
return sortAndTake(itemScores, query.getNumber());
}
Project: predictionio-template-java-ecom-recommender
File: Algorithm.java
private JavaRDD<ItemScore> validScores(JavaRDD<ItemScore> all, final Set<String> whitelist, final Set<String> blacklist, final Set<String> categories, final Map<String, Item> items, String userEntityId) {
final Set<String> seenItemEntityIds = seenItemEntityIds(userEntityId);
final Set<String> unavailableItemEntityIds = unavailableItemEntityIds();
return all.filter(new Function<ItemScore, Boolean>() {
@Override
public Boolean call(ItemScore itemScore) throws Exception {
Item item = items.get(itemScore.getItemEntityId());
return (item != null
&& passWhitelistCriteria(whitelist, item.getEntityId())
&& passBlacklistCriteria(blacklist, item.getEntityId())
&& passCategoryCriteria(categories, item)
&& passUnseenCriteria(seenItemEntityIds, item.getEntityId())
&& passAvailabilityCriteria(unavailableItemEntityIds, item.getEntityId()));
}
});
}
Project: streamsx.sparkMLLib
File: JavaTrainingApplication.java
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkStreamsSampleTrainingApplication");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("data/random_2d_training.csv");
JavaRDD<Vector> parsedData = lines.map(
new Function<String, Vector>() {
@Override
public Vector call(String s) {
String[] sarray = s.split(",");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
}
}
);
parsedData.cache();
int numClusters = 10;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
clusters.save(jsc.sc(), "etc/kmeans_model");
jsc.close();
}
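A hedged follow-up sketch (not in the original): the saved model can be reloaded for scoring in a later run with KMeansModel.load; jsc here stands for whatever live JavaSparkContext that run creates, and the two-dimensional test point is an assumption based on the training file name.
KMeansModel reloaded = KMeansModel.load(jsc.sc(), "etc/kmeans_model");
int cluster = reloaded.predict(Vectors.dense(0.5, 0.5)); // hypothetical 2-D point
System.out.println("Assigned cluster: " + cluster);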
Project: SparkApps
File: Main.java
public static void main(String[] args) {
DbConnection dbConnection = new DbConnection(MYSQL_DRIVER, MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD);
// Load data from MySQL
JdbcRDD<Object[]> jdbcRDD =
new JdbcRDD<>(sc.sc(), dbConnection, "select * from employees where emp_no >= ? and emp_no <= ?", 10001,
499999, 10, new MapResult(), ClassManifestFactory$.MODULE$.fromClass(Object[].class));
// Convert to JavaRDD
JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(Object[].class));
// Join first name and last name
List<String> employeeFullNameList = javaRDD.map(new Function<Object[], String>() {
@Override
public String call(final Object[] record) throws Exception {
return record[2] + " " + record[3];
}
}).collect();
for (String fullName : employeeFullNameList) {
LOGGER.info(fullName);
}
}
Project: kylin
File: SparkCubingByLayer.java
private Long getRDDCountSum(JavaPairRDD<ByteArray, Object[]> rdd, final int countMeasureIndex) {
final ByteArray ONE = new ByteArray();
Long count = rdd.mapValues(new Function<Object[], Long>() {
@Override
public Long call(Object[] objects) throws Exception {
return (Long) objects[countMeasureIndex];
}
}).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
@Override
public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22)
throws Exception {
return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2());
}
})._2();
return count;
}
Project: kylin
File: IteratorUtilsTest.java
private static ArrayList<Tuple2<Integer, Integer>> getResult(List<Tuple2<Integer, Integer>> list) {
return Lists.newArrayList(IteratorUtils.merge(list.iterator(), new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1 - o2;
}
}, new Function<Iterable<Integer>, Integer>() {
@Override
public Integer call(Iterable<Integer> v1) throws Exception {
int sum = 0;
for (Integer integer : v1) {
sum += integer;
}
return sum;
}
}));
}
Project: iis
File: DocumentOrganizationCombinerTest.java
private void assertFilterDocumentProjectFunction(Function<Tuple2<String, AffMatchDocumentProject>, Boolean> function,
Float confidenceLevelThreshold) throws Exception {
if (confidenceLevelThreshold != null) {
// given
float greaterConfidenceLevel = confidenceLevelThreshold+0.1f;
// execute & assert
assertTrue(function.call(new Tuple2<String, AffMatchDocumentProject>(projectId, new AffMatchDocumentProject(documentId, projectId, greaterConfidenceLevel))));
// given
float smallerConfidenceLevel = confidenceLevelThreshold-0.1f;
// execute & assert
assertFalse(function.call(new Tuple2<String, AffMatchDocumentProject>(projectId, new AffMatchDocumentProject(documentId, projectId, smallerConfidenceLevel))));
} else {
// execute & assert
assertTrue(function.call(new Tuple2<String, AffMatchDocumentProject>(projectId, new AffMatchDocumentProject(documentId, projectId, 0.0001f))));
}
}