private void putTopic(List<String> events) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_BIND_URL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(DESTINATION_NAME); MessageProducer producer = session.createProducer(destination); for (String event : events) { TextMessage message = session.createTextMessage(); message.setText(event); producer.send(message); } session.commit(); session.close(); connection.close(); }
@Test public void testFailedCreateConsumerConnectionStillWorks() throws JMSException { Connection connection = pooledConnFact.createConnection("guest", "password"); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(name.getMethodName()); try { session.createConsumer(queue); fail("Should fail to create consumer"); } catch (JMSSecurityException ex) { LOG.info("Caught expected security error"); } queue = session.createQueue("GUESTS." + name.getMethodName()); MessageProducer producer = session.createProducer(queue); producer.close(); connection.close(); }
public JmsPoolMessageProducer(JmsPoolSession session, MessageProducer messageProducer, Destination destination, boolean shared) throws JMSException { this.session = session; this.messageProducer = messageProducer; this.destination = destination; this.shared = shared; this.anonymousProducer = destination == null; this.deliveryMode = messageProducer.getDeliveryMode(); this.disableMessageID = messageProducer.getDisableMessageID(); this.disableMessageTimestamp = messageProducer.getDisableMessageTimestamp(); this.priority = messageProducer.getPriority(); this.timeToLive = messageProducer.getTimeToLive(); if (session.isJMSVersionSupported(2, 0)) { this.deliveryDelay = messageProducer.getDeliveryDelay(); } }
public void sendMessages(ConnectionFactory connectionFactory) throws Exception { for (int i = 0; i < NUM_MESSAGES; i++) { Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(QUEUE); MessageProducer producer = session.createProducer(destination); String msgTo = "hello"; TextMessage message = session.createTextMessage(msgTo); producer.send(message); connection.close(); LOG.debug("sent " + i + " messages using " + connectionFactory.getClass()); } }
private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, InterruptedException { Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue tempQueue = session.createTemporaryQueue(); TextMessage msg = session.createTextMessage("Request"); msg.setJMSReplyTo(tempQueue); MessageProducer producer = session.createProducer(session.createQueue(serviceQueue)); producer.send(msg); MessageConsumer consumer = session.createConsumer(tempQueue); Message replyMsg = consumer.receive(); assertNotNull(replyMsg); LOG.debug("Reply message: {}", replyMsg); consumer.close(); producer.close(); session.close(); connection.close(); }
public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory, String queueName) throws JMSException { Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); final javax.jms.Message inMessage = consumer.receive(); String requestMessageId = inMessage.getJMSMessageID(); LOG.debug("Received message " + requestMessageId); final TextMessage replyMessage = session.createTextMessage("Result"); replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID()); final MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo()); LOG.debug("Sending reply to " + inMessage.getJMSReplyTo()); producer.send(replyMessage); producer.close(); consumer.close(); session.close(); connection.close(); }
private void testReception( final TestClass i, final Destination destination, final ThrowingConsumer<Destination> destinationCheck ) throws Exception { final Connection connection = i.connectionFactory.createConnection(); assertNotNull(connection); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertNotNull(session); final MessageProducer producer = session.createProducer(destination); assertNotNull(producer); final TextMessage message = session.createTextMessage(); assertNotNull(message); message.setText("I am IronMan"); producer.send(message); final List<ReceivedJmsMessage> messages = i.testQueue.drainReceivedMessages(); assertEquals(1, messages.size()); final ReceivedJmsMessage receivedMessage = messages.get(0); destinationCheck.accept(receivedMessage.getDestination()); assertTrue(receivedMessage.getJmsMessage() instanceof TextMessage); final TextMessage receivedTextMessage = (TextMessage) receivedMessage.getJmsMessage(); assertEquals("I am IronMan", receivedTextMessage.getText()); }
private void doSendTextMessage( final Session session, final Destination destination, final String textMessage, final Map<String, ?> properties ) throws JMSException { try { final Message message = textMessage != null ? session.createTextMessage(textMessage) : session.createTextMessage(); if (properties != null) { // Note: Setting any properties (including JMS fields) using // setObjectProperty might not be supported by all providers // Tested with: ActiveMQ for (final Entry<String, ?> property : properties.entrySet()) { message.setObjectProperty(property.getKey(), property.getValue()); } } final MessageProducer producer = session.createProducer(destination); producer.send(message); } finally { releaseSession(false); } }
private void doSendBinaryMessage( final Session session, final Destination destination, final byte[] bytes, final Map<String, ?> properties ) throws JMSException { try { BytesMessage message = session.createBytesMessage(); message.writeBytes(bytes); if (properties != null) { // Note: Setting any properties (including JMS fields) using // setObjectProperty might not be supported by all providers // Tested with: ActiveMQ for (final Entry<String, ?> property : properties.entrySet()) { message.setObjectProperty(property.getKey(), property.getValue()); } } final MessageProducer producer = session.createProducer(destination); producer.send(message); } finally { releaseSession(false); } }
public static MessageProducer createMessageProducer( Session session, Destination destination, MessageProducerOption producerOption) throws JMSException { MessageProducer producer = session.createProducer(destination); producer.setDeliveryDelay(producerOption.getDeliveryDelay()); producer.setDeliveryMode(producerOption.getDeliveryMode()); producer.setDisableMessageTimestamp(producerOption.isDisableMessageTimestamp()); producer.setDisableMessageID(producerOption.isDisableMessageId()); producer.setPriority(producerOption.getPriority()); producer.setTimeToLive(producerOption.getTimeToLive()); return producer; }
/** * @param session * @param messageProducer * @param dupMessageDetectStrategy * @return * @throws JMSException */ public static SimpleJmsMessageSender create( Session session, MessageProducer messageProducer, DupMessageDetectStrategy dupMessageDetectStrategy ) throws JMSException { SimpleJmsMessageSender messageSender = new SimpleJmsMessageSender(); messageSender.setMessageProducer(messageProducer); messageSender.setSession(session); if (dupMessageDetectStrategy != null) { messageSender.setDupMessageDetectStrategy(dupMessageDetectStrategy); } return messageSender; }
/** * This test simply validates that {@link ConnectionFactory} can be setup by * pointing to the location of the client libraries at runtime. It uses * ActiveMQ which is not present at the POM but instead pulled from Maven * repo using {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which * implies that for this test to run the computer must be connected to the * Internet. If computer is not connected to the Internet, this test will * quietly fail logging a message. */ @Test public void validateFactoryCreationWithActiveMQLibraries() throws Exception { try { String libPath = TestUtils.setupActiveMqLibForTesting(true); TestRunner runner = TestRunners.newTestRunner(mock(Processor.class)); JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); runner.addControllerService("cfProvider", cfProvider); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "vm://localhost?broker.persistent=false"); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, libPath); runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, "org.apache.activemq.ActiveMQConnectionFactory"); runner.enableControllerService(cfProvider); runner.assertValid(cfProvider); Connection connection = cfProvider.getConnectionFactory().createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue("myqueue"); MessageProducer producer = session.createProducer(queue); MessageConsumer consumer = session.createConsumer(queue); TextMessage message = session.createTextMessage("Hello"); producer.send(message); assertEquals("Hello", ((TextMessage) consumer.receive()).getText()); connection.stop(); connection.close(); } catch (Exception e) { logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e); } }
public static void main(String[] args) throws Exception { Connection connection = null; String csvData = System.getProperty(CSVDATA); if(CSVDATA == null || CSVDATA.equals("")) throw new RuntimeException("LoyaltyCardManager.main() must pass the "+CSVDATA +" system property With format OPERATION;USERID;FIRSTNAME;LASTNAME;TRXID;TRXFEESAMOUNT;CURRENCY"); System.out.println("LoyaltyCardManager() will connect to router: "+ROUTER_URL+" : at the following address: "+QUEUE_NAME); ConnectionFactory connectionFactory = new JmsConnectionFactory(ROUTER_URL); try { // Step 1. Create an AMQP qpid connection connection = connectionFactory.createConnection(); // Step 2. Create a JMS session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Step 3. Create a Producer Queue fidelityRequestQueue = session.createQueue(QUEUE_NAME); MessageProducer beosbankFidelityRequestProducer = session.createProducer(fidelityRequestQueue); // Step 4. send a CSV Text Data on user transactions beosbankFidelityRequestProducer.send(session.createTextMessage(csvData)); System.out.println("\nmessage sent:"+ csvData+" \n"); } finally { if (connection != null) { // Step 9. close the connection connection.close(); } } }
private void sendObjectMsgSingleSession(List<? extends Serializable> objectsToSend) throws JMSException { Session session = null; Connection conn = null; try { conn = qFactory.createConnection(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(queue); for (Serializable objectToSend : objectsToSend) { ObjectMessage msg = session.createObjectMessage(); msg.setObject(objectToSend); producer.send(msg); } } finally { closeSession(session); closeConnection(conn); } }
/** Writes a JMS Message to the destination, as defined in the configuration file. * @param session the JMS Session. * @param message the JMS Message. * @param destinationName the Destination to write to. * @throws FrameworkException Indicates some system error. * @throws ApplicationExceptions Indicates application error(s). */ static void send(Session session, Message message, String destinationName) throws FrameworkException, ApplicationExceptions { try { Destination destination = obtainDestination(destinationName); MessageProducer producer = session.createProducer(destination); int messagePriority = message.getJMSPriority(); if (messagePriority != producer.getPriority() && messagePriority >= 0 && messagePriority <= 9) producer.setPriority(messagePriority); producer.send(message); if (log.isDebugEnabled()) log.debug("Sent message " + message + " to " + destinationName); } catch (JMSException e) { log.error("Error in sending a JMS Message", e); throw new JaffaMessagingFrameworkException(JaffaMessagingFrameworkException.SEND_ERROR, null, e); } }
private static void sendMessage(Session session, MessageProducer producer) throws JMSException { /* for (int i = 0; i < SEND_NUMBER; i++) { TextMessage message = session.createTextMessage("ActiveMQ Send Message:"+i); System.out.println("SendMessage:"+""+i); // 发送消息到目的地方 producer.send(message); }//*/ int i = 0; do{ TextMessage message = session.createTextMessage("ActiveMQ Send Message:"+i); System.out.println("SendMessage:"+""+i); // 发送消息到目的地方 producer.send(message); i++; if(i > SEND_NUMBER) break; }while(true); }
public static void send(String queueName, String text, int delayMillis) { EXECUTOR.submit(() -> { try { logger.info("*** artificial delay {}: {}", queueName, delayMillis); Thread.sleep(delayMillis); Connection connection = getConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage message = session.createTextMessage(text); producer.send(message); logger.info("*** sent message {}: {}", queueName, text); session.close(); } catch (Exception e) { throw new RuntimeException(e); } }); }
private void putQueue(List<String> events) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_BIND_URL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(DESTINATION_NAME); MessageProducer producer = session.createProducer(destination); for (String event : events) { TextMessage message = session.createTextMessage(); message.setText(event); producer.send(message); } session.commit(); session.close(); connection.close(); }
@Override public void plaatsAfnemerberichten(final List<AfnemerBericht> afnemerBerichten) { try { afnemersJmsTemplate.execute((final Session session, final MessageProducer producer) -> { for (final AfnemerBericht afnemerBericht : afnemerBerichten) { final SynchronisatieBerichtGegevens synchronisatieBerichtGegevens = afnemerBericht.getSynchronisatieBerichtGegevens(); final Message message = session.createTextMessage(serializer.serialiseerNaarString(synchronisatieBerichtGegevens)); message.setStringProperty(LeveringConstanten.JMS_MESSAGEGROUP_HEADER, String.valueOf(synchronisatieBerichtGegevens.getArchiveringOpdracht().getOntvangendePartijId())); final ToegangLeveringsAutorisatie toegang = afnemerBericht.getToegangLeveringsAutorisatie(); LOGGER.info("Zet bericht op de queue voor afnemer {} en kanaal {}", toegang.getGeautoriseerde().getPartij().getNaam(), toegang.getLeveringsautorisatie().getStelsel()); producer.send(message); } return null; }); } catch (final JmsException e) { LOGGER.error("fout in verzenden berichten naar afnemer queue", e); throw new BrpServiceRuntimeException(e); } }
@Override public void plaatsVrijBericht(final VrijBerichtGegevens vrijBerichtGegevens) { try { vrijBerichtJmsTemplate.execute((final Session session, final MessageProducer producer) -> { final Partij partij = vrijBerichtGegevens.getPartij(); final Stelsel stelsel = vrijBerichtGegevens.getStelsel(); final Message message = session.createTextMessage(serializer.serialiseerNaarString(vrijBerichtGegevens)); message.setStringProperty(LeveringConstanten.JMS_MESSAGEGROUP_HEADER, String.valueOf(partij.getCode())); LOGGER.info("Zet bericht op de queue voor vrij bericht ontvanger {} en kanaal {}", partij.getNaam(), stelsel); producer.send(message); return null; }); } catch (final JmsException e) { LOGGER.error("fout in verzenden berichten naar vrije berichten queue", e); throw new BrpServiceRuntimeException(e); } }
@Override public void publiceerMaakSelectieResultaatTaken(List<MaakSelectieResultaatTaak> maakSelectieResultaatTaken) { LOGGER.info("publiceer maak selectie resultaat taken"); final ProducerCallback<Void> producerCallback = (final Session session, final MessageProducer producer) -> { for (final MaakSelectieResultaatTaak maakSelectieResultaatTaak : maakSelectieResultaatTaken) { LOGGER.debug("publiceer maak selectie resultaat taak"); final String groupId = maakSelectieResultaatTaak.getSelectieRunId() + "_" + maakSelectieResultaatTaak.getToegangLeveringsAutorisatieId() + "_" + maakSelectieResultaatTaak.getDienstId(); final Message message = session.createTextMessage(serializer.serialiseerNaarString(maakSelectieResultaatTaak)); message.setStringProperty(LeveringConstanten.JMS_MESSAGEGROUP_HEADER, groupId); producer.send(message); } return null; }; PublicatieHelper.publiceer(maakSelectieResultaatTemplate, producerCallback, () -> "fout in verzenden berichten naar maak selectie resultaat taak queue"); }
@Override public void publiceerMaakSelectieGeenResultaatNetwerkTaak(List<MaakSelectieResultaatTaak> maakSelectieGeenResultaatNetwerkTaken) { LOGGER.info("publiceer maak selectie resultaat taken"); final ProducerCallback<Void> producerCallback = (final Session session, final MessageProducer producer) -> { LOGGER.debug("publiceer maak selectie resultaat taak"); for (final MaakSelectieResultaatTaak maakSelectieGeenResultaatNetwerkTaak : maakSelectieGeenResultaatNetwerkTaken) { final String groupId = maakSelectieGeenResultaatNetwerkTaak.getSelectieRunId() + "_" + maakSelectieGeenResultaatNetwerkTaak .getToegangLeveringsAutorisatieId() + "_" + maakSelectieGeenResultaatNetwerkTaak.getDienstId(); final Message message = session.createTextMessage(serializer.serialiseerNaarString(maakSelectieGeenResultaatNetwerkTaak)); message.setStringProperty(LeveringConstanten.JMS_MESSAGEGROUP_HEADER, groupId); producer.send(message); } return null; }; PublicatieHelper.publiceer(maakSelectieGeenResultaatNetwerkTemplate, producerCallback, () -> "fout in verzenden berichten naar maak selectie geen resultaat netwerk taak queue"); }
private void handleMessage(MessageHandler handler, Message message, Session session) { if (! (message instanceof TextMessage)) { return; } TextMessage textMessage = (TextMessage) message; String text = null; String requestId = UUID.randomUUID().toString(); try { text = textMessage.getText(); if (StringUtils.isNotEmpty(message.getJMSCorrelationID())) { requestId = message.getJMSCorrelationID(); } MDC.put(X_OBOS_REQUEST_ID, requestId); log.info("Received message '{}'", text); handler.handle(new ObjectMapper().readTree(text)); } catch (Exception e) { log.error("Failed to process message", e); try { TextMessage errorMessage = session.createTextMessage(text); errorMessage.setJMSCorrelationID(requestId); Queue queue = session.createQueue(queueError); MessageProducer errorProducer = session.createProducer(queue); errorProducer.send(errorMessage); } catch (JMSException jmse) { log.error("Failed to create error message", jmse); } } finally { MDC.remove(X_OBOS_REQUEST_ID); } }
private MessageProducer getMessageProducer(Destination destination) throws JMSException { MessageProducer result = null; if (useAnonymousProducers) { result = safeGetSessionHolder().getOrCreateProducer(); } else { result = getInternalSession().createProducer(destination); } return result; }
public MessageProducer getOrCreateProducer() throws JMSException { if (producer == null) { synchronized (this) { if (producer == null) { producer = session.createProducer(null); } } } return producer; }
private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException { MessageProducer messageProducer = getMessageProducer(); // Only one thread can use the producer at a time to allow for dynamic configuration // changes to match what's been configured here. synchronized (messageProducer) { long oldDelayValue = 0; if (deliveryDelay != 0 && session.isJMSVersionSupported(2, 0)) { oldDelayValue = messageProducer.getDeliveryDelay(); messageProducer.setDeliveryDelay(deliveryDelay); } // For the non-shared MessageProducer that is also not an anonymous producer we // need to call the send method for an explicit MessageProducer otherwise we // would be violating the JMS specification in regards to send calls. // // In all other cases we create an anonymous producer so we call the send with // destination parameter version. try { if (!shared && !anonymousProducer) { if (listener == null) { messageProducer.send(message, deliveryMode, priority, timeToLive); } else { messageProducer.send(message, deliveryMode, priority, timeToLive, listener); } } else { if (listener == null) { messageProducer.send(destination, message, deliveryMode, priority, timeToLive); } else { messageProducer.send(destination, message, deliveryMode, priority, timeToLive, listener); } } } finally { if (deliveryDelay != 0 && session.isJMSVersionSupported(2, 0)) { messageProducer.setDeliveryDelay(oldDelayValue); } } } }
public MessageProducer getMessageProducer() throws JMSRuntimeException { try { return producer.getMessageProducer(); } catch (JMSException jmsex) { throw JMSExceptionSupport.createRuntimeException(jmsex); } }
@Test public void testToString() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection(); Session session = connection.createSession(); Queue queue = session.createTemporaryQueue(); MessageProducer producer = session.createProducer(queue); assertNotNull(producer.toString()); }
@Test public void testCloseMoreThanOnce() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection(); Session session = connection.createSession(); Queue queue = session.createTemporaryQueue(); MessageProducer producer = session.createProducer(queue); producer.close(); producer.close(); }
@Test public void testNullDestinationOnSendToAnonymousProducer() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection(); Session session = connection.createSession(); MessageProducer producer = session.createProducer(null); try { producer.send(null, session.createMessage()); fail("Should not be able to send with null destination"); } catch (InvalidDestinationException ide) {} }
@Test public void testNullDestinationOnSendToTargetedProducer() throws JMSException { JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection(); Session session = connection.createSession(); MessageProducer producer = session.createProducer(session.createTemporaryQueue()); try { producer.send(null, session.createMessage()); fail("Should not be able to send with null destination"); } catch (InvalidDestinationException ide) {} }
@Override public MessageProducer createProducer(Destination destination) throws JMSException { if (destination instanceof Topic) { throw new IllegalStateException("Operation not supported by a QueueSession"); } return super.createProducer(destination); }
/** * @see javax.jms.Session#createProducer(javax.jms.Destination) */ @Override public MessageProducer createProducer(Destination destination) throws JMSException { if (destination instanceof Queue) { throw new IllegalStateException("Operation not supported by a TopicSession"); } return super.createProducer(destination); }
@Test public void testFailedCreateConsumerConnectionStillWorks() throws JMSException { // User can write but not read user.setCanConsumeAll(false); Connection connection = null; try { connection = cf.createConnection("admin", "admin"); } catch (JMSSecurityException jmsse) { fail("Should not be able to create connection using bad credentials"); } connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("test"); try { session.createConsumer(queue); fail("Should fail to create consumer"); } catch (JMSSecurityException ex) { LOG.debug("Caught expected security error"); } MessageProducer producer = session.createProducer(queue); producer.close(); connection.close(); }
private void produceMessages() throws Exception { Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(queue); for (int i = 0; i < MESSAGE_COUNT; ++i) { producer.send(session.createTextMessage("Test Message: " + i)); } producer.close(); }
@Test(timeout = 60000) public void testRepeatedCreateSessionProducerResultsInSame() throws Exception { JmsPoolConnection connection = (JmsPoolConnection) pooledFactory.createConnection(); assertTrue(pooledFactory.isUseAnonymousProducers()); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("test-topic"); JmsPoolMessageProducer producer = (JmsPoolMessageProducer) session.createProducer(destination); MessageProducer original = producer.getMessageProducer(); assertNotNull(original); session.close(); assertEquals(1, brokerService.getAdminView().getDynamicDestinationProducers().length); for (int i = 0; i < 20; ++i) { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = (JmsPoolMessageProducer) session.createProducer(destination); assertSame(original, producer.getMessageProducer()); session.close(); } assertEquals(1, brokerService.getAdminView().getDynamicDestinationProducers().length); connection.close(); pooledFactory.clear(); }
protected void doPublish(Event event) throws EdsException { Connection conn = null; Session session = null; MessageProducer messageProducer = null; try { LOG.debug("eds pub 3 mq in -[event:" + event + "]"); conn = connectionFactory.createConnection(); // 设置非事务,客户端确认方式 session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); MapMessage mapMessage = session.createMapMessage(); mapMessage = EventConverter.convertToMessage(mapMessage, event); Destination dest = getDestination(event.getName(), session); messageProducer = session.createProducer(dest); messageProducer.send(mapMessage); // commit session if necessary if (session.getTransacted()) { session.commit(); } LOG.debug("eds pub 4 mq ok -[conn:" + conn + ",session:" + session + ",event:" + event + "]"); }catch(JMSException e){ throw new EdsException("eds client activemq doPublish exception ", e); }finally { releaseSession(session); releaseMessageProducer(messageProducer); releaseConnection(conn, false); } }
private static void releaseMessageProducer(MessageProducer producer) { if (producer == null) { return; } try { producer.close(); } catch (Throwable ex) { LOG.warn("Could not close JMS MessageProducer", ex); } }