/**
 * Consumes from the sender's queue on a short-lived channel and hands back a message body.
 *
 * <p>Waits at most five seconds for a delivery. The channel is always closed before returning.
 *
 * @return the most recently captured delivery body, or {@code null} if nothing arrived in time
 * @throws Exception if creating, consuming from or closing the channel fails, or the wait is interrupted
 */
private byte[] readMessage() throws Exception {
    final AtomicReference<byte[]> received = new AtomicReference<>();
    final CountDownLatch delivered = new CountDownLatch(1);
    final Channel consumeChannel = sender.get().createChannel();
    try {
        final String queue = sender.queue();
        // Auto-ack consumer: remember the body and release the waiting caller.
        DefaultConsumer bodyCapturer = new DefaultConsumer(consumeChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                received.set(body);
                delivered.countDown();
            }
        };
        consumeChannel.basicConsume(queue, true, bodyCapturer);
        // The outcome of the wait is not inspected; a timeout simply yields null below.
        delivered.await(5, TimeUnit.SECONDS);
    } finally {
        consumeChannel.close();
    }
    return received.get();
}
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午2:57:32 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行多重绑定 for (String severity : routingKeys2) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, severity); System.out.println("ReceiveLogsDirect2 exchange:" + EXCHANGE_NAME_ROUTING + ", queue:" + queueName + ", BindRoutingKey:" + severity); } System.out.println("ReceiveLogsDirect2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:53:18 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行绑定 for (String routingKey : routingKeys1) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, routingKey); System.out.println("ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME_ROUTING + "," + " queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsDirect1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月11日 下午5:32:45 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ地址 factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); // 创建一个新的连接 Connection connection = factory.newConnection(); // 创建一个通道 Channel channel = connection.createChannel(); // 声明要关注的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Customer Waiting Received messages"); // DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery Consumer consumer = new DefaultConsumer(channel) { //envelope主要存放生产者相关信息(比如交换机、路由key等) //body是消息实体 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Customer Received '" + message + "'"); } }; // 自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(QUEUE_NAME, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:40:52 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 产生一个随机的队列名称 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");// 对队列进行绑定 System.out.println("ReceiveLogs1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogs1 Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer);// 队列会自动删除 }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午3:08:40 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换器 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; // 绑定路由关键字 for (String bindingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, bindingKey); System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME_TOPIC+", queue:"+queueName+", BindRoutingKey:" + bindingKey); } System.out.println("ReceiveLogsTopic2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午3:06:20 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[] { "*.orange.*" }; // 绑定路由 for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, routingKey); System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME_TOPIC + ", queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsTopic1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/**
 * Declares the four per-shard queues ({@code send}, {@code receive}, {@code available},
 * {@code unavailable}) and registers the consumer for {@code shard-<id>-receive}.
 *
 * <p>Each received body is parsed as JSON. A {@code gateway-ping-update} payload only updates the
 * JDA ping; anything else is forwarded to the websocket client, preceded by a
 * {@link RawGatewayEvent} when raw events are enabled. Deliveries are ignored while no client is set.
 *
 * <p>Subclasses that want to change how messages are sent/received can override this method to
 * disable the default implementation.
 *
 * @throws IOException if there's an error on {@link Channel#queueDeclare(String, boolean, boolean, boolean, Map)}
 *         or {@link Channel#basicConsume(String, boolean, Consumer)}.
 */
protected void configure() throws IOException {
    channel.queueDeclare("shard-" + shardId + "-send", true, false, false, null);
    channel.queueDeclare("shard-" + shardId + "-receive", true, false, false, null);
    channel.queueDeclare("shard-" + shardId + "-available", true, false, false, null);
    channel.queueDeclare("shard-" + shardId + "-unavailable", true, false, false, null);

    final DefaultConsumer gatewayConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            final ClientWebSocketClient socketClient = client.get();
            if (socketClient != null) {
                final JSONObject payload = new JSONObject(new String(body, StandardCharsets.UTF_8));
                final boolean pingUpdate = payload.has("t") && payload.getString("t").equals("gateway-ping-update");
                if (pingUpdate) {
                    socketClient.getJDA().setPing(payload.getJSONObject("d").getLong("ping"));
                } else {
                    if (enableRawGatewayEvent) {
                        // The raw event gets its own copy of the payload.
                        final JSONObject copy = new JSONObject(payload.toString());
                        socketClient.getJDA().getEventManager().handle(new RawGatewayEvent(socketClient.getJDA(), copy));
                    }
                    socketClient.handleEvent(payload);
                }
            }
        }
    };
    channel.basicConsume("shard-" + shardId + "-receive", true, gatewayConsumer);
}
/** * 接受消息 */ public void receiveQuickstartMessage() throws IOException, TimeoutException { RabbitMQChannel channel = new RabbitMQChannel().channel(); AMQP.Queue.DeclareOk declareOk = channel.getChannel().queueDeclare(QUICKSTART_QUEUE_NAME, false, false, false, null); System.out.println("等待接受队列【" + QUICKSTART_QUEUE_NAME + "】消息"); //建立一个消费者 监听消息的接受 Consumer consumer = new DefaultConsumer(channel.getChannel()) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接受消息:" + message); } }; channel.getChannel().basicConsume(QUICKSTART_QUEUE_NAME, true, consumer); //channel.close(); }
/**
 * Calls the RPC interface: publishes {@code message} to {@code RPC_QUEUE_NAME} with a fresh
 * correlation id and an anonymous reply queue, then prints the reply whose correlation id matches.
 *
 * @param message message content; must not be null
 * @throws IOException propagated from the RabbitMQ client calls
 * @throws TimeoutException declared for the channel set-up
 */
public void client(String message) throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    String replyQueueName = channel.getChannel().queueDeclare().getQueue();
    String corrId = UUID.randomUUID().toString();
    AMQP.BasicProperties props = new AMQP.BasicProperties
            .Builder()
            .correlationId(corrId)
            .replyTo(replyQueueName)
            .build();
    System.out.println("rpc客户端发送消息:" + message);
    channel.getChannel().basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));
    channel.getChannel().basicConsume(replyQueueName, true, new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            // Compare from the known non-null side: a reply published without a correlation id
            // must be ignored, not crash the consumer with a NullPointerException.
            if (corrId.equals(properties.getCorrelationId())) {
                System.out.println("rpc客户端收到结果:" + new String(body, "UTF-8") + "\n");
            }
        }
    });
}
/**
 * Receives messages: binds an anonymous queue to the fanout exchange {@code EXCHANGE_NAME} and
 * prints every message delivered to it.
 *
 * @throws IOException propagated from the RabbitMQ client calls
 * @throws TimeoutException declared for the channel set-up
 */
public void receivePubsubMessage() throws IOException, TimeoutException {
    final RabbitMQChannel mq = new RabbitMQChannel().channel();
    mq.getChannel().exchangeDeclare(EXCHANGE_NAME, "fanout");

    final String subscriberQueue = mq.getChannel().queueDeclare().getQueue();
    mq.getChannel().queueBind(subscriberQueue, EXCHANGE_NAME, "");

    System.out.println("等待接受pusub订阅【" + EXCHANGE_NAME + "】消息");
    System.out.println("选择队列:"+subscriberQueue);

    // Auto-ack consumer that prints each payload.
    mq.getChannel().basicConsume(subscriberQueue, true, new DefaultConsumer(mq.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            final String text = new String(body, "UTF-8");
            System.out.println("接受消息:" + text + "'");
        }
    });
}
/**
 * Receives topic messages: binds an anonymous queue to the topic exchange {@code TOPIC} with the
 * literal binding key {@code "bindingKey"} and prints every delivery with its routing key.
 *
 * @throws IOException propagated from the RabbitMQ client calls
 * @throws TimeoutException declared for the channel set-up
 */
public void receiveTopicMessage() throws IOException, TimeoutException {
    final RabbitMQChannel mq = new RabbitMQChannel().channel();
    mq.getChannel().exchangeDeclare(TOPIC, "topic");

    final String topicQueue = mq.getChannel().queueDeclare().getQueue();
    mq.getChannel().queueBind(topicQueue, TOPIC, "bindingKey");

    System.out.println("等待接受topic主题【" + TOPIC + "】消息");
    System.out.println("选择队列:" + topicQueue);

    // Auto-ack consumer that prints routing key and payload.
    mq.getChannel().basicConsume(topicQueue, true, new DefaultConsumer(mq.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            final String text = new String(body, "UTF-8");
            System.out.println("接受消息:" + envelope.getRoutingKey() + "':'" + text + "'");
        }
    });
}
/**
 * Handles {@code POST rabbitmqRecv}: connects to the broker on 127.0.0.1 as guest/guest, declares
 * {@code QUEUE_NAME} and registers an auto-ack consumer that prints the headers and text of each
 * message.
 *
 * <p>NOTE(review): every invocation opens a new connection and channel, and nothing in this
 * method closes them. Confirm whether that is intended.
 *
 * @throws Exception propagated from the RabbitMQ client calls
 */
@POST
@Path("rabbitmqRecv")
public void send() throws Exception {
    final ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setHost("127.0.0.1");

    final Channel receiveChannel = connectionFactory.newConnection().createChannel();
    receiveChannel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    receiveChannel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(receiveChannel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            System.out.println(properties.getHeaders());
            final String text = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + text + "'");
        }
    });
}
/**
 * Creates the gateway and wires up the ping echo: every body received on
 * {@code shard-<id>-getping} is published unchanged to {@code shard-<id>-getping-response}.
 *
 * @param shardId shard id used to build the queue names
 * @param channel channel on which the queues are declared, consumed and published
 * @throws IOException if declaring the queues or registering the consumer fails
 */
GabrielGateway(int shardId, Channel channel) throws IOException {
    super(shardId, channel);

    final String pingQueue = "shard-" + shardId + "-getping";
    final String pingResponseQueue = "shard-" + shardId + "-getping-response";
    channel.queueDeclare(pingQueue, false, false, false, null);
    channel.queueDeclare(pingResponseQueue, false, false, false, null);

    final DefaultConsumer echo = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            // Send the payload straight back through the default exchange.
            channel.basicPublish("", pingResponseQueue, null, body);
        }
    };
    channel.basicConsume(pingQueue, true, echo);
}
/**
 * Starts tracking gateway information published on the {@code gateway-info} queue.
 *
 * <p>{@code current} starts as a placeholder ("unknown" / -1 values) and is replaced each time a
 * well-formed JSON message is received. Malformed messages are logged and leave the previous
 * value in place.
 *
 * @param channel channel on which the queue is declared and consumed
 * @throws IllegalStateException if this method has already been called
 * @throws IOException if declaring the queue or registering the consumer fails
 * @throws TimeoutException declared for callers; not thrown by the visible code
 */
public static synchronized void init(Channel channel) throws IOException, TimeoutException {
    if(current != null) {
        throw new IllegalStateException("Already started");
    }
    current = new GatewayInfo("unknown", "unknown", -1, -1, -1, -1, -1, -1, -1);
    channel.queueDeclare("gateway-info", false, false, false, null);
    channel.basicConsume("gateway-info", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            try {
                // Parsing and the "ram" lookup can throw JSONException too, so they belong inside
                // the try: one bad message must not escape the consumer callback.
                JSONObject object = new JSONObject(new String(body, StandardCharsets.UTF_8));
                JSONObject ram = object.getJSONObject("ram");
                current = new GatewayInfo(
                        object.getString("version"),
                        object.getString("jda-version"),
                        object.getDouble("cpu-usage"),
                        object.getInt("thread-count"),
                        object.getLong("uptime"),
                        ram.getLong("used"),
                        ram.getLong("free"),
                        ram.getLong("total"),
                        ram.getLong("max")
                );
            } catch(JSONException e) {
                // Pass the exception along so the stack trace is not lost.
                GabrielBot.LOGGER.error("Error creating GatewayInfo: " + e.getMessage(), e);
            }
        }
    });
}
/**
 * Creates the client, declares the {@code getping} / {@code getping-response} queues for the shard
 * and keeps {@code ping} up to date.
 *
 * <p>Each response body is decoded as a {@code long} and subtracted from the current time
 * (presumably the timestamp sent by {@code calculatePing}; confirm against that method).
 * A ping is calculated immediately and then every 30 seconds.
 *
 * @param shardId shard id used to build the queue names
 * @param channel channel on which the queues are declared and consumed
 * @throws IOException if declaring the queues or registering the consumer fails
 */
public GabrielGatewayClient(int shardId, Channel channel) throws IOException {
    super(shardId, channel, true);

    final String pingQueue = "shard-" + shardId + "-getping";
    final String pingResponseQueue = "shard-" + shardId + "-getping-response";
    channel.queueDeclare(pingQueue, false, false, false, null);
    channel.queueDeclare(pingResponseQueue, false, false, false, null);

    final DefaultConsumer pingResponseConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            // Operands evaluate left to right: the clock is read before the body is decoded.
            ping = System.currentTimeMillis() - Longs.fromByteArray(body);
        }
    };
    channel.basicConsume(pingResponseQueue, true, pingResponseConsumer);

    calculatePing();
    PING_CALCULATOR.scheduleAtFixedRate(this::calculatePing, 30, 30, TimeUnit.SECONDS);
}
protected void connect() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); channel = connection.createChannel(); String queueName = "flowing-retail-" + name; channel.queueDeclare(queueName, true, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); // publish/subscribe model channel.queueBind(queueName, EXCHANGE_NAME, "*"); System.out.println(" [*] Waiting for messages."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); eventHandler.handleEvent(message); } }; channel.basicConsume(queueName, true, consumer); }
/**
 * Round-trips one message through the {@code testing} queue using a channel obtained from the
 * injected connection factory.
 *
 * <p>The consumer only records what it received. All assertions run on the test thread, so a
 * wrong body fails the test directly instead of dying on the consumer thread. The channel is
 * closed even when an assertion fails.
 *
 * @throws Exception if any broker operation fails or the wait is interrupted
 */
@Test
public void testConnectionFactory() throws Exception {
    Assert.assertNotNull(connectionFactory1);
    Assert.assertNotNull(queue);
    RabbitmqConnection connection = connectionFactory1.getConnection();
    Assert.assertNotNull(connection);
    String queueName = "testing";
    String message = "Hello World!";
    Channel channel = connection.createChannel();
    try {
        channel.queueDeclare(queueName, false, false, false, null);
        final CountDownLatch counter = new CountDownLatch(1);
        final java.util.concurrent.atomic.AtomicReference<String> received =
                new java.util.concurrent.atomic.AtomicReference<String>();
        Consumer consume = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // Explicit charset so the round trip does not depend on the platform default.
                received.set(new String(body, java.nio.charset.StandardCharsets.UTF_8));
                counter.countDown();
            }
        };
        channel.basicConsume(queueName, true, consume);
        channel.basicPublish("", queueName, null, message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        Assert.assertTrue("no delivery within 10 seconds", counter.await(10, TimeUnit.SECONDS));
        Assert.assertEquals(message, received.get());
    } finally {
        channel.close();
    }
}
public void setup() throws Exception { if (log.isTraceEnabled()) { log.trace("setup()"); } this.consumer = new DefaultConsumer(this.channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { MessageEndpoint localEndpoint; try { localEndpoint = endpointFactory.createEndpoint(null); RabbitmqBytesMessage m = new RabbitmqBytesMessage(consumerTag,envelope,properties,body); onMessage(localEndpoint, m); } catch (UnavailableException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; if ("javax.jms.Queue".equals(this.spec.getDestinationType())) { RabbitmqAdminQueueImpl queue = Util.lookup(new InitialContext(), this.spec.getDestination(), RabbitmqAdminQueueImpl.class); this.channel.basicConsume(queue.getDestinationAddress(),true, consumer); } }
/**
 * Builds a {@link RabbitMQSender} for the {@code zipkin-jmh} queue on {@code localhost:5672}.
 *
 * <p>After a successful health check the queue is deleted and re-declared, and a background thread
 * registers a no-op auto-ack consumer on it.
 *
 * @return the configured sender
 * @throws AssumptionViolatedException if the sender's health check fails
 * @throws Exception propagated from the broker calls or the sleep
 */
@Override
protected Sender createSender() throws Exception {
    final RabbitMQSender rabbitSender = RabbitMQSender.newBuilder()
        .queue("zipkin-jmh")
        .addresses("localhost:5672")
        .build();

    final CheckResult health = rabbitSender.check();
    if (!health.ok()) {
        throw new AssumptionViolatedException(health.error().getMessage(), health.error());
    }

    channel = rabbitSender.get().createChannel();
    // Start from a fresh queue.
    channel.queueDelete(rabbitSender.queue());
    channel.queueDeclare(rabbitSender.queue(), false, true, true, null);

    Thread.sleep(500L);

    final Runnable drainQueue = new Runnable() {
        @Override
        public void run() {
            try {
                channel.basicConsume(rabbitSender.queue(), true, new DefaultConsumer(channel));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    };
    new Thread(drainQueue).start();

    return rabbitSender;
}
/**
 * Registers a manually-acknowledging consumer on {@code queueName} and stores its consumer tag.
 *
 * <p>Each delivery is decoded as UTF-8 JSON into a {@link CoordinationEntryEvent}, routed to the
 * listeners and then acknowledged.
 *
 * @throws InterruptedException declared for callers; not thrown by the visible code
 * @throws IOException if registering the consumer fails
 */
@Override
public void beginSubscriptionThread() throws InterruptedException, IOException {
    // One Gson instance is enough for all deliveries.
    final Gson gson = new Gson();
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            long deliveryTag = envelope.getDeliveryTag();
            // Decode once, with an explicit charset, rather than twice with the platform default.
            final String payload = new String(body, java.nio.charset.StandardCharsets.UTF_8);
            LOG.info("[RabbitMQEventConsumer {}] Received Message: {}", Thread.currentThread().getId(), payload);
            @SuppressWarnings("unchecked")
            final CoordinationEntryEvent<K> event = gson.fromJson(payload, CoordinationEntryEvent.class);
            routeEventToListeners(event);
            // Acknowledge only after the listeners have been invoked.
            channel.basicAck(deliveryTag, false);
        }
    };
    consumerTag = channel.basicConsume(queueName, false, consumer);
}
/**
 * Subscribes {@code handler} to the Metric Streaming Message Bus.
 *
 * <p>Opens a new channel, binds an anonymous queue to {@code MSMB_EXCHANGE} with
 * {@code routingKey} and delivers every message, converted to {@link MsmbMessage}, to the handler.
 *
 * @param routingKey binding key for the subscription
 * @param handler callback invoked for each message
 * @return the channel carrying the subscription
 * @throws SDKMessageBusException if the subscription cannot be set up
 */
public Channel addMsmbHandler(String routingKey, final MsmbMessageHandler handler) {
    Channel channel = null;
    try {
        channel = this.connection.createChannel();

        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, MSMB_EXCHANGE, routingKey);
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                MsmbMessage msmbMessage = MessageBusClient.this.adaptor.buildResource(message, MsmbMessage.class);
                handler.handleMessage(msmbMessage);
            }
        });
    } catch (IOException e) {
        // Do not leak a half-configured channel: the caller never receives a reference to it.
        if (channel != null) {
            try {
                channel.abort();
            } catch (IOException ignored) {
                // Best-effort cleanup; the original failure is the one reported below.
            }
        }
        throw new SDKMessageBusException(SDKErrorEnum.messageBusConnectionError,
                "Could not subscribe to Metric Streaming Message Bus", e);
    }
    return channel;
}
/**
 * Records the consumer invocation on the span: API descriptor, routing key, remote broker address
 * and, if present, the thrown exception.
 *
 * <p>NOTE(review): the previous version also computed the envelope's exchange (defaulting to
 * "unknown") and cast the properties and body arguments, but never used any of them. That dead
 * code is removed. If the exchange was meant to be recorded, add it here.
 *
 * @param recorder span recorder to write to
 * @param target the intercepted {@link DefaultConsumer}
 * @param args arguments of the intercepted call; index 1 is the {@link Envelope}
 * @param result return value of the intercepted call (unused)
 * @param throwable exception thrown by the intercepted call, or {@code null}
 */
@Override
protected void doInAfterTrace(SpanRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
    DefaultConsumer consumer = (DefaultConsumer) target;
    Connection connection = consumer.getChannel().getConnection();
    Envelope envelope = (Envelope) args[1];

    recorder.recordApi(methodDescriptor);
    recorder.recordAttribute(RabbitMQConstants.RABBITMQ_ROUTINGKEY_ANNOTATION_KEY, envelope.getRoutingKey());
    recorder.recordRemoteAddress(connection.getAddress().getHostAddress() + ":" + connection.getPort());

    if (throwable != null) {
        recorder.recordException(throwable);
    }
}
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月11日 下午5:55:38 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("Worker1 Waiting for messages"); // 每次从队列获取的数量 //channel.basicQos(1);保证一次只分发一个 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Worker1 Received '" + message + "'"); try { //throw new Exception(); doWork(message); } catch (Exception e) { channel.abort(); } finally { System.out.println("Worker1 Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; //autoAck是否自动回复, //如果为true的话,每次生产者只要发送信息就会从内存中删除, //那么如果消费者程序异常退出,那么就无法获取数据, //我们当然是不希望出现这样的情况,所以才去手动回复, //每当消费者收到并处理信息然后在通知生成者。 //最后从队列中删除这条信息。 //如果消费者异常退出,如果还有其他消费者, //那么就会把队列中的消息发送给其他消费者, //如果没有,等消费者启动时候再次发送。 boolean autoAck = false; // 消息消费完成确认 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月11日 下午5:55:38 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("Worker2 Waiting for messages"); // 每次从队列获取的数量 //channel.basicQos(1);保证一次只分发一个 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Worker2 Received '" + message + "'"); try { //throw new Exception(); doWork(message); } catch (Exception e) { channel.abort(); } finally { System.out.println("Worker2 Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; //autoAck是否自动回复, //如果为true的话,每次生产者只要发送信息就会从内存中删除, //那么如果消费者程序异常退出,那么就无法获取数据, //我们当然是不希望出现这样的情况,所以才去手动回复, //每当消费者收到并处理信息然后在通知生成者。 //最后从队列中删除这条信息。 //如果消费者异常退出,如果还有其他消费者, //那么就会把队列中的消息发送给其他消费者, //如果没有,等消费者启动时候再次发送。 boolean autoAck = false; // 消息消费完成确认 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); }
/**
 * Exchanges a message with another service: publishes {@code data} to the configured exchange with
 * a fresh correlation id and blocks until a reply is available on the response queue.
 *
 * <p>The reply consumer registered for this call is cancelled before returning, so repeated calls
 * do not accumulate consumers on the reply queue.
 *
 * @param data message payload to publish
 * @param routingKey routing key used for the publish
 * @return the reply decoded with {@code ENCODE}, or {@code null} if the exchange failed or the
 *         wait was interrupted
 */
public String rabbitRPCRoutingKeyExchange(byte[] data, String routingKey){
    this.corrId = UUID.randomUUID().toString();
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .correlationId(this.corrId)
            .replyTo(replyQueueName)
            .build();
    String replyConsumerTag = null;
    try {
        channel.basicPublish(this.exchange, routingKey, props, data);
        replyConsumerTag = channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                // Compare from the non-null side so a reply without a correlation id is ignored
                // and does not throw.
                if (corrId.equals(properties.getCorrelationId())) {
                    boolean b = response.offer(new String(body, ENCODE));
                    Log
                            .forContext("responseStatus",b)
                            .forContext("Service", "web-service")
                            .information("rabbit message handled status ");
                }
            }
        });
        return response.take();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
        }
        Log
                .forContext("MemberName", "rabbitRPCRoutingKeyExchange")
                .forContext("Service", "web-service")
                .error(e,"Exception");
    } finally {
        if (replyConsumerTag != null) {
            try {
                channel.basicCancel(replyConsumerTag);
            } catch (Exception e) {
                Log
                        .forContext("MemberName", "rabbitRPCRoutingKeyExchange")
                        .forContext("Service", "web-service")
                        .error(e,"Failed to cancel reply consumer");
            }
        }
    }
    return null;
}
/**
 * Starts the RPC server: consumes {@code RPC_QUEUE_NAME} with manual acknowledgements and a
 * prefetch of one. Each request is answered on its reply-to queue with the request text followed
 * by a random UUID.
 *
 * @throws IOException propagated from the RabbitMQ client calls
 * @throws TimeoutException declared for the channel set-up
 */
public void service() throws IOException, TimeoutException {
    final RabbitMQChannel mq = new RabbitMQChannel().channel();
    mq.getChannel().queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    mq.getChannel().basicQos(1);
    System.out.println("等待rpc客户端连接...");

    final Consumer requestHandler = new DefaultConsumer(mq.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            // Echo the correlation id so the client can match the reply to its request.
            final AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                    .correlationId(properties.getCorrelationId())
                    .build();

            String reply = "";
            try {
                final String request = new String(body, "UTF-8");
                System.out.println("服务端接受到消息:" + request);
                reply = request + UUID.randomUUID().toString();
            } catch (RuntimeException e) {
                e.printStackTrace();
            } finally {
                // Always answer and acknowledge, even when building the reply failed.
                mq.getChannel().basicPublish("", properties.getReplyTo(), replyProps, reply.getBytes("UTF-8"));
                mq.getChannel().basicAck(envelope.getDeliveryTag(), false);
                System.out.println("服务端将处理结果:" + reply + ",返回客户单\n");
            }
        }
    };
    mq.getChannel().basicConsume(RPC_QUEUE_NAME, false, requestHandler);
}
/** * 接受工作队列消息 */ public void receiveWorkQueueMessage() throws IOException, TimeoutException { RabbitMQChannel channel = new RabbitMQChannel().channel(); channel.getChannel().queueDeclare(WORK_QUEUE_NAME, true, false, false, null); channel.getChannel().basicQos(1); System.out.println("等待接受workQueue队列【"+WORK_QUEUE_NAME+"】消息"); //建立一个消费者 监听消息的接受 DefaultConsumer consumer = new DefaultConsumer(channel.getChannel()) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接受workQueue消息:" + message); try { for (char ch : message.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } finally { getChannel().basicAck(envelope.getDeliveryTag(), false); } } }; channel.getChannel().basicConsume(WORK_QUEUE_NAME, false, consumer); }
/**
 * Builds a consumer that pushes every delivered message body into {@code incomingMessagesSubject}.
 *
 * @param channel channel the consumer is attached to
 * @return the new consumer
 */
private Consumer createConsumer(Channel channel) {
    return new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            incomingMessagesSubject.onNext(body);
        }
    };
}
/**
 * Registers an auto-ack consumer on {@code topic} that passes each message body to
 * {@code processByteMessage}.
 *
 * @throws IOException if registering the consumer fails
 */
protected void init() throws IOException {
    channel.basicConsume(topic, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            processByteMessage(body);
        }
    });
}
/**
 * Builds a consumer that verifies exported CSV rows.
 *
 * <p>Each body is tokenized as UTF-8 CSV. Rows with exactly 29 columns are verified, counted and
 * acknowledged, with a progress line printed every {@code VALIDATION_REPORT_INTERVAL} rows.
 *
 * <p>NOTE(review): rows with any other column count return early without being acknowledged.
 * Confirm that leaving them unacked is intended.
 *
 * @param channel channel used to acknowledge deliveries
 * @return the new consumer
 */
private Consumer createConsumer(final Channel channel) {
    return new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            final long tag = envelope.getDeliveryTag();
            final String[] columns =
                    ExportOnServerVerifier.RoughCSVTokenizer.tokenize(new String(body, Charsets.UTF_8));

            if (columns.length == 29) {
                ExportOnServerVerifier.ValidationErr failure = null;
                try {
                    failure = ExportOnServerVerifier.verifyRow(columns);
                } catch (ExportOnServerVerifier.ValidationErr thrown) {
                    thrown.printStackTrace();
                }
                if (failure != null) {
                    System.out.println("ERROR in validation: " + failure.toString());
                }

                m_verifiedRows++;
                if (m_verifiedRows % VALIDATION_REPORT_INTERVAL == 0) {
                    System.out.println("Verified " + m_verifiedRows + " rows.");
                }

                channel.basicAck(tag, false);
            }
        }
    };
}
/**
 * Waits for a single message on {@code queue}.
 *
 * <p>A temporary auto-ack consumer is registered for the duration of the wait and cancelled before
 * returning, so repeated calls do not leave stale consumers on the channel.
 *
 * @param queue name of the queue to consume from
 * @param timeoutInSeconds maximum time to wait for a message
 * @return the first message received, or {@code null} if none arrived within the timeout
 */
@SneakyThrows({IOException.class, InterruptedException.class})
public MessageWrapper receiveMessage(String queue, int timeoutInSeconds) {
    Channel channel = connectionService.getChannel();
    BlockingQueue<MessageWrapper> result = new ArrayBlockingQueue<MessageWrapper>(1);
    String consumerTag = channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String response = new String(body);
            MessageWrapper messageWrapper = new MessageWrapper(response, properties);
            // offer, not add: only the first message is wanted, and a second delivery racing with
            // the cancel must not throw IllegalStateException inside the consumer callback.
            result.offer(messageWrapper);
        }
    });
    try {
        return result.poll(timeoutInSeconds, TimeUnit.SECONDS);
    } finally {
        channel.basicCancel(consumerTag);
    }
}
/**
 * Subscribes a typed {@code handler} to the State-Changed Message Bus.
 *
 * <p>Opens a new channel, binds an anonymous queue to {@code SCMB_EXCHANGE} with
 * {@code routingKey} and delivers every message, converted to the handler's declared type, to the
 * handler.
 *
 * @param <T> resource type carried by the messages
 * @param routingKey binding key for the subscription
 * @param handler callback invoked for each message; also supplies the target type token
 * @return the channel carrying the subscription
 * @throws SDKMessageBusException if the subscription cannot be set up
 */
public <T extends BaseModelResource> Channel addScmbTypedHandler(String routingKey,
        final ScmbTypedMessageHandler<T> handler) {
    Channel channel = null;
    try {
        channel = this.connection.createChannel();

        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, SCMB_EXCHANGE, routingKey);
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @SuppressWarnings("unchecked")
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                ScmbTypedMessage<T> scmbMessage = (ScmbTypedMessage<T>) MessageBusClient.this.adaptor.buildResource(
                        message, handler.typeToken().getType());
                handler.handleMessage(scmbMessage);
            }
        });
    } catch (IOException e) {
        // Do not leak a half-configured channel: the caller never receives a reference to it.
        if (channel != null) {
            try {
                channel.abort();
            } catch (IOException ignored) {
                // Best-effort cleanup; the original failure is the one reported below.
            }
        }
        throw new SDKMessageBusException(SDKErrorEnum.messageBusConnectionError,
                "Could not subscribe to State-Changed Message Bus", e);
    }
    return channel;
}
/**
 * Subscribes {@code handler} to the State-Changed Message Bus.
 *
 * <p>Opens a new channel, binds an anonymous queue to {@code SCMB_EXCHANGE} with
 * {@code routingKey} and delivers every message, converted to {@link ScmbMessage}, to the handler.
 *
 * @param routingKey binding key for the subscription
 * @param handler callback invoked for each message
 * @return the channel carrying the subscription
 * @throws SDKMessageBusException if the subscription cannot be set up
 */
public Channel addScmbHandler(String routingKey, final ScmbMessageHandler handler) {
    Channel channel = null;
    try {
        channel = this.connection.createChannel();

        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, SCMB_EXCHANGE, routingKey);
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            // No unchecked cast happens here, so no @SuppressWarnings is needed.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                ScmbMessage scmbMessage = MessageBusClient.this.adaptor.buildResource(message, ScmbMessage.class);
                handler.handleMessage(scmbMessage);
            }
        });
    } catch (IOException e) {
        // Do not leak a half-configured channel: the caller never receives a reference to it.
        if (channel != null) {
            try {
                channel.abort();
            } catch (IOException ignored) {
                // Best-effort cleanup; the original failure is the one reported below.
            }
        }
        throw new SDKMessageBusException(SDKErrorEnum.messageBusConnectionError,
                "Could not subscribe to State-Changed Message Bus", e);
    }
    return channel;
}
/**
 * Builds a consumer on {@code ch1} that prints every delivered body (decoded as UTF-8) to
 * standard output, prefixed with the given priority label.
 *
 * @param ch1      channel the consumer is associated with
 * @param priority label shown in front of each printed message
 * @return the new consumer; it is not registered with the channel here
 */
private static DefaultConsumer instantiateConsumer(final Channel ch1, final String priority) {
    // The prefix depends only on the (final) priority label, so build it once up front.
    final String prefix = "[" + priority + " priority] Consumed ";
    return new DefaultConsumer(ch1) {
        @Override
        public void handleDelivery(String tag, Envelope env, AMQP.BasicProperties props, byte[] payload)
                throws IOException {
            String text = new String(payload, "UTF-8");
            System.out.println(prefix + text);
        }
    };
}
void subscribe() { subscribeThread = new Thread(new Runnable() { Connection connection = null; @Override public void run() { try { ConnectionFactory factory; factory = new ConnectionFactory(); setupConnectionFactory(factory); connection = factory.newConnection(); Log.d("debug", "connection success"); Channel channel = connection.createChannel(); channel.basicQos(1); AMQP.Queue.DeclareOk q = channel.queueDeclare(); channel.queueBind(q.getQueue(), "logs", ""); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); Log.d("TEST", " [x] Received '" + message + "'"); try { JSONObject jsonObject = new JSONObject(message); //dataList 변수에 vel_raw, vel_max, dis_raw, dis_max 정보가 담기게 된다. //저장한 값을 그래프에 넣어준다. dataList = parseJsonFromDataSet(jsonObject); float[] velRaw = dataList.getVelRaw(); float velMax = dataList.getVelMax(); float[] disRaw = dataList.getDisRaw(); float disMax = dataList.getDisMax(); ArrayList<ChartItem> chartItems = new ArrayList<>(); chartItems.add(new LineChartItem(generateDataLine(velRaw, "vel_raw"), getApplicationContext(),velMax)); chartItems.add(new LineChartItem(generateDataLine(disRaw, "dis_raw"), getApplicationContext(),disMax)); ChartDataAdapter cda = new ChartDataAdapter(getApplicationContext(), chartItems); lv.setAdapter(cda); } catch (IOException | JSONException | ParseException e) { e.printStackTrace(); } } }; channel.basicConsume(q.getQueue(), true, consumer); } catch (Exception e1) { Log.d("", "Connection broken: " + e1.getClass().getName()); try { Thread.sleep(4000); //sleep and then try again } catch (InterruptedException e) { ; } } } }); subscribeThread.start(); }
/**
 * Publishes one message to a durable direct exchange, consumes it with a manually
 * acknowledging consumer, and then verifies that the mock tracer recorded exactly two
 * finished spans and has no active span left.
 *
 * <p>Finished spans are polled once per second for up to ten seconds, since they may be
 * reported slightly after the delivery callback has run.
 */
@Test
public void basicConsume() throws Exception {
    final String exchange = "basicConsumeExchange";
    final String queue = "basicConsumeQueue";
    final String bindingKey = "#";

    channel.exchangeDeclare(exchange, "direct", true);
    channel.queueDeclare(queue, true, false, false, null);
    channel.queueBind(queue, exchange, bindingKey);

    channel.basicPublish(exchange, bindingKey, null, "Hello, world!".getBytes());

    final CountDownLatch delivered = new CountDownLatch(1);
    DefaultConsumer ackingConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String tag, Envelope env, AMQP.BasicProperties props, byte[] payload)
                throws IOException {
            // Acknowledge first, then release the waiting test thread.
            channel.basicAck(env.getDeliveryTag(), false);
            delivered.countDown();
        }
    };
    channel.basicConsume(queue, false, ackingConsumer);
    delivered.await(30, TimeUnit.SECONDS);

    List<MockSpan> spans = mockTracer.finishedSpans();
    for (int remaining = 10; remaining > 0 && spans.size() < 2; remaining--) {
        TimeUnit.SECONDS.sleep(1L);
        spans = mockTracer.finishedSpans();
    }

    assertEquals(2, spans.size());
    checkSpans(spans);
    assertNull(mockTracer.activeSpan());
}
/** * * This method will create a Connection with a channel to the provided server and Will provide a consumer to consume * Notifications sent on the subscribed Queue * * @throws IOException * */ public void connect() throws IOException { ConnectionFactory factory = initConnectionFactory(); while (true) { try { if (this.connection == null || !this.connection.isOpen()) { this.connection = factory.newConnection(); } if (this.channel == null || !this.channel.isOpen()) { this.channel = this.connection.createChannel(); } initChannel(this.channel); final DefaultConsumer consumer = new DefaultConsumer(this.channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); receiveMessage(new String(body)); } }; this.consumerTag = this.channel.basicConsume(this.queue, true, consumer); log.info("Successfully connected to RabbitMQ Server :- " + this.serverIP); return; } catch (Exception e) { // Ignore connection timeout and retry until you connect... log.error("Failed to connect to Rabbit MQ server '" + this.serverIP + "' Error:" + e.getMessage()); generateConnectionFailureAlert(); try { Thread.sleep(RECONNECT_DELAY); } catch (InterruptedException ex) { log.error("RabbitMQ connect was interrupted! '" + this.serverIP + "' Error:" + e.getMessage()); return; } } } }