private static int getShuffleResponseCode(ShuffleHandler shuffle, Token<JobTokenIdentifier> jt) throws IOException { URL url = new URL("http://127.0.0.1:" + shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); String encHash = SecureShuffleUtils.hashFromString( SecureShuffleUtils.buildMsgFrom(url), JobTokenSecretManager.createSecretKey(jt.getPassword())); conn.addRequestProperty( SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); conn.connect(); int rc = conn.getResponseCode(); conn.disconnect(); return rc; }
protected void populateHeaders(List<String> mapIds, String outputBaseStr, String user, int reduce, HttpRequest request, HttpResponse response, boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap) throws IOException { long contentLength = 0; for (String mapId : mapIds) { String base = outputBaseStr + mapId; MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user); if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) { mapOutputInfoMap.put(mapId, outputInfo); } // Index file Path indexFileName = lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf); IndexRecord info = indexCache.getIndexInformation(mapId, reduce, indexFileName, user); ShuffleHeader header = new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce); DataOutputBuffer dob = new DataOutputBuffer(); header.write(dob); contentLength += info.partLength; contentLength += dob.getLength(); } // Now set the response headers. setResponseHeaders(response, keepAliveParam, contentLength); }
protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); // Put shuffle version into http header response.setHeader(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); response.setContent( ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)); // Close the connection as soon as the error message is sent. ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); }
/** * simulate a reducer that sends an invalid shuffle-header - sometimes a wrong * header_name and sometimes a wrong version * * @throws Exception exception */ @Test (timeout = 10000) public void testIncompatibleShuffleVersion() throws Exception { final int failureNum = 3; Configuration conf = new Configuration(); conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); ShuffleHandler shuffleHandler = new ShuffleHandler(); shuffleHandler.init(conf); shuffleHandler.start(); // simulate a reducer that closes early by reading a single shuffle header // then closing the connection URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0"); for (int i = 0; i < failureNum; ++i) { HttpURLConnection conn = (HttpURLConnection)url.openConnection(); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, i == 0 ? "mapreduce" : "other"); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, i == 1 ? "1.0.0" : "1.0.1"); conn.connect(); Assert.assertEquals( HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode()); } shuffleHandler.stop(); shuffleHandler.close(); }
@Test(timeout = 10000) public void testSocketKeepAlive() throws Exception { Configuration conf = new Configuration(); conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true); // try setting to -ve keep alive timeout. conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100); HttpURLConnection conn = null; MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(); try { shuffleHandler.init(conf); shuffleHandler.start(); String shuffleBaseURL = "http://127.0.0.1:" + shuffleHandler.getConfig().get( ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); URL url = new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&" + "map=attempt_12345_1_m_1_0"); conn = (HttpURLConnection) url.openConnection(); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); conn.connect(); conn.getInputStream(); Assert.assertTrue("socket should be set KEEP_ALIVE", shuffleHandler.isSocketKeepAlive()); } finally { if (conn != null) { conn.disconnect(); } shuffleHandler.stop(); } }
@Test (timeout = 10000) public void testIncompatibleShuffleVersion() throws Exception { final int failureNum = 3; Configuration conf = new Configuration(); conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); ShuffleHandler shuffleHandler = new ShuffleHandler(); shuffleHandler.init(conf); shuffleHandler.start(); // simulate a reducer that closes early by reading a single shuffle header // then closing the connection URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0"); for (int i = 0; i < failureNum; ++i) { HttpURLConnection conn = (HttpURLConnection)url.openConnection(); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, i == 0 ? "mapreduce" : "other"); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, i == 1 ? "1.0.0" : "1.0.1"); conn.connect(); Assert.assertEquals( HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode()); } shuffleHandler.stop(); shuffleHandler.close(); }
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String mapId, int reduce, MapOutputInfo mapOutputInfo) throws IOException { final IndexRecord info = mapOutputInfo.indexRecord; final ShuffleHeader header = new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce); final DataOutputBuffer dob = new DataOutputBuffer(); header.write(dob); ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); final File spillfile = new File(mapOutputInfo.mapOutputFileName.toString()); RandomAccessFile spill; try { spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null); } catch (FileNotFoundException e) { LOG.info(spillfile + " not found"); return null; } ChannelFuture writeFuture; if (ch.getPipeline().get(SslHandler.class) == null) { final FadvisedFileRegion partition = new FadvisedFileRegion(spill, info.startOffset, info.partLength, manageOsCache, readaheadLength, readaheadPool, spillfile.getAbsolutePath(), shuffleBufferSize, shuffleTransferToAllowed); writeFuture = ch.write(partition); writeFuture.addListener(new ChannelFutureListener() { // TODO error handling; distinguish IO/connection failures, // attribute to appropriate spill output @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { partition.transferSuccessful(); } partition.releaseExternalResources(); } }); } else { // HTTPS cannot be done with zero copy. final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, info.startOffset, info.partLength, sslFileBufferSize, manageOsCache, readaheadLength, readaheadPool, spillfile.getAbsolutePath()); writeFuture = ch.write(chunk); } metrics.shuffleConnections.incr(); metrics.shuffleOutputBytes.incr(info.partLength); // optimistic return writeFuture; }
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String jobId, String mapId, int reduce) throws IOException { // TODO replace w/ rsrc alloc // $x/$user/appcache/$appId/output/$mapId // TODO: Once Shuffle is out of NM, this can use MR APIs to convert between App and Job JobID jobID = JobID.forName(jobId); ApplicationId appID = ApplicationId.newInstance( Long.parseLong(jobID.getJtIdentifier()), jobID.getId()); final String base = ContainerLocalizer.USERCACHE + "/" + user + "/" + ContainerLocalizer.APPCACHE + "/" + ConverterUtils.toString(appID) + "/output" + "/" + mapId; if (LOG.isDebugEnabled()) { LOG.debug("DEBUG0 " + base); } // Index file Path indexFileName = lDirAlloc.getLocalPathToRead( base + "/file.out.index", conf); // Map-output file Path mapOutputFileName = lDirAlloc.getLocalPathToRead( base + "/file.out", conf); if (LOG.isDebugEnabled()) { LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " + indexFileName); } final IndexRecord info = indexCache.getIndexInformation(mapId, reduce, indexFileName, user); final ShuffleHeader header = new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce); final DataOutputBuffer dob = new DataOutputBuffer(); header.write(dob); ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); final File spillfile = new File(mapOutputFileName.toString()); RandomAccessFile spill; try { spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null); } catch (FileNotFoundException e) { LOG.info(spillfile + " not found"); return null; } ChannelFuture writeFuture; if (ch.getPipeline().get(SslHandler.class) == null) { final FadvisedFileRegion partition = new FadvisedFileRegion(spill, info.startOffset, info.partLength, manageOsCache, readaheadLength, readaheadPool, spillfile.getAbsolutePath()); writeFuture = ch.write(partition); writeFuture.addListener(new ChannelFutureListener() { // TODO error handling; distinguish IO/connection failures, // attribute to appropriate spill output @Override public void operationComplete(ChannelFuture future) { partition.releaseExternalResources(); } }); } else { // HTTPS cannot be done with zero copy. final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, info.startOffset, info.partLength, sslFileBufferSize, manageOsCache, readaheadLength, readaheadPool, spillfile.getAbsolutePath()); writeFuture = ch.write(chunk); } metrics.shuffleConnections.incr(); metrics.shuffleOutputBytes.incr(info.partLength); // optimistic return writeFuture; }