Skip to content

Commit 26f2b6a

Browse files
authored
GeoDataFusion: AsText, AsBinary, GeomFromText, GeomFromWKB (#1105)
- Add `AsText`, `AsBinary`, `GeomFromText`, `GeomFromWKB` scalar UDFs to geodatafusion
1 parent 3205ccb commit 26f2b6a

File tree

4 files changed

+250
-126
lines changed

4 files changed

+250
-126
lines changed

rust/geodatafusion/src/udf/native/io/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
33
// mod geohash;
44
mod wkb;
5-
// mod wkt;
5+
mod wkt;
6+
7+
pub use wkb::{AsBinary, GeomFromWKB};
8+
pub use wkt::{AsText, GeomFromText};
69

710
// use datafusion::prelude::SessionContext;
811

rust/geodatafusion/src/udf/native/io/wkb.rs

Lines changed: 117 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,35 @@ use datafusion::error::{DataFusionError, Result};
66
use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER;
77
use datafusion::logical_expr::{
88
ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
9+
Volatility,
910
};
10-
use geoarrow_array::array::from_arrow_array;
11-
use geoarrow_array::cast::to_wkb;
11+
use geoarrow_array::array::{WkbArray, from_arrow_array};
12+
use geoarrow_array::cast::{from_wkb, to_wkb};
1213
use geoarrow_array::{GeoArrowArray, GeoArrowType};
13-
use geoarrow_schema::WkbType;
14+
use geoarrow_schema::{CoordType, GeometryType, WkbType};
1415

1516
use crate::data_types::any_single_geometry_type_input;
16-
use crate::error::GeoDataFusionError;
17+
use crate::error::{GeoDataFusionError, GeoDataFusionResult};
1718

1819
#[derive(Debug)]
19-
pub(super) struct AsBinary {
20+
pub struct AsBinary {
2021
signature: Signature,
2122
}
2223

2324
impl AsBinary {
2425
pub fn new() -> Self {
25-
// TODO: extend to allow specifying little/big endian
2626
Self {
2727
signature: any_single_geometry_type_input(),
2828
}
2929
}
3030
}
3131

32+
impl Default for AsBinary {
33+
fn default() -> Self {
34+
Self::new()
35+
}
36+
}
37+
3238
static AS_BINARY_DOC: OnceLock<Documentation> = OnceLock::new();
3339

3440
impl ScalarUDFImpl for AsBinary {
@@ -49,20 +55,20 @@ impl ScalarUDFImpl for AsBinary {
4955
}
5056

5157
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<Field> {
52-
let field = &args.arg_fields[0];
53-
let data_type = GeoArrowType::try_from(field).map_err(GeoDataFusionError::GeoArrow)?;
58+
let input_field = &args.arg_fields[0];
59+
let data_type =
60+
GeoArrowType::try_from(input_field).map_err(GeoDataFusionError::GeoArrow)?;
5461
let wkb_type = WkbType::new(data_type.metadata().clone());
55-
Ok(
56-
Field::new(field.name(), DataType::Binary, field.is_nullable())
57-
.with_extension_type(wkb_type),
62+
Ok(Field::new(
63+
input_field.name(),
64+
DataType::Binary,
65+
input_field.is_nullable(),
5866
)
67+
.with_extension_type(wkb_type))
5968
}
6069

6170
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
62-
let array = ColumnarValue::values_to_arrays(&args.args)?
63-
.into_iter()
64-
.next()
65-
.unwrap();
71+
let array = &ColumnarValue::values_to_arrays(&args.args)?[0];
6672
let field = args.arg_fields[0];
6773
let geo_array = from_arrow_array(&array, field).map_err(GeoDataFusionError::GeoArrow)?;
6874
let wkb_arr = to_wkb::<i32>(geo_array.as_ref()).map_err(GeoDataFusionError::GeoArrow)?;
@@ -82,64 +88,82 @@ impl ScalarUDFImpl for AsBinary {
8288
}
8389
}
8490

85-
// #[derive(Debug)]
86-
// pub(super) struct GeomFromWKB {
87-
// signature: Signature,
88-
// }
89-
90-
// impl GeomFromWKB {
91-
// pub fn new() -> Self {
92-
// Self {
93-
// signature: Signature::exact(vec![DataType::Binary], Volatility::Immutable),
94-
// }
95-
// }
96-
// }
97-
98-
// static GEOM_FROM_WKB_DOC: OnceLock<Documentation> = OnceLock::new();
99-
100-
// impl ScalarUDFImpl for GeomFromWKB {
101-
// fn as_any(&self) -> &dyn Any {
102-
// self
103-
// }
104-
105-
// fn name(&self) -> &str {
106-
// "st_geomfromwkb"
107-
// }
108-
109-
// fn signature(&self) -> &Signature {
110-
// &self.signature
111-
// }
112-
113-
// fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
114-
// Ok(GEOMETRY_TYPE().into())
115-
// }
116-
117-
// fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
118-
// Ok(geom_from_wkb_impl(args)?)
119-
// }
120-
121-
// fn documentation(&self) -> Option<&Documentation> {
122-
// Some(GEOM_FROM_WKB_DOC.get_or_init(|| {
123-
// Documentation::builder(DOC_SECTION_OTHER, "Takes a well-known binary representation of a geometry and a Spatial Reference System ID (SRID) and creates an instance of the appropriate geometry type", "ST_GeomFromWKB(buffer)")
124-
// .with_argument("geom", "WKB buffers")
125-
// .build()
126-
// }))
127-
// }
128-
// }
129-
130-
// fn geom_from_wkb_impl(args: &[ColumnarValue]) -> GeoDataFusionResult<ColumnarValue> {
131-
// let array = ColumnarValue::values_to_arrays(args)?
132-
// .into_iter()
133-
// .next()
134-
// .unwrap();
135-
// let wkb_arr = WkbArray::new(array.as_binary::<i32>().clone(), Default::default());
136-
// let native_arr = from_wkb(
137-
// &wkb_arr,
138-
// GeoArrowType::Geometry(GeometryType::new(CoordType::Separated, Default::default())),
139-
// false,
140-
// )?;
141-
// Ok(native_arr.to_array_ref().into())
142-
// }
91+
#[derive(Debug)]
92+
pub struct GeomFromWKB {
93+
signature: Signature,
94+
coord_type: CoordType,
95+
}
96+
97+
impl GeomFromWKB {
98+
pub fn new(coord_type: CoordType) -> Self {
99+
Self {
100+
signature: Signature::uniform(
101+
1,
102+
vec![DataType::Binary, DataType::LargeBinary],
103+
Volatility::Immutable,
104+
),
105+
coord_type,
106+
}
107+
}
108+
109+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> GeoDataFusionResult<ColumnarValue> {
110+
let array = &ColumnarValue::values_to_arrays(&args.args)?[0];
111+
let field = args.arg_fields[0];
112+
let to_type = GeoArrowType::try_from(args.return_field)?;
113+
let geom_arr = match field.data_type() {
114+
DataType::Binary => from_wkb(
115+
&WkbArray::<i32>::try_from((array.as_ref(), field))?,
116+
to_type,
117+
),
118+
DataType::LargeBinary => from_wkb(
119+
&WkbArray::<i64>::try_from((array.as_ref(), field))?,
120+
to_type,
121+
),
122+
_ => unreachable!(),
123+
}?;
124+
Ok(ColumnarValue::Array(geom_arr.to_array_ref()))
125+
}
126+
}
127+
128+
static GEOM_FROM_WKB_DOC: OnceLock<Documentation> = OnceLock::new();
129+
130+
impl ScalarUDFImpl for GeomFromWKB {
131+
fn as_any(&self) -> &dyn Any {
132+
self
133+
}
134+
135+
fn name(&self) -> &str {
136+
"st_geomfromwkb"
137+
}
138+
139+
fn signature(&self) -> &Signature {
140+
&self.signature
141+
}
142+
143+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
144+
Err(DataFusionError::Internal("return_type".to_string()))
145+
}
146+
147+
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<Field> {
148+
let input_field = &args.arg_fields[0];
149+
let data_type =
150+
GeoArrowType::try_from(input_field).map_err(GeoDataFusionError::GeoArrow)?;
151+
let geom_type = GeometryType::new(self.coord_type, data_type.metadata().clone());
152+
Ok(geom_type.to_field(input_field.name(), input_field.is_nullable()))
153+
}
154+
155+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
156+
Ok(self.invoke_with_args(args)?)
157+
}
158+
159+
fn documentation(&self) -> Option<&Documentation> {
160+
Some(GEOM_FROM_WKB_DOC.get_or_init(|| {
161+
Documentation::builder(DOC_SECTION_OTHER, "Takes a well-known binary representation of a geometry and a Spatial Reference System ID (SRID) and creates an instance of the appropriate geometry type", "ST_GeomFromWKB(buffer)")
162+
.with_argument("geom", "WKB buffers")
163+
.build()
164+
}))
165+
}
166+
}
143167

144168
#[cfg(test)]
145169
mod test {
@@ -148,6 +172,7 @@ mod test {
148172
use arrow_array::RecordBatch;
149173
use arrow_schema::Schema;
150174
use datafusion::prelude::SessionContext;
175+
use geoarrow_array::array::GeometryArray;
151176
use geoarrow_array::test::point;
152177
use geoarrow_schema::{CoordType, Crs, Dimension, Metadata};
153178

@@ -160,16 +185,17 @@ mod test {
160185
let crs = Crs::from_srid("4326".to_string());
161186
let metadata = Arc::new(Metadata::new(crs.clone(), Default::default()));
162187

163-
let geo_arr = point::array(CoordType::Separated, Dimension::XY).with_metadata(metadata);
188+
let point_arr = point::array(CoordType::Separated, Dimension::XY).with_metadata(metadata);
164189

165-
let arr = geo_arr.to_array_ref();
166-
let field = geo_arr.data_type().to_field("geometry", true);
190+
let arr = point_arr.to_array_ref();
191+
let field = point_arr.data_type().to_field("geometry", true);
167192
let schema = Schema::new([Arc::new(field)]);
168193
let batch = RecordBatch::try_new(Arc::new(schema), vec![arr]).unwrap();
169194

170195
ctx.register_batch("t", batch).unwrap();
171196

172197
ctx.register_udf(AsBinary::new().into());
198+
ctx.register_udf(GeomFromWKB::new(CoordType::Separated).into());
173199

174200
let sql_df = ctx
175201
.sql("SELECT ST_AsBinary(geometry) FROM t;")
@@ -185,5 +211,20 @@ mod test {
185211
let output_wkb_type = output_field.try_extension_type::<WkbType>().unwrap();
186212

187213
assert_eq!(&crs, output_wkb_type.metadata().crs());
214+
215+
let sql_df2 = ctx
216+
.sql("SELECT ST_GeomFromWKB(ST_AsBinary(geometry)) FROM t;")
217+
.await
218+
.unwrap();
219+
220+
let output_batches = sql_df2.collect().await.unwrap();
221+
assert_eq!(output_batches.len(), 1);
222+
let output_batch = &output_batches[0];
223+
let output_schema = output_batch.schema();
224+
let output_field = output_schema.field(0);
225+
let output_column = output_batch.column(0);
226+
let geom_arr = GeometryArray::try_from((output_column.as_ref(), output_field)).unwrap();
227+
228+
assert_eq!(geom_arr, GeometryArray::from(point_arr));
188229
}
189230
}

0 commit comments

Comments
 (0)