From d49468057c2005186d6f7343261d9c83c78356cb Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Tue, 29 Apr 2025 17:50:35 -0700 Subject: [PATCH 01/28] initial commit --- .../gatling/resources/benchmark-defaults.conf | 22 +++ .../parameters/BenchmarkConfig.scala | 7 + ...ghtedWorkloadOnTreeDatasetParameters.scala | 54 ++++++ .../parameters/WorkloadParameters.scala | 3 +- .../WeightedWorkloadOnTreeDataset.scala | 169 ++++++++++++++++++ 5 files changed, 254 insertions(+), 1 deletion(-) create mode 100644 benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala create mode 100644 benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf b/benchmarks/src/gatling/resources/benchmark-defaults.conf index 8c326fb..af8624f 100644 --- a/benchmarks/src/gatling/resources/benchmark-defaults.conf +++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf @@ -161,4 +161,26 @@ workload { # Default: 5 duration-in-minutes = 5 } + + # Configuration for the WeightedWorkloadOnTreeDataset simulation + weighted-workload-on-tree-dataset { + # Seed used for RNG during the test + seed = 42 + + # Distributions for readers + # Each distribution will have `count` threads assigned to it + # mean / variance describe the properties of the normal distribution + # Readers will read a random table in the table space based on sampling + readers = [ + { count = 8, mean = 0.3, variance = 0.0278 } + ] + + # Distributions for writers + # Each distribution will have `count` threads assigned to it + # mean / variance describe the properties of the normal distribution + # Writers will write to a random table in the table space based on sampling + writers = [ + { count = 2, mean = 0.7, variance = 0.0278 } + ] + } } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala index 730321c..c140a5f 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala @@ -42,6 +42,7 @@ object BenchmarkConfig { val rtdConfig = workload.getConfig("read-tree-dataset") val ctdConfig = workload.getConfig("create-tree-dataset") val rutdConfig = workload.getConfig("read-update-tree-dataset") + val wwotdConfig = workload.getConfig("weighted-workload-on-tree-dataset") WorkloadParameters( ReadTreeDatasetParameters( @@ -56,6 +57,12 @@ object BenchmarkConfig { rutdConfig.getDouble("read-write-ratio"), rutdConfig.getInt("throughput"), rutdConfig.getInt("duration-in-minutes") + ), + WeightedWorkloadOnTreeDatasetParameters( + wwotdConfig.getInt("seed"), + WeightedWorkloadOnTreeDatasetParameters.loadDistributionsList(wwotdConfig, "readers"), + WeightedWorkloadOnTreeDatasetParameters.loadDistributionsList(wwotdConfig, "writers"), + wwotdConfig.getInt("duration-in-minutes") ) ) } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala new file mode 100644 index 0000000..bd72593 --- /dev/null +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.benchmarks.parameters + +import com.typesafe.config.Config + +import scala.collection.JavaConverters._ + +/** + * Case class to hold the parameters for the WeightedWorkloadOnTreeDataset simulation. + * + * @param seed The RNG seed to use + * @param readers A seq of distributions to use for reading tables + * @param writers A seq of distributions to use for writing to tables + */ +case class WeightedWorkloadOnTreeDatasetParameters( + seed: Int, + readers: Seq[Distribution], + writers: Seq[Distribution], + durationInMinutes: Int) { + require(readers.nonEmpty || writers.nonEmpty, "At least one reader or writer is required") + require(durationInMinutes > 0, "Duration in minutes must be positive") +} + +object WeightedWorkloadOnTreeDatasetParameters { + def loadDistributionsList(config: Config, key: String): List[Distribution] = { + config.getConfigList(key).asScala.toList.map { conf => + Distribution( + count = conf.getInt("count"), + mean = conf.getDouble("mean"), + variance = conf.getDouble("variance") + ) + } + } +} + +case class Distribution(count: Int, mean: Double, variance: Double) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala index b6fec3c..6a8c5f4 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala @@ 
-22,5 +22,6 @@ package org.apache.polaris.benchmarks.parameters case class WorkloadParameters( readTreeDataset: ReadTreeDatasetParameters, createTreeDataset: CreateTreeDatasetParameters, - readUpdateTreeDataset: ReadUpdateTreeDatasetParameters + readUpdateTreeDataset: ReadUpdateTreeDatasetParameters, + weightedWorkloadOnTreeDataset: WeightedWorkloadOnTreeDatasetParameters ) {} diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala new file mode 100644 index 0000000..1ae3f20 --- /dev/null +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.benchmarks.simulations + +import io.gatling.core.Predef._ +import io.gatling.core.structure.ScenarioBuilder +import io.gatling.http.Predef._ +import org.apache.polaris.benchmarks.actions._ +import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config +import org.apache.polaris.benchmarks.parameters.WorkloadParameters +import org.slf4j.LoggerFactory + +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference} +import scala.concurrent.duration.DurationInt + +/** + * This simulation performs reads and writes based on distributions specified in the config. It allows + * the simulation of workloads where e.g. a small fraction of tables get most writes. It is intended to + * be used against a Polaris instance with a pre-existing tree dataset. + */ +class WeightedWorkloadOnTreeDataset extends Simulation { + private val logger = LoggerFactory.getLogger(getClass) + + // -------------------------------------------------------------------------------- + // Load parameters + // -------------------------------------------------------------------------------- + private val cp = config.connectionParameters + private val dp = config.datasetParameters + val wp: WorkloadParameters = config.workloadParameters + + // -------------------------------------------------------------------------------- + // Helper values + // -------------------------------------------------------------------------------- + private val numNamespaces: Int = dp.nAryTree.numberOfNodes + private val accessToken: AtomicReference[String] = new AtomicReference() + private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true) + + private val authenticationActions = AuthenticationActions(cp, accessToken) + private val catalogActions = CatalogActions(dp, accessToken) + private val namespaceActions = NamespaceActions(dp, wp, accessToken) + private val tableActions = TableActions(dp, wp, accessToken) + private val viewActions = 
ViewActions(dp, wp, accessToken) + + private val verifiedCatalogs = new AtomicInteger() + private val verifiedNamespaces = new AtomicInteger() + private val verifiedTables = new AtomicInteger() + private val verifiedViews = new AtomicInteger() + + // -------------------------------------------------------------------------------- + // Authentication related workloads: + // * Authenticate and store the access token for later use every minute + // * Wait for an OAuth token to be available + // * Stop the token refresh loop + // -------------------------------------------------------------------------------- + val continuouslyRefreshOauthToken: ScenarioBuilder = + scenario("Authenticate every minute using the Iceberg REST API") + .asLongAs(_ => shouldRefreshToken.get()) { + feed(authenticationActions.feeder()) + .exec(authenticationActions.authenticateAndSaveAccessToken) + .pause(1.minute) + } + + val waitForAuthentication: ScenarioBuilder = + scenario("Wait for the authentication token to be available") + .asLongAs(_ => accessToken.get() == null) { + pause(1.second) + } + + val stopRefreshingToken: ScenarioBuilder = + scenario("Stop refreshing the authentication token") + .exec { session => + shouldRefreshToken.set(false) + session + } + + // -------------------------------------------------------------------------------- + // Workload: Verify each catalog + // -------------------------------------------------------------------------------- + private val verifyCatalogs = scenario("Verify catalogs using the Polaris Management REST API") + .exec(authenticationActions.restoreAccessTokenInSession) + .asLongAs(session => + verifiedCatalogs.getAndIncrement() < dp.numCatalogs && session.contains("accessToken") + )( + feed(catalogActions.feeder()) + .exec(catalogActions.fetchCatalog) + ) + + // -------------------------------------------------------------------------------- + // Workload: Verify namespaces + // 
-------------------------------------------------------------------------------- + private val verifyNamespaces = scenario("Verify namespaces using the Iceberg REST API") + .exec(authenticationActions.restoreAccessTokenInSession) + .asLongAs(session => + verifiedNamespaces.getAndIncrement() < numNamespaces && session.contains("accessToken") + )( + feed(namespaceActions.namespaceFetchFeeder()) + .exec(namespaceActions.fetchAllChildrenNamespaces) + .exec(namespaceActions.checkNamespaceExists) + .exec(namespaceActions.fetchNamespace) + ) + + // -------------------------------------------------------------------------------- + // Workload: Verify tables + // -------------------------------------------------------------------------------- + private val verifyTables = scenario("Verify tables using the Iceberg REST API") + .exec(authenticationActions.restoreAccessTokenInSession) + .asLongAs(session => + verifiedTables.getAndIncrement() < dp.numTables && session.contains("accessToken") + )( + feed(tableActions.tableFetchFeeder()) + .exec(tableActions.fetchAllTables) + .exec(tableActions.checkTableExists) + .exec(tableActions.fetchTable) + ) + + // -------------------------------------------------------------------------------- + // Workload: Verify views + // -------------------------------------------------------------------------------- + private val verifyViews = scenario("Verify views using the Iceberg REST API") + .exec(authenticationActions.restoreAccessTokenInSession) + .asLongAs(session => + verifiedViews.getAndIncrement() < dp.numViews && session.contains("accessToken") + )( + feed(viewActions.viewFetchFeeder()) + .exec(viewActions.fetchAllViews) + .exec(viewActions.checkViewExists) + .exec(viewActions.fetchView) + ) + + // -------------------------------------------------------------------------------- + // Build up the HTTP protocol configuration and set up the simulation + // -------------------------------------------------------------------------------- + 
private val httpProtocol = http + .baseUrl(cp.baseUrl) + .acceptHeader("application/json") + .contentTypeHeader("application/json") + + // Get the configured throughput for tables and views + private val tableThroughput = wp.readTreeDataset.tableThroughput + private val viewThroughput = wp.readTreeDataset.viewThroughput + + setUp( + continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol), + waitForAuthentication + .inject(atOnceUsers(1)) + .andThen(verifyCatalogs.inject(atOnceUsers(1)).protocols(httpProtocol)) + .andThen(verifyNamespaces.inject(atOnceUsers(dp.nsDepth)).protocols(httpProtocol)) + .andThen(verifyTables.inject(atOnceUsers(tableThroughput)).protocols(httpProtocol)) + .andThen(verifyViews.inject(atOnceUsers(viewThroughput)).protocols(httpProtocol)) + .andThen(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol)) + ) +} From cbdb7e1264b8794f2d75a1da12ae8fdd722ec830 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Tue, 29 Apr 2025 20:34:38 -0700 Subject: [PATCH 02/28] slicing --- .../parameters/DatasetParameters.scala | 4 + ...ghtedWorkloadOnTreeDatasetParameters.scala | 65 ++++++++- .../WeightedWorkloadOnTreeDataset.scala | 124 ++++++------------ 3 files changed, 107 insertions(+), 86 deletions(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala index 894cee2..7e1c933 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala @@ -78,4 +78,8 @@ case class DatasetParameters( numViewsMax == -1 || numViewsMax <= maxPossibleViews, s"Maximum number of views ($numViewsMax) cannot exceed computed total ($maxPossibleViews)" ) + + val totalTables: Int = Math.min( + numCatalogs * Math.max(numTablesMax, Int.MaxValue), + numCatalogs * 
numTablesPerNs * nAryTree.numberOfLastLevelElements) } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index bd72593..68c7341 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -22,6 +22,8 @@ package org.apache.polaris.benchmarks.parameters import com.typesafe.config.Config import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.util.Random /** * Case class to hold the parameters for the WeightedWorkloadOnTreeDataset simulation. @@ -51,4 +53,65 @@ object WeightedWorkloadOnTreeDatasetParameters { } } -case class Distribution(count: Int, mean: Double, variance: Double) +case class Distribution(count: Int, mean: Double, variance: Double) { + /** + * Return a value in [0, items) based on this distribution using truncated normal resampling. 
+ */ + def sample(items: Int, randomNumberProvider: RandomNumberProvider): Int = { + val stddev = math.sqrt(variance) + // Resample until the value is in [0, 1] + val value = Iterator + .continually(randomNumberProvider.next() * stddev + mean) + .dropWhile(x => x < 0.0 || x > 1.0) + .next() + + (value * items).toInt.min(items - 1) + } +} + +object Distribution { + + // Used to represent a range of values where start is inclusive and end is exclusive + private case class Slice(start: Int, end: Int) { + def size: Int = end - start + } + + // Partition a Slice into a sub-slice + private def partitionSlice(slice: Slice, totalPartitions: Int, partition: Int): Slice = { + require(partition >= 0 && partition < totalPartitions, s"Invalid partition index: $partition") + val total = slice.end - slice.start + val baseSize = total / totalPartitions + val remainder = total % totalPartitions + + val extra = if (partition < remainder) 1 else 0 + val offset = partition * baseSize + math.min(partition, remainder) + val start = slice.start + offset + val end = start + baseSize + extra + + Slice(start, end) + } + + // Map an index back to a table path + def tableIndexToIdentifier(index: Int, dp: DatasetParameters): (String, List[String], String) = { + require(dp.numTablesMax == -1, "Sampling is incompatible with numTablesMax settings other than -1") + + val tablesPerCatalog = dp.totalTables / dp.numCatalogs + val catalogIndex = index / tablesPerCatalog + + var currentSlice = partitionSlice(Slice(0, dp.totalTables), dp.numCatalogs, catalogIndex) + val namespacePath = ArrayBuffer[String]() + (0 until dp.nsDepth).foreach { _ => + val indexInSlice = index - currentSlice.start + val namespaceIndex = indexInSlice / dp.nsWidth + currentSlice = partitionSlice(currentSlice, dp.nsWidth, namespaceIndex) + namespacePath += s"NS_${namespaceIndex}" + } + require(currentSlice.size == dp.numTablesPerNs, "The final slice size should match numTablesPerNs") + (s"C_$catalogIndex", namespacePath.toList, 
s"T_${index - currentSlice.start}") + } +} + +case class RandomNumberProvider(seed: Int, threadId: Int) { + private[this] val random = new Random(seed + threadId) + def next(): Double = random.nextGaussian() +} diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index 1ae3f20..f2b382c 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -20,15 +20,16 @@ package org.apache.polaris.benchmarks.simulations import io.gatling.core.Predef._ -import io.gatling.core.structure.ScenarioBuilder +import io.gatling.core.structure.{ChainBuilder, PopulationBuilder, ScenarioBuilder} import io.gatling.http.Predef._ import org.apache.polaris.benchmarks.actions._ import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config -import org.apache.polaris.benchmarks.parameters.WorkloadParameters +import org.apache.polaris.benchmarks.parameters.{ConnectionParameters, DatasetParameters, Distribution, RandomNumberProvider, WorkloadParameters} +import org.apache.polaris.benchmarks.util.CircularIterator import org.slf4j.LoggerFactory -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference} -import scala.concurrent.duration.DurationInt +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} +import scala.concurrent.duration._ /** * This simulation performs reads and writes based on distributions specified in the config. 
It allows @@ -41,27 +42,18 @@ class WeightedWorkloadOnTreeDataset extends Simulation { // -------------------------------------------------------------------------------- // Load parameters // -------------------------------------------------------------------------------- - private val cp = config.connectionParameters - private val dp = config.datasetParameters + val cp: ConnectionParameters = config.connectionParameters + val dp: DatasetParameters = config.datasetParameters val wp: WorkloadParameters = config.workloadParameters // -------------------------------------------------------------------------------- // Helper values // -------------------------------------------------------------------------------- - private val numNamespaces: Int = dp.nAryTree.numberOfNodes private val accessToken: AtomicReference[String] = new AtomicReference() private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true) - private val authenticationActions = AuthenticationActions(cp, accessToken) - private val catalogActions = CatalogActions(dp, accessToken) - private val namespaceActions = NamespaceActions(dp, wp, accessToken) - private val tableActions = TableActions(dp, wp, accessToken) - private val viewActions = ViewActions(dp, wp, accessToken) - - private val verifiedCatalogs = new AtomicInteger() - private val verifiedNamespaces = new AtomicInteger() - private val verifiedTables = new AtomicInteger() - private val verifiedViews = new AtomicInteger() + private val authActions = AuthenticationActions(cp, accessToken) + private val tblActions = TableActions(dp, wp, accessToken) // -------------------------------------------------------------------------------- // Authentication related workloads: @@ -72,8 +64,8 @@ class WeightedWorkloadOnTreeDataset extends Simulation { val continuouslyRefreshOauthToken: ScenarioBuilder = scenario("Authenticate every minute using the Iceberg REST API") .asLongAs(_ => shouldRefreshToken.get()) { - feed(authenticationActions.feeder()) - 
.exec(authenticationActions.authenticateAndSaveAccessToken) + feed(authActions.feeder()) + .exec(authActions.authenticateAndSaveAccessToken) .pause(1.minute) } @@ -90,59 +82,29 @@ class WeightedWorkloadOnTreeDataset extends Simulation { session } - // -------------------------------------------------------------------------------- - // Workload: Verify each catalog - // -------------------------------------------------------------------------------- - private val verifyCatalogs = scenario("Verify catalogs using the Polaris Management REST API") - .exec(authenticationActions.restoreAccessTokenInSession) - .asLongAs(session => - verifiedCatalogs.getAndIncrement() < dp.numCatalogs && session.contains("accessToken") - )( - feed(catalogActions.feeder()) - .exec(catalogActions.fetchCatalog) - ) - - // -------------------------------------------------------------------------------- - // Workload: Verify namespaces - // -------------------------------------------------------------------------------- - private val verifyNamespaces = scenario("Verify namespaces using the Iceberg REST API") - .exec(authenticationActions.restoreAccessTokenInSession) - .asLongAs(session => - verifiedNamespaces.getAndIncrement() < numNamespaces && session.contains("accessToken") - )( - feed(namespaceActions.namespaceFetchFeeder()) - .exec(namespaceActions.fetchAllChildrenNamespaces) - .exec(namespaceActions.checkNamespaceExists) - .exec(namespaceActions.fetchNamespace) - ) - - // -------------------------------------------------------------------------------- - // Workload: Verify tables - // -------------------------------------------------------------------------------- - private val verifyTables = scenario("Verify tables using the Iceberg REST API") - .exec(authenticationActions.restoreAccessTokenInSession) - .asLongAs(session => - verifiedTables.getAndIncrement() < dp.numTables && session.contains("accessToken") - )( - feed(tableActions.tableFetchFeeder()) - 
.exec(tableActions.fetchAllTables) - .exec(tableActions.checkTableExists) - .exec(tableActions.fetchTable) - ) + // Build the population with readers + writers + val weightedWorkloadPopulation: List[PopulationBuilder] = { + val readers = wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case (dist, i) => + (0 until dist.count).map { threadId => + val rnp = RandomNumberProvider( + wp.weightedWorkloadOnTreeDataset.seed, i * 1000 + threadId) + scenario(s"Reader-$i-$threadId") + .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) { + exec { session => + val tableIndex = dist.sample(dp.totalTables, rnp) + val (catalog, namespace, table) = + Distribution.tableIndexToIdentifier(tableIndex, dp) + session.set("catalogName", catalog) + session.set("multipartNamespace", namespace.mkString(0x1F.toChar.toString)) + session.set("tableName", table) + }.exec(tblActions.fetchTable) + }.inject(atOnceUsers(1)) + } + }.toList - // -------------------------------------------------------------------------------- - // Workload: Verify views - // -------------------------------------------------------------------------------- - private val verifyViews = scenario("Verify views using the Iceberg REST API") - .exec(authenticationActions.restoreAccessTokenInSession) - .asLongAs(session => - verifiedViews.getAndIncrement() < dp.numViews && session.contains("accessToken") - )( - feed(viewActions.viewFetchFeeder()) - .exec(viewActions.fetchAllViews) - .exec(viewActions.checkViewExists) - .exec(viewActions.fetchView) - ) + val writers = List.empty + readers ++ writers + } // -------------------------------------------------------------------------------- // Build up the HTTP protocol configuration and set up the simulation @@ -152,18 +114,10 @@ class WeightedWorkloadOnTreeDataset extends Simulation { .acceptHeader("application/json") .contentTypeHeader("application/json") - // Get the configured throughput for tables and views - private val tableThroughput = 
wp.readTreeDataset.tableThroughput - private val viewThroughput = wp.readTreeDataset.viewThroughput - - setUp( - continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol), - waitForAuthentication - .inject(atOnceUsers(1)) - .andThen(verifyCatalogs.inject(atOnceUsers(1)).protocols(httpProtocol)) - .andThen(verifyNamespaces.inject(atOnceUsers(dp.nsDepth)).protocols(httpProtocol)) - .andThen(verifyTables.inject(atOnceUsers(tableThroughput)).protocols(httpProtocol)) - .andThen(verifyViews.inject(atOnceUsers(viewThroughput)).protocols(httpProtocol)) - .andThen(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol)) - ) + + // -------------------------------------------------------------------------------- + // Setup + // -------------------------------------------------------------------------------- + // TODO add the weightedWorkloadPopulation here + } From de53bf66b4010c280a3e3a8a714fbcb7038354ef Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Tue, 29 Apr 2025 20:39:30 -0700 Subject: [PATCH 03/28] compiles --- .../simulations/WeightedWorkloadOnTreeDataset.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index f2b382c..7b96f28 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -114,10 +114,14 @@ class WeightedWorkloadOnTreeDataset extends Simulation { .acceptHeader("application/json") .contentTypeHeader("application/json") - // -------------------------------------------------------------------------------- // Setup // -------------------------------------------------------------------------------- - // TODO add 
the weightedWorkloadPopulation here + val allBuilders: List[PopulationBuilder] = + List( + continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol)) ++ + weightedWorkloadPopulation ++ + List(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol)) + setUp(allBuilders) } From af4a733123633791fa9720aabde03132308983bc Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Tue, 29 Apr 2025 20:40:35 -0700 Subject: [PATCH 04/28] fix file --- benchmarks/src/gatling/resources/benchmark-defaults.conf | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf b/benchmarks/src/gatling/resources/benchmark-defaults.conf index af8624f..444cab1 100644 --- a/benchmarks/src/gatling/resources/benchmark-defaults.conf +++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf @@ -182,5 +182,9 @@ workload { writers = [ { count = 2, mean = 0.7, variance = 0.0278 } ] + + # Duration of the simulation in minutes + # Default: 10 + duration-in-minutes = 10 } } From 75d553b0e974aafe333f38cc54e3254fa8aface3 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Tue, 29 Apr 2025 20:42:10 -0700 Subject: [PATCH 05/28] defaults --- benchmarks/src/gatling/resources/benchmark-defaults.conf | 3 +++ 1 file changed, 3 insertions(+) diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf b/benchmarks/src/gatling/resources/benchmark-defaults.conf index 444cab1..0d5be5a 100644 --- a/benchmarks/src/gatling/resources/benchmark-defaults.conf +++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf @@ -165,12 +165,14 @@ workload { # Configuration for the WeightedWorkloadOnTreeDataset simulation weighted-workload-on-tree-dataset { # Seed used for RNG during the test + # Default: 42 seed = 42 # Distributions for readers # Each distribution will have `count` threads assigned to it # mean / variance describe the properties of the normal distribution # Readers will read a random table in the table space based on 
sampling + # Default: [{ count = 8, mean = 0.3, variance = 0.0278 }] readers = [ { count = 8, mean = 0.3, variance = 0.0278 } ] @@ -179,6 +181,7 @@ workload { # Each distribution will have `count` threads assigned to it # mean / variance describe the properties of the normal distribution # Writers will write to a random table in the table space based on sampling + # Default: [{ count = 2, mean = 0.7, variance = 0.0278 }] writers = [ { count = 2, mean = 0.7, variance = 0.0278 } ] From c0afce57e1a1d7e719b5a3acf39c0bffd5e09b7e Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 00:05:49 -0700 Subject: [PATCH 06/28] messing around with gradle --- .../parameters/DatasetParameters.scala | 8 +- ...ghtedWorkloadOnTreeDatasetParameters.scala | 30 +++--- .../WeightedWorkloadOnTreeDataset.scala | 93 ++++++++++++++----- 3 files changed, 91 insertions(+), 40 deletions(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala index 7e1c933..530e5cb 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala @@ -79,7 +79,9 @@ case class DatasetParameters( s"Maximum number of views ($numViewsMax) cannot exceed computed total ($maxPossibleViews)" ) - val totalTables: Int = Math.min( - numCatalogs * Math.max(numTablesMax, Int.MaxValue), - numCatalogs * numTablesPerNs * nAryTree.numberOfLastLevelElements) + val totalTables: Int = if (numTablesMax != -1) { + numCatalogs * numTablesMax + } else { + numCatalogs * numTablesPerNs * nAryTree.numberOfLastLevelElements + } } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index 68c7341..ea550a8 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -20,6 +20,7 @@ package org.apache.polaris.benchmarks.parameters import com.typesafe.config.Config +import com.typesafe.scalalogging.Logger import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -78,35 +79,36 @@ object Distribution { // Partition a Slice into a sub-slice private def partitionSlice(slice: Slice, totalPartitions: Int, partition: Int): Slice = { - require(partition >= 0 && partition < totalPartitions, s"Invalid partition index: $partition") - val total = slice.end - slice.start - val baseSize = total / totalPartitions - val remainder = total % totalPartitions + require(partition >= 0 && partition < totalPartitions, + s"Invalid partition index: ${partition} / ${totalPartitions}") - val extra = if (partition < remainder) 1 else 0 - val offset = partition * baseSize + math.min(partition, remainder) - val start = slice.start + offset - val end = start + baseSize + extra + val newPartitionSize = slice.size / totalPartitions + val newPartitionStart = slice.start + (partition * newPartitionSize) + val newPartitionEnd = newPartitionStart + newPartitionSize - Slice(start, end) + Slice(newPartitionStart, newPartitionEnd) } // Map an index back to a table path def tableIndexToIdentifier(index: Int, dp: DatasetParameters): (String, List[String], String) = { require(dp.numTablesMax == -1, "Sampling is incompatible with numTablesMax settings other than -1") - val tablesPerCatalog = dp.totalTables / dp.numCatalogs + val totalSlice = Slice(0, dp.totalTables) + val tablesPerCatalog = totalSlice.size / dp.numCatalogs val catalogIndex = index / 
tablesPerCatalog + var currentSlice = partitionSlice(totalSlice, dp.numCatalogs, catalogIndex) - var currentSlice = partitionSlice(Slice(0, dp.totalTables), dp.numCatalogs, catalogIndex) val namespacePath = ArrayBuffer[String]() - (0 until dp.nsDepth).foreach { _ => + (1 until dp.nsDepth).foreach { _ => val indexInSlice = index - currentSlice.start - val namespaceIndex = indexInSlice / dp.nsWidth + val tablesPerNamespace = currentSlice.size / dp.nsWidth + val namespaceIndex = indexInSlice / tablesPerNamespace + currentSlice = partitionSlice(currentSlice, dp.nsWidth, namespaceIndex) namespacePath += s"NS_${namespaceIndex}" } - require(currentSlice.size == dp.numTablesPerNs, "The final slice size should match numTablesPerNs") + require(currentSlice.size == dp.numTablesPerNs, + s"The final slice size should be size ${dp.numTablesPerNs} (slice = ${currentSlice})") (s"C_$catalogIndex", namespacePath.toList, s"T_${index - currentSlice.start}") } } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index 7b96f28..5f3b237 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -51,6 +51,7 @@ class WeightedWorkloadOnTreeDataset extends Simulation { // -------------------------------------------------------------------------------- private val accessToken: AtomicReference[String] = new AtomicReference() private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true) + private val readersLaunched = new AtomicBoolean(false) private val authActions = AuthenticationActions(cp, accessToken) private val tblActions = TableActions(dp, wp, accessToken) @@ -82,46 +83,92 @@ class WeightedWorkloadOnTreeDataset extends 
Simulation { session } - // Build the population with readers + writers - val weightedWorkloadPopulation: List[PopulationBuilder] = { - val readers = wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case (dist, i) => + // -------------------------------------------------------------------------------- + // Build up the HTTP protocol configuration and set up the simulation + // -------------------------------------------------------------------------------- + private val httpProtocol = http + .baseUrl(cp.baseUrl) + .acceptHeader("application/json") + .contentTypeHeader("application/json") + + // -------------------------------------------------------------------------------- + // Helper method to restore the token to the session + // -------------------------------------------------------------------------------- + private def restoreAccessTokenInSession = exec { session => + if (accessToken.get() == null) { + logger.warn("Access token is null when trying to restore to session!") + } + session.set("accessToken", accessToken.get()) + } + + // -------------------------------------------------------------------------------- + // Create all reader scenarios and prepare them for injection + // -------------------------------------------------------------------------------- + private val readerScenarioBuilders: List[ScenarioBuilder] = { + wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case (dist, i) => (0 until dist.count).map { threadId => - val rnp = RandomNumberProvider( - wp.weightedWorkloadOnTreeDataset.seed, i * 1000 + threadId) + val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, i * 1000 + threadId) + scenario(s"Reader-$i-$threadId") + .exec(restoreAccessTokenInSession) .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) { exec { session => val tableIndex = dist.sample(dp.totalTables, rnp) val (catalog, namespace, table) = Distribution.tableIndexToIdentifier(tableIndex, dp) - 
session.set("catalogName", catalog) - session.set("multipartNamespace", namespace.mkString(0x1F.toChar.toString)) - session.set("tableName", table) + session + .set("catalogName", catalog) + .set("multipartNamespace", namespace.mkString(0x1F.toChar.toString)) + .set("tableName", table) }.exec(tblActions.fetchTable) - }.inject(atOnceUsers(1)) + } } }.toList - - val writers = List.empty - readers ++ writers } + // Convert the ScenarioBuilders to PopulationBuilders ready for injection + private val readerPopulations: List[PopulationBuilder] = + readerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) + // -------------------------------------------------------------------------------- - // Build up the HTTP protocol configuration and set up the simulation + // Create a launcher scenario that will start all readers after authentication + // This is the key part that prevents multiple setUp calls // -------------------------------------------------------------------------------- - private val httpProtocol = http - .baseUrl(cp.baseUrl) - .acceptHeader("application/json") - .contentTypeHeader("application/json") + // Define a scenario that triggers all readers to start + val launchReadersScenario: ScenarioBuilder = scenario("Launch readers") + .exec { session => + // Access token is available here + logger.info(s"Authentication complete with token: ${accessToken.get() != null}") + + // Ensure we only launch the readers once + if (!readersLaunched.getAndSet(true)) { + logger.info(s"Starting ${readerScenarioBuilders.size} reader threads") + + // Each reader will get the access token when it runs + // We don't create any more setUp calls here + } + session + } // -------------------------------------------------------------------------------- // Setup // -------------------------------------------------------------------------------- - val allBuilders: List[PopulationBuilder] = + setUp( List( - 
continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol)) ++ - weightedWorkloadPopulation ++ - List(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol)) - - setUp(allBuilders) + continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol), + waitForAuthentication + .inject(atOnceUsers(1)) + .protocols(httpProtocol) + .andThen( + launchReadersScenario + .inject(atOnceUsers(1)) + .protocols(httpProtocol) + ) + .andThen( + stopRefreshingToken + .inject(atOnceUsers(1)) + .protocols(httpProtocol) + ) + ) ++ readerPopulations + ) } From 599eb04ee0eba600c66d263e3130584ab50f6ceb Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 00:14:21 -0700 Subject: [PATCH 07/28] mess with gradle more --- .../gatling/resources/benchmark-defaults.conf | 3 +- .../actions/AuthenticationActions.scala | 3 +- .../WeightedWorkloadOnTreeDataset.scala | 54 ++++--------------- 3 files changed, 12 insertions(+), 48 deletions(-) diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf b/benchmarks/src/gatling/resources/benchmark-defaults.conf index 0d5be5a..8333ede 100644 --- a/benchmarks/src/gatling/resources/benchmark-defaults.conf +++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf @@ -165,7 +165,6 @@ workload { # Configuration for the WeightedWorkloadOnTreeDataset simulation weighted-workload-on-tree-dataset { # Seed used for RNG during the test - # Default: 42 seed = 42 # Distributions for readers @@ -174,7 +173,7 @@ workload { # Readers will read a random table in the table space based on sampling # Default: [{ count = 8, mean = 0.3, variance = 0.0278 }] readers = [ - { count = 8, mean = 0.3, variance = 0.0278 } + { count = 1, mean = 0.3, variance = 0.0278 } ] # Distributions for writers diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala index 
a65d2b8..83143cd 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala @@ -85,8 +85,9 @@ case class AuthenticationActions( .check(jsonPath("$.access_token").saveAs("accessToken")) ) .exec { session => - if (session.contains("accessToken")) + if (session.contains("accessToken")) { accessToken.set(session("accessToken").as[String]) + } session } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index 5f3b237..e2383ad 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -91,16 +91,6 @@ class WeightedWorkloadOnTreeDataset extends Simulation { .acceptHeader("application/json") .contentTypeHeader("application/json") - // -------------------------------------------------------------------------------- - // Helper method to restore the token to the session - // -------------------------------------------------------------------------------- - private def restoreAccessTokenInSession = exec { session => - if (accessToken.get() == null) { - logger.warn("Access token is null when trying to restore to session!") - } - session.set("accessToken", accessToken.get()) - } - // -------------------------------------------------------------------------------- // Create all reader scenarios and prepare them for injection // -------------------------------------------------------------------------------- @@ -108,11 +98,14 @@ class WeightedWorkloadOnTreeDataset extends Simulation { wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case (dist, i) => (0 until 
dist.count).map { threadId => val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, i * 1000 + threadId) - scenario(s"Reader-$i-$threadId") - .exec(restoreAccessTokenInSession) + .exec(authActions.restoreAccessTokenInSession) .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) { exec { session => + while (session.contains("accessToken")) { + Thread.sleep(100) + } + val tableIndex = dist.sample(dp.totalTables, rnp) val (catalog, namespace, table) = Distribution.tableIndexToIdentifier(tableIndex, dp) @@ -120,35 +113,11 @@ class WeightedWorkloadOnTreeDataset extends Simulation { .set("catalogName", catalog) .set("multipartNamespace", namespace.mkString(0x1F.toChar.toString)) .set("tableName", table) - }.exec(tblActions.fetchTable) - } + } + }.exec(tblActions.fetchTable) } - }.toList - } - - // Convert the ScenarioBuilders to PopulationBuilders ready for injection - private val readerPopulations: List[PopulationBuilder] = - readerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) - - // -------------------------------------------------------------------------------- - // Create a launcher scenario that will start all readers after authentication - // This is the key part that prevents multiple setUp calls - // -------------------------------------------------------------------------------- - // Define a scenario that triggers all readers to start - val launchReadersScenario: ScenarioBuilder = scenario("Launch readers") - .exec { session => - // Access token is available here - logger.info(s"Authentication complete with token: ${accessToken.get() != null}") - - // Ensure we only launch the readers once - if (!readersLaunched.getAndSet(true)) { - logger.info(s"Starting ${readerScenarioBuilders.size} reader threads") - - // Each reader will get the access token when it runs - // We don't create any more setUp calls here - } - session } + }.toList // 
-------------------------------------------------------------------------------- // Setup @@ -159,16 +128,11 @@ class WeightedWorkloadOnTreeDataset extends Simulation { waitForAuthentication .inject(atOnceUsers(1)) .protocols(httpProtocol) - .andThen( - launchReadersScenario - .inject(atOnceUsers(1)) - .protocols(httpProtocol) - ) .andThen( stopRefreshingToken .inject(atOnceUsers(1)) .protocols(httpProtocol) ) - ) ++ readerPopulations + ) ++ readerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) ) } From d1f06ccaf18299b4be1fa1976be427f4f94cc45f Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 00:17:03 -0700 Subject: [PATCH 08/28] maybe? --- benchmarks/src/gatling/resources/benchmark-defaults.conf | 4 ++-- .../simulations/WeightedWorkloadOnTreeDataset.scala | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf b/benchmarks/src/gatling/resources/benchmark-defaults.conf index 8333ede..93da763 100644 --- a/benchmarks/src/gatling/resources/benchmark-defaults.conf +++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf @@ -171,9 +171,9 @@ workload { # Each distribution will have `count` threads assigned to it # mean / variance describe the properties of the normal distribution # Readers will read a random table in the table space based on sampling - # Default: [{ count = 8, mean = 0.3, variance = 0.0278 }] + # Default: [{ count = 2, mean = 0.3, variance = 0.0278 }] readers = [ - { count = 1, mean = 0.3, variance = 0.0278 } + { count = 2, mean = 0.3, variance = 0.0278 } ] # Distributions for writers diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index e2383ad..acee989 100644 --- 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -102,10 +102,6 @@ class WeightedWorkloadOnTreeDataset extends Simulation { .exec(authActions.restoreAccessTokenInSession) .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) { exec { session => - while (session.contains("accessToken")) { - Thread.sleep(100) - } - val tableIndex = dist.sample(dp.totalTables, rnp) val (catalog, namespace, table) = Distribution.tableIndexToIdentifier(tableIndex, dp) From a6dc1ae9d94922c843cb9bb9e920e1246a4afb0c Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 00:21:13 -0700 Subject: [PATCH 09/28] auth changes --- .../WeightedWorkloadOnTreeDataset.scala | 29 ++++--------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index acee989..4e4a631 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -57,17 +57,14 @@ class WeightedWorkloadOnTreeDataset extends Simulation { private val tblActions = TableActions(dp, wp, accessToken) // -------------------------------------------------------------------------------- - // Authentication related workloads: - // * Authenticate and store the access token for later use every minute - // * Wait for an OAuth token to be available - // * Stop the token refresh loop + // Authentication related workloads // -------------------------------------------------------------------------------- - val continuouslyRefreshOauthToken: ScenarioBuilder = 
+ val refreshOauthForDuration: ScenarioBuilder = scenario("Authenticate every minute using the Iceberg REST API") - .asLongAs(_ => shouldRefreshToken.get()) { + .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) { feed(authActions.feeder()) .exec(authActions.authenticateAndSaveAccessToken) - .pause(1.minute) + .pause(30.seconds) } val waitForAuthentication: ScenarioBuilder = @@ -76,13 +73,6 @@ class WeightedWorkloadOnTreeDataset extends Simulation { pause(1.second) } - val stopRefreshingToken: ScenarioBuilder = - scenario("Stop refreshing the authentication token") - .exec { session => - shouldRefreshToken.set(false) - session - } - // -------------------------------------------------------------------------------- // Build up the HTTP protocol configuration and set up the simulation // -------------------------------------------------------------------------------- @@ -120,15 +110,8 @@ class WeightedWorkloadOnTreeDataset extends Simulation { // -------------------------------------------------------------------------------- setUp( List( - continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol), - waitForAuthentication - .inject(atOnceUsers(1)) - .protocols(httpProtocol) - .andThen( - stopRefreshingToken - .inject(atOnceUsers(1)) - .protocols(httpProtocol) - ) + refreshOauthForDuration.inject(atOnceUsers(1)).protocols(httpProtocol), + waitForAuthentication.inject(atOnceUsers(1)).protocols(httpProtocol) ) ++ readerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) ) } From 828767b7739e351f5424aeea61f73483d540d848 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 00:23:07 -0700 Subject: [PATCH 10/28] Fix --- .../simulations/WeightedWorkloadOnTreeDataset.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index 4e4a631..b0d77a3 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -99,8 +99,8 @@ class WeightedWorkloadOnTreeDataset extends Simulation { .set("catalogName", catalog) .set("multipartNamespace", namespace.mkString(0x1F.toChar.toString)) .set("tableName", table) - } - }.exec(tblActions.fetchTable) + }.exec(tblActions.fetchTable) + } } } }.toList From 84434a83149d1a3c36803cc2bd3a18bacf0e0c48 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 00:28:11 -0700 Subject: [PATCH 11/28] kinda works --- .../benchmarks/actions/AuthenticationActions.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala index 83143cd..0c82f06 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala @@ -23,14 +23,12 @@ import io.gatling.core.Predef._ import io.gatling.core.feeder.Feeder import io.gatling.core.structure.ChainBuilder import io.gatling.http.Predef._ -import org.apache.polaris.benchmarks.RetryOnHttpCodes.{ - retryOnHttpStatus, - HttpRequestBuilderWithStatusSave -} +import org.apache.polaris.benchmarks.RetryOnHttpCodes.{HttpRequestBuilderWithStatusSave, retryOnHttpStatus} import org.apache.polaris.benchmarks.parameters.ConnectionParameters import org.slf4j.LoggerFactory import java.util.concurrent.atomic.AtomicReference +import scala.concurrent.duration.DurationInt /** * Actions for performance testing 
authentication operations. This class provides methods to @@ -85,7 +83,7 @@ case class AuthenticationActions( .check(jsonPath("$.access_token").saveAs("accessToken")) ) .exec { session => - if (session.contains("accessToken")) { + if (session.contains("accessToken") && session("accessToken") != null) { accessToken.set(session("accessToken").as[String]) } session @@ -97,5 +95,9 @@ case class AuthenticationActions( * scenario. */ val restoreAccessTokenInSession: ChainBuilder = - exec(session => session.set("accessToken", accessToken.get())) + asLongAs(_ => accessToken.get() == null) { + pause(1.second) + }.exec { session => + session.set("accessToken", accessToken.get()) + } } From 87c06000fafe4890da0b4c71507cb82ee3e87ceb Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 01:01:34 -0700 Subject: [PATCH 12/28] simplify code --- ...ghtedWorkloadOnTreeDatasetParameters.scala | 35 ++----------------- 1 file changed, 3 insertions(+), 32 deletions(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index ea550a8..b028a9b 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -72,44 +72,15 @@ case class Distribution(count: Int, mean: Double, variance: Double) { object Distribution { - // Used to represent a range of values where start is inclusive and end is exclusive - private case class Slice(start: Int, end: Int) { - def size: Int = end - start - } - - // Partition a Slice into a sub-slice - private def partitionSlice(slice: Slice, totalPartitions: Int, partition: Int): Slice = { - require(partition >= 0 && partition < totalPartitions, - s"Invalid partition index: 
${partition} / ${totalPartitions}") - - val newPartitionSize = slice.size / totalPartitions - val newPartitionStart = slice.start + (partition * newPartitionSize) - val newPartitionEnd = newPartitionStart + newPartitionSize - - Slice(newPartitionStart, newPartitionEnd) - } - // Map an index back to a table path def tableIndexToIdentifier(index: Int, dp: DatasetParameters): (String, List[String], String) = { require(dp.numTablesMax == -1, "Sampling is incompatible with numTablesMax settings other than -1") - val totalSlice = Slice(0, dp.totalTables) - val tablesPerCatalog = totalSlice.size / dp.numCatalogs + val tablesPerCatalog = dp.totalTables / dp.numCatalogs val catalogIndex = index / tablesPerCatalog - var currentSlice = partitionSlice(totalSlice, dp.numCatalogs, catalogIndex) - val namespacePath = ArrayBuffer[String]() - (1 until dp.nsDepth).foreach { _ => - val indexInSlice = index - currentSlice.start - val tablesPerNamespace = currentSlice.size / dp.nsWidth - val namespaceIndex = indexInSlice / tablesPerNamespace - - currentSlice = partitionSlice(currentSlice, dp.nsWidth, namespaceIndex) - namespacePath += s"NS_${namespaceIndex}" - } - require(currentSlice.size == dp.numTablesPerNs, - s"The final slice size should be size ${dp.numTablesPerNs} (slice = ${currentSlice})") - (s"C_$catalogIndex", namespacePath.toList, s"T_${index - currentSlice.start}") + val namespaceOrdinal = dp.nAryTree.pathToRoot(index / dp.numTablesPerNs) + (s"C_$catalogIndex", namespaceOrdinal.map(n => s"NS_${n}"), s"T_${index}") } } From 4deb6e404490bcb2ffe28d41b15680849c7dbcb4 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 01:14:47 -0700 Subject: [PATCH 13/28] working --- .../benchmarks/parameters/DatasetParameters.scala | 8 +------- .../WeightedWorkloadOnTreeDatasetParameters.scala | 8 ++++---- .../simulations/WeightedWorkloadOnTreeDataset.scala | 12 +++++++++++- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala index 530e5cb..2898dff 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala @@ -54,7 +54,7 @@ case class DatasetParameters( numViewProperties: Int ) { val nAryTree: NAryTreeBuilder = NAryTreeBuilder(nsWidth, nsDepth) - private val maxPossibleTables = nAryTree.numberOfLastLevelElements * numTablesPerNs + val maxPossibleTables = nAryTree.numberOfLastLevelElements * numTablesPerNs private val maxPossibleViews = nAryTree.numberOfLastLevelElements * numViewsPerNs val numTables: Int = if (numTablesMax <= 0) maxPossibleTables else numTablesMax val numViews: Int = if (numViewsMax <= 0) maxPossibleViews else numViewsMax @@ -78,10 +78,4 @@ case class DatasetParameters( numViewsMax == -1 || numViewsMax <= maxPossibleViews, s"Maximum number of views ($numViewsMax) cannot exceed computed total ($maxPossibleViews)" ) - - val totalTables: Int = if (numTablesMax != -1) { - numCatalogs * numTablesMax - } else { - numCatalogs * numTablesPerNs * nAryTree.numberOfLastLevelElements - } } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index b028a9b..3a7f613 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -76,11 +76,11 @@ object Distribution { def tableIndexToIdentifier(index: Int, dp: DatasetParameters): (String, List[String], String) = { 
require(dp.numTablesMax == -1, "Sampling is incompatible with numTablesMax settings other than -1") - val tablesPerCatalog = dp.totalTables / dp.numCatalogs - val catalogIndex = index / tablesPerCatalog + val namespaceIndex = index / dp.numTablesPerNs + val namespaceOrdinal = dp.nAryTree.lastLevelOrdinals.toList.apply(namespaceIndex) + val namespacePath = dp.nAryTree.pathToRoot(namespaceOrdinal) - val namespaceOrdinal = dp.nAryTree.pathToRoot(index / dp.numTablesPerNs) - (s"C_$catalogIndex", namespaceOrdinal.map(n => s"NS_${n}"), s"T_${index}") + (s"C_0", namespacePath.map(n => s"NS_${n}"), s"T_${index}") } } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index b0d77a3..5383f47 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -92,13 +92,23 @@ class WeightedWorkloadOnTreeDataset extends Simulation { .exec(authActions.restoreAccessTokenInSession) .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) { exec { session => - val tableIndex = dist.sample(dp.totalTables, rnp) + val tableIndex = dist.sample(dp.maxPossibleTables, rnp) val (catalog, namespace, table) = Distribution.tableIndexToIdentifier(tableIndex, dp) + + // Checked in `fetchTable` + val expectedProperties: Map[String, String] = (0 until dp.numTableProperties) + .map(id => s"InitialAttribute_$id" -> s"$id") + .toMap + val expectedLocation = + s"${dp.defaultBaseLocation}/$catalog/${namespace.mkString("/")}/${table}" + session .set("catalogName", catalog) .set("multipartNamespace", namespace.mkString(0x1F.toChar.toString)) .set("tableName", table) + .set("initialProperties", expectedProperties) + .set("location", expectedLocation) 
}.exec(tblActions.fetchTable) } } From a40896c0e9ae98a7a9762c1839d864457598f675 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 01:19:17 -0700 Subject: [PATCH 14/28] add writers --- .../gatling/resources/benchmark-defaults.conf | 4 +-- .../WeightedWorkloadOnTreeDataset.scala | 34 +++++++++++++++++-- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf b/benchmarks/src/gatling/resources/benchmark-defaults.conf index 93da763..83933a2 100644 --- a/benchmarks/src/gatling/resources/benchmark-defaults.conf +++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf @@ -171,9 +171,9 @@ workload { # Each distribution will have `count` threads assigned to it # mean / variance describe the properties of the normal distribution # Readers will read a random table in the table space based on sampling - # Default: [{ count = 2, mean = 0.3, variance = 0.0278 }] + # Default: [{ count = 8, mean = 0.3, variance = 0.0278 }] readers = [ - { count = 2, mean = 0.3, variance = 0.0278 } + { count = 8, mean = 0.3, variance = 0.0278 } ] # Distributions for writers diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index 5383f47..e22df0c 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -28,6 +28,7 @@ import org.apache.polaris.benchmarks.parameters.{ConnectionParameters, DatasetPa import org.apache.polaris.benchmarks.util.CircularIterator import org.slf4j.LoggerFactory +import java.util.UUID import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.concurrent.duration._ @@ -82,7 +83,7 @@ class WeightedWorkloadOnTreeDataset 
extends Simulation { .contentTypeHeader("application/json") // -------------------------------------------------------------------------------- - // Create all reader scenarios and prepare them for injection + // Create all reader/writer scenarios and prepare them for injection // -------------------------------------------------------------------------------- private val readerScenarioBuilders: List[ScenarioBuilder] = { wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case (dist, i) => @@ -115,6 +116,33 @@ class WeightedWorkloadOnTreeDataset extends Simulation { } }.toList + private val writerScenarioBuilders: List[ScenarioBuilder] = { + wp.weightedWorkloadOnTreeDataset.writers.zipWithIndex.flatMap { case (dist, i) => + (0 until dist.count).map { threadId => + val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, i * 1000 + threadId) + scenario(s"Writer-$i-$threadId") + .exec(authActions.restoreAccessTokenInSession) + .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) { + exec { session => + val tableIndex = dist.sample(dp.maxPossibleTables, rnp) + val (catalog, namespace, table) = + Distribution.tableIndexToIdentifier(tableIndex, dp) + + // Needed for `updateTable` + val now = System.currentTimeMillis + val newProperty = s"""{"last_updated": "${now}"}""" + + session + .set("catalogName", catalog) + .set("multipartNamespace", namespace.mkString(0x1F.toChar.toString)) + .set("tableName", table) + .set("newProperty", newProperty) + }.exec(tblActions.updateTable) + } + } + } + }.toList + // -------------------------------------------------------------------------------- // Setup // -------------------------------------------------------------------------------- @@ -122,6 +150,8 @@ class WeightedWorkloadOnTreeDataset extends Simulation { List( refreshOauthForDuration.inject(atOnceUsers(1)).protocols(httpProtocol), waitForAuthentication.inject(atOnceUsers(1)).protocols(httpProtocol) - ) ++ 
readerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) + ) ++ + readerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) ++ + writerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) ) } From 18487ceb2e6ea72673f2b00a538c164799bd5fc6 Mon Sep 17 00:00:00 2001 From: eric-maynard Date: Wed, 30 Apr 2025 01:33:27 -0700 Subject: [PATCH 15/28] fix --- .../benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index e22df0c..6afeb21 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -119,7 +119,7 @@ class WeightedWorkloadOnTreeDataset extends Simulation { private val writerScenarioBuilders: List[ScenarioBuilder] = { wp.weightedWorkloadOnTreeDataset.writers.zipWithIndex.flatMap { case (dist, i) => (0 until dist.count).map { threadId => - val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, i * 1000 + threadId) + val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, i * 2000 + threadId) scenario(s"Writer-$i-$threadId") .exec(authActions.restoreAccessTokenInSession) .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) { From eb392ab82cf11ecf05e0ecc5f0facaad24ebb2b9 Mon Sep 17 00:00:00 2001 From: eric-maynard Date: Wed, 30 Apr 2025 01:36:16 -0700 Subject: [PATCH 16/28] spotless --- .../gatling/resources/benchmark-defaults.conf | 2 +- .../actions/AuthenticationActions.scala | 5 +++- ...ghtedWorkloadOnTreeDatasetParameters.scala | 12 +++++--- .../WeightedWorkloadOnTreeDataset.scala | 28 +++++++++++-------- 4 
files changed, 29 insertions(+), 18 deletions(-) diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf b/benchmarks/src/gatling/resources/benchmark-defaults.conf index 83933a2..02a4f3f 100644 --- a/benchmarks/src/gatling/resources/benchmark-defaults.conf +++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf @@ -173,7 +173,7 @@ workload { # Readers will read a random table in the table space based on sampling # Default: [{ count = 8, mean = 0.3, variance = 0.0278 }] readers = [ - { count = 8, mean = 0.3, variance = 0.0278 } + { count = 8, mean = 0.3, variance = 0.001 } ] # Distributions for writers diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala index 0c82f06..4cb86c3 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala @@ -23,7 +23,10 @@ import io.gatling.core.Predef._ import io.gatling.core.feeder.Feeder import io.gatling.core.structure.ChainBuilder import io.gatling.http.Predef._ -import org.apache.polaris.benchmarks.RetryOnHttpCodes.{HttpRequestBuilderWithStatusSave, retryOnHttpStatus} +import org.apache.polaris.benchmarks.RetryOnHttpCodes.{ + retryOnHttpStatus, + HttpRequestBuilderWithStatusSave +} import org.apache.polaris.benchmarks.parameters.ConnectionParameters import org.slf4j.LoggerFactory diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index 3a7f613..281e9e6 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -37,13 +37,14 @@ case class WeightedWorkloadOnTreeDatasetParameters( seed: Int, readers: Seq[Distribution], writers: Seq[Distribution], - durationInMinutes: Int) { + durationInMinutes: Int +) { require(readers.nonEmpty || writers.nonEmpty, "At least one reader or writer is required") require(durationInMinutes > 0, "Duration in minutes must be positive") } object WeightedWorkloadOnTreeDatasetParameters { - def loadDistributionsList(config: Config, key: String): List[Distribution] = { + def loadDistributionsList(config: Config, key: String): List[Distribution] = config.getConfigList(key).asScala.toList.map { conf => Distribution( count = conf.getInt("count"), @@ -51,10 +52,10 @@ object WeightedWorkloadOnTreeDatasetParameters { variance = conf.getDouble("variance") ) } - } } case class Distribution(count: Int, mean: Double, variance: Double) { + /** * Return a value in [0, items) based on this distribution using truncated normal resampling. 
*/ @@ -74,7 +75,10 @@ object Distribution { // Map an index back to a table path def tableIndexToIdentifier(index: Int, dp: DatasetParameters): (String, List[String], String) = { - require(dp.numTablesMax == -1, "Sampling is incompatible with numTablesMax settings other than -1") + require( + dp.numTablesMax == -1, + "Sampling is incompatible with numTablesMax settings other than -1" + ) val namespaceIndex = index / dp.numTablesPerNs val namespaceOrdinal = dp.nAryTree.lastLevelOrdinals.toList.apply(namespaceIndex) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index 6afeb21..5ff5779 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -24,7 +24,13 @@ import io.gatling.core.structure.{ChainBuilder, PopulationBuilder, ScenarioBuild import io.gatling.http.Predef._ import org.apache.polaris.benchmarks.actions._ import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config -import org.apache.polaris.benchmarks.parameters.{ConnectionParameters, DatasetParameters, Distribution, RandomNumberProvider, WorkloadParameters} +import org.apache.polaris.benchmarks.parameters.{ + ConnectionParameters, + DatasetParameters, + Distribution, + RandomNumberProvider, + WorkloadParameters +} import org.apache.polaris.benchmarks.util.CircularIterator import org.slf4j.LoggerFactory @@ -33,9 +39,9 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.concurrent.duration._ /** - * This simulation performs reads and writes based on distributions specified in the config. It allows - * the simulation of workloads where e.g. a small fraction of tables get most writes. 
It is intended to - * be used against a Polaris instance with a pre-existing tree dataset. + * This simulation performs reads and writes based on distributions specified in the config. It + * allows the simulation of workloads where e.g. a small fraction of tables get most writes. It is + * intended to be used against a Polaris instance with a pre-existing tree dataset. */ class WeightedWorkloadOnTreeDataset extends Simulation { private val logger = LoggerFactory.getLogger(getClass) @@ -85,7 +91,7 @@ class WeightedWorkloadOnTreeDataset extends Simulation { // -------------------------------------------------------------------------------- // Create all reader/writer scenarios and prepare them for injection // -------------------------------------------------------------------------------- - private val readerScenarioBuilders: List[ScenarioBuilder] = { + private val readerScenarioBuilders: List[ScenarioBuilder] = wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case (dist, i) => (0 until dist.count).map { threadId => val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, i * 1000 + threadId) @@ -106,17 +112,16 @@ class WeightedWorkloadOnTreeDataset extends Simulation { session .set("catalogName", catalog) - .set("multipartNamespace", namespace.mkString(0x1F.toChar.toString)) + .set("multipartNamespace", namespace.mkString(0x1f.toChar.toString)) .set("tableName", table) .set("initialProperties", expectedProperties) .set("location", expectedLocation) }.exec(tblActions.fetchTable) } } - } - }.toList + }.toList - private val writerScenarioBuilders: List[ScenarioBuilder] = { + private val writerScenarioBuilders: List[ScenarioBuilder] = wp.weightedWorkloadOnTreeDataset.writers.zipWithIndex.flatMap { case (dist, i) => (0 until dist.count).map { threadId => val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, i * 2000 + threadId) @@ -134,14 +139,13 @@ class WeightedWorkloadOnTreeDataset extends Simulation { session 
.set("catalogName", catalog) - .set("multipartNamespace", namespace.mkString(0x1F.toChar.toString)) + .set("multipartNamespace", namespace.mkString(0x1f.toChar.toString)) .set("tableName", table) .set("newProperty", newProperty) }.exec(tblActions.updateTable) } } - } - }.toList + }.toList // -------------------------------------------------------------------------------- // Setup From 8d6ad0094c882655849ee680d1f2e56663fdb1bc Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 01:52:36 -0700 Subject: [PATCH 17/28] spotless again --- .../simulations/WeightedWorkloadOnTreeDataset.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index 5ff5779..1713170 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -20,7 +20,7 @@ package org.apache.polaris.benchmarks.simulations import io.gatling.core.Predef._ -import io.gatling.core.structure.{ChainBuilder, PopulationBuilder, ScenarioBuilder} +import io.gatling.core.structure.ScenarioBuilder import io.gatling.http.Predef._ import org.apache.polaris.benchmarks.actions._ import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config @@ -31,11 +31,9 @@ import org.apache.polaris.benchmarks.parameters.{ RandomNumberProvider, WorkloadParameters } -import org.apache.polaris.benchmarks.util.CircularIterator import org.slf4j.LoggerFactory -import java.util.UUID -import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} +import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration._ /** @@ -57,8 +55,6 @@ class WeightedWorkloadOnTreeDataset extends Simulation { // 
Helper values // -------------------------------------------------------------------------------- private val accessToken: AtomicReference[String] = new AtomicReference() - private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true) - private val readersLaunched = new AtomicBoolean(false) private val authActions = AuthenticationActions(cp, accessToken) private val tblActions = TableActions(dp, wp, accessToken) From f5dacf76df1fd52432f2adfebf2085569cad3731 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 11:14:41 -0700 Subject: [PATCH 18/28] add summary viz --- .../gatling/resources/benchmark-defaults.conf | 4 +- ...ghtedWorkloadOnTreeDatasetParameters.scala | 54 +++++++++++++++++++ .../WeightedWorkloadOnTreeDataset.scala | 6 +++ 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf b/benchmarks/src/gatling/resources/benchmark-defaults.conf index 02a4f3f..318ef4c 100644 --- a/benchmarks/src/gatling/resources/benchmark-defaults.conf +++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf @@ -186,7 +186,7 @@ workload { ] # Duration of the simulation in minutes - # Default: 10 - duration-in-minutes = 10 + # Default: 5 + duration-in-minutes = 5 } } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index 281e9e6..c6694af 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -21,8 +21,10 @@ package org.apache.polaris.benchmarks.parameters import com.typesafe.config.Config import com.typesafe.scalalogging.Logger +import org.slf4j.LoggerFactory import 
scala.collection.JavaConverters._ +import scala.collection.immutable.LazyList import scala.collection.mutable.ArrayBuffer import scala.util.Random @@ -55,6 +57,29 @@ object WeightedWorkloadOnTreeDatasetParameters { } case class Distribution(count: Int, mean: Double, variance: Double) { + private val logger = LoggerFactory.getLogger(getClass) + + def printDescription(): Unit = { + println(s"Summary for ${this}") + + // On startup, print some metadata about the distribution + printVisualiztion() + + // Warn if a large amount of resampling will be needed + val debugRandomNumberProvider = RandomNumberProvider(1, 2) + def resampleStream: LazyList[Double] = + LazyList.continually(debugRandomNumberProvider.next() * math.sqrt(variance) + mean) + + val (_, resamples) = resampleStream.zipWithIndex + .take(100000) + .find { case (value, _) => value >= 0 && value <= 1 } + .map { case (value, index) => (value, index) } + .getOrElse((-1, 100000)) + + if (resamples > 5) { + logger.warn(s"A distribution appears to require aggressive resampling: ${this} took ${resamples + 1} samples") + } + } /** * Return a value in [0, items) based on this distribution using truncated normal resampling. 
@@ -69,6 +94,35 @@ case class Distribution(count: Int, mean: Double, variance: Double) { (value * items).toInt.min(items - 1) } + + def printVisualiztion(samples: Int = 100000, bins: Int = 10): Unit = { + val binCounts = Array.fill(bins)(0) + val rng = new RandomNumberProvider(1, 2) + + for (_ <- 1 to samples) { + val value = Iterator + .continually(rng.next() * math.sqrt(variance) + mean) + .dropWhile(x => x < 0.0 || x > 1.0) + .next() + + val bin = ((value * bins).toInt).min(bins - 1) + binCounts(bin) += 1 + } + + val maxBarWidth = 50 + val total = binCounts.sum.toDouble + println("Range | % of Samples | Visualization") + println("--------------|--------------|------------------") + + for (i <- 0 until bins) { + val low = i.toDouble / bins + val high = (i + 1).toDouble / bins + val percent = binCounts(i) / total * 100 + val bar = "█" * ((percent / 100 * maxBarWidth).round.toInt) + println(f"[$low%.1f - $high%.1f) | $percent%6.2f%% | $bar") + } + println() + } } object Distribution { diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index 1713170..a86f155 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -51,6 +51,12 @@ class WeightedWorkloadOnTreeDataset extends Simulation { val dp: DatasetParameters = config.datasetParameters val wp: WorkloadParameters = config.workloadParameters + println("Reader distributions:") + wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription()) + + println("Writer distributions:") + wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription()) + // -------------------------------------------------------------------------------- // Helper values // 
-------------------------------------------------------------------------------- From 70cb5ea8dba6fdf689bb7c1140131ffe914f238e Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 11:16:36 -0700 Subject: [PATCH 19/28] polish --- ...WeightedWorkloadOnTreeDatasetParameters.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index c6694af..bbf59c4 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -60,10 +60,10 @@ case class Distribution(count: Int, mean: Double, variance: Double) { private val logger = LoggerFactory.getLogger(getClass) def printDescription(): Unit = { - println(s"Summary for ${this}") + println(s"Summary for ${this}:") // On startup, print some metadata about the distribution - printVisualiztion() + printVisualization() // Warn if a large amount of resampling will be needed val debugRandomNumberProvider = RandomNumberProvider(1, 2) @@ -77,7 +77,7 @@ case class Distribution(count: Int, mean: Double, variance: Double) { .getOrElse((-1, 100000)) if (resamples > 5) { - logger.warn(s"A distribution appears to require aggressive resampling: ${this} took ${resamples + 1} samples") + logger.warn(s"A distribution appears to require aggressive resampling: ${this} took ${resamples + 1} samples!") } } @@ -95,9 +95,9 @@ case class Distribution(count: Int, mean: Double, variance: Double) { (value * items).toInt.min(items - 1) } - def printVisualiztion(samples: Int = 100000, bins: Int = 10): Unit = { + def printVisualization(samples: Int = 100000, bins: Int = 10): Unit = { val binCounts 
= Array.fill(bins)(0) - val rng = new RandomNumberProvider(1, 2) + val rng = RandomNumberProvider("visualization".hashCode, -1) for (_ <- 1 to samples) { val value = Iterator @@ -111,15 +111,15 @@ case class Distribution(count: Int, mean: Double, variance: Double) { val maxBarWidth = 50 val total = binCounts.sum.toDouble - println("Range | % of Samples | Visualization") - println("--------------|--------------|------------------") + println(" Range | % of Samples | Visualization") + println(" --------------|--------------|------------------") for (i <- 0 until bins) { val low = i.toDouble / bins val high = (i + 1).toDouble / bins val percent = binCounts(i) / total * 100 val bar = "█" * ((percent / 100 * maxBarWidth).round.toInt) - println(f"[$low%.1f - $high%.1f) | $percent%6.2f%% | $bar") + println(f" [$low%.1f - $high%.1f) | $percent%6.2f%% | $bar") } println() } From a5d3b2af39c26435cafc675f20a632f9f1d53f59 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 11:17:19 -0700 Subject: [PATCH 20/28] spotless --- .../WeightedWorkloadOnTreeDatasetParameters.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index bbf59c4..38c4650 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -77,7 +77,9 @@ case class Distribution(count: Int, mean: Double, variance: Double) { .getOrElse((-1, 100000)) if (resamples > 5) { - logger.warn(s"A distribution appears to require aggressive resampling: ${this} took ${resamples + 1} samples!") + logger.warn( + s"A distribution appears to require aggressive resampling: 
${this} took ${resamples + 1} samples!" + ) } } @@ -105,7 +107,7 @@ case class Distribution(count: Int, mean: Double, variance: Double) { .dropWhile(x => x < 0.0 || x > 1.0) .next() - val bin = ((value * bins).toInt).min(bins - 1) + val bin = (value * bins).toInt.min(bins - 1) binCounts(bin) += 1 } From 130fe1e553c6bc8402c9b91a38c64f849f6a747f Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 11:37:56 -0700 Subject: [PATCH 21/28] spotless --- .../gatling/resources/benchmark-defaults.conf | 2 +- ...ghtedWorkloadOnTreeDatasetParameters.scala | 41 +++++++++++-------- .../WeightedWorkloadOnTreeDataset.scala | 8 ++-- 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf b/benchmarks/src/gatling/resources/benchmark-defaults.conf index 318ef4c..739d282 100644 --- a/benchmarks/src/gatling/resources/benchmark-defaults.conf +++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf @@ -173,7 +173,7 @@ workload { # Readers will read a random table in the table space based on sampling # Default: [{ count = 8, mean = 0.3, variance = 0.0278 }] readers = [ - { count = 8, mean = 0.3, variance = 0.001 } + { count = 8, mean = 0.3, variance = 0.0278 } ] # Distributions for writers diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index 38c4650..c13e224 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -23,8 +23,9 @@ import com.typesafe.config.Config import com.typesafe.scalalogging.Logger import org.slf4j.LoggerFactory -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ 
import scala.collection.immutable.LazyList +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.Random @@ -59,16 +60,16 @@ object WeightedWorkloadOnTreeDatasetParameters { case class Distribution(count: Int, mean: Double, variance: Double) { private val logger = LoggerFactory.getLogger(getClass) - def printDescription(): Unit = { + def printDescription(dataset: DatasetParameters): Unit = { println(s"Summary for ${this}:") - // On startup, print some metadata about the distribution - printVisualization() + // Visualize distributions + printVisualization(dataset.maxPossibleTables) // Warn if a large amount of resampling will be needed val debugRandomNumberProvider = RandomNumberProvider(1, 2) def resampleStream: LazyList[Double] = - LazyList.continually(debugRandomNumberProvider.next() * math.sqrt(variance) + mean) + LazyList.continually(sample(dataset.maxPossibleTables, debugRandomNumberProvider)) val (_, resamples) = resampleStream.zipWithIndex .take(100000) @@ -76,7 +77,7 @@ case class Distribution(count: Int, mean: Double, variance: Double) { .map { case (value, index) => (value, index) } .getOrElse((-1, 100000)) - if (resamples > 5) { + if (resamples > 10) { logger.warn( s"A distribution appears to require aggressive resampling: ${this} took ${resamples + 1} samples!" 
) @@ -89,25 +90,25 @@ case class Distribution(count: Int, mean: Double, variance: Double) { def sample(items: Int, randomNumberProvider: RandomNumberProvider): Int = { val stddev = math.sqrt(variance) // Resample until the value is in [0, 1] + val maxSamples = 100000 val value = Iterator .continually(randomNumberProvider.next() * stddev + mean) - .dropWhile(x => x < 0.0 || x > 1.0) - .next() + .take(maxSamples) + .find(x => x >= 0.0 && x <= 1.0) + .getOrElse(throw new RuntimeException(s"Failed to sample a value in [0, 1] after ${maxSamples} attempts")) (value * items).toInt.min(items - 1) } - def printVisualization(samples: Int = 100000, bins: Int = 10): Unit = { + def printVisualization(tables: Int, samples: Int = 100000, bins: Int = 10): Unit = { val binCounts = Array.fill(bins)(0) + val hits = new mutable.HashMap[Int, Int]() val rng = RandomNumberProvider("visualization".hashCode, -1) - for (_ <- 1 to samples) { - val value = Iterator - .continually(rng.next() * math.sqrt(variance) + mean) - .dropWhile(x => x < 0.0 || x > 1.0) - .next() - - val bin = (value * bins).toInt.min(bins - 1) + (1 to samples).foreach { _ => + val value = sample(tables, rng) + val bin = ((value.toDouble / tables) * bins).toInt.min(bins - 1) + hits.put(value, hits.getOrElse(value, 0) + 1) binCounts(bin) += 1 } @@ -116,7 +117,7 @@ case class Distribution(count: Int, mean: Double, variance: Double) { println(" Range | % of Samples | Visualization") println(" --------------|--------------|------------------") - for (i <- 0 until bins) { + (0 until bins).foreach { i => val low = i.toDouble / bins val high = (i + 1).toDouble / bins val percent = binCounts(i) / total * 100 @@ -124,6 +125,12 @@ case class Distribution(count: Int, mean: Double, variance: Double) { println(f" [$low%.1f - $high%.1f) | $percent%6.2f%% | $bar") } println() + + val mode = hits.maxBy(_._2) + val modePercentage: Int = Math.round(mode._2.toFloat / samples * 100) + println(s" The most frequently selected table was chosen 
in ~${modePercentage}% of samples") + + println() } } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index a86f155..b043d0e 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -51,11 +51,11 @@ class WeightedWorkloadOnTreeDataset extends Simulation { val dp: DatasetParameters = config.datasetParameters val wp: WorkloadParameters = config.workloadParameters - println("Reader distributions:") - wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription()) + println("### Reader distributions ###") + wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription(dp)) - println("Writer distributions:") - wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription()) + println("### Writer distributions ###") + wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp)) // -------------------------------------------------------------------------------- // Helper values From 407993ea0bb5b2d3c0c76dacd30017408b81466c Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 11:38:14 -0700 Subject: [PATCH 22/28] spotless again --- .../WeightedWorkloadOnTreeDatasetParameters.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index c13e224..1847ce0 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -95,7 +95,11 @@ case class Distribution(count: Int, mean: Double, variance: Double) { .continually(randomNumberProvider.next() * stddev + mean) .take(maxSamples) .find(x => x >= 0.0 && x <= 1.0) - .getOrElse(throw new RuntimeException(s"Failed to sample a value in [0, 1] after ${maxSamples} attempts")) + .getOrElse( + throw new RuntimeException( + s"Failed to sample a value in [0, 1] after ${maxSamples} attempts" + ) + ) (value * items).toInt.min(items - 1) } From bb73724ddc3790c9f475ffb91d5fca3dafe20e60 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 11:38:57 -0700 Subject: [PATCH 23/28] one fix --- .../parameters/WeightedWorkloadOnTreeDatasetParameters.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index 1847ce0..2870e6b 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -67,7 +67,7 @@ case class Distribution(count: Int, mean: Double, variance: Double) { printVisualization(dataset.maxPossibleTables) // Warn if a large amount of resampling will be needed - val debugRandomNumberProvider = RandomNumberProvider(1, 2) + val debugRandomNumberProvider = RandomNumberProvider("debug".hashCode, -1) def resampleStream: LazyList[Double] = LazyList.continually(sample(dataset.maxPossibleTables, debugRandomNumberProvider)) @@ -77,7 +77,7 @@ case class Distribution(count: Int, mean: Double, variance: Double) { .map { case (value, index) => (value, index) } .getOrElse((-1, 100000)) - 
if (resamples > 10) { + if (resamples > 100) { logger.warn( s"A distribution appears to require aggressive resampling: ${this} took ${resamples + 1} samples!" ) From 127ee67bf13ec27bff37e728ed2d64e1df0090fc Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 19:18:12 -0700 Subject: [PATCH 24/28] fix --- .../parameters/WeightedWorkloadOnTreeDatasetParameters.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index 2870e6b..197cbd5 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -73,7 +73,7 @@ case class Distribution(count: Int, mean: Double, variance: Double) { val (_, resamples) = resampleStream.zipWithIndex .take(100000) - .find { case (value, _) => value >= 0 && value <= 1 } + .find { case (value, _) => value >= 0 && value < dataset.maxPossibleTables} .map { case (value, index) => (value, index) } .getOrElse((-1, 100000)) From 7781e6dd17b7de39bd70bca9a890d913e7ecb578 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 22:11:12 -0700 Subject: [PATCH 25/28] remove header --- .../org/apache/polaris/benchmarks/actions/TableActions.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala index 1d5b951..56ea470 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala @@ -184,6 +184,7 @@ 
case class TableActions( http("Fetch Table") .get("/api/catalog/v1/#{catalogName}/namespaces/#{multipartNamespace}/tables/#{tableName}") .header("Authorization", "Bearer #{accessToken}") + .header("If-None-Match", null) .check(status.is(200)) .check(jsonPath("$.metadata.table-uuid").saveAs("tableUuid")) .check(jsonPath("$.metadata.location").is("#{location}")) From 2ee7baccb161dc9cd97492964384ba686955cfde Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 22:35:16 -0700 Subject: [PATCH 26/28] empty string --- .../org/apache/polaris/benchmarks/actions/TableActions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala index 56ea470..c974064 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala @@ -184,7 +184,7 @@ case class TableActions( http("Fetch Table") .get("/api/catalog/v1/#{catalogName}/namespaces/#{multipartNamespace}/tables/#{tableName}") .header("Authorization", "Bearer #{accessToken}") - .header("If-None-Match", null) + .header("If-None-Match", "") .check(status.is(200)) .check(jsonPath("$.metadata.table-uuid").saveAs("tableUuid")) .check(jsonPath("$.metadata.location").is("#{location}")) From 454e167e1a0c781c4094b2372b54aba98bd8524e Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 30 Apr 2025 23:58:34 -0700 Subject: [PATCH 27/28] spotless --- .../parameters/WeightedWorkloadOnTreeDatasetParameters.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala index 
197cbd5..9505875 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala @@ -73,7 +73,7 @@ case class Distribution(count: Int, mean: Double, variance: Double) { val (_, resamples) = resampleStream.zipWithIndex .take(100000) - .find { case (value, _) => value >= 0 && value < dataset.maxPossibleTables} + .find { case (value, _) => value >= 0 && value < dataset.maxPossibleTables } .map { case (value, index) => (value, index) } .getOrElse((-1, 100000)) From 9f85e1bca20e8042d1b63a7adc4a166ffb7bbae5 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Thu, 1 May 2025 00:05:38 -0700 Subject: [PATCH 28/28] disable caching --- .../benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala index b043d0e..a996a34 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala @@ -89,6 +89,7 @@ class WeightedWorkloadOnTreeDataset extends Simulation { .baseUrl(cp.baseUrl) .acceptHeader("application/json") .contentTypeHeader("application/json") + .disableCaching // -------------------------------------------------------------------------------- // Create all reader/writer scenarios and prepare them for injection