
Commit fe53eaf

fix: Limit together with pushdown_filters (#13788)
* fix: Limit together with pushdown_filters
* Fix format
* Address new comments
* Fix testing case to hit the problem
1 parent 59410ea commit fe53eaf

File tree

2 files changed: +79 −1

datafusion/core/src/datasource/listing/table.rs

Lines changed: 6 additions & 1 deletion
@@ -843,8 +843,13 @@ impl TableProvider for ListingTable {
         });
         // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
         let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
+
+        // We should not limit the number of partitioned files to scan if there are filters and limit
+        // at the same time. This is because the limit should be applied after the filters are applied.
+        let statistic_file_limit = if filters.is_empty() { limit } else { None };
+
         let (mut partitioned_file_lists, statistics) = self
-            .list_files_for_scan(session_state, &partition_filters, limit)
+            .list_files_for_scan(session_state, &partition_filters, statistic_file_limit)
             .await?;

         // if no files need to be read, return an `EmptyExec`
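As a quick illustration of the rule the new `statistic_file_limit` binding encodes, here is a minimal standalone sketch (the free function and its parameter names are illustrative, not part of the `ListingTable` API): the listing-phase limit is only safe to apply when no filters run after the scan, because a filter can discard rows from the kept files and leave fewer than `limit` matching rows in a prematurely truncated file set.

// Sketch only: mirrors the `if filters.is_empty() { limit } else { None }` decision above.
fn statistic_file_limit(filters_is_empty: bool, limit: Option<usize>) -> Option<usize> {
    if filters_is_empty {
        // No filters: every scanned row is returned, so stopping file listing
        // once `limit` rows are covered cannot lose results.
        limit
    } else {
        // Filters present: rows may be dropped later, so all candidate files
        // must stay in the scan; the limit is enforced after filtering.
        None
    }
}

fn main() {
    // With a filter and LIMIT 1, the listing phase must not truncate the file list.
    assert_eq!(statistic_file_limit(false, Some(1)), None);
    // Without filters, the limit can safely prune files during listing.
    assert_eq!(statistic_file_limit(true, Some(1)), Some(1));
}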

datafusion/sqllogictest/test_files/push_down_filter.slt

Lines changed: 73 additions & 0 deletions
@@ -122,3 +122,76 @@ logical_plan
 
 statement ok
 drop table d;
+
+
+# Test push down filter with limit for parquet
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+# this one is also required to make DF skip second file due to "sufficient" amount of rows
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+# Create a table as a data source
+statement ok
+CREATE TABLE src_table (
+  part_key INT,
+  value INT
+) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), (3, 5), (3, 6);
+
+
+# There will be more than 2 records filtered from the table to check that `limit 1` actually applied.
+# Setup 3 files, i.e., as many as there are partitions:
+
+# File 1:
+query I
+COPY (SELECT * FROM src_table where part_key = 1)
+TO 'test_files/scratch/parquet/test_filter_with_limit/part-0.parquet'
+STORED AS PARQUET;
+----
+3
+
+# File 2:
+query I
+COPY (SELECT * FROM src_table where part_key = 2)
+TO 'test_files/scratch/parquet/test_filter_with_limit/part-1.parquet'
+STORED AS PARQUET;
+----
+4
+
+# File 3:
+query I
+COPY (SELECT * FROM src_table where part_key = 3)
+TO 'test_files/scratch/parquet/test_filter_with_limit/part-2.parquet'
+STORED AS PARQUET;
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE test_filter_with_limit
+(
+  part_key INT,
+  value INT
+)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet/test_filter_with_limit/';
+
+query TT
+explain select * from test_filter_with_limit where value = 2 limit 1;
+----
+logical_plan
+01)Limit: skip=0, fetch=1
+02)--TableScan: test_filter_with_limit projection=[part_key, value], full_filters=[test_filter_with_limit.value = Int32(2)], fetch=1
+
+query II
+select * from test_filter_with_limit where value = 2 limit 1;
+----
+2 2
+
+# Tear down test_filter_with_limit table:
+statement ok
+DROP TABLE test_filter_with_limit;
+
+# Tear down src_table table:
+statement ok
+DROP TABLE src_table;
