Skip to content

Commit

Permalink
Cloning async and parallel if possible
Browse files Browse the repository at this point in the history
  • Loading branch information
eikek committed Jul 12, 2024
1 parent 4773ec1 commit c104957
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 84 deletions.
2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@

# Additional dev-shell environment variables can be set directly
# MY_CUSTOM_DEVELOPMENT_VAR = "something else";
RENKU_CLI_RENKU_URL = "https://ci-renku-3668.dev.renku.ch";
RENKU_CLI_RENKU_URL = "https://ci-renku-3689.dev.renku.ch";

# Enable mold https://github.com/rui314/mold
CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER = "${pkgs.clang}/bin/clang";
Expand Down
2 changes: 1 addition & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub async fn execute_cmd(opts: MainOpts) -> Result<(), CmdError> {
let mut app = MainOpts::command();
input.print_completions(&mut app).await;
}
SubCommand::Project(input) => input.exec(&ctx).await?,
SubCommand::Project(input) => input.exec(ctx).await?,

#[cfg(feature = "user-doc")]
SubCommand::UserDoc(input) => input.exec(&ctx).await?,
Expand Down
8 changes: 4 additions & 4 deletions src/cli/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ use snafu::{ResultExt, Snafu};

const RENKULAB_IO: &str = "https://renkulab.io";

pub struct Context<'a> {
pub opts: &'a CommonOpts,
pub struct Context {
pub opts: CommonOpts,
pub client: Client,
pub renku_url: String,
}

impl Context<'_> {
impl Context {
pub fn new(opts: &CommonOpts) -> Result<Context, CmdError> {
let base_url = get_renku_url(opts);
let client = Client::new(&base_url, proxy_settings(opts), &None, false)
.context(ContextCreateSnafu)?;
Ok(Context {
opts,
opts: opts.clone(),
client,
renku_url: base_url,
})
Expand Down
2 changes: 1 addition & 1 deletion src/cli/cmd/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct Input {
}

impl Input {
pub async fn exec<'a>(&self, ctx: &Context<'a>) -> Result<(), Error> {
pub async fn exec(&self, ctx: Context) -> Result<(), Error> {
match &self.subcmd {
ProjectCommand::Clone(input) => input.exec(ctx).await.context(CloneSnafu),
}
Expand Down
142 changes: 90 additions & 52 deletions src/cli/cmd/project/clone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,29 @@ use super::Context;
use crate::cli::sink::Error as SinkError;
use crate::httpclient::Error as HttpError;
use crate::util::data::{ProjectId, SimpleMessage};
use std::sync::Arc;

use clap::Parser;
use git2::{Error as GitError, Repository};
use snafu::{ResultExt, Snafu};
use std::path::Path;
use std::path::PathBuf;
use tokio::task::{JoinError, JoinSet};

/// Clone a project
/// Clone a project.
///
/// Clones a renku project by creating a directory with the project
/// slug and cloning each code repository into it.
#[derive(Parser, Debug)]
pub struct Input {
/// The first argument is the project to clone, identified by
/// either its id or the namespace/slug identifier. The second
/// argument is optional, defining the target directory to create
/// the project in.
#[arg(required = true, num_args = 1..=2)]
pub project_and_target: Vec<String>,
/// The project to clone, identified by either its id or the
/// namespace/slug identifier.
#[arg()]
pub project_ref: String,

/// Optional target directory to create the project in. By default
/// the current working directory is used.
#[arg()]
pub target_dir: Option<String>,
}

#[derive(Debug, Snafu)]
Expand All @@ -43,92 +50,123 @@ pub enum Error {

#[snafu(display("Error cloning project: {}", source))]
GitClone { source: GitError },

#[snafu(display("Error in task: {}", source))]
TaskJoin { source: JoinError },
}

impl Input {
pub async fn exec<'a>(&self, ctx: &Context<'a>) -> Result<(), Error> {
let details = match self.project_id()? {
pub async fn exec(&self, ctx: Context) -> Result<(), Error> {
let project_id = self.project_id()?;
let opt_details = match &project_id {
ProjectId::NamespaceSlug { namespace, slug } => ctx
.client
.get_project_by_slug(&namespace, &slug, ctx.opts.verbose > 1)
.get_project_by_slug(namespace, slug, ctx.opts.verbose > 1)
.await
.context(HttpClientSnafu)?,
ProjectId::Id(id) => ctx
.client
.get_project_by_id(&id, ctx.opts.verbose > 1)
.get_project_by_id(id, ctx.opts.verbose > 1)
.await
.context(HttpClientSnafu)?,
};
let target = self.target_dir()?.join(&details.slug);
ctx.write_err(&SimpleMessage {
message: format!(
"Cloning {} ({}) into {}...",
details.slug,
details.id,
target.display()
),
})
.await
.context(WriteResultSnafu)?;

clone_project(ctx, &details, &target).await?;
ctx.write_result(&details).await.context(WriteResultSnafu)?;
if let Some(details) = opt_details {
let target = self.target_dir()?.join(&details.slug);
ctx.write_err(&SimpleMessage {
message: format!(
"Cloning {} ({}) into {}...",
details.slug,
details.id,
&target.display()
),
})
.await
.context(WriteResultSnafu)?;

let ctx = clone_project(ctx, &details, target).await?;
ctx.write_result(&details).await.context(WriteResultSnafu)?;
} else {
ctx.write_err(&SimpleMessage {
message: format!("Project '{}' doesn't exist.", &project_id),
})
.await
.context(WriteResultSnafu)?;
}
Ok(())
}

fn project_id(&self) -> Result<ProjectId, Error> {
self.project_and_target
.first()
.unwrap() // clap makes sure there is at least one element (🤞)
self.project_ref
.parse::<ProjectId>()
.context(ProjectIdParseSnafu)
}

fn target_dir(&self) -> Result<PathBuf, Error> {
match self.project_and_target.get(1) {
match &self.target_dir {
Some(dir) => Ok(std::path::PathBuf::from(dir)),
None => std::env::current_dir().context(CurrentDirSnafu),
}
}
}

//TODO make async
async fn clone_project<'a>(
ctx: &Context<'a>,
ctx: Context,
project: &ProjectDetails,
target: &Path,
) -> Result<(), Error> {
std::fs::create_dir_all(target).context(CreateDirSnafu)?;
// TODO use JoinSet or something to propagate errors
futures::future::join_all(
project
.repositories
.iter()
.map(|repo| clone_repository(ctx, &repo, target)),
)
.await;
// for repo in project.repositories.iter() {
// clone_repository(ctx, &repo, target).await?;
// }
Ok(())
target: PathBuf,
) -> Result<Context, Error> {
tokio::fs::create_dir_all(&target)
.await
.context(CreateDirSnafu)?;

let mut tasks = JoinSet::new();
let cc = Arc::new(ctx);
let tt = Arc::new(target);
for repo in project.repositories.iter() {
let cc = cc.clone();
let tt = tt.clone();
let rr = repo.to_string();
tasks.spawn(clone_repository(cc, rr, tt));
}

while let Some(res) = tasks.join_next().await {
res.context(TaskJoinSnafu)??;
}
Ok(Arc::into_inner(cc).unwrap())
}

async fn clone_repository<'a>(ctx: &Context<'a>, repo_url: &str, dir: &Path) -> Result<(), Error> {
async fn clone_repository(
ctx: Arc<Context>,
repo_url: String,
dir: Arc<PathBuf>,
) -> Result<(), Error> {
let name = match repo_url.rsplit_once('/') {
Some((_, n)) => n,
None => "no-name",
};
let local_path = dir.join(&name);
let local_path = dir.join(name);
if local_path.exists() {
ctx.write_err(&SimpleMessage {
message: format!("The repository {} already exists", name),
})
.await
.context(WriteResultSnafu)?;
} else {
// TODO use tokio::task::spawn_blocking!
// TODO use the builder to access more options
Repository::clone(repo_url, &local_path).context(GitCloneSnafu)?;
// TODO use the repository builder to access more options,
// show clone progress and provide credentials
let (repo, repo_url, local_path) = tokio::task::spawn_blocking(|| {
let r = Repository::clone(&repo_url, &local_path).context(GitCloneSnafu);
(r, repo_url, local_path)
})
.await
.context(TaskJoinSnafu)?;
let git_repo = repo?;
if ctx.opts.verbose > 1 {
let head = git_repo
.head()
.ok()
.and_then(|r| r.name().map(str::to_string));
log::debug!("Checked out ref {:?} for repo {}", head, repo_url);
}

ctx.write_err(&SimpleMessage {
message: format!("Cloned: {} to {}", repo_url, local_path.display()),
Expand Down
2 changes: 1 addition & 1 deletion src/cli/cmd/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum Error {
}

impl Input {
pub async fn exec<'a>(&self, ctx: &Context<'a>) -> Result<(), Error> {
pub async fn exec(&self, ctx: &Context) -> Result<(), Error> {
if self.client_only {
let vinfo = BuildInfo::default();
ctx.write_result(&vinfo).await.context(WriteResultSnafu)?;
Expand Down
2 changes: 1 addition & 1 deletion src/cli/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::str::FromStr;

/// Main options are available to all commands. They must appear
/// before a sub-command.
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
#[command()]
pub struct CommonOpts {
/// Be more verbose when logging. Verbosity increases with each
Expand Down
75 changes: 52 additions & 23 deletions src/httpclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,27 +111,52 @@ impl Client {
/// expected structure.
async fn json_get<R: DeserializeOwned>(&self, path: &str, debug: bool) -> Result<R, Error> {
let url = &format!("{}{}", self.base_url, path);
let resp = self
.client
.get(url)
.send()
.await
.context(HttpSnafu { url })?;
if debug {
let resp = self
.client
.get(url)
.send()
.await
.context(HttpSnafu { url })?
.text()
.await
.context(DeserializeRespSnafu)?;
log::debug!("GET {} -> {}", url, resp);
serde_json::from_str::<R>(&resp).context(DeserializeJsonSnafu)
let body = resp.text().await.context(DeserializeRespSnafu)?;
log::debug!("GET {} -> {}", url, body);
serde_json::from_str::<R>(&body).context(DeserializeJsonSnafu)
} else {
self.client
.get(url)
.send()
.await
.context(HttpSnafu { url })?
.json::<R>()
.await
.context(DeserializeRespSnafu)
resp.json::<R>().await.context(DeserializeRespSnafu)
}
}

/// Runs a GET request against `path` (appended to the client's base
/// url) and decodes the JSON response body into `R`.
///
/// A `404 Not Found` response is not treated as an error: it yields
/// `Ok(None)` without reading the body. No other status code is
/// inspected here; for any other response the body is handed to the
/// JSON decoder as is.
///
/// When `debug` is true, the body is first read as text and logged at
/// debug level before being decoded. Otherwise the response is decoded
/// from JSON directly into the expected structure.
///
/// # Errors
///
/// Returns an error if sending the request fails, if the response
/// body cannot be read, or if the body is not valid JSON for `R`.
async fn json_get_option<R: DeserializeOwned>(
    &self,
    path: &str,
    debug: bool,
) -> Result<Option<R>, Error> {
    let url = &format!("{}{}", self.base_url, path);
    let resp = self
        .client
        .get(url)
        .send()
        .await
        .context(HttpSnafu { url })?;

    // A missing resource is an expected outcome; check it once, up
    // front, so both decoding paths below only deal with a body.
    if resp.status() == reqwest::StatusCode::NOT_FOUND {
        return Ok(None);
    }

    let value = if debug {
        // Go through an intermediate string so the raw payload can be logged.
        let body = resp.text().await.context(DeserializeRespSnafu)?;
        log::debug!("GET {} -> {}", url, body);
        serde_json::from_str::<R>(&body).context(DeserializeJsonSnafu)?
    } else {
        resp.json::<R>().await.context(DeserializeRespSnafu)?
    };
    Ok(Some(value))
}

Expand All @@ -152,18 +177,22 @@ impl Client {
namespace: &str,
slug: &str,
debug: bool,
) -> Result<ProjectDetails, Error> {
) -> Result<Option<ProjectDetails>, Error> {
log::debug!("Get project by namespace/slug: {}/{}", namespace, slug);
let path = format!("/api/data/projects/{}/{}", namespace, slug);
let details = self.json_get::<ProjectDetails>(&path, debug).await?;
let details = self.json_get_option::<ProjectDetails>(&path, debug).await?;
Ok(details)
}

/// Get project details by project id.
pub async fn get_project_by_id(&self, id: &str, debug: bool) -> Result<ProjectDetails, Error> {
pub async fn get_project_by_id(
&self,
id: &str,
debug: bool,
) -> Result<Option<ProjectDetails>, Error> {
log::debug!("Get project by id: {}", id);
let path = format!("/api/data/projects/{}", id);
let details = self.json_get::<ProjectDetails>(&path, debug).await?;
let details = self.json_get_option::<ProjectDetails>(&path, debug).await?;
Ok(details)
}
}

0 comments on commit c104957

Please sign in to comment.