Skip to content

Commit

Permalink
API Import: organization mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
LesterLyu committed Oct 22, 2024
1 parent 0c5357d commit 25a4f7a
Show file tree
Hide file tree
Showing 13 changed files with 580 additions and 139 deletions.
2 changes: 1 addition & 1 deletion ckanext/udc_import_other_portals/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def job_run_import(import_config_id: str, run_by: str, job_id: str):
logic.ValidationError("import_config_id should be provided.")
)

import_config = CUDCImportConfig.get(import_config_id)
import_config: 'CUDCImportConfig' = CUDCImportConfig.get(import_config_id)

if not import_config:
raise logger.exception(
Expand Down
112 changes: 90 additions & 22 deletions ckanext/udc_import_other_portals/logic/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from ckanext.udc_import_other_portals.logger import ImportLogger, generate_trace
from ckanext.udc_import_other_portals.model import CUDCImportConfig
from ckanext.udc_import_other_portals.worker.socketio_client import SocketClient
import ckan.plugins.toolkit as toolkit
from ckan.types import Context
Expand All @@ -7,19 +8,21 @@
from ckan.common import current_user
from ckan.lib.search.common import SearchIndexError

import threading
from typing import List, Dict, cast
from .deduplication import find_duplicated_packages, process_duplication

import logging

base_logger = logging.getLogger(__name__)

lock = threading.Lock()

class ImportError(ValueError):
pass


def get_package(context: Context, package_id: str=None, package_name: str=None):
def get_package(context: Context, package_id: str = None, package_name: str = None):
if not package_id and not package_name:
raise ValueError("Either package_id or package_name should be provided.")
if package_id and package_name:
Expand All @@ -30,15 +33,15 @@ def get_package(context: Context, package_id: str=None, package_name: str=None):
data_dict = {"name": package_name}
logic.check_access("package_show", context, data_dict=data_dict)
package_dict = logic.get_action("package_show")(context, data_dict)

# Prevent the package with the same name but different id (the provided id is treated as a name)
if package_id and package_dict["id"] != package_id:
raise ValueError(f"Package id={package_id} is not found.")

return package_dict


def check_existing_package_id_or_name(context, id: str=None, name: str=None):
def check_existing_package_id_or_name(context, id: str = None, name: str = None):
if not id and not name:
raise ValueError("Either id or name should be provided.")
if id and name:
Expand Down Expand Up @@ -92,6 +95,32 @@ def delete_package(context: Context, package_id: str):
logic.get_action("package_delete")(context, {"id": package_id})


def get_organization(context: Context, organization_id: str = None):
data_dict = {"id": organization_id}
logic.check_access("organization_show", context, data_dict=data_dict)
return logic.get_action("organization_show")(context, data_dict)


def get_organization_ids(context: Context):
logic.check_access("organization_list", context)
return logic.get_action("organization_list")(context)


def ensure_organization(context: Context, organization: dict):
with lock:
if not context:
# testing environment, do not create organization
return
logic.check_access("organization_list", context)
organization_ids = logic.get_action("organization_list")(context)
for organization_id in organization_ids:
if organization_id == organization["id"]:
return

logic.check_access("organization_create", context, data_dict=organization)
logic.get_action("organization_create")(context, organization)


class BaseImport:
"""
Abstract class that manages logging and provides interface to backend APIs
Expand All @@ -102,7 +131,7 @@ class BaseImport:
running = False
socket_client: SocketClient = None

def __init__(self, context, import_config, job_id):
def __init__(self, context, import_config: "CUDCImportConfig", job_id):
self.context = context
self.import_config = import_config
self.job_id = job_id
Expand All @@ -123,7 +152,7 @@ def build_context(self):
)
return context

def map_to_cudc_package(self, src: dict):
def map_to_cudc_package(self, src: dict, target: dict):
"""
Map source package to cudc package.
Expand All @@ -142,8 +171,47 @@ def process_package(self, src):
Returns:
str: The ID of the mapped package.
"""
# Some defaults
target = {
"owner_org": self.import_config.owner_org,
"type": "catalogue",
"license_id": "notspecified",
}
platform = self.import_config.platform
if platform == "ckan":
org_import_mode = self.import_config.other_config.get("org_import_mode")
org_mapping = self.import_config.other_config.get("org_mapping") or {}

if org_import_mode == "importToOwnOrg":
if org_mapping.get(src["owner_org"]):
target["owner_org"] = org_mapping[src["owner_org"]]
else:
# Use the same organization id
target["owner_org"] = src["owner_org"]


query = (
model.Session.query(model.Group)
.filter(model.Group.id == src["owner_org"])
.filter(model.Group.is_organization == True)
)
org = query.first()

# Create the organization if not exists
if org is None:
ensure_organization(
self.build_context(),
{
"id": src["organization"]["id"],
"name": src["organization"]["name"],
"title": src["organization"]["title"],
"description": src["organization"]["description"],
},
)
model.Session.commit()

try:
mapped = self.map_to_cudc_package(src)
mapped = self.map_to_cudc_package(src, target)
except Exception as e:
self.logger.error(f"ERROR: Failed to map package from source.")
self.logger.exception(e)
Expand Down Expand Up @@ -260,19 +328,19 @@ def run_imports(self):

def ensure_license(context, license_id, license_title, license_url, check=True):
"""Ensure that the license exists in the database."""
if not context:
# tesing environment, do not create license
return
licenses = logic.get_action("licenses_get")(context)
for license in licenses:
if license["id"] == license_id:
with lock:
if not context:
# tesing environment, do not create license
return
try:
logic.get_action("license_create")(
context, {"id": license_id, "title": license_title, "url": license_url}
)
except:
# Weird concurrency issue
pass
return

licenses = logic.get_action("licenses_get")(context)
for license in licenses:
if license["id"] == license_id:
return
try:
logic.get_action("license_create")(
context, {"id": license_id, "title": license_title, "url": license_url}
)
except:
# Weird concurrency issue
pass
return
37 changes: 36 additions & 1 deletion ckanext/udc_import_other_portals/logic/ckan_based/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def get_package(package_id, base_api, api_key=None):
return res["result"]


def get_all_packages(base_api, size=None, api_key=None):
def get_all_packages(base_api, size=None, api_key=None, cb=None):
"""
Retrieve all packages from the CKAN API using the package_search endpoint.
Expand All @@ -45,6 +45,8 @@ def get_all_packages(base_api, size=None, api_key=None):
# Construct the API request URL
url = f"{base_api}/3/action/package_search?rows={rows}&start={offset}"
print(f"getting package rows={rows} offset={offset}")
if cb:
cb(f"Got {offset} packages")

try:
# Make the API request
Expand Down Expand Up @@ -138,3 +140,36 @@ def check_site_alive(base_api):
return res["result"]
except:
return False

def get_organization(base_api, organization_id=None):
res = requests.get(f"{base_api}/3/action/organization_show?id={organization_id}").json()
return res["result"]

def get_organization_ids(base_api):
res = requests.get(f"{base_api}/3/action/organization_list").json()
return res["result"]

def get_organizations(base_api):
"""
Example response:
[
{
"approval_status": "approved",
"created": "2018-07-27T18:51:10.451359",
"description": "",
"display_name": "Argentia Private Investments Inc. | Argentia Private Investments Inc.",
"id": "76287b5c-ceb0-44fb-a62f-3cd4ee5de656",
"image_display_url": "",
"image_url": "",
"is_organization": true,
"name": "api",
"num_followers": 0,
"package_count": 0,
"state": "active",
"title": "Argentia Private Investments Inc. | Argentia Private Investments Inc.",
"type": "organization"
},
]
"""
res = requests.get(f"{base_api}/3/action/organization_list?all_fields=true&limit=1000").json()
return res["result"]
12 changes: 7 additions & 5 deletions ckanext/udc_import_other_portals/logic/ckan_based/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import traceback
import logging
from ckanext.udc_import_other_portals.model import CUDCImportConfig
from ckanext.udc_import_other_portals.worker.socketio_client import SocketClient
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
Expand All @@ -18,9 +19,9 @@ class CKANBasedImport(BaseImport):
Abstract class for imports
"""

def __init__(self, context, import_config, job_id, base_api):
def __init__(self, context, import_config: 'CUDCImportConfig', job_id: str):
super().__init__(context, import_config, job_id)
self.base_api = base_api
self.base_api = import_config.other_config.get("base_api")

def iterate_imports(self):
"""
Expand All @@ -35,18 +36,19 @@ def run_imports(self):
"""
self.running = True
self.socket_client = SocketClient(self.job_id)
self.all_packages = get_all_packages(self.base_api)
self.logger = ImportLogger(base_logger, 0, self.socket_client)

self.all_packages = get_all_packages(self.base_api, cb=lambda x: self.logger.info(x))
self.packages_ids = [p['id'] for p in self.all_packages]
# Set the import size for reporting in the frontend
self.import_size = len(self.packages_ids)
self.logger.total = self.import_size = len(self.packages_ids)

# Make sure the sockeio server is connected
while not self.socket_client.registered:
time.sleep(0.2)
base_logger.info("Waiting socketio to be connected.")
base_logger.info("socketio connected.")
print("self.import_size", self.import_size)
self.logger = ImportLogger(base_logger, self.import_size, self.socket_client)

# Check if packages are deleted from the remote since last import
if self.import_config.other_data is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,8 @@
"XML": "application/xml",
}


# "https://ckan0.cf.opendata.inter.prod-toronto.ca/api"
class CityOfTorontoImport(CKANBasedImport):
def __init__(self, context, import_config, job_id):
super().__init__(
context,
import_config,
job_id,
# City of Toronto URL
"https://ckan0.cf.opendata.inter.prod-toronto.ca/api",
)

def iterate_imports(self):
"""
Expand All @@ -82,7 +74,7 @@ def iterate_imports(self):

yield package

def map_to_cudc_package(self, src: dict):
def map_to_cudc_package(self, src: dict, target: dict):
"""
Map source package to cudc package.
Expand All @@ -93,13 +85,6 @@ def map_to_cudc_package(self, src: dict):
from datetime import datetime
import re

# Default fields in CUDC
target = {
"owner_org": self.import_config.owner_org,
"type": "catalogue",
"license_id": "notspecified",
}

global package_mapping

# One-to-one Mapping
Expand Down
Loading

0 comments on commit 25a4f7a

Please sign in to comment.