/**
 * Uses the Hadoop API to store results.
 *
 * @param result the RDD of values to store
 */
@Override
public void store(JavaRDD<ObjectValue> result) {
    logger.info("Storing result.");

    JavaSparkContext sc = NotaQL.SparkFactory.getSparkContext();

    String mongoDBHost = "mongodb://"
            + NotaQL.prop.getProperty("mongodb_host", "localhost") + ":27017/";

    // Point the Mongo-Hadoop connector at the target database and collection.
    Configuration config = new Configuration();
    config.set("mongo.output.uri", mongoDBHost + databaseName + "." + collectionName);

    // Convert each NotaQL value into a (null, DBObject) pair for MongoOutputFormat.
    JavaPairRDD<Object, BSONObject> output = result.mapToPair(
            o -> new Tuple2<>(null, (DBObject) ValueConverter.convertFromNotaQL(o)));

    if ("true".equals(NotaQL.prop.getProperty("log_output")))
        output.foreach(t -> logger.info("Storing object: " + t._2.toString()));
    else
        logger.info("Storing objects.");

    // The file path is unused; MongoOutputFormat writes to the configured mongo.output.uri.
    output.saveAsNewAPIHadoopFile("file:///notapplicable",
            Object.class, Object.class, MongoOutputFormat.class, config);

    logger.info("Stored result.");
}
public static void writerdd(JavaPairRDD<String, Map<String, Map<String, String>>> inputRdd,
                            String mongouri) {
    // Mongo-Hadoop specific configuration
    org.apache.hadoop.conf.Configuration midbconf = new org.apache.hadoop.conf.Configuration();
    midbconf.set("mongo.output.format", "com.mongodb.hadoop.MongoOutputFormat");
    midbconf.set("mongo.output.uri", mongouri);

    // Map RDD to MongoDB objects
    JavaPairRDD<Object, BSONObject> mongordd = inputRdd.mapToPair(new basicDBMongo());

    // Update MongoDB
    mongordd.saveAsNewAPIHadoopFile("file:///notapplicable",
            Object.class, Object.class, MongoOutputFormat.class, midbconf);
}
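The basicDBMongo pair function used above is not shown in this snippet. Below is a minimal sketch of what such a PairFunction might look like, assuming it flattens the nested map of maps into a BasicDBObject keyed by the RDD's string key; the class body, field layout, and _id handling are assumptions, not the original implementation.

import java.util.Map;

import org.apache.spark.api.java.function.PairFunction;
import org.bson.BSONObject;

import com.mongodb.BasicDBObject;

import scala.Tuple2;

// Hypothetical sketch of the basicDBMongo PairFunction referenced above;
// the real implementation is not part of this snippet.
public class basicDBMongo implements
        PairFunction<Tuple2<String, Map<String, Map<String, String>>>, Object, BSONObject> {

    @Override
    public Tuple2<Object, BSONObject> call(
            Tuple2<String, Map<String, Map<String, String>>> input) {
        BasicDBObject doc = new BasicDBObject();
        doc.put("_id", input._1()); // use the RDD key as the document _id (assumed)
        for (Map.Entry<String, Map<String, String>> e : input._2().entrySet()) {
            doc.put(e.getKey(), new BasicDBObject(e.getValue())); // nested maps become sub-documents
        }
        // Return a null key and carry the _id inside the document, mirroring the store() example above.
        return new Tuple2<>(null, doc);
    }
}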
/**
 * Instantiates a new Mongo extractor.
 */
public MongoExtractor() {
    super();
    this.inputFormat = new MongoInputFormat();
    this.outputFormat = new MongoOutputFormat();
}
public static void main(String[] args)
        throws IOException, InterruptedException, ClassNotFoundException {
    if (args.length < 3) {
        System.err.println("Usage: MapReduceExercise "
                + "[mongodb input uri] "
                + "[mongodb output uri] "
                + "update=[true or false]");
        System.err.println("Example: MapReduceExercise "
                + "mongodb://127.0.0.1:27017/movielens.ratings "
                + "mongodb://127.0.0.1:27017/movielens.ratings.stats update=false");
        System.err.println("Example: MapReduceExercise "
                + "mongodb://127.0.0.1:27017/movielens.ratings "
                + "mongodb://127.0.0.1:27017/movielens.movies update=true");
        System.exit(-1);
    }

    Class outputValueClass = BSONWritable.class;
    Class reducerClass = Reduce.class;

    if (args[2].equals("update=true")) {
        outputValueClass = MongoUpdateWritable.class;
        reducerClass = ReduceUpdater.class;
    }

    Configuration conf = new Configuration();

    // Set MongoDB-specific configuration items
    conf.setClass("mongo.job.mapper", Map.class, Mapper.class);
    conf.setClass("mongo.job.reducer", reducerClass, Reducer.class);

    conf.setClass("mongo.job.mapper.output.key", IntWritable.class, Object.class);
    conf.setClass("mongo.job.mapper.output.value", DoubleWritable.class, Object.class);

    conf.setClass("mongo.job.output.key", NullWritable.class, Object.class);
    conf.setClass("mongo.job.output.value", outputValueClass, Object.class);

    conf.set("mongo.input.uri", args[0]);
    conf.set("mongo.output.uri", args[1]);

    Job job = Job.getInstance(conf);

    // Set Hadoop-specific job parameters
    job.setInputFormatClass(MongoInputFormat.class);
    job.setOutputFormatClass(MongoOutputFormat.class);

    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(DoubleWritable.class);

    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(outputValueClass);

    job.setMapperClass(Map.class);
    job.setReducerClass(reducerClass);

    job.setJarByClass(MapReduceExercise.class);

    job.submit();
}
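The Map, Reduce, and ReduceUpdater classes wired into the job above are not included here. As a hedged illustration of the non-update path only, the sketch below shows one plausible shape for Map and Reduce: the mapper emits a (movieId, rating) pair per ratings document and the reducer averages them into a BSONWritable result. The field names movie_id and rating, and the output document layout, are assumptions about the movielens schema rather than the original code.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;

import com.mongodb.hadoop.io.BSONWritable;

// Hypothetical mapper, nested inside MapReduceExercise alongside main():
// emits one (movieId, rating) pair per ratings document.
public static class Map extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {
    @Override
    protected void map(Object key, BSONObject value, Context context)
            throws IOException, InterruptedException {
        int movieId = ((Number) value.get("movie_id")).intValue();    // field name assumed
        double rating = ((Number) value.get("rating")).doubleValue(); // field name assumed
        context.write(new IntWritable(movieId), new DoubleWritable(rating));
    }
}

// Hypothetical reducer for the update=false path: averages the ratings per movie
// and emits a BSONWritable document, matching the configured output value class.
public static class Reduce extends Reducer<IntWritable, DoubleWritable, NullWritable, BSONWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        long count = 0;
        for (DoubleWritable v : values) {
            sum += v.get();
            count++;
        }
        BasicBSONObject doc = new BasicBSONObject();
        doc.put("movie_id", key.get());
        doc.put("average_rating", sum / count);
        doc.put("count", count);
        context.write(NullWritable.get(), new BSONWritable(doc));
    }
}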