diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf b/benchmarks/src/gatling/resources/benchmark-defaults.conf
index 8c326fb..739d282 100644
--- a/benchmarks/src/gatling/resources/benchmark-defaults.conf
+++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf
@@ -161,4 +161,32 @@ workload {
     # Default: 5
     duration-in-minutes = 5
   }
+
+  # Configuration for the WeightedWorkloadOnTreeDataset simulation
+  weighted-workload-on-tree-dataset {
+    # Seed used for RNG during the test
+    seed = 42
+
+    # Distributions for readers
+    # Each distribution will have `count` threads assigned to it
+    # mean / variance describe the properties of the normal distribution
+    # Readers will read a random table in the table space based on sampling
+    # Default: [{ count = 8, mean = 0.3, variance = 0.0278 }]
+    readers = [
+      { count = 8, mean = 0.3, variance = 0.0278 }
+    ]
+
+    # Distributions for writers
+    # Each distribution will have `count` threads assigned to it
+    # mean / variance describe the properties of the normal distribution
+    # Writers will write to a random table in the table space based on sampling
+    # Default: [{ count = 2, mean = 0.7, variance = 0.0278 }]
+    writers = [
+      { count = 2, mean = 0.7, variance = 0.0278 }
+    ]
+
+    # Duration of the simulation in minutes
+    # Default: 5
+    duration-in-minutes = 5
+  }
 }
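The `readers` and `writers` blocks above each describe a truncated normal distribution over the table space: `count` controls how many Gatling users run with that distribution, while `mean` and `variance` control which fraction of the tables those users concentrate on. As a rough illustration of what these numbers mean, the following standalone sketch (not part of the patch; `TruncatedNormalSketch` and `sampleIndex` are names invented here) draws table indices for the default reader distribution (`mean = 0.3`, `variance = 0.0278`) and prints how the picks spread across deciles of a 100-table space:

import scala.util.Random

object TruncatedNormalSketch {
  // Draw from N(mean, sqrt(variance)), redraw until the value lands in [0, 1],
  // then scale it to an index in [0, items) -- the same scheme the patch uses.
  def sampleIndex(items: Int, mean: Double, variance: Double, rng: Random): Int = {
    val stddev = math.sqrt(variance)
    val value = Iterator
      .continually(rng.nextGaussian() * stddev + mean)
      .find(x => x >= 0.0 && x <= 1.0)
      .get
    (value * items).toInt.min(items - 1)
  }

  def main(args: Array[String]): Unit = {
    val rng = new Random(42)
    // Default reader distribution: mean = 0.3, variance = 0.0278, over 100 tables.
    val picks = Seq.fill(10000)(sampleIndex(100, 0.3, 0.0278, rng))
    val byDecile = picks.groupBy(_ / 10).view.mapValues(_.size).toMap
    (0 until 10).foreach { d =>
      println(f"tables ${d * 10}%2d-${d * 10 + 9}%2d: ${byDecile.getOrElse(d, 0)}%5d picks")
    }
  }
}

With these defaults, reads cluster around the third decile of the table space while the default writers (mean 0.7) cluster near the seventh, so read and write hot spots barely overlap.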
    */
   val restoreAccessTokenInSession: ChainBuilder =
-    exec(session => session.set("accessToken", accessToken.get()))
+    asLongAs(_ => accessToken.get() == null) {
+      pause(1.second)
+    }.exec { session =>
+      session.set("accessToken", accessToken.get())
+    }
 }
diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala
index 1d5b951..c974064 100644
--- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala
+++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala
@@ -184,6 +184,7 @@ case class TableActions(
       http("Fetch Table")
         .get("/api/catalog/v1/#{catalogName}/namespaces/#{multipartNamespace}/tables/#{tableName}")
         .header("Authorization", "Bearer #{accessToken}")
+        .header("If-None-Match", "")
         .check(status.is(200))
         .check(jsonPath("$.metadata.table-uuid").saveAs("tableUuid"))
         .check(jsonPath("$.metadata.location").is("#{location}"))
diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala
index 730321c..c140a5f 100644
--- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala
+++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala
@@ -42,6 +42,7 @@ object BenchmarkConfig {
     val rtdConfig = workload.getConfig("read-tree-dataset")
     val ctdConfig = workload.getConfig("create-tree-dataset")
     val rutdConfig = workload.getConfig("read-update-tree-dataset")
+    val wwotdConfig = workload.getConfig("weighted-workload-on-tree-dataset")
 
     WorkloadParameters(
       ReadTreeDatasetParameters(
@@ -56,6 +57,12 @@ object BenchmarkConfig {
         rutdConfig.getDouble("read-write-ratio"),
         rutdConfig.getInt("throughput"),
         rutdConfig.getInt("duration-in-minutes")
+      ),
+      WeightedWorkloadOnTreeDatasetParameters(
+        wwotdConfig.getInt("seed"),
+        WeightedWorkloadOnTreeDatasetParameters.loadDistributionsList(wwotdConfig, "readers"),
+        WeightedWorkloadOnTreeDatasetParameters.loadDistributionsList(wwotdConfig, "writers"),
+        wwotdConfig.getInt("duration-in-minutes")
       )
     )
   }
diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala
index 894cee2..2898dff 100644
--- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala
+++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala
@@ -54,7 +54,7 @@ case class DatasetParameters(
   numViewProperties: Int
 ) {
   val nAryTree: NAryTreeBuilder = NAryTreeBuilder(nsWidth, nsDepth)
-  private val maxPossibleTables = nAryTree.numberOfLastLevelElements * numTablesPerNs
+  val maxPossibleTables = nAryTree.numberOfLastLevelElements * numTablesPerNs
   private val maxPossibleViews = nAryTree.numberOfLastLevelElements * numViewsPerNs
   val numTables: Int = if (numTablesMax <= 0) maxPossibleTables else numTablesMax
   val numViews: Int = if (numViewsMax <= 0) maxPossibleViews else numViewsMax
diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala
new file mode 100644
index 0000000..9505875
--- /dev/null
+++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.parameters
+
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.collection.immutable.LazyList
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+/**
+ * Case class to hold the parameters for the WeightedWorkloadOnTreeDataset simulation.
+ *
+ * @param seed The RNG seed to use
+ * @param readers A seq of distributions to use for reading tables
+ * @param writers A seq of distributions to use for writing to tables
+ */
+case class WeightedWorkloadOnTreeDatasetParameters(
+  seed: Int,
+  readers: Seq[Distribution],
+  writers: Seq[Distribution],
+  durationInMinutes: Int
+) {
+  require(readers.nonEmpty || writers.nonEmpty, "At least one reader or writer is required")
+  require(durationInMinutes > 0, "Duration in minutes must be positive")
+}
+
+object WeightedWorkloadOnTreeDatasetParameters {
+  def loadDistributionsList(config: Config, key: String): List[Distribution] =
+    config.getConfigList(key).asScala.toList.map { conf =>
+      Distribution(
+        count = conf.getInt("count"),
+        mean = conf.getDouble("mean"),
+        variance = conf.getDouble("variance")
+      )
+    }
+}
+
+case class Distribution(count: Int, mean: Double, variance: Double) {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  def printDescription(dataset: DatasetParameters): Unit = {
+    println(s"Summary for ${this}:")
+
+    // Visualize distributions
+    printVisualization(dataset.maxPossibleTables)
+
+    // Warn if a large amount of resampling will be needed: draw raw Gaussian
+    // values and count how many are rejected before one lands in [0, 1]
+    val debugRandomNumberProvider = RandomNumberProvider("debug".hashCode, -1)
+    val stddev = math.sqrt(variance)
+    def resampleStream: LazyList[Double] =
+      LazyList.continually(debugRandomNumberProvider.next() * stddev + mean)
+
+    val (_, resamples) = resampleStream.zipWithIndex
+      .take(100000)
+      .find { case (value, _) => value >= 0.0 && value <= 1.0 }
+      .getOrElse((-1.0, 100000))
+
+    if (resamples > 100) {
+      logger.warn(
+        s"A distribution appears to require aggressive resampling: ${this} took ${resamples + 1} samples!"
+      )
+    }
+  }
+
+  /**
+   * Return a value in [0, items) based on this distribution using truncated normal resampling.
+   */
+  def sample(items: Int, randomNumberProvider: RandomNumberProvider): Int = {
+    val stddev = math.sqrt(variance)
+    // Resample until the value is in [0, 1]
+    val maxSamples = 100000
+    val value = Iterator
+      .continually(randomNumberProvider.next() * stddev + mean)
+      .take(maxSamples)
+      .find(x => x >= 0.0 && x <= 1.0)
+      .getOrElse(
+        throw new RuntimeException(
+          s"Failed to sample a value in [0, 1] after ${maxSamples} attempts"
+        )
+      )
+
+    (value * items).toInt.min(items - 1)
+  }
+
+  def printVisualization(tables: Int, samples: Int = 100000, bins: Int = 10): Unit = {
+    val binCounts = Array.fill(bins)(0)
+    val hits = new mutable.HashMap[Int, Int]()
+    val rng = RandomNumberProvider("visualization".hashCode, -1)
+
+    (1 to samples).foreach { _ =>
+      val value = sample(tables, rng)
+      val bin = ((value.toDouble / tables) * bins).toInt.min(bins - 1)
+      hits.put(value, hits.getOrElse(value, 0) + 1)
+      binCounts(bin) += 1
+    }
+
+    val maxBarWidth = 50
+    val total = binCounts.sum.toDouble
+    println("  Range         | % of Samples | Visualization")
+    println("  --------------|--------------|------------------")
+
+    (0 until bins).foreach { i =>
+      val low = i.toDouble / bins
+      val high = (i + 1).toDouble / bins
+      val percent = binCounts(i) / total * 100
+      val bar = "█" * ((percent / 100 * maxBarWidth).round.toInt)
+      println(f"  [$low%.1f - $high%.1f) | $percent%6.2f%%      | $bar")
+    }
+    println()
+
+    val mode = hits.maxBy(_._2)
+    val modePercentage: Int = Math.round(mode._2.toFloat / samples * 100)
+    println(s"  The most frequently selected table was chosen in ~${modePercentage}% of samples")
+
+    println()
+  }
+}
+
+object Distribution {
+
+  // Map an index back to a table path
+  def tableIndexToIdentifier(index: Int, dp: DatasetParameters): (String, List[String], String) = {
+    require(
+      dp.numTablesMax == -1,
+      "Sampling is incompatible with numTablesMax settings other than -1"
+    )
+
+    val namespaceIndex = index / dp.numTablesPerNs
+    val namespaceOrdinal = dp.nAryTree.lastLevelOrdinals.toList.apply(namespaceIndex)
+    val namespacePath = dp.nAryTree.pathToRoot(namespaceOrdinal)
+
+    (s"C_0", namespacePath.map(n => s"NS_${n}"), s"T_${index}")
+  }
+}
+
+case class RandomNumberProvider(seed: Int, threadId: Int) {
+  private[this] val random = new Random(seed + threadId)
+  def next(): Double = random.nextGaussian()
+}
diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala
index b6fec3c..6a8c5f4 100644
--- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala
+++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala
@@ -22,5 +22,6 @@ package org.apache.polaris.benchmarks.parameters
 case class WorkloadParameters(
   readTreeDataset: ReadTreeDatasetParameters,
   createTreeDataset: CreateTreeDatasetParameters,
-  readUpdateTreeDataset: ReadUpdateTreeDatasetParameters
+  readUpdateTreeDataset: ReadUpdateTreeDatasetParameters,
+  weightedWorkloadOnTreeDataset: WeightedWorkloadOnTreeDatasetParameters
 ) {}
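`printDescription` above warns when a distribution would require "aggressive resampling": each call to `sample` redraws from the Gaussian until a value lands in [0, 1], so a `mean` far outside that interval makes every sample loop many times. Below is a standalone sketch of that cost, assuming the default `variance = 0.0278`; it is not part of the patch, and `ResamplingCostSketch` and `acceptanceRate` are sketch-local names:

import scala.util.Random

object ResamplingCostSketch extends App {
  val rng = new Random(42)

  // Fraction of raw draws from N(mean, sqrt(variance)) that land in [0, 1].
  def acceptanceRate(mean: Double, variance: Double, draws: Int = 100000): Double = {
    val stddev = math.sqrt(variance)
    val accepted = (1 to draws).count { _ =>
      val x = rng.nextGaussian() * stddev + mean
      x >= 0.0 && x <= 1.0
    }
    accepted.toDouble / draws
  }

  println(f"mean = 0.5: ${acceptanceRate(0.5, 0.0278) * 100}%.2f%% of draws accepted")
  println(f"mean = 1.5: ${acceptanceRate(1.5, 0.0278) * 100}%.2f%% of draws accepted")
}

At `mean = 1.5` only on the order of 0.1% of draws are accepted, so each `sample` call burns hundreds of draws; that is the situation the `resamples > 100` warning is meant to surface.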
diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala
new file mode 100644
index 0000000..a996a34
--- /dev/null
+++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.simulations
+
+import io.gatling.core.Predef._
+import io.gatling.core.structure.ScenarioBuilder
+import io.gatling.http.Predef._
+import org.apache.polaris.benchmarks.actions._
+import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
+import org.apache.polaris.benchmarks.parameters.{
+  ConnectionParameters,
+  DatasetParameters,
+  Distribution,
+  RandomNumberProvider,
+  WorkloadParameters
+}
+import org.slf4j.LoggerFactory
+
+import java.util.concurrent.atomic.AtomicReference
+import scala.concurrent.duration._
+
+/**
+ * This simulation performs reads and writes based on distributions specified in the config. It
+ * allows simulating workloads where, for example, a small fraction of the tables receives most
+ * of the writes. It is intended to be run against a Polaris instance with a pre-existing tree
+ * dataset.
+ */
+class WeightedWorkloadOnTreeDataset extends Simulation {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  // --------------------------------------------------------------------------------
+  // Load parameters
+  // --------------------------------------------------------------------------------
+  val cp: ConnectionParameters = config.connectionParameters
+  val dp: DatasetParameters = config.datasetParameters
+  val wp: WorkloadParameters = config.workloadParameters
+
+  println("### Reader distributions ###")
+  wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription(dp))
+
+  println("### Writer distributions ###")
+  wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp))
+
+  // --------------------------------------------------------------------------------
+  // Helper values
+  // --------------------------------------------------------------------------------
+  private val accessToken: AtomicReference[String] = new AtomicReference()
+
+  private val authActions = AuthenticationActions(cp, accessToken)
+  private val tblActions = TableActions(dp, wp, accessToken)
+
+  // --------------------------------------------------------------------------------
+  // Authentication related workloads
+  // --------------------------------------------------------------------------------
+  val refreshOauthForDuration: ScenarioBuilder =
+    scenario("Authenticate every 30 seconds using the Iceberg REST API")
+      .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+        feed(authActions.feeder())
+          .exec(authActions.authenticateAndSaveAccessToken)
+          .pause(30.seconds)
+      }
+
+  val waitForAuthentication: ScenarioBuilder =
+    scenario("Wait for the authentication token to be available")
+      .asLongAs(_ => accessToken.get() == null) {
+        pause(1.second)
+      }
+
+  // --------------------------------------------------------------------------------
+  // Build up the HTTP protocol configuration and set up the simulation
+  // --------------------------------------------------------------------------------
+  private val httpProtocol = http
+    .baseUrl(cp.baseUrl)
+    .acceptHeader("application/json")
+    .contentTypeHeader("application/json")
+    .disableCaching
+
+  // --------------------------------------------------------------------------------
+  // Create all reader/writer scenarios and prepare them for injection
+  // --------------------------------------------------------------------------------
+  private val readerScenarioBuilders: List[ScenarioBuilder] =
+    wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case (dist, i) =>
+      (0 until dist.count).map { threadId =>
+        val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, i * 1000 + threadId)
+        scenario(s"Reader-$i-$threadId")
+          .exec(authActions.restoreAccessTokenInSession)
+          .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+            exec { session =>
+              val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
+              val (catalog, namespace, table) =
+                Distribution.tableIndexToIdentifier(tableIndex, dp)
+
+              // Checked in `fetchTable`
+              val expectedProperties: Map[String, String] = (0 until dp.numTableProperties)
+                .map(id => s"InitialAttribute_$id" -> s"$id")
+                .toMap
+              val expectedLocation =
+                s"${dp.defaultBaseLocation}/$catalog/${namespace.mkString("/")}/${table}"
+
+              session
+                .set("catalogName", catalog)
+                .set("multipartNamespace", namespace.mkString(0x1f.toChar.toString))
+                .set("tableName", table)
+                .set("initialProperties", expectedProperties)
+                .set("location", expectedLocation)
+            }.exec(tblActions.fetchTable)
+          }
+      }
+    }.toList
+
+  private val writerScenarioBuilders: List[ScenarioBuilder] =
+    wp.weightedWorkloadOnTreeDataset.writers.zipWithIndex.flatMap { case (dist, i) =>
+      (0 until dist.count).map { threadId =>
+        val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, i * 2000 + threadId)
+        scenario(s"Writer-$i-$threadId")
+          .exec(authActions.restoreAccessTokenInSession)
+          .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+            exec { session =>
+              val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
+              val (catalog, namespace, table) =
+                Distribution.tableIndexToIdentifier(tableIndex, dp)
+
+              // Needed for `updateTable`
+              val now = System.currentTimeMillis
+              val newProperty = s"""{"last_updated": "${now}"}"""
+
+              session
+                .set("catalogName", catalog)
+                .set("multipartNamespace", namespace.mkString(0x1f.toChar.toString))
+                .set("tableName", table)
+                .set("newProperty", newProperty)
+            }.exec(tblActions.updateTable)
+          }
+      }
+    }.toList
+
+  // --------------------------------------------------------------------------------
+  // Setup
+  // --------------------------------------------------------------------------------
+  setUp(
+    List(
+      refreshOauthForDuration.inject(atOnceUsers(1)).protocols(httpProtocol),
+      waitForAuthentication.inject(atOnceUsers(1)).protocols(httpProtocol)
+    ) ++
+      readerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) ++
+      writerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol))
+  )
+}
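The moving parts above coordinate through the shared `accessToken` `AtomicReference`: `refreshOauthForDuration` publishes a token, while `waitForAuthentication` and the patched `restoreAccessTokenInSession` spin until it is non-null, and only then do the reader and writer scenarios start issuing requests. A minimal sketch of that hand-off outside Gatling (not part of the patch; `TokenHandoffSketch` is a name invented here):

import java.util.concurrent.atomic.AtomicReference

object TokenHandoffSketch extends App {
  val accessToken = new AtomicReference[String]()

  // Stand-in for refreshOauthForDuration: publishes the token after an "OAuth round trip".
  val authenticator = new Thread(() => {
    Thread.sleep(200)
    accessToken.set("token-abc123")
  })

  // Stand-in for restoreAccessTokenInSession: spins with a short pause until the
  // token is available, then proceeds.
  val worker = new Thread(() => {
    while (accessToken.get() == null) Thread.sleep(10)
    println(s"worker proceeding with ${accessToken.get()}")
  })

  authenticator.start()
  worker.start()
  authenticator.join()
  worker.join()
}

Busy-waiting with a short pause keeps the virtual users simple at the cost of a bounded startup delay; since every scenario is injected with `atOnceUsers(1)`, the spin loop is the only gate between injection and the first authenticated request.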