/**
 * Sends a Map-type message through jmsTemplate's send/MessageCreator() and attaches a
 * property to the Message that can be used for message filtering.
 *
 * @param user        supplies the "userName" and "email" map entries
 * @param destination JMS destination the message is sent to
 */
private void sendMessage(final User user, Destination destination) {
    MessageCreator userMessageCreator = new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            MapMessage userMessage = session.createMapMessage();
            // Body entries.
            userMessage.setString("userName", user.getName());
            userMessage.setString("email", user.getEmail());
            // Message property (not a body entry), intended for message filtering.
            userMessage.setStringProperty("objectType", "user");
            return userMessage;
        }
    };
    jmsTemplate.send(destination, userMessageCreator);
}
/**
 * Sends a MapMessage with no body entries to {@code notifyTopic}, carrying a scheduled-delay
 * property and an "objectType" property.
 */
public void sendMessage() {
    MessageCreator delayedMessageCreator = new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            MapMessage delayedMessage = session.createMapMessage();
            // 10 * 1000 — presumably milliseconds (i.e. 10 seconds); confirm against ScheduledMessage docs.
            int delayValue = 10 * 1000;
            System.out.println("生产消消息");
            delayedMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayValue);
            delayedMessage.setStringProperty("objectType", "user");
            return delayedMessage;
        }
    };
    jmsTemplate.send(notifyTopic, delayedMessageCreator);
}
/**
 * Builds the MapMessage describing a service call: the service's {@code invoke} value under
 * "serviceName" and the serialized context under "serviceContext".
 *
 * @param session      JMS session used to create the message
 * @param modelService service definition; must not yield any names from
 *                     {@code getParameterNames(ModelService.OUT_PARAM, false)}
 * @param context      service context to serialize
 * @return the populated MapMessage
 * @throws GenericServiceException if such OUT parameters exist or the context cannot be serialized
 * @throws JMSException            if the message cannot be created or populated
 */
protected Message makeMessage(Session session, ModelService modelService, Map<String, Object> context) throws GenericServiceException, JMSException {
    List<String> outParamNames = modelService.getParameterNames(ModelService.OUT_PARAM, false);
    if (UtilValidate.isNotEmpty(outParamNames)) {
        throw new GenericServiceException("JMS service cannot have required OUT parameters; no parameters will be returned.");
    }

    String serializedContext;
    try {
        if (Debug.verboseOn()) {
            Debug.logVerbose("Serializing Context --> " + context, module);
        }
        serializedContext = JmsSerializer.serialize(context);
    } catch (Exception e) {
        // Any serialization failure is reported as a service exception with the cause attached.
        throw new GenericServiceException("Cannot serialize context.", e);
    }

    MapMessage serviceMessage = session.createMapMessage();
    serviceMessage.setString("serviceName", modelService.invoke);
    serviceMessage.setString("serviceContext", serializedContext);
    return serviceMessage;
}
/**
 * Converts a String-keyed map to a (mocked) MapMessage and back, expecting the same map and
 * one setObject call per entry.
 */
@Test
public void testMapConversion() throws JMSException {
    Map<String, String> expected = new HashMap<String, String>(2);
    expected.put("key1", "value1");
    expected.put("key2", "value2");

    MapMessage mapMessage = mock(MapMessage.class);
    Session session = mock(Session.class);
    given(session.createMapMessage()).willReturn(mapMessage);
    // Stub the read side so that fromMessage can rebuild the map from the mock.
    given(mapMessage.getMapNames()).willReturn(Collections.enumeration(expected.keySet()));
    given(mapMessage.getObject("key1")).willReturn("value1");
    given(mapMessage.getObject("key2")).willReturn("value2");

    SimpleMessageConverter converter = new SimpleMessageConverter();
    Message converted = converter.toMessage(expected, session);
    assertEquals(expected, converter.fromMessage(converted));

    verify(mapMessage).setObject("key1", "value1");
    verify(mapMessage).setObject("key2", "value2");
}
/**
 * Expects toMessage to throw MessageConversionException when the map has an Integer key.
 */
@Test
public void testMapConversionWhereMapHasNonStringTypesForKeys() throws JMSException {
    final Session session = mock(Session.class);
    MapMessage mapMessage = mock(MapMessage.class);
    given(session.createMapMessage()).willReturn(mapMessage);

    final Map<Integer, String> integerKeyed = new HashMap<Integer, String>(1);
    integerKeyed.put(1, "value1");

    final SimpleMessageConverter converter = new SimpleMessageConverter();
    try {
        converter.toMessage(integerKeyed, session);
        fail("expected MessageConversionException");
    } catch (MessageConversionException ex) {
        // expected
    }
}
/**
 * Expects toMessage to throw MessageConversionException when the map contains a null key.
 */
@Test
public void testMapConversionWhereMapHasNNullForKey() throws JMSException {
    final Session session = mock(Session.class);
    MapMessage mapMessage = mock(MapMessage.class);
    given(session.createMapMessage()).willReturn(mapMessage);

    final Map<Object, String> nullKeyed = new HashMap<Object, String>(1);
    nullKeyed.put(null, "value1");

    final SimpleMessageConverter converter = new SimpleMessageConverter();
    try {
        converter.toMessage(nullKeyed, session);
        fail("expected MessageConversionException");
    } catch (MessageConversionException ex) {
        // expected
    }
}
/**
 * Sends a two-entry MapMessage to "test.map" and checks that the endpoint receives exactly one
 * message whose body is the expected map.
 */
@Test
public void testConsumeMapMessage() throws Exception {
    endpoint.expectedMessageCount(1);
    jmsTemplate.setPubSubDomain(false);

    MessageCreator fooBarCreator = new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            MapMessage fooBar = session.createMapMessage();
            fooBar.setString("foo", "abc");
            fooBar.setString("bar", "xyz");
            return fooBar;
        }
    };
    jmsTemplate.send("test.map", fooBarCreator);

    endpoint.assertIsSatisfied();
    assertCorrectMapReceived();
}
protected void assertCorrectMapReceived() { Exchange exchange = endpoint.getReceivedExchanges().get(0); // This should be a JMS Exchange assertNotNull(ExchangeHelper.getBinding(exchange, JmsBinding.class)); JmsMessage in = (JmsMessage) exchange.getIn(); assertNotNull(in); Map<?, ?> map = exchange.getIn().getBody(Map.class); log.info("Received map: " + map); assertNotNull("Should have received a map message!", map); assertIsInstanceOf(MapMessage.class, in.getJmsMessage()); assertEquals("map.foo", "abc", map.get("foo")); assertEquals("map.bar", "xyz", map.get("bar")); assertEquals("map.size", 2, map.size()); }
/** * MessageListener回调函数. */ @Override public void onMessage(Message message) { try { MapMessage mapMessage = (MapMessage) message; // 打印消息详情 logger.info("UserName:{}, Email:{}", mapMessage.getString("userName"), mapMessage.getString("email")); // 发送邮件 // if (simpleMailService != null) { // simpleMailService.sendNotificationMail(mapMessage.getString("userName")); // } } catch (Exception e) { logger.error("处理消息时发生异常.", e); } }
/**
 * Extracts the payload of a JMS message according to its type, checked in this order:
 * a TextMessage yields its text, a StreamMessage yields the result of {@code readString()},
 * a BytesMessage yields a byte array, a MapMessage yields a Map, and an ObjectMessage yields
 * its Serializable object. Any other message type is returned as the plain Message object.
 *
 * @param message the message to convert
 * @return payload
 * @throws javax.jms.JMSException if reading the message content fails
 */
@Override
public Object convert(Message message) throws JMSException {
    if (message instanceof TextMessage) {
        TextMessage text = (TextMessage) message;
        return text.getText();
    }
    if (message instanceof StreamMessage) {
        StreamMessage stream = (StreamMessage) message;
        return stream.readString();
    }
    if (message instanceof BytesMessage) {
        return extractByteArrayFromMessage((BytesMessage) message);
    }
    if (message instanceof MapMessage) {
        return extractMapFromMessage((MapMessage) message);
    }
    if (message instanceof ObjectMessage) {
        return extractSerializableFromMessage((ObjectMessage) message);
    }
    // Unknown message type: hand the message back untouched.
    return message;
}
/**
 * Logs the JMS message id followed by the message content.
 *
 * <p>An ObjectMessage is logged via its payload (a null payload is logged as "null"), a
 * MapMessage is logged as a map of all its named entries, and any other message type is
 * logged by class name only instead of failing with a ClassCastException.
 *
 * @param msg the message to log
 * @throws JMSException if the message id or content cannot be read
 */
public static void print(Message msg) throws JMSException {
    log.info(".print received message: " + msg.getJMSMessageID());
    if (msg instanceof ObjectMessage) {
        ObjectMessage objMsg = (ObjectMessage) msg;
        // String concatenation renders a null payload as "null" rather than throwing NPE.
        log.info(".print object: " + objMsg.getObject());
    } else if (msg instanceof MapMessage) {
        MapMessage mapMsg = (MapMessage) msg;
        HashMap<String, Object> entries = new HashMap<String, Object>();
        Enumeration<?> names = mapMsg.getMapNames();
        while (names.hasMoreElements()) {
            String name = (String) names.nextElement();
            entries.put(name, mapMsg.getObject(name));
        }
        log.info(".print map: " + entries);
    } else {
        log.info(".print unsupported message type: " + msg.getClass().getName());
    }
}
/**
 * Sends a MapMessage with no entries, checks that nothing is received before the connection is
 * started, and that a message is received and can be acknowledged afterwards.
 */
@Test
public void testSendNullMapMessage() throws Exception {
    try (Connection connection = factory.createConnection()) {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        System.out.println("Queue:" + queue);

        MessageProducer producer = session.createProducer(queue);
        MessageConsumer consumer = session.createConsumer(queue);

        MapMessage emptyMessage = session.createMapMessage();
        producer.send(emptyMessage);

        // The connection has not been started yet, so the receive is expected to time out.
        Assert.assertNull(consumer.receive(100));

        connection.start();
        MapMessage received = (MapMessage) consumer.receive(5000);
        Assert.assertNotNull(received);
        received.acknowledge();
    }
}
/**
 * Sends one MapMessage to a queue destination, populated with an entry for each typed setter
 * (boolean, byte, bytes, char, double, float, int, long, object, short, string).
 *
 * <p>NOTE(review): nothing in this method enables compression; presumably it is configured on
 * the connection elsewhere — confirm.
 *
 * @throws Exception if the session, producer or message operations fail
 */
private void sendCompressedMapMessageUsingOpenWire() throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    ActiveMQDestination queueDestination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
    ActiveMQMessageProducer openWireProducer = (ActiveMQMessageProducer) session.createProducer(queueDestination);

    MapMessage typedEntries = session.createMapMessage();
    typedEntries.setBoolean("boolean-type", true);
    typedEntries.setByte("byte-type", (byte) 10);
    typedEntries.setBytes("bytes-type", TEXT.getBytes());
    typedEntries.setChar("char-type", 'A');
    typedEntries.setDouble("double-type", 55.3D);
    typedEntries.setFloat("float-type", 79.1F);
    typedEntries.setInt("int-type", 37);
    typedEntries.setLong("long-type", 56652L);
    typedEntries.setObject("object-type", new String("VVVV"));
    typedEntries.setShort("short-type", (short) 333);
    typedEntries.setString("string-type", TEXT);

    openWireProducer.send(typedEntries);
}
/**
 * Extracts a String from the bound JMS message, depending on the message type:
 * the text of a TextMessage, the bytes of a BytesMessage decoded with the platform default
 * charset, the payload of an ObjectMessage cast to String, the {@code KEY} entry of a
 * MapMessage, or otherwise the {@code KEY} string property of the message.
 *
 * @param binding binding data holding the JMS message
 * @return the extracted string
 * @throws Exception if the message cannot be read, or a BytesMessage body is longer than
 *                   {@code Integer.MAX_VALUE} bytes
 */
@Override
protected String extractString(JMSBindingData binding) throws Exception {
    Message content = binding.getMessage();

    if (content instanceof TextMessage) {
        return ((TextMessage) content).getText();
    }

    if (content instanceof BytesMessage) {
        BytesMessage bytesMessage = (BytesMessage) content;
        long bodyLength = bytesMessage.getBodyLength();
        if (bodyLength > Integer.MAX_VALUE) {
            // A Java array cannot hold a body of this size.
            throw JCAMessages.MESSAGES.theSizeOfMessageContentExceedsBytesThatIsNotSupportedByThisOperationSelector("" + Integer.MAX_VALUE);
        }
        byte[] payload = new byte[(int) bodyLength];
        bytesMessage.readBytes(payload);
        return new String(payload);
    }

    if (content instanceof ObjectMessage) {
        ObjectMessage objectMessage = (ObjectMessage) content;
        return (String) objectMessage.getObject();
    }

    if (content instanceof MapMessage) {
        MapMessage mapMessage = (MapMessage) content;
        return mapMessage.getString(KEY);
    }

    return content.getStringProperty(KEY);
}
/**
 * Sends a request to the broker statistics queue and checks that a non-empty MapMessage reply
 * with a positive timestamp and default priority arrives on a temporary reply queue.
 */
public void testBrokerStats() throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue replyQueue = session.createTemporaryQueue();
    MessageConsumer replyConsumer = session.createConsumer(replyQueue);

    Queue statsQueue = session.createQueue(StatisticsBroker.STATS_BROKER_PREFIX);
    MessageProducer statsProducer = session.createProducer(statsQueue);

    Message request = session.createMessage();
    request.setJMSReplyTo(replyQueue);
    statsProducer.send(request);

    MapMessage stats = (MapMessage) replyConsumer.receive(10 * 1000);
    assertNotNull(stats);
    assertTrue(stats.getMapNames().hasMoreElements());
    assertTrue(stats.getJMSTimestamp() > 0);
    assertEquals(Message.DEFAULT_PRIORITY, stats.getJMSPriority());
    // Debug aid: iterate stats.getMapNames() and print each name with stats.getObject(name).
}
/**
 * Sends a message to "Test.Queue", then queries the destination statistics queue for it and
 * checks that a non-empty MapMessage reply with a positive timestamp and default priority
 * arrives on a temporary reply queue.
 */
public void testDestinationStats() throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue replyQueue = session.createTemporaryQueue();
    MessageConsumer replyConsumer = session.createConsumer(replyQueue);

    Queue testQueue = session.createQueue("Test.Queue");
    MessageProducer anonymousProducer = session.createProducer(null);
    String statsQueueName = StatisticsBroker.STATS_DESTINATION_PREFIX + testQueue.getQueueName();
    Queue statsQueue = session.createQueue(statsQueueName);

    Message request = session.createMessage();
    // First send the message to the test queue, then reuse it (with reply-to) as the stats query.
    anonymousProducer.send(testQueue, request);
    request.setJMSReplyTo(replyQueue);
    anonymousProducer.send(statsQueue, request);

    MapMessage stats = (MapMessage) replyConsumer.receive(10 * 1000);
    assertNotNull(stats);
    assertTrue(stats.getMapNames().hasMoreElements());
    assertTrue(stats.getJMSTimestamp() > 0);
    assertEquals(Message.DEFAULT_PRIORITY, stats.getJMSPriority());
    // Debug aid: iterate stats.getMapNames() and print each name with stats.getObject(name).
}
/**
 * Same scenario as the destination statistics query, but the statistics queue name is built as
 * prefix + "." + queue name. Expects a non-empty MapMessage reply with a positive timestamp and
 * default priority on a temporary reply queue.
 */
public void testDestinationStatsWithDot() throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue replyQueue = session.createTemporaryQueue();
    MessageConsumer replyConsumer = session.createConsumer(replyQueue);

    Queue testQueue = session.createQueue("Test.Queue");
    MessageProducer anonymousProducer = session.createProducer(null);
    String statsQueueName = StatisticsBroker.STATS_DESTINATION_PREFIX + "." + testQueue.getQueueName();
    Queue statsQueue = session.createQueue(statsQueueName);

    Message request = session.createMessage();
    // First send the message to the test queue, then reuse it (with reply-to) as the stats query.
    anonymousProducer.send(testQueue, request);
    request.setJMSReplyTo(replyQueue);
    anonymousProducer.send(statsQueue, request);

    MapMessage stats = (MapMessage) replyConsumer.receive(10 * 1000);
    assertNotNull(stats);
    assertTrue(stats.getMapNames().hasMoreElements());
    assertTrue(stats.getJMSTimestamp() > 0);
    assertEquals(Message.DEFAULT_PRIORITY, stats.getJMSPriority());
    // Debug aid: iterate stats.getMapNames() and print each name with stats.getObject(name).
}
/**
 * Creates a consumer on "Test.Queue", sends it a message, then queries the subscription
 * statistics queue and checks that a non-empty MapMessage reply with a positive timestamp and
 * default priority arrives on a temporary reply queue.
 */
@SuppressWarnings("unused")
public void testSubscriptionStats() throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue replyQueue = session.createTemporaryQueue();
    MessageConsumer replyConsumer = session.createConsumer(replyQueue);

    Queue testQueue = session.createQueue("Test.Queue");
    // Never read from; created only so that a consumer exists on the test queue.
    MessageConsumer testConsumer = session.createConsumer(testQueue);
    MessageProducer anonymousProducer = session.createProducer(null);
    Queue statsQueue = session.createQueue(StatisticsBroker.STATS_SUBSCRIPTION_PREFIX);

    Message request = session.createMessage();
    // First send the message to the test queue, then reuse it (with reply-to) as the stats query.
    anonymousProducer.send(testQueue, request);
    request.setJMSReplyTo(replyQueue);
    anonymousProducer.send(statsQueue, request);

    MapMessage stats = (MapMessage) replyConsumer.receive(10 * 1000);
    assertNotNull(stats);
    assertTrue(stats.getMapNames().hasMoreElements());
    assertTrue(stats.getJMSTimestamp() > 0);
    assertEquals(Message.DEFAULT_PRIORITY, stats.getJMSPriority());
    // Debug aid: iterate stats.getMapNames() and print each name with stats.getObject(name).
}
/**
 * Sends a 100-entry MapMessage from the local session and checks that the message received on
 * the remote session reports itself as compressed and still holds every entry.
 */
@Test
public void testMapMessageCompression() throws Exception {
    final int entryCount = 100;

    MessageConsumer remoteConsumer = remoteSession.createConsumer(included);
    MessageProducer localProducer = localSession.createProducer(included);
    localProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    waitForConsumerRegistration(localBroker, 1, included);

    MapMessage outgoing = localSession.createMapMessage();
    for (int i = 0; i < entryCount; i++) {
        outgoing.setString(String.valueOf(i), "test string: " + i);
    }
    localProducer.send(outgoing);

    Message incoming = remoteConsumer.receive(RECEIVE_TIMEOUT_MILLS);
    assertNotNull(incoming);
    ActiveMQMapMessage received = (ActiveMQMapMessage) incoming;
    assertTrue(received.isCompressed());

    for (int i = 0; i < entryCount; i++) {
        assertEquals("test string: " + i, received.getString(String.valueOf(i)));
    }
}
/**
 * Creates a MapMessage with a text field taken from {@code data[index]}, a nested map field
 * (which itself contains a further map holding a list) and a list field.
 *
 * @param index position in {@code data} of the text to store under "textField"
 * @return the populated MapMessage
 * @throws JMSException if the message cannot be created or populated
 */
@Override
protected Message createMessage(int index) throws JMSException {
    // Innermost level: a string and a list.
    Map<String, Object> innerMap = new HashMap<>();
    innerMap.put("x", "abc");
    innerMap.put("y", Arrays.<Object>asList("a", "b", "c"));

    // Middle level: scalar values plus the innermost map.
    Map<String, Object> outerMap = new HashMap<>();
    outerMap.put("a", "foo");
    outerMap.put("b", 23);
    outerMap.put("c", 45L);
    outerMap.put("d", innerMap);

    MapMessage result = session.createMapMessage();
    result.setString("textField", data[index]);
    result.setObject("mapField", outerMap);
    result.setObject("listField", Arrays.<Object>asList("a", "b", "c"));
    return result;
}
/**
 * Opens a connection from the given factory, sends one MapMessage to {@code queue} populated
 * with an entry for each typed setter, and closes the connection.
 *
 * <p>The connection is closed in a {@code finally} block so that it is not leaked when session
 * creation, message population or the send fails.
 *
 * @param factory connection factory used to create the connection
 * @param message currently unused by this method
 * @throws JMSException if any JMS operation fails
 */
private void sendTestMapMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
    ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(queue);

        MapMessage mapMessage = session.createMapMessage();
        mapMessage.setBoolean("boolean-type", true);
        mapMessage.setByte("byte-type", (byte) 10);
        mapMessage.setBytes("bytes-type", TEXT.getBytes());
        mapMessage.setChar("char-type", 'A');
        mapMessage.setDouble("double-type", 55.3D);
        mapMessage.setFloat("float-type", 79.1F);
        mapMessage.setInt("int-type", 37);
        mapMessage.setLong("long-type", 56652L);
        mapMessage.setObject("object-type", new String("VVVV"));
        mapMessage.setShort("short-type", (short) 333);
        mapMessage.setString("string-type", TEXT);

        producer.send(mapMessage);
    } finally {
        // Always release the connection, even when one of the steps above throws.
        connection.close();
    }
}
/**
 * Test in MapMessage the conversion between <code>getObject("foo")</code> and
 * <code>getDouble("foo")</code>: for a value stored with <code>setDouble</code>, the former is
 * expected to return a java.lang.Double and the latter the same value as a double.
 */
@Test
public void testMapMessageConversion() {
    try {
        MapMessage outgoing = senderSession.createMapMessage();
        outgoing.setDouble("pi", 3.14159);
        sender.send(outgoing);

        Message incoming = receiver.receive(TestConfig.TIMEOUT);
        Assert.assertTrue("The message should be an instance of MapMessage.\n", incoming instanceof MapMessage);

        MapMessage received = (MapMessage) incoming;
        Assert.assertTrue(received.getObject("pi") instanceof Double);
        Double boxedPi = (Double) received.getObject("pi");
        Assert.assertEquals(3.14159, boxedPi.doubleValue(), 0);
        Assert.assertEquals(3.14159, received.getDouble("pi"), 0);
    } catch (JMSException e) {
        fail(e);
    }
}
/**
 * Send a <code>MapMessage</code> whose body holds two values (a <code>String</code> and a
 * <code>double</code>).
 * <br />
 * Receive it and test that both body values are correct.
 */
@Test
public void testMapMessage_2() {
    try {
        MapMessage outgoing = senderSession.createMapMessage();
        outgoing.setString("name", "pi");
        outgoing.setDouble("value", 3.14159);
        sender.send(outgoing);

        Message incoming = receiver.receive(TestConfig.TIMEOUT);
        Assert.assertTrue("The message should be an instance of MapMessage.\n", incoming instanceof MapMessage);

        MapMessage received = (MapMessage) incoming;
        Assert.assertEquals("pi", received.getString("name"));
        Assert.assertEquals(3.14159, received.getDouble("value"), 0);
    } catch (JMSException e) {
        fail(e);
    }
}
/**
 * Runs the superclass checks, then verifies every typed entry of the MapMessage body.
 *
 * <p>Each assertion passes the expected value first and the value read from the message second,
 * following the JUnit-style (expected, actual) argument order, so that a failure message
 * reports the two values in the right roles.
 *
 * @param m          the message to check; must be a MapMessage
 * @param mode       passed through to the superclass check
 * @param redelivery passed through to the superclass check
 * @throws JMSException if an entry cannot be read
 */
@Override
protected void assertEquivalent(final Message m, final int mode, final boolean redelivery) throws JMSException {
    super.assertEquivalent(m, mode, redelivery);

    MapMessage map = (MapMessage) m;

    ProxyAssertSupport.assertTrue(map.getBoolean("boolean1"));
    ProxyAssertSupport.assertEquals('c', map.getChar("char1"));
    ProxyAssertSupport.assertEquals(1.0D, map.getDouble("double1"), 0.0D);
    ProxyAssertSupport.assertEquals(2.0F, map.getFloat("float1"), 0.0F);
    ProxyAssertSupport.assertEquals(3, map.getInt("int1"));
    ProxyAssertSupport.assertEquals(4L, map.getLong("long1"));
    ProxyAssertSupport.assertEquals(obj, map.getObject("object1"));
    ProxyAssertSupport.assertEquals((short) 5, map.getShort("short1"));
    ProxyAssertSupport.assertEquals("stringvalue", map.getString("string1"));
}
/**
 * Runs the superclass preparation, then fills the MapMessage body with one entry per typed
 * setter (boolean, byte, bytes, char, double, float, int, long, object, short, string).
 *
 * @param m the message to prepare; must be a MapMessage
 * @throws JMSException if an entry cannot be set
 */
@Override
protected void prepareMessage(final Message m) throws JMSException {
    super.prepareMessage(m);

    MapMessage body = (MapMessage) m;
    body.setBoolean("boolean", true);
    body.setByte("byte", (byte) 3);
    body.setBytes("bytes", new byte[] {3, 4, 5});
    body.setChar("char", (char) 6);
    body.setDouble("double", 7.0);
    body.setFloat("float", 8.0f);
    body.setInt("int", 9);
    body.setLong("long", 10L);
    body.setObject("object", new String("this is an object"));
    body.setShort("short", (short) 11);
    body.setString("string", "this is a string");
}
/**
 * Runs the superclass checks, then verifies every typed entry of the MapMessage body against
 * its expected value.
 *
 * @param m          the message to check; must be a MapMessage
 * @param mode       passed through to the superclass check
 * @param redelivery passed through to the superclass check
 * @throws JMSException if an entry cannot be read
 */
@Override
protected void assertEquivalent(final Message m, final int mode, final boolean redelivery) throws JMSException {
    super.assertEquivalent(m, mode, redelivery);

    MapMessage body = (MapMessage) m;

    ProxyAssertSupport.assertEquals(true, body.getBoolean("boolean"));
    ProxyAssertSupport.assertEquals((byte) 3, body.getByte("byte"));

    byte[] actualBytes = body.getBytes("bytes");
    ProxyAssertSupport.assertEquals((byte) 3, actualBytes[0]);
    ProxyAssertSupport.assertEquals((byte) 4, actualBytes[1]);
    ProxyAssertSupport.assertEquals((byte) 5, actualBytes[2]);

    ProxyAssertSupport.assertEquals((char) 6, body.getChar("char"));
    // Floating-point values are compared as boxed objects, i.e. for exact equality.
    ProxyAssertSupport.assertEquals(Double.valueOf(7.0), Double.valueOf(body.getDouble("double")));
    ProxyAssertSupport.assertEquals(Float.valueOf(8.0f), Float.valueOf(body.getFloat("float")));
    ProxyAssertSupport.assertEquals(9, body.getInt("int"));
    ProxyAssertSupport.assertEquals(10L, body.getLong("long"));
    ProxyAssertSupport.assertEquals("this is an object", body.getObject("object"));
    ProxyAssertSupport.assertEquals((short) 11, body.getShort("short"));
    ProxyAssertSupport.assertEquals("this is a string", body.getString("string"));
}
/**
 * Wraps a message in the resource-adapter wrapper matching its type.
 *
 * <p>The type is checked in the order Bytes, Map, Object, Stream, Text; a message matching none
 * of these is wrapped in the generic wrapper.
 *
 * @param message The message to be wrapped
 * @return The wrapped message
 */
Message wrapMessage(final Message message) {
    if (ActiveMQRAMessageConsumer.trace) {
        ActiveMQRALogger.LOGGER.trace("wrapMessage(" + message + ")");
    }

    if (message instanceof BytesMessage) {
        return new ActiveMQRABytesMessage((BytesMessage) message, session);
    }
    if (message instanceof MapMessage) {
        return new ActiveMQRAMapMessage((MapMessage) message, session);
    }
    if (message instanceof ObjectMessage) {
        return new ActiveMQRAObjectMessage((ObjectMessage) message, session);
    }
    if (message instanceof StreamMessage) {
        return new ActiveMQRAStreamMessage((StreamMessage) message, session);
    }
    if (message instanceof TextMessage) {
        return new ActiveMQRATextMessage((TextMessage) message, session);
    }
    return new ActiveMQRAMessage(message, session);
}
/**
 * Creates a MapMessage, copies every entry of {@code body} into it and applies
 * {@code properties} through {@code setMessageProperties}.
 *
 * @param body       entries for the message body; may be null, in which case the body is left empty
 * @param properties passed to {@code setMessageProperties} together with the message
 * @return the populated MapMessage
 * @throws EmbeddedJMSResourceException if a body entry cannot be set; the JMSException is kept
 *                                      as the cause
 */
public MapMessage createMessage(Map<String, Object> body, Map<String, Object> properties) {
    MapMessage message = this.createMapMessage();
    if (body != null) {
        for (Map.Entry<String, Object> entry : body.entrySet()) {
            try {
                message.setObject(entry.getKey(), entry.getValue());
            } catch (JMSException jmsEx) {
                // Pass the value itself: %s formats null as "null", whereas calling toString()
                // on a null value would throw NPE here and hide the original JMSException.
                throw new EmbeddedJMSResourceException(String.format("Failed to set body entry {%s = %s} on MapMessage", entry.getKey(), entry.getValue()), jmsEx);
            }
        }
    }
    setMessageProperties(message, properties);
    return message;
}
/**
 * Creates a MapMessage on the underlying session.
 *
 * @return the new MapMessage
 * @throws RuntimeException the result of {@code JMSExceptionSupport.createRuntimeException}
 *                          when the session raises a JMSException
 */
@Override
public MapMessage createMapMessage() {
    MapMessage created;
    try {
        created = getSession().createMapMessage();
    } catch (JMSException jmse) {
        throw JMSExceptionSupport.createRuntimeException(jmse);
    }
    return created;
}
/**
 * Sends a MapMessage built from the given map to the destination.
 *
 * <p>A null {@code body} results in an empty MapMessage being sent, as the JMS 2.0
 * {@code JMSProducer.send(Destination, Map)} contract specifies, instead of a
 * NullPointerException.
 *
 * @param destination the destination to send to
 * @param body        entries for the message body; may be null
 * @return this producer
 * @throws RuntimeException the result of {@code JMSExceptionSupport.createRuntimeException}
 *                          when creating, populating or sending the message raises a JMSException
 */
@Override
public JMSProducer send(Destination destination, Map<String, Object> body) {
    try {
        MapMessage message = session.createMapMessage();
        if (body != null) {
            for (Map.Entry<String, Object> entry : body.entrySet()) {
                message.setObject(entry.getKey(), entry.getValue());
            }
        }
        doSend(destination, message);
    } catch (JMSException jmse) {
        throw JMSExceptionSupport.createRuntimeException(jmse);
    }
    return this;
}
protected void doPublish(Event event) throws EdsException { Connection conn = null; Session session = null; MessageProducer messageProducer = null; try { LOG.debug("eds pub 3 mq in -[event:" + event + "]"); conn = connectionFactory.createConnection(); // 设置非事务,客户端确认方式 session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); MapMessage mapMessage = session.createMapMessage(); mapMessage = EventConverter.convertToMessage(mapMessage, event); Destination dest = getDestination(event.getName(), session); messageProducer = session.createProducer(dest); messageProducer.send(mapMessage); // commit session if necessary if (session.getTransacted()) { session.commit(); } LOG.debug("eds pub 4 mq ok -[conn:" + conn + ",session:" + session + ",event:" + event + "]"); }catch(JMSException e){ throw new EdsException("eds client activemq doPublish exception ", e); }finally { releaseSession(session); releaseMessageProducer(messageProducer); releaseConnection(conn, false); } }
public static MapMessage convertToMessage(MapMessage mapMessage,Event event) throws JMSException{ String name = event.getName(); Object data = event.getData(); // 如果已经是字符串,则不动 String dataStr = null; if(!(data instanceof String)){ dataStr = JSON.toJSONString(data,SerializerFeature.WriteClassName); }else{ dataStr = data.toString(); } mapMessage.setObject("name", name); mapMessage.setObject("data", dataStr); return mapMessage; }
/**
 * Converts a MapMessage into an event via {@code EventConverter.convertToEvent}.
 *
 * @param message the incoming message
 * @return the result of {@code EventConverter.convertToEvent}
 * @throws EdsException if the message is not a MapMessage
 */
@Override
public Object fromMessage(Message message) throws JMSException, MessageConversionException {
    if (!(message instanceof MapMessage)) {
        throw new EdsException("eds message to event conversion exception : " + message);
    }
    MapMessage eventMessage = (MapMessage) message;
    return EventConverter.convertToEvent(eventMessage);
}
@Override public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException { if(object instanceof Event){ MapMessage mapMessage = session.createMapMessage(); // Event ev = new Event(); return EventConverter.convertToMessage(mapMessage, (Event)object); } throw new EdsException("eds event to message conversion exception : " + ToStringBuilder.reflectionToString(object)); }
/**
 * Creates a MapMessage carrying a message type and a unique id, stored under the
 * {@code MESSAGE_TYPE} and {@code UNIQUE_ID} keys of ReservationCallbackProviderBean.
 *
 * @param session      session used to create the message
 * @param messsageType value stored under {@code MESSAGE_TYPE}
 * @param uniqueID     value stored under {@code UNIQUE_ID}
 * @return the populated MapMessage
 * @throws javax.jms.JMSException if the message cannot be created or populated
 */
public static javax.jms.Message createJMSMessageForReservationCallbackProviderDestination(
        javax.jms.Session session, java.lang.String messsageType, String uniqueID)
        throws javax.jms.JMSException {
    MapMessage callbackMessage = session.createMapMessage();
    callbackMessage.setString(ReservationCallbackProviderBean.MESSAGE_TYPE, messsageType);
    callbackMessage.setString(ReservationCallbackProviderBean.UNIQUE_ID, uniqueID);
    return callbackMessage;
}
/**
 * Puts every key into {@code current()} (mapped to itself), then checks that
 * {@code propagator.createMapMessage()} calls the delegate and sets the string property
 * "1" = "1" on the message the delegate returned.
 */
@Test
public void createMapMessage() throws Exception {
    MapMessage delegateMessage = mock(MapMessage.class);
    keys.forEach(key -> current().put(key, key));
    when(delegate.createMapMessage()).thenReturn(delegateMessage);

    propagator.createMapMessage();

    verify(delegate).createMapMessage();
    verify(delegateMessage).setStringProperty("1", "1");
}
/**
 * Creates a MapMessage on the underlying session.
 *
 * @return the new MapMessage
 * @throws RuntimeException the result of {@code Utils.convertToRuntimeException} when the
 *                          session raises a JMSException
 */
@Override
public MapMessage createMapMessage() {
    MapMessage created;
    try {
        created = getSession().createMapMessage();
    } catch (JMSException jmse) {
        throw Utils.convertToRuntimeException(jmse);
    }
    return created;
}