@Override protected Connection createListenerContainer() throws Exception { log.debug("Creating connection"); Connection conn = endpoint.connect(executorService); log.debug("Creating channel"); Channel channel = conn.createChannel(); // setup the basicQos if (endpoint.isPrefetchEnabled()) { channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal()); } //Let the server pick a random name for us DeclareOk result = channel.queueDeclare(); log.info("Using temporary queue name: {}", result.getQueue()); setReplyTo(result.getQueue()); //TODO check for the RabbitMQConstants.EXCHANGE_NAME header channel.queueBind(getReplyTo(), endpoint.getExchangeName(), getReplyTo()); consumer = new RabbitConsumer(this, channel); consumer.start(); return conn; }
@Test public void receive() throws Exception { String QUEUE_NAME = "TEST_QUEUE"; // DeclareOk declare = channel.queueDeclare(QUEUE_NAME, false, false, false, null); // System.out.println("declare: " + declare); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); // while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(message); } }
@Test public void receivePersistent() throws Exception { String QUEUE_NAME = "TEST_PERSISTENT"; // DeclareOk declare = channel.queueDeclare(QUEUE_NAME, true, false, false, null);// durable=true channel.basicQos(1);// // System.out.println("declare: " + declare); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); // while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(message); // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
@Test public void receiveExchange() throws Exception { String EXCHANGE_NAME = "TEST_EXCHANGE"; // com.rabbitmq.client.AMQP.Exchange.DeclareOk declare = channel .exchangeDeclare(EXCHANGE_NAME, "fanout"); // System.out.println("declare: " + declare); String queueName = channel.queueDeclare().getQueue(); System.out.println("queueName: " + queueName); channel.queueBind(queueName, EXCHANGE_NAME, ""); // QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); // while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(message); } }
@Test public void receiveExchangeDirect() throws Exception { String EXCHANGE_NAME = "TEST_EXCHANGE_DIRECT"; // com.rabbitmq.client.AMQP.Exchange.DeclareOk declare = channel .exchangeDeclare(EXCHANGE_NAME, "direct"); // System.out.println("declare: " + declare); String queueName = channel.queueDeclare().getQueue(); System.out.println("queueName: " + queueName); // channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "error"); // QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); // while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } }
@Test public void receiveExchangeTopic() throws Exception { String EXCHANGE_NAME = "TEST_EXCHANGE_TOPIC"; // com.rabbitmq.client.AMQP.Exchange.DeclareOk declare = channel .exchangeDeclare(EXCHANGE_NAME, "topic"); // System.out.println("declare: " + declare); String queueName = channel.queueDeclare().getQueue(); System.out.println("queueName: " + queueName); // channel.queueBind(queueName, EXCHANGE_NAME, "#"); channel.queueBind(queueName, EXCHANGE_NAME, "kern.*"); channel.queueBind(queueName, EXCHANGE_NAME, "*.critical"); // QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); // while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } }
/** * Check if a certain receiver (queue) exists. * * @param receiver * the name of the receiver. * @return true if it does, false if it doesn't exists. */ public boolean doesReceiverExist(final String receiver) { if (objTemplate != null) { return objTemplate.execute(new ChannelCallback<DeclareOk>() { @Override public DeclareOk doInRabbit(com.rabbitmq.client.Channel channel) throws Exception { try { Configuration config = objResources.getConfiguration(); String name = config.getQueuePrefix() + receiver + config.getQueueSuffix(); return channel.queueDeclarePassive(name); } catch (Exception e) { objMonitor.info(SendController.class, "The receiver client seems to be existing. This does not mean that it's connected and receiving message."); return null; } } }) != null; } return false; }
@Test public void creatQueue() throws Exception { String QUEUE_NAME = "TEST_BENCHMARK"; // Channel channel = createChannel(); DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(declareOk); assertNotNull(declareOk); }
@Test public void createExchange() throws Exception { String EXCHANGE_NAME = "TEST_BENCHMARK"; // Channel channel = createChannel(); com.rabbitmq.client.AMQP.Exchange.DeclareOk declareOk = channel .exchangeDeclare(EXCHANGE_NAME, "fanout"); System.out.println(declareOk); assertNotNull(declareOk); }
@Test public void send() throws Exception { String QUEUE_NAME = "TEST_QUEUE"; // String message = "Hello World"; DeclareOk declare = channel.queueDeclare(QUEUE_NAME, false, false, false, null); // System.out.println("declare: " + declare); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); }
@Test public void sendPersistent() throws Exception { String QUEUE_NAME = "TEST_PERSISTENT"; // String message = "Hello World"; DeclareOk declare = channel.queueDeclare(QUEUE_NAME, true, false, false, null);// durable=true // System.out.println("declare: " + declare); channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); }
@Test public void sendExchange() throws Exception { String EXCHANGE_NAME = "TEST_EXCHANGE"; // String message = "Hello World"; com.rabbitmq.client.AMQP.Exchange.DeclareOk declare = channel .exchangeDeclare(EXCHANGE_NAME, "fanout"); // System.out.println("declare: " + declare); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); }
@Test public void sendExchangeDirect() throws Exception { String EXCHANGE_NAME = "TEST_EXCHANGE_DIRECT"; // String message = "Hello World"; com.rabbitmq.client.AMQP.Exchange.DeclareOk declare = channel .exchangeDeclare(EXCHANGE_NAME, "direct"); // System.out.println("declare: " + declare); String severity = "info"; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); }
@Test public void sendExchangeTopic() throws Exception { String EXCHANGE_NAME = "TEST_EXCHANGE_TOPIC"; // String message = "Hello World"; com.rabbitmq.client.AMQP.Exchange.DeclareOk declare = channel .exchangeDeclare(EXCHANGE_NAME, "topic"); // System.out.println("declare: " + declare); String routingKey = "kern.critical"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); }
@Test public void rpcServer() throws Exception { String QUEUE_NAME = "TEST_RPC"; // DeclareOk declare = channel.queueDeclare(QUEUE_NAME, false, false, false, null); // System.out.println("declare: " + declare); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder() .correlationId(props.getCorrelationId()).build(); String message = new String(delivery.getBody()); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
@Override public void run() { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout", durable); DeclareOk ok = channel.queueDeclare(queue_name, durable, false, false, null); String queueName = ok.getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [" + queue_name + "] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); // 消息分发处理 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, false, consumer); while (true) { Thread.sleep(2000); Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [" + queue_name + "] Received '" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (Exception e) { e.printStackTrace(); } }