@@ -58,6 +58,7 @@
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFShardOperationRateCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory;
import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector;
@@ -230,6 +231,9 @@ private void scheduleTelemetryCollectors() {
new RTFDisksCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFHeapMetricsCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFShardOperationRateCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFThreadPoolMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
@@ -0,0 +1,163 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.collectors.telemetry;

import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector;
import org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector;
import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.MetricUnits;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.util.Utils;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

/**
* This collector measures indexing and search rates per shard. The metric value is the difference
* between the current and the previous window's operation count. For example, if the last window's
* operation count was 10 and it is now 12, the collector publishes 2 ops/interval.
*/
public class RTFShardOperationRateCollector extends PerformanceAnalyzerMetricsCollector
implements TelemetryCollector {

private static final Logger LOG = LogManager.getLogger(RTFShardOperationRateCollector.class);
public static final int SAMPLING_TIME_INTERVAL =
MetricsConfiguration.CONFIG_MAP.get(RTFShardOperationRateCollector.class)
.samplingInterval;

private Counter indexingRateCounter;
private Counter searchRateCounter;

private final Map<ShardId, Long> prevIndexingOps;
private final Map<ShardId, Long> prevSearchOps;
private long lastCollectionTimeInMillis;

private MetricsRegistry metricsRegistry;
private boolean metricsInitialized;
private final PerformanceAnalyzerController controller;
private final ConfigOverridesWrapper configOverridesWrapper;

public RTFShardOperationRateCollector(
PerformanceAnalyzerController controller,
ConfigOverridesWrapper configOverridesWrapper) {
super(
SAMPLING_TIME_INTERVAL,
"RTFShardOperationRateCollector",
StatMetrics.RTF_SHARD_OPERATION_RATE_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.RTF_SHARD_OPERATION_RATE_COLLECTOR_ERROR);

this.controller = controller;
this.configOverridesWrapper = configOverridesWrapper;
this.metricsInitialized = false;

this.prevIndexingOps = new HashMap<>();
this.prevSearchOps = new HashMap<>();
this.lastCollectionTimeInMillis = System.currentTimeMillis();
}

@Override
public void collectMetrics(long startTime) {
if (controller.isCollectorDisabled(configOverridesWrapper, getCollectorName())) {
LOG.info("RTFShardOperationRateCollector is disabled. Skipping collection.");
return;
}

metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
if (metricsRegistry == null) {
LOG.error("Could not get the instance of MetricsRegistry class");
return;
}

initializeMetricsIfNeeded();
LOG.debug("Executing collect metrics for RTFShardOperationRateCollector");

// Get all shards
Map<ShardId, IndexShard> currentShards = Utils.getShards();

for (Map.Entry<ShardId, IndexShard> entry : currentShards.entrySet()) {
ShardId shardId = entry.getKey();
IndexShard shard = entry.getValue();

try {
long currentIndexingOps = shard.indexingStats().getTotal().getIndexCount();
long currentSearchOps = shard.searchStats().getTotal().getQueryCount();

if (prevIndexingOps.containsKey(shardId)) {
processIndexingOperations(shardId, currentIndexingOps);
}

if (prevSearchOps.containsKey(shardId)) {
processSearchOperations(shardId, currentSearchOps);
}

// Update previous values for next collection
prevIndexingOps.put(shardId, currentIndexingOps);
prevSearchOps.put(shardId, currentSearchOps);
} catch (Exception e) {
LOG.error(
"Error collecting indexing/search rate metrics for shard {}: {}",
shardId,
e.getMessage());
}
}
}

private void processIndexingOperations(ShardId shardId, long currentIndexingOps) {
long indexingOpsDiff = Math.max(0, currentIndexingOps - prevIndexingOps.get(shardId));

if (indexingOpsDiff > 0) {
Tags tags = createTags(shardId);
indexingRateCounter.add(indexingOpsDiff, tags);
}
}

private void processSearchOperations(ShardId shardId, long currentSearchOps) {
long searchOpsDiff = Math.max(0, currentSearchOps - prevSearchOps.get(shardId));

if (searchOpsDiff > 0) {
Tags tags = createTags(shardId);
searchRateCounter.add(searchOpsDiff, tags);
}
}

// attributes= {index_name="test", shard_id="0"}
private Tags createTags(ShardId shardId) {
return Tags.create()
.addTag(RTFMetrics.CommonDimension.INDEX_NAME.toString(), shardId.getIndexName())
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
String.valueOf(shardId.getId()));
}

private void initializeMetricsIfNeeded() {
if (!metricsInitialized) {
indexingRateCounter =
metricsRegistry.createCounter(
RTFMetrics.OperationsValue.Constants.INDEXING_RATE,
"Indexing operations per shard",
MetricUnits.RATE.toString());

searchRateCounter =
metricsRegistry.createCounter(
RTFMetrics.OperationsValue.Constants.SEARCH_RATE,
"Search operations per shard",
MetricUnits.RATE.toString());

metricsInitialized = true;
}
}
}
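To make the windowed delta described in the class comment concrete, here is a minimal, self-contained sketch (not part of this change; ShardRateDeltaSketch and its counts are hypothetical, while the real collector reads shard counts from IndexShard stats via Utils.getShards()):

import java.util.HashMap;
import java.util.Map;

class ShardRateDeltaSketch {
    private final Map<String, Long> prevOps = new HashMap<>();

    // Returns the operations completed since the previous window, or -1 on the first
    // observation (mirroring the collector, which skips publishing until it has a baseline).
    long delta(String shardKey, long currentOps) {
        Long prev = prevOps.put(shardKey, currentOps);
        return prev == null ? -1 : Math.max(0, currentOps - prev);
    }

    public static void main(String[] args) {
        ShardRateDeltaSketch sketch = new ShardRateDeltaSketch();
        sketch.delta("index-0", 10);                     // first window: baseline only
        System.out.println(sketch.delta("index-0", 12)); // prints 2, i.e. 2 ops/interval
    }
}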
@@ -17,6 +17,7 @@
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.OperationsValue;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.util.Utils;
@@ -38,6 +39,7 @@ public class RTFPerformanceAnalyzerSearchListener
LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class);
private static final String SHARD_FETCH_PHASE = "shard_fetch";
Collaborator:
Please change to OTel-standard naming, i.e. shard.fetch; the same applies to all the dimensions added here.

private static final String SHARD_QUERY_PHASE = "shard_query";
private static final String SHARD_QUERY_PLUS_FETCH_PHASE = "shard_query_plus_fetch";
Collaborator:
Should we just call it search.latency?

public static final String QUERY_START_TIME = "query_start_time";
public static final String FETCH_START_TIME = "fetch_start_time";
public static final String QUERY_TASK_ID = "query_task_id";
@@ -47,6 +49,10 @@ public class RTFPerformanceAnalyzerSearchListener
private final PerformanceAnalyzerController controller;
private final Histogram cpuUtilizationHistogram;
private final Histogram heapUsedHistogram;
private final Histogram queryPhaseHistogram;
private final Histogram fetchPhaseHistogram;
private final Histogram queryPlusFetchPhaseHistogram;
private final Histogram searchLatencyHistogram;
private final int numProcessors;

public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) {
@@ -55,6 +61,15 @@ public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController
createCPUUtilizationHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.heapUsedHistogram =
createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.queryPhaseHistogram =
createQueryPhaseHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.fetchPhaseHistogram =
createFetchPhaseHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.queryPlusFetchPhaseHistogram =
createQueryPlusFetchPhaseHistogram(
OpenSearchResources.INSTANCE.getMetricsRegistry());
this.searchLatencyHistogram =
createSearchLatencyHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.threadLocal = ThreadLocal.withInitial(() -> new HashMap<String, Long>());
this.numProcessors = Runtime.getRuntime().availableProcessors();
}
@@ -83,6 +98,58 @@ private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) {
}
}

private Histogram createQueryPhaseHistogram(MetricsRegistry metricsRegistry) {
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.OperationsValue.QUERY_PHASE_LATENCY.toString(),
"Query phase latency per shard",
RTFMetrics.MetricUnits.MILLISECOND.toString());
} else {
LOG.debug("MetricsRegistry is null");
return null;
}
}

private Histogram createFetchPhaseHistogram(MetricsRegistry metricsRegistry) {
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.OperationsValue.FETCH_PHASE_LATENCY.toString(),
"Fetch phase latency per shard",
RTFMetrics.MetricUnits.MILLISECOND.toString());
} else {
LOG.debug("MetricsRegistry is null");
return null;
}
}

// This histogram helps obtain the combined query plus fetch phase latency by applying getSum
// over an interval.
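// For example (hypothetical numbers): a request whose query phase takes 30 ms and fetch phase
// takes 20 ms records 30 and 20 here, so getSum over the interval yields 50 ms.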
private Histogram createQueryPlusFetchPhaseHistogram(MetricsRegistry metricsRegistry) {
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.OperationsValue.QUERY_PLUS_FETCH_PHASE_LATENCY.toString(),
"Query plus fetch phase latency per shard",
RTFMetrics.MetricUnits.MILLISECOND.toString());
} else {
LOG.debug("MetricsRegistry is null");
return null;
}
}

// This histogram helps obtain the total latency of a search request by applying getMax over an
// interval.
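// For the same hypothetical request, getMax over the interval yields the end-to-end latency
// (about 50 ms), since each recording measures elapsed time from the overall start.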
private Histogram createSearchLatencyHistogram(MetricsRegistry metricsRegistry) {
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
OperationsValue.SEARCH_LATENCY.toString(),
"Total Search latency per shard",
RTFMetrics.MetricUnits.MILLISECOND.toString());
} else {
LOG.debug("MetricsRegistry is null");
return null;
}
}

@Override
public String toString() {
return RTFPerformanceAnalyzerSearchListener.class.getSimpleName();
@@ -171,6 +238,13 @@ public void preQueryPhase(SearchContext searchContext) {
public void queryPhase(SearchContext searchContext, long tookInNanos) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime());
long queryTime = (System.nanoTime() - queryStartTime);
double queryTimeInMills = queryTime / 1_000_000.0;

queryPhaseHistogram.record(
queryTimeInMills, createTags(searchContext, SHARD_QUERY_PHASE, false));
queryPlusFetchPhaseHistogram.record(
queryTimeInMills, createTags(searchContext, SHARD_QUERY_PLUS_FETCH_PHASE, false));

addResourceTrackingCompletionListener(
searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false);
}
@@ -192,6 +266,13 @@ public void preFetchPhase(SearchContext searchContext) {
public void fetchPhase(SearchContext searchContext, long tookInNanos) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime());
long fetchTime = (System.nanoTime() - fetchStartTime);
double fetchTimeInMills = fetchTime / 1_000_000.0;

fetchPhaseHistogram.record(
fetchTimeInMills, createTags(searchContext, SHARD_FETCH_PHASE, false));
queryPlusFetchPhaseHistogram.record(
Collaborator:
Should we add the phase here? It defeats the purpose of this metric.

Author:
Updated the metric tags to use the same value for phase, so that both metrics are the same.

fetchTimeInMills, createTags(searchContext, SHARD_QUERY_PLUS_FETCH_PHASE, false));

addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false);
}
@@ -262,32 +343,21 @@ protected void innerOnResponse(Task task) {
* overall start time.
*/
long totalTime = System.nanoTime() - startTime;
double totalTimeInMills = totalTime / 1_000_000.0;
double shareFactor = computeShareFactor(phaseTookTime, totalTime);

searchLatencyHistogram.record(
totalTimeInMills, createTags(searchContext, phase, isFailed));
cpuUtilizationHistogram.record(
Utils.calculateCPUUtilization(
numProcessors,
totalTime,
task.getTotalResourceStats().getCpuTimeInNanos(),
shareFactor),
createTags());
createTags(searchContext, phase, isFailed));
heapUsedHistogram.record(
Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor),
createTags());
}

private Tags createTags() {
return Tags.create()
.addTag(
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
searchContext.request().shardId().getIndex().getName())
.addTag(
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
searchContext.request().shardId().getIndex().getUUID())
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
searchContext.request().shardId().getId())
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase)
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed);
createTags(searchContext, phase, isFailed));
}

@Override
@@ -297,6 +367,21 @@ protected void innerOnFailure(Exception e) {
};
}

private Tags createTags(SearchContext searchContext, String phase, boolean isFailed) {
return Tags.create()
.addTag(
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
searchContext.request().shardId().getIndex().getName())
.addTag(
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
searchContext.request().shardId().getIndex().getUUID())
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
searchContext.request().shardId().getId())
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase)
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed);
}

@VisibleForTesting
static double computeShareFactor(long phaseTookTime, long totalTime) {
return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime));
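For reference, a self-contained sketch of the share factor arithmetic above (ShareFactorExample and the durations are hypothetical; the formula is copied from computeShareFactor):

public class ShareFactorExample {
    // Same formula as computeShareFactor above.
    static double computeShareFactor(long phaseTookTime, long totalTime) {
        return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime));
    }

    public static void main(String[] args) {
        long phaseNanos = 40_000_000L;  // the query phase took 40 ms
        long totalNanos = 100_000_000L; // the whole request took 100 ms
        // Prints 0.4: the phase is attributed 40% of the task's CPU time and memory.
        System.out.println(computeShareFactor(phaseNanos, totalNanos));
    }
}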