Skip to content

Commit 58f99d9

Browse files
committed
Don't call poll_complete once Sink01::close has been called
1 parent 0607921 commit 58f99d9

File tree

1 file changed

+26
-15
lines changed

1 file changed

+26
-15
lines changed

futures-util/src/compat/compat01as03.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use futures_01::{
99
use futures_core::{task as task03, Future as Future03, Stream as Stream03};
1010
use std::pin::Pin;
1111
use futures_sink::Sink as Sink03;
12-
use std::pin::{Pin, Unpin};
1312
use std::task::LocalWaker;
1413

1514
/// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
@@ -127,6 +126,7 @@ impl<St: Stream01> Stream03 for Compat01As03<St> {
127126
pub struct Compat01As03Sink<S, SinkItem> {
128127
pub(crate) inner: Spawn01<S>,
129128
pub(crate) buffer: Option<SinkItem>,
129+
pub(crate) close_started: bool,
130130
}
131131

132132
impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
@@ -137,6 +137,7 @@ impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
137137
Compat01As03Sink {
138138
inner: spawn01(inner),
139139
buffer: None,
140+
close_started: false
140141
}
141142
}
142143

@@ -231,39 +232,49 @@ where
231232
lw: &LocalWaker,
232233
) -> task03::Poll<Result<(), Self::SinkError>> {
233234
let item = self.buffer.take();
235+
let close_started = self.close_started;
236+
234237
match self.in_notify(lw, |f| match item {
235238
Some(i) => match f.start_send(i) {
236239
Ok(AsyncSink01::Ready) => {
237240
match f.poll_complete() {
238241
Ok(Async01::Ready(_)) => {
239242
match <S as Sink01>::close(f) {
240-
Ok(i) => Ok((i, None)),
243+
Ok(i) => Ok((i, None, true)),
241244
Err(e) => Err(e)
242245
}
243246
},
244-
Ok(Async01::NotReady) => Ok((Async01::NotReady, None)),
247+
Ok(Async01::NotReady) => Ok((Async01::NotReady, None, false)),
245248
Err(e) => Err(e)
246249
}
247250
},
248251
Ok(AsyncSink01::NotReady(t)) => {
249-
Ok((Async01::NotReady, Some(t)))
252+
Ok((Async01::NotReady, Some(t), close_started))
250253
}
251254
Err(e) => Err(e),
252255
},
253-
None => match f.poll_complete() {
254-
Ok(Async01::Ready(_)) => {
255-
match <S as Sink01>::close(f) {
256-
Ok(i) => Ok((i, None)),
257-
Err(e) => Err(e)
258-
}
259-
},
260-
Ok(Async01::NotReady) => Ok((Async01::NotReady, None)),
261-
Err(e) => Err(e)
256+
None => if close_started {
257+
match <S as Sink01>::close(f) {
258+
Ok(i) => Ok((i, None, true)),
259+
Err(e) => Err(e)
260+
}
261+
} else {
262+
match f.poll_complete() {
263+
Ok(Async01::Ready(_)) => {
264+
match <S as Sink01>::close(f) {
265+
Ok(i) => Ok((i, None, true)),
266+
Err(e) => Err(e)
267+
}
268+
},
269+
Ok(Async01::NotReady) => Ok((Async01::NotReady, None, close_started)),
270+
Err(e) => Err(e)
271+
}
262272
},
263273
}) {
264-
Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())),
265-
Ok((Async01::NotReady, item)) => {
274+
Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())),
275+
Ok((Async01::NotReady, item, close_started)) => {
266276
self.buffer = item;
277+
self.close_started = close_started;
267278
task03::Poll::Pending
268279
}
269280
Err(e) => task03::Poll::Ready(Err(e)),

0 commit comments

Comments
 (0)