ⓘ
Java下,使用 HighLevelClient,带多重聚合查询,支持按日历日期(支持自然天/自然月聚合),然后再按某个字段聚合,支持动态映射字段,并按百分位数聚合,示例代码。
public List<PercentileDuration> processPercentileDuration(ZonedDateTime from, ZonedDateTime to, UnitType unitType) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'hh:mm:ss.sss'Z'");
MatchQueryBuilder traceLogQuery = QueryBuilders.matchQuery("LOG_TYPE.keyword", "TRACE");
PrefixQueryBuilder contentQuery = QueryBuilders.prefixQuery("CONTENT.keyword", "{\"duration");
TermsQueryBuilder transactionQuery = QueryBuilders.termsQuery("TRANSACTION_ID.keyword", "Summary_Dx", "Note_Submenu");
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(traceLogQuery)
.must(contentQuery)
.must(transactionQuery);
String fromDateStr = from.withZoneSameInstant(ZoneOffset.UTC).format(formatter);
String toDateStr = to.withZoneSameInstant(ZoneOffset.UTC).format(formatter);
RangeQueryBuilder dateRange = QueryBuilders.rangeQuery("@timestamp").gte(fromDateStr).lte(toDateStr);
queryBuilder.must(dateRange);
Map<String, Object> elements = new HashMap<>();
elements.put("type", "double");
elements.put("script",
"if (doc['CONTENT.keyword'].size() != 0) { def m = /^\\D*(\\d+)ms.*$/.matcher(doc['CONTENT.keyword'].value); if ( m.matches() ) { emit(Integer.parseInt(m.group(1))) } else { emit(-1); } } else { emit(-1); }");
Map<String, Object> runtimeMappings = new HashMap<>();
runtimeMappings.put("my_duration", elements);
PercentilesAggregationBuilder aggregationBuilder = AggregationBuilders.percentiles("percentile_duration")
.field("my_duration")
.percentiles(10d, 50d, 75d, 90d, 95d, 99d, 100d);
TermsAggregationBuilder groupByFunction = AggregationBuilders.terms("function_code")
.field("TRANSACTION_ID.keyword").subAggregation(aggregationBuilder);
AggregationBuilder group = AggregationBuilders
.dateHistogram("MyDate")
.calendarInterval(DateHistogramInterval.DAY)
.minDocCount(0)
.field("@timestamp")
.format("yyyy-MM-dd")
.subAggregation(groupByFunction);
Optional<SearchResponse> searchResponse = search("elastic search index", queryBuilder, group, runtimeMappings);
if (!searchResponse.isPresent()) {
return Collections.emptyList();
}
ParsedDateHistogram aggregations = searchResponse.get().getAggregations().get("MyDate");
List<PercentileDuration> percentileDurations = new ArrayList<>();
for (Object item : aggregations.getBuckets()) {
ParsedDateHistogram.ParsedBucket dateGroup = (ParsedDateHistogram.ParsedBucket) item;
Optional<ParsedStringTerms> functionTerms = dateGroup.getAggregations().asList().stream()
.filter(aggr -> aggr.getName().equals("function_code"))
.filter(ParsedStringTerms.class::isInstance)
.map(ParsedStringTerms.class::cast)
.findAny();
for (Object item2 : functionTerms.get().getBuckets()) {
ParsedStringTerms.ParsedBucket terms = (ParsedStringTerms.ParsedBucket) item2;
String transID = terms.getKeyAsString();
Optional<ParsedTDigestPercentiles> percentileDuration = terms.getAggregations().asList().stream()
.filter(agg -> agg.getName().equals("percentile_duration"))
.filter(ParsedTDigestPercentiles.class::isInstance)
.map(ParsedTDigestPercentiles.class::cast)
.findAny();
percentileDuration.ifPresent(ps -> ps.forEach(percentile -> {
percentileDurations.add(PercentileDuration.builder()
.percentile((int) percentile.getPercent())
.percentileValue(Double.isNaN(percentile.getValue()) ? null : percentile.getValue() / 1000)
.unitCd(unitType)
.statisticDate(((ZonedDateTime) dateGroup.getKey()).toLocalDateTime())
.functionCd(TransactionID.toFunctionCode(transID))
.build());
}));
}
}
if (!percentileDurations.isEmpty()) {
percentileDurationMapper.addList(percentileDurations);
}
return percentileDurations;
}
public Optional<SearchResponse> search(String index, QueryBuilder queryBuilder, AggregationBuilder aggregationBuilder, Map<String, Object> runtimeMappings) {
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(queryBuilder);
builder.aggregation(aggregationBuilder);
builder.runtimeMappings(runtimeMappings);
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.source(builder);
try {
SearchResponse response = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);
return Optional.ofNullable(response);
} catch (IOException e) {
log.error("failed to search: {}", e.getMessage());
}
return Optional.empty();
}