Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 8f4bade

Browse files
committed
stream exec support describe
1 parent df6efbf commit 8f4bade

File tree

4 files changed

+91
-10
lines changed

4 files changed

+91
-10
lines changed

sqld/src/connection/libsql.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::Arc;
55

66
use parking_lot::{Mutex, RwLock};
77
use rusqlite::{DatabaseName, ErrorCode, OpenFlags, StatementStatus, TransactionState};
8-
use sqld_libsql_bindings::wal_hook::{TransparentMethods, WalMethodsHook, };
8+
use sqld_libsql_bindings::wal_hook::{TransparentMethods, WalMethodsHook};
99
use tokio::sync::{watch, Notify};
1010
use tokio::time::{Duration, Instant};
1111

sqld/src/connection/write_proxy.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ pub mod test {
478478

479479
use super::*;
480480
use crate::{
481-
query_result_builder::{test::test_driver, QueryResultBuilderError, Column},
481+
query_result_builder::{test::test_driver, Column, QueryResultBuilderError},
482482
rpc::proxy::rpc::{query_result::RowResult, ExecuteResults},
483483
};
484484

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
---
2+
source: sqld/src/rpc/streaming_exec.rs
3+
expression: stream.next().await.unwrap().unwrap()
4+
---
5+
ExecResp {
6+
request_id: 0,
7+
response: Some(
8+
DescribeResp(
9+
DescribeResp {
10+
params: [
11+
DescribeParam {
12+
name: Some(
13+
"$hello",
14+
),
15+
},
16+
],
17+
cols: [
18+
DescribeCol {
19+
name: "$hello",
20+
decltype: None,
21+
},
22+
],
23+
is_explain: false,
24+
is_readonly: true,
25+
},
26+
),
27+
),
28+
}

sqld/src/rpc/streaming_exec.rs

+61-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::Arc;
22

3+
use futures_core::future::BoxFuture;
34
use futures_core::Stream;
45
use futures_option::OptionExt;
56
use prost::Message;
@@ -11,6 +12,7 @@ use tonic::{Code, Status};
1112

1213
use crate::auth::Authenticated;
1314
use crate::connection::Connection;
15+
use crate::connection::program::Program;
1416
use crate::error::Error;
1517
use crate::query_analysis::TxnStatus;
1618
use crate::query_result_builder::{
@@ -19,6 +21,7 @@ use crate::query_result_builder::{
1921
use crate::replication::FrameNo;
2022
use crate::rpc::proxy::rpc::exec_req::Request;
2123
use crate::rpc::proxy::rpc::exec_resp::{self, Response};
24+
use crate::rpc::proxy::rpc::{DescribeResp, StreamDescribeReq, DescribeCol, DescribeParam};
2225

2326
use super::proxy::rpc::resp_step::Step;
2427
use super::proxy::rpc::row_value::Value;
@@ -52,7 +55,7 @@ where
5255
C: Connection,
5356
{
5457
async_stream::stream! {
55-
let mut current_request_fut = None;
58+
let mut current_request_fut: Option<BoxFuture<'static, (crate::Result<()>, u32)>> = None;
5659
let (snd, mut recv) = mpsc::channel(1);
5760
let conn = Arc::new(conn);
5861

@@ -90,13 +93,41 @@ where
9093
max_program_resp_size,
9194
};
9295

93-
let ret = conn.execute_program(pgm, auth, builder, None).await;
96+
let ret = conn.execute_program(pgm, auth, builder, None).await.map(|_| ());
9497
(ret, request_id)
9598
};
9699

97100
current_request_fut.replace(Box::pin(fut));
98101
}
99-
Some(Request::Describe(_)) => todo!(),
102+
Some(Request::Describe(StreamDescribeReq { stmt })) => {
103+
let auth = auth.clone();
104+
let sender = snd.clone();
105+
let conn = conn.clone();
106+
let fut = async move {
107+
let do_describe = || async move {
108+
let ret = conn.describe(stmt, auth, None).await??;
109+
Ok(DescribeResp {
110+
cols: ret.cols.into_iter().map(|c| DescribeCol { name: c.name, decltype: c.decltype }).collect(),
111+
params: ret.params.into_iter().map(|p| DescribeParam { name: p.name }).collect(),
112+
is_explain: ret.is_explain,
113+
is_readonly: ret.is_readonly
114+
})
115+
};
116+
117+
let ret: crate::Result<()> = match do_describe().await {
118+
Ok(resp) => {
119+
let _ = sender.send(ExecResp { request_id, response: Some(Response::DescribeResp(resp)) }).await;
120+
Ok(())
121+
}
122+
Err(e) => Err(e),
123+
};
124+
125+
(ret, request_id)
126+
};
127+
128+
current_request_fut.replace(Box::pin(fut));
129+
130+
},
100131
None => {
101132
yield Err(Status::new(Code::InvalidArgument, "invalid request"));
102133
break
@@ -414,7 +445,7 @@ mod test {
414445
namespace: None,
415446
permission: Permission::FullAccess,
416447
});
417-
// limit the size of the response for force a split
448+
// limit the size of the response to force a split
418449
let stream = make_proxy_stream_inner(conn, auth, ReceiverStream::new(rcv), 500);
419450

420451
pin!(stream);
@@ -470,8 +501,7 @@ mod test {
470501
namespace: None,
471502
permission: Permission::FullAccess,
472503
});
473-
// limit the size of the response for force a split
474-
let stream = make_proxy_stream_inner(conn, auth, ReceiverStream::new(rcv), 500);
504+
let stream = make_proxy_stream(conn, auth, ReceiverStream::new(rcv));
475505

476506
pin!(stream);
477507

@@ -526,8 +556,7 @@ mod test {
526556
namespace: None,
527557
permission: Permission::FullAccess,
528558
});
529-
// limit the size of the response for force a split
530-
let stream = make_proxy_stream_inner(conn, auth, ReceiverStream::new(rcv), 500);
559+
let stream = make_proxy_stream(conn, auth, ReceiverStream::new(rcv));
531560

532561
pin!(stream);
533562

@@ -540,4 +569,28 @@ mod test {
540569
let resp = stream.next().await.unwrap().unwrap();
541570
assert_eq!(resp.request_id, 1);
542571
}
572+
573+
#[tokio::test]
574+
async fn describe() {
575+
let tmp = tempdir().unwrap();
576+
let conn = LibSqlConnection::new_test(tmp.path());
577+
let (snd, rcv) = mpsc::channel(1);
578+
let auth = Authenticated::Authorized(Authorized {
579+
namespace: None,
580+
permission: Permission::FullAccess,
581+
});
582+
let stream = make_proxy_stream(conn, auth, ReceiverStream::new(rcv));
583+
584+
pin!(stream);
585+
586+
// request 0 should be dropped, and request 1 should be processed instead
587+
let req = ExecReq {
588+
request_id: 0,
589+
request: Some(Request::Describe(StreamDescribeReq { stmt: "select $hello".into() })),
590+
};
591+
592+
snd.send(Ok(req)).await.unwrap();
593+
594+
assert_debug_snapshot!(stream.next().await.unwrap().unwrap());
595+
}
543596
}

0 commit comments

Comments
 (0)