Skip to content

Commit 374d017

Browse files
committed
Merge branch 'master' into parquet-uuid-schema
2 parents 7896455 + 3293a8c commit 374d017

File tree

15 files changed

+311
-105
lines changed

15 files changed

+311
-105
lines changed

arrow-data/src/data.rs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -693,15 +693,21 @@ impl ArrayData {
693693
///
694694
/// This can be useful for when interacting with data sent over IPC or FFI, that may
695695
/// not meet the minimum alignment requirements
696+
///
697+
/// This also aligns buffers of children data
696698
pub fn align_buffers(&mut self) {
697699
let layout = layout(&self.data_type);
698700
for (buffer, spec) in self.buffers.iter_mut().zip(&layout.buffers) {
699701
if let BufferSpec::FixedWidth { alignment, .. } = spec {
700702
if buffer.as_ptr().align_offset(*alignment) != 0 {
701-
*buffer = Buffer::from_slice_ref(buffer.as_ref())
703+
*buffer = Buffer::from_slice_ref(buffer.as_ref());
702704
}
703705
}
704706
}
707+
// align children data recursively
708+
for data in self.child_data.iter_mut() {
709+
data.align_buffers()
710+
}
705711
}
706712

707713
/// "cheap" validation of an `ArrayData`. Ensures buffers are
@@ -1961,7 +1967,7 @@ impl From<ArrayData> for ArrayDataBuilder {
19611967
#[cfg(test)]
19621968
mod tests {
19631969
use super::*;
1964-
use arrow_schema::Field;
1970+
use arrow_schema::{Field, Fields};
19651971

19661972
// See arrow/tests/array_data_validation.rs for test of array validation
19671973

@@ -2224,6 +2230,7 @@ mod tests {
22242230
};
22252231
data.validate_full().unwrap();
22262232

2233+
// break alignment in data
22272234
data.buffers[0] = sliced;
22282235
let err = data.validate().unwrap_err();
22292236

@@ -2236,6 +2243,44 @@ mod tests {
22362243
data.validate_full().unwrap();
22372244
}
22382245

2246+
#[test]
2247+
fn test_alignment_struct() {
2248+
let buffer = Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]);
2249+
let sliced = buffer.slice(1);
2250+
2251+
let child_data = ArrayData {
2252+
data_type: DataType::Int32,
2253+
len: 0,
2254+
offset: 0,
2255+
buffers: vec![buffer],
2256+
child_data: vec![],
2257+
nulls: None,
2258+
};
2259+
2260+
let schema = DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, false)]));
2261+
let mut data = ArrayData {
2262+
data_type: schema,
2263+
len: 0,
2264+
offset: 0,
2265+
buffers: vec![],
2266+
child_data: vec![child_data],
2267+
nulls: None,
2268+
};
2269+
data.validate_full().unwrap();
2270+
2271+
// break alignment in child data
2272+
data.child_data[0].buffers[0] = sliced;
2273+
let err = data.validate().unwrap_err();
2274+
2275+
assert_eq!(
2276+
err.to_string(),
2277+
"Invalid argument error: Misaligned buffers[0] in array of type Int32, offset from expected alignment of 4 by 1"
2278+
);
2279+
2280+
data.align_buffers();
2281+
data.validate_full().unwrap();
2282+
}
2283+
22392284
#[test]
22402285
fn test_null_view_types() {
22412286
let array_len = 32;

arrow-flight/gen/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,4 @@ publish = false
3434
# (and checked in) arrow.flight.protocol.rs from changing
3535
proc-macro2 = { version = "=1.0.86", default-features = false }
3636
prost-build = { version = "=0.13.3", default-features = false }
37-
tonic-build = { version = "=0.12.2", default-features = false, features = ["transport", "prost"] }
37+
tonic-build = { version = "=0.12.3", default-features = false, features = ["transport", "prost"] }

arrow-flight/src/arrow.flight.protocol.rs

Lines changed: 37 additions & 33 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

arrow-select/src/take.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,65 @@ pub fn take(
9494
}
9595
}
9696

97+
/// For each [ArrayRef] in the [`Vec<ArrayRef>`], take elements by index and create a new
98+
/// [`Vec<ArrayRef>`] from those indices.
99+
///
100+
/// ```text
101+
/// ┌────────┬────────┐
102+
/// │ │ │ ┌────────┐ ┌────────┬────────┐
103+
/// │ A │ 1 │ │ │ │ │ │
104+
/// ├────────┼────────┤ │ 0 │ │ A │ 1 │
105+
/// │ │ │ ├────────┤ ├────────┼────────┤
106+
/// │ D │ 4 │ │ │ │ │ │
107+
/// ├────────┼────────┤ │ 2 │ take_arrays(values,indices) │ B │ 2 │
108+
/// │ │ │ ├────────┤ ├────────┼────────┤
109+
/// │ B │ 2 │ │ │ ───────────────────────────► │ │ │
110+
/// ├────────┼────────┤ │ 3 │ │ C │ 3 │
111+
/// │ │ │ ├────────┤ ├────────┼────────┤
112+
/// │ C │ 3 │ │ │ │ │ │
113+
/// ├────────┼────────┤ │ 1 │ │ D │ 4 │
114+
/// │ │ │ └────────┘ └────────┼────────┘
115+
/// │ E │ 5 │
116+
/// └────────┴────────┘
117+
/// values arrays indices array result
118+
/// ```
119+
///
120+
/// # Errors
121+
/// This function errors whenever:
122+
/// * An index cannot be casted to `usize` (typically 32 bit architectures)
123+
/// * An index is out of bounds and `options` is set to check bounds.
124+
///
125+
/// # Safety
126+
///
127+
/// When `options` is not set to check bounds, taking indexes after `len` will panic.
128+
///
129+
/// # Examples
130+
/// ```
131+
/// # use std::sync::Arc;
132+
/// # use arrow_array::{StringArray, UInt32Array, cast::AsArray};
133+
/// # use arrow_select::take::{take, take_arrays};
134+
/// let string_values = Arc::new(StringArray::from(vec!["zero", "one", "two"]));
135+
/// let values = Arc::new(UInt32Array::from(vec![0, 1, 2]));
136+
///
137+
/// // Take items at index 2, and 1:
138+
/// let indices = UInt32Array::from(vec![2, 1]);
139+
/// let taken_arrays = take_arrays(&[string_values, values], &indices, None).unwrap();
140+
/// let taken_string = taken_arrays[0].as_string::<i32>();
141+
/// assert_eq!(*taken_string, StringArray::from(vec!["two", "one"]));
142+
/// let taken_values = taken_arrays[1].as_primitive();
143+
/// assert_eq!(*taken_values, UInt32Array::from(vec![2, 1]));
144+
/// ```
145+
pub fn take_arrays(
146+
arrays: &[ArrayRef],
147+
indices: &dyn Array,
148+
options: Option<TakeOptions>,
149+
) -> Result<Vec<ArrayRef>, ArrowError> {
150+
arrays
151+
.iter()
152+
.map(|array| take(array.as_ref(), indices, options.clone()))
153+
.collect()
154+
}
155+
97156
/// Verifies that the non-null values of `indices` are all `< len`
98157
fn check_bounds<T: ArrowPrimitiveType>(
99158
len: usize,

parquet/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ twox-hash = { version = "1.6", default-features = false }
6868
paste = { version = "1.0" }
6969
half = { version = "2.1", default-features = false, features = ["num-traits"] }
7070
sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] }
71+
crc32fast = { version = "1.4.2", optional = true, default-features = false }
7172

7273
[dev-dependencies]
7374
base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -117,6 +118,8 @@ object_store = ["dep:object_store", "async"]
117118
zstd = ["dep:zstd", "zstd-sys"]
118119
# Display memory in example/write_parquet.rs
119120
sysinfo = ["dep:sysinfo"]
121+
# Verify 32-bit CRC checksum when decoding parquet pages
122+
crc = ["dep:crc32fast"]
120123

121124
[[example]]
122125
name = "read_parquet"

parquet/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ The `parquet` crate provides the following features which may be enabled in your
6060
- `zstd` (default) - support for parquet using `zstd` compression
6161
- `snap` (default) - support for parquet using `snappy` compression
6262
- `cli` - parquet [CLI tools](https://github.com/apache/arrow-rs/tree/master/parquet/src/bin)
63+
- `crc` - enables functionality to automatically verify checksums of each page (if present) when decoding
6364
- `experimental` - Experimental APIs which may change, even between minor releases
6465

6566
## Parquet Feature Status
@@ -82,4 +83,4 @@ The `parquet` crate provides the following features which may be enabled in your
8283

8384
## License
8485

85-
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
86+
Licensed under the Apache License, Version 2.0: <http://www.apache.org/licenses/LICENSE-2.0>.

parquet/src/arrow/async_reader/metadata.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
5252
/// Create a new [`MetadataLoader`] by reading the footer information
5353
///
5454
/// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
55+
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
5556
pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
5657
if file_size < FOOTER_SIZE {
5758
return Err(ParquetError::EOF(format!(
@@ -108,6 +109,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
108109
}
109110

110111
/// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
112+
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
111113
pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
112114
Self {
113115
fetch,
@@ -120,6 +122,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
120122
///
121123
/// * `column_index`: if true will load column index
122124
/// * `offset_index`: if true will load offset index
125+
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
123126
pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
124127
if !column_index && !offset_index {
125128
return Ok(());
@@ -226,6 +229,7 @@ where
226229
/// in the first request, instead of 8, and only issue further requests
227230
/// if additional bytes are needed. Providing a `prefetch` hint can therefore
228231
/// significantly reduce the number of `fetch` requests, and consequently latency
232+
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
229233
pub async fn fetch_parquet_metadata<F, Fut>(
230234
fetch: F,
231235
file_size: usize,
@@ -236,10 +240,14 @@ where
236240
Fut: Future<Output = Result<Bytes>> + Send,
237241
{
238242
let fetch = MetadataFetchFn(fetch);
239-
let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
240-
Ok(loader.finish())
243+
ParquetMetaDataReader::new()
244+
.with_prefetch_hint(prefetch)
245+
.load_and_finish(fetch, file_size)
246+
.await
241247
}
242248

249+
// these tests are all replicated in parquet::file::metadata::reader
250+
#[allow(deprecated)]
243251
#[cfg(test)]
244252
mod tests {
245253
use super::*;

0 commit comments

Comments
 (0)