Skip to content

Commit d9381c6

Browse files
authored
Add IpcError variant to replace some uses of IoErrorthat don't have underlying std::io::Error (#4726)
1 parent 90449ff commit d9381c6

File tree

9 files changed

+58
-50
lines changed

9 files changed

+58
-50
lines changed

arrow-flight/examples/flight_sql_server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -802,7 +802,7 @@ mod tests {
802802

803803
fn endpoint(uri: String) -> Result<Endpoint, ArrowError> {
804804
let endpoint = Endpoint::new(uri)
805-
.map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
805+
.map_err(|_| ArrowError::IpcError("Cannot create endpoint".to_string()))?
806806
.connect_timeout(Duration::from_secs(20))
807807
.timeout(Duration::from_secs(20))
808808
.tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait

arrow-flight/src/bin/flight_sql_client.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ async fn setup_client(
151151
let protocol = if args.tls { "https" } else { "http" };
152152

153153
let mut endpoint = Endpoint::new(format!("{}://{}:{}", protocol, args.host, port))
154-
.map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
154+
.map_err(|_| ArrowError::IpcError("Cannot create endpoint".to_string()))?
155155
.connect_timeout(Duration::from_secs(20))
156156
.timeout(Duration::from_secs(20))
157157
.tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
@@ -162,15 +162,15 @@ async fn setup_client(
162162

163163
if args.tls {
164164
let tls_config = ClientTlsConfig::new();
165-
endpoint = endpoint
166-
.tls_config(tls_config)
167-
.map_err(|_| ArrowError::IoError("Cannot create TLS endpoint".to_string()))?;
165+
endpoint = endpoint.tls_config(tls_config).map_err(|_| {
166+
ArrowError::IpcError("Cannot create TLS endpoint".to_string())
167+
})?;
168168
}
169169

170170
let channel = endpoint
171171
.connect()
172172
.await
173-
.map_err(|e| ArrowError::IoError(format!("Cannot connect to endpoint: {e}")))?;
173+
.map_err(|e| ArrowError::IpcError(format!("Cannot connect to endpoint: {e}")))?;
174174

175175
let mut client = FlightSqlServiceClient::new(channel);
176176
info!("connected");

arrow-flight/src/sql/client.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ impl FlightSqlServiceClient<Channel> {
150150
.flight_client
151151
.handshake(req)
152152
.await
153-
.map_err(|e| ArrowError::IoError(format!("Can't handshake {e}")))?;
153+
.map_err(|e| ArrowError::IpcError(format!("Can't handshake {e}")))?;
154154
if let Some(auth) = resp.metadata().get("authorization") {
155155
let auth = auth.to_str().map_err(|_| {
156156
ArrowError::ParseError("Can't read auth header".to_string())
@@ -390,16 +390,20 @@ impl FlightSqlServiceClient<Channel> {
390390
) -> Result<tonic::Request<T>, ArrowError> {
391391
for (k, v) in &self.headers {
392392
let k = AsciiMetadataKey::from_str(k.as_str()).map_err(|e| {
393-
ArrowError::IoError(format!("Cannot convert header key \"{k}\": {e}"))
393+
ArrowError::ParseError(format!("Cannot convert header key \"{k}\": {e}"))
394394
})?;
395395
let v = v.parse().map_err(|e| {
396-
ArrowError::IoError(format!("Cannot convert header value \"{v}\": {e}"))
396+
ArrowError::ParseError(format!(
397+
"Cannot convert header value \"{v}\": {e}"
398+
))
397399
})?;
398400
req.metadata_mut().insert(k, v);
399401
}
400402
if let Some(token) = &self.token {
401403
let val = format!("Bearer {token}").parse().map_err(|e| {
402-
ArrowError::IoError(format!("Cannot convert token to header value: {e}"))
404+
ArrowError::ParseError(format!(
405+
"Cannot convert token to header value: {e}"
406+
))
403407
})?;
404408
req.metadata_mut().insert("authorization", val);
405409
}
@@ -504,11 +508,11 @@ impl PreparedStatement<Channel> {
504508
}
505509

506510
fn decode_error_to_arrow_error(err: prost::DecodeError) -> ArrowError {
507-
ArrowError::IoError(err.to_string())
511+
ArrowError::IpcError(err.to_string())
508512
}
509513

510514
fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
511-
ArrowError::IoError(format!("{status:?}"))
515+
ArrowError::IpcError(format!("{status:?}"))
512516
}
513517

514518
// A polymorphic structure to natively represent different types of data contained in `FlightData`

arrow-flight/tests/encode_decode.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ async fn test_mismatched_schema_message() {
386386
do_test(
387387
make_primitive_batch(5),
388388
make_dictionary_batch(3),
389-
"Error decoding ipc RecordBatch: Io error: Invalid data for schema",
389+
"Error decoding ipc RecordBatch: Schema error: Invalid data for schema",
390390
)
391391
.await;
392392

arrow-ipc/src/convert.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,12 @@ pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowErr
150150
if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
151151
Ok(schema)
152152
} else {
153-
Err(ArrowError::IoError(
153+
Err(ArrowError::ParseError(
154154
"Unable to get head as schema".to_string(),
155155
))
156156
}
157157
} else {
158-
Err(ArrowError::IoError(
158+
Err(ArrowError::ParseError(
159159
"Unable to get root as message".to_string(),
160160
))
161161
}

arrow-ipc/src/reader.rs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,12 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
138138
let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];
139139

140140
let dict_id = field.dict_id().ok_or_else(|| {
141-
ArrowError::IoError(format!("Field {field} does not have dict id"))
141+
ArrowError::ParseError(format!("Field {field} does not have dict id"))
142142
})?;
143143

144144
let value_array =
145145
reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
146-
ArrowError::IoError(format!(
146+
ArrowError::ParseError(format!(
147147
"Cannot find a dictionary batch with dict id: {dict_id}"
148148
))
149149
})?;
@@ -193,7 +193,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
193193
let null_count = node.null_count();
194194

195195
if length != null_count {
196-
return Err(ArrowError::IoError(format!(
196+
return Err(ArrowError::SchemaError(format!(
197197
"Field {field} of NullArray has unequal null_count {null_count} and len {length}"
198198
)));
199199
}
@@ -325,7 +325,7 @@ impl<'a> ArrayReader<'a> {
325325

326326
fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
327327
self.nodes.next().ok_or_else(|| {
328-
ArrowError::IoError(format!(
328+
ArrowError::SchemaError(format!(
329329
"Invalid data for schema. {} refers to node not found in schema",
330330
field
331331
))
@@ -402,10 +402,10 @@ pub fn read_record_batch(
402402
metadata: &MetadataVersion,
403403
) -> Result<RecordBatch, ArrowError> {
404404
let buffers = batch.buffers().ok_or_else(|| {
405-
ArrowError::IoError("Unable to get buffers from IPC RecordBatch".to_string())
405+
ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
406406
})?;
407407
let field_nodes = batch.nodes().ok_or_else(|| {
408-
ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
408+
ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
409409
})?;
410410
let batch_compression = batch.compression();
411411
let compression = batch_compression
@@ -462,7 +462,7 @@ pub fn read_dictionary(
462462
metadata: &crate::MetadataVersion,
463463
) -> Result<(), ArrowError> {
464464
if batch.isDelta() {
465-
return Err(ArrowError::IoError(
465+
return Err(ArrowError::InvalidArgumentError(
466466
"delta dictionary batches not supported".to_string(),
467467
));
468468
}
@@ -569,14 +569,14 @@ impl<R: Read + Seek> FileReader<R> {
569569
let mut magic_buffer: [u8; 6] = [0; 6];
570570
reader.read_exact(&mut magic_buffer)?;
571571
if magic_buffer != super::ARROW_MAGIC {
572-
return Err(ArrowError::IoError(
572+
return Err(ArrowError::ParseError(
573573
"Arrow file does not contain correct header".to_string(),
574574
));
575575
}
576576
reader.seek(SeekFrom::End(-6))?;
577577
reader.read_exact(&mut magic_buffer)?;
578578
if magic_buffer != super::ARROW_MAGIC {
579-
return Err(ArrowError::IoError(
579+
return Err(ArrowError::ParseError(
580580
"Arrow file does not contain correct footer".to_string(),
581581
));
582582
}
@@ -592,11 +592,11 @@ impl<R: Read + Seek> FileReader<R> {
592592
reader.read_exact(&mut footer_data)?;
593593

594594
let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
595-
ArrowError::IoError(format!("Unable to get root as footer: {err:?}"))
595+
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
596596
})?;
597597

598598
let blocks = footer.recordBatches().ok_or_else(|| {
599-
ArrowError::IoError(
599+
ArrowError::ParseError(
600600
"Unable to get record batches from IPC Footer".to_string(),
601601
)
602602
})?;
@@ -633,7 +633,9 @@ impl<R: Read + Seek> FileReader<R> {
633633
reader.read_exact(&mut block_data)?;
634634

635635
let message = crate::root_as_message(&block_data[..]).map_err(|err| {
636-
ArrowError::IoError(format!("Unable to get root as message: {err:?}"))
636+
ArrowError::ParseError(format!(
637+
"Unable to get root as message: {err:?}"
638+
))
637639
})?;
638640

639641
match message.header_type() {
@@ -657,7 +659,7 @@ impl<R: Read + Seek> FileReader<R> {
657659
)?;
658660
}
659661
t => {
660-
return Err(ArrowError::IoError(format!(
662+
return Err(ArrowError::ParseError(format!(
661663
"Expecting DictionaryBatch in dictionary blocks, found {t:?}."
662664
)));
663665
}
@@ -705,7 +707,7 @@ impl<R: Read + Seek> FileReader<R> {
705707
/// Sets the current block to the index, allowing random reads
706708
pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
707709
if index >= self.total_blocks {
708-
Err(ArrowError::IoError(format!(
710+
Err(ArrowError::InvalidArgumentError(format!(
709711
"Cannot set batch to index {} from {} total batches",
710712
index, self.total_blocks
711713
)))
@@ -732,25 +734,25 @@ impl<R: Read + Seek> FileReader<R> {
732734
let mut block_data = vec![0; meta_len as usize];
733735
self.reader.read_exact(&mut block_data)?;
734736
let message = crate::root_as_message(&block_data[..]).map_err(|err| {
735-
ArrowError::IoError(format!("Unable to get root as footer: {err:?}"))
737+
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
736738
})?;
737739

738740
// some old test data's footer metadata is not set, so we account for that
739741
if self.metadata_version != crate::MetadataVersion::V1
740742
&& message.version() != self.metadata_version
741743
{
742-
return Err(ArrowError::IoError(
744+
return Err(ArrowError::IpcError(
743745
"Could not read IPC message as metadata versions mismatch".to_string(),
744746
));
745747
}
746748

747749
match message.header_type() {
748-
crate::MessageHeader::Schema => Err(ArrowError::IoError(
750+
crate::MessageHeader::Schema => Err(ArrowError::IpcError(
749751
"Not expecting a schema when messages are read".to_string(),
750752
)),
751753
crate::MessageHeader::RecordBatch => {
752754
let batch = message.header_as_record_batch().ok_or_else(|| {
753-
ArrowError::IoError(
755+
ArrowError::IpcError(
754756
"Unable to read IPC message as record batch".to_string(),
755757
)
756758
})?;
@@ -774,7 +776,7 @@ impl<R: Read + Seek> FileReader<R> {
774776
crate::MessageHeader::NONE => {
775777
Ok(None)
776778
}
777-
t => Err(ArrowError::IoError(format!(
779+
t => Err(ArrowError::InvalidArgumentError(format!(
778780
"Reading types other than record batches not yet supported, unable to read {t:?}"
779781
))),
780782
}
@@ -886,11 +888,11 @@ impl<R: Read> StreamReader<R> {
886888
reader.read_exact(&mut meta_buffer)?;
887889

888890
let message = crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
889-
ArrowError::IoError(format!("Unable to get root as message: {err:?}"))
891+
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
890892
})?;
891893
// message header is a Schema, so read it
892894
let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| {
893-
ArrowError::IoError("Unable to read IPC message as schema".to_string())
895+
ArrowError::ParseError("Unable to read IPC message as schema".to_string())
894896
})?;
895897
let schema = crate::convert::fb_to_schema(ipc_schema);
896898

@@ -965,16 +967,16 @@ impl<R: Read> StreamReader<R> {
965967

966968
let vecs = &meta_buffer.to_vec();
967969
let message = crate::root_as_message(vecs).map_err(|err| {
968-
ArrowError::IoError(format!("Unable to get root as message: {err:?}"))
970+
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
969971
})?;
970972

971973
match message.header_type() {
972-
crate::MessageHeader::Schema => Err(ArrowError::IoError(
974+
crate::MessageHeader::Schema => Err(ArrowError::IpcError(
973975
"Not expecting a schema when messages are read".to_string(),
974976
)),
975977
crate::MessageHeader::RecordBatch => {
976978
let batch = message.header_as_record_batch().ok_or_else(|| {
977-
ArrowError::IoError(
979+
ArrowError::IpcError(
978980
"Unable to read IPC message as record batch".to_string(),
979981
)
980982
})?;
@@ -986,7 +988,7 @@ impl<R: Read> StreamReader<R> {
986988
}
987989
crate::MessageHeader::DictionaryBatch => {
988990
let batch = message.header_as_dictionary_batch().ok_or_else(|| {
989-
ArrowError::IoError(
991+
ArrowError::IpcError(
990992
"Unable to read IPC message as dictionary batch".to_string(),
991993
)
992994
})?;
@@ -1004,7 +1006,7 @@ impl<R: Read> StreamReader<R> {
10041006
crate::MessageHeader::NONE => {
10051007
Ok(None)
10061008
}
1007-
t => Err(ArrowError::IoError(
1009+
t => Err(ArrowError::InvalidArgumentError(
10081010
format!("Reading types other than record batches not yet supported, unable to read {t:?} ")
10091011
)),
10101012
}

arrow-ipc/src/writer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -757,7 +757,7 @@ impl<W: Write> FileWriter<W> {
757757
/// Write a record batch to the file
758758
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
759759
if self.finished {
760-
return Err(ArrowError::IoError(
760+
return Err(ArrowError::IpcError(
761761
"Cannot write record batch to file writer as it is closed".to_string(),
762762
));
763763
}
@@ -794,7 +794,7 @@ impl<W: Write> FileWriter<W> {
794794
/// Write footer and closing tag, then mark the writer as done
795795
pub fn finish(&mut self) -> Result<(), ArrowError> {
796796
if self.finished {
797-
return Err(ArrowError::IoError(
797+
return Err(ArrowError::IpcError(
798798
"Cannot write footer to file writer as it is closed".to_string(),
799799
));
800800
}
@@ -909,7 +909,7 @@ impl<W: Write> StreamWriter<W> {
909909
/// Write a record batch to the stream
910910
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
911911
if self.finished {
912-
return Err(ArrowError::IoError(
912+
return Err(ArrowError::IpcError(
913913
"Cannot write record batch to stream writer as it is closed".to_string(),
914914
));
915915
}
@@ -930,7 +930,7 @@ impl<W: Write> StreamWriter<W> {
930930
/// Write continuation bytes, and mark the stream as done
931931
pub fn finish(&mut self) -> Result<(), ArrowError> {
932932
if self.finished {
933-
return Err(ArrowError::IoError(
933+
return Err(ArrowError::IpcError(
934934
"Cannot write footer to stream writer as it is closed".to_string(),
935935
));
936936
}

arrow-schema/src/error.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ pub enum ArrowError {
3535
DivideByZero,
3636
CsvError(String),
3737
JsonError(String),
38-
IoError(String),
38+
IoError(String, std::io::Error),
39+
IpcError(String),
3940
InvalidArgumentError(String),
4041
ParquetError(String),
4142
/// Error during import or export to/from the C Data Interface
@@ -53,7 +54,7 @@ impl ArrowError {
5354

5455
impl From<std::io::Error> for ArrowError {
5556
fn from(error: std::io::Error) -> Self {
56-
ArrowError::IoError(error.to_string())
57+
ArrowError::IoError(error.to_string(), error)
5758
}
5859
}
5960

@@ -65,7 +66,7 @@ impl From<std::string::FromUtf8Error> for ArrowError {
6566

6667
impl<W: Write> From<std::io::IntoInnerError<W>> for ArrowError {
6768
fn from(error: std::io::IntoInnerError<W>) -> Self {
68-
ArrowError::IoError(error.to_string())
69+
ArrowError::IoError(error.to_string(), error.into())
6970
}
7071
}
7172

@@ -84,7 +85,8 @@ impl Display for ArrowError {
8485
ArrowError::DivideByZero => write!(f, "Divide by zero error"),
8586
ArrowError::CsvError(desc) => write!(f, "Csv error: {desc}"),
8687
ArrowError::JsonError(desc) => write!(f, "Json error: {desc}"),
87-
ArrowError::IoError(desc) => write!(f, "Io error: {desc}"),
88+
ArrowError::IoError(desc, _) => write!(f, "Io error: {desc}"),
89+
ArrowError::IpcError(desc) => write!(f, "Ipc error: {desc}"),
8890
ArrowError::InvalidArgumentError(desc) => {
8991
write!(f, "Invalid argument error: {desc}")
9092
}

arrow/src/ffi_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ fn get_error_code(err: &ArrowError) -> i32 {
258258
match err {
259259
ArrowError::NotYetImplemented(_) => ENOSYS,
260260
ArrowError::MemoryError(_) => ENOMEM,
261-
ArrowError::IoError(_) => EIO,
261+
ArrowError::IoError(_, _) => EIO,
262262
_ => EINVAL,
263263
}
264264
}

0 commit comments

Comments
 (0)