Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions embassy-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod once_lock;
pub mod pipe;
pub mod priority_channel;
pub mod pubsub;
pub mod resource_pool;
pub mod rwlock;
pub mod semaphore;
pub mod signal;
Expand Down
192 changes: 192 additions & 0 deletions embassy-sync/src/resource_pool.rs
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure to add tests & more docs

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, absolutely!

Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
//! A collection of objects that may be shared between tasks.
//!
//! Multiple tasks may share a reference to the pool and acquire resources when required.
//! Acquired resources may be kept or moved between tasks before they are released.
use core::cell::RefCell;
use core::future::poll_fn;
use core::marker::PhantomData;
use core::ops::{Deref, DerefMut};
use core::task::Poll;

use heapless::Vec;

use crate::blocking_mutex::Mutex;
use crate::blocking_mutex::raw::RawMutex;
use crate::waitqueue::WakerRegistration;

/// Resource pool
pub struct ResourcePool<'a, M: RawMutex, T, const N: usize> {
buf: BufferPtr<T>,
phantom: PhantomData<&'a mut T>,
state: Mutex<M, RefCell<State<N>>>,
}

impl<'a, M: RawMutex, T, const N: usize> ResourcePool<'a, M, T, N> {
/// Crate a new resource pool, taking an array of resources which will be managed.
pub fn new(buf: &'a mut [T]) -> Self {
let mut available = Vec::new();
available.extend(0..buf.len());
Self {
buf: BufferPtr(buf.as_mut_ptr()),
phantom: PhantomData,
state: Mutex::new(RefCell::new(State {
available,
waker: WakerRegistration::new(),
})),
}
}

/// Attempt to acquire one instance of the resource.
///
/// If no instance is available, return None immediately.
pub fn try_take<'guard>(&'guard self) -> Option<ResourceGuard<'guard, 'a, M, T, N>> {
self.state.lock(|state| {
let state = &mut *state.borrow_mut();
let index = state.available.pop()?;
Some(ResourceGuard { store: self, index })
})
}

/// Acquire one instance of the resource.
///
/// If no instance is available, wait for an instance to be returned to the pool.
pub fn take<'guard>(&'guard self) -> impl Future<Output = ResourceGuard<'guard, 'a, M, T, N>> {
poll_fn(|cx| {
self.state.lock(|state| {
let state = &mut *state.borrow_mut();
let Some(index) = state.available.pop() else {
state.waker.register(cx.waker());
return Poll::Pending;
};
Poll::Ready(ResourceGuard { store: self, index })
})
})
}
}

#[repr(transparent)]
#[derive(Debug)]
struct BufferPtr<T: ?Sized>(*mut T);

unsafe impl<T: ?Sized> Send for BufferPtr<T> {}
unsafe impl<T: ?Sized> Sync for BufferPtr<T> {}

struct State<const N: usize> {
available: Vec<usize, N>,
Comment thread
diondokter marked this conversation as resolved.
waker: WakerRegistration,
}

/// Resource guard
///
/// Owning this guard provides mutable access to the underlying resource.
///
/// Dropping the guard returns the resource back to the pool.
pub struct ResourceGuard<'guard, 'buffer, M: RawMutex, T, const N: usize> {
store: &'guard ResourcePool<'buffer, M, T, N>,
index: usize,
}

impl<'guard, 'buffer, M: RawMutex, T, const N: usize> Drop for ResourceGuard<'guard, 'buffer, M, T, N> {
fn drop(&mut self) {
self.store.state.lock(|state| {
let state = &mut *state.borrow_mut();
state.available.push(self.index).unwrap();
state.waker.wake();
});
}
}

impl<'guard, 'buffer, M: RawMutex, T, const N: usize> Deref for ResourceGuard<'guard, 'buffer, M, T, N> {
type Target = T;

fn deref(&self) -> &Self::Target {
unsafe { &*self.store.buf.0.add(self.index) }
}
}

impl<'guard, 'buffer, M: RawMutex, T, const N: usize> DerefMut for ResourceGuard<'guard, 'buffer, M, T, N> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.store.buf.0.add(self.index) }
}
}

impl<'guard, 'buffer, M: RawMutex, T, const N: usize> ResourceGuard<'guard, 'buffer, M, T, N> {
/// maps the value contained to another, referencing the original value. Does not take "self" to avoid shadowing any functions of the wrapped type.
pub fn map<U: ?Sized>(
orig: Self,
fun: impl FnOnce(&mut T) -> &mut U,
) -> MappedResourceGuard<'guard, 'buffer, M, T, U, N> {
Comment on lines +115 to +118
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a weird API to me... When would you use this?
Especially the &mut U is weird to me. Where would the U be stored? In the closure? But then you get lifetime issues.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

embassy is already doing this with MutexGuard. It allows you to "narrow-down" a reference, eg. from a slice to only a sub-slice.

The main use I see here would be taking a buffer (eg. &mut [u8; 1024]), reading some data into it and then passing on a slice (&buffer[..n_bytes_read]) to the next part of your software. This works because the rust compiler will use fat pointers for pointers to slices (eg *mut/const [u8])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm I guess. It's fine to have and the impl is fine. Just doesn't seem that useful to me

let store = orig.store;
let index = orig.index;
let value = fun(unsafe { &mut *store.buf.0.add(index) });
// Don't run the `drop` method for MutexGuard. The ownership of the underlying
// locked state is being moved to the returned MappedMutexGuard.
core::mem::forget(orig);
MappedResourceGuard {
store,
value: BufferPtr(value),
index,
}
}
}

/// Resource guard
///
/// Owning this guard provides mutable access to the underlying resource.
///
/// Dropping the guard returns the resource back to the pool.
pub struct MappedResourceGuard<'guard, 'buffer, M: RawMutex, T, U: ?Sized, const N: usize> {
store: &'guard ResourcePool<'buffer, M, T, N>,
index: usize,
value: BufferPtr<U>,
}

impl<'guard, 'buffer, M: RawMutex, T, U: ?Sized, const N: usize> Drop
for MappedResourceGuard<'guard, 'buffer, M, T, U, N>
{
fn drop(&mut self) {
self.store.state.lock(|state| {
let state = &mut *state.borrow_mut();
state.available.push(self.index).unwrap();
state.waker.wake();
});
}
}

impl<'guard, 'buffer, M: RawMutex, T, U: ?Sized, const N: usize> Deref
for MappedResourceGuard<'guard, 'buffer, M, T, U, N>
{
type Target = U;

fn deref(&self) -> &Self::Target {
unsafe { &*self.value.0 }
}
}

impl<'guard, 'buffer, M: RawMutex, T, U: ?Sized, const N: usize> DerefMut
for MappedResourceGuard<'guard, 'buffer, M, T, U, N>
{
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.value.0 }
}
}

impl<'guard, 'buffer, M: RawMutex, T, U: ?Sized, const N: usize> MappedResourceGuard<'guard, 'buffer, M, T, U, N> {
/// maps the value contained to another, referencing the original value. Does not take "self" to avoid shadowing any functions of the wrapped type.
pub fn map<V: ?Sized>(
orig: Self,
fun: impl FnOnce(&mut U) -> &mut V,
) -> MappedResourceGuard<'guard, 'buffer, M, T, V, N> {
let store = orig.store;
let index = orig.index;
let value = fun(unsafe { &mut *orig.value.0 });
// Don't run the `drop` method for MutexGuard. The ownership of the underlying
// locked state is being moved to the returned MappedMutexGuard.
core::mem::forget(orig);
MappedResourceGuard {
store,
value: BufferPtr(value),
index,
}
}
}
87 changes: 87 additions & 0 deletions examples/stm32g4/src/bin/resource_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#![no_std]
#![no_main]

use core::fmt::Write;

use defmt::info;
use embassy_executor::Spawner;
use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex;
use embassy_sync::channel::Channel;
use embassy_sync::resource_pool::{MappedResourceGuard, ResourceGuard, ResourcePool};
use embassy_time::Timer;
use heapless::String;
use static_cell::{ConstStaticCell, StaticCell};
use {defmt_rtt as _, panic_probe as _};

const N_BUFFERS: usize = 3;
const N_BYTES: usize = 256;

static BUFFERS: ConstStaticCell<[String<N_BYTES>; N_BUFFERS]> =
ConstStaticCell::new([String::new(), String::new(), String::new()]);

static SHARED_CHANNEL: Channel<
ThreadModeRawMutex,
MappedResourceGuard<'static, 'static, ThreadModeRawMutex, String<N_BYTES>, str, N_BUFFERS>,
8,
> = Channel::new();

#[embassy_executor::main]
async fn main(spawner: Spawner) {
let _p = embassy_stm32::init(Default::default());

static POOL: StaticCell<ResourcePool<'static, ThreadModeRawMutex, String<N_BYTES>, N_BUFFERS>> = StaticCell::new();
let pool = POOL.init(ResourcePool::new(BUFFERS.take()));

spawner.spawn(produce_data(pool, 0).unwrap());
Timer::after_millis(100).await;
spawner.spawn(produce_data(pool, 1).unwrap());
Timer::after_millis(100).await;
spawner.spawn(produce_data(pool, 2).unwrap());
Timer::after_millis(100).await;
spawner.spawn(produce_data(pool, 3).unwrap());

info!("started producers");

let receiver = SHARED_CHANNEL.receiver();

loop {
let guard = receiver.receive().await;

defmt::info!("received: {} at addr {}", &*guard, guard.as_ptr() as usize);

// keep buffer for a while so it is not immediately returned to the pool
Timer::after_millis(1500).await;

// extra verbose, this happens automatically
// core::mem::drop(guard);
}
}

#[embassy_executor::task(pool_size = 4)]
async fn produce_data(pool: &'static ResourcePool<'static, ThreadModeRawMutex, String<N_BYTES>, N_BUFFERS>, num: u32) {
let sender = SHARED_CHANNEL.sender();

let mut n = 0;
loop {
Timer::after_secs(3).await;

// acquire one buffer
let mut guard = pool.take().await;

// write to buffer
guard.clear();
write!(&mut *guard, "hello {} from task {}", n, num).unwrap();

// map
let guard = ResourceGuard::map(guard, |g| g.as_mut_str());

let addr = guard.as_ptr() as usize;

// send buffer to main loop
sender.try_send(guard).ok().unwrap();

info!("task {} sent buffer with addr {}", num, addr);

n += 1;
}
}