Skip to content

Commit f01d130

Browse files
committed
fix style
1 parent 1872fc6 commit f01d130

File tree

2 files changed

+27
-25
lines changed

2 files changed

+27
-25
lines changed

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
2525
import scala.concurrent.duration.Duration
2626
import scala.util.control.NonFatal
2727

28+
import com.fasterxml.jackson.databind.ObjectMapper
2829
import com.google.common.annotations.VisibleForTesting
2930
import org.apache.hadoop.fs.Path
3031
import org.apache.spark.{ui, SparkConf}
@@ -57,7 +58,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
5758

5859
private val shutdown = new AtomicBoolean(false)
5960
private val gracefulStopDeregistered = new AtomicBoolean(false)
60-
61+
private val objectMapper = new ObjectMapper
6162
@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
6263
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
6364
@volatile private var engineSavePath: Option[String] = None
@@ -185,18 +186,17 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
185186
.flatMap { stage =>
186187
statusTracker.getStageInfo(stage).map(_.numActiveTasks)
187188
}.sum
188-
val engineMetrics = Map(
189-
"openSessionCount" -> openSessionCount,
190-
"activeTask" -> activeTask,
191-
"poolId" -> engineSpace.split("-").last)
192-
info(s"Spark engine has $openSessionCount open sessions and $activeTask active tasks.")
189+
val engineMetrics = objectMapper.createObjectNode()
190+
.put("openSessionCount", openSessionCount)
191+
.put("activeTask", activeTask)
192+
.put("poolID", engineSpace.split("-").last.toInt).toString
193193
DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
194194
if (client.pathNonExists(metricsSpace)) {
195195
client.create(metricsSpace, "PERSISTENT")
196196
}
197197
client.setData(
198198
s"/metrics$engineSpace",
199-
engineMetrics.map { case (k, v) => s"$k=$v" }.mkString(";").getBytes)
199+
engineMetrics.getBytes)
200200
}
201201
}
202202
}

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import scala.collection.JavaConverters._
2323
import scala.util.Random
2424

2525
import com.codahale.metrics.MetricRegistry
26+
import com.fasterxml.jackson.databind.ObjectMapper
27+
import com.fasterxml.jackson.module.scala.DefaultScalaModule
2628
import com.google.common.annotations.VisibleForTesting
2729

2830
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils}
@@ -65,6 +67,8 @@ private[kyuubi] class EngineRef(
6567

6668
private val timeout: Long = conf.get(ENGINE_INIT_TIMEOUT)
6769

70+
private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
71+
6872
// Share level of the engine
6973
private val shareLevel: ShareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))
7074

@@ -324,10 +328,10 @@ private[kyuubi] class EngineRef(
324328
}
325329
}
326330

327-
def getAdaptivePoolId(poolSize: Int): Int = {
331+
private def getAdaptivePoolId(poolSize: Int): Int = {
328332
val sessionThreshold = conf.get(ENGINE_POOL_ADAPTIVE_SESSION_THRESHOLD)
329333
val metricsSpace =
330-
s"/metrics/${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}/$user"
334+
s"/metrics/${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType/$user"
331335
DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
332336
tryWithLock(client) {
333337
if (client.pathNonExists(metricsSpace)) {
@@ -340,25 +344,23 @@ private[kyuubi] class EngineRef(
340344
} else {
341345
engineType match {
342346
case SPARK_SQL =>
343-
val engineMetricsMap = metrics.map(p =>
344-
new String(client.getData(s"$metricsSpace/$p"))
345-
.split(";")
346-
.map(_.split("=", 2))
347-
.filter(_.length == 2)
348-
.map(kv => (kv.head, kv.last.toInt))
349-
.toMap)
347+
val engineMetricsMap = metrics.map { p =>
348+
objectMapper.readValue(
349+
new String(client.getData(s"$metricsSpace/$p")),
350+
classOf[Map[String, Int]])
351+
}
350352
if (engineMetricsMap.isEmpty) {
351353
return Random.nextInt(poolSize)
352354
} else {
353-
val sortedEngineMetrics = engineMetricsMap.sortBy { map =>
354-
(
355-
map.getOrElse("openSessionCount", sessionThreshold),
356-
map.getOrElse("activeTask", 0))
357-
}
358-
val candidate = sortedEngineMetrics.head
359-
if (candidate.contains("poolId") && (candidate(
360-
"openSessionCount") < sessionThreshold || metrics.size == poolSize)) {
361-
candidate("poolId")
355+
val candidate = engineMetricsMap.filter(_.contains("poolID"))
356+
.minBy { map =>
357+
(
358+
map.getOrElse("openSessionCount", sessionThreshold),
359+
map.getOrElse("activeTask", 0))
360+
}
361+
if ((candidate.nonEmpty && candidate("openSessionCount") < sessionThreshold) ||
362+
metrics.size == poolSize) {
363+
candidate("poolID")
362364
} else {
363365
Random.nextInt(poolSize)
364366
}

0 commit comments

Comments
 (0)