Skip to content

Commit

Permalink
Require Send for T in Queue instead of Copy
Browse files Browse the repository at this point in the history
By requiring items to be `Copy` we reduce the queue to only work with
primitives. What is more appropriate here is to require that the item
is `Send` and use move semantics instead of copy.
  • Loading branch information
eivindbergem committed Aug 23, 2024
1 parent 59b0a36 commit 2267004
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 46 deletions.
42 changes: 21 additions & 21 deletions freertos-rust/src/patterns/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ pub trait ReplyableMessage {
#[derive(Copy, Clone)]
pub struct InputMessage<I>
where
I: Copy,
I: Copy + Send,
{
val: I,
reply_to_client_id: Option<usize>,
}

impl<I> InputMessage<I>
where
I: Copy,
I: Copy + Send,
{
pub fn request(val: I) -> Self {
InputMessage {
Expand All @@ -46,7 +46,7 @@ where

impl<I> ReplyableMessage for InputMessage<I>
where
I: Copy,
I: Copy + Send,
{
fn reply_to_client_id(&self) -> Option<usize> {
self.reply_to_client_id
Expand All @@ -55,17 +55,17 @@ where

pub struct Processor<I, O>
where
I: ReplyableMessage + Copy,
O: Copy,
I: ReplyableMessage + Copy + Send,
O: Copy + Send,
{
queue: Arc<Queue<I>>,
inner: Arc<Mutex<ProcessorInner<O>>>,
}

impl<I, O> Processor<I, O>
where
I: ReplyableMessage + Copy,
O: Copy,
I: ReplyableMessage + Copy + Send,
O: Copy + Send,
{
pub fn new(queue_size: usize) -> Result<Self, FreeRtosError> {
let p = ProcessorInner {
Expand Down Expand Up @@ -152,8 +152,8 @@ where

impl<I, O> Processor<InputMessage<I>, O>
where
I: Copy,
O: Copy,
I: Copy + Send,
O: Copy + Send,
{
pub fn reply_val<D: DurationTicks>(
&self,
Expand All @@ -167,15 +167,15 @@ where

struct ProcessorInner<O>
where
O: Copy,
O: Copy + Send,
{
clients: Vec<(usize, Weak<ClientWithReplyQueue<O>>)>,
next_client_id: usize,
}

impl<O> ProcessorInner<O>
where
O: Copy,
O: Copy + Send,
{
fn remove_client_reply(&mut self, client: &ClientWithReplyQueue<O>) {
self.clients.retain(|ref x| x.0 != client.id)
Expand All @@ -184,15 +184,15 @@ where

pub struct ProcessorClient<I, C>
where
I: ReplyableMessage + Copy,
I: ReplyableMessage + Copy + Send,
{
processor_queue: Weak<Queue<I>>,
client_reply: C,
}

impl<I, O> ProcessorClient<I, O>
where
I: ReplyableMessage + Copy,
I: ReplyableMessage + Copy + Send,
{
pub fn send<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<(), FreeRtosError> {
let processor_queue = self
Expand All @@ -218,7 +218,7 @@ where

impl<I> ProcessorClient<InputMessage<I>, ()>
where
I: Copy,
I: Copy + Send,
{
pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
self.send(InputMessage::request(val), max_wait)
Expand All @@ -235,8 +235,8 @@ where

impl<I, O> ProcessorClient<I, SharedClientWithReplyQueue<O>>
where
I: ReplyableMessage + Copy,
O: Copy,
I: ReplyableMessage + Copy + Send,
O: Copy + Send,
{
pub fn call<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<O, FreeRtosError> {
self.send(message, max_wait)?;
Expand All @@ -250,8 +250,8 @@ where

impl<I, O> ProcessorClient<InputMessage<I>, SharedClientWithReplyQueue<O>>
where
I: Copy,
O: Copy,
I: Copy + Send,
O: Copy + Send,
{
pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
self.send(InputMessage::request(val), max_wait)
Expand All @@ -268,7 +268,7 @@ where

impl<I, C> Clone for ProcessorClient<I, C>
where
I: ReplyableMessage + Copy,
I: ReplyableMessage + Copy + Send,
C: Clone,
{
fn clone(&self) -> Self {
Expand All @@ -281,7 +281,7 @@ where

pub struct ClientWithReplyQueue<O>
where
O: Copy,
O: Copy + Send,
{
id: usize,
processor_inner: Arc<Mutex<ProcessorInner<O>>>,
Expand All @@ -290,7 +290,7 @@ where

impl<O> Drop for ClientWithReplyQueue<O>
where
O: Copy,
O: Copy + Send,
{
fn drop(&mut self) {
if let Ok(mut p) = self.processor_inner.lock(Duration::ms(1000)) {
Expand Down
18 changes: 9 additions & 9 deletions freertos-rust/src/patterns/pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use crate::queue::*;
use crate::units::*;

/// A pub-sub queue. An item sent to the publisher is sent to every subscriber.
pub struct QueuePublisher<T: Sized + Copy> {
pub struct QueuePublisher<T: Sized + Copy + Send> {
inner: Arc<Mutex<PublisherInner<T>>>,
}

/// A subscribtion to the publisher.
pub struct QueueSubscriber<T: Sized + Copy> {
pub struct QueueSubscriber<T: Sized + Copy + Send> {
inner: Arc<SubscriberInner<T>>,
}

impl<T: Sized + Copy> QueuePublisher<T> {
impl<T: Sized + Copy + Send> QueuePublisher<T> {
/// Create a new publisher
pub fn new() -> Result<QueuePublisher<T>, FreeRtosError> {
let inner = PublisherInner {
Expand Down Expand Up @@ -69,41 +69,41 @@ impl<T: Sized + Copy> QueuePublisher<T> {
}
}

impl<T: Sized + Copy> Clone for QueuePublisher<T> {
impl<T: Sized + Copy + Send> Clone for QueuePublisher<T> {
fn clone(&self) -> Self {
QueuePublisher {
inner: self.inner.clone(),
}
}
}

impl<T: Sized + Copy> Drop for QueueSubscriber<T> {
impl<T: Sized + Copy + Send> Drop for QueueSubscriber<T> {
fn drop(&mut self) {
if let Ok(mut l) = self.inner.publisher.lock(Duration::infinite()) {
l.unsubscribe(&self.inner);
}
}
}

impl<T: Sized + Copy> QueueSubscriber<T> {
impl<T: Sized + Copy + Send> QueueSubscriber<T> {
/// Wait for an item to be posted from the publisher.
pub fn receive<D: DurationTicks>(&self, max_wait: D) -> Result<T, FreeRtosError> {
self.inner.queue.receive(max_wait)
}
}

struct PublisherInner<T: Sized + Copy> {
struct PublisherInner<T: Sized + Copy + Send> {
subscribers: Vec<Arc<SubscriberInner<T>>>,
queue_next_id: usize,
}

impl<T: Sized + Copy> PublisherInner<T> {
impl<T: Sized + Copy + Send> PublisherInner<T> {
fn unsubscribe(&mut self, subscriber: &SubscriberInner<T>) {
self.subscribers.retain(|ref x| x.id != subscriber.id);
}
}

struct SubscriberInner<T: Sized + Copy> {
struct SubscriberInner<T: Sized + Copy + Send> {
id: usize,
queue: Queue<T>,
publisher: Arc<Mutex<PublisherInner<T>>>,
Expand Down
40 changes: 24 additions & 16 deletions freertos-rust/src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
use mem::MaybeUninit;

use crate::base::*;
use crate::isr::*;
use crate::prelude::v1::*;
use crate::shim::*;
use crate::units::*;

unsafe impl<T: Sized + Copy> Send for Queue<T> {}
unsafe impl<T: Sized + Copy> Sync for Queue<T> {}
unsafe impl<T: Sized + Send> Send for Queue<T> {}
unsafe impl<T: Sized + Send> Sync for Queue<T> {}

/// A queue with a finite size. The items are owned by the queue and are
/// copied.
/// A queue with a finite size.
#[derive(Debug)]
pub struct Queue<T: Sized + Copy> {
pub struct Queue<T: Sized + Send> {
queue: FreeRtosQueueHandle,
item_type: PhantomData<T>,
}

impl<T: Sized + Copy> Queue<T> {
impl<T: Sized + Send> Queue<T> {
pub fn new(max_size: usize) -> Result<Queue<T>, FreeRtosError> {
let item_size = mem::size_of::<T>();

Expand Down Expand Up @@ -50,13 +51,13 @@ impl<T: Sized + Copy> Queue<T> {

/// Send an item to the end of the queue. Wait for the queue to have empty space for it.
pub fn send<D: DurationTicks>(&self, item: T, max_wait: D) -> Result<(), FreeRtosError> {
let ptr = &item as *const _ as FreeRtosVoidPtr;

// Forget item to avoid calling `drop`
core::mem::forget(item);

unsafe {
if freertos_rs_queue_send(
self.queue,
&item as *const _ as FreeRtosVoidPtr,
max_wait.to_ticks(),
) != 0
{
if freertos_rs_queue_send(self.queue, ptr, max_wait.to_ticks()) != 0 {
Err(FreeRtosError::QueueSendTimeout)
} else {
Ok(())
Expand All @@ -70,10 +71,15 @@ impl<T: Sized + Copy> Queue<T> {
context: &mut InterruptContext,
item: T,
) -> Result<(), FreeRtosError> {
let ptr = &item as *const _ as FreeRtosVoidPtr;

// Forget item to avoid calling `drop`
core::mem::forget(item);

unsafe {
if freertos_rs_queue_send_isr(
self.queue,
&item as *const _ as FreeRtosVoidPtr,
ptr,
context.get_task_field_mut(),
) != 0
{
Expand All @@ -87,14 +93,16 @@ impl<T: Sized + Copy> Queue<T> {
/// Wait for an item to be available on the queue.
pub fn receive<D: DurationTicks>(&self, max_wait: D) -> Result<T, FreeRtosError> {
unsafe {
let mut buff = mem::zeroed::<T>();
// Use `MaybeUninit` to avoid calling drop on
// uninitialized struct in case of timeout
let mut buff = MaybeUninit::zeroed();
let r = freertos_rs_queue_receive(
self.queue,
&mut buff as *mut _ as FreeRtosMutVoidPtr,
max_wait.to_ticks(),
);
if r == 0 {
return Ok(buff);
return Ok(buff.assume_init());
} else {
return Err(FreeRtosError::QueueReceiveTimeout);
}
Expand All @@ -107,7 +115,7 @@ impl<T: Sized + Copy> Queue<T> {
}
}

impl<T: Sized + Copy> Drop for Queue<T> {
impl<T: Sized + Send> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
freertos_rs_queue_delete(self.queue);
Expand Down

0 comments on commit 2267004

Please sign in to comment.