diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index a006edd7e6479..05fcffb43c6aa 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -319,6 +319,10 @@ private CompletableFuture performRecordPruning(TopicPartition tp) { fut.completeExceptionally(exp); return; } + shareCoordinatorMetrics.recordPrune( + off, + tp + ); fut.complete(null); // Best effort prevention of issuing duplicate delete calls. lastPrunedOffsets.put(tp, off); diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java index 9076fdcbb8f2b..197b1d76e0b0c 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics; import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard; @@ -46,6 +47,8 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC public static final String SHARE_COORDINATOR_WRITE_SENSOR_NAME = "ShareCoordinatorWrite"; public static final String SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME = "ShareCoordinatorWriteLatency"; + public static final String SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME = "ShareCoordinatorStateTopicPruneSensorName"; + private Map pruneMetrics = new ConcurrentHashMap<>(); /** * Global sensors. These are shared across all metrics shards. @@ -92,6 +95,7 @@ public void close() throws Exception { SHARE_COORDINATOR_WRITE_SENSOR_NAME, SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME ).forEach(metrics::removeSensor); + pruneMetrics.values().forEach(v -> metrics.removeSensor(v.pruneSensor.name())); } @Override @@ -153,4 +157,31 @@ public void record(String sensorName) { globalSensors.get(sensorName).record(); } } + + public void recordPrune(double value, TopicPartition tp) { + pruneMetrics.computeIfAbsent(tp, k -> new ShareGroupPruneMetrics(tp)) + .pruneSensor.record(value); + } + + private class ShareGroupPruneMetrics { + private final Sensor pruneSensor; + + ShareGroupPruneMetrics(TopicPartition tp) { + String sensorNameSuffix = tp.toString(); + Map tags = Map.of( + "topic", tp.topic(), + "partition", Integer.toString(tp.partition()) + ); + + pruneSensor = metrics.sensor(SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME + sensorNameSuffix); + + pruneSensor.add( + metrics.metricName("last-pruned-offset", + METRICS_GROUP, + "The offset at which the share-group state topic was last pruned.", + tags), + new Value() + ); + } + } } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java index e7ace5fd2b923..7f7776cd93ea9 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java @@ -52,6 +52,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -971,11 +972,13 @@ public void testRecordPruningTaskPeriodicityWithAllSuccess() throws Exception { CompletableFuture.completedFuture(Optional.of(11L)) ); + Metrics metrics = new Metrics(); + ShareCoordinatorService service = spy(new ShareCoordinatorService( new LogContext(), ShareCoordinatorTestConfig.testConfig(), runtime, - new ShareCoordinatorMetrics(), + new ShareCoordinatorMetrics(metrics), time, timer, writer @@ -1007,6 +1010,10 @@ public void testRecordPruningTaskPeriodicityWithAllSuccess() throws Exception { verify(writer, times(2)) .deleteRecords(any(), anyLong()); + + checkMetrics(metrics); + checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true); + service.shutdown(); } @@ -1058,11 +1065,13 @@ public void testRecordPruningTaskPeriodicityWithSomeFailures() throws Exception CompletableFuture.completedFuture(Optional.of(21L)) ); + Metrics metrics = new Metrics(); + ShareCoordinatorService service = spy(new ShareCoordinatorService( new LogContext(), ShareCoordinatorTestConfig.testConfig(), runtime, - new ShareCoordinatorMetrics(), + new ShareCoordinatorMetrics(metrics), time, timer, writer @@ -1094,6 +1103,11 @@ public void testRecordPruningTaskPeriodicityWithSomeFailures() throws Exception verify(writer, times(4)) .deleteRecords(any(), anyLong()); + + checkMetrics(metrics); + checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true); + checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1, false); + service.shutdown(); } @@ -1111,11 +1125,13 @@ public void testRecordPruningTaskException() throws Exception { any() )).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())); + Metrics metrics = new Metrics(); + ShareCoordinatorService service = spy(new ShareCoordinatorService( new LogContext(), ShareCoordinatorTestConfig.testConfig(), runtime, - new ShareCoordinatorMetrics(), + new ShareCoordinatorMetrics(metrics), time, timer, writer @@ -1139,6 +1155,10 @@ public void testRecordPruningTaskException() throws Exception { verify(writer, times(0)) .deleteRecords(any(), anyLong()); + + checkMetrics(metrics); + checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, false); + service.shutdown(); } @@ -1156,11 +1176,13 @@ public void testRecordPruningTaskSuccess() throws Exception { any() )).thenReturn(CompletableFuture.completedFuture(Optional.of(20L))); + Metrics metrics = new Metrics(); + ShareCoordinatorService service = spy(new ShareCoordinatorService( new LogContext(), ShareCoordinatorTestConfig.testConfig(), runtime, - new ShareCoordinatorMetrics(), + new ShareCoordinatorMetrics(metrics), time, timer, writer @@ -1184,6 +1206,9 @@ public void testRecordPruningTaskSuccess() throws Exception { verify(writer, times(1)) .deleteRecords(any(), eq(20L)); + + checkMetrics(metrics); + service.shutdown(); } @@ -1201,11 +1226,12 @@ public void testRecordPruningTaskEmptyOffsetReturned() throws Exception { any() )).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + Metrics metrics = new Metrics(); ShareCoordinatorService service = spy(new ShareCoordinatorService( new LogContext(), ShareCoordinatorTestConfig.testConfig(), runtime, - new ShareCoordinatorMetrics(), + new ShareCoordinatorMetrics(metrics), time, timer, writer @@ -1229,6 +1255,10 @@ public void testRecordPruningTaskEmptyOffsetReturned() throws Exception { verify(writer, times(0)) .deleteRecords(any(), anyLong()); + + checkMetrics(metrics); + checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, false); + service.shutdown(); } @@ -1257,11 +1287,12 @@ public void testRecordPruningTaskRepeatedSameOffsetForTopic() throws Exception { CompletableFuture.completedFuture(Optional.of(10L)) ); + Metrics metrics = new Metrics(); ShareCoordinatorService service = spy(new ShareCoordinatorService( new LogContext(), ShareCoordinatorTestConfig.testConfig(), runtime, - new ShareCoordinatorMetrics(), + new ShareCoordinatorMetrics(metrics), time, timer, writer @@ -1293,6 +1324,10 @@ public void testRecordPruningTaskRepeatedSameOffsetForTopic() throws Exception { verify(writer, times(1)) .deleteRecords(any(), anyLong()); + + checkMetrics(metrics); + checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true); + service.shutdown(); } @@ -1325,11 +1360,13 @@ public void testRecordPruningTaskRetriesRepeatedSameOffsetForTopic() throws Exce CompletableFuture.completedFuture(Optional.of(10L)) ); + Metrics metrics = new Metrics(); + ShareCoordinatorService service = spy(new ShareCoordinatorService( new LogContext(), ShareCoordinatorTestConfig.testConfig(), runtime, - new ShareCoordinatorMetrics(), + new ShareCoordinatorMetrics(metrics), time, timer, writer @@ -1361,6 +1398,36 @@ public void testRecordPruningTaskRetriesRepeatedSameOffsetForTopic() throws Exce verify(writer, times(2)) .deleteRecords(any(), anyLong()); + + checkMetrics(metrics); + checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true); + service.shutdown(); } + + private void checkMetrics(Metrics metrics) { + Set usualMetrics = new HashSet<>(Arrays.asList( + metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("write-total", ShareCoordinatorMetrics.METRICS_GROUP) + )); + + usualMetrics.forEach(metric -> assertTrue(metrics.metrics().containsKey(metric))); + } + + private void checkPruneMetric(Metrics metrics, String topic, int partition, boolean checkPresence) { + boolean isPresent = metrics.metrics().containsKey( + metrics.metricName( + "last-pruned-offset", + ShareCoordinatorMetrics.METRICS_GROUP, + "The offset at which the share-group state topic was last pruned.", + Map.of( + "topic", topic, + "partition", Integer.toString(partition) + ) + ) + ); + assertEquals(checkPresence, isPresent); + } } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java index 80b2889ef93a3..39ca697f0696c 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -27,10 +28,12 @@ import java.util.Arrays; import java.util.HashSet; +import java.util.Map; import static org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME; import static org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class ShareCoordinatorMetricsTest { @@ -46,14 +49,21 @@ public void testMetricNames() { metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP) )); - ShareCoordinatorMetrics ignored = new ShareCoordinatorMetrics(metrics); + ShareCoordinatorMetrics coordMetrics = new ShareCoordinatorMetrics(metrics); for (MetricName metricName : expectedMetrics) { assertTrue(metrics.metrics().containsKey(metricName)); } + + assertFalse(metrics.metrics().containsKey(pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1))); + coordMetrics.recordPrune( + 10.0, + new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1) + ); + assertTrue(metrics.metrics().containsKey(pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1))); } @Test - public void testGlobalSensors() { + public void testShardGlobalSensors() { MockTime time = new MockTime(); Metrics metrics = new Metrics(time); ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics); @@ -71,7 +81,43 @@ public void testGlobalSensors() { assertMetricValue(metrics, metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP), 30.0); } + @Test + public void testCoordinatorGlobalSensors() { + MockTime time = new MockTime(); + Metrics metrics = new Metrics(time); + ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics); + + coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_SENSOR_NAME); + assertMetricValue(metrics, metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP), 1.0 / 30); //sampled stats + assertMetricValue(metrics, metrics.metricName("write-total", ShareCoordinatorMetrics.METRICS_GROUP), 1.0); + + coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, 20); + coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, 30); + assertMetricValue(metrics, metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP), 50.0 / 2); + assertMetricValue(metrics, metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP), 30.0); + + + assertFalse(metrics.metrics().containsKey(pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1))); + coordinatorMetrics.recordPrune( + 10.0, + new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1) + ); + assertMetricValue(metrics, pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1), 10.0); + } + private void assertMetricValue(Metrics metrics, MetricName metricName, double val) { assertEquals(val, metrics.metric(metricName).metricValue()); } + + private MetricName pruneMetricName(Metrics metrics, String topic, Integer partition) { + return metrics.metricName( + "last-pruned-offset", + ShareCoordinatorMetrics.METRICS_GROUP, + "The offset at which the share-group state topic was last pruned.", + Map.of( + "topic", topic, + "partition", Integer.toString(partition) + ) + ); + } }