@SneakyThrows @Override public void processRecords(ProcessRecordsInput processRecordsInput) { List<Record> records = processRecordsInput.getRecords(); // Used to update the last processed record IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer(); log.info("Recovering records from kinesis."); for (Record r : records) { try { int len = r.getData().remaining(); byte[] buffer = new byte[len]; r.getData().get(buffer); String json = new String(buffer, "UTF-8"); ZombieLecture lecture = mapper.readValue(json, ZombieLecture.class); this.processZombieLecture(lecture); log.debug(processedRecords++ + ": " + json); if (processedRecords % 1000 == 999) { // Uncomment next line to keep track of the processed lectures. checkpointer.checkpoint(); } } catch (UnsupportedEncodingException | MessagingException ex) { log.warn(ex.getMessage()); } } }
static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard, List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException { Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>(); processors.forEach(processor -> { try { // Create records and call process records IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class); doNothing().when(checkpointer).checkpoint(anyString()); doNothing().when(checkpointer).checkpoint(); ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class); when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer); when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L); List<Record> inputRecords = createRecords(numRecordsPerShard); processorRecordMap.put(processor, inputRecords); when(processRecordsInput.getRecords()).thenReturn(inputRecords); processor.processRecords(processRecordsInput); } catch (ShutdownException | InvalidStateException ex) { throw new RuntimeException(ex); } }); return processorRecordMap; }
/**
 * A Kinesis record whose payload is one Thrift-encoded span (via {@code writeSpan}) must be
 * dropped: nothing reaches span storage, exactly one message is counted as dropped, and the
 * payload's byte count is still recorded in the metrics.
 */
@Test
public void collectorFailsWhenRecordEncodedAsSingleSpan() {
  byte[] encodedSpan = Codec.THRIFT.writeSpan(TestObjects.LOTS_OF_SPANS[0]);
  ProcessRecordsInput kinesisInput = new ProcessRecordsInput().withRecords(
      Collections.singletonList(new Record().withData(ByteBuffer.wrap(encodedSpan))));

  kinesisSpanProcessor.processRecords(kinesisInput);

  assertThat(storage.spanStore().getTraces().size()).isEqualTo(0);
  assertThat(metrics.messagesDropped()).isEqualTo(1);
  assertThat(metrics.bytes()).isEqualTo(encodedSpan.length);
}
/**
 * Builds a {@link ProcessRecordsInput} containing one Kinesis record per span for the first
 * {@code count} spans of {@code TestObjects.LOTS_OF_SPANS}. Each record's payload is the
 * Thrift encoding of a single-element span list.
 *
 * @param count number of spans (and records) to include; must be in
 *     {@code [0, TestObjects.LOTS_OF_SPANS.length]}
 * @return an input whose records are in the same order as the source spans
 * @throws IllegalArgumentException if {@code count} is negative
 * @throws ArrayIndexOutOfBoundsException if {@code count} exceeds the fixture size (fails fast
 *     instead of silently encoding {@code null} spans)
 */
private ProcessRecordsInput createTestData(int count) {
  List<Record> records = new ArrayList<>(count);
  for (int i = 0; i < count; i++) {
    byte[] payload = Codec.THRIFT.writeSpans(Collections.singletonList(TestObjects.LOTS_OF_SPANS[i]));
    records.add(new Record().withData(ByteBuffer.wrap(payload)));
  }
  return new ProcessRecordsInput().withRecords(records);
}
/**
 * {@inheritDoc}
 *
 * <p>Names the current thread after the shard, logs and reports the batch lag, then wraps the
 * batch in an {@code Observable} that emits one item per record (built with
 * {@code Codecs.BYTES.from} and tagged with {@code "_shardId"}). That observable is passed
 * through {@code unitOfWorkListener} and consumed blocking; afterwards the checkpointer is
 * handed to {@code transaction.checkpoint}. Any {@link RuntimeException} is routed to
 * {@code doOnError}.
 */
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
  try {
    List<Record> batch = processRecordsInput.getRecords();
    Thread.currentThread().setName(kinesisShardId);
    int batchBytes = calculateSize(batch);
    LOG.debug("Got {} records ({} bytes) and is behind latest with {}",
        batch.size(), batchBytes, printTextBehindLatest(processRecordsInput));
    metricsCallback.shardBehindMs(kinesisShardId, processRecordsInput.getMillisBehindLatest());

    // NOTE(review): raw Observable type; element type depends on Codecs.BYTES.from(...).put(...).
    Observable emitter = Observable.create(subscriber -> {
      try {
        // NOTE(review): array() uses the whole backing array, ignoring position/arrayOffset.
        batch.forEach(record -> subscriber.onNext(
            Codecs.BYTES.from(record.getData().array()).put("_shardId", kinesisShardId)));
        subscriber.onCompleted();
        // Throughput metrics are reported only once every record has been emitted.
        metricsCallback.recordsProcessed(kinesisShardId, batch.size());
        metricsCallback.bytesProcessed(kinesisShardId, batchBytes);
      } catch (RuntimeException e) {
        subscriber.onError(e);
      }
    });
    unitOfWorkListener.apply(emitter).toBlocking().subscribe();
    transaction.checkpoint(processRecordsInput.getCheckpointer());
  } catch (RuntimeException t) {
    doOnError(t);
  }
}
/** * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the * application. Upon fail over, the new instance will get records with sequence number greater than the checkpoint * position for each partition key. * * @param processRecordsInput Provides the records to be processed as well as information and capabilities related * to them (eg checkpointing). */ @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // KCL does not send any records to the processor that was shutdown. Validate.isTrue(!shutdownRequested, String.format("KCL returned records after shutdown is called on the processor %s.", this)); // KCL aways gives reference to the same checkpointer instance for a given processor instance. checkpointer = processRecordsInput.getCheckpointer(); List<Record> records = processRecordsInput.getRecords(); // Empty records are expected when KCL config has CallProcessRecordsEvenForEmptyRecordList set to true. if (!records.isEmpty()) { lastProcessedRecordSeqNumber = new ExtendedSequenceNumber(records.get(records.size() - 1).getSequenceNumber()); listener.onReceiveRecords(ssp, records, processRecordsInput.getMillisBehindLatest()); } }
/**
 * Hands the payload of every record in the batch to the span collector, decoded with
 * {@code DETECTING_DECODER} and acknowledged through a no-op callback.
 *
 * @param processRecordsInput the batch delivered by the KCL
 */
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
  for (Record record : processRecordsInput.getRecords()) {
    collector.acceptSpans(payloadBytes(record), DETECTING_DECODER, Callback.NOOP);
  }
}

/**
 * Returns the readable bytes (position to limit) of a record's payload without consuming the
 * record's buffer. The backing array is reused only when it is exactly the payload; otherwise
 * (sliced, offset, read-only or direct buffers) the readable region is copied.
 */
private static byte[] payloadBytes(Record record) {
  java.nio.ByteBuffer data = record.getData();
  if (data.hasArray() && data.arrayOffset() == 0 && data.position() == 0
      && data.limit() == data.array().length) {
    return data.array();
  }
  byte[] bytes = new byte[data.remaining()];
  // duplicate() has its own position, so the record's buffer is left untouched.
  data.duplicate().get(bytes);
  return bytes;
}
/**
 * Formats the consumer's lag behind the tip of the stream for log output.
 *
 * @param processRecordsInput batch whose {@code getMillisBehindLatest()} is reported
 * @return whole seconds (e.g. {@code "42 secs"}) below one minute, whole minutes
 *     (e.g. {@code "3 min"}) otherwise, or {@code "unknown"} when no lag value is present
 */
private String printTextBehindLatest(ProcessRecordsInput processRecordsInput) {
  // Read once. NOTE(review): KCL's getter presumably returns a boxed Long that may be null;
  // a log helper must not throw an auto-unboxing NPE.
  Long millisBehind = processRecordsInput.getMillisBehindLatest();
  if (millisBehind == null) {
    return "unknown";
  }
  return millisBehind < TimeUnit.MINUTES.toMillis(1)
      ? String.format("%s secs", TimeUnit.MILLISECONDS.toSeconds(millisBehind))
      : String.format("%s min", TimeUnit.MILLISECONDS.toMinutes(millisBehind));
}
/**
 * {@inheritDoc}
 *
 * <p>Parses every Kinesis record into {@code batchMaker}. After each {@code maxBatchSize}
 * successfully parsed records the current batch is passed to {@code finishBatch} together with
 * the checkpointer and that record, and a new batch is started. Records that fail to parse are
 * converted to error records (raw payload bytes, keyed by sequence number) and handed to
 * {@code errorRecordHandler}; they do not count toward {@code recordCount}. If the error handler
 * throws {@code StageException}, the batch is finished up to the last processed record, the
 * exception is handed to {@code error}, and the method returns early. Otherwise, after the loop,
 * the trailing batch is finished if any record was processed.
 *
 * <p>NOTE(review): when the last record lands exactly on a {@code maxBatchSize} boundary,
 * {@code startBatch()} has just been called and the final {@code ifPresent} calls
 * {@code finishBatch} again with the same record on an empty batch. Confirm that
 * {@code finishBatch} tolerates this.
 */
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
  LOG.debug("RecordProcessor processRecords called");
  IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();
  startBatch();
  // Last record that was either parsed or successfully routed to the error handler.
  Optional<Record> lastProcessedRecord = Optional.empty();
  // Successfully parsed records in the current batch.
  int recordCount = 0;
  for (Record kRecord : processRecordsInput.getRecords()) {
    try {
      KinesisUtil.processKinesisRecord(shardId, kRecord, parserFactory).forEach(batchMaker::addRecord);
      lastProcessedRecord = Optional.of(kRecord);
      if (++recordCount == maxBatchSize) {
        // Batch is full: hand it off with this record as the position, then open a new one.
        recordCount = 0;
        finishBatch(checkpointer, kRecord);
        startBatch();
      }
    } catch (DataParserException | IOException e) {
      // Wrap the unparseable payload in a pipeline error record.
      // NOTE(review): array() uses the whole backing array, ignoring position/arrayOffset.
      com.streamsets.pipeline.api.Record record = context.createRecord(kRecord.getSequenceNumber());
      record.set(Field.create(kRecord.getData().array()));
      try {
        errorRecordHandler.onError(new OnRecordErrorException(
            record,
            Errors.KINESIS_03,
            kRecord.getSequenceNumber(),
            e.toString(),
            e
        ));
        // The error handler did not stop the pipeline, so treat the bad record as processed
        // and move lastProcessedRecord past it.
        lastProcessedRecord = Optional.of(kRecord);
      } catch (StageException ex) {
        // KCL skips over the data records that were passed prior to the exception
        // that is, these records are not re-sent to this record processor
        // or to any other record processor in the consumer.
        // Finish the batch up to the last record that was handled before the failure.
        lastProcessedRecord.ifPresent(r -> finishBatch(checkpointer, r));
        try {
          // Report the failure; `put` may block (presumably a BlockingQueue — confirm).
          error.put(ex);
        } catch (InterruptedException ie) {
          // Restore the interrupt flag for the caller.
          Thread.currentThread().interrupt();
        }
        return;
      }
    }
  }
  // Finish whatever remains of the last (possibly partial) batch.
  lastProcessedRecord.ifPresent(r -> finishBatch(checkpointer, r));
}