diff --git a/.gitignore b/.gitignore index 3b8cbc7..6ca3423 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,5 @@ Cargo.lock /target *.log /*.txt +*.dat +.idea \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 7517535..36cd7ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,12 +6,12 @@ edition = "2018" repository = "https://github.com/BinChengZhao/delay-timer" documentation = "https://docs.rs/delay_timer" readme = "README.md" -homepage = "https://hyper.rs" +homepage = "https://github.com/BinChengZhao/delay-timer" description = "Time-manager of delayed tasks. Like crontab, but synchronous asynchronous tasks are possible, and dynamic add/cancel/remove is supported." keywords = [ "cron", "schedule", "timer", "crontab", "delay" ] license = "Apache-2.0 OR MIT" categories = ["development-tools", "data-structures", "asynchronous", "data-structures", "accessibility"] - +build = "build/build.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -29,13 +29,13 @@ status-report = [] [dependencies] -cron_clock = "0.6.6" +cron_clock = "0.6.7" anyhow = "1.0.31" -rs-snowflake = "0.4.0" +rs-snowflake = "0.5.0" waitmap = "1.1.0" lru = "0.6.1" futures = "0.3.8" -smol = "1.2.4" +smol = "1.2.5" concat-idents = "1.1.1" @@ -45,7 +45,7 @@ tokio = { version = "~1.0.0", features = ["full"] , optional = true } [dev-dependencies] surf = "2.1.0" tokio = { version = "~1.0.0", features = ["full"] } -hyper= {version = "0.14.1" , features = ["full"] } +hyper= {version = "0.14.2" , features = ["full"] } pretty_env_logger = "0.4" mockall = "0.8.2" @@ -53,7 +53,12 @@ mockall = "0.8.2" version = "1.6.3" features = ["attributes"] +[build-dependencies] +autocfg = "1" +rustc_version = "0.2" +# Append the cfg-tag:docsrs to activate the feature(doc_cfg) attribute +# when generating a document on docs.rs. [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/README.md b/README.md index 18d7f24..9f4b1d2 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ such as Sunday at 4am to execute a backup task. Supports configuration of the maximum number of parallelism of tasks. +### TODO The minimum-supported version fo rustc is `1.*.*` . + [![Build](https://github.com/BinChengZhao/delay-timer/workflows/Build%20and%20test/badge.svg)]( https://github.com/BinChengZhao/delay-timer/actions) [![License](https://img.shields.io/badge/license-MIT%2FApache--2.0-blue.svg)]( diff --git a/benches/body.rs b/benches/body.rs index eeb5833..a036886 100644 --- a/benches/body.rs +++ b/benches/body.rs @@ -53,3 +53,8 @@ fn bench_try_wait(b: &mut Bencher) { b.iter(|| child.try_wait()); } + +#[bench] +fn bench_get_timestamp(b: &mut Bencher) { + b.iter(|| get_timestamp()); +} diff --git a/build/build.rs b/build/build.rs new file mode 100644 index 0000000..7df2c9b --- /dev/null +++ b/build/build.rs @@ -0,0 +1,32 @@ +extern crate autocfg; + +use autocfg::emit; +use rustc_version::{version, version_meta, Channel, Version}; + +fn main() { + // Set cfg flags depending on release channel + match version_meta().unwrap().channel { + Channel::Stable => { + println!("cargo:rustc-cfg=RUSTC_IS_STABLE"); + } + Channel::Beta => { + println!("cargo:rustc-cfg=RUSTC_IS_BETA"); + } + Channel::Nightly => { + emit("nightly"); + println!("cargo:rustc-cfg=RUSTC_IS_NIGHTLY"); + } + Channel::Dev => { + println!("cargo:rustc-cfg=RUSTC_IS_DEV"); + } + } + + // Check for a minimum version + if version().unwrap() >= Version::parse("1.51.0").unwrap() { + println!("cargo:rustc-cfg=SPLIT_INCLUSIVE_COMPATIBLE"); + } + + // (optional) We don't need to rerun for anything external. + // In order to see the compilation parameters at `cargo check --verbose` time, keep it. + autocfg::rerun_path("build.rs"); +} diff --git a/examples/demo.rs b/examples/demo.rs index 3af9da6..58c3c3d 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -16,6 +16,7 @@ fn main() { delay_timer.add_task(build_task2(task_builder)).unwrap(); delay_timer.add_task(build_task3(task_builder)).unwrap(); delay_timer.add_task(build_task5(task_builder)).unwrap(); + delay_timer.add_task(build_task7(task_builder)).unwrap(); // Let's do someting about 2s. sleep(Duration::new(2, 1_000_000)); @@ -80,6 +81,20 @@ fn build_task5(mut task_builder: TaskBuilder) -> Task { .unwrap() } +fn build_task7(mut task_builder: TaskBuilder) -> Task { + let body = create_async_fn_body!({ + dbg!(get_timestamp()); + + Timer::after(Duration::from_secs(3)).await; + }); + task_builder + .set_task_id(7) + .set_frequency_by_candy(CandyFrequency::Repeated(AuspiciousTime::PerDayFiveAclock)) + .set_maximun_parallel_runable_num(2) + .spawn(body) + .unwrap() +} + pub fn generate_closure_template( name: String, ) -> impl Fn(TaskContext) -> Box + 'static + Send + Sync { @@ -136,14 +151,16 @@ enum AuspiciousTime { PerSevenSeconds, PerEightSeconds, LoveTime, + PerDayFiveAclock, } impl Into for AuspiciousTime { fn into(self) -> CandyCronStr { match self { - Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *"), - Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *"), - Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100"), + Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *".to_string()), + Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *".to_string()), + Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100".to_string()), + Self::PerDayFiveAclock => CandyCronStr("01 00 1 * * * *".to_string()), } } } diff --git a/examples/increase.rs b/examples/increase.rs index 09ad488..072cc72 100644 --- a/examples/increase.rs +++ b/examples/increase.rs @@ -1,12 +1,25 @@ -#![feature(ptr_internals)] use delay_timer::prelude::*; -use std::ptr::Unique; +use std::ops::Deref; +use std::ptr::NonNull; use std::sync::atomic::{AtomicUsize, Ordering::SeqCst}; use std::sync::Arc; use std::thread::{current, park, Thread}; use surf; + +#[derive(Debug, Clone, Copy)] +struct SafePointer(NonNull>); + +unsafe impl Send for SafePointer {} +unsafe impl Sync for SafePointer {} +impl Deref for SafePointer { + type Target = NonNull>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + // Remember close terminal can speed up because of // printnl! block process if stand-pipe if full. fn main() { @@ -15,7 +28,8 @@ fn main() { // The Sync-Task run_flay. let mut run_flag = Arc::new(AtomicUsize::new(0)); // cross thread share raw-pointer. - let run_flag_ref: Option>> = Unique::new(&mut run_flag); + let run_flag_ref: SafePointer = + SafePointer(NonNull::new(&mut run_flag as *mut Arc).unwrap()); // Sync-Task body. let body = get_increase_fn(run_flag_ref); @@ -53,10 +67,10 @@ fn main() { } fn get_increase_fn( - run_flag_ref: Option>>, + run_flag_ref: SafePointer, ) -> impl Copy + Fn(TaskContext) -> Box { move |_context| { - let local_run_flag = run_flag_ref.unwrap().as_ptr(); + let local_run_flag = run_flag_ref.as_ptr(); unsafe { (*local_run_flag).fetch_add(1, SeqCst); @@ -67,10 +81,10 @@ fn get_increase_fn( fn get_wake_fn( thread: Thread, - run_flag_ref: Option>>, + run_flag_ref: SafePointer, ) -> impl Fn(TaskContext) -> Box { move |_context| { - let local_run_flag = run_flag_ref.unwrap().as_ptr(); + let local_run_flag = run_flag_ref.as_ptr(); unsafe { println!( "end time {}, result {}", diff --git a/examples/profile_memory.rs b/examples/profile_memory.rs new file mode 100644 index 0000000..30b7367 --- /dev/null +++ b/examples/profile_memory.rs @@ -0,0 +1,52 @@ +use delay_timer::prelude::*; + +#[derive(Default)] +struct RequstBody { + _id: u64, + cron_expression: String, + _token: String, +} + +impl RequstBody { + fn fake_request_body() -> Self { + let string_buf = [16u16, 16u16, 16u16].repeat(10000); + let cron_expression = String::from_utf16(&string_buf).unwrap(); + Self { + cron_expression, + ..Default::default() + } + } +} + +impl Into for RequstBody { + fn into(self) -> CandyCronStr { + CandyCronStr(self.cron_expression) + } +} + +// LD_PRELOAD=../../tools-bin/libmemory_profiler.so ./target/debug/examples/profile_memory +// ../../tools-bin/memory-profiler-cli server memory-profiling_*.dat +fn main() { + // let mut task_builder_vec: Vec = Vec::with_capacity(100_000); + let mut task_builder_vec: Vec = Vec::with_capacity(100_000); + + for _ in 0..256_00 { + task_builder_vec.push({ + let mut task_builder = TaskBuilder::default(); + task_builder + .set_frequency_by_candy(CandyFrequency::Repeated(RequstBody::fake_request_body())); + + task_builder + }); + + // task_builder_vec.push(RequstBody::fake_request_body()); + } + + for _ in 0..256_00 { + // FIXME: It can't free memory. + task_builder_vec.pop().unwrap().free(); + // task_builder_vec.pop().unwrap(); + } + + drop(task_builder_vec); +} diff --git a/src/entity.rs b/src/entity.rs index 3417b47..930ceab 100644 --- a/src/entity.rs +++ b/src/entity.rs @@ -275,6 +275,7 @@ impl DelayTimer { } /// Cancel a task in timer_core by event-channel. + /// `Cancel` is for instances derived from the task running up. pub fn cancel_task(&self, task_id: u64, record_id: i64) -> Result<()> { self.seed_timer_event(TimerEvent::CancelTask(task_id, record_id)) } diff --git a/src/lib.rs b/src/lib.rs index 2af582d..4eb2cc6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,7 @@ //! //! ```toml //! [dependencies] -//! delay_timer = "0.2.0" +//! delay_timer = "*" //! ``` //! //! Next: @@ -76,9 +76,9 @@ //! impl Into for AuspiciousTime { //! fn into(self) -> CandyCronStr { //! match self { -//! Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *"), -//! Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *"), -//! Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100"), +//! Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *".to_string()), +//! Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *".to_string()), +//! Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100".to_string()), //! } //! } //! } @@ -194,18 +194,16 @@ //! impl Into for AuspiciousTime { //! fn into(self) -> CandyCronStr { //! match self { -//! Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *"), -//! Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *"), -//! Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100"), +//! Self::PerSevenSeconds => CandyCronStr("0/7 * * * * * *".to_string()), +//! Self::PerEightSeconds => CandyCronStr("0/8 * * * * * *".to_string()), +//! Self::LoveTime => CandyCronStr("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100".to_string()), //! } //! } //! } //! ``` - -#![feature(linked_list_cursors)] -#![feature(doc_cfg)] -// Backup : https://github.com/contain-rs/linked-list/blob/master/src/lib.rs - +#![cfg_attr(RUSTC_IS_NIGHTLY, feature(linked_list_cursors))] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![feature(renamed_spin_loop)] // TODO:When the version is stable in the future, we should consider using stable compile unified. // FIXME: Auto fill cli-args `features = full` when exec cargo test. #[macro_use] diff --git a/src/prelude.rs b/src/prelude.rs index 349c7a8..bc73a4e 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -14,7 +14,7 @@ pub use crate::entity::{get_timestamp, get_timestamp_micros, DelayTimer, DelayTi pub use crate::macros::*; pub use crate::timer::runtime_trace::task_handle::DelayTaskHandler; pub use crate::timer::task::TaskContext; -pub use crate::timer::task::{Frequency, Task, TaskBuilder}; +pub use crate::timer::task::{Frequency, ScheduleIteratorTimeZone, Task, TaskBuilder}; pub use crate::timer::timer_core::TimerEvent; pub use crate::utils::convenience::cron_expression_grammatical_candy::{ CandyCron, CandyCronStr, CandyFrequency, @@ -24,7 +24,7 @@ pub use crate::utils::convenience::functions::{ }; pub use anyhow::Result as AnyResult; -pub use cron_clock; +pub use cron_clock::{self, FixedOffset, Local, TimeZone, Utc}; pub use smol::future as future_lite; pub use smol::spawn as async_spawn; pub use smol::unblock as unblock_spawn; diff --git a/src/timer/event_handle.rs b/src/timer/event_handle.rs index d7e2ab1..1392d7f 100644 --- a/src/timer/event_handle.rs +++ b/src/timer/event_handle.rs @@ -221,7 +221,10 @@ impl EventHandle { self.task_trace.insert(task_id, delay_task_handler_box); } - TimerEvent::FinishTask(task_id, record_id) => { + TimerEvent::FinishTask(task_id, record_id, _finish_time) => { + //TODO: maintain a outside-task-handle , through it pass the _finish_time and final-state. + //给外部单独提供开始时间, record_id 的时间有延迟 + //或者是用snowflake.real_time 生成record_id ,就不用单独加字段了。。。 self.cancel_task(task_id, record_id); } } @@ -236,12 +239,6 @@ impl EventHandle { .unwrap_or_else(|e| println!("{}", e)); } - //TODO: - //cancel for exit running task. - //stop is suspension of execution(set vaild). - //user delete task , node should remove. - //any `Task` can set `valid` for that stop. - //add task to wheel_queue slot fn add_task(&mut self, mut task: Task) -> TaskMark { let second_hand = self.shared_header.second_hand.load(Acquire); diff --git a/src/timer/runtime_trace/task_handle.rs b/src/timer/runtime_trace/task_handle.rs index a5f197f..4c34e51 100644 --- a/src/timer/runtime_trace/task_handle.rs +++ b/src/timer/runtime_trace/task_handle.rs @@ -10,12 +10,12 @@ use anyhow::Result; use smol::Task as SmolTask; #[derive(Default, Debug)] -///TaskTrace is contanier that own global task-handle. +/// TaskTrace is contanier that own global task-handle. pub(crate) struct TaskTrace { inner: HashMap>, } -//TaskTrace can cancel a task via a Task Handle. +// TaskTrace can cancel a task via a Task Handle. impl TaskTrace { pub(crate) fn insert(&mut self, task_id: u64, task_handler_box: DelayTaskHandlerBox) { //entry is amazing! @@ -34,9 +34,13 @@ impl TaskTrace { } } - //linkedlist is ordered by record_id, if input record_id is small than linkedlist first record_id - //that is no task_handler can cancel or record_id bigger than last record_id. - //one record_id may be used for many handler. + // linkedlist is ordered by record_id, if input record_id is small than linkedlist first record_id + // that is no task_handler can cancel or record_id bigger than last record_id. + // one record_id may be used for many handler. + + //TODO: 一个stable 的cfg, 一个nightly 的cfg . + + #[cfg(RUSTC_IS_NIGHTLY)] pub(crate) fn quit_one_task_handler( &mut self, task_id: u64, @@ -68,6 +72,23 @@ impl TaskTrace { None } } + + #[cfg(not(RUSTC_IS_NIGHTLY))] + pub(crate) fn quit_one_task_handler( + &mut self, + task_id: u64, + record_id: i64, + ) -> Option> { + let task_handler_list = self.inner.get_mut(&task_id)?; + let index = task_handler_list + .iter() + .position(|d| d.record_id == record_id)?; + + let mut has_remove_element_list = task_handler_list.split_off(index); + let mut remove_element = has_remove_element_list.pop_front()?; + task_handler_list.append(&mut has_remove_element_list); + Some(remove_element.quit()) + } } //I export that trait for that crate user. diff --git a/src/timer/task.rs b/src/timer/task.rs index e5b9976..dc8e3d1 100644 --- a/src/timer/task.rs +++ b/src/timer/task.rs @@ -13,9 +13,9 @@ use cron_clock::{Schedule, ScheduleIteratorOwned, Utc}; use lru::LruCache; //TODO: Add doc. -thread_local!(static CRON_EXPRESSION_CACHE: RefCell>> = RefCell::new(LruCache::new(256))); +thread_local!(static CRON_EXPRESSION_CACHE: RefCell> = RefCell::new(LruCache::new(256))); -//TaskMark is use to remove/stop the task. +// TaskMark is use to remove/stop the task. #[derive(Default, Debug, Clone, Copy)] pub(crate) struct TaskMark { pub(crate) task_id: u64, @@ -54,22 +54,140 @@ impl TaskMark { } #[derive(Debug, Copy, Clone)] -///Enumerated values of repeating types. +/// Enumerated values of repeating types. pub enum Frequency<'a> { - ///Repeat once. + /// Repeat once. Once(&'a str), - ///Repeat ad infinitum. + /// Repeat ad infinitum. Repeated(&'a str), - ///Type of countdown. + /// Type of countdown. CountDown(u32, &'a str), } -///Iterator for task internal control of execution time. +/// Iterator for task internal control of execution time. #[derive(Debug, Clone)] pub(crate) enum FrequencyInner { ///Unlimited repetition types. - Repeated(ScheduleIteratorOwned), + Repeated(DelayTimerScheduleIteratorOwned), ///Type of countdown. - CountDown(u32, ScheduleIteratorOwned), + CountDown(u32, DelayTimerScheduleIteratorOwned), +} + +/// Set the time zone for the time of the expression iteration. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub enum ScheduleIteratorTimeZone { + /// Utc specifies the UTC time zone. It is most efficient. + Utc, + /// Local specifies the system local time zone. + Local, + /// FixedOffset specifies an arbitrary, fixed time zone such as UTC+09:00 or UTC-10:30. This often results from the parsed textual date and time. Since it stores the most information and does not depend on the system environment, you would want to normalize other TimeZones into this type. + FixedOffset(FixedOffset), +} + +#[derive(Debug, Clone, Default, Hash, PartialEq, Eq)] +pub(crate) struct ScheduleIteratorTimeZoneQuery { + time_zone: ScheduleIteratorTimeZone, + cron_expression: String, +} + +impl Default for ScheduleIteratorTimeZone { + fn default() -> Self { + ScheduleIteratorTimeZone::Local + } +} + +/// The Cron-expression scheduling iterator enum. +/// There are three variants. +/// The declaration `enum` is to avoid the problems caused by generalized contagion and monomorphism. +/// +// +// Frequency -> FrequencyInner -> Task -> Slot -> Wheel .... +// Frequency or Frequency caused Task Task +// The Wheel must only exist one for delay-timer run , +// can't store two kind of task-type . +// +/// +/// The intention is to provide an api to the user to set the time zone of `ScheduleIteratorOwned` conveniently, +/// if you use a generic that wraps its type need to add this generic parameter, +/// and after the type will be inconsistent and can not be stored in the same container, +/// so use enum to avoid these problems. + +#[derive(Debug, Clone)] +pub(crate) enum DelayTimerScheduleIteratorOwned { + InnerUtcScheduleIteratorOwned(ScheduleIteratorOwned), + InnerLocalScheduleIteratorOwned(ScheduleIteratorOwned), + InnerFixedOffsetScheduleIteratorOwned(ScheduleIteratorOwned), +} + +impl DelayTimerScheduleIteratorOwned { + pub(crate) fn new( + ScheduleIteratorTimeZoneQuery { + time_zone, + ref cron_expression, + }: ScheduleIteratorTimeZoneQuery, + ) -> DelayTimerScheduleIteratorOwned { + match time_zone { + ScheduleIteratorTimeZone::Utc => { + DelayTimerScheduleIteratorOwned::InnerUtcScheduleIteratorOwned( + Schedule::from_str(cron_expression) + .unwrap() + .upcoming_owned(Utc), + ) + } + ScheduleIteratorTimeZone::Local => { + DelayTimerScheduleIteratorOwned::InnerLocalScheduleIteratorOwned( + Schedule::from_str(cron_expression) + .unwrap() + .upcoming_owned(Local), + ) + } + ScheduleIteratorTimeZone::FixedOffset(fixed_offset) => { + DelayTimerScheduleIteratorOwned::InnerFixedOffsetScheduleIteratorOwned( + Schedule::from_str(cron_expression) + .unwrap() + .upcoming_owned(fixed_offset), + ) + } + } + } + + #[inline(always)] + pub(crate) fn next(&mut self) -> i64 { + match self { + Self::InnerUtcScheduleIteratorOwned(ref mut iterator) => { + iterator.next().unwrap().timestamp() + } + Self::InnerLocalScheduleIteratorOwned(ref mut iterator) => { + iterator.next().unwrap().timestamp() + } + Self::InnerFixedOffsetScheduleIteratorOwned(ref mut iterator) => { + iterator.next().unwrap().timestamp() + } + } + } + + // Analyze expressions, get cache. + fn analyze_cron_expression( + time_zone: ScheduleIteratorTimeZone, + cron_expression: &str, + ) -> Result { + let indiscriminate_expression = cron_expression.trim_matches(' ').to_owned(); + let schedule_iterator_time_zone_query: ScheduleIteratorTimeZoneQuery = + ScheduleIteratorTimeZoneQuery { + cron_expression: indiscriminate_expression, + time_zone, + }; + + CRON_EXPRESSION_CACHE.try_with(|expression_cache| { + let mut lru_cache = expression_cache.borrow_mut(); + if let Some(schedule_iterator) = lru_cache.get(&schedule_iterator_time_zone_query) { + return schedule_iterator.clone(); + } + let task_schedule = + DelayTimerScheduleIteratorOwned::new(schedule_iterator_time_zone_query.clone()); + lru_cache.put(schedule_iterator_time_zone_query, task_schedule.clone()); + task_schedule + }) + } } impl FrequencyInner { @@ -85,8 +203,8 @@ impl FrequencyInner { fn next_alarm_timestamp(&mut self) -> i64 { //TODO:handle error match self { - FrequencyInner::CountDown(_, ref mut clock) => clock.next().unwrap().timestamp(), - FrequencyInner::Repeated(ref mut clock) => clock.next().unwrap().timestamp(), + FrequencyInner::CountDown(_, ref mut clock) => clock.next(), + FrequencyInner::Repeated(ref mut clock) => clock.next(), } } @@ -105,23 +223,28 @@ impl FrequencyInner { } } +//TODO: Support customer time-zore. #[derive(Debug, Default, Copy, Clone)] -///Cycle plan task builder. +/// Cycle plan task builder. pub struct TaskBuilder<'a> { - ///Repeat type. + /// Repeat type. frequency: Option>, - ///Task_id should unique. + /// Task_id should unique. task_id: u64, - ///Maximum execution time (optional). - /// it can be use to deadline (excution-time + maximum_running_time). - /// TODO: whether auto cancel. - /// Zip other future to auto cancel it be poll when first future-task finished. + /// Maximum execution time (optional). + /// it can be use to deadline (excution-time + maximum_running_time). maximum_running_time: Option, - ///Maximum parallel runable num (optional). + /// Maximum parallel runable num (optional). maximun_parallel_runable_num: Option, + + /// If it is built by set_frequency_by_candy, set the tag separately. + build_by_candy_str: bool, + + /// Time zone for cron-expression iteration time. + schedule_iterator_time_zone: ScheduleIteratorTimeZone, } //TODO:Future tasks will support single execution (not multiple executions in the same time frame). @@ -159,7 +282,11 @@ impl TaskContext { pub async fn finishe_task(self) { if let Some(timer_event_sender) = self.timer_event_sender { timer_event_sender - .send(TimerEvent::FinishTask(self.task_id, self.record_id)) + .send(TimerEvent::FinishTask( + self.task_id, + self.record_id, + get_timestamp(), + )) .await .unwrap(); } @@ -187,6 +314,7 @@ pub struct Task { /// Loop the line and check how many more clock cycles it will take to execute it. cylinder_line: u64, /// Validity. + /// Any `Task` can set `valid` for that stop. valid: bool, /// Maximum parallel runable num (optional). pub(crate) maximun_parallel_runable_num: Option, @@ -209,21 +337,36 @@ impl<'a> TaskBuilder<'a> { } /// Set task Frequency by customized CandyCronStr. + /// In order to build a high-performance, + /// highly reusable `TaskBuilder` that maintains the Copy feature . + /// when supporting building from CandyCronStr , + /// here actively leaks memory for create a str-slice (because str-slice support Copy, String does not) + /// We need to call `free` manually before `TaskBuilder` drop or before we leave the scope. + /// + /// Explain: + /// Explicitly implementing both `Drop` and `Copy` trait on a type is currently + /// disallowed. This feature can make some sense in theory, but the current + /// implementation is incorrect and can lead to memory unsafety (see + /// [issue #20126][iss20126]), so it has been disabled for now. + #[inline(always)] pub fn set_frequency_by_candy>( &mut self, frequency: CandyFrequency, ) -> &mut Self { + self.build_by_candy_str = true; + let frequency = match frequency { CandyFrequency::Once(candy_cron_middle_str) => { - Frequency::Once(candy_cron_middle_str.into().0) + Frequency::Once(Box::leak(candy_cron_middle_str.into().0.into_boxed_str())) } CandyFrequency::Repeated(candy_cron_middle_str) => { - Frequency::Repeated(candy_cron_middle_str.into().0) - } - CandyFrequency::CountDown(exec_count, candy_cron_middle_str) => { - Frequency::CountDown(exec_count, candy_cron_middle_str.into().0) + Frequency::Repeated(Box::leak(candy_cron_middle_str.into().0.into_boxed_str())) } + CandyFrequency::CountDown(exec_count, candy_cron_middle_str) => Frequency::CountDown( + exec_count, + Box::leak(candy_cron_middle_str.into().0.into_boxed_str()), + ), }; self.frequency = Some(frequency); @@ -253,6 +396,17 @@ impl<'a> TaskBuilder<'a> { self.maximun_parallel_runable_num = Some(maximun_parallel_runable_num); self } + + /// Set time zone for cron-expression iteration time. + #[inline(always)] + pub fn set_schedule_iterator_time_zone( + &mut self, + schedule_iterator_time_zone: ScheduleIteratorTimeZone, + ) -> &mut Self { + self.schedule_iterator_time_zone = schedule_iterator_time_zone; + self + } + /// Spawn a task. pub fn spawn(self, body: F) -> Result where @@ -269,7 +423,10 @@ impl<'a> TaskBuilder<'a> { } }; - let taskschedule = Self::analyze_cron_expression(expression_str)?; + let taskschedule = DelayTimerScheduleIteratorOwned::analyze_cron_expression( + self.schedule_iterator_time_zone, + expression_str, + )?; // Building TaskFrequencyInner patterns based on repetition types. frequency_inner = match repeat_type { @@ -290,23 +447,28 @@ impl<'a> TaskBuilder<'a> { }) } - // Analyze expressions, get cache. - fn analyze_cron_expression( - cron_expression: &str, - ) -> Result, AccessError> { - let indiscriminate_expression = cron_expression.trim_matches(' ').to_owned(); - - CRON_EXPRESSION_CACHE.try_with(|expression_cache| { - let mut lru_cache = expression_cache.borrow_mut(); - if let Some(schedule_iterator) = lru_cache.get(&indiscriminate_expression) { - return schedule_iterator.clone(); + /// If we call set_frequency_by_candy explicitly and generate TaskBuilder, + /// We need to call `free` manually before `TaskBuilder` drop or before we leave the scope. + /// + /// Explain: + /// Explicitly implementing both `Drop` and `Copy` trait on a type is currently + /// disallowed. This feature can make some sense in theory, but the current + /// implementation is incorrect and can lead to memory unsafety (see + /// [issue #20126][iss20126]), so it has been disabled for now. + + /// So I can't go through Drop and handle these automatically. + pub fn free(&mut self) { + if self.build_by_candy_str { + if let Some(frequency) = self.frequency { + let s = match frequency { + Frequency::Once(s) => s, + Frequency::Repeated(s) => s, + Frequency::CountDown(_, s) => s, + }; + + as From<&'_ str>>::from(s); } - let taskschedule = Schedule::from_str(&indiscriminate_expression) - .unwrap() - .upcoming_owned(Utc); - lru_cache.put(indiscriminate_expression, taskschedule.clone()); - taskschedule - }) + } } } diff --git a/src/timer/timer_core.rs b/src/timer/timer_core.rs index 7f77a59..97dae5f 100644 --- a/src/timer/timer_core.rs +++ b/src/timer/timer_core.rs @@ -85,16 +85,23 @@ cfg_tokio_support!( struct SmolClock { inner: smolTimer, period: Duration, + offset: Instant, } impl SmolClock { pub fn new(start: Instant, period: Duration) -> Self { - let inner = smolTimer::at(start + period); - SmolClock { inner, period } + let offset = start + period; + let inner = smolTimer::at(offset); + SmolClock { + inner, + period, + offset, + } } pub async fn tick(&mut self) { - let new_inner = smolTimer::after(self.period); + self.offset += self.period; + let new_inner = smolTimer::at(self.offset); replace(&mut self.inner, new_inner).await; } } @@ -106,7 +113,8 @@ pub enum TimerEvent { AddTask(Box), RemoveTask(u64), CancelTask(u64, i64), - FinishTask(u64, i64), + // 结构化, 增加开始时间 + FinishTask(u64, i64, u64), AppendTaskHandle(u64, DelayTaskHandlerBox), } #[derive(Clone)] diff --git a/src/utils/convenience.rs b/src/utils/convenience.rs index d5c945c..9cf1373 100644 --- a/src/utils/convenience.rs +++ b/src/utils/convenience.rs @@ -107,11 +107,15 @@ pub mod functions { pub mod cron_expression_grammatical_candy { use std::ops::Deref; - #[derive(Debug, Copy, Clone)] - pub struct CandyCronStr(pub &'static str); + #[derive(Debug, Clone)] + // Here, for the convenience of the user to create CandyCronStr, + // it is the internal type of CandyCronStr that from &'static str is changed to String, + // so that the user can construct CandyCronStr according to the indefinite conditions of the runtime. + // For: https://github.com/BinChengZhao/delay-timer/issues/4 + pub struct CandyCronStr(pub String); impl Deref for CandyCronStr { - type Target = &'static str; + type Target = str; fn deref(&self) -> &Self::Target { &self.0 @@ -133,13 +137,13 @@ pub mod cron_expression_grammatical_candy { impl Into for CandyCron { fn into(self) -> CandyCronStr { match self { - Secondly => CandyCronStr("@secondly"), - Minutely => CandyCronStr("@minutely"), - Hourly => CandyCronStr("@hourly"), - Daily => CandyCronStr("@daily"), - Weekly => CandyCronStr("@weekly"), - Monthly => CandyCronStr("@monthly"), - Yearly => CandyCronStr("@yearly"), + Secondly => CandyCronStr(String::from("@secondly")), + Minutely => CandyCronStr(String::from("@minutely")), + Hourly => CandyCronStr(String::from("@hourly")), + Daily => CandyCronStr(String::from("@daily")), + Weekly => CandyCronStr(String::from("@weekly")), + Monthly => CandyCronStr(String::from("@monthly")), + Yearly => CandyCronStr(String::from("@yearly")), } } } @@ -174,15 +178,15 @@ mod tests { fn test_cron_candy() { use super::cron_expression_grammatical_candy::{CandyCron, CandyCronStr}; - let mut s: &'static str; + let mut s: String; s = >::into(CandyCron::Daily).0; assert_eq!(s, "@daily"); - s = *>::into(CandyCron::Yearly); + s = >::into(CandyCron::Yearly).0; assert_eq!(s, "@yearly"); - s = *>::into(CandyCron::Secondly); + s = >::into(CandyCron::Secondly).0; assert_eq!(s, "@secondly"); } @@ -191,6 +195,7 @@ mod tests { fn test_customization_cron_candy() { use super::cron_expression_grammatical_candy::CandyCronStr; use std::convert::Into; + use std::ops::Deref; struct CustomizationCandyCron(i32); @@ -201,19 +206,22 @@ mod tests { 1 => "0 59 23 18 11 3 2100", _ => "* * * * * * *", }; - CandyCronStr(s) + CandyCronStr(s.to_owned()) } } let mut candy_cron_str: CandyCronStr; candy_cron_str = CustomizationCandyCron(0).into(); - debug_assert_eq!(*candy_cron_str, "1 1 1 1 1 1 1"); + debug_assert_eq!( + ::deref(&candy_cron_str), + "1 1 1 1 1 1 1" + ); candy_cron_str = CustomizationCandyCron(1).into(); - debug_assert_eq!(*candy_cron_str, "0 59 23 18 11 3 2100"); + debug_assert_eq!(candy_cron_str.deref(), "0 59 23 18 11 3 2100"); candy_cron_str = CustomizationCandyCron(999).into(); - debug_assert_eq!(*candy_cron_str, "* * * * * * *"); + debug_assert_eq!(&*candy_cron_str, "* * * * * * *"); } } diff --git a/src/utils/parse.rs b/src/utils/parse.rs index 623ec78..2ecf7b9 100644 --- a/src/utils/parse.rs +++ b/src/utils/parse.rs @@ -135,16 +135,32 @@ pub mod shell_command { // I should give Option> //By Option(Some(Result)), determine if there is an output stdio.. //By Result(OK(t)), determine if there is success open file. + #[cfg(not(SPLIT_INCLUSIVE_COMPATIBLE))] fn _has_redirect_file(command: &str) -> Option> { - let angle_bracket; - - if command.contains(">>") { - angle_bracket = ">>"; + let angle_bracket = if command.contains(">>") { + ">>" } else if command.contains('>') { - angle_bracket = ">"; + ">" } else { return None; + }; + + if let Some(filename) = command.trim().rsplit(angle_bracket).next() { + Some(create_stdio_file(angle_bracket, filename)) + } else { + None } + } + + #[cfg(SPLIT_INCLUSIVE_COMPATIBLE)] + fn _has_redirect_file(command: &str) -> Option> { + let angle_bracket = if command.contains(">>") { + ">>" + } else if command.contains('>') { + ">" + } else { + return None; + }; let mut sub_command_inner = command.trim().split_inclusive(angle_bracket).rev(); if let Some(filename) = sub_command_inner.next() { diff --git a/src/utils/status_report.rs b/src/utils/status_report.rs index 5348753..6bdf41d 100644 --- a/src/utils/status_report.rs +++ b/src/utils/status_report.rs @@ -39,7 +39,7 @@ impl TryFrom<&TimerEvent> for PublicEvent { TimerEvent::AppendTaskHandle(_, delay_task_handler_box) => { Ok(PublicEvent::RunningTask(delay_task_handler_box.get_task_id(), delay_task_handler_box.get_record_id())) } - TimerEvent::FinishTask(task_id, record_id) => { + TimerEvent::FinishTask(task_id, record_id, _) => { Ok(PublicEvent::FinishTask(*task_id, *record_id)) } _ => Err("PublicEvent only accepts timer_event some variant( RemoveTask, CancelTask ,FinishTask )!"), diff --git a/tests/simulation.rs b/tests/simulation.rs index de10748..4e4f508 100644 --- a/tests/simulation.rs +++ b/tests/simulation.rs @@ -42,13 +42,13 @@ fn go_works() { for _ in 0..3 { debug_assert_eq!(i, share_num.load(Acquire)); - park_timeout(Duration::from_micros(park_time + 200_000)); + park_timeout(Duration::from_micros(park_time + 280_000)); //Testing, whether the mission is performing as expected. i = i + 1; // Coordinates the inner-Runtime with the external(test-thread) clock.(200_000 is a buffer.) - next_exec_time = schedule_itertor.next().unwrap().timestamp_millis() as u128 * 1000; + next_exec_time = dbg!(schedule_itertor.next()).unwrap().timestamp_millis() as u128 * 1000; current_time = get_timestamp_micros(); park_time = next_exec_time .checked_sub(current_time)