Skip to content

Commit

Permalink
Merge branch 'main' into retry
Browse files Browse the repository at this point in the history
Signed-off-by: Srikanth Govindarajan <[email protected]>
  • Loading branch information
srikanthjg authored Jan 28, 2025
2 parents 1f4c5fa + 3692fc3 commit 060f301
Show file tree
Hide file tree
Showing 88 changed files with 3,022 additions and 695 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams
* @sb2k16 @chenqi0805 @engechas @san81 @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
* @sb2k16 @chenqi0805 @engechas @san81 @srikanthjg @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
27 changes: 14 additions & 13 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ This document contains a list of maintainers in this repo. See [opensearch-proje

## Current Maintainers

| Maintainer | GitHub ID | Affiliation |
| -------------------- | --------------------------------------------------------- | ----------- |
| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon |
| Qi Chen | [chenqi0805](https://github.com/chenqi0805) | Amazon |
| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon |
| Santhosh Gandhe | [san81](https://github.com/san81) | Amazon |
| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon |
| Dinu John | [dinujoh](https://github.com/dinujoh) | Amazon |
| Krishna Kondaka | [kkondaka](https://github.com/kkondaka) | Amazon |
| Asif Sohail Mohammed | [asifsmohammed](https://github.com/asifsmohammed) | Amazon |
| Karsten Schnitter | [KarstenSchnitter](https://github.com/KarstenSchnitter) | SAP |
| David Venable | [dlvenable](https://github.com/dlvenable) | Amazon |
| Hai Yan | [oeyh](https://github.com/oeyh) | Amazon |
| Maintainer | GitHub ID | Affiliation |
| ---------------------- | --------------------------------------------------------- | ----------- |
| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon |
| Qi Chen | [chenqi0805](https://github.com/chenqi0805) | Amazon |
| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon |
| Santhosh Gandhe | [san81](https://github.com/san81) | Amazon |
| Srikanth Govindarajan | [srikanthjg](https://github.com/srikanthjg) | Amazon |
| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon |
| Dinu John | [dinujoh](https://github.com/dinujoh) | Amazon |
| Krishna Kondaka | [kkondaka](https://github.com/kkondaka) | Amazon |
| Asif Sohail Mohammed | [asifsmohammed](https://github.com/asifsmohammed) | Amazon |
| Karsten Schnitter | [KarstenSchnitter](https://github.com/KarstenSchnitter) | SAP |
| David Venable | [dlvenable](https://github.com/dlvenable) | Amazon |
| Hai Yan | [oeyh](https://github.com/oeyh) | Amazon |


## Emeritus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public interface PluginConfigVariable {
*/
void setValue(Object updatedValue);

/**
* Refresh the secret value on demand
*/
void refresh();

/**
* Returns if the variable is updatable.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ public void setValue(Object updatedValue) {
this.secretValue = updatedValue.toString();
}

@Override
public void refresh() {
}

@Override
public boolean isUpdatable() {
return true;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public void setValue(Object newValue) {
this.secretValue = newValue;
}

@Override
public void refresh() {
secretsSupplier.refresh(secretId);
}


@Override
public boolean isUpdatable() {
return isUpdatable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -81,4 +83,11 @@ void testSetValueSuccess(final String input) {
assertThat(objectUnderTest.getValue(), equalTo(input));
}


@Test
void testRefreshSecretsWithKey() {
objectUnderTest.refresh();
verify(secretsSupplier, times(1)).refresh(secretId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import java.time.Duration;
Expand Down Expand Up @@ -185,9 +186,10 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
.tableName(tableName)
.namespace(kclMetricsNamespaceName);

RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
ConsumerStrategy consumerStrategy = kinesisSourceConfig.getConsumerStrategy();
if (consumerStrategy == ConsumerStrategy.POLLING) {
configsBuilder.retrievalConfig().retrievalSpecificConfig(
retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(
new PollingConfig(kinesisClient)
.maxRecords(kinesisSourceConfig.getPollingConfig().getMaxPollingRecords())
.idleTimeBetweenReadsInMillis(
Expand All @@ -203,7 +205,7 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
retrievalConfig
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -276,6 +277,30 @@ void testCreateSchedulerWithPollingStrategy() {
verify(workerIdentifierGenerator, times(1)).generate();
}

@Test
void testCreateSchedulerWithPollingStrategyAndPollingConfig() {
when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.POLLING);
when(kinesisSourceConfig.getPollingConfig()).thenReturn(kinesisStreamPollingConfig);
KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory,
pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator);
Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer);

assertEquals(kinesisService.getApplicationName(), pipelineName);
assertNotNull(schedulerObjectUnderTest);
assertNotNull(schedulerObjectUnderTest.checkpointConfig());
assertNotNull(schedulerObjectUnderTest.leaseManagementConfig());
assertSame(schedulerObjectUnderTest.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
assertNotNull(schedulerObjectUnderTest.lifecycleConfig());
assertNotNull(schedulerObjectUnderTest.metricsConfig());
assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED);
assertNotNull(schedulerObjectUnderTest.processorConfig());
assertNotNull(schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig());
assertEquals(((PollingConfig)schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()).maxRecords(), kinesisStreamPollingConfig.getMaxPollingRecords());
assertEquals(((PollingConfig)schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()).idleTimeBetweenReadsInMillis(), kinesisStreamPollingConfig.getIdleTimeBetweenReads().toMillis());
assertNotNull(schedulerObjectUnderTest.retrievalConfig());
verify(workerIdentifierGenerator, times(1)).generate();
}

@Test
void testServiceStartNullBufferThrows() {
KinesisService kinesisService = createObjectUnderTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ void testMetricsOnSpanSet() {

@ParameterizedTest
@CsvSource({
"1, 4",
"0, 4",
"2, 6"
})
void traceGroupCacheMaxSize_provides_an_upper_bound(final long cacheMaxSize, final int expectedProcessedRecords) {
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/rds-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {

implementation 'com.zendesk:mysql-binlog-connector-java:0.29.2'
implementation 'com.mysql:mysql-connector-j:8.4.0'
implementation 'org.postgresql:postgresql:42.7.4'

compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler;
import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.source.rds.export.ExportTaskManager;
Expand All @@ -26,9 +27,13 @@
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.resync.ResyncScheduler;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManagerFactory;
import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManagerFactory;
import org.opensearch.dataprepper.plugins.source.rds.stream.ReplicationLogClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener;
import org.slf4j.Logger;
Expand All @@ -37,6 +42,7 @@
import software.amazon.awssdk.services.s3.S3Client;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -101,9 +107,16 @@ public void start(Buffer<Record<Event>> buffer) {
new ClusterApiStrategy(rdsClient) : new InstanceApiStrategy(rdsClient);
final DbMetadata dbMetadata = rdsApiStrategy.describeDb(sourceConfig.getDbIdentifier());
final String s3PathPrefix = getS3PathPrefix();

final SchemaManager schemaManager = getSchemaManager(sourceConfig, dbMetadata);
final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(schemaManager);
final DbTableMetadata dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
DbTableMetadata dbTableMetadata;
if (sourceConfig.getEngine() == EngineType.MYSQL) {
final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(
(MySqlSchemaManager) schemaManager);
dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
} else {
dbTableMetadata = new DbTableMetadata(dbMetadata, Collections.emptyMap());
}

leaderScheduler = new LeaderScheduler(
sourceCoordinator, sourceConfig, s3PathPrefix, schemaManager, dbTableMetadata);
Expand All @@ -121,21 +134,23 @@ public void start(Buffer<Record<Event>> buffer) {
}

if (sourceConfig.isStreamEnabled()) {
BinlogClientFactory binaryLogClientFactory = new BinlogClientFactory(sourceConfig, rdsClient, dbMetadata);
ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata);

if (sourceConfig.isTlsEnabled()) {
binaryLogClientFactory.setSSLMode(SSLMode.REQUIRED);
replicationLogClientFactory.setSSLMode(SSLMode.REQUIRED);
} else {
binaryLogClientFactory.setSSLMode(SSLMode.DISABLED);
replicationLogClientFactory.setSSLMode(SSLMode.DISABLED);
}

streamScheduler = new StreamScheduler(
sourceCoordinator, sourceConfig, s3PathPrefix, binaryLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
sourceCoordinator, sourceConfig, s3PathPrefix, replicationLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
runnableList.add(streamScheduler);

resyncScheduler = new ResyncScheduler(
sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(resyncScheduler);
if (sourceConfig.getEngine() == EngineType.MYSQL) {
resyncScheduler = new ResyncScheduler(
sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(resyncScheduler);
}
}

executor = Executors.newFixedThreadPool(runnableList.size());
Expand Down Expand Up @@ -164,19 +179,14 @@ public void shutdown() {
}

private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
final ConnectionManager connectionManager = new ConnectionManager(
dbMetadata.getEndpoint(),
dbMetadata.getPort(),
sourceConfig.getAuthenticationConfig().getUsername(),
sourceConfig.getAuthenticationConfig().getPassword(),
sourceConfig.isTlsEnabled());
return new SchemaManager(connectionManager);
final ConnectionManager connectionManager = new ConnectionManagerFactory(sourceConfig, dbMetadata).getConnectionManager();
return new SchemaManagerFactory(connectionManager).getSchemaManager();
}

private QueryManager getQueryManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
final String readerEndpoint = dbMetadata.getReaderEndpoint() != null ? dbMetadata.getReaderEndpoint() : dbMetadata.getEndpoint();
final int readerPort = dbMetadata.getReaderPort() == 0 ? dbMetadata.getPort() : dbMetadata.getReaderPort();
final ConnectionManager readerConnectionManager = new ConnectionManager(
final MySqlConnectionManager readerConnectionManager = new MySqlConnectionManager(
readerEndpoint,
readerPort,
sourceConfig.getAuthenticationConfig().getUsername(),
Expand All @@ -203,13 +213,11 @@ private String getS3PathPrefix() {
return s3PathPrefix;
}

private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) {
private Map<String, Map<String, String>> getColumnDataTypeMap(final MySqlSchemaManager schemaManager) {
return sourceConfig.getTableNames().stream()
.collect(Collectors.toMap(
fullTableName -> fullTableName,
fullTableName -> schemaManager.getColumnDataTypes(fullTableName.split("\\.")[0], fullTableName.split("\\.")[1])
));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

public enum EngineType {

MYSQL("mysql");
MYSQL("mysql"),
POSTGRES("postgres");

private static final Map<String, EngineType> ENGINE_TYPE_MAP = Arrays.stream(EngineType.values())
.collect(Collectors.toMap(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyRelation;

import java.util.List;

public class MySqlStreamState {

@JsonProperty("currentPosition")
private BinlogCoordinate currentPosition;

@JsonProperty("foreignKeyRelations")
private List<ForeignKeyRelation> foreignKeyRelations;

public BinlogCoordinate getCurrentPosition() {
return currentPosition;
}

public void setCurrentPosition(BinlogCoordinate currentPosition) {
this.currentPosition = currentPosition;
}

public List<ForeignKeyRelation> getForeignKeyRelations() {
return foreignKeyRelations;
}

public void setForeignKeyRelations(List<ForeignKeyRelation> foreignKeyRelations) {
this.foreignKeyRelations = foreignKeyRelations;
}
}
Loading

0 comments on commit 060f301

Please sign in to comment.