Skip to content

Commit a08d26e

Browse files
authored
Fix avro tests (#2570) (#2571)
1 parent 5399f1c commit a08d26e

File tree

5 files changed

+31
-18
lines changed

5 files changed

+31
-18
lines changed

.github/workflows/rust.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -115,7 +115,7 @@ jobs:
115115
run: |
116116
export ARROW_TEST_DATA=$(pwd)/testing/data
117117
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
118-
cargo test
118+
cargo test --features avro
119119
# test datafusion examples
120120
cd datafusion-examples
121121
cargo run --example csv_sql

datafusion/core/src/avro_to_arrow/arrow_array_reader.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -861,6 +861,7 @@ fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
861861
values
862862
.iter()
863863
.flat_map(|row| {
864+
let row = maybe_resolve_union(row);
864865
if let Value::Array(values) = row {
865866
values
866867
.iter()

datafusion/core/src/datasource/file_format/avro.rs

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -87,7 +87,6 @@ mod tests {
8787

8888
use super::*;
8989
use crate::datasource::listing::local_unpartitioned_file;
90-
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
9190
use crate::prelude::{SessionConfig, SessionContext};
9291
use arrow::array::{
9392
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
@@ -99,10 +98,10 @@ mod tests {
9998
async fn read_small_batches() -> Result<()> {
10099
let config = SessionConfig::new().with_batch_size(2);
101100
let ctx = SessionContext::with_config(config);
102-
let task_ctx = session_ctx.task_ctx();
101+
let task_ctx = ctx.task_ctx();
103102
let projection = None;
104103
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
105-
let stream = exec.execute(0, task_ctx).await?;
104+
let stream = exec.execute(0, task_ctx)?;
106105

107106
let tt_batches = stream
108107
.map(|batch| {

datafusion/core/src/physical_plan/file_format/avro.rs

Lines changed: 26 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
//! Execution plan for reading line-delimited Avro files
1919
#[cfg(feature = "avro")]
2020
use crate::avro_to_arrow;
21-
use crate::error::{DataFusionError, Result};
21+
use crate::error::Result;
2222
use crate::physical_plan::expressions::PhysicalSortExpr;
2323
use crate::physical_plan::{
2424
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
@@ -98,7 +98,7 @@ impl ExecutionPlan for AvroExec {
9898
_partition: usize,
9999
_context: Arc<TaskContext>,
100100
) -> Result<SendableRecordBatchStream> {
101-
Err(DataFusionError::NotImplemented(
101+
Err(crate::error::DataFusionError::NotImplemented(
102102
"Cannot execute avro plan without avro feature enabled".to_string(),
103103
))
104104
}
@@ -165,17 +165,17 @@ impl ExecutionPlan for AvroExec {
165165
#[cfg(test)]
166166
#[cfg(feature = "avro")]
167167
mod tests {
168-
use crate::datasource::object_store::local::{
169-
local_object_reader_stream, LocalFileSystem,
170-
};
171168
use crate::datasource::{
172169
file_format::{avro::AvroFormat, FileFormat},
173170
listing::local_unpartitioned_file,
174171
};
172+
use crate::prelude::SessionContext;
175173
use crate::scalar::ScalarValue;
176174
use arrow::datatypes::{DataType, Field, Schema};
175+
use datafusion_data_access::object_store::local::{
176+
local_object_reader_stream, LocalFileSystem,
177+
};
177178
use futures::StreamExt;
178-
use sqlparser::ast::ObjectType::Schema;
179179

180180
use super::*;
181181

@@ -196,7 +196,11 @@ mod tests {
196196
});
197197
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
198198

199-
let mut results = avro_exec.execute(0).await.expect("plan execution failed");
199+
let ctx = SessionContext::new();
200+
let mut results = avro_exec
201+
.execute(0, ctx.task_ctx())
202+
.expect("plan execution failed");
203+
200204
let batch = results
201205
.next()
202206
.await
@@ -237,27 +241,32 @@ mod tests {
237241
let testdata = crate::test_util::arrow_test_data();
238242
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
239243
let actual_schema = AvroFormat {}
240-
.infer_schema(local_object_reader_stream(vec![filename]))
244+
.infer_schema(local_object_reader_stream(vec![filename.clone()]))
241245
.await?;
242246

243247
let mut fields = actual_schema.fields().clone();
244248
fields.push(Field::new("missing_col", DataType::Int32, true));
245249

246250
let file_schema = Arc::new(Schema::new(fields));
251+
// Include the missing column in the projection
252+
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
247253

248254
let avro_exec = AvroExec::new(FileScanConfig {
249255
object_store: Arc::new(LocalFileSystem {}),
250256
file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
251257
file_schema,
252258
statistics: Statistics::default(),
253-
// Include the missing column in the projection
254-
projection: Some(vec![0, 1, 2, file_schema.fields().len()]),
259+
projection,
255260
limit: None,
256261
table_partition_cols: vec![],
257262
});
258263
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
259264

260-
let mut results = avro_exec.execute(0).await.expect("plan execution failed");
265+
let ctx = SessionContext::new();
266+
let mut results = avro_exec
267+
.execute(0, ctx.task_ctx())
268+
.expect("plan execution failed");
269+
261270
let batch = results
262271
.next()
263272
.await
@@ -310,14 +319,18 @@ mod tests {
310319
projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
311320
object_store: Arc::new(LocalFileSystem {}),
312321
file_groups: vec![vec![partitioned_file]],
313-
file_schema: file_schema,
322+
file_schema,
314323
statistics: Statistics::default(),
315324
limit: None,
316325
table_partition_cols: vec!["date".to_owned()],
317326
});
318327
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
319328

320-
let mut results = avro_exec.execute(0).await.expect("plan execution failed");
329+
let ctx = SessionContext::new();
330+
let mut results = avro_exec
331+
.execute(0, ctx.task_ctx())
332+
.expect("plan execution failed");
333+
321334
let batch = results
322335
.next()
323336
.await

datafusion/core/tests/sql/avro.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -124,7 +124,7 @@ async fn avro_single_nan_schema() {
124124
let plan = ctx.create_logical_plan(sql).unwrap();
125125
let plan = ctx.optimize(&plan).unwrap();
126126
let plan = ctx.create_physical_plan(&plan).await.unwrap();
127-
let runtime = ctx.state.lock().runtime_env.clone();
127+
let runtime = ctx.task_ctx();
128128
let results = collect(plan, runtime).await.unwrap();
129129
for batch in results {
130130
assert_eq!(1, batch.num_rows());

0 commit comments

Comments
 (0)