一架梯子,一头程序猿,仰望星空!

java elasticsearch 桶聚合(bucket)


Elasticsearch桶聚合,目的就是数据分组,先将数据按指定的条件分成多个组,然后对每一个组进行统计。

不了解Elasticsearch桶聚合概念,可以先学习下Elasticsearch桶聚合教程

本章介绍java elasticsearch桶聚合的用法

例子

// 首先创建RestClient,后续章节通过RestClient对象进行参数配置。
RestClientBuilder restClientBuilder = RestClient.builder(
                new HttpHost("localhost", 9200, "http"), // 设置ES服务地址,支持多个
                new HttpHost("localhost", 9201, "http"));

// 创建RestHighLevelClient,请求都是通过RestHighLevelClient实例发出去的。
RestHighLevelClient client = new RestHighLevelClient(restClientBuilder);

// 创建SearchRequest对象, 设置索引名=order
SearchRequest searchRequest = new SearchRequest("order");
// 通过SearchSourceBuilder构建搜索参数
SearchSourceBuilder builder = new SearchSourceBuilder();
// 通过QueryBuilders构建ES查询条件,这里查询所有文档,复杂的查询语句设置请参考前面的章节。
builder.query(QueryBuilders.matchAllQuery());

// 创建Terms桶聚合条件
// terms聚合命名为: by_shop, 分组字段为: shop_id
TermsAggregationBuilder byShop = AggregationBuilders.terms("by_shop")
                .field("shop_id");

// 设置聚合条件
builder.aggregation(byShop);

// 设置搜索条件
searchRequest.source(builder);

// 执行ES请求
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

// 处理聚合查询结果
Aggregations aggregations = searchResponse.getAggregations();
// 根据by_shop名字查询terms聚合结果
Terms byShopAggregation = aggregations.get("by_shop");

// 遍历terms聚合结果
for (Terms.Bucket bucket  : byShopAggregation.getBuckets()) {
    // 因为是根据shop_id分组,因此可以直接将桶的key转换成int类型
    int shopId = bucket.getKeyAsNumber().intValue();
    // 如果分组的字段是字符串类型,可以直接转成String类型
    // String key = bucket.getKeyAsString();
    // 获取文档总数
    long count = bucket.getDocCount();
}

其他桶聚合条件的用法类型,下面分别介绍各类常用的桶聚合

常用桶聚合

1.Terms聚合

创建聚合条件

// terms聚合命名为: by_shop, 分组字段为: shop_id
TermsAggregationBuilder byShop = AggregationBuilders.terms("by_shop")
                .field("shop_id");

处理聚合结果

Aggregations aggregations = searchResponse.getAggregations();
// 根据by_shop命名查询terms聚合结果
Terms byShopAggregation = aggregations.get("by_shop");

// 遍历terms聚合结果
for (Terms.Bucket bucket  : byShopAggregation.getBuckets()) {
    // 因为是根据shop_id分组,因此可以直接将桶的key转换成int类型
    int shopId = bucket.getKeyAsNumber().intValue();
    // 如果分组的字段是字符串类型,可以直接转成String类型
    // String key = bucket.getKeyAsString();
    // 获取文档总数
    long count = bucket.getDocCount();
}

2.Histogram聚合

创建聚合条件

// Histogram聚合命名为: prices
HistogramAggregationBuilder histogramAggregationBuilder = AggregationBuilders.histogram("prices")
                .field("price") // 根据price字段值,对数据进行分组
                .interval(100); //  分桶的间隔为100,意思就是price字段值按100间隔分组

处理聚合结果

// 处理聚合查询结果
Aggregations aggregations = searchResponse.getAggregations();
// 根据prices命名查询Histogram聚合结果
Histogram histogram = aggregations.get("prices");
        
// 遍历聚合结果
for (Histogram.Bucket bucket  : histogram.getBuckets()) {
    // 获取桶的Key值
    String key = bucket.getKeyAsString();
    // 获取文档总数
    long count = bucket.getDocCount();
}

3.Date histogram聚合

创建聚合条件

// DateHistogram聚合命名为: sales_over_time
DateHistogramAggregationBuilder dateHistogramAggregationBuilder = AggregationBuilders.dateHistogram("sales_over_time")
                .field("date") // 根据date字段值,对数据进行分组
                // 时间分组间隔:DateHistogramInterval.* 序列常量,支持每月,每年,每天等等时间间隔
                .calendarInterval(DateHistogramInterval.MONTH)
                // 设置返回结果中桶key的时间格式
                .format("yyyy-MM-dd");

处理聚合结果

// 处理聚合查询结果
Aggregations aggregations = searchResponse.getAggregations();
// 根据sales_over_time命名查询Histogram聚合结果
Histogram histogram = aggregations.get("sales_over_time");
        
// 遍历聚合结果
for (Histogram.Bucket bucket  : histogram.getBuckets()) {
    // 获取桶的Key值
    String key = bucket.getKeyAsString();
    // 获取文档总数
    long count = bucket.getDocCount();
}

4.Range聚合

创建聚合条件

//range聚合命名为: price_ranges
RangeAggregationBuilder rangeAggregationBuilder = AggregationBuilders.range("price_ranges")
                .field("price") // 根据price字段分桶
                .addUnboundedFrom(100) // 范围配置, 0 - 100
                .addRange(100.0, 200.0) // 范围配置, 100 - 200
                .addUnboundedTo(200.0); // 范围配置,> 200的值

处理聚合结果

// 处理聚合查询结果
Aggregations aggregations = searchResponse.getAggregations();
Range range = aggregations.get("price_ranges");

// 遍历聚合结果
for (Range.Bucket bucket  : range.getBuckets()) {
    // 获取桶的Key值
    String key = bucket.getKeyAsString();
    // 获取文档总数
    long count = bucket.getDocCount();
}

5.嵌套聚合的用法

桶聚合之间支持互相嵌套,同时桶聚合也可以嵌套多个指标聚合,可以参考下面例子组合聚合条件

创建嵌套聚合条件

// 创建Terms指标聚合
TermsAggregationBuilder byShop = AggregationBuilders.terms("by_shop")
                .field("shop_id");

// 创建avg指标聚合
AvgAggregationBuilder avgPriceBuilder = AggregationBuilders.avg("avg_price")
                .field("price");
// 设置嵌套聚合查询条件
byShop.subAggregation(avgPriceBuilder);

SumAggregationBuilder sumPriceBulder = AggregationBuilders.sum("sum_price")
                .field("price");
// 设置嵌套聚合查询条件
byShop.subAggregation(sumPriceBulder);

处理结果

// 处理聚合查询结果
Aggregations aggregations = searchResponse.getAggregations();
Terms terms = aggregations.get("by_shop");

// 遍历聚合结果
for (Terms.Bucket bucket  : terms.getBuckets()) {
      // 获取桶的Key值
      String key = bucket.getKeyAsString();
      // 获取文档总数
      long count = bucket.getDocCount();

      // 处理嵌套聚合结果
      Aggregations subAggregations = bucket.getAggregations();
      // 根据avg_price命名,查询avg聚合结果
      Avg avgPriceResult = subAggregations.get("avg_price");
      // 获取平均价格
      double avgPrice = avgPriceResult.getValue();

      // 根据sum_price命名,查询sum聚合结果
      Sum sumPriceResult = subAggregations.get("sum_price");
      // 获取总价格
      double sumPrice = sumPriceResult.getValue();
}