Support export from postgres

Signed-off-by: Hai Yan <[email protected]>
oeyh committed Feb 5, 2025
1 parent b181a8d commit 6fa2556

Showing 13 changed files with 122 additions and 19 deletions.
@@ -33,7 +33,7 @@ public String toString() {
}

@JsonCreator
public static EngineType fromOptionValue(final String option) {
public static EngineType fromString(final String option) {
return ENGINE_TYPE_MAP.get(option);
}
}
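For context, the renamed @JsonCreator factory resolves an engine option string through ENGINE_TYPE_MAP. Below is a minimal sketch of what the full enum might look like; the option values "mysql" and "postgres" and the stream-built lookup map are assumptions, since this diff only shows the renamed factory method.

// Illustrative sketch only: the option strings and map construction are assumed, not taken from this diff.
import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public enum EngineType {

    MYSQL("mysql"),
    POSTGRES("postgres");

    // Built once from the enum constants so fromString is a constant-time lookup.
    private static final Map<String, EngineType> ENGINE_TYPE_MAP = Arrays.stream(values())
            .collect(Collectors.toMap(EngineType::toString, Function.identity()));

    private final String engine;

    EngineType(final String engine) {
        this.engine = engine;
    }

    @Override
    public String toString() {
        return engine;
    }

    @JsonCreator
    public static EngineType fromString(final String option) {
        return ENGINE_TYPE_MAP.get(option);
    }
}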
@@ -12,6 +12,9 @@

public class DataFileProgressState {

@JsonProperty("engineType")
private String engineType;

@JsonProperty("isLoaded")
private boolean isLoaded = false;

@@ -21,6 +24,10 @@ public class DataFileProgressState {
@JsonProperty("sourceDatabase")
private String sourceDatabase;

/**
* For MySQL, sourceTable is in the format of tableName
* For Postgres, sourceTable is in the format of schemaName.tableName
*/
@JsonProperty("sourceTable")
private String sourceTable;

@@ -33,6 +40,14 @@ public class DataFileProgressState {
@JsonProperty("snapshotTime")
private long snapshotTime;

public String getEngineType() {
return engineType;
}

public void setEngineType(String engineType) {
this.engineType = engineType;
}

public int getTotalRecords() {
return totalRecords;
}
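To illustrate the sourceTable convention documented above, here is a minimal sketch of how a data file progress state might be populated for each engine. The database, schema, and table names and the engine option strings are hypothetical; only the setter names come from this diff.

// Hypothetical values for illustration; only the setters are taken from the change set.
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState;

class DataFileProgressStateExample {

    static DataFileProgressState forMySql() {
        final DataFileProgressState state = new DataFileProgressState();
        state.setEngineType("mysql");              // assumed option string
        state.setSourceDatabase("my-db");
        state.setSourceTable("orders");            // MySQL: table name only
        return state;
    }

    static DataFileProgressState forPostgres() {
        final DataFileProgressState state = new DataFileProgressState();
        state.setEngineType("postgres");           // assumed option string
        state.setSourceDatabase("my-db");
        state.setSourceTable("public.orders");     // Postgres: schemaName.tableName
        return state;
    }
}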
@@ -15,6 +15,9 @@
*/
public class ExportProgressState {

@JsonProperty("engineType")
private String engineType;

@JsonProperty("snapshotId")
private String snapshotId;

@@ -48,6 +51,14 @@ public class ExportProgressState {
@JsonProperty("status")
private String status;

public String getEngineType() {
return engineType;
}

public void setEngineType(String engineType) {
this.engineType = engineType;
}

public String getSnapshotId() {
return snapshotId;
}
@@ -16,6 +16,7 @@
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState;
@@ -127,7 +128,7 @@ public void run() {

final String fullTableName = progressState.getSourceDatabase() + DOT_DELIMITER + progressState.getSourceTable();
final List<String> primaryKeys = progressState.getPrimaryKeyMap().getOrDefault(fullTableName, List.of());
transformEvent(event, fullTableName);
transformEvent(event, fullTableName, EngineType.fromString(progressState.getEngineType()));

final long snapshotTime = progressState.getSnapshotTime();
final long eventVersionNumber = snapshotTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis();
@@ -173,13 +174,15 @@ public void run() {
}
}

private void transformEvent(final Event event, final String fullTableName) {
Map<String, String> columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName);
for (Map.Entry<String, Object> entry : event.toMap().entrySet()) {
final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(),
entry.getValue(), null);
event.put(entry.getKey(), data);
private void transformEvent(final Event event, final String fullTableName, final EngineType engineType) {
// TODO: support data type mapping in Postgres
if (engineType == EngineType.MYSQL) {
Map<String, String> columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName);
for (Map.Entry<String, Object> entry : event.toMap().entrySet()) {
final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(),
entry.getValue(), null);
event.put(entry.getKey(), data);
}
}
}

}
@@ -9,6 +9,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
@@ -53,6 +54,7 @@ public class ExportScheduler implements Runnable {
static final String EXPORT_JOB_SUCCESS_COUNT = "exportJobSuccess";
static final String EXPORT_JOB_FAILURE_COUNT = "exportJobFailure";
static final String EXPORT_S3_OBJECTS_TOTAL_COUNT = "exportS3ObjectsTotal";
static final String DOT_DELIMITER = ".";

private final S3Client s3Client;
private final PluginMetrics pluginMetrics;
@@ -65,6 +67,8 @@ public class ExportScheduler implements Runnable {
private final Counter exportJobFailureCounter;
private final Counter exportS3ObjectsTotalCounter;

private EngineType engineType;

private volatile boolean shutdownRequested = false;

public ExportScheduler(final EnhancedSourceCoordinator sourceCoordinator,
@@ -133,6 +137,7 @@ public void shutdown() {

private String getOrCreateExportTaskId(ExportPartition exportPartition) {
ExportProgressState progressState = exportPartition.getProgressState().get();
engineType = EngineType.fromString(progressState.getEngineType());

if (progressState.getExportTaskId() != null) {
LOG.info("Export task has already created for db {}", exportPartition.getDbIdentifier());
@@ -316,7 +321,9 @@ private void createDataFilePartitions(String bucket,
final DataFileProgressState progressState = new DataFileProgressState();
final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKey);
final String database = exportObjectKey.getDatabaseName();
final String table = exportObjectKey.getTableName();
final String table = engineType == EngineType.MYSQL ?
exportObjectKey.getTableName() :
exportObjectKey.getSchemaName() + DOT_DELIMITER + exportObjectKey.getTableName();

progressState.setSourceDatabase(database);
progressState.setSourceTable(table);
@@ -81,6 +81,7 @@ public void run() {
if (leaderPartition != null) {
LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
if (!leaderProgressState.isInitialized()) {
LOG.info("Performing initialization as LEADER node.");
init();
}
}
@@ -139,6 +140,7 @@ private void init() {

private void createExportPartition(RdsSourceConfig sourceConfig) {
ExportProgressState progressState = new ExportProgressState();
progressState.setEngineType(sourceConfig.getEngine().toString());
progressState.setIamRoleArn(sourceConfig.getExport().getIamRoleArn());
progressState.setBucket(sourceConfig.getS3Bucket());
// This prefix is for data exported from RDS
@@ -10,22 +10,29 @@

/**
* Represents the object key for an object exported to S3 by RDS.
* The object key has this structure: "{prefix}/{export task ID}/{database name}/{table name}/{numbered folder}/{file name}"
* The object key has this structure: "{prefix}/{export task ID}/{database name}/{full table name}/{numbered folder}/{file name}"
*/
public class ExportObjectKey {

static final String S3_PATH_DELIMITER = "/";
private final String prefix;
private final String exportTaskId;
private final String databaseName;

/**
* schemaName is specific to Postgres; for MySQL, schemaName has the same value as databaseName
*/
private final String schemaName;

private final String tableName;
private final String numberedFolder;
private final String fileName;

ExportObjectKey(final String prefix, final String exportTaskId, final String databaseName, final String tableName, final String numberedFolder, final String fileName) {
ExportObjectKey(final String prefix, final String exportTaskId, final String databaseName, final String schemaName, final String tableName, final String numberedFolder, final String fileName) {
this.prefix = prefix;
this.exportTaskId = exportTaskId;
this.databaseName = databaseName;
this.schemaName = schemaName;
this.tableName = tableName;
this.numberedFolder = numberedFolder;
this.fileName = fileName;
@@ -42,13 +49,14 @@ public static ExportObjectKey fromString(final String objectKeyString) {
.collect(Collectors.joining(S3_PATH_DELIMITER));
final String exportTaskId = parts[parts.length - 5];
final String databaseName = parts[parts.length - 4];
// fullTableName is in the format of "databaseName.tableName"
// fullTableName is in the format of "databaseName.tableName" for MySQL and "schemaName.tableName" for Postgres
final String fullTableName = parts[parts.length - 3];
final String schemaName = fullTableName.split("\\.")[0];
final String tableName = fullTableName.split("\\.")[1];
final String numberedFolder = parts[parts.length - 2];
final String fileName = parts[parts.length - 1];

return new ExportObjectKey(prefix, exportTaskId, databaseName, tableName, numberedFolder, fileName);
return new ExportObjectKey(prefix, exportTaskId, databaseName, schemaName, tableName, numberedFolder, fileName);
}

public String getPrefix() {
@@ -63,6 +71,14 @@ public String getDatabaseName() {
return databaseName;
}

/**
* schemaName is specific to Postgres; for MySQL, schemaName has the same value as databaseName
* @return schemaName
*/
public String getSchemaName() {
return schemaName;
}

public String getTableName() {
return tableName;
}
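For reference, a small usage sketch of the updated parser follows. The sample key is hypothetical (it mirrors the unit test further down), and the example assumes it sits in the same package as ExportObjectKey, since the class's package is not shown in this diff.

// Usage sketch; key segments are hypothetical and the package placement is assumed.
class ExportObjectKeyExample {

    public static void main(final String[] args) {
        final String postgresKey = "prefix/export-task-id/db-name/schema-name.table-name/1/file-name.parquet";
        final ExportObjectKey key = ExportObjectKey.fromString(postgresKey);

        System.out.println(key.getDatabaseName());   // db-name
        System.out.println(key.getSchemaName());     // schema-name
        System.out.println(key.getTableName());      // table-name

        // For a MySQL export the same path segment is databaseName.tableName,
        // so getSchemaName() returns the database name instead of a schema.
    }
}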
@@ -85,15 +85,20 @@ public void connect() {
stream.setAppliedLSN(lsn);
} catch (Exception e) {
LOG.error("Exception while processing Postgres replication stream. ", e);
throw e;
}
}
}

stream.close();
disconnectRequested = false;
if (eventProcessor != null) {
eventProcessor.stopCheckpointManager();
}
LOG.debug("Replication stream closed successfully.");
} catch (Exception e) {
LOG.error("Exception while creating Postgres replication stream. ", e);
throw new RuntimeException(e);
}
}

@@ -173,12 +173,16 @@ public void process(ByteBuffer msg) {
public void stopClient() {
try {
logicalReplicationClient.disconnect();
LOG.info("Binary log client disconnected.");
LOG.info("Logical replication client disconnected.");
} catch (Exception e) {
LOG.error("Binary log client failed to disconnect.", e);
LOG.error("Logical replication client failed to disconnect.", e);
}
}

public void stopCheckpointManager() {
streamCheckpointManager.stop();
}

void processBeginMessage(ByteBuffer msg) {
currentLsn = msg.getLong();
long epochMicro = msg.getLong();
@@ -62,6 +62,8 @@ public void processStream(final StreamPartition streamPartition) {
LOG.info("Connect to database to read change events.");
replicationLogClient.connect();
} catch (Exception e) {
LOG.warn("Error while connecting to replication stream, will retry.");
sourceCoordinator.giveUpPartition(streamPartition);
throw new RuntimeException(e);
} finally {
try {
@@ -9,12 +9,15 @@
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.configuration.ExportConfig;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
@@ -92,12 +95,14 @@ void non_leader_node_should_not_perform_init() throws InterruptedException {
verify(sourceCoordinator, never()).createPartition(any(ExportPartition.class));
}

@Test
void leader_node_should_perform_init_if_not_initialized() throws InterruptedException {
@ParameterizedTest
@EnumSource(EngineType.class)
void leader_node_should_perform_init_if_not_initialized(EngineType engineType) throws InterruptedException {
when(sourceCoordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).thenReturn(Optional.of(leaderPartition));
when(leaderPartition.getProgressState()).thenReturn(Optional.of(leaderProgressState));
when(leaderProgressState.isInitialized()).thenReturn(false);
when(sourceConfig.isExportEnabled()).thenReturn(true);
when(sourceConfig.getEngine()).thenReturn(engineType);

final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(leaderScheduler);
@@ -15,13 +15,28 @@
class ExportObjectKeyTest {

@Test
void test_fromString_with_valid_input_string() {
void test_fromString_with_valid_input_string_mysql() {
final String objectKeyString = "prefix/export-task-id/db-name/db-name.table-name/1/file-name.parquet";
final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKeyString);

assertThat(exportObjectKey.getPrefix(), equalTo("prefix"));
assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id"));
assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name"));
assertThat(exportObjectKey.getSchemaName(), equalTo("db-name"));
assertThat(exportObjectKey.getTableName(), equalTo("table-name"));
assertThat(exportObjectKey.getNumberedFolder(), equalTo("1"));
assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet"));
}

@Test
void test_fromString_with_valid_input_string_postgres() {
final String objectKeyString = "prefix/export-task-id/db-name/schema-name.table-name/1/file-name.parquet";
final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKeyString);

assertThat(exportObjectKey.getPrefix(), equalTo("prefix"));
assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id"));
assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name"));
assertThat(exportObjectKey.getSchemaName(), equalTo("schema-name"));
assertThat(exportObjectKey.getTableName(), equalTo("table-name"));
assertThat(exportObjectKey.getNumberedFolder(), equalTo("1"));
assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet"));
@@ -35,6 +50,7 @@ void test_fromString_with_path_with_empty_prefix() {
assertThat(exportObjectKey.getPrefix(), equalTo(""));
assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id"));
assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name"));
assertThat(exportObjectKey.getSchemaName(), equalTo("db-name"));
assertThat(exportObjectKey.getTableName(), equalTo("table-name"));
assertThat(exportObjectKey.getNumberedFolder(), equalTo("1"));
assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet"));
@@ -48,6 +64,7 @@ void test_fromString_with_path_with_multilevel_prefix() {
assertThat(exportObjectKey.getPrefix(), equalTo("prefix1/prefix2/prefix3"));
assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id"));
assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name"));
assertThat(exportObjectKey.getSchemaName(), equalTo("db-name"));
assertThat(exportObjectKey.getTableName(), equalTo("table-name"));
assertThat(exportObjectKey.getNumberedFolder(), equalTo("1"));
assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet"));