Skip to content

Rename DataSource and FileSource fields for consistency #14898

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions datafusion-examples/examples/parquet_exec_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
/// or `post_visit` (visit each node after its children/inputs)
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
// If needed match on a specific `ExecutionPlan` node type
if let Some(data_source) = plan.as_any().downcast_ref::<DataSourceExec>() {
let source = data_source.source();
if let Some(file_config) = source.as_any().downcast_ref::<FileScanConfig>() {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let data_source = data_source_exec.data_source();
if let Some(file_config) =
data_source.as_any().downcast_ref::<FileScanConfig>()
{
if file_config
.file_source()
.as_any()
Expand All @@ -108,7 +110,7 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
{
self.file_groups = Some(file_config.file_groups.clone());

let metrics = match data_source.metrics() {
let metrics = match data_source_exec.metrics() {
None => return Ok(true),
Some(metrics) => metrics,
};
Expand Down
9 changes: 4 additions & 5 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,16 @@ impl ArrowExec {
}

fn file_scan_config(&self) -> FileScanConfig {
let source = self.inner.source();
source
self.inner
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap()
.clone()
}

fn json_source(&self) -> JsonSource {
let source = self.file_scan_config();
source
self.file_scan_config()
.file_source()
.as_any()
.downcast_ref::<JsonSource>()
Expand Down Expand Up @@ -130,7 +129,7 @@ impl ArrowExec {
self.base_config.file_groups = file_groups.clone();
let mut file_source = self.file_scan_config();
file_source = file_source.with_file_groups(file_groups);
self.inner = self.inner.with_source(Arc::new(file_source));
self.inner = self.inner.with_data_source(Arc::new(file_source));
self
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ impl CsvExec {
}

fn file_scan_config(&self) -> FileScanConfig {
let source = self.inner.source();
source
self.inner
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap()
Expand Down Expand Up @@ -316,7 +316,7 @@ impl CsvExec {
self.base_config.file_groups = file_groups.clone();
let mut file_source = self.file_scan_config();
file_source = file_source.with_file_groups(file_groups);
self.inner = self.inner.with_source(Arc::new(file_source));
self.inner = self.inner.with_data_source(Arc::new(file_source));
self
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ impl NdJsonExec {
}

fn file_scan_config(&self) -> FileScanConfig {
let source = self.inner.source();
source
self.inner
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap()
Expand Down Expand Up @@ -148,7 +148,7 @@ impl NdJsonExec {
self.base_config.file_groups = file_groups.clone();
let mut file_source = self.file_scan_config();
file_source = file_source.with_file_groups(file_groups);
self.inner = self.inner.with_source(Arc::new(file_source));
self.inner = self.inner.with_data_source(Arc::new(file_source));
self
}
}
Expand Down
19 changes: 9 additions & 10 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,16 @@ impl ParquetExec {
}
}
fn file_scan_config(&self) -> FileScanConfig {
let source = self.inner.source();
source
self.inner
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap()
.clone()
}

fn parquet_source(&self) -> ParquetSource {
let source = self.file_scan_config();
source
self.file_scan_config()
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
Expand Down Expand Up @@ -343,7 +342,7 @@ impl ParquetExec {
let file_source = self.file_scan_config();
self.inner = self
.inner
.with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
.with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
self
}
Expand All @@ -366,7 +365,7 @@ impl ParquetExec {
let file_source = self.file_scan_config();
self.inner = self
.inner
.with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
.with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
self.schema_adapter_factory = Some(schema_adapter_factory);
self
}
Expand All @@ -380,7 +379,7 @@ impl ParquetExec {
let file_source = self.file_scan_config();
self.inner = self
.inner
.with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
.with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
self.table_parquet_options.global.pushdown_filters = pushdown_filters;
self
}
Expand All @@ -404,7 +403,7 @@ impl ParquetExec {
let file_source = self.file_scan_config();
self.inner = self
.inner
.with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
.with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
self.table_parquet_options.global.reorder_filters = reorder_filters;
self
}
Expand Down Expand Up @@ -463,7 +462,7 @@ impl ParquetExec {
) -> Self {
let mut config = self.file_scan_config();
config.file_groups = file_groups;
self.inner = self.inner.with_source(Arc::new(config));
self.inner = self.inner.with_data_source(Arc::new(config));
self
}
}
Expand Down Expand Up @@ -1469,7 +1468,7 @@ mod tests {
])
.build();
let partition_count = parquet_exec
.source()
.data_source()
.output_partitioning()
.partition_count();
assert_eq!(partition_count, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ use object_store::ObjectStore;
/// # fn parquet_exec() -> DataSourceExec { unimplemented!() }
/// // Split a single DataSourceExec into multiple DataSourceExecs, one for each file
/// let exec = parquet_exec();
/// let source = exec.source();
/// let base_config = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
/// let data_source = exec.data_source();
/// let base_config = data_source.as_any().downcast_ref::<FileScanConfig>().unwrap();
/// let existing_file_groups = &base_config.file_groups;
/// let new_execs = existing_file_groups
/// .iter()
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ pub fn partitioned_file_groups(
pub fn partitioned_csv_config(
schema: SchemaRef,
file_groups: Vec<Vec<PartitionedFile>>,
source: Arc<dyn FileSource>,
file_source: Arc<dyn FileSource>,
) -> FileScanConfig {
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, file_source)
.with_file_groups(file_groups)
}

Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,18 @@ impl TestParquetFile {
/// Recursively searches for DataSourceExec and returns the metrics
/// on the first one it finds
pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
if let Some(maybe_file) = plan.as_any().downcast_ref::<DataSourceExec>() {
let source = maybe_file.source();
if let Some(maybe_parquet) = source.as_any().downcast_ref::<FileScanConfig>()
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let data_source = data_source_exec.data_source();
if let Some(maybe_parquet) =
data_source.as_any().downcast_ref::<FileScanConfig>()
{
if maybe_parquet
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.is_some()
{
return maybe_file.metrics();
return data_source_exec.metrics();
}
}
}
Expand Down
27 changes: 18 additions & 9 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,12 @@ async fn list_files_with_session_level_cache() {
//Session 1 first time list files
assert_eq!(get_list_file_cache_size(&state1), 0);
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
let data_source = exec1.as_any().downcast_ref::<DataSourceExec>().unwrap();
let source = data_source.source();
let parquet1 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
let data_source_exec = exec1.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let parquet1 = data_source
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap();

assert_eq!(get_list_file_cache_size(&state1), 1);
let fg = &parquet1.file_groups;
Expand All @@ -163,9 +166,12 @@ async fn list_files_with_session_level_cache() {
//check session 1 cache result not show in session 2
assert_eq!(get_list_file_cache_size(&state2), 0);
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
let data_source = exec2.as_any().downcast_ref::<DataSourceExec>().unwrap();
let source = data_source.source();
let parquet2 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
let data_source_exec = exec2.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let parquet2 = data_source
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap();

assert_eq!(get_list_file_cache_size(&state2), 1);
let fg2 = &parquet2.file_groups;
Expand All @@ -176,9 +182,12 @@ async fn list_files_with_session_level_cache() {
//check session 1 cache result not show in session 2
assert_eq!(get_list_file_cache_size(&state1), 1);
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
let data_source = exec3.as_any().downcast_ref::<DataSourceExec>().unwrap();
let source = data_source.source();
let parquet3 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
let data_source_exec = exec3.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let parquet3 = data_source
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap();

assert_eq!(get_list_file_cache_size(&state1), 1);
let fg = &parquet3.file_groups;
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/tests/parquet/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@ impl MetricsFinder {
impl ExecutionPlanVisitor for MetricsFinder {
type Error = std::convert::Infallible;
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let source = exec.source();
if let Some(file_config) = source.as_any().downcast_ref::<FileScanConfig>() {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let data_source = data_source_exec.data_source();
if let Some(file_config) =
data_source.as_any().downcast_ref::<FileScanConfig>()
{
if file_config
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.is_some()
{
self.metrics = exec.metrics();
self.metrics = data_source_exec.metrics();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ fn test_memory_after_projection() -> Result<()> {
.as_any()
.downcast_ref::<DataSourceExec>()
.unwrap()
.source()
.data_source()
.as_any()
.downcast_ref::<MemorySourceConfig>()
.unwrap()
Expand Down
9 changes: 6 additions & 3 deletions datafusion/core/tests/sql/path_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,12 @@ async fn parquet_partition_pruning_filter() -> Result<()> {
Expr::gt(col("id"), lit(1)),
];
let exec = table.scan(&ctx.state(), None, &filters, None).await?;
let data_source = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
let source = data_source.source();
let file_source = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
let data_source_exec = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let file_source = data_source
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap();
let parquet_config = file_source
.file_source()
.as_any()
Expand Down
Loading