Skip to content

Commit 9c13715

Browse files
committed
Don't call poll_complete once Sink01::close has been called
1 parent 709673d commit 9c13715

File tree

1 file changed

+26
-14
lines changed

1 file changed

+26
-14
lines changed

futures-util/src/compat/compat01as03.rs

+26-14
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ impl<St: Stream01> Stream03 for Compat01As03<St> {
126126
pub struct Compat01As03Sink<S, SinkItem> {
127127
pub(crate) inner: Spawn01<S>,
128128
pub(crate) buffer: Option<SinkItem>,
129+
pub(crate) close_started: bool,
129130
}
130131

131132
impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
@@ -136,6 +137,7 @@ impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
136137
Compat01As03Sink {
137138
inner: spawn01(inner),
138139
buffer: None,
140+
close_started: false
139141
}
140142
}
141143

@@ -230,39 +232,49 @@ where
230232
lw: &LocalWaker,
231233
) -> task03::Poll<Result<(), Self::SinkError>> {
232234
let item = self.buffer.take();
235+
let close_started = self.close_started;
236+
233237
match self.in_notify(lw, |f| match item {
234238
Some(i) => match f.start_send(i) {
235239
Ok(AsyncSink01::Ready) => {
236240
match f.poll_complete() {
237241
Ok(Async01::Ready(_)) => {
238242
match <S as Sink01>::close(f) {
239-
Ok(i) => Ok((i, None)),
243+
Ok(i) => Ok((i, None, true)),
240244
Err(e) => Err(e)
241245
}
242246
},
243-
Ok(Async01::NotReady) => Ok((Async01::NotReady, None)),
247+
Ok(Async01::NotReady) => Ok((Async01::NotReady, None, false)),
244248
Err(e) => Err(e)
245249
}
246250
},
247251
Ok(AsyncSink01::NotReady(t)) => {
248-
Ok((Async01::NotReady, Some(t)))
252+
Ok((Async01::NotReady, Some(t), close_started))
249253
}
250254
Err(e) => Err(e),
251255
},
252-
None => match f.poll_complete() {
253-
Ok(Async01::Ready(_)) => {
254-
match <S as Sink01>::close(f) {
255-
Ok(i) => Ok((i, None)),
256-
Err(e) => Err(e)
257-
}
258-
},
259-
Ok(Async01::NotReady) => Ok((Async01::NotReady, None)),
260-
Err(e) => Err(e)
256+
None => if close_started {
257+
match <S as Sink01>::close(f) {
258+
Ok(i) => Ok((i, None, true)),
259+
Err(e) => Err(e)
260+
}
261+
} else {
262+
match f.poll_complete() {
263+
Ok(Async01::Ready(_)) => {
264+
match <S as Sink01>::close(f) {
265+
Ok(i) => Ok((i, None, true)),
266+
Err(e) => Err(e)
267+
}
268+
},
269+
Ok(Async01::NotReady) => Ok((Async01::NotReady, None, close_started)),
270+
Err(e) => Err(e)
271+
}
261272
},
262273
}) {
263-
Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())),
264-
Ok((Async01::NotReady, item)) => {
274+
Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())),
275+
Ok((Async01::NotReady, item, close_started)) => {
265276
self.buffer = item;
277+
self.close_started = close_started;
266278
task03::Poll::Pending
267279
}
268280
Err(e) => task03::Poll::Ready(Err(e)),

0 commit comments

Comments
 (0)