diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 4504c81daa06..50cafc4d9fa1 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -123,6 +123,7 @@ mod tests {
         Arc::new(ParquetExec::new(
             vec![ParquetPartition::new(
                 vec!["x".to_string()],
+                vec![schema.clone()],
                 Statistics::default(),
             )],
             schema,
@@ -161,6 +162,7 @@ mod tests {
         Arc::new(ParquetExec::new(
             vec![ParquetPartition::new(
                 vec!["x".to_string()],
+                vec![schema.clone()],
                 Statistics::default(),
             )],
             schema,
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index ff8bb5b32678..094544699920 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -96,6 +96,8 @@ pub struct ParquetExec {
 pub struct ParquetPartition {
     /// The Parquet filename for this partition
     pub filenames: Vec<String>,
+    /// Unique schemas in this partition
+    pub(crate) schemas: Vec<SchemaRef>,
     /// Statistics for this partition
     pub statistics: Statistics,
     /// Execution metrics
@@ -167,235 +169,40 @@ impl ParquetExec {
             filenames, projection, predicate, limit);
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
-        let mut schemas: Vec<Schema> = vec![];
+        let mut schemas: Vec<SchemaRef> = vec![];
         let mut partitions = Vec::with_capacity(max_concurrency);
         let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
         let chunks = split_files(&filenames, max_concurrency);
-        let mut num_rows = 0;
-        let mut num_fields = 0;
-        let mut fields = Vec::new();
-        let mut total_byte_size = 0;
-        let mut null_counts = Vec::new();
-        let mut max_values: Vec<Option<MaxAccumulator>> = Vec::new();
-        let mut min_values: Vec<Option<MinAccumulator>> = Vec::new();
-        let mut limit_exhausted = false;
-        for chunk in chunks {
-            let mut filenames: Vec<String> =
-                chunk.iter().map(|x| x.to_string()).collect();
-            let mut total_files = 0;
-            for filename in &filenames {
-                total_files += 1;
-                let file = File::open(filename)?;
-                let file_reader = Arc::new(SerializedFileReader::new(file)?);
-                let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-                let meta_data = arrow_reader.get_metadata();
-                // collect all the unique schemas in this data set
-                let schema = arrow_reader.get_schema()?;
-                if schemas.is_empty() || schema != schemas[0] {
-                    fields = schema.fields().to_vec();
-                    num_fields = schema.fields().len();
-                    null_counts = vec![0; num_fields];
-                    max_values = schema
-                        .fields()
-                        .iter()
-                        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
-                        .collect::<Vec<_>>();
-                    min_values = schema
-                        .fields()
-                        .iter()
-                        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
-                        .collect::<Vec<_>>();
-                    schemas.push(schema);
-                }
-                for row_group_meta in meta_data.row_groups() {
-                    num_rows += row_group_meta.num_rows();
-                    total_byte_size += row_group_meta.total_byte_size();
-
-                    // Currently assumes every Parquet file has same schema
-                    // https://issues.apache.org/jira/browse/ARROW-11017
-                    let columns_null_counts = row_group_meta
-                        .columns()
-                        .iter()
-                        .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
-
-                    for (i, cnt) in columns_null_counts.enumerate() {
-                        null_counts[i] += cnt
-                    }
-
-                    for (i, column) in row_group_meta.columns().iter().enumerate() {
-                        if let Some(stat) = column.statistics() {
-                            match stat {
-                                ParquetStatistics::Boolean(s) => {
-                                    if let DataType::Boolean = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Boolean(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Boolean(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Int32(s) => {
-                                    if let DataType::Int32 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Int32(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Int32(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Int64(s) => {
-                                    if let DataType::Int64 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Int64(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Int64(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Float(s) => {
-                                    if let DataType::Float32 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Float32(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Float32(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Double(s) => {
-                                    if let DataType::Float64 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Float64(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Float64(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                _ => {}
-                            }
-                        }
-                    }
+        // collect statistics for all partitions - we should really do this in parallel threads
+        for chunk in chunks {
+            let filenames: Vec<String> = chunk.iter().map(|x| x.to_string()).collect();
+            partitions.push(ParquetPartition::try_from_files(filenames, limit)?);
+        }
 
-                    if limit.map(|x| num_rows >= x as i64).unwrap_or(false) {
-                        limit_exhausted = true;
-                        break;
-                    }
-                }
-            }
-
+        // apply limit across all partitions
+        let partitions = match limit {
+            Some(l) => {
+                let mut partitions_with_limit = vec![];
+                let mut total_rows = 0;
+                for part in partitions {
+                    total_rows += part.statistics.num_rows.unwrap_or(0);
+                    if total_rows >= l {
+                        break;
+                    }
+                    partitions_with_limit.push(part)
+                }
+                partitions_with_limit
+            }
-            let column_stats = (0..num_fields)
-                .map(|i| {
-                    let max_value = match &max_values[i] {
-                        Some(max_value) => max_value.evaluate().ok(),
-                        None => None,
-                    };
-                    let min_value = match &min_values[i] {
-                        Some(min_value) => min_value.evaluate().ok(),
-                        None => None,
-                    };
-                    ColumnStatistics {
-                        null_count: Some(null_counts[i] as usize),
-                        max_value,
-                        min_value,
-                        distinct_count: None,
-                    }
-                })
-                .collect();
-
-            let statistics = Statistics {
-                num_rows: Some(num_rows as usize),
-                total_byte_size: Some(total_byte_size as usize),
-                column_statistics: Some(column_stats),
-            };
-            // remove files that are not needed in case of limit
-            filenames.truncate(total_files);
-            partitions.push(ParquetPartition::new(filenames, statistics));
-            if limit_exhausted {
-                break;
+            None => partitions,
+        };
+
+        // collect unique schemas across all partitions
+        for part in &partitions {
+            for schema in &part.schemas {
+                if schemas.is_empty() || schema.as_ref() != schemas[0].as_ref() {
+                    schemas.push(schema.clone())
+                }
             }
         }
@@ -409,7 +216,7 @@ impl ParquetExec {
                 schemas.len()
             )));
         }
-        let schema = Arc::new(schemas.pop().unwrap());
+        let schema = schemas.pop().unwrap();
         let metrics = ParquetExecMetrics::new();
 
         let predicate_builder = predicate.and_then(|predicate_expr| {
@@ -582,14 +389,244 @@ impl ParquetExec {
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,
+        schemas: Vec<SchemaRef>,
+        statistics: Statistics,
+    ) -> Self {
         Self {
             filenames,
+            schemas,
             statistics,
             metrics: ParquetPartitionMetrics::new(),
         }
     }
 
+    /// Create a new parquet partition by scanning a set of files
+    pub fn try_from_files(filenames: Vec<String>, limit: Option<usize>) -> Result<Self> {
+        let mut num_rows = 0;
+        let mut num_fields = 0;
+        let mut fields = Vec::new();
+        let mut total_byte_size = 0;
+        let mut null_counts = Vec::new();
+        let mut max_values: Vec<Option<MaxAccumulator>> = Vec::new();
+        let mut min_values: Vec<Option<MinAccumulator>> = Vec::new();
+        let mut total_files = 0;
+        let mut schemas: Vec<SchemaRef> = vec![];
+        for filename in &filenames {
+            total_files += 1;
+            let file = File::open(filename)?;
+            let file_reader = Arc::new(SerializedFileReader::new(file)?);
+            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+            let meta_data = arrow_reader.get_metadata();
+            // collect all the unique schemas in this data set
+            let schema = arrow_reader.get_schema()?;
+            if schemas.is_empty() || &schema != schemas[0].as_ref() {
+                fields = schema.fields().to_vec();
+                num_fields = schema.fields().len();
+                null_counts = vec![0; num_fields];
+                max_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                min_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MinAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                schemas.push(Arc::new(schema));
+            }
+
+            for row_group_meta in meta_data.row_groups() {
+                num_rows += row_group_meta.num_rows();
+                total_byte_size += row_group_meta.total_byte_size();
+
+                // Currently assumes every Parquet file has same schema
+                // https://issues.apache.org/jira/browse/ARROW-11017
+                let columns_null_counts = row_group_meta
+                    .columns()
+                    .iter()
+                    .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
+
+                for (i, cnt) in columns_null_counts.enumerate() {
+                    null_counts[i] += cnt
+                }
+
+                for (i, column) in row_group_meta.columns().iter().enumerate() {
+                    if let Some(stat) = column.statistics() {
+                        match stat {
+                            ParquetStatistics::Boolean(s) => {
+                                if let DataType::Boolean = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int32(s) => {
+                                if let DataType::Int32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int32(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int32(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int64(s) => {
+                                if let DataType::Int64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int64(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int64(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Float(s) => {
+                                if let DataType::Float32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float32(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float32(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Double(s) => {
+                                if let DataType::Float64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float64(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float64(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+
+                if limit.map(|x| num_rows >= x as i64).unwrap_or(false) {
+                    break;
+                }
+            }
+        }
+        let column_stats = (0..num_fields)
+            .map(|i| {
+                let max_value = match &max_values[i] {
+                    Some(max_value) => max_value.evaluate().ok(),
+                    None => None,
+                };
+                let min_value = match &min_values[i] {
+                    Some(min_value) => min_value.evaluate().ok(),
+                    None => None,
+                };
+                ColumnStatistics {
+                    null_count: Some(null_counts[i] as usize),
+                    max_value,
+                    min_value,
+                    distinct_count: None,
+                }
+            })
+            .collect();
+
+        let statistics = Statistics {
+            num_rows: Some(num_rows as usize),
+            total_byte_size: Some(total_byte_size as usize),
+            column_statistics: Some(column_stats),
+        };
+        // remove files that are not needed in case of limit
+        let mut filenames = filenames;
+        filenames.truncate(total_files);
+
+        Ok(Self::new(filenames.to_vec(), schemas, statistics))
+    }
+
     /// The Parquet filename for this partition
     pub fn filenames(&self) -> &[String] {
         &self.filenames