Skip to content

Commit 39aa15e

Browse files
Change schema_infer_max_rec config to use Option<usize> rather than usize (#13250)
* Make schema_infer_max_rec an Option
* Add lifetime parameter to CSV and compression BoxStreams
1 parent 6612d7c commit 39aa15e

File tree

13 files changed

+66
-55
lines changed

13 files changed

+66
-55
lines changed

datafusion/common/src/config.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1648,7 +1648,7 @@ config_namespace! {
16481648
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
16491649
pub newlines_in_values: Option<bool>, default = None
16501650
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
1651-
pub schema_infer_max_rec: usize, default = 100
1651+
pub schema_infer_max_rec: Option<usize>, default = None
16521652
pub date_format: Option<String>, default = None
16531653
pub datetime_format: Option<String>, default = None
16541654
pub timestamp_format: Option<String>, default = None
@@ -1673,7 +1673,7 @@ impl CsvOptions {
16731673
/// Set a limit in terms of records to scan to infer the schema
16741674
/// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
16751675
pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
1676-
self.schema_infer_max_rec = max_rec;
1676+
self.schema_infer_max_rec = Some(max_rec);
16771677
self
16781678
}
16791679

@@ -1773,7 +1773,7 @@ config_namespace! {
17731773
/// Options controlling JSON format
17741774
pub struct JsonOptions {
17751775
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
1776-
pub schema_infer_max_rec: usize, default = 100
1776+
pub schema_infer_max_rec: Option<usize>, default = None
17771777
}
17781778
}
17791779

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::fmt::{self, Debug};
2323
use std::sync::Arc;
2424

2525
use super::write::orchestration::stateless_multipart_put;
26-
use super::{FileFormat, FileFormatFactory};
26+
use super::{FileFormat, FileFormatFactory, DEFAULT_SCHEMA_INFER_MAX_RECORD};
2727
use crate::datasource::file_format::file_compression_type::FileCompressionType;
2828
use crate::datasource::file_format::write::BatchSerializer;
2929
use crate::datasource::physical_plan::{
@@ -137,11 +137,11 @@ impl CsvFormat {
137137
/// Return a newline delimited stream from the specified file on
138138
/// Stream, decompressing if necessary
139139
/// Each returned `Bytes` has a whole number of newline delimited rows
140-
async fn read_to_delimited_chunks(
140+
async fn read_to_delimited_chunks<'a>(
141141
&self,
142142
store: &Arc<dyn ObjectStore>,
143143
object: &ObjectMeta,
144-
) -> BoxStream<'static, Result<Bytes>> {
144+
) -> BoxStream<'a, Result<Bytes>> {
145145
// stream to only read as many rows as needed into memory
146146
let stream = store
147147
.get(&object.location)
@@ -165,10 +165,10 @@ impl CsvFormat {
165165
stream.boxed()
166166
}
167167

168-
async fn read_to_delimited_chunks_from_stream(
168+
async fn read_to_delimited_chunks_from_stream<'a>(
169169
&self,
170-
stream: BoxStream<'static, Result<Bytes>>,
171-
) -> BoxStream<'static, Result<Bytes>> {
170+
stream: BoxStream<'a, Result<Bytes>>,
171+
) -> BoxStream<'a, Result<Bytes>> {
172172
let file_compression_type: FileCompressionType = self.options.compression.into();
173173
let decoder = file_compression_type.convert_stream(stream);
174174
let steam = match decoder {
@@ -204,7 +204,7 @@ impl CsvFormat {
204204
/// Set a limit in terms of records to scan to infer the schema
205205
/// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
206206
pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
207-
self.options.schema_infer_max_rec = max_rec;
207+
self.options.schema_infer_max_rec = Some(max_rec);
208208
self
209209
}
210210

@@ -319,7 +319,10 @@ impl FileFormat for CsvFormat {
319319
) -> Result<SchemaRef> {
320320
let mut schemas = vec![];
321321

322-
let mut records_to_read = self.options.schema_infer_max_rec;
322+
let mut records_to_read = self
323+
.options
324+
.schema_infer_max_rec
325+
.unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
323326

324327
for object in objects {
325328
let stream = self.read_to_delimited_chunks(store, object).await;
@@ -945,7 +948,10 @@ mod tests {
945948
let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
946949
let path = Path::from("csv/aggregate_test_100.csv");
947950
let csv = CsvFormat::default().with_has_header(true);
948-
let records_to_read = csv.options().schema_infer_max_rec;
951+
let records_to_read = csv
952+
.options()
953+
.schema_infer_max_rec
954+
.unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
949955
let store = Arc::new(integration) as Arc<dyn ObjectStore>;
950956
let original_stream = store.get(&path).await?;
951957

datafusion/core/src/datasource/file_format/file_compression_type.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,10 @@ impl FileCompressionType {
123123
}
124124

125125
/// Given a `Stream`, create a `Stream` which data are compressed with `FileCompressionType`.
126-
pub fn convert_to_compress_stream(
126+
pub fn convert_to_compress_stream<'a>(
127127
&self,
128-
s: BoxStream<'static, Result<Bytes>>,
129-
) -> Result<BoxStream<'static, Result<Bytes>>> {
128+
s: BoxStream<'a, Result<Bytes>>,
129+
) -> Result<BoxStream<'a, Result<Bytes>>> {
130130
Ok(match self.variant {
131131
#[cfg(feature = "compression")]
132132
GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
@@ -180,10 +180,10 @@ impl FileCompressionType {
180180
}
181181

182182
/// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
183-
pub fn convert_stream(
183+
pub fn convert_stream<'a>(
184184
&self,
185-
s: BoxStream<'static, Result<Bytes>>,
186-
) -> Result<BoxStream<'static, Result<Bytes>>> {
185+
s: BoxStream<'a, Result<Bytes>>,
186+
) -> Result<BoxStream<'a, Result<Bytes>>> {
187187
Ok(match self.variant {
188188
#[cfg(feature = "compression")]
189189
GZIP => {

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use std::io::BufReader;
2525
use std::sync::Arc;
2626

2727
use super::write::orchestration::stateless_multipart_put;
28-
use super::{FileFormat, FileFormatFactory, FileScanConfig};
28+
use super::{
29+
FileFormat, FileFormatFactory, FileScanConfig, DEFAULT_SCHEMA_INFER_MAX_RECORD,
30+
};
2931
use crate::datasource::file_format::file_compression_type::FileCompressionType;
3032
use crate::datasource::file_format::write::BatchSerializer;
3133
use crate::datasource::physical_plan::FileGroupDisplay;
@@ -147,7 +149,7 @@ impl JsonFormat {
147149
/// Set a limit in terms of records to scan to infer the schema
148150
/// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
149151
pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
150-
self.options.schema_infer_max_rec = max_rec;
152+
self.options.schema_infer_max_rec = Some(max_rec);
151153
self
152154
}
153155

@@ -187,7 +189,10 @@ impl FileFormat for JsonFormat {
187189
objects: &[ObjectMeta],
188190
) -> Result<SchemaRef> {
189191
let mut schemas = Vec::new();
190-
let mut records_to_read = self.options.schema_infer_max_rec;
192+
let mut records_to_read = self
193+
.options
194+
.schema_infer_max_rec
195+
.unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
191196
let file_compression_type = FileCompressionType::from(self.options.compression);
192197
for object in objects {
193198
let mut take_while = || {

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ mod tests {
254254
let format = listing_table.options().format.clone();
255255
let csv_format = format.as_any().downcast_ref::<CsvFormat>().unwrap();
256256
let csv_options = csv_format.options().clone();
257-
assert_eq!(csv_options.schema_infer_max_rec, 1000);
257+
assert_eq!(csv_options.schema_infer_max_rec, Some(1000));
258258
let listing_options = listing_table.options();
259259
assert_eq!(".tbl", listing_options.file_extension);
260260
}

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ message CsvOptions {
414414
bytes quote = 3; // Quote character as a byte
415415
bytes escape = 4; // Optional escape character as a byte
416416
CompressionTypeVariant compression = 5; // Compression type
417-
uint64 schema_infer_max_rec = 6; // Max records for schema inference
417+
optional uint64 schema_infer_max_rec = 6; // Optional max records for schema inference
418418
string date_format = 7; // Optional date format
419419
string datetime_format = 8; // Optional datetime format
420420
string timestamp_format = 9; // Optional timestamp format
@@ -430,7 +430,7 @@ message CsvOptions {
430430
// Options controlling CSV format
431431
message JsonOptions {
432432
CompressionTypeVariant compression = 1; // Compression type
433-
uint64 schema_infer_max_rec = 2; // Max records for schema inference
433+
optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
434434
}
435435

436436
message TableParquetOptions {

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
869869
double_quote: proto_opts.has_header.first().map(|h| *h != 0),
870870
newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
871871
compression: proto_opts.compression().into(),
872-
schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize,
872+
schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
873873
date_format: (!proto_opts.date_format.is_empty())
874874
.then(|| proto_opts.date_format.clone()),
875875
datetime_format: (!proto_opts.datetime_format.is_empty())
@@ -1050,7 +1050,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
10501050
let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
10511051
Ok(JsonOptions {
10521052
compression: compression.into(),
1053-
schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize,
1053+
schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
10541054
})
10551055
}
10561056
}

datafusion/proto-common/src/generated/pbjson.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1512,7 +1512,7 @@ impl serde::Serialize for CsvOptions {
15121512
if self.compression != 0 {
15131513
len += 1;
15141514
}
1515-
if self.schema_infer_max_rec != 0 {
1515+
if self.schema_infer_max_rec.is_some() {
15161516
len += 1;
15171517
}
15181518
if !self.date_format.is_empty() {
@@ -1571,10 +1571,10 @@ impl serde::Serialize for CsvOptions {
15711571
.map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.compression)))?;
15721572
struct_ser.serialize_field("compression", &v)?;
15731573
}
1574-
if self.schema_infer_max_rec != 0 {
1574+
if let Some(v) = self.schema_infer_max_rec.as_ref() {
15751575
#[allow(clippy::needless_borrow)]
15761576
#[allow(clippy::needless_borrows_for_generic_args)]
1577-
struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&self.schema_infer_max_rec).as_str())?;
1577+
struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&v).as_str())?;
15781578
}
15791579
if !self.date_format.is_empty() {
15801580
struct_ser.serialize_field("dateFormat", &self.date_format)?;
@@ -1787,7 +1787,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
17871787
return Err(serde::de::Error::duplicate_field("schemaInferMaxRec"));
17881788
}
17891789
schema_infer_max_rec__ =
1790-
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
1790+
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
17911791
;
17921792
}
17931793
GeneratedField::DateFormat => {
@@ -1866,7 +1866,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
18661866
quote: quote__.unwrap_or_default(),
18671867
escape: escape__.unwrap_or_default(),
18681868
compression: compression__.unwrap_or_default(),
1869-
schema_infer_max_rec: schema_infer_max_rec__.unwrap_or_default(),
1869+
schema_infer_max_rec: schema_infer_max_rec__,
18701870
date_format: date_format__.unwrap_or_default(),
18711871
datetime_format: datetime_format__.unwrap_or_default(),
18721872
timestamp_format: timestamp_format__.unwrap_or_default(),
@@ -3929,7 +3929,7 @@ impl serde::Serialize for JsonOptions {
39293929
if self.compression != 0 {
39303930
len += 1;
39313931
}
3932-
if self.schema_infer_max_rec != 0 {
3932+
if self.schema_infer_max_rec.is_some() {
39333933
len += 1;
39343934
}
39353935
let mut struct_ser = serializer.serialize_struct("datafusion_common.JsonOptions", len)?;
@@ -3938,10 +3938,10 @@ impl serde::Serialize for JsonOptions {
39383938
.map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.compression)))?;
39393939
struct_ser.serialize_field("compression", &v)?;
39403940
}
3941-
if self.schema_infer_max_rec != 0 {
3941+
if let Some(v) = self.schema_infer_max_rec.as_ref() {
39423942
#[allow(clippy::needless_borrow)]
39433943
#[allow(clippy::needless_borrows_for_generic_args)]
3944-
struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&self.schema_infer_max_rec).as_str())?;
3944+
struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&v).as_str())?;
39453945
}
39463946
struct_ser.end()
39473947
}
@@ -4019,14 +4019,14 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
40194019
return Err(serde::de::Error::duplicate_field("schemaInferMaxRec"));
40204020
}
40214021
schema_infer_max_rec__ =
4022-
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
4022+
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
40234023
;
40244024
}
40254025
}
40264026
}
40274027
Ok(JsonOptions {
40284028
compression: compression__.unwrap_or_default(),
4029-
schema_infer_max_rec: schema_infer_max_rec__.unwrap_or_default(),
4029+
schema_infer_max_rec: schema_infer_max_rec__,
40304030
})
40314031
}
40324032
}

datafusion/proto-common/src/generated/prost.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -572,9 +572,9 @@ pub struct CsvOptions {
572572
/// Compression type
573573
#[prost(enumeration = "CompressionTypeVariant", tag = "5")]
574574
pub compression: i32,
575-
/// Max records for schema inference
576-
#[prost(uint64, tag = "6")]
577-
pub schema_infer_max_rec: u64,
575+
/// Optional max records for schema inference
576+
#[prost(uint64, optional, tag = "6")]
577+
pub schema_infer_max_rec: ::core::option::Option<u64>,
578578
/// Optional date format
579579
#[prost(string, tag = "7")]
580580
pub date_format: ::prost::alloc::string::String,
@@ -612,9 +612,9 @@ pub struct JsonOptions {
612612
/// Compression type
613613
#[prost(enumeration = "CompressionTypeVariant", tag = "1")]
614614
pub compression: i32,
615-
/// Max records for schema inference
616-
#[prost(uint64, tag = "2")]
617-
pub schema_infer_max_rec: u64,
615+
/// Optional max records for schema inference
616+
#[prost(uint64, optional, tag = "2")]
617+
pub schema_infer_max_rec: ::core::option::Option<u64>,
618618
}
619619
#[derive(Clone, PartialEq, ::prost::Message)]
620620
pub struct TableParquetOptions {

datafusion/proto-common/src/to_proto/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
921921
.newlines_in_values
922922
.map_or_else(Vec::new, |h| vec![h as u8]),
923923
compression: compression.into(),
924-
schema_infer_max_rec: opts.schema_infer_max_rec as u64,
924+
schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
925925
date_format: opts.date_format.clone().unwrap_or_default(),
926926
datetime_format: opts.datetime_format.clone().unwrap_or_default(),
927927
timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
@@ -940,7 +940,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
940940
let compression: protobuf::CompressionTypeVariant = opts.compression.into();
941941
Ok(protobuf::JsonOptions {
942942
compression: compression.into(),
943-
schema_infer_max_rec: opts.schema_infer_max_rec as u64,
943+
schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
944944
})
945945
}
946946
}

datafusion/proto/src/generated/datafusion_proto_common.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -572,9 +572,9 @@ pub struct CsvOptions {
572572
/// Compression type
573573
#[prost(enumeration = "CompressionTypeVariant", tag = "5")]
574574
pub compression: i32,
575-
/// Max records for schema inference
576-
#[prost(uint64, tag = "6")]
577-
pub schema_infer_max_rec: u64,
575+
/// Optional max records for schema inference
576+
#[prost(uint64, optional, tag = "6")]
577+
pub schema_infer_max_rec: ::core::option::Option<u64>,
578578
/// Optional date format
579579
#[prost(string, tag = "7")]
580580
pub date_format: ::prost::alloc::string::String,
@@ -612,9 +612,9 @@ pub struct JsonOptions {
612612
/// Compression type
613613
#[prost(enumeration = "CompressionTypeVariant", tag = "1")]
614614
pub compression: i32,
615-
/// Max records for schema inference
616-
#[prost(uint64, tag = "2")]
617-
pub schema_infer_max_rec: u64,
615+
/// Optional max records for schema inference
616+
#[prost(uint64, optional, tag = "2")]
617+
pub schema_infer_max_rec: ::core::option::Option<u64>,
618618
}
619619
#[derive(Clone, PartialEq, ::prost::Message)]
620620
pub struct TableParquetOptions {

datafusion/proto/src/logical_plan/file_formats.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl CsvOptionsProto {
5757
escape: options.escape.map_or(vec![], |v| vec![v]),
5858
double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
5959
compression: options.compression as i32,
60-
schema_infer_max_rec: options.schema_infer_max_rec as u64,
60+
schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
6161
date_format: options.date_format.clone().unwrap_or_default(),
6262
datetime_format: options.datetime_format.clone().unwrap_or_default(),
6363
timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
@@ -110,7 +110,7 @@ impl From<&CsvOptionsProto> for CsvOptions {
110110
3 => CompressionTypeVariant::ZSTD,
111111
_ => CompressionTypeVariant::UNCOMPRESSED,
112112
},
113-
schema_infer_max_rec: proto.schema_infer_max_rec as usize,
113+
schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
114114
date_format: if proto.date_format.is_empty() {
115115
None
116116
} else {
@@ -239,7 +239,7 @@ impl JsonOptionsProto {
239239
if let Some(options) = &factory.options {
240240
JsonOptionsProto {
241241
compression: options.compression as i32,
242-
schema_infer_max_rec: options.schema_infer_max_rec as u64,
242+
schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
243243
}
244244
} else {
245245
JsonOptionsProto::default()
@@ -257,7 +257,7 @@ impl From<&JsonOptionsProto> for JsonOptions {
257257
3 => CompressionTypeVariant::ZSTD,
258258
_ => CompressionTypeVariant::UNCOMPRESSED,
259259
},
260-
schema_infer_max_rec: proto.schema_infer_max_rec as usize,
260+
schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
261261
}
262262
}
263263
}

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ async fn roundtrip_logical_plan_copy_to_json() -> Result<()> {
556556

557557
// Set specific JSON format options
558558
json_format.compression = CompressionTypeVariant::GZIP;
559-
json_format.schema_infer_max_rec = 1000;
559+
json_format.schema_infer_max_rec = Some(1000);
560560

561561
let file_type = format_as_file_type(Arc::new(JsonFormatFactory::new_with_options(
562562
json_format.clone(),

0 commit comments

Comments (0)