diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala index 746b3600a6..bded200e24 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala @@ -26,7 +26,7 @@ import java.nio.charset.StandardCharsets import java.security.{KeyPair, KeyPairGenerator, PrivateKey, PublicKey} object RSAUtils { - private implicit val keyPair = genKeyPair(1024) + private implicit val keyPair = genKeyPair(2048) def genKeyPair(keyLength: Int): KeyPair = { val keyPair = KeyPairGenerator.getInstance("RSA") diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala index c550b3f517..02e1762e2e 100644 --- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala +++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala @@ -64,6 +64,9 @@ object HadoopConf { val HDFS_ENABLE_CACHE_CLOSE = CommonVars("linkis.hadoop.hdfs.cache.close.enable", true).getValue + val HDFS_ENABLE_NOT_CLOSE_USERS = + CommonVars("linkis.hadoop.hdfs.cache.not.close.users", "").getValue + val HDFS_ENABLE_CACHE_IDLE_TIME = CommonVars("wds.linkis.hadoop.hdfs.cache.idle.time", 3 * 60 * 1000).getValue diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala index e18837fd5f..d4b6af555a 100644 --- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala +++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala @@ -68,7 +68,10 @@ object HDFSUtils extends Logging { .foreach { hdfsFileSystemContainer => val locker = hdfsFileSystemContainer.getUser + LOCKER_SUFFIX locker.intern() synchronized { - if (hdfsFileSystemContainer.canRemove()) { + if ( + hdfsFileSystemContainer.canRemove() && !HadoopConf.HDFS_ENABLE_NOT_CLOSE_USERS + .contains(hdfsFileSystemContainer.getUser) + ) { fileSystemCache.remove( hdfsFileSystemContainer.getUser + JOINT + hdfsFileSystemContainer.getLabel ) diff --git a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/security/SecurityFilter.scala b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/security/SecurityFilter.scala index 5cc796d23e..b372ead651 100644 --- a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/security/SecurityFilter.scala +++ b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/security/SecurityFilter.scala @@ -83,7 +83,7 @@ class SecurityFilter extends Filter { ServerConfiguration.BDP_SERVER_RESTFUL_PASS_AUTH_REQUEST_URI .exists(r => !r.equals("") && request.getRequestURI.startsWith(r)) ) { - logger.info("pass auth uri: " + request.getRequestURI) + logger.debug("pass auth uri: " + request.getRequestURI) true } else { val userName = Utils.tryCatch(SecurityFilter.getLoginUser(request)) { diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala index 5330983dd6..ace8509d4a 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala @@ -33,7 +33,7 @@ object LinkisStorageConf { CommonVars .apply( "wds.linkis.hdfs.rest.errs", - ".*Filesystem closed.*|.*Failed to find any Kerberos tgt.*" + ".*Filesystem closed.*|.*Failed to find any Kerberos tgt.*|.*The client is stopped.*" ) .getValue diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala index eff8411603..36f4ee8fc4 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala @@ -176,7 +176,7 @@ object LinkisJobBuilder { var authTokenValue: String = CommonVars[String]( "wds.linkis.client.test.common.tokenValue", - "LINKIS_CLI_TEST" + Configuration.LINKIS_TOKEN.getValue ).getValue // This is the default authToken, we usually suggest set different ones for users. def setDefaultClientConfig(clientConfig: DWSClientConfig): Unit = this.clientConfig = clientConfig diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala index d7c4746188..510aabf7f4 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala @@ -18,6 +18,7 @@ package org.apache.linkis.computation.client.once.simple import org.apache.linkis.bml.client.{BmlClient, BmlClientFactory} +import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN import org.apache.linkis.common.utils.Utils import org.apache.linkis.computation.client.LinkisJobBuilder import org.apache.linkis.computation.client.LinkisJobBuilder.clientConfig @@ -171,7 +172,7 @@ object SimpleOnceJobBuilder { ) // We think 90s is enough, if SocketTimeoutException is throw, just set a new clientConfig to modify it. .setAuthenticationStrategy(new TokenAuthenticationStrategy()) .setAuthTokenKey(TokenAuthenticationStrategy.TOKEN_KEY) - .setAuthTokenValue(LinkisJobBuilder.authTokenValue) + .setAuthTokenValue(LINKIS_TOKEN.getValue) .setDWSVersion(clientConfig.getDWSVersion) .build() bmlClient = BmlClientFactory.createBmlClient(newClientConfig) diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala index 110b02b8fe..5aab48388a 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala @@ -36,4 +36,6 @@ object JobRequestConstants { val ENABLE_DIRECT_PUSH = "enableDirectPush" val DIRECT_PUSH_FETCH_SIZE = "direct_push_fetch_size" + val LINKIS_HIVE_EC_READ_RESULT_BY_OBJECT = "readResByObject" + } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoad.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoad.scala deleted file mode 100644 index 34928d8525..0000000000 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoad.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.engineconn.computation.executor.hook - -import org.apache.linkis.common.conf.Configuration.IS_VIEW_FS_ENV -import org.apache.linkis.common.utils.{Logging, Utils} -import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf -import org.apache.linkis.engineconn.computation.executor.execute.{ - ComputationExecutor, - EngineExecutionContext -} -import org.apache.linkis.engineconn.core.engineconn.EngineConnManager -import org.apache.linkis.engineconn.core.executor.ExecutorManager -import org.apache.linkis.manager.label.entity.Label -import org.apache.linkis.manager.label.entity.engine.RunType.RunType -import org.apache.linkis.rpc.Sender -import org.apache.linkis.udf.UDFClientConfiguration -import org.apache.linkis.udf.api.rpc.{RequestPythonModuleProtocol, ResponsePythonModuleProtocol} -import org.apache.linkis.udf.entity.PythonModuleInfoVO - -import org.apache.commons.lang3.StringUtils - -import java.util - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -/** - * The PythonModuleLoad class is designed to load Python modules into the execution environment - * dynamically. This class is not an extension of UDFLoad, but shares a similar philosophy of - * handling dynamic module loading based on user preferences and system configurations. - */ -abstract class PythonModuleLoad extends Logging { - - /** Abstract properties to be defined by the subclass */ - protected val engineType: String - protected val runType: RunType - - protected def getEngineType(): String = engineType - - protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String - - private def queryPythonModuleRpc( - userName: String, - engineType: String - ): java.util.List[PythonModuleInfoVO] = { - val infoList = Sender - .getSender(UDFClientConfiguration.UDF_SERVICE_NAME.getValue) - .ask(RequestPythonModuleProtocol(userName, engineType)) - .asInstanceOf[ResponsePythonModuleProtocol] - .getModulesInfo() - infoList - } - - protected def getLoadPythonModuleCode: Array[String] = { - val engineCreationContext = - EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext - val user = engineCreationContext.getUser - - var infoList: util.List[PythonModuleInfoVO] = - Utils.tryAndWarn(queryPythonModuleRpc(user, getEngineType())) - if (infoList == null) { - logger.info("rpc get info is empty.") - infoList = new util.ArrayList[PythonModuleInfoVO]() - } - - // 替换Viewfs - if (IS_VIEW_FS_ENV.getValue) { - infoList.asScala.foreach { info => - val path = info.getPath - logger.info(s"python path: ${path}") - if (path.startsWith("hdfs") || path.startsWith("viewfs")) { - info.setPath(path.replace("hdfs://", "viewfs://")) - } else { - info.setPath("viewfs://" + path) - } - } - } else { - - infoList.asScala.foreach { info => - val path = info.getPath - logger.info(s"hdfs python path: ${path}") - if (!path.startsWith("hdfs")) { - info.setPath("hdfs://" + path) - } - } - } - - logger.info(s"${user} load python modules: ") - infoList.asScala.foreach(l => logger.info(s"module name:${l.getName}, path:${l.getPath}\n")) - - // 创建加载code - val codes: mutable.Buffer[String] = infoList.asScala - .filter { info => StringUtils.isNotEmpty(info.getPath) } - .map(constructCode) - // 打印codes - val str: String = codes.mkString("\n") - logger.info(s"python codes: $str") - codes.toArray - } - - private def executeFunctionCode(codes: Array[String], executor: ComputationExecutor): Unit = { - if (null == codes || null == executor) { - return - } - codes.foreach { code => - logger.info("Submit function registration to engine, code: " + code) - Utils.tryCatch(executor.executeLine(new EngineExecutionContext(executor), code)) { - t: Throwable => - logger.error("Failed to load python module", t) - null - } - } - } - - /** - * Generate and execute the code necessary for loading Python modules. - * - * @param executor - * An object capable of executing code in the current engine context. - */ - protected def loadPythonModules(labels: Array[Label[_]]): Unit = { - - val codes = getLoadPythonModuleCode - logger.info(s"codes length: ${codes.length}") - if (null != codes && codes.nonEmpty) { - val executor = ExecutorManager.getInstance.getExecutorByLabels(labels) - if (executor != null) { - val className = executor.getClass.getName - logger.info(s"executor class: ${className}") - } else { - logger.error(s"Failed to load python, executor is null") - } - - executor match { - case computationExecutor: ComputationExecutor => - executeFunctionCode(codes, computationExecutor) - case _ => - } - } - logger.info(s"Successful to load python, engineType : ${engineType}") - } - -} - -// Note: The actual implementation of methods like `executeFunctionCode` and `construct diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoadEngineConnHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoadEngineConnHook.scala index 80eaa888b8..386cd8f0b5 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoadEngineConnHook.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonModuleLoadEngineConnHook.scala @@ -17,12 +17,151 @@ package org.apache.linkis.engineconn.computation.executor.hook +import org.apache.linkis.common.conf.Configuration.IS_VIEW_FS_ENV import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.engineconn.common.conf.EngineConnConf import org.apache.linkis.engineconn.common.creation.EngineCreationContext import org.apache.linkis.engineconn.common.engineconn.EngineConn import org.apache.linkis.engineconn.common.hook.EngineConnHook +import org.apache.linkis.engineconn.computation.executor.execute.{ + ComputationExecutor, + EngineExecutionContext +} +import org.apache.linkis.engineconn.core.engineconn.EngineConnManager +import org.apache.linkis.engineconn.core.executor.ExecutorManager +import org.apache.linkis.hadoop.common.conf.HadoopConf +import org.apache.linkis.hadoop.common.utils.HDFSUtils import org.apache.linkis.manager.label.entity.Label -import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel +import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, RunType} +import org.apache.linkis.manager.label.entity.engine.RunType.RunType +import org.apache.linkis.rpc.Sender +import org.apache.linkis.udf.UDFClientConfiguration +import org.apache.linkis.udf.api.rpc.{RequestPythonModuleProtocol, ResponsePythonModuleProtocol} +import org.apache.linkis.udf.entity.PythonModuleInfoVO + +import org.apache.commons.lang3.StringUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} + +import java.util +import java.util.{Collections, Comparator} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +abstract class PythonModuleLoad extends Logging { + + /** Abstract properties to be defined by the subclass */ + protected val engineType: String + protected val runType: RunType + protected def getEngineType(): String = engineType + protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String + + private def queryPythonModuleRpc( + userName: String, + engineType: String + ): java.util.List[PythonModuleInfoVO] = { + val infoList = Sender + .getSender(UDFClientConfiguration.UDF_SERVICE_NAME.getValue) + .ask(RequestPythonModuleProtocol(userName, engineType)) + .asInstanceOf[ResponsePythonModuleProtocol] + .getModulesInfo() + // 使用Collections.sort()和Comparator进行排序 + if (infoList != null && !infoList.isEmpty) { + Collections.sort( + infoList, + new Comparator[PythonModuleInfoVO]() { + override def compare(o1: PythonModuleInfoVO, o2: PythonModuleInfoVO): Int = + Integer.compare(o1.getId.toInt, o1.getId.toInt) + } + ) + } + infoList + } + + protected def getLoadPythonModuleCode: Array[String] = { + val engineCreationContext = + EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext + val user = engineCreationContext.getUser + var infoList: util.List[PythonModuleInfoVO] = + Utils.tryAndWarn(queryPythonModuleRpc(user, getEngineType())) + if (infoList == null) { + logger.info("rpc get info is empty.") + infoList = new util.ArrayList[PythonModuleInfoVO]() + } + // 替换Viewfs + if (IS_VIEW_FS_ENV.getValue) { + infoList.asScala.foreach { info => + val path = info.getPath + logger.info(s"python path: ${path}") + if (path.startsWith("hdfs") || path.startsWith("viewfs")) { + info.setPath(path.replace("hdfs://", "viewfs://")) + } else { + info.setPath("viewfs://" + path) + } + } + } else { + infoList.asScala.foreach { info => + val path = info.getPath + logger.info(s"hdfs python path: ${path}") + if (!path.startsWith("hdfs")) { + info.setPath("hdfs://" + path) + } + } + } + logger.info(s"${user} load python modules: ") + infoList.asScala.foreach(l => logger.info(s"module name:${l.getName}, path:${l.getPath}\n")) + // 创建加载code + val codes: mutable.Buffer[String] = infoList.asScala + .filter { info => StringUtils.isNotEmpty(info.getPath) } + .map(constructCode) + // 打印codes + val str: String = codes.mkString("\n") + logger.info(s"python codes: $str") + codes.toArray + } + + private def executeFunctionCode(codes: Array[String], executor: ComputationExecutor): Unit = { + if (null == codes || null == executor) { + return + } + codes.foreach { code => + logger.info("Submit function registration to engine, code: " + code) + Utils.tryCatch(executor.executeLine(new EngineExecutionContext(executor), code)) { + t: Throwable => + logger.error("Failed to load python module", t) + null + } + } + } + + /** + * Generate and execute the code necessary for loading Python modules. + * + * @param executor + * An object capable of executing code in the current engine context. + */ + protected def loadPythonModules(labels: Array[Label[_]]): Unit = { + val codes = getLoadPythonModuleCode + logger.info(s"codes length: ${codes.length}") + if (null != codes && codes.nonEmpty) { + val executor = ExecutorManager.getInstance.getExecutorByLabels(labels) + if (executor != null) { + val className = executor.getClass.getName + logger.info(s"executor class: ${className}") + } else { + logger.error(s"Failed to load python, executor is null") + } + executor match { + case computationExecutor: ComputationExecutor => + executeFunctionCode(codes, computationExecutor) + case _ => + } + } + logger.info(s"Successful to load python, engineType : ${engineType}") + } + +} abstract class PythonModuleLoadEngineConnHook extends PythonModuleLoad @@ -40,7 +179,6 @@ abstract class PythonModuleLoadEngineConnHook val labels = Array[Label[_]](codeLanguageLabel) loadPythonModules(labels) }(s"Failed to load Python Modules: ${engineType}") - } override def afterEngineServerStartFailed( @@ -62,3 +200,60 @@ abstract class PythonModuleLoadEngineConnHook } } + +// 加载PySpark的Python模块 +class PythonSparkEngineHook extends PythonModuleLoadEngineConnHook { + // 设置engineType属性为"spark",表示此挂钩适用于Spark数据处理引擎 + override val engineType: String = "spark" + // 设置runType属性为RunType.PYSPARK,表示此挂钩将执行PySpark类型的代码 + override protected val runType: RunType = RunType.PYSPARK + + // 重写constructCode方法,用于根据Python模块信息构造加载模块的代码 + override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String = { + // 使用pythonModuleInfo的path属性,构造SparkContext.addPyFile的命令字符串 + // 这个命令在PySpark环境中将模块文件添加到所有worker上,以便在代码中可以使用 + val path: String = pythonModuleInfo.getPath + val loadCode = s"sc.addPyFile('${path}')" + logger.info(s"pythonLoadCode: ${loadCode}") + loadCode + } + +} + +// 加载Python的Python模块 +class PythonEngineHook extends PythonModuleLoadEngineConnHook { + // 设置engineType属性为"python",表示此挂钩适用于python引擎 + override val engineType: String = "python" + // 设置runType属性为RunType.PYTHON,表示此挂钩将执行python类型的代码 + override protected val runType: RunType = RunType.PYTHON + + // 重写constructCode方法,用于根据Python模块信息构造加载模块的代码 + override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String = { + // 处理文件 + val path: String = pythonModuleInfo.getPath + val engineCreationContext: EngineCreationContext = + EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext + val user: String = engineCreationContext.getUser + var loadCode: String = null + logger.info(s"gen code in constructCode") + Utils.tryAndWarn({ + // 获取引擎临时目录 + var tmpDir: String = EngineConnConf.getEngineTmpDir + if (!tmpDir.endsWith("/")) { + tmpDir += "/" + } + val fileName: String = new java.io.File(path).getName + val destPath: String = tmpDir + fileName + val config: Configuration = HDFSUtils.getConfiguration(HadoopConf.HADOOP_ROOT_USER.getValue) + val fs: FileSystem = HDFSUtils.getHDFSUserFileSystem(user, null, config) + fs.copyToLocalFile(new Path(path), new Path("file://" + destPath)) + if (fileName.endsWith("zip")) { + tmpDir += fileName + } + loadCode = s"import sys; sys.path.append('${tmpDir}')" + logger.info(s"5 load local python code: ${loadCode} in path: $destPath") + }) + loadCode + } + +} diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonSparkEngineHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonSparkEngineHook.scala deleted file mode 100644 index 0fe554f93d..0000000000 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/PythonSparkEngineHook.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.engineconn.computation.executor.hook - -import org.apache.linkis.manager.label.entity.engine.RunType -import org.apache.linkis.manager.label.entity.engine.RunType.RunType -import org.apache.linkis.udf.entity.PythonModuleInfoVO - -/** - * 定义一个用于Spark引擎的Python模块加载与执行挂钩的类 - */ -class PythonSparkEngineHook extends PythonModuleLoadEngineConnHook { - - // 设置engineType属性为"spark",表示此挂钩适用于Spark数据处理引擎 - override val engineType: String = "spark" - - // 设置runType属性为RunType.PYSPARK,表示此挂钩将执行PySpark类型的代码 - override protected val runType: RunType = RunType.PYSPARK - - // 重写constructCode方法,用于根据Python模块信息构造加载模块的代码 - override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String = { - // 使用pythonModuleInfo的path属性,构造SparkContext.addPyFile的命令字符串 - // 这个命令在PySpark环境中将模块文件添加到所有worker上,以便在代码中可以使用 - val path: String = pythonModuleInfo.getPath - val loadCode = s"sc.addPyFile('${path}')" - logger.info(s"pythonLoadCode: ${loadCode}") - loadCode - } - -} diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EngineConnPluginConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EngineConnPluginConf.scala index 704204577e..c36d2a3b1d 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EngineConnPluginConf.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EngineConnPluginConf.scala @@ -35,4 +35,7 @@ object EngineConnPluginConf { "org.apache.linkis.engineconn.launch.EngineConnServer" ) + val PYTHON_VERSION_KEY: String = "python.version" + val SPARK_PYTHON_VERSION_KEY: String = "spark.python.version" + } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java index 5164542445..72b72d8ebf 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java @@ -101,7 +101,7 @@ public class AMConfiguration { CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "spark,hive,python"); public static final CommonVars UNALLOW_BATCH_KILL_ENGINE_TYPES = - CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "trino,appconn,io_file"); + CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "trino,appconn,io_file,nebula"); public static final CommonVars MULTI_USER_ENGINE_USER = CommonVars.apply("wds.linkis.multi.user.engine.user", getDefaultMultiEngineUser()); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala index 8b47daf87b..b39eb91ac9 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala @@ -340,8 +340,11 @@ class RMMonitorRest extends Logging { record.put("engineInstance", node.getServiceInstance.getInstance) } + // return labels + val labels: util.List[Label[_]] = node.getLabels record.put("creator", userCreatorLabel.getCreator) record.put("engineType", engineTypeLabel.getEngineType) + record.put("labels", labels) if (node.getNodeResource != null) { if (node.getNodeResource.getLockedResource != null) { record.put("preUsedResource", node.getNodeResource.getLockedResource) diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala index 788d9fbef8..03036d30b7 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala @@ -52,6 +52,7 @@ import org.apache.linkis.storage.domain.{Column, DataType} import org.apache.linkis.storage.resultset.ResultSetFactory import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord} +import org.apache.commons.collections.MapUtils import org.apache.commons.lang3.StringUtils import org.apache.hadoop.hive.common.HiveInterruptUtils import org.apache.hadoop.hive.conf.HiveConf @@ -124,6 +125,8 @@ class HiveEngineConnExecutor( private val splitter = "_" + private var readResByObject = false + override def init(): Unit = { LOG.info(s"Ready to change engine state!") if (HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) { @@ -137,6 +140,17 @@ class HiveEngineConnExecutor( engineExecutorContext: EngineExecutionContext, code: String ): ExecuteResponse = { + readResByObject = MapUtils.getBooleanValue( + engineExecutorContext.getProperties, + JobRequestConstants.LINKIS_HIVE_EC_READ_RESULT_BY_OBJECT, + false + ) + if (readResByObject) { + hiveConf.set( + "list.sink.output.formatter", + "org.apache.hadoop.hive.serde2.thrift.ThriftFormatter" + ) + } this.engineExecutorContext = engineExecutorContext CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf) singleSqlProgressMap.clear() @@ -354,30 +368,36 @@ class HiveEngineConnExecutor( val resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE) resultSetWriter.addMetaData(metaData) val colLength = metaData.columns.length - val result = new util.ArrayList[String]() + val result = new util.ArrayList[Object]() var rows = 0 while (driver.getResults(result)) { - val scalaResult: mutable.Buffer[String] = result.asScala + val scalaResult: mutable.Buffer[Object] = result.asScala scalaResult foreach { s => - val arr: Array[String] = s.split("\t") - val arrAny: ArrayBuffer[Any] = new ArrayBuffer[Any]() - if (arr.length > colLength) { - logger.error( - s"""There is a \t tab in the result of hive code query, hive cannot cut it, please use spark to execute(查询的结果中有\t制表符,hive不能进行切割,请使用spark执行)""" - ) - throw new ErrorException( - 60078, - """There is a \t tab in the result of your query, hive cannot cut it, please use spark to execute(您查询的结果中有\t制表符,hive不能进行切割,请使用spark执行)""" - ) - } - if (arr.length == colLength) arr foreach arrAny.asJava.add - else if (arr.length == 0) for (i <- 1 to colLength) arrAny.asJava add "" - else { - val i = colLength - arr.length - arr foreach arrAny.asJava.add - for (i <- 1 to i) arrAny.asJava add "" + if (!readResByObject) { + val arr: Array[String] = s.asInstanceOf[String].split("\t") + val arrAny: ArrayBuffer[Any] = new ArrayBuffer[Any]() + if (arr.length > colLength) { + logger.error( + s"""There is a \t tab in the result of hive code query, hive cannot cut it, please use spark to execute(查询的结果中有\t制表符,hive不能进行切割,请使用spark执行)""" + ) + throw new ErrorException( + 60078, + """There is a \t tab in the result of your query, hive cannot cut it, please use spark to execute(您查询的结果中有\t制表符,hive不能进行切割,请使用spark执行)""" + ) + } + if (arr.length == colLength) { + arrAny.appendAll(arr) + } else if (arr.length == 0) for (i <- 1 to colLength) arrAny.asJava add "" + else { + val i = colLength - arr.length + arr foreach arrAny.asJava.add + for (i <- 1 to i) arrAny.asJava add "" + } + resultSetWriter.addRecord(new TableRecord(arrAny.toArray)) + } else { + resultSetWriter.addRecord(new TableRecord(s.asInstanceOf[Array[Any]])) } - resultSetWriter.addRecord(new TableRecord(arrAny.toArray)) + } rows += result.size result.clear() diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java index dfbb7a8b13..8015216a34 100644 --- a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java @@ -42,6 +42,9 @@ public class NebulaConfiguration { public static final CommonVars NEBULA_PASSWORD = CommonVars.apply("linkis.nebula.password", "nebula"); + public static final CommonVars NEBULA_SPACE = + CommonVars.apply("linkis.nebula.space", "nebula"); + public static final CommonVars NEBULA_RECONNECT_ENABLED = CommonVars.apply( "linkis.nebula.reconnect.enabled", diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java index 20605a9cb6..a853313ae0 100644 --- a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java @@ -282,9 +282,12 @@ private Session getSession(String taskId, NebulaPool nebulaPool) { String username = NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap); String password = NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap); Boolean reconnect = NebulaConfiguration.NEBULA_RECONNECT_ENABLED.getValue(configMap); - + String space = NebulaConfiguration.NEBULA_SPACE.getValue(configMap); try { session = nebulaPool.getSession(username, password, reconnect); + if (StringUtils.isNotBlank(space)) { + session.execute("use " + space); + } } catch (Exception e) { logger.error("Nebula Session initialization failed."); throw new NebulaClientException( diff --git a/linkis-engineconn-plugins/python/src/main/java/org/apache/linkis/manager/engineplugin/python/errorcode/LinkisPythonErrorCodeSummary.java b/linkis-engineconn-plugins/python/src/main/java/org/apache/linkis/manager/engineplugin/python/errorcode/LinkisPythonErrorCodeSummary.java index 9158ae6a8a..36ee945e2e 100644 --- a/linkis-engineconn-plugins/python/src/main/java/org/apache/linkis/manager/engineplugin/python/errorcode/LinkisPythonErrorCodeSummary.java +++ b/linkis-engineconn-plugins/python/src/main/java/org/apache/linkis/manager/engineplugin/python/errorcode/LinkisPythonErrorCodeSummary.java @@ -21,8 +21,7 @@ public enum LinkisPythonErrorCodeSummary implements LinkisErrorCode { PYTHON_EXECUTE_ERROR(60002, ""), - PYSPARK_PROCESSS_STOPPED( - 60003, "Pyspark process has stopped, query failed!(Pyspark 进程已停止,查询失败!)"), + PYSPARK_PROCESSS_STOPPED(60003, "python process has stopped, query failed!(Python 进程已停止,查询失败!)"), INVALID_PYTHON_SESSION(400201, "Invalid python session.(无效的 python 会话.)"); /** 错误码 */ private final int errorCode; diff --git a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala index 3b17fa60a4..57943ca329 100644 --- a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala @@ -17,6 +17,7 @@ package org.apache.linkis.manager.engineplugin.python.executor +import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.Utils import org.apache.linkis.engineconn.computation.executor.execute.{ ComputationExecutor, @@ -31,6 +32,7 @@ import org.apache.linkis.manager.common.entity.resource.{ NodeResource } import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf +import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf.PYTHON_VERSION_KEY import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils import org.apache.linkis.manager.engineplugin.python.conf.PythonEngineConfiguration import org.apache.linkis.manager.label.entity.Label @@ -61,7 +63,7 @@ class PythonEngineConnExecutor(id: Int, pythonSession: PythonSession, outputPrin private def getPyVersion(): String = { if (null != EngineConnServer.getEngineCreationContext.getOptions) { EngineConnServer.getEngineCreationContext.getOptions - .getOrDefault("python.version", "python") + .getOrDefault(PYTHON_VERSION_KEY, "python") } else { PythonEngineConfiguration.PYTHON_VERSION.getValue } @@ -72,13 +74,13 @@ class PythonEngineConnExecutor(id: Int, pythonSession: PythonSession, outputPrin code: String ): ExecuteResponse = { val pythonVersion = engineExecutionContext.getProperties - .getOrDefault("python.version", pythonDefaultVersion) + .getOrDefault(PYTHON_VERSION_KEY, pythonDefaultVersion) .toString .toLowerCase() logger.info(s" EngineExecutionContext user python.version = > ${pythonVersion}") - System.getProperties.put("python.version", pythonVersion) + System.getProperties.put(PYTHON_VERSION_KEY, pythonVersion) logger.info( - s" System getProperties python.version = > ${System.getProperties.getProperty("python.version")}" + s" System getProperties python.version = > ${System.getProperties.getProperty(PYTHON_VERSION_KEY)}" ) // System.getProperties.put("python.application.pyFiles", engineExecutionContext.getProperties.getOrDefault("python.application.pyFiles", "file:///mnt/bdap/test/test/test.zip").toString) pythonSession.lazyInitGateway() @@ -89,6 +91,11 @@ class PythonEngineConnExecutor(id: Int, pythonSession: PythonSession, outputPrin logger.info("Python executor reset new engineExecutorContext!") } engineExecutionContext.appendStdout(s"$getId >> ${code.trim}") + if (this.engineExecutionContext.getCurrentParagraph == 1) { + engineExecutionContext.appendStdout( + LogUtils.generateInfo(s"Your Python Version is:\n$pythonVersion") + ) + } pythonSession.execute(code) // lineOutputStream.flush() SuccessExecuteResponse() diff --git a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala index 9ef07b3eae..7479e94f15 100644 --- a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala +++ b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala @@ -22,6 +22,7 @@ import org.apache.linkis.engineconn.computation.executor.execute.EngineExecution import org.apache.linkis.engineconn.computation.executor.rs.RsOutputStream import org.apache.linkis.engineconn.launch.EngineConnServer import org.apache.linkis.governance.common.utils.GovernanceUtils +import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf.PYTHON_VERSION_KEY import org.apache.linkis.manager.engineplugin.python.conf.PythonEngineConfiguration import org.apache.linkis.manager.engineplugin.python.errorcode.LinkisPythonErrorCodeSummary._ import org.apache.linkis.manager.engineplugin.python.exception.{ @@ -69,7 +70,7 @@ class PythonSession extends Logging { private def getPyVersion(): String = { if (null != EngineConnServer.getEngineCreationContext.getOptions) { EngineConnServer.getEngineCreationContext.getOptions - .getOrDefault("python.version", "python") + .getOrDefault(PYTHON_VERSION_KEY, "python") } else { PythonEngineConfiguration.PYTHON_VERSION.getValue } diff --git a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/hook/PythonVersionEngineHook.scala b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/hook/PythonVersionEngineHook.scala index deec7ebcaa..e9f3e2f0ef 100644 --- a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/hook/PythonVersionEngineHook.scala +++ b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/hook/PythonVersionEngineHook.scala @@ -21,6 +21,7 @@ import org.apache.linkis.common.utils.Logging import org.apache.linkis.engineconn.common.creation.EngineCreationContext import org.apache.linkis.engineconn.common.engineconn.EngineConn import org.apache.linkis.engineconn.common.hook.EngineConnHook +import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf.PYTHON_VERSION_KEY import org.apache.linkis.manager.engineplugin.python.conf.PythonEngineConfiguration import org.apache.linkis.manager.engineplugin.python.executor.PythonSession @@ -36,7 +37,7 @@ class PythonVersionEngineHook extends EngineConnHook with Logging { val params = if (engineCreationContext.getOptions == null) new util.HashMap[String, String]() else engineCreationContext.getOptions - _pythonVersion = params.getOrDefault("python.version", "python3") + _pythonVersion = params.getOrDefault(PYTHON_VERSION_KEY, "python3") _pythonExtraPackage = params .getOrDefault("python.application.pyFiles", "file:///mnt/bdap/test/test/test.zip") .toString diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala index 7fed0f436d..388cc4f27e 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala @@ -179,7 +179,9 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) // with unit if set configuration with unit // if not set sc get will get the value of spark.yarn.executor.memoryOverhead such as 512(without unit) val memoryOverhead = sc.getConf.get("spark.executor.memoryOverhead", "1G") - + val pythonVersion = SparkConfiguration.SPARK_PYTHON_VERSION.getValue( + EngineConnObject.getEngineCreationContext.getOptions + ) val sb = new StringBuilder sb.append(s"spark.executor.instances=$executorNum\n") sb.append(s"spark.executor.memory=${executorMem}G\n") @@ -188,6 +190,7 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) sb.append(s"spark.driver.cores=$sparkDriverCores\n") sb.append(s"spark.yarn.queue=$queue\n") sb.append(s"spark.executor.memoryOverhead=${memoryOverhead}\n") + sb.append(s"spark.python.version=$pythonVersion\n") sb.append("\n") engineExecutionContext.appendStdout( LogUtils.generateInfo(s" Your spark job exec with configs:\n${sb.toString()}") diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala index f947db9338..9ebda85ef6 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala @@ -33,6 +33,7 @@ import org.apache.linkis.engineplugin.spark.imexport.CsvRelation import org.apache.linkis.engineplugin.spark.utils.EngineUtils import org.apache.linkis.governance.common.paser.PythonCodeParser import org.apache.linkis.governance.common.utils.GovernanceUtils +import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf.SPARK_PYTHON_VERSION_KEY import org.apache.linkis.scheduler.executer.{ExecuteResponse, SuccessExecuteResponse} import org.apache.linkis.storage.resultset.ResultSetWriter @@ -154,10 +155,10 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In private def initGateway = { // If the python version set by the user is obtained from the front end as python3, the environment variable of python3 is taken; otherwise, the default is python2 logger.info( - s"spark.python.version => ${engineCreationContext.getOptions.get("spark.python.version")}" + s"spark.python.version => ${engineCreationContext.getOptions.get(SPARK_PYTHON_VERSION_KEY)}" ) val userDefinePythonVersion = engineCreationContext.getOptions - .getOrDefault("spark.python.version", "python") + .getOrDefault(SPARK_PYTHON_VERSION_KEY, "python") .toLowerCase() val sparkPythonVersion = if ( diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala index cb7998e5c4..a93c25d508 100644 --- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala +++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala @@ -44,6 +44,8 @@ class ComputationEngineConnExecutor(engineNode: EngineNode) extends AbstractEngi override def getServiceInstance: ServiceInstance = engineNode.getServiceInstance + def getEngineNode: EngineNode = engineNode + private def getEngineConnSender: Sender = Sender.getSender(getServiceInstance) override def getTicketId: String = engineNode.getTicketId diff --git a/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/constant/Constants.scala b/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/constant/Constants.scala index 1743edac30..a0b163756e 100644 --- a/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/constant/Constants.scala +++ b/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/constant/Constants.scala @@ -18,6 +18,7 @@ package org.apache.linkis.configuration.constant import org.apache.linkis.common.conf.CommonVars +import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN object Constants { @@ -28,7 +29,7 @@ object Constants { CommonVars[String]("linkis.configuration.linkisclient.auth.token.key", "Validation-Code") val AUTH_TOKEN_VALUE: CommonVars[String] = - CommonVars[String]("linkis.configuration.linkisclient.auth.token.value", "LINKIS-AUTH") + CommonVars[String]("linkis.configuration.linkisclient.auth.token.value", LINKIS_TOKEN.getValue) val CONNECTION_MAX_SIZE: CommonVars[Int] = CommonVars[Int]("linkis.configuration.linkisclient.connection.max.size", 10) diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java index 8a09a0e726..4fa9f49175 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java @@ -18,8 +18,8 @@ package org.apache.linkis.metadata.service.impl; import org.apache.linkis.common.utils.ByteTimeUtils; -import org.apache.linkis.hadoop.common.conf.HadoopConf; import org.apache.linkis.hadoop.common.utils.HDFSUtils; +import org.apache.linkis.hadoop.common.utils.KerberosUtils; import org.apache.linkis.metadata.dao.MdqDao; import org.apache.linkis.metadata.domain.mdq.DomainCoversionUtils; import org.apache.linkis.metadata.domain.mdq.Tunple; @@ -110,6 +110,11 @@ public Long persistTable(MdqTableBO mdqTableBO, String userName) { mdqFieldList.remove(collect.get(1)); } } + if (!table.getPartitionTable()) { + mdqFieldList.stream() + .filter(MdqField::getPartitionField) + .forEach(mdqField -> mdqField.setPartitionField(false)); + } mdqDao.insertFields(mdqFieldList); if (mdqTableBO.getImportInfo() != null) { MdqTableImportInfoBO importInfo = mdqTableBO.getImportInfo(); @@ -151,7 +156,7 @@ public void checkIfNeedDeleteTable(MdqTableBO mdqTableBO) { if (isImport && (importType == MdqImportType.Csv.ordinal() || importType == MdqImportType.Excel.ordinal())) { - String destination = mdqTableBO.getImportInfo().getArgs().get("destination"); + String destination = mdqTableBO.getImportInfo().getDestination(); HashMap hashMap = new Gson().fromJson(destination, HashMap.class); if (Boolean.valueOf(hashMap.get("importData").toString())) { logger.info( @@ -210,10 +215,10 @@ public MdqTableBaseInfoVO getTableBaseInfoFromHive(MetadataQueryParam queryParam .parallelStream() .filter(f -> queryParam.getTableName().equals(f.get("NAME"))) .findFirst(); - Map talbe = + Map table = tableOptional.orElseThrow(() -> new IllegalArgumentException("table不存在")); MdqTableBaseInfoVO mdqTableBaseInfoVO = - DomainCoversionUtils.mapToMdqTableBaseInfoVO(talbe, queryParam.getDbName()); + DomainCoversionUtils.mapToMdqTableBaseInfoVO(table, queryParam.getDbName()); String tableComment = hiveMetaDao.getTableComment(queryParam.getDbName(), queryParam.getTableName()); mdqTableBaseInfoVO.getBase().setComment(tableComment); @@ -379,14 +384,27 @@ private int getTableFileNum(String tableLocation) throws IOException { } private String getTableSize(String tableLocation) throws IOException { - String tableSize = "0B"; - if (StringUtils.isNotBlank(tableLocation)) { - FileStatus tableFile = getFileStatus(tableLocation); - tableSize = - ByteTimeUtils.bytesToString( - getRootHdfs().getContentSummary(tableFile.getPath()).getLength()); + try { + String tableSize = "0B"; + if (StringUtils.isNotBlank(tableLocation) && getRootHdfs().exists(new Path(tableLocation))) { + FileStatus tableFile = getFileStatus(tableLocation); + tableSize = + ByteTimeUtils.bytesToString( + getRootHdfs().getContentSummary(tableFile.getPath()).getLength()); + } + return tableSize; + } catch (IOException e) { + String message = e.getMessage(); + String rootCauseMessage = ExceptionUtils.getRootCauseMessage(e); + if (message != null && message.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS) + || rootCauseMessage.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS)) { + logger.info("Failed to get tableSize, retry", e); + resetRootHdfs(); + return getTableSize(tableLocation); + } else { + throw e; + } } - return tableSize; } private static volatile FileSystem rootHdfs = null; @@ -397,9 +415,8 @@ private FileStatus getFileStatus(String location) throws IOException { } catch (IOException e) { String message = e.getMessage(); String rootCauseMessage = ExceptionUtils.getRootCauseMessage(e); - if ((message != null && message.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS)) - || (rootCauseMessage != null - && rootCauseMessage.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS))) { + if (message != null && message.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS) + || rootCauseMessage.matches(DWSConfig.HDFS_FILE_SYSTEM_REST_ERRS)) { logger.info("Failed to getFileStatus, retry", e); resetRootHdfs(); return getFileStatus(location); @@ -410,11 +427,6 @@ private FileStatus getFileStatus(String location) throws IOException { } private void resetRootHdfs() { - if (HadoopConf.HDFS_ENABLE_CACHE()) { - HDFSUtils.closeHDFSFIleSystem( - HDFSUtils.getHDFSRootUserFileSystem(), HadoopConf.HADOOP_ROOT_USER().getValue(), true); - return; - } if (rootHdfs != null) { synchronized (this) { if (rootHdfs != null) { @@ -427,15 +439,11 @@ private void resetRootHdfs() { } private FileSystem getRootHdfs() { - - if (HadoopConf.HDFS_ENABLE_CACHE()) { - return HDFSUtils.getHDFSRootUserFileSystem(); - } - if (rootHdfs == null) { synchronized (this) { if (rootHdfs == null) { rootHdfs = HDFSUtils.getHDFSRootUserFileSystem(); + KerberosUtils.startKerberosRefreshThread(); } } } diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/DefaultQueryCacheManager.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/DefaultQueryCacheManager.java index 7ff5aeb32d..695c38b703 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/DefaultQueryCacheManager.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/DefaultQueryCacheManager.java @@ -20,7 +20,6 @@ import org.apache.linkis.jobhistory.cache.QueryCacheManager; import org.apache.linkis.jobhistory.conf.JobhistoryConfiguration; import org.apache.linkis.jobhistory.dao.JobHistoryMapper; -import org.apache.linkis.jobhistory.entity.JobHistory; import org.apache.linkis.jobhistory.util.QueryConfig; import org.apache.commons.lang3.time.DateUtils; @@ -32,14 +31,9 @@ import javax.annotation.PostConstruct; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.Consumer; -import com.github.pagehelper.PageHelper; -import com.github.pagehelper.PageInfo; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Maps; @@ -187,24 +181,24 @@ public void refreshAll() { @Override public void refreshUndoneTask() { - PageHelper.startPage(1, 10); - List queryTasks = null; + List queryTasks = null; + Date eDate = new Date(System.currentTimeMillis()); + Date sDate = DateUtils.addDays(eDate, -1); try { - - Date eDate = new Date(System.currentTimeMillis()); - Date sDate = DateUtils.addDays(eDate, -1); queryTasks = jobHistoryMapper.searchWithIdOrderAsc( sDate, eDate, undoneTaskMinId, Arrays.asList("Running", "Inited", "Scheduled")); - } finally { - PageHelper.clearPage(); + } catch (Exception e) { + logger.warn("Failed to refresh undone tasks", e); } - - PageInfo pageInfo = new PageInfo<>(queryTasks); - List list = pageInfo.getList(); - if (!list.isEmpty()) { - undoneTaskMinId = list.get(0).getId(); + if (null != queryTasks && !queryTasks.isEmpty()) { + undoneTaskMinId = queryTasks.get(0).longValue(); logger.info("Refreshing undone tasks, minimum id: {}", undoneTaskMinId); + } else { + Integer maxID = jobHistoryMapper.maxID(sDate, eDate, undoneTaskMinId); + if (null != maxID && maxID > undoneTaskMinId) { + undoneTaskMinId = maxID.longValue(); + } } } diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java index 34f8933176..5783e86a91 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java @@ -45,12 +45,17 @@ public interface JobHistoryMapper { backoff = @Backoff(delay = 10000)) void updateJobHistory(JobHistory jobReq); - List searchWithIdOrderAsc( + List searchWithIdOrderAsc( @Param("startDate") Date startDate, @Param("endDate") Date endDate, @Param("startId") Long startId, @Param("status") List status); + Integer maxID( + @Param("startDate") Date startDate, + @Param("endDate") Date endDate, + @Param("startId") Long startId); + List search( @Param("id") Long id, @Param("umUser") String username, diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml index 74cf9057f5..4122fdcf2f 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml @@ -118,13 +118,22 @@ ORDER BY job.created_time DESC - + /*slave*/ SELECT id FROM linkis_ps_job_history_group_history and created_time >= #{startDate} AND created_time #{endDate} and id >= #{startId} and #{element} + limit 2 + + + diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala index 76fe090ce6..68441b9e85 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala @@ -384,7 +384,13 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging { cacheKey, new Callable[Integer] { override def call(): Integer = { - getCountUndoneTasks(username, creator, sDate, eDate, engineType, startJobId) + try { + getCountUndoneTasks(username, creator, sDate, eDate, engineType, startJobId) + } catch { + case e: Exception => + logger.error("Failed to get count undone tasks", e) + 0 + } } } ) @@ -399,7 +405,7 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging { engineType: String, startJobId: lang.Long ): Integer = { - logger.info("Get count undone Tasks {}, {}, {}", username, creator, engineType) + logger.info("Get count undone Tasks {}, {}, {}, {}", username, creator, engineType, startJobId) val statusList: util.List[String] = new util.ArrayList[String]() statusList.add(TaskStatus.Running.toString) statusList.add(TaskStatus.Inited.toString) diff --git a/linkis-public-enhancements/linkis-jobhistory/src/test/java/org/apache/linkis/jobhistory/dao/JobHistoryMapperTest.java b/linkis-public-enhancements/linkis-jobhistory/src/test/java/org/apache/linkis/jobhistory/dao/JobHistoryMapperTest.java index b27d7e7d15..d987144c6c 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/test/java/org/apache/linkis/jobhistory/dao/JobHistoryMapperTest.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/test/java/org/apache/linkis/jobhistory/dao/JobHistoryMapperTest.java @@ -102,7 +102,7 @@ public void searchWithIdOrderAscTest() { status.add("Succeed"); Date eDate = new Date(System.currentTimeMillis()); Date sDate = DateUtils.addDays(eDate, -1); - List histories = jobHistoryMapper.searchWithIdOrderAsc(sDate, eDate, 1L, status); + List histories = jobHistoryMapper.searchWithIdOrderAsc(sDate, eDate, 1L, status); Assertions.assertTrue(histories.isEmpty()); } diff --git a/linkis-public-enhancements/linkis-pes-client/src/main/java/org/apache/linkis/errorcode/client/ClientConfiguration.java b/linkis-public-enhancements/linkis-pes-client/src/main/java/org/apache/linkis/errorcode/client/ClientConfiguration.java index e3b3cebba7..27062ad597 100644 --- a/linkis-public-enhancements/linkis-pes-client/src/main/java/org/apache/linkis/errorcode/client/ClientConfiguration.java +++ b/linkis-public-enhancements/linkis-pes-client/src/main/java/org/apache/linkis/errorcode/client/ClientConfiguration.java @@ -18,6 +18,7 @@ package org.apache.linkis.errorcode.client; import org.apache.linkis.common.conf.CommonVars; +import org.apache.linkis.common.conf.Configuration; public class ClientConfiguration { @@ -37,7 +38,7 @@ public class ClientConfiguration { CommonVars.apply("wds.linkis.errorcode.read.timeout", 10 * 60 * 1000L); public static final CommonVars AUTH_TOKEN_VALUE = - CommonVars.apply("wds.linkis.errorcode.auth.token", "LINKIS-AUTH"); + CommonVars.apply("wds.linkis.errorcode.auth.token", Configuration.LINKIS_TOKEN().getValue()); public static final CommonVars FUTURE_TIME_OUT = CommonVars.apply("wds.linkis.errorcode.future.timeout", 2000L); diff --git a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/conf/BmlConfiguration.scala b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/conf/BmlConfiguration.scala index 00f1d95559..cff372440a 100644 --- a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/conf/BmlConfiguration.scala +++ b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/conf/BmlConfiguration.scala @@ -18,6 +18,7 @@ package org.apache.linkis.bml.conf import org.apache.linkis.common.conf.CommonVars +import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN object BmlConfiguration { @@ -35,7 +36,7 @@ object BmlConfiguration { CommonVars[String]("wds.linkis.bml.auth.token.key", "Validation-Code") val AUTH_TOKEN_VALUE: CommonVars[String] = - CommonVars[String]("wds.linkis.bml.auth.token.value", "LINKIS-AUTH") + CommonVars[String]("wds.linkis.bml.auth.token.value", LINKIS_TOKEN.getValue) val CONNECTION_MAX_SIZE: CommonVars[Int] = CommonVars[Int]("wds.linkis.bml.connection.max.size", 10) diff --git a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/cs/client/utils/ContextClientConf.scala b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/cs/client/utils/ContextClientConf.scala index ee1b6e02ad..120271d38a 100644 --- a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/cs/client/utils/ContextClientConf.scala +++ b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/cs/client/utils/ContextClientConf.scala @@ -18,6 +18,7 @@ package org.apache.linkis.cs.client.utils import org.apache.linkis.common.conf.CommonVars +import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN object ContextClientConf { @@ -25,7 +26,7 @@ object ContextClientConf { CommonVars[String]("wds.linkis.context.client.auth.key", "Token-Code") val CONTEXT_CLIENT_AUTH_VALUE: CommonVars[String] = - CommonVars[String]("wds.linkis.context.client.auth.value", "LINKIS-AUTH") + CommonVars[String]("wds.linkis.context.client.auth.value", LINKIS_TOKEN.getValue) val URL_PREFIX: CommonVars[String] = CommonVars[String]( diff --git a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/datasource/client/config/DatasourceClientConfig.scala b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/datasource/client/config/DatasourceClientConfig.scala index eff380f60b..426c5aee1d 100644 --- a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/datasource/client/config/DatasourceClientConfig.scala +++ b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/datasource/client/config/DatasourceClientConfig.scala @@ -18,6 +18,7 @@ package org.apache.linkis.datasource.client.config import org.apache.linkis.common.conf.CommonVars +import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN object DatasourceClientConfig { @@ -34,7 +35,7 @@ object DatasourceClientConfig { CommonVars[String]("wds.linkis.server.dsm.auth.token.key", "Token-Code") val AUTH_TOKEN_VALUE: CommonVars[String] = - CommonVars[String]("wds.linkis.server.dsm.auth.token.value", "LINKIS-AUTH") + CommonVars[String]("wds.linkis.server.dsm.auth.token.value", LINKIS_TOKEN.getValue) val DATA_SOURCE_SERVICE_CLIENT_NAME: CommonVars[String] = CommonVars[String]("wds.linkis.server.dsm.client.name", "DataSource-Client") diff --git a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/filesystem/conf/WorkspaceClientConf.scala b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/filesystem/conf/WorkspaceClientConf.scala index e5e1c963e0..b37dd785b3 100644 --- a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/filesystem/conf/WorkspaceClientConf.scala +++ b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/filesystem/conf/WorkspaceClientConf.scala @@ -18,6 +18,7 @@ package org.apache.linkis.filesystem.conf import org.apache.linkis.common.conf.CommonVars +import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN object WorkspaceClientConf { @@ -37,7 +38,7 @@ object WorkspaceClientConf { CommonVars[String]("wds.linkis.filesystem.token.key", "Validation-Code").getValue val tokenValue: String = - CommonVars[String]("wds.linkis.filesystem.token.value", "LINKIS-AUTH").getValue + CommonVars[String]("wds.linkis.filesystem.token.value", LINKIS_TOKEN.getValue).getValue val scriptFromBMLUrl: String = prefix + scriptFromBML } diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala index 61a9750ce5..5fc80d7afc 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewayConfiguration.scala @@ -18,6 +18,7 @@ package org.apache.linkis.gateway.config import org.apache.linkis.common.conf.CommonVars +import org.apache.linkis.common.conf.Configuration.LINKIS_TOKEN object GatewayConfiguration { @@ -69,7 +70,9 @@ object GatewayConfiguration { val ENABLE_GATEWAY_AUTH = CommonVars("wds.linkis.enable.gateway.auth", false) val AUTH_IP_FILE = CommonVars("wds.linkis.gateway.auth.file", "auth.txt") - val DEFAULT_GATEWAY_ACCESS_TOKEN = CommonVars("wds.linkis.gateway.access.token", "LINKIS-AUTH") + + val DEFAULT_GATEWAY_ACCESS_TOKEN = + CommonVars("wds.linkis.gateway.access.token", LINKIS_TOKEN.getValue) val CONTROL_WORKSPACE_ID_LIST = CommonVars("wds.linkis.gateway.control.workspace.ids", "224") diff --git a/pom.xml b/pom.xml index 88cf0be8b3..28aba1ef3c 100644 --- a/pom.xml +++ b/pom.xml @@ -154,7 +154,7 @@ 1.5.4 1.4.21 6.4.0 - 1.33 + 2.0 3.25.5 diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt index 6654575af3..c1f7b7ecfb 100644 --- a/tool/dependencies/known-dependencies.txt +++ b/tool/dependencies/known-dependencies.txt @@ -698,7 +698,7 @@ sketches-core-0.9.0.jar slf4j-api-1.7.30.jar slf4j-reload4j-1.7.36.jar slice-0.38.jar -snakeyaml-1.33.jar +snakeyaml-2.0.jar snappy-java-1.1.4.jar snappy-java-1.1.7.7.jar snappy-java-1.1.8.2.jar