/**
 * Sends {@code messageCount} text messages through the JMS template once the
 * bean has been constructed.
 *
 * <p>Each message body is {@code "Message number is <i>."} and the zero-based
 * index is also stored in the {@code MESSAGE_COUNT} int property. Every send
 * is echoed to standard output.
 *
 * @throws JMSException declared in the signature for JMS failures
 */
@PostConstruct
public void generateMessages() throws JMSException {
    int next = 0;
    while (next < messageCount) {
        final int sequence = next;
        final String body = "Message number is " + sequence + ".";

        MessageCreator creator = new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage outgoing = session.createTextMessage(body);
                outgoing.setIntProperty(MESSAGE_COUNT, sequence);
                System.out.println("Sending message: " + body);
                return outgoing;
            }
        };
        template.send(creator);

        next++;
    }
}
@Override public void onMessage(final Message message) { // 使用线程池多线程处理 threadPoolTaskExecutor.execute(new Runnable() { @Override public void run() { TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); LOGGER.info("消费:{}", text); } catch (Exception e) { e.printStackTrace(); } } }); }
@Test public void msgDeployment() throws Exception{ TextMessage message = mock(TextMessage.class); when(message.getStringProperty("source")).thenReturn("deployment"); String msgJson = gson.toJson(createCmsDeployment("active")); when(message.getText()).thenReturn(msgJson); listener.onMessage(message); //once more to get a SKIP when(message.getText()).thenReturn(gson.toJson(createCmsDeployment("is-not-active"))); listener.onMessage(message); listener.cleanup(); listener.getConnectionStats(); }
/**
 * Sends {@code NUM_MESSAGES} text messages with body {@code "hello"} to
 * {@code QUEUE}, opening and closing a separate connection for every message.
 *
 * <p>The connection is closed in a {@code finally} block so that it is
 * released even when session creation or the send fails. The debug line
 * reports the number of messages sent so far (1-based).
 *
 * @param connectionFactory factory used to create one connection per message
 * @throws Exception if creating the JMS objects or sending fails
 */
public void sendMessages(ConnectionFactory connectionFactory) throws Exception {
    for (int i = 0; i < NUM_MESSAGES; i++) {
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE);
            MessageProducer producer = session.createProducer(destination);
            TextMessage message = session.createTextMessage("hello");
            producer.send(message);
        } finally {
            // Closing the connection also closes its session and producer.
            connection.close();
        }
        // i is zero-based, so i + 1 messages have been sent at this point.
        LOG.debug("sent " + (i + 1) + " messages using " + connectionFactory.getClass());
    }
}
/**
 * Sends a {@code "Request"} text message to the given service queue with a
 * temporary queue as its reply-to destination, then blocks until a reply
 * arrives on that temporary queue and asserts that it is not null.
 *
 * <p>The connection is closed in a {@code finally} block so that it is
 * released even when a JMS call or the assertion fails.
 *
 * @param cf           factory used to create the connection
 * @param serviceQueue name of the queue the request is sent to
 * @throws JMSException         if any JMS operation fails
 * @throws InterruptedException declared in the signature
 */
private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, InterruptedException {
    Connection connection = cf.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryQueue tempQueue = session.createTemporaryQueue();

        TextMessage msg = session.createTextMessage("Request");
        msg.setJMSReplyTo(tempQueue);
        MessageProducer producer = session.createProducer(session.createQueue(serviceQueue));
        producer.send(msg);

        // Blocks without a timeout until the reply is delivered.
        MessageConsumer consumer = session.createConsumer(tempQueue);
        Message replyMsg = consumer.receive();
        assertNotNull(replyMsg);
        LOG.debug("Reply message: {}", replyMsg);

        consumer.close();
        producer.close();
        session.close();
    } finally {
        // Closing the connection also closes any session, producer or consumer still open.
        connection.close();
    }
}
/**
 * Receives one message from the named queue and replies to its reply-to
 * destination with a {@code "Result"} text message whose correlation id is
 * the JMS message id of the request.
 *
 * <p>The connection is started explicitly before receiving: a JMS connection
 * is created in stopped mode and delivers no messages until started (starting
 * an already started connection is ignored). The connection is closed in a
 * {@code finally} block so that it is released on failure as well.
 *
 * @param connectionFactory factory used to create the connection
 * @param queueName         name of the queue to receive the request from
 * @throws JMSException if any JMS operation fails
 */
public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory, String queueName) throws JMSException {
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));

        // Blocks without a timeout until a request arrives.
        final javax.jms.Message inMessage = consumer.receive();
        String requestMessageId = inMessage.getJMSMessageID();
        LOG.debug("Received message " + requestMessageId);

        final TextMessage replyMessage = session.createTextMessage("Result");
        replyMessage.setJMSCorrelationID(requestMessageId);
        final MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo());
        LOG.debug("Sending reply to " + inMessage.getJMSReplyTo());
        producer.send(replyMessage);

        producer.close();
        consumer.close();
        session.close();
    } finally {
        // Closing the connection also closes any session, producer or consumer still open.
        connection.close();
    }
}
/** * Publish ci state message. * * @param event the event */ public void publishCiStateMessage(CiChangeStateEvent event) { try { TextMessage message = session.createTextMessage(gson.toJson(event)); message.setLongProperty("ciId", event.getCiId()); message.setStringProperty("type", "ci-change-state"); message.setStringProperty("ciState", event.getNewState()); producer.send(message); logger.info("Published: ciId:" + event.getCiId()); logger.info(message.getText()); } catch (JMSException e) { // TODO see if we can put some durability here logger.error("caught Exception publishing",e); } }
/**
 * Sends a task to the given queue as a JMS text message.
 *
 * <p>The body is produced by {@code taskMapper.map(task)}. String properties
 * are added only when their value is present: the configured {@code header}
 * (set to the task name derived from the task's class), the status name, the
 * reason and the detail.
 *
 * @param task   the task to send
 * @param queue  destination queue name
 * @param status optional status; skipped when {@code null}
 * @param reason optional reason; skipped when null or empty
 * @param detail optional detail; skipped when null or empty
 */
private void send(Object task, String queue, Status status, String reason, String detail) {
    jmsTemplate.send(queue, (Session session) -> {
        TextMessage outgoing = session.createTextMessage();
        outgoing.setText(taskMapper.map(task));

        boolean headerConfigured = !Strings.isNullOrEmpty(header);
        if (headerConfigured) {
            String taskName = ExternalTaskManager.toTaskName(task.getClass());
            setStringProperty(outgoing, header, taskName);
        }

        if (null != status) {
            setStringProperty(outgoing, Headers.STATUS, status.name());
        }

        boolean hasReason = !Strings.isNullOrEmpty(reason);
        if (hasReason) {
            setStringProperty(outgoing, Headers.REASON, reason);
        }

        boolean hasDetail = !Strings.isNullOrEmpty(detail);
        if (hasDetail) {
            setStringProperty(outgoing, Headers.DETAIL, detail);
        }

        return outgoing;
    });
}
/**
 * Sends a single {@code "Request"} text message to the {@code "example"}
 * destination, tagged with this component's correlation id and with
 * {@code "reply-queue"} as its reply-to queue.
 *
 * @param strings arguments passed to the runner; not used
 * @throws Exception declared in the signature
 */
@Override
public void run(String... strings) throws Exception {
    final String messageText = "Request";
    LOG.info("============= Sending: " + messageText);

    this.jmsTemplate.send("example", jmsSession -> {
        Queue replyDestination = jmsSession.createQueue("reply-queue");
        TextMessage request = jmsSession.createTextMessage(messageText);
        request.setJMSCorrelationID(correlationID.toString());
        request.setJMSReplyTo(replyDestination);
        return request;
    });
}
/**
 * Reads lines from standard input and publishes each one to the chat topic
 * as a text message of the form {@code "\n[<userId> : <line>]"}.
 *
 * <p>Typing {@code exit} (case-insensitive) closes the connection and
 * terminates the JVM. End of input ({@code readLine()} returning
 * {@code null}) is treated the same way instead of failing with a
 * {@link NullPointerException}.
 *
 * @param topicConnection connection used to create the session; started here
 * @param chatTopic       topic the messages are published to
 * @param userId          user name prefixed to every published line
 * @throws JMSException if a JMS operation fails
 * @throws IOException  if reading from standard input fails
 */
void publish(TopicConnection topicConnection, Topic chatTopic, String userId) throws JMSException, IOException {
    TopicSession tsession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher topicPublisher = tsession.createPublisher(chatTopic);
    topicConnection.start();

    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
    while (true) {
        String msgToSend = reader.readLine();
        // readLine() returns null once standard input is exhausted.
        if (msgToSend == null || msgToSend.equalsIgnoreCase("exit")) {
            topicConnection.close();
            System.exit(0);
        } else {
            TextMessage msg = tsession.createTextMessage();
            msg.setText("\n[" + userId + " : " + msgToSend + "]");
            topicPublisher.publish(msg);
        }
    }
}
private void startConsumer() { try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination; if ("topic".equalsIgnoreCase(destinationType)) { destination = session.createTopic(destinationName); } else { destination = session.createQueue(destinationName); } consumer = session.createConsumer(destination); isStarted.compareAndSet(false, true); while (true) { Message message = consumer.receive(); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); if (isRecording.get()) { addData(message, text); } counter.incrementAndGet(); } } } catch (Exception e) { //e.printStackTrace(); } finally { terminate(); } }
/**
 * Implementation of <code>MessageListener</code>.
 *
 * <p>Reads the {@code JmsMessageProducer.MESSAGE_COUNT} int property from
 * every message. For a {@link TextMessage} it prints the text together with
 * that value and increments the counter. A {@link JMSException} is printed
 * and not rethrown.
 *
 * @param message the delivered JMS message
 */
public void onMessage(Message message) {
    try {
        int messageCount = message.getIntProperty(JmsMessageProducer.MESSAGE_COUNT);

        if (message instanceof TextMessage) {
            TextMessage tm = (TextMessage) message;
            String msg = tm.getText();

            // println does not expand "{}" placeholders, so build the line explicitly.
            System.out.println("Processed message '" + msg + "'. value=" + messageCount);

            counter.incrementAndGet();
        }
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
/**
 * Convert a message into a Kafka Connect SourceRecord.
 *
 * <p>The body of a {@link BytesMessage} is used as-is; the body of a
 * {@link TextMessage} is encoded as UTF-8. A text message without a body
 * yields a {@code null} payload, the same as a bytes message without a body,
 * instead of failing with a {@link NullPointerException}. The payload is then
 * handed to the configured converter.
 *
 * @param context            the JMS context to use for building messages
 * @param topic              the Kafka topic
 * @param messageBodyJms     whether to interpret MQ messages as JMS messages
 * @param message            the message
 *
 * @return the Kafka Connect SourceRecord
 *
 * @throws JMSException      Message could not be converted
 * @throws ConnectException  if the message is neither a bytes nor a text message
 */
@Override
public SourceRecord toSourceRecord(JMSContext context, String topic, boolean messageBodyJms, Message message) throws JMSException {
    byte[] payload;

    if (message instanceof BytesMessage) {
        payload = message.getBody(byte[].class);
    } else if (message instanceof TextMessage) {
        String s = message.getBody(String.class);
        // getBody returns null when the message has no body.
        payload = (s == null) ? null : s.getBytes(UTF_8);
    } else {
        log.error("Unsupported JMS message type {}", message.getClass());
        throw new ConnectException("Unsupported JMS message type");
    }

    SchemaAndValue sv = converter.toConnectData(topic, payload);
    return new SourceRecord(null, null, topic, sv.schema(), sv.value());
}
/**
 * Exercises basic queue operations on {@code DEMO.TEST}: a blocking read of
 * the first message, purging two further messages, and finally receiving a
 * message through a listener.
 *
 * <p>The listener asserts that the received text is {@code "foo"}, sets the
 * {@code passed} flag and stops the consumer; the test then waits until the
 * consumer has stopped and checks the flag.
 */
@Test
public void testQueueOperations() throws Exception {
    final String queue = "DEMO.TEST";

    // Synchronous read of a single message.
    QueueUtils.send(queue, "first", 0);
    QueueConsumer consumer = new QueueConsumer(queue);
    String firstText = consumer.waitForNextMessage();
    assertEquals("first", firstText);

    // These two are purged and must never reach the listener.
    QueueUtils.send(queue, "second", 0);
    QueueUtils.send(queue, "third", 0);
    consumer.purgeMessages();

    // Asynchronous read through a listener.
    QueueUtils.send(queue, "foo", 25);
    consumer.setMessageListener(received -> {
        TextMessage textMessage = (TextMessage) received;
        try {
            logger.info("*** received message: {}", textMessage.getText());
            assertEquals("foo", textMessage.getText());
            passed = true;
            consumer.stop();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    });

    QueueUtils.waitUntilStopped();
    assertTrue(passed);
}
/**
 * Initializes the mocks and stubs the fixture used by each test: an update
 * event on an LDP RDF source, and a JMS connection whose session returns the
 * mock queue, mock text message and mock producer.
 *
 * @throws JMSException declared because the stubbed JMS methods declare it
 */
@BeforeEach
public void setUp() throws JMSException {
    initMocks(this);

    // Event fixture.
    when(mockEvent.getIdentifier()).thenReturn(rdf.createIRI("urn:test"));
    when(mockEvent.getCreated()).thenReturn(time);
    when(mockEvent.getAgents()).thenReturn(singleton(Trellis.AdministratorAgent));
    when(mockEvent.getTarget()).thenReturn(of(rdf.createIRI("trellis:repository/resource")));
    when(mockEvent.getTargetTypes()).thenReturn(singleton(LDP.RDFSource));
    when(mockEvent.getTypes()).thenReturn(singleton(AS.Update));
    when(mockEvent.getInbox()).thenReturn(empty());

    // JMS fixture: connection -> session -> queue / message / producer.
    when(mockConnection.createSession(anyBoolean(), eq(AUTO_ACKNOWLEDGE))).thenReturn(mockSession);
    when(mockSession.createQueue(eq(queueName))).thenReturn(mockQueue);
    when(mockSession.createProducer(any(Queue.class))).thenReturn(mockProducer);
    when(mockSession.createTextMessage(anyString())).thenReturn(mockMessage);
    doNothing().when(mockProducer).send(any(TextMessage.class));
}
/**
 * Prints every received message to standard output.
 *
 * <p>For a {@link TextMessage} the text is printed; if reading it fails the
 * failure is logged with its stack trace and an empty text is printed. Any
 * other message type is printed via its string representation.
 *
 * @param msg the delivered JMS message
 */
@Override
public void onMessage(Message msg) {
    if (msg instanceof TextMessage) {
        TextMessage textMessage = (TextMessage) msg;
        String text = "";
        try {
            text = textMessage.getText();
        } catch (JMSException e) {
            // Pass the exception itself so the logger records the stack trace.
            logger.error("Could not read text of JMS message", e);
        }
        System.out.println("Received: " + text);
    } else {
        System.out.println("Received: " + msg);
    }
}
/**
 * With persistence enabled on the component under test, publishing an event
 * on the event bus must set the {@code PERSISTENT} delivery mode on the JMS
 * message.
 */
@Test
public void testSendPersistentMessage() throws Exception {
    cut.setPersistent(true);
    cut.setMessageConverter(null);
    cut.postConstruct();

    // Mocks for the JMS objects the publisher is expected to create.
    TopicConnection topicConnection = mock(TopicConnection.class);
    TopicSession txSession = mock(TopicSession.class);
    TextMessage outbound = mock(TextMessage.class);

    when(connectionFactory.createTopicConnection()).thenReturn(topicConnection);
    when(topicConnection.createTopicSession(true, Session.SESSION_TRANSACTED))
            .thenReturn(txSession);
    when(txSession.createPublisher(topic)).thenReturn(publisher);
    when(txSession.createTextMessage(any())).thenReturn(outbound);

    ArgumentCaptor<Message> publishedCaptor = ArgumentCaptor.forClass(Message.class);
    doNothing().when(publisher).publish(publishedCaptor.capture());

    eventBus.publish(new GenericEventMessage<>("Message"));

    verify(outbound).setJMSDeliveryMode(DeliveryMode.PERSISTENT);
}
@Override public void verstuurBijhoudingsNotificatie(final BijhoudingsplanNotificatieBericht notificatieBericht) { try { final MessageCreator messageCreator = session -> { final TextMessage message = session.createTextMessage(); // Header voor message group mechanisme van ActiveMq message.setStringProperty("JMSXGroupID", String.valueOf(notificatieBericht.getOntvangendePartijCode())); message.setText(serialiseerderBijhoudingsplan.serialiseerNaarString(notificatieBericht)); return message; }; bijhoudingsplanJmsTemplate.send(messageCreator); } catch (final JmsException e) { LOGGER.error("Het publiceren van het verwerk BijhoudingsplanBericht is mislukt ivm een JMS exceptie.", e); throw e; } }
/** * handles the message; designed for TextMessage types */ public void onMessage(Message message) { try { if (message instanceof TextMessage) { try { logger.info("got message: " + ((TextMessage) message).getText()); processMessage((TextMessage) message); //session.commit(); } catch (ActivitiException ae) { logger.error("ActivityException in onMessage", ae); //session.rollback(); throw ae; } } } catch (JMSException e) { logger.error("JMSException in onMessage", e); } }
/**
 * Initializes the listener, delivers one mocked text message carrying task
 * id, result code, type and correlation id, then calls the listener's
 * cleanup and connection-stats methods.
 *
 * <p>A {@link JMSException} is printed and rethrown so that the test fails.
 */
@Test
public void testListening() throws JMSException {
    try {
        listener.init();

        TextMessage mockedMessage = mock(TextMessage.class);
        when(mockedMessage.getText()).thenReturn("{messgetext:true}");
        when(mockedMessage.getStringProperty("task_id")).thenReturn("corel-id");
        when(mockedMessage.getStringProperty("task_result_code")).thenReturn("200");
        when(mockedMessage.getStringProperty("type")).thenReturn("deploybom");
        when(mockedMessage.getJMSCorrelationID()).thenReturn("jms|cor!rel!ation!id");

        listener.onMessage(mockedMessage);

        listener.cleanup();
        listener.getConnectionStats();
    } catch (JMSException failure) {
        System.out.println("CAUTH EXCEPTION " + failure.getMessage());
        failure.printStackTrace();
        throw failure;
    }
}
/**
 * Sends every event as a text message to {@code DESTINATION_NAME} inside a
 * single transacted session and commits once all messages have been sent.
 *
 * <p>The connection is closed in a {@code finally} block so that it is
 * released when sending fails. The session is created with
 * {@code Session.SESSION_TRANSACTED}, which states the intent explicitly; the
 * acknowledge mode argument is ignored for a transacted session.
 *
 * @param events message bodies to send, in order
 * @throws Exception if connecting, sending or committing fails
 */
private void putQueue(List<String> events) throws Exception {
    ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_BIND_URL);
    Connection connection = factory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Destination destination = session.createQueue(DESTINATION_NAME);
        MessageProducer producer = session.createProducer(destination);

        for (String event : events) {
            TextMessage message = session.createTextMessage();
            message.setText(event);
            producer.send(message);
        }

        session.commit();
        session.close();
    } finally {
        // Closing the connection also closes a session that is still open.
        connection.close();
    }
}
/**
 * Serializes the response to JSON and sends it as a non-persistent text
 * message to the reply-to destination of the incoming message, copying the
 * incoming correlation id.
 *
 * <p>If serialization fails, a warning including the cause is logged and no
 * response is sent. The producer has message ids and timestamps disabled.
 *
 * @param incoming the request message that supplies correlation id and reply-to
 * @param response the objects to serialize as the response body
 * @throws JMSException if reading the incoming headers or building the message fails
 */
private void wrapAndSendResponseInternal(Message incoming, List<B> response) throws JMSException {
    String text;
    try {
        text = JsonMapper.serialize(response);
    } catch (JsonProcessingException e) {
        // Keep the cause in the log so the serialization problem can be diagnosed.
        getLogger().warn("Failed to convert response to text. Will not send response", e);
        return;
    }

    getLogger().debug("Response (object): {}", response);
    getLogger().debug("Response (string): {}", text);

    TextMessage msg = getContext().createTextMessage(text);
    msg.setJMSCorrelationID(incoming.getJMSCorrelationID());

    getContext().createProducer()
            .setDisableMessageID(true)
            .setDisableMessageTimestamp(true)
            .setDeliveryMode(DeliveryMode.NON_PERSISTENT)
            .send(incoming.getJMSReplyTo(), msg);
}
private static void sendMessage(Session session, MessageProducer producer) throws JMSException { /* for (int i = 0; i < SEND_NUMBER; i++) { TextMessage message = session.createTextMessage("ActiveMQ Send Message:"+i); System.out.println("SendMessage:"+""+i); // 发送消息到目的地方 producer.send(message); }//*/ int i = 0; do{ TextMessage message = session.createTextMessage("ActiveMQ Send Message:"+i); System.out.println("SendMessage:"+""+i); // 发送消息到目的地方 producer.send(message); i++; if(i > SEND_NUMBER) break; }while(true); }
/**
 * Sends and receives "Hello world" on {@code TEST.FOO} twice, once with an
 * explicit message creator and {@code receive}, once with
 * {@code convertAndSend} and {@code receiveAndConvert}. It then checks that
 * the mock tracer recorded four finished spans and has no active span left.
 */
@Test
public void sendAndReceive() throws Exception {
    final String queue = "TEST.FOO";
    final String payload = "Hello world";

    // Round trip 1: explicit message creation.
    jmsTemplate.send(queue, jmsSession -> jmsSession.createTextMessage("Hello world"));
    TextMessage firstReceived = (TextMessage) jmsTemplate.receive(queue);
    assertEquals(payload, firstReceived.getText());

    // Round trip 2: converter-based API.
    jmsTemplate.convertAndSend(queue, payload);
    assertEquals(payload, jmsTemplate.receiveAndConvert(queue));

    List<MockSpan> finished = mockTracer.finishedSpans();
    assertEquals(4, finished.size());
    checkSpans(finished);
    assertNull(mockTracer.activeSpan());
}
/**
 * Sends one text message to {@code notifyQueue} with the
 * {@code ScheduledMessage.AMQ_SCHEDULED_DELAY} property set to
 * {@code 65 * 1000}, and prints a notice to standard output while the
 * message is being created.
 */
public void send() {
    jmsTemplate.send(notifyQueue, jmsSession -> {
        TextMessage delayed = jmsSession.createTextMessage("textMsg....柯远");
        // 65 * 1000 is presumably a delay in milliseconds (65 s), matching the printed notice.
        delayed.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 65 * 1000);
        System.out.println("等65秒后再发");
        return delayed;
    });
}
@Test public void sendAndReceive() throws Exception { Destination destination = session.createQueue("TEST.FOO"); MessageProducer messageProducer = session.createProducer(destination); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // Instrument MessageProducer with TracingMessageProducer TracingMessageProducer producer = new TracingMessageProducer(messageProducer, mockTracer); MessageConsumer messageConsumer = session.createConsumer(destination); // Instrument MessageConsumer with TracingMessageConsumer TracingMessageConsumer consumer = new TracingMessageConsumer(messageConsumer, mockTracer); TextMessage message = session.createTextMessage("Hello world"); producer.send(message); TextMessage received = (TextMessage) consumer.receive(5000); assertEquals("Hello world", received.getText()); List<MockSpan> mockSpans = mockTracer.finishedSpans(); assertEquals(2, mockSpans.size()); checkSpans(mockSpans); assertNull(mockTracer.activeSpan()); }
/**
 * Creates a text message with the given body on the underlying session,
 * converting a checked {@link JMSException} into the runtime exception
 * produced by {@code JMSExceptionSupport}.
 *
 * @param text the message body
 * @return the newly created text message
 */
@Override
public TextMessage createTextMessage(String text) {
    final TextMessage created;
    try {
        created = getSession().createTextMessage(text);
    } catch (JMSException jmse) {
        throw JMSExceptionSupport.createRuntimeException(jmse);
    }
    return created;
}
/**
 * Creates a mock text message holding the given text, after verifying via
 * {@code checkClosed()} that this object may still be used.
 *
 * @param text the message body
 * @return a new {@code MockJMSTextMessage} with the text set
 * @throws JMSException if {@code checkClosed()} or setting the text fails
 */
@Override
public TextMessage createTextMessage(String text) throws JMSException {
    checkClosed();

    MockJMSTextMessage textMessage = new MockJMSTextMessage();
    textMessage.setText(text);

    return textMessage;
}
/**
 * Sets the current time in {@code BrpNu}, reads the text of the incoming
 * message, deserializes it into a {@code SelectieTaakResultaat} and passes
 * the result to the receiver.
 *
 * <p>The message is cast to {@link TextMessage} without a type check. A
 * {@link JMSException} is logged and not rethrown.
 *
 * @param message the delivered JMS message
 */
@Override
public void onMessage(final Message message) {
    BrpNu.set(DatumUtil.nuAlsZonedDateTime());
    LOGGER.debug("onMessage");

    final TextMessage ontvangenBericht = (TextMessage) message;
    try {
        final String json = ontvangenBericht.getText();
        final SelectieTaakResultaat resultaat =
                JSON_STRING_SERIALISEERDER.deserialiseerVanuitString(json, SelectieTaakResultaat.class);
        selectieTaakResultaatOntvanger.ontvang(resultaat);
    } catch (JMSException e) {
        LOGGER.error("error on message", e);
    }
}
/**
 * Sends the given string as a text message to the destination.
 *
 * <p>A checked {@link JMSException} is converted into the runtime exception
 * produced by {@code JMSExceptionSupport}.
 *
 * @param destination where to send the message
 * @param body        the text body
 * @return this producer, to allow call chaining
 */
@Override
public JMSProducer send(Destination destination, String body) {
    try {
        doSend(destination, session.createTextMessage(body));
    } catch (JMSException jmse) {
        throw JMSExceptionSupport.createRuntimeException(jmse);
    }

    return this;
}
/**
 * Logs that a message arrived and then logs its text.
 *
 * <p>Messages that are not {@link TextMessage} instances only produce a
 * notice. If reading the text fails, the stack trace is printed and an empty
 * text is logged.
 *
 * @param message the delivered JMS message
 */
@Override
public void onMessage(Message message) {
    PrinterUtils.printELog("接收到消息...");

    if (!(message instanceof TextMessage)) {
        PrinterUtils.printILog("messgae is not TextMessage");
        return;
    }

    String body = "";
    try {
        body = ((TextMessage) message).getText();
    } catch (Exception e) {
        e.printStackTrace();
    }
    PrinterUtils.printILog("msg:" + body);
}
/**
 * Sets the current time in {@code BrpNu}, reads the text of the incoming
 * message, deserializes it into a {@code SelectieVerwerkTaakBericht} and
 * hands it to the task processor.
 *
 * <p>The message is cast to {@link TextMessage} without a type check. A
 * {@link JMSException} is logged and not rethrown.
 *
 * @param message the delivered JMS message
 */
@Override
public void onMessage(final Message message) {
    BrpNu.set(DatumUtil.nuAlsZonedDateTime());
    LOGGER.debug("onMessage");

    final TextMessage taakBericht = (TextMessage) message;
    try {
        final String json = taakBericht.getText();
        final SelectieVerwerkTaakBericht taak =
                JSON_STRING_SERIALISEERDER.deserialiseerVanuitString(json, SelectieVerwerkTaakBericht.class);
        selectieTaakVerwerker.verwerkSelectieTaak(taak);
    } catch (JMSException e) {
        LOGGER.error("error on message", e);
    }
}
/**
 * Sends a text message carrying custom properties and a reply-to queue to
 * {@code testQueue}, consumes it with {@code JMSConsumer} and checks that
 * body, reply-to header and properties reach the callback.
 *
 * <p>The caching connection factory is destroyed in a {@code finally} block
 * so that it is released even when a send, consume or assertion fails.
 */
@Test
public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
    final String destinationName = "testQueue";
    JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);

    try {
        jmsTemplate.send(destinationName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage("hello from the other side");
                message.setStringProperty("foo", "foo");
                message.setBooleanProperty("bar", false);
                message.setJMSReplyTo(session.createQueue("fooQueue"));
                return message;
            }
        });

        JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
        final AtomicBoolean callbackInvoked = new AtomicBoolean();
        consumer.consume(destinationName, new ConsumerCallback() {
            @Override
            public void accept(JMSResponse response) {
                callbackInvoked.set(true);
                assertEquals("hello from the other side", new String(response.getMessageBody()));
                assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
                assertEquals("foo", response.getMessageProperties().get("foo"));
                // The boolean property is expected back as its string form.
                assertEquals("false", response.getMessageProperties().get("bar"));
            }
        });

        // Guards against the callback never running, which would skip the asserts above.
        assertTrue(callbackInvoked.get());
    } finally {
        ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
    }
}
/**
 * Validates that a {@link ConnectionFactory} can be set up by pointing the
 * provider at client libraries located at runtime.
 *
 * <p>ActiveMQ is not declared in the POM; it is fetched from the Maven
 * repository by {@link TestUtils#setupActiveMqLibForTesting(boolean)}, so the
 * machine must be connected to the Internet. Any exception is caught and
 * logged, so in that situation the test fails quietly. Once the provider is
 * enabled and valid, a "Hello" message is sent to and received from
 * {@code myqueue} over a connection obtained from it.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        String clientLibDir = TestUtils.setupActiveMqLibForTesting(true);

        TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
        JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
        testRunner.addControllerService("cfProvider", provider);
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLibDir);
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.ActiveMQConnectionFactory");
        testRunner.enableControllerService(provider);
        testRunner.assertValid(provider);

        // Round trip one message through the factory the provider built.
        Connection jmsConnection = provider.getConnectionFactory().createConnection();
        jmsConnection.start();
        Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination myQueue = jmsSession.createQueue("myqueue");
        MessageProducer sender = jmsSession.createProducer(myQueue);
        MessageConsumer receiver = jmsSession.createConsumer(myQueue);

        TextMessage hello = jmsSession.createTextMessage("Hello");
        sender.send(hello);
        assertEquals("Hello", ((TextMessage) receiver.receive()).getText());

        jmsConnection.stop();
        jmsConnection.close();
    } catch (Exception e) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e);
    }
}
/**
 * Validates that a {@link ConnectionFactory} can be set up through the JNDI
 * provider by pointing it at client libraries located at runtime.
 *
 * <p>ActiveMQ is not declared in the POM; it is fetched from the Maven
 * repository by {@link TestUtils#setupActiveMqLibForTesting(boolean)}, so the
 * machine must be connected to the Internet. Any exception is caught and
 * logged, so in that situation the test fails quietly. Once the provider is
 * enabled and valid, a "Hello" message is sent to and received from
 * {@code myqueue} over a connection obtained from it.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        String clientLibDir = TestUtils.setupActiveMqLibForTesting(true);

        TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
        JNDIConnectionFactoryProvider provider = new JNDIConnectionFactoryProvider();
        testRunner.addControllerService("cfProvider", provider);
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.JNDI_CF_LOOKUP, "ConnectionFactory");
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLibDir);
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        testRunner.enableControllerService(provider);
        testRunner.assertValid(provider);

        // Round trip one message through the factory the provider looked up.
        Connection jmsConnection = provider.getConnectionFactory().createConnection();
        jmsConnection.start();
        Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination myQueue = jmsSession.createQueue("myqueue");
        MessageProducer sender = jmsSession.createProducer(myQueue);
        MessageConsumer receiver = jmsSession.createConsumer(myQueue);

        TextMessage hello = jmsSession.createTextMessage("Hello");
        sender.send(hello);
        assertEquals("Hello", ((TextMessage) receiver.receive()).getText());

        jmsConnection.stop();
        jmsConnection.close();
    } catch (Exception e) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e);
    }
}
/**
 * Exceptions test.
 *
 * <p>Builds a mocked "ci-change-state" text message and delivers it to a
 * fresh {@code OpsEventListener} whose bad-state processor is a mock stubbed
 * to throw an {@code OpsException}. The method makes no assertions.
 *
 * @throws OpampException declared in the signature
 */
public void exceptionsTest() throws OpampException {
    OpsEventListener oel = new OpsEventListener();
    // NOTE(review): the Gson setup below targets `opsEventListener`, not the
    // local `oel` that receives the message - confirm this is intended.
    opsEventListener.setGson(new Gson());
    when(opsEventListener.getEventUtil().getGson()).thenReturn(new Gson());
    String eventJson = makeJson(UNHEALTHY, "oldstate");
    TextMessage message = mock(TextMessage.class);
    try {
        when(message.getStringProperty("type")).thenReturn("ci-change-state");
        when(message.getText()).thenReturn(eventJson);
    } catch (JMSException e) {
        // Only needed because the stubbed JMS getters declare JMSException.
        e.printStackTrace();
    }
    BadStateProcessor bsProcessorMock = mock(BadStateProcessor.class);
    CiChangeStateEvent changeEvent = new CiChangeStateEvent();
    OpsBaseEvent event = new OpsBaseEvent();
    // NOTE(review): anyLong() looks like a Mockito argument matcher used outside
    // a stubbing/verification call; it may influence how the doThrow() stubbing
    // on the next line matches its argument - verify before changing either line.
    event.setCiId(anyLong());
    doThrow(new OpsException(1, "expected")).when(bsProcessorMock).processUnhealthyState(changeEvent);
    oel.setBsProcessor(bsProcessorMock);
    oel.onMessage(message);
    // Further test coverage could be done with this mixture; saved for later:
    // when(bsProcessorMock.processUnhealthyState(anyLong());
    // doThrow(new
    // OpampException("expected")).when(bsProcessorMock).processUnhealthyState(anyLong());
    // oel.setBsProcessor(bsProcessorMock);
    // bsProcessor.processUnhealthyState(event.getCiId());
}
/**
 * Creates a text message from the given data: the payload becomes the body
 * and every header entry is copied to a string property of the same name.
 *
 * <p>Headers are iterated via {@code entrySet()} so that each value is read
 * together with its key instead of through a second map lookup.
 *
 * @param data payload and optional headers; headers may be {@code null}
 * @return the populated text message
 * @throws JMSException if creating the message or setting a property fails
 */
protected TextMessage createTextMessage(MessageData data) throws JMSException {
    TextMessage message = session.createTextMessage(data.getPayload());

    Map<String, String> headers = data.getHeaders();
    if (headers != null) {
        for (Map.Entry<String, String> header : headers.entrySet()) {
            message.setStringProperty(header.getKey(), header.getValue());
        }
    }

    return message;
}
/**
 * Builds a task that receives one message through the given JMS operations
 * and returns its text.
 *
 * <p>The received message is cast to {@link TextMessage} without a type
 * check. A {@link JMSException} while reading the text is rethrown as an
 * {@link IllegalStateException} with the original as its cause.
 *
 * @param jmsOperations the JMS operations used to receive the message
 * @return a callable yielding the text of the received message
 */
private Callable<String> receiveMessage(final JmsOperations jmsOperations) {
    return new Callable<String>() {
        @Override
        public String call() {
            final TextMessage received = (TextMessage) jmsOperations.receive();
            try {
                return received.getText();
            } catch (JMSException e) {
                throw new IllegalStateException("Could not receive message", e);
            }
        }
    };
}