/**
 * Records the given record as sent and returns a future for its metadata.
 * <p>
 * The partition defaults to 0 and is only computed through {@code partition(record, cluster)}
 * when the cluster reports at least one partition for the record's topic. While a transaction
 * is in flight the record is collected in the uncommitted sends instead of the sent list. With
 * {@code autoComplete} set the completion is invoked right away with {@code null}; otherwise it
 * is appended to the end of the pending completions.
 *
 * @param record the record to send
 * @param callback handed to the {@code Completion} created for this send
 * @return the future wrapping this send's {@code ProduceRequestResult}
 * @see #history()
 */
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    verifyProducerState();

    final String topic = record.topic();
    final int targetPartition;
    if (this.cluster.partitionsForTopic(topic).isEmpty()) {
        targetPartition = 0;
    } else {
        targetPartition = partition(record, this.cluster);
    }
    final TopicPartition tp = new TopicPartition(topic, targetPartition);

    final ProduceRequestResult requestResult = new ProduceRequestResult(tp);
    final FutureRecordMetadata metadataFuture =
        new FutureRecordMetadata(requestResult, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);

    final long assignedOffset = nextOffset(tp);
    final RecordMetadata metadata =
        new RecordMetadata(tp, 0, assignedOffset, RecordBatch.NO_TIMESTAMP, Long.valueOf(0L), 0, 0);
    final Completion pending = new Completion(assignedOffset, metadata, requestResult, callback);

    // Records sent inside an open transaction are kept apart from the sent list.
    if (this.transactionInFlight) {
        this.uncommittedSends.add(record);
    } else {
        this.sent.add(record);
    }

    if (autoComplete) {
        pending.complete(null);
    } else {
        this.completions.addLast(pending);
    }
    return metadataFuture;
}
/**
 * Verifies that a bounded wait on a request that has not been completed ends in a
 * {@link TimeoutException}, and that once the request is set and marked done the future
 * reports done and yields {@code baseOffset + relOffset} as its offset.
 */
@Test
public void testTimeout() throws Exception {
    ProduceRequestResult pendingRequest = new ProduceRequestResult(topicPartition);
    FutureRecordMetadata metadataFuture =
        new FutureRecordMetadata(pendingRequest, relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
    assertFalse("Request is not completed", metadataFuture.isDone());

    boolean timedOut = false;
    try {
        metadataFuture.get(5, TimeUnit.MILLISECONDS);
    } catch (TimeoutException expected) {
        // The request has not been completed yet, so the bounded wait must time out.
        timedOut = true;
    }
    if (!timedOut) {
        fail("Should have thrown exception.");
    }

    pendingRequest.set(baseOffset, RecordBatch.NO_TIMESTAMP, null);
    pendingRequest.done();
    assertTrue(metadataFuture.isDone());

    long actualOffset = metadataFuture.get().offset();
    assertEquals(baseOffset + relOffset, actualOffset);
}
/**
 * Appends the given record to the list of sent records and returns a future for its metadata.
 * <p>
 * The partition defaults to 0 and is only computed through {@code partition(record, cluster)}
 * when the cluster returns a non-null partition list for the record's topic. With
 * {@code autoComplete} set the completion is invoked right away with {@code null}; otherwise it
 * is appended to the end of the pending completions.
 *
 * @param record the record to send
 * @param callback handed to the {@code Completion} created for this send
 * @return the future wrapping this send's {@code ProduceRequestResult}
 * @see #history()
 */
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    final String topic = record.topic();
    final int targetPartition;
    if (this.cluster.partitionsForTopic(topic) == null) {
        targetPartition = 0;
    } else {
        targetPartition = partition(record, this.cluster);
    }

    final ProduceRequestResult requestResult = new ProduceRequestResult();
    final FutureRecordMetadata metadataFuture =
        new FutureRecordMetadata(requestResult, 0, Record.NO_TIMESTAMP, 0, 0, 0);

    final TopicPartition tp = new TopicPartition(topic, targetPartition);
    final long assignedOffset = nextOffset(tp);
    final RecordMetadata metadata =
        new RecordMetadata(tp, 0, assignedOffset, Record.NO_TIMESTAMP, 0, 0, 0);
    final Completion pending = new Completion(tp, assignedOffset, metadata, requestResult, callback);

    this.sent.add(record);

    if (autoComplete) {
        pending.complete(null);
    } else {
        this.completions.addLast(pending);
    }
    return metadataFuture;
}
/**
 * Verifies that a bounded wait on a request that has not been completed ends in a
 * {@link TimeoutException}, and that once the request is marked done the future reports done
 * and yields {@code baseOffset + relOffset} as its offset.
 */
@Test
public void testTimeout() throws Exception {
    ProduceRequestResult pendingRequest = new ProduceRequestResult();
    FutureRecordMetadata metadataFuture =
        new FutureRecordMetadata(pendingRequest, relOffset, Record.NO_TIMESTAMP, 0, 0, 0);
    assertFalse("Request is not completed", metadataFuture.isDone());

    boolean timedOut = false;
    try {
        metadataFuture.get(5, TimeUnit.MILLISECONDS);
    } catch (TimeoutException expected) {
        // The request has not been completed yet, so the bounded wait must time out.
        timedOut = true;
    }
    if (!timedOut) {
        fail("Should have thrown exception.");
    }

    pendingRequest.done(topicPartition, baseOffset, null);
    assertTrue(metadataFuture.isDone());

    long actualOffset = metadataFuture.get().offset();
    assertEquals(baseOffset + relOffset, actualOffset);
}
/**
 * Verifies that {@code get()} on a future backed by an asynchronous request created with a
 * {@code CorruptRecordException} fails with an {@link ExecutionException}.
 */
@Test(expected = ExecutionException.class)
public void testError() throws Exception {
    // NOTE(review): 50L is presumably the delay before the request completes; confirm in asyncRequest.
    ProduceRequestResult failingRequest = asyncRequest(baseOffset, new CorruptRecordException(), 50L);
    FutureRecordMetadata failingFuture =
        new FutureRecordMetadata(failingRequest, relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);

    failingFuture.get();
}
/**
 * Verifies that {@code get()} on a future backed by an asynchronous request created without
 * an error returns metadata whose offset equals {@code baseOffset + relOffset}.
 */
@Test
public void testBlocking() throws Exception {
    // NOTE(review): 50L is presumably the delay before the request completes; confirm in asyncRequest.
    ProduceRequestResult succeedingRequest = asyncRequest(baseOffset, null, 50L);
    FutureRecordMetadata metadataFuture =
        new FutureRecordMetadata(succeedingRequest, relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);

    long actualOffset = metadataFuture.get().offset();
    assertEquals(baseOffset + relOffset, actualOffset);
}
/**
 * Verifies that {@code get()} on a future backed by an asynchronous request created with a
 * {@code CorruptRecordException} fails with an {@link ExecutionException}.
 */
@Test(expected = ExecutionException.class)
public void testError() throws Exception {
    // NOTE(review): 50L is presumably the delay before the request completes; confirm in asyncRequest.
    ProduceRequestResult failingRequest = asyncRequest(baseOffset, new CorruptRecordException(), 50L);
    FutureRecordMetadata failingFuture =
        new FutureRecordMetadata(failingRequest, relOffset, Record.NO_TIMESTAMP, 0, 0, 0);

    failingFuture.get();
}
/**
 * Verifies that {@code get()} on a future backed by an asynchronous request created without
 * an error returns metadata whose offset equals {@code baseOffset + relOffset}.
 */
@Test
public void testBlocking() throws Exception {
    // NOTE(review): 50L is presumably the delay before the request completes; confirm in asyncRequest.
    ProduceRequestResult succeedingRequest = asyncRequest(baseOffset, null, 50L);
    FutureRecordMetadata metadataFuture =
        new FutureRecordMetadata(succeedingRequest, relOffset, Record.NO_TIMESTAMP, 0, 0, 0);

    long actualOffset = metadataFuture.get().offset();
    assertEquals(baseOffset + relOffset, actualOffset);
}