@Test(timeout = 4000) public void testSendMapCount() throws Exception { final List<ShuffleHandler.ReduceMapFileCount> listenerList = new ArrayList<ShuffleHandler.ReduceMapFileCount>(); final ChannelHandlerContext mockCtx = Mockito.mock(ChannelHandlerContext.class); final MessageEvent mockEvt = Mockito.mock(MessageEvent.class); final Channel mockCh = Mockito.mock(AbstractChannel.class); // Mock HttpRequest and ChannelFuture final HttpRequest mockHttpRequest = createMockHttpRequest(); final ChannelFuture mockFuture = createMockChannelFuture(mockCh, listenerList); // Mock Netty Channel Context and Channel behavior Mockito.doReturn(mockCh).when(mockCtx).getChannel(); Mockito.when(mockCtx.getChannel()).thenReturn(mockCh); Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class)); Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture); //Mock MessageEvent behavior Mockito.doReturn(mockCh).when(mockEvt).getChannel(); Mockito.when(mockEvt.getChannel()).thenReturn(mockCh); Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage(); final ShuffleHandler sh = new MockShuffleHandler(); Configuration conf = new Configuration(); sh.init(conf); sh.start(); int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES, ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES); sh.getShuffle(conf).messageReceived(mockCtx, mockEvt); assertTrue("Number of Open files should not exceed the configured " + "value!-Not Expected", listenerList.size() <= maxOpenFiles); while(!listenerList.isEmpty()) { listenerList.remove(0).operationComplete(mockFuture); assertTrue("Number of Open files should not exceed the configured " + "value!-Not Expected", listenerList.size() <= maxOpenFiles); } sh.close(); }
@Test(timeout = 4000) public void testSendMapCount() throws Exception { final List<ShuffleHandler.ReduceMapFileCount> listenerList = new ArrayList<ShuffleHandler.ReduceMapFileCount>(); final ChannelHandlerContext mockCtx = Mockito.mock(ChannelHandlerContext.class); final MessageEvent mockEvt = Mockito.mock(MessageEvent.class); final Channel mockCh = Mockito.mock(AbstractChannel.class); final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class); // Mock HttpRequest and ChannelFuture final HttpRequest mockHttpRequest = createMockHttpRequest(); final ChannelFuture mockFuture = createMockChannelFuture(mockCh, listenerList); final ShuffleHandler.TimeoutHandler timerHandler = new ShuffleHandler.TimeoutHandler(); // Mock Netty Channel Context and Channel behavior Mockito.doReturn(mockCh).when(mockCtx).getChannel(); Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline); Mockito.when(mockPipeline.get( Mockito.any(String.class))).thenReturn(timerHandler); Mockito.when(mockCtx.getChannel()).thenReturn(mockCh); Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class)); Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture); //Mock MessageEvent behavior Mockito.doReturn(mockCh).when(mockEvt).getChannel(); Mockito.when(mockEvt.getChannel()).thenReturn(mockCh); Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage(); final ShuffleHandler sh = new MockShuffleHandler(); Configuration conf = new Configuration(); sh.init(conf); sh.start(); int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES, ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES); sh.getShuffle(conf).messageReceived(mockCtx, mockEvt); assertTrue("Number of Open files should not exceed the configured " + "value!-Not Expected", listenerList.size() <= maxOpenFiles); while(!listenerList.isEmpty()) { listenerList.remove(0).operationComplete(mockFuture); assertTrue("Number of Open files should not exceed the configured " + "value!-Not Expected", listenerList.size() <= maxOpenFiles); } sh.close(); }
@Test(timeout = 4000) public void testSendMapCount() throws Exception { final List<ShuffleHandler.ReduceMapFileCount> listenerList = new ArrayList<ShuffleHandler.ReduceMapFileCount>(); final ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class); final MessageEvent mockEvt = mock(MessageEvent.class); final Channel mockCh = mock(AbstractChannel.class); final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class); // Mock HttpRequest and ChannelFuture final HttpRequest mockHttpRequest = createMockHttpRequest(); final ChannelFuture mockFuture = createMockChannelFuture(mockCh, listenerList); final ShuffleHandler.TimeoutHandler timerHandler = new ShuffleHandler.TimeoutHandler(); // Mock Netty Channel Context and Channel behavior Mockito.doReturn(mockCh).when(mockCtx).getChannel(); Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline); Mockito.when(mockPipeline.get(Mockito.any(String.class))).thenReturn(timerHandler); when(mockCtx.getChannel()).thenReturn(mockCh); Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class)); when(mockCh.write(Object.class)).thenReturn(mockFuture); //Mock MessageEvent behavior Mockito.doReturn(mockCh).when(mockEvt).getChannel(); when(mockEvt.getChannel()).thenReturn(mockCh); Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage(); final ShuffleHandler sh = new MockShuffleHandler(); Configuration conf = new Configuration(); // The Shuffle handler port associated with the service is bound to but not used. conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); sh.init(conf); sh.start(); int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES, ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES); sh.getShuffle(conf).messageReceived(mockCtx, mockEvt); assertTrue("Number of Open files should not exceed the configured " + "value!-Not Expected", listenerList.size() <= maxOpenFiles); while(!listenerList.isEmpty()) { listenerList.remove(0).operationComplete(mockFuture); assertTrue("Number of Open files should not exceed the configured " + "value!-Not Expected", listenerList.size() <= maxOpenFiles); } sh.close(); }