Skip to content

Commit 1e6b807

Browse files
authored
Merge pull request #3775 from embassy-rs/dyn-dispatch-pipe
feat: add dynamic dispatch variants of pipe
2 parents 27fb1f4 + c06862e commit 1e6b807

File tree

2 files changed

+277
-0
lines changed

2 files changed

+277
-0
lines changed

embassy-sync/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## Unreleased
9+
10+
- Add dynamic dispatch variant of `Pipe`.
11+
812
## 0.6.1 - 2024-11-22
913

1014
- Add `LazyLock` sync primitive.

embassy-sync/src/pipe.rs

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,250 @@ impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N>
532532
}
533533
}
534534

535+
//
536+
// Type-erased variants
537+
//
538+
539+
pub(crate) trait DynamicPipe {
540+
fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>;
541+
fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>;
542+
543+
fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>;
544+
fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>;
545+
546+
fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>;
547+
fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>;
548+
549+
fn consume(&self, amt: usize);
550+
unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>;
551+
}
552+
553+
impl<M, const N: usize> DynamicPipe for Pipe<M, N>
554+
where
555+
M: RawMutex,
556+
{
557+
fn consume(&self, amt: usize) {
558+
Pipe::consume(self, amt)
559+
}
560+
561+
unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
562+
Pipe::try_fill_buf_with_context(self, cx)
563+
}
564+
565+
fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
566+
Pipe::write(self, buf).into()
567+
}
568+
569+
fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
570+
Pipe::read(self, buf).into()
571+
}
572+
573+
fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
574+
Pipe::try_read(self, buf)
575+
}
576+
577+
fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
578+
Pipe::try_write(self, buf)
579+
}
580+
581+
fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
582+
Pipe::try_write_with_context(self, cx, buf)
583+
}
584+
585+
fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
586+
Pipe::try_read_with_context(self, cx, buf)
587+
}
588+
}
589+
590+
/// Write-only access to a [`DynamicPipe`].
591+
pub struct DynamicWriter<'p> {
592+
pipe: &'p dyn DynamicPipe,
593+
}
594+
595+
impl<'p> Clone for DynamicWriter<'p> {
596+
fn clone(&self) -> Self {
597+
*self
598+
}
599+
}
600+
601+
impl<'p> Copy for DynamicWriter<'p> {}
602+
603+
impl<'p> DynamicWriter<'p> {
604+
/// Write some bytes to the pipe.
605+
///
606+
/// See [`Pipe::write()`]
607+
pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
608+
self.pipe.write(buf)
609+
}
610+
611+
/// Attempt to immediately write some bytes to the pipe.
612+
///
613+
/// See [`Pipe::try_write()`]
614+
pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
615+
self.pipe.try_write(buf)
616+
}
617+
}
618+
619+
impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p>
620+
where
621+
M: RawMutex,
622+
{
623+
fn from(value: Writer<'p, M, N>) -> Self {
624+
Self { pipe: value.pipe }
625+
}
626+
}
627+
628+
/// Future returned by [`DynamicWriter::write`].
629+
#[must_use = "futures do nothing unless you `.await` or poll them"]
630+
pub struct DynamicWriteFuture<'p> {
631+
pipe: &'p dyn DynamicPipe,
632+
buf: &'p [u8],
633+
}
634+
635+
impl<'p> Future for DynamicWriteFuture<'p> {
636+
type Output = usize;
637+
638+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
639+
match self.pipe.try_write_with_context(Some(cx), self.buf) {
640+
Ok(n) => Poll::Ready(n),
641+
Err(TryWriteError::Full) => Poll::Pending,
642+
}
643+
}
644+
}
645+
646+
impl<'p> Unpin for DynamicWriteFuture<'p> {}
647+
648+
impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p>
649+
where
650+
M: RawMutex,
651+
{
652+
fn from(value: WriteFuture<'p, M, N>) -> Self {
653+
Self {
654+
pipe: value.pipe,
655+
buf: value.buf,
656+
}
657+
}
658+
}
659+
660+
/// Read-only access to a [`DynamicPipe`].
661+
pub struct DynamicReader<'p> {
662+
pipe: &'p dyn DynamicPipe,
663+
}
664+
665+
impl<'p> DynamicReader<'p> {
666+
/// Read some bytes from the pipe.
667+
///
668+
/// See [`Pipe::read()`]
669+
pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
670+
self.pipe.read(buf)
671+
}
672+
673+
/// Attempt to immediately read some bytes from the pipe.
674+
///
675+
/// See [`Pipe::try_read()`]
676+
pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
677+
self.pipe.try_read(buf)
678+
}
679+
680+
/// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
681+
///
682+
/// If no bytes are currently available to read, this function waits until at least one byte is available.
683+
///
684+
/// If the reader is at end-of-file (EOF), an empty slice is returned.
685+
pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> {
686+
DynamicFillBufFuture { pipe: Some(self.pipe) }
687+
}
688+
689+
/// Try returning contents of the internal buffer.
690+
///
691+
/// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`.
692+
///
693+
/// If the reader is at end-of-file (EOF), an empty slice is returned.
694+
pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
695+
unsafe { self.pipe.try_fill_buf_with_context(None) }
696+
}
697+
698+
/// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
699+
pub fn consume(&mut self, amt: usize) {
700+
self.pipe.consume(amt)
701+
}
702+
}
703+
704+
impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p>
705+
where
706+
M: RawMutex,
707+
{
708+
fn from(value: Reader<'p, M, N>) -> Self {
709+
Self { pipe: value.pipe }
710+
}
711+
}
712+
713+
/// Future returned by [`Pipe::read`] and [`Reader::read`].
714+
#[must_use = "futures do nothing unless you `.await` or poll them"]
715+
pub struct DynamicReadFuture<'p> {
716+
pipe: &'p dyn DynamicPipe,
717+
buf: &'p mut [u8],
718+
}
719+
720+
impl<'p> Future for DynamicReadFuture<'p> {
721+
type Output = usize;
722+
723+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
724+
match self.pipe.try_read_with_context(Some(cx), self.buf) {
725+
Ok(n) => Poll::Ready(n),
726+
Err(TryReadError::Empty) => Poll::Pending,
727+
}
728+
}
729+
}
730+
731+
impl<'p> Unpin for DynamicReadFuture<'p> {}
732+
733+
impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p>
734+
where
735+
M: RawMutex,
736+
{
737+
fn from(value: ReadFuture<'p, M, N>) -> Self {
738+
Self {
739+
pipe: value.pipe,
740+
buf: value.buf,
741+
}
742+
}
743+
}
744+
745+
/// Future returned by [`DynamicPipe::fill_buf`] and [`DynamicReader::fill_buf`].
746+
#[must_use = "futures do nothing unless you `.await` or poll them"]
747+
pub struct DynamicFillBufFuture<'p> {
748+
pipe: Option<&'p dyn DynamicPipe>,
749+
}
750+
751+
impl<'p> Future for DynamicFillBufFuture<'p> {
752+
type Output = &'p [u8];
753+
754+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
755+
let pipe = self.pipe.take().unwrap();
756+
match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
757+
Ok(buf) => Poll::Ready(buf),
758+
Err(TryReadError::Empty) => {
759+
self.pipe = Some(pipe);
760+
Poll::Pending
761+
}
762+
}
763+
}
764+
}
765+
766+
impl<'p> Unpin for DynamicFillBufFuture<'p> {}
767+
768+
impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p>
769+
where
770+
M: RawMutex,
771+
{
772+
fn from(value: FillBufFuture<'p, M, N>) -> Self {
773+
Self {
774+
pipe: value.pipe.map(|p| p as &dyn DynamicPipe),
775+
}
776+
}
777+
}
778+
535779
#[cfg(test)]
536780
mod tests {
537781
use futures_executor::ThreadPool;
@@ -619,6 +863,35 @@ mod tests {
619863
let _ = w.clone();
620864
}
621865

866+
#[test]
867+
fn dynamic_dispatch_pipe() {
868+
let mut c = Pipe::<NoopRawMutex, 3>::new();
869+
let (r, w) = c.split();
870+
let (mut r, w): (DynamicReader<'_>, DynamicWriter<'_>) = (r.into(), w.into());
871+
872+
assert!(w.try_write(&[42, 43]).is_ok());
873+
let buf = r.try_fill_buf().unwrap();
874+
assert_eq!(buf, &[42, 43]);
875+
let buf = r.try_fill_buf().unwrap();
876+
assert_eq!(buf, &[42, 43]);
877+
r.consume(1);
878+
let buf = r.try_fill_buf().unwrap();
879+
assert_eq!(buf, &[43]);
880+
r.consume(1);
881+
assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty));
882+
assert_eq!(w.try_write(&[44, 45, 46]), Ok(1));
883+
assert_eq!(w.try_write(&[45, 46]), Ok(2));
884+
let buf = r.try_fill_buf().unwrap();
885+
assert_eq!(buf, &[44]); // only one byte due to wraparound.
886+
r.consume(1);
887+
let buf = r.try_fill_buf().unwrap();
888+
assert_eq!(buf, &[45, 46]);
889+
assert!(w.try_write(&[47]).is_ok());
890+
let buf = r.try_fill_buf().unwrap();
891+
assert_eq!(buf, &[45, 46, 47]);
892+
r.consume(3);
893+
}
894+
622895
#[futures_test::test]
623896
async fn receiver_receives_given_try_write_async() {
624897
let executor = ThreadPool::new().unwrap();

0 commit comments

Comments
 (0)