From f4737970b58a4fbd7ea5c192501ce3814967ee66 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 4 Sep 2024 17:25:24 +0800 Subject: [PATCH] feat: add interrupt for download from remote peers (#717) Signed-off-by: Gaius --- Cargo.lock | 16 ++++---- Cargo.toml | 16 ++++---- .../src/grpc/dfdaemon_download.rs | 4 +- dragonfly-client/src/grpc/dfdaemon_upload.rs | 6 +-- dragonfly-client/src/proxy/mod.rs | 4 +- dragonfly-client/src/resource/cache_task.rs | 2 +- .../src/resource/piece_collector.rs | 2 +- dragonfly-client/src/resource/task.rs | 38 ++++++++++++++----- 8 files changed, 54 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5cddb344..c297c48a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -861,7 +861,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.105" +version = "0.1.106" dependencies = [ "anyhow", "blake3", @@ -932,7 +932,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.105" +version = "0.1.106" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -955,7 +955,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.105" +version = "0.1.106" dependencies = [ "bytesize", "bytesize-serde", @@ -976,7 +976,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.105" +version = "0.1.106" dependencies = [ "hyper 1.4.1", "hyper-util", @@ -991,7 +991,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.105" +version = "0.1.106" dependencies = [ "anyhow", "clap", @@ -1007,7 +1007,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.105" +version = "0.1.106" dependencies = [ "base16ct", "bincode", @@ -1032,7 +1032,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.105" +version = "0.1.106" dependencies = [ "base16ct", "blake3", @@ -1386,7 +1386,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.105" +version = "0.1.106" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 7b1585c5..c6a5d7b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.105" +version = "0.1.106" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.105" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.105" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.105" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.105" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.105" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.105" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.105" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.106" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.106" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.106" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.106" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.106" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.106" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.106" } thiserror = "1.0" dragonfly-api = "=2.0.154" reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index 77bb5367..e2f9881c 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -353,7 +353,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { let download_clone = download.clone(); let task_manager_clone = task_manager.clone(); let task_clone = task.clone(); - let (out_stream_tx, out_stream_rx) = mpsc::channel(1024); + let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10); tokio::spawn( async move { match task_manager_clone @@ -743,7 +743,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { let request_clone = request.clone(); let task_manager_clone = task_manager.clone(); let task_clone = task.clone(); - let (out_stream_tx, out_stream_rx) = mpsc::channel(1024); + let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10); tokio::spawn( async move { match task_manager_clone diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index 2c826ce3..eb0645d4 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -337,7 +337,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { let download_clone = download.clone(); let task_manager_clone = task_manager.clone(); let task_clone = task.clone(); - let (out_stream_tx, out_stream_rx) = mpsc::channel(1024); + let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10); tokio::spawn( async move { match task_manager_clone @@ -636,7 +636,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { let task_manager = self.task.clone(); // Initialize stream channel. - let (out_stream_tx, out_stream_rx) = mpsc::channel(1024); + let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10); tokio::spawn( async move { loop { @@ -926,7 +926,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { let request_clone = request.clone(); let task_manager_clone = task_manager.clone(); let task_clone = task.clone(); - let (out_stream_tx, out_stream_rx) = mpsc::channel(1024); + let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10); tokio::spawn( async move { match task_manager_clone diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index 58ff49e2..413921fe 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -556,10 +556,10 @@ async fn proxy_by_dfdaemon( }; // Write the task data to the reader. - let (reader, mut writer) = tokio::io::duplex(1024); + let (reader, mut writer) = tokio::io::duplex(4096); // Write the status code to the writer. - let (sender, mut receiver) = mpsc::channel(4096); + let (sender, mut receiver) = mpsc::channel(1024 * 10); // Construct the response body. let reader_stream = ReaderStream::new(reader); diff --git a/dragonfly-client/src/resource/cache_task.rs b/dragonfly-client/src/resource/cache_task.rs index be1bfb31..b623f7b8 100644 --- a/dragonfly-client/src/resource/cache_task.rs +++ b/dragonfly-client/src/resource/cache_task.rs @@ -474,7 +474,7 @@ impl CacheTask { let mut finished_pieces: Vec = Vec::new(); // Initialize stream channel. - let (in_stream_tx, in_stream_rx) = mpsc::channel(4096); + let (in_stream_tx, in_stream_rx) = mpsc::channel(1024 * 10); // Send the register peer request. in_stream_tx diff --git a/dragonfly-client/src/resource/piece_collector.rs b/dragonfly-client/src/resource/piece_collector.rs index 34365012..54e575a9 100644 --- a/dragonfly-client/src/resource/piece_collector.rs +++ b/dragonfly-client/src/resource/piece_collector.rs @@ -109,7 +109,7 @@ impl PieceCollector { let interested_pieces = self.interested_pieces.clone(); let collected_pieces = self.collected_pieces.clone(); let collected_piece_timeout = self.config.download.piece_timeout; - let (collected_piece_tx, collected_piece_rx) = mpsc::channel(1024); + let (collected_piece_tx, collected_piece_rx) = mpsc::channel(1024 * 10); tokio::spawn( async move { Self::collect_from_remote_peers( diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 7b894164..dece63cd 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -49,7 +49,8 @@ use dragonfly_client_util::{ }; use reqwest::header::HeaderMap; use std::path::Path; -use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use tokio::sync::{ mpsc::{self, Sender}, @@ -481,7 +482,7 @@ impl Task { let mut finished_pieces: Vec = Vec::new(); // Initialize stream channel. - let (in_stream_tx, in_stream_rx) = mpsc::channel(4096); + let (in_stream_tx, in_stream_rx) = mpsc::channel(1024 * 10); // Send the register peer request. in_stream_tx @@ -923,6 +924,13 @@ impl Task { ); let mut piece_collector_rx = piece_collector.run().await; + // Initialize the interrupt. If download from remote peer failed with scheduler or download + // progress, interrupt the collector and return the finished pieces. + let interrupt = Arc::new(AtomicBool::new(false)); + + // Initialize the finished pieces. + let finished_pieces = Arc::new(Mutex::new(Vec::new())); + // Initialize the join set. let mut join_set = JoinSet::new(); let semaphore = Arc::new(Semaphore::new( @@ -931,6 +939,13 @@ impl Task { // Download the pieces from the remote peers. while let Some(collect_piece) = piece_collector_rx.recv().await { + if interrupt.load(Ordering::SeqCst) { + // If the interrupt is true, break the collector loop. + info!("interrupt the piece collector"); + drop(piece_collector_rx); + break; + } + async fn download_from_remote_peer( task_id: String, host_id: String, @@ -943,6 +958,8 @@ impl Task { semaphore: Arc, download_progress_tx: Sender>, in_stream_tx: Sender, + interrupt: Arc, + finished_pieces: Arc>>, ) -> ClientResult { // Limit the concurrent piece count. let _permit = semaphore.acquire().await.unwrap(); @@ -1012,6 +1029,7 @@ impl Task { storage.piece_id(task_id.as_str(), number), err ); + interrupt.store(true, Ordering::SeqCst); err })?; @@ -1039,6 +1057,7 @@ impl Task { storage.piece_id(task_id.as_str(), number), err ); + interrupt.store(true, Ordering::SeqCst); err })?; @@ -1048,6 +1067,9 @@ impl Task { metadata.parent_id ); + let mut finished_pieces = finished_pieces.lock().unwrap(); + finished_pieces.push(metadata.clone()); + Ok(metadata) } @@ -1064,14 +1086,13 @@ impl Task { semaphore.clone(), download_progress_tx.clone(), in_stream_tx.clone(), + interrupt.clone(), + finished_pieces.clone(), ) .in_current_span(), ); } - // Initialize the finished pieces. - let mut finished_pieces: Vec = Vec::new(); - // Wait for the pieces to be downloaded. while let Some(message) = join_set .join_next() @@ -1080,10 +1101,7 @@ impl Task { .or_err(ErrorType::AsyncRuntimeError)? { match message { - Ok(metadata) => { - // Store the finished piece. - finished_pieces.push(metadata.clone()); - } + Ok(_) => {} Err(Error::DownloadFromRemotePeerFailed(err)) => { let (piece_number, parent_id) = (err.piece_number, err.parent_id); @@ -1125,6 +1143,7 @@ impl Task { // If the send timeout with scheduler or download progress, return the finished pieces. // It will stop the download from the remote peer with scheduler // and download from the source directly from middle. + let finished_pieces = finished_pieces.lock().unwrap().clone(); return Ok(finished_pieces); } Err(err) => { @@ -1137,6 +1156,7 @@ impl Task { } } + let finished_pieces = finished_pieces.lock().unwrap().clone(); Ok(finished_pieces) }