/**
 * Makes a copy of {@code obj} so the next mapper in the chain receives its own instance.
 * <p>
 * The object is serialized into the thread-local buffer, a fresh instance of the same class is created,
 * and the buffered bytes are deserialized into it.
 *
 * @param serialization Serialization used both to write and to read the object.
 * @param obj Object to copy.
 * @param <E> Type of the object.
 * @return The deserialized copy.
 * @throws IOException If serialization or deserialization fails.
 */
private <E> E makeCopyForPassByValue(Serialization<E> serialization, E obj) throws IOException {
    Class<E> cls = GenericsUtil.getClass(obj);

    Serializer<E> ser = serialization.getSerializer(cls);
    Deserializer<E> deser = serialization.getDeserializer(cls);

    DataOutputBuffer dof = threadLocalDataOutputBuffer.get();

    dof.reset();
    ser.open(dof);
    try {
        ser.serialize(obj);
    }
    finally {
        ser.close();
    }

    E copy = ReflectionUtils.newInstance(cls, getChainJobConf());

    ByteArrayInputStream bais = new ByteArrayInputStream(dof.getData(), 0, dof.getLength());

    deser.open(bais);
    try {
        // Deserializer#deserialize may either reuse the passed instance or return a brand-new object
        // (e.g. Java serialization ignores the argument), so its return value must be used.
        copy = deser.deserialize(copy);
    }
    finally {
        deser.close();
    }

    return copy;
}
/**
 * Resolves the serialization for the given class using the job configuration.
 * <p>
 * If Hadoop's {@link SerializationFactory} picks {@code WritableSerialization}, the dedicated
 * {@code HadoopWritableSerialization} is returned. Otherwise the Hadoop serialization is wrapped in a
 * {@code HadoopSerializationWrapper}.
 *
 * @param cls Class to find a serialization for; must not be {@code null}.
 * @param jobConf Job configuration the serialization factory is built from.
 * @return Serialization for {@code cls}.
 * @throws IgniteCheckedException If no serialization is registered for {@code cls}.
 */
@SuppressWarnings("unchecked")
private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
    A.notNull(cls, "cls");

    Serialization<?> found = new SerializationFactory(jobConf).getSerialization(cls);

    if (found == null)
        throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());

    if (found.getClass() != WritableSerialization.class)
        return new HadoopSerializationWrapper(found, cls);

    // Writable classes get the specialized implementation instead of the generic wrapper.
    return new HadoopWritableSerialization((Class<? extends Writable>)cls);
}
/**
 * Creates the {@link OutputCollector} that the mapper at position {@code mapperIndex} in the chain writes to.
 * <p>
 * The collector is configured with the key/value serializations registered for that mapper.
 *
 * @param mapperIndex Position in the chain of the mapper that will use the collector.
 * @param output The task's original output collector.
 * @param reporter The task's reporter.
 * @return Collector for the mapper at {@code mapperIndex}.
 */
@SuppressWarnings({"unchecked"})
public OutputCollector getMapperCollector(int mapperIndex, OutputCollector output, Reporter reporter) {
    return new ChainOutputCollector(
        mapperIndex,
        mappersKeySerialization.get(mapperIndex),
        mappersValueSerialization.get(mapperIndex),
        output,
        reporter);
}
/**
 * Creates a collector for the output of the mapper at position {@code index} in the chain.
 * <p>
 * Collected records are routed to the mapper at {@code index + 1}.
 *
 * @param index Position in the chain of the mapper that writes to this collector.
 * @param keySerialization Serialization used to copy keys before forwarding; {@code null} disables copying.
 * @param valueSerialization Serialization used to copy values before forwarding.
 * @param output The task's original output collector.
 * @param reporter The task's reporter.
 */
public ChainOutputCollector(int index, Serialization<K> keySerialization, Serialization<V> valueSerialization, OutputCollector output, Reporter reporter) {
    this.output = output;
    this.reporter = reporter;
    this.keySerialization = keySerialization;
    this.valueSerialization = valueSerialization;
    this.nextMapperIndex = index + 1;
}
/**
 * Creates a collector whose records are routed to the first mapper in the chain (index 0).
 *
 * @param keySerialization Serialization used to copy keys before forwarding; {@code null} disables copying.
 * @param valueSerialization Serialization used to copy values before forwarding.
 * @param output The task's original output collector.
 * @param reporter The task's reporter.
 */
public ChainOutputCollector(Serialization<K> keySerialization, Serialization<V> valueSerialization, OutputCollector output, Reporter reporter) {
    // A virtual "previous" mapper at -1 makes the next mapper index 0.
    this(-1, keySerialization, valueSerialization, output, reporter);
}
@SuppressWarnings({"unchecked"}) public void collect(K key, V value) throws IOException { if (nextMapperIndex < mappers.size()) { // there is a next mapper in chain // only need to ser/deser if there is next mapper in the chain if (keySerialization != null) { key = makeCopyForPassByValue(keySerialization, key); value = makeCopyForPassByValue(valueSerialization, value); } // gets ser/deser and mapper of next in chain Serialization nextKeySerialization = mappersKeySerialization.get(nextMapperIndex); Serialization nextValueSerialization = mappersValueSerialization.get(nextMapperIndex); Mapper nextMapper = mappers.get(nextMapperIndex); // invokes next mapper in chain nextMapper.map(key, value, new ChainOutputCollector(nextMapperIndex, nextKeySerialization, nextValueSerialization, output, reporter), reporter); } else { // end of chain, user real output collector output.collect(key, value); } }