Skip to content

Commit

Permalink
Extend QPT to Postgresql & Redshift (#1820)
Browse files Browse the repository at this point in the history
Co-authored-by: AbdulRehman Faraj <[email protected]>
  • Loading branch information
AbdulR3hman and AbdulRehman Faraj authored Mar 21, 2024
1 parent 0d7c6b7 commit 7ac0853
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import java.util.Map;
import java.util.Set;

import static com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler.TABLES_AND_VIEWS;
import static com.amazonaws.athena.connectors.postgresql.PostGreSqlConstants.POSTGRESQL_DEFAULT_PORT;
import static com.amazonaws.athena.connectors.postgresql.PostGreSqlConstants.POSTGRESQL_DRIVER_CLASS;
import static com.amazonaws.athena.connectors.postgresql.PostGreSqlConstants.POSTGRES_NAME;
Expand Down Expand Up @@ -142,6 +141,7 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
.toArray(String[]::new))
));

jdbcQueryPassthrough.addQueryPassthroughCapabilityIfEnabled(capabilities, configOptions);
return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}

Expand Down Expand Up @@ -198,6 +198,11 @@ public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutReq
public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest)
{
LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName());
if (getSplitsRequest.getConstraints().isQueryPassThrough()) {
LOGGER.info("QPT Split Requested");
return setupQueryPassthroughSplit(getSplitsRequest);
}

int partitionContd = decodeContinuationToken(getSplitsRequest);
Set<Split> splits = new HashSet<>();
Block partitions = getSplitsRequest.getPartitions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,13 @@ protected PostGreSqlRecordHandler(DatabaseConnectionConfig databaseConnectionCon
public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split)
throws SQLException
{
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);

PreparedStatement preparedStatement;
if (constraints.isQueryPassThrough()) {
preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints);
}
else {
preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
}
// Disable fetching all rows.
preparedStatement.setFetchSize(FETCH_SIZE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.Arrays;
import java.util.List;

import static com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler.TABLES_AND_VIEWS;
import static com.amazonaws.athena.connectors.redshift.RedshiftConstants.REDSHIFT_DEFAULT_PORT;
import static com.amazonaws.athena.connectors.redshift.RedshiftConstants.REDSHIFT_DRIVER_CLASS;
import static com.amazonaws.athena.connectors.redshift.RedshiftConstants.REDSHIFT_NAME;
Expand Down Expand Up @@ -125,6 +124,7 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
TopNPushdownSubType.SUPPORTS_ORDER_BY
));

jdbcQueryPassthrough.addQueryPassthroughCapabilityIfEnabled(capabilities, configOptions);
return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}

Expand Down

0 comments on commit 7ac0853

Please sign in to comment.