一尘不染

Spring Kafka Producer不发送到Kafka 1.0.0(Magic v1不支持记录头)

spring-boot

我正在使用此docker-
compose设置在本地设置Kafka:https :
//github.com/wurstmeister/kafka-
docker/

docker-compose up 工作正常,通过shell创建主题工作正常。

现在我尝试通过以下方式连接到Kafka spring-kafka:2.1.0.RELEASE

启动Spring应用程序时,它会打印正确版本的Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

我尝试发送这样的消息

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

在客户端发送失败

UnknownServerException: The server experienced an unexpected error when processing the request

在服务器控制台中,我收到消息 Magic v1不支持记录头

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

谷歌搜索表明版本冲突,但该版本似乎合适(org.apache.kafka:kafka-clients:1.0.0在类路径中)。

有什么线索吗?谢谢!

编辑:我缩小了问题的根源。发送纯字符串是可行的,但是通过JsonSerializer发送Json会导致给定的问题。这是我的生产者配置的内容:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())

阅读 1598

收藏
2020-05-30

共1个答案

一尘不染

解决了。问题既不是代理,也不是某些docker缓存,也不是Spring应用程序。

问题是控制台使用者,我并行使用它进行调试。这是一个“老”消费者kafka-console-consumer.sh --topic=topic --zookeeper=...

它实际上在启动时会显示警告: Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

--bootstrap-server应该使用带有选项的“新”使用者(尤其是在将Kafka
1.0与JsonSerializer一起使用时)。注意:在这里使用老消费者确实会影响生产者。

2020-05-30