From 944e3a99a4c5702f6c859e4997eaa10e125642b1 Mon Sep 17 00:00:00 2001 From: AbdulRehman Date: Mon, 25 Mar 2024 09:41:10 -0700 Subject: [PATCH] Extended QPT to athena-db2 (#1825) Co-authored-by: AbdulRehman Faraj --- .../athena/connectors/db2/Db2MetadataHandler.java | 6 ++++++ .../athena/connectors/db2/Db2RecordHandler.java | 9 ++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2MetadataHandler.java b/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2MetadataHandler.java index 62f6783e76..d5dec08242 100644 --- a/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2MetadataHandler.java +++ b/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2MetadataHandler.java @@ -272,6 +272,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } int partitionContd = decodeContinuationToken(getSplitsRequest); Block partitions = getSplitsRequest.getPartitions(); @@ -338,6 +342,8 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca capabilities.put(DataSourceOptimizations.SUPPORTS_LIMIT_PUSHDOWN.withSupportedSubTypes( LimitPushdownSubType.INTEGER_CONSTANT )); + + jdbcQueryPassthrough.addQueryPassthroughCapabilityIfEnabled(capabilities, configOptions); return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } diff --git a/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2RecordHandler.java b/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2RecordHandler.java index 13f7efd868..442d19fee3 100644 --- a/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2RecordHandler.java +++ b/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2RecordHandler.java @@ -88,7 +88,14 @@ public Db2RecordHandler(DatabaseConnectionConfig databaseConnectionConfig, java. @Override public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } // Disable fetching all rows. preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement;