diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java index d7ab374684..f2587a079d 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.PostgresStreamState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; @@ -167,17 +168,20 @@ private void createStreamPartition(RdsSourceConfig sourceConfig) { progressState.setWaitForExport(sourceConfig.isExportEnabled()); progressState.setPrimaryKeyMap(getPrimaryKeyMap()); if (sourceConfig.getEngine() == EngineType.MYSQL) { - final MySqlStreamState mySqlStreamState = progressState.getMySqlStreamState(); + final MySqlStreamState mySqlStreamState = new MySqlStreamState(); getCurrentBinlogPosition().ifPresent(mySqlStreamState::setCurrentPosition); mySqlStreamState.setForeignKeyRelations(((MySqlSchemaManager)schemaManager).getForeignKeyRelations(sourceConfig.getTableNames())); + progressState.setMySqlStreamState(mySqlStreamState); } else { // Postgres // Create replication slot, which will mark the starting point for stream final String publicationName = generatePublicationName(); final String slotName = generateReplicationSlotName(); ((PostgresSchemaManager)schemaManager).createLogicalReplicationSlot(sourceConfig.getTableNames(), publicationName, slotName); - progressState.getPostgresStreamState().setPublicationName(publicationName); - progressState.getPostgresStreamState().setReplicationSlotName(slotName); + final PostgresStreamState postgresStreamState = new PostgresStreamState(); + postgresStreamState.setPublicationName(publicationName); + postgresStreamState.setReplicationSlotName(slotName); + progressState.setPostgresStreamState(postgresStreamState); } StreamPartition streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState); sourceCoordinator.createPartition(streamPartition); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java index 130f004960..22935fc6e3 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java @@ -25,6 +25,8 @@ public class LogicalReplicationClient implements ReplicationLogClient { private static final Logger LOG = LoggerFactory.getLogger(LogicalReplicationClient.class); + static final String PROTO_VERSION_KEY = "proto_version"; + static final String VERSION_ONE = "1"; static final String PUBLICATION_NAMES_KEY = "publication_names"; private final ConnectionManager connectionManager; @@ -36,10 +38,10 @@ public class LogicalReplicationClient implements ReplicationLogClient { private volatile boolean disconnectRequested = false; public LogicalReplicationClient(final ConnectionManager connectionManager, - final String replicationSlotName, - final String publicationName) { - this.publicationName = publicationName; + final String publicationName, + final String replicationSlotName) { this.connectionManager = connectionManager; + this.publicationName = publicationName; this.replicationSlotName = replicationSlotName; } @@ -54,6 +56,7 @@ public void connect() { .replicationStream() .logical() .withSlotName(replicationSlotName) + .withSlotOption(PROTO_VERSION_KEY, VERSION_ONE) .withSlotOption(PUBLICATION_NAMES_KEY, publicationName); if (startLsn != null) { logicalStreamBuilder.withStartPosition(startLsn); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java index f9881d0063..3d5c1a04b1 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java @@ -158,7 +158,7 @@ void processRelationMessage(ByteBuffer msg) { } void processCommitMessage(ByteBuffer msg) { - int flag = msg.getInt(); + int flag = msg.get(); long commitLsn = msg.getLong(); long endLsn = msg.getLong(); long epochMicro = msg.getLong();