/**
 * Optionally layers encryption on top of an existing output stream.
 *
 * <p>When {@code isEncryptedSpillEnabled(conf)} is false the supplied stream is
 * handed back untouched. Otherwise two items are written to {@code out} in
 * plain form before it is wrapped:
 * <ol>
 *   <li>the value of {@code out.getPos()} at the time of the call, encoded as
 *       an 8-byte long;</li>
 *   <li>the initialization vector produced by {@code createIV(conf)}.</li>
 * </ol>
 * The returned {@code CryptoFSDataOutputStream} is built from that IV, the key
 * from {@code getEncryptionKey()}, the codec from
 * {@code CryptoCodec.getInstance(conf)} and the buffer size from
 * {@code getBufferSize(conf)} (historically documented as being driven by the
 * "mapreduce.job.encrypted-intermediate-data.buffer.kb" job setting).
 *
 * @param conf job configuration consulted for the encryption switch, codec,
 *             IV and buffer size
 * @param out  stream to write the header to and, if enabled, to wrap
 * @return {@code out} itself when encryption is off, otherwise an encrypting
 *         stream wrapped around {@code out}
 * @throws IOException if writing the header fails or the wrapping stream
 *                     cannot be constructed
 */
public static FSDataOutputStream wrapIfNecessary(Configuration conf,
    FSDataOutputStream out) throws IOException {
  // Nothing to do when encryption is switched off.
  if (!isEncryptedSpillEnabled(conf)) {
    return out;
  }

  // Header part 1: current stream position as an 8-byte long.
  byte[] offsetHeader = ByteBuffer.allocate(8).putLong(out.getPos()).array();
  out.write(offsetHeader);

  // Header part 2: the IV, stored unencrypted ahead of the payload.
  byte[] iv = createIV(conf);
  out.write(iv);

  if (LOG.isDebugEnabled()) {
    String encodedIv = Base64.encodeBase64URLSafeString(iv);
    LOG.debug("IV written to Stream [" + encodedIv + "]");
  }

  return new CryptoFSDataOutputStream(out, CryptoCodec.getInstance(conf),
      getBufferSize(conf), getEncryptionKey(), iv);
}
/**
 * Optionally layers encryption on top of an existing output stream.
 *
 * <p>When {@code isShuffleEncrypted(conf)} is false the supplied stream is
 * handed back untouched. Otherwise two items are written to {@code out} in
 * plain form before it is wrapped:
 * <ol>
 *   <li>the value of {@code out.getPos()} at the time of the call, encoded as
 *       an 8-byte long;</li>
 *   <li>the initialization vector produced by {@code createIV(conf)}.</li>
 * </ol>
 * The returned {@code CryptoFSDataOutputStream} is built from that IV, the key
 * from {@code getEncryptionKey()}, the codec from
 * {@code CryptoCodec.getInstance(conf)} and the buffer size from
 * {@code getBufferSize(conf)} (historically documented as being driven by the
 * "mapreduce.job.encrypted-intermediate-data.buffer.kb" job setting).
 *
 * @param conf job configuration consulted for the encryption switch, codec,
 *             IV and buffer size
 * @param out  stream to write the header to and, if enabled, to wrap
 * @return {@code out} itself when encryption is off, otherwise an encrypting
 *         stream wrapped around {@code out}
 * @throws IOException if writing the header fails or the wrapping stream
 *                     cannot be constructed
 */
public static FSDataOutputStream wrapIfNecessary(Configuration conf,
    FSDataOutputStream out) throws IOException {
  // Nothing to do when shuffle encryption is switched off.
  if (!isShuffleEncrypted(conf)) {
    return out;
  }

  // Header part 1: current stream position as an 8-byte long.
  byte[] offsetHeader = ByteBuffer.allocate(8).putLong(out.getPos()).array();
  out.write(offsetHeader);

  // Header part 2: the IV, stored unencrypted ahead of the payload.
  byte[] iv = createIV(conf);
  out.write(iv);

  if (LOG.isDebugEnabled()) {
    String encodedIv = Base64.encodeBase64URLSafeString(iv);
    LOG.debug("IV written to Stream [" + encodedIv + "]");
  }

  return new CryptoFSDataOutputStream(out, CryptoCodec.getInstance(conf),
      getBufferSize(conf), getEncryptionKey(), iv);
}
/**
 * Opens an output stream with {@code fs.create(file)} and wraps it in a
 * {@code CryptoFSDataOutputStream} configured with this instance's
 * {@code codec} and the supplied parameters.
 *
 * @param bufferSize buffer size passed through to the crypto stream
 * @param key        key passed through to the crypto stream
 * @param iv         initialization vector passed through to the crypto stream
 * @return the wrapping crypto output stream
 * @throws IOException if the underlying stream cannot be created or wrapped
 */
@Override
protected OutputStream getOutputStream(int bufferSize, byte[] key, byte[] iv)
    throws IOException {
  OutputStream encrypted =
      new CryptoFSDataOutputStream(fs.create(file), codec, bufferSize, key, iv);
  return encrypted;
}