/**
 * Sends nine records to "topic1" without an explicit partition, letting the shared
 * {@code streamPartitioner} choose the target, and then checks the offsets the collector
 * reports for partitions 0, 1 and 2.
 */
@Test
public void testStreamPartitioner() {
    final RecordCollectorImpl collector = new RecordCollectorImpl(
        new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
        "RecordCollectorTest-TestStreamPartitioner");

    // Same keys, in the same order; every record carries the value "0" and a null timestamp.
    final String[] keys = {"3", "9", "27", "81", "243", "28", "82", "244", "245"};
    for (final String key : keys) {
        collector.send("topic1", key, "0", null, stringSerializer, stringSerializer, streamPartitioner);
    }

    final Map<TopicPartition, Long> offsets = collector.offsets();

    assertEquals(Long.valueOf(4L), offsets.get(new TopicPartition("topic1", 0)));
    assertEquals(Long.valueOf(2L), offsets.get(new TopicPartition("topic1", 1)));
    assertEquals(Long.valueOf(0L), offsets.get(new TopicPartition("topic1", 2)));
}
/**
 * Uses a producer whose very first {@code send} call throws a {@code TimeoutException} and
 * whose later calls delegate to {@code MockProducer}; the record must still end up at
 * offset 0 of partition 0 of "topic1".
 */
@SuppressWarnings("unchecked")
@Test
public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
    final AtomicInteger sendCalls = new AtomicInteger(0);
    final RecordCollectorImpl collector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                // Every call after the first one goes straight through to the mock.
                if (sendCalls.getAndIncrement() != 0) {
                    return super.send(record, callback);
                }
                throw new TimeoutException();
            }
        },
        "test");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);

    final TopicPartition firstPartition = new TopicPartition("topic1", 0);
    assertEquals(Long.valueOf(0L), collector.offsets().get(firstPartition));
}
@Override public boolean visible(String name, Map<String, Object> connectorConfigs) { String partitionerName = (String) connectorConfigs.get(PARTITIONER_CLASS_CONFIG); try { @SuppressWarnings("unchecked") Class<? extends Partitioner> partitioner = (Class<? extends Partitioner>) Class.forName(partitionerName); if (classNameEquals(partitionerName, DefaultPartitioner.class)) { return false; } else if (FieldPartitioner.class.isAssignableFrom(partitioner)) { // subclass of FieldPartitioner return name.equals(PARTITION_FIELD_NAME_CONFIG); } else if (TimeBasedPartitioner.class.isAssignableFrom(partitioner)) { // subclass of TimeBasedPartitioner if (classNameEquals(partitionerName, DailyPartitioner.class) || classNameEquals(partitionerName, HourlyPartitioner.class)) { return name.equals(LOCALE_CONFIG) || name.equals(TIMEZONE_CONFIG); } else { return name.equals(PARTITION_DURATION_MS_CONFIG) || name.equals(PATH_FORMAT_CONFIG) || name.equals(LOCALE_CONFIG) || name.equals(TIMEZONE_CONFIG); } } else { throw new ConfigException("Not a valid partitioner class: " + partitionerName); } } catch (ClassNotFoundException e) { throw new ConfigException("Partitioner class not found: " + partitionerName); } }
/**
 * Partitions {@code messages} generated keys for topic "test" with a
 * {@code DefaultPartitioner} and asserts that each partition's count deviates from
 * {@code messages / partitions} by less than 5% of that expected count.
 */
@Test
public void testPartitionSpread() throws Exception {
    Multiset<Integer> hits = TreeMultiset.create();
    Cluster c = Cluster.empty();
    try (Partitioner p = new DefaultPartitioner()) {
        PartitionKeyGenerator keys = new PartitionKeyGenerator();
        mockPartitions(c);

        int sent = 0;
        while (sent < messages) {
            hits.add(p.partition("test", null, keys.next(), null, null, c));
            sent++;
        }

        final int expected = messages / partitions;
        final double threshold = expected * 0.05;

        for (Multiset.Entry<Integer> bucket : hits.entrySet()) {
            final int deviation = Math.abs(bucket.getCount() - expected);
            final String failureMessage = "Partition " + bucket.getElement() + " had " + bucket.getCount()
                + " elements, expected " + expected + ", threshold is " + threshold;
            assertTrue(failureMessage, deviation < threshold);
        }
    }
}
/**
 * For ten random integer keys, compares the partition picked by {@code DefaultPartitioner}
 * for the plain key with the partition picked by {@code WindowedStreamPartitioner} for the
 * same key wrapped in nine different time windows; the two must always match.
 */
@Test
public void testCopartitioning() {
    final Random random = new Random();
    final DefaultPartitioner plainPartitioner = new DefaultPartitioner();
    final WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer);
    final WindowedStreamPartitioner<Integer, String> windowedPartitioner =
        new WindowedStreamPartitioner<>(topicName, windowedSerializer);

    for (int round = 0; round < 10; round++) {
        final Integer key = random.nextInt();
        final String value = key.toString();
        final byte[] keyBytes = intSerializer.serialize(topicName, key);
        final byte[] valueBytes = stringSerializer.serialize(topicName, value);

        final Integer expected = plainPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);

        // Windows [10w, 20w) for w = 1..9: the window must not influence the chosen partition.
        for (int w = 1; w <= 9; w++) {
            final Windowed<Integer> windowedKey = new Windowed<>(key, new TimeWindow(10 * w, 20 * w));
            final Integer actual = windowedPartitioner.partition(windowedKey, value, infos.size());
            assertEquals(expected, actual);
        }
    }
}
@Test public void testSpecificPartition() { final RecordCollectorImpl collector = new RecordCollectorImpl( new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer), "RecordCollectorTest-TestSpecificPartition"); collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer); final Map<TopicPartition, Long> offsets = collector.offsets(); assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 0))); assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 1))); assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2))); // ignore StreamPartitioner collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer); assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0))); assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1))); assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 2))); }
/**
 * Uses a producer whose {@code send} throws a {@code TimeoutException} on every call and
 * expects the collector's {@code send} to surface a {@code StreamsException}.
 */
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
    final RecordCollector alwaysTimingOut = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                // Never succeeds, no matter how often it is called.
                throw new TimeoutException();
            }
        },
        "test");

    alwaysTimingOut.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
/**
 * Uses a producer that completes every send callback with an exception, then calls the
 * collector's {@code send} twice; the test expects a {@code StreamsException} to be thrown
 * (per the test name, by the call that follows the failed send).
 */
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
    final RecordCollector failingCollector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                // Report the failure through the callback instead of throwing.
                final Exception sendFailure = new Exception();
                callback.onCompletion(null, sendFailure);
                return null;
            }
        },
        "test");

    for (int call = 0; call < 2; call++) {
        failingCollector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    }
}
/**
 * Uses a producer that completes the send callback with an exception, sends one record and
 * then flushes the collector; the test expects a {@code StreamsException} to be thrown.
 */
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
    final RecordCollector failingCollector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                // Report the failure through the callback instead of throwing.
                final Exception sendFailure = new Exception();
                callback.onCompletion(null, sendFailure);
                return null;
            }
        },
        "test");

    failingCollector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    failingCollector.flush();
}
/**
 * Uses a producer that completes the send callback with an exception, sends one record and
 * then closes the collector; the test expects a {@code StreamsException} to be thrown.
 */
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
    final RecordCollector failingCollector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                // Report the failure through the callback instead of throwing.
                final Exception sendFailure = new Exception();
                callback.onCompletion(null, sendFailure);
                return null;
            }
        },
        "test");

    failingCollector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    failingCollector.close();
}
/**
 * Uses a producer that reports no partitions for any topic and expects the collector's
 * {@code send} (with a StreamPartitioner) to throw a {@code StreamsException}.
 */
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowIfTopicIsUnknown() {
    final RecordCollector collectorWithoutPartitions = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public List<PartitionInfo> partitionsFor(final String topic) {
                // An empty, immutable list: the topic has no known partitions.
                return Collections.<PartitionInfo>emptyList();
            }
        },
        "test");

    collectorWithoutPartitions.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
/**
 * Builds a two-partition cluster for {@code topic}, sends one keyed record through a
 * {@code MockProducer} configured with a {@code DefaultPartitioner}, and checks that the
 * record lands on partition 1 and that {@code clear()} empties the producer's history.
 */
@Test
public void testPartitioner() throws Exception {
    final PartitionInfo first = new PartitionInfo(topic, 0, null, null, null);
    final PartitionInfo second = new PartitionInfo(topic, 1, null, null, null);
    final Cluster twoPartitionCluster = new Cluster(
        null,
        new ArrayList<Node>(0),
        asList(first, second),
        Collections.<String>emptySet(),
        Collections.<String>emptySet());

    final MockProducer<String, String> mockProducer = new MockProducer<>(
        twoPartitionCluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());

    final Future<RecordMetadata> pending = mockProducer.send(new ProducerRecord<>(topic, "key", "value"));
    assertEquals("Partition should be correct", 1, pending.get().partition());

    mockProducer.clear();
    assertEquals("Clear should erase our history", 0, mockProducer.history().size());
}
/**
 * Builds a two-partition cluster for {@code topic}, sends one keyed record through a
 * {@code MockProducer} configured with a {@code DefaultPartitioner}, and checks that the
 * record lands on partition 1 and that {@code clear()} empties the producer's history.
 */
@Test
public void testPartitioner() throws Exception {
    final PartitionInfo first = new PartitionInfo(topic, 0, null, null, null);
    final PartitionInfo second = new PartitionInfo(topic, 1, null, null, null);
    final Cluster twoPartitionCluster = new Cluster(
        new ArrayList<Node>(0),
        asList(first, second),
        Collections.<String>emptySet());

    final MockProducer<String, String> mockProducer = new MockProducer<String, String>(
        twoPartitionCluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());

    final Future<RecordMetadata> pending =
        mockProducer.send(new ProducerRecord<String, String>(topic, "key", "value"));
    assertEquals("Partition should be correct", 1, pending.get().partition());

    mockProducer.clear();
    assertEquals("Clear should erase our history", 0, mockProducer.history().size());
}
@Bean public Map<String, Object> producerConfigs() { //FIXME: 12factorize Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList()); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.ACKS_CONFIG, acks); //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //props.put(ProducerConfig.LINGER_MS_CONFIG, 1); //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; }
/**
 * Creates a stream partitioner that keeps the given key serializer, cluster and topic and
 * owns a freshly created {@code DefaultPartitioner}.
 *
 * @param keySerializer serializer stored for the record keys
 * @param cluster       cluster stored for later use
 * @param topic         topic name stored for later use
 */
public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster, final String topic) {
    this.defaultPartitioner = new DefaultPartitioner();
    this.topic = topic;
    this.cluster = cluster;
    this.keySerializer = keySerializer;
}
/**
 * Create a new mock producer backed by an empty cluster ({@code Cluster.empty()}) and a
 * {@code DefaultPartitioner}, using the given autoComplete setting and key/value serializers.
 *
 * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)
 * new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
 *
 * @param autoComplete    forwarded unchanged to the five-argument constructor
 * @param keySerializer   serializer for record keys, forwarded unchanged
 * @param valueSerializer serializer for record values, forwarded unchanged
 */
public MockProducer(final boolean autoComplete, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer); }
/**
 * Create a new mock producer backed by an empty cluster ({@code Cluster.empty()}) and a
 * {@code DefaultPartitioner}, using the given autoComplete setting and key/value serializers.
 *
 * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)
 * new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
 *
 * @param autoComplete    forwarded unchanged to the five-argument constructor
 * @param keySerializer   serializer for record keys, forwarded unchanged
 * @param valueSerializer serializer for record values, forwarded unchanged
 */
public MockProducer(boolean autoComplete, Serializer<K> keySerializer, Serializer<V> valueSerializer) { this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer); }