Java class org.apache.spark.api.java.JavaPairRDD: example source code

Project: ViraPipe    File: InterleaveMulti.java
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {

    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

    zips.foreach( splits ->  {
      Path path = splits._1.getPath();
      FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
      FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
      writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir+"/"+path.getParent().getName()+"_"+splits._1.getStart()+".fq");
    });
  }
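A minimal driver sketch (with hypothetical input paths and split length) showing how a method like the one above could be invoked; the FileStatus arguments come straight from the Hadoop FileSystem API:

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class InterleaveDriver {
  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("InterleaveMulti"));
    FileSystem fs = FileSystem.get(sc.hadoopConfiguration());
    // Hypothetical paired-end FASTQ paths and split length (lines per split); adjust to your data.
    FileStatus reads1 = fs.getFileStatus(new Path("/data/sample_1.fastq"));
    FileStatus reads2 = fs.getFileStatus(new Path("/data/sample_2.fastq"));
    InterleaveMulti.interleaveSplitFastq(reads1, reads2, "/data/splits", 400000, sc);
    sc.stop();
  }
}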
Project: oryx2    File: AbstractKMeansEvaluation.java
/**
 * @param evalData points to cluster for evaluation
 * @return cluster IDs as keys, and metrics for each cluster like the count, sum of distances to centroid,
 *  and sum of squared distances
 */
JavaPairRDD<Integer,ClusterMetric> fetchClusterMetrics(JavaRDD<Vector> evalData) {
  return evalData.mapToPair(vector -> {
    double closestDist = Double.POSITIVE_INFINITY;
    int minClusterID = Integer.MIN_VALUE;
    double[] vec = vector.toArray();
    for (ClusterInfo cluster : clusters.values()) {
      double distance = distanceFn.applyAsDouble(cluster.getCenter(), vec);
      if (distance < closestDist) {
        closestDist = distance;
        minClusterID = cluster.getID();
      }
    }
    Preconditions.checkState(!Double.isInfinite(closestDist) && !Double.isNaN(closestDist));
    return new Tuple2<>(minClusterID, new ClusterMetric(1L, closestDist, closestDist * closestDist));
  }).reduceByKey(ClusterMetric::add);
}
Project: oryx2    File: Evaluation.java
/**
 * Computes root mean squared error of {@link Rating#rating()} versus predicted value.
 */
static double rmse(MatrixFactorizationModel mfModel, JavaRDD<Rating> testData) {
  JavaPairRDD<Tuple2<Integer,Integer>,Double> testUserProductValues =
      testData.mapToPair(rating -> new Tuple2<>(new Tuple2<>(rating.user(), rating.product()), rating.rating()));
  @SuppressWarnings("unchecked")
  RDD<Tuple2<Object,Object>> testUserProducts =
      (RDD<Tuple2<Object,Object>>) (RDD<?>) testUserProductValues.keys().rdd();
  JavaRDD<Rating> predictions = testData.wrapRDD(mfModel.predict(testUserProducts));
  double mse = predictions.mapToPair(
      rating -> new Tuple2<>(new Tuple2<>(rating.user(), rating.product()), rating.rating())
  ).join(testUserProductValues).values().mapToDouble(valuePrediction -> {
    double diff = valuePrediction._1() - valuePrediction._2();
    return diff * diff;
  }).mean();
  return Math.sqrt(mse);
}
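A tiny plain-Java illustration of the same computation on a few hypothetical (actual, predicted) rating pairs, showing the squared-error mean and final square root:

public class RmseExample {
  public static void main(String[] args) {
    double[][] actualVsPredicted = { {4.0, 3.5}, {2.0, 2.5}, {5.0, 4.0} };
    double sumSq = 0.0;
    for (double[] pair : actualVsPredicted) {
      double diff = pair[0] - pair[1];
      sumSq += diff * diff; // squared error per pair
    }
    double rmse = Math.sqrt(sumSq / actualVsPredicted.length);
    System.out.println(rmse); // sqrt((0.25 + 0.25 + 1.0) / 3) ≈ 0.7071
  }
}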
Project: ViraPipe    File: NormalizeRDD.java
private static JavaRDD<String> getUniqueKmers(JavaPairRDD<Text, SequencedFragment> fastqRDD, int k) {
  JavaRDD<String> rdd = fastqRDD.mapPartitions(records -> {

    HashSet<String> umer_set = new HashSet<String>();
    while (records.hasNext()) {
      Tuple2<Text, SequencedFragment> fastq = records.next();
      String seq = fastq._2.getSequence().toString();
      //HashSet<String> umer_in_seq = new HashSet<String>();
      for (int i = 0; i <= seq.length() - k; i++) { // slide over every k-mer in the read
        String kmer = seq.substring(i, i + k);
        umer_set.add(kmer);
      }
    }
    return umer_set.iterator();
  });

  JavaRDD<String> umersRDD = rdd.distinct();
  //umersRDD.sortBy(s -> s, true, 4);
  return umersRDD;
}
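A tiny, self-contained illustration of the k-mer extraction loop above (hypothetical sequence, k = 3):

import java.util.HashSet;
import java.util.Set;

public class KmerExample {
  public static void main(String[] args) {
    String seq = "ACGTAC";
    int k = 3;
    Set<String> kmers = new HashSet<>();
    // Slide a window of length k over the sequence; the set removes duplicates.
    for (int i = 0; i <= seq.length() - k; i++) {
      kmers.add(seq.substring(i, i + k));
    }
    System.out.println(kmers); // e.g. [ACG, CGT, GTA, TAC] (set order not guaranteed)
  }
}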
Project: ViraPipe    File: SamToFastq.java
private static JavaPairRDD<Text, SequencedFragment> mapSAMRecordsToFastq(JavaRDD<SAMRecord> bamRDD) {

    JavaPairRDD<Text, SequencedFragment> fastqRDD = bamRDD.mapToPair(read -> {

      String name = read.getReadName();
      if(read.getReadPairedFlag()){
        if(read.getFirstOfPairFlag())
          name = name+"/1";
        if(read.getSecondOfPairFlag())
          name = name+"/2";
      }

      //TODO: check values
      Text t = new Text(name);
      SequencedFragment sf = new SequencedFragment();
      sf.setSequence(new Text(read.getReadString()));
      sf.setQuality(new Text(read.getBaseQualityString()));

      return new Tuple2<Text, SequencedFragment>(t, sf);
    });
    return fastqRDD;
  }
Project: Apache-Spark-2x-for-Java-Developers    File: WordCount.java
public static void wordCountJava8( String filename )
{
    // Define a configuration to use to interact with Spark
    SparkConf conf = new SparkConf().setMaster("local").setAppName("Work Count App");

    // Create a Java version of the Spark Context from the configuration
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load the input data, which is a text file read from the command line
    JavaRDD<String> input = sc.textFile( filename );

    // Java 8 with lambdas: split the input string into words
    // Note: in Spark 2.x, FlatMapFunction returns an Iterator, hence the .iterator() call below
    JavaRDD<String> words = input.flatMap( s -> Arrays.asList( s.split( " " ) ).iterator() );

    // Java 8 with lambdas: transform the collection of words into pairs (word and 1) and then count them
    JavaPairRDD<String, Integer> counts = words.mapToPair( t -> new Tuple2<>( t, 1 ) ).reduceByKey( (x, y) -> x + y );

    // Save the word count back out to a text file, causing evaluation.
    counts.saveAsTextFile( "output" );
}
Project: Apache-Spark-2x-for-Java-Developers    File: SparkWordCount.java
public static void main(String[] args) throws Exception {
    System.out.println(System.getProperty("hadoop.home.dir"));
    String inputPath = args[0];
    String outputPath = args[1];
    FileUtils.deleteQuietly(new File(outputPath));

    JavaSparkContext sc = new JavaSparkContext("local", "sparkwordcount");

    JavaRDD<String> rdd = sc.textFile(inputPath);

    JavaPairRDD<String, Integer> counts = rdd
            .flatMap(x -> Arrays.asList(x.split(" ")).iterator())
            .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
            .reduceByKey((x, y) -> x + y);

    counts.saveAsTextFile(outputPath);
    sc.close();
}
Project: rdf2x    File: InstancePartitioner.java
/**
 * Partition instances by the specified partitioning (e.g. by instance type)
 *
 * @param instances RDD of instances to partition
 * @return partitioned RDD if requested, original RDD if no partitioning is specified
 */
public JavaRDD<Instance> partition(JavaRDD<Instance> instances) {
    if (!config.isRepartitionByType()) {
        return instances;
    }
    log.info("Getting counts by type hash");
    Map<Integer, Long> typeCounts = getApproximateTypeHashCounts(instances);
    int numPartitions = instances.getNumPartitions();
    long totalInstances = instances.count();
    long instancesPerPartition = totalInstances / numPartitions + 1;

    JavaPairRDD<Integer, Instance> instanceWithPartitions = instances.mapToPair(instance -> {
        int typeHash = getTypeHash(instance);
        int splitIncrement = getSplitIncrement(instance.getId(), typeCounts.get(typeHash), instancesPerPartition);
        return new Tuple2<>(typeHash + splitIncrement, instance);
    });

    log.info("Partitioning instances by type");
    return instanceWithPartitions
            .partitionBy(new HashPartitioner(numPartitions))
            .values();
}
Project: incubator-sdap-mudrod    File: RDDUtil.java
/**
 * getAllWordsInDoc: Extracts all unique terms from all docs.
 *
 * @param docwordRDD Pair RDD, each key is a doc, and value is term list extracted from
 *                   that doc.
 * @return unique term list
 */
public static JavaRDD<String> getAllWordsInDoc(JavaPairRDD<String, List<String>> docwordRDD) {
  JavaRDD<String> wordRDD = docwordRDD.values().flatMap(new FlatMapFunction<List<String>, String>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Iterator<String> call(List<String> list) {
      return list.iterator();
    }
  }).distinct();

  return wordRDD;
}
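With Java 8 lambdas the same transformation can be written more compactly; a sketch of an equivalent form with the same semantics as the method above:

import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

public class RDDUtilLambda {
  public static JavaRDD<String> getAllWordsInDoc(JavaPairRDD<String, List<String>> docwordRDD) {
    return docwordRDD.values()
        .flatMap(List::iterator) // flatten each doc's term list into individual terms
        .distinct();             // keep each distinct term once
  }
}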
Project: incubator-sdap-mudrod    File: MatrixGenerator.java
/**
 * Generate a CSV which is a term-metadata matrix generated from the original
 * metadata.
 *
 * @see DiscoveryStepAbstract#execute()
 */
@Override
public Object execute() {
  LOG.info("Metadata matrix started");
  startTime = System.currentTimeMillis();

  String metadataMatrixFile = props.getProperty("metadataMatrix");
  try {
    MetadataExtractor extractor = new MetadataExtractor();
    JavaPairRDD<String, List<String>> metadataTermsRDD = extractor.loadMetadata(this.es, this.spark.sc, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
    LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(metadataTermsRDD);
    MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys, wordDocMatrix.colkeys, metadataMatrixFile);

  } catch (Exception e) {
    LOG.error("Error during Metadata matrix generaion: {}", e);
  }

  endTime = System.currentTimeMillis();
  LOG.info("Metadata matrix finished time elapsed: {}s", (endTime - startTime) / 1000);
  return null;
}
Project: oryx2    File: ALSUpdate.java
private static RDD<Tuple2<Object,double[]>> readAndConvertFeatureRDD(
    JavaPairRDD<String,float[]> javaRDD,
    Broadcast<Map<String,Integer>> bIdToIndex) {

  RDD<Tuple2<Integer,double[]>> scalaRDD = javaRDD.mapToPair(t ->
      new Tuple2<>(bIdToIndex.value().get(t._1()), t._2())
  ).mapValues(f -> {
      double[] d = new double[f.length];
      for (int i = 0; i < d.length; i++) {
        d[i] = f[i];
      }
      return d;
    }
  ).rdd();

  // This mimics the persistence level established by the ALS training methods
  scalaRDD.persist(StorageLevel.MEMORY_AND_DISK());

  @SuppressWarnings("unchecked")
  RDD<Tuple2<Object,double[]>> objKeyRDD = (RDD<Tuple2<Object,double[]>>) (RDD<?>) scalaRDD;
  return objKeyRDD;
}
Project: incubator-sdap-mudrod    File: SVDAnalyzer.java
/**
 * GetSVDMatrix: Create SVD matrix csv file from original csv file.
 *
 * @param csvFileName       each row is a term, and each column is a document.
 * @param svdDimention      Dimension of SVD matrix
 * @param svdMatrixFileName CSV file name of SVD matrix
 */
public void getSVDMatrix(String csvFileName, int svdDimention, String svdMatrixFileName) {

  JavaPairRDD<String, Vector> importRDD = MatrixUtil.loadVectorFromCSV(spark, csvFileName, 1);
  JavaRDD<Vector> vectorRDD = importRDD.values();
  RowMatrix wordDocMatrix = new RowMatrix(vectorRDD.rdd());
  RowMatrix tfidfMatrix = MatrixUtil.createTFIDFMatrix(wordDocMatrix);
  RowMatrix svdMatrix = MatrixUtil.buildSVDMatrix(tfidfMatrix, svdDimention);

  List<String> rowKeys = importRDD.keys().collect();
  List<String> colKeys = new ArrayList<>();
  for (int i = 0; i < svdDimention; i++) {
    colKeys.add("dimension" + i);
  }
  MatrixUtil.exportToCSV(svdMatrix, rowKeys, colKeys, svdMatrixFileName);
}
Project: MinoanER    File: LabelMatchingHeuristic.java
/**
 * Return an RDD with label objects as keys and, as values, the ids of entities from a single collection having this label
 * @param inputTriples the raw RDF triples of this entity collection, one triple per line
 * @param labelAtts the set of attributes (predicates) whose values are labels
 * @param entityIds the mapping of entity urls to entity ids, as it was used in blocking
 * @param SEPARATOR the delimiter that separates subjects, predicates and objects in the inputTriples lines
 * @param positiveIds true if the entity ids of this collection are positive, false if they should be negated
 * @return an RDD of (label value, entity id) pairs
 */
private JavaPairRDD<String,Integer> getLabelBlocks(JavaRDD<String> inputTriples, Set<String> labelAtts, JavaRDD<String> entityIds, String SEPARATOR, boolean positiveIds) {
    Object2IntOpenHashMap<String> urls1 = Utils.readEntityIdsMapping(entityIds, positiveIds);
    return inputTriples.mapToPair(line -> {
    String[] spo = line.toLowerCase().replaceAll(" \\.$", "").split(SEPARATOR); //lose the ending " ." from valid .nt files
      if (spo.length < 3) {
          return null;
      }          
      if (labelAtts.contains(spo[1])) {
        String labelValue = line.substring(line.indexOf(spo[1])+spo[1].length()+SEPARATOR.length())
                .toLowerCase().replaceAll("[^a-z0-9 ]", "").trim();
        int subjectId = urls1.getInt(Utils.encodeURIinUTF8(spo[0])); //replace subject url with entity id
        if (!positiveIds) {
            subjectId = -subjectId;
        }
        return new Tuple2<String,Integer>(labelValue,subjectId);
      } else {
          return null;
      }          
    })
    .filter(x-> x!= null)
    .distinct();
}
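A small standalone illustration of the label-extraction string handling above, using a hypothetical N-Triples line and a single-space SEPARATOR (the entity-id lookup via Utils.readEntityIdsMapping is omitted):

public class LabelParseExample {
  public static void main(String[] args) {
    String SEPARATOR = " ";
    String line = "<http://example.org/e1> <http://www.w3.org/2000/01/rdf-schema#label> \"Alan Turing\" .";
    // Lose the ending " ." and split into subject / predicate / object (the object may contain spaces).
    String[] spo = line.toLowerCase().replaceAll(" \\.$", "").split(SEPARATOR);
    // Everything after the predicate is treated as the label value, normalized to [a-z0-9 ].
    String labelValue = line.substring(line.indexOf(spo[1]) + spo[1].length() + SEPARATOR.length())
            .toLowerCase().replaceAll("[^a-z0-9 ]", "").trim();
    System.out.println(labelValue); // prints: alan turing
  }
}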
Project: MinoanER    File: CNPNeighborsUnnormalized.java
/**
 * 
 * @param topKvalueCandidates the topK results per entity, acquired from value similarity
 * @param rawTriples1 the rdf triples of the first entity collection
 * @param rawTriples2 the rdf triples of the second entity collection
 * @param SEPARATOR the delimiter that separates subjects, predicates and objects in the rawTriples1 and rawTriples2 files
 * @param entityIds1 the mapping of entity urls to entity ids for the first collection, as it was used in blocking
 * @param entityIds2 the mapping of entity urls to entity ids for the second collection, as it was used in blocking
 * @param MIN_SUPPORT_THRESHOLD the minimum support threshold, below which, relations are discarded from top relations
 * @param K the K for topK candidate matches
 * @param N the N for topN rdf relations (and neighbors)
 * @param jsc the java spark context used to load files and broadcast variables
 * @return topK neighbor candidates per entity
 */
public JavaPairRDD<Integer, IntArrayList> run(JavaPairRDD<Integer,Int2FloatLinkedOpenHashMap> topKvalueCandidates, 
        JavaRDD<String> rawTriples1, 
        JavaRDD<String> rawTriples2,             
        String SEPARATOR, 
        JavaRDD<String> entityIds1, 
        JavaRDD<String> entityIds2, 
        float MIN_SUPPORT_THRESHOLD,
        int K,
        int N, 
        JavaSparkContext jsc) {

    Map<Integer,IntArrayList> inNeighbors = new HashMap<>(new RelationsRank().run(rawTriples1, SEPARATOR, entityIds1, MIN_SUPPORT_THRESHOLD, N, true, jsc));
    inNeighbors.putAll(new RelationsRank().run(rawTriples2, SEPARATOR, entityIds2, MIN_SUPPORT_THRESHOLD, N, false, jsc));

    Broadcast<Map<Integer,IntArrayList>> inNeighbors_BV = jsc.broadcast(inNeighbors);

    //JavaPairRDD<Integer, IntArrayList> topKneighborCandidates =  getTopKNeighborSims(topKvalueCandidates, inNeighbors_BV, K);        
    JavaPairRDD<Integer, IntArrayList> topKneighborCandidates =  getTopKNeighborSimsSUM(topKvalueCandidates, inNeighbors_BV, K);        
    return topKneighborCandidates;
}
Project: oryx2    File: SaveToHDFSFunction.java
@Override
public void call(JavaPairRDD<K,M> rdd, Time time) throws IOException {
  if (rdd.isEmpty()) {
    log.info("RDD was empty, not saving to HDFS");
  } else {
    String file = prefix + "-" + time.milliseconds() + "." + suffix;
    Path path = new Path(file);
    FileSystem fs = FileSystem.get(path.toUri(), hadoopConf);
    if (fs.exists(path)) {
      log.warn("Saved data already existed, possibly from a failed job. Deleting {}", path);
      fs.delete(path, true);
    }
    log.info("Saving RDD to HDFS at {}", file);
    rdd.mapToPair(
        new ValueToWritableFunction<>(keyClass, messageClass, keyWritableClass, messageWritableClass)
    ).saveAsNewAPIHadoopFile(
        file,
        keyWritableClass,
        messageWritableClass,
        SequenceFileOutputFormat.class,
        hadoopConf);
  }
}
Project: MinoanER    File: BlockFilteringAdvanced.java
public JavaPairRDD<Integer,IntArrayList> parseBlockCollection(JavaRDD<String> blockingInput) {
    System.out.println("Parsing the blocking collection...");
    return blockingInput
        .map(line -> line.split("\t")) //split to [blockId, [entityIds]]
        .filter(line -> line.length == 2) //only keep lines of this format
        .mapToPair(pair -> {                
            int blockId = Integer.parseInt(pair[0]);
            String[] entities = pair[1].replaceFirst(";", "").split("#");
            if (entities == null || entities.length == 0) {
                return null;
            }
            List<Integer> outputEntities = new ArrayList<>(); //possible (but not really probable) cause of OOM (memory errors) if huge blocks exist
            for (String entity : entities) {
                if (entity.isEmpty()) continue; //in case the last entityId finishes with '#'
                Integer entityId = Integer.parseInt(entity);                                
                outputEntities.add(entityId);
            }
            return new Tuple2<>(blockId, new IntArrayList(outputEntities.stream().mapToInt(i->i).toArray()));
        })
        .filter(x -> x != null);
}
Project: MinoanER    File: BlockingEvaluation.java
/**
 * Compute precision, recall, f-measure of the input results, given the ground truth. 
 * The input RDDs should be in the same format (negative entity Id, positive entity Id).
 * @param blockingResults the blocking results in the form (-entityId, +entityId)
 * @param groundTruth the ground truth in the form (-entityId, +entityId)
 * @param TPs true positives to update (true matches)
 * @param FPs false positives to update (false matches)
 * @param FNs false negatives to update (missed matches)
 * @param verbose if true, print details about missed and false matches
 */
public void evaluateBlockingResults(JavaPairRDD<Integer,IntArrayList> blockingResults, JavaPairRDD<Integer,Integer> groundTruth, LongAccumulator TPs, LongAccumulator FPs, LongAccumulator FNs, boolean verbose) {
    blockingResults
            .fullOuterJoin(groundTruth)
            .foreach(joinedMatch -> {
                IntArrayList myCandidates = joinedMatch._2()._1().orElse(null);
                Integer correctResult = joinedMatch._2()._2().orElse(null);
                if (myCandidates == null) { //this means that the correct result is not null (otherwise, nothing to join here)
                    FNs.add(1); //missed match
                    if (verbose) {
                        System.out.println("FN: Did not provide any match for "+joinedMatch._1());
                    }
                } else if (correctResult == null) {
                    FPs.add(myCandidates.size()); //each candidate is a false match (no candidate should exist)
                } else if (myCandidates.contains(correctResult)) {
                    TPs.add(1); //true match
                    FPs.add(myCandidates.size()-1); //the rest are false matches (ideal: only one candidate suggested)
                } else {        //then the correct result is not included in my candidates => I missed this match and all my candidates are wrong
                    FPs.add(myCandidates.size()); //all my candidates were false 
                    FNs.add(1); //the correct match was missed
                    if (verbose) {
                        System.out.println("FN: Provided false matches "+myCandidates+" for "+joinedMatch._1()+". The correct results was "+correctResult);
                    }
                }                    
            });
}
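The method only fills the TP/FP/FN accumulators; the precision, recall, and f-measure mentioned in the javadoc can then be derived from them on the driver. A sketch, assuming the accumulators have been populated by the call above (guard against zero denominators in real code):

long tp = TPs.value(), fp = FPs.value(), fn = FNs.value();
double precision = tp / (double) (tp + fp);
double recall = tp / (double) (tp + fn);
double fMeasure = 2 * precision * recall / (precision + recall);
System.out.printf("precision=%.4f, recall=%.4f, f-measure=%.4f%n", precision, recall, fMeasure);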
Project: MinoanER    File: EvaluateMatchingResults.java
/**
 * Compute precision, recall, f-measure of the input results, given the ground truth. 
 * The input RDDs should be in the same format (negative entity Id, positive entity Id). 
 * This is a void method, as it only changes the accumulator values. 
 * @param results the matching results in the form (-entityId, +entityId)
 * @param groundTruth the ground truth in the form (-entityId, +entityId)
 * @param TPs true positives to update (true matches)
 * @param FPs false positives to update (false matches)
 * @param FNs false negatives to update (missed matches)
 */
public void evaluateResults(JavaPairRDD<Integer,Integer> results, JavaPairRDD<Integer,Integer> groundTruth, LongAccumulator TPs, LongAccumulator FPs, LongAccumulator FNs) {
    results
            .fullOuterJoin(groundTruth)
            .foreach(joinedMatch -> {
                Integer myResult = joinedMatch._2()._1().orElse(null);
                Integer correctResult = joinedMatch._2()._2().orElse(null);
                if (myResult == null) {
                    FNs.add(1); //missed match
                } else if (correctResult == null) {
                    FPs.add(1); //wrong match
                } else if (myResult.equals(correctResult)) {
                    TPs.add(1); //true match
                } else {        //then I gave a different result than the correct match
                    FPs.add(1); //my result was wrong 
                    FNs.add(1); //the correct match was missed
                }                    
            });
}
Project: MinoanER    File: EvaluateMatchingResults.java
/**
 * Compute precision, recall, f-measure of the input results, given the ground truth. 
 * The input RDDs should be in the same format (negative entity Id, positive entity Id). 
 * This is a void method, as it only changes the accumulator values. 
 * @param results the matching results in the form (-entityId, +entityId)
 * @param groundTruth the ground truth in the form (-entityId, +entityId)
 * @param TPs true positives to update (true matches)
 * @param FPs false positives to update (false matches)
 * @param FNs false negatives to update (missed matches)
 */
public void evaluateResultsNEW(JavaPairRDD<Integer,Integer> results, JavaPairRDD<Integer,Integer> groundTruth, LongAccumulator TPs, LongAccumulator FPs, LongAccumulator FNs) {
    results
            .rightOuterJoin(groundTruth)
            .foreach(joinedMatch -> {
                Integer myResult = joinedMatch._2()._1().orElse(null);
                Integer correctResult = joinedMatch._2()._2();
                if (myResult == null) {
                    FNs.add(1); //missed match
                /*} else if (correctResult == null) {
                    FPs.add(1); //wrong match
                    */
                } else if (myResult.equals(correctResult)) {
                    TPs.add(1); //true match
                } else {        //then I gave a different result than the correct match
                    FPs.add(1); //my result was wrong 
                    FNs.add(1); //the correct match was missed
                }                    
            });
}
Project: MinoanER    File: BlockFilteringAdvancedTest.java
/**
 * Test of parseBlockCollection method, of class BlockFilteringAdvanced.
 */
@Test
public void testParseBlockCollection() {
    System.out.println("parseBlockCollection");
    List<String> dummyBlocks = new ArrayList<>();
    dummyBlocks.add("0\t1#2#3#4#5#;-1#-2#-3#-4#-5#");
    dummyBlocks.add("1\t3#4#5#;-1#-5#");
    dummyBlocks.add("2\t5#;-5#");
    dummyBlocks.add("3\t5#;");
    JavaRDD<String> blockingInput = jsc.parallelize(dummyBlocks);
    BlockFilteringAdvanced instance = new BlockFilteringAdvanced();        
    JavaPairRDD<Integer, IntArrayList> result = instance.parseBlockCollection(blockingInput);

    List<Tuple2<Integer,IntArrayList>> dummyBlocksParsed = new ArrayList<>();
    dummyBlocksParsed.add(new Tuple2<>(0, new IntArrayList(new int[]{1,2,3,4,5,-1,-2,-3,-4,-5})));
    dummyBlocksParsed.add(new Tuple2<>(1, new IntArrayList(new int[]{3,4,5,-1,-5})));
    dummyBlocksParsed.add(new Tuple2<>(2, new IntArrayList(new int[]{5,-5})));
    dummyBlocksParsed.add(new Tuple2<>(3, new IntArrayList(new int[]{5})));
    JavaPairRDD<Integer, IntArrayList> expResult = jsc.parallelizePairs(dummyBlocksParsed);

    List<Tuple2<Integer, IntArrayList>> resultList = result.collect();
    List<Tuple2<Integer, IntArrayList>> expResultList = expResult.collect();
    System.out.println("Result: "+Arrays.toString(resultList.toArray()));
    System.out.println("Expect: "+Arrays.toString(expResultList.toArray()));
    assertEquals(resultList, expResultList);
}
Project: gcp    File: BigQueryHelper.java
public static <X> VoidFunction<JavaPairRDD<X, JsonObject>> outputTo(String table, String schema) throws IOException {
  Configuration conf = new Configuration();
  conf.set("mapreduce.job.outputformat.class", BigQueryOutputFormat.class.getName());
  BigQueryConfiguration.configureBigQueryOutput(conf, table, schema);

  return rdd -> {
    long rows = rdd.count(); // count once: evaluating the RDD size triggers a full job
    if (rows > 0L) {
      long time = System.currentTimeMillis();
      /* This was only required the first time on a fresh table: it seems I had to kickstart the _PARTITIONTIME pseudo-column,
       * but now rows are automatically added to the proper table using ingestion time. Using the decorator would only be required
       * if we were to place the entries using their "event timestamp", e.g. loading rows on old partitions.
       * Implementing that would be much harder though, since we'd have to check each message, or each "partition" (date-based)
      if (partitioned) {
        String today = ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
        BigQueryConfiguration.configureBigQueryOutput(conf, table + "$" + today, schema);
      }*/
      rdd.saveAsNewAPIHadoopDataset(conf);
      System.out.printf("Sent %d rows to BQ in %.1fs\n", rdd.count(), (System.currentTimeMillis() - time) / 1000f);
    }
  };
}
Project: spark-dependencies    File: DependenciesSparkHelper.java
/**
 * Derives dependency links based on supplied spans (e.g. multiple traces). If there is a link A->B
 * in multiple traces it will return just one {@link Dependency} link with a correct {@link Dependency#callCount}.
 *
 * @param traceIdSpans <traceId, trace> {@link org.apache.spark.api.java.JavaPairRDD} with trace id and a collection of
 *                     spans with that traceId.
 * @return Aggregated dependency links for all traces.
 */
public static List<Dependency> derive(JavaPairRDD<String, Iterable<Span>> traceIdSpans) {
  return traceIdSpans.flatMapValues(new SpansToDependencyLinks())
      .values()
      .mapToPair(dependency -> new Tuple2<>(new Tuple2<>(dependency.getParent(), dependency.getChild()), dependency))
      .reduceByKey((v1, v2) -> new Dependency(v1.getParent(), v1.getChild(), v1.getCallCount() + v2.getCallCount()))
      .values()
      .collect();
}
Project: ViraPipe    File: RepartitionFastq.java
public static void main(String[] args) throws IOException {

        if (args.length < 3) {
            System.err.println("Usage: RepartitionFastq <input path> <output path> <number of partitions>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf().setAppName("RepartitionFastq");
        //conf.set("spark.default.parallelism", String.valueOf(args[2]));
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<Text, SequencedFragment> fastqRDD = sc.newAPIHadoopFile(args[0], FastqInputFormat.class, Text.class, SequencedFragment.class, sc.hadoopConfiguration());

        JavaPairRDD<Text, SequencedFragment> repartitioned = fastqRDD.repartition(Integer.valueOf(args[2]));

        repartitioned.saveAsNewAPIHadoopFile(args[1], Text.class, SequencedFragment.class, FastqOutputFormat.class, sc.hadoopConfiguration());

        sc.stop();
    }
Project: athena    File: SVMDistJob.java
public SVMModel generateDecisionTreeWithPreprocessing(JavaPairRDD<Object, BSONObject> mongoRDD,
                                                               AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                                                               SVMDetectionAlgorithm SVMDetectionAlgorithm,
                                                               Marking marking,
                                                               SVMModelSummary SVMModelSummary) {

    return generateKMeansModel(
            rddPreProcessing(mongoRDD, athenaMLFeatureConfiguration, SVMModelSummary,
                    marking),
            SVMDetectionAlgorithm, SVMModelSummary
    );
}
Project: ViraPipe    File: Interleave.java
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {

    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

    zips.foreach( splits ->  {
      Path path = splits._1.getPath();
      FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
      FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
      writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir+"/"+path.getParent().getName()+"_"+splits._1.getStart()+".fq");
    });
  }
Project: ViraPipe    File: HDFSWriter.java
private static JavaPairRDD<Text, SequencedFragment> alignmentsToFastq(JavaRDD<String> alignmentRDD, SAMFileHeader header) {
    return alignmentRDD.mapPartitionsToPair(alns -> {

        List<Tuple2<Text, SequencedFragment>> records = new ArrayList<Tuple2<Text, SequencedFragment>>();
        final SAMLineParser samLP = new SAMLineParser(new DefaultSAMRecordFactory(), ValidationStringency.SILENT, header, null, null);
        while (alns.hasNext()) {
            String aln = alns.next().replace("\r\n", "").replace("\n", "").replace(System.lineSeparator(), "");
            try{
                SAMRecord sam = samLP.parseLine(aln);
                String[] fields = aln.split("\\t");
                String name = fields[0];
                if(sam.getReadPairedFlag()){
                    if(sam.getFirstOfPairFlag())
                        name = name+"/1";
                    if(sam.getSecondOfPairFlag())
                        name = name+"/2";
                }

                String bases = fields[9];
                String quality = fields[10];

                Text t = new Text(name);
                SequencedFragment sf = new SequencedFragment();
                sf.setSequence(new Text(bases));
                sf.setQuality(new Text(quality));
                records.add(new Tuple2<Text, SequencedFragment>(t, sf));
            }catch(SAMFormatException e){
                System.out.println(e.getMessage());
            }
        }
        return records.iterator();
    });
}
Project: athena    File: LinearRegressionDistJob.java
public LinearRegressionModel generateDecisionTreeWithPreprocessing(JavaPairRDD<Object, BSONObject> mongoRDD,
                                                                   AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                                                                   LinearRegressionDetectionAlgorithm linearRegressionDetectionAlgorithm,
                                                                   Marking marking,
                                                                   LinearRegressionModelSummary linearRegressionModelSummary) {

    return generateKMeansModel(
            rddPreProcessing(mongoRDD, athenaMLFeatureConfiguration, linearRegressionModelSummary,
                    marking),
            linearRegressionDetectionAlgorithm, linearRegressionModelSummary
    );
}
Project: athena    File: KMeansDistJob.java
public KMeansModel generateKmeansWithPreprocessing(JavaPairRDD<Object, BSONObject> mongoRDD,
                                                   AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                                                   KMeansDetectionAlgorithm kMeansDetectionAlgorithm,
                                                   KmeansModelSummary kmeansModelSummary) {
    return generateKMeansModel(
            rddPreProcessing(mongoRDD, athenaMLFeatureConfiguration, kmeansModelSummary),
            kMeansDetectionAlgorithm, kmeansModelSummary
    );
}
Project: spark-dependencies    File: ElasticsearchDependenciesJob.java
void run(String spanResource, String depResource) {
  log.info("Running Dependencies job for {}, reading from {} index, result storing to {}", day, spanResource ,depResource);
  JavaSparkContext sc = new JavaSparkContext(conf);
  try {
    JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanResource)
        .map(new ElasticTupleToSpan())
        .groupBy(Span::getTraceId);

    List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces);
    store(sc, dependencyLinks, depResource);
    log.info("Done, {} dependency objects created", dependencyLinks.size());
  } finally {
    sc.stop();
  }
}
Project: ViraPipe    File: HDFSWriter.java
public static JavaPairRDD<SAMRecord, SAMRecordWritable> readsToWritableNoHeader(JavaRDD<SAMRecord> records) {
    return records.mapToPair(read -> {
        final SAMRecordWritable samRecordWritable = new SAMRecordWritable();
        samRecordWritable.set(read);
        return new Tuple2<>(read, samRecordWritable);
    });
}
Project: Apache-Spark-2x-for-Java-Developers    File: WordCountTransformOpEx.java
public static void main(String[] args) throws Exception {

      System.setProperty("hadoop.home.dir", "E:\\hadoop");

   SparkConf sparkConf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
   JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
   Logger rootLogger = LogManager.getRootLogger();
        rootLogger.setLevel(Level.WARN); 
   List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 10), new Tuple2<>("world", 10));
   JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);


   JavaReceiverInputDStream<String> StreamingLines = streamingContext.socketTextStream( "10.0.75.1", Integer.parseInt("9000"), StorageLevels.MEMORY_AND_DISK_SER);

   JavaDStream<String> words = StreamingLines.flatMap( str -> Arrays.asList(str.split(" ")).iterator() );

   JavaPairDStream<String, Integer> wordCounts = words.mapToPair(str-> new Tuple2<>(str, 1)).reduceByKey((count1,count2) ->count1+count2 );

   wordCounts.print();

JavaPairDStream<String, Integer> joinedDstream = wordCounts
        .transformToPair(new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
            @Override
            public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
                JavaPairRDD<String, Integer> modRDD = rdd.join(initialRDD).mapToPair(
                        new PairFunction<Tuple2<String, Tuple2<Integer, Integer>>, String, Integer>() {
                            @Override
                            public Tuple2<String, Integer> call(
                                    Tuple2<String, Tuple2<Integer, Integer>> joinedTuple) throws Exception {
                                return new Tuple2<>(joinedTuple._1(),(joinedTuple._2()._1() + joinedTuple._2()._2()));
                            }
                        });
                return modRDD;
            }
        });

   joinedDstream.print();
   streamingContext.start();
   streamingContext.awaitTermination();
 }
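The anonymous Function / PairFunction pair in the transformToPair step above can be collapsed into lambdas; an equivalent sketch, reusing wordCounts and initialRDD from the snippet:

JavaPairDStream<String, Integer> joinedDstream = wordCounts.transformToPair(
    rdd -> rdd.join(initialRDD)
              // join yields (word, (streamCount, initialCount)); sum the two counts
              .mapToPair(t -> new Tuple2<>(t._1(), t._2()._1() + t._2()._2())));
joinedDstream.print();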
Project: athena    File: GradientBoostedTreesDistJob.java
public GradientBoostedTreesModel generateDecisionTreeWithPreprocessing(JavaPairRDD<Object, BSONObject> mongoRDD,
                                                               AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                                                               GradientBoostedTreesDetectionAlgorithm gradientBoostedTreesDetectionAlgorithm,
                                                               Marking marking,
                                                               GradientBoostedTreesModelSummary gradientBoostedTreesModelSummary) {

    return generateKMeansModel(
            rddPreProcessing(mongoRDD, athenaMLFeatureConfiguration, gradientBoostedTreesModelSummary,
                    marking),
            gradientBoostedTreesDetectionAlgorithm, gradientBoostedTreesModelSummary
    );
}
Project: oryx2    File: Evaluation.java
private static JavaPairRDD<Integer,Iterable<Rating>> predictAll(
    MatrixFactorizationModel mfModel,
    JavaRDD<Rating> data,
    JavaPairRDD<Integer,Integer> userProducts) {
  @SuppressWarnings("unchecked")
  RDD<Tuple2<Object,Object>> userProductsRDD =
      (RDD<Tuple2<Object,Object>>) (RDD<?>) userProducts.rdd();
  return data.wrapRDD(mfModel.predict(userProductsRDD)).groupBy(Rating::user);
}
Project: Apache-Spark-2x-for-Java-Developers    File: WordCountRecoverableEx.java
protected static JavaStreamingContext createContext(String ip, int port, String checkpointDirectory) {
    SparkConf sparkConf = new SparkConf().setAppName("WordCountRecoverableEx").setMaster("local[*]");
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    streamingContext.checkpoint(checkpointDirectory);
    // Initial state RDD input to mapWithState
    @SuppressWarnings("unchecked")
    List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);

    JavaReceiverInputDStream<String> StreamingLines = streamingContext.socketTextStream(ip,port, StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> words = StreamingLines.flatMap(str -> Arrays.asList(str.split(" ")).iterator());

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(str -> new Tuple2<>(str, 1))
            .reduceByKey((count1, count2) -> count1 + count2);

    // Update the cumulative count function
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
            state.update(sum);
            return output;
        }
    };

    // DStream of cumulative counts that get updated in every batch
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream = wordCounts
            .mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));

    stateDstream.print();
    return streamingContext;
}
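For reference, the anonymous Function3 used as mappingFunc above can equivalently be written as a lambda; a sketch with the same semantics:

Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
        (word, one, state) -> {
            // add the new count for this batch to the running total kept in state
            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
            state.update(sum);
            return output;
        };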
Project: oryx2    File: ALSUpdate.java
@Override
public void publishAdditionalModelData(JavaSparkContext sparkContext,
                                       PMML pmml,
                                       JavaRDD<String> newData,
                                       JavaRDD<String> pastData,
                                       Path modelParentPath,
                                       TopicProducer<String, String> modelUpdateTopic) {
  // Send item updates first, before users. That way, user-based endpoints like /recommend
  // may return 404 for a little longer, but once they do respond, the results will be more complete.
  log.info("Sending item / Y data as model updates");
  String yPathString = AppPMMLUtils.getExtensionValue(pmml, "Y");
  JavaPairRDD<String,float[]> productRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, yPathString));

  String updateBroker = modelUpdateTopic.getUpdateBroker();
  String topic = modelUpdateTopic.getTopic();

  // For now, there is no use in sending known users for each item
  productRDD.foreachPartition(new EnqueueFeatureVecsFn("Y", updateBroker, topic));

  log.info("Sending user / X data as model updates");
  String xPathString = AppPMMLUtils.getExtensionValue(pmml, "X");
  JavaPairRDD<String,float[]> userRDD = readFeaturesRDD(sparkContext, new Path(modelParentPath, xPathString));

  if (noKnownItems) {
    userRDD.foreachPartition(new EnqueueFeatureVecsFn("X", updateBroker, topic));
  } else {
    log.info("Sending known item data with model updates");
    JavaRDD<String[]> allData =
        (pastData == null ? newData : newData.union(pastData)).map(MLFunctions.PARSE_FN);
    JavaPairRDD<String,Collection<String>> knownItems = knownsRDD(allData, true);
    userRDD.join(knownItems).foreachPartition(
        new EnqueueFeatureVecsAndKnownItemsFn("X", updateBroker, topic));
  }
}
Project: Apache-Spark-2x-for-Java-Developers    File: MapSideJoinBroadcast.java
public static void main(String[] args) {

        SparkSession sparkSession = SparkSession.builder().master("local").appName("My App")
                .config("spark.sql.warehouse.dir", "file:////C:/Users/sgulati/spark-warehouse").getOrCreate();

        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());

        JavaPairRDD<String, String> userIdToCityId = jsc.parallelizePairs(
                Arrays.asList(new Tuple2<String, String>("1", "101"), new Tuple2<String, String>("2", "102"),
                        new Tuple2<String, String>("3", "107"), new Tuple2<String, String>("4", "103"),
                        new Tuple2<String, String>("11", "101"), new Tuple2<String, String>("12", "102"),
                        new Tuple2<String, String>("13", "107"), new Tuple2<String, String>("14", "103")));

        JavaPairRDD<String, String> cityIdToCityName = jsc.parallelizePairs(
                Arrays.asList(new Tuple2<String, String>("101", "India"), new Tuple2<String, String>("102", "UK"),
                        new Tuple2<String, String>("103", "Germany"), new Tuple2<String, String>("107", "USA")));

        Broadcast<Map<String, String>> citiesBroadcasted = jsc.broadcast(cityIdToCityName.collectAsMap());

        JavaRDD<Tuple3<String, String, String>> joined = userIdToCityId.map(
                v1 -> new Tuple3<String, String, String>(v1._1(), v1._2(), citiesBroadcasted.value().get(v1._2())));

        System.out.println(joined.collect());

    }
Project: oryx2    File: ALSUpdate.java
/**
 * Combines {@link Rating}s with the same user/item into one, with score as the sum of
 * all of the scores.
 */
private JavaRDD<Rating> aggregateScores(JavaRDD<Rating> original, double epsilon) {
  JavaPairRDD<Tuple2<Integer,Integer>,Double> tuples =
      original.mapToPair(rating -> new Tuple2<>(new Tuple2<>(rating.user(), rating.product()), rating.rating()));

  JavaPairRDD<Tuple2<Integer,Integer>,Double> aggregated;
  if (implicit) {
    // TODO can we avoid groupByKey? reduce, combine, fold don't seem viable since
    // they don't guarantee the delete elements are properly handled
    aggregated = tuples.groupByKey().mapValues(MLFunctions.SUM_WITH_NAN);
  } else {
    // For non-implicit, last wins.
    aggregated = tuples.foldByKey(Double.NaN, (current, next) -> next);
  }

  JavaPairRDD<Tuple2<Integer,Integer>,Double> noNaN =
      aggregated.filter(kv -> !Double.isNaN(kv._2()));

  if (logStrength) {
    return noNaN.map(userProductScore -> new Rating(
        userProductScore._1()._1(),
        userProductScore._1()._2(),
        Math.log1p(userProductScore._2() / epsilon)));
  } else {
    return noNaN.map(userProductScore -> new Rating(
        userProductScore._1()._1(),
        userProductScore._1()._2(),
        userProductScore._2()));
  }
}
Project: oryx2    File: MockBatchUpdate.java
private static Collection<Tuple2<String,String>> collect(JavaPairRDD<String,String> rdd) {
  if (rdd == null) {
    return Collections.emptyList();
  } else {
    return rdd.collect();
  }
}
Project: athena    File: MachineLearningManagerImpl.java
public RandomForestValidationSummary validateRandomForestAthenaFeatures(JavaSparkContext sc,
                                                                        FeatureConstraint featureConstraint,
                                                                        AthenaMLFeatureConfiguration athenaMLFeatureConfiguration,
                                                                        RandomForestDetectionModel randomForestDetectionModel,
                                                                        Indexing indexing, Marking marking) {
    long start = System.nanoTime(); // <-- start

    JavaPairRDD<Object, BSONObject> mongoRDD;
    mongoRDD = sc.newAPIHadoopRDD(
            mongodbConfig,            // Configuration
            MongoInputFormat.class,   // InputFormat: read from a live cluster.
            Object.class,             // Key class
            BSONObject.class          // Value class
    );

    RandomForestDetectionAlgorithm randomForestDetectionAlgorithm = (RandomForestDetectionAlgorithm) randomForestDetectionModel.getDetectionAlgorithm();

    RandomForestValidationSummary randomForestValidationSummary =
            new RandomForestValidationSummary(sc.sc(), randomForestDetectionAlgorithm.getNumClasses(), indexing, marking);

    RandomForestDistJob randomForestDistJob = new RandomForestDistJob();

    randomForestDistJob.validate(mongoRDD,
            athenaMLFeatureConfiguration,
            randomForestDetectionModel,
            randomForestValidationSummary);


    long end = System.nanoTime(); // <-- end
    long time = end - start;

    randomForestValidationSummary.setTotalValidationTime(time);
    return randomForestValidationSummary;
}
Project: incubator-sdap-mudrod    File: SessionCooccurence.java
/**
 * Filter out-of-date (retired) metadata.
 *
 * @param es
 *          the Elasticsearch driver
 * @param userDatasetsRDD
 *          datasets extracted from sessions
 * @return filtered session datasets
 */
public JavaPairRDD<String, List<String>> removeRetiredDataset(ESDriver es, JavaPairRDD<String, List<String>> userDatasetsRDD) {

  Map<String, String> nameMap = this.getOnServiceMetadata(es);

  return userDatasetsRDD.mapToPair(new PairFunction<Tuple2<String, List<String>>, String, List<String>>() {
    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(Tuple2<String, List<String>> arg0) throws Exception {
      List<String> oriDatasets = arg0._2;
      List<String> newDatasets = new ArrayList<>();
      int size = oriDatasets.size();
      for (int i = 0; i < size; i++) {
        String name = oriDatasets.get(i);
        if (nameMap.containsKey(name)) {
          newDatasets.add(nameMap.get(name));
        }
      }
      return new Tuple2<>(arg0._1, newDatasets);
    }
  });

}