diff --git a/training-slides/src/SUMMARY.md b/training-slides/src/SUMMARY.md
index 967ced49..bb027b5c 100644
--- a/training-slides/src/SUMMARY.md
+++ b/training-slides/src/SUMMARY.md
@@ -1,3 +1,4 @@
+
 # Summary
 
 [Start Here](./start_here.md)
@@ -56,6 +57,11 @@ Topics that go beyond [Applied Rust](#applied-rust).
 * [Using Types to encode State](./type-state.md)
 * [WASM](./wasm.md)
+## Under development
+
+* [Deconstructing Send, Arc, and Mutex](./deconstructing-send-arc-mutex.md)
+* [Deconstructing thread::scope](./deconstructing-thread-scope.md)
+
 # No-Std Rust
 
 Rust for the Linux Kernel and other no-std environments with a pre-existing C API. Requires [Applied Rust](#applied-rust).
@@ -64,12 +70,14 @@ Rust for the Linux Kernel and other no-std environments with an pre-existing C A
 * [Foreign Function Interface](./ffi.md)
 * [Working with Nightly](./working-with-nighly.md)
+
 # Bare-Metal Rust
@@ -82,15 +90,19 @@ Topics about using Rust on ARM Cortex-M Microcontrollers (and similar). Requires
 * [The Embedded HAL and its implementations](./embedded-hals.md)
 * [Board Support Crates](./board-support.md)
+
 # Ferrocene
 
 Topics around [Ferrocene](https://ferrous-systems.com/ferrocene/), the qualified toolchain for writing safety-critical systems in Rust.
+
diff --git a/training-slides/src/deconstructing-send-arc-mutex.md b/training-slides/src/deconstructing-send-arc-mutex.md
new file mode 100644
index 00000000..eb09e715
--- /dev/null
+++ b/training-slides/src/deconstructing-send-arc-mutex.md
@@ -0,0 +1,400 @@
+
+# Deconstructing Send, Arc, and Mutex
+
+## `thread::spawn` Function
+
+```rust ignore
+pub fn spawn<F, T>(f: F) -> JoinHandle<T>
+where
+    F: FnOnce() -> T,
+    F: Send + 'static,
+    T: Send + 'static,
+{
+    // ...
+}
+```
+
+## Quick Primer on Rust Closures
+
+* 3 categories of data
+  * data the closure *closes over* / *captures*: **Upvars**
+    * convenient compiler terminology
+    * not represented by the closure type signature
+  * parameters
+  * returned value
+
+```rust ignore
+let upper_threshold = 20;
+let outliers: Vec<_> = data.iter().copied().filter(|n| -> bool {
+    // `n` is a parameter, `upper_threshold` is an *upvar*
+    *n >= upper_threshold
+}).collect();
+```
+
+## Spawn closure type
+
+* `F: FnOnce() -> T`
+  * the closure doesn't accept any parameters
+  * the closure can *consume upvars* ("FnOnce")
+* `F: Send + 'static`
+  * applies to *upvars*
+* `T: Send + 'static`
+  * applies to the returned value
+
+## `T: 'static`
+
+Two allowable options:
+
+* the type doesn't have any references inside ("Owned data")
+  * `struct User { name: String }`
+* the references inside the type are `'static`
+  * `struct Db { connection_string: &'static str }`
+
+## Why `F: 'static` and `T: 'static`?
+
+* applies to data passed from the parent thread to the child thread or vice versa
+* prevents passing references to local variables
+  * one thread can finish before the other, and such references may become invalid
+  * `+ 'static` avoids this by ensuring any references point to data that has the static lifetime (i.e. that lives forever)
+
+## `T: Send`
+
+`pub unsafe auto trait Send { }`
+
+* `auto` means all types get this trait automatically
+  * opt-out instead of opt-in
+* various types in the standard library implement `Send` or `!Send`
+* `unsafe` means you have to put the `unsafe` keyword in front of `impl` when implementing `Send` or `!Send`
+  * a precautionary measure
+
+## Why would one implement `Send` or `!Send`?
+
+* Rust pointers (`*const T`, `*mut T`, `NonNull<T>`) are `!Send`
+  * Use case: what if the pointer comes from an FFI library that assumes that all functions using this pointer are called from the same thread?
+* `Arc<T>` has a `NonNull<..>` inside and becomes `!Send` automatically
+  * to override this behavior `Arc<T>` explicitly implements `Send`
+
+## `Send` in `thread::spawn` Function
+
+`F: Send` and `T: Send` mean that all data traveling from the parent thread to the child thread has to be marked as `Send`
+
+* the Rust compiler has no inherent knowledge of threads, but the use of marker traits and lifetime annotations lets the type / borrow checker prevent data race errors
+
+## Sharing data between threads
+
+## Example: Message Log for TCP Echo Server
+
+```rust ignore
+use std::{
+    io::{self, BufRead as _, Write as _},
+    net, thread,
+};
+
+fn handle_client(stream: net::TcpStream) -> Result<(), io::Error> {
+    let mut writer = io::BufWriter::new(&stream);
+    let reader = io::BufReader::new(&stream);
+    for line in reader.lines() {
+        let line = line?;
+        writeln!(writer, "{}", line)?;
+        writer.flush()?;
+    }
+    Ok(())
+}
+
+fn main() -> Result<(), io::Error> {
+    let listener = net::TcpListener::bind("0.0.0.0:7878")?;
+
+    for stream in listener.incoming() {
+        let stream = stream?;
+        thread::spawn(|| {
+            let _ = handle_client(stream);
+        });
+    }
+    Ok(())
+}
+```
+
+## Task
+
+* create a log of the lengths of all lines coming from all streams
+* `let mut log = Vec::<usize>::new();`
+* `log.push(line.len());`
+
+## "Dream" API
+
+```rust ignore
+fn handle_client(stream: net::TcpStream, log: &mut Vec<usize>) -> Result<(), io::Error> {
+    // ...
+    for line in ... {
+        log.push(line.len());
+        // ...
+    }
+    Ok(())
+}
+
+fn main() -> Result<(), io::Error> {
+    let mut log = vec![];
+
+    for stream in listener.incoming() {
+        // ...
+        thread::spawn(|| {
+            let _ = handle_client(stream, &mut log);
+        });
+    }
+    Ok(())
+}
+```
+
+## Errors
+
+```text
+error[E0373]: closure may outlive the current function, but it borrows `log`,
+which is owned by the current function
+  --> src/main.rs:26:23
+   |
+26 |         thread::spawn(|| {
+   |                       ^^ may outlive borrowed value `log`
+27 |             let _ = handle_client(stream, &mut log);
+   |                                                --- `log` is borrowed here
+   |
+note: function requires argument type to outlive `'static`
+
+
+## Lifetime problem
+
+Problem:
+
+* local data may be cleaned up prematurely
+
+Solution:
+
+* move the decision when to clean up the data from compile time to runtime
+  * use reference counting
+
+## Attempt 1: `Rc`
+
+* `let mut log = Rc::new(vec![]);`
+* `let mut thread_log = log.clone()` now doesn't clone the data, but simply increases the reference count
+  * both variables now have an *owned* type, and satisfy the `F: 'static` requirement
+
+```text
+error[E0277]: `Rc<Vec<usize>>` cannot be sent between threads safely
+```
+
+## `Rc` in the Rust Standard Library
+
+* uses `usize` for reference counting
+* explicitly marked as `!Send`
+
+```rust ignore
+pub struct Rc<T> {
+    ptr: NonNull<RcBox<T>>,
+}
+
+impl<T> !Send for Rc<T> {}
+
+struct RcBox<T> {
+    strong: Cell<usize>,
+    weak: Cell<usize>,
+    value: T,
+}
+```
+
+## `Arc` in the Rust Standard Library
+
+* uses `AtomicUsize` for reference counting
+* explicitly marked as `Send`
+
+```rust ignore
+pub struct Arc<T> {
+    ptr: NonNull<ArcInner<T>>,
+}
+
+unsafe impl<T: Send + Sync> Send for Arc<T> {}
+
+struct ArcInner<T> {
+    strong: atomic::AtomicUsize,
+    weak: atomic::AtomicUsize,
+    data: T,
+}
+```
+
+## `Rc` vs `Arc`
+
+* `Arc` uses `AtomicUsize` for reference counting
+  * slower
+  * safe to increment / decrement from multiple threads
+* With the help of the marker trait `Send` and the trait bounds on `thread::spawn`, the compiler *forces* you to use the correct type
+
+## `Arc` / `Rc` "transparency"
+
+```rust ignore
+let mut log = Arc::new(Vec::new());
+// how does this code work?
+log.len();
+// and why doesn't this work?
+log.push(1);
+```
+
+## `Deref` and `DerefMut` traits
+
+```rust ignore
+pub trait Deref {
+    type Target: ?Sized;
+    fn deref(&self) -> &Self::Target;
+}
+
+pub trait DerefMut: Deref {
+    fn deref_mut(&mut self) -> &mut Self::Target;
+}
+```
+
+## `Deref` coercions
+
+* `Deref` can convert a `&self` reference to a reference of another type
+  * the conversion function call can be inserted by the compiler for you automatically
+  * in most cases the conversion is a no-op or a fixed pointer offset
+  * deref functions can be inlined
+* `Target` is an associated type
+  * can't `deref()` into multiple different types
+* `DerefMut: Deref` allows the latter to reuse the same `Target` type
+  * read-only and read-write references coerce to references of the same target type
+
+## `Arc` / `Rc` "transparency" with `Deref`
+
+```rust ignore
+let mut log = Arc::new(Vec::new());
+// Arc<T> implements `Deref` from `&Arc<T>` into `&T`
+log.len();
+// the same as
+Vec::len(<Arc<Vec<i32>> as Deref>::deref(&log));
+
+// Arc<T> DOES NOT implement `DerefMut`
+// log.push(1);
+
+// the line above would have expanded to:
+// Vec::push(<Arc<Vec<i32>> as DerefMut>::deref_mut(&mut log), 1);
+```
+
+## `Arc` and mutability
+
+* the lack of `impl DerefMut for Arc<T>` prevents accidental creation of multiple `&mut` references to the underlying data
+* the solution is to move the mutability decision to runtime
+
+```rust ignore
+let log = Arc::new(Mutex::new(Vec::new()));
+```
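
A runnable sketch (toy example, not from the original deck) of what this combination buys: the `Arc` keeps the vector alive for every thread, and the `Mutex` hands out the single `&mut` one thread at a time.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Each worker clones the `Arc` (bumping the atomic refcount) and
// obtains the one-and-only `&mut Vec` through `lock()` when it pushes.
fn parallel_push(workers: usize) -> Vec<usize> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let log = Arc::clone(&log);
            thread::spawn(move || log.lock().unwrap().push(i))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let mut data = log.lock().unwrap().clone();
    data.sort(); // thread completion order is nondeterministic
    data
}

fn main() {
    assert_eq!(parallel_push(4), vec![0, 1, 2, 3]);
}
```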
+
+* `Arc` guarantees availability of the data in memory
+  * prevents the memory from being cleaned up prematurely
+* `Mutex` guarantees exclusivity of mutable access
+  * provides a single `&mut` to the underlying data at a time
+
+## `Mutex` in Action
+
+* `log` is passed as `&` and is `deref`-ed from `Arc` by the compiler
+* `mut`ability is localized to a local `guard` variable
+  * the `Mutex::lock` method takes `&self`
+* `MutexGuard` implements `Deref` *and* `DerefMut`!
+* the `'_` lifetime annotation is needed only because the guard struct has a `&Mutex` inside
+
+```rust ignore
+fn handle_client(..., log: &Mutex<Vec<usize>>) -> ... {
+    for line in ... {
+        let mut guard: MutexGuard<'_, Vec<usize>> = log.lock().unwrap();
+        guard.push(line.len());
+        // line above expands to:
+        // Vec::push(<MutexGuard<'_, Vec<usize>> as DerefMut>::deref_mut(&mut guard), line.len());
+        writeln!(writer, "{}", line)?;
+        writer.flush()?;
+    }
+}
+```
+
+## `Mutex` locking and unlocking
+
+* we `lock` the mutex for exclusive access to the underlying data at runtime
+* old C APIs used a pair of functions to lock and unlock the mutex
+* `MutexGuard` does the unlocking automatically when it is dropped
+  * the time between guard creation and drop is called a *critical section*
+
+## Lock Poisoning
+
+* `MutexGuard` in its `Drop` implementation checks if it is being dropped normally or during a `panic` unwind
+  * in the latter case it sets a poison flag on the mutex
+* calling `lock().unwrap()` on a poisoned `Mutex` causes a `panic`
+  * if the mutex is *"popular"*, poisoning can cause many application threads to panic, too.
+* the poisoning API is *problematic*
+  * `PoisonError` doesn't provide information about the panic that caused the poisoning
+  * no way to recover and revive the mutex (it stays poisoned forever)
+  * `PoisonError::into_inner` *can* produce a guard even for poisoned mutexes
+
+## Critical Section "Hygiene"
+
+* keep it short to reduce the window during which the mutex is locked
+* avoid calling functions that can panic
+* using a named variable for the Mutex guard helps avoid unexpected temporary-lifetime behavior
+
+```rust ignore
+fn handle_client(..., log: &Mutex<Vec<usize>>) -> ... {
+    for line in ... {
+        {
+            let mut guard: MutexGuard<'_, Vec<usize>> = log.lock().unwrap();
+            guard.push(line.len());
+        } // critical section ends here, before all the IO
+        writeln!(writer, "{}", line)?;
+        writer.flush()?;
+    }
+}
+```
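
The poisoning behavior described above can be observed directly; a small sketch (ours, not from the slides) that uses `PoisonError::into_inner` to keep going after a worker panicked inside its critical section:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Returns the data recovered from a mutex that a panicking thread poisoned.
fn recover_after_panic() -> Vec<i32> {
    let shared = Arc::new(Mutex::new(vec![1, 2]));

    let worker = Arc::clone(&shared);
    // Panicking while the guard is alive drops it during unwinding,
    // which sets the poison flag on the mutex.
    let _ = thread::spawn(move || {
        let _guard = worker.lock().unwrap();
        panic!("poisoning the mutex");
    })
    .join();

    // `lock()` now returns `Err(PoisonError)`, so `unwrap()` would panic here,
    // but `into_inner` still hands out the guard, data intact.
    assert!(shared.lock().is_err());
    let guard = shared.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.clone()
}

fn main() {
    assert_eq!(recover_after_panic(), vec![1, 2]);
}
```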
+
+* `drop(guard)` also works, but the extra block nicely highlights the critical section
+
+## Lessons Learned
+
+* careful use of traits and trait bounds lets the compiler detect problematic multi-threading code at compile time
+* `Arc` and `Mutex` let the program ensure data availability and exclusive mutability at runtime, where the compiler can't predict the behavior of the program
+* `Deref` coercions make concurrency primitives virtually invisible and transparent to use
+* **Make invalid state unrepresentable**
+
+## Full Example
+
+```rust ignore
+use std::{
+    io::{self, BufRead as _, Write as _},
+    net,
+    sync::{Arc, Mutex},
+    thread,
+};
+
+fn handle_client(stream: net::TcpStream, log: &Mutex<Vec<usize>>) -> Result<(), io::Error> {
+    let mut writer = io::BufWriter::new(&stream);
+    let reader = io::BufReader::new(&stream);
+    for line in reader.lines() {
+        let line = line?;
+        {
+            let mut guard = log.lock().unwrap();
+            guard.push(line.len());
+        }
+        writeln!(writer, "{}", line)?;
+        writer.flush()?;
+    }
+    Ok(())
+}
+
+fn main() -> Result<(), io::Error> {
+    let log = Arc::new(Mutex::new(vec![]));
+    let listener = net::TcpListener::bind("0.0.0.0:7878")?;
+
+    for stream in listener.incoming() {
+        let stream = stream?;
+        let thread_log = log.clone();
+        thread::spawn(move || {
+            let _ = handle_client(stream, &thread_log);
+        });
+    }
+    Ok(())
+}
+```
diff --git a/training-slides/src/deconstructing-thread-scope.md b/training-slides/src/deconstructing-thread-scope.md
new file mode 100644
index 00000000..7d1311bc
--- /dev/null
+++ b/training-slides/src/deconstructing-thread-scope.md
@@ -0,0 +1,283 @@
+# Deconstructing `thread::scope`
+
+## `thread::scope` Example
+
+```rust
+fn main() {
+    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+    let (left, right) = data.split_at(data.len() / 2);
+    let (mut left_sum, mut right_sum) = (0, 0);
+    std::thread::scope(|s| {
+        s.spawn(|| {
+            left_sum = left.iter().sum();
+        });
+        s.spawn(|| {
+            right_sum = right.iter().sum();
+        });
+    });
+    println!("Total: {}", left_sum + right_sum);
+}
+```
+
+## `thread::scope`
+
+```rust ignore
+pub fn scope<'env, F, T>(f: F) -> T
+where
+    F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
+{
+    // ...
+}
+```
+
+we will call `f` a **Spawner** closure
+
+## `Scope`
+
+```rust ignore
+pub struct Scope<'scope, 'env: 'scope> {
+    data: Arc<ScopeData>,
+    scope: PhantomData<&'scope mut &'scope ()>,
+    env: PhantomData<&'env mut &'env ()>,
+}
+```
+
+## `Scope::spawn`
+
+```rust ignore
+impl<'scope, 'env> Scope<'scope, 'env> {
+    pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
+    where
+        F: FnOnce() -> T + Send + 'scope,
+        T: Send + 'scope,
+    {
+        // ...
+    }
+}
+```
+
+we'll call `f` a **Thread** closure
+
+## Closures
+
+```rust
+fn main() {
+    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+    let (left, right) = data.split_at(data.len() / 2);
+    let (mut left_sum, mut right_sum) = (0, 0);
+    std::thread::scope(|s| { // <- Spawner closure
+        s.spawn(|| { // <- Thread closure
+            left_sum = left.iter().sum();
+        });
+        s.spawn(|| { // <- Thread closure
+            right_sum = right.iter().sum();
+        });
+    });
+    println!("Total: {}", left_sum + right_sum);
+}
+```
+
+## Meet lifetime annotations
+
+* `F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T`
+  * `'scope` is a *Higher-Rank Trait Bound*: it describes *all possible* lifetimes that the closure can observe
+* `fn scope<'env, F, T>(f: F) -> T`
+  * the function observes *some* lifetime `'env`
+* `struct Scope<'scope, 'env: 'scope>`
+  * lifetime `'env` outlives `'scope`
+
+## Relationship 1
+
+All possible `'scope` lifetimes for the **Spawner** closure *cannot outlive* the `'env` lifetime that the `scope` function observes.
+
+Upvars with references in the **Spawner** closure *cannot outlive* the data referenced by the `'env` annotation
+
+## Thread closure
+
+```text
+F: FnOnce() -> T + Send + 'scope,
+T: Send + 'scope
+```
+
+* Data passed to and from the child thread
+  * should be `Send`
+  * if it has references to surrounding data, they should stay valid *at least* for the whole duration of `'scope`
+  * `&'scope Scope` in the **Spawner** signature allows nested calls to `spawn`!
+
+## Relationship 2
+
+By the time `'scope` is over, all calls to `Scope::spawn` are over and all **Thread** closures are completed.
+
+## Practical implications
+
+* the **Spawner** closure can finish earlier than the **Thread** closures
+  * **Thread** closures can't use upvars from the **Spawner** without moving them
+* both the **Spawner** and all **Thread** closures are completed before the call to the `scope` function returns
+  * both can take upvars from the code before the `scope()` call without moving
+
+## Relationships in action
+
+```rust ignore
+fn main() {
+    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+    let (left, right) = data.split_at(data.len() / 2);
+    let mut left_sum = 0; // Ok
+    std::thread::scope(|s| {
+        s.spawn(|| {
+            left_sum = left.iter().sum();
+        });
+        // ERROR: closure may outlive the current function
+        let mut right_sum = 0;
+        s.spawn(|| {
+            right_sum = right.iter().sum();
+        });
+    });
+    // println!("Total: {}", left_sum + right_sum);
+}
+```
+
+## Using `join`
+
+```rust
+fn main() {
+    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+    let (left, right) = data.split_at(data.len() / 2);
+    let mut left_sum = 0;
+    let right_sum = std::thread::scope(|s| {
+        s.spawn(|| {
+            left_sum = left.iter().sum();
+        });
+
+        let handle = s.spawn(|| {
+            right.iter().sum()
+        });
+        let right_sum: i32 = handle.join().unwrap();
+        right_sum
+    });
+    println!("Total: {}", left_sum + right_sum);
+}
+```
+
+## How does the `scope` function wait for all threads to finish?
+
+```rust ignore
+pub fn scope<'env, F, T>(f: F) -> T {
+    let scope = Scope {
+        data: Arc::new(ScopeData {
+            num_running_threads: AtomicUsize::new(0),
+            main_thread: current(),
+            a_thread_panicked: AtomicBool::new(false),
+        }),
+        // ...
+    };
+
+    let result = catch_unwind(AssertUnwindSafe(|| f(&scope)));
+
+    // Wait until all the threads are finished.
+    while scope.data.num_running_threads.load(Ordering::Acquire) != 0 {
+        park();
+    }
+    // ...
+}
+```
+
+## (1) Who calls `unpark()`?
+
+```rust ignore
+impl ScopeData {
+    pub(super) fn decrement_num_running_threads(&self, panic: bool) {
+        // ...
+        // fetch_sub returns the previous value
+        if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 {
+            self.main_thread.unpark();
+        }
+    }
+}
+```
+
+## (2) Who calls `unpark()`?
+
+```rust ignore
+impl<'scope, T> Drop for Packet<'scope, T> {
+    fn drop(&mut self) {
+        // ...
+        if let Some(scope) = &self.scope {
+            scope.decrement_num_running_threads(/* ... */);
+        }
+    }
+}
+```
+
+`Packet` is a mechanism to pass panics and results from a thread closure to the parent thread.
+
+## `thread::scope()`
+
+* function completion is controlled by a single atomic counter in an `Arc`
+* local read-only data can be safely shared across threads
+* locking is only needed for safe mutable access
+* access rules are checked at compile time by the type system
+
+`thread::spawn` - safe concurrency
+`thread::scope` - safe *ergonomic* concurrency
+
+## When to use what?
+
+1. `thread::scope` is your *default* choice
+2. `thread::spawn` for threads that run forever (background jobs, resource management, etc.)
+3. `thread::spawn` for short-lived threads that don't have a definite join point (fire and forget)
+
+## TCP server with `thread::spawn`
+
+```rust ignore
+fn handle_client(stream: net::TcpStream, log: &Mutex<Vec<usize>>) -> Result<(), io::Error> {
+    // ...
+}
+
+fn main() -> Result<(), io::Error> {
+    // Need an Arc to control resource availability at runtime
+    let log = Arc::new(Mutex::new(vec![]));
+    let listener = net::TcpListener::bind("0.0.0.0:7878")?;
+
+    for stream in listener.incoming() {
+        let stream = stream?;
+        // `Arc`s need explicit cloning
+        let thread_log = log.clone();
+        thread::spawn(move || {
+            let _ = handle_client(stream, &thread_log);
+        });
+    }
+    Ok(())
+}
+```
+
+## TCP server with `thread::scope`
+
+```rust ignore
+fn handle_client(stream: net::TcpStream, log: &Mutex<Vec<usize>>) -> Result<(), io::Error> {
+    // ...
+}
+
+fn main() -> Result<(), io::Error> {
+    // The compiler can deduce availability at compile time
+    // No need for runtime reference counting
+    let log = Mutex::new(vec![]);
+    let listener = net::TcpListener::bind("0.0.0.0:7878")?;
+
+    thread::scope(|s| {
+        for stream in listener.incoming() {
+            let stream = stream?;
+            // resources are shareable as is
+            s.spawn(|| {
+                let _ = handle_client(stream, &log);
+            });
+        }
+        Ok(())
+    })
+}
+```
+
+## Further Research
+
+* returned-value propagation from child threads to the parent thread
+* panic propagation
+* covariance: `scope: PhantomData<&'scope mut &'scope ()>`
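
For the first research item, a starting sketch (ours, not from the slides): with scoped threads, returned values travel back through `ScopedJoinHandle::join` exactly as with `thread::spawn`, while the borrows of `data` need no `Arc` at all.

```rust
use std::thread;

// Sums the two halves of a slice on two scoped threads.
// The threads borrow `left` and `right` directly; `scope` guarantees
// they finish before the borrows expire.
fn split_sum(data: &[i32]) -> i32 {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        let left_handle = s.spawn(|| left.iter().sum::<i32>());
        let right_handle = s.spawn(|| right.iter().sum::<i32>());
        // `join` returns `Err` if that child panicked; a panic in an
        // unjoined child would instead be re-raised by `scope` itself.
        left_handle.join().unwrap() + right_handle.join().unwrap()
    })
}

fn main() {
    assert_eq!(split_sum(&[1, 2, 3, 4]), 10);
}
```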