
Commit 29aadf2

Merge pull request #71 from sfu-db/postgres_more_types
Postgres add support for more types
2 parents 89a28ae + 954c617 commit 29aadf2

File tree: 16 files changed, +575 −163 lines
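
For orientation before the per-file diffs: this PR teaches the Postgres source to handle SMALLINT, CHAR, UUID, TIME, JSON/JSONB, BYTEA and ENUM columns. A minimal usage sketch of what the new tests exercise, assuming the package exposes read_sql at the top level (as the test module suggests) and a hypothetical connection string in place of the tests' postgres_url fixture:

from connectorx import read_sql  # same entry point the new tests call

# Hypothetical connection string; the tests receive theirs via a postgres_url fixture.
postgres_url = "postgresql://postgres:postgres@localhost:5432/tpch"

# Columns covered by the new test_types tests in this PR.
query = "SELECT test_int16, test_uuid, test_json, test_bytea, test_enum FROM test_types"

df = read_sql(postgres_url, query)  # binary protocol by default
print(df.dtypes)  # Int64 for the smallint column, object for uuid/json/bytea/enum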

Cargo.lock

Lines changed: 5 additions & 0 deletions
(Generated file; diff not rendered by default.)

Justfile

Lines changed: 0 additions & 2 deletions
@@ -21,8 +21,6 @@ test-python: setup-python
     cd connectorx-python && poetry run pytest connectorx/tests -v -s
 
 seed-db:
-    psql $POSTGRES_URL -c "DROP TABLE IF EXISTS test_table;"
-    psql $POSTGRES_URL -c "DROP TABLE IF EXISTS test_str;"
     psql $POSTGRES_URL -f scripts/postgres.sql
 
 # benches

connectorx-python/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -22,9 +22,11 @@ numpy = "0.13"
 pyo3 = {version = "0.13", default-features = false, features = ["macros"]}
 pyo3-built = "0.4"
 rust_decimal = {version = "1", features = ["db-postgres"]}
+serde_json = "1"
 sqlparser = "0.8.0"
 thiserror = "1"
 tokio = {version = "1", features = ["rt-multi-thread", "io-util"]}
+uuid = "0.8"
 
 [build-dependencies]
 built = {version = "0.4", features = ["chrono"]}
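
The two new crates presumably back the value conversion for the new column types: serde_json for JSON/JSONB cells and uuid for UUID cells on the Rust side. On the Python side both still arrive as plain strings in object-dtype columns (see the expected frames in the tests below), so callers can post-process them. A small sketch, reusing the df from the example above (which selects test_json and test_uuid):

import json
from uuid import UUID

# test_json / test_jsonb come back as compact JSON text; parse after the query if needed.
items = df["test_json"].apply(json.loads)

# test_uuid comes back as a canonical UUID string; wrap it when a UUID object is wanted.
uuids = df["test_uuid"].apply(UUID)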

connectorx-python/connectorx/tests/test_read_sql.py

Lines changed: 92 additions & 0 deletions
@@ -223,3 +223,95 @@ def test_read_sql_on_utf8(postgres_url: str) -> None:
         },
     )
     assert_frame_equal(df, expected, check_names=True)
+
+
+def test_types_binary(postgres_url: str) -> None:
+    query = "SELECT test_int16, test_char, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum FROM test_types"
+    df = read_sql(postgres_url, query)
+    expected = pd.DataFrame(
+        index=range(4),
+        data={
+            "test_int16": pd.Series([0, 1, 2, 3], dtype="Int64"),
+            "test_char": pd.Series(["a", "b", "c", "d"], dtype="object"),
+            "test_uuid": pd.Series(
+                [
+                    "86b494cc-96b2-11eb-9298-3e22fbb9fe9d",
+                    "86b49b84-96b2-11eb-9298-3e22fbb9fe9d",
+                    "86b49c42-96b2-11eb-9298-3e22fbb9fe9d",
+                    "86b49cce-96b2-11eb-9298-3e22fbb9fe9d"
+                ], dtype="object"
+            ),
+            "test_time": pd.Series(["08:12:40", "10:03:00", "23:00:10", "18:30:00"], dtype="object"),
+            "test_json": pd.Series(
+                [
+                    '{"customer":"John Doe","items":{"product":"Beer","qty":6}}',
+                    '{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}',
+                    '{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}',
+                    '{"customer":"Mary Clark","items":{"product":"Toy Train","qty":2}}',
+                ], dtype="object"
+            ),
+            "test_jsonb": pd.Series(
+                [
+                    '{"qty":6,"product":"Beer"}',
+                    '{"qty":24,"product":"Diaper"}',
+                    '{"qty":1,"product":"Toy Car"}',
+                    '{"qty":2,"product":"Toy Train"}',
+                ], dtype="object"
+            ),
+            "test_bytea": pd.Series(
+                [
+                    b'test',
+                    b'\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5',
+                    b'123bhaf4',
+                    b'\xf0\x9f\x98\x9c'
+                ], dtype="object"),
+            "test_enum": pd.Series(['happy', 'very happy', 'ecstatic', 'ecstatic'], dtype="object")
+        },
+    )
+    assert_frame_equal(df, expected, check_names=True)
+
+
+def test_types_csv(postgres_url: str) -> None:
+    query = "SELECT test_int16, test_char, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text FROM test_types"
+    df = read_sql(postgres_url, query, protocol="csv")
+    expected = pd.DataFrame(
+        index=range(4),
+        data={
+            "test_int16": pd.Series([0, 1, 2, 3], dtype="Int64"),
+            "test_char": pd.Series(["a", "b", "c", "d"], dtype="object"),
+            "test_uuid": pd.Series(
+                [
+                    "86b494cc-96b2-11eb-9298-3e22fbb9fe9d",
+                    "86b49b84-96b2-11eb-9298-3e22fbb9fe9d",
+                    "86b49c42-96b2-11eb-9298-3e22fbb9fe9d",
+                    "86b49cce-96b2-11eb-9298-3e22fbb9fe9d"
+                ], dtype="object"
+            ),
+            "test_time": pd.Series(["08:12:40", "10:03:00", "23:00:10", "18:30:00"], dtype="object"),
+            "test_json": pd.Series(
+                [
+                    '{"customer":"John Doe","items":{"product":"Beer","qty":6}}',
+                    '{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}',
+                    '{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}',
+                    '{"customer":"Mary Clark","items":{"product":"Toy Train","qty":2}}',
+                ], dtype="object"
+            ),
+            "test_jsonb": pd.Series(
+                [
+                    '{"qty":6,"product":"Beer"}',
+                    '{"qty":24,"product":"Diaper"}',
+                    '{"qty":1,"product":"Toy Car"}',
+                    '{"qty":2,"product":"Toy Train"}',
+                ], dtype="object"
+            ),
+            "test_bytea": pd.Series(
+                [
+                    b'test',
+                    b'\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5',
+                    b'123bhaf4',
+                    b'\xf0\x9f\x98\x9c'
+                ], dtype="object"),
+            "test_enum": pd.Series(['happy', 'very happy', 'ecstatic', 'ecstatic'], dtype="object")
+        },
+    )
+    assert_frame_equal(df, expected, check_names=True)
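
The two tests differ only in the wire protocol and in how the enum column is selected: test_types_csv passes protocol="csv" and casts test_enum::text in SQL, presumably because the CSV path does not resolve the user-defined enum type on its own, while the default binary protocol handles it directly. Roughly:

# Binary protocol (default): the enum column can be selected as-is.
df_bin = read_sql(postgres_url, "SELECT test_enum FROM test_types")

# CSV protocol: the test casts the enum to text in the query before fetching.
df_csv = read_sql(postgres_url, "SELECT test_enum::text FROM test_types", protocol="csv")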

connectorx-python/src/pandas/destination.rs

Lines changed: 16 additions & 3 deletions
@@ -1,6 +1,6 @@
 use super::pandas_columns::{
-    BooleanBlock, DateTimeBlock, Float64Block, HasPandasColumn, Int64Block, PandasColumn,
-    PandasColumnObject, StringBlock,
+    BooleanBlock, BytesBlock, DateTimeBlock, Float64Block, HasPandasColumn, Int64Block,
+    PandasColumn, PandasColumnObject, StringBlock,
 };
 use super::types::{PandasDType, PandasTypeSystem};
 use anyhow::anyhow;
@@ -156,7 +156,9 @@ impl<'a> Destination for PandasDestination<'a> {
                        .collect()
                }
            }
-           PandasTypeSystem::String(_) => {
+           PandasTypeSystem::String(_)
+           | PandasTypeSystem::Str(_)
+           | PandasTypeSystem::Char(_) => {
                let block = StringBlock::extract(buf).map_err(|e| anyhow!(e))?;
                let cols = block.split()?;
                for (&cid, col) in cids.iter().zip_eq(cols) {
@@ -167,6 +169,17 @@ impl<'a> Destination for PandasDestination<'a> {
                        .collect()
                }
            }
+           PandasTypeSystem::Bytes(_) => {
+               let block = BytesBlock::extract(buf).map_err(|e| anyhow!(e))?;
+               let cols = block.split()?;
+               for (&cid, col) in cids.iter().zip_eq(cols) {
+                   partitioned_columns[cid] = col
+                       .partition(&counts)
+                       .into_iter()
+                       .map(|c| Box::new(c) as _)
+                       .collect()
+               }
+           }
            PandasTypeSystem::DateTime(_) => {
                let block = DateTimeBlock::extract(buf).map_err(|e| anyhow!(e))?;
                let cols = block.split()?;
connectorx-python/src/pandas/pandas_columns/bytes.rs (new file, per the mod.rs change below)

Lines changed: 187 additions & 0 deletions
@@ -0,0 +1,187 @@
+use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
+use anyhow::anyhow;
+use connectorx::ConnectorAgentError;
+use fehler::throws;
+use ndarray::{ArrayViewMut2, Axis, Ix2};
+use numpy::{npyffi::NPY_TYPES, Element, PyArray, PyArrayDescr};
+use pyo3::{FromPyObject, Py, PyAny, PyResult, Python};
+use std::any::TypeId;
+use std::sync::{Arc, Mutex};
+
+#[derive(Clone)]
+#[repr(transparent)]
+pub struct PyBytes(Py<pyo3::types::PyBytes>);
+
+// In order to put it into a numpy array
+impl Element for PyBytes {
+    const DATA_TYPE: numpy::DataType = numpy::DataType::Object;
+    fn is_same_type(dtype: &PyArrayDescr) -> bool {
+        unsafe { *dtype.as_dtype_ptr() }.type_num == NPY_TYPES::NPY_OBJECT as i32
+    }
+}
+
+pub struct BytesBlock<'a> {
+    data: ArrayViewMut2<'a, PyBytes>,
+    mutex: Arc<Mutex<()>>,
+    buf_size_mb: usize,
+}
+
+impl<'a> FromPyObject<'a> for BytesBlock<'a> {
+    fn extract(ob: &'a PyAny) -> PyResult<Self> {
+        check_dtype(ob, "object")?;
+        let array = ob.downcast::<PyArray<PyBytes, Ix2>>()?;
+        let data = unsafe { array.as_array_mut() };
+        Ok(BytesBlock {
+            data,
+            mutex: Arc::new(Mutex::new(())), // allocate the lock here since only BytesBlock needs to aquire the GIL for now
+            buf_size_mb: 16, // in MB
+        })
+    }
+}
+
+impl<'a> BytesBlock<'a> {
+    #[throws(ConnectorAgentError)]
+    pub fn split(self) -> Vec<BytesColumn<'a>> {
+        let mut ret = vec![];
+        let mut view = self.data;
+
+        let nrows = view.ncols();
+        while view.nrows() > 0 {
+            let (col, rest) = view.split_at(Axis(0), 1);
+            view = rest;
+            ret.push(BytesColumn {
+                data: col
+                    .into_shape(nrows)?
+                    .into_slice()
+                    .ok_or_else(|| anyhow!("get None for splitted String data"))?,
+                next_write: 0,
+                bytes_lengths: vec![],
+                bytes_buf: Vec::with_capacity(self.buf_size_mb * 2 << 20 * 11 / 10), // allocate a little bit more memory to avoid Vec growth
+                buf_size: self.buf_size_mb * 2 << 20,
+                mutex: self.mutex.clone(),
+            })
+        }
+        ret
+    }
+}
+
+pub struct BytesColumn<'a> {
+    data: &'a mut [PyBytes],
+    next_write: usize,
+    bytes_buf: Vec<u8>,
+    bytes_lengths: Vec<usize>,
+    buf_size: usize,
+    mutex: Arc<Mutex<()>>,
+}
+
+impl<'a> PandasColumnObject for BytesColumn<'a> {
+    fn typecheck(&self, id: TypeId) -> bool {
+        id == TypeId::of::<&'static [u8]>() || id == TypeId::of::<Option<&'static [u8]>>()
+    }
+    fn len(&self) -> usize {
+        self.data.len()
+    }
+    fn typename(&self) -> &'static str {
+        std::any::type_name::<&'static [u8]>()
+    }
+    #[throws(ConnectorAgentError)]
+    fn finalize(&mut self) {
+        self.flush()?;
+    }
+}
+
+impl<'a> PandasColumn<Vec<u8>> for BytesColumn<'a> {
+    #[throws(ConnectorAgentError)]
+    fn write(&mut self, val: Vec<u8>) {
+        self.bytes_lengths.push(val.len());
+        self.bytes_buf.extend_from_slice(&val[..]);
+        self.try_flush()?;
+    }
+}
+
+impl<'a> PandasColumn<Option<Vec<u8>>> for BytesColumn<'a> {
+    #[throws(ConnectorAgentError)]
+    fn write(&mut self, val: Option<Vec<u8>>) {
+        match val {
+            Some(b) => {
+                self.bytes_lengths.push(b.len());
+                self.bytes_buf.extend_from_slice(&b[..]);
+                self.try_flush()?;
+            }
+            None => {
+                self.bytes_lengths.push(0);
+            }
+        }
+    }
+}
+
+impl HasPandasColumn for Vec<u8> {
+    type PandasColumn<'a> = BytesColumn<'a>;
+}
+
+impl HasPandasColumn for Option<Vec<u8>> {
+    type PandasColumn<'a> = BytesColumn<'a>;
+}
+
+impl<'a> BytesColumn<'a> {
+    pub fn partition(self, counts: &[usize]) -> Vec<BytesColumn<'a>> {
+        let mut partitions = vec![];
+        let mut data = self.data;
+
+        for &c in counts {
+            let (splitted_data, rest) = data.split_at_mut(c);
+            data = rest;
+
+            partitions.push(BytesColumn {
+                data: splitted_data,
+                next_write: 0,
+                bytes_lengths: vec![],
+                bytes_buf: Vec::with_capacity(self.buf_size),
+                buf_size: self.buf_size,
+                mutex: self.mutex.clone(),
+            });
+        }
+
+        partitions
+    }
+
+    #[throws(ConnectorAgentError)]
+    pub fn flush(&mut self) {
+        let nstrings = self.bytes_lengths.len();
+
+        if nstrings > 0 {
+            let py = unsafe { Python::assume_gil_acquired() };
+
+            {
+                // allocation in python is not thread safe
+                let _guard = self
+                    .mutex
+                    .lock()
+                    .map_err(|e| anyhow!("mutex poisoned {}", e))?;
+                let mut start = 0;
+                for (i, &len) in self.bytes_lengths.iter().enumerate() {
+                    let end = start + len;
+                    if len != 0 {
+                        unsafe {
+                            // allocate and write in the same time
+                            *self.data.get_unchecked_mut(self.next_write + i) = PyBytes(
+                                pyo3::types::PyBytes::new(py, &self.bytes_buf[start..end]).into(),
+                            );
+                        };
+                    }
+                    start = end;
+                }
+            }
+
+            self.bytes_buf.truncate(0);
+            self.next_write += nstrings;
+        }
+    }
+
+    #[throws(ConnectorAgentError)]
+    pub fn try_flush(&mut self) {
+        if self.bytes_buf.len() >= self.buf_size {
+            self.flush()?;
+        }
+    }
+}
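
One practical consequence of the BytesColumn above, matching the test_bytea expectations earlier in the diff: BYTEA cells surface as Python bytes objects inside an object-dtype column, so any decoding is left to the caller. A small, hypothetical follow-up on the earlier sketch:

# test_bytea values arrive as raw bytes; decode the ones known to hold UTF-8 text.
raw = read_sql(postgres_url, "SELECT test_bytea FROM test_types")["test_bytea"]
decoded = raw.apply(lambda b: b.decode("utf-8", errors="replace"))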

connectorx-python/src/pandas/pandas_columns/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -1,10 +1,12 @@
 mod boolean;
+mod bytes;
 mod datetime;
 mod float64;
 mod int64;
 mod string;
 // TODO: use macro for integers
 
+pub use crate::pandas::pandas_columns::bytes::{BytesBlock, BytesColumn};
 pub use boolean::{BooleanBlock, BooleanColumn};
 use connectorx::Result;
 pub use datetime::{DateTimeBlock, DateTimeColumn};
