Skip to content

Commit ff13876

Browse files
authored
Merge pull request #72 from tursodatabase/lucio/fix-rt
fix rt drop before hrana close
2 parents 5cdf4b3 + 9537eba commit ff13876

File tree

4 files changed

+42
-49
lines changed

4 files changed

+42
-49
lines changed

Cargo.lock

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "libsql-python"
3-
version = "0.0.40"
3+
version = "0.0.41"
44
edition = "2021"
55

66
[lib]
@@ -16,4 +16,4 @@ tracing-subscriber = "0.3"
1616
[build-dependencies]
1717
version_check = "0.9.5"
1818
# used where logic has to be version/distribution specific, e.g. pypy
19-
pyo3-build-config = { version = "0.19.0" }
19+
pyo3-build-config = { version = "0.19.0" }

shell.nix

+5-5
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
(pkgs.buildFHSUserEnv {
33
name = "pipzone";
44
targetPkgs = pkgs: (with pkgs; [
5-
python39
6-
python39Packages.pip
7-
python39Packages.virtualenv
8-
python39Packages.pytest
9-
python39Packages.pyperf
5+
python312
6+
python312Packages.pip
7+
python312Packages.virtualenv
8+
python312Packages.pytest
9+
python312Packages.pyperf
1010
maturin
1111
]);
1212
runScript = "bash";

src/lib.rs

+34-41
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,26 @@ use pyo3::create_exception;
33
use pyo3::exceptions::PyValueError;
44
use pyo3::prelude::*;
55
use pyo3::types::{PyList, PyTuple};
6-
use std::cell::RefCell;
7-
use std::sync::Arc;
6+
use std::cell::{OnceCell, RefCell};
7+
use std::sync::{Arc, OnceLock};
8+
use tokio::runtime::{Handle, Runtime};
89

910
const LEGACY_TRANSACTION_CONTROL: i32 = -1;
1011

12+
fn rt() -> Handle {
13+
static RT: OnceLock<Runtime> = OnceLock::new();
14+
15+
RT.get_or_init(|| {
16+
tokio::runtime::Builder::new_multi_thread()
17+
.worker_threads(1)
18+
.enable_all()
19+
.build()
20+
.unwrap()
21+
})
22+
.handle()
23+
.clone()
24+
}
25+
1126
fn to_py_err(error: libsql_core::errors::Error) -> PyErr {
1227
let msg = match error {
1328
libsql::Error::SqliteFailure(_, err) => err,
@@ -99,7 +114,7 @@ fn _connect_core(
99114
) -> PyResult<Connection> {
100115
let ver = env!("CARGO_PKG_VERSION");
101116
let ver = format!("libsql-python-rpc-{ver}");
102-
let rt = tokio::runtime::Runtime::new().unwrap();
117+
let rt = rt();
103118
let encryption_config = match encryption_key {
104119
Some(key) => {
105120
let cipher = libsql::Cipher::default();
@@ -147,9 +162,8 @@ fn _connect_core(
147162
db,
148163
conn: Arc::new(ConnectionGuard {
149164
conn: Some(conn),
150-
handle: rt.handle().clone(),
165+
handle: rt.clone(),
151166
}),
152-
rt,
153167
isolation_level,
154168
autocommit,
155169
})
@@ -186,7 +200,6 @@ impl Drop for ConnectionGuard {
186200
pub struct Connection {
187201
db: libsql_core::Database,
188202
conn: Arc<ConnectionGuard>,
189-
rt: tokio::runtime::Runtime,
190203
isolation_level: Option<String>,
191204
autocommit: i32,
192205
}
@@ -199,7 +212,6 @@ impl Connection {
199212
fn cursor(&self) -> PyResult<Cursor> {
200213
Ok(Cursor {
201214
arraysize: 1,
202-
rt: self.rt.handle().clone(),
203215
conn: self.conn.clone(),
204216
stmt: RefCell::new(None),
205217
rows: RefCell::new(None),
@@ -212,24 +224,19 @@ impl Connection {
212224

213225
fn sync(self_: PyRef<'_, Self>, py: Python<'_>) -> PyResult<()> {
214226
let fut = {
215-
let _enter = self_.rt.enter();
227+
let _enter = rt().enter();
216228
self_.db.sync()
217229
};
218230
tokio::pin!(fut);
219231

220-
self_
221-
.rt
222-
.block_on(check_signals(py, fut))
223-
.map_err(to_py_err)?;
232+
rt().block_on(check_signals(py, fut)).map_err(to_py_err)?;
224233
Ok(())
225234
}
226235

227236
fn commit(self_: PyRef<'_, Self>) -> PyResult<()> {
228237
// TODO: Switch to libSQL transaction API
229238
if !self_.conn.is_autocommit() {
230-
self_
231-
.rt
232-
.block_on(async { self_.conn.execute("COMMIT", ()).await })
239+
rt().block_on(async { self_.conn.execute("COMMIT", ()).await })
233240
.map_err(to_py_err)?;
234241
}
235242
Ok(())
@@ -238,9 +245,7 @@ impl Connection {
238245
fn rollback(self_: PyRef<'_, Self>) -> PyResult<()> {
239246
// TODO: Switch to libSQL transaction API
240247
if !self_.conn.is_autocommit() {
241-
self_
242-
.rt
243-
.block_on(async { self_.conn.execute("ROLLBACK", ()).await })
248+
rt().block_on(async { self_.conn.execute("ROLLBACK", ()).await })
244249
.map_err(to_py_err)?;
245250
}
246251
Ok(())
@@ -252,8 +257,7 @@ impl Connection {
252257
parameters: Option<&PyTuple>,
253258
) -> PyResult<Cursor> {
254259
let cursor = Connection::cursor(&self_)?;
255-
let rt = self_.rt.handle();
256-
rt.block_on(async { execute(&cursor, sql, parameters).await })?;
260+
rt().block_on(async { execute(&cursor, sql, parameters).await })?;
257261
Ok(cursor)
258262
}
259263

@@ -265,17 +269,15 @@ impl Connection {
265269
let cursor = Connection::cursor(&self_)?;
266270
for parameters in parameters.unwrap().iter() {
267271
let parameters = parameters.extract::<&PyTuple>()?;
268-
self_
269-
.rt
270-
.block_on(async { execute(&cursor, sql.clone(), Some(parameters)).await })?;
272+
rt().block_on(async { execute(&cursor, sql.clone(), Some(parameters)).await })?;
271273
}
272274
Ok(cursor)
273275
}
274276

275277
fn executescript(self_: PyRef<'_, Self>, script: String) -> PyResult<()> {
276-
let _ = self_.rt.block_on(async {
277-
self_.conn.execute_batch(&script).await
278-
}).map_err(to_py_err);
278+
let _ = rt()
279+
.block_on(async { self_.conn.execute_batch(&script).await })
280+
.map_err(to_py_err);
279281
Ok(())
280282
}
281283

@@ -316,7 +318,6 @@ impl Connection {
316318
pub struct Cursor {
317319
#[pyo3(get, set)]
318320
arraysize: usize,
319-
rt: tokio::runtime::Handle,
320321
conn: Arc<ConnectionGuard>,
321322
stmt: RefCell<Option<libsql_core::Statement>>,
322323
rows: RefCell<Option<libsql_core::Rows>>,
@@ -336,9 +337,7 @@ impl Cursor {
336337
sql: String,
337338
parameters: Option<&PyTuple>,
338339
) -> PyResult<pyo3::PyRef<'a, Self>> {
339-
self_
340-
.rt
341-
.block_on(async { execute(&self_, sql, parameters).await })?;
340+
rt().block_on(async { execute(&self_, sql, parameters).await })?;
342341
Ok(self_)
343342
}
344343

@@ -349,9 +348,7 @@ impl Cursor {
349348
) -> PyResult<pyo3::PyRef<'a, Cursor>> {
350349
for parameters in parameters.unwrap().iter() {
351350
let parameters = parameters.extract::<&PyTuple>()?;
352-
self_
353-
.rt
354-
.block_on(async { execute(&self_, sql.clone(), Some(parameters)).await })?;
351+
rt().block_on(async { execute(&self_, sql.clone(), Some(parameters)).await })?;
355352
}
356353
Ok(self_)
357354
}
@@ -360,9 +357,7 @@ impl Cursor {
360357
self_: PyRef<'a, Self>,
361358
script: String,
362359
) -> PyResult<pyo3::PyRef<'a, Self>> {
363-
self_
364-
.rt
365-
.block_on(async { self_.conn.execute_batch(&script).await })
360+
rt().block_on(async { self_.conn.execute_batch(&script).await })
366361
.map_err(to_py_err)?;
367362
Ok(self_)
368363
}
@@ -398,7 +393,7 @@ impl Cursor {
398393
let mut rows = self_.rows.borrow_mut();
399394
match rows.as_mut() {
400395
Some(rows) => {
401-
let row = self_.rt.block_on(rows.next()).map_err(to_py_err)?;
396+
let row = rt().block_on(rows.next()).map_err(to_py_err)?;
402397
match row {
403398
Some(row) => {
404399
let row = convert_row(self_.py(), row, rows.column_count())?;
@@ -422,8 +417,7 @@ impl Cursor {
422417
// done before iterating.
423418
if !*self_.done.borrow() {
424419
for _ in 0..size {
425-
let row = self_
426-
.rt
420+
let row = rt()
427421
.block_on(async { rows.next().await })
428422
.map_err(to_py_err)?;
429423
match row {
@@ -450,8 +444,7 @@ impl Cursor {
450444
Some(rows) => {
451445
let mut elements: Vec<Py<PyAny>> = vec![];
452446
loop {
453-
let row = self_
454-
.rt
447+
let row = rt()
455448
.block_on(async { rows.next().await })
456449
.map_err(to_py_err)?;
457450
match row {

0 commit comments

Comments
 (0)