Java Class org.apache.kafka.clients.consumer.MockConsumer Example Source Code
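
MockConsumer is the in-memory test double of KafkaConsumer that ships with the kafka-clients library. It implements the Consumer interface without any network I/O: tests seed it with topic metadata (updatePartitions), offsets (updateBeginningOffsets / updateEndOffsets), and records (addRecord), drive it with poll() or schedulePollTask(), and subclass it to inject failures. The snippets below, collected from several open-source projects, demonstrate these patterns.

Before the project examples, a minimal self-contained quick-start sketch (the topic name demo_topic and the key/value strings are illustrative, not taken from any project below):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerQuickStart {
    public static void main(String[] args) {
        // Everything stays in memory; no broker is contacted.
        final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        final TopicPartition tp = new TopicPartition("demo_topic", 0); // hypothetical topic

        // Assign the partition and seed its beginning offset so poll() can resolve a position.
        consumer.assign(Arrays.asList(tp));
        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(tp, 0L);
        consumer.updateBeginningOffsets(beginningOffsets);

        // Hand-feed a record; the next poll() returns it.
        consumer.addRecord(new ConsumerRecord<>("demo_topic", 0, 0L, "key", "value"));

        final ConsumerRecords<String, String> records = consumer.poll(100);
        for (final ConsumerRecord<String, String> record : records) {
            System.out.println(record.key() + " -> " + record.value());
        }
        consumer.close();
    }
}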

Project: kafka-0.11.0.0-src-with-comment    File: StoreChangelogReaderTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldThrowStreamsExceptionWhenTimeoutExceptionThrown() throws Exception {
    final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public Map<String, List<PartitionInfo>> listTopics() {
            throw new TimeoutException("KABOOM!");
        }
    };
    final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0);
    try {
        changelogReader.validatePartitionExists(topicPartition, "store");
        fail("Should have thrown streams exception");
    } catch (final StreamsException e) {
        // pass
    }
}
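This override-and-throw pattern recurs throughout these tests: subclass MockConsumer, make one method (here listTopics()) fail, and assert that the caller translates the raw client exception, here a TimeoutException, into a StreamsException.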
Project: kafka-0.11.0.0-src-with-comment    File: StoreChangelogReaderTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldFallbackToPartitionsForIfPartitionNotInAllPartitionsList() throws Exception {
    final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public List<PartitionInfo> partitionsFor(final String topic) {
            return Collections.singletonList(partitionInfo);
        }
    };

    final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 10);
    changelogReader.validatePartitionExists(topicPartition, "store");
}
Project: kafka-0.11.0.0-src-with-comment    File: StoreChangelogReaderTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldThrowStreamsExceptionIfTimeoutOccursDuringPartitionsFor() throws Exception {
    final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public List<PartitionInfo> partitionsFor(final String topic) {
            throw new TimeoutException("KABOOM!");
        }
    };
    final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 5);
    try {
        changelogReader.validatePartitionExists(topicPartition, "store");
        fail("Should have thrown streams exception");
    } catch (final StreamsException e) {
        // pass
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: StoreChangelogReaderTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldRequestPartitionInfoIfItDoesntExist() throws Exception {
    final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public Map<String, List<PartitionInfo>> listTopics() {
            return Collections.emptyMap();
        }
    };

    consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo));
    final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Time.SYSTEM, 5000);
    changelogReader.validatePartitionExists(topicPartition, "store");
}
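updatePartitions(topic, partitionInfos) is the seeding side of the metadata API: MockConsumer serves both listTopics() and partitionsFor() from whatever has been registered this way. Overriding listTopics() to return an empty map therefore forces validatePartitionExists(...) to fall back to the seeded partitionsFor() data.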
Project: kafka-0.11.0.0-src-with-comment    File: GlobalStateManagerImplTest.java
@Before
public void before() throws IOException {
    final Map<String, String> storeToTopic = new HashMap<>();
    storeToTopic.put("t1-store", "t1");
    storeToTopic.put("t2-store", "t2");

    final Map<StateStore, ProcessorNode> storeToProcessorNode = new HashMap<>();
    store1 = new NoOpReadOnlyStore<>("t1-store");
    storeToProcessorNode.put(store1, new MockProcessorNode(-1));
    store2 = new NoOpReadOnlyStore("t2-store");
    storeToProcessorNode.put(store2, new MockProcessorNode(-1));
    topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
                                     Collections.<String, SourceNode>emptyMap(),
                                     Collections.<String, SinkNode>emptyMap(),
                                     Collections.<StateStore>emptyList(),
                                     storeToTopic,
                                     Arrays.<StateStore>asList(store1, store2));

    context = new NoOpProcessorContext();
    stateDirPath = TestUtils.tempDirectory().getPath();
    stateDirectory = new StateDirectory("appId", stateDirPath, time);
    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
    checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalStreamThreadTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() throws Exception {
    final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public List<PartitionInfo> partitionsFor(final String topic) {
            throw new RuntimeException("KABOOM!");
        }
    };
    globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(),
                                                config,
                                                mockConsumer,
                                                new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
                                                new Metrics(),
                                                new MockTime(),
                                                "clientId");

    try {
        globalStreamThread.start();
        fail("Should have thrown StreamsException if start up failed");
    } catch (StreamsException e) {
        assertThat(e.getCause(), instanceOf(RuntimeException.class));
        assertThat(e.getCause().getMessage(), equalTo("KABOOM!"));
    }
    assertFalse(globalStreamThread.stillRunning());
}
Project: vertx-kafka-client    File: ConsumerMockTestBase.java
@Test
public void testConsume(TestContext ctx) throws Exception {
  MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
  KafkaReadStream<String, String> consumer = createConsumer(vertx, mock);
  Async doneLatch = ctx.async();
  consumer.handler(record -> {
    ctx.assertEquals("the_topic", record.topic());
    ctx.assertEquals(0, record.partition());
    ctx.assertEquals("abc", record.key());
    ctx.assertEquals("def", record.value());
    consumer.close(v -> doneLatch.complete());
  });
  consumer.subscribe(Collections.singleton("the_topic"), v -> {
    mock.schedulePollTask(() -> {
      mock.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
      mock.addRecord(new ConsumerRecord<>("the_topic", 0, 0L, "abc", "def"));
      mock.seek(new TopicPartition("the_topic", 0), 0L);
    });
  });
}
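The test above drives the mock through vertx's KafkaReadStream, but the underlying mechanics are plain MockConsumer: schedulePollTask(Runnable) queues a task that runs at the start of a subsequent poll(), so the rebalance, the hand-fed record, and the seek all take effect before that poll() collects records. A stripped-down sketch of the same drive pattern without vert.x (topic name the_topic kept from the test above; the comment on the printed count reflects kafka-clients 0.11 behavior, where the scheduled task runs within the same poll() call):

import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class SchedulePollTaskSketch {
    public static void main(String[] args) {
        final MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        final TopicPartition tp = new TopicPartition("the_topic", 0);

        mock.subscribe(Collections.singleton("the_topic"));
        mock.schedulePollTask(() -> {
            mock.rebalance(Collections.singletonList(tp));  // simulate partition assignment
            mock.addRecord(new ConsumerRecord<>("the_topic", 0, 0L, "abc", "def"));
            mock.seek(tp, 0L);                              // give the partition a position
        });

        // The scheduled task runs at the start of this poll(), which then collects the record.
        final ConsumerRecords<String, String> records = mock.poll(100);
        System.out.println(records.count()); // prints 1
        mock.close();
    }
}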
Project: kafka-reactive-streams    File: KafkaPublisherTest.java
@Override public Publisher<ConsumerRecord<Long, Double>> createPublisher(final long l) {
    long nRecords = 100;

    mockConsumer = new MockConsumer<Long, Double>(OffsetResetStrategy.LATEST);
    mockConsumer.assign(Arrays.asList(topicPartition));
    final HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();
    topicPartitionLongHashMap.put(topicPartition, 0L);
    mockConsumer.updateBeginningOffsets(topicPartitionLongHashMap);
    topicPartitionLongHashMap.put(topicPartition, nRecords - 1);
    mockConsumer.updateEndOffsets(topicPartitionLongHashMap);
    final Random random = new Random();
    for (int i = 0; i < nRecords; i++)
        mockConsumer.addRecord(
                new ConsumerRecord<Long, Double>(
                        topicPartition.topic(),
                        topicPartition.partition(),
                        i,
                        random.nextLong(),
                        random.nextDouble()));

    return new KafkaPublisher<Long, Double>(mockConsumer, 100, Executors.newSingleThreadExecutor());
}
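With OffsetResetStrategy.LATEST the test seeds both beginning and end offsets explicitly (updateBeginningOffsets / updateEndOffsets) before preloading 100 records, so the publisher under test sees a bounded, fully specified stream.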
Project: incubator-gobblin    File: Kafka09ConsumerClientTest.java
@Test
public void testConsume() throws Exception {
  Config testConfig = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.KAFKA_BROKERS, "test"));
  MockConsumer<String, String> consumer = new MockConsumer<String, String>(OffsetResetStrategy.NONE);
  consumer.assign(Arrays.asList(new TopicPartition("test_topic", 0)));

  HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
  beginningOffsets.put(new TopicPartition("test_topic", 0), 0L);
  consumer.updateBeginningOffsets(beginningOffsets);

  ConsumerRecord<String, String> record0 = new ConsumerRecord<>("test_topic", 0, 0L, "key", "value0");
  ConsumerRecord<String, String> record1 = new ConsumerRecord<>("test_topic", 0, 1L, "key", "value1");
  ConsumerRecord<String, String> record2 = new ConsumerRecord<>("test_topic", 0, 2L, "key", "value2");

  consumer.addRecord(record0);
  consumer.addRecord(record1);
  consumer.addRecord(record2);

  try (Kafka09ConsumerClient<String, String> kafka09Client = new Kafka09ConsumerClient<>(testConfig, consumer)) {

    // Consume from 0 offset
    Set<KafkaConsumerRecord> consumedRecords =
        Sets.newHashSet(kafka09Client.consume(new KafkaPartition.Builder().withId(0).withTopicName("test_topic")
            .build(), 0L, 100L));

    Set<Kafka09ConsumerRecord<String, String>> expected =
        ImmutableSet.<Kafka09ConsumerRecord<String, String>> of(new Kafka09ConsumerRecord<>(record0),
            new Kafka09ConsumerRecord<>(record1), new Kafka09ConsumerRecord<>(record2));
    Assert.assertEquals(consumedRecords, expected);

  }

}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaBasedLogTest.java
@Before
public void setUp() throws Exception {
    store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
            TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer);
    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
    Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
    beginningOffsets.put(TP0, 0L);
    beginningOffsets.put(TP1, 0L);
    consumer.updateBeginningOffsets(beginningOffsets);
}
Project: kafka-0.11.0.0-src-with-comment    File: AbstractTaskTest.java
private Consumer mockConsumer(final RuntimeException toThrow) {
    return new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public OffsetAndMetadata committed(final TopicPartition partition) {
            throw toThrow;
        }
    };
}
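The same failure-injection technique as in the StoreChangelogReader tests above, this time aimed at committed(): any AbstractTask code path that looks up committed offsets will surface the injected exception.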
Project: vertx-kafka-client    File: ConsumerMockTestBase.java
@Test
public void testBatch(TestContext ctx) throws Exception {
  int num = 50;
  MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
  KafkaReadStream<String, String> consumer = createConsumer(vertx, mock);
  Async doneLatch = ctx.async();
  AtomicInteger count = new AtomicInteger();
  consumer.handler(record -> {
    int val = count.getAndIncrement();
    if (val < num) {
      ctx.assertEquals("the_topic", record.topic());
      ctx.assertEquals(0, record.partition());
      ctx.assertEquals("key-" + val, record.key());
      ctx.assertEquals("value-" + val, record.value());
      if (val == num - 1) {
        consumer.close(v -> doneLatch.complete());
      }
    }
  });
  consumer.subscribe(Collections.singleton("the_topic"), v -> {
    mock.schedulePollTask(() -> {
      mock.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
      mock.seek(new TopicPartition("the_topic", 0), 0);
      for (int i = 0; i < num; i++) {
        mock.addRecord(new ConsumerRecord<>("the_topic", 0, i, "key-" + i, "value-" + i));
      }
    });
  });
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamThreadTest.java
@Test
public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    builder.setApplicationId(applicationId);
    builder.stream("t1").groupByKey().count("count-one");
    builder.stream("t2").groupByKey().count("count-two");

    final StreamThread thread = new StreamThread(
        builder,
        config,
        clientSupplier,
        applicationId,
        clientId,
        processId,
        metrics,
        mockTime,
        new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
        0);

    final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
    restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
                                     Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
                                                                                 0,
                                                                                 null,
                                                                                 new Node[0],
                                                                                 new Node[0])));
    restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog",
                                     Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog",
                                                                                 0,
                                                                                 null,
                                                                                 new Node[0],
                                                                                 new Node[0])));

    final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
    final TopicPartition t1 = new TopicPartition("t1", 0);
    standbyTasks.put(new TaskId(0, 0), Utils.mkSet(t1));

    thread.setPartitionAssignor(new StreamPartitionAssignor() {
        @Override
        Map<TaskId, Set<TopicPartition>> standbyTasks() {
            return standbyTasks;
        }
    });

    thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
    thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());

    assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(new TopicPartition("stream-thread-test-count-one-changelog", 0))));

    // assign an existing standby plus a new one
    standbyTasks.put(new TaskId(1, 0), Utils.mkSet(new TopicPartition("t2", 0)));
    thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
    thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());

    assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(new TopicPartition("stream-thread-test-count-one-changelog", 0),
                                                                 new TopicPartition("stream-thread-test-count-two-changelog", 0))));
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamThreadTest.java
@Test
public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    builder.setApplicationId(applicationId);
    builder.stream("t1").groupByKey().count("count-one");
    builder.stream("t2").groupByKey().count("count-two");

    final StreamThread thread = new StreamThread(
        builder,
        config,
        clientSupplier,
        applicationId,
        clientId,
        processId,
        metrics,
        mockTime,
        new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
        0);
    final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
    restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
                                     Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
                                                                                 0,
                                                                                 null,
                                                                                 new Node[0],
                                                                                 new Node[0])));
    restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog",
                                     Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog",
                                                                                 0,
                                                                                 null,
                                                                                 new Node[0],
                                                                                 new Node[0])));


    final HashMap<TopicPartition, Long> offsets = new HashMap<>();
    offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
    offsets.put(new TopicPartition("stream-thread-test-count-two-changelog", 0), 0L);
    restoreConsumer.updateEndOffsets(offsets);
    restoreConsumer.updateBeginningOffsets(offsets);

    final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
    final TopicPartition t1 = new TopicPartition("t1", 0);
    standbyTasks.put(new TaskId(0, 0), Utils.mkSet(t1));

    final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
    final TopicPartition t2 = new TopicPartition("t2", 0);
    activeTasks.put(new TaskId(1, 0), Utils.mkSet(t2));

    thread.setPartitionAssignor(new StreamPartitionAssignor() {
        @Override
        Map<TaskId, Set<TopicPartition>> standbyTasks() {
            return standbyTasks;
        }

        @Override
        Map<TaskId, Set<TopicPartition>> activeTasks() {
            return activeTasks;
        }
    });

    thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
    thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t2));

    // swap the assignment around and make sure we don't get any exceptions
    standbyTasks.clear();
    activeTasks.clear();
    standbyTasks.put(new TaskId(1, 0), Utils.mkSet(t2));
    activeTasks.put(new TaskId(0, 0), Utils.mkSet(t1));

    thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
    thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t1));
}
Project: kafka-0.11.0.0-src-with-comment    File: MockInternalTopicManager.java
public MockInternalTopicManager(StreamsConfig streamsConfig, MockConsumer<byte[], byte[]> restoreConsumer) {
    super(new StreamsKafkaClient(streamsConfig), 0, 0, new MockTime());

    this.restoreConsumer = restoreConsumer;
}
Project: kafka-0.11.0.0-src-with-comment    File: ProcessorTopologyTestDriver.java
/**
 * Create a new test driver instance.
 * @param config the stream configuration for the topology
 * @param builder the topology builder that will be used to create the topology instance
 */
public ProcessorTopologyTestDriver(final StreamsConfig config,
                                   final TopologyBuilder builder) {
    topology = builder.setApplicationId(APPLICATION_ID).build(null);
    final ProcessorTopology globalTopology  = builder.buildGlobalStateTopology();

    // Set up the consumer and producer ...
    final Consumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
    producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
        @Override
        public List<PartitionInfo> partitionsFor(final String topic) {
            return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null));
        }
    };

    // Identify internal topics for forwarding in process ...
    for (final TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
        internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
    }

    // Set up all of the topic+partition information and subscribe the consumer to each ...
    for (final String topic : topology.sourceTopics()) {
        final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
        partitionsByTopic.put(topic, tp);
        offsetsByTopicPartition.put(tp, new AtomicLong());
    }

    consumer.assign(offsetsByTopicPartition.keySet());

    final StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), Time.SYSTEM);
    final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
    final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, streamsMetrics);

    if (globalTopology != null) {
        final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer();
        for (final String topicName : globalTopology.sourceTopics()) {
            final List<PartitionInfo> partitionInfos = new ArrayList<>();
            partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null));
            globalConsumer.updatePartitions(topicName, partitionInfos);
            final TopicPartition partition = new TopicPartition(topicName, 1);
            globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
            globalPartitionsByTopic.put(topicName, partition);
            offsetsByTopicPartition.put(partition, new AtomicLong());
        }
        final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
        globalStateTask = new GlobalStateUpdateTask(globalTopology,
                                                    new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
                                                    stateManager
        );
        globalStateTask.initialize();
    }

    if (!partitionsByTopic.isEmpty()) {
        task = new StreamTask(TASK_ID,
                              APPLICATION_ID,
                              partitionsByTopic.values(),
                              topology,
                              consumer,
                              new StoreChangelogReader(
                                  createRestoreConsumer(topology.storeToChangelogTopic()),
                                  Time.SYSTEM,
                                  5000),
                              config,
                              streamsMetrics, stateDirectory,
                              cache,
                              new MockTime(),
                              producer);
    }
}
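This driver is the most complete wiring in the collection: a MockConsumer assigned to the source partitions feeds the StreamTask, a MockProducer subclass (partitionsFor() pinned to a single partition) captures its output, and further MockConsumers created by createRestoreConsumer(...) and createGlobalConsumer() back state restoration, so an entire topology runs without a broker.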
Project: replicate-connector-for-kafka    File: ReplicateTestConfig.java
@Override
public synchronized Map<String, ReplicateInfo> read()
    throws Exception {
    Map<String, ReplicateInfo> catalog = Collections.synchronizedMap(
        new LinkedHashMap<String, ReplicateInfo>()
    );
    Consumer<String, String> consumer =
        new MockConsumer<String, String>(
            OffsetResetStrategy.EARLIEST
        );
    consumer.subscribe(Arrays.asList(mockTopicName));

    try {
        /* greedy implementation: always fetch all replicate info
         * messages when the source connector is started */
        ConsumerRecords<String, String> records = consumer.poll(
            100
        );
        if (records != null) {
            for (ConsumerRecord<String, String> record : records) {
                String identifier  = record.key();
                ReplicateInfo info = ReplicateInfo.fromJSONString(
                    record.value()
                );
                /* all messages are consumed in order; always overwrite
                 * with the latest info */
                catalog.put(identifier, info);
            }
        }
    }
    catch (Exception e) {
        throw new Exception(
            "Failed to read replicate info records from topic: " + 
            mockTopicName + ", reason: " + e.getMessage(),
            e
        );
    }
    finally {
        consumer.close();
    }

    return catalog;
}
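Because nothing rebalances partitions onto this MockConsumer or feeds it records, poll() returns an empty result and read() yields an empty catalog; the mock exists so the connector's startup path can be exercised without a Kafka cluster.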
Project: core-ng-project    File: TestModule.java
@Override
public Consumer<String, byte[]> create() {
    return new MockConsumer<>(OffsetResetStrategy.EARLIEST);
}