Feature: Enable/Disable kerberos authentication for cloudera manager hbase instance (#1793)
VenkatasivareddyTR authored Mar 21, 2024
1 parent 3a5e9e4 commit 18da9a6
Showing 7 changed files with 563 additions and 0 deletions.
25 changes: 25 additions & 0 deletions athena-hbase/athena-hbase.yaml
@@ -52,6 +52,22 @@ Parameters:
Description: "(Optional) An IAM policy ARN to use as the PermissionsBoundary for the created Lambda function's execution role"
Default: ''
Type: String
KerberosAuthEnabled:
Description: '(Optional) Set to "true" to enable Kerberos authentication for the HBase instance'
Default: "false"
Type: String
KerberosConfigFilesS3Reference:
Description: '(Optional) S3 location (s3://bucket/prefix) where the Kerberos config files (hbase.keytab, krb5.conf) are uploaded. Used only when Kerberos authentication is enabled'
Default: ""
Type: String
PrincipalName:
Description: 'Principal name for Kerberos authentication'
Default: ""
Type: String
HbaseRpcProtection:
Description: 'HBase RPC protection value (hbase.rpc.protection) used when Kerberos authentication is enabled'
Default: ""
Type: String
Conditions:
HasPermissionsBoundary: !Not [ !Equals [ !Ref PermissionsBoundaryARN, "" ] ]
Resources:
@@ -64,6 +80,10 @@ Resources:
spill_bucket: !Ref SpillBucket
spill_prefix: !Ref SpillPrefix
default_hbase: !Ref HBaseConnectionString
kerberos_auth_enabled: !Ref KerberosAuthEnabled
kerberos_config_files_s3_reference: !Ref KerberosConfigFilesS3Reference
principal_name: !Ref PrincipalName
hbase_rpc_protection: !Ref HbaseRpcProtection
FunctionName: !Ref AthenaCatalogName
Handler: "com.amazonaws.athena.connectors.hbase.HbaseCompositeHandler"
CodeUri: "./target/athena-hbase-2022.47.1.jar"
@@ -91,6 +111,11 @@ Resources:
- glue:GetDatabase
- athena:GetQueryExecution
- s3:ListAllMyBuckets
- s3:ListBucket
- s3:GetObject
- s3:GetBucketLocation
- s3:GetObjectVersion
- s3:GetLifecycleConfiguration
Effect: Allow
Resource: '*'
Version: '2012-10-17'
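
The four new template parameters reach the connector as plain Lambda environment variables (see the Variables block above: kerberos_auth_enabled, kerberos_config_files_s3_reference, principal_name, hbase_rpc_protection). A minimal sketch, not connector code, of how that contract looks from the Java side, assuming hypothetical values are set; anything other than a case-insensitive "true" leaves Kerberos authentication disabled:

import java.util.Map;

public class KerberosEnvSketch
{
    public static void main(String[] args)
    {
        Map<String, String> env = System.getenv();

        // Kerberos is only enabled when the flag is exactly "true" (case-insensitive).
        boolean kerberosAuthEnabled = "true".equalsIgnoreCase(env.get("kerberos_auth_enabled"));

        // The remaining settings are only consulted when the flag is enabled.
        String s3Reference = env.getOrDefault("kerberos_config_files_s3_reference", "");
        String principal = env.getOrDefault("principal_name", "");
        String rpcProtection = env.getOrDefault("hbase_rpc_protection", "");

        System.out.printf("kerberosAuthEnabled=%s s3Reference=%s principal=%s rpcProtection=%s%n",
                kerberosAuthEnabled, s3Reference, principal, rpcProtection);
    }
}
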
@@ -20,18 +20,28 @@
package com.amazonaws.athena.connectors.hbase;

import org.apache.arrow.util.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.HBASE_RPC_PROTECTION;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.KERBEROS_AUTH_ENABLED;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.KERBEROS_CONFIG_FILES_S3_REFERENCE;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.PRINCIPAL_NAME;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.copyConfigFilesFromS3ToTempFolder;

/**
* Creates and Caches HBase Connection Instances, using the connection string as the cache key.
*
@@ -113,6 +123,40 @@ private Connection createConnection(String host, String masterPort, String zooke
logger.info("createConnection: applying client config {}:{}", nextConfig.getKey(), nextConfig.getValue());
config.set(nextConfig.getKey(), nextConfig.getValue());
}

Map<String, String> configOptions = System.getenv();
boolean kerberosAuthEnabled = configOptions.get(KERBEROS_AUTH_ENABLED) != null && "true".equalsIgnoreCase(configOptions.get(KERBEROS_AUTH_ENABLED));
logger.info("Kerberos Authentication Enabled: " + kerberosAuthEnabled);
if (kerberosAuthEnabled) {
String keytabLocation = null;
config.set("hbase.rpc.protection", configOptions.get(HBASE_RPC_PROTECTION));
logger.info("hbase.rpc.protection: " + config.get("hbase.rpc.protection"));
String s3uri = configOptions.get(KERBEROS_CONFIG_FILES_S3_REFERENCE);
if (StringUtils.isNotBlank(s3uri)) {
try {
Path tempDir = copyConfigFilesFromS3ToTempFolder(configOptions);
logger.debug("tempDir: " + tempDir);
keytabLocation = tempDir + File.separator + "hbase.keytab";
System.setProperty("java.security.krb5.conf", tempDir + File.separator + "krb5.conf");
logger.debug("krb5.conf location: " + tempDir + File.separator + "krb5.conf");
}
catch (Exception e) {
throw new RuntimeException("Error Copying Config files from S3 to temp folder: ", e);
}
}
logger.debug("keytabLocation: " + keytabLocation);

UserGroupInformation.setConfiguration(config);
try {
String principalName = configOptions.get(PRINCIPAL_NAME);
UserGroupInformation.loginUserFromKeytab(principalName, keytabLocation);
}
catch (IOException ex) {
throw new RuntimeException("Exception in UserGroupInformation.loginUserFromKeytab: ", ex);
}
logger.debug("UserGroupInformation.loginUserFromKeytab Success.");
}

Connection conn = ConnectionFactory.createConnection(config);
logger.info("createConnection: hbase.zookeeper.quorum:" + config.get("hbase.zookeeper.quorum"));
logger.info("createConnection: hbase.zookeeper.property.clientPort:" + config.get("hbase.zookeeper.property.clientPort"));
@@ -0,0 +1,124 @@
/*-
* #%L
* athena-hbase
* %%
* Copyright (C) 2019 - 2024 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.athena.connectors.hbase;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class HbaseKerberosUtils
{
private static final Logger logger = LoggerFactory.getLogger(HbaseKerberosUtils.class);

/**
* For Kerberos authentication, the user needs to provide the S3 bucket reference where the config
* files are uploaded.
*/
public static final String KERBEROS_CONFIG_FILES_S3_REFERENCE = "kerberos_config_files_s3_reference";
/**
* The temp folder under which the Kerberos config files from S3 will be downloaded.
*/
public static final String TEMP_DIR = "/tmp";
public static final String PRINCIPAL_NAME = "principal_name";
public static final String HBASE_RPC_PROTECTION = "hbase_rpc_protection";
public static final String KERBEROS_AUTH_ENABLED = "kerberos_auth_enabled";

private HbaseKerberosUtils() {}

/**
* Downloads the Kerberos config files from the configured S3 location to the temp directory.
*
* @param configOptions the connector config options (Lambda environment variables)
* @return {@link Path} the temp directory the files were copied into
* @throws Exception if the S3 reference is missing or the objects cannot be listed or copied
*/
public static Path copyConfigFilesFromS3ToTempFolder(java.util.Map<String, String> configOptions) throws Exception
{
logger.debug("Creating the connection with AWS S3 for copying config files to Temp Folder");
Path tempDir = getTempDirPath();
AWSCredentials credentials = new DefaultAWSCredentialsProviderChain().getCredentials();
AmazonS3 s3Client = AmazonS3ClientBuilder.standard().
withCredentials(new AWSStaticCredentialsProvider(credentials)).
build();

String s3uri = getRequiredConfig(KERBEROS_CONFIG_FILES_S3_REFERENCE, configOptions);
String[] s3Bucket = s3uri.split("s3://")[1].split("/");

ObjectListing objectListing = s3Client.listObjects(s3Bucket[0], s3Bucket[1]);

for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
S3Object object = s3Client.getObject(new GetObjectRequest(s3Bucket[0], objectSummary.getKey()));
InputStream inputStream = new BufferedInputStream(object.getObjectContent());
String key = objectSummary.getKey();
String fName = key.substring(key.indexOf('/') + 1);
if (!fName.isEmpty()) {
File file = new File(tempDir + File.separator + fName);
Files.copy(inputStream, file.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
}
return tempDir;
}

/**
* Creates the temp directory to put the kerberos auth config files.
*
* @return {@link Path} Temp directory path
*/
private static Path getTempDirPath()
{
Path tmpPath = Paths.get(TEMP_DIR).toAbsolutePath();
File filePath = new File(tmpPath + File.separator + "hbasekerberosconfigs");
if (filePath.exists()) {
return filePath.toPath();
}
boolean isCreated = filePath.mkdirs();
logger.info("Is new directory created? " + isCreated);
return filePath.toPath();
}

/**
* Gets a required config value (Lambda environment variable), failing fast if it is not set.
*
* @param key - the config key
* @return {@link String} the config value
* @throws IllegalArgumentException if the key has not been populated
*/
private static String getRequiredConfig(String key, java.util.Map<String, String> configOptions)
{
String value = configOptions.getOrDefault(key, "");
if (value.isEmpty()) {
throw new IllegalArgumentException("Lambda Environment Variable " + key + " has not been populated! ");
}
return value;
}
}
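
A usage sketch of the utility above, with a hypothetical bucket, principal, and environment-style config map. The connection factories expect the referenced S3 prefix to contain at least hbase.keytab and krb5.conf, which end up under /tmp/hbasekerberosconfigs:

import com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils;

import java.io.File;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

public class HbaseKerberosUtilsExample
{
    public static void main(String[] args) throws Exception
    {
        // Hypothetical values; in the connector these come from the Lambda environment.
        Map<String, String> configOptions = new HashMap<>();
        configOptions.put(HbaseKerberosUtils.KERBEROS_AUTH_ENABLED, "true");
        configOptions.put(HbaseKerberosUtils.KERBEROS_CONFIG_FILES_S3_REFERENCE, "s3://example-bucket/hbase-kerberos/");
        configOptions.put(HbaseKerberosUtils.PRINCIPAL_NAME, "hbase-user@EXAMPLE.COM");
        configOptions.put(HbaseKerberosUtils.HBASE_RPC_PROTECTION, "privacy");

        // Copies every object under the prefix into the temp directory and returns that path.
        Path tempDir = HbaseKerberosUtils.copyConfigFilesFromS3ToTempFolder(configOptions);

        String keytabLocation = tempDir + File.separator + "hbase.keytab";
        String krb5Location = tempDir + File.separator + "krb5.conf";
        System.out.println("keytab: " + keytabLocation + ", krb5.conf: " + krb5Location);
    }
}
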
@@ -20,16 +20,26 @@
package com.amazonaws.athena.connectors.hbase.connection;

import org.apache.arrow.util.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.HBASE_RPC_PROTECTION;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.KERBEROS_AUTH_ENABLED;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.KERBEROS_CONFIG_FILES_S3_REFERENCE;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.PRINCIPAL_NAME;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.copyConfigFilesFromS3ToTempFolder;

/**
* Creates and Caches HBase Connection Instances, using the connection string as the cache key.
*
@@ -116,6 +126,37 @@ private HBaseConnection createConnection(String host, String masterPort, String
config.set(nextConfig.getKey(), nextConfig.getValue());
}

Map<String, String> configOptions = System.getenv();
boolean kerberosAuthEnabled = configOptions.get(KERBEROS_AUTH_ENABLED) != null && "true".equalsIgnoreCase(configOptions.get(KERBEROS_AUTH_ENABLED));
logger.info("Kerberos Authentication Enabled: " + kerberosAuthEnabled);
if (kerberosAuthEnabled) {
String keytabLocation = null;
config.set("hbase.rpc.protection", configOptions.get(HBASE_RPC_PROTECTION));
logger.info("hbase.rpc.protection: " + config.get("hbase.rpc.protection"));
String s3uri = configOptions.get(KERBEROS_CONFIG_FILES_S3_REFERENCE);
if (StringUtils.isNotBlank(s3uri)) {
try {
Path tempDir = copyConfigFilesFromS3ToTempFolder(configOptions);
logger.debug("tempDir: " + tempDir);
keytabLocation = tempDir + File.separator + "hbase.keytab";
System.setProperty("java.security.krb5.conf", tempDir + File.separator + "krb5.conf");
logger.debug("keytabLocation: " + keytabLocation);
}
catch (Exception e) {
throw new RuntimeException("Error Copying Config files from S3 to temp folder: ", e);
}
}

UserGroupInformation.setConfiguration(config);
try {
String principalName = configOptions.get(PRINCIPAL_NAME);
UserGroupInformation.loginUserFromKeytab(principalName, keytabLocation);
}
catch (IOException ex) {
throw new RuntimeException("Exception in UserGroupInformation.loginUserFromKeytab: ", ex);
}
logger.debug("UserGroupInformation.loginUserFromKeytab Success.");
}
return new HBaseConnection(config, maxRetries);
}

@@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.hbase.integ;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -30,12 +31,21 @@
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;

import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.HBASE_RPC_PROTECTION;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.KERBEROS_AUTH_ENABLED;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.KERBEROS_CONFIG_FILES_S3_REFERENCE;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.PRINCIPAL_NAME;
import static com.amazonaws.athena.connectors.hbase.HbaseKerberosUtils.copyConfigFilesFromS3ToTempFolder;

/**
* This class can be used to establish a connection to a HBase instance. Once the connection is established, a new
* database/namespace and table can be created, and new rows can be inserted into the newly created table.
@@ -88,6 +98,39 @@ private Configuration getHbaseConfiguration(String connectionStr)
configuration.set("hbase.client.pause", "500");
configuration.set("zookeeper.recovery.retry", "2");

java.util.Map<String, String> configOptions = System.getenv();
boolean kerberosAuthEnabled = configOptions.get(KERBEROS_AUTH_ENABLED) != null && "true".equalsIgnoreCase(configOptions.get(KERBEROS_AUTH_ENABLED));
logger.info("Kerberos Authentication Enabled: " + kerberosAuthEnabled);
if (kerberosAuthEnabled) {
String keytabLocation = null;
configuration.set("hbase.rpc.protection", configOptions.get(HBASE_RPC_PROTECTION));
logger.info("hbase.rpc.protection: " + configuration.get("hbase.rpc.protection"));
String s3uri = configOptions.get(KERBEROS_CONFIG_FILES_S3_REFERENCE);
if (StringUtils.isNotBlank(s3uri)) {
try {
Path tempDir = copyConfigFilesFromS3ToTempFolder(configOptions);
logger.debug("tempDir: " + tempDir);
keytabLocation = tempDir + File.separator + "hbase.keytab";
System.setProperty("java.security.krb5.conf", tempDir + File.separator + "krb5.conf");
logger.debug("krb5.conf location: " + tempDir + File.separator + "krb5.conf");
}
catch (Exception e) {
throw new RuntimeException("Error Copying Config files from S3 to temp folder: ", e);
}
}
logger.debug("keytabLocation: " + keytabLocation);

UserGroupInformation.setConfiguration(configuration);
try {
String principalName = configOptions.get(PRINCIPAL_NAME);
UserGroupInformation.loginUserFromKeytab(principalName, keytabLocation);
}
catch (IOException ex) {
throw new RuntimeException("Exception in UserGroupInformation.loginUserFromKeytab: ", ex);
}
logger.debug("UserGroupInformation.loginUserFromKeytab Success.");
}

return configuration;
}
