diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionState.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionState.scala new file mode 100644 index 00000000000..1857ae962d6 --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionState.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.session + +import scala.language.implicitConversions + +import org.apache.kyuubi.KyuubiSQLException + +object SessionState extends Enumeration { + + type SessionState = Value + + val INITIALIZED, PENDING, RUNNING, TIMEOUT, CLOSED, ERROR = + Value + + val terminalStates: Seq[SessionState] = Seq(TIMEOUT, CLOSED, ERROR) + + def validateTransition(oldState: SessionState, newState: SessionState): Unit = { + oldState match { + case INITIALIZED if Set(PENDING, RUNNING, TIMEOUT, CLOSED).contains(newState) => + case PENDING + if Set(RUNNING, TIMEOUT, CLOSED, ERROR).contains( + newState) => + case RUNNING + if Set(TIMEOUT, CLOSED, ERROR).contains(newState) => + case TIMEOUT | ERROR if CLOSED.equals(newState) => + case _ => throw KyuubiSQLException( + s"Illegal Session state transition from $oldState to $newState") + } + } + + def isTerminal(state: SessionState): Boolean = { + terminalStates.contains(state) + } +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 0aa83662352..d3e38345608 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -70,7 +70,7 @@ private[kyuubi] class EngineRef( // Share level of the engine private val shareLevel: ShareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL)) - private val engineType: EngineType = EngineType.withName(conf.get(ENGINE_TYPE)) + private[kyuubi] val engineType: EngineType = EngineType.withName(conf.get(ENGINE_TYPE)) // Server-side engine pool size threshold private val poolThreshold: Int = diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 0336f678625..5ff3cb1e25f 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -27,13 +27,16 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiConf.EngineOpenOnFailure._ import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_HANDLE_KEY, KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_SIGN} -import org.apache.kyuubi.engine.{EngineRef, KyuubiApplicationManager} +import org.apache.kyuubi.engine.{EngineRef, EngineType, KyuubiApplicationManager} +import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent} import org.apache.kyuubi.ha.client.DiscoveryClientProvider._ import org.apache.kyuubi.ha.client.ServiceNodeInfo import org.apache.kyuubi.operation.{Operation, OperationHandle} import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.server.metadata.api.Metadata import org.apache.kyuubi.service.authentication.InternalSecurityAccessor +import org.apache.kyuubi.session.SessionState.SessionState import org.apache.kyuubi.session.SessionType.SessionType import org.apache.kyuubi.shaded.hive.service.rpc.thrift._ import org.apache.kyuubi.shaded.thrift.transport.TTransportException @@ -54,6 +57,7 @@ class KyuubiSessionImpl( extends KyuubiSession(protocol, user, password, ipAddress, conf, sessionManager) { override val sessionType: SessionType = SessionType.INTERACTIVE + @volatile protected var state: SessionState = SessionState.INITIALIZED private[kyuubi] val optimizedConf: Map[String, String] = { val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay( @@ -73,6 +77,39 @@ class KyuubiSessionImpl( case (key, value) => sessionConf.set(key, value) } + val (clusterManager, kubernetesInfo) = engine.engineType match { + case EngineType.SPARK_SQL => + val builder = + new SparkProcessBuilder(user, doAsEnabled, sessionConf, handle.identifier.toString) + val cm = builder.clusterManager() + val k8sInfo = { + val appMgrInfo = builder.appMgrInfo() + appMgrInfo.kubernetesInfo.context.map { context => + Map(KyuubiConf.KUBERNETES_CONTEXT.key -> context) + }.getOrElse(Map.empty) ++ appMgrInfo.kubernetesInfo.namespace.map { namespace => + Map(KyuubiConf.KUBERNETES_NAMESPACE.key -> namespace) + }.getOrElse(Map.empty) ++ builder.appendPodNameConf(optimizedConf) + } + (cm, k8sInfo) + case _ => (None, Map.empty[String, String]) + } + + private val newMetadata = Metadata( + identifier = handle.identifier.toString, + sessionType = sessionType, + realUser = realUser, + username = user, + ipAddress = ipAddress, + kyuubiInstance = connectionUrl, + state = state.toString, + requestName = name.orNull, + requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info + createTime = createTime, + engineType = engine.engineType.toString, + clusterManager = clusterManager) + + sessionManager.insertMetadata(newMetadata) + private lazy val engineCredentials = renewEngineCredentials() lazy val engine: EngineRef = new EngineRef( @@ -115,6 +152,9 @@ class KyuubiSessionImpl( checkSessionAccessPathURIs() + setState(SessionState.PENDING) + updateMetadata() + // we should call super.open before running launch engine operation super.open() @@ -229,6 +269,9 @@ class KyuubiSessionImpl( _client.engineName.foreach(e => sessionEvent.engineName = e) _client.engineUrl.foreach(e => sessionEvent.engineUrl = e) EventBus.post(sessionEvent) + + setState(SessionState.RUNNING) + updateMetadata() } } @@ -281,6 +324,15 @@ class KyuubiSessionImpl( } override def close(): Unit = { + val terminalState = if (!checkEngineConnectionAlive()) { + SessionState.ERROR + } else if (isTimedOut) { + SessionState.TIMEOUT + } else { + SessionState.CLOSED + } + setState(terminalState) + super.close() sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString) try { @@ -289,6 +341,9 @@ class KyuubiSessionImpl( openSessionError.foreach { _ => if (engine != null) engine.close() } sessionEvent.endTime = System.currentTimeMillis() EventBus.post(sessionEvent) + + updateMetadata() + traceMetricsOnClose() } } @@ -326,4 +381,40 @@ class KyuubiSessionImpl( if (client.engineConnectionClosed) return false !client.remoteEngineBroken } + + private def isTimedOut: Boolean = { + lastAccessTime + sessionIdleTimeoutThreshold <= System.currentTimeMillis && + getNoOperationTime > sessionIdleTimeoutThreshold + } + + private def setState(newState: SessionState): Unit = { + SessionState.validateTransition(state, newState) + newState match { + case SessionState.RUNNING => + info(s"Session is ready.") + case SessionState.ERROR | SessionState.TIMEOUT | SessionState.CLOSED => + info(s"Session is terminated.") + case _ => + } + state = newState + } + + private def updateMetadata(): Unit = { + val endTime = if (SessionState.isTerminal(state)) lastAccessTime else 0L + + val metadataToUpdate = state match { + case SessionState.RUNNING => Metadata( + identifier = handle.identifier.toString, + state = state.toString, + engineOpenTime = sessionEvent.openedTime, + engineId = _client.engineId.toString, + engineName = _client.engineName.toString, + engineUrl = _client.engineUrl.toString, + endTime = endTime) + case _ => Metadata( + identifier = handle.identifier.toString, + state = state.toString) + } + sessionManager.updateMetadata(metadataToUpdate) + } }