/**
 * Command-line entry point of the chat client.
 * <p>
 * Requires exactly one argument, the user name. Loads the Spring context, looks up the
 * {@code basicJMSChat} bean, opens a topic connection from the connection factory of its
 * JMS template, registers the subscriber and then enters the publish loop.
 *
 * @param args command-line arguments; {@code args[0]} is the user name
 * @throws JMSException if a JMS operation fails
 * @throws IOException if reading user input fails
 */
public static void main(String[] args) throws JMSException, IOException {
    if (args.length != 1) {
        System.out.println("User Name is required....");
        return;
    }

    userId = args[0];
    ApplicationContext ctx = new ClassPathXmlApplicationContext(
            "com/springtraining/jms/spring-config.xml");
    BasicJMSChat basicJMSChat = (BasicJMSChat) ctx.getBean("basicJMSChat");
    TopicConnectionFactory topicConnectionFactory =
            (TopicConnectionFactory) basicJMSChat.chatJMSTemplate.getConnectionFactory();
    TopicConnection tc = topicConnectionFactory.createTopicConnection();

    // Subscribe BEFORE publishing: publish(...) reads the console in an endless loop and
    // only leaves it by terminating the JVM, so any statement placed after it never runs.
    // NOTE(review): assumes subscribe(...) only registers a listener and returns - confirm.
    basicJMSChat.subscribe(tc, basicJMSChat.chatTopic, basicJMSChat);
    basicJMSChat.publish(tc, basicJMSChat.chatTopic, userId);
}
/**
 * Opens and closes 100 subscribers on one shared session, closes the session and the
 * connection afterwards, and asserts that the broker destination has no consumers left.
 */
public void testWithSessionCloseOutsideTheLoop() throws Exception {
    TopicConnection conn = connectionFactory.createTopicConnection();
    conn.start();
    TopicSession sharedSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

    int remaining = 100;
    while (remaining-- > 0) {
        TopicSubscriber sub = sharedSession.createSubscriber(topic);
        sub.setMessageListener(new DummyMessageListener());
        sub.close();
    }

    // The session is closed only once, after all subscribers are gone.
    sharedSession.close();
    conn.close();
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Checks that a pooled queue sender and a pooled topic publisher both expose the name of
 * the destination they were created for.
 */
@Test(timeout = 60000)
public void testSenderAndPublisherDest() throws Exception {
    JmsPoolXAConnectionFactory pool = new JmsPoolXAConnectionFactory();
    pool.setConnectionFactory(new ActiveMQXAConnectionFactory(
            "vm://test?broker.persistent=false&broker.useJmx=false"));

    // Queue side.
    QueueConnection queueConnection = pool.createQueueConnection();
    QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    QueueSender queueSender = queueSession.createSender(queueSession.createQueue("AA"));
    assertNotNull(queueSender.getQueue().getQueueName());
    queueConnection.close();

    // Topic side.
    TopicConnection pubSubConnection = pool.createTopicConnection();
    TopicSession pubSubSession = pubSubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher publisher = pubSubSession.createPublisher(pubSubSession.createTopic("AA"));
    assertNotNull(publisher.getTopic().getTopicName());
    pubSubConnection.close();

    pool.stop();
}
/**
 * Reads lines from standard input and publishes each one to the chat topic until the
 * user types {@code exit} (case-insensitive) or standard input reaches end-of-stream.
 * In both cases the connection is closed and the JVM is terminated with status 0.
 *
 * @param topicConnection connection used to create the publishing session; it is started here
 * @param chatTopic topic the messages are published to
 * @param userId name prefixed to every published message
 * @throws JMSException if a JMS operation fails
 * @throws IOException if reading from standard input fails
 */
void publish(TopicConnection topicConnection, Topic chatTopic, String userId)
        throws JMSException, IOException {
    TopicSession tsession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher topicPublisher = tsession.createPublisher(chatTopic);
    topicConnection.start();

    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
    while (true) {
        String msgToSend = reader.readLine();
        // readLine() returns null at end-of-stream (Ctrl-D, closed pipe); treat that like
        // an explicit "exit" instead of failing with a NullPointerException.
        if (msgToSend == null || msgToSend.equalsIgnoreCase("exit")) {
            topicConnection.close();
            System.exit(0);
        } else {
            TextMessage msg = tsession.createTextMessage();
            msg.setText("\n[" + userId + " : " + msgToSend + "]");
            topicPublisher.publish(msg);
        }
    }
}
/**
 * Wraps the given connection in the managed wrapper matching the JMS interfaces it
 * implements. XA variants take precedence over non-XA ones, and a connection that is both
 * queue- and topic-capable gets the combined wrapper. Anything else (including
 * {@code null}) is wrapped in a plain {@code ManagedConnection}.
 *
 * @param connection the connection to wrap
 * @return the managed wrapper
 */
public static ManagedConnection create(final Connection connection) {
    final boolean xaQueue = connection instanceof XAQueueConnection;
    final boolean xaTopic = connection instanceof XATopicConnection;
    if (xaQueue && xaTopic) {
        return new ManagedXAQueueTopicConnection(connection);
    }
    if (xaQueue) {
        return new ManagedXAQueueConnection((XAQueueConnection) connection);
    }
    if (xaTopic) {
        return new ManagedXATopicConnection((XATopicConnection) connection);
    }

    final boolean queue = connection instanceof QueueConnection;
    final boolean topic = connection instanceof TopicConnection;
    if (queue && topic) {
        return new ManagedQueueTopicConnection(connection);
    }
    if (queue) {
        return new ManagedQueueConnection((QueueConnection) connection);
    }
    if (topic) {
        return new ManagedTopicConnection((TopicConnection) connection);
    }
    return new ManagedConnection(connection);
}
/**
 * Sends the given {@code events} to the configured JMS topic.
 * <p>
 * A new {@link TopicConnection} is opened for every call. A topic session is created on it
 * (transacted with {@code Session.SESSION_TRANSACTED} when {@code isTransacted} is set,
 * otherwise {@code Session.AUTO_ACKNOWLEDGE}), each event is converted with the message
 * converter and handed to {@code doSendMessage}. {@code handleTransaction} is always
 * invoked for the session afterwards, even when publishing failed.
 *
 * @param events the events to publish on the JMS message broker
 * @throws EventPublicationFailedException if any JMS operation throws a {@link JMSException}
 */
protected void send(List<? extends EventMessage<?>> events) {
    try (TopicConnection topicConnection = connectionFactory.createTopicConnection()) {
        int ackMode = isTransacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE;
        TopicSession topicSession = topicConnection.createTopicSession(isTransacted, ackMode);
        try (TopicPublisher publisher = topicSession.createPublisher(topic)) {
            // Wildcard instead of the raw type keeps the loop free of unchecked usage.
            for (EventMessage<?> event : events) {
                Message jmsMessage = messageConverter.createJmsMessage(event, topicSession);
                doSendMessage(publisher, jmsMessage);
            }
        } finally {
            handleTransaction(topicSession);
        }
    } catch (JMSException ex) {
        throw new EventPublicationFailedException(
                "Unable to establish TopicConnection to JMS message broker.", ex);
    }
}
/**
 * Publishing without an active unit of work must publish the converted message at once,
 * then commit and close the transacted session.
 */
@Test
public void testSendMessage_NoUnitOfWork() throws Exception {
    // Arrange: connection factory -> connection -> transacted session -> publisher.
    TopicConnection topicConnection = mock(TopicConnection.class);
    TopicSession txSession = mock(TopicSession.class);
    when(connectionFactory.createTopicConnection()).thenReturn(topicConnection);
    when(topicConnection.createTopicSession(true, Session.SESSION_TRANSACTED))
            .thenReturn(txSession);
    when(txSession.createPublisher(topic)).thenReturn(publisher);

    GenericEventMessage<String> event = new GenericEventMessage<>("Message");
    TextMessage converted = mock(TextMessage.class);
    when(converter.createJmsMessage(event, txSession)).thenReturn(converted);

    // Act
    eventBus.publish(event);

    // Assert
    verify(publisher).publish(converted);
    verify(txSession).commit();
    verify(txSession).close();
}
/**
 * Publishing inside a unit of work that is later committed must publish the converted
 * message and commit and close the transacted session.
 */
@Test
public void testSendMessage_WithTransactionalUnitOfWork() throws Exception {
    GenericEventMessage<String> event = new GenericEventMessage<>("Message");
    final UnitOfWork<?> unitOfWork = DefaultUnitOfWork.startAndGet(event);

    // Arrange: connection factory -> connection -> transacted session -> publisher.
    TopicConnection topicConnection = mock(TopicConnection.class);
    when(connectionFactory.createTopicConnection()).thenReturn(topicConnection);
    TopicSession txSession = mock(TopicSession.class);
    when(topicConnection.createTopicSession(true, Session.SESSION_TRANSACTED))
            .thenReturn(txSession);
    when(txSession.createPublisher(topic)).thenReturn(publisher);
    when(txSession.getTransacted()).thenReturn(true);

    TextMessage converted = mock(TextMessage.class);
    when(converter.createJmsMessage(event, txSession)).thenReturn(converted);

    // Act: publish within the unit of work, then commit it.
    eventBus.publish(event);
    unitOfWork.commit();

    // Assert
    verify(publisher).publish(converted);
    verify(txSession).commit();
    verify(txSession).close();
}
/**
 * Publishing inside a unit of work that is rolled back must never reach the broker: the
 * session is untouched before the rollback, and afterwards nothing has been published,
 * committed, or even connected.
 */
@Test
public void testSendMessage_WithUnitOfWorkRollback() throws Exception {
    GenericEventMessage<String> event = new GenericEventMessage<>("Message");
    final UnitOfWork<?> unitOfWork = DefaultUnitOfWork.startAndGet(event);

    // Arrange: connection factory -> connection -> transacted session -> publisher.
    TopicConnection topicConnection = mock(TopicConnection.class);
    when(connectionFactory.createTopicConnection()).thenReturn(topicConnection);
    TopicSession txSession = mock(TopicSession.class);
    when(topicConnection.createTopicSession(true, Session.SESSION_TRANSACTED))
            .thenReturn(txSession);
    when(txSession.createPublisher(topic)).thenReturn(publisher);
    when(txSession.getTransacted()).thenReturn(true);

    TextMessage converted = mock(TextMessage.class);
    when(converter.createJmsMessage(event, txSession)).thenReturn(converted);

    eventBus.publish(event);

    // Nothing may have happened on the session while the unit of work is still open.
    verify(txSession, never()).rollback();
    verify(txSession, never()).commit();
    verify(txSession, never()).close();

    unitOfWork.rollback();

    // After the rollback the message must not have been sent at all.
    verify(publisher, never()).publish(converted);
    verify(txSession, never()).commit();
    verify(connectionFactory, never()).createTopicConnection();
}
/**
 * With persistence enabled and no explicit message converter configured, a published
 * event must result in a JMS message whose delivery mode is set to
 * {@code DeliveryMode.PERSISTENT}.
 */
@Test
public void testSendPersistentMessage() throws Exception {
    // Reconfigure the component under test and re-run its initialisation.
    cut.setPersistent(true);
    cut.setMessageConverter(null);
    cut.postConstruct();

    // Arrange: connection factory -> connection -> transacted session -> publisher.
    TopicConnection topicConnection = mock(TopicConnection.class);
    when(connectionFactory.createTopicConnection()).thenReturn(topicConnection);
    TopicSession txSession = mock(TopicSession.class);
    when(topicConnection.createTopicSession(true, Session.SESSION_TRANSACTED))
            .thenReturn(txSession);
    when(txSession.createPublisher(topic)).thenReturn(publisher);

    TextMessage textMessage = mock(TextMessage.class);
    when(txSession.createTextMessage(any())).thenReturn(textMessage);
    ArgumentCaptor<Message> publishedCaptor = ArgumentCaptor.forClass(Message.class);
    doNothing().when(publisher).publish(publishedCaptor.capture());

    eventBus.publish(new GenericEventMessage<>("Message"));

    verify(textMessage).setJMSDeliveryMode(DeliveryMode.PERSISTENT);
}
/** * Create a default Session for this ConnectionFactory, * adapting to JMS 1.0.2 style queue/topic mode if necessary. * @param con the JMS Connection to operate on * @param mode the Session acknowledgement mode * ({@code Session.TRANSACTED} or one of the common modes) * @return the newly created Session * @throws JMSException if thrown by the JMS API */ protected Session createSession(Connection con, Integer mode) throws JMSException { // Determine JMS API arguments... boolean transacted = (mode == Session.SESSION_TRANSACTED); int ackMode = (transacted ? Session.AUTO_ACKNOWLEDGE : mode); // Now actually call the appropriate JMS factory method... if (Boolean.FALSE.equals(this.pubSubMode) && con instanceof QueueConnection) { return ((QueueConnection) con).createQueueSession(transacted, ackMode); } else if (Boolean.TRUE.equals(this.pubSubMode) && con instanceof TopicConnection) { return ((TopicConnection) con).createTopicSession(transacted, ackMode); } else { return con.createSession(transacted, ackMode); } }
@Test public void testWithTopicConnection() throws JMSException { Connection con = mock(TopicConnection.class); SingleConnectionFactory scf = new SingleConnectionFactory(con); TopicConnection con1 = scf.createTopicConnection(); con1.start(); con1.stop(); con1.close(); TopicConnection con2 = scf.createTopicConnection(); con2.start(); con2.stop(); con2.close(); scf.destroy(); // should trigger actual close verify(con, times(2)).start(); verify(con, times(2)).stop(); verify(con).close(); verifyNoMoreInteractions(con); }
@Test public void testWithTopicConnectionFactoryAndJms11Usage() throws JMSException { TopicConnectionFactory cf = mock(TopicConnectionFactory.class); TopicConnection con = mock(TopicConnection.class); given(cf.createConnection()).willReturn(con); SingleConnectionFactory scf = new SingleConnectionFactory(cf); Connection con1 = scf.createConnection(); Connection con2 = scf.createConnection(); con1.start(); con2.start(); con1.close(); con2.close(); scf.destroy(); // should trigger actual close verify(con).start(); verify(con).stop(); verify(con).close(); verifyNoMoreInteractions(con); }
@Test public void testWithTopicConnectionFactoryAndJms102Usage() throws JMSException { TopicConnectionFactory cf = mock(TopicConnectionFactory.class); TopicConnection con = mock(TopicConnection.class); given(cf.createTopicConnection()).willReturn(con); SingleConnectionFactory scf = new SingleConnectionFactory(cf); Connection con1 = scf.createTopicConnection(); Connection con2 = scf.createTopicConnection(); con1.start(); con2.start(); con1.close(); con2.close(); scf.destroy(); // should trigger actual close verify(con).start(); verify(con).stop(); verify(con).close(); verifyNoMoreInteractions(con); }
public TopicConnection getTopicConnection(TopicConnectionFactory tcf, String uniqueID ) throws JMSException { final TopicConnection tc; final String username = Config.parms.getString("us"); if (username != null && username.length() != 0) { Log.logger.log(Level.INFO, "getTopicConnection(): authenticating as \"" + username + "\""); final String password = Config.parms.getString("pw"); tc = tcf.createTopicConnection(username, password); } else { tc = tcf.createTopicConnection(); } if (durable) { // Note: change signature to match getConnection setDurableConnectionId( tc, ((WorkerThread)Thread.currentThread()), uniqueID ); } // end if durable return tc; }
/**
 * Creates a sink that registers itself as message listener on a JMS topic.
 * <p>
 * Looks up the topic connection factory and the topic via JNDI, opens and starts a
 * connection with the given credentials, and subscribes this instance to the topic. Any
 * failure is logged rather than propagated; in that case a connection that was already
 * opened is closed again so it does not leak.
 *
 * @param tcfBindingName JNDI name of the {@code TopicConnectionFactory}
 * @param topicBindingName JNDI name of the {@code Topic}
 * @param username user name for the connection
 * @param password password for the connection
 */
public JMSSink(final String tcfBindingName, final String topicBindingName, final String username,
        final String password) {
    TopicConnection topicConnection = null;
    try {
        final Context ctx = new InitialContext();
        final TopicConnectionFactory topicConnectionFactory =
                (TopicConnectionFactory) lookup(ctx, tcfBindingName);
        topicConnection = topicConnectionFactory.createTopicConnection(username, password);
        topicConnection.start();
        final TopicSession topicSession =
                topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final Topic topic = (Topic) ctx.lookup(topicBindingName);
        final TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
        topicSubscriber.setMessageListener(this);
    } catch (final Exception e) {
        logger.error("Could not read JMS message.", e);
        // Setup failed part-way: nobody holds a reference to the connection, so release it
        // here instead of leaving it open for the lifetime of the process.
        if (topicConnection != null) {
            try {
                topicConnection.close();
            } catch (final Exception closeFailure) {
                logger.error("Could not close JMS connection after failed setup.", closeFailure);
            }
        }
    }
}
/**
 * Configures the consumer (foreign) side of every inbound topic bridge.
 * <p>
 * For each bridge the foreign topic is resolved from the bridge's inbound topic name using
 * a temporary session, the bridge's consumer is reset to {@code null}, and the consumer
 * topic, consumer connection and owning connector are set before the bridge is registered
 * via {@code addInboundBridge}. Does nothing when no inbound topic bridges are configured.
 *
 * @param connection connection used to resolve the foreign topics and handed to each bridge
 * @throws JMSException if creating the session, resolving a topic or closing the session fails
 */
protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
    if (inboundTopicBridges == null) {
        return;
    }
    TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    try {
        for (InboundTopicBridge bridge : inboundTopicBridges) {
            String topicName = bridge.getInboundTopicName();
            Topic foreignTopic = createForeignTopic(outboundSession, topicName);
            bridge.setConsumer(null);
            bridge.setConsumerTopic(foreignTopic);
            bridge.setConsumerConnection(connection);
            bridge.setJmsConnector(this);
            addInboundBridge(bridge);
        }
    } finally {
        // The session is only needed for topic resolution; release it even when a
        // bridge fails to initialise so it is not leaked on the connection.
        outboundSession.close();
    }
}
/**
 * Configures the producer (local) side of every inbound topic bridge.
 * <p>
 * For each bridge the local topic is resolved from the bridge's local topic name using a
 * temporary session, the producer topic and producer connection are set, a message
 * convertor is supplied from {@code getInboundMessageConvertor()} when the bridge has
 * none, and the bridge is registered via {@code addInboundBridge}. Does nothing when no
 * inbound topic bridges are configured.
 *
 * @param connection connection used to resolve the local topics and handed to each bridge
 * @throws JMSException if creating the session, resolving a topic or closing the session fails
 */
protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
    if (inboundTopicBridges == null) {
        return;
    }
    TopicSession localSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    try {
        for (InboundTopicBridge bridge : inboundTopicBridges) {
            String localTopicName = bridge.getLocalTopicName();
            Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
            bridge.setProducerTopic(activemqTopic);
            bridge.setProducerConnection(connection);
            if (bridge.getJmsMessageConvertor() == null) {
                bridge.setJmsMessageConvertor(getInboundMessageConvertor());
            }
            bridge.setJmsConnector(this);
            addInboundBridge(bridge);
        }
    } finally {
        // The session is only needed for topic resolution; release it even when a
        // bridge fails to initialise so it is not leaked on the connection.
        localSession.close();
    }
}
/**
 * Configures the producer (foreign) side of every outbound topic bridge.
 * <p>
 * For each bridge the foreign topic is resolved from the bridge's outbound topic name
 * using a temporary session, the producer topic and producer connection are set, a message
 * convertor is supplied from {@code getOutboundMessageConvertor()} when the bridge has
 * none, and the bridge is registered via {@code addOutboundBridge}. Does nothing when no
 * outbound topic bridges are configured.
 *
 * @param connection connection used to resolve the foreign topics and handed to each bridge
 * @throws JMSException if creating the session, resolving a topic or closing the session fails
 */
protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
    if (outboundTopicBridges == null) {
        return;
    }
    TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    try {
        for (OutboundTopicBridge bridge : outboundTopicBridges) {
            String topicName = bridge.getOutboundTopicName();
            Topic foreignTopic = createForeignTopic(outboundSession, topicName);
            bridge.setProducerTopic(foreignTopic);
            bridge.setProducerConnection(connection);
            if (bridge.getJmsMessageConvertor() == null) {
                bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
            }
            bridge.setJmsConnector(this);
            addOutboundBridge(bridge);
        }
    } finally {
        // The session is only needed for topic resolution; release it even when a
        // bridge fails to initialise so it is not leaked on the connection.
        outboundSession.close();
    }
}
/**
 * Configures the consumer (local) side of every outbound topic bridge.
 * <p>
 * For each bridge the local topic is resolved from the bridge's local topic name using a
 * temporary session, the bridge's consumer is reset to {@code null}, and the consumer
 * topic, consumer connection and owning connector are set before the bridge is registered
 * via {@code addOutboundBridge}. Does nothing when no outbound topic bridges are configured.
 *
 * @param connection connection used to resolve the local topics and handed to each bridge
 * @throws JMSException if creating the session, resolving a topic or closing the session fails
 */
protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
    if (outboundTopicBridges == null) {
        return;
    }
    TopicSession localSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    try {
        for (OutboundTopicBridge bridge : outboundTopicBridges) {
            String localTopicName = bridge.getLocalTopicName();
            Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
            bridge.setConsumer(null);
            bridge.setConsumerTopic(activemqTopic);
            bridge.setConsumerConnection(connection);
            bridge.setJmsConnector(this);
            addOutboundBridge(bridge);
        }
    } finally {
        // The session is only needed for topic resolution; release it even when a
        // bridge fails to initialise so it is not leaked on the connection.
        localSession.close();
    }
}
/**
 * Creates a JMS {@link Session} on the given {@link Connection}.
 * <p>
 * For JMS spec versions 1.1 and 2.0 the unified {@code createSession} call is used.
 * Otherwise the connection is cast to {@link QueueConnection} or {@link TopicConnection},
 * depending on the configured destination type, and the domain-specific factory method
 * is called.
 *
 * @param connection JMS Connection.
 * @return Session instance.
 * @throws JMSConnectorException Error when creating the JMS Session.
 */
public Session createSession(Connection connection) throws JMSConnectorException {
    try {
        if (logger.isDebugEnabled()) {
            logger.debug("Creating a new JMS Session on: " + this.connectionFactoryString);
        }

        boolean unifiedApi = JMSConstants.JMS_SPEC_VERSION_1_1.equals(jmsSpec)
                || JMSConstants.JMS_SPEC_VERSION_2_0.equals(jmsSpec);
        if (unifiedApi) {
            return connection.createSession(transactedSession, sessionAckMode);
        }
        if (JMSConstants.JMSDestinationType.QUEUE.equals(this.destinationType)) {
            QueueConnection queueConnection = (QueueConnection) connection;
            return queueConnection.createQueueSession(transactedSession, sessionAckMode);
        }
        TopicConnection topicConnection = (TopicConnection) connection;
        return topicConnection.createTopicSession(transactedSession, sessionAckMode);
    } catch (JMSException e) {
        throw new JMSConnectorException(
                "JMS Exception while obtaining session for factory " + connectionFactoryString, e);
    }
}
/**
 * Closes a JMS {@link Connection}; a {@code null} connection is ignored.
 * <p>
 * For JMS spec versions 1.1 and 2.0 the connection is closed directly. Otherwise it is
 * first cast to {@link QueueConnection} or {@link TopicConnection}, depending on the
 * configured destination type, and then closed.
 *
 * @param connection Connection that need to be closed.
 * @throws JMSException if an error occurs while closing the connection.
 */
public void closeConnection(Connection connection) throws JMSException {
    if (connection == null) {
        return;
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Closing a JMS Connection of: " + this.connectionFactoryString);
    }

    boolean unifiedApi = JMSConstants.JMS_SPEC_VERSION_1_1.equals(jmsSpec)
            || JMSConstants.JMS_SPEC_VERSION_2_0.equals(jmsSpec);
    if (unifiedApi) {
        connection.close();
    } else if (JMSConstants.JMSDestinationType.QUEUE.equals(this.destinationType)) {
        QueueConnection queueConnection = (QueueConnection) connection;
        queueConnection.close();
    } else {
        TopicConnection topicConnection = (TopicConnection) connection;
        topicConnection.close();
    }
}
/**
 * Publishes ten non-persistent text messages ("Topic Message : 1" .. "Topic Message : 10")
 * to the named topic, pausing one second after each message.
 * <p>
 * The connection is always closed when the method exits, including when sending fails or
 * the thread is interrupted while sleeping.
 *
 * @param topicName name of the topic to publish to
 * @throws JMSException JMS Exception.
 * @throws InterruptedException Interrupted exception while waiting in between messages.
 */
public void publishMessagesToTopic(String topicName) throws JMSException, InterruptedException {
    TopicConnection topicConnection = (TopicConnection) connectionFactory.createConnection();
    try {
        topicConnection.start();
        TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = topicSession.createTopic(topicName);
        MessageProducer topicSender = topicSession.createProducer(destination);
        topicSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for (int index = 0; index < 10; index++) {
            String topicText = "Topic Message : " + (index + 1);
            TextMessage topicMessage = topicSession.createTextMessage(topicText);
            topicSender.send(topicMessage);
            logger.info("Publishing " + topicText + " to topic " + topicName);
            Thread.sleep(1000);
        }
    } finally {
        // Closing the connection also closes the sessions and producers created from it,
        // so this single call releases everything, on success and on failure alike.
        topicConnection.close();
    }
}
/**
 * Subscribes to the {@code activemq.notifications} topic, waits until at least one
 * message has arrived, and checks that every received message carries the
 * {@code _AMQ_NotifType} string property.
 */
@Test
public void testNotificationProperties() throws Exception {
    try (TopicConnection connection = factory.createTopicConnection()) {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic notifications = session.createTopic("activemq.notifications");
        TopicSubscriber notificationSubscriber = session.createSubscriber(notifications);

        // Listener callbacks run on a provider thread, hence the concurrent list.
        List<Message> received = new CopyOnWriteArrayList<>();
        notificationSubscriber.setMessageListener(message -> received.add(message));
        connection.start();

        Wait.waitFor(() -> received.size() > 0);
        Assert.assertTrue(received.size() > 0);

        for (Message notification : received) {
            assertNotNull(notification);
            assertNotNull(notification.getStringProperty("_AMQ_NotifType"));
        }
    }
}
/**
 * Creates 100 sessions with one subscriber each, closing both the subscriber and the
 * session in every iteration, then closes the connection and asserts that the broker
 * destination has no consumers left.
 */
public void testWithSessionAndSubsciberClose() throws Exception {
    TopicConnection conn = connectionFactory.createTopicConnection();
    conn.start();

    int remaining = 100;
    while (remaining-- > 0) {
        TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber sub = session.createSubscriber(topic);
        sub.setMessageListener(new DummyMessageListener());
        sub.close();
        session.close();
    }

    conn.close();
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Creates a single subscriber with a listener, closes subscriber, session and connection
 * in that order, and asserts that the broker destination has no consumers left.
 */
public void testWithOneSubscriber() throws Exception {
    TopicConnection conn = connectionFactory.createTopicConnection();
    conn.start();

    TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicSubscriber sub = session.createSubscriber(topic);
    sub.setMessageListener(new DummyMessageListener());

    // Tear down from the innermost resource outwards.
    sub.close();
    session.close();
    conn.close();
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Creates 100 sessions with one subscriber each and closes none of them explicitly; only
 * the connection is closed. Asserts that the broker destination still ends up with no
 * consumers.
 */
public void testWithoutSessionAndSubsciberClose() throws Exception {
    TopicConnection conn = connectionFactory.createTopicConnection();
    conn.start();

    int remaining = 100;
    while (remaining-- > 0) {
        TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber sub = session.createSubscriber(topic);
        assertNotNull(sub);
    }

    // Only the connection is closed; sessions and subscribers are left to it.
    conn.close();
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Variant that closes subscriber and session in every iteration except iteration 50.
 * <p>
 * Original author's note: running this test you can produce a leak of only 2 ConsumerInfo
 * on the BE broker, NOT 200 as in other cases!
 */
public void testWithoutSessionAndSubsciberClosePlayAround() throws Exception {
    TopicConnection conn = connectionFactory.createTopicConnection();
    conn.start();

    for (int iteration = 0; iteration < 100; iteration++) {
        TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber sub = session.createSubscriber(topic);
        sub.setMessageListener(new DummyMessageListener());

        // Deliberately leave exactly one subscriber/session pair open.
        boolean leaveOpen = (iteration == 50);
        if (!leaveOpen) {
            sub.close();
            session.close();
        }
    }

    conn.close();
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Connects to the in-VM broker with the configured client id, opens the durable
 * subscription and waits for a single message.
 * <p>
 * Note: despite the method name, the receive timeout used is 120000 ms.
 *
 * @return the received message, or {@code null} if none arrived within the timeout
 */
private Message collectMessagesFromDurableSubscriptionForOneMinute() throws Exception {
    ActiveMQConnectionFactory vmFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
    TopicConnection conn = vmFactory.createTopicConnection();
    conn.setClientID(clientID);

    TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic destination = session.createTopic(topicName);
    conn.start();
    TopicSubscriber durableSubscriber = session.createDurableSubscriber(destination, durableSubName);

    LOG.info("About to receive messages");
    Message received = durableSubscriber.receive(120000);

    durableSubscriber.close();
    conn.close();
    LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done");
    return received;
}
@Test public void testGetNoLocalOnClosedConsumer() throws Exception { Connection consumerConnection = null; try { consumerConnection = createConnection(); TopicConnection tc = (TopicConnection) consumerConnection; TopicSession consumerSession = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber topicConsumer = consumerSession.createSubscriber(ActiveMQServerTestCase.topic1); topicConsumer.close(); try { topicConsumer.getNoLocal(); Assert.fail("must throw a JMS IllegalStateException"); } catch (javax.jms.IllegalStateException e) { // OK } } finally { if (consumerConnection != null) { consumerConnection.close(); } } }
/**
 * A persistent message published to a topic without any subscribers must not be retained:
 * after committing and closing the connection the topic is checked to be empty.
 */
@Test
public void testPersistentMessagesForTopicDropped() throws Exception {
    TopicConnection connection = createTopicConnection();
    TopicSession txSession = connection.createTopicSession(true, 0);

    TopicPublisher publisher = txSession.createPublisher(ActiveMQServerTestCase.topic1);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

    Message outgoing = txSession.createTextMessage("testing123");
    publisher.publish(outgoing);
    txSession.commit();

    connection.close();

    checkEmpty(ActiveMQServerTestCase.topic1);
}
/** * Topics shouldn't hold on to messages when the non-durable subscribers close */ @Test public void testPersistentMessagesForTopicDropped2() throws Exception { TopicConnection topicConn = createTopicConnection(); topicConn.start(); TopicSession sess = topicConn.createTopicSession(true, 0); TopicPublisher pub = sess.createPublisher(ActiveMQServerTestCase.topic1); TopicSubscriber sub = sess.createSubscriber(ActiveMQServerTestCase.topic1); pub.setDeliveryMode(DeliveryMode.PERSISTENT); Message m = sess.createTextMessage("testing123"); pub.publish(m); sess.commit(); // receive but rollback TextMessage m2 = (TextMessage) sub.receive(3000); ProxyAssertSupport.assertNotNull(m2); ProxyAssertSupport.assertEquals("testing123", m2.getText()); sess.rollback(); topicConn.close(); checkEmpty(ActiveMQServerTestCase.topic1); }
private void assertConnectionType(Connection conn, String type) { if ("generic".equals(type) || "queue".equals(type) || "topic".equals(type)) { //generic Assert.assertFalse(conn instanceof XAConnection); Assert.assertTrue(conn instanceof QueueConnection); Assert.assertFalse(conn instanceof XAQueueConnection); Assert.assertTrue(conn instanceof TopicConnection); Assert.assertFalse(conn instanceof XATopicConnection); } else if ("xa".equals(type) || "xa-queue".equals(type) || "xa-topic".equals(type)) { Assert.assertTrue(conn instanceof XAConnection); Assert.assertTrue(conn instanceof QueueConnection); Assert.assertTrue(conn instanceof XAQueueConnection); Assert.assertTrue(conn instanceof TopicConnection); Assert.assertTrue(conn instanceof XATopicConnection); } else { Assert.fail("Unknown connection type: " + type); } }
/**
 * Create a topic connection for the given credentials.
 * <p>
 * Builds a session factory of type {@code TOPIC_CONNECTION}, stores the credentials on it
 * and passes it to {@code validateUser} before returning it. The password is masked in the
 * trace output.
 *
 * @param userName The user name
 * @param password The password
 * @return The connection
 * @throws JMSException Thrown if the operation fails
 */
@Override
public TopicConnection createTopicConnection(final String userName, final String password) throws JMSException {
    if (ActiveMQRAConnectionFactoryImpl.trace) {
        ActiveMQRALogger.LOGGER.trace("createTopicConnection(" + userName + ", ****)");
    }

    ActiveMQRASessionFactoryImpl sessionFactory = new ActiveMQRASessionFactoryImpl(
            mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.TOPIC_CONNECTION);
    sessionFactory.setUserName(userName);
    sessionFactory.setPassword(password);

    validateUser(sessionFactory);

    if (ActiveMQRAConnectionFactoryImpl.trace) {
        ActiveMQRALogger.LOGGER.trace("Created topic connection: " + sessionFactory);
    }
    return sessionFactory;
}
/** * Create a default Session for this ConnectionFactory, * adaptign to JMS 1.0.2 style queue/topic mode if necessary. * @param con the JMS Connection to operate on * @param mode the Session acknowledgement mode * ({@code Session.TRANSACTED} or one of the common modes) * @return the newly created Session * @throws JMSException if thrown by the JMS API */ protected Session createSession(Connection con, Integer mode) throws JMSException { // Determine JMS API arguments... boolean transacted = (mode == Session.SESSION_TRANSACTED); int ackMode = (transacted ? Session.AUTO_ACKNOWLEDGE : mode); // Now actually call the appropriate JMS factory method... if (Boolean.FALSE.equals(this.pubSubMode) && con instanceof QueueConnection) { return ((QueueConnection) con).createQueueSession(transacted, ackMode); } else if (Boolean.TRUE.equals(this.pubSubMode) && con instanceof TopicConnection) { return ((TopicConnection) con).createTopicSession(transacted, ackMode); } else { return con.createSession(transacted, ackMode); } }
/**
 * With the JMS 1.0.2 transaction manager and template in pub-sub mode, the template must
 * run inside the transaction's topic session, and committing the transaction must commit
 * the session and close both the session and the connection.
 */
@Test
@Deprecated
public void testTransactionCommit102WithTopic() throws JMSException {
    TopicConnectionFactory topicFactory = mock(TopicConnectionFactory.class);
    TopicConnection topicConnection = mock(TopicConnection.class);
    final TopicSession expectedSession = mock(TopicSession.class);
    given(topicFactory.createTopicConnection()).willReturn(topicConnection);
    given(topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE)).willReturn(expectedSession);

    JmsTransactionManager transactionManager = new JmsTransactionManager102(topicFactory, true);
    TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());

    JmsTemplate template = new JmsTemplate102(topicFactory, true);
    template.execute(new SessionCallback() {
        @Override
        public Object doInJms(Session actualSession) {
            // The template must reuse the transactional session, not open a new one.
            assertTrue(actualSession == expectedSession);
            return null;
        }
    });

    transactionManager.commit(status);

    verify(expectedSession).commit();
    verify(expectedSession).close();
    verify(topicConnection).close();
}
@Test public void testWithTopicConnection() throws JMSException { Connection con = mock(TopicConnection.class); SingleConnectionFactory scf = new SingleConnectionFactory(con); TopicConnection con1 = scf.createTopicConnection(); con1.start(); con1.stop(); // should be ignored con1.close(); // should be ignored TopicConnection con2 = scf.createTopicConnection(); con2.start(); con2.stop(); // should be ignored con2.close(); // should be ignored scf.destroy(); // should trigger actual close verify(con).start(); verify(con).stop(); verify(con).close(); verifyNoMoreInteractions(con); }
@Test public void testWithTopicConnectionFactoryAndJms11Usage() throws JMSException { TopicConnectionFactory cf = mock(TopicConnectionFactory.class); TopicConnection con = mock(TopicConnection.class); given(cf.createConnection()).willReturn(con); SingleConnectionFactory scf = new SingleConnectionFactory(cf); Connection con1 = scf.createConnection(); con1.start(); con1.close(); // should be ignored Connection con2 = scf.createConnection(); con2.start(); con2.close(); // should be ignored scf.destroy(); // should trigger actual close verify(con).start(); verify(con).stop(); verify(con).close(); verifyNoMoreInteractions(con); }
@Test public void testWithTopicConnectionFactoryAndJms102Usage() throws JMSException { TopicConnectionFactory cf = mock(TopicConnectionFactory.class); TopicConnection con = mock(TopicConnection.class); given(cf.createTopicConnection()).willReturn(con); SingleConnectionFactory scf = new SingleConnectionFactory(cf); Connection con1 = scf.createTopicConnection(); con1.start(); con1.close(); // should be ignored Connection con2 = scf.createTopicConnection(); con2.start(); con2.close(); // should be ignored scf.destroy(); // should trigger actual close verify(con).start(); verify(con).stop(); verify(con).close(); verifyNoMoreInteractions(con); }
@Test public void testConnectionFactory102WithTopic() throws JMSException { TopicConnectionFactory cf = mock(TopicConnectionFactory.class); TopicConnection con = mock(TopicConnection.class); given(cf.createTopicConnection()).willReturn(con); SingleConnectionFactory scf = new SingleConnectionFactory102(cf, true); TopicConnection con1 = scf.createTopicConnection(); con1.start(); con1.close(); // should be ignored TopicConnection con2 = scf.createTopicConnection(); con2.start(); con2.close(); // should be ignored scf.destroy(); // should trigger actual close verify(con).start(); verify(con).stop(); verify(con).close(); verifyNoMoreInteractions(con); }