diff --git a/Cargo.lock b/Cargo.lock index 626d47ce6..c8f4d402a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -286,6 +286,7 @@ dependencies = [ "fehler", "flate2", "futures", + "hex", "iai", "itertools 0.10.0", "log", @@ -335,9 +336,11 @@ dependencies = [ "pyo3-built", "rayon", "rust_decimal", + "serde_json", "sqlparser", "thiserror", "tokio", + "uuid", ] [[package]] @@ -1633,6 +1636,8 @@ dependencies = [ "chrono", "fallible-iterator", "postgres-protocol", + "serde", + "serde_json", "uuid", ] diff --git a/Justfile b/Justfile index 8216662df..eb238f555 100644 --- a/Justfile +++ b/Justfile @@ -21,8 +21,6 @@ test-python: setup-python cd connectorx-python && poetry run pytest connectorx/tests -v -s seed-db: - psql $POSTGRES_URL -c "DROP TABLE IF EXISTS test_table;" - psql $POSTGRES_URL -c "DROP TABLE IF EXISTS test_str;" psql $POSTGRES_URL -f scripts/postgres.sql # benches diff --git a/connectorx-python/Cargo.toml b/connectorx-python/Cargo.toml index 880485104..24b6ae100 100644 --- a/connectorx-python/Cargo.toml +++ b/connectorx-python/Cargo.toml @@ -22,9 +22,11 @@ numpy = "0.13" pyo3 = {version = "0.13", default-features = false, features = ["macros"]} pyo3-built = "0.4" rust_decimal = {version = "1", features = ["db-postgres"]} +serde_json = "1" sqlparser = "0.8.0" thiserror = "1" tokio = {version = "1", features = ["rt-multi-thread", "io-util"]} +uuid = "0.8" [build-dependencies] built = {version = "0.4", features = ["chrono"]} diff --git a/connectorx-python/connectorx/tests/test_read_sql.py b/connectorx-python/connectorx/tests/test_read_sql.py index 6ed52d962..156f8b693 100644 --- a/connectorx-python/connectorx/tests/test_read_sql.py +++ b/connectorx-python/connectorx/tests/test_read_sql.py @@ -223,3 +223,95 @@ def test_read_sql_on_utf8(postgres_url: str) -> None: }, ) assert_frame_equal(df, expected, check_names=True) + + +def test_types_binary(postgres_url: str) -> None: + query = "SELECT test_int16, test_char, test_uuid, test_time, test_json, test_jsonb, 
test_bytea, test_enum FROM test_types" + df = read_sql(postgres_url, query) + expected = pd.DataFrame( + index=range(4), + data={ + "test_int16": pd.Series([0, 1, 2, 3], dtype="Int64"), + "test_char": pd.Series(["a", "b", "c", "d"], dtype="object"), + "test_uuid": pd.Series( + [ + "86b494cc-96b2-11eb-9298-3e22fbb9fe9d", + "86b49b84-96b2-11eb-9298-3e22fbb9fe9d", + "86b49c42-96b2-11eb-9298-3e22fbb9fe9d", + "86b49cce-96b2-11eb-9298-3e22fbb9fe9d" + ], dtype="object" + ), + "test_time": pd.Series(["08:12:40", "10:03:00", "23:00:10", "18:30:00"], dtype="object"), + "test_json": pd.Series( + [ + '{"customer":"John Doe","items":{"product":"Beer","qty":6}}', + '{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}', + '{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}', + '{"customer":"Mary Clark","items":{"product":"Toy Train","qty":2}}', + ], dtype="object" + ), + "test_jsonb": pd.Series( + [ + '{"qty":6,"product":"Beer"}', + '{"qty":24,"product":"Diaper"}', + '{"qty":1,"product":"Toy Car"}', + '{"qty":2,"product":"Toy Train"}', + ], dtype="object" + ), + "test_bytea": pd.Series( + [ + b'test', + b'\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5', + b'123bhaf4', + b'\xf0\x9f\x98\x9c' + ], dtype="object"), + "test_enum": pd.Series(['happy', 'very happy', 'ecstatic', 'ecstatic'], dtype="object") + }, + ) + assert_frame_equal(df, expected, check_names=True) + + +def test_types_csv(postgres_url: str) -> None: + query = "SELECT test_int16, test_char, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text FROM test_types" + df = read_sql(postgres_url, query, protocol="csv") + expected = pd.DataFrame( + index=range(4), + data={ + "test_int16": pd.Series([0, 1, 2, 3], dtype="Int64"), + "test_char": pd.Series(["a", "b", "c", "d"], dtype="object"), + "test_uuid": pd.Series( + [ + "86b494cc-96b2-11eb-9298-3e22fbb9fe9d", + "86b49b84-96b2-11eb-9298-3e22fbb9fe9d", + 
"86b49c42-96b2-11eb-9298-3e22fbb9fe9d", + "86b49cce-96b2-11eb-9298-3e22fbb9fe9d" + ], dtype="object" + ), + "test_time": pd.Series(["08:12:40", "10:03:00", "23:00:10", "18:30:00"], dtype="object"), + "test_json": pd.Series( + [ + '{"customer":"John Doe","items":{"product":"Beer","qty":6}}', + '{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}', + '{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}', + '{"customer":"Mary Clark","items":{"product":"Toy Train","qty":2}}', + ], dtype="object" + ), + "test_jsonb": pd.Series( + [ + '{"qty":6,"product":"Beer"}', + '{"qty":24,"product":"Diaper"}', + '{"qty":1,"product":"Toy Car"}', + '{"qty":2,"product":"Toy Train"}', + ], dtype="object" + ), + "test_bytea": pd.Series( + [ + b'test', + b'\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5', + b'123bhaf4', + b'\xf0\x9f\x98\x9c' + ], dtype="object"), + "test_enum": pd.Series(['happy', 'very happy', 'ecstatic', 'ecstatic'], dtype="object") + }, + ) + assert_frame_equal(df, expected, check_names=True) diff --git a/connectorx-python/src/pandas/destination.rs b/connectorx-python/src/pandas/destination.rs index dd108e7f2..0e38f03db 100644 --- a/connectorx-python/src/pandas/destination.rs +++ b/connectorx-python/src/pandas/destination.rs @@ -1,6 +1,6 @@ use super::pandas_columns::{ - BooleanBlock, DateTimeBlock, Float64Block, HasPandasColumn, Int64Block, PandasColumn, - PandasColumnObject, StringBlock, + BooleanBlock, BytesBlock, DateTimeBlock, Float64Block, HasPandasColumn, Int64Block, + PandasColumn, PandasColumnObject, StringBlock, }; use super::types::{PandasDType, PandasTypeSystem}; use anyhow::anyhow; @@ -156,7 +156,9 @@ impl<'a> Destination for PandasDestination<'a> { .collect() } } - PandasTypeSystem::String(_) => { + PandasTypeSystem::String(_) + | PandasTypeSystem::Str(_) + | PandasTypeSystem::Char(_) => { let block = StringBlock::extract(buf).map_err(|e| anyhow!(e))?; let cols = 
block.split()?; for (&cid, col) in cids.iter().zip_eq(cols) { @@ -167,6 +169,17 @@ impl<'a> Destination for PandasDestination<'a> { .collect() } } + PandasTypeSystem::Bytes(_) => { + let block = BytesBlock::extract(buf).map_err(|e| anyhow!(e))?; + let cols = block.split()?; + for (&cid, col) in cids.iter().zip_eq(cols) { + partitioned_columns[cid] = col + .partition(&counts) + .into_iter() + .map(|c| Box::new(c) as _) + .collect() + } + } PandasTypeSystem::DateTime(_) => { let block = DateTimeBlock::extract(buf).map_err(|e| anyhow!(e))?; let cols = block.split()?; diff --git a/connectorx-python/src/pandas/pandas_columns/bytes.rs b/connectorx-python/src/pandas/pandas_columns/bytes.rs new file mode 100644 index 000000000..68b354a15 --- /dev/null +++ b/connectorx-python/src/pandas/pandas_columns/bytes.rs @@ -0,0 +1,187 @@ +use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject}; +use anyhow::anyhow; +use connectorx::ConnectorAgentError; +use fehler::throws; +use ndarray::{ArrayViewMut2, Axis, Ix2}; +use numpy::{npyffi::NPY_TYPES, Element, PyArray, PyArrayDescr}; +use pyo3::{FromPyObject, Py, PyAny, PyResult, Python}; +use std::any::TypeId; +use std::sync::{Arc, Mutex}; + +#[derive(Clone)] +#[repr(transparent)] +pub struct PyBytes(Py); + +// In order to put it into a numpy array +impl Element for PyBytes { + const DATA_TYPE: numpy::DataType = numpy::DataType::Object; + fn is_same_type(dtype: &PyArrayDescr) -> bool { + unsafe { *dtype.as_dtype_ptr() }.type_num == NPY_TYPES::NPY_OBJECT as i32 + } +} + +pub struct BytesBlock<'a> { + data: ArrayViewMut2<'a, PyBytes>, + mutex: Arc>, + buf_size_mb: usize, +} + +impl<'a> FromPyObject<'a> for BytesBlock<'a> { + fn extract(ob: &'a PyAny) -> PyResult { + check_dtype(ob, "object")?; + let array = ob.downcast::>()?; + let data = unsafe { array.as_array_mut() }; + Ok(BytesBlock { + data, + mutex: Arc::new(Mutex::new(())), // allocate the lock here since only BytesBlock needs to aquire the GIL for now + 
buf_size_mb: 16, // in MB + }) + } +} + +impl<'a> BytesBlock<'a> { + #[throws(ConnectorAgentError)] + pub fn split(self) -> Vec> { + let mut ret = vec![]; + let mut view = self.data; + + let nrows = view.ncols(); + while view.nrows() > 0 { + let (col, rest) = view.split_at(Axis(0), 1); + view = rest; + ret.push(BytesColumn { + data: col + .into_shape(nrows)? + .into_slice() + .ok_or_else(|| anyhow!("get None for splitted String data"))?, + next_write: 0, + bytes_lengths: vec![], + bytes_buf: Vec::with_capacity(self.buf_size_mb * 2 << 20 * 11 / 10), // allocate a little bit more memory to avoid Vec growth + buf_size: self.buf_size_mb * 2 << 20, + mutex: self.mutex.clone(), + }) + } + ret + } +} + +pub struct BytesColumn<'a> { + data: &'a mut [PyBytes], + next_write: usize, + bytes_buf: Vec, + bytes_lengths: Vec, + buf_size: usize, + mutex: Arc>, +} + +impl<'a> PandasColumnObject for BytesColumn<'a> { + fn typecheck(&self, id: TypeId) -> bool { + id == TypeId::of::<&'static [u8]>() || id == TypeId::of::>() + } + fn len(&self) -> usize { + self.data.len() + } + fn typename(&self) -> &'static str { + std::any::type_name::<&'static [u8]>() + } + #[throws(ConnectorAgentError)] + fn finalize(&mut self) { + self.flush()?; + } +} + +impl<'a> PandasColumn> for BytesColumn<'a> { + #[throws(ConnectorAgentError)] + fn write(&mut self, val: Vec) { + self.bytes_lengths.push(val.len()); + self.bytes_buf.extend_from_slice(&val[..]); + self.try_flush()?; + } +} + +impl<'a> PandasColumn>> for BytesColumn<'a> { + #[throws(ConnectorAgentError)] + fn write(&mut self, val: Option>) { + match val { + Some(b) => { + self.bytes_lengths.push(b.len()); + self.bytes_buf.extend_from_slice(&b[..]); + self.try_flush()?; + } + None => { + self.bytes_lengths.push(0); + } + } + } +} + +impl HasPandasColumn for Vec { + type PandasColumn<'a> = BytesColumn<'a>; +} + +impl HasPandasColumn for Option> { + type PandasColumn<'a> = BytesColumn<'a>; +} + +impl<'a> BytesColumn<'a> { + pub fn partition(self, 
counts: &[usize]) -> Vec> { + let mut partitions = vec![]; + let mut data = self.data; + + for &c in counts { + let (splitted_data, rest) = data.split_at_mut(c); + data = rest; + + partitions.push(BytesColumn { + data: splitted_data, + next_write: 0, + bytes_lengths: vec![], + bytes_buf: Vec::with_capacity(self.buf_size), + buf_size: self.buf_size, + mutex: self.mutex.clone(), + }); + } + + partitions + } + + #[throws(ConnectorAgentError)] + pub fn flush(&mut self) { + let nstrings = self.bytes_lengths.len(); + + if nstrings > 0 { + let py = unsafe { Python::assume_gil_acquired() }; + + { + // allocation in python is not thread safe + let _guard = self + .mutex + .lock() + .map_err(|e| anyhow!("mutex poisoned {}", e))?; + let mut start = 0; + for (i, &len) in self.bytes_lengths.iter().enumerate() { + let end = start + len; + if len != 0 { + unsafe { + // allocate and write in the same time + *self.data.get_unchecked_mut(self.next_write + i) = PyBytes( + pyo3::types::PyBytes::new(py, &self.bytes_buf[start..end]).into(), + ); + }; + } + start = end; + } + } + + self.bytes_buf.truncate(0); + self.next_write += nstrings; + } + } + + #[throws(ConnectorAgentError)] + pub fn try_flush(&mut self) { + if self.bytes_buf.len() >= self.buf_size { + self.flush()?; + } + } +} diff --git a/connectorx-python/src/pandas/pandas_columns/mod.rs b/connectorx-python/src/pandas/pandas_columns/mod.rs index 9efc8aa5a..52ea2f912 100644 --- a/connectorx-python/src/pandas/pandas_columns/mod.rs +++ b/connectorx-python/src/pandas/pandas_columns/mod.rs @@ -1,10 +1,12 @@ mod boolean; +mod bytes; mod datetime; mod float64; mod int64; mod string; // TODO: use macro for integers +pub use crate::pandas::pandas_columns::bytes::{BytesBlock, BytesColumn}; pub use boolean::{BooleanBlock, BooleanColumn}; use connectorx::Result; pub use datetime::{DateTimeBlock, DateTimeColumn}; diff --git a/connectorx-python/src/pandas/pandas_columns/string.rs b/connectorx-python/src/pandas/pandas_columns/string.rs index 
5e045c016..4acc2d592 100644 --- a/connectorx-python/src/pandas/pandas_columns/string.rs +++ b/connectorx-python/src/pandas/pandas_columns/string.rs @@ -90,6 +90,27 @@ impl<'r, 'a> PandasColumn<&'r str> for StringColumn<'a> { } } +impl<'a> PandasColumn for StringColumn<'a> { + #[throws(ConnectorAgentError)] + fn write(&mut self, val: String) { + let bytes = val.as_bytes(); + self.string_lengths.push(bytes.len()); + self.string_buf.extend_from_slice(bytes); + self.try_flush()?; + } +} + +impl<'a> PandasColumn for StringColumn<'a> { + #[throws(ConnectorAgentError)] + fn write(&mut self, val: char) { + let mut buffer = [0; 4]; // a char is max to 4 bytes + let bytes = val.encode_utf8(&mut buffer).as_bytes(); + self.string_lengths.push(bytes.len()); + self.string_buf.extend_from_slice(bytes); + self.try_flush()?; + } +} + impl<'r, 'a> PandasColumn> for StringColumn<'a> { #[throws(ConnectorAgentError)] fn write(&mut self, val: Option<&'r str>) { @@ -107,6 +128,41 @@ impl<'r, 'a> PandasColumn> for StringColumn<'a> { } } +impl<'a> PandasColumn> for StringColumn<'a> { + #[throws(ConnectorAgentError)] + fn write(&mut self, val: Option) { + match val { + Some(b) => { + let bytes = b.as_bytes(); + self.string_lengths.push(bytes.len()); + self.string_buf.extend_from_slice(bytes); + self.try_flush()?; + } + None => { + self.string_lengths.push(0); + } + } + } +} + +impl<'a> PandasColumn> for StringColumn<'a> { + #[throws(ConnectorAgentError)] + fn write(&mut self, val: Option) { + match val { + Some(b) => { + let mut buffer = [0; 4]; // a char is max to 4 bytes + let bytes = b.encode_utf8(&mut buffer).as_bytes(); + self.string_lengths.push(bytes.len()); + self.string_buf.extend_from_slice(bytes); + self.try_flush()?; + } + None => { + self.string_lengths.push(0); + } + } + } +} + impl<'r> HasPandasColumn for &'r str { type PandasColumn<'a> = StringColumn<'a>; } @@ -115,6 +171,22 @@ impl<'r> HasPandasColumn for Option<&'r str> { type PandasColumn<'a> = StringColumn<'a>; } +impl 
HasPandasColumn for String { + type PandasColumn<'a> = StringColumn<'a>; +} + +impl HasPandasColumn for Option { + type PandasColumn<'a> = StringColumn<'a>; +} + +impl HasPandasColumn for char { + type PandasColumn<'a> = StringColumn<'a>; +} + +impl HasPandasColumn for Option { + type PandasColumn<'a> = StringColumn<'a>; +} + impl<'a> StringColumn<'a> { pub fn partition(self, counts: &[usize]) -> Vec> { let mut partitions = vec![]; diff --git a/connectorx-python/src/pandas/transport.rs b/connectorx-python/src/pandas/transport.rs index 7a0bcede7..616c14112 100644 --- a/connectorx-python/src/pandas/transport.rs +++ b/connectorx-python/src/pandas/transport.rs @@ -1,13 +1,15 @@ use super::destination::PandasDestination; use super::types::PandasTypeSystem; -use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc}; +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use connectorx::{ impl_transport, sources::postgres::{Binary, PostgresSource, PostgresTypeSystem, CSV}, typesystem::TypeConversion, }; use rust_decimal::prelude::*; +use serde_json::{to_string, Value}; use std::marker::PhantomData; +use uuid::Uuid; pub struct PostgresPandasTransport<'py, P>(&'py (), PhantomData
<P>
); @@ -23,13 +25,19 @@ impl_transport!( { Int4[i32] => I64[i64] | conversion all } { Int8[i64] => I64[i64] | conversion all } { Bool[bool] => Bool[bool] | conversion all } - { Text[&'r str] => String[&'r str] | conversion all } - { BpChar[&'r str] => String[&'r str] | conversion none } - { VarChar[&'r str] => String[&'r str] | conversion none } + { Char[i8] => Char[char] | conversion half } + { Text[&'r str] => Str[&'r str] | conversion all } + { BpChar[&'r str] => Str[&'r str] | conversion none } + { VarChar[&'r str] => Str[&'r str] | conversion none } { Timestamp[NaiveDateTime] => DateTime[DateTime] | conversion half } { TimestampTz[DateTime] => DateTime[DateTime] | conversion all } { Date[NaiveDate] => DateTime[DateTime] | conversion half } - { Char[&'r str] => String[&'r str] | conversion none } + { UUID[Uuid] => String[String] | conversion half } + { JSON[Value] => String[String] | conversion half } + { JSONB[Value] => String[String] | conversion none } + { Time[NaiveTime] => String[String] | conversion half } + { ByteA[Vec] => Bytes[Vec] | conversion all } + { Enum[&'r str] => Str[&'r str] | conversion none } } ); @@ -45,13 +53,19 @@ impl_transport!( { Int4[i32] => I64[i64] | conversion all } { Int8[i64] => I64[i64] | conversion all } { Bool[bool] => Bool[bool] | conversion all } - { Text[&'r str] => String[&'r str] | conversion all } - { BpChar[&'r str] => String[&'r str] | conversion none } - { VarChar[&'r str] => String[&'r str] | conversion none } + { Char[i8] => Char[char] | conversion half } + { Text[&'r str] => Str[&'r str] | conversion all } + { BpChar[&'r str] => Str[&'r str] | conversion none } + { VarChar[&'r str] => Str[&'r str] | conversion none } { Timestamp[NaiveDateTime] => DateTime[DateTime] | conversion half } { TimestampTz[DateTime] => DateTime[DateTime] | conversion all } { Date[NaiveDate] => DateTime[DateTime] | conversion half } - { Char[&'r str] => String[&'r str] | conversion none } + { UUID[Uuid] => String[String] | conversion half } 
+ { JSON[Value] => String[String] | conversion half } + { JSONB[Value] => String[String] | conversion none } + { Time[NaiveTime] => String[String] | conversion half } + { ByteA[Vec] => Bytes[Vec] | conversion all } + { Enum[&'r str] => Str[&'r str] | conversion none } } ); @@ -62,6 +76,18 @@ impl<'py, P> TypeConversion for PostgresPandasTransport<'py, P> { } } +impl<'py, P> TypeConversion for PostgresPandasTransport<'py, P> { + fn convert(val: NaiveTime) -> String { + val.to_string() + } +} + +impl<'py, P> TypeConversion for PostgresPandasTransport<'py, P> { + fn convert(val: i8) -> char { + val as u8 as char + } +} + impl<'py, P> TypeConversion> for PostgresPandasTransport<'py, P> { fn convert(val: NaiveDateTime) -> DateTime { DateTime::from_utc(val, Utc) @@ -73,3 +99,15 @@ impl<'py, P> TypeConversion> for PostgresPandasTranspor DateTime::from_utc(val.and_hms(0, 0, 0), Utc) } } + +impl<'py, P> TypeConversion for PostgresPandasTransport<'py, P> { + fn convert(val: Uuid) -> String { + val.to_string() + } +} + +impl<'py, P> TypeConversion for PostgresPandasTransport<'py, P> { + fn convert(val: Value) -> String { + to_string(&val).unwrap() + } +} diff --git a/connectorx-python/src/pandas/types.rs b/connectorx-python/src/pandas/types.rs index 875dc9e45..ba8c9fad0 100644 --- a/connectorx-python/src/pandas/types.rs +++ b/connectorx-python/src/pandas/types.rs @@ -9,7 +9,10 @@ pub enum PandasTypeSystem { F64(bool), I64(bool), Bool(bool), + Char(bool), + Str(bool), String(bool), + Bytes(bool), DateTime(bool), } @@ -19,7 +22,10 @@ impl_typesystem! 
{ { F64 => f64 } { I64 => i64 } { Bool => bool } - { String => &'r str } + { Char => char } + { Str => &'r str } + { String => String } + { Bytes => Vec } { DateTime => DateTime } } } @@ -41,7 +47,10 @@ impl PandasDType for PandasTypeSystem { PandasTypeSystem::F64(_) => "float64", PandasTypeSystem::Bool(false) => "bool", PandasTypeSystem::Bool(true) => "boolean", + PandasTypeSystem::Char(_) => "object", + PandasTypeSystem::Str(_) => "object", PandasTypeSystem::String(_) => "object", + PandasTypeSystem::Bytes(_) => "object", PandasTypeSystem::DateTime(_) => "datetime64[ns]", } } @@ -51,7 +60,10 @@ impl PandasDType for PandasTypeSystem { PandasTypeSystem::I64(_) => "i8", PandasTypeSystem::F64(_) => "f8", PandasTypeSystem::Bool(_) => "b1", + PandasTypeSystem::Char(_) => "O", + PandasTypeSystem::Str(_) => "O", PandasTypeSystem::String(_) => "O", + PandasTypeSystem::Bytes(_) => "O", PandasTypeSystem::DateTime(_) => "M8[ns]", } } @@ -77,7 +89,10 @@ impl PandasDType for PandasTypeSystem { PandasTypeSystem::F64(_) => false, PandasTypeSystem::Bool(false) => false, PandasTypeSystem::Bool(true) => true, + PandasTypeSystem::Char(_) => false, // we use object instead of string (Extension) for now + PandasTypeSystem::Str(_) => false, // we use object instead of string (Extension) for now PandasTypeSystem::String(_) => false, // we use object instead of string (Extension) for now + PandasTypeSystem::Bytes(_) => false, // we use object instead of string (Extension) for now PandasTypeSystem::DateTime(_) => false, } } @@ -89,7 +104,10 @@ impl PandasDType for PandasTypeSystem { PandasTypeSystem::F64(_) => "FloatBlock", PandasTypeSystem::Bool(false) => "BoolBlock", PandasTypeSystem::Bool(true) => "ExtensionBlock", + PandasTypeSystem::Char(_) => "ObjectBlock", // we use object instead of string (Extension) for now + PandasTypeSystem::Str(_) => "ObjectBlock", // we use object instead of string (Extension) for now PandasTypeSystem::String(_) => "ObjectBlock", // we use object instead of 
string (Extension) for now + PandasTypeSystem::Bytes(_) => "ObjectBlock", // we use object instead of string (Extension) for now PandasTypeSystem::DateTime(_) => "DatetimeBlock", } } diff --git a/connectorx/Cargo.toml b/connectorx/Cargo.toml index fe1b860bc..7cfeba9e8 100644 --- a/connectorx/Cargo.toml +++ b/connectorx/Cargo.toml @@ -21,7 +21,7 @@ itertools = "0.10" log = "0.4" ndarray = "0.14" num-traits = "0.2" -postgres = {version = "0.19", features = ["with-chrono-0_4", "with-uuid-0_8"]} +postgres = {version = "0.19", features = ["with-chrono-0_4", "with-uuid-0_8", "with-serde_json-1"]} r2d2 = "0.8" r2d2_postgres = "0.18" rand = "0.8" @@ -37,6 +37,7 @@ strum = {version = "0.20", features = ["derive"]} thiserror = "1" tokio = {version = "1", features = ["rt-multi-thread", "io-util"]} uuid = "0.8" +hex = "0.4" [lib] crate-type = ["cdylib", "rlib"] diff --git a/connectorx/src/errors.rs b/connectorx/src/errors.rs index 2d021246c..2a54b4220 100644 --- a/connectorx/src/errors.rs +++ b/connectorx/src/errors.rs @@ -63,6 +63,9 @@ pub enum ConnectorAgentError { #[error(transparent)] ArrowError(#[from] arrow::error::ArrowError), + #[error(transparent)] + HexError(#[from] hex::FromHexError), + /// Any other errors that are too trivial to be put here explicitly. 
#[error(transparent)] Other(#[from] anyhow::Error), diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs index 62fbac577..6ef075e28 100644 --- a/connectorx/src/sources/postgres/mod.rs +++ b/connectorx/src/sources/postgres/mod.rs @@ -8,6 +8,7 @@ use anyhow::anyhow; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use csv::{ReaderBuilder, StringRecord, StringRecordsIntoIter}; use fehler::throw; +use hex::decode; use log::debug; use postgres::{ binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow}, @@ -17,6 +18,7 @@ use postgres::{ use r2d2::{Pool, PooledConnection}; use r2d2_postgres::{postgres::NoTls, PostgresConnectionManager}; use rust_decimal::Decimal; +use serde_json::{from_str, Value}; use sql::{count_query, get_limit, limit1_query}; use std::marker::PhantomData; pub use typesystem::PostgresTypeSystem; @@ -257,7 +259,7 @@ impl<'a> PostgresBinarySourcePartitionParser<'a> { ) -> Self { Self { iter, - buf_size: buf_size, + buf_size, rowbuf: Vec::with_capacity(buf_size), ncols: schema.len(), current_row: 0, @@ -299,20 +301,22 @@ impl<'a> PartitionParser<'a> for PostgresBinarySourcePartitionParser<'a> { } macro_rules! impl_produce { - ($($t: ty),+) => { + ($($t: ty,)+) => { $( impl<'r, 'a> Produce<'r, $t> for PostgresBinarySourcePartitionParser<'a> { - fn produce(&mut self) -> Result<$t> { + fn produce(&'r mut self) -> Result<$t> { let (ridx, cidx) = self.next_loc()?; - let val = self.rowbuf[ridx].try_get(cidx)?; + let row = &self.rowbuf[ridx]; + let val = row.try_get(cidx)?; Ok(val) } } impl<'r, 'a> Produce<'r, Option<$t>> for PostgresBinarySourcePartitionParser<'a> { - fn produce(&mut self) -> Result> { + fn produce(&'r mut self) -> Result> { let (ridx, cidx) = self.next_loc()?; - let val = self.rowbuf[ridx].try_get(cidx)?; + let row = &self.rowbuf[ridx]; + let val = row.try_get(cidx)?; Ok(val) } } @@ -321,40 +325,24 @@ macro_rules! 
impl_produce { } impl_produce!( + i8, i16, i32, i64, f32, f64, + Decimal, bool, - DateTime, + &'r str, + Vec, + NaiveTime, NaiveDateTime, + DateTime, NaiveDate, - Decimal, Uuid, - NaiveTime + Value, ); -impl<'r, 'a> Produce<'r, &'r str> for PostgresBinarySourcePartitionParser<'a> { - fn produce(&'r mut self) -> Result<&'r str> { - let (ridx, cidx) = self.next_loc()?; - let row = &self.rowbuf[ridx]; - let val = row.try_get(cidx)?; - - Ok(val) - } -} - -impl<'r, 'a> Produce<'r, Option<&'r str>> for PostgresBinarySourcePartitionParser<'a> { - fn produce(&'r mut self) -> Result> { - let (ridx, cidx) = self.next_loc()?; - let row = &self.rowbuf[ridx]; - let val = row.try_get(cidx)?; - - Ok(val) - } -} - pub struct PostgresCSVSourceParser<'a> { iter: StringRecordsIntoIter>, buf_size: usize, @@ -372,7 +360,7 @@ impl<'a> PostgresCSVSourceParser<'a> { ) -> Self { Self { iter, - buf_size: buf_size, + buf_size, rowbuf: Vec::with_capacity(buf_size), ncols: schema.len(), current_row: 0, @@ -413,10 +401,10 @@ impl<'a> PartitionParser<'a> for PostgresCSVSourceParser<'a> { } macro_rules! impl_csv_produce { - ($($t: ty),+) => { + ($($t: ty,)+) => { $( impl<'r, 'a> Produce<'r, $t> for PostgresCSVSourceParser<'a> { - fn produce(&mut self) -> Result<$t> { + fn produce(&'r mut self) -> Result<$t> { let (ridx, cidx) = self.next_loc()?; self.rowbuf[ridx][cidx].parse().map_err(|_| { ConnectorAgentError::cannot_produce::<$t>(Some(self.rowbuf[ridx][cidx].into())) @@ -425,7 +413,7 @@ macro_rules! impl_csv_produce { } impl<'r, 'a> Produce<'r, Option<$t>> for PostgresCSVSourceParser<'a> { - fn produce(&mut self) -> Result> { + fn produce(&'r mut self) -> Result> { let (ridx, cidx) = self.next_loc()?; match &self.rowbuf[ridx][cidx][..] { "" => Ok(None), @@ -439,7 +427,7 @@ macro_rules! 
impl_csv_produce { }; } -impl_csv_produce!(i16, i32, i64, f32, f64, Decimal, Uuid); +impl_csv_produce!(i8, i16, i32, i64, f32, f64, Decimal, Uuid,); impl<'r, 'a> Produce<'r, bool> for PostgresCSVSourceParser<'a> { fn produce(&mut self) -> Result { @@ -539,6 +527,27 @@ impl<'r, 'a> Produce<'r, Option> for PostgresCSVSourceParser<'a> } } +impl<'r, 'a> Produce<'r, NaiveTime> for PostgresCSVSourceParser<'a> { + fn produce(&mut self) -> Result { + let (ridx, cidx) = self.next_loc()?; + NaiveTime::parse_from_str(&self.rowbuf[ridx][cidx], "%H:%M:%S").map_err(|_| { + ConnectorAgentError::cannot_produce::(Some(self.rowbuf[ridx][cidx].into())) + }) + } +} + +impl<'r, 'a> Produce<'r, Option> for PostgresCSVSourceParser<'a> { + fn produce(&mut self) -> Result> { + let (ridx, cidx) = self.next_loc()?; + match &self.rowbuf[ridx][cidx][..] { + "" => Ok(None), + v => Ok(Some(NaiveTime::parse_from_str(v, "%H:%M:%S").map_err( + |_| ConnectorAgentError::cannot_produce::(Some(v.into())), + )?)), + } + } +} + impl<'r, 'a> Produce<'r, &'r str> for PostgresCSVSourceParser<'a> { fn produce(&'r mut self) -> Result<&'r str> { let (ridx, cidx) = self.next_loc()?; @@ -556,11 +565,40 @@ impl<'r, 'a> Produce<'r, Option<&'r str>> for PostgresCSVSourceParser<'a> { } } -// impl<'r, 'a> Produce<'r, NaiveTime> for PostgresCSVSourceParser<'a> { -// fn produce(&mut self) -> Result { -// let (ridx, cidx) = self.next_loc()?; -// NaiveTime::parse_from_str(&self.rowbuf[ridx][cidx], "%H:%M:%S").map_err(|_| { -// ConnectorAgentError::cannot_produce::(Some(self.rowbuf[ridx][cidx].into())) -// }) -// } -// } +impl<'r, 'a> Produce<'r, Vec> for PostgresCSVSourceParser<'a> { + fn produce(&'r mut self) -> Result> { + let (ridx, cidx) = self.next_loc()?; + Ok(decode(&self.rowbuf[ridx][cidx][2..])?) 
// escape \x in the beginning + } +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresCSVSourceParser<'a> { + fn produce(&'r mut self) -> Result>> { + let (ridx, cidx) = self.next_loc()?; + match &self.rowbuf[ridx][cidx][2..] { + // escape \x in the beginning + "" => Ok(None), + v => Ok(Some(decode(&v)?)), + } + } +} + +impl<'r, 'a> Produce<'r, Value> for PostgresCSVSourceParser<'a> { + fn produce(&'r mut self) -> Result { + let (ridx, cidx) = self.next_loc()?; + let v = &self.rowbuf[ridx][cidx]; + from_str(v).map_err(|_| ConnectorAgentError::cannot_produce::(Some(v.into()))) + } +} + +impl<'r, 'a> Produce<'r, Option> for PostgresCSVSourceParser<'a> { + fn produce(&'r mut self) -> Result> { + let (ridx, cidx) = self.next_loc()?; + + match &self.rowbuf[ridx][cidx][..] { + "" => Ok(None), + v => from_str(v) + .map_err(|_| ConnectorAgentError::cannot_produce::(Some(v.into()))), + } + } +} diff --git a/connectorx/src/sources/postgres/typesystem.rs b/connectorx/src/sources/postgres/typesystem.rs index 59b771175..c0c619706 100644 --- a/connectorx/src/sources/postgres/typesystem.rs +++ b/connectorx/src/sources/postgres/typesystem.rs @@ -1,6 +1,7 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use postgres::types::Type; use rust_decimal::Decimal; +use serde_json::Value; use uuid::Uuid; #[derive(Copy, Clone, Debug)] @@ -13,34 +14,39 @@ pub enum PostgresTypeSystem { Int4(bool), Int8(bool), Date(bool), + Char(bool), BpChar(bool), VarChar(bool), Text(bool), + ByteA(bool), + Time(bool), Timestamp(bool), TimestampTz(bool), UUID(bool), - Char(bool), - // Time(bool), - // Interval(bool), + JSON(bool), + JSONB(bool), + Enum(bool), } impl_typesystem! 
{ system = PostgresTypeSystem, mappings = { - { Int2 => i16} + { Int2 => i16 } { Int4 => i32 } { Int8 => i64 } { Float4 => f32 } { Float8 => f64 } { Numeric => Decimal } { Bool => bool } - { Text | BpChar | VarChar | Char => &'r str } + { Char => i8 } + { Text | BpChar | VarChar | Enum => &'r str } + { ByteA => Vec } + { Time => NaiveTime } { Timestamp => NaiveDateTime } { TimestampTz => DateTime } { Date => NaiveDate } - { UUID => Uuid} - // { Time => NaiveTime } - // { Interval => &'r str } + { UUID => Uuid } + { JSON | JSONB => Value } } } @@ -55,17 +61,22 @@ impl<'a> From<&'a Type> for PostgresTypeSystem { "float8" => Float8(true), "numeric" => Numeric(true), "bool" => Bool(true), + "char" => Char(true), "text" => Text(true), "bpchar" => BpChar(true), "varchar" => VarChar(true), + "bytea" => ByteA(true), + "time" => Time(true), "timestamp" => TimestampTz(true), "timestamptz" => Timestamp(true), "date" => Date(true), "uuid" => UUID(true), - "char" => Char(true), - // "time" => Time(true), - // "interval" => Interval(true), - ty => unimplemented!("{}", ty), + "json" => JSON(true), + "jsonb" => JSONB(true), + _ => match ty.kind() { + postgres::types::Kind::Enum(_) => Enum(true), + _ => unimplemented!("{}", ty.name()), + }, } } } @@ -85,13 +96,16 @@ impl<'a> From for Type { Text(_) => Type::TEXT, BpChar(_) => Type::BPCHAR, VarChar(_) => Type::VARCHAR, + Char(_) => Type::CHAR, + ByteA(_) => Type::BYTEA, + Date(_) => Type::DATE, + Time(_) => Type::TIME, Timestamp(_) => Type::TIMESTAMP, TimestampTz(_) => Type::TIMESTAMPTZ, - Date(_) => Type::DATE, UUID(_) => Type::UUID, - Char(_) => Type::CHAR, - // Time(_) => Type::TIME, - // Interval(_) => Type::INTERVAL + JSON(_) => Type::JSON, + JSONB(_) => Type::JSONB, + Enum(_) => Type::TEXT, } } } diff --git a/connectorx/tests/test_postgres.rs b/connectorx/tests/test_postgres.rs index 91ba69d53..5267bd059 100644 --- a/connectorx/tests/test_postgres.rs +++ b/connectorx/tests/test_postgres.rs @@ -115,49 +115,6 @@ fn 
test_postgres() { ); } -#[test] -fn test_postgres_new_types() { - let _ = env_logger::builder().is_test(true).try_init(); - - let dburl = env::var("POSTGRES_URL").unwrap(); - - let queries = [ - "select * from test_uuid_char_int16 where test_int16 < 2", - "select * from test_uuid_char_int16 where test_int16 >= 2", - ]; - let builder = PostgresSource::new(&dburl, 2).unwrap(); - let mut destination = MemoryDestination::new(); - let dispatcher = Dispatcher::<_, _, PostgresMemoryTransport>::new( - builder, - &mut destination, - &queries, - ); - - dispatcher.run().expect("run dispatcher"); - assert_eq!( - array![Some(0), Some(1), Some(2), Some(3)], - destination.column_view::>(0).unwrap() - ); - assert_eq!( - array![ - Some("a".to_string()), - Some("b".to_string()), - Some("c".to_string()), - Some("d".to_string()) - ], - destination.column_view::>(1).unwrap() - ); - assert_eq!( - array![ - Some("86b494cc-96b2-11eb-9298-3e22fbb9fe9d".to_string()), - Some("86b49b84-96b2-11eb-9298-3e22fbb9fe9d".to_string()), - Some("86b49c42-96b2-11eb-9298-3e22fbb9fe9d".to_string()), - Some("86b49cce-96b2-11eb-9298-3e22fbb9fe9d".to_string()), - ], - destination.column_view::>(2).unwrap() - ); -} - #[test] fn test_postgres_agg() { let _ = env_logger::builder().is_test(true).try_init(); @@ -285,43 +242,3 @@ fn test_postgres_csv() { dst.column_view::>(4).unwrap() ); } - -#[test] -fn test_postgres_new_types_csv() { - let _ = env_logger::builder().is_test(true).try_init(); - - let dburl = env::var("POSTGRES_URL").unwrap(); - - let queries = [ - "select * from test_uuid_char_int16 where test_int16 < 2", - "select * from test_uuid_char_int16 where test_int16 >= 2", - ]; - let builder = PostgresSource::::new(&dburl, 2).unwrap(); - let mut dst = MemoryDestination::new(); - let dispatcher = - Dispatcher::<_, _, PostgresMemoryTransport>::new(builder, &mut dst, &queries); - - dispatcher.run().expect("run dispatcher"); - assert_eq!( - array![Some(0), Some(1), Some(2), Some(3)], - 
dst.column_view::>(0).unwrap() - ); - assert_eq!( - array![ - Some("a".to_string()), - Some("b".to_string()), - Some("c".to_string()), - Some("d".to_string()) - ], - dst.column_view::>(1).unwrap() - ); - assert_eq!( - array![ - Some("86b494cc-96b2-11eb-9298-3e22fbb9fe9d".to_string()), - Some("86b49b84-96b2-11eb-9298-3e22fbb9fe9d".to_string()), - Some("86b49c42-96b2-11eb-9298-3e22fbb9fe9d".to_string()), - Some("86b49cce-96b2-11eb-9298-3e22fbb9fe9d".to_string()), - ], - dst.column_view::>(2).unwrap() - ); -} diff --git a/scripts/postgres.sql b/scripts/postgres.sql index 346664138..ceca7d208 100644 --- a/scripts/postgres.sql +++ b/scripts/postgres.sql @@ -1,3 +1,8 @@ +DROP TABLE IF EXISTS test_table; +DROP TABLE IF EXISTS test_str; +DROP TABLE IF EXISTS test_types; +DROP TYPE IF EXISTS happiness; + CREATE TABLE IF NOT EXISTS test_table( test_int INTEGER NOT NULL, test_nullint INTEGER, @@ -28,14 +33,21 @@ INSERT INTO test_str VALUES (5, 'Latin1', '¥§¤®ð'); INSERT INTO test_str VALUES (6, 'Extra', 'y̆'); INSERT INTO test_str VALUES (7, 'Mixed', 'Ha好ち😁ðy̆'); -CREATE TABLE IF NOT EXISTS test_uuid_char_int16( +CREATE TYPE happiness AS ENUM ('happy', 'very happy', 'ecstatic'); +CREATE TABLE IF NOT EXISTS test_types( test_int16 SMALLINT, test_char CHAR, - test_uuid UUID NOT NULL + test_uuid UUID NOT NULL, + test_time TIME, + test_interval INTERVAL, + test_json JSON, + test_jsonb JSONB, + test_bytea BYTEA, + test_enum happiness ); -INSERT INTO test_uuid_char_int16 VALUES (0, 'a', '86b494cc-96b2-11eb-9298-3e22fbb9fe9d'); -INSERT INTO test_uuid_char_int16 VALUES (1, 'b', '86b49b84-96b2-11eb-9298-3e22fbb9fe9d'); -INSERT INTO test_uuid_char_int16 VALUES (2, 'c', '86b49c42-96b2-11eb-9298-3e22fbb9fe9d'); -INSERT INTO test_uuid_char_int16 VALUES (3, 'd', '86b49cce-96b2-11eb-9298-3e22fbb9fe9d'); +INSERT INTO test_types VALUES (0, 'a', '86b494cc-96b2-11eb-9298-3e22fbb9fe9d', '08:12:40', '1 year 2 months 3 days', '{"customer": "John Doe", "items": {"product": "Beer","qty": 6}}', 
'{"product": "Beer","qty": 6}', 'test', 'happy'); +INSERT INTO test_types VALUES (1, 'b', '86b49b84-96b2-11eb-9298-3e22fbb9fe9d', '10:03:00', '2 weeks ago', '{"customer": "Lily Bush", "items": {"product": "Diaper","qty": 24}}', '{"product": "Diaper","qty": 24}', 'Здра́вствуйте', 'very happy'); +INSERT INTO test_types VALUES (2, 'c', '86b49c42-96b2-11eb-9298-3e22fbb9fe9d', '23:00:10', '3 months 2 days ago', '{"customer": "Josh William", "items": {"product": "Toy Car","qty": 1}}', '{"product": "Toy Car","qty": 1}', '123bhaf4', 'ecstatic'); +INSERT INTO test_types VALUES (3, 'd', '86b49cce-96b2-11eb-9298-3e22fbb9fe9d', '18:30:00', '3 year', '{"customer": "Mary Clark", "items": {"product": "Toy Train","qty": 2}}', '{"product": "Toy Train","qty": 2}', '😜', 'ecstatic');