/** * @throws ChannelException if some problems during connecting to Broker were occurred */ protected Connection createConnection(ConnectionFactory connectionFactory) { try { LOG.info("Opening AMQP connection to host = {}, port = {}, username = {}, password = xxx, virtualHost = {}...", connectionFactory.getHost(), connectionFactory.getPort(), connectionFactory.getUsername(), connectionFactory.getVirtualHost()); Connection connection = connectionFactory.newConnection(); if (connection instanceof Recoverable) { // This cast is possible for connections created by a factory that supports auto-recovery ((Recoverable) connection).addRecoveryListener(recoverable -> LOG.info("AMQP connection recovered.")); } LOG.info("AMQP connection opened."); return connection; } catch (IOException | TimeoutException e) { throw new ChannelException("Failed to obtain connection to AMQP broker", e); } }
@Override public void openConnection() throws IOException { // Already connected? Do nothing this.logger.info( getId() + " is opening a connection to RabbitMQ." ); if( isConnected()) { this.logger.info( getId() + " has already a connection to RabbitMQ." ); return; } // Initialize the connection ConnectionFactory factory = new ConnectionFactory(); RabbitMqUtils.configureFactory( factory, this.configuration ); this.channel = factory.newConnection().createChannel(); this.logger.info( getId() + " established a new connection with RabbitMQ. Channel # " + this.channel.getChannelNumber()); // Be notified when a message does not arrive in a queue (i.e. nobody is listening) this.channel.addReturnListener( new RoboconfReturnListener()); // Add a recoverable listener (when broken connections are recovered). // Given the way the RabbitMQ factory is configured, the channel should be "recoverable". ((Recoverable) this.channel).addRecoveryListener( new RoboconfRecoveryListener()); // Declare the exchanges. RabbitMqUtils.declareGlobalExchanges( this.domain, this.channel ); RabbitMqUtils.declareApplicationExchanges( this.domain, this.applicationName, this.channel ); // Declare the dedicated queue. String queueName = getQueueName(); this.channel.queueDeclare( queueName, true, false, true, null ); // Start listening to messages. RoboconfConsumer consumer = new RoboconfConsumer( getId(), this.channel, this.messageQueue ); consumer.handleConsumeOk( queueName ); this.consumerTag = this.channel.basicConsume( queueName, true, consumer ); this.logger.finer( "A new consumer tag was created: " + this.consumerTag ); }
/**
 * Logs the recovery of a channel.
 * <p>
 * Recoverable objects that are not channels are ignored.
 *
 * @param recoverable the object that was recovered
 */
@Override
public void handleRecovery( Recoverable recoverable ) {

    // Nothing to report for anything but a channel.
    if( !( recoverable instanceof Channel )) {
        return;
    }

    Channel recoveredChannel = (Channel) recoverable;
    int number = recoveredChannel.getChannelNumber();
    this.logger.fine( "Connection to channel #" + number + " was recovered." );
}
@Test public void testHandleRecovery() { // Channel being recovered RoboconfRecoveryListener listener = new RoboconfRecoveryListener(); AutorecoveringChannel ch = Mockito.mock( AutorecoveringChannel.class ); listener.handleRecovery( ch ); Mockito.verify( ch, Mockito.only()).getChannelNumber(); // Not a channel (e.g. a connection) Recoverable recoverable = Mockito.mock( Recoverable.class ); listener.handleRecovery( recoverable ); Mockito.verifyZeroInteractions( recoverable ); }
/**
 * Creates a new connection, trying connection factories until one succeeds.
 * <p>
 * A factory is obtained from {@code pickOne()} for every attempt. When an attempt
 * fails, the factory's {@code host:port} is passed to {@code kick(...)} and another
 * attempt is made as long as the factory manager still reports available factories.
 * A shutdown listener is registered on the resulting connection, and a recovery
 * listener as well when the connection is {@link Recoverable}.
 *
 * @return the newly created connection
 * @throws Exception if no connection could be obtained; the last failure, if any,
 *         is attached as the cause
 */
@Override
public Connection create() throws Exception {
    ConnectionFactory factory = null;
    Connection connection = null;
    Exception lastFailure = null;

    do {
        // Reset on every attempt, so that a failure of pickOne() is never
        // attributed to (and never kicks) the factory of a previous attempt.
        factory = null;
        try {
            factory = pickOne();
            connection = factory.newConnection();
        } catch (Exception e) {
            lastFailure = e;
            if (factory == null) {
                // No factory could be picked: there is nothing to kick, and retrying
                // could loop forever. Give up and report the failure below.
                logger.error("fail to pick a connection factory, giving up...", e);
                break;
            }

            String hostAndPort = factory.getHost() + ":" + factory.getPort();
            logger.error("fail to create new connection from factory: [" + hostAndPort
                    + "], kicking this one out and retry...", e);
            kick(hostAndPort);
        }
    } while (connection == null
            && !ConnectionFactoryManager.getInstance().getAvailableFactories().keySet().isEmpty());

    if (connection == null) {
        throw new Exception("fail to get new connection. no hosts left to use.", lastFailure);
    }

    /* ADD CONNECTION & CHANNEL CONNECTION LISTENER */
    connection.addShutdownListener(new ShutdownListener() {
        @Override
        public void shutdownCompleted(ShutdownSignalException cause) {
            String level = cause.isHardError() ? "connection" : "channel";
            String initiator = cause.isInitiatedByApplication() ? "application" : "broker";
            logger.warn("Connectivity to MQ has failed. It was caused by " + initiator
                    + " at the " + level + " level. Reason received " + cause.getReason());
        }
    });

    // Only connections created with automatic recovery enabled are Recoverable;
    // check instead of casting blindly to avoid a ClassCastException.
    if (connection instanceof Recoverable) {
        ((Recoverable) connection).addRecoveryListener(new RecoveryListener() {
            @Override
            public void handleRecovery(Recoverable recoverable) {
                if (recoverable instanceof Connection) {
                    logger.info("Connection was recovered.");
                } else if (recoverable instanceof Channel) {
                    int channelNumber = ((Channel) recoverable).getChannelNumber();
                    logger.info("Connection to channel #" + channelNumber + " was recovered.");
                }
            }

            public void handleRecoveryStarted(Recoverable arg0) {
                // Nothing to do when a recovery starts.
            }
        });
    } else {
        logger.warn("Connection does not support automatic recovery. No recovery listener was registered.");
    }
    /* ADD CONNECTION & CHANNEL CONNECTION LISTENER */

    logger.info("new connection was establesed...from host <- [" + factory.getHost() + ":" + factory.getPort() + "]");
    return connection;
}