Skip to content

Commit

Permalink
Extended QPT to athena-docdb (#1796)
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdulR3hman authored Mar 13, 2024
1 parent 834ce65 commit a69a525
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation;
import com.amazonaws.athena.connector.lambda.handlers.GlueMetadataHandler;
import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest;
Expand All @@ -37,13 +39,16 @@
import com.amazonaws.athena.connector.lambda.metadata.ListTablesResponse;
import com.amazonaws.athena.connector.lambda.metadata.MetadataRequest;
import com.amazonaws.athena.connector.lambda.metadata.glue.GlueFieldLexer;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType;
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
import com.amazonaws.athena.connectors.docdb.qpt.DocDBQueryPassthrough;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.model.Database;
import com.amazonaws.services.glue.model.Table;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
Expand Down Expand Up @@ -98,6 +103,7 @@ public class DocDBMetadataHandler

private final AWSGlue glue;
private final DocDBConnectionFactory connectionFactory;
private final DocDBQueryPassthrough queryPassthrough = new DocDBQueryPassthrough();

public DocDBMetadataHandler(java.util.Map<String, String> configOptions)
{
Expand Down Expand Up @@ -143,6 +149,15 @@ private String getConnStr(MetadataRequest request)
return conStr;
}

/**
 * Reports the optimizations this connector supports so the Athena engine can
 * take advantage of them. Currently this advertises query pass-through, but
 * only when it has been enabled via the Lambda's configuration options.
 *
 * @param allocator Block allocator used for response resources.
 * @param request The capabilities request, carrying the catalog name.
 * @return A response mapping capability names to their supported sub-types.
 */
@Override
public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAllocator allocator, GetDataSourceCapabilitiesRequest request)
{
    ImmutableMap.Builder<String, List<OptimizationSubType>> capabilityBuilder = ImmutableMap.builder();
    // Only adds the QPT capability if the corresponding config option enables it.
    queryPassthrough.addQueryPassthroughCapabilityIfEnabled(capabilityBuilder, configOptions);
    return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilityBuilder.build());
}

/**
* List databases in your DocumentDB instance treating each as a 'schema' (aka database)
*
Expand Down Expand Up @@ -241,7 +256,6 @@ public ListTablesResponse doListTables(BlockAllocator blockAllocator, ListTables
private Stream<String> doListTablesWithCommand(MongoClient client, ListTablesRequest request)
{
logger.debug("doListTablesWithCommand Start");
List<String> tables = new ArrayList<>();
Document queryDocument = new Document("listCollections", 1).append("nameOnly", true).append("authorizedCollections", true);
Document document = client.getDatabase(request.getSchemaName()).runCommand(queryDocument);

Expand All @@ -262,8 +276,19 @@ public GetTableResponse doGetTable(BlockAllocator blockAllocator, GetTableReques
throws Exception
{
logger.info("doGetTable: enter", request.getTableName());
String schemaNameInput = request.getTableName().getSchemaName();
String tableNameInput = request.getTableName().getTableName();
String schemaNameInput;
String tableNameInput;

if (request.isQueryPassthrough()) {
queryPassthrough.verify(request.getQueryPassthroughArguments());
schemaNameInput = request.getQueryPassthroughArguments().get(DocDBQueryPassthrough.DATABASE);
tableNameInput = request.getQueryPassthroughArguments().get(DocDBQueryPassthrough.COLLECTION);
}
else {
schemaNameInput = request.getTableName().getSchemaName();
tableNameInput = request.getTableName().getTableName();
}

TableName tableName = new TableName(schemaNameInput, tableNameInput);
Schema schema = null;
try {
Expand Down Expand Up @@ -292,6 +317,12 @@ public GetTableResponse doGetTable(BlockAllocator blockAllocator, GetTableReques
return new GetTableResponse(request.getCatalogName(), tableName, schema);
}

/**
 * Resolves the schema for a query pass-through request by delegating to
 * {@link #doGetTable}, which already branches on {@code request.isQueryPassthrough()}
 * to read the database/collection from the pass-through arguments.
 *
 * @param allocator Block allocator used for response resources.
 * @param request The table request carrying the query pass-through arguments.
 * @return The inferred table schema for the pass-through target collection.
 */
@Override
public GetTableResponse doGetQueryPassthroughSchema(BlockAllocator allocator, GetTableRequest request) throws Exception
{
    return doGetTable(allocator, request);
}

/**
* Our table doesn't support complex layouts or partitioning so we simply make this method a NoOp.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.amazonaws.athena.connector.lambda.domain.predicate.ValueSet;
import com.amazonaws.athena.connector.lambda.handlers.RecordHandler;
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
import com.amazonaws.athena.connectors.docdb.qpt.DocDBQueryPassthrough;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.athena.AmazonAthenaClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
Expand Down Expand Up @@ -75,6 +76,8 @@ public class DocDBRecordHandler

private final DocDBConnectionFactory connectionFactory;

private final DocDBQueryPassthrough queryPassthrough = new DocDBQueryPassthrough();

public DocDBRecordHandler(java.util.Map<String, String> configOptions)
{
this(
Expand Down Expand Up @@ -138,14 +141,25 @@ protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recor
SOURCE_TABLE_PROPERTY, tableNameObj.getTableName());

logger.info("Resolved tableName to: {}", tableName);

Map<String, ValueSet> constraintSummary = recordsRequest.getConstraints().getSummary();

MongoClient client = getOrCreateConn(recordsRequest.getSplit());
MongoDatabase db = client.getDatabase(schemaName);
MongoCollection<Document> table = db.getCollection(tableName);

Document query = QueryUtils.makeQuery(recordsRequest.getSchema(), constraintSummary);
MongoDatabase db;
MongoCollection<Document> table;
Document query;

if (recordsRequest.getConstraints().isQueryPassThrough()) {
Map<String, String> qptArguments = recordsRequest.getConstraints().getQueryPassthroughArguments();
queryPassthrough.verify(qptArguments);
db = client.getDatabase(qptArguments.get(DocDBQueryPassthrough.DATABASE));
table = db.getCollection(qptArguments.get(DocDBQueryPassthrough.COLLECTION));
query = QueryUtils.parseFilter(qptArguments.get(DocDBQueryPassthrough.FILTER));
}
else {
db = client.getDatabase(schemaName);
table = db.getCollection(tableName);
query = QueryUtils.makeQuery(recordsRequest.getSchema(), constraintSummary);
}

String disableProjectionAndCasingEnvValue = configOptions.getOrDefault(DISABLE_PROJECTION_AND_CASING_ENV, "false").toLowerCase();
boolean disableProjectionAndCasing = disableProjectionAndCasingEnvValue.equals("true");
Expand All @@ -157,7 +171,6 @@ protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recor
// Once AWS DocumentDB supports collation, then projections do not have to be disabled anymore because case
// insensitive indexes allows for case insensitive projections.
Document projection = disableProjectionAndCasing ? null : QueryUtils.makeProjection(recordsRequest.getSchema());

logger.info("readWithConstraint: query[{}] projection[{}]", query, projection);

final MongoCursor<Document> iterable = table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Text;
import org.bson.Document;
import org.bson.json.JsonParseException;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -214,6 +215,21 @@ else if (singleValues.size() > 1) {
.collect(toList()));
}

/**
 * Parses a DocDB/MongoDB JSON filter/projection string to confirm it is valid
 * and converts it into a BSON {@link Document}.
 *
 * @param filter JSON-based filter expression (e.g. from query pass-through arguments)
 * @return the parsed {@link Document}
 * @throws IllegalArgumentException if the filter is null, empty, or not valid JSON
 */
public static Document parseFilter(String filter)
{
    // Fail fast with a clear message rather than letting Document.parse throw
    // an opaque NullPointerException / parse error on missing input.
    if (filter == null || filter.trim().isEmpty()) {
        throw new IllegalArgumentException("Filter argument must be a non-empty json document");
    }
    try {
        return Document.parse(filter);
    }
    catch (JsonParseException e) {
        // Preserve the original parse failure as the cause for debugging.
        throw new IllegalArgumentException("Can't parse 'filter' argument as json", e);
    }
}

private static Document documentOf(String key, Object value)
{
return new Document(key, value);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*-
* #%L
* athena-docdb
* %%
* Copyright (C) 2019 - 2024 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.athena.connectors.docdb.qpt;

import com.amazonaws.athena.connector.lambda.metadata.optimizations.querypassthrough.QueryPassthroughSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

/**
 * Query pass-through signature for the DocumentDB connector. Exposes a single
 * pass-through function, {@code system.query(DATABASE, COLLECTION, FILTER)},
 * that lets callers run a raw MongoDB-style filter against a specific
 * database/collection.
 */
public final class DocDBQueryPassthrough implements QueryPassthroughSignature
{
    // Function namespace and name, i.e. system.query(...)
    private static final String SCHEMA_NAME = "system";
    private static final String NAME = "query";

    // Named arguments accepted by the pass-through function.
    public static final String DATABASE = "DATABASE";
    public static final String COLLECTION = "COLLECTION";
    public static final String FILTER = "FILTER";

    // The argument list never changes, so share one immutable instance instead
    // of allocating a new list on every getFunctionArguments() call.
    private static final List<String> ARGUMENTS = List.of(DATABASE, COLLECTION, FILTER);

    private static final Logger LOGGER = LoggerFactory.getLogger(DocDBQueryPassthrough.class);

    @Override
    public String getFunctionSchema()
    {
        return SCHEMA_NAME;
    }

    @Override
    public String getFunctionName()
    {
        return NAME;
    }

    @Override
    public List<String> getFunctionArguments()
    {
        return ARGUMENTS;
    }

    @Override
    public Logger getLogger()
    {
        return LOGGER;
    }
}

0 comments on commit a69a525

Please sign in to comment.