Java 类com.rabbitmq.client.Consumer 实例源码

项目:june.mq    文件:ReceiveLogsDirect2.java   
/**
 * @param args
 * @throws IOException
 * @throws TimeoutException
 * @date 2017年7月13日 下午2:57:32
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 声明交换器
    channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct");
    // 获取匿名队列名称
    String queueName = channel.queueDeclare().getQueue();
    // 根据路由关键字进行多重绑定
    for (String severity : routingKeys2) {
        channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, severity);
        System.out.println("ReceiveLogsDirect2 exchange:" + EXCHANGE_NAME_ROUTING + ", queue:" + queueName
                + ", BindRoutingKey:" + severity);
    }
    System.out.println("ReceiveLogsDirect2 Waiting for messages");

    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws UnsupportedEncodingException {
            String message = new String(body, "UTF-8");
            System.out.println("ReceiveLogsDirect2 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
        }
    };
    channel.basicConsume(queueName, true, consumer);

}
项目:june.mq    文件:ReceiveLogsDirect1.java   
/**
 * @param args
 * @throws TimeoutException
 * @throws IOException
 * @date 2017年7月13日 下午2:53:18
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 声明交换器
    channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct");
    // 获取匿名队列名称
    String queueName = channel.queueDeclare().getQueue();

    // 根据路由关键字进行绑定
    for (String routingKey : routingKeys1) {
        channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, routingKey);
        System.out.println("ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME_ROUTING + "," + " queue:" + queueName
                + ", BindRoutingKey:" + routingKey);
    }
    System.out.println("ReceiveLogsDirect1  Waiting for messages");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
        }
    };
    channel.basicConsume(queueName, true, consumer);

}
项目:june.mq    文件:Customer.java   
/**
 * @param args
 * @throws TimeoutException
 * @throws IOException
 * @date 2017年7月11日 下午5:32:45
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 设置RabbitMQ地址
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    // 创建一个新的连接
    Connection connection = factory.newConnection();
    // 创建一个通道
    Channel channel = connection.createChannel();
    // 声明要关注的队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println("Customer Waiting Received messages");
    // DefaultConsumer类实现了Consumer接口,通过传入一个频道,
    // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
    Consumer consumer = new DefaultConsumer(channel) {
        //envelope主要存放生产者相关信息(比如交换机、路由key等)
        //body是消息实体
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Customer Received '" + message + "'");
        }
    };
    // 自动回复队列应答 -- RabbitMQ中的消息确认机制
    channel.basicConsume(QUEUE_NAME, true, consumer);
}
项目:june.mq    文件:ReceiveLogs1.java   
/**
 * @param args
 * @throws TimeoutException
 * @throws IOException
 * @date 2017年7月13日 下午2:40:52
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    // 产生一个随机的队列名称
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");// 对队列进行绑定

    System.out.println("ReceiveLogs1 Waiting for messages");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("ReceiveLogs1 Received '" + message + "'");
        }
    };
    channel.basicConsume(queueName, true, consumer);// 队列会自动删除
}
项目:june.mq    文件:ReceiveLogsTopic2.java   
/**
     * @param args
     * @throws TimeoutException 
     * @throws IOException 
     * @date 2017年7月13日 下午3:08:40
     * @writer junehappylove
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setPort(port);
        factory.setVirtualHost(virtualHost);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
//      声明一个匹配模式的交换器
        channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic");
        String queueName = channel.queueDeclare().getQueue();
        // 路由关键字
        String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
//      绑定路由关键字
        for (String bindingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, bindingKey);
            System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME_TOPIC+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
        }

        System.out.println("ReceiveLogsTopic2 Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws UnsupportedEncodingException  {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
项目:june.mq    文件:ReceiveLogsTopic1.java   
/**
 * @param args
 * @throws IOException
 * @throws TimeoutException
 * @date 2017年7月13日 下午3:06:20
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明一个匹配模式的交换机
    channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic");
    String queueName = channel.queueDeclare().getQueue();
    // 路由关键字
    String[] routingKeys = new String[] { "*.orange.*" };
    // 绑定路由
    for (String routingKey : routingKeys) {
        channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, routingKey);
        System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME_TOPIC + ", queue:" + queueName
                + ", BindRoutingKey:" + routingKey);
    }
    System.out.println("ReceiveLogsTopic1 Waiting for messages");

    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
        }
    };
    channel.basicConsume(queueName, true, consumer);

}
项目:mumu-rabbitmq    文件:RabbitMQQuickStart.java   
/**
 * 接受消息
 */
public void receiveQuickstartMessage() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    AMQP.Queue.DeclareOk declareOk = channel.getChannel().queueDeclare(QUICKSTART_QUEUE_NAME, false, false, false, null);
    System.out.println("等待接受队列【" + QUICKSTART_QUEUE_NAME + "】消息");
    //建立一个消费者 监听消息的接受
    Consumer consumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("接受消息:" + message);
        }
    };
    channel.getChannel().basicConsume(QUICKSTART_QUEUE_NAME, true, consumer);
    //channel.close();
}
项目:mumu-rabbitmq    文件:RabbitMQPubsub.java   
/**
 * 接受消息
 */
public void receivePubsubMessage() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    channel.getChannel().exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.getChannel().queueDeclare().getQueue();
    channel.getChannel().queueBind(queueName, EXCHANGE_NAME, "");
    System.out.println("等待接受pusub订阅【" + EXCHANGE_NAME + "】消息");
    System.out.println("选择队列:"+queueName);
    Consumer consumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("接受消息:" + message + "'");
        }
    };
    channel.getChannel().basicConsume(queueName, true, consumer);
}
项目:mumu-rabbitmq    文件:RabbitMQTopic.java   
/**
 * 接受主题消息
 */
public void receiveTopicMessage() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    channel.getChannel().exchangeDeclare(TOPIC, "topic");
    String queueName = channel.getChannel().queueDeclare().getQueue();
    channel.getChannel().queueBind(queueName, TOPIC, "bindingKey");
    System.out.println("等待接受topic主题【" + TOPIC + "】消息");
    System.out.println("选择队列:" + queueName);
    Consumer consumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("接受消息:" + envelope.getRoutingKey() + "':'" + message + "'");
        }
    };
    channel.getChannel().basicConsume(queueName, true, consumer);
}
项目:flowing-retail-old    文件:RabbitMqConsumer.java   
protected void connect() throws Exception {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  channel = connection.createChannel();

  String queueName = "flowing-retail-" + name;
  channel.queueDeclare(queueName, true, false, false, null);
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); // publish/subscribe model
  channel.queueBind(queueName, EXCHANGE_NAME, "*");

  System.out.println(" [*] Waiting for messages.");

  Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      String message = new String(body, "UTF-8");
      System.out.println(" [x] Received '" + message + "'");
      eventHandler.handleEvent(message);
    }
  };
  channel.basicConsume(queueName, true, consumer);
}
项目:RabbitMQ-gateway    文件:RabbitListener.java   
/**
 * Declare an exchange, via an interface that allows the complete set of arguments.
 * 
 * @see com.rabbitmq.client.AMQP.Exchange.Declare
 * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk <br>
 *      exchange the name of the exchange <br>
 *      type the exchange type <br>
 *      durable true if we are declaring a durable exchange (the exchange will survive a server restart) <br>
 *      autoDelete true if the server should delete the exchange when it is no longer in use <br>
 *      internal true if the exchange is internal, i.e. can't be directly published to by a client. <br>
 *      arguments other properties (construction arguments) for the exchange
 * @throws java.io.IOException if an error is encountered
 */
public void subscribe(Exchange exchange, Topic topic, String routingKey) throws Exception {

  channel.exchangeDeclare(exchange.exchange(),
      exchange.type(),
      exchange.durable(),
      exchange.autoDelete(),
      exchange.autoDelete(),
      exchange.properties());

  channel.queueDeclare(topic.name(),
      topic.durable(),
      topic.exclusive(),
      topic.autoDelete(), null);

  channel.queueBind(topic.name(), exchange.exchange(), routingKey);

  final Consumer consumer = createConsumer(channel);
  boolean autoAck = true;
  channel.basicConsume(topic.name(), autoAck, consumer);
}
项目:rabbitmq-resource-adapter    文件:ConnectorTestCase.java   
@Test
public void testConnectionFactory() throws Exception {
    Assert.assertNotNull(connectionFactory1);
    Assert.assertNotNull(queue);

    RabbitmqConnection connection = connectionFactory1.getConnection();
    Assert.assertNotNull(connection);
    String queueName = "testing";
    Channel channel = connection.createChannel();
    channel.queueDeclare(queueName, false, false, false, null);
    String message = "Hello World!";

    final CountDownLatch counter = new CountDownLatch(1);
    Consumer consume = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                BasicProperties properties, byte[] body) throws IOException {
            Assert.assertEquals("Hello World!", new String(body));
            counter.countDown();
        }
    };

    channel.basicConsume(queueName, true, consume);
    channel.basicPublish("", queueName, null, message.getBytes());
    counter.await(10, TimeUnit.SECONDS);
    Assert.assertEquals(0, counter.getCount());
    channel.close();

}
项目:Camel    文件:RabbitMQConsumerTest.java   
@Test
public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception {
    AlreadyClosedException alreadyClosedException = Mockito.mock(AlreadyClosedException.class);

    RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);

    Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
    Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
    Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
    Mockito.when(conn.createChannel()).thenReturn(channel);
    Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG");
    Mockito.when(channel.isOpen()).thenReturn(false);
    Mockito.doThrow(alreadyClosedException).when(channel).basicCancel("TAG");
    Mockito.doThrow(alreadyClosedException).when(channel).close();

    consumer.doStart();
    consumer.doStop();

    Mockito.verify(conn).close(30 * 1000);
}
项目:reactor-incubator    文件:LapinQueueSubscription.java   
public LapinQueueSubscription(Stream<QueueSignal> lapinStream, final Subscriber<? super QueueSignal> subscriber,
                              Queue queue,
                              boolean bindAckToRequest,
                              Map<String, Object> consumerArguments,
                              Subscription dependency,
                              java.util.function.Consumer<QueueSignal> doOnNext
) {
    super(lapinStream, subscriber);
    this.queueConfig = queue;
    this.bindAckToRequest = bindAckToRequest;
    this.consumerArguments = consumerArguments;
    this.dependency = dependency;
    this.doOnNext = doOnNext != null ? doOnNext : new java.util.function.Consumer<QueueSignal>() {
        @Override
        public void accept(QueueSignal queueSignal) {
            subscriber.onNext(queueSignal);
        }
    };
}
项目:datacollector    文件:TestStreamSetsMessageConsumer.java   
@Test
public void testConsumerSingleMessage() throws Exception {
  TransferQueue<RabbitMessage> messages = new LinkedTransferQueue<>();

  Channel channel = mock(Channel.class);

  final Consumer consumer = new StreamSetsMessageConsumer(channel, messages);
  final Envelope envelope = new Envelope(1L, false, EXCHANGE_NAME, QUEUE_NAME);

  executor.submit(new Runnable() {
    @Override
    public void run() {
      try {
        consumer.handleDelivery("consumerTag", envelope, null, TEST_MESSAGE_1.getBytes());
      } catch (IOException ignored) {
        // no op
      }
    }
  });

  RabbitMessage message = messages.take();
  assertEquals(TEST_MESSAGE_1, new String(message.getBody(), StandardCharsets.UTF_8));
}
项目:lyra    文件:ChannelHandler.java   
private String handleConsumerDeclare(Method method, Object[] args) throws Exception {
  if (config.isConsumerRecoveryEnabled()) {
    Consumer consumer = (Consumer) args[args.length - 1];
    args[args.length - 1] = new ConsumerDelegate(this, consumer);
    String consumerTag = (String) Reflection.invoke(delegate, method, args);
    String queueName = "".equals(args[0]) ? lastGeneratedQueueName : (String) args[0];
    QueueDeclaration queueDeclaration = connectionHandler.queueDeclarations.get(queueName);
    if (queueDeclaration != null)
      queueName = queueDeclaration.name;
    consumerDeclarations.put(consumerTag, new ConsumerDeclaration(queueDeclaration, method, args));
    log.info("".equals(queueName) ? "Created consumer-{}{} via {}"
      : "Created consumer-{} of {} via {}", consumerTag, queueName, this);
    return consumerTag;
  } else
    return (String) Reflection.invoke(delegate, method, args);
}
项目:lyra    文件:ChannelConfigTest.java   
/**
 * Asserts that channel specific configuration overrides global config.
 */
public void shouldOverrideGlobalConfig() throws Throwable {
  mockConnection();
  MockChannel mc = mockChannel(1);
  ((ConfigurableChannel) mc.proxy).withChannelRecoveryPolicy(RecoveryPolicies.recoverNever())
      .withChannelRecoveryPolicy(RecoveryPolicies.recoverNever());

  when(mc.delegate.basicConsume(anyString(), any(Consumer.class))).thenThrow(
      retryableChannelShutdownSignal());

  try {
    mc.proxy.basicConsume("foo", null);
    fail();
  } catch (Exception e) {
    verify(mc.delegate).basicConsume(anyString(), any(Consumer.class));
  }
}
项目:june.mq    文件:Work1.java   
/**
 * @param args
 * @throws IOException
 * @throws TimeoutException
 * @date 2017年7月11日 下午5:55:38
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    final ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println("Worker1  Waiting for messages");

    // 每次从队列获取的数量
    //channel.basicQos(1);保证一次只分发一个 
    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Worker1  Received '" + message + "'");
            try {
                //throw new Exception();
                doWork(message);
            } catch (Exception e) {
                channel.abort();
            } finally {
                System.out.println("Worker1 Done");
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };
    //autoAck是否自动回复,
    //如果为true的话,每次生产者只要发送信息就会从内存中删除,
    //那么如果消费者程序异常退出,那么就无法获取数据,
    //我们当然是不希望出现这样的情况,所以才去手动回复,
    //每当消费者收到并处理信息然后在通知生成者。
    //最后从队列中删除这条信息。
    //如果消费者异常退出,如果还有其他消费者,
    //那么就会把队列中的消息发送给其他消费者,
    //如果没有,等消费者启动时候再次发送。
    boolean autoAck = false;
    // 消息消费完成确认
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

}
项目:june.mq    文件:Work2.java   
/**
 * @param args
 * @throws IOException
 * @throws TimeoutException
 * @date 2017年7月11日 下午5:55:38
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    final ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println("Worker2 Waiting for messages");

    // 每次从队列获取的数量
    //channel.basicQos(1);保证一次只分发一个 
    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Worker2 Received '" + message + "'");
            try {
                //throw new Exception();
                doWork(message);
            } catch (Exception e) {
                channel.abort();
            } finally {
                System.out.println("Worker2 Done");
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };
    //autoAck是否自动回复,
    //如果为true的话,每次生产者只要发送信息就会从内存中删除,
    //那么如果消费者程序异常退出,那么就无法获取数据,
    //我们当然是不希望出现这样的情况,所以才去手动回复,
    //每当消费者收到并处理信息然后在通知生成者。
    //最后从队列中删除这条信息。
    //如果消费者异常退出,如果还有其他消费者,
    //那么就会把队列中的消息发送给其他消费者,
    //如果没有,等消费者启动时候再次发送。
    boolean autoAck = false;
    // 消息消费完成确认
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

}
项目:mumu-rabbitmq    文件:RabbitMQRPC.java   
/**
 * 服务端开启服务
 */
public void service() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    channel.getChannel().queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    channel.getChannel().basicQos(1);
    System.out.println("等待rpc客户端连接...");

    Consumer consumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(properties.getCorrelationId())
                    .build();
            String response = "";
            try {
                String message = new String(body, "UTF-8");
                System.out.println("服务端接受到消息:" + message);
                response = message + UUID.randomUUID().toString();
            } catch (RuntimeException e) {
                e.printStackTrace();
            } finally {
                channel.getChannel().basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                channel.getChannel().basicAck(envelope.getDeliveryTag(), false);
                System.out.println("服务端将处理结果:" + response + ",返回客户单\n");
            }
        }
    };
    channel.getChannel().basicConsume(RPC_QUEUE_NAME, false, consumer);
}
项目:uavstack    文件:RabbitmqIT.java   
@Override
public void preProcess(Consumer t, Object proxy, Method method, Object[] args) {

    Map<String, Object> params = new HashMap<String, Object>();
    params.put(CaptureConstants.INFO_CLIENT_REQUEST_URL, url);
    params.put(CaptureConstants.INFO_CLIENT_REQUEST_ACTION, "Consumer." + method.getName());
    params.put(CaptureConstants.INFO_CLIENT_APPID, applicationId);
    params.put(CaptureConstants.INFO_CLIENT_TYPE, "rabbitmq.client");

    if (logger.isDebugable()) {
        logger.debug("Invoke START:" + url + ",op=Consumer." + method.getName(), null);
    }

    UAVServer.instance().runMonitorCaptureOnServerCapPoint(CaptureConstants.CAPPOINT_APP_CLIENT,
            Monitor.CapturePhase.PRECAP, params);
    // 调用链只关心真正消费消息
    if (method.getName().equals("handleDelivery")) {

        AMQP.BasicProperties props = (BasicProperties) args[2];
        if (props.getHeaders() != null
                && props.getHeaders().containsKey(InvokeChainConstants.PARAM_MQHEAD_SPANINFO)) {
            params.put(InvokeChainConstants.PARAM_MQHEAD_SPANINFO,
                    props.getHeaders().get(InvokeChainConstants.PARAM_MQHEAD_SPANINFO) + "");
            params.put(CaptureConstants.INFO_APPSERVER_CONNECTOR_REQUEST_URL, url);
        }

        // register adapter
        UAVServer.instance().runSupporter("com.creditease.uav.apm.supporters.InvokeChainSupporter",
                "registerAdapter", RabbitmqConsumerAdapter.class);

        UAVServer.instance().runSupporter("com.creditease.uav.apm.supporters.InvokeChainSupporter", "runCap",
                InvokeChainConstants.CHAIN_APP_SERVICE, InvokeChainConstants.CapturePhase.PRECAP, params,
                RabbitmqConsumerAdapter.class, args);
    }

}
项目:uavstack    文件:RabbitmqIT.java   
private void doCap(int rc, Channel channel, Consumer consumer, Method method, Throwable e) {

            if (null == targetServer) {
                Map<String, Object> conn = channel.getConnection().getServerProperties();
                String cluster_name = (null == conn.get("cluster_name")) ? "unknown"
                        : conn.get("cluster_name").toString();
                String version = (null == conn.get("version")) ? "unknown" : conn.get("version").toString();
                targetServer = cluster_name + "@" + version;
            }

            Map<String, Object> params = new HashMap<String, Object>();

            params.put(CaptureConstants.INFO_CLIENT_TARGETSERVER, targetServer);
            params.put(CaptureConstants.INFO_CLIENT_RESPONSECODE, rc);

            if (logger.isDebugable()) {
                logger.debug("Invoke END: rc=" + rc + "," + targetServer, null);
            }

            UAVServer.instance().runMonitorCaptureOnServerCapPoint(CaptureConstants.CAPPOINT_APP_CLIENT,
                    Monitor.CapturePhase.DOCAP, params);

            if (method.getName().equals("handleDelivery")) {
                params.put(CaptureConstants.INFO_APPSERVER_CONNECTOR_REQUEST_URL, url);
                params.put(CaptureConstants.INFO_CLIENT_APPID, applicationId);
                params.put(InvokeChainConstants.CLIENT_IT_CLASS, consumer.getClass().getName());
                // 调用链只关心一个方法
                params.put(InvokeChainConstants.CLIENT_IT_METHOD, "handleDelivery");
                params.put(CaptureConstants.INFO_CLIENT_RESPONSECODE, rc);
                if (rc == -1) {
                    params.put(CaptureConstants.INFO_CLIENT_RESPONSESTATE, e.toString());
                }

                // invoke chain
                UAVServer.instance().runSupporter("com.creditease.uav.apm.supporters.InvokeChainSupporter", "runCap",
                        InvokeChainConstants.CHAIN_APP_SERVICE, InvokeChainConstants.CapturePhase.DOCAP, params,
                        RabbitmqConsumerAdapter.class, null);
            }
        }
项目:RabbitMQ-gateway    文件:RabbitListener.java   
/**
 * listen on rabbit mq topic.
 * 
 * @param topic to subscribe on.
 * @throws Exception if failed to subscribe.
 */
public void subscribe(Topic topic) throws Exception {
  channel.queueDeclare(topic.name(),
      topic.durable(),
      topic.exclusive(),
      topic.autoDelete(), null);

  final Consumer consumer = createConsumer(channel);
  boolean autoAck = true;
  channel.basicConsume(topic.name(), autoAck, consumer);
}
项目:RabbitMQ-gateway    文件:RabbitListener.java   
private Consumer createConsumer(Channel channel) {
  final Consumer consumer = new DefaultConsumer(channel) {

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
      incomingMessagesSubject.onNext(body);
    }
  };
  return consumer;
}
项目:apple-jms    文件:BaseMessageConsumer.java   
protected void init() throws IOException {
    Consumer consumer = new DefaultConsumer(channel) {  
           @Override  
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)  
                   throws IOException {
            processByteMessage(body);
           }  
       };  
       channel.basicConsume(topic, true, consumer);  
}
项目:VoltDB    文件:ExportRabbitMQVerifier.java   
private Consumer createConsumer(final Channel channel)
{
    return new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body)
                throws IOException
        {
            long deliveryTag = envelope.getDeliveryTag();

            String row[] = ExportOnServerVerifier.RoughCSVTokenizer.tokenize(new String(body, Charsets.UTF_8));
            if (row.length != 29) {
                return;
            }
            ExportOnServerVerifier.ValidationErr err = null;
            try {
                err = ExportOnServerVerifier.verifyRow(row);
            } catch (ExportOnServerVerifier.ValidationErr validationErr) {
                validationErr.printStackTrace();
            }
            if (err != null) {
                System.out.println("ERROR in validation: " + err.toString());
            }

            if (++m_verifiedRows % VALIDATION_REPORT_INTERVAL == 0) {
                System.out.println("Verified " + m_verifiedRows + " rows.");
            }

            channel.basicAck(deliveryTag, false);
        }
    };
}
项目:msb-java    文件:AmqpConsumerAdapterTest.java   
@Test
public void testUnsubscribe() throws IOException {
    AmqpConsumerAdapter adapter = createAdapterWithNonDurableConf("myTopic", "myGroupId", false);
    String consumerTag = "my consumer tag";
    when(mockChannel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn(consumerTag);

    adapter.subscribe((jsonMessage, ackHandler) -> {
    });
    adapter.unsubscribe();

    verify(mockChannel).basicCancel(consumerTag);
}
项目:msb-java    文件:AmqpConsumerAdapterTest.java   
@Test(expected = ChannelException.class)
public void testUnsubscribeException() throws IOException {
    AmqpConsumerAdapter adapter = createAdapterWithNonDurableConf("myTopic", "myGroupId", false);
    String consumerTag = "my consumer tag";
    when(mockChannel.basicConsume(anyString(), anyBoolean(), any(Consumer.class)))
            .thenThrow(IOException.class);
    adapter.subscribe((jsonMessage, ackHandler) -> {
    });
    adapter.unsubscribe();
}
项目:rabbitmq-resource-adapter    文件:ConnectorTestCase.java   
@Test
public void testConnectionFactory() throws Exception {
    Assert.assertNotNull(connectionFactory1);
    Assert.assertNotNull(queue);

    RabbitmqConnection connection = connectionFactory1.getConnection();
    Assert.assertNotNull(connection);
    String queueName = "testing";
    Channel channel = connection.createChannel();
    channel.queueDeclare(queueName, false, false, false, null);
    String message = "Hello World!";

    final CountDownLatch counter = new CountDownLatch(1);
    Consumer consume = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                BasicProperties properties, byte[] body) throws IOException {
            Assert.assertEquals("Hello World!", new String(body));
            counter.countDown();
        }
    };

    channel.basicConsume(queueName, true, consume);
    channel.basicPublish("", queueName, null, message.getBytes());
    counter.await(10, TimeUnit.SECONDS);
    Assert.assertEquals(0, counter.getCount());
    channel.close();

}
项目:priority-consumer-examples    文件:HiLoPriorityConsumers.java   
public static void main(String[] args) throws IOException {
    ConnectionFactory cf = new ConnectionFactory();
    Connection conn = cf.newConnection();

    Channel ch1 = conn.createChannel();
    Channel ch2 = conn.createChannel();

    ch1.queueDeclare(HIGH_PRIORITY_Q, false, false, true, null);
    ch2.queueDeclare(LOW_PRIORITY_Q, false, false, true, null);

    Consumer hiCons = instantiateConsumer(ch1, "high");
    Consumer loCons = instantiateConsumer(ch2, "low");

    Channel ch3 = conn.createChannel();
    createBacklog(ch3, 30, LOW_PRIORITY_Q, "low");
    createBacklog(ch3, 3,  HIGH_PRIORITY_Q, "high");

    ch1.basicConsume(HIGH_PRIORITY_Q, hiCons);
    ch2.basicConsume(LOW_PRIORITY_Q, loCons);

    Random rnd = new Random();

    try {
        while(true) {
            // publish one low priority message
            ch3.basicPublish("", LOW_PRIORITY_Q, null, "message".getBytes());

            // publish one low or high priority one
            String key = routingKeys.get(rnd.nextInt(routingKeys.size()));
            ch3.basicPublish("", key, null, "message".getBytes());
            Thread.sleep(500);
        }
    } catch (InterruptedException ie) {
        System.out.println("Terminating...");
        System.exit(0);
    }
}
项目:airavata    文件:StatusReceiver.java   
@Override
public void run() {
    try {
        channel = connection.createChannel();
        channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE);
        channel.queueDeclare(queueName, true, false, false, null).getQueue();
        channel.queueBind(queueName, exchangeName, "");
        System.out.println(" [*] Waiting for messages.");
        Consumer consumer = new StatusConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
项目:conduit    文件:AMQPTransportTest.java   
@Test
public void testListenImplDynamicQueues() throws IOException {
    String randoq = "randoq";
    String exchange = "exchange";
    String router = "router";

    AMQImpl.Queue.DeclareOk ok = mock(AMQImpl.Queue.DeclareOk.class);
    when(ok.getQueue()).thenReturn(randoq);
    when(channel.queueDeclare()).thenReturn(ok);
    amqpTransport.setChannel(channel);

    AMQPConsumerCallback consumerCallback = mock(AMQPConsumerCallback.class);

    AMQPCommonListenProperties commonListenProperties = AMQPSyncConsumerBuilder.builder()
            .callback(consumerCallback)
            .dynamicQueueCreation(true)
            .exchange(exchange)
            .dynamicQueueRoutingKey(router)
            .poisonQueueEnabled(Boolean.TRUE)
            .prefetchCount(1)
            .buildListenProperties();

    amqpTransport.listenImpl(commonListenProperties);
    verify(amqpTransport).getConsumer(consumerCallback, commonListenProperties, "." + randoq);
    verify(amqpTransport).createDynamicQueue(exchange, router, true);
    verify(channel).basicConsume(eq(randoq), eq(false), any(Consumer.class));
    verify(channel).basicQos(1);
}
项目:conduit    文件:AMQPTransportTest.java   
@Test
public void testListenImplDynamicQueuesPurgeOnConnect() throws IOException {
    String randoq = "randoq";
    String exchange = "exchange";
    String router = "router";

    AMQImpl.Queue.DeclareOk ok = mock(AMQImpl.Queue.DeclareOk.class);
    when(ok.getQueue()).thenReturn(randoq);
    when(channel.queueDeclare()).thenReturn(ok);
    amqpTransport.setChannel(channel);

    AMQPConsumerCallback consumerCallback = mock(AMQPConsumerCallback.class);
    AMQPCommonListenProperties commonListenProperties = AMQPSyncConsumerBuilder.builder()
            .callback(consumerCallback)
            .dynamicQueueCreation(true)
            .exchange(exchange)
            .purgeOnConnect(Boolean.TRUE)
            .dynamicQueueRoutingKey(router)
            .poisonQueueEnabled(Boolean.TRUE)
            .prefetchCount(1)
            .buildListenProperties();

    amqpTransport.listenImpl(commonListenProperties);
    verify(channel).queuePurge(randoq);
    verify(amqpTransport).getConsumer(consumerCallback, commonListenProperties, "." + randoq);
    verify(amqpTransport).createDynamicQueue(exchange, router, true);
    verify(channel).basicConsume(eq(randoq), eq(false), any(Consumer.class));
    verify(channel).basicQos(1);
}
项目:conduit    文件:AMQPTransportTest.java   
@Test
public void testListenImplBasicConfig() throws IOException {
    String queue = "randoq";
    String exchange = "exchange";
    String routingKey = "routingKey";
    AMQPConsumerBuilder.ExchangeType exchangeType = AMQPConsumerBuilder.ExchangeType.DIRECT;

    AMQImpl.Queue.DeclareOk ok = mock(AMQImpl.Queue.DeclareOk.class);
    when(ok.getQueue()).thenReturn(queue);
    when(channel.queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), anyMap())).thenReturn(ok);
    amqpTransport.setChannel(channel);

    AMQPConsumerCallback consumerCallback = mock(AMQPConsumerCallback.class);

    AMQPCommonListenProperties commonListenProperties = AMQPSyncConsumerBuilder.builder()
            .callback(consumerCallback)
            .autoCreateAndBind(exchange, exchangeType, queue, routingKey)
            .poisonQueueEnabled(Boolean.TRUE)
            .prefetchCount(1)
            .buildListenProperties();

    amqpTransport.listenImpl(commonListenProperties);
    verify(amqpTransport).getConsumer(consumerCallback, commonListenProperties, "");
    verify(amqpTransport).autoCreateAndBind(exchange, exchangeType.toString(), queue, routingKey, true);

    verify(channel, times(1)).exchangeDeclare(exchange, exchangeType.toString(), true);
    verify(channel, times(1)).queueDeclare(queue, true, false, false, null);
    verify(channel, times(1)).queueBind(queue, exchange, routingKey);
    verify(channel, times(1)).queueDeclare(queue + AMQPTransport.POISON, true, false, false, null);
    verify(channel, times(1)).queueBind(queue + AMQPTransport.POISON, exchange, routingKey + AMQPTransport.POISON);

    verify(channel).basicConsume(eq(queue), eq(false), any(Consumer.class));
    verify(channel).basicQos(1);
}
项目:dropwizard-experiment    文件:RabbitMQMessageQueue.java   
@Override
public Closeable consume(Function<T, Void> processor) {
    Consumer consumer = new FunctionConsumer<>(channel, processor, type, name, metrics);

    try {
        String tag = channel.basicConsume(name, false, consumer);
        log.info("Set up consumer '{}' for queue '{}'.", tag, name);


        return () -> channel.basicCancel(tag);
    } catch (IOException e) {
        throw new MessageQueueException("Unable to set up consumer.", e);
    }
}
项目:lyra    文件:ChannelHandler.java   
private void notifyConsumerRecoveryStarted(Consumer consumer) {
  for (ConsumerListener listener : config.getConsumerListeners())
    try {
      listener.onRecoveryStarted(consumer, proxy);
    } catch (Exception ignore) {
    }
}
项目:lyra    文件:ChannelHandler.java   
private void notifyConsumerRecoveryCompleted(Consumer consumer) {
  for (ConsumerListener listener : config.getConsumerListeners())
    try {
      listener.onRecoveryCompleted(consumer, proxy);
    } catch (Exception ignore) {
    }
}
项目:lyra    文件:ChannelHandler.java   
private void notifyConsumerRecoveryFailure(Consumer consumer, Exception e) {
  for (ConsumerListener listener : config.getConsumerListeners())
    try {
      listener.onRecoveryFailure(consumer, proxy, e);
    } catch (Exception ignore) {
    }
}
项目:lyra    文件:AbstractFunctionalTest.java   
protected Consumer mockConsumer(String queueName, int consumerNumber) throws IOException {
  Consumer consumer = consumers.get(consumerNumber);
  if (consumer == null) {
    String consumerTag = String.format("%s-%s", delegate.getChannelNumber(), consumerNumber);
    consumer = new MockConsumer(proxy, consumerNumber);
    when(delegate.basicConsume(eq(queueName), argThat(matcherFor(consumer)))).thenReturn(
        consumerTag);
    proxy.basicConsume(queueName, consumer);
    consumers.put(consumerNumber, consumer);
  }

  return consumer;
}
项目:lyra    文件:AbstractFunctionalTest.java   
static ArgumentMatcher<Consumer> matcherFor(final Consumer consumer) {
  return new ArgumentMatcher<Consumer>() {
    @Override
    public boolean matches(Object arg) {
      return arg instanceof MockConsumer ? ((MockConsumer) arg).equals(consumer)
          : ((ConsumerDelegate) arg).delegate.equals(consumer);
    }
  };
}