Skip to content

Spatial aggregation for async instruments with filtering views #7264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6816f91
wip
fandreuz Apr 9, 2025
ed783f9
fix tests
fandreuz Apr 9, 2025
0c63775
keep ref value locally
fandreuz Apr 9, 2025
ab8bd4d
comments
fandreuz Apr 9, 2025
e942dc2
review comment
fandreuz Apr 15, 2025
fa58b02
remove log statement
fandreuz Apr 16, 2025
561e479
reusable currentPoints
fandreuz Apr 16, 2025
8bde0f4
comments and fix test
fandreuz Apr 16, 2025
4fe5834
Merge branch 'main' into 4901-aggregate-async-instruments
fandreuz Apr 16, 2025
6e96eda
set epoch info once
fandreuz Apr 17, 2025
4b3f874
add test for setActiveReader
fandreuz Apr 17, 2025
4254d30
reduce allocation in REUSABLE_DATA mode
fandreuz Apr 22, 2025
0dca607
handle overflow
fandreuz Apr 22, 2025
9dcaf3b
dont delete all of them
fandreuz Apr 22, 2025
1609f5f
gt
fandreuz Apr 22, 2025
944b3ef
ops
fandreuz Apr 23, 2025
7115187
reusable handles
fandreuz Apr 23, 2025
4e566d7
pool for handles
fandreuz Apr 23, 2025
a5d8b67
new test
fandreuz Apr 23, 2025
f831572
save some alloc
fandreuz Apr 23, 2025
1bc1d78
process only when needed
fandreuz Apr 23, 2025
63f2471
one less lambda
fandreuz Apr 23, 2025
29f93c5
reduce allocs
fandreuz Apr 23, 2025
cbe0784
no useless allocs
fandreuz Apr 24, 2025
4613e7c
new test
fandreuz Apr 24, 2025
1371683
fix impl
fandreuz Apr 24, 2025
1f9a4b5
atomicreference+atomiclong
fandreuz Apr 24, 2025
53ad5dd
trigger
fandreuz Apr 27, 2025
f6f1db7
Merge remote-tracking branch 'origin/main' into 4901-aggregate-async-…
fandreuz Apr 27, 2025
44b9825
Delete unused Measurement and related code
jack-berg May 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.Measurement;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import javax.annotation.concurrent.Immutable;
Expand Down Expand Up @@ -68,24 +67,6 @@ default void diffInPlace(T previousCumulativeReusable, T currentCumulative) {
throw new UnsupportedOperationException("This aggregator does not support diffInPlace.");
}

/**
* Return a new point representing the measurement.
*
* <p>Aggregators MUST implement diff if it can be used with asynchronous instruments.
*/
default T toPoint(Measurement measurement) {
throw new UnsupportedOperationException("This aggregator does not support toPoint.");
}

/**
* Resets {@code reusablePoint} to represent the {@code measurement}.
*
* <p>Aggregators MUST implement diff if it can be used with asynchronous instruments.
*/
default void toPoint(Measurement measurement, T reusablePoint) {
throw new UnsupportedOperationException("This aggregator does not support toPoint.");
}

/** Creates a new reusable point. */
default T createReusablePoint() {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import io.opentelemetry.sdk.metrics.internal.data.MutableDoublePointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.Measurement;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -66,25 +66,6 @@ public void diffInPlace(DoublePointData previousReusable, DoublePointData curren
((MutableDoublePointData) previousReusable).set(current);
}

@Override
public DoublePointData toPoint(Measurement measurement) {
return ImmutableDoublePointData.create(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.doubleValue());
}

@Override
public void toPoint(Measurement measurement, DoublePointData reusablePoint) {
((MutableDoublePointData) reusablePoint)
.set(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.doubleValue());
}

@Override
public DoublePointData createReusablePoint() {
return new MutableDoublePointData();
Expand Down Expand Up @@ -114,8 +95,8 @@ public MetricData toMetricData(
}

static final class Handle extends AggregatorHandle<DoublePointData, DoubleExemplarData> {
@Nullable private static final Double DEFAULT_VALUE = null;
private final AtomicReference<Double> current = new AtomicReference<>(DEFAULT_VALUE);
private final AtomicReference<AtomicLong> current = new AtomicReference<>(null);
private final AtomicLong valueBits = new AtomicLong();

// Only used when memoryMode is REUSABLE_DATA
@Nullable private final MutableDoublePointData reusablePoint;
Expand All @@ -136,20 +117,22 @@ protected DoublePointData doAggregateThenMaybeReset(
Attributes attributes,
List<DoubleExemplarData> exemplars,
boolean reset) {
Double value = reset ? this.current.getAndSet(DEFAULT_VALUE) : this.current.get();
AtomicLong valueBits =
Objects.requireNonNull(reset ? this.current.getAndSet(null) : this.current.get());
double value = Double.longBitsToDouble(valueBits.get());
if (reusablePoint != null) {
reusablePoint.set(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
reusablePoint.set(startEpochNanos, epochNanos, attributes, value, exemplars);
return reusablePoint;
} else {
return ImmutableDoublePointData.create(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
startEpochNanos, epochNanos, attributes, value, exemplars);
}
}

@Override
protected void doRecordDouble(double value) {
current.set(value);
valueBits.set(Double.doubleToLongBits(value));
current.compareAndSet(null, valueBits);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.Measurement;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -82,25 +81,6 @@ public void diffInPlace(DoublePointData previousReusablePoint, DoublePointData c
currentPoint.getExemplars());
}

@Override
public DoublePointData toPoint(Measurement measurement) {
return ImmutableDoublePointData.create(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.doubleValue());
}

@Override
public void toPoint(Measurement measurement, DoublePointData reusablePoint) {
((MutableDoublePointData) reusablePoint)
.set(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.doubleValue());
}

@Override
public DoublePointData createReusablePoint() {
return new MutableDoublePointData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.opentelemetry.sdk.metrics.internal.data.MutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.Measurement;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -63,25 +62,6 @@ public void diffInPlace(LongPointData previousReusablePoint, LongPointData curre
((MutableLongPointData) previousReusablePoint).set(currentPoint);
}

@Override
public LongPointData toPoint(Measurement measurement) {
return ImmutableLongPointData.create(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.longValue());
}

@Override
public void toPoint(Measurement measurement, LongPointData reusablePoint) {
((MutableLongPointData) reusablePoint)
.set(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.longValue());
}

@Override
public LongPointData createReusablePoint() {
return new MutableLongPointData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.Measurement;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -75,25 +74,6 @@ public void diffInPlace(LongPointData previousReusablePoint, LongPointData curre
currentPoint.getExemplars());
}

@Override
public LongPointData toPoint(Measurement measurement) {
return ImmutableLongPointData.create(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.longValue());
}

@Override
public void toPoint(Measurement measurement, LongPointData reusablePoint) {
((MutableLongPointData) reusablePoint)
.set(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.longValue());
}

@Override
public LongPointData createReusablePoint() {
return new MutableLongPointData();
Expand Down
Loading
Loading