
Commit 980931c

refactor: Move various parts of datasource out of core (#14616)
* exploration with cargo machete
* readdition
* more dependency removals
* fix: ci
* fix: format
* revert unnecessary
* First Iteration
* fix: forgotten Header
* Tweaks
1 parent a0b68d9 commit 980931c

File tree

10 files changed: +564 -476 lines changed

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use object_store::{path::Path, ObjectMeta};

use crate::FileRange;

/// A single file or part of a file that should be read, along with its schema, statistics
pub struct FileMeta {
    /// Path for the file (e.g. URL, filesystem path, etc)
    pub object_meta: ObjectMeta,
    /// An optional file range for a more fine-grained parallel execution
    pub range: Option<FileRange>,
    /// An optional field for user defined per object metadata
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// Size hint for the metadata of this file
    pub metadata_size_hint: Option<usize>,
}

impl FileMeta {
    /// The full path to the object
    pub fn location(&self) -> &Path {
        &self.object_meta.location
    }
}

impl From<ObjectMeta> for FileMeta {
    fn from(object_meta: ObjectMeta) -> Self {
        Self {
            object_meta,
            range: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }
}
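
For orientation while reviewing, a minimal sketch of how the `From<ObjectMeta>` conversion is intended to be used. The `FileMeta` import path is elided (it depends on where this module lands after the move), and the `ObjectMeta` values are placeholders, not part of this diff:

use chrono::Utc;
use object_store::{path::Path, ObjectMeta};

// `FileMeta` import elided: its path depends on the new crate layout.

fn file_meta_example() {
    // In practice this comes from an `ObjectStore` listing or a
    // `store.head(&path)` call; the values here are placeholders.
    let object_meta = ObjectMeta {
        location: Path::from("data/part-0.parquet"),
        last_modified: Utc::now(),
        size: 1024,
        e_tag: None,
        version: None,
    };

    // `From<ObjectMeta>` leaves the optional fields unset: no byte
    // range, no user extensions, no metadata size hint.
    let file_meta: FileMeta = object_meta.into();
    assert!(file_meta.range.is_none());
    assert_eq!(file_meta.location().as_ref(), "data/part-0.parquet");
}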
Lines changed: 278 additions & 0 deletions
@@ -0,0 +1,278 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{borrow::Cow, collections::HashMap, marker::PhantomData, sync::Arc};

use arrow::{
    array::{
        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
        RecordBatchOptions,
    },
    buffer::Buffer,
    datatypes::{ArrowNativeType, DataType, SchemaRef, UInt16Type},
};
use datafusion_common::{exec_err, Result};
use datafusion_common::{DataFusionError, ScalarValue};
use log::warn;

/// A helper that projects partition columns into the file record batches.
///
/// One interesting trick is the usage of a cache for the key buffers of the partition column
/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
pub struct PartitionColumnProjector {
    /// An Arrow buffer initialized to zeros that represents the key array of all partition
    /// columns (partition columns are materialized by dictionary arrays with only one
    /// value in the dictionary, thus all the keys are equal to zero).
    key_buffer_cache: ZeroBufferGenerators,
    /// Mapping between the indexes in the list of partition columns and the target
    /// schema. Sorted by index in the target schema so that we can iterate on it to
    /// insert the partition columns in the target record batch.
    projected_partition_indexes: Vec<(usize, usize)>,
    /// The schema of the table once the projection was applied.
    projected_schema: SchemaRef,
}

impl PartitionColumnProjector {
    // Create a projector to insert the partitioning columns into batches read from files
    // - `projected_schema`: the target schema with both file and partitioning columns
    // - `table_partition_cols`: all the partitioning column names
    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
        let mut idx_map = HashMap::new();
        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
                idx_map.insert(partition_idx, schema_idx);
            }
        }

        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));

        Self {
            projected_partition_indexes,
            key_buffer_cache: Default::default(),
            projected_schema,
        }
    }

    // Transform the batch read from the file by inserting the partitioning columns
    // to the right positions as deduced from `projected_schema`
    // - `file_batch`: batch read from the file, with internal projection applied
    // - `partition_values`: the list of partition values, one for each partition column
    pub fn project(
        &mut self,
        file_batch: RecordBatch,
        partition_values: &[ScalarValue],
    ) -> Result<RecordBatch> {
        let expected_cols =
            self.projected_schema.fields().len() - self.projected_partition_indexes.len();

        if file_batch.columns().len() != expected_cols {
            return exec_err!(
                "Unexpected batch schema from file, expected {} cols but got {}",
                expected_cols,
                file_batch.columns().len()
            );
        }

        let mut cols = file_batch.columns().to_vec();
        for &(pidx, sidx) in &self.projected_partition_indexes {
            let p_value =
                partition_values
                    .get(pidx)
                    .ok_or(DataFusionError::Execution(
                        "Invalid partitioning found on disk".to_string(),
                    ))?;

            let mut partition_value = Cow::Borrowed(p_value);

            // check if user forgot to dict-encode the partition value
            let field = self.projected_schema.field(sidx);
            let expected_data_type = field.data_type();
            let actual_data_type = partition_value.data_type();
            if let DataType::Dictionary(key_type, _) = expected_data_type {
                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
                    warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
                    partition_value = Cow::Owned(ScalarValue::Dictionary(
                        key_type.clone(),
                        Box::new(partition_value.as_ref().clone()),
                    ));
                }
            }

            cols.insert(
                sidx,
                create_output_array(
                    &mut self.key_buffer_cache,
                    partition_value.as_ref(),
                    file_batch.num_rows(),
                )?,
            )
        }

        RecordBatch::try_new_with_options(
            Arc::clone(&self.projected_schema),
            cols,
            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
        )
        .map_err(Into::into)
    }
}

#[derive(Debug, Default)]
struct ZeroBufferGenerators {
    gen_i8: ZeroBufferGenerator<i8>,
    gen_i16: ZeroBufferGenerator<i16>,
    gen_i32: ZeroBufferGenerator<i32>,
    gen_i64: ZeroBufferGenerator<i64>,
    gen_u8: ZeroBufferGenerator<u8>,
    gen_u16: ZeroBufferGenerator<u16>,
    gen_u32: ZeroBufferGenerator<u32>,
    gen_u64: ZeroBufferGenerator<u64>,
}

/// Generate an arrow [`Buffer`] that contains zero values.
#[derive(Debug, Default)]
struct ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    cache: Option<Buffer>,
    _t: PhantomData<T>,
}

impl<T> ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    const SIZE: usize = size_of::<T>();

    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
        match &mut self.cache {
            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
                buf.slice_with_length(0, n_vals * Self::SIZE)
            }
            _ => {
                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
                key_buffer_builder.advance(n_vals); // keys are all 0
                self.cache.insert(key_buffer_builder.finish()).clone()
            }
        }
    }
}

fn create_dict_array<T>(
    buffer_gen: &mut ZeroBufferGenerator<T>,
    dict_val: &ScalarValue,
    len: usize,
    data_type: DataType,
) -> Result<ArrayRef>
where
    T: ArrowNativeType,
{
    let dict_vals = dict_val.to_array()?;

    let sliced_key_buffer = buffer_gen.get_buffer(len);

    // assemble pieces together
    let mut builder = ArrayData::builder(data_type)
        .len(len)
        .add_buffer(sliced_key_buffer);
    builder = builder.add_child_data(dict_vals.to_data());
    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
        builder.build().unwrap(),
    )))
}

fn create_output_array(
    key_buffer_cache: &mut ZeroBufferGenerators,
    val: &ScalarValue,
    len: usize,
) -> Result<ArrayRef> {
    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
        match key_type.as_ref() {
            DataType::Int8 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i8,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int16 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i16,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int32 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i32,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int64 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i64,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt8 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u8,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt16 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u16,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt32 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u32,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt64 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u64,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            _ => {}
        }
    }

    val.to_array_of_size(len)
}
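
Likewise, a minimal usage sketch for the projector above (again with the import elided since it depends on the new crate layout, and the schema and values invented for illustration). The target schema declares the partition column as a dictionary, the file batch carries only the file columns, and `project` splices the constant partition value in at its schema index, reusing the cached all-zero key buffer from batch to batch:

use std::sync::Arc;

use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Result, ScalarValue};

// `PartitionColumnProjector` import elided: it depends on the new crate layout.

fn projector_example() -> Result<()> {
    // Target schema: one file column (`c1`) plus one partition column
    // (`date`), dictionary-encoded since it is constant within a file.
    let projected_schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new(
            "date",
            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
            false,
        ),
    ]));

    let mut projector = PartitionColumnProjector::new(
        Arc::clone(&projected_schema),
        &["date".to_string()],
    );

    // A batch as it comes out of the file scan: file columns only.
    let file_schema =
        Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
    let file_batch = RecordBatch::try_new(
        file_schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // One value per partition column, already dictionary-encoded
    // (otherwise `project` warns and auto-wraps it).
    let partition_values = [ScalarValue::Dictionary(
        Box::new(DataType::UInt16),
        Box::new(ScalarValue::from("2025-02-11")),
    )];

    let batch = projector.project(file_batch, &partition_values)?;
    assert_eq!(batch.num_columns(), 2);
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}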
