diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 1cfde8b106..4a92e97662 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -19,6 +19,7 @@ pub mod once_lock; pub mod pipe; pub mod priority_channel; pub mod pubsub; +pub mod resource_pool; pub mod rwlock; pub mod semaphore; pub mod signal; diff --git a/embassy-sync/src/resource_pool.rs b/embassy-sync/src/resource_pool.rs new file mode 100644 index 0000000000..a6b1113e5d --- /dev/null +++ b/embassy-sync/src/resource_pool.rs @@ -0,0 +1,192 @@ +//! A collection of objects that may be shared between tasks. +//! +//! Multiple tasks may share a reference to the pool and acquire resources when required. +//! Acquired resources may be kept or moved between tasks before they are released. +use core::cell::RefCell; +use core::future::poll_fn; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::task::Poll; + +use heapless::Vec; + +use crate::blocking_mutex::Mutex; +use crate::blocking_mutex::raw::RawMutex; +use crate::waitqueue::WakerRegistration; + +/// Resource pool +pub struct ResourcePool<'a, M: RawMutex, T, const N: usize> { + buf: BufferPtr, + phantom: PhantomData<&'a mut T>, + state: Mutex>>, +} + +impl<'a, M: RawMutex, T, const N: usize> ResourcePool<'a, M, T, N> { + /// Crate a new resource pool, taking an array of resources which will be managed. + pub fn new(buf: &'a mut [T]) -> Self { + let mut available = Vec::new(); + available.extend(0..buf.len()); + Self { + buf: BufferPtr(buf.as_mut_ptr()), + phantom: PhantomData, + state: Mutex::new(RefCell::new(State { + available, + waker: WakerRegistration::new(), + })), + } + } + + /// Attempt to acquire one instance of the resource. + /// + /// If no instance is available, return None immediately. + pub fn try_take<'guard>(&'guard self) -> Option> { + self.state.lock(|state| { + let state = &mut *state.borrow_mut(); + let index = state.available.pop()?; + Some(ResourceGuard { store: self, index }) + }) + } + + /// Acquire one instance of the resource. + /// + /// If no instance is available, wait for an instance to be returned to the pool. + pub fn take<'guard>(&'guard self) -> impl Future> { + poll_fn(|cx| { + self.state.lock(|state| { + let state = &mut *state.borrow_mut(); + let Some(index) = state.available.pop() else { + state.waker.register(cx.waker()); + return Poll::Pending; + }; + Poll::Ready(ResourceGuard { store: self, index }) + }) + }) + } +} + +#[repr(transparent)] +#[derive(Debug)] +struct BufferPtr(*mut T); + +unsafe impl Send for BufferPtr {} +unsafe impl Sync for BufferPtr {} + +struct State { + available: Vec, + waker: WakerRegistration, +} + +/// Resource guard +/// +/// Owning this guard provides mutable access to the underlying resource. +/// +/// Dropping the guard returns the resource back to the pool. +pub struct ResourceGuard<'guard, 'buffer, M: RawMutex, T, const N: usize> { + store: &'guard ResourcePool<'buffer, M, T, N>, + index: usize, +} + +impl<'guard, 'buffer, M: RawMutex, T, const N: usize> Drop for ResourceGuard<'guard, 'buffer, M, T, N> { + fn drop(&mut self) { + self.store.state.lock(|state| { + let state = &mut *state.borrow_mut(); + state.available.push(self.index).unwrap(); + state.waker.wake(); + }); + } +} + +impl<'guard, 'buffer, M: RawMutex, T, const N: usize> Deref for ResourceGuard<'guard, 'buffer, M, T, N> { + type Target = T; + + fn deref(&self) -> &Self::Target { + unsafe { &*self.store.buf.0.add(self.index) } + } +} + +impl<'guard, 'buffer, M: RawMutex, T, const N: usize> DerefMut for ResourceGuard<'guard, 'buffer, M, T, N> { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *self.store.buf.0.add(self.index) } + } +} + +impl<'guard, 'buffer, M: RawMutex, T, const N: usize> ResourceGuard<'guard, 'buffer, M, T, N> { + /// maps the value contained to another, referencing the original value. Does not take "self" to avoid shadowing any functions of the wrapped type. + pub fn map( + orig: Self, + fun: impl FnOnce(&mut T) -> &mut U, + ) -> MappedResourceGuard<'guard, 'buffer, M, T, U, N> { + let store = orig.store; + let index = orig.index; + let value = fun(unsafe { &mut *store.buf.0.add(index) }); + // Don't run the `drop` method for MutexGuard. The ownership of the underlying + // locked state is being moved to the returned MappedMutexGuard. + core::mem::forget(orig); + MappedResourceGuard { + store, + value: BufferPtr(value), + index, + } + } +} + +/// Resource guard +/// +/// Owning this guard provides mutable access to the underlying resource. +/// +/// Dropping the guard returns the resource back to the pool. +pub struct MappedResourceGuard<'guard, 'buffer, M: RawMutex, T, U: ?Sized, const N: usize> { + store: &'guard ResourcePool<'buffer, M, T, N>, + index: usize, + value: BufferPtr, +} + +impl<'guard, 'buffer, M: RawMutex, T, U: ?Sized, const N: usize> Drop + for MappedResourceGuard<'guard, 'buffer, M, T, U, N> +{ + fn drop(&mut self) { + self.store.state.lock(|state| { + let state = &mut *state.borrow_mut(); + state.available.push(self.index).unwrap(); + state.waker.wake(); + }); + } +} + +impl<'guard, 'buffer, M: RawMutex, T, U: ?Sized, const N: usize> Deref + for MappedResourceGuard<'guard, 'buffer, M, T, U, N> +{ + type Target = U; + + fn deref(&self) -> &Self::Target { + unsafe { &*self.value.0 } + } +} + +impl<'guard, 'buffer, M: RawMutex, T, U: ?Sized, const N: usize> DerefMut + for MappedResourceGuard<'guard, 'buffer, M, T, U, N> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *self.value.0 } + } +} + +impl<'guard, 'buffer, M: RawMutex, T, U: ?Sized, const N: usize> MappedResourceGuard<'guard, 'buffer, M, T, U, N> { + /// maps the value contained to another, referencing the original value. Does not take "self" to avoid shadowing any functions of the wrapped type. + pub fn map( + orig: Self, + fun: impl FnOnce(&mut U) -> &mut V, + ) -> MappedResourceGuard<'guard, 'buffer, M, T, V, N> { + let store = orig.store; + let index = orig.index; + let value = fun(unsafe { &mut *orig.value.0 }); + // Don't run the `drop` method for MutexGuard. The ownership of the underlying + // locked state is being moved to the returned MappedMutexGuard. + core::mem::forget(orig); + MappedResourceGuard { + store, + value: BufferPtr(value), + index, + } + } +} diff --git a/examples/stm32g4/src/bin/resource_pool.rs b/examples/stm32g4/src/bin/resource_pool.rs new file mode 100644 index 0000000000..7c714c45f9 --- /dev/null +++ b/examples/stm32g4/src/bin/resource_pool.rs @@ -0,0 +1,87 @@ +#![no_std] +#![no_main] + +use core::fmt::Write; + +use defmt::info; +use embassy_executor::Spawner; +use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; +use embassy_sync::channel::Channel; +use embassy_sync::resource_pool::{MappedResourceGuard, ResourceGuard, ResourcePool}; +use embassy_time::Timer; +use heapless::String; +use static_cell::{ConstStaticCell, StaticCell}; +use {defmt_rtt as _, panic_probe as _}; + +const N_BUFFERS: usize = 3; +const N_BYTES: usize = 256; + +static BUFFERS: ConstStaticCell<[String; N_BUFFERS]> = + ConstStaticCell::new([String::new(), String::new(), String::new()]); + +static SHARED_CHANNEL: Channel< + ThreadModeRawMutex, + MappedResourceGuard<'static, 'static, ThreadModeRawMutex, String, str, N_BUFFERS>, + 8, +> = Channel::new(); + +#[embassy_executor::main] +async fn main(spawner: Spawner) { + let _p = embassy_stm32::init(Default::default()); + + static POOL: StaticCell, N_BUFFERS>> = StaticCell::new(); + let pool = POOL.init(ResourcePool::new(BUFFERS.take())); + + spawner.spawn(produce_data(pool, 0).unwrap()); + Timer::after_millis(100).await; + spawner.spawn(produce_data(pool, 1).unwrap()); + Timer::after_millis(100).await; + spawner.spawn(produce_data(pool, 2).unwrap()); + Timer::after_millis(100).await; + spawner.spawn(produce_data(pool, 3).unwrap()); + + info!("started producers"); + + let receiver = SHARED_CHANNEL.receiver(); + + loop { + let guard = receiver.receive().await; + + defmt::info!("received: {} at addr {}", &*guard, guard.as_ptr() as usize); + + // keep buffer for a while so it is not immediately returned to the pool + Timer::after_millis(1500).await; + + // extra verbose, this happens automatically + // core::mem::drop(guard); + } +} + +#[embassy_executor::task(pool_size = 4)] +async fn produce_data(pool: &'static ResourcePool<'static, ThreadModeRawMutex, String, N_BUFFERS>, num: u32) { + let sender = SHARED_CHANNEL.sender(); + + let mut n = 0; + loop { + Timer::after_secs(3).await; + + // acquire one buffer + let mut guard = pool.take().await; + + // write to buffer + guard.clear(); + write!(&mut *guard, "hello {} from task {}", n, num).unwrap(); + + // map + let guard = ResourceGuard::map(guard, |g| g.as_mut_str()); + + let addr = guard.as_ptr() as usize; + + // send buffer to main loop + sender.try_send(guard).ok().unwrap(); + + info!("task {} sent buffer with addr {}", num, addr); + + n += 1; + } +}