Skip to content

Commit ee63307

Browse files
func_client
1 parent 1444237 commit ee63307

11 files changed

+334
-68
lines changed

qlib/bytestream.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ impl SocketBufIovs {
107107
let dst = buf.as_mut_ptr();
108108
unsafe {
109109
ptr::copy_nonoverlapping(src, dst, size);
110-
}
110+
}
111111
} else {
112112
let src = self.iovs[0].start as * const u8;
113113
let dst = buf.as_mut_ptr();
@@ -121,12 +121,14 @@ impl SocketBufIovs {
121121
ptr::copy_nonoverlapping(src, dst, size - self.iovs[0].len);
122122
}
123123
}
124-
124+
125+
self.Consume(size);
125126
return Ok(buf)
126127
}
127128

128129
pub fn ReadString(&mut self, size: usize) -> Result<String> {
129130
let buf = self.ReadVec(size)?;
131+
self.Consume(size);
130132
match String::from_utf8(buf) {
131133
Err(_) => return Err(Error::SysError(SysErr::EINVAL)),
132134
Ok(s) => return Ok(s),
@@ -142,12 +144,14 @@ impl SocketBufIovs {
142144

143145
if self.iovs[0].len >= size {
144146
let data = unsafe {*(self.iovs[0].start as * const T)};
147+
self.Consume(size);
145148
return Ok(data)
146149
} else {
147150
let buf = self.ReadVec(size);
148151
match buf {
149152
Ok(b) => {
150153
let data = unsafe {*(&b[0] as * const _ as u64 as * const T)};
154+
self.Consume(size);
151155
return Ok(data)
152156
}
153157
Err(e) => {

rdma_cli/Cargo.lock

Lines changed: 46 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rdma_cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ tokio-eventfd = "0.2.0"
5959
tokio-io = "0.1"
6060
futures-lite = "1.11"
6161
bytes = "1.3.0"
62+
futures = "0.3.25"
6263

6364
[dependencies.lazy_static]
6465
version = "1.4"

rdma_cli/src/async_test.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ use rdma_ingress_informer::RdmaIngressInformer;
109109
use service_informer::ServiceInformer;
110110
use crate::constants::*;
111111

112+
use crate::funclib::func_client::*;
113+
112114
pub static GLOBAL_ALLOCATOR: HostAllocator = HostAllocator::New();
113115

114116
lazy_static! {
@@ -118,6 +120,12 @@ lazy_static! {
118120

119121
#[tokio::main]
120122
async fn main() -> Result<()> {
121-
//not_semaphore_reads_and_resets().await?;
123+
println!("main ....");
124+
let client1 = FuncClient::NewTestTCPClient(1).await?;
125+
let client2 = FuncClient::NewTestTCPClient(1).await?;
126+
tokio::select! {
127+
_ = client1.msgStream.ReadMsg() => {},
128+
_ = client2.msgStream.ReadMsg() => {},
129+
}
122130
Ok(())
123131
}

rdma_cli/src/func_agent.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ use rdma_ingress_informer::RdmaIngressInformer;
109109
use service_informer::ServiceInformer;
110110
use crate::constants::*;
111111

112+
use crate::funclib::func_agent::*;
113+
use crate::funclib::agent::*;
114+
112115
pub static GLOBAL_ALLOCATOR: HostAllocator = HostAllocator::New();
113116

114117
lazy_static! {
@@ -118,5 +121,10 @@ lazy_static! {
118121

119122
#[tokio::main]
120123
async fn main() -> Result<()> {
121-
return Ok(());
124+
tokio::select! {
125+
_a = Execution() => {}
126+
_b = ChannelProcess() => {}
127+
};
128+
129+
return Ok(())
122130
}

0 commit comments

Comments
 (0)