// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow_array::{Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use bytes::{BufMut, Bytes, BytesMut};
use datafusion::{
    datasource::{
        listing::PartitionedFile,
        physical_plan::{parquet::ParquetExecBuilder, FileScanConfig},
    },
    prelude::*,
};
use datafusion_common::DFSchema;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::{collect, filter::FilterExec, ExecutionPlan};
use itertools::Itertools;
use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};
use parquet::{
    arrow::ArrowWriter,
    file::properties::{EnabledStatistics, WriterProperties},
};
use rand::seq::SliceRandom;
use url::Url;

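/// Fuzz test for UTF-8 predicate pruning: for every generated predicate, the rows
/// returned with statistics-based row group pruning enabled must match the rows
/// returned without it.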
#[tokio::test]
async fn test_fuzz_utf8() {
    // Fuzz testing for UTF8 predicate pruning.
    // The basic idea is that query results should always be the same with or without stats/pruning.
    // If we get this right we at least guarantee that there are no incorrect results.
    // There may still be suboptimal pruning or stats, but that's something we can try to catch
    // with more targeted tests.

    // Since we know where the edge cases might be we don't do random black-box fuzzing.
    // Instead we fuzz on specific pre-defined axes:
    //
    // - Which characters are in each value. We want to make sure to include characters that,
    //   when incremented, truncated or otherwise manipulated, might cause issues.
    // - The values in each row group. This impacts which min/max stats are generated for each rg.
    //   We'll generate combinations of the characters with lengths ranging from 1 to 3.
    // - Truncation of statistics to 1, 2 or 3 characters as well as no truncation.

    let mut rng = rand::thread_rng();

    let characters = [
        "z",
        "0",
        "~",
        "ß",
        "℣",
        "%", // this one is useful for like/not like tests since it will result in randomly inserted wildcards
        "_", // this one is useful for like/not like tests since it will result in randomly inserted wildcards
        "\u{7F}",     // DEL, the last code point that UTF-8 encodes in a single byte
        "\u{7FF}",    // the last code point that UTF-8 encodes in two bytes
        "\u{FF}",     // the last Latin-1 code point
        "\u{10FFFF}", // the maximum Unicode code point
        "\u{D7FF}",   // the last code point before the UTF-16 surrogate range
        "\u{FDCF}",   // the code point just before the noncharacter block U+FDD0..=U+FDEF
        // null character
        "\u{0}",
    ];

    let value_lengths = [1, 2, 3];

    // generate all combinations of characters with lengths ranging from 1 to 3
    let mut values = vec![];
    for length in &value_lengths {
        values.extend(
            characters
                .iter()
                .cloned()
                .combinations(*length)
                // now get all permutations of each combination
                .flat_map(|c| c.into_iter().permutations(*length))
                // and join them into strings
                .map(|c| c.join("")),
        );
    }

    println!("Generated {} values", values.len());

    // randomly pick 100 values
    values.shuffle(&mut rng);
    values.truncate(100);

    let mut row_groups = vec![];
    // generate all combinations of values for row groups (1 or 2 values per rg; more is unnecessary since we only get min/max stats out)
    for rg_length in [1, 2] {
        row_groups.extend(values.iter().cloned().combinations(rg_length));
    }

    println!("Generated {} row groups", row_groups.len());

    // Randomly pick 100 row groups (combinations of said values)
    row_groups.shuffle(&mut rng);
    row_groups.truncate(100);

    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    let store = InMemory::new();
    let mut files = vec![];
    for (idx, truncation_length) in [Some(1), Some(2), None].iter().enumerate() {
        // Parquet files support at most 32767 row groups per file, so chunk into multiple
        // files to avoid errors when running with a large number of row groups.
        for (rg_idx, row_groups) in row_groups.chunks(32766).enumerate() {
            let buf = write_parquet_file(
                *truncation_length,
                schema.clone(),
                row_groups.to_vec(),
            )
            .await;
            let filename = format!("test_fuzz_utf8_{idx}_{rg_idx}.parquet");
            files.push((filename.clone(), buf.len()));
            let payload = PutPayload::from(buf);
            let path = Path::from(filename);
            store.put(&path, payload).await.unwrap();
        }
    }

    println!("Generated {} parquet files", files.len());

    let ctx = SessionContext::new();

    ctx.register_object_store(&Url::parse("memory://").unwrap(), Arc::new(store));

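    // For every sampled value build a battery of predicates: plain comparisons plus
    // LIKE/NOT LIKE patterns with leading and trailing wildcards.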
    let mut predicates = vec![];
    for value in values {
        predicates.push(col("a").eq(lit(value.clone())));
        predicates.push(col("a").not_eq(lit(value.clone())));
        predicates.push(col("a").lt(lit(value.clone())));
        predicates.push(col("a").lt_eq(lit(value.clone())));
        predicates.push(col("a").gt(lit(value.clone())));
        predicates.push(col("a").gt_eq(lit(value.clone())));
        predicates.push(col("a").like(lit(value.clone())));
        predicates.push(col("a").not_like(lit(value.clone())));
        predicates.push(col("a").like(lit(format!("%{}", value.clone()))));
        predicates.push(col("a").like(lit(format!("{}%", value.clone()))));
        predicates.push(col("a").not_like(lit(format!("%{}", value.clone()))));
        predicates.push(col("a").not_like(lit(format!("{}%", value.clone()))));
    }

    for predicate in predicates {
        println!("Testing predicate {:?}", predicate);
        let phys_expr_predicate = ctx
            .create_physical_expr(predicate.clone(), &df_schema)
            .unwrap();
        let expected = execute_with_predicate(
            &files,
            phys_expr_predicate.clone(),
            false,
            schema.clone(),
            &ctx,
        )
        .await;
        let with_pruning = execute_with_predicate(
            &files,
            phys_expr_predicate,
            true,
            schema.clone(),
            &ctx,
        )
        .await;
        assert_eq!(expected, with_pruning);
    }
}

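/// Runs a scan of the in-memory Parquet files with the given predicate and returns the
/// matching values of column "a". When `prune_stats` is true the predicate is also passed
/// to the Parquet reader so it can prune row groups based on their statistics; the
/// predicate is always re-applied by a `FilterExec`, so only the pruning behaviour differs.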
async fn execute_with_predicate(
    files: &[(String, usize)],
    predicate: Arc<dyn PhysicalExpr>,
    prune_stats: bool,
    schema: Arc<Schema>,
    ctx: &SessionContext,
) -> Vec<String> {
    let scan =
        FileScanConfig::new(ObjectStoreUrl::parse("memory://").unwrap(), schema.clone())
            .with_file_group(
                files
                    .iter()
                    .map(|(path, size)| PartitionedFile::new(path.clone(), *size as u64))
                    .collect(),
            );
    let mut builder = ParquetExecBuilder::new(scan);
    if prune_stats {
        builder = builder.with_predicate(predicate.clone())
    }
    let exec = Arc::new(builder.build()) as Arc<dyn ExecutionPlan>;
    let exec =
        Arc::new(FilterExec::try_new(predicate, exec).unwrap()) as Arc<dyn ExecutionPlan>;

    let batches = collect(exec, ctx.task_ctx()).await.unwrap();
    let mut values = vec![];
    for batch in batches {
        let column = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        for i in 0..column.len() {
            values.push(column.value(i).to_string());
        }
    }
    values
}

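/// Writes a single Parquet file into an in-memory buffer with one row group per entry of
/// `row_groups`. If `truncation_length` is given it is applied via `set_max_statistics_size`
/// so the stored min/max statistics are truncated.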
async fn write_parquet_file(
    truncation_length: Option<usize>,
    schema: Arc<Schema>,
    row_groups: Vec<Vec<String>>,
) -> Bytes {
    let mut buf = BytesMut::new().writer();
    let mut props = WriterProperties::builder();
    if let Some(truncation_length) = truncation_length {
        props = props.set_max_statistics_size(truncation_length);
    }
    props = props.set_statistics_enabled(EnabledStatistics::Chunk); // row group level
    let props = props.build();
    // The inner scope ends the writer's mutable borrow of `buf` so the buffer can be frozen below.
    {
        let mut writer =
            ArrowWriter::try_new(&mut buf, schema.clone(), Some(props)).unwrap();
        for rg_values in row_groups.iter() {
            let arr = StringArray::from_iter_values(rg_values.iter());
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(arr)]).unwrap();
            writer.write(&batch).unwrap();
            writer.flush().unwrap(); // finishes the current row group and starts a new one
        }
        writer.finish().unwrap();
    }
    buf.into_inner().freeze()
}