/**
 * Builds a conversion service that maps {@code Object} to {@code byte[]} with a
 * {@link SerializingConverter} and {@code byte[]} back to {@code Object} with a
 * {@link DeserializingConverter}.
 *
 * <p>The byte-to-object direction is lenient: a {@link SerializationFailedException}
 * is logged and the conversion yields {@code null} instead of propagating.
 *
 * @return the configured conversion service
 */
private static GenericConversionService safeConversionService() {
    final DeserializingConverter deserializer = new DeserializingConverter();
    final GenericConversionService service = new GenericConversionService();

    // Object -> byte[]: plain serialization, failures propagate to the caller.
    service.addConverter(Object.class, byte[].class, new SerializingConverter());

    // byte[] -> Object: failures are logged and mapped to null.
    service.addConverter(byte[].class, Object.class, (byte[] payload) -> {
        Object restored = null;
        try {
            restored = deserializer.convert(payload);
        } catch (SerializationFailedException e) {
            LOG.error("Could not extract attribute: {}", e.getMessage());
        }
        return restored;
    });

    return service;
}
/**
 * Reads the records currently available on the Kafka topic named {@code txId}
 * and deserializes each record value into an {@link EntityCommand}.
 *
 * <p>A fresh consumer with a random group id is created for every call and is
 * closed before returning, including when subscribing, polling or iterating
 * throws. Records whose value cannot be deserialized are logged and skipped.
 *
 * @param txId the transaction id; used as the name of the topic to subscribe to
 * @return the commands that could be deserialized from a single poll, possibly empty
 */
@SuppressWarnings("unchecked")
@Override
public List<EntityCommand<?>> fetch(String txId) {
    List<EntityCommand<?>> transactionOperations = new ArrayList<>();

    // Work on a copy: the random group id is specific to this call and must not
    // be written back into the shared configuration map.
    Map<String, Object> consumerConfigs = new java.util.HashMap<>(
            (Map<String, Object>) configuration.get("kafkaConsumerConfiguration"));
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

    // try-with-resources guarantees the consumer is closed even if a call below throws.
    try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfigs)) {
        kafkaConsumer.subscribe(Arrays.asList(txId));
        ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConsumerPollTimeout);
        for (ConsumerRecord<String, String> record : records) {
            LOG.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
            try {
                transactionOperations.add(serializer.readFromString(record.value()));
            } catch (SerializationFailedException e) {
                // Best effort: one unreadable record must not abort the whole fetch.
                LOG.error("Unable to deserialize [{}] because of: {}", record.value(), e.getMessage());
            }
        }
    }
    return transactionOperations;
}
/**
 * Serializes the given command to a string using {@code jacksonMapper}.
 *
 * @param object the command to serialize
 * @return the string produced by {@code jacksonMapper.writeValueAsString}
 * @throws SerializationFailedException wrapping the {@link JsonProcessingException}
 *         raised by the mapper
 */
@Override
public String writeToString(EntityCommand<T> object) throws SerializationFailedException {
    final String json;
    try {
        json = jacksonMapper.writeValueAsString(object);
    } catch (JsonProcessingException cause) {
        throw new SerializationFailedException("Error performing EntityCommand serialization", cause);
    }
    return json;
}
/**
 * Converting a plain {@code Object} must fail with a
 * {@link SerializationFailedException} whose cause is an
 * {@link IllegalArgumentException}.
 */
@Test
public void nonSerializableObject() {
    SerializingConverter toBytes = new SerializingConverter();
    try {
        toBytes.convert(new Object());
        // The failure message names the exception that is actually caught below.
        fail("Expected SerializationFailedException caused by IllegalArgumentException");
    } catch (SerializationFailedException e) {
        assertNotNull(e.getCause());
        assertTrue(e.getCause() instanceof IllegalArgumentException);
    }
}
/**
 * Converting an {@code UnSerializable} instance must fail with a
 * {@link SerializationFailedException} whose cause is a
 * {@link NotSerializableException}.
 */
@Test
public void nonSerializableField() {
    SerializingConverter toBytes = new SerializingConverter();
    try {
        toBytes.convert(new UnSerializable());
        // The failure message uses the real exception type name.
        fail("Expected SerializationFailedException caused by NotSerializableException");
    } catch (SerializationFailedException e) {
        assertNotNull(e.getCause());
        assertTrue(e.getCause() instanceof NotSerializableException);
    }
}
/**
 * Serializes the command with {@link #writeToString} and encodes the result as
 * UTF-8 bytes.
 *
 * <p>The charset is fixed rather than taken from the platform default so that
 * bytes written on one JVM can be read on another; {@link #read} decodes with
 * the same charset.
 *
 * @param object the command to serialize
 * @return the UTF-8 encoding of the string form
 * @throws SerializationFailedException if {@code writeToString} fails
 */
@Override
public byte[] write(EntityCommand<T> object) throws SerializationFailedException {
    return writeToString(object).getBytes(java.nio.charset.StandardCharsets.UTF_8);
}

/**
 * Decodes the bytes as UTF-8 and delegates to {@link #readFromString}.
 *
 * @param bytes the UTF-8 encoded string form, as produced by {@link #write}
 * @return the command returned by {@code readFromString}
 * @throws SerializationFailedException if {@code readFromString} fails
 */
@Override
public EntityCommand<T> read(byte[] bytes) throws SerializationFailedException {
    return readFromString(new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
}
/**
 * Reads an {@link EntityCommand} from its string form. Declared abstract here,
 * so each concrete subclass supplies the implementation.
 *
 * @param chars the string to read the command from
 * @return the command read from {@code chars}
 * @throws SerializationFailedException declared for failures while reading
 */
@Override public abstract EntityCommand<T> readFromString(String chars) throws SerializationFailedException;
/**
 * Feeding bytes that are not a serialized object to a
 * {@link DeserializingConverter} must raise {@link SerializationFailedException}.
 */
@Test(expected = SerializationFailedException.class)
public void deserializationFailure() {
    final DeserializingConverter converter = new DeserializingConverter();
    final byte[] garbage = "Junk".getBytes();
    converter.convert(garbage);
}
/**
 * Writes the given object as a byte array.
 *
 * @param object the object to write
 * @return the byte form of {@code object}
 * @throws SerializationFailedException declared for failures while writing
 */
byte[] write(T object) throws SerializationFailedException;
/**
 * Writes the given object as a string.
 *
 * @param object the object to write
 * @return the string form of {@code object}
 * @throws SerializationFailedException declared for failures while writing
 */
String writeToString(T object) throws SerializationFailedException;
/**
 * Reads an object from a byte array.
 *
 * @param bytes the bytes to read from
 * @return the object read from {@code bytes}
 * @throws SerializationFailedException declared for failures while reading
 */
T read(byte[] bytes) throws SerializationFailedException;
/**
 * Reads an object from a string.
 *
 * @param chars the string to read from
 * @return the object read from {@code chars}
 * @throws SerializationFailedException declared for failures while reading
 */
T readFromString(String chars) throws SerializationFailedException;