Releases: asonix/tokio-zmq
Pinned dependencies?
Bump dependencies?
This release sees a cleaning of Tokio ZMQ's dependencies. It now relies on sub-crates of Futures and of Tokio, so it doesn't pull more than it needs to. I also bumped tokio-file-unix, which recently released full tokio-support, so it doesn't pull tokio-core anymore.
There will probably be another beta release before 0.4 hits. I'm waiting on tokio to fully adopt futures2 support, so I can drop my own fork of tokio-timer. And since I messed up the versioning for tokio-zmq-derive already, the first version of 0.4 might be 0.4.1 or 0.4.2. We'll just have to see.
The 0.4.0 beta!
Wow! A lot has happened!
Changes:
- Tokio ZMQ doesn't use `Rc<>` in the sockets anymore. Instead, they get passed around where they're needed. `.send()`, `.recv()`, `.stream()`, and `.sink()` now consume the calling socket, meaning if you want to use a `Stream` and a `Sink`, you need to use the newly introduced `.sink_stream()` and then call `.split()` on it. WARNING: if you try to spawn a future with one part of the sink-stream, but not the other, you will experience a panic. This is because the underlying data structure used to represent the sink and stream is `Send` but not `Sync`. Sending part of the sink-stream without the other opens the possibility of simultaneous access from multiple threads. See this example for a way to avoid doing this.
- Tokio ZMQ now uses Tokio 0.1.4 and Futures 0.2.0-beta
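To make the ownership rules above concrete, here is a minimal std-only sketch. Everything in it is a stand-in: `Socket`, `SinkStream`, `SinkHalf`, `StreamHalf`, and `echo_once` are hypothetical types built on `std::sync::mpsc`, not Tokio ZMQ's real API. It only illustrates two ideas: methods that take `self` by value consume the socket, and both halves of a split should move into the same task.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical loopback socket: whatever is sent can be received again.
struct Socket {
    tx: Sender<String>,
    rx: Receiver<String>,
}

/// Hypothetical halves produced by `split()`.
struct SinkHalf(Sender<String>);
struct StreamHalf(Receiver<String>);

/// Hypothetical combined sink-and-stream value.
struct SinkStream(Socket);

impl Socket {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        Socket { tx, rx }
    }

    /// Takes `self` by value, so the socket cannot be used afterwards.
    fn sink_stream(self) -> SinkStream {
        SinkStream(self)
    }
}

impl SinkStream {
    /// Consumes the combined value and hands back both halves.
    fn split(self) -> (SinkHalf, StreamHalf) {
        let Socket { tx, rx } = self.0;
        (SinkHalf(tx), StreamHalf(rx))
    }
}

/// Moves BOTH halves into a single thread, so they are never
/// touched from two threads at the same time.
fn echo_once(socket: Socket, msg: &str) -> String {
    let (sink, stream) = socket.sink_stream().split();
    let msg = msg.to_owned();
    thread::spawn(move || {
        sink.0.send(msg).unwrap();
        stream.0.recv().unwrap()
    })
    .join()
    .unwrap()
}

fn main() {
    println!("{}", echo_once(Socket::new(), "ping"));
}
```

In this sketch the compiler rejects any use of `socket` after `sink_stream()` has been called, which is the same kind of guarantee the consuming methods in this release rely on.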
Proper Errors!
This crate's `Error` type now implements `std::error::Error` in order to be better compatible with other error-handling libraries. This change is much needed, since the adoption of crates like failure depends on errors at least implementing `StdError`.
In the future, this library may be updated to implement `Fail` instead of implementing `StdError` directly.
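As a rough illustration of what implementing `std::error::Error` involves, here is a minimal sketch. The variants of this `Error` enum and the `describe` helper are hypothetical and are not taken from Tokio ZMQ's source. The sketch uses the current standard-library method `source`, where Rust of this release's era used `cause`.

```rust
use std::error::Error as StdError;
use std::fmt;
use std::io;

/// Hypothetical error type; the variants are illustrative only.
#[derive(Debug)]
enum Error {
    /// Wraps an underlying I/O failure.
    Io(io::Error),
    /// A multipart was expected to contain at least one message.
    EmptyMultipart,
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Io(e) => write!(f, "I/O error: {}", e),
            Error::EmptyMultipart => write!(f, "multipart contained no messages"),
        }
    }
}

impl StdError for Error {
    /// Exposes the wrapped error so callers can walk the cause chain.
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match self {
            Error::Io(e) => Some(e),
            Error::EmptyMultipart => None,
        }
    }
}

impl From<io::Error> for Error {
    fn from(e: io::Error) -> Self {
        Error::Io(e)
    }
}

/// Generic code can accept the error as a boxed trait object.
fn describe(err: Box<dyn StdError>) -> String {
    format!("failed: {}", err)
}

fn main() {
    let err = Error::from(io::Error::new(io::ErrorKind::Other, "socket closed"));
    println!("{}", describe(Box::new(err)));
}
```

Once the trait is implemented, the type works with `Box<dyn Error>`, the `?` operator in functions returning boxed errors, and libraries that only require the standard trait.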
Version 0.3.0! Already!
This release sees the introduction of the `TimeoutStream<S: Stream<Error = Error>>` struct. This type is particularly useful since you can wrap a `MultipartStream`, a `ControlledStream`, or an `EndingStream` in a `TimeoutStream` and receive an `Either<A: S::Item, B: Timeout>` when the stream is ready. This means that you either get a Timeout notification, or you get the value the stream was intending to produce.
`tokio_zmq::TimeoutStream` differs from `tokio_timer::TimeoutStream` in that `tokio_timer`'s stream will error on timeout, while `tokio_zmq`'s stream will produce an `Either`.
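The difference between "error on timeout" and "produce an `Either`" can be sketched without any async machinery. The example below is an assumption-laden stand-in: it uses a blocking `std::sync::mpsc` channel instead of a stream, and `Either`, `Timeout`, and `next_or_timeout` are local, hypothetical definitions rather than the types from futures or Tokio ZMQ.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Minimal local stand-in for an `Either` type.
#[derive(Debug, PartialEq)]
enum Either<A, B> {
    A(A),
    B(B),
}

/// Hypothetical marker emitted when the deadline passes.
#[derive(Debug, PartialEq)]
struct Timeout;

/// Waits for the next item. A missed deadline is reported as a value
/// (`Either::B`), not as an error. Returns `None` once the sender is gone.
fn next_or_timeout<T>(rx: &Receiver<T>, limit: Duration) -> Option<Either<T, Timeout>> {
    match rx.recv_timeout(limit) {
        Ok(item) => Some(Either::A(item)),
        Err(RecvTimeoutError::Timeout) => Some(Either::B(Timeout)),
        Err(RecvTimeoutError::Disconnected) => None,
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send("hello").unwrap();

    // An item is available, so this yields Either::A.
    println!("{:?}", next_or_timeout(&rx, Duration::from_millis(10)));
    // Nothing is queued, so this yields Either::B after the deadline.
    println!("{:?}", next_or_timeout(&rx, Duration::from_millis(10)));

    // Dropping the sender ends the sequence.
    drop(tx);
    println!("{:?}", next_or_timeout(&rx, Duration::from_millis(10)));
}
```

Because a timeout is an ordinary value here, the consumer can log it and keep reading, rather than having the whole sequence terminate with an error.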
Example with TimeoutStream
```rust
#![feature(try_from)]

extern crate futures;
extern crate tokio_core;
extern crate tokio_zmq;
extern crate zmq;

use std::convert::TryInto;
use std::rc::Rc;
use std::time::Duration;

use futures::future::Either;
use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::prelude::*;
use tokio_zmq::{Socket, Sub};

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let ctx = Rc::new(zmq::Context::new());

    // Subscribe to every message published on the endpoint.
    let sub: Sub = Socket::builder(ctx, &handle)
        .connect("tcp://localhost:5556")
        .filter(b"")
        .try_into()
        .unwrap();

    let consumer = sub
        .stream()
        .timeout(Duration::from_secs(30))
        // Keep received multiparts; log and drop timeout notifications.
        .filter_map(|either| match either {
            Either::A(multipart) => Some(multipart),
            Either::B(_) => {
                println!("Operation timed out");
                None
            }
        })
        .for_each(|multipart| {
            for msg in multipart {
                if let Some(msg) = msg.as_str() {
                    println!("Received: {}", msg);
                }
            }
            Ok(())
        });

    core.run(consumer).unwrap();
}
```
API Changes
- Introduce streams with timeouts
- Rename `Socket::create` to `Socket::builder` to reflect that it returns a `SocketBuilder`.
- Do builder notation for `MultipartStreams` for control, ending, and timers. Remove the separate Controlled variants from `src/socket/types.rs`.
- Remove `AsControlledSocket`, `ControlledStreamSocket`, `ControlledSinkSocket`, and `IntoControlledSocket`.
- Remove Controlled macros.
- Remove `Option<EndHandler>` from `MultipartStream`.
- Remove the `DefaultHandler` concept, since `EndHandler`s are only in the type if they're in use.
- Remove `stream_with_end(end_handler: EndHandler)` in favor of `stream().with_end(end_handler: EndHandler)`.
- Remove `ControlledSocket` socket type in favor of `socket.stream().controlled(control: impl StreamSocket, handler: impl ControlHandler)`.
- Update all examples to work with changes.
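The builder-style change (an end handler is added by wrapping the stream, so it only appears in the type when it is in use) can be sketched with plain iterators. This is not the crate's implementation: `Iterator` stands in for `Stream`, items are `String`s rather than multiparts, and `EndingStream`, `WithEnd`, and `StopOn` are hypothetical names. Dropping the item that triggers the stop is also an assumption of this sketch.

```rust
/// Mirrors the shape of an end handler, but over strings.
trait EndHandler {
    fn should_stop(&mut self, item: &str) -> bool;
}

/// Wrapper that ends the sequence once the handler says so.
struct EndingStream<I, H> {
    inner: I,
    handler: H,
    done: bool,
}

impl<I, H> Iterator for EndingStream<I, H>
where
    I: Iterator<Item = String>,
    H: EndHandler,
{
    type Item = String;

    fn next(&mut self) -> Option<String> {
        if self.done {
            return None;
        }
        let item = self.inner.next()?;
        if self.handler.should_stop(&item) {
            // Assumption: the item that triggers the stop is not yielded.
            self.done = true;
            return None;
        }
        Some(item)
    }
}

/// Extension trait providing the builder-style `with_end` adapter.
trait WithEnd: Iterator<Item = String> + Sized {
    fn with_end<H: EndHandler>(self, handler: H) -> EndingStream<Self, H> {
        EndingStream { inner: self, handler, done: false }
    }
}

impl<I: Iterator<Item = String>> WithEnd for I {}

/// Hypothetical handler that stops on a sentinel message.
struct StopOn(&'static str);

impl EndHandler for StopOn {
    fn should_stop(&mut self, item: &str) -> bool {
        item == self.0
    }
}

/// Collects messages until the sentinel is seen.
fn take_until(msgs: Vec<&str>, sentinel: &'static str) -> Vec<String> {
    msgs.into_iter()
        .map(String::from)
        .with_end(StopOn(sentinel))
        .collect()
}

fn main() {
    println!("{:?}", take_until(vec!["a", "b", "END", "c"], "END"));
}
```

A stream that never calls `with_end` keeps its original type, so no default handler is needed to fill an unused slot.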
Version 0.2.0
This version doesn't differ much from 0.1.3, other than that it should be completely working now. Some socket types were fine with read and write methods intermingling `need_read` and `need_write`, but some socket types (namely router) really didn't like that.
API Changes:
- `Req` now implements `StreamSocket` and `SinkSocket` instead of `FutureSocket`. All the same functionality is provided in addition to `Req` now having a controlled variant.
- The `FutureSocket` trait no longer exists
- `SockConfigStart` has been renamed `SocketBuilder`
- To build sockets, the call is now `Socket::create` instead of `Socket::new`
- `SocketBuilder::new` now accepts a reference to a `tokio_core::reactor::Handle` instead of the handle itself.
- `SocketBuilder` now has an `identity` method to set the socket's identity.
- `PairConfigStart` has been removed in favour of `SocketBuilder::pair(self, addr: &str, bind: bool)`
- `Multipart` is its own type now, wrapping `VecDeque`.
- `Multipart` implements `From<zmq::Message>` and `From<Vec<zmq::Message>>`
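For readers unfamiliar with the newtype pattern, the last two items might look roughly like the following. This is a sketch only: `Message` is a local stand-in for `zmq::Message` so that no external crate is needed, and the field name and helper methods on `Multipart` are assumptions rather than the crate's actual definition.

```rust
use std::collections::VecDeque;

/// Local stand-in for `zmq::Message` (an assumption of this sketch).
#[derive(Debug, PartialEq)]
struct Message(Vec<u8>);

/// Newtype wrapping a `VecDeque` of messages; field name is hypothetical.
#[derive(Debug, Default)]
struct Multipart {
    inner: VecDeque<Message>,
}

impl Multipart {
    fn push_back(&mut self, msg: Message) {
        self.inner.push_back(msg);
    }

    fn pop_front(&mut self) -> Option<Message> {
        self.inner.pop_front()
    }

    fn len(&self) -> usize {
        self.inner.len()
    }
}

/// A single message becomes a one-part multipart.
impl From<Message> for Multipart {
    fn from(msg: Message) -> Self {
        let mut multipart = Multipart::default();
        multipart.push_back(msg);
        multipart
    }
}

/// A vector of messages keeps its order.
impl From<Vec<Message>> for Multipart {
    fn from(msgs: Vec<Message>) -> Self {
        Multipart { inner: msgs.into() }
    }
}

/// Allows `for msg in multipart { ... }`.
impl IntoIterator for Multipart {
    type Item = Message;
    type IntoIter = std::collections::vec_deque::IntoIter<Message>;

    fn into_iter(self) -> Self::IntoIter {
        self.inner.into_iter()
    }
}

fn main() {
    let multipart: Multipart = vec![Message(b"id".to_vec()), Message(b"body".to_vec())].into();
    for msg in multipart {
        println!("{}", String::from_utf8_lossy(&msg.0));
    }
}
```

Wrapping `VecDeque` gives cheap pops from the front, which suits protocols where routing frames are peeled off before the payload.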
Other changes:
- Added example for load-balancing `req` workers and `req` clients with `routers`
Drive! Drive! Derive!
This release sees no noticeable changes to the API (unless you really cared about the ControlledStreamSocket having an `H: ControlHandler` type, because now that's specified in the call to `.stream()`), although some of the re-exports were moved around. If you were manually importing *Controlled sockets, you'll now need to get them from `tokio_zmq::socket::types`.
We derive the traits in `tokio_zmq::prelude` for all the types in `tokio_zmq::socket::types` now, and the derivation code is nested within this crate as the tokio-zmq-derive crate. I should probably add a readme and documentation for that, but it shouldn't be used outside the context of tokio-zmq.
Stop your Streams
The v0.1.2 release comes with a new trait called `EndHandler`, which can be used to handle End messages that are sent in the same socket that data is sent through.
Here's an example pulled from the documentation. This stream stops producing values after 10 multiparts have been received.
```rust
#![feature(try_from)]

extern crate zmq;
extern crate futures;
extern crate tokio_core;
extern crate tokio_zmq;

use std::rc::Rc;
use std::convert::TryInto;

use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::prelude::*;
use tokio_zmq::async::Multipart;
use tokio_zmq::{Socket, Sub};

// Counts received multiparts so the stream can be stopped after ten.
struct Stop {
    count: usize,
}

impl Stop {
    fn new() -> Self {
        Stop {
            count: 0,
        }
    }
}

impl EndHandler for Stop {
    fn should_stop(&mut self, _: &Multipart) -> bool {
        if self.count < 10 {
            self.count += 1;
            false
        } else {
            true
        }
    }
}

fn main() {
    // `run` takes `&mut self`, so the core must be mutable.
    let mut core = Core::new().unwrap();
    let context = Rc::new(zmq::Context::new());
    let sub: Sub = Socket::new(context, core.handle())
        .connect("tcp://localhost:5569")
        .filter(b"")
        .try_into()
        .unwrap();

    let fut = sub.stream_with_end(Stop::new()).for_each(|multipart| {
        for msg in multipart {
            if let Some(msg) = msg.as_str() {
                println!("Message: {}", msg);
            }
        }
        Ok(())
    });

    core.run(fut).unwrap();
}
```
Dealers, Routers, Restructures, oh my!
To clean the documentation, this release moves all the socket wrapper types into a single file. It also moves to make `MultipartSink` and `MultipartStream` use the zmq socket logic from `MultipartRequest` and `MultipartResponse` from the `async::future` module. This simplifies the library and ensures any change to sending or receiving zmq messages affects streams and futures.
This release also brings the Dealer and Router socket types, to allow creation of brokers on the Tokio event loop.