Skip to content

Commit

Permalink
Merge pull request #236 from wri/feature-aliasing
Browse files Browse the repository at this point in the history
Feature version aliasing
  • Loading branch information
solomon-negusse authored Oct 14, 2021
2 parents d61b550 + 8bef0fb commit 6f381a8
Show file tree
Hide file tree
Showing 14 changed files with 508 additions and 9 deletions.
34 changes: 34 additions & 0 deletions app/crud/aliases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from asyncpg import UniqueViolationError

from app.errors import RecordAlreadyExistsError, RecordNotFoundError
from app.models.orm.aliases import Alias as ORMAlias


async def create_alias(alias: str, dataset: str, version: str) -> ORMAlias:
    """Store a new alias row pointing at the given dataset version.

    Returns:
        The alias record produced by ``ORMAlias.create``.

    Raises:
        RecordAlreadyExistsError: If the insert fails with a unique violation.
    """
    try:
        return await ORMAlias.create(alias=alias, dataset=dataset, version=version)
    except UniqueViolationError:
        # Translate the driver-level error into the application's own error type.
        raise RecordAlreadyExistsError(
            f"Alias {alias} already exists for dataset {dataset}"
        )


async def get_alias(dataset: str, alias: str) -> ORMAlias:
    """Fetch the alias record stored for ``alias`` within ``dataset``.

    Raises:
        RecordNotFoundError: If the lookup returns ``None``.
    """
    # NOTE(review): the identity list is [alias, dataset], not the parameter
    # order; presumably it has to follow the model's primary-key column
    # order -- confirm before reordering.
    found: ORMAlias = await ORMAlias.get([alias, dataset])
    if found is not None:
        return found

    raise RecordNotFoundError(f"Could not find requested alias {alias}.")


async def delete_alias(dataset: str, alias: str) -> ORMAlias:
    """Delete the alias of a dataset and return the record that was removed.

    The record is fetched first so that it can be handed back to the caller
    after the row has been deleted.

    Raises:
        RecordNotFoundError: Propagated from ``get_alias`` when no such alias
            exists for the dataset.
    """
    alias_record: ORMAlias = await get_alias(dataset, alias)

    await (
        ORMAlias.delete.where(ORMAlias.dataset == dataset)
        .where(ORMAlias.alias == alias)
        .gino.status()
    )

    return alias_record
21 changes: 19 additions & 2 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@
from app.errors import http_error_handler

from .application import app
from .middleware import no_cache_response_header, redirect_latest, set_db_mode
from .middleware import (
no_cache_response_header,
redirect_alias_to_version,
redirect_latest,
set_db_mode,
)
from .routes import health
from .routes.analysis import analysis
from .routes.assets import asset, assets
from .routes.authentication import authentication
from .routes.datasets import aliases
from .routes.datasets import asset as version_asset
from .routes.datasets import (
dataset,
Expand Down Expand Up @@ -90,7 +96,12 @@ async def rve_error_handler(
# MIDDLEWARE
#################

MIDDLEWARE = (set_db_mode, redirect_latest, no_cache_response_header)
MIDDLEWARE = (
set_db_mode,
redirect_latest,
no_cache_response_header,
redirect_alias_to_version,
)

for m in MIDDLEWARE:
app.add_middleware(BaseHTTPMiddleware, dispatch=m)
Expand Down Expand Up @@ -162,6 +173,12 @@ async def rve_error_handler(
for r in analysis_routers:
app.include_router(r, prefix="/analysis")

###############
# ALIAS API
###############

app.include_router(aliases.router, prefix="/alias")

###############
# HEALTH API
###############
Expand Down
42 changes: 40 additions & 2 deletions app/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from fastapi.responses import ORJSONResponse, RedirectResponse

from .application import ContextEngine
from .crud.aliases import get_alias
from .crud.versions import get_latest_version
from .errors import BadRequestError, RecordNotFoundError, http_error_handler

Expand All @@ -25,10 +26,9 @@ async def set_db_mode(request: Request, call_next):
async def redirect_latest(request: Request, call_next):
"""Redirect all GET requests using latest version to actual version number.
Redirect only POST requests to for query and download endpoints, as
Redirect only POST requests for query and download endpoints, as
other POST endpoints will require to list version number explicitly.
"""

if (request.method == "GET" and "latest" in request.url.path) or (
request.method == "POST"
and "latest" in request.url.path
Expand Down Expand Up @@ -77,6 +77,44 @@ async def redirect_latest(request: Request, call_next):
return response


async def redirect_alias_to_version(request: Request, call_next):
    """Redirect version request by alias to the actual dataset version.

    For GET requests matching /dataset/{dataset}/{version} OR POST
    requests matching /dataset/{dataset}/{version}/download or
    /dataset/{dataset}/{version}/query, this will check if {version}
    matches an existing alias and redirect to the dataset version
    associated with the alias. Any other request is passed on to
    ``call_next`` unchanged.
    """
    # "/dataset/{dataset}/{version}/..." splits into
    # ["", "dataset", dataset, version, ...].
    path_items = request.url.path.split("/")
    if len(path_items) < 4 or path_items[1] != "dataset":
        return await call_next(request)

    if request.method == "POST":
        # Inspect the segment that follows {version} instead of searching the
        # whole path: a dataset or version whose *name* merely contains
        # "query" or "download" must not make other POST endpoints eligible.
        if len(path_items) < 5 or path_items[4] not in ("query", "download"):
            return await call_next(request)
    elif request.method != "GET":
        return await call_next(request)

    dataset, version = path_items[2:4]
    if not dataset or not version:
        # Empty segment (e.g. a trailing slash): nothing to resolve, so skip
        # the database lookup.
        return await call_next(request)

    try:
        alias = await get_alias(dataset, version)
    except RecordNotFoundError:
        # Not an alias; let the regular route handle the request.
        return await call_next(request)

    path_items[3] = alias.version
    url = "/".join(path_items)
    if request.query_params:
        url = f"{url}?{request.query_params}"
    return RedirectResponse(url=url)


async def no_cache_response_header(request: Request, call_next):
"""This middleware adds a cache control response header.
Expand Down
16 changes: 16 additions & 0 deletions app/models/orm/aliases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from .base import Base, db


class Alias(Base):
    """ORM model for the ``aliases`` table."""

    __tablename__ = "aliases"

    # ``alias`` and ``dataset`` together form the composite primary key.
    alias = db.Column(db.String, primary_key=True)
    dataset = db.Column(db.String, primary_key=True)

    # Version the alias points at; required.
    version = db.Column(db.String, nullable=False)

    # (dataset, version) must reference a row in ``versions``; updates and
    # deletes there cascade to this table.
    fk = db.ForeignKeyConstraint(
        ["dataset", "version"],
        ["versions.dataset", "versions.version"],
        name="fk",
        onupdate="CASCADE",
        ondelete="CASCADE",
    )
86 changes: 86 additions & 0 deletions app/models/orm/migrations/migrate_asset_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Add version column to all data tables."""
import os

import click
import sqlalchemy as sa


def get_datasets(connection):
    """Return all rows of ``public.versions`` read through ``connection``."""
    query = sa.text("select * from public.versions")
    return connection.execute(query).fetchall()


def upgrade(connection):
    """Add and backfill a ``version`` column on every version data table.

    For each row returned by ``get_datasets`` whose table
    ``"<dataset>"."<version>"`` exists (the ``nasa_viirs_fire_alerts``
    dataset is skipped), this:

    1. adds a nullable ``version`` VARCHAR column if it is missing,
    2. stops early for that table when no row has a NULL ``version``,
    3. otherwise sets ``version`` to the version name on all rows and
       marks the column NOT NULL.
    """
    for version in get_datasets(connection):
        if (
            not connection.engine.has_table(version.version, schema=version.dataset)
            or version.dataset == "nasa_viirs_fire_alerts"
        ):
            continue

        # Identifiers cannot be sent as bind parameters, so the qualified
        # table name is still interpolated (double-quoted) into the SQL.
        table = f'"{version.dataset}"."{version.version}"'

        print(f"Adding version column to {version.dataset}.{version.version}")
        connection.execute(
            sa.text(f'ALTER TABLE {table} ADD COLUMN IF NOT EXISTS "version" VARCHAR')
        )

        null_rows = connection.execute(
            sa.text(f"select count(*) from {table} where version IS NULL")
        ).first()
        print("NULL rows", null_rows)
        if null_rows[0] == 0:
            # NOTE(review): a table with zero rows also lands here, so its
            # column stays nullable -- confirm that this is acceptable.
            print(f"{version.dataset}.{version.version} updated already")
            continue

        # The version *value* is passed as a bound parameter instead of being
        # spliced into the statement as a quoted literal.
        connection.execute(
            sa.text(f"UPDATE {table} SET version = :version").bindparams(
                version=version.version
            )
        )
        connection.execute(
            sa.text(f"ALTER TABLE {table} ALTER version SET NOT NULL")
        )


def downgrade(connection):
    """Drop the ``version`` column from every existing version data table."""
    # NOTE(review): unlike upgrade(), this loop does not skip the
    # "nasa_viirs_fire_alerts" dataset -- confirm that is intended.
    for row in get_datasets(connection):
        table_exists = connection.engine.has_table(row.version, schema=row.dataset)
        if table_exists:
            print(f"Dropping version column from {row.dataset}.{row.version} table")
            statement = sa.text(
                f'ALTER TABLE "{row.dataset}"."{row.version}"\n'
                "DROP COLUMN IF EXISTS version\n"
            )
            connection.execute(statement)


@click.command()
@click.argument("operation", type=click.Choice(["upgrade", "downgrade"]))
def migrate(operation):
    """Run the asset-table migration in the given direction.

    Connection settings are read from the environment variables
    DB_USERNAME, DB_PASSWORD, DB_HOST, DB_PORT and DB_NAME; a missing
    variable raises ``KeyError``.
    """
    from urllib.parse import quote

    db_user = os.environ["DB_USERNAME"]
    db_pass = os.environ["DB_PASSWORD"]
    db_host = os.environ["DB_HOST"]
    db_port = os.environ["DB_PORT"]
    db_name = os.environ["DB_NAME"]

    # Credentials may contain characters that are reserved in URLs
    # ("@", "/", ":", "%"); percent-encode them so the URL parses correctly.
    user = quote(db_user, safe="")
    password = quote(db_pass, safe="")
    engine = sa.create_engine(
        f"postgresql://{user}:{password}@{db_host}:{db_port}/{db_name}"  # pragma: allowlist secret
    )

    with engine.connect() as connection:
        if operation == "upgrade":
            upgrade(connection)
        elif operation == "downgrade":
            downgrade(connection)
        else:
            # Defensive: click.Choice above already restricts the value.
            raise ValueError("Operation not supported.")


if __name__ == "__main__":
migrate()
41 changes: 41 additions & 0 deletions app/models/orm/migrations/versions/a5787f2eefe5_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Adding dataset version alias table.
Revision ID: a5787f2eefe5
Revises: 4763f4b8141a
Create Date: 2021-09-27 22:12:26.964711
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "a5787f2eefe5"
down_revision = "4763f4b8141a" # pragma: allowlist secret
branch_labels = None
depends_on = None


def upgrade():
    """Create the ``aliases`` table.

    The table has required string columns ``alias``, ``dataset`` and
    ``version``, timestamp columns ``created_on`` and ``updated_on``
    defaulting to ``now()``, a cascading foreign key from
    (dataset, version) to ``versions``, and a primary key on
    (dataset, alias).
    """
    key_columns = [
        sa.Column(name, sa.String(), nullable=False)
        for name in ("alias", "dataset", "version")
    ]
    timestamp_columns = [
        sa.Column(name, sa.DateTime(), server_default=sa.text("now()"), nullable=True)
        for name in ("created_on", "updated_on")
    ]
    version_fk = sa.ForeignKeyConstraint(
        ["dataset", "version"],
        ["versions.dataset", "versions.version"],
        name="fk",
        onupdate="CASCADE",
        ondelete="CASCADE",
    )
    primary_key = sa.PrimaryKeyConstraint("dataset", "alias")

    op.create_table(
        "aliases",
        *key_columns,
        *timestamp_columns,
        version_fk,
        primary_key,
    )


def downgrade():
    """Revert this revision by dropping the ``aliases`` table."""
    op.drop_table("aliases")
19 changes: 19 additions & 0 deletions app/models/pydantic/aliases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from .base import StrictBaseModel
from .responses import Response


class Alias(StrictBaseModel):
    # Alias record: the alias name, the version it refers to, and the
    # dataset it belongs to.
    alias: str
    version: str
    dataset: str

    class Config:
        # Enables pydantic's ORM mode for this model.
        orm_mode = True


class AliasResponse(Response):
    # Response whose ``data`` payload is a single Alias.
    data: Alias


class AliasCreateIn(StrictBaseModel):
    # Input model for creating an alias; its only field is the version.
    version: str
28 changes: 27 additions & 1 deletion app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from fastapi import Depends, HTTPException, Path
from fastapi.security import OAuth2PasswordBearer

from ..crud.aliases import get_alias
from ..crud.versions import get_version
from ..errors import RecordNotFoundError

Expand Down Expand Up @@ -44,6 +45,31 @@ async def dataset_version_dependency(
try:
await get_version(dataset, version)
except RecordNotFoundError as e:
raise HTTPException(status_code=404, detail=(str(e)))
try:
version_alias = await get_alias(dataset, version)
if version_alias is not None:
raise HTTPException(
status_code=400,
detail="Getting version by alias is not supported for this operation.",
)
except RecordNotFoundError:
raise HTTPException(status_code=404, detail=str(e))

return dataset, version


async def create_dataset_version_dependency(
    dataset: str = Depends(dataset_dependency),
    version: str = Depends(version_dependency),
) -> Tuple[str, str]:
    """Reject a new version name that collides with an existing alias.

    Returns:
        The ``(dataset, version)`` pair when no alias of that name is found.

    Raises:
        HTTPException: Status 400 when an alias named ``version`` already
            exists for ``dataset``.
    """
    try:
        existing_alias = await get_alias(dataset, version)
    except RecordNotFoundError:
        # No alias under this name, so the version name is free to use.
        existing_alias = None

    if existing_alias is not None:
        raise HTTPException(
            status_code=400,
            detail="Conflicts with existing version alias and can not overwrite it.",
        )

    return dataset, version
Loading

0 comments on commit 6f381a8

Please sign in to comment.