feat: Support writing partitioned parquet to cloud #20590

Merged
68 changes: 31 additions & 37 deletions crates/polars-io/src/partition.rs
@@ -5,41 +5,52 @@ use std::path::Path;
 use polars_core::prelude::*;
 use polars_core::series::IsSorted;
 use polars_core::POOL;
-use polars_utils::create_file;
 use rayon::prelude::*;
 
+use crate::cloud::CloudOptions;
 use crate::parquet::write::ParquetWriteOptions;
 #[cfg(feature = "ipc")]
 use crate::prelude::IpcWriterOptions;
 use crate::prelude::URL_ENCODE_CHAR_SET;
-use crate::{SerWriter, WriteDataFrameToFile};
+use crate::utils::file::try_get_writeable;
+use crate::{is_cloud_url, SerWriter, WriteDataFrameToFile};
 
 impl WriteDataFrameToFile for ParquetWriteOptions {
-    fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
-        self.to_writer(file).finish(&mut df)?;
+    fn write_df_to_file(
+        &self,
+        df: &mut DataFrame,
+        path: &str,
+        cloud_options: Option<&CloudOptions>,
+    ) -> PolarsResult<()> {
+        let f = try_get_writeable(path, cloud_options)?;
+        self.to_writer(f).finish(df)?;
         Ok(())
     }
 }
 
 #[cfg(feature = "ipc")]
 impl WriteDataFrameToFile for IpcWriterOptions {
-    fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
-        self.to_writer(file).finish(&mut df)?;
+    fn write_df_to_file(
+        &self,
+        df: &mut DataFrame,
+        path: &str,
+        cloud_options: Option<&CloudOptions>,
+    ) -> PolarsResult<()> {
+        let f = try_get_writeable(path, cloud_options)?;
+        self.to_writer(f).finish(df)?;
         Ok(())
     }
 }
 
-fn write_partitioned_dataset_impl<W>(
+/// Write a partitioned parquet dataset. This functionality is unstable.
+pub fn write_partitioned_dataset(
     df: &mut DataFrame,
     path: &Path,
     partition_by: Vec<PlSmallStr>,
-    file_write_options: &W,
+    file_write_options: &(dyn WriteDataFrameToFile + Send + Sync),
+    cloud_options: Option<&CloudOptions>,
     chunk_size: usize,
-) -> PolarsResult<()>
-where
-    W: WriteDataFrameToFile + Send + Sync,
-{
-    let partition_by = partition_by.into_iter().collect::<Vec<PlSmallStr>>();
+) -> PolarsResult<()> {
     // Ensure we have a single chunk as the gather will otherwise rechunk per group.
     df.as_single_chunk_par();
 
@@ -86,12 +97,16 @@ where
     };
 
     let base_path = path;
+    let is_cloud = is_cloud_url(base_path);
     let groups = df.group_by(partition_by)?.take_groups();
 
     let init_part_base_dir = |part_df: &DataFrame| {
         let path_part = get_hive_path_part(part_df);
         let dir = base_path.join(path_part);
-        std::fs::create_dir_all(&dir)?;
+
+        if !is_cloud {
+            std::fs::create_dir_all(&dir)?;
+        }
 
         PolarsResult::Ok(dir)
     };
@@ -107,9 +122,8 @@
         (n_files, rows_per_file)
     };
 
-    let write_part = |df: DataFrame, path: &Path| {
-        let f = create_file(path)?;
-        file_write_options.write_df_to_file(df, f)?;
+    let write_part = |mut df: DataFrame, path: &Path| {
+        file_write_options.write_df_to_file(&mut df, path.to_str().unwrap(), cloud_options)?;
         PolarsResult::Ok(())
     };
@@ -184,23 +198,3 @@ where
 
     Ok(())
 }
-
-/// Write a partitioned parquet dataset. This functionality is unstable.
-pub fn write_partitioned_dataset<I, S, W>(
-    df: &mut DataFrame,
-    path: &Path,
-    partition_by: I,
-    file_write_options: &W,
-    chunk_size: usize,
-) -> PolarsResult<()>
-where
-    I: IntoIterator<Item = S>,
-    S: Into<PlSmallStr>,
-    W: WriteDataFrameToFile + Send + Sync,
-{
-    let partition_by = partition_by
-        .into_iter()
-        .map(Into::into)
-        .collect::<Vec<PlSmallStr>>();
-    write_partitioned_dataset_impl(df, path, partition_by, file_write_options, chunk_size)
-}
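
The reworked `write_partitioned_dataset` now takes the per-file writer options as a trait object and threads `cloud_options` through to every partition write. A minimal caller sketch, assuming the function is reachable as `polars_io::partition::write_partitioned_dataset`, that `ParquetWriteOptions` implements `Default`, and an illustrative chunk-size value:

use std::path::Path;

use polars_core::prelude::*;
use polars_io::partition::write_partitioned_dataset;
use polars_io::prelude::ParquetWriteOptions;

fn main() -> PolarsResult<()> {
    let mut df = df!(
        "region" => &["eu", "eu", "us"],
        "value" => &[1i64, 2, 3]
    )?;

    let write_options = ParquetWriteOptions::default();
    write_partitioned_dataset(
        &mut df,
        // For a cloud target this would be an object-store URL such as
        // "s3://bucket/dataset", paired with Some(&cloud_options) below.
        Path::new("/tmp/dataset"),
        vec!["region".into()],
        &write_options,
        None,    // cloud_options: None targets the local filesystem
        4 << 30, // chunk size in bytes (illustrative value)
    )
}

Dropping the generic `W: std::io::Write` parameter from `write_df_to_file` is what makes `WriteDataFrameToFile` object-safe, so `&write_options` can coerce to the `&(dyn WriteDataFrameToFile + Send + Sync)` parameter above.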
8 changes: 7 additions & 1 deletion crates/polars-io/src/shared.rs
@@ -5,6 +5,7 @@ use arrow::array::new_empty_array;
 use arrow::record_batch::RecordBatch;
 use polars_core::prelude::*;
 
+use crate::cloud::CloudOptions;
 use crate::options::RowIndex;
 #[cfg(any(feature = "ipc", feature = "avro", feature = "ipc_streaming",))]
 use crate::predicates::PhysicalIoExpr;
@@ -41,7 +42,12 @@
 }
 
 pub trait WriteDataFrameToFile {
-    fn write_df_to_file<W: std::io::Write>(&self, df: DataFrame, file: W) -> PolarsResult<()>;
+    fn write_df_to_file(
+        &self,
+        df: &mut DataFrame,
+        path: &str,
+        cloud_options: Option<&CloudOptions>,
+    ) -> PolarsResult<()>;
 }
 
 pub trait ArrowReader {
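The trait now receives a path and optional cloud options instead of a caller-supplied `std::io::Write`, and resolves the writer itself. A sketch of what an implementation for another format could look like, assuming `MyCsvOptions` is a hypothetical type and that `try_get_writeable`, `SerWriter`, `CsvWriter`, and the trait are all reachable through `polars_io`'s public exports:

use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::prelude::CsvWriter;
use polars_io::utils::file::try_get_writeable;
use polars_io::{SerWriter, WriteDataFrameToFile};

// Hypothetical options type, for illustration only.
struct MyCsvOptions;

impl WriteDataFrameToFile for MyCsvOptions {
    fn write_df_to_file(
        &self,
        df: &mut DataFrame,
        path: &str,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<()> {
        // try_get_writeable hands back a writer for either a local path or a
        // cloud URL, so the format-specific code stays target-agnostic.
        let f = try_get_writeable(path, cloud_options)?;
        CsvWriter::new(f).finish(df)?;
        Ok(())
    }
}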
38 changes: 19 additions & 19 deletions crates/polars-python/src/dataframe/io.rs
@@ -432,8 +432,24 @@ impl PyDataFrame {
 
         let compression = parse_parquet_compression(compression, compression_level)?;
 
+        #[cfg(feature = "cloud")]
+        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
+            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
+            Some(
+                cloud_options
+                    .with_max_retries(retries)
+                    .with_credential_provider(
+                        credential_provider.map(PlCredentialProvider::from_python_func_object),
+                    ),
+            )
+        } else {
+            None
+        };
+
+        #[cfg(not(feature = "cloud"))]
+        let cloud_options = None;
+
         if let Some(partition_by) = partition_by {
-            // TODO: Support cloud
             let path = py_f.extract::<String>(py)?;
 
             py.allow_threads(|| {
@@ -447,8 +463,9 @@
                     write_partitioned_dataset(
                         &mut self.df,
                         std::path::Path::new(path.as_str()),
-                        partition_by.as_slice(),
+                        partition_by.into_iter().map(|x| x.into()).collect(),
                         &write_options,
+                        cloud_options.as_ref(),
                         partition_chunk_size_bytes,
                     )
                     .map_err(PyPolarsErr::from)
@@ -457,23 +474,6 @@
             return Ok(());
         };
 
-        #[cfg(feature = "cloud")]
-        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
-            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
-            Some(
-                cloud_options
-                    .with_max_retries(retries)
-                    .with_credential_provider(
-                        credential_provider.map(PlCredentialProvider::from_python_func_object),
-                    ),
-            )
-        } else {
-            None
-        };
-
-        #[cfg(not(feature = "cloud"))]
-        let cloud_options = None;
-
         let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;
 
         py.allow_threads(|| {
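
Hoisting the `cloud_options` parsing above the `partition_by` branch is what lets partitioned writes reuse the same retries and credential-provider configuration as single-file writes. On the Rust side the cloud/local split then hinges on `is_cloud_url`, which the partition writer uses to skip local directory creation for object-store targets. A quick illustration, assuming `is_cloud_url` stays exported at the `polars_io` crate root as the `use crate::{is_cloud_url, ...}` import above suggests:

use std::path::Path;

use polars_io::is_cloud_url;

fn main() {
    // Object-store URLs are recognized by their scheme...
    assert!(is_cloud_url(Path::new("s3://bucket/dataset")));
    // ...while plain filesystem paths are not, so std::fs::create_dir_all
    // still runs for them before the per-partition files are written.
    assert!(!is_cloud_url(Path::new("/tmp/dataset")));
}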