/**
 * Interactive publisher: reads whitespace-delimited tokens from standard input and
 * publishes each one to the durable {@code flink-test} queue through the default
 * exchange, tagging every message with a random correlation id.
 *
 * <p>The loop stops after the token {@code exit} has been read; that token is itself
 * published before the loop ends. The loop also stops when standard input is exhausted.
 *
 * @param args unused
 * @throws IOException if connecting, declaring the queue, publishing or closing fails
 */
public static void main(String[] args) throws IOException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("guest");
    factory.setPassword("guest");
    factory.setVirtualHost("/");
    factory.setHost("localhost");
    factory.setPort(5672);

    Connection newConnection = factory.newConnection();
    Scanner scanner = new Scanner(System.in);
    try {
        Channel channel = newConnection.createChannel();
        // The declaration does not depend on the message, so do it once up front
        // instead of paying a broker round trip per published token.
        channel.queueDeclare("flink-test", true, false, false, null);

        String message = "";
        while (!message.equals("exit")) {
            System.out.println("Enter your message");
            if (!scanner.hasNext()) {
                // End of input: leave the loop instead of failing inside next().
                break;
            }
            message = scanner.next();
            channel.basicPublish("", "flink-test",
                    new BasicProperties.Builder()
                            .correlationId(java.util.UUID.randomUUID().toString())
                            .build(),
                    // Encode explicitly so the payload does not depend on the platform charset.
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        channel.close();
    } finally {
        // Release the scanner and the connection even when publishing throws.
        scanner.close();
        if (newConnection.isOpen()) {
            newConnection.close();
        }
    }
}
/**
 * Publishes the text returned by {@code getMessage(argv)} to the fanout exchange
 * {@code EXCHANGE_NAME} on the broker at {@code localhost}.
 *
 * @param argv command-line arguments, forwarded to {@code getMessage}
 * @throws Exception if connecting, declaring the exchange, publishing or closing fails
 */
public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String message = getMessage(argv);
        // Empty routing key: the message is published straight to the exchange.
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
    } finally {
        // Release the connection even when declaring or publishing throws.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/**
 * Publishes a timestamped log line to the fanout exchange {@code EXCHANGE_NAME}.
 *
 * <p>The payload is the current time formatted as {@code dd MMM yyyy @ HH:mm:ss},
 * followed by {@code ": "} and the text returned by {@code getMessage(argv)}.
 *
 * @param argv command-line arguments, forwarded to {@code getMessage}
 * @throws Exception if connecting, declaring the exchange, publishing or closing fails
 */
public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");

    Connection conn = factory.newConnection();
    try {
        Channel channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String msg = getMessage(argv);
        SimpleDateFormat sdf = new SimpleDateFormat("dd MMM yyyy @ HH:mm:ss");
        String sDate = sdf.format(new Date());
        String finalMsg = sDate + ": " + msg;

        channel.basicPublish(EXCHANGE_NAME, "", null, finalMsg.getBytes("UTF-8"));
        System.out.println("Emmited message: " + finalMsg);

        channel.close();
    } finally {
        // Release the connection even when declaring or publishing throws.
        if (conn.isOpen()) {
            conn.close();
        }
    }
}
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月11日 下午5:53:02 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); // 分发信息 for (int i = 0; i < 20; i++) { String message = "Hello RabbitMQ" + i; channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("NewTask send '" + message + "'"); } channel.close(); connection.close(); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:49:49 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct");// 注意是direct // 发送信息 for (String routingKey : routingKeys) { String message = "RoutingSendDirect Send the message level:" + routingKey; channel.basicPublish(EXCHANGE_NAME_ROUTING, routingKey, null, message.getBytes()); System.out.println("RoutingSendDirect Send" + routingKey + "':'" + message); } channel.close(); connection.close(); }
/**
 * Closes the given channel and connection on a best-effort basis.
 *
 * <p>If the channel is open, the active consumer (when {@code consumerTag} is set) is
 * cancelled first, then the channel is closed and the {@code channel} field is cleared.
 * If the connection is open, it is closed with {@code CLOSE_CONNECTION_TIMEOUT} and the
 * {@code connection} field is cleared. Failures are logged, never thrown.
 *
 * <p>The two steps are guarded independently, so a failure while cancelling the consumer
 * or closing the channel no longer prevents the connection from being closed.
 *
 * @param connection connection to close; may be {@code null}
 * @param channel    channel to close; may be {@code null}
 */
private void close(Connection connection, Channel channel) {
    try {
        if (channel != null && channel.isOpen()) {
            if (this.consumerTag != null) {
                channel.basicCancel(this.consumerTag);
                this.consumerTag = null;
            }
            log.info("Closing RabbitMQ Channel - " + this.serverIP);
            channel.close();
            this.channel = null;
        }
    } catch (Exception e) {
        log.error("Failed to close RabbitMQ connections " + this.serverIP, e);
    }

    try {
        if (connection != null && connection.isOpen()) {
            log.info("Closing RabbitMQ Connection - " + this.serverIP);
            connection.close(CLOSE_CONNECTION_TIMEOUT);
            this.connection = null;
        }
    } catch (Exception e) {
        log.error("Failed to close RabbitMQ connections " + this.serverIP, e);
    }
}
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // factory.setHost(""); factory.setUri("amqp://alpha.netkiller.cn"); factory.setUsername("admin"); // factory.setPassword("admin123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }
/**
 * Publishes one message to the direct exchange {@code EXCHANGE_NAME}, routed by severity.
 *
 * <p>The routing key comes from {@code getSeverity(argv)} and the payload from
 * {@code getMessage(argv)}.
 *
 * @param argv command-line arguments, forwarded to {@code getSeverity} and {@code getMessage}
 * @throws Exception if connecting, declaring the exchange, publishing or closing fails
 */
public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String severity = getSeverity(argv);
        String message = getMessage(argv);
        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
    } finally {
        // Release the connection even when declaring or publishing throws.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/**
 * Publishes the text returned by {@code getMessage(argv)} to the queue
 * {@code TASK_QUEUE_NAME} through the default exchange, using
 * {@code MessageProperties.PERSISTENT_TEXT_PLAIN} as the message properties.
 *
 * @param argv command-line arguments, forwarded to {@code getMessage}
 * @throws Exception if connecting, declaring the queue, publishing or closing fails
 */
public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = getMessage(argv);
        channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
    } finally {
        // Release the connection even when declaring or publishing throws.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/** * Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute) */ private <T> T execute(ChannelCallback<T> callback) throws Exception { Channel channel; try { channel = channelPool.borrowObject(); } catch (IllegalStateException e) { // Since this method is not synchronized its possible the // channelPool has been cleared by another thread checkConnectionAndChannelPool(); channel = channelPool.borrowObject(); } if (!channel.isOpen()) { log.warn("Got a closed channel from the pool"); // Reconnect if another thread hasn't yet checkConnectionAndChannelPool(); channel = channelPool.borrowObject(); } try { return callback.doWithChannel(channel); } finally { channelPool.returnObject(channel); } }
/** * 普通消息监听 * * @param message 消息实体 * @param channel channel 就是当前的会话通道 * @throws Exception 备注: 手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。 */ @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); logger.debug("deliveryTag= " + deliveryTag); try { logger.info("------消费者处理消息------"); logger.info("receive message" + message.getMessageProperties().getAppId()); logger.info("receive channel" + channel.getChannelNumber() + "----"); // 获取消息 if (null != message.getBody()) { EventMessage eventMessage = (EventMessage) ObjectAndByteCovertUtil.ByteToObject(message.getBody()); if (null != eventMessage) { System.out.println(Thread.currentThread().getName() + ":" + TimeUtils.getSysTime("yyyy-MM-dd HH:mm:ss") + ":[下游应用- 消费普通消息]:" + message.getMessageProperties()); // TODO 业务处理 } } // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 ) channel.basicAck(deliveryTag, false); } catch (Exception e) { logger.warn("message consume failed: " + e.getMessage()); // ack返回false,requeue-true并重新回到队列 channel.basicNack(deliveryTag, false, true); } }
/** * 延迟消息监听并处理 * * @param message 消息实体 * @param channel channel 就是当前的会话通道 * @throws Exception 备注: 手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。 */ @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); logger.debug("deliveryTag= " + deliveryTag); try { logger.info("[延时消息]" + message.getMessageProperties()); // 获取消息 if (null != message.getBody()) { EventMessage eventMessage = (EventMessage) ObjectAndByteCovertUtil.ByteToObject(message.getBody()); if (null != eventMessage) { System.out.println(Thread.currentThread().getName() + ":" + TimeUtils.getSysTime("yyyy-MM-dd HH:mm:ss") + ":[下游应用-消费延时消息]:" + eventMessage.getObject().toString()); // TODO 业务处理 } } // 手动确认 - false只确认当前一个消息收到,true确认所有consumer获得的消息 channel.basicAck(deliveryTag, false); } catch (Exception e) { logger.warn("message consume failed: " + e.getMessage()); // ack返回false,requeue-true并重新回到队列 channel.basicNack(deliveryTag, false, true); } }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:37:37 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// fanout表示分发,所有的消费者得到同样的队列信息 // 分发信息 for (int i = 0; i < 5; i++) { String message = "Hello World" + i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("EmitLog Sent '" + message + "'"); } channel.close(); connection.close(); }
@Override public void onMessage(Message message, Channel channel) throws IOException { System.out.println("----- received" + message.getMessageProperties()); try { Object msg = messageConverter.fromMessage(message); if (!appId.equals(message.getMessageProperties().getAppId())){ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); throw new SecurityException("非法应用appId:" + message.getMessageProperties().getAppId()); } Object service = ctx.getBean(message.getMessageProperties().getHeaders().get("ServiceName").toString()); String serviceMethodName = message.getMessageProperties().getHeaders().get("ServiceMethodName").toString(); Method method = service.getClass().getMethod(serviceMethodName, msg.getClass()); method.invoke(service, msg); //确认消息成功消费 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { System.out.println("------ err"+ e.getMessage()); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } }
/**
 * Sets the given status on {@code task} and reports it to the server by publishing a
 * {@code RabbitData} update request to the tasks-status queue on {@code serverAddress}.
 *
 * <p>A fresh connection is opened for every call. Failures are logged and not propagated;
 * the connection is released even when declaring the queue or publishing fails.
 *
 * @param status the new status to record on the task and send to the server
 */
private void updateTaskStatus(TaskStatus status) {
    logger.info("[Study = " + taskStudy + "] [Unit = " + unitId
            + "] Sending task update to server. Task id = [" + task.getId()
            + "] status = [" + status.toString() + "]");
    final String QUEUE_NAME = SystemConstants.UBONGO_SERVER_TASKS_STATUS_QUEUE;
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(serverAddress);
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            task.setStatus(status);
            RabbitData message = new RabbitData(task, MachineConstants.UPDATE_TASK_REQUEST);
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            if (logger.isDebugEnabled()) {
                logger.debug(" [!] Sent '" + message.getMessage() + "'");
            }
            channel.close();
        } finally {
            // Do not leak the connection when anything above throws.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    } catch (Exception e) {
        logger.error("[Study = " + taskStudy + "] [Unit = " + unitId
                + "] Failed sending task status to server. Task id = [" + task.getId()
                + "] Status = [" + status.toString() + "] error: " + e.getMessage(), e);
    }
}
/**
 * Consumes messages from the queue "TestQueue" on the broker at 127.0.0.1 and prints
 * each body, forever. Deliveries are auto-acknowledged.
 *
 * @param args unused
 * @throws Exception if connecting, declaring the queue, or receiving a delivery fails
 */
public static void main(String[] args) throws Exception {
    final String queueName = "TestQueue";

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("127.0.0.1");
    Channel channel = connectionFactory.newConnection().createChannel();

    channel.queueDeclare(queueName, false, false, false, null);
    System.out.println(" [*] Waiting for messages...");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    // Block on the next delivery, print it, repeat.
    for (;;) {
        byte[] payload = consumer.nextDelivery().getBody();
        System.out.println(" [x] Received '" + new String(payload) + "'");
    }
}
/**
 * Reads one message body from the sender's queue, waiting up to five seconds.
 *
 * <p>A temporary channel is opened on the sender's connection, an auto-ack consumer
 * captures the most recent delivery, and the channel is closed before returning.
 *
 * @return the captured body, or {@code null} if nothing was captured
 * @throws Exception if creating the channel, consuming, waiting or closing fails
 */
private byte[] readMessage() throws Exception {
    final AtomicReference<byte[]> received = new AtomicReference<>();
    final CountDownLatch delivered = new CountDownLatch(1);

    Channel channel = sender.get().createChannel();
    try {
        DefaultConsumer capture = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                received.set(body);
                delivered.countDown();
            }
        };
        channel.basicConsume(sender.queue(), true, capture);
        // The result of await is not inspected; on timeout the reference is simply unset.
        delivered.await(5, TimeUnit.SECONDS);
    } finally {
        channel.close();
    }
    return received.get();
}
/**
 * Trade request processor: consumes from "trade.request.q" (auto-ack) and, for each
 * request, sleeps for the configured response time and then publishes the fixed text
 * "response" to "trade.response.q".
 *
 * <p>The n-th request (counting from zero) sleeps for {@code responseTimes.get(n)}.
 * NOTE(review): the counter is never reset, so it presumably runs past the end of
 * {@code responseTimes} once all entries are used — confirm this is the intended way
 * for the loop to end.
 *
 * @throws Exception if connecting, consuming, sleeping or publishing fails
 */
public void execute() throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume("trade.request.q", true, consumer);

    for (int index = 0; ; index++) {
        QueueingConsumer.Delivery request = consumer.nextDelivery();
        System.out.println("processing trade: " + new String(request.getBody()));

        byte[] reply = "response".getBytes();
        Thread.sleep(responseTimes.get(index));
        channel.basicPublish("", "trade.response.q", null, reply);
    }
}
/**
 * Equity trade processor: consumes from "trade.eq.q" (auto-ack) and validates each
 * comma-separated trade. A trade whose third field is missing or is not a valid
 * {@code long} is forwarded unchanged to "workflow.q".
 *
 * <p>Receiving the delivery now happens outside the error-handling block. Previously a
 * failure in {@code nextDelivery()} was handled like a bad trade and republished the
 * previous delivery (or hit a {@link NullPointerException} on the very first one).
 * Only processing errors are routed to the workflow queue; an interrupt during the
 * simulated work propagates to the caller instead of being treated as a bad trade.
 *
 * @throws Exception if connecting, consuming, sleeping or publishing fails
 */
public void execute() throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume("trade.eq.q", true, consumer);

    while (true) {
        QueueingConsumer.Delivery msg = consumer.nextDelivery();
        try {
            String message = new String(msg.getBody());
            System.out.println("message received: " + message);
            String[] parts = message.split(",");
            // Validation only: throws if the share count is absent or not numeric.
            Long.parseLong(parts[2]);
            Thread.sleep(1000);
        } catch (RuntimeException e) {
            System.out.println("error with trade: " + e.getMessage());
            System.out.println("sending to workflow");
            channel.basicPublish("", "workflow.q", null, msg.getBody());
        }
    }
}
/**
 * Consumes from "trade.eq.q" on a new channel of the given connection while
 * {@code active} is true, one unacknowledged message at a time (prefetch 1, manual ack).
 *
 * <p>Each delivery is printed, followed by a one-second pause, and then acknowledged.
 * Exceptions are printed rather than thrown. The channel is closed when the loop ends
 * for any reason, and an interrupt restores the thread's interrupt flag.
 *
 * @param connection open connection on which to create the consumer channel
 */
public void start(Connection connection) {
    try {
        active = true;
        Channel channel = connection.createChannel();
        try {
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicQos(1);
            channel.basicConsume("trade.eq.q", false, consumer);
            while (active) {
                QueueingConsumer.Delivery msg = consumer.nextDelivery();
                System.out.println("message received: " + new String(msg.getBody()));
                Thread.sleep(1000);
                channel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
            }
        } finally {
            // Close the channel on failure too, not only on a clean loop exit.
            if (channel.isOpen()) {
                channel.close();
            }
        }
    } catch (InterruptedException e) {
        // Keep the interrupt visible to whoever owns this thread.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
public static void main(String[] args) throws Exception { //建立连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置连接地址 factory.setHost("seaof-153-125-234-173.jp-tokyo-10.arukascloud.io"); factory.setPort(31084); //获取连接 Connection connection = factory.newConnection(); //获取渠道 Channel channel = connection.createChannel(); //声明交换机类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //产生随机数字 String message = RandomStringUtils.randomNumeric(8); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); channel.close(); connection.close(); }
/**
 * Consumes a fixed number of messages from "trade.eq.q" with prefetch 1 and manual ack,
 * then closes the channel through {@code AMQPCommon.close}.
 *
 * <p>Each delivery is printed, followed by a one-second pause, and then acknowledged.
 * The channel is handed to {@code AMQPCommon.close} even when consuming fails.
 *
 * @param args optional; {@code args[0]} is the number of messages to consume (default 1)
 * @throws Exception if connecting, consuming, acknowledging or closing fails, or if
 *                   {@code args[0]} is not a valid integer
 */
public static void main(String[] args) throws Exception {
    Channel channel = AMQPCommon.connect();
    try {
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicQos(1);
        channel.basicConsume("trade.eq.q", false, consumer);

        int numMsgs = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        for (int i = 0; i < numMsgs; i++) {
            QueueingConsumer.Delivery msg = consumer.nextDelivery();
            System.out.println("message received: " + new String(msg.getBody()));
            Thread.sleep(1000);
            channel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
        }
    } finally {
        AMQPCommon.close(channel);
    }
}
/** * Get a channel (create it if needed). * @return The channel. */ private static Channel getChannel() { // thread-safe singleton synchronized (RabbitUtil.class) { // check if channel is null if (_channel == null) { try { // create a new connection and then a new channel _channel = RabbitConfiguration.connectionFactory().newConnection().createChannel(); } catch (IOException e) { e.printStackTrace(); } } } // return channel return _channel; }
/**
 * Exposes a {@code SpringAMQPMessageSource} bean that listens on the queue named by the
 * {@code spring.application.queue} property.
 *
 * <p>The anonymous subclass exists only to attach {@code @RabbitListener} to
 * {@code onMessage}; it delegates straight to the superclass implementation.
 *
 * @param serializer serializer handed to the {@code DefaultAMQPMessageConverter}
 * @return the listener-annotated message source
 */
@Bean
public SpringAMQPMessageSource messageSourceApiGateway(Serializer serializer) {
    final DefaultAMQPMessageConverter converter = new DefaultAMQPMessageConverter(serializer);
    return new SpringAMQPMessageSource(converter) {
        @RabbitListener(queues = "${spring.application.queue}")
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            super.onMessage(message, channel);
        }
    };
}
/**
 * RPC server: consumes requests from {@code RPC_QUEUE_NAME} (prefetch 1, manual ack),
 * parses each body as an integer {@code n}, and replies with {@code fib(n)} to the
 * request's reply-to queue, echoing the request's correlation id.
 *
 * <p>A request that cannot be processed (for example a non-numeric body) is answered
 * with an empty response and acknowledged, so one malformed request no longer stops the
 * server. The response is encoded as UTF-8, matching how the request is decoded. The
 * connection is released if the loop is left by an exception.
 *
 * @param args unused
 * @throws IOException                if connecting, consuming, publishing or acking fails
 * @throws TimeoutException           if the broker does not respond in time
 * @throws ShutdownSignalException    if the connection is shut down while waiting
 * @throws ConsumerCancelledException if the consumer is cancelled while waiting
 * @throws InterruptedException       if interrupted while waiting for a delivery
 */
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,
        ConsumerCancelledException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        System.out.println("RPCServer Awating RPC request");

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                    .correlationId(props.getCorrelationId())
                    .build();

            String response = "";
            try {
                String message = new String(delivery.getBody(), "UTF-8");
                int n = Integer.parseInt(message);
                System.out.println("RPCServer fib(" + message + ")");
                response = "" + fib(n);
            } catch (RuntimeException e) {
                // Bad request: reply with an empty body instead of stopping the server.
                System.out.println("RPCServer failed to process request: " + e);
            }

            channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    } finally {
        // Reached only when the loop is left by an exception.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午2:57:32 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行多重绑定 for (String severity : routingKeys2) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, severity); System.out.println("ReceiveLogsDirect2 exchange:" + EXCHANGE_NAME_ROUTING + ", queue:" + queueName + ", BindRoutingKey:" + severity); } System.out.println("ReceiveLogsDirect2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:53:18 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行绑定 for (String routingKey : routingKeys1) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, routingKey); System.out.println("ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME_ROUTING + "," + " queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsDirect1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月11日 下午5:21:46 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = null; Channel channel = null; try { // 创建一个新的连接 connection = factory.newConnection(); // 创建一个通道 channel = connection.createChannel(); // 声明一个队列 // queueDeclare第一个参数表示队列名称 //第二个参数为是否持久化(true表示是,队列将在服务器重启时生存) //第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除) //第四个参数为当所有消费者客户端连接断开时是否自动删除队列 //第五个参数为队列的其他参数 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "{\"temperature\":100}"; // 发送消息到队列中 //basicPublish第一个参数为交换机名称 //第二个参数为队列映射的路由key //第三个参数为消息的其他属性 //第四个参数为发送信息的主体 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("Producer Send +'" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭通道和连接 channel.close(); connection.close(); } }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月11日 下午5:32:45 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ地址 factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); // 创建一个新的连接 Connection connection = factory.newConnection(); // 创建一个通道 Channel channel = connection.createChannel(); // 声明要关注的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Customer Waiting Received messages"); // DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery Consumer consumer = new DefaultConsumer(channel) { //envelope主要存放生产者相关信息(比如交换机、路由key等) //body是消息实体 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Customer Received '" + message + "'"); } }; // 自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(QUEUE_NAME, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:40:52 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 产生一个随机的队列名称 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");// 对队列进行绑定 System.out.println("ReceiveLogs1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogs1 Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer);// 队列会自动删除 }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午3:03:24 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); connection = factory.newConnection(); channel = connection.createChannel(); // 声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); // 待发送的消息 String[] routingKeys = new String[] { "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit" }; // 发送消息 for (String severity : routingKeys) { String message = "From " + severity + " routingKey' s message!"; channel.basicPublish(EXCHANGE_NAME_TOPIC, severity, null, message.getBytes()); System.out.println("TopicSend Sent '" + severity + "':'" + message + "'"); } } catch (Exception e) { e.printStackTrace(); if (connection != null) { channel.close(); connection.close(); } } finally { if (connection != null) { channel.close(); connection.close(); } } }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午3:08:40 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换器 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; // 绑定路由关键字 for (String bindingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, bindingKey); System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME_TOPIC+", queue:"+queueName+", BindRoutingKey:" + bindingKey); } System.out.println("ReceiveLogsTopic2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午3:06:20 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[] { "*.orange.*" }; // 绑定路由 for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, routingKey); System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME_TOPIC + ", queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsTopic1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
private void initChannel(Channel channel) throws IOException { channel.basicQos(1); // this.channel.exchangeDeclare(this.exchange, TOPIC); Map<String, Object> args = new HashMap<>(); args.put("x-expires", 180000); // Three minutes channel.queueDeclare(QUEUE_NAME, true, true, true, args); channel.queueBind(QUEUE_NAME, NOVA_EXCHANGE, ROUTING_KEY); channel.queueBind(QUEUE_NAME, NEUTRON_EXCHANGE, ROUTING_KEY); channel.queueBind(QUEUE_NAME, KEYSTONE_EXCHANGE, ROUTING_KEY); channel.queueBind(this.queue, this.exchange, this.routingKey); }
/**
 * Sends the text "Hello EIP!" to the queue {@code QUEUE_NAME} on the broker at
 * {@code localhost}, using the {@code Rabbit} helper to create the channel and queue.
 *
 * <p>The {@code Rabbit} instance is closed even when queue setup or sending fails.
 *
 * @param argv unused
 * @throws java.io.IOException if a broker operation fails
 * @throws TimeoutException    if the broker does not respond in time
 */
public static void main(String[] argv) throws java.io.IOException, TimeoutException {
    Rabbit rabbit = new Rabbit("localhost");
    try {
        Channel channel = rabbit.createChannel();
        rabbit.makeQueue(channel, QUEUE_NAME);
        sendMessage(channel, QUEUE_NAME, "Hello EIP!");
        channel.close();
    } finally {
        rabbit.close();
    }
}
/**
 * Declares the direct exchange {@code EXCHANGE_NAME} and sends two messages to it, one
 * with routing key "Widget" and one with routing key "Gadget".
 *
 * <p>The {@code Rabbit} instance is closed even when declaring or sending fails.
 *
 * @param argv unused
 * @throws java.io.IOException if a broker operation fails
 * @throws TimeoutException    if the broker does not respond in time
 */
public static void main(String[] argv) throws java.io.IOException, TimeoutException {
    Rabbit rabbit = new Rabbit("localhost");
    try {
        Channel channel = rabbit.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        sendMessage(channel, EXCHANGE_NAME, "Widget", "Hello EIP Widget!");
        sendMessage(channel, EXCHANGE_NAME, "Gadget", "Hello EIP Gadget!");
        channel.close();
    } finally {
        rabbit.close();
    }
}
/**
 * Closes the given channel on a best-effort basis; failures are logged, never thrown.
 *
 * <p>A {@code null} or already-closed channel is ignored silently instead of being
 * reported as a close error.
 *
 * @param channel the channel to close; may be {@code null}
 */
public void closeChannel(Channel channel) {
    try {
        if (channel != null && channel.isOpen()) {
            channel.close();
        }
    } catch (Throwable e) {
        log.error("RabbitMq channel close error.", e);
    }
}
/**
 * REST endpoint ({@code POST rabbitmqRecv}) that starts an auto-ack consumer on
 * {@code QUEUE_NAME} of the broker at 127.0.0.1; each delivery's headers and UTF-8 body
 * are printed.
 *
 * <p>NOTE(review): every call opens a new connection and registers a new consumer, and
 * nothing in this method closes them — confirm that is intended.
 *
 * @throws Exception if connecting, declaring the queue or subscribing fails
 */
@POST
@Path("rabbitmqRecv")
public void send() throws Exception {
    final ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");

    final Channel channel = connectionFactory.newConnection().createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            System.out.println(properties.getHeaders());
            final String text = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + text + "'");
        }
    });
}