@SuppressWarnings("unchecked") @Test public void shouldThrowStreamsExceptionWhenTimeoutExceptionThrown() throws Exception { final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public Map<String, List<PartitionInfo>> listTopics() { throw new TimeoutException("KABOOM!"); } }; final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0); try { changelogReader.validatePartitionExists(topicPartition, "store"); fail("Should have thrown streams exception"); } catch (final StreamsException e) { // pass } }
@SuppressWarnings("unchecked") @Test public void shouldFallbackToPartitionsForIfPartitionNotInAllPartitionsList() throws Exception { final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public List<PartitionInfo> partitionsFor(final String topic) { return Collections.singletonList(partitionInfo); } }; final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 10); changelogReader.validatePartitionExists(topicPartition, "store"); }
@SuppressWarnings("unchecked") @Test public void shouldThrowStreamsExceptionIfTimeoutOccursDuringPartitionsFor() throws Exception { final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public List<PartitionInfo> partitionsFor(final String topic) { throw new TimeoutException("KABOOM!"); } }; final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 5); try { changelogReader.validatePartitionExists(topicPartition, "store"); fail("Should have thrown streams exception"); } catch (final StreamsException e) { // pass } }
@SuppressWarnings("unchecked") @Test public void shouldRequestPartitionInfoIfItDoesntExist() throws Exception { final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public Map<String, List<PartitionInfo>> listTopics() { return Collections.emptyMap(); } }; consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo)); final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Time.SYSTEM, 5000); changelogReader.validatePartitionExists(topicPartition, "store"); }
@Before public void before() throws IOException { final Map<String, String> storeToTopic = new HashMap<>(); storeToTopic.put("t1-store", "t1"); storeToTopic.put("t2-store", "t2"); final Map<StateStore, ProcessorNode> storeToProcessorNode = new HashMap<>(); store1 = new NoOpReadOnlyStore<>("t1-store"); storeToProcessorNode.put(store1, new MockProcessorNode(-1)); store2 = new NoOpReadOnlyStore("t2-store"); storeToProcessorNode.put(store2, new MockProcessorNode(-1)); topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(), Collections.<String, SourceNode>emptyMap(), Collections.<String, SinkNode>emptyMap(), Collections.<StateStore>emptyList(), storeToTopic, Arrays.<StateStore>asList(store1, store2)); context = new NoOpProcessorContext(); stateDirPath = TestUtils.tempDirectory().getPath(); stateDirectory = new StateDirectory("appId", stateDirPath, time); consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory); checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME); }
@SuppressWarnings("unchecked") @Test public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() throws Exception { final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public List<PartitionInfo> partitionsFor(final String topic) { throw new RuntimeException("KABOOM!"); } }; globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(), config, mockConsumer, new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time), new Metrics(), new MockTime(), "clientId"); try { globalStreamThread.start(); fail("Should have thrown StreamsException if start up failed"); } catch (StreamsException e) { assertThat(e.getCause(), instanceOf(RuntimeException.class)); assertThat(e.getCause().getMessage(), equalTo("KABOOM!")); } assertFalse(globalStreamThread.stillRunning()); }
@Test public void testConsume(TestContext ctx) throws Exception { MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST); KafkaReadStream<String, String> consumer = createConsumer(vertx, mock); Async doneLatch = ctx.async(); consumer.handler(record -> { ctx.assertEquals("the_topic", record.topic()); ctx.assertEquals(0, record.partition()); ctx.assertEquals("abc", record.key()); ctx.assertEquals("def", record.value()); consumer.close(v -> doneLatch.complete()); }); consumer.subscribe(Collections.singleton("the_topic"), v -> { mock.schedulePollTask(()-> { mock.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0))); mock.addRecord(new ConsumerRecord<>("the_topic", 0, 0L, "abc", "def")); mock.seek(new TopicPartition("the_topic", 0), 0L); }); }); }
@Override public Publisher<ConsumerRecord<Long, Double>> createPublisher(final long l) { long nRecords = 100; mockConsumer = new MockConsumer<Long, Double>(OffsetResetStrategy.LATEST); mockConsumer.assign(Arrays.asList(topicPartition)); final HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>(); topicPartitionLongHashMap.put(topicPartition, 0L); mockConsumer.updateBeginningOffsets(topicPartitionLongHashMap); topicPartitionLongHashMap.put(topicPartition, nRecords - 1); mockConsumer.updateEndOffsets(topicPartitionLongHashMap); final Random random = new Random(); for (int i = 0; i < nRecords; i++) mockConsumer.addRecord( new ConsumerRecord<Long, Double>( topicPartition.topic(), topicPartition.partition(), i, random.nextLong(), random.nextDouble())); return new KafkaPublisher<Long, Double>(mockConsumer, 100, Executors.newSingleThreadExecutor()); }
@Test public void testConsume() throws Exception { Config testConfig = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.KAFKA_BROKERS, "test")); MockConsumer<String, String> consumer = new MockConsumer<String, String>(OffsetResetStrategy.NONE); consumer.assign(Arrays.asList(new TopicPartition("test_topic", 0))); HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>(); beginningOffsets.put(new TopicPartition("test_topic", 0), 0L); consumer.updateBeginningOffsets(beginningOffsets); ConsumerRecord<String, String> record0 = new ConsumerRecord<>("test_topic", 0, 0L, "key", "value0"); ConsumerRecord<String, String> record1 = new ConsumerRecord<>("test_topic", 0, 1L, "key", "value1"); ConsumerRecord<String, String> record2 = new ConsumerRecord<>("test_topic", 0, 2L, "key", "value2"); consumer.addRecord(record0); consumer.addRecord(record1); consumer.addRecord(record2); try (Kafka09ConsumerClient<String, String> kafka09Client = new Kafka09ConsumerClient<>(testConfig, consumer);) { // Consume from 0 offset Set<KafkaConsumerRecord> consumedRecords = Sets.newHashSet(kafka09Client.consume(new KafkaPartition.Builder().withId(0).withTopicName("test_topic") .build(), 0l, 100l)); Set<Kafka09ConsumerRecord<String, String>> expected = ImmutableSet.<Kafka09ConsumerRecord<String, String>> of(new Kafka09ConsumerRecord<>(record0), new Kafka09ConsumerRecord<>(record1), new Kafka09ConsumerRecord<>(record2)); Assert.assertEquals(consumedRecords, expected); } }
@Before public void setUp() throws Exception { store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer); consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1)); Map<TopicPartition, Long> beginningOffsets = new HashMap<>(); beginningOffsets.put(TP0, 0L); beginningOffsets.put(TP1, 0L); consumer.updateBeginningOffsets(beginningOffsets); }
private Consumer mockConsumer(final RuntimeException toThrow) { return new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public OffsetAndMetadata committed(final TopicPartition partition) { throw toThrow; } }; }
@Test public void testBatch(TestContext ctx) throws Exception { int num = 50; MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST); KafkaReadStream<String, String> consumer = createConsumer(vertx, mock); Async doneLatch = ctx.async(); AtomicInteger count = new AtomicInteger(); consumer.handler(record -> { int val = count.getAndIncrement(); if (val < num) { ctx.assertEquals("the_topic", record.topic()); ctx.assertEquals(0, record.partition()); ctx.assertEquals("key-" + val, record.key()); ctx.assertEquals("value-" + val, record.value()); if (val == num - 1) { consumer.close(v -> doneLatch.complete()); } } }); consumer.subscribe(Collections.singleton("the_topic"), v -> { mock.schedulePollTask(() -> { mock.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0))); mock.seek(new TopicPartition("the_topic", 0), 0); for (int i = 0;i < num;i++) { mock.addRecord(new ConsumerRecord<>("the_topic", 0, i, "key-" + i, "value-" + i)); } }); }); }
@Test public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); builder.setApplicationId(applicationId); builder.stream("t1").groupByKey().count("count-one"); builder.stream("t2").groupByKey().count("count-two"); final StreamThread thread = new StreamThread( builder, config, clientSupplier, applicationId, clientId, processId, metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, null, new Node[0], new Node[0]))); restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog", 0, null, new Node[0], new Node[0]))); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); final TopicPartition t1 = new TopicPartition("t1", 0); standbyTasks.put(new TaskId(0, 0), Utils.mkSet(t1)); thread.setPartitionAssignor(new StreamPartitionAssignor() { @Override Map<TaskId, Set<TopicPartition>> standbyTasks() { return standbyTasks; } }); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList()); assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(new TopicPartition("stream-thread-test-count-one-changelog", 0)))); // assign an existing standby plus a new one standbyTasks.put(new TaskId(1, 0), Utils.mkSet(new TopicPartition("t2", 0))); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList()); assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(new TopicPartition("stream-thread-test-count-one-changelog", 0), new TopicPartition("stream-thread-test-count-two-changelog", 0)))); }
@Test public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); builder.setApplicationId(applicationId); builder.stream("t1").groupByKey().count("count-one"); builder.stream("t2").groupByKey().count("count-two"); final StreamThread thread = new StreamThread( builder, config, clientSupplier, applicationId, clientId, processId, metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, null, new Node[0], new Node[0]))); restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog", 0, null, new Node[0], new Node[0]))); final HashMap<TopicPartition, Long> offsets = new HashMap<>(); offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L); offsets.put(new TopicPartition("stream-thread-test-count-two-changelog", 0), 0L); restoreConsumer.updateEndOffsets(offsets); restoreConsumer.updateBeginningOffsets(offsets); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); final TopicPartition t1 = new TopicPartition("t1", 0); standbyTasks.put(new TaskId(0, 0), Utils.mkSet(t1)); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); final TopicPartition t2 = new TopicPartition("t2", 0); activeTasks.put(new TaskId(1, 0), Utils.mkSet(t2)); thread.setPartitionAssignor(new StreamPartitionAssignor() { @Override Map<TaskId, Set<TopicPartition>> standbyTasks() { return standbyTasks; } @Override Map<TaskId, Set<TopicPartition>> activeTasks() { return activeTasks; } }); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t2)); // swap the assignment around and make sure we don't get any exceptions standbyTasks.clear(); activeTasks.clear(); standbyTasks.put(new TaskId(1, 0), Utils.mkSet(t2)); activeTasks.put(new TaskId(0, 0), Utils.mkSet(t1)); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t1)); }
public MockInternalTopicManager(StreamsConfig streamsConfig, MockConsumer<byte[], byte[]> restoreConsumer) { super(new StreamsKafkaClient(streamsConfig), 0, 0, new MockTime()); this.restoreConsumer = restoreConsumer; }
/** * Create a new test driver instance. * @param config the stream configuration for the topology * @param builder the topology builder that will be used to create the topology instance */ public ProcessorTopologyTestDriver(final StreamsConfig config, final TopologyBuilder builder) { topology = builder.setApplicationId(APPLICATION_ID).build(null); final ProcessorTopology globalTopology = builder.buildGlobalStateTopology(); // Set up the consumer and producer ... final Consumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); final Serializer<byte[]> bytesSerializer = new ByteArraySerializer(); producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) { @Override public List<PartitionInfo> partitionsFor(final String topic) { return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null)); } }; // Identify internal topics for forwarding in process ... for (final TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) { internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet()); } // Set up all of the topic+partition information and subscribe the consumer to each ... for (final String topic : topology.sourceTopics()) { final TopicPartition tp = new TopicPartition(topic, PARTITION_ID); partitionsByTopic.put(topic, tp); offsetsByTopicPartition.put(tp, new AtomicLong()); } consumer.assign(offsetsByTopicPartition.keySet()); final StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), Time.SYSTEM); final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, streamsMetrics); if (globalTopology != null) { final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer(); for (final String topicName : globalTopology.sourceTopics()) { final List<PartitionInfo> partitionInfos = new ArrayList<>(); partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null)); globalConsumer.updatePartitions(topicName, partitionInfos); final TopicPartition partition = new TopicPartition(topicName, 1); globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L)); globalPartitionsByTopic.put(topicName, partition); offsetsByTopicPartition.put(partition, new AtomicLong()); } final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory); globalStateTask = new GlobalStateUpdateTask(globalTopology, new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache), stateManager ); globalStateTask.initialize(); } if (!partitionsByTopic.isEmpty()) { task = new StreamTask(TASK_ID, APPLICATION_ID, partitionsByTopic.values(), topology, consumer, new StoreChangelogReader( createRestoreConsumer(topology.storeToChangelogTopic()), Time.SYSTEM, 5000), config, streamsMetrics, stateDirectory, cache, new MockTime(), producer); } }
@Override public synchronized Map <String, ReplicateInfo> read () throws Exception { Map<String, ReplicateInfo> catalog = Collections.synchronizedMap( new LinkedHashMap<String, ReplicateInfo> () ); Consumer<String, String> consumer = new MockConsumer<String, String>( OffsetResetStrategy.EARLIEST ); consumer.subscribe(Arrays.asList(mockTopicName)); try { /* greedy implementation, always fetch all replicate info * messages when source connector is started */ ConsumerRecords<String, String> records = consumer.poll( 100 ); if (records != null) { for (ConsumerRecord<String, String> record : records) { String identifier = record.key(); ReplicateInfo info = ReplicateInfo.fromJSONString( record.value() ); /* all message are consumed in order, always overwrite * with the latest info */ catalog.put (identifier, info); } } } catch (Exception e) { throw new Exception ( "Failed to read replicate info records from topic: " + mockTopicName + ", reason: " + e.getMessage(), e ); } finally { consumer.close(); } return catalog; }
@Override public Consumer<String, byte[]> create() { return new MockConsumer<>(OffsetResetStrategy.EARLIEST); }