diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java index e44eb2de33e..db167bab5b4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java @@ -13,7 +13,6 @@ import io.opentelemetry.sdk.metrics.data.MetricDataType; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; -import io.opentelemetry.sdk.metrics.internal.state.Measurement; import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import javax.annotation.concurrent.Immutable; @@ -68,24 +67,6 @@ default void diffInPlace(T previousCumulativeReusable, T currentCumulative) { throw new UnsupportedOperationException("This aggregator does not support diffInPlace."); } - /** - * Return a new point representing the measurement. - * - *

Aggregators MUST implement diff if it can be used with asynchronous instruments. - */ - default T toPoint(Measurement measurement) { - throw new UnsupportedOperationException("This aggregator does not support toPoint."); - } - - /** - * Resets {@code reusablePoint} to represent the {@code measurement}. - * - *

Aggregators MUST implement diff if it can be used with asynchronous instruments. - */ - default void toPoint(Measurement measurement, T reusablePoint) { - throw new UnsupportedOperationException("This aggregator does not support toPoint."); - } - /** Creates a new reusable point. */ default T createReusablePoint() { throw new UnsupportedOperationException( diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java index 2063cb212d7..d1be6c83917 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java @@ -18,11 +18,11 @@ import io.opentelemetry.sdk.metrics.internal.data.MutableDoublePointData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; -import io.opentelemetry.sdk.metrics.internal.state.Measurement; import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import javax.annotation.Nullable; @@ -66,25 +66,6 @@ public void diffInPlace(DoublePointData previousReusable, DoublePointData curren ((MutableDoublePointData) previousReusable).set(current); } - @Override - public DoublePointData toPoint(Measurement measurement) { - return ImmutableDoublePointData.create( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.doubleValue()); - } - - @Override - public void toPoint(Measurement measurement, DoublePointData reusablePoint) { - ((MutableDoublePointData) reusablePoint) - .set( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.doubleValue()); - } - @Override public DoublePointData createReusablePoint() { return new MutableDoublePointData(); @@ -114,8 +95,8 @@ public MetricData toMetricData( } static final class Handle extends AggregatorHandle { - @Nullable private static final Double DEFAULT_VALUE = null; - private final AtomicReference current = new AtomicReference<>(DEFAULT_VALUE); + private final AtomicReference current = new AtomicReference<>(null); + private final AtomicLong valueBits = new AtomicLong(); // Only used when memoryMode is REUSABLE_DATA @Nullable private final MutableDoublePointData reusablePoint; @@ -136,20 +117,22 @@ protected DoublePointData doAggregateThenMaybeReset( Attributes attributes, List exemplars, boolean reset) { - Double value = reset ? this.current.getAndSet(DEFAULT_VALUE) : this.current.get(); + AtomicLong valueBits = + Objects.requireNonNull(reset ? this.current.getAndSet(null) : this.current.get()); + double value = Double.longBitsToDouble(valueBits.get()); if (reusablePoint != null) { - reusablePoint.set( - startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars); + reusablePoint.set(startEpochNanos, epochNanos, attributes, value, exemplars); return reusablePoint; } else { return ImmutableDoublePointData.create( - startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars); + startEpochNanos, epochNanos, attributes, value, exemplars); } } @Override protected void doRecordDouble(double value) { - current.set(value); + valueBits.set(Double.doubleToLongBits(value)); + current.compareAndSet(null, valueBits); } } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java index 17f02f85208..c0701d7e0ec 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java @@ -21,7 +21,6 @@ import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; -import io.opentelemetry.sdk.metrics.internal.state.Measurement; import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import java.util.List; @@ -82,25 +81,6 @@ public void diffInPlace(DoublePointData previousReusablePoint, DoublePointData c currentPoint.getExemplars()); } - @Override - public DoublePointData toPoint(Measurement measurement) { - return ImmutableDoublePointData.create( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.doubleValue()); - } - - @Override - public void toPoint(Measurement measurement, DoublePointData reusablePoint) { - ((MutableDoublePointData) reusablePoint) - .set( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.doubleValue()); - } - @Override public DoublePointData createReusablePoint() { return new MutableDoublePointData(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java index 27422733c43..e5a8d67682d 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java @@ -18,7 +18,6 @@ import io.opentelemetry.sdk.metrics.internal.data.MutableLongPointData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; -import io.opentelemetry.sdk.metrics.internal.state.Measurement; import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import java.util.List; @@ -63,25 +62,6 @@ public void diffInPlace(LongPointData previousReusablePoint, LongPointData curre ((MutableLongPointData) previousReusablePoint).set(currentPoint); } - @Override - public LongPointData toPoint(Measurement measurement) { - return ImmutableLongPointData.create( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.longValue()); - } - - @Override - public void toPoint(Measurement measurement, LongPointData reusablePoint) { - ((MutableLongPointData) reusablePoint) - .set( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.longValue()); - } - @Override public LongPointData createReusablePoint() { return new MutableLongPointData(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java index cfd23c0cf97..e728ff28bb4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java @@ -21,7 +21,6 @@ import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; -import io.opentelemetry.sdk.metrics.internal.state.Measurement; import io.opentelemetry.sdk.resources.Resource; import java.util.Collection; import java.util.List; @@ -75,25 +74,6 @@ public void diffInPlace(LongPointData previousReusablePoint, LongPointData curre currentPoint.getExemplars()); } - @Override - public LongPointData toPoint(Measurement measurement) { - return ImmutableLongPointData.create( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.longValue()); - } - - @Override - public void toPoint(Measurement measurement, LongPointData reusablePoint) { - ((MutableLongPointData) reusablePoint) - .set( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.attributes(), - measurement.longValue()); - } - @Override public LongPointData createReusablePoint() { return new MutableLongPointData(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 42110e2cb1e..ad68fdd3ec7 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -21,6 +21,7 @@ import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; +import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; @@ -31,7 +32,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; @@ -51,6 +55,7 @@ public final class AsynchronousMetricStorage aggregator; private final AttributesProcessor attributesProcessor; + private final MemoryMode memoryMode; /** * This field is set to 1 less than the actual intended cardinality limit, allowing the last slot @@ -58,18 +63,28 @@ public final class AsynchronousMetricStorage points; + // Handles responsible for aggregating data recorded during callbacks + private final Map> aggregatorHandles; // Only populated if aggregationTemporality == DELTA private Map lastPoints; // Only populated if memoryMode == REUSABLE_DATA private final ObjectPool reusablePointsPool; + private final ObjectPool> reusableHandlesPool; + private final Function> handleBuilder; + private final BiConsumer> handleReleaser; + private final BiConsumer pointReleaser; - // Only populated if memoryMode == REUSABLE_DATA - private final ArrayList reusableResultList = new ArrayList<>(); + private final List reusablePointsList = new ArrayList<>(); + // If aggregationTemporality == DELTA, this reference and lastPoints will be swapped at every + // collection + private Map reusablePointsMap = new PooledHashMap<>(); - private final MemoryMode memoryMode; + // Time information relative to recording of data in aggregatorHandles, set while calling + // callbacks + private long startEpochNanos; + private long epochNanos; private AsynchronousMetricStorage( RegisteredReader registeredReader, @@ -88,12 +103,17 @@ private AsynchronousMetricStorage( this.attributesProcessor = attributesProcessor; this.maxCardinality = maxCardinality - 1; this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint); + this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle); + this.handleBuilder = ignored -> reusableHandlesPool.borrowObject(); + this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle); + this.pointReleaser = (ignored, point) -> reusablePointsPool.returnObject(point); + if (memoryMode == REUSABLE_DATA) { - lastPoints = new PooledHashMap<>(); - points = new PooledHashMap<>(); + this.lastPoints = new PooledHashMap<>(); + this.aggregatorHandles = new PooledHashMap<>(); } else { - lastPoints = new HashMap<>(); - points = new HashMap<>(); + this.lastPoints = new HashMap<>(); + this.aggregatorHandles = new HashMap<>(); } } @@ -123,25 +143,30 @@ AsynchronousMetricStorage create( registeredView.getCardinalityLimit()); } - /** - * Record callback measurement from {@link ObservableLongMeasurement} or {@link - * ObservableDoubleMeasurement}. - */ - void record(Measurement measurement) { - Context context = Context.current(); - Attributes processedAttributes = attributesProcessor.process(measurement.attributes(), context); - long start = - aggregationTemporality == AggregationTemporality.DELTA - ? registeredReader.getLastCollectEpochNanos() - : measurement.startEpochNanos(); + /** Record callback measurement from {@link ObservableLongMeasurement}. */ + void record(Attributes attributes, long value) { + attributes = validateAndProcessAttributes(attributes); + AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder); + handle.recordLong(value, attributes, Context.current()); + } - measurement = measurement.withAttributes(processedAttributes).withStartEpochNanos(start); + /** Record callback measurement from {@link ObservableDoubleMeasurement}. */ + void record(Attributes attributes, double value) { + attributes = validateAndProcessAttributes(attributes); + AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder); + handle.recordDouble(value, attributes, Context.current()); + } - recordPoint(processedAttributes, measurement); + void setEpochInformation(long startEpochNanos, long epochNanos) { + this.startEpochNanos = + aggregationTemporality == AggregationTemporality.DELTA + ? registeredReader.getLastCollectEpochNanos() + : startEpochNanos; + this.epochNanos = epochNanos; } - private void recordPoint(Attributes attributes, Measurement measurement) { - if (points.size() >= maxCardinality) { + private Attributes validateAndProcessAttributes(Attributes attributes) { + if (aggregatorHandles.size() >= maxCardinality) { throttlingLogger.log( Level.WARNING, "Instrument " @@ -149,28 +174,12 @@ private void recordPoint(Attributes attributes, Measurement measurement) { + " has exceeded the maximum allowed cardinality (" + maxCardinality + ")."); - attributes = MetricStorage.CARDINALITY_OVERFLOW; - measurement = measurement.withAttributes(attributes); - } else if (points.containsKey( - attributes)) { // Check there is not already a recording for the attributes - throttlingLogger.log( - Level.WARNING, - "Instrument " - + metricDescriptor.getSourceInstrument().getName() - + " has recorded multiple values for the same attributes: " - + attributes); - return; + return MetricStorage.CARDINALITY_OVERFLOW; } - T dataPoint; - if (memoryMode == REUSABLE_DATA) { - dataPoint = reusablePointsPool.borrowObject(); - aggregator.toPoint(measurement, dataPoint); - } else { - dataPoint = aggregator.toPoint(measurement); - } - - points.put(attributes, dataPoint); + Context context = Context.current(); + attributes = attributesProcessor.process(attributes, context); + return attributes; } @Override @@ -189,76 +198,115 @@ public MetricData collect( InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, long epochNanos) { + Collection result = + aggregationTemporality == AggregationTemporality.DELTA + ? collectWithDeltaAggregationTemporality() + : collectWithCumulativeAggregationTemporality(); + + // collectWith*AggregationTemporality() methods are responsible for resetting the handle + aggregatorHandles.forEach(handleReleaser); + aggregatorHandles.clear(); + + return aggregator.toMetricData( + resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality); + } + + private Collection collectWithDeltaAggregationTemporality() { + Map currentPoints; if (memoryMode == REUSABLE_DATA) { - // Collect can not run concurrently for same reader, hence we safely assume - // the previous collect result has been used and done with - reusableResultList.forEach(reusablePointsPool::returnObject); - reusableResultList.clear(); + // deltaPoints computed in the previous collection can be released + reusablePointsList.forEach(reusablePointsPool::returnObject); + reusablePointsList.clear(); + + currentPoints = reusablePointsMap; + } else { + currentPoints = new HashMap<>(); } - Collection result; - if (aggregationTemporality == AggregationTemporality.DELTA) { - Map points = this.points; - Map lastPoints = this.lastPoints; - - Collection deltaPoints; - if (memoryMode == REUSABLE_DATA) { - deltaPoints = reusableResultList; - } else { - deltaPoints = new ArrayList<>(); - } - - points.forEach( - (k, v) -> { - T lastPoint = lastPoints.get(k); - - T deltaPoint; - if (lastPoint == null) { - if (memoryMode == REUSABLE_DATA) { - deltaPoint = reusablePointsPool.borrowObject(); - aggregator.copyPoint(v, deltaPoint); - } else { - deltaPoint = v; - } + aggregatorHandles.forEach( + (attributes, handle) -> { + T point = + handle.aggregateThenMaybeReset( + this.startEpochNanos, this.epochNanos, attributes, /* reset= */ true); + + T pointForCurrentPoints; + if (memoryMode == REUSABLE_DATA) { + // AggregatorHandle is going to modify the point eventually, but we must persist its + // value to used it at the next collection (within lastPoints). Thus, we make a copy. + pointForCurrentPoints = reusablePointsPool.borrowObject(); + aggregator.copyPoint(point, pointForCurrentPoints); + } else { + pointForCurrentPoints = point; + } + currentPoints.put(attributes, pointForCurrentPoints); + }); + + List deltaPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList<>(); + currentPoints.forEach( + (attributes, currentPoint) -> { + T lastPoint = lastPoints.remove(attributes); + + T deltaPoint; + if (lastPoint == null) { + if (memoryMode == REUSABLE_DATA) { + // All deltaPoints are released at the end of the collection. Thus, we need a copy + // to make sure currentPoint can still be used within lastPoints during the next + // collection. + deltaPoint = reusablePointsPool.borrowObject(); + aggregator.copyPoint(currentPoint, deltaPoint); + } else { + deltaPoint = currentPoint; + } + } else { + if (memoryMode == REUSABLE_DATA) { + aggregator.diffInPlace(lastPoint, currentPoint); + deltaPoint = lastPoint; } else { - if (memoryMode == REUSABLE_DATA) { - aggregator.diffInPlace(lastPoint, v); - deltaPoint = lastPoint; - - // Remaining last points are returned to reusablePointsPool, but - // this reusable point is still used, so don't return it to pool yet - lastPoints.remove(k); - } else { - deltaPoint = aggregator.diff(lastPoint, v); - } + deltaPoint = aggregator.diff(lastPoint, currentPoint); } + } + deltaPoints.add(deltaPoint); + }); - deltaPoints.add(deltaPoint); - }); - - if (memoryMode == REUSABLE_DATA) { - lastPoints.forEach((k, v) -> reusablePointsPool.returnObject(v)); - lastPoints.clear(); - this.points = lastPoints; - } else { - this.points = new HashMap<>(); - } - - this.lastPoints = points; - result = deltaPoints; - } else /* CUMULATIVE */ { - if (memoryMode == REUSABLE_DATA) { - points.forEach((k, v) -> reusableResultList.add(v)); - points.clear(); - result = reusableResultList; - } else { - result = points.values(); - points = new HashMap<>(); - } + if (memoryMode == REUSABLE_DATA) { + // - If the point was used to compute a delta, it's now in deltaPoints (and thus in + // reusablePointsList) + // - If the point hasn't been used, it's still in lastPoints and can be returned + lastPoints.forEach(pointReleaser); + lastPoints.clear(); + + Map tmp = lastPoints; + lastPoints = reusablePointsMap; + reusablePointsMap = tmp; + } else { + lastPoints = currentPoints; } - return aggregator.toMetricData( - resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality); + return deltaPoints; + } + + private Collection collectWithCumulativeAggregationTemporality() { + List currentPoints; + if (memoryMode == REUSABLE_DATA) { + // We should not return the points in this list to the pool, they belong to the + // AggregatorHandle + reusablePointsList.clear(); + currentPoints = reusablePointsList; + } else { + currentPoints = new ArrayList<>(); + } + + aggregatorHandles.forEach( + (attributes, handle) -> { + T value = + handle.aggregateThenMaybeReset( + AsynchronousMetricStorage.this.startEpochNanos, + AsynchronousMetricStorage.this.epochNanos, + attributes, + /* reset= */ true); + currentPoints.add(value); + }); + return currentPoints; } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ImmutableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ImmutableMeasurement.java deleted file mode 100644 index 9cac89e96e9..00000000000 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ImmutableMeasurement.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics.internal.state; - -import com.google.auto.value.AutoValue; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; -import io.opentelemetry.api.metrics.ObservableLongMeasurement; - -/** - * A long or double measurement recorded from {@link ObservableLongMeasurement} or {@link - * ObservableDoubleMeasurement}. - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - */ -@AutoValue -public abstract class ImmutableMeasurement implements Measurement { - - static ImmutableMeasurement createDouble( - long startEpochNanos, long epochNanos, double value, Attributes attributes) { - return new AutoValue_ImmutableMeasurement( - startEpochNanos, - epochNanos, - /* hasLongValue= */ false, - 0L, - /* hasDoubleValue= */ true, - value, - attributes); - } - - static ImmutableMeasurement createLong( - long startEpochNanos, long epochNanos, long value, Attributes attributes) { - return new AutoValue_ImmutableMeasurement( - startEpochNanos, - epochNanos, - /* hasLongValue= */ true, - value, - /* hasDoubleValue= */ false, - 0.0, - attributes); - } - - @Override - public Measurement withAttributes(Attributes attributes) { - if (hasDoubleValue()) { - return createDouble(startEpochNanos(), epochNanos(), doubleValue(), attributes); - } else { - return createLong(startEpochNanos(), epochNanos(), longValue(), attributes); - } - } - - @Override - public Measurement withStartEpochNanos(long startEpochNanos) { - if (hasDoubleValue()) { - return createDouble(startEpochNanos, epochNanos(), doubleValue(), attributes()); - } else { - return createLong(startEpochNanos, epochNanos(), longValue(), attributes()); - } - } -} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/Measurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/Measurement.java deleted file mode 100644 index a5023995dad..00000000000 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/Measurement.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics.internal.state; - -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; -import io.opentelemetry.api.metrics.ObservableLongMeasurement; - -/** - * A long or double measurement recorded from {@link ObservableLongMeasurement} or {@link - * ObservableDoubleMeasurement}. - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - */ -public interface Measurement { - long startEpochNanos(); - - long epochNanos(); - - boolean hasLongValue(); - - long longValue(); - - boolean hasDoubleValue(); - - double doubleValue(); - - Attributes attributes(); - - /** - * Updates the attributes. - * - * @param attributes The attributes to update - * @return The updated object. For {@link ImmutableMeasurement} it will be a new object with the - * updated attributes and for {@link MutableMeasurement} it will return itself with the - * attributes updated - */ - Measurement withAttributes(Attributes attributes); - - /** - * Updates the startEpochNanos. - * - * @param startEpochNanos start epoch nanosecond - * @return The updated object. For {@link ImmutableMeasurement} it will be a new object with the - * updated startEpochNanos and for {@link MutableMeasurement} it will return itself with the - * startEpochNanos updated - */ - Measurement withStartEpochNanos(long startEpochNanos); -} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MutableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MutableMeasurement.java deleted file mode 100644 index 7bac8202447..00000000000 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MutableMeasurement.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics.internal.state; - -import io.opentelemetry.api.common.Attributes; - -/** - * A mutable {@link Measurement} implementation - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - * - *

This class is not thread-safe. - */ -public final class MutableMeasurement implements Measurement { - - static void setDoubleMeasurement( - MutableMeasurement mutableMeasurement, - long startEpochNanos, - long epochNanos, - double value, - Attributes attributes) { - mutableMeasurement.set( - startEpochNanos, - epochNanos, - /* hasLongValue= */ false, - 0L, - /* hasDoubleValue= */ true, - value, - attributes); - } - - static void setLongMeasurement( - MutableMeasurement mutableMeasurement, - long startEpochNanos, - long epochNanos, - long value, - Attributes attributes) { - mutableMeasurement.set( - startEpochNanos, - epochNanos, - /* hasLongValue= */ true, - value, - /* hasDoubleValue= */ false, - 0.0, - attributes); - } - - private long startEpochNanos; - private long epochNanos; - private boolean hasLongValue; - private long longValue; - private boolean hasDoubleValue; - private double doubleValue; - - private Attributes attributes = Attributes.empty(); - - /** Sets the values. */ - private void set( - long startEpochNanos, - long epochNanos, - boolean hasLongValue, - long longValue, - boolean hasDoubleValue, - double doubleValue, - Attributes attributes) { - this.startEpochNanos = startEpochNanos; - this.epochNanos = epochNanos; - this.hasLongValue = hasLongValue; - this.longValue = longValue; - this.hasDoubleValue = hasDoubleValue; - this.doubleValue = doubleValue; - this.attributes = attributes; - } - - @Override - public Measurement withStartEpochNanos(long startEpochNanos) { - this.startEpochNanos = startEpochNanos; - return this; - } - - @Override - public Measurement withAttributes(Attributes attributes) { - this.attributes = attributes; - return this; - } - - @Override - public long startEpochNanos() { - return startEpochNanos; - } - - @Override - public long epochNanos() { - return epochNanos; - } - - @Override - public boolean hasLongValue() { - return hasLongValue; - } - - @Override - public long longValue() { - return longValue; - } - - @Override - public boolean hasDoubleValue() { - return hasDoubleValue; - } - - @Override - public double doubleValue() { - return doubleValue; - } - - @Override - public Attributes attributes() { - return attributes; - } -} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java index 27eeac81190..6e195fa545f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java @@ -5,19 +5,14 @@ package io.opentelemetry.sdk.metrics.internal.state; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createDouble; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createLong; - import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; -import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; import java.util.List; -import java.util.Objects; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -38,14 +33,9 @@ public final class SdkObservableMeasurement private final InstrumentDescriptor instrumentDescriptor; private final List> storages; - /** Only used when {@code activeReader}'s memoryMode is {@link MemoryMode#REUSABLE_DATA}. */ - private final MutableMeasurement mutableMeasurement = new MutableMeasurement(); - // These fields are set before invoking callbacks. They allow measurements to be recorded to the // storages for correct reader, and with the correct time. @Nullable private volatile RegisteredReader activeReader; - private volatile long startEpochNanos; - private volatile long epochNanos; private SdkObservableMeasurement( InstrumentationScopeInfo instrumentationScopeInfo, @@ -83,8 +73,11 @@ public InstrumentationScopeInfo getInstrumentationScopeInfo() { public void setActiveReader( RegisteredReader registeredReader, long startEpochNanos, long epochNanos) { this.activeReader = registeredReader; - this.startEpochNanos = startEpochNanos; - this.epochNanos = epochNanos; + for (AsynchronousMetricStorage storage : storages) { + if (storage.getRegisteredReader().equals(activeReader)) { + storage.setEpochInformation(startEpochNanos, epochNanos); + } + } } /** @@ -109,23 +102,17 @@ public void record(long value) { @Override public void record(long value, Attributes attributes) { + RegisteredReader activeReader = this.activeReader; if (activeReader == null) { logNoActiveReader(); return; } - Measurement measurement; - - MemoryMode memoryMode = activeReader.getReader().getMemoryMode(); - if (Objects.requireNonNull(memoryMode) == MemoryMode.IMMUTABLE_DATA) { - measurement = createLong(startEpochNanos, epochNanos, value, attributes); - } else { - MutableMeasurement.setLongMeasurement( - mutableMeasurement, startEpochNanos, epochNanos, value, attributes); - measurement = mutableMeasurement; + for (AsynchronousMetricStorage storage : storages) { + if (storage.getRegisteredReader().equals(activeReader)) { + storage.record(attributes, value); + } } - - doRecord(measurement); } @Override @@ -135,10 +122,12 @@ public void record(double value) { @Override public void record(double value, Attributes attributes) { + RegisteredReader activeReader = this.activeReader; if (activeReader == null) { logNoActiveReader(); return; } + if (Double.isNaN(value)) { logger.log( Level.FINE, @@ -150,24 +139,9 @@ public void record(double value, Attributes attributes) { return; } - Measurement measurement; - MemoryMode memoryMode = activeReader.getReader().getMemoryMode(); - if (Objects.requireNonNull(memoryMode) == MemoryMode.IMMUTABLE_DATA) { - measurement = createDouble(startEpochNanos, epochNanos, value, attributes); - } else { - MutableMeasurement.setDoubleMeasurement( - mutableMeasurement, startEpochNanos, epochNanos, value, attributes); - measurement = mutableMeasurement; - } - - doRecord(measurement); - } - - private void doRecord(Measurement measurement) { - RegisteredReader activeReader = this.activeReader; for (AsynchronousMetricStorage storage : storages) { if (storage.getRegisteredReader().equals(activeReader)) { - storage.record(measurement); + storage.record(attributes, value); } } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java index 17989d14a1a..fd5e90aa3f6 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java @@ -421,7 +421,7 @@ void readerAndViewCardinalityConfiguration() { 0, asyncCounterLimit, LongPointData::getValue, - 1)))); + DEFAULT_MAX_CARDINALITY - (asyncCounterLimit - 1))))); assertThat(cumulativeReader.collectAllMetrics()) .as("cumulative collection") .satisfiesExactlyInAnyOrder( @@ -485,7 +485,7 @@ void readerAndViewCardinalityConfiguration() { 0, asyncCounterLimit, LongPointData::getValue, - 1)))); + DEFAULT_MAX_CARDINALITY - (asyncCounterLimit - 1))))); // Record another round of measurements, again exceeding cardinality limits for (int i = DEFAULT_MAX_CARDINALITY; i < DEFAULT_MAX_CARDINALITY * 2; i++) { @@ -625,7 +625,7 @@ void readerAndViewCardinalityConfiguration() { 0, asyncCounterLimit, LongPointData::getValue, - 1)))); + DEFAULT_MAX_CARDINALITY - (asyncCounterLimit - 1))))); } /** diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java index b28a8e4879c..593d0da94a1 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java @@ -6,6 +6,7 @@ package io.opentelemetry.sdk.metrics.internal.aggregator; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; @@ -276,4 +277,13 @@ void testReusableDataOnCollect() { assertThat(pointData).isSameAs(pointDataWithReset); } + + @Test + void testNullPointerExceptionWhenUnset() { + init(MemoryMode.REUSABLE_DATA); + AggregatorHandle handle = aggregator.createHandle(); + assertThatThrownBy( + () -> handle.aggregateThenMaybeReset(0, 10, Attributes.empty(), /* reset= */ true)) + .isInstanceOf(NullPointerException.class); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index 23c45a9349f..e33e3d71800 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -6,8 +6,6 @@ package io.opentelemetry.sdk.metrics.internal.state; import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createDouble; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createLong; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry; import static org.mockito.ArgumentMatchers.any; @@ -107,9 +105,10 @@ void setup(MemoryMode memoryMode) { void recordLong(MemoryMode memoryMode) { setup(memoryMode); - longCounterStorage.record(createLong(0, 1, 1, Attributes.builder().put("key", "a").build())); - longCounterStorage.record(createLong(0, 1, 2, Attributes.builder().put("key", "b").build())); - longCounterStorage.record(createLong(0, 1, 3, Attributes.builder().put("key", "c").build())); + longCounterStorage.setEpochInformation(0, 1); + longCounterStorage.record(Attributes.builder().put("key", "a").build(), 1); + longCounterStorage.record(Attributes.builder().put("key", "b").build(), 2); + longCounterStorage.record(Attributes.builder().put("key", "c").build(), 3); assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -132,12 +131,10 @@ void recordLong(MemoryMode memoryMode) { void recordDouble(MemoryMode memoryMode) { setup(memoryMode); - doubleCounterStorage.record( - createDouble(0, 1, 1.1, Attributes.builder().put("key", "a").build())); - doubleCounterStorage.record( - createDouble(0, 1, 2.2, Attributes.builder().put("key", "b").build())); - doubleCounterStorage.record( - createDouble(0, 1, 3.3, Attributes.builder().put("key", "c").build())); + longCounterStorage.setEpochInformation(0, 1); + doubleCounterStorage.record(Attributes.builder().put("key", "a").build(), 1.1); + doubleCounterStorage.record(Attributes.builder().put("key", "b").build(), 2.2); + doubleCounterStorage.record(Attributes.builder().put("key", "c").build(), 3.3); assertThat(doubleCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -179,8 +176,8 @@ void record_ProcessesAttributes(MemoryMode memoryMode) { InstrumentValueType.LONG, Advice.empty())); - storage.record( - createLong(0, 1, 1, Attributes.builder().put("key1", "a").put("key2", "b").build())); + storage.setEpochInformation(0, 1); + storage.record(Attributes.builder().put("key1", "a").put("key2", "b").build(), 1); assertThat(storage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -199,9 +196,9 @@ void record_ProcessesAttributes(MemoryMode memoryMode) { void record_MaxCardinality(MemoryMode memoryMode) { setup(memoryMode); + longCounterStorage.setEpochInformation(0, 1); for (int i = 0; i <= CARDINALITY_LIMIT + 1; i++) { - longCounterStorage.record( - createLong(0, 1, 1, Attributes.builder().put("key" + i, "val").build())); + longCounterStorage.record(Attributes.builder().put("key" + i, "val").build(), 1); } assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) @@ -211,13 +208,45 @@ void record_MaxCardinality(MemoryMode memoryMode) { logs.assertContains("Instrument long-counter has exceeded the maximum allowed cardinality"); } + @ParameterizedTest + @EnumSource(MemoryMode.class) + void record_HandlesSpotsAreReleasedAfterCollection(MemoryMode memoryMode) { + setup(memoryMode); + + longCounterStorage.setEpochInformation(0, 1); + for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) { + longCounterStorage.record(Attributes.builder().put("key" + i, "val").build(), 1); + } + + assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) + .satisfies( + metricData -> + assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT - 1)); + logs.assertDoesNotContain( + "Instrument long-counter has exceeded the maximum allowed cardinality"); + + longCounterStorage.setEpochInformation(1, 2); + for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) { + // Different attribute + longCounterStorage.record(Attributes.builder().put("key" + i, "val2").build(), 1); + } + + assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) + .satisfies( + metricData -> + assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT - 1)); + logs.assertDoesNotContain( + "Instrument long-counter has exceeded the maximum allowed cardinality"); + } + @ParameterizedTest @EnumSource(MemoryMode.class) void record_DuplicateAttributes(MemoryMode memoryMode) { setup(memoryMode); - longCounterStorage.record(createLong(0, 1, 1, Attributes.builder().put("key1", "a").build())); - longCounterStorage.record(createLong(0, 1, 2, Attributes.builder().put("key1", "a").build())); + longCounterStorage.setEpochInformation(0, 1); + longCounterStorage.record(Attributes.builder().put("key1", "a").build(), 1); + longCounterStorage.record(Attributes.builder().put("key1", "a").build(), 2); assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -227,9 +256,7 @@ void record_DuplicateAttributes(MemoryMode memoryMode) { sum -> sum.hasPointsSatisfying( point -> - point.hasValue(1).hasAttributes(attributeEntry("key1", "a"))))); - logs.assertContains( - "Instrument long-counter has recorded multiple values for the same attributes: {key1=\"a\"}"); + point.hasValue(3).hasAttributes(attributeEntry("key1", "a"))))); } @ParameterizedTest @@ -238,7 +265,8 @@ void collect_CumulativeReportsCumulativeObservations(MemoryMode memoryMode) { setup(memoryMode); // Record measurement and collect at time 10 - longCounterStorage.record(createLong(0, 10, 3, Attributes.empty())); + longCounterStorage.setEpochInformation(0, 10); + longCounterStorage.record(Attributes.empty(), 3); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -253,9 +281,9 @@ void collect_CumulativeReportsCumulativeObservations(MemoryMode memoryMode) { registeredReader.setLastCollectEpochNanos(10); // Record measurements and collect at time 30 - longCounterStorage.record(createLong(0, 30, 3, Attributes.empty())); - longCounterStorage.record( - createLong(0, 30, 6, Attributes.builder().put("key", "value1").build())); + longCounterStorage.setEpochInformation(0, 30); + longCounterStorage.record(Attributes.empty(), 3); + longCounterStorage.record(Attributes.builder().put("key", "value1").build(), 6); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -276,9 +304,9 @@ void collect_CumulativeReportsCumulativeObservations(MemoryMode memoryMode) { registeredReader.setLastCollectEpochNanos(30); // Record measurement and collect at time 35 - longCounterStorage.record(createLong(0, 35, 4, Attributes.empty())); - longCounterStorage.record( - createLong(0, 35, 5, Attributes.builder().put("key", "value2").build())); + longCounterStorage.setEpochInformation(0, 35); + longCounterStorage.record(Attributes.empty(), 4); + longCounterStorage.record(Attributes.builder().put("key", "value2").build(), 5); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -317,7 +345,8 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { Advice.empty())); // Record measurement and collect at time 10 - longCounterStorage.record(createLong(0, 10, 3, Attributes.empty())); + longCounterStorage.setEpochInformation(0, 10); + longCounterStorage.record(Attributes.empty(), 3); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -332,9 +361,9 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { registeredReader.setLastCollectEpochNanos(10); // Record measurement and collect at time 30 - longCounterStorage.record(createLong(0, 30, 3, Attributes.empty())); - longCounterStorage.record( - createLong(0, 30, 6, Attributes.builder().put("key", "value1").build())); + longCounterStorage.setEpochInformation(0, 30); + longCounterStorage.record(Attributes.empty(), 3); + longCounterStorage.record(Attributes.builder().put("key", "value1").build(), 6); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -355,9 +384,9 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { registeredReader.setLastCollectEpochNanos(30); // Record measurement and collect at time 35 - longCounterStorage.record(createLong(0, 35, 4, Attributes.empty())); - longCounterStorage.record( - createLong(0, 35, 5, Attributes.builder().put("key", "value2").build())); + longCounterStorage.setEpochInformation(0, 35); + longCounterStorage.record(Attributes.empty(), 4); + longCounterStorage.record(Attributes.builder().put("key", "value2").build(), 5); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -381,9 +410,10 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { void collect_reusableData_reusedObjectsAreReturnedOnSecondCall() { setup(REUSABLE_DATA); - longCounterStorage.record(createLong(0, 1, 1, Attributes.builder().put("key", "a").build())); - longCounterStorage.record(createLong(0, 1, 2, Attributes.builder().put("key", "b").build())); - longCounterStorage.record(createLong(0, 1, 3, Attributes.builder().put("key", "c").build())); + longCounterStorage.setEpochInformation(0, 1); + longCounterStorage.record(Attributes.builder().put("key", "a").build(), 1); + longCounterStorage.record(Attributes.builder().put("key", "b").build(), 2); + longCounterStorage.record(Attributes.builder().put("key", "c").build(), 3); MetricData firstCollectMetricData = longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java index a8b424dfcd5..b6bc13a858c 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java @@ -5,11 +5,11 @@ package io.opentelemetry.sdk.metrics.internal.state; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createDouble; -import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createLong; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -146,10 +146,12 @@ void invokeCallback_Double() { callbackRegistration.invokeCallback(registeredReader, 0, 1); assertThat(counter.get()).isEqualTo(1.1); - verify(storage1) - .record(createDouble(0, 1, 1.1, Attributes.builder().put("key", "val").build())); - verify(storage2, never()).record(any()); - verify(storage3, never()).record(any()); + verify(storage1).setEpochInformation(0, 1); + verify(storage1).record(Attributes.builder().put("key", "val").build(), 1.1); + verify(storage2, never()).setEpochInformation(anyLong(), anyLong()); + verify(storage2, never()).record(any(), anyDouble()); + verify(storage3, never()).setEpochInformation(anyLong(), anyLong()); + verify(storage3, never()).record(any(), anyDouble()); } @Test @@ -165,9 +167,12 @@ void invokeCallback_Long() { callbackRegistration.invokeCallback(registeredReader, 0, 1); assertThat(counter.get()).isEqualTo(1); - verify(storage1, never()).record(any()); - verify(storage2).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); - verify(storage3).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); + verify(storage1, never()).setEpochInformation(anyLong(), anyLong()); + verify(storage1, never()).record(any(), anyLong()); + verify(storage2).setEpochInformation(0, 1); + verify(storage2).record(Attributes.builder().put("key", "val").build(), 1); + verify(storage3).setEpochInformation(0, 1); + verify(storage3).record(Attributes.builder().put("key", "val").build(), 1); } @Test @@ -188,10 +193,12 @@ void invokeCallback_MultipleMeasurements() { assertThat(doubleCounter.get()).isEqualTo(1.1); assertThat(longCounter.get()).isEqualTo(1); - verify(storage1) - .record(createDouble(0, 1, 1.1, Attributes.builder().put("key", "val").build())); - verify(storage2).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); - verify(storage3).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); + verify(storage1).setEpochInformation(0, 1); + verify(storage1).record(Attributes.builder().put("key", "val").build(), 1.1); + verify(storage2).setEpochInformation(0, 1); + verify(storage2).record(Attributes.builder().put("key", "val").build(), 1); + verify(storage3).setEpochInformation(0, 1); + verify(storage3).record(Attributes.builder().put("key", "val").build(), 1); } @Test @@ -223,9 +230,12 @@ void invokeCallback_MultipleMeasurements_ThrowsException() { callbackRegistration.invokeCallback(registeredReader, 0, 1); - verify(storage1, never()).record(any()); - verify(storage2, never()).record(any()); - verify(storage3, never()).record(any()); + verify(storage1, never()).record(any(), anyLong()); + verify(storage1, never()).record(any(), anyDouble()); + verify(storage2, never()).record(any(), anyLong()); + verify(storage2, never()).record(any(), anyDouble()); + verify(storage3, never()).record(any(), anyLong()); + verify(storage3, never()).record(any(), anyDouble()); logs.assertContains("An exception occurred invoking callback"); } @@ -240,9 +250,12 @@ void invokeCallback_SingleMeasurement_ThrowsException() { callbackRegistration.invokeCallback(registeredReader, 0, 1); - verify(storage1, never()).record(any()); - verify(storage2, never()).record(any()); - verify(storage3, never()).record(any()); + verify(storage1, never()).record(any(), anyLong()); + verify(storage1, never()).record(any(), anyDouble()); + verify(storage2, never()).record(any(), anyLong()); + verify(storage2, never()).record(any(), anyDouble()); + verify(storage3, never()).record(any(), anyLong()); + verify(storage3, never()).record(any(), anyDouble()); logs.assertContains("An exception occurred invoking callback"); } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java index aaef2c841fe..b21797d9756 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java @@ -6,15 +6,16 @@ package io.opentelemetry.sdk.metrics.internal.state; import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.github.netmikey.logunit.api.LogCapturer; +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.common.export.MemoryMode; @@ -28,7 +29,6 @@ import java.util.Arrays; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.mockito.ArgumentCaptor; import org.slf4j.event.Level; @SuppressWarnings("rawtypes") @@ -39,11 +39,10 @@ class SdkObservableMeasurementTest { LogCapturer.create().captureForLogger(SdkObservableMeasurement.class.getName(), Level.DEBUG); private AsynchronousMetricStorage mockAsyncStorage1; + private AsynchronousMetricStorage mockAsyncStorage2; private RegisteredReader registeredReader1; private SdkObservableMeasurement sdkObservableMeasurement; - private ArgumentCaptor measurementArgumentCaptor; - @SuppressWarnings("unchecked") private void setup(MemoryMode memoryMode) { InstrumentationScopeInfo instrumentationScopeInfo = InstrumentationScopeInfo.builder("test-scope").build(); @@ -66,10 +65,9 @@ private void setup(MemoryMode memoryMode) { InMemoryMetricReader reader2 = InMemoryMetricReader.builder().setMemoryMode(memoryMode).build(); RegisteredReader registeredReader2 = RegisteredReader.create(reader2, ViewRegistry.create()); - measurementArgumentCaptor = ArgumentCaptor.forClass(Measurement.class); mockAsyncStorage1 = mock(AsynchronousMetricStorage.class); when(mockAsyncStorage1.getRegisteredReader()).thenReturn(registeredReader1); - AsynchronousMetricStorage mockAsyncStorage2 = mock(AsynchronousMetricStorage.class); + mockAsyncStorage2 = mock(AsynchronousMetricStorage.class); when(mockAsyncStorage2.getRegisteredReader()).thenReturn(registeredReader2); sdkObservableMeasurement = @@ -79,21 +77,29 @@ private void setup(MemoryMode memoryMode) { Arrays.asList(mockAsyncStorage1, mockAsyncStorage2)); } + void setupAndSetActiveReader(MemoryMode memoryMode) { + setup(memoryMode); + sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + } + @Test - void recordLong_ImmutableData() { + void setActiveReader_SetsEpochInformation() { setup(MemoryMode.IMMUTABLE_DATA); sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + verify(mockAsyncStorage1).setEpochInformation(0, 10); + verify(mockAsyncStorage2).getRegisteredReader(); + verifyNoMoreInteractions(mockAsyncStorage2); + } + + @Test + void recordLong_ImmutableData() { + setupAndSetActiveReader(MemoryMode.IMMUTABLE_DATA); + try { sdkObservableMeasurement.record(5); - - verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); - Measurement passedMeasurement = measurementArgumentCaptor.getValue(); - assertThat(passedMeasurement).isInstanceOf(ImmutableMeasurement.class); - assertThat(passedMeasurement.longValue()).isEqualTo(5); - assertThat(passedMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(passedMeasurement.epochNanos()).isEqualTo(10); + verify(mockAsyncStorage1).record(Attributes.empty(), 5); } finally { sdkObservableMeasurement.unsetActiveReader(); } @@ -101,19 +107,11 @@ void recordLong_ImmutableData() { @Test void recordDouble_ImmutableData() { - setup(MemoryMode.IMMUTABLE_DATA); - - sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + setupAndSetActiveReader(MemoryMode.IMMUTABLE_DATA); try { sdkObservableMeasurement.record(4.3); - - verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); - Measurement passedMeasurement = measurementArgumentCaptor.getValue(); - assertThat(passedMeasurement).isInstanceOf(ImmutableMeasurement.class); - assertThat(passedMeasurement.doubleValue()).isEqualTo(4.3); - assertThat(passedMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(passedMeasurement.epochNanos()).isEqualTo(10); + verify(mockAsyncStorage1).record(Attributes.empty(), 4.3); } finally { sdkObservableMeasurement.unsetActiveReader(); } @@ -121,31 +119,14 @@ void recordDouble_ImmutableData() { @Test void recordDouble_ReusableData() { - setup(MemoryMode.REUSABLE_DATA); - - sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + setupAndSetActiveReader(MemoryMode.REUSABLE_DATA); try { sdkObservableMeasurement.record(4.3); - - verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); - Measurement firstMeasurement = measurementArgumentCaptor.getValue(); - assertThat(firstMeasurement).isInstanceOf(MutableMeasurement.class); - assertThat(firstMeasurement.doubleValue()).isEqualTo(4.3); - assertThat(firstMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(firstMeasurement.epochNanos()).isEqualTo(10); + verify(mockAsyncStorage1).record(Attributes.empty(), 4.3); sdkObservableMeasurement.record(5.3); - - verify(mockAsyncStorage1, times(2)).record(measurementArgumentCaptor.capture()); - Measurement secondMeasurement = measurementArgumentCaptor.getValue(); - assertThat(secondMeasurement).isInstanceOf(MutableMeasurement.class); - assertThat(secondMeasurement.doubleValue()).isEqualTo(5.3); - assertThat(secondMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(secondMeasurement.epochNanos()).isEqualTo(10); - - // LeasedMeasurement should be re-used - assertThat(secondMeasurement).isSameAs(firstMeasurement); + verify(mockAsyncStorage1).record(Attributes.empty(), 5.3); } finally { sdkObservableMeasurement.unsetActiveReader(); } @@ -153,31 +134,14 @@ void recordDouble_ReusableData() { @Test void recordLong_ReusableData() { - setup(MemoryMode.REUSABLE_DATA); - - sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + setupAndSetActiveReader(MemoryMode.REUSABLE_DATA); try { sdkObservableMeasurement.record(2); - - verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); - Measurement firstMeasurement = measurementArgumentCaptor.getValue(); - assertThat(firstMeasurement).isInstanceOf(MutableMeasurement.class); - assertThat(firstMeasurement.longValue()).isEqualTo(2); - assertThat(firstMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(firstMeasurement.epochNanos()).isEqualTo(10); + verify(mockAsyncStorage1).record(Attributes.empty(), 2); sdkObservableMeasurement.record(6); - - verify(mockAsyncStorage1, times(2)).record(measurementArgumentCaptor.capture()); - Measurement secondMeasurement = measurementArgumentCaptor.getValue(); - assertThat(secondMeasurement).isInstanceOf(MutableMeasurement.class); - assertThat(secondMeasurement.longValue()).isEqualTo(6); - assertThat(secondMeasurement.startEpochNanos()).isEqualTo(0); - assertThat(secondMeasurement.epochNanos()).isEqualTo(10); - - // LeasedMeasurement should be re-used - assertThat(secondMeasurement).isSameAs(firstMeasurement); + verify(mockAsyncStorage1).record(Attributes.empty(), 6); } finally { sdkObservableMeasurement.unsetActiveReader(); } @@ -186,11 +150,10 @@ void recordLong_ReusableData() { @Test @SuppressLogger(SdkObservableMeasurement.class) void recordDouble_NaN() { - setup(MemoryMode.REUSABLE_DATA); - sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); - sdkObservableMeasurement.record(Double.NaN); + setupAndSetActiveReader(MemoryMode.REUSABLE_DATA); - verify(mockAsyncStorage1, never()).record(any()); + sdkObservableMeasurement.record(Double.NaN); + verify(mockAsyncStorage1, never()).record(any(), anyDouble()); logs.assertContains( "Instrument testCounter has recorded measurement Not-a-Number (NaN) value with attributes {}. Dropping measurement."); }