Skip to content

Commit

Permalink
Extended QPT To Datalakegen2 (#1822)
Browse files Browse the repository at this point in the history
Co-authored-by: AbdulRehman Faraj <[email protected]>
Co-authored-by: Jithendar Trianz <[email protected]>
  • Loading branch information
3 people authored Mar 25, 2024
1 parent 944e3a9 commit 9f3ad72
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*-
* #%L
* athena-datalakegen2
* %%
* Copyright (C) 2019 - 2024 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.athena.connectors.datalakegen2;

import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;

import java.util.Arrays;
import java.util.Locale;

/**
 * Maps Azure Data Lake Gen2 column type names (SQL Server dialect) to the
 * Arrow types the Athena JDBC framework should use for them. Types absent
 * from this enum fall back to the framework's default conversion.
 */
public enum DataLakeGen2DataType {
    // BIT renders as 0/1 instead of false/true when widened to TINYINT.
    BIT(Types.MinorType.TINYINT.getType()),
    // SQL Server TINYINT is unsigned (0-255); Arrow TINYINT is signed, so widen to SMALLINT to avoid data loss.
    TINYINT(Types.MinorType.SMALLINT.getType()),
    // NUMERIC and SMALLMONEY map to FLOAT8 so the fractional part is not dropped (e.g. 123.45 -> 123).
    NUMERIC(Types.MinorType.FLOAT8.getType()),
    SMALLMONEY(Types.MinorType.FLOAT8.getType()),
    // The framework does not map DATE to DATEDAY by default.
    DATE(Types.MinorType.DATEDAY.getType()),
    // All date/time variants carry millisecond precision in Arrow.
    DATETIME(Types.MinorType.DATEMILLI.getType()),
    DATETIME2(Types.MinorType.DATEMILLI.getType()),
    SMALLDATETIME(Types.MinorType.DATEMILLI.getType()),
    DATETIMEOFFSET(Types.MinorType.DATEMILLI.getType());

    private final ArrowType arrowType;

    DataLakeGen2DataType(ArrowType arrowType)
    {
        this.arrowType = arrowType;
    }

    /**
     * Resolves a Gen2 type name (case-insensitive) to its Arrow type.
     *
     * @param gen2Type Gen2 column type name; callers should guard with {@link #isSupported(String)}
     * @return the Arrow type registered for the given Gen2 type
     * @throws IllegalArgumentException if the type name is not one of the enum constants
     */
    public static ArrowType fromType(String gen2Type)
    {
        // Locale.ROOT avoids locale-sensitive case mapping (e.g. the Turkish dotless-i,
        // where "datetime".toUpperCase() yields "DATETİME" and valueOf would fail).
        DataLakeGen2DataType result = DataLakeGen2DataType.valueOf(gen2Type.toUpperCase(Locale.ROOT));
        return result.arrowType;
    }

    /**
     * @param dataType Gen2 column type name, matched case-insensitively
     * @return true if this enum defines a custom Arrow mapping for the type
     */
    public static boolean isSupported(String dataType)
    {
        return Arrays.stream(values()).anyMatch(value -> value.name().equalsIgnoreCase(dataType));
    }

    public ArrowType getArrowType()
    {
        return this.arrowType;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -138,6 +139,7 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
TopNPushdownSubType.SUPPORTS_ORDER_BY
));

jdbcQueryPassthrough.addQueryPassthroughCapabilityIfEnabled(capabilities, configOptions);
return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}

Expand Down Expand Up @@ -177,7 +179,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl
public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest)
{
LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName());

if (getSplitsRequest.getConstraints().isQueryPassThrough()) {
LOGGER.info("QPT Split Requested");
return setupQueryPassthroughSplit(getSplitsRequest);
}
// Always create single split
Set<Split> splits = new HashSet<>();
splits.add(Split.newBuilder(makeSpillLocation(getSplitsRequest), makeEncryptionKey())
Expand All @@ -197,6 +202,18 @@ public GetTableResponse doGetTable(final BlockAllocator blockAllocator, final Ge
}
}

/**
 * Converts a result-set column's database-reported type name to an Arrow type,
 * preferring the Gen2-specific mappings (bit, tinyint, numeric, smallmoney and
 * the date/time variants) before deferring to the generic JDBC conversion.
 *
 * @param columnIndex 1-based index of the column in {@code metadata}
 * @param precision the column's precision, as reported by the driver
 * @param configOptions connector configuration options passed to the default converter
 * @param metadata result-set metadata describing the column
 * @return the Arrow type for the column
 * @throws SQLException if the driver fails to describe the column
 */
@Override
protected ArrowType convertDatasourceTypeToArrow(int columnIndex, int precision, Map<String, String> configOptions, ResultSetMetaData metadata) throws SQLException
{
    String dataType = metadata.getColumnTypeName(columnIndex);
    LOGGER.info("In convertDatasourceTypeToArrow: converting {}", dataType);
    if (dataType != null && DataLakeGen2DataType.isSupported(dataType)) {
        // Fixed log message grammar ("is support" -> "is supported").
        LOGGER.debug("Data lake Gen2 Datatype is supported: {}", dataType);
        return DataLakeGen2DataType.fromType(dataType);
    }
    // No Gen2-specific mapping: use the base JDBC -> Arrow conversion.
    return super.convertDatasourceTypeToArrow(columnIndex, precision, configOptions, metadata);
}

/**
* Appropriate datatype to arrow type conversions will be done by fetching data types of columns
* @param jdbcConnection
Expand Down Expand Up @@ -247,47 +264,14 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema
LOGGER.debug("columnName: " + columnName);
LOGGER.debug("dataType: " + dataType);

/**
* Converting date data type into DATEDAY since framework is unable to do it by default
*/
if ("date".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.DATEDAY.getType();
}
/**
* Converting bit data type into TINYINT because BIT type is showing 0 as false and 1 as true.
* we can avoid it by changing to TINYINT.
*/
if ("bit".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.TINYINT.getType();
}
/**
* Converting tinyint data type into SMALLINT.
* TINYINT range is 0 to 255 in SQL Server, usage of TINYINT(ArrowType) leads to data loss as its using 1 bit as signed flag.
*/
if ("tinyint".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.SMALLINT.getType();
}
/**
* Converting numeric, smallmoney data types into FLOAT8 to avoid data loss
* (ex: 123.45 is shown as 123 (loosing its scale))
*/
if ("numeric".equalsIgnoreCase(dataType) || "smallmoney".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.FLOAT8.getType();
}
/**
* Converting time data type(s) into DATEMILLI since framework is unable to map it by default
*/
if ("datetime".equalsIgnoreCase(dataType) || "datetime2".equalsIgnoreCase(dataType)
|| "smalldatetime".equalsIgnoreCase(dataType) || "datetimeoffset".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.DATEMILLI.getType();
if (dataType != null && DataLakeGen2DataType.isSupported(dataType)) {
columnType = DataLakeGen2DataType.fromType(dataType);
}

/**
* converting into VARCHAR for non supported data types.
*/
if (columnType == null) {
columnType = Types.MinorType.VARCHAR.getType();
}
if (columnType != null && !SupportedTypes.isSupported(columnType)) {
if ((columnType == null) || !SupportedTypes.isSupported(columnType)) {
columnType = Types.MinorType.VARCHAR.getType();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,14 @@ public DataLakeGen2RecordHandler(DatabaseConnectionConfig databaseConnectionConf
/**
 * Builds the JDBC statement for one split: when the request carries a query
 * passthrough (QPT), the user-supplied query is prepared verbatim; otherwise
 * a split query is generated from the table name, schema and constraints.
 */
@Override
public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException
{
    PreparedStatement preparedStatement;

    if (constraints.isQueryPassThrough()) {
        preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints);
    }
    else {
        // catalogName is intentionally passed as null here — the builder scopes by schema/table only.
        preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
    }
    // Disable fetching all rows.
    preparedStatement.setFetchSize(FETCH_SIZE);
    return preparedStatement;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*-
* #%L
* athena-datalakegen2
* %%
* Copyright (C) 2019 - 2024 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.athena.connectors.datalakegen2;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Unit tests for {@link DataLakeGen2DataType}: verifies which type names are
 * recognized and that each enum constant round-trips through fromType.
 */
class DataLakeGen2DataTypeTest {

    @Test
    void isSupported() {
        // Matching is case-insensitive, so mixed casing is exercised deliberately.
        String[] recognized = {
                "bit", "TINYINT", "NUMERIC", "SMALLMONEY", "DATE",
                "DATETIME", "DATETIME2", "SMALLDATETIME", "DATETIMEOFFSET"
        };
        for (String typeName : recognized) {
            assertTrue(DataLakeGen2DataType.isSupported(typeName));
        }

        assertFalse(DataLakeGen2DataType.isSupported("unknownType"));
    }

    @Test
    void fromType()
    {
        // Every constant must resolve back to its own Arrow type by name.
        for (DataLakeGen2DataType expected : DataLakeGen2DataType.values()) {
            assertEquals(expected.getArrowType(), DataLakeGen2DataType.fromType(expected.name()));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,7 @@ public GetTableResponse doGetQueryPassthroughSchema(final BlockAllocator blockAl
columnName = columnName.equals(columnLabel) ? columnName : columnLabel;

int precision = metadata.getPrecision(columnIndex);
int scale = metadata.getScale(columnIndex);

ArrowType columnType = JdbcArrowTypeConverter.toArrowType(
metadata.getColumnType(columnIndex),
precision,
scale,
configOptions);
ArrowType columnType = convertDatasourceTypeToArrow(columnIndex, precision, configOptions, metadata);

if (columnType != null && SupportedTypes.isSupported(columnType)) {
if (columnType instanceof ArrowType.List) {
Expand All @@ -307,6 +301,28 @@ public GetTableResponse doGetQueryPassthroughSchema(final BlockAllocator blockAl
}
}

/**
 * Converts the JDBC type of a result-set column to the corresponding Arrow type.
 * This can be overridden by other Metadata Handlers extending JDBC to apply
 * datasource-specific mappings; the default delegates to JdbcArrowTypeConverter.
 *
 * @param columnIndex 1-based index of the column in {@code metadata} (JDBC convention)
 * @param precision the column's precision, as reported by the driver
 * @param configOptions connector configuration options forwarded to the converter
 * @param metadata result-set metadata describing the column
 * @return the Arrow type for the column; may be null when no mapping exists (callers null-check the result)
 * @throws SQLException if the driver fails to describe the column
 */
protected ArrowType convertDatasourceTypeToArrow(int columnIndex, int precision, Map<String, String> configOptions, ResultSetMetaData metadata) throws SQLException
{
    int scale = metadata.getScale(columnIndex);
    int columnType = metadata.getColumnType(columnIndex);

    return JdbcArrowTypeConverter.toArrowType(
        columnType,
        precision,
        scale,
        configOptions);
}

/**
* While being a no-op by default, this function will be overriden by subclasses that support this search.
*
Expand Down

0 comments on commit 9f3ad72

Please sign in to comment.