/**
 * Hands an incoming dispatch to a session obtained from the session pool and then starts
 * the owning server session.
 *
 * <p>The pooled session is used directly when it is an {@code ActiveMQSession}; when it is an
 * {@code ActiveMQTopicSession} or {@code ActiveMQQueueSession}, the session returned by its
 * {@code getNext()} is used instead. Any other value, including {@code null}, is reported
 * through {@code connection.onClientInternalException} and the dispatch is abandoned.
 *
 * <p>A {@link JMSException} raised along the way is forwarded to
 * {@code connection.onAsyncException} rather than propagated to the caller.
 *
 * @param messageDispatch the dispatch to deliver; this consumer is set on it before delivery
 */
public void dispatch(MessageDispatch messageDispatch) {
    try {
        messageDispatch.setConsumer(this);

        ServerSession serverSession = sessionPool.getServerSession();
        Session s = serverSession.getSession();
        ActiveMQSession session = null;

        if (s instanceof ActiveMQSession) {
            session = (ActiveMQSession) s;
        } else if (s instanceof ActiveMQTopicSession) {
            ActiveMQTopicSession topicSession = (ActiveMQTopicSession) s;
            session = (ActiveMQSession) topicSession.getNext();
        } else if (s instanceof ActiveMQQueueSession) {
            ActiveMQQueueSession queueSession = (ActiveMQQueueSession) s;
            session = (ActiveMQSession) queueSession.getNext();
        } else {
            // A null session also lands here (instanceof is false for null), so the type
            // must be rendered without dereferencing it; otherwise the error report itself
            // would throw a NullPointerException that nothing in this method catches.
            String sessionType = (s == null) ? "null" : String.valueOf(s.getClass());
            connection.onClientInternalException(
                new JMSException("Session pool provided an invalid session type: " + sessionType));
            return;
        }

        session.dispatch(messageDispatch);
        serverSession.start();
    } catch (JMSException e) {
        connection.onAsyncException(e);
    }
}
/**
 * Returns the shared {@code serverSession} and marks it as in use.
 *
 * <p>If the session was already marked in use when this call arrived, the overlap is logged,
 * {@code success} is set to {@code false} and {@code completed} is set to {@code true}.
 * The session is still returned in that case. Where the in-use flag gets cleared again is
 * not visible from this method.
 *
 * @return the shared server session
 * @throws JMSException declared by the overridden method; nothing here throws it
 */
@Override
public ServerSession getServerSession() throws JMSException {
    synchronized (this) {
        // Remember the previous state, then claim the session; both steps happen under
        // the same monitor, so the check-and-set is a single unit for other lock holders.
        final boolean alreadyInUse = serverSessionInUse;
        serverSessionInUse = true;

        if (alreadyInUse) {
            LOG.info("asked for session while in use, not serialised delivery");
            success.set(false);
            completed.set(true);
        }
        return serverSession;
    }
}
@Test public void testFailoverWithConnectionConsumer() throws Exception { LOG.info(this + " running test testFailoverWithConnectionConsumer"); startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1); try { Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue(QUEUE_NAME); final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.createConnectionConsumer(destination, null, new ServerSessionPool() { @Override public ServerSession getServerSession() throws JMSException { return new ServerSession() { @Override public Session getSession() throws JMSException { return poolSession; } @Override public void start() throws JMSException { connectionConsumerGotOne.countDown(); poolSession.run(); } }; } }, 1); MessageConsumer consumer = session.createConsumer(destination); MessageProducer producer; TextMessage message; final int count = 10; for (int i = 0; i < count; i++) { producer = session.createProducer(destination); message = session.createTextMessage("Test message: " + count); producer.send(message); producer.close(); } // restart to force failover and connection state recovery before the commit broker.stop(); startBroker(); session.commit(); for (int i = 0; i < count - 1; i++) { Message received = consumer.receive(20000); Assert.assertNotNull("Failed to get message: " + count, received); } session.commit(); } finally { connection.close(); } Assert.assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS)); }