Skip to content

Commit e8273f7

Browse files
committed
Add concurrent
1 parent 781d9aa commit e8273f7

File tree

10 files changed

+242
-24
lines changed

10 files changed

+242
-24
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ Cargo.lock
99
# These are backup files generated by rustfmt
1010
**/*.rs.bk
1111

12-
.idea/
12+
/.idea/
13+
/.vscode/

README.md

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,126 @@
11
# erpc-rs
22
eRPC library for Rust
3+
4+
# Installing junction & turf
5+
```
6+
$ git clone https://github.com/preshing/junction.git
7+
$ git clone https://github.com/preshing/turf.git
8+
$ cd junction
9+
$ mkdir build
10+
$ cd build
11+
$ cmake -DCMAKE_INSTALL_PREFIX=~/junction-install -DJUNCTION_WITH_SAMPLES=OFF ..
12+
$ cmake --build . --target install --config RelWithDebInfo
13+
```
14+
15+
# Installing eRPC
16+
```
17+
sudo apt-get install libboost-dev libboost-filesystem-dev libboost-thread-dev libboost-program-options-dev libboost-python-dev libboost-dev
18+
19+
# Installing googletest
20+
sudo apt install libgtest-dev build-essential cmake
21+
cd /usr/src/googletest
22+
sudo cmake .
23+
sudo cmake --build . --target install
24+
25+
cd ~/
26+
git clone https://github.com/erpc-io/eRPC.git
27+
cd eRPC/
28+
cmake . -DPERF=OFF -DTRANSPORT=infiniband -DROCE=on; make -j;
29+
```
30+
31+
# Server
32+
```rust,editable
33+
use erpc_sys::ffi;
34+
use std::os::raw::{c_int, c_void};
35+
use erpc_rs::context::AppContext;
36+
use erpc_rs::nexus::Nexus;
37+
use erpc_rs::reqhandle::ReqHandle;
38+
use erpc_rs::rpc::Rpc;
39+
40+
extern fn req_handler(req_handle: *mut ffi::ReqHandle, context: *mut c_void) -> () {
41+
println!("req_handler start");
42+
let req_handle = ReqHandle::from_raw(req_handle);
43+
let s = req_handle.get_req_msgbuf();
44+
println!("req: {}", String::from_utf8(s).expect(""));
45+
46+
let c = AppContext::from_raw(context);
47+
let r = Rpc::from_context(&c);
48+
let s = "world".to_string().into_bytes();
49+
r.enqueue_response(&req_handle, s);
50+
println!("req_handler end");
51+
}
52+
53+
extern fn sm_handler(_session_num: c_int, _sm_event_type: ffi::SmEventType, _sm_err_type: ffi::SmErrType, _context: *mut c_void) {
54+
println!("sm_handler");
55+
}
56+
57+
fn main() {
58+
// sudo rxe_cfg start
59+
// sudo rxe_cfg status
60+
let context = AppContext::new();
61+
let nexus = Nexus::new("127.0.0.1:31850".to_string(), 0, 0);
62+
63+
nexus.register_req_func(1, req_handler, 0);
64+
65+
let rpc = Rpc::new(&context, &nexus, 0, sm_handler, 0);
66+
67+
loop {
68+
rpc.run_event_loop(1000);
69+
}
70+
}
71+
```
72+
73+
# Client
74+
```rust,editable
75+
use erpc_sys::ffi;
76+
use std::os::raw::{c_int, c_void};
77+
use erpc_rs::context::AppContext;
78+
use erpc_rs::msgbuffs;
79+
use erpc_rs::rpc::Rpc;
80+
use erpc_rs::nexus::Nexus;
81+
82+
use msgbuffs::MsgBuffers;
83+
84+
extern fn sm_handler(session_num: c_int, sm_event_type: ffi::SmEventType, sm_err_type: ffi::SmErrType, context: *mut c_void) {
85+
println!("sm_handler session_num: {} sm_event_type: {} sm_err_type: {}", session_num, sm_event_type, sm_err_type);
86+
let _ctx: *mut ffi::AppContext = context as *mut ffi::AppContext;
87+
}
88+
89+
extern fn cont_func(_context: *mut c_void, tag: *mut c_void) {
90+
let context = AppContext::from_raw(_context);
91+
let tag = tag as usize;
92+
93+
let msg_buffs = MsgBuffers::from_context(&context, tag);
94+
let s = msg_buffs.get_resp_msgbuf();
95+
//let s = context.get_resp_msgbuf(tag);
96+
let s = String::from_utf8(s).expect("");
97+
println!("cont_func tag: {} resp: {}", tag, s);
98+
99+
let session_num = context.get_session_num();
100+
let rpc = Rpc::from_context(&context);
101+
let s = "hello".to_string().into_bytes();
102+
rpc.enqueue_request(&context, session_num, 1, s, cont_func, 1000, 0);
103+
}
104+
105+
fn main() {
106+
let context = AppContext::new();
107+
let nexus = Nexus::new("127.0.0.1:31851".to_string(), 0, 0);
108+
let rpc = Rpc::new(&context, &nexus, 0, sm_handler, 0);
109+
110+
let session_num = context.connect_session("127.0.0.1:31850".to_string(), 0);
111+
112+
println!("session_num: {}", session_num);
113+
114+
while !rpc.is_connected(session_num) {
115+
rpc.run_event_loop_once();
116+
}
117+
118+
println!("connected");
119+
120+
let s = "hello".to_string().into_bytes();
121+
rpc.enqueue_request(&context, session_num, 1, s, cont_func, 1000, 0);
122+
rpc.run_event_loop(1000*5);
123+
124+
println!("OK");
125+
}
126+
```

erpc-examples/src/client.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
use erpc_sys::ffi;
22
use std::os::raw::{c_int, c_void};
33
use erpc_rs::context::AppContext;
4+
use erpc_rs::msgbuffs;
45
use erpc_rs::rpc::Rpc;
56
use erpc_rs::nexus::Nexus;
67

8+
use msgbuffs::MsgBuffers;
9+
710
extern fn sm_handler(session_num: c_int, sm_event_type: ffi::SmEventType, sm_err_type: ffi::SmErrType, context: *mut c_void) {
811
println!("sm_handler session_num: {} sm_event_type: {} sm_err_type: {}", session_num, sm_event_type, sm_err_type);
912
let _ctx: *mut ffi::AppContext = context as *mut ffi::AppContext;
@@ -13,7 +16,9 @@ extern fn cont_func(_context: *mut c_void, tag: *mut c_void) {
1316
let context = AppContext::from_raw(_context);
1417
let tag = tag as usize;
1518

16-
let s = context.get_resp_msgbuf(tag);
19+
let msg_buffs = MsgBuffers::from_context(&context, tag);
20+
let s = msg_buffs.get_resp_msgbuf();
21+
//let s = context.get_resp_msgbuf(tag);
1722
let s = String::from_utf8(s).expect("");
1823
println!("cont_func tag: {} resp: {}", tag, s);
1924

erpc-examples/src/client_raw.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ extern fn cont_func(_context: *mut c_void, _tag: *mut c_void) {
1515

1616
let data: *mut u8;
1717
let data_size : size_t = 0;
18-
unsafe { data = ffi::erpc_get_resp_msgbuf(ctx, &data_size) };
19-
//println!("data_size: {:?} {}", data, data_size);
18+
let msgbufs = unsafe { ffi::erpc_msgbuffs_get_by_tag(ctx, tag) };
19+
data = unsafe { ffi::erpc_msgbuffs_resp_msgbuf(msgbufs, &data_size) };
2020

2121
let s = unsafe { String::from_raw_parts(data, data_size, 0) };
2222
println!("cont_func tag: {} resp: {}", tag, s);
23+
24+
unsafe { ffi::erpc_msgbuffs_destroy(msgbufs) };
2325
}
2426

2527
fn main() {

erpc-sys/build.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ fn main() {
1212
.flag("-std=c++11") // -std=c++1y
1313
.flag("-DERPC_INFINIBAND=true")
1414
.include(Path::new( &home_dir ).join("eRPC/src"))
15+
.include(Path::new( &home_dir ).join("junction-install/include"))
1516
.include("src")
1617
.file("src/ffi.cpp")
1718
.file("src/server.cpp")
@@ -20,13 +21,17 @@ fn main() {
2021
.compile("erpc_ffi");
2122

2223
println!("cargo:rustc-link-search=native={}", Path::new(&home_dir).join("eRPC/build").display());
24+
println!("cargo:rustc-link-search=native={}", Path::new(&home_dir).join("junction-install/lib").display());
2325

2426
println!("cargo:rustc-link-lib=erpc");
2527
println!("cargo:rustc-link-lib=pthread");
2628
println!("cargo:rustc-link-lib=numa");
2729
println!("cargo:rustc-link-lib=dl");
2830
println!("cargo:rustc-link-lib=ibverbs");
2931

32+
println!("cargo:rustc-link-lib=junction");
33+
println!("cargo:rustc-link-lib=turf");
34+
3035
println!("cargo:rerun-if-changed=src/*");
3136
println!("cargo:rerun-if-changed=build.rs");
3237
}

erpc-sys/src/ffi.cpp

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,31 @@
55
#include "util/numautils.h"
66
#include "transport_impl/eth_common.h"
77
#include "rpc_types.h"
8+
#include <junction/ConcurrentMap_Leapfrog.h>
9+
10+
class MsgBuffers {
11+
public:
12+
erpc::MsgBuffer req_msgbuf;
13+
erpc::MsgBuffer resp_msgbuf;
14+
};
15+
16+
typedef junction::ConcurrentMap_Leapfrog<size_t, MsgBuffers*> ConcurrentMap;
817

918
class AppContext {
1019
public:
1120
erpc::Rpc<erpc::CTransport> *rpc = nullptr;
1221
int session_num = 0;
13-
erpc::MsgBuffer req_msgbuf;
14-
erpc::MsgBuffer resp_msgbuf;
22+
ConcurrentMap msgbufs;
1523

1624
~AppContext() {}
25+
26+
inline MsgBuffers *alloc_msg_buffer_or_die(size_t req_id, size_t req_max_data_size, size_t resp_max_data_size) {
27+
MsgBuffers *buffs = new MsgBuffers();
28+
buffs->req_msgbuf = rpc->alloc_msg_buffer_or_die(req_max_data_size);
29+
buffs->resp_msgbuf = rpc->alloc_msg_buffer_or_die(resp_max_data_size);
30+
msgbufs.assign(req_id, buffs);
31+
return buffs;
32+
}
1733
};
1834

1935
extern "C" {
@@ -97,13 +113,12 @@ uint8_t *erpc_get_req_msgbuf(erpc::ReqHandle *req_handle, size_t &data_size) {
97113

98114
void erpc_enqueue_request(AppContext *context, erpc::Rpc<erpc::CTransport> *rpc, int session_num, uint8_t req_type,
99115
const uint8_t *data, size_t data_size, erpc::erpc_cont_func_t cont_func, size_t tag, size_t cont_etid) {
100-
context->req_msgbuf = rpc->alloc_msg_buffer_or_die(data_size);
101-
context->resp_msgbuf = rpc->alloc_msg_buffer_or_die(256); // rpc->get_max_msg_size()
116+
auto buffs = context->alloc_msg_buffer_or_die(tag, data_size, 1024);
102117

103-
memcpy(context->req_msgbuf.buf, data, data_size);
118+
memcpy(buffs->req_msgbuf.buf, data, data_size);
104119

105-
rpc->enqueue_request(session_num, 1, &context->req_msgbuf,
106-
&context->resp_msgbuf, cont_func, reinterpret_cast<void *>(tag)); // nullptr
120+
rpc->enqueue_request(session_num, 1, &buffs->req_msgbuf,
121+
&buffs->resp_msgbuf, cont_func, reinterpret_cast<void *>(tag)); // nullptr
107122
}
108123

109124
void erpc_enqueue_response(erpc::Rpc<erpc::CTransport> *rpc, erpc::ReqHandle *req_handle, const uint8_t *data,
@@ -115,9 +130,30 @@ void erpc_enqueue_response(erpc::Rpc<erpc::CTransport> *rpc, erpc::ReqHandle *re
115130
rpc->enqueue_response(req_handle, &resp);
116131
}
117132

118-
uint8_t *erpc_get_resp_msgbuf(AppContext *context, size_t &data_size) {
119-
data_size = context->resp_msgbuf.get_data_size();
120-
return context->resp_msgbuf.buf;
133+
MsgBuffers *erpc_msgbuffs_get_by_tag(AppContext *context, size_t tag) {
134+
auto item = context->msgbufs.get(tag);
135+
if (item == nullptr) {
136+
return nullptr;
137+
}
138+
context->msgbufs.erase(tag);
139+
return item;
140+
}
141+
142+
void erpc_msgbuffs_destroy(MsgBuffers *buffs) {
143+
if (buffs != nullptr) {
144+
delete buffs;
145+
buffs = nullptr;
146+
}
147+
}
148+
149+
uint8_t *erpc_msgbuffs_req_msgbuf(MsgBuffers *buffs, size_t &data_size) {
150+
data_size = buffs->req_msgbuf.get_data_size();
151+
return buffs->req_msgbuf.buf;
152+
}
153+
154+
uint8_t *erpc_msgbuffs_resp_msgbuf(MsgBuffers *buffs, size_t &data_size) {
155+
data_size = buffs->resp_msgbuf.get_data_size();
156+
return buffs->resp_msgbuf.buf;
121157
}
122158

123159
}

erpc-sys/src/ffi.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use libc::{size_t};
44
pub enum Nexus {} // erpc::Nexus
55
pub enum ReqHandle {} // erpc::ReqHandle
66
pub enum AppContext {} // AppContext
7+
pub enum MsgBuffers {} // MsgBuffers
78
pub enum Rpc {} // erpc::Rpc<erpc::CTransport>
89

910
unsafe impl Send for Rpc {}
@@ -81,7 +82,11 @@ extern "C" {
8182
data_size: size_t, cont_func: extern fn(*mut c_void, *mut c_void),
8283
tag: size_t, cont_etid: size_t) -> ();
8384
pub fn erpc_enqueue_response(rpc: *mut Rpc, req_handle: *mut ReqHandle, data: *const u8, data_size: size_t) -> ();
84-
pub fn erpc_get_resp_msgbuf(context: *mut AppContext, data_size: &size_t) -> *mut u8;
85+
86+
pub fn erpc_msgbuffs_get_by_tag(context: *mut AppContext, tag: size_t) -> *mut MsgBuffers;
87+
pub fn erpc_msgbuffs_destroy(buffs: *mut MsgBuffers) -> ();
88+
pub fn erpc_msgbuffs_req_msgbuf(buffs: *mut MsgBuffers, data_size: &size_t) -> *mut u8;
89+
pub fn erpc_msgbuffs_resp_msgbuf(buffs: *mut MsgBuffers, data_size: &size_t) -> *mut u8;
8590

8691
pub fn server_test() -> c_int;
8792
pub fn client_test() -> c_int;

src/context.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,15 @@ impl AppContext {
3232
unsafe { ffi::erpc_connect_session(self.inner, server_uri.as_ptr(), rem_rpc_id) }
3333
}
3434

35-
pub fn get_resp_msgbuf(&self, _tag: usize) -> Vec<u8> {
36-
let data: *mut u8;
37-
let data_size : usize = 0;
38-
unsafe { data = ffi::erpc_get_resp_msgbuf(self.inner, &data_size) };
39-
//println!("data_size: {:?} {}", data, data_size);
40-
41-
let s = unsafe { String::from_raw_parts(data, data_size, 0) };
42-
s.into_bytes()
43-
}
35+
// pub fn get_resp_msgbuf(&self, tag: usize) -> Vec<u8> {
36+
// let data: *mut u8;
37+
// let data_size : usize = 0;
38+
// unsafe { data = ffi::erpc_get_resp_msgbuf(self.inner, tag, &data_size) };
39+
// //println!("data_size: {:?} {}", data, data_size);
40+
41+
// let s = unsafe { String::from_raw_parts(data, data_size, 0) };
42+
// s.into_bytes()
43+
// }
4444
}
4545

4646
impl Drop for AppContext {

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod context;
2+
pub mod msgbuffs;
23
pub mod nexus;
34
pub mod reqhandle;
45
pub mod rpc;

src/msgbuffs.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use erpc_sys::ffi::{self};
2+
use crate::context::AppContext;
3+
4+
pub struct MsgBuffers {
5+
pub inner: *mut ffi::MsgBuffers,
6+
}
7+
8+
impl MsgBuffers {
9+
pub fn from_context(context: &AppContext, tag: usize) -> Self {
10+
let msg_buffs = unsafe { ffi::erpc_msgbuffs_get_by_tag(context.inner, tag) };
11+
MsgBuffers {
12+
inner: msg_buffs,
13+
}
14+
}
15+
16+
pub fn get_req_msgbuf(&self) -> Vec<u8> {
17+
let data: *mut u8;
18+
let data_size : usize = 0;
19+
unsafe { data = ffi::erpc_msgbuffs_req_msgbuf(self.inner, &data_size) };
20+
21+
let s = unsafe { String::from_raw_parts(data, data_size, 0) };
22+
s.into_bytes()
23+
}
24+
25+
pub fn get_resp_msgbuf(&self) -> Vec<u8> {
26+
let data: *mut u8;
27+
let data_size : usize = 0;
28+
unsafe { data = ffi::erpc_msgbuffs_resp_msgbuf(self.inner, &data_size) };
29+
30+
let s = unsafe { String::from_raw_parts(data, data_size, 0) };
31+
s.into_bytes()
32+
}
33+
}
34+
35+
impl Drop for MsgBuffers {
36+
fn drop(&mut self) {
37+
unsafe { ffi::erpc_msgbuffs_destroy(self.inner) }
38+
}
39+
}

0 commit comments

Comments
 (0)