private void configureGenericRecordExportInputFormat(Job job, String tableName)
    throws IOException {
  ConnManager connManager = context.getConnManager();
  Map<String, Integer> columnTypeInts;
  if (options.getCall() == null) {
    columnTypeInts = connManager.getColumnTypes(
        tableName,
        options.getSqlQuery());
  } else {
    columnTypeInts = connManager.getColumnTypesForProcedure(
        options.getCall());
  }
  String[] specifiedColumns = options.getColumns();
  MapWritable columnTypes = new MapWritable();
  for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
    String column = e.getKey();
    column = (specifiedColumns == null) ? column
        : options.getColumnNameCaseInsensitive(column);
    if (column != null) {
      Text columnName = new Text(column);
      Text columnType = new Text(
          connManager.toJavaType(tableName, column, e.getValue()));
      columnTypes.put(columnName, columnType);
    }
  }
  DefaultStringifier.store(job.getConfiguration(), columnTypes,
      AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
}
@Override
protected void setup(Context context)
    throws IOException, InterruptedException {
  super.setup(context);
  Configuration conf = context.getConfiguration();

  // Instantiate a copy of the user's class to hold and parse the record.
  String recordClassName = conf.get(
      ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
  if (null == recordClassName) {
    throw new IOException("Export table class name ("
        + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
        + ") is not set!");
  }

  try {
    Class cls = Class.forName(recordClassName, true,
        Thread.currentThread().getContextClassLoader());
    recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException(cnfe);
  }

  if (null == recordImpl) {
    throw new IOException("Could not instantiate object of type "
        + recordClassName);
  }

  columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
      MapWritable.class);
}
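The two snippets above form a store/load pair: the job driver serializes the column-type map into the job Configuration, and the mapper's setup() reads it back with DefaultStringifier.load(). Below is a minimal, self-contained sketch of that round trip; the class name and configuration key are illustrative, not Sqoop's.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

public class ColumnTypesRoundTrip {
  // Hypothetical key used only for this sketch.
  private static final String COLUMN_TYPES_KEY = "example.column.types.map";

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Driver side: pack column-name -> java-type pairs into a MapWritable
    // and serialize it into the Configuration.
    MapWritable columnTypes = new MapWritable();
    columnTypes.put(new Text("Age"), new Text("Integer"));
    columnTypes.put(new Text("Name"), new Text("String"));
    DefaultStringifier.store(conf, columnTypes, COLUMN_TYPES_KEY);

    // Task side: restore the map from the Configuration.
    MapWritable restored =
        DefaultStringifier.load(conf, COLUMN_TYPES_KEY, MapWritable.class);
    System.out.println(restored.keySet()); // [Age, Name]
  }
}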
/**
 * Creates a {@link Configuration} for the Map or Reduce in the chain.
 *
 * <p>
 * It creates a new Configuration using the chain job's Configuration as base
 * and adds to it the configuration properties for the chain element. The keys
 * of the chain element Configuration have precedence over the given
 * Configuration.
 * </p>
 *
 * @param jobConf
 *          the chain job's Configuration.
 * @param confKey
 *          the key for the chain element configuration serialized in the
 *          chain job's Configuration.
 * @return a new Configuration aggregating the chain job's Configuration with
 *         the chain element configuration properties.
 */
protected static Configuration getChainElementConf(Configuration jobConf,
    String confKey) {
  Configuration conf = null;
  try (Stringifier<Configuration> stringifier =
      new DefaultStringifier<Configuration>(jobConf, Configuration.class)) {
    String confString = jobConf.get(confKey, null);
    if (confString != null) {
      conf = stringifier.fromString(confString);
    }
  } catch (IOException ioex) {
    throw new RuntimeException(ioex);
  }
  // We have to do this because the Writable deserialization clears all
  // values set in the conf, making it impossible to do a
  // new Configuration(jobConf) in the creation of the conf above.
  jobConf = new Configuration(jobConf);
  if (conf != null) {
    for (Map.Entry<String, String> entry : conf) {
      jobConf.set(entry.getKey(), entry.getValue());
    }
  }
  return jobConf;
}
protected LinkedMapWritable getForestStatusMap(Configuration conf)
    throws IOException {
  String forestHost = conf.get(OUTPUT_FOREST_HOST);
  if (forestHost != null) {
    // Restores the object from the configuration.
    LinkedMapWritable fhmap = DefaultStringifier.load(conf,
        OUTPUT_FOREST_HOST, LinkedMapWritable.class);
    // must be in fast load mode, otherwise won't reach here
    String s = conf.get(ASSIGNMENT_POLICY);
    // EXECUTION_MODE must have a value in mlcp;
    // default is "distributed" in hadoop connector
    String mode = conf.get(EXECUTION_MODE, MODE_DISTRIBUTED);
    if (MODE_DISTRIBUTED.equals(mode)) {
      AssignmentPolicy.Kind policy = AssignmentPolicy.Kind.forName(s);
      am.initialize(policy, fhmap, conf.getInt(BATCH_SIZE, 10));
    }
    return fhmap;
  } else {
    throw new IOException("Forest host map not found");
  }
}
@Override
public void checkOutputSpecs(Configuration conf, ContentSource cs)
    throws IOException {
  // check for required configuration
  if (conf.get(OUTPUT_QUERY) == null) {
    throw new IllegalArgumentException(OUTPUT_QUERY + " is not specified.");
  }
  // warn against unsupported configuration
  if (conf.get(BATCH_SIZE) != null) {
    LOG.warn("Config entry for "
        + "\"mapreduce.marklogic.output.batchsize\" is not "
        + "supported for " + this.getClass().getName()
        + " and will be ignored.");
  }
  String queryLanguage = conf.get(OUTPUT_QUERY_LANGUAGE);
  if (queryLanguage != null) {
    InternalUtilities.checkQueryLanguage(queryLanguage);
  }
  // store hosts into config system
  DefaultStringifier.store(conf, queryHosts(cs), OUTPUT_FOREST_HOST);
}
/**
 * Creates a {@link Configuration} for the Map or Reduce in the chain.
 *
 * <p>
 * It creates a new Configuration using the chain job's Configuration as base
 * and adds to it the configuration properties for the chain element. The keys
 * of the chain element Configuration have precedence over the given
 * Configuration.
 * </p>
 *
 * @param jobConf
 *          the chain job's Configuration.
 * @param confKey
 *          the key for the chain element configuration serialized in the
 *          chain job's Configuration.
 * @return a new Configuration aggregating the chain job's Configuration with
 *         the chain element configuration properties.
 */
protected static Configuration getChainElementConf(Configuration jobConf,
    String confKey) {
  Configuration conf = null;
  try {
    Stringifier<Configuration> stringifier =
        new DefaultStringifier<Configuration>(jobConf, Configuration.class);
    String confString = jobConf.get(confKey, null);
    if (confString != null) {
      conf = stringifier.fromString(confString);
    }
  } catch (IOException ioex) {
    throw new RuntimeException(ioex);
  }
  // We have to do this because the Writable deserialization clears all
  // values set in the conf, making it impossible to do a
  // new Configuration(jobConf) in the creation of the conf above.
  jobConf = new Configuration(jobConf);
  if (conf != null) {
    for (Map.Entry<String, String> entry : conf) {
      jobConf.set(entry.getKey(), entry.getValue());
    }
  }
  return jobConf;
}
/**
 * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
 * <p/>
 * It creates a new JobConf using the chain job's JobConf as base and adds to
 * it the configuration properties for the chain element. The keys of the
 * chain element jobConf have precedence over the given JobConf.
 *
 * @param jobConf the chain job's JobConf.
 * @param confKey the key for the chain element configuration serialized in
 *                the chain job's JobConf.
 * @return a new JobConf aggregating the chain job's JobConf with the chain
 *         element configuration properties.
 */
private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
  JobConf conf;
  try {
    Stringifier<JobConf> stringifier =
        new DefaultStringifier<JobConf>(jobConf, JobConf.class);
    conf = stringifier.fromString(jobConf.get(confKey, null));
  } catch (IOException ioex) {
    throw new RuntimeException(ioex);
  }
  // We have to do this because the Writable deserialization clears all
  // values set in the conf, making it impossible to do a new JobConf(jobConf)
  // in the creation of the conf above.
  jobConf = new JobConf(jobConf);
  for (Map.Entry<String, String> entry : conf) {
    jobConf.set(entry.getKey(), entry.getValue());
  }
  return jobConf;
}
@Override
public String toString() {
  Configuration conf = new Configuration();
  conf.set("io.serializations",
      "org.apache.hadoop.io.serializer.JavaSerialization,"
      + "org.apache.hadoop.io.serializer.WritableSerialization");
  DefaultStringifier<Map<String, String>> mapStringifier =
      new DefaultStringifier<Map<String, String>>(conf,
          GenericsUtil.getClass(params));
  try {
    return mapStringifier.toString(params);
  } catch (IOException e) {
    log.info("Encountered IOException while serializing; returning empty string", e);
    return "";
  }
}
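The toString() above only covers the serialization direction. A matching deserialization path would call fromString() on a stringifier built with the same io.serializations setting (JavaSerialization is needed here because a plain HashMap is not a Writable). The following is a rough sketch, not the original class's code; the ParamsCodec name and fromString helper are assumptions for illustration.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.util.GenericsUtil;

public class ParamsCodec {
  // Mirrors the configuration used by the toString() shown above.
  private static Configuration codecConf() {
    Configuration conf = new Configuration();
    conf.set("io.serializations",
        "org.apache.hadoop.io.serializer.JavaSerialization,"
        + "org.apache.hadoop.io.serializer.WritableSerialization");
    return conf;
  }

  // Hypothetical counterpart of toString(): rebuilds the parameter map
  // from its serialized string form.
  public static Map<String, String> fromString(String serialized)
      throws IOException {
    Map<String, String> params = new HashMap<String, String>();
    DefaultStringifier<Map<String, String>> mapStringifier =
        new DefaultStringifier<Map<String, String>>(codecConf(),
            GenericsUtil.getClass(params));
    return mapStringifier.fromString(serialized);
  }
}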
@Override
protected void configureInputFormat(Job job, String tableName,
    String tableClassName, String splitByCol)
    throws ClassNotFoundException, IOException {
  fileType = getInputFileType();

  super.configureInputFormat(job, tableName, tableClassName, splitByCol);

  if (fileType == FileType.AVRO_DATA_FILE) {
    LOG.debug("Configuring for Avro export");
    ConnManager connManager = context.getConnManager();
    Map<String, Integer> columnTypeInts =
        connManager.getColumnTypes(tableName, options.getSqlQuery());
    MapWritable columnTypes = new MapWritable();
    for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
      Text columnName = new Text(e.getKey());
      Text columnText = new Text(
          connManager.toJavaType(tableName, e.getKey(), e.getValue()));
      columnTypes.put(columnName, columnText);
    }
    DefaultStringifier.store(job.getConfiguration(), columnTypes,
        AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
  }
}
@Test
public void testAvroWithNoColumnsSpecified() throws Exception {
  SqoopOptions opts = new SqoopOptions();
  opts.setExportDir("myexportdir");
  JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
  Job job = new Job();
  jdbcExportJob.configureInputFormat(job, null, null, null);
  assertEquals(asSetOfText("Age", "Name", "Gender"),
      DefaultStringifier.load(job.getConfiguration(),
          AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Test
public void testAvroWithAllColumnsSpecified() throws Exception {
  SqoopOptions opts = new SqoopOptions();
  opts.setExportDir("myexportdir");
  String[] columns = { "Age", "Name", "Gender" };
  opts.setColumns(columns);
  JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
  Job job = new Job();
  jdbcExportJob.configureInputFormat(job, null, null, null);
  assertEquals(asSetOfText("Age", "Name", "Gender"),
      DefaultStringifier.load(job.getConfiguration(),
          AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Test
public void testAvroWithOneColumnSpecified() throws Exception {
  SqoopOptions opts = new SqoopOptions();
  opts.setExportDir("myexportdir");
  String[] columns = { "Gender" };
  opts.setColumns(columns);
  JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
  Job job = new Job();
  jdbcExportJob.configureInputFormat(job, null, null, null);
  assertEquals(asSetOfText("Gender"),
      DefaultStringifier.load(job.getConfiguration(),
          AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Test
public void testAvroWithSomeColumnsSpecified() throws Exception {
  SqoopOptions opts = new SqoopOptions();
  opts.setExportDir("myexportdir");
  String[] columns = { "Age", "Name" };
  opts.setColumns(columns);
  JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
  Job job = new Job();
  jdbcExportJob.configureInputFormat(job, null, null, null);
  assertEquals(asSetOfText("Age", "Name"),
      DefaultStringifier.load(job.getConfiguration(),
          AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Test
public void testAvroWithMoreColumnsSpecified() throws Exception {
  SqoopOptions opts = new SqoopOptions();
  opts.setExportDir("myexportdir");
  String[] columns = { "Age", "Name", "Gender", "Address" };
  opts.setColumns(columns);
  JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
  Job job = new Job();
  jdbcExportJob.configureInputFormat(job, null, null, null);
  assertEquals(asSetOfText("Age", "Name", "Gender"),
      DefaultStringifier.load(job.getConfiguration(),
          AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
protected static void setMapperConf(boolean isMap, Configuration jobConf,
    Class<?> inputKeyClass, Class<?> inputValueClass,
    Class<?> outputKeyClass, Class<?> outputValueClass,
    Configuration mapperConf, int index, String prefix) {
  // if the Mapper does not have a configuration, create an empty one
  if (mapperConf == null) {
    // using a Configuration without defaults to make it lightweight.
    // still the chain's conf may have all defaults and this conf is
    // overlapped to the chain configuration one.
    mapperConf = new Configuration(true);
  }

  // store the input/output classes of the mapper in the mapper conf
  mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
  mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass, Object.class);
  mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
  mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
      Object.class);

  // serialize the mapper configuration in the chain configuration.
  Stringifier<Configuration> stringifier =
      new DefaultStringifier<Configuration>(jobConf, Configuration.class);
  try {
    jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
        stringifier.toString(new Configuration(mapperConf)));
  } catch (IOException ioEx) {
    throw new RuntimeException(ioEx);
  }

  // increment the chain counter
  jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
}
protected static void setReducerConf(Configuration jobConf,
    Class<?> inputKeyClass, Class<?> inputValueClass,
    Class<?> outputKeyClass, Class<?> outputValueClass,
    Configuration reducerConf, String prefix) {
  // if the Reducer does not have a Configuration, create an empty one
  if (reducerConf == null) {
    // using a Configuration without defaults to make it lightweight.
    // still the chain's conf may have all defaults and this conf is
    // overlapped to the chain's Configuration one.
    reducerConf = new Configuration(false);
  }

  // store the input/output classes of the reducer in
  // the reducer configuration
  reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
  reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
      Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
      Object.class);

  // serialize the reducer configuration in the chain's configuration.
  Stringifier<Configuration> stringifier =
      new DefaultStringifier<Configuration>(jobConf, Configuration.class);
  try {
    jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
        stringifier.toString(new Configuration(reducerConf)));
  } catch (IOException ioEx) {
    throw new RuntimeException(ioEx);
  }
}
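setMapperConf and setReducerConf embed each chain element's private Configuration as a string property of the job Configuration, and getChainElementConf (shown earlier) reverses that. Below is a stripped-down sketch of just that embed/extract cycle; the key name and property are illustrative, whereas the real chain classes build keys from a prefix plus an index.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.Stringifier;

public class ChainConfRoundTrip {
  // Illustrative key used only for this sketch.
  private static final String ELEMENT_CONF_KEY = "example.chain.element.conf";

  public static void main(String[] args) throws IOException {
    Configuration jobConf = new Configuration();

    // Element side: a lightweight Configuration without defaults.
    Configuration elementConf = new Configuration(false);
    elementConf.set("element.setting", "42");

    // Serialize the element conf into the job conf (as in setReducerConf).
    Stringifier<Configuration> stringifier =
        new DefaultStringifier<Configuration>(jobConf, Configuration.class);
    jobConf.set(ELEMENT_CONF_KEY,
        stringifier.toString(new Configuration(elementConf)));

    // Recover it later (as in getChainElementConf).
    Configuration restored =
        stringifier.fromString(jobConf.get(ELEMENT_CONF_KEY));
    System.out.println(restored.get("element.setting")); // 42
  }
}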
@Override
public void checkOutputSpecs(Configuration conf, ContentSource cs)
    throws IOException {
  super.checkOutputSpecs(conf, cs);

  // store mimetypes map into config system
  DefaultStringifier.store(conf, getMimetypesMap(),
      ConfigConstants.CONF_MIMETYPES);
}
protected LinkedMapWritable getRoleMap(TaskAttemptContext context)
    throws IOException {
  // Restores the object from the configuration.
  Configuration conf = context.getConfiguration();
  LinkedMapWritable fhmap = null;
  if (conf.get(ConfigConstants.CONF_ROLE_MAP) != null) {
    fhmap = DefaultStringifier.load(conf, ConfigConstants.CONF_ROLE_MAP,
        LinkedMapWritable.class);
  }
  return fhmap;
}
protected String getServerVersion(TaskAttemptContext context)
    throws IOException {
  // Restores the object from the configuration.
  Configuration conf = context.getConfiguration();
  Text version = DefaultStringifier.load(conf,
      ConfigConstants.CONF_ML_VERSION, Text.class);
  return version.toString();
}
@Override
public void checkOutputSpecs(Configuration conf, ContentSource cs)
    throws IOException {
  // warn against unsupported configuration
  if (conf.get(BATCH_SIZE) != null) {
    LOG.warn("Config entry for "
        + "\"mapreduce.marklogic.output.batchsize\" is not "
        + "supported for " + this.getClass().getName()
        + " and will be ignored.");
  }
  // store hosts into config system
  DefaultStringifier.store(conf, queryHosts(cs), OUTPUT_FOREST_HOST);
}
protected TextArrayWritable getHosts(Configuration conf) throws IOException {
  String forestHost = conf.get(OUTPUT_FOREST_HOST);
  if (forestHost != null) {
    // Restores the object from the configuration.
    TextArrayWritable hosts = DefaultStringifier.load(conf,
        OUTPUT_FOREST_HOST, TextArrayWritable.class);
    return hosts;
  } else {
    throw new IOException("Forest host map not found");
  }
}
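getHosts() recovers the host list that checkOutputSpecs() stored earlier. A minimal round trip of the same idea is sketched below with a custom array-of-Text Writable standing in for the connector's TextArrayWritable (whose source is not shown here); the key point is the no-argument constructor, which Writable deserialization via reflection requires. The class and key names are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class HostListRoundTrip {
  // Stand-in for the connector's TextArrayWritable: an ArrayWritable of Text
  // with the no-arg constructor that reflection-based deserialization needs.
  public static class TextArray extends ArrayWritable {
    public TextArray() {
      super(Text.class);
    }
    public TextArray(Text[] values) {
      super(Text.class, values);
    }
  }

  // Illustrative configuration key.
  private static final String HOSTS_KEY = "example.output.hosts";

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Store the host list (as the output format's checkOutputSpecs does).
    TextArray hosts = new TextArray(
        new Text[] { new Text("host-1"), new Text("host-2") });
    DefaultStringifier.store(conf, hosts, HOSTS_KEY);

    // Load it back (as getHosts does).
    TextArray restored = DefaultStringifier.load(conf, HOSTS_KEY, TextArray.class);
    for (Writable w : restored.get()) {
      System.out.println(w);
    }
  }
}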