/** * Wraps a given FSDataInputStream with a CryptoInputStream. The size of the * data buffer required for the stream is specified by the * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration * variable. * * @param conf * @param in * @return FSDataInputStream * @throws IOException */ public static FSDataInputStream wrapIfNecessary(Configuration conf, FSDataInputStream in) throws IOException { if (isEncryptedSpillEnabled(conf)) { CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf); int bufferSize = getBufferSize(conf); // Not going to be used... but still has to be read... // Since the O/P stream always writes it.. IOUtils.readFully(in, new byte[8], 0, 8); byte[] iv = new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()]; IOUtils.readFully(in, iv, 0, cryptoCodec.getCipherSuite().getAlgorithmBlockSize()); if (LOG.isDebugEnabled()) { LOG.debug("IV read from Stream [" + Base64.encodeBase64URLSafeString(iv) + "]"); } return new CryptoFSDataInputStream(in, cryptoCodec, bufferSize, getEncryptionKey(), iv); } else { return in; } }
/** * Wraps a given FSDataInputStream with a CryptoInputStream. The size of the * data buffer required for the stream is specified by the * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration * variable. * * @param conf * @param in * @return FSDataInputStream * @throws IOException */ public static FSDataInputStream wrapIfNecessary(Configuration conf, FSDataInputStream in) throws IOException { if (isShuffleEncrypted(conf)) { CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf); int bufferSize = getBufferSize(conf); // Not going to be used... but still has to be read... // Since the O/P stream always writes it.. IOUtils.readFully(in, new byte[8], 0, 8); byte[] iv = new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()]; IOUtils.readFully(in, iv, 0, cryptoCodec.getCipherSuite().getAlgorithmBlockSize()); if (LOG.isDebugEnabled()) { LOG.debug("IV read from Stream [" + Base64.encodeBase64URLSafeString(iv) + "]"); } return new CryptoFSDataInputStream(in, cryptoCodec, bufferSize, getEncryptionKey(), iv); } else { return in; } }
@Override protected InputStream getInputStream(int bufferSize, byte[] key, byte[] iv) throws IOException { return new CryptoFSDataInputStream(fs.open(file), codec, bufferSize, key, iv); }