Java 类com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput 实例源码

项目:aws-kinesis-zombies    文件:ZombieRecordProcessor.java   
@SneakyThrows
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    List<Record> records = processRecordsInput.getRecords();
    // Used to update the last processed record
    IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();
    log.info("Recovering records from kinesis.");
    for (Record r : records) {
        try {
            int len = r.getData().remaining();
            byte[] buffer = new byte[len];
            r.getData().get(buffer);
            String json = new String(buffer, "UTF-8");
            ZombieLecture lecture = mapper.readValue(json, ZombieLecture.class);
            this.processZombieLecture(lecture);
            log.debug(processedRecords++ + ": " + json);
            if (processedRecords % 1000 == 999) {
                // Uncomment next line to keep track of the processed lectures. 
                checkpointer.checkpoint();
            }
        } catch (UnsupportedEncodingException | MessagingException ex) {
            log.warn(ex.getMessage());
        }
    }
}
项目:samza    文件:TestKinesisRecordProcessor.java   
static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
    List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
  Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
  processors.forEach(processor -> {
      try {
        // Create records and call process records
        IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
        doNothing().when(checkpointer).checkpoint(anyString());
        doNothing().when(checkpointer).checkpoint();
        ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
        when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
        when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);
        List<Record> inputRecords = createRecords(numRecordsPerShard);
        processorRecordMap.put(processor, inputRecords);
        when(processRecordsInput.getRecords()).thenReturn(inputRecords);
        processor.processRecords(processRecordsInput);
      } catch (ShutdownException | InvalidStateException ex) {
        throw new RuntimeException(ex);
      }
    });
  return processorRecordMap;
}
项目:zipkin-aws    文件:KinesisSpanProcessorTest.java   
@Test
public void collectorFailsWhenRecordEncodedAsSingleSpan() {
  Span span = TestObjects.LOTS_OF_SPANS[0];
  byte[] encodedSpan = Codec.THRIFT.writeSpan(span);
  Record kinesisRecord = new Record().withData(ByteBuffer.wrap(encodedSpan));
  ProcessRecordsInput kinesisInput = new ProcessRecordsInput().withRecords(Collections.singletonList(kinesisRecord));

  kinesisSpanProcessor.processRecords(kinesisInput);

  assertThat(storage.spanStore().getTraces().size()).isEqualTo(0);

  assertThat(metrics.messagesDropped()).isEqualTo(1);
  assertThat(metrics.bytes()).isEqualTo(encodedSpan.length);
}
项目:zipkin-aws    文件:KinesisSpanProcessorTest.java   
private ProcessRecordsInput createTestData(int count) {
  List<Record> records = new ArrayList<>();

  Span[] spans = Arrays.copyOfRange(TestObjects.LOTS_OF_SPANS, 0, count);

  Arrays.stream(spans)
      .map(s -> ByteBuffer.wrap(Codec.THRIFT.writeSpans(Collections.singletonList(s))))
      .map(b -> new Record().withData(b))
      .forEach(records::add);

  return new ProcessRecordsInput().withRecords(records);
}
项目:lumber-mill    文件:RecordProcessor.java   
/**
 * {@inheritDoc}
 */
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
   try {
       List<Record> records = processRecordsInput.getRecords();
       Thread.currentThread().setName(kinesisShardId);
       int bytes = calculateSize(records);

       LOG.debug("Got {} records ({} bytes) and is behind latest with {}",
               records.size(), bytes, printTextBehindLatest(processRecordsInput));

       metricsCallback.shardBehindMs (kinesisShardId, processRecordsInput.getMillisBehindLatest());

       Observable observable = Observable.create(subscriber -> {
           try {
               for (Record record : records) {
                   subscriber.onNext(Codecs.BYTES.from(record.getData().array())
                           .put("_shardId", kinesisShardId));
               }
               subscriber.onCompleted();
               metricsCallback.recordsProcessed (kinesisShardId, records.size());
               metricsCallback.bytesProcessed (kinesisShardId,bytes);
           } catch (RuntimeException e) {
               subscriber.onError(e);
           }
       });

       unitOfWorkListener.apply(observable).toBlocking().subscribe();
       transaction.checkpoint(processRecordsInput.getCheckpointer());
   } catch (RuntimeException t) {
       doOnError(t);
   }
}
项目:samza    文件:KinesisRecordProcessor.java   
/**
 * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
 * application. Upon fail over, the new instance will get records with sequence number greater than the checkpoint
 * position for each partition key.
 *
 * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
 *        to them (eg checkpointing).
 */
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
  // KCL does not send any records to the processor that was shutdown.
  Validate.isTrue(!shutdownRequested,
      String.format("KCL returned records after shutdown is called on the processor %s.", this));
  // KCL aways gives reference to the same checkpointer instance for a given processor instance.
  checkpointer = processRecordsInput.getCheckpointer();
  List<Record> records = processRecordsInput.getRecords();
  // Empty records are expected when KCL config has CallProcessRecordsEvenForEmptyRecordList set to true.
  if (!records.isEmpty()) {
    lastProcessedRecordSeqNumber = new ExtendedSequenceNumber(records.get(records.size() - 1).getSequenceNumber());
    listener.onReceiveRecords(ssp, records, processRecordsInput.getMillisBehindLatest());
  }
}
项目:zipkin-aws    文件:KinesisSpanProcessor.java   
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
  for (Record record : processRecordsInput.getRecords()) {
    collector.acceptSpans(record.getData().array(), DETECTING_DECODER, Callback.NOOP);
  }
}
项目:lumber-mill    文件:RecordProcessor.java   
private String printTextBehindLatest(ProcessRecordsInput processRecordsInput) {
    return processRecordsInput.getMillisBehindLatest() < 60000
            ? String.format("%s secs", TimeUnit.MILLISECONDS.toSeconds(processRecordsInput.getMillisBehindLatest()))
            : String.format("%s min", TimeUnit.MILLISECONDS.toMinutes(processRecordsInput.getMillisBehindLatest()));
}
项目:datacollector    文件:StreamSetsRecordProcessor.java   
/**
 * {@inheritDoc}
 */
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
  LOG.debug("RecordProcessor processRecords called");

  IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();

  startBatch();
  Optional<Record> lastProcessedRecord = Optional.empty();
  int recordCount = 0;
    for (Record kRecord : processRecordsInput.getRecords()) {
      try {
        KinesisUtil.processKinesisRecord(shardId, kRecord, parserFactory).forEach(batchMaker::addRecord);
        lastProcessedRecord = Optional.of(kRecord);
        if (++recordCount == maxBatchSize) {
          recordCount = 0;
          finishBatch(checkpointer, kRecord);
          startBatch();
        }
      } catch (DataParserException | IOException e) {
        com.streamsets.pipeline.api.Record record = context.createRecord(kRecord.getSequenceNumber());
        record.set(Field.create(kRecord.getData().array()));
        try {
          errorRecordHandler.onError(new OnRecordErrorException(
              record,
              Errors.KINESIS_03,
              kRecord.getSequenceNumber(),
              e.toString(),
              e
          ));
          // move the lastProcessedRecord forward if not set to stop pipeline
          lastProcessedRecord = Optional.of(kRecord);
        } catch (StageException ex) {
          // KCL skips over the data records that were passed prior to the exception
          // that is, these records are not re-sent to this record processor
          // or to any other record processor in the consumer.
          lastProcessedRecord.ifPresent(r -> finishBatch(checkpointer, r));
          try {
            error.put(ex);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
          }
          return;
        }
      }
    }
  lastProcessedRecord.ifPresent(r -> finishBatch(checkpointer, r));
}