diff --git a/crates/polars-io/src/partition.rs b/crates/polars-io/src/partition.rs
index 2e3499fe57aa..bf4af7ca9818 100644
--- a/crates/polars-io/src/partition.rs
+++ b/crates/polars-io/src/partition.rs
@@ -5,41 +5,52 @@ use std::path::Path;
 use polars_core::prelude::*;
 use polars_core::series::IsSorted;
 use polars_core::POOL;
-use polars_utils::create_file;
 use rayon::prelude::*;
 
+use crate::cloud::CloudOptions;
 use crate::parquet::write::ParquetWriteOptions;
 #[cfg(feature = "ipc")]
 use crate::prelude::IpcWriterOptions;
 use crate::prelude::URL_ENCODE_CHAR_SET;
-use crate::{SerWriter, WriteDataFrameToFile};
+use crate::utils::file::try_get_writeable;
+use crate::{is_cloud_url, SerWriter, WriteDataFrameToFile};
 
 impl WriteDataFrameToFile for ParquetWriteOptions {
-    fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
-        self.to_writer(file).finish(&mut df)?;
+    fn write_df_to_file(
+        &self,
+        df: &mut DataFrame,
+        path: &str,
+        cloud_options: Option<&CloudOptions>,
+    ) -> PolarsResult<()> {
+        let f = try_get_writeable(path, cloud_options)?;
+        self.to_writer(f).finish(df)?;
         Ok(())
     }
 }
 
 #[cfg(feature = "ipc")]
 impl WriteDataFrameToFile for IpcWriterOptions {
-    fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
-        self.to_writer(file).finish(&mut df)?;
+    fn write_df_to_file(
+        &self,
+        df: &mut DataFrame,
+        path: &str,
+        cloud_options: Option<&CloudOptions>,
+    ) -> PolarsResult<()> {
+        let f = try_get_writeable(path, cloud_options)?;
+        self.to_writer(f).finish(df)?;
         Ok(())
     }
 }
 
-fn write_partitioned_dataset_impl<W>(
+/// Write a partitioned parquet dataset. This functionality is unstable.
+pub fn write_partitioned_dataset(
     df: &mut DataFrame,
     path: &Path,
     partition_by: Vec<PlSmallStr>,
-    file_write_options: &W,
+    file_write_options: &(dyn WriteDataFrameToFile + Send + Sync),
+    cloud_options: Option<&CloudOptions>,
     chunk_size: usize,
-) -> PolarsResult<()>
-where
-    W: WriteDataFrameToFile + Send + Sync,
-{
-    let partition_by = partition_by.into_iter().collect::<Vec<PlSmallStr>>();
+) -> PolarsResult<()> {
     // Ensure we have a single chunk as the gather will otherwise rechunk per group.
     df.as_single_chunk_par();
 
@@ -86,12 +97,16 @@ where
     };
 
     let base_path = path;
+    let is_cloud = is_cloud_url(base_path);
     let groups = df.group_by(partition_by)?.take_groups();
 
     let init_part_base_dir = |part_df: &DataFrame| {
         let path_part = get_hive_path_part(part_df);
         let dir = base_path.join(path_part);
-        std::fs::create_dir_all(&dir)?;
+
+        if !is_cloud {
+            std::fs::create_dir_all(&dir)?;
+        }
 
         PolarsResult::Ok(dir)
     };
@@ -107,9 +122,8 @@ where
         (n_files, rows_per_file)
     };
 
-    let write_part = |df: DataFrame, path: &Path| {
-        let f = create_file(path)?;
-        file_write_options.write_df_to_file(df, f)?;
+    let write_part = |mut df: DataFrame, path: &Path| {
+        file_write_options.write_df_to_file(&mut df, path.to_str().unwrap(), cloud_options)?;
         PolarsResult::Ok(())
     };
 
@@ -184,23 +198,3 @@ where
 
     Ok(())
 }
-
-/// Write a partitioned parquet dataset. This functionality is unstable.
-pub fn write_partitioned_dataset<I, S, W>(
-    df: &mut DataFrame,
-    path: &Path,
-    partition_by: I,
-    file_write_options: &W,
-    chunk_size: usize,
-) -> PolarsResult<()>
-where
-    I: IntoIterator<Item = S>,
-    S: Into<PlSmallStr>,
-    W: WriteDataFrameToFile + Send + Sync,
-{
-    let partition_by = partition_by
-        .into_iter()
-        .map(Into::into)
-        .collect::<Vec<PlSmallStr>>();
-    write_partitioned_dataset_impl(df, path, partition_by, file_write_options, chunk_size)
-}
diff --git a/crates/polars-io/src/shared.rs b/crates/polars-io/src/shared.rs
index 1eea338f4788..fe29f12feb83 100644
--- a/crates/polars-io/src/shared.rs
+++ b/crates/polars-io/src/shared.rs
@@ -5,6 +5,7 @@ use arrow::array::new_empty_array;
 use arrow::record_batch::RecordBatch;
 use polars_core::prelude::*;
 
+use crate::cloud::CloudOptions;
 use crate::options::RowIndex;
 #[cfg(any(feature = "ipc", feature = "avro", feature = "ipc_streaming",))]
 use crate::predicates::PhysicalIoExpr;
@@ -41,7 +42,12 @@ where
 }
 
 pub trait WriteDataFrameToFile {
-    fn write_df_to_file<W: std::io::Write>(&self, df: DataFrame, file: W) -> PolarsResult<()>;
+    fn write_df_to_file(
+        &self,
+        df: &mut DataFrame,
+        path: &str,
+        cloud_options: Option<&CloudOptions>,
+    ) -> PolarsResult<()>;
 }
 
 pub trait ArrowReader {
diff --git a/crates/polars-python/src/dataframe/io.rs b/crates/polars-python/src/dataframe/io.rs
index a95ee907dbd5..17c27c6f7eaf 100644
--- a/crates/polars-python/src/dataframe/io.rs
+++ b/crates/polars-python/src/dataframe/io.rs
@@ -432,8 +432,24 @@ impl PyDataFrame {
 
         let compression = parse_parquet_compression(compression, compression_level)?;
 
+        #[cfg(feature = "cloud")]
+        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
+            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
+            Some(
+                cloud_options
+                    .with_max_retries(retries)
+                    .with_credential_provider(
+                        credential_provider.map(PlCredentialProvider::from_python_func_object),
+                    ),
+            )
+        } else {
+            None
+        };
+
+        #[cfg(not(feature = "cloud"))]
+        let cloud_options = None;
+
         if let Some(partition_by) = partition_by {
-            // TODO: Support cloud
             let path = py_f.extract::<String>(py)?;
 
             py.allow_threads(|| {
@@ -447,8 +463,9 @@ impl PyDataFrame {
                 write_partitioned_dataset(
                     &mut self.df,
                     std::path::Path::new(path.as_str()),
-                    partition_by.as_slice(),
+                    partition_by.into_iter().map(|x| x.into()).collect(),
                     &write_options,
+                    cloud_options.as_ref(),
                     partition_chunk_size_bytes,
                 )
                 .map_err(PyPolarsErr::from)
@@ -457,23 +474,6 @@ impl PyDataFrame {
             return Ok(());
         };
 
-        #[cfg(feature = "cloud")]
-        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
-            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
-            Some(
-                cloud_options
-                    .with_max_retries(retries)
-                    .with_credential_provider(
-                        credential_provider.map(PlCredentialProvider::from_python_func_object),
-                    ),
-            )
-        } else {
-            None
-        };
-
-        #[cfg(not(feature = "cloud"))]
-        let cloud_options = None;
-
         let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;
 
         py.allow_threads(|| {
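
Usage sketch: a minimal example of driving the reworked write_partitioned_dataset signature from Rust, assuming the parquet feature is enabled and that ParquetWriteOptions implements Default; the frame contents, the local target path, and the chunk size are illustrative values, not taken from this patch.

use std::path::Path;

use polars_core::df;
use polars_core::prelude::*;
use polars_io::parquet::write::ParquetWriteOptions;
use polars_io::partition::write_partitioned_dataset;

fn main() -> PolarsResult<()> {
    // Illustrative frame; `region` becomes the hive partition key.
    let mut df = df!(
        "region" => ["eu", "eu", "us"],
        "value" => [1i64, 2, 3],
    )?;

    // `partition_by` is now a concrete Vec<PlSmallStr> and the writer travels
    // as a `dyn WriteDataFrameToFile` trait object instead of a generic `W`.
    let write_options = ParquetWriteOptions::default();

    write_partitioned_dataset(
        &mut df,
        Path::new("/tmp/dataset"), // illustrative local target
        vec!["region".into()],
        &write_options,
        // `None` keeps the old local-filesystem behaviour; a cloud target
        // would pass a cloud URL plus `Some(&CloudOptions)` so that each
        // part file is opened through `try_get_writeable`.
        None,
        4_194_304, // chunk_size in bytes (illustrative)
    )?;

    Ok(())
}

Passing `None` exercises the `!is_cloud` branch above, so directories are still created eagerly with std::fs::create_dir_all; for cloud URLs the directory step is skipped and the object store handles key prefixes.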