/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午2:57:32 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行多重绑定 for (String severity : routingKeys2) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, severity); System.out.println("ReceiveLogsDirect2 exchange:" + EXCHANGE_NAME_ROUTING + ", queue:" + queueName + ", BindRoutingKey:" + severity); } System.out.println("ReceiveLogsDirect2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:53:18 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行绑定 for (String routingKey : routingKeys1) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, routingKey); System.out.println("ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME_ROUTING + "," + " queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsDirect1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月11日 下午5:32:45 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ地址 factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); // 创建一个新的连接 Connection connection = factory.newConnection(); // 创建一个通道 Channel channel = connection.createChannel(); // 声明要关注的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Customer Waiting Received messages"); // DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery Consumer consumer = new DefaultConsumer(channel) { //envelope主要存放生产者相关信息(比如交换机、路由key等) //body是消息实体 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Customer Received '" + message + "'"); } }; // 自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(QUEUE_NAME, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:40:52 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 产生一个随机的队列名称 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");// 对队列进行绑定 System.out.println("ReceiveLogs1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogs1 Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer);// 队列会自动删除 }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午3:08:40 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换器 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; // 绑定路由关键字 for (String bindingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, bindingKey); System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME_TOPIC+", queue:"+queueName+", BindRoutingKey:" + bindingKey); } System.out.println("ReceiveLogsTopic2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午3:06:20 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[] { "*.orange.*" }; // 绑定路由 for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, routingKey); System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME_TOPIC + ", queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsTopic1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * 接受消息 */ public void receiveQuickstartMessage() throws IOException, TimeoutException { RabbitMQChannel channel = new RabbitMQChannel().channel(); AMQP.Queue.DeclareOk declareOk = channel.getChannel().queueDeclare(QUICKSTART_QUEUE_NAME, false, false, false, null); System.out.println("等待接受队列【" + QUICKSTART_QUEUE_NAME + "】消息"); //建立一个消费者 监听消息的接受 Consumer consumer = new DefaultConsumer(channel.getChannel()) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接受消息:" + message); } }; channel.getChannel().basicConsume(QUICKSTART_QUEUE_NAME, true, consumer); //channel.close(); }
/**
 * Receives publish/subscribe messages.
 *
 * <p>Declares the fanout exchange {@code EXCHANGE_NAME}, binds a broker-named queue to it, and
 * registers an auto-acknowledging consumer that prints the UTF-8 text of every delivery.
 *
 * @throws IOException if a broker operation fails
 * @throws TimeoutException declared for callers; presumably raised while the channel wrapper
 *         connects (not visible here)
 */
public void receivePubsubMessage() throws IOException, TimeoutException {
    RabbitMQChannel wrapper = new RabbitMQChannel().channel();
    wrapper.getChannel().exchangeDeclare(EXCHANGE_NAME, "fanout");

    String generatedQueue = wrapper.getChannel().queueDeclare().getQueue();
    wrapper.getChannel().queueBind(generatedQueue, EXCHANGE_NAME, "");

    System.out.println("等待接受pusub订阅【" + EXCHANGE_NAME + "】消息");
    System.out.println("选择队列:" + generatedQueue);

    Consumer printer = new DefaultConsumer(wrapper.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String text = new String(body, "UTF-8");
            System.out.println("接受消息:" + text + "'");
        }
    };
    wrapper.getChannel().basicConsume(generatedQueue, true, printer);
}
/**
 * Receives topic messages.
 *
 * <p>Declares the topic exchange {@code TOPIC}, binds a broker-named queue to it with the
 * literal key {@code "bindingKey"}, and registers an auto-acknowledging consumer that prints
 * each delivery with its routing key.
 *
 * @throws IOException if a broker operation fails
 * @throws TimeoutException declared for callers; presumably raised while the channel wrapper
 *         connects (not visible here)
 */
public void receiveTopicMessage() throws IOException, TimeoutException {
    RabbitMQChannel wrapper = new RabbitMQChannel().channel();
    wrapper.getChannel().exchangeDeclare(TOPIC, "topic");

    String generatedQueue = wrapper.getChannel().queueDeclare().getQueue();
    wrapper.getChannel().queueBind(generatedQueue, TOPIC, "bindingKey");

    System.out.println("等待接受topic主题【" + TOPIC + "】消息");
    System.out.println("选择队列:" + generatedQueue);

    Consumer printer = new DefaultConsumer(wrapper.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String text = new String(body, "UTF-8");
            String routingKey = envelope.getRoutingKey();
            System.out.println("接受消息:" + routingKey + "':'" + text + "'");
        }
    };
    wrapper.getChannel().basicConsume(generatedQueue, true, printer);
}
protected void connect() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); channel = connection.createChannel(); String queueName = "flowing-retail-" + name; channel.queueDeclare(queueName, true, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); // publish/subscribe model channel.queueBind(queueName, EXCHANGE_NAME, "*"); System.out.println(" [*] Waiting for messages."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); eventHandler.handleEvent(message); } }; channel.basicConsume(queueName, true, consumer); }
/**
 * Declares an exchange and a queue, binds them, and starts consuming from the queue.
 *
 * <p>The exchange is declared from the settings carried by {@code exchange}, the queue from
 * the settings carried by {@code topic}; the queue is then bound to the exchange with
 * {@code routingKey} and consumed with automatic acknowledgement by the consumer returned
 * from {@code createConsumer}.
 *
 * <p>NOTE(review): the fifth argument of the six-argument {@code exchangeDeclare} is the
 * {@code internal} flag in the RabbitMQ client API, but {@code exchange.autoDelete()} is
 * passed for it as well as for the fourth argument. Confirm whether a dedicated accessor
 * was intended.
 *
 * @param exchange   settings of the exchange to declare
 * @param topic      settings of the queue to declare and consume from
 * @param routingKey key used to bind the queue to the exchange
 * @throws Exception if any broker operation fails
 * @see com.rabbitmq.client.AMQP.Exchange.Declare
 * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
 */
public void subscribe(Exchange exchange, Topic topic, String routingKey) throws Exception {
    channel.exchangeDeclare(
            exchange.exchange(),
            exchange.type(),
            exchange.durable(),
            exchange.autoDelete(),
            exchange.autoDelete(),
            exchange.properties());
    channel.queueDeclare(
            topic.name(),
            topic.durable(),
            topic.exclusive(),
            topic.autoDelete(),
            null);
    channel.queueBind(topic.name(), exchange.exchange(), routingKey);

    final Consumer messageConsumer = createConsumer(channel);
    // true = automatic acknowledgement
    channel.basicConsume(topic.name(), true, messageConsumer);
}
/**
 * Round-trips one message through a freshly declared queue using a channel obtained from
 * {@code connectionFactory1}.
 *
 * <p>The payload is encoded and decoded explicitly as UTF-8 so the comparison does not depend
 * on the platform default charset, and the channel is closed even when an assertion fails.
 *
 * @throws Exception if any broker operation fails or the wait is interrupted
 */
@Test
public void testConnectionFactory() throws Exception {
    Assert.assertNotNull(connectionFactory1);
    Assert.assertNotNull(queue);
    RabbitmqConnection connection = connectionFactory1.getConnection();
    Assert.assertNotNull(connection);

    String queueName = "testing";
    Channel channel = connection.createChannel();
    try {
        channel.queueDeclare(queueName, false, false, false, null);
        String message = "Hello World!";
        final CountDownLatch counter = new CountDownLatch(1);
        Consumer consume = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // A failed assertion here skips countDown(), so the latch check below fails the test.
                Assert.assertEquals("Hello World!", new String(body, "UTF-8"));
                counter.countDown();
            }
        };
        channel.basicConsume(queueName, true, consume);
        channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));

        // await() returns false on timeout; assert on it instead of discarding the result.
        Assert.assertTrue("message was not received within 10 seconds", counter.await(10, TimeUnit.SECONDS));
        Assert.assertEquals(0, counter.getCount());
    } finally {
        channel.close();
    }
}
/**
 * Verifies that stopping the consumer still closes the connection (30 second timeout) when
 * the channel reports itself closed and both {@code basicCancel} and {@code close} on it
 * throw {@code AlreadyClosedException}.
 *
 * @throws Exception if starting or stopping the consumer fails unexpectedly
 */
@Test
public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception {
    AlreadyClosedException channelAlreadyClosed = Mockito.mock(AlreadyClosedException.class);

    RabbitMQConsumer rabbitConsumer = new RabbitMQConsumer(endpoint, processor);

    // Endpoint wiring: one concurrent consumer on a mocked connection/channel.
    Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
    Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
    Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
    Mockito.when(conn.createChannel()).thenReturn(channel);

    // Channel behaviour: consuming succeeds, afterwards the channel looks closed by the server.
    Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG");
    Mockito.when(channel.isOpen()).thenReturn(false);
    Mockito.doThrow(channelAlreadyClosed).when(channel).basicCancel("TAG");
    Mockito.doThrow(channelAlreadyClosed).when(channel).close();

    rabbitConsumer.doStart();
    rabbitConsumer.doStop();

    Mockito.verify(conn).close(30 * 1000);
}
/**
 * Creates a queue subscription.
 *
 * @param lapinStream       stream passed to the superclass constructor
 * @param subscriber        subscriber passed to the superclass constructor; also the target
 *                          of the default {@code doOnNext} action
 * @param queue             stored as the queue configuration
 * @param bindAckToRequest  stored as-is
 * @param consumerArguments stored as-is
 * @param dependency        stored as-is
 * @param doOnNext          action invoked per signal; when {@code null}, signals are
 *                          forwarded to {@code subscriber.onNext}
 */
public LapinQueueSubscription(Stream<QueueSignal> lapinStream,
                              final Subscriber<? super QueueSignal> subscriber,
                              Queue queue,
                              boolean bindAckToRequest,
                              Map<String, Object> consumerArguments,
                              Subscription dependency,
                              java.util.function.Consumer<QueueSignal> doOnNext) {
    super(lapinStream, subscriber);
    this.queueConfig = queue;
    this.bindAckToRequest = bindAckToRequest;
    this.consumerArguments = consumerArguments;
    this.dependency = dependency;
    if (doOnNext == null) {
        // Default action: hand every signal straight to the subscriber.
        this.doOnNext = queueSignal -> subscriber.onNext(queueSignal);
    } else {
        this.doOnNext = doOnNext;
    }
}
@Test public void testConsumerSingleMessage() throws Exception { TransferQueue<RabbitMessage> messages = new LinkedTransferQueue<>(); Channel channel = mock(Channel.class); final Consumer consumer = new StreamSetsMessageConsumer(channel, messages); final Envelope envelope = new Envelope(1L, false, EXCHANGE_NAME, QUEUE_NAME); executor.submit(new Runnable() { @Override public void run() { try { consumer.handleDelivery("consumerTag", envelope, null, TEST_MESSAGE_1.getBytes()); } catch (IOException ignored) { // no op } } }); RabbitMessage message = messages.take(); assertEquals(TEST_MESSAGE_1, new String(message.getBody(), StandardCharsets.UTF_8)); }
/**
 * Handles a consumer-declaring call on the delegate channel.
 *
 * <p>When consumer recovery is disabled the call is simply forwarded. Otherwise the last
 * argument (the consumer) is wrapped in a {@code ConsumerDelegate} before the call is
 * forwarded, and the resulting declaration is recorded in {@code consumerDeclarations}
 * under the returned consumer tag.
 *
 * @param method the channel method being invoked
 * @param args   the invocation arguments; the first is the queue name and the last is the
 *               consumer, which is replaced in place by its delegate
 * @return the consumer tag returned by the delegate
 * @throws Exception if the reflective invocation fails
 */
private String handleConsumerDeclare(Method method, Object[] args) throws Exception {
  if (!config.isConsumerRecoveryEnabled())
    return (String) Reflection.invoke(delegate, method, args);

  final int consumerIndex = args.length - 1;
  Consumer original = (Consumer) args[consumerIndex];
  args[consumerIndex] = new ConsumerDelegate(this, original);
  String consumerTag = (String) Reflection.invoke(delegate, method, args);

  // An empty queue name refers to the most recently generated queue.
  String queueName;
  if ("".equals(args[0]))
    queueName = lastGeneratedQueueName;
  else
    queueName = (String) args[0];

  QueueDeclaration queueDeclaration = connectionHandler.queueDeclarations.get(queueName);
  if (queueDeclaration != null)
    queueName = queueDeclaration.name;
  consumerDeclarations.put(consumerTag, new ConsumerDeclaration(queueDeclaration, method, args));

  String logFormat =
      "".equals(queueName) ? "Created consumer-{}{} via {}" : "Created consumer-{} of {} via {}";
  log.info(logFormat, consumerTag, queueName, this);
  return consumerTag;
}
/**
 * Asserts that channel specific configuration overrides global config.
 *
 * <p>The channel is configured with a never-recover policy, the delegate's
 * {@code basicConsume} is stubbed to throw a retryable shutdown signal, and the test expects
 * the proxy call to fail after exactly one delegate invocation.
 *
 * <p>NOTE(review): {@code withChannelRecoveryPolicy} is applied twice with the same value;
 * the second call may have been meant to set a different policy. Confirm.
 */
public void shouldOverrideGlobalConfig() throws Throwable {
  mockConnection();
  MockChannel mc = mockChannel(1);

  ConfigurableChannel configurable = (ConfigurableChannel) mc.proxy;
  configurable
      .withChannelRecoveryPolicy(RecoveryPolicies.recoverNever())
      .withChannelRecoveryPolicy(RecoveryPolicies.recoverNever());

  when(mc.delegate.basicConsume(anyString(), any(Consumer.class)))
      .thenThrow(retryableChannelShutdownSignal());

  try {
    mc.proxy.basicConsume("foo", null);
    fail();
  } catch (Exception expected) {
    verify(mc.delegate).basicConsume(anyString(), any(Consumer.class));
  }
}
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月11日 下午5:55:38 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("Worker1 Waiting for messages"); // 每次从队列获取的数量 //channel.basicQos(1);保证一次只分发一个 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Worker1 Received '" + message + "'"); try { //throw new Exception(); doWork(message); } catch (Exception e) { channel.abort(); } finally { System.out.println("Worker1 Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; //autoAck是否自动回复, //如果为true的话,每次生产者只要发送信息就会从内存中删除, //那么如果消费者程序异常退出,那么就无法获取数据, //我们当然是不希望出现这样的情况,所以才去手动回复, //每当消费者收到并处理信息然后在通知生成者。 //最后从队列中删除这条信息。 //如果消费者异常退出,如果还有其他消费者, //那么就会把队列中的消息发送给其他消费者, //如果没有,等消费者启动时候再次发送。 boolean autoAck = false; // 消息消费完成确认 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月11日 下午5:55:38 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("Worker2 Waiting for messages"); // 每次从队列获取的数量 //channel.basicQos(1);保证一次只分发一个 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Worker2 Received '" + message + "'"); try { //throw new Exception(); doWork(message); } catch (Exception e) { channel.abort(); } finally { System.out.println("Worker2 Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; //autoAck是否自动回复, //如果为true的话,每次生产者只要发送信息就会从内存中删除, //那么如果消费者程序异常退出,那么就无法获取数据, //我们当然是不希望出现这样的情况,所以才去手动回复, //每当消费者收到并处理信息然后在通知生成者。 //最后从队列中删除这条信息。 //如果消费者异常退出,如果还有其他消费者, //那么就会把队列中的消息发送给其他消费者, //如果没有,等消费者启动时候再次发送。 boolean autoAck = false; // 消息消费完成确认 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); }
/**
 * Starts the RPC server.
 *
 * <p>Declares {@code RPC_QUEUE_NAME}, limits prefetch to one, and consumes with manual
 * acknowledgement. For each request the reply is the request text followed by a random UUID
 * (or an empty string if a {@code RuntimeException} occurred), published to the request's
 * {@code replyTo} queue with the same correlation id; the request is then acknowledged.
 *
 * @throws IOException if a broker operation fails
 * @throws TimeoutException declared for callers; presumably raised while the channel wrapper
 *         connects (not visible here)
 */
public void service() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    channel.getChannel().queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    channel.getChannel().basicQos(1);
    System.out.println("等待rpc客户端连接...");

    Consumer rpcHandler = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            // Echo the caller's correlation id so it can match the reply to its request.
            AMQP.BasicProperties.Builder replyBuilder = new AMQP.BasicProperties.Builder();
            AMQP.BasicProperties replyProps = replyBuilder
                    .correlationId(properties.getCorrelationId())
                    .build();

            String reply = "";
            try {
                String request = new String(body, "UTF-8");
                System.out.println("服务端接受到消息:" + request);
                reply = request + UUID.randomUUID().toString();
            } catch (RuntimeException e) {
                e.printStackTrace();
            } finally {
                // Always answer and acknowledge, even when processing failed.
                byte[] replyBytes = reply.getBytes("UTF-8");
                channel.getChannel().basicPublish("", properties.getReplyTo(), replyProps, replyBytes);
                channel.getChannel().basicAck(envelope.getDeliveryTag(), false);
                System.out.println("服务端将处理结果:" + reply + ",返回客户单\n");
            }
        }
    };
    channel.getChannel().basicConsume(RPC_QUEUE_NAME, false, rpcHandler);
}
@Override public void preProcess(Consumer t, Object proxy, Method method, Object[] args) { Map<String, Object> params = new HashMap<String, Object>(); params.put(CaptureConstants.INFO_CLIENT_REQUEST_URL, url); params.put(CaptureConstants.INFO_CLIENT_REQUEST_ACTION, "Consumer." + method.getName()); params.put(CaptureConstants.INFO_CLIENT_APPID, applicationId); params.put(CaptureConstants.INFO_CLIENT_TYPE, "rabbitmq.client"); if (logger.isDebugable()) { logger.debug("Invoke START:" + url + ",op=Consumer." + method.getName(), null); } UAVServer.instance().runMonitorCaptureOnServerCapPoint(CaptureConstants.CAPPOINT_APP_CLIENT, Monitor.CapturePhase.PRECAP, params); // 调用链只关心真正消费消息 if (method.getName().equals("handleDelivery")) { AMQP.BasicProperties props = (BasicProperties) args[2]; if (props.getHeaders() != null && props.getHeaders().containsKey(InvokeChainConstants.PARAM_MQHEAD_SPANINFO)) { params.put(InvokeChainConstants.PARAM_MQHEAD_SPANINFO, props.getHeaders().get(InvokeChainConstants.PARAM_MQHEAD_SPANINFO) + ""); params.put(CaptureConstants.INFO_APPSERVER_CONNECTOR_REQUEST_URL, url); } // register adapter UAVServer.instance().runSupporter("com.creditease.uav.apm.supporters.InvokeChainSupporter", "registerAdapter", RabbitmqConsumerAdapter.class); UAVServer.instance().runSupporter("com.creditease.uav.apm.supporters.InvokeChainSupporter", "runCap", InvokeChainConstants.CHAIN_APP_SERVICE, InvokeChainConstants.CapturePhase.PRECAP, params, RabbitmqConsumerAdapter.class, args); } }
private void doCap(int rc, Channel channel, Consumer consumer, Method method, Throwable e) { if (null == targetServer) { Map<String, Object> conn = channel.getConnection().getServerProperties(); String cluster_name = (null == conn.get("cluster_name")) ? "unknown" : conn.get("cluster_name").toString(); String version = (null == conn.get("version")) ? "unknown" : conn.get("version").toString(); targetServer = cluster_name + "@" + version; } Map<String, Object> params = new HashMap<String, Object>(); params.put(CaptureConstants.INFO_CLIENT_TARGETSERVER, targetServer); params.put(CaptureConstants.INFO_CLIENT_RESPONSECODE, rc); if (logger.isDebugable()) { logger.debug("Invoke END: rc=" + rc + "," + targetServer, null); } UAVServer.instance().runMonitorCaptureOnServerCapPoint(CaptureConstants.CAPPOINT_APP_CLIENT, Monitor.CapturePhase.DOCAP, params); if (method.getName().equals("handleDelivery")) { params.put(CaptureConstants.INFO_APPSERVER_CONNECTOR_REQUEST_URL, url); params.put(CaptureConstants.INFO_CLIENT_APPID, applicationId); params.put(InvokeChainConstants.CLIENT_IT_CLASS, consumer.getClass().getName()); // 调用链只关心一个方法 params.put(InvokeChainConstants.CLIENT_IT_METHOD, "handleDelivery"); params.put(CaptureConstants.INFO_CLIENT_RESPONSECODE, rc); if (rc == -1) { params.put(CaptureConstants.INFO_CLIENT_RESPONSESTATE, e.toString()); } // invoke chain UAVServer.instance().runSupporter("com.creditease.uav.apm.supporters.InvokeChainSupporter", "runCap", InvokeChainConstants.CHAIN_APP_SERVICE, InvokeChainConstants.CapturePhase.DOCAP, params, RabbitmqConsumerAdapter.class, null); } }
/**
 * Declares the queue described by {@code topic} and starts consuming from it with automatic
 * acknowledgement, using the consumer returned by {@code createConsumer}.
 *
 * @param topic queue settings (name, durable, exclusive, auto-delete) to subscribe on
 * @throws Exception if declaring the queue or registering the consumer fails
 */
public void subscribe(Topic topic) throws Exception {
    channel.queueDeclare(
            topic.name(),
            topic.durable(),
            topic.exclusive(),
            topic.autoDelete(),
            null);

    final Consumer messageConsumer = createConsumer(channel);
    // true = automatic acknowledgement
    channel.basicConsume(topic.name(), true, messageConsumer);
}
/**
 * Builds a consumer that pushes the raw body of every delivery into
 * {@code incomingMessagesSubject}.
 *
 * @param channel channel the consumer is attached to
 * @return the new consumer
 */
private Consumer createConsumer(Channel channel) {
    return new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            incomingMessagesSubject.onNext(body);
        }
    };
}
/**
 * Starts consuming from the queue named by {@code topic} with automatic acknowledgement,
 * passing the raw body of every delivery to {@code processByteMessage}.
 *
 * @throws IOException if registering the consumer fails
 */
protected void init() throws IOException {
    channel.basicConsume(topic, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            processByteMessage(body);
        }
    });
}
/**
 * Builds a consumer that tokenizes each delivery as a CSV row, verifies it with
 * {@code ExportOnServerVerifier.verifyRow}, reports validation errors, and acknowledges the
 * delivery.
 *
 * <p>Rows that do not have exactly 29 fields are skipped. NOTE(review): the skip returns
 * before {@code basicAck}, so such deliveries are never acknowledged here. Confirm that is
 * intended.
 *
 * @param channel channel used to attach the consumer and acknowledge deliveries
 * @return the new consumer
 */
private Consumer createConsumer(final Channel channel) {
    return new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
            final long deliveryTag = envelope.getDeliveryTag();

            String csvLine = new String(body, Charsets.UTF_8);
            String[] fields = ExportOnServerVerifier.RoughCSVTokenizer.tokenize(csvLine);
            if (fields.length != 29) {
                return;
            }

            // verifyRow can either return a validation error or throw one.
            ExportOnServerVerifier.ValidationErr returnedErr = null;
            try {
                returnedErr = ExportOnServerVerifier.verifyRow(fields);
            } catch (ExportOnServerVerifier.ValidationErr thrownErr) {
                thrownErr.printStackTrace();
            }
            if (returnedErr != null) {
                System.out.println("ERROR in validation: " + returnedErr.toString());
            }

            // Periodic progress report.
            if (++m_verifiedRows % VALIDATION_REPORT_INTERVAL == 0) {
                System.out.println("Verified " + m_verifiedRows + " rows.");
            }

            channel.basicAck(deliveryTag, false);
        }
    };
}
/**
 * Verifies that unsubscribing cancels the consumer using the tag returned by
 * {@code basicConsume}.
 *
 * @throws IOException declared by the mocked channel methods
 */
@Test
public void testUnsubscribe() throws IOException {
    final String expectedTag = "my consumer tag";
    AmqpConsumerAdapter adapter = createAdapterWithNonDurableConf("myTopic", "myGroupId", false);
    when(mockChannel.basicConsume(anyString(), anyBoolean(), any(Consumer.class)))
        .thenReturn(expectedTag);

    adapter.subscribe((jsonMessage, ackHandler) -> {
    });
    adapter.unsubscribe();

    verify(mockChannel).basicCancel(expectedTag);
}
/**
 * Expects a {@code ChannelException} when the channel's {@code basicConsume} throws
 * {@code IOException} during the subscribe/unsubscribe sequence.
 *
 * @throws IOException declared by the mocked channel methods
 */
@Test(expected = ChannelException.class)
public void testUnsubscribeException() throws IOException {
    final String unusedTag = "my consumer tag";
    AmqpConsumerAdapter adapter = createAdapterWithNonDurableConf("myTopic", "myGroupId", false);
    when(mockChannel.basicConsume(anyString(), anyBoolean(), any(Consumer.class)))
        .thenThrow(IOException.class);

    adapter.subscribe((jsonMessage, ackHandler) -> {
    });
    adapter.unsubscribe();
}
public static void main(String[] args) throws IOException { ConnectionFactory cf = new ConnectionFactory(); Connection conn = cf.newConnection(); Channel ch1 = conn.createChannel(); Channel ch2 = conn.createChannel(); ch1.queueDeclare(HIGH_PRIORITY_Q, false, false, true, null); ch2.queueDeclare(LOW_PRIORITY_Q, false, false, true, null); Consumer hiCons = instantiateConsumer(ch1, "high"); Consumer loCons = instantiateConsumer(ch2, "low"); Channel ch3 = conn.createChannel(); createBacklog(ch3, 30, LOW_PRIORITY_Q, "low"); createBacklog(ch3, 3, HIGH_PRIORITY_Q, "high"); ch1.basicConsume(HIGH_PRIORITY_Q, hiCons); ch2.basicConsume(LOW_PRIORITY_Q, loCons); Random rnd = new Random(); try { while(true) { // publish one low priority message ch3.basicPublish("", LOW_PRIORITY_Q, null, "message".getBytes()); // publish one low or high priority one String key = routingKeys.get(rnd.nextInt(routingKeys.size())); ch3.basicPublish("", key, null, "message".getBytes()); Thread.sleep(500); } } catch (InterruptedException ie) { System.out.println("Terminating..."); System.exit(0); } }
/**
 * Opens a channel, declares the exchange and a durable queue, binds them with an empty
 * routing key, and starts an auto-acknowledging {@code StatusConsumer} on the queue.
 *
 * <p>Any exception is caught and its stack trace printed.
 */
@Override
public void run() {
    try {
        channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE);
        channel.queueDeclare(queueName, true, false, false, null).getQueue();
        channel.queueBind(queueName, exchangeName, "");

        System.out.println(" [*] Waiting for messages.");

        Consumer statusConsumer = new StatusConsumer(channel);
        channel.basicConsume(queueName, true, statusConsumer);
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
/**
 * Verifies that {@code listenImpl} with dynamic queue creation obtains a consumer for the
 * generated queue, creates the dynamic queue, consumes from it with manual acknowledgement
 * and applies the prefetch count.
 *
 * @throws IOException declared by the mocked channel methods
 */
@Test
public void testListenImplDynamicQueues() throws IOException {
    final String generatedQueue = "randoq";
    final String exchangeName = "exchange";
    final String routingKey = "router";

    // The channel hands out a server-generated queue name.
    AMQImpl.Queue.DeclareOk declareOk = mock(AMQImpl.Queue.DeclareOk.class);
    when(declareOk.getQueue()).thenReturn(generatedQueue);
    when(channel.queueDeclare()).thenReturn(declareOk);
    amqpTransport.setChannel(channel);

    AMQPConsumerCallback callback = mock(AMQPConsumerCallback.class);
    AMQPCommonListenProperties listenProperties = AMQPSyncConsumerBuilder.builder()
            .callback(callback)
            .dynamicQueueCreation(true)
            .exchange(exchangeName)
            .dynamicQueueRoutingKey(routingKey)
            .poisonQueueEnabled(Boolean.TRUE)
            .prefetchCount(1)
            .buildListenProperties();

    amqpTransport.listenImpl(listenProperties);

    verify(amqpTransport).getConsumer(callback, listenProperties, "." + generatedQueue);
    verify(amqpTransport).createDynamicQueue(exchangeName, routingKey, true);
    verify(channel).basicConsume(eq(generatedQueue), eq(false), any(Consumer.class));
    verify(channel).basicQos(1);
}
/**
 * Same scenario as the dynamic-queue listen test, with purge-on-connect enabled: additionally
 * verifies that the generated queue is purged.
 *
 * @throws IOException declared by the mocked channel methods
 */
@Test
public void testListenImplDynamicQueuesPurgeOnConnect() throws IOException {
    final String generatedQueue = "randoq";
    final String exchangeName = "exchange";
    final String routingKey = "router";

    // The channel hands out a server-generated queue name.
    AMQImpl.Queue.DeclareOk declareOk = mock(AMQImpl.Queue.DeclareOk.class);
    when(declareOk.getQueue()).thenReturn(generatedQueue);
    when(channel.queueDeclare()).thenReturn(declareOk);
    amqpTransport.setChannel(channel);

    AMQPConsumerCallback callback = mock(AMQPConsumerCallback.class);
    AMQPCommonListenProperties listenProperties = AMQPSyncConsumerBuilder.builder()
            .callback(callback)
            .dynamicQueueCreation(true)
            .exchange(exchangeName)
            .purgeOnConnect(Boolean.TRUE)
            .dynamicQueueRoutingKey(routingKey)
            .poisonQueueEnabled(Boolean.TRUE)
            .prefetchCount(1)
            .buildListenProperties();

    amqpTransport.listenImpl(listenProperties);

    verify(channel).queuePurge(generatedQueue);
    verify(amqpTransport).getConsumer(callback, listenProperties, "." + generatedQueue);
    verify(amqpTransport).createDynamicQueue(exchangeName, routingKey, true);
    verify(channel).basicConsume(eq(generatedQueue), eq(false), any(Consumer.class));
    verify(channel).basicQos(1);
}
/**
 * Verifies that {@code listenImpl} with auto-create-and-bind declares the exchange, the queue
 * and its poison queue, binds both, consumes with manual acknowledgement and applies the
 * prefetch count.
 *
 * @throws IOException declared by the mocked channel methods
 */
@Test
public void testListenImplBasicConfig() throws IOException {
    final String queueName = "randoq";
    final String exchangeName = "exchange";
    final String bindingKey = "routingKey";
    final AMQPConsumerBuilder.ExchangeType exchangeType = AMQPConsumerBuilder.ExchangeType.DIRECT;
    final String poisonQueue = queueName + AMQPTransport.POISON;
    final String poisonKey = bindingKey + AMQPTransport.POISON;

    AMQImpl.Queue.DeclareOk declareOk = mock(AMQImpl.Queue.DeclareOk.class);
    when(declareOk.getQueue()).thenReturn(queueName);
    when(channel.queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), anyMap()))
            .thenReturn(declareOk);
    amqpTransport.setChannel(channel);

    AMQPConsumerCallback callback = mock(AMQPConsumerCallback.class);
    AMQPCommonListenProperties listenProperties = AMQPSyncConsumerBuilder.builder()
            .callback(callback)
            .autoCreateAndBind(exchangeName, exchangeType, queueName, bindingKey)
            .poisonQueueEnabled(Boolean.TRUE)
            .prefetchCount(1)
            .buildListenProperties();

    amqpTransport.listenImpl(listenProperties);

    verify(amqpTransport).getConsumer(callback, listenProperties, "");
    verify(amqpTransport).autoCreateAndBind(exchangeName, exchangeType.toString(), queueName, bindingKey, true);
    verify(channel, times(1)).exchangeDeclare(exchangeName, exchangeType.toString(), true);
    verify(channel, times(1)).queueDeclare(queueName, true, false, false, null);
    verify(channel, times(1)).queueBind(queueName, exchangeName, bindingKey);
    verify(channel, times(1)).queueDeclare(poisonQueue, true, false, false, null);
    verify(channel, times(1)).queueBind(poisonQueue, exchangeName, poisonKey);
    verify(channel).basicConsume(eq(queueName), eq(false), any(Consumer.class));
    verify(channel).basicQos(1);
}
/**
 * Registers a {@code FunctionConsumer} on the queue {@code name} with manual acknowledgement.
 *
 * @param processor function applied by the consumer to each message
 * @return a handle whose {@code close()} cancels the consumer by its tag
 * @throws MessageQueueException if the consumer cannot be registered
 */
@Override
public Closeable consume(Function<T, Void> processor) {
    final Consumer functionConsumer = new FunctionConsumer<>(channel, processor, type, name, metrics);

    final String consumerTag;
    try {
        consumerTag = channel.basicConsume(name, false, functionConsumer);
    } catch (IOException e) {
        throw new MessageQueueException("Unable to set up consumer.", e);
    }

    log.info("Set up consumer '{}' for queue '{}'.", consumerTag, name);
    return () -> channel.basicCancel(consumerTag);
}
/**
 * Tells every configured consumer listener that recovery of {@code consumer} has started.
 * A listener that throws is ignored so the remaining listeners are still notified.
 *
 * @param consumer the consumer being recovered
 */
private void notifyConsumerRecoveryStarted(Consumer consumer) {
  for (ConsumerListener consumerListener : config.getConsumerListeners()) {
    try {
      consumerListener.onRecoveryStarted(consumer, proxy);
    } catch (Exception ignore) {
      // Best effort: a failing listener must not stop the others.
    }
  }
}
/**
 * Tells every configured consumer listener that recovery of {@code consumer} has completed.
 * A listener that throws is ignored so the remaining listeners are still notified.
 *
 * @param consumer the consumer that was recovered
 */
private void notifyConsumerRecoveryCompleted(Consumer consumer) {
  for (ConsumerListener consumerListener : config.getConsumerListeners()) {
    try {
      consumerListener.onRecoveryCompleted(consumer, proxy);
    } catch (Exception ignore) {
      // Best effort: a failing listener must not stop the others.
    }
  }
}
/**
 * Tells every configured consumer listener that recovery of {@code consumer} has failed.
 * A listener that throws is ignored so the remaining listeners are still notified.
 *
 * @param consumer the consumer whose recovery failed
 * @param e        the failure passed on to the listeners
 */
private void notifyConsumerRecoveryFailure(Consumer consumer, Exception e) {
  for (ConsumerListener consumerListener : config.getConsumerListeners()) {
    try {
      consumerListener.onRecoveryFailure(consumer, proxy, e);
    } catch (Exception ignore) {
      // Best effort: a failing listener must not stop the others.
    }
  }
}
/**
 * Returns the mock consumer registered under {@code consumerNumber}, creating it on first
 * use.
 *
 * <p>A new consumer is registered through the proxy's {@code basicConsume}; the delegate is
 * stubbed beforehand to return the tag {@code <channelNumber>-<consumerNumber>} for it.
 *
 * @param queueName      queue the consumer is registered on
 * @param consumerNumber key identifying the consumer in {@code consumers}
 * @return the existing or newly created consumer
 * @throws IOException declared by the channel's {@code basicConsume}
 */
protected Consumer mockConsumer(String queueName, int consumerNumber) throws IOException {
  Consumer existing = consumers.get(consumerNumber);
  if (existing != null) {
    return existing;
  }

  String consumerTag = String.format("%s-%s", delegate.getChannelNumber(), consumerNumber);
  Consumer created = new MockConsumer(proxy, consumerNumber);
  when(delegate.basicConsume(eq(queueName), argThat(matcherFor(created))))
      .thenReturn(consumerTag);
  proxy.basicConsume(queueName, created);
  consumers.put(consumerNumber, created);
  return created;
}
/**
 * Builds a matcher that accepts {@code consumer} itself (when the argument is a
 * {@code MockConsumer}) or a {@code ConsumerDelegate} wrapping it.
 *
 * @param consumer the consumer to match against
 * @return the matcher
 */
static ArgumentMatcher<Consumer> matcherFor(final Consumer consumer) {
  return new ArgumentMatcher<Consumer>() {
    @Override
    public boolean matches(Object arg) {
      if (arg instanceof MockConsumer) {
        return ((MockConsumer) arg).equals(consumer);
      }
      // Anything else is expected to be a delegate wrapping the consumer.
      return ((ConsumerDelegate) arg).delegate.equals(consumer);
    }
  };
}