perf: Parallelize IO writes in new streaming engine #20840

Closed
10 changes: 3 additions & 7 deletions crates/polars-stream/src/nodes/io_sinks/csv.rs
@@ -1,5 +1,4 @@
use std::cmp::Reverse;
use std::io::Write;
use std::path::{Path, PathBuf};

use polars_core::frame::DataFrame;
@@ -11,6 +10,7 @@ use polars_io::SerWriter;
use polars_utils::priority::Priority;

use crate::async_primitives::linearizer::Linearizer;
use crate::nodes::io_sinks::write_buffers_from_linearizer_to_file;
use crate::nodes::{ComputeNode, JoinHandle, MorselSeq, PortState, TaskPriority, TaskScope};
use crate::pipe::{RecvPort, SendPort};
use crate::DEFAULT_LINEARIZER_BUFFER_SIZE;
@@ -69,7 +69,7 @@ impl ComputeNode for CsvSinkNode
// .. -> Encode task
let receivers = recv_ports[0].take().unwrap().parallel();
// Encode tasks -> IO task
let (mut linearizer, senders) = Linearizer::<Priority<Reverse<MorselSeq>, Vec<u8>>>::new(
let (linearizer, senders) = Linearizer::<Priority<Reverse<MorselSeq>, Vec<u8>>>::new(
receivers.len(),
DEFAULT_LINEARIZER_BUFFER_SIZE,
);
@@ -152,11 +152,7 @@ impl ComputeNode for CsvSinkNode {
writer.write_batch(&DataFrame::empty_with_schema(&schema))?;
}

while let Some(Priority(_, buffer)) = linearizer.get().await {
file.write_all(&buffer)?;
}

PolarsResult::Ok(())
write_buffers_from_linearizer_to_file(file, linearizer).await
});
join_handles
.push(scope.spawn_task(TaskPriority::Low, async move { io_task.await.unwrap() }));
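
A note on ordering: the encode tasks run in parallel, so their buffers can reach the IO task out of order, and the Priority<Reverse<MorselSeq>, Vec<u8>> element type is what puts them back into morsel order before anything is written. As a rough, self-contained illustration of that ordering trick (plain std only; this is not the polars Linearizer implementation, just the idea behind its element type), a max-heap keyed on Reverse(seq) pops the lowest sequence number first:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    // Buffers arriving out of order from parallel encode tasks: (sequence number, encoded bytes).
    let arrivals: Vec<(u64, Vec<u8>)> = vec![(2, b"c".to_vec()), (0, b"a".to_vec()), (1, b"b".to_vec())];

    // BinaryHeap is a max-heap, so wrapping the sequence number in Reverse makes the
    // smallest sequence number the "largest" element.
    let mut heap: BinaryHeap<(Reverse<u64>, Vec<u8>)> = BinaryHeap::new();
    for (seq, buffer) in arrivals {
        heap.push((Reverse(seq), buffer));
    }

    // Pops in morsel order (0, 1, 2), the order in which the writer must emit the buffers.
    while let Some((Reverse(seq), buffer)) = heap.pop() {
        println!("write seq {seq}: {buffer:?}");
    }
}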
81 changes: 81 additions & 0 deletions crates/polars-stream/src/nodes/io_sinks/mod.rs
@@ -1,6 +1,87 @@
use std::cmp::Reverse;
use std::fs::File;

use polars_error::PolarsResult;
use polars_utils::priority::Priority;

use super::MorselSeq;
use crate::async_primitives::linearizer::Linearizer;

#[cfg(feature = "csv")]
pub mod csv;
#[cfg(feature = "ipc")]
pub mod ipc;
#[cfg(feature = "parquet")]
pub mod parquet;

/// Write buffers coming from a linearizer to a [`File`].
async fn write_buffers_from_linearizer_to_file(
mut file: File,
mut linearizer: Linearizer<Priority<Reverse<MorselSeq>, Vec<u8>>>,
) -> PolarsResult<()> {
// On Unix systems we can use the `pwrite64` syscall to parallelize multiple writes.
//
// This seems to speed up writing by quite a bit.
#[cfg(target_family = "unix")]
{
use std::io::Seek;
use std::os::unix::fs::FileExt;

// This value was chosen without too much thought. It might be good to couple it to the number
// of threads available.
const NUM_WRITING_TASKS: usize = 4;

// Get the initial position in the file. We will start writing from here.
let mut seek_position = file.seek(std::io::SeekFrom::Current(0))?;
let mut futures = Vec::with_capacity(NUM_WRITING_TASKS);

let io_runtime = polars_io::pl_async::get_runtime();
// A reference to the file needs to be given to each writing task. Therefore, we put the file
// in an Arc and clone it for each write.
let file = std::sync::Arc::new(file);

while let Some(Priority(_, buffer)) = linearizer.get().await {
// If we have too many writing tasks out at the moment, wait for one to complete before
// spawning another one.
if futures.len() >= NUM_WRITING_TASKS {
let result: Result<_, tokio::task::JoinError>;
(result, _, futures) = futures::future::select_all(futures).await;
result.unwrap_or_else(|err| {
if err.is_panic() {
// Resume the panic on the main task
std::panic::resume_unwind(err.into_panic());
}

Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to join on write_all_at",
))
})?;
}

let buffer_len = buffer.len();
let file = file.clone();
futures.push(io_runtime.spawn_blocking(move || {
// Move the buffer into the closure. This allows freeing it (i.e. munmap) to happen in
// parallel as well.
let buffer = buffer;

file.write_all_at(&buffer, seek_position)
}));
seek_position += buffer_len as u64;
}
}

// @TODO:
// It might be worth investigating optimizing this for WASI as well with `write_all_at` and
// for Windows with `seek_write`.
#[cfg(not(target_family = "unix"))]
{
use std::io::Write;
while let Some(Priority(_, buffer)) = linearizer.get().await {
file.write_all(&buffer)?;
}
}

Ok(())
}
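
The key property being exploited above is that write_all_at (the pwrite64 syscall mentioned in the comments) takes an explicit offset and never touches the shared file cursor, so writes to disjoint regions of the same file can proceed concurrently. A minimal standalone sketch of the same idea, using plain threads instead of the polars IO runtime (the path and buffer sizes here are made up for the demo):

use std::fs::File;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
use std::thread;

fn main() -> std::io::Result<()> {
    let file = Arc::new(File::create("/tmp/pwrite_demo.bin")?);

    // Stand-ins for the ordered, already-encoded buffers coming out of the linearizer.
    let buffers: Vec<Vec<u8>> = (0u8..4).map(|i| vec![i; 1024]).collect();

    // Precompute each buffer's offset up front, like seek_position above, then let every
    // write target its own region of the file concurrently.
    let mut offset = 0u64;
    let mut handles = Vec::new();
    for buffer in buffers {
        let len = buffer.len() as u64;
        let file = Arc::clone(&file);
        handles.push(thread::spawn(move || file.write_all_at(&buffer, offset)));
        offset += len;
    }

    // Join the writers and propagate any IO error.
    for handle in handles {
        handle.join().expect("write thread panicked")?;
    }
    Ok(())
}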
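
For the Windows direction mentioned in the @TODO, the standard library exposes seek_write on std::os::windows::fs::FileExt, which also takes an explicit offset but, unlike pwrite, may perform a short write and moves the file cursor as a side effect. A sketch of a write_all_at-style wrapper that could be built on top of it (hypothetical helper, not part of this PR):

#[cfg(target_family = "windows")]
fn write_all_at(file: &std::fs::File, mut buffer: &[u8], mut offset: u64) -> std::io::Result<()> {
    use std::os::windows::fs::FileExt;

    // seek_write may write fewer bytes than requested, so loop until the whole buffer is written.
    while !buffer.is_empty() {
        let written = file.seek_write(buffer, offset)?;
        if written == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::WriteZero,
                "failed to write whole buffer",
            ));
        }
        buffer = &buffer[written..];
        offset += written as u64;
    }
    Ok(())
}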