Skip to content

Commit

Permalink
Add e2e ack, checkpointing and metrics to Postgres stream processing (#5375)
Browse files Browse the repository at this point in the history

* Initial commit

Signed-off-by: Hai Yan <[email protected]>

* Update unit tests

Signed-off-by: Hai Yan <[email protected]>

* Add more metrics

Signed-off-by: Hai Yan <[email protected]>

* Add more tests

Signed-off-by: Hai Yan <[email protected]>

* Address review comments

Signed-off-by: Hai Yan <[email protected]>

* Address review comments

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Feb 4, 2025
1 parent 170b395 commit b181a8d
Show file tree
Hide file tree
Showing 13 changed files with 454 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public Map<String, ParentTable> getParentTableMap(StreamPartition streamPartitio

/**
* Detects if a binlog event contains cascading updates and if detected, creates resync partitions
* @param event event
* @param event binlog event
* @param parentTableMap parent table map
* @param tableMetadata table meta data
* @param tableMetadata table metadata
*/
public void detectCascadingUpdates(Event event, Map<String, ParentTable> parentTableMap, TableMetadata tableMetadata) {
final UpdateRowsEventData data = event.getData();
Expand Down Expand Up @@ -143,9 +143,9 @@ public void detectCascadingUpdates(Event event, Map<String, ParentTable> parentT

/**
* Detects if a binlog event contains cascading deletes and if detected, creates resync partitions
* @param event event
* @param event binlog event
* @param parentTableMap parent table map
* @param tableMetadata table meta data
* @param tableMetadata table metadata
*/
public void detectCascadingDeletes(Event event, Map<String, ParentTable> parentTableMap, TableMetadata tableMetadata) {
if (parentTableMap.containsKey(tableMetadata.getFullTableName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public BinlogEventListener(final StreamPartition streamPartition,
this.dbTableMetadata = dbTableMetadata;
this.streamCheckpointManager = new StreamCheckpointManager(
streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(),
acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout());
acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(),
sourceConfig.getEngine(), pluginMetrics);
streamCheckpointManager.start();

this.cascadeActionDetector = cascadeActionDetector;
Expand Down Expand Up @@ -200,7 +201,7 @@ void handleRotateEvent(com.github.shyiko.mysql.binlog.event.Event event) {

// Trigger a checkpoint update for this rotate when there are no row mutation events being processed
if (streamCheckpointManager.getChangeEventStatuses().isEmpty()) {
ChangeEventStatus changeEventStatus = streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate);
ChangeEventStatus changeEventStatus = streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate, 0);
if (isAcknowledgmentsEnabled) {
changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.POSITIVE_ACK);
}
Expand Down Expand Up @@ -347,9 +348,10 @@ void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
LOG.debug("Current binlog coordinate after receiving a row change event: " + currentBinlogCoordinate);
}

final long recordCount = rows.size();
AcknowledgementSet acknowledgementSet = null;
if (isAcknowledgmentsEnabled) {
acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(currentBinlogCoordinate);
acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(currentBinlogCoordinate, recordCount);
}

final long bytes = event.toString().getBytes().length;
Expand Down Expand Up @@ -398,7 +400,7 @@ void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
if (isAcknowledgmentsEnabled) {
acknowledgementSet.complete();
} else {
streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate);
streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate, recordCount);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
package org.opensearch.dataprepper.plugins.source.rds.stream;

import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.postgresql.replication.LogSequenceNumber;

public class ChangeEventStatus {

private final BinlogCoordinate binlogCoordinate;
private final LogSequenceNumber logSequenceNumber;
private final long timestamp;
private final long recordCount;
private volatile AcknowledgmentStatus acknowledgmentStatus;

public enum AcknowledgmentStatus {
Expand All @@ -19,9 +22,19 @@ public enum AcknowledgmentStatus {
NO_ACK
}

public ChangeEventStatus(final BinlogCoordinate binlogCoordinate, final long timestamp) {
/**
 * Creates a change event status tracked by a MySQL binlog coordinate.
 * The status starts out as {@link AcknowledgmentStatus#NO_ACK}.
 *
 * @param binlogCoordinate the binlog position associated with this change event
 * @param timestamp        the timestamp recorded for this status (units as supplied by the caller)
 * @param recordCount      the number of records covered by this change event
 */
public ChangeEventStatus(final BinlogCoordinate binlogCoordinate, final long timestamp, final long recordCount) {
    this.acknowledgmentStatus = AcknowledgmentStatus.NO_ACK;
    this.recordCount = recordCount;
    this.timestamp = timestamp;
    this.binlogCoordinate = binlogCoordinate;
    // No LSN on the MySQL path; the LogSequenceNumber constructor sets it instead.
    this.logSequenceNumber = null;
}

/**
 * Creates a change event status tracked by a Postgres log sequence number (LSN).
 * The status starts out as {@link AcknowledgmentStatus#NO_ACK}.
 *
 * @param logSequenceNumber the replication-stream LSN associated with this change event
 * @param timestamp         the timestamp recorded for this status (units as supplied by the caller)
 * @param recordCount       the number of records covered by this change event
 */
public ChangeEventStatus(final LogSequenceNumber logSequenceNumber, final long timestamp, final long recordCount) {
    this.acknowledgmentStatus = AcknowledgmentStatus.NO_ACK;
    this.recordCount = recordCount;
    this.timestamp = timestamp;
    this.logSequenceNumber = logSequenceNumber;
    // No binlog coordinate on the Postgres path; the BinlogCoordinate constructor sets it instead.
    this.binlogCoordinate = null;
}

Expand All @@ -45,7 +58,15 @@ public BinlogCoordinate getBinlogCoordinate() {
return binlogCoordinate;
}

/**
 * Returns the Postgres log sequence number for this change event, or {@code null}
 * when this status was created from a MySQL binlog coordinate.
 */
public LogSequenceNumber getLogSequenceNumber() {
return logSequenceNumber;
}

/**
 * Returns the timestamp recorded when this status was created.
 */
public long getTimestamp() {
return timestamp;
}

/**
 * Returns the number of records covered by this change event
 * (e.g. the row count of the originating row-change event).
 */
public long getRecordCount() {
return recordCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public LogicalReplicationClient(final ConnectionManager connectionManager,

@Override
public void connect() {
LOG.debug("Start connecting logical replication stream. ");
PGReplicationStream stream;
try (Connection conn = connectionManager.getConnection()) {
PGConnection pgConnection = conn.unwrap(PGConnection.class);
Expand All @@ -62,6 +63,7 @@ public void connect() {
logicalStreamBuilder.withStartPosition(startLsn);
}
stream = logicalStreamBuilder.start();
LOG.debug("Logical replication stream started. ");

if (eventProcessor != null) {
while (!disconnectRequested) {
Expand All @@ -88,7 +90,8 @@ public void connect() {
}

stream.close();
LOG.info("Replication stream closed successfully.");
disconnectRequested = false;
LOG.debug("Replication stream closed successfully.");
} catch (Exception e) {
LOG.error("Exception while creating Postgres replication stream. ", e);
}
Expand All @@ -97,6 +100,7 @@ public void connect() {
/**
 * Requests a cooperative shutdown of the logical replication stream.
 * Sets the disconnectRequested flag, which the read loop in connect()
 * polls between reads; this method does not close the stream itself.
 */
@Override
public void disconnect() {
disconnectRequested = true;
LOG.debug("Requested to disconnect logical replication stream.");
}

public void setEventProcessor(LogicalReplicationEventProcessor eventProcessor) {
Expand Down
Loading

0 comments on commit b181a8d

Please sign in to comment.