Skip to content

Kyuubi Spark SQL: support Hive UDF permission control #7024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kyuubi.plugin.spark.authz

import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, NamedExpression}
Expand Down Expand Up @@ -129,6 +130,7 @@ object PrivilegesBuilder {
spark)
}
}
checkHivePermanentUDF(p, privilegeObjects)
}
}

Expand Down Expand Up @@ -341,4 +343,50 @@ object PrivilegesBuilder {
}
(inputObjs, outputObjs, opType)
}

/**
 * Collects privilege objects for Hive permanent UDFs referenced anywhere in `plan`.
 *
 * Hive UDF expressions (HiveGenericUDF, HiveGenericUDTF, HiveSimpleUDF,
 * HiveUDAFFunction) are detected by node name because the Hive classes are not
 * on the authz plugin's compile-time classpath. The function's qualified name
 * is read reflectively from the private `name` field; a "db.func" name yields a
 * [[Function]] privilege object, while an unqualified (builtin/temporary) name
 * is skipped — there is nothing database-scoped to authorize.
 *
 * Note: unlike an earlier draft, determinism (`isUDFDeterministic`) is NOT
 * consulted — whether a UDF is deterministic has no bearing on whether the
 * caller is allowed to execute it.
 *
 * @param plan the analyzed logical plan to scan
 * @param privilegeObjects accumulator that detected Function privileges are appended to
 */
def checkHivePermanentUDF(
    plan: LogicalPlan,
    privilegeObjects: ArrayBuffer[PrivilegeObject]): Unit = {

  // Node names of the four Hive UDF expression wrappers in Spark.
  val hiveUdfNodeNames =
    Set("HiveGenericUDF", "HiveGenericUDTF", "HiveSimpleUDF", "HiveUDAFFunction")

  def checkExpressionForUDF(expr: Expression): Unit = {
    expr match {
      case udfExpr if hiveUdfNodeNames.contains(udfExpr.nodeName) =>
        // Qualified function name, e.g. "default.my_udf"; null if reflection fails.
        val funName: String = getFieldValNoError(udfExpr, "name")
        if (funName != null) {
          funName.split("\\.") match {
            case Array(dbName, funcName) =>
              privilegeObjects += PrivilegeObject(Function(None, Option(dbName), funcName))
            case _ => // unqualified or unexpected format: not a permanent, db-scoped UDF
          }
        }
      case otherExpr =>
        otherExpr.children.foreach(checkExpressionForUDF)
    }
  }

  // Scan every expression on this node (projections, filter conditions,
  // aggregates, ...) rather than only Project.projectList, and always recurse
  // into children so UDFs below a Project are not missed.
  plan.expressions.foreach(checkExpressionForUDF)
  plan.children.foreach(checkHivePermanentUDF(_, privilegeObjects))
}

/**
 * Reflectively reads field `name` from `o`, returning `null` (cast to `T`)
 * on any failure instead of throwing.
 *
 * Unlike a bare `getClass.getDeclaredField`, this walks the class hierarchy,
 * so fields declared on a superclass (typical for Hive UDF expression
 * wrappers) are found as well.
 *
 * @param o    the object to read from; may be null (yields null result)
 * @param name the declared field name to look up
 * @tparam T   the expected field type; the raw value is cast unchecked
 * @return the field value, or `null.asInstanceOf[T]` if the object is null,
 *         the field does not exist anywhere in the hierarchy, or access fails
 */
def getFieldValNoError[T](o: Any, name: String): T = {
  // Search clz and its superclasses for a declared field called `name`.
  def findField(clz: Class[_]): Option[java.lang.reflect.Field] =
    Option(clz).flatMap { c =>
      Try(c.getDeclaredField(name)).toOption
        .orElse(findField(c.getSuperclass))
    }

  Try {
    findField(o.getClass) match {
      case Some(field) =>
        field.setAccessible(true)
        field.get(o).asInstanceOf[T]
      case None =>
        null.asInstanceOf[T]
    }
  }.getOrElse(null.asInstanceOf[T])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite

import org.apache.kyuubi.plugin.spark.authz.OperationType._
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectType._
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
Expand Down Expand Up @@ -115,6 +117,53 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
super.beforeEach()
}

test("Hive permanent UDF detection via PrivilegesBuilder.build") {
  // A query using a permanent Hive UDF must surface exactly one FUNCTION
  // input privilege and no outputs.
  val plan: LogicalPlan =
    spark.sql("SELECT my_udf(id) FROM test_table").queryExecution.analyzed
  val (inputObjs, outputObjs, operationType) = PrivilegesBuilder.build(plan, spark)
  assert(operationType === QUERY)

  assert(inputObjs.size === 1)
  val udfPrivilege = inputObjs.head
  assert(udfPrivilege.actionType === OTHER)
  assert(udfPrivilege.privilegeObjectType === FUNCTION)
  assert(udfPrivilege.dbname === "default")
  assert(udfPrivilege.objectName === "my_udf")

  assert(outputObjs.isEmpty)
}

test("Nested Hive UDF detection via PrivilegesBuilder.build") {
  // The UDF sits above a subquery; detection must still find it.
  val nestedQuery =
    """
      |SELECT my_udf(id)
      |FROM (
      |  SELECT id, value
      |  FROM test_table
      |) t
      |""".stripMargin
  val plan: LogicalPlan = spark.sql(nestedQuery).queryExecution.analyzed

  val (inputObjs, outputObjs, operationType) = PrivilegesBuilder.build(plan, spark)
  assert(operationType === QUERY)
  assert(inputObjs.size === 1)
  val udfPrivilege = inputObjs.head
  assert(udfPrivilege.actionType === OTHER)
  assert(udfPrivilege.privilegeObjectType === FUNCTION)
  assert(udfPrivilege.dbname === "default")
  assert(udfPrivilege.objectName === "my_udf")
  assert(outputObjs.isEmpty)
}

test("No Hive UDF detection via PrivilegesBuilder.build") {
  // A plain column projection must not produce any privilege objects.
  val plan: LogicalPlan =
    spark.sql("SELECT id FROM test_table").queryExecution.analyzed
  val (inputObjs, outputObjs, operationType) = PrivilegesBuilder.build(plan, spark)
  assert(operationType === QUERY)
  assert(inputObjs.isEmpty)
  assert(outputObjs.isEmpty)
}

test("AlterDatabasePropertiesCommand") {
assume(SPARK_RUNTIME_VERSION <= "3.2")
val plan = sql("ALTER DATABASE default SET DBPROPERTIES (abc = '123')").queryExecution.analyzed
Expand Down
Loading