diff --git a/cluster/src/main/scala/io/snappydata/gemxd/ClusterCallbacksImpl.scala b/cluster/src/main/scala/io/snappydata/gemxd/ClusterCallbacksImpl.scala index 68341d5be4..6fa3e006ab 100644 --- a/cluster/src/main/scala/io/snappydata/gemxd/ClusterCallbacksImpl.scala +++ b/cluster/src/main/scala/io/snappydata/gemxd/ClusterCallbacksImpl.scala @@ -17,15 +17,14 @@ package io.snappydata.gemxd import java.io.{File, InputStream} +import java.util.{Iterator => JIterator} import java.{lang, util} import java.util.{List, Iterator => JIterator} -import scala.collection.mutable.ArrayBuffer -import scala.util.Try import scala.collection.mutable.ArrayBuffer import scala.util.Try import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl +import com.gemstone.gemfire.internal.cache.{ExternalTableMetaData, GemFireCacheImpl} import com.gemstone.gemfire.internal.shared.Version import com.gemstone.gemfire.internal.{ByteArrayDataInput, ClassPathLoader, GemFireVersion} import com.pivotal.gemfirexd.Attribute @@ -38,6 +37,8 @@ import io.snappydata.cluster.ExecutorInitiator import io.snappydata.impl.LeadImpl import io.snappydata.recovery.RecoveryService import io.snappydata.remote.interpreter.SnappyInterpreterExecute +import io.snappydata.sql.catalog.CatalogObjectType +import io.snappydata.util.ServiceUtils import io.snappydata.{ServiceManager, SnappyEmbeddedTableStatsProviderService} import org.apache.spark.Logging import org.apache.spark.scheduler.cluster.SnappyClusterManager @@ -45,6 +46,9 @@ import org.apache.spark.serializer.{KryoSerializerPool, StructTypeSerializer} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.collection.ToolsCallbackInit import org.apache.spark.sql.{Row, SaveMode} +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.execution.columnar.ExternalStoreUtils +import org.apache.spark.sql.{SaveMode, SnappyContext} /** * Callbacks that are sent by GemXD to Snappy for cluster management @@ -53,6 +57,8 @@ object ClusterCallbacksImpl extends ClusterCallbacks with Logging { CallbackFactoryProvider.setClusterCallbacks(this) + private val PASSWORD_MATCH = "(?i)(password|passwd|secret).*".r + private[snappydata] def initialize(): Unit = { // nothing to be done; singleton constructor does all } @@ -265,6 +271,37 @@ object ClusterCallbacksImpl extends ClusterCallbacks with Logging { } } + override def getHiveTablesMetadata(): + util.Collection[ExternalTableMetaData] = { + val catalogTables = SnappyContext.getHiveCatalogTables() + import scala.collection.JavaConverters._ + getTablesMetadata(catalogTables).asJava + } + + private def getTablesMetadata(catalogTables: Seq[CatalogTable]): Seq[ExternalTableMetaData] = { + catalogTables.map(catalogTableToMetadata) + } + + private def catalogTableToMetadata(table: CatalogTable) = { + val tableType = CatalogObjectType.getTableType(table) + val tblDataSourcePath = table.storage.locationUri match { + case None => "" + case Some(l) => ServiceUtils.maskLocationURI(l) + } + + val metaData = new ExternalTableMetaData(table.identifier.table, + table.database, tableType.toString, null, -1, + -1, null, null, null, null, + tblDataSourcePath, "", false) + metaData.provider = table.provider match { + case None => "" + case Some(p) => p + } + metaData.shortProvider = metaData.provider + metaData.columns = ExternalStoreUtils.getColumnMetadata(table.schema) + metaData + } + override def getInterpreterExecution(sql: String, v: Version, connId: lang.Long): InterpreterExecute = new SnappyInterpreterExecute(sql, connId) diff --git a/core/src/main/scala/io/snappydata/sql/catalog/SnappyExternalCatalog.scala b/core/src/main/scala/io/snappydata/sql/catalog/SnappyExternalCatalog.scala index 2de0b15d2a..bd7d30bcee 100644 --- a/core/src/main/scala/io/snappydata/sql/catalog/SnappyExternalCatalog.scala +++ b/core/src/main/scala/io/snappydata/sql/catalog/SnappyExternalCatalog.scala @@ -279,7 +279,6 @@ object SnappyExternalCatalog { val INDEXED_TABLE_LOWER: String = Utils.toLowerCase("INDEXED_TABLE") val EMPTY_SCHEMA: StructType = StructType(Nil) - private[sql] val PASSWORD_MATCH = "(?i)(password|passwd|secret).*".r val currentFunctionIdentifier = new ThreadLocal[FunctionIdentifier] diff --git a/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala b/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala index c079d84b60..073727dd22 100644 --- a/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala +++ b/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala @@ -41,6 +41,7 @@ import io.snappydata.Constant.{SPARK_STORE_PREFIX, STORE_PROPERTY_PREFIX} import io.snappydata.sql.catalog.SnappyExternalCatalog.checkSchemaPermission import io.snappydata.sql.catalog.{CatalogObjectType, ConnectorExternalCatalog, SnappyExternalCatalog} import io.snappydata.thrift._ +import io.snappydata.util.ServiceUtils import org.apache.log4j.{Level, LogManager} import org.apache.spark.sql.catalyst.TableIdentifier @@ -511,33 +512,17 @@ class StoreHiveCatalog extends ExternalCatalog with Logging { } } - private def maskPassword(s: String): String = { - SnappyExternalCatalog.PASSWORD_MATCH.replaceAllIn(s, "xxx") - } - - // Mask access key and secret access key in case of S3 URI - private def maskLocationURI(locURI: String): String = { - val uri = toLowerCase(locURI) - val maskedSrcPath = if ((uri.startsWith("s3a://") || - uri.startsWith("s3://") || - uri.startsWith("s3n://")) && uri.contains("@")) { - locURI.replace(locURI.slice(locURI.indexOf("//") + 2, - locURI.indexOf("@")), "****:****") - } else maskPassword(locURI) - maskedSrcPath - } - // latest change is here - mask it here - include s3 masking here too private def getDataSourcePath(properties: scala.collection.Map[String, String], storage: CatalogStorageFormat): String = { properties.get("path") match { - case Some(p) if !p.isEmpty => maskLocationURI(p) + case Some(p) if !p.isEmpty => ServiceUtils.maskLocationURI(p) case _ => properties.get("region.path") match { // for external GemFire connector - case Some(p) if !p.isEmpty => maskLocationURI(p) + case Some(p) if !p.isEmpty => ServiceUtils.maskLocationURI(p) case _ => properties.get("url") match { // jdbc case Some(p) if !p.isEmpty => // mask the password if present - val url = maskLocationURI(p) + val url = ServiceUtils.maskLocationURI(p) // add dbtable if present properties.get(SnappyExternalCatalog.DBTABLE_PROPERTY) match { case Some(d) if !d.isEmpty => s"$url; ${SnappyExternalCatalog.DBTABLE_PROPERTY}=$d" @@ -545,7 +530,7 @@ class StoreHiveCatalog extends ExternalCatalog with Logging { } case _ => storage.locationUri match { // fallback to locationUri case None => "" - case Some(l) => maskLocationURI(l) + case Some(l) => ServiceUtils.maskLocationURI(l) } } } diff --git a/core/src/main/scala/io/snappydata/util/ServiceUtils.scala b/core/src/main/scala/io/snappydata/util/ServiceUtils.scala index 9be9bb77dd..55b9eb6f0d 100644 --- a/core/src/main/scala/io/snappydata/util/ServiceUtils.scala +++ b/core/src/main/scala/io/snappydata/util/ServiceUtils.scala @@ -21,6 +21,7 @@ import java.util.Properties import java.util.regex.Pattern import scala.collection.JavaConverters._ +import scala.util.matching.Regex import _root_.com.gemstone.gemfire.distributed.DistributedMember import _root_.com.gemstone.gemfire.distributed.internal.DistributionConfig @@ -33,6 +34,7 @@ import io.snappydata.{Constant, Property, ServerManager, SnappyTableStatsProvide import org.apache.spark.memory.MemoryMode import org.apache.spark.sql.collection.Utils import org.apache.spark.sql.hive.HiveClientUtil +import org.apache.spark.sql.sources.JdbcExtendedUtils.toLowerCase import org.apache.spark.sql.{SnappyContext, SparkSession, ThinClientConnectorMode} import org.apache.spark.{SparkContext, SparkEnv} @@ -41,6 +43,7 @@ import org.apache.spark.{SparkContext, SparkEnv} */ object ServiceUtils { + val PASSWORD_MATCH: Regex = "(?i)(password|passwd|secret).*".r val LOCATOR_URL_PATTERN: Pattern = Pattern.compile("(.+:[0-9]+)|(.+\\[[0-9]+\\])") private[snappydata] def getStoreProperties( @@ -178,6 +181,24 @@ object ServiceUtils { } } + /** + * Masks access key and secret access key in case of S3 URI + */ + def maskLocationURI(locURI: String): String = { + val uri = toLowerCase(locURI) + val maskedSrcPath = if ((uri.startsWith("s3a://") || + uri.startsWith("s3://") || + uri.startsWith("s3n://")) && uri.contains("@")) { + locURI.replace(locURI.slice(locURI.indexOf("//") + 2, + locURI.indexOf("@")), "****:****") + } else maskPassword(locURI) + maskedSrcPath + } + + private def maskPassword(s: String): String = { + PASSWORD_MATCH.replaceAllIn(s, "xxx") + } + /** * We capture table ddl string and add it to table properties before adding to catalog. * This will also contain passwords in the string such as jdbc connection string or diff --git a/core/src/main/scala/org/apache/spark/sql/SnappyContext.scala b/core/src/main/scala/org/apache/spark/sql/SnappyContext.scala index 8a330bd90d..656a4ccecd 100644 --- a/core/src/main/scala/org/apache/spark/sql/SnappyContext.scala +++ b/core/src/main/scala/org/apache/spark/sql/SnappyContext.scala @@ -37,6 +37,7 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM import com.pivotal.gemfirexd.Attribute import com.pivotal.gemfirexd.internal.engine.Misc import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils +import com.pivotal.gemfirexd.internal.engine.store.GemFireStore.StoreAdvisee import com.pivotal.gemfirexd.internal.shared.common.SharedUtils import io.snappydata.sql.catalog.{CatalogObjectType, ConnectorExternalCatalog} import io.snappydata.util.ServiceUtils @@ -49,6 +50,7 @@ import org.apache.spark.memory.MemoryManagerCallback import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerExecutorAdded} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.SortDirection import org.apache.spark.sql.collection.{ToolsCallbackInit, Utils} import org.apache.spark.sql.execution.columnar.ExternalStoreUtils.CaseInsensitiveMutableHashMap @@ -874,6 +876,12 @@ object SnappyContext extends Logging { } } + def getHiveCatalogTables(skipSchemas: Seq[String] = "SYS" :: Nil): Seq[CatalogTable] = { + val catalog = hiveSession.sessionState.catalog + catalog.listDatabases().filter(s => skipSchemas.isEmpty || !skipSchemas.contains(s)). + flatMap(schema => catalog.listTables(schema).map(table => catalog.getTableMetadata(table))) + } + private[spark] def getBlockIdIfNull( executorId: String): Option[BlockAndExecutorId] = Option(storeToBlockMap.get(executorId)) @@ -1190,7 +1198,7 @@ object SnappyContext extends Logging { def newHiveSession(): SparkSession = contextLock.synchronized { val sc = globalSparkContext sc.conf.set(StaticSQLConf.CATALOG_IMPLEMENTATION.key, "hive") - if (this.hiveSession ne null) this.hiveSession.newSession() + val hiveSession = if (this.hiveSession ne null) this.hiveSession.newSession() else { val session = SparkSession.builder().enableHiveSupport().getOrCreate() if (session.sharedState.externalCatalog.isInstanceOf[HiveExternalCatalog] && @@ -1203,6 +1211,14 @@ object SnappyContext extends Logging { this.hiveSession } } + updateAndDistributeProfile() + hiveSession + } + + private def updateAndDistributeProfile(): Unit = { + val advisee = GemFireXDUtils.getGfxdAdvisor.getAdvisee.asInstanceOf[StoreAdvisee] + advisee.setHiveSessionInitialized(true) + GemFireXDUtils.getGfxdAdvisor.distributeProfileUpdate() } def hasHiveSession: Boolean = contextLock.synchronized(this.hiveSession ne null)