Skip to content

Commit 121afa8

Browse files
LucioFrancoNemo157
authored andcommitted
Don't call poll_complete once Sink01::close has been called
1 parent 6cf2c2e commit 121afa8

File tree

1 file changed

+26
-14
lines changed

1 file changed

+26
-14
lines changed

futures-util/src/compat/compat01as03.rs

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ impl<St: Stream01> Stream03 for Compat01As03<St> {
122122
pub struct Compat01As03Sink<S, SinkItem> {
123123
pub(crate) inner: Spawn01<S>,
124124
pub(crate) buffer: Option<SinkItem>,
125+
pub(crate) close_started: bool,
125126
}
126127

127128
impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
@@ -132,6 +133,7 @@ impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
132133
Compat01As03Sink {
133134
inner: spawn01(inner),
134135
buffer: None,
136+
close_started: false
135137
}
136138
}
137139

@@ -226,39 +228,49 @@ where
226228
lw: &LocalWaker,
227229
) -> task03::Poll<Result<(), Self::SinkError>> {
228230
let item = self.buffer.take();
231+
let close_started = self.close_started;
232+
229233
match self.in_notify(lw, |f| match item {
230234
Some(i) => match f.start_send(i) {
231235
Ok(AsyncSink01::Ready) => {
232236
match f.poll_complete() {
233237
Ok(Async01::Ready(_)) => {
234238
match <S as Sink01>::close(f) {
235-
Ok(i) => Ok((i, None)),
239+
Ok(i) => Ok((i, None, true)),
236240
Err(e) => Err(e)
237241
}
238242
},
239-
Ok(Async01::NotReady) => Ok((Async01::NotReady, None)),
243+
Ok(Async01::NotReady) => Ok((Async01::NotReady, None, false)),
240244
Err(e) => Err(e)
241245
}
242246
},
243247
Ok(AsyncSink01::NotReady(t)) => {
244-
Ok((Async01::NotReady, Some(t)))
248+
Ok((Async01::NotReady, Some(t), close_started))
245249
}
246250
Err(e) => Err(e),
247251
},
248-
None => match f.poll_complete() {
249-
Ok(Async01::Ready(_)) => {
250-
match <S as Sink01>::close(f) {
251-
Ok(i) => Ok((i, None)),
252-
Err(e) => Err(e)
253-
}
254-
},
255-
Ok(Async01::NotReady) => Ok((Async01::NotReady, None)),
256-
Err(e) => Err(e)
252+
None => if close_started {
253+
match <S as Sink01>::close(f) {
254+
Ok(i) => Ok((i, None, true)),
255+
Err(e) => Err(e)
256+
}
257+
} else {
258+
match f.poll_complete() {
259+
Ok(Async01::Ready(_)) => {
260+
match <S as Sink01>::close(f) {
261+
Ok(i) => Ok((i, None, true)),
262+
Err(e) => Err(e)
263+
}
264+
},
265+
Ok(Async01::NotReady) => Ok((Async01::NotReady, None, close_started)),
266+
Err(e) => Err(e)
267+
}
257268
},
258269
}) {
259-
Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())),
260-
Ok((Async01::NotReady, item)) => {
270+
Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())),
271+
Ok((Async01::NotReady, item, close_started)) => {
261272
self.buffer = item;
273+
self.close_started = close_started;
262274
task03::Poll::Pending
263275
}
264276
Err(e) => task03::Poll::Ready(Err(e)),

0 commit comments

Comments
 (0)