/**
 * Opens and closes 100 subscribers on one shared session, closes the session and the
 * connection afterwards, and then asserts that the destination registered for the topic in
 * {@code backEnd}'s region broker has no consumers.
 */
public void testWithSessionCloseOutsideTheLoop() throws Exception {
    final TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    // A single session serves every subscriber; it is closed only once the loop is done.
    final TopicSession sharedSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    for (int round = 0; round < 100; round++) {
        final TopicSubscriber current = sharedSession.createSubscriber(topic);
        current.setMessageListener(new DummyMessageListener());
        current.close();
    }
    sharedSession.close();
    topicConnection.close();

    // Pause before inspecting the broker state.
    Thread.sleep(1000);

    final Destination registered = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(registered);
    assertTrue(registered.getConsumers().isEmpty());
}
/**
 * Verifies that a subscriber returns the very topic it was created for, and that asking a
 * closed subscriber for its topic raises {@code IllegalStateException}.
 */
@Test
public void testGetTopic() throws JMSException {
    final JmsPoolConnection pooled = (JmsPoolConnection) cf.createTopicConnection();
    final TopicSession topicSession = pooled.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    final Topic temporary = topicSession.createTemporaryTopic();
    final TopicSubscriber underTest = topicSession.createSubscriber(temporary);

    assertNotNull(underTest.getTopic());
    assertSame(temporary, underTest.getTopic());

    underTest.close();

    try {
        underTest.getTopic();
        fail("Cannot read topic on closed subscriber");
    } catch (IllegalStateException expected) {
        // The closed subscriber rejected the call, which is what this test wants.
    }
}
/**
 * Verifies that a durable subscriber created with {@code noLocal = true} reports that flag,
 * and that reading the flag after close raises {@code IllegalStateException}.
 */
@Test
public void testGetNoLocal() throws JMSException {
    final JmsPoolConnection pooled = (JmsPoolConnection) cf.createTopicConnection();
    final TopicSession topicSession = pooled.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    final Topic temporary = topicSession.createTemporaryTopic();
    final TopicSubscriber underTest =
            topicSession.createDurableSubscriber(temporary, "name", "color = red", true);

    assertTrue(underTest.getNoLocal());

    underTest.close();

    try {
        underTest.getNoLocal();
        fail("Cannot read state on closed subscriber");
    } catch (IllegalStateException expected) {
        // The closed subscriber rejected the call, which is what this test wants.
    }
}
/**
 * Resolves a topic connection factory and a topic through JNDI, starts a topic connection
 * with the supplied credentials and registers this instance as the message listener of a
 * newly created subscriber. Any exception raised along the way is logged, not propagated.
 *
 * @param tcfBindingName   JNDI name of the {@code TopicConnectionFactory}
 * @param topicBindingName JNDI name of the {@code Topic}
 * @param username         user name passed to {@code createTopicConnection}
 * @param password         password passed to {@code createTopicConnection}
 */
public JMSSink(final String tcfBindingName, final String topicBindingName, final String username, final String password) {
    try {
        final Context jndi = new InitialContext();
        final TopicConnectionFactory factory = (TopicConnectionFactory) lookup(jndi, tcfBindingName);

        final TopicConnection conn = factory.createTopicConnection(username, password);
        conn.start();

        final TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final Topic destination = (Topic) jndi.lookup(topicBindingName);

        // From here on, messages are handed to this object.
        session.createSubscriber(destination).setMessageListener(this);
    } catch (final Exception e) {
        logger.error("Could not read JMS message.", e);
    }
}
/**
 * Closes the given JMS {@link MessageConsumer}; a {@code null} consumer is ignored.
 *
 * <p>When the configured spec version is 1.1 or 2.0 the consumer is closed directly. For any
 * other spec version it is closed only if its runtime type matches the configured
 * destination type ({@link QueueReceiver} for queues, {@link TopicSubscriber} otherwise).
 *
 * @param messageConsumer JMS Message Consumer that needs to be closed.
 * @throws JMSException if an error occurs while closing the consumer.
 */
public void closeConsumer(MessageConsumer messageConsumer) throws JMSException {
    if (messageConsumer == null) {
        return;
    }

    if (logger.isDebugEnabled()) {
        logger.debug("Closing a JMS Message Consumer of: " + this.connectionFactoryString);
    }

    final boolean specIs11Or20 = JMSConstants.JMS_SPEC_VERSION_1_1.equals(jmsSpec)
            || JMSConstants.JMS_SPEC_VERSION_2_0.equals(jmsSpec);
    if (specIs11Or20) {
        messageConsumer.close();
        return;
    }

    final boolean queueDestination = JMSConstants.JMSDestinationType.QUEUE.equals(this.destinationType);
    if (queueDestination) {
        if (messageConsumer instanceof QueueReceiver) {
            ((QueueReceiver) messageConsumer).close();
        }
        return;
    }

    if (messageConsumer instanceof TopicSubscriber) {
        ((TopicSubscriber) messageConsumer).close();
    }
}
/**
 * Subscribes to the {@code activemq.notifications} topic, waits until at least one message
 * has arrived and asserts that every collected message carries the {@code _AMQ_NotifType}
 * string property.
 */
@Test
public void testNotificationProperties() throws Exception {
    try (TopicConnection conn = factory.createTopicConnection()) {
        final TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final Topic notifications = session.createTopic("activemq.notifications");
        final TopicSubscriber notificationSubscriber = session.createSubscriber(notifications);

        // The listener may run on another thread, hence the concurrent list.
        final List<Message> collected = new CopyOnWriteArrayList<>();
        notificationSubscriber.setMessageListener(incoming -> collected.add(incoming));

        conn.start();

        Wait.waitFor(() -> collected.size() > 0);
        Assert.assertTrue(collected.size() > 0);

        for (final Message notification : collected) {
            assertNotNull(notification);
            assertNotNull(notification.getStringProperty("_AMQ_NotifType"));
        }
    }
}
/**
 * Publishes one text message to a topic through a {@code TopicPublisher} and checks that a
 * {@code TopicSubscriber} created on the same session beforehand receives it unchanged.
 */
@Test(timeout = 60000)
public void testSendAndReceiveOnTopic() throws Exception {
    final Connection conn = createConnection("myClientId");
    try {
        final TopicSession topicSession = (TopicSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Topic destination = topicSession.createTopic(getTopicName());

        // The subscriber exists before the message is published.
        final TopicSubscriber subscriber = topicSession.createSubscriber(destination);
        final TopicPublisher publisher = topicSession.createPublisher(destination);

        final TextMessage outgoing = topicSession.createTextMessage("test-message");
        publisher.send(outgoing);
        publisher.close();

        conn.start();

        final TextMessage incoming = (TextMessage) subscriber.receive(1000);
        assertNotNull(incoming);
        assertNotNull(incoming.getText());
        assertEquals("test-message", incoming.getText());
    } finally {
        conn.close();
    }
}
/**
 * Creates a durable subscription, reconnects with the same client id, and checks that the
 * server binding {@code myClientId.myDurSub} exists until {@code unsubscribe} is called and
 * is gone afterwards.
 */
@Test(timeout = 60000)
public void testDurableSubscriptionUnsubscribe() throws Exception {
    Connection conn = createConnection("myClientId");
    try {
        // First connection: only establishes the durable subscription.
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Topic destination = sess.createTopic(getTopicName());
        sess.createDurableSubscriber(destination, "myDurSub");
        sess.close();
        conn.close();

        // Second connection: re-attach, detach the consumer, then remove the subscription.
        conn = createConnection("myClientId");
        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber reattached = sess.createDurableSubscriber(destination, "myDurSub");
        reattached.close();

        Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
        sess.unsubscribe("myDurSub");
        Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));

        sess.close();
        conn.close();
    } finally {
        conn.close();
    }
}
/**
 * Runs 100 rounds that each open a session and a subscriber and close both again, closes the
 * connection, and then asserts that the destination registered for the topic in
 * {@code backEnd}'s region broker has no consumers.
 */
public void testWithSessionAndSubsciberClose() throws Exception {
    final TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    for (int round = 0; round < 100; round++) {
        final TopicSession roundSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber roundSubscriber = roundSession.createSubscriber(topic);
        roundSubscriber.setMessageListener(new DummyMessageListener());

        // Both the subscriber and its session are closed explicitly in every round.
        roundSubscriber.close();
        roundSession.close();
    }
    topicConnection.close();

    // Pause before inspecting the broker state.
    Thread.sleep(1000);

    final Destination registered = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(registered);
    assertTrue(registered.getConsumers().isEmpty());
}
/**
 * Opens a single subscriber, closes it together with its session and connection, and then
 * asserts that the destination registered for the topic in {@code backEnd}'s region broker
 * has no consumers.
 */
public void testWithOneSubscriber() throws Exception {
    final TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    final TopicSession onlySession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    final TopicSubscriber onlySubscriber = onlySession.createSubscriber(topic);
    onlySubscriber.setMessageListener(new DummyMessageListener());

    // Tear down innermost first: subscriber, session, connection.
    onlySubscriber.close();
    onlySession.close();
    topicConnection.close();

    // Pause before inspecting the broker state.
    Thread.sleep(1000);

    final Destination registered = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(registered);
    assertTrue(registered.getConsumers().isEmpty());
}
/**
 * Opens 100 sessions with one subscriber each and closes none of them individually; only the
 * connection is closed. Afterwards asserts that the destination registered for the topic in
 * {@code backEnd}'s region broker has no consumers.
 */
public void testWithoutSessionAndSubsciberClose() throws Exception {
    final TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    for (int round = 0; round < 100; round++) {
        // Neither the session nor the subscriber is closed here.
        final TopicSession roundSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber roundSubscriber = roundSession.createSubscriber(topic);
        assertNotNull(roundSubscriber);
    }
    topicConnection.close();

    // Pause before inspecting the broker state.
    Thread.sleep(1000);

    final Destination registered = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(registered);
    assertTrue(registered.getConsumers().isEmpty());
}
/**
 * Variant in which exactly one of the 100 rounds (index 50) leaves its subscriber and session
 * open. Running this test you can produce a leak of only 2 ConsumerInfo on the BE broker, NOT
 * 200 as in other cases!
 */
public void testWithoutSessionAndSubsciberClosePlayAround() throws Exception {
    final TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    for (int round = 0; round < 100; round++) {
        final TopicSession roundSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber roundSubscriber = roundSession.createSubscriber(topic);
        roundSubscriber.setMessageListener(new DummyMessageListener());

        // Round 50 is deliberately left open; every other round cleans up after itself.
        if (round == 50) {
            continue;
        }
        roundSubscriber.close();
        roundSession.close();
    }
    topicConnection.close();

    // Pause before inspecting the broker state.
    Thread.sleep(1000);

    final Destination registered = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(registered);
    assertTrue(registered.getConsumers().isEmpty());
}
/**
 * Connects to the {@code vm://} broker named by {@code brokerName} using {@code clientID},
 * attaches to the durable subscription {@code durableSubName} on {@code topicName} and waits
 * for a single message.
 *
 * <p>Note that, despite the method name, the receive timeout passed below is 120000 ms.
 *
 * <p>The connection is closed in a {@code finally} block so that a failure while creating the
 * session, topic or subscriber, or while receiving, does not leave the connection open.
 *
 * @return whatever {@code subscriber.receive(120000)} returned
 * @throws Exception if any JMS call fails
 */
private Message collectMessagesFromDurableSubscriptionForOneMinute() throws Exception {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
    TopicConnection connection = connectionFactory.createTopicConnection();
    Message message;
    try {
        connection.setClientID(clientID);
        TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = topicSession.createTopic(topicName);
        connection.start();
        TopicSubscriber subscriber = topicSession.createDurableSubscriber(topic, durableSubName);
        LOG.info("About to receive messages");
        message = subscriber.receive(120000);
        subscriber.close();
    } finally {
        // Always release the connection, including when a call above throws.
        connection.close();
    }
    LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done");
    return message;
}
/**
 * Creates a durable subscription, closes it and its connection, and then waits up to
 * 15000 ms for the broker's admin view to report no inactive durable topic subscribers.
 */
public void testRemove() throws Exception {
    final Connection conn = createConnection();
    conn.setClientID("cliID");
    conn.start();

    final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final TopicSubscriber durable = sess.createDurableSubscriber((Topic) createDestination(), "subName");
    durable.close();
    conn.close();

    final Wait.Condition noInactiveSubscribers = new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            return broker.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
        }
    };
    assertTrue(Wait.waitFor(noInactiveSubscribers, 15000));
}
/**
 * Creates a durable subscription, closes it and its connection, restarts the broker, asserts
 * that exactly one inactive durable subscriber is reported right after the restart, and then
 * waits up to 20000 ms for that count to drop to zero.
 */
public void testRemoveAfterRestart() throws Exception {
    final Connection conn = createConnection();
    conn.setClientID("cliID");
    conn.start();

    final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final TopicSubscriber durable = sess.createDurableSubscriber((Topic) createDestination(), "subName");
    durable.close();
    conn.close();

    LOG.info("Broker restarting, wait for inactive cleanup afterwards.");
    restartBroker();
    LOG.info("Broker restarted, wait for inactive cleanup now.");

    // The subscription is still reported immediately after the restart.
    assertTrue(broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1);

    final Wait.Condition noInactiveSubscribers = new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            return broker.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
        }
    };
    assertTrue(Wait.waitFor(noInactiveSubscribers, 20000));
}
@Test public void testGetNoLocalOnClosedConsumer() throws Exception { Connection consumerConnection = null; try { consumerConnection = createConnection(); TopicConnection tc = (TopicConnection) consumerConnection; TopicSession consumerSession = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber topicConsumer = consumerSession.createSubscriber(ActiveMQServerTestCase.topic1); topicConsumer.close(); try { topicConsumer.getNoLocal(); Assert.fail("must throw a JMS IllegalStateException"); } catch (javax.jms.IllegalStateException e) { // OK } } finally { if (consumerConnection != null) { consumerConnection.close(); } } }
/**
 * Verifies that a consumer created on {@code topic1}, viewed as a {@code TopicSubscriber},
 * reports {@code topic1} from {@code getTopic()}.
 */
@Test
public void testGetTopic() throws Exception {
    Connection conn = null;
    try {
        conn = createConnection();
        final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer = sess.createConsumer(ActiveMQServerTestCase.topic1);

        final TopicSubscriber asSubscriber = (TopicSubscriber) consumer;
        final Topic reported = asSubscriber.getTopic();

        ProxyAssertSupport.assertEquals(ActiveMQServerTestCase.topic1, reported);
    } finally {
        if (conn != null) {
            conn.close();
        }
    }
}
@Test public void testGetTopicOnClosedConsumer() throws Exception { Connection consumerConnection = null; try { consumerConnection = createConnection(); Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer topicConsumer = consumerSession.createConsumer(ActiveMQServerTestCase.topic1); topicConsumer.close(); try { ((TopicSubscriber) topicConsumer).getTopic(); Assert.fail("must throw a JMS IllegalStateException"); } catch (javax.jms.IllegalStateException e) { // OK } } finally { if (consumerConnection != null) { consumerConnection.close(); } } }
/** * Topics shouldn't hold on to messages when the non-durable subscribers close */ @Test public void testPersistentMessagesForTopicDropped2() throws Exception { TopicConnection topicConn = createTopicConnection(); topicConn.start(); TopicSession sess = topicConn.createTopicSession(true, 0); TopicPublisher pub = sess.createPublisher(ActiveMQServerTestCase.topic1); TopicSubscriber sub = sess.createSubscriber(ActiveMQServerTestCase.topic1); pub.setDeliveryMode(DeliveryMode.PERSISTENT); Message m = sess.createTextMessage("testing123"); pub.publish(m); sess.commit(); // receive but rollback TextMessage m2 = (TextMessage) sub.receive(3000); ProxyAssertSupport.assertNotNull(m2); ProxyAssertSupport.assertEquals("testing123", m2.getText()); sess.rollback(); topicConn.close(); checkEmpty(ActiveMQServerTestCase.topic1); }
@Test public void testUnsubscribeWithActiveConsumer() throws Exception { Connection conn = createConnection(); conn.setClientID("zeke"); Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber dursub = s.createDurableSubscriber(ActiveMQServerTestCase.topic1, "dursub0"); try { s.unsubscribe("dursub0"); ProxyAssertSupport.fail(); } catch (IllegalStateException e) { // Ok - it is illegal to ubscribe a subscription if it has active consumers } dursub.close(); s.unsubscribe("dursub0"); }
@Test public void testSubscribeWithActiveSubscription() throws Exception { Connection conn = createConnection(); conn.setClientID("zeke"); Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber dursub1 = s.createDurableSubscriber(ActiveMQServerTestCase.topic1, "dursub1"); try { s.createDurableSubscriber(ActiveMQServerTestCase.topic1, "dursub1"); ProxyAssertSupport.fail(); } catch (IllegalStateException e) { // Ok - it is illegal to have more than one active subscriber on a subscrtiption at any one time } dursub1.close(); s.unsubscribe("dursub1"); }
/**
 * Creates a subscriber on the underlying topic session, wraps it in an
 * {@code ActiveMQRATopicSubscriber} bound to this session and registers the wrapper through
 * {@code addConsumer}. The whole operation runs between {@code lock()} and {@code unlock()}.
 *
 * @param topic The topic
 * @return The wrapped subscriber
 * @throws JMSException Thrown if an error occurs
 */
@Override
public TopicSubscriber createSubscriber(final Topic topic) throws JMSException {
    lock();
    try {
        final TopicSession delegate = getTopicSessionInternal();

        if (ActiveMQRASession.trace) {
            ActiveMQRALogger.LOGGER.trace("createSubscriber " + delegate + " topic=" + topic);
        }

        final TopicSubscriber underlying = delegate.createSubscriber(topic);
        final TopicSubscriber wrapped = new ActiveMQRATopicSubscriber(underlying, this);

        if (ActiveMQRASession.trace) {
            ActiveMQRALogger.LOGGER.trace("createdSubscriber " + delegate + " ActiveMQTopicSubscriber=" + wrapped);
        }

        addConsumer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
@Override public TopicSubscriber createDurableSubscriber(final Topic topic, final String name, String messageSelector, final boolean noLocal) throws JMSException { // As per spec. section 4.11 if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) { throw new IllegalStateException("Cannot create a durable subscriber on a QueueSession"); } checkTopic(topic); if (!(topic instanceof ActiveMQDestination)) { throw new InvalidDestinationException("Not an ActiveMQTopic:" + topic); } if ("".equals(messageSelector)) { messageSelector = null; } ActiveMQDestination jbdest = (ActiveMQDestination) topic; if (jbdest.isQueue()) { throw new InvalidDestinationException("Cannot create a subscriber on a queue"); } return createConsumer(jbdest, name, messageSelector, noLocal, ConsumerDurability.DURABLE); }
/**
 * Creates a {@code LocalTopicSubscriber} for the given topic, registers it with this session
 * and initialises its destination. Runs under the read lock of {@code externalAccessLock}.
 *
 * @param topic           topic to subscribe to
 * @param messageSelector selector passed to the subscriber
 * @param noLocal         no-local flag passed to the subscriber
 * @return the registered, initialised subscriber
 * @throws JMSException from {@code checkNotClosed()} or any of the creation steps
 */
@Override
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
    externalAccessLock.readLock().lock();
    try {
        checkNotClosed();

        final LocalTopicSubscriber created = new LocalTopicSubscriber(
                engine,
                this,
                topic,
                messageSelector,
                noLocal,
                idProvider.createID(),
                null);

        // Register first, then initialise the destination.
        registerConsumer(created);
        created.initDestination();

        return created;
    } finally {
        externalAccessLock.readLock().unlock();
    }
}
/**
 * Creates a {@code RemoteDurableTopicSubscriber} for the given topic and subscription name,
 * registers it with this session and runs its {@code remoteInit()}. Runs under the read lock
 * of {@code externalAccessLock}.
 *
 * @param topic           topic to subscribe to
 * @param name            durable subscription name
 * @param messageSelector selector passed to the subscriber
 * @param noLocal         no-local flag passed to the subscriber
 * @return the registered, initialised subscriber
 * @throws JMSException from {@code checkNotClosed()} or any of the creation steps
 */
@Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
    externalAccessLock.readLock().lock();
    try {
        checkNotClosed();

        final RemoteDurableTopicSubscriber created = new RemoteDurableTopicSubscriber(
                idProvider.createID(),
                this,
                topic,
                messageSelector,
                noLocal,
                name);

        // Register first, then perform the remote initialisation.
        registerConsumer(created);
        created.remoteInit();

        return created;
    } finally {
        externalAccessLock.readLock().unlock();
    }
}
/**
 * Creates a {@code RemoteTopicSubscriber} for the given topic (converted with
 * {@code DestinationTools.asRef}), registers it with this session and runs its
 * {@code remoteInit()}. Runs under the read lock of {@code externalAccessLock}.
 *
 * @param topic           topic to subscribe to
 * @param messageSelector selector passed to the subscriber
 * @param noLocal         no-local flag passed to the subscriber
 * @return the registered, initialised subscriber
 * @throws JMSException from {@code checkNotClosed()} or any of the creation steps
 */
@Override
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
    externalAccessLock.readLock().lock();
    try {
        checkNotClosed();

        final RemoteTopicSubscriber created = new RemoteTopicSubscriber(
                idProvider.createID(),
                this,
                DestinationTools.asRef(topic),
                messageSelector,
                noLocal);

        // Register first, then perform the remote initialisation.
        registerConsumer(created);
        created.remoteInit();

        return created;
    } finally {
        externalAccessLock.readLock().unlock();
    }
}
public void testClientDeleteQueueSuccess() throws Exception { try { Connection conn = getConnection("test", "client", "guest"); Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); conn.start(); // create kipper Topic kipper = sess.createTopic("kipper"); TopicSubscriber subscriber = sess.createDurableSubscriber(kipper, "kipper"); subscriber.close(); sess.unsubscribe("kipper"); //Do something to show connection is active. sess.rollback(); conn.close(); } catch (Exception e) { fail("Test failed due to:" + e.getMessage()); } }
public void testServerDeleteQueueFailure() throws Exception { try { Connection conn = getConnection("test", "server", "guest"); Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); conn.start(); // create kipper Topic kipper = sess.createTopic("kipper"); TopicSubscriber subscriber = sess.createDurableSubscriber(kipper, "kipper"); subscriber.close(); sess.unsubscribe("kipper"); //Do something to show connection is active. sess.rollback(); conn.close(); } catch (JMSException e) { // JMSException -> linedException = AMQException.403 check403Exception(e.getLinkedException()); } }
/**
 * Checks that a {@code QueueReceiver} and a {@code TopicSubscriber}, each created on its own
 * domain-specific session over address-based destinations, receive the messages that a third
 * session sends to the matching queue and topic.
 */
public void testQueueReceiversAndTopicSubscriber() throws Exception {
    final Queue queueAddress = new AMQAnyDestination("ADDR:my-queue; {create: always}");
    final Topic topicAddress = new AMQAnyDestination("ADDR:amq.topic/test");

    // Consumers are set up first, one per domain-specific session.
    final QueueSession queueSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    final QueueReceiver queueReceiver = queueSession.createReceiver(queueAddress);

    final TopicSession topicSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    final TopicSubscriber topicSubscriber = topicSession.createSubscriber(topicAddress);

    // A separate plain session does the sending.
    final Session senderSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    final MessageProducer queueProducer = senderSession.createProducer(senderSession.createQueue("ADDR:my-queue"));
    queueProducer.send(senderSession.createTextMessage("test1"));

    final MessageProducer topicProducer = senderSession.createProducer(senderSession.createTopic("ADDR:amq.topic/test"));
    topicProducer.send(senderSession.createTextMessage("test2"));

    final Message fromQueue = queueReceiver.receive();
    assertNotNull(fromQueue);
    assertEquals("test1",((TextMessage)fromQueue).getText());

    final Message fromTopic = topicSubscriber.receive();
    assertNotNull(fromTopic);
    assertEquals("test2",((TextMessage)fromTopic).getText());
}
public synchronized void initTopicSubscriber() throws JMSException, NamingException { //first of all we have to inititiate the TopicSession //(without this we can't instantiate a TopicSubscriber) initTopicSession(); //get the TopicSubscriber from our TopicSession TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic, null, true); logger.debug("Topic subscriber created"); //if set we pass our ExtendedMessageListener to the TopicSubscriber as MessageListener if (messageListener != null) { topicSubscriber.setMessageListener(messageListener); } //start listening to JMS topicConnection.start(); logger.info("Topic connection started"); }
/**
 * Removes the subscriber registered for {@code channel}, detaches its message listener and
 * closes it.
 *
 * <p>Clean-up is best effort: a {@code JMSException} from either step is logged and does not
 * propagate. The two steps are attempted independently, so a failure while detaching the
 * listener no longer prevents the subscriber from being closed.
 *
 * @param channel key under which the subscriber was registered
 * @return {@code false} if no subscriber was registered for {@code channel}; {@code true}
 *         otherwise, including when clean-up reported an error
 */
@Override
public boolean unsubscribe(String channel) {
    TopicSubscriber subscriber = this.subscribers.remove(channel);
    if (subscriber == null) {
        return false;
    }

    log.debug(String.format("Unsubscribing Subscriber[%s] for Topic [%s].", subscriber, channel));

    try {
        subscriber.setMessageListener(null);
    } catch (JMSException e) {
        // Keep going: the subscriber must still be closed.
        log.warn(String.format("Failed to detach listener from Subscriber[%s] for Topic [%s].", subscriber, channel), e);
    }

    try {
        subscriber.close();
    } catch (JMSException e) {
        log.warn(String.format("Failed to close Subscriber[%s] for Topic [%s].", subscriber, channel), e);
    }

    return true;
}
/**
 * Creates a subscriber on the underlying topic session, wraps it in a
 * {@code JmsTopicSubscriber} bound to this session and registers the wrapper through
 * {@code addConsumer}. The whole operation runs between {@code lock()} and {@code unlock()}.
 *
 * @param topic topic to subscribe to
 * @return the wrapped subscriber
 * @throws JMSException if the underlying session fails
 */
public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
    lock();
    try {
        final TopicSession delegate = getTopicSession();
        if (trace) {
            log.trace("createSubscriber " + delegate + " topic=" + topic);
        }

        final TopicSubscriber underlying = delegate.createSubscriber(topic);
        final TopicSubscriber wrapped = new JmsTopicSubscriber(underlying, this);
        if (trace) {
            log.trace("createdSubscriber " + delegate + " JmsTopicSubscriber=" + wrapped);
        }

        addConsumer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
/**
 * Creates a subscriber with a selector and no-local flag on the underlying topic session,
 * wraps it in a {@code JmsTopicSubscriber} bound to this session and registers the wrapper
 * through {@code addConsumer}. The whole operation runs between {@code lock()} and
 * {@code unlock()}.
 *
 * @param topic           topic to subscribe to
 * @param messageSelector selector forwarded to the underlying session
 * @param noLocal         no-local flag forwarded to the underlying session
 * @return the wrapped subscriber
 * @throws JMSException if the underlying session fails
 */
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
    lock();
    try {
        final TopicSession delegate = getTopicSession();
        if (trace) {
            log.trace("createSubscriber " + delegate + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
        }

        final TopicSubscriber underlying = delegate.createSubscriber(topic, messageSelector, noLocal);
        final TopicSubscriber wrapped = new JmsTopicSubscriber(underlying, this);
        if (trace) {
            log.trace("createdSubscriber " + delegate + " JmsTopicSubscriber=" + wrapped);
        }

        addConsumer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
/**
 * Creates a durable subscriber on the underlying session, wraps it in a
 * {@code JmsTopicSubscriber} bound to this session and registers the wrapper through
 * {@code addConsumer}. Queue-typed sessions are rejected before the lock is taken.
 *
 * @param topic topic to subscribe to
 * @param name  durable subscription name
 * @return the wrapped durable subscriber
 * @throws JMSException if this is a queue session or the underlying session fails
 */
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
    final boolean queueSession = info.getType() == JmsConnectionFactory.QUEUE;
    if (queueSession) {
        throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
    }

    lock();
    try {
        final Session delegate = getSession();
        if (trace) {
            log.trace("createDurableSubscriber " + delegate + " topic=" + topic + " name=" + name);
        }

        final TopicSubscriber underlying = delegate.createDurableSubscriber(topic, name);
        final TopicSubscriber wrapped = new JmsTopicSubscriber(underlying, this);
        if (trace) {
            log.trace("createdDurableSubscriber " + delegate + " JmsTopicSubscriber=" + wrapped);
        }

        addConsumer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
/**
 * Creates a durable subscriber with a selector and no-local flag on the underlying session,
 * wraps it in a {@code JmsTopicSubscriber} bound to this session and registers the wrapper
 * through {@code addConsumer}. The whole operation runs between {@code lock()} and
 * {@code unlock()}.
 *
 * <p>NOTE(review): the two-argument {@code createDurableSubscriber} seen alongside this
 * method rejects queue sessions up front; this overload has no such guard. Confirm whether
 * that difference is intentional.
 *
 * @param topic           topic to subscribe to
 * @param name            durable subscription name
 * @param messageSelector selector forwarded to the underlying session
 * @param noLocal         no-local flag forwarded to the underlying session
 * @return the wrapped durable subscriber
 * @throws JMSException if the underlying session fails
 */
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
    lock();
    try {
        final Session delegate = getSession();
        if (trace) {
            log.trace("createDurableSubscriber " + delegate + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
        }

        final TopicSubscriber underlying = delegate.createDurableSubscriber(topic, name, messageSelector, noLocal);
        final TopicSubscriber wrapped = new JmsTopicSubscriber(underlying, this);
        if (trace) {
            log.trace("createdDurableSubscriber " + delegate + " JmsTopicSubscriber=" + wrapped);
        }

        addConsumer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
/**
 * Returns a durable subscriber for the topic, reusing a cached wrapper when one is stored
 * under the key built from {@code topic}; otherwise creates a new wrapper around a durable
 * subscriber from the underlying session and caches it if the pooling connection factory has
 * producer/consumer caching enabled.
 *
 * <p>NOTE(review): the lookup key is built from {@code topic} only; {@code name} is not
 * passed to the key constructor. Verify that this is the intended cache identity.
 *
 * @param topic topic to subscribe to
 * @param name  durable subscription name, used only when a new subscriber is created
 * @return the cached or newly created subscriber wrapper
 * @throws JMSException if the underlying session fails to create the subscriber
 */
@Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
    final MessageProducerConsumerKey cacheKey = new MessageProducerConsumerKey(topic);
    if (log.isDebugEnabled()) {
        log.debug("looking for durable subscriber based on " + cacheKey);
    }

    final TopicSubscriberWrapper cached = topicSubscribers.get(cacheKey);
    if (cached != null) {
        if (log.isDebugEnabled()) {
            log.debug("found durable subscriber based on " + cacheKey + ", recycling it: " + cached);
        }
        return cached;
    }

    if (log.isDebugEnabled()) {
        log.debug("found no durable subscriber based on " + cacheKey + ", creating it");
    }
    final TopicSubscriberWrapper created = new TopicSubscriberWrapper(
            getSession().createDurableSubscriber(topic, name),
            this,
            pooledConnection.getPoolingConnectionFactory());

    if (pooledConnection.getPoolingConnectionFactory().getCacheProducersConsumers()) {
        if (log.isDebugEnabled()) {
            log.debug("caching durable subscriber via key " + cacheKey);
        }
        topicSubscribers.put(cacheKey, created);
    }
    return created;
}
/**
 * Returns a durable subscriber for the topic, reusing a cached wrapper when one is stored
 * under the key built from {@code topic}, {@code messageSelector} and {@code noLocal};
 * otherwise creates a new wrapper around a durable subscriber from the underlying session and
 * caches it if the pooling connection factory has producer/consumer caching enabled.
 *
 * <p>NOTE(review): {@code name} is not passed to the key constructor. Verify that this is
 * the intended cache identity.
 *
 * @param topic           topic to subscribe to
 * @param name            durable subscription name, used only when a new subscriber is created
 * @param messageSelector selector, part of the cache key and forwarded on creation
 * @param noLocal         no-local flag, part of the cache key and forwarded on creation
 * @return the cached or newly created subscriber wrapper
 * @throws JMSException if the underlying session fails to create the subscriber
 */
@Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
    final MessageProducerConsumerKey cacheKey = new MessageProducerConsumerKey(topic, messageSelector, noLocal);
    if (log.isDebugEnabled()) {
        log.debug("looking for durable subscriber based on " + cacheKey);
    }

    final TopicSubscriberWrapper cached = topicSubscribers.get(cacheKey);
    if (cached != null) {
        if (log.isDebugEnabled()) {
            log.debug("found durable subscriber based on " + cacheKey + ", recycling it: " + cached);
        }
        return cached;
    }

    if (log.isDebugEnabled()) {
        log.debug("found no durable subscriber based on " + cacheKey + ", creating it");
    }
    final TopicSubscriberWrapper created = new TopicSubscriberWrapper(
            getSession().createDurableSubscriber(topic, name, messageSelector, noLocal),
            this,
            pooledConnection.getPoolingConnectionFactory());

    if (pooledConnection.getPoolingConnectionFactory().getCacheProducersConsumers()) {
        if (log.isDebugEnabled()) {
            log.debug("caching durable subscriber via key " + cacheKey);
        }
        topicSubscribers.put(cacheKey, created);
    }
    return created;
}
/**
 * Verifies that {@code toString()} on a subscriber created for a temporary topic does not
 * return {@code null}.
 */
@Test
public void testToString() throws JMSException {
    final JmsPoolConnection pooled = (JmsPoolConnection) cf.createTopicConnection();
    final TopicSession topicSession = pooled.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    final Topic temporary = topicSession.createTemporaryTopic();
    final TopicSubscriber underTest = topicSession.createSubscriber(temporary);

    final String description = underTest.toString();
    assertNotNull(description);
}
/**
 * Durable subscribers are not provided by this session; the call is handed straight to
 * {@code notImplemented()} and neither argument is used.
 *
 * @param topic ignored
 * @param name  ignored
 * @return whatever {@code notImplemented()} yields (presumably it throws; verify)
 * @throws JMSException as declared by the overridden method
 */
@Override
public TopicSubscriber createDurableSubscriber( final Topic topic, final String name ) throws JMSException
{
    final TopicSubscriber unsupported = notImplemented();
    return unsupported;
}
/**
 * Durable subscribers are not provided by this session; the call is handed straight to
 * {@code notImplemented()} and none of the arguments is used.
 *
 * @param topic           ignored
 * @param name            ignored
 * @param messageSelector ignored
 * @param noLocal         ignored
 * @return whatever {@code notImplemented()} yields (presumably it throws; verify)
 * @throws JMSException as declared by the overridden method
 */
@Override
public TopicSubscriber createDurableSubscriber( final Topic topic, final String name, final String messageSelector, final boolean noLocal ) throws JMSException
{
    final TopicSubscriber unsupported = notImplemented();
    return unsupported;
}