From a12471ca8921960aca84671d7dacf282843948b5 Mon Sep 17 00:00:00 2001
From: Souvik Bose <souvik04in@gmail.com>
Date: Fri, 13 Dec 2024 11:40:41 -0800
Subject: [PATCH] Add metrics to stream acknowledgement manager (#5256)

* Add metrics to stream acknowledgement manager

Signed-off-by: Souvik Bose <souvbose@amazon.com>

* Fix the unit test failure

Signed-off-by: Souvik Bose <souvbose@amazon.com>

* Fix checkstyle

Signed-off-by: Souvik Bose <souvbose@amazon.com>

* Address comments

Signed-off-by: Souvik Bose <souvbose@amazon.com>

---------

Signed-off-by: Souvik Bose <souvbose@amazon.com>
Co-authored-by: Souvik Bose <souvbose@amazon.com>
---
 .../stream/StreamAcknowledgementManager.java  | 35 ++++++++++++--
 .../plugins/mongo/stream/StreamScheduler.java |  2 +-
 .../StreamAcknowledgementManagerTest.java     | 48 +++++++++++++++++--
 3 files changed, 77 insertions(+), 8 deletions(-)

diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java
index 7567a22786..38d4fc9794 100644
--- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java
+++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java
@@ -1,7 +1,9 @@
 package org.opensearch.dataprepper.plugins.mongo.stream;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.micrometer.core.instrument.Counter;
 import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
 import org.slf4j.Logger;
@@ -16,6 +18,8 @@
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 
+import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
+
 
 public class StreamAcknowledgementManager {
     private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class);
@@ -32,17 +36,35 @@ public class StreamAcknowledgementManager {
 
     private boolean enableAcknowledgement = false;
 
+    private final Counter positiveAcknowledgementSets;
+    private final Counter negativeAcknowledgementSets;
+    private final Counter recordsCheckpointed;
+    private final Counter noDataExtendLeaseCount;
+    private final Counter giveupPartitionCount;
+    public static final String POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "positiveAcknowledgementSets";
+    public static final String NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "negativeAcknowledgementSets";
+    public static final String RECORDS_CHECKPOINTED = "recordsCheckpointed";
+    public static final String NO_DATA_EXTEND_LEASE_COUNT = "noDataExtendLeaseCount";
+    public static final String GIVE_UP_PARTITION_COUNT = "giveupPartitionCount";
+
+
     public StreamAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
                                         final DataStreamPartitionCheckpoint partitionCheckpoint,
                                         final Duration partitionAcknowledgmentTimeout,
                                         final int acknowledgementMonitorWaitTimeInMs,
-                                        final int checkPointIntervalInMs) {
+                                        final int checkPointIntervalInMs,
+                                        final PluginMetrics pluginMetrics) {
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.partitionCheckpoint = partitionCheckpoint;
         this.partitionAcknowledgmentTimeout = partitionAcknowledgmentTimeout;
         this.acknowledgementMonitorWaitTimeInMs = acknowledgementMonitorWaitTimeInMs;
         this.checkPointIntervalInMs = checkPointIntervalInMs;
-        executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-ack-monitor"));
+        this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-ack-monitor"));
+        this.positiveAcknowledgementSets = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME);
+        this.negativeAcknowledgementSets = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME);
+        this.recordsCheckpointed = pluginMetrics.counter(RECORDS_CHECKPOINTED);
+        this.noDataExtendLeaseCount = pluginMetrics.counter(NO_DATA_EXTEND_LEASE_COUNT);
+        this.giveupPartitionCount = pluginMetrics.counter(GIVE_UP_PARTITION_COUNT);
     }
 
     void init(final Consumer<Void> stopWorkerConsumer) {
@@ -79,17 +101,19 @@ private void monitorAcknowledgment(final ExecutorService executorService, final
                         if (checkpointStatus.isNegativeAcknowledgement()) {
                             // Give up partition and should interrupt parent thread to stop processing stream
                             if (lastCheckpointStatus != null && lastCheckpointStatus.isPositiveAcknowledgement()) {
-                                partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
+                                checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
                             }
                             LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken());
                             partitionCheckpoint.giveUpPartition();
+                            this.giveupPartitionCount.increment();
                             break;
                         }
                     }
                 } else {
                     if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
-                        LOG.debug("No records processed. Extend the lease of the partition worker.");
+                        LOG.info(NOISY, "No records processed. Extend the lease of the partition worker.");
                         partitionCheckpoint.extendLease();
+                        this.noDataExtendLeaseCount.increment();
                         lastCheckpointTime = System.currentTimeMillis();
                     }
                 }
@@ -111,6 +135,7 @@ private void monitorAcknowledgment(final ExecutorService executorService, final
     private void checkpoint(final String resumeToken, final long recordCount) {
         LOG.debug("Perform regular checkpointing for resume token {} at record count {}", resumeToken, recordCount);
         partitionCheckpoint.checkpoint(resumeToken, recordCount);
+        this.recordsCheckpointed.increment(recordCount);
     }
 
     Optional<AcknowledgementSet> createAcknowledgementSet(final String resumeToken, final long recordNumber) {
@@ -126,9 +151,11 @@ Optional<AcknowledgementSet> createAcknowledgementSet(final String resumeToken,
             final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken);
             ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli());
             if (result) {
+                this.positiveAcknowledgementSets.increment();
                 ackCheckpointStatus.setAcknowledged(CheckpointStatus.AcknowledgmentStatus.POSITIVE_ACK);
                 LOG.debug("Received acknowledgment of completion from sink for checkpoint {}", resumeToken);
             } else {
+                this.negativeAcknowledgementSets.increment();
                 ackCheckpointStatus.setAcknowledged(CheckpointStatus.AcknowledgmentStatus.NEGATIVE_ACK);
                 LOG.warn("Negative acknowledgment received for checkpoint {}, resetting checkpoint", resumeToken);
                 // default CheckpointStatus acknowledged value is false. The monitorCheckpoints method will time out
diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java
index 82a3db975d..40a80bb71f 100644
--- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java
+++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java
@@ -109,7 +109,7 @@ public void run() {
     private StreamWorker getStreamWorker (final StreamPartition streamPartition) {
         final DataStreamPartitionCheckpoint partitionCheckpoint = new DataStreamPartitionCheckpoint(sourceCoordinator, streamPartition);
         final StreamAcknowledgementManager streamAcknowledgementManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint,
-                sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, DEFAULT_CHECKPOINT_INTERVAL_MILLS);
+                sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, DEFAULT_CHECKPOINT_INTERVAL_MILLS, pluginMetrics);
         final PartitionKeyRecordConverter recordConverter = getPartitionKeyRecordConverter(streamPartition);
         final CollectionConfig partitionCollectionConfig = sourceConfig.getCollections().stream()
                 .filter(collectionConfig -> collectionConfig.getCollection().equals(streamPartition.getCollection()))
diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java
index 4e41008627..c515df0086 100644
--- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java
+++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java
@@ -1,11 +1,13 @@
 package org.opensearch.dataprepper.plugins.mongo.stream;
 
+import io.micrometer.core.instrument.Counter;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
 
@@ -22,11 +24,19 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.CoreMatchers.is;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyDouble;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
+import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.GIVE_UP_PARTITION_COUNT;
+import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME;
+import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.NO_DATA_EXTEND_LEASE_COUNT;
+import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME;
+import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.RECORDS_CHECKPOINTED;
 
 @ExtendWith(MockitoExtension.class)
 public class StreamAcknowledgementManagerTest {
@@ -41,11 +51,32 @@ public class StreamAcknowledgementManagerTest {
     private AcknowledgementSet acknowledgementSet;
     @Mock
     private Consumer<Void> stopWorkerConsumer;
+    @Mock
+    private PluginMetrics pluginMetrics;
+    @Mock
+    private Counter positiveAcknowledgementSets;
+    @Mock
+    private Counter negativeAcknowledgementSets;
+    @Mock
+    private Counter recordsCheckpointed;
+    @Mock
+    private Counter noDataExtendLeaseCount;
+    @Mock
+    private Counter giveupPartitionCount;
+
     private StreamAcknowledgementManager streamAckManager;
 
+
+
     @BeforeEach
     public void setup() {
-        streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0);
+        when(pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME)).thenReturn(positiveAcknowledgementSets);
+        when(pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME)).thenReturn(negativeAcknowledgementSets);
+        when(pluginMetrics.counter(RECORDS_CHECKPOINTED)).thenReturn(recordsCheckpointed);
+        when(pluginMetrics.counter(NO_DATA_EXTEND_LEASE_COUNT)).thenReturn(noDataExtendLeaseCount);
+        when(pluginMetrics.counter(GIVE_UP_PARTITION_COUNT)).thenReturn(giveupPartitionCount);
+
+        streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0, pluginMetrics);
     }
 
     @Test
@@ -57,7 +88,7 @@ public void createAcknowledgementSet_disabled_emptyAckSet() {
     @Test
     public void createAcknowledgementSet_enabled_ackSetWithAck() {
         lenient().when(timeout.getSeconds()).thenReturn(10_000L);
-        streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0);
+        streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0, pluginMetrics);
         streamAckManager.init(stopWorkerConsumer);
         final String resumeToken = UUID.randomUUID().toString();
         final long recordCount = new Random().nextLong();
@@ -78,12 +109,15 @@ public void createAcknowledgementSet_enabled_ackSetWithAck() {
            .atMost(Duration.ofSeconds(10)).untilAsserted(() ->
                 verify(partitionCheckpoint).checkpoint(resumeToken, recordCount));
         assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue()));
+        verify(positiveAcknowledgementSets).increment();
+        verifyNoInteractions(negativeAcknowledgementSets);
+        verify(recordsCheckpointed, atLeastOnce()).increment(anyDouble());
     }
 
     @Test
     public void createAcknowledgementSet_enabled_multipleAckSetWithAck() {
         lenient().when(timeout.getSeconds()).thenReturn(10_000L);
-        streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0);
+        streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0, pluginMetrics);
         streamAckManager.init(stopWorkerConsumer);
         final String resumeToken1 = UUID.randomUUID().toString();
         final long recordCount1 = new Random().nextLong();
@@ -114,6 +148,10 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAck() {
             .atMost(Duration.ofSeconds(10)).untilAsserted(() ->
                 verify(partitionCheckpoint).checkpoint(resumeToken2, recordCount2));
         assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue()));
+
+        verify(positiveAcknowledgementSets, atLeastOnce()).increment();
+        verifyNoInteractions(negativeAcknowledgementSets);
+        verify(recordsCheckpointed, atLeastOnce()).increment(anyDouble());
     }
 
     @Test
@@ -149,6 +187,9 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAckFailure() {
                 verify(partitionCheckpoint).giveUpPartition());
         assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken1));
         assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount1));
+        verify(positiveAcknowledgementSets).increment();
+        verify(negativeAcknowledgementSets).increment();
+        verify(giveupPartitionCount).increment();
         verify(stopWorkerConsumer).accept(null);
     }
 
@@ -173,5 +214,6 @@ public void createAcknowledgementSet_enabled_ackSetWithNoAck() {
         await()
             .atMost(Duration.ofSeconds(10)).untilAsserted(() ->
                 verify(stopWorkerConsumer).accept(null));
+        verify(negativeAcknowledgementSets).increment();
     }
 }