Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit b289d66

Browse files
committed
flush when response exceed limit size
1 parent f6c6d1e commit b289d66

File tree

1 file changed

+10
-3
lines changed

1 file changed

+10
-3
lines changed

Diff for: sqld/src/rpc/streaming_exec.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::Arc;
33
use std::task::{ready, Context, Poll};
44

55
use futures_core::Stream;
6+
use prost::Message;
67
use rusqlite::types::ValueRef;
78
use tokio::sync::mpsc;
89
use tonic::{Code, Status};
@@ -50,6 +51,7 @@ struct StreamResponseBuilder {
5051
request_id: u32,
5152
sender: mpsc::Sender<ExecResp>,
5253
current: Option<ProgramResp>,
54+
current_size: usize,
5355
}
5456

5557
impl StreamResponseBuilder {
@@ -59,12 +61,15 @@ impl StreamResponseBuilder {
5961
}
6062

6163
fn push(&mut self, step: Step) -> Result<(), QueryResultBuilderError> {
62-
const MAX_RESPONSE_STEPS: usize = 10;
64+
const MAX_RESPONSE_SIZE: usize = bytesize::ByteSize::mb(1).as_u64() as usize;
6365

6466
let current = self.current();
65-
current.steps.push(RespStep { step: Some(step) });
67+
let step = RespStep { step: Some(step) };
68+
let size = step.encoded_len();
69+
current.steps.push(step);
70+
self.current_size += size;
6671

67-
if current.steps.len() > MAX_RESPONSE_STEPS {
72+
if self.current_size >= MAX_RESPONSE_SIZE {
6873
self.flush()?;
6974
}
7075

@@ -77,6 +82,7 @@ impl StreamResponseBuilder {
7782
request_id: self.request_id,
7883
response: Some(exec_resp::Response::ProgramResp(current)),
7984
};
85+
self.current_size = 0;
8086
self.sender
8187
.blocking_send(resp)
8288
.map_err(|_| QueryResultBuilderError::Internal(anyhow::anyhow!("stream closed")))?;
@@ -235,6 +241,7 @@ where
235241
request_id,
236242
sender,
237243
current: None,
244+
current_size: 0,
238245
};
239246
let mut fut = conn.execute_program(pgm, authenticated, builder, None);
240247
loop {

0 commit comments

Comments
 (0)