Skip to content

Commit

Permalink
feat: add interrupt for download from remote peers (#717)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Sep 4, 2024
1 parent b200aef commit f473797
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 34 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.105"
version = "0.1.106"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.105" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.105" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.105" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.105" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.105" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.105" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.105" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.106" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.106" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.106" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.106" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.106" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.106" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.106" }
thiserror = "1.0"
dragonfly-api = "=2.0.154"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
Expand Down
4 changes: 2 additions & 2 deletions dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
let download_clone = download.clone();
let task_manager_clone = task_manager.clone();
let task_clone = task.clone();
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024);
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10);
tokio::spawn(
async move {
match task_manager_clone
Expand Down Expand Up @@ -743,7 +743,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
let request_clone = request.clone();
let task_manager_clone = task_manager.clone();
let task_clone = task.clone();
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024);
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10);
tokio::spawn(
async move {
match task_manager_clone
Expand Down
6 changes: 3 additions & 3 deletions dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let download_clone = download.clone();
let task_manager_clone = task_manager.clone();
let task_clone = task.clone();
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024);
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10);
tokio::spawn(
async move {
match task_manager_clone
Expand Down Expand Up @@ -636,7 +636,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let task_manager = self.task.clone();

// Initialize stream channel.
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024);
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10);
tokio::spawn(
async move {
loop {
Expand Down Expand Up @@ -926,7 +926,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let request_clone = request.clone();
let task_manager_clone = task_manager.clone();
let task_clone = task.clone();
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024);
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10);
tokio::spawn(
async move {
match task_manager_clone
Expand Down
4 changes: 2 additions & 2 deletions dragonfly-client/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,10 +556,10 @@ async fn proxy_by_dfdaemon(
};

// Write the task data to the reader.
let (reader, mut writer) = tokio::io::duplex(1024);
let (reader, mut writer) = tokio::io::duplex(4096);

// Write the status code to the writer.
let (sender, mut receiver) = mpsc::channel(4096);
let (sender, mut receiver) = mpsc::channel(1024 * 10);

// Construct the response body.
let reader_stream = ReaderStream::new(reader);
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client/src/resource/cache_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ impl CacheTask {
let mut finished_pieces: Vec<metadata::Piece> = Vec::new();

// Initialize stream channel.
let (in_stream_tx, in_stream_rx) = mpsc::channel(4096);
let (in_stream_tx, in_stream_rx) = mpsc::channel(1024 * 10);

// Send the register peer request.
in_stream_tx
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client/src/resource/piece_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl PieceCollector {
let interested_pieces = self.interested_pieces.clone();
let collected_pieces = self.collected_pieces.clone();
let collected_piece_timeout = self.config.download.piece_timeout;
let (collected_piece_tx, collected_piece_rx) = mpsc::channel(1024);
let (collected_piece_tx, collected_piece_rx) = mpsc::channel(1024 * 10);
tokio::spawn(
async move {
Self::collect_from_remote_peers(
Expand Down
38 changes: 29 additions & 9 deletions dragonfly-client/src/resource/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ use dragonfly_client_util::{
};
use reqwest::header::HeaderMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::{
mpsc::{self, Sender},
Expand Down Expand Up @@ -481,7 +482,7 @@ impl Task {
let mut finished_pieces: Vec<metadata::Piece> = Vec::new();

// Initialize stream channel.
let (in_stream_tx, in_stream_rx) = mpsc::channel(4096);
let (in_stream_tx, in_stream_rx) = mpsc::channel(1024 * 10);

// Send the register peer request.
in_stream_tx
Expand Down Expand Up @@ -923,6 +924,13 @@ impl Task {
);
let mut piece_collector_rx = piece_collector.run().await;

// Initialize the interrupt. If download from remote peer failed with scheduler or download
// progress, interrupt the collector and return the finished pieces.
let interrupt = Arc::new(AtomicBool::new(false));

// Initialize the finished pieces.
let finished_pieces = Arc::new(Mutex::new(Vec::new()));

// Initialize the join set.
let mut join_set = JoinSet::new();
let semaphore = Arc::new(Semaphore::new(
Expand All @@ -931,6 +939,13 @@ impl Task {

// Download the pieces from the remote peers.
while let Some(collect_piece) = piece_collector_rx.recv().await {
if interrupt.load(Ordering::SeqCst) {
// If the interrupt is true, break the collector loop.
info!("interrupt the piece collector");
drop(piece_collector_rx);
break;
}

async fn download_from_remote_peer(
task_id: String,
host_id: String,
Expand All @@ -943,6 +958,8 @@ impl Task {
semaphore: Arc<Semaphore>,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
in_stream_tx: Sender<AnnouncePeerRequest>,
interrupt: Arc<AtomicBool>,
finished_pieces: Arc<Mutex<Vec<metadata::Piece>>>,
) -> ClientResult<metadata::Piece> {
// Limit the concurrent piece count.
let _permit = semaphore.acquire().await.unwrap();
Expand Down Expand Up @@ -1012,6 +1029,7 @@ impl Task {
storage.piece_id(task_id.as_str(), number),
err
);
interrupt.store(true, Ordering::SeqCst);
err
})?;

Expand Down Expand Up @@ -1039,6 +1057,7 @@ impl Task {
storage.piece_id(task_id.as_str(), number),
err
);
interrupt.store(true, Ordering::SeqCst);
err
})?;

Expand All @@ -1048,6 +1067,9 @@ impl Task {
metadata.parent_id
);

let mut finished_pieces = finished_pieces.lock().unwrap();
finished_pieces.push(metadata.clone());

Ok(metadata)
}

Expand All @@ -1064,14 +1086,13 @@ impl Task {
semaphore.clone(),
download_progress_tx.clone(),
in_stream_tx.clone(),
interrupt.clone(),
finished_pieces.clone(),
)
.in_current_span(),
);
}

// Initialize the finished pieces.
let mut finished_pieces: Vec<metadata::Piece> = Vec::new();

// Wait for the pieces to be downloaded.
while let Some(message) = join_set
.join_next()
Expand All @@ -1080,10 +1101,7 @@ impl Task {
.or_err(ErrorType::AsyncRuntimeError)?
{
match message {
Ok(metadata) => {
// Store the finished piece.
finished_pieces.push(metadata.clone());
}
Ok(_) => {}
Err(Error::DownloadFromRemotePeerFailed(err)) => {
let (piece_number, parent_id) = (err.piece_number, err.parent_id);

Expand Down Expand Up @@ -1125,6 +1143,7 @@ impl Task {
// If the send timeout with scheduler or download progress, return the finished pieces.
// It will stop the download from the remote peer with scheduler
// and download from the source directly from middle.
let finished_pieces = finished_pieces.lock().unwrap().clone();
return Ok(finished_pieces);
}
Err(err) => {
Expand All @@ -1137,6 +1156,7 @@ impl Task {
}
}

let finished_pieces = finished_pieces.lock().unwrap().clone();
Ok(finished_pieces)
}

Expand Down

0 comments on commit f473797

Please sign in to comment.