public void testWithSessionCloseOutsideTheLoop() throws Exception { TopicConnection connection = connectionFactory.createTopicConnection(); connection.start(); TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); for (int i = 0; i < 100; i++) { TopicSubscriber subscriber = subscriberSession.createSubscriber(topic); DummyMessageListener listener = new DummyMessageListener(); subscriber.setMessageListener(listener); subscriber.close(); } subscriberSession.close(); connection.close(); Thread.sleep(1000); Destination dest = backEnd.getRegionBroker().getDestinationMap().get(topic); assertNotNull(dest); assertTrue(dest.getConsumers().isEmpty()); }
private void doTestCreateTopicPublisher(boolean useAnonymousProducers) throws JMSException { cf.setUseAnonymousProducers(useAnonymousProducers); JmsPoolConnection connection = (JmsPoolConnection) cf.createConnection(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic1 = session.createTopic("Topic-1"); Topic topic2 = session.createTopic("Topic-2"); JmsPoolTopicPublisher publisher1 = (JmsPoolTopicPublisher) session.createPublisher(topic1); JmsPoolTopicPublisher publisher2 = (JmsPoolTopicPublisher) session.createPublisher(topic2); if (useAnonymousProducers) { assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer()); } else { assertNotSame(publisher1.getMessageProducer(), publisher2.getMessageProducer()); } connection.close(); }
@Test public void testGetTopic() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createTopicConnection(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTemporaryTopic(); TopicSubscriber subscriber = session.createSubscriber(topic); assertNotNull(subscriber.getTopic()); assertSame(topic, subscriber.getTopic()); subscriber.close(); try { subscriber.getTopic(); fail("Cannot read topic on closed subscriber"); } catch (IllegalStateException ise) {} }
@Test public void testGetNoLocal() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createTopicConnection(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTemporaryTopic(); TopicSubscriber subscriber = session.createDurableSubscriber(topic, "name", "color = red", true); assertTrue(subscriber.getNoLocal()); subscriber.close(); try { subscriber.getNoLocal(); fail("Cannot read state on closed subscriber"); } catch (IllegalStateException ise) {} }
@Test public void testGetTopicSubscriber() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createTopicConnection(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTemporaryTopic(); JmsPoolTopicSubscriber subscriber = (JmsPoolTopicSubscriber) session.createDurableSubscriber(topic, "name", "color = red", true); assertNotNull(subscriber.getTopicSubscriber()); assertTrue(subscriber.getTopicSubscriber() instanceof MockJMSTopicSubscriber); subscriber.close(); try { subscriber.getTopicSubscriber(); fail("Cannot read state on closed subscriber"); } catch (IllegalStateException ise) {} }
@Test public void testGetTopic() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createTopicConnection(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTemporaryTopic(); TopicPublisher publisher = session.createPublisher(topic); assertNotNull(publisher.getTopic()); assertSame(topic, publisher.getTopic()); publisher.close(); try { publisher.getTopic(); fail("Cannot read topic on closed publisher"); } catch (IllegalStateException ise) {} }
@Test public void testGetTopicPublisher() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createTopicConnection(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTemporaryTopic(); JmsPoolTopicPublisher publisher = (JmsPoolTopicPublisher) session.createPublisher(topic); assertNotNull(publisher.getTopicPublisher()); assertTrue(publisher.getTopicPublisher() instanceof MockJMSTopicPublisher); publisher.close(); try { publisher.getTopicPublisher(); fail("Cannot read state on closed publisher"); } catch (IllegalStateException ise) {} }
@Test(timeout = 60000) public void testSenderAndPublisherDest() throws Exception { JmsPoolXAConnectionFactory pcf = new JmsPoolXAConnectionFactory(); pcf.setConnectionFactory(new ActiveMQXAConnectionFactory( "vm://test?broker.persistent=false&broker.useJmx=false")); QueueConnection connection = pcf.createQueueConnection(); QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); QueueSender sender = session.createSender(session.createQueue("AA")); assertNotNull(sender.getQueue().getQueueName()); connection.close(); TopicConnection topicConnection = pcf.createTopicConnection(); TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher topicPublisher = topicSession.createPublisher(topicSession.createTopic("AA")); assertNotNull(topicPublisher.getTopic().getTopicName()); topicConnection.close(); pcf.stop(); }
/** * @param topicConnection * @param chatTopic * @param userId * @throws JMSException * @throws IOException */ void publish(TopicConnection topicConnection, Topic chatTopic, String userId) throws JMSException, IOException { TopicSession tsession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher topicPublisher = tsession.createPublisher(chatTopic); topicConnection.start(); BufferedReader reader = new BufferedReader(new InputStreamReader( System.in)); while (true) { String msgToSend = reader.readLine(); if (msgToSend.equalsIgnoreCase("exit")) { topicConnection.close(); System.exit(0); } else { TextMessage msg = (TextMessage) tsession.createTextMessage(); msg.setText("\n["+userId + " : " + msgToSend+"]"); topicPublisher.publish(msg); } } }
public static ManagedSession create( Session session ) { if ( (session instanceof XAQueueSession) && (session instanceof XATopicSession)) return new ManagedXAQueueTopicSession(session); if (session instanceof XAQueueSession) return new ManagedXAQueueSession((XAQueueSession) session); if (session instanceof XATopicSession) return new ManagedXATopicSession((XATopicSession) session); if ( (session instanceof QueueSession) && (session instanceof TopicSession)) return new ManagedQueueTopicSession(session); if (session instanceof QueueSession) return new ManagedQueueSession((QueueSession) session); if (session instanceof TopicSession) return new ManagedTopicSession((TopicSession) session); return new ManagedSession(session); }
/** * Sends the given {@code events} to the configured JMS Topic. It takes the current Unit of Work * into account when available. Otherwise, it simply publishes directly. * * @param events the events to publish on the JMS Message Broker */ protected void send(List<? extends EventMessage<?>> events) { try (TopicConnection topicConnection = connectionFactory.createTopicConnection()) { int ackMode = isTransacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE; TopicSession topicSession = topicConnection.createTopicSession(isTransacted, ackMode); try (TopicPublisher publisher = topicSession.createPublisher(topic)) { for (EventMessage event : events) { Message jmsMessage = messageConverter.createJmsMessage(event, topicSession); doSendMessage(publisher, jmsMessage); } } finally { handleTransaction(topicSession); } } catch (JMSException ex) { throw new EventPublicationFailedException( "Unable to establish TopicConnection to JMS message broker.", ex); } }
@Before public void setUp() throws Exception { embeddedBroker.start(); out = new SimpleEventBus(); in = new SimpleEventBus(); ActiveMQConnectionFactory connectionFactory = embeddedBroker.createConnectionFactory(); topicConnection = connectionFactory.createTopicConnection(); topicConnection.start(); TopicSession topicSession = topicConnection.createTopicSession( true, Session.SESSION_TRANSACTED); TemporaryTopic topic = topicSession.createTemporaryTopic(); consumer = topicSession.createConsumer(topic); publisher = new JmsPublisher(out); publisher.setTopic(topic); publisher.setConnectionFactory(connectionFactory); publisher.postConstruct(); publisher.start(); jmsMessageSource = new JmsMessageSource(this.consumer, new DefaultJmsMessageConverter(new XStreamSerializer())); jmsMessageSource.subscribe(in::publish); }
@Test public void testSendMessage_NoUnitOfWork() throws Exception { TopicConnection connection = mock(TopicConnection.class); when(connectionFactory.createTopicConnection()).thenReturn(connection); TopicSession transactionalSession = mock(TopicSession.class); when(connection.createTopicSession(true, Session.SESSION_TRANSACTED)) .thenReturn(transactionalSession); when(transactionalSession.createPublisher(topic)).thenReturn(publisher); GenericEventMessage<String> message = new GenericEventMessage<>("Message"); TextMessage jmsMessage = mock(TextMessage.class); when(converter.createJmsMessage(message, transactionalSession)).thenReturn(jmsMessage); eventBus.publish(message); verify(publisher).publish(jmsMessage); verify(transactionalSession).commit(); verify(transactionalSession).close(); }
@Test public void testSendMessage_WithTransactionalUnitOfWork() throws Exception { GenericEventMessage<String> message = new GenericEventMessage<>("Message"); final UnitOfWork<?> uow = DefaultUnitOfWork.startAndGet(message); TopicConnection connection = mock(TopicConnection.class); when(connectionFactory.createTopicConnection()).thenReturn(connection); TopicSession transactionalSession = mock(TopicSession.class); when(connection.createTopicSession(true, Session.SESSION_TRANSACTED)) .thenReturn(transactionalSession); when(transactionalSession.createPublisher(topic)).thenReturn(publisher); when(transactionalSession.getTransacted()).thenReturn(true); TextMessage jmsMessage = mock(TextMessage.class); when(converter.createJmsMessage(message, transactionalSession)).thenReturn(jmsMessage); eventBus.publish(message); uow.commit(); verify(publisher).publish(jmsMessage); verify(transactionalSession).commit(); verify(transactionalSession).close(); }
@Test public void testSendMessage_WithUnitOfWorkRollback() throws Exception { GenericEventMessage<String> message = new GenericEventMessage<>("Message"); final UnitOfWork<?> uow = DefaultUnitOfWork.startAndGet(message); TopicConnection connection = mock(TopicConnection.class); when(connectionFactory.createTopicConnection()).thenReturn(connection); TopicSession transactionalSession = mock(TopicSession.class); when(connection.createTopicSession(true, Session.SESSION_TRANSACTED)) .thenReturn(transactionalSession); when(transactionalSession.createPublisher(topic)).thenReturn(publisher); when(transactionalSession.getTransacted()).thenReturn(true); TextMessage jmsMessage = mock(TextMessage.class); when(converter.createJmsMessage(message, transactionalSession)).thenReturn(jmsMessage); eventBus.publish(message); verify(transactionalSession, never()).rollback(); verify(transactionalSession, never()).commit(); verify(transactionalSession, never()).close(); uow.rollback(); verify(publisher, never()).publish(jmsMessage); verify(transactionalSession, never()).commit(); verify(connectionFactory, never()).createTopicConnection(); }
@Test public void testSendPersistentMessage() throws Exception { cut.setPersistent(true); cut.setMessageConverter(null); cut.postConstruct(); TopicConnection connection = mock(TopicConnection.class); when(connectionFactory.createTopicConnection()).thenReturn(connection); TopicSession transactionalSession = mock(TopicSession.class); when(connection.createTopicSession(true, Session.SESSION_TRANSACTED)) .thenReturn(transactionalSession); when(transactionalSession.createPublisher(topic)).thenReturn(publisher); TextMessage jmsMessage = mock(TextMessage.class); when(transactionalSession.createTextMessage(any())).thenReturn(jmsMessage); ArgumentCaptor<Message> jmsMsgCapture = ArgumentCaptor.forClass(Message.class); doNothing().when(publisher).publish(jmsMsgCapture.capture()); eventBus.publish(new GenericEventMessage<>("Message")); verify(jmsMessage).setJMSDeliveryMode(DeliveryMode.PERSISTENT); }
public DestinationWrapper<Topic> lookupTopic(String topic, TopicSession session) throws JMSException, NamingException { if (usingJNDI || session == null) { return lookupTopicFromJNDI(topic); } else if (usingMQ) { //if we are using MQ call the superclass MQ methods to create the // topic then we'll do anything MB specific.. //if we are using MQ call the superclass MQ methods to create the topic then we'll do anything MB specific.. DestinationWrapper<Topic> dw = super.lookupTopic(topic, session); configureWBIMBTopic((MQTopic)dw.destination); return dw; } //if we are here then we need to go create and configure the topic // ourselves as it must be for MC or IP return new DestinationWrapper<Topic>(topic, configureWBIMBTopic((MQTopic) session.createTopic(topic))); }
protected void createSubscriber(TopicSession topicSession, Topic topic) throws JMSException { // Create subscriber MessageConsumer subscriber = topicSession.createConsumer(topic); // Attach message listener to subscriber subscriber.setMessageListener(new MessageListener() { public void onMessage(javax.jms.Message message) { try { // Process the message processMessage(message); } catch (Exception ex) { // Error logger.error("Error", ex); } } }); }
public JMSSink(final String tcfBindingName, final String topicBindingName, final String username, final String password) { try { final Context ctx = new InitialContext(); final TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) lookup(ctx, tcfBindingName); final TopicConnection topicConnection = topicConnectionFactory.createTopicConnection(username, password); topicConnection.start(); final TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); final Topic topic = (Topic) ctx.lookup(topicBindingName); final TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic); topicSubscriber.setMessageListener(this); } catch (final Exception e) { logger.error("Could not read JMS message.", e); } }
protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException { if (inboundTopicBridges != null) { TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); for (InboundTopicBridge bridge : inboundTopicBridges) { String TopicName = bridge.getInboundTopicName(); Topic foreignTopic = createForeignTopic(outboundSession, TopicName); bridge.setConsumer(null); bridge.setConsumerTopic(foreignTopic); bridge.setConsumerConnection(connection); bridge.setJmsConnector(this); addInboundBridge(bridge); } outboundSession.close(); } }
protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException { if (inboundTopicBridges != null) { TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); for (InboundTopicBridge bridge : inboundTopicBridges) { String localTopicName = bridge.getLocalTopicName(); Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); bridge.setProducerTopic(activemqTopic); bridge.setProducerConnection(connection); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getInboundMessageConvertor()); } bridge.setJmsConnector(this); addInboundBridge(bridge); } localSession.close(); } }
protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException { if (outboundTopicBridges != null) { TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); for (OutboundTopicBridge bridge : outboundTopicBridges) { String topicName = bridge.getOutboundTopicName(); Topic foreignTopic = createForeignTopic(outboundSession, topicName); bridge.setProducerTopic(foreignTopic); bridge.setProducerConnection(connection); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } bridge.setJmsConnector(this); addOutboundBridge(bridge); } outboundSession.close(); } }
protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException { if (outboundTopicBridges != null) { TopicSession localSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); for (OutboundTopicBridge bridge : outboundTopicBridges) { String localTopicName = bridge.getLocalTopicName(); Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); bridge.setConsumer(null); bridge.setConsumerTopic(activemqTopic); bridge.setConsumerConnection(connection); bridge.setJmsConnector(this); addOutboundBridge(bridge); } localSession.close(); } }
/** * Create {@link MessageProducer} instance for the provided session. * * @param session JMS Session instance. * @return Message producer. * @throws JMSConnectorException Error when creating the JMS Message Producer. */ public MessageProducer createMessageProducer(Session session) throws JMSConnectorException { try { if (logger.isDebugEnabled()) { logger.debug("Creating a new JMS Message Producer on: " + this.connectionFactoryString); } if ((JMSConstants.JMS_SPEC_VERSION_1_1.equals(jmsSpec)) || (JMSConstants.JMS_SPEC_VERSION_2_0 .equals(jmsSpec))) { return session.createProducer(null); } else { if (JMSConstants.JMSDestinationType.QUEUE.equals(this.destinationType)) { return ((QueueSession) session).createSender(null); } else { return ((TopicSession) session).createPublisher(null); } } } catch (JMSException e) { throw new JMSConnectorException("JMS Exception while creating the producer for the destination ", e); } }
/** * Close a JMS {@link Session}. * @param session Session that needs to be closed. * @throws JMSException if an error occurs while closing the session. */ public void closeSession(Session session) throws JMSException { if (session != null) { if (logger.isDebugEnabled()) { logger.debug("Closing a JMS Session of: " + this.connectionFactoryString); } if ((JMSConstants.JMS_SPEC_VERSION_1_1.equals(jmsSpec)) || (JMSConstants.JMS_SPEC_VERSION_2_0 .equals(jmsSpec))) { session.close(); } else { if (JMSConstants.JMSDestinationType.QUEUE.equals(this.destinationType)) { ((QueueSession) session).close(); } else { ((TopicSession) session).close(); } } } }
/** * To publish the messages to a topic. * * @throws JMSException JMS Exception. * @throws InterruptedException Interrupted exception while waiting in between messages. */ public void publishMessagesToTopic(String topicName) throws JMSException, InterruptedException { TopicConnection topicConnection = (TopicConnection) connectionFactory.createConnection(); topicConnection.start(); TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = topicSession.createTopic(topicName); MessageProducer topicSender = topicSession.createProducer(destination); topicSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int index = 0; index < 10; index++) { String topicText = "Topic Message : " + (index + 1); TextMessage topicMessage = topicSession.createTextMessage(topicText); topicSender.send(topicMessage); logger.info("Publishing " + topicText + " to topic " + topicName); Thread.sleep(1000); } topicConnection.close(); topicSession.close(); topicSender.close(); }
@Test public void testNotificationProperties() throws Exception { try (TopicConnection topicConnection = factory.createTopicConnection()) { TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic notificationsTopic = topicSession.createTopic("activemq.notifications"); TopicSubscriber subscriber = topicSession.createSubscriber(notificationsTopic); List<Message> receivedMessages = new CopyOnWriteArrayList<>(); subscriber.setMessageListener(receivedMessages::add); topicConnection.start(); Wait.waitFor(() -> receivedMessages.size() > 0); Assert.assertTrue(receivedMessages.size() > 0); for (Message message : receivedMessages) { assertNotNull(message); assertNotNull(message.getStringProperty("_AMQ_NotifType")); } } }
@Test(timeout = 60000) public void testSendAndReceiveOnTopic() throws Exception { Connection connection = createConnection("myClientId"); try { TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(getTopicName()); TopicSubscriber consumer = session.createSubscriber(topic); TopicPublisher producer = session.createPublisher(topic); TextMessage message = session.createTextMessage("test-message"); producer.send(message); producer.close(); connection.start(); message = (TextMessage) consumer.receive(1000); assertNotNull(message); assertNotNull(message.getText()); assertEquals("test-message", message.getText()); } finally { connection.close(); } }
@Override protected void setUp() throws Exception { super.setUp(); context = createApplicationContext(); ActiveMQConnectionFactory fac = (ActiveMQConnectionFactory) context.getBean("localFactory"); localConnection = fac.createTopicConnection(); localConnection.start(); requestServerSession = localConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic theTopic = requestServerSession.createTopic(getClass().getName()); requestServerConsumer = requestServerSession.createConsumer(theTopic); requestServerConsumer.setMessageListener(this); requestServerProducer = requestServerSession.createProducer(null); fac = (ActiveMQConnectionFactory) context.getBean("remoteFactory"); remoteConnection = fac.createTopicConnection(); remoteConnection.start(); TopicSession session = remoteConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); requestor = new TopicRequestor(session, theTopic); }
public void testWithSessionAndSubsciberClose() throws Exception { TopicConnection connection = connectionFactory.createTopicConnection(); connection.start(); for (int i = 0; i < 100; i++) { TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber subscriber = subscriberSession.createSubscriber(topic); DummyMessageListener listener = new DummyMessageListener(); subscriber.setMessageListener(listener); subscriber.close(); subscriberSession.close(); } connection.close(); Thread.sleep(1000); Destination dest = backEnd.getRegionBroker().getDestinationMap().get(topic); assertNotNull(dest); assertTrue(dest.getConsumers().isEmpty()); }
public void testWithOneSubscriber() throws Exception { TopicConnection connection = connectionFactory.createTopicConnection(); connection.start(); TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber subscriber = subscriberSession.createSubscriber(topic); DummyMessageListener listener = new DummyMessageListener(); subscriber.setMessageListener(listener); subscriber.close(); subscriberSession.close(); connection.close(); Thread.sleep(1000); Destination dest = backEnd.getRegionBroker().getDestinationMap().get(topic); assertNotNull(dest); assertTrue(dest.getConsumers().isEmpty()); }
public void testWithoutSessionAndSubsciberClose() throws Exception { TopicConnection connection = connectionFactory.createTopicConnection(); connection.start(); for (int i = 0; i < 100; i++) { TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber subscriber = subscriberSession.createSubscriber(topic); assertNotNull(subscriber); } connection.close(); Thread.sleep(1000); Destination dest = backEnd.getRegionBroker().getDestinationMap().get(topic); assertNotNull(dest); assertTrue(dest.getConsumers().isEmpty()); }
/** * Running this test you can produce a leak of only 2 ConsumerInfo on BE * broker, NOT 200 as in other cases! */ public void testWithoutSessionAndSubsciberClosePlayAround() throws Exception { TopicConnection connection = connectionFactory.createTopicConnection(); connection.start(); for (int i = 0; i < 100; i++) { TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber subscriber = subscriberSession.createSubscriber(topic); DummyMessageListener listener = new DummyMessageListener(); subscriber.setMessageListener(listener); if (i != 50) { subscriber.close(); subscriberSession.close(); } } connection.close(); Thread.sleep(1000); Destination dest = backEnd.getRegionBroker().getDestinationMap().get(topic); assertNotNull(dest); assertTrue(dest.getConsumers().isEmpty()); }
private Message collectMessagesFromDurableSubscriptionForOneMinute() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName); TopicConnection connection = connectionFactory.createTopicConnection(); connection.setClientID(clientID); TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = topicSession.createTopic(topicName); connection.start(); TopicSubscriber subscriber = topicSession.createDurableSubscriber(topic, durableSubName); LOG.info("About to receive messages"); Message message = subscriber.receive(120000); subscriber.close(); connection.close(); LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done"); return message; }
@Test public void testGetNoLocalOnClosedConsumer() throws Exception { Connection consumerConnection = null; try { consumerConnection = createConnection(); TopicConnection tc = (TopicConnection) consumerConnection; TopicSession consumerSession = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber topicConsumer = consumerSession.createSubscriber(ActiveMQServerTestCase.topic1); topicConsumer.close(); try { topicConsumer.getNoLocal(); Assert.fail("must throw a JMS IllegalStateException"); } catch (javax.jms.IllegalStateException e) { // OK } } finally { if (consumerConnection != null) { consumerConnection.close(); } } }
/** * Topics shouldn't hold on to messages if there are no subscribers */ @Test public void testPersistentMessagesForTopicDropped() throws Exception { TopicConnection topicConn = createTopicConnection(); TopicSession sess = topicConn.createTopicSession(true, 0); TopicPublisher pub = sess.createPublisher(ActiveMQServerTestCase.topic1); pub.setDeliveryMode(DeliveryMode.PERSISTENT); Message m = sess.createTextMessage("testing123"); pub.publish(m); sess.commit(); topicConn.close(); checkEmpty(ActiveMQServerTestCase.topic1); }
/** * Topics shouldn't hold on to messages when the non-durable subscribers close */ @Test public void testPersistentMessagesForTopicDropped2() throws Exception { TopicConnection topicConn = createTopicConnection(); topicConn.start(); TopicSession sess = topicConn.createTopicSession(true, 0); TopicPublisher pub = sess.createPublisher(ActiveMQServerTestCase.topic1); TopicSubscriber sub = sess.createSubscriber(ActiveMQServerTestCase.topic1); pub.setDeliveryMode(DeliveryMode.PERSISTENT); Message m = sess.createTextMessage("testing123"); pub.publish(m); sess.commit(); // receive but rollback TextMessage m2 = (TextMessage) sub.receive(3000); ProxyAssertSupport.assertNotNull(m2); ProxyAssertSupport.assertEquals("testing123", m2.getText()); sess.rollback(); topicConn.close(); checkEmpty(ActiveMQServerTestCase.topic1); }
/** * Create a topic subscriber * * @param topic The topic * @return The subscriber * @throws JMSException Thrown if an error occurs */ @Override public TopicSubscriber createSubscriber(final Topic topic) throws JMSException { lock(); try { TopicSession session = getTopicSessionInternal(); if (ActiveMQRASession.trace) { ActiveMQRALogger.LOGGER.trace("createSubscriber " + session + " topic=" + topic); } TopicSubscriber result = session.createSubscriber(topic); result = new ActiveMQRATopicSubscriber(result, this); if (ActiveMQRASession.trace) { ActiveMQRALogger.LOGGER.trace("createdSubscriber " + session + " ActiveMQTopicSubscriber=" + result); } addConsumer(result); return result; } finally { unlock(); } }
/** * Create a topic publisher * * @param topic The topic * @return The publisher * @throws JMSException Thrown if an error occurs */ @Override public TopicPublisher createPublisher(final Topic topic) throws JMSException { lock(); try { TopicSession session = getTopicSessionInternal(); if (ActiveMQRASession.trace) { ActiveMQRALogger.LOGGER.trace("createPublisher " + session + " topic=" + topic); } TopicPublisher result = session.createPublisher(topic); result = new ActiveMQRATopicPublisher(result, this); if (ActiveMQRASession.trace) { ActiveMQRALogger.LOGGER.trace("createdPublisher " + session + " publisher=" + result); } addProducer(result); return result; } finally { unlock(); } }
/** * Get the topic session * * @return The topic session * @throws JMSException Thrown if an error occurs */ @Override public TopicSession getTopicSession() throws JMSException { if (ActiveMQRASession.trace) { ActiveMQRALogger.LOGGER.trace("getTopicSession()"); } if (cri.getType() == ActiveMQRAConnectionFactory.CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION) { throw new IllegalStateException("Non XA connection"); } lock(); try { return this; } finally { unlock(); } }