Add a simple test suite for doing downstream tests
mariusvniekerk committed Nov 2, 2020
1 parent 327e3fc commit 1d7c033
Showing 2 changed files with 207 additions and 0 deletions.
.github/workflows/downstream.yml (45 additions, 0 deletions)
@@ -0,0 +1,45 @@
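# Downstream integration tests: dask.dataframe + pyarrow + s3fs against fsspec installed from source.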
name: CI

on:
  push:
    branches: master
  pull_request:
    branches: master

jobs:
  checks:
    name: Downstream tests (${{ matrix.python-version }}, ${{ matrix.os }})
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: ["ubuntu-latest", "macos-latest", "windows-latest"]
        python-version: ["3.7", "3.8"]
        pyarrow-version: [">=1,<2", ">=2,<3"]

    steps:
      - uses: conda-incubator/setup-miniconda@v1
        with:
          auto-update-conda: true
          python-version: ${{ matrix.python-version }}
          mamba-version: "*"

      - name: Checkout
        uses: actions/checkout@v2

      - name: Install conda dependencies
        run: >
          mamba create -n test-downstreams
          -c conda-forge
          'python=${{ matrix.python-version }}'
          'pyarrow=${{ matrix.pyarrow-version }}'
          pip botocore aiobotocore moto pytest fastparquet
      - name: Install portions from source
        run: |
          conda activate test-downstreams
          pip install git+https://github.com/intake/filesystem_spec/ --no-deps
      - name: Run test suite
        run: |
          conda activate test-downstreams
          pytest -vrsx downstream_tests
downstream_tests/test_dask_dataframe.py (162 additions, 0 deletions)
@@ -0,0 +1,162 @@
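# Downstream test: dask.dataframe parquet reads through s3fs against a moto-backed mock S3.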
import pytest
from pytest import fixture
import pandas as pd
import numpy as np
import time
import pyarrow as pa
import dask.dataframe as dd
import s3fs
import moto.server
import sys


@fixture(scope="session")
def partitioned_dataset() -> dict:
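    """Synthetic dataframe plus the name of the column used for partitioning."""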
    rows = 500000
    cds_df = pd.DataFrame(
        {
            "id": range(rows),
            "part_key": np.random.choice(["A", "B", "C", "D"], rows),
            "timestamp": np.random.randint(1051638817, 1551638817, rows),
            "int_value": np.random.randint(0, 60000, rows),
        }
    )
    return {"dataframe": cds_df, "partitioning_column": "part_key"}


def free_port():
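    """Ask the OS for a free localhost port by binding to port 0."""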
    import socketserver

    with socketserver.TCPServer(("localhost", 0), None) as s:
        free_port = s.server_address[1]
    return free_port


@fixture(scope="session")
def moto_server():
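    """Run a moto S3 server in a subprocess and yield s3fs kwargs pointing at it."""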
    import subprocess

    port = free_port()
    process = subprocess.Popen([
        sys.executable,
        moto.server.__file__,
        '--port', str(port),
        '--host', 'localhost',
        's3'
    ])

    s3fs_kwargs = dict(
        client_kwargs={"endpoint_url": f'http://localhost:{port}'},
    )

    start = time.time()
    while True:
        try:
            fs = s3fs.S3FileSystem(skip_instance_cache=True, **s3fs_kwargs)
            fs.ls("/")
        except Exception:
            if time.time() - start > 30:
                raise TimeoutError("Could not get a working moto server in time")
            time.sleep(0.1)
            # keep polling until the server responds
            continue

        break

    yield s3fs_kwargs

    process.terminate()


@fixture(scope="session")
def moto_s3fs(moto_server):
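    """An S3FileSystem instance wired to the moto server."""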
    return s3fs.S3FileSystem(**moto_server)


@fixture(scope="session")
def s3_bucket(moto_server):
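    """Create the test bucket on the moto server and return its name."""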
    test_bucket_name = 'test'
    from botocore.session import Session
    # NB: we use the sync botocore client for setup
    session = Session()
    client = session.create_client('s3', **moto_server['client_kwargs'])
    client.create_bucket(Bucket=test_bucket_name, ACL='public-read')
    return test_bucket_name


@fixture(scope="session")
def partitioned_parquet_path(partitioned_dataset, moto_s3fs, s3_bucket):
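    """Write the partitioned dataset to the mock bucket with pyarrow and return its path."""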
    cds_df = partitioned_dataset["dataframe"]
    table = pa.Table.from_pandas(cds_df, preserve_index=False)
    path = s3_bucket + "/partitioned/dataset"
    import pyarrow.parquet

    pyarrow.parquet.write_to_dataset(
        table,
        path,
        filesystem=moto_s3fs,
        partition_cols=[
            partitioned_dataset["partitioning_column"]
        ],  # new parameter included
    )

    # storage_options = dict(use_listings_cache=False)
    # storage_options.update(docker_aws_s3.s3fs_kwargs)
    #
    # import dask.dataframe
    #
    # ddf = dask.dataframe.read_parquet(
    #     f"s3://{path}", storage_options=storage_options, gather_statistics=False
    # )
    # all_rows = ddf.compute()
    # assert "name" in all_rows.columns
    return path


@pytest.fixture(scope='session', params=[
    pytest.param("pyarrow"),
    pytest.param("fastparquet"),
])
def parquet_engine(request):
    return request.param


@pytest.fixture(scope='session', params=[
    pytest.param(False, id='gather_statistics=F'),
    pytest.param(True, id='gather_statistics=T'),
])
def gather_statistics(request):
    return request.param


def test_partitioned_read(partitioned_dataset, partitioned_parquet_path, moto_server, parquet_engine, gather_statistics):
"""The directory based reading is quite finicky"""
storage_options = moto_server.copy()
ddf = dd.read_parquet(
f"s3://{partitioned_parquet_path}",
storage_options=storage_options,
gather_statistics=gather_statistics,
engine=parquet_engine
)

assert 'part_key' in ddf.columns
actual = ddf.compute().sort_values('id')

assert actual == partitioned_dataset["dataframe"]


def test_non_partitioned_read(partitioned_dataset, partitioned_parquet_path, moto_server, parquet_engine, gather_statistics):
"""The directory based reading is quite finicky"""
storage_options = moto_server.copy()
ddf = dd.read_parquet(
f"s3://{partitioned_parquet_path}/part_key=A",
storage_options=storage_options,
gather_statistics=gather_statistics,
engine=parquet_engine
)

if parquet_engine == 'pyarrow':
assert 'part_key' in ddf.columns
actual = ddf.compute().sort_values('id')
expected = partitioned_dataset["dataframe"]
expected = expected.loc[expected.part_key == "A"]

assert actual == expected
