Skip to content

Commit

Permalink
Fix postgres stream (#5367)
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Jan 29, 2025
1 parent 70e8c8b commit 8f384dc
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.PostgresStreamState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
Expand Down Expand Up @@ -167,17 +168,20 @@ private void createStreamPartition(RdsSourceConfig sourceConfig) {
progressState.setWaitForExport(sourceConfig.isExportEnabled());
progressState.setPrimaryKeyMap(getPrimaryKeyMap());
if (sourceConfig.getEngine() == EngineType.MYSQL) {
final MySqlStreamState mySqlStreamState = progressState.getMySqlStreamState();
final MySqlStreamState mySqlStreamState = new MySqlStreamState();
getCurrentBinlogPosition().ifPresent(mySqlStreamState::setCurrentPosition);
mySqlStreamState.setForeignKeyRelations(((MySqlSchemaManager)schemaManager).getForeignKeyRelations(sourceConfig.getTableNames()));
progressState.setMySqlStreamState(mySqlStreamState);
} else {
// Postgres
// Create replication slot, which will mark the starting point for stream
final String publicationName = generatePublicationName();
final String slotName = generateReplicationSlotName();
((PostgresSchemaManager)schemaManager).createLogicalReplicationSlot(sourceConfig.getTableNames(), publicationName, slotName);
progressState.getPostgresStreamState().setPublicationName(publicationName);
progressState.getPostgresStreamState().setReplicationSlotName(slotName);
final PostgresStreamState postgresStreamState = new PostgresStreamState();
postgresStreamState.setPublicationName(publicationName);
postgresStreamState.setReplicationSlotName(slotName);
progressState.setPostgresStreamState(postgresStreamState);
}
StreamPartition streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState);
sourceCoordinator.createPartition(streamPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class LogicalReplicationClient implements ReplicationLogClient {

private static final Logger LOG = LoggerFactory.getLogger(LogicalReplicationClient.class);

static final String PROTO_VERSION_KEY = "proto_version";
static final String VERSION_ONE = "1";
static final String PUBLICATION_NAMES_KEY = "publication_names";

private final ConnectionManager connectionManager;
Expand All @@ -36,10 +38,10 @@ public class LogicalReplicationClient implements ReplicationLogClient {
private volatile boolean disconnectRequested = false;

public LogicalReplicationClient(final ConnectionManager connectionManager,
final String replicationSlotName,
final String publicationName) {
this.publicationName = publicationName;
final String publicationName,
final String replicationSlotName) {
this.connectionManager = connectionManager;
this.publicationName = publicationName;
this.replicationSlotName = replicationSlotName;
}

Expand All @@ -54,6 +56,7 @@ public void connect() {
.replicationStream()
.logical()
.withSlotName(replicationSlotName)
.withSlotOption(PROTO_VERSION_KEY, VERSION_ONE)
.withSlotOption(PUBLICATION_NAMES_KEY, publicationName);
if (startLsn != null) {
logicalStreamBuilder.withStartPosition(startLsn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void processRelationMessage(ByteBuffer msg) {
}

void processCommitMessage(ByteBuffer msg) {
int flag = msg.getInt();
int flag = msg.get();
long commitLsn = msg.getLong();
long endLsn = msg.getLong();
long epochMicro = msg.getLong();
Expand Down

0 comments on commit 8f384dc

Please sign in to comment.