Skip to content

feat: impl time-wheel based timer #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions fastimer-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use parking::Unparker;

mod heap;
pub use heap::*;
mod wheel;
pub use wheel::*;

#[derive(Debug)]
struct TimeEntry {
Expand Down
210 changes: 210 additions & 0 deletions fastimer-driver/src/wheel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use std::time::Instant;

use crossbeam_queue::SegQueue;
use parking::Parker;
use parking::Unparker;

use crate::TimeContext;
use crate::TimeDriverShutdown;
use crate::TimeEntry;

/// Returns a new time driver, its time context and the shutdown handle.
pub fn time_wheel_driver() -> (TimeWheelTimeDriver, TimeContext, TimeDriverShutdown) {
let (parker, unparker) = parking::pair();
let wheel = TimeWheel::new();
let inbounds = Arc::new(SegQueue::new());
let shutdown = Arc::new(AtomicBool::new(false));

let driver = TimeWheelTimeDriver {
parker,
unparker,
wheel,
inbounds,
shutdown,
last_tick: Instant::now(),
};

let context = TimeContext {
unparker: driver.unparker.clone(),
inbounds: driver.inbounds.clone(),
};

let shutdown = TimeDriverShutdown {
unparker: driver.unparker.clone(),
shutdown: driver.shutdown.clone(),
};

(driver, context, shutdown)
}

const WHEEL_BITS: u64 = 6;
const WHEEL_SIZE: u64 = 1 << WHEEL_BITS;
const WHEEL_MASK: u64 = WHEEL_SIZE - 1;
const WHEEL_LEVELS: usize = 4;

type Slot = VecDeque<TimeEntry>;

#[derive(Debug)]

/// The time wheel structure for hierarchical timer management
struct TimeWheel {
/// Multi-level time wheel, each level has 64 slots
wheels: [Vec<Slot>; WHEEL_LEVELS],
/// The current tick count
current_tick: AtomicU64,
/// Duration of each tick in milliseconds
tick_duration: Duration,
}

impl TimeWheel {
fn new() -> Self {
let wheels = [(); WHEEL_LEVELS].map(|_| (0..WHEEL_SIZE).map(|_| VecDeque::new()).collect());

Self {
wheels,
current_tick: AtomicU64::new(0),
tick_duration: Duration::from_millis(1),
}
}

/// Calculate which wheel level and slot a timer should go into.
fn calculate_slot(&self, when: Instant) -> Option<(usize, usize)> {
let now = Instant::now();
if when <= now {
return None;
}

let delta = when.duration_since(now);
let ticks = delta.as_millis() as u64;
let current_tick = self.current_tick.load(atomic::Ordering::Acquire);
let target_tick = current_tick + ticks;

// Determine which wheel level to place the timer in.
for level in 0..WHEEL_LEVELS {
let shift = WHEEL_BITS * level as u64;
let mask = WHEEL_MASK << shift;

if (target_tick & mask) != (current_tick & mask) {
let slot_index = ((target_tick >> shift) & WHEEL_MASK) as usize;
return Some((level, slot_index));
}
}

// If the delay exceeds all wheel levels, place in the last slot of the highest level.
Some((WHEEL_LEVELS - 1, (WHEEL_SIZE - 1) as usize))
}

/// Add a timer entry into the time wheel.
fn add_timer(&mut self, entry: TimeEntry) {
if let Some((level, slot)) = self.calculate_slot(entry.when) {
self.wheels[level][slot].push_back(entry);
} else {
// If already expired, wake immediately.
entry.waker.wake();
}
}

/// Advance the time wheel by one tick and return all expired timers.
fn advance_tick(&mut self) -> Vec<TimeEntry> {
let mut expired = Vec::new();
let current_tick = self.current_tick.fetch_add(1, atomic::Ordering::AcqRel);

// Check the current slot in the first-level wheel.
let slot_index = (current_tick & WHEEL_MASK) as usize;
expired.extend(self.wheels[0][slot_index].drain(..));

// Check if timers from higher-level wheels need to be cascaded down.
for level in 1..WHEEL_LEVELS {
let shift = WHEEL_BITS * level as u64;
if (current_tick & ((1 << shift) - 1)) == 0 {
let slot_index = ((current_tick >> shift) & WHEEL_MASK) as usize;
let timers: Vec<_> = self.wheels[level][slot_index].drain(..).collect();

for timer in timers {
self.add_timer(timer);
}
}
}

expired
}
}

/// A time-wheel based time driver that drives registered timers.
#[derive(Debug)]
pub struct TimeWheelTimeDriver {
parker: Parker,
unparker: Unparker,
wheel: TimeWheel,
inbounds: Arc<SegQueue<TimeEntry>>,
shutdown: Arc<AtomicBool>,
last_tick: Instant,
}

impl TimeWheelTimeDriver {
/// Drives the timers and returns `true` if the driver has been shut down.
pub fn turn(&mut self) -> ControlFlow<()> {
if self.shutdown.load(atomic::Ordering::Acquire) {
return ControlFlow::Break(());
}

let now = Instant::now();

while let Some(entry) = self.inbounds.pop() {
self.wheel.add_timer(entry);
}

let elapsed = now.duration_since(self.last_tick);
let ticks_to_advance = (elapsed.as_millis() as u64).max(1);

let mut all_expired = Vec::new();
for _ in 0..ticks_to_advance {
let expired = self.wheel.advance_tick();
all_expired.extend(expired);
}

for entry in all_expired {
if entry.when <= now {
entry.waker.wake();
} else {
self.wheel.add_timer(entry);
}
}

self.last_tick = now;

let next_tick_time = self.last_tick + self.wheel.tick_duration;
let sleep_duration = next_tick_time.saturating_duration_since(Instant::now());

if sleep_duration > Duration::ZERO {
self.parker.park_timeout(sleep_duration);
}

if self.shutdown.load(atomic::Ordering::Acquire) {
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
}
}
}
33 changes: 33 additions & 0 deletions fastimer-driver/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Instant;

use fastimer::make_instant_from_now;
use fastimer_driver::binary_heap_driver;
use fastimer_driver::time_wheel_driver;

#[track_caller]
fn assert_duration_eq(actual: Duration, expected: Duration) {
Expand All @@ -25,6 +26,38 @@ fn assert_duration_eq(actual: Duration, expected: Duration) {
}
}

#[test]
fn test_time_wheel_driver() {
let (mut driver, context, shutdown) = time_wheel_driver();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
loop {
if driver.turn().is_break() {
tx.send(()).unwrap();
break;
}
}
});

let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let now = Instant::now();

context.delay(Duration::from_secs(2)).await;
assert_duration_eq(now.elapsed(), Duration::from_secs(2));

let now = Instant::now();
let future = make_instant_from_now(Duration::from_secs(3));
let f1 = context.delay_until(future);
let f2 = context.delay_until(future);
tokio::join!(f1, f2);
assert_duration_eq(now.elapsed(), Duration::from_secs(3));

shutdown.shutdown();
});
rx.recv_timeout(Duration::from_secs(1)).unwrap();
}

#[test]
fn test_binary_heap_driver() {
let (mut driver, context, shutdown) = binary_heap_driver();
Expand Down
Loading