Skip to content

Commit

Permalink
[api] use shared communicator for Rust client, add session close
Browse files Browse the repository at this point in the history
  • Loading branch information
Kobzol committed Apr 26, 2018
1 parent 4a12e7c commit 04dad4a
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 49 deletions.
2 changes: 1 addition & 1 deletion src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ fn stop_server(_global_args: &ArgMatches, cmd_args: &ArgMatches) {
}

let scheduler: SocketAddr = resolve_server_address(&address);
let mut client = Client::new(&scheduler).unwrap_or_else(|err| {
let mut client = Client::new(scheduler).unwrap_or_else(|err| {
error!("Couldn't connect to server at {}: {}", address, err);
exit(1);
});
Expand Down
56 changes: 11 additions & 45 deletions src/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,62 +1,28 @@
use capnp::capability::Promise;
use capnp_rpc::rpc_twoparty_capnp;
use common::rpc::new_rpc_system;
use futures::Future;
use std::error::Error;
use std::net::SocketAddr;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;

use CLIENT_PROTOCOL_VERSION;
use common::wrapped::WrappedRcRefCell;
use super::session::Session;
use super::communicator::Communicator;

pub struct Client {
core: Core,
service: ::client_capnp::client_service::Client,
comm: WrappedRcRefCell<Communicator>,
}

impl Client {
pub fn new(scheduler: &SocketAddr) -> Result<Self, Box<Error>> {
let mut core = Core::new()?;
let handle = core.handle();
let stream = core.run(TcpStream::connect(&scheduler, &handle))?;
stream.set_nodelay(true)?;
pub fn new(scheduler: SocketAddr) -> Result<Self, Box<Error>> {
let comm = WrappedRcRefCell::wrap(Communicator::new(scheduler, CLIENT_PROTOCOL_VERSION)?);

debug!("Connection to server {} established", scheduler);

let mut rpc = Box::new(new_rpc_system(stream, None));
let bootstrap: ::server_capnp::server_bootstrap::Client =
rpc.bootstrap(rpc_twoparty_capnp::Side::Server);
handle.spawn(rpc.map_err(|err| panic!("RPC error: {}", err)));

let mut request = bootstrap.register_as_client_request();
request.get().set_version(CLIENT_PROTOCOL_VERSION);

let service = core.run(
request
.send()
.promise
.and_then(|response| Promise::ok(pry!(response.get()).get_service())),
)??;

Ok(Client { core, service })
Ok(Client { comm })
}

pub fn new_session(&mut self) -> Result<Session, Box<Error>> {
let id: i32 = self.core.run(
self.service
.new_session_request()
.send()
.promise
.and_then(|response| Promise::ok(pry!(response.get()).get_session_id())),
)?;

Ok(Session { id })
pub fn new_session(&self) -> Result<Session, Box<Error>> {
let session_id = self.comm.get_mut().new_session()?;
Ok(Session::new(session_id, self.comm.clone()))
}

pub fn terminate_server(&mut self) -> Result<(), Box<Error>> {
self.core
.run(self.service.terminate_server_request().send().promise)?;
Ok(())
pub fn terminate_server(&self) -> Result<(), Box<Error>> {
self.comm.get_mut().terminate_server()
}
}
69 changes: 69 additions & 0 deletions src/client/communicator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use tokio_core::reactor::Core;
use std::net::SocketAddr;
use tokio_core::net::TcpStream;
use std::error::Error;
use common::rpc::new_rpc_system;
use capnp::capability::Promise;
use capnp_rpc::rpc_twoparty_capnp;
use futures::Future;

pub struct Communicator {
core: Core,
service: ::client_capnp::client_service::Client,
}

impl Communicator {
pub fn new(scheduler: SocketAddr, version: i32) -> Result<Self, Box<Error>> {
let mut core = Core::new()?;
let handle = core.handle();
let stream = core.run(TcpStream::connect(&scheduler, &handle))?;
stream.set_nodelay(true)?;

debug!("Connection to server {} established", scheduler);

let mut rpc = Box::new(new_rpc_system(stream, None));
let bootstrap: ::server_capnp::server_bootstrap::Client =
rpc.bootstrap(rpc_twoparty_capnp::Side::Server);
handle.spawn(rpc.map_err(|err| panic!("RPC error: {}", err)));

let mut request = bootstrap.register_as_client_request();
request.get().set_version(version);

let service = core.run(
request
.send()
.promise
.and_then(|response| Promise::ok(pry!(response.get()).get_service())),
)??;

Ok(Self { core, service })
}

pub fn new_session(&mut self) -> Result<i32, Box<Error>> {
let id: i32 = self.core.run(
self.service
.new_session_request()
.send()
.promise
.and_then(|response| Promise::ok(pry!(response.get()).get_session_id())),
)?;

Ok(id)
}

pub fn close_session(&mut self, id: i32) -> Result<bool, Box<Error>> {
self.core.run({
let mut req = self.service.close_session_request();
req.get().set_session_id(id);
req.send().promise
})?;

Ok(true)
}

pub fn terminate_server(&mut self) -> Result<(), Box<Error>> {
self.core
.run(self.service.terminate_server_request().send().promise)?;
Ok(())
}
}
2 changes: 2 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
pub mod client;
pub mod session;

mod communicator;
22 changes: 21 additions & 1 deletion src/client/session.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
use common::wrapped::WrappedRcRefCell;

use super::communicator::Communicator;

pub struct Session {
pub id: i32,
id: i32,
comm: WrappedRcRefCell<Communicator>,
}

impl Session {
pub fn new(id: i32, comm: WrappedRcRefCell<Communicator>) -> Self {
debug!("Session {} created", id);

Session { id, comm }
}
}

impl Drop for Session {
fn drop(&mut self) {
self.comm.get_mut().close_session(self.id).unwrap();
debug!("Session {} destroyed", self.id);
}
}
5 changes: 3 additions & 2 deletions src/server/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,10 +521,11 @@ impl client_service::Server for ClientServiceImpl {
Promise::ok(())
}

#[allow(unreachable_code)]
fn terminate_server(
&mut self,
params: client_service::TerminateServerParams,
results: client_service::TerminateServerResults,
_params: client_service::TerminateServerParams,
_results: client_service::TerminateServerResults,
) -> Promise<(), ::capnp::Error> {
exit(0);
Promise::ok(())
Expand Down

0 comments on commit 04dad4a

Please sign in to comment.