Java class org.apache.hadoop.mapred.MapFileOutputFormat example source code
Project: GeoCrawler
File: SegmentReader.java
private List<Writable> getMapRecords(Path dir, Text key) throws Exception {
MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir,
getConf());
ArrayList<Writable> res = new ArrayList<Writable>();
Class<?> keyClass = readers[0].getKeyClass();
Class<?> valueClass = readers[0].getValueClass();
if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
throw new IOException("Incompatible key (" + keyClass.getName() + ")");
Writable value = (Writable) valueClass.newInstance();
// we don't know the partitioning schema
for (int i = 0; i < readers.length; i++) {
if (readers[i].get(key, value) != null) {
res.add(value);
value = (Writable) valueClass.newInstance();
Text aKey = (Text) keyClass.newInstance();
while (readers[i].next(aKey, value) && aKey.equals(key)) {
res.add(value);
value = (Writable) valueClass.newInstance();
}
}
readers[i].close();
}
return res;
}
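The loop above visits every part file because, as the comment notes, the partitioning schema of the segment is unknown. When the data is known to have been written with the default HashPartitioner, a single lookup through MapFileOutputFormat.getEntry is enough. Below is a minimal, self-contained sketch of that shortcut; the class name, segment path, and URL are placeholders, not taken from the project above.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.nutch.crawl.CrawlDatum;

public class FetchDatumLookup {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Segment path and URL are placeholders for illustration only.
    Path fetchDir = new Path("crawl/segments/20240101000000/crawl_fetch");
    Text key = new Text("http://example.com/");
    CrawlDatum value = new CrawlDatum();
    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, fetchDir, conf);
    // getEntry() hashes the key to pick the right part file, so it only
    // works if the data was partitioned with the default HashPartitioner.
    CrawlDatum hit = (CrawlDatum) MapFileOutputFormat.getEntry(readers,
        new HashPartitioner<Text, CrawlDatum>(), key, value);
    System.out.println(key + " -> " + (hit != null ? hit : "not found"));
    for (MapFile.Reader reader : readers) {
      reader.close();
    }
  }
}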
Project: GeoCrawler
File: LinkDbMerger.java
public static JobConf createMergeJob(Configuration config, Path linkDb,
boolean normalize, boolean filter) {
Path newLinkDb = new Path("linkdb-merge-"
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
JobConf job = new NutchJob(config);
job.setJobName("linkdb merge " + linkDb);
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(LinkDbFilter.class);
job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
job.setReducerClass(LinkDbMerger.class);
FileOutputFormat.setOutputPath(job, newLinkDb);
job.setOutputFormat(MapFileOutputFormat.class);
job.setBoolean("mapred.output.compress", true);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Inlinks.class);
// https://issues.apache.org/jira/browse/NUTCH-1069
job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
return job;
}
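createMergeJob only configures the output side of the merge job; the caller still has to add the linkdb(s) to merge as input paths before submitting. A hedged sketch of such a caller follows; the paths are placeholders, and the note about installing the result describes what the surrounding Nutch merge driver normally does, not code shown on this page.

// Hypothetical driver; in Nutch, LinkDbMerger.merge() plays this role and
// afterwards installs the merged output over the existing linkdb.
Configuration conf = NutchConfiguration.create();
Path linkDb = new Path("crawl/linkdb");            // placeholder path
JobConf job = LinkDbMerger.createMergeJob(conf, linkDb, true, true);
FileInputFormat.addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME));
JobClient.runJob(job);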
Project: GeoCrawler
File: NodeReader.java
/**
* Prints the content of the Node represented by the url to system out.
*
* @param webGraphDb
* The webgraph from which to get the node.
* @param url
* The url of the node.
*
* @throws IOException
* If an error occurs while getting the node.
*/
public void dumpUrl(Path webGraphDb, String url) throws IOException {
fs = FileSystem.get(getConf());
nodeReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
WebGraph.NODE_DIR), getConf());
// open the readers, get the node, print out the info, and close the readers
Text key = new Text(url);
Node node = new Node();
MapFileOutputFormat.getEntry(nodeReaders,
new HashPartitioner<Text, Node>(), key, node);
System.out.println(url + ":");
System.out.println(" inlink score: " + node.getInlinkScore());
System.out.println(" outlink score: " + node.getOutlinkScore());
System.out.println(" num inlinks: " + node.getNumInlinks());
System.out.println(" num outlinks: " + node.getNumOutlinks());
FSUtils.closeReaders(nodeReaders);
}
Project: anthelion
File: SegmentReader.java
private List<Writable> getMapRecords(Path dir, Text key) throws Exception {
MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf());
ArrayList<Writable> res = new ArrayList<Writable>();
Class keyClass = readers[0].getKeyClass();
Class valueClass = readers[0].getValueClass();
if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
throw new IOException("Incompatible key (" + keyClass.getName() + ")");
Writable value = (Writable)valueClass.newInstance();
// we don't know the partitioning schema
for (int i = 0; i < readers.length; i++) {
if (readers[i].get(key, value) != null) {
res.add(value);
value = (Writable)valueClass.newInstance();
Text aKey = (Text) keyClass.newInstance();
while (readers[i].next(aKey, value) && aKey.equals(key)) {
res.add(value);
value = (Writable)valueClass.newInstance();
}
}
readers[i].close();
}
return res;
}
Project: anthelion
File: LinkDbMerger.java
public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
Path newLinkDb =
new Path("linkdb-merge-" +
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
JobConf job = new NutchJob(config);
job.setJobName("linkdb merge " + linkDb);
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(LinkDbFilter.class);
job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
job.setReducerClass(LinkDbMerger.class);
FileOutputFormat.setOutputPath(job, newLinkDb);
job.setOutputFormat(MapFileOutputFormat.class);
job.setBoolean("mapred.output.compress", true);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Inlinks.class);
// https://issues.apache.org/jira/browse/NUTCH-1069
job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
return job;
}
Project: anthelion
File: LoopReader.java
/**
* Prints the loopset for a single url. The loopset information shows any
* outlink url that eventually forms a link cycle.
*
* @param webGraphDb The WebGraph to check for loops
* @param url The url to check.
*
* @throws IOException If an error occurs while printing loopset information.
*/
public void dumpUrl(Path webGraphDb, String url)
throws IOException {
// open the readers
fs = FileSystem.get(getConf());
loopReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
Loops.LOOPS_DIR), getConf());
// get the loopset for a given url, if any
Text key = new Text(url);
LoopSet loop = new LoopSet();
MapFileOutputFormat.getEntry(loopReaders,
new HashPartitioner<Text, LoopSet>(), key, loop);
// print out each loop url in the set
System.out.println(url + ":");
for (String loopUrl : loop.getLoopSet()) {
System.out.println(" " + loopUrl);
}
// close the readers
FSUtils.closeReaders(loopReaders);
}
Project: anthelion
File: NodeReader.java
/**
* Prints the content of the Node represented by the url to system out.
*
* @param webGraphDb The webgraph from which to get the node.
* @param url The url of the node.
*
* @throws IOException If an error occurs while getting the node.
*/
public void dumpUrl(Path webGraphDb, String url)
throws IOException {
fs = FileSystem.get(getConf());
nodeReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
WebGraph.NODE_DIR), getConf());
// open the readers, get the node, print out the info, and close the readers
Text key = new Text(url);
Node node = new Node();
MapFileOutputFormat.getEntry(nodeReaders,
new HashPartitioner<Text, Node>(), key, node);
System.out.println(url + ":");
System.out.println(" inlink score: " + node.getInlinkScore());
System.out.println(" outlink score: " + node.getOutlinkScore());
System.out.println(" num inlinks: " + node.getNumInlinks());
System.out.println(" num outlinks: " + node.getNumOutlinks());
FSUtils.closeReaders(nodeReaders);
}
Project: GeoCrawler
File: CrawlDbReader.java
private void openReaders(String crawlDb, JobConf config)
throws IOException {
if (readers != null)
return;
FileSystem fs = FileSystem.get(config);
readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb,
CrawlDb.CURRENT_NAME), config);
}
Project: GeoCrawler
File: CrawlDbReader.java
public CrawlDatum get(String crawlDb, String url, JobConf config)
throws IOException {
Text key = new Text(url);
CrawlDatum val = new CrawlDatum();
openReaders(crawlDb, config);
CrawlDatum res = (CrawlDatum) MapFileOutputFormat.getEntry(readers,
new HashPartitioner<Text, CrawlDatum>(), key, val);
return res;
}
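Together with openReaders() above, this gives random access to individual CrawlDb entries. A brief usage sketch is shown below; the crawldb path and URL are placeholders, and it assumes CrawlDbReader's default constructor is available as in stock Nutch.

// Hypothetical caller of CrawlDbReader.get(); all values are placeholders.
CrawlDbReader dbReader = new CrawlDbReader();
JobConf config = new JobConf(NutchConfiguration.create());
CrawlDatum datum = dbReader.get("crawl/crawldb", "http://example.com/", config);
System.out.println(datum != null ? datum.toString() : "URL not found in CrawlDb");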
Project: GeoCrawler
File: LinkRank.java
/**
* Runs the initializer job. The initializer job sets up the nodes with a
* default starting score for link analysis.
*
* @param nodeDb
* The node database to use.
* @param output
* The job output directory.
*
* @throws IOException
* If an error occurs while running the initializer job.
*/
private void runInitializer(Path nodeDb, Path output) throws IOException {
// configure the initializer
JobConf initializer = new NutchJob(getConf());
initializer.setJobName("LinkAnalysis Initializer");
FileInputFormat.addInputPath(initializer, nodeDb);
FileOutputFormat.setOutputPath(initializer, output);
initializer.setInputFormat(SequenceFileInputFormat.class);
initializer.setMapperClass(Initializer.class);
initializer.setMapOutputKeyClass(Text.class);
initializer.setMapOutputValueClass(Node.class);
initializer.setOutputKeyClass(Text.class);
initializer.setOutputValueClass(Node.class);
initializer.setOutputFormat(MapFileOutputFormat.class);
initializer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
// run the initializer
LOG.info("Starting initialization job");
try {
JobClient.runJob(initializer);
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
LOG.info("Finished initialization job.");
}
Project: GeoCrawler
File: LinkRank.java
/**
* Runs the link analysis job. The link analysis job applies the link rank
* formula to create a score per url and stores that score in the NodeDb.
*
* Typically the link analysis job is run a number of times to allow the link
* rank scores to converge.
*
* @param nodeDb
* The node database from which we are getting previous link rank
* scores.
* @param inverted
* The inverted inlinks
* @param output
* The link analysis output.
* @param iteration
* The current iteration number.
* @param numIterations
* The total number of link analysis iterations
*
* @throws IOException
* If an error occurs during link analysis.
*/
private void runAnalysis(Path nodeDb, Path inverted, Path output,
int iteration, int numIterations, float rankOne) throws IOException {
JobConf analyzer = new NutchJob(getConf());
analyzer.set("link.analyze.iteration", String.valueOf(iteration + 1));
analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
+ " of " + numIterations);
FileInputFormat.addInputPath(analyzer, nodeDb);
FileInputFormat.addInputPath(analyzer, inverted);
FileOutputFormat.setOutputPath(analyzer, output);
analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
analyzer.setMapOutputKeyClass(Text.class);
analyzer.setMapOutputValueClass(ObjectWritable.class);
analyzer.setInputFormat(SequenceFileInputFormat.class);
analyzer.setMapperClass(Analyzer.class);
analyzer.setReducerClass(Analyzer.class);
analyzer.setOutputKeyClass(Text.class);
analyzer.setOutputValueClass(Node.class);
analyzer.setOutputFormat(MapFileOutputFormat.class);
analyzer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
LOG.info("Starting analysis job");
try {
JobClient.runJob(analyzer);
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
LOG.info("Finished analysis job.");
}
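These two private helpers are meant to be chained by the surrounding LinkRank driver: one initialization pass, then repeated analysis passes over the previous iteration's output. The fragment below only illustrates that chaining; the temporary paths, loop bounds, and the variables webGraphDb, inverted, numIterations and rankOne are assumptions, and the real driver also runs link-counting and inverter jobs that are omitted here.

// Illustrative chaining only; paths and variables are assumptions.
Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
Path scores = new Path("linkrank-iter-0");         // hypothetical temp output
runInitializer(nodeDb, scores);
for (int i = 0; i < numIterations; i++) {
  Path next = new Path("linkrank-iter-" + (i + 1));
  // 'inverted' would be produced by an inverter job over the current scores
  runAnalysis(scores, inverted, next, i, numIterations, rankOne);
  scores = next;
}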
Project: GeoCrawler
File: LinkDumper.java
public static void main(String[] args) throws Exception {
if (args == null || args.length < 2) {
System.out.println("LinkDumper$Reader usage: <webgraphdb> <url>");
return;
}
// open the readers for the linkdump directory
Configuration conf = NutchConfiguration.create();
FileSystem fs = FileSystem.get(conf);
Path webGraphDb = new Path(args[0]);
String url = args[1];
MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
webGraphDb, DUMP_DIR), conf);
// get the link nodes for the url
Text key = new Text(url);
LinkNodes nodes = new LinkNodes();
MapFileOutputFormat.getEntry(readers,
new HashPartitioner<Text, LinkNodes>(), key, nodes);
// print out the link nodes
LinkNode[] linkNodesAr = nodes.getLinks();
System.out.println(url + ":");
for (LinkNode node : linkNodesAr) {
System.out.println(" " + node.getUrl() + " - "
+ node.getNode().toString());
}
// close the readers
FSUtils.closeReaders(readers);
}
Project: GeoCrawler
File: TestSegmentMergerCrawlDatums.java
/**
* Checks the merged segment and removes the test data afterwards.
*
* @param testDir
* The test directory
* @param mergedSegment
* The merged segment
* @return the final status
*/
protected byte checkMergedSegment(Path testDir, Path mergedSegment)
throws Exception {
// Get a MapFile reader for the <Text,CrawlDatum> pairs
MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
mergedSegment, CrawlDatum.FETCH_DIR_NAME), conf);
Text key = new Text();
CrawlDatum value = new CrawlDatum();
byte finalStatus = 0x0;
for (MapFile.Reader reader : readers) {
while (reader.next(key, value)) {
LOG.info("Reading status for: " + key.toString() + " > "
+ CrawlDatum.getStatusName(value.getStatus()));
// Only consider fetch status
if (CrawlDatum.hasFetchStatus(value)
&& key.toString().equals("http://nutch.apache.org/")) {
finalStatus = value.getStatus();
}
}
// Close the reader again
reader.close();
}
// Remove the test directory again
fs.delete(testDir, true);
LOG.info("Final fetch status for: http://nutch.apache.org/ > "
+ CrawlDatum.getStatusName(finalStatus));
// Return the final status
return finalStatus;
}
Project: GeoCrawler
File: TestSegmentMerger.java
@Test
public void testLargeMerge() throws Exception {
SegmentMerger merger = new SegmentMerger(conf);
merger.merge(out, new Path[] { seg1, seg2 }, false, false, -1);
// verify output
FileStatus[] stats = fs.listStatus(out);
// there should be just one path
Assert.assertEquals(1, stats.length);
Path outSeg = stats[0].getPath();
Text k = new Text();
ParseText v = new ParseText();
MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
outSeg, ParseText.DIR_NAME), conf);
int cnt1 = 0, cnt2 = 0;
for (MapFile.Reader r : readers) {
while (r.next(k, v)) {
String ks = k.toString();
String vs = v.getText();
if (ks.startsWith("seg1-")) {
cnt1++;
Assert.assertTrue(vs.startsWith("seg1 "));
} else if (ks.startsWith("seg2-")) {
cnt2++;
Assert.assertTrue(vs.startsWith("seg2 "));
}
}
r.close();
}
Assert.assertEquals(countSeg1, cnt1);
Assert.assertEquals(countSeg2, cnt2);
}
Project: FEL
File: ExtractWikipediaAnchorText.java
/**
* Extracts the CF (count) for each anchor found.
*
* @param inputPath
* @param mapPath
* @param outputPath
* @throws IOException
*/
private void task3(String inputPath, String mapPath, String outputPath) throws IOException {
LOG.info("Extracting anchor text (phase 3)...");
LOG.info(" - input: " + inputPath);
LOG.info(" - output: " + outputPath);
LOG.info(" - mapping: " + mapPath);
JobConf conf = new JobConf(getConf(), ExtractWikipediaAnchorText.class);
conf.setJobName(String.format("ExtractWikipediaAnchorText:phase3[input: %s, output: %s]", inputPath, outputPath));
conf.setNumReduceTasks(1);
String location = "map.dat";
try {
DistributedCache.addCacheFile(new URI(mapPath + "/part-00000/data" + "#" + location), conf);
//DistributedCache.addCacheFile(new URI(mapPath + "/singleentitymap.data" + "#" + location), conf);
DistributedCache.createSymlink(conf);
} catch (URISyntaxException e) {
e.printStackTrace();
}
FileInputFormat.addInputPath(conf, new Path(inputPath));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(MapFileOutputFormat.class);
// conf.setOutputFormat(TextOutputFormat.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(IntWritable.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MyMapper3.class);
conf.setCombinerClass(MyReducer3.class);
conf.setReducerClass(MyReducer3.class);
JobClient.runJob(conf);
}
Project: FEL
File: ExtractWikipediaAnchorText.java
/**
* Maps from (targetID, (anchor, count)) to (anchor, (targetID, count)).
*
* @param inputPath
* @param outputPath
* @throws IOException
*/
private void task4(String inputPath, String outputPath) throws IOException {
LOG.info("Extracting anchor text (phase 4)...");
LOG.info(" - input: " + inputPath);
LOG.info(" - output: " + outputPath);
JobConf conf = new JobConf(getConf(), ExtractWikipediaAnchorText.class);
conf.setJobName(String.format("ExtractWikipediaAnchorText:phase4[input: %s, output: %s]", inputPath, outputPath));
conf.setNumReduceTasks(1);
//FileInputFormat.addInputPath(conf, new Path(inputPath + "/part-00000/data"));
FileInputFormat.addInputPath(conf, new Path(inputPath + "/part-*/data"));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(MapFileOutputFormat.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(HMapSIW.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(HMapSIW.class);
conf.setMapperClass(MyMapper4.class);
conf.setReducerClass(MyReducer4.class);
JobClient.runJob(conf);
}
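Since phase 4 writes its (anchor, per-target count map) pairs through MapFileOutputFormat, the result can be scanned back with the same reader API used elsewhere on this page. The fragment below is a sketch under assumptions: outputPath is whatever was passed to task4, and HMapSIW is the string-to-int map class the job declares (the exact import depends on which map library FEL bundles).

// Reading task4's MapFile output back; outputPath is the value passed above.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(outputPath), conf);
Text anchor = new Text();
HMapSIW targetCounts = new HMapSIW();   // same value class the job configures
for (MapFile.Reader reader : readers) {
  while (reader.next(anchor, targetCounts)) {
    System.out.println(anchor + " -> " + targetCounts);
  }
  reader.close();
}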
Project: anthelion
File: CrawlDbReader.java
public CrawlDatum get(String crawlDb, String url, Configuration config) throws IOException {
Text key = new Text(url);
CrawlDatum val = new CrawlDatum();
openReaders(crawlDb, config);
CrawlDatum res = (CrawlDatum)MapFileOutputFormat.getEntry(readers,
new HashPartitioner<Text, CrawlDatum>(), key, val);
return res;
}
Project: anthelion
File: CrawlDbReader.java
public void processDumpJob(String crawlDb, String output, Configuration config, String format, String regex, String status) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("CrawlDb dump: starting");
LOG.info("CrawlDb db: " + crawlDb);
}
Path outFolder = new Path(output);
JobConf job = new NutchJob(config);
job.setJobName("dump " + crawlDb);
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
FileOutputFormat.setOutputPath(job, outFolder);
if (format.equals("csv")) {
job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
}
else if (format.equals("crawldb")) {
job.setOutputFormat(MapFileOutputFormat.class);
} else {
job.setOutputFormat(TextOutputFormat.class);
}
if (status != null) job.set("status", status);
if (regex != null) job.set("regex", regex);
job.setMapperClass(CrawlDbDumpMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
JobClient.runJob(job);
if (LOG.isInfoEnabled()) { LOG.info("CrawlDb dump: done"); }
}
Project: anthelion
File: LinkRank.java
/**
* Runs the initializer job. The initializer job sets up the nodes with a
* default starting score for link analysis.
*
* @param nodeDb The node database to use.
* @param output The job output directory.
*
* @throws IOException If an error occurs while running the initializer job.
*/
private void runInitializer(Path nodeDb, Path output)
throws IOException {
// configure the initializer
JobConf initializer = new NutchJob(getConf());
initializer.setJobName("LinkAnalysis Initializer");
FileInputFormat.addInputPath(initializer, nodeDb);
FileOutputFormat.setOutputPath(initializer, output);
initializer.setInputFormat(SequenceFileInputFormat.class);
initializer.setMapperClass(Initializer.class);
initializer.setMapOutputKeyClass(Text.class);
initializer.setMapOutputValueClass(Node.class);
initializer.setOutputKeyClass(Text.class);
initializer.setOutputValueClass(Node.class);
initializer.setOutputFormat(MapFileOutputFormat.class);
initializer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
// run the initializer
LOG.info("Starting initialization job");
try {
JobClient.runJob(initializer);
}
catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
LOG.info("Finished initialization job.");
}
Project: anthelion
File: LinkRank.java
/**
* Runs the link analysis job. The link analysis job applies the link rank
* formula to create a score per url and stores that score in the NodeDb.
*
* Typically the link analysis job is run a number of times to allow the link
* rank scores to converge.
*
* @param nodeDb The node database from which we are getting previous link
* rank scores.
* @param inverted The inverted inlinks
* @param output The link analysis output.
* @param iteration The current iteration number.
* @param numIterations The total number of link analysis iterations
*
* @throws IOException If an error occurs during link analysis.
*/
private void runAnalysis(Path nodeDb, Path inverted, Path output,
int iteration, int numIterations, float rankOne)
throws IOException {
JobConf analyzer = new NutchJob(getConf());
analyzer.set("link.analyze.iteration", String.valueOf(iteration + 1));
analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
+ " of " + numIterations);
FileInputFormat.addInputPath(analyzer, nodeDb);
FileInputFormat.addInputPath(analyzer, inverted);
FileOutputFormat.setOutputPath(analyzer, output);
analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
analyzer.setMapOutputKeyClass(Text.class);
analyzer.setMapOutputValueClass(ObjectWritable.class);
analyzer.setInputFormat(SequenceFileInputFormat.class);
analyzer.setMapperClass(Analyzer.class);
analyzer.setReducerClass(Analyzer.class);
analyzer.setOutputKeyClass(Text.class);
analyzer.setOutputValueClass(Node.class);
analyzer.setOutputFormat(MapFileOutputFormat.class);
analyzer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
LOG.info("Starting analysis job");
try {
JobClient.runJob(analyzer);
}
catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
LOG.info("Finished analysis job.");
}
Project: anthelion
File: LinkDumper.java
public static void main(String[] args)
throws Exception {
if (args == null || args.length < 2) {
System.out.println("LinkDumper$Reader usage: <webgraphdb> <url>");
return;
}
// open the readers for the linkdump directory
Configuration conf = NutchConfiguration.create();
FileSystem fs = FileSystem.get(conf);
Path webGraphDb = new Path(args[0]);
String url = args[1];
MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
webGraphDb, DUMP_DIR), conf);
// get the link nodes for the url
Text key = new Text(url);
LinkNodes nodes = new LinkNodes();
MapFileOutputFormat.getEntry(readers,
new HashPartitioner<Text, LinkNodes>(), key, nodes);
// print out the link nodes
LinkNode[] linkNodesAr = nodes.getLinks();
System.out.println(url + ":");
for (LinkNode node : linkNodesAr) {
System.out.println(" " + node.getUrl() + " - "
+ node.getNode().toString());
}
// close the readers
FSUtils.closeReaders(readers);
}
Project: anthelion
File: TestSegmentMerger.java
public void testLargeMerge() throws Exception {
SegmentMerger merger = new SegmentMerger(conf);
merger.merge(out, new Path[]{seg1, seg2}, false, false, -1);
// verify output
FileStatus[] stats = fs.listStatus(out);
// there should be just one path
assertEquals(1, stats.length);
Path outSeg = stats[0].getPath();
Text k = new Text();
ParseText v = new ParseText();
MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(outSeg, ParseText.DIR_NAME), conf);
int cnt1 = 0, cnt2 = 0;
for (MapFile.Reader r : readers) {
while (r.next(k, v)) {
String ks = k.toString();
String vs = v.getText();
if (ks.startsWith("seg1-")) {
cnt1++;
assertTrue(vs.startsWith("seg1 "));
} else if (ks.startsWith("seg2-")) {
cnt2++;
assertTrue(vs.startsWith("seg2 "));
}
}
r.close();
}
assertEquals(countSeg1, cnt1);
assertEquals(countSeg2, cnt2);
}
Project: fst-bench
File: HiveData.java
private void createRankingsTableDirectly() throws IOException, URISyntaxException {
log.info("Creating table rankings...");
Path fout = new Path(options.getResultPath(), RANKINGS);
JobConf job = new JobConf(HiveData.class);
String jobname = "Create rankings";
/** TODO: switch to a more efficient approach, as this operation may cause
* about a 2 min delay (originally ~15 min in total)
*/
setRankingsOptions(job);
job.setJobName(jobname);
job.set("mapred.reduce.slowstart.completed.maps", "0.3");
job.set("mapreduce.job.reduce.slowstart.completedmaps", "0.3");
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(JoinBytesInt.class);
job.setJarByClass(DummyToRankingsMapper.class);
job.setJarByClass(JoinBytesIntCombiner.class);
job.setJarByClass(GenerateRankingsReducer.class);
job.setMapperClass(DummyToRankingsMapper.class);
job.setCombinerClass(JoinBytesIntCombiner.class);
job.setReducerClass(GenerateRankingsReducer.class);
if (options.getNumReds() > 0) {
job.setNumReduceTasks(options.getNumReds());
} else {
job.setNumReduceTasks(Utils.getMaxNumReds());
}
job.setInputFormat(NLineInputFormat.class);
FileInputFormat.setInputPaths(job, dummy.getPath());
job.set("mapred.map.output.compression.type", "BLOCK");
job.set("mapreduce.output.fileoutputformat.compress.type","BLOCK");
MapFileOutputFormat.setCompressOutput(job, true);
// MapFileOutputFormat.setOutputCompressorClass(job, org.apache.hadoop.io.compress.LzoCodec.class);
MapFileOutputFormat.setOutputCompressorClass(job, org.apache.hadoop.io.compress.DefaultCodec.class);
if (options.isSequenceOut()) {
job.setOutputFormat(SequenceFileOutputFormat.class);
} else {
job.setOutputFormat(TextOutputFormat.class);
}
if (null != options.getCodecClass()) {
job.set("mapred.output.compression.type","BLOCK");
job.set("mapreduce.output.fileoutputformat.compress.type","BLOCK");
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, options.getCodecClass());
}
FileOutputFormat.setOutputPath(job, fout);
log.info("Running Job: " +jobname);
log.info("Pages file " + dummy.getPath() + " as input");
log.info("Rankings file " + fout + " as output");
JobClient.runJob(job);
log.info("Finished Running Job: " + jobname);
}
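Note that the two MapFileOutputFormat.set* calls above are static helpers inherited from FileOutputFormat, so they configure output compression for whichever output format the job ends up using (here SequenceFileOutputFormat or TextOutputFormat). The equivalent spelling through the base class makes that explicit:

// Equivalent to the MapFileOutputFormat.set* calls above: both methods are
// static members of the FileOutputFormat base class.
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job,
    org.apache.hadoop.io.compress.DefaultCodec.class);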
Project: fst-bench
File: NutchData.java
private void createNutchUrls() throws IOException, URISyntaxException {
log.info("Creating nutch urls ...");
JobConf job = new JobConf(NutchData.class);
Path urls = new Path(options.getWorkPath(), URLS_DIR_NAME);
Utils.checkHdfsPath(urls);
String jobname = "Create nutch urls";
job.setJobName(jobname);
setNutchOptions(job);
FileInputFormat.setInputPaths(job, dummy.getPath());
job.setInputFormat(NLineInputFormat.class);
job.setMapperClass(CreateUrlHash.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormat(MapFileOutputFormat.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
MapFileOutputFormat.setOutputPath(job, urls);
// SequenceFileOutputFormat.setOutputPath(job, fout);
/*
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
*/
log.info("Running Job: " +jobname);
log.info("Pages file " + dummy.getPath() + " as input");
log.info("Rankings file " + urls + " as output");
JobClient.runJob(job);
log.info("Finished Running Job: " + jobname);
log.info("Cleaning temp files...");
Utils.cleanTempFiles(urls);
}
Project: GeoCrawler
File: CrawlDbReader.java
public void processDumpJob(String crawlDb, String output,
JobConf config, String format, String regex, String status,
Integer retry, String expr) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("CrawlDb dump: starting");
LOG.info("CrawlDb db: " + crawlDb);
}
Path outFolder = new Path(output);
JobConf job = new NutchJob(config);
job.setJobName("dump " + crawlDb);
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
FileOutputFormat.setOutputPath(job, outFolder);
if (format.equals("csv")) {
job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
} else if (format.equals("crawldb")) {
job.setOutputFormat(MapFileOutputFormat.class);
} else {
job.setOutputFormat(TextOutputFormat.class);
}
if (status != null)
job.set("status", status);
if (regex != null)
job.set("regex", regex);
if (retry != null)
job.setInt("retry", retry);
if (expr != null) {
job.set("expr", expr);
LOG.info("CrawlDb db: expr: " + expr);
}
job.setMapperClass(CrawlDbDumpMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
JobClient.runJob(job);
if (LOG.isInfoEnabled()) {
LOG.info("CrawlDb dump: done");
}
}
Project: GeoCrawler
File: ScoreUpdater.java
/**
* Updates the crawl database with the inlink scores from the web graph
* node database.
*
* @param crawlDb
* The crawl database to update
* @param webGraphDb
* The webgraph database to use.
*
* @throws IOException
* If an error occurs while updating the scores.
*/
public void update(Path crawlDb, Path webGraphDb) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("ScoreUpdater: starting at " + sdf.format(start));
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
// create a temporary crawldb with the new scores
LOG.info("Running crawldb update " + crawlDb);
Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME);
Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random()
.nextInt(Integer.MAX_VALUE)));
// run the updater job outputting to the temp crawl database
JobConf updater = new NutchJob(conf);
updater.setJobName("Update CrawlDb from WebGraph");
FileInputFormat.addInputPath(updater, crawlDbCurrent);
FileInputFormat.addInputPath(updater, nodeDb);
FileOutputFormat.setOutputPath(updater, newCrawlDb);
updater.setInputFormat(SequenceFileInputFormat.class);
updater.setMapperClass(ScoreUpdater.class);
updater.setReducerClass(ScoreUpdater.class);
updater.setMapOutputKeyClass(Text.class);
updater.setMapOutputValueClass(ObjectWritable.class);
updater.setOutputKeyClass(Text.class);
updater.setOutputValueClass(CrawlDatum.class);
updater.setOutputFormat(MapFileOutputFormat.class);
try {
JobClient.runJob(updater);
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
// remove the temp crawldb on error
if (fs.exists(newCrawlDb)) {
fs.delete(newCrawlDb, true);
}
throw e;
}
// install the temp crawl database
LOG.info("ScoreUpdater: installing new crawldb " + crawlDb);
CrawlDb.install(updater, crawlDb);
long end = System.currentTimeMillis();
LOG.info("ScoreUpdater: finished at " + sdf.format(end) + ", elapsed: "
+ TimingUtil.elapsedTime(start, end));
}
Project: GeoCrawler
File: SegmentHandler.java
private Writable getEntry(MapFile.Reader[] readers, Text url, Writable entry)
throws IOException {
return MapFileOutputFormat.getEntry(readers, PARTITIONER, url, entry);
}
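This one-liner relies on two pieces of state the snippet does not show: a fixed partitioner matching how the segment was written, and an array of MapFile readers opened over one of the segment's sub-directories. A plausible setup is sketched below with illustrative names; these are not necessarily the fields SegmentHandler actually uses.

// Assumed surrounding state for getEntry(); names and layout are illustrative.
private static final Partitioner<Text, Writable> PARTITIONER =
    new HashPartitioner<Text, Writable>();

private MapFile.Reader[] openReaders(Path segment, String subDir, Configuration conf)
    throws IOException {
  // e.g. subDir = CrawlDatum.FETCH_DIR_NAME or ParseText.DIR_NAME
  return MapFileOutputFormat.getReaders(FileSystem.get(conf),
      new Path(segment, subDir), conf);
}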
Project: FEL
File: ExtractWikipediaAnchorText.java
/**
*
* Maps from (srcID, (targetID, anchor)) to (targetID, (anchor, count)).
*
* @param inputPath
* @param outputPath
* @throws IOException
*/
private void task2(String inputPath, String outputPath, String redirPath) throws IOException {
LOG.info("Extracting anchor text (phase 2)...");
LOG.info(" - input: " + inputPath);
LOG.info(" - output: " + outputPath);
Random r = new Random( );
//String tmpOutput = "tmp-" + this.getClass().getCanonicalName() + "-" + r.nextInt(10000);
//LOG.info( "intermediate folder for merge " + tmpOutput );
JobConf conf = new JobConf(getConf(), ExtractWikipediaAnchorText.class);
conf.setJobName(String.format("ExtractWikipediaAnchorText:phase2[input: %s, output: %s]", inputPath, outputPath));
// Gathers everything together for convenience; feasible for Wikipedia.
conf.setNumReduceTasks(1);
try {
DistributedCache.addCacheFile(new URI(redirPath + "/part-00000" + "#" + "redirs.dat"), conf);
DistributedCache.createSymlink(conf);
} catch (URISyntaxException e) {
e.printStackTrace();
}
FileInputFormat.addInputPath(conf, new Path(inputPath));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
//FileOutputFormat.setOutputPath(conf, new Path(tmpOutput));
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(MapFileOutputFormat.class);
// conf.setOutputFormat(TextOutputFormat.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(HMapSIW.class);
conf.setMapperClass(MyMapper2.class);
conf.setReducerClass(MyReducer2.class);
// Delete the output directory if it exists already.
FileSystem.get(conf).delete(new Path(outputPath), true);
JobClient.runJob(conf);
// Clean up intermediate data.
FileSystem.get(conf).delete(new Path(inputPath), true);
/*
//merge
String finalO = outputPath+"/part-00000/data";
FileSystem.get(conf).mkdirs( new Path( outputPath + "part-00000") );
getMergeInHdfs( tmpOutput, finalO, conf );
FileSystem.get(conf).delete(new Path(tmpOutput), true);
*/
}
Project: anthelion
File: CrawlDbReader.java
private void openReaders(String crawlDb, Configuration config) throws IOException {
if (readers != null) return;
FileSystem fs = FileSystem.get(config);
readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb,
CrawlDb.CURRENT_NAME), config);
}
Project: anthelion
File: CrawlDBScanner.java
private void scan(Path crawlDb, Path outputPath, String regex, String status,
boolean text) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("CrawlDB scanner: starting at " + sdf.format(start));
JobConf job = new NutchJob(getConf());
job.setJobName("Scan : " + crawlDb + " for URLS matching : " + regex);
job.set("CrawlDBScanner.regex", regex);
if (status != null) job.set("CrawlDBScanner.status", status);
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(CrawlDBScanner.class);
job.setReducerClass(CrawlDBScanner.class);
FileOutputFormat.setOutputPath(job, outputPath);
// if we want a text dump of the entries
// in order to check something - better to use the text format and avoid
// compression
if (text) {
job.set("mapred.output.compress", "false");
job.setOutputFormat(TextOutputFormat.class);
}
// otherwise what we will actually create is a mini-crawlDB which can be
// then used
// for debugging
else {
job.setOutputFormat(MapFileOutputFormat.class);
}
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CrawlDatum.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
try {
JobClient.runJob(job);
} catch (IOException e) {
throw e;
}
long end = System.currentTimeMillis();
LOG.info("CrawlDb scanner: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
Project: anthelion
File: SegmentHandler.java
private Writable getEntry(MapFile.Reader[] readers, Text url,
Writable entry) throws IOException {
return MapFileOutputFormat.getEntry(readers, PARTITIONER, url, entry);
}
Project: anthelion
File: ScoreUpdater.java
/**
* Updates the crawl database with the inlink scores from the web graph
* node database.
*
* @param crawlDb The crawl database to update
* @param webGraphDb The webgraph database to use.
*
* @throws IOException If an error occurs while updating the scores.
*/
public void update(Path crawlDb, Path webGraphDb)
throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("ScoreUpdater: starting at " + sdf.format(start));
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
// create a temporary crawldb with the new scores
LOG.info("Running crawldb update " + crawlDb);
Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME);
Path newCrawlDb = new Path(crawlDb,
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
// run the updater job outputting to the temp crawl database
JobConf updater = new NutchJob(conf);
updater.setJobName("Update CrawlDb from WebGraph");
FileInputFormat.addInputPath(updater, crawlDbCurrent);
FileInputFormat.addInputPath(updater, nodeDb);
FileOutputFormat.setOutputPath(updater, newCrawlDb);
updater.setInputFormat(SequenceFileInputFormat.class);
updater.setMapperClass(ScoreUpdater.class);
updater.setReducerClass(ScoreUpdater.class);
updater.setMapOutputKeyClass(Text.class);
updater.setMapOutputValueClass(ObjectWritable.class);
updater.setOutputKeyClass(Text.class);
updater.setOutputValueClass(CrawlDatum.class);
updater.setOutputFormat(MapFileOutputFormat.class);
try {
JobClient.runJob(updater);
}
catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
// remove the temp crawldb on error
if (fs.exists(newCrawlDb)) {
fs.delete(newCrawlDb, true);
}
throw e;
}
// install the temp crawl database
LOG.info("ScoreUpdater: installing new crawldb " + crawlDb);
CrawlDb.install(updater, crawlDb);
long end = System.currentTimeMillis();
LOG.info("ScoreUpdater: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}