Java Class org.apache.kafka.clients.producer.MockProducer Code Examples
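The snippets below, collected from several open-source projects, show how org.apache.kafka.clients.producer.MockProducer is used in tests: constructing it with or without a Cluster, subclassing it to inject failures, and asserting on its recorded history, offsets, and transaction state.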

Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorTest.java
@Test
public void testStreamPartitioner() {

    final RecordCollectorImpl collector = new RecordCollectorImpl(
            new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
            "RecordCollectorTest-TestStreamPartitioner");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "27", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "81", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "243", "0", null, stringSerializer, stringSerializer, streamPartitioner);

    collector.send("topic1", "28", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "82", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "244", "0", null, stringSerializer, stringSerializer, streamPartitioner);

    collector.send("topic1", "245", "0", null, stringSerializer, stringSerializer, streamPartitioner);

    final Map<TopicPartition, Long> offsets = collector.offsets();

    assertEquals((Long) 4L, offsets.get(new TopicPartition("topic1", 0)));
    assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
    assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
}
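For reference, a minimal, self-contained sketch (not taken from any of the projects listed here) of the core MockProducer API these tests rely on; the class name MockProducerBasics and the topic/key/value strings are invented for illustration:

import java.util.List;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MockProducerBasics {
    public static void main(String[] args) {
        // autoComplete = true: every send() is acknowledged immediately.
        MockProducer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());

        producer.send(new ProducerRecord<>("topic1", "key", "value"));

        // history() returns all records sent so far, in send order.
        List<ProducerRecord<String, String>> sent = producer.history();
        System.out.println(sent.size());        // 1

        producer.close();
        System.out.println(producer.closed());  // true
    }
}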
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
    final AtomicInteger attempt = new AtomicInteger(0);
    final RecordCollectorImpl collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    if (attempt.getAndIncrement() == 0) {
                        throw new TimeoutException();
                    }
                    return super.send(record, callback);
                }
            },
            "test");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
    assertEquals(Long.valueOf(0L), offset);
}
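Because MockProducer.send(ProducerRecord, Callback) is synchronized but not final, tests like the one above can subclass it inline to simulate transient broker errors such as TimeoutException, with no network involved.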
Project: beam    File: KafkaIOTest.java
private static void verifyProducerRecords(MockProducer<Integer, Long> mockProducer,
                                          String topic, int numElements, boolean keyIsAbsent) {

  // verify that appropriate messages are written to kafka
  List<ProducerRecord<Integer, Long>> sent = mockProducer.history();

  // sort by values
  Collections.sort(sent, new Comparator<ProducerRecord<Integer, Long>>() {
    @Override
    public int compare(ProducerRecord<Integer, Long> o1, ProducerRecord<Integer, Long> o2) {
      return Long.compare(o1.value(), o2.value());
    }
  });

  for (int i = 0; i < numElements; i++) {
    ProducerRecord<Integer, Long> record = sent.get(i);
    assertEquals(topic, record.topic());
    if (keyIsAbsent) {
      assertNull(record.key());
    } else {
      assertEquals(i, record.key().intValue());
    }
    assertEquals(i, record.value().longValue());
  }
}
Project: beam    File: KafkaIOTest.java
MockProducerWrapper() {
  producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
  mockProducer = new MockProducer<Integer, Long>(
    false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
    new IntegerSerializer(),
    new LongSerializer()) {

    // override flush() so that it does not complete all the waiting sends, giving a chance to
  // the ProducerSendCompletionThread to inject errors.

    @Override
    public void flush() {
      while (completeNext()) {
        // there are some uncompleted records. let the completion thread handle them.
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          // ok to retry.
        }
      }
    }
  };

  // Add the producer to the global map so that producer factory function can access it.
  assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer));
}
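The wrapper above uses MockProducer's manual-completion mode (autoComplete = false). A hedged sketch of how completeNext() and errorNext() drive pending sends; the class and topic names are illustrative:

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class ManualCompletionSketch {
    public static void main(String[] args) {
        // autoComplete = false: sends stay pending until completed explicitly.
        MockProducer<String, String> producer =
                new MockProducer<>(false, new StringSerializer(), new StringSerializer());

        Future<RecordMetadata> pending =
                producer.send(new ProducerRecord<>("topic1", "value"));
        System.out.println(pending.isDone());  // false

        // completeNext() acknowledges the oldest pending send and returns true;
        // it returns false once nothing is pending. errorNext(e) would instead
        // fail the oldest pending send with the given exception.
        producer.completeNext();
        System.out.println(pending.isDone());  // true
    }
}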
Project: Lagerta    File: KafkaMockFactory.java
private MockProducer createProducer() {
    Collection<PartitionInfo> partitionInfos = getInfos()
            .values()
            .stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
    Cluster cluster = new Cluster(
            UUID.randomUUID().toString(),
            Collections.emptyList(),
            partitionInfos,
            Collections.emptySet(),
            Collections.emptySet()
    );
    return new MockProducer(cluster, true, null, new ByteBufferSerializer(), new ByteBufferSerializer());
}
Project: Lagerta    File: KafkaMockFactory.java
private MockProducer createProducer() {
    Map<String, List<PartitionInfo>> infos = getInfos();
    Collection<PartitionInfo> partitionInfos = new ArrayList<>();
    for (List<PartitionInfo> infoEntry : infos.values()) {
        partitionInfos.addAll(infoEntry);
    }
    Cluster cluster = new Cluster(UUID.randomUUID().toString(), Collections.<Node>emptyList(), partitionInfos,
        Collections.<String>emptySet(), Collections.<String>emptySet());
    MockProducer producer = new MockProducer(cluster, true, null, new ByteBufferSerializer(), new ByteBufferSerializer());
    PRODUCERS.add(producer);
    return producer;
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamThreadTest.java
@Test
public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() {
    builder.addSource("source1", "someTopic");

    final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
    final StreamThread thread = new StreamThread(
        builder,
        new StreamsConfig(configProps(true)),
        clientSupplier,
        applicationId,
        clientId,
        processId,
        metrics,
        mockTime,
        new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
        0);

    final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
    assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
    assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
    thread.setPartitionAssignor(new MockStreamsPartitionAssignor(assignment));

    thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));

    thread.close();
    thread.run();

    for (final StreamTask task : thread.tasks().values()) {
        assertTrue(((MockProducer) ((RecordCollectorImpl) task.recordCollector()).producer()).closed());
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamThreadTest.java
@Test
public void shouldCloseThreadProducerOnCloseIfEosDisabled() {
    builder.addSource("source1", "someTopic");

    final StreamThread thread = new StreamThread(
        builder,
        config,
        clientSupplier,
        applicationId,
        clientId,
        processId,
        metrics,
        mockTime,
        new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
        0);

    final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
    assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
    assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
    thread.setPartitionAssignor(new MockStreamsPartitionAssignor(assignment));

    thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));

    thread.close();
    thread.run();

    assertTrue(((MockProducer) thread.threadProducer).closed());
}
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorTest.java
@Test
public void testSpecificPartition() {

    final RecordCollectorImpl collector = new RecordCollectorImpl(
            new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
            "RecordCollectorTest-TestSpecificPartition");

    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);

    collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);

    collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);

    final Map<TopicPartition, Long> offsets = collector.offsets();

    assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 0)));
    assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 1)));
    assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));

    // ignore StreamPartitioner
    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);

    assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0)));
    assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
    assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 2)));
}
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorTest.java
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    throw new TimeoutException();
                }
            },
            "test");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);

}
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorTest.java
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    callback.onCompletion(null, new Exception());
                    return null;
                }
            },
            "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorTest.java
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    callback.onCompletion(null, new Exception());
                    return null;
                }
            },
            "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.flush();
}
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorTest.java
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    callback.onCompletion(null, new Exception());
                    return null;
                }
            },
            "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorTest.java
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowIfTopicIsUnknown() {
    final RecordCollector collector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public List<PartitionInfo> partitionsFor(final String topic) {
                return Collections.EMPTY_LIST;
            }

        },
        "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        eosConfig, streamsMetrics, stateDirectory, null, time, producer);

    assertTrue(producer.transactionInitialized());
    assertTrue(producer.transactionInFlight());
}
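The EOS tests in this file exercise MockProducer's transaction bookkeeping. A minimal sketch of the happy path and the flags being asserted; the class and topic names are invented for illustration:

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionSketch {
    public static void main(String[] args) {
        MockProducer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());

        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("topic1", "value"));
        producer.commitTransaction();

        // MockProducer records each transition so tests can assert on it.
        System.out.println(producer.transactionInitialized());  // true
        System.out.println(producer.transactionCommitted());    // true
        System.out.println(producer.transactionInFlight());     // false after commit
    }
}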
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        config, streamsMetrics, stateDirectory, null, time, producer);

    assertFalse(producer.transactionInitialized());
    assertFalse(producer.transactionInFlight());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        eosConfig, streamsMetrics, stateDirectory, null, time, producer);

    task.addRecords(partition1, Collections.singletonList(
        new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
    task.process();

    task.suspend();
    assertTrue(producer.sentOffsets());
    assertTrue(producer.transactionCommitted());
    assertFalse(producer.transactionInFlight());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        eosConfig, streamsMetrics, stateDirectory, null, time, producer);

    task.suspend();
    assertTrue(producer.transactionCommitted());
    assertFalse(producer.transactionInFlight());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        config, streamsMetrics, stateDirectory, null, time, producer);

    task.addRecords(partition1, Collections.singletonList(
        new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
    task.process();

    task.suspend();
    assertFalse(producer.sentOffsets());
    assertFalse(producer.transactionCommitted());
    assertFalse(producer.transactionInFlight());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldStartNewTransactionOnResumeIfEosEnabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        eosConfig, streamsMetrics, stateDirectory, null, time, producer);

    task.addRecords(partition1, Collections.singletonList(
        new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
    task.process();
    task.suspend();

    task.resume();
    assertTrue(producer.transactionInFlight());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldNotStartNewTransactionOnResumeIfEosDisabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        config, streamsMetrics, stateDirectory, null, time, producer);

    task.addRecords(partition1, Collections.singletonList(
        new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
    task.process();
    task.suspend();

    task.resume();
    assertFalse(producer.transactionInFlight());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldStartNewTransactionOnCommitIfEosEnabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        eosConfig, streamsMetrics, stateDirectory, null, time, producer);

    task.addRecords(partition1, Collections.singletonList(
        new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
    task.process();

    task.commit();
    assertTrue(producer.transactionInFlight());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldNotStartNewTransactionOnCommitIfEosDisabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        config, streamsMetrics, stateDirectory, null, time, producer);

    task.addRecords(partition1, Collections.singletonList(
        new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
    task.process();

    task.commit();
    assertFalse(producer.transactionInFlight());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        eosConfig, streamsMetrics, stateDirectory, null, time, producer);

    task.close(false);
    task = null;
    assertTrue(producer.transactionAborted());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        config, streamsMetrics, stateDirectory, null, time, producer);

    task.close(false);
    assertFalse(producer.transactionAborted());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldCloseProducerOnCloseWhenEosEnabled() throws Exception {
    final MockProducer producer = new MockProducer();

    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
        changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer);

    task.close(true);
    task = null;
    assertTrue(producer.closed());
}
Project: kafka-0.11.0.0-src-with-comment    File: MockClientSupplier.java
@Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
    if (applicationId != null) {
        assertThat((String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), startsWith(applicationId + "-"));
    } else {
        assertFalse(config.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
    }
    final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
    producers.add(producer);
    return producer;
}
Project: beam    File: KafkaIOTest.java
ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer,
                             int maxErrors,
                             int errorFrequency) {
  this.mockProducer = mockProducer;
  this.maxErrors = maxErrors;
  this.errorFrequency = errorFrequency;
  injectorThread = Executors.newSingleThreadExecutor();
}
Project: ache    File: KafkaTargetRepositoryTest.java
@Test
public void shouldSendDataToKafka() throws IOException {
    // given
    Page target = new Page(new URL(url), html, responseHeaders);
    target.setCrawlerId("mycrawler");
    target.setTargetRelevance(TargetRelevance.RELEVANT);
    String topicName = "ache-data-topic";

    StringSerializer ss = new StringSerializer();
    MockProducer<String, String> producer = new MockProducer<>(true, ss, ss);

    KafkaConfig.Format format = KafkaConfig.Format.JSON;

    KafkaTargetRepository repository = new KafkaTargetRepository(producer, topicName, format);

    // when
    repository.insert(target);
    repository.close();

    // then
    List<ProducerRecord<String, String>> history = producer.history();

    TargetModelJson page = mapper.readValue(history.get(0).value(), TargetModelJson.class);
    assertThat(page.getContentAsString(), is(html));
    assertThat(page.getUrl(), is(url));
    assertThat(page.getResponseHeaders().get("content-type").get(0), is("text/html"));
    assertThat(page.getRelevance().isRelevant(), is(TargetRelevance.RELEVANT.isRelevant()));
    assertThat(page.getRelevance().getRelevance(), is(TargetRelevance.RELEVANT.getRelevance()));
    assertThat(page.getCrawlerId(), is("mycrawler"));
}
Project: ache    File: KafkaTargetRepositoryTest.java
@Test
public void shouldSendDataToKafkaUsingCDR31() throws IOException {
    // given
    Page target = new Page(new URL(url), html, responseHeaders);
    target.setCrawlerId("mycrawler");
    target.setTargetRelevance(TargetRelevance.RELEVANT);
    String topicName = "ache-data-topic";

    StringSerializer ss = new StringSerializer();
    MockProducer<String, String> producer = new MockProducer<>(true, ss, ss);

    KafkaConfig.Format format = KafkaConfig.Format.CDR31;

    KafkaTargetRepository repository = new KafkaTargetRepository(producer, topicName, format);

    // when
    repository.insert(target);
    repository.close();

    // then
    List<ProducerRecord<String, String>> history = producer.history();

    CDR31Document page = mapper.readValue(history.get(0).value(), CDR31Document.class);
    assertThat(page.getRawContent(), is(html));
    assertThat(page.getUrl(), is(url));
    assertThat(page.getResponseHeaders().get("content-type"), is("text/html"));
    assertThat(page.getCrawler(), is("mycrawler"));
}
Project: Lagerta    File: KafkaMockFactory.java
@Override
public <K, V> MockProducer<K, V> producer(Properties properties) {
    return createProducer();
}
Project: Lagerta    File: KafkaMockFactory.java
@Override public <K, V> MockProducer<K, V> producer(Properties properties) {
    return createProducer();
}
Project: kafka-0.11.0.0-src-with-comment    File: ProcessorTopologyTestDriver.java
/**
 * Create a new test driver instance.
 * @param config the stream configuration for the topology
 * @param builder the topology builder that will be used to create the topology instance
 */
public ProcessorTopologyTestDriver(final StreamsConfig config,
                                   final TopologyBuilder builder) {
    topology = builder.setApplicationId(APPLICATION_ID).build(null);
    final ProcessorTopology globalTopology = builder.buildGlobalStateTopology();

    // Set up the consumer and producer ...
    final Consumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
    producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
        @Override
        public List<PartitionInfo> partitionsFor(final String topic) {
            return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null));
        }
    };

    // Identify internal topics for forwarding in process ...
    for (final TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
        internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
    }

    // Set up all of the topic+partition information and subscribe the consumer to each ...
    for (final String topic : topology.sourceTopics()) {
        final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
        partitionsByTopic.put(topic, tp);
        offsetsByTopicPartition.put(tp, new AtomicLong());
    }

    consumer.assign(offsetsByTopicPartition.keySet());

    final StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), Time.SYSTEM);
    final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
    final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, streamsMetrics);

    if (globalTopology != null) {
        final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer();
        for (final String topicName : globalTopology.sourceTopics()) {
            final List<PartitionInfo> partitionInfos = new ArrayList<>();
            partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null));
            globalConsumer.updatePartitions(topicName, partitionInfos);
            final TopicPartition partition = new TopicPartition(topicName, 1);
            globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
            globalPartitionsByTopic.put(topicName, partition);
            offsetsByTopicPartition.put(partition, new AtomicLong());
        }
        final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
        globalStateTask = new GlobalStateUpdateTask(globalTopology,
                                                    new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
                                                    stateManager
        );
        globalStateTask.initialize();
    }

    if (!partitionsByTopic.isEmpty()) {
        task = new StreamTask(TASK_ID,
                              APPLICATION_ID,
                              partitionsByTopic.values(),
                              topology,
                              consumer,
                              new StoreChangelogReader(
                                  createRestoreConsumer(topology.storeToChangelogTopic()),
                                  Time.SYSTEM,
                                  5000),
                              config,
                              streamsMetrics, stateDirectory,
                              cache,
                              new MockTime(),
                              producer);
    }
}
Project: trellis-rosid    File: AbstractResourceServiceTest.java
public MyResourceService(final CuratorFramework curator, final EventService eventService,
        final InterProcessLock lock) {
    super(baseUrl, new MockProducer<>(true, new StringSerializer(), new StringSerializer()), curator,
            eventService, mockIdSupplier, false);
    this.lock = lock;
}
Project: trellis-rosid    File: AbstractResourceServiceTest.java
public MyResourceService(final String baseUrl, final String connectString) {
    super(baseUrl, new MockProducer<>(true, new StringSerializer(), new StringSerializer()),
            getZkClient(connectString), null, null, false);
}
Project: kafka-reactive-streams    File: KafkaSubscriberBlackboxTest.java
public KafkaSubscriberBlackboxTest() {
    super(new TestEnvironment());
    mockProducer = new MockProducer<Long, Long>(true, new LongSerializer(), new LongSerializer());
}
Project: kafka-reactive-streams    File: KafkaSubscriberWhiteboxTest.java
public KafkaSubscriberWhiteboxTest() {
    super(new TestEnvironment());
    mockProducer = new MockProducer<Long, Long>(true, new LongSerializer(), new LongSerializer());
}
Project: light-eventuate-4j    File: EventuateKafkaProducerTest.java
@BeforeClass
public static void setUp() {
    eventuateKafkaProducer = new EventuateKafkaProducer();
    eventuateKafkaProducer.setProducer(new MockProducer(true, null, null, null));
    event = new PublishedEvent("eventId", "entityId", "entityType", "eventJson", "eventType", null, Optional.of("metadata"));
}
Project: beam    File: KafkaIOTest.java
ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer) {
  // complete everything successfully
  this(mockProducer, 0, 0);
}
Project: core-ng-project    File: LogForwarderTest.java
@BeforeEach
void createLogForwarder() {
    logForwarder = new LogForwarder("url", "app", new MockProducer<>());
}