Skip to content

Commit af19ce0

Browse files
committed
fix style
1 parent bf878c9 commit af19ce0

File tree

2 files changed

+27
-25
lines changed

2 files changed

+27
-25
lines changed

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

+7-7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
2525
import scala.concurrent.duration.Duration
2626
import scala.util.control.NonFatal
2727

28+
import com.fasterxml.jackson.databind.ObjectMapper
2829
import com.google.common.annotations.VisibleForTesting
2930
import org.apache.hadoop.fs.Path
3031
import org.apache.spark.{ui, SparkConf}
@@ -56,7 +57,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
5657

5758
private val shutdown = new AtomicBoolean(false)
5859
private val gracefulStopDeregistered = new AtomicBoolean(false)
59-
60+
private val objectMapper = new ObjectMapper
6061
@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
6162
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
6263
private lazy val engineSavePath =
@@ -188,18 +189,17 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
188189
.flatMap { stage =>
189190
statusTracker.getStageInfo(stage).map(_.numActiveTasks)
190191
}.sum
191-
val engineMetrics = Map(
192-
"openSessionCount" -> openSessionCount,
193-
"activeTask" -> activeTask,
194-
"poolId" -> engineSpace.split("-").last)
195-
info(s"Spark engine has $openSessionCount open sessions and $activeTask active tasks.")
192+
val engineMetrics = objectMapper.createObjectNode()
193+
.put("openSessionCount", openSessionCount)
194+
.put("activeTask", activeTask)
195+
.put("poolID", engineSpace.split("-").last.toInt).toString
196196
DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
197197
if (client.pathNonExists(metricsSpace)) {
198198
client.create(metricsSpace, "PERSISTENT")
199199
}
200200
client.setData(
201201
s"/metrics$engineSpace",
202-
engineMetrics.map { case (k, v) => s"$k=$v" }.mkString(";").getBytes)
202+
engineMetrics.getBytes)
203203
}
204204
}
205205
}

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala

+20-18
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import scala.collection.JavaConverters._
2323
import scala.util.Random
2424

2525
import com.codahale.metrics.MetricRegistry
26+
import com.fasterxml.jackson.databind.ObjectMapper
27+
import com.fasterxml.jackson.module.scala.DefaultScalaModule
2628
import com.google.common.annotations.VisibleForTesting
2729

2830
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils}
@@ -66,6 +68,8 @@ private[kyuubi] class EngineRef(
6668

6769
private val timeout: Long = conf.get(ENGINE_INIT_TIMEOUT)
6870

71+
private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
72+
6973
// Share level of the engine
7074
private val shareLevel: ShareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))
7175

@@ -371,10 +375,10 @@ private[kyuubi] class EngineRef(
371375
}
372376
}
373377

374-
def getAdaptivePoolId(poolSize: Int): Int = {
378+
private def getAdaptivePoolId(poolSize: Int): Int = {
375379
val sessionThreshold = conf.get(ENGINE_POOL_ADAPTIVE_SESSION_THRESHOLD)
376380
val metricsSpace =
377-
s"/metrics/${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}/$user"
381+
s"/metrics/${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType/$user"
378382
DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
379383
tryWithLock(client) {
380384
if (client.pathNonExists(metricsSpace)) {
@@ -387,25 +391,23 @@ private[kyuubi] class EngineRef(
387391
} else {
388392
engineType match {
389393
case SPARK_SQL =>
390-
val engineMetricsMap = metrics.map(p =>
391-
new String(client.getData(s"$metricsSpace/$p"))
392-
.split(";")
393-
.map(_.split("=", 2))
394-
.filter(_.length == 2)
395-
.map(kv => (kv.head, kv.last.toInt))
396-
.toMap)
394+
val engineMetricsMap = metrics.map { p =>
395+
objectMapper.readValue(
396+
new String(client.getData(s"$metricsSpace/$p")),
397+
classOf[Map[String, Int]])
398+
}
397399
if (engineMetricsMap.isEmpty) {
398400
return Random.nextInt(poolSize)
399401
} else {
400-
val sortedEngineMetrics = engineMetricsMap.sortBy { map =>
401-
(
402-
map.getOrElse("openSessionCount", sessionThreshold),
403-
map.getOrElse("activeTask", 0))
404-
}
405-
val candidate = sortedEngineMetrics.head
406-
if (candidate.contains("poolId") && (candidate(
407-
"openSessionCount") < sessionThreshold || metrics.size == poolSize)) {
408-
candidate("poolId")
402+
val candidate = engineMetricsMap.filter(_.contains("poolID"))
403+
.minBy { map =>
404+
(
405+
map.getOrElse("openSessionCount", sessionThreshold),
406+
map.getOrElse("activeTask", 0))
407+
}
408+
if ((candidate.nonEmpty && candidate("openSessionCount") < sessionThreshold) ||
409+
metrics.size == poolSize) {
410+
candidate("poolID")
409411
} else {
410412
Random.nextInt(poolSize)
411413
}

0 commit comments

Comments (0)