Java 类javax.jms.MessageConsumer 实例源码
项目:tangyuan2
文件:ActiveMqReceiver.java
/**
 * Creates the synchronous receive thread, stores it in {@code syncReceiveThread} and starts it.
 *
 * <p>While {@code running} is true the thread repeatedly calls
 * {@link MessageConsumer#receive(long)} and hands the result to {@code processMessage}.
 * Any {@link Throwable} raised in an iteration is logged and the loop continues.
 * Once the loop ends, {@code closed} is set to {@code true}.
 *
 * @param messageConsumer the consumer to poll
 * @param receiveTimeout  timeout, in milliseconds, passed to each {@code receive} call
 */
private void startSyncReceiveThread(final MessageConsumer messageConsumer, final long receiveTimeout) {
    syncReceiveThread = new SyncReceiveThread() {
        @Override
        public void run() {
            log.info("start listen to the " + typeStr + "[" + queue.getName() + "].");
            while (running) {
                try {
                    processMessage(messageConsumer.receive(receiveTimeout));
                } catch (Throwable failure) {
                    // During shutdown an InterruptedException may be reported here.
                    log.error("listen to the [" + queue.getName() + "] error.", failure);
                }
            }
            closed = true;
        }
    };
    syncReceiveThread.start();
}
项目:xtf
文件:JmsClient.java
/**
 * Opens a new topic connection and session and creates a consumer on {@code destinationName}.
 *
 * <p>A fresh id of the form {@code consumer-<uuid>} is passed to {@code startConnection} and,
 * for durable subscriptions, is also used as the subscription name. When a selector is given
 * for a durable subscription, the subscriber is created with {@code noLocal = true}.
 *
 * @param selector JMS message selector, or {@code null} to receive every message
 * @return a durable subscriber when {@code isDurable} is set, otherwise a plain consumer
 * @throws IllegalArgumentException if this client is configured for a queue
 * @throws JMSException if the connection, session, topic or consumer cannot be created
 */
public MessageConsumer createTopicConsumer(String selector) throws JMSException {
    if (isQueue) {
        throw new IllegalArgumentException("Only for topic, not queue");
    }
    final String subscriberId = "consumer-" + UUID.randomUUID();
    topicConnection = startConnection(subscriberId);
    final Session topicSession = topicConnection.createSession(isTransacted, Session.AUTO_ACKNOWLEDGE);
    final Topic target = topicSession.createTopic(destinationName);
    final boolean filtered = selector != null;

    if (isDurable) {
        return filtered
                ? topicSession.createDurableSubscriber(target, subscriberId, selector, true)
                : topicSession.createDurableSubscriber(target, subscriberId);
    }
    return filtered
            ? topicSession.createConsumer(target, selector)
            : topicSession.createConsumer(target);
}
项目:jaffa-framework
文件:JmsBrowser.java
/** Deletes the input Message from the given Queue, by creating a temporary consumer for that Message.
 * The temporary consumer is always closed, including when the message is not found or the receive fails.
 * @param session the JMS Session.
 * @param message the JMS Message.
 * @param queueName the Queue to consume from.
 * @throws FrameworkException in case any internal error occurs.
 * @throws ApplicationExceptions Indicates application error(s); raised with MESSAGE_NOT_FOUND when the
 * message could not be received within 10 seconds.
 */
static void consumeMessage(Session session, Message message, String queueName) throws FrameworkException, ApplicationExceptions {
    try {
        // Select exactly the given message: <HEADER_JMS_MESSAGE_ID>='<message id>'
        String selector = new StringBuilder(HEADER_JMS_MESSAGE_ID)
                .append("='")
                .append(message.getJMSMessageID())
                .append('\'')
                .toString();
        MessageConsumer consumer = session.createConsumer(JmsClientHelper.obtainQueue(queueName), selector);
        try {
            // Consume the message. Wait for 10 seconds at most
            Message m = consumer.receive(10000);
            if (m == null) {
                throw new ApplicationExceptions(new JaffaMessagingApplicationException(JaffaMessagingApplicationException.MESSAGE_NOT_FOUND));
            }
        } finally {
            // Previously the consumer stayed open on the not-found and error paths.
            consumer.close();
        }
    } catch (JMSException e) {
        log.error("Error in consuming a JMS Message", e);
        throw new JaffaMessagingFrameworkException(JaffaMessagingFrameworkException.DELETE_ERROR, null, e);
    }
}
项目:pooled-jms
文件:JmsPoolMessageConusmerTest.java
/**
 * Checks that an untimed receive on an open consumer yields no message and that the same
 * call on a closed consumer is rejected with an IllegalStateException.
 */
@Test
public void testReceive() throws JMSException {
    final JmsPoolConnection pooled = (JmsPoolConnection) cf.createQueueConnection();
    final Session jmsSession = pooled.createSession();
    final Queue tempQueue = jmsSession.createTemporaryQueue();
    final MessageConsumer subject = jmsSession.createConsumer(tempQueue, "Color = Red");

    // No message is expected while the consumer is still open.
    assertNull(subject.receive());

    subject.close();

    try {
        subject.receive();
        fail("Should not be able to interact with closed consumer");
    } catch (IllegalStateException ise) {}
}
项目:pooled-jms
文件:JmsPoolMessageConusmerTest.java
/**
 * Checks that a receive with a 1 ms timeout on an open consumer yields no message and that
 * the same call on a closed consumer is rejected with an IllegalStateException.
 */
@Test
public void testReceiveTimed() throws JMSException {
    final JmsPoolConnection pooled = (JmsPoolConnection) cf.createQueueConnection();
    final Session jmsSession = pooled.createSession();
    final Queue tempQueue = jmsSession.createTemporaryQueue();
    final MessageConsumer subject = jmsSession.createConsumer(tempQueue, "Color = Red");

    // No message is expected while the consumer is still open.
    assertNull(subject.receive(1));

    subject.close();

    try {
        subject.receive(1);
        fail("Should not be able to interact with closed consumer");
    } catch (IllegalStateException ise) {}
}
项目:pooled-jms
文件:PooledConnectionTempQueueTest.java
/**
 * Sends a "Request" text message to the given service queue with a temporary queue as its
 * reply-to destination, then blocks until a reply arrives on that temporary queue.
 *
 * @param cf           factory used to open the connection
 * @param serviceQueue name of the queue the request is sent to
 * @throws JMSException if any JMS operation fails
 * @throws InterruptedException declared by the signature; nothing in this body throws it directly
 */
private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, InterruptedException {
    final Connection conn = cf.createConnection();
    conn.start();
    final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Build the request and point its reply-to at a fresh temporary queue.
    final TemporaryQueue replyQueue = jmsSession.createTemporaryQueue();
    final TextMessage request = jmsSession.createTextMessage("Request");
    request.setJMSReplyTo(replyQueue);

    final MessageProducer sender = jmsSession.createProducer(jmsSession.createQueue(serviceQueue));
    sender.send(request);

    // Wait (without timeout) for the reply on the temporary queue.
    final MessageConsumer replyConsumer = jmsSession.createConsumer(replyQueue);
    final Message reply = replyConsumer.receive();
    assertNotNull(reply);
    LOG.debug("Reply message: {}", reply);

    replyConsumer.close();
    sender.close();
    jmsSession.close();
    conn.close();
}
项目:pooled-jms
文件:PooledConnectionTempQueueTest.java
/**
 * Receives one message from the named queue and answers it on the message's reply-to
 * destination with a "Result" text message whose correlation id is the request's message id.
 *
 * @param connectionFactory factory used to open the connection
 * @param queueName         name of the queue to receive the request from
 * @throws JMSException if any JMS operation fails
 */
public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory, String queueName) throws JMSException {
    // NOTE(review): connection.start() is never called here; presumably the supplied
    // factory hands out already-started connections -- confirm.
    final Connection conn = connectionFactory.createConnection();
    final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final MessageConsumer requestConsumer = jmsSession.createConsumer(jmsSession.createQueue(queueName));

    // Blocks until a request is available.
    final javax.jms.Message request = requestConsumer.receive();
    final String requestMessageId = request.getJMSMessageID();
    LOG.debug("Received message " + requestMessageId);

    final TextMessage reply = jmsSession.createTextMessage("Result");
    reply.setJMSCorrelationID(request.getJMSMessageID());

    final MessageProducer replySender = jmsSession.createProducer(request.getJMSReplyTo());
    LOG.debug("Sending reply to " + request.getJMSReplyTo());
    replySender.send(reply);

    replySender.close();
    requestConsumer.close();
    jmsSession.close();
    conn.close();
}
项目:solace-integration-guides
文件:JMSConnectionFactoryProviderTest.java
/**
 * Validates that a {@link ConnectionFactory} can be set up by pointing the provider at the
 * location of the client libraries at runtime. The ActiveMQ libraries are not declared in the
 * POM; they are obtained through {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which
 * per the original note pulls them from the Maven repository and therefore needs Internet
 * access. Any {@link Exception} raised along the way is only logged, so in that case the test
 * ends quietly instead of failing.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        final String clientLibDir = TestUtils.setupActiveMqLibForTesting(true);
        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();

        // Register and configure the controller service under test.
        testRunner.addControllerService("cfProvider", provider);
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLibDir);
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.ActiveMQConnectionFactory");
        testRunner.enableControllerService(provider);
        testRunner.assertValid(provider);

        // Round-trip one text message through the factory the provider built.
        final Connection jmsConnection = provider.getConnectionFactory().createConnection();
        jmsConnection.start();
        final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Destination target = jmsSession.createQueue("myqueue");
        final MessageProducer sender = jmsSession.createProducer(target);
        final MessageConsumer receiver = jmsSession.createConsumer(target);
        final TextMessage outgoing = jmsSession.createTextMessage("Hello");
        sender.send(outgoing);

        final TextMessage incoming = (TextMessage) receiver.receive();
        assertEquals("Hello", incoming.getText());

        jmsConnection.stop();
        jmsConnection.close();
    } catch (Exception e) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e);
    }
}
项目:axon-jms
文件:JmsMessageSource.java
/**
 * Creates a message source that stores the given converter and registers itself as the
 * message listener of the given consumer.
 *
 * @param consumer  the JMS consumer this source listens on
 * @param converter the converter kept by this source
 * @throws RuntimeException wrapping the {@link JMSException} if the listener cannot be registered
 */
public JmsMessageSource(MessageConsumer consumer, JmsMessageConverter converter) {
    this.converter = converter;
    try {
        consumer.setMessageListener(this);
    } catch (JMSException registrationFailure) {
        throw new RuntimeException(registrationFailure);
    }
}
项目:pooled-jms
文件:MockJMSQueueSession.java
/**
 * Creates a consumer with a selector, refusing {@link Topic} destinations because this is a
 * queue session.
 *
 * @throws IllegalStateException if {@code destination} is a {@link Topic}
 * @throws JMSException if the superclass fails to create the consumer
 */
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
    final boolean topicRequested = destination instanceof Topic;
    if (topicRequested) {
        throw new IllegalStateException("Operation not supported by a QueueSession");
    }

    return super.createConsumer(destination, messageSelector);
}
项目:lemon
文件:ProxyConnectionFactory.java
/**
 * Builds a {@link ProxyMessageConsumer} for the given session and destination. When the
 * destination is a {@link Topic}, the consumer's id is also registered with the message
 * handler under the destination's {@code toString()} value.
 *
 * @param destination the destination the consumer is bound to
 * @param session     the proxy session that owns the consumer
 * @return the newly created consumer
 * @throws JMSException declared by the signature
 */
public MessageConsumer createConsumer(Destination destination,
        ProxySession session) throws JMSException {
    final String topicKey = destination.toString();

    final ProxyMessageConsumer proxyConsumer = new ProxyMessageConsumer(session);
    proxyConsumer.setDestination(destination);

    final boolean isTopic = destination instanceof Topic;
    if (isTopic) {
        this.messageHandler.registerToTopic(topicKey, proxyConsumer.getId());
    }

    return proxyConsumer;
}
项目:pooled-jms
文件:MockJMSTopicSession.java
/**
 * Creates a consumer with a selector and noLocal flag, refusing {@link Queue} destinations
 * because this is a topic session.
 *
 * @throws IllegalStateException if {@code destination} is a {@link Queue}
 * @throws JMSException if the superclass fails to create the consumer
 * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String, boolean)
 */
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
    final boolean queueRequested = destination instanceof Queue;
    if (queueRequested) {
        throw new IllegalStateException("Operation not supported by a TopicSession");
    }

    return super.createConsumer(destination, messageSelector, noLocal);
}
项目:pooled-jms
文件:JmsPoolMessageConusmerTest.java
/** Checks that a consumer created on a temporary queue has a non-null string form. */
@Test
public void testToString() throws JMSException {
    final JmsPoolConnection pooled = (JmsPoolConnection) cf.createQueueConnection();
    final Session jmsSession = pooled.createSession();
    final Queue tempQueue = jmsSession.createTemporaryQueue();
    final MessageConsumer subject = jmsSession.createConsumer(tempQueue);

    final String description = subject.toString();
    assertNotNull(description);
}
项目:pooled-jms
文件:JmsPoolMessageConusmerTest.java
/** Checks that closing the same consumer twice completes without an exception. */
@Test
public void testCloseMoreThanOnce() throws JMSException {
    final JmsPoolConnection pooled = (JmsPoolConnection) cf.createQueueConnection();
    final Session jmsSession = pooled.createSession();
    final Queue tempQueue = jmsSession.createTemporaryQueue();
    final MessageConsumer subject = jmsSession.createConsumer(tempQueue);

    // The second close is the one under test; it must not throw.
    subject.close();
    subject.close();
}
项目:flume-release-1.7.0
文件:JMSMessageConsumerTestBase.java
/**
 * Builds the Mockito mocks and the default configuration values used by the JMS consumer
 * tests. {@code beforeSetup()} runs first and {@code afterSetup()} runs last.
 *
 * @throws Exception propagated from the hooks, the stubbed JMS calls or the message conversion
 */
@Before
public void setup() throws Exception {
beforeSetup();
// Mock every JMS collaborator.
connectionFactory = mock(ConnectionFactory.class);
connection = mock(Connection.class);
session = mock(Session.class);
queue = mock(Queue.class);
topic = mock(Topic.class);
messageConsumer = mock(MessageConsumer.class);
message = mock(TextMessage.class);
// The mocked message exposes no properties: an always-empty enumeration.
when(message.getPropertyNames()).thenReturn(new Enumeration<Object>() {
@Override
public boolean hasMoreElements() {
return false;
}
@Override
public Object nextElement() {
throw new UnsupportedOperationException();
}
});
when(message.getText()).thenReturn(TEXT);
// Wire factory -> connection -> session -> consumer.
when(connectionFactory.createConnection(USERNAME, PASSWORD)).thenReturn(connection);
when(connection.createSession(true, Session.SESSION_TRANSACTED)).thenReturn(session);
// NOTE(review): destinationName is only assigned DESTINATION_NAME further down, so this stub
// is registered with whatever value the field holds at this point -- confirm this is intended.
when(session.createQueue(destinationName)).thenReturn(queue);
when(session.createConsumer(any(Destination.class), anyString())).thenReturn(messageConsumer);
// Both receive variants hand back the mocked text message.
when(messageConsumer.receiveNoWait()).thenReturn(message);
when(messageConsumer.receive(anyLong())).thenReturn(message);
// Default configuration values for the tests.
destinationName = DESTINATION_NAME;
destinationType = JMSDestinationType.QUEUE;
destinationLocator = JMSDestinationLocator.CDI;
messageSelector = SELECTOR;
batchSize = 10;
pollTimeout = 500L;
context = new Context();
converter = new DefaultJMSMessageConverter.Builder().build(context);
// Event obtained by converting the mocked message with the default converter.
event = converter.convert(message).iterator().next();
userName = Optional.of(USERNAME);
password = Optional.of(PASSWORD);
afterSetup();
}
项目:pooled-jms
文件:PooledConnectionSessionCleanupTest.java
/**
 * A consumer on a pooled session is created but never read, so it is expected to hold all
 * produced messages in flight. A second consumer on a direct connection must see nothing
 * until the pooled connection is closed, after which it is expected to receive a message.
 */
@Test(timeout = 60000)
public void testLingeringPooledSessionsHoldingPrefetchedMessages() throws Exception {
    final long maxWaitMillis = TimeUnit.SECONDS.toMillis(20);
    final long pollMillis = TimeUnit.MILLISECONDS.toMillis(25);

    produceMessages();

    // Consumer on the pooled session; its result is deliberately discarded.
    final Session lingeringSession = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
    lingeringSession.createConsumer(queue);

    final QueueViewMBean queueView = getProxyToQueue(queue.getPhysicalName());

    final Wait.Condition allInFlight = new Wait.Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return queueView.getInFlightCount() == MESSAGE_COUNT;
        }
    };
    assertTrue("Should have all sent messages in flight:", Wait.waitFor(allInFlight, maxWaitMillis, pollMillis));

    // While all the messages are in flight we should not get anything on this consumer.
    final Session directSession = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final MessageConsumer directConsumer = directSession.createConsumer(queue);
    assertNull(directConsumer.receive(500));

    pooledConn1.close();

    final Wait.Condition singleSubscription = new Wait.Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return queueView.getSubscriptions().length == 1;
        }
    };
    assertTrue("Should have only one consumer now:", Wait.waitFor(singleSubscription, maxWaitMillis, pollMillis));

    // Now we'd expect that the messages stuck in the prefetch of the pooled session's
    // consumer are rerouted to the non-pooled session's consumer.
    assertNotNull(directConsumer.receive(10000));
}
项目:pooled-jms
文件:PooledConnectionSessionCleanupTest.java
/**
 * A consumer on a first direct connection is created but never read, so it is expected to
 * hold all produced messages in flight. A consumer on a second direct connection must see
 * nothing until the first connection is closed, after which it is expected to receive a message.
 */
@Test(timeout = 60000)
public void testNonPooledConnectionCloseNotHoldingPrefetchedMessages() throws Exception {
    final long maxWaitMillis = TimeUnit.SECONDS.toMillis(20);
    final long pollMillis = TimeUnit.MILLISECONDS.toMillis(25);

    produceMessages();

    // Consumer on the first direct connection; its result is deliberately discarded.
    final Session holdingSession = directConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
    holdingSession.createConsumer(queue);

    final QueueViewMBean queueView = getProxyToQueue(queue.getPhysicalName());

    final Wait.Condition allInFlight = new Wait.Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return queueView.getInFlightCount() == MESSAGE_COUNT;
        }
    };
    assertTrue("Should have all sent messages in flight:", Wait.waitFor(allInFlight, maxWaitMillis, pollMillis));

    // While all the messages are in flight we should not get anything on this consumer.
    final Session alternateSession = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final MessageConsumer alternateConsumer = alternateSession.createConsumer(queue);
    assertNull(alternateConsumer.receive(500));

    directConn2.close();

    final Wait.Condition singleSubscription = new Wait.Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return queueView.getSubscriptions().length == 1;
        }
    };
    assertTrue("Should have only one consumer now:", Wait.waitFor(singleSubscription, maxWaitMillis, pollMillis));

    // Now we'd expect that the messages stuck in the prefetch of the first session's
    // consumer are rerouted to the alternate session's consumer.
    assertNotNull(alternateConsumer.receive(10000));
}
项目:nifi-jms-jndi
文件:JMSConnectionFactoryProviderTest.java
/**
 * Validates that a {@link ConnectionFactory} can be set up by pointing the provider at the
 * location of the client libraries at runtime. The ActiveMQ libraries are not declared in the
 * POM; they are obtained through {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which
 * per the original note pulls them from the Maven repository and therefore needs Internet
 * access. Any {@link Exception} raised along the way is only logged, so in that case the test
 * ends quietly instead of failing.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        final String clientLibDir = TestUtils.setupActiveMqLibForTesting(true);
        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();

        // Register and configure the controller service under test.
        testRunner.addControllerService("cfProvider", provider);
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLibDir);
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.ActiveMQConnectionFactory");
        testRunner.enableControllerService(provider);
        testRunner.assertValid(provider);

        // Round-trip one text message through the factory the provider built.
        final Connection jmsConnection = provider.getConnectionFactory().createConnection();
        jmsConnection.start();
        final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Destination target = jmsSession.createQueue("myqueue");
        final MessageProducer sender = jmsSession.createProducer(target);
        final MessageConsumer receiver = jmsSession.createConsumer(target);
        final TextMessage outgoing = jmsSession.createTextMessage("Hello");
        sender.send(outgoing);

        final TextMessage incoming = (TextMessage) receiver.receive();
        assertEquals("Hello", incoming.getText());

        jmsConnection.stop();
        jmsConnection.close();
    } catch (Exception e) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e);
    }
}
项目:nifi-jms-jndi
文件:JNDIConnectionFactoryProviderTest.java
/**
 * Validates that a {@link ConnectionFactory} can be obtained through the JNDI-based provider
 * when it is pointed at the location of the client libraries at runtime. The ActiveMQ
 * libraries are not declared in the POM; they are obtained through
 * {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which per the original note pulls
 * them from the Maven repository and therefore needs Internet access. Any {@link Exception}
 * raised along the way is only logged, so in that case the test ends quietly instead of failing.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        final String clientLibDir = TestUtils.setupActiveMqLibForTesting(true);
        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
        final JNDIConnectionFactoryProvider provider = new JNDIConnectionFactoryProvider();

        // Register and configure the controller service under test.
        testRunner.addControllerService("cfProvider", provider);
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.JNDI_CF_LOOKUP, "ConnectionFactory");
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLibDir);
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        testRunner.enableControllerService(provider);
        testRunner.assertValid(provider);

        // Round-trip one text message through the factory the provider looked up.
        final Connection jmsConnection = provider.getConnectionFactory().createConnection();
        jmsConnection.start();
        final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Destination target = jmsSession.createQueue("myqueue");
        final MessageProducer sender = jmsSession.createProducer(target);
        final MessageConsumer receiver = jmsSession.createConsumer(target);
        final TextMessage outgoing = jmsSession.createTextMessage("Hello");
        sender.send(outgoing);

        final TextMessage incoming = (TextMessage) receiver.receive();
        assertEquals("Hello", incoming.getText());

        jmsConnection.stop();
        jmsConnection.close();
    } catch (Exception e) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e);
    }
}
项目:testee.fi
文件:TestEEfiSession.java
/**
 * Not supported by this session: the call is handed straight to {@code notImplemented()}
 * (presumably that helper throws -- confirm against its definition).
 */
@Override
public MessageConsumer createConsumer(final Destination destination, final String messageSelector)
        throws JMSException {
    return notImplemented();
}
项目:testee.fi
文件:TestEEfiSession.java
/**
 * Not supported by this session: the call is handed straight to {@code notImplemented()}
 * (presumably that helper throws -- confirm against its definition).
 */
@Override
public MessageConsumer createConsumer(final Destination destination, final String messageSelector,
        final boolean noLocal) throws JMSException {
    return notImplemented();
}
项目:testee.fi
文件:TestEEfiSession.java
/**
 * Not supported by this session: the call is handed straight to {@code notImplemented()}
 * (presumably that helper throws -- confirm against its definition).
 */
@Override
public MessageConsumer createSharedConsumer(final Topic topic, final String sharedSubscriptionName,
        final String messageSelector) throws JMSException {
    return notImplemented();
}
项目:testee.fi
文件:TestEEfiSession.java
/**
 * Not supported by this session: the call is handed straight to {@code notImplemented()}
 * (presumably that helper throws -- confirm against its definition).
 */
@Override
public MessageConsumer createDurableConsumer(final Topic topic, final String name,
        final String messageSelector, final boolean noLocal) throws JMSException {
    return notImplemented();
}
项目:SpringTutorial
文件:HelloWorldConsumer.java
/**
 * Connects to the ActiveMQ broker at {@code tcp://Toshiba:61616}, waits up to one second for
 * a single message on the {@code HELLOWORLD.TESTQ} queue and prints it to standard output.
 * The connection is closed even when receiving or printing fails; any exception is printed
 * as a stack trace.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    try {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://Toshiba:61616");
        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue("HELLOWORLD.TESTQ");
            // Create a MessageConsumer from the Session to the Topic or Queue
            MessageConsumer consumer = session.createConsumer(destination);
            // Wait for a message (receive returns null if none arrives within the timeout)
            Message message = consumer.receive(1000);
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("Received: " + text);
            } else {
                System.out.println("Received: " + message);
            }
            consumer.close();
            session.close();
        } finally {
            // Previously the connection stayed open whenever an exception was thrown above.
            // Closing the connection also closes any session/consumer still open on it.
            connection.close();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
项目:SpringTutorial
文件:HelloWorldConsumerThread.java
/**
 * Connects to the ActiveMQ broker at {@code tcp://Toshiba:61616}, registers this object as
 * the connection's exception listener, waits up to one second for a single message on the
 * {@code HELLOWORLD.TESTQ} queue and prints it to standard output. The connection is closed
 * even when receiving or printing fails; any exception is reported on standard output.
 */
public void run() {
    try {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://Toshiba:61616");
        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            connection.setExceptionListener(this);
            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue("HELLOWORLD.TESTQ");
            // Create a MessageConsumer from the Session to the Topic or Queue
            MessageConsumer consumer = session.createConsumer(destination);
            // Wait for a message (receive returns null if none arrives within the timeout)
            Message message = consumer.receive(1000);
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("Received: " + text);
            } else {
                System.out.println("Received: " + message);
            }
            consumer.close();
            session.close();
        } finally {
            // Previously the connection stayed open whenever an exception was thrown above.
            // Closing the connection also closes any session/consumer still open on it.
            connection.close();
        }
    } catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    }
}
项目:aem-orchestrator
文件:AwsConfig.java
/**
 * Exposes a consumer on the queue named by {@code queueName} as a bean.
 *
 * @param connection the SQS connection the session is created on
 * @return a consumer bound to the configured queue
 * @throws JMSException if the session, queue or consumer cannot be created
 */
@Bean
public MessageConsumer sqsMessageConsumer(final SQSConnection connection) throws JMSException {
    // Non-transacted session in UNORDERED_ACKNOWLEDGE mode: acknowledging a message deletes
    // it from the queue, and each message must be acknowledged individually.
    final Session sqsSession = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);

    final MessageConsumer queueConsumer = sqsSession.createConsumer(sqsSession.createQueue(queueName));
    return queueConsumer;
}
项目:aem-stack-manager
文件:AwsConfig.java
/**
 * Exposes a consumer on the queue named by {@code queueName} as a bean.
 *
 * @param connection the SQS connection the session is created on
 * @return a consumer bound to the configured queue
 * @throws JMSException if the session, queue or consumer cannot be created
 */
@Bean
public MessageConsumer sqsMessageConsumer(SQSConnection connection) throws JMSException {
    // Non-transacted session in CLIENT_ACKNOWLEDGE mode: acknowledging messages deletes
    // them from the queue.
    final Session sqsSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    final MessageConsumer queueConsumer = sqsSession.createConsumer(sqsSession.createQueue(queueName));
    return queueConsumer;
}
项目:message-broker
文件:SslTransportTest.java
/**
 * Publishes 100 text messages to a queue over an SSL-enabled connection and then checks
 * that each of them can be received, allowing one second per message.
 *
 * @param port the broker's SSL port, injected from the "broker-ssl-port" parameter
 */
@Parameters({ "broker-ssl-port"})
@Test
public void testConsumerProducerWithSsl(String port) throws Exception {
    final String queueName = "testConsumerProducerWithAutoAck";
    final int numberOfMessages = 100;

    final InitialContext jndiContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .enableSsl()
            .withQueue(queueName)
            .build();

    final ConnectionFactory factory = (ConnectionFactory) jndiContext.lookup(ClientHelper.CONNECTION_FACTORY);
    final Connection connection = factory.createConnection();
    connection.start();

    // Publish the messages on a dedicated session.
    final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Queue queue = producerSession.createQueue(queueName);
    final MessageProducer producer = producerSession.createProducer(queue);
    int sent = 0;
    while (sent < numberOfMessages) {
        producer.send(producerSession.createTextMessage("Test message " + sent));
        sent++;
    }
    producerSession.close();

    // Consume the published messages on a second session.
    final Session subscriberSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Destination subscriberDestination = (Destination) jndiContext.lookup(queueName);
    final MessageConsumer consumer = subscriberSession.createConsumer(subscriberDestination);
    int received = 0;
    while (received < numberOfMessages) {
        final Message message = consumer.receive(1000);
        Assert.assertNotNull(message, "Message #" + received + " was not received");
        received++;
    }

    connection.close();
}
项目:java-jms
文件:TracingActiveMQTest.java
/**
 * Sends one text message through a tracing producer, receives it through a tracing consumer
 * and checks that exactly two spans were finished and no span is left active.
 */
@Test
public void sendAndReceive() throws Exception {
    final Destination testQueue = session.createQueue("TEST.FOO");

    // Wrap a non-persistent producer with the tracing decorator.
    final MessageProducer rawProducer = session.createProducer(testQueue);
    rawProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    final TracingMessageProducer tracedProducer = new TracingMessageProducer(rawProducer, mockTracer);

    // Wrap the consumer with the tracing decorator.
    final MessageConsumer rawConsumer = session.createConsumer(testQueue);
    final TracingMessageConsumer tracedConsumer = new TracingMessageConsumer(rawConsumer, mockTracer);

    final TextMessage outgoing = session.createTextMessage("Hello world");
    tracedProducer.send(outgoing);

    final TextMessage incoming = (TextMessage) tracedConsumer.receive(5000);
    assertEquals("Hello world", incoming.getText());

    final List<MockSpan> finished = mockTracer.finishedSpans();
    assertEquals(2, finished.size());
    checkSpans(finished);

    assertNull(mockTracer.activeSpan());
}
项目:JavaSamples
文件:MessageListenerContainer.java
/**
 * Rate-limits MQ consumption: the superclass is only invoked when the rate limiter grants a
 * permit right now; otherwise {@code false} is returned without receiving anything.
 */
@Override
protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer) throws JMSException {
    // Short-circuit keeps the superclass call from running when no permit is available.
    return rateLimiter.tryAcquire() && super.receiveAndExecute(invoker, session, consumer);
}
项目:ats-framework
文件:JmsClient.java
/**
 * Drains the destination: receives messages without waiting and acknowledges each one until
 * no further message is immediately available. {@code releaseSession(false)} is always called
 * afterwards.
 *
 * @param session     session used to create the draining consumer
 * @param destination destination to drain
 * @throws JMSException if creating the consumer, receiving or acknowledging fails
 */
private void doCleanupQueue( final Session session, final Destination destination ) throws JMSException {
    try {
        // NOTE(review): the consumer is not closed explicitly; presumably releaseSession
        // takes care of it -- confirm.
        final MessageConsumer drainer = session.createConsumer(destination);
        for (Message pending = drainer.receiveNoWait(); pending != null; pending = drainer.receiveNoWait()) {
            pending.acknowledge();
        }
    } finally {
        releaseSession(false);
    }
}
项目:java-jms
文件:TracingJmsTemplate.java
/**
 * Creates the consumer through the superclass and wraps it in a
 * {@link TracingMessageConsumer} bound to this template's tracer.
 */
@Override
protected MessageConsumer createConsumer(Session session, Destination destination,
        String messageSelector)
        throws JMSException {
    final MessageConsumer delegate = super.createConsumer(session, destination, messageSelector);
    return new TracingMessageConsumer(delegate, tracer);
}
项目:solace-integration-guides
文件:JNDIConnectionFactoryProviderTest.java
/**
 * Validates that a {@link ConnectionFactory} can be obtained through the JNDI-based provider
 * when it is pointed at the location of the client libraries at runtime. The ActiveMQ
 * libraries are not declared in the POM; they are obtained through
 * {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which per the original note pulls
 * them from the Maven repository and therefore needs Internet access. Any {@link Exception}
 * raised along the way is only logged, so in that case the test ends quietly instead of failing.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        final String clientLibDir = TestUtils.setupActiveMqLibForTesting(true);
        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
        final JNDIConnectionFactoryProvider provider = new JNDIConnectionFactoryProvider();

        // Register and configure the controller service under test.
        testRunner.addControllerService("cfProvider", provider);
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.JNDI_CF_LOOKUP, "ConnectionFactory");
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLibDir);
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        testRunner.enableControllerService(provider);
        testRunner.assertValid(provider);

        // Round-trip one text message through the factory the provider looked up.
        final Connection jmsConnection = provider.getConnectionFactory().createConnection();
        jmsConnection.start();
        final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Destination target = jmsSession.createQueue("myqueue");
        final MessageProducer sender = jmsSession.createProducer(target);
        final MessageConsumer receiver = jmsSession.createConsumer(target);
        final TextMessage outgoing = jmsSession.createTextMessage("Hello");
        sender.send(outgoing);

        final TextMessage incoming = (TextMessage) receiver.receive();
        assertEquals("Hello", incoming.getText());

        jmsConnection.stop();
        jmsConnection.close();
    } catch (Exception e) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e);
    }
}
项目:jmsclient
文件:JMSTopicClient.java
/**
 * Creates a non-durable subscriber on the configured topic using the configured message
 * selector, with {@code noLocal} set to {@code true}.
 *
 * @return the new subscriber
 * @throws JMSException if the subscriber cannot be created
 */
@Override
public MessageConsumer createSubscriber() throws JMSException
{
    final TopicSession topicSession = (TopicSession) session;
    final Topic target = (Topic) topic;

    final TopicSubscriber subscriber = topicSession.createSubscriber(target, messageSelector, true);
    log.debug("Created non-durable subscriber");
    return subscriber;
}
项目:servicebuilder
文件:ActiveMqListener.java
/**
 * Moves the messages currently on the error queue back onto the input queue.
 *
 * <p>The error-queue size is read once up front; that many receive attempts are made, each
 * waiting up to {@code REQUEUE_TIMEOUT}. Every received message is re-sent as a new text
 * message carrying the same text and correlation id, then acknowledged and committed.
 * The consumer and producer created here are closed even when the loop fails.
 *
 * @throws MessageQueueException wrapping any {@link JMSException} raised while requeuing
 */
@Override
public void requeueFailedMessages() {
    try {
        // NOTE(review): the connection and session opened here are never closed in this
        // method; confirm whether ActiveMqUtils or the caller owns their lifecycle.
        ActiveMQConnection connection = ActiveMqUtils.openConnection(user, password, url);
        Session session = ActiveMqUtils.startSession(connection);
        int count = getQueueSize(session, queueError);
        if (count < 1) {
            return;
        }
        log.info("Requeuing {} failed messages...", count);
        Queue queueErr = session.createQueue(queueError);
        MessageConsumer consumer = session.createConsumer(queueErr);
        try {
            Queue queueRetry = session.createQueue(queueInput);
            MessageProducer producer = session.createProducer(queueRetry);
            try {
                for (int consumed = 0; consumed < count; consumed++) {
                    TextMessage message = (TextMessage) consumer.receive(REQUEUE_TIMEOUT);
                    if (message == null) {
                        // Nothing arrived within the timeout; this attempt is simply spent.
                        continue;
                    }
                    String text = message.getText();
                    String requestId = message.getJMSCorrelationID();
                    log.info("Requeuing message '{}'", text);
                    try {
                        TextMessage newMessage = session.createTextMessage(text);
                        newMessage.setJMSCorrelationID(requestId);
                        producer.send(newMessage);
                    } catch (Exception e) {
                        log.error("Failed to requeue message", e);
                    }
                    // NOTE(review): the original message is acknowledged and committed even
                    // when the re-send above failed -- confirm that dropping it is intended.
                    message.acknowledge();
                    session.commit();
                }
            } finally {
                // Previously skipped when a JMSException escaped the loop.
                producer.close();
            }
        } finally {
            // Previously skipped when a JMSException escaped the loop.
            consumer.close();
        }
    } catch (JMSException ex) {
        throw new MessageQueueException("Failed to requeue failed messages", ex);
    }
}
项目:pooled-jms
文件:JmsPoolSession.java
/**
 * Creates a consumer on the internal session and passes it through {@code addConsumer}
 * before returning it.
 */
@Override
public MessageConsumer createConsumer(Destination destination) throws JMSException {
    final MessageConsumer delegate = getInternalSession().createConsumer(destination);
    return addConsumer(delegate);
}
项目:pooled-jms
文件:JmsPoolSession.java
/**
 * Creates a consumer with a selector on the internal session and passes it through
 * {@code addConsumer} before returning it.
 */
@Override
public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
    final MessageConsumer delegate = getInternalSession().createConsumer(destination, selector);
    return addConsumer(delegate);
}
项目:pooled-jms
文件:JmsPoolSession.java
/**
 * Creates a consumer with a selector and noLocal flag on the internal session and passes it
 * through {@code addConsumer} before returning it.
 */
@Override
public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
    final MessageConsumer delegate = getInternalSession().createConsumer(destination, selector, noLocal);
    return addConsumer(delegate);
}
项目:pooled-jms
文件:JmsPoolSession.java
@Override
public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) throws JMSException {
PooledSessionHolder state = safeGetSessionHolder();
state.getConnection().checkClientJMSVersionSupport(2, 0);
return addConsumer(state.getSession().createSharedConsumer(topic, sharedSubscriptionName, messageSelector));
}
项目:pooled-jms
文件:JmsPoolSession.java
/**
 * Creates a durable consumer on the held session after checking that the connection
 * supports client JMS version 2.0, then passes it through {@code addConsumer}.
 */
@Override
public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
    final PooledSessionHolder holder = safeGetSessionHolder();
    // Version check (2, 0) must run before the session is asked for the consumer.
    holder.getConnection().checkClientJMSVersionSupport(2, 0);

    final MessageConsumer delegate = holder.getSession().createDurableConsumer(topic, name);
    return addConsumer(delegate);
}