Skip to content

Commit

Permalink
Do not store Attach frame in ReceiverLink
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Feb 17, 2022
1 parent d8d902c commit 2074abd
Show file tree
Hide file tree
Showing 15 changed files with 103 additions and 96 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.6.3] - 2022-02-18

* Do not store Attach frame in ReceiverLink

## [0.6.2] - 2022-01-18

* Allow to change max message size for receiver link
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.6.2"
version = "0.6.3"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,7 +24,7 @@ default = []
frame-trace = []

[dependencies]
ntex = "0.5.8"
ntex = "0.5.14"
ntex-amqp-codec = "0.8.1"

bitflags = "1.3"
Expand All @@ -36,7 +36,7 @@ uuid = { version = "0.8", features = ["v4"] }

[dev-dependencies]
env_logger = "0.9"
ntex = { version = "0.5.8", features = ["tokio"] }
ntex = { version = "0.5", features = ["tokio"] }

[patch.crates-io]
ntex-amqp = { path = "." }
Expand Down
4 changes: 2 additions & 2 deletions codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ license = "MIT/Apache-2.0"
edition = "2018"

[dependencies]
ntex-bytes = "0.1.9"
ntex-codec = "0.6.0"
ntex-bytes = "0.1.14"
ntex-codec = "0.6.2"
byteorder = "1.4"
fxhash = "0.2.1"
chrono = { version = "0.4", default-features = false }
Expand Down
6 changes: 1 addition & 5 deletions codec/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
#![allow(
clippy::mutable_key_type,
clippy::len_without_is_empty,
clippy::return_self_not_must_use
)]
#![allow(clippy::mutable_key_type, clippy::len_without_is_empty)]

#[macro_use]
extern crate derive_more;
Expand Down
2 changes: 1 addition & 1 deletion src/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ where
///
/// By default channel max value is set to 1024
pub fn channel_max(&mut self, num: u16) -> &mut Self {
self.config.channel_max = num as usize;
self.config.channel_max = num;
self
}

Expand Down
12 changes: 6 additions & 6 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub(crate) struct ConnectionInner {
pub(crate) sessions_map: HashMap<u16, usize>,
pub(crate) on_close: Condition,
pub(crate) error: Option<AmqpProtocolError>,
channel_max: usize,
pub(crate) max_frame_size: usize,
channel_max: u16,
pub(crate) max_frame_size: u32,
}

pub(crate) enum SessionState {
Expand Down Expand Up @@ -60,7 +60,7 @@ impl Connection {
error: None,
on_close: Condition::new(),
channel_max: local_config.channel_max,
max_frame_size: remote_config.max_frame_size as usize,
max_frame_size: remote_config.max_frame_size,
}))
}

Expand Down Expand Up @@ -133,7 +133,7 @@ impl Connection {
let entry = inner.sessions.vacant_entry();
let token = entry.key();

if token >= inner.channel_max {
if token >= inner.channel_max as usize {
log::trace!("Too many channels: {:?}", token);
Err(AmqpProtocolError::TooManyChannels)
} else {
Expand Down Expand Up @@ -377,8 +377,8 @@ impl ConnectionInner {
// receiver link
let link = session
.get_mut()
.attach_remote_receiver_link(session.clone(), attach);
Ok(Action::AttachReceiver(link))
.attach_remote_receiver_link(session.clone(), &attach);
Ok(Action::AttachReceiver(link, attach))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl fmt::Debug for ControlFrame {

#[derive(Debug)]
pub enum ControlFrameKind {
AttachReceiver(ReceiverLink),
AttachReceiver(protocol::Attach, ReceiverLink),
AttachSender(protocol::Attach, SenderLink),
Flow(protocol::Flow, SenderLink),
DetachSender(protocol::Detach, SenderLink),
Expand Down
15 changes: 9 additions & 6 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where
) -> Result<(), AmqpDispatcherError> {
if let Some(err) = err {
match &frame.0.get_mut().kind {
ControlFrameKind::AttachReceiver(ref link) => {
ControlFrameKind::AttachReceiver(_, ref link) => {
let _ = link.close_with_error(err);
}
ControlFrameKind::AttachSender(ref frm, _) => {
Expand Down Expand Up @@ -116,14 +116,17 @@ where
}
} else {
match frame.0.get_mut().kind {
ControlFrameKind::AttachReceiver(ref link) => {
ControlFrameKind::AttachReceiver(ref frm, ref link) => {
let link = link.clone();
let fut = self.service.call(types::Message::Attached(link.clone()));
let frm = frm.clone();
let fut = self
.service
.call(types::Message::Attached(frm.clone(), link.clone()));
ntex::rt::spawn(async move {
if let Err(err) = fut.await {
let _ = link.close_with_error(Error::from(err)).await;
} else {
link.confirm_receiver_link();
link.confirm_receiver_link(&frm);
link.set_link_credit(50);
}
});
Expand Down Expand Up @@ -257,10 +260,10 @@ where
*self.ctl_fut.borrow_mut() =
Some((Some(frame.clone()), Box::pin(self.ctl_service.call(frame))));
}
types::Action::AttachReceiver(link) => {
types::Action::AttachReceiver(link, frm) => {
let frame = ControlFrame::new(
link.session().inner.clone(),
ControlFrameKind::AttachReceiver(link),
ControlFrameKind::AttachReceiver(frm, link),
);
*self.ctl_fut.borrow_mut() =
Some((Some(frame.clone()), Box::pin(self.ctl_service.call(frame))));
Expand Down
14 changes: 7 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity, clippy::return_self_not_must_use)]
#![allow(clippy::type_complexity)]

#[macro_use]
extern crate derive_more;
Expand Down Expand Up @@ -41,7 +41,7 @@ pub mod codec {
#[derive(Debug, Clone)]
pub struct Configuration {
pub max_frame_size: u32,
pub channel_max: usize,
pub channel_max: u16,
pub idle_time_out: Milliseconds,
pub hostname: Option<ByteString>,
}
Expand Down Expand Up @@ -69,7 +69,7 @@ impl Configuration {
///
/// By default channel max value is set to 1024
pub fn channel_max(&mut self, num: u16) -> &mut Self {
self.channel_max = num as usize;
self.channel_max = num;
self
}

Expand All @@ -82,8 +82,8 @@ impl Configuration {
}

/// Get max frame size for the connection.
pub fn get_max_frame_size(&self) -> usize {
self.max_frame_size as usize
pub fn get_max_frame_size(&self) -> u32 {
self.max_frame_size
}

/// Set idle time-out for the connection in seconds.
Expand All @@ -108,7 +108,7 @@ impl Configuration {
container_id: ByteString::from(Uuid::new_v4().to_simple().to_string()),
hostname: self.hostname.clone(),
max_frame_size: self.max_frame_size,
channel_max: self.channel_max as u16,
channel_max: self.channel_max,
idle_time_out: if self.idle_time_out > 0 {
Some(self.idle_time_out)
} else {
Expand Down Expand Up @@ -143,7 +143,7 @@ impl<'a> From<&'a Open> for Configuration {
fn from(open: &'a Open) -> Self {
Configuration {
max_frame_size: open.max_frame_size(),
channel_max: open.channel_max() as usize,
channel_max: open.channel_max(),
idle_time_out: open.idle_time_out().unwrap_or(0),
hostname: open.hostname().cloned(),
}
Expand Down
37 changes: 22 additions & 15 deletions src/rcvlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ impl ReceiverLink {
ReceiverLink { inner }
}

pub fn name(&self) -> &ByteString {
&self.inner.get_ref().name
}

pub fn handle(&self) -> Handle {
self.inner.get_ref().handle as Handle
}
Expand All @@ -53,10 +57,6 @@ impl ReceiverLink {
&self.inner.get_ref().session
}

pub fn frame(&self) -> &Attach {
&self.inner.get_ref().attach
}

pub fn is_closed(&self) -> bool {
self.inner.get_ref().closed
}
Expand All @@ -65,13 +65,15 @@ impl ReceiverLink {
self.inner.get_ref().error.as_ref()
}

pub(crate) fn confirm_receiver_link(&self) {
pub(crate) fn confirm_receiver_link(&self, frm: &Attach) {
let inner = self.inner.get_mut();
let size = self.inner.get_ref().max_message_size;
let size = if size != 0 { Some(size) } else { None };
inner
.session
.inner
.get_mut()
.confirm_receiver_link(inner.handle, &inner.attach);
.confirm_receiver_link(inner.handle, frm, size);
}

pub fn set_link_credit(&self, credit: u32) {
Expand All @@ -80,7 +82,7 @@ impl ReceiverLink {

/// Set max message size.
pub fn set_max_message_size(&self, size: u64) {
self.inner.get_mut().attach.0.max_message_size = Some(size);
self.inner.get_mut().max_message_size = size;
}

/// Set max total size for partial transfers.
Expand Down Expand Up @@ -135,9 +137,9 @@ impl ReceiverLink {
pub(crate) fn remote_closed(&self, error: Option<Error>) {
let inner = self.inner.get_mut();
trace!(
"Receiver link has been closed remotely handle: {:?} name: {:#?}",
"Receiver link has been closed remotely handle: {:?} name: {:?}",
inner.remote_handle,
inner.attach.name()
inner.name
);
inner.closed = true;
inner.error = error;
Expand Down Expand Up @@ -196,9 +198,9 @@ impl Stream for ReceiverLink {

#[derive(Debug)]
pub(crate) struct ReceiverLinkInner {
name: ByteString,
handle: Handle,
remote_handle: Handle,
attach: Attach,
session: Session,
closed: bool,
reader_task: LocalWaker,
Expand All @@ -208,6 +210,7 @@ pub(crate) struct ReceiverLinkInner {
error: Option<Error>,
partial_body: Option<BytesMut>,
partial_body_max: usize,
max_message_size: u64,
pool: PoolRef,
}

Expand All @@ -216,23 +219,27 @@ impl ReceiverLinkInner {
session: Cell<SessionInner>,
handle: Handle,
remote_handle: Handle,
attach: Attach,
frame: &Attach,
) -> ReceiverLinkInner {
let pool = session.get_ref().memory_pool();
let mut name = frame.name().clone();
name.trimdown();

ReceiverLinkInner {
name,
handle,
remote_handle,
pool,
session: Session::new(session),
closed: false,
queue: VecDeque::with_capacity(4),
credit: 0,
error: None,
partial_body: None,
partial_body_max: 262_144,
delivery_count: attach.initial_delivery_count().unwrap_or(0),
delivery_count: frame.initial_delivery_count().unwrap_or(0),
max_message_size: frame.max_message_size().unwrap_or(0),
reader_task: LocalWaker::new(),
attach,
pool,
}
}

Expand Down Expand Up @@ -371,7 +378,7 @@ impl ReceiverLinkInner {
} else {
let body = if let Some(body) = transfer.0.body.take() {
match body {
TransferBody::Data(data) => BytesMut::from(data),
TransferBody::Data(data) => BytesMut::copy_from_slice(&data),
TransferBody::Message(msg) => {
let mut buf = self.pool.buf_with_capacity(msg.encoded_size());
msg.encode(&mut buf);
Expand Down
Loading

0 comments on commit 2074abd

Please sign in to comment.