public ListenableFuture<?> publishAsync(final Exchange exchange, final Message message, final @Nullable BasicProperties properties, final @Nullable Publish publish) { // NOTE: Serialization must happen synchronously, because getter methods may not be thread-safe final String payload = gson.toJson(message); final AMQP.BasicProperties finalProperties = getProperties(message, properties); final Publish finalPublish = Publish.forMessage(message, publish); if(this.executorService == null) throw new IllegalStateException("Not connected"); return this.executorService.submit(new Runnable() { @Override public void run() { try { publish(exchange, payload, finalProperties, finalPublish); } catch(Throwable e) { logger.log(Level.SEVERE, "Unhandled exception publishing message type " + finalProperties.getType(), e); } } }); }
@Override public void handleDelivery( final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) throws IOException { deliveryCount.increment(); final MessageController messageController = buildMessageController(properties); for (final TransportConsumer transportConsumer : transportConsumers) { try { transportConsumer.handleMessage(messageController, body); } catch (final Exception e) { deliveryFailureCount.increment(); LOGGER.error("Error giving message to transport consumer:", e); } } }
public Builder(@Nullable BasicProperties p) { if(p == null) return; this.contentType = p.getContentType(); this.contentEncoding = p.getContentEncoding(); this.headers = p.getHeaders(); this.deliveryMode = p.getDeliveryMode(); this.priority = p.getPriority(); this.correlationId = p.getCorrelationId(); this.replyTo = p.getReplyTo(); this.expiration = p.getExpiration(); this.timestamp = p.getTimestamp(); this.type = p.getType(); this.userId = p.getUserId(); this.appId = p.getAppId(); }
private static void mergeProperties(Metadata to, BasicProperties from) { if(from.getMessageId() != null) to.setMessageId(from.getMessageId()); if(from.getDeliveryMode() != null) to.setDeliveryMode(from.getDeliveryMode()); if(from.getExpiration() != null) to.setExpiration(from.getExpiration()); if(from.getCorrelationId() != null) to.setCorrelationId(from.getCorrelationId()); if(from.getReplyTo() != null) to.setReplyTo(from.getReplyTo()); final Map<String, Object> headers = from.getHeaders(); if(headers != null && !headers.isEmpty()) { to.setHeaders(MapUtils.merge(to.getHeaders(), headers)); } }
public Metadata getProperties(Message message, @Nullable BasicProperties properties) { Metadata amqp = cloneProperties(DEFAULT_PROPERTIES); amqp.setMessageId(idFactory.newId()); amqp.setTimestamp(new Date()); amqp.setType(messageRegistry.typeName(message.getClass())); if(message instanceof ModelMessage) { amqp.setHeaders(MapUtils.merge(amqp.getHeaders(), Metadata.MODEL_NAME, modelRegistry.meta(((ModelMessage) message).model()).name())); } MessageDefaults.ExpirationMillis expiration = Types.inheritableAnnotation(message.getClass(), MessageDefaults.ExpirationMillis.class); if(expiration != null) { amqp.setExpiration(String.valueOf(expiration.value())); } MessageDefaults.Persistent persistent = Types.inheritableAnnotation(message.getClass(), MessageDefaults.Persistent.class); if(persistent != null) { amqp.setDeliveryMode(persistent.value() ? 2 : 1); } if(properties != null) mergeProperties(amqp, properties); return amqp; }
static Map<String, Struct> headers(BasicProperties basicProperties) { Map<String, Object> input = basicProperties.getHeaders(); Map<String, Struct> results = new LinkedHashMap<>(); if (null != input) { for (Map.Entry<String, Object> kvp : input.entrySet()) { log.trace("headers() - key = '{}' value= '{}'", kvp.getKey(), kvp.getValue()); final String field; final Object headerValue; if (kvp.getValue() instanceof LongString) { headerValue = kvp.getValue().toString(); } else { headerValue = kvp.getValue(); } if (!FIELD_LOOKUP.containsKey(headerValue.getClass())) { throw new DataException( String.format("Could not determine the type for field '%s' type '%s'", kvp.getKey(), headerValue.getClass().getName()) ); } else { field = FIELD_LOOKUP.get(headerValue.getClass()); } log.trace("headers() - Storing value for header in field = '{}' as {}", field, field); Struct value = new Struct(SCHEMA_HEADER_VALUE) .put("type", field) .put(field, headerValue); results.put(kvp.getKey(), value); } } return results; }
static Struct value(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) { return new Struct(SCHEMA_VALUE) .put(FIELD_MESSAGE_CONSUMERTAG, consumerTag) .put(FIELD_MESSAGE_ENVELOPE, envelope(envelope)) .put(FIELD_MESSAGE_BASICPROPERTIES, basicProperties(basicProperties)) .put(FIELD_MESSAGE_BODY, body); }
@TestFactory public Stream<DynamicTest> headers() { final List<HeaderTestCase> tests = Arrays.asList( HeaderTestCase.of(Byte.valueOf("1"), Schema.Type.INT8.toString().toLowerCase(), Byte.valueOf("1")), HeaderTestCase.of(Short.valueOf("1"), Schema.Type.INT16.toString().toLowerCase(), Short.valueOf("1")), HeaderTestCase.of(Integer.valueOf("1"), Schema.Type.INT32.toString().toLowerCase(), Integer.valueOf("1")), HeaderTestCase.of(Long.valueOf("1"), Schema.Type.INT64.toString().toLowerCase(), Long.valueOf("1")), HeaderTestCase.of(Float.valueOf("1"), Schema.Type.FLOAT32.toString().toLowerCase(), Float.valueOf("1")), HeaderTestCase.of(Double.valueOf("1"), Schema.Type.FLOAT64.toString().toLowerCase(), Double.valueOf("1")), HeaderTestCase.of("1", Schema.Type.STRING.toString().toLowerCase(), "1"), HeaderTestCase.of(LongStringHelper.asLongString("1"), Schema.Type.STRING.toString().toLowerCase(), "1"), HeaderTestCase.of(new Date(1500691965123L), "timestamp", new Date(1500691965123L)) ); return tests.stream().map(test -> dynamicTest(test.toString(), () -> { final Map<String, Object> INPUT_HEADERS = ImmutableMap.of("input", test.input); BasicProperties basicProperties = mock(BasicProperties.class); when(basicProperties.getHeaders()).thenReturn(INPUT_HEADERS); final Map<String, Struct> actual = MessageConverter.headers(basicProperties); verify(basicProperties, only()).getHeaders(); assertNotNull(actual, "actual should not be null."); assertTrue(actual.containsKey("input"), "actual should contain key 'input'"); Struct actualStruct = actual.get("input"); actualStruct.validate(); assertStruct(test.expectedStruct(), actualStruct); })); }
/*** * 解析 timestamp * 查看消息header区,取得timestamp值 * * @getParam() delivery * @return */ private long getTimestamp(QueueingConsumer.Delivery delivery) { BasicProperties prop = delivery.getProperties(); Map<String, Object> headers = prop.getHeaders(); long timestamp = -1; if (headers != null) { if (LogUtil.getMqSyncLog().isDebugEnabled()) LogUtil.getMqSyncLog().debug(headers.toString()); Object obj = headers.get(MqConstant.TIMESTAMP); if (obj != null) { if (LogUtil.getMqSyncLog().isDebugEnabled()) LogUtil.getMqSyncLog().debug(obj.toString()); Date dt = DateUtil.parse(obj.toString()); if (dt != null) timestamp = dt.getTime(); if (LogUtil.getMqSyncLog().isDebugEnabled()) LogUtil.getMqSyncLog().debug("Timestamp:" + timestamp); timestamp = 0; } } return timestamp; }
public static EncodedProperties encode(BasicProperties properties) { Set<Header> headers = new HashSet<Header>(); if (properties.getHeaders() != null) { for (String headerName : properties.getHeaders().keySet()) { headers.add(Header.newBuilder().setName(headerName) .setValue(properties.getHeaders().get(headerName).toString()).build()); } } EncodedProperties.Builder propertiesBuilder = EncodedProperties.newBuilder(); if (properties.getAppId() != null) propertiesBuilder.setAppID(properties.getAppId()); if (properties.getContentEncoding() != null) propertiesBuilder.setContentEncoding(properties.getContentEncoding()); if (properties.getContentType() != null) propertiesBuilder.setContentType(properties.getContentType()); if (properties.getCorrelationId() != null) propertiesBuilder.setCorrelationId(properties.getCorrelationId()); if (properties.getDeliveryMode() != null) propertiesBuilder.setDeliveryMode(properties.getDeliveryMode()); if (properties.getExpiration() != null) propertiesBuilder.setExpiration(properties.getExpiration()); if (properties.getMessageId() != null) propertiesBuilder.setMessageID(properties.getMessageId()); if (properties.getPriority() != null) propertiesBuilder.setPriority(properties.getPriority()); if (properties.getReplyTo() != null) propertiesBuilder.setReplyTo(properties.getReplyTo()); if (properties.getTimestamp() != null) propertiesBuilder.setTimestamp(properties.getTimestamp().getTime()); if (properties.getType() != null) propertiesBuilder.setType(properties.getType()); if (properties.getUserId() != null) propertiesBuilder.setUserId(properties.getUserId()); if (headers.size() > 0) propertiesBuilder.addAllHeader(headers); return propertiesBuilder.build(); }
public static AMQP.BasicProperties decode(EncodedProperties properties) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); if (properties.hasAppID()) builder.appId(properties.getAppID()); if (properties.hasContentEncoding()) builder.contentEncoding(properties.getContentEncoding()); if (properties.hasContentType()) builder.contentType(properties.getContentType()); if (properties.hasCorrelationId()) builder.correlationId(properties.getCorrelationId()); if (properties.hasDeliveryMode()) builder.deliveryMode(properties.getDeliveryMode()); if (properties.hasExpiration()) builder.expiration(properties.getExpiration()); if (properties.hasMessageID()) builder.messageId(properties.getMessageID()); if (properties.hasPriority()) builder.priority(properties.getPriority()); if (properties.hasReplyTo()) builder.replyTo(properties.getReplyTo()); if (properties.hasTimestamp()) builder.timestamp(new Date(properties.getTimestamp())); if (properties.hasType()) builder.type(properties.getType()); if (properties.hasUserId()) builder.userId(properties.getUserId()); if (properties.getHeaderCount() > 0) { Map<String,Object> headerMap = new HashMap<String,Object>(); for (Header h : properties.getHeaderList()) { headerMap.put(h.getName(), h.getValue()); } builder.headers(headerMap); } return builder.build(); }
public static Map<String, Object> getHeaders(AMQP.BasicProperties props) { return nonNullHeaders(props.getHeaders()); }
public static @Nullable String getHeaderString(AMQP.BasicProperties props, String name) { // Header values are some kind of spooky fake string object // called LongStringHelper.ByteArrayLongString Object o = getHeaders(props).get(name); return o == null ? null : o.toString(); }
public static int getHeaderInt(AMQP.BasicProperties props, String name, int def) { final String text = getHeaderString(props, name); return text == null ? def : Integer.parseInt(text); }
public static int protocolVersion(AMQP.BasicProperties props) { return getHeaderInt(props, PROTOCOL_VERSION, ApiConstants.PROTOCOL_VERSION); }
public static Optional<String> modelName(AMQP.BasicProperties props) { return Optional.ofNullable(getHeaderString(props, MODEL_NAME)); }
public Metadata(BasicProperties p) { this(p.getContentType(), p.getContentEncoding(), p.getHeaders(), p.getDeliveryMode(), p.getPriority(), p.getCorrelationId(), p.getReplyTo(), p.getExpiration(), p.getMessageId(), p.getTimestamp(), p.getType(), p.getUserId(), p.getAppId()); }
public void publishSync(Exchange exchange, Message message, @Nullable BasicProperties properties, @Nullable Publish publish) { publish(exchange, gson.toJson(message), getProperties(message, properties), Publish.forMessage(message, publish)); }
public void publishSync(Message message, @Nullable BasicProperties properties) { publishSync(message, properties, (Publish) null); }
public void publishSync(Message message, @Nullable BasicProperties properties, String routingKey) { publishSync(message, properties, new Publish(routingKey)); }
public void publishSync(Message message, @Nullable BasicProperties properties, @Nullable Publish publish) { client.publishSync(this, message, properties, publish); }
public ListenableFuture<?> publishAsync(Message message, @Nullable BasicProperties properties) { return publishAsync(message, properties, (Publish) null); }
public ListenableFuture<?> publishAsync(Message message, @Nullable BasicProperties properties, String routingKey) { return publishAsync(message, properties, new Publish(routingKey)); }
public ListenableFuture<?> publishAsync(Message message, @Nullable BasicProperties properties, @Nullable Publish publish) { return client.publishAsync(this, message, properties, publish); }
static Struct key(AMQP.BasicProperties basicProperties) { return new Struct(SCHEMA_KEY) .put(FIELD_BASIC_PROPERTIES_MESSAGEID, basicProperties.getMessageId()); }
public RabbitMessage(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) { this.consumerTag = consumerTag; this.envelope = envelope; this.properties = properties; this.body = body; }
public BasicProperties getProperties() { return properties; }
@Override public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) throws StageException { if (!isConnected() && !conf.advanced.automaticRecoveryEnabled) { // If we don't have automatic recovery enabled and the connection is closed, we should stop the pipeline. throw new StageException(Errors.RABBITMQ_05); } long maxTime = System.currentTimeMillis() + conf.basicConfig.maxWaitTime; int maxRecords = Math.min(maxBatchSize, conf.basicConfig.maxBatchSize); int numRecords = 0; String nextSourceOffset = lastSourceOffset; while (System.currentTimeMillis() < maxTime && numRecords < maxRecords) { try { RabbitMessage message = messages.poll(conf.basicConfig.maxWaitTime, TimeUnit.MILLISECONDS); if (message == null) { continue; } String recordId = message.getEnvelope().toString(); List<Record> records = parseRabbitMessage(recordId, message.getBody()); for (Record record : records){ Envelope envelope = message.getEnvelope(); BasicProperties properties = message.getProperties(); Record.Header outHeader = record.getHeader(); if (envelope != null) { setHeaderIfNotNull(outHeader, "deliveryTag", envelope.getDeliveryTag()); setHeaderIfNotNull(outHeader, "exchange", envelope.getExchange()); setHeaderIfNotNull(outHeader, "routingKey", envelope.getRoutingKey()); setHeaderIfNotNull(outHeader, "redelivered", envelope.isRedeliver()); } setHeaderIfNotNull(outHeader, "contentType", properties.getContentType()); setHeaderIfNotNull(outHeader, "contentEncoding", properties.getContentEncoding()); setHeaderIfNotNull(outHeader, "deliveryMode", properties.getDeliveryMode()); setHeaderIfNotNull(outHeader, "priority", properties.getPriority()); setHeaderIfNotNull(outHeader, "correlationId", properties.getCorrelationId()); setHeaderIfNotNull(outHeader, "replyTo", properties.getReplyTo()); setHeaderIfNotNull(outHeader, "expiration", properties.getExpiration()); setHeaderIfNotNull(outHeader, "messageId", properties.getMessageId()); setHeaderIfNotNull(outHeader, "timestamp", properties.getTimestamp()); setHeaderIfNotNull(outHeader, "messageType", properties.getType()); setHeaderIfNotNull(outHeader, "userId", properties.getUserId()); setHeaderIfNotNull(outHeader, "appId", properties.getAppId()); Map<String, Object> inHeaders = properties.getHeaders(); if (inHeaders != null) { for (Map.Entry<String, Object> pair : inHeaders.entrySet()) { // I am concerned about overlapping with the above headers but it seems somewhat unlikely // in addition the behavior of copying these attributes in with no custom prefix is // how the jms origin behaves setHeaderIfNotNull(outHeader, pair.getKey(), pair.getValue()); } } batchMaker.addRecord(record); nextSourceOffset = outHeader.getAttribute("deliveryTag"); numRecords++; } } catch (InterruptedException e) { LOG.warn("Pipeline is shutting down."); } } return nextSourceOffset; }
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { queue.add(new Delivery(envelope, properties, body)); }
public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { this.envelope = envelope; this.properties = properties; this.body = body; }
private MessageController buildMessageController(final BasicProperties properties) { final String rawContenType = properties.getContentType(); final Optional<Message> contentType = MessageUtils.fromJson( ServiceProto.ContentType.getDefaultInstance(), rawContenType); final String rawDestinationServiceRef = properties.getAppId(); final Optional<Message> destinationServiceRef = MessageUtils.fromJson( ServiceProto.ServiceRef.getDefaultInstance(), rawDestinationServiceRef); final String rawSenderServiceRef = properties.getReplyTo(); final Optional<Message> senderServiceRef = MessageUtils.fromJson( ServiceProto.ServiceRef.getDefaultInstance(), rawSenderServiceRef); Optional<byte[]> optionalMessageId = Optional.absent(); Optional<byte[]> optionalCorrelationId = Optional.absent(); final String messageId = properties.getMessageId(); if (messageId != null && !messageId.isEmpty()) { optionalMessageId = Optional.of(B16.decode(messageId)); } final String correlationId = properties.getCorrelationId(); if (correlationId != null && !correlationId.isEmpty()) { optionalCorrelationId = Optional.of(B16.decode(correlationId)); } Optional<DateTime> expiresOptional = Optional.absent(); final Map<String, Object> headers = properties.getHeaders(); if (headers != null) { final Long millis = Longs.tryParse(headers.get("expires").toString()); if (millis != null) { final DateTime expires = new DateTime(millis); expiresOptional = Optional.of(expires); } } return new DefaultMessageController( (ServiceProto.ServiceRef) senderServiceRef.get(), (ServiceProto.ServiceRef) destinationServiceRef.get(), (ServiceProto.ContentType) contentType.get(), optionalMessageId, optionalCorrelationId, expiresOptional); }
BasicProperties properties();