Skip to content

Commit

Permalink
Fix bug causing duplicates in querying subscriber (#597)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart authored Nov 22, 2023
1 parent 7295201 commit 8954a81
Showing 1 changed file with 39 additions and 20 deletions.
59 changes: 39 additions & 20 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,9 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {

let key_expr = conf.key_expr?;

// declare subscriber at first
// register fetch handler
let handler = register_handler(state.clone(), callback.clone());
// declare subscriber
let subscriber = match conf.session.clone() {
SessionRef::Borrow(session) => match conf.key_space.into() {
crate::KeySpace::User => session
Expand Down Expand Up @@ -707,15 +709,15 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {
},
};

let mut fetch_subscriber = FetchingSubscriber {
let fetch_subscriber = FetchingSubscriber {
subscriber,
callback,
state,
receiver,
};

// start fetch
fetch_subscriber.fetch(conf.fetch).res_sync()?;
// run fetch
run_fetch(conf.fetch, handler)?;

Ok(fetch_subscriber)
}
Expand Down Expand Up @@ -882,22 +884,8 @@ where
<TryIntoSample as TryInto<Sample>>::Error: Into<zenoh_core::Error>,
{
fn res_sync(self) -> <Self as Resolvable>::To {
zlock!(self.state).pending_fetches += 1;
// pending fetches will be decremented in RepliesHandler drop()
let handler = RepliesHandler {
state: self.state,
callback: self.callback,
};

log::debug!("Fetch");
(self.fetch)(Box::new(move |s: TryIntoSample| match s.try_into() {
Ok(s) => {
let mut state = zlock!(handler.state);
log::trace!("Fetched sample received: push it to merge_queue");
state.merge_queue.push(s);
}
Err(e) => log::debug!("Received error fetching data: {}", e.into()),
}))
let handler = register_handler(self.state, self.callback);
run_fetch(self.fetch, handler)
}
}

Expand All @@ -913,3 +901,34 @@ where
std::future::ready(self.res_sync())
}
}

fn register_handler(
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
) -> RepliesHandler {
zlock!(state).pending_fetches += 1;
// pending fetches will be decremented in RepliesHandler drop()
RepliesHandler { state, callback }
}

fn run_fetch<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
>(
fetch: Fetch,
handler: RepliesHandler,
) -> ZResult<()>
where
TryIntoSample: TryInto<Sample>,
<TryIntoSample as TryInto<Sample>>::Error: Into<zenoh_core::Error>,
{
log::debug!("Fetch data for FetchingSubscriber");
(fetch)(Box::new(move |s: TryIntoSample| match s.try_into() {
Ok(s) => {
let mut state = zlock!(handler.state);
log::trace!("Fetched sample received: push it to merge_queue");
state.merge_queue.push(s);
}
Err(e) => log::debug!("Received error fetching data: {}", e.into()),
}))
}

0 comments on commit 8954a81

Please sign in to comment.