Skip to content

Commit

Permalink
Merge branch 'main' into fix-metrics-then-cherry-pick
Browse files Browse the repository at this point in the history
Signed-off-by: Maxwell Brown <[email protected]>
  • Loading branch information
Galactus22625 authored Jan 28, 2025
2 parents 7efbcae + 0423cd1 commit 4f619dd
Show file tree
Hide file tree
Showing 69 changed files with 2,757 additions and 599 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ void testMetricsOnSpanSet() {

@ParameterizedTest
@CsvSource({
"1, 4",
"0, 4",
"2, 6"
})
void traceGroupCacheMaxSize_provides_an_upper_bound(final long cacheMaxSize, final int expectedProcessedRecords) {
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/rds-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {

implementation 'com.zendesk:mysql-binlog-connector-java:0.29.2'
implementation 'com.mysql:mysql-connector-j:8.4.0'
implementation 'org.postgresql:postgresql:42.7.4'

compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler;
import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.source.rds.export.ExportTaskManager;
Expand All @@ -26,9 +27,13 @@
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.resync.ResyncScheduler;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManagerFactory;
import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManagerFactory;
import org.opensearch.dataprepper.plugins.source.rds.stream.ReplicationLogClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener;
import org.slf4j.Logger;
Expand All @@ -37,6 +42,7 @@
import software.amazon.awssdk.services.s3.S3Client;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -101,9 +107,16 @@ public void start(Buffer<Record<Event>> buffer) {
new ClusterApiStrategy(rdsClient) : new InstanceApiStrategy(rdsClient);
final DbMetadata dbMetadata = rdsApiStrategy.describeDb(sourceConfig.getDbIdentifier());
final String s3PathPrefix = getS3PathPrefix();

final SchemaManager schemaManager = getSchemaManager(sourceConfig, dbMetadata);
final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(schemaManager);
final DbTableMetadata dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
DbTableMetadata dbTableMetadata;
if (sourceConfig.getEngine() == EngineType.MYSQL) {
final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(
(MySqlSchemaManager) schemaManager);
dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
} else {
dbTableMetadata = new DbTableMetadata(dbMetadata, Collections.emptyMap());
}

leaderScheduler = new LeaderScheduler(
sourceCoordinator, sourceConfig, s3PathPrefix, schemaManager, dbTableMetadata);
Expand All @@ -121,21 +134,23 @@ public void start(Buffer<Record<Event>> buffer) {
}

if (sourceConfig.isStreamEnabled()) {
BinlogClientFactory binaryLogClientFactory = new BinlogClientFactory(sourceConfig, rdsClient, dbMetadata);
ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata);

if (sourceConfig.isTlsEnabled()) {
binaryLogClientFactory.setSSLMode(SSLMode.REQUIRED);
replicationLogClientFactory.setSSLMode(SSLMode.REQUIRED);
} else {
binaryLogClientFactory.setSSLMode(SSLMode.DISABLED);
replicationLogClientFactory.setSSLMode(SSLMode.DISABLED);
}

streamScheduler = new StreamScheduler(
sourceCoordinator, sourceConfig, s3PathPrefix, binaryLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
sourceCoordinator, sourceConfig, s3PathPrefix, replicationLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
runnableList.add(streamScheduler);

resyncScheduler = new ResyncScheduler(
sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(resyncScheduler);
if (sourceConfig.getEngine() == EngineType.MYSQL) {
resyncScheduler = new ResyncScheduler(
sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(resyncScheduler);
}
}

executor = Executors.newFixedThreadPool(runnableList.size());
Expand Down Expand Up @@ -164,19 +179,14 @@ public void shutdown() {
}

private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
final ConnectionManager connectionManager = new ConnectionManager(
dbMetadata.getEndpoint(),
dbMetadata.getPort(),
sourceConfig.getAuthenticationConfig().getUsername(),
sourceConfig.getAuthenticationConfig().getPassword(),
sourceConfig.isTlsEnabled());
return new SchemaManager(connectionManager);
final ConnectionManager connectionManager = new ConnectionManagerFactory(sourceConfig, dbMetadata).getConnectionManager();
return new SchemaManagerFactory(connectionManager).getSchemaManager();
}

private QueryManager getQueryManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
final String readerEndpoint = dbMetadata.getReaderEndpoint() != null ? dbMetadata.getReaderEndpoint() : dbMetadata.getEndpoint();
final int readerPort = dbMetadata.getReaderPort() == 0 ? dbMetadata.getPort() : dbMetadata.getReaderPort();
final ConnectionManager readerConnectionManager = new ConnectionManager(
final MySqlConnectionManager readerConnectionManager = new MySqlConnectionManager(
readerEndpoint,
readerPort,
sourceConfig.getAuthenticationConfig().getUsername(),
Expand All @@ -203,13 +213,11 @@ private String getS3PathPrefix() {
return s3PathPrefix;
}

private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) {
private Map<String, Map<String, String>> getColumnDataTypeMap(final MySqlSchemaManager schemaManager) {
return sourceConfig.getTableNames().stream()
.collect(Collectors.toMap(
fullTableName -> fullTableName,
fullTableName -> schemaManager.getColumnDataTypes(fullTableName.split("\\.")[0], fullTableName.split("\\.")[1])
));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

public enum EngineType {

MYSQL("mysql");
MYSQL("mysql"),
POSTGRES("postgres");

private static final Map<String, EngineType> ENGINE_TYPE_MAP = Arrays.stream(EngineType.values())
.collect(Collectors.toMap(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyRelation;

import java.util.List;

public class MySqlStreamState {

@JsonProperty("currentPosition")
private BinlogCoordinate currentPosition;

@JsonProperty("foreignKeyRelations")
private List<ForeignKeyRelation> foreignKeyRelations;

public BinlogCoordinate getCurrentPosition() {
return currentPosition;
}

public void setCurrentPosition(BinlogCoordinate currentPosition) {
this.currentPosition = currentPosition;
}

public List<ForeignKeyRelation> getForeignKeyRelations() {
return foreignKeyRelations;
}

public void setForeignKeyRelations(List<ForeignKeyRelation> foreignKeyRelations) {
this.foreignKeyRelations = foreignKeyRelations;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;

public class PostgresStreamState {

@JsonProperty("currentLsn")
private String currentLsn;

@JsonProperty("publicationName")
private String publicationName;

@JsonProperty("replicationSlotName")
private String replicationSlotName;

public String getCurrentLsn() {
return currentLsn;
}

public void setCurrentLsn(String currentLsn) {
this.currentLsn = currentLsn;
}

public String getPublicationName() {
return publicationName;
}

public void setPublicationName(String publicationName) {
this.publicationName = publicationName;
}

public String getReplicationSlotName() {
return replicationSlotName;
}

public void setReplicationSlotName(String replicationSlotName) {
this.replicationSlotName = replicationSlotName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,44 @@
package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyRelation;

import java.util.List;
import java.util.Map;

public class StreamProgressState {

@JsonProperty("currentPosition")
private BinlogCoordinate currentPosition;
@JsonProperty("engineType")
private String engineType;

@JsonProperty("waitForExport")
private boolean waitForExport = false;

@JsonProperty("foreignKeyRelations")
private List<ForeignKeyRelation> foreignKeyRelations;
/**
* Map of table name to primary keys
*/
@JsonProperty("primaryKeyMap")
private Map<String, List<String>> primaryKeyMap;

public BinlogCoordinate getCurrentPosition() {
return currentPosition;
@JsonProperty("mySqlStreamState")
private MySqlStreamState mySqlStreamState;

@JsonProperty("postgresStreamState")
private PostgresStreamState postgresStreamState;

public String getEngineType() {
return engineType;
}

public void setEngineType(String engineType) {
this.engineType = engineType;
}

public Map<String, List<String>> getPrimaryKeyMap() {
return primaryKeyMap;
}

public void setCurrentPosition(BinlogCoordinate currentPosition) {
this.currentPosition = currentPosition;
public void setPrimaryKeyMap(Map<String, List<String>> primaryKeyMap) {
this.primaryKeyMap = primaryKeyMap;
}

public boolean shouldWaitForExport() {
Expand All @@ -38,11 +54,19 @@ public void setWaitForExport(boolean waitForExport) {
this.waitForExport = waitForExport;
}

public List<ForeignKeyRelation> getForeignKeyRelations() {
return foreignKeyRelations;
public MySqlStreamState getMySqlStreamState() {
return mySqlStreamState;
}

public void setMySqlStreamState(MySqlStreamState mySqlStreamState) {
this.mySqlStreamState = mySqlStreamState;
}

public PostgresStreamState getPostgresStreamState() {
return postgresStreamState;
}

public void setForeignKeyRelations(List<ForeignKeyRelation> foreignKeyRelations) {
this.foreignKeyRelations = foreignKeyRelations;
public void setPostgresStreamState(PostgresStreamState postgresStreamState) {
this.postgresStreamState = postgresStreamState;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres;

import java.util.HashMap;
import java.util.Map;

public enum ColumnType {
BOOLEAN(16, "boolean"),
SMALLINT(21, "smallint"),
INTEGER(23, "integer"),
BIGINT(20, "bigint"),
REAL(700, "real"),
DOUBLE_PRECISION(701, "double precision"),
NUMERIC(1700, "numeric"),
TEXT(25, "text"),
VARCHAR(1043, "varchar"),
DATE(1082, "date"),
TIME(1083, "time"),
TIMESTAMP(1114, "timestamp"),
TIMESTAMPTZ(1184, "timestamptz"),
UUID(2950, "uuid"),
JSON(114, "json"),
JSONB(3802, "jsonb");

private final int typeId;
private final String typeName;

private static final Map<Integer, ColumnType> TYPE_ID_MAP = new HashMap<>();

static {
for (ColumnType type : values()) {
TYPE_ID_MAP.put(type.typeId, type);
}
}

ColumnType(int typeId, String typeName) {
this.typeId = typeId;
this.typeName = typeName;
}

public int getTypeId() {
return typeId;
}

public String getTypeName() {
return typeName;
}

public static ColumnType getByTypeId(int typeId) {
if (!TYPE_ID_MAP.containsKey(typeId)) {
throw new IllegalArgumentException("Unsupported column type id: " + typeId);
}
return TYPE_ID_MAP.get(typeId);
}

public static String getTypeNameByEnum(ColumnType columnType) {
return columnType.getTypeName();
}
}
Loading

0 comments on commit 4f619dd

Please sign in to comment.