From 1d7c0337f92504c8616b337818b4df0630932bd2 Mon Sep 17 00:00:00 2001
From: Marius van Niekerk
Date: Mon, 2 Nov 2020 13:13:19 -0500
Subject: [PATCH] Add a simple test suite for doing downstream tests

---
 .github/workflows/downstream.yml        |  45 +++++++
 downstream_tests/test_dask_dataframe.py | 162 ++++++++++++++++++++++++
 2 files changed, 207 insertions(+)
 create mode 100644 .github/workflows/downstream.yml
 create mode 100644 downstream_tests/test_dask_dataframe.py

diff --git a/.github/workflows/downstream.yml b/.github/workflows/downstream.yml
new file mode 100644
index 00000000..0181367d
--- /dev/null
+++ b/.github/workflows/downstream.yml
@@ -0,0 +1,45 @@
name: CI

on:
  push:
    branches: master
  pull_request:
    branches: master

jobs:
  checks:
    name: Downstream tests (${{ matrix.python-version }}, ${{ matrix.os }}, pyarrow ${{ matrix.pyarrow-version }})
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: ["ubuntu-latest", "macos-latest", "windows-latest"]
        python-version: ["3.7", "3.8"]
        pyarrow-version: [">=1,<2", ">=2,<3"]
    defaults:
      run:
        # a login shell is required so that `conda activate` works in run steps
        shell: bash -l {0}

    steps:
      - uses: conda-incubator/setup-miniconda@v1
        with:
          auto-update-conda: true
          python-version: ${{ matrix.python-version }}
          mamba-version: "*"
          channels: conda-forge,defaults

      - name: Checkout
        uses: actions/checkout@v2

      - name: Install conda dependencies
        run: >
          mamba create -n test-downstreams
          -c conda-forge
          'python=${{ matrix.python-version }}'
          'pyarrow${{ matrix.pyarrow-version }}'
          dask s3fs pip botocore aiobotocore moto pytest fastparquet

      - name: Install portions from source
        run: |
          conda activate test-downstreams
          pip install git+https://github.com/intake/filesystem_spec/ --no-deps

      - name: Run test suite
        run: |
          conda activate test-downstreams
          pytest -vrsx downstream_tests

diff --git a/downstream_tests/test_dask_dataframe.py b/downstream_tests/test_dask_dataframe.py
new file mode 100644
index 00000000..42ca5609
--- /dev/null
+++ b/downstream_tests/test_dask_dataframe.py
@@ -0,0 +1,162 @@
import pytest
from pytest import fixture
import pandas as pd
import numpy as np
import time
import pyarrow as pa
import dask.dataframe as dd
import s3fs
import moto.server
import sys


@fixture(scope="session")
def partitioned_dataset() -> dict:
    rows = 500000
    cds_df = pd.DataFrame(
        {
            "id": range(rows),
            "part_key": np.random.choice(["A", "B", "C", "D"], rows),
            "timestamp": np.random.randint(1051638817, 1551638817, rows),
            "int_value": np.random.randint(0, 60000, rows),
        }
    )
    return {"dataframe": cds_df, "partitioning_column": "part_key"}


def free_port():
    import socketserver

    # bind to port 0 and let the OS hand back an unused port
    with socketserver.TCPServer(("localhost", 0), None) as s:
        return s.server_address[1]


@fixture(scope="session")
def moto_server():
    import subprocess

    port = free_port()
    process = subprocess.Popen([
        sys.executable,
        moto.server.__file__,
        '--port', str(port),
        '--host', 'localhost',
        's3'
    ])

    s3fs_kwargs = dict(
        client_kwargs={"endpoint_url": f'http://localhost:{port}'},
    )

    # poll until the moto server accepts S3 requests, giving up after 30s
    start = time.time()
    while True:
        try:
            fs = s3fs.S3FileSystem(skip_instance_cache=True, **s3fs_kwargs)
            fs.ls("/")
        except Exception:
            if time.time() - start > 30:
                raise TimeoutError("Could not get a working moto server in time")
            time.sleep(0.1)
        else:
            break

    yield s3fs_kwargs

    process.terminate()


@fixture(scope="session")
def moto_s3fs(moto_server):
    return s3fs.S3FileSystem(**moto_server)


@fixture(scope="session")
def s3_bucket(moto_server):
    test_bucket_name = 'test'
    from botocore.session import Session
    # NB: we use the sync botocore client for setup
    session = Session()
    client = session.create_client('s3', **moto_server['client_kwargs'])
    client.create_bucket(Bucket=test_bucket_name, ACL='public-read')
    return test_bucket_name


@fixture(scope="session")
def partitioned_parquet_path(partitioned_dataset, moto_s3fs, s3_bucket):
    cds_df = partitioned_dataset["dataframe"]
    table = pa.Table.from_pandas(cds_df, preserve_index=False)
    path = s3_bucket + "/partitioned/dataset"
    import pyarrow.parquet

    pyarrow.parquet.write_to_dataset(
        table,
        path,
        filesystem=moto_s3fs,
        partition_cols=[
            partitioned_dataset["partitioning_column"]
        ],  # new parameter included
    )

    # storage_options = dict(use_listings_cache=False)
    # storage_options.update(docker_aws_s3.s3fs_kwargs)
    #
    # import dask.dataframe
    #
    # ddf = dask.dataframe.read_parquet(
    #     f"s3://{path}", storage_options=storage_options, gather_statistics=False
    # )
    # all_rows = ddf.compute()
    # assert "name" in all_rows.columns
    return path


@pytest.fixture(scope='session', params=[
    pytest.param("pyarrow"),
    pytest.param("fastparquet"),
])
def parquet_engine(request):
    return request.param


@pytest.fixture(scope='session', params=[
    pytest.param(False, id='gather_statistics=F'),
    pytest.param(True, id='gather_statistics=T'),
])
def gather_statistics(request):
    return request.param


def test_partitioned_read(partitioned_dataset, partitioned_parquet_path, moto_server, parquet_engine, gather_statistics):
    """The directory based reading is quite finicky"""
    storage_options = moto_server.copy()
    ddf = dd.read_parquet(
        f"s3://{partitioned_parquet_path}",
        storage_options=storage_options,
        gather_statistics=gather_statistics,
        engine=parquet_engine,
    )

    assert 'part_key' in ddf.columns
    actual = ddf.compute().sort_values('id').reset_index(drop=True)
    expected = partitioned_dataset["dataframe"].reset_index(drop=True)

    # the partition column can round-trip as a categorical and the column order
    # can change, so normalise column order and dtypes before comparing
    pd.testing.assert_frame_equal(
        actual[expected.columns.tolist()].astype(expected.dtypes.to_dict()),
        expected,
    )


def test_non_partitioned_read(partitioned_dataset, partitioned_parquet_path, moto_server, parquet_engine, gather_statistics):
    """The directory based reading is quite finicky"""
    storage_options = moto_server.copy()
    ddf = dd.read_parquet(
        f"s3://{partitioned_parquet_path}/part_key=A",
        storage_options=storage_options,
        gather_statistics=gather_statistics,
        engine=parquet_engine,
    )

    if parquet_engine == 'pyarrow':
        assert 'part_key' in ddf.columns
    actual = ddf.compute().sort_values('id').reset_index(drop=True)
    expected = partitioned_dataset["dataframe"]
    expected = expected.loc[expected.part_key == "A"].reset_index(drop=True)

    # only partition "A" is read here and the partition column itself may not be
    # reconstructed by every engine, so compare just the columns actually loaded
    common = [c for c in expected.columns if c in actual.columns]
    pd.testing.assert_frame_equal(
        actual[common].astype(expected[common].dtypes.to_dict()),
        expected[common],
    )
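
For reference, the round trip the fixtures above wire together (launch a local moto S3 server, write a partitioned Parquet dataset through pyarrow, read it back with dask.dataframe) can be reproduced as a standalone script. This is only a sketch of that flow, not part of the patch: the bucket name "demo-bucket", the fixed port 5555, and the tiny eight-row frame are illustrative stand-ins for what the fixtures set up dynamically.

import subprocess
import sys
import time

import dask.dataframe as dd
import moto.server
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

port = 5555  # illustrative; the free_port() fixture asks the OS for an unused port instead
proc = subprocess.Popen(
    [sys.executable, moto.server.__file__, "--port", str(port), "--host", "localhost", "s3"]
)
try:
    storage_options = {"client_kwargs": {"endpoint_url": f"http://localhost:{port}"}}

    # poll until the server answers, as the moto_server fixture does
    deadline = time.time() + 30
    while True:
        try:
            fs = s3fs.S3FileSystem(skip_instance_cache=True, **storage_options)
            fs.ls("/")
            break
        except Exception:
            if time.time() > deadline:
                raise
            time.sleep(0.1)

    # creating a top-level "directory" on S3 creates a bucket
    fs.mkdir("demo-bucket")

    df = pd.DataFrame({"id": range(8), "part_key": list("AABBCCDD"), "int_value": range(8)})
    pq.write_to_dataset(
        pa.Table.from_pandas(df, preserve_index=False),
        "demo-bucket/dataset",
        filesystem=fs,
        partition_cols=["part_key"],
    )

    ddf = dd.read_parquet("s3://demo-bucket/dataset", storage_options=storage_options)
    print(ddf.compute().sort_values("id"))
finally:
    proc.terminate()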