Copyright notice: this is an original article by the blogger and may not be reproduced without the blogger's permission. Contact: 351605040 https://blog.csdn.net/Arvinzr/article/details/83187233
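Background
In real business scenarios we often need aggregate filtering: group the data first, aggregate each group, and then filter on the aggregated results. Relational databases handle this with HAVING or a subquery; in Elasticsearch the same can be done with the bucket_selector pipeline aggregation.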
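Example business scenario
Find the customers who have placed at least 2 orders and whose average order amount is at least 100.
- The corresponding SQL in a relational database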
SELECT
    userId,
    AVG(amount) AS avgAmount,
    COUNT(*) AS orderCount
FROM type_order
GROUP BY userId
HAVING AVG(amount) >= 100 AND COUNT(*) >= 2
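To make the group, aggregate, then filter order concrete, here is a minimal plain-Java sketch of what the SQL above computes, using streams instead of a database. The `Order` and `UserStats` records and the sample rows are hypothetical illustrations, not data from this article, and the code assumes Java 16+ for records.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class HavingSketch {
    // Hypothetical order row: only the two columns the query uses
    record Order(long userId, double amount) {}

    // Hypothetical per-user aggregate, the equivalent of one GROUP BY result row
    record UserStats(long userId, long orderCount, double avgAmount) {}

    static List<UserStats> having(List<Order> orders) {
        // GROUP BY userId: collect the amounts of each user
        Map<Long, List<Double>> byUser = orders.stream()
                .collect(Collectors.groupingBy(Order::userId,
                        Collectors.mapping(Order::amount, Collectors.toList())));
        return byUser.entrySet().stream()
                // Aggregate each group: COUNT(*) and AVG(amount)
                .map(e -> new UserStats(
                        e.getKey(),
                        e.getValue().size(),
                        e.getValue().stream().mapToDouble(Double::doubleValue).average().orElse(0)))
                // HAVING: the filter runs on aggregated values, not on individual rows
                .filter(s -> s.avgAmount() >= 100 && s.orderCount() >= 2)
                // Sort by key so the output order is deterministic
                .sorted(Comparator.comparingLong(UserStats::userId))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order(1000, 120), new Order(1000, 180),
                new Order(2000, 500),
                new Order(3000, 50), new Order(3000, 60));
        for (UserStats s : having(orders)) {
            System.out.println(s);
        }
    }
}
```

With these sample rows only user 1000 survives (two orders averaging 150.0): user 2000 fails the order-count condition and user 3000 fails the average-amount condition.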
- The Elasticsearch query
GET index_test/type_order/_search
{
  "size": 0,
  "aggs": {
    "groupUserId": {
      "terms": {
        "field": "userId"
      },
      "aggs": {
        "avgAmount": {
          "avg": {
            "field": "amount"
          }
        },
        "having": {
          "bucket_selector": {
            "buckets_path": {
              "orderCount": "_count",
              "avgAmount": "avgAmount"
            },
            "script": {
              "source": "params.avgAmount >= 100 && params.orderCount >= 2"
            }
          }
        }
      }
    }
  }
}
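The way bucket_selector uses buckets_path can be mimicked in a few lines of Java: for each terms bucket, every buckets_path entry is resolved to a value (`_count` being the bucket's document count), those values are exposed to the script as `params`, and buckets for which the script returns false are dropped. This is a simplified in-memory model, not Elasticsearch's implementation: the `Bucket` record and sample buckets are hypothetical, a Java `Predicate` stands in for the Painless script, and missing metric values are not handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class BucketSelectorSketch {
    // Hypothetical stand-in for one terms bucket: key, doc_count and named sub-aggregation values
    record Bucket(long key, long docCount, Map<String, Double> metrics) {}

    // Resolve one buckets_path entry; "_count" is the special path for the bucket's doc count
    static double resolve(Bucket b, String path) {
        return "_count".equals(path) ? b.docCount() : b.metrics().get(path);
    }

    static List<Bucket> select(List<Bucket> buckets, Map<String, String> bucketsPath,
                               Predicate<Map<String, Double>> script) {
        List<Bucket> kept = new ArrayList<>();
        for (Bucket b : buckets) {
            // Build the script's params: variable name -> value read through its path
            Map<String, Double> params = new HashMap<>();
            bucketsPath.forEach((var, path) -> params.put(var, resolve(b, path)));
            // Keep the bucket only if the script evaluates to true
            if (script.test(params)) {
                kept.add(b);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, String> bucketsPath = Map.of("orderCount", "_count", "avgAmount", "avgAmount");
        // Same condition as the Painless script in the query
        Predicate<Map<String, Double>> script =
                p -> p.get("avgAmount") >= 100 && p.get("orderCount") >= 2;
        List<Bucket> buckets = List.of(
                new Bucket(1000, 2, Map.of("avgAmount", 150.0)),
                new Bucket(2000, 1, Map.of("avgAmount", 500.0)),
                new Bucket(3000, 2, Map.of("avgAmount", 55.0)));
        select(buckets, bucketsPath, script).forEach(b -> System.out.println(b.key()));
    }
}
```

Here `main` prints only 1000: bucket 2000 has a single document and bucket 3000's average is below 100. Note that the selector only sees the buckets the terms aggregation actually returned, which is why the bucket size matters.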
- Response
{
  "took": 16,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "groupUserId": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": 1000,
          "doc_count": 2,
          "avgAmount": {
            "value": 275
          }
        }
      ]
    }
  }
}
- Implementation with the Java API
// Assumes `client` is an already-connected TransportClient (Elasticsearch 5.x/6.x transport API)
String termsAlias = "userIdGroup";
// Group first. Without an explicit size, terms returns only the top 10 buckets by default;
// Integer.MAX_VALUE (2147483647) is passed here to fetch all buckets in one request
TermsAggregationBuilder termsAgg = AggregationBuilders.terms(termsAlias)
        .field("userId")
        .size(Integer.MAX_VALUE)
        .order(Terms.Order.term(true)); // sort buckets by key, ascending
// Sub-aggregation: average order amount. The document count needs no aggregation,
// it is available through the built-in "_count" path
termsAgg.subAggregation(AggregationBuilders.avg("avgAmount").field("amount"));
// buckets_path: maps script variable names to the per-bucket values used by the selector
Map<String, String> bucketsPathsMap = new HashMap<>(8);
bucketsPathsMap.put("orderCount", "_count");
bucketsPathsMap.put("avgAmount", "avgAmount");
// Filter condition, the equivalent of the SQL HAVING clause
Script script = new Script("params.avgAmount >= 100 && params.orderCount >= 2");
// Build the bucket selector and attach it under the terms aggregation
BucketSelectorPipelineAggregationBuilder bs =
        PipelineAggregatorBuilders.bucketSelector("having", bucketsPathsMap, script);
termsAgg.subAggregation(bs);
SearchRequestBuilder sb = client.prepareSearch("index_test").setTypes("type_order");
SearchResponse sr = sb.setSize(0).addAggregation(termsAgg).execute().actionGet();
System.out.println("Query:");
System.out.println(sb);
// Read the buckets that passed the selector (userId is a long field, hence LongTerms)
LongTerms lt = sr.getAggregations().get(termsAlias);
for (LongTerms.Bucket bucket : lt.getBuckets()) {
    System.out.println("-------------------------");
    System.out.println(bucket.getKey());
    System.out.println("count = " + bucket.getDocCount());
    // Look up the avg sub-aggregation directly by name
    InternalAvg ia = bucket.getAggregations().get("avgAmount");
    System.out.println("avgAmount = " + ia.getValue());
    System.out.println("-------------------------");
}