/** * Test that a client which supports short-circuit reads using * shared memory can fall back to not using shared memory when * the server doesn't support it. */ @Test public void testShortCircuitReadFromServerWithoutShm() throws Exception { TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration clientConf = createShortCircuitConf( "testShortCircuitReadFromServerWithoutShm", sockDir); Configuration serverConf = new Configuration(clientConf); serverConf.setInt( DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0); DFSInputStream.tcpReadsDisabledForTesting = true; final MiniDFSCluster cluster = new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); cluster.waitActive(); clientConf.set(DFS_CLIENT_CONTEXT, "testShortCircuitReadFromServerWithoutShm_clientContext"); final DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf); final String TEST_FILE = "/test_file"; final int TEST_FILE_LEN = 4000; final int SEED = 0xFADEC; DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, (short)1, SEED); byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); byte expected[] = DFSTestUtil. calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); Assert.assertTrue(Arrays.equals(contents, expected)); final ShortCircuitCache cache = fs.dfs.getClientContext().getShortCircuitCache(); final DatanodeInfo datanode = new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId()); cache.getDfsClientShmManager().visit(new Visitor() { @Override public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) throws IOException { Assert.assertEquals(1, info.size()); PerDatanodeVisitorInfo vinfo = info.get(datanode); Assert.assertTrue(vinfo.disabled); Assert.assertEquals(0, vinfo.full.size()); Assert.assertEquals(0, vinfo.notFull.size()); } }); cluster.shutdown(); sockDir.close(); }
/** * Test that a client which supports short-circuit reads using * shared memory can fall back to not using shared memory when * the server doesn't support it. */ @Test public void testShortCircuitReadFromServerWithoutShm() throws Exception { TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration clientConf = createShortCircuitConf( "testShortCircuitReadFromServerWithoutShm", sockDir); Configuration serverConf = new Configuration(clientConf); serverConf.setInt( DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0); DFSInputStream.tcpReadsDisabledForTesting = true; final MiniDFSCluster cluster = new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); cluster.waitActive(); clientConf.set(DFS_CLIENT_CONTEXT, "testShortCircuitReadFromServerWithoutShm_clientContext"); final DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf); final String TEST_FILE = "/test_file"; final int TEST_FILE_LEN = 4000; final int SEED = 0xFADEC; DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, (short)1, SEED); byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); byte expected[] = DFSTestUtil. calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); Assert.assertTrue(Arrays.equals(contents, expected)); final ShortCircuitCache cache = fs.dfs.getClientContext().getShortCircuitCache(); final DatanodeInfo datanode = new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId()); cache.getDfsClientShmManager().visit(new Visitor() { @Override public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) throws IOException { Assert.assertEquals(1, info.size()); PerDatanodeVisitorInfo vinfo = info.get(datanode); Assert.assertTrue(vinfo.disabled); Assert.assertEquals(0, vinfo.full.size()); Assert.assertEquals(0, vinfo.notFull.size()); } }); cluster.shutdown(); }