Skip to content

Commit

Permalink
Merge pull request #59 from zslayton/io_result
Browse files Browse the repository at this point in the history
io -> old_io, send/receive use IoResult
  • Loading branch information
zslayton committed Jan 30, 2015
2 parents 6b8ad4d + 9fff32e commit 4c9813b
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 41 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]

name = "stomp"
version = "0.5.0"
version = "0.5.1"
authors = [ "zslayton <[email protected]" ]
repository = "https://github.com/zslayton/stomp-rs"
readme = "README.md"
Expand All @@ -11,3 +11,6 @@ license = "MIT"

[lib]
name = "stomp"

[dependencies.log]
log = "*"
12 changes: 6 additions & 6 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::io::net::tcp::TcpStream;
use std::io::BufferedReader;
use std::io::BufferedWriter;
use std::old_io::net::tcp::TcpStream;
use std::old_io::BufferedReader;
use std::old_io::BufferedWriter;
use frame::Transmission::{HeartBeat, CompleteFrame};
use std::io::IoResult;
use std::io::IoError;
use std::io::InvalidInput;
use std::old_io::IoResult;
use std::old_io::IoError;
use std::old_io::InvalidInput;
use std::str::from_utf8;
use frame::Frame;
use session::Session;
Expand Down
14 changes: 7 additions & 7 deletions src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use header::Header;
use header::ContentLength;
use header::StompHeaderSet;
use subscription::AckMode;
use std::io::IoResult;
use std::io::IoError;
use std::io::InvalidInput;
use std::io::BufferedReader;
use std::io::BufferedWriter;
use std::old_io::IoResult;
use std::old_io::IoError;
use std::old_io::InvalidInput;
use std::old_io::BufferedReader;
use std::old_io::BufferedWriter;
use std::str::from_utf8;

use std::fmt;
Expand Down Expand Up @@ -74,8 +74,8 @@ impl Frame {
try!(stream.write_str("\n"));
}
try!(stream.write_str("\n"));
try!(stream.write(self.body.as_slice()));
try!(stream.write(&[0]));
try!(stream.write_all(self.body.as_slice()));
try!(stream.write_all(&[0]));
try!(stream.flush());
debug!("write() complete.");
Ok(())
Expand Down
54 changes: 35 additions & 19 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use std::thread::Thread;
use std::collections::hash_set::HashSet;
use std::collections::hash_map::HashMap;
use std::time::Duration;
use std::io::BufferedReader;
use std::io::BufferedWriter;
use std::old_io::BufferedReader;
use std::old_io::BufferedWriter;
use std::sync::mpsc::{Sender, Receiver, channel};
use std::io::Timer;
use std::io::IoResult;
use std::io::net::tcp::TcpStream;
use std::old_io::Timer;
use std::old_io::IoResult;
use std::old_io::IoError;
use std::old_io::IoErrorKind::{ConnectionFailed};
use std::old_io::net::tcp::TcpStream;
use connection::Connection;
use subscription::AckMode;
use subscription::AckMode::{Auto, Client, ClientIndividual};
Expand Down Expand Up @@ -205,11 +207,11 @@ impl <'a> Session <'a> {

pub fn send_bytes(&mut self, topic: &str, mime_type: &str, body: &[u8]) -> IoResult<()> {
let send_frame = Frame::send(topic, mime_type, body);
Ok(self.send(send_frame))
self.send(send_frame)
}

pub fn send_text_with_receipt(&mut self, topic: &str, body: &str) -> IoResult<()> {
Ok(try!(self.send_bytes_with_receipt(topic, "text/plain", body.as_bytes())))
self.send_bytes_with_receipt(topic, "text/plain", body.as_bytes())
}

pub fn send_bytes_with_receipt(&mut self, topic: &str, mime_type: &str, body: &[u8]) -> IoResult<()> {
Expand All @@ -218,7 +220,7 @@ impl <'a> Session <'a> {
send_frame.headers.push(
Header::encode_key_value("receipt", receipt_id.as_slice())
);
self.send(send_frame);
try!(self.send(send_frame));
self.outstanding_receipts.insert(receipt_id);
Ok(())
}
Expand All @@ -229,7 +231,7 @@ impl <'a> Session <'a> {
let sub = Subscription::new(next_id, topic, ack_mode, message_handler);
let subscribe_frame = Frame::subscribe(sub.id.as_slice(), sub.topic.as_slice(), ack_mode);
debug!("Sending frame:\n{}", subscribe_frame);
self.send(subscribe_frame);
try!(self.send(subscribe_frame));
debug!("Registering callback for subscription id: {}", sub.id);
let id_to_return = sub.id.to_string();
self.subscriptions.insert(sub.id.to_string(), sub);
Expand All @@ -239,12 +241,12 @@ impl <'a> Session <'a> {
pub fn unsubscribe(&mut self, sub_id: &str) -> IoResult<()> {
let _ = self.subscriptions.remove(sub_id);
let unsubscribe_frame = Frame::unsubscribe(sub_id.as_slice());
Ok(self.send(unsubscribe_frame))
self.send(unsubscribe_frame)
}

pub fn disconnect(&mut self) -> IoResult<()> {
let disconnect_frame = Frame::disconnect();
Ok(self.send(disconnect_frame))
self.send(disconnect_frame)
}

pub fn begin_transaction<'b>(&'b mut self) -> IoResult<Transaction<'b, 'a>> {
Expand All @@ -253,12 +255,26 @@ impl <'a> Session <'a> {
Ok(tx)
}

pub fn send(&self, frame: Frame) {
let _ = self.sender.send(frame);
pub fn send(&self, frame: Frame) -> IoResult<()> {
match self.sender.send(frame) {
Ok(_) => Ok(()),
Err(_) => Err(IoError {
kind: ConnectionFailed,
desc: "The connection to the server was lost.",
detail: None
})
}
}

pub fn receive(&self) -> Frame {
self.receiver.recv().unwrap()
pub fn receive(&self) -> IoResult<Frame> {
match self.receiver.recv() {
Ok(frame) => Ok(frame),
Err(_) => Err(IoError {
kind: ConnectionFailed,
desc: "Could not receive frame: the connection to the server was lost.",
detail: None
})
}
}

pub fn dispatch(&mut self, frame: Frame) {
Expand Down Expand Up @@ -313,17 +329,17 @@ impl <'a> Session <'a> {

fn acknowledge_frame(&mut self, ack_id: &str) -> IoResult<()> {
let ack_frame = Frame::ack(ack_id);
Ok(self.send(ack_frame))
self.send(ack_frame)
}

fn negatively_acknowledge_frame(&mut self, ack_id: &str) -> IoResult<()>{
let nack_frame = Frame::nack(ack_id);
Ok(self.send(nack_frame))
self.send(nack_frame)
}

pub fn listen(&mut self) {
pub fn listen(&mut self) -> IoResult<()> {
loop {
let frame = self.receive();
let frame = try!(self.receive());
debug!("Received '{}' frame, dispatching.", frame.command);
self.dispatch(frame)
}
Expand Down
7 changes: 5 additions & 2 deletions src/stomp.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#![crate_name = "stomp"]
#![crate_type = "lib"]

#![allow(unstable)]
#![feature(collections)]
#![feature(core)]
#![feature(std_misc)]
#![feature(io)]

#[macro_use]
extern crate log;
extern crate collections;

use std::io::IoResult;
use std::old_io::IoResult;
use session::Session;
use connection::Connection;

Expand Down
12 changes: 6 additions & 6 deletions src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use frame::Frame;
use std::io::IoResult;
use std::old_io::IoResult;
use header::Header;
use session::Session;

Expand All @@ -21,25 +21,25 @@ impl <'a, 'b> Transaction<'a, 'b> {
send_frame.headers.push(
Header::encode_key_value("transaction", self.id.as_slice())
);
Ok(self.session.send(send_frame))
self.session.send(send_frame)
}

pub fn send_text(&mut self, topic: &str, body: &str) -> IoResult<()> {
Ok(try!(self.send_bytes(topic, "text/plain", body.as_bytes())))
self.send_bytes(topic, "text/plain", body.as_bytes())
}

pub fn begin(&mut self) -> IoResult<()> {
let begin_frame = Frame::begin(self.id.as_slice());
Ok(self.session.send(begin_frame))
self.session.send(begin_frame)
}

pub fn commit(&mut self) -> IoResult<()> {
let commit_frame = Frame::commit(self.id.as_slice());
Ok(self.session.send(commit_frame))
self.session.send(commit_frame)
}

pub fn abort(&mut self) -> IoResult<()> {
let abort_frame = Frame::abort(self.id.as_slice());
Ok(self.session.send(abort_frame))
self.session.send(abort_frame)
}
}

0 comments on commit 4c9813b

Please sign in to comment.