diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index 01266eb2c85..05068534fcc 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.plugin.spark.authz
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, NamedExpression}
@@ -129,6 +130,7 @@ object PrivilegesBuilder {
           spark)
       }
     }
+    checkHivePermanentUDF(p, privilegeObjects)
   }
 }
 
@@ -341,4 +343,68 @@ object PrivilegesBuilder {
     }
     (inputObjs, outputObjs, opType)
   }
+
+  /**
+   * Collects Hive permanent (db-qualified) UDF usages referenced anywhere in
+   * `plan` and appends a FUNCTION privilege object for each one found, so
+   * function access is authorized alongside table/column access.
+   *
+   * Every expression of every plan node is inspected (project lists, filter
+   * conditions, aggregate expressions, ...), and the walk recurses into all
+   * children, so UDFs nested below the first Project are not missed.
+   */
+  def checkHivePermanentUDF(
+      plan: LogicalPlan,
+      privilegeObjects: ArrayBuffer[PrivilegeObject]): Unit = {
+    // Hive UDF wrapper node names, matched by name to avoid a hard
+    // compile-time dependency on the spark-hive module.
+    val hiveUdfNodeNames =
+      Set("HiveSimpleUDF", "HiveGenericUDF", "HiveGenericUDTF", "HiveUDAFFunction")
+
+    def checkExpressionForUDF(expr: Expression): Unit = expr match {
+      case udfExpr if hiveUdfNodeNames.contains(udfExpr.nodeName) =>
+        val funName: String = getFieldValNoError(udfExpr, "name")
+        // NOTE(review): gating on determinism mirrors the original logic;
+        // confirm non-deterministic UDFs really need no privilege check.
+        val isDeterministic: Boolean = getFieldValNoError(udfExpr, "isUDFDeterministic")
+        if (isDeterministic && funName != null) {
+          funName.split("\\.") match {
+            case Array(dbName, funcName) =>
+              privilegeObjects += PrivilegeObject(Function(None, Option(dbName), funcName))
+            case _ => // unqualified or unexpected name: nothing to authorize here
+          }
+        }
+      case other => other.children.foreach(checkExpressionForUDF)
+    }
+
+    plan.expressions.foreach(checkExpressionForUDF)
+    plan.children.foreach(checkHivePermanentUDF(_, privilegeObjects))
+  }
+
+  /**
+   * Reflectively reads field `name` from `o`, searching the whole class
+   * hierarchy (getDeclaredField alone misses inherited fields). Returns
+   * null (of the expected type) when the field is absent or unreadable,
+   * so callers must tolerate a null/default result.
+   */
+  def getFieldValNoError[T](o: Any, name: String): T = {
+    @scala.annotation.tailrec
+    def findField(cls: Class[_]): java.lang.reflect.Field =
+      if (cls == null) {
+        null
+      } else {
+        Try(cls.getDeclaredField(name)) match {
+          case Success(f) => f
+          case Failure(_) => findField(cls.getSuperclass)
+        }
+      }
+    Try {
+      val field = findField(o.getClass)
+      field.setAccessible(true)
+      field.get(o)
+    } match {
+      case Success(value) => value.asInstanceOf[T]
+      case Failure(_) => null.asInstanceOf[T]
+    }
+  }
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
index 63837f2501d..a94c4acd73f 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
@@ -30,6 +30,8 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.kyuubi.plugin.spark.authz.OperationType._
+import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
+import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectType._
 import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
 import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
 import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
@@ -115,6 +117,53 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     super.beforeEach()
   }
 
+  test("Hive permanent UDF detection via PrivilegesBuilder.build") {
+    val sqlStr = "SELECT my_udf(id) FROM test_table"
+    val plan: LogicalPlan = spark.sql(sqlStr).queryExecution.analyzed
+    val (inputObjs, outputObjs, operationType) = PrivilegesBuilder.build(plan, spark)
+    assert(operationType === QUERY)
+
+    assert(inputObjs.size === 1)
+    val privilegeObject = inputObjs.head
+    assert(privilegeObject.actionType === OTHER)
+    assert(privilegeObject.privilegeObjectType === FUNCTION)
+    assert(privilegeObject.dbname === "default")
+    assert(privilegeObject.objectName === "my_udf")
+
+    assert(outputObjs.isEmpty)
+  }
+
+  test("Nested Hive UDF detection via PrivilegesBuilder.build") {
+    val sqlStr =
+      """
+        |SELECT my_udf(id)
+        |FROM (
+        |  SELECT id, value
+        |  FROM test_table
+        |) t
+        |""".stripMargin
+    val plan: LogicalPlan = spark.sql(sqlStr).queryExecution.analyzed
+
+    val (inputObjs, outputObjs, operationType) = PrivilegesBuilder.build(plan, spark)
+    assert(operationType === QUERY)
+    assert(inputObjs.size === 1)
+    val privilegeObject = inputObjs.head
+    assert(privilegeObject.actionType === OTHER)
+    assert(privilegeObject.privilegeObjectType === FUNCTION)
+    assert(privilegeObject.dbname === "default")
+    assert(privilegeObject.objectName === "my_udf")
+    assert(outputObjs.isEmpty)
+  }
+
+  test("No Hive UDF detection via PrivilegesBuilder.build") {
+    val sqlStr = "SELECT id FROM test_table"
+    val plan: LogicalPlan = spark.sql(sqlStr).queryExecution.analyzed
+    val (inputObjs, outputObjs, operationType) = PrivilegesBuilder.build(plan, spark)
+    assert(operationType === QUERY)
+    assert(inputObjs.isEmpty)
+    assert(outputObjs.isEmpty)
+  }
+
   test("AlterDatabasePropertiesCommand") {
     assume(SPARK_RUNTIME_VERSION <= "3.2")
     val plan = sql("ALTER DATABASE default SET DBPROPERTIES (abc = '123')").queryExecution.analyzed