Skip to content

Commit 038ae36

Browse files
committed
Update
1 parent 4d2f8ff commit 038ae36

File tree

7 files changed

+339
-75
lines changed

7 files changed

+339
-75
lines changed

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@ path = "chapter6/rust/compute_functions.rs"
3535
name = "ch6_compute_or_not"
3636
path = "chapter6/rust/compute_or_not.rs"
3737

38+
[[example]]
39+
name = "ch7_datasets_api"
40+
path = "chapter7/rust/datasets_api.rs"
41+
42+
[[example]]
43+
name = "ch7_s3_datasets"
44+
path = "chapter7/rust/s3_datasets.rs"
45+
3846
[[example]]
3947
name = "ch7_streaming_engine"
4048
path = "chapter7/rust/streaming_engine.rs"

chapter1/rust/examples.rs

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use std::sync::Arc;
12
use arrow::{
2-
array::{Float64Builder, Int16Builder, Int64BufferBuilder, StringBuilder, StructArray},
3+
array::{Float64Builder, Int16Builder, Int64BufferBuilder, StringBuilder, StringArray, StructArray},
34
buffer::Buffer,
45
datatypes::{DataType, Field, Schema},
6+
error::Result,
57
record_batch::RecordBatch
68
};
79
use rand;
@@ -15,7 +17,7 @@ fn first_example() {
1517
println!("{}", arr);
1618
}
1719

18-
fn random_data_example() {
20+
fn random_data_example() -> Result<()> {
1921
let normal = Normal::new(5, 2).unwrap();
2022
let mut rng = rand::thread_rng();
2123
let mut builder = Float64Builder::new();
@@ -31,6 +33,63 @@ fn random_data_example() {
3133
fields.push(Field::new(format!("c{i}"), DataType::Float64, false));
3234
}
3335

34-
let rb = RecordBatch::try_new(Schema::new(fields), columns).unwrap();
36+
let rb = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)?;
3537
println!("{}", rb);
3638
}
39+
40+
fn building_struct_array() {
41+
let archers = StringArray::from(vec![
42+
"Legolas", "Oliver", "Merida", "Lara", "Artemis"]);
43+
let locations = StringArray::from(vec![
44+
"Murkwood", "Star City", "Scotland", "London", "Greece"]);
45+
let years = Int16Array::from(vec![1954, 1941, 2012, 1996, -600]);
46+
47+
let arr = StructArray::from(vec![
48+
(Field::new("archer", DataType:::Utf8, false), archer),
49+
(Field::new("location", DataType::Utf8, false), location),
50+
(Field::new("year", DataType::Int16, false), year)
51+
]);
52+
println!("{}", arr);
53+
}
54+
55+
fn build_struct_builder() {
56+
std::shared_ptr<arrow::DataType> st_type = arrow::struct_(
57+
vec![Field::new("archer", DataType::Utf8, false),
58+
Field::new("location", DataType::Utf8, false),
59+
Field::new("year", DataType::Int16, false)]);
60+
61+
std::unique_ptr<arrow::ArrayBuilder> tmp;
62+
arrow::MakeBuilder(arrow::default_memory_pool(), st_type, &tmp);
63+
std::shared_ptr<arrow::StructBuilder> builder;
64+
builder.reset(static_cast<arrow::StructBuilder*>(tmp.release()));
65+
66+
StringBuilder* archer_builder =
67+
static_cast<StringBuilder*>(builder.field_builder(0));
68+
StringBuilder* location_builder =
69+
static_cast<StringBuilder*>(builder.field_builder(1));
70+
Int16Builder* year_builder =
71+
static_cast<Int16Builder*>(builder.field_builder(2));
72+
73+
let archers = vec!["Legolas", "Oliver", "Merida", "Lara",
74+
"Artemis"];
75+
let locations = vec!["Murkwood", "Star City", "Scotland",
76+
"London", "Greece"];
77+
let years = vec![1954, 1941, 2012, 1996, -600};
78+
79+
for i in 0..archers.len() {
80+
builder.Append();
81+
archer_builder.Append(archers[i]);
82+
location_builder.Append(locations[i]);
83+
year_builder.Append(years[i]);
84+
}
85+
86+
let out = builder.finish();
87+
println!("{}", out);
88+
}
89+
90+
fn main() {
91+
first_example();
92+
random_data_example();
93+
building_struct_array();
94+
build_struct_builder();
95+
}

chapter1/rust/todo.rs

Lines changed: 0 additions & 66 deletions
This file was deleted.

chapter7/rust/datasets_api.rs

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
use std::{
2+
fs::File,
3+
sync::Arc
4+
};
5+
use arrow::{
6+
array::{Int16Array, Int64Array, StringArray, StructArray},
7+
datatypes::{DataType, Field, Schema},
8+
compute as cp,
9+
error::Result,
10+
record_batch::RecordBatch
11+
};
12+
use datafusion::{
13+
datasource::file_format::FileFormat,
14+
prelude::*
15+
};
16+
use parquet;
17+
18+
fn create_batch() -> RecordBatch {
19+
let schema = Schema::new(vec![
20+
Field::new("a", DataType::Int64, false),
21+
Field::new("b", DataType::Int64, false),
22+
Field::new("c", DataType::Int64, false)
23+
]);
24+
let array_a = Int64Array::from([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
25+
let array_b = Int64Array::from([9, 8, 7, 6, 5, 4, 3, 2, 1, 0]);
26+
let array_c = Int64Array::from([1, 2, 1, 2, 1, 2, 1, 2, 1, 2]);
27+
RecordBatch::try_new(Arc::new(schema), vec![array_a, array_b, array_c]).unwrap()
28+
}
29+
30+
// Creates a `parquet_dataset` directory under `root_path`, writes the batch
// from `create_batch()` into it as two files (`data1.parquet` and
// `data2.parquet`), and returns the directory path.
//
// NOTE(review): this function is an incomplete port. The signature and most
// of the body still use C++ constructs (`std::shared_ptr`, `std::string`,
// `ABORT_ON_FAIL`, `*batch.Slice(..)`, `arrow::default_memory_pool()`) that
// are not valid Rust, so it does not compile as written.
fn create_sample_dataset(
31+
filesystem: &const std::shared_ptr<fs::FileSystem>,
32+
root_path: &str) -> std::string {
33+
// NOTE(review): `root_path` is a `&str`; `+` is not defined with a `&str` on
// the left-hand side in Rust.
let base_path = root_path + "/parquet_dataset";
34+
ABORT_ON_FAIL(filesystem.CreateDir(base_path));
35+
let batch = create_batch();
36+
// NOTE(review): `output` is reassigned further down but is not declared `mut`.
let output =
37+
filesystem.OpenOutputStream(base_path + "/data1.parquet").unwrap();
38+
// First file: `Slice(0, 5)` — presumably offset 0, length 5; confirm against
// the slicing API eventually used.
ABORT_ON_FAIL(parquet::arrow::WriteTable(*batch.Slice(0, 5),
39+
arrow::default_memory_pool(), output,
40+
/*chunk_size*/ 2048));
41+
output =
42+
filesystem.OpenOutputStream(base_path + "/data2.parquet").unwrap();
43+
// Second file: `Slice(5)` — presumably everything from offset 5 onward.
ABORT_ON_FAIL(parquet::arrow::WriteTable(*batch.Slice(5),
44+
arrow::default_memory_pool(), output,
45+
/*chunk_size*/ 2048));
46+
base_path
47+
}
48+
49+
// Discovers the dataset under `base_dir`, prints every fragment it finds,
// then scans the whole dataset and returns the result of `ToTable()`.
//
// NOTE(review): incomplete port. The parameter and return types, the
// `fs::FileSelector selector;` declaration and the `ds::` factory calls are
// C++ syntax, and no `fs` or `ds` module is imported by the `use` lines
// visible in this file.
fn scan_dataset(
50+
filesystem: const std::shared_ptr<fs::FileSystem>&,
51+
format: impl FileFormat,
52+
base_dir: &str) -> std::shared_ptr<arrow::Table> {
53+
fs::FileSelector selector;
54+
selector.base_dir = base_dir;
55+
// Build a dataset factory from the file system, selector and file format.
let factory =
56+
ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
57+
ds::FileSystemFactoryOptions())
58+
.unwrap();
59+
60+
let dataset = factory.finish().unwrap();
61+
let fragments = dataset.GetFragments().unwrap();
62+
for fragment in fragments {
63+
println!("Found Fragment: {}", fragment);
64+
}
65+
66+
// No projection or filter is set on the scan builder before finishing it.
let scan_builder = dataset.NewScan().unwrap();
67+
let scanner = scan_builder.finish().unwrap();
68+
return scanner.ToTable().unwrap();
69+
}
70+
71+
// Scans the dataset under `base_dir`, projecting only column "b" and
// filtering with the expression `b < 4`, and returns the result of
// `ToTable()`.
//
// NOTE(review): incomplete port. The parameter and return types,
// `fs::FileSelector selector;`, `ABORT_ON_FAIL` and the `{"b"}` initializer
// list are C++ syntax; `cp` is imported in this file as `arrow::compute`, and
// whether that module offers `field_ref` / `literal` / `less` expression
// builders needs to be confirmed.
fn filter_and_select(
72+
filesystem: const std::shared_ptr<fs::FileSystem>&,
73+
format: const std::shared_ptr<ds::FileFormat>&,
74+
base_dir: &str) -> std::shared_ptr<arrow::Table> {
75+
fs::FileSelector selector;
76+
selector.base_dir = base_dir;
77+
let factory =
78+
ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
79+
ds::FileSystemFactoryOptions())
80+
.unwrap();
81+
82+
let dataset = factory.finish().unwrap();
83+
let scan_builder = dataset.NewScan().unwrap();
84+
// Keep only column "b" in the output.
ABORT_ON_FAIL(scan_builder.Project({"b"}));
85+
// Keep only rows where b < 4.
ABORT_ON_FAIL(
86+
scan_builder.Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
87+
let scanner = scan_builder.finish().unwrap();
88+
return scanner.ToTable().unwrap();
89+
}
90+
91+
// Scans the dataset under `base_dir` with a projection made of every existing
// schema field plus two derived columns: "b_as_float32" (a "cast" call on
// `b`) and "b_large" (`b > 1`). Returns the result of `ToTable()`.
//
// NOTE(review): incomplete port. The first two parameters are written in C++
// order (`type name`), and `fs::FileSelector selector;`, `ABORT_ON_FAIL`,
// `emplace_back` and the `{...}` initializer list are C++ constructs that do
// not compile as Rust.
fn derive_and_rename(
92+
const std::shared_ptr<fs::FileSystem>& filesystem,
93+
const std::shared_ptr<ds::FileFormat>& format,
94+
base_dir: &str) -> std::shared_ptr<arrow::Table> {
95+
fs::FileSelector selector;
96+
selector.base_dir = base_dir;
97+
let factory =
98+
ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
99+
ds::FileSystemFactoryOptions())
100+
.unwrap();
101+
102+
let dataset = factory.finish().unwrap();
103+
let scan_builder = dataset.NewScan().unwrap();
104+
// `names` and `exprs` are filled in lockstep: the i-th name labels the i-th
// projected expression.
let mut names = Vec::new();
105+
let mut exprs = Vec::new();
106+
// Pass every existing column through unchanged.
for field in dataset.schema().fields() {
107+
names.push(field.name());
108+
exprs.push(cp::field_ref(field.name()));
109+
}
110+
// NOTE(review): `emplace_back` is the C++ spelling; the loop above uses
// `push` on the same vector.
names.emplace_back("b_as_float32");
111+
exprs.push(cp::call("cast", {cp::field_ref("b")},
112+
cp::CastOptions::Safe(arrow::float32())));
113+
114+
names.emplace_back("b_large");
115+
// b > 1
116+
exprs.push(cp::greater(cp::field_ref("b"), cp::literal(1)));
117+
ABORT_ON_FAIL(scan_builder.Project(exprs, names));
118+
let scanner = scan_builder.finish().unwrap();
119+
return scanner.ToTable().unwrap();
120+
}
121+
122+
// Entry point: writes the sample dataset under "/home/zero/sample", then runs
// `scan_dataset`, `filter_and_select` and `derive_and_rename` against
// "/home/zero/sample/parquet_dataset", printing each result.
//
// NOTE(review): incomplete port. The `std::shared_ptr<...> x =
// std::make_shared<...>();` declarations are C++, and the paths are
// hard-coded to one user's home directory.
fn main() {
123+
std::shared_ptr<fs::FileSystem> filesystem =
124+
std::make_shared<fs::LocalFileSystem>();
125+
let path = create_sample_dataset(filesystem, "/home/zero/sample");
126+
println!("{}", path);
127+
128+
std::shared_ptr<ds::FileFormat> format =
129+
std::make_shared<ds::ParquetFileFormat>();
130+
131+
// NOTE(review): `table` is reassigned twice below but is not declared `mut`.
let table =
132+
scan_dataset(filesystem, format, "/home/zero/sample/parquet_dataset");
133+
println!("{}", table);
134+
table = filter_and_select(filesystem, format,
135+
"/home/zero/sample/parquet_dataset");
136+
println!("{}", table);
137+
table = derive_and_rename(filesystem, format,
138+
"/home/zero/sample/parquet_dataset");
139+
println!("{}", table);
140+
}

0 commit comments

Comments
 (0)