我想将afro中的主题从kafka索引为elasticsearch格式,但是我的时间戳字段有问题,elasticsearch会将其识别为日期格式字段。
我对连接器使用了以下配置。
{ "name": "es-sink-barchart-10", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter.schema.registry.url": "http://localhost:8081", "connection.url": "http://localhost:9200", "type.name":"type.name=kafka-connect", "topics": "exchange_avro_01", "topic.index.map": "exchange_avro_01:exchange_barchart", "key.ignore": "true" } }
原始字段是bigint类型,我希望目标字段是具有Elasticsearch的任何有效格式的日期类型。我定义了一个动态模板,尝试通过以下方式解决它:
curl -XPUT "http://localhost:9200/_template/kafkaconnect/" -H 'Content-Type: application/json' -d' { "index_patterns": "exchange*", "settings": { "number_of_shards": 1, "number_of_replicas": 0 }, "mappings": { "kafka-connect": { "dynamic_templates": [ { "dates": { "match_mapping_type": "long", "match": "TIME", "mapping": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" } } } ] , "properties": { "CLOSE": { "type": "double" }, . . . } } } } }'
当我加载上述连接器时,没有任何东西索引到Elasticsearch。
有什么帮助吗?
如果您的来源是bigint,那么大概是一个时代。如果是一个纪元,那么这将不起作用:
"mapping": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }
因为您要告诉Elasticsearch日期格式为yyyy-MM-dd HH:mm:ss(不是)。
yyyy-MM-dd HH:mm:ss
因此,请尝试以下操作(暂时省略您的自定义映射;先使它工作,然后再添加回去):
{ "index_patterns": "exchange*", "settings": { "number_of_shards": 1, "number_of_replicas": 0 }, "mappings": { "kafka-connect": { "dynamic_templates": [ { "dates": { "match": "TIME", "mapping": { "type": "date" } } } ] } } }
另请参阅:https : //www.elastic.co/guide/zh- CN/elasticsearch/reference/current/dynamic-field-mapping.html#date- detection
没有东西被索引到elasticsearch。
检查Kafka Connect工作程序日志和Elasticsearch日志是否有任何错误。