Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

streaming proxy #731

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sqld/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ rustls-pemfile = "1.0.3"
rustls = "0.21.7"
async-stream = "0.3.5"
libsql = { git = "https://github.com/tursodatabase/libsql.git", rev = "bea8863", optional = true }
futures-option = "0.2.0"

[dev-dependencies]
proptest = "1.0.0"
Expand Down
132 changes: 119 additions & 13 deletions sqld/proto/proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package proxy;
message Queries {
repeated Query queries = 1;
// Uuid
string clientId = 2;
string client_id = 2;
}

message Query {
Expand Down Expand Up @@ -34,10 +34,10 @@ message QueryResult {

message Error {
enum ErrorCode {
SQLError = 0;
TxBusy = 1;
TxTimeout = 2;
Internal = 3;
SQL_ERROR = 0;
TX_BUSY = 1;
TX_TIMEOUT = 2;
INTERNAL = 3;
}

ErrorCode code = 1;
Expand Down Expand Up @@ -71,7 +71,7 @@ message Description {

message Value {
/// bincode encoded Value
bytes data = 1;
bytes data = 1;
}

message Row {
Expand All @@ -84,18 +84,19 @@ message Column {
}

message DisconnectMessage {
string clientId = 1;
string client_id = 1;
}

message Ack { }

enum State {
INIT = 0;
INVALID = 1;
TXN = 2;
}

message ExecuteResults {
repeated QueryResult results = 1;
enum State {
Init = 0;
Invalid = 1;
Txn = 2;
}
/// State after executing the queries
State state = 2;
/// Primary frame_no after executing the request.
Expand All @@ -110,7 +111,6 @@ message Step {
optional Cond cond = 1;
Query query = 2;
}

message Cond {
oneof cond {
OkCond ok = 1;
Expand Down Expand Up @@ -150,7 +150,113 @@ message ProgramReq {
Program pgm = 2;
}

/// Streaming exec request
message ExecReq {
/// id of the request. The response will contain this id.
uint32 request_id = 1;
oneof request {
StreamProgramReq execute = 2;
StreamDescribeReq describe = 3;
}
}

/// Describe request for the streaming protocol
message StreamProgramReq {
Program pgm = 1;
}

/// descibre request for the streaming protocol
message StreamDescribeReq {
string stmt = 1;
}

/// Response message for the streaming proto

/// Request response types
message Init { }
message BeginStep { }
message FinishStep {
uint64 affected_row_count = 1;
optional int64 last_insert_rowid = 2;
}
message StepError {
Error error = 1;
}
message ColsDescription {
repeated Column columns = 1;
}
message RowValue {
oneof value {
string text = 1;
int64 integer = 2;
double real = 3;
bytes blob = 4;
// null if present
bool null = 5;
}
}
message BeginRows { }
message BeginRow { }
message AddRowValue {
RowValue val = 1;
}
message FinishRow { }
message FinishRows { }
message Finish {
optional uint64 last_frame_no = 1;
State state = 2;
}

/// Stream execx dexcribe response messages
message DescribeParam {
optional string name = 1;
}

message DescribeCol {
string name = 1;
optional string decltype = 2;
}

message DescribeResp {
repeated DescribeParam params = 1;
repeated DescribeCol cols = 2;
bool is_explain = 3;
bool is_readonly = 4;
}

message RespStep {
oneof step {
Init init = 1;
BeginStep begin_step = 2;
FinishStep finish_step = 3;
StepError step_error = 4;
ColsDescription cols_description = 5;
BeginRows begin_rows = 6;
BeginRow begin_row = 7;
AddRowValue add_row_value = 8;
FinishRow finish_row = 9;
FinishRows finish_rows = 10;
Finish finish = 11;
}
}

message ProgramResp {
repeated RespStep steps = 1;
}

message ExecResp {
uint32 request_id = 1;
oneof response {
ProgramResp program_resp = 2;
DescribeResp describe_resp = 3;
Error error = 4;
}
}

service Proxy {
rpc StreamExec(stream ExecReq) returns (stream ExecResp) {}

// Deprecated:
rpc Execute(ProgramReq) returns (ExecuteResults) {}
rpc Describe(DescribeRequest) returns (DescribeResult) {}
rpc Disconnect(DisconnectMessage) returns (Ack) {}
Expand Down
Loading