From 37169f32eee476a58a67d49a91b1164fc4808f61 Mon Sep 17 00:00:00 2001 From: Solomon Negusse Date: Thu, 25 May 2023 15:50:50 -0500 Subject: [PATCH] Revert "Staging -> Production (VIIRS metadata and unique constraint creation options PRs)" --- app/crud/assets.py | 34 + app/crud/metadata.py | 4 +- app/models/enum/creation_options.py | 5 - app/models/orm/queries/raster_assets.py | 23 - app/models/pydantic/creation_options.py | 40 +- app/routes/datasets/queries.py | 21 +- app/tasks/table_source_assets.py | 20 +- batch/scripts/append_data.sh | 21 + batch/scripts/create_tabular_schema.sh | 67 +- batch/scripts/get_arguments.sh | 5 - batch/scripts/load_tabular_data.sh | 26 +- tests/conftest.py | 6 +- tests/tasks/__init__.py | 54 +- tests/tasks/test_table_source_assets.py | 790 ++++++----------------- tests/tasks/test_vector_source_assets.py | 46 +- tests/utils.py | 56 +- tests_v2/conftest.py | 3 +- 17 files changed, 362 insertions(+), 859 deletions(-) delete mode 100644 app/models/orm/queries/raster_assets.py create mode 100644 batch/scripts/append_data.sh diff --git a/app/crud/assets.py b/app/crud/assets.py index a3b56d0d6..2580998b3 100644 --- a/app/crud/assets.py +++ b/app/crud/assets.py @@ -37,6 +37,40 @@ async def get_assets(dataset: str, version: str) -> List[ORMAsset]: return rows +@alru_cache(maxsize=128) +async def get_raster_tile_sets(): + latest_tile_sets = await ( + ORMAsset.join(ORMVersion) + .select() + .with_only_columns( + [ + ORMAsset.asset_id, + ORMAsset.dataset, + ORMAsset.version, + ORMAsset.creation_options, + ORMAsset.asset_uri, + ] + ) + .where( + and_( + ORMAsset.asset_type == AssetType.raster_tile_set, + ORMVersion.is_latest == True, # noqa: E712 + ) + ) + ).gino.all() + + assets_with_metadata = [] + for asset in latest_tile_sets: + asset_dict = dict(asset.items()) + try: + metadata = await get_asset_metadata(asset.asset_id) + asset_dict["metadata"] = metadata + except RecordNotFoundError: + pass + assets_with_metadata.append(asset_dict) + + return assets_with_metadata + async def get_assets_by_type(asset_type: str) -> List[ORMAsset]: assets = await ORMAsset.query.where(ORMAsset.asset_type == asset_type).gino.all() diff --git a/app/crud/metadata.py b/app/crud/metadata.py index d05976744..15d1b791a 100644 --- a/app/crud/metadata.py +++ b/app/crud/metadata.py @@ -80,8 +80,8 @@ async def create_version_metadata(dataset: str, version: str, **data): content_date_range = data.pop("content_date_range", None) if content_date_range: - data["content_start_date"] = content_date_range.get("start_date") - data["content_end_date"] = content_date_range.get("end_date") + data["content_start_date"] = content_date_range["start_date"] + data["content_end_date"] = content_date_range["end_date"] try: new_metadata: ORMVersionMetadata = await ORMVersionMetadata.create( diff --git a/app/models/enum/creation_options.py b/app/models/enum/creation_options.py index c5ff44578..aa97d2db9 100644 --- a/app/models/enum/creation_options.py +++ b/app/models/enum/creation_options.py @@ -38,11 +38,6 @@ class IndexType(str, Enum): hash = "hash" -class ConstraintType(str, Enum): - __doc__ = "Constraint type" - unique = "unique" - - class TileStrategy(str, Enum): __doc__ = ( "Tile strategy for generating vector tiles. " diff --git a/app/models/orm/queries/raster_assets.py b/app/models/orm/queries/raster_assets.py deleted file mode 100644 index 0a199024a..000000000 --- a/app/models/orm/queries/raster_assets.py +++ /dev/null @@ -1,23 +0,0 @@ -from ....application import db - -_raster_assets_sql = """ -SELECT - assets.asset_id, - assets.dataset, - assets.version, - creation_options, - asset_uri, - rb.values_table -FROM - assets - LEFT JOIN asset_metadata am - ON am.asset_id = assets.asset_id - JOIN versions - ON versions.version = assets.version - LEFT JOIN raster_band_metadata rb - ON rb.asset_metadata_id = am.id - WHERE versions.is_latest = true - AND assets.asset_type = 'Raster tile set' -""" - -latest_raster_tile_sets = db.text(_raster_assets_sql) diff --git a/app/models/pydantic/creation_options.py b/app/models/pydantic/creation_options.py index 44fb25659..0411f8d70 100644 --- a/app/models/pydantic/creation_options.py +++ b/app/models/pydantic/creation_options.py @@ -8,7 +8,6 @@ from ...settings.globals import DEFAULT_JOB_DURATION, PIXETL_DEFAULT_RESAMPLING from ..enum.assets import AssetType, is_default_asset from ..enum.creation_options import ( - ConstraintType, Delimiters, IndexType, PartitionType, @@ -47,27 +46,9 @@ class Index(StrictBaseModel): index_type: IndexType column_names: List[str] = Field( - ..., - description="Columns to be used by index", - regex=COLUMN_REGEX, - min_items=1, - max_items=32, # A PostgreSQL upper limit - ) - - -class Constraint(StrictBaseModel): - constraint_type: ConstraintType - column_names: List[str] = Field( - ..., - description="Columns included in the constraint", - regex=COLUMN_REGEX, - min_items=1, - max_items=32, # A PostgreSQL upper limit + ..., description="Columns to be used by index", regex=COLUMN_REGEX ) - class Config: - orm_mode = True - class HashPartitionSchema(StrictBaseModel): partition_count: PositiveInt @@ -235,6 +216,7 @@ def validate_source_uri(cls, v, values, **kwargs): class TableAssetCreationOptions(StrictBaseModel): has_header: bool = Field(True, description="Input file has header. Must be true") delimiter: Delimiters = Field(..., description="Delimiter used in input file") + latitude: Optional[str] = Field( None, description="Column with latitude coordinate", regex=COLUMN_REGEX ) @@ -248,9 +230,6 @@ class TableAssetCreationOptions(StrictBaseModel): None, description="Partitioning schema (optional)" ) indices: List[Index] = Field([], description="List of indices to add to table") - constraints: Optional[List[Constraint]] = Field( - None, description="List of constraints to add to table. (optional)" - ) table_schema: Optional[List[FieldType]] = Field( None, description="List of Field Types. Missing field types will be inferred. (optional)", @@ -263,17 +242,6 @@ class TableAssetCreationOptions(StrictBaseModel): ) timeout: int = DEFAULT_JOB_DURATION - @validator("constraints") - def validate_max_1_unique_constraints(cls, v, values, **kwargs): - if v is not None: - unique_constraints = [ - c for c in v if c.constraint_type == ConstraintType.unique - ] - assert ( - len(unique_constraints) < 2 - ), "Currently cannot specify more than 1 unique constraint" - return v - class TableSourceCreationOptions(TableAssetCreationOptions): source_type: TableSourceType = Field(..., description="Source type of input file.") @@ -381,18 +349,18 @@ class StaticVectorFileCreationOptions(StrictBaseModel): SourceCreationOptions = Union[ - TableSourceCreationOptions, RasterTileSetSourceCreationOptions, + TableSourceCreationOptions, VectorSourceCreationOptions, ] OtherCreationOptions = Union[ - TableAssetCreationOptions, RasterTileCacheCreationOptions, StaticVectorTileCacheCreationOptions, StaticVectorFileCreationOptions, DynamicVectorTileCacheCreationOptions, RasterTileSetAssetCreationOptions, + TableAssetCreationOptions, ] CreationOptions = Union[SourceCreationOptions, OtherCreationOptions] diff --git a/app/routes/datasets/queries.py b/app/routes/datasets/queries.py index cbc39d000..501055872 100644 --- a/app/routes/datasets/queries.py +++ b/app/routes/datasets/queries.py @@ -27,7 +27,6 @@ # from ...authentication.api_keys import get_api_key from ...crud import assets -from ...models.orm.queries.raster_assets import latest_raster_tile_sets from ...models.enum.assets import AssetType from ...models.enum.creation_options import Delimiters from ...models.enum.geostore import GeostoreOrigin @@ -647,11 +646,11 @@ def _get_default_layer(dataset, pixel_meaning): async def _get_data_environment(grid: Grid) -> DataEnvironment: # get all Raster tile set assets - latest_tile_sets = await db.all(latest_raster_tile_sets) + latest_tile_sets = await assets.get_raster_tile_sets() # create layers layers: List[Layer] = [] for row in latest_tile_sets: - creation_options = row.creation_options + creation_options = row["creation_options"] if creation_options["grid"] != grid: # skip if not on the right grid continue @@ -665,9 +664,13 @@ async def _get_data_environment(grid: Grid) -> DataEnvironment: continue if creation_options["pixel_meaning"] == "is": - source_layer_name = f"{creation_options['pixel_meaning']}__{row.dataset}" + source_layer_name = ( + f"{creation_options['pixel_meaning']}__{row['dataset']}" + ) else: - source_layer_name = f"{row.dataset}__{creation_options['pixel_meaning']}" + source_layer_name = ( + f"{row['dataset']}__{creation_options['pixel_meaning']}" + ) no_data_val = parse_obj_as( Optional[Union[List[NoDataType], NoDataType]], @@ -676,10 +679,14 @@ async def _get_data_environment(grid: Grid) -> DataEnvironment: if isinstance(no_data_val, List): no_data_val = no_data_val[0] - raster_table = getattr(row, "values_table", None) + bands = getattr(row.get("metadata"), "bands", []) + raster_table = None + if bands: + raster_table = RasterTable(**bands[0].to_dict()["values_table"]) + print("TABLE", raster_table) layers.append( _get_source_layer( - row.asset_uri, + row['asset_uri'], source_layer_name, grid, no_data_val, diff --git a/app/tasks/table_source_assets.py b/app/tasks/table_source_assets.py index 030a8fb18..4bd80b0f6 100644 --- a/app/tasks/table_source_assets.py +++ b/app/tasks/table_source_assets.py @@ -3,7 +3,6 @@ from typing import Any, Dict, List, Optional from uuid import UUID -from ..models.enum.creation_options import ConstraintType from ..models.pydantic.change_log import ChangeLog from ..models.pydantic.creation_options import ( Index, @@ -23,6 +22,7 @@ async def table_source_asset( asset_id: UUID, input_data: Dict[str, Any], ) -> ChangeLog: + creation_options = TableSourceCreationOptions(**input_data["creation_options"]) if creation_options.source_uri: source_uris: List[str] = creation_options.source_uri @@ -40,14 +40,9 @@ async def table_source_asset( version, "-s", source_uris[0], + "-m", + json.dumps(creation_options.dict(by_alias=True)["table_schema"]), ] - if creation_options.table_schema: - command.extend( - [ - "-m", - json.dumps(creation_options.dict(by_alias=True)["table_schema"]), - ] - ) if creation_options.partitions: command.extend( [ @@ -57,13 +52,6 @@ async def table_source_asset( creation_options.partitions.partition_column, ] ) - if creation_options.constraints: - unique_constraint_columns = [] - for constraint in creation_options.constraints: - if constraint.constraint_type == ConstraintType.unique: - unique_constraint_columns += constraint.column_names - - command.extend(["-u", ",".join(unique_constraint_columns)]) job_env: List[Dict[str, Any]] = writer_secrets + [ {"name": "ASSET_ID", "value": str(asset_id)} @@ -407,7 +395,7 @@ def _create_cluster_jobs( if partitions: # When using partitions we need to cluster each partition table separately. - # Play it safe and cluster partition tables one after the other. + # Playing it save and cluster partition tables one after the other. # TODO: Still need to test if we can cluster tables which are part of the same partition concurrently. # this would speed up this step by a lot. Partitions require a full lock on the table, # but I don't know if the lock is acquired for the entire partition or only the partition table. diff --git a/batch/scripts/append_data.sh b/batch/scripts/append_data.sh new file mode 100644 index 000000000..3887e3799 --- /dev/null +++ b/batch/scripts/append_data.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +set -e + +# requires arguments +# -d | --dataset +# -v | --version +# -s | --source +# -D | --delimiter +ME=$(basename "$0") +. get_arguments.sh "$@" + + +# Unescape TAB character +if [ "$DELIMITER" == "\t" ]; then + DELIMITER=$(echo -e "\t") +fi + +for uri in "${SRC[@]}"; do + aws s3 cp "${uri}" - | psql -c "COPY \"$DATASET\".\"$VERSION\" FROM STDIN WITH (FORMAT CSV, DELIMITER '$DELIMITER', HEADER)" +done diff --git a/batch/scripts/create_tabular_schema.sh b/batch/scripts/create_tabular_schema.sh index fece2f52f..005057d3a 100755 --- a/batch/scripts/create_tabular_schema.sh +++ b/batch/scripts/create_tabular_schema.sh @@ -6,54 +6,41 @@ set -e # -d | --dataset # -v | --version # -s | --source - -# optional arguments # -p | --partition_type # -c | --column_name # -m | --field_map -# -u | --unique_constraint - ME=$(basename "$0") . get_arguments.sh "$@" - -if [[ -n "${UNIQUE_CONSTRAINT_COLUMN_NAMES}" ]]; then - UNIQUE_CONSTRAINT="--unique-constraint ${UNIQUE_CONSTRAINT_COLUMN_NAMES}" -fi - -# Fetch first 100 lines from CSV, analyze and create table create statement -# The `head` command will cause a broken pipe error for `aws s3 cp`: This is -# expected and can be ignored, so temporarily use set +e here. HOWEVER we -# still want to know if csvsql had a problem, so check its exit status +# The `head` command will cause a broken pipe error for `aws s3 cp`, this is expected and can be ignored +# We hence we have to temporarily use set +e here set +e -aws s3 cp "${SRC}" - | head -100 | csvsql -i postgresql --no-constraints $UNIQUE_CONSTRAINT --tables "$VERSION" -q \" > create_table.sql -CSVSQL_EXIT_CODE="${PIPESTATUS[2]}" # Grab the exit code of the csvsql command -set -e +# Fetch first 100 rows from input table, analyse and create table create statement +aws s3 cp "${SRC}" - | head -100 | csvsql -i postgresql --no-constraints --tables "$VERSION" -q \" > create_table.sql -if [ $CSVSQL_EXIT_CODE -ne 0 ]; then - echo "csvsql exited with code ${CSVSQL_EXIT_CODE}" - exit $CSVSQL_EXIT_CODE -fi +set -e # csvsql sets the quotes for schema and table wrong. It is safer to set the schema separately sed -i "1s/^/SET SCHEMA '$DATASET';\n/" create_table.sql -# If the user has specified a field map, override csvsql's inferred field -# types with those specified. It is expected that $FIELD_MAP contains (after -# sourcing get_arguments.sh above) a JSON List of the form -# '[{"name":"name1", "data_type":"type1"},{"name":"name2", "data_type":"type2"}]' +# update field types +# This expects a JSON List like this '[{"name":"name1", "data_type":"type1"},{"name":"name2", "data_type":"type2"}]' +# It will export the different key value pairs as ENV variables so that we can reference them within the for loop +# We will then update rows within our create_table.sql file using the new field type # https://starkandwayne.com/blog/bash-for-loop-over-json-array-using-jq/ if [[ -n "${FIELD_MAP}" ]]; then for row in $(echo "${FIELD_MAP}" | jq -r '.[] | @base64'); do - _jq() { - echo "${row}" | base64 --decode | jq -r "${1}" - } - FIELD_NAME=$(_jq '.name') - FIELD_TYPE=$(_jq '.data_type') - - # Override the field types, whether naked or in double quotes - sed -i "s/^\t${FIELD_NAME} .*$/\t${FIELD_NAME} ${FIELD_TYPE},/" create_table.sql - sed -i "s/^\t\"${FIELD_NAME}\" .*$/\t\"${FIELD_NAME}\" ${FIELD_TYPE},/" create_table.sql + _jq() { + echo "${row}" | base64 --decode | jq -r "${1}" + } + FIELD_NAME=$(_jq '.name') + FIELD_TYPE=$(_jq '.data_type') + + # field names might be in double quotes + # make sure there is no comma after the last field + sed -i "s/^\t${FIELD_NAME} .*$/\t${FIELD_NAME} ${FIELD_TYPE},/" create_table.sql + sed -i "s/^\t\"${FIELD_NAME}\" .*$/\t\"${FIELD_NAME}\" ${FIELD_TYPE},/" create_table.sql + sed -i 'x; ${s/,//;p;x}; 1d' create_table.sql done fi @@ -63,16 +50,6 @@ if [[ -n "${PARTITION_TYPE}" ]]; then sed -i "s/);$/) PARTITION BY $PARTITION_TYPE ($COLUMN_NAME);/g" create_table.sql fi -# After enforcing the field map above, the last field will now have a trailing -# comma before a parenthesis . Remove it. -# This is difficult to do with sed because the comma and parenthesis are on -# different lines, and sed likes to work with a line at a time. So first turn -# all tabs into spaces, turn all newlines into spaces, and then squish all -# consecutive spaces into one. Finally remove the comma(s). -cat create_table.sql |tr "\t" " " | tr "\n" " "| tr -s " " | sed "s/, )/)/" > create_table_squeezed.sql - -echo "Resulting create_table_squeezed.sql:" -cat create_table_squeezed.sql -echo "end of create_table_squeezed.sql" +cat create_table.sql -psql -v ON_ERROR_STOP=1 -f create_table_squeezed.sql \ No newline at end of file +psql -f create_table.sql diff --git a/batch/scripts/get_arguments.sh b/batch/scripts/get_arguments.sh index 297e5eab5..64b541aec 100755 --- a/batch/scripts/get_arguments.sh +++ b/batch/scripts/get_arguments.sh @@ -188,11 +188,6 @@ do shift # past argument shift # past value ;; - -u|--unique_constraint) - UNIQUE_CONSTRAINT_COLUMN_NAMES="$2" - shift # past argument - shift # past value - ;; -v|--version) VERSION="$2" shift # past argument diff --git a/batch/scripts/load_tabular_data.sh b/batch/scripts/load_tabular_data.sh index f63ecce03..495f1f66d 100755 --- a/batch/scripts/load_tabular_data.sh +++ b/batch/scripts/load_tabular_data.sh @@ -7,7 +7,6 @@ set -e # -v | --version # -s | --source # -D | --delimiter - ME=$(basename "$0") . get_arguments.sh "$@" @@ -17,24 +16,13 @@ if [ "$DELIMITER" == "\t" ]; then DELIMITER=$(echo -e "\t") fi -# I think Postgres temporary tables are such that concurrent jobs won't -# interfere with each other, but make the temp table name unique just -# in case. -UUID=$(python -c 'import uuid; print(uuid.uuid4(), end="")' | sed s/-//g) -TEMP_TABLE="temp_${UUID}" - -# https://stackoverflow.com/questions/48019381/how-postgresql-copy-to-stdin-with-csv-do-on-conflic-do-update for uri in "${SRC[@]}"; do - aws s3 cp "${uri}" - | psql -c "BEGIN; - CREATE TEMP TABLE \"$TEMP_TABLE\" - (LIKE \"$DATASET\".\"$VERSION\" INCLUDING DEFAULTS) - ON COMMIT DROP; - - COPY \"$TEMP_TABLE\" FROM STDIN WITH (FORMAT CSV, DELIMITER '$DELIMITER', HEADER); + set +e + FIELDS=$(aws s3 cp "${uri}" - | head -1) + set -e - INSERT INTO \"$DATASET\".\"$VERSION\" - SELECT * FROM \"$TEMP_TABLE\" - ON CONFLICT DO NOTHING; + FIELDS=$(sed -e $'s/\\t/,/g' -e 's/^/"/;s/$/"/' -e 's/,/","/g' -e 's/\r$//' <<< "$FIELDS") + FIELDS="($FIELDS)" - COMMIT;" -done \ No newline at end of file + aws s3 cp "${uri}" - | sed -e 's/\r$//' | psql -c "COPY \"$DATASET\".\"$VERSION\" $FIELDS FROM STDIN WITH (FORMAT CSV, DELIMITER '$DELIMITER', HEADER)" +done diff --git a/tests/conftest.py b/tests/conftest.py index cfb7326de..1c9c9fdab 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,7 +8,6 @@ import httpx import numpy import pytest -import pytest_asyncio import rasterio from alembic.config import main from docker.models.containers import ContainerCollection @@ -241,7 +240,8 @@ def client(): main(["--raiseerr", "downgrade", "base"]) -@pytest_asyncio.fixture(autouse=True) +@pytest.fixture(autouse=True) +@pytest.mark.asyncio async def async_client(): """Async Test Client.""" from app.main import app @@ -319,7 +319,7 @@ def copy_fixtures(): out.close() -@pytest_asyncio.fixture(autouse=True) +@pytest.fixture(autouse=True) async def tmp_folder(): """Create TMP dir.""" diff --git a/tests/tasks/__init__.py b/tests/tasks/__init__.py index 230e7775f..d27dfaf03 100644 --- a/tests/tasks/__init__.py +++ b/tests/tasks/__init__.py @@ -112,50 +112,32 @@ async def check_version_status(dataset, version, log_count): row = await versions.get_version(dataset, version) assert row.status == "saved" - try: - assert len(row.change_log) == log_count - except AssertionError: - print(f"Expected {log_count} changelog rows, observed {len(row.change_log)}") - for cl in row.change_log: - print(cl) - raise + assert len(row.change_log) == log_count assert row.change_log[0]["message"] == "Successfully scheduled batch jobs" async def check_asset_status(dataset, version, nb_assets): rows = await assets.get_assets(dataset, version) - try: - assert len(rows) == nb_assets - except AssertionError: - print(f"Expected {nb_assets} asset rows, but found {len(rows)}") - for row in rows: - print(row) - raise + assert len(rows) == 2 + # in this test we don't set the final asset status to saved or failed assert rows[0].status == "saved" assert rows[0].is_default is True + # in this test we only see the logs from background task, not from batch jobs + assert len(rows[0].change_log) == nb_assets * 2 + async def check_task_status(asset_id: UUID, nb_jobs: int, last_job_name: str): rows = await tasks.get_tasks(asset_id) - try: - assert len(rows) == nb_jobs - except AssertionError: - print(f"Expected {nb_jobs} jobs but found {len(rows)}:") - for row in rows: - print(f"{row.change_log[0]['message']}") - raise + assert len(rows) == nb_jobs for row in rows: # in this test we don't set the final asset status to saved or failed assert row.status == "pending" # in this test we only see the logs from background task, not from batch jobs - try: - assert rows[-1].change_log[0]["message"] == f"Scheduled job {last_job_name}" - except AssertionError: - print(f"Last task's changelog: {rows[-1].change_log[0]['message']}") - raise + assert rows[-1].change_log[0]["message"] == f"Scheduled job {last_job_name}" async def check_dynamic_vector_tile_cache_status(dataset, version): @@ -173,19 +155,7 @@ async def check_dynamic_vector_tile_cache_status(dataset, version): assert geo_database.asset_type == AssetType.geo_database_table assert dynamic_vector_tile.asset_type == AssetType.dynamic_vector_tile_cache assert dynamic_vector_tile.status == AssetStatus.saved - assert ( - geo_database.metadata.fields[0].name - == dynamic_vector_tile.metadata.fields[0].name - ) - assert ( - geo_database.metadata.fields[0].data_type - == dynamic_vector_tile.metadata.fields[0].data_type - ) - assert ( - geo_database.metadata.fields[1].name - == dynamic_vector_tile.metadata.fields[1].name - ) - assert ( - geo_database.metadata.fields[1].data_type - == dynamic_vector_tile.metadata.fields[1].data_type - ) + assert geo_database.metadata.fields[0].name == dynamic_vector_tile.metadata.fields[0].name + assert geo_database.metadata.fields[0].data_type == dynamic_vector_tile.metadata.fields[0].data_type + assert geo_database.metadata.fields[1].name == dynamic_vector_tile.metadata.fields[1].name + assert geo_database.metadata.fields[1].data_type == dynamic_vector_tile.metadata.fields[1].data_type diff --git a/tests/tasks/test_table_source_assets.py b/tests/tasks/test_table_source_assets.py index 598865121..d92c82b25 100644 --- a/tests/tasks/test_table_source_assets.py +++ b/tests/tasks/test_table_source_assets.py @@ -1,249 +1,26 @@ -import copy import json -import string -from typing import Dict import httpx import pendulum import pytest -from httpx import AsyncClient from pendulum.parsing.exceptions import ParserError from app.application import ContextEngine, db +from app.utils.aws import get_s3_client -from .. import APPEND_TSV_NAME, BUCKET, PORT, TSV_NAME -from ..utils import ( - create_dataset, - create_default_asset, - get_cluster_count, - get_index_count, - get_partition_count, - get_row_count, - poll_jobs, -) +from .. import APPEND_TSV_NAME, BUCKET, PORT, TSV_NAME, TSV_PATH +from ..utils import create_default_asset, poll_jobs, version_metadata from . import check_asset_status, check_task_status, check_version_status -basic_table_input_data: Dict = { - "creation_options": { - "source_type": "table", - "source_uri": [f"s3://{BUCKET}/{TSV_NAME}"], - "source_driver": "text", - "delimiter": "\t", - "has_header": True, - "timeout": 600, - "latitude": None, - "longitude": None, - "create_dynamic_vector_tile_cache": False, - "partitions": None, - "cluster": None, - "indices": [], - "constraints": None, - "table_schema": [ - { - "name": "rspo_oil_palm__certification_status", - "data_type": "text", - }, - {"name": "per_forest_concession__type", "data_type": "text"}, - {"name": "idn_forest_area__type", "data_type": "text"}, - {"name": "alert__count", "data_type": "integer"}, - {"name": "adm1", "data_type": "integer"}, - {"name": "adm2", "data_type": "integer"}, - ], - } -} - @pytest.mark.asyncio -async def test_prove_correct_schema(): - from app.models.pydantic.creation_options import TableSourceCreationOptions - - input_data: Dict = copy.deepcopy(basic_table_input_data["creation_options"]) - input_data["cluster"] = { - "index_type": "btree", - "column_names": ["iso", "adm1", "adm2", "alert__date"], - } - bar = TableSourceCreationOptions(**input_data) - assert bar.cluster is not None - - -@pytest.mark.asyncio -async def test_table_source_asset_minimal(batch_client, async_client: AsyncClient): +async def test_table_source_asset(batch_client, async_client): _, logs = batch_client ############################ # Setup test ############################ - dataset = "table_test" - version = "v202002.1" - input_data: Dict = copy.deepcopy(basic_table_input_data) - - ##################### - # Test asset creation - ##################### - asset = await create_default_asset( - dataset, - version, - version_payload=input_data, - execute_batch_jobs=True, - logs=logs, - async_client=async_client, - ) - asset_id = asset["asset_id"] - - ################# - # Check results - ################# - await check_version_status(dataset, version, 2) - await check_asset_status(dataset, version, 1) # There should be 1 asset - - # There should be the following tasks: - # 1 to create the table schema - # 0 to partition because we didn't specify it - # 1 to load the data - # 0 to add point geometry because we didn't specify it - # 0 to add indices because we didn't specify them - # 0 to add clustering because we didn't specify it - await check_task_status(asset_id, 2, "load_data_0") - - # There should be a table called "table_test"."v202002.1" with 99 rows. - # It should have the right amount of partitions and indices - async with ContextEngine("READ"): - row_count = await get_row_count(db, dataset, version) - partition_count = await get_partition_count(db, dataset, version) - index_count = await get_index_count(db, dataset, version) - cluster_count = await get_cluster_count(db) - - assert row_count == 99 - assert partition_count == 0 - assert index_count == 0 - assert cluster_count == 0 - -@pytest.mark.asyncio -async def test_table_source_asset_indices(batch_client, async_client: AsyncClient): - _, logs = batch_client - - ############################ - # Setup test - ############################ - dataset = "table_test" - version = "v202002.1" - input_data: Dict = copy.deepcopy(basic_table_input_data) - input_data["creation_options"]["indices"] = [ - {"index_type": "btree", "column_names": ["iso"]}, - {"index_type": "hash", "column_names": ["rspo_oil_palm__certification_status"]}, - {"index_type": "btree", "column_names": ["iso", "adm1", "adm2", "alert__date"]}, - ] - - ##################### - # Test asset creation - ##################### - asset = await create_default_asset( - dataset, - version, - version_payload=input_data, - execute_batch_jobs=True, - logs=logs, - async_client=async_client, - ) - asset_id = asset["asset_id"] - - ################# - # Check results - ################# - await check_version_status(dataset, version, 2) - await check_asset_status(dataset, version, 1) # There should be 1 asset - - # There should be the following tasks: - # 1 to create the table schema - # 0 to partition because we didn't specify it - # 1 to load the data - # 0 to add point geometry because we didn't specify it - # 3 to add indices - # 0 to add clustering because we didn't specify it - await check_task_status(asset_id, 5, "create_index_iso_adm1_adm2_alert__date_btree") - - # There should be a table called "table_test"."v202002.1" with 99 rows. - # It should have the right amount of partitions and indices - async with ContextEngine("READ"): - row_count = await get_row_count(db, dataset, version) - partition_count = await get_partition_count(db, dataset, version) - index_count = await get_index_count(db, dataset, version) - cluster_count = await get_cluster_count(db) - - assert row_count == 99 - assert partition_count == 0 - # postgres12 also adds indices to the main table, hence there are more indices than partitions - assert index_count == (partition_count + 1) * len( - input_data["creation_options"]["indices"] - ) - assert cluster_count == 0 - - -@pytest.mark.asyncio -async def test_table_source_asset_lat_long(batch_client, async_client: AsyncClient): - _, logs = batch_client - - ############################ - # Setup test - ############################ - dataset = "table_test" - version = "v202002.1" - input_data: Dict = copy.deepcopy(basic_table_input_data) - input_data["creation_options"]["latitude"] = "latitude" - input_data["creation_options"]["longitude"] = "longitude" - input_data["creation_options"]["create_dynamic_vector_tile_cache"] = True - - ##################### - # Test asset creation - ##################### - asset = await create_default_asset( - dataset, - version, - version_payload=input_data, - execute_batch_jobs=True, - logs=logs, - async_client=async_client, - ) - asset_id = asset["asset_id"] - - ################# - # Check results - ################# - await check_version_status(dataset, version, 3) - # There should be an extra asset from the dynamic vector tile cache - await check_asset_status(dataset, version, 2) - - # There should be the following tasks: - # 1 to create the table schema - # 0 to partition because we didn't specify it - # 1 to load the data - # 1 to add point geometry - # 0 to add indices because we didn't specify it - # 0 to add clustering because we didn't specify it - await check_task_status(asset_id, 3, "add_point_geometry") - - # There should be a table called "table_test"."v202002.1" with 99 rows. - # It should have the right amount of partitions and indices - async with ContextEngine("READ"): - row_count = await get_row_count(db, dataset, version) - partition_count = await get_partition_count(db, dataset, version) - index_count = await get_index_count(db, dataset, version) - cluster_count = await get_cluster_count(db) - - assert row_count == 99 - assert partition_count == 0 - assert index_count == 0 - assert cluster_count == 0 - - -@pytest.mark.asyncio -async def test_table_source_asset_partition(batch_client, async_client: AsyncClient): - _, logs = batch_client - - ############################ - # Setup test - ############################ dataset = "table_test" version = "v202002.1" @@ -264,18 +41,51 @@ async def test_table_source_asset_partition(batch_client, async_client: AsyncCli # Year has only 52 weeks pass - input_data: Dict = copy.deepcopy(basic_table_input_data) - input_data["creation_options"]["partitions"] = { - "partition_type": "range", - "partition_column": "alert__date", - "partition_schema": partition_schema, + input_data = { + "creation_options": { + "source_type": "table", + "source_uri": [f"s3://{BUCKET}/{TSV_NAME}"], + "source_driver": "text", + "delimiter": "\t", + "has_header": True, + "latitude": "latitude", + "longitude": "longitude", + "cluster": { + "index_type": "btree", + "column_names": ["iso", "adm1", "adm2", "alert__date"], + }, + "partitions": { + "partition_type": "range", + "partition_column": "alert__date", + "partition_schema": partition_schema, + }, + "indices": [ + {"index_type": "gist", "column_names": ["geom"]}, + {"index_type": "gist", "column_names": ["geom_wm"]}, + { + "index_type": "btree", + "column_names": ["iso", "adm1", "adm2", "alert__date"], + }, + ], + "table_schema": [ + { + "name": "rspo_oil_palm__certification_status", + "data_type": "text", + }, + {"name": "per_forest_concession__type", "data_type": "text"}, + {"name": "idn_forest_area__type", "data_type": "text"}, + {"name": "alert__count", "data_type": "integer"}, + {"name": "adm1", "data_type": "integer"}, + {"name": "adm2", "data_type": "integer"}, + ], + }, + "metadata": version_metadata, } - partition_count_expected = len(partition_schema) - ##################### # Test asset creation ##################### + asset = await create_default_asset( dataset, version, @@ -286,168 +96,128 @@ async def test_table_source_asset_partition(batch_client, async_client: AsyncCli ) asset_id = asset["asset_id"] - ################# - # Check results - ################# - await check_version_status(dataset, version, 2) - await check_asset_status(dataset, version, 1) # There should be 1 asset - - # There should be the following tasks: - # 1 to create the table schema - # 4 to partition - # 1 to load the data - # 0 to add point geometry - # 0 to add indices - # 0 to add clustering because we didn't specify it - await check_task_status(asset_id, 6, "load_data_0") + await check_version_status(dataset, version, 3) + await check_asset_status(dataset, version, 1) + await check_task_status(asset_id, 14, "cluster_partitions_3") # There should be a table called "table_test"."v202002.1" with 99 rows. # It should have the right amount of partitions and indices async with ContextEngine("READ"): - row_count = await get_row_count(db, dataset, version) - partition_count = await get_partition_count(db, dataset, version) - index_count = await get_index_count(db, dataset, version) - cluster_count = await get_cluster_count(db) - - assert row_count == 99 - assert partition_count == partition_count_expected + count = await db.scalar( + db.text( + f""" + SELECT count(*) + FROM "{dataset}"."{version}";""" + ) + ) + partition_count = await db.scalar( + db.text( + f""" + SELECT count(i.inhrelid::regclass) + FROM pg_inherits i + WHERE i.inhparent = '"{dataset}"."{version}"'::regclass;""" + ) + ) + index_count = await db.scalar( + db.text( + f""" + SELECT count(indexname) + FROM pg_indexes + WHERE schemaname = '{dataset}' AND tablename like '{version}%';""" + ) + ) + cluster_count = await db.scalar( + db.text( + """ + SELECT count(relname) + FROM pg_class c + JOIN pg_index i ON i.indrelid = c.oid + WHERE relkind = 'r' AND relhasindex AND i.indisclustered""" + ) + ) + + assert count == 99 + assert partition_count == len(partition_schema) # postgres12 also adds indices to the main table, hence there are more indices than partitions - assert index_count == 0 - assert cluster_count == 0 - - -@pytest.mark.asyncio -async def test_table_source_asset_cluster(batch_client, async_client: AsyncClient): - _, logs = batch_client + assert index_count == (partition_count + 1) * len( + input_data["creation_options"]["indices"] + ) + assert cluster_count == len(partition_schema) - ############################ - # Setup test - ############################ - dataset = "table_test" - version = "v202002.1" - input_data: Dict = copy.deepcopy(basic_table_input_data) - input_data["creation_options"]["cluster"] = { - "index_type": "btree", - "column_names": ["iso", "adm1", "adm2", "alert__date"], - } - input_data["creation_options"]["indices"] = [ - {"index_type": "btree", "column_names": ["iso"]}, - {"index_type": "hash", "column_names": ["rspo_oil_palm__certification_status"]}, - {"index_type": "btree", "column_names": ["iso", "adm1", "adm2", "alert__date"]}, - ] + ######################## + # Append + ######################### - ##################### - # Test asset creation - ##################### - asset = await create_default_asset( - dataset, - version, - version_payload=input_data, - execute_batch_jobs=True, - logs=logs, - async_client=async_client, - ) - asset_id = asset["asset_id"] + httpx.delete(f"http://localhost:{PORT}") - ################# - # Check results - ################# - await check_version_status(dataset, version, 2) - await check_asset_status(dataset, version, 1) # There should be 1 asset - - # There should be the following tasks: - # 1 to create the table schema - # 0 to partition because we didn't specify it - # 1 to load the data - # 0 to add point geometry because we didn't specify it - # 3 to add indices - # 1 to add clustering - await check_task_status(asset_id, 6, "cluster_table") + # Create default asset in mocked BATCH - # There should be a table called "table_test"."v202002.1" with 99 rows. - # It should have the right amount of partitions and indices - async with ContextEngine("READ"): - row_count = await get_row_count(db, dataset, version) - partition_count = await get_partition_count(db, dataset, version) - index_count = await get_index_count(db, dataset, version) - cluster_count = await get_cluster_count(db) + httpx.delete(f"http://localhost:{PORT}") - assert row_count == 99 - assert partition_count == 0 - # postgres12 also adds indices to the main table, hence there are more indices than partitions - assert index_count == (partition_count + 1) * len( - input_data["creation_options"]["indices"] + response = await async_client.post( + f"/dataset/{dataset}/{version}/append", + json={"source_uri": [f"s3://{BUCKET}/{APPEND_TSV_NAME}"]}, ) - assert cluster_count == 1 + assert response.status_code == 200 + response = await async_client.get(f"/dataset/{dataset}/{version}/change_log") + assert response.status_code == 200 + tasks = json.loads(response.json()["data"][-1]["detail"]) + task_ids = [task["job_id"] for task in tasks] + # print(task_ids) -@pytest.mark.asyncio -async def test_table_source_asset_constraints(batch_client, async_client: AsyncClient): - _, logs = batch_client + # make sure, all jobs completed + status = await poll_jobs(task_ids, logs=logs, async_client=async_client) - ############################ - # Setup test - ############################ - dataset = "table_test" - version = "v202002.1" - input_data: Dict = copy.deepcopy(basic_table_input_data) - input_data["creation_options"]["constraints"] = [ - {"constraint_type": "unique", "column_names": ["adm1", "adm2", "alert__date"]} - ] + assert status == "saved" - ##################### - # Test asset creation - ##################### - asset = await create_default_asset( - dataset, - version, - version_payload=input_data, - execute_batch_jobs=True, - logs=logs, - async_client=async_client, - ) - asset_id = asset["asset_id"] + await check_version_status(dataset, version, 6) + await check_asset_status(dataset, version, 2) - ################# - # Check results - ################# - await check_version_status(dataset, version, 2) - await check_asset_status(dataset, version, 1) # There should be 1 asset - - # There should be the following tasks: - # 1 to create the table schema - # 0 to partition because we didn't specify it - # 1 to load the data - # 0 to add point geometry because we didn't specify it - # 0 to add indices - # 0 to add clustering because we didn't specify it - await check_task_status(asset_id, 2, "load_data_0") + # The table should now have 101 rows after append + async with ContextEngine("READ"): + count = await db.scalar( + db.text( + f""" + SELECT count(*) + FROM "{dataset}"."{version}";""" + ) + ) - # There should be a table called "table_test"."v202002.1" with 99 rows. - # It should have the right amount of partitions and indices + assert count == 101 + + # The table should now have no empty geometries async with ContextEngine("READ"): - row_count = await get_row_count(db, dataset, version) - partition_count = await get_partition_count(db, dataset, version) - index_count = await get_index_count(db, dataset, version) - cluster_count = await get_cluster_count(db) + count = await db.scalar( + db.text( + f""" + SELECT count(*) + FROM "{dataset}"."{version}" WHERE geom IS NULL;""" + ) + ) - assert row_count == 99 - assert partition_count == 0 - # postgres12 also adds indices to the main table, hence there are more indices than partitions - assert index_count == len(input_data["creation_options"]["constraints"]) - assert cluster_count == 0 + assert count == 0 +# @pytest.mark.skip( +# reason="Something weird going on with how I'm creating virtual CSVs. Fix later." +# ) @pytest.mark.asyncio -async def test_table_source_asset_everything(batch_client, async_client: AsyncClient): +async def test_table_source_asset_parallel(batch_client, async_client): _, logs = batch_client ############################ # Setup test ############################ + dataset = "table_test" version = "v202002.1" + s3_client = get_s3_client() + + for i in range(2, 101): + s3_client.upload_file(TSV_PATH, BUCKET, f"test_{i}.tsv") + # define partition schema partition_schema = list() years = range(2018, 2021) @@ -465,34 +235,46 @@ async def test_table_source_asset_everything(batch_client, async_client: AsyncCl # Year has only 52 weeks pass - input_data: Dict = copy.deepcopy(basic_table_input_data) - input_data["creation_options"]["timeout"] = 600 - input_data["creation_options"]["latitude"] = "latitude" - input_data["creation_options"]["longitude"] = "longitude" - input_data["creation_options"]["create_dynamic_vector_tile_cache"] = True - input_data["creation_options"]["partitions"] = { - "partition_type": "range", - "partition_column": "alert__date", - "partition_schema": partition_schema, - } - input_data["creation_options"]["constraints"] = [ - {"constraint_type": "unique", "column_names": ["adm1", "adm2", "alert__date"]} - ] - input_data["creation_options"]["indices"] = [ - {"index_type": "btree", "column_names": ["iso"]}, - {"index_type": "hash", "column_names": ["rspo_oil_palm__certification_status"]}, - {"index_type": "btree", "column_names": ["iso", "adm1", "adm2", "alert__date"]}, - ] - input_data["creation_options"]["cluster"] = { - "index_type": "btree", - "column_names": ["iso", "adm1", "adm2", "alert__date"], + input_data = { + "creation_options": { + "source_type": "table", + "source_uri": [f"s3://{BUCKET}/{TSV_NAME}"] + + [f"s3://{BUCKET}/test_{i}.tsv" for i in range(2, 101)], + "source_driver": "text", + "delimiter": "\t", + "has_header": True, + "latitude": "latitude", + "longitude": "longitude", + "cluster": {"index_type": "gist", "column_names": ["geom_wm"]}, + "partitions": { + "partition_type": "range", + "partition_column": "alert__date", + "partition_schema": partition_schema, + }, + "indices": [ + {"index_type": "gist", "column_names": ["geom"]}, + {"index_type": "gist", "column_names": ["geom_wm"]}, + {"index_type": "btree", "column_names": ["alert__date"]}, + ], + "table_schema": [ + { + "name": "rspo_oil_palm__certification_status", + "data_type": "text", + }, + {"name": "per_forest_concession__type", "data_type": "text"}, + {"name": "idn_forest_area__type", "data_type": "text"}, + {"name": "alert__count", "data_type": "integer"}, + {"name": "adm1", "data_type": "integer"}, + {"name": "adm2", "data_type": "integer"}, + ], + }, + "metadata": version_metadata, } - partition_count_expected = len(partition_schema) - ##################### # Test asset creation ##################### + asset = await create_default_asset( dataset, version, @@ -503,198 +285,50 @@ async def test_table_source_asset_everything(batch_client, async_client: AsyncCl ) asset_id = asset["asset_id"] - ################# - # Check results - ################# await check_version_status(dataset, version, 3) - # There should be an extra asset from the dynamic vector tile cache - await check_asset_status(dataset, version, 2) - - # There should be the following tasks: - # 1 to create the table schema - # 4 to partition - # 1 to load the data - # 1 to add point geometry - # 3 to add indices - # 4 to add clustering - await check_task_status(asset_id, 14, "cluster_partitions_3") + await check_asset_status(dataset, version, 1) + await check_task_status(asset_id, 26, "cluster_partitions_3") # There should be a table called "table_test"."v202002.1" with 99 rows. # It should have the right amount of partitions and indices async with ContextEngine("READ"): - row_count = await get_row_count(db, dataset, version) - partition_count = await get_partition_count(db, dataset, version) - index_count = await get_index_count(db, dataset, version) - cluster_count = await get_cluster_count(db) - - assert row_count == 99 - assert partition_count == partition_count_expected - # Disclaimer: These next two values are observed... I have no idea how - # to calculate whether or not they are correct. Probably by some - # multiplication of indices, partitions, constraints, and clusters - assert index_count == 632 - assert cluster_count == 157 - - -@pytest.mark.asyncio -async def test_table_source_asset_append(batch_client, async_client: AsyncClient): - _, logs = batch_client - - ############################ - # Setup test - ############################ - dataset = "table_test" - version = "v202002.1" - input_data: Dict = copy.deepcopy(basic_table_input_data) - - ##################### - # Test asset creation - ##################### - asset = await create_default_asset( - dataset, - version, - version_payload=input_data, - execute_batch_jobs=True, - logs=logs, - async_client=async_client, - ) - asset_id = asset["asset_id"] - - ################# - # Check results - ################# - await check_version_status(dataset, version, 2) - await check_asset_status(dataset, version, 1) # There should be 1 asset - - # There should be the following tasks: - # 1 to create the table schema - # 0 to partition because we didn't specify it - # 1 to load the data - # 0 to add point geometry because we didn't specify it - # 0 to add indices because we didn't specify them - # 0 to add clustering because we didn't specify it - await check_task_status(asset_id, 2, "load_data_0") - - ######################## - # Append - ######################### - httpx.delete(f"http://localhost:{PORT}") - - response = await async_client.post( - f"/dataset/{dataset}/{version}/append", - json={"source_uri": [f"s3://{BUCKET}/{APPEND_TSV_NAME}"]}, + count = await db.scalar( + db.text( + f""" + SELECT count(*) + FROM "{dataset}"."{version}";""" + ) + ) + partition_count = await db.scalar( + db.text( + f""" + SELECT count(i.inhrelid::regclass) + FROM pg_inherits i + WHERE i.inhparent = '"{dataset}"."{version}"'::regclass;""" + ) + ) + index_count = await db.scalar( + db.text( + f""" + SELECT count(indexname) + FROM pg_indexes + WHERE schemaname = '{dataset}' AND tablename like '{version}%';""" + ) + ) + cluster_count = await db.scalar( + db.text( + """ + SELECT count(relname) + FROM pg_class c + JOIN pg_index i ON i.indrelid = c.oid + WHERE relkind = 'r' AND relhasindex AND i.indisclustered""" + ) + ) + + assert count == 9900 + assert partition_count == len(partition_schema) + # postgres12 also adds indices to the main table, hence there are more indices than partitions + assert index_count == (partition_count + 1) * len( + input_data["creation_options"]["indices"] ) - assert response.status_code == 200 - - response = await async_client.get(f"/dataset/{dataset}/{version}/change_log") - assert response.status_code == 200 - tasks = json.loads(response.json()["data"][-1]["detail"]) - task_ids = [task["job_id"] for task in tasks] - - # make sure all jobs completed - status = await poll_jobs(task_ids, logs=logs, async_client=async_client) - assert status == "saved" - - await check_version_status(dataset, version, 4) - await check_asset_status(dataset, version, 1) - - -@pytest.mark.asyncio -async def test_table_source_asset_too_many_columns_in_unique_constraint( - async_client: AsyncClient, -): - dataset = "table_test" - version = "v202002.1" - input_data: Dict = copy.deepcopy(basic_table_input_data) - input_data["creation_options"]["constraints"] = [ - { - "constraint_type": "unique", - "column_names": [f"a_{letter}" for letter in string.ascii_letters], - } - ] - - await create_dataset(dataset, async_client) - - resp = await async_client.put(f"/dataset/{dataset}/{version}", json=input_data) - - assert resp.status_code == 422 - assert "ensure this value has at most 32 items" in resp.text - - -@pytest.mark.asyncio -async def test_table_source_asset_no_columns_in_unique_constraint( - async_client: AsyncClient, -): - dataset = "table_test" - version = "v202002.1" - input_data: Dict = copy.deepcopy(basic_table_input_data) - input_data["creation_options"]["constraints"] = [ - {"constraint_type": "unique", "column_names": []} - ] - - await create_dataset(dataset, async_client) - - resp = await async_client.put(f"/dataset/{dataset}/{version}", json=input_data) - - assert resp.status_code == 422 - assert "ensure this value has at least 1 items" in resp.text - - -@pytest.mark.asyncio -async def test_table_source_asset_too_many_unique_constraints( - async_client: AsyncClient, -): - dataset = "table_test" - version = "v202002.1" - input_data: Dict = copy.deepcopy(basic_table_input_data) - input_data["creation_options"]["constraints"] = [ - {"constraint_type": "unique", "column_names": ["foo", "bar"]}, - { - "constraint_type": "unique", - "column_names": [f"a_{letter}" for letter in string.ascii_lowercase], - }, - ] - - await create_dataset(dataset, async_client) - - resp = await async_client.put(f"/dataset/{dataset}/{version}", json=input_data) - - assert resp.status_code == 422 - assert "Currently cannot specify more than 1 unique constraint" in resp.text - - -@pytest.mark.asyncio -async def test_table_source_asset_too_many_columns_in_index(async_client: AsyncClient): - dataset = "table_test" - version = "v202002.1" - input_data: Dict = copy.deepcopy(basic_table_input_data) - input_data["creation_options"]["indices"] = [ - { - "index_type": "btree", - "column_names": [f"a_{letter}" for letter in string.ascii_letters], - } - ] - - await create_dataset(dataset, async_client) - - resp = await async_client.put(f"/dataset/{dataset}/{version}", json=input_data) - - assert resp.status_code == 422 - assert "ensure this value has at most 32 items" in resp.text - - -@pytest.mark.asyncio -async def test_table_source_asset_no_columns_in_index(async_client: AsyncClient): - dataset = "table_test" - version = "v202002.1" - input_data: Dict = copy.deepcopy(basic_table_input_data) - input_data["creation_options"]["indices"] = [ - {"index_type": "btree", "column_names": []} - ] - - await create_dataset(dataset, async_client) - - resp = await async_client.put(f"/dataset/{dataset}/{version}", json=input_data) - - assert resp.status_code == 422 - assert "ensure this value has at least 1 items" in resp.text + assert cluster_count == len(partition_schema) diff --git a/tests/tasks/test_vector_source_assets.py b/tests/tasks/test_vector_source_assets.py index d9ffbca04..f2ba6222f 100644 --- a/tests/tasks/test_vector_source_assets.py +++ b/tests/tasks/test_vector_source_assets.py @@ -13,6 +13,7 @@ from app.models.orm.tasks import Task as ORMTask from app.models.pydantic.geostore import Geometry, GeostoreCommon +from ..utils import create_default_asset, version_metadata, poll_jobs from .. import ( BUCKET, CSV2_NAME, @@ -24,7 +25,6 @@ PORT, SHP_NAME, ) -from ..utils import create_default_asset, poll_jobs, version_metadata from . import ( check_asset_status, check_dynamic_vector_tile_cache_status, @@ -73,7 +73,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): asset_id = asset["asset_id"] await check_version_status(dataset, version, 3) - await check_asset_status(dataset, version, 2) + await check_asset_status(dataset, version, 1) await check_task_status(asset_id, 8, "inherit_from_geostore") # There should be a table called "test"."v1.1.1" with one row @@ -163,7 +163,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "integer", "is_feature_info": True, "is_filter": True, - "unit": None, + "unit": None }, { "name": "fid", @@ -172,7 +172,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "numeric", "is_feature_info": True, "is_filter": True, - "unit": None, + "unit": None }, { "name": "geom", @@ -181,7 +181,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "geometry", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, { "name": "geom_wm", @@ -190,7 +190,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "geometry", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, { "name": "gfw_area__ha", @@ -199,7 +199,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "numeric", "is_feature_info": True, "is_filter": True, - "unit": None, + "unit": None }, { "name": "gfw_geostore_id", @@ -208,7 +208,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "uuid", "is_feature_info": True, "is_filter": True, - "unit": None, + "unit": None }, { "name": "gfw_geojson", @@ -217,7 +217,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "text", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, { "name": "gfw_bbox", @@ -226,7 +226,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "ARRAY", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, { "name": "created_on", @@ -235,7 +235,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "timestamp without time zone", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, { "name": "updated_on", @@ -244,7 +244,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "timestamp without time zone", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, ] else: @@ -257,7 +257,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "integer", "is_feature_info": True, "is_filter": True, - "unit": None, + "unit": None }, { "name": "geom", @@ -266,7 +266,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "geometry", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, { "name": "geom_wm", @@ -275,7 +275,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "geometry", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, { "name": "gfw_area__ha", @@ -284,7 +284,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "numeric", "is_feature_info": True, "is_filter": True, - "unit": None, + "unit": None }, { "name": "gfw_geostore_id", @@ -293,7 +293,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "uuid", "is_feature_info": True, "is_filter": True, - "unit": None, + "unit": None }, { "name": "gfw_geojson", @@ -302,7 +302,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "text", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, { "name": "gfw_bbox", @@ -311,7 +311,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "ARRAY", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, { "name": "created_on", @@ -320,7 +320,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "timestamp without time zone", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, { "name": "updated_on", @@ -329,7 +329,7 @@ async def test_vector_source_asset(batch_client, async_client: AsyncClient): "data_type": "timestamp without time zone", "is_feature_info": False, "is_filter": False, - "unit": None, + "unit": None }, ] @@ -369,7 +369,7 @@ async def test_vector_source_asset_csv(batch_client, async_client: AsyncClient): asset_id = asset["asset_id"] await check_version_status(dataset, version, 3) - await check_asset_status(dataset, version, 2) + await check_asset_status(dataset, version, 1) await check_task_status(asset_id, 8, "inherit_from_geostore") # There should be a table called "test"."v1.1.1" with one row @@ -478,4 +478,4 @@ async def test_vector_source_asset_geojson_append( # Now "test"."v1.1.1" should have an additional row async with ContextEngine("READ"): count = await db.scalar(db.text(f'SELECT count(*) FROM {dataset}."{version}"')) - assert count == 2 \ No newline at end of file + assert count == 2 diff --git a/tests/utils.py b/tests/utils.py index 307578ea4..8a778b619 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,6 @@ import rasterio from affine import Affine from botocore.exceptions import ClientError -from gino import Gino from rasterio.crs import CRS from app.crud import tasks @@ -62,11 +61,8 @@ async def create_version( ) -> Dict[str, Any]: resp = await async_client.put(f"/dataset/{dataset}/{version}", json=payload) - try: - assert resp.json()["status"] == "success" - except AssertionError: - print(f"UNSUCCESSFUL PUT RESPONSE: {resp.json()}") - raise + assert resp.json()["status"] == "success" + return resp.json()["data"] @@ -325,51 +321,3 @@ def upload_fake_data(dtype, dtype_name, no_data, prefix, data): DATA_LAKE_BUCKET, f"{prefix}/{data_file_name}", ) - - -async def get_row_count(db: Gino, dataset: str, version: str) -> int: - count = await db.scalar( - db.text( - f""" - SELECT count(*) - FROM "{dataset}"."{version}";""" - ) - ) - return int(count) - - -async def get_partition_count(db: Gino, dataset: str, version: str) -> int: - partition_count = await db.scalar( - db.text( - f""" - SELECT count(i.inhrelid::regclass) - FROM pg_inherits i - WHERE i.inhparent = '"{dataset}"."{version}"'::regclass;""" - ) - ) - return int(partition_count) - - -async def get_index_count(db: Gino, dataset: str, version: str) -> int: - index_count = await db.scalar( - db.text( - f""" - SELECT count(indexname) - FROM pg_indexes - WHERE schemaname = '{dataset}' AND tablename like '{version}%';""" - ) - ) - return int(index_count) - - -async def get_cluster_count(db: Gino) -> int: - cluster_count = await db.scalar( - db.text( - """ - SELECT count(relname) - FROM pg_class c - JOIN pg_index i ON i.indrelid = c.oid - WHERE relkind = 'r' AND relhasindex AND i.indisclustered""" - ) - ) - return int(cluster_count) diff --git a/tests_v2/conftest.py b/tests_v2/conftest.py index 1c6592003..b2b8a3ae0 100644 --- a/tests_v2/conftest.py +++ b/tests_v2/conftest.py @@ -36,7 +36,8 @@ ) -@pytest_asyncio.fixture +@pytest.fixture() +@pytest.mark.asyncio async def db(): """In between tests, tear down/set up all DBs.""" main(["--raiseerr", "upgrade", "head"])