一尘不染

ElasticSearch Nest。术语聚合及其迭代的更好代码

elasticsearch

我想获取给定期间内唯一数字用户ID的列表。

假设字段为userId,时间字段为startTime,我成功获得如下结果;

HashSet<int> hashUserIdList= new HashSet<int>(); // guarantees to store unique userIds.

// Step 1. get unique number of userIds
var total = client.Search<Log>(s => s
    .Query(q => q
        .DateRange(c => c.Field(p => p.startTime)
        .GreaterThan(FixedDate)))
        .Aggregations(a => a
            .Cardinality("userId_cardinality", c => c
                .Field("userId"))))
    .Aggs.Cardinality("userId_cardinality");

int totalCount = (int)total.Value;

// Step 2. get unique userId values by Terms aggregation.
var response = client.Search<Log>(s => s
    .Source(source => source.Includes(inc => inc.Field("userId")))
    .Query(q => q
        .DateRange(c => c.Field(p => p.startTime)
        .GreaterThan(FixedDate)))
    .Aggregations(a => a
        .Terms("userId_terms", c => c
            .Field("userId").Size(totalCount))))
    .Aggs.Terms("userId_terms");

// Step 3. store unique userIds to HashSet.
foreach (var element in response.Buckets)
{
    hashUserIdList.Add(int.Parse(element.Key));
}

可以工作,
但效率不高,因为(1)totalCount首先获取,并且(2)它定义Size(totalCount)由于存储桶溢出(如果结果有成千上万个),可能导致500个服务器错误。

以某种foreach方式进行迭代会很好,但是我无法使它们按大小迭代100。我在这里放了From/ SizeSkip/
Take,但是返回值不可靠。

如何正确编码?


阅读 406

收藏
2020-06-22

共1个答案

一尘不染

对于某些集合,此方法可能是可行的,但需要注意以下几点:

  1. 基数聚合使用HyperLogLog ++算法来 近似 基数。对于低基数字段,此近似值可能完全准确,而对于高基数字段,则近似值不那么准确。
  2. 术语对于 许多 术语而言,聚合可能在计算上很昂贵,因为每个存储桶都需要构建在内存中,然后序列化以响应。

您可能可以跳过基数汇总来获取大小,而只需将其int.MaxValue作为术语汇总的大小即可。在速度方面效率较低的另一种方法是滚动浏览范围内的所有文档,使用源过滤器仅返回您感兴趣的字段。我希望使用Scroll方法可以减轻群集的压力,但我建议您监视您采用的任何方法。

这是对Stack Overflow数据集(2016年6月,IIRC)上这两种方法的比较,研究了两年前的今天和一年前的今天的独特提问者。

术语汇总

void Main()
{
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

    var connectionSettings = new ConnectionSettings(pool)
        .MapDefaultTypeIndices(d => d
            .Add(typeof(Question), NDC.StackOverflowIndex)
        );


    var client = new ElasticClient(connectionSettings);

    var twoYearsAgo = DateTime.UtcNow.Date.AddYears(-2);
    var yearAgo = DateTime.UtcNow.Date.AddYears(-1);

    var searchResponse = client.Search<Question>(s => s
        .Size(0)
        .Query(q => q
            .DateRange(c => c.Field(p => p.CreationDate)
                .GreaterThan(twoYearsAgo)
                .LessThan(yearAgo)
            )
        )
        .Aggregations(a => a
            .Terms("unique_users", c => c
                .Field(f => f.OwnerUserId)
                .Size(int.MaxValue)
            )
        )
    );

    var uniqueOwnerUserIds = searchResponse.Aggs.Terms("unique_users").Buckets.Select(b => b.KeyAsString).ToList();

    // 3.83 seconds
    // unique question askers: 795352
    Console.WriteLine($"unique question askers: {uniqueOwnerUserIds.Count}");
}

滚动API

void Main()
{
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

    var connectionSettings = new ConnectionSettings(pool)
        .MapDefaultTypeIndices(d => d
            .Add(typeof(Question), NDC.StackOverflowIndex)
        );

    var client = new ElasticClient(connectionSettings);
    var uniqueOwnerUserIds = new HashSet<int>();

    var twoYearsAgo = DateTime.UtcNow.Date.AddYears(-2);
    var yearAgo = DateTime.UtcNow.Date.AddYears(-1);

    var searchResponse = client.Search<Question>(s => s
        .Source(sf => sf
            .Include(ff => ff
                .Field(f => f.OwnerUserId)
            )
        )
        .Size(10000)
        .Scroll("1m")
        .Query(q => q
            .DateRange(c => c
                .Field(p => p.CreationDate)
                .GreaterThan(twoYearsAgo)
                .LessThan(yearAgo)
            )
        )
    );

    while (searchResponse.Documents.Any())
    {
        foreach (var document in searchResponse.Documents)
        {
            if (document.OwnerUserId.HasValue)
                uniqueOwnerUserIds.Add(document.OwnerUserId.Value);
        }

        searchResponse = client.Scroll<Question>("1m", searchResponse.ScrollId);
    }

    client.ClearScroll(c => c.ScrollId(searchResponse.ScrollId));

    // 91.8 seconds
    // unique question askers: 795352
    Console.WriteLine($"unique question askers: {uniqueOwnerUserIds.Count}");
}

术语汇总比Scroll API方法快24倍。

2020-06-22