diff --git a/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2DataType.java b/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2DataType.java
new file mode 100644
index 0000000000..f4e7303e3b
--- /dev/null
+++ b/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2DataType.java
@@ -0,0 +1,60 @@
+/*-
+ * #%L
+ * athena-datalakegen2
+ * %%
+ * Copyright (C) 2019 - 2024 Amazon Web Services
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package com.amazonaws.athena.connectors.datalakegen2;
+
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+import java.util.Arrays;
+
+public enum DataLakeGen2DataType {
+    BIT(Types.MinorType.TINYINT.getType()),
+    TINYINT(Types.MinorType.SMALLINT.getType()),
+    NUMERIC(Types.MinorType.FLOAT8.getType()),
+    SMALLMONEY(Types.MinorType.FLOAT8.getType()),
+    DATE(Types.MinorType.DATEDAY.getType()),
+    DATETIME(Types.MinorType.DATEMILLI.getType()),
+    DATETIME2(Types.MinorType.DATEMILLI.getType()),
+    SMALLDATETIME(Types.MinorType.DATEMILLI.getType()),
+    DATETIMEOFFSET(Types.MinorType.DATEMILLI.getType());
+
+    private ArrowType arrowType;
+
+    DataLakeGen2DataType(ArrowType arrowType)
+    {
+        this.arrowType = arrowType;
+    }
+
+    public static ArrowType fromType(String gen2Type)
+    {
+        DataLakeGen2DataType result = DataLakeGen2DataType.valueOf(gen2Type.toUpperCase());
+        return result.arrowType;
+    }
+
+    public static boolean isSupported(String dataType)
+    {
+        return Arrays.stream(values()).anyMatch(value -> value.name().equalsIgnoreCase(dataType));
+    }
+
+    public ArrowType getArrowType()
+    {
+        return this.arrowType;
+    }
+}
diff --git a/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2MetadataHandler.java b/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2MetadataHandler.java
index 73f3fb06ef..53fd9386fe 100644
--- a/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2MetadataHandler.java
+++ b/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2MetadataHandler.java
@@ -63,6 +63,7 @@
 import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -138,6 +139,7 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
                 TopNPushdownSubType.SUPPORTS_ORDER_BY
         ));
 
+        jdbcQueryPassthrough.addQueryPassthroughCapabilityIfEnabled(capabilities, configOptions);
         return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
     }
 
@@ -177,7 +179,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl
     public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest)
     {
LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); - + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } // Always create single split Set splits = new HashSet<>(); splits.add(Split.newBuilder(makeSpillLocation(getSplitsRequest), makeEncryptionKey()) @@ -197,6 +202,18 @@ public GetTableResponse doGetTable(final BlockAllocator blockAllocator, final Ge } } + @Override + protected ArrowType convertDatasourceTypeToArrow(int columnIndex, int precision, Map configOptions, ResultSetMetaData metadata) throws SQLException + { + String dataType = metadata.getColumnTypeName(columnIndex); + LOGGER.info("In convertDatasourceTypeToArrow: converting {}", dataType); + if (dataType != null && DataLakeGen2DataType.isSupported(dataType)) { + LOGGER.debug("Data lake Gen2 Datatype is support: {}", dataType); + return DataLakeGen2DataType.fromType(dataType); + } + return super.convertDatasourceTypeToArrow(columnIndex, precision, configOptions, metadata); + } + /** * Appropriate datatype to arrow type conversions will be done by fetching data types of columns * @param jdbcConnection @@ -247,47 +264,14 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema LOGGER.debug("columnName: " + columnName); LOGGER.debug("dataType: " + dataType); - /** - * Converting date data type into DATEDAY since framework is unable to do it by default - */ - if ("date".equalsIgnoreCase(dataType)) { - columnType = Types.MinorType.DATEDAY.getType(); - } - /** - * Converting bit data type into TINYINT because BIT type is showing 0 as false and 1 as true. - * we can avoid it by changing to TINYINT. - */ - if ("bit".equalsIgnoreCase(dataType)) { - columnType = Types.MinorType.TINYINT.getType(); - } - /** - * Converting tinyint data type into SMALLINT. - * TINYINT range is 0 to 255 in SQL Server, usage of TINYINT(ArrowType) leads to data loss as its using 1 bit as signed flag. - */ - if ("tinyint".equalsIgnoreCase(dataType)) { - columnType = Types.MinorType.SMALLINT.getType(); - } - /** - * Converting numeric, smallmoney data types into FLOAT8 to avoid data loss - * (ex: 123.45 is shown as 123 (loosing its scale)) - */ - if ("numeric".equalsIgnoreCase(dataType) || "smallmoney".equalsIgnoreCase(dataType)) { - columnType = Types.MinorType.FLOAT8.getType(); - } - /** - * Converting time data type(s) into DATEMILLI since framework is unable to map it by default - */ - if ("datetime".equalsIgnoreCase(dataType) || "datetime2".equalsIgnoreCase(dataType) - || "smalldatetime".equalsIgnoreCase(dataType) || "datetimeoffset".equalsIgnoreCase(dataType)) { - columnType = Types.MinorType.DATEMILLI.getType(); + if (dataType != null && DataLakeGen2DataType.isSupported(dataType)) { + columnType = DataLakeGen2DataType.fromType(dataType); } + /** * converting into VARCHAR for non supported data types. 
             */
-            if (columnType == null) {
-                columnType = Types.MinorType.VARCHAR.getType();
-            }
-            if (columnType != null && !SupportedTypes.isSupported(columnType)) {
+            if ((columnType == null) || !SupportedTypes.isSupported(columnType)) {
                 columnType = Types.MinorType.VARCHAR.getType();
             }
 
diff --git a/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2RecordHandler.java b/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2RecordHandler.java
index da9da4bb98..16b3e5b584 100644
--- a/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2RecordHandler.java
+++ b/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2RecordHandler.java
@@ -67,7 +67,14 @@ public DataLakeGen2RecordHandler(DatabaseConnectionConfig databaseConnectionConf
     @Override
     public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException
     {
-        PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
+        PreparedStatement preparedStatement;
+
+        if (constraints.isQueryPassThrough()) {
+            preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints);
+        }
+        else {
+            preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
+        }
         // Disable fetching all rows.
         preparedStatement.setFetchSize(FETCH_SIZE);
         return preparedStatement;
diff --git a/athena-datalakegen2/src/test/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2DataTypeTest.java b/athena-datalakegen2/src/test/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2DataTypeTest.java
new file mode 100644
index 0000000000..af0b103d8b
--- /dev/null
+++ b/athena-datalakegen2/src/test/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2DataTypeTest.java
@@ -0,0 +1,56 @@
+/*-
+ * #%L
+ * athena-datalakegen2
+ * %%
+ * Copyright (C) 2019 - 2024 Amazon Web Services
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package com.amazonaws.athena.connectors.datalakegen2;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class DataLakeGen2DataTypeTest {
+
+    @Test
+    void isSupported() {
+        assertTrue(DataLakeGen2DataType.isSupported("bit"));
+        assertTrue(DataLakeGen2DataType.isSupported("TINYINT"));
+        assertTrue(DataLakeGen2DataType.isSupported("NUMERIC"));
+        assertTrue(DataLakeGen2DataType.isSupported("SMALLMONEY"));
+        assertTrue(DataLakeGen2DataType.isSupported("DATE"));
+        assertTrue(DataLakeGen2DataType.isSupported("DATETIME"));
+        assertTrue(DataLakeGen2DataType.isSupported("DATETIME2"));
+        assertTrue(DataLakeGen2DataType.isSupported("SMALLDATETIME"));
+        assertTrue(DataLakeGen2DataType.isSupported("DATETIMEOFFSET"));
+
+        assertFalse(DataLakeGen2DataType.isSupported("unknownType"));
+    }
+
+    @Test
+    void fromType()
+    {
+        assertEquals(DataLakeGen2DataType.BIT.getArrowType(), DataLakeGen2DataType.fromType("BIT"));
+        assertEquals(DataLakeGen2DataType.TINYINT.getArrowType(), DataLakeGen2DataType.fromType("TINYINT"));
+        assertEquals(DataLakeGen2DataType.NUMERIC.getArrowType(), DataLakeGen2DataType.fromType("NUMERIC"));
+        assertEquals(DataLakeGen2DataType.SMALLMONEY.getArrowType(), DataLakeGen2DataType.fromType("SMALLMONEY"));
+        assertEquals(DataLakeGen2DataType.DATE.getArrowType(), DataLakeGen2DataType.fromType("DATE"));
+        assertEquals(DataLakeGen2DataType.DATETIME.getArrowType(), DataLakeGen2DataType.fromType("DATETIME"));
+        assertEquals(DataLakeGen2DataType.DATETIME2.getArrowType(), DataLakeGen2DataType.fromType("DATETIME2"));
+        assertEquals(DataLakeGen2DataType.SMALLDATETIME.getArrowType(), DataLakeGen2DataType.fromType("SMALLDATETIME"));
+        assertEquals(DataLakeGen2DataType.DATETIMEOFFSET.getArrowType(), DataLakeGen2DataType.fromType("DATETIMEOFFSET"));
+    }
+}
diff --git a/athena-jdbc/src/main/java/com/amazonaws/athena/connectors/jdbc/manager/JdbcMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/athena/connectors/jdbc/manager/JdbcMetadataHandler.java
index a542182190..bb4154e6dd 100644
--- a/athena-jdbc/src/main/java/com/amazonaws/athena/connectors/jdbc/manager/JdbcMetadataHandler.java
+++ b/athena-jdbc/src/main/java/com/amazonaws/athena/connectors/jdbc/manager/JdbcMetadataHandler.java
@@ -275,13 +275,7 @@ public GetTableResponse doGetQueryPassthroughSchema(final BlockAllocator blockAl
                 columnName = columnName.equals(columnLabel) ?
                         columnName : columnLabel;
                 int precision = metadata.getPrecision(columnIndex);
-                int scale = metadata.getScale(columnIndex);
-
-                ArrowType columnType = JdbcArrowTypeConverter.toArrowType(
-                        metadata.getColumnType(columnIndex),
-                        precision,
-                        scale,
-                        configOptions);
+                ArrowType columnType = convertDatasourceTypeToArrow(columnIndex, precision, configOptions, metadata);
 
                 if (columnType != null && SupportedTypes.isSupported(columnType)) {
                     if (columnType instanceof ArrowType.List) {
@@ -307,6 +301,28 @@
         }
     }
 
+    /**
+     * A method that takes in a JDBC type and converts it to an Arrow type.
+     * This can be overridden by other metadata handlers extending JDBC.
+     *
+     * @param columnIndex
+     * @param precision
+     * @param configOptions
+     * @param metadata
+     * @return Arrow Type
+     */
+    protected ArrowType convertDatasourceTypeToArrow(int columnIndex, int precision, Map<String, String> configOptions, ResultSetMetaData metadata) throws SQLException
+    {
+        int scale = metadata.getScale(columnIndex);
+        int columnType = metadata.getColumnType(columnIndex);
+
+        return JdbcArrowTypeConverter.toArrowType(
+                columnType,
+                precision,
+                scale,
+                configOptions);
+    }
+
     /**
      * While being a no-op by default, this function will be overriden by subclasses that support this search.
      *
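
Usage sketch (not part of the patch above): the new DataLakeGen2DataType enum is the single lookup that both getSchema and the convertDatasourceTypeToArrow override consult before falling back to the generic JDBC conversion. The snippet below illustrates that lookup; the class name TypeMappingSketch and the hard-coded column type name are hypothetical, and the isSupported() guard matters because fromType() delegates to Enum.valueOf(), which throws IllegalArgumentException for names not in the enum.

    import org.apache.arrow.vector.types.pojo.ArrowType;
    import com.amazonaws.athena.connectors.datalakegen2.DataLakeGen2DataType;

    public class TypeMappingSketch
    {
        public static void main(String[] args)
        {
            // Value as it would come back from ResultSetMetaData.getColumnTypeName()
            String dataType = "datetimeoffset";

            // Guard first: fromType() uses Enum.valueOf(), which throws for unknown names
            if (DataLakeGen2DataType.isSupported(dataType)) {
                ArrowType arrowType = DataLakeGen2DataType.fromType(dataType);
                System.out.println(dataType + " -> " + arrowType); // datetimeoffset maps to the DATEMILLI Arrow type
            }
            else {
                System.out.println(dataType + " is not overridden; the generic JdbcArrowTypeConverter / VARCHAR fallback applies");
            }
        }
    }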