Skip to content

Commit 146a38f

Browse files
committed
Add 2024 challenge 03
1 parent 637c651 commit 146a38f

File tree

5 files changed

+309
-0
lines changed

5 files changed

+309
-0
lines changed

2024/challenge_03/.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/target
2+
/Cargo.lock

2024/challenge_03/Cargo.toml

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "solution"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
8+
[dev-dependencies]
9+
futures = "0.3.31"

2024/challenge_03/README.md

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
Имплементирайте async-aware мутекс.
2+
3+
Това изисква скок зад кулисите на async/await машинарията, защото ще трябва да се имплементира един `Future` на ръка. Но от друга страна е добро упражнение, ако човек иска да добие интуиция как работят future-ите в езика.
4+
5+
За по-лесно, ще искаме въпросния мутекс да не е thread safe, т.е. да може да се използва само от single threaded runtime.
6+
7+
```rust
8+
use std::cell::RefCell;
9+
use std::future::Future;
10+
use std::ops::{Deref, DerefMut};
11+
use std::pin::Pin;
12+
use std::task::{Context, Poll};
13+
14+
/// Неблокиращ мутекс, предназначен да се използва от асинхронен код.
15+
pub struct MyMutex<T> {
16+
value: RefCell<T>,
17+
/* todo: other fields */
18+
}
19+
20+
impl<T> MyMutex<T> {
21+
pub fn new(value: T) -> Self {
22+
todo!()
23+
}
24+
25+
// Забележете, че `lock` не е маркирана като `async fn`, защото си имплементираме future-а
26+
// на ръка (тук компилатора няма как да ни помогне).
27+
//
28+
// Бихме могли да я декларираме `fn lock() -> impl Future<Output = MyMytexGuard<'_, T>>`,
29+
// ако искаме да не правим структурата публична, но пак ще трябва да си напишем и върнем
30+
// наша структура.
31+
pub fn lock(&self) -> MyMutexLockFuture<'_, T> {
32+
todo!()
33+
}
34+
35+
fn unlock(&self) {
36+
todo!()
37+
}
38+
}
39+
40+
pub struct MyMutexLockFuture<'a, T> {
41+
/* todo: fields */
42+
}
43+
44+
impl<'a, T> Future for MyMutexLockFuture<'a, T> {
45+
type Output = MyMutexGuard<'a, T>;
46+
47+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48+
todo!()
49+
}
50+
}
51+
52+
pub struct MyMutexGuard<'a, T> {
53+
/* todo: fields */
54+
}
55+
56+
impl<'a, T> Deref for MyMutexGuard<'a, T> {
57+
type Target = T;
58+
fn deref(&self) -> &Self::Target {
59+
todo!()
60+
}
61+
}
62+
63+
impl<'a, T> DerefMut for MyMutexGuard<'a, T> {
64+
fn deref_mut(&mut self) -> &mut Self::Target {
65+
todo!()
66+
}
67+
}
68+
69+
impl<'a, T> Drop for MyMutexGuard<'a, T> {
70+
fn drop(&mut self) {
71+
// hint: извикайте MyMytex::unlock
72+
todo!()
73+
}
74+
}
75+
```
76+
77+
Зашо използваме `RefCell` за `value`? Защото ни дава всичката необходима функционалност. За да имплементираме мутекса е нужно да можем от `&MyMutex<T>` да вземем `&mut T`, за което е нужен някакъв вид internal mutability.
78+
Бихме могли да използваме `std::sync::Mutex`, но няма да го използваме пълноценно. Или дори `UnsafeCell`, но това изисква unsafe код и просто ще преимплементираме RefCell (но би имало смисъл, ако правим thread safe вариант).
79+
80+
Изискването за мутекса е когато две задачи (task-а) се опитат да го заключат едновременно, т.е. да извикат `my_mutex.lock().await`, едната задача ще получи `MyMutexGuard` и ще продължи изпълнението си, докато другата ще бъде "блокирана" и ще трябва да изчака, докато мутекса не се освободи.
81+
За да се получи това, при poll-ването на future-а, върнат от `my_mutex.lock()`, във втората задача трябва да се върне `Poll::Pending`, което означава, че задачата за момента не може да продължи работата си. Съответно async runtime-а повече няма да schedule-ва тази задача, но е свободен да изпълнява други задачи. Когато обаче мутекса се освободи, runtime-а трябва да бъде уведомен, че втората задача вече може да направи прогрес. За целта трябва да се вземе `Waker` обекта за съответната задача, който може да бъде получен от `Context` параметъра на `poll`, и да се запази до момента, в който задачата трябва да бъде събудена.
82+
83+
Целия тест можете да намерите на <https://github.com/fmi/rust-homework>.
84+
Внимание - теста използва `futures` библиотеката, но тя е достъпна само за тестовете, но не и за решението (добавена е като dev-dependency). В решението не можете да използвате външни библиотеки - но не са ви и нужни.

2024/challenge_03/src/lib.rs

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use std::cell::RefCell;
2+
use std::future::Future;
3+
use std::ops::{Deref, DerefMut};
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
6+
7+
/// Неблокиращ мутекс, предназначен да се използва от асинхронен код.
8+
pub struct MyMutex<T> {
9+
value: RefCell<T>,
10+
/* todo: other fields */
11+
}
12+
13+
impl<T> MyMutex<T> {
14+
pub fn new(value: T) -> Self {
15+
todo!()
16+
}
17+
18+
// Забележете, че `lock` не е маркирана като `async fn`, защото си имплементираме future-а
19+
// на ръка (тук компилатора няма как да ни помогне).
20+
//
21+
// Бихме могли да я декларираме `fn lock() -> impl Future<Output = MyMytexGuard<'_, T>>`,
22+
// ако искаме да не правим структурата публична, но пак ще трябва да си напишем и върнем
23+
// наша структура.
24+
pub fn lock(&self) -> MyMutexLockFuture<'_, T> {
25+
todo!()
26+
}
27+
28+
fn unlock(&self) {
29+
todo!()
30+
}
31+
}
32+
33+
pub struct MyMutexLockFuture<'a, T> {
34+
/* todo: fields */
35+
}
36+
37+
impl<'a, T> Future for MyMutexLockFuture<'a, T> {
38+
type Output = MyMutexGuard<'a, T>;
39+
40+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41+
todo!()
42+
}
43+
}
44+
45+
pub struct MyMutexGuard<'a, T> {
46+
/* todo: fields */
47+
}
48+
49+
impl<'a, T> Deref for MyMutexGuard<'a, T> {
50+
type Target = T;
51+
fn deref(&self) -> &Self::Target {
52+
todo!()
53+
}
54+
}
55+
56+
impl<'a, T> DerefMut for MyMutexGuard<'a, T> {
57+
fn deref_mut(&mut self) -> &mut Self::Target {
58+
todo!()
59+
}
60+
}
61+
62+
impl<'a, T> Drop for MyMutexGuard<'a, T> {
63+
fn drop(&mut self) {
64+
// hint: извикайте MyMytex::unlock
65+
todo!()
66+
}
67+
}
68+

2024/challenge_03/tests/test_full.rs

+146
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
// Warning: the test uses the `futures` library.
2+
// This is available only to the testing code (it is a dev-dependency),
3+
// it is not available to the solution code.
4+
5+
use std::future::Future;
6+
use std::pin::Pin;
7+
use std::sync::Arc;
8+
9+
use futures::{SinkExt, StreamExt};
10+
use solution::*;
11+
12+
#[test]
13+
fn test_futures_join() {
14+
run_with_timeout(1000, || async {
15+
let resource = Arc::new(MyMutex::new(vec![]));
16+
let (mut sender_3to1, mut recv_3to1) = futures::channel::mpsc::channel(1);
17+
18+
let fut1 = {
19+
// A future that locks the resource, then holds the lock across an
20+
// await point (awaiting the next message from the channel).
21+
//
22+
// With an async-aware mutex this is not a problem - while `fut1` is blocked,
23+
// `fut2` and `fut3` can still run. So eventually `fut3` will send the message
24+
// and unblock `fut1`.
25+
let resource = Arc::clone(&resource);
26+
async move {
27+
let mut lock = resource.lock().await;
28+
let () = recv_3to1.next().await.unwrap();
29+
lock.push("one".to_string());
30+
}
31+
};
32+
33+
let fut2 = {
34+
let resource = Arc::clone(&resource);
35+
async move {
36+
let mut lock = resource.lock().await;
37+
lock.push("two".to_string());
38+
}
39+
};
40+
41+
let fut3 = {
42+
let resource = Arc::clone(&resource);
43+
async move {
44+
sender_3to1.send(()).await.unwrap();
45+
let mut lock = resource.lock().await;
46+
lock.push("three".to_string());
47+
}
48+
};
49+
50+
// `join` polls the futures in order.
51+
//
52+
// Also `join` provides a single `Waker`, which just wakes the `Join3` future,
53+
// which every time polls each of the inner futures in order.
54+
// So using any waker will "wake" all three futures.
55+
futures::future::join3(fut1, fut2, fut3).await;
56+
57+
assert_eq!(&*resource.lock().await, &["one", "two", "three"]);
58+
});
59+
}
60+
61+
#[test]
62+
fn test_futures_unordered() {
63+
run_with_timeout(1000, || async {
64+
let resource = Arc::new(MyMutex::new(vec![]));
65+
let (mut sender_3to1, mut recv_3to1) = futures::channel::mpsc::channel(1);
66+
67+
let fut1 = pin_box({
68+
let resource = Arc::clone(&resource);
69+
async move {
70+
let mut lock = resource.lock().await;
71+
let () = recv_3to1.next().await.unwrap();
72+
lock.push("one".to_string());
73+
}
74+
});
75+
76+
let fut2 = pin_box({
77+
let resource = Arc::clone(&resource);
78+
async move {
79+
let mut lock = resource.lock().await;
80+
lock.push("two".to_string());
81+
}
82+
});
83+
84+
let fut3 = pin_box({
85+
let resource = Arc::clone(&resource);
86+
async move {
87+
sender_3to1.send(()).await.unwrap();
88+
let mut lock = resource.lock().await;
89+
lock.push("three".to_string());
90+
}
91+
});
92+
93+
// Same example, but uses `FuturesUnordered` instead of `join`.
94+
//
95+
// `FuturesUnordered` doesn't guarantee any ordering.
96+
// Also it is more optimized for a large number of futures and will provide a separate
97+
// `Waker` for each of the inner futures.
98+
// So we can test that the correct wakers are being used.
99+
let mut unordered = futures::stream::FuturesUnordered::from_iter([fut1, fut2, fut3]);
100+
while let Some(_) = unordered.next().await {}
101+
102+
let mut final_resource = resource.lock().await.clone();
103+
final_resource.sort();
104+
assert_eq!(final_resource, &["one", "three", "two"]);
105+
});
106+
}
107+
108+
fn run_with_timeout<F, R>(timeout_millis: u64, test_fn: F)
109+
where
110+
F: FnOnce() -> R + Send + std::panic::UnwindSafe + 'static,
111+
R: Future<Output = ()> + 'static,
112+
{
113+
use futures::task::LocalSpawn;
114+
use std::panic::catch_unwind;
115+
use std::sync::mpsc;
116+
117+
let (sender, receiver) = mpsc::sync_channel(1);
118+
119+
std::thread::spawn(move || {
120+
let result = catch_unwind(move || {
121+
let mut runtime = futures::executor::LocalPool::new();
122+
123+
let test_future = Box::new(test_fn());
124+
runtime.spawner().spawn_local_obj(test_future.into()).unwrap();
125+
126+
runtime.run();
127+
});
128+
129+
let _ = sender.send(result);
130+
});
131+
132+
let timeout = std::time::Duration::from_millis(timeout_millis);
133+
match receiver.recv_timeout(timeout) {
134+
Ok(Ok(())) => {}
135+
Ok(Err(any)) => panic!("test panicked: {}", any.downcast::<&str>().unwrap()),
136+
Err(mpsc::RecvTimeoutError::Timeout) => panic!("test timed out"),
137+
Err(mpsc::RecvTimeoutError::Disconnected) => unreachable!(),
138+
}
139+
}
140+
141+
fn pin_box<F>(fut: F) -> Pin<Box<dyn Future<Output = ()>>>
142+
where
143+
F: Future<Output = ()> + 'static,
144+
{
145+
Box::into_pin(Box::new(fut) as Box<dyn Future<Output = ()>>)
146+
}

0 commit comments

Comments
 (0)