Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TPC-H] Configure temporary directory in duckdb for out-of-core processing #1510

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 88 additions & 39 deletions tests/tpch/test_duckdb.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import uuid
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path

import botocore.session
import pytest

Expand All @@ -8,178 +13,222 @@
from . import duckdb_queries # noqa: E402


@contextmanager
def tempdir(base_path: str) -> Iterator[str]:
    """Create a uniquely named directory under ``base_path`` and remove it on exit.

    Parameters
    ----------
    base_path
        Existing directory in which the temporary directory is created.

    Yields
    ------
    str
        Full path of the newly created directory (``base_path`` joined with
        a random UUID), so callers can use it regardless of the current
        working directory.
    """
    import shutil

    path = Path(base_path) / str(uuid.uuid4())
    path.mkdir()
    try:
        # Yield the complete path; ``path.name`` would only be the UUID
        # component and would resolve relative to the working directory.
        yield str(path)
    finally:
        # Remove the directory together with any leftover files. Errors are
        # ignored so that cleanup never masks an exception raised by the body.
        shutil.rmtree(path, ignore_errors=True)


@pytest.fixture
def connection(local, restart):
    """Provide a factory for context managers that yield a DuckDB connection.

    The returned callable takes no arguments. Entering the context manager it
    returns creates a temporary directory, opens a DuckDB connection, points
    DuckDB's ``temp_directory`` setting at that directory and, when not
    running locally, loads ``httpfs`` and sets S3 credentials obtained from
    botocore. Leaving the context closes the connection and then cleans up
    the temporary directory.

    Parameters
    ----------
    local
        Truthy when running locally; selects a ``TemporaryDirectory`` instead
        of a directory under the Dask worker's ``local_directory`` and skips
        the S3 setup.
    restart
        Not used in the body; requested only so the fixture is set up.
        NOTE(review): presumably restarts the cluster between tests - confirm.
    """

    @contextmanager
    def _():
        if local:
            from tempfile import TemporaryDirectory

            tempdir_ctx = TemporaryDirectory()
        else:
            from distributed import get_worker

            tempdir_ctx = tempdir(get_worker().local_directory)

        # The connection is entered last and therefore closed first, before
        # the temporary directory is removed.
        with tempdir_ctx as tmp_dir, duckdb.connect() as con:
            # Double any single quote so the path stays a valid SQL literal.
            escaped_dir = str(tmp_dir).replace("'", "''")
            con.sql(f"SET temp_directory='{escaped_dir}';")

            if not local:
                # Setup s3 credentials
                session = botocore.session.Session()
                creds = session.get_credentials()
                con.install_extension("httpfs")
                con.load_extension("httpfs")
                con.sql(
                    f"""
                    SET s3_region='us-east-2';
                    SET s3_access_key_id='{creds.access_key}';
                    SET s3_secret_access_key='{creds.secret_key}';
                    SET s3_session_token='{creds.token}';
                    """
                )
            yield con

    return _


def test_query_1(run, connection, dataset_path, scale):
    """Run TPC-H query 1 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_1(con, dataset_path, scale)

    run(_)


def test_query_2(run, connection, dataset_path, scale):
    """Run TPC-H query 2 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_2(con, dataset_path, scale)

    run(_)


def test_query_3(run, connection, dataset_path, scale):
    """Run TPC-H query 3 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_3(con, dataset_path, scale)

    run(_)


def test_query_4(run, connection, dataset_path, scale):
    """Run TPC-H query 4 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_4(con, dataset_path, scale)

    run(_)


def test_query_5(run, connection, dataset_path, scale):
    """Run TPC-H query 5 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_5(con, dataset_path, scale)

    run(_)


def test_query_6(run, connection, dataset_path, scale):
    """Run TPC-H query 6 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_6(con, dataset_path, scale)

    run(_)


def test_query_7(run, connection, dataset_path, scale):
    """Run TPC-H query 7 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_7(con, dataset_path, scale)

    run(_)


def test_query_8(run, connection, dataset_path, scale):
    """Run TPC-H query 8 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_8(con, dataset_path, scale)

    run(_)


def test_query_9(run, connection, dataset_path, scale):
    """Run TPC-H query 9 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_9(con, dataset_path, scale)

    run(_)


def test_query_10(run, connection, dataset_path, scale):
    """Run TPC-H query 10 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_10(con, dataset_path, scale)

    run(_)


def test_query_11(run, connection, dataset_path, scale):
    """Run TPC-H query 11 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_11(con, dataset_path, scale)

    run(_)


def test_query_12(run, connection, dataset_path, scale):
    """Run TPC-H query 12 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_12(con, dataset_path, scale)

    run(_)


def test_query_13(run, connection, dataset_path, scale):
    """Run TPC-H query 13 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_13(con, dataset_path, scale)

    run(_)


def test_query_14(run, connection, dataset_path, scale):
    """Run TPC-H query 14 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_14(con, dataset_path, scale)

    run(_)


def test_query_15(run, connection, dataset_path, scale):
    """Run TPC-H query 15 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_15(con, dataset_path, scale)

    run(_)


def test_query_16(run, connection, dataset_path, scale):
    """Run TPC-H query 16 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_16(con, dataset_path, scale)

    run(_)


def test_query_17(run, connection, dataset_path, scale):
    """Run TPC-H query 17 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_17(con, dataset_path, scale)

    run(_)


def test_query_18(run, connection, dataset_path, scale):
    """Run TPC-H query 18 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_18(con, dataset_path, scale)

    run(_)


def test_query_19(run, connection, dataset_path, scale):
    """Run TPC-H query 19 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_19(con, dataset_path, scale)

    run(_)


def test_query_20(run, connection, dataset_path, scale):
    """Run TPC-H query 20 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_20(con, dataset_path, scale)

    run(_)


def test_query_21(run, connection, dataset_path, scale):
    """Run TPC-H query 21 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_21(con, dataset_path, scale)

    run(_)


def test_query_22(run, connection, dataset_path, scale):
    """Run TPC-H query 22 on DuckDB through the ``run`` fixture."""

    def _():
        # ``connection()`` returns a context manager; the query needs the
        # connection object it yields, and must run exactly once.
        with connection() as con:
            duckdb_queries.query_22(con, dataset_path, scale)

    run(_)
Loading