/**
 * Checks that a publisher returns the topic it was created with and that
 * asking a closed publisher for its topic raises an IllegalStateException.
 */
@Test
public void testGetTopic() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic temporaryTopic = topicSession.createTemporaryTopic();
    TopicPublisher topicPublisher = topicSession.createPublisher(temporaryTopic);

    assertNotNull(topicPublisher.getTopic());
    assertSame(temporaryTopic, topicPublisher.getTopic());

    topicPublisher.close();

    try {
        topicPublisher.getTopic();
        fail("Cannot read topic on closed publisher");
    } catch (IllegalStateException expected) {
        // expected: the publisher has been closed
    }
}
/**
 * Creates a queue sender and a topic publisher through a pooled XA
 * connection factory and checks that each exposes a destination name.
 */
@Test(timeout = 60000)
public void testSenderAndPublisherDest() throws Exception {
    JmsPoolXAConnectionFactory xaPool = new JmsPoolXAConnectionFactory();
    xaPool.setConnectionFactory(
        new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false&broker.useJmx=false"));

    // Point-to-point half: the sender must report its queue name.
    QueueConnection queueConnection = xaPool.createQueueConnection();
    QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    QueueSender queueSender = queueSession.createSender(queueSession.createQueue("AA"));
    assertNotNull(queueSender.getQueue().getQueueName());
    queueConnection.close();

    // Pub/sub half: the publisher must report its topic name.
    TopicConnection pubSubConnection = xaPool.createTopicConnection();
    TopicSession pubSubSession = pubSubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher publisher = pubSubSession.createPublisher(pubSubSession.createTopic("AA"));
    assertNotNull(publisher.getTopic().getTopicName());
    pubSubConnection.close();

    xaPool.stop();
}
/**
 * Reads lines from standard input and publishes each one as a text message
 * on the given topic, prefixed with the user id.
 * <p>
 * The loop ends when the user types {@code exit} (case-insensitive) or when
 * standard input reaches end of stream; in both cases the connection is
 * closed and the JVM is terminated with {@code System.exit(0)}.
 *
 * @param topicConnection connection used to create the publishing session; it is started here
 * @param chatTopic       topic the messages are published to
 * @param userId          identifier placed in front of every published line
 * @throws JMSException if creating the session/publisher or publishing fails
 * @throws IOException  if reading from standard input fails
 */
void publish(TopicConnection topicConnection, Topic chatTopic, String userId)
        throws JMSException, IOException {
    TopicSession tsession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher topicPublisher = tsession.createPublisher(chatTopic);
    topicConnection.start();

    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
    while (true) {
        String msgToSend = reader.readLine();
        // readLine() returns null at end of stream (e.g. piped input or Ctrl-D);
        // treat that like an explicit "exit" instead of failing with a NullPointerException.
        if (msgToSend == null || msgToSend.equalsIgnoreCase("exit")) {
            topicConnection.close();
            System.exit(0);
        } else {
            TextMessage msg = tsession.createTextMessage();
            msg.setText("\n[" + userId + " : " + msgToSend + "]");
            topicPublisher.publish(msg);
        }
    }
}
/**
 * Publishes the given {@code events} on the configured JMS topic.
 * <p>
 * A topic connection is opened for the call, every event is converted with
 * the message converter and handed to {@code doSendMessage}, and
 * {@code handleTransaction} is invoked on the session once the publisher has
 * been closed, whether or not sending succeeded.
 *
 * @param events the events to publish on the JMS Message Broker
 * @throws EventPublicationFailedException if a {@link JMSException} occurs
 */
protected void send(List<? extends EventMessage<?>> events) {
    try (TopicConnection topicConnection = connectionFactory.createTopicConnection()) {
        final int acknowledgeMode;
        if (isTransacted) {
            acknowledgeMode = Session.SESSION_TRANSACTED;
        } else {
            acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        }
        TopicSession topicSession = topicConnection.createTopicSession(isTransacted, acknowledgeMode);

        try (TopicPublisher topicPublisher = topicSession.createPublisher(topic)) {
            for (EventMessage<?> eventMessage : events) {
                Message converted = messageConverter.createJmsMessage(eventMessage, topicSession);
                doSendMessage(topicPublisher, converted);
            }
        } finally {
            handleTransaction(topicSession);
        }
    } catch (JMSException ex) {
        throw new EventPublicationFailedException(
            "Unable to establish TopicConnection to JMS message broker.", ex);
    }
}
/**
 * Builds the publisher under test on a fresh event bus, wires in mocked JMS
 * collaborators, and starts it.
 */
@Before
public void setUp() throws Exception {
    eventBus = new SimpleEventBus();
    cut = new JmsPublisher(eventBus);

    // Mocked JMS collaborators.
    connectionFactory = mock(TopicConnectionFactory.class);
    publisher = mock(TopicPublisher.class);
    topic = mock(Topic.class);
    converter = mock(JmsMessageConverter.class);

    // Configure the publisher under test.
    cut.setConnectionFactory(connectionFactory);
    cut.setTopic(topic);
    cut.setTransacted(true);
    cut.setMessageConverter(converter);
    cut.setPersistent(false);

    // Lifecycle.
    cut.postConstruct();
    cut.start();
}
/**
 * Closes the given JMS {@link MessageProducer}; a {@code null} producer is ignored.
 * <p>
 * For the 1.1 and 2.0 spec versions the producer is closed directly. For any
 * other spec version it is first cast to {@link QueueSender} or
 * {@link TopicPublisher}, depending on the configured destination type.
 *
 * @param messageProducer JMS Message Producer that needs to be closed.
 * @throws JMSException if an error occurs while closing the producer.
 */
public void closeProducer(MessageProducer messageProducer) throws JMSException {
    if (messageProducer == null) {
        return;
    }

    if (logger.isDebugEnabled()) {
        logger.debug("Closing a JMS Message Producer of: " + this.connectionFactoryString);
    }

    boolean unifiedApi = JMSConstants.JMS_SPEC_VERSION_1_1.equals(jmsSpec)
            || JMSConstants.JMS_SPEC_VERSION_2_0.equals(jmsSpec);
    if (unifiedApi) {
        messageProducer.close();
        return;
    }

    // Domain-specific API: close through the domain-specific type.
    if (JMSConstants.JMSDestinationType.QUEUE.equals(this.destinationType)) {
        ((QueueSender) messageProducer).close();
    } else {
        ((TopicPublisher) messageProducer).close();
    }
}
/**
 * Sends one text message to a topic that already has a subscriber and
 * checks that the subscriber receives the same text.
 */
@Test(timeout = 60000)
public void testSendAndReceiveOnTopic() throws Exception {
    Connection connection = createConnection("myClientId");

    try {
        TopicSession topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destination = topicSession.createTopic(getTopicName());

        // Subscribe before publishing so the message has somewhere to go.
        TopicSubscriber subscriber = topicSession.createSubscriber(destination);
        TopicPublisher topicPublisher = topicSession.createPublisher(destination);

        TextMessage message = topicSession.createTextMessage("test-message");
        topicPublisher.send(message);
        topicPublisher.close();

        connection.start();

        message = (TextMessage) subscriber.receive(1000);
        assertNotNull(message);
        assertNotNull(message.getText());
        assertEquals("test-message", message.getText());
    } finally {
        connection.close();
    }
}
/**
 * A persistent message published to a topic with no subscribers must not be
 * retained: after commit and close, {@code checkEmpty} is run on the topic.
 */
@Test
public void testPersistentMessagesForTopicDropped() throws Exception {
    TopicConnection connection = createTopicConnection();
    TopicSession transactedSession = connection.createTopicSession(true, 0);

    TopicPublisher publisher = transactedSession.createPublisher(ActiveMQServerTestCase.topic1);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

    Message payload = transactedSession.createTextMessage("testing123");
    publisher.publish(payload);
    transactedSession.commit();

    connection.close();

    checkEmpty(ActiveMQServerTestCase.topic1);
}
/** * Topics shouldn't hold on to messages when the non-durable subscribers close */ @Test public void testPersistentMessagesForTopicDropped2() throws Exception { TopicConnection topicConn = createTopicConnection(); topicConn.start(); TopicSession sess = topicConn.createTopicSession(true, 0); TopicPublisher pub = sess.createPublisher(ActiveMQServerTestCase.topic1); TopicSubscriber sub = sess.createSubscriber(ActiveMQServerTestCase.topic1); pub.setDeliveryMode(DeliveryMode.PERSISTENT); Message m = sess.createTextMessage("testing123"); pub.publish(m); sess.commit(); // receive but rollback TextMessage m2 = (TextMessage) sub.receive(3000); ProxyAssertSupport.assertNotNull(m2); ProxyAssertSupport.assertEquals("testing123", m2.getText()); sess.rollback(); topicConn.close(); checkEmpty(ActiveMQServerTestCase.topic1); }
/**
 * Publishes a message through the wrapped topic publisher.
 * <p>
 * The owning session is locked for the duration of the call, and
 * {@code checkState()} runs before the message is handed to the delegate.
 *
 * @param message The message
 * @throws JMSException Thrown if an error occurs
 */
@Override
public void publish(final Message message) throws JMSException {
    session.lock();
    try {
        if (ActiveMQRATopicPublisher.trace) {
            ActiveMQRALogger.LOGGER.trace("send " + this + " message=" + message);
        }

        checkState();

        final TopicPublisher delegate = (TopicPublisher) producer;
        delegate.publish(message);

        if (ActiveMQRATopicPublisher.trace) {
            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
        }
    } finally {
        session.unlock();
    }
}
/**
 * Publishes a message to an explicit topic through the wrapped topic publisher.
 * <p>
 * The owning session is locked for the duration of the call, and
 * {@code checkState()} runs before the message is handed to the delegate.
 *
 * @param destination The destination
 * @param message     The message
 * @throws JMSException Thrown if an error occurs
 */
@Override
public void publish(final Topic destination, final Message message) throws JMSException {
    session.lock();
    try {
        if (ActiveMQRATopicPublisher.trace) {
            ActiveMQRALogger.LOGGER.trace("send " + this + " destination=" + destination + " message=" + message);
        }

        checkState();

        final TopicPublisher delegate = (TopicPublisher) producer;
        delegate.publish(destination, message);

        if (ActiveMQRATopicPublisher.trace) {
            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
        }
    } finally {
        session.unlock();
    }
}
/**
 * Creates a topic publisher on the underlying topic session, wraps it in an
 * {@link ActiveMQRATopicPublisher}, and registers the wrapper via
 * {@code addProducer}. The whole operation runs under this session's lock.
 *
 * @param topic The topic
 * @return The publisher
 * @throws JMSException Thrown if an error occurs
 */
@Override
public TopicPublisher createPublisher(final Topic topic) throws JMSException {
    lock();
    try {
        TopicSession delegateSession = getTopicSessionInternal();

        if (ActiveMQRASession.trace) {
            ActiveMQRALogger.LOGGER.trace("createPublisher " + delegateSession + " topic=" + topic);
        }

        TopicPublisher wrapped =
            new ActiveMQRATopicPublisher(delegateSession.createPublisher(topic), this);

        if (ActiveMQRASession.trace) {
            ActiveMQRALogger.LOGGER.trace("createdPublisher " + delegateSession + " publisher=" + wrapped);
        }

        addProducer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
/**
 * Sends a response using the JMS 1.0.2 domain-specific API.
 * <p>If the destination is a {@link Topic}, a {@link TopicPublisher} is
 * created and the response is published; otherwise a {@link QueueSender} is
 * created and the response is sent. The producer is passed through
 * {@code postProcessProducer} first and is always handed to
 * {@code JmsUtils.closeMessageProducer} afterwards.
 */
protected void sendResponse(Session session, Destination destination, Message response) throws JMSException {
    MessageProducer producer = null;
    try {
        if (destination instanceof Topic) {
            TopicPublisher topicPublisher = ((TopicSession) session).createPublisher((Topic) destination);
            producer = topicPublisher;
            postProcessProducer(producer, response);
            topicPublisher.publish(response);
        } else {
            QueueSender queueSender = ((QueueSession) session).createSender((Queue) destination);
            producer = queueSender;
            postProcessProducer(producer, response);
            queueSender.send(response);
        }
    } finally {
        JmsUtils.closeMessageProducer(producer);
    }
}
/**
 * Sends the message through the JMS 1.0.2 domain-specific API.
 * <p>In the pub-sub domain the producer is used as a {@link TopicPublisher},
 * otherwise as a {@link QueueSender}. When explicit QoS is enabled, the
 * configured delivery mode, priority and time-to-live are passed along.
 */
protected void doSend(MessageProducer producer, Message message) throws JMSException {
    if (isPubSubDomain()) {
        TopicPublisher topicPublisher = (TopicPublisher) producer;
        if (isExplicitQosEnabled()) {
            topicPublisher.publish(message, getDeliveryMode(), getPriority(), getTimeToLive());
        } else {
            topicPublisher.publish(message);
        }
        return;
    }

    QueueSender queueSender = (QueueSender) producer;
    if (isExplicitQosEnabled()) {
        queueSender.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
    } else {
        queueSender.send(message);
    }
}
/**
 * Runs {@code execute(ProducerCallback)} against a pub-sub template and
 * verifies that the publisher, session and connection are all closed afterwards.
 */
@Test
public void testTopicProducerCallback() throws Exception {
    JmsTemplate102 jmsTemplate = createTemplate();
    jmsTemplate.setPubSubDomain(true);
    jmsTemplate.setConnectionFactory(topicConnectionFactory);
    jmsTemplate.afterPropertiesSet();

    TopicPublisher mockPublisher = mock(TopicPublisher.class);
    given(topicSession.createPublisher(null)).willReturn(mockPublisher);
    given(mockPublisher.getPriority()).willReturn(4);

    jmsTemplate.execute(new ProducerCallback() {
        @Override
        public Object doInJms(Session session, MessageProducer producer) throws JMSException {
            // Touch both arguments so the callback actually uses them.
            session.getTransacted();
            producer.getPriority();
            return null;
        }
    });

    verify(mockPublisher).close();
    verify(topicSession).close();
    verify(topicConnection).close();
}
/**
 * Creates a {@link LocalTopicPublisher} for the given topic and registers it
 * as a producer. Runs under the external-access read lock and only after
 * {@code checkNotClosed()} has passed.
 *
 * @param topic the topic the publisher is created for
 * @return the newly registered publisher
 * @throws JMSException if thrown by any of the calls made here
 */
@Override
public TopicPublisher createPublisher(Topic topic) throws JMSException {
    externalAccessLock.readLock().lock();
    try {
        checkNotClosed();

        LocalTopicPublisher created = new LocalTopicPublisher(this, topic, idProvider.createID());
        registerProducer(created);
        return created;
    } finally {
        externalAccessLock.readLock().unlock();
    }
}
/**
 * Creates a {@link RemoteTopicPublisher} for the given topic (converted with
 * {@code DestinationTools.asRef}) and registers it as a producer. Runs under
 * the external-access read lock and only after {@code checkNotClosed()} has passed.
 *
 * @param topic the topic the publisher is created for
 * @return the newly registered publisher
 * @throws JMSException if thrown by any of the calls made here
 */
@Override
public TopicPublisher createPublisher(Topic topic) throws JMSException {
    externalAccessLock.readLock().lock();
    try {
        checkNotClosed();

        RemoteTopicPublisher created =
            new RemoteTopicPublisher(this, DestinationTools.asRef(topic), idProvider.createID());
        registerProducer(created);
        return created;
    } finally {
        externalAccessLock.readLock().unlock();
    }
}
public void testAllMethodsThrowAfterConnectionClose() throws Exception { // give external brokers a chance to start up Thread.sleep(3000); AMQConnection connection = (AMQConnection) getConnection("guest", "guest"); Topic destination1 = new AMQTopic(connection, "t1"); TopicSession session1 = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher pub = session1.createPublisher(destination1); connection.close(); try { pub.getDeliveryMode(); fail("Expected exception not thrown"); } catch (javax.jms.IllegalStateException e) { // PASS } }
public void testUnidentifiedProducer() throws Exception { AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con,"MyTopic"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(null); MessageConsumer consumer1 = session1.createConsumer(topic); con.start(); publisher.publish(topic, session1.createTextMessage("Hello")); TextMessage m = (TextMessage) consumer1.receive(2000); assertNotNull(m); try { publisher.publish(session1.createTextMessage("Goodbye")); fail("Did not throw UnsupportedOperationException"); } catch (UnsupportedOperationException e) { // PASS } con.close(); }
/**
 * Sends the same {@link TextMessage} instance twice on a transacted session
 * and checks that each send is received with the original text.
 */
public void testSendingSameMessage() throws Exception {
    AMQConnection amqConnection = (AMQConnection) getConnection("guest", "guest");
    TopicSession transactedSession = amqConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);

    TemporaryTopic temporaryTopic = transactedSession.createTemporaryTopic();
    assertNotNull(temporaryTopic);

    TopicPublisher publisher = transactedSession.createPublisher(temporaryTopic);
    MessageConsumer subscriber = transactedSession.createConsumer(temporaryTopic);
    amqConnection.start();

    TextMessage sentMessage = transactedSession.createTextMessage("Test Message");

    // First send.
    publisher.send(sentMessage);
    transactedSession.commit();
    TextMessage receivedMessage = (TextMessage) subscriber.receive(2000);
    assertNotNull(receivedMessage);
    assertEquals(sentMessage.getText(), receivedMessage.getText());

    // Second send of the very same message object.
    publisher.send(sentMessage);
    transactedSession.commit();
    receivedMessage = (TextMessage) subscriber.receive(2000);
    assertNotNull(receivedMessage);
    assertEquals(sentMessage.getText(), receivedMessage.getText());
    transactedSession.commit();

    amqConnection.close();
}
/**
 * A publisher bound to a topic must reject {@code publish(Topic, Message)}:
 * with {@link UnsupportedOperationException} for a different topic and with
 * {@link InvalidDestinationException} for a {@code null} topic.
 */
@Test(timeout = 10000)
public void testPublishMessageOnProvidedTopicWhenNotAnonymous() throws Exception {
    Topic boundTopic = session.createTopic(getTestName());
    TopicPublisher boundPublisher = session.createPublisher(boundTopic);
    Message payload = session.createMessage();

    try {
        boundPublisher.publish(session.createTopic(getTestName() + "1"), payload);
        fail("Should throw UnsupportedOperationException");
    } catch (UnsupportedOperationException expected) {
        // expected
    }

    try {
        boundPublisher.publish((Topic) null, payload);
        fail("Should throw InvalidDestinationException");
    } catch (InvalidDestinationException expected) {
        // expected
    }
}
/**
 * A publisher bound to a topic must reject the five-argument
 * {@code publish(Topic, Message, int, int, long)}: with
 * {@link UnsupportedOperationException} for a different topic and with
 * {@link InvalidDestinationException} for a {@code null} topic.
 */
@Test(timeout = 10000)
public void testPublishMessageWithOptionsOnProvidedTopicWhenNotAnonymous() throws Exception {
    Topic boundTopic = session.createTopic(getTestName());
    TopicPublisher boundPublisher = session.createPublisher(boundTopic);
    Message payload = session.createMessage();

    try {
        boundPublisher.publish(
            session.createTopic(getTestName() + "1"),
            payload,
            Message.DEFAULT_DELIVERY_MODE,
            Message.DEFAULT_PRIORITY,
            Message.DEFAULT_TIME_TO_LIVE);
        fail("Should throw UnsupportedOperationException");
    } catch (UnsupportedOperationException expected) {
        // expected
    }

    try {
        boundPublisher.publish(
            (Topic) null,
            payload,
            Message.DEFAULT_DELIVERY_MODE,
            Message.DEFAULT_PRIORITY,
            Message.DEFAULT_TIME_TO_LIVE);
        fail("Should throw InvalidDestinationException");
    } catch (InvalidDestinationException expected) {
        // expected
    }
}
/**
 * Creates a topic publisher over an AMQP connection and checks that the
 * topic's enqueue count is still zero afterwards.
 */
@Test
public void testCreateTopicPublisher() throws Exception {
    JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());

    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    assertNotNull(topicConnection);

    TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(topicSession);

    Topic destination = topicSession.createTopic(name.getMethodName());
    TopicPublisher topicPublisher = topicSession.createPublisher(destination);
    assertNotNull(topicPublisher);

    TopicViewMBean topicView = getProxyToTopic(name.getMethodName());
    assertEquals(0, topicView.getEnqueueCount());

    topicConnection.close();
}
/**
 * Publishes a single persistent text message to the named topic.
 * <p>
 * A connection is opened to the broker at {@code uri}, the message is
 * published on a transacted session, the session is committed, and the
 * publisher, session and connection are closed quietly in all cases.
 *
 * @param uri
 *            broker URI, e.g.: tcp://3CNL12096:61616
 * @param topicName
 *            name of the topic to publish to
 * @param text
 *            body of the text message
 * @throws JMSException
 *             if connecting, publishing or committing fails
 */
public static void publishTextMsg2Topic(String uri, String topicName, String text) throws JMSException {
    TopicConnection connection = null;
    TopicSession session = null;
    TopicPublisher tp = null;

    try {
        TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uri);
        connection = connectionFactory.createTopicConnection();
        connection.start();

        session = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);

        tp = session.createPublisher(session.createTopic(topicName));
        tp.setDeliveryMode(DeliveryMode.PERSISTENT);
        tp.publish(session.createTextMessage(text));

        session.commit();
    } finally {
        closeQuietly(tp);
        closeQuietly(session);
        closeQuietly(connection);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Looks up the connection factory and topic through the service locator and
 * sends a text message with the body {@code "enqueue"}. A {@link JMSException}
 * is logged rather than propagated; publisher, session and connection are
 * always passed to {@code close}.
 */
public void sendEnqueueMessage() {
    final ServiceLocator serviceLocator = ServiceLocatorFactory.getLocator();
    final TopicConnectionFactory factory =
        (TopicConnectionFactory) serviceLocator.lookup(DEFAULT_QUEUE_CONN_FACTORY);
    final Topic topic = (Topic) serviceLocator.lookup(FileManagementMDB.QUEUE_JNDI_NAME);

    TopicConnection connection = null;
    TopicSession session = null;
    TopicPublisher publisher = null;
    try {
        connection = factory.createTopicConnection();
        // NOTE(review): the session is created as transacted but no commit() is
        // visible here; presumably an enclosing container transaction governs
        // delivery - confirm.
        session = connection.createTopicSession(true, 0);
        publisher = session.createPublisher(topic);

        final Message enqueueMessage = session.createTextMessage("enqueue");
        publisher.send(enqueueMessage);
    } catch (JMSException e) {
        LOG.error("Couldn't submit job to JMS", e);
    } finally {
        close(publisher);
        close(session);
        close(connection);
    }
}
/**
 * Creates a topic publisher on the underlying topic session, wraps it in a
 * {@link JmsTopicPublisher}, and registers the wrapper via {@code addProducer}.
 * The whole operation runs under this session's lock.
 *
 * @param topic the topic to create the publisher for
 * @return the wrapped publisher
 * @throws JMSException if thrown by any of the calls made here
 */
public TopicPublisher createPublisher(Topic topic) throws JMSException {
    lock();
    try {
        TopicSession delegateSession = getTopicSession();

        if (trace) {
            log.trace("createPublisher " + delegateSession + " topic=" + topic);
        }

        TopicPublisher wrapped = new JmsTopicPublisher(delegateSession.createPublisher(topic), this);

        if (trace) {
            log.trace("createdPublisher " + delegateSession + " publisher=" + wrapped);
        }

        addProducer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
/**
 * Returns the publisher to use for the given destination.
 * <p>
 * With anonymous producers enabled, the session holder's shared publisher is
 * returned and {@code destination} is not used; otherwise a new publisher is
 * created on the internal session for that destination.
 *
 * @param destination topic for a dedicated publisher
 * @return the publisher to send with
 * @throws JMSException if obtaining or creating the publisher fails
 */
private TopicPublisher getTopicPublisher(Topic destination) throws JMSException {
    if (useAnonymousProducers) {
        return safeGetSessionHolder().getOrCreatePublisher();
    }

    TopicSession topicSession = (TopicSession) getInternalSession();
    return topicSession.createPublisher(destination);
}
/**
 * Returns the cached publisher, creating it on first use.
 * <p>
 * The publisher is created with a {@code null} topic on the session (cast to
 * {@link TopicSession}). Creation uses double-checked locking on {@code this}:
 * the field is tested once without the lock and again while holding it.
 * <p>
 * NOTE(review): double-checked locking is only safe if the {@code publisher}
 * field is declared {@code volatile}; the declaration is not visible here - confirm.
 *
 * @return the cached or newly created publisher
 * @throws JMSException if creating the publisher fails
 */
public TopicPublisher getOrCreatePublisher() throws JMSException { if (publisher == null) { synchronized (this) { if (publisher == null) { publisher = ((TopicSession) session).createPublisher(null); } } } return publisher; }
/**
 * Checks that {@code toString()} on a pooled topic publisher returns a non-null value.
 */
@Test
public void testToString() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic temporaryTopic = topicSession.createTemporaryTopic();
    TopicPublisher topicPublisher = topicSession.createPublisher(temporaryTopic);

    assertNotNull(topicPublisher.toString());
}
/**
 * A publisher bound to one topic must raise
 * {@link UnsupportedOperationException} when asked to publish to another.
 */
@Test
public void testPublishToTopicFailsIfNotAnonymousPublisher() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic boundTopic = topicSession.createTemporaryTopic();
    TopicPublisher boundPublisher = topicSession.createPublisher(boundTopic);

    try {
        boundPublisher.publish(topicSession.createTemporaryTopic(), topicSession.createTextMessage());
        fail("Should not be able to send to alternate destination");
    } catch (UnsupportedOperationException expected) {
        // expected
    }
}
/**
 * Publishes one message to a topic through a connection obtained from a
 * {@link JmsPoolConnectionFactory}. The factory and connection are stored in
 * the {@code pcf} and {@code connection} fields.
 */
@Test(timeout = 60000)
public void testJmsPoolConnectionFactory() throws Exception {
    ActiveMQTopic destination = new ActiveMQTopic("test");

    pcf = new JmsPoolConnectionFactory();
    pcf.setConnectionFactory(
        new ActiveMQConnectionFactory("vm://test?broker.persistent=false&broker.useJmx=false"));

    connection = (TopicConnection) pcf.createConnection();
    TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher topicPublisher = topicSession.createPublisher(destination);

    topicPublisher.publish(topicSession.createMessage());
}
@Parameters({ "broker-port"}) @Test public void testSubscriberPublisher(String port) throws Exception { String topicName = "MyTopic1"; int numberOfMessages = 100; InitialContext initialContext = ClientHelper .getInitialContextBuilder("admin", "admin", "localhost", port) .withTopic(topicName) .build(); TopicConnectionFactory connectionFactory = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY); TopicConnection connection = connectionFactory.createTopicConnection(); connection.start(); // Initialize subscriber TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic subscriberDestination = (Topic) initialContext.lookup(topicName); TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination); // publish 100 messages TopicSession producerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher producer = producerSession.createPublisher(subscriberDestination); for (int i = 0; i < numberOfMessages; i++) { producer.publish(producerSession.createTextMessage("Test message " + i)); } producerSession.close(); for (int i = 0; i < numberOfMessages; i++) { Message message = subscriber.receive(1000); Assert.assertNotNull(message, "Message #" + i + " was not received"); } connection.close(); }
/**
 * Returns the producer to hand out to callers.
 * <p>When {@code completionListenerClass} is non-null, a dynamic proxy
 * implementing {@link MessageProducer}, {@link QueueSender} and
 * {@link TopicPublisher} is built, backed by a
 * {@code Jms2MessageProducerInvocationHandler}. Otherwise this instance
 * itself is returned.
 */
public MessageProducer getProxyIfNecessary() {
    if (completionListenerClass == null) {
        return this;
    }

    Class<?>[] proxiedInterfaces = new Class<?>[] {MessageProducer.class, QueueSender.class, TopicPublisher.class};
    return (MessageProducer) Proxy.newProxyInstance(
            CachedMessageProducer.class.getClassLoader(),
            proxiedInterfaces,
            new Jms2MessageProducerInvocationHandler());
}
/**
 * Closes the publisher, then the session, then the connection, skipping any
 * that are {@code null}.
 * <p>
 * Every non-null resource is attempted even if an earlier close fails with a
 * {@link JMSException}, so one failure cannot leak the remaining resources.
 * The first failure is rethrown once all closes have been tried; later
 * failures are attached to it as suppressed exceptions.
 *
 * @param pConn      connection to close, may be {@code null}
 * @param pSession   session to close, may be {@code null}
 * @param pPublisher publisher to close, may be {@code null}
 * @throws JMSException the first failure raised while closing
 */
private static void closeResources(TopicConnection pConn, TopicSession pSession, TopicPublisher pPublisher)
        throws JMSException {
    JMSException firstFailure = null;

    if (pPublisher != null) {
        try {
            pPublisher.close();
        } catch (JMSException e) {
            firstFailure = e;
        }
    }

    if (pSession != null) {
        try {
            pSession.close();
        } catch (JMSException e) {
            if (firstFailure == null) {
                firstFailure = e;
            } else {
                firstFailure.addSuppressed(e);
            }
        }
    }

    if (pConn != null) {
        try {
            pConn.close();
        } catch (JMSException e) {
            if (firstFailure == null) {
                firstFailure = e;
            } else {
                firstFailure.addSuppressed(e);
            }
        }
    }

    if (firstFailure != null) {
        throw firstFailure;
    }
}
/**
 * Sends the JMS message to the destination with the message's own delivery
 * mode and priority.
 * <p>
 * The producer is used as-is when the configured destination type is QUEUE
 * or the spec version is not 1.0; otherwise it is cast to
 * {@link TopicPublisher} before sending.
 * <p>
 * NOTE(review): the last argument of {@code send} is a time-to-live, but
 * {@code getJMSExpiration()} is passed; for a non-zero expiration these are
 * presumably not the same quantity - confirm against callers.
 *
 * @param destination JMS Queue/Topic.
 * @param message     JMS Message.
 * @param producer    JMS Message Producer.
 * @throws JMSException Thrown when sending the message.
 */
private void sendJMSMessage(Destination destination, Message message, MessageProducer producer)
        throws JMSException {
    boolean useProducerDirectly =
            JMSConstants.JMSDestinationType.QUEUE.equals(jmsConnectionFactory.getDestinationType())
            || !JMSConstants.JMS_SPEC_VERSION_1_0.equals(jmsConnectionFactory.getJmsSpec());

    // The cast (when needed) happens before the message headers are read,
    // exactly as in the two-branch form.
    MessageProducer sender = useProducerDirectly ? producer : (TopicPublisher) producer;

    sender.send(
            destination,
            message,
            message.getJMSDeliveryMode(),
            message.getJMSPriority(),
            message.getJMSExpiration());
}
@Test public void testTempTopicDelete() throws Exception { connection.start(); TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryTopic tempTopic = topicSession.createTemporaryTopic(); ActiveMQConnection newConn = (ActiveMQConnection) factory.createConnection(); try { TopicSession newTopicSession = newConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher publisher = newTopicSession.createPublisher(tempTopic); TextMessage msg = newTopicSession.createTextMessage("Test Message"); publisher.publish(msg); try { TopicSubscriber consumer = newTopicSession.createSubscriber(tempTopic); fail("should have gotten exception but got consumer: " + consumer); } catch (JMSException ex) { //correct } connection.close(); try { Message newMsg = newTopicSession.createMessage(); publisher.publish(newMsg); } catch (JMSException e) { //ok } } finally { newConn.close(); } }
@Test(timeout = 60000) public void testSendAndReceiveOnAutoCreatedTopic() throws Exception { Connection connection = createConnection("myClientId"); String topicName = UUID.randomUUID().toString(); SimpleString simpleTopicName = SimpleString.toSimpleString(topicName); try { TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); TopicPublisher producer = session.createPublisher(topic); TextMessage message = session.createTextMessage("test-message"); // this will auto-create the address, but not the subscription queue producer.send(message); assertNotNull(server.getAddressInfo(simpleTopicName)); assertEquals(RoutingType.MULTICAST, server.getAddressInfo(simpleTopicName).getRoutingType()); assertTrue(server.getAddressInfo(simpleTopicName).isAutoCreated()); assertTrue(server.getPostOffice().getBindingsForAddress(simpleTopicName).getBindings().isEmpty()); // this will auto-create the subscription queue TopicSubscriber consumer = session.createSubscriber(topic); assertFalse(server.getPostOffice().getBindingsForAddress(simpleTopicName).getBindings().isEmpty()); producer.send(message); producer.close(); connection.start(); message = (TextMessage) consumer.receive(1000); assertNotNull(message); assertNotNull(message.getText()); assertEquals("test-message", message.getText()); consumer.close(); assertTrue(server.getPostOffice().getBindingsForAddress(simpleTopicName).getBindings().isEmpty()); } finally { connection.close(); } }
/**
 * Sends one text message to a topic with two subscribers and checks that
 * each subscriber receives the same text.
 */
@Test(timeout = 60000)
public void testSendWithMultipleReceiversOnTopic() throws Exception {
    Connection connection = createConnection();

    try {
        TopicSession topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destination = topicSession.createTopic(getTopicName());

        TopicSubscriber firstSubscriber = topicSession.createSubscriber(destination);
        TopicSubscriber secondSubscriber = topicSession.createSubscriber(destination);
        TopicPublisher publisher = topicSession.createPublisher(destination);

        TextMessage message = topicSession.createTextMessage("test-message");
        publisher.send(message);
        publisher.close();

        connection.start();

        // First subscriber.
        message = (TextMessage) firstSubscriber.receive(1000);
        assertNotNull(message);
        assertNotNull(message.getText());
        assertEquals("test-message", message.getText());

        // Second subscriber.
        message = (TextMessage) secondSubscriber.receive(1000);
        assertNotNull(message);
        assertNotNull(message.getText());
        assertEquals("test-message", message.getText());
    } finally {
        connection.close();
    }
}
/**
 * Wraps the given topic publisher for use with the given session.
 * Both arguments are passed to the superclass constructor; when tracing is
 * enabled the construction is logged.
 *
 * @param producer the producer
 * @param session  the session
 */
public ActiveMQRATopicPublisher(final TopicPublisher producer, final ActiveMQRASession session) {
    super(producer, session);

    if (!ActiveMQRATopicPublisher.trace) {
        return;
    }
    ActiveMQRALogger.LOGGER.trace("constructor(" + producer + ", " + session + ")");
}