diff --git a/crates/polars-pipe/src/executors/sinks/io.rs b/crates/polars-pipe/src/executors/sinks/io.rs index 7ffdd505444d..cdc7978e881a 100644 --- a/crates/polars-pipe/src/executors/sinks/io.rs +++ b/crates/polars-pipe/src/executors/sinks/io.rs @@ -8,9 +8,9 @@ use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; use polars_core::error::ErrString; use polars_core::prelude::*; use polars_core::utils::arrow::temporal_conversions::SECONDS_IN_DAY; +use polars_io::path_utils::POLARS_TEMP_DIR_BASE_PATH; use polars_io::prelude::*; -use crate::executors::sinks::get_base_temp_dir; use crate::pipeline::morsels_per_sink; pub(in crate::executors::sinks) type DfIter = @@ -38,20 +38,23 @@ fn get_lockfile_path(dir: &Path) -> PathBuf { fn get_spill_dir(operation_name: &'static str) -> PolarsResult { let id = uuid::Uuid::new_v4(); - - let mut dir = std::path::PathBuf::from(get_base_temp_dir()); - dir.push(format!("polars/{operation_name}/{id}")); + let dir = POLARS_TEMP_DIR_BASE_PATH.join(format!("{operation_name}/{id}")); if !dir.exists() { fs::create_dir_all(&dir).map_err(|err| { PolarsError::ComputeError(ErrString::from(format!( - "Failed to create spill directory: {}", + "Failed to create spill directory: {:?}\n {}", + dir.to_str(), err ))) })?; } else if !dir.is_dir() { return Err(PolarsError::ComputeError( - "Specified spill path is not a directory".into(), + format!( + "Specified spill path is not a directory: {:?}", + dir.to_str() + ) + .into(), )); } @@ -76,8 +79,7 @@ fn clean_after_delay(time: Option, secs: u64, path: &Path) { fn gc_thread(operation_name: &'static str, rx: Receiver) { let _ = std::thread::spawn(move || { // First clean all existing - let mut dir = std::path::PathBuf::from(get_base_temp_dir()); - dir.push(format!("polars/{operation_name}")); + let dir = POLARS_TEMP_DIR_BASE_PATH.join(operation_name); // if the directory does not exist, there is nothing to clean let rd = match std::fs::read_dir(&dir) { diff --git a/crates/polars-pipe/src/executors/sinks/mod.rs b/crates/polars-pipe/src/executors/sinks/mod.rs index 86d46eb3082c..67e7aa4f2da8 100644 --- a/crates/polars-pipe/src/executors/sinks/mod.rs +++ b/crates/polars-pipe/src/executors/sinks/mod.rs @@ -9,8 +9,6 @@ mod slice; mod sort; mod utils; -use std::sync::OnceLock; - pub(crate) use joins::*; pub(crate) use ordered::*; #[cfg(any( @@ -27,17 +25,3 @@ pub(crate) use sort::*; // We must strike a balance between cache coherence and resizing costs. // Overallocation seems a lot more expensive than resizing so we start reasonable small. const HASHMAP_INIT_SIZE: usize = 64; - -pub(crate) static POLARS_TEMP_DIR: OnceLock = OnceLock::new(); - -pub(crate) fn get_base_temp_dir() -> &'static str { - POLARS_TEMP_DIR.get_or_init(|| { - let tmp = std::env::var("POLARS_TEMP_DIR") - .unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().into_owned()); - - if polars_core::config::verbose() { - eprintln!("Temporary directory path in use: {}", &tmp); - } - tmp - }) -} diff --git a/py-polars/polars/config.py b/py-polars/polars/config.py index dc5060c4e4c3..25be36ebea72 100644 --- a/py-polars/polars/config.py +++ b/py-polars/polars/config.py @@ -28,27 +28,26 @@ __all__ = ["Config"] TableFormatNames: TypeAlias = Literal[ - "ASCII_FULL", - "ASCII_FULL_CONDENSED", - "ASCII_NO_BORDERS", "ASCII_BORDERS_ONLY", "ASCII_BORDERS_ONLY_CONDENSED", + "ASCII_FULL", + "ASCII_FULL_CONDENSED", "ASCII_HORIZONTAL_ONLY", "ASCII_MARKDOWN", + "ASCII_NO_BORDERS", "MARKDOWN", + "NOTHING", + "UTF8_BORDERS_ONLY", "UTF8_FULL", "UTF8_FULL_CONDENSED", - "UTF8_NO_BORDERS", - "UTF8_BORDERS_ONLY", "UTF8_HORIZONTAL_ONLY", - "NOTHING", + "UTF8_NO_BORDERS", ] # note: register all Config-specific environment variable names here; need to constrain # which 'POLARS_' environment variables are recognized, as there are other lower-level # and/or unstable settings that should not be saved or reset with the Config vars. _POLARS_CFG_ENV_VARS = { - "POLARS_WARN_UNSTABLE", "POLARS_AUTO_STRUCTIFY", "POLARS_FMT_MAX_COLS", "POLARS_FMT_MAX_ROWS", @@ -67,10 +66,12 @@ "POLARS_FMT_TABLE_HIDE_DATAFRAME_SHAPE_INFORMATION", "POLARS_FMT_TABLE_INLINE_COLUMN_DATA_TYPE", "POLARS_FMT_TABLE_ROUNDED_CORNERS", + "POLARS_MAX_EXPR_DEPTH", "POLARS_STREAMING_CHUNK_SIZE", "POLARS_TABLE_WIDTH", + "POLARS_TEMP_DIR", "POLARS_VERBOSE", - "POLARS_MAX_EXPR_DEPTH", + "POLARS_WARN_UNSTABLE", } # vars that set the rust env directly should declare themselves here as the Config @@ -517,6 +518,42 @@ def set_decimal_separator(cls, separator: str | None = None) -> type[Config]: plr.set_decimal_separator(sep=separator) return cls + @classmethod + def set_temp_dir(cls, path: str | Path | None = None) -> type[Config]: + """ + Set the directory to use for any temporary files created by Polars. + + Notes + ----- + * This method sets the "POLARS_TEMP_DIR" environment variable, which + is only read once per session (on first use). Any subsequent changes + to this variable will *not* be picked up. + + * Temporary files may be created in several situations; for example, + a streaming mode operation may spill intermediate results to disk, + cloud-based files may need local caching on download, and sink ops + may also require temporary storage. + + * If not explicitly set the temporary directory is determined using + the Rust `std::env::temp_dir` function. See the Rust documentation + for details: https://doc.rust-lang.org/std/env/fn.temp_dir.html. + + Parameters + ---------- + path : str, Path, None + Path to a directory to use for Polars' temporary files, such as + where streaming operations may spill to disk. + + Examples + -------- + >>> pl.Config(temp_dir="/tmp/custom_directory/") # doctest: +SKIP + """ + if path is None: + os.environ.pop("POLARS_TEMP_DIR", None) + else: + os.environ["POLARS_TEMP_DIR"] = str(path) + return cls + @classmethod def set_thousands_separator( cls, separator: str | bool | None = None diff --git a/py-polars/tests/unit/streaming/test_streaming.py b/py-polars/tests/unit/streaming/test_streaming.py index 225c0b97553c..60b811923434 100644 --- a/py-polars/tests/unit/streaming/test_streaming.py +++ b/py-polars/tests/unit/streaming/test_streaming.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os import time from datetime import date from pathlib import Path @@ -70,8 +69,8 @@ def test_streaming_streamable_functions(monkeypatch: Any, capfd: Any) -> None: "b": [1, 2, 3], } - (_, err) = capfd.readouterr() - assert "df -> function -> ordered_sink" in err + (_, err) = capfd.readouterr() + assert "df -> function -> ordered_sink" in err @pytest.mark.slow @@ -123,15 +122,15 @@ def test_streaming_apply(monkeypatch: Any, capfd: Any) -> None: monkeypatch.setenv("POLARS_VERBOSE", "1") q = pl.DataFrame({"a": [1, 2]}).lazy() - - with pytest.warns(PolarsInefficientMapWarning, match="with this one instead"): - ( - q.select( - pl.col("a").map_elements(lambda x: x * 2, return_dtype=pl.Int64) - ).collect(streaming=True) - ) - (_, err) = capfd.readouterr() - assert "df -> projection -> ordered_sink" in err + with pl.Config(verbose=True): # noqa: SIM117 + with pytest.warns(PolarsInefficientMapWarning, match="with this one instead"): + ( + q.select( + pl.col("a").map_elements(lambda x: x * 2, return_dtype=pl.Int64) + ).collect(streaming=True) + ) + (_, err) = capfd.readouterr() + assert "df -> projection -> ordered_sink" in err def test_streaming_ternary() -> None: diff --git a/py-polars/tests/unit/streaming/test_streaming_unique.py b/py-polars/tests/unit/streaming/test_streaming_unique.py index e477952a8c0b..f1372b4582bc 100644 --- a/py-polars/tests/unit/streaming/test_streaming_unique.py +++ b/py-polars/tests/unit/streaming/test_streaming_unique.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os from typing import TYPE_CHECKING, Any import pytest @@ -19,36 +20,51 @@ def test_streaming_out_of_core_unique( io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any ) -> None: tmp_path.mkdir(exist_ok=True) - monkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path)) - monkeypatch.setenv("POLARS_FORCE_OOC", "1") - monkeypatch.setenv("POLARS_VERBOSE", "1") - monkeypatch.setenv("POLARS_STREAMING_GROUPBY_SPILL_SIZE", "256") - df = pl.read_csv(io_files_path / "foods*.csv") - # this creates 10M rows - q = df.lazy() - q = q.join(q, how="cross").select(df.columns).head(10_000) - - # uses out-of-core unique - df1 = q.join(q.head(1000), how="cross").unique().collect(streaming=True) - # this ensures the cross join gives equal result but uses the in-memory unique - df2 = q.join(q.head(1000), how="cross").collect(streaming=True).unique() - assert df1.shape == df2.shape - - # TODO: Re-enable this check when this issue is fixed: https://github.com/pola-rs/polars/issues/10466 - _ = capfd.readouterr().err - # assert "OOC group_by started" in err - - -def test_streaming_unique(monkeypatch: Any, capfd: Any) -> None: - monkeypatch.setenv("POLARS_VERBOSE", "1") - df = pl.DataFrame({"a": [1, 2, 2, 2], "b": [3, 4, 4, 4], "c": [5, 6, 7, 7]}) - q = df.lazy().unique(subset=["a", "c"], maintain_order=False).sort(["a", "b", "c"]) - assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False)) - - q = df.lazy().unique(subset=["b", "c"], maintain_order=False).sort(["a", "b", "c"]) - assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False)) - - q = df.lazy().unique(subset=None, maintain_order=False).sort(["a", "b", "c"]) - assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False)) - (_, err) = capfd.readouterr() - assert "df -> re-project-sink -> sort_multiple" in err + with pl.Config( + temp_dir=tmp_path, + verbose=True, + ): + monkeypatch.setenv("POLARS_FORCE_OOC", "1") + monkeypatch.setenv("POLARS_STREAMING_GROUPBY_SPILL_SIZE", "256") + + df = pl.read_csv(io_files_path / "foods*.csv") + # this creates 10M rows + q = df.lazy() + q = q.join(q, how="cross").select(df.columns).head(10_000) + + # uses out-of-core unique + df1 = q.join(q.head(1000), how="cross").unique().collect(streaming=True) + + # confirm that the custom temp path was used to spill ooc op to disk + assert os.listdir(tmp_path), f"Temp directory '{tmp_path}' should not be empty" + + # this ensures the cross join gives equal result but uses the in-memory unique + df2 = q.join(q.head(1000), how="cross").collect(streaming=True).unique() + assert df1.shape == df2.shape + + # TODO: Re-enable this check when this issue is fixed: https://github.com/pola-rs/polars/issues/10466 + _ = capfd.readouterr().err + # assert "OOC group_by started" in err + + +def test_streaming_unique(capfd: Any) -> None: + with pl.Config(verbose=True): + df = pl.DataFrame({"a": [1, 2, 2, 2], "b": [3, 4, 4, 4], "c": [5, 6, 7, 7]}) + q = ( + df.lazy() + .unique(subset=["a", "c"], maintain_order=False) + .sort(["a", "b", "c"]) + ) + assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False)) + + q = ( + df.lazy() + .unique(subset=["b", "c"], maintain_order=False) + .sort(["a", "b", "c"]) + ) + assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False)) + + q = df.lazy().unique(subset=None, maintain_order=False).sort(["a", "b", "c"]) + assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False)) + (_, err) = capfd.readouterr() + assert "df -> re-project-sink -> sort_multiple" in err diff --git a/py-polars/tests/unit/test_config.py b/py-polars/tests/unit/test_config.py index 02397afcf9d4..6c9b2780b963 100644 --- a/py-polars/tests/unit/test_config.py +++ b/py-polars/tests/unit/test_config.py @@ -931,6 +931,7 @@ def test_warn_unstable(recwarn: pytest.WarningsRecorder) -> None: ("POLARS_TABLE_WIDTH", "set_tbl_width_chars", 80, "80"), ("POLARS_VERBOSE", "set_verbose", True, "1"), ("POLARS_WARN_UNSTABLE", "warn_unstable", True, "1"), + ("POLARS_TEMP_DIR", "set_temp_dir", "/other/tmp", "/other/tmp"), ], ) def test_unset_config_env_vars(