Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 48 additions & 30 deletions xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.grpc.util.ForwardingSubchannel;
import io.grpc.util.MultiChildLoadBalancer;
import io.grpc.xds.internal.MetricReportUtils;
import io.grpc.xds.internal.MetricReportUtils.ParsedMetricName;
import io.grpc.xds.orca.OrcaOobUtil;
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
import io.grpc.xds.orca.OrcaPerRequestUtil;
Expand Down Expand Up @@ -239,7 +240,7 @@ protected void updateOverallBalancingState() {
private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
WeightedRoundRobinPicker picker = new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence,
config.metricNamesForComputingUtilization);
config.parsedMetricNamesForComputingUtilization);
updateWeight(picker);
return picker;
}
Expand Down Expand Up @@ -329,15 +330,15 @@ public void addSubchannel(WrrSubchannel wrrSubchannel) {
}

public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty,
ImmutableList<String> metricNamesForComputingUtilization) {
ImmutableList<ParsedMetricName> parsedMetricNamesForComputingUtilization) {
if (orcaReportListener != null
&& orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty
&& orcaReportListener.metricNamesForComputingUtilization
.equals(metricNamesForComputingUtilization)) {
&& orcaReportListener.parsedMetricNamesForComputingUtilization
.equals(parsedMetricNamesForComputingUtilization)) {
return orcaReportListener;
}
orcaReportListener =
new OrcaReportListener(errorUtilizationPenalty, metricNamesForComputingUtilization);
new OrcaReportListener(errorUtilizationPenalty, parsedMetricNamesForComputingUtilization);
return orcaReportListener;
}

Expand All @@ -362,17 +363,17 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne

final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
private final float errorUtilizationPenalty;
private final ImmutableList<String> metricNamesForComputingUtilization;
private final ImmutableList<ParsedMetricName> parsedMetricNamesForComputingUtilization;

OrcaReportListener(float errorUtilizationPenalty,
ImmutableList<String> metricNamesForComputingUtilization) {
ImmutableList<ParsedMetricName> parsedMetricNamesForComputingUtilization) {
this.errorUtilizationPenalty = errorUtilizationPenalty;
this.metricNamesForComputingUtilization = metricNamesForComputingUtilization;
this.parsedMetricNamesForComputingUtilization = parsedMetricNamesForComputingUtilization;
}

@Override
public void onLoadReport(MetricReport report) {
double utilization = getUtilization(report, metricNamesForComputingUtilization);
double utilization = getUtilization(report);

double newWeight = 0;
if (utilization > 0 && report.getQps() > 0) {
Expand All @@ -398,8 +399,8 @@ public void onLoadReport(MetricReport report) {
* if application utilization is > 0, it is returned. If neither are present, the CPU
* utilization is returned.
*/
private double getUtilization(MetricReport report, ImmutableList<String> metricNames) {
OptionalDouble customUtil = getCustomMetricUtilization(report, metricNames);
private double getUtilization(MetricReport report) {
OptionalDouble customUtil = getCustomMetricUtilization(report);
if (customUtil.isPresent()) {
return customUtil.getAsDouble();
}
Expand All @@ -411,19 +412,23 @@ private double getUtilization(MetricReport report, ImmutableList<String> metricN
}

/**
* Returns the maximum utilization value among the specified metric names.
* Returns the maximum utilization value among the parsed metric names.
* Returns OptionalDouble.empty() if NONE of the specified metrics are present in the report,
* or if all present metrics are NaN.
* Returns OptionalDouble.of(maxUtil) if at least one non-NaN metric is present.
* or if all present metrics are NaN or non positive.
*/
private OptionalDouble getCustomMetricUtilization(MetricReport report,
ImmutableList<String> metricNames) {
return metricNames.stream()
.map(name -> MetricReportUtils.getMetric(report, name))
.filter(OptionalDouble::isPresent)
.mapToDouble(OptionalDouble::getAsDouble)
.filter(d -> !Double.isNaN(d) && d > 0)
.max();
private OptionalDouble getCustomMetricUtilization(MetricReport report) {
OptionalDouble max = OptionalDouble.empty();
for (int i = 0; i < parsedMetricNamesForComputingUtilization.size(); i++) {
OptionalDouble opt = MetricReportUtils.getMetricValue(report,
parsedMetricNamesForComputingUtilization.get(i));
if (opt.isPresent()) {
double d = opt.getAsDouble();
if (!Double.isNaN(d) && d > 0 && (!max.isPresent() || d > max.getAsDouble())) {
max = opt;
}
}
}
return max;
}
Comment thread
sauravzg marked this conversation as resolved.
}
}
Expand All @@ -446,7 +451,7 @@ private void createAndApplyOrcaListeners() {
if (config.enableOobLoadReport) {
OrcaOobUtil.setListener(weightedSubchannel,
wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty,
config.metricNamesForComputingUtilization),
config.parsedMetricNamesForComputingUtilization),
OrcaOobUtil.OrcaReportingConfig.newBuilder()
.setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS).build());
} else {
Expand Down Expand Up @@ -516,7 +521,7 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {

WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
float errorUtilizationPenalty, AtomicInteger sequence,
ImmutableList<String> metricNamesForComputingUtilization) {
ImmutableList<ParsedMetricName> parsedMetricNamesForComputingUtilization) {
checkNotNull(children, "children");
Preconditions.checkArgument(!children.isEmpty(), "empty child list");
this.children = children;
Expand All @@ -526,7 +531,7 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
WeightedChildLbState wChild = (WeightedChildLbState) child;
pickers.add(wChild.getCurrentPicker());
reportListeners.add(wChild.getOrCreateOrcaListener(errorUtilizationPenalty,
metricNamesForComputingUtilization));
parsedMetricNamesForComputingUtilization));
}
this.pickers = pickers;
this.reportListeners = reportListeners;
Expand Down Expand Up @@ -767,7 +772,7 @@ static final class WeightedRoundRobinLoadBalancerConfig {
final long oobReportingPeriodNanos;
final long weightUpdatePeriodNanos;
final float errorUtilizationPenalty;
final ImmutableList<String> metricNamesForComputingUtilization;
final ImmutableList<ParsedMetricName> parsedMetricNamesForComputingUtilization;

public static Builder newBuilder() {
return new Builder();
Expand All @@ -783,7 +788,20 @@ private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
this.oobReportingPeriodNanos = oobReportingPeriodNanos;
this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
this.errorUtilizationPenalty = errorUtilizationPenalty;
this.metricNamesForComputingUtilization = metricNamesForComputingUtilization;

ImmutableList.Builder<ParsedMetricName> builder = ImmutableList.builder();
if (metricNamesForComputingUtilization != null) {
for (int i = 0; i < metricNamesForComputingUtilization.size(); i++) {
String metricName = metricNamesForComputingUtilization.get(i);
ParsedMetricName parsed = MetricReportUtils.ParsedMetricName.parse(metricName);
if (parsed.getMetricType() != MetricReportUtils.MetricType.INVALID) {
builder.add(parsed);
} else {
log.log(Level.FINE, "Invalid custom metric name configured and ignored: " + metricName);
}
}
}
this.parsedMetricNamesForComputingUtilization = builder.build();
}

@Override
Expand All @@ -799,15 +817,15 @@ public boolean equals(Object o) {
&& this.weightUpdatePeriodNanos == that.weightUpdatePeriodNanos
// Float.compare considers NaNs equal
&& Float.compare(this.errorUtilizationPenalty, that.errorUtilizationPenalty) == 0
&& Objects.equals(this.metricNamesForComputingUtilization,
that.metricNamesForComputingUtilization);
&& Objects.equals(this.parsedMetricNamesForComputingUtilization,
that.parsedMetricNamesForComputingUtilization);
}

@Override
public int hashCode() {
return Objects.hash(blackoutPeriodNanos, weightExpirationPeriodNanos, enableOobLoadReport,
oobReportingPeriodNanos, weightUpdatePeriodNanos, errorUtilizationPenalty,
metricNamesForComputingUtilization);
parsedMetricNamesForComputingUtilization);
}

static final class Builder {
Expand Down
116 changes: 84 additions & 32 deletions xds/src/main/java/io/grpc/xds/internal/MetricReportUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,104 @@

package io.grpc.xds.internal;

import com.google.auto.value.AutoValue;
import io.grpc.services.MetricReport;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;


/**
* Utilities for parsing and resolving metrics from {@link MetricReport}.
*/
public final class MetricReportUtils {

private MetricReportUtils() {}

public enum MetricType {
CPU_UTILIZATION,
APPLICATION_UTILIZATION,
MEMORY_UTILIZATION,
UTILIZATION,
NAMED_METRICS,
INVALID
}

@AutoValue
public abstract static class ParsedMetricName {
public abstract MetricType getMetricType();

public abstract Optional<String> getKey();

public static ParsedMetricName create(MetricType metricType, Optional<String> key) {
return new AutoValue_MetricReportUtils_ParsedMetricName(metricType, key);
}

/**
* Pre-parses a custom metric name into a {@link ParsedMetricName}.
*
* @param name The custom metric name to parse.
* @return The parsed metric name.
*/
public static ParsedMetricName parse(String name) {
Comment thread
sauravzg marked this conversation as resolved.
if (name.equals("cpu_utilization")) {
return create(MetricType.CPU_UTILIZATION, Optional.empty());
}
if (name.equals("application_utilization")) {
return create(MetricType.APPLICATION_UTILIZATION, Optional.empty());
}
if (name.equals("mem_utilization")) {
return create(MetricType.MEMORY_UTILIZATION, Optional.empty());
}
if (name.startsWith("utilization.")) {
return create(MetricType.UTILIZATION, Optional.of(name.substring("utilization.".length())));
}
if (name.startsWith("named_metrics.")) {
return create(MetricType.NAMED_METRICS,
Optional.of(name.substring("named_metrics.".length())));
}
return create(MetricType.INVALID, Optional.empty());
}

}

/**
* Resolves a metric value from the report based on the given metric name.
* The logic checks for specific prefixes to determine where to look up the metric:
* <ul>
* <li>"cpu_utilization" -> getCpuUtilization()</li>
* <li>"application_utilization" -> getApplicationUtilization()</li>
* <li>"mem_utilization" -> getMemoryUtilization()</li>
* <li>"utilization." -> lookup in utilizationMetrics</li>
* <li>"named_metrics." -> lookup in namedMetrics</li>
* </ul>
* Resolves a custom metric value for `parsedMetric`
* Returns OptionalDouble.empty() if the metric is absent or invalid.
*
* @param report The metric report to query.
* @param metricName The name of the custom metric to look up.
* @return The value of the metric if found, or empty if not found.
* @param parsedMetric The parsed metric to lookup.
* @return The metric value wrapped in an OptionalDouble, or empty if absent.
*/
public static OptionalDouble getMetric(MetricReport report, String metricName) {
if (metricName.equals("cpu_utilization")) {
return OptionalDouble.of(report.getCpuUtilization());
} else if (metricName.equals("application_utilization")) {
return OptionalDouble.of(report.getApplicationUtilization());
} else if (metricName.equals("mem_utilization")) {
return OptionalDouble.of(report.getMemoryUtilization());
} else if (metricName.startsWith("utilization.")) {
Map<String, Double> map = report.getUtilizationMetrics();
Double val = map.get(metricName.substring("utilization.".length()));
if (val != null) {
return OptionalDouble.of(val);
}
} else if (metricName.startsWith("named_metrics.")) {
Map<String, Double> map = report.getNamedMetrics();
Double val = map.get(metricName.substring("named_metrics.".length()));
if (val != null) {
return OptionalDouble.of(val);
}

public static OptionalDouble getMetricValue(MetricReport report, ParsedMetricName parsedMetric) {
switch (parsedMetric.getMetricType()) {
case CPU_UTILIZATION:
return OptionalDouble.of(report.getCpuUtilization());
case APPLICATION_UTILIZATION:
return OptionalDouble.of(report.getApplicationUtilization());
case MEMORY_UTILIZATION:
return OptionalDouble.of(report.getMemoryUtilization());
case UTILIZATION:
if (parsedMetric.getKey().isPresent()) {
String key = parsedMetric.getKey().get();
Double val = report.getUtilizationMetrics().get(key);
if (val != null) {
return OptionalDouble.of(val);
}
}
return OptionalDouble.empty();
case NAMED_METRICS:
if (parsedMetric.getKey().isPresent()) {
String key = parsedMetric.getKey().get();
Double val = report.getNamedMetrics().get(key);
if (val != null) {
return OptionalDouble.of(val);
}
}
return OptionalDouble.empty();
case INVALID:
default:
return OptionalDouble.empty();
}
return OptionalDouble.empty();
}
Comment thread
sauravzg marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.grpc.internal.FakeClock;
import io.grpc.internal.JsonParser;
import io.grpc.xds.WeightedRoundRobinLoadBalancer.WeightedRoundRobinLoadBalancerConfig;
import io.grpc.xds.internal.MetricReportUtils.ParsedMetricName;
import java.io.IOException;
import java.util.Map;
import org.junit.Test;
Expand Down Expand Up @@ -112,16 +113,19 @@ public void parseLoadBalancingConfigDefaultValues() throws IOException {
}

@Test
public void parseLoadBalancingConfigCustomMetrics() throws IOException {
public void parseLoadBalancingConfigCustomMetricsIgnoresInvalid() throws IOException {
System.setProperty("GRPC_EXPERIMENTAL_WRR_CUSTOM_METRICS", "true");
try {
String lbConfig = "{\"metricNamesForComputingUtilization\" : [\"foo\", \"bar\"]}";
String lbConfig =
"{\"metricNamesForComputingUtilization\" : "
+ "[\"utilization.foo\", \"invalid_name\", \"named_metrics.bar\"]}";
ConfigOrError configOrError = provider.parseLoadBalancingPolicyConfig(
parseJsonObject(lbConfig));
assertThat(configOrError.getConfig()).isNotNull();
WeightedRoundRobinLoadBalancerConfig config =
(WeightedRoundRobinLoadBalancerConfig) configOrError.getConfig();
assertThat(config.metricNamesForComputingUtilization).containsExactly("foo", "bar");
assertThat(config.parsedMetricNamesForComputingUtilization).containsExactly(
ParsedMetricName.parse("utilization.foo"), ParsedMetricName.parse("named_metrics.bar"));
} finally {
System.clearProperty("GRPC_EXPERIMENTAL_WRR_CUSTOM_METRICS");
}
Expand Down
Loading
Loading