From 72590b38ac34f338c6e6124976b7436c982a43ed Mon Sep 17 00:00:00 2001 From: Zack Slayton Date: Thu, 29 Jan 2015 22:36:11 -0500 Subject: [PATCH 1/3] Made session::send() and session::receive return an IoResult. Will continue to refactor around this later. --- src/session.rs | 44 ++++++++++++++++++++++++++++++-------------- src/transaction.rs | 10 +++++----- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/src/session.rs b/src/session.rs index 4a24799..0b1e85f 100644 --- a/src/session.rs +++ b/src/session.rs @@ -7,6 +7,8 @@ use std::io::BufferedWriter; use std::sync::mpsc::{Sender, Receiver, channel}; use std::io::Timer; use std::io::IoResult; +use std::io::IoError; +use std::io::IoErrorKind::{ConnectionFailed}; use std::io::net::tcp::TcpStream; use connection::Connection; use subscription::AckMode; @@ -205,11 +207,11 @@ impl <'a> Session <'a> { pub fn send_bytes(&mut self, topic: &str, mime_type: &str, body: &[u8]) -> IoResult<()> { let send_frame = Frame::send(topic, mime_type, body); - Ok(self.send(send_frame)) + self.send(send_frame) } pub fn send_text_with_receipt(&mut self, topic: &str, body: &str) -> IoResult<()> { - Ok(try!(self.send_bytes_with_receipt(topic, "text/plain", body.as_bytes()))) + self.send_bytes_with_receipt(topic, "text/plain", body.as_bytes()) } pub fn send_bytes_with_receipt(&mut self, topic: &str, mime_type: &str, body: &[u8]) -> IoResult<()> { @@ -218,7 +220,7 @@ impl <'a> Session <'a> { send_frame.headers.push( Header::encode_key_value("receipt", receipt_id.as_slice()) ); - self.send(send_frame); + try!(self.send(send_frame)); self.outstanding_receipts.insert(receipt_id); Ok(()) } @@ -229,7 +231,7 @@ impl <'a> Session <'a> { let sub = Subscription::new(next_id, topic, ack_mode, message_handler); let subscribe_frame = Frame::subscribe(sub.id.as_slice(), sub.topic.as_slice(), ack_mode); debug!("Sending frame:\n{}", subscribe_frame); - self.send(subscribe_frame); + try!(self.send(subscribe_frame)); debug!("Registering callback for subscription id: {}", sub.id); let id_to_return = sub.id.to_string(); self.subscriptions.insert(sub.id.to_string(), sub); @@ -239,12 +241,12 @@ impl <'a> Session <'a> { pub fn unsubscribe(&mut self, sub_id: &str) -> IoResult<()> { let _ = self.subscriptions.remove(sub_id); let unsubscribe_frame = Frame::unsubscribe(sub_id.as_slice()); - Ok(self.send(unsubscribe_frame)) + self.send(unsubscribe_frame) } pub fn disconnect(&mut self) -> IoResult<()> { let disconnect_frame = Frame::disconnect(); - Ok(self.send(disconnect_frame)) + self.send(disconnect_frame) } pub fn begin_transaction<'b>(&'b mut self) -> IoResult> { @@ -253,12 +255,26 @@ impl <'a> Session <'a> { Ok(tx) } - pub fn send(&self, frame: Frame) { - let _ = self.sender.send(frame); + pub fn send(&self, frame: Frame) -> IoResult<()> { + match self.sender.send(frame) { + Ok(_) => Ok(()), + Err(_) => Err(IoError { + kind: ConnectionFailed, + desc: "The connection to the server was lost.", + detail: None + }) + } } - pub fn receive(&self) -> Frame { - self.receiver.recv().unwrap() + pub fn receive(&self) -> IoResult { + match self.receiver.recv() { + Ok(frame) => Ok(frame), + Err(_) => Err(IoError { + kind: ConnectionFailed, + desc: "Could not receive frame: the connection to the server was lost.", + detail: None + }) + } } pub fn dispatch(&mut self, frame: Frame) { @@ -313,17 +329,17 @@ impl <'a> Session <'a> { fn acknowledge_frame(&mut self, ack_id: &str) -> IoResult<()> { let ack_frame = Frame::ack(ack_id); - Ok(self.send(ack_frame)) + self.send(ack_frame) } fn negatively_acknowledge_frame(&mut self, ack_id: &str) -> IoResult<()>{ let nack_frame = Frame::nack(ack_id); - Ok(self.send(nack_frame)) + self.send(nack_frame) } - pub fn listen(&mut self) { + pub fn listen(&mut self) -> IoResult<()> { loop { - let frame = self.receive(); + let frame = try!(self.receive()); debug!("Received '{}' frame, dispatching.", frame.command); self.dispatch(frame) } diff --git a/src/transaction.rs b/src/transaction.rs index b7ccb3f..a45e32b 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -21,25 +21,25 @@ impl <'a, 'b> Transaction<'a, 'b> { send_frame.headers.push( Header::encode_key_value("transaction", self.id.as_slice()) ); - Ok(self.session.send(send_frame)) + self.session.send(send_frame) } pub fn send_text(&mut self, topic: &str, body: &str) -> IoResult<()> { - Ok(try!(self.send_bytes(topic, "text/plain", body.as_bytes()))) + self.send_bytes(topic, "text/plain", body.as_bytes()) } pub fn begin(&mut self) -> IoResult<()> { let begin_frame = Frame::begin(self.id.as_slice()); - Ok(self.session.send(begin_frame)) + self.session.send(begin_frame) } pub fn commit(&mut self) -> IoResult<()> { let commit_frame = Frame::commit(self.id.as_slice()); - Ok(self.session.send(commit_frame)) + self.session.send(commit_frame) } pub fn abort(&mut self) -> IoResult<()> { let abort_frame = Frame::abort(self.id.as_slice()); - Ok(self.session.send(abort_frame)) + self.session.send(abort_frame) } } From e9e6aba95af29e45cdbfeee2b7229f1553dd78a4 Mon Sep 17 00:00:00 2001 From: Zack Slayton Date: Thu, 29 Jan 2015 22:54:14 -0500 Subject: [PATCH 2/3] Rename std::io to std::old_io. Update allow statements to be per-feature. --- Cargo.toml | 3 +++ src/connection.rs | 12 ++++++------ src/frame.rs | 14 +++++++------- src/session.rs | 14 +++++++------- src/stomp.rs | 7 +++++-- src/transaction.rs | 2 +- 6 files changed, 29 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ebfe437..f03ea10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,6 @@ license = "MIT" [lib] name = "stomp" + +[dependencies.log] +log = "*" diff --git a/src/connection.rs b/src/connection.rs index ea380c5..d2a1648 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,10 +1,10 @@ -use std::io::net::tcp::TcpStream; -use std::io::BufferedReader; -use std::io::BufferedWriter; +use std::old_io::net::tcp::TcpStream; +use std::old_io::BufferedReader; +use std::old_io::BufferedWriter; use frame::Transmission::{HeartBeat, CompleteFrame}; -use std::io::IoResult; -use std::io::IoError; -use std::io::InvalidInput; +use std::old_io::IoResult; +use std::old_io::IoError; +use std::old_io::InvalidInput; use std::str::from_utf8; use frame::Frame; use session::Session; diff --git a/src/frame.rs b/src/frame.rs index c3b88c4..e08d17e 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -3,11 +3,11 @@ use header::Header; use header::ContentLength; use header::StompHeaderSet; use subscription::AckMode; -use std::io::IoResult; -use std::io::IoError; -use std::io::InvalidInput; -use std::io::BufferedReader; -use std::io::BufferedWriter; +use std::old_io::IoResult; +use std::old_io::IoError; +use std::old_io::InvalidInput; +use std::old_io::BufferedReader; +use std::old_io::BufferedWriter; use std::str::from_utf8; use std::fmt; @@ -74,8 +74,8 @@ impl Frame { try!(stream.write_str("\n")); } try!(stream.write_str("\n")); - try!(stream.write(self.body.as_slice())); - try!(stream.write(&[0])); + try!(stream.write_all(self.body.as_slice())); + try!(stream.write_all(&[0])); try!(stream.flush()); debug!("write() complete."); Ok(()) diff --git a/src/session.rs b/src/session.rs index 0b1e85f..4202e17 100644 --- a/src/session.rs +++ b/src/session.rs @@ -2,14 +2,14 @@ use std::thread::Thread; use std::collections::hash_set::HashSet; use std::collections::hash_map::HashMap; use std::time::Duration; -use std::io::BufferedReader; -use std::io::BufferedWriter; +use std::old_io::BufferedReader; +use std::old_io::BufferedWriter; use std::sync::mpsc::{Sender, Receiver, channel}; -use std::io::Timer; -use std::io::IoResult; -use std::io::IoError; -use std::io::IoErrorKind::{ConnectionFailed}; -use std::io::net::tcp::TcpStream; +use std::old_io::Timer; +use std::old_io::IoResult; +use std::old_io::IoError; +use std::old_io::IoErrorKind::{ConnectionFailed}; +use std::old_io::net::tcp::TcpStream; use connection::Connection; use subscription::AckMode; use subscription::AckMode::{Auto, Client, ClientIndividual}; diff --git a/src/stomp.rs b/src/stomp.rs index 2595da4..aecc30a 100644 --- a/src/stomp.rs +++ b/src/stomp.rs @@ -1,13 +1,16 @@ #![crate_name = "stomp"] #![crate_type = "lib"] -#![allow(unstable)] +#![feature(collections)] +#![feature(core)] +#![feature(std_misc)] +#![feature(io)] #[macro_use] extern crate log; extern crate collections; -use std::io::IoResult; +use std::old_io::IoResult; use session::Session; use connection::Connection; diff --git a/src/transaction.rs b/src/transaction.rs index a45e32b..11a551d 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -1,5 +1,5 @@ use frame::Frame; -use std::io::IoResult; +use std::old_io::IoResult; use header::Header; use session::Session; From 9fff32ea7b7fa757dca0ed4d1da718e5186bae39 Mon Sep 17 00:00:00 2001 From: Zack Slayton Date: Thu, 29 Jan 2015 22:54:51 -0500 Subject: [PATCH 3/3] Version bump. --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index f03ea10..6c9ff9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "stomp" -version = "0.5.0" +version = "0.5.1" authors = [ "zslayton