[KYUUBI #5373] Engine adaptive pool size #5662

Open: wants to merge 4 commits into base `master`.
4 changes: 3 additions & 1 deletion docs/configuration/settings.md
@@ -175,11 +175,13 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.conf`
| kyuubi.engine.kubernetes.submit.timeout | PT30S | The engine submit timeout for Kubernetes application. | duration | 1.7.2 |
| kyuubi.engine.operation.convert.catalog.database.enabled | true | When set to true, the engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines | boolean | 1.6.0 |
| kyuubi.engine.operation.log.dir.root | engine_operation_logs | Root directory for query operation log at engine-side. | string | 1.4.0 |
| kyuubi.engine.pool.adaptive.session.threshold | 10 | The threshold of an engine's open session count for the ADAPTIVE engine pool select policy. | int | 1.10.0 |
| kyuubi.engine.pool.name | engine-pool | The name of the engine pool. | string | 1.5.0 |
| kyuubi.engine.pool.selectPolicy | RANDOM | The select policy of an engine from the corresponding engine pool engine for a session. <ul><li>RANDOM - Randomly use the engine in the pool</li><li>POLLING - Polling use the engine in the pool</li></ul> | string | 1.7.0 |
| kyuubi.engine.pool.selectPolicy | RANDOM | The select policy of an engine from the corresponding engine pool for a session. <ul><li>RANDOM - Randomly use the engine in the pool</li><li>POLLING - Polling use the engine in the pool</li><li>ADAPTIVE - Adaptively select an engine in the pool based on its reported load</li></ul> | string | 1.7.0 |
| kyuubi.engine.pool.size | -1 | The size of the engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold). | int | 1.4.0 |
| kyuubi.engine.pool.size.threshold | 9 | A server-side parameter controlling the upper limit of the engine pool size. | int | 1.4.0 |
| kyuubi.engine.principal | &lt;undefined&gt; | Kerberos principal for the kyuubi engine. | string | 1.10.0 |
| kyuubi.engine.report.interval | PT1M | The interval for the engine to report metrics when using the ADAPTIVE select policy. | duration | 1.10.0 |
| kyuubi.engine.session.initialize.sql || Semicolon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration cannot be used in the JDBC URL due to the limitation of the Beeline/JDBC driver. | seq | 1.3.0 |
| kyuubi.engine.share.level | USER | Engines will be shared in different levels, available configs are: <ul> <li>CONNECTION: the engine will not be shared but only used by the current client connection, and the engine will be launched by session user.</li> <li>USER: the engine will be shared by all sessions created by a unique username, and the engine will be launched by session user.</li> <li>GROUP: the engine will be shared by all sessions created by all users belonging to the same primary group name. The engine will be launched with the primary group name as the effective username, so here the group name acts as a special user who is able to access the computing resources/data of the team. It follows the [Hadoop GroupsMapping](https://reurl.cc/xE61Y5) to map a user to a primary group. If the primary group is not found, it falls back to the USER level.</li> <li>SERVER: the engine will be shared by Kyuubi servers, and the engine will be launched by Server's user.</li> </ul> See also `kyuubi.engine.share.level.subdomain` and `kyuubi.engine.doAs.enabled`. | string | 1.2.0 |
| kyuubi.engine.share.level.sub.domain | &lt;undefined&gt; | (deprecated) - Use kyuubi.engine.share.level.subdomain instead | string | 1.2.0 |
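For reference, a minimal `kyuubi-defaults.conf` snippet enabling the new policy might look like this; the values are illustrative, not recommended defaults:

```properties
# Run an engine pool of three engines per share level
kyuubi.engine.pool.size=3
# Let the server pick engines based on the metrics each engine reports
kyuubi.engine.pool.selectPolicy=ADAPTIVE
# Open-session threshold consulted by the ADAPTIVE policy
kyuubi.engine.pool.adaptive.session.threshold=10
# How often each engine republishes its metrics
kyuubi.engine.report.interval=PT1M
```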
org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.annotations.VisibleForTesting
import org.apache.hadoop.fs.Path
import org.apache.spark.{ui, SparkConf}
@@ -43,7 +44,7 @@ import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, Sp
import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, RetryPolicies}
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
@@ -56,11 +57,12 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine")

private val shutdown = new AtomicBoolean(false)
private val gracefulStopDeregistered = new AtomicBoolean(false)

private val objectMapper = new ObjectMapper
@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
private lazy val engineSavePath =
backendService.sessionManager.asInstanceOf[SparkSQLSessionManager].getEngineResultSavePath()
@volatile private var metricsReporter: Option[ScheduledExecutorService] = None

override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
@@ -97,6 +99,15 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine")
fs.mkdirs(path)
fs.deleteOnExit(path)
}

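// Report engine metrics only when this engine can be a pool member:
// CONNECTION-level engines are per-connection, and engines pinned to an
// explicit subdomain are excluded unless pool selection ignores subdomains.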
if (conf.get(ENGINE_POOL_SELECT_POLICY) == "ADAPTIVE") {
val subdomain = conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN)
val shareLevel = conf.get(ENGINE_SHARE_LEVEL)
val enginePoolIgnoreSubdomain = conf.get(ENGINE_POOL_IGNORE_SUBDOMAIN)
if (!"CONNECTION".equals(shareLevel) && (subdomain.isEmpty || enginePoolIgnoreSubdomain)) {
startMetricsReporter()
}
}
}

override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
@@ -112,6 +123,11 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine")
exec,
Duration(60, TimeUnit.SECONDS))
})
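// Also stop the periodic metrics reporter, giving in-flight reports up to 60s to finish.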
metricsReporter.foreach(reporter => {
ThreadUtils.shutdown(
reporter,
Duration(60, TimeUnit.SECONDS))
})
try {
val path = new Path(engineSavePath)
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
@@ -161,6 +177,41 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine")
}
}

private[kyuubi] def startMetricsReporter(): Unit = {
val interval = conf.get(ENGINE_REPORT_INTERVAL)
val engineSpace = conf.get(HA_NAMESPACE)
val statusTracker = spark.sparkContext.statusTracker
val metricsSpace = Utils.concatEngineMetricsPath(engineSpace)
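// Runnable that snapshots this engine's load (open sessions and active tasks) as JSON.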
val report: Runnable = () => {
if (!shutdown.get) {
val openSessionCount = backendService.sessionManager.getOpenSessionCount
val activeTask = statusTracker.getActiveStageIds()
.flatMap { stage =>
statusTracker.getStageInfo(stage).map(_.numActiveTasks)
}.sum
val engineMetrics = objectMapper.createObjectNode()
.put("openSessionCount", openSessionCount)
.put("activeTask", activeTask)
.put("poolID", engineSpace.split("-").last.toInt).toString
DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
if (client.pathNonExists(metricsSpace)) {
client.create(metricsSpace, "PERSISTENT")
}
client.setData(metricsSpace, engineMetrics.getBytes)
}
}
}
metricsReporter =
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-metrics-reporter"))
metricsReporter.get.scheduleWithFixedDelay(
report,
interval,
interval,
TimeUnit.MILLISECONDS)
}

override protected def stopServer(): Unit = {
countDownLatch.countDown()
}
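
The server-side selection logic that consumes these metrics is not part of this diff. As a rough illustration of how an ADAPTIVE policy might combine the reported payload with `kyuubi.engine.pool.adaptive.session.threshold`, here is a minimal sketch; `EngineMetrics`, `AdaptivePick`, and the fallback behavior are assumptions for illustration, not the PR's actual implementation:

```scala
// Hypothetical sketch: assumes the server reads each pool engine's metrics
// node and parses the JSON payload written by startMetricsReporter above.
import com.fasterxml.jackson.databind.ObjectMapper

final case class EngineMetrics(openSessionCount: Int, activeTask: Int, poolID: Int)

object AdaptivePick {
  private val mapper = new ObjectMapper

  // Parse the payload produced by the engine's metrics reporter.
  def parse(bytes: Array[Byte]): EngineMetrics = {
    val node = mapper.readTree(bytes)
    EngineMetrics(
      node.get("openSessionCount").asInt,
      node.get("activeTask").asInt,
      node.get("poolID").asInt)
  }

  // Prefer the least-loaded engine whose open sessions are still under the
  // adaptive session threshold; if every engine is at or over the threshold,
  // fall back to a random pick so new sessions still spread across the pool.
  def pickAdaptive(metrics: Seq[EngineMetrics], sessionThreshold: Int): Int = {
    val idle = metrics.filter(_.openSessionCount < sessionThreshold)
    if (idle.nonEmpty) idle.minBy(m => (m.openSessionCount, m.activeTask)).poolID
    else metrics(scala.util.Random.nextInt(metrics.size)).poolID
  }
}
```

A real implementation would also need to handle missing or stale metrics nodes, for example by falling back to RANDOM or POLLING when no engine has reported recently.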