private void startSyncReceiveThread(final MessageConsumer messageConsumer, final long receiveTimeout) { syncReceiveThread = new SyncReceiveThread() { @Override public void run() { log.info("start listen to the " + typeStr + "[" + queue.getName() + "]."); while (running) { try { Message message = messageConsumer.receive(receiveTimeout); processMessage(message); } catch (Throwable e) { // 如果是关闭的时候,可能会报InterruptedException log.error("listen to the [" + queue.getName() + "] error.", e); } } closed = true; } }; syncReceiveThread.start(); }
public MessageConsumer createTopicConsumer(String selector) throws JMSException { if (isQueue) { throw new IllegalArgumentException("Only for topic, not queue"); } String consumerId = "consumer-" + UUID.randomUUID(); topicConnection = startConnection(consumerId); Session session = topicConnection.createSession(isTransacted, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(destinationName); if (isDurable) { if (selector != null) { return session.createDurableSubscriber(topic, consumerId, selector, true); } else { return session.createDurableSubscriber(topic, consumerId); } } else { if (selector != null) { return session.createConsumer(topic, selector); } else { return session.createConsumer(topic); } } }
/** Deletes the input Message from the given Queue, by creating a temporary consumer for that Message * @param session the JMS Session. * @param message the JMS Message. * @param queueName the Queue to consume from. * @throws FrameworkException in case any internal error occurs. * @throws ApplicationExceptions Indicates application error(s). */ static void consumeMessage(Session session, Message message, String queueName) throws FrameworkException, ApplicationExceptions { try { // Creates a consumer on the session for the given queueName, and specifying a selector having HEADER_JMS_MESSAGE_ID as the given messageId String selector = new StringBuilder(HEADER_JMS_MESSAGE_ID) .append("='") .append(message.getJMSMessageID()) .append('\'') .toString(); MessageConsumer consumer = session.createConsumer(JmsClientHelper.obtainQueue(queueName), selector); // Consume the message. Wait for 10 seconds at most Message m = consumer.receive(10000); if (m == null) throw new ApplicationExceptions(new JaffaMessagingApplicationException(JaffaMessagingApplicationException.MESSAGE_NOT_FOUND)); consumer.close(); } catch (JMSException e) { log.error("Error in consuming a JMS Message", e); throw new JaffaMessagingFrameworkException(JaffaMessagingFrameworkException.DELETE_ERROR, null, e); } }
@Test public void testReceive() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection(); Session session = connection.createSession(); Queue queue = session.createTemporaryQueue(); MessageConsumer consumer = session.createConsumer(queue, "Color = Red"); assertNull(consumer.receive()); consumer.close(); try { consumer.receive(); fail("Should not be able to interact with closed consumer"); } catch (IllegalStateException ise) {} }
@Test public void testReceiveTimed() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection(); Session session = connection.createSession(); Queue queue = session.createTemporaryQueue(); MessageConsumer consumer = session.createConsumer(queue, "Color = Red"); assertNull(consumer.receive(1)); consumer.close(); try { consumer.receive(1); fail("Should not be able to interact with closed consumer"); } catch (IllegalStateException ise) {} }
private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, InterruptedException { Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue tempQueue = session.createTemporaryQueue(); TextMessage msg = session.createTextMessage("Request"); msg.setJMSReplyTo(tempQueue); MessageProducer producer = session.createProducer(session.createQueue(serviceQueue)); producer.send(msg); MessageConsumer consumer = session.createConsumer(tempQueue); Message replyMsg = consumer.receive(); assertNotNull(replyMsg); LOG.debug("Reply message: {}", replyMsg); consumer.close(); producer.close(); session.close(); connection.close(); }
public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory, String queueName) throws JMSException { Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); final javax.jms.Message inMessage = consumer.receive(); String requestMessageId = inMessage.getJMSMessageID(); LOG.debug("Received message " + requestMessageId); final TextMessage replyMessage = session.createTextMessage("Result"); replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID()); final MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo()); LOG.debug("Sending reply to " + inMessage.getJMSReplyTo()); producer.send(replyMessage); producer.close(); consumer.close(); session.close(); connection.close(); }
/** * This test simply validates that {@link ConnectionFactory} can be setup by * pointing to the location of the client libraries at runtime. It uses * ActiveMQ which is not present at the POM but instead pulled from Maven * repo using {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which * implies that for this test to run the computer must be connected to the * Internet. If computer is not connected to the Internet, this test will * quietly fail logging a message. */ @Test public void validateFactoryCreationWithActiveMQLibraries() throws Exception { try { String libPath = TestUtils.setupActiveMqLibForTesting(true); TestRunner runner = TestRunners.newTestRunner(mock(Processor.class)); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); runner.addControllerService("cfProvider", cfProvider); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "vm://localhost?broker.persistent=false"); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, libPath); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, "org.apache.activemq.ActiveMQConnectionFactory"); runner.enableControllerService(cfProvider); runner.assertValid(cfProvider); Connection connection = cfProvider.getConnectionFactory().createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue("myqueue"); MessageProducer producer = session.createProducer(queue); MessageConsumer consumer = session.createConsumer(queue); TextMessage message = session.createTextMessage("Hello"); producer.send(message); assertEquals("Hello", ((TextMessage) consumer.receive()).getText()); connection.stop(); connection.close(); } catch (Exception e) { logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e); } }
/** * JMS Consumer and Converter to convert retrieve and convert Message. * @param consumer The consumer * @param converter The converter */ public JmsMessageSource(MessageConsumer consumer, JmsMessageConverter converter) { try { this.converter = converter; consumer.setMessageListener(this); } catch (JMSException ex) { throw new RuntimeException(ex); } }
@Override public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { if (destination instanceof Topic) { throw new IllegalStateException("Operation not supported by a QueueSession"); } return super.createConsumer(destination, messageSelector); }
public MessageConsumer createConsumer(Destination destination, ProxySession session) throws JMSException { String destinationName = destination.toString(); ProxyMessageConsumer messageConsumer = new ProxyMessageConsumer(session); messageConsumer.setDestination(destination); if (destination instanceof Topic) { this.messageHandler.registerToTopic(destinationName, messageConsumer.getId()); } return messageConsumer; }
/** * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String) */ @Override public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { if (destination instanceof Queue) { throw new IllegalStateException("Operation not supported by a TopicSession"); } return super.createConsumer(destination, messageSelector, noLocal); }
@Test public void testToString() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection(); Session session = connection.createSession(); Queue queue = session.createTemporaryQueue(); MessageConsumer consumer = session.createConsumer(queue); assertNotNull(consumer.toString()); }
@Test public void testCloseMoreThanOnce() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection(); Session session = connection.createSession(); Queue queue = session.createTemporaryQueue(); MessageConsumer consumer = session.createConsumer(queue); consumer.close(); consumer.close(); }
@Before public void setup() throws Exception { beforeSetup(); connectionFactory = mock(ConnectionFactory.class); connection = mock(Connection.class); session = mock(Session.class); queue = mock(Queue.class); topic = mock(Topic.class); messageConsumer = mock(MessageConsumer.class); message = mock(TextMessage.class); when(message.getPropertyNames()).thenReturn(new Enumeration<Object>() { @Override public boolean hasMoreElements() { return false; } @Override public Object nextElement() { throw new UnsupportedOperationException(); } }); when(message.getText()).thenReturn(TEXT); when(connectionFactory.createConnection(USERNAME, PASSWORD)).thenReturn(connection); when(connection.createSession(true, Session.SESSION_TRANSACTED)).thenReturn(session); when(session.createQueue(destinationName)).thenReturn(queue); when(session.createConsumer(any(Destination.class), anyString())).thenReturn(messageConsumer); when(messageConsumer.receiveNoWait()).thenReturn(message); when(messageConsumer.receive(anyLong())).thenReturn(message); destinationName = DESTINATION_NAME; destinationType = JMSDestinationType.QUEUE; destinationLocator = JMSDestinationLocator.CDI; messageSelector = SELECTOR; batchSize = 10; pollTimeout = 500L; context = new Context(); converter = new DefaultJMSMessageConverter.Builder().build(context); event = converter.convert(message).iterator().next(); userName = Optional.of(USERNAME); password = Optional.of(PASSWORD); afterSetup(); }
@Test(timeout = 60000) public void testLingeringPooledSessionsHoldingPrefetchedMessages() throws Exception { produceMessages(); Session pooledSession1 = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE); pooledSession1.createConsumer(queue); final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName()); assertTrue("Should have all sent messages in flight:", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { return view.getInFlightCount() == MESSAGE_COUNT; } }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(25))); // While all the message are in flight we should get anything on this consumer. Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(queue); assertNull(consumer.receive(500)); pooledConn1.close(); assertTrue("Should have only one consumer now:", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { return view.getSubscriptions().length == 1; } }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(25))); // Now we'd expect that the message stuck in the prefetch of the pooled session's // consumer would be rerouted to the non-pooled session's consumer. assertNotNull(consumer.receive(10000)); }
@Test(timeout = 60000) public void testNonPooledConnectionCloseNotHoldingPrefetchedMessages() throws Exception { produceMessages(); Session directSession = directConn2.createSession(false, Session.AUTO_ACKNOWLEDGE); directSession.createConsumer(queue); final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName()); assertTrue("Should have all sent messages in flight:", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { return view.getInFlightCount() == MESSAGE_COUNT; } }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(25))); // While all the message are in flight we should get anything on this consumer. Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(queue); assertNull(consumer.receive(500)); directConn2.close(); assertTrue("Should have only one consumer now:", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { return view.getSubscriptions().length == 1; } }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(25))); // Now we'd expect that the message stuck in the prefetch of the first session's // consumer would be rerouted to the alternate session's consumer. assertNotNull(consumer.receive(10000)); }
/** * This test simply validates that {@link ConnectionFactory} can be setup by pointing to the location of the client * libraries at runtime. It uses ActiveMQ which is not present at the POM but instead pulled from Maven repo using * {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which implies that for this test to run the computer must * be connected to the Internet. If computer is not connected to the Internet, this test will quietly fail logging a * message. */ @Test public void validateFactoryCreationWithActiveMQLibraries() throws Exception { try { String libPath = TestUtils.setupActiveMqLibForTesting(true); TestRunner runner = TestRunners.newTestRunner(mock(Processor.class)); JNDIConnectionFactoryProvider cfProvider = new JNDIConnectionFactoryProvider(); runner.addControllerService("cfProvider", cfProvider); runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.BROKER_URI, "vm://localhost?broker.persistent=false"); runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.JNDI_CF_LOOKUP, "ConnectionFactory"); runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, libPath); runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); runner.enableControllerService(cfProvider); runner.assertValid(cfProvider); Connection connection = cfProvider.getConnectionFactory().createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue("myqueue"); MessageProducer producer = session.createProducer(queue); MessageConsumer consumer = session.createConsumer(queue); TextMessage message = session.createTextMessage("Hello"); producer.send(message); assertEquals("Hello", ((TextMessage) consumer.receive()).getText()); connection.stop(); connection.close(); } catch (Exception e) { logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e); } }
@Override public MessageConsumer createConsumer( final Destination destination, final String messageSelector ) throws JMSException { return notImplemented(); }
@Override public MessageConsumer createConsumer( final Destination destination, final String messageSelector, final boolean noLocal ) throws JMSException { return notImplemented(); }
@Override public MessageConsumer createSharedConsumer( final Topic topic, final String sharedSubscriptionName, final String messageSelector ) throws JMSException { return notImplemented(); }
@Override public MessageConsumer createDurableConsumer( final Topic topic, final String name, final String messageSelector, final boolean noLocal ) throws JMSException { return notImplemented(); }
public static void main(String[] args) { try { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://Toshiba:61616"); // Create a Connection Connection connection = connectionFactory.createConnection(); connection.start(); // Create a Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Destination destination = session.createQueue("HELLOWORLD.TESTQ"); // Create a MessageConsumer from the Session to the Topic or Queue MessageConsumer consumer = session.createConsumer(destination); // Wait for a message Message message = consumer.receive(1000); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println("Received: " + message); } consumer.close(); session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } }
public void run() { try { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://Toshiba:61616"); // Create a Connection Connection connection = connectionFactory.createConnection(); connection.start(); connection.setExceptionListener(this); // Create a Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Destination destination = session.createQueue("HELLOWORLD.TESTQ"); // Create a MessageConsumer from the Session to the Topic or Queue MessageConsumer consumer = session.createConsumer(destination); // Wait for a message Message message = consumer.receive(1000); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println("Received: " + message); } consumer.close(); session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } }
@Bean public MessageConsumer sqsMessageConsumer(final SQSConnection connection) throws JMSException { /* * Create the session and use UNORDERED_ACKNOWLEDGE mode. Acknowledging * messages deletes them from the queue. Each message must be individually * acknowledged */ Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE); return session.createConsumer(session.createQueue(queueName)); }
@Bean public MessageConsumer sqsMessageConsumer(SQSConnection connection) throws JMSException { /* * Create the session and use CLIENT_ACKNOWLEDGE mode. Acknowledging * messages deletes them from the queue */ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); return session.createConsumer(session.createQueue(queueName)); }
@Parameters({ "broker-ssl-port"}) @Test public void testConsumerProducerWithSsl(String port) throws Exception { String queueName = "testConsumerProducerWithAutoAck"; InitialContext initialContextForQueue = ClientHelper .getInitialContextBuilder("admin", "admin", "localhost", port) .enableSsl() .withQueue(queueName) .build(); ConnectionFactory connectionFactory = (ConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY); Connection connection = connectionFactory.createConnection(); connection.start(); // publish 100 messages Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = producerSession.createQueue(queueName); MessageProducer producer = producerSession.createProducer(queue); int numberOfMessages = 100; for (int i = 0; i < numberOfMessages; i++) { producer.send(producerSession.createTextMessage("Test message " + i)); } producerSession.close(); // Consume published messages Session subscriberSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination subscriberDestination = (Destination) initialContextForQueue.lookup(queueName); MessageConsumer consumer = subscriberSession.createConsumer(subscriberDestination); for (int i = 0; i < numberOfMessages; i++) { Message message = consumer.receive(1000); Assert.assertNotNull(message, "Message #" + i + " was not received"); } connection.close(); }
@Test public void sendAndReceive() throws Exception { Destination destination = session.createQueue("TEST.FOO"); MessageProducer messageProducer = session.createProducer(destination); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // Instrument MessageProducer with TracingMessageProducer TracingMessageProducer producer = new TracingMessageProducer(messageProducer, mockTracer); MessageConsumer messageConsumer = session.createConsumer(destination); // Instrument MessageConsumer with TracingMessageConsumer TracingMessageConsumer consumer = new TracingMessageConsumer(messageConsumer, mockTracer); TextMessage message = session.createTextMessage("Hello world"); producer.send(message); TextMessage received = (TextMessage) consumer.receive(5000); assertEquals("Hello world", received.getText()); List<MockSpan> mockSpans = mockTracer.finishedSpans(); assertEquals(2, mockSpans.size()); checkSpans(mockSpans); assertNull(mockTracer.activeSpan()); }
@Override protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer) throws JMSException { //消费MQ限速 if (!rateLimiter.tryAcquire()) { return false; } return super.receiveAndExecute(invoker, session, consumer); }
private void doCleanupQueue( final Session session, final Destination destination ) throws JMSException { try { MessageConsumer consumer = session.createConsumer(destination); Message message = null; do { message = consumer.receiveNoWait(); if (message != null) { message.acknowledge(); } } while (message != null); } finally { releaseSession(false); } }
@Override protected MessageConsumer createConsumer(Session session, Destination destination, String messageSelector) throws JMSException { return new TracingMessageConsumer(super.createConsumer(session, destination, messageSelector), tracer); }
@Override public MessageConsumer createSubscriber() throws JMSException { TopicSubscriber recv = ((TopicSession) session).createSubscriber((Topic) topic, messageSelector, true); log.debug("Created non-durable subscriber"); return recv; }
@Override public void requeueFailedMessages() { try { ActiveMQConnection connection = ActiveMqUtils.openConnection(user, password, url); Session session = ActiveMqUtils.startSession(connection); int count = getQueueSize(session, queueError); if (count < 1) { return; } log.info("Requeuing {} failed messages...", count); Queue queueErr = session.createQueue(queueError); MessageConsumer consumer = session.createConsumer(queueErr); Queue queueRetry = session.createQueue(queueInput); MessageProducer producer = session.createProducer(queueRetry); for (int consumed = 0; consumed < count; consumed++) { TextMessage message = (TextMessage) consumer.receive(REQUEUE_TIMEOUT); if (message == null) { continue; } String text = message.getText(); String requestId = message.getJMSCorrelationID(); log.info("Requeuing message '{}'", text); try { TextMessage newMessage = session.createTextMessage(text); newMessage.setJMSCorrelationID(requestId); producer.send(newMessage); } catch (Exception e) { log.error("Failed to requeue message", e); } message.acknowledge(); session.commit(); } producer.close(); consumer.close(); } catch (JMSException ex) { throw new MessageQueueException("Failed to requeue failed messages", ex); } }
@Override public MessageConsumer createConsumer(Destination destination) throws JMSException { return addConsumer(getInternalSession().createConsumer(destination)); }
@Override public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { return addConsumer(getInternalSession().createConsumer(destination, selector)); }
@Override public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal)); }
@Override public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) throws JMSException { PooledSessionHolder state = safeGetSessionHolder(); state.getConnection().checkClientJMSVersionSupport(2, 0); return addConsumer(state.getSession().createSharedConsumer(topic, sharedSubscriptionName, messageSelector)); }
@Override public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException { PooledSessionHolder state = safeGetSessionHolder(); state.getConnection().checkClientJMSVersionSupport(2, 0); return addConsumer(state.getSession().createDurableConsumer(topic, name)); }