
Commit 705e113

Add option to set "database_prefix" when the import mode is "from_s3".
With this approach it is possible to import between multiple accounts and add a database prefix.
1 parent 92d6bcf commit 705e113
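
With this change, the from-s3 mode accepts the --database-prefix option, so metastore entities exported from one account can be imported into another account's Data Catalog under prefixed database names. A hypothetical set of job arguments (bucket paths and prefix are illustrative, not part of the commit) might look like:

    -m from-s3 -D s3://bucket/hive/databases -T s3://bucket/hive/tables -P s3://bucket/hive/partitions --database-prefix accounta_ -R us-east-1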

File tree

1 file changed

+53
-15
lines changed

utilities/Hive_metastore_migration/src/import_into_datacatalog.py

Lines changed: 53 additions & 15 deletions
@@ -1,14 +1,34 @@
 # Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
-# SPDX-License-Identifier: MIT-0
+# Licensed under the Amazon Software License (the "License"). You may not use
+# this file except in compliance with the License. A copy of the License is
+# located at
+#
+#  http://aws.amazon.com/asl/
+#
+# and in the "LICENSE" file accompanying this file. This file is distributed
+# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express
+# or implied. See the License for the specific language governing
+# permissions and limitations under the License.
+
 
 from __future__ import print_function
 
+import logging
+import os
+
+from pyspark.sql.functions import lit, struct, array, col, concat
+
 from awsglue.context import GlueContext
 from awsglue.dynamicframe import DynamicFrame
 
 from hive_metastore_migration import *
 
 
+logging.basicConfig()
+logger = logging.getLogger(__name__)
+logger.setLevel(getattr(logging, os.getenv('LOG_LEVEL', 'INFO')))
+
+
 def transform_df_to_catalog_import_schema(sql_context, glue_context, df_databases, df_tables, df_partitions):
     df_databases_array = df_databases.select(df_databases['type'], array(df_databases['item']).alias('items'))
     df_tables_array = df_tables.select(df_tables['type'], df_tables['database'],
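
The module-level logger introduced above takes its level from a LOG_LEVEL environment variable, defaulting to INFO. A standalone sketch of the same pattern; note that getattr raises AttributeError if LOG_LEVEL is set to a name the logging module does not define (e.g. 'verbose'):

```python
import logging
import os

logging.basicConfig()
logger = logging.getLogger(__name__)
# 'DEBUG', 'INFO', 'WARNING', ... are attributes of the logging module,
# so getattr maps the env string to the numeric level constant.
logger.setLevel(getattr(logging, os.getenv('LOG_LEVEL', 'INFO')))
logger.info('effective level: %s', logging.getLevelName(logger.level))
```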
@@ -40,8 +60,8 @@ def import_datacatalog(sql_context, glue_context, datacatalog_name, databases, t
         connection_options={'catalog.name': datacatalog_name, 'catalog.region': region})
 
 
-def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix
-                             , region):
+def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix,
+                             region):
     # extract
     hive_metastore = HiveMetastore(connection, sql_context)
     hive_metastore.extract_metastore()
@@ -50,17 +70,25 @@ def metastore_full_migration(sc, sql_context, glue_context, connection, datacata
     (databases, tables, partitions) = HiveMetastoreTransformer(
         sc, sql_context, db_prefix, table_prefix).transform(hive_metastore)
 
-    #load
+    # load
     import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region)
 
 
-def metastore_import_from_s3(sql_context, glue_context,db_input_dir, tbl_input_dir, parts_input_dir,
-                             datacatalog_name, region):
+def metastore_import_from_s3(sql_context, glue_context, db_input_dir, tbl_input_dir, parts_input_dir, db_prefix, datacatalog_name,
+                             region):
 
     # extract
     databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA)
     tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA)
     partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA)
+
+    # Apply the database prefix to database, table, and partition entities
+    if db_prefix:
+        databases = databases.withColumn('item', struct(col('item.description'), col('item.locationUri'), concat(lit(db_prefix), col('item.name')).alias('name'), col('item.parameters')))
+        tables = tables.withColumn('database', concat(lit(db_prefix), col('database')))
+        partitions = partitions.withColumn('database', concat(lit(db_prefix), col('database')))
+        partitions = partitions.withColumn('item', struct(col('item.creationTime'), col('item.lastAccessTime'), concat(lit(db_prefix), col('item.namespaceName')).alias('namespaceName'), col('item.parameters'), col('item.storageDescriptor'), col('item.values')))
+
 
     # load
     import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region)
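
The new block rewrites the flat database column in place, but must rebuild the nested item struct field by field with the prefixed name aliased back in, since struct fields cannot be updated individually this way (Spark 3.1 later added Column.withField for that). A minimal, self-contained sketch of the same concat(lit(...), col(...)) pattern, using an illustrative schema and a local SparkSession, none of which is part of the commit:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, struct

spark = SparkSession.builder.master('local[1]').appName('prefix-sketch').getOrCreate()

db_prefix = 'accounta_'  # illustrative prefix

# A toy stand-in for the databases DataFrame: one nested 'item' struct.
dbs = spark.createDataFrame(
    [(('a sales db', 's3://bucket/sales', 'sales'),)],
    'item struct<description:string,locationUri:string,name:string>')

# Rebuild the struct, copying every field and rewriting only 'name',
# mirroring the databases.withColumn('item', struct(...)) line above.
dbs = dbs.withColumn('item', struct(
    col('item.description'),
    col('item.locationUri'),
    concat(lit(db_prefix), col('item.name')).alias('name')))

dbs.select('item.name').show()  # prints 'accounta_sales'
```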
@@ -71,21 +99,29 @@ def main():
     from_s3 = 'from-s3'
     from_jdbc = 'from-jdbc'
     parser = argparse.ArgumentParser(prog=sys.argv[0])
-    parser.add_argument('-m', '--mode', required=True, choices=[from_s3, from_jdbc], help='Choose to migrate metastore either from JDBC or from S3')
-    parser.add_argument('-c', '--connection-name', required=False, help='Glue Connection name for Hive metastore JDBC connection')
-    parser.add_argument('-R', '--region', required=False, help='AWS region of target Glue DataCatalog, default to "us-east-1"')
-    parser.add_argument('-d', '--database-prefix', required=False, help='Optional prefix for database names in Glue DataCatalog')
-    parser.add_argument('-t', '--table-prefix', required=False, help='Optional prefix for table names in Glue DataCatalog')
-    parser.add_argument('-D', '--database-input-path', required=False, help='An S3 path containing json files of metastore database entities')
-    parser.add_argument('-T', '--table-input-path', required=False, help='An S3 path containing json files of metastore table entities')
-    parser.add_argument('-P', '--partition-input-path', required=False, help='An S3 path containing json files of metastore partition entities')
+    parser.add_argument('-m', '--mode', required=True, choices=[from_s3, from_jdbc],
+                        help='Choose to migrate metastore either from JDBC or from S3')
+    parser.add_argument('-c', '--connection-name', required=False,
+                        help='Glue Connection name for Hive metastore JDBC connection')
+    parser.add_argument('-R', '--region', required=False,
+                        help='AWS region of target Glue DataCatalog, default to "us-east-1"')
+    parser.add_argument('-d', '--database-prefix', required=False,
+                        help='Optional prefix for database names in Glue DataCatalog')
+    parser.add_argument('-t', '--table-prefix', required=False,
+                        help='Optional prefix for table names in Glue DataCatalog')
+    parser.add_argument('-D', '--database-input-path', required=False,
+                        help='An S3 path containing json files of metastore database entities')
+    parser.add_argument('-T', '--table-input-path', required=False,
+                        help='An S3 path containing json files of metastore table entities')
+    parser.add_argument('-P', '--partition-input-path', required=False,
+                        help='An S3 path containing json files of metastore partition entities')
 
     options = get_options(parser, sys.argv)
     if options['mode'] == from_s3:
         validate_options_in_mode(
             options=options, mode=from_s3,
             required_options=['database_input_path', 'table_input_path', 'partition_input_path'],
-            not_allowed_options=['database_prefix', 'table_prefix']
+            not_allowed_options=['table_prefix']
         )
     elif options['mode'] == from_jdbc:
         validate_options_in_mode(
@@ -110,6 +146,7 @@ def main():
             db_input_dir=options['database_input_path'],
             tbl_input_dir=options['table_input_path'],
             parts_input_dir=options['partition_input_path'],
+            db_prefix=options.get('database_prefix') or '',
             datacatalog_name='datacatalog',
             region=options.get('region') or 'us-east-1'
         )
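
At the call site, a missing prefix is normalized to the empty string, so the if db_prefix: guard in metastore_import_from_s3 cleanly skips the rewrite. A tiny illustration of the idiom (dict contents hypothetical):

```python
# argparse stores omitted optional flags as None.
options = {'database_prefix': None}
db_prefix = options.get('database_prefix') or ''
assert db_prefix == ''  # falsy, so 'if db_prefix:' is skipped

options = {'database_prefix': 'accounta_'}
assert (options.get('database_prefix') or '') == 'accounta_'
```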
@@ -126,5 +163,6 @@ def main():
             region=options.get('region') or 'us-east-1'
         )
 
+
 if __name__ == '__main__':
     main()
