diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b4ad431..8e72e8c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.7.0 + +* async-await support [#229](https://github.com/softprops/shiplift/pull/229) + # 0.6.0 * add chrono as an optional feature, enabled by default [#190](https://github.com/softprops/shiplift/pull/190) diff --git a/Cargo.toml b/Cargo.toml index c85ed4e2..2dad6ce1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,28 +18,31 @@ coveralls = { repository = "softprops/shipflit" } maintenance = { status = "actively-developed" } [dependencies] -log = "0.4" -mime = "0.3" base64 = "0.11" byteorder = "1.3" bytes = "0.4" chrono = { version = "0.4", optional = true, features = ["serde"] } flate2 = "1.0" -futures = "0.1" -hyper = "0.12" -hyper-openssl = { version = "0.7", optional = true } -hyperlocal = { version = "0.6", optional = true } +futures-util = "0.3" +futures_codec = "0.3" +hyper = "0.13" +hyper-openssl = { version = "0.8", optional = true } +hyperlocal = { version = "0.7", optional = true } +log = "0.4" +mime = "0.3" openssl = { version = "0.10", optional = true } +pin-project = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tar = "0.4" -tokio = "0.1" -tokio-codec = "0.1" -tokio-io = "0.1" +tokio = "0.2" url = "2.1" [dev-dependencies] env_logger = "0.7" +# Required for examples to run +futures = "0.3.1" +tokio = { version = "0.2.6", features = ["macros"] } [features] default = ["chrono", "unix-socket", "tls"] diff --git a/examples/attach.rs b/examples/attach.rs new file mode 100644 index 00000000..e4ed6378 --- /dev/null +++ b/examples/attach.rs @@ -0,0 +1,32 @@ +use futures::StreamExt; +use shiplift::{tty::TtyChunk, Docker}; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let docker = Docker::new(); + let id = env::args() + .nth(1) + .expect("You need to specify a container id"); + + let tty_multiplexer = docker.containers().get(&id).attach().await?; + + let (mut reader, _writer) = tty_multiplexer.split(); + + while let Some(tty_result) = reader.next().await { + match tty_result { + Ok(chunk) => print_chunk(chunk), + Err(e) => eprintln!("Error: {}", e), + } + } + + Ok(()) +} + +fn print_chunk(chunk: TtyChunk) { + match chunk { + TtyChunk::StdOut(bytes) => println!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()), + TtyChunk::StdErr(bytes) => eprintln!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()), + TtyChunk::StdIn(_) => unreachable!(), + } +} diff --git a/examples/containercopyfrom.rs b/examples/containercopyfrom.rs index 2ebeccfc..acbfa19f 100644 --- a/examples/containercopyfrom.rs +++ b/examples/containercopyfrom.rs @@ -1,8 +1,10 @@ +use futures::TryStreamExt; use shiplift::Docker; use std::{env, path}; -use tokio::prelude::{Future, Stream}; +use tar::Archive; -fn main() { +#[tokio::main] +async fn main() -> Result<(), Box> { let docker = Docker::new(); let id = env::args() .nth(1) @@ -10,17 +12,16 @@ fn main() { let path = env::args() .nth(2) .expect("Usage: cargo run --example containercopyfrom -- "); - let fut = docker + + let bytes = docker .containers() .get(&id) .copy_from(path::Path::new(&path)) - .collect() - .and_then(|stream| { - let tar = stream.concat(); - let mut archive = tar::Archive::new(tar.as_slice()); - archive.unpack(env::current_dir()?)?; - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .try_concat() + .await?; + + let mut archive = Archive::new(&bytes[..]); + archive.unpack(env::current_dir()?)?; + + Ok(()) } diff --git a/examples/containercopyinto.rs b/examples/containercopyinto.rs index 63f0a2d9..689e7afc 100644 --- a/examples/containercopyinto.rs +++ b/examples/containercopyinto.rs @@ -1,8 +1,8 @@ use shiplift::Docker; -use std::env; -use tokio::prelude::Future; +use std::{env, fs::File, io::Read}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let path = env::args() .nth(1) @@ -11,17 +11,17 @@ fn main() { .nth(2) .expect("Usage: cargo run --example containercopyinto -- "); - use std::{fs::File, io::prelude::*}; - let mut file = File::open(&path).unwrap(); let mut bytes = Vec::new(); file.read_to_end(&mut bytes) .expect("Cannot read file on the localhost."); - let fut = docker + if let Err(e) = docker .containers() .get(&id) - .copy_file_into(path, &bytes[..]) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .copy_file_into(path, &bytes) + .await + { + eprintln!("Error: {}", e) + } } diff --git a/examples/containercreate.rs b/examples/containercreate.rs index d061f702..ef579a63 100644 --- a/examples/containercreate.rs +++ b/examples/containercreate.rs @@ -1,16 +1,19 @@ use shiplift::{ContainerOptions, Docker}; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let image = env::args() .nth(1) .expect("You need to specify an image name"); - let fut = docker + + match docker .containers() .create(&ContainerOptions::builder(image.as_ref()).build()) - .map(|info| println!("{:?}", info)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .await + { + Ok(info) => println!("{:?}", info), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/containerdelete.rs b/examples/containerdelete.rs index e3c20368..86bd9c59 100644 --- a/examples/containerdelete.rs +++ b/examples/containerdelete.rs @@ -1,16 +1,14 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("You need to specify an container id"); - let fut = docker - .containers() - .get(&id) - .delete() - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + if let Err(e) = docker.containers().get(&id).delete().await { + eprintln!("Error: {}", e) + } } diff --git a/examples/containerexec.rs b/examples/containerexec.rs index 7f12f884..6c545a7e 100644 --- a/examples/containerexec.rs +++ b/examples/containerexec.rs @@ -1,8 +1,9 @@ -use shiplift::{tty::StreamType, Docker, ExecContainerOptions}; -use std::env; -use tokio::prelude::{Future, Stream}; +use futures::StreamExt; +use shiplift::{tty::TtyChunk, Docker, ExecContainerOptions}; +use std::{env, str::from_utf8}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) @@ -18,19 +19,19 @@ fn main() { .attach_stdout(true) .attach_stderr(true) .build(); - let fut = docker - .containers() - .get(&id) - .exec(&options) - .for_each(|chunk| { - match chunk.stream_type { - StreamType::StdOut => println!("Stdout: {}", chunk.as_string_lossy()), - StreamType::StdErr => eprintln!("Stderr: {}", chunk.as_string_lossy()), - StreamType::StdIn => unreachable!(), - } - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + while let Some(exec_result) = docker.containers().get(&id).exec(&options).next().await { + match exec_result { + Ok(chunk) => print_chunk(chunk), + Err(e) => eprintln!("Error: {}", e), + } + } +} + +fn print_chunk(chunk: TtyChunk) { + match chunk { + TtyChunk::StdOut(bytes) => println!("Stdout: {}", from_utf8(&bytes).unwrap()), + TtyChunk::StdErr(bytes) => eprintln!("Stdout: {}", from_utf8(&bytes).unwrap()), + TtyChunk::StdIn(_) => unreachable!(), + } } diff --git a/examples/containerinspect.rs b/examples/containerinspect.rs index 0f853bc6..8de2a672 100644 --- a/examples/containerinspect.rs +++ b/examples/containerinspect.rs @@ -1,17 +1,15 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("Usage: cargo run --example containerinspect -- "); - let fut = docker - .containers() - .get(&id) - .inspect() - .map(|container| println!("{:#?}", container)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + match docker.containers().get(&id).inspect().await { + Ok(container) => println!("{:#?}", container), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/containers.rs b/examples/containers.rs index 0eb51af5..72de1408 100644 --- a/examples/containers.rs +++ b/examples/containers.rs @@ -1,18 +1,15 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); let docker = Docker::new(); - let fut = docker - .containers() - .list(&Default::default()) - .map(|containers| { + match docker.containers().list(&Default::default()).await { + Ok(containers) => { for c in containers { println!("container -> {:#?}", c) } - }) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/custom_host.rs b/examples/custom_host.rs index 3b1fd3e2..8b06eae0 100644 --- a/examples/custom_host.rs +++ b/examples/custom_host.rs @@ -1,13 +1,10 @@ -use futures::Future; use shiplift::Docker; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::host("http://yourhost".parse().unwrap()); - - let fut = docker - .ping() - .map(|pong| println!("Ping: {}", pong)) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + match docker.ping().await { + Ok(pong) => println!("Ping: {}", pong), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/events.rs b/examples/events.rs index 0e35860c..e44d68f7 100644 --- a/examples/events.rs +++ b/examples/events.rs @@ -1,16 +1,15 @@ +use futures::StreamExt; use shiplift::Docker; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); println!("listening for events"); - let fut = docker - .events(&Default::default()) - .for_each(|e| { - println!("event -> {:?}", e); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + while let Some(event_result) = docker.events(&Default::default()).next().await { + match event_result { + Ok(event) => println!("event -> {:?}", event), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/export.rs b/examples/export.rs index 55d1b7b5..34f460d5 100644 --- a/examples/export.rs +++ b/examples/export.rs @@ -1,8 +1,10 @@ -use shiplift::{errors::Error, Docker}; +use futures::StreamExt; use std::{env, fs::OpenOptions, io::Write}; -use tokio::prelude::{Future, Stream}; -fn main() { +use shiplift::{errors::Error, Docker}; + +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args().nth(1).expect("You need to specify an image id"); @@ -11,17 +13,13 @@ fn main() { .create(true) .open(format!("{}.tar", &id)) .unwrap(); + let images = docker.images(); - let fut = images - .get(&id) - .export() - .for_each(move |bytes| { - export_file - .write(&bytes[..]) - .map(|n| println!("copied {} bytes", n)) - .map_err(Error::IO) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut) + while let Some(export_result) = images.get(&id).export().next().await { + match export_result.and_then(|bytes| export_file.write(&bytes).map_err(Error::from)) { + Ok(n) => println!("copied {} bytes", n), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/imagebuild.rs b/examples/imagebuild.rs index 6dbea78e..80d825c3 100644 --- a/examples/imagebuild.rs +++ b/examples/imagebuild.rs @@ -1,19 +1,22 @@ +use futures::StreamExt; use shiplift::{BuildOptions, Docker}; use std::env; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let path = env::args().nth(1).expect("You need to specify a path"); - let fut = docker - .images() - .build(&BuildOptions::builder(path).tag("shiplift_test").build()) - .for_each(|output| { - println!("{:?}", output); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); + let options = BuildOptions::builder(path).tag("shiplift_test").build(); - tokio::run(fut); + let images = docker.images(); + + let mut stream = images.build(&options); + + while let Some(build_result) = stream.next().await { + match build_result { + Ok(output) => println!("{:?}", output), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/imagedelete.rs b/examples/imagedelete.rs index efa763c2..3b9bc609 100644 --- a/examples/imagedelete.rs +++ b/examples/imagedelete.rs @@ -1,21 +1,18 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let img = env::args() .nth(1) .expect("You need to specify an image name"); - let fut = docker - .images() - .get(&img[..]) - .delete() - .map(|statuses| { + match docker.images().get(&img).delete().await { + Ok(statuses) => { for status in statuses { println!("{:?}", status); } - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/imageinspect.rs b/examples/imageinspect.rs index 494480a1..b7d2f9a6 100644 --- a/examples/imageinspect.rs +++ b/examples/imageinspect.rs @@ -1,17 +1,15 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("Usage: cargo run --example imageinspect -- "); - let fut = docker - .images() - .get(&id) - .inspect() - .map(|image| println!("{:#?}", image)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + match docker.images().get(&id).inspect().await { + Ok(image) => println!("{:#?}", image), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/imagepull.rs b/examples/imagepull.rs index 84a61498..5b3fbf4d 100644 --- a/examples/imagepull.rs +++ b/examples/imagepull.rs @@ -1,22 +1,25 @@ // cargo run --example imagepull busybox +use futures::StreamExt; use shiplift::{Docker, PullOptions}; use std::env; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); let docker = Docker::new(); let img = env::args() .nth(1) .expect("You need to specify an image name"); - let fut = docker + + let mut stream = docker .images() - .pull(&PullOptions::builder().image(img).build()) - .for_each(|output| { - println!("{:?}", output); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .pull(&PullOptions::builder().image(img).build()); + + while let Some(pull_result) = stream.next().await { + match pull_result { + Ok(output) => println!("{:?}", output), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/imagepull_auth.rs b/examples/imagepull_auth.rs index 1c559c79..6f0ceec8 100644 --- a/examples/imagepull_auth.rs +++ b/examples/imagepull_auth.rs @@ -1,10 +1,11 @@ // cargo run --example imagepull_auth busybox username password +use futures::StreamExt; use shiplift::{Docker, PullOptions, RegistryAuth}; use std::env; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); let docker = Docker::new(); let img = env::args() @@ -16,13 +17,15 @@ fn main() { .username(username) .password(password) .build(); - let fut = docker + + let mut stream = docker .images() - .pull(&PullOptions::builder().image(img).auth(auth).build()) - .for_each(|output| { - println!("{:?}", output); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .pull(&PullOptions::builder().image(img).auth(auth).build()); + + while let Some(pull_result) = stream.next().await { + match pull_result { + Ok(output) => println!("{:?}", output), + Err(e) => eprintln!("{}", e), + } + } } diff --git a/examples/images.rs b/examples/images.rs index 7a8a094c..1c0852e0 100644 --- a/examples/images.rs +++ b/examples/images.rs @@ -1,13 +1,14 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); println!("docker images in stock"); - let fut = docker - .images() - .list(&Default::default()) - .map(|images| { + + let result = docker.images().list(&Default::default()).await; + + match result { + Ok(images) => { for i in images { println!( "{} {} {:?}", @@ -16,7 +17,7 @@ fn main() { i.repo_tags.unwrap_or_else(|| vec!["none".into()]) ); } - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/imagesearch.rs b/examples/imagesearch.rs index a6d6e522..e2fd1109 100644 --- a/examples/imagesearch.rs +++ b/examples/imagesearch.rs @@ -1,17 +1,16 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); println!("remote docker images in stock"); - let fut = docker - .images() - .search("rust") - .map(|results| { + + match docker.images().search("rust").await { + Ok(results) => { for result in results { println!("{} - {}", result.name, result.description); } - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/imagetag.rs b/examples/imagetag.rs index 7ae78dd6..e36af768 100644 --- a/examples/imagetag.rs +++ b/examples/imagetag.rs @@ -2,9 +2,9 @@ use shiplift::{Docker, Image, TagOptions}; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); let docker = Docker::new(); let img = env::args() @@ -21,7 +21,7 @@ fn main() { let image = Image::new(&docker, img); - let fut = image.tag(&tag_opts).map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + if let Err(e) = image.tag(&tag_opts).await { + eprintln!("Error: {}", e) + } } diff --git a/examples/import.rs b/examples/import.rs index 20c61e69..7ea35bdb 100644 --- a/examples/import.rs +++ b/examples/import.rs @@ -1,8 +1,9 @@ +use futures::StreamExt; use shiplift::Docker; use std::{env, fs::File}; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let path = env::args() .nth(1) @@ -11,14 +12,12 @@ fn main() { let reader = Box::from(f); - let fut = docker - .images() - .import(reader) - .for_each(|output| { - println!("{:?}", output); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); + let mut stream = docker.images().import(reader); - tokio::run(fut); + while let Some(import_result) = stream.next().await { + match import_result { + Ok(output) => println!("{:?}", output), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/info.rs b/examples/info.rs index 05fdeded..76036e6e 100644 --- a/examples/info.rs +++ b/examples/info.rs @@ -1,12 +1,11 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); - tokio::run( - docker - .info() - .map(|info| println!("info {:?}", info)) - .map_err(|e| eprintln!("Error: {}", e)), - ); + + match docker.info().await { + Ok(info) => println!("info {:?}", info), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/logs.rs b/examples/logs.rs index 57f12be9..73c80455 100644 --- a/examples/logs.rs +++ b/examples/logs.rs @@ -1,25 +1,31 @@ -use shiplift::{tty::StreamType, Docker, LogsOptions}; +use futures::StreamExt; +use shiplift::{tty::TtyChunk, Docker, LogsOptions}; use std::env; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("You need to specify a container id"); - let fut = docker + + let mut logs_stream = docker .containers() .get(&id) - .logs(&LogsOptions::builder().stdout(true).stderr(true).build()) - .for_each(|chunk| { - match chunk.stream_type { - StreamType::StdOut => println!("Stdout: {}", chunk.as_string_lossy()), - StreamType::StdErr => eprintln!("Stderr: {}", chunk.as_string_lossy()), - StreamType::StdIn => unreachable!(), - } - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); + .logs(&LogsOptions::builder().stdout(true).stderr(true).build()); + + while let Some(log_result) = logs_stream.next().await { + match log_result { + Ok(chunk) => print_chunk(chunk), + Err(e) => eprintln!("Error: {}", e), + } + } +} - tokio::run(fut); +fn print_chunk(chunk: TtyChunk) { + match chunk { + TtyChunk::StdOut(bytes) => println!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()), + TtyChunk::StdErr(bytes) => eprintln!("Stdout: {}", std::str::from_utf8(&bytes).unwrap()), + TtyChunk::StdIn(_) => unreachable!(), + } } diff --git a/examples/networkconnect.rs b/examples/networkconnect.rs index 0cfc8fc0..a04db633 100644 --- a/examples/networkconnect.rs +++ b/examples/networkconnect.rs @@ -1,18 +1,20 @@ use shiplift::{ContainerConnectionOptions, Docker}; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let networks = docker.networks(); + match (env::args().nth(1), env::args().nth(2)) { (Some(container_id), Some(network_id)) => { - let fut = networks + if let Err(e) = networks .get(&network_id) .connect(&ContainerConnectionOptions::builder(&container_id).build()) - .map(|v| println!("{:?}", v)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .await + { + eprintln!("Error: {}", e) + } } _ => eprintln!("please provide a container_id and network_id"), } diff --git a/examples/networkcreate.rs b/examples/networkcreate.rs index 30bb41cc..3173ab40 100644 --- a/examples/networkcreate.rs +++ b/examples/networkcreate.rs @@ -1,20 +1,22 @@ use shiplift::{Docker, NetworkCreateOptions}; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let network_name = env::args() .nth(1) .expect("You need to specify a network name"); - let fut = docker + match docker .networks() .create( &NetworkCreateOptions::builder(network_name.as_ref()) .driver("bridge") .build(), ) - .map(|info| println!("{:?}", info)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + .await + { + Ok(info) => println!("{:?}", info), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/networkdelete.rs b/examples/networkdelete.rs index 16fc4ab6..e419f908 100644 --- a/examples/networkdelete.rs +++ b/examples/networkdelete.rs @@ -1,18 +1,14 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("You need to specify a network id"); - let fut = docker - .networks() - .get(&id) - .delete() - .map(|network| println!("{:?}", network)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + if let Err(e) = docker.networks().get(&id).delete().await { + eprintln!("Error: {}", e) + } } diff --git a/examples/networkdisconnect.rs b/examples/networkdisconnect.rs index 9588ecc9..8d58b353 100644 --- a/examples/networkdisconnect.rs +++ b/examples/networkdisconnect.rs @@ -1,18 +1,27 @@ use shiplift::{ContainerConnectionOptions, Docker}; use std::env; -use tokio::prelude::Future; -fn main() { +async fn network_disconnect( + container_id: &str, + network_id: &str, +) { let docker = Docker::new(); let networks = docker.networks(); + + if let Err(e) = networks + .get(network_id) + .disconnect(&ContainerConnectionOptions::builder(container_id).build()) + .await + { + eprintln!("Error: {}", e) + } +} + +#[tokio::main] +async fn main() { match (env::args().nth(1), env::args().nth(2)) { (Some(container_id), Some(network_id)) => { - let fut = networks - .get(&network_id) - .disconnect(&ContainerConnectionOptions::builder(&container_id).build()) - .map(|v| println!("{:?}", v)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + network_disconnect(&container_id, &network_id).await; } _ => eprintln!("please provide a container_id and network_id"), } diff --git a/examples/networkinspect.rs b/examples/networkinspect.rs index 86a076b0..143c6373 100644 --- a/examples/networkinspect.rs +++ b/examples/networkinspect.rs @@ -1,17 +1,15 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("You need to specify a network id"); - let fut = docker - .networks() - .get(&id) - .inspect() - .map(|network| println!("{:#?}", network)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + match docker.networks().get(&id).inspect().await { + Ok(network_info) => println!("{:#?}", network_info), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/networks.rs b/examples/networks.rs index 9ceea99a..4a1dcf1e 100644 --- a/examples/networks.rs +++ b/examples/networks.rs @@ -1,17 +1,17 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); + let docker = Docker::new(); - let fut = docker - .networks() - .list(&Default::default()) - .map(|networks| { + + match docker.networks().list(&Default::default()).await { + Ok(networks) => { for network in networks { - println!("network -> {:#?}", network); + println!("network -> {:#?}", network) } - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/stats.rs b/examples/stats.rs index 9e14cf40..063f5026 100644 --- a/examples/stats.rs +++ b/examples/stats.rs @@ -1,21 +1,20 @@ // cargo run --example stats -- +use futures::StreamExt; use shiplift::Docker; use std::env; -use tokio::prelude::{Future, Stream}; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let containers = docker.containers(); let id = env::args() .nth(1) .expect("Usage: cargo run --example stats -- "); - let fut = containers - .get(&id) - .stats() - .for_each(|stat| { - println!("{:?}", stat); - Ok(()) - }) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + while let Some(result) = containers.get(&id).stats().next().await { + match result { + Ok(stat) => println!("{:?}", stat), + Err(e) => eprintln!("Error: {}", e), + } + } } diff --git a/examples/top.rs b/examples/top.rs index 5fc42297..39e8ea6b 100644 --- a/examples/top.rs +++ b/examples/top.rs @@ -1,17 +1,15 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let id = env::args() .nth(1) .expect("Usage: cargo run --example top -- "); - let fut = docker - .containers() - .get(&id) - .top(Default::default()) - .map(|top| println!("{:#?}", top)) - .map_err(|e| eprintln!("Error: {}", e)); - tokio::run(fut); + + match docker.containers().get(&id).top(Default::default()).await { + Ok(top) => println!("{:#?}", top), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/version.rs b/examples/version.rs index 6125e561..e1ab5d21 100644 --- a/examples/version.rs +++ b/examples/version.rs @@ -1,13 +1,10 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { - env_logger::init(); +#[tokio::main] +async fn main() { let docker = Docker::new(); - let fut = docker - .version() - .map(|ver| println!("version -> {:#?}", ver)) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + match docker.version().await { + Ok(ver) => println!("version -> {:#?}", ver), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/volumecreate.rs b/examples/volumecreate.rs index 83f50458..a243be64 100644 --- a/examples/volumecreate.rs +++ b/examples/volumecreate.rs @@ -1,8 +1,8 @@ use shiplift::{builder::VolumeCreateOptions, Docker}; use std::{collections::HashMap, env}; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let volumes = docker.volumes(); @@ -13,15 +13,16 @@ fn main() { let mut labels = HashMap::new(); labels.insert("com.github.softprops", "shiplift"); - let fut = volumes + match volumes .create( &VolumeCreateOptions::builder() .name(volume_name.as_ref()) .labels(&labels) .build(), ) - .map(|info| println!("{:?}", info)) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + .await + { + Ok(info) => println!("{:?}", info), + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/examples/volumedelete.rs b/examples/volumedelete.rs index 3800d22a..ec1da7ec 100644 --- a/examples/volumedelete.rs +++ b/examples/volumedelete.rs @@ -1,8 +1,8 @@ use shiplift::Docker; use std::env; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let volumes = docker.volumes(); @@ -10,11 +10,7 @@ fn main() { .nth(1) .expect("You need to specify an volume name"); - let fut = volumes - .get(&volume_name) - .delete() - .map(|info| println!("{:?}", info)) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + if let Err(e) = volumes.get(&volume_name).delete().await { + eprintln!("Error: {}", e) + } } diff --git a/examples/volumes.rs b/examples/volumes.rs index c5548ec8..d45c00a8 100644 --- a/examples/volumes.rs +++ b/examples/volumes.rs @@ -1,18 +1,16 @@ use shiplift::Docker; -use tokio::prelude::Future; -fn main() { +#[tokio::main] +async fn main() { let docker = Docker::new(); let volumes = docker.volumes(); - let fut = volumes - .list() - .map(|volumes| { + match volumes.list().await { + Ok(volumes) => { for v in volumes { println!("volume -> {:#?}", v) } - }) - .map_err(|e| eprintln!("Error: {}", e)); - - tokio::run(fut); + } + Err(e) => eprintln!("Error: {}", e), + } } diff --git a/lib.rs b/lib.rs new file mode 100644 index 00000000..e69de29b diff --git a/src/errors.rs b/src/errors.rs index 5506198b..9d01f91a 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,8 +1,10 @@ //! Representations of various client errors -use hyper::{self, StatusCode}; +use hyper::{self, http, StatusCode}; use serde_json::Error as SerdeError; -use std::{error::Error as StdError, fmt, io::Error as IoError, string::FromUtf8Error}; +use std::{error::Error as StdError, fmt, string::FromUtf8Error}; + +use futures_util::io::Error as IoError; #[derive(Debug)] pub enum Error { @@ -34,12 +36,25 @@ impl From for Error { } } +impl From for Error { + fn from(error: http::uri::InvalidUri) -> Self { + let http_error: http::Error = error.into(); + http_error.into() + } +} + impl From for Error { fn from(error: IoError) -> Error { Error::IO(error) } } +impl From for Error { + fn from(error: FromUtf8Error) -> Error { + Error::Encoding(error) + } +} + impl fmt::Display for Error { fn fmt( &self, diff --git a/src/lib.rs b/src/lib.rs index 16e56252..06b31a19 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,22 +3,22 @@ //! # examples //! //! ```no_run -//! use tokio::prelude::Future; -//! +//! # async { //! let docker = shiplift::Docker::new(); -//! let fut = docker.images().list(&Default::default()).map(|images| { -//! println!("docker images in stock"); -//! for i in images { -//! println!("{:?}", i.repo_tags); -//! } -//! }).map_err(|e| eprintln!("Something bad happened! {}", e)); //! -//! tokio::run(fut); +//! match docker.images().list(&Default::default()).await { +//! Ok(images) => { +//! for image in images { +//! println!("{:?}", image.repo_tags); +//! } +//! }, +//! Err(e) => eprintln!("Something bad happened! {}", e), +//! } +//! # }; //! ``` pub mod builder; pub mod errors; -pub mod read; pub mod rep; pub mod transport; pub mod tty; @@ -35,7 +35,6 @@ pub use crate::{ errors::Error, }; use crate::{ - read::StreamReader, rep::{ Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event, Exit, History, Image as ImageRep, ImageDetails, Info, NetworkCreateInfo, @@ -43,9 +42,14 @@ use crate::{ Volume as VolumeRep, VolumeCreateInfo, Volumes as VolumesRep, }, transport::{tar, Transport}, - tty::TtyDecoder, + tty::Multiplexer as TtyMultiPlexer, +}; +use futures_util::{ + io::{AsyncRead, AsyncWrite}, + stream::Stream, + TryFutureExt, TryStreamExt, }; -use futures::{future::Either, Future, IntoFuture, Stream}; +// use futures::{future::Either, Future, IntoFuture, Stream}; pub use hyper::Uri; use hyper::{client::HttpConnector, Body, Client, Method}; #[cfg(feature = "tls")] @@ -56,8 +60,7 @@ use mime::Mime; #[cfg(feature = "tls")] use openssl::ssl::{SslConnector, SslFiletype, SslMethod}; use serde_json::Value; -use std::{borrow::Cow, env, io::Read, iter, path::Path, time::Duration}; -use tokio_codec::{FramedRead, LinesCodec}; +use std::{borrow::Cow, env, io, io::Read, iter, path::Path, time::Duration}; use url::form_urlencoded; /// Represents the result of all docker operations @@ -70,19 +73,19 @@ pub struct Docker { } /// Interface for accessing and manipulating a named docker image -pub struct Image<'a, 'b> { +pub struct Image<'a> { docker: &'a Docker, - name: Cow<'b, str>, + name: Cow<'a, str>, } -impl<'a, 'b> Image<'a, 'b> { +impl<'a> Image<'a> { /// Exports an interface for operations that may be performed against a named image pub fn new( docker: &'a Docker, name: S, - ) -> Image<'a, 'b> + ) -> Self where - S: Into>, + S: Into>, { Image { docker, @@ -91,40 +94,46 @@ impl<'a, 'b> Image<'a, 'b> { } /// Inspects a named image's details - pub fn inspect(&self) -> impl Future { + pub async fn inspect(&self) -> Result { self.docker .get_json(&format!("/images/{}/json", self.name)[..]) + .await } /// Lists the history of the images set of changes - pub fn history(&self) -> impl Future, Error = Error> { + pub async fn history(&self) -> Result> { self.docker .get_json(&format!("/images/{}/history", self.name)[..]) + .await } /// Deletes an image - pub fn delete(&self) -> impl Future, Error = Error> { + pub async fn delete(&self) -> Result> { self.docker .delete_json::>(&format!("/images/{}", self.name)[..]) + .await } /// Export this image to a tarball - pub fn export(&self) -> impl Stream, Error = Error> { - self.docker - .stream_get(&format!("/images/{}/get", self.name)[..]) - .map(|c| c.to_vec()) + pub fn export(&self) -> impl Stream>> + Unpin + 'a { + Box::pin( + self.docker + .stream_get(format!("/images/{}/get", self.name)) + .map_ok(|c| c.to_vec()), + ) } /// Adds a tag to an image - pub fn tag( + pub async fn tag( &self, opts: &TagOptions, - ) -> impl Future { + ) -> Result<()> { let mut path = vec![format!("/images/{}/tag", self.name)]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.post::(&path.join("?"), None).map(|_| ()) + let _ = self.docker.post(&path.join("?"), None).await?; + Ok(()) } } @@ -141,76 +150,74 @@ impl<'a> Images<'a> { /// Builds a new image build by reading a Dockerfile in a target directory pub fn build( - &self, - opts: &BuildOptions, - ) -> impl Stream { - let mut path = vec!["/build".to_owned()]; - if let Some(query) = opts.serialize() { - path.push(query) - } + &'a self, + opts: &'a BuildOptions, + ) -> impl Stream> + Unpin + 'a { + Box::pin( + async move { + let mut path = vec!["/build".to_owned()]; + if let Some(query) = opts.serialize() { + path.push(query) + } + + let mut bytes = Vec::default(); + + tarball::dir(&mut bytes, &opts.path[..])?; + + let chunk_stream = self.docker.stream_post( + path.join("?"), + Some((Body::from(bytes), tar())), + None::>, + ); - let mut bytes = vec![]; - - match tarball::dir(&mut bytes, &opts.path[..]) { - Ok(_) => Box::new( - self.docker - .stream_post( - &path.join("?"), - Some((Body::from(bytes), tar())), - None::>, - ) - .map(|r| { - futures::stream::iter_result( - serde_json::Deserializer::from_slice(&r[..]) - .into_iter::() - .collect::>(), - ) - .map_err(Error::from) - }) - .flatten(), - ) as Box + Send>, - Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream()) - as Box + Send>, - } + let value_stream = chunk_stream.and_then(|chunk| async move { + serde_json::from_slice(&chunk).map_err(Error::from) + }); + + Ok(value_stream) + } + .try_flatten_stream(), + ) } /// Lists the docker images on the current docker host - pub fn list( + pub async fn list( &self, opts: &ImageListOptions, - ) -> impl Future, Error = Error> { + ) -> Result> { let mut path = vec!["/images/json".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - self.docker.get_json::>(&path.join("?")) + self.docker.get_json::>(&path.join("?")).await } /// Returns a reference to a set of operations available for a named image - pub fn get<'b>( + pub fn get( &self, - name: &'b str, - ) -> Image<'a, 'b> { + name: &'a str, + ) -> Image<'a> { Image::new(self.docker, name) } /// Search for docker images by term - pub fn search( + pub async fn search( &self, term: &str, - ) -> impl Future, Error = Error> { + ) -> Result> { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("term", term) .finish(); self.docker .get_json::>(&format!("/images/search?{}", query)[..]) + .await } /// Pull and create a new docker images from an existing image pub fn pull( &self, opts: &PullOptions, - ) -> impl Stream { + ) -> impl Stream> + Unpin + 'a { let mut path = vec!["/images/create".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); @@ -218,18 +225,15 @@ impl<'a> Images<'a> { let headers = opts .auth_header() .map(|a| iter::once(("X-Registry-Auth", a))); - self.docker - .stream_post::(&path.join("?"), None, headers) - // todo: give this a proper enum type - .map(|r| { - futures::stream::iter_result( - serde_json::Deserializer::from_slice(&r[..]) - .into_iter::() - .collect::>(), - ) - .map_err(Error::from) - }) - .flatten() + + Box::pin( + self.docker + .stream_post(path.join("?"), None, headers) + .and_then(move |chunk| { + // todo: give this a proper enum type + futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from)) + }), + ) } /// exports a collection of named images, @@ -237,14 +241,14 @@ impl<'a> Images<'a> { pub fn export( &self, names: Vec<&str>, - ) -> impl Stream, Error = Error> { + ) -> impl Stream>> + 'a { let params = names.iter().map(|n| ("names", *n)); let query = form_urlencoded::Serializer::new(String::new()) .extend_pairs(params) .finish(); self.docker - .stream_get(&format!("/images/get?{}", query)[..]) - .map(|c| c.to_vec()) + .stream_get(format!("/images/get?{}", query)) + .map_ok(|c| c.to_vec()) } /// imports an image or set of images from a given tarball source @@ -252,43 +256,44 @@ impl<'a> Images<'a> { pub fn import( self, mut tarball: Box, - ) -> impl Stream { - let mut bytes = Vec::new(); - - match tarball.read_to_end(&mut bytes) { - Ok(_) => Box::new( - self.docker - .stream_post( - "/images/load", - Some((Body::from(bytes), tar())), - None::>, - ) - .and_then(|bytes| { - serde_json::from_slice::<'_, Value>(&bytes[..]) - .map_err(Error::from) - .into_future() - }), - ) as Box + Send>, - Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream()) - as Box + Send>, - } + ) -> impl Stream> + Unpin + 'a { + Box::pin( + async move { + let mut bytes = Vec::default(); + + tarball.read_to_end(&mut bytes)?; + + let chunk_stream = self.docker.stream_post( + "/images/load", + Some((Body::from(bytes), tar())), + None::>, + ); + + let value_stream = chunk_stream.and_then(|chunk| async move { + serde_json::from_slice(&chunk).map_err(Error::from) + }); + + Ok(value_stream) + } + .try_flatten_stream(), + ) } } /// Interface for accessing and manipulating a docker container -pub struct Container<'a, 'b> { +pub struct Container<'a> { docker: &'a Docker, - id: Cow<'b, str>, + id: Cow<'a, str>, } -impl<'a, 'b> Container<'a, 'b> { +impl<'a> Container<'a> { /// Exports an interface exposing operations against a container instance pub fn new( docker: &'a Docker, id: S, - ) -> Container<'a, 'b> + ) -> Self where - S: Into>, + S: Into>, { Container { docker, @@ -302,16 +307,17 @@ impl<'a, 'b> Container<'a, 'b> { } /// Inspects the current docker container instance's details - pub fn inspect(&self) -> impl Future { + pub async fn inspect(&self) -> Result { self.docker .get_json::(&format!("/containers/{}/json", self.id)[..]) + .await } /// Returns a `top` view of information about the container process - pub fn top( + pub async fn top( &self, psargs: Option<&str>, - ) -> impl Future { + ) -> Result { let mut path = vec![format!("/containers/{}/top", self.id)]; if let Some(ref args) = psargs { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -319,85 +325,95 @@ impl<'a, 'b> Container<'a, 'b> { .finish(); path.push(encoded) } - self.docker.get_json(&path.join("?")) + self.docker.get_json(&path.join("?")).await } /// Returns a stream of logs emitted but the container instance pub fn logs( &self, opts: &LogsOptions, - ) -> impl Stream { + ) -> impl Stream> + Unpin + 'a { let mut path = vec![format!("/containers/{}/logs", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } - let decoder = TtyDecoder::new(); - let chunk_stream = StreamReader::new(self.docker.stream_get(&path.join("?"))); + let stream = Box::pin(self.docker.stream_get(path.join("?"))); - FramedRead::new(chunk_stream, decoder) + Box::pin(tty::decode(stream)) } - /// Attaches to a running container, returning a stream that can - /// be used to interact with the standard IO streams. - pub fn attach(&self) -> impl Future { - self.docker.stream_post_upgrade_multiplexed::( - &format!( - "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1", - self.id - ), - None, - ) + /// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin. + async fn attach_raw(&self) -> Result { + self.docker + .stream_post_upgrade( + format!( + "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1", + self.id + ), + None, + ) + .await } - /// Attaches to a running container, returning a stream that can - /// be used to interact with the standard IO streams. - pub fn attach_blocking(&self) -> Result { - self.attach().map(|s| s.wait()).wait() + /// Attaches a `[TtyMultiplexer]` to the container. + /// + /// The `[TtyMultiplexer]` implements Stream for returning Stdout and Stderr chunks. It also implements `[AsyncWrite]` for writing to Stdin. + /// + /// The multiplexer can be split into its read and write halves with the `[split](TtyMultiplexer::split)` method + pub async fn attach(&self) -> Result> { + let tcp_stream = self.attach_raw().await?; + + Ok(TtyMultiPlexer::new(tcp_stream)) } /// Returns a set of changes made to the container instance - pub fn changes(&self) -> impl Future, Error = Error> { + pub async fn changes(&self) -> Result> { self.docker .get_json::>(&format!("/containers/{}/changes", self.id)[..]) + .await } /// Exports the current docker container into a tarball - pub fn export(&self) -> impl Stream, Error = Error> { + pub fn export(&self) -> impl Stream>> + 'a { self.docker - .stream_get(&format!("/containers/{}/export", self.id)[..]) - .map(|c| c.to_vec()) + .stream_get(format!("/containers/{}/export", self.id)) + .map_ok(|c| c.to_vec()) } /// Returns a stream of stats specific to this container instance - pub fn stats(&self) -> impl Stream { - let decoder = LinesCodec::new(); - let stream_of_chunks = StreamReader::new( - self.docker - .stream_get(&format!("/containers/{}/stats", self.id)[..]), - ); + pub fn stats(&'a self) -> impl Stream> + Unpin + 'a { + let codec = futures_codec::LinesCodec {}; - FramedRead::new(stream_of_chunks, decoder) - .map_err(Error::IO) - .and_then(|s| { - serde_json::from_str::(&s) - .map_err(Error::SerdeJsonError) - .into_future() - }) + let reader = Box::pin( + self.docker + .stream_get(format!("/containers/{}/stats", self.id)) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), + ) + .into_async_read(); + + Box::pin( + futures_codec::FramedRead::new(reader, codec) + .map_err(Error::IO) + .and_then(|s: String| async move { + serde_json::from_str(&s).map_err(Error::SerdeJsonError) + }), + ) } /// Start the container instance - pub fn start(&self) -> impl Future { + pub async fn start(&self) -> Result<()> { self.docker - .post::(&format!("/containers/{}/start", self.id)[..], None) - .map(|_| ()) + .post(&format!("/containers/{}/start", self.id)[..], None) + .await?; + Ok(()) } /// Stop the container instance - pub fn stop( + pub async fn stop( &self, wait: Option, - ) -> impl Future { + ) -> Result<()> { let mut path = vec![format!("/containers/{}/stop", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -406,14 +422,15 @@ impl<'a, 'b> Container<'a, 'b> { path.push(encoded) } - self.docker.post::(&path.join("?"), None).map(|_| ()) + self.docker.post(&path.join("?"), None).await?; + Ok(()) } /// Restart the container instance - pub fn restart( + pub async fn restart( &self, wait: Option, - ) -> impl Future { + ) -> Result<()> { let mut path = vec![format!("/containers/{}/restart", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -421,14 +438,15 @@ impl<'a, 'b> Container<'a, 'b> { .finish(); path.push(encoded) } - self.docker.post::(&path.join("?"), None).map(|_| ()) + self.docker.post(&path.join("?"), None).await?; + Ok(()) } /// Kill the container instance - pub fn kill( + pub async fn kill( &self, signal: Option<&str>, - ) -> impl Future { + ) -> Result<()> { let mut path = vec![format!("/containers/{}/kill", self.id)]; if let Some(sig) = signal { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -436,101 +454,125 @@ impl<'a, 'b> Container<'a, 'b> { .finish(); path.push(encoded) } - self.docker.post::(&path.join("?"), None).map(|_| ()) + self.docker.post(&path.join("?"), None).await?; + Ok(()) } /// Rename the container instance - pub fn rename( + pub async fn rename( &self, name: &str, - ) -> impl Future { + ) -> Result<()> { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("name", name) .finish(); self.docker - .post::( + .post( &format!("/containers/{}/rename?{}", self.id, query)[..], None, ) - .map(|_| ()) + .await?; + Ok(()) } /// Pause the container instance - pub fn pause(&self) -> impl Future { + pub async fn pause(&self) -> Result<()> { self.docker - .post::(&format!("/containers/{}/pause", self.id)[..], None) - .map(|_| ()) + .post(&format!("/containers/{}/pause", self.id)[..], None) + .await?; + Ok(()) } /// Unpause the container instance - pub fn unpause(&self) -> impl Future { + pub async fn unpause(&self) -> Result<()> { self.docker - .post::(&format!("/containers/{}/unpause", self.id)[..], None) - .map(|_| ()) + .post(&format!("/containers/{}/unpause", self.id)[..], None) + .await?; + Ok(()) } /// Wait until the container stops - pub fn wait(&self) -> impl Future { + pub async fn wait(&self) -> Result { self.docker - .post_json::(&format!("/containers/{}/wait", self.id)[..], None) + .post_json( + format!("/containers/{}/wait", self.id), + Option::<(Body, Mime)>::None, + ) + .await } /// Delete the container instance /// /// Use remove instead to use the force/v options. - pub fn delete(&self) -> impl Future { + pub async fn delete(&self) -> Result<()> { self.docker .delete(&format!("/containers/{}", self.id)[..]) - .map(|_| ()) + .await?; + Ok(()) } /// Delete the container instance (todo: force/v) - pub fn remove( + pub async fn remove( &self, opts: RmContainerOptions, - ) -> impl Future { + ) -> Result<()> { let mut path = vec![format!("/containers/{}", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.delete(&path.join("?")).map(|_| ()) + self.docker.delete(&path.join("?")).await?; + Ok(()) } - // TODO(abusch) fix this - /// Exec the specified command in the container - pub fn exec( + async fn exec_create( &self, opts: &ExecContainerOptions, - ) -> impl Stream { - let data = opts.serialize().unwrap(); // TODO fixme - let bytes = data.into_bytes(); - let docker2 = self.docker.clone(); - self.docker - .post( + ) -> Result { + #[derive(serde::Deserialize)] + #[serde(rename_all = "PascalCase")] + struct Response { + id: String, + } + + let body: Body = opts.serialize()?.into(); + + let Response { id } = self + .docker + .post_json( &format!("/containers/{}/exec", self.id)[..], - Some((bytes, mime::APPLICATION_JSON)), + Some((body, mime::APPLICATION_JSON)), ) - .map(move |res| { - let data = "{}"; - let bytes = data.as_bytes(); - let id = serde_json::from_str::(res.as_str()) - .ok() - .and_then(|v| { - v.as_object() - .and_then(|v| v.get("Id")) - .and_then(|v| v.as_str().map(|v| v.to_string())) - }) - .unwrap(); // TODO fixme - - let decoder = TtyDecoder::new(); - let chunk_stream = StreamReader::new(docker2.stream_post( - &format!("/exec/{}/start", id)[..], - Some((bytes, mime::APPLICATION_JSON)), - None::>, - )); - FramedRead::new(chunk_stream, decoder) - }) - .flatten_stream() + .await?; + + Ok(id) + } + + fn exec_start( + &self, + id: String, + ) -> impl Stream> + 'a { + let bytes: &[u8] = b"{}"; + + let stream = Box::pin(self.docker.stream_post( + format!("/exec/{}/start", id), + Some((bytes.into(), mime::APPLICATION_JSON)), + None::>, + )); + + tty::decode(stream) + } + + pub fn exec( + &'a self, + opts: &'a ExecContainerOptions, + ) -> impl Stream> + Unpin + 'a { + Box::pin( + async move { + let id = self.exec_create(opts).await?; + Ok(self.exec_start(id)) + } + .try_flatten_stream(), + ) } /// Copy a file/folder from the container. The resulting stream is a tarball of the extracted @@ -544,24 +586,24 @@ impl<'a, 'b> Container<'a, 'b> { pub fn copy_from( &self, path: &Path, - ) -> impl Stream, Error = Error> { + ) -> impl Stream>> + 'a { let path_arg = form_urlencoded::Serializer::new(String::new()) .append_pair("path", &path.to_string_lossy()) .finish(); - self.docker - .stream_get(&format!("/containers/{}/archive?{}", self.id, path_arg)) - .map(|c| c.to_vec()) + + let endpoint = format!("/containers/{}/archive?{}", self.id, path_arg); + self.docker.stream_get(endpoint).map_ok(|c| c.to_vec()) } /// Copy a byte slice as file into (see `bytes`) the container. /// /// The file will be copied at the given location (see `path`) and will be owned by root /// with access mask 644. - pub fn copy_file_into>( + pub async fn copy_file_into>( &self, path: P, bytes: &[u8], - ) -> impl Future { + ) -> Result<()> { let path = path.as_ref(); let mut ar = tar::Builder::new(Vec::new()); @@ -588,9 +630,10 @@ impl<'a, 'b> Container<'a, 'b> { self.docker .put( &format!("/containers/{}/archive?{}", self.id, path_arg), - body, + body.map(|(body, mime)| (body.into(), mime)), ) - .map(|_| ()) + .await?; + Ok(()) } } @@ -606,36 +649,33 @@ impl<'a> Containers<'a> { } /// Lists the container instances on the docker host - pub fn list( + pub async fn list( &self, opts: &ContainerListOptions, - ) -> impl Future, Error = Error> { + ) -> Result> { let mut path = vec!["/containers/json".to_owned()]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.get_json::>(&path.join("?")) + self.docker + .get_json::>(&path.join("?")) + .await } /// Returns a reference to a set of operations available to a specific container instance - pub fn get<'b>( + pub fn get( &self, - name: &'b str, - ) -> Container<'a, 'b> { + name: &'a str, + ) -> Container<'a> { Container::new(self.docker, name) } /// Returns a builder interface for creating a new container instance - pub fn create( + pub async fn create( &self, opts: &ContainerOptions, - ) -> impl Future { - let data = match opts.serialize() { - Ok(data) => data, - Err(e) => return Either::A(futures::future::err(e)), - }; - - let bytes = data.into_bytes(); + ) -> Result { + let body: Body = opts.serialize()?.into(); let mut path = vec!["/containers/create".to_owned()]; if let Some(ref name) = opts.name { @@ -646,10 +686,9 @@ impl<'a> Containers<'a> { ); } - Either::B( - self.docker - .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))), - ) + self.docker + .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON))) + .await } } @@ -665,15 +704,15 @@ impl<'a> Networks<'a> { } /// List the docker networks on the current docker host - pub fn list( + pub async fn list( &self, opts: &NetworkListOptions, - ) -> impl Future, Error = Error> { + ) -> Result> { let mut path = vec!["/networks".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - self.docker.get_json(&path.join("?")) + self.docker.get_json(&path.join("?")).await } /// Returns a reference to a set of operations available to a specific network instance @@ -685,21 +724,16 @@ impl<'a> Networks<'a> { } /// Create a new Network instance - pub fn create( + pub async fn create( &self, opts: &NetworkCreateOptions, - ) -> impl Future { - let data = match opts.serialize() { - Ok(data) => data, - Err(e) => return Either::A(futures::future::err(e)), - }; - let bytes = data.into_bytes(); + ) -> Result { + let body: Body = opts.serialize()?.into(); let path = vec!["/networks/create".to_owned()]; - Either::B( - self.docker - .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))), - ) + self.docker + .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON))) + .await } } @@ -730,52 +764,50 @@ impl<'a, 'b> Network<'a, 'b> { } /// Inspects the current docker network instance's details - pub fn inspect(&self) -> impl Future { - self.docker.get_json(&format!("/networks/{}", self.id)[..]) + pub async fn inspect(&self) -> Result { + self.docker + .get_json(&format!("/networks/{}", self.id)[..]) + .await } /// Delete the network instance - pub fn delete(&self) -> impl Future { + pub async fn delete(&self) -> Result<()> { self.docker .delete(&format!("/networks/{}", self.id)[..]) - .map(|_| ()) + .await?; + Ok(()) } /// Connect container to network - pub fn connect( + pub async fn connect( &self, opts: &ContainerConnectionOptions, - ) -> impl Future { - self.do_connection("connect", opts) + ) -> Result<()> { + self.do_connection("connect", opts).await } /// Disconnect container to network - pub fn disconnect( + pub async fn disconnect( &self, opts: &ContainerConnectionOptions, - ) -> impl Future { - self.do_connection("disconnect", opts) + ) -> Result<()> { + self.do_connection("disconnect", opts).await } - fn do_connection( + async fn do_connection( &self, segment: &str, opts: &ContainerConnectionOptions, - ) -> impl Future { - let data = match opts.serialize() { - Ok(data) => data, - Err(e) => return Either::A(futures::future::err(e)), - }; - let bytes = data.into_bytes(); + ) -> Result<()> { + let body: Body = opts.serialize()?.into(); - Either::B( - self.docker - .post( - &format!("/networks/{}/{}", self.id, segment)[..], - Some((bytes, mime::APPLICATION_JSON)), - ) - .map(|_| ()), - ) + self.docker + .post( + &format!("/networks/{}/{}", self.id, segment)[..], + Some((body, mime::APPLICATION_JSON)), + ) + .await?; + Ok(()) } } @@ -790,34 +822,27 @@ impl<'a> Volumes<'a> { Volumes { docker } } - pub fn create( + pub async fn create( &self, opts: &VolumeCreateOptions, - ) -> impl Future { - let data = match opts.serialize() { - Ok(data) => data, - Err(e) => return Either::A(futures::future::err(e)), - }; - - let bytes = data.into_bytes(); + ) -> Result { + let body: Body = opts.serialize()?.into(); let path = vec!["/volumes/create".to_owned()]; - Either::B( - self.docker - .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))), - ) + self.docker + .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON))) + .await } /// Lists the docker volumes on the current docker host - pub fn list(&self) -> impl Future, Error = Error> { + pub async fn list(&self) -> Result> { let path = vec!["/volumes".to_owned()]; - self.docker - .get_json::(&path.join("?")) - .map(|volumes: VolumesRep| match volumes.volumes { - Some(volumes) => volumes, - None => vec![], - }) + let volumes_rep = self.docker.get_json::(&path.join("?")).await?; + Ok(match volumes_rep.volumes { + Some(volumes) => volumes, + None => vec![], + }) } /// Returns a reference to a set of operations available for a named volume @@ -851,15 +876,16 @@ impl<'a, 'b> Volume<'a, 'b> { } /// Deletes a volume - pub fn delete(&self) -> impl Future { + pub async fn delete(&self) -> Result<()> { self.docker .delete(&format!("/volumes/{}", self.name)[..]) - .map(|_| ()) + .await?; + Ok(()) } } fn get_http_connector() -> HttpConnector { - let mut http = HttpConnector::new(1); + let mut http = HttpConnector::new(); http.enforce_http(false); http @@ -950,7 +976,9 @@ impl Docker { { Docker { transport: Transport::Unix { - client: Client::builder().keep_alive(false).build(UnixConnector), + client: Client::builder() + .pool_max_idle_per_host(0) + .build(UnixConnector), path: socket_path.into(), }, } @@ -960,12 +988,12 @@ impl Docker { pub fn host(host: Uri) -> Docker { let tcp_host_str = format!( "{}://{}:{}", - host.scheme_part().map(|s| s.as_str()).unwrap(), + host.scheme_str().unwrap(), host.host().unwrap().to_owned(), host.port_u16().unwrap_or(80) ); - match host.scheme_part().map(|s| s.as_str()) { + match host.scheme_str() { #[cfg(feature = "unix-socket")] Some("unix") => Docker { transport: Transport::Unix { @@ -1000,153 +1028,152 @@ impl Docker { } /// Returns version information associated with the docker daemon - pub fn version(&self) -> impl Future { - self.get_json("/version") + pub async fn version(&self) -> Result { + self.get_json("/version").await } /// Returns information associated with the docker daemon - pub fn info(&self) -> impl Future { - self.get_json("/info") + pub async fn info(&self) -> Result { + self.get_json("/info").await } /// Returns a simple ping response indicating the docker daemon is accessible - pub fn ping(&self) -> impl Future { - self.get("/_ping") + pub async fn ping(&self) -> Result { + self.get("/_ping").await } /// Returns a stream of docker events - pub fn events( - &self, - opts: &EventsOptions, - ) -> impl Stream { + pub fn events<'a>( + &'a self, + opts: &'a EventsOptions, + ) -> impl Stream> + Unpin + 'a { let mut path = vec!["/events".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - let stream_of_chunks = self.stream_get(&path.join("?")[..]); - let reader = StreamReader::new(stream_of_chunks); - FramedRead::new(reader, LinesCodec::new()) - .map_err(Error::IO) - .and_then(|line| serde_json::from_str::(&line).map_err(Error::from)) + let reader = Box::pin( + self.stream_get(path.join("?")) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), + ) + .into_async_read(); + + let codec = futures_codec::LinesCodec {}; + + Box::pin( + futures_codec::FramedRead::new(reader, codec) + .map_err(Error::IO) + .and_then(|s: String| async move { + serde_json::from_str(&s).map_err(Error::SerdeJsonError) + }), + ) } // // Utility functions to make requests // - fn get( + async fn get( &self, endpoint: &str, - ) -> impl Future { - self.transport.request::(Method::GET, endpoint, None) + ) -> Result { + self.transport + .request(Method::GET, endpoint, Option::<(Body, Mime)>::None) + .await } - fn get_json( + async fn get_json( &self, endpoint: &str, - ) -> impl Future { - self.transport - .request::(Method::GET, endpoint, None) - .and_then(|v| { - serde_json::from_str::(&v) - .map_err(Error::SerdeJsonError) - .into_future() - }) + ) -> Result { + let raw_string = self + .transport + .request(Method::GET, endpoint, Option::<(Body, Mime)>::None) + .await?; + + Ok(serde_json::from_str::(&raw_string)?) } - fn post( + async fn post( &self, endpoint: &str, - body: Option<(B, Mime)>, - ) -> impl Future - where - B: Into, - { - self.transport.request(Method::POST, endpoint, body) + body: Option<(Body, Mime)>, + ) -> Result { + self.transport.request(Method::POST, endpoint, body).await } - fn put( + async fn put( &self, endpoint: &str, - body: Option<(B, Mime)>, - ) -> impl Future - where - B: Into, - { - self.transport.request(Method::PUT, endpoint, body) + body: Option<(Body, Mime)>, + ) -> Result { + self.transport.request(Method::PUT, endpoint, body).await } - fn post_json( + async fn post_json( &self, - endpoint: &str, + endpoint: impl AsRef, body: Option<(B, Mime)>, - ) -> impl Future + ) -> Result where - B: Into, T: serde::de::DeserializeOwned, + B: Into, { - self.transport - .request(Method::POST, endpoint, body) - .and_then(|v| { - serde_json::from_str::(&v) - .map_err(Error::SerdeJsonError) - .into_future() - }) + let string = self.transport.request(Method::POST, endpoint, body).await?; + + Ok(serde_json::from_str::(&string)?) } - fn delete( + async fn delete( &self, endpoint: &str, - ) -> impl Future { + ) -> Result { self.transport - .request::(Method::DELETE, endpoint, None) + .request(Method::DELETE, endpoint, Option::<(Body, Mime)>::None) + .await } - fn delete_json( + async fn delete_json( &self, endpoint: &str, - ) -> impl Future { - self.transport - .request::(Method::DELETE, endpoint, None) - .and_then(|v| { - serde_json::from_str::(&v) - .map_err(Error::SerdeJsonError) - .into_future() - }) + ) -> Result { + let string = self + .transport + .request(Method::DELETE, endpoint, Option::<(Body, Mime)>::None) + .await?; + + Ok(serde_json::from_str::(&string)?) } - fn stream_post( - &self, - endpoint: &str, - body: Option<(B, Mime)>, + fn stream_post<'a, H>( + &'a self, + endpoint: impl AsRef + 'a, + body: Option<(Body, Mime)>, headers: Option, - ) -> impl Stream + ) -> impl Stream> + 'a where - B: Into, - H: IntoIterator, + H: IntoIterator + 'a, { self.transport .stream_chunks(Method::POST, endpoint, body, headers) } - fn stream_get( - &self, - endpoint: &str, - ) -> impl Stream { + fn stream_get<'a>( + &'a self, + endpoint: impl AsRef + Unpin + 'a, + ) -> impl Stream> + 'a { + let headers = Some(Vec::default()); self.transport - .stream_chunks::>(Method::GET, endpoint, None, None) + .stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers) } - fn stream_post_upgrade_multiplexed( - &self, - endpoint: &str, - body: Option<(B, Mime)>, - ) -> impl Future - where - B: Into + 'static, - { + async fn stream_post_upgrade<'a>( + &'a self, + endpoint: impl AsRef + 'a, + body: Option<(Body, Mime)>, + ) -> Result { self.transport - .stream_upgrade_multiplexed(Method::POST, endpoint, body) + .stream_upgrade(Method::POST, endpoint, body) + .await } } diff --git a/src/read.rs b/src/read.rs deleted file mode 100644 index b9dc5ef9..00000000 --- a/src/read.rs +++ /dev/null @@ -1,105 +0,0 @@ -use crate::errors::Error; -use futures::{Async, Stream}; -use hyper::Chunk; -use std::{ - cmp, - io::{self, Read}, -}; -use tokio_io::AsyncRead; - -/* - * The following is taken from - * https://github.com/ferristseng/rust-ipfs-api/blob/master/ipfs-api/src/read.rs. - * TODO: see with upstream author to move to a separate crate. - */ - -/// The state of a stream returning Chunks. -/// -enum ReadState { - /// A chunk is ready to be read from. - /// - Ready(Chunk, usize), - - /// The next chunk isn't ready yet. - /// - NotReady, -} - -/// Reads from a stream of chunks asynchronously. -/// -pub struct StreamReader { - stream: S, - state: ReadState, -} - -impl StreamReader -where - S: Stream, -{ - #[inline] - pub fn new(stream: S) -> StreamReader { - StreamReader { - stream, - state: ReadState::NotReady, - } - } -} - -impl Read for StreamReader -where - S: Stream, -{ - fn read( - &mut self, - buf: &mut [u8], - ) -> io::Result { - loop { - let ret; - - match self.state { - // Stream yielded a Chunk to read. - // - ReadState::Ready(ref mut chunk, ref mut pos) => { - let chunk_start = *pos; - let len = cmp::min(buf.len(), chunk.len() - chunk_start); - let chunk_end = chunk_start + len; - - buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]); - *pos += len; - - if *pos == chunk.len() { - ret = len; - } else { - return Ok(len); - } - } - // Stream is not ready, and a Chunk needs to be read. - // - ReadState::NotReady => { - match self.stream.poll() { - // Polling stream yielded a Chunk that can be read from. - // - Ok(Async::Ready(Some(chunk))) => { - self.state = ReadState::Ready(chunk, 0); - - continue; - } - // Polling stream yielded EOF. - // - Ok(Async::Ready(None)) => return Ok(0), - // Stream could not be read from. - // - Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()), - Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.to_string())), - } - } - } - - self.state = ReadState::NotReady; - - return Ok(ret); - } - } -} - -impl AsyncRead for StreamReader where S: Stream {} diff --git a/src/transport.rs b/src/transport.rs index 7cc1fb18..112d5486 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,13 +1,15 @@ //! Transports for communicating with the docker daemon use crate::{Error, Result}; -use futures::{ - future::{self, Either}, - Future, IntoFuture, Stream, +use futures_util::{ + io::{AsyncRead, AsyncWrite}, + stream::Stream, + StreamExt, TryFutureExt, }; use hyper::{ + body::Bytes, client::{Client, HttpConnector}, - header, Body, Chunk, Method, Request, StatusCode, + header, Body, Method, Request, StatusCode, }; #[cfg(feature = "tls")] use hyper_openssl::HttpsConnector; @@ -15,12 +17,14 @@ use hyper_openssl::HttpsConnector; use hyperlocal::UnixConnector; #[cfg(feature = "unix-socket")] use hyperlocal::Uri as DomainUri; -use log::debug; use mime::Mime; +use pin_project::pin_project; use serde::{Deserialize, Serialize}; -use serde_json; -use std::{fmt, iter}; -use tokio_io::{AsyncRead, AsyncWrite}; +use std::{ + fmt, io, iter, + pin::Pin, + task::{Context, Poll}, +}; pub fn tar() -> Mime { "application/tar".parse().unwrap() @@ -66,116 +70,133 @@ impl fmt::Debug for Transport { impl Transport { /// Make a request and return the whole response in a `String` - pub fn request( + pub async fn request( &self, method: Method, - endpoint: &str, + endpoint: impl AsRef, body: Option<(B, Mime)>, - ) -> impl Future + ) -> Result where B: Into, { - let endpoint = endpoint.to_string(); - self.stream_chunks(method, &endpoint, body, None::>) - .concat2() - .and_then(|v| { - String::from_utf8(v.to_vec()) - .map_err(Error::Encoding) - .into_future() - }) - .inspect(move |body| debug!("{} raw response: {}", endpoint, body)) + let body = self + .get_body(method, endpoint, body, None::>) + .await?; + let bytes = hyper::body::to_bytes(body).await?; + let string = String::from_utf8(bytes.to_vec())?; + + Ok(string) } - /// Make a request and return a `Stream` of `Chunks` as they are returned. - pub fn stream_chunks( + async fn get_body( &self, method: Method, - endpoint: &str, + endpoint: impl AsRef, body: Option<(B, Mime)>, headers: Option, - ) -> impl Stream + ) -> Result where B: Into, H: IntoIterator, { let req = self - .build_request(method, endpoint, body, headers, |_| ()) + .build_request(method, endpoint, body, headers, Request::builder()) .expect("Failed to build request!"); - self.send_request(req) - .and_then(|res| { - let status = res.status(); - match status { - // Success case: pass on the response - StatusCode::OK - | StatusCode::CREATED - | StatusCode::SWITCHING_PROTOCOLS - | StatusCode::NO_CONTENT => Either::A(future::ok(res)), - // Error case: parse the body to try to extract the error message - _ => Either::B( - res.into_body() - .concat2() - .map_err(Error::Hyper) - .and_then(|v| { - String::from_utf8(v.into_iter().collect::>()) - .map_err(Error::Encoding) - }) - .and_then(move |body| { - future::err(Error::Fault { - code: status, - message: Self::get_error_message(&body).unwrap_or_else(|| { - status - .canonical_reason() - .unwrap_or_else(|| "unknown error code") - .to_owned() - }), - }) - }), - ), - } - }) - .map(|r| { - // Convert the response body into a stream of chunks - r.into_body().map_err(Error::Hyper) - }) - .flatten_stream() + let response = self.send_request(req).await?; + + let status = response.status(); + + match status { + // Success case: pass on the response + StatusCode::OK + | StatusCode::CREATED + | StatusCode::SWITCHING_PROTOCOLS + | StatusCode::NO_CONTENT => Ok(response.into_body()), + _ => { + let bytes = hyper::body::to_bytes(response.into_body()).await?; + let message_body = String::from_utf8(bytes.to_vec())?; + + Err(Error::Fault { + code: status, + message: Self::get_error_message(&message_body).unwrap_or_else(|| { + status + .canonical_reason() + .unwrap_or_else(|| "unknown error code") + .to_owned() + }), + }) + } + } + } + + async fn get_chunk_stream( + &self, + method: Method, + endpoint: impl AsRef, + body: Option<(B, Mime)>, + headers: Option, + ) -> Result>> + where + B: Into, + H: IntoIterator, + { + let body = self.get_body(method, endpoint, body, headers).await?; + + Ok(stream_body(body)) + } + + pub fn stream_chunks<'a, H, B>( + &'a self, + method: Method, + endpoint: impl AsRef + 'a, + body: Option<(B, Mime)>, + headers: Option, + ) -> impl Stream> + 'a + where + H: IntoIterator + 'a, + B: Into + 'a, + { + self.get_chunk_stream(method, endpoint, body, headers) + .try_flatten_stream() } /// Builds an HTTP request. fn build_request( &self, method: Method, - endpoint: &str, + endpoint: impl AsRef, body: Option<(B, Mime)>, headers: Option, - f: impl FnOnce(&mut ::hyper::http::request::Builder), + builder: hyper::http::request::Builder, ) -> Result> where B: Into, H: IntoIterator, { - let mut builder = Request::builder(); - f(&mut builder); - let req = match *self { Transport::Tcp { ref host, .. } => { - builder.method(method).uri(&format!("{}{}", host, endpoint)) + builder + .method(method) + .uri(&format!("{}{}", host, endpoint.as_ref())) } #[cfg(feature = "tls")] Transport::EncryptedTcp { ref host, .. } => { - builder.method(method).uri(&format!("{}{}", host, endpoint)) + builder + .method(method) + .uri(&format!("{}{}", host, endpoint.as_ref())) } #[cfg(feature = "unix-socket")] Transport::Unix { ref path, .. } => { - let uri: hyper::Uri = DomainUri::new(&path, endpoint).into(); - builder.method(method).uri(&uri.to_string()) + let uri = DomainUri::new(&path, endpoint.as_ref()); + builder.method(method).uri(uri) } }; - let req = req.header(header::HOST, ""); + let mut req = req.header(header::HOST, ""); if let Some(h) = headers { for (k, v) in h.into_iter() { - req.header(k, v); + req = req.header(k, v); } } @@ -188,19 +209,17 @@ impl Transport { } /// Send the given request to the docker daemon and return a Future of the response. - fn send_request( + async fn send_request( &self, req: Request, - ) -> impl Future, Error = Error> { - let req = match self { - Transport::Tcp { ref client, .. } => client.request(req), + ) -> Result> { + match self { + Transport::Tcp { ref client, .. } => Ok(client.request(req).await?), #[cfg(feature = "tls")] - Transport::EncryptedTcp { ref client, .. } => client.request(req), + Transport::EncryptedTcp { ref client, .. } => Ok(client.request(req).await?), #[cfg(feature = "unix-socket")] - Transport::Unix { ref client, .. } => client.request(req), - }; - - req.map_err(Error::Hyper) + Transport::Unix { ref client, .. } => Ok(client.request(req).await?), + } } /// Makes an HTTP request, upgrading the connection to a TCP @@ -208,12 +227,12 @@ impl Transport { /// /// This method can be used for operations such as viewing /// docker container logs interactively. - pub fn stream_upgrade( + async fn stream_upgrade_tokio( &self, method: Method, - endpoint: &str, + endpoint: impl AsRef, body: Option<(B, Mime)>, - ) -> impl Future + ) -> Result where B: Into, { @@ -226,32 +245,37 @@ impl Transport { }; let req = self - .build_request(method, endpoint, body, None::>, |builder| { - builder + .build_request( + method, + endpoint, + body, + None::>, + Request::builder() .header(header::CONNECTION, "Upgrade") - .header(header::UPGRADE, "tcp"); - }) + .header(header::UPGRADE, "tcp"), + ) .expect("Failed to build request!"); - self.send_request(req) - .and_then(|res| match res.status() { - StatusCode::SWITCHING_PROTOCOLS => Ok(res), - _ => Err(Error::ConnectionNotUpgraded), - }) - .and_then(|res| res.into_body().on_upgrade().from_err()) + let response = self.send_request(req).await?; + + match response.status() { + StatusCode::SWITCHING_PROTOCOLS => Ok(response.into_body().on_upgrade().await?), + _ => Err(Error::ConnectionNotUpgraded), + } } - pub fn stream_upgrade_multiplexed( + pub async fn stream_upgrade( &self, method: Method, - endpoint: &str, + endpoint: impl AsRef, body: Option<(B, Mime)>, - ) -> impl Future + ) -> Result where - B: Into + 'static, + B: Into, { - self.stream_upgrade(method, endpoint, body) - .map(crate::tty::Multiplexed::new) + let tokio_multiplexer = self.stream_upgrade_tokio(method, endpoint, body).await?; + + Ok(Compat { tokio_multiplexer }) } /// Extract the error message content from an HTTP response that @@ -263,7 +287,61 @@ impl Transport { } } +#[pin_project] +struct Compat { + #[pin] + tokio_multiplexer: S, +} + +impl AsyncRead for Compat +where + S: tokio::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.project().tokio_multiplexer.poll_read(cx, buf) + } +} + +impl AsyncWrite for Compat +where + S: tokio::io::AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().tokio_multiplexer.poll_write(cx, buf) + } + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().tokio_multiplexer.poll_flush(cx) + } + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().tokio_multiplexer.poll_shutdown(cx) + } +} + #[derive(Serialize, Deserialize)] struct ErrorResponse { message: String, } + +fn stream_body(body: Body) -> impl Stream> { + async fn unfold(mut body: Body) -> Option<(Result, Body)> { + let chunk_result = body.next().await?.map_err(Error::from); + + Some((chunk_result, body)) + } + + futures_util::stream::unfold(body, unfold) +} diff --git a/src/tty.rs b/src/tty.rs index a4ab27fd..a26846f8 100644 --- a/src/tty.rs +++ b/src/tty.rs @@ -1,278 +1,172 @@ -use crate::errors::Error; -use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; -use bytes::BytesMut; -use futures::{self, Async}; -use hyper::rt::{Future, Stream}; -use log::trace; -use std::io::{self, Cursor}; -use tokio_codec::Decoder; -use tokio_io::{AsyncRead, AsyncWrite}; - -#[derive(Debug)] -pub struct Chunk { - pub stream_type: StreamType, - pub data: Vec, -} - -#[derive(Debug, Clone, Copy)] -pub enum StreamType { - StdIn, - StdOut, - StdErr, -} - -/// A multiplexed stream. -pub struct Multiplexed { - stdin: Box, - chunks: Box>, -} - -pub struct MultiplexedBlocking { - stdin: Box, - chunks: Box>>, -} - -/// Represent the current state of the decoding of a TTY frame -enum TtyDecoderState { - /// We have yet to read a frame header - WaitingHeader, - /// We have read a header and extracted the payload size and stream type, - /// and are now waiting to read the corresponding payload - WaitingPayload(usize, StreamType), -} - -pub struct TtyDecoder { - state: TtyDecoderState, -} - -impl Chunk { - /// Interprets the raw bytes as a string. - /// - /// Returns `None` if the raw bytes do not represent - /// a valid UTF-8 string. - pub fn as_string(&self) -> Option { - String::from_utf8(self.data.clone()).ok() - } - - /// Unconditionally interprets the raw bytes as a string. - /// - /// Inserts placeholder symbols for all non-character bytes. - pub fn as_string_lossy(&self) -> String { - String::from_utf8_lossy(&self.data).into_owned() +//! Types for working with docker TTY streams + +use crate::{Error, Result}; +use bytes::{BigEndian, ByteOrder}; +use futures_util::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite}, + stream::{Stream, TryStreamExt}, +}; +use pin_project::pin_project; +use std::io; + +/// An enum representing a chunk of TTY text streamed from a Docker container. +/// +/// For convenience, this type can deref to the contained `Vec`. +#[derive(Debug, Clone)] +pub enum TtyChunk { + StdIn(Vec), + StdOut(Vec), + StdErr(Vec), +} + +impl From for Vec { + fn from(tty_chunk: TtyChunk) -> Self { + match tty_chunk { + TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, + } } } -impl TtyDecoder { - pub fn new() -> Self { - Self { - state: TtyDecoderState::WaitingHeader, +impl AsRef> for TtyChunk { + fn as_ref(&self) -> &Vec { + match self { + TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, } } } -impl Default for TtyDecoder { - fn default() -> Self { - Self::new() +impl std::ops::Deref for TtyChunk { + type Target = Vec; + fn deref(&self) -> &Self::Target { + self.as_ref() } } -impl Decoder for TtyDecoder { - type Item = Chunk; - type Error = Error; - - fn decode( - &mut self, - src: &mut BytesMut, - ) -> Result, Self::Error> { - loop { - match self.state { - TtyDecoderState::WaitingHeader => { - if src.len() < 8 { - trace!("Not enough data to read a header"); - return Ok(None); - } else { - trace!("Reading header"); - let header_bytes = src.split_to(8); - let payload_size: Vec = header_bytes[4..8].to_vec(); - let stream_type = match header_bytes[0] { - 0 => { - return Err(Error::InvalidResponse( - "Unsupported stream of type stdin".to_string(), - )); - } - 1 => StreamType::StdOut, - 2 => StreamType::StdErr, - n => { - return Err(Error::InvalidResponse(format!( - "Unsupported stream of type {}", - n - ))); - } - }; - - let length = - Cursor::new(&payload_size).read_u32::().unwrap() as usize; - trace!( - "Read header: length = {}, stream_type = {:?}", - length, - stream_type - ); - // We've successfully read a header, now we wait for the payload - self.state = TtyDecoderState::WaitingPayload(length, stream_type); - continue; - } - } - TtyDecoderState::WaitingPayload(len, stream_type) => { - if src.len() < len { - trace!( - "Not enough data to read payload. Need {} but only {} available", - len, - src.len() - ); - return Ok(None); - } else { - trace!("Reading payload"); - let data = src.split_to(len)[..].to_owned(); - let tty_chunk = Chunk { stream_type, data }; - - // We've successfully read a full frame, now we go back to waiting for the next - // header - self.state = TtyDecoderState::WaitingHeader; - return Ok(Some(tty_chunk)); - } - } - } +impl std::ops::DerefMut for TtyChunk { + fn deref_mut(&mut self) -> &mut Vec { + match self { + TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, } } } -impl Multiplexed { - /// Create a multiplexed stream. - pub(crate) fn new(stream: T) -> Multiplexed - where - T: AsyncRead + AsyncWrite + 'static, - { - let (reader, stdin) = stream.split(); - Multiplexed { - chunks: Box::new(chunks(reader)), - stdin: Box::new(stdin), - } - } +async fn decode_chunk(mut stream: S) -> Option<(Result, S)> +where + S: AsyncRead + Unpin, +{ + let mut header_bytes = vec![0u8; 8]; - pub fn wait(self) -> MultiplexedBlocking { - MultiplexedBlocking { - stdin: self.stdin, - chunks: Box::new(self.chunks.wait()), - } + match stream.read_exact(&mut header_bytes).await { + Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None, + Err(e) => return Some((Err(Error::IO(e)), stream)), + _ => (), } -} - -impl futures::Stream for Multiplexed { - type Item = Chunk; - type Error = crate::Error; - fn poll(&mut self) -> Result>, crate::Error> { - self.chunks.poll() - } -} + let size_bytes = &header_bytes[4..]; + let data_length = BigEndian::read_u32(size_bytes); -impl Iterator for MultiplexedBlocking { - type Item = Result; + let mut data = vec![0u8; data_length as usize]; - fn next(&mut self) -> Option> { - self.chunks.next() + if stream.read_exact(&mut data).await.is_err() { + return None; } -} - -macro_rules! delegate_io_write { - ($ty:ty) => { - impl io::Write for $ty { - fn write( - &mut self, - buf: &[u8], - ) -> Result { - self.stdin.write(buf) - } - fn flush(&mut self) -> Result<(), io::Error> { - self.stdin.flush() - } - } + let chunk = match header_bytes[0] { + 0 => TtyChunk::StdIn(data), + 1 => TtyChunk::StdOut(data), + 2 => TtyChunk::StdErr(data), + n => panic!("invalid stream number from docker daemon: '{}'", n), }; -} -delegate_io_write!(Multiplexed); -delegate_io_write!(MultiplexedBlocking); + Some((Ok(chunk), stream)) +} -pub fn chunks(stream: S) -> impl futures::Stream +pub(crate) fn decode(hyper_chunk_stream: S) -> impl Stream> where - S: AsyncRead, + S: Stream> + Unpin, { - let stream = futures::stream::unfold(stream, |stream| { - let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]); + let stream = hyper_chunk_stream + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .into_async_read(); - let fut = header_future.and_then(|(stream, header_bytes)| { - let size_bytes = &header_bytes[4..]; - let data_length = BigEndian::read_u32(size_bytes); - let stream_type = match header_bytes[0] { - 0 => StreamType::StdIn, - 1 => StreamType::StdOut, - 2 => StreamType::StdErr, - n => panic!("invalid stream number from docker daemon: '{}'", n), - }; + futures_util::stream::unfold(stream, decode_chunk) +} - ::tokio_io::io::read_exact(stream, vec![0; data_length as usize]) - .map(move |(stream, data)| (Chunk { stream_type, data }, stream)) - }); - // FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk, - // stream)). - // This is much better because it would allow us to swallow the unexpected eof and - // stop the stream much cleaner than writing a custom stream filter. - Some(fut) - }); +type TtyReader<'a> = Pin> + Send + 'a>>; +type TtyWriter<'a> = Pin>; - util::stop_on_err(stream, |e| e.kind() != io::ErrorKind::UnexpectedEof) - .map_err(crate::Error::from) +/// TTY multiplexer returned by the `attach` method. +/// +/// This object can emit a stream of `TtyChunk`s and also implements `AsyncWrite` for streaming bytes to Stdin. +#[pin_project] +pub struct Multiplexer<'a> { + #[pin] + reader: TtyReader<'a>, + #[pin] + writer: TtyWriter<'a>, } -mod util { - use futures::{Async, Stream}; +impl<'a> Multiplexer<'a> { + pub(crate) fn new(tcp_connection: T) -> Self + where + T: AsyncRead + AsyncWrite + Send + 'a, + { + let (reader, writer) = tcp_connection.split(); - pub struct StopOnError { - stream: S, - f: F, + Self { + reader: Box::pin(futures_util::stream::unfold(reader, |reader| { + decode_chunk(reader) + })), + writer: Box::pin(writer), + } } +} - pub fn stop_on_err( - stream: S, - f: F, - ) -> StopOnError - where - S: Stream, - F: FnMut(&S::Error) -> bool, - { - StopOnError { stream, f } +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +impl<'a> Stream for Multiplexer<'a> { + type Item = Result; + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().reader.poll_next(cx) } +} - impl Stream for StopOnError - where - S: Stream, - F: FnMut(&S::Error) -> bool, - { - type Item = S::Item; - type Error = S::Error; +impl<'a> AsyncWrite for Multiplexer<'a> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().writer.poll_write(cx, buf) + } + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().writer.poll_flush(cx) + } + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().writer.poll_close(cx) + } +} - fn poll(&mut self) -> Result>, S::Error> { - match self.stream.poll() { - Err(e) => { - if (self.f)(&e) { - Err(e) - } else { - Ok(Async::Ready(None)) - } - } - a => a, - } - } +impl<'a> Multiplexer<'a> { + /// Split the `Multiplexer` into the component `Stream` and `AsyncWrite` parts + pub fn split( + self + ) -> ( + impl Stream> + 'a, + impl AsyncWrite + Send + 'a, + ) { + (self.reader, self.writer) } }