/**
 * Sends out the "add new partitions" request ahead of the commit and waits on its result.
 *
 * <p>{@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} does more than commit: it also
 * adds new partitions to the transaction. This method moves that step to pre-commit/flush so that
 * resumeTransaction stays simple. Otherwise resumeTransaction would have to restore the state of partitions
 * that were not yet added (still "in-flight").
 */
private void flushNewPartitions() {
    LOG.info("Flushing new partitions");
    TransactionalRequestResult pendingAddPartitions = enqueueNewPartitions();
    // The sender is a private member of the wrapped producer, so it is reached reflectively;
    // it is woken up after the request has been enqueued and before we wait on the result.
    invoke(getValue(kafkaProducer, "sender"), "wakeup");
    pendingAddPartitions.await();
}
/**
 * Reflectively obtains the add-partitions request handler from the wrapped producer's transaction manager,
 * enqueues it on that manager, and returns the handler's {@code result} field.
 *
 * <p>The handler lookup, the enqueue call and the read of {@code result} all happen while synchronized on
 * the transaction manager object.
 *
 * @return the {@link TransactionalRequestResult} read from the enqueued handler
 */
private TransactionalRequestResult enqueueNewPartitions() {
    Object txnManager = getValue(kafkaProducer, "transactionManager");
    synchronized (txnManager) {
        Object handler = invoke(txnManager, "addPartitionsToTransactionHandler");
        // Both the enqueueRequest parameter type and the "result" field are resolved against
        // the handler's superclass rather than its concrete class.
        Class<?> handlerBaseType = handler.getClass().getSuperclass();
        invoke(txnManager, "enqueueRequest", new Class[]{handlerBaseType}, new Object[]{handler});
        return (TransactionalRequestResult) getValue(handler, handlerBaseType, "result");
    }
}
/**
 * Must be invoked before any other method when {@code transactional.id} is set in the configuration.
 * <p>
 * Calling this method:
 * <ol>
 *   <li>Ensures that transactions started by previous instances of the producer are completed. A
 *       transaction that was still in progress when the previous instance failed is aborted; if the
 *       last transaction had begun completion but not finished, this method waits for it to complete.</li>
 *   <li>Obtains the internal producer id and epoch that are used in all subsequent transactional
 *       messages issued by the producer.</li>
 * </ol>
 *
 * @throws IllegalStateException if the TransactionalId for the producer is not set
 *         in the configuration.
 */
public void initTransactions() {
    if (transactionManager == null) {
        throw new IllegalStateException("Cannot call initTransactions without setting a transactional id.");
    }
    TransactionalRequestResult initResult = transactionManager.initializeTransactions();
    // Wake the sender after the request has been registered, then wait on the result.
    sender.wakeup();
    initResult.await();
}
/**
 * Sends the given consumed offsets to the consumer group coordinator and marks them as part of the
 * current transaction. The offsets are considered consumed only if the transaction commits successfully.
 * <p>
 * Use this method when consumed and produced messages need to be batched together, typically in a
 * consume-transform-produce pattern.
 *
 * @param offsets the consumed offsets to attach to the current transaction, keyed by partition
 * @param consumerGroupId the id of the consumer group the offsets belong to
 * @throws IllegalStateException if transactions are not enabled for this producer
 * @throws ProducerFencedException if another producer with the same
 *         transactional.id is active.
 */
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                     String consumerGroupId) throws ProducerFencedException {
    if (transactionManager == null) {
        throw new IllegalStateException("Cannot send offsets to transaction since transactions are not enabled.");
    }
    TransactionalRequestResult offsetsResult =
            transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
    // Wake the sender after the request has been registered, then wait on the result.
    sender.wakeup();
    offsetsResult.await();
}
/**
 * Commits the ongoing transaction, flushing any unsent records before the commit actually happens.
 * <p>
 * If any {@link #send(ProducerRecord)} call that was part of the transaction hit an irrecoverable error,
 * this method throws the last received exception immediately and the transaction is not committed.
 * Every {@link #send(ProducerRecord)} call in a transaction must therefore succeed for this method to
 * succeed.
 *
 * @throws IllegalStateException if transactions are not enabled for this producer
 * @throws ProducerFencedException if another producer with the same
 *         transactional.id is active.
 */
public void commitTransaction() throws ProducerFencedException {
    if (transactionManager == null) {
        throw new IllegalStateException("Cannot commit transaction since transactions are not enabled");
    }
    TransactionalRequestResult commitResult = transactionManager.beginCommit();
    // Wake the sender after the commit has been started, then wait on the result.
    sender.wakeup();
    commitResult.await();
}
/**
 * Aborts the ongoing transaction. Produce messages that have not been flushed when this call is made
 * are aborted as well.
 * <p>
 * This call throws an exception immediately if any earlier {@link #send(ProducerRecord)} call failed with a
 * {@link ProducerFencedException} or an instance of
 * {@link org.apache.kafka.common.errors.AuthorizationException}.
 *
 * @throws IllegalStateException if transactions are not enabled for this producer
 * @throws ProducerFencedException if another producer with the same
 *         transactional.id is active.
 */
public void abortTransaction() throws ProducerFencedException {
    if (transactionManager == null) {
        throw new IllegalStateException("Cannot abort transaction since transactions are not enabled.");
    }
    TransactionalRequestResult abortResult = transactionManager.beginAbort();
    // Wake the sender after the abort has been started, then wait on the result.
    sender.wakeup();
    abortResult.await();
}