/**
 * Re-declares a queue on the channel returned by {@code getRecoveryChannel()} and reports the
 * name the queue has after recovery.
 *
 * <p>When the declared name differs from {@code queueName}, the new name is written to
 * {@code queueDeclaration.name} before it is returned.
 *
 * @param queueName the name the queue was known by before recovery
 * @param queueDeclaration the declaration to re-invoke against the recovery channel
 * @return the name reported by the re-declaration, or {@code queueName} when recovery failed and
 *         the failure was not propagated
 * @throws Exception the recovery failure, when {@code throwOnRecoveryFailure()} is true or the
 *         failure was caused by a connection closure
 */
String recoverQueue(String queueName, QueueDeclaration queueDeclaration) throws Exception {
  try {
    Queue.DeclareOk declareOk =
        (Queue.DeclareOk) queueDeclaration.invoke(getRecoveryChannel());
    String recoveredName = declareOk.getQueue();

    if (!queueName.equals(recoveredName)) {
      // The queue came back under a different name; remember it on the declaration.
      log.info("Recovered queue {} as {} via {}", queueName, recoveredName, this);
      queueDeclaration.name = recoveredName;
      return recoveredName;
    }

    log.info("Recovered queue {} via {}", queueName, this);
    return recoveredName;
  } catch (Exception failure) {
    log.error("Failed to recover queue {} via {}", queueName, this, failure);
    boolean propagate =
        throwOnRecoveryFailure() || Exceptions.isCausedByConnectionClosure(failure);
    if (propagate) {
      throw failure;
    }
    // Best effort: keep using the original name when the failure is tolerated.
    return queueName;
  }
}
/**
 * Sets up the fixture: stubs {@code queueDeclare} for "test-queue" on both the recovery channel
 * and the delegate of channel 1, then declares an exchange, declares the queue and binds the two
 * through the channel proxy.
 */
@Override
@SuppressWarnings("unchecked")
void createResources() throws IOException {
  final String queueName = "test-queue";
  final String exchangeName = "test-exchange";

  Channel recoveryChannel = mockRecoveryChannel();
  MockChannel mockedChannel = mockChannel(1);

  // A single DeclareOk answer is shared by both stubbed channels.
  Queue.DeclareOk queueDeclared = mock(Queue.DeclareOk.class);
  when(queueDeclared.getQueue()).thenReturn(queueName);

  when(recoveryChannel.queueDeclare(
      eq(queueName), anyBoolean(), anyBoolean(), anyBoolean(), anyMap()))
      .thenReturn(queueDeclared);
  when(mockedChannel.delegate.queueDeclare(
      eq(queueName), anyBoolean(), anyBoolean(), anyBoolean(), anyMap()))
      .thenReturn(queueDeclared);

  // Create the resources through the proxy.
  mockedChannel.proxy.exchangeDeclare(exchangeName, "topic");
  mockedChannel.proxy.queueDeclare(queueName, false, false, true, null);
  mockedChannel.proxy.queueBind(queueName, exchangeName, "#", null);
}
@Override public Object invoke(Object ignored, final Method method, final Object[] args) throws Throwable { if (closed && method.getDeclaringClass().isAssignableFrom(Channel.class)) throw new AlreadyClosedException(delegate.getCloseReason()); Callable<Object> callable = new Callable<Object>() { @Override public Object call() throws Exception { if (method.getDeclaringClass().isAssignableFrom(ChannelConfig.class)) return Reflection.invoke(config, method, args); String methodName = method.getName(); if ("basicAck".equals(methodName) || "basicNack".equals(methodName) || "basicReject".equals(methodName)) { long deliveryTag = (Long) args[0] - previousMaxDeliveryTag; if (deliveryTag > 0) args[0] = deliveryTag; else return null; } else if ("basicConsume".equals(methodName)) return handleConsumerDeclare(method, args); else if ("basicCancel".equals(methodName) && args[0] != null) consumerDeclarations.remove((String) args[0]); else if ("exchangeDelete".equals(methodName) && args[0] != null) connectionHandler.exchangeDeclarations.remove((String) args[0]); else if ("exchangeUnbind".equals(methodName) && args[0] != null) connectionHandler.exchangeBindings.remove((String) args[0], new Binding(args)); else if ("queueDelete".equals(methodName) && args[0] != null) connectionHandler.queueDeclarations.remove((String) args[0]); else if ("queueUnbind".equals(methodName) && args[0] != null) connectionHandler.queueBindings.remove((String) args[0], new Binding(args)); Object result = Reflection.invoke(delegate, method, args); if ("exchangeDeclare".equals(methodName)) handleExchangeDeclare(method, args); else if ("exchangeBind".equals(methodName)) handleExchangeBind(args); else if ("queueDeclare".equals(methodName)) handleQueueDeclare(((Queue.DeclareOk) result).getQueue(), method, args); else if ("queueBind".equals(methodName)) handleQueueBind(method, args); else if ("flowBlocked".equals(methodName)) flowBlocked = true; else if ("basicQos".equals(methodName)) { // Store non-global Qos if 
(args.length < 3 || !(Boolean) args[2]) basicQos = new ResourceDeclaration(method, args); } else if ("confirmSelect".equals(methodName)) confirmSelect = true; else if ("txSelect".equals(methodName)) txSelect = true; else if (methodName.startsWith("add")) handleAdd(methodName, args[0]); else if (methodName.startsWith("remove")) handleRemove(methodName, args[0]); else if (methodName.startsWith("clear")) handleClear(methodName); return result; } @Override public String toString() { return Reflection.toString(method); } }; return handleCommonMethods(delegate, method, args) ? null : callWithRetries(callable, config.getChannelRetryPolicy(), null, config.getRetryableExceptions(), canRecover(), true); }
/**
 * Actively declare a server-named exclusive, autodelete, non-durable queue.
 * The name of the new queue is held in the "queue" field of the
 * {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
 * @see com.rabbitmq.client.AMQP.Queue.Declare
 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 * @return a declaration-confirm method to indicate the queue was successfully declared
 * @throws java.io.IOException if an error is encountered
 */
Queue.DeclareOk queueDeclare() throws IOException;
/**
 * Declare a queue with the given name and properties.
 * @see com.rabbitmq.client.AMQP.Queue.Declare
 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 * @param queue the name of the queue
 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
 * @param arguments other properties (construction arguments) for the queue
 * @return a declaration-confirm method to indicate the queue was successfully declared
 * @throws java.io.IOException if an error is encountered
 */
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
/**
 * Declare a queue passively; i.e., check if it exists. In AMQP
 * 0-9-1, all arguments aside from nowait are ignored; and sending
 * nowait makes this method a no-op, so we default it to false.
 * @see com.rabbitmq.client.AMQP.Queue.Declare
 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 * @param queue the name of the queue
 * @return a declaration-confirm method to indicate the queue exists
 * @throws java.io.IOException if an error is encountered,
 * including if the queue does not exist or if the queue is
 * exclusively owned by another connection.
 */
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
/**
 * Delete a queue, without regard for whether it is in use or has messages on it.
 * @see com.rabbitmq.client.AMQP.Queue.Delete
 * @see com.rabbitmq.client.AMQP.Queue.DeleteOk
 * @param queue the name of the queue
 * @return a deletion-confirm method to indicate the queue was successfully deleted
 * @throws java.io.IOException if an error is encountered
 */
Queue.DeleteOk queueDelete(String queue) throws IOException;
/**
 * Delete a queue, optionally only when it is unused and/or empty.
 * @see com.rabbitmq.client.AMQP.Queue.Delete
 * @see com.rabbitmq.client.AMQP.Queue.DeleteOk
 * @param queue the name of the queue
 * @param ifUnused true if the queue should be deleted only if not in use
 * @param ifEmpty true if the queue should be deleted only if empty
 * @return a deletion-confirm method to indicate the queue was successfully deleted
 * @throws java.io.IOException if an error is encountered
 */
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
/**
 * Bind a queue to an exchange, with no extra arguments.
 * @see com.rabbitmq.client.AMQP.Queue.Bind
 * @see com.rabbitmq.client.AMQP.Queue.BindOk
 * @param queue the name of the queue
 * @param exchange the name of the exchange
 * @param routingKey the routing key to use for the binding
 * @return a binding-confirm method if the binding was successfully created
 * @throws java.io.IOException if an error is encountered
 */
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
/**
 * Bind a queue to an exchange.
 * @see com.rabbitmq.client.AMQP.Queue.Bind
 * @see com.rabbitmq.client.AMQP.Queue.BindOk
 * @param queue the name of the queue
 * @param exchange the name of the exchange
 * @param routingKey the routing key to use for the binding
 * @param arguments other properties (binding parameters)
 * @return a binding-confirm method if the binding was successfully created
 * @throws java.io.IOException if an error is encountered
 */
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
/**
 * Unbinds a queue from an exchange, with no extra arguments.
 * @see com.rabbitmq.client.AMQP.Queue.Unbind
 * @see com.rabbitmq.client.AMQP.Queue.UnbindOk
 * @param queue the name of the queue
 * @param exchange the name of the exchange
 * @param routingKey the routing key used for the binding
 * @return an unbinding-confirm method if the binding was successfully deleted
 * @throws java.io.IOException if an error is encountered
 */
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
/**
 * Unbind a queue from an exchange.
 * @see com.rabbitmq.client.AMQP.Queue.Unbind
 * @see com.rabbitmq.client.AMQP.Queue.UnbindOk
 * @param queue the name of the queue
 * @param exchange the name of the exchange
 * @param routingKey the routing key used for the binding
 * @param arguments other properties (binding parameters)
 * @return an unbinding-confirm method if the binding was successfully deleted
 * @throws java.io.IOException if an error is encountered
 */
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
/**
 * Purges the contents of the given queue.
 * @see com.rabbitmq.client.AMQP.Queue.Purge
 * @see com.rabbitmq.client.AMQP.Queue.PurgeOk
 * @param queue the name of the queue
 * @return a purge-confirm method if the purge was executed successfully
 * @throws java.io.IOException if an error is encountered
 */
Queue.PurgeOk queuePurge(String queue) throws IOException;