public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); factory.setHost("localhost"); factory.setPort(5672); Connection newConnection = factory.newConnection(); Channel channel = newConnection.createChannel(); Scanner scanner = new Scanner(System.in); String message = ""; while(!message.equals("exit")){ System.out.println("Enter your message"); message = scanner.next(); channel.queueDeclare("flink-test", true, false, false, null); channel.basicPublish("", "flink-test", new BasicProperties.Builder() .correlationId(java.util.UUID.randomUUID().toString()).build(), message.getBytes()); } scanner.close(); channel.close(); newConnection.close(); }
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String msg = getMessage(argv); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("dd MMM yyyy @ HH:mm:ss"); String sDate = sdf.format(date); String finalMsg = sDate + ": " + msg; channel.basicPublish(EXCHANGE_NAME, "", null, finalMsg.getBytes("UTF-8")); System.out.println("Emmited message: " + finalMsg); channel.close(); conn.close(); }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月11日 下午5:53:02 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); // 分发信息 for (int i = 0; i < 20; i++) { String message = "Hello RabbitMQ" + i; channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("NewTask send '" + message + "'"); } channel.close(); connection.close(); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:49:49 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct");// 注意是direct // 发送信息 for (String routingKey : routingKeys) { String message = "RoutingSendDirect Send the message level:" + routingKey; channel.basicPublish(EXCHANGE_NAME_ROUTING, routingKey, null, message.getBytes()); System.out.println("RoutingSendDirect Send" + routingKey + "':'" + message); } channel.close(); connection.close(); }
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // factory.setHost(""); factory.setUri("amqp://alpha.netkiller.cn"); factory.setUsername("admin"); // factory.setPassword("admin123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); }
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:37:37 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// fanout表示分发,所有的消费者得到同样的队列信息 // 分发信息 for (int i = 0; i < 5; i++) { String message = "Hello World" + i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("EmitLog Sent '" + message + "'"); } channel.close(); connection.close(); }
private void updateTaskStatus(TaskStatus status) { logger.info("[Study = " + taskStudy + "] [Unit = "+ unitId + "] Sending task update to server. Task id = [" + task.getId() + "] status = ["+status.toString()+"]"); final String QUEUE_NAME = SystemConstants.UBONGO_SERVER_TASKS_STATUS_QUEUE; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(serverAddress); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); task.setStatus(status); RabbitData message = new RabbitData(task, MachineConstants.UPDATE_TASK_REQUEST); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); if (logger.isDebugEnabled()) { logger.debug(" [!] Sent '" + message.getMessage() + "'"); } channel.close(); connection.close(); } catch (Exception e){ logger.error("[Study = " + taskStudy + "] [Unit = "+ unitId + "] Failed sending task status to server. Task id = [" + task.getId() + "] Status = [" + status.toString() + "] error: " + e.getMessage(), e); } }
public static void main(String[] args) throws Exception { String queueName = "TestQueue"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); System.out.println(" [*] Waiting for messages..."); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } }
/** * Helper method to retrieve queue message from rabbitMQ * * @return result * @throws Exception */ private static String consumeWithoutCertificate() throws Exception { String result = ""; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5671); factory.useSslProtocol(); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); GetResponse chResponse = channel.basicGet("WithoutClientCertQueue", true); if(chResponse != null) { byte[] body = chResponse.getBody(); result = new String(body); } channel.close(); conn.close(); return result; }
public static void main(String[] args) throws Exception { //建立连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置连接地址 factory.setHost("seaof-153-125-234-173.jp-tokyo-10.arukascloud.io"); factory.setPort(31084); //获取连接 Connection connection = factory.newConnection(); //获取渠道 Channel channel = connection.createChannel(); //声明交换机类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //产生随机数字 String message = RandomStringUtils.randomNumeric(8); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); channel.close(); connection.close(); }
public static void main(String[] args) throws IOException, TimeoutException { //建立连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置连接地址 factory.setHost("seaof-153-125-234-173.jp-tokyo-10.arukascloud.io"); factory.setPort(31084); //获取连接 Connection connection = factory.newConnection(); //获取渠道 Channel channel = connection.createChannel(); //声明队列,如果不存在就新建 //参数1队列名称;参数2是否持久化;参数3排他性队列,连接断开自动删除;参数4是否自动删除;参数5.参数 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //发送的消息 String message = Thread.currentThread().getName() + "Hello "; //参数1 交换机;参数2 路由键;参数3 基础属性;参数4 消息体 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(Thread.currentThread().getName() + "[send]" + message); channel.close(); connection.close(); }
/** * 注册主机 * * @param hostPort */ private synchronized static void registerFactory(String hostPort) { if (hostPort == null || hostPort.isEmpty()) return; if (!hostPort.contains(":")) return; String[] params = hostPort.split(":"); if (params.length != 2) { logger.warn("hostPort illegal, length is not 2"); return; } logger.info("registering new factory [" + hostPort + "] ..."); ConnectionFactory factory = new ConnectionFactory(); factory.setHost(params[0]); factory.setPort(Integer.valueOf(params[1])); factory.setAutomaticRecoveryEnabled(automaticRecovery); factory.setNetworkRecoveryInterval(networkRecoveryInterval); factory.setUsername(userName); factory.setPassword(password); ConnectionFactoryManager.getInstance().register(hostPort, factory); }
private ConnectionFactory createConnectionFactory() throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(this.config.getUsername()); factory.setPassword(this.config.getPassword()); factory.setVirtualHost(this.config.getVirtualHost()); factory.setAutomaticRecoveryEnabled(true); factory.setConnectionTimeout(this.config.getConnectionTimeout()); factory.setNetworkRecoveryInterval(this.config.getNetworkRecoveryInterval()); if (this.threadFactory != null) { factory.setThreadFactory(this.threadFactory); } return factory; }
public static ConfigDef config() { return new ConfigDef() .define(HOST_CONFIG, ConfigDef.Type.STRING, ConnectionFactory.DEFAULT_HOST, ConfigDef.Importance.HIGH, HOST_DOC) .define(USERNAME_CONFIG, ConfigDef.Type.STRING, ConnectionFactory.DEFAULT_USER, ConfigDef.Importance.HIGH, USERNAME_DOC) .define(PASSWORD_CONFIG, ConfigDef.Type.STRING, ConnectionFactory.DEFAULT_PASS, ConfigDef.Importance.HIGH, PASSWORD_DOC) .define(VIRTUAL_HOST_CONFIG, ConfigDef.Type.STRING, ConnectionFactory.DEFAULT_VHOST, ConfigDef.Importance.HIGH, VIRTUAL_HOST_DOC) .define(REQUESTED_CHANNEL_MAX_CONFIG, ConfigDef.Type.INT, ConnectionFactory.DEFAULT_CHANNEL_MAX, ConfigDef.Importance.LOW, REQUESTED_CHANNEL_MAX_DOC) .define(REQUESTED_FRAME_MAX_CONFIG, ConfigDef.Type.INT, ConnectionFactory.DEFAULT_FRAME_MAX, ConfigDef.Importance.LOW, REQUESTED_FRAME_MAX_DOC) .define(CONNECTION_TIMEOUT_CONFIG, ConfigDef.Type.INT, ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, ConfigDef.Importance.LOW, CONNECTION_TIMEOUT_DOC) .define(HANDSHAKE_TIMEOUT_CONFIG, ConfigDef.Type.INT, ConnectionFactory.DEFAULT_HANDSHAKE_TIMEOUT, ConfigDef.Importance.LOW, HANDSHAKE_TIMEOUT_DOC) .define(SHUTDOWN_TIMEOUT_CONFIG, ConfigDef.Type.INT, ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT, ConfigDef.Importance.LOW, SHUTDOWN_TIMEOUT_DOC) .define(REQUESTED_HEARTBEAT_CONFIG, ConfigDef.Type.INT, ConnectionFactory.DEFAULT_HEARTBEAT, ConfigDef.Importance.LOW, REQUESTED_HEARTBEAT_DOC) .define(AUTOMATIC_RECOVERY_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, AUTOMATIC_RECOVERY_ENABLED_DOC) .define(TOPOLOGY_RECOVERY_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, TOPOLOGY_RECOVERY_ENABLED_DOC) .define(NETWORK_RECOVERY_INTERVAL_CONFIG, ConfigDef.Type.INT, 10000, ConfigDef.Importance.LOW, NETWORK_RECOVERY_INTERVAL_DOC) .define(PORT_CONFIG, ConfigDef.Type.INT, ConnectionFactory.DEFAULT_AMQP_PORT, ConfigDef.Importance.MEDIUM, PORT_DOC); }
/** * @param args * @throws TimeoutException * @throws IOException * @throws InterruptedException * @throws ConsumerCancelledException * @throws ShutdownSignalException */ public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println("RPCServer Awating RPC request"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(props.getCorrelationId()) .build(); String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println("RPCServer fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午2:57:32 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行多重绑定 for (String severity : routingKeys2) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, severity); System.out.println("ReceiveLogsDirect2 exchange:" + EXCHANGE_NAME_ROUTING + ", queue:" + queueName + ", BindRoutingKey:" + severity); } System.out.println("ReceiveLogsDirect2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:53:18 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行绑定 for (String routingKey : routingKeys1) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, routingKey); System.out.println("ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME_ROUTING + "," + " queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsDirect1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月11日 下午5:21:46 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = null; Channel channel = null; try { // 创建一个新的连接 connection = factory.newConnection(); // 创建一个通道 channel = connection.createChannel(); // 声明一个队列 // queueDeclare第一个参数表示队列名称 //第二个参数为是否持久化(true表示是,队列将在服务器重启时生存) //第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除) //第四个参数为当所有消费者客户端连接断开时是否自动删除队列 //第五个参数为队列的其他参数 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "{\"temperature\":100}"; // 发送消息到队列中 //basicPublish第一个参数为交换机名称 //第二个参数为队列映射的路由key //第三个参数为消息的其他属性 //第四个参数为发送信息的主体 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("Producer Send +'" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭通道和连接 channel.close(); connection.close(); } }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月11日 下午5:32:45 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ地址 factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); // 创建一个新的连接 Connection connection = factory.newConnection(); // 创建一个通道 Channel channel = connection.createChannel(); // 声明要关注的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Customer Waiting Received messages"); // DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery Consumer consumer = new DefaultConsumer(channel) { //envelope主要存放生产者相关信息(比如交换机、路由key等) //body是消息实体 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Customer Received '" + message + "'"); } }; // 自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(QUEUE_NAME, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:40:52 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 产生一个随机的队列名称 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");// 对队列进行绑定 System.out.println("ReceiveLogs1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogs1 Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer);// 队列会自动删除 }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午3:03:24 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); connection = factory.newConnection(); channel = connection.createChannel(); // 声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); // 待发送的消息 String[] routingKeys = new String[] { "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit" }; // 发送消息 for (String severity : routingKeys) { String message = "From " + severity + " routingKey' s message!"; channel.basicPublish(EXCHANGE_NAME_TOPIC, severity, null, message.getBytes()); System.out.println("TopicSend Sent '" + severity + "':'" + message + "'"); } } catch (Exception e) { e.printStackTrace(); if (connection != null) { channel.close(); connection.close(); } } finally { if (connection != null) { channel.close(); connection.close(); } } }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午3:08:40 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换器 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; // 绑定路由关键字 for (String bindingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, bindingKey); System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME_TOPIC+", queue:"+queueName+", BindRoutingKey:" + bindingKey); } System.out.println("ReceiveLogsTopic2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午3:06:20 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[] { "*.orange.*" }; // 绑定路由 for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, routingKey); System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME_TOPIC + ", queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsTopic1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
@Test public void createConnectionFactory() { int requestedHeartbeat = 121, port = 8889; String host = "testHost", password = "testPassword", username = "testUsername", virtualHost = "testVirtualHost"; RabbitMqConfig rabbitMqConfig = new RabbitMqConfigBuilder() .setRequestedHeartbeat(requestedHeartbeat) .setHost(host) .setPassword(password) .setPort(port) .setUsername(username) .setVirtualHost(virtualHost) .build(); RabbitMqChannelProvider rabbitMqChannelProvider = new RabbitMqChannelProvider(rabbitMqConfig); ConnectionFactory connectionFactory = rabbitMqChannelProvider.createConnectionFactory(); assertEquals(requestedHeartbeat, connectionFactory.getRequestedHeartbeat()); assertEquals(port, connectionFactory.getPort()); assertEquals(host, connectionFactory.getHost()); assertEquals(password, connectionFactory.getPassword()); assertEquals(username, connectionFactory.getUsername()); assertEquals(virtualHost, connectionFactory.getVirtualHost()); }
@Override public void open() { try { ConnectionFactory factory = new ConnectionFactory(); // factory.setHost( ); factory.setUri(this.uri); if (this.username != null) { factory.setUsername(this.username); factory.setPassword(this.password); } this.connection = factory.newConnection(); this.channel = this.connection.createChannel(); channel.queueDeclare(this.queue, false, false, false, null); } catch (Exception e) { logger.warn(e.getMessage()); } }
public void connectToBroker(){ try { /*Get a ConnectionFactory object */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(port); factory.setUsername(user); factory.setPassword(password); /* Create a connection */ connection = factory.newConnection(); /* Create a channel over that TCP/IP connection */ channel = connection.createChannel(); } catch(Exception e){ e.printStackTrace(); } }
@POST @Path("rabbitmqRecv") public void send() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(properties.getHeaders()); String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); }
public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) {} } } }
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }
public static Connection connection() throws IOException, TimeoutException { if(connection == null) { synchronized(GabrielData.class) { if(connection != null) return connection; Config config = config(); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(config.rabbitMQHost); connectionFactory.setPort(config.rabbitMQPort); connectionFactory.setUsername(config.rabbitMQUsername); connectionFactory.setPassword(config.rabbitMQPassword); connection = connectionFactory.newConnection(); generalPurposeChannel = connection.createChannel(); GatewayInfo.init(generalPurposeChannel); } } return connection; }
protected void connect() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); channel = connection.createChannel(); String queueName = "flowing-retail-" + name; channel.queueDeclare(queueName, true, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); // publish/subscribe model channel.queueBind(queueName, EXCHANGE_NAME, "*"); System.out.println(" [*] Waiting for messages."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); eventHandler.handleEvent(message); } }; channel.basicConsume(queueName, true, consumer); }
public static void main(String[] args) throws Exception { String queueName = "TestQueue"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }