From 6816f9178651832b2dee9dd6272ccfe8e7c229b8 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 9 Apr 2025 00:55:21 +0000 Subject: [PATCH 01/28] wip --- .../state/AsynchronousMetricStorage.java | 130 ++++++++++-------- .../state/AsynchronousMetricStorageTest.java | 2 +- 2 files changed, 77 insertions(+), 55 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 42110e2cb1e..9ff959c5bf0 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -21,6 +21,7 @@ import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; +import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; @@ -31,6 +32,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; @@ -58,16 +60,15 @@ public final class AsynchronousMetricStorage points; - // Only populated if aggregationTemporality == DELTA private Map lastPoints; // Only populated if memoryMode == REUSABLE_DATA private final ObjectPool reusablePointsPool; + private final List reusableDeltaPoints = new ArrayList<>(); - // Only populated if memoryMode == REUSABLE_DATA - private final ArrayList reusableResultList = new ArrayList<>(); + private Map> aggregatorHandles = new HashMap<>(); + private Map latestRecordedMeasurements = new HashMap<>(); private final MemoryMode memoryMode; @@ -88,12 +89,15 @@ private AsynchronousMetricStorage( this.attributesProcessor = attributesProcessor; this.maxCardinality = maxCardinality - 1; this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint); + if (memoryMode == REUSABLE_DATA) { lastPoints = new PooledHashMap<>(); - points = new PooledHashMap<>(); + aggregatorHandles = new PooledHashMap<>(); + latestRecordedMeasurements = new PooledHashMap<>(); } else { lastPoints = new HashMap<>(); - points = new HashMap<>(); + aggregatorHandles = new HashMap<>(); + latestRecordedMeasurements = new HashMap<>(); } } @@ -141,7 +145,7 @@ void record(Measurement measurement) { } private void recordPoint(Attributes attributes, Measurement measurement) { - if (points.size() >= maxCardinality) { + if (aggregatorHandles.size() >= maxCardinality) { throttlingLogger.log( Level.WARNING, "Instrument " @@ -151,7 +155,7 @@ private void recordPoint(Attributes attributes, Measurement measurement) { + ")."); attributes = MetricStorage.CARDINALITY_OVERFLOW; measurement = measurement.withAttributes(attributes); - } else if (points.containsKey( + } else if (aggregatorHandles.containsKey( attributes)) { // Check there is not already a recording for the attributes throttlingLogger.log( Level.WARNING, @@ -159,18 +163,33 @@ private void recordPoint(Attributes attributes, Measurement measurement) { + metricDescriptor.getSourceInstrument().getName() + " has recorded multiple values for the same attributes: " + attributes); - return; } - T dataPoint; - if (memoryMode == REUSABLE_DATA) { - dataPoint = reusablePointsPool.borrowObject(); - aggregator.toPoint(measurement, dataPoint); + Measurement recentMeasurement = latestRecordedMeasurements.get(attributes); + if (recentMeasurement != null + && (recentMeasurement.startEpochNanos() != measurement.startEpochNanos() + || recentMeasurement.epochNanos() != measurement.epochNanos())) { + String msg = + String.format( + "Instrument " + + metricDescriptor.getSourceInstrument().getName() + + " has recorded multiple values for the same attributes (%s) with different metadata: %s/%s", + attributes, + recentMeasurement, + measurement); + throttlingLogger.log(Level.WARNING, msg); + } + latestRecordedMeasurements.put(attributes, measurement); + + AggregatorHandle handle = + aggregatorHandles.computeIfAbsent(attributes, key -> aggregator.createHandle()); + if (measurement.hasDoubleValue()) { + handle.recordDouble(measurement.doubleValue(), attributes, Context.current()); + } else if (measurement.hasLongValue()) { + handle.recordLong(measurement.longValue(), attributes, Context.current()); } else { - dataPoint = aggregator.toPoint(measurement); + throw new IllegalStateException(); } - - points.put(attributes, dataPoint); } @Override @@ -189,47 +208,60 @@ public MetricData collect( InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, long epochNanos) { + boolean reset = aggregationTemporality == AggregationTemporality.DELTA; + if (memoryMode == REUSABLE_DATA) { // Collect can not run concurrently for same reader, hence we safely assume // the previous collect result has been used and done with - reusableResultList.forEach(reusablePointsPool::returnObject); - reusableResultList.clear(); + reusableDeltaPoints.forEach(reusablePointsPool::returnObject); + reusableDeltaPoints.clear(); } - Collection result; - if (aggregationTemporality == AggregationTemporality.DELTA) { - Map points = this.points; - Map lastPoints = this.lastPoints; + Map currentPoints = new HashMap<>(); + aggregatorHandles.forEach( + (attributes, handle) -> { + Measurement latestRecordedMeasurement = latestRecordedMeasurements.get(attributes); + if (latestRecordedMeasurement == null) { + throw new IllegalStateException("Unexpected"); + } + T value = + handle.aggregateThenMaybeReset( + latestRecordedMeasurement.startEpochNanos(), + latestRecordedMeasurement.epochNanos(), + attributes, + reset); + currentPoints.put(attributes, value); + }); - Collection deltaPoints; - if (memoryMode == REUSABLE_DATA) { - deltaPoints = reusableResultList; - } else { - deltaPoints = new ArrayList<>(); - } + if (memoryMode == REUSABLE_DATA) { + aggregatorHandles.clear(); + latestRecordedMeasurements.clear(); + } else { + aggregatorHandles = new HashMap<>(); + latestRecordedMeasurements = new HashMap<>(); + } - points.forEach( - (k, v) -> { - T lastPoint = lastPoints.get(k); + Collection result; + if (aggregationTemporality == AggregationTemporality.DELTA) { + Collection deltaPoints = + memoryMode == REUSABLE_DATA ? reusableDeltaPoints : new ArrayList<>(); + currentPoints.forEach( + (attributes, currentPoint) -> { + T lastPoint = lastPoints.get(attributes); T deltaPoint; if (lastPoint == null) { - if (memoryMode == REUSABLE_DATA) { - deltaPoint = reusablePointsPool.borrowObject(); - aggregator.copyPoint(v, deltaPoint); - } else { - deltaPoint = v; - } + deltaPoint = currentPoint; } else { if (memoryMode == REUSABLE_DATA) { - aggregator.diffInPlace(lastPoint, v); + aggregator.diffInPlace(lastPoint, currentPoint); deltaPoint = lastPoint; // Remaining last points are returned to reusablePointsPool, but // this reusable point is still used, so don't return it to pool yet - lastPoints.remove(k); + lastPoints.remove(attributes); } else { - deltaPoint = aggregator.diff(lastPoint, v); + deltaPoint = aggregator.diff(lastPoint, currentPoint); } } @@ -237,25 +269,15 @@ public MetricData collect( }); if (memoryMode == REUSABLE_DATA) { - lastPoints.forEach((k, v) -> reusablePointsPool.returnObject(v)); + lastPoints.forEach((attributes, value) -> reusablePointsPool.returnObject(value)); lastPoints.clear(); - this.points = lastPoints; - } else { - this.points = new HashMap<>(); } - this.lastPoints = points; result = deltaPoints; - } else /* CUMULATIVE */ { - if (memoryMode == REUSABLE_DATA) { - points.forEach((k, v) -> reusableResultList.add(v)); - points.clear(); - result = reusableResultList; - } else { - result = points.values(); - points = new HashMap<>(); - } + } else { + result = currentPoints.values(); } + this.lastPoints = currentPoints; return aggregator.toMetricData( resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index 23c45a9349f..44d8e22d7cf 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -227,7 +227,7 @@ void record_DuplicateAttributes(MemoryMode memoryMode) { sum -> sum.hasPointsSatisfying( point -> - point.hasValue(1).hasAttributes(attributeEntry("key1", "a"))))); + point.hasValue(3).hasAttributes(attributeEntry("key1", "a"))))); logs.assertContains( "Instrument long-counter has recorded multiple values for the same attributes: {key1=\"a\"}"); } From ed783f96af5eded4442d7f53291fc1230f30d5f2 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 9 Apr 2025 22:18:35 +0000 Subject: [PATCH 02/28] fix tests --- .../state/AsynchronousMetricStorage.java | 107 +++++++----------- .../state/SdkObservableMeasurement.java | 44 ++----- .../sdk/metrics/CardinalityTest.java | 6 +- .../state/AsynchronousMetricStorageTest.java | 69 +++++------ .../state/CallbackRegistrationTest.java | 51 +++++---- .../state/SdkObservableMeasurementTest.java | 61 ++-------- 6 files changed, 130 insertions(+), 208 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 9ff959c5bf0..24d67281af9 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -53,6 +53,7 @@ public final class AsynchronousMetricStorage aggregator; private final AttributesProcessor attributesProcessor; + private final MemoryMode memoryMode; /** * This field is set to 1 less than the actual intended cardinality limit, allowing the last slot @@ -63,14 +64,14 @@ public final class AsynchronousMetricStorage lastPoints; + private Map> aggregatorHandles = new HashMap<>(); + // Only populated if memoryMode == REUSABLE_DATA private final ObjectPool reusablePointsPool; private final List reusableDeltaPoints = new ArrayList<>(); - private Map> aggregatorHandles = new HashMap<>(); - private Map latestRecordedMeasurements = new HashMap<>(); - - private final MemoryMode memoryMode; + private long startEpochNanos; + private long epochNanos; private AsynchronousMetricStorage( RegisteredReader registeredReader, @@ -91,13 +92,11 @@ private AsynchronousMetricStorage( this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint); if (memoryMode == REUSABLE_DATA) { - lastPoints = new PooledHashMap<>(); - aggregatorHandles = new PooledHashMap<>(); - latestRecordedMeasurements = new PooledHashMap<>(); + this.lastPoints = new PooledHashMap<>(); + this.aggregatorHandles = new PooledHashMap<>(); } else { - lastPoints = new HashMap<>(); - aggregatorHandles = new HashMap<>(); - latestRecordedMeasurements = new HashMap<>(); + this.lastPoints = new HashMap<>(); + this.aggregatorHandles = new HashMap<>(); } } @@ -127,24 +126,34 @@ AsynchronousMetricStorage create( registeredView.getCardinalityLimit()); } - /** - * Record callback measurement from {@link ObservableLongMeasurement} or {@link - * ObservableDoubleMeasurement}. - */ - void record(Measurement measurement) { - Context context = Context.current(); - Attributes processedAttributes = attributesProcessor.process(measurement.attributes(), context); - long start = - aggregationTemporality == AggregationTemporality.DELTA - ? registeredReader.getLastCollectEpochNanos() - : measurement.startEpochNanos(); + /** Record callback measurement from {@link ObservableLongMeasurement}. */ + void record(Attributes attributes, long value) { + attributes = validateAndProcessAttributes(attributes); + AggregatorHandle handle = + aggregatorHandles.computeIfAbsent(attributes, key -> aggregator.createHandle()); + handle.recordLong(value, attributes, Context.current()); + } - measurement = measurement.withAttributes(processedAttributes).withStartEpochNanos(start); + /** Record callback measurement from {@link ObservableDoubleMeasurement}. */ + void record(Attributes attributes, double value) { + attributes = validateAndProcessAttributes(attributes); + AggregatorHandle handle = + aggregatorHandles.computeIfAbsent(attributes, key -> aggregator.createHandle()); + handle.recordDouble(value, attributes, Context.current()); + } - recordPoint(processedAttributes, measurement); + void setEpochInformation(long startEpochNanos, long epochNanos) { + this.startEpochNanos = + aggregationTemporality == AggregationTemporality.DELTA + ? registeredReader.getLastCollectEpochNanos() + : startEpochNanos; + this.epochNanos = epochNanos; } - private void recordPoint(Attributes attributes, Measurement measurement) { + private Attributes validateAndProcessAttributes(Attributes attributes) { + Context context = Context.current(); + attributes = attributesProcessor.process(attributes, context); + if (aggregatorHandles.size() >= maxCardinality) { throttlingLogger.log( Level.WARNING, @@ -153,9 +162,10 @@ private void recordPoint(Attributes attributes, Measurement measurement) { + " has exceeded the maximum allowed cardinality (" + maxCardinality + ")."); - attributes = MetricStorage.CARDINALITY_OVERFLOW; - measurement = measurement.withAttributes(attributes); - } else if (aggregatorHandles.containsKey( + return MetricStorage.CARDINALITY_OVERFLOW; + } + + if (aggregatorHandles.containsKey( attributes)) { // Check there is not already a recording for the attributes throttlingLogger.log( Level.WARNING, @@ -165,31 +175,7 @@ private void recordPoint(Attributes attributes, Measurement measurement) { + attributes); } - Measurement recentMeasurement = latestRecordedMeasurements.get(attributes); - if (recentMeasurement != null - && (recentMeasurement.startEpochNanos() != measurement.startEpochNanos() - || recentMeasurement.epochNanos() != measurement.epochNanos())) { - String msg = - String.format( - "Instrument " - + metricDescriptor.getSourceInstrument().getName() - + " has recorded multiple values for the same attributes (%s) with different metadata: %s/%s", - attributes, - recentMeasurement, - measurement); - throttlingLogger.log(Level.WARNING, msg); - } - latestRecordedMeasurements.put(attributes, measurement); - - AggregatorHandle handle = - aggregatorHandles.computeIfAbsent(attributes, key -> aggregator.createHandle()); - if (measurement.hasDoubleValue()) { - handle.recordDouble(measurement.doubleValue(), attributes, Context.current()); - } else if (measurement.hasLongValue()) { - handle.recordLong(measurement.longValue(), attributes, Context.current()); - } else { - throw new IllegalStateException(); - } + return attributes; } @Override @@ -220,14 +206,10 @@ public MetricData collect( Map currentPoints = new HashMap<>(); aggregatorHandles.forEach( (attributes, handle) -> { - Measurement latestRecordedMeasurement = latestRecordedMeasurements.get(attributes); - if (latestRecordedMeasurement == null) { - throw new IllegalStateException("Unexpected"); - } T value = handle.aggregateThenMaybeReset( - latestRecordedMeasurement.startEpochNanos(), - latestRecordedMeasurement.epochNanos(), + AsynchronousMetricStorage.this.startEpochNanos, + AsynchronousMetricStorage.this.epochNanos, attributes, reset); currentPoints.put(attributes, value); @@ -235,16 +217,13 @@ public MetricData collect( if (memoryMode == REUSABLE_DATA) { aggregatorHandles.clear(); - latestRecordedMeasurements.clear(); } else { aggregatorHandles = new HashMap<>(); - latestRecordedMeasurements = new HashMap<>(); } Collection result; if (aggregationTemporality == AggregationTemporality.DELTA) { - Collection deltaPoints = - memoryMode == REUSABLE_DATA ? reusableDeltaPoints : new ArrayList<>(); + result = memoryMode == REUSABLE_DATA ? reusableDeltaPoints : new ArrayList<>(); currentPoints.forEach( (attributes, currentPoint) -> { T lastPoint = lastPoints.get(attributes); @@ -265,15 +244,13 @@ public MetricData collect( } } - deltaPoints.add(deltaPoint); + result.add(deltaPoint); }); if (memoryMode == REUSABLE_DATA) { lastPoints.forEach((attributes, value) -> reusablePointsPool.returnObject(value)); lastPoints.clear(); } - - result = deltaPoints; } else { result = currentPoints.values(); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java index 27eeac81190..b047e95de2f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java @@ -5,19 +5,14 @@ package io.opentelemetry.sdk.metrics.internal.state; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createDouble; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createLong; - import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; -import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; import java.util.List; -import java.util.Objects; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -38,9 +33,6 @@ public final class SdkObservableMeasurement private final InstrumentDescriptor instrumentDescriptor; private final List> storages; - /** Only used when {@code activeReader}'s memoryMode is {@link MemoryMode#REUSABLE_DATA}. */ - private final MutableMeasurement mutableMeasurement = new MutableMeasurement(); - // These fields are set before invoking callbacks. They allow measurements to be recorded to the // storages for correct reader, and with the correct time. @Nullable private volatile RegisteredReader activeReader; @@ -114,18 +106,12 @@ public void record(long value, Attributes attributes) { return; } - Measurement measurement; - - MemoryMode memoryMode = activeReader.getReader().getMemoryMode(); - if (Objects.requireNonNull(memoryMode) == MemoryMode.IMMUTABLE_DATA) { - measurement = createLong(startEpochNanos, epochNanos, value, attributes); - } else { - MutableMeasurement.setLongMeasurement( - mutableMeasurement, startEpochNanos, epochNanos, value, attributes); - measurement = mutableMeasurement; + for (AsynchronousMetricStorage storage : storages) { + if (storage.getRegisteredReader().equals(activeReader)) { + storage.setEpochInformation(startEpochNanos, epochNanos); + storage.record(attributes, value); + } } - - doRecord(measurement); } @Override @@ -135,10 +121,12 @@ public void record(double value) { @Override public void record(double value, Attributes attributes) { + RegisteredReader activeReader = this.activeReader; if (activeReader == null) { logNoActiveReader(); return; } + if (Double.isNaN(value)) { logger.log( Level.FINE, @@ -150,24 +138,10 @@ public void record(double value, Attributes attributes) { return; } - Measurement measurement; - MemoryMode memoryMode = activeReader.getReader().getMemoryMode(); - if (Objects.requireNonNull(memoryMode) == MemoryMode.IMMUTABLE_DATA) { - measurement = createDouble(startEpochNanos, epochNanos, value, attributes); - } else { - MutableMeasurement.setDoubleMeasurement( - mutableMeasurement, startEpochNanos, epochNanos, value, attributes); - measurement = mutableMeasurement; - } - - doRecord(measurement); - } - - private void doRecord(Measurement measurement) { - RegisteredReader activeReader = this.activeReader; for (AsynchronousMetricStorage storage : storages) { if (storage.getRegisteredReader().equals(activeReader)) { - storage.record(measurement); + storage.setEpochInformation(startEpochNanos, epochNanos); + storage.record(attributes, value); } } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java index 17989d14a1a..fd5e90aa3f6 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java @@ -421,7 +421,7 @@ void readerAndViewCardinalityConfiguration() { 0, asyncCounterLimit, LongPointData::getValue, - 1)))); + DEFAULT_MAX_CARDINALITY - (asyncCounterLimit - 1))))); assertThat(cumulativeReader.collectAllMetrics()) .as("cumulative collection") .satisfiesExactlyInAnyOrder( @@ -485,7 +485,7 @@ void readerAndViewCardinalityConfiguration() { 0, asyncCounterLimit, LongPointData::getValue, - 1)))); + DEFAULT_MAX_CARDINALITY - (asyncCounterLimit - 1))))); // Record another round of measurements, again exceeding cardinality limits for (int i = DEFAULT_MAX_CARDINALITY; i < DEFAULT_MAX_CARDINALITY * 2; i++) { @@ -625,7 +625,7 @@ void readerAndViewCardinalityConfiguration() { 0, asyncCounterLimit, LongPointData::getValue, - 1)))); + DEFAULT_MAX_CARDINALITY - (asyncCounterLimit - 1))))); } /** diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index 44d8e22d7cf..bfc4ff97f1b 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -6,8 +6,6 @@ package io.opentelemetry.sdk.metrics.internal.state; import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createDouble; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createLong; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry; import static org.mockito.ArgumentMatchers.any; @@ -107,9 +105,10 @@ void setup(MemoryMode memoryMode) { void recordLong(MemoryMode memoryMode) { setup(memoryMode); - longCounterStorage.record(createLong(0, 1, 1, Attributes.builder().put("key", "a").build())); - longCounterStorage.record(createLong(0, 1, 2, Attributes.builder().put("key", "b").build())); - longCounterStorage.record(createLong(0, 1, 3, Attributes.builder().put("key", "c").build())); + longCounterStorage.setEpochInformation(0, 1); + longCounterStorage.record(Attributes.builder().put("key", "a").build(), 1); + longCounterStorage.record(Attributes.builder().put("key", "b").build(), 2); + longCounterStorage.record(Attributes.builder().put("key", "c").build(), 3); assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -132,12 +131,10 @@ void recordLong(MemoryMode memoryMode) { void recordDouble(MemoryMode memoryMode) { setup(memoryMode); - doubleCounterStorage.record( - createDouble(0, 1, 1.1, Attributes.builder().put("key", "a").build())); - doubleCounterStorage.record( - createDouble(0, 1, 2.2, Attributes.builder().put("key", "b").build())); - doubleCounterStorage.record( - createDouble(0, 1, 3.3, Attributes.builder().put("key", "c").build())); + longCounterStorage.setEpochInformation(0, 1); + doubleCounterStorage.record(Attributes.builder().put("key", "a").build(), 1.1); + doubleCounterStorage.record(Attributes.builder().put("key", "b").build(), 2.2); + doubleCounterStorage.record(Attributes.builder().put("key", "c").build(), 3.3); assertThat(doubleCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -179,8 +176,8 @@ void record_ProcessesAttributes(MemoryMode memoryMode) { InstrumentValueType.LONG, Advice.empty())); - storage.record( - createLong(0, 1, 1, Attributes.builder().put("key1", "a").put("key2", "b").build())); + storage.setEpochInformation(0, 1); + storage.record(Attributes.builder().put("key1", "a").put("key2", "b").build(), 1); assertThat(storage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -199,9 +196,9 @@ void record_ProcessesAttributes(MemoryMode memoryMode) { void record_MaxCardinality(MemoryMode memoryMode) { setup(memoryMode); + longCounterStorage.setEpochInformation(0, 1); for (int i = 0; i <= CARDINALITY_LIMIT + 1; i++) { - longCounterStorage.record( - createLong(0, 1, 1, Attributes.builder().put("key" + i, "val").build())); + longCounterStorage.record(Attributes.builder().put("key" + i, "val").build(), 1); } assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) @@ -216,8 +213,9 @@ void record_MaxCardinality(MemoryMode memoryMode) { void record_DuplicateAttributes(MemoryMode memoryMode) { setup(memoryMode); - longCounterStorage.record(createLong(0, 1, 1, Attributes.builder().put("key1", "a").build())); - longCounterStorage.record(createLong(0, 1, 2, Attributes.builder().put("key1", "a").build())); + longCounterStorage.setEpochInformation(0, 1); + longCounterStorage.record(Attributes.builder().put("key1", "a").build(), 1); + longCounterStorage.record(Attributes.builder().put("key1", "a").build(), 2); assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -238,7 +236,8 @@ void collect_CumulativeReportsCumulativeObservations(MemoryMode memoryMode) { setup(memoryMode); // Record measurement and collect at time 10 - longCounterStorage.record(createLong(0, 10, 3, Attributes.empty())); + longCounterStorage.setEpochInformation(0, 10); + longCounterStorage.record(Attributes.empty(), 3); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -253,9 +252,9 @@ void collect_CumulativeReportsCumulativeObservations(MemoryMode memoryMode) { registeredReader.setLastCollectEpochNanos(10); // Record measurements and collect at time 30 - longCounterStorage.record(createLong(0, 30, 3, Attributes.empty())); - longCounterStorage.record( - createLong(0, 30, 6, Attributes.builder().put("key", "value1").build())); + longCounterStorage.setEpochInformation(0, 30); + longCounterStorage.record(Attributes.empty(), 3); + longCounterStorage.record(Attributes.builder().put("key", "value1").build(), 6); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -276,9 +275,9 @@ void collect_CumulativeReportsCumulativeObservations(MemoryMode memoryMode) { registeredReader.setLastCollectEpochNanos(30); // Record measurement and collect at time 35 - longCounterStorage.record(createLong(0, 35, 4, Attributes.empty())); - longCounterStorage.record( - createLong(0, 35, 5, Attributes.builder().put("key", "value2").build())); + longCounterStorage.setEpochInformation(0, 35); + longCounterStorage.record(Attributes.empty(), 4); + longCounterStorage.record(Attributes.builder().put("key", "value2").build(), 5); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -317,7 +316,8 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { Advice.empty())); // Record measurement and collect at time 10 - longCounterStorage.record(createLong(0, 10, 3, Attributes.empty())); + longCounterStorage.setEpochInformation(0, 10); + longCounterStorage.record(Attributes.empty(), 3); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -332,9 +332,9 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { registeredReader.setLastCollectEpochNanos(10); // Record measurement and collect at time 30 - longCounterStorage.record(createLong(0, 30, 3, Attributes.empty())); - longCounterStorage.record( - createLong(0, 30, 6, Attributes.builder().put("key", "value1").build())); + longCounterStorage.setEpochInformation(0, 30); + longCounterStorage.record(Attributes.empty(), 3); + longCounterStorage.record(Attributes.builder().put("key", "value1").build(), 6); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -355,9 +355,9 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { registeredReader.setLastCollectEpochNanos(30); // Record measurement and collect at time 35 - longCounterStorage.record(createLong(0, 35, 4, Attributes.empty())); - longCounterStorage.record( - createLong(0, 35, 5, Attributes.builder().put("key", "value2").build())); + longCounterStorage.setEpochInformation(0, 35); + longCounterStorage.record(Attributes.empty(), 4); + longCounterStorage.record(Attributes.builder().put("key", "value2").build(), 5); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -381,9 +381,10 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { void collect_reusableData_reusedObjectsAreReturnedOnSecondCall() { setup(REUSABLE_DATA); - longCounterStorage.record(createLong(0, 1, 1, Attributes.builder().put("key", "a").build())); - longCounterStorage.record(createLong(0, 1, 2, Attributes.builder().put("key", "b").build())); - longCounterStorage.record(createLong(0, 1, 3, Attributes.builder().put("key", "c").build())); + longCounterStorage.setEpochInformation(0, 1); + longCounterStorage.record(Attributes.builder().put("key", "a").build(), 1); + longCounterStorage.record(Attributes.builder().put("key", "b").build(), 2); + longCounterStorage.record(Attributes.builder().put("key", "c").build(), 3); MetricData firstCollectMetricData = longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java index a8b424dfcd5..b6bc13a858c 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java @@ -5,11 +5,11 @@ package io.opentelemetry.sdk.metrics.internal.state; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createDouble; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createLong; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -146,10 +146,12 @@ void invokeCallback_Double() { callbackRegistration.invokeCallback(registeredReader, 0, 1); assertThat(counter.get()).isEqualTo(1.1); - verify(storage1) - .record(createDouble(0, 1, 1.1, Attributes.builder().put("key", "val").build())); - verify(storage2, never()).record(any()); - verify(storage3, never()).record(any()); + verify(storage1).setEpochInformation(0, 1); + verify(storage1).record(Attributes.builder().put("key", "val").build(), 1.1); + verify(storage2, never()).setEpochInformation(anyLong(), anyLong()); + verify(storage2, never()).record(any(), anyDouble()); + verify(storage3, never()).setEpochInformation(anyLong(), anyLong()); + verify(storage3, never()).record(any(), anyDouble()); } @Test @@ -165,9 +167,12 @@ void invokeCallback_Long() { callbackRegistration.invokeCallback(registeredReader, 0, 1); assertThat(counter.get()).isEqualTo(1); - verify(storage1, never()).record(any()); - verify(storage2).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); - verify(storage3).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); + verify(storage1, never()).setEpochInformation(anyLong(), anyLong()); + verify(storage1, never()).record(any(), anyLong()); + verify(storage2).setEpochInformation(0, 1); + verify(storage2).record(Attributes.builder().put("key", "val").build(), 1); + verify(storage3).setEpochInformation(0, 1); + verify(storage3).record(Attributes.builder().put("key", "val").build(), 1); } @Test @@ -188,10 +193,12 @@ void invokeCallback_MultipleMeasurements() { assertThat(doubleCounter.get()).isEqualTo(1.1); assertThat(longCounter.get()).isEqualTo(1); - verify(storage1) - .record(createDouble(0, 1, 1.1, Attributes.builder().put("key", "val").build())); - verify(storage2).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); - verify(storage3).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); + verify(storage1).setEpochInformation(0, 1); + verify(storage1).record(Attributes.builder().put("key", "val").build(), 1.1); + verify(storage2).setEpochInformation(0, 1); + verify(storage2).record(Attributes.builder().put("key", "val").build(), 1); + verify(storage3).setEpochInformation(0, 1); + verify(storage3).record(Attributes.builder().put("key", "val").build(), 1); } @Test @@ -223,9 +230,12 @@ void invokeCallback_MultipleMeasurements_ThrowsException() { callbackRegistration.invokeCallback(registeredReader, 0, 1); - verify(storage1, never()).record(any()); - verify(storage2, never()).record(any()); - verify(storage3, never()).record(any()); + verify(storage1, never()).record(any(), anyLong()); + verify(storage1, never()).record(any(), anyDouble()); + verify(storage2, never()).record(any(), anyLong()); + verify(storage2, never()).record(any(), anyDouble()); + verify(storage3, never()).record(any(), anyLong()); + verify(storage3, never()).record(any(), anyDouble()); logs.assertContains("An exception occurred invoking callback"); } @@ -240,9 +250,12 @@ void invokeCallback_SingleMeasurement_ThrowsException() { callbackRegistration.invokeCallback(registeredReader, 0, 1); - verify(storage1, never()).record(any()); - verify(storage2, never()).record(any()); - verify(storage3, never()).record(any()); + verify(storage1, never()).record(any(), anyLong()); + verify(storage1, never()).record(any(), anyDouble()); + verify(storage2, never()).record(any(), anyLong()); + verify(storage2, never()).record(any(), anyDouble()); + verify(storage3, never()).record(any(), anyLong()); + verify(storage3, never()).record(any(), anyDouble()); logs.assertContains("An exception occurred invoking callback"); } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java index aaef2c841fe..eadc83cda51 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java @@ -6,15 +6,15 @@ package io.opentelemetry.sdk.metrics.internal.state; import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.github.netmikey.logunit.api.LogCapturer; +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.common.export.MemoryMode; @@ -28,7 +28,6 @@ import java.util.Arrays; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.mockito.ArgumentCaptor; import org.slf4j.event.Level; @SuppressWarnings("rawtypes") @@ -41,7 +40,6 @@ class SdkObservableMeasurementTest { private AsynchronousMetricStorage mockAsyncStorage1; private RegisteredReader registeredReader1; private SdkObservableMeasurement sdkObservableMeasurement; - private ArgumentCaptor measurementArgumentCaptor; @SuppressWarnings("unchecked") private void setup(MemoryMode memoryMode) { @@ -66,7 +64,6 @@ private void setup(MemoryMode memoryMode) { InMemoryMetricReader reader2 = InMemoryMetricReader.builder().setMemoryMode(memoryMode).build(); RegisteredReader registeredReader2 = RegisteredReader.create(reader2, ViewRegistry.create()); - measurementArgumentCaptor = ArgumentCaptor.forClass(Measurement.class); mockAsyncStorage1 = mock(AsynchronousMetricStorage.class); when(mockAsyncStorage1.getRegisteredReader()).thenReturn(registeredReader1); AsynchronousMetricStorage mockAsyncStorage2 = mock(AsynchronousMetricStorage.class); @@ -88,12 +85,7 @@ void recordLong_ImmutableData() { try { sdkObservableMeasurement.record(5); - verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); - Measurement passedMeasurement = measurementArgumentCaptor.getValue(); - assertThat(passedMeasurement).isInstanceOf(ImmutableMeasurement.class); - assertThat(passedMeasurement.longValue()).isEqualTo(5); - assertThat(passedMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(passedMeasurement.epochNanos()).isEqualTo(10); + verify(mockAsyncStorage1).record(Attributes.empty(), 5); } finally { sdkObservableMeasurement.unsetActiveReader(); } @@ -108,12 +100,7 @@ void recordDouble_ImmutableData() { try { sdkObservableMeasurement.record(4.3); - verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); - Measurement passedMeasurement = measurementArgumentCaptor.getValue(); - assertThat(passedMeasurement).isInstanceOf(ImmutableMeasurement.class); - assertThat(passedMeasurement.doubleValue()).isEqualTo(4.3); - assertThat(passedMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(passedMeasurement.epochNanos()).isEqualTo(10); + verify(mockAsyncStorage1).record(Attributes.empty(), 4.3); } finally { sdkObservableMeasurement.unsetActiveReader(); } @@ -127,25 +114,10 @@ void recordDouble_ReusableData() { try { sdkObservableMeasurement.record(4.3); - - verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); - Measurement firstMeasurement = measurementArgumentCaptor.getValue(); - assertThat(firstMeasurement).isInstanceOf(MutableMeasurement.class); - assertThat(firstMeasurement.doubleValue()).isEqualTo(4.3); - assertThat(firstMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(firstMeasurement.epochNanos()).isEqualTo(10); + verify(mockAsyncStorage1).record(Attributes.empty(), 4.3); sdkObservableMeasurement.record(5.3); - - verify(mockAsyncStorage1, times(2)).record(measurementArgumentCaptor.capture()); - Measurement secondMeasurement = measurementArgumentCaptor.getValue(); - assertThat(secondMeasurement).isInstanceOf(MutableMeasurement.class); - assertThat(secondMeasurement.doubleValue()).isEqualTo(5.3); - assertThat(secondMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(secondMeasurement.epochNanos()).isEqualTo(10); - - // LeasedMeasurement should be re-used - assertThat(secondMeasurement).isSameAs(firstMeasurement); + verify(mockAsyncStorage1).record(Attributes.empty(), 5.3); } finally { sdkObservableMeasurement.unsetActiveReader(); } @@ -159,25 +131,10 @@ void recordLong_ReusableData() { try { sdkObservableMeasurement.record(2); - - verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); - Measurement firstMeasurement = measurementArgumentCaptor.getValue(); - assertThat(firstMeasurement).isInstanceOf(MutableMeasurement.class); - assertThat(firstMeasurement.longValue()).isEqualTo(2); - assertThat(firstMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(firstMeasurement.epochNanos()).isEqualTo(10); + verify(mockAsyncStorage1).record(Attributes.empty(), 2); sdkObservableMeasurement.record(6); - - verify(mockAsyncStorage1, times(2)).record(measurementArgumentCaptor.capture()); - Measurement secondMeasurement = measurementArgumentCaptor.getValue(); - assertThat(secondMeasurement).isInstanceOf(MutableMeasurement.class); - assertThat(secondMeasurement.longValue()).isEqualTo(6); - assertThat(secondMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(secondMeasurement.epochNanos()).isEqualTo(10); - - // LeasedMeasurement should be re-used - assertThat(secondMeasurement).isSameAs(firstMeasurement); + verify(mockAsyncStorage1).record(Attributes.empty(), 6); } finally { sdkObservableMeasurement.unsetActiveReader(); } @@ -190,7 +147,7 @@ void recordDouble_NaN() { sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); sdkObservableMeasurement.record(Double.NaN); - verify(mockAsyncStorage1, never()).record(any()); + verify(mockAsyncStorage1, never()).record(any(), anyDouble()); logs.assertContains( "Instrument testCounter has recorded measurement Not-a-Number (NaN) value with attributes {}. Dropping measurement."); } From 0c637759d41f57e375f525fdc85fcea20283373b Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 9 Apr 2025 22:23:24 +0000 Subject: [PATCH 03/28] keep ref value locally --- .../sdk/metrics/internal/state/SdkObservableMeasurement.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java index b047e95de2f..e795a1b477b 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java @@ -101,6 +101,7 @@ public void record(long value) { @Override public void record(long value, Attributes attributes) { + RegisteredReader activeReader = this.activeReader; if (activeReader == null) { logNoActiveReader(); return; From ab8bd4dc24f4769c8cbeea8105239b0d3a05ae71 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 9 Apr 2025 22:47:59 +0000 Subject: [PATCH 04/28] comments --- .../metrics/internal/state/AsynchronousMetricStorage.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 24d67281af9..6efcb11ba7f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -61,15 +61,18 @@ public final class AsynchronousMetricStorage> aggregatorHandles; + // Only populated if aggregationTemporality == DELTA private Map lastPoints; - private Map> aggregatorHandles = new HashMap<>(); - // Only populated if memoryMode == REUSABLE_DATA private final ObjectPool reusablePointsPool; private final List reusableDeltaPoints = new ArrayList<>(); + // Time information relative to recording of data in aggregatorHandles, set while calling + // callbacks private long startEpochNanos; private long epochNanos; From e942dc2d898cf55347af38349f973724712576cb Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 16 Apr 2025 00:17:46 +0200 Subject: [PATCH 05/28] review comment Co-authored-by: jack-berg <34418638+jack-berg@users.noreply.github.com> --- .../sdk/metrics/internal/state/AsynchronousMetricStorage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 6efcb11ba7f..a9aeb2146d5 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -254,7 +254,7 @@ public MetricData collect( lastPoints.forEach((attributes, value) -> reusablePointsPool.returnObject(value)); lastPoints.clear(); } - } else { + } else { /* CUMULATIVE */ result = currentPoints.values(); } this.lastPoints = currentPoints; From fa58b021e5c8106f7158951c8d17694b8a592bd5 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 16 Apr 2025 21:03:37 +0000 Subject: [PATCH 06/28] remove log statement --- .../internal/state/AsynchronousMetricStorage.java | 10 ---------- .../internal/state/AsynchronousMetricStorageTest.java | 2 -- 2 files changed, 12 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index a9aeb2146d5..48caec23fbe 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -168,16 +168,6 @@ private Attributes validateAndProcessAttributes(Attributes attributes) { return MetricStorage.CARDINALITY_OVERFLOW; } - if (aggregatorHandles.containsKey( - attributes)) { // Check there is not already a recording for the attributes - throttlingLogger.log( - Level.WARNING, - "Instrument " - + metricDescriptor.getSourceInstrument().getName() - + " has recorded multiple values for the same attributes: " - + attributes); - } - return attributes; } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index bfc4ff97f1b..48793defa37 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -226,8 +226,6 @@ void record_DuplicateAttributes(MemoryMode memoryMode) { sum.hasPointsSatisfying( point -> point.hasValue(3).hasAttributes(attributeEntry("key1", "a"))))); - logs.assertContains( - "Instrument long-counter has recorded multiple values for the same attributes: {key1=\"a\"}"); } @ParameterizedTest From 561e479de02de01cbcfa12bd68442d35d0c03a11 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 16 Apr 2025 22:41:17 +0000 Subject: [PATCH 07/28] reusable currentPoints --- .../state/AsynchronousMetricStorage.java | 112 +++++++++++------- 1 file changed, 68 insertions(+), 44 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 48caec23fbe..ecc62379a60 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -69,7 +69,10 @@ public final class AsynchronousMetricStorage reusablePointsPool; - private final List reusableDeltaPoints = new ArrayList<>(); + private final List reusablePointsList = new ArrayList<>(); + // If aggregationTemporality == DELTA, this reference and lastPoints will be swapped at every + // collection + private Map reusablePointsMap = new PooledHashMap<>(); // Time information relative to recording of data in aggregatorHandles, set while calling // callbacks @@ -187,16 +190,34 @@ public MetricData collect( InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, long epochNanos) { - boolean reset = aggregationTemporality == AggregationTemporality.DELTA; - if (memoryMode == REUSABLE_DATA) { // Collect can not run concurrently for same reader, hence we safely assume // the previous collect result has been used and done with - reusableDeltaPoints.forEach(reusablePointsPool::returnObject); - reusableDeltaPoints.clear(); + reusablePointsList.forEach(reusablePointsPool::returnObject); + reusablePointsList.clear(); + + reusablePointsMap.forEach((key, value) -> reusablePointsPool.returnObject(value)); + reusablePointsMap.clear(); + } + + Collection result = + aggregationTemporality == AggregationTemporality.DELTA + ? collectWithDeltaAggregationTemporality() + : collectWithCumulativeAggregationTemporality(); + + if (memoryMode == REUSABLE_DATA) { + aggregatorHandles.clear(); + } else { + aggregatorHandles = new HashMap<>(); } - Map currentPoints = new HashMap<>(); + return aggregator.toMetricData( + resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality); + } + + private Collection collectWithDeltaAggregationTemporality() { + Map currentPoints = + memoryMode == REUSABLE_DATA ? reusablePointsMap : new HashMap<>(); aggregatorHandles.forEach( (attributes, handle) -> { T value = @@ -204,53 +225,56 @@ public MetricData collect( AsynchronousMetricStorage.this.startEpochNanos, AsynchronousMetricStorage.this.epochNanos, attributes, - reset); + /* reset= */ true); currentPoints.put(attributes, value); }); - if (memoryMode == REUSABLE_DATA) { - aggregatorHandles.clear(); - } else { - aggregatorHandles = new HashMap<>(); - } - - Collection result; - if (aggregationTemporality == AggregationTemporality.DELTA) { - result = memoryMode == REUSABLE_DATA ? reusableDeltaPoints : new ArrayList<>(); - currentPoints.forEach( - (attributes, currentPoint) -> { - T lastPoint = lastPoints.get(attributes); - - T deltaPoint; - if (lastPoint == null) { - deltaPoint = currentPoint; + List deltaPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList<>(); + currentPoints.forEach( + (attributes, currentPoint) -> { + T lastPoint = lastPoints.get(attributes); + + T deltaPoint; + if (lastPoint == null) { + deltaPoint = currentPoint; + } else { + if (memoryMode == REUSABLE_DATA) { + aggregator.diffInPlace(lastPoint, currentPoint); + deltaPoint = lastPoint; + + // Remaining last points are returned to reusablePointsPool, but + // this reusable point is still used, so don't return it to pool yet + lastPoints.remove(attributes); } else { - if (memoryMode == REUSABLE_DATA) { - aggregator.diffInPlace(lastPoint, currentPoint); - deltaPoint = lastPoint; - - // Remaining last points are returned to reusablePointsPool, but - // this reusable point is still used, so don't return it to pool yet - lastPoints.remove(attributes); - } else { - deltaPoint = aggregator.diff(lastPoint, currentPoint); - } + deltaPoint = aggregator.diff(lastPoint, currentPoint); } + } - result.add(deltaPoint); - }); + deltaPoints.add(deltaPoint); + }); - if (memoryMode == REUSABLE_DATA) { - lastPoints.forEach((attributes, value) -> reusablePointsPool.returnObject(value)); - lastPoints.clear(); - } - } else { /* CUMULATIVE */ - result = currentPoints.values(); + if (memoryMode == REUSABLE_DATA) { + Map tmp = lastPoints; + lastPoints = reusablePointsMap; + reusablePointsMap = tmp; } - this.lastPoints = currentPoints; - return aggregator.toMetricData( - resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality); + return deltaPoints; + } + + private Collection collectWithCumulativeAggregationTemporality() { + List currentPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList<>(); + aggregatorHandles.forEach( + (attributes, handle) -> { + T value = + handle.aggregateThenMaybeReset( + AsynchronousMetricStorage.this.startEpochNanos, + AsynchronousMetricStorage.this.epochNanos, + attributes, + /* reset= */ false); + currentPoints.add(value); + }); + return currentPoints; } @Override From 8bde0f4966de54d0f924bdc04e6b0b3cbb35c952 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 16 Apr 2025 23:26:37 +0000 Subject: [PATCH 08/28] comments and fix test --- .../metrics/internal/state/AsynchronousMetricStorage.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index ecc62379a60..9d31d0d0537 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -225,7 +225,8 @@ private Collection collectWithDeltaAggregationTemporality() { AsynchronousMetricStorage.this.startEpochNanos, AsynchronousMetricStorage.this.epochNanos, attributes, - /* reset= */ true); + // No need to reset, aggregatorHandles is going to be cleared anyways + /* reset= */ false); currentPoints.put(attributes, value); }); @@ -257,6 +258,8 @@ private Collection collectWithDeltaAggregationTemporality() { Map tmp = lastPoints; lastPoints = reusablePointsMap; reusablePointsMap = tmp; + } else { + lastPoints = currentPoints; } return deltaPoints; @@ -271,6 +274,7 @@ private Collection collectWithCumulativeAggregationTemporality() { AsynchronousMetricStorage.this.startEpochNanos, AsynchronousMetricStorage.this.epochNanos, attributes, + // No need to reset, aggregatorHandles is going to be cleared anyways /* reset= */ false); currentPoints.add(value); }); From 6e96eda52d3650a12e9b8baafd99deb919783695 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 17 Apr 2025 20:56:02 +0000 Subject: [PATCH 09/28] set epoch info once --- .../internal/state/SdkObservableMeasurement.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java index e795a1b477b..6e195fa545f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java @@ -36,8 +36,6 @@ public final class SdkObservableMeasurement // These fields are set before invoking callbacks. They allow measurements to be recorded to the // storages for correct reader, and with the correct time. @Nullable private volatile RegisteredReader activeReader; - private volatile long startEpochNanos; - private volatile long epochNanos; private SdkObservableMeasurement( InstrumentationScopeInfo instrumentationScopeInfo, @@ -75,8 +73,11 @@ public InstrumentationScopeInfo getInstrumentationScopeInfo() { public void setActiveReader( RegisteredReader registeredReader, long startEpochNanos, long epochNanos) { this.activeReader = registeredReader; - this.startEpochNanos = startEpochNanos; - this.epochNanos = epochNanos; + for (AsynchronousMetricStorage storage : storages) { + if (storage.getRegisteredReader().equals(activeReader)) { + storage.setEpochInformation(startEpochNanos, epochNanos); + } + } } /** @@ -109,7 +110,6 @@ public void record(long value, Attributes attributes) { for (AsynchronousMetricStorage storage : storages) { if (storage.getRegisteredReader().equals(activeReader)) { - storage.setEpochInformation(startEpochNanos, epochNanos); storage.record(attributes, value); } } @@ -141,7 +141,6 @@ public void record(double value, Attributes attributes) { for (AsynchronousMetricStorage storage : storages) { if (storage.getRegisteredReader().equals(activeReader)) { - storage.setEpochInformation(startEpochNanos, epochNanos); storage.record(attributes, value); } } From 4b3f8741f739b8ce6ad8db58d3b7add176acaaa0 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 17 Apr 2025 21:12:32 +0000 Subject: [PATCH 10/28] add test for setActiveReader --- .../state/SdkObservableMeasurementTest.java | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java index eadc83cda51..b21797d9756 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java @@ -11,6 +11,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.github.netmikey.logunit.api.LogCapturer; @@ -38,10 +39,10 @@ class SdkObservableMeasurementTest { LogCapturer.create().captureForLogger(SdkObservableMeasurement.class.getName(), Level.DEBUG); private AsynchronousMetricStorage mockAsyncStorage1; + private AsynchronousMetricStorage mockAsyncStorage2; private RegisteredReader registeredReader1; private SdkObservableMeasurement sdkObservableMeasurement; - @SuppressWarnings("unchecked") private void setup(MemoryMode memoryMode) { InstrumentationScopeInfo instrumentationScopeInfo = InstrumentationScopeInfo.builder("test-scope").build(); @@ -66,7 +67,7 @@ private void setup(MemoryMode memoryMode) { mockAsyncStorage1 = mock(AsynchronousMetricStorage.class); when(mockAsyncStorage1.getRegisteredReader()).thenReturn(registeredReader1); - AsynchronousMetricStorage mockAsyncStorage2 = mock(AsynchronousMetricStorage.class); + mockAsyncStorage2 = mock(AsynchronousMetricStorage.class); when(mockAsyncStorage2.getRegisteredReader()).thenReturn(registeredReader2); sdkObservableMeasurement = @@ -76,15 +77,28 @@ private void setup(MemoryMode memoryMode) { Arrays.asList(mockAsyncStorage1, mockAsyncStorage2)); } + void setupAndSetActiveReader(MemoryMode memoryMode) { + setup(memoryMode); + sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + } + @Test - void recordLong_ImmutableData() { + void setActiveReader_SetsEpochInformation() { setup(MemoryMode.IMMUTABLE_DATA); sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + verify(mockAsyncStorage1).setEpochInformation(0, 10); + verify(mockAsyncStorage2).getRegisteredReader(); + verifyNoMoreInteractions(mockAsyncStorage2); + } + + @Test + void recordLong_ImmutableData() { + setupAndSetActiveReader(MemoryMode.IMMUTABLE_DATA); + try { sdkObservableMeasurement.record(5); - verify(mockAsyncStorage1).record(Attributes.empty(), 5); } finally { sdkObservableMeasurement.unsetActiveReader(); @@ -93,13 +107,10 @@ void recordLong_ImmutableData() { @Test void recordDouble_ImmutableData() { - setup(MemoryMode.IMMUTABLE_DATA); - - sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + setupAndSetActiveReader(MemoryMode.IMMUTABLE_DATA); try { sdkObservableMeasurement.record(4.3); - verify(mockAsyncStorage1).record(Attributes.empty(), 4.3); } finally { sdkObservableMeasurement.unsetActiveReader(); @@ -108,9 +119,7 @@ void recordDouble_ImmutableData() { @Test void recordDouble_ReusableData() { - setup(MemoryMode.REUSABLE_DATA); - - sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + setupAndSetActiveReader(MemoryMode.REUSABLE_DATA); try { sdkObservableMeasurement.record(4.3); @@ -125,9 +134,7 @@ void recordDouble_ReusableData() { @Test void recordLong_ReusableData() { - setup(MemoryMode.REUSABLE_DATA); - - sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + setupAndSetActiveReader(MemoryMode.REUSABLE_DATA); try { sdkObservableMeasurement.record(2); @@ -143,10 +150,9 @@ void recordLong_ReusableData() { @Test @SuppressLogger(SdkObservableMeasurement.class) void recordDouble_NaN() { - setup(MemoryMode.REUSABLE_DATA); - sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); - sdkObservableMeasurement.record(Double.NaN); + setupAndSetActiveReader(MemoryMode.REUSABLE_DATA); + sdkObservableMeasurement.record(Double.NaN); verify(mockAsyncStorage1, never()).record(any(), anyDouble()); logs.assertContains( "Instrument testCounter has recorded measurement Not-a-Number (NaN) value with attributes {}. Dropping measurement."); From 4254d30ac8b77cc305c7ac6f70ec1a8b36b9b603 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 23 Apr 2025 00:48:04 +0200 Subject: [PATCH 11/28] reduce allocation in REUSABLE_DATA mode --- .../state/AsynchronousMetricStorage.java | 89 +++++++++++++------ 1 file changed, 60 insertions(+), 29 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 9d31d0d0537..d348a594d19 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -62,7 +62,7 @@ public final class AsynchronousMetricStorage> aggregatorHandles; + private final Map> aggregatorHandles; // Only populated if aggregationTemporality == DELTA private Map lastPoints; @@ -190,25 +190,13 @@ public MetricData collect( InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, long epochNanos) { - if (memoryMode == REUSABLE_DATA) { - // Collect can not run concurrently for same reader, hence we safely assume - // the previous collect result has been used and done with - reusablePointsList.forEach(reusablePointsPool::returnObject); - reusablePointsList.clear(); - - reusablePointsMap.forEach((key, value) -> reusablePointsPool.returnObject(value)); - reusablePointsMap.clear(); - } - Collection result = aggregationTemporality == AggregationTemporality.DELTA ? collectWithDeltaAggregationTemporality() : collectWithCumulativeAggregationTemporality(); - if (memoryMode == REUSABLE_DATA) { + if (memoryMode != REUSABLE_DATA) { aggregatorHandles.clear(); - } else { - aggregatorHandles = new HashMap<>(); } return aggregator.toMetricData( @@ -216,18 +204,40 @@ public MetricData collect( } private Collection collectWithDeltaAggregationTemporality() { - Map currentPoints = - memoryMode == REUSABLE_DATA ? reusablePointsMap : new HashMap<>(); + Map currentPoints; + if (memoryMode == REUSABLE_DATA) { + // deltaPoints computed in the previous collection can be released + reusablePointsList.forEach(reusablePointsPool::returnObject); + reusablePointsList.clear(); + + currentPoints = reusablePointsMap; + } else { + currentPoints = new HashMap<>(); + } + aggregatorHandles.forEach( (attributes, handle) -> { - T value = + if (!handle.hasRecordedValues()) { + return; + } + + T point = handle.aggregateThenMaybeReset( AsynchronousMetricStorage.this.startEpochNanos, AsynchronousMetricStorage.this.epochNanos, attributes, - // No need to reset, aggregatorHandles is going to be cleared anyways - /* reset= */ false); - currentPoints.put(attributes, value); + /* reset= */ true); + + T pointForCurrentPoints; + if (memoryMode == REUSABLE_DATA) { + // AggregatorHandle is going to modify the point eventually, but we must persist its + // value to used it at the next collection (within lastPoints). Thus, we make a copy. + pointForCurrentPoints = reusablePointsPool.borrowObject(); + aggregator.copyPoint(point, pointForCurrentPoints); + } else { + pointForCurrentPoints = point; + } + currentPoints.put(attributes, pointForCurrentPoints); }); List deltaPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList<>(); @@ -237,24 +247,33 @@ private Collection collectWithDeltaAggregationTemporality() { T deltaPoint; if (lastPoint == null) { - deltaPoint = currentPoint; + if (memoryMode == REUSABLE_DATA) { + // All deltaPoints are released at the end of the collection. Thus, we need a copy + // to make sure currentPoint can still be used within lastPoints during the next + // collection. + deltaPoint = reusablePointsPool.borrowObject(); + aggregator.copyPoint(currentPoint, deltaPoint); + } else { + deltaPoint = currentPoint; + } } else { if (memoryMode == REUSABLE_DATA) { aggregator.diffInPlace(lastPoint, currentPoint); deltaPoint = lastPoint; - - // Remaining last points are returned to reusablePointsPool, but - // this reusable point is still used, so don't return it to pool yet - lastPoints.remove(attributes); } else { deltaPoint = aggregator.diff(lastPoint, currentPoint); } } - deltaPoints.add(deltaPoint); }); if (memoryMode == REUSABLE_DATA) { + // lastPoints for the current collection can be discarded when the collection is completed. + // They can be returned to the pool because they're not managed by the AggregatorHandle, + // we made a copy. + lastPoints.forEach((attributes, point) -> reusablePointsPool.returnObject(point)); + lastPoints.clear(); + Map tmp = lastPoints; lastPoints = reusablePointsMap; reusablePointsMap = tmp; @@ -266,16 +285,28 @@ private Collection collectWithDeltaAggregationTemporality() { } private Collection collectWithCumulativeAggregationTemporality() { - List currentPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList<>(); + List currentPoints; + if (memoryMode == REUSABLE_DATA) { + // We should not return the points in this list to the pool, they belong to the + // AggregatorHandle + reusablePointsList.clear(); + currentPoints = reusablePointsList; + } else { + currentPoints = new ArrayList<>(); + } + aggregatorHandles.forEach( (attributes, handle) -> { + if (!handle.hasRecordedValues()) { + return; + } + T value = handle.aggregateThenMaybeReset( AsynchronousMetricStorage.this.startEpochNanos, AsynchronousMetricStorage.this.epochNanos, attributes, - // No need to reset, aggregatorHandles is going to be cleared anyways - /* reset= */ false); + /* reset= */ true); currentPoints.add(value); }); return currentPoints; From 0dca607328215e26e26caedc424111b72886aec1 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 23 Apr 2025 00:53:50 +0200 Subject: [PATCH 12/28] handle overflow --- .../state/AsynchronousMetricStorage.java | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index d348a594d19..a6985df1842 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -161,16 +161,23 @@ private Attributes validateAndProcessAttributes(Attributes attributes) { attributes = attributesProcessor.process(attributes, context); if (aggregatorHandles.size() >= maxCardinality) { - throttlingLogger.log( - Level.WARNING, - "Instrument " - + metricDescriptor.getSourceInstrument().getName() - + " has exceeded the maximum allowed cardinality (" - + maxCardinality - + ")."); - return MetricStorage.CARDINALITY_OVERFLOW; + aggregatorHandles.forEach( + (attr, handle) -> { + if (!handle.hasRecordedValues()) { + aggregatorHandles.remove(attr); + } + }); + if (aggregatorHandles.size() >= maxCardinality) { + throttlingLogger.log( + Level.WARNING, + "Instrument " + + metricDescriptor.getSourceInstrument().getName() + + " has exceeded the maximum allowed cardinality (" + + maxCardinality + + ")."); + return MetricStorage.CARDINALITY_OVERFLOW; + } } - return attributes; } From 9dcaf3be42522d6b552aeb733fcda59c11449f86 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 23 Apr 2025 01:30:36 +0200 Subject: [PATCH 13/28] dont delete all of them --- .../state/AsynchronousMetricStorage.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index a6985df1842..977aacabad1 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -79,6 +80,20 @@ public final class AsynchronousMetricStorage> handlesDeleter = + new BiConsumer>() { + private boolean active = true; + + @Override + public void accept(Attributes attributes, AggregatorHandle handle) { + if (active && !handle.hasRecordedValues()) { + aggregatorHandles.remove(attributes); + active = false; + } + } + }; + private AsynchronousMetricStorage( RegisteredReader registeredReader, MetricDescriptor metricDescriptor, @@ -160,14 +175,9 @@ private Attributes validateAndProcessAttributes(Attributes attributes) { Context context = Context.current(); attributes = attributesProcessor.process(attributes, context); - if (aggregatorHandles.size() >= maxCardinality) { - aggregatorHandles.forEach( - (attr, handle) -> { - if (!handle.hasRecordedValues()) { - aggregatorHandles.remove(attr); - } - }); - if (aggregatorHandles.size() >= maxCardinality) { + if (aggregatorHandles.size() == maxCardinality) { + aggregatorHandles.forEach(handlesDeleter); + if (aggregatorHandles.size() == maxCardinality) { throttlingLogger.log( Level.WARNING, "Instrument " From 1609f5fbbfc33b627cbc2fcec35074e07b488206 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 23 Apr 2025 01:50:10 +0200 Subject: [PATCH 14/28] gt --- .../sdk/metrics/internal/state/AsynchronousMetricStorage.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 977aacabad1..276d990247d 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -175,9 +175,9 @@ private Attributes validateAndProcessAttributes(Attributes attributes) { Context context = Context.current(); attributes = attributesProcessor.process(attributes, context); - if (aggregatorHandles.size() == maxCardinality) { + if (aggregatorHandles.size() >= maxCardinality) { aggregatorHandles.forEach(handlesDeleter); - if (aggregatorHandles.size() == maxCardinality) { + if (aggregatorHandles.size() >= maxCardinality) { throttlingLogger.log( Level.WARNING, "Instrument " From 944b3efdde93582aeefa78be1070c1a0e26147e4 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 23 Apr 2025 23:32:55 +0200 Subject: [PATCH 15/28] ops --- .../state/AsynchronousMetricStorage.java | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 276d990247d..5c35907fbf0 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -81,18 +81,7 @@ public final class AsynchronousMetricStorage> handlesDeleter = - new BiConsumer>() { - private boolean active = true; - - @Override - public void accept(Attributes attributes, AggregatorHandle handle) { - if (active && !handle.hasRecordedValues()) { - aggregatorHandles.remove(attributes); - active = false; - } - } - }; + private HandlesDeleter handlesDeleter; private AsynchronousMetricStorage( RegisteredReader registeredReader, @@ -111,6 +100,7 @@ private AsynchronousMetricStorage( this.attributesProcessor = attributesProcessor; this.maxCardinality = maxCardinality - 1; this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint); + this.handlesDeleter = new HandlesDeleter(); if (memoryMode == REUSABLE_DATA) { this.lastPoints = new PooledHashMap<>(); @@ -177,6 +167,12 @@ private Attributes validateAndProcessAttributes(Attributes attributes) { if (aggregatorHandles.size() >= maxCardinality) { aggregatorHandles.forEach(handlesDeleter); + if (memoryMode == REUSABLE_DATA) { + handlesDeleter.reset(); + } else { + handlesDeleter = new HandlesDeleter(); + } + if (aggregatorHandles.size() >= maxCardinality) { throttlingLogger.log( Level.WARNING, @@ -333,4 +329,20 @@ private Collection collectWithCumulativeAggregationTemporality() { public boolean isEmpty() { return aggregator == Aggregator.drop(); } + + private class HandlesDeleter implements BiConsumer> { + private boolean active = true; + + @Override + public void accept(Attributes attributes, AggregatorHandle handle) { + if (active && !handle.hasRecordedValues()) { + aggregatorHandles.remove(attributes); + active = false; + } + } + + private void reset() { + active = true; + } + } } From 7115187096636dc53e1e08667fe6de4710e6d2fd Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 23 Apr 2025 23:36:25 +0200 Subject: [PATCH 16/28] reusable handles --- .../internal/state/AsynchronousMetricStorage.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 5c35907fbf0..fa61a731fa9 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -70,6 +70,8 @@ public final class AsynchronousMetricStorage reusablePointsPool; + private final ObjectPool> reusableHandlesPool; + private final List reusablePointsList = new ArrayList<>(); // If aggregationTemporality == DELTA, this reference and lastPoints will be swapped at every // collection @@ -100,6 +102,7 @@ private AsynchronousMetricStorage( this.attributesProcessor = attributesProcessor; this.maxCardinality = maxCardinality - 1; this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint); + this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle); this.handlesDeleter = new HandlesDeleter(); if (memoryMode == REUSABLE_DATA) { @@ -141,7 +144,7 @@ AsynchronousMetricStorage create( void record(Attributes attributes, long value) { attributes = validateAndProcessAttributes(attributes); AggregatorHandle handle = - aggregatorHandles.computeIfAbsent(attributes, key -> aggregator.createHandle()); + aggregatorHandles.computeIfAbsent(attributes, key -> reusableHandlesPool.borrowObject()); handle.recordLong(value, attributes, Context.current()); } @@ -149,7 +152,7 @@ void record(Attributes attributes, long value) { void record(Attributes attributes, double value) { attributes = validateAndProcessAttributes(attributes); AggregatorHandle handle = - aggregatorHandles.computeIfAbsent(attributes, key -> aggregator.createHandle()); + aggregatorHandles.computeIfAbsent(attributes, key -> reusableHandlesPool.borrowObject()); handle.recordDouble(value, attributes, Context.current()); } @@ -336,7 +339,8 @@ private class HandlesDeleter implements BiConsumer handle) { if (active && !handle.hasRecordedValues()) { - aggregatorHandles.remove(attributes); + AggregatorHandle aggregatorHandle = aggregatorHandles.remove(attributes); + reusableHandlesPool.returnObject(aggregatorHandle); active = false; } } From 4e566d7c64b6fbd4085c30cb156133d0ebc5c44e Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 23 Apr 2025 23:43:29 +0200 Subject: [PATCH 17/28] pool for handles --- .../state/AsynchronousMetricStorage.java | 61 ++++--------------- 1 file changed, 11 insertions(+), 50 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index fa61a731fa9..2e465aee9fd 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -34,7 +34,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.BiConsumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -82,9 +81,6 @@ public final class AsynchronousMetricStorage(aggregator::createReusablePoint); this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle); - this.handlesDeleter = new HandlesDeleter(); if (memoryMode == REUSABLE_DATA) { this.lastPoints = new PooledHashMap<>(); @@ -169,23 +164,14 @@ private Attributes validateAndProcessAttributes(Attributes attributes) { attributes = attributesProcessor.process(attributes, context); if (aggregatorHandles.size() >= maxCardinality) { - aggregatorHandles.forEach(handlesDeleter); - if (memoryMode == REUSABLE_DATA) { - handlesDeleter.reset(); - } else { - handlesDeleter = new HandlesDeleter(); - } - - if (aggregatorHandles.size() >= maxCardinality) { - throttlingLogger.log( - Level.WARNING, - "Instrument " - + metricDescriptor.getSourceInstrument().getName() - + " has exceeded the maximum allowed cardinality (" - + maxCardinality - + ")."); - return MetricStorage.CARDINALITY_OVERFLOW; - } + throttlingLogger.log( + Level.WARNING, + "Instrument " + + metricDescriptor.getSourceInstrument().getName() + + " has exceeded the maximum allowed cardinality (" + + maxCardinality + + ")."); + return MetricStorage.CARDINALITY_OVERFLOW; } return attributes; } @@ -211,9 +197,9 @@ public MetricData collect( ? collectWithDeltaAggregationTemporality() : collectWithCumulativeAggregationTemporality(); - if (memoryMode != REUSABLE_DATA) { - aggregatorHandles.clear(); - } + // collectWith*AggregationTemporality() methods are responsible for resetting the handle + aggregatorHandles.forEach((attribute, handle) -> reusableHandlesPool.returnObject(handle)); + aggregatorHandles.clear(); return aggregator.toMetricData( resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality); @@ -233,10 +219,6 @@ private Collection collectWithDeltaAggregationTemporality() { aggregatorHandles.forEach( (attributes, handle) -> { - if (!handle.hasRecordedValues()) { - return; - } - T point = handle.aggregateThenMaybeReset( AsynchronousMetricStorage.this.startEpochNanos, @@ -313,10 +295,6 @@ private Collection collectWithCumulativeAggregationTemporality() { aggregatorHandles.forEach( (attributes, handle) -> { - if (!handle.hasRecordedValues()) { - return; - } - T value = handle.aggregateThenMaybeReset( AsynchronousMetricStorage.this.startEpochNanos, @@ -332,21 +310,4 @@ private Collection collectWithCumulativeAggregationTemporality() { public boolean isEmpty() { return aggregator == Aggregator.drop(); } - - private class HandlesDeleter implements BiConsumer> { - private boolean active = true; - - @Override - public void accept(Attributes attributes, AggregatorHandle handle) { - if (active && !handle.hasRecordedValues()) { - AggregatorHandle aggregatorHandle = aggregatorHandles.remove(attributes); - reusableHandlesPool.returnObject(aggregatorHandle); - active = false; - } - } - - private void reset() { - active = true; - } - } } From a5d8b675cc5350b0616d36dcac08aa3898ca7c78 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 23 Apr 2025 23:43:45 +0200 Subject: [PATCH 18/28] new test --- .../state/AsynchronousMetricStorageTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index 48793defa37..e33e3d71800 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -208,6 +208,37 @@ void record_MaxCardinality(MemoryMode memoryMode) { logs.assertContains("Instrument long-counter has exceeded the maximum allowed cardinality"); } + @ParameterizedTest + @EnumSource(MemoryMode.class) + void record_HandlesSpotsAreReleasedAfterCollection(MemoryMode memoryMode) { + setup(memoryMode); + + longCounterStorage.setEpochInformation(0, 1); + for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) { + longCounterStorage.record(Attributes.builder().put("key" + i, "val").build(), 1); + } + + assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) + .satisfies( + metricData -> + assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT - 1)); + logs.assertDoesNotContain( + "Instrument long-counter has exceeded the maximum allowed cardinality"); + + longCounterStorage.setEpochInformation(1, 2); + for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) { + // Different attribute + longCounterStorage.record(Attributes.builder().put("key" + i, "val2").build(), 1); + } + + assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) + .satisfies( + metricData -> + assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT - 1)); + logs.assertDoesNotContain( + "Instrument long-counter has exceeded the maximum allowed cardinality"); + } + @ParameterizedTest @EnumSource(MemoryMode.class) void record_DuplicateAttributes(MemoryMode memoryMode) { From f83157268cbd3e76c3eac66099e332b0a2d74c9b Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 24 Apr 2025 00:22:22 +0200 Subject: [PATCH 19/28] save some alloc --- .../internal/state/AsynchronousMetricStorage.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 2e465aee9fd..d5bc73f093a 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -34,6 +34,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; @@ -70,6 +72,8 @@ public final class AsynchronousMetricStorage reusablePointsPool; private final ObjectPool> reusableHandlesPool; + private final Function> handleBuilder; + private final BiConsumer> handleReleaser; private final List reusablePointsList = new ArrayList<>(); // If aggregationTemporality == DELTA, this reference and lastPoints will be swapped at every @@ -99,6 +103,8 @@ private AsynchronousMetricStorage( this.maxCardinality = maxCardinality - 1; this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint); this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle); + this.handleBuilder = ignored -> reusableHandlesPool.borrowObject(); + this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle); if (memoryMode == REUSABLE_DATA) { this.lastPoints = new PooledHashMap<>(); @@ -138,16 +144,14 @@ AsynchronousMetricStorage create( /** Record callback measurement from {@link ObservableLongMeasurement}. */ void record(Attributes attributes, long value) { attributes = validateAndProcessAttributes(attributes); - AggregatorHandle handle = - aggregatorHandles.computeIfAbsent(attributes, key -> reusableHandlesPool.borrowObject()); + AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder); handle.recordLong(value, attributes, Context.current()); } /** Record callback measurement from {@link ObservableDoubleMeasurement}. */ void record(Attributes attributes, double value) { attributes = validateAndProcessAttributes(attributes); - AggregatorHandle handle = - aggregatorHandles.computeIfAbsent(attributes, key -> reusableHandlesPool.borrowObject()); + AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder); handle.recordDouble(value, attributes, Context.current()); } @@ -198,7 +202,7 @@ public MetricData collect( : collectWithCumulativeAggregationTemporality(); // collectWith*AggregationTemporality() methods are responsible for resetting the handle - aggregatorHandles.forEach((attribute, handle) -> reusableHandlesPool.returnObject(handle)); + aggregatorHandles.forEach(handleReleaser); aggregatorHandles.clear(); return aggregator.toMetricData( From 1bc1d7832168be6842e9035a7afd20653115d06e Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 24 Apr 2025 00:48:11 +0200 Subject: [PATCH 20/28] process only when needed --- .../metrics/internal/state/AsynchronousMetricStorage.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index d5bc73f093a..e7c41e399ee 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -164,9 +164,6 @@ void setEpochInformation(long startEpochNanos, long epochNanos) { } private Attributes validateAndProcessAttributes(Attributes attributes) { - Context context = Context.current(); - attributes = attributesProcessor.process(attributes, context); - if (aggregatorHandles.size() >= maxCardinality) { throttlingLogger.log( Level.WARNING, @@ -177,6 +174,9 @@ private Attributes validateAndProcessAttributes(Attributes attributes) { + ")."); return MetricStorage.CARDINALITY_OVERFLOW; } + + Context context = Context.current(); + attributes = attributesProcessor.process(attributes, context); return attributes; } From 63f2471e4acf2dc543ed887411277f2a3644aa14 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 24 Apr 2025 00:54:00 +0200 Subject: [PATCH 21/28] one less lambda --- .../sdk/metrics/internal/state/AsynchronousMetricStorage.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index e7c41e399ee..e3b4f7f3f5a 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -74,6 +74,7 @@ public final class AsynchronousMetricStorage> reusableHandlesPool; private final Function> handleBuilder; private final BiConsumer> handleReleaser; + private final BiConsumer pointReleaser; private final List reusablePointsList = new ArrayList<>(); // If aggregationTemporality == DELTA, this reference and lastPoints will be swapped at every @@ -105,6 +106,7 @@ private AsynchronousMetricStorage( this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle); this.handleBuilder = ignored -> reusableHandlesPool.borrowObject(); this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle); + this.pointReleaser = (ignored, point) -> reusablePointsPool.returnObject(point); if (memoryMode == REUSABLE_DATA) { this.lastPoints = new PooledHashMap<>(); @@ -273,7 +275,7 @@ private Collection collectWithDeltaAggregationTemporality() { // lastPoints for the current collection can be discarded when the collection is completed. // They can be returned to the pool because they're not managed by the AggregatorHandle, // we made a copy. - lastPoints.forEach((attributes, point) -> reusablePointsPool.returnObject(point)); + lastPoints.forEach(pointReleaser); lastPoints.clear(); Map tmp = lastPoints; From 29f93c51099824865bcf5cf9878afa3b6c5a87f9 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 24 Apr 2025 01:23:34 +0200 Subject: [PATCH 22/28] reduce allocs --- .../metrics/internal/state/AsynchronousMetricStorage.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index e3b4f7f3f5a..9a915f59031 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -247,7 +247,7 @@ private Collection collectWithDeltaAggregationTemporality() { List deltaPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList<>(); currentPoints.forEach( (attributes, currentPoint) -> { - T lastPoint = lastPoints.get(attributes); + T lastPoint = lastPoints.remove(attributes); T deltaPoint; if (lastPoint == null) { @@ -272,9 +272,9 @@ private Collection collectWithDeltaAggregationTemporality() { }); if (memoryMode == REUSABLE_DATA) { - // lastPoints for the current collection can be discarded when the collection is completed. - // They can be returned to the pool because they're not managed by the AggregatorHandle, - // we made a copy. + // - If the point was used to compute a delta, it's now in deltaPoints (and thus in + // reusablePointsList) + // - If the point hasn't been used, it's still in lastPoints and can be returned lastPoints.forEach(pointReleaser); lastPoints.clear(); From cbe07843f8edf684a70ed934f22c99f1c96c5165 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 24 Apr 2025 08:52:06 +0000 Subject: [PATCH 23/28] no useless allocs --- .../aggregator/DoubleLastValueAggregator.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java index 2063cb212d7..0f275a33230 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java @@ -22,8 +22,6 @@ import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import java.util.List; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -114,8 +112,8 @@ public MetricData toMetricData( } static final class Handle extends AggregatorHandle { - @Nullable private static final Double DEFAULT_VALUE = null; - private final AtomicReference current = new AtomicReference<>(DEFAULT_VALUE); + private volatile boolean set = false; + private volatile double current = 0; // Only used when memoryMode is REUSABLE_DATA @Nullable private final MutableDoublePointData reusablePoint; @@ -136,20 +134,31 @@ protected DoublePointData doAggregateThenMaybeReset( Attributes attributes, List exemplars, boolean reset) { - Double value = reset ? this.current.getAndSet(DEFAULT_VALUE) : this.current.get(); + double currentLocal = current; + if (!set) { + throw new NullPointerException(); + } + if (reset) { + set = false; + } + + DoublePointData output; if (reusablePoint != null) { - reusablePoint.set( - startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars); - return reusablePoint; + reusablePoint.set(startEpochNanos, epochNanos, attributes, currentLocal, exemplars); + output = reusablePoint; } else { - return ImmutableDoublePointData.create( - startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars); + output = + ImmutableDoublePointData.create( + startEpochNanos, epochNanos, attributes, currentLocal, exemplars); } + + return output; } @Override protected void doRecordDouble(double value) { - current.set(value); + current = value; + set = true; } } } From 4613e7c682496e0842af89e3d53638d369f8fd14 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 24 Apr 2025 10:24:57 +0000 Subject: [PATCH 24/28] new test --- .../aggregator/DoubleLastValueAggregatorTest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java index b28a8e4879c..593d0da94a1 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java @@ -6,6 +6,7 @@ package io.opentelemetry.sdk.metrics.internal.aggregator; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; @@ -276,4 +277,13 @@ void testReusableDataOnCollect() { assertThat(pointData).isSameAs(pointDataWithReset); } + + @Test + void testNullPointerExceptionWhenUnset() { + init(MemoryMode.REUSABLE_DATA); + AggregatorHandle handle = aggregator.createHandle(); + assertThatThrownBy( + () -> handle.aggregateThenMaybeReset(0, 10, Attributes.empty(), /* reset= */ true)) + .isInstanceOf(NullPointerException.class); + } } From 1371683f82618fe0b922aa631bf315d5dcabd4e8 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 24 Apr 2025 20:26:55 +0200 Subject: [PATCH 25/28] fix impl --- .../aggregator/DoubleLastValueAggregator.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java index 0f275a33230..8735ef1f8b9 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java @@ -22,6 +22,7 @@ import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -112,7 +113,7 @@ public MetricData toMetricData( } static final class Handle extends AggregatorHandle { - private volatile boolean set = false; + private final AtomicBoolean set = new AtomicBoolean(false); private volatile double current = 0; // Only used when memoryMode is REUSABLE_DATA @@ -135,12 +136,9 @@ protected DoublePointData doAggregateThenMaybeReset( List exemplars, boolean reset) { double currentLocal = current; - if (!set) { + if ((reset && !set.compareAndSet(true, false)) || (!reset && !set.get())) { throw new NullPointerException(); } - if (reset) { - set = false; - } DoublePointData output; if (reusablePoint != null) { @@ -151,14 +149,13 @@ protected DoublePointData doAggregateThenMaybeReset( ImmutableDoublePointData.create( startEpochNanos, epochNanos, attributes, currentLocal, exemplars); } - return output; } @Override protected void doRecordDouble(double value) { current = value; - set = true; + set.set(true); } } } From 1f9a4b535f1f12d7ba0a466792764d023c15bd7a Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Fri, 25 Apr 2025 00:48:01 +0200 Subject: [PATCH 26/28] atomicreference+atomiclong --- .../aggregator/DoubleLastValueAggregator.java | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java index 8735ef1f8b9..1f2cb11491d 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java @@ -22,7 +22,9 @@ import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -113,8 +115,8 @@ public MetricData toMetricData( } static final class Handle extends AggregatorHandle { - private final AtomicBoolean set = new AtomicBoolean(false); - private volatile double current = 0; + private final AtomicReference current = new AtomicReference<>(null); + private final AtomicLong valueBits = new AtomicLong(); // Only used when memoryMode is REUSABLE_DATA @Nullable private final MutableDoublePointData reusablePoint; @@ -135,27 +137,22 @@ protected DoublePointData doAggregateThenMaybeReset( Attributes attributes, List exemplars, boolean reset) { - double currentLocal = current; - if ((reset && !set.compareAndSet(true, false)) || (!reset && !set.get())) { - throw new NullPointerException(); - } - - DoublePointData output; + AtomicLong valueBits = + Objects.requireNonNull(reset ? this.current.getAndSet(null) : this.current.get()); + double value = Double.longBitsToDouble(valueBits.get()); if (reusablePoint != null) { - reusablePoint.set(startEpochNanos, epochNanos, attributes, currentLocal, exemplars); - output = reusablePoint; + reusablePoint.set(startEpochNanos, epochNanos, attributes, value, exemplars); + return reusablePoint; } else { - output = - ImmutableDoublePointData.create( - startEpochNanos, epochNanos, attributes, currentLocal, exemplars); + return ImmutableDoublePointData.create( + startEpochNanos, epochNanos, attributes, value, exemplars); } - return output; } @Override protected void doRecordDouble(double value) { - current = value; - set.set(true); + valueBits.set(Double.doubleToLongBits(value)); + current.compareAndSet(null, valueBits); } } } From 53ad5ddbf8062d352b8786e2e72a6c76960221a7 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Sun, 27 Apr 2025 12:57:33 +0200 Subject: [PATCH 27/28] trigger From 44b982544100a86b68d43a6d0c1891f0a09b3743 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Thu, 1 May 2025 14:16:52 -0500 Subject: [PATCH 28/28] Delete unused Measurement and related code --- .../internal/aggregator/Aggregator.java | 19 --- .../aggregator/DoubleLastValueAggregator.java | 20 --- .../aggregator/DoubleSumAggregator.java | 20 --- .../aggregator/LongLastValueAggregator.java | 20 --- .../aggregator/LongSumAggregator.java | 20 --- .../state/AsynchronousMetricStorage.java | 5 +- .../internal/state/ImmutableMeasurement.java | 64 --------- .../metrics/internal/state/Measurement.java | 53 -------- .../internal/state/MutableMeasurement.java | 125 ------------------ 9 files changed, 1 insertion(+), 345 deletions(-) delete mode 100644 sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ImmutableMeasurement.java delete mode 100644 sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/Measurement.java delete mode 100644 sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MutableMeasurement.java diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java index e44eb2de33e..db167bab5b4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java @@ -13,7 +13,6 @@ import io.opentelemetry.sdk.metrics.data.MetricDataType; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; -import io.opentelemetry.sdk.metrics.internal.state.Measurement; import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import javax.annotation.concurrent.Immutable; @@ -68,24 +67,6 @@ default void diffInPlace(T previousCumulativeReusable, T currentCumulative) { throw new UnsupportedOperationException("This aggregator does not support diffInPlace."); } - /** - * Return a new point representing the measurement. - * - *

Aggregators MUST implement diff if it can be used with asynchronous instruments. - */ - default T toPoint(Measurement measurement) { - throw new UnsupportedOperationException("This aggregator does not support toPoint."); - } - - /** - * Resets {@code reusablePoint} to represent the {@code measurement}. - * - *

Aggregators MUST implement diff if it can be used with asynchronous instruments. - */ - default void toPoint(Measurement measurement, T reusablePoint) { - throw new UnsupportedOperationException("This aggregator does not support toPoint."); - } - /** Creates a new reusable point. */ default T createReusablePoint() { throw new UnsupportedOperationException( diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java index 1f2cb11491d..d1be6c83917 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java @@ -18,7 +18,6 @@ import io.opentelemetry.sdk.metrics.internal.data.MutableDoublePointData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; -import io.opentelemetry.sdk.metrics.internal.state.Measurement; import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import java.util.List; @@ -67,25 +66,6 @@ public void diffInPlace(DoublePointData previousReusable, DoublePointData curren ((MutableDoublePointData) previousReusable).set(current); } - @Override - public DoublePointData toPoint(Measurement measurement) { - return ImmutableDoublePointData.create( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.doubleValue()); - } - - @Override - public void toPoint(Measurement measurement, DoublePointData reusablePoint) { - ((MutableDoublePointData) reusablePoint) - .set( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.doubleValue()); - } - @Override public DoublePointData createReusablePoint() { return new MutableDoublePointData(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java index 17f02f85208..c0701d7e0ec 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java @@ -21,7 +21,6 @@ import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; -import io.opentelemetry.sdk.metrics.internal.state.Measurement; import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import java.util.List; @@ -82,25 +81,6 @@ public void diffInPlace(DoublePointData previousReusablePoint, DoublePointData c currentPoint.getExemplars()); } - @Override - public DoublePointData toPoint(Measurement measurement) { - return ImmutableDoublePointData.create( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.doubleValue()); - } - - @Override - public void toPoint(Measurement measurement, DoublePointData reusablePoint) { - ((MutableDoublePointData) reusablePoint) - .set( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.doubleValue()); - } - @Override public DoublePointData createReusablePoint() { return new MutableDoublePointData(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java index 27422733c43..e5a8d67682d 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java @@ -18,7 +18,6 @@ import io.opentelemetry.sdk.metrics.internal.data.MutableLongPointData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; -import io.opentelemetry.sdk.metrics.internal.state.Measurement; import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import java.util.List; @@ -63,25 +62,6 @@ public void diffInPlace(LongPointData previousReusablePoint, LongPointData curre ((MutableLongPointData) previousReusablePoint).set(currentPoint); } - @Override - public LongPointData toPoint(Measurement measurement) { - return ImmutableLongPointData.create( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.longValue()); - } - - @Override - public void toPoint(Measurement measurement, LongPointData reusablePoint) { - ((MutableLongPointData) reusablePoint) - .set( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.longValue()); - } - @Override public LongPointData createReusablePoint() { return new MutableLongPointData(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java index cfd23c0cf97..e728ff28bb4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java @@ -21,7 +21,6 @@ import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; -import io.opentelemetry.sdk.metrics.internal.state.Measurement; import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import java.util.List; @@ -75,25 +74,6 @@ public void diffInPlace(LongPointData previousReusablePoint, LongPointData curre currentPoint.getExemplars()); } - @Override - public LongPointData toPoint(Measurement measurement) { - return ImmutableLongPointData.create( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.longValue()); - } - - @Override - public void toPoint(Measurement measurement, LongPointData reusablePoint) { - ((MutableLongPointData) reusablePoint) - .set( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.longValue()); - } - @Override public LongPointData createReusablePoint() { return new MutableLongPointData(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 9a915f59031..ad68fdd3ec7 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -227,10 +227,7 @@ private Collection collectWithDeltaAggregationTemporality() { (attributes, handle) -> { T point = handle.aggregateThenMaybeReset( - AsynchronousMetricStorage.this.startEpochNanos, - AsynchronousMetricStorage.this.epochNanos, - attributes, - /* reset= */ true); + this.startEpochNanos, this.epochNanos, attributes, /* reset= */ true); T pointForCurrentPoints; if (memoryMode == REUSABLE_DATA) { diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ImmutableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ImmutableMeasurement.java deleted file mode 100644 index 9cac89e96e9..00000000000 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ImmutableMeasurement.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics.internal.state; - -import com.google.auto.value.AutoValue; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; -import io.opentelemetry.api.metrics.ObservableLongMeasurement; - -/** - * A long or double measurement recorded from {@link ObservableLongMeasurement} or {@link - * ObservableDoubleMeasurement}. - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - */ -@AutoValue -public abstract class ImmutableMeasurement implements Measurement { - - static ImmutableMeasurement createDouble( - long startEpochNanos, long epochNanos, double value, Attributes attributes) { - return new AutoValue_ImmutableMeasurement( - startEpochNanos, - epochNanos, - /* hasLongValue= */ false, - 0L, - /* hasDoubleValue= */ true, - value, - attributes); - } - - static ImmutableMeasurement createLong( - long startEpochNanos, long epochNanos, long value, Attributes attributes) { - return new AutoValue_ImmutableMeasurement( - startEpochNanos, - epochNanos, - /* hasLongValue= */ true, - value, - /* hasDoubleValue= */ false, - 0.0, - attributes); - } - - @Override - public Measurement withAttributes(Attributes attributes) { - if (hasDoubleValue()) { - return createDouble(startEpochNanos(), epochNanos(), doubleValue(), attributes); - } else { - return createLong(startEpochNanos(), epochNanos(), longValue(), attributes); - } - } - - @Override - public Measurement withStartEpochNanos(long startEpochNanos) { - if (hasDoubleValue()) { - return createDouble(startEpochNanos, epochNanos(), doubleValue(), attributes()); - } else { - return createLong(startEpochNanos, epochNanos(), longValue(), attributes()); - } - } -} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/Measurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/Measurement.java deleted file mode 100644 index a5023995dad..00000000000 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/Measurement.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics.internal.state; - -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; -import io.opentelemetry.api.metrics.ObservableLongMeasurement; - -/** - * A long or double measurement recorded from {@link ObservableLongMeasurement} or {@link - * ObservableDoubleMeasurement}. - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - */ -public interface Measurement { - long startEpochNanos(); - - long epochNanos(); - - boolean hasLongValue(); - - long longValue(); - - boolean hasDoubleValue(); - - double doubleValue(); - - Attributes attributes(); - - /** - * Updates the attributes. - * - * @param attributes The attributes to update - * @return The updated object. For {@link ImmutableMeasurement} it will be a new object with the - * updated attributes and for {@link MutableMeasurement} it will return itself with the - * attributes updated - */ - Measurement withAttributes(Attributes attributes); - - /** - * Updates the startEpochNanos. - * - * @param startEpochNanos start epoch nanosecond - * @return The updated object. For {@link ImmutableMeasurement} it will be a new object with the - * updated startEpochNanos and for {@link MutableMeasurement} it will return itself with the - * startEpochNanos updated - */ - Measurement withStartEpochNanos(long startEpochNanos); -} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MutableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MutableMeasurement.java deleted file mode 100644 index 7bac8202447..00000000000 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MutableMeasurement.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics.internal.state; - -import io.opentelemetry.api.common.Attributes; - -/** - * A mutable {@link Measurement} implementation - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - * - *

This class is not thread-safe. - */ -public final class MutableMeasurement implements Measurement { - - static void setDoubleMeasurement( - MutableMeasurement mutableMeasurement, - long startEpochNanos, - long epochNanos, - double value, - Attributes attributes) { - mutableMeasurement.set( - startEpochNanos, - epochNanos, - /* hasLongValue= */ false, - 0L, - /* hasDoubleValue= */ true, - value, - attributes); - } - - static void setLongMeasurement( - MutableMeasurement mutableMeasurement, - long startEpochNanos, - long epochNanos, - long value, - Attributes attributes) { - mutableMeasurement.set( - startEpochNanos, - epochNanos, - /* hasLongValue= */ true, - value, - /* hasDoubleValue= */ false, - 0.0, - attributes); - } - - private long startEpochNanos; - private long epochNanos; - private boolean hasLongValue; - private long longValue; - private boolean hasDoubleValue; - private double doubleValue; - - private Attributes attributes = Attributes.empty(); - - /** Sets the values. */ - private void set( - long startEpochNanos, - long epochNanos, - boolean hasLongValue, - long longValue, - boolean hasDoubleValue, - double doubleValue, - Attributes attributes) { - this.startEpochNanos = startEpochNanos; - this.epochNanos = epochNanos; - this.hasLongValue = hasLongValue; - this.longValue = longValue; - this.hasDoubleValue = hasDoubleValue; - this.doubleValue = doubleValue; - this.attributes = attributes; - } - - @Override - public Measurement withStartEpochNanos(long startEpochNanos) { - this.startEpochNanos = startEpochNanos; - return this; - } - - @Override - public Measurement withAttributes(Attributes attributes) { - this.attributes = attributes; - return this; - } - - @Override - public long startEpochNanos() { - return startEpochNanos; - } - - @Override - public long epochNanos() { - return epochNanos; - } - - @Override - public boolean hasLongValue() { - return hasLongValue; - } - - @Override - public long longValue() { - return longValue; - } - - @Override - public boolean hasDoubleValue() { - return hasDoubleValue; - } - - @Override - public double doubleValue() { - return doubleValue; - } - - @Override - public Attributes attributes() { - return attributes; - } -}