从Confluent Platform 5.5版开始,Avro不再是唯一的架构。现在,Protobuf和JSON模式在Confluent Universe中作为一等公民得到支持。但是在继续说明如何在Kafka中使用Protobuf之前,让我们回答一个经常问到的问题:
为什么我们需要模式? 当应用程序通过pub-sub系统进行通信时,它们交换消息,并且通信中的所有参与者都需要理解并同意这些消息。此外,您想检测并阻止对消息格式的更改,这些更改会使某些参与者无法阅读消息。
这就是模式的用处-它代表了通信参与者之间的契约,就像API代表了服务与其使用者之间的契约一样。正如可以使用OpenAPI(Swagger)描述REST API一样,可以使用Avro,Protobuf或Avro模式描述Kafka中的消息。
模式通过以下方式描述数据的结构:
指定消息中的字段 指定每个字段的数据类型以及该字段是否为必填字段 此外,模式与Schema Registry一起可防止生产者发送有害消息-消费者无法解释的格式错误的数据。Schema Registry将检测生产者是否将要引入重大更改,并且可以将其配置为拒绝此类更改。重大更改的一个示例是从架构中删除必填字段。
Protobuf简介 与Apache Avro相似,Protobuf是一种序列化结构化数据的方法。消息格式在.proto文件中定义,您可以使用多种语言从中生成代码,包括Java,Python,C ++,C#,Go和Ruby。与Avro不同,Protobuf不会使用消息序列化架构。因此,为了反序列化消息,您需要使用方中的模式。
这是一个包含一种消息类型的Protobuf模式的示例:
syntax = "proto3"; package com.codingharbour.protobuf; message SimpleMessage { string content = 1; string date_time = 2; }
在第一行中,我们定义我们正在使用Protobuf版本3。我们的消息类型SimpleMessage定义了两个字符串字段:content和date_time。每个字段都分配有一个所谓的字段号,该字段号在消息类型中必须唯一。当邮件序列化为Protobuf二进制格式时,这些数字标识字段。Google建议对最常用的字段使用数字1到15,因为编码需要一个字节。
Protobuf支持常见的标量类型,例如字符串,int32,int64(长整数),double,bool等。有关Protobuf中所有标量类型的完整列表,请查阅Protobuf文档。
除了标量类型,还可以使用复杂的数据类型。在下面,我们看到两种模式,即订购和产品,其中订购可以包含零个,一个或多个产品:
message Order { int64 order_id = 1; int64 date_time = 2; repeated Product product = 3; } message Product { int32 product_id = 1; string name = 2; string description = 3; }
现在,让我们看看这些架构如何最终出现在架构注册表中。
架构注册表和Protobuf 架构注册表是一项服务,用于存储Kafka中使用的架构的版本历史记录。它还以不破坏生产者或消费者的方式支持模式的演变。直到最近,Schema Registry仅支持Avro模式,但是自Confluent Platform 5.5起,该支持已扩展到Protobuf和JSON模式。
如果您以前曾使用过Avro和Kafka,则本节将不会有任何惊喜。与Avro一样,Schema Registry为Protobuf提供了一个序列化器和反序列化器,分别称为KafkaProtobufSerializer和KafkaProtobufDeserializer。
该序列化程序的工作是在生产者将消息写入Kafka之前,将Java对象转换为Protobuf二进制格式。
序列化程序的另一项工作是检查模式注册表中是否存在Protobuf模式。如果不是,它将把架构写入Schema Registry,并将架构ID写入消息(在消息的开头)。然后,当Kafka记录到达使用者时,使用者将使用KafkaProtobufDeserializer根据消息中的模式ID从“模式注册表”中获取模式。提取架构后,KafkaProtobufDeserializer将使用它来反序列化消息。这样,使用者就无需预先知道架构就能使用来自Kafka的消息。
这就是为什么在生产者或使用者中使用KafkaProtobuf(De)Serializer时,我们需要提供Schema Registry的URL。
Java代码生成 好的,现在我们知道Protobuf架构的外观,并且知道它最终在Schema Registry中的状态。现在让我们看看如何使用Java中的Protobuf模式。
您需要的第一件事是一个protobuf-java库。在这些示例中,我正在使用maven,因此让我们添加maven依赖项:
<dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.12.2</version> </dependency> </dependencies>
接下来要做的是使用protoc编译器从中生成Java代码。原始文件。但是我们不会手动邀请编译器,我们将使用一个名为protoc-jar-maven-plugin的maven插件:
<plugin> <groupId>com.github.os72</groupId> <artifactId>protoc-jar-maven-plugin</artifactId> <version>3.11.4</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>run</goal> </goals> <configuration> <inputDirectories> <include>${project.basedir}/src/main/Protobuf</include> </inputDirectories> <outputTargets> <outputTarget> <type>java</type> <addSources>main</addSources> <outputDirectory> ${project.basedir}/target/generated-sources/protobuf </outputDirectory> </outputTarget> </outputTargets> </configuration> </execution> </executions> </plugin>
Protobuf类将在generate-sources阶段生成。该插件将在src / main / protobuf 文件夹中查找proto文件,并且所生成的代码将在target / generated-sources / protobuf文件夹中创建。
要在目标文件夹中生成类,请运行 mvn clean generate-sources
注意:此博客文章中的所有代码示例都可以在Coding Harbour的GitHub上找到。
好的,现在我们已经生成了我们的类,让我们使用新的Protobuf序列化器将其发送给Kafka。
运行本地Kafka群集 在开始之前,让我们使用Schema Registry启动本地Kafka集群,以便我们可以立即尝试我们的代码。我们将使用docker-compose运行集群。
没有docker-compose吗?检查: 如何安装docker-compose
我已经用一个Zookeeper,一个Kafka代理和Schema Registry准备了一个docker-compose文件。您可以从https://github.com/codingharbour/kafka-docker-compose上获取它
https://github.com/codingharbour/kafka-docker-compose
导航到single-node-avro-kafka文件夹并运行docker-compose up -d
现在可以使用本地Kafka群集了。通过运行docker-compose ps,我们可以看到Kafka代理在端口9092上可用,而模式注册表在端口8081上运行。请注意这一点,因为我们很快将需要它。
Writing a Protobuf Producer 随着Kafka集群的建立和运行,现在是时候创建一个Java生产者,它将我们的SimpleMessage发送到Kafka。首先,让我们为生产者准备配置:
Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class); properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); Producer<String, SimpleMessage> producer = new KafkaProducer<>(properties);
注意,我们使用KafkaProtobufSerializer作为值序列化器类。自5.5版以来,这是Confluent Platform中可用的新序列化程序。它的工作方式与KafkaAvroSerializer相似:发布消息时,它将使用Schema Registry检查模式是否可用。如果该架构尚未注册,它将把它写入Schema Registry,然后将消息发布到Kafka。为此,序列化程序需要Schema Registry的URL,在本例中为http:// localhost:8081。
接下来,我们使用从Protobuf模式生成的SimpleMessage类来准备KafkaRecord:
SimpleMessage simpleMessage = SimpleMessage.newBuilder() .setContent("Hello world") .setDateTime(Instant.now().toString()) .build(); ProducerRecord<String, SimpleMessage> record = new ProducerRecord<>("protobuf-topic", null, simpleMessage);
该记录将被写入名为protobuf-topic的主题。最后要做的是将记录写到Kafka:
producer.send(record); producer.flush(); producer.close();
通常,您不会调用flush()方法,但是由于此后我们的应用程序将停止,因此我们需要确保在此之前将消息写入Kafka。
编写Protobuf消费者 我们说过,由于有了Schema Registry,使用者不需要预先知道该架构就可以反序列化消息。但是,预先拥有可用的架构将使我们能够从中生成Java类,并在代码中使用该类。这有助于提高代码的可读性,并使代码强类型化。
这是操作方法。首先,您将生成一个Java类,如Java代码生成部分中所述。接下来,我们为Kafka使用者准备配置:
Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer-group"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
在这里,我们定义了代理URL,消费者的消费者组,并告诉消费者我们将处理偏移量提交。 接下来,我们为消息定义反序列化器:
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class); properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, SimpleMessage.class.getName());
我们使用字符串反序列化器作为键,但是对于值,我们使用新的KafkaProtobufDeserializer。对于Protobuf反序列化器,我们需要提供Schema Registry URL,就像上面对序列化器所做的一样。
最后一行是最重要的。它告诉反序列化器将记录值反序列化到哪个类。在我们的例子中,它是SimpleMessage类(我们使用Protobuf maven插件从Protobuf模式生成的类)。
现在我们准备创建我们的消费者并将其订阅protobuf-topic:
KafkaConsumer<String, SimpleMessage> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singleton("protobuf-topic"));
然后我们轮询Kafka以获取记录并将其打印到控制台:
while (true) { ConsumerRecords<String, SimpleMessage> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, SimpleMessage> record : records) { System.out.println("Message content: " + record.value().getContent()); System.out.println("Message time: " + record.value().getDateTime()); } consumer.commitAsync(); }
在这里,我们正在使用一批记录,只是将内容打印到控制台。
还记得我们将使用者配置为通过将ENABLE_AUTO_COMMIT_CONFIG设置为false来处理提交偏移量的情况吗?这就是我们在最后一行中所做的事情:只有在完全处理完当前记录组之后,我们才提交消费者偏移量。
这就是编写一个简单的Protobuf消费者所需的全部内容。现在让我们检查另一个变体。
原始Protobuf消费者 如果要在使用者中以通用方式处理消息而不从Protobuf模式生成Java类,该怎么办?好了,您可以使用Protobuf库中的DynamicMessage类的实例。DynamicMessage具有反射API,因此您可以浏览消息字段并读取其值。这是您的操作方法...
首先,让我们配置使用者。它的配置与前面的示例非常相似:
Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "generic-protobuf-consumer-group"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class); properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
唯一缺少的是SPECIFIC_PROTOBUF_VALUE_TYPE配置。由于我们希望以通用方式处理消息,因此我们不需要此配置。
现在,我们准备创建我们的使用者并将其订阅protobuf-topic主题,如上例所示:
while (true) { ConsumerRecords<String, DynamicMessage> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, DynamicMessage> record : records) { for (FieldDescriptor field : record.value().getAllFields().keySet()) { System.out.println(field.getName() + ": " + record.value().getField(field)); } } consumer.commitAsync(); }
在我们的使用者中未配置SPECIFIC_PROTOBUF_VALUE_TYPE的情况下,使用者将始终以记录的值返回DynamicMessage的实例。然后,我们使用DynamicMessage.getAllFields()方法获取FieldDescriptor的列表。一旦有了所有的描述符,我们就可以简单地遍历它们并打印每个字段的值。
原文链接:http://codingdict.com