private void initRabbitMQ() throws IOException { Server.LOGGER.info("Initialization of the Notifications channel"); mRabbitMQManager.getChannel().addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException cause) { cause.printStackTrace(); } }); mRabbitMQManager.getChannel().exchangeDeclare("Pub", BuiltinExchangeType.FANOUT, true); String queueName = mRabbitMQManager.getChannel().queueDeclare().getQueue(); mRabbitMQManager.getChannel().queueBind(queueName, "Pub", ""); mRabbitMQManager.getChannel().basicConsume(queueName, true, new RabbitMQConsumer(this, mRabbitMQManager.getChannel())); Server.LOGGER.info("Initialization of the Pub channel done."); }
@Override public void declareQueue() throws IOException { RabbitAdmin rabbitAdmin = rabbitAdmin(); try { rabbitAdmin.declareQueue(new Queue("demo2",false,false,false)); }catch (Exception e){ try { if (406 == ((AMQImpl.Channel.Close) ((ShutdownSignalException)e.getCause().getCause()).getReason()).getReplyCode()) { rabbitAdmin.deleteQueue("demo2"); declareQueue(); } }catch (Exception e1){ } log.error("e 's value : {}", e); } }
/** * @param args * @throws TimeoutException * @throws IOException * @throws InterruptedException * @throws ConsumerCancelledException * @throws ShutdownSignalException */ public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println("RPCServer Awating RPC request"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(props.getCorrelationId()) .build(); String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println("RPCServer fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
@Test public void testConnection_WithOpenStack_WithIgnoreControllerException_WithIgnoreProviderException_WhenRabbitClientConnectionThrowsSignalException_ThrowsErrorTypeException() throws Throwable { // Arrange. this.exception.expect(ErrorTypeException.class); DryRunRequest<VirtualizationConnectorRequest> request = VirtualizationConnectorUtilTestData.generateOpenStackVCWithSDN(); List<ErrorType> errorList = new ArrayList<>(); errorList.add(ErrorType.CONTROLLER_EXCEPTION); errorList.add(ErrorType.PROVIDER_EXCEPTION); request.addErrorsToIgnore(errorList); VirtualizationConnector vc = VirtualizationConnectorEntityMgr.createEntity(request.getDto(), this.encrypter); doThrow(mock(ShutdownSignalException.class)).when(this.rabbitClient).testConnection(); DryRunRequest<VirtualizationConnectorRequest> spyRequest = spy(request); // Act. this.util.checkConnection(spyRequest, vc); // Assert. verify(spyRequest, times(1)).isIgnoreErrorsAndCommit(ErrorType.CONTROLLER_EXCEPTION); verify(spyRequest, times(1)).isIgnoreErrorsAndCommit(ErrorType.PROVIDER_EXCEPTION); verify(this.rabbitClient, times(1)).testConnection(); }
private void startSyncReceiveThread(final QueueingConsumer consumer, final boolean autoAck, final BindingVo binding) { syncReceiveThread = new SyncReceiveThread() { @Override public void run() { log.info("start listen to the " + typeStr + "[" + queue.getName() + "]."); while (running) { try { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); XCO xcoMessage = getMessage(delivery.getBody()); log.info("received a message from " + typeStr + "[" + queue.getName() + "]: " + xcoMessage); boolean result = exec(service, xcoMessage, binding); if (!autoAck && result) { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (ShutdownSignalException e) { // TODO 可能会出现断链的问题 e.printStackTrace(); } catch (Throwable e) { log.error("listen to the [" + queue.getName() + "] error.", e); } } closed = true; } }; syncReceiveThread.start(); }
/**** * This method is used to publish the message to RabbitMQ * @param routingKey * @param msg is Eiffel Event * @throws IOException */ public void send(String routingKey, String msg) throws IOException { Channel channel = giveMeRandomChannel(); channel.addShutdownListener(new ShutdownListener() { public void shutdownCompleted(ShutdownSignalException cause) { // Beware that proper synchronization is needed here if (cause.isInitiatedByApplication()) { log.debug("Shutdown is initiated by application. Ignoring it."); } else { log.error("Shutdown is NOT initiated by application."); log.error(cause.getMessage()); boolean cliMode = Boolean.getBoolean(PropertiesConfig.CLI_MODE); if (cliMode) { System.exit(-3); } } } }); BasicProperties msgProps = MessageProperties.BASIC; if (usePersitance) msgProps = MessageProperties.PERSISTENT_BASIC; channel.basicPublish(exchangeName, routingKey, msgProps, msg.getBytes()); log.info("Published message with size {} bytes on exchange '{}' with routing key '{}'", msg.getBytes().length, exchangeName, routingKey); }
@Override public void shutdownCompleted(final ShutdownSignalException shutdownSignalException) { if (!shutdownSignalException.isInitiatedByApplication()) { for (final String subscriberId : s_subscribers.keySet()) { final Ternary<String, Channel, EventSubscriber> subscriberDetails = s_subscribers.get(subscriberId); subscriberDetails.second(null); s_subscribers.put(subscriberId, subscriberDetails); } abortConnection(); // disconnected to AMQP server, so abort the connection and channels s_logger.warn("Connection has been shutdown by AMQP server. Attempting to reconnect."); // initiate re-connect process final ReconnectionTask reconnect = new ReconnectionTask(); executorService.submit(reconnect); } }
public void destroy(){ log.debug("Destroying '" + CMPT_NAME + "' component"); if(null != conn){ try{ if(conn.isOpen()){ conn.close(); } conn = null; }catch(IOException ioe){ log.error("IOException while destroying "+ CMPT_NAME +" component: ", ioe); }catch(ShutdownSignalException sse){ log.error("Shutdown Exception while destroying "+ CMPT_NAME + " component: ", sse); }catch(Exception e){ log.error("Caught unhandled exception while destroying " + CMPT_NAME + " component: ", e); } } log.info("\tdestroyed."); }
/*** * getParam().getWaitTime() 指定消息池为空时的堵塞超时 * */ @Override public String readOneMessage() { try { channel.basicConsume(getParam().getQueue(), false, consumer); QueueingConsumer.Delivery delivery = consumer.nextDelivery(getParam().getWaitTime()); if (delivery != null) { deliveryTag = delivery != null ? delivery.getEnvelope().getDeliveryTag() : deliveryTag; String msg = getMessageContent(delivery); return msg; } else return null; } catch (IOException | ShutdownSignalException | ConsumerCancelledException | InterruptedException e) { throw new MqReceiveException(e); } }
public final void close() throws IOException, ShutdownSignalException { if (this.channel != null) { this.channel.close(); Log.d("libretalk::LibretalkConnection", "Closed LibretalkConnection#channel"); } if (this.connection != null) { this.connection.close(); Log.d("libretalk::LibretalkConnection", "Closed LibretalkConnection#connection"); } this.status = ConnectionStatus.NOT_CONNECTED; }
public void sendMessage(File messageFile) throws IOException, ShutdownSignalException, InterruptedException{ InputStream is = new FileInputStream(messageFile); // Get the size of the file long length = messageFile.length(); if (length > Integer.MAX_VALUE) { throw new IOException("Input File ("+messageFile.getName()+") is to large! "); } byte[] messageBodyBytes = new byte[(int)length]; int offset = 0; int numRead = 0; while (offset < messageBodyBytes.length && (numRead=is.read(messageBodyBytes, offset, messageBodyBytes.length-offset)) >= 0) { offset += numRead; } if (offset < messageBodyBytes.length) { throw new IOException("Could not completely read file "+messageFile.getName()); } is.close(); this.channel.basicPublish(this.ExchangeName, this.RoutingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes) ; }
/** * Helper method to close connection in RMQChannel * * @throws ShutdownSignalException * @throws IOException * @throws AlreadyClosedException */ public void closeConnection() throws ShutdownSignalException, IOException, AlreadyClosedException{ if (connection != null) { try { channel.close(); } catch (Exception e) { //ignore as the connection gets closed anyway } channel = null; try { connection.close(); } finally { connection = null; } } }
@Override public void run() { while(!shutdown) { try { Thread.yield(); buffer.waitNotEmpty(); observer.onCallback(); } catch(ShutdownSignalException e) { shutdown=true; log.debug("closed connection of receiver."); } catch(Throwable t) { log.error(t.getMessage()); t.printStackTrace(); System.exit(-1); throw new RuntimeException("Problem during run! "+t.getMessage()); } } }
@Override public void run() { while(!shutdown) { try { Thread.yield(); T m = receive(); // log.info("got:" + m.toString()); if(buffer!=null) { log.error("previous packet has not been picket up! discarded:"+buffer); } buffer = m; observer.onCallback(); } catch(ShutdownSignalException e) { shutdown=true; log.debug("closed connection"); } catch(Throwable t) { log.error(t.getMessage()); t.printStackTrace(); System.exit(-1); throw new RuntimeException("Problem during run! "+t.getMessage()); } } }
@Override public void shutdownCompleted(ShutdownSignalException shutdownSignalException) { if (!shutdownSignalException.isInitiatedByApplication()) { for (String subscriberId : s_subscribers.keySet()) { Ternary<String, Channel, EventSubscriber> subscriberDetails = s_subscribers.get(subscriberId); subscriberDetails.second(null); s_subscribers.put(subscriberId, subscriberDetails); } abortConnection(); // disconnected to AMQP server, so abort the connection and channels s_logger.warn("Connection has been shutdown by AMQP server. Attempting to reconnect."); // initiate re-connect process ReconnectionTask reconnect = new ReconnectionTask(); executorService.submit(reconnect); } }
@Override public void shutdownCompleted(ShutdownSignalException e) { channelShutdown(); if (!e.isInitiatedByApplication()) { log.error("Channel {} was closed unexpectedly", ChannelHandler.this); lastShutdownSignal = e; if (!Exceptions.isConnectionClosure(e) && canRecover()) ConnectionHandler.RECOVERY_EXECUTORS.execute(new Runnable() { @Override public void run() { try { recoveryPending.set(true); recoverChannel(false); } catch (Throwable ignore) { } } }); } }
/** * Returns an answer that fails n times for each thread, throwing t for the first n invocations * and returning {@code returnValue} thereafter. Prior to throwing t, the connection handler's * shutdown listener is completed if t is a connection shutdown signal. */ protected <T> Answer<T> failNTimes(final int n, final Throwable t, final T returnValue, final RetryableResource resource) { return new Answer<T>() { AtomicInteger failures = new AtomicInteger(); @Override public T answer(InvocationOnMock invocation) throws Throwable { if (failures.getAndIncrement() >= n) return returnValue; if (t instanceof ShutdownSignalException) callShutdownListener(resource, (ShutdownSignalException) t); if (t instanceof ShutdownSignalException && !(t instanceof AlreadyClosedException)) throw new IOException(t); else throw t; } }; }
private void performInvocation(final ShutdownSignalException e, final Waiter waiter) throws Throwable { mockConnection(); mockInvocation(e); closeConnectionAfterDelay(); runInThread(new Runnable() { public void run() { try { connectionProxy.createChannel(); waiter.fail("Invocation should have thrown an exception"); } catch (Exception actual) { if (!actual.equals(e)) actual.printStackTrace(); waiter.assertEquals(actual, e); waiter.resume(); } } }); waiter.await(10000); }
private void performInvocation(final ShutdownSignalException e, final Waiter waiter) throws Throwable { mockConnection(); mockInvocation(e); closeChannelAfterDelay(); runInThread(new Runnable() { public void run() { try { mockChannel(1).proxy.basicCancel("foo-tag"); waiter.fail("Invocation should have thrown an exception"); } catch (Exception expected) { waiter.assertEquals(e, expected); waiter.resume(); } } }); waiter.await(100000); }
void performInvocation(ShutdownSignalException invocationFailure, Exception recoveryFailure) throws Throwable { mockConnection(); mockInvocation(invocationFailure); if (recoveryFailure != null) mockRecovery(recoveryFailure); final Waiter waiter = new Waiter(); runInThread(new Runnable() { public void run() { try { performInvocation(); waiter.resume(); } catch (Throwable t) { waiter.fail(t); } } }); waiter.await(1000); }
void performThrowableInvocation(ShutdownSignalException invocationFailure, Exception recoveryFailure) throws Throwable { mockConnection(); mockInvocation(invocationFailure); if (recoveryFailure != null) mockRecovery(recoveryFailure); try { performInvocation(); fail("Invocation should have thrown an exception"); } catch (IOException expected) { assertEquals(invocationFailure, expected.getCause()); } Thread.sleep(100); }
/** * I am wondering if I need to do anything here. The connection shutdown listener SHOULD get this signal * and handle it there. I am thinking I don't have to do anything... */ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { /* * This will add POISON to a Queueing consumer local queue buffer causing a ShutdownSignalException * to be thrown when the application code asks for the queue entry. This is probably ok and desierable. * We don't want to comment this out, because a ShutdownSignal could have been initiated by the application * via a close() call. */ log.debug("Consumer {} proxy recieved a Shutdown Signal {}. {}", consumerTag, sig.isInitiatedByApplication() ? "initiated by the app" : "not initiated by the app", sig.toString()); log.debug(HaUtils.shouldReconnect(sig)?"I will ask to reconnect":"I will not ask to reconnect"); if(HaUtils.shouldReconnect(sig)) { try { haChannel.askConnectionToReconnect(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("Thread was interrupted"); throw new RuntimeException(e); } } else { //TODO: is it ok to skip the shutdownSignal??? //delegateConsumer.handleShutdownSignal(consumerTag, sig); } }
/** * 当重复声明同一名称队列时: * 1、参数相同,"不作为" * 2、参数不同,错误码406 * @throws IOException */ @Override public void declareQueue() throws IOException { Connection connection = connection(connectionFactory()); Channel channel = connection.createChannel(); try { channel.queueDeclare("demo1",true,false,false,null); }catch (IOException e){ if ("406".equals(((AMQImpl.Channel.Close) (((ShutdownSignalException)e.getCause())).getReason()).getReplyCode())){ channel = connection.createChannel(); channel.queueDelete("demo1"); channel.queueDeclare("demo1",false,false,false,null); } } }
@Test public void testConnection_WithOpenStack_WithIgnoreControllerException_WithIgnoreProviderException_WhenRabbitClientConnectionThrowsSignalException_WhenMqClientIsNotConnected_ThrowsErrorTypeException() throws Throwable { // Arrange. this.exception.expect(ErrorTypeException.class); DryRunRequest<VirtualizationConnectorRequest> request = VirtualizationConnectorUtilTestData.generateOpenStackVCWithSDN(); request.getDto().setId(20l); List<ErrorType> errorList = new ArrayList<>(); errorList.add(ErrorType.CONTROLLER_EXCEPTION); errorList.add(ErrorType.PROVIDER_EXCEPTION); request.addErrorsToIgnore(errorList); RabbitMQRunner runner = Mockito.mock(RabbitMQRunner.class); this.util.activeRunner = runner; @SuppressWarnings("unchecked") HashMap<Long, OsRabbitMQClient> map = mock(HashMap.class); when(runner.getVcToRabbitMQClientMap()).thenReturn(map); VirtualizationConnector vc = VirtualizationConnectorEntityMgr.createEntity(request.getDto(), this.encrypter); doThrow(mock(ShutdownSignalException.class)).when(this.rabbitClient).testConnection(); DryRunRequest<VirtualizationConnectorRequest> spyRequest = spy(request); // Act. this.util.checkConnection(spyRequest, vc); // Assert. verify(spyRequest, times(1)).isIgnoreErrorsAndCommit(ErrorType.CONTROLLER_EXCEPTION); verify(spyRequest, times(1)).isIgnoreErrorsAndCommit(ErrorType.PROVIDER_EXCEPTION); verify(this.rabbitClient, times(1)).testConnection(); }
@Test public void testConnection_WithOpenStack_WithIgnoreControllerException_WithIgnoreProviderException_WhenRabbitClientConnectionThrowsSignalException_WhenMqClientIsConnected_ReturnsSuccessful() throws Throwable { // Arrange. DryRunRequest<VirtualizationConnectorRequest> request = VirtualizationConnectorUtilTestData.generateOpenStackVCWithSDN(); RabbitMQRunner runner = Mockito.mock(RabbitMQRunner.class); this.util.activeRunner = runner; @SuppressWarnings("unchecked") HashMap<Long, OsRabbitMQClient> map = mock(HashMap.class); when(runner.getVcToRabbitMQClientMap()).thenReturn(map); OsRabbitMQClient mqClient = mock(OsRabbitMQClient.class); doReturn(mqClient).when(map).get(any(Integer.class)); doReturn(true).when(mqClient).isConnected(); request.getDto().setId(20l); List<ErrorType> errorList = new ArrayList<>(); errorList.add(ErrorType.CONTROLLER_EXCEPTION); errorList.add(ErrorType.PROVIDER_EXCEPTION); request.addErrorsToIgnore(errorList); VirtualizationConnector vc = VirtualizationConnectorEntityMgr.createEntity(request.getDto(), this.encrypter); doThrow(mock(ShutdownSignalException.class)).when(this.rabbitClient).testConnection(); DryRunRequest<VirtualizationConnectorRequest> spyRequest = spy(request); // Act. this.util.checkConnection(spyRequest, vc); // Assert. verify(spyRequest, times(1)).isIgnoreErrorsAndCommit(ErrorType.CONTROLLER_EXCEPTION); verify(spyRequest, times(1)).isIgnoreErrorsAndCommit(ErrorType.PROVIDER_EXCEPTION); verify(this.rabbitClient, times(1)).testConnection(); }
@Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { if (sig.isInitiatedByApplication()) { LOGGER.info("Handled shutdown signal"); } else { LOGGER.error("Handled shutdown signal", sig); } }
/** * Retrieves the benchmarks registered at the HOBBIT PlatformController * * @return A list of benchmarks * @throws IOException * @throws InterruptedException * @throws ConsumerCancelledException * @throws ShutdownSignalException * If something goes wrong with the request */ public List<BenchmarkBean> requestBenchmarks() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { LOGGER.info("Sending request..."); byte[] data = client.request(new byte[] { FrontEndApiCommands.LIST_AVAILABLE_BENCHMARKS }); if (data == null) { throw new IOException("Didn't got a response."); } LOGGER.info("Parsing response..."); // parse the response String jsonString = RabbitMQUtils.readString(data); Collection<BenchmarkMetaData> benchmarks = gson.fromJson(jsonString, new TypeToken<Collection<BenchmarkMetaData>>() { }.getType()); LOGGER.info("Preparing response for GUI..."); // Create output List<BenchmarkBean> benchmarkBeans = new ArrayList<BenchmarkBean>(); for (BenchmarkMetaData benchmark : benchmarks) { benchmarkBeans.add( new BenchmarkBean(benchmark.benchmarkUri, benchmark.benchmarkName, benchmark.benchmarkDescription)); } LOGGER.debug(Arrays.toString(benchmarkBeans.toArray())); LOGGER.info("Sending response to GUI..."); return benchmarkBeans; }
public void begin() throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException, SpiderSettingFileException{ readSetting(); logger.info("worker [" + this.settingObject.getWorkerid() + "] start..."); for (Entry<String, MQRecver> recv : this.recvfromMap.entrySet()) { new Thread(new RecvThread(this, recv.getKey(), recv.getValue(), this.sendtoMap)).start(); } }
public static Map<String, Object> mapCall( RabbitConnection con, String routingKey, int timeout, Map<String, Object> args ) throws IOException, ShutdownSignalException, TimeoutException { try( RabbitRPCClient client = new RabbitRPCClient( con, RabbitMQ.DEFAULT_TOPIC, routingKey, timeout ) ) { return client.mapCall( args ); } }
public static void execute( RabbitConnection con, String routingKey, int timeout, Map<String, Object> args, Consumer<Map<String, Object>> action ) throws IOException, ShutdownSignalException, TimeoutException { try( RabbitRPCClient client = new RabbitRPCClient( con, RabbitMQ.DEFAULT_TOPIC, routingKey, timeout ) ) { client.execute( args, action ); } }
private byte[] primitiveCall( AMQP.BasicProperties props, byte[] message ) throws IOException, ShutdownSignalException, TimeoutException { checkConsumer(); BlockingCell<Object> k = new BlockingCell<>(); String replyId = RabbitMQ.newCorrelationId(); props = ((props == null) ? new AMQP.BasicProperties.Builder() : props.builder()) .correlationId( replyId ) .replyTo( replyQueue ) .build(); continuationMap.put( replyId, k ); publish( props, message ); Object reply = k.uninterruptibleGet( timeout ); if( reply instanceof ShutdownSignalException ) { ShutdownSignalException sig = (ShutdownSignalException) reply; ShutdownSignalException wrapper = new ShutdownSignalException( sig.isHardError(), sig.isInitiatedByApplication(), sig.getReason(), sig.getReference() ); wrapper.initCause( sig ); throw wrapper; } else { return (byte[]) reply; } }
public Map<String, Object> mapCall( Map<String, Object> message ) throws IOException, ShutdownSignalException, TimeoutException { byte[] reply = primitiveCall( null, RabbitMQ.toAMQPTable( message ) ); return RabbitMQ.fromAMQPTable( reply ); }
public void execute( Map<String, Object> args, Consumer<Map<String, Object>> action ) throws IOException, ShutdownSignalException, TimeoutException { Map<String, Object> ret = mapCall( args ); if( ret != null ) { action.accept( ret ); } }
private Observable<Message> createObservable() { final AtomicReference<InternalConsumer> consumerRef = new AtomicReference<>(null); final ConnectionRetryHandler retryHandler = new ConnectionRetryHandler(backoffAlgorithm, maxReconnectAttempts); return create((Observable.OnSubscribe<Message>) subscriber -> { if (!subscriber.isUnsubscribed()) { try { startConsuming(subscriber, consumerRef); } catch (Exception e) { Throwable rootCause; if (e instanceof IOException && e.getCause() != null && e.getCause() instanceof ShutdownSignalException) { rootCause = e.getCause(); } else { rootCause = e; } log.errorWithParams("Unexpected error when registering the rabbit consumer on the broker.", "error", rootCause); subscriber.onError(e); } } }) // If we ever successfully get a message, we should reset the error handler .doOnNext(message -> retryHandler.reset()) // On error, make sure to close the existing channel with an error before using the retryHandler .doOnError(throwable -> terminate(consumerRef)) .retryWhen(retryHandler) // handle back pressure by buffering .onBackpressureBuffer() // If someone unsubscribes, close the channel cleanly .doOnUnsubscribe(() -> close(consumerRef)); }
@Override public synchronized void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { log.errorWithParams("The rabbit connection was unexpectedly disconnected.", sig, "channel", channel.toString(), "queue", channel.getQueue(), "consumerTag", consumerTag); subscriber.onError(sig); }
/** * No-op implementation of {@link Consumer#handleShutdownSignal}. */ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { log.info("Recieved shutdown signal on the rabbitMQ channel"); // Check if the consumer closed the connection or something else if (!sig.isInitiatedByApplication()) { // Something else closed the connection so reconnect boolean connected = false; while (!connected && !stopping) { try { reconnect(); connected = true; } catch (IOException | TimeoutException e) { log.warn("Unable to obtain a RabbitMQ channel. Will try again"); Integer networkRecoveryInterval = consumer.getEndpoint().getNetworkRecoveryInterval(); final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L; try { Thread.sleep(connectionRetryInterval); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } } } } }
/** * {@inheritDoc} */ @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { LOG.debug("Consumer: {} Received shutdown signal: {}", consumerTag, sig.getMessage()); }
public RabbitPubSubMsg consume() throws ShutdownSignalException, ConsumerCancelledException, InterruptedException { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); RabbitPubSubMsg ret = new RabbitPubSubMsg(message, routingKey); return ret; }
@Override public void shutdownCompleted(ShutdownSignalException cause) { if (cause.isInitiatedByApplication()) { return; } log.info("RabbitMQ connection shutdown! The client will attempt to reconnect automatically", cause); }