feat: apply compatibility changes to work with Splitgill vNext
BREAKING CHANGE: apply compatibility changes to work with Splitgill vNext

This commit contains a lot of changes. The major ones are to ingest and sync as they relate to Splitgill. Additionally, IDs no longer have to be integers; they are now strings. There is also a lot of refactoring in this commit covering querying, actions, queued tasks, and a number of other more minor areas. The download code is mostly left as is, with a few minor alterations.
jrdh committed Sep 16, 2024
1 parent 573c1c5 commit ec97591
Showing 127 changed files with 5,479 additions and 7,570 deletions.
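
To make the ID change above concrete: record IDs no longer have to be integers and are passed to the renamed actions as plain strings. Below is a minimal, hypothetical sketch of looking up the latest version of a single record via the vds_version_record action that helpers.py switches to further down; the resource and record IDs are made up, and the call only works inside a configured CKAN instance with this plugin loaded.

from ckan.plugins import toolkit

# illustrative IDs only; record IDs are now arbitrary strings, not integers
resource_id = "a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d"
record_id = "specimen-0001"

# vds_version_record returns the record's versions in ascending order
versions = toolkit.get_action("vds_version_record")(
    {}, {"resource_id": resource_id, "record_id": record_id}
)
print(f"latest version of {record_id}: {versions[-1]}")
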
60 changes: 35 additions & 25 deletions ckanext/versioned_datastore/cli.py
@@ -1,14 +1,19 @@
+from typing import Tuple
+
 import click

 from ckan.plugins import toolkit
-from .model.details import datastore_resource_details_table
-from .model.downloads import (
+from ckanext.versioned_datastore.model.details import datastore_resource_details_table
+from ckanext.versioned_datastore.model.downloads import (
     datastore_downloads_requests_table,
     datastore_downloads_derivative_files_table,
     datastore_downloads_core_files_table,
 )
-from .model.slugs import datastore_slugs_table, navigational_slugs_table
-from .model.stats import import_stats_table
+from ckanext.versioned_datastore.model.slugs import (
+    datastore_slugs_table,
+    navigational_slugs_table,
+)
+from ckanext.versioned_datastore.model.stats import import_stats_table


 def get_commands():
@@ -23,7 +28,7 @@ def versioned_datastore():
     pass


-@versioned_datastore.command(name='initdb')
+@versioned_datastore.command(name="initdb")
 def init_db():
     """
     Ensure the tables needed by this plugin exist.
@@ -41,51 +46,56 @@ def init_db():
     for table in tables:
         if not table.exists():
             table.create()
-    click.secho('Finished creating tables', fg='green')
+    click.secho("Finished creating tables", fg="green")


 @versioned_datastore.command()
-@click.option('-r', '--resource_id', 'resource_ids', multiple=True)
-def reindex(resource_ids):
+@click.option("-r", "--resource_id", "resource_ids", multiple=True)
+@click.option(
+    "--full",
+    is_flag=True,
+    show_default=True,
+    default=False,
+    help="Completely resync all data from MongoDB to Elasticsearch for the resource(s)",
+)
+def reindex(resource_ids: Tuple[str], full: bool = False):
     """
     Reindex either a specific resource or all resources.
     """
     ids = set()
-    context = {'ignore_auth': True}
+    context = {"ignore_auth": True}

     if resource_ids:
         # the user has specified some resources to reindex
         ids.update(resource_ids)
     else:
-        # the user hasn't specified the resources to reindex so we should get a list of all
-        # resources in the system
-        data_dict = {'query': 'name:', 'offset': 0, 'limit': 100}
+        # the user hasn't specified the resources to reindex, so we should get a list of
+        # all resources in the system
+        data_dict = {"query": "name:", "offset": 0, "limit": 100}
         while True:
-            result = toolkit.get_action('resource_search')(context, data_dict)
-            if len(result['results']) > 0:
-                ids.update(resource['id'] for resource in result['results'])
-                data_dict['offset'] += data_dict['limit']
+            result = toolkit.get_action("resource_search")(context, data_dict)
+            if len(result["results"]) > 0:
+                ids.update(resource["id"] for resource in result["results"])
+                data_dict["offset"] += data_dict["limit"]
             else:
                 break

     if not ids:
-        click.secho('No resources found to reindex', fg='green')
+        click.secho("No resources found to reindex", fg="green")
         return

-    click.secho(f'Found {len(ids)} resources to reindex', fg='yellow')
+    click.secho(f"Found {len(ids)} resources to reindex", fg="yellow")

     for resource_id in sorted(ids):
         try:
-            result = toolkit.get_action('datastore_reindex')(
-                context, {'resource_id': resource_id}
+            result = toolkit.get_action("vds_data_sync")(
+                context, {"resource_id": resource_id, "full": full}
             )
             click.secho(
-                f'Queued reindex of {resource_id} as job {result["job_id"]}', fg='cyan'
+                f'Queued reindex of {resource_id} as job {result["job_id"]}', fg="cyan"
             )
         except toolkit.ValidationError as e:
             click.secho(
-                f'Failed to reindex {resource_id} due to validation error: {e}',
-                fg='red',
+                f"Failed to reindex {resource_id} due to validation error: {e}",
+                fg="red",
             )

     click.secho('Reindexing complete', fg='green')
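
For reference, the CLI change above means each queued reindex now goes through the new vds_data_sync action rather than datastore_reindex. A minimal sketch of the equivalent direct call (assuming a configured CKAN context; the resource ID is made up, and full=True requests the complete MongoDB-to-Elasticsearch resync described by the --full help text):

from ckan.plugins import toolkit

context = {"ignore_auth": True}
resource_id = "a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d"  # illustrative only

# queue a full resync of this resource's data from MongoDB to Elasticsearch
result = toolkit.get_action("vds_data_sync")(
    context, {"resource_id": resource_id, "full": True}
)
print(f"queued as job {result['job_id']}")
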
73 changes: 37 additions & 36 deletions ckanext/versioned_datastore/helpers.py
@@ -1,10 +1,11 @@
 import json
-from ckan.plugins import toolkit
 from datetime import date

-from .lib.common import ALL_FORMATS
-from .lib.importing import stats
-from .lib.query.slugs import create_nav_slug
+from ckan.plugins import toolkit
+from ckanext.versioned_datastore.lib.common import ALL_FORMATS
+from ckanext.versioned_datastore.lib.query.search.query import SchemaQuery
+from ckanext.versioned_datastore.lib.query.slugs.slugs import create_nav_slug
+from ckanext.versioned_datastore.model import stats


 def is_duplicate_ingestion(stat):
@@ -18,7 +19,7 @@ def is_duplicate_ingestion(stat):
     :param stat: the ImportStats object
     :return: True if the error on this stat is a duplicate ingestion error, False if not
     """
-    return stat.error and 'this file has been ingested before' in stat.error.lower()
+    return stat.error and "this file has been ingested before" in stat.error.lower()


 def get_human_duration(stat):
@@ -33,11 +34,11 @@ def get_human_duration(stat):
     :return: a nicely formatted duration string
     """
     if stat.duration < 60:
-        return toolkit._(f'{stat.duration:.2f} seconds')
+        return toolkit._(f"{stat.duration:.2f} seconds")
     elif stat.duration < 60 * 60:
-        return toolkit._(f'{stat.duration / 60:.0f} minutes')
+        return toolkit._(f"{stat.duration / 60:.0f} minutes")
     else:
-        return toolkit._(f'{stat.duration / (60 * 60):.0f} hours')
+        return toolkit._(f"{stat.duration / (60 * 60):.0f} hours")


 def get_stat_icon(stat):
@@ -51,21 +52,21 @@ def get_stat_icon(stat):
     """
     if stat.in_progress:
         # a spinner, that spins
-        return 'fa-spinner fa-pulse'
+        return "fa-spinner fa-pulse"
     if stat.error:
         if is_duplicate_ingestion(stat):
             # we don't want this to look like an error
-            return 'fa-copy'
-        return 'fa-exclamation'
+            return "fa-copy"
+        return "fa-exclamation"

     if stat.type == stats.INGEST:
-        return 'fa-tasks'
+        return "fa-tasks"
     if stat.type == stats.INDEX:
-        return 'fa-search'
+        return "fa-search"
     if stat.type == stats.PREP:
-        return 'fa-cogs'
+        return "fa-cogs"
     # shouldn't get here, just use some default tick thing
-    return 'fa-check'
+    return "fa-check"


 def get_stat_activity_class(stat):
@@ -78,11 +79,11 @@ def get_stat_activity_class(stat):
     :return: a string
     """
     if stat.in_progress:
-        return 'in_progress'
+        return "in_progress"
     if stat.error:
         if is_duplicate_ingestion(stat):
-            return 'duplicate'
-        return 'failure'
+            return "duplicate"
+        return "failure"
     return stat.type


@@ -95,11 +96,11 @@ def get_stat_title(stat):
     :return: the title for the activity item as a unicode string
     """
     if stat.type == stats.INGEST:
-        return toolkit._('Ingested new resource data')
+        return toolkit._("Ingested new resource data")
     if stat.type == stats.INDEX:
-        return toolkit._('Updated search index with resource data')
+        return toolkit._("Updated search index with resource data")
     if stat.type == stats.PREP:
-        return toolkit._('Validated and prepared the data for ingestion')
+        return toolkit._("Validated and prepared the data for ingestion")
     return stat.type


@@ -140,26 +141,26 @@ def latest_item_version(resource_id, record_id=None):
     :param record_id: optional; a record id to search for instead
     :return: if record id is provided, the latest record version, else the latest resource version
     """
-    action = (
-        'datastore_get_record_versions'
-        if record_id
-        else 'datastore_get_resource_versions'
-    )
-    data_dict = {'resource_id': resource_id}
     if record_id:
-        data_dict['id'] = record_id
+        action = "vds_version_record"
+        data_dict = {"resource_id": resource_id, "record_id": record_id}
+    else:
+        action = "vds_version_resource"
+        data_dict = {"resource_id": resource_id}

-    versions = toolkit.get_action(action)({}, data_dict)
-    return versions[-1]
+    result = toolkit.get_action(action)({}, data_dict)
+
+    if record_id:
+        # we get back a list of versions in ascending order
+        return result[-1]
+    else:
+        # we get back a dict of versions and counts
+        return max(result.keys())


-def nav_slug(
-    query=None, version=None, resource_ids=None, resource_ids_and_versions=None
-):
+def nav_slug(query=None, version=None, resource_ids=None):
     """
     Just a helper proxy for create_nav_slug.
     """
-    is_new, slug = create_nav_slug(
-        {}, query or {}, version, resource_ids, resource_ids_and_versions
-    )
+    is_new, slug = create_nav_slug(SchemaQuery(resource_ids, version, query))
     return slug.get_slug_string()
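
As the comments in the rewritten latest_item_version note, the two new version actions return differently shaped results, so the helper picks the latest version in two different ways. A small self-contained sketch of that distinction, using made-up version values:

# vds_version_record: a list of versions in ascending order
record_versions = [1688169600000, 1700000000000, 1726000000000]
latest_record_version = record_versions[-1]

# vds_version_resource: a dict of versions and counts
resource_versions = {1688169600000: 120, 1700000000000: 150, 1726000000000: 149}
latest_resource_version = max(resource_versions.keys())

print(latest_record_version, latest_resource_version)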