Skip to content

Commit

Permalink
Add metrics to stream acknowledgement manager (#5256)
Browse files Browse the repository at this point in the history
* Add metrics to stream acknowledgement manager

Signed-off-by: Souvik Bose <[email protected]>

* Fix the unit test failure

Signed-off-by: Souvik Bose <[email protected]>

* Fix checkstyle

Signed-off-by: Souvik Bose <[email protected]>

* Address comments

Signed-off-by: Souvik Bose <[email protected]>

---------

Signed-off-by: Souvik Bose <[email protected]>
Co-authored-by: Souvik Bose <[email protected]>
  • Loading branch information
sb2k16 and sbose2k21 authored Dec 13, 2024
1 parent 46973ec commit a12471c
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.opensearch.dataprepper.plugins.mongo.stream;

import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.slf4j.Logger;
Expand All @@ -16,6 +18,8 @@
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;


public class StreamAcknowledgementManager {
private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class);
Expand All @@ -32,17 +36,35 @@ public class StreamAcknowledgementManager {

private boolean enableAcknowledgement = false;

// Metric names registered with PluginMetrics. They are public so that tests can stub
// and verify the counters by name.
public static final String POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "positiveAcknowledgementSets";
public static final String NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "negativeAcknowledgementSets";
public static final String RECORDS_CHECKPOINTED = "recordsCheckpointed";
public static final String NO_DATA_EXTEND_LEASE_COUNT = "noDataExtendLeaseCount";
public static final String GIVE_UP_PARTITION_COUNT = "giveupPartitionCount";

// Incremented once for every acknowledgement callback that reports success.
private final Counter positiveAcknowledgementSets;
// Incremented once for every acknowledgement callback that reports failure.
private final Counter negativeAcknowledgementSets;
// Incremented by the record count passed to each checkpoint() call.
private final Counter recordsCheckpointed;
// Incremented each time the lease is extended because no records were processed.
private final Counter noDataExtendLeaseCount;
// Incremented each time the monitor gives up the partition after a negative acknowledgement.
private final Counter giveupPartitionCount;


/**
 * Creates the acknowledgement manager for a stream partition.
 *
 * Stores the collaborators and timing settings, creates a single-thread executor
 * for the acknowledgement monitor, and obtains the five metric counters from
 * {@code pluginMetrics}.
 *
 * NOTE(review): this hunk appears to show both the removed and the added form of
 * the last parameter line and of the executor assignment, as rendered by the
 * commit diff; confirm against the checked-in file.
 *
 * @param acknowledgementSetManager manager stored for later use by this class
 * @param partitionCheckpoint checkpoint handle for the partition; used to checkpoint,
 *        extend the lease and give up the partition
 * @param partitionAcknowledgmentTimeout acknowledgement timeout; stored here, usage is
 *        outside this constructor
 * @param acknowledgementMonitorWaitTimeInMs monitor wait time (presumably milliseconds,
 *        per the name; usage is outside this constructor)
 * @param checkPointIntervalInMs minimum elapsed milliseconds before the lease is
 *        extended when no records were processed
 * @param pluginMetrics source of the counters; dereferenced unconditionally, so it
 *        must not be null
 */
public StreamAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
final DataStreamPartitionCheckpoint partitionCheckpoint,
final Duration partitionAcknowledgmentTimeout,
final int acknowledgementMonitorWaitTimeInMs,
final int checkPointIntervalInMs) {
final int checkPointIntervalInMs,
final PluginMetrics pluginMetrics) {
this.acknowledgementSetManager = acknowledgementSetManager;
this.partitionCheckpoint = partitionCheckpoint;
this.partitionAcknowledgmentTimeout = partitionAcknowledgmentTimeout;
this.acknowledgementMonitorWaitTimeInMs = acknowledgementMonitorWaitTimeInMs;
this.checkPointIntervalInMs = checkPointIntervalInMs;
// Single-thread executor for the acknowledgement monitor; the string is handed to the
// thread factory (presumably as the thread name prefix; confirm in BackgroundThreadFactory).
executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-ack-monitor"));
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-ack-monitor"));
// Look up each counter once by its public metric name and keep the handle.
this.positiveAcknowledgementSets = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME);
this.negativeAcknowledgementSets = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME);
this.recordsCheckpointed = pluginMetrics.counter(RECORDS_CHECKPOINTED);
this.noDataExtendLeaseCount = pluginMetrics.counter(NO_DATA_EXTEND_LEASE_COUNT);
this.giveupPartitionCount = pluginMetrics.counter(GIVE_UP_PARTITION_COUNT);
}

void init(final Consumer<Void> stopWorkerConsumer) {
Expand Down Expand Up @@ -79,17 +101,19 @@ private void monitorAcknowledgment(final ExecutorService executorService, final
if (checkpointStatus.isNegativeAcknowledgement()) {
// Give up the partition; the parent thread should then be interrupted to stop processing the stream
if (lastCheckpointStatus != null && lastCheckpointStatus.isPositiveAcknowledgement()) {
partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
}
LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken());
partitionCheckpoint.giveUpPartition();
this.giveupPartitionCount.increment();
break;
}
}
} else {
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("No records processed. Extend the lease of the partition worker.");
LOG.info(NOISY, "No records processed. Extend the lease of the partition worker.");
partitionCheckpoint.extendLease();
this.noDataExtendLeaseCount.increment();
lastCheckpointTime = System.currentTimeMillis();
}
}
Expand All @@ -111,6 +135,7 @@ private void monitorAcknowledgment(final ExecutorService executorService, final
/**
 * Checkpoints the partition at the given resume token and records the record
 * count in the {@code recordsCheckpointed} counter.
 *
 * The counter is updated only after the checkpoint call returns, so a checkpoint
 * that throws is not counted.
 *
 * NOTE(review): the counter is incremented by {@code recordCount} on every call. If
 * callers pass a running total rather than a per-checkpoint delta, the metric will
 * over-count; confirm against the callers.
 *
 * @param resumeToken resume token to checkpoint at
 * @param recordCount record count stored with the checkpoint and added to the counter
 */
private void checkpoint(final String resumeToken, final long recordCount) {
    LOG.debug("Perform regular checkpointing for resume token {} at record count {}",
            resumeToken, recordCount);
    this.partitionCheckpoint.checkpoint(resumeToken, recordCount);
    // Counter#increment takes a double; the long argument widens implicitly.
    recordsCheckpointed.increment(recordCount);
}

Optional<AcknowledgementSet> createAcknowledgementSet(final String resumeToken, final long recordNumber) {
Expand All @@ -126,9 +151,11 @@ Optional<AcknowledgementSet> createAcknowledgementSet(final String resumeToken,
final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken);
ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli());
if (result) {
this.positiveAcknowledgementSets.increment();
ackCheckpointStatus.setAcknowledged(CheckpointStatus.AcknowledgmentStatus.POSITIVE_ACK);
LOG.debug("Received acknowledgment of completion from sink for checkpoint {}", resumeToken);
} else {
this.negativeAcknowledgementSets.increment();
ackCheckpointStatus.setAcknowledged(CheckpointStatus.AcknowledgmentStatus.NEGATIVE_ACK);
LOG.warn("Negative acknowledgment received for checkpoint {}, resetting checkpoint", resumeToken);
// default CheckpointStatus acknowledged value is false. The monitorCheckpoints method will time out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void run() {
private StreamWorker getStreamWorker (final StreamPartition streamPartition) {
final DataStreamPartitionCheckpoint partitionCheckpoint = new DataStreamPartitionCheckpoint(sourceCoordinator, streamPartition);
final StreamAcknowledgementManager streamAcknowledgementManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint,
sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, DEFAULT_CHECKPOINT_INTERVAL_MILLS);
sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, DEFAULT_CHECKPOINT_INTERVAL_MILLS, pluginMetrics);
final PartitionKeyRecordConverter recordConverter = getPartitionKeyRecordConverter(streamPartition);
final CollectionConfig partitionCollectionConfig = sourceConfig.getCollections().stream()
.filter(collectionConfig -> collectionConfig.getCollection().equals(streamPartition.getCollection()))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.opensearch.dataprepper.plugins.mongo.stream;

import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;

Expand All @@ -22,11 +24,19 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.GIVE_UP_PARTITION_COUNT;
import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.NO_DATA_EXTEND_LEASE_COUNT;
import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.RECORDS_CHECKPOINTED;

@ExtendWith(MockitoExtension.class)
public class StreamAcknowledgementManagerTest {
Expand All @@ -41,11 +51,32 @@ public class StreamAcknowledgementManagerTest {
private AcknowledgementSet acknowledgementSet;
@Mock
private Consumer<Void> stopWorkerConsumer;
@Mock
private PluginMetrics pluginMetrics;
@Mock
private Counter positiveAcknowledgementSets;
@Mock
private Counter negativeAcknowledgementSets;
@Mock
private Counter recordsCheckpointed;
@Mock
private Counter noDataExtendLeaseCount;
@Mock
private Counter giveupPartitionCount;

private StreamAcknowledgementManager streamAckManager;



/**
 * Builds the manager under test before each test.
 *
 * Stubs {@code pluginMetrics.counter(...)} for each of the five metric names so
 * that the constructor receives the mock counters declared on this class, then
 * constructs the manager with a monitor wait time and checkpoint interval of 0.
 *
 * NOTE(review): the first assignment below (five-argument constructor) appears to be
 * the removed line of the commit diff and the last assignment its replacement;
 * confirm against the checked-in file.
 */
@BeforeEach
public void setup() {
streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0);
// Route every counter lookup made by the constructor to the matching mock.
when(pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME)).thenReturn(positiveAcknowledgementSets);
when(pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME)).thenReturn(negativeAcknowledgementSets);
when(pluginMetrics.counter(RECORDS_CHECKPOINTED)).thenReturn(recordsCheckpointed);
when(pluginMetrics.counter(NO_DATA_EXTEND_LEASE_COUNT)).thenReturn(noDataExtendLeaseCount);
when(pluginMetrics.counter(GIVE_UP_PARTITION_COUNT)).thenReturn(giveupPartitionCount);

streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0, pluginMetrics);
}

@Test
Expand All @@ -57,7 +88,7 @@ public void createAcknowledgementSet_disabled_emptyAckSet() {
@Test
public void createAcknowledgementSet_enabled_ackSetWithAck() {
lenient().when(timeout.getSeconds()).thenReturn(10_000L);
streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0);
streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0, pluginMetrics);
streamAckManager.init(stopWorkerConsumer);
final String resumeToken = UUID.randomUUID().toString();
final long recordCount = new Random().nextLong();
Expand All @@ -78,12 +109,15 @@ public void createAcknowledgementSet_enabled_ackSetWithAck() {
.atMost(Duration.ofSeconds(10)).untilAsserted(() ->
verify(partitionCheckpoint).checkpoint(resumeToken, recordCount));
assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue()));
verify(positiveAcknowledgementSets).increment();
verifyNoInteractions(negativeAcknowledgementSets);
verify(recordsCheckpointed, atLeastOnce()).increment(anyDouble());
}

@Test
public void createAcknowledgementSet_enabled_multipleAckSetWithAck() {
lenient().when(timeout.getSeconds()).thenReturn(10_000L);
streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0);
streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0, pluginMetrics);
streamAckManager.init(stopWorkerConsumer);
final String resumeToken1 = UUID.randomUUID().toString();
final long recordCount1 = new Random().nextLong();
Expand Down Expand Up @@ -114,6 +148,10 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAck() {
.atMost(Duration.ofSeconds(10)).untilAsserted(() ->
verify(partitionCheckpoint).checkpoint(resumeToken2, recordCount2));
assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue()));

verify(positiveAcknowledgementSets, atLeastOnce()).increment();
verifyNoInteractions(negativeAcknowledgementSets);
verify(recordsCheckpointed, atLeastOnce()).increment(anyDouble());
}

@Test
Expand Down Expand Up @@ -149,6 +187,9 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAckFailure() {
verify(partitionCheckpoint).giveUpPartition());
assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken1));
assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount1));
verify(positiveAcknowledgementSets).increment();
verify(negativeAcknowledgementSets).increment();
verify(giveupPartitionCount).increment();
verify(stopWorkerConsumer).accept(null);
}

Expand All @@ -173,5 +214,6 @@ public void createAcknowledgementSet_enabled_ackSetWithNoAck() {
await()
.atMost(Duration.ofSeconds(10)).untilAsserted(() ->
verify(stopWorkerConsumer).accept(null));
verify(negativeAcknowledgementSets).increment();
}
}

0 comments on commit a12471c

Please sign in to comment.