/**
 * Connects to the broker on 127.0.0.1, declares the queue "TestQueue" and
 * prints every message delivered to it.
 *
 * Messages are consumed with automatic acknowledgement. The receive loop has
 * no exit condition, so the method only ends when an exception propagates.
 *
 * @param args unused
 * @throws Exception if connecting, declaring the queue or consuming fails
 */
public static void main(String[] args) throws Exception {
    final String queueName = "TestQueue";

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("127.0.0.1");
    Channel channel = connectionFactory.newConnection().createChannel();

    // durable=false, exclusive=false, autoDelete=false, no extra arguments
    channel.queueDeclare(queueName, false, false, false, null);
    System.out.println(" [*] Waiting for messages...");

    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, queueingConsumer);

    for (;;) {
        byte[] payload = queueingConsumer.nextDelivery().getBody();
        System.out.println(" [x] Received '" + new String(payload) + "'");
    }
}
/**
 * Answers trade requests: for every message taken from "trade.request.q" the
 * trade is printed, the method sleeps for the next configured response time
 * and then publishes the fixed reply "response" to "trade.response.q".
 *
 * The n-th request (counting from zero) is delayed by responseTimes.get(n).
 * NOTE(review): the index is never reset, so this presumably relies on
 * responseTimes having an entry for every request that arrives - confirm.
 *
 * @throws Exception if the broker interaction fails or the wait is interrupted
 */
public void execute() throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer requests = new QueueingConsumer(channel);
    channel.basicConsume("trade.request.q", true, requests);

    for (int index = 0; ; index++) {
        String trade = new String(requests.nextDelivery().getBody());
        System.out.println("processing trade: " + trade);

        byte[] reply = "response".getBytes();
        Thread.sleep(responseTimes.get(index));
        channel.basicPublish("", "trade.response.q", null, reply);
    }
}
/**
 * Consumes trades from "trade.eq.q" and validates each one; a trade whose
 * processing fails is forwarded unchanged to "workflow.q".
 *
 * A trade is a comma separated string whose third field must parse as a
 * long share count. Messages are consumed with automatic acknowledgement.
 *
 * @throws Exception if receiving or publishing fails, or if the thread is
 *                   interrupted while processing
 */
public void execute() throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume("trade.eq.q", true, consumer);

    while (true) {
        // Receive outside the try block: a failure to receive is not a bad
        // trade, and there would be no (or only a stale) message to forward.
        QueueingConsumer.Delivery msg = consumer.nextDelivery();
        try {
            String message = new String(msg.getBody());
            System.out.println("message received: " + message);
            String[] parts = message.split(",");
            // Parsed only to validate the share count; the value itself is not used.
            Long.parseLong(parts[2]);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // An interrupted wait says nothing about the trade; stop instead
            // of misrouting a valid message to the workflow queue.
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e) {
            System.out.println("error with trade: " + e.getMessage());
            System.out.println("sending to workflow");
            channel.basicPublish("", "workflow.q", null, msg.getBody());
        }
    }
}
/**
 * Consumes messages from "trade.eq.q" on a new channel of the given
 * connection until the {@code active} flag is cleared.
 *
 * Each message is printed, processing is simulated with a one second sleep,
 * and the message is then acknowledged individually. Failures are printed
 * rather than propagated. The channel is closed on every exit path.
 *
 * @param connection open connection used to create the consuming channel
 */
public void start(Connection connection) {
    Channel channel = null;
    try {
        active = true;
        channel = connection.createChannel();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // At most one unacknowledged message is delivered at a time.
        channel.basicQos(1);
        channel.basicConsume("trade.eq.q", false, consumer);
        while (active) {
            QueueingConsumer.Delivery msg = consumer.nextDelivery();
            System.out.println("message received: " + new String(msg.getBody()));
            Thread.sleep(1000);
            channel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
        }
    } catch (InterruptedException e) {
        // Preserve the interrupt for whoever owns this thread.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Previously the channel was only closed on a clean loop exit and
        // leaked whenever an exception was thrown.
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Keeps the customer cache in sync with messages arriving on "sync.q".
 *
 * Each message is a comma separated string: field 0 is the customer key and
 * field 2 a price. The price is reduced by 10% (truncated to a long) and
 * added to the customer's cached cost, and the cached quantity is incremented
 * by one. Cache values have the form "quantity,cost". The cache is displayed
 * once at start-up and after every update.
 *
 * @param args unused
 * @throws Exception if the broker interaction fails or a message or cache
 *                   entry cannot be parsed
 */
public static void main(String[] args) throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer syncConsumer = new QueueingConsumer(channel);
    channel.basicConsume("sync.q", true, syncConsumer);
    displayCache();

    for (;;) {
        String body = new String(syncConsumer.nextDelivery().getBody());
        System.out.println("synchronize message received: " + body);

        String[] fields = body.split(",");
        String customer = fields[0];
        long listPrice = Long.parseLong(fields[2]);
        long discounted = (long) (listPrice - (listPrice * .10));

        String[] cached = cache.get(customer).split(",");
        long cost = Long.parseLong(cached[1]) + discounted;
        long qty = Long.parseLong(cached[0]) + 1;

        cache.put(customer, qty + "," + cost);
        displayCache();
    }
}
/**
 * Consumes a fixed number of messages from "trade.eq.q" and then closes the
 * channel through AMQPCommon.
 *
 * Each message is printed, followed by a one second pause and an individual
 * acknowledgement.
 *
 * @param args optional; args[0] is the number of messages to consume (default 1)
 * @throws Exception if the broker interaction fails, the count is not a
 *                   valid integer, or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer trades = new QueueingConsumer(channel);
    channel.basicQos(1);
    channel.basicConsume("trade.eq.q", false, trades);

    int numMsgs = 1;
    if (args.length > 0) {
        numMsgs = Integer.parseInt(args[0]);
    }

    for (int remaining = numMsgs; remaining > 0; remaining--) {
        QueueingConsumer.Delivery delivery = trades.nextDelivery();
        System.out.println("message received: " + new String(delivery.getBody()));
        Thread.sleep(1000);
        long tag = delivery.getEnvelope().getDeliveryTag();
        channel.basicAck(tag, false);
    }

    AMQPCommon.close(channel);
}
/**
 * Receiver entry point: creates a ConnectionFactory, opens a connection and
 * channel, declares QUEUE_NAME, and prints the body of every message
 * consumed from it with automatic acknowledgement, looping without an exit
 * condition.
 *
 * NOTE(review): this method is collapsed onto a single line, so as written
 * everything after the first "//" is a comment. Presumably
 * "// setRemoteConnectionFactory(factory);" was a commented-out alternative
 * on its own line and "setLocalConnectionFactory(factory);" plus all later
 * statements are live code - confirm against the original formatting.
 *
 * @param args unused
 * @throws Exception if the broker interaction fails
 */
public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // setRemoteConnectionFactory(factory); setLocalConnectionFactory(factory); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } }
/**
 * Runs the Fibonacci RPC server.
 *
 * Requests are consumed one at a time from RPC_QUEUE_NAME. Each request body
 * is a UTF-8 encoded integer n; the reply is fib(n) as UTF-8 text, published
 * to the request's reply-to queue with the request's correlation id. A
 * request that cannot be processed is answered with an empty reply so that
 * one malformed message neither stops the server nor stays unacknowledged.
 *
 * @param args unused
 * @throws IOException                if a broker operation fails
 * @throws TimeoutException           if connecting to the broker times out
 * @throws ShutdownSignalException    if the connection is shut down while waiting
 * @throws ConsumerCancelledException if the consumer is cancelled while waiting
 * @throws InterruptedException       if the thread is interrupted while waiting
 */
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,
        ConsumerCancelledException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    // Take one request at a time; the next one arrives only after the ack.
    channel.basicQos(1);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
    System.out.println("RPCServer Awating RPC request");

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        BasicProperties props = delivery.getProperties();
        BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                .correlationId(props.getCorrelationId())
                .build();

        String response = "";
        try {
            String message = new String(delivery.getBody(), "UTF-8");
            int n = Integer.parseInt(message);
            System.out.println("RPCServer fib(" + message + ")");
            response = "" + fib(n);
        } catch (RuntimeException e) {
            // e.g. NumberFormatException for a non-numeric body
            System.out.println("RPCServer failed to process request: " + e);
        }

        // Without a reply-to queue there is nowhere to send the answer.
        if (props.getReplyTo() != null) {
            // Encode with the same charset that is used to decode requests.
            channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
        }
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
private void startSyncReceiveThread(final QueueingConsumer consumer, final boolean autoAck, final BindingVo binding) { syncReceiveThread = new SyncReceiveThread() { @Override public void run() { log.info("start listen to the " + typeStr + "[" + queue.getName() + "]."); while (running) { try { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); XCO xcoMessage = getMessage(delivery.getBody()); log.info("received a message from " + typeStr + "[" + queue.getName() + "]: " + xcoMessage); boolean result = exec(service, xcoMessage, binding); if (!autoAck && result) { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (ShutdownSignalException e) { // TODO 可能会出现断链的问题 e.printStackTrace(); } catch (Throwable e) { log.error("listen to the [" + queue.getName() + "] error.", e); } } closed = true; } }; syncReceiveThread.start(); }
/**
 * Sends an RPC request and blocks until the matching reply arrives.
 *
 * The message is published as UTF-8 to requestQueueName with a fresh
 * correlation id and replyQueueName as the reply-to address. Replies carrying
 * a different (or no) correlation id are skipped.
 *
 * @param message request payload
 * @return the reply body decoded as UTF-8
 * @throws Exception if publishing or receiving fails
 */
public String call(String message) throws Exception {
    String corrId = UUID.randomUUID().toString();
    BasicProperties props = new BasicProperties.Builder()
            .correlationId(corrId)
            .replyTo(replyQueueName)
            .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        // Compare from the known non-null side: a reply without a
        // correlation id must be skipped, not crash with an NPE.
        if (corrId.equals(delivery.getProperties().getCorrelationId())) {
            return new String(delivery.getBody(), "UTF-8");
        }
    }
}
/**
 * Performs a blocking RPC round trip.
 *
 * Publishes the UTF-8 encoded message to requestQueueName, tagged with a
 * newly generated correlation id and replyQueueName as reply-to, then waits
 * on the consumer until a reply with the same correlation id is delivered.
 * Replies with another correlation id, or with none, are ignored.
 *
 * @param message request payload
 * @return the reply body decoded as UTF-8
 * @throws Exception if publishing or receiving fails
 */
public String call(String message) throws Exception {
    String corrId = UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties
            .Builder()
            .correlationId(corrId)
            .replyTo(replyQueueName)
            .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        // corrId is never null, so this form tolerates replies that carry
        // no correlation id instead of throwing a NullPointerException.
        if (corrId.equals(delivery.getProperties().getCorrelationId())) {
            return new String(delivery.getBody(), "UTF-8");
        }
    }
}
/**
 * Dispatches every message from "trade.eq.q" to its own worker thread.
 *
 * Each delivery is acknowledged immediately, before the worker runs. A new
 * thread running a POJOThreadProcessor for the message text is then started,
 * the numThreads counter is incremented and its value printed.
 *
 * @throws Exception if the broker interaction fails
 */
public void dispatchMessages() throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer trades = new QueueingConsumer(channel);
    channel.basicQos(1);
    channel.basicConsume("trade.eq.q", false, trades);

    for (;;) {
        QueueingConsumer.Delivery delivery = trades.nextDelivery();
        long tag = delivery.getEnvelope().getDeliveryTag();
        channel.basicAck(tag, false);

        String payload = new String(delivery.getBody());
        Thread worker = new Thread(new POJOThreadProcessor(this, payload));
        worker.start();

        numThreads++;
        System.out.println("Threads: " + numThreads);
    }
}
/**
 * Places trades read from "trade.eq.q".
 *
 * For each delivery the method waits two seconds, prints the trade and then
 * acknowledges that single message.
 *
 * @param args unused
 * @throws Exception if the broker interaction fails or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer trades = new QueueingConsumer(channel);
    channel.basicQos(1);
    channel.basicConsume("trade.eq.q", false, trades);

    for (;;) {
        QueueingConsumer.Delivery delivery = trades.nextDelivery();
        Thread.sleep(2000);
        String trade = new String(delivery.getBody());
        System.out.println("Trade placed: " + trade);
        long tag = delivery.getEnvelope().getDeliveryTag();
        channel.basicAck(tag, false);
    }
}
/**
 * Processes movie orders from "movie.q".
 *
 * Orders are consumed with automatic acknowledgement; each one is printed
 * and followed by a two second pause.
 *
 * @param args unused
 * @throws Exception if the broker interaction fails or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer orders = new QueueingConsumer(channel);
    channel.basicConsume("movie.q", true, orders);

    for (;;) {
        String order = new String(orders.nextDelivery().getBody());
        System.out.println("PROCESSING MOVIE ORDER: " + order);
        Thread.sleep(2000);
    }
}
/**
 * Processes book orders from "book.q".
 *
 * Orders are consumed with automatic acknowledgement; each one is printed
 * and followed by a two second pause.
 *
 * @param args unused
 * @throws Exception if the broker interaction fails or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer orders = new QueueingConsumer(channel);
    channel.basicConsume("book.q", true, orders);

    for (;;) {
        String order = new String(orders.nextDelivery().getBody());
        System.out.println("PROCESSING BOOK ORDER: " + order);
        Thread.sleep(2000);
    }
}
/**
 * Processes music orders from "music.q".
 *
 * Orders are consumed with automatic acknowledgement; each one is printed
 * and followed by a two second pause.
 *
 * @param args unused
 * @throws Exception if the broker interaction fails or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer orders = new QueueingConsumer(channel);
    channel.basicConsume("music.q", true, orders);

    for (;;) {
        String order = new String(orders.nextDelivery().getBody());
        System.out.println("PROCESSING MUSIC ORDER: " + order);
        Thread.sleep(2000);
    }
}
/**
 * Sends ten buy orders and reports how long each confirmation took.
 *
 * Every round publishes "BUY,AAPL,&lt;shares&gt;" with a random share count in
 * the range 1..4000 to "trade.request.q", blocks until a message arrives on
 * "trade.response.q", prints the elapsed milliseconds and pauses one second.
 *
 * @throws Exception if the broker interaction fails or the wait is interrupted
 */
public void execute() throws Exception {
    long threshold = 2000;
    Channel channel = AMQPCommon.connect();
    QueueingConsumer confirmations = new QueueingConsumer(channel);
    channel.basicConsume("trade.response.q", true, confirmations);

    int round = 0;
    while (round < 10) {
        long startedAt = System.currentTimeMillis();

        double scaled = new Random().nextDouble() * 4000;
        long shares = (long) (scaled + 1);
        String text = "BUY,AAPL," + shares;
        System.out.println("sending trade: " + text);
        channel.basicPublish("", "trade.request.q", null, text.getBytes());

        // Only the arrival matters; the confirmation content is not inspected.
        confirmations.nextDelivery();

        long duration = System.currentTimeMillis() - startedAt;
        System.out.println("trade confirmation received in " + duration + " ms");
        System.out.println("");
        Thread.sleep(1000);
        round++;
    }
}
/**
 * Repairs trades that were routed to "workflow.q" and resubmits them.
 *
 * The repair removes everything from the first occurrence of " shares"
 * onwards and publishes the remaining text to "trade.eq.q". A message that
 * does not contain " shares" cannot be repaired this way; it is reported and
 * dropped rather than crashing the consumer.
 *
 * @throws Exception if receiving or publishing fails
 */
public void execute() throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume("workflow.q", true, consumer);

    while (true) {
        QueueingConsumer.Delivery message = consumer.nextDelivery();
        String msg = new String(message.getBody());
        System.out.println("message received: " + msg);

        int suffixAt = msg.indexOf(" shares");
        if (suffixAt < 0) {
            // substring(0, -1) would throw and stop the whole loop. The
            // message is not resubmitted unchanged because it could bounce
            // straight back here.
            System.out.println("unable to fix trade: " + msg);
            continue;
        }

        String newMsg = msg.substring(0, suffixAt);
        byte[] bmsg = newMsg.getBytes();
        System.out.println("Trade fixed: " + newMsg);
        channel.basicPublish("", "trade.eq.q", null, bmsg);
    }
}
/**
 * Consumes trades from "trade.eq.q" with automatic acknowledgement.
 *
 * Each trade is printed and split on commas; its third field is parsed as a
 * long share count, so a malformed trade makes the method fail. A one second
 * pause follows every trade.
 *
 * @throws Exception if the broker interaction fails, a trade is malformed or
 *                   the wait is interrupted
 */
public void execute() throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer trades = new QueueingConsumer(channel);
    channel.basicConsume("trade.eq.q", true, trades);

    for (;;) {
        String trade = new String(trades.nextDelivery().getBody());
        System.out.println("message received: " + trade);

        String[] fields = trade.split(",");
        // The parsed value is not used afterwards; parsing acts as a check.
        long shares = Long.parseLong(fields[2]);
        Thread.sleep(1000);
    }
}
/**
 * Consumes messages from "trade.eq.q" with manual acknowledgement.
 *
 * Each message is printed, followed by a two second pause, and only then
 * acknowledged individually.
 *
 * @param args unused
 * @throws Exception if the broker interaction fails or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer trades = new QueueingConsumer(channel);
    channel.basicQos(1);
    channel.basicConsume("trade.eq.q", false, trades);

    for (;;) {
        QueueingConsumer.Delivery delivery = trades.nextDelivery();
        String text = new String(delivery.getBody());
        System.out.println("message received: " + text);
        Thread.sleep(2000);
        long tag = delivery.getEnvelope().getDeliveryTag();
        channel.basicAck(tag, false);
    }
}
/**
 * Handles orders arriving on "order.q".
 *
 * The cache is displayed once at start-up. Each order is then printed,
 * passed to placeOrder, announced through sendEvent on the same channel,
 * and the cache is displayed again.
 *
 * @throws Exception if the broker interaction or order handling fails
 */
public void execute() throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer orders = new QueueingConsumer(channel);
    channel.basicConsume("order.q", true, orders);
    displayCache();

    for (;;) {
        String order = new String(orders.nextDelivery().getBody());
        System.out.println("order received: " + order);
        placeOrder(order);
        sendEvent(channel, order);
        displayCache();
    }
}
/**
 * Drains the queue named by the first argument and then exits.
 *
 * Messages are printed, followed by a 100 ms pause and an individual
 * acknowledgement. Once no message arrives within one second the loop ends
 * and the JVM exits with status 0.
 *
 * @param args args[0] is the name of the queue to consume from
 * @throws Exception if the broker interaction fails or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer source = new QueueingConsumer(channel);
    channel.basicQos(1);
    channel.basicConsume(args[0], false, source);

    QueueingConsumer.Delivery delivery;
    // nextDelivery(timeout) yields null when the timeout elapses.
    while ((delivery = source.nextDelivery(1000)) != null) {
        System.out.println("message received: " + new String(delivery.getBody()));
        Thread.sleep(100);
        long tag = delivery.getEnvelope().getDeliveryTag();
        channel.basicAck(tag, false);
    }

    System.exit(0);
}
/**
 * Routes orders from "order.q" to a processor chosen by message header.
 *
 * The "type" header of each message selects the entry in {@code processor};
 * the message body is handed to that entry's processOrder, followed by a two
 * second pause.
 *
 * @param args unused
 * @throws Exception if the broker interaction fails, the header or the
 *                   processor is missing, or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer orders = new QueueingConsumer(channel);
    channel.basicConsume("order.q", true, orders);

    for (;;) {
        QueueingConsumer.Delivery delivery = orders.nextDelivery();
        Object typeHeader = delivery.getProperties().getHeaders().get("type");
        String orderType = typeHeader.toString();
        String orderItem = new String(delivery.getBody());
        processor.get(orderType).processOrder(orderItem);
        Thread.sleep(2000);
    }
}
/**
 * Creates the consumer bean for the answer sheet data queue.
 *
 * Connects to messageQueueHostname, declares the durable queue
 * Constants.ANSWERSHEET_DATA_QUEUE and registers a QueueingConsumer on it
 * with automatic acknowledgement. If anything fails after the connection
 * was opened, the connection is closed again before the failure propagates.
 *
 * @return a consumer already subscribed to the answer sheet data queue
 * @throws IOException if connecting, declaring or subscribing fails
 */
@Bean
QueueingConsumer queueingConsumer() throws IOException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(messageQueueHostname);
    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        // durable=true, exclusive=false, autoDelete=false, no extra arguments
        channel.queueDeclare(Constants.ANSWERSHEET_DATA_QUEUE, true, false, false, null);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(Constants.ANSWERSHEET_DATA_QUEUE, true, consumer);
        return consumer;
    } catch (IOException | RuntimeException e) {
        // Do not leave a half-initialised connection open.
        try {
            connection.close();
        } catch (Exception closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Handles one request: decodes the body, applies {@code call} and, when the
 * request carries both a correlation id and a reply-to address, publishes
 * the encoded result to that address with the same correlation id.
 *
 * Any failure is logged at SEVERE level and not propagated.
 *
 * @param request the delivery to process
 */
@Override
public void accept( QueueingConsumer.Delivery request )
{
    try {
        Map<String, Object> result = call.apply( RabbitMQ.fromAMQPTable( request.getBody() ) );

        AMQP.BasicProperties incoming = request.getProperties();
        String correlationId = incoming.getCorrelationId();
        String replyTo = incoming.getReplyTo();

        // Without both values there is nobody to answer.
        if( correlationId == null || replyTo == null ) {
            return;
        }

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        AMQP.BasicProperties outgoing = builder.correlationId( correlationId ).build();
        connection.getChannel( this )
                .basicPublish( "", replyTo, outgoing, RabbitMQ.toAMQPTable( result ) );
    }
    catch( Throwable ex ) {
        LOG.log( Level.SEVERE, null, ex );
    }
}
/**
 * Sends an RPC request and blocks until the matching reply arrives.
 *
 * The message is published as UTF-8 to REQUEST_QUEUE_NAME with a fresh
 * correlation id and replyQueueName as the reply-to address. Replies carrying
 * a different (or no) correlation id are skipped.
 *
 * @param methodName not used by this method
 * @param message    request payload
 * @return the raw body of the matching reply
 * @throws Exception if publishing or receiving fails
 */
public byte[] call(String methodName, String message) throws Exception {
    String corrId = UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties
            .Builder()
            .correlationId(corrId)
            .replyTo(replyQueueName)
            .build();

    channel.basicPublish("", REQUEST_QUEUE_NAME, props, message.getBytes("UTF-8"));

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        // Compare from the known non-null side: a reply without a
        // correlation id must be skipped, not crash with an NPE.
        if (corrId.equals(delivery.getProperties().getCorrelationId())) {
            return delivery.getBody();
        }
    }
}
/**
 * Reads a single message from the configured queue.
 *
 * getParam().getWaitTime() specifies the blocking timeout used when the
 * message pool is empty. When a message is received its delivery tag is
 * remembered in {@code deliveryTag}.
 *
 * @return the message content, or null when nothing arrived within the wait time
 * @throws MqReceiveException wrapping any I/O, shutdown, cancellation or
 *                            interruption failure
 */
@Override
public String readOneMessage() {
    try {
        // NOTE(review): a consume request is issued on every call - confirm
        // that registering the consumer repeatedly is intended.
        channel.basicConsume(getParam().getQueue(), false, consumer);

        QueueingConsumer.Delivery received = consumer.nextDelivery(getParam().getWaitTime());
        if (received == null) {
            return null;
        }

        deliveryTag = received.getEnvelope().getDeliveryTag();
        return getMessageContent(received);
    } catch (IOException | ShutdownSignalException | ConsumerCancelledException | InterruptedException e) {
        throw new MqReceiveException(e);
    }
}
/**
 * Subscribes to the fanout exchange EXCHANGE_NAME on localhost and prints
 * every message received.
 *
 * A server-named queue is declared and bound to the exchange with an empty
 * routing key; messages are consumed with automatic acknowledgement. The
 * receive loop has no exit condition.
 *
 * @param argv unused
 * @throws java.io.IOException            if a broker operation fails
 * @throws java.lang.InterruptedException if the thread is interrupted while waiting
 */
public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    Channel channel = connectionFactory.newConnection().createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String boundQueue = channel.queueDeclare().getQueue();
    channel.queueBind(boundQueue, EXCHANGE_NAME, "");
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer subscriber = new QueueingConsumer(channel);
    channel.basicConsume(boundQueue, true, subscriber);

    for (;;) {
        byte[] payload = subscriber.nextDelivery().getBody();
        System.out.println(" [x] Received '" + new String(payload) + "'");
    }
}
/**
 * Test that consumes from the queue "TEST_QUEUE" with automatic
 * acknowledgement and prints every message body, looping without an exit
 * condition.
 *
 * NOTE(review): this method is collapsed onto a single line, so as written
 * everything after the first "//" is a comment. Presumably each "//" was an
 * (now empty) comment on its own line and the statements after it - the
 * queue declaration, the "declare: " printout and the receive loop - are
 * live code. Confirm against the original formatting.
 *
 * @throws Exception if the broker interaction fails
 */
@Test public void receive() throws Exception { String QUEUE_NAME = "TEST_QUEUE"; // DeclareOk declare = channel.queueDeclare(QUEUE_NAME, false, false, false, null); // System.out.println("declare: " + declare); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); // while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(message); } }
/**
 * Test that consumes from the queue "TEST_PERSISTENT" with manual
 * acknowledgement (autoAck=false) and a prefetch of one, printing every
 * message body in a loop without an exit condition.
 *
 * NOTE(review): this method is collapsed onto a single line, so as written
 * everything after the first "//" is a comment. Presumably each "//" was a
 * short or empty comment and the statements after it are live code: the
 * durable queue declaration, basicQos(1), the "declare: " printout, the
 * receive loop and the final basicAck. Whether the basicAck in particular is
 * live or commented out cannot be told from here - confirm against the
 * original formatting.
 *
 * @throws Exception if the broker interaction fails
 */
@Test public void receivePersistent() throws Exception { String QUEUE_NAME = "TEST_PERSISTENT"; // DeclareOk declare = channel.queueDeclare(QUEUE_NAME, true, false, false, null);// durable=true channel.basicQos(1);// // System.out.println("declare: " + declare); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); // while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(message); // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
/**
 * Test that binds a server-named queue to the exchange "TEST_EXCHANGE" with
 * an empty routing key, consumes from it with automatic acknowledgement and
 * prints every message body, looping without an exit condition.
 *
 * NOTE(review): this method is collapsed onto a single line, so as written
 * everything after the first "//" is a comment. Presumably each "//" was an
 * (now empty) comment on its own line and the statements after it - the
 * "fanout" exchange declaration, the printouts, the consumer set-up and the
 * receive loop - are live code. Confirm against the original formatting.
 *
 * @throws Exception if the broker interaction fails
 */
@Test public void receiveExchange() throws Exception { String EXCHANGE_NAME = "TEST_EXCHANGE"; // com.rabbitmq.client.AMQP.Exchange.DeclareOk declare = channel .exchangeDeclare(EXCHANGE_NAME, "fanout"); // System.out.println("declare: " + declare); String queueName = channel.queueDeclare().getQueue(); System.out.println("queueName: " + queueName); channel.queueBind(queueName, EXCHANGE_NAME, ""); // QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); // while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(message); } }
/**
 * Test that binds a server-named queue to the exchange
 * "TEST_EXCHANGE_DIRECT", consumes from it with automatic acknowledgement
 * and prints the routing key and body of every message, looping without an
 * exit condition.
 *
 * NOTE(review): this method is collapsed onto a single line, so as written
 * everything after the first "//" is a comment. Presumably each "//" was an
 * (now empty) comment on its own line and the statements after it are live
 * code. In particular it cannot be told from here whether the "info"
 * binding is active or commented out (the "error" binding follows it) -
 * confirm against the original formatting.
 *
 * @throws Exception if the broker interaction fails
 */
@Test public void receiveExchangeDirect() throws Exception { String EXCHANGE_NAME = "TEST_EXCHANGE_DIRECT"; // com.rabbitmq.client.AMQP.Exchange.DeclareOk declare = channel .exchangeDeclare(EXCHANGE_NAME, "direct"); // System.out.println("declare: " + declare); String queueName = channel.queueDeclare().getQueue(); System.out.println("queueName: " + queueName); // channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "error"); // QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); // while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } }
/**
 * Test that binds a server-named queue to the exchange
 * "TEST_EXCHANGE_TOPIC", consumes from it with automatic acknowledgement and
 * prints the routing key and body of every message, looping without an exit
 * condition.
 *
 * NOTE(review): this method is collapsed onto a single line, so as written
 * everything after the first "//" is a comment. Presumably each "//" was an
 * (now empty) comment on its own line and the statements after it are live
 * code. In particular it cannot be told from here whether the "#" binding is
 * active or commented out (the "kern.*" and "*.critical" bindings follow
 * it) - confirm against the original formatting.
 *
 * @throws Exception if the broker interaction fails
 */
@Test public void receiveExchangeTopic() throws Exception { String EXCHANGE_NAME = "TEST_EXCHANGE_TOPIC"; // com.rabbitmq.client.AMQP.Exchange.DeclareOk declare = channel .exchangeDeclare(EXCHANGE_NAME, "topic"); // System.out.println("declare: " + declare); String queueName = channel.queueDeclare().getQueue(); System.out.println("queueName: " + queueName); // channel.queueBind(queueName, EXCHANGE_NAME, "#"); channel.queueBind(queueName, EXCHANGE_NAME, "kern.*"); channel.queueBind(queueName, EXCHANGE_NAME, "*.critical"); // QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); // while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } }
/**
 * Performs one blocking RPC round trip against the queue "TEST_RPC".
 *
 * A server-named reply queue and a consumer on it are created for this call.
 * The request is published with a fresh correlation id and the reply queue
 * as reply-to; replies with another (or no) correlation id are skipped. The
 * consumer is cancelled again when the call ends, so repeated calls do not
 * pile up consumers on the shared channel.
 *
 * @param message request payload
 * @return the body of the matching reply as a string
 * @throws Exception if publishing or receiving fails
 */
private String rpcCall(String message) throws Exception {
    String QUEUE_NAME = "TEST_RPC";

    String replyQueueName = channel.queueDeclare().getQueue();
    System.out.println("replyQueueName: " + replyQueueName);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    String consumerTag = channel.basicConsume(replyQueueName, true, consumer);
    try {
        String corrId = java.util.UUID.randomUUID().toString();
        BasicProperties props = new BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", QUEUE_NAME, props, message.getBytes());

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // corrId is never null, so replies without a correlation id are
            // skipped instead of causing a NullPointerException.
            if (corrId.equals(delivery.getProperties().getCorrelationId())) {
                return new String(delivery.getBody());
            }
        }
    } finally {
        // Release the per-call consumer; skipped when the channel is already
        // closed so the original failure is not masked.
        if (channel.isOpen()) {
            channel.basicCancel(consumerTag);
        }
    }
}
/**
 * Creates an RPC client connected to the broker named by the "rabbitmq"
 * property of the classpath resource "twitter4j.properties".
 *
 * Opens a connection and channel, declares a server-named reply queue,
 * starts consuming from it with automatic acknowledgement and configures the
 * JSON mapper to ignore unknown properties.
 *
 * @throws IOException      if the properties resource is missing or
 *                          unreadable, or a broker operation fails
 * @throws TimeoutException if connecting to the broker times out
 */
public RPCClient() throws IOException, TimeoutException {
    Properties properties = new Properties();
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    try (InputStream resourceStream = loader.getResourceAsStream("twitter4j.properties")) {
        // getResourceAsStream returns null for a missing resource; fail with
        // a clear message instead of an NPE from Properties.load.
        if (resourceStream == null) {
            throw new IOException("twitter4j.properties not found on the classpath");
        }
        properties.load(resourceStream);
    }

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(properties.getProperty("rabbitmq"));

    this.connection = factory.newConnection();
    this.channel = this.connection.createChannel();
    this.replyQueueName = this.channel.queueDeclare().getQueue();
    this.consumer = new QueueingConsumer(this.channel);
    this.channel.basicConsume(this.replyQueueName, true, this.consumer);

    this.mapper = new ObjectMapper();
    this.mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
@Override public void run() { //Set the name of this thread Thread.currentThread().setName(THREAD_NAME); // Setup the exchange and subscribe to the route we need to service while (!Thread.interrupted()) { try { QueueingConsumer.Delivery delivery = queueConsumer.nextDelivery(); System.out.println("Ack Service thread running got:\n" + new String(delivery.getBody())); } catch (InterruptedException e) { break; } } }
@Override public void run() { //Set the name of this thread Thread.currentThread().setName(THREAD_NAME); // Setup the exchange and subscribe to the route we need to service while (!Thread.interrupted()) { try { QueueingConsumer.Delivery delivery = queueConsumer.nextDelivery(); System.out.println("MessageProtocolHandler thread received:\n" + new String(delivery.getBody())); } catch (InterruptedException e) { break; } } }
/**
 * Receives messages while {@code running} is set and forwards each one to
 * the callback through the message handler.
 *
 * Every delivery is counted, posted to {@code messageRecieveHandler} and then
 * acknowledged individually. The posted task carries its own reference to the
 * message body, so a task that runs late still reports the message it was
 * created for and not a newer one.
 *
 * @return the number of messages received before the loop ended
 * @throws Exception if receiving or acknowledging fails
 */
int consume() throws Exception {
    int count = 0;
    while (this.running) {
        Log.v(TAG, "waiting for message...");
        QueueingConsumer.Delivery delivery = this.queueingConsumer.nextDelivery();

        final byte[] body = delivery.getBody();
        // The field is still updated for any other code that reads it.
        this.message = body;
        count++;
        Log.v(TAG, String.format("message recieved\t[%d]", count));

        this.messageRecieveHandler.post(new Runnable() {
            @Override
            public void run() {
                // Use the captured body: the shared field may already hold
                // the next message by the time the handler runs this task.
                callback.messageReceived(body);
            }
        });

        this.channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        Log.v(TAG, "ack sent");
    }
    return count;
}
/**
 * Copies the bodies of messages from a queue bound to exchangeName into the
 * given stream, looping without an exit condition.
 *
 * When a queue name is given, that queue is declared as durable; otherwise a
 * server-named queue is declared. The queue is bound with an empty routing
 * key and consumed with manual acknowledgement. Each received body is
 * written and then acknowledged with the "multiple" flag set. The stream is
 * flushed whenever a five second wait yields no message, or once more than
 * 1000 waits have happened since the last flush.
 *
 * @param queueName queue to listen on, or null for a server-named queue
 * @param os        stream that receives the raw message bodies
 * @throws Exception if the broker interaction or writing fails
 */
private void runListener(String queueName, final OutputStream os) throws Exception {
    if (queueName == null) {
        queueName = channel.queueDeclare().getQueue();
    } else {
        channel.queueDeclare(queueName, true, false, false, null);
    }
    channel.queueBind(queueName, exchangeName, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, false, consumer);

    int polls = 0;
    for (;;) {
        Delivery delivery = consumer.nextDelivery(5000);
        // Timed-out waits are counted too.
        polls++;
        boolean idle = (delivery == null);

        if (!idle) {
            os.write(delivery.getBody());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
        }

        if (idle || polls > 1000) {
            os.flush();
            polls = 0;
        }
    }
}