Skip to content

Commit 09f14e9

Browse files
committed
feat(codspeed): add runner ipc via fifo
1 parent 071a15c commit 09f14e9

File tree

7 files changed

+298
-11
lines changed

7 files changed

+298
-11
lines changed

Cargo.lock

Lines changed: 40 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ members = [
1515
resolver = "2"
1616

1717
[workspace.dependencies]
18+
anyhow = "1.0.97"
1819
itertools = "0.14.0"
1920
serde = { version = "1.0.217", features = ["derive"] }
2021
serde_json = "1.0.138"

crates/cargo-codspeed/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ keywords = ["codspeed", "benchmark", "cargo"]
2121
cargo_metadata = "0.19.2"
2222
clap = { version = "=4.5.17", features = ["derive", "env"] }
2323
termcolor = "1.4"
24-
anyhow = "1.0.86"
24+
anyhow = { workspace = true }
2525
itertools = { workspace = true }
2626
anstyle = "1.0.8"
2727
serde = { workspace = true }

crates/codspeed/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ categories = [
1818
keywords = ["codspeed", "benchmark"]
1919

2020
[dependencies]
21+
anyhow = { workspace = true }
22+
bincode = "1.3.3"
2123
colored = "2.0.0"
2224
libc = "^0.2"
25+
nix = { version = "0.29.0", features = ["fs"] }
2326
serde = { workspace = true }
2427
serde_json = { workspace = true }
2528
uuid = { version = "1.12.1", features = ["v4"] }

crates/codspeed/src/fifo.rs

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
pub use super::shared::*;
2+
use anyhow::bail;
3+
use nix::libc::O_NONBLOCK;
4+
use nix::sys::stat;
5+
use nix::unistd::{self, unlink};
6+
use std::fs::{File, OpenOptions};
7+
use std::io::Read;
8+
use std::os::unix::fs::OpenOptionsExt;
9+
use std::path::{Path, PathBuf};
10+
11+
pub struct BenchGuard {
12+
ctl_fifo: FifoIpc,
13+
ack_fifo: FifoIpc,
14+
}
15+
16+
impl BenchGuard {
17+
pub fn new(ctl_fifo: &str, ack_fifo: &str) -> anyhow::Result<Self> {
18+
let mut instance = Self {
19+
ctl_fifo: FifoIpc::connect(ctl_fifo)?.with_writer()?,
20+
ack_fifo: FifoIpc::connect(ack_fifo)?.with_reader()?,
21+
};
22+
23+
instance.send_cmd(Command::SetIntegration {
24+
name: "codspeed-rust".into(),
25+
version: env!("CARGO_PKG_VERSION").into(),
26+
})?; // FIXME: Just send it once
27+
instance.send_cmd(Command::StartBenchmark)?;
28+
29+
Ok(instance)
30+
}
31+
32+
pub fn new_with_runner_fifo() -> anyhow::Result<Self> {
33+
Self::new(RUNNER_CTL_FIFO, RUNNER_ACK_FIFO)
34+
}
35+
36+
fn send_cmd(&mut self, cmd: Command) -> anyhow::Result<()> {
37+
self.ctl_fifo.send_cmd(cmd)?;
38+
self.ack_fifo.wait_for_ack();
39+
40+
Ok(())
41+
}
42+
}
43+
44+
impl Drop for BenchGuard {
45+
fn drop(&mut self) {
46+
self.send_cmd(Command::StopBenchmark)
47+
.expect("Failed to send stop command");
48+
}
49+
}
50+
51+
pub fn send_cmd(cmd: Command) -> anyhow::Result<()> {
52+
let mut writer = FifoIpc::connect(RUNNER_CTL_FIFO)?.with_writer()?;
53+
writer.send_cmd(cmd).unwrap();
54+
55+
let mut reader = FifoIpc::connect(RUNNER_ACK_FIFO)?.with_reader()?;
56+
reader.wait_for_ack();
57+
58+
Ok(())
59+
}
60+
61+
pub struct FifoIpc {
62+
path: PathBuf,
63+
reader: Option<File>,
64+
writer: Option<File>,
65+
}
66+
67+
impl FifoIpc {
68+
/// Creates a new FIFO at the specified path and connects to it.
69+
///
70+
/// ```rust
71+
/// use codspeed::fifo::{FifoIpc, Command};
72+
///
73+
/// // Create the reader before the writer (required!):
74+
/// let mut read_fifo = FifoIpc::create("/tmp/doctest.fifo").unwrap().with_reader().unwrap();
75+
///
76+
/// // Connect to the FIFO and send a command
77+
/// let mut fifo = FifoIpc::connect("/tmp/doctest.fifo").unwrap().with_writer().unwrap();
78+
/// fifo.send_cmd(Command::StartBenchmark).unwrap();
79+
///
80+
/// // Receive the command in the reader
81+
/// let cmd = read_fifo.recv_cmd().unwrap();
82+
/// assert_eq!(cmd, Command::StartBenchmark);
83+
/// ```
84+
pub fn create<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
85+
// Remove the previous FIFO (if it exists)
86+
let _ = unlink(path.as_ref());
87+
88+
// Create the FIFO with RWX permissions for the owner
89+
unistd::mkfifo(path.as_ref(), stat::Mode::S_IRWXU)?;
90+
91+
Self::connect(path.as_ref())
92+
}
93+
94+
pub fn connect<P: Into<PathBuf>>(path: P) -> anyhow::Result<Self> {
95+
let path = path.into();
96+
97+
if !path.exists() {
98+
bail!("FIFO does not exist: {}", path.display());
99+
}
100+
101+
Ok(Self {
102+
path,
103+
reader: None,
104+
writer: None,
105+
})
106+
}
107+
108+
pub fn with_reader(mut self) -> anyhow::Result<Self> {
109+
self.reader = Some(
110+
OpenOptions::new()
111+
.write(true)
112+
.read(true)
113+
.custom_flags(O_NONBLOCK)
114+
.open(&self.path)?,
115+
);
116+
Ok(self)
117+
}
118+
119+
/// WARNING: Writer must be opened _AFTER_ the reader.
120+
pub fn with_writer(mut self) -> anyhow::Result<Self> {
121+
self.writer = Some(
122+
OpenOptions::new()
123+
.write(true)
124+
.custom_flags(O_NONBLOCK)
125+
.open(&self.path)?,
126+
);
127+
Ok(self)
128+
}
129+
130+
pub fn recv_cmd(&mut self) -> anyhow::Result<Command> {
131+
// First read the length (u32 = 4 bytes)
132+
let mut len_buffer = [0u8; 4];
133+
self.read_exact(&mut len_buffer)?;
134+
let message_len = u32::from_le_bytes(len_buffer) as usize;
135+
136+
// Try to read the message
137+
let mut buffer = vec![0u8; message_len];
138+
loop {
139+
if self.read_exact(&mut buffer).is_ok() {
140+
break;
141+
}
142+
}
143+
144+
let decoded = bincode::deserialize(&buffer)?;
145+
Ok(decoded)
146+
}
147+
148+
pub fn send_cmd(&mut self, cmd: Command) -> anyhow::Result<()> {
149+
use std::io::Write;
150+
151+
let encoded = bincode::serialize(&cmd)?;
152+
self.write_all(&(encoded.len() as u32).to_le_bytes())?;
153+
self.write_all(&encoded)?;
154+
Ok(())
155+
}
156+
157+
pub fn wait_for_ack(&mut self) {
158+
loop {
159+
if let Ok(Command::Ack) = self.recv_cmd() {
160+
break;
161+
}
162+
}
163+
}
164+
}
165+
166+
impl std::io::Write for FifoIpc {
167+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
168+
if let Some(writer) = self.writer.as_mut() {
169+
writer.write(buf)
170+
} else {
171+
Err(std::io::Error::new(
172+
std::io::ErrorKind::NotConnected,
173+
"Writer not initialized",
174+
))
175+
}
176+
}
177+
178+
fn flush(&mut self) -> std::io::Result<()> {
179+
Ok(())
180+
}
181+
}
182+
183+
impl std::io::Read for FifoIpc {
184+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
185+
if let Some(reader) = self.reader.as_mut() {
186+
reader.read(buf)
187+
} else {
188+
Err(std::io::Error::new(
189+
std::io::ErrorKind::NotConnected,
190+
"Reader not initialized",
191+
))
192+
}
193+
}
194+
}
195+
196+
#[cfg(test)]
197+
mod tests {
198+
use super::*;
199+
use std::io::Write;
200+
201+
#[test]
202+
fn test_ipc_write_read() {
203+
let mut fifo = FifoIpc::create("/tmp/test1.fifo")
204+
.unwrap()
205+
.with_reader()
206+
.unwrap()
207+
.with_writer()
208+
.unwrap();
209+
210+
fifo.write_all(b"Hello").unwrap();
211+
let mut buffer = [0; 5];
212+
fifo.read_exact(&mut buffer).unwrap();
213+
assert_eq!(&buffer, b"Hello");
214+
}
215+
216+
#[test]
217+
fn test_ipc_send_recv_cmd() {
218+
let mut fifo = FifoIpc::create("/tmp/test2.fifo")
219+
.unwrap()
220+
.with_reader()
221+
.unwrap()
222+
.with_writer()
223+
.unwrap();
224+
225+
fifo.send_cmd(Command::StartBenchmark).unwrap();
226+
let cmd = fifo.recv_cmd().unwrap();
227+
assert_eq!(cmd, Command::StartBenchmark);
228+
}
229+
}

crates/codspeed/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
pub mod codspeed;
2+
pub mod fifo;
23
mod macros;
34
mod measurement;
45
mod request;
6+
mod shared;
57
pub mod utils;
68
pub mod walltime;

0 commit comments

Comments
 (0)