Java 类org.jboss.netty.channel.AbstractChannel 实例源码

项目:aliyun-oss-hadoop-fs    文件:TestShuffleHandler.java   
@Test(timeout = 4000)
public void testSendMapCount() throws Exception {
  final List<ShuffleHandler.ReduceMapFileCount> listenerList =
      new ArrayList<ShuffleHandler.ReduceMapFileCount>();

  final ChannelHandlerContext mockCtx =
      Mockito.mock(ChannelHandlerContext.class);
  final MessageEvent mockEvt = Mockito.mock(MessageEvent.class);
  final Channel mockCh = Mockito.mock(AbstractChannel.class);

  // Mock HttpRequest and ChannelFuture
  final HttpRequest mockHttpRequest = createMockHttpRequest();
  final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
      listenerList);

  // Mock Netty Channel Context and Channel behavior
  Mockito.doReturn(mockCh).when(mockCtx).getChannel();
  Mockito.when(mockCtx.getChannel()).thenReturn(mockCh);
  Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
  Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture);

  //Mock MessageEvent behavior
  Mockito.doReturn(mockCh).when(mockEvt).getChannel();
  Mockito.when(mockEvt.getChannel()).thenReturn(mockCh);
  Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();

  final ShuffleHandler sh = new MockShuffleHandler();
  Configuration conf = new Configuration();
  sh.init(conf);
  sh.start();
  int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
      ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
  sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
  assertTrue("Number of Open files should not exceed the configured " +
          "value!-Not Expected",
      listenerList.size() <= maxOpenFiles);
  while(!listenerList.isEmpty()) {
    listenerList.remove(0).operationComplete(mockFuture);
    assertTrue("Number of Open files should not exceed the configured " +
            "value!-Not Expected",
        listenerList.size() <= maxOpenFiles);
  }
  sh.close();
}
项目:hops    文件:TestShuffleHandler.java   
@Test(timeout = 4000)
public void testSendMapCount() throws Exception {
  final List<ShuffleHandler.ReduceMapFileCount> listenerList =
      new ArrayList<ShuffleHandler.ReduceMapFileCount>();

  final ChannelHandlerContext mockCtx =
      Mockito.mock(ChannelHandlerContext.class);
  final MessageEvent mockEvt = Mockito.mock(MessageEvent.class);
  final Channel mockCh = Mockito.mock(AbstractChannel.class);
  final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class);

  // Mock HttpRequest and ChannelFuture
  final HttpRequest mockHttpRequest = createMockHttpRequest();
  final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
      listenerList);
  final ShuffleHandler.TimeoutHandler timerHandler =
      new ShuffleHandler.TimeoutHandler();

  // Mock Netty Channel Context and Channel behavior
  Mockito.doReturn(mockCh).when(mockCtx).getChannel();
  Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline);
  Mockito.when(mockPipeline.get(
      Mockito.any(String.class))).thenReturn(timerHandler);
  Mockito.when(mockCtx.getChannel()).thenReturn(mockCh);
  Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
  Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture);

  //Mock MessageEvent behavior
  Mockito.doReturn(mockCh).when(mockEvt).getChannel();
  Mockito.when(mockEvt.getChannel()).thenReturn(mockCh);
  Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();

  final ShuffleHandler sh = new MockShuffleHandler();
  Configuration conf = new Configuration();
  sh.init(conf);
  sh.start();
  int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
      ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
  sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
  assertTrue("Number of Open files should not exceed the configured " +
          "value!-Not Expected",
      listenerList.size() <= maxOpenFiles);
  while(!listenerList.isEmpty()) {
    listenerList.remove(0).operationComplete(mockFuture);
    assertTrue("Number of Open files should not exceed the configured " +
            "value!-Not Expected",
        listenerList.size() <= maxOpenFiles);
  }
  sh.close();
}
项目:tez    文件:TestShuffleHandler.java   
@Test(timeout = 4000)
public void testSendMapCount() throws Exception {
  final List<ShuffleHandler.ReduceMapFileCount> listenerList =
      new ArrayList<ShuffleHandler.ReduceMapFileCount>();

  final ChannelHandlerContext mockCtx =
      mock(ChannelHandlerContext.class);
  final MessageEvent mockEvt = mock(MessageEvent.class);
  final Channel mockCh = mock(AbstractChannel.class);
  final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class);

  // Mock HttpRequest and ChannelFuture
  final HttpRequest mockHttpRequest = createMockHttpRequest();
  final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
      listenerList);
  final ShuffleHandler.TimeoutHandler timerHandler =
      new ShuffleHandler.TimeoutHandler();

  // Mock Netty Channel Context and Channel behavior
  Mockito.doReturn(mockCh).when(mockCtx).getChannel();
  Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline);
  Mockito.when(mockPipeline.get(Mockito.any(String.class))).thenReturn(timerHandler);
  when(mockCtx.getChannel()).thenReturn(mockCh);
  Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
  when(mockCh.write(Object.class)).thenReturn(mockFuture);

  //Mock MessageEvent behavior
  Mockito.doReturn(mockCh).when(mockEvt).getChannel();
  when(mockEvt.getChannel()).thenReturn(mockCh);
  Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();

  final ShuffleHandler sh = new MockShuffleHandler();
  Configuration conf = new Configuration();
  // The Shuffle handler port associated with the service is bound to but not used.
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
  sh.init(conf);
  sh.start();
  int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
      ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
  sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
  assertTrue("Number of Open files should not exceed the configured " +
          "value!-Not Expected",
      listenerList.size() <= maxOpenFiles);
  while(!listenerList.isEmpty()) {
    listenerList.remove(0).operationComplete(mockFuture);
    assertTrue("Number of Open files should not exceed the configured " +
            "value!-Not Expected",
        listenerList.size() <= maxOpenFiles);
  }
  sh.close();
}