@Override
public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event,
                              final FailedDeliveryCallback<E> failedDeliveryCallback) {
    try {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Delivery is asynchronous: report only broker-side failures back to the caller.
                if (exception != null) {
                    failedDeliveryCallback.onFailedDelivery(event, exception);
                }
            }
        });
        return true;
    } catch (BufferExhaustedException e) {
        // The producer's in-memory buffer is full, so give up on this event immediately.
        failedDeliveryCallback.onFailedDelivery(event, e);
        return false;
    }
}
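For context, here is a minimal caller-side sketch of how such a delivery strategy could be wired up. The names deliveryStrategy and log, the topic name, the producer settings, and the lambda (which assumes FailedDeliveryCallback is a single-method interface matching onFailedDelivery(event, exception)) are all illustrative assumptions, not taken from the original code.

// Illustrative only: configuration values and variable names are assumptions.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Keep the producer from blocking for long when the buffer is full,
// so buffer-exhaustion errors surface quickly instead of stalling the caller.
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100");

Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("events", "payload");

boolean accepted = deliveryStrategy.send(producer, record, "payload",
        (event, exception) -> log.warn("Dropping event {} after failed delivery", event, exception));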
@Override
public boolean shouldRetry(RuntimeException e) {
    if (e instanceof BufferExhaustedException || e instanceof QueueFullException) {
        // Apply a form of back pressure by putting the current thread to sleep before retrying.
        try {
            Thread.sleep(retryInterval.toMillis());
            retries.inc();
        } catch (InterruptedException ex) {
            throw Throwables.propagate(ex);
        }
        return true;
    } else {
        return false;
    }
}
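A hedged sketch of how a send loop might consult such a retry policy; retryPolicy, MAX_ATTEMPTS, producer, and record are assumed names and are not part of the original code.

// Illustrative retry loop; all names here are assumptions.
int attempts = 0;
while (true) {
    try {
        producer.send(record); // can fail fast with BufferExhaustedException when the buffer is full
        break;
    } catch (RuntimeException e) {
        attempts++;
        // shouldRetry() already sleeps for retryInterval, so no extra delay is needed here.
        if (attempts >= MAX_ATTEMPTS || !retryPolicy.shouldRetry(e)) {
            throw e;
        }
    }
}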
@Test
public void whenPostEventOverflowsBufferThenUpdateItemStatus() throws Exception {
    final BatchItem item = new BatchItem("{}");
    item.setPartition("1");
    final List<BatchItem> batch = new ArrayList<>();
    batch.add(item);

    when(kafkaProducer.partitionsFor(EXPECTED_PRODUCER_RECORD.topic())).thenReturn(ImmutableList.of(
            new PartitionInfo(EXPECTED_PRODUCER_RECORD.topic(), 1, new Node(1, "host", 9091), null, null)));

    Mockito
            .doThrow(BufferExhaustedException.class)
            .when(kafkaProducer)
            .send(any(), any());

    try {
        kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), batch);
        fail();
    } catch (final EventPublishingException e) {
        assertThat(item.getResponse().getPublishingStatus(), equalTo(EventPublishingStatus.FAILED));
        assertThat(item.getResponse().getDetail(), equalTo("internal error"));
    }
}