
Commit 6ae9409

Dev add stack pool2 (#351)
2 parents 32b3bd3 + c69cb61

6 files changed (+315, -9 lines)

.github/workflows/ci.yml

Lines changed: 4 additions & 4 deletions

@@ -114,9 +114,9 @@ jobs:
 
           - target: x86_64-pc-windows-gnu
             os: windows-latest
-          - target: i686-pc-windows-gnu
-            os: windows-latest
+          # - target: i686-pc-windows-gnu
+          #   os: windows-latest
           - target: x86_64-pc-windows-msvc
             os: windows-latest
-          - target: i686-pc-windows-msvc
-            os: windows-latest
+          # - target: i686-pc-windows-msvc
+          #   os: windows-latest

core/src/common/constants.rs

Lines changed: 3 additions & 0 deletions

@@ -15,6 +15,9 @@ pub const COROUTINE_GLOBAL_QUEUE_BEAN: &str = "coroutineGlobalQueueBean";
 /// Task global queue bean name.
 pub const TASK_GLOBAL_QUEUE_BEAN: &str = "taskGlobalQueueBean";
 
+/// Stack pool bean name.
+pub const STACK_POOL_BEAN: &str = "stackPoolBean";
+
 /// Monitor bean name.
 pub const MONITOR_BEAN: &str = "monitorBean";
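
Note: the new constant follows the existing `*_BEAN` naming and is what ties the pool into the crate's `BeanFactory`. A minimal crate-internal sketch of the lookup, assuming only the `BeanFactory::get_or_default` call that appears later in this commit (the `stack_pool` helper name is illustrative, not part of the commit):

use crate::common::beans::BeanFactory;
use crate::common::constants::STACK_POOL_BEAN;
use crate::coroutine::stack_pool::StackPool;

// Resolve (or lazily create) the shared stack pool by its bean name,
// mirroring StackPool::get_instance() in the new stack_pool.rs below.
fn stack_pool<'m>() -> &'m StackPool {
    BeanFactory::get_or_default(STACK_POOL_BEAN)
}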

core/src/coroutine/korosensei.rs

Lines changed: 7 additions & 5 deletions

@@ -2,9 +2,10 @@ use crate::catch;
 use crate::common::constants::CoroutineState;
 use crate::coroutine::listener::Listener;
 use crate::coroutine::local::CoroutineLocal;
+use crate::coroutine::stack_pool::{PooledStack, StackPool};
 use crate::coroutine::suspender::Suspender;
 use crate::coroutine::StackInfo;
-use corosensei::stack::{DefaultStack, Stack};
+use corosensei::stack::Stack;
 use corosensei::trap::TrapHandlerRegs;
 use corosensei::CoroutineResult;
 use std::cell::{Cell, RefCell};
@@ -26,7 +27,7 @@ cfg_if::cfg_if! {
 #[repr(C)]
 pub struct Coroutine<'c, Param, Yield, Return> {
     pub(crate) name: String,
-    inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, DefaultStack>,
+    inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, PooledStack>,
     pub(crate) state: Cell<CoroutineState<Yield, Return>>,
     pub(crate) stack_size: usize,
     pub(crate) stack_infos: RefCell<VecDeque<StackInfo>>,
@@ -307,12 +308,13 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
         stack_size: usize,
         callback: F,
     ) -> std::io::Result<R> {
+        let stack_pool = StackPool::get_instance();
         if let Some(co) = Self::current() {
             let remaining_stack = unsafe { co.remaining_stack() };
             if remaining_stack >= red_zone {
                 return Ok(callback());
             }
-            return DefaultStack::new(stack_size).map(|stack| {
+            return stack_pool.allocate(stack_size).map(|stack| {
                 co.stack_infos.borrow_mut().push_back(StackInfo {
                     stack_top: stack.base().get(),
                     stack_bottom: stack.limit().get(),
@@ -335,7 +337,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
                 return Ok(callback());
             }
         }
-        DefaultStack::new(stack_size).map(|stack| {
+        stack_pool.allocate(stack_size).map(|stack| {
             STACK_INFOS.with(|s| {
                 s.borrow_mut().push_back(StackInfo {
                     stack_top: stack.base().get(),
@@ -378,7 +380,7 @@ where
     F: FnOnce(&Suspender<Param, Yield>, Param) -> Return + 'static,
 {
     let stack_size = stack_size.max(crate::common::page_size());
-    let stack = DefaultStack::new(stack_size)?;
+    let stack = StackPool::get_instance().allocate(stack_size)?;
     let stack_infos = RefCell::new(VecDeque::from([StackInfo {
         stack_top: stack.base().get(),
         stack_bottom: stack.limit().get(),
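
In short, every call site that previously built a fresh `DefaultStack` now leases one from the shared pool, and because `PooledStack` implements corosensei's `Stack` trait (see stack_pool.rs below), the surrounding `StackInfo` bookkeeping is unchanged. A minimal crate-internal sketch of the new allocation pattern; the `lease_stack` helper is illustrative and not part of the commit:

use crate::coroutine::stack_pool::StackPool;
use corosensei::stack::Stack;

// Lease a stack from the shared pool instead of allocating a fresh DefaultStack.
fn lease_stack(stack_size: usize) -> std::io::Result<()> {
    let stack = StackPool::get_instance().allocate(stack_size)?;
    // PooledStack implements the Stack trait, so base()/limit() are used
    // exactly as before to record the stack's bounds.
    let _stack_top = stack.base().get();
    let _stack_bottom = stack.limit().get();
    Ok(())
}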

core/src/coroutine/mod.rs

Lines changed: 2 additions & 0 deletions

@@ -76,6 +76,8 @@ pub struct StackInfo {
 /// Coroutine state abstraction and impl.
 mod state;
 
+pub(crate) mod stack_pool;
+
 impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
     /// Get the name of this coroutine.
     pub fn name(&self) -> &str {

core/src/coroutine/stack_pool.rs

Lines changed: 297 additions & 0 deletions

@@ -0,0 +1,297 @@
+use crate::common::beans::BeanFactory;
+use crate::common::constants::STACK_POOL_BEAN;
+use crate::common::now;
+use corosensei::stack::{DefaultStack, Stack, StackPointer};
+use std::cell::UnsafeCell;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::ops::{Deref, DerefMut};
+use std::rc::Rc;
+use std::sync::atomic::{AtomicU64, AtomicUsize};
+
+pub(crate) struct PooledStack {
+    stack_size: usize,
+    stack: Rc<UnsafeCell<DefaultStack>>,
+    create_time: u64,
+}
+
+impl Deref for PooledStack {
+    type Target = DefaultStack;
+
+    fn deref(&self) -> &DefaultStack {
+        unsafe {
+            self.stack
+                .deref()
+                .get()
+                .as_ref()
+                .expect("PooledStack is not unique")
+        }
+    }
+}
+
+impl DerefMut for PooledStack {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        unsafe {
+            self.stack
+                .deref()
+                .get()
+                .as_mut()
+                .expect("PooledStack is not unique")
+        }
+    }
+}
+
+impl Clone for PooledStack {
+    fn clone(&self) -> Self {
+        Self {
+            stack_size: self.stack_size,
+            stack: self.stack.clone(),
+            create_time: self.create_time,
+        }
+    }
+}
+
+impl PartialEq<Self> for PooledStack {
+    fn eq(&self, other: &Self) -> bool {
+        Rc::strong_count(&other.stack).eq(&Rc::strong_count(&self.stack))
+    }
+}
+
+impl Eq for PooledStack {}
+
+impl PartialOrd<Self> for PooledStack {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for PooledStack {
+    fn cmp(&self, other: &Self) -> Ordering {
+        // BinaryHeap defaults to a large top heap, but we need a small top heap
+        match Rc::strong_count(&other.stack).cmp(&Rc::strong_count(&self.stack)) {
+            Ordering::Less => Ordering::Less,
+            Ordering::Equal => match other.stack_size.cmp(&self.stack_size) {
+                Ordering::Less => Ordering::Less,
+                Ordering::Equal => other.create_time.cmp(&self.create_time),
+                Ordering::Greater => Ordering::Greater,
+            },
+            Ordering::Greater => Ordering::Greater,
+        }
+    }
+}
+
+unsafe impl Stack for PooledStack {
+    #[inline]
+    fn base(&self) -> StackPointer {
+        self.deref().base()
+    }
+
+    #[inline]
+    fn limit(&self) -> StackPointer {
+        self.deref().limit()
+    }
+
+    #[cfg(windows)]
+    #[inline]
+    fn teb_fields(&self) -> corosensei::stack::StackTebFields {
+        self.deref().teb_fields()
+    }
+
+    #[cfg(windows)]
+    #[inline]
+    fn update_teb_fields(&mut self, stack_limit: usize, guaranteed_stack_bytes: usize) {
+        self.deref_mut()
+            .update_teb_fields(stack_limit, guaranteed_stack_bytes);
+    }
+}
+
+impl PooledStack {
+    pub(crate) fn new(stack_size: usize) -> std::io::Result<Self> {
+        Ok(Self {
+            stack_size,
+            stack: Rc::new(UnsafeCell::new(DefaultStack::new(stack_size)?)),
+            create_time: now(),
+        })
+    }
+
+    /// This function must be called after a stack has finished running a coroutine
+    /// so that the `StackLimit` and `GuaranteedStackBytes` fields from the TEB can
+    /// be updated in the stack. This is necessary if the stack is reused for
+    /// another coroutine.
+    #[inline]
+    #[cfg(windows)]
+    pub(crate) fn update_stack_teb_fields(&mut self) {
+        cfg_if::cfg_if! {
+            if #[cfg(target_arch = "x86_64")] {
+                type StackWord = u64;
+            } else if #[cfg(target_arch = "x86")] {
+                type StackWord = u32;
+            }
+        }
+        let base = self.base().get() as *const StackWord;
+        unsafe {
+            let stack_limit = usize::try_from(*base.sub(1)).expect("stack limit overflow");
+            let guaranteed_stack_bytes =
+                usize::try_from(*base.sub(2)).expect("guaranteed stack bytes overflow");
+            self.update_teb_fields(stack_limit, guaranteed_stack_bytes);
+        }
+    }
+}
+
+pub(crate) struct StackPool {
+    pool: UnsafeCell<BinaryHeap<PooledStack>>,
+    len: AtomicUsize,
+    // Minimum number of pooled stacks, i.e. the core pool size.
+    min_size: AtomicUsize,
+    // Maximum keep-alive time of non-core stacks, in nanoseconds.
+    keep_alive_time: AtomicU64,
+}
+
+unsafe impl Send for StackPool {}
+
+unsafe impl Sync for StackPool {}
+
+impl Default for StackPool {
+    fn default() -> Self {
+        Self::new(0, 10_000_000_000)
+    }
+}
+
+impl StackPool {
+    pub(crate) fn get_instance<'m>() -> &'m Self {
+        BeanFactory::get_or_default(STACK_POOL_BEAN)
+    }
+
+    pub(crate) fn new(min_size: usize, keep_alive_time: u64) -> Self {
+        Self {
+            pool: UnsafeCell::new(BinaryHeap::default()),
+            len: AtomicUsize::default(),
+            min_size: AtomicUsize::new(min_size),
+            keep_alive_time: AtomicU64::new(keep_alive_time),
+        }
+    }
+
+    pub(crate) fn allocate(&self, stack_size: usize) -> std::io::Result<PooledStack> {
+        let heap = unsafe { self.pool.get().as_mut().expect("StackPool is not unique") };
+        // find min stack
+        let mut not_use = Vec::new();
+        while let Some(stack) = heap.peek() {
+            if Rc::strong_count(&stack.stack) > 1 {
+                // can't use the stack
+                break;
+            }
+            #[allow(unused_mut)]
+            if let Some(mut stack) = heap.pop() {
+                self.sub_len();
+                if stack_size <= stack.stack_size {
+                    for s in not_use {
+                        heap.push(s);
+                        self.add_len();
+                    }
+                    heap.push(stack.clone());
+                    self.add_len();
+                    #[cfg(windows)]
+                    stack.update_stack_teb_fields();
+                    return Ok(stack);
+                }
+                if self.min_size() < self.len()
+                    && now() <= stack.create_time.saturating_add(self.keep_alive_time())
+                {
+                    // clean the expired stack
+                    continue;
+                }
+                not_use.push(stack);
+            }
+        }
+        let stack = PooledStack::new(stack_size)?;
+        heap.push(stack.clone());
+        self.add_len();
+        Ok(stack)
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn set_keep_alive_time(&self, keep_alive_time: u64) {
+        self.keep_alive_time
+            .store(keep_alive_time, std::sync::atomic::Ordering::Release);
+    }
+
+    pub(crate) fn keep_alive_time(&self) -> u64 {
+        self.keep_alive_time
+            .load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn set_min_size(&self, min_size: usize) {
+        self.min_size
+            .store(min_size, std::sync::atomic::Ordering::Release);
+    }
+
+    pub(crate) fn min_size(&self) -> usize {
+        self.min_size.load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    pub(crate) fn len(&self) -> usize {
+        self.len.load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    fn add_len(&self) {
+        self.len.store(
+            self.len().saturating_add(1),
+            std::sync::atomic::Ordering::Release,
+        );
+    }
+
+    fn sub_len(&self) {
+        self.len.store(
+            self.len().saturating_sub(1),
+            std::sync::atomic::Ordering::Release,
+        );
+    }
+
+    /// Clean the expired stack.
+    #[allow(dead_code)]
+    pub(crate) fn clean(&self) {
+        let heap = unsafe { self.pool.get().as_mut().expect("StackPool is not unique") };
+        let mut maybe_free = Vec::new();
+        while let Some(stack) = heap.peek() {
+            if Rc::strong_count(&stack.stack) > 1 {
+                // can't free the stack
+                break;
+            }
+            if let Some(stack) = heap.pop() {
+                self.sub_len();
+                maybe_free.push(stack);
+            }
+        }
+        for stack in maybe_free {
+            if self.min_size() < self.len()
+                && now() <= stack.create_time.saturating_add(self.keep_alive_time())
+            {
+                // free the stack
+                continue;
+            }
+            heap.push(stack);
+            self.add_len();
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::common::constants::DEFAULT_STACK_SIZE;
+
+    #[test]
+    fn test_stack_pool() -> std::io::Result<()> {
+        let pool = StackPool::default();
+        let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
+        assert_eq!(Rc::strong_count(&stack.stack), 2);
+        drop(stack);
+        let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
+        assert_eq!(Rc::strong_count(&stack.stack), 2);
+        assert_eq!(pool.len(), 1);
+        _ = pool.allocate(DEFAULT_STACK_SIZE)?;
+        assert_eq!(pool.len(), 2);
+        Ok(())
+    }
+}
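
The unit test at the end of the file already sketches the intended usage: a stack is "in use" while more than one handle to it is alive, and becomes reusable once the caller drops its lease. A slightly expanded crate-internal sketch under the same assumptions (the 64 KiB size and the `demo` function name are illustrative; everything else comes from the code above):

use crate::coroutine::stack_pool::StackPool;

fn demo() -> std::io::Result<()> {
    // Defaults: min_size = 0 core stacks, keep_alive_time = 10s (in nanoseconds).
    let pool = StackPool::default();

    // First request creates a fresh stack; the pool keeps its own handle to it.
    let first = pool.allocate(64 * 1024)?;
    assert_eq!(pool.len(), 1);

    // While `first` is alive its pooled copy counts as "in use", so an equally
    // sized request has to create a second stack.
    let second = pool.allocate(64 * 1024)?;
    assert_eq!(pool.len(), 2);

    // Once both leases are dropped, the stacks become idle and later requests
    // that fit their size are served from the pool instead of allocating.
    drop(first);
    drop(second);
    let _reused = pool.allocate(64 * 1024)?;
    assert_eq!(pool.len(), 2);
    Ok(())
}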
