diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index c1e418d6249..1dd46a6e736 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -175,11 +175,13 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.engine.kubernetes.submit.timeout | PT30S | The engine submit timeout for Kubernetes application. | duration | 1.7.2 |
 | kyuubi.engine.operation.convert.catalog.database.enabled | true | When set to true, The engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines | boolean | 1.6.0 |
 | kyuubi.engine.operation.log.dir.root | engine_operation_logs | Root directory for query operation log at engine-side. | string | 1.4.0 |
+| kyuubi.engine.pool.adaptive.session.threshold | 10 | The threshold of an engine's open session count for the adaptive engine pool select policy. | int | 1.10.0 |
 | kyuubi.engine.pool.name | engine-pool | The name of the engine pool. | string | 1.5.0 |
-| kyuubi.engine.pool.selectPolicy | RANDOM | The select policy of an engine from the corresponding engine pool engine for a session. | string | 1.7.0 |
+| kyuubi.engine.pool.selectPolicy | RANDOM | The select policy of an engine from the corresponding engine pool for a session. | string | 1.7.0 |
 | kyuubi.engine.pool.size | -1 | The size of the engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold). | int | 1.4.0 |
 | kyuubi.engine.pool.size.threshold | 9 | This parameter is introduced as a server-side parameter controlling the upper limit of the engine pool. | int | 1.4.0 |
 | kyuubi.engine.principal | <undefined> | Kerberos principal for the kyuubi engine. | string | 1.10.0 |
+| kyuubi.engine.report.interval | PT1M | The interval for the engine to report metrics when using the ADAPTIVE select policy. | duration | 1.10.0 |
 | kyuubi.engine.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.3.0 |
 | kyuubi.engine.share.level | USER | Engines will be shared in different levels, available configs are: See also `kyuubi.engine.share.level.subdomain` and `kyuubi.engine.doAs.enabled`. | string | 1.2.0 |
 | kyuubi.engine.share.level.sub.domain | <undefined> | (deprecated) - Using kyuubi.engine.share.level.subdomain instead | string | 1.2.0 |
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index d1331cd0284..40cc2913b0d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
+import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.annotations.VisibleForTesting
 import org.apache.hadoop.fs.Path
 import org.apache.spark.{ui, SparkConf}
@@ -43,7 +44,7 @@ import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, Sp
 import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.RetryPolicies
+import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, RetryPolicies}
 import org.apache.kyuubi.service.Serverable
 import org.apache.kyuubi.session.SessionHandle
 import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
@@ -56,11 +57,12 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
 
   private val shutdown = new AtomicBoolean(false)
   private val gracefulStopDeregistered = new AtomicBoolean(false)
-
+  private val objectMapper = new ObjectMapper
   @volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
   @volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
   private lazy val engineSavePath =
     backendService.sessionManager.asInstanceOf[SparkSQLSessionManager].getEngineResultSavePath()
+  @volatile private var metricsReporter: Option[ScheduledExecutorService] = None
 
   override def initialize(conf: KyuubiConf): Unit = {
     val listener = new SparkSQLEngineListener(this)
@@ -97,6 +99,15 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
       fs.mkdirs(path)
       fs.deleteOnExit(path)
     }
+
+    if (conf.get(ENGINE_POOL_SELECT_POLICY) == "ADAPTIVE") {
+      val subdomain = conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN)
+      val shareLevel = conf.get(ENGINE_SHARE_LEVEL)
+      val enginePoolIgnoreSubdomain = conf.get(ENGINE_POOL_IGNORE_SUBDOMAIN)
+      if (!"CONNECTION".equals(shareLevel) && (subdomain.isEmpty || enginePoolIgnoreSubdomain)) {
+        startMetricsReporter()
+      }
+    }
   }
 
   override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
@@ -112,6 +123,11 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
           exec,
           Duration(60, TimeUnit.SECONDS))
       })
+      metricsReporter.foreach(reporter => {
+        ThreadUtils.shutdown(
+          reporter,
+          Duration(60, TimeUnit.SECONDS))
+      })
       try {
         val path = new Path(engineSavePath)
         val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
@@ -161,6 +177,41 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
     }
   }
 
+  private[kyuubi] def startMetricsReporter(): Unit = {
+    val interval = conf.get(ENGINE_REPORT_INTERVAL)
+    val engineSpace = conf.get(HA_NAMESPACE)
+    val statusTracker = spark.sparkContext.statusTracker
+    val metricsSpace = Utils.concatEngineMetricsPath(engineSpace)
+    val report: Runnable = () => {
+      if (!shutdown.get) {
+        val openSessionCount = backendService.sessionManager.getOpenSessionCount
+        val activeTask = statusTracker.getActiveStageIds()
+          .flatMap { stage =>
+            statusTracker.getStageInfo(stage).map(_.numActiveTasks)
+          }.sum
+        val engineMetrics = objectMapper.createObjectNode()
+          .put("openSessionCount", openSessionCount)
+          .put("activeTask", activeTask)
+          .put("poolID", engineSpace.split("-").last.toInt).toString
+        DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
+          if (client.pathNonExists(metricsSpace)) {
+            client.create(metricsSpace, "PERSISTENT")
+          }
+          client.setData(
+            metricsSpace,
+            engineMetrics.getBytes)
+        }
+      }
+    }
+    metricsReporter =
+      Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-metrics-reporter"))
+    metricsReporter.get.scheduleWithFixedDelay(
+      report,
+      interval,
+      interval,
+      TimeUnit.MILLISECONDS)
+  }
+
   override protected def stopServer(): Unit = {
     countDownLatch.countDown()
   }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index f0b0fea9168..82f7c8b8e4d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -456,4 +456,14 @@ object Utils extends Logging {
     }
   }
 
+  /**
+   * Concatenates the engine-specific path to the metrics endpoint.
+   *
+   * @param path Segment of the path (e.g., engine space) to append to the "/metrics" base path.
+   * @return The combined metrics path, formatted as "/metrics{path}", where {path} is the input.
+   */
+  def concatEngineMetricsPath(path: String): String = {
+    s"/metrics$path"
+  }
+
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index bf547c5ff77..e0deb018369 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2260,13 +2260,30 @@
         "a session. ")
") .version("1.7.0") .stringConf .transformToUpperCase - .checkValues(Set("RANDOM", "POLLING")) + .checkValues(Set("RANDOM", "POLLING", "ADAPTIVE")) .createWithDefault("RANDOM") + val ENGINE_POOL_ADAPTIVE_SESSION_THRESHOLD: ConfigEntry[Int] = + buildConf("kyuubi.engine.pool.adaptive.session.threshold") + .doc("The threshold of a engine open session count for adaptive engine pool select policy.") + .version("1.10.0") + .intConf + .checkValue(_ >= 1, "must be positive number") + .createWithDefault(10) + + val ENGINE_REPORT_INTERVAL: ConfigEntry[Long] = + buildConf("kyuubi.engine.report.interval") + .doc("The interval for the engine to report metrics when using the ADAPTIVE select policy.") + .version("1.10.0") + .timeConf + .checkValue(_ >= Duration.ofSeconds(1).toMillis, "Minimum 1 seconds") + .createWithDefault(Duration.ofMinutes(1).toMillis) + val ENGINE_INITIALIZE_SQL: ConfigEntry[Seq[String]] = buildConf("kyuubi.engine.initialize.sql") .doc("SemiColon-separated list of SQL statements to be initialized in the newly created " + diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala index 88388f3ba8a..43f680a87e6 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala @@ -19,7 +19,9 @@ package org.apache.kyuubi.ha.client import scala.util.control.NonFatal +import org.apache.kyuubi.Utils import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE import org.apache.kyuubi.service.FrontendService /** @@ -33,6 +35,10 @@ class EngineServiceDiscovery( override def stop(): Unit = synchronized { if (!isServerLost.get()) { discoveryClient.deregisterService() + val path = Utils.concatEngineMetricsPath(conf.get(HA_NAMESPACE)) + if (discoveryClient.pathExists(path)) { + discoveryClient.delete(path) + } conf.get(ENGINE_SHARE_LEVEL) match { // For connection level, we should clean up the namespace in zk in case the disk stress. 
case "CONNECTION" => diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala index 34ed0559383..1326adc6e3f 100644 --- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala +++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala @@ -224,7 +224,6 @@ abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests } } finally { service.stop() - discovery.stop() } } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index bb7f7ecbcf4..429e2e2f7e8 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -23,6 +23,8 @@ import scala.collection.JavaConverters._ import scala.util.Random import com.codahale.metrics.MetricRegistry +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.google.common.annotations.VisibleForTesting import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils} @@ -66,6 +68,8 @@ private[kyuubi] class EngineRef( private val timeout: Long = conf.get(ENGINE_INIT_TIMEOUT) + private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule) + // Share level of the engine private val shareLevel: ShareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL)) @@ -119,6 +123,10 @@ private[kyuubi] class EngineRef( DiscoveryClientProvider.withDiscoveryClient(conf) { client => client.getAndIncrement(snPath) } + + case "ADAPTIVE" => + getAdaptivePoolId(clientPoolSize) + case "RANDOM" => Random.nextInt(poolSize) } @@ -373,4 +381,48 @@ private[kyuubi] class EngineRef( } } } + + private def getAdaptivePoolId(poolSize: Int): Int = { + val sessionThreshold = conf.get(ENGINE_POOL_ADAPTIVE_SESSION_THRESHOLD) + val metricsSpace = Utils.concatEngineMetricsPath( + s"/${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType/$sessionUser") + DiscoveryClientProvider.withDiscoveryClient(conf) { client => + tryWithLock(client) { + if (client.pathNonExists(metricsSpace)) { + client.create(metricsSpace, "PERSISTENT") + } + } + val metrics = client.getChildren(metricsSpace) + if (metrics.isEmpty) { + return Random.nextInt(poolSize) + } else { + engineType match { + case SPARK_SQL => + val engineMetricsMap = metrics.map { p => + objectMapper.readValue( + new String(client.getData(s"$metricsSpace/$p")), + classOf[Map[String, Int]]) + } + if (engineMetricsMap.isEmpty) { + return Random.nextInt(poolSize) + } else { + val candidate = engineMetricsMap.filter(_.contains("poolID")) + .minBy { map => + ( + map.getOrElse("openSessionCount", 0), + map.getOrElse("activeTask", 0)) + } + if ((candidate.nonEmpty && candidate("openSessionCount") < sessionThreshold) || + metrics.size == poolSize) { + candidate("poolID") + } else { + Random.nextInt(poolSize) + } + } + // TODO: other engine support adaptive + case _ => Random.nextInt(poolSize) + } + } + } + } }