Skip to content

Commit 70f2837

Browse files
authored
clean up remaining wait tasks (#403)
2 parents 6134c34 + 907e285 commit 70f2837

File tree

5 files changed

+62
-13
lines changed

5 files changed

+62
-13
lines changed

core/src/co_pool/mod.rs

+23-11
Original file line numberDiff line numberDiff line change
@@ -201,18 +201,32 @@ impl<'p> CoroutinePool<'p> {
201201
match self.state() {
202202
PoolState::Running => {
203203
assert_eq!(PoolState::Running, self.stopping()?);
204-
_ = self.try_timed_schedule_task(dur)?;
205-
assert_eq!(PoolState::Stopping, self.stopped()?);
204+
self.do_stop(dur)?;
206205
}
207-
PoolState::Stopping => {
208-
_ = self.try_timed_schedule_task(dur)?;
209-
assert_eq!(PoolState::Stopping, self.stopped()?);
210-
}
211-
PoolState::Stopped => {}
206+
PoolState::Stopping => self.do_stop(dur)?,
207+
PoolState::Stopped => self.do_clean(),
212208
}
213209
Ok(())
214210
}
215211

212+
fn do_stop(&mut self, dur: Duration) -> std::io::Result<()> {
213+
_ = self.try_timed_schedule_task(dur)?;
214+
assert_eq!(PoolState::Stopping, self.stopped()?);
215+
self.do_clean();
216+
Ok(())
217+
}
218+
219+
fn do_clean(&mut self) {
220+
// clean up remaining wait tasks
221+
for r in &self.waits {
222+
let task_name = *r.key();
223+
_ = self
224+
.results
225+
.insert(task_name.to_string(), Err("The coroutine pool has stopped"));
226+
self.notify(task_name);
227+
}
228+
}
229+
216230
/// Submit a new task to this pool.
217231
///
218232
/// Allow multiple threads to concurrently submit task to the pool,
@@ -271,7 +285,6 @@ impl<'p> CoroutinePool<'p> {
271285
let key = Box::leak(Box::from(task_name));
272286
if let Some(r) = self.try_take_task_result(key) {
273287
self.notify(key);
274-
drop(self.waits.remove(key));
275288
return Ok(r);
276289
}
277290
if SchedulableCoroutine::current().is_some() {
@@ -304,7 +317,6 @@ impl<'p> CoroutinePool<'p> {
304317
);
305318
if let Some(r) = self.try_take_task_result(key) {
306319
self.notify(key);
307-
assert!(self.waits.remove(key).is_some());
308320
return Ok(r);
309321
}
310322
Err(Error::new(ErrorKind::TimedOut, "wait timeout"))
@@ -415,8 +427,8 @@ impl<'p> CoroutinePool<'p> {
415427
}
416428

417429
fn notify(&self, task_name: &str) {
418-
if let Some(arc) = self.waits.get(task_name) {
419-
let (lock, cvar) = &**arc;
430+
if let Some((_, arc)) = self.waits.remove(task_name) {
431+
let (lock, cvar) = &*arc;
420432
let mut pending = lock.lock().expect("notify task failed");
421433
*pending = false;
422434
cvar.notify_one();

core/src/common/constants.rs

+2
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ pub enum SyscallName {
129129
WaitOnAddress,
130130
#[cfg(windows)]
131131
WSAPoll,
132+
/// panic!
133+
panicking,
132134
}
133135

134136
impl SyscallName {

core/src/coroutine/local.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ mod tests {
5252
let local = CoroutineLocal::default();
5353
assert!(local.put("1", 1).is_none());
5454
assert_eq!(Some(1), local.put("1", 2));
55-
assert_eq!(2, *local.get("1").unwrap());
55+
assert_eq!(2, *local.get::<i32>("1").unwrap());
5656
*local.get_mut("1").unwrap() = 3;
5757
assert_eq!(Some(3), local.remove("1"));
5858
}

core/src/monitor.rs

+29
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,35 @@ impl Monitor {
8080
match self.state.get() {
8181
MonitorState::Created => {
8282
self.state.set(MonitorState::Running);
83+
// install panic hook
84+
std::panic::set_hook(Box::new(|panic_hook_info| {
85+
let syscall = crate::common::constants::SyscallName::panicking;
86+
if let Some(co) = crate::scheduler::SchedulableCoroutine::current() {
87+
let new_state = crate::common::constants::SyscallState::Executing;
88+
if co.syscall((), syscall, new_state).is_err() {
89+
error!(
90+
"{} change to syscall {} {} failed !",
91+
co.name(),
92+
syscall,
93+
new_state
94+
);
95+
}
96+
}
97+
eprintln!(
98+
"panic hooked in open-coroutine, thread '{}' {}",
99+
std::thread::current().name().unwrap_or("unknown"),
100+
panic_hook_info
101+
);
102+
eprintln!(
103+
"stack backtrace:\n{}",
104+
std::backtrace::Backtrace::force_capture()
105+
);
106+
if let Some(co) = crate::scheduler::SchedulableCoroutine::current() {
107+
if co.running().is_err() {
108+
error!("{} change to running state failed !", co.name());
109+
}
110+
}
111+
}));
83112
// install SIGURG signal handler
84113
let mut set = SigSet::empty();
85114
set.add(Signal::SIGURG);

core/src/net/operator/windows/mod.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ impl<'o> Operator<'o> {
8585
unsafe {
8686
let ret = CreateIoCompletionPort(handle, self.iocp, self.cpu, 0);
8787
if ret.is_null()
88-
&& ERROR_INVALID_PARAMETER == WSAGetLastError().try_into().expect("overflow")
88+
&& ERROR_INVALID_PARAMETER
89+
== TryInto::<u32>::try_into(WSAGetLastError()).expect("overflow")
8990
{
9091
// duplicate bind
9192
return Ok(());
@@ -192,6 +193,11 @@ impl<'o> Operator<'o> {
192193
Ok((cq.len(), cq, timeout.map(|t| t.saturating_sub(cost))))
193194
}
194195

196+
#[allow(warnings)]
197+
pub(crate) fn async_cancel(&self, user_data: usize) -> std::io::Result<()> {
198+
todo!("CancelIoEx")
199+
}
200+
195201
pub(crate) fn accept(
196202
&self,
197203
user_data: usize,

0 commit comments

Comments
 (0)