Skip to content

Commit

Permalink
[Break Change] gGRPC metrics exporter unified the metric value type a…
Browse files Browse the repository at this point in the history
…nd support labeled metrics. (#12156)
  • Loading branch information
wankai123 authored Apr 24, 2024
1 parent 2d6bc69 commit fe2c4b5
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 123 deletions.
2 changes: 1 addition & 1 deletion docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
- `memory_swap_percentage` -> `memory_virtual_memory_percentage`
* Fix/Change UI init setting for Windows Swap -> Virtual Memory
* Fix `Memory Swap Usage`/`Virtual Memory Usage` display with UI init.(Linux/Windows)

* Fix inaccurate APISIX metrics.
* Fix inaccurate MongoDB Metrics.
* Support Apache ActiveMQ server monitoring.
Expand All @@ -117,6 +116,7 @@
* Fix inaccurate Hierarchy of RabbitMQ Server monitoring metrics.
* Fix inaccurate MySQL/MariaDB, Redis, PostgreSQL metrics.
* Support DoubleValue,IntValue,BoolValue in OTEL metrics attributes.
* [Break Change] gGRPC metrics exporter unified the metric value type and support labeled metrics.

#### UI

Expand Down
2 changes: 1 addition & 1 deletion docs/en/setup/backend/exporter.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Return empty list, if you want to export all metrics in the incremental event ty

2. Export implementation.
Stream service. All subscribed metrics will be sent here based on the OAP core schedule. Also, if the OAP is deployed as a cluster,
this method will be called concurrently. For metrics value, you need to follow `#type` to choose `#longValue` or `#doubleValue`.
this method will be called concurrently.

## Kafka Exporter
### Trace Kafka Exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,32 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.DoubleValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataLabel;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.LabeledValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.MultiIntValuesHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
import org.apache.skywalking.oap.server.core.exporter.ExportData;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
import org.apache.skywalking.oap.server.exporter.grpc.EventType;
import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue;
import org.apache.skywalking.oap.server.exporter.grpc.ExportResponse;
import org.apache.skywalking.oap.server.exporter.grpc.KeyValue;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
import org.apache.skywalking.oap.server.exporter.grpc.MetricValue;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
import org.apache.skywalking.oap.server.exporter.grpc.ValueType;
import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus;

@Slf4j
Expand Down Expand Up @@ -132,99 +135,115 @@ public void fetchSubscriptionList() {

@Override
public void consume(List<ExportData> data) {
GRPCStreamStatus status = new GRPCStreamStatus();
StreamObserver<ExportMetricValue> streamObserver =
exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS)
.export(
new StreamObserver<ExportResponse>() {
@Override
public void onNext(
ExportResponse response) {

}

@Override
public void onError(
Throwable throwable) {
status.done();
}

@Override
public void onCompleted() {
status.done();
}
});
AtomicInteger exportNum = new AtomicInteger();
data.forEach(row -> {
ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();

Metrics metrics = row.getMetrics();
if (metrics instanceof LongValueHolder) {
long value = ((LongValueHolder) metrics).getValue();
builder.setLongValue(value);
builder.setType(ValueType.LONG);
} else if (metrics instanceof IntValueHolder) {
long value = ((IntValueHolder) metrics).getValue();
builder.setLongValue(value);
builder.setType(ValueType.LONG);
} else if (metrics instanceof DoubleValueHolder) {
double value = ((DoubleValueHolder) metrics).getValue();
builder.setDoubleValue(value);
builder.setType(ValueType.DOUBLE);
} else if (metrics instanceof MultiIntValuesHolder) {
int[] values = ((MultiIntValuesHolder) metrics).getValues();
for (int value : values) {
builder.addLongValues(value);
if (CollectionUtils.isNotEmpty(data)) {
GRPCStreamStatus status = new GRPCStreamStatus();
StreamObserver<ExportMetricValue> streamObserver =
exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS)
.export(
new StreamObserver<ExportResponse>() {
@Override
public void onNext(
ExportResponse response) {

}

@Override
public void onError(
Throwable throwable) {
log.error("Export metrics to {}:{} fails.",
setting.getGRPCTargetHost(),
setting.getGRPCTargetPort(), throwable
);
status.done();
}

@Override
public void onCompleted() {
status.done();
}
});
AtomicInteger exportNum = new AtomicInteger();

data.forEach(row -> {
ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();

Metrics metrics = row.getMetrics();
if (metrics instanceof LongValueHolder) {
long value = ((LongValueHolder) metrics).getValue();
MetricValue.Builder valueBuilder = MetricValue.newBuilder();
valueBuilder.setLongValue(value);
builder.addMetricValues(valueBuilder);
} else if (metrics instanceof IntValueHolder) {
long value = ((IntValueHolder) metrics).getValue();
MetricValue.Builder valueBuilder = MetricValue.newBuilder();
valueBuilder.setLongValue(value);
builder.addMetricValues(valueBuilder);
} else if (metrics instanceof LabeledValueHolder) {
DataTable values = ((LabeledValueHolder) metrics).getValue();
values.keys().forEach(key -> {
MetricValue.Builder valueBuilder = MetricValue.newBuilder();
valueBuilder.setLongValue(values.get(key));
DataLabel labels = new DataLabel();
labels.put(key);
labels.forEach((labelName, LabelValue) -> {
KeyValue.Builder kvBuilder = KeyValue.newBuilder();
kvBuilder.setKey(labelName);
kvBuilder.setValue(LabelValue);
valueBuilder.addLabels(kvBuilder);
});
builder.addMetricValues(valueBuilder);
});
} else {
return;
}
builder.setType(ValueType.MULTI_LONG);
} else {
return;
}

MetricsMetaInfo meta = row.getMeta();
builder.setMetricName(meta.getMetricsName());
builder.setEventType(
ExportEvent.EventType.INCREMENT.equals(row.getEventType()) ? EventType.INCREMENT : EventType.TOTAL);
String entityName = getEntityName(meta);
if (entityName == null) {
return;
}
builder.setEntityName(entityName);
builder.setEntityId(meta.getId());
MetricsMetaInfo meta = row.getMeta();
builder.setMetricName(meta.getMetricsName());
builder.setEventType(
ExportEvent.EventType.INCREMENT.equals(row.getEventType()) ? EventType.INCREMENT : EventType.TOTAL);
String entityName = getEntityName(meta);
if (entityName == null) {
return;
}
builder.setEntityName(entityName);
builder.setEntityId(meta.getId());

builder.setTimeBucket(metrics.getTimeBucket());
builder.setTimeBucket(metrics.getTimeBucket());

streamObserver.onNext(builder.build());
exportNum.getAndIncrement();
});
streamObserver.onNext(builder.build());
exportNum.getAndIncrement();
});

streamObserver.onCompleted();
streamObserver.onCompleted();

long sleepTime = 0;
long cycle = 100L;
long sleepTime = 0;
long cycle = 100L;

//For memory safe of oap, we must wait for the peer confirmation.
while (!status.isDone()) {
try {
sleepTime += cycle;
Thread.sleep(cycle);
} catch (InterruptedException e) {
}
//For memory safe of oap, we must wait for the peer confirmation.
while (!status.isDone()) {
try {
sleepTime += cycle;
Thread.sleep(cycle);
} catch (InterruptedException e) {
}

if (sleepTime > 2000L) {
log.warn(
"Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getGRPCTargetHost(),
setting
.getGRPCTargetPort(), sleepTime
);
cycle = 2000L;
if (sleepTime > 2000L) {
log.warn(
"Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(),
setting.getGRPCTargetHost(),
setting
.getGRPCTargetPort(), sleepTime
);
cycle = 2000L;
}
}
}

log.debug(
"Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getGRPCTargetHost(), setting
.getGRPCTargetPort(), sleepTime);

log.debug(
"Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getGRPCTargetHost(),
setting
.getGRPCTargetPort(), sleepTime
);
}
fetchSubscriptionList();
}

Expand Down
26 changes: 14 additions & 12 deletions oap-server/exporter/src/main/proto/metric-exporter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ message ExportMetricValue {
string metricName = 1;
string entityName = 2;
string entityId = 3;
ValueType type = 4;
int64 timeBucket = 5;
int64 longValue = 6;
double doubleValue = 7;
repeated int64 longValues = 8;
EventType eventType = 9;
int64 timeBucket = 4;
EventType eventType = 5;
repeated MetricValue metricValues = 6;
}

message SubscriptionsResp {
Expand All @@ -51,12 +48,6 @@ message SubscriptionMetric {
EventType eventType = 2;
}

enum ValueType {
LONG = 0;
DOUBLE = 1;
MULTI_LONG = 2;
}

enum EventType {
// The metrics aggregated in this bulk, not include the existing persistent data.
INCREMENT = 0;
Expand All @@ -70,3 +61,14 @@ message SubscriptionReq {

message ExportResponse {
}

message MetricValue {
// Could be empty, if it is not a labeled metric.
repeated KeyValue labels = 1;
int64 longValue = 2;
}

message KeyValue {
string key = 1;
string value = 2;
}
Loading

0 comments on commit fe2c4b5

Please sign in to comment.