Skip to content

Commit

Permalink
added xz decoder
Browse files Browse the repository at this point in the history
xz decoding support is very unstable at the moment due to the high
demand of memory.
  • Loading branch information
svenrademakers committed Nov 15, 2023
1 parent 99a61f4 commit fd45f47
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/api/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ async fn handle_flash_request(

let size = u64::from_str(size)
.map_err(|_| LegacyResponse::bad_request("`length` parameter is not a number"))?;
DataTransfer::remote(PathBuf::from(&file), size, 256)
DataTransfer::remote(PathBuf::from(&file), size, 16)
};

let transfer = InitializeTransfer::new(process_name, upgrade_command, data_transfer);
Expand Down
2 changes: 1 addition & 1 deletion src/app/upgrade_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use tokio::{
use tokio_util::sync::CancellationToken;

const TMP_UPGRADE_DIR: &str = "/tmp/os_upgrade";
const BLOCK_WRITE_SIZE: usize = 4194304; // 4Mib
const BLOCK_WRITE_SIZE: usize = BLOCK_READ_SIZE; // 4Mib
const BLOCK_READ_SIZE: usize = 524288; // 512Kib

// Contains collection of functions that execute some business flow in relation
Expand Down
42 changes: 32 additions & 10 deletions src/streaming_data_service/data_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::Path;
use anyhow::Context;
use async_compression::tokio::bufread::XzDecoder;
use bytes::Bytes;
use std::ffi::OsStr;
use std::io::Seek;
use std::{io::ErrorKind, path::PathBuf};
use tokio::fs::OpenOptions;
use tokio::io;
use tokio::io::AsyncBufRead;
use tokio::io::AsyncRead;
use tokio::io::BufReader;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -89,20 +93,24 @@ impl DataTransfer {

pub async fn reader(self) -> anyhow::Result<impl AsyncRead + Sync + Send + Unpin> {
match self {
DataTransfer::Local { path } => OpenOptions::new()
.read(true)
.open(&path)
.await
.map(|x| Box::new(BufReader::new(x)) as Box<dyn AsyncRead + Sync + Send + Unpin>)
.with_context(|| path.to_string_lossy().to_string()),
DataTransfer::Local { path } => {
let file = OpenOptions::new()
.read(true)
.open(&path)
.await
.with_context(|| path.to_string_lossy().to_string())?;

Ok(with_xz_support(&path, BufReader::new(file)))
}
DataTransfer::Remote {
file_name: _,
file_name,
size: _,
sender: _,
receiver,
} => Ok(Box::new(StreamReader::new(
ReceiverStream::new(receiver).map(Ok::<bytes::Bytes, std::io::Error>),
)) as Box<dyn AsyncRead + Sync + Send + Unpin>),
} => {
let stream = ReceiverStream::new(receiver).map(Ok::<bytes::Bytes, io::Error>);
Ok(with_xz_support(&file_name, StreamReader::new(stream)))
}
}
}

Expand All @@ -119,3 +127,17 @@ impl DataTransfer {
None
}
}

fn with_xz_support(
file: &Path,
reader: impl AsyncBufRead + Send + Sync + Unpin + 'static,
) -> Box<dyn AsyncRead + Send + Sync + Unpin> {
if file.extension().unwrap_or_default() == "xz" {
log::info!("enabled xz decoder for {}", file.to_string_lossy());
// 66Meg is really on the limit on what we can give the decoder.
let decoder = XzDecoder::with_mem_limit(reader, 66 * 1024 * 1024);
Box::new(decoder) as Box<dyn AsyncRead + Sync + Send + Unpin>
} else {
Box::new(reader) as Box<dyn AsyncRead + Sync + Send + Unpin>
}
}

0 comments on commit fd45f47

Please sign in to comment.