Skip to content

Commit

Permalink
Upgrade to ntex 0.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Dec 30, 2021
1 parent 3ca3004 commit 8a2d4d7
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 166 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features
args: --all --features=ntex/tokio

fmt:
name: Rustfmt
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
timeout-minutes: 40
with:
command: test
args: --all --all-features --no-fail-fast -- --nocapture
args: --all --no-fail-fast --features=ntex/tokio -- --nocapture

- name: Install tarpaulin
if: matrix.version == '1.53.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
Expand All @@ -57,7 +57,7 @@ jobs:
if: matrix.version == '1.53.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
run: |
cargo tarpaulin --out Xml --all --all-features
cargo tarpaulin --out Xml --all --features=ntex/tokio
- name: Upload to Codecov
if: matrix.version == '1.53.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/osx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --all --no-fail-fast -- --nocapture
args: --all --no-fail-fast --features=ntex/tokio -- --nocapture

- name: Clear the cargo caches
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --all --all-features -- --nocapture
args: --all --features=ntex/tokio -- --nocapture
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.6.0] - 2021-12-30

* Upgrade to ntex 0.5.0

## [0.6.0-b.5] - 2021-12-28

* Make Server universal, accept both Io<F> and IoBoxed
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.6.0-b.5"
version = "0.6.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,7 +24,7 @@ default = []
frame-trace = []

[dependencies]
ntex = { version = "0.5.0-b.5", default-features = false }
ntex = { version = "0.5.0", default-features = false }
ntex-amqp-codec = "0.8.0"

bitflags = "1.3"
Expand Down
6 changes: 5 additions & 1 deletion codec/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#![allow(clippy::mutable_key_type, clippy::len_without_is_empty)]
#![allow(
clippy::mutable_key_type,
clippy::len_without_is_empty,
clippy::return_self_not_must_use
)]

#[macro_use]
extern crate derive_more;
Expand Down
230 changes: 115 additions & 115 deletions codec/src/protocol/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,121 @@ impl Encode for Frame {
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum Outcome {
Accepted(Accepted),
Rejected(Rejected),
Released(Released),
Modified(Modified),
}
impl DecodeFormatted for Outcome {
fn decode_with_format(input: &mut Bytes, fmt: u8) -> Result<Self, AmqpParseError> {
validate_code!(fmt, codec::FORMATCODE_DESCRIBED);
let descriptor = Descriptor::decode(input)?;
match descriptor {
Descriptor::Ulong(36) => decode_accepted_inner(input).map(Outcome::Accepted),
Descriptor::Ulong(37) => decode_rejected_inner(input).map(Outcome::Rejected),
Descriptor::Ulong(38) => decode_released_inner(input).map(Outcome::Released),
Descriptor::Ulong(39) => decode_modified_inner(input).map(Outcome::Modified),
Descriptor::Symbol(ref a) if a.as_str() == "amqp:accepted:list" => {
decode_accepted_inner(input).map(Outcome::Accepted)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:rejected:list" => {
decode_rejected_inner(input).map(Outcome::Rejected)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:released:list" => {
decode_released_inner(input).map(Outcome::Released)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:modified:list" => {
decode_modified_inner(input).map(Outcome::Modified)
}
_ => Err(AmqpParseError::InvalidDescriptor(Box::new(descriptor))),
}
}
}
impl Encode for Outcome {
fn encoded_size(&self) -> usize {
match *self {
Outcome::Accepted(ref v) => encoded_size_accepted_inner(v),
Outcome::Rejected(ref v) => encoded_size_rejected_inner(v),
Outcome::Released(ref v) => encoded_size_released_inner(v),
Outcome::Modified(ref v) => encoded_size_modified_inner(v),
}
}
fn encode(&self, buf: &mut BytesMut) {
match *self {
Outcome::Accepted(ref v) => encode_accepted_inner(v, buf),
Outcome::Rejected(ref v) => encode_rejected_inner(v, buf),
Outcome::Released(ref v) => encode_released_inner(v, buf),
Outcome::Modified(ref v) => encode_modified_inner(v, buf),
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum SaslFrameBody {
SaslMechanisms(SaslMechanisms),
SaslInit(SaslInit),
SaslChallenge(SaslChallenge),
SaslResponse(SaslResponse),
SaslOutcome(SaslOutcome),
}
impl DecodeFormatted for SaslFrameBody {
fn decode_with_format(input: &mut Bytes, fmt: u8) -> Result<Self, AmqpParseError> {
validate_code!(fmt, codec::FORMATCODE_DESCRIBED);
let descriptor = Descriptor::decode(input)?;
match descriptor {
Descriptor::Ulong(64) => {
decode_sasl_mechanisms_inner(input).map(SaslFrameBody::SaslMechanisms)
}
Descriptor::Ulong(65) => decode_sasl_init_inner(input).map(SaslFrameBody::SaslInit),
Descriptor::Ulong(66) => {
decode_sasl_challenge_inner(input).map(SaslFrameBody::SaslChallenge)
}
Descriptor::Ulong(67) => {
decode_sasl_response_inner(input).map(SaslFrameBody::SaslResponse)
}
Descriptor::Ulong(68) => {
decode_sasl_outcome_inner(input).map(SaslFrameBody::SaslOutcome)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:sasl-mechanisms:list" => {
decode_sasl_mechanisms_inner(input).map(SaslFrameBody::SaslMechanisms)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:sasl-init:list" => {
decode_sasl_init_inner(input).map(SaslFrameBody::SaslInit)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:sasl-challenge:list" => {
decode_sasl_challenge_inner(input).map(SaslFrameBody::SaslChallenge)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:sasl-response:list" => {
decode_sasl_response_inner(input).map(SaslFrameBody::SaslResponse)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:sasl-outcome:list" => {
decode_sasl_outcome_inner(input).map(SaslFrameBody::SaslOutcome)
}
_ => Err(AmqpParseError::InvalidDescriptor(Box::new(descriptor))),
}
}
}
impl Encode for SaslFrameBody {
fn encoded_size(&self) -> usize {
match *self {
SaslFrameBody::SaslMechanisms(ref v) => encoded_size_sasl_mechanisms_inner(v),
SaslFrameBody::SaslInit(ref v) => encoded_size_sasl_init_inner(v),
SaslFrameBody::SaslChallenge(ref v) => encoded_size_sasl_challenge_inner(v),
SaslFrameBody::SaslResponse(ref v) => encoded_size_sasl_response_inner(v),
SaslFrameBody::SaslOutcome(ref v) => encoded_size_sasl_outcome_inner(v),
}
}
fn encode(&self, buf: &mut BytesMut) {
match *self {
SaslFrameBody::SaslMechanisms(ref v) => encode_sasl_mechanisms_inner(v, buf),
SaslFrameBody::SaslInit(ref v) => encode_sasl_init_inner(v, buf),
SaslFrameBody::SaslChallenge(ref v) => encode_sasl_challenge_inner(v, buf),
SaslFrameBody::SaslResponse(ref v) => encode_sasl_response_inner(v, buf),
SaslFrameBody::SaslOutcome(ref v) => encode_sasl_outcome_inner(v, buf),
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum Section {
Header(Header),
DeliveryAnnotations(DeliveryAnnotations),
Expand Down Expand Up @@ -263,121 +378,6 @@ impl Encode for DeliveryState {
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum SaslFrameBody {
SaslMechanisms(SaslMechanisms),
SaslInit(SaslInit),
SaslChallenge(SaslChallenge),
SaslResponse(SaslResponse),
SaslOutcome(SaslOutcome),
}
impl DecodeFormatted for SaslFrameBody {
fn decode_with_format(input: &mut Bytes, fmt: u8) -> Result<Self, AmqpParseError> {
validate_code!(fmt, codec::FORMATCODE_DESCRIBED);
let descriptor = Descriptor::decode(input)?;
match descriptor {
Descriptor::Ulong(64) => {
decode_sasl_mechanisms_inner(input).map(SaslFrameBody::SaslMechanisms)
}
Descriptor::Ulong(65) => decode_sasl_init_inner(input).map(SaslFrameBody::SaslInit),
Descriptor::Ulong(66) => {
decode_sasl_challenge_inner(input).map(SaslFrameBody::SaslChallenge)
}
Descriptor::Ulong(67) => {
decode_sasl_response_inner(input).map(SaslFrameBody::SaslResponse)
}
Descriptor::Ulong(68) => {
decode_sasl_outcome_inner(input).map(SaslFrameBody::SaslOutcome)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:sasl-mechanisms:list" => {
decode_sasl_mechanisms_inner(input).map(SaslFrameBody::SaslMechanisms)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:sasl-init:list" => {
decode_sasl_init_inner(input).map(SaslFrameBody::SaslInit)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:sasl-challenge:list" => {
decode_sasl_challenge_inner(input).map(SaslFrameBody::SaslChallenge)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:sasl-response:list" => {
decode_sasl_response_inner(input).map(SaslFrameBody::SaslResponse)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:sasl-outcome:list" => {
decode_sasl_outcome_inner(input).map(SaslFrameBody::SaslOutcome)
}
_ => Err(AmqpParseError::InvalidDescriptor(Box::new(descriptor))),
}
}
}
impl Encode for SaslFrameBody {
fn encoded_size(&self) -> usize {
match *self {
SaslFrameBody::SaslMechanisms(ref v) => encoded_size_sasl_mechanisms_inner(v),
SaslFrameBody::SaslInit(ref v) => encoded_size_sasl_init_inner(v),
SaslFrameBody::SaslChallenge(ref v) => encoded_size_sasl_challenge_inner(v),
SaslFrameBody::SaslResponse(ref v) => encoded_size_sasl_response_inner(v),
SaslFrameBody::SaslOutcome(ref v) => encoded_size_sasl_outcome_inner(v),
}
}
fn encode(&self, buf: &mut BytesMut) {
match *self {
SaslFrameBody::SaslMechanisms(ref v) => encode_sasl_mechanisms_inner(v, buf),
SaslFrameBody::SaslInit(ref v) => encode_sasl_init_inner(v, buf),
SaslFrameBody::SaslChallenge(ref v) => encode_sasl_challenge_inner(v, buf),
SaslFrameBody::SaslResponse(ref v) => encode_sasl_response_inner(v, buf),
SaslFrameBody::SaslOutcome(ref v) => encode_sasl_outcome_inner(v, buf),
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum Outcome {
Accepted(Accepted),
Rejected(Rejected),
Released(Released),
Modified(Modified),
}
impl DecodeFormatted for Outcome {
fn decode_with_format(input: &mut Bytes, fmt: u8) -> Result<Self, AmqpParseError> {
validate_code!(fmt, codec::FORMATCODE_DESCRIBED);
let descriptor = Descriptor::decode(input)?;
match descriptor {
Descriptor::Ulong(36) => decode_accepted_inner(input).map(Outcome::Accepted),
Descriptor::Ulong(37) => decode_rejected_inner(input).map(Outcome::Rejected),
Descriptor::Ulong(38) => decode_released_inner(input).map(Outcome::Released),
Descriptor::Ulong(39) => decode_modified_inner(input).map(Outcome::Modified),
Descriptor::Symbol(ref a) if a.as_str() == "amqp:accepted:list" => {
decode_accepted_inner(input).map(Outcome::Accepted)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:rejected:list" => {
decode_rejected_inner(input).map(Outcome::Rejected)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:released:list" => {
decode_released_inner(input).map(Outcome::Released)
}
Descriptor::Symbol(ref a) if a.as_str() == "amqp:modified:list" => {
decode_modified_inner(input).map(Outcome::Modified)
}
_ => Err(AmqpParseError::InvalidDescriptor(Box::new(descriptor))),
}
}
}
impl Encode for Outcome {
fn encoded_size(&self) -> usize {
match *self {
Outcome::Accepted(ref v) => encoded_size_accepted_inner(v),
Outcome::Rejected(ref v) => encoded_size_rejected_inner(v),
Outcome::Released(ref v) => encoded_size_released_inner(v),
Outcome::Modified(ref v) => encoded_size_modified_inner(v),
}
}
fn encode(&self, buf: &mut BytesMut) {
match *self {
Outcome::Accepted(ref v) => encode_accepted_inner(v, buf),
Outcome::Rejected(ref v) => encode_rejected_inner(v, buf),
Outcome::Released(ref v) => encode_released_inner(v, buf),
Outcome::Modified(ref v) => encode_modified_inner(v, buf),
}
}
}
pub type Handle = u32;
pub type Seconds = u32;
pub type Milliseconds = u32;
Expand Down
10 changes: 3 additions & 7 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ntex::io::{Dispatcher as IoDispatcher, IoBoxed, Timer};
use ntex::io::{Dispatcher as IoDispatcher, IoBoxed};
use ntex::service::{fn_service, Service};
use ntex::time::Seconds;
use ntex::util::Ready;
Expand All @@ -16,7 +16,6 @@ pub struct Client<St = ()> {
connection: Connection,
keepalive: Seconds,
remote_config: Configuration,
timer: Timer,
_st: State<St>,
}

Expand All @@ -28,15 +27,13 @@ impl Client {
connection: Connection,
keepalive: Seconds,
remote_config: Configuration,
timer: Timer,
) -> Self {
Client {
io,
codec,
connection,
keepalive,
remote_config,
timer,
_st: State::new(()),
}
}
Expand All @@ -61,7 +58,6 @@ where
connection: self.connection,
keepalive: self.keepalive,
remote_config: self.remote_config,
timer: self.timer,
_st: State::new(st),
}
}
Expand All @@ -84,7 +80,7 @@ where
Seconds::ZERO
};

IoDispatcher::new(self.io, self.codec, dispatcher, self.timer)
IoDispatcher::new(self.io, self.codec, dispatcher)
.keepalive_timeout(keepalive)
.await
}
Expand All @@ -110,7 +106,7 @@ where
Seconds::ZERO
};

IoDispatcher::new(self.io, self.codec, dispatcher, self.timer)
IoDispatcher::new(self.io, self.codec, dispatcher)
.keepalive_timeout(keepalive)
.await
}
Expand Down
Loading

0 comments on commit 8a2d4d7

Please sign in to comment.