@Override public void process(Exchange exchange) throws Exception { final FacilityStateChangedEvent event = exchange.getIn().getBody(FacilityStateChangedEvent.class); if (event != null) { // Transform final Station station = event.getSource(); final Facility facility = event.getFacility(); final FacilityState oldFacilityState = event.getOldSnapshot() == null ? null : event.getOldSnapshot().getState(); final FacilityState newFacilityState = event.getNewSnapshot().getState(); final String stationName = station.getName(); final Long facilityEquipmentnumber = facility.getEquipmentnumber(); final String facilityDescription = facility.getDescription() != null ? facility.getDescription() : MessageFormat.format("{0} {1,number,#}", facility.getType(), facilityEquipmentnumber); final String subject = MessageFormat.format("{0}: {1} is now {2}", stationName, facilityDescription, newFacilityState); final String message; if (oldFacilityState != null) { message = MessageFormat.format("{0}: {1} has changed state from {2} to {3}", stationName, facilityDescription, oldFacilityState, newFacilityState); } else { message = MessageFormat.format("{0}: {1} has changed state to {3}", stationName, facilityDescription, newFacilityState); } exchange.getIn().setHeader(SnsConstants.SUBJECT, subject); final String topic = MessageFormat.format("facility-{0,number,#}", facilityEquipmentnumber); exchange.getIn().setHeader(DynamicSnsConstants.TOPIC, topic); exchange.getIn().setBody(message); } }
/**
 * Resolves the subject to publish with.
 *
 * <p>The {@code SnsConstants.SUBJECT} header of the in-message takes precedence; when that
 * header is {@code null} the subject from {@code getConfiguration()} is used instead.
 *
 * @param exchange the exchange whose in-message headers are consulted
 * @return the header value if present, otherwise the configured subject
 */
private String determineSubject(Exchange exchange) {
    final String headerSubject = exchange.getIn().getHeader(SnsConstants.SUBJECT, String.class);
    return headerSubject != null ? headerSubject : getConfiguration().getSubject();
}
/**
 * Resolves the message structure to publish with.
 *
 * <p>The {@code SnsConstants.MESSAGE_STRUCTURE} header of the in-message wins; if it is
 * {@code null}, the value from {@code getConfiguration()} is returned.
 *
 * @param exchange the exchange whose in-message headers are consulted
 * @return the header value if present, otherwise the configured message structure
 */
private String determineMessageStructure(Exchange exchange) {
    final String fromHeader =
            exchange.getIn().getHeader(SnsConstants.MESSAGE_STRUCTURE, String.class);
    if (fromHeader != null) {
        return fromHeader;
    }
    return getConfiguration().getMessageStructure();
}
public void process(Exchange exchange) throws Exception { // TODO cache arns and don't create if not necessary final String topic = determineTopic(exchange); // creates a new topic, or returns the URL of an existing one CreateTopicRequest request = new CreateTopicRequest(topic); LOG.trace("Creating topic [{}] with request [{}]...", topic, request); final AmazonSNS snsClient = getEndpoint().getSNSClient(); CreateTopicResult result = snsClient.createTopic(request); final String topicArn = result.getTopicArn(); LOG.trace("Topic created with Amazon resource name: {}", topicArn); final SnsConfiguration configuration = getEndpoint().getConfiguration(); if (ObjectHelper.isNotEmpty(configuration.getPolicy())) { LOG.trace("Updating topic [{}] with policy [{}]", topicArn, configuration.getPolicy()); snsClient.setTopicAttributes( new SetTopicAttributesRequest(topicArn, "Policy", configuration.getPolicy())); LOG.trace("Topic policy updated"); } PublishRequest publishRequest = new PublishRequest(); publishRequest.setTopicArn(topicArn); publishRequest.setSubject(determineSubject(exchange)); publishRequest.setMessageStructure(determineMessageStructure(exchange)); publishRequest.setMessage(exchange.getIn().getBody(String.class)); LOG.trace("Sending request [{}] from exchange [{}]...", publishRequest, exchange); PublishResult publishResult = snsClient.publish(publishRequest); LOG.trace("Received result [{}]", publishResult); Message message = getMessageForResponse(exchange); message.setHeader(SnsConstants.MESSAGE_ID, publishResult.getMessageId()); }