Skip to content

Commit 7c10417

Browse files
committed
restore start_send use
1 parent 32fc3c1 commit 7c10417

File tree

1 file changed

+21
-27
lines changed

1 file changed

+21
-27
lines changed

common/nymnoise/src/stream.rs

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ use futures::{Sink, SinkExt, Stream, StreamExt};
77
use nym_crypto::asymmetric::x25519;
88
use pin_project::pin_project;
99
use snow::{Builder, HandshakeState, TransportState};
10-
use std::cmp::min;
1110
use std::io;
1211
use std::pin::Pin;
1312
use std::task::Poll;
13+
use std::{cmp::min, task::ready};
1414
use tokio::{
1515
io::{AsyncRead, AsyncWrite, ReadBuf},
1616
net::TcpStream,
@@ -183,34 +183,28 @@ impl AsyncWrite for NoiseStream {
183183
) -> Poll<Result<usize, std::io::Error>> {
184184
let mut projected_self = self.project();
185185

186-
match projected_self.inner_stream.as_mut().poll_ready(cx) {
187-
Poll::Pending => Poll::Pending,
186+
// returns on Poll::Pending and Poll:Ready(Err)
187+
ready!(projected_self.inner_stream.as_mut().poll_ready(cx))?;
188188

189-
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
189+
// Ready to send, encrypting message
190+
let mut noise_buf = BytesMut::zeroed(MAXMSGLEN + TAGLEN);
190191

191-
Poll::Ready(Ok(())) => {
192-
let mut noise_buf = BytesMut::zeroed(MAXMSGLEN + TAGLEN);
193-
194-
let Ok(len) = (match projected_self.noise {
195-
Some(transport_state) => transport_state.write_message(buf, &mut noise_buf),
196-
None => return Poll::Ready(Err(io::ErrorKind::Other.into())),
197-
}) else {
198-
return Poll::Ready(Err(io::ErrorKind::InvalidInput.into()));
199-
};
200-
noise_buf.truncate(len);
201-
match projected_self
202-
.inner_stream
203-
.as_mut()
204-
.start_send(noise_buf.into())
205-
{
206-
Ok(()) => match projected_self.inner_stream.poll_flush(cx) {
207-
Poll::Pending => Poll::Pending, // A return value of Poll::Pending means the waking is already scheduled
208-
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
209-
Poll::Ready(Ok(())) => Poll::Ready(Ok(buf.len())),
210-
},
211-
Err(e) => Poll::Ready(Err(e)),
212-
}
213-
}
192+
let Ok(len) = (match projected_self.noise {
193+
Some(transport_state) => transport_state.write_message(buf, &mut noise_buf),
194+
None => return Poll::Ready(Err(io::ErrorKind::Other.into())),
195+
}) else {
196+
return Poll::Ready(Err(io::ErrorKind::InvalidInput.into()));
197+
};
198+
noise_buf.truncate(len);
199+
200+
// Tokio uses the same `start_send ` in their SinkWriter implementation. https://docs.rs/tokio-util/latest/src/tokio_util/io/sink_writer.rs.html#104
201+
match projected_self
202+
.inner_stream
203+
.as_mut()
204+
.start_send(noise_buf.into())
205+
{
206+
Ok(()) => Poll::Ready(Ok(buf.len())),
207+
Err(e) => Poll::Ready(Err(e)),
214208
}
215209
}
216210

0 commit comments

Comments
 (0)