Java class org.apache.hadoop.mapreduce.lib.output.MultipleOutputs example source code
Project: dataSqueeze
File: TextCompactionReducer.java
/**
 * {@inheritDoc}
 */
protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
    final Configuration configuration = context.getConfiguration();
    final String sourcePath = configuration.get("compactionSourcePath");
    final String targetPath = configuration.get("compactionTargetPath");
    // Reducer stores data at the target directory retaining the directory structure of files
    String filePath = key.toString().replace(sourcePath, targetPath);
    if (key.toString().endsWith("/")) {
        filePath = filePath.concat("file");
    }
    log.info("Compaction output path {}", filePath);
    final URI uri = URI.create(filePath);
    final MultipleOutputs multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    try {
        for (final Text text : values) {
            multipleOutputs.write(NullWritable.get(), text, uri.toString());
        }
    } finally {
        multipleOutputs.close();
    }
}
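
This reducer, like the Orc and BytesWritable variants below, uses the path-based overload MultipleOutputs.write(key, value, baseOutputPath), so every record is routed through the job's configured output format rather than a registered named output. A minimal driver-side sketch of the setup such a reducer depends on (class name, paths, and configuration wiring are illustrative assumptions, not taken from dataSqueeze):

// Hypothetical driver for the compaction reducer above.
// LazyOutputFormat keeps the job from creating empty default part files when
// all records are emitted through MultipleOutputs' per-path writes.
// Mapper and input format are omitted for brevity.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class CompactionDriverSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("compactionSourcePath", args[0]);   // read back by the reducer above
        conf.set("compactionTargetPath", args[1]);
        Job job = Job.getInstance(conf, "text-compaction");
        job.setJarByClass(CompactionDriverSketch.class);
        // job.setReducerClass(TextCompactionReducer.class);  // the reducer shown above
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
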
Project: dataSqueeze
File: OrcCompactionReducer.java
/**
 * {@inheritDoc}
 */
protected void reduce(final Text key, final Iterable<OrcValue> values, final Context context) throws IOException, InterruptedException {
    final Configuration configuration = context.getConfiguration();
    final String sourcePath = configuration.get("compactionSourcePath");
    final String targetPath = configuration.get("compactionTargetPath");
    // Reducer stores data at the target directory retaining the directory structure of files
    String filePath = key.toString().replace(sourcePath, targetPath);
    if (key.toString().endsWith("/")) {
        filePath = filePath.concat("file");
    }
    log.info("Compaction output path {}", filePath);
    final URI uri = URI.create(filePath);
    final MultipleOutputs multipleOutputs = new MultipleOutputs<NullWritable, OrcValue>(context);
    try {
        for (final OrcValue text : values) {
            multipleOutputs.write(NullWritable.get(), text, uri.toString());
        }
    } finally {
        multipleOutputs.close();
    }
}
Project: dataSqueeze
File: BytesWritableCompactionReducer.java
/**
 * {@inheritDoc}
 */
protected void reduce(final Text key, final Iterable<BytesWritable> values, final Context context) throws IOException, InterruptedException {
    final Configuration configuration = context.getConfiguration();
    final String sourcePath = configuration.get("compactionSourcePath");
    final String targetPath = configuration.get("compactionTargetPath");
    // Reducer stores data at the target directory retaining the directory structure of files
    String filePath = key.toString().replace(sourcePath, targetPath);
    if (key.toString().endsWith("/")) {
        filePath = filePath.concat("file");
    }
    log.info("Compaction output path {}", filePath);
    final URI uri = URI.create(filePath);
    final MultipleOutputs multipleOutputs = new MultipleOutputs<NullWritable, BytesWritable>(context);
    try {
        for (final BytesWritable text : values) {
            multipleOutputs.write(NullWritable.get(), text, uri.toString());
        }
    } finally {
        multipleOutputs.close();
    }
}
Project: InsAdjustment
File: CSVparserMapper.java
public void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    multipleOutputs = new MultipleOutputs(context);
    lowerBoundary = conf.get("LOWER_DATE");
    upperBoundary = conf.get("HIGHER_DATE");
}
Project: dkpro-c4corpus
File: WARCWriterReducerClass.java
/**
 * Writes a single WARCWritable to the output with a specific output file prefix.
 *
 * @param warcWritable warc record
 * @param multipleOutputs output
 * @throws IOException exception
 * @throws InterruptedException exception
 */
// TODO move somewhere else?
public static void writeSingleWARCWritableToOutput(WARCWritable warcWritable,
        MultipleOutputs<NullWritable, WARCWritable> multipleOutputs)
        throws IOException, InterruptedException
{
    WARCRecord.Header header = warcWritable.getRecord().getHeader();
    String license = header.getField(WARCRecord.WARCRecordFieldConstants.LICENSE);
    String language = header.getField(WARCRecord.WARCRecordFieldConstants.LANGUAGE);
    String noBoilerplate = header
            .getField(WARCRecord.WARCRecordFieldConstants.NO_BOILERPLATE);
    String minimalHtml = header.getField(WARCRecord.WARCRecordFieldConstants.MINIMAL_HTML);
    // set the file name prefix
    String fileName = createOutputFilePrefix(license, language, noBoilerplate, minimalHtml);
    // bottleneck of single reducer for all "Lic_none_Lang_en" pages (majority of Web)
    // if ("en".equals(language) && LicenseDetector.NO_LICENCE.equals(license)) {
    //     long simHash = Long
    //             .valueOf(header.getField(WARCRecord.WARCRecordFieldConstants.SIMHASH));
    //     int binNumber = getBinNumberFromSimHash(simHash);
    //     fileName = createOutputFilePrefix(license, language, noBoilerplate);
    // }
    multipleOutputs.write(NullWritable.get(), warcWritable, fileName);
}
Project: incubator-pirk
File: FinalResponseReducer.java
@Override
public void setup(Context ctx) throws IOException, InterruptedException
{
    super.setup(ctx);
    mos = new MultipleOutputs<>(ctx);
    FileSystem fs = FileSystem.newInstance(ctx.getConfiguration());
    storage = new HadoopFileSystemStore(fs);
    String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
    Query query = storage.recall(queryDir, Query.class);
    QueryInfo queryInfo = query.getQueryInfo();
    outputFile = ctx.getConfiguration().get("pirMR.outputFile");
    response = new Response(queryInfo);
}
Project: TopPI
File: PerItemTopKHadoopCollector.java
public void writeTopKBounds(MultipleOutputs<?, ?> sideOutputs, String outputName, String path, int minsup) throws IOException, InterruptedException {
    final IntWritable itemW = new IntWritable();
    final IntWritable boundW = new IntWritable();
    TIntObjectIterator<PatternWithFreq[]> it = this.topK.iterator();
    while (it.hasNext()) {
        it.advance();
        if (it.value()[this.k - 1] != null) {
            final int supportCount = it.value()[this.k - 1].getSupportCount();
            if (supportCount > minsup) {
                itemW.set(it.key());
                boundW.set(supportCount);
                sideOutputs.write(outputName, itemW, boundW, path);
            }
        }
    }
}
Project: TopPI
File: MiningReducer.java
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    this.reverseRebasing = DistCache.readReverseRebasing(DistCache.getCachedFiles(context), conf);
    if (conf.get(MinerWrapper.KEY_BOUNDS_PATH) != null) {
        this.sideOutputs = new MultipleOutputs<IntWritable, SupportAndTransactionWritable>(context);
    }
    this.manyItems = conf.getBoolean(TopPIoverHadoop.KEY_MANY_ITEMS_MODE, false);
    if (manyItems) {
        this.marker = TopPIoverHadoop.FILTERED_DIRNAME;
    } else {
        this.marker = conf.get(TopPIoverHadoop.KEY_INPUT);
        String[] sp = marker.split("/");
        if (sp.length > 2) {
            this.marker = sp[sp.length - 1];
        }
    }
    this.minsup = conf.getInt(TopPIoverHadoop.KEY_MINSUP, 10);
    this.nbGroups = conf.getInt(TopPIoverHadoop.KEY_NBGROUPS, 1);
    this.maxItemId = conf.getInt(TopPIoverHadoop.KEY_REBASING_MAX_ID, 1);
    this.k = conf.getInt(TopPIoverHadoop.KEY_K, 1);
}
Project: incubator-rya
File: AbstractReasoningTool.java
/**
 * Set up a MapReduce job to output human-readable text.
 */
protected void configureTextOutput(String destination) {
    Path outPath;
    outPath = MRReasoningUtils.getOutputPath(job.getConfiguration(), destination);
    TextOutputFormat.setOutputPath(job, outPath);
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT,
            TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT,
            TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT,
            TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
            TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(job, true);
}
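
Each name registered with addNamedOutput above becomes writable on the task side through a MultipleOutputs instance created in setup. A hedged sketch of a reducer emitting to the terminal output, assuming the NullWritable/Text record types declared in configureTextOutput (the reducer class itself is hypothetical and not part of incubator-rya):

// Illustrative reducer writing to one of the named outputs registered above.
// MRReasoningUtils is the incubator-rya utility class referenced in this listing (import omitted).
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class TextOutputReducerSketch extends Reducer<Text, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Route records to the named output declared as MRReasoningUtils.TERMINAL_OUT.
            mos.write(MRReasoningUtils.TERMINAL_OUT, NullWritable.get(), value);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
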
Project: mara
File: BaseMRUnitTest.java
protected void verifyNamedOutput(MultipleOutputs multiOut, String name, Object key, Object value, String path, VerificationMode mode) {
    ArgumentCaptor keyArg = ArgumentCaptor.forClass(key.getClass());
    ArgumentCaptor valueArg = ArgumentCaptor.forClass(value.getClass());
    try {
        if (name == null) {
            verify(multiOut, mode).write(keyArg.capture(), valueArg.capture(), path);
        }
        else {
            if (path == null) {
                verify(multiOut, mode).write(eq(name), keyArg.capture(), valueArg.capture());
                assertEquals(key, keyArg.getValue());
                assertEquals(value, valueArg.getValue());
            }
            else {
                verify(multiOut, mode).write(name, keyArg.capture(), valueArg.capture(), path);
            }
        }
    } catch (IOException | InterruptedException e) {
        fail(e.getMessage());
    }
}
Project: mara
File: NamedOutputAnnotationHandler.java
/**
 * If this is a multiple output we're annotating, see if there are type parameters to
 * use for the key/value classes.
 *
 * @param target object to reflect for type params
 * @return the key/value type parameters
 */
protected Pair<Type, Type> getGenericTypeParams(Object target) {
    Pair<Type, Type> kvTypePair = null;
    if (target instanceof Field) {
        Field field = (Field) target;
        if (field.getType() == MultipleOutputs.class) {
            Type genericType = field.getGenericType();
            if (genericType instanceof ParameterizedType) {
                Type[] keyValueTypes = ((ParameterizedType) genericType).getActualTypeArguments();
                if (keyValueTypes != null && keyValueTypes.length == 2) {
                    kvTypePair = new ImmutablePair<>(keyValueTypes[0], keyValueTypes[1]);
                }
            }
        }
    }
    return kvTypePair;
}
Project: Pinot
File: RollupPhaseThreeJob.java
@Override
public void setup(Context context) throws IOException, InterruptedException {
    LOGGER.info("RollupPhaseOneJob.RollupPhaseOneMapper.setup()");
    mos = new MultipleOutputs<BytesWritable, BytesWritable>(context);
    Configuration configuration = context.getConfiguration();
    FileSystem fileSystem = FileSystem.get(configuration);
    Path configPath = new Path(configuration.get(ROLLUP_PHASE3_CONFIG_PATH.toString()));
    try {
        StarTreeConfig starTreeConfig = StarTreeConfig.decode(fileSystem.open(configPath));
        config = RollupPhaseThreeConfig.fromStarTreeConfig(starTreeConfig);
        dimensionNames = config.getDimensionNames();
        dimensionNameToIndexMapping = new HashMap<String, Integer>();
        for (int i = 0; i < dimensionNames.size(); i++) {
            dimensionNameToIndexMapping.put(dimensionNames.get(i), i);
        }
        metricNames = config.getMetricNames();
        metricTypes = config.getMetricTypes();
        metricSchema = new MetricSchema(config.getMetricNames(), metricTypes);
    } catch (Exception e) {
        throw new IOException(e);
    }
}
Project: Pinot
File: RollupPhaseOneJob.java
@Override
public void setup(Context context) throws IOException, InterruptedException {
    LOGGER.info("RollupPhaseOneJob.RollupPhaseOneMapper.setup()");
    mos = new MultipleOutputs<BytesWritable, BytesWritable>(context);
    Configuration configuration = context.getConfiguration();
    FileSystem fileSystem = FileSystem.get(configuration);
    Path configPath = new Path(configuration.get(ROLLUP_PHASE1_CONFIG_PATH.toString()));
    try {
        StarTreeConfig starTreeConfig = StarTreeConfig.decode(fileSystem.open(configPath));
        config = RollupPhaseOneConfig.fromStarTreeConfig(starTreeConfig);
        dimensionNames = config.getDimensionNames();
        metricTypes = config.getMetricTypes();
        metricSchema = new MetricSchema(config.getMetricNames(), metricTypes);
        String className = config.getThresholdFuncClassName();
        Map<String, String> params = config.getThresholdFuncParams();
        Constructor<?> constructor = Class.forName(className).getConstructor(Map.class);
        thresholdFunc = (RollupThresholdFunction) constructor.newInstance(params);
    } catch (Exception e) {
        throw new IOException(e);
    }
}
Project: Pinot
File: RollupPhaseFourJob.java
@Override
public void setup(Context context) throws IOException, InterruptedException {
    LOGGER.info("RollupPhaseOneJob.RollupPhaseOneMapper.setup()");
    mos = new MultipleOutputs<BytesWritable, BytesWritable>(context);
    Configuration configuration = context.getConfiguration();
    FileSystem fileSystem = FileSystem.get(configuration);
    Path configPath = new Path(configuration.get(ROLLUP_PHASE4_CONFIG_PATH.toString()));
    try {
        StarTreeConfig starTreeConfig = StarTreeConfig.decode(fileSystem.open(configPath));
        config = RollupPhaseFourConfig.fromStarTreeConfig(starTreeConfig);
        dimensionNames = config.getDimensionNames();
        dimensionNameToIndexMapping = new HashMap<String, Integer>();
        for (int i = 0; i < dimensionNames.size(); i++) {
            dimensionNameToIndexMapping.put(dimensionNames.get(i), i);
        }
        metricNames = config.getMetricNames();
        metricTypes = config.getMetricTypes();
        metricSchema = new MetricSchema(config.getMetricNames(), metricTypes);
        rollupOrder = config.getRollupOrder();
    } catch (Exception e) {
        throw new IOException(e);
    }
}
Project: kylin
File: FilterRecommendCuboidDataMapper.java
@Override
protected void doSetup(Context context) throws IOException {
    super.bindCurrentConfiguration(context.getConfiguration());
    mos = new MultipleOutputs(context);
    String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
    String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
    KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
    CubeManager cubeManager = CubeManager.getInstance(config);
    CubeInstance cube = cubeManager.getCube(cubeName);
    CubeSegment optSegment = cube.getSegmentById(segmentID);
    CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
    rowKeySplitter = new RowKeySplitter(originalSegment, 65, 255);
    baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
    recommendCuboids = cube.getCuboidsRecommend();
    Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null");
}
Project: kylin
File: UpdateOldCuboidShardMapper.java
@Override
protected void doSetup(Context context) throws IOException {
    super.bindCurrentConfiguration(context.getConfiguration());
    mos = new MultipleOutputs(context);
    String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
    String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
    KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
    CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
    CubeSegment cubeSegment = cube.getSegmentById(segmentID);
    CubeSegment oldSegment = cube.getOriginalSegmentToOptimize(cubeSegment);
    cubeDesc = cube.getDescriptor();
    baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
    rowKeySplitter = new RowKeySplitter(oldSegment, 65, 256);
    rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
}
Project: archiventory
File: Archiventory.java
/**
 * Apply the identification stack.
 *
 * @param container container holding the files to identify
 * @param mos Hadoop multiple outputs (only set for Hadoop job execution; may be null otherwise)
 * @throws FileNotFoundException Exception if the container file cannot be found
 * @throws IOException I/O Exception
 */
private void performIdentification(Container container, MultipleOutputs mos) throws FileNotFoundException, IOException, InterruptedException {
    if (ctx == null) {
        ctx = new ClassPathXmlApplicationContext(SPRING_CONFIG_RESOURCE_PATH);
    }
    IdentifierCollection identificationStack = (IdentifierCollection) ctx.getBean("identificationStack");
    for (Identification identifierItem : identificationStack.getIdentifiers()) {
        Identification fli = (Identification) identifierItem;
        OutWritable outWriter = (OutWritable) ctx.getBean("outWriterBean");
        HashMap<String, List<String>> identifyFileList = fli.identifyFileList(container.getBidiIdentifierFilenameMap());
        if (mos != null) {
            outWriter.write(identifyFileList, mos);
        } else {
            outWriter.write(identifyFileList);
        }
    }
}
Project: aliyun-maxcompute-data-collectors
File: MainframeDatasetImportMapper.java
@Override
protected void setup(Context context)
        throws IOException, InterruptedException {
    super.setup(context);
    inputSplit = (MainframeDatasetInputSplit) context.getInputSplit();
    mos = new MultipleOutputs<Text, NullWritable>(context);
    numberOfRecords = 0;
    outkey = new Text();
}
Project: WIFIProbe
File: AnalysisReducer.java
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    gson = new Gson();
    text = new Text();
    multipleOutputs = new MultipleOutputs<LongWritable, Text>(context);
}
Project: WIFIProbe
File: Task.java
private boolean analyze(final String inputFilePath,
                        final String outputFilePath,
                        final Long startTime) throws Exception {
    Configuration conf = new Configuration();
    conf.setLong(Holistic.START_TIME, startTime);
    conf.setLong(Holistic.EXECUTE_TIME, executeHourTime);
    Job jobAnalyze = Job.getInstance(conf, "analyze");
    jobAnalyze.setJarByClass(Holistic.class);
    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.NEW_OLD_CUSTOMER,
            TextOutputFormat.class, KeyWrapper.class, Text.class);
    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.CUSTOMER_FLOW_KEY,
            TextOutputFormat.class, KeyWrapper.class, Text.class);
    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.CYCLE,
            TextOutputFormat.class, KeyWrapper.class, Text.class);
    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.IN_STORE_HOUR,
            TextOutputFormat.class, KeyWrapper.class, Text.class);
    jobAnalyze.setMapperClass(AnalysisMapper.class);
    jobAnalyze.setReducerClass(AnalysisReducer.class);
    jobAnalyze.setCombinerClass(AnalysisCombiner.class);
    jobAnalyze.setOutputKeyClass(LongWritable.class);
    jobAnalyze.setOutputValueClass(Text.class);
    jobAnalyze.setMapOutputKeyClass(KeyWrapper.class);
    jobAnalyze.setMapOutputValueClass(ValueWrapper.class);
    FileInputFormat.addInputPath(jobAnalyze, new Path(inputFilePath));
    FileOutputFormat.setOutputPath(jobAnalyze, new Path(outputFilePath));
    return jobAnalyze.waitForCompletion(true);
}
Project: hadoop
File: MultipleOutputs.java
/**
 * Creates and initializes multiple outputs support;
 * it should be instantiated in the Mapper/Reducer setup method.
 *
 * @param context the TaskInputOutputContext object
 */
public MultipleOutputs(
        TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
        new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
}
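
The constructor only captures the task context, the set of registered named-output names, and the lazily populated record-writer cache; as the javadoc says, it belongs in setup(). A minimal sketch of the usual lifecycle (create in setup(), write in reduce(), close in cleanup()), using a hypothetical "shortWords" named output rather than anything from the Hadoop sources:

// Illustrative reducer showing the full MultipleOutputs lifecycle.
// The driver is assumed to have registered the named output, e.g.:
//   MultipleOutputs.addNamedOutput(job, "shortWords", TextOutputFormat.class, Text.class, IntWritable.class);
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class WordCountByLengthReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private MultipleOutputs<Text, IntWritable> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);   // instantiate in setup, per the javadoc above
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        if (key.getLength() <= 3) {
            mos.write("shortWords", key, new IntWritable(sum));   // goes to the named output
        } else {
            context.write(key, new IntWritable(sum));             // goes to the default output
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();   // flushes every RecordWriter this task opened
    }
}
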
Project: dataSqueeze
File: OrcCompactionReducerTest.java
@Before
public void setup() throws Exception {
    orcStruct.setFieldValue("field1", intWritable);
    when(context.getConfiguration()).thenReturn(configuration);
    when(configuration.get("compactionSourcePath")).thenReturn("/source");
    when(configuration.get("compactionTargetPath")).thenReturn("/target");
    whenNew(MultipleOutputs.class).withArguments(context).thenReturn(multipleOutputs);
    final OrcValue value = new OrcValue(orcStruct);
    list.add(value);
}
Project: aliyun-oss-hadoop-fs
File: MultipleOutputs.java
/**
 * Creates and initializes multiple outputs support;
 * it should be instantiated in the Mapper/Reducer setup method.
 *
 * @param context the TaskInputOutputContext object
 */
public MultipleOutputs(
        TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
        new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
}
Project: SOAPgaea
File: FastqQualityControlReducer.java
@Override
protected void setup(Context context) throws IOException {
    mos = new MultipleOutputs<NullWritable, Text>(context);
    Configuration conf = context.getConfiguration();
    option = new FastqQualityControlOptions();
    option.getOptionsFromHadoopConf(conf);
    filter = new FastqQualityControlFilter(option);
}
Project: SOAPgaea
File: VCFSort.java
private void setMultiOutputs(MultipleVCFHeader mVcfHeader, BioJob job) {
    // TODO Auto-generated method stub
    int i = 0;
    Map<Integer, String> multiOutputs = new HashMap<>();
    for (int id : mVcfHeader.getFileName2ID().values()) {
        multiOutputs.put(id, "SortResult" + ++i);
        MultipleOutputs.addNamedOutput(job, multiOutputs.get(id), SortOutputFormat.class, NullWritable.class, VariantContextWritable.class);
    }
    options.setMultiOutputs(multiOutputs);
}
Project: SOAPgaea
File: VCFSort.java
@Override
protected void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    options = new VCFSortOptions();
    options.getOptionsFromHadoopConf(conf);
    multiOutputs = options.getMultiOutputs();
    mos = new MultipleOutputs<NullWritable, VariantContextWritable>(context);
}
Project: SOAPgaea
File: RecalibratorContextWriter.java
@SuppressWarnings({ "rawtypes", "unchecked" })
public RecalibratorContextWriter(Context ctx, boolean multiple) {
    if (multiple) {
        mos = new MultipleOutputs<NullWritable, Text>(ctx);
    }
    this.context = ctx;
    value = new SamRecordWritable();
}
Project: SOAPgaea
File: AnnotationSortReducer.java
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    resultValue = new Text();
    multipleOutputs = new MultipleOutputs<>(context);
    Configuration conf = context.getConfiguration();
    Config userConfig = new Config(conf);
    List<String> renameNewHeader = userConfig.getRenameNewHeader();
    System.err.println(userConfig.getHeader());
    newAnnoHeader = "#" + String.join("\t", renameNewHeader);
    resultValue.set(newAnnoHeader);
}
Project: big-c
File: MultipleOutputs.java
/**
 * Creates and initializes multiple outputs support;
 * it should be instantiated in the Mapper/Reducer setup method.
 *
 * @param context the TaskInputOutputContext object
 */
public MultipleOutputs(
        TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
        new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
}
Project: legion
File: DefaultMapper.java
/**
 * Do standard Hadoop setup, de-serialize the <code>LegionObjective</code>,
 * and prepare for writing to multiple output files.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public void setup(Context context) {
    Configuration config = context.getConfiguration();
    this.objective = ObjectiveDeserializer.deserialize(
            config.get("legion_objective"));
    outputWriters = new MultipleOutputs(context);
}
Project: HadoopKMeansClustering
File: KReducer.java
@Override
public void setup(Context context) throws IOException {
    // get job configuration
    Configuration conf = context.getConfiguration();
    columns = Arrays.stream(conf.getStrings("columns"))
            .map(s -> Integer.parseInt(s)).toArray(Integer[]::new);
    k = conf.getInt("k", 10);
    currentIteration = conf.getInt("currentIteration", 0);
    lastIteration = conf.getBoolean("lastIteration", false);
    mos = new MultipleOutputs(context);
}
Project: incubator-pirk
File: ExpTableReducer.java
@Override
public void setup(Context ctx) throws IOException, InterruptedException
{
    super.setup(ctx);
    mos = new MultipleOutputs<>(ctx);
    reducerID = String.format("%05d", ctx.getTaskAttemptID().getTaskID().getId());
    logger.info("reducerID = " + reducerID);
}
Project: incubator-pirk
File: ColumnMultReducer.java
@Override
public void setup(Context ctx) throws IOException, InterruptedException
{
    super.setup(ctx);
    outputValue = new Text();
    mos = new MultipleOutputs<>(ctx);
    FileSystem fs = FileSystem.newInstance(ctx.getConfiguration());
    String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
    query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class);
}
Project: data-polygamy
File: AggregationReducer.java
@Override
public void setup(Context context)
        throws IOException, InterruptedException {
    String[] datasetNames = context.getConfiguration().get("dataset-name", "").split(",");
    String[] datasetIds = context.getConfiguration().get("dataset-id", "").split(",");
    for (int i = 0; i < datasetNames.length; i++) {
        idToDataset.put(Integer.parseInt(datasetIds[i]), datasetNames[i]);
    }
    out = new MultipleOutputs<SpatioTemporalWritable, FloatArrayWritable>(context);
    //out = new MultipleOutputs<Text,Text>(context);
}
Project: data-polygamy
File: ScalarFunctionDataMapper.java
@Override
public void setup(Context context)
        throws IOException, InterruptedException {
    FileSplit fileSplit = (FileSplit) context.getInputSplit();
    String[] fileSplitTokens = fileSplit.getPath().getParent().toString().split("/");
    dataset = fileSplitTokens[fileSplitTokens.length - 1];
    out = new MultipleOutputs<Text, Text>(context);
}
Project: data-polygamy
File: FeatureDataMapper.java
@Override
public void setup(Context context)
        throws IOException, InterruptedException {
    FileSplit fileSplit = (FileSplit) context.getInputSplit();
    String[] fileSplitTokens = fileSplit.getPath().getParent().toString().split("/");
    dataset = fileSplitTokens[fileSplitTokens.length - 1];
    out = new MultipleOutputs<Text, Text>(context);
}
Project: hadoop-2.6.0-cdh5.4.3
File: MultipleOutputs.java
/**
 * Creates and initializes multiple outputs support;
 * it should be instantiated in the Mapper/Reducer setup method.
 *
 * @param context the TaskInputOutputContext object
 */
public MultipleOutputs(
        TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
        new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
}
Project: incubator-rya
File: AbstractReasoningTool.java
/**
 * Set up the MapReduce job to output a schema (TBox).
 */
protected void configureSchemaOutput() {
    Path outPath = MRReasoningUtils.getSchemaPath(job.getConfiguration());
    SequenceFileOutputFormat.setOutputPath(job, outPath);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(SchemaWritable.class);
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, "schemaobj",
            SequenceFileOutputFormat.class, NullWritable.class, SchemaWritable.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(job, true);
}
Project: incubator-rya
File: AbstractReasoningTool.java
/**
 * Set up a MapReduce job to output newly derived triples.
 *
 * @param intermediate True if this is intermediate data. Outputs
 *        to [base]-[iteration]-[temp].
 */
protected void configureDerivationOutput(boolean intermediate) {
    Path outPath;
    Configuration conf = job.getConfiguration();
    int iteration = MRReasoningUtils.getCurrentIteration(conf);
    if (intermediate) {
        outPath = MRReasoningUtils.getOutputPath(conf,
                MRReasoningUtils.OUTPUT_BASE + iteration
                + MRReasoningUtils.TEMP_SUFFIX);
    }
    else {
        outPath = MRReasoningUtils.getOutputPath(conf,
                MRReasoningUtils.OUTPUT_BASE + iteration);
    }
    SequenceFileOutputFormat.setOutputPath(job, outPath);
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT,
            SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT,
            SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT,
            SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
            SequenceFileOutputFormat.class, Derivation.class, NullWritable.class);
    MultipleOutputs.setCountersEnabled(job, true);
    // Set up an output for diagnostic info, if needed
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
            TextOutputFormat.class, Text.class, Text.class);
}
Project: incubator-rya
File: ForwardChain.java
@Override
protected void setup(Context context) {
    debugOut = new MultipleOutputs<>(context);
    Configuration conf = context.getConfiguration();
    if (schema == null) {
        schema = MRReasoningUtils.loadSchema(context.getConfiguration());
    }
    debug = MRReasoningUtils.debug(conf);
}
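
The setup above only creates debugOut; this listing does not show where it is written or closed. A hypothetical continuation (same field names assumed; not taken from incubator-rya) showing how the DEBUG_OUT named output registered in configureDerivationOutput might be used, and the cleanup the instance needs:

// Hypothetical helper and cleanup for the setup() above; Text/Text matches the
// DEBUG_OUT registration in configureDerivationOutput().
protected void logDebug(String message, Fact fact) throws IOException, InterruptedException {
    if (debug) {
        debugOut.write(MRReasoningUtils.DEBUG_OUT, new Text(message), new Text(fact.toString()));
    }
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    debugOut.close();   // without this, buffered named-output records can be lost
}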