/** * Returns a {@link OutputStream} for a file that might need * compression. */ static OutputStream getPossiblyCompressedOutputStream(Path file, Configuration conf) throws IOException { FileSystem fs = file.getFileSystem(conf); JobConf jConf = new JobConf(conf); if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jConf)) { // get the codec class Class<? extends CompressionCodec> codecClass = org.apache.hadoop.mapred.FileOutputFormat .getOutputCompressorClass(jConf, GzipCodec.class); // get the codec implementation CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); // add the appropriate extension file = file.suffix(codec.getDefaultExtension()); if (isCompressionEmulationEnabled(conf)) { FSDataOutputStream fileOut = fs.create(file, false); return new DataOutputStream(codec.createOutputStream(fileOut)); } } return fs.create(file, false); }
/**
 * Creates the old-API ({@code org.apache.hadoop.mapred}) record reader for
 * {@code split} and installs it through {@code setReader}.
 *
 * <p>The input format class name is read from
 * {@code AngelConf.ANGEL_INPUTFORMAT_CLASS}, falling back to
 * {@code AngelConf.DEFAULT_ANGEL_INPUTFORMAT_CLASS}.
 *
 * @throws IOException wrapping any exception raised while loading the class
 *                     or opening the reader (the failure is also logged)
 */
@SuppressWarnings({"rawtypes", "unchecked"})
public void initReader() throws IOException {
  try {
    final Configuration workerConf = WorkerContext.get().getConf();
    final String formatName = workerConf.get(
        AngelConf.ANGEL_INPUTFORMAT_CLASS,
        AngelConf.DEFAULT_ANGEL_INPUTFORMAT_CLASS);

    final Class<? extends org.apache.hadoop.mapred.InputFormat> formatType =
        (Class<? extends org.apache.hadoop.mapred.InputFormat>) Class.forName(formatName);

    // A fresh JobConf is handed to the format instance and another to the reader.
    final org.apache.hadoop.mapred.InputFormat format =
        ReflectionUtils.newInstance(formatType, new JobConf(workerConf));
    final org.apache.hadoop.mapred.RecordReader<KEY, VALUE> oldApiReader =
        format.getRecordReader(split, new JobConf(workerConf), Reporter.NULL);

    setReader(new DFSReaderOldAPI(oldApiReader));
  } catch (Exception failure) {
    LOG.error("init reader error ", failure);
    throw new IOException(failure);
  }
}
/** * {@inheritDoc} * @throws IOException If the child InputSplit cannot be read, typically * for faliing access checks. */ @SuppressWarnings("unchecked") // Generic array assignment public void readFields(DataInput in) throws IOException { int card = WritableUtils.readVInt(in); if (splits == null || splits.length != card) { splits = new InputSplit[card]; } Class<? extends InputSplit>[] cls = new Class[card]; try { for (int i = 0; i < card; ++i) { cls[i] = Class.forName(Text.readString(in)).asSubclass(InputSplit.class); } for (int i = 0; i < card; ++i) { splits[i] = ReflectionUtils.newInstance(cls[i], null); splits[i].readFields(in); } } catch (ClassNotFoundException e) { throw (IOException)new IOException("Failed split init").initCause(e); } }
/**
 * Instantiates the scheduler named by {@code YarnConfiguration.RM_SCHEDULER}
 * (default {@code YarnConfiguration.DEFAULT_RM_SCHEDULER}).
 *
 * @return the configured scheduler instance
 * @throws YarnRuntimeException if the class cannot be found or is not
 *         assignable to {@link ResourceScheduler}
 */
protected ResourceScheduler createScheduler() {
  final String className = conf.get(YarnConfiguration.RM_SCHEDULER,
      YarnConfiguration.DEFAULT_RM_SCHEDULER);
  LOG.info("Using Scheduler: " + className);

  final Class<?> candidate;
  try {
    candidate = Class.forName(className);
  } catch (ClassNotFoundException e) {
    throw new YarnRuntimeException("Could not instantiate Scheduler: "
        + className, e);
  }

  // Reject classes that do not implement the scheduler contract.
  if (!ResourceScheduler.class.isAssignableFrom(candidate)) {
    throw new YarnRuntimeException("Class: " + className
        + " not instance of " + ResourceScheduler.class.getCanonicalName());
  }
  return (ResourceScheduler) ReflectionUtils.newInstance(candidate, this.conf);
}
/**
 * Instantiates every {@link FilterInitializer} class listed under
 * {@code FILTER_INITIALIZER_PROPERTY} in the given configuration.
 *
 * @param conf the configuration to read; may be {@code null}
 * @return one instance per configured class, or {@code null} when
 *         {@code conf} is {@code null} or {@code getClasses} returns
 *         {@code null}
 */
private static FilterInitializer[] getFilterInitializers(Configuration conf) {
  if (conf == null) {
    return null;
  }

  final Class<?>[] initializerTypes = conf.getClasses(FILTER_INITIALIZER_PROPERTY);
  if (initializerTypes == null) {
    return null;
  }

  final FilterInitializer[] result = new FilterInitializer[initializerTypes.length];
  int slot = 0;
  for (Class<?> type : initializerTypes) {
    result[slot] = (FilterInitializer) ReflectionUtils.newInstance(type, conf);
    slot++;
  }
  return result;
}
/**
 * Sets up the sorting collector and the partitioner for map output.
 *
 * <p>With more than one reduce task the job's configured partitioner class
 * is instantiated; otherwise a trivial partitioner returning
 * {@code partitions - 1} is used.
 */
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  collector = createSortingCollector(job, reporter);
  partitions = jobContext.getNumReduceTasks();

  if (partitions <= 1) {
    // Zero or one reducer: every record maps to the same fixed partition.
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  } else {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
        ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  }
}
/**
 * Registers the codec classes returned by {@code getCodecClasses(conf)}.
 * When that list is {@code null} or empty, a {@link GzipCodec} and a
 * {@link DefaultCodec} are registered instead.
 *
 * @param conf configuration used to look up and instantiate the codecs
 */
public CompressionCodecFactory(Configuration conf) {
  codecs = new TreeMap<String, CompressionCodec>();
  codecsByClassName = new HashMap<String, CompressionCodec>();
  codecsByName = new HashMap<String, CompressionCodec>();

  final List<Class<? extends CompressionCodec>> configured = getCodecClasses(conf);
  final boolean hasConfiguredCodecs = configured != null && !configured.isEmpty();

  if (hasConfiguredCodecs) {
    for (Class<? extends CompressionCodec> type : configured) {
      addCodec(ReflectionUtils.newInstance(type, conf));
    }
  } else {
    // Nothing configured: fall back to the two built-in codecs.
    addCodec(new GzipCodec());
    addCodec(new DefaultCodec());
  }
}
/** Get a comparator for a {@link WritableComparable} implementation. */ public static WritableComparator get( Class<? extends WritableComparable> c, Configuration conf) { WritableComparator comparator = comparators.get(c); if (comparator == null) { // force the static initializers to run forceInit(c); // look to see if it is defined now comparator = comparators.get(c); // if not, use the generic one if (comparator == null) { comparator = new WritableComparator(c, conf, true); } } // Newly passed Configuration objects should be used. ReflectionUtils.setConf(comparator, conf); return comparator; }
/**
 * Produces a copy of {@code obj} by serializing it into the thread-local
 * buffer and deserializing that buffer into a newly created instance of the
 * same class.
 *
 * @param serialization supplies the serializer and deserializer for {@code E}
 * @param obj           the object to copy
 * @return the freshly instantiated object that the bytes were deserialized into
 * @throws IOException if serialization or deserialization fails
 */
private <E> E makeCopyForPassByValue(Serialization<E> serialization,
                                      E obj) throws IOException {
  final Serializer<E> writer =
      serialization.getSerializer(GenericsUtil.getClass(obj));
  final Deserializer<E> reader =
      serialization.getDeserializer(GenericsUtil.getClass(obj));

  // Serialize into the reusable per-thread buffer.
  final DataOutputBuffer buffer = threadLocalDataOutputBuffer.get();
  buffer.reset();
  writer.open(buffer);
  writer.serialize(obj);
  writer.close();

  // Deserialize the buffered bytes into a brand-new instance.
  final E copy =
      ReflectionUtils.newInstance(GenericsUtil.getClass(obj), getChainJobConf());
  final ByteArrayInputStream bytesIn =
      new ByteArrayInputStream(buffer.getData(), 0, buffer.getLength());
  reader.open(bytesIn);
  reader.deserialize(copy);
  reader.close();

  return copy;
}
/**
 * Returns the {@link KeyProvider} for the class name and parameters found in
 * the configuration, creating, initializing and caching it on first use.
 *
 * <p>The cache key is the (class name, parameters) pair.
 *
 * @param conf configuration holding the provider class and its parameters
 * @return the cached or newly created provider
 * @throws RuntimeException wrapping any exception raised during lookup,
 *         instantiation or initialization
 */
public static KeyProvider getKeyProvider(Configuration conf) {
  final String className = conf.get(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
      KeyStoreKeyProvider.class.getName());
  final String parameters =
      conf.get(HConstants.CRYPTO_KEYPROVIDER_PARAMETERS_KEY, "");
  try {
    final Pair<String,String> cacheKey =
        new Pair<String,String>(className, parameters);

    final KeyProvider cached = keyProviderCache.get(cacheKey);
    if (cached != null) {
      return cached;
    }

    final Class<?> providerType =
        getClassLoaderForClass(KeyProvider.class).loadClass(className);
    final KeyProvider created =
        (KeyProvider) ReflectionUtils.newInstance(providerType, conf);
    created.init(parameters);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Installed " + className + " into key provider cache");
    }
    keyProviderCache.put(cacheKey, created);
    return created;
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
/**
 * Round-trips a {@link Writable} through its own serialization and asserts
 * that the reconstructed object equals the original.
 *
 * @param before the writable to serialize
 * @param conf   configuration passed when instantiating the copy
 * @return the deserialized copy
 * @throws Exception if writing, instantiation or reading fails
 */
public static Writable testWritable(Writable before
    , Configuration conf) throws Exception {
  // Write the original into an in-memory buffer.
  final DataOutputBuffer sink = new DataOutputBuffer();
  before.write(sink);

  // Read those bytes back into a fresh instance of the same class.
  final DataInputBuffer source = new DataInputBuffer();
  source.reset(sink.getData(), sink.getLength());

  final Writable roundTripped =
      (Writable) ReflectionUtils.newInstance(before.getClass(), conf);
  roundTripped.readFields(source);

  assertEquals(before, roundTripped);
  return roundTripped;
}
/**
 * Instantiates the plan follower named by
 * {@code YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER}, falling
 * back to {@code getDefaultPlanFollower()}.
 *
 * @return the plan follower, or {@code null} when no class name is available
 * @throws YarnRuntimeException if the class cannot be found or is not
 *         assignable to {@link PlanFollower}
 */
private PlanFollower createPlanFollower() {
  final String className = conf.get(
      YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER,
      getDefaultPlanFollower());
  if (className == null) {
    return null;
  }
  LOG.info("Using PlanFollowerPolicy: " + className);

  final Class<?> candidate;
  try {
    candidate = conf.getClassByName(className);
  } catch (ClassNotFoundException e) {
    throw new YarnRuntimeException(
        "Could not instantiate PlanFollowerPolicy: "
            + className, e);
  }

  if (!PlanFollower.class.isAssignableFrom(candidate)) {
    throw new YarnRuntimeException("Class: "
        + className + " not instance of "
        + PlanFollower.class.getCanonicalName());
  }
  return (PlanFollower) ReflectionUtils.newInstance(candidate, conf);
}
private void processData(byte[] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // try to read an id if (LOG.isDebugEnabled()) LOG.debug(" got #" + id); Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param ((RPC.Invocation)param).setConf(conf); param.readFields(dis); Call call = new Call(id, param, this); // callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count }
@SuppressWarnings("unchecked") public void readFields(DataInput in) throws IOException { // First clear the map. Otherwise we will just accumulate // entries every time this method is called. this.instance.clear(); // Read the number of entries in the map int entries = in.readInt(); // Then read each key/value pair for (int i = 0; i < entries; i++) { byte[] key = Bytes.readByteArray(in); byte id = in.readByte(); Class clazz = getClass(id); V value = null; if (clazz.equals(byte[].class)) { byte[] bytes = Bytes.readByteArray(in); value = (V) bytes; } else { Writable w = (Writable) ReflectionUtils.newInstance(clazz, getConf()); w.readFields(in); value = (V) w; } this.instance.put(key, value); } }
/**
 * Instantiates the {@link SCMStore} configured under
 * {@code YarnConfiguration.SCM_STORE_CLASS}, using the class named by
 * {@code YarnConfiguration.DEFAULT_SCM_STORE_CLASS} as the default.
 *
 * @param conf configuration to read the store class from
 * @return the new store instance
 * @throws YarnRuntimeException if the default store class cannot be loaded
 */
@SuppressWarnings("unchecked")
private static SCMStore createSCMStoreService(Configuration conf) {
  final Class<? extends SCMStore> fallbackType;
  try {
    fallbackType = (Class<? extends SCMStore>)
        Class.forName(YarnConfiguration.DEFAULT_SCM_STORE_CLASS);
  } catch (Exception e) {
    throw new YarnRuntimeException("Invalid default scm store class"
        + YarnConfiguration.DEFAULT_SCM_STORE_CLASS, e);
  }

  final Class<? extends SCMStore> storeType = conf.getClass(
      YarnConfiguration.SCM_STORE_CLASS, fallbackType, SCMStore.class);
  return ReflectionUtils.newInstance(storeType, conf);
}
/**
 * Creates the import helper and instantiates the record class whose name is
 * stored under {@code ConfigurationHelper.getDbInputClassProperty()}.
 *
 * @throws IOException if the class name is not set, the class cannot be
 *         found, or no instance could be created
 */
@Override
protected void setup(Context context)
    throws IOException, InterruptedException {
  final Configuration jobConf = context.getConfiguration();
  helper = new SqoopHCatImportHelper(jobConf);

  final String recordTypeName =
      jobConf.get(ConfigurationHelper.getDbInputClassProperty());
  if (recordTypeName == null) {
    throw new IOException("DB Input class name is not set!");
  }

  try {
    // Resolve the record class through the thread's context class loader.
    final ClassLoader loader = Thread.currentThread().getContextClassLoader();
    final Class<?> recordType = Class.forName(recordTypeName, true, loader);
    sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(recordType, jobConf);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException(cnfe);
  }

  if (sqoopRecord == null) {
    throw new IOException("Could not instantiate object of type "
        + recordTypeName);
  }
}
@Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); // Instantiate a copy of the user's class to hold and parse the record. String recordClassName = conf.get( ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY); if (null == recordClassName) { throw new IOException("Export table class name (" + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY + ") is not set!"); } try { Class cls = Class.forName(recordClassName, true, Thread.currentThread().getContextClassLoader()); recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf); } catch (ClassNotFoundException cnfe) { throw new IOException(cnfe); } if (null == recordImpl) { throw new IOException("Could not instantiate object of type " + recordClassName); } columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP, MapWritable.class); }
/** * Given a codec name, instantiate the concrete implementation * class that implements it. * @throws com.cloudera.sqoop.io.UnsupportedCodecException if a codec cannot * be found with the supplied name. */ public static CompressionCodec getCodec(String codecName, Configuration conf) throws com.cloudera.sqoop.io.UnsupportedCodecException { // Try standard Hadoop mechanism first CompressionCodec codec = getCodecByName(codecName, conf); if (codec != null) { return codec; } // Fall back to Sqoop mechanism String codecClassName = null; try { codecClassName = getCodecClassName(codecName); if (null == codecClassName) { return null; } Class<? extends CompressionCodec> codecClass = (Class<? extends CompressionCodec>) conf.getClassByName(codecClassName); return (CompressionCodec) ReflectionUtils.newInstance( codecClass, conf); } catch (ClassNotFoundException cnfe) { throw new com.cloudera.sqoop.io.UnsupportedCodecException( "Cannot find codec class " + codecClassName + " for codec " + codecName); } }
/**
 * Instantiates the validator, threshold and failure handler classes named
 * in {@code options}, logs which implementations are in use, and runs the
 * validation.
 *
 * @param options           supplies the three implementation classes
 * @param conf              configuration passed to each new instance
 * @param validationContext the context handed to the validator
 * @throws ValidationException propagated from {@code Validator.validate}
 */
protected void doValidate(SqoopOptions options, Configuration conf,
                          ValidationContext validationContext)
    throws ValidationException {
  final Validator validator = (Validator)
      ReflectionUtils.newInstance(options.getValidatorClass(), conf);
  final ValidationThreshold threshold = (ValidationThreshold)
      ReflectionUtils.newInstance(options.getValidationThresholdClass(), conf);
  final ValidationFailureHandler failureHandler = (ValidationFailureHandler)
      ReflectionUtils.newInstance(options.getValidationFailureHandlerClass(), conf);

  // One log entry listing the three implementations in use.
  final StringBuilder summary = new StringBuilder(
      "Validating the integrity of the import using the following configuration\n");
  summary.append("\tValidator : ");
  summary.append(validator.getClass().getName());
  summary.append('\n');
  summary.append("\tThreshold Specifier : ");
  summary.append(threshold.getClass().getName());
  summary.append('\n');
  summary.append("\tFailure Handler : ");
  summary.append(failureHandler.getClass().getName());
  summary.append('\n');
  LOG.info(summary.toString());

  validator.validate(validationContext, threshold, failureHandler);
}
static private ReplicationService newReplicationInstance(String classname, Configuration conf, HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException { Class<?> clazz = null; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); clazz = Class.forName(classname, true, classLoader); } catch (java.lang.ClassNotFoundException nfe) { throw new IOException("Could not find class for " + classname); } // create an instance of the replication object. ReplicationService service = (ReplicationService) ReflectionUtils.newInstance(clazz, conf); service.initialize(server, fs, logDir, oldLogDir); return service; }
/**
 * Instantiates and initializes the replanner configured for the given plan
 * queue.
 *
 * @param planQueueName the queue whose replanner is requested
 * @return the initialized planner
 * @throws YarnRuntimeException if the class cannot be found or is not
 *         assignable to {@link Planner}
 */
protected Planner getReplanner(String planQueueName) {
  final ReservationSchedulerConfiguration schedulerConfig =
      getReservationSchedulerConfiguration();
  final String className = schedulerConfig.getReplanner(planQueueName);
  LOG.info("Using Replanner: " + className + " for queue: "
      + planQueueName);
  try {
    final Class<?> candidate = conf.getClassByName(className);
    if (!Planner.class.isAssignableFrom(candidate)) {
      throw new YarnRuntimeException("Class: " + candidate
          + " not instance of " + Planner.class.getCanonicalName());
    }
    final Planner replanner =
        (Planner) ReflectionUtils.newInstance(candidate, conf);
    replanner.init(planQueueName, schedulerConfig);
    return replanner;
  } catch (ClassNotFoundException e) {
    throw new YarnRuntimeException("Could not instantiate Planner: "
        + className + " for queue: " + planQueueName, e);
  }
}
/**
 * Creates and initializes the {@link RMFailoverProxyProvider} configured
 * under {@code YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER}, using
 * {@code YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER} as the
 * default class.
 *
 * @param conf     configuration to read the provider class from
 * @param protocol protocol class handed to the provider's {@code init}
 * @return the initialized provider
 * @throws YarnRuntimeException if the default provider class cannot be loaded
 */
private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
    Configuration conf, Class<T> protocol) {
  final Class<? extends RMFailoverProxyProvider<T>> fallbackType;
  try {
    fallbackType = (Class<? extends RMFailoverProxyProvider<T>>)
        Class.forName(
            YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
  } catch (Exception e) {
    throw new YarnRuntimeException("Invalid default failover provider class"
        + YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
  }

  final RMFailoverProxyProvider<T> failoverProvider =
      ReflectionUtils.newInstance(
          conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
              fallbackType, RMFailoverProxyProvider.class), conf);
  failoverProvider.init(conf, (RMProxy<T>) this, protocol);
  return failoverProvider;
}
/**
 * Returns the <code>SharedCacheChecksum</code> for the configured algorithm
 * implementation
 * (see <code>yarn.sharedcache.checksum.algo.impl</code>).
 *
 * <p>Instances are kept in {@code instances}, keyed by implementation class;
 * when {@code putIfAbsent} reports an existing entry, that entry is returned
 * instead of the one just created.
 *
 * @param conf configuration naming the implementation class
 * @return <code>SharedCacheChecksum</code> object
 * @throws YarnRuntimeException wrapping any failure during instantiation
 */
public static SharedCacheChecksum getChecksum(Configuration conf) {
  final Class<? extends SharedCacheChecksum> implType =
      conf.getClass(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL,
          defaultAlgorithm, SharedCacheChecksum.class);

  final SharedCacheChecksum cached = instances.get(implType);
  if (cached != null) {
    return cached;
  }

  try {
    final SharedCacheChecksum fresh = ReflectionUtils.newInstance(implType, conf);
    final SharedCacheChecksum existing = instances.putIfAbsent(implType, fresh);
    if (existing == null) {
      return fresh;
    }
    return existing;
  } catch (Exception e) {
    throw new YarnRuntimeException(e);
  }
}
/**
 * Creates the {@link PathFilter} configured for the job's input paths.
 *
 * @param context the job context whose configuration is consulted
 * @return a new PathFilter instance of the class stored under
 *         {@code PATHFILTER_CLASS}, or {@code null} if none has been set.
 */
public static PathFilter getInputPathFilter(JobContext context) {
  final Configuration jobConf = context.getConfiguration();
  final Class<?> filterType =
      jobConf.getClass(PATHFILTER_CLASS, null, PathFilter.class);

  if (filterType == null) {
    return null;
  }
  return (PathFilter) ReflectionUtils.newInstance(filterType, jobConf);
}
/**
 * Reads up to 12 key/value pairs from the sequence file at {@code path} and
 * returns them as sample data.
 *
 * <p>Each sample is rendered as a
 * <code>{"Key": "...", "Value": "..."}</code> string built from the
 * {@code toString()} of the key and value.
 *
 * @param path the file to sample
 * @return the sample record, or {@code null} when the file does not exist or
 *         cannot be read as a sequence file (the failure is logged)
 * @throws IOException if the existence check fails
 */
@Override
public SampleDataRecord getSampleData(Path path) throws IOException {
  final String filePath = path.toUri().getPath();
  if (!fs.exists(path)) {
    LOG.error("sequence file : " + filePath + " is not exist on hdfs");
    return null;
  }

  LOG.info("sequencefileanalyzer start parse sampledata for file path : {}", filePath);
  // try-with-resources guarantees the reader is closed on every path.
  try (SequenceFile.Reader reader =
           new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path))) {
    List<Object> sampleValues = new ArrayList<Object>();
    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
    Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
    int count = 0;
    String keyName = "Key";
    String valueName = "Value";
    // Check the limit first so no record is read once 12 samples are collected.
    while (count < 12 && reader.next(key, value)) {
      sampleValues.add("{\"" + keyName + "\": \"" + key + "\", \"" + valueName + "\": \"" + value + "\"}");
      count++;
    }
    SampleDataRecord dataRecord = new SampleDataRecord(filePath, sampleValues);
    LOG.info("sequence file path : {}, sample data is {}", filePath, sampleValues);
    return dataRecord;
  } catch (Exception e) {
    // Pass the exception as the last argument so the stack trace is logged.
    LOG.error("path : {} content " + " is not Sequence File format content ", filePath, e);
    return null;
  }
}
/**
 * Writes the output of {@code ReflectionUtils.printThreadInfo} to the
 * response as UTF-8 plain text and then calls
 * {@code ReflectionUtils.logThreadInfo}.
 *
 * <p>Nothing is written when
 * {@code HttpServer2.isInstrumentationAccessAllowed} returns {@code false}.
 */
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
  final boolean accessAllowed = HttpServer2.isInstrumentationAccessAllowed(
      getServletContext(), request, response);
  if (accessAllowed) {
    response.setContentType("text/plain; charset=UTF-8");
    try (PrintStream threadDump = new PrintStream(
        response.getOutputStream(), false, "UTF-8")) {
      ReflectionUtils.printThreadInfo(threadDump, "");
    }
    ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1);
  }
}
/** * Configure the {@link ResourceUsageMatcher} to load the configured plugins * and initialize them. */ @SuppressWarnings("unchecked") public void configure(Configuration conf, ResourceCalculatorPlugin monitor, ResourceUsageMetrics metrics, Progressive progress) { Class[] plugins = conf.getClasses(RESOURCE_USAGE_EMULATION_PLUGINS); if (plugins == null) { System.out.println("No resource usage emulator plugins configured."); } else { for (Class clazz : plugins) { if (clazz != null) { if (ResourceUsageEmulatorPlugin.class.isAssignableFrom(clazz)) { ResourceUsageEmulatorPlugin plugin = (ResourceUsageEmulatorPlugin) ReflectionUtils.newInstance(clazz, conf); emulationPlugins.add(plugin); } else { throw new RuntimeException("Misconfigured resource usage plugins. " + "Class " + clazz.getClass().getName() + " is not a resource " + "usage plugin as it does not extend " + ResourceUsageEmulatorPlugin.class.getName()); } } } } // initialize the emulators once all the configured emulator plugins are // loaded for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) { emulator.initialize(conf, metrics, monitor, progress); } }
/**
 * Instantiates the factory class stored under
 * {@code DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY}, defaulting to
 * {@code FsDatasetFactory}.
 *
 * @param conf configuration to read the factory class from
 * @return the configured factory.
 */
public static Factory<?> getFactory(Configuration conf) {
  @SuppressWarnings("rawtypes")
  final Class<? extends Factory> factoryType = conf.getClass(
      DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
      FsDatasetFactory.class,
      Factory.class);
  final Factory<?> factory = ReflectionUtils.newInstance(factoryType, conf);
  return factory;
}
@Test /** * A testing method instructing core hadoop to load an external ShuffleConsumerPlugin * as if it came from a 3rd party. */ public void testPluginAbility() { try{ // create JobConf with mapreduce.job.shuffle.consumer.plugin=TestShuffleConsumerPlugin JobConf jobConf = new JobConf(); jobConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, TestShufflePlugin.TestShuffleConsumerPlugin.class, ShuffleConsumerPlugin.class); ShuffleConsumerPlugin shuffleConsumerPlugin = null; Class<? extends ShuffleConsumerPlugin> clazz = jobConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); assertNotNull("Unable to get " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, clazz); // load 3rd party plugin through core's factory method shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf); assertNotNull("Unable to load " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, shuffleConsumerPlugin); } catch (Exception e) { assertTrue("Threw exception:" + e, false); } }
/**
 * Restores this object from {@code in}.
 *
 * <p>Read order: the input split class, the split's own fields, the input
 * format class, and finally the mapper class.
 *
 * @param in the stream to read from
 * @throws IOException if any part cannot be read
 */
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
  // The wrapped split: its class first, then its serialized state.
  inputSplitClass = (Class<? extends InputSplit>) readClass(in);
  inputSplit =
      (InputSplit) ReflectionUtils.newInstance(inputSplitClass, conf);
  inputSplit.readFields(in);

  // The classes tagged onto the split.
  inputFormatClass = (Class<? extends InputFormat>) readClass(in);
  mapperClass = (Class<? extends Mapper>) readClass(in);
}
/**
 * Opens a sequence-file reader on the given file and prepares reusable
 * key/value instances and in/out buffers.
 *
 * @param f status of the file to read
 * @throws IOException if the reader cannot be opened
 */
public TextRecordInputStream(FileStatus f) throws IOException {
  final Path location = f.getPath();
  final Configuration localConf = getConf();

  r = new SequenceFile.Reader(localConf, SequenceFile.Reader.file(location));

  // Key and value holders are created from the classes recorded in the file.
  final Class<? extends WritableComparable> keyType =
      r.getKeyClass().asSubclass(WritableComparable.class);
  key = ReflectionUtils.newInstance(keyType, localConf);

  final Class<? extends Writable> valueType =
      r.getValueClass().asSubclass(Writable.class);
  val = ReflectionUtils.newInstance(valueType, localConf);

  inbuf = new DataInputBuffer();
  outbuf = new DataOutputBuffer();
}
@SuppressWarnings("unchecked") public void configure(JobConf job) { this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job); //increment processed counter only if skipping feature is enabled this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && SkipBadRecords.getAutoIncrMapperProcCount(job); }
public FilterRecordReader(Configuration conf) throws IOException { super(); // instantiate filter filter = (Filter)ReflectionUtils.newInstance( conf.getClass(FILTER_CLASS, PercentFilter.class), conf); }
/**
 * Get the user defined {@link WritableComparable} comparator for
 * grouping keys of inputs to the reduce.
 *
 * <p>When no class is stored under
 * {@code JobContext.GROUP_COMPARATOR_CLASS}, the output key comparator is
 * returned instead.
 *
 * @return comparator set by the user for grouping values.
 * @see #setOutputValueGroupingComparator(Class) for details.
 */
public RawComparator getOutputValueGroupingComparator() {
  final Class<? extends RawComparator> groupingType = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);

  if (groupingType != null) {
    return ReflectionUtils.newInstance(groupingType, this);
  }
  return getOutputKeyComparator();
}
/**
 * Loads {@code com.hadoop.compression.lzo.LzoCodec} through the codec class
 * loader and instantiates it with a copy of the given configuration.
 *
 * @param conf configuration to copy for the codec
 * @return the codec instance
 * @throws RuntimeException wrapping {@link ClassNotFoundException} when the
 *         codec class is not available
 */
private CompressionCodec buildCodec(Configuration conf) {
  final Class<?> lzoType;
  try {
    lzoType = getClassLoaderForCodec()
        .loadClass("com.hadoop.compression.lzo.LzoCodec");
  } catch (ClassNotFoundException e) {
    throw new RuntimeException(e);
  }
  final Configuration codecConf = new Configuration(conf);
  return (CompressionCodec) ReflectionUtils.newInstance(lzoType, codecConf);
}
public CellCreator(Configuration conf) { Class<? extends VisibilityExpressionResolver> clazz = conf.getClass( VISIBILITY_EXP_RESOLVER_CLASS, DefaultVisibilityExpressionResolver.class, VisibilityExpressionResolver.class); this.visExpResolver = ReflectionUtils.newInstance(clazz, conf); this.visExpResolver.init(); }
/**
 * Reads the one-byte type index, instantiates the matching class from
 * {@code getTypes()} and lets that instance read its own fields.
 *
 * <p>The type byte is treated as unsigned when indexing {@code getTypes()}.
 *
 * @param in the stream to read from
 * @throws IOException if reading fails or the wrapped class cannot be
 *         instantiated; in the latter case the original exception is
 *         attached as the cause
 */
@Override
public void readFields(DataInput in) throws IOException {
  type = in.readByte();
  Class<? extends Writable> clazz = getTypes()[type & 0xff];
  try {
    instance = ReflectionUtils.newInstance(clazz, conf);
  } catch (Exception e) {
    // Chain the cause instead of printing it to stderr and dropping it.
    throw new IOException("Cannot initialize the class: " + clazz, e);
  }
  instance.readFields(in);
}
/** * Write a partition file for the given job, using the Sampler provided. * Queries the sampler for a sample keyset, sorts by the output key * comparator, selects the keys for each rank, and writes to the destination * returned from {@link TotalOrderPartitioner#getPartitionFile}. */ @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = job.getConfiguration(); final InputFormat inf = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); int numPartitions = job.getNumReduceTasks(); K[] samples = (K[])sampler.getSample(inf, job); LOG.info("Using " + samples.length + " samples"); RawComparator<K> comparator = (RawComparator<K>) job.getSortComparator(); Arrays.sort(samples, comparator); Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf)); FileSystem fs = dst.getFileSystem(conf); if (fs.exists(dst)) { fs.delete(dst, false); } SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, dst, job.getMapOutputKeyClass(), NullWritable.class); NullWritable nullValue = NullWritable.get(); float stepSize = samples.length / (float) numPartitions; int last = -1; for(int i = 1; i < numPartitions; ++i) { int k = Math.round(stepSize * i); while (last >= k && comparator.compare(samples[last], samples[k]) == 0) { ++k; } writer.append(samples[k], nullValue); last = k; } writer.close(); }
@SuppressWarnings("unchecked") // Explicit check for value class agreement public V createValue() { if (null == valueclass) { Class<?> cls = kids[kids.length -1].createValue().getClass(); for (int i = kids.length -1; cls.equals(NullWritable.class); i--) { cls = kids[i].createValue().getClass(); } valueclass = cls.asSubclass(Writable.class); } if (valueclass.equals(NullWritable.class)) { return (V) NullWritable.get(); } return (V) ReflectionUtils.newInstance(valueclass, null); }