Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh committed Feb 3, 2025
1 parent 5c35522 commit f6bd2e2
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 36 deletions.
1 change: 0 additions & 1 deletion data-prepper-plugins/rds-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,4 @@ dependencies {
testImplementation libs.avro.core
testImplementation libs.parquet.hadoop
testImplementation libs.parquet.avro
// testImplementation 'org.slf4j:slf4j-simple:2.0.9'
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public BinlogEventListener(final StreamPartition streamPartition,
this.dbTableMetadata = dbTableMetadata;
this.streamCheckpointManager = new StreamCheckpointManager(
streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(),
acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(), sourceConfig.getEngine());
acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(),
sourceConfig.getEngine(), pluginMetrics);
streamCheckpointManager.start();

this.cascadeActionDetector = cascadeActionDetector;
Expand Down Expand Up @@ -200,7 +201,7 @@ void handleRotateEvent(com.github.shyiko.mysql.binlog.event.Event event) {

// Trigger a checkpoint update for this rotate when there're no row mutation events being processed
if (streamCheckpointManager.getChangeEventStatuses().isEmpty()) {
ChangeEventStatus changeEventStatus = streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate);
ChangeEventStatus changeEventStatus = streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate, 0);
if (isAcknowledgmentsEnabled) {
changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.POSITIVE_ACK);
}
Expand Down Expand Up @@ -347,9 +348,10 @@ void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
LOG.debug("Current binlog coordinate after receiving a row change event: " + currentBinlogCoordinate);
}

final long recordCount = rows.size();
AcknowledgementSet acknowledgementSet = null;
if (isAcknowledgmentsEnabled) {
acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(currentBinlogCoordinate);
acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(currentBinlogCoordinate, recordCount);
}

final long bytes = event.toString().getBytes().length;
Expand Down Expand Up @@ -398,7 +400,7 @@ void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
if (isAcknowledgmentsEnabled) {
acknowledgementSet.complete();
} else {
streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate);
streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate, recordCount);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class ChangeEventStatus {
private final BinlogCoordinate binlogCoordinate;
private final LogSequenceNumber logSequenceNumber;
private final long timestamp;
private final long recordCount;
private volatile AcknowledgmentStatus acknowledgmentStatus;

public enum AcknowledgmentStatus {
Expand All @@ -21,17 +22,19 @@ public enum AcknowledgmentStatus {
NO_ACK
}

public ChangeEventStatus(final BinlogCoordinate binlogCoordinate, final long timestamp) {
public ChangeEventStatus(final BinlogCoordinate binlogCoordinate, final long timestamp, final long recordCount) {
this.binlogCoordinate = binlogCoordinate;
this.logSequenceNumber = null;
this.timestamp = timestamp;
this.recordCount = recordCount;
acknowledgmentStatus = AcknowledgmentStatus.NO_ACK;
}

public ChangeEventStatus(final LogSequenceNumber logSequenceNumber, final long timestamp) {
public ChangeEventStatus(final LogSequenceNumber logSequenceNumber, final long timestamp, final long recordCount) {
this.binlogCoordinate = null;
this.logSequenceNumber = logSequenceNumber;
this.timestamp = timestamp;
this.recordCount = recordCount;
acknowledgmentStatus = AcknowledgmentStatus.NO_ACK;
}

Expand Down Expand Up @@ -62,4 +65,8 @@ public LogSequenceNumber getLogSequenceNumber() {
public long getTimestamp() {
return timestamp;
}

public long getRecordCount() {
return recordCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public static TupleDataType fromValue(char value) {
static final String BYTES_RECEIVED = "bytesReceived";
static final String BYTES_PROCESSED = "bytesProcessed";
static final String REPLICATION_LOG_EVENT_PROCESSING_TIME = "replicationLogEntryProcessingTime";
static final String REPLICATION_LOG_PROCESSING_ERROR_COUNT = "replicationLogEntryProcessingErrors";

private final StreamPartition streamPartition;
private final RdsSourceConfig sourceConfig;
Expand All @@ -94,6 +95,7 @@ public static TupleDataType fromValue(char value) {
private final DistributionSummary bytesReceivedSummary;
private final DistributionSummary bytesProcessedSummary;
private final Timer eventProcessingTimer;
private final Counter eventProcessingErrorCounter;

private long currentLsn;
private long currentEventTimestamp;
Expand All @@ -120,7 +122,8 @@ public LogicalReplicationEventProcessor(final StreamPartition streamPartition,
this.streamCheckpointer = streamCheckpointer;
streamCheckpointManager = new StreamCheckpointManager(
streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(),
acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(), sourceConfig.getEngine());
acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(),
sourceConfig.getEngine(), pluginMetrics);
streamCheckpointManager.start();

tableMetadataMap = new HashMap<>();
Expand All @@ -131,6 +134,7 @@ public LogicalReplicationEventProcessor(final StreamPartition streamPartition,
bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
eventProcessingTimer = pluginMetrics.timer(REPLICATION_LOG_EVENT_PROCESSING_TIME);
eventProcessingErrorCounter = pluginMetrics.counter(REPLICATION_LOG_PROCESSING_ERROR_COUNT);
}

public void process(ByteBuffer msg) {
Expand Down Expand Up @@ -228,9 +232,10 @@ void processCommitMessage(ByteBuffer msg) {
throw new RuntimeException("Commit LSN does not match current LSN, skipping");
}

final long recordCount = pipelineEvents.size();
AcknowledgementSet acknowledgementSet = null;
if (sourceConfig.isAcknowledgmentsEnabled()) {
acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(LogSequenceNumber.valueOf(currentLsn));
acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(LogSequenceNumber.valueOf(currentLsn), recordCount);
}

writeToBuffer(bufferAccumulator, acknowledgementSet);
Expand All @@ -240,7 +245,7 @@ void processCommitMessage(ByteBuffer msg) {
if (sourceConfig.isAcknowledgmentsEnabled()) {
acknowledgementSet.complete();
} else {
streamCheckpointManager.saveChangeEventsStatus(LogSequenceNumber.valueOf(currentLsn));
streamCheckpointManager.saveChangeEventsStatus(LogSequenceNumber.valueOf(currentLsn), recordCount);
}
}

Expand Down Expand Up @@ -417,6 +422,7 @@ private void handleMessageAndErrors(ByteBuffer message, Consumer<ByteBuffer> fun
eventProcessingTimer.record(() -> function.accept(message));
} catch (Exception e) {
LOG.error("Failed to process change event of type {}", messageType, e);
eventProcessingErrorCounter.increment();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.source.rds.stream;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
Expand All @@ -23,6 +25,11 @@ public class StreamCheckpointManager {
private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointManager.class);
static final int REGULAR_CHECKPOINT_INTERVAL_MILLIS = 60_000;
static final int CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH = 1000;
static final String POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "positiveAcknowledgementSets";
static final String NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "negativeAcknowledgementSets";
static final String RECORDS_CHECKPOINTED = "recordsCheckpointed";
static final String NO_DATA_EXTEND_LEASE_COUNT = "noDataExtendLeaseCount";
static final String GIVE_UP_PARTITION_COUNT = "giveupPartitionCount";

private final ConcurrentLinkedQueue<ChangeEventStatus> changeEventStatuses = new ConcurrentLinkedQueue<>();
private final StreamCheckpointer streamCheckpointer;
Expand All @@ -32,20 +39,34 @@ public class StreamCheckpointManager {
private final AcknowledgementSetManager acknowledgementSetManager;
private final Duration acknowledgmentTimeout;
private final EngineType engineType;
private final PluginMetrics pluginMetrics;
private final Counter positiveAcknowledgementSets;
private final Counter negativeAcknowledgementSets;
private final Counter recordsCheckpointed;
private final Counter noDataExtendLeaseCount;
private final Counter giveupPartitionCount;

public StreamCheckpointManager(final StreamCheckpointer streamCheckpointer,
final boolean isAcknowledgmentEnabled,
final AcknowledgementSetManager acknowledgementSetManager,
final Runnable stopStreamRunnable,
final Duration acknowledgmentTimeout,
final EngineType engineType) {
final EngineType engineType,
final PluginMetrics pluginMetrics) {
this.acknowledgementSetManager = acknowledgementSetManager;
this.streamCheckpointer = streamCheckpointer;
this.isAcknowledgmentEnabled = isAcknowledgmentEnabled;
this.stopStreamRunnable = stopStreamRunnable;
this.acknowledgmentTimeout = acknowledgmentTimeout;
this.engineType = engineType;
this.pluginMetrics = pluginMetrics;
executorService = Executors.newSingleThreadExecutor();

this.positiveAcknowledgementSets = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME);
this.negativeAcknowledgementSets = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME);
this.recordsCheckpointed = pluginMetrics.counter(RECORDS_CHECKPOINTED);
this.noDataExtendLeaseCount = pluginMetrics.counter(NO_DATA_EXTEND_LEASE_COUNT);
this.giveupPartitionCount = pluginMetrics.counter(GIVE_UP_PARTITION_COUNT);
}

public void start() {
Expand All @@ -59,6 +80,7 @@ void runCheckpointing() {
try {
if (changeEventStatuses.isEmpty()) {
LOG.debug("No records processed. Extend the lease on stream partition.");
noDataExtendLeaseCount.increment();
streamCheckpointer.extendLease();
} else {
if (isAcknowledgmentEnabled) {
Expand All @@ -70,13 +92,14 @@ void runCheckpointing() {
}

if (lastChangeEventStatus != null) {
streamCheckpointer.checkpoint(engineType, lastChangeEventStatus);
checkpoint(engineType, lastChangeEventStatus);
}

// If negative ack is seen, give up partition and exit loop to stop processing stream
if (currentChangeEventStatus != null && currentChangeEventStatus.isNegativeAcknowledgment()) {
LOG.info("Received negative acknowledgement for change event at {}. Will restart from most recent checkpoint", currentChangeEventStatus.getBinlogCoordinate());
streamCheckpointer.giveUpPartition();
giveupPartitionCount.increment();
break;
}
} else {
Expand All @@ -86,10 +109,10 @@ void runCheckpointing() {
changeEventCount++;
// In case queue are populated faster than the poll, checkpoint when reaching certain count
if (changeEventCount % CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH == 0) {
streamCheckpointer.checkpoint(engineType, currentChangeEventStatus);
checkpoint(engineType, currentChangeEventStatus);
}
} while (!changeEventStatuses.isEmpty());
streamCheckpointer.checkpoint(engineType, currentChangeEventStatus);
checkpoint(engineType, currentChangeEventStatus);
}
}
} catch (Exception e) {
Expand All @@ -112,42 +135,52 @@ public void stop() {
executorService.shutdownNow();
}

public ChangeEventStatus saveChangeEventsStatus(BinlogCoordinate binlogCoordinate) {
final ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli());
public ChangeEventStatus saveChangeEventsStatus(BinlogCoordinate binlogCoordinate, long recordCount) {
final ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli(), recordCount);
changeEventStatuses.add(changeEventStatus);
return changeEventStatus;
}

public ChangeEventStatus saveChangeEventsStatus(LogSequenceNumber logSequenceNumber) {
final ChangeEventStatus changeEventStatus = new ChangeEventStatus(logSequenceNumber, Instant.now().toEpochMilli());
public ChangeEventStatus saveChangeEventsStatus(LogSequenceNumber logSequenceNumber, long recordCount) {
final ChangeEventStatus changeEventStatus = new ChangeEventStatus(logSequenceNumber, Instant.now().toEpochMilli(), recordCount);
changeEventStatuses.add(changeEventStatus);
return changeEventStatus;
}

public AcknowledgementSet createAcknowledgmentSet(BinlogCoordinate binlogCoordinate) {
public AcknowledgementSet createAcknowledgmentSet(BinlogCoordinate binlogCoordinate, long recordCount) {
LOG.debug("Create acknowledgment set for events receive prior to {}", binlogCoordinate);
final ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli());
final ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli(), recordCount);
changeEventStatuses.add(changeEventStatus);
return getAcknowledgementSet(changeEventStatus);
}

public AcknowledgementSet createAcknowledgmentSet(LogSequenceNumber logSequenceNumber) {
public AcknowledgementSet createAcknowledgmentSet(LogSequenceNumber logSequenceNumber, long recordCount) {
LOG.debug("Create acknowledgment set for events receive prior to {}", logSequenceNumber);
final ChangeEventStatus changeEventStatus = new ChangeEventStatus(logSequenceNumber, Instant.now().toEpochMilli());
final ChangeEventStatus changeEventStatus = new ChangeEventStatus(logSequenceNumber, Instant.now().toEpochMilli(), recordCount);
changeEventStatuses.add(changeEventStatus);
return getAcknowledgementSet(changeEventStatus);
}

private AcknowledgementSet getAcknowledgementSet(ChangeEventStatus changeEventStatus) {
return acknowledgementSetManager.create((result) -> {
if (result) {
positiveAcknowledgementSets.increment();
changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.POSITIVE_ACK);
} else {
negativeAcknowledgementSets.increment();
changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.NEGATIVE_ACK);
}
}, acknowledgmentTimeout);
}

private void checkpoint(final EngineType engineType, final ChangeEventStatus changeEventStatus) {
LOG.debug("Checkpoint at {} with record count {}. ", engineType == EngineType.MYSQL ?
changeEventStatus.getBinlogCoordinate() : changeEventStatus.getLogSequenceNumber(),
changeEventStatus.getRecordCount());
streamCheckpointer.checkpoint(engineType, changeEventStatus);
recordsCheckpointed.increment(changeEventStatus.getRecordCount());
}

//VisibleForTesting
ConcurrentLinkedQueue<ChangeEventStatus> getChangeEventStatuses() {
return changeEventStatuses;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package org.opensearch.dataprepper.plugins.source.rds.stream;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -32,11 +31,11 @@
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.rds.stream.BinlogEventListener.REPLICATION_LOG_EVENT_PROCESSING_TIME;

@ExtendWith(MockitoExtension.class)
class LogicalReplicationEventProcessorTest {
Expand Down Expand Up @@ -70,14 +69,12 @@ class LogicalReplicationEventProcessorTest {

private Random random;

private Timer eventProcessingTimer;

@BeforeEach
void setUp() {
s3Prefix = UUID.randomUUID().toString();
random = new Random();
eventProcessingTimer = Metrics.timer("test-timer");
when(pluginMetrics.timer(REPLICATION_LOG_EVENT_PROCESSING_TIME)).thenReturn(eventProcessingTimer);
when(pluginMetrics.timer(anyString())).thenReturn(Metrics.timer("test-timer"));
when(pluginMetrics.counter(anyString())).thenReturn(Metrics.counter("test-counter"));

objectUnderTest = spy(createObjectUnderTest());
}
Expand Down
Loading

0 comments on commit f6bd2e2

Please sign in to comment.