Skip to content

Commit ab8761d

Browse files
authored
docs: add example for custom file format with COPY TO (#11174)
* feat: add example for copy to * better docs plus tempdir * build: clean examples if over 10GB * only 1GB * build: try clearing some disk space before running * build: remove sudo * build: try clean * build: run clean * build: only clean examples * docs: better output for example
1 parent 09cdb78 commit ab8761d

File tree

2 files changed

+235
-0
lines changed

2 files changed

+235
-0
lines changed

ci/scripts/rust_example.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ do
2929
# Skip tests that rely on external storage and flight
3030
if [ ! -d $filename ]; then
3131
cargo run --example $example_name
32+
cargo clean -p datafusion-examples
3233
fi
3334
done
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::{any::Any, sync::Arc};
19+
20+
use arrow::{
21+
array::{AsArray, RecordBatch, StringArray, UInt8Array},
22+
datatypes::UInt64Type,
23+
};
24+
use arrow_schema::{DataType, Field, Schema, SchemaRef};
25+
use datafusion::{
26+
datasource::{
27+
file_format::{
28+
csv::CsvFormatFactory, file_compression_type::FileCompressionType,
29+
FileFormat, FileFormatFactory,
30+
},
31+
physical_plan::{FileScanConfig, FileSinkConfig},
32+
MemTable,
33+
},
34+
error::Result,
35+
execution::{context::SessionState, runtime_env::RuntimeEnv},
36+
physical_plan::ExecutionPlan,
37+
prelude::{SessionConfig, SessionContext},
38+
};
39+
use datafusion_common::{GetExt, Statistics};
40+
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
41+
use object_store::{ObjectMeta, ObjectStore};
42+
use tempfile::tempdir;
43+
44+
/// Example of a custom file format that reads and writes TSV files.
45+
///
46+
/// TSVFileFactory is responsible for creating instances of TSVFileFormat.
47+
/// The former, once registered with the SessionState, will then be used
48+
/// to facilitate SQL operations on TSV files, such as `COPY TO` shown here.
49+
50+
#[derive(Debug)]
51+
/// Custom file format that reads and writes TSV files
52+
///
53+
/// This file format is a wrapper around the CSV file format
54+
/// for demonstration purposes.
55+
struct TSVFileFormat {
56+
csv_file_format: Arc<dyn FileFormat>,
57+
}
58+
59+
impl TSVFileFormat {
60+
pub fn new(csv_file_format: Arc<dyn FileFormat>) -> Self {
61+
Self { csv_file_format }
62+
}
63+
}
64+
65+
#[async_trait::async_trait]
66+
impl FileFormat for TSVFileFormat {
67+
fn as_any(&self) -> &dyn Any {
68+
self
69+
}
70+
71+
fn get_ext(&self) -> String {
72+
"tsv".to_string()
73+
}
74+
75+
fn get_ext_with_compression(
76+
&self,
77+
c: &FileCompressionType,
78+
) -> datafusion::error::Result<String> {
79+
if c == &FileCompressionType::UNCOMPRESSED {
80+
Ok("tsv".to_string())
81+
} else {
82+
todo!("Compression not supported")
83+
}
84+
}
85+
86+
async fn infer_schema(
87+
&self,
88+
state: &SessionState,
89+
store: &Arc<dyn ObjectStore>,
90+
objects: &[ObjectMeta],
91+
) -> Result<SchemaRef> {
92+
self.csv_file_format
93+
.infer_schema(state, store, objects)
94+
.await
95+
}
96+
97+
async fn infer_stats(
98+
&self,
99+
state: &SessionState,
100+
store: &Arc<dyn ObjectStore>,
101+
table_schema: SchemaRef,
102+
object: &ObjectMeta,
103+
) -> Result<Statistics> {
104+
self.csv_file_format
105+
.infer_stats(state, store, table_schema, object)
106+
.await
107+
}
108+
109+
async fn create_physical_plan(
110+
&self,
111+
state: &SessionState,
112+
conf: FileScanConfig,
113+
filters: Option<&Arc<dyn PhysicalExpr>>,
114+
) -> Result<Arc<dyn ExecutionPlan>> {
115+
self.csv_file_format
116+
.create_physical_plan(state, conf, filters)
117+
.await
118+
}
119+
120+
async fn create_writer_physical_plan(
121+
&self,
122+
input: Arc<dyn ExecutionPlan>,
123+
state: &SessionState,
124+
conf: FileSinkConfig,
125+
order_requirements: Option<Vec<PhysicalSortRequirement>>,
126+
) -> Result<Arc<dyn ExecutionPlan>> {
127+
self.csv_file_format
128+
.create_writer_physical_plan(input, state, conf, order_requirements)
129+
.await
130+
}
131+
}
132+
133+
#[derive(Default)]
134+
/// Factory for creating TSV file formats
135+
///
136+
/// This factory is a wrapper around the CSV file format factory
137+
/// for demonstration purposes.
138+
pub struct TSVFileFactory {
139+
csv_file_factory: CsvFormatFactory,
140+
}
141+
142+
impl TSVFileFactory {
143+
pub fn new() -> Self {
144+
Self {
145+
csv_file_factory: CsvFormatFactory::new(),
146+
}
147+
}
148+
}
149+
150+
impl FileFormatFactory for TSVFileFactory {
151+
fn create(
152+
&self,
153+
state: &SessionState,
154+
format_options: &std::collections::HashMap<String, String>,
155+
) -> Result<std::sync::Arc<dyn FileFormat>> {
156+
let mut new_options = format_options.clone();
157+
new_options.insert("format.delimiter".to_string(), "\t".to_string());
158+
159+
let csv_file_format = self.csv_file_factory.create(state, &new_options)?;
160+
let tsv_file_format = Arc::new(TSVFileFormat::new(csv_file_format));
161+
162+
Ok(tsv_file_format)
163+
}
164+
165+
fn default(&self) -> std::sync::Arc<dyn FileFormat> {
166+
todo!()
167+
}
168+
}
169+
170+
impl GetExt for TSVFileFactory {
    /// Default file extension for this format ("tsv").
    fn get_ext(&self) -> String {
        String::from("tsv")
    }
}
175+
176+
#[tokio::main]
177+
async fn main() -> Result<()> {
178+
// Create a new context with the default configuration
179+
let config = SessionConfig::new();
180+
let runtime = RuntimeEnv::default();
181+
let mut state = SessionState::new_with_config_rt(config, Arc::new(runtime));
182+
183+
// Register the custom file format
184+
let file_format = Arc::new(TSVFileFactory::new());
185+
state.register_file_format(file_format, true).unwrap();
186+
187+
// Create a new context with the custom file format
188+
let ctx = SessionContext::new_with_state(state);
189+
190+
let mem_table = create_mem_table();
191+
ctx.register_table("mem_table", mem_table).unwrap();
192+
193+
let temp_dir = tempdir().unwrap();
194+
let table_save_path = temp_dir.path().join("mem_table.tsv");
195+
196+
let d = ctx
197+
.sql(&format!(
198+
"COPY mem_table TO '{}' STORED AS TSV;",
199+
table_save_path.display(),
200+
))
201+
.await?;
202+
203+
let results = d.collect().await?;
204+
println!(
205+
"Number of inserted rows: {:?}",
206+
(results[0]
207+
.column_by_name("count")
208+
.unwrap()
209+
.as_primitive::<UInt64Type>()
210+
.value(0))
211+
);
212+
213+
Ok(())
214+
}
215+
216+
// create a simple mem table
217+
fn create_mem_table() -> Arc<MemTable> {
218+
let fields = vec![
219+
Field::new("id", DataType::UInt8, false),
220+
Field::new("data", DataType::Utf8, false),
221+
];
222+
let schema = Arc::new(Schema::new(fields));
223+
224+
let partitions = RecordBatch::try_new(
225+
schema.clone(),
226+
vec![
227+
Arc::new(UInt8Array::from(vec![1, 2])),
228+
Arc::new(StringArray::from(vec!["foo", "bar"])),
229+
],
230+
)
231+
.unwrap();
232+
233+
Arc::new(MemTable::try_new(schema, vec![vec![partitions]]).unwrap())
234+
}

0 commit comments

Comments
 (0)