首页  编辑  

ELK聚合字段和runtime字段示例

Tags: /Java/   Date Created:
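在 Java 中使用 RestHighLevelClient 进行多重聚合查询的示例代码:先按日历日期聚合(示例中按自然天聚合;将 DateHistogramInterval.DAY 换成 DateHistogramInterval.MONTH 即可按自然月聚合),再按某个字段聚合,并通过运行时字段(runtime field)动态映射出数值字段,最后对该字段按百分位数聚合。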
/**   * Statistic percentile duration and save to table ete_fe_duration   *   * @param from     start date   * @param to       end date   * @param unitType Aggregation, Day or Month   * @return list of PercentileDuration   */  public List<PercentileDuration> processPercentileDuration(ZonedDateTime from, ZonedDateTime to, UnitType unitType) {      // 本處等價效果 SQL:      // select date_format(timestamp, 'yyyy-MM-dd') as day, TRANSACTION_ID, percentile(my_duration, 10), percentile(my_duration, 20), ...      // from elasticsearch      // where timestamp between from and end      // group by day, TRANSACTION_ID      DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'hh:mm:ss.sss'Z'");      MatchQueryBuilder traceLogQuery = QueryBuilders.matchQuery("LOG_TYPE.keyword", "TRACE");      PrefixQueryBuilder contentQuery = QueryBuilders.prefixQuery("CONTENT.keyword", "{\"duration");      TermsQueryBuilder transactionQuery = QueryBuilders.termsQuery("TRANSACTION_ID.keyword", "Summary_Dx", "Note_Submenu");      BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()              .must(traceLogQuery)              .must(contentQuery)              .must(transactionQuery);      String fromDateStr = from.withZoneSameInstant(ZoneOffset.UTC).format(formatter);      String toDateStr = to.withZoneSameInstant(ZoneOffset.UTC).format(formatter);      RangeQueryBuilder dateRange = QueryBuilders.rangeQuery("@timestamp").gte(fromDateStr).lte(toDateStr);      queryBuilder.must(dateRange);      // Add runtime field to parse duration string: "duration: 123ms" ==> 123      Map<String, Object> elements = new HashMap<>();      elements.put("type", "double");      elements.put("script",              "if (doc['CONTENT.keyword'].size() != 0) { def m = /^\\D*(\\d+)ms.*$/.matcher(doc['CONTENT.keyword'].value); if ( m.matches() ) { emit(Integer.parseInt(m.group(1))) } else { emit(-1); } } else { emit(-1); }");      Map<String, Object> runtimeMappings = new HashMap<>();      
runtimeMappings.put("my_duration", elements);      PercentilesAggregationBuilder aggregationBuilder = AggregationBuilders.percentiles("percentile_duration")              .field("my_duration")              .percentiles(10d, 50d, 75d, 90d, 95d, 99d, 100d);      TermsAggregationBuilder groupByFunction = AggregationBuilders.terms("function_code")              .field("TRANSACTION_ID.keyword").subAggregation(aggregationBuilder);      AggregationBuilder group = AggregationBuilders              .dateHistogram("MyDate")              .calendarInterval(DateHistogramInterval.DAY)              .minDocCount(0)              .field("@timestamp")              .format("yyyy-MM-dd")              .subAggregation(groupByFunction);      // Run search      Optional<SearchResponse> searchResponse = search("elastic search index", queryBuilder, group, runtimeMappings);      if (!searchResponse.isPresent()) {          return Collections.emptyList();      }      // 處理聚合結果中的 日期聚合結果 MyDate,由前面 AggregationBuilder中定義      ParsedDateHistogram aggregations = searchResponse.get().getAggregations().get("MyDate");      List<PercentileDuration> percentileDurations = new ArrayList<>();      // 對日期聚合結果中的每個Bucket處理      for (Object item : aggregations.getBuckets()) {          // dateGroup中保存了 日期和日期下的聚合結果          ParsedDateHistogram.ParsedBucket dateGroup = (ParsedDateHistogram.ParsedBucket) item;          // 獲取每個日期聚合結果中的 function_code 聚合結果          Optional<ParsedStringTerms> functionTerms = dateGroup.getAggregations().asList().stream()                  .filter(aggr -> aggr.getName().equals("function_code"))                  .filter(ParsedStringTerms.class::isInstance)                  .map(ParsedStringTerms.class::cast)                  .findAny();          // 掃描 function_code 結果下的 Bucket          for (Object item2 : functionTerms.get().getBuckets()) {              ParsedStringTerms.ParsedBucket terms = (ParsedStringTerms.ParsedBucket) item2;              String transID = terms.getKeyAsString();         
     // 獲取到  percentile_duration 聚合結果              Optional<ParsedTDigestPercentiles> percentileDuration = terms.getAggregations().asList().stream()                      .filter(agg -> agg.getName().equals("percentile_duration"))                      .filter(ParsedTDigestPercentiles.class::isInstance)                      .map(ParsedTDigestPercentiles.class::cast)                      .findAny();              // 對 percentile_duration 的每一條結果記錄進行處理,生成行記錄:              // statistic_date, unit_cd, function_cd, percentile, percentile_value, total_count              percentileDuration.ifPresent(ps -> ps.forEach(percentile -> {                  percentileDurations.add(PercentileDuration.builder()                          .percentile((int) percentile.getPercent())                          .percentileValue(Double.isNaN(percentile.getValue()) ? null : percentile.getValue() / 1000)                          .unitCd(unitType)                          .statisticDate(((ZonedDateTime) dateGroup.getKey()).toLocalDateTime())                          .functionCd(TransactionID.toFunctionCode(transID))                          .build());              }));          }      }      if (!percentileDurations.isEmpty()) {          percentileDurationMapper.addList(percentileDurations);      }      return percentileDurations;  }  /**   * aggregation search   *   * @param index              index   * @param queryBuilder       query builder   * @param aggregationBuilder aggregation builder   * @return aggregation response   */  public Optional<SearchResponse> search(String index, QueryBuilder queryBuilder, AggregationBuilder aggregationBuilder, Map<String, Object> runtimeMappings) {      SearchSourceBuilder builder = new SearchSourceBuilder();      builder.query(queryBuilder);      builder.aggregation(aggregationBuilder);      builder.runtimeMappings(runtimeMappings);      SearchRequest searchRequest = new SearchRequest(index);      searchRequest.source(builder);      try {          
SearchResponse response = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);          return Optional.ofNullable(response);      } catch (IOException e) {          log.error("failed to search: {}", e.getMessage());      }      return Optional.empty();  }