|
| 1 | +/* |
| 2 | + * Copyright (C) 2020 Graylog, Inc. |
| 3 | + * |
| 4 | + * This program is free software: you can redistribute it and/or modify |
| 5 | + * it under the terms of the Server Side Public License, version 1, |
| 6 | + * as published by MongoDB, Inc. |
| 7 | + * |
| 8 | + * This program is distributed in the hope that it will be useful, |
| 9 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 10 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 11 | + * Server Side Public License for more details. |
| 12 | + * |
| 13 | + * You should have received a copy of the Server Side Public License |
| 14 | + * along with this program. If not, see |
| 15 | + * <http://www.mongodb.com/licensing/server-side-public-license>. |
| 16 | + */ |
| 17 | +package org.graylog.storage.opensearch3; |
| 18 | + |
| 19 | +import com.google.common.collect.ImmutableMap; |
| 20 | +import com.google.common.collect.Maps; |
| 21 | +import jakarta.inject.Inject; |
| 22 | +import org.graylog2.indexer.IndexToolsAdapter; |
| 23 | +import org.graylog2.plugin.Message; |
| 24 | +import org.graylog2.plugin.streams.Stream; |
| 25 | +import org.joda.time.DateTime; |
| 26 | +import org.joda.time.DateTimeZone; |
| 27 | +import org.opensearch.client.opensearch._types.ExpandWildcard; |
| 28 | +import org.opensearch.client.opensearch._types.FieldValue; |
| 29 | +import org.opensearch.client.opensearch._types.aggregations.Aggregation; |
| 30 | +import org.opensearch.client.opensearch._types.aggregations.DateHistogramBucket; |
| 31 | +import org.opensearch.client.opensearch._types.aggregations.FilterAggregate; |
| 32 | +import org.opensearch.client.opensearch._types.aggregations.StringTermsBucket; |
| 33 | +import org.opensearch.client.opensearch._types.query_dsl.BoolQuery; |
| 34 | +import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery; |
| 35 | +import org.opensearch.client.opensearch._types.query_dsl.Query; |
| 36 | +import org.opensearch.client.opensearch.core.SearchRequest; |
| 37 | +import org.opensearch.client.opensearch.core.SearchResponse; |
| 38 | + |
| 39 | +import java.util.HashMap; |
| 40 | +import java.util.List; |
| 41 | +import java.util.Map; |
| 42 | +import java.util.Optional; |
| 43 | +import java.util.Set; |
| 44 | + |
| 45 | +public class IndexToolsAdapterOS implements IndexToolsAdapter { |
| 46 | + private static final String AGG_DATE_HISTOGRAM = "source_date_histogram"; |
| 47 | + private static final String AGG_MESSAGE_FIELD = "message_field"; |
| 48 | + private static final String AGG_FILTER = "message_filter"; |
| 49 | + private final OfficialOpensearchClient client; |
| 50 | + |
| 51 | + @Inject |
| 52 | + public IndexToolsAdapterOS(OfficialOpensearchClient client) { |
| 53 | + this.client = client; |
| 54 | + } |
| 55 | + |
| 56 | + @Override |
| 57 | + public Map<DateTime, Map<String, Long>> fieldHistogram(String fieldName, Set<String> indices, |
| 58 | + Optional<Set<String>> includedStreams, long interval) { |
| 59 | + final Query streamFilter = buildStreamIdFilter(includedStreams); |
| 60 | + |
| 61 | + final Aggregation dateHistogramAgg = Aggregation.of(a -> a |
| 62 | + .dateHistogram(dh -> dh |
| 63 | + .field("timestamp") |
| 64 | + .fixedInterval(fi -> fi.time(interval + "ms")) |
| 65 | + // We use "min_doc_count" here to avoid empty buckets in the histogram result. |
| 66 | + // This is needed to avoid out-of-memory errors when creating a histogram for a really large |
| 67 | + // date range. See: https://github.com/Graylog2/graylog-plugin-archive/issues/59 |
| 68 | + .minDocCount(1) |
| 69 | + ) |
| 70 | + .aggregations(AGG_MESSAGE_FIELD, Aggregation.of(ta -> ta |
| 71 | + .terms(t -> t.field(fieldName)) |
| 72 | + )) |
| 73 | + ); |
| 74 | + |
| 75 | + final Aggregation filterAgg = Aggregation.of(a -> a |
| 76 | + .filter(streamFilter) |
| 77 | + .aggregations(AGG_DATE_HISTOGRAM, dateHistogramAgg) |
| 78 | + ); |
| 79 | + |
| 80 | + final SearchRequest searchRequest = SearchRequest.of(sr -> sr |
| 81 | + .index(indices.stream().toList()) |
| 82 | + .query(Query.of(q -> q.matchAll(m -> m))) |
| 83 | + .aggregations(AGG_FILTER, filterAgg) |
| 84 | + .size(0) |
| 85 | + ); |
| 86 | + |
| 87 | + final SearchResponse<Void> searchResult = client.sync( |
| 88 | + c -> c.search(searchRequest, Void.class), |
| 89 | + "Unable to retrieve field histogram." |
| 90 | + ); |
| 91 | + |
| 92 | + final FilterAggregate filterAggregate = searchResult.aggregations().get(AGG_FILTER).filter(); |
| 93 | + final List<DateHistogramBucket> histogramBuckets = filterAggregate.aggregations() |
| 94 | + .get(AGG_DATE_HISTOGRAM).dateHistogram().buckets().array(); |
| 95 | + |
| 96 | + final Map<DateTime, Map<String, Long>> result = Maps.newHashMapWithExpectedSize(histogramBuckets.size()); |
| 97 | + |
| 98 | + for (final DateHistogramBucket bucket : histogramBuckets) { |
| 99 | + final DateTime date = new DateTime(bucket.key(), DateTimeZone.UTC); |
| 100 | + |
| 101 | + final List<StringTermsBucket> termBuckets = bucket.aggregations() |
| 102 | + .get(AGG_MESSAGE_FIELD).sterms().buckets().array(); |
| 103 | + |
| 104 | + final HashMap<String, Long> termCounts = Maps.newHashMapWithExpectedSize(termBuckets.size()); |
| 105 | + for (final StringTermsBucket termBucket : termBuckets) { |
| 106 | + termCounts.put(termBucket.key(), termBucket.docCount()); |
| 107 | + } |
| 108 | + |
| 109 | + result.put(date, termCounts); |
| 110 | + } |
| 111 | + |
| 112 | + return ImmutableMap.copyOf(result); |
| 113 | + } |
| 114 | + |
| 115 | + @Override |
| 116 | + public long count(Set<String> indices, Optional<Set<String>> includedStreams) { |
| 117 | + final Query query = buildStreamIdFilter(includedStreams); |
| 118 | + |
| 119 | + final SearchRequest searchRequest = SearchRequest.of(sr -> sr |
| 120 | + .index(indices.stream().toList()) |
| 121 | + .query(query) |
| 122 | + .ignoreUnavailable(true) |
| 123 | + .allowNoIndices(true) |
| 124 | + .expandWildcards(ExpandWildcard.Open) |
| 125 | + .trackTotalHits(t -> t.enabled(true)) |
| 126 | + .size(0) |
| 127 | + ); |
| 128 | + |
| 129 | + final SearchResponse<Void> response = client.sync( |
| 130 | + c -> c.search(searchRequest, Void.class), |
| 131 | + "Unable to count documents of index." |
| 132 | + ); |
| 133 | + |
| 134 | + return response.hits().total().value(); |
| 135 | + } |
| 136 | + |
| 137 | + private Query buildStreamIdFilter(Optional<Set<String>> includedStreams) { |
| 138 | + BoolQuery.Builder queryBuilder = BoolQuery.builder().must(MatchAllQuery.builder().build().toQuery()); |
| 139 | + |
| 140 | + // If the included streams are not present, we do not filter on streams |
| 141 | + if (includedStreams.isPresent()) { |
| 142 | + final Set<String> streams = includedStreams.get(); |
| 143 | + final BoolQuery.Builder filterBuilder = new BoolQuery.Builder(); |
| 144 | + |
| 145 | + // If the included streams set contains the default stream, we also want all documents which do not |
| 146 | + // have any stream assigned. Those documents have basically been in the "default stream" which didn't |
| 147 | + // exist in Graylog <2.2.0. |
| 148 | + if (streams.contains(Stream.DEFAULT_STREAM_ID)) { |
| 149 | + final Query noStreamsField = Query.of(q -> q |
| 150 | + .bool(b -> b.mustNot(mn -> mn.exists(e -> e.field(Message.FIELD_STREAMS)))) |
| 151 | + ); |
| 152 | + filterBuilder.should(noStreamsField); |
| 153 | + } |
| 154 | + |
| 155 | + // Only select messages which are assigned to the given streams |
| 156 | + final Query termsQuery = Query.of(q -> q |
| 157 | + .terms(t -> t |
| 158 | + .field(Message.FIELD_STREAMS) |
| 159 | + .terms(tv -> tv.value(streams.stream().map(FieldValue::of).toList())) |
| 160 | + ) |
| 161 | + ); |
| 162 | + filterBuilder.should(termsQuery); |
| 163 | + |
| 164 | + queryBuilder.filter(filterBuilder.build().toQuery()); |
| 165 | + } |
| 166 | + |
| 167 | + return queryBuilder.build().toQuery(); |
| 168 | + } |
| 169 | +} |
0 commit comments