Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Centralise acquisition of "POLARS_TEMP_DIR", and add set_temp_dir to the Config object #18063

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions crates/polars-pipe/src/executors/sinks/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use polars_core::error::ErrString;
use polars_core::prelude::*;
use polars_core::utils::arrow::temporal_conversions::SECONDS_IN_DAY;
use polars_io::path_utils::POLARS_TEMP_DIR_BASE_PATH;
use polars_io::prelude::*;

use crate::executors::sinks::get_base_temp_dir;
use crate::pipeline::morsels_per_sink;

pub(in crate::executors::sinks) type DfIter =
Expand Down Expand Up @@ -38,20 +38,23 @@ fn get_lockfile_path(dir: &Path) -> PathBuf {

fn get_spill_dir(operation_name: &'static str) -> PolarsResult<PathBuf> {
let id = uuid::Uuid::new_v4();

let mut dir = std::path::PathBuf::from(get_base_temp_dir());
dir.push(format!("polars/{operation_name}/{id}"));
let dir = POLARS_TEMP_DIR_BASE_PATH.join(format!("{operation_name}/{id}"));

if !dir.exists() {
fs::create_dir_all(&dir).map_err(|err| {
PolarsError::ComputeError(ErrString::from(format!(
"Failed to create spill directory: {}",
"Failed to create spill directory: {:?}\n {}",
dir.to_str(),
err
)))
})?;
} else if !dir.is_dir() {
return Err(PolarsError::ComputeError(
"Specified spill path is not a directory".into(),
format!(
"Specified spill path is not a directory: {:?}",
dir.to_str()
)
.into(),
));
}

Expand All @@ -76,8 +79,7 @@ fn clean_after_delay(time: Option<SystemTime>, secs: u64, path: &Path) {
fn gc_thread(operation_name: &'static str, rx: Receiver<PathBuf>) {
let _ = std::thread::spawn(move || {
// First clean all existing
let mut dir = std::path::PathBuf::from(get_base_temp_dir());
dir.push(format!("polars/{operation_name}"));
let dir = POLARS_TEMP_DIR_BASE_PATH.join(operation_name);

// if the directory does not exist, there is nothing to clean
let rd = match std::fs::read_dir(&dir) {
Expand Down
16 changes: 0 additions & 16 deletions crates/polars-pipe/src/executors/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ mod slice;
mod sort;
mod utils;

use std::sync::OnceLock;

pub(crate) use joins::*;
pub(crate) use ordered::*;
#[cfg(any(
Expand All @@ -27,17 +25,3 @@ pub(crate) use sort::*;
// We must strike a balance between cache coherence and resizing costs.
// Overallocation seems a lot more expensive than resizing so we start reasonably small.
const HASHMAP_INIT_SIZE: usize = 64;

pub(crate) static POLARS_TEMP_DIR: OnceLock<String> = OnceLock::new();

pub(crate) fn get_base_temp_dir() -> &'static str {
POLARS_TEMP_DIR.get_or_init(|| {
let tmp = std::env::var("POLARS_TEMP_DIR")
.unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().into_owned());

if polars_core::config::verbose() {
eprintln!("Temporary directory path in use: {}", &tmp);
}
tmp
})
}
53 changes: 45 additions & 8 deletions py-polars/polars/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,26 @@
__all__ = ["Config"]

TableFormatNames: TypeAlias = Literal[
"ASCII_FULL",
"ASCII_FULL_CONDENSED",
"ASCII_NO_BORDERS",
"ASCII_BORDERS_ONLY",
"ASCII_BORDERS_ONLY_CONDENSED",
"ASCII_FULL",
"ASCII_FULL_CONDENSED",
"ASCII_HORIZONTAL_ONLY",
"ASCII_MARKDOWN",
"ASCII_NO_BORDERS",
"MARKDOWN",
"NOTHING",
"UTF8_BORDERS_ONLY",
"UTF8_FULL",
"UTF8_FULL_CONDENSED",
"UTF8_NO_BORDERS",
"UTF8_BORDERS_ONLY",
"UTF8_HORIZONTAL_ONLY",
"NOTHING",
"UTF8_NO_BORDERS",
]

# note: register all Config-specific environment variable names here; need to constrain
# which 'POLARS_' environment variables are recognized, as there are other lower-level
# and/or unstable settings that should not be saved or reset with the Config vars.
_POLARS_CFG_ENV_VARS = {
"POLARS_WARN_UNSTABLE",
"POLARS_AUTO_STRUCTIFY",
"POLARS_FMT_MAX_COLS",
"POLARS_FMT_MAX_ROWS",
Expand All @@ -67,10 +66,12 @@
"POLARS_FMT_TABLE_HIDE_DATAFRAME_SHAPE_INFORMATION",
"POLARS_FMT_TABLE_INLINE_COLUMN_DATA_TYPE",
"POLARS_FMT_TABLE_ROUNDED_CORNERS",
"POLARS_MAX_EXPR_DEPTH",
"POLARS_STREAMING_CHUNK_SIZE",
"POLARS_TABLE_WIDTH",
"POLARS_TEMP_DIR",
"POLARS_VERBOSE",
"POLARS_MAX_EXPR_DEPTH",
"POLARS_WARN_UNSTABLE",
}

# vars that set the rust env directly should declare themselves here as the Config
Expand Down Expand Up @@ -517,6 +518,42 @@ def set_decimal_separator(cls, separator: str | None = None) -> type[Config]:
plr.set_decimal_separator(sep=separator)
return cls

@classmethod
def set_temp_dir(cls, path: str | Path | None = None) -> type[Config]:
"""
Set the directory to use for any temporary files created by Polars.

Notes
-----
* This method sets the "POLARS_TEMP_DIR" environment variable, which
is only read once per session (on first use). Any subsequent changes
to this variable will *not* be picked up.

* Temporary files may be created in several situations; for example,
a streaming mode operation may spill intermediate results to disk,
cloud-based files may need local caching on download, and sink ops
may also require temporary storage.

* If not explicitly set, the temporary directory is determined using
the Rust `std::env::temp_dir` function. See the Rust documentation
for details: https://doc.rust-lang.org/std/env/fn.temp_dir.html.

Parameters
----------
path : str, Path, None
Path to a directory to use for Polars' temporary files, such as
where streaming operations may spill to disk.

Examples
--------
>>> pl.Config(temp_dir="/tmp/custom_directory/") # doctest: +SKIP
"""
if path is None:
os.environ.pop("POLARS_TEMP_DIR", None)
else:
os.environ["POLARS_TEMP_DIR"] = str(path)
return cls

@classmethod
def set_thousands_separator(
cls, separator: str | bool | None = None
Expand Down
23 changes: 11 additions & 12 deletions py-polars/tests/unit/streaming/test_streaming.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import os
import time
from datetime import date
from pathlib import Path
Expand Down Expand Up @@ -70,8 +69,8 @@ def test_streaming_streamable_functions(monkeypatch: Any, capfd: Any) -> None:
"b": [1, 2, 3],
}

(_, err) = capfd.readouterr()
assert "df -> function -> ordered_sink" in err
(_, err) = capfd.readouterr()
assert "df -> function -> ordered_sink" in err


@pytest.mark.slow
Expand Down Expand Up @@ -123,15 +122,15 @@ def test_streaming_apply(monkeypatch: Any, capfd: Any) -> None:
monkeypatch.setenv("POLARS_VERBOSE", "1")

q = pl.DataFrame({"a": [1, 2]}).lazy()

with pytest.warns(PolarsInefficientMapWarning, match="with this one instead"):
(
q.select(
pl.col("a").map_elements(lambda x: x * 2, return_dtype=pl.Int64)
).collect(streaming=True)
)
(_, err) = capfd.readouterr()
assert "df -> projection -> ordered_sink" in err
with pl.Config(verbose=True): # noqa: SIM117
with pytest.warns(PolarsInefficientMapWarning, match="with this one instead"):
(
q.select(
pl.col("a").map_elements(lambda x: x * 2, return_dtype=pl.Int64)
).collect(streaming=True)
)
(_, err) = capfd.readouterr()
assert "df -> projection -> ordered_sink" in err


def test_streaming_ternary() -> None:
Expand Down
82 changes: 49 additions & 33 deletions py-polars/tests/unit/streaming/test_streaming_unique.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any

import pytest
Expand All @@ -19,36 +20,51 @@ def test_streaming_out_of_core_unique(
io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any
) -> None:
tmp_path.mkdir(exist_ok=True)
monkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
monkeypatch.setenv("POLARS_FORCE_OOC", "1")
monkeypatch.setenv("POLARS_VERBOSE", "1")
monkeypatch.setenv("POLARS_STREAMING_GROUPBY_SPILL_SIZE", "256")
df = pl.read_csv(io_files_path / "foods*.csv")
# this creates 10M rows
q = df.lazy()
q = q.join(q, how="cross").select(df.columns).head(10_000)

# uses out-of-core unique
df1 = q.join(q.head(1000), how="cross").unique().collect(streaming=True)
# this ensures the cross join gives equal result but uses the in-memory unique
df2 = q.join(q.head(1000), how="cross").collect(streaming=True).unique()
assert df1.shape == df2.shape

# TODO: Re-enable this check when this issue is fixed: https://github.com/pola-rs/polars/issues/10466
_ = capfd.readouterr().err
# assert "OOC group_by started" in err


def test_streaming_unique(monkeypatch: Any, capfd: Any) -> None:
monkeypatch.setenv("POLARS_VERBOSE", "1")
df = pl.DataFrame({"a": [1, 2, 2, 2], "b": [3, 4, 4, 4], "c": [5, 6, 7, 7]})
q = df.lazy().unique(subset=["a", "c"], maintain_order=False).sort(["a", "b", "c"])
assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))

q = df.lazy().unique(subset=["b", "c"], maintain_order=False).sort(["a", "b", "c"])
assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))

q = df.lazy().unique(subset=None, maintain_order=False).sort(["a", "b", "c"])
assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))
(_, err) = capfd.readouterr()
assert "df -> re-project-sink -> sort_multiple" in err
with pl.Config(
temp_dir=tmp_path,
verbose=True,
):
monkeypatch.setenv("POLARS_FORCE_OOC", "1")
monkeypatch.setenv("POLARS_STREAMING_GROUPBY_SPILL_SIZE", "256")

df = pl.read_csv(io_files_path / "foods*.csv")
# this creates 10M rows
q = df.lazy()
q = q.join(q, how="cross").select(df.columns).head(10_000)

# uses out-of-core unique
df1 = q.join(q.head(1000), how="cross").unique().collect(streaming=True)

# confirm that the custom temp path was used to spill ooc op to disk
assert os.listdir(tmp_path), f"Temp directory '{tmp_path}' should not be empty"

# this ensures the cross join gives equal result but uses the in-memory unique
df2 = q.join(q.head(1000), how="cross").collect(streaming=True).unique()
assert df1.shape == df2.shape

# TODO: Re-enable this check when this issue is fixed: https://github.com/pola-rs/polars/issues/10466
_ = capfd.readouterr().err
# assert "OOC group_by started" in err


def test_streaming_unique(capfd: Any) -> None:
with pl.Config(verbose=True):
df = pl.DataFrame({"a": [1, 2, 2, 2], "b": [3, 4, 4, 4], "c": [5, 6, 7, 7]})
q = (
df.lazy()
.unique(subset=["a", "c"], maintain_order=False)
.sort(["a", "b", "c"])
)
assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))

q = (
df.lazy()
.unique(subset=["b", "c"], maintain_order=False)
.sort(["a", "b", "c"])
)
assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))

q = df.lazy().unique(subset=None, maintain_order=False).sort(["a", "b", "c"])
assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))
(_, err) = capfd.readouterr()
assert "df -> re-project-sink -> sort_multiple" in err
1 change: 1 addition & 0 deletions py-polars/tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,7 @@ def test_warn_unstable(recwarn: pytest.WarningsRecorder) -> None:
("POLARS_TABLE_WIDTH", "set_tbl_width_chars", 80, "80"),
("POLARS_VERBOSE", "set_verbose", True, "1"),
("POLARS_WARN_UNSTABLE", "warn_unstable", True, "1"),
("POLARS_TEMP_DIR", "set_temp_dir", "/other/tmp", "/other/tmp"),
],
)
def test_unset_config_env_vars(
Expand Down
Loading