/**
 * Hands an incoming event to the configured collector, wrapped in a reusable
 * {@link ObjectToAMQPCollectorContext}.
 *
 * <p>The context is looked up in {@code collectorDataTL}. On the first call (no
 * context stored yet) a new context is created with an AMQP emitter and the
 * event, and stored; on later calls the stored context is reused and only its
 * object is replaced with the new event.
 *
 * @param event the event to pass to the collector
 * @throws RuntimeException if the collector emits data and publishing to AMQP
 *         fails with an {@link IOException} (the original exception is the cause)
 */
public void onInput(Object event) {
    ObjectToAMQPCollectorContext holder = collectorDataTL.get();
    if (holder == null) {
        holder = new ObjectToAMQPCollectorContext(newAmqpEmitter(), event);
        collectorDataTL.set(holder);
    } else {
        holder.setObject(event);
    }
    settings.getCollector().collect(holder);
}

/**
 * Builds the emitter used by a newly created collector context.
 *
 * <p>The routing mode is decided once, here: if both an exchange and a routing
 * key are configured the emitter publishes to that exchange, otherwise it
 * publishes to the default exchange ("") using the queue name as routing key.
 * The actual exchange / routing-key / queue values are read from the settings
 * on every publish.
 */
private AMQPEmitter newAmqpEmitter() {
    final boolean viaExchange =
            settings.getExchange() != null && settings.getRoutingKey() != null;

    if (log.isDebugEnabled()) {
        if (viaExchange) {
            log.debug("Using exchange " + settings.getExchange()
                    + " routing-key " + settings.getRoutingKey());
        } else {
            log.debug("Using queue " + settings.getQueueName());
        }
    }

    return new AMQPEmitter() {
        public void send(byte[] bytes) {
            publishToChannel(viaExchange, false, null, bytes);
        }

        public void send(byte[] bytes, Map<String, Object> headers) {
            publishToChannel(viaExchange, true, headers, bytes);
        }
    };
}

/**
 * Publishes one message on the channel and converts I/O failures into an
 * unchecked exception after logging them.
 *
 * @param viaExchange {@code true} to publish to the configured exchange and
 *        routing key, {@code false} to publish to the default exchange with the
 *        queue name as routing key
 * @param withHeaders {@code true} to send message properties built from
 *        {@code headers}; {@code false} to send no properties at all
 * @param headers message headers; only used when {@code withHeaders} is set
 * @param bytes message body
 */
private void publishToChannel(boolean viaExchange, boolean withHeaders,
        Map<String, Object> headers, byte[] bytes) {
    try {
        String exchange = viaExchange ? settings.getExchange() : "";
        String routingKey = viaExchange ? settings.getRoutingKey() : settings.getQueueName();
        // The header-less variant passes null properties, while the header variant
        // always builds properties, even if the supplied header map itself is null.
        channel.basicPublish(exchange, routingKey,
                withHeaders ? new Builder().headers(headers).build() : null, bytes);
    } catch (IOException e) {
        String message = "Failed to publish to AMQP: " + e.getMessage();
        log.error(message, e);
        throw new RuntimeException(message, e);
    }
}