Skip to content

update ignore and add feature(renamed_spin_loop) #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ Cargo.lock
/target
*.log
/*.txt
*.dat
.idea
17 changes: 11 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ edition = "2018"
repository = "https://github.com/BinChengZhao/delay-timer"
documentation = "https://docs.rs/delay_timer"
readme = "README.md"
homepage = "https://hyper.rs"
homepage = "https://github.com/BinChengZhao/delay-timer"
description = "Time-manager of delayed tasks. Like crontab, but synchronous asynchronous tasks are possible, and dynamic add/cancel/remove is supported."
keywords = [ "cron", "schedule", "timer", "crontab", "delay" ]
license = "Apache-2.0 OR MIT"
categories = ["development-tools", "data-structures", "asynchronous", "data-structures", "accessibility"]

build = "build/build.rs"


# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -29,13 +29,13 @@ status-report = []


[dependencies]
cron_clock = "0.6.6"
cron_clock = "0.6.7"
anyhow = "1.0.31"
rs-snowflake = "0.4.0"
rs-snowflake = "0.5.0"
waitmap = "1.1.0"
lru = "0.6.1"
futures = "0.3.8"
smol = "1.2.4"
smol = "1.2.5"
concat-idents = "1.1.1"


Expand All @@ -45,15 +45,20 @@ tokio = { version = "~1.0.0", features = ["full"] , optional = true }
[dev-dependencies]
surf = "2.1.0"
tokio = { version = "~1.0.0", features = ["full"] }
hyper= {version = "0.14.1" , features = ["full"] }
hyper= {version = "0.14.2" , features = ["full"] }
pretty_env_logger = "0.4"
mockall = "0.8.2"

[dev-dependencies.async-std]
version = "1.6.3"
features = ["attributes"]

[build-dependencies]
autocfg = "1"
rustc_version = "0.2"

# Append the cfg-tag:docsrs to activate the feature(doc_cfg) attribute
# when generating a document on docs.rs.
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ such as Sunday at 4am to execute a backup task.

Supports configuration of the maximum number of parallelism of tasks.

### TODO The minimum-supported version fo rustc is `1.*.*` .

[![Build](https://github.com/BinChengZhao/delay-timer/workflows/Build%20and%20test/badge.svg)](
https://github.com/BinChengZhao/delay-timer/actions)
[![License](https://img.shields.io/badge/license-MIT%2FApache--2.0-blue.svg)](
Expand Down
5 changes: 5 additions & 0 deletions benches/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@ fn bench_try_wait(b: &mut Bencher) {

b.iter(|| child.try_wait());
}

#[bench]
fn bench_get_timestamp(b: &mut Bencher) {
b.iter(|| get_timestamp());
}
32 changes: 32 additions & 0 deletions build/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
extern crate autocfg;

use autocfg::emit;
use rustc_version::{version, version_meta, Channel, Version};

fn main() {
// Set cfg flags depending on release channel
match version_meta().unwrap().channel {
Channel::Stable => {
println!("cargo:rustc-cfg=RUSTC_IS_STABLE");
}
Channel::Beta => {
println!("cargo:rustc-cfg=RUSTC_IS_BETA");
}
Channel::Nightly => {
emit("nightly");
println!("cargo:rustc-cfg=RUSTC_IS_NIGHTLY");
}
Channel::Dev => {
println!("cargo:rustc-cfg=RUSTC_IS_DEV");
}
}

// Check for a minimum version
if version().unwrap() >= Version::parse("1.51.0").unwrap() {
println!("cargo:rustc-cfg=SPLIT_INCLUSIVE_COMPATIBLE");
}

// (optional) We don't need to rerun for anything external.
// In order to see the compilation parameters at `cargo check --verbose` time, keep it.
autocfg::rerun_path("build.rs");
}
23 changes: 20 additions & 3 deletions examples/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ fn main() {
delay_timer.add_task(build_task2(task_builder)).unwrap();
delay_timer.add_task(build_task3(task_builder)).unwrap();
delay_timer.add_task(build_task5(task_builder)).unwrap();
delay_timer.add_task(build_task7(task_builder)).unwrap();

// Let's do someting about 2s.
sleep(Duration::new(2, 1_000_000));
Expand Down Expand Up @@ -80,6 +81,20 @@ fn build_task5(mut task_builder: TaskBuilder) -> Task {
.unwrap()
}

fn build_task7(mut task_builder: TaskBuilder) -> Task {
let body = create_async_fn_body!({
dbg!(get_timestamp());

Timer::after(Duration::from_secs(3)).await;
});
task_builder
.set_task_id(7)
.set_frequency_by_candy(CandyFrequency::Repeated(AuspiciousTime::PerDayFiveAclock))
.set_maximun_parallel_runable_num(2)
.spawn(body)
.unwrap()
}

pub fn generate_closure_template(
name: String,
) -> impl Fn(TaskContext) -> Box<dyn DelayTaskHandler> + 'static + Send + Sync {
Expand Down Expand Up @@ -136,14 +151,16 @@ enum AuspiciousTime {
PerSevenSeconds,
PerEightSeconds,
LoveTime,
PerDayFiveAclock,
}

impl Into<CandyCronStr> for AuspiciousTime {
fn into(self) -> CandyCronStr {
match self {
Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *"),
Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *"),
Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100"),
Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *".to_string()),
Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *".to_string()),
Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100".to_string()),
Self::PerDayFiveAclock => CandyCronStr("01 00 1 * * * *".to_string()),
}
}
}
28 changes: 21 additions & 7 deletions examples/increase.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
#![feature(ptr_internals)]
use delay_timer::prelude::*;

use std::ptr::Unique;
use std::ops::Deref;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::Arc;
use std::thread::{current, park, Thread};

use surf;

#[derive(Debug, Clone, Copy)]
struct SafePointer(NonNull<Arc<AtomicUsize>>);

unsafe impl Send for SafePointer {}
unsafe impl Sync for SafePointer {}
impl Deref for SafePointer {
type Target = NonNull<Arc<AtomicUsize>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}

// Remember close terminal can speed up because of
// printnl! block process if stand-pipe if full.
fn main() {
Expand All @@ -15,7 +28,8 @@ fn main() {
// The Sync-Task run_flay.
let mut run_flag = Arc::new(AtomicUsize::new(0));
// cross thread share raw-pointer.
let run_flag_ref: Option<Unique<Arc<AtomicUsize>>> = Unique::new(&mut run_flag);
let run_flag_ref: SafePointer =
SafePointer(NonNull::new(&mut run_flag as *mut Arc<AtomicUsize>).unwrap());

// Sync-Task body.
let body = get_increase_fn(run_flag_ref);
Expand Down Expand Up @@ -53,10 +67,10 @@ fn main() {
}

fn get_increase_fn(
run_flag_ref: Option<Unique<Arc<AtomicUsize>>>,
run_flag_ref: SafePointer,
) -> impl Copy + Fn(TaskContext) -> Box<dyn DelayTaskHandler> {
move |_context| {
let local_run_flag = run_flag_ref.unwrap().as_ptr();
let local_run_flag = run_flag_ref.as_ptr();

unsafe {
(*local_run_flag).fetch_add(1, SeqCst);
Expand All @@ -67,10 +81,10 @@ fn get_increase_fn(

fn get_wake_fn(
thread: Thread,
run_flag_ref: Option<Unique<Arc<AtomicUsize>>>,
run_flag_ref: SafePointer,
) -> impl Fn(TaskContext) -> Box<dyn DelayTaskHandler> {
move |_context| {
let local_run_flag = run_flag_ref.unwrap().as_ptr();
let local_run_flag = run_flag_ref.as_ptr();
unsafe {
println!(
"end time {}, result {}",
Expand Down
52 changes: 52 additions & 0 deletions examples/profile_memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use delay_timer::prelude::*;

#[derive(Default)]
struct RequstBody {
_id: u64,
cron_expression: String,
_token: String,
}

impl RequstBody {
fn fake_request_body() -> Self {
let string_buf = [16u16, 16u16, 16u16].repeat(10000);
let cron_expression = String::from_utf16(&string_buf).unwrap();
Self {
cron_expression,
..Default::default()
}
}
}

impl Into<CandyCronStr> for RequstBody {
fn into(self) -> CandyCronStr {
CandyCronStr(self.cron_expression)
}
}

// LD_PRELOAD=../../tools-bin/libmemory_profiler.so ./target/debug/examples/profile_memory
// ../../tools-bin/memory-profiler-cli server memory-profiling_*.dat
fn main() {
// let mut task_builder_vec: Vec<RequstBody> = Vec::with_capacity(100_000);
let mut task_builder_vec: Vec<TaskBuilder> = Vec::with_capacity(100_000);

for _ in 0..256_00 {
task_builder_vec.push({
let mut task_builder = TaskBuilder::default();
task_builder
.set_frequency_by_candy(CandyFrequency::Repeated(RequstBody::fake_request_body()));

task_builder
});

// task_builder_vec.push(RequstBody::fake_request_body());
}

for _ in 0..256_00 {
// FIXME: It can't free memory.
task_builder_vec.pop().unwrap().free();
// task_builder_vec.pop().unwrap();
}

drop(task_builder_vec);
}
1 change: 1 addition & 0 deletions src/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ impl DelayTimer {
}

/// Cancel a task in timer_core by event-channel.
/// `Cancel` is for instances derived from the task running up.
pub fn cancel_task(&self, task_id: u64, record_id: i64) -> Result<()> {
self.seed_timer_event(TimerEvent::CancelTask(task_id, record_id))
}
Expand Down
22 changes: 10 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//!
//! ```toml
//! [dependencies]
//! delay_timer = "0.2.0"
//! delay_timer = "*"
//! ```
//!
//! Next:
Expand Down Expand Up @@ -76,9 +76,9 @@
//! impl Into<CandyCronStr> for AuspiciousTime {
//! fn into(self) -> CandyCronStr {
//! match self {
//! Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *"),
//! Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *"),
//! Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100"),
//! Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *".to_string()),
//! Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *".to_string()),
//! Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100".to_string()),
//! }
//! }
//! }
Expand Down Expand Up @@ -194,18 +194,16 @@
//! impl Into<CandyCronStr> for AuspiciousTime {
//! fn into(self) -> CandyCronStr {
//! match self {
//! Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *"),
//! Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *"),
//! Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100"),
//! Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *".to_string()),
//! Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *".to_string()),
//! Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100".to_string()),
//! }
//! }
//! }
//! ```

#![feature(linked_list_cursors)]
#![feature(doc_cfg)]
// Backup : https://github.com/contain-rs/linked-list/blob/master/src/lib.rs

#![cfg_attr(RUSTC_IS_NIGHTLY, feature(linked_list_cursors))]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![feature(renamed_spin_loop)]
// TODO:When the version is stable in the future, we should consider using stable compile unified.
// FIXME: Auto fill cli-args `features = full` when exec cargo test.
#[macro_use]
Expand Down
4 changes: 2 additions & 2 deletions src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub use crate::entity::{get_timestamp, get_timestamp_micros, DelayTimer, DelayTi
pub use crate::macros::*;
pub use crate::timer::runtime_trace::task_handle::DelayTaskHandler;
pub use crate::timer::task::TaskContext;
pub use crate::timer::task::{Frequency, Task, TaskBuilder};
pub use crate::timer::task::{Frequency, ScheduleIteratorTimeZone, Task, TaskBuilder};
pub use crate::timer::timer_core::TimerEvent;
pub use crate::utils::convenience::cron_expression_grammatical_candy::{
CandyCron, CandyCronStr, CandyFrequency,
Expand All @@ -24,7 +24,7 @@ pub use crate::utils::convenience::functions::{
};

pub use anyhow::Result as AnyResult;
pub use cron_clock;
pub use cron_clock::{self, FixedOffset, Local, TimeZone, Utc};
pub use smol::future as future_lite;
pub use smol::spawn as async_spawn;
pub use smol::unblock as unblock_spawn;
Expand Down
11 changes: 4 additions & 7 deletions src/timer/event_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ impl EventHandle {
self.task_trace.insert(task_id, delay_task_handler_box);
}

TimerEvent::FinishTask(task_id, record_id) => {
TimerEvent::FinishTask(task_id, record_id, _finish_time) => {
//TODO: maintain a outside-task-handle , through it pass the _finish_time and final-state.
//给外部单独提供开始时间, record_id 的时间有延迟
//或者是用snowflake.real_time 生成record_id ,就不用单独加字段了。。。
self.cancel_task(task_id, record_id);
}
}
Expand All @@ -236,12 +239,6 @@ impl EventHandle {
.unwrap_or_else(|e| println!("{}", e));
}

//TODO:
//cancel for exit running task.
//stop is suspension of execution(set vaild).
//user delete task , node should remove.
//any `Task` can set `valid` for that stop.

//add task to wheel_queue slot
fn add_task(&mut self, mut task: Task) -> TaskMark {
let second_hand = self.shared_header.second_hand.load(Acquire);
Expand Down
Loading