diff --git a/data-prepper-plugins/rds-source/build.gradle b/data-prepper-plugins/rds-source/build.gradle index 09f734cc91..1b325457bf 100644 --- a/data-prepper-plugins/rds-source/build.gradle +++ b/data-prepper-plugins/rds-source/build.gradle @@ -34,5 +34,4 @@ dependencies { testImplementation libs.avro.core testImplementation libs.parquet.hadoop testImplementation libs.parquet.avro -// testImplementation 'org.slf4j:slf4j-simple:2.0.9' } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java index 4445c0d0df..1612e94ec3 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java @@ -130,7 +130,8 @@ public BinlogEventListener(final StreamPartition streamPartition, this.dbTableMetadata = dbTableMetadata; this.streamCheckpointManager = new StreamCheckpointManager( streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(), - acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(), sourceConfig.getEngine()); + acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(), + sourceConfig.getEngine(), pluginMetrics); streamCheckpointManager.start(); this.cascadeActionDetector = cascadeActionDetector; @@ -200,7 +201,7 @@ void handleRotateEvent(com.github.shyiko.mysql.binlog.event.Event event) { // Trigger a checkpoint update for this rotate when there're no row mutation events being processed if (streamCheckpointManager.getChangeEventStatuses().isEmpty()) { - ChangeEventStatus changeEventStatus = streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate); + ChangeEventStatus changeEventStatus = streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate, 0); if (isAcknowledgmentsEnabled) { changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.POSITIVE_ACK); } @@ -347,9 +348,10 @@ void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event, LOG.debug("Current binlog coordinate after receiving a row change event: " + currentBinlogCoordinate); } + final long recordCount = rows.size(); AcknowledgementSet acknowledgementSet = null; if (isAcknowledgmentsEnabled) { - acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(currentBinlogCoordinate); + acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(currentBinlogCoordinate, recordCount); } final long bytes = event.toString().getBytes().length; @@ -398,7 +400,7 @@ void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event, if (isAcknowledgmentsEnabled) { acknowledgementSet.complete(); } else { - streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate); + streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate, recordCount); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ChangeEventStatus.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ChangeEventStatus.java index 7a425dcc4e..af6ef02362 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ChangeEventStatus.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ChangeEventStatus.java @@ -13,6 +13,7 @@ public class ChangeEventStatus { private final BinlogCoordinate binlogCoordinate; private final LogSequenceNumber logSequenceNumber; private final long timestamp; + private final long recordCount; private volatile AcknowledgmentStatus acknowledgmentStatus; public enum AcknowledgmentStatus { @@ -21,17 +22,19 @@ public enum AcknowledgmentStatus { NO_ACK } - public ChangeEventStatus(final BinlogCoordinate binlogCoordinate, final long timestamp) { + public ChangeEventStatus(final BinlogCoordinate binlogCoordinate, final long timestamp, final long recordCount) { this.binlogCoordinate = binlogCoordinate; this.logSequenceNumber = null; this.timestamp = timestamp; + this.recordCount = recordCount; acknowledgmentStatus = AcknowledgmentStatus.NO_ACK; } - public ChangeEventStatus(final LogSequenceNumber logSequenceNumber, final long timestamp) { + public ChangeEventStatus(final LogSequenceNumber logSequenceNumber, final long timestamp, final long recordCount) { this.binlogCoordinate = null; this.logSequenceNumber = logSequenceNumber; this.timestamp = timestamp; + this.recordCount = recordCount; acknowledgmentStatus = AcknowledgmentStatus.NO_ACK; } @@ -62,4 +65,8 @@ public LogSequenceNumber getLogSequenceNumber() { public long getTimestamp() { return timestamp; } + + public long getRecordCount() { + return recordCount; + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java index 4778976f1e..6ec0dba24b 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java @@ -76,6 +76,7 @@ public static TupleDataType fromValue(char value) { static final String BYTES_RECEIVED = "bytesReceived"; static final String BYTES_PROCESSED = "bytesProcessed"; static final String REPLICATION_LOG_EVENT_PROCESSING_TIME = "replicationLogEntryProcessingTime"; + static final String REPLICATION_LOG_PROCESSING_ERROR_COUNT = "replicationLogEntryProcessingErrors"; private final StreamPartition streamPartition; private final RdsSourceConfig sourceConfig; @@ -94,6 +95,7 @@ public static TupleDataType fromValue(char value) { private final DistributionSummary bytesReceivedSummary; private final DistributionSummary bytesProcessedSummary; private final Timer eventProcessingTimer; + private final Counter eventProcessingErrorCounter; private long currentLsn; private long currentEventTimestamp; @@ -120,7 +122,8 @@ public LogicalReplicationEventProcessor(final StreamPartition streamPartition, this.streamCheckpointer = streamCheckpointer; streamCheckpointManager = new StreamCheckpointManager( streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(), - acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(), sourceConfig.getEngine()); + acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(), + sourceConfig.getEngine(), pluginMetrics); streamCheckpointManager.start(); tableMetadataMap = new HashMap<>(); @@ -131,6 +134,7 @@ public LogicalReplicationEventProcessor(final StreamPartition streamPartition, bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED); bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED); eventProcessingTimer = pluginMetrics.timer(REPLICATION_LOG_EVENT_PROCESSING_TIME); + eventProcessingErrorCounter = pluginMetrics.counter(REPLICATION_LOG_PROCESSING_ERROR_COUNT); } public void process(ByteBuffer msg) { @@ -228,9 +232,10 @@ void processCommitMessage(ByteBuffer msg) { throw new RuntimeException("Commit LSN does not match current LSN, skipping"); } + final long recordCount = pipelineEvents.size(); AcknowledgementSet acknowledgementSet = null; if (sourceConfig.isAcknowledgmentsEnabled()) { - acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(LogSequenceNumber.valueOf(currentLsn)); + acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(LogSequenceNumber.valueOf(currentLsn), recordCount); } writeToBuffer(bufferAccumulator, acknowledgementSet); @@ -240,7 +245,7 @@ void processCommitMessage(ByteBuffer msg) { if (sourceConfig.isAcknowledgmentsEnabled()) { acknowledgementSet.complete(); } else { - streamCheckpointManager.saveChangeEventsStatus(LogSequenceNumber.valueOf(currentLsn)); + streamCheckpointManager.saveChangeEventsStatus(LogSequenceNumber.valueOf(currentLsn), recordCount); } } @@ -417,6 +422,7 @@ private void handleMessageAndErrors(ByteBuffer message, Consumer fun eventProcessingTimer.record(() -> function.accept(message)); } catch (Exception e) { LOG.error("Failed to process change event of type {}", messageType, e); + eventProcessingErrorCounter.increment(); } } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManager.java index 274730770c..37e9e14615 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManager.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManager.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.source.rds.stream; +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType; @@ -23,6 +25,11 @@ public class StreamCheckpointManager { private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointManager.class); static final int REGULAR_CHECKPOINT_INTERVAL_MILLIS = 60_000; static final int CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH = 1000; + static final String POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "positiveAcknowledgementSets"; + static final String NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "negativeAcknowledgementSets"; + static final String RECORDS_CHECKPOINTED = "recordsCheckpointed"; + static final String NO_DATA_EXTEND_LEASE_COUNT = "noDataExtendLeaseCount"; + static final String GIVE_UP_PARTITION_COUNT = "giveupPartitionCount"; private final ConcurrentLinkedQueue changeEventStatuses = new ConcurrentLinkedQueue<>(); private final StreamCheckpointer streamCheckpointer; @@ -32,20 +39,34 @@ public class StreamCheckpointManager { private final AcknowledgementSetManager acknowledgementSetManager; private final Duration acknowledgmentTimeout; private final EngineType engineType; + private final PluginMetrics pluginMetrics; + private final Counter positiveAcknowledgementSets; + private final Counter negativeAcknowledgementSets; + private final Counter recordsCheckpointed; + private final Counter noDataExtendLeaseCount; + private final Counter giveupPartitionCount; public StreamCheckpointManager(final StreamCheckpointer streamCheckpointer, final boolean isAcknowledgmentEnabled, final AcknowledgementSetManager acknowledgementSetManager, final Runnable stopStreamRunnable, final Duration acknowledgmentTimeout, - final EngineType engineType) { + final EngineType engineType, + final PluginMetrics pluginMetrics) { this.acknowledgementSetManager = acknowledgementSetManager; this.streamCheckpointer = streamCheckpointer; this.isAcknowledgmentEnabled = isAcknowledgmentEnabled; this.stopStreamRunnable = stopStreamRunnable; this.acknowledgmentTimeout = acknowledgmentTimeout; this.engineType = engineType; + this.pluginMetrics = pluginMetrics; executorService = Executors.newSingleThreadExecutor(); + + this.positiveAcknowledgementSets = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME); + this.negativeAcknowledgementSets = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME); + this.recordsCheckpointed = pluginMetrics.counter(RECORDS_CHECKPOINTED); + this.noDataExtendLeaseCount = pluginMetrics.counter(NO_DATA_EXTEND_LEASE_COUNT); + this.giveupPartitionCount = pluginMetrics.counter(GIVE_UP_PARTITION_COUNT); } public void start() { @@ -59,6 +80,7 @@ void runCheckpointing() { try { if (changeEventStatuses.isEmpty()) { LOG.debug("No records processed. Extend the lease on stream partition."); + noDataExtendLeaseCount.increment(); streamCheckpointer.extendLease(); } else { if (isAcknowledgmentEnabled) { @@ -70,13 +92,14 @@ void runCheckpointing() { } if (lastChangeEventStatus != null) { - streamCheckpointer.checkpoint(engineType, lastChangeEventStatus); + checkpoint(engineType, lastChangeEventStatus); } // If negative ack is seen, give up partition and exit loop to stop processing stream if (currentChangeEventStatus != null && currentChangeEventStatus.isNegativeAcknowledgment()) { LOG.info("Received negative acknowledgement for change event at {}. Will restart from most recent checkpoint", currentChangeEventStatus.getBinlogCoordinate()); streamCheckpointer.giveUpPartition(); + giveupPartitionCount.increment(); break; } } else { @@ -86,10 +109,10 @@ void runCheckpointing() { changeEventCount++; // In case queue are populated faster than the poll, checkpoint when reaching certain count if (changeEventCount % CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH == 0) { - streamCheckpointer.checkpoint(engineType, currentChangeEventStatus); + checkpoint(engineType, currentChangeEventStatus); } } while (!changeEventStatuses.isEmpty()); - streamCheckpointer.checkpoint(engineType, currentChangeEventStatus); + checkpoint(engineType, currentChangeEventStatus); } } } catch (Exception e) { @@ -112,28 +135,28 @@ public void stop() { executorService.shutdownNow(); } - public ChangeEventStatus saveChangeEventsStatus(BinlogCoordinate binlogCoordinate) { - final ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli()); + public ChangeEventStatus saveChangeEventsStatus(BinlogCoordinate binlogCoordinate, long recordCount) { + final ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli(), recordCount); changeEventStatuses.add(changeEventStatus); return changeEventStatus; } - public ChangeEventStatus saveChangeEventsStatus(LogSequenceNumber logSequenceNumber) { - final ChangeEventStatus changeEventStatus = new ChangeEventStatus(logSequenceNumber, Instant.now().toEpochMilli()); + public ChangeEventStatus saveChangeEventsStatus(LogSequenceNumber logSequenceNumber, long recordCount) { + final ChangeEventStatus changeEventStatus = new ChangeEventStatus(logSequenceNumber, Instant.now().toEpochMilli(), recordCount); changeEventStatuses.add(changeEventStatus); return changeEventStatus; } - public AcknowledgementSet createAcknowledgmentSet(BinlogCoordinate binlogCoordinate) { + public AcknowledgementSet createAcknowledgmentSet(BinlogCoordinate binlogCoordinate, long recordCount) { LOG.debug("Create acknowledgment set for events receive prior to {}", binlogCoordinate); - final ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli()); + final ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli(), recordCount); changeEventStatuses.add(changeEventStatus); return getAcknowledgementSet(changeEventStatus); } - public AcknowledgementSet createAcknowledgmentSet(LogSequenceNumber logSequenceNumber) { + public AcknowledgementSet createAcknowledgmentSet(LogSequenceNumber logSequenceNumber, long recordCount) { LOG.debug("Create acknowledgment set for events receive prior to {}", logSequenceNumber); - final ChangeEventStatus changeEventStatus = new ChangeEventStatus(logSequenceNumber, Instant.now().toEpochMilli()); + final ChangeEventStatus changeEventStatus = new ChangeEventStatus(logSequenceNumber, Instant.now().toEpochMilli(), recordCount); changeEventStatuses.add(changeEventStatus); return getAcknowledgementSet(changeEventStatus); } @@ -141,13 +164,23 @@ public AcknowledgementSet createAcknowledgmentSet(LogSequenceNumber logSequenceN private AcknowledgementSet getAcknowledgementSet(ChangeEventStatus changeEventStatus) { return acknowledgementSetManager.create((result) -> { if (result) { + positiveAcknowledgementSets.increment(); changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.POSITIVE_ACK); } else { + negativeAcknowledgementSets.increment(); changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.NEGATIVE_ACK); } }, acknowledgmentTimeout); } + private void checkpoint(final EngineType engineType, final ChangeEventStatus changeEventStatus) { + LOG.debug("Checkpoint at {} with record count {}. ", engineType == EngineType.MYSQL ? + changeEventStatus.getBinlogCoordinate() : changeEventStatus.getLogSequenceNumber(), + changeEventStatus.getRecordCount()); + streamCheckpointer.checkpoint(engineType, changeEventStatus); + recordsCheckpointed.increment(changeEventStatus.getRecordCount()); + } + //VisibleForTesting ConcurrentLinkedQueue getChangeEventStatuses() { return changeEventStatuses; diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessorTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessorTest.java index 9d61fd0e9f..90e8149319 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessorTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessorTest.java @@ -11,7 +11,6 @@ package org.opensearch.dataprepper.plugins.source.rds.stream; import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -32,11 +31,11 @@ import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.plugins.source.rds.stream.BinlogEventListener.REPLICATION_LOG_EVENT_PROCESSING_TIME; @ExtendWith(MockitoExtension.class) class LogicalReplicationEventProcessorTest { @@ -70,14 +69,12 @@ class LogicalReplicationEventProcessorTest { private Random random; - private Timer eventProcessingTimer; - @BeforeEach void setUp() { s3Prefix = UUID.randomUUID().toString(); random = new Random(); - eventProcessingTimer = Metrics.timer("test-timer"); - when(pluginMetrics.timer(REPLICATION_LOG_EVENT_PROCESSING_TIME)).thenReturn(eventProcessingTimer); + when(pluginMetrics.timer(anyString())).thenReturn(Metrics.timer("test-timer")); + when(pluginMetrics.counter(anyString())).thenReturn(Metrics.counter("test-counter")); objectUnderTest = spy(createObjectUnderTest()); } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManagerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManagerTest.java index 873da31834..1b32639daf 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManagerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManagerTest.java @@ -11,12 +11,14 @@ import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType; import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; import org.postgresql.replication.LogSequenceNumber; import java.time.Duration; +import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @@ -44,12 +46,16 @@ class StreamCheckpointManagerTest { @Mock private Runnable stopStreamRunnable; + @Mock + private PluginMetrics pluginMetrics; + private boolean isAcknowledgmentEnabled = false; private EngineType engineType = EngineType.MYSQL; + private Random random; @BeforeEach void setUp() { - + random = new Random(); } @Test @@ -81,33 +87,43 @@ void test_shutdown() { @Test void test_saveChangeEventsStatus_mysql() { final BinlogCoordinate binlogCoordinate = mock(BinlogCoordinate.class); + final long recordCount = random.nextLong(); final StreamCheckpointManager streamCheckpointManager = createObjectUnderTest(); - streamCheckpointManager.saveChangeEventsStatus(binlogCoordinate); + + streamCheckpointManager.saveChangeEventsStatus(binlogCoordinate, recordCount); assertThat(streamCheckpointManager.getChangeEventStatuses().size(), is(1)); - assertThat(streamCheckpointManager.getChangeEventStatuses().peek().getBinlogCoordinate(), is(binlogCoordinate)); + final ChangeEventStatus changeEventStatus = streamCheckpointManager.getChangeEventStatuses().peek(); + assertThat(changeEventStatus.getBinlogCoordinate(), is(binlogCoordinate)); + assertThat(changeEventStatus.getRecordCount(), is(recordCount)); } @Test void test_saveChangeEventsStatus_postgres() { final LogSequenceNumber logSequenceNumber = mock(LogSequenceNumber.class); engineType = EngineType.POSTGRES; + final long recordCount = random.nextLong(); final StreamCheckpointManager streamCheckpointManager = createObjectUnderTest(); - streamCheckpointManager.saveChangeEventsStatus(logSequenceNumber); + + streamCheckpointManager.saveChangeEventsStatus(logSequenceNumber, recordCount); assertThat(streamCheckpointManager.getChangeEventStatuses().size(), is(1)); - assertThat(streamCheckpointManager.getChangeEventStatuses().peek().getLogSequenceNumber(), is(logSequenceNumber)); + final ChangeEventStatus changeEventStatus = streamCheckpointManager.getChangeEventStatuses().peek(); + assertThat(changeEventStatus.getLogSequenceNumber(), is(logSequenceNumber)); + assertThat(changeEventStatus.getRecordCount(), is(recordCount)); } @Test void test_createAcknowledgmentSet_mysql() { final BinlogCoordinate binlogCoordinate = mock(BinlogCoordinate.class); + final long recordCount = random.nextLong(); final StreamCheckpointManager streamCheckpointManager = createObjectUnderTest(); - streamCheckpointManager.createAcknowledgmentSet(binlogCoordinate); + streamCheckpointManager.createAcknowledgmentSet(binlogCoordinate, recordCount); assertThat(streamCheckpointManager.getChangeEventStatuses().size(), is(1)); ChangeEventStatus changeEventStatus = streamCheckpointManager.getChangeEventStatuses().peek(); assertThat(changeEventStatus.getBinlogCoordinate(), is(binlogCoordinate)); + assertThat(changeEventStatus.getRecordCount(), is(recordCount)); verify(acknowledgementSetManager).create(any(Consumer.class), eq(ACK_TIMEOUT)); } @@ -115,17 +131,19 @@ void test_createAcknowledgmentSet_mysql() { void test_createAcknowledgmentSet_postgres() { final LogSequenceNumber logSequenceNumber = mock(LogSequenceNumber.class); engineType = EngineType.POSTGRES; + final long recordCount = random.nextLong(); final StreamCheckpointManager streamCheckpointManager = createObjectUnderTest(); - streamCheckpointManager.createAcknowledgmentSet(logSequenceNumber); + streamCheckpointManager.createAcknowledgmentSet(logSequenceNumber, recordCount); assertThat(streamCheckpointManager.getChangeEventStatuses().size(), is(1)); ChangeEventStatus changeEventStatus = streamCheckpointManager.getChangeEventStatuses().peek(); assertThat(changeEventStatus.getLogSequenceNumber(), is(logSequenceNumber)); + assertThat(changeEventStatus.getRecordCount(), is(recordCount)); verify(acknowledgementSetManager).create(any(Consumer.class), eq(ACK_TIMEOUT)); } private StreamCheckpointManager createObjectUnderTest() { return new StreamCheckpointManager( - streamCheckpointer, isAcknowledgmentEnabled, acknowledgementSetManager, stopStreamRunnable, ACK_TIMEOUT, engineType); + streamCheckpointer, isAcknowledgmentEnabled, acknowledgementSetManager, stopStreamRunnable, ACK_TIMEOUT, engineType, pluginMetrics); } }