From 04dad4acb695057f9dee8320aeff64ddff7f78c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Thu, 26 Apr 2018 10:02:17 +0200 Subject: [PATCH] [api] use shared communicator for Rust client, add session close --- src/bin.rs | 2 +- src/client/client.rs | 56 ++++++------------------------- src/client/communicator.rs | 69 ++++++++++++++++++++++++++++++++++++++ src/client/mod.rs | 2 ++ src/client/session.rs | 22 +++++++++++- src/server/rpc/client.rs | 5 +-- 6 files changed, 107 insertions(+), 49 deletions(-) create mode 100644 src/client/communicator.rs diff --git a/src/bin.rs b/src/bin.rs index 0a9e331..a521765 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -402,7 +402,7 @@ fn stop_server(_global_args: &ArgMatches, cmd_args: &ArgMatches) { } let scheduler: SocketAddr = resolve_server_address(&address); - let mut client = Client::new(&scheduler).unwrap_or_else(|err| { + let mut client = Client::new(scheduler).unwrap_or_else(|err| { error!("Couldn't connect to server at {}: {}", address, err); exit(1); }); diff --git a/src/client/client.rs b/src/client/client.rs index 45bb1ff..976adc8 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -1,62 +1,28 @@ -use capnp::capability::Promise; -use capnp_rpc::rpc_twoparty_capnp; -use common::rpc::new_rpc_system; -use futures::Future; use std::error::Error; use std::net::SocketAddr; -use tokio_core::net::TcpStream; -use tokio_core::reactor::Core; use CLIENT_PROTOCOL_VERSION; +use common::wrapped::WrappedRcRefCell; use super::session::Session; +use super::communicator::Communicator; pub struct Client { - core: Core, - service: ::client_capnp::client_service::Client, + comm: WrappedRcRefCell, } impl Client { - pub fn new(scheduler: &SocketAddr) -> Result> { - let mut core = Core::new()?; - let handle = core.handle(); - let stream = core.run(TcpStream::connect(&scheduler, &handle))?; - stream.set_nodelay(true)?; + pub fn new(scheduler: SocketAddr) -> Result> { + let comm = WrappedRcRefCell::wrap(Communicator::new(scheduler, CLIENT_PROTOCOL_VERSION)?); - debug!("Connection to server {} established", scheduler); - - let mut rpc = Box::new(new_rpc_system(stream, None)); - let bootstrap: ::server_capnp::server_bootstrap::Client = - rpc.bootstrap(rpc_twoparty_capnp::Side::Server); - handle.spawn(rpc.map_err(|err| panic!("RPC error: {}", err))); - - let mut request = bootstrap.register_as_client_request(); - request.get().set_version(CLIENT_PROTOCOL_VERSION); - - let service = core.run( - request - .send() - .promise - .and_then(|response| Promise::ok(pry!(response.get()).get_service())), - )??; - - Ok(Client { core, service }) + Ok(Client { comm }) } - pub fn new_session(&mut self) -> Result> { - let id: i32 = self.core.run( - self.service - .new_session_request() - .send() - .promise - .and_then(|response| Promise::ok(pry!(response.get()).get_session_id())), - )?; - - Ok(Session { id }) + pub fn new_session(&self) -> Result> { + let session_id = self.comm.get_mut().new_session()?; + Ok(Session::new(session_id, self.comm.clone())) } - pub fn terminate_server(&mut self) -> Result<(), Box> { - self.core - .run(self.service.terminate_server_request().send().promise)?; - Ok(()) + pub fn terminate_server(&self) -> Result<(), Box> { + self.comm.get_mut().terminate_server() } } diff --git a/src/client/communicator.rs b/src/client/communicator.rs new file mode 100644 index 0000000..1408998 --- /dev/null +++ b/src/client/communicator.rs @@ -0,0 +1,69 @@ +use tokio_core::reactor::Core; +use std::net::SocketAddr; +use tokio_core::net::TcpStream; +use std::error::Error; +use common::rpc::new_rpc_system; +use capnp::capability::Promise; +use capnp_rpc::rpc_twoparty_capnp; +use futures::Future; + +pub struct Communicator { + core: Core, + service: ::client_capnp::client_service::Client, +} + +impl Communicator { + pub fn new(scheduler: SocketAddr, version: i32) -> Result> { + let mut core = Core::new()?; + let handle = core.handle(); + let stream = core.run(TcpStream::connect(&scheduler, &handle))?; + stream.set_nodelay(true)?; + + debug!("Connection to server {} established", scheduler); + + let mut rpc = Box::new(new_rpc_system(stream, None)); + let bootstrap: ::server_capnp::server_bootstrap::Client = + rpc.bootstrap(rpc_twoparty_capnp::Side::Server); + handle.spawn(rpc.map_err(|err| panic!("RPC error: {}", err))); + + let mut request = bootstrap.register_as_client_request(); + request.get().set_version(version); + + let service = core.run( + request + .send() + .promise + .and_then(|response| Promise::ok(pry!(response.get()).get_service())), + )??; + + Ok(Self { core, service }) + } + + pub fn new_session(&mut self) -> Result> { + let id: i32 = self.core.run( + self.service + .new_session_request() + .send() + .promise + .and_then(|response| Promise::ok(pry!(response.get()).get_session_id())), + )?; + + Ok(id) + } + + pub fn close_session(&mut self, id: i32) -> Result> { + self.core.run({ + let mut req = self.service.close_session_request(); + req.get().set_session_id(id); + req.send().promise + })?; + + Ok(true) + } + + pub fn terminate_server(&mut self) -> Result<(), Box> { + self.core + .run(self.service.terminate_server_request().send().promise)?; + Ok(()) + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index cf34e8d..00801c5 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,2 +1,4 @@ pub mod client; pub mod session; + +mod communicator; diff --git a/src/client/session.rs b/src/client/session.rs index 1bc8dab..fcb3ad1 100644 --- a/src/client/session.rs +++ b/src/client/session.rs @@ -1,3 +1,23 @@ +use common::wrapped::WrappedRcRefCell; + +use super::communicator::Communicator; + pub struct Session { - pub id: i32, + id: i32, + comm: WrappedRcRefCell, +} + +impl Session { + pub fn new(id: i32, comm: WrappedRcRefCell) -> Self { + debug!("Session {} created", id); + + Session { id, comm } + } +} + +impl Drop for Session { + fn drop(&mut self) { + self.comm.get_mut().close_session(self.id).unwrap(); + debug!("Session {} destroyed", self.id); + } } diff --git a/src/server/rpc/client.rs b/src/server/rpc/client.rs index 47179a8..9f3a421 100644 --- a/src/server/rpc/client.rs +++ b/src/server/rpc/client.rs @@ -521,10 +521,11 @@ impl client_service::Server for ClientServiceImpl { Promise::ok(()) } + #[allow(unreachable_code)] fn terminate_server( &mut self, - params: client_service::TerminateServerParams, - results: client_service::TerminateServerResults, + _params: client_service::TerminateServerParams, + _results: client_service::TerminateServerResults, ) -> Promise<(), ::capnp::Error> { exit(0); Promise::ok(())