private void rollbackOnFailedRecoveryRedelivery() throws JMSException { if (previouslyDeliveredMessages != null) { // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback // as messages have been dispatched else where. int numberNotReplayed = 0; for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { if (!entry.getValue()) { numberNotReplayed++; if (LOG.isDebugEnabled()) { LOG.debug("previously delivered message has not been replayed in transaction: " + previouslyDeliveredMessages.transactionId + " , messageId: " + entry.getKey()); } } } if (numberNotReplayed > 0) { String message = "rolling back transaction (" + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId(); LOG.warn(message); throw new TransactionRolledBackException(message); } } }
/**
 * Sends a message inside a transacted session, shuts the mock peer down, and verifies that
 * committing afterwards is rejected with a {@link TransactionRolledBackException}.
 *
 * @throws Exception if connection setup or the test interactions fail unexpectedly
 */
@Test(timeout=20000)
public void testTransactionCommitFails() throws Exception {
    connection = (JmsConnection) factory.createConnection();
    connection.addConnectionListener(new ConnectionInterruptionListener());
    connection.start();

    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue queue = session.createQueue(_testName.getMethodName());
    MessageProducer producer = session.createProducer(queue);
    producer.send(session.createMessage());

    mockPeer.shutdown();
    // Do not silently continue if the interruption was never signalled within the timeout;
    // otherwise the commit below would not be exercising the offline path.
    if (!connectionInterrupted.await(9, TimeUnit.SECONDS)) {
        fail("Connection interruption was not signalled after the peer was shut down.");
    }

    try {
        session.commit();
        fail("Should not allow a commit while offline.");
    } catch (TransactionRolledBackException ex) {
        // Expected: the commit is rejected while the connection is offline.
    }

    connection.close();
}
/**
 * Sends a message inside a transacted session, shuts the mock peer down, and verifies that
 * rolling back afterwards does not raise a {@link TransactionRolledBackException}.
 *
 * @throws Exception if connection setup or the test interactions fail unexpectedly
 */
@Test(timeout=20000)
public void testTransactionRollbackSucceeds() throws Exception {
    connection = (JmsConnection) factory.createConnection();
    connection.addConnectionListener(new ConnectionInterruptionListener());
    connection.start();

    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue queue = session.createQueue(_testName.getMethodName());
    MessageProducer producer = session.createProducer(queue);
    producer.send(session.createMessage());

    mockPeer.shutdown();
    // Do not silently continue if the interruption was never signalled within the timeout;
    // otherwise the rollback below would not be exercising the offline path.
    if (!connectionInterrupted.await(9, TimeUnit.SECONDS)) {
        fail("Connection interruption was not signalled after the peer was shut down.");
    }

    try {
        session.rollback();
    } catch (TransactionRolledBackException ex) {
        fail("Should allow a rollback while offline.");
    }

    connection.close();
}
/**
 * Wraps a checked {@link JMSException} in the matching unchecked {@link JMSRuntimeException}
 * subclass, carrying over the message and error code and keeping the original as the cause.
 *
 * @param e the checked exception to convert
 * @return the runtime counterpart for the first matching type below, or a plain
 *         {@link JMSRuntimeException} when no specific type matches
 */
public static JMSRuntimeException convertToRuntimeException(JMSException e) {
    final String message = e.getMessage();
    final String errorCode = e.getErrorCode();

    if (e instanceof javax.jms.IllegalStateException) {
        return new IllegalStateRuntimeException(message, errorCode, e);
    } else if (e instanceof InvalidClientIDException) {
        return new InvalidClientIDRuntimeException(message, errorCode, e);
    } else if (e instanceof InvalidDestinationException) {
        return new InvalidDestinationRuntimeException(message, errorCode, e);
    } else if (e instanceof InvalidSelectorException) {
        return new InvalidSelectorRuntimeException(message, errorCode, e);
    } else if (e instanceof JMSSecurityException) {
        return new JMSSecurityRuntimeException(message, errorCode, e);
    } else if (e instanceof MessageFormatException) {
        return new MessageFormatRuntimeException(message, errorCode, e);
    } else if (e instanceof MessageNotWriteableException) {
        return new MessageNotWriteableRuntimeException(message, errorCode, e);
    } else if (e instanceof ResourceAllocationException) {
        return new ResourceAllocationRuntimeException(message, errorCode, e);
    } else if (e instanceof TransactionInProgressException) {
        return new TransactionInProgressRuntimeException(message, errorCode, e);
    } else if (e instanceof TransactionRolledBackException) {
        return new TransactionRolledBackRuntimeException(message, errorCode, e);
    } else {
        return new JMSRuntimeException(message, errorCode, e);
    }
}
/** * Rolls back any work done in this transaction and releases any locks * currently held. * * @throws JMSException if the JMS provider fails to roll back the * transaction due to some internal error. * @throws javax.jms.IllegalStateException if the method is not called by a * transacted session. */ public void rollback() throws JMSException { if (isInXATransaction()) { throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); } try { beforeEnd(); } catch (TransactionRolledBackException canOcurrOnFailover) { LOG.warn("rollback processing error", canOcurrOnFailover); } if (transactionId != null) { if (LOG.isDebugEnabled()) { LOG.debug("Rollback: " + transactionId + " syncCount: " + (synchronizations != null ? synchronizations.size() : 0)); } TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); this.transactionId = null; //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364 this.connection.syncSendPacket(info); // Notify the listener that the tx was rolled back if (localTransactionEventListener != null) { localTransactionEventListener.rollbackEvent(); } } afterRollback(); }
@Test public void testAutoRollbackWithMissingRedeliveries() throws Exception { LOG.info(this + " running test testAutoRollbackWithMissingRedeliveries"); broker = createBroker(); broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); configureConnectionFactory(cf); Connection connection = cf.createConnection(); try { connection.start(); final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"); final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); MessageConsumer consumer = consumerSession.createConsumer(destination); produceMessage(producerSession, destination); Message msg = consumer.receive(20000); Assert.assertNotNull(msg); broker.stop(); broker = createBroker(); // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover broker.start(); try { consumerSession.commit(); Assert.fail("expected transaction rolledback ex"); } catch (TransactionRolledBackException expected) { } broker.stop(); broker = createBroker(); broker.start(); Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000)); } finally { connection.close(); } }
/**
 * Converts instances of sub-classes of {@link JMSException} into the corresponding sub-class of
 * {@link JMSRuntimeException}.
 *
 * <p>The type checks are made in the order written below; the first match wins. The message
 * and error code are copied and the original exception becomes the cause.
 *
 * @param e the checked exception to convert
 * @return the matching runtime exception, or a plain {@link JMSRuntimeException} when the
 *         argument is not one of the specifically handled sub-classes
 */
public static JMSRuntimeException convertToRuntimeException(JMSException e) {
    final JMSRuntimeException converted;

    if (e instanceof javax.jms.IllegalStateException) {
        converted = new IllegalStateRuntimeException(e.getMessage(), e.getErrorCode(), e);
    } else if (e instanceof InvalidClientIDException) {
        converted = new InvalidClientIDRuntimeException(e.getMessage(), e.getErrorCode(), e);
    } else if (e instanceof InvalidDestinationException) {
        converted = new InvalidDestinationRuntimeException(e.getMessage(), e.getErrorCode(), e);
    } else if (e instanceof InvalidSelectorException) {
        converted = new InvalidSelectorRuntimeException(e.getMessage(), e.getErrorCode(), e);
    } else if (e instanceof JMSSecurityException) {
        converted = new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e);
    } else if (e instanceof MessageFormatException) {
        converted = new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e);
    } else if (e instanceof MessageNotWriteableException) {
        converted = new MessageNotWriteableRuntimeException(e.getMessage(), e.getErrorCode(), e);
    } else if (e instanceof ResourceAllocationException) {
        converted = new ResourceAllocationRuntimeException(e.getMessage(), e.getErrorCode(), e);
    } else if (e instanceof TransactionInProgressException) {
        converted = new TransactionInProgressRuntimeException(e.getMessage(), e.getErrorCode(), e);
    } else if (e instanceof TransactionRolledBackException) {
        converted = new TransactionRolledBackRuntimeException(e.getMessage(), e.getErrorCode(), e);
    } else {
        converted = new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
    }

    return converted;
}
/**
 * Submits a transaction commit to the serializer as a failover-aware request.
 *
 * <p>The request is flagged to fail when offline, and in that case reports a
 * {@link TransactionRolledBackException} whose cause is the triggering {@link IOException}.
 *
 * @param transactionInfo the transaction to commit
 * @param nextTransactionInfo passed through to the underlying provider's commit
 * @param request the result handle handed to the failover request
 */
@Override
public void commit(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionInfo, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
    checkClosed();

    serializer.execute(new FailoverRequest(request, requestTimeout) {

        @Override
        public String toString() {
            return "TX commit -> " + transactionInfo.getId();
        }

        @Override
        public boolean failureWhenOffline() {
            return true;
        }

        @Override
        public void doTask() throws Exception {
            provider.commit(transactionInfo, nextTransactionInfo, this);
        }

        @Override
        protected Exception createOfflineFailureException(IOException error) {
            final TransactionRolledBackException rolledBack =
                    new TransactionRolledBackException("Commit failed, connection offline: " + error.getMessage());
            rolledBack.initCause(error);
            return rolledBack;
        }
    });
}
public void commit(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionInfo, final AsyncResult request) throws Exception { if (!transactionInfo.getId().equals(current)) { if (!transactionInfo.isInDoubt() && current == null) { throw new IllegalStateException("Commit called with no active Transaction."); } else if (!transactionInfo.isInDoubt() && current != null) { throw new IllegalStateException("Attempt to Commit a transaction other than the current one"); } else { throw new TransactionRolledBackException("Transaction in doubt and cannot be committed."); } } preCommit(); LOG.trace("TX Context[{}] committing current TX[[]]", this, current); DischargeCompletion completion = new DischargeCompletion(request, nextTransactionInfo, true); coordinator.discharge(current, completion); current = null; if (completion.isPipelined()) { // If the discharge completed abnormally then we don't bother creating a new TX as the // caller will determine how to recover. if (!completion.isComplete()) { begin(nextTransactionInfo.getId(), completion.getDeclareCompletion()); } else { completion.getDeclareCompletion().onFailure(completion.getFailureCause()); } } }
/**
 * Scripts the test peer to remotely close the coordinator link when the first transaction is
 * discharged, and verifies that {@code commit()} surfaces a
 * {@link TransactionRolledBackException}.
 *
 * @throws Exception if the peer interaction fails unexpectedly
 */
@Test(timeout=20000)
public void testRollbackErrorCoordinatorClosedOnCommit() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        final Binary firstTxnId = new Binary(new byte[] {5, 6, 7, 8});
        final Binary secondTxnId = new Binary(new byte[] {1, 2, 3, 4});

        // First transaction: declared, then the coordinator link is closed on its discharge.
        testPeer.expectDeclare(firstTxnId);
        testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(firstTxnId, false, true, secondTxnId);

        // Second transaction: a new coordinator attach, declare and discharge are expected.
        testPeer.expectCoordinatorAttach();
        testPeer.expectDeclare(secondTxnId);
        testPeer.expectDischarge(secondTxnId, true);

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

        try {
            session.commit();
            fail("Transaction should have rolled back");
        } catch (TransactionRolledBackException ex) {
            LOG.info("Caught expected TransactionRolledBackException");
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Scripts the test peer to remotely close the coordinator link right after the transaction
 * is declared, and verifies that a later {@code commit()} surfaces a
 * {@link TransactionRolledBackException}.
 *
 * @throws Exception if the peer interaction fails unexpectedly
 */
@Test(timeout=20000)
public void testRollbackErrorWhenCoordinatorRemotelyClosed() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        final Binary transactionId = new Binary(new byte[] {5, 6, 7, 8});
        testPeer.expectDeclare(transactionId);
        testPeer.remotelyCloseLastCoordinatorLink();

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

        testPeer.waitForAllHandlersToComplete(2000);

        // Expectations for the coordinator interactions triggered from the commit below.
        testPeer.expectCoordinatorAttach();
        testPeer.expectDeclare(transactionId);
        testPeer.expectDischarge(transactionId, true);

        try {
            session.commit();
            fail("Transaction should have rolled back");
        } catch (TransactionRolledBackException ex) {
            LOG.info("Caught expected TransactionRolledBackException");
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Receives messages in a transacted session, restarts the primary broker, and verifies that
 * the subsequent commit fails with a {@link TransactionRolledBackException} while the queue
 * still reports all messages.
 *
 * @throws Exception if broker or connection handling fails unexpectedly
 */
@Test(timeout=60000)
public void testTxConsumerReceiveThenFailoverCommitFails() throws Exception {
    URI brokerURI = new URI(getAmqpFailoverURI());

    connection = createAmqpConnection(brokerURI);
    connection.start();

    final int MSG_COUNT = 5;
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue queue = session.createQueue(name.getMethodName());
    final MessageConsumer consumer = session.createConsumer(queue);

    sendMessages(connection, queue, MSG_COUNT);

    QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
    assertEquals(MSG_COUNT, proxy.getQueueSize());

    for (int i = 0; i < MSG_COUNT; ++i) {
        Message received = consumer.receive(3000);
        // Message corrected: the original text was misspelled and self-contradictory.
        assertNotNull("Message was expected but not received", received);
    }

    stopPrimaryBroker();
    restartPrimaryBroker();

    // The MBean proxy is looked up again after the broker restart.
    proxy = getProxyToQueue(name.getMethodName());
    assertEquals(MSG_COUNT, proxy.getQueueSize());

    try {
        LOG.info("Session commit firing after connection failed.");
        session.commit();
        fail("Session commit should have failed with TX rolled back.");
    } catch (TransactionRolledBackException rb) {
        LOG.info("Transacted commit failed after failover: {}", rb.getMessage());
    }

    assertEquals(MSG_COUNT, proxy.getQueueSize());
}
/**
 * Sends persistent messages in a transacted session, restarts the primary broker, and
 * verifies that the subsequent commit fails with a {@link TransactionRolledBackException}
 * and that the queue stays empty throughout.
 *
 * @throws Exception if broker or connection handling fails unexpectedly
 */
@Test(timeout=60000)
public void testTxProducerSendsThenFailoverCommitFails() throws Exception {
    final URI failoverUri = new URI(getAmqpFailoverURI());

    connection = createAmqpConnection(failoverUri);
    connection.start();

    final int MSG_COUNT = 5;
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    final Queue queue = session.createQueue(name.getMethodName());
    final MessageProducer producer = session.createProducer(queue);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
    assertEquals(0, queueView.getQueueSize());

    int sent = 0;
    while (sent < MSG_COUNT) {
        LOG.debug("Producer sening message #{}", sent + 1);
        producer.send(session.createTextMessage("Message: " + sent));
        ++sent;
    }

    // Nothing is visible on the queue while the sends are uncommitted.
    assertEquals(0, queueView.getQueueSize());

    stopPrimaryBroker();
    restartPrimaryBroker();

    // The MBean proxy is looked up again after the broker restart.
    queueView = getProxyToQueue(name.getMethodName());
    assertEquals(0, queueView.getQueueSize());

    try {
        session.commit();
        fail("Session commit should have failed with TX rolled back.");
    } catch (TransactionRolledBackException rb) {
        LOG.info("Transacted commit failed after failover: {}", rb.getMessage());
    }

    assertEquals(0, queueView.getQueueSize());
}
/**
 * Maps a checked {@link JMSException} onto its unchecked {@link JMSRuntimeException}
 * counterpart. The message and error code are copied and the original exception is kept as
 * the cause.
 *
 * @param e the checked exception to convert
 * @return the runtime exception for the first matching type, or a plain
 *         {@link JMSRuntimeException} when none of the specific types match
 */
public static JMSRuntimeException toRuntimeException(final JMSException e) {
    final String text = e.getMessage();
    final String code = e.getErrorCode();

    if (e instanceof javax.jms.IllegalStateException) {
        return new IllegalStateRuntimeException(text, code, e);
    }
    if (e instanceof InvalidClientIDException) {
        return new InvalidClientIDRuntimeException(text, code, e);
    }
    if (e instanceof InvalidDestinationException) {
        return new InvalidDestinationRuntimeException(text, code, e);
    }
    if (e instanceof InvalidSelectorException) {
        return new InvalidSelectorRuntimeException(text, code, e);
    }
    if (e instanceof JMSSecurityException) {
        return new JMSSecurityRuntimeException(text, code, e);
    }
    if (e instanceof MessageFormatException) {
        return new MessageFormatRuntimeException(text, code, e);
    }
    if (e instanceof MessageNotWriteableException) {
        return new MessageNotWriteableRuntimeException(text, code, e);
    }
    if (e instanceof ResourceAllocationException) {
        return new ResourceAllocationRuntimeException(text, code, e);
    }
    if (e instanceof TransactionInProgressException) {
        return new TransactionInProgressRuntimeException(text, code, e);
    }
    if (e instanceof TransactionRolledBackException) {
        return new TransactionRolledBackRuntimeException(text, code, e);
    }

    // No specific counterpart: fall back to the generic runtime exception.
    return new JMSRuntimeException(text, code, e);
}
/**
 * Throws the result of converting a {@link TransactionRolledBackException}; the test passes
 * only if that result is a {@link TransactionRolledBackRuntimeException}.
 */
@Test(expected = TransactionRolledBackRuntimeException.class)
public void testConvertsTransactionRolledBackExceptionToTransactionRolledBackRuntimeException() {
    final TransactionRolledBackException checked = new TransactionRolledBackException("error");
    throw JMSExceptionSupport.createRuntimeException(checked);
}
public static void main(final String[] args) throws Exception { final int numMessages = 10; Connection connection = null; InitialContext initialContext = null; try { server0 = ServerUtil.startServer(args[0], TransactionFailoverExample.class.getSimpleName() + "0", 0, 5000); server1 = ServerUtil.startServer(args[1], TransactionFailoverExample.class.getSimpleName() + "1", 1, 5000); // Step 1. Get an initial context for looking up JNDI from the server #1 initialContext = new InitialContext(); // Step 2. Look-up the JMS resources from JNDI Queue queue = (Queue) initialContext.lookup("queue/exampleQueue"); ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("ConnectionFactory"); // Step 3. We create a JMS Connection connection = connectionFactory.createConnection(); // Step 4. We create a *transacted* JMS Session Session session = connection.createSession(true, 0); // Step 5. We start the connection to ensure delivery occurs connection.start(); // Step 6. We create a JMS MessageProducer MessageProducer producer = session.createProducer(queue); // Step 7. We create a JMS MessageConsumer MessageConsumer consumer = session.createConsumer(queue); // Step 8. We send half of the messages, kill the live server and send the remaining messages sendMessages(session, producer, numMessages, true); // Step 9. As failover occurred during transaction, the session has been marked for rollback only try { session.commit(); } catch (TransactionRolledBackException e) { System.err.println("transaction has been rolled back: " + e.getMessage()); } // Step 10. We resend all the messages sendMessages(session, producer, numMessages, false); // Step 11. We commit the session successfully: the messages will be all delivered to the activated backup // server session.commit(); // Step 12. We are now transparently reconnected to server #0, the backup server. // We consume the messages sent before the crash of the live server and commit the session. 
for (int i = 0; i < numMessages; i++) { TextMessage message0 = (TextMessage) consumer.receive(5000); if (message0 == null) { throw new IllegalStateException("Example failed - message wasn't received"); } System.out.println("Got message: " + message0.getText()); } session.commit(); System.out.println("Other message on the server? " + consumer.receive(5000)); } finally { // Step 13. Be sure to close our resources! if (connection != null) { connection.close(); } if (initialContext != null) { initialContext.close(); } ServerUtil.killServer(server0); ServerUtil.killServer(server1); } }
public static void main(final String[] args) throws Exception { final int numMessages = 10; Connection connection = null; InitialContext initialContext = null; try { server0 = ServerUtil.startServer(args[0], ReplicatedTransactionFailoverExample.class.getSimpleName() + "0", 0, 5000); server1 = ServerUtil.startServer(args[1], ReplicatedTransactionFailoverExample.class.getSimpleName() + "1", 1, 5000); // Step 1. Get an initial context for looking up JNDI from the server #1 initialContext = new InitialContext(); // Step 2. Look-up the JMS resources from JNDI Queue queue = (Queue) initialContext.lookup("queue/exampleQueue"); ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("ConnectionFactory"); // Step 3. We create a JMS Connection connection = connectionFactory.createConnection(); // Step 4. We create a *transacted* JMS Session Session session = connection.createSession(true, 0); // Step 5. We start the connection to ensure delivery occurs connection.start(); // Step 6. We create a JMS MessageProducer MessageProducer producer = session.createProducer(queue); // Step 7. We create a JMS MessageConsumer MessageConsumer consumer = session.createConsumer(queue); // Step 8. We send half of the messages, kill the live server and send the remaining messages sendMessages(session, producer, numMessages, true); // Step 9. As failover occurred during transaction, the session has been marked for rollback only try { session.commit(); } catch (TransactionRolledBackException e) { System.err.println("transaction has been rolled back: " + e.getMessage()); } // Step 10. We resend all the messages sendMessages(session, producer, numMessages, false); // Step 11. We commit the session successfully: the messages will be all delivered to the activated backup // server session.commit(); // Step 12. We are now transparently reconnected to server #0, the backup server. // We consume the messages sent before the crash of the live server and commit the session. 
for (int i = 0; i < numMessages; i++) { TextMessage message0 = (TextMessage) consumer.receive(5000); if (message0 == null) { throw new IllegalStateException("Example failed - message wasn't received"); } System.out.println("Got message: " + message0.getText()); } session.commit(); System.out.println("Other message on the server? " + consumer.receive(5000)); } finally { // Step 13. Be sure to close our resources! if (connection != null) { connection.close(); } if (initialContext != null) { initialContext.close(); } ServerUtil.killServer(server0); ServerUtil.killServer(server1); } }
/**
 * Registers two synchronizations, the first of which throws a
 * {@link TransactionRolledBackException} from its first {@code beforeEnd()} call, then
 * commits and verifies that the commit is rolled back, that {@code beforeEnd()} ran exactly
 * once on each synchronization, and that each one received the rollback callback once.
 *
 * @throws Exception if the transaction context fails unexpectedly
 */
@Test
public void testSyncBeforeEndCalledOnceOnRollback() throws Exception {
    final AtomicInteger beforeEndCountA = new AtomicInteger(0);
    final AtomicInteger beforeEndCountB = new AtomicInteger(0);
    final AtomicInteger rollbackCountA = new AtomicInteger(0);
    final AtomicInteger rollbackCountB = new AtomicInteger(0);

    // Synchronization A: forces the rollback on its first beforeEnd() invocation only.
    underTest.addSynchronization(new Synchronization() {
        @Override
        public void beforeEnd() throws Exception {
            if (beforeEndCountA.getAndIncrement() == 0) {
                throw new TransactionRolledBackException("force rollback");
            }
        }

        @Override
        public void afterCommit() throws Exception {
            fail("expected rollback exception");
        }

        @Override
        public void afterRollback() throws Exception {
            rollbackCountA.incrementAndGet();
        }
    });

    // Synchronization B: only counts its callbacks.
    underTest.addSynchronization(new Synchronization() {
        @Override
        public void beforeEnd() throws Exception {
            beforeEndCountB.getAndIncrement();
        }

        @Override
        public void afterCommit() throws Exception {
            fail("expected rollback exception");
        }

        @Override
        public void afterRollback() throws Exception {
            rollbackCountB.incrementAndGet();
        }
    });

    try {
        underTest.commit();
        fail("expected rollback exception");
    } catch (TransactionRolledBackException expected) {
        // expected: synchronization A forced the rollback
    }

    assertEquals("beforeEnd A called once", 1, beforeEndCountA.get());
    // Fixed copy-paste error: this assertion previously re-checked counter A.
    assertEquals("beforeEnd B called once", 1, beforeEndCountB.get());
    assertEquals("rollbackCount B 1", 1, rollbackCountB.get());
    assertEquals("rollbackCount A B", rollbackCountA.get(), rollbackCountB.get());
}
@Override protected void consumeMessage(Message message, List<Message> messageList) { try { receiveSession.commit(); super.consumeMessage(message, messageList); } catch (JMSException e) { LOG.info("Failed to commit message receipt: " + message, e); try { receiveSession.rollback(); } catch (JMSException ignored) { } if (e instanceof TransactionRolledBackException) { TransactionRolledBackException transactionRolledBackException = (TransactionRolledBackException) e; if (transactionRolledBackException.getMessage().indexOf("in doubt") != -1) { // failover chucked bc there is a missing reply to a commit. // failover is involved b/c the store exception is handled broker side and the client just // sees a disconnect (socket.close()). // If the client needs to be aware of the failure then it should not use IOExceptionHandler // so that the exception will propagate back // for this test case: // the commit may have got there and the reply is lost "or" the commit may be lost. // so we may or may not get a resend. // // At the application level we need to determine if the message is there or not which is not trivial // for this test we assert received == sent // so we need to know whether the message will be replayed. 
// we can ask the store b/c we know it is jdbc - guess we could go through a destination // message store interface also or use jmx java.sql.Connection dbConnection = null; try { ActiveMQMessage mqMessage = (ActiveMQMessage) message; MessageId id = mqMessage.getMessageId(); dbConnection = sharedDs.getConnection(); PreparedStatement s = dbConnection.prepareStatement(((JDBCPersistenceAdapter) connectedToBroker().getPersistenceAdapter()).getStatements().getFindMessageStatement()); s.setString(1, id.getProducerId().toString()); s.setLong(2, id.getProducerSequenceId()); ResultSet rs = s.executeQuery(); if (!rs.next()) { // message is gone, so lets count it as consumed LOG.info("On TransactionRolledBackException we know that the ack/commit got there b/c message is gone so we count it: " + mqMessage); super.consumeMessage(message, messageList); } else { LOG.info("On TransactionRolledBackException we know that the ack/commit was lost so we expect a replay of: " + mqMessage); } } catch (Exception dbe) { dbe.printStackTrace(); } finally { try { dbConnection.close(); } catch (SQLException e1) { e1.printStackTrace(); } } } } } }
@Test @BMRules( rules = {@BMRule( name = "set no return response and stop the broker", targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor", targetMethod = "processCommitTransactionOnePhase", targetLocation = "EXIT", action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker($0)")}) public void testFailoverCommitReplyLost() throws Exception { LOG.info(this + " running test testFailoverCommitReplyLost"); broker = createBroker(); startBrokerWithDurableQueue(); doByteman.set(true); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(destination); produceMessage(session, destination); final CountDownLatch commitDoneLatch = new CountDownLatch(1); // broker will die on commit reply so this will hang till restart new Thread() { @Override public void run() { LOG.info("doing async commit..."); try { session.commit(); } catch (JMSException e) { Assert.assertTrue(e instanceof TransactionRolledBackException); LOG.info("got commit exception: ", e); } commitDoneLatch.countDown(); LOG.info("done async commit"); } }.start(); // will be stopped by the plugin brokerStopLatch.await(60, TimeUnit.SECONDS); doByteman.set(false); broker = createBroker(); broker.start(); Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); // new transaction Message msg = consumer.receive(20000); LOG.info("Received: " + msg); Assert.assertNotNull("we got the message", msg); Assert.assertNull("we got just one message", consumer.receive(2000)); session.commit(); consumer.close(); connection.close(); // ensure no dangling messages with fresh broker etc 
broker.stop(); LOG.info("Checking for remaining/hung messages.."); broker = createBroker(); broker.start(); // after restart, ensure no dangling messages cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); configureConnectionFactory(cf); connection = cf.createConnection(); connection.start(); Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session2.createConsumer(destination); msg = consumer.receive(1000); if (msg == null) { msg = consumer.receive(5000); } LOG.info("Received: " + msg); Assert.assertNull("no messges left dangling but got: " + msg, msg); connection.close(); }
@Test public void testAMQ1925_TXBegin() throws Exception { Connection connection = cf.createConnection(); connection.start(); connection.setExceptionListener(this); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); boolean restartDone = false; try { for (int i = 0; i < MESSAGE_COUNT; i++) { Message message = consumer.receive(5000); Assert.assertNotNull(message); if (i == 222 && !restartDone) { // Simulate broker failure & restart bs.stop(); bs = createNewServer(); bs.start(); restartDone = true; } Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER)); try { session.commit(); } catch (TransactionRolledBackException expectedOnOccasion) { log.info("got rollback: " + expectedOnOccasion); i--; } } Assert.assertNull(consumer.receive(500)); } catch (Exception eee) { log.error("got exception", eee); throw eee; } finally { consumer.close(); session.close(); connection.close(); } assertQueueEmpty(); Assert.assertNull("no exception on connection listener: " + exception, exception); }
/** * If a transacted session has failed over whilst it has uncommitted sent * data then we need to throw a TransactedRolledbackException on commit() * * The alternative would be to maintain a replay buffer so that the message * could be resent. This is not currently implemented * * @throws Exception if something goes wrong. */ public void testDirtySendingSynchronousTransacted() throws Exception { Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); // Ensure we get failover notifications ((AMQConnection) _connection).setConnectionListener(this); MessageProducer producer = producerSession.createProducer(_queue); // Create and send message 0 Message msg = producerSession.createMessage(); msg.setIntProperty(INDEX, 0); producer.send(msg); // DON'T commit message .. fail connection failBroker(getFailingPort()); // Ensure destination exists for sending producerSession.createConsumer(_queue).close(); // Send the next message msg.setIntProperty(INDEX, 1); try { producer.send(msg); fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException."); } catch (JMSException jmse) { assertEquals("Early warning of dirty session not correct", "Failover has occurred and session is dirty so unable to send.", jmse.getMessage()); } // Ignore that the session is dirty and attempt to commit to validate the // exception is thrown. AND that the above failure notification did NOT // clean up the session. try { producerSession.commit(); fail("Session is dirty we should get an TransactionRolledBackException"); } catch (TransactionRolledBackException trbe) { // Normal path. } // Resending of messages should now work ok as the commit was forcilbly rolledback msg.setIntProperty(INDEX, 0); producer.send(msg); msg.setIntProperty(INDEX, 1); producer.send(msg); producerSession.commit(); assertEquals("Wrong number of messages on queue", 2, ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue)); }
@Test(timeout=20000) public void testTransactionCommitFailWithEmptyRejectedDisposition() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); testPeer.expectBegin(); testPeer.expectCoordinatorAttach(); // First expect an unsettled 'declare' transfer to the txn coordinator, and // reply with a Declared disposition state containing the txnId. Binary txnId1 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); testPeer.expectDeclare(txnId1); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = session.createQueue("myQueue"); // Create a producer to use in provoking creation of the AMQP transaction testPeer.expectSenderAttach(); MessageProducer producer = session.createProducer(queue); // Expect the message which was sent under the current transaction. Check it carries // TransactionalState with the above txnId but has no outcome. Respond with a // TransactionalState with Accepted outcome. 
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); stateMatcher.withTxnId(equalTo(txnId1)); stateMatcher.withOutcome(nullValue()); TransactionalState txState = new TransactionalState(); txState.setTxnId(txnId1); txState.setOutcome(new Accepted()); testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); producer.send(session.createMessage()); // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, // and reply with rejected and settled disposition to indicate the commit failed testPeer.expectDischarge(txnId1, false, new Rejected()); // Then expect an unsettled 'declare' transfer to the txn coordinator, and // reply with a declared disposition state containing the txnId. Binary txnId2 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); testPeer.expectDeclare(txnId2); try { session.commit(); fail("Commit operation should have failed."); } catch (TransactionRolledBackException jmsTxRb) { } // session should roll back on close testPeer.expectDischarge(txnId2, true); testPeer.expectClose(); connection.close(); testPeer.waitForAllHandlersToComplete(1000); } }
/**
 * Checks that, after a commit has been rejected by the peer (here with an error attached to
 * the {@code Rejected} state), a further message sent on the same session is transferred
 * under the newly declared transaction.
 */
@Test(timeout=20000)
public void testProducedMessagesAfterCommitOfSentMessagesFails() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        // The coordinator should first see an unsettled 'declare' transfer, which is
        // answered with a Declared disposition state containing this txnId.
        Binary firstTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        testPeer.expectDeclare(firstTxnId);

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue("myQueue");

        // A producer is created to use in provoking creation of the AMQP transaction.
        testPeer.expectSenderAttach();
        MessageProducer producer = session.createProducer(queue);

        // The first message must carry a TransactionalState with the first txnId and no
        // outcome. The peer responds with a TransactionalState whose outcome is Accepted.
        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

        TransactionalStateMatcher firstExpectedState = new TransactionalStateMatcher();
        firstExpectedState.withTxnId(equalTo(firstTxnId));
        firstExpectedState.withOutcome(nullValue());

        TransactionalState firstReply = new TransactionalState();
        firstReply.setTxnId(firstTxnId);
        firstReply.setOutcome(new Accepted());

        testPeer.expectTransfer(payloadMatcher, firstExpectedState, false, firstReply, true);

        producer.send(session.createMessage());

        // The commit should produce an unsettled 'discharge' transfer containing the first
        // txnId; replying with a rejected disposition indicates that the commit failed.
        Rejected commitFailure = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error"));
        testPeer.expectDischarge(firstTxnId, false, commitFailure);

        // Afterwards another unsettled 'declare' transfer is expected, answered with a
        // declared disposition state containing the second txnId.
        Binary secondTxnId = new Binary(new byte[] { 1, 2, 3, 4 });
        testPeer.expectDeclare(secondTxnId);

        try {
            session.commit();
            fail("Commit operation should have failed.");
        } catch (TransactionRolledBackException expected) {
            // This is the outcome the test is looking for.
        }

        // The next message must carry a TransactionalState with the second txnId and no
        // outcome. The peer again responds with a TransactionalState whose outcome is Accepted.
        TransactionalStateMatcher secondExpectedState = new TransactionalStateMatcher();
        secondExpectedState.withTxnId(equalTo(secondTxnId));
        secondExpectedState.withOutcome(nullValue());

        TransactionalState secondReply = new TransactionalState();
        secondReply.setTxnId(secondTxnId);
        secondReply.setOutcome(new Accepted());

        testPeer.expectTransfer(payloadMatcher, secondExpectedState, false, secondReply, true);
        testPeer.expectDischarge(secondTxnId, true);

        producer.send(session.createMessage());

        testPeer.expectClose();

        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
@Test(timeout=20000) public void testSendAfterCoordinatorLinkClosedDuringTX() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); testPeer.expectBegin(); testPeer.expectCoordinatorAttach(); // First expect an unsettled 'declare' transfer to the txn coordinator, and // reply with a Declared disposition state containing the txnId. Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); testPeer.expectDeclare(txnId); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = session.createQueue("myQueue"); // Create a producer to use in provoking creation of the AMQP transaction testPeer.expectSenderAttach(); // Close the link, the messages should now just get dropped on the floor. testPeer.remotelyCloseLastCoordinatorLink(); MessageProducer producer = session.createProducer(queue); testPeer.waitForAllHandlersToComplete(2000); producer.send(session.createMessage()); // Expect that a new link will be created in order to start the next TX. txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); testPeer.expectCoordinatorAttach(); testPeer.expectDeclare(txnId); // Expect that the session TX will rollback on close. testPeer.expectDischarge(txnId, true); try { session.commit(); fail("Commit operation should have failed."); } catch (TransactionRolledBackException jmsTxRb) { } testPeer.expectClose(); connection.close(); testPeer.waitForAllHandlersToComplete(1000); } }
@Test(timeout=20000) public void testReceiveAfterCoordinatorLinkClosedDuringTX() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); testPeer.expectBegin(); testPeer.expectCoordinatorAttach(); // First expect an unsettled 'declare' transfer to the txn coordinator, and // reply with a Declared disposition state containing the txnId. Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); testPeer.expectDeclare(txnId); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = session.createQueue("myQueue"); // Create a consumer and send it an initial message for receive to process. DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); testPeer.expectReceiverAttach(); testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); // Close the link, the messages should now just get dropped on the floor. testPeer.remotelyCloseLastCoordinatorLink(); MessageConsumer consumer = session.createConsumer(queue); testPeer.waitForAllHandlersToComplete(2000); // receiving the message would normally ack it, since the TX is failed this // should not result in a disposition going out. Message received = consumer.receive(); assertNotNull(received); // Expect that a new link will be created in order to start the next TX. txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); testPeer.expectCoordinatorAttach(); testPeer.expectDeclare(txnId); // Expect that the session TX will rollback on close. testPeer.expectDischarge(txnId, true); try { session.commit(); fail("Commit operation should have failed."); } catch (TransactionRolledBackException jmsTxRb) { } testPeer.expectClose(); connection.close(); testPeer.waitForAllHandlersToComplete(1000); } }
/**
 * Passes a {@link TransactionRolledBackException} to
 * {@code JmsExceptionSupport.createRuntimeException} and throws the result; the test passes
 * only if what is thrown is a {@code TransactionRolledBackRuntimeException}.
 */
@Test(expected = TransactionRolledBackRuntimeException.class)
public void testConvertsTransactionRolledBackExceptionToTransactionRolledBackRuntimeException() {
    TransactionRolledBackException checked = new TransactionRolledBackException("error");
    throw JmsExceptionSupport.createRuntimeException(checked);
}
/**
 * Creates a transacted consumer, restarts the primary broker, then receives every queued
 * message and checks that the commit succeeds and leaves the queue empty.
 *
 * @throws Exception if any step of the test fails unexpectedly
 */
@Test(timeout=60000)
public void testTxConsumerReceiveAfterFailoverCommits() throws Exception {
    URI brokerURI = new URI(getAmqpFailoverURI());

    connection = createAmqpConnection(brokerURI);
    connection.start();

    final int MSG_COUNT = 5;
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue queue = session.createQueue(name.getMethodName());
    final MessageConsumer consumer = session.createConsumer(queue);

    sendMessages(connection, queue, MSG_COUNT);

    QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
    assertEquals(MSG_COUNT, proxy.getQueueSize());

    stopPrimaryBroker();
    restartPrimaryBroker();

    // Wait until the restarted broker reports exactly one connection again.
    assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() {

        @Override
        public boolean isSatisified() throws Exception {
            return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
        }
    }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));

    // Wait until the restarted broker reports exactly one queue subscriber again.
    assertTrue("Should have a recovered consumer.", Wait.waitFor(new Wait.Condition() {

        @Override
        public boolean isSatisified() throws Exception {
            return brokerService.getAdminView().getQueueSubscribers().length == 1;
        }
    }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));

    for (int i = 0; i < MSG_COUNT; ++i) {
        Message received = consumer.receive(3000);
        // A null here means receive(3000) returned without delivering a message.
        assertNotNull("Message was expected but not received", received);
    }

    try {
        session.commit();
        LOG.info("Transacted commit ok after failover.");
    } catch (TransactionRolledBackException rb) {
        fail("Session commit should not have failed with TX rolled back.");
    }

    // Obtain a fresh queue view: the earlier proxy was created before the broker restart.
    proxy = getProxyToQueue(name.getMethodName());
    assertEquals(0, proxy.getQueueSize());
}
/**
 * Creates a transacted producer, restarts the primary broker, then sends a batch of messages
 * and checks that the commit succeeds and that the queue only holds the messages once the
 * commit has completed.
 */
@Test(timeout=60000)
public void testTxProducerSendAfterFailoverCommits() throws Exception {
    connection = createAmqpConnection(new URI(getAmqpFailoverURI()));
    connection.start();

    final int messageCount = 5;
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue queue = session.createQueue(name.getMethodName());
    final MessageProducer producer = session.createProducer(queue);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
    assertEquals(0, queueView.getQueueSize());

    stopPrimaryBroker();
    restartPrimaryBroker();

    // Satisfied once the broker reports exactly one connection.
    Wait.Condition connectionRestored = new Wait.Condition() {

        @Override
        public boolean isSatisified() throws Exception {
            return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
        }
    };
    assertTrue("Should have a new connection.",
        Wait.waitFor(connectionRestored, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));

    // Satisfied once the broker reports exactly one queue producer.
    Wait.Condition producerRestored = new Wait.Condition() {

        @Override
        public boolean isSatisified() throws Exception {
            return brokerService.getAdminView().getQueueProducers().length == 1;
        }
    };
    assertTrue("Should have a recovered producer.",
        Wait.waitFor(producerRestored, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));

    for (int index = 0; index < messageCount; ++index) {
        LOG.debug("Producer sening message #{}", index + 1);
        producer.send(session.createTextMessage("Message: " + index));
    }

    // Nothing should be visible on the queue before the commit.
    queueView = getProxyToQueue(name.getMethodName());
    assertEquals(0, queueView.getQueueSize());

    try {
        session.commit();
        LOG.info("Transacted commit ok after failover.");
    } catch (TransactionRolledBackException rolledBack) {
        fail("Session commit should not have failed with TX rolled back.");
    }

    assertEquals(messageCount, queueView.getQueueSize());
}
/**
 * Sends half of a batch inside a transaction, restarts the primary broker, sends the other
 * half, and checks that the commit throws {@link TransactionRolledBackException} and that
 * the queue stays empty.
 */
@Test(timeout=60000)
@Repeat(repetitions = 1)
public void testTxProducerSendWorksButCommitFails() throws Exception {
    connection = createAmqpConnection(new URI(getAmqpFailoverURI()));
    connection.start();

    final int messageCount = 10;
    final int halfCount = messageCount / 2;
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue queue = session.createQueue(name.getMethodName());
    final MessageProducer producer = session.createProducer(queue);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
    assertEquals(0, queueView.getQueueSize());

    // First half of the batch, sent before the broker goes away.
    for (int index = 0; index < halfCount; ++index) {
        LOG.debug("Producer sening message #{}", index + 1);
        producer.send(session.createTextMessage("Message: " + index));
    }

    assertEquals(0, queueView.getQueueSize());

    stopPrimaryBroker();
    restartPrimaryBroker();

    queueView = getProxyToQueue(name.getMethodName());
    assertEquals(0, queueView.getQueueSize());

    // Second half of the batch, sent after the broker was restarted.
    for (int index = halfCount; index < messageCount; ++index) {
        LOG.debug("Producer sening message #{}", index + 1);
        producer.send(session.createTextMessage("Message: " + index));
    }

    try {
        session.commit();
        fail("Session commit should have failed with TX rolled back.");
    } catch (TransactionRolledBackException rolledBack) {
        LOG.info("Transacted commit failed after failover: {}", rolledBack.getMessage());
    }

    assertEquals(0, queueView.getQueueSize());
}