Example source code for the Java class org.apache.hadoop.util.ReflectionUtils
Project: hadoop
File: CompressionEmulationUtil.java
/**
* Returns an {@link OutputStream} for a file that might need
* compression.
*/
static OutputStream getPossiblyCompressedOutputStream(Path file,
Configuration conf)
throws IOException {
FileSystem fs = file.getFileSystem(conf);
JobConf jConf = new JobConf(conf);
if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jConf)) {
// get the codec class
Class<? extends CompressionCodec> codecClass =
org.apache.hadoop.mapred.FileOutputFormat
.getOutputCompressorClass(jConf,
GzipCodec.class);
// get the codec implementation
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
// add the appropriate extension
file = file.suffix(codec.getDefaultExtension());
if (isCompressionEmulationEnabled(conf)) {
FSDataOutputStream fileOut = fs.create(file, false);
return new DataOutputStream(codec.createOutputStream(fileOut));
}
}
return fs.create(file, false);
}
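The pattern above resolves the configured codec class and instantiates it through ReflectionUtils, which also injects the Configuration into any Configurable implementation. A minimal standalone sketch of just that instantiation step (the class name here is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecInstantiationSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // newInstance() calls setConf(conf) because GzipCodec is Configurable
    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
    System.out.println("default extension: " + codec.getDefaultExtension()); // ".gz"
  }
}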
Project: angel
File: DFSStorageOldAPI.java
@SuppressWarnings({"rawtypes", "unchecked"})
public void initReader() throws IOException {
try {
Configuration conf = WorkerContext.get().getConf();
String inputFormatClassName =
conf.get(AngelConf.ANGEL_INPUTFORMAT_CLASS,
AngelConf.DEFAULT_ANGEL_INPUTFORMAT_CLASS);
Class<? extends org.apache.hadoop.mapred.InputFormat> inputFormatClass =
(Class<? extends org.apache.hadoop.mapred.InputFormat>) Class
.forName(inputFormatClassName);
org.apache.hadoop.mapred.InputFormat inputFormat =
ReflectionUtils.newInstance(inputFormatClass,
new JobConf(conf));
org.apache.hadoop.mapred.RecordReader<KEY, VALUE> recordReader =
inputFormat.getRecordReader(split, new JobConf(conf), Reporter.NULL);
setReader(new DFSReaderOldAPI(recordReader));
} catch (Exception x) {
LOG.error("init reader error ", x);
throw new IOException(x);
}
}
Project: hadoop
File: CompositeInputSplit.java
/**
* {@inheritDoc}
* @throws IOException If the child InputSplit cannot be read, typically
* for failing access checks.
*/
@SuppressWarnings("unchecked") // Generic array assignment
public void readFields(DataInput in) throws IOException {
int card = WritableUtils.readVInt(in);
if (splits == null || splits.length != card) {
splits = new InputSplit[card];
}
Class<? extends InputSplit>[] cls = new Class[card];
try {
for (int i = 0; i < card; ++i) {
cls[i] =
Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
}
for (int i = 0; i < card; ++i) {
splits[i] = ReflectionUtils.newInstance(cls[i], null);
splits[i].readFields(in);
}
} catch (ClassNotFoundException e) {
throw (IOException)new IOException("Failed split init").initCause(e);
}
}
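The read order above implies the matching write side: a vint count, every split's class name, then every split's own fields. A sketch of that counterpart, inferred from readFields() rather than copied from the project (it belongs in the same class, with DataOutput, WritableUtils, and Text imported):

public void write(DataOutput out) throws IOException {
  WritableUtils.writeVInt(out, splits.length);
  // class names first, so readFields() can instantiate each split before reading it
  for (InputSplit split : splits) {
    Text.writeString(out, split.getClass().getName());
  }
  for (InputSplit split : splits) {
    split.write(out);
  }
}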
Project: hadoop
File: ResourceManager.java
protected ResourceScheduler createScheduler() {
String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
YarnConfiguration.DEFAULT_RM_SCHEDULER);
LOG.info("Using Scheduler: " + schedulerClassName);
try {
Class<?> schedulerClazz = Class.forName(schedulerClassName);
if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) {
return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
this.conf);
} else {
throw new YarnRuntimeException("Class: " + schedulerClassName
+ " not instance of " + ResourceScheduler.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate Scheduler: "
+ schedulerClassName, e);
}
}
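The same load, type-check, and instantiate sequence recurs in several snippets below (see getReplanner and createPlanFollower). A hypothetical generic helper capturing the pattern (name and signature are illustrative, not part of Hadoop):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

public final class TypedLoaderSketch {
  /** Load className, verify it implements required, then instantiate it. */
  public static <T> T loadInstance(String className, Class<T> required,
      Configuration conf) throws ClassNotFoundException {
    Class<?> clazz = Class.forName(className);
    if (!required.isAssignableFrom(clazz)) {
      throw new IllegalArgumentException("Class: " + className
          + " not instance of " + required.getCanonicalName());
    }
    return required.cast(ReflectionUtils.newInstance(clazz, conf));
  }
}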
Project: hadoop-oss
File: HttpServer2.java
/** Get an array of the FilterInitializer classes specified in the conf. */
private static FilterInitializer[] getFilterInitializers(Configuration conf) {
if (conf == null) {
return null;
}
Class<?>[] classes = conf.getClasses(FILTER_INITIALIZER_PROPERTY);
if (classes == null) {
return null;
}
FilterInitializer[] initializers = new FilterInitializer[classes.length];
for(int i = 0; i < classes.length; i++) {
initializers[i] = (FilterInitializer)ReflectionUtils.newInstance(
classes[i], conf);
}
return initializers;
}
Project: hadoop
File: MapTask.java
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = createSortingCollector(job, reporter);
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
}
Project: hadoop-oss
File: CompressionCodecFactory.java
/**
* Find the codecs specified in the config value io.compression.codecs
* and register them. Defaults to gzip and deflate.
*/
public CompressionCodecFactory(Configuration conf) {
codecs = new TreeMap<String, CompressionCodec>();
codecsByClassName = new HashMap<String, CompressionCodec>();
codecsByName = new HashMap<String, CompressionCodec>();
List<Class<? extends CompressionCodec>> codecClasses =
getCodecClasses(conf);
if (codecClasses == null || codecClasses.isEmpty()) {
addCodec(new GzipCodec());
addCodec(new DefaultCodec());
} else {
for (Class<? extends CompressionCodec> codecClass : codecClasses) {
addCodec(ReflectionUtils.newInstance(codecClass, conf));
}
}
}
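Once built, the factory resolves codecs by file suffix. A short usage sketch (the path is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookupSketch {
  public static void main(String[] args) {
    CompressionCodecFactory factory =
        new CompressionCodecFactory(new Configuration());
    // resolved by the ".gz" suffix against the registered codecs
    CompressionCodec codec = factory.getCodec(new Path("/tmp/data.gz"));
    System.out.println(codec == null ? "no codec" : codec.getClass().getName());
  }
}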
Project: hadoop-oss
File: WritableComparator.java
/** Get a comparator for a {@link WritableComparable} implementation. */
public static WritableComparator get(
Class<? extends WritableComparable> c, Configuration conf) {
WritableComparator comparator = comparators.get(c);
if (comparator == null) {
// force the static initializers to run
forceInit(c);
// look to see if it is defined now
comparator = comparators.get(c);
// if not, use the generic one
if (comparator == null) {
comparator = new WritableComparator(c, conf, true);
}
}
// Newly passed Configuration objects should be used.
ReflectionUtils.setConf(comparator, conf);
return comparator;
}
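A usage sketch of the lookup: IntWritable registers its raw comparator in a static initializer, which is exactly what the forceInit() call exists to trigger.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

public class ComparatorLookupSketch {
  public static void main(String[] args) {
    WritableComparator cmp =
        WritableComparator.get(IntWritable.class, new Configuration());
    // negative result: 1 sorts before 2
    System.out.println(cmp.compare(new IntWritable(1), new IntWritable(2)));
  }
}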
Project: hadoop
File: Chain.java
private <E> E makeCopyForPassByValue(Serialization<E> serialization,
E obj) throws IOException {
Serializer<E> ser =
serialization.getSerializer(GenericsUtil.getClass(obj));
Deserializer<E> deser =
serialization.getDeserializer(GenericsUtil.getClass(obj));
DataOutputBuffer dof = threadLocalDataOutputBuffer.get();
dof.reset();
ser.open(dof);
ser.serialize(obj);
ser.close();
obj = ReflectionUtils.newInstance(GenericsUtil.getClass(obj),
getChainJobConf());
ByteArrayInputStream bais =
new ByteArrayInputStream(dof.getData(), 0, dof.getLength());
deser.open(bais);
deser.deserialize(obj);
deser.close();
return obj;
}
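For plain Writable types, recent Hadoop releases expose the same serialize-then-deserialize copy as ReflectionUtils.copy. A minimal sketch, assuming that helper is available in your Hadoop version:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;

public class PassByValueSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Text src = new Text("payload");
    Text dst = new Text();
    // serializes src and deserializes into dst, like makeCopyForPassByValue above
    ReflectionUtils.copy(conf, src, dst);
    System.out.println(dst); // payload
  }
}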
Project: ditb
File: Encryption.java
public static KeyProvider getKeyProvider(Configuration conf) {
String providerClassName = conf.get(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
KeyStoreKeyProvider.class.getName());
String providerParameters = conf.get(HConstants.CRYPTO_KEYPROVIDER_PARAMETERS_KEY, "");
try {
Pair<String,String> providerCacheKey = new Pair<String,String>(providerClassName,
providerParameters);
KeyProvider provider = keyProviderCache.get(providerCacheKey);
if (provider != null) {
return provider;
}
provider = (KeyProvider) ReflectionUtils.newInstance(
getClassLoaderForClass(KeyProvider.class).loadClass(providerClassName),
conf);
provider.init(providerParameters);
if (LOG.isDebugEnabled()) {
LOG.debug("Installed " + providerClassName + " into key provider cache");
}
keyProviderCache.put(providerCacheKey, provider);
return provider;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Project: hadoop-oss
File: TestWritable.java
/** Utility method for testing writables. */
public static Writable testWritable(Writable before
, Configuration conf) throws Exception {
DataOutputBuffer dob = new DataOutputBuffer();
before.write(dob);
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), dob.getLength());
Writable after = (Writable)ReflectionUtils.newInstance(
before.getClass(), conf);
after.readFields(dib);
assertEquals(before, after);
return after;
}
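An invocation sketch: round-trip any Writable and get back an independent, equal instance (assumes the TestWritable test utility is on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.TestWritable; // test artifact
import org.apache.hadoop.io.Writable;

public class RoundTripSketch {
  public static void main(String[] args) throws Exception {
    Writable after =
        TestWritable.testWritable(new Text("hello"), new Configuration());
    System.out.println(after); // hello
  }
}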
Project: hadoop
File: AbstractReservationSystem.java
private PlanFollower createPlanFollower() {
String planFollowerPolicyClassName =
conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER,
getDefaultPlanFollower());
if (planFollowerPolicyClassName == null) {
return null;
}
LOG.info("Using PlanFollowerPolicy: " + planFollowerPolicyClassName);
try {
Class<?> planFollowerPolicyClazz =
conf.getClassByName(planFollowerPolicyClassName);
if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) {
return (PlanFollower) ReflectionUtils.newInstance(
planFollowerPolicyClazz, conf);
} else {
throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName
+ " not instance of " + PlanFollower.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate PlanFollowerPolicy: "
+ planFollowerPolicyClassName, e);
}
}
Project: spark_deep
File: Server.java
private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // try to read an id
if (LOG.isDebugEnabled())
LOG.debug(" got #" + id);
Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param
((RPC.Invocation)param).setConf(conf);
param.readFields(dis);
Call call = new Call(id, param, this);
callQueue.put(call); // queue the call; may block here if the server is overloaded
incRpcCount(); // Increment the rpc count
}
Project: ditb
File: IndexFile.java
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
// First clear the map. Otherwise we will just accumulate
// entries every time this method is called.
this.instance.clear();
// Read the number of entries in the map
int entries = in.readInt();
// Then read each key/value pair
for (int i = 0; i < entries; i++) {
byte[] key = Bytes.readByteArray(in);
byte id = in.readByte();
Class clazz = getClass(id);
V value = null;
if (clazz.equals(byte[].class)) {
byte[] bytes = Bytes.readByteArray(in);
value = (V) bytes;
} else {
Writable w = (Writable) ReflectionUtils.newInstance(clazz, getConf());
w.readFields(in);
value = (V) w;
}
this.instance.put(key, value);
}
}
Project: hadoop
File: SharedCacheManager.java
@SuppressWarnings("unchecked")
private static SCMStore createSCMStoreService(Configuration conf) {
Class<? extends SCMStore> defaultStoreClass;
try {
defaultStoreClass =
(Class<? extends SCMStore>) Class
.forName(YarnConfiguration.DEFAULT_SCM_STORE_CLASS);
} catch (Exception e) {
throw new YarnRuntimeException("Invalid default scm store class"
+ YarnConfiguration.DEFAULT_SCM_STORE_CLASS, e);
}
SCMStore store =
ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.SCM_STORE_CLASS,
defaultStoreClass, SCMStore.class), conf);
return store;
}
Project: aliyun-maxcompute-data-collectors
File: NetezzaExternalTableHCatImportMapper.java
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
helper = new SqoopHCatImportHelper(conf);
String recordClassName = conf.get(ConfigurationHelper
.getDbInputClassProperty());
if (null == recordClassName) {
throw new IOException("DB Input class name is not set!");
}
try {
Class<?> cls = Class.forName(recordClassName, true,
Thread.currentThread().getContextClassLoader());
sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
if (null == sqoopRecord) {
throw new IOException("Could not instantiate object of type "
+ recordClassName);
}
}
Project: aliyun-maxcompute-data-collectors
File: ParquetExportMapper.java
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
// Instantiate a copy of the user's class to hold and parse the record.
String recordClassName = conf.get(
ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
if (null == recordClassName) {
throw new IOException("Export table class name ("
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ ") is not set!");
}
try {
Class cls = Class.forName(recordClassName, true,
Thread.currentThread().getContextClassLoader());
recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
if (null == recordImpl) {
throw new IOException("Could not instantiate object of type "
+ recordClassName);
}
columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
MapWritable.class);
}
Project: aliyun-maxcompute-data-collectors
File: CodecMap.java
/**
* Given a codec name, instantiate the concrete implementation
* class that implements it.
* @throws com.cloudera.sqoop.io.UnsupportedCodecException if a codec cannot
* be found with the supplied name.
*/
public static CompressionCodec getCodec(String codecName,
Configuration conf) throws com.cloudera.sqoop.io.UnsupportedCodecException {
// Try standard Hadoop mechanism first
CompressionCodec codec = getCodecByName(codecName, conf);
if (codec != null) {
return codec;
}
// Fall back to Sqoop mechanism
String codecClassName = null;
try {
codecClassName = getCodecClassName(codecName);
if (null == codecClassName) {
return null;
}
Class<? extends CompressionCodec> codecClass =
(Class<? extends CompressionCodec>)
conf.getClassByName(codecClassName);
return (CompressionCodec) ReflectionUtils.newInstance(
codecClass, conf);
} catch (ClassNotFoundException cnfe) {
throw new com.cloudera.sqoop.io.UnsupportedCodecException(
"Cannot find codec class "
+ codecClassName + " for codec " + codecName);
}
}
Project: aliyun-maxcompute-data-collectors
File: JobBase.java
protected void doValidate(SqoopOptions options, Configuration conf,
ValidationContext validationContext)
throws ValidationException {
Validator validator = (Validator) ReflectionUtils.newInstance(
options.getValidatorClass(), conf);
ValidationThreshold threshold = (ValidationThreshold)
ReflectionUtils.newInstance(options.getValidationThresholdClass(),
conf);
ValidationFailureHandler failureHandler = (ValidationFailureHandler)
ReflectionUtils.newInstance(options.getValidationFailureHandlerClass(),
conf);
StringBuilder sb = new StringBuilder();
sb.append("Validating the integrity of the import using the "
+ "following configuration\n");
sb.append("\tValidator : ").append(validator.getClass().getName())
.append('\n');
sb.append("\tThreshold Specifier : ")
.append(threshold.getClass().getName()).append('\n');
sb.append("\tFailure Handler : ")
.append(failureHandler.getClass().getName()).append('\n');
LOG.info(sb.toString());
validator.validate(validationContext, threshold, failureHandler);
}
Project: ditb
File: HRegionServer.java
static private ReplicationService newReplicationInstance(String classname, Configuration conf,
HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException {
Class<?> clazz = null;
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
clazz = Class.forName(classname, true, classLoader);
} catch (java.lang.ClassNotFoundException nfe) {
throw new IOException("Could not find class for " + classname);
}
// create an instance of the replication object.
ReplicationService service = (ReplicationService) ReflectionUtils.newInstance(clazz, conf);
service.initialize(server, fs, logDir, oldLogDir);
return service;
}
Project: hadoop
File: AbstractReservationSystem.java
protected Planner getReplanner(String planQueueName) {
ReservationSchedulerConfiguration reservationConfig =
getReservationSchedulerConfiguration();
String plannerClassName = reservationConfig.getReplanner(planQueueName);
LOG.info("Using Replanner: " + plannerClassName + " for queue: "
+ planQueueName);
try {
Class<?> plannerClazz = conf.getClassByName(plannerClassName);
if (Planner.class.isAssignableFrom(plannerClazz)) {
Planner planner =
(Planner) ReflectionUtils.newInstance(plannerClazz, conf);
planner.init(planQueueName, reservationConfig);
return planner;
} else {
throw new YarnRuntimeException("Class: " + plannerClazz
+ " not instance of " + Planner.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate Planner: "
+ plannerClassName + " for queue: " + planQueueName, e);
}
}
Project: hadoop
File: RMProxy.java
/**
* Helper method to create FailoverProxyProvider.
*/
private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
Configuration conf, Class<T> protocol) {
Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
try {
defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
Class.forName(
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
} catch (Exception e) {
throw new YarnRuntimeException("Invalid default failover provider class" +
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
}
RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
defaultProviderClass, RMFailoverProxyProvider.class), conf);
provider.init(conf, (RMProxy<T>) this, protocol);
return provider;
}
Project: hadoop
File: SharedCacheChecksumFactory.java
/**
* Get a new <code>SharedCacheChecksum</code> object based on the configurable
* algorithm implementation
* (see <code>yarn.sharedcache.checksum.algo.impl</code>)
*
* @return <code>SharedCacheChecksum</code> object
*/
public static SharedCacheChecksum getChecksum(Configuration conf) {
Class<? extends SharedCacheChecksum> clazz =
conf.getClass(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL,
defaultAlgorithm, SharedCacheChecksum.class);
SharedCacheChecksum checksum = instances.get(clazz);
if (checksum == null) {
try {
checksum = ReflectionUtils.newInstance(clazz, conf);
SharedCacheChecksum old = instances.putIfAbsent(clazz, checksum);
if (old != null) {
checksum = old;
}
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
}
return checksum;
}
Project: hadoop
File: FileInputFormat.java
/**
* Get a PathFilter instance of the filter set for the input paths.
*
* @return the PathFilter instance set for the job, NULL if none has been set.
*/
public static PathFilter getInputPathFilter(JobContext context) {
Configuration conf = context.getConfiguration();
Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
PathFilter.class);
return (filterClass != null) ?
(PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
}
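The corresponding setter stores the filter class under the same property that getInputPathFilter() reads. A sketch with a hypothetical filter that skips files whose names start with an underscore:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PathFilterSketch {
  /** Hypothetical filter: reject side files such as _SUCCESS. */
  public static class NoUnderscoreFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
      return !path.getName().startsWith("_");
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    // persists NoUnderscoreFilter.class into the job's configuration
    FileInputFormat.setInputPathFilter(job, NoUnderscoreFilter.class);
  }
}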
Project: wherehowsX
File: SequenceFileAnalyzer.java
@Override
public SampleDataRecord getSampleData(Path path) throws IOException {
SampleDataRecord dataRecord = null;
if (!fs.exists(path))
LOG.error("sequence file : " + path.toUri().getPath() + " is not exist on hdfs");
else {
try {
LOG.info("sequencefileanalyzer start parse sampledata for file path : {}", path.toUri().getPath());
SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path));
List<Object> sampleValues = new ArrayList<Object>();
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
int count = 0;
String keyName = "Key";
String valueName = "Value";
while (reader.next(key, value) && count < 12) {
sampleValues.add("{\"" + keyName + "\": \"" + key + "\", \"" + valueName + "\": \"" + value + "\"}");
count++;
}
dataRecord = new SampleDataRecord(path.toUri().getPath(), sampleValues);
LOG.info("sequence file path : {}, sample data is {}", path.toUri().getPath(), sampleValues);
} catch (Exception e) {
LOG.error("path : {} content " + " is not Sequence File format content ",path.toUri().getPath());
LOG.info(e.getStackTrace().toString());
}
}
return dataRecord;
}
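A counterpart sketch that writes a small SequenceFile the analyzer above could sample; the path and key/value types are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/sample.seq");
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(path),
        SequenceFile.Writer.keyClass(IntWritable.class),
        SequenceFile.Writer.valueClass(Text.class))) {
      writer.append(new IntWritable(1), new Text("one"));
    }
  }
}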
Project: hadoop
File: HttpServer2.java
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
if (!HttpServer2.isInstrumentationAccessAllowed(getServletContext(),
request, response)) {
return;
}
response.setContentType("text/plain; charset=UTF-8");
try (PrintStream out = new PrintStream(
response.getOutputStream(), false, "UTF-8")) {
ReflectionUtils.printThreadInfo(out, "");
}
ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1);
}
Project: hadoop
File: ResourceUsageMatcher.java
/**
* Configure the {@link ResourceUsageMatcher} to load the configured plugins
* and initialize them.
*/
@SuppressWarnings("unchecked")
public void configure(Configuration conf, ResourceCalculatorPlugin monitor,
ResourceUsageMetrics metrics, Progressive progress) {
Class[] plugins = conf.getClasses(RESOURCE_USAGE_EMULATION_PLUGINS);
if (plugins == null) {
System.out.println("No resource usage emulator plugins configured.");
} else {
for (Class clazz : plugins) {
if (clazz != null) {
if (ResourceUsageEmulatorPlugin.class.isAssignableFrom(clazz)) {
ResourceUsageEmulatorPlugin plugin =
(ResourceUsageEmulatorPlugin) ReflectionUtils.newInstance(clazz,
conf);
emulationPlugins.add(plugin);
} else {
throw new RuntimeException("Misconfigured resource usage plugins. "
+ "Class " + clazz.getClass().getName() + " is not a resource "
+ "usage plugin as it does not extend "
+ ResourceUsageEmulatorPlugin.class.getName());
}
}
}
}
// initialize the emulators once all the configured emulator plugins are
// loaded
for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
emulator.initialize(conf, metrics, monitor, progress);
}
}
Project: hadoop
File: FsDatasetSpi.java
/** @return the configured factory. */
public static Factory<?> getFactory(Configuration conf) {
@SuppressWarnings("rawtypes")
final Class<? extends Factory> clazz = conf.getClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
FsDatasetFactory.class,
Factory.class);
return ReflectionUtils.newInstance(clazz, conf);
}
Project: hadoop
File: TestShufflePlugin.java
@Test
/**
* A testing method instructing core hadoop to load an external ShuffleConsumerPlugin
* as if it came from a 3rd party.
*/
public void testPluginAbility() {
try{
// create JobConf with mapreduce.job.shuffle.consumer.plugin=TestShuffleConsumerPlugin
JobConf jobConf = new JobConf();
jobConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
TestShufflePlugin.TestShuffleConsumerPlugin.class,
ShuffleConsumerPlugin.class);
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class<? extends ShuffleConsumerPlugin> clazz =
jobConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
assertNotNull("Unable to get " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, clazz);
// load 3rd party plugin through core's factory method
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf);
assertNotNull("Unable to load " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, shuffleConsumerPlugin);
}
catch (Exception e) {
assertTrue("Threw exception:" + e, false);
}
}
Project: hadoop
File: TaggedInputSplit.java
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
inputSplitClass = (Class<? extends InputSplit>) readClass(in);
inputSplit = (InputSplit) ReflectionUtils
.newInstance(inputSplitClass, conf);
inputSplit.readFields(in);
inputFormatClass = (Class<? extends InputFormat>) readClass(in);
mapperClass = (Class<? extends Mapper>) readClass(in);
}
Project: hadoop
File: Display.java
public TextRecordInputStream(FileStatus f) throws IOException {
final Path fpath = f.getPath();
final Configuration lconf = getConf();
r = new SequenceFile.Reader(lconf,
SequenceFile.Reader.file(fpath));
key = ReflectionUtils.newInstance(
r.getKeyClass().asSubclass(WritableComparable.class), lconf);
val = ReflectionUtils.newInstance(
r.getValueClass().asSubclass(Writable.class), lconf);
inbuf = new DataInputBuffer();
outbuf = new DataOutputBuffer();
}
Project: hadoop
File: MapRunner.java
@SuppressWarnings("unchecked")
public void configure(JobConf job) {
this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
//increment processed counter only if skipping feature is enabled
this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 &&
SkipBadRecords.getAutoIncrMapperProcCount(job);
}
Project: hadoop
File: SequenceFileInputFilter.java
public FilterRecordReader(Configuration conf)
throws IOException {
super();
// instantiate filter
filter = (Filter)ReflectionUtils.newInstance(
conf.getClass(FILTER_CLASS, PercentFilter.class), conf);
}
Project: hadoop
File: JobConf.java
/**
* Get the user defined {@link WritableComparable} comparator for
* grouping keys of inputs to the reduce.
*
* @return comparator set by the user for grouping values.
* @see #setOutputValueGroupingComparator(Class) for details.
*/
public RawComparator getOutputValueGroupingComparator() {
Class<? extends RawComparator> theClass = getClass(
JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
if (theClass == null) {
return getOutputKeyComparator();
}
return ReflectionUtils.newInstance(theClass, this);
}
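A usage sketch pairing the getter with its setter; Text.Comparator stands in for a user-supplied grouping comparator:

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

public class GroupingComparatorSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // stores the class under JobContext.GROUP_COMPARATOR_CLASS
    job.setOutputValueGroupingComparator(Text.Comparator.class);
    RawComparator comparator = job.getOutputValueGroupingComparator();
    System.out.println(comparator.getClass().getName());
  }
}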
Project: ditb
File: Compression.java
private CompressionCodec buildCodec(Configuration conf) {
try {
Class<?> externalCodec =
getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, new Configuration(
conf));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
Project: ditb
File: CellCreator.java
public CellCreator(Configuration conf) {
Class<? extends VisibilityExpressionResolver> clazz = conf.getClass(
VISIBILITY_EXP_RESOLVER_CLASS, DefaultVisibilityExpressionResolver.class,
VisibilityExpressionResolver.class);
this.visExpResolver = ReflectionUtils.newInstance(clazz, conf);
this.visExpResolver.init();
}
Project: hadoop-oss
File: HttpServer2.java
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
if (!HttpServer2.isInstrumentationAccessAllowed(getServletContext(),
request, response)) {
return;
}
response.setContentType("text/plain; charset=UTF-8");
try (PrintStream out = new PrintStream(
response.getOutputStream(), false, "UTF-8")) {
ReflectionUtils.printThreadInfo(out, "");
}
ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1);
}
Project: hadoop
File: GenericWritable.java
@Override
public void readFields(DataInput in) throws IOException {
type = in.readByte();
Class<? extends Writable> clazz = getTypes()[type & 0xff];
try {
instance = ReflectionUtils.newInstance(clazz, conf);
} catch (Exception e) {
e.printStackTrace();
throw new IOException("Cannot initialize the class: " + clazz);
}
instance.readFields(in);
}
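GenericWritable is abstract; a concrete subclass only supplies the class table that the leading type byte above indexes. A minimal sketch:

import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class IntOrTextWritable extends GenericWritable {
  @SuppressWarnings("unchecked")
  private static final Class<? extends Writable>[] TYPES =
      new Class[] { IntWritable.class, Text.class };

  @Override
  protected Class<? extends Writable>[] getTypes() {
    return TYPES;
  }
}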
Project: hadoop
File: InputSampler.java
/**
* Write a partition file for the given job, using the Sampler provided.
* Queries the sampler for a sample keyset, sorts by the output key
* comparator, selects the keys for each rank, and writes to the destination
* returned from {@link TotalOrderPartitioner#getPartitionFile}.
*/
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = job.getConfiguration();
final InputFormat inf =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
int numPartitions = job.getNumReduceTasks();
K[] samples = (K[])sampler.getSample(inf, job);
LOG.info("Using " + samples.length + " samples");
RawComparator<K> comparator =
(RawComparator<K>) job.getSortComparator();
Arrays.sort(samples, comparator);
Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
FileSystem fs = dst.getFileSystem(conf);
if (fs.exists(dst)) {
fs.delete(dst, false);
}
SequenceFile.Writer writer = SequenceFile.createWriter(fs,
conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
NullWritable nullValue = NullWritable.get();
float stepSize = samples.length / (float) numPartitions;
int last = -1;
for(int i = 1; i < numPartitions; ++i) {
int k = Math.round(stepSize * i);
while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
++k;
}
writer.append(samples[k], nullValue);
last = k;
}
writer.close();
}
Project: hadoop
File: OverrideRecordReader.java
@SuppressWarnings("unchecked") // Explicit check for value class agreement
public V createValue() {
if (null == valueclass) {
Class<?> cls = kids[kids.length -1].createValue().getClass();
for (int i = kids.length -1; cls.equals(NullWritable.class); i--) {
cls = kids[i].createValue().getClass();
}
valueclass = cls.asSubclass(Writable.class);
}
if (valueclass.equals(NullWritable.class)) {
return (V) NullWritable.get();
}
return (V) ReflectionUtils.newInstance(valueclass, null);
}