我们有一个关系数据库,其中包含有关我们日常运营的数据。目的是允许用户使用全文本搜索引擎来搜索重要数据。数据经过规范化,因此不是进行全文查询的最佳形式,因此,其想法是对数据的一部分进行规范化,然后将其实时复制到Elasticsearch,这使我们能够创建快速而准确的搜索应用程序。
我们已经有了一个启用数据库操作(插入,更新,删除)事件搜索的系统。这些事件仅包含已更改的列和主键(在更新中,我们没有得到整个行)。Logstash已经为每个事件得到通知,因此这部分已经得到处理。
现在我们要解决我们的问题。由于计划是对数据进行非规范化,因此我们必须确保将父对象的更新传播到Elasticsearch中的非规范化子对象。我们如何配置logstash来做到这一点?
假设我们Employees在Elasticsearch中维护一个列表。每个Employee都分配给一个Company。由于数据是非规范化的(出于快速搜索的目的),每个数据Employee还带有的名称和地址Company。更新会更改a的名称Company- 我们如何配置logstash来更新Employees分配给的所有公司名称Company?
Employees
Employee
Company
@Darth_Vader:我们面临的问题是,我们得到一个事件a Company发生了变化,但是我们希望Employee在Elasticsearch中修改类型的文档,因为它们本身携带有关公司的数据。您的回答期望我们会为每一个都发生一个事件Employee,事实并非如此。
也许这将使其更加清晰。我们在Elasticsearch中有3名员工:
{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company A'} {type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company A'} {type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}
然后在源数据库中发生更新。
UPDATE company SET name = 'Company NEW' WHERE cmp_id = 1;
我们在logstash中得到一个事件,它表示如下内容:
{type:'company',cmp_id:'1',old.name:'Company A',new.name:'Company NEW'}
然后,应将其传播到Elasticsearch,以使最终的雇员为:
{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company NEW'} {type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company NEW'} {type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}
请注意,该字段已 company.name 更改。
company.name
对于我在这里发布的内容,我建议采用类似的解决方案,即使用http输出插件以便通过查询对Employee索引的查询来发布更新。查询将如下所示:
http
POST employees/_update_by_query { "script": { "source": "ctx._source.company.name = params.name", "lang": "painless", "params": { "name": "Company NEW" } }, "query": { "term": { "company.cmp_id": "1" } } }
因此,您的Logstash配置应如下所示:
input { ... } filter { mutate { add_field => { "[script][lang]" => "painless" "[script][source]" => "ctx._source.company.name = params.name" "[script][params][name]" => "%{new.name}" "[query][term][company.cmp_id]" => "%{cmp_id}" } remove_field => ["host", "@version", "@timestamp", "type", "cmp_id", "old.name", "new.name"] } } output { http { url => "http://localhost:9200/employees/_update_by_query" http_method => "post" format => "json" } }