Java class org.apache.spark.api.java.function.VoidFunction example source code
Project: incubator-sdap-mudrod
File: SessionGenerator.java
public void combineShortSessionsInParallel(int timeThres) throws InterruptedException, IOException {
JavaRDD<String> userRDD = getUserRDD(this.cleanupType);
userRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void call(Iterator<String> arg0) throws Exception {
ESDriver tmpES = new ESDriver(props);
tmpES.createBulkProcessor();
while (arg0.hasNext()) {
String s = arg0.next();
combineShortSessions(tmpES, s, timeThres);
}
tmpES.destroyBulkProcessor();
tmpES.close();
}
});
}
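Because VoidFunction declares a single abstract call method, the partition handler above can also be written as a lambda on Java 8 and later. A minimal sketch, assuming props, timeThres, ESDriver and combineShortSessions(...) are in scope exactly as in the method above:
userRDD.foreachPartition(iter -> {
    // Same logic as the anonymous VoidFunction<Iterator<String>> above, expressed as a lambda.
    ESDriver tmpES = new ESDriver(props);
    tmpES.createBulkProcessor();
    while (iter.hasNext()) {
        combineShortSessions(tmpES, iter.next(), timeThres);
    }
    tmpES.destroyBulkProcessor();
    tmpES.close();
});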
Project: gcp
File: BigQueryHelper.java
public static <X> VoidFunction<JavaPairRDD<X, JsonObject>> outputTo(String table, String schema) throws IOException {
Configuration conf = new Configuration();
conf.set("mapreduce.job.outputformat.class", BigQueryOutputFormat.class.getName());
BigQueryConfiguration.configureBigQueryOutput(conf, table, schema);
return rdd -> {
if (rdd.count() > 0L) {
long time = System.currentTimeMillis();
/* This was only required the first time on a fresh table; it seems I had to kickstart the _PARTITIONTIME pseudo-column,
* but now rows are automatically added to the proper partition using ingestion time. Using the decorator would only be required
* if we were to place the entries using their "event timestamp", e.g. loading rows into old partitions.
* Implementing that would be much harder though, since we'd have to check each message, or each "partition" (date-based).
if (partitioned) {
String today = ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
BigQueryConfiguration.configureBigQueryOutput(conf, table + "$" + today, schema);
}*/
rdd.saveAsNewAPIHadoopDataset(conf);
System.out.printf("Sent %d rows to BQ in %.1fs\n", rdd.count(), (System.currentTimeMillis() - time) / 1000f);
}
};
}
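The VoidFunction returned by outputTo is meant to be handed to foreachRDD on a pair stream whose values are JsonObject instances. A hedged usage sketch; the stream variable, table id and schema JSON below are illustrative placeholders, not part of the original project:
// Hypothetical wiring; buildStream(), "my_dataset.my_table" and the schema string are placeholders.
JavaPairDStream<String, JsonObject> toBigQuery = buildStream(); // assumed to be produced elsewhere in the job
toBigQuery.foreachRDD(
    BigQueryHelper.outputTo("my_dataset.my_table",
        "[{\"name\": \"message\", \"type\": \"STRING\"}]"));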
Project: mudrod
File: SessionGenerator.java
public void combineShortSessionsInParallel(int timeThres) throws InterruptedException, IOException {
JavaRDD<String> userRDD = getUserRDD(this.cleanupType);
userRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void call(Iterator<String> arg0) throws Exception {
ESDriver tmpES = new ESDriver(props);
tmpES.createBulkProcessor();
while (arg0.hasNext()) {
String s = arg0.next();
combineShortSessions(tmpES, s, timeThres);
}
tmpES.destroyBulkProcessor();
tmpES.close();
}
});
}
Project: nats-connector-spark
File: SparkToNatsConnectorPool.java
/**
* @param stream, the Spark Stream to publish to NATS
* @param dataEncoder, the function used to encode the Spark Stream Records into the NATS Message Payloads
*/
public <V extends Object> void publishToNats(final JavaDStream<V> stream, final Function<V, byte[]> dataEncoder) {
logger.trace("publishToNats(JavaDStream<String> stream)");
stream.foreachRDD((VoidFunction<JavaRDD<V>>) rdd -> {
logger.trace("stream.foreachRDD");
rdd.foreachPartitionAsync(objects -> {
logger.trace("rdd.foreachPartition");
final SparkToNatsConnector<?> connector = getConnector();
while(objects.hasNext()) {
final V obj = objects.next();
logger.trace("Will publish {}", obj);
connector.publishToNats(dataEncoder.apply(obj));
}
returnConnector(connector); // return to the pool for future reuse
});
});
}
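A hedged usage sketch of the method above: publishing a stream of strings with a UTF-8 encoder. The connectorPool and lines variables are assumed to exist (an already-configured SparkToNatsConnectorPool and a JavaDStream<String>):
// Encode each record to bytes before publishing it to NATS.
connectorPool.publishToNats(lines,
    (String s) -> s.getBytes(java.nio.charset.StandardCharsets.UTF_8));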
Project: nats-connector-spark
File: SparkToNatsConnectorPool.java
/**
* @param stream, the Spark Stream (composed of Key/Value Records) to publish to NATS
* @param dataEncoder, the function used to encode the Spark Stream Records into the NATS Message Payloads
*/
public <K extends Object, V extends Object> void publishToNatsAsKeyValue(final JavaPairDStream<K, V> stream, final Function<V, byte[]> dataEncoder) {
logger.trace("publishToNats(JavaPairDStream<String, String> stream)");
setStoredAsKeyValue(true);
stream.foreachRDD((VoidFunction<JavaPairRDD<K, V>>) rdd -> {
logger.trace("stream.foreachRDD");
rdd.foreachPartitionAsync((VoidFunction<Iterator<Tuple2<K,V>>>) tuples -> {
logger.trace("rdd.foreachPartition");
final SparkToNatsConnector<?> connector = getConnector();
while(tuples.hasNext()) {
final Tuple2<K,V> tuple = tuples.next();
logger.trace("Will publish {}", tuple);
connector.publishToNats(tuple._1.toString(), dataEncoder.apply(tuple._2));
}
returnConnector(connector); // return to the pool for future reuse
});
});
}
Project: HadoopCV
File: InputFormatTest.java
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("VideoInput").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
Configuration hc = new org.apache.hadoop.conf.Configuration();
JavaPairRDD<Text, HBMat> video = sc.newAPIHadoopFile("data/bike.avi", VideoInputFormat.class, Text.class, HBMat.class,hc);
video.foreach(new VoidFunction<Tuple2<Text,HBMat>>() {
@Override
public void call(Tuple2<Text, HBMat> tuple) throws Exception {
HBMat image = (HBMat)tuple._2;
System.out.print(image.getBmat().dump());
}
});
System.out.print(video.count());
}
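For comparison, a minimal lambda form of the same per-frame dump over the video pair RDD:
// Same output as the anonymous VoidFunction above: dump each decoded frame matrix.
video.foreach(tuple -> System.out.print(tuple._2.getBmat().dump()));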
Project: GeoSpark
File: GeoSparkVizImageGenerator.java
/**
* Save raster image as local file.
*
* @param distributedImage the distributed image
* @param outputPath the output path
* @param imageType the image type
* @param zoomLevel the zoom level
* @param partitionOnX the partition on X
* @param partitionOnY the partition on Y
* @return true, if successful
* @throws Exception the exception
*/
public boolean SaveRasterImageAsLocalFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception
{
logger.info("[GeoSparkViz][SaveRasterImageAsLocalFile][Start]");
for(int i=0;i<partitionOnX*partitionOnY;i++) {
deleteLocalFile(outputPath+"-"+ RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i),imageType);
}
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
@Override
public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
SaveRasterImageAsLocalFile(integerImageSerializableWrapperTuple2._2.getImage(), outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
}
});
logger.info("[GeoSparkViz][SaveRasterImageAsLocalFile][Stop]");
return true;
}
Project: GeoSpark
File: GeoSparkVizImageGenerator.java
/**
* Save raster image as hadoop file.
*
* @param distributedImage the distributed image
* @param outputPath the output path
* @param imageType the image type
* @param zoomLevel the zoom level
* @param partitionOnX the partition on X
* @param partitionOnY the partition on Y
* @return true, if successful
* @throws Exception the exception
*/
public boolean SaveRasterImageAsHadoopFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception
{
logger.info("[GeoSparkViz][SaveRasterImageAsHadoopFile][Start]");
for(int i=0;i<partitionOnX*partitionOnY;i++) {
deleteHadoopFile(outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+".", imageType);
}
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
@Override
public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
SaveRasterImageAsHadoopFile(integerImageSerializableWrapperTuple2._2.getImage(), outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
}
});
logger.info("[GeoSparkViz][SaveRasterImageAsHadoopFile][Stop]");
return true;
}
Project: GeoSpark
File: GeoSparkVizImageGenerator.java
/**
* Save raster image as S3 file.
*
* @param distributedImage the distributed image
* @param regionName the region name
* @param accessKey the access key
* @param secretKey the secret key
* @param bucketName the bucket name
* @param path the path
* @param imageType the image type
* @param zoomLevel the zoom level
* @param partitionOnX the partition on X
* @param partitionOnY the partition on Y
* @return true, if successful
*/
public boolean SaveRasterImageAsS3File(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage,
final String regionName, final String accessKey, final String secretKey,
final String bucketName, final String path, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY)
{
logger.info("[GeoSparkViz][SaveRasterImageAsS3File][Start]");
S3Operator s3Operator = new S3Operator(regionName, accessKey, secretKey);
for(int i=0;i<partitionOnX*partitionOnY;i++) {
s3Operator.deleteImage(bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+"."+imageType.getTypeName());
}
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
@Override
public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
SaveRasterImageAsS3File(integerImageSerializableWrapperTuple2._2.getImage(), regionName, accessKey, secretKey, bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
}
});
logger.info("[GeoSparkViz][SaveRasterImageAsS3File][Stop]");
return true;
}
Project: GeoSpark
File: ImageGenerator.java
/**
* Save raster image as local file.
*
* @param distributedImage the distributed image
* @param outputPath the output path
* @param imageType the image type
* @param zoomLevel the zoom level
* @param partitionOnX the partition on X
* @param partitionOnY the partition on Y
* @return true, if successful
* @throws Exception the exception
*/
public boolean SaveRasterImageAsLocalFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception
{
logger.info("[GeoSparkViz][SaveRasterImageAsLocalFile][Start]");
for(int i=0;i<partitionOnX*partitionOnY;i++) {
deleteLocalFile(outputPath+"-"+ RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i),imageType);
}
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
@Override
public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
SaveRasterImageAsLocalFile(integerImageSerializableWrapperTuple2._2.image, outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
}
});
logger.info("[GeoSparkViz][SaveRasterImageAsLocalFile][Stop]");
return true;
}
Project: GeoSpark
File: ImageGenerator.java
/**
* Save raster image as hadoop file.
*
* @param distributedImage the distributed image
* @param outputPath the output path
* @param imageType the image type
* @param zoomLevel the zoom level
* @param partitionOnX the partition on X
* @param partitionOnY the partition on Y
* @return true, if successful
* @throws Exception the exception
*/
public boolean SaveRasterImageAsHadoopFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception
{
logger.info("[GeoSparkViz][SaveRasterImageAsHadoopFile][Start]");
for(int i=0;i<partitionOnX*partitionOnY;i++) {
deleteHadoopFile(outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+".", imageType);
}
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
@Override
public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
SaveRasterImageAsHadoopFile(integerImageSerializableWrapperTuple2._2.image, outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
}
});
logger.info("[GeoSparkViz][SaveRasterImageAsHadoopFile][Stop]");
return true;
}
Project: GeoSpark
File: ImageGenerator.java
/**
* Save raster image as S3 file.
*
* @param distributedImage the distributed image
* @param regionName the region name
* @param accessKey the access key
* @param secretKey the secret key
* @param bucketName the bucket name
* @param path the path
* @param imageType the image type
* @param zoomLevel the zoom level
* @param partitionOnX the partition on X
* @param partitionOnY the partition on Y
* @return true, if successful
*/
public boolean SaveRasterImageAsS3File(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage,
final String regionName, final String accessKey, final String secretKey,
final String bucketName, final String path, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY)
{
logger.info("[GeoSparkViz][SaveRasterImageAsS3File][Start]");
S3Operator s3Operator = new S3Operator(regionName, accessKey, secretKey);
for(int i=0;i<partitionOnX*partitionOnY;i++) {
s3Operator.deleteImage(bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+"."+imageType.getTypeName());
}
distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
@Override
public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
SaveRasterImageAsS3File(integerImageSerializableWrapperTuple2._2.image, regionName, accessKey, secretKey, bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
}
});
logger.info("[GeoSparkViz][SaveRasterImageAsS3File][Stop]");
return true;
}
Project: Luzzu
File: TriplePublisher.java
public void publishTriples(JavaRDD<String> datasetRDD) throws IOException {
logger.debug("Initiating publication of triples on the queue...");
datasetRDD.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 7603190977649586962L;
@Override
public void call(String stmt) throws Exception {
// publish triple (statement) into the exchange
if(stmt != null) {
if(channel == null) {
logger.warn("Channel was found to be null attempting to publish, reconnecting...");
connect();
}
channel.basicPublish(EXCHANGE_NAME, "", null, stmt.getBytes());
}
}
});
logger.debug("All triples published on the queue. Processing metrics...");
}
Project: gcp
File: IdleStop.java
public static <A extends JavaRDDLike<?, ?>> VoidFunction<A> create(JavaStreamingContext jsc, long amount, String printf) {
final LongAccumulator stopAcc = jsc.ssc().sc().longAccumulator();
return rdd -> {
if (printf != null)
System.out.printf(printf, rdd.count());
if (rdd.count() == 0L) {
stopAcc.add(1L);
if (stopAcc.value() >= amount)
jsc.stop();
} else
stopAcc.reset();
};
}
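A hedged usage sketch: wiring the returned callback onto a DStream so the streaming context stops after a few consecutive empty micro-batches. The stream and jsc variables are assumed to exist, and the message format is illustrative:
// Stop after 3 consecutive empty batches; print a per-batch record count first.
stream.foreachRDD(IdleStop.create(jsc, 3L, "Batch contained %d records%n"));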
Project: spark-streaming-direct-kafka
File: Functions.java
public static <T> VoidFunction<T> noOp() {
return new VoidFunction<T>() {
@Override
public void call(T t) {
// do nothing
}
};
}
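Since VoidFunction is a single-abstract-method interface, the same no-op can be expressed as a lambda; a minimal sketch:
public static <T> VoidFunction<T> noOp() {
    return t -> { /* intentionally empty */ };
}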
Project: maelstrom
File: StreamMultiTopic.java
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("StreamMultiTopic");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
CuratorFramework curator = OffsetManager.createCurator("127.0.0.1:2181");
KafkaConsumerPoolFactory<String,String> poolFactory = new KafkaConsumerPoolFactory<>("127.0.0.1:9092", StringDecoder.class, StringDecoder.class);
ControllerKafkaTopics<String,String> topics = new ControllerKafkaTopics<>(sc.sc(), curator, poolFactory);
topics.registerTopic("test_multi", "test");
topics.registerTopic("test_multi", "test2");
new StreamProcessor<String,String>(topics) {
@Override
public final void process() {
JavaRDD<Tuple2<String,String>> rdd = fetch().toJavaRDD();
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,String>>>() {
@Override
public final void call(final Iterator<Tuple2<String,String>> it) {
while (it.hasNext()) {
Tuple2<String,String> e = it.next();
LOG.info("key=" + e._1 + " message=" + e._2());
}
}
});
commit();
}
}.run();
sc.sc().stop();
}
Project: maelstrom
File: StreamSingleTopic.java
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("StreamSingleTopic");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
CuratorFramework curator = OffsetManager.createCurator("127.0.0.1:2181");
KafkaConsumerPoolFactory<String,String> poolFactory = new KafkaConsumerPoolFactory<>("127.0.0.1:9092", StringDecoder.class, StringDecoder.class);
ControllerKafkaTopics<String,String> topics = new ControllerKafkaTopics<>(sc.sc(), curator, poolFactory);
ControllerKafkaTopic<String,String> topic = topics.registerTopic("test_group", "test");
new StreamProcessor<String,String>(topic) {
@Override
public final void process() {
JavaRDD<Tuple2<String,String>> rdd = fetch().toJavaRDD();
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,String>>>() {
@Override
public final void call(final Iterator<Tuple2<String,String>> it) {
while (it.hasNext()) {
Tuple2<String,String> e = it.next();
LOG.info("key=" + e._1 + " message=" + e._2());
}
}
});
commit();
}
}.run();
sc.sc().stop();
}
Project: incubator-pirk
File: ComputeStreamingResponse.java
private void encryptedColumnCalc(JavaPairDStream<Long,BigInteger> encRowRDD)
{
// Multiply the column values by colNum: emit <colNum, finalColVal>
JavaPairDStream<Long,BigInteger> encColRDD;
if (colMultReduceByKey)
{
encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(bVars), numColMultPartitions);
}
else
{
encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(bVars));
}
// Update the output name, by batch number
bVars.setOutput(outputFile + "_" + accum.numBatchesGetValue());
// Form and write the response object
encColRDD.repartition(1).foreachRDD((VoidFunction<JavaPairRDD<Long,BigInteger>>) rdd -> {
rdd.foreachPartition(new FinalResponseFunction(accum, bVars));
int maxBatchesVar = bVars.getMaxBatches();
if (maxBatchesVar != -1 && accum.numBatchesGetValue() == maxBatchesVar)
{
logger.info("num batches = maxBatches = " + maxBatchesVar + "; shutting down");
System.exit(0);
}
});
}
Project: beam
File: TranslationUtils.java
public static <T> VoidFunction<T> emptyVoidFunction() {
return new VoidFunction<T>() {
@Override
public void call(T t) throws Exception {
// Empty implementation.
}
};
}
Project: beam
File: UnboundedDataset.java
@Override
public void action() {
// Force computation of DStream.
dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
@Override
public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
rdd.foreach(TranslationUtils.<WindowedValue<T>>emptyVoidFunction());
}
});
}
Project: nats-connector-spark
File: AbstractNatsToSparkTest.java
protected void validateTheReceptionOfMessages(JavaStreamingContext ssc,
JavaReceiverInputDStream<String> stream) throws InterruptedException {
JavaDStream<String> messages = stream.repartition(3);
ExecutorService executor = Executors.newFixedThreadPool(6);
final int nbOfMessages = 5;
NatsPublisher np = getNatsPublisher(nbOfMessages);
if (logger.isDebugEnabled()) {
messages.print();
}
messages.foreachRDD(new VoidFunction<JavaRDD<String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(JavaRDD<String> rdd) throws Exception {
logger.debug("RDD received: {}", rdd.collect());
final long count = rdd.count();
if ((count != 0) && (count != nbOfMessages)) {
rightNumber = false;
logger.error("The number of messages received should have been {} instead of {}.", nbOfMessages, count);
}
TOTAL_COUNT.getAndAdd((int) count);
atLeastSomeData = atLeastSomeData || (count > 0);
for (String str :rdd.collect()) {
if (! str.startsWith(NatsPublisher.NATS_PAYLOAD)) {
payload = str;
}
}
}
});
closeTheValidation(ssc, executor, nbOfMessages, np);
}
Project: nats-connector-spark
File: AbstractNatsToSparkTest.java
protected void validateTheReceptionOfIntegerMessages(JavaStreamingContext ssc,
JavaReceiverInputDStream<Integer> stream) throws InterruptedException {
JavaDStream<Integer> messages = stream.repartition(3);
ExecutorService executor = Executors.newFixedThreadPool(6);
final int nbOfMessages = 5;
NatsPublisher np = getNatsPublisher(nbOfMessages);
if (logger.isDebugEnabled()) {
messages.print();
}
messages.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(JavaRDD<Integer> rdd) throws Exception {
logger.debug("RDD received: {}", rdd.collect());
final long count = rdd.count();
if ((count != 0) && (count != nbOfMessages)) {
rightNumber = false;
logger.error("The number of messages received should have been {} instead of {}.", nbOfMessages, count);
}
TOTAL_COUNT.getAndAdd((int) count);
atLeastSomeData = atLeastSomeData || (count > 0);
for (Integer value :rdd.collect()) {
if (value < NatsPublisher.NATS_PAYLOAD_INT) {
payload = value.toString();
}
}
}
});
closeTheValidation(ssc, executor, nbOfMessages, np);
}
Project: nats-connector-spark
File: AbstractNatsToSparkTest.java
protected void validateTheReceptionOfMessages(final JavaStreamingContext ssc,
final JavaPairDStream<String, String> messages) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(6);
final int nbOfMessages = 5;
NatsPublisher np = getNatsPublisher(nbOfMessages);
if (logger.isDebugEnabled()) {
messages.print();
}
JavaPairDStream<String, Integer> pairs = messages.mapToPair(s -> new Tuple2(s._1, 1));
JavaPairDStream<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
counts.print();
counts.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>) pairRDD -> {
pairRDD.foreach((VoidFunction<Tuple2<String, Integer>>) tuple -> {
final long count = tuple._2;
if ((count != 0) && (count != nbOfMessages)) {
rightNumber = false;
logger.error("The number of messages received should have been {} instead of {}.", nbOfMessages, count);
}
TOTAL_COUNT.getAndAdd((int) count);
atLeastSomeData = atLeastSomeData || (count > 0);
});
});
closeTheValidation(ssc, executor, nbOfMessages, np);
}
Project: kafka-spark-consumer
File: ProcessedOffsetManager.java
@SuppressWarnings("deprecation")
public static void persists(JavaPairDStream<Integer, Iterable<Long>> partitonOffset, Properties props) {
partitonOffset.foreachRDD(new VoidFunction<JavaPairRDD<Integer,Iterable<Long>>>() {
@Override
public void call(JavaPairRDD<Integer, Iterable<Long>> po) throws Exception {
List<Tuple2<Integer, Iterable<Long>>> poList = po.collect();
doPersists(poList, props);
}
});
}
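The same persistence callback as a lambda, a sketch assuming doPersists keeps its original signature; the cast to VoidFunction mirrors the style used elsewhere in this listing and keeps the intended foreachRDD overload:
// Collect the per-partition offset ranges on the driver and persist them.
partitonOffset.foreachRDD((VoidFunction<JavaPairRDD<Integer, Iterable<Long>>>) po -> doPersists(po.collect(), props));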
Project: kafka-spark-consumer
File: ProcessedOffsetManager.java
@SuppressWarnings("deprecation")
public static void persists(DStream<Tuple2<Integer, Iterable<Long>>> partitonOffset, Properties props) {
ClassTag<Tuple2<Integer, Iterable<Long>>> tuple2ClassTag =
ScalaUtil.<Integer, Iterable<Long>>getTuple2ClassTag();
JavaDStream<Tuple2<Integer, Iterable<Long>>> jpartitonOffset =
new JavaDStream<Tuple2<Integer, Iterable<Long>>>(partitonOffset, tuple2ClassTag);
jpartitonOffset.foreachRDD(new VoidFunction<JavaRDD<Tuple2<Integer, Iterable<Long>>>>() {
@Override
public void call(JavaRDD<Tuple2<Integer, Iterable<Long>>> po) throws Exception {
List<Tuple2<Integer, Iterable<Long>>> poList = po.collect();
doPersists(poList, props);
}
});
}
Project: deeplearning4j
File: PrintDataSet.java
@Override
public void call(JavaRDD<DataSet> dataSetJavaRDD) throws Exception {
dataSetJavaRDD.foreach(new VoidFunction<DataSet>() {
@Override
public void call(DataSet dataSet) throws Exception {
System.out.println(dataSet);
}
});
}
Project: deeplearning4j
File: PrintDataSet.java
@Override
public Void call(JavaRDD<DataSet> dataSetJavaRDD) throws Exception {
dataSetJavaRDD.foreach(new VoidFunction<DataSet>() {
@Override
public void call(DataSet dataSet) throws Exception {
System.out.println(dataSet);
}
});
return null;
}
Project: spliceengine
File: ReaderWriterExample.java
public static void main(String[] args) throws Exception {
final String dbUrl = args[0];
final String hostname = args[1];
final String port = args[2];
final String inTargetSchema = args[3];
final String inTargetTable = args[4];
SparkConf conf = new SparkConf();
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(500));
SpliceSpark.setContext(ssc.sparkContext());
SparkSession spark = SpliceSpark.getSessionUnsafe();
JavaReceiverInputDStream<String> stream = ssc.socketTextStream(hostname, Integer.parseInt(port));
// Create a SplicemachineContext based on the provided DB connection
SplicemachineContext splicemachineContext = new SplicemachineContext(dbUrl);
// Set target tablename and schemaname
final String table = inTargetSchema + "." + inTargetTable;
stream.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
JavaRDD<Row> rowRDD = rdd.map((Function<String, Row>) s -> RowFactory.create(s));
Dataset<Row> df = spark.createDataFrame(rowRDD, splicemachineContext.getSchema(table));
splicemachineContext.insert(df, table);
});
ssc.start();
ssc.awaitTermination();
}
Project: Apache-Spark-2x-for-Java-Developers
File: PersistExample.java
/**
* @param args
*/
public static void main(String[] args) {
//C:\Users\sumit.kumar\Downloads\bin\warehouse
//System.setProperty("hadoop.home.dir", "C:\\Users\\sumit.kumar\\Downloads");
String logFile = "src/main/resources/Apology_by_Plato.txt"; // Should be some file on your system
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.WARN);
SparkConf conf = new SparkConf().setMaster("local").setAppName("ActionExamples").set("spark.hadoop.validateOutputSpecs", "false");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaRDD<Integer> rdd = sparkContext.parallelize(Arrays.asList(1, 2, 3,4,5),3).cache();
JavaRDD<Integer> evenRDD= rdd.filter(new org.apache.spark.api.java.function.Function<Integer, Boolean>() {
@Override
public Boolean call(Integer v1) throws Exception {
return ((v1%2)==0)?true:false;
}
});
evenRDD.persist(StorageLevel.MEMORY_AND_DISK());
evenRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer t) throws Exception {
System.out.println("The value of RDD are :"+t);
}
});
//unpersisting the RDD
evenRDD.unpersist();
rdd.unpersist();
/* JavaRDD<String> lines = spark.read().textFile(logFile).javaRDD().cache();
System.out.println("DEBUG: \n"+ lines.toDebugString());
long word= lines.count();
JavaRDD<String> distinctLines=lines.distinct();
System.out.println("DEBUG: \n"+ distinctLines.toDebugString());
JavaRDD<String> finalRdd=lines.subtract(distinctLines);
System.out.println("DEBUG: \n"+ finalRdd.toDebugString());
System.out.println("The count is "+word);
System.out.println("The count is "+distinctLines.count());
System.out.println("The count is "+finalRdd.count());
finalRdd.foreach(new VoidFunction<String>() {
@Override
public void call(String t) throws Exception {
// TODO Auto-generated method stub
System.out.println(t);
}
});
*/ /*SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
StorageLevel newLevel;
JavaRDD<String> logData = sc.textFile(logFile).cache();
long numAs = logData.filter(new Function(logFile, logFile, logFile, logFile, false) {
public Boolean call(String s) { return s.contains("a"); }
}).count();
long numBs = logData.filter(new Function(logFile, logFile, logFile, logFile, false) {
public Boolean call(String s) { return s.contains("b"); }
}).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
sc.stop();*/
}
Project: athena
File: GaussianMixtureDistJob.java
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
GaussianMixtureDetectionModel gaussianMixtureDetectionModel,
GaussianMixtureValidationSummary gaussianMixtureValidationSummary) {
List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
GaussianMixtureModel gaussianMixtureModel = (GaussianMixtureModel) gaussianMixtureDetectionModel.getDetectionModel();
int numberOfTargetValue = listOfTargetFeatures.size();
Normalizer normalizer = new Normalizer();
mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
long start2 = System.nanoTime(); // <-- start
BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
BSONObject idx = (BSONObject) t._2();
Vector normedForVal;
double[] values = new double[numberOfTargetValue];
for (int j = 0; j < numberOfTargetValue; j++) {
values[j] = 0;
if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
if (obj instanceof Long) {
values[j] = (Long) obj;
} else if (obj instanceof Double) {
values[j] = (Double) obj;
} else if (obj instanceof Boolean) {
values[j] = (Boolean) obj ? 1 : 0;
} else {
return;
}
//check weight
if (weight.containsKey(listOfTargetFeatures.get(j))) {
values[j] *= weight.get(listOfTargetFeatures.get(j));
}
//check absolute
if (athenaMLFeatureConfiguration.isAbsolute()){
values[j] = Math.abs(values[j]);
}
}
}
if (athenaMLFeatureConfiguration.isNormalization()) {
normedForVal = normalizer.transform(Vectors.dense(values));
} else {
normedForVal = Vectors.dense(values);
}
int detectIdx = gaussianMixtureModel.predict(normedForVal);
gaussianMixtureValidationSummary.updateSummary(detectIdx, idx, feature);
long end2 = System.nanoTime();
long result2 = end2 - start2;
gaussianMixtureValidationSummary.addTotalNanoSeconds(result2);
}
});
gaussianMixtureValidationSummary.calculateDetectionRate();
gaussianMixtureValidationSummary.getAverageNanoSeconds();
gaussianMixtureValidationSummary.setGaussianMixtureDetectionAlgorithm(
(GaussianMixtureDetectionAlgorithm)gaussianMixtureDetectionModel.getDetectionAlgorithm());
}
Project: athena
File: KMeansDistJob.java
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
KMeansDetectionModel kMeansDetectionModel,
KmeansValidationSummary kmeansValidationSummary) {
List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
KMeansModel cluster = (KMeansModel) kMeansDetectionModel.getDetectionModel();
int numberOfTargetValue = listOfTargetFeatures.size();
Normalizer normalizer = new Normalizer();
mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
long start2 = System.nanoTime(); // <-- start
BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
BSONObject idx = (BSONObject) t._2();
Vector normedForVal;
double[] values = new double[numberOfTargetValue];
for (int j = 0; j < numberOfTargetValue; j++) {
values[j] = 0;
if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
if (obj instanceof Long) {
values[j] = (Long) obj;
} else if (obj instanceof Double) {
values[j] = (Double) obj;
} else if (obj instanceof Boolean) {
values[j] = (Boolean) obj ? 1 : 0;
} else {
return;
}
//check weight
if (weight.containsKey(listOfTargetFeatures.get(j))) {
values[j] *= weight.get(listOfTargetFeatures.get(j));
}
//check absolute
if (athenaMLFeatureConfiguration.isAbsolute()) {
values[j] = Math.abs(values[j]);
}
}
}
if (athenaMLFeatureConfiguration.isNormalization()) {
normedForVal = normalizer.transform(Vectors.dense(values));
} else {
normedForVal = Vectors.dense(values);
}
int detectIdx = cluster.predict(normedForVal);
kmeansValidationSummary.updateSummary(detectIdx, idx, feature);
long end2 = System.nanoTime();
long result2 = end2 - start2;
kmeansValidationSummary.addTotalNanoSeconds(result2);
}
});
kmeansValidationSummary.calculateDetectionRate();
kmeansValidationSummary.getAverageNanoSeconds();
kmeansValidationSummary.setkMeansDetectionAlgorithm((KMeansDetectionAlgorithm) kMeansDetectionModel.getDetectionAlgorithm());
}
Project: athena
File: GradientBoostedTreesDistJob.java
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
GradientBoostedTreesDetectionModel gradientBoostedTreesDetectionModel,
GradientBoostedTreesValidationSummary gradientBoostedTreesValidationSummary) {
List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
Marking marking = gradientBoostedTreesDetectionModel.getMarking();
GradientBoostedTreesModel model = (GradientBoostedTreesModel) gradientBoostedTreesDetectionModel.getDetectionModel();
Normalizer normalizer = new Normalizer();
int numberOfTargetValue = listOfTargetFeatures.size();
mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
long start2 = System.nanoTime(); // <-- start
BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
BSONObject idx = (BSONObject) t._2();
int originLabel = marking.checkClassificationMarkingElements(idx,feature);
double[] values = new double[numberOfTargetValue];
for (int j = 0; j < numberOfTargetValue; j++) {
values[j] = 0;
if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
if (obj instanceof Long) {
values[j] = (Long) obj;
} else if (obj instanceof Double) {
values[j] = (Double) obj;
} else if (obj instanceof Boolean) {
values[j] = (Boolean) obj ? 1 : 0;
} else {
return;
}
//check weight
if (weight.containsKey(listOfTargetFeatures.get(j))) {
values[j] *= weight.get(listOfTargetFeatures.get(j));
}
//check absolute
if (athenaMLFeatureConfiguration.isAbsolute()){
values[j] = Math.abs(values[j]);
}
}
}
Vector normedForVal;
if (athenaMLFeatureConfiguration.isNormalization()) {
normedForVal = normalizer.transform(Vectors.dense(values));
} else {
normedForVal = Vectors.dense(values);
}
LabeledPoint p = new LabeledPoint(originLabel,normedForVal);
int validatedLabel = (int) model.predict(p.features());
gradientBoostedTreesValidationSummary.updateSummary(validatedLabel,idx,feature);
long end2 = System.nanoTime();
long result2 = end2 - start2;
gradientBoostedTreesValidationSummary.addTotalNanoSeconds(result2);
}
});
gradientBoostedTreesValidationSummary.getAverageNanoSeconds();
gradientBoostedTreesValidationSummary.setGradientBoostedTreesDetectionAlgorithm((GradientBoostedTreesDetectionAlgorithm) gradientBoostedTreesDetectionModel.getDetectionAlgorithm());
}
Project: athena
File: RandomForestDistJob.java
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
RandomForestDetectionModel randomForestDetectionModel,
RandomForestValidationSummary randomForestValidationSummary) {
List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
Marking marking = randomForestDetectionModel.getMarking();
RandomForestModel model = (RandomForestModel) randomForestDetectionModel.getDetectionModel();
Normalizer normalizer = new Normalizer();
int numberOfTargetValue = listOfTargetFeatures.size();
mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
long start2 = System.nanoTime(); // <-- start
BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
BSONObject idx = (BSONObject) t._2();
int originLabel = marking.checkClassificationMarkingElements(idx,feature);
double[] values = new double[numberOfTargetValue];
for (int j = 0; j < numberOfTargetValue; j++) {
values[j] = 0;
if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
if (obj instanceof Long) {
values[j] = (Long) obj;
} else if (obj instanceof Double) {
values[j] = (Double) obj;
} else if (obj instanceof Boolean) {
values[j] = (Boolean) obj ? 1 : 0;
} else {
return;
}
//check weight
if (weight.containsKey(listOfTargetFeatures.get(j))) {
values[j] *= weight.get(listOfTargetFeatures.get(j));
}
//check absolute
if (athenaMLFeatureConfiguration.isAbsolute()){
values[j] = Math.abs(values[j]);
}
}
}
Vector normedForVal;
if (athenaMLFeatureConfiguration.isNormalization()) {
normedForVal = normalizer.transform(Vectors.dense(values));
} else {
normedForVal = Vectors.dense(values);
}
LabeledPoint p = new LabeledPoint(originLabel,normedForVal);
int validatedLabel = (int) model.predict(p.features());
randomForestValidationSummary.updateSummary(validatedLabel,idx,feature);
long end2 = System.nanoTime();
long result2 = end2 - start2;
randomForestValidationSummary.addTotalNanoSeconds(result2);
}
});
randomForestValidationSummary.getAverageNanoSeconds();
randomForestValidationSummary.setRandomForestDetectionAlgorithm((RandomForestDetectionAlgorithm) randomForestDetectionModel.getDetectionAlgorithm());
}
Project: athena
File: SVMDistJob.java
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
SVMDetectionModel SVMDetectionModel,
SVMValidationSummary SVMValidationSummary) {
List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
Marking marking = SVMDetectionModel.getMarking();
SVMModel model = (SVMModel) SVMDetectionModel.getDetectionModel();
Normalizer normalizer = new Normalizer();
int numberOfTargetValue = listOfTargetFeatures.size();
mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
long start2 = System.nanoTime(); // <-- start
BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
BSONObject idx = (BSONObject) t._2();
int originLabel = marking.checkClassificationMarkingElements(idx,feature);
double[] values = new double[numberOfTargetValue];
for (int j = 0; j < numberOfTargetValue; j++) {
values[j] = 0;
if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
if (obj instanceof Long) {
values[j] = (Long) obj;
} else if (obj instanceof Double) {
values[j] = (Double) obj;
} else if (obj instanceof Boolean) {
values[j] = (Boolean) obj ? 1 : 0;
} else {
return;
}
//check weight
if (weight.containsKey(listOfTargetFeatures.get(j))) {
values[j] *= weight.get(listOfTargetFeatures.get(j));
}
//check absolute
if (athenaMLFeatureConfiguration.isAbsolute()){
values[j] = Math.abs(values[j]);
}
}
}
Vector normedForVal;
if (athenaMLFeatureConfiguration.isNormalization()) {
normedForVal = normalizer.transform(Vectors.dense(values));
} else {
normedForVal = Vectors.dense(values);
}
LabeledPoint p = new LabeledPoint(originLabel,normedForVal);
//Only SVM!!
int validatedLabel;// = (int) model.predict(p.features());
double score = model.predict(p.features());
if (score > 0){
//detection
validatedLabel = 1;
} else {
validatedLabel = 0;
}
SVMValidationSummary.updateSummary(validatedLabel,idx,feature);
long end2 = System.nanoTime();
long result2 = end2 - start2;
SVMValidationSummary.addTotalNanoSeconds(result2);
}
});
SVMValidationSummary.getAverageNanoSeconds();
SVMValidationSummary.setSvmDetectionAlgorithm((SVMDetectionAlgorithm) SVMDetectionModel.getDetectionAlgorithm());
}
Project: athena
File: LogisticRegressionDistJob.java
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
LogisticRegressionDetectionModel logisticRegressionDetectionModel,
LogisticRegressionValidationSummary logisticRegressionValidationSummary) {
List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
Marking marking = logisticRegressionDetectionModel.getMarking();
LogisticRegressionModel model = (LogisticRegressionModel) logisticRegressionDetectionModel.getDetectionModel();
Normalizer normalizer = new Normalizer();
int numberOfTargetValue = listOfTargetFeatures.size();
mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
long start2 = System.nanoTime(); // <-- start
BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
BSONObject idx = (BSONObject) t._2();
int originLabel = marking.checkClassificationMarkingElements(idx,feature);
double[] values = new double[numberOfTargetValue];
for (int j = 0; j < numberOfTargetValue; j++) {
values[j] = 0;
if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
if (obj instanceof Long) {
values[j] = (Long) obj;
} else if (obj instanceof Double) {
values[j] = (Double) obj;
} else if (obj instanceof Boolean) {
values[j] = (Boolean) obj ? 1 : 0;
} else {
return;
}
//check weight
if (weight.containsKey(listOfTargetFeatures.get(j))) {
values[j] *= weight.get(listOfTargetFeatures.get(j));
}
//check absolute
if (athenaMLFeatureConfiguration.isAbsolute()){
values[j] = Math.abs(values[j]);
}
}
}
Vector normedForVal;
if (athenaMLFeatureConfiguration.isNormalization()) {
normedForVal = normalizer.transform(Vectors.dense(values));
} else {
normedForVal = Vectors.dense(values);
}
LabeledPoint p = new LabeledPoint(originLabel,normedForVal);
int validatedLabel = (int) model.predict(p.features());
logisticRegressionValidationSummary.updateSummary(validatedLabel,idx,feature);
long end2 = System.nanoTime();
long result2 = end2 - start2;
logisticRegressionValidationSummary.addTotalNanoSeconds(result2);
}
});
logisticRegressionValidationSummary.getAverageNanoSeconds();
logisticRegressionValidationSummary.setLogisticRegressionDetectionAlgorithm((LogisticRegressionDetectionAlgorithm) logisticRegressionDetectionModel.getDetectionAlgorithm());
}
Project: athena
File: DecisionTreeDistJob.java
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
DecisionTreeDetectionModel decisionTreeDetectionModel,
DecisionTreeValidationSummary decisionTreeValidationSummary) {
List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
Marking marking = decisionTreeDetectionModel.getMarking();
DecisionTreeModel model = (DecisionTreeModel) decisionTreeDetectionModel.getDetectionModel();
Normalizer normalizer = new Normalizer();
int numberOfTargetValue = listOfTargetFeatures.size();
mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
long start2 = System.nanoTime(); // <-- start
BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
BSONObject idx = (BSONObject) t._2();
int originLabel = marking.checkClassificationMarkingElements(idx,feature);
double[] values = new double[numberOfTargetValue];
for (int j = 0; j < numberOfTargetValue; j++) {
values[j] = 0;
if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
if (obj instanceof Long) {
values[j] = (Long) obj;
} else if (obj instanceof Double) {
values[j] = (Double) obj;
} else if (obj instanceof Boolean) {
values[j] = (Boolean) obj ? 1 : 0;
} else {
return;
}
//check weight
if (weight.containsKey(listOfTargetFeatures.get(j))) {
values[j] *= weight.get(listOfTargetFeatures.get(j));
}
//check absolute
if (athenaMLFeatureConfiguration.isAbsolute()){
values[j] = Math.abs(values[j]);
}
}
}
Vector normedForVal;
if (athenaMLFeatureConfiguration.isNormalization()) {
normedForVal = normalizer.transform(Vectors.dense(values));
} else {
normedForVal = Vectors.dense(values);
}
LabeledPoint p = new LabeledPoint(originLabel,normedForVal);
int validatedLabel = (int) model.predict(p.features());
decisionTreeValidationSummary.updateSummary(validatedLabel,idx,feature);
long end2 = System.nanoTime();
long result2 = end2 - start2;
decisionTreeValidationSummary.addTotalNanoSeconds(result2);
}
});
decisionTreeValidationSummary.getAverageNanoSeconds();
decisionTreeValidationSummary.setDecisionTreeDetectionAlgorithm((DecisionTreeDetectionAlgorithm) decisionTreeDetectionModel.getDetectionAlgorithm());
}
Project: athena
File: NaiveBayesDistJob.java
public void validate(JavaPairRDD<Object, BSONObject> mongoRDD,
AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
NaiveBayesDetectionModel naiveBayesDetectionModel,
NaiveBayesValidationSummary naiveBayesValidationSummary) {
List<AthenaFeatureField> listOfTargetFeatures = athenaMLFeatureConfiguration.getListOfTargetFeatures();
Map<AthenaFeatureField, Integer> weight = athenaMLFeatureConfiguration.getWeight();
Marking marking = naiveBayesDetectionModel.getMarking();
NaiveBayesModel model = (NaiveBayesModel) naiveBayesDetectionModel.getDetectionModel();
Normalizer normalizer = new Normalizer();
int numberOfTargetValue = listOfTargetFeatures.size();
mongoRDD.foreach(new VoidFunction<Tuple2<Object, BSONObject>>() {
public void call(Tuple2<Object, BSONObject> t) throws UnknownHostException {
long start2 = System.nanoTime(); // <-- start
BSONObject feature = (BSONObject) t._2().get(AthenaFeatureField.FEATURE);
BSONObject idx = (BSONObject) t._2();
int originLabel = marking.checkClassificationMarkingElements(idx,feature);
double[] values = new double[numberOfTargetValue];
for (int j = 0; j < numberOfTargetValue; j++) {
values[j] = 0;
if (feature.containsField(listOfTargetFeatures.get(j).getValue())) {
Object obj = feature.get(listOfTargetFeatures.get(j).getValue());
if (obj instanceof Long) {
values[j] = (Long) obj;
} else if (obj instanceof Double) {
values[j] = (Double) obj;
} else if (obj instanceof Boolean) {
values[j] = (Boolean) obj ? 1 : 0;
} else {
return;
}
//check weight
if (weight.containsKey(listOfTargetFeatures.get(j))) {
values[j] *= weight.get(listOfTargetFeatures.get(j));
}
//check absolute
if (athenaMLFeatureConfiguration.isAbsolute()){
values[j] = Math.abs(values[j]);
}
}
}
Vector normedForVal;
if (athenaMLFeatureConfiguration.isNormalization()) {
normedForVal = normalizer.transform(Vectors.dense(values));
} else {
normedForVal = Vectors.dense(values);
}
LabeledPoint p = new LabeledPoint(originLabel,normedForVal);
int validatedLabel = (int) model.predict(p.features());
naiveBayesValidationSummary.updateSummary(validatedLabel,idx,feature);
long end2 = System.nanoTime();
long result2 = end2 - start2;
naiveBayesValidationSummary.addTotalNanoSeconds(result2);
}
});
naiveBayesValidationSummary.getAverageNanoSeconds();
naiveBayesValidationSummary.setNaiveBayesDetectionAlgorithm((NaiveBayesDetectionAlgorithm) naiveBayesDetectionModel.getDetectionAlgorithm());
}
Project: envelope
File: Runner.java
/**
* Run the Envelope pipeline as a Spark Streaming job.
* @param steps The full configuration of the Envelope pipeline
*/
@SuppressWarnings("unchecked")
private static void runStreaming(final Set<Step> steps) throws Exception {
final Set<Step> independentNonStreamingSteps = StepUtils.getIndependentNonStreamingSteps(steps);
runBatch(independentNonStreamingSteps);
Set<StreamingStep> streamingSteps = StepUtils.getStreamingSteps(steps);
for (final StreamingStep streamingStep : streamingSteps) {
LOG.debug("Setting up streaming step: " + streamingStep.getName());
@SuppressWarnings("rawtypes")
JavaDStream stream = streamingStep.getStream();
final StructType streamSchema = streamingStep.getSchema();
LOG.debug("Stream schema: " + streamSchema);
stream.foreachRDD(new VoidFunction<JavaRDD<?>>() {
@Override
public void call(JavaRDD<?> raw) throws Exception {
// Some independent steps might be repeating steps that have been flagged for reload
StepUtils.resetRepeatingSteps(steps);
// This will run any batch steps (and dependents) that are not submitted
runBatch(independentNonStreamingSteps);
streamingStep.stageProgress(raw);
JavaRDD<Row> translated = streamingStep.translate(raw);
Dataset<Row> batchDF = Contexts.getSparkSession().createDataFrame(translated, streamSchema);
streamingStep.setData(batchDF);
streamingStep.setSubmitted(true);
Set<Step> allDependentSteps = StepUtils.getAllDependentSteps(streamingStep, steps);
runBatch(allDependentSteps);
StepUtils.resetDataSteps(allDependentSteps);
streamingStep.recordProgress();
}
});
LOG.debug("Finished setting up streaming step: " + streamingStep.getName());
}
JavaStreamingContext jsc = Contexts.getJavaStreamingContext();
jsc.start();
LOG.debug("Streaming context started");
jsc.awaitTermination();
LOG.debug("Streaming context terminated");
}
Project: net.jgp.labs.spark
File: StreamingIngestionFileSystemTextFileToDataframeApp.java
private void start() {
// Create a local StreamingContext with two working threads and a batch interval of
// 5 seconds
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Streaming Ingestion File System Text File to Dataframe");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
JavaDStream<String> msgDataStream = jssc.textFileStream(StreamingUtils.getInputDirectory());
msgDataStream.print();
// Create JavaRDD<Row>
msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
private static final long serialVersionUID = -590010339928376829L;
@Override
public void call(JavaRDD<String> rdd) {
JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
private static final long serialVersionUID = 5167089361335095997L;
@Override
public Row call(String msg) {
Row row = RowFactory.create(msg);
return row;
}
});
// Create Schema
StructType schema = DataTypes.createStructType(
new StructField[] { DataTypes.createStructField("Message", DataTypes.StringType, true) });
// Get Spark 2.0 session
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);
msgDataFrame.show();
}
});
jssc.start();
try {
jssc.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
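For comparison, the same foreachRDD body written with Java 8 lambdas; a sketch that keeps the original logic and uses the explicit VoidFunction cast seen elsewhere in this listing:
msgDataStream.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
    // Wrap each incoming line in a Row and build a one-column DataFrame.
    JavaRDD<Row> rowRDD = rdd.map(msg -> RowFactory.create(msg));
    StructType schema = DataTypes.createStructType(
        new StructField[] { DataTypes.createStructField("Message", DataTypes.StringType, true) });
    SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    spark.createDataFrame(rowRDD, schema).show();
});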
Project: nats-connector-spark
File: SparkToNatsConnector.java
/**
* A method that will publish all the records of the provided Spark RDD into NATS
* @param rdd, the RDD to publish to NATS
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public <V> void publishToNats(final JavaRDD<V> rdd) {
((JavaRDD) rdd).foreach((VoidFunction<V> & Serializable) obj -> publishToNats(encodeData(obj)));
}