Skip to content

Commit e0a3df3

Browse files
committed
Async uart
1 parent be10b2e commit e0a3df3

File tree

6 files changed

+170
-14
lines changed

6 files changed

+170
-14
lines changed

rust/sysroot-stage2/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/zephyr-futures/src/lib.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
extern crate alloc;
22
extern crate zephyr_core;
33

4-
use core::cell::{UnsafeCell, RefCell};
4+
use alloc::sync::{Arc, Weak};
5+
use core::cell::{RefCell, UnsafeCell};
6+
use core::marker::PhantomData;
57
use core::pin::Pin;
68
use core::task::{Context, Poll, Waker};
7-
use core::marker::PhantomData;
8-
use alloc::sync::{Arc, Weak};
99

1010
use futures::future::Future;
1111
use futures::stream::Stream;
@@ -71,6 +71,14 @@ thread_local! {
7171
static REACTOR: RefCell<Option<Reactor>> = RefCell::new(None);
7272
}
7373

74+
#[inline(never)]
75+
pub fn current_reactor_register(signal: &'static impl PollableKobj, context: &mut Context) {
76+
match REACTOR.try_with(|r| r.borrow_mut().as_mut().map(|r| r.register(signal, context))) {
77+
Ok(None) | Err(_) => panic!("register with no reactor"),
78+
Ok(Some(_)) => (),
79+
}
80+
}
81+
7482
struct Task {
7583
future: UnsafeCell<Pin<Box<dyn Future<Output = ()>>>>,
7684
executor: ExecutorHandle,
@@ -132,7 +140,10 @@ impl ExecutorInner {
132140
// that calls poll being not Send or Sync. Since we're not requiring spawned futures to be Send or
133141
// Sync and Executor is the effective owner, add a PhantomData here as if we directly own a Future
134142
// that is not explicitly Send or Sync.
135-
pub struct Executor(Arc<Mutex<'static, ExecutorInner>>, PhantomData<Future<Output = ()>>);
143+
pub struct Executor(
144+
Arc<Mutex<'static, ExecutorInner>>,
145+
PhantomData<Future<Output = ()>>,
146+
);
136147
#[derive(Clone)]
137148
struct ExecutorHandle(Weak<Mutex<'static, ExecutorInner>>);
138149

rust/zephyr-uart-buffered/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,5 @@ authors = ["Tyler Hall <[email protected]>"]
55
edition = "2018"
66

77
[dependencies]
8+
futures-preview = "=0.3.0-alpha.17"
9+
zephyr-futures = { path = "../zephyr-futures" }
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use core::pin::Pin;
2+
use core::task::{Context, Poll};
3+
4+
use futures::io::{AsyncRead, AsyncWrite, Error, Initializer};
5+
6+
use zephyr_core::context::Any as C;
7+
use zephyr_core::poll::Signal;
8+
use zephyr_futures::current_reactor_register;
9+
10+
use super::{UartBufferedRx, UartBufferedTx};
11+
12+
pub struct UartBufferedRxAsync {
13+
uart: UartBufferedRx,
14+
}
15+
16+
impl UartBufferedRxAsync {
17+
pub fn new(uart: UartBufferedRx) -> Self {
18+
UartBufferedRxAsync { uart }
19+
}
20+
}
21+
22+
impl AsyncRead for UartBufferedRxAsync {
23+
fn poll_read(
24+
self: Pin<&mut Self>,
25+
cx: &mut Context,
26+
buf: &mut [u8],
27+
) -> Poll<Result<usize, Error>> {
28+
let s = self.get_mut();
29+
let uart = &mut s.uart;
30+
31+
if let Some(len) = uart.read_nb(buf) {
32+
return Poll::Ready(Ok(len));
33+
}
34+
35+
// Need to register for readiness on the signal. We wait to clear the
36+
// signal until after the uart is not ready so that we don't make a
37+
// redundant system call before each read attempt, e.g. if the client is
38+
// reading one byte at a time and there are several buffered.
39+
// Because the signal is edge triggered, resetting here could clear an
40+
// event that happened since the poll above. So poll one more time.
41+
let signal = uart.get_signal();
42+
signal.reset::<C>();
43+
current_reactor_register(signal, cx);
44+
45+
if let Some(len) = uart.read_nb(buf) {
46+
return Poll::Ready(Ok(len));
47+
}
48+
49+
Poll::Pending
50+
}
51+
52+
unsafe fn initializer(&self) -> Initializer {
53+
Initializer::nop()
54+
}
55+
}
56+
57+
pub struct UartBufferedTxAsync {
58+
uart: UartBufferedTx,
59+
}
60+
61+
impl UartBufferedTxAsync {
62+
pub fn new(uart: UartBufferedTx) -> Self {
63+
UartBufferedTxAsync { uart }
64+
}
65+
}
66+
67+
impl AsyncWrite for UartBufferedTxAsync {
68+
fn poll_write(
69+
self: Pin<&mut Self>,
70+
cx: &mut Context,
71+
buf: &[u8],
72+
) -> Poll<Result<usize, Error>> {
73+
let s = self.get_mut();
74+
let uart = &mut s.uart;
75+
76+
if let Some(len) = uart.write_nb(buf) {
77+
return Poll::Ready(Ok(len));
78+
}
79+
80+
// Need to register for readiness on the signal. We wait to clear the
81+
// signal until after the uart is not ready so that we don't make a
82+
// redundant system call before each write attempt, e.g. if the client
83+
// is writing one byte at a time and there are several buffered.
84+
// Because the signal is edge triggered, resetting here could clear an
85+
// event that happened since the poll above. So poll one more time.
86+
let signal = uart.get_signal();
87+
signal.reset::<C>();
88+
current_reactor_register(signal, cx);
89+
90+
if let Some(len) = uart.write_nb(buf) {
91+
return Poll::Ready(Ok(len));
92+
}
93+
94+
Poll::Pending
95+
}
96+
97+
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
98+
Poll::Ready(Ok(()))
99+
}
100+
101+
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
102+
Poll::Ready(Ok(()))
103+
}
104+
}

rust/zephyr-uart-buffered/src/lib.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,17 @@ use zephyr_core::poll::KPollSignal;
55
use zephyr_core::NegErr;
66
use zephyr_sys::raw::{uart_buffered_rx_handle, uart_buffered_tx_handle};
77

8+
mod futures;
9+
10+
pub use crate::futures::{UartBufferedRxAsync, UartBufferedTxAsync};
11+
812
pub struct UartBufferedRx {
913
handle: uart_buffered_rx_handle,
1014
}
1115

1216
impl UartBufferedRx {
17+
/// Unsafe because this is passed from C and caller must guarantee there is
18+
/// only one instance created per handle.
1319
pub unsafe fn new(handle: uart_buffered_rx_handle) -> Self {
1420
UartBufferedRx { handle }
1521
}
@@ -44,13 +50,19 @@ impl UartBufferedRx {
4450
pub fn get_signal(&self) -> &'static KPollSignal {
4551
unsafe { &*self.handle.fifo.signal }
4652
}
53+
54+
pub fn into_async(self) -> UartBufferedRxAsync {
55+
UartBufferedRxAsync::new(self)
56+
}
4757
}
4858

4959
pub struct UartBufferedTx {
5060
handle: uart_buffered_tx_handle,
5161
}
5262

5363
impl UartBufferedTx {
64+
/// Unsafe because this is passed from C and caller must guarantee there is
65+
/// only one instance created per handle.
5466
pub unsafe fn new(handle: uart_buffered_tx_handle) -> Self {
5567
UartBufferedTx { handle }
5668
}
@@ -85,4 +97,8 @@ impl UartBufferedTx {
8597
pub fn get_signal(&self) -> &'static KPollSignal {
8698
unsafe { &*self.handle.fifo.signal }
8799
}
100+
101+
pub fn into_async(self) -> UartBufferedTxAsync {
102+
UartBufferedTxAsync::new(self)
103+
}
88104
}

samples/serial/src/lib.rs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,41 @@
1+
#![feature(async_await)]
2+
3+
extern crate futures;
14
extern crate zephyr;
25
extern crate zephyr_sys;
36
extern crate zephyr_uart_buffered;
7+
extern crate zephyr_macros;
8+
extern crate zephyr_futures;
9+
10+
use futures::{StreamExt, AsyncBufReadExt, AsyncWriteExt};
11+
use futures::io::BufReader;
412

513
use zephyr_sys::raw::{uart_buffered_rx_handle, uart_buffered_tx_handle};
614
use zephyr_uart_buffered::{UartBufferedRx, UartBufferedTx};
15+
use zephyr_futures::Executor;
16+
17+
zephyr_macros::k_mutex_define!(EXECUTOR_MUTEX);
18+
19+
async fn echo<R: AsyncBufReadExt + Unpin, W: AsyncWriteExt + Unpin>(rx: R, mut tx: W) {
20+
let mut lines = rx.lines();
21+
while let Some(line) = lines.next().await {
22+
let line = line.unwrap();
23+
println!("got line: {}", line);
24+
let line = line.into_bytes();
25+
tx.write_all(&line).await.unwrap();
26+
}
27+
}
728

829
#[no_mangle]
930
pub extern "C" fn rust_main(rx: uart_buffered_rx_handle, tx: uart_buffered_tx_handle) {
10-
let mut rx = unsafe { UartBufferedRx::new(rx) };
11-
let mut tx = unsafe { UartBufferedTx::new(tx) };
31+
use zephyr::context::Kernel as C;
1232

13-
loop {
14-
const X: &str = "hello\n";
15-
tx.write(X.as_bytes());
33+
let rx = unsafe { UartBufferedRx::new(rx) };
34+
let rx = BufReader::with_capacity(32, rx.into_async());
1635

17-
let mut buf = [0u8; 32];
18-
let n = rx.read(&mut buf);
19-
let mystr = std::str::from_utf8(&buf[..n]).unwrap();
20-
println!("read: n={} mystr={}", n, mystr);
21-
}
36+
let tx = unsafe { UartBufferedTx::new(tx) }.into_async();
37+
38+
let mut executor = unsafe { Executor::new(&EXECUTOR_MUTEX) };
39+
executor.spawn(C, echo(rx, tx));
40+
executor.run::<C>();
2241
}

0 commit comments

Comments
 (0)