
Commit

Revert "Staging -> Production (VIIRS metadata and unique constraint c…
Browse files Browse the repository at this point in the history
…reation options PRs)"
solomon-negusse authored May 25, 2023
1 parent f554eac commit 37169f3
Showing 17 changed files with 362 additions and 859 deletions.
34 changes: 34 additions & 0 deletions app/crud/assets.py
@@ -37,6 +37,40 @@ async def get_assets(dataset: str, version: str) -> List[ORMAsset]:

    return rows

+
+@alru_cache(maxsize=128)
+async def get_raster_tile_sets():
+    latest_tile_sets = await (
+        ORMAsset.join(ORMVersion)
+        .select()
+        .with_only_columns(
+            [
+                ORMAsset.asset_id,
+                ORMAsset.dataset,
+                ORMAsset.version,
+                ORMAsset.creation_options,
+                ORMAsset.asset_uri,
+            ]
+        )
+        .where(
+            and_(
+                ORMAsset.asset_type == AssetType.raster_tile_set,
+                ORMVersion.is_latest == True,  # noqa: E712
+            )
+        )
+    ).gino.all()
+
+    assets_with_metadata = []
+    for asset in latest_tile_sets:
+        asset_dict = dict(asset.items())
+        try:
+            metadata = await get_asset_metadata(asset.asset_id)
+            asset_dict["metadata"] = metadata
+        except RecordNotFoundError:
+            pass
+        assets_with_metadata.append(asset_dict)
+
+    return assets_with_metadata


async def get_assets_by_type(asset_type: str) -> List[ORMAsset]:
    assets = await ORMAsset.query.where(ORMAsset.asset_type == asset_type).gino.all()
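For context, the restored get_raster_tile_sets is memoized with async-lru's alru_cache, so within one process the Gino query runs only on the first call and later calls reuse the cached result; a version whose is_latest flag changes afterwards will not be reflected until the cache entry is evicted or the process restarts. A minimal sketch of that caching behavior, using a stand-in coroutine rather than the real query:

    import asyncio
    from async_lru import alru_cache

    CALLS = 0

    @alru_cache(maxsize=128)
    async def fetch_tile_sets():  # stand-in for the real Gino query
        global CALLS
        CALLS += 1  # the body only runs on a cache miss
        return [{"dataset": "nasa_viirs_fire_alerts", "version": "v1"}]

    async def main():
        await fetch_tile_sets()
        await fetch_tile_sets()  # second call is served from the cache
        print(CALLS)  # -> 1

    asyncio.run(main())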
4 changes: 2 additions & 2 deletions app/crud/metadata.py
@@ -80,8 +80,8 @@ async def create_version_metadata(dataset: str, version: str, **data):

    content_date_range = data.pop("content_date_range", None)
    if content_date_range:
-        data["content_start_date"] = content_date_range.get("start_date")
-        data["content_end_date"] = content_date_range.get("end_date")
+        data["content_start_date"] = content_date_range["start_date"]
+        data["content_end_date"] = content_date_range["end_date"]

    try:
        new_metadata: ORMVersionMetadata = await ORMVersionMetadata.create(
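The revert swaps dict.get() back to direct indexing: .get() returns None when a key is missing, while indexing raises KeyError, so a missing date now fails loudly instead of silently storing a null. Illustration with made-up values:

    content_date_range = {"start_date": "2023-01-01"}  # "end_date" absent
    print(content_date_range.get("end_date"))          # None (pre-revert behavior)
    content_date_range["end_date"]                     # KeyError (restored behavior)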
5 changes: 0 additions & 5 deletions app/models/enum/creation_options.py
@@ -38,11 +38,6 @@ class IndexType(str, Enum):
hash = "hash"


class ConstraintType(str, Enum):
__doc__ = "Constraint type"
unique = "unique"


class TileStrategy(str, Enum):
__doc__ = (
"Tile strategy for generating vector tiles. "
23 changes: 0 additions & 23 deletions app/models/orm/queries/raster_assets.py

This file was deleted.

40 changes: 4 additions & 36 deletions app/models/pydantic/creation_options.py
@@ -8,7 +8,6 @@
from ...settings.globals import DEFAULT_JOB_DURATION, PIXETL_DEFAULT_RESAMPLING
from ..enum.assets import AssetType, is_default_asset
from ..enum.creation_options import (
-    ConstraintType,
    Delimiters,
    IndexType,
    PartitionType,
@@ -47,27 +46,9 @@
class Index(StrictBaseModel):
    index_type: IndexType
    column_names: List[str] = Field(
-        ...,
-        description="Columns to be used by index",
-        regex=COLUMN_REGEX,
-        min_items=1,
-        max_items=32,  # A PostgreSQL upper limit
-    )
-
-
-class Constraint(StrictBaseModel):
-    constraint_type: ConstraintType
-    column_names: List[str] = Field(
-        ...,
-        description="Columns included in the constraint",
-        regex=COLUMN_REGEX,
-        min_items=1,
-        max_items=32,  # A PostgreSQL upper limit
+        ..., description="Columns to be used by index", regex=COLUMN_REGEX
    )
-
-    class Config:
-        orm_mode = True


class HashPartitionSchema(StrictBaseModel):
    partition_count: PositiveInt
@@ -235,6 +216,7 @@ def validate_source_uri(cls, v, values, **kwargs):
class TableAssetCreationOptions(StrictBaseModel):
    has_header: bool = Field(True, description="Input file has header. Must be true")
    delimiter: Delimiters = Field(..., description="Delimiter used in input file")
+
    latitude: Optional[str] = Field(
        None, description="Column with latitude coordinate", regex=COLUMN_REGEX
    )
@@ -248,9 +230,6 @@ class TableAssetCreationOptions(StrictBaseModel):
None, description="Partitioning schema (optional)"
)
indices: List[Index] = Field([], description="List of indices to add to table")
constraints: Optional[List[Constraint]] = Field(
None, description="List of constraints to add to table. (optional)"
)
table_schema: Optional[List[FieldType]] = Field(
None,
description="List of Field Types. Missing field types will be inferred. (optional)",
@@ -263,17 +242,6 @@ class TableAssetCreationOptions(StrictBaseModel):
    )
    timeout: int = DEFAULT_JOB_DURATION

-    @validator("constraints")
-    def validate_max_1_unique_constraints(cls, v, values, **kwargs):
-        if v is not None:
-            unique_constraints = [
-                c for c in v if c.constraint_type == ConstraintType.unique
-            ]
-            assert (
-                len(unique_constraints) < 2
-            ), "Currently cannot specify more than 1 unique constraint"
-        return v
-

class TableSourceCreationOptions(TableAssetCreationOptions):
    source_type: TableSourceType = Field(..., description="Source type of input file.")
@@ -381,18 +349,18 @@ class StaticVectorFileCreationOptions(StrictBaseModel):


SourceCreationOptions = Union[
-    TableSourceCreationOptions,
    RasterTileSetSourceCreationOptions,
+    TableSourceCreationOptions,
    VectorSourceCreationOptions,
]

OtherCreationOptions = Union[
-    TableAssetCreationOptions,
    RasterTileCacheCreationOptions,
    StaticVectorTileCacheCreationOptions,
    StaticVectorFileCreationOptions,
    DynamicVectorTileCacheCreationOptions,
    RasterTileSetAssetCreationOptions,
+    TableAssetCreationOptions,
]

CreationOptions = Union[SourceCreationOptions, OtherCreationOptions]
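Aside (a general pydantic v1 behavior, not something stated in this diff): members of a typing.Union are tried left to right during validation, so reordering Union members can change which model an ambiguous payload parses as. A toy illustration with hypothetical models:

    from typing import Union
    from pydantic import BaseModel, parse_obj_as

    class A(BaseModel):
        x: int

    class B(BaseModel):
        x: int
        y: int = 0

    # The first matching member wins; extra fields are ignored by default in v1.
    print(type(parse_obj_as(Union[A, B], {"x": 1, "y": 2})))  # <class 'A'>
    print(type(parse_obj_as(Union[B, A], {"x": 1, "y": 2})))  # <class 'B'>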
21 changes: 14 additions & 7 deletions app/routes/datasets/queries.py
@@ -27,7 +27,6 @@

# from ...authentication.api_keys import get_api_key
from ...crud import assets
-from ...models.orm.queries.raster_assets import latest_raster_tile_sets
from ...models.enum.assets import AssetType
from ...models.enum.creation_options import Delimiters
from ...models.enum.geostore import GeostoreOrigin
@@ -647,11 +646,11 @@ def _get_default_layer(dataset, pixel_meaning):

async def _get_data_environment(grid: Grid) -> DataEnvironment:
    # get all Raster tile set assets
-    latest_tile_sets = await db.all(latest_raster_tile_sets)
+    latest_tile_sets = await assets.get_raster_tile_sets()
    # create layers
    layers: List[Layer] = []
    for row in latest_tile_sets:
-        creation_options = row.creation_options
+        creation_options = row["creation_options"]
        if creation_options["grid"] != grid:
            # skip if not on the right grid
            continue
@@ -665,9 +664,13 @@ async def _get_data_environment(grid: Grid) -> DataEnvironment:
            continue

        if creation_options["pixel_meaning"] == "is":
-            source_layer_name = f"{creation_options['pixel_meaning']}__{row.dataset}"
+            source_layer_name = (
+                f"{creation_options['pixel_meaning']}__{row['dataset']}"
+            )
        else:
-            source_layer_name = f"{row.dataset}__{creation_options['pixel_meaning']}"
+            source_layer_name = (
+                f"{row['dataset']}__{creation_options['pixel_meaning']}"
+            )

        no_data_val = parse_obj_as(
            Optional[Union[List[NoDataType], NoDataType]],
@@ -676,10 +679,14 @@ async def _get_data_environment(grid: Grid) -> DataEnvironment:
        if isinstance(no_data_val, List):
            no_data_val = no_data_val[0]

-        raster_table = getattr(row, "values_table", None)
+        bands = getattr(row.get("metadata"), "bands", [])
+        raster_table = None
+        if bands:
+            raster_table = RasterTable(**bands[0].to_dict()["values_table"])
+        print("TABLE", raster_table)
        layers.append(
            _get_source_layer(
-                row.asset_uri,
+                row["asset_uri"],
                source_layer_name,
                grid,
                no_data_val,
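Because get_raster_tile_sets now returns plain dicts rather than ORM rows, attribute access (row.dataset) becomes key access (row["dataset"]). The bands lookup also degrades gracefully for an asset whose metadata was missing (the RecordNotFoundError swallowed in get_raster_tile_sets): row.get("metadata") yields None, and getattr(None, "bands", []) falls back to an empty list. Sketch with an illustrative row:

    row = {"dataset": "umd_tree_cover_loss"}           # no "metadata" key
    bands = getattr(row.get("metadata"), "bands", [])  # getattr(None, ...) -> []
    print(bands)                                       # []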
20 changes: 4 additions & 16 deletions app/tasks/table_source_assets.py
@@ -3,7 +3,6 @@
from typing import Any, Dict, List, Optional
from uuid import UUID

-from ..models.enum.creation_options import ConstraintType
from ..models.pydantic.change_log import ChangeLog
from ..models.pydantic.creation_options import (
    Index,
@@ -23,6 +22,7 @@ async def table_source_asset(
    asset_id: UUID,
    input_data: Dict[str, Any],
) -> ChangeLog:
+
    creation_options = TableSourceCreationOptions(**input_data["creation_options"])
    if creation_options.source_uri:
        source_uris: List[str] = creation_options.source_uri
@@ -40,14 +40,9 @@
        version,
        "-s",
        source_uris[0],
+        "-m",
+        json.dumps(creation_options.dict(by_alias=True)["table_schema"]),
    ]
-    if creation_options.table_schema:
-        command.extend(
-            [
-                "-m",
-                json.dumps(creation_options.dict(by_alias=True)["table_schema"]),
-            ]
-        )
    if creation_options.partitions:
        command.extend(
            [
@@ -57,13 +52,6 @@
                creation_options.partitions.partition_column,
            ]
        )
-    if creation_options.constraints:
-        unique_constraint_columns = []
-        for constraint in creation_options.constraints:
-            if constraint.constraint_type == ConstraintType.unique:
-                unique_constraint_columns += constraint.column_names
-
-        command.extend(["-u", ",".join(unique_constraint_columns)])

    job_env: List[Dict[str, Any]] = writer_secrets + [
        {"name": "ASSET_ID", "value": str(asset_id)}
@@ -407,7 +395,7 @@

    if partitions:
        # When using partitions we need to cluster each partition table separately.
-        # Play it safe and cluster partition tables one after the other.
+        # Playing it save and cluster partition tables one after the other.
        # TODO: Still need to test if we can cluster tables which are part of the same partition concurrently.
        # this would speed up this step by a lot. Partitions require a full lock on the table,
        # but I don't know if the lock is acquired for the entire partition or only the partition table.
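One side effect of this revert: "-m" and the JSON-encoded table_schema are now appended to the batch command unconditionally. When table_schema is unset, json.dumps(None) produces the literal string "null", so the loader script invoked by the job presumably has to tolerate that value (not verified here):

    import json
    print(json.dumps(None))  # null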
21 changes: 21 additions & 0 deletions batch/scripts/append_data.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+set -e
+
+# requires arguments
+# -d | --dataset
+# -v | --version
+# -s | --source
+# -D | --delimiter
+ME=$(basename "$0")
+. get_arguments.sh "$@"
+
+
+# Unescape TAB character
+if [ "$DELIMITER" == "\t" ]; then
+  DELIMITER=$(echo -e "\t")
+fi
+
+for uri in "${SRC[@]}"; do
+  aws s3 cp "${uri}" - | psql -c "COPY \"$DATASET\".\"$VERSION\" FROM STDIN WITH (FORMAT CSV, DELIMITER '$DELIMITER', HEADER)"
+done
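The new script streams each S3 object straight into PostgreSQL's COPY ... FROM STDIN, so data never lands on disk. A rough Python equivalent of that pipeline, for intuition only (hypothetical bucket, key, and connection string; assumes boto3 and psycopg2 are available, and buffers in memory where the shell version streams):

    import io
    import boto3
    import psycopg2

    # Hypothetical object and DSN, purely for illustration.
    body = boto3.client("s3").get_object(Bucket="my-bucket", Key="data.csv")["Body"]

    conn = psycopg2.connect("postgresql://localhost/gfw")
    with conn, conn.cursor() as cur:
        # copy_expert feeds a file-like object to COPY, mirroring
        # `aws s3 cp ... - | psql -c "COPY ... FROM STDIN ..."`.
        cur.copy_expert(
            'COPY "dataset"."v1" FROM STDIN WITH (FORMAT CSV, DELIMITER \',\', HEADER)',
            io.BytesIO(body.read()),
        )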
(Diffs for the remaining 9 of the 17 changed files were not loaded in this view.)