Fetch Redshift query results unloaded to S3
Co-authored-by: Mayank Vadariya <[email protected]>
2 people authored and raunaqmorarka committed Dec 31, 2024
1 parent b8785da commit b382e03
Showing 21 changed files with 1,308 additions and 17 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -740,6 +740,7 @@ jobs:
REDSHIFT_IAM_ROLES: ${{ vars.REDSHIFT_IAM_ROLES }}
REDSHIFT_VPC_SECURITY_GROUP_IDS: ${{ vars.REDSHIFT_VPC_SECURITY_GROUP_IDS }}
REDSHIFT_S3_TPCH_TABLES_ROOT: ${{ vars.REDSHIFT_S3_TPCH_TABLES_ROOT }}
REDSHIFT_S3_UNLOAD_ROOT: ${{ vars.REDSHIFT_S3_UNLOAD_ROOT }}
if: >-
contains(matrix.modules, 'trino-redshift') &&
(contains(matrix.profile, 'cloud-tests') || contains(matrix.profile, 'fte-tests')) &&
@@ -752,6 +753,7 @@ jobs:
-Dtest.redshift.jdbc.password="${REDSHIFT_PASSWORD}" \
-Dtest.redshift.jdbc.endpoint="${REDSHIFT_ENDPOINT}:${REDSHIFT_PORT}/" \
-Dtest.redshift.s3.tpch.tables.root="${REDSHIFT_S3_TPCH_TABLES_ROOT}" \
-Dtest.redshift.s3.unload.root="${REDSHIFT_S3_UNLOAD_ROOT}" \
-Dtest.redshift.iam.role="${REDSHIFT_IAM_ROLES}" \
-Dtest.redshift.aws.region="${AWS_REGION}" \
-Dtest.redshift.aws.access-key="${AWS_ACCESS_KEY_ID}" \
37 changes: 37 additions & 0 deletions docs/src/main/sphinx/connector/redshift.md
@@ -64,6 +64,43 @@ documentation](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configura
```{include} jdbc-authentication.fragment
```

### UNLOAD configuration

This feature enables using Amazon S3 to efficiently transfer data out of
Redshift, instead of the default single-threaded, JDBC-based implementation.
The connector automatically triggers the appropriate `UNLOAD` command
on Redshift to extract the query output to the configured S3 bucket in the
form of Parquet files. These Parquet files are read in parallel from S3 to
improve the latency of reading from Redshift tables. The Parquet files are
removed when Trino finishes executing the query. It is recommended to define
a custom lifecycle policy on the S3 bucket used for unloading the Redshift
query results.
This feature is supported only when the Redshift cluster and the configured
S3 bucket are in the same AWS region.
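
The statement triggered on Redshift has roughly the following shape. This is
a hypothetical sketch: the connector generates the actual statement
internally, and the query, S3 path, and role ARN below are placeholders:

```sql
-- Hypothetical illustration only; the connector generates the real statement.
UNLOAD ('SELECT nationkey, name FROM tpch.nation')
TO 's3://example-bucket/redshift-unload/<query-id>/'
IAM_ROLE 'arn:aws:iam::123456789012:role/example-redshift-role'
FORMAT AS PARQUET;
```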

The following table describes the configuration properties for using the
`UNLOAD` command in the Redshift connector. `redshift.unload-location` must
be set to use `UNLOAD`.

:::{list-table} UNLOAD configuration properties
:widths: 30, 60
:header-rows: 1

* - Property name
- Description
* - `redshift.unload-location`
  - A writeable location in Amazon S3 to be used for temporarily unloading
    Redshift query results.
* - `redshift.unload-iam-role`
  - Optional. Fully specified ARN of the IAM role attached to the Redshift
    cluster. The provided role is used in the `UNLOAD` command. The IAM role
    must have access to the Redshift cluster and write access to the S3
    bucket. The default IAM role attached to the Redshift cluster is used
    when this property is not configured.
:::

Additionally, define the appropriate [S3 configuration](/object-storage/file-system-s3)
properties, with the exception of `fs.native-s3.enabled`, which are required
to read the Parquet files from the S3 bucket.
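
As an illustrative sketch, a catalog file enabling `UNLOAD` could look like
the following; the endpoint, credentials, bucket, role ARN, and region are
all placeholder values:

```properties
connector.name=redshift
connection-url=jdbc:redshift://example-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/database
connection-user=admin
connection-password=secret
# Unload query results as Parquet to S3 instead of fetching rows over JDBC
redshift.unload-location=s3://example-bucket/redshift-unload
redshift.unload-iam-role=arn:aws:iam::123456789012:role/example-redshift-role
# Native S3 file system properties used to read the unloaded Parquet files
s3.region=us-east-1
s3.aws-access-key=AKIA...
s3.aws-secret-key=...
```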

### Multiple Redshift databases or clusters

The Redshift connector can only access a single database within
81 changes: 76 additions & 5 deletions plugin/trino-redshift/pom.xml
@@ -20,6 +20,11 @@
<version>2.1.0.30</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -30,21 +35,61 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-base-jdbc</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-s3</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-matching</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-memory-context</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parquet</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-plugin-toolkit</artifactId>
@@ -55,6 +100,16 @@
<artifactId>jakarta.validation-api</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-core</artifactId>
@@ -116,19 +171,31 @@

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
<artifactId>log-manager</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<scope>runtime</scope>
</dependency>

@@ -236,9 +303,11 @@
<exclude>**/TestRedshiftAutomaticJoinPushdown.java</exclude>
<exclude>**/TestRedshiftCastPushdown.java</exclude>
<exclude>**/TestRedshiftConnectorTest.java</exclude>
<exclude>**/TestRedshiftUnload.java</exclude>
<exclude>**/TestRedshiftConnectorSmokeTest.java</exclude>
<exclude>**/TestRedshiftTableStatisticsReader.java</exclude>
<exclude>**/TestRedshiftTypeMapping.java</exclude>
<exclude>**/TestRedshiftUnloadTypeMapping.java</exclude>
<exclude>**/Test*FailureRecoveryTest.java</exclude>
<exclude>**/Test*FailureRecoverySmokeTest.java</exclude>
</excludes>
@@ -265,6 +334,8 @@
<!-- JDBC operations performed on the ephemeral AWS Redshift cluster. -->
<include>**/TestRedshiftCastPushdown.java</include>
<include>**/TestRedshiftConnectorSmokeTest.java</include>
<include>**/TestRedshiftUnloadTypeMapping.java</include>
<include>**/TestRedshiftUnload.java</include>
</includes>
</configuration>
</plugin>
@@ -15,32 +15,44 @@

import com.amazon.redshift.Driver;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.filesystem.s3.S3FileSystemModule;
import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DecimalModule;
import io.trino.plugin.jdbc.DriverConnectionFactory;
import io.trino.plugin.jdbc.ForBaseJdbc;
import io.trino.plugin.jdbc.ForJdbcDynamicFiltering;
import io.trino.plugin.jdbc.JdbcClient;
import io.trino.plugin.jdbc.JdbcConnector;
import io.trino.plugin.jdbc.JdbcJoinPushdownSupportModule;
import io.trino.plugin.jdbc.JdbcMetadataConfig;
import io.trino.plugin.jdbc.JdbcQueryEventListener;
import io.trino.plugin.jdbc.JdbcRecordSetProvider;
import io.trino.plugin.jdbc.JdbcSplitManager;
import io.trino.plugin.jdbc.JdbcStatisticsConfig;
import io.trino.plugin.jdbc.RemoteQueryCancellationModule;
import io.trino.plugin.jdbc.credential.CredentialProvider;
import io.trino.plugin.jdbc.ptf.Query;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.function.table.ConnectorTableFunction;

import java.util.Properties;

import static com.google.inject.Scopes.SINGLETON;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.jdbc.JdbcModule.bindSessionPropertiesProvider;

public class RedshiftClientModule
extends AbstractConfigurationAwareModule
@@ -53,13 +65,28 @@ public void setup(Binder binder)
configBinder(binder).bindConfigDefaults(JdbcMetadataConfig.class, config -> config.setBulkListColumns(true));
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(SINGLETON);
configBinder(binder).bindConfig(JdbcStatisticsConfig.class);
bindSessionPropertiesProvider(binder, RedshiftSessionProperties.class);

install(new DecimalModule());
install(new JdbcJoinPushdownSupportModule());
install(new RemoteQueryCancellationModule());
binder.bind(ConnectorRecordSetProvider.class).to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON);

binder.bind(RedshiftConnector.class).in(Scopes.SINGLETON);
install(conditionalModule(
RedshiftConfig.class,
config -> config.getUnloadLocation().isPresent(),
unloadBinder -> {
install(new S3FileSystemModule());
unloadBinder.bind(JdbcSplitManager.class).in(Scopes.SINGLETON);
unloadBinder.bind(Connector.class).to(RedshiftUnloadConnector.class).in(Scopes.SINGLETON);
unloadBinder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);

newSetBinder(unloadBinder, JdbcQueryEventListener.class).addBinding().to(RedshiftUnloadJdbcQueryEventListener.class).in(Scopes.SINGLETON);

newOptionalBinder(unloadBinder, Key.get(ConnectorSplitManager.class, ForJdbcDynamicFiltering.class))
.setBinding().to(RedshiftSplitManager.class).in(SINGLETON);
},
jdbcBinder -> jdbcBinder.bind(Connector.class).to(JdbcConnector.class).in(Scopes.SINGLETON)));
}

@Singleton
@@ -17,6 +17,7 @@
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Pattern;

import java.util.Optional;

@@ -27,6 +28,8 @@
public class RedshiftConfig
{
private Integer fetchSize;
private String unloadLocation;
private String unloadIamRole;

public Optional<@Min(0) Integer> getFetchSize()
{
@@ -40,4 +43,30 @@ public RedshiftConfig setFetchSize(Integer fetchSize)
this.fetchSize = fetchSize;
return this;
}

public Optional<@Pattern(regexp = "^s3://[^/]+(/[^/]+)?$", message = "Path shouldn't end with trailing slash") String> getUnloadLocation()
{
return Optional.ofNullable(unloadLocation);
}

@Config("redshift.unload-location")
@ConfigDescription("A writeable location in Amazon S3, to be used for unloading Redshift query results")
public RedshiftConfig setUnloadLocation(String unloadLocation)
{
this.unloadLocation = unloadLocation;
return this;
}

public Optional<String> getUnloadIamRole()
{
return Optional.ofNullable(unloadIamRole);
}

@Config("redshift.unload-iam-role")
@ConfigDescription("Fully specified ARN of the IAM Role attached to the Redshift cluster and having access to S3")
public RedshiftConfig setUnloadIamRole(String unloadIamRole)
{
this.unloadIamRole = unloadIamRole;
return this;
}
}
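
Note that the `@Pattern` constraint on `getUnloadLocation()` admits a bare
bucket or a bucket with a single prefix segment, and rejects trailing
slashes. A standalone sketch (hypothetical class, not part of this commit)
showing what the regexp accepts:

```java
import java.util.regex.Pattern;

public class UnloadLocationPatternDemo
{
    public static void main(String[] args)
    {
        // Same regexp as the @Pattern constraint on getUnloadLocation()
        Pattern pattern = Pattern.compile("^s3://[^/]+(/[^/]+)?$");
        System.out.println(pattern.matcher("s3://bucket").matches());         // true
        System.out.println(pattern.matcher("s3://bucket/unload").matches());  // true
        System.out.println(pattern.matcher("s3://bucket/unload/").matches()); // false: trailing slash
        System.out.println(pattern.matcher("s3://bucket/a/b").matches());     // false: nested prefix
    }
}
```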
@@ -63,6 +63,6 @@ public Connector create(String catalogName, Map&lt;String, String&gt; requiredConfig,
.setRequiredConfigurationProperties(requiredConfig)
.initialize();

return injector.getInstance(RedshiftConnector.class);
return injector.getInstance(Connector.class);
}
}
@@ -23,6 +23,10 @@ public enum RedshiftErrorCode
implements ErrorCodeSupplier
{
REDSHIFT_INVALID_TYPE(0, EXTERNAL),
REDSHIFT_PARQUET_BAD_DATA(1, EXTERNAL),
REDSHIFT_PARQUET_CURSOR_ERROR(2, EXTERNAL),
REDSHIFT_FILESYSTEM_ERROR(3, EXTERNAL),
REDSHIFT_S3_CROSS_REGION_UNSUPPORTED(4, EXTERNAL),
/**/;

private final ErrorCode errorCode;