Skip to content

Commit a7ff7a5

Browse files
authored
Consolidate Example: dataframe_output.rs into dataframe.rs (#13877)
1 parent 242f45f commit a7ff7a5

File tree

3 files changed

+68
-80
lines changed

3 files changed

+68
-80
lines changed

datafusion-examples/README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ cargo run --example dataframe
5757
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
5858
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
5959
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
60-
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data
61-
- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
60+
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
6261
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
6362
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
6463
- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.

datafusion-examples/examples/dataframe.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@
1717

1818
use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1919
use datafusion::arrow::datatypes::{DataType, Field, Schema};
20+
use datafusion::dataframe::DataFrameWriteOptions;
2021
use datafusion::error::Result;
2122
use datafusion::prelude::*;
23+
use datafusion_common::config::CsvOptions;
24+
use datafusion_common::parsers::CompressionTypeVariant;
25+
use datafusion_common::DataFusionError;
2226
use std::fs::File;
2327
use std::io::Write;
2428
use std::sync::Arc;
@@ -29,13 +33,19 @@ use tempfile::tempdir;
2933
/// * [read_parquet]: execute queries against parquet files
3034
/// * [read_csv]: execute queries against csv files
3135
/// * [read_memory]: execute queries against in-memory arrow data
36+
///
37+
/// This example demonstrates the various methods to write out a DataFrame to local storage.
38+
/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example
39+
/// using a remote object store.
40+
/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
3241
#[tokio::main]
3342
async fn main() -> Result<()> {
3443
// The SessionContext is the main high level API for interacting with DataFusion
3544
let ctx = SessionContext::new();
3645
read_parquet(&ctx).await?;
3746
read_csv(&ctx).await?;
3847
read_memory(&ctx).await?;
48+
write_out(&ctx).await?;
3949
Ok(())
4050
}
4151

@@ -139,3 +149,60 @@ async fn read_memory(ctx: &SessionContext) -> Result<()> {
139149

140150
Ok(())
141151
}
152+
153+
/// Use the DataFrame API to:
154+
/// 1. Write out a DataFrame to a table
155+
/// 2. Write out a DataFrame to a parquet file
156+
/// 3. Write out a DataFrame to a csv file
157+
/// 4. Write out a DataFrame to a json file
158+
async fn write_out(ctx: &SessionContext) -> std::result::Result<(), DataFusionError> {
159+
let mut df = ctx.sql("values ('a'), ('b'), ('c')").await.unwrap();
160+
161+
// Ensure the column names and types match the target table
162+
df = df.with_column_renamed("column1", "tablecol1").unwrap();
163+
164+
ctx.sql(
165+
"create external table
166+
test(tablecol1 varchar)
167+
stored as parquet
168+
location './datafusion-examples/test_table/'",
169+
)
170+
.await?
171+
.collect()
172+
.await?;
173+
174+
// This is equivalent to INSERT INTO test VALUES ('a'), ('b'), ('c').
175+
// The behavior of write_table depends on the TableProvider's implementation
176+
// of the insert_into method.
177+
df.clone()
178+
.write_table("test", DataFrameWriteOptions::new())
179+
.await?;
180+
181+
df.clone()
182+
.write_parquet(
183+
"./datafusion-examples/test_parquet/",
184+
DataFrameWriteOptions::new(),
185+
None,
186+
)
187+
.await?;
188+
189+
df.clone()
190+
.write_csv(
191+
"./datafusion-examples/test_csv/",
192+
// DataFrameWriteOptions contains options which control how data is written
193+
// such as compression codec
194+
DataFrameWriteOptions::new(),
195+
Some(CsvOptions::default().with_compression(CompressionTypeVariant::GZIP)),
196+
)
197+
.await?;
198+
199+
df.clone()
200+
.write_json(
201+
"./datafusion-examples/test_json/",
202+
DataFrameWriteOptions::new(),
203+
None,
204+
)
205+
.await?;
206+
207+
Ok(())
208+
}

datafusion-examples/examples/dataframe_output.rs

Lines changed: 0 additions & 78 deletions
This file was deleted.

0 commit comments

Comments
 (0)