From dec75b6942d6bd138527a60962dc5054046fffe2 Mon Sep 17 00:00:00 2001 From: AbdulRehman Date: Thu, 28 Mar 2024 14:39:27 -0700 Subject: [PATCH] Extended QPT to athena-sqlserver (#1823) Co-authored-by: AbdulRehman Faraj --- .../sqlserver/SqlServerDataType.java | 58 ++++++++++++++++++ .../sqlserver/SqlServerMetadataHandler.java | 59 +++++++------------ .../sqlserver/SqlServerRecordHandler.java | 11 +++- 3 files changed, 88 insertions(+), 40 deletions(-) create mode 100644 athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerDataType.java diff --git a/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerDataType.java b/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerDataType.java new file mode 100644 index 0000000000..6f290834f7 --- /dev/null +++ b/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerDataType.java @@ -0,0 +1,58 @@ +/*- + * #%L + * athena-sqlserver + * %% + * Copyright (C) 2019 - 2024 Amazon Web Services + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +package com.amazonaws.athena.connectors.sqlserver; + +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType; + +public enum SqlServerDataType { /* SQL Server type names (matched ignoring case) whose Arrow type this connector overrides. */ + BIT(Types.MinorType.TINYINT.getType()), /* TINYINT so values read as 0/1 instead of false/true */ + TINYINT(Types.MinorType.SMALLINT.getType()), /* SQL Server TINYINT spans 0..255; Arrow TINYINT is signed, so widen to SMALLINT */ + NUMERIC(Types.MinorType.FLOAT8.getType()), /* FLOAT8 keeps the scale (123.45 would otherwise show as 123) */ + SMALLMONEY(Types.MinorType.FLOAT8.getType()), /* FLOAT8 for the same reason as NUMERIC */ + DATE(Types.MinorType.DATEDAY.getType()), /* not mapped by the framework by default */ + DATETIME(Types.MinorType.DATEMILLI.getType()), /* this and the date-time types below are not mapped by the framework by default */ + DATETIME2(Types.MinorType.DATEMILLI.getType()), + SMALLDATETIME(Types.MinorType.DATEMILLI.getType()), + DATETIMEOFFSET(Types.MinorType.DATEMILLI.getType()); + + private final ArrowType arrowType; + + SqlServerDataType(ArrowType arrowType) + { + this.arrowType = arrowType; + } + + public static ArrowType fromType(String sqlServerType) + { /* Arrow type for a supported name; valueOf throws IllegalArgumentException for any other name. */ + SqlServerDataType result = valueOf(sqlServerType.toUpperCase(java.util.Locale.ROOT)); /* Locale.ROOT: the default-locale form breaks names containing 'i' under e.g. a Turkish locale, while isSupported still accepts them */ + return result.arrowType; + } + + public static boolean isSupported(String dataType) + { /* True when dataType equals a constant name ignoring case; null yields false. */ + for (SqlServerDataType sqlServerDataType : values()) { + if (sqlServerDataType.name().equalsIgnoreCase(dataType)) { + return true; + } + } + return false; + } +} diff --git a/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerMetadataHandler.java b/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerMetadataHandler.java index 95914b56e8..9ffd2dd16e 100644 --- a/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerMetadataHandler.java +++ b/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerMetadataHandler.java @@ -71,6 +71,7 @@ import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -203,6 +204,7 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca TopNPushdownSubType.SUPPORTS_ORDER_BY )); + 
jdbcQueryPassthrough.addQueryPassthroughCapabilityIfEnabled(capabilities, configOptions); return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } @@ -300,6 +302,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } int partitionContd = decodeContinuationToken(getSplitsRequest); LOGGER.info("partitionContd: {}", partitionContd); @@ -388,6 +394,18 @@ public GetTableResponse doGetTable(final BlockAllocator blockAllocator, final Ge } } + @Override + protected ArrowType convertDatasourceTypeToArrow(int columnIndex, int precision, Map configOptions, ResultSetMetaData metadata) throws SQLException + { /* SQL Server-specific type names go through SqlServerDataType; every other type is left to the parent implementation. */ + String dataType = metadata.getColumnTypeName(columnIndex); + LOGGER.debug("In convertDatasourceTypeToArrow: converting {}", dataType); + if (dataType != null && SqlServerDataType.isSupported(dataType)) { + LOGGER.debug("Sql Server Datatype is supported: {}", dataType); + return SqlServerDataType.fromType(dataType); + } + return super.convertDatasourceTypeToArrow(columnIndex, precision, configOptions, metadata); + } + /** * Appropriate datatype to arrow type conversions will be done by fetching data types of columns * @param jdbcConnection @@ -436,48 +454,13 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema LOGGER.debug("columnName: " + columnName); LOGGER.debug("dataType: " + dataType); - /** - * Converting date data type into DATEDAY since framework is unable to do it by default - */ - if ("date".equalsIgnoreCase(dataType)) { - columnType = 
Types.MinorType.DATEDAY.getType(); - } - /** - * Converting bit data type into TINYINT because BIT type is showing 0 as false and 1 as true. - * we can avoid it by changing to TINYINT. - */ - if ("bit".equalsIgnoreCase(dataType)) { - columnType = Types.MinorType.TINYINT.getType(); - } - /** - * Converting tinyint data type into SMALLINT. - * TINYINT range is 0 to 255 in SQL Server, usage of TINYINT(ArrowType) leads to data loss - * as its using 1 bit as signed flag. - */ - if ("tinyint".equalsIgnoreCase(dataType)) { - columnType = Types.MinorType.SMALLINT.getType(); - } - /** - * Converting numeric, smallmoney data types into FLOAT8 to avoid data loss - * (ex: 123.45 is shown as 123 (loosing its scale)) - */ - if ("numeric".equalsIgnoreCase(dataType) || "smallmoney".equalsIgnoreCase(dataType)) { - columnType = Types.MinorType.FLOAT8.getType(); - } - /** - * Converting time data type(s) into DATEMILLI since framework is unable to map it by default - */ - if ("datetime".equalsIgnoreCase(dataType) || "datetime2".equalsIgnoreCase(dataType) - || "smalldatetime".equalsIgnoreCase(dataType) || "datetimeoffset".equalsIgnoreCase(dataType)) { - columnType = Types.MinorType.DATEMILLI.getType(); + if (dataType != null && SqlServerDataType.isSupported(dataType)) { + columnType = SqlServerDataType.fromType(dataType); } /** * converting into VARCHAR for non supported data types. 
*/ - if (columnType == null) { - columnType = Types.MinorType.VARCHAR.getType(); - } - if (columnType != null && !SupportedTypes.isSupported(columnType)) { + if ((columnType == null) || !SupportedTypes.isSupported(columnType)) { columnType = Types.MinorType.VARCHAR.getType(); } diff --git a/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerRecordHandler.java b/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerRecordHandler.java index 985969821b..e1b64e79f5 100644 --- a/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerRecordHandler.java +++ b/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerRecordHandler.java @@ -79,8 +79,15 @@ public SqlServerRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), - schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), + schema, constraints, split); + } // Disable fetching all rows. preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement;