Commit 2c832b4

feat: Require offHeap memory to be enabled (always use unified memory) (apache#1062)
* Require offHeap memory
* remove unused import
* use off heap memory in stability tests
* reorder imports
1 parent f3da844 commit 2c832b4

5 files changed: 16 additions, 61 deletions

docs/source/user-guide/tuning.md

Lines changed: 2 additions & 30 deletions
```diff
@@ -23,40 +23,12 @@ Comet provides some tuning options to help you get the best performance from you
 
 ## Memory Tuning
 
-Comet provides two options for memory management:
-
-- **Unified Memory Management** shares an off-heap memory pool between Spark and Comet. This is the recommended option.
-- **Native Memory Management** leverages DataFusion's memory management for the native plans and allocates memory independently of Spark.
-
-### Unified Memory Management
-
-This option is automatically enabled when `spark.memory.offHeap.enabled=true`.
+Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`.
+If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark.
 
 Each executor will have a single memory pool which will be shared by all native plans being executed within that
 process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`.
 
-### Native Memory Management
-
-This option is automatically enabled when `spark.memory.offHeap.enabled=false`.
-
-Each native plan has a dedicated memory pool.
-
-By default, the size of each pool is `spark.comet.memory.overhead.factor * spark.executor.memory`. The default value
-for `spark.comet.memory.overhead.factor` is `0.2`.
-
-It is important to take executor concurrency into account. The maximum number of concurrent plans in an executor can
-be calculated with `spark.executor.cores / spark.task.cpus`.
-
-For example, if the executor can execute 4 plans concurrently, then the total amount of memory allocated will be
-`4 * spark.comet.memory.overhead.factor * spark.executor.memory`.
-
-It is also possible to set `spark.comet.memoryOverhead` to the desired size for each pool, rather than calculating
-it based on `spark.comet.memory.overhead.factor`.
-
-If both `spark.comet.memoryOverhead` and `spark.comet.memory.overhead.factor` are set, the former will be used.
-
-Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.
 
 ### Determining How Much Memory to Allocate
 
 Generally, increasing memory overhead will improve query performance, especially for queries containing joins and
```
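
The updated tuning guidance above boils down to a small amount of Spark configuration. The sketch below shows one way to apply it when building a SparkSession. It is illustrative only and not part of this commit: it assumes the Comet jar and extension registration are configured separately, the `"4g"` pool size and app name are placeholders, and the `spark.comet.*` key strings are spelled out on the assumption that they match the `CometConf` constants used in the test changes later in this commit.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch only (not part of this commit). Assumes the Comet plugin/extension
// is already on the classpath and registered; sizes and app name are placeholders.
object CometOffHeapExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("comet-offheap-example")
      // Required by this commit: without it, Comet logs that it is disabled and
      // leaves the Spark plan unchanged.
      .config("spark.memory.offHeap.enabled", "true")
      // Size of the single per-executor pool shared by Spark and Comet native plans.
      .config("spark.memory.offHeap.size", "4g")
      // Comet switches also set in the stability suite below (assumed key spellings).
      .config("spark.comet.enabled", "true")
      .config("spark.comet.exec.enabled", "true")
      .getOrCreate()

    spark.range(1000).selectExpr("sum(id)").show()
    spark.stop()
  }
}
```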

native/core/src/execution/jni_api.rs

Lines changed: 3 additions & 21 deletions
```diff
@@ -202,27 +202,9 @@ fn prepare_datafusion_session_context(
 
     let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
 
-    // Check if we are using unified memory manager integrated with Spark. Default to false if not
-    // set.
-    let use_unified_memory_manager = parse_bool(conf, "use_unified_memory_manager")?;
-
-    if use_unified_memory_manager {
-        // Set Comet memory pool for native
-        let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
-        rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
-    } else {
-        // Use the memory pool from DF
-        if conf.contains_key("memory_limit") {
-            let memory_limit = conf.get("memory_limit").unwrap().parse::<usize>()?;
-            let memory_fraction = conf
-                .get("memory_fraction")
-                .ok_or(CometError::Internal(
-                    "Config 'memory_fraction' is not specified from Comet JVM side".to_string(),
-                ))?
-                .parse::<f64>()?;
-            rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
-        }
-    }
+    // Set Comet memory pool for native
+    let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+    rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
 
     // Get Datafusion configuration from Spark Execution context
     // can be configured in Comet Spark JVM using Spark --conf parameters
```

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 1 addition & 10 deletions
```diff
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._
 
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
 import org.apache.comet.vector.NativeUtil
 
 /**
@@ -75,15 +75,6 @@ class CometExecIterator(
     val result = new java.util.HashMap[String, String]()
     val conf = SparkEnv.get.conf
 
-    val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
-    // Only enable unified memory manager when off-heap mode is enabled. Otherwise,
-    // we'll use the built-in memory pool from DF, and initializes with `memory_limit`
-    // and `memory_fraction` below.
-    result.put(
-      "use_unified_memory_manager",
-      String.valueOf(conf.get("spark.memory.offHeap.enabled", "false")))
-    result.put("memory_limit", String.valueOf(maxMemory))
-    result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
     result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
     result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
     result.put("explain_native", String.valueOf(COMET_EXPLAIN_NATIVE_ENABLED.get()))
```

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -915,6 +915,13 @@ class CometSparkSessionExtensions
     }
 
     override def apply(plan: SparkPlan): SparkPlan = {
+
+      // Comet requires off-heap memory to be enabled
+      if ("true" != conf.getConfString("spark.memory.offHeap.enabled", "false")) {
+        logInfo("Comet extension disabled because spark.memory.offHeap.enabled=false")
+        return plan
+      }
+
       // DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
       // enabled.
       if (isANSIEnabled(conf)) {
```
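
A quick way to see whether this new guard will fire is to read the same key back from an active session. This is a spark-shell style check, not part of the commit; `spark` is the SparkSession the shell provides.

```scala
// If this prints "false", the guard added above returns the plan untouched and
// Comet does not accelerate any queries in this session.
println(spark.conf.get("spark.memory.offHeap.enabled", "false"))
println(spark.conf.get("spark.memory.offHeap.size", "0"))
```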

spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -26,6 +26,7 @@ import scala.collection.mutable
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE}
 import org.apache.spark.sql.TPCDSBase
 import org.apache.spark.sql.catalyst.expressions.AttributeSet
 import org.apache.spark.sql.catalyst.util.resourceToString
@@ -286,6 +287,8 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBase
     conf.set(
       "spark.shuffle.manager",
       "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+    conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
+    conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
     conf.set(CometConf.COMET_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
     conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
```
