diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index c6657750f4..8b1b1aee0b 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -129,7 +129,6 @@ import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpObjectAggregator; @@ -306,21 +305,28 @@ public ReduceMapFileCount(ReduceContext rc) { @Override public void operationComplete(ChannelFuture future) throws Exception { + Channel ch = future.channel(); if (!future.isSuccess()) { - future.channel().close(); + ch.close(); return; } int waitCount = this.reduceContext.getMapsToWait().decrementAndGet(); if (waitCount == 0) { + LOG.debug("Finished with all map outputs"); + /* + * LastHttpContent.EMPTY_LAST_CONTENT can only be written when there are no remaining maps to send, + * this is the only time we can finish the HTTP response. + */ + ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); metrics.operationComplete(future); // Let the idle timer handler close keep-alive connections if (reduceContext.getKeepAlive()) { - ChannelPipeline pipeline = future.channel().pipeline(); + ChannelPipeline pipeline = ch.pipeline(); TimeoutHandler timeoutHandler = (TimeoutHandler) pipeline.get(TIMEOUT_HANDLER); timeoutHandler.setEnabledTimeout(true); } else { - future.channel().close(); + ch.close(); } } else { SHUFFLE.sendMap(reduceContext); @@ -993,12 +999,11 @@ public void channelActive(ChannelHandlerContext ctx) @Override public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { - FullHttpRequest request = (FullHttpRequest) message; + HttpRequest request = (HttpRequest) message; handleRequest(ctx, request); - request.release(); } - private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request) + private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) throws IOException, Exception { if (request.getMethod() != GET) { sendError(ctx, METHOD_NOT_ALLOWED); @@ -1123,13 +1128,9 @@ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request) for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) { ChannelFuture nextMap = sendMap(reduceContext); if(nextMap == null) { - // by this special message flushed, we can make sure the whole response is finished - ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); return; } } - // by this special message flushed, we can make sure the whole response is finished - ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); } private boolean isNullOrEmpty(List entries) { @@ -1496,7 +1497,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, DataOutputBuffer dobRange = new DataOutputBuffer(); // Indicate how many record to be written WritableUtils.writeVInt(dobRange, reduceRange.getLast() - reduceRange.getFirst() + 1); - ch.write(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength())); + ch.writeAndFlush(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength())); for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) { TezIndexRecord index = outputInfo.getIndex(reduce); // Records are only valid if they have a non-zero part length @@ -1511,7 +1512,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, DataOutputBuffer dob = new DataOutputBuffer(); header.write(dob); // Free the memory needed to store the spill and index records - ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength())); } outputInfo.finish(); @@ -1531,14 +1532,14 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, rangeOffset, rangePartLength, manageOsCache, readaheadLength, readaheadPool, spillFile.getAbsolutePath(), shuffleBufferSize, shuffleTransferToAllowed); - writeFuture = ch.write(partition); + writeFuture = ch.writeAndFlush(partition); } else { // HTTPS cannot be done with zero copy. final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, rangeOffset, rangePartLength, sslFileBufferSize, manageOsCache, readaheadLength, readaheadPool, spillFile.getAbsolutePath()); - writeFuture = ch.write(chunk); + writeFuture = ch.writeAndFlush(chunk); } metrics.shuffleConnections.incr(); metrics.shuffleOutputBytes.incr(rangePartLength); // optimistic diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index 40b16857ad..d0d0a381e7 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -48,6 +48,8 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.zip.Checksum; import org.apache.hadoop.conf.Configuration; @@ -109,6 +111,8 @@ public class TestShuffleHandler { private static final File TEST_DIR = new File(System.getProperty("test.build.data"), TestShuffleHandler.class.getName()).getAbsoluteFile(); private static final String HADOOP_TMP_DIR = "hadoop.tmp.dir"; + private static final String TEST_PARTITION_DATA_STRING = "0123456789"; + class MockShuffleHandler extends org.apache.tez.auxservices.ShuffleHandler { @Override protected Shuffle getShuffle(final Configuration conf) { @@ -283,9 +287,7 @@ static void checkShuffleMetrics(MetricsSystem ms, long bytes, int failed, @Test (timeout = 10000) public void testClientClosesConnection() throws Exception { final AtomicBoolean failureEncountered = new AtomicBoolean(false); - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); ShuffleHandler shuffleHandler = new ShuffleHandler() { @Override protected Shuffle getShuffle(Configuration conf) { @@ -387,9 +389,7 @@ SocketAddress getSocketAddress() { @Test(timeout = 10000) public void testKeepAlive() throws Exception { final AtomicBoolean failureEncountered = new AtomicBoolean(false); - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true); // try setting to -ve keep alive timeout. conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100); @@ -535,9 +535,7 @@ protected void sendError(ChannelHandlerContext ctx, String message, @Test public void testSocketKeepAlive() throws Exception { - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true); // try setting to -ve keep alive timeout. conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100); @@ -579,9 +577,7 @@ public void testSocketKeepAlive() throws Exception { @Test (timeout = 10000) public void testIncompatibleShuffleVersion() throws Exception { final int failureNum = 3; - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); ShuffleHandler shuffleHandler = new ShuffleHandler(); shuffleHandler.init(conf); shuffleHandler.start(); @@ -613,9 +609,7 @@ public void testIncompatibleShuffleVersion() throws Exception { @Test (timeout = 10000) public void testMaxConnections() throws Exception { - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); ShuffleHandler shuffleHandler = new ShuffleHandler() { @Override @@ -722,9 +716,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, */ @Test(timeout = 10000) public void testRangedFetch() throws IOException { - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); @@ -741,23 +733,7 @@ public void testRangedFetch() throws IOException { List fileMap = new ArrayList<>(); createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, conf, fileMap); - ShuffleHandler shuffleHandler = new ShuffleHandler() { - - @Override - protected Shuffle getShuffle(Configuration conf) { - // replace the shuffle handler with one stubbed for testing - return new Shuffle(conf) { - - @Override - protected void verifyRequest(String appid, ChannelHandlerContext ctx, - HttpRequest request, HttpResponse response, URL requestUri) - throws IOException { - // Do nothing. - } - - }; - } - }; + ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify(); shuffleHandler.init(conf); try { shuffleHandler.start(); @@ -814,6 +790,104 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx, } } + /** + * Validate the ranged fetch works as expected for different amount of map attempts and reduce ranges. + */ + @Test(timeout = 30000) + public void testRangedFetchMultipleAttempts() throws IOException { + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/1); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/1); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/1); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/1); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/5); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/5); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/5); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/5); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/10); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/10); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/10); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/10); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/100); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/100); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/100); + runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/100); + } + + private void runMultiAttemptMultiRangeShuffleTest(int attemptRange, int reduceRange) throws IOException { + Random random = new Random(); + String user = "randomUser"; + int firstAttempt = random.nextInt(10); + int reducerIdStart = random.nextInt(10); + int reducerIdEnd = reducerIdStart + reduceRange - 1; + + Configuration conf = getInitialConf(); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + conf.setInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES, 3); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); + UserGroupInformation.setConfiguration(conf); + File absLogDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + ApplicationId appId = ApplicationId.newInstance(12345, 1); + LOG.info(appId.toString()); + List attemptIds = IntStream.range(firstAttempt, firstAttempt + attemptRange) + .mapToObj(i -> "attempt_12345_1_m_" + i + "_0").collect(Collectors.toList()); + List fileMap = new ArrayList<>(); + for (String attemptId : attemptIds) { + createShuffleHandlerFiles(absLogDir, user, appId.toString(), attemptId, conf, fileMap, reducerIdStart, + reducerIdEnd); + } + ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify(); + shuffleHandler.init(conf); + try { + shuffleHandler.start(); + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token jt = new Token("identifier".getBytes(), "password".getBytes(), + new Text(user), new Text("shuffleService")); + jt.write(outputBuffer); + shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId, + ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength()))); + URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerIdStart + "-" + reducerIdEnd + "&map=" + + String.join(",", attemptIds)); + LOG.info("Calling shuffle URL: {}", url); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + boolean succeeded = false; + try { + DataInputStream is = new DataInputStream(conn.getInputStream()); + for (String attempt : attemptIds) { + int partitionCount = WritableUtils.readVInt(is); + List headers = new ArrayList<>(partitionCount); + for (int i = reducerIdStart; i <= reducerIdEnd; i++) { + ShuffleHeader header = new ShuffleHeader(); + header.readFields(is); + Assert.assertEquals("Incorrect map id", attempt, header.getMapId()); + Assert.assertEquals("Incorrect reduce id", i, header.getPartition()); + headers.add(header); + } + for (ShuffleHeader header : headers) { + byte[] bytes = new byte[(int) header.getCompressedLength()]; + is.read(bytes); + Assert.assertEquals(TEST_PARTITION_DATA_STRING, new String(bytes)); + } + } + succeeded = true; + // Read one more byte to force EOF + is.readByte(); + Assert.fail("More fetch bytes that expected in stream"); + } catch (EOFException e) { + Assert.assertTrue("Failed to copy ranged fetch", succeeded); + } + + } finally { + shuffleHandler.close(); + FileUtil.fullyDelete(absLogDir); + } + } + /** * Validate the ownership of the map-output files being pulled in. The * local-file-system owner of the file should match the user component in the @@ -824,9 +898,7 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx, public void testMapFileAccess() throws IOException { // This will run only in NativeIO is enabled as SecureIOUtils need it assumeTrue(NativeIO.isAvailable()); - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); @@ -842,23 +914,7 @@ public void testMapFileAccess() throws IOException { List fileMap = new ArrayList(); createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, conf, fileMap); - ShuffleHandler shuffleHandler = new ShuffleHandler() { - - @Override - protected Shuffle getShuffle(Configuration conf) { - // replace the shuffle handler with one stubbed for testing - return new Shuffle(conf) { - - @Override - protected void verifyRequest(String appid, ChannelHandlerContext ctx, - HttpRequest request, HttpResponse response, URL requestUri) - throws IOException { - // Do nothing. - } - - }; - } - }; + ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify(); shuffleHandler.init(conf); try { shuffleHandler.start(); @@ -907,48 +963,55 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx, } } - private static void createShuffleHandlerFiles(File logDir, String user, - String appId, String appAttemptId, Configuration conf, - List fileMap) throws IOException { - String attemptDir = - StringUtils.join(Path.SEPARATOR, - new String[] { logDir.getAbsolutePath(), - ShuffleHandler.USERCACHE, user, - ShuffleHandler.APPCACHE, appId,"dag_1/" + "output", - appAttemptId }); + private static void createShuffleHandlerFiles(File logDir, String user, String appId, String appAttemptId, + Configuration conf, List fileMap) throws IOException { + createShuffleHandlerFiles(logDir, user, appId, appAttemptId, conf, fileMap, 0, 1); + } + + private static void createShuffleHandlerFiles(File logDir, String user, String appId, String appAttemptId, + Configuration conf, List fileMap, int reduceStart, int reduceEnd) throws IOException { + String attemptDir = StringUtils.join(Path.SEPARATOR, new String[] { logDir.getAbsolutePath(), + ShuffleHandler.USERCACHE, user, ShuffleHandler.APPCACHE, appId, "dag_1/" + "output", appAttemptId }); File appAttemptDir = new File(attemptDir); appAttemptDir.mkdirs(); - System.out.println(appAttemptDir.getAbsolutePath()); + LOG.info(appAttemptDir.getAbsolutePath()); File indexFile = new File(appAttemptDir, "file.out.index"); fileMap.add(indexFile); - createIndexFile(indexFile, conf); + createIndexFile(indexFile, conf, reduceStart, reduceEnd); File mapOutputFile = new File(appAttemptDir, "file.out"); fileMap.add(mapOutputFile); - createMapOutputFile(mapOutputFile, conf); + createMapOutputFile(mapOutputFile, conf, reduceEnd - reduceStart + 1); } - private static void - createMapOutputFile(File mapOutputFile, Configuration conf) - throws IOException { + private static void createMapOutputFile(File mapOutputFile, Configuration conf, int partitionCount) + throws IOException { FileOutputStream out = new FileOutputStream(mapOutputFile); - out.write("Creating new dummy map output file. Used only for testing" - .getBytes()); + + StringBuilder b = new StringBuilder(partitionCount * TEST_PARTITION_DATA_STRING.length()); + for (int i = 0; i < partitionCount; i++) { + b.append(TEST_PARTITION_DATA_STRING); + } + + out.write(b.toString().getBytes()); out.flush(); out.close(); } - private static void createIndexFile(File indexFile, Configuration conf) + private static void createIndexFile(File indexFile, Configuration conf, int reduceStart, int reduceEnd) throws IOException { if (indexFile.exists()) { - System.out.println("Deleting existing file"); + LOG.info("Deleting existing file"); indexFile.delete(); } Checksum crc = new PureJavaCrc32(); - TezSpillRecord tezSpillRecord = new TezSpillRecord(2); - tezSpillRecord.putIndex(new TezIndexRecord(0, 10, 10), 0); - tezSpillRecord.putIndex(new TezIndexRecord(10, 10, 10), 1); - tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf, - FileSystem.getLocal(conf).getRaw(), crc); + TezSpillRecord tezSpillRecord = new TezSpillRecord(reduceEnd + 1); + int offset = 0; + for (int i = reduceStart; i <= reduceEnd; i++) { + tezSpillRecord.putIndex( + new TezIndexRecord(offset, TEST_PARTITION_DATA_STRING.length(), TEST_PARTITION_DATA_STRING.length()), i); + offset += TEST_PARTITION_DATA_STRING.length(); + } + tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf, FileSystem.getLocal(conf).getRaw(), crc); } @Test @@ -958,9 +1021,7 @@ public void testRecovery() throws IOException { final File tmpDir = new File(System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), TestShuffleHandler.class.getName()); - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); ShuffleHandler shuffle = new ShuffleHandler(); // emulate aux services startup with recovery enabled @@ -1026,9 +1087,7 @@ public void testRecoveryFromOtherVersions() throws IOException { final File tmpDir = new File(System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), TestShuffleHandler.class.getName()); - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); ShuffleHandler shuffle = new ShuffleHandler(); // emulate aux services startup with recovery enabled @@ -1133,9 +1192,7 @@ private static int getShuffleResponseCode(ShuffleHandler shuffle, @Test(timeout = 100000) public void testGetMapOutputInfo() throws Exception { final AtomicBoolean failureEncountered = new AtomicBoolean(false); - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); @@ -1237,10 +1294,8 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, @Test(timeout = 5000) public void testDagDelete() throws Exception { final AtomicBoolean failureEncountered = new AtomicBoolean(false); - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); + Configuration conf = getInitialConf(); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); @@ -1318,9 +1373,8 @@ protected void sendError(ChannelHandlerContext ctx, String message, @Test public void testVertexShuffleDelete() throws Exception { final ArrayList failures = new ArrayList(1); - Configuration conf = new Configuration(); + Configuration conf = getInitialConf(); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); @@ -1387,7 +1441,7 @@ protected void sendError(ChannelHandlerContext ctx, String message, fail("Encountered Exception!" + e.getMessage()); } } finally { - shuffleHandler.stop(); + shuffleHandler.close(); FileUtil.fullyDelete(absLogDir); } } @@ -1395,9 +1449,8 @@ protected void sendError(ChannelHandlerContext ctx, String message, @Test(timeout = 5000) public void testFailedTaskAttemptDelete() throws Exception { final ArrayList failures = new ArrayList(1); - Configuration conf = new Configuration(); + Configuration conf = getInitialConf(); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); @@ -1469,7 +1522,7 @@ protected void sendError(ChannelHandlerContext ctx, String message, Assert.assertEquals("sendError called due to shuffle error", 0, failures.size()); } finally { - shuffleHandler.stop(); + shuffleHandler.close(); FileUtil.fullyDelete(absLogDir); } } @@ -1500,10 +1553,7 @@ public void testSendMapCount() throws Exception { when(mockCh.writeAndFlush(Object.class)).thenReturn(mockFuture); final ShuffleHandler sh = new MockShuffleHandler(); - Configuration conf = new Configuration(); - conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); - // The Shuffle handler port associated with the service is bound to but not used. - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); sh.init(conf); sh.start(); int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES, @@ -1523,8 +1573,7 @@ public void testSendMapCount() throws Exception { @Test public void testShuffleHandlerSendsDiskError() throws Exception { - Configuration conf = new Configuration(); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); DataInputStream input = null; MockShuffleHandlerWithFatalDiskError shuffleHandler = @@ -1609,22 +1658,45 @@ public void testConfigPortStatic() throws Exception { shuffleHandler.serviceStart(); Assert.assertEquals(port, shuffleHandler.getPort()); } finally { - shuffleHandler.stop(); + shuffleHandler.close(); } } @Test public void testConfigPortDynamic() throws Exception { - Configuration conf = new Configuration(); - // 0 as config, should be dynamically chosen by netty - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + Configuration conf = getInitialConf(); MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(); shuffleHandler.serviceInit(conf); try { shuffleHandler.serviceStart(); Assert.assertTrue("ShuffleHandler should use a random chosen port", shuffleHandler.getPort() > 0); } finally { - shuffleHandler.stop(); + shuffleHandler.close(); } } + + private Configuration getInitialConf() { + Configuration conf = new Configuration(); + conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath()); + // 0 as config, should be dynamically chosen by netty + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + return conf; + } + + private ShuffleHandler getShuffleHandlerWithNoVerify() { + return new ShuffleHandler() { + + @Override + protected Shuffle getShuffle(Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, + HttpResponse response, URL requestUri) throws IOException { + // Do nothing. + } + }; + } + }; + } }