Skip to content

Commit

Permalink
fix session end (#17)
Browse files Browse the repository at this point in the history
* dont remove session before confirmation

* fix client handling of end reply
  • Loading branch information
wpbrown authored Dec 14, 2021
1 parent e5fcc1f commit 48a3a7e
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
21 changes: 14 additions & 7 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ pub(crate) struct ConnectionInner {
pub(crate) enum SessionState {
Opening(Option<oneshot::Sender<Session>>, Cell<ConnectionInner>),
Established(Cell<SessionInner>),
#[allow(dead_code)]
Closing(Option<oneshot::Sender<Result<(), AmqpProtocolError>>>),
Closing(Cell<SessionInner>),
}

impl SessionState {
Expand Down Expand Up @@ -153,6 +152,14 @@ impl Connection {
}
}

pub(crate) fn close_session(&self, id: usize) {
if let Some(state) = self.0.get_mut().sessions.get_mut(id) {
if let SessionState::Established(inner) = state {
*state = SessionState::Closing(inner.clone());
}
}
}

pub(crate) fn post_frame(&self, frame: AmqpFrame) {
#[cfg(feature = "frame-trace")]
log::trace!("outgoing: {:#?}", frame);
Expand Down Expand Up @@ -382,16 +389,16 @@ impl ConnectionInner {
}
_ => session.get_mut().handle_frame(frame),
},
SessionState::Closing(ref mut tx) => match frame {
SessionState::Closing(ref mut session) => match frame {
Frame::End(frm) => {
trace!("Session end is confirmed: {:?}", frm);
if let Some(tx) = tx.take() {
let _ = tx.send(Ok(()));
}
let action = session
.get_mut()
.end(AmqpProtocolError::SessionEnded(frm.error));
if let Some(token) = self.sessions_map.remove(&channel_id) {
self.sessions.remove(token);
}
Ok(Action::None)
Ok(action)
}
frm => {
trace!("Got frame after initiated session end: {:?}", frm);
Expand Down
27 changes: 18 additions & 9 deletions src/session.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::VecDeque, fmt, future::ready, future::Future};
use std::{collections::VecDeque, fmt, future::Future};

use ntex::channel::{condition, oneshot, pool};
use ntex::util::{ByteString, Bytes, Either, HashMap, PoolRef};
use ntex::util::{ByteString, Bytes, Either, HashMap, PoolRef, Ready};
use slab::Slab;

use ntex_amqp_codec::protocol::{
Expand Down Expand Up @@ -39,17 +39,26 @@ impl Session {
&self.inner.get_ref().sink
}

pub fn end(&self) -> impl Future<Output = ()> {
pub fn end(&self) -> impl Future<Output = Result<(), AmqpProtocolError>> {
let inner = self.inner.get_mut();

if inner.flags.contains(Flags::ENDED) {
Either::Left(ready(()))
} else if inner.flags.contains(Flags::ENDING) {
Either::Right(inner.closed.wait())
Either::Left(Ready::Ok(()))
} else {
inner.post_frame(Frame::End(End { error: None }));
inner.flags.insert(Flags::ENDING);
Either::Right(inner.closed.wait())
if !inner.flags.contains(Flags::ENDING) {
inner.sink.close_session(inner.remote_channel_id as usize);
inner.post_frame(Frame::End(End { error: None }));
inner.flags.insert(Flags::ENDING);
}
let inner = self.inner.clone();
Either::Right(async move {
inner.closed.wait().await;
if let Some(err @ AmqpProtocolError::SessionEnded(Some(_))) = inner.error.clone() {
Err(err)
} else {
Ok(())
}
})
}
}

Expand Down
5 changes: 3 additions & 2 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ async fn test_session_end() -> std::io::Result<()> {
let uri = Uri::try_from(format!("amqp://{}:{}", srv.addr().ip(), srv.addr().port())).unwrap();
let client = client::Connector::new().connect(uri).await.unwrap();

let sink = client.sink();
let mut sink = client.sink();
ntex::rt::spawn(async move {
let _ = client.start_default().await;
});
Expand All @@ -201,8 +201,9 @@ async fn test_session_end() -> std::io::Result<()> {
.unwrap();
link.send(Bytes::from(b"test".as_ref())).await.unwrap();

session.end().await;
session.end().await.unwrap();
assert_eq!(link_names.lock().unwrap()[0], "test");
assert!(sink.is_opened());

Ok(())
}

0 comments on commit 48a3a7e

Please sign in to comment.