diff --git a/.buildinfo b/.buildinfo new file mode 100644 index 0000000..ac2ed63 --- /dev/null +++ b/.buildinfo @@ -0,0 +1,4 @@ +# Sphinx build info version 1 +# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done. +config: e0708c73fdf2c3ea84bd6deec14f47f3 +tags: 645f666f9bcd5a90fca523b33c5a78b7 diff --git a/.nojekyll b/.nojekyll new file mode 100644 index 0000000..e69de29 diff --git a/_images/activity.drawio.svg b/_images/activity.drawio.svg new file mode 100644 index 0000000..9e05ebe --- /dev/null +++ b/_images/activity.drawio.svg @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/_modules/index.html b/_modules/index.html new file mode 100644 index 0000000..763237c --- /dev/null +++ b/_modules/index.html @@ -0,0 +1,121 @@ + + +
+ + +
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from mardi_importer.importer.Importer import ADataSource, ImporterException
+from mardi_importer.integrator import MardiIntegrator
+from .RPackage import RPackage
+
+import pandas as pd
+import time
+import json
+import os
+import logging
+log = logging.getLogger('CRANlogger')
+
+
+[docs]
class CRANSource(ADataSource):
    """Processes data from the Comprehensive R Archive Network.

    Metadata for each R package is scraped from the CRAN repository. The
    Wikibase item corresponding to each R package is subsequently updated
    or, for a new package, created.

    Attributes:
        integrator (MardiIntegrator):
            Client used to create and look up properties and items.
        filepath (str):
            Directory containing this module; the entity definition files
            are read from it.
        packages (Pandas dataframe):
            Dataframe with **package name**, **title** and **date of
            publication** for each package in CRAN. Holds an empty string
            until :meth:`pull` has been called.
    """

    def __init__(self):
        self.integrator = MardiIntegrator()
        self.filepath = os.path.realpath(os.path.dirname(__file__))
        self.packages = ""

    def setup(self):
        """Create all necessary properties and entities for CRAN."""
        # Import entities from Wikidata
        filename = os.path.join(self.filepath, "wikidata_entities.txt")
        self.integrator.import_entities(filename=filename)

        # Create new required local entities
        self.create_local_entities()

    def create_local_entities(self):
        """Create the properties and items listed in ``new_entities.json``.

        Each entry is only written when ``exists()`` reports that no
        matching entity is present yet.
        """
        filename = os.path.join(self.filepath, "new_entities.json")
        # The context manager closes the handle even if parsing fails.
        with open(filename, encoding="utf-8") as f:
            entities = json.load(f)

        for prop_element in entities['properties']:
            prop = self.integrator.property.new()
            prop.labels.set(language='en', value=prop_element['label'])
            prop.descriptions.set(language='en', value=prop_element['description'])
            prop.datatype = prop_element['datatype']
            if not prop.exists():
                prop.write()

        for item_element in entities['items']:
            item = self.integrator.item.new()
            item.labels.set(language='en', value=item_element['label'])
            item.descriptions.set(language='en', value=item_element['description'])
            for key, value in item_element['claims'].items():
                item.add_claim(key, value=value)
            if not item.exists():
                item.write()

    def pull(self):
        """Reads **date**, **package name** and **title** from the CRAN Repository URL.

        The result is saved as a pandas dataframe in the attribute **packages**.

        Returns:
            Pandas dataframe: Attribute ``packages``

        Raises:
            ImporterException: If table at the CRAN url cannot be accessed or read.
        """
        url = r"https://cran.r-project.org/web/packages/available_packages_by_date.html"

        try:
            tables = pd.read_html(url)  # Returns list of all tables on page
        except Exception as e:
            raise ImporterException(
                "Error attempting to read table from CRAN url\n{}".format(e)
            ) from e

        self.packages = tables[0]
        return self.packages

    def push(self):
        """Updates the MaRDI Wikibase entities corresponding to R packages.

        For each **package name** in the attribute **packages** checks
        if the date in CRAN coincides with the date in the MaRDI
        knowledge graph. If not, the package is updated. If the package
        is not found in the MaRDI knowledge graph, the corresponding
        item is created.

        It creates a :class:`mardi_importer.cran.RPackage` instance
        for each package.

        Raises:
            ImporterException: If :meth:`pull` has not populated
                ``packages`` yet.
        """
        if not isinstance(self.packages, pd.DataFrame):
            raise ImporterException(
                "No CRAN package table available: call pull() before push()"
            )

        # To test on a subset, slice here, e.g.:
        # self.packages = self.packages.loc[:100, :]

        for _, row in self.packages.iterrows():
            package_date = row["Date"]
            package_label = row["Package"]
            package_title = row["Title"]

            package = RPackage(package_date, package_label, package_title, self.integrator)
            if package.exists():
                if not package.is_updated():
                    print(f"Package {package_label} found: Not up to date. Attempting update...")
                    package.update()
                else:
                    print(f"Package {package_label} found: Already up to date.")
            else:
                print(f"Package {package_label} not found: Attempting item creation...")
                package.create()

            # Pause between packages (presumably to throttle requests)
            time.sleep(2)
+
+
+
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from mardi_importer.integrator import MardiIntegrator, MardiItemEntity
+from mardi_importer.publications import (ArxivPublication,
+ CrossrefPublication,
+ ZenodoResource,
+ Author)
+from wikibaseintegrator.wbi_enums import ActionIfExists
+from wikibaseintegrator.wbi_helpers import search_entities, remove_claims
+
+from dataclasses import dataclass, field
+from typing import Optional, Dict, List, Tuple
+from io import StringIO
+
+from bs4 import BeautifulSoup
+import pandas as pd
+import requests
+import re
+
+import logging
+log = logging.getLogger('CRANlogger')
+
+
+[docs]
@dataclass
class RPackage:
    """Class to manage R package items in the local Wikibase instance.

    Attributes:
        date:
            Date of publication
        label:
            Package name
        description:
            Title of the R package
        api:
            MaRDI integrator used to read and write entities
        long_description:
            Detailed description of the R package
        url:
            URL to the CRAN repository
        version:
            Version of the R package
        versions:
            Previously published versions as (version, publication date)
        authors:
            Author(s) of the package
        license_data:
            Software licenses as (license QID, qualifier)
        dependencies:
            Dependencies on R and other packages as (QID, version)
        imports:
            Imported R packages as (QID, version)
        maintainer:
            Software maintainer
        author_pool:
            Every author seen while parsing the package and its
            publications; used for disambiguation
        crossref_publications:
            Publications referenced through a DOI
        arxiv_publications:
            Publications referenced through an arXiv identifier
        zenodo_resources:
            Resources referenced through a Zenodo identifier
        _QID:
            Package QID
        _item:
            Cached integrator item for the package
    """
    date: str
    label: str
    description: str
    api: MardiIntegrator
    long_description: str = ""
    url: str = ""
    version: str = ""
    versions: List[Tuple[str, str]] = field(default_factory=list)
    authors: List[Author] = field(default_factory=list)
    license_data: List[Tuple[str, str]] = field(default_factory=list)
    dependencies: List[Tuple[str, str]] = field(default_factory=list)
    imports: List[Tuple[str, str]] = field(default_factory=list)
    # Starts as "" and is replaced by the value produced by
    # parse_maintainer() once the CRAN metadata has been pulled.
    maintainer: str = ""
    author_pool: List[Author] = field(default_factory=list)
    crossref_publications: List[CrossrefPublication] = field(default_factory=list)
    arxiv_publications: List[ArxivPublication] = field(default_factory=list)
    zenodo_resources: List[ZenodoResource] = field(default_factory=list)
    _QID: str = ""
    # None until the item is first requested through the `item` property.
    _item: Optional[MardiItemEntity] = None
+
+ @property
+ def QID(self) -> str:
+ """Return the QID of the R package in the knowledge graph.
+
+ Searches for an item with the package label in the Wikibase
+ SQL tables and returns the QID if a matching result is found.
+
+ Returns:
+ str: The entity QID representing the R package.
+ """
+ self._QID = self._QID or self.item.is_instance_of('wd:Q73539779')
+ return self._QID
+
+ @property
+ def item(self) -> MardiItemEntity:
+ """Return the integrator Item representing the R package.
+
+ Adds also the label and description of the package.
+
+ Returns:
+ MardiItemEntity: Integrator item
+ """
+ if not self._item:
+ self._item = self.api.item.new()
+ self._item.labels.set(language="en", value=self.label)
+ description = self.description
+ if self.label == self.description:
+ description += " (R Package)"
+ self._item.descriptions.set(
+ language="en",
+ value=description
+ )
+ return self._item
+
+
+[docs]
+ def exists(self) -> str:
+ """Checks if an item corresponding to the R package already exists.
+
+ Returns:
+ str: Entity ID
+ """
+ if self.QID:
+ self._item = self.api.item.get(entity_id=self.QID)
+ return self.QID
+
+
+
+[docs]
+ def is_updated(self) -> bool:
+ """Checks if the Item corresponding to the R package is up to date.
+
+ Compares the last update property in the local knowledge graph with
+ the publication date imported from CRAN.
+
+ Returns:
+ bool: **True** if both dates coincide, **False** otherwise.
+ """
+ return self.date == self.get_last_update()
+
+
+
+[docs]
+ def pull(self):
+ """Imports metadata from CRAN corresponding to the R package.
+
+ Imports **Version**, **Dependencies**, **Imports**m **Authors**,
+ **Maintainer** and **License** and saves them as instance
+ attributes.
+ """
+ self.url = f"https://CRAN.R-project.org/package={self.label}"
+
+ try:
+ page = requests.get(self.url)
+ soup = BeautifulSoup(page.content, 'lxml')
+ except:
+ log.warning(f"Package {self.label} package not found in CRAN.")
+ return None
+ else:
+ if soup.find_all('table'):
+ self.long_description = soup.find_all('p')[0].get_text() or ""
+ self.parse_publications(self.long_description)
+ self.long_description = re.sub("\n", "", self.long_description).strip()
+ self.long_description = re.sub("\t", "", self.long_description).strip()
+
+ table = soup.find_all('table')[0]
+ package_df = self.clean_package_list(table)
+
+ if "Version" in package_df.columns:
+ self.version = package_df.loc[1, "Version"]
+ if "Author" in package_df.columns:
+ self.authors = package_df.loc[1, "Author"]
+ if "License" in package_df.columns:
+ self.license_data = package_df.loc[1, "License"]
+ if "Depends" in package_df.columns:
+ self.dependencies = package_df.loc[1, "Depends"]
+ if "Imports" in package_df.columns:
+ self.imports = package_df.loc[1, "Imports"]
+ if "Maintainer" in package_df.columns:
+ self.maintainer = package_df.loc[1, "Maintainer"]
+
+ self.get_versions()
+ else:
+ log.warning(f"Metadata table not found in CRAN. Package has probably been archived.")
+ return self
+
+
+
+[docs]
+ def create(self) -> None:
+ """Create a package in the Wikibase instance.
+
+ This function pulls the package, inserts its claims, and writes
+ it to the Wikibase instance.
+
+ Returns:
+ None
+ """
+ package = self.pull()
+
+ if package:
+ package = package.insert_claims().write()
+
+ if package:
+ log.info(f"Package created with QID: {package['QID']}.")
+ #print('package created')
+ else:
+ log.info(f"Package could not be created.")
+
+ #print('package not created')
+
+
+[docs]
+ def write(self) -> Optional[Dict[str, str]]:
+ """Write the package item to the Wikibase instance.
+
+ If the item has claims, it will be written to the Wikibase instance.
+ If the item is successfully written, a dictionary with the QID of the
+ item will be returned.
+
+ Returns:
+ Optional[Dict[str, str]]:
+ A dictionary with the QID of the written item if successful,
+ or None otherwise.
+ """
+ if self.item.claims:
+ item = self.item.write()
+ if item:
+ return {'QID': item.id}
+
+
+
+[docs]
+ def insert_claims(self):
+
+ # Instance of: R package
+ self.item.add_claim("wdt:P31", "wd:Q73539779")
+
+ # Programmed in: R
+ self.item.add_claim("wdt:P277", "wd:Q206904")
+
+ # Long description
+ prop_nr = self.api.get_local_id_by_label("description", "property")
+ self.item.add_claim(prop_nr, self.long_description)
+
+ # Last update date
+ self.item.add_claim("wdt:P5017", f"+{self.date}T00:00:00Z")
+
+ # Software version identifiers
+ for version, publication_date in self.versions:
+ qualifier = [self.api.get_claim("wdt:P577", publication_date)]
+ self.item.add_claim("wdt:P348", version, qualifiers=qualifier)
+
+ if self.version:
+ qualifier = [self.api.get_claim("wdt:P577", f"+{self.date}T00:00:00Z")]
+ self.item.add_claim("wdt:P348", self.version, qualifiers=qualifier)
+
+ # Disambiguate Authors and create corresponding Author items
+ self.author_pool = Author.disambiguate_authors(self.author_pool)
+
+ # Authors
+ for author in self.authors:
+ author.pull_QID(self.author_pool)
+ self.item.add_claim("wdt:P50", author.QID)
+
+ # Maintainer
+ self.maintainer.pull_QID(self.author_pool)
+ self.item.add_claim("wdt:P126", self.maintainer.QID)
+
+ # Licenses
+ if self.license_data:
+ claims = self.process_claims(self.license_data, 'wdt:P275', 'wdt:P9767')
+ self.item.add_claims(claims)
+
+ # Dependencies
+ if self.dependencies:
+ claims = self.process_claims(self.dependencies, 'wdt:P1547', 'wdt:P348')
+ self.item.add_claims(claims)
+
+ # Imports
+ if self.imports:
+ prop_nr = self.api.get_local_id_by_label("imports", "property")
+ claims = self.process_claims(self.imports, prop_nr, 'wdt:P348')
+ self.item.add_claims(claims)
+
+ # Related publications and sources
+ cites_work = "wdt:P2860"
+ for publications in [self.crossref_publications, self.arxiv_publications, self.zenodo_resources]:
+ for publication in publications:
+ for author in publication.authors:
+ author.pull_QID(self.author_pool)
+ publication.create()
+ self.item.add_claim(cites_work, publication.QID)
+
+ # CRAN Project
+ self.item.add_claim("wdt:P5565", self.label)
+
+ # Wikidata QID
+ wikidata_QID = self.get_wikidata_QID()
+ if wikidata_QID: self.item.add_claim("Wikidata QID", wikidata_QID)
+
+ return self
+
+
+
+[docs]
+ def update(self):
+ """Updates existing WB item with the imported metadata from CRAN.
+
+ The metadata corresponding to the package is first pulled from CRAN and
+ saved as instance attributes through :meth:`pull`. The statements that
+ do not coincide with the locally saved information are updated or
+ subsituted with the updated information.
+
+ Uses :class:`mardi_importer.wikibase.WBItem` to update the item
+ corresponding to the R package.
+
+ Returns:
+ str: ID of the updated R package.
+ """
+ if self.pull():
+ # Obtain current Authors
+ current_authors = self.item.get_value('wdt:P50')
+ for author_qid in current_authors:
+ author_item = self.api.item.get(entity_id=author_qid)
+ author_label = str(author_item.labels.get('en'))
+ current_author = Author(self.api, name=author_label)
+ current_author._QID = author_qid
+ self.author_pool += [current_author]
+
+ # Disambiguate Authors and create corresponding Author items
+ self.author_pool = Author.disambiguate_authors(self.author_pool)
+
+ # GUID to remove
+ remove_guid = []
+ props_to_delete = ['wdt:P50', 'wdt:P275', 'wdt:P1547', 'imports', 'wdt:P2860']
+ for prop_str in props_to_delete:
+ prop_nr = self.api.get_local_id_by_label(prop_str, 'property')
+ for claim in self.item.claims.get(prop_nr):
+ remove_guid.append(claim.id)
+
+ for guid in remove_guid:
+ remove_claims(guid, login=self.api.login, is_bot=True)
+
+ # Restart item state
+ self.exists()
+
+ if self.item.descriptions.values.get('en') != self.description:
+ description = self.description
+ if self.label == self.description:
+ description += " (R Package)"
+ self.item.descriptions.set(
+ language="en",
+ value=description
+ )
+
+ # Long description
+ self.item.add_claim("description", self.long_description, action="replace_all")
+
+ # Last update date
+ self.item.add_claim("wdt:P5017", f"+{self.date}T00:00:00Z", action="replace_all")
+
+ # Software version identifiers
+ for version, publication_date in self.versions:
+ qualifier = [self.api.get_claim("wdt:P577", publication_date)]
+ self.item.add_claim("wdt:P348", version, qualifiers=qualifier)
+
+ if self.version:
+ qualifier = [self.api.get_claim("wdt:P577", f"+{self.date}T00:00:00Z")]
+ self.item.add_claim("wdt:P348", self.version, qualifiers=qualifier)
+
+ # Authors
+ for author in self.authors:
+ author.pull_QID(self.author_pool)
+ self.item.add_claim("wdt:P50", author.QID)
+
+ # Maintainer
+ self.maintainer.pull_QID(self.author_pool)
+ self.item.add_claim("wdt:P126", self.maintainer.QID, action="replace_all")
+
+ # Licenses
+ if self.license_data:
+ claims = self.process_claims(self.license_data, 'wdt:P275', 'wdt:P9767')
+ self.item.add_claims(claims)
+
+ # Dependencies
+ if self.dependencies:
+ claims = self.process_claims(self.dependencies, 'wdt:P1547', 'wdt:P348')
+ self.item.add_claims(claims)
+
+ # Imports
+ if self.imports:
+ prop_nr = self.api.get_local_id_by_label("imports", "property")
+ claims = self.process_claims(self.imports, prop_nr, 'wdt:P348')
+ self.item.add_claims(claims)
+
+ # Related publications and sources
+ cites_work = "wdt:P2860"
+ for publications in [self.crossref_publications, self.arxiv_publications, self.zenodo_resources]:
+ for publication in publications:
+ for author in publication.authors:
+ author.pull_QID(self.author_pool)
+ publication.create()
+ self.item.add_claim(cites_work, publication.QID)
+
+ # CRAN Project
+ self.item.add_claim("wdt:P5565", self.label, action="replace_all")
+
+ # Wikidata QID
+ wikidata_QID = self.get_wikidata_QID()
+ if wikidata_QID: self.item.add_claim("Wikidata QID", wikidata_QID, action="replace_all")
+
+ package = self.write()
+
+ if package:
+ print(f"Package with QID updated: {package['QID']}.")
+ else:
+ print(f"Package could not be updated.")
+
+
+
+[docs]
+ def process_claims(self, data, prop_nr, qualifier_nr=None):
+
+ claims = []
+ for value, qualifier_value in data:
+ qualifier_prop_nr = (
+ 'wdt:P2699' if qualifier_value.startswith('https') else qualifier_nr
+ )
+ qualifier = (
+ [self.api.get_claim(qualifier_prop_nr, qualifier_value)]
+ if qualifier_value else []
+ )
+ claims.append(self.api.get_claim(prop_nr, value, qualifiers=qualifier))
+ return claims
+
+
+
+[docs]
+ def parse_publications(self, description):
+ """Extracts the DOI identification of related publications.
+
+ Identifies the DOI of publications that are mentioned using the
+ format *doi:* or *arXiv:* in the long description of the
+ R package.
+
+ Returns:
+ List:
+ List containing the wikibase IDs of mentioned publications.
+ """
+ doi_references = re.findall('<doi:(.*?)>', description)
+ arxiv_references = re.findall('<arXiv:(.*?)>', description)
+ zenodo_references = re.findall('<zenodo:(.*?)>', description)
+
+ doi_references = list(map(lambda x: x[:-1] if x.endswith('.') else x, doi_references))
+ arxiv_references = list(map(lambda x: x[:-1] if x.endswith('.') else x, arxiv_references))
+ zenodo_references = list(map(lambda x: x[:-1] if x.endswith('.') else x, zenodo_references))
+
+ crossref_references = []
+
+ for doi in doi_references:
+ doi = doi.strip().lower()
+ if re.search('10.48550/', doi):
+ arxiv_id = doi.replace(":",".")
+ arxiv_id = arxiv_id.replace('10.48550/arxiv.', '')
+ arxiv_references.append(arxiv_id.strip())
+ elif re.search('10.5281/', doi):
+ zenodo_id = doi.replace(":",".")
+ zenodo_id = doi.replace('10.5281/zenodo.', '')
+ zenodo_references.append(zenodo_id.strip())
+ else:
+ crossref_references.append(doi)
+
+ for doi in crossref_references:
+ publication = CrossrefPublication(self.api, doi)
+ self.author_pool += publication.authors
+ self.crossref_publications.append(publication)
+
+ for arxiv_id in arxiv_references:
+ arxiv_id = arxiv_id.replace(":",".")
+ publication = ArxivPublication(self.api, arxiv_id)
+ if publication.title != "Error":
+ self.author_pool += publication.authors
+ self.arxiv_publications.append(publication)
+
+ for zenodo_id in zenodo_references:
+ zenodo_id = zenodo_id.replace(":",".")
+ publication = ZenodoResource(self.api, zenodo_id)
+ self.author_pool += publication.authors
+ self.zenodo_resources.append(publication)
+
+
+
+[docs]
+ def get_last_update(self):
+ """Returns the package last update date saved in the Wikibase instance.
+
+ Returns:
+ str: Last update date in format DD-MM-YYYY.
+ """
+ last_update = self.item.get_value("wdt:P5017")
+ return last_update[0][1:11] if last_update else None
+
+
+
+[docs]
+ def clean_package_list(self, table_html):
+ """Processes raw imported data from CRAN to enable the creation of items.
+
+ - Package dependencies are splitted at the comma position.
+ - License information is processed using the :meth:`parse_license` method.
+ - Author information is processed using the :meth:`parse_authors` method.
+ - Maintainer information is processed using the :meth:`parse_maintainer` method.
+
+ Args:
+ table_html:
+ HTML code obtained with BeautifulSoup corresponding to the table
+ containing the metadata of the R package imported from CRAN.
+ Returns:
+ (Pandas dataframe):
+ Dataframe with processed data from a single R package including columns:
+ **Version**, **Author**, **License**, **Depends**, **Imports**
+ and **Maintainer**.
+ """
+ package_df = pd.read_html(StringIO(str(table_html)))
+ package_df = package_df[0].set_index(0).T
+ package_df.columns = package_df.columns.str[:-1]
+ if "Depends" in package_df.columns:
+ package_df["Depends"] = package_df["Depends"].apply(self.parse_software)
+ if "Imports" in package_df.columns:
+ package_df["Imports"] = package_df["Imports"].apply(self.parse_software)
+ if "License" in package_df.columns:
+ package_df["License"] = package_df["License"].apply(self.parse_license)
+ if "Author" in package_df.columns:
+ package_df["Author"] = str(table_html.find("td", text="Author:").find_next_sibling("td")).replace('\n', '').replace('\r', '')
+ package_df["Author"] = package_df["Author"].apply(self.parse_authors)
+ if "Maintainer" in package_df.columns:
+ package_df["Maintainer"] = package_df["Maintainer"].apply(self.parse_maintainer)
+ return package_df
+
+
+
+[docs]
+ def parse_software(self, software_str: str) -> List[Tuple[str, str]]:
+ """Processes the dependency and import information of each R package.
+
+ This includes:
+ - Extracting the version information of each dependency/import if provided.
+ - Providing the Item QID given the dependency/import label.
+ - Creating a new Item if the dependency/import is not found in the
+ local knowledge graph.
+
+ Returns:
+ List[Tuple[str, str]]:
+ List of tuples including software QID and version.
+ """
+ if pd.isna(software_str):
+ return []
+
+ software_list = str(software_str).split(", ")
+ software_tuples = []
+
+ for software_string in software_list:
+ software_version = re.search("\((.*?)\)", software_string)
+ software_version = software_version.group(1) if software_version else ""
+
+ software_name = re.sub("\(.*?\)", "", software_string).strip()
+
+ # Instance of R package
+ if software_name == "R":
+ # Software = R
+ software_QID = self.api.query("local_id", "Q206904")
+ else:
+ item = self.api.item.new()
+ item.labels.set(language="en", value=software_name)
+ software_id = item.is_instance_of("wd:Q73539779")
+ if software_id:
+ # Software = R package
+ software_QID = software_id
+ else:
+ # Software = New instance of R package
+ item.add_claim("wdt:P31", "wd:Q73539779")
+ item.add_claim("wdt:P277", "wd:Q206904")
+ software_QID = item.write().id
+
+ software_tuples.append((software_QID, software_version))
+
+ return software_tuples
+
+
+
+[docs]
+ def parse_license(self, x: str) -> List[Tuple[str, str]]:
+ """Splits string of licenses.
+
+ Takes into account that licenses are often not uniformly listed.
+ Characters \|, + and , are used to separate licenses. Further
+ details on each license are often included in square brackets.
+
+ The concrete License is identified and linked to the corresponding
+ item that has previously been imported from Wikidata. Further license
+ information, when provided between round or square brackets, is added
+ as a qualifier.
+
+ If a file license is mentioned, the linked to the file license
+ in CRAN is added as a qualifier.
+
+ Args:
+ x (str): String imported from CRAN representing license
+ information.
+
+ Returns:
+ List[Tuple[str, str]]:
+ List of license tuples. Each tuple contains the license QID
+ as the first element and the license qualifier as the
+ second element.
+ """
+ if pd.isna(x):
+ return []
+
+ license_list = []
+ licenses = str(x).split(" | ")
+
+ i = 0
+ while i in range(len(licenses)):
+ if not re.findall(r"\[", licenses[i]) or (
+ re.findall(r"\[", licenses[i]) and re.findall(r"\]", licenses[i])
+ ):
+ license_list.append(licenses[i])
+ i += 1
+ elif re.findall(r"\[", licenses[i]) and not re.findall(
+ r"\]", licenses[i]
+ ):
+ j = i + 1
+ license_aux = licenses[i]
+ closed = False
+ while j < len(licenses) and not closed:
+ license_aux += " | "
+ license_aux += licenses[j]
+ if re.findall(r"\]", licenses[j]):
+ closed = True
+ j += 1
+ license_list.append(license_aux)
+ i = j
+
+ split_list = []
+ for item in license_list:
+ items = item.split(" + ")
+ i = 0
+ while i in range(len(items)):
+ if not re.findall(r"\[", items[i]) or (
+ re.findall(r"\[", items[i]) and re.findall(r"\]", items[i])
+ ):
+ split_list.append(items[i])
+ i += 1
+ elif re.findall(r"\[", items[i]) and not re.findall(r"\]", items[i]):
+ j = i + 1
+ items_aux = items[i]
+ closed = False
+ while j < len(items) and not closed:
+ items_aux += " + "
+ items_aux += items[j]
+ if re.findall(r"\]", items[j]):
+ closed = True
+ j += 1
+ split_list.append(items_aux)
+ i = j
+ license_list = list(dict.fromkeys(split_list))
+
+ license_tuples = []
+ for license_str in license_list:
+ license_qualifier = ""
+ if re.findall(r"\(.*?\)", license_str):
+ qualifier_groups = re.search(r"\((.*?)\)", license_str)
+ license_qualifier = qualifier_groups.group(1)
+ license_aux = re.sub(r"\(.*?\)", "", license_str)
+ if re.findall(r"\[.*?\]", license_aux):
+ qualifier_groups = re.search(r"\[(.*?)\]", license_str)
+ license_qualifier = qualifier_groups.group(1)
+ license_str = re.sub(r"\[.*?\]", "", license_aux)
+ else:
+ license_str = license_aux
+ elif re.findall(r"\[.*?\]", license_str):
+ qualifier_groups = re.search(r"\[(.*?)\]", license_str)
+ license_qualifier = qualifier_groups.group(1)
+ license_str = re.sub(r"\[.*?\]", "", license_str)
+
+ license_str = license_str.strip()
+ if license_str in ["file LICENSE", "file LICENCE"]:
+ license_qualifier = f"https://cran.r-project.org/web/packages/{self.label}/LICENSE"
+
+ license_QID = self.get_license_QID(license_str)
+ license_tuples.append((license_QID, license_qualifier))
+ return license_tuples
+
+
+
+[docs]
+ def parse_authors(self, x):
+ """Splits the string corresponding to the authors into a dictionary.
+
+ Author information in CRAN is not registered uniformly. This function
+ parses the imported string and returns just the names of the individuals
+ that can be unequivocally identified as authors (i.e. they are followed
+ by the *[aut]* abbreviation).
+
+ Generally, authors in CRAN are indicated with the abbreviation *[aut]*.
+ When no abbreviations are included, only the first individual is imported
+ to Wikibase (otherwise it can often not be established whether
+ information after the first author refers to another individual,
+ an institution, a funder, etc.)
+
+ Args:
+ x (String): String imported from CRAN representing author
+ information.
+
+ Returns:
+ (Dict): Dictionary of authors and corresponding ORCID ID, if provided.
+ """
+ td_match = re.match(r'<td>(.*?)</td>', x)
+ if td_match: x = td_match.groups()[0]
+
+ x = re.sub("<img alt.*?a>", "", x) # Delete img tags
+ x = re.sub(r"\(.*?\)", "", x) # Delete text in brackets
+ x = re.sub(r'"', "", x) # Delete quotation marks
+ x = re.sub("\t", "", x) # Delete tabs
+ x = re.sub("ORCID iD", "", x) # Delete orcid id refs
+ author_list = re.findall(r".*?\]", x)
+
+ authors = []
+ if author_list:
+ for author in author_list:
+ labels = re.findall(r"\[.*?\]", author)
+ if labels:
+ is_author = re.findall("aut", labels[0])
+ if is_author:
+ orcid = re.findall(r"\d{4}-\d{4}-\d{4}-.{4}", author)
+ if orcid:
+ orcid = orcid[0]
+ author = re.sub(r"<a href=.*?>", "", author)
+ author = re.sub(r"\[.*?\]", "", author)
+ author = re.sub(r"^\s?,", "", author)
+ author = re.sub(r"^\s?and\s?", "", author)
+ author = re.sub(
+ r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", "", author
+ )
+ author = author.strip()
+ multiple_words = author.split(" ")
+ if len(multiple_words) > 1:
+ if author:
+ authors.append(Author(self.api, author, orcid))
+ else:
+ authors_comma = x.split(", ")
+ authors_and = x.split(" and ")
+ if len(authors_and) > len(authors_comma):
+ author = re.sub(
+ r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", "", authors_and[0]
+ )
+ else:
+ author = re.sub(
+ r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+",
+ "",
+ authors_comma[0],
+ )
+ if len(author.split(" ")) > 5 or re.findall(r"[@\(\)\[\]&]", author):
+ author = ""
+ if author:
+ authors.append(Author(self.api, author))
+ self.author_pool += authors
+ return authors
+
+
+
+[docs]
+ def parse_maintainer(self, name: str) -> str:
+ """Remove unnecessary information from maintainer string.
+
+ Args:
+ x (str): String imported from CRAN which may contain e-mail
+ address and comments within brackets
+
+ Returns:
+ (str): Name of the maintainer
+ """
+ if pd.isna(name): return name
+
+ quotes = re.match(r'"(.*?)"', name)
+ if quotes:
+ name = quotes.groups()[0]
+
+ name = re.sub(r"<.*?>", "", name)
+ name = re.sub(r"\(.*?\)", "", name)
+ name = name.strip()
+ name = name.split(',')
+ maintainer = Author(self.api, name=name[0])
+ self.author_pool += [maintainer]
+ return maintainer
+
+
+
+[docs]
+ def get_license_QID(self, license_str: str) -> str:
+ """Returns the Wikidata item ID corresponding to a software license.
+
+ The same license is often denominated in CRAN using differents names.
+ This function returns the wikidata item ID corresponding to a single
+ unique license that is referenced in CRAN under different names (e.g.
+ *Artistic-2.0* and *Artistic License 2.0* both refer to the same
+ license, corresponding to item *Q14624826*).
+
+ Args:
+ license_str (str): String corresponding to a license imported from CRAN.
+
+ Returns:
+ (str): Wikidata item ID.
+ """
+ def get_license(label: str) -> str:
+ license_item = self.api.item.new()
+ license_item.labels.set(language="en", value=label)
+ return license_item.is_instance_of("wd:Q207621")
+
+ license_mapping = {
+ "ACM": get_license("ACM Software License Agreement"),
+ "AGPL":"wd:Q28130012",
+ "AGPL-3": "wd:Q27017232",
+ "Apache License": "wd:Q616526",
+ "Apache License 2.0": "wd:Q13785927",
+ "Apache License version 1.1": "wd:Q17817999",
+ "Apache License version 2.0": "wd:Q13785927",
+ "Artistic-2.0": "wd:Q14624826",
+ "Artistic License 2.0": "wd:Q14624826",
+ "BSD 2-clause License": "wd:Q18517294",
+ "BSD 3-clause License": "wd:Q18491847",
+ "BSD_2_clause": "wd:Q18517294",
+ "BSD_3_clause": "wd:Q18491847",
+ "BSL": "wd:Q2353141",
+ "BSL-1.0": "wd:Q2353141",
+ "CC0": "wd:Q6938433",
+ "CC BY 4.0": "wd:Q20007257",
+ "CC BY-SA 4.0": "wd:Q18199165",
+ "CC BY-NC 4.0": "wd:Q34179348",
+ "CC BY-NC-SA 4.0": "wd:Q42553662",
+ "CeCILL": "wd:Q1052189",
+ "CeCILL-2": "wd:Q19216649",
+ "Common Public License Version 1.0": "wd:Q2477807",
+ "CPL-1.0": "wd:Q2477807",
+ "Creative Commons Attribution 4.0 International License": "wd:Q20007257",
+ "EPL": "wd:Q1281977",
+ "EUPL": "wd:Q1376919",
+ "EUPL-1.1": "wd:Q1376919",
+ "file LICENCE": get_license("File License"),
+ "file LICENSE": get_license("File License"),
+ "FreeBSD": "wd:Q34236",
+ "GNU Affero General Public License": "wd:Q1131681",
+ "GNU General Public License": "wd:Q7603",
+ "GNU General Public License version 2": "wd:Q10513450",
+ "GNU General Public License version 3": "wd:Q10513445",
+ "GPL": "wd:Q7603",
+ "GPL-2": "wd:Q10513450",
+ "GPL-3": "wd:Q10513445",
+ "LGPL": "wd:Q192897",
+ "LGPL-2": "wd:Q23035974",
+ "LGPL-2.1": "wd:Q18534390",
+ "LGPL-3": "wd:Q18534393",
+ "Lucent Public License": "wd:Q6696468",
+ "MIT": "wd:Q334661",
+ "MIT License": "wd:Q334661",
+ "Mozilla Public License 1.1": "wd:Q26737735",
+ "Mozilla Public License 2.0": "wd:Q25428413",
+ "Mozilla Public License Version 2.0": "wd:Q25428413",
+ "MPL": "wd:Q308915",
+ "MPL version 1.0": "wd:Q26737738",
+ "MPL version 1.1": "wd:Q26737735",
+ "MPL version 2.0": "wd:Q25428413",
+ "MPL-1.1": "wd:Q26737735",
+ "MPL-2.0": "wd:Q25428413",
+ "Unlimited": get_license("Unlimited License"),
+ }
+
+ license_info = license_mapping.get(license_str)
+ if callable(license_info):
+ return license_info()
+ else:
+ return license_info
+
+
+
+[docs]
def get_wikidata_QID(self) -> Optional[str]:
    """Look up the Wikidata QID of this R package.

    Wikidata is searched for entities matching ``self.label``. Each hit
    is fetched and its ``P31`` (instance of) statements are inspected;
    the first hit with a statement pointing to ``Q73539779`` is returned.

    Returns:
        Optional[str]:
            QID of the matching Wikidata entity, or None when no search
            hit carries such a statement.
    """
    wikidata_api = 'https://www.wikidata.org/w/api.php'
    candidates = search_entities(
        search_string=self.label,
        mediawiki_api_url=wikidata_api
    )

    for qid in candidates:
        item = self.api.item.get(
            entity_id=qid,
            mediawiki_api_url=wikidata_api
        )
        if 'P31' not in item.claims.get_json():
            continue
        for statement in item.claims.get('P31') or []:
            mainsnak = statement.get_json()['mainsnak']
            if mainsnak['datatype'] != "wikibase-item":
                continue
            # Statements without a datavalue carry no target to compare
            if 'datavalue' not in mainsnak:
                continue
            # If instance of R package
            if mainsnak['datavalue']['value']['id'] == "Q73539779":
                return qid
    return None
+
+
+
+[docs]
def get_versions(self):
    """Collect archived versions of the package from the CRAN archive page.

    The archive index at
    ``https://cran.r-project.org/src/contrib/Archive/<label>`` is
    downloaded and its first HTML table is parsed. For every row whose
    ``Name`` is a string, the version is obtained by stripping the
    ``<label>_`` prefix and the ``.tar.gz`` suffix, and the tuple
    ``(version, publication_date)`` is appended to ``self.versions``.
    The date is formatted as ``+YYYY-MM-DDT00:00:00Z``.

    A warning is logged and nothing is appended when the page cannot be
    retrieved or parsed. Nothing is appended either when the page
    contains no table.
    """
    url = f"https://cran.r-project.org/src/contrib/Archive/{self.label}"

    try:
        page = requests.get(url)
        soup = BeautifulSoup(page.content, 'lxml')
    except Exception:
        # Best effort: a missing archive page must not abort the import,
        # but KeyboardInterrupt/SystemExit are no longer swallowed.
        log.warning(f"Version page for package {self.label} not found.")
        return

    tables = soup.find_all('table')
    if not tables:
        return

    versions_df = pd.read_html(StringIO(str(tables[0])))[0]
    versions_df = versions_df.drop(columns=['Unnamed: 0', 'Size', 'Description'])
    versions_df = versions_df.drop(index=[0, 1])

    # Package labels may contain regex metacharacters (e.g. "."), and the
    # dots of the file extension must be matched literally.
    prefix_pattern = re.compile(re.escape(f'{self.label}_'))
    suffix_pattern = re.compile(r'\.tar\.gz')

    for _, row in versions_df.iterrows():
        name = row['Name']
        publication_date = row['Last modified']
        if not isinstance(name, str):
            continue
        version = prefix_pattern.sub('', name)
        version = suffix_pattern.sub('', version)

        # Keep only the date part of the "Last modified" column
        publication_date = publication_date.split()[0]
        publication_date = f"+{publication_date}T00:00:00Z"

        self.versions.append((version, publication_date))
+
+
+
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Thu Feb 17 18:53:53 2022
+
+@author: alvaro
+"""
+
+
+
+[docs]
class Importer:
    """Controller that drives the import of one data source into the local Wikibase."""

    def __init__(self, dataSource):
        """Remember the data source to import from.

        Args:
            dataSource: object implementing ADataSource
        """
        self.dataSource = dataSource

    def import_all(self, pull=True, push=True):
        """Run the import: always set up, then optionally pull and push.

        Args:
            pull: when truthy, call ``pull()`` on the data source.
            push: when truthy, call ``push()`` on the data source.
        """
        source = self.dataSource
        source.setup()
        if pull:
            source.pull()
        if push:
            source.push()
+
+
+
+
+
+[docs]
class ADataSource:
    """Abstract base class for readers of external data sources.

    Every method shown here raises ``NotImplementedError`` and is meant
    to be overridden by a concrete data source.
    """

    def write_data_dump(self):
        """Write a data dump from the API.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def push(self):
        """Push the data into the MaRDI knowledge graph.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
+
+
+
+
+
+
+
+
+
+
+
+import re
+import sqlalchemy as db
+from sqlalchemy import and_
+
+from mardiclient import MardiItem, MardiProperty
+from wikibaseintegrator.wbi_exceptions import ModificationFailed
+from wikibaseintegrator.datatypes import ExternalID
+from wikibaseintegrator.wbi_enums import ActionIfExists
+from mardi_importer.importer import ImporterException
+
+
+[docs]
class MardiItemEntity(MardiItem):
    """Item entity with a label-based lookup against the local Wikibase database."""

    def get(self, entity_id, **kwargs):
        """Fetch an item and wrap it in a new ``MardiItemEntity``.

        Args:
            entity_id: ID of the item to fetch.
            **kwargs: Passed through unchanged to the parent ``_get``.

        Returns:
            MardiItemEntity: Entity built from ``entities[entity_id]`` of
            the returned JSON.
        """
        json_data = super(MardiItemEntity, self)._get(entity_id=entity_id, **kwargs)
        return MardiItemEntity(api=self.api).from_json(json_data=json_data['entities'][entity_id])

    def get_QID(self, alias=False):
        """Create a list of QIDs of all items in the local wikibase with
        the same English label.

        Args:
            alias (bool): When true, items whose English alias equals the
                label are appended to the result as well.

        Returns:
            QIDs (list): List of QID strings (``"Q<number>"``), possibly
            empty. The empty string is used as the label when the entity
            has no English label.

        Raises:
            ImporterException: If reading the term tables fails.
        """

        label = ""
        if 'en' in self.labels.values:
            label = self.labels.values['en'].value

        def query_wikidata_table(field_type):
            # field_type = 1 : Label
            # field_type = 2 : Alias
            # see: https://doc.wikimedia.org/Wikibase/REL1_41/php/docs_sql_wbt_type.html
            with self.api.engine.connect() as connection:
                metadata = db.MetaData()
                try:
                    wbt_item_terms = db.Table(
                        "wbt_item_terms", metadata, autoload_with=connection
                    )
                    wbt_term_in_lang = db.Table(
                        "wbt_term_in_lang", metadata, autoload_with=connection
                    )
                    wbt_text_in_lang = db.Table(
                        "wbt_text_in_lang", metadata, autoload_with=connection
                    )
                    wbt_text = db.Table(
                        "wbt_text", metadata, autoload_with=connection
                    )
                    query = (db.select(wbt_item_terms.columns.wbit_item_id)
                            .join(wbt_term_in_lang, wbt_item_terms.columns.wbit_term_in_lang_id == wbt_term_in_lang.columns.wbtl_id)
                            .join(wbt_text_in_lang, wbt_term_in_lang.columns.wbtl_text_in_lang_id == wbt_text_in_lang.columns.wbxl_id)
                            .join(wbt_text, wbt_text.columns.wbx_id == wbt_text_in_lang.columns.wbxl_text_id)
                            .where(and_(wbt_text.columns.wbx_text == bytes(label, "utf-8"),
                                        wbt_term_in_lang.columns.wbtl_type_id == field_type,
                                        wbt_text_in_lang.columns.wbxl_language == bytes("en", "utf-8"))))
                    results = connection.execute(query).fetchall()
                except Exception as e:
                    # Chain the cause so the original traceback is kept
                    raise ImporterException(
                        "Error attempting to read mappings from database\n{}".format(e)
                    ) from e

            return [f"Q{row[0]}" for row in results]

        entity_id = query_wikidata_table(field_type=1)
        if alias:
            entity_id += query_wikidata_table(field_type=2)

        return entity_id
+
+
+
+
+
+[docs]
class MardiPropertyEntity(MardiProperty):
    """Property entity with a label-based lookup against the local Wikibase database."""

    def get(self, entity_id, **kwargs):
        """Fetch a property and wrap it in a new ``MardiPropertyEntity``.

        Args:
            entity_id: ID of the property to fetch.
            **kwargs: Passed through unchanged to the parent ``_get``.

        Returns:
            MardiPropertyEntity: Entity built from ``entities[entity_id]``
            of the returned JSON.
        """
        json_data = super(MardiPropertyEntity, self)._get(entity_id=entity_id, **kwargs)
        return MardiPropertyEntity(api=self.api).from_json(json_data=json_data['entities'][entity_id])

    def get_PID(self):
        """Return the PID of the property with the same English label.

        Returns:
            str or None: ``"P<number>"`` of the first matching row, or
            None when no property has that label. The empty string is
            used as the label when the entity has no English label.

        Raises:
            ImporterException: If reading the term tables fails.
        """

        label = ""
        if 'en' in self.labels.values:
            label = self.labels.values['en'].value

        with self.api.engine.connect() as connection:
            metadata = db.MetaData()
            try:
                wbt_property_terms = db.Table(
                    "wbt_property_terms", metadata, autoload_with=connection
                )
                wbt_term_in_lang = db.Table(
                    "wbt_term_in_lang", metadata, autoload_with=connection
                )
                wbt_text_in_lang = db.Table(
                    "wbt_text_in_lang", metadata, autoload_with=connection
                )
                wbt_text = db.Table(
                    "wbt_text", metadata, autoload_with=connection
                )
                # wbtl_type_id == 1 restricts the match to labels
                query = (db.select(wbt_property_terms.columns.wbpt_property_id)
                        .join(wbt_term_in_lang, wbt_term_in_lang.columns.wbtl_id == wbt_property_terms.columns.wbpt_term_in_lang_id)
                        .join(wbt_text_in_lang, wbt_term_in_lang.columns.wbtl_text_in_lang_id == wbt_text_in_lang.columns.wbxl_id)
                        .join(wbt_text, wbt_text.columns.wbx_id == wbt_text_in_lang.columns.wbxl_text_id)
                        .where(and_(wbt_text.columns.wbx_text == bytes(label, "utf-8"),
                                    wbt_term_in_lang.columns.wbtl_type_id == 1,
                                    wbt_text_in_lang.columns.wbxl_language == bytes("en", "utf-8"))))
                results = connection.execute(query).fetchall()
            except Exception as e:
                # Chain the cause so the original traceback is kept
                raise ImporterException(
                    "Error attempting to read mappings from database\n{}".format(e)
                ) from e

        if results:
            return f"P{results[0][0]}"
        return None
+
+
+
+import os
+import re
+import sqlalchemy as db
+
+from .MardiEntities import MardiItemEntity, MardiPropertyEntity
+from mardiclient import MardiClient
+from wikibaseintegrator import wbi_login
+from wikibaseintegrator.models import Claim, Claims, Qualifiers, Reference
+from wikibaseintegrator.wbi_config import config as wbi_config
+from wikibaseintegrator.wbi_enums import ActionIfExists
+from wikibaseintegrator.wbi_helpers import search_entities, execute_sparql_query
+from wikibaseintegrator.datatypes import (URL, CommonsMedia, ExternalID, Form, GeoShape, GlobeCoordinate, Item, Lexeme, Math, MonolingualText, MusicalNotation, Property, Quantity,
+ Sense, String, TabularData, Time)
+
+
+[docs]
class MardiIntegrator(MardiClient):
    """Client that imports Wikidata entities into the local Wikibase and
    records the Wikidata-to-local ID mapping in the ``wb_id_mapping`` table.
    """

    def __init__(self, languages=None) -> None:
        """Configure login, database engine and helper entities.

        Args:
            languages: Language codes whose labels, descriptions and
                aliases are kept when pulling entities from Wikidata, or
                the string ``"all"`` to skip that filtering. Defaults to
                ``["en", "de"]``.
        """
        super().__init__()
        # A fresh list per instance; a list literal as default argument
        # would be shared between all instances.
        self.languages = ["en", "de"] if languages is None else languages

        # config() flips this flag to False when no credentials are set
        self.setup = True
        self.login = self.config()
        self.engine = self.create_engine()
        self.create_db_table()

        # local id of properties for linking to wikidata PID/QID
        self.wikidata_PID = self.init_wikidata_PID() if self.setup else None
        self.wikidata_QID = self.init_wikidata_QID() if self.setup else None

        self.item = MardiItemEntity(api=self)
        self.property = MardiPropertyEntity(api=self)

        # Wikidata properties whose claims are not converted on import
        self.excluded_properties = [
            'P1151', 'P1855', 'P2139', 'P2302',
            'P2559', 'P2875', 'P3254', 'P3709',
            'P3713', 'P3734', 'P6104', 'P6685',
            'P8093', 'P8979', 'P12861',
        ]
+
+
+[docs]
def config(self):
    """Set up the initial configuration for the integrator.

    When both ``IMPORTER_USER`` and ``IMPORTER_PASS`` are set in the
    environment, the wikibaseintegrator configuration is filled from the
    environment and a login is created. Otherwise ``self.setup`` is set
    to False.

    Returns:
        Clientlogin object, or None when the credentials are missing.
    """
    user = os.environ.get("IMPORTER_USER")
    password = os.environ.get("IMPORTER_PASS")
    if not (user and password):
        self.setup = False
        return None

    env_by_setting = (
        ("USER_AGENT", "IMPORTER_AGENT"),
        ("MEDIAWIKI_API_URL", "MEDIAWIKI_API_URL"),
        ("SPARQL_ENDPOINT_URL", "SPARQL_ENDPOINT_URL"),
        ("WIKIBASE_URL", "WIKIBASE_URL"),
    )
    for setting, env_name in env_by_setting:
        wbi_config[setting] = os.environ.get(env_name)

    return wbi_login.Clientlogin(user=user, password=password)
+
+
+
+[docs]
def create_engine(self):
    """Create the SQLAlchemy engine for the Wikibase database.

    The connection parameters are read from the environment variables
    ``DB_USER``, ``DB_PASS``, ``DB_NAME`` and ``DB_HOST``.

    Returns:
        SQLAlchemy engine, or None when ``self.setup`` is false.
    """
    if not self.setup:
        return None

    db_user, db_pass, db_name, db_host = (
        os.environ[key]
        for key in ("DB_USER", "DB_PASS", "DB_NAME", "DB_HOST")
    )
    url = f"mysql+mysqlconnector://{db_user}:{db_pass}@{db_host}/{db_name}"
    return db.create_engine(url)
+
+
+
+[docs]
def create_id_list_from_file(self, file):
    """Create a list of ids from a file where each id is on its own line.

    Surrounding whitespace is stripped from every line. Blank lines are
    skipped, so a trailing empty line does not produce an empty id.

    Args:
        file: path to file

    Returns: list of ids
    """
    with open(file, "r") as handle:
        stripped_lines = (line.strip() for line in handle)
        return [entry for entry in stripped_lines if entry]
+
+
+
+[docs]
def create_db_table(self):
    """Check if the db table for id mapping is there; if not, create it.

    Nothing happens when no engine is available. The table
    ``wb_id_mapping`` holds one row per imported entity: its Wikidata
    id, its local id (both unique) and the ``has_all_claims`` flag.

    Returns:
        None
    """
    if not self.engine:
        return
    if db.inspect(self.engine).has_table("wb_id_mapping"):
        return

    metadata = db.MetaData()
    # Declaring the table registers it on `metadata`; create_all emits the DDL
    db.Table(
        "wb_id_mapping",
        metadata,
        db.Column("id", db.Integer, primary_key=True),
        db.Column("wikidata_id", db.String(24), nullable=False),
        db.Column("local_id", db.String(24), nullable=False),
        db.Column("has_all_claims", db.Boolean(), nullable=False),
        db.UniqueConstraint("wikidata_id"),
        db.UniqueConstraint("local_id"),
    )
    metadata.create_all(self.engine)
+
+
+
+[docs]
def insert_id_in_db(self, wikidata_id, local_id, has_all_claims):
    """Add one row to the ``wb_id_mapping`` table.

    Args:
        wikidata_id: Wikidata id
        local_id: local Wikibase id
        has_all_claims: Boolean indicating whether the entity has been
            imported with all claims or no claims (i.e. no recurse)

    Returns:
        None
    """
    mapping = db.Table(
        "wb_id_mapping",
        db.MetaData(),
        autoload_with=self.engine
    )
    new_row = {
        "wikidata_id": wikidata_id,
        "local_id": local_id,
        "has_all_claims": has_all_claims,
    }
    statement = mapping.insert().values(**new_row)

    with self.engine.connect() as connection:
        connection.execute(statement)
        connection.commit()
+
+
+
+[docs]
def update_has_all_claims(self, wikidata_id):
    """Mark an entity as imported with all claims.

    Sets ``has_all_claims`` to True in the ``wb_id_mapping`` row whose
    ``wikidata_id`` equals the given id.

    Args:
        wikidata_id: Wikidata id to be updated.

    Returns:
        None
    """
    mapping = db.Table(
        "wb_id_mapping",
        db.MetaData(),
        autoload_with=self.engine
    )
    flagged = mapping.update().values(has_all_claims=True)
    statement = flagged.where(mapping.c.wikidata_id == wikidata_id)

    with self.engine.connect() as connection:
        connection.execute(statement)
        connection.commit()
+
+
+
+[docs]
def init_wikidata_PID(self):
    """Find or create the local "Wikidata PID" property.

    The property links local properties to their ID in Wikidata. It is
    looked up by label and created as an ``external-id`` property when
    the lookup finds nothing.

    Returns
        wikidata_PID (str): wikidata PID property ID
    """
    label = "Wikidata PID"
    existing = self.get_local_id_by_label(label, "property")
    if existing:
        return existing

    prop = self.property.new()
    prop.labels.set(language="en", value=label)
    prop.descriptions.set(
        language="en",
        value="Identifier in Wikidata of the corresponding properties"
    )
    prop.datatype = "external-id"
    return prop.write(login=self.login, as_new=True).id
+
+
+
+[docs]
def init_wikidata_QID(self):
    """Find or create the local "Wikidata QID" property.

    The property links local items to their ID in Wikidata. It is
    looked up by label and created as an ``external-id`` property when
    the lookup finds nothing.

    Returns
        wikidata_QID (str): wikidata QID property ID
    """
    label = "Wikidata QID"
    existing = self.get_local_id_by_label(label, "property")
    if existing:
        return existing

    prop = self.property.new()
    prop.labels.set(language="en", value=label)
    prop.descriptions.set(
        language="en",
        value="Corresponding QID in Wikidata"
    )
    prop.datatype = "external-id"
    return prop.write(login=self.login, as_new=True).id
+
+
+
+[docs]
def import_entities(self, id_list=None, filename="", recurse=True):
    """Import entities from wikidata into the local instance.

    It can accept a single id, a list of ids or a file containing
    the ids to be imported. When neither ``id_list`` nor ``filename``
    is given, nothing is imported and an empty dictionary is returned.

    Args:
        id_list: Single string or list of strings of wikidata
            entity ids. Lexemes not supported.
        filename: Filename containing list of entities to be
            imported. Takes precedence over ``id_list``.
        recurse: Whether to import claims for the entities in
            id_list

    Returns:
        Imported entities (Dict): Dictionary mapping each imported
        wikidata id to its local id. NOTE: when exactly one entity was
        imported, its local id is returned directly instead of a
        dictionary.
    """
    imported_entities = {}
    if filename:
        id_list = self.create_id_list_from_file(filename)
    if isinstance(id_list, str):
        id_list = [id_list]
    if id_list is None:
        # Nothing requested; previously this raised TypeError in the loop
        id_list = []

    for wikidata_id in id_list:

        if wikidata_id.startswith("L"):
            print(
                f"Warning: Lexemes not supported. Lexeme {wikidata_id} was not imported"
            )
            continue

        print(f"importing entity {wikidata_id}")

        has_all_claims = self.query('has_all_claims', wikidata_id)
        if not has_all_claims:
            # API call
            entity = self.get_wikidata_information(
                wikidata_id,
                recurse
            )

            if not entity:
                print(f"No labels for entity with id {wikidata_id}, skipping")
                continue

            if entity.type == "property" and entity.datatype.value in \
                ["wikibase-lexeme", "wikibase-sense", "wikibase-form"]:
                print("Warning: Lexemes not supported. Property skipped")
                continue

            # Check if there is an internal ID redirection in Wikidata
            if wikidata_id != entity.id:
                wikidata_id = entity.id
                has_all_claims = self.query('has_all_claims', wikidata_id)
                if has_all_claims:
                    imported_entities[wikidata_id] = self.query('local_id', wikidata_id)
                    continue

            if recurse:
                self.convert_claim_ids(entity)

            entity.add_linker_claim(wikidata_id)

            # Prefer an entity that already exists locally, then fall
            # back to the id mapping table
            local_id = entity.exists()
            if not local_id:
                local_id = self.query('local_id', wikidata_id)

            if local_id:
                # Update existing entity
                if entity.type == "item":
                    local_entity = self.item.get(entity_id=local_id)
                elif entity.type == "property":
                    local_entity = self.property.get(entity_id=local_id)
                # replace descriptions
                local_entity.descriptions = entity.descriptions
                # add new claims if they are different from old claims
                local_entity.claims.add(
                    entity.claims,
                    ActionIfExists.APPEND_OR_REPLACE,
                )
                local_entity.write(login=self.login)
                if self.query('local_id', wikidata_id) and recurse:
                    self.update_has_all_claims(wikidata_id)
                else:
                    self.insert_id_in_db(wikidata_id, local_id, has_all_claims=recurse)
            else:
                # Create entity
                local_id = entity.write(login=self.login, as_new=True).id
                self.insert_id_in_db(wikidata_id, local_id, has_all_claims=recurse)

        if has_all_claims:
            imported_entities[wikidata_id] = self.query('local_id', wikidata_id)
        else:
            imported_entities[wikidata_id] = local_id

    if len(imported_entities) == 1:
        return list(imported_entities.values())[0]
    return imported_entities
+
+
+
+[docs]
def overwrite_entity(self, wikidata_id, local_id):
    """Complete an already existing local entity with its statements
    from wikidata.

    Args:
        wikidata_id: Wikidata entity ID to be imported.
        local_id: Local id of the existing entity that needs to
            be completed with further statements.

    Returns:
        local_id: Local entity ID. When the wikidata entity was already
        imported with all claims, the local id recorded in the mapping
        table is returned instead. None is returned for lexeme ids,
        which are not supported.
    """
    if wikidata_id.startswith("L"):
        print(
            f"Warning: Lexemes not supported. Lexeme {wikidata_id} was not imported"
        )
        # Stop here: the lookup below rejects ids that are neither Q nor P
        return None

    print(f"Overwriting entity {local_id}")

    if self.query('has_all_claims', wikidata_id):
        return self.query('local_id', wikidata_id)

    # API call
    entity = self.get_wikidata_information(
        wikidata_id,
        recurse=True
    )
    if not entity:
        # No usable labels on the wikidata side: leave the local entity as is
        return local_id

    # Check if there is an entity ID redirection in Wikidata
    if wikidata_id != entity.id:
        wikidata_id = entity.id
        if self.query('has_all_claims', wikidata_id):
            return self.query('local_id', wikidata_id)

    self.convert_claim_ids(entity)
    entity.add_linker_claim(wikidata_id)

    # Retrieve existing entity
    if entity.type == "item":
        local_entity = self.item.get(entity_id=local_id)
    elif entity.type == "property":
        local_entity = self.property.get(entity_id=local_id)
    # replace descriptions
    local_entity.descriptions = entity.descriptions
    # add new claims if they are different from old claims
    local_entity.claims.add(
        entity.claims,
        ActionIfExists.APPEND_OR_REPLACE,
    )
    local_entity.write(login=self.login)
    if self.query('local_id', wikidata_id):
        self.update_has_all_claims(wikidata_id)
    else:
        self.insert_id_in_db(wikidata_id, local_id, has_all_claims=True)

    return local_id
+
+
+
+[docs]
def import_claim_entities(self, wikidata_id):
    """Import an entity that is mentioned in a claim from wikidata to
    the local wikibase instance, without its own claims.

    The entity is registered in the id mapping table with
    ``has_all_claims=False``.

    Args:
        wikidata_id(str): id of the entity to be imported

    Returns:
        local id or None, if the entity had no labels or is a property
        with a lexeme, sense or form datatype
    """
    local_id = self.query('local_id', wikidata_id)
    if local_id:
        return local_id

    entity = self.get_wikidata_information(wikidata_id)
    if not entity:
        return None

    if entity.type == "property" and \
        entity.datatype.value in ["wikibase-lexeme", \
        "wikibase-sense", "wikibase-form"]:
        return None

    # Check if the entity has been redirected by Wikidata
    # into another entity that has already been imported.
    # Without a redirect the lookup above already covered this id,
    # so the mapping table is queried at most once per distinct id.
    if wikidata_id != entity.id:
        wikidata_id = entity.id
        local_id = self.query('local_id', wikidata_id)
        if local_id:
            return local_id

    local_id = entity.exists()
    if local_id:
        if entity.type == "item":
            new_entity = self.item.get(entity_id=local_id)
        elif entity.type == "property":
            new_entity = self.property.get(entity_id=local_id)
        # replace descriptions
        new_entity.descriptions = entity.descriptions
        entity = new_entity
        entity.add_linker_claim(wikidata_id)
        local_id = entity.write(login=self.login).id
    else:
        entity.add_linker_claim(wikidata_id)
        local_id = entity.write(login=self.login, as_new=True).id

    self.insert_id_in_db(wikidata_id, local_id, has_all_claims=False)
    return local_id
+
+
+
+[docs]
def get_wikidata_information(self, wikidata_id, recurse=False):
    """Pull an item or property from wikidata.

    Unless ``self.languages`` is the string ``"all"``, labels,
    descriptions and aliases are reduced to the languages listed in
    ``self.languages``, and a description that equals the label of the
    same language is removed.

    Args:
        wikidata_id (str): wikidata id of the desired entity
        recurse (Bool): if claims should also be imported; otherwise
            the claims of the returned entity are emptied

    Returns: wikibase integrator entity or None, if the entity has no
        labels in the configured languages

    Raises:
        ValueError: If the id starts with neither ``Q`` nor ``P``.
    """
    wikidata_api = 'https://www.wikidata.org/w/api.php'
    if wikidata_id.startswith("Q"):
        entity = self.item.get(
            entity_id=wikidata_id,
            mediawiki_api_url=wikidata_api
        )
    elif wikidata_id.startswith("P"):
        entity = self.property.get(
            entity_id=wikidata_id,
            mediawiki_api_url=wikidata_api
        )
    else:
        # Lexeme ids (L...) are not supported by this method either
        raise ValueError(
            f"Wrong ID format, should start with P or Q but ID is {wikidata_id}"
        )

    if self.languages != "all":
        # set labels in desired languages
        label_dict = {
            k: entity.labels.values[k]
            for k in self.languages
            if k in entity.labels.values
        }
        # if there are no labels, this is not
        # a valid entity
        if not label_dict:
            return None
        entity.labels.values = label_dict

        # set descriptions in desired languages
        description_dict = {
            k: entity.descriptions.values[k]
            for k in self.languages
            if k in entity.descriptions.values
        }
        entity.descriptions.values = description_dict

        # make sure label != description (e.g. wdt:P121)
        for k in self.languages:
            if (label_dict.get(k) and
                label_dict.get(k) == description_dict.get(k)):
                entity.descriptions.set(
                    language=k,
                    value=None
                )

        # set aliases in desired languages
        alias_dict = {
            k: entity.aliases.aliases[k]
            for k in self.languages
            if k in entity.aliases.aliases
        }
        entity.aliases.aliases = alias_dict

    if not recurse:
        entity.claims = Claims()
    return entity
+
+
+
+[docs]
def convert_claim_ids(self, entity):
    """Convert, in place, the wikidata ids found in the claims of an
    entity into local ids.

    Properties listed in ``self.excluded_properties`` are dropped, as
    are properties and entity values that cannot be imported and claims
    with a lexeme, sense or form datatype. References and qualifiers of
    the kept claims are converted as well.

    Args:
        entity: entity whose ``claims.claims`` mapping is replaced

    Returns:
        None
    """
    entity_datatypes = [
        "wikibase-item",
        "wikibase-property",
    ]
    lexeme_datatypes = ["wikibase-lexeme", "wikibase-sense", "wikibase-form"]
    converted = {}
    # structure of claims: Dict[str,List[Claim]]
    # where str is the property id
    for prop_id, claim_list in entity.claims.claims.items():
        if prop_id in self.excluded_properties:
            continue
        local_prop_id = self.import_claim_entities(wikidata_id=prop_id)
        if not local_prop_id:
            print("Warning: local id skipped")
            continue

        local_claim_list = []
        for c in claim_list:
            c_dict = c.get_json()
            mainsnak = c_dict["mainsnak"]
            datatype = mainsnak["datatype"]
            if datatype in entity_datatypes:
                if "datavalue" not in mainsnak:
                    continue
                value = mainsnak["datavalue"]["value"]
                local_mainsnak_id = self.import_claim_entities(
                    wikidata_id=value["id"],
                )
                if not local_mainsnak_id:
                    continue
                value["id"] = local_mainsnak_id
                value["numeric-id"] = int(local_mainsnak_id[1:])
                mainsnak["property"] = local_prop_id
                # to avoid problem with missing reference hash
                c_dict.pop("references", None)
                new_c = Claim().from_json(c_dict)
                new_c.id = None
            elif datatype in lexeme_datatypes:
                continue
            else:
                self.convert_entity_links(snak=mainsnak)
                new_c = c
                new_c.mainsnak.property_number = local_prop_id
                new_c.id = None

            # get reference details
            new_references = self.get_references(c)
            if new_references:
                new_c.references.references = new_references
            # get qualifier details
            new_c.qualifiers = self.get_qualifiers(c)
            local_claim_list.append(new_c)

        converted[local_prop_id] = local_claim_list
    entity.claims.claims = converted
+
+
+
+[docs]
def get_references(self, claim):
    """Build local references from the wikidata references of a claim.

    Property ids and entity values inside the reference snaks are
    replaced by their local ids. Snaks whose property or entity value
    cannot be imported, and snaks with a lexeme, sense or form
    datatype, are left out.

    Args:
        claim: a wikibaseintegrator claim

    Returns:
        List with the converted references, or None when the claim has
        no references
    """
    entity_datatypes = (
        "wikibase-item",
        "wikibase-property",
    )
    lexeme_datatypes = ("wikibase-lexeme", "wikibase-sense", "wikibase-form")
    # format: List(Reference)
    ref_list = claim.references.references
    if not ref_list:
        return None

    new_ref_list = []
    for ref in ref_list:
        new_snak_dict = {}
        for prop_id, snak_list in ref.get_json()["snaks"].items():
            new_snak_list = []
            new_prop_id = self.import_claim_entities(
                wikidata_id=prop_id,
            )
            if not new_prop_id:
                continue
            for snak in snak_list:
                datatype = snak["datatype"]
                if datatype in entity_datatypes:
                    if "datavalue" not in snak:
                        continue
                    value = snak["datavalue"]["value"]
                    new_snak_id = self.import_claim_entities(
                        wikidata_id=value["id"],
                    )
                    if not new_snak_id:
                        continue
                    value["id"] = new_snak_id
                    value["numeric-id"] = int(new_snak_id[1:])
                elif datatype in lexeme_datatypes:
                    continue
                else:
                    self.convert_entity_links(
                        snak=snak,
                    )
                snak["property"] = new_prop_id
                new_snak_list.append(snak)
            new_snak_dict[new_prop_id] = new_snak_list

        reference_json = {
            "hash": None,
            "snaks": new_snak_dict,
            "snaks-order": [],
        }
        new_ref_list.append(Reference().from_json(json_data=reference_json))
    return new_ref_list
+
+
+
+[docs]
def get_qualifiers(self, claim):
    """Build local qualifiers from the wikidata qualifiers of a claim.

    Qualifier property ids and entity values are replaced by their
    local ids. Qualifiers whose property or entity value cannot be
    imported, and qualifiers with a lexeme, sense or form datatype, are
    left out.

    Args:
        claim: a wikibaseintegrator claim

    Returns:
        Qualifiers object, can also be an empty object
    """
    entity_datatypes = (
        "wikibase-item",
        "wikibase-property",
    )
    lexeme_datatypes = ("wikibase-lexeme", "wikibase-sense", "wikibase-form")
    converted = {}
    for qual_id, qual_list in claim.qualifiers.get_json().items():
        new_qual_id = self.import_claim_entities(wikidata_id=qual_id)
        if not new_qual_id:
            continue
        new_qual_list = []
        for qual_val in qual_list:
            datatype = qual_val["datatype"]
            if datatype in entity_datatypes:
                if "datavalue" not in qual_val:
                    continue
                value = qual_val["datavalue"]["value"]
                new_qual_val_id = self.import_claim_entities(
                    wikidata_id=value["id"],
                )
                if not new_qual_val_id:
                    continue
                value["id"] = new_qual_val_id
                value["numeric-id"] = int(new_qual_val_id[1:])
            elif datatype in lexeme_datatypes:
                continue
            else:
                self.convert_entity_links(
                    snak=qual_val,
                )
            qual_val["property"] = new_qual_id
            new_qual_list.append(qual_val)
        converted[new_qual_id] = new_qual_list

    return Qualifiers().from_json(json_data=converted)
+
+
+
+[docs]
def convert_entity_links(self, snak):
    """Convert, in place, entity links inside a snak value to links to
    the local entity instead of the wikidata entity.

    * ``quantity``: a ``unit`` that points to www.wikidata.org is
      replaced by the URL of the imported local entity. The unit is
      left untouched when it is missing, is not a wikidata link, or
      the unit entity cannot be imported.
    * ``globe-coordinate``: the globe link is not converted; only a
      missing or empty ``precision`` is set to 1/3600.
    * every other datatype is left unchanged.

    Args:
        snak: a wikibaseintegrator snak in its JSON (dict) form

    Returns:
        None
    """
    if "datatype" not in snak or "datavalue" not in snak:
        return
    data = snak["datavalue"]["value"]
    datatype = snak["datatype"]

    if datatype == "globe-coordinate":
        if not data.get("precision"):
            data["precision"] = 1/3600
        return

    if datatype != "quantity" or "unit" not in data:
        return

    link_string = data["unit"]
    if "www.wikidata.org/" not in link_string:
        return

    uid = link_string.split("/")[-1]
    local_id = self.import_claim_entities(
        wikidata_id=uid,
    )
    if not local_id:
        # The unit entity could not be imported (e.g. it has no labels);
        # keep the original link rather than failing on str + None.
        return
    data["unit"] = wbi_config["WIKIBASE_URL"] + "/entity/" + local_id
+
+
+
+[docs]
def query(self, parameter, wikidata_id):
    """Look up one column of the wb_id_mapping table by Wikidata ID.

    The two supported parameters are the local_id and whether the
    entity has already been imported with all claims.

    Args:
        parameter (str): Either local_id or has_all_claims
        wikidata_id (str): Wikidata ID
    Returns:
        str or boolean: The stored value of the requested column, or
            None when there is no row for ``wikidata_id`` or the
            parameter is not one of the two supported names.
    """
    mapping = db.Table(
        "wb_id_mapping",
        db.MetaData(),
        autoload_with=self.engine
    )
    if parameter not in ('local_id', 'has_all_claims'):
        return None

    statement = db.select(mapping.columns[parameter]).where(
        mapping.columns.wikidata_id == wikidata_id,
    )
    with self.engine.connect() as connection:
        row = connection.execute(statement).fetchone()
        return row[0] if row else None
+
+
+
+[docs]
def query_with_local_id(self, parameter, local_id):
    """Look up one column of the wb_id_mapping table by local ID.

    The two supported parameters are the wikidata_id and whether the
    entity has already been imported with all claims.

    Args:
        parameter (str): Either wikidata_id or has_all_claims
        local_id (str): local ID
    Returns:
        str or boolean: The stored value of the requested column, or
            None when there is no row for ``local_id`` or the parameter
            is not one of the two supported names.
    """
    mapping = db.Table(
        "wb_id_mapping",
        db.MetaData(),
        autoload_with=self.engine
    )
    if parameter not in ('wikidata_id', 'has_all_claims'):
        return None

    statement = db.select(mapping.columns[parameter]).where(
        mapping.columns.local_id == local_id,
    )
    with self.engine.connect() as connection:
        row = connection.execute(statement).fetchone()
        return row[0] if row else None
+
+
+
+[docs]
def get_local_id_by_label(self, entity_str, entity_type):
    """Check if an entity with a given label or wikidata PID/QID
    exists in the local wikibase instance.

    Args:
        entity_str (str): It can be a local ID (``P123`` / ``Q123``,
            returned unchanged), a string label, or a wikidata ID
            specified with the prefix wdt: for properties and wd:
            for items.
        entity_type (str): Either 'property' or 'item' to specify
            which type of entity to look for. Only used for labels.

    Returns:
        Local ID of the entity, if found, otherwise None. NOTE: a label
        lookup with entity_type 'item' returns the list produced by
        ``get_QID`` (possibly empty) rather than a single string. A
        label with any other entity_type yields None.
    """
    if re.match(r"^[PQ]\d+$", entity_str):
        return entity_str

    if entity_str.startswith("wdt:"):
        wikidata_id = entity_str[4:]
    elif entity_str.startswith("wd:"):
        wikidata_id = entity_str[3:]
    elif entity_type == "property":
        new_property = MardiPropertyEntity(api=self).new()
        new_property.labels.set(language='en', value=entity_str)
        return new_property.get_PID()
    elif entity_type == "item":
        new_item = MardiItemEntity(api=self).new()
        new_item.labels.set(language='en', value=entity_str)
        return new_item.get_QID()
    else:
        # Unknown entity type: nothing to look up
        return None

    with self.engine.connect() as connection:
        metadata = db.MetaData()
        table = db.Table(
            "wb_id_mapping", metadata, autoload_with=connection
        )
        sql = db.select(table.columns.local_id).where(
            table.columns.wikidata_id == wikidata_id,
        )
        db_result = connection.execute(sql).fetchone()
        if db_result:
            return db_result[0]
    return None
+
+
+
+[docs]
def import_from_label(self, label):
    """Import an entity from Wikidata given only its label.

    Wikidata is searched for ``label``. The first search result whose
    label matches (exactly or ignoring case), or whose first alias
    matches ignoring case, is imported.

    Args:
        label (str): label to be imported from wikidata

    Returns:
        local_id (str): local id for the imported entity, or None when
        no search result matches
    """
    candidates = search_entities(label,
                                 dict_result=True,
                                 mediawiki_api_url='https://www.wikidata.org/w/api.php')
    for candidate in candidates:
        candidate_label = candidate['label']
        if label == candidate_label or label.lower() == candidate_label.lower():
            return self.import_entities(candidate['id'])
        aliases = candidate['aliases']
        if aliases and label.lower() == aliases[0].lower():
            return self.import_entities(candidate['id'])
    return None
+
+
+
+import json
+import os
+import re
+import sys
+import time
+import traceback
+import xml.etree.ElementTree as ET
+
+from datetime import datetime
+from habanero import Crossref # , RequestError
+from requests.exceptions import HTTPError, ContentDecodingError
+from sickle import Sickle
+import pandas as pd
+import requests
+from time import sleep
+from ast import literal_eval
+
+from mardi_importer.integrator import MardiIntegrator
+from mardi_importer.importer import ADataSource
+from .ZBMathPublication import ZBMathPublication
+from .ZBMathAuthor import ZBMathAuthor
+from .ZBMathJournal import ZBMathJournal
+from .misc import get_tag, get_info_from_doi
+
+
+
+[docs]
+class ZBMathSource(ADataSource):
+ """Reads data from zb math API."""
+
def __init__(
    self,
    out_dir,
    tags,
    from_date=None,
    until_date=None,
    raw_dump_path=None,
    processed_dump_path=None,
    split_id=None,
):  # , path
    """Store the import settings and create the integrator.

    Args:
        out_dir (string): target directory for saved files; a trailing
            "/" is appended when missing
        tags (list): list of tags to extract from the zbMath response
        from_date (string, optional): earliest date from when to pull information
        until_date (string, optional): latest date from when to pull information
        raw_dump_path (string, optional): path where the raw data dump is located, in case it has previously been pulled
        processed_dump_path (string, optional): path to the processed dump file
        split_id (string, optional): zbMath id from where to start processing the raw dump, in case it aborted mid-processing
    """
    # load the list of swMath software
    # software_df = pd.read_csv(path)
    # self.software_list = software_df['Software'].tolist()
    self.out_dir = out_dir if out_dir[-1] == "/" else out_dir + "/"

    # split mode is active whenever a split id was given
    self.split_id = split_id
    self.split_mode = bool(self.split_id)

    self.from_date = from_date
    self.until_date = until_date
    self.tags = tags
    self.integrator = MardiIntegrator()
    self.conflict_string = (
        "zbMATH Open Web Interface contents unavailable due to conflicting licenses"
    )
    self.raw_dump_path = raw_dump_path
    self.filepath = os.path.realpath(os.path.dirname(__file__))
    self.processed_dump_path = processed_dump_path

    # XML namespaces
    self.namespace = "http://www.openarchives.org/OAI/2.0/"
    self.preview_namespace = "https://zbmath.org/OAI/2.0/oai_zb_preview/"
    self.tag_namespace = "https://zbmath.org/zbmath/elements/1.0/"
    self.conflict_text = "zbMATH Open Web Interface contents unavailable due to conflicting licenses."

    # dict for counting how often a doi was not found and which agency it was registered with
    self.unknown_doi_agency_dict = {"Crossref": [], "crossref": [], "nonsense": []}
    # tags that will not be found in doi query
    self.internal_tags = ["author_id", "source", "classifications", "links"]
    self.existing_authors = {}
    self.existing_journals = {}
+
+
+[docs]
+ def setup(self):
+ """Create all necessary properties and entities for zbMath"""
+ # Import entities from Wikidata
+ filename = self.filepath + "/wikidata_entities.txt"
+ self.integrator.import_entities(filename=filename)
+ #self.create_local_entities()
+ self.de_number_prop = self.integrator.get_local_id_by_label(
+ "zbMATH DE Number", "property"
+ )
+ self.keyword_prop = self.integrator.get_local_id_by_label(
+ "zbMATH Keywords", "property"
+ )
+
+
+
+[docs]
+ def create_local_entities(self):
+ filename = self.filepath + "/new_entities.json"
+ f = open(filename)
+ entities = json.load(f)
+
+ for prop_element in entities["properties"]:
+ prop = self.integrator.property.new()
+ prop.labels.set(language="en", value=prop_element["label"])
+ prop.descriptions.set(language="en", value=prop_element["description"])
+ prop.datatype = prop_element["datatype"]
+ if not prop.exists():
+ prop.write()
+
+ for item_element in entities["items"]:
+ item = self.integrator.item.new()
+ item.labels.set(language="en", value=item_element["label"])
+ item.descriptions.set(language="en", value=item_element["description"])
+ if "claims" in item_element:
+ for key, value in item_element["claims"].items():
+ item.add_claim(key, value=value)
+ if not item.exists():
+ item.write()
+
+
+
+
+
+
+
+[docs]
+ def get_line(values):
+ new_values = []
+ for x in values:
+ x = str(x)
+ x = x.replace("\t", " ")
+ new_values.append(x)
+ return("\t".join(new_values) + "\n")
+
+
+
+[docs]
+ def write_data_dump(self):
+ """
+ Overrides abstract method.
+ This method queries the zbMath API to get a data dump of all records,
+ optionally between from_date and until_date
+ """
+ url = "https://api.zbmath.org/v1/document/_all"
+ timestr = time.strftime("%Y%m%d-%H%M%S")
+ self.raw_dump_path = self.out_dir + "raw_zbmath_data_dump" + timestr + ".txt"
+ headers = ['biographic_references', 'contributors', 'database', 'datestamp', 'document_type', 'editorial_contributions', 'id', 'identifier', 'keywords', 'language', 'license', 'links', 'msc', 'references', 'source', 'states', 'title', 'year', 'zbmath_url']
+ with open(self.raw_dump_path, "a+") as f:
+ f.write("\t".join(headers) + "\n")
+ start_after = 0
+ retries = 0
+ max_retries = 5
+ while True:
+ results = []
+ params = {"start_after": start_after,
+ "results_per_request": 500}
+ response = requests.get(url, params=params)
+ if response.status_code == 200:
+ retries = 0
+ data=response.json()
+ if not data["result"]:
+ break
+ results.extend(data["result"])
+ start_after = data["status"]["last_id"]
+ for r in results:
+ if list(r.keys()) != headers:
+ print(f"wrong headers in {r}")
+ break
+ f.write(get_line(r.values()))
+ f.flush()
+ os.fsync(f)
+ elif response.status_code == 502 and retries < max_retries:
+ print("Encountered 502 error, retrying...")
+ retries += 1
+ sleep(5)
+ continue
+ else:
+ print(f"Failed to retrieve data: {response.status_code}")
+ break
+
+
+
+
+[docs]
+ def old_write_data_dump(self):
+ """
+ Overrides abstract method.
+ This method queries the zbMath API to get a data dump of all records,
+ optionally between from_date and until_date
+ """
+ timestr = time.strftime("%Y%m%d-%H%M%S")
+ self.raw_dump_path = self.out_dir + "raw_zbmath_data_dump" + timestr + ".txt"
+ sickle = Sickle("https://oai.zbmath.org/v1")
+ # date has to have format like 2012-12-12
+ if self.from_date and self.until_date:
+ records = sickle.ListRecords(
+ **{
+ "metadataPrefix": "oai_zb_preview",
+ "from": self.from_date,
+ "until": self.until_date,
+ }
+ )
+ elif self.from_date:
+ records = sickle.ListRecords(
+ **{"metadataPrefix": "oai_zb_preview", "from": self.from_date}
+ )
+ elif self.until_date:
+ records = sickle.ListRecords(
+ **{"metadataPrefix": "oai_zb_preview", "until": self.until_date}
+ )
+ else:
+ records = sickle.ListRecords(metadataPrefix="oai_zb_preview")
+ with open(self.raw_dump_path, "w+") as f:
+ for rec in records:
+ f.write(rec.raw + "\n")
+
+
+
+[docs]
+ def process_data(self):
+ """
+ Overrides abstract method.
+ Reads a raw zbMath data dump and processes it, then saves it as a csv.
+ """
+ if not self.processed_dump_path:
+ timestr = time.strftime("%Y%m%d-%H%M%S")
+ self.processed_dump_path = (
+ self.out_dir + "zbmath_data_dump" + timestr + ".csv"
+ )
+ with open(self.processed_dump_path, "a") as outfile:
+ outfile.write(
+ "de_number\t"
+ + "creation_date\t"
+ + ("\t").join(self.tags)
+ + "_text\treview_sign\treviewer_id\n"
+ )
+
+ #df = pd.read_csv(self.raw_dump_path, sep = "\t")
+ found = False
+ for chunk in pd.read_csv(self.raw_dump_path, sep = "\t", chunksize=2000):
+ for _, row in chunk.iterrows():
+ record = {}
+ record["de_number"] = row["id"]
+ # if row["id"] == 2522407:
+ # found = True
+ # continue
+ # if not found:
+ # continue
+ record["creation_date"] = row["datestamp"]
+ authors = []
+ author_ids = []
+ for d in literal_eval(row["contributors"])["authors"]:
+ authors.append(d['name'])
+ if d["codes"]:
+ author_ids.append(d["codes"][0])
+ else:
+ author_ids.append("None")
+ record["author"] = ";".join(authors)
+ record["author_ids"] = ";".join(author_ids)
+ title = literal_eval(row["title"])["title"]
+ record["document_title"] = title
+ record["source"] = literal_eval(row["source"])["source"]
+ msc = []
+ for d in literal_eval(row["msc"]):
+ msc.append(d["code"])
+ record["classifications"] = ";".join(msc)
+ if literal_eval(row["language"])["languages"]:
+ record["language"] = literal_eval(row["language"])["languages"][0]
+ links = []
+ doi = None
+ for d in literal_eval(row["links"]):
+ if "type" not in d:
+ continue
+ if d["type"] in ["http", "https"]:
+ if d['url'] is not None and d['url'] != "None":
+ links.append(d['url'])
+ elif d["type"] == "doi":
+ doi = d["identifier"]
+ record["links"] = ";".join(links)
+ record["keywords"] = ";".join([x for x in literal_eval(row["keywords"]) if x])
+ record["doi"] = doi
+ record["publication_year"] = row["year"]
+ if literal_eval(row["source"])["series"]:
+ record["serial"] = literal_eval(row["source"])["series"][0]["title"]
+ record["zbl_id"] = row["identifier"]
+ ref_ids = []
+ for d in literal_eval(row["references"]):
+ ref_ids.append(str(d["zbmath"]["document_id"]))
+ record["references"] = ";".join(ref_ids)
+ for d in literal_eval(row["editorial_contributions"]):
+ if d["contribution_type"] == "review":
+ review_text = d["text"]
+ row["review_text"] = review_text
+ row["review_sign"] = d["reviewer"]["name"]
+ row["reviewer_id"] = d["reviewer"]["author_code"]
+ break
+ if record:
+ for key, value in record.items():
+ if isinstance(value, str):
+ record[key] = value.replace("\t", " ").replace("\n", " ")
+ outfile.write(
+ "\t".join(str(x) for x in record.values()) + "\n"
+ )
+
+
+
+[docs]
+ def old_process_data(self):
+ """
+ Overrides abstract method.
+ Reads a raw zbMath data dump and processes it, then saves it as a csv.
+ """
+ if not (self.processed_dump_path and self.split_mode):
+ timestr = time.strftime("%Y%m%d-%H%M%S")
+ self.processed_dump_path = (
+ self.out_dir + "zbmath_data_dump" + timestr + ".csv"
+ )
+ # def do_all(xml_file, out_file):
+ with open(self.raw_dump_path) as infile:
+ with open(self.processed_dump_path, "a") as outfile:
+ # if we are not continuing with a pre-filled file
+ if not self.split_mode:
+ outfile.write(
+ "de_number\t"
+ + "creation_date\t"
+ + ("\t").join(self.tags)
+ + "_text\treview_sign\treviewer_id\n"
+ )
+ record_string = ""
+ for line in infile:
+ record_string = record_string + line
+ if line.endswith("</record>\n"):
+ element = ET.fromstring(record_string)
+ if self.split_mode:
+ de_number = self.get_de_number(element)
+ # if the last processed id is found
+ if de_number == self.split_id:
+ # next iteration, continue with writing
+ self.split_mode = False
+ record_string = ""
+ continue
+ else:
+ # continue searching
+ record_string = ""
+ continue
+ record = self.parse_record(element)
+ if record:
+ outfile.write(
+ "\t".join(str(x) for x in record.values()) + "\n"
+ )
+ record_string = ""
+
+
+
+[docs]
+ def parse_record(self, xml_record):
+ """
+ Parse xml record from zbMath API.
+
+ Args:
+ xml_record (xml element): record returned by zbMath API
+
+ Returns:
+ dict: dict of (tag,value) pairs extracted from xml_record
+ """
+ is_conflict = False
+ new_entry = {}
+ # zbMath identifier
+ de_number = self.get_de_number(xml_record)
+ creation_date = self.get_creation_date(xml_record)
+ new_entry["de_number"] = de_number
+ new_entry["creation_date"] = creation_date
+ # read tags
+ zb_preview = xml_record.find(
+ get_tag("metadata", namespace=self.namespace)
+ ).find(get_tag("zbmath", self.preview_namespace))
+ if zb_preview:
+ for tag in self.tags:
+ value = zb_preview.find(get_tag(tag, self.tag_namespace))
+ if value is not None:
+ if len(value):
+ if tag == "review":
+ for subtag in ["review_text", "review_sign", "reviewer_id"]:
+ subvalue = value.find(
+ get_tag(subtag, self.tag_namespace)
+ )
+ if subvalue is not None:
+ if len(subvalue):
+ sys.exit(f"tag {subtag} has children")
+ else:
+ text = subvalue.text
+ if subtag == "review_text":
+ text = text.replace("\t", " ")
+ text = text.replace("\n", " ")
+ new_entry[subtag] = text
+ else:
+ new_entry[subtag] = None
+ continue
+
+ # element has children
+ texts = []
+ for child in value:
+ texts.append(child.text)
+ texts = [t for t in texts if t is not None]
+ text = ";".join(
+ texts
+ ) # multiple values are rendered as a semicolon-separated string
+
+ else:
+ # element content is a simple text
+ text = zb_preview.find(get_tag(tag, self.tag_namespace)).text
+
+ text = text.replace("\n", " ")
+ new_entry[tag] = text
+ # if tag is not found in zbMath return, we still want to get it from doi
+ else:
+ new_entry[tag] = None
+ # return record, even if incomplete
+ return new_entry
+ else:
+ sys.exit("Error: zb_preview not found")
+
+
+
+[docs]
+ def push(self):
+ """Updates the MaRDI Wikibase entities corresponding to zbMath publications.
+ It creates a :class:`mardi_importer.zbmath.ZBMathPublication` instance
+ for each publication. Authors and journals are added, as well.
+ """
+ found = False
+ with open(self.processed_dump_path, "r") as infile:
+ in_header_line = True
+ for line in infile:
+ if in_header_line:
+ headers = line.strip().split("\t")
+ in_header_line = False
+ continue
+ split_line = line.strip("\n").split("\t")
+ # formatting error: skip
+ if len(split_line) != len(headers):
+ continue
+ info_dict = dict(zip(headers, split_line))
+ # this part is for continuing at a certain position if the import failed
+ # if not found:
+ # if info_dict["de_number"].strip() != " ":
+ # if info_dict["document_title"] != "Unimodular supergravity":
+ # continue
+ # else:
+ # found = True
+ # continue
+ # if there is not title, don't add
+ if self.conflict_string in info_dict["document_title"]:
+ if (
+ self.conflict_string not in info_dict["doi"]
+ and info_dict["doi"] != "None"
+ ):
+ document_title = get_info_from_doi(
+ doi=info_dict["doi"].strip(), key="document_title"
+ )
+ if not document_title:
+ print("No title from doi, uploading empty")
+ else:
+ print(f"Found document title {document_title} from doi")
+ else:
+ print("No doi found, uploading empty.")
+ document_title = None
+ # only upload those where there was a conflict before
+ else:
+ document_title = info_dict["document_title"].strip()
+ if not info_dict["zbl_id"] == "None":
+ zbl_id = info_dict["zbl_id"]
+ else:
+ zbl_id = None
+
+ if (
+ not self.conflict_string in info_dict["author_ids"]
+ and "None" not in info_dict["author_ids"]
+ ):
+ author_ids = info_dict["author_ids"].split(";")
+ if (
+ self.conflict_string in info_dict["author"]
+ or "None" in info_dict["author"]
+ ):
+ author_strings = [None] * len(author_ids)
+ else:
+ author_strings = info_dict["author"].split(";")
+ authors = []
+ for a, a_id in zip(author_strings, author_ids):
+ if not a and not a_id:
+ continue
+ if a:
+ a = a.strip()
+ a_id = a_id.strip()
+ if a_id in self.existing_authors:
+ authors.append(self.existing_authors[a_id])
+ print(f"Author with name {a} was already created this run.")
+ else:
+ for attempt in range(5):
+ try:
+ author = ZBMathAuthor(
+ integrator=self.integrator,
+ name=a,
+ zbmath_author_id=a_id,
+ )
+ local_author_id = author.create()
+ except Exception as e:
+ print(f"Exception: {e}, sleeping")
+ print(traceback.format_exc())
+ time.sleep(120)
+ else:
+ break
+ else:
+ sys.exit("Uploading author did not work after retries!")
+ authors.append(local_author_id)
+ self.existing_authors[a_id] = local_author_id
+ else:
+ authors = []
+
+ if (
+ self.conflict_string in info_dict["serial"]
+ or info_dict["serial"].strip() == "None"
+ ):
+ if (
+ self.conflict_string not in info_dict["doi"]
+ and info_dict["doi"] != "None"
+ ):
+ journal_string = get_info_from_doi(
+ doi=info_dict["doi"].strip(), key="journal"
+ )
+ else:
+ journal_string = None
+ else:
+ journal_string = info_dict["serial"].split(";")[-1].strip()
+ if journal_string:
+ if journal_string in self.existing_journals:
+ journal = self.existing_journals[journal_string]
+ print(
+ f"Journal {journal_string} was already created in this run."
+ )
+ else:
+ for attempt in range(5):
+ try:
+ journal_item = ZBMathJournal(
+ integrator=self.integrator, name=journal_string
+ )
+ if journal_item.exists():
+ print(f"Journal {journal_string} exists!")
+ journal = journal_item.QID
+ else:
+ print(f"Creating journal {journal_string}")
+ journal = journal_item.create()
+ except Exception as e:
+ print(f"Exception: {e}, sleeping")
+ print(traceback.format_exc())
+ time.sleep(120)
+ else:
+ break
+ else:
+ sys.exit("Uploading journal did not work after retries!")
+ self.existing_journals[journal_string] = journal
+ else:
+ journal = None
+
+ if not self.conflict_string in info_dict["language"]:
+ language = info_dict["language"].strip()
+ else:
+ language = None
+
+ if not self.conflict_string in info_dict["publication_year"]:
+ time_string = (
+ f"+{info_dict['publication_year'].strip()}-00-00T00:00:00Z"
+ )
+ else:
+ time_string = None
+
+ if not self.conflict_string in info_dict["links"]:
+ pattern = re.compile(
+ r"^([a-z][a-z\d+.-]*):([^][<>\"\x00-\x20\x7F])+$"
+ )
+ links = info_dict["links"].split(";")
+ links = [
+ x.strip() for x in links if (pattern.match(x) and "http" in x)
+ ]
+ arxiv_prefix = "https://arxiv.org/abs/"
+ arxiv_id = None
+ for l in links:
+ if arxiv_prefix in l:
+ arxiv_id = l.removeprefix(arxiv_prefix)
+ else:
+ links = []
+
+ if (
+ not self.conflict_string in info_dict["doi"]
+ and not "None" in info_dict["doi"]
+ ):
+ doi = info_dict["doi"].strip()
+ else:
+ doi = None
+
+ if info_dict["creation_date"] != "0001-01-01T00:00:00":
+ # because there can be no hours etc
+ creation_date = (
+ f"{info_dict['creation_date'].split('T')[0]}T00:00:00Z"
+ )
+ else:
+ creation_date = None
+
+ if (
+ not self.conflict_string in info_dict["review_text"]
+ and info_dict["review_text"].strip() != "None"
+ ):
+ review_text = info_dict["review_text"].strip()
+ if (
+ not self.conflict_string in info_dict["review_sign"]
+ and info_dict["review_sign"].strip() != "None"
+ and not self.conflict_string in info_dict["reviewer_id"]
+ and info_dict["reviewer_id"].strip() != "None"
+ and info_dict["reviewer_id"].strip() != ""
+ ):
+ reviewer_id = info_dict["reviewer_id"].strip()
+ reviewer_name = (
+ info_dict["review_sign"]
+ .strip()
+ .split("/")[0]
+ .strip()
+ .split("(")[0]
+ .strip()
+ )
+ if reviewer_id in self.existing_authors:
+ reviewer = self.existing_authors[reviewer_id]
+ print(
+ f"Reviewer with name {a} was already created this run."
+ )
+ else:
+ for attempt in range(5):
+ try:
+ reviewer_object = ZBMathAuthor(
+ integrator=self.integrator,
+ name=reviewer_name,
+ zbmath_author_id=reviewer_id,
+ )
+ reviewer = reviewer_object.create()
+ except Exception as e:
+ print(f"Exception: {e}, sleeping")
+ print(traceback.format_exc())
+ time.sleep(120)
+ else:
+ break
+ else:
+ sys.exit(
+ "Uploading reviewer did not work after retries!"
+ )
+ self.existing_authors[reviewer_id] = reviewer
+ else:
+ reviewer = None
+ else:
+ review_text = None
+ reviewer = None
+
+ if (
+ not self.conflict_string in info_dict["classifications"]
+ and info_dict["classifications"].strip() != "None"
+ and info_dict["classifications"].strip() != ""
+ ):
+ classifications = info_dict["classifications"].strip().split(";")
+ else:
+ classifications = None
+
+ if info_dict["de_number"].strip() != "None":
+ de_number = info_dict["de_number"].strip()
+ else:
+ de_number = None
+
+ if (
+ not self.conflict_string in info_dict["keywords"]
+ and info_dict["keywords"].strip() != "None"
+ and info_dict["keywords"].strip() != ""
+ ):
+ keywords = info_dict["keywords"].strip().split(";")
+ keywords = [x.strip() for x in keywords]
+ else:
+ keywords = None
+ for attempt in range(5):
+ try:
+ publication = ZBMathPublication(
+ integrator=self.integrator,
+ title=document_title,
+ doi=doi,
+ authors=authors,
+ journal=journal,
+ language=language,
+ time=time_string,
+ links=links,
+ creation_date=creation_date,
+ zbl_id=zbl_id,
+ arxiv_id=arxiv_id,
+ review_text=review_text,
+ reviewer=reviewer,
+ classifications=classifications,
+ de_number=de_number,
+ keywords=keywords,
+ de_number_prop=self.de_number_prop,
+ keyword_prop=self.keyword_prop,
+ )
+ if publication.exists():
+ print(f"Publication {document_title} exists")
+ publication.update()
+ else:
+ print(f"Creating publication {document_title}")
+ publication.create()
+ except Exception as e:
+ print(f"Exception: {e}, sleeping")
+ print(traceback.format_exc())
+ time.sleep(120)
+ else:
+ break
+ else:
+ sys.exit("Uploading publication did not work after retries!")
+
+
+
+[docs]
+ def get_de_number(self, xml_record):
+ """
+ Get zbMath id from xml record.
+
+ Args:
+ xml_record (xml element): record returned by zbMath API
+
+ Returns:
+ string: zbMath ID
+ """
+ de_number = (
+ xml_record.find(get_tag("header", self.namespace))
+ .find(get_tag("identifier", namespace=self.namespace))
+ .text
+ )
+ de_number = de_number.split(":")[-1]
+ return de_number
+
+
+
+[docs]
+ def get_creation_date(self, xml_record):
+ """
+ Get creation date from xml record.
+
+ Args:
+ xml_record (xml element): record returned by zbMath API
+
+ Returns:
+ string: creation date
+ """
+ creation_date = (
+ xml_record.find(get_tag("header", self.namespace))
+ .find(get_tag("datestamp", namespace=self.namespace))
+ .text
+ )
+ return creation_date
+
+
+
+from habanero import Crossref
+from requests.exceptions import HTTPError
+
+
+
+[docs]
def get_tag(tag_name, namespace):
    """
    Build the ``{namespace}tag`` form of a tag name.

    Args:
        tag_name (string): name of tag, e.g. author
        namespace (string): namespace URL of a namespace

    Returns:
        string: the namespace wrapped in curly braces, followed by the tag name
    """
    return f"{{{namespace}}}{tag_name}"
+
+
+
+
+[docs]
def parse_doi_info(val, work_info):
    """
    Function to extract information returned by a doi query for a specific tag.

    Args:
        val (string): tag, one of author, document_title, publication_year,
            serial, language or keywords
        work_info (dict): information from doi query response

    Returns:
        string: information for specific tag, None if not found (the
        publication year is returned as delivered by the API); multiple
        values are joined with semicolons. Unknown tags return None.
    """
    # information about return fields can be found under https://api.crossref.org/swagger-ui/index.html#/Works/get_works
    if val == "author":
        # looks like: 'author': [{'given': 'Max', 'family': 'Mustermann', 'sequence': 'first', 'affiliation': []}]
        if "author" not in work_info:
            return None
        author_list = []
        for author_dict in work_info["author"]:
            # family name not known: too little information
            family_name = author_dict.get("family")
            if not family_name:
                return None
            # Read the given name per author so it cannot leak over from
            # the previous entry; it is not necessarily needed.
            first_name = author_dict.get("given")
            if first_name:
                author_list.append(family_name + ", " + first_name)
            else:
                author_list.append(family_name)
        return ";".join(author_list)

    if val == "document_title":
        # The response field is called "title".
        title_list = work_info.get("title")
        if title_list:
            return ";".join(title_list)
        return None

    if val == "publication_year":
        # 'published': {'date-parts': [[2008]]} -- this is not necessarily
        # the year this was published in the journal
        if "published" not in work_info:
            return None
        # this is either a year or None
        return work_info["published"]["date-parts"][0][0]

    if val == "serial":
        if "reference" not in work_info:
            return None
        serials = [
            serial_dict["journal-title"]
            for serial_dict in work_info["reference"]
            if "journal-title" in serial_dict
        ]
        # if no serials were found
        if not serials:
            return None
        # make list unique, keeping first-seen order for stable output
        return ";".join(dict.fromkeys(serials))

    if val == "language":
        return work_info.get("language")

    if val == "keywords":
        if "subject" not in work_info:
            return None
        return ";".join(work_info["subject"])

    return None
+
+
+
+
+[docs]
def get_info_from_doi(doi, key):
    """
    Query crossref API for DOI information.

    The DOIs are tried in order and the first usable value is returned.
    A DOI that crossref does not know (HTTP error) or that lacks the
    requested field is skipped.

    Args:
        doi (string): one DOI, or several separated by semicolons
        key (string): "document_title" or "journal"

    Returns:
        string: for "document_title", the titles joined with semicolons,
        with tabs and newlines replaced by spaces (None if that string is
        longer than 500 characters); for "journal", the first container
        title. None if nothing was found.
    """
    cr = Crossref(mailto="pusch@zib.de")
    for single_doi in doi.split(";"):
        single_doi = single_doi.strip()
        if not single_doi:
            continue
        try:
            message = cr.works(ids=single_doi)["message"]
            if key == "document_title":
                title_list = message.get("title")
                if not title_list:
                    continue
                joint_title = ";".join(title_list).strip()
                joint_title = joint_title.replace("\n", " ").strip()
                joint_title = joint_title.replace("\t", " ").strip()
                if len(joint_title) > 500:
                    return None
                return joint_title
            elif key == "journal":
                # Try the next DOI instead of giving up, the same way the
                # title lookup does.
                container_titles = message.get("container-title")
                if not container_titles:
                    continue
                return container_titles[0].strip()
        # if the doi is not found, there is a 404
        except HTTPError:
            print("HTTP Error!")
            continue
    return None
+
+
' + + '' + + _("Hide Search Matches") + + "
" + ) + ); + }, + + /** + * helper function to hide the search marks again + */ + hideSearchWords: () => { + document + .querySelectorAll("#searchbox .highlight-link") + .forEach((el) => el.remove()); + document + .querySelectorAll("span.highlighted") + .forEach((el) => el.classList.remove("highlighted")); + localStorage.removeItem("sphinx_highlight_terms") + }, + + initEscapeListener: () => { + // only install a listener if it is really needed + if (!DOCUMENTATION_OPTIONS.ENABLE_SEARCH_SHORTCUTS) return; + + document.addEventListener("keydown", (event) => { + // bail for input elements + if (BLACKLISTED_KEY_CONTROL_ELEMENTS.has(document.activeElement.tagName)) return; + // bail with special keys + if (event.shiftKey || event.altKey || event.ctrlKey || event.metaKey) return; + if (DOCUMENTATION_OPTIONS.ENABLE_SEARCH_SHORTCUTS && (event.key === "Escape")) { + SphinxHighlight.hideSearchWords(); + event.preventDefault(); + } + }); + }, +}; + +_ready(() => { + /* Do not call highlightSearchWords() when we are on the search page. + * It will highlight words from the *previous* search query. + */ + if (typeof Search === "undefined") SphinxHighlight.highlightSearchWords(); + SphinxHighlight.initEscapeListener(); +}); diff --git a/cran.html b/cran.html new file mode 100644 index 0000000..53804a7 --- /dev/null +++ b/cran.html @@ -0,0 +1,844 @@ + + + + + + +This module imports R packages published at the Comprehensive R Archive Network (CRAN).
+Specifically, it reads the table of packages ordered by date of publication. +This table contains for each R package the package name, title and date of publication. Based on the package name, +each package url can be accessed from:
+https://cran.r-project.org/web/packages/<package_name>/index.html
Several attributes are listed for each package. Among them, the following attributes are imported, when present, to the MaRDI knowledge graph:
+Version of the package.
+Software and package dependencies, including other R packages.
+Date of publication.
+Authors of the package are to be indicated according to the CRAN Repository Policy with the abbreviation [aut]. Given that this guideline is not always implemented, it is not always +possible to properly parse the authors.
+When no abbreviations describing the role of each individual are included, just the first listed author is imported.
+Software maintainer (Generally one of the authors).
+Bases: ADataSource
Processes data from the Comprehensive R Archive Network.
+Metadata for each R package is scraped from the CRAN Repository. Each +Wikibase item corresponding to each R package is subsequently updated +or created, in case of a new package.
+Dataframe with package name, title and date of publication for +each package in CRAN.
+Pandas dataframe
+Reads date, package name and title from the CRAN Repository URL.
+The result is saved as a pandas dataframe in the attribute packages.
+Attribute packages
Pandas dataframe
+ImporterException – If table at the CRAN url cannot be accessed or read.
+Updates the MaRDI Wikibase entities corresponding to R packages.
+For each package name in the attribute packages checks +if the date in CRAN coincides with the date in the MaRDI +knowledge graph. If not, the package is updated. If the package +is not found in the MaRDI knowledge graph, the corresponding +item is created.
+It creates a mardi_importer.cran.RPackage
instance
+for each package.
Bases: object
Class to manage R package items in the local Wikibase instance.
+Date of publication
+str
+Package name
+str
+Title of the R package
+str
+Detailed description of the R package
+str
+URL to the CRAN repository
+str
+Version of the R package
+str
+Previous published versions
+List[Tuple[str, str]]
+Author(s) of the package
+Software license
+Dependencies to R and other packages
+Imported R packages
+List[Tuple[str, str]]
+Software maintainer
+str
+Package QID
+str
+API to MaRDI integrator
+Return the QID of the R package in the knowledge graph.
+Searches for an item with the package label in the Wikibase +SQL tables and returns the QID if a matching result is found.
+The entity QID representing the R package.
+str
+Return the integrator Item representing the R package.
+Adds also the label and description of the package.
+Integrator item
+Checks if an item corresponding to the R package already exists.
+Entity ID
+str
+Checks if the Item corresponding to the R package is up to date.
+Compares the last update property in the local knowledge graph with +the publication date imported from CRAN.
+True if both dates coincide, False otherwise.
+bool
+Imports metadata from CRAN corresponding to the R package.
+Imports Version, Dependencies, Imports, Authors, +Maintainer and License and saves them as instance +attributes.
+Create a package in the Wikibase instance.
+This function pulls the package, inserts its claims, and writes +it to the Wikibase instance.
+None
+Write the package item to the Wikibase instance.
+If the item has claims, it will be written to the Wikibase instance. +If the item is successfully written, a dictionary with the QID of the +item will be returned.
+A dictionary with the QID of the written item if successful, +or None otherwise.
+Optional[Dict[str, str]]
+Updates existing WB item with the imported metadata from CRAN.
+The metadata corresponding to the package is first pulled from CRAN and
+saved as instance attributes through pull()
. The statements that
+do not coincide with the locally saved information are updated or
+substituted with the updated information.
Uses mardi_importer.wikibase.WBItem
to update the item
+corresponding to the R package.
ID of the updated R package.
+str
+Extracts the DOI identification of related publications.
+Identifies the DOI of publications that are mentioned using the +format doi: or arXiv: in the long description of the +R package.
+List containing the wikibase IDs of mentioned publications.
+List
+Returns the package last update date saved in the Wikibase instance.
+Last update date in format DD-MM-YYYY.
+str
+Processes raw imported data from CRAN to enable the creation of items.
+Package dependencies are split at the comma position.
License information is processed using the parse_license()
method.
Author information is processed using the parse_authors()
method.
Maintainer information is processed using the parse_maintainer()
method.
table_html – HTML code obtained with BeautifulSoup corresponding to the table +containing the metadata of the R package imported from CRAN.
+Dataframe with processed data from a single R package including columns: +Version, Author, License, Depends, Imports +and Maintainer.
+(Pandas dataframe)
+Processes the dependency and import information of each R package.
+This includes: +- Extracting the version information of each dependency/import if provided. +- Providing the Item QID given the dependency/import label. +- Creating a new Item if the dependency/import is not found in the
+++local knowledge graph.
+
List of tuples including software QID and version.
+List[Tuple[str, str]]
+Splits string of licenses.
+Takes into account that licenses are often not uniformly listed. +Characters |, + and , are used to separate licenses. Further +details on each license are often included in square brackets.
+The concrete License is identified and linked to the corresponding +item that has previously been imported from Wikidata. Further license +information, when provided between round or square brackets, is added +as a qualifier.
+If a file license is mentioned, the link to the file license +in CRAN is added as a qualifier.
+x (str) – String imported from CRAN representing license +information.
+List of license tuples. Each tuple contains the license QID +as the first element and the license qualifier as the +second element.
+List[Tuple[str, str]]
+Splits the string corresponding to the authors into a dictionary.
+Author information in CRAN is not registered uniformly. This function +parses the imported string and returns just the names of the individuals +that can be unequivocally identified as authors (i.e. they are followed +by the [aut] abbreviation).
+Generally, authors in CRAN are indicated with the abbreviation [aut]. +When no abbreviations are included, only the first individual is imported +to Wikibase (otherwise it can often not be established whether +information after the first author refers to another individual, +an institution, a funder, etc.)
+x (String) – String imported from CRAN representing author +information.
+Dictionary of authors and corresponding ORCID ID, if provided.
+(Dict)
+Remove unnecessary information from maintainer string.
+x (str) – String imported from CRAN which may contain e-mail +address and comments within brackets
+Name of the maintainer
+(str)
+Returns the Wikidata item ID corresponding to a software license.
+The same license is often denominated in CRAN using different names. +This function returns the wikidata item ID corresponding to a single +unique license that is referenced in CRAN under different names (e.g. +Artistic-2.0 and Artistic License 2.0 both refer to the same +license, corresponding to item Q14624826).
+license_str (str) – String corresponding to a license imported from CRAN.
+Wikidata item ID.
+(str)
+Get the Wikidata QID for the R package.
+Searches for the R package in Wikidata using its label. Retrieves +the QID of matching entities and checks if there is an instance of +an R package. If so, returns the QID.
+The Wikidata QID of the R package if found, or None otherwise.
+Optional[str]
++ |
+ | + |
+ | + |
+ |
+ | + |
|
+
|
+
+ |
+ | + |
+ | + |
+ |
+ |
+ | + |
+ | + |
+ | + |
+ | + |
Created on Thu Feb 17 18:53:53 2022
+@author: alvaro
+Bases: object
Abstract base class for parsing config files
+ + +Bases: object
Abstract base class for reading data from external sources.
+ + + + + + + + +This is the documentation of docker-importer.
+Bases: MardiClient
Sets up initial configuration for the integrator
+Clientlogin object
+Function for in-place conversion of wikidata +ids found in claims into local ids
+entity
+None
+Function for in-place conversion of unit for quantity +and globe for globecoordinate to a link to the local entity +instead of a link to the wikidata entity.
+snak – a wikibaseintegrator snak
+None
+Check if db table for id mapping is there; if not, create.
+None
+None
+Function for creating a list of ids +from a file where each id is on a new line
+file – path to file
+Returns: list of ids
+Check if entity with a given label or wikidata PID/QID +exists in the local wikibase instance.
+entity_str (str) – It can be a string label or a wikidata ID, +specified with the prefix wdt: for properties and wd: +for items.
entity_type (str) – Either ‘property’ or ‘item’ to specify +which type of entity to look for.
Local ID of the entity, if found.
+str
+Function for creating qualifiers from wikidata qualifiers +and in place adding them to the claim
+claim – a wikibaseintegrator claim
+Qualifiers object, can also be an empty object
+Function for creating references from wikidata references +and in place adding them to the claim
+claim – a wikibaseintegrator claim
+List with references, can also be an empty list
+Function for pulling wikidata information
+wikidata_id (str) – wikidata id of the desired entity
recurse (Bool) – if claims should also be imported
Returns: wikibase integrator entity or None, if the entity has no labels
+Function for importing entities that are mentioned +in claims from wikidata to the local wikibase instance
+wikidata_id (str) – id of the entity to be imported
+local id or None, if the entity had no labels
+Function for importing entities from wikidata +into the local instance.
+It can accept a single id, a list of ids or a file containing +the ids to be imported.
+id_list – Single string or list of strings of wikidata +entity ids. Lexemes not supported.
filename – Filename containing list of entities to be +imported.
recurse – Whether to import claims for the entities in +id_list
Dictionary containing the local ids of +all the imported entities.
+Imported entities (Dict)
+Imports an entity from Wikidata just from a label
+label (str) – label to be imported from wikidata
+local id for the imported entity
+local_id (str)
+Searches the wikidata PID property ID to link +properties to their ID in wikidata. When not found, +it creates the property.
+wikidata_PID (str): wikidata PID property ID
+Searches the wikidata QID property ID to link +items to their ID in wikidata. When not found, +it creates the property.
+wikidata_QID (str): wikidata QID property ID
+Insert wikidata_id, local_id and has_all_claims into mapping table.
+wikidata_id – Wikidata id
local_id – local Wikibase id
has_all_claims – Boolean indicating whether the entity has been +imported with all claims or no claims (i.e. no recurse)
None
+Function for completing an already existing local entity +with its statements from wikidata.
+wikidata_id – Wikidata entity ID to be imported.
local_id – Local id of the existing entity that needs to +be completed with further statements.
Local entity ID
+local_id
+Query the wb_id_mapping db table for a given parameter.
+The two important parameters are the local_id and whether the +entity has already been imported with all claims
+parameter (str) – Either local_id or has_all_claims
wikidata_id (str) – Wikidata ID
otherwise None. For has_all_claims, a boolean is returned.
+str or boolean
+Query the wb_id_mapping db table for a given parameter.
+The two important parameters are the wikidata_id and whether the +entity has already been imported with all claims
+parameter (str) – Either wikidata_id or has_all_claims
local_id (str) – local ID
otherwise None. For has_all_claims, a boolean is returned.
+str or boolean
+Bases: MardiItem
Request the MediaWiki API to get data for the entity specified in argument.
+entity_id – The entity_id of the Item entity you want. Must start with a ‘Q’.
kwargs
an ItemEntity instance
+What is this about?
+Install the python package of mardi-importer
by first installing the
+requirements from requirements.txt
,
pip install -r requirements.txt
+
Then install the packages via
+pip install -U -e .
+
-U
enforces reinstalling the package, with -e
modifications in
+the source files are automatically taken into account.
Note: for convenience, local installations not using docker can be placed within +virtual environments by first calling
+python3 -m venv env
+source env/bin/activate
+
TODO
+In docs/
, run make html
to generate the documentation for a
+local installation. The modules have to be installed and findable by import
+module
. To view the docs, open the file docs/_build/html/index.html
.
TODO
++ m | ||
+ |
+ mardi_importer | + |
+ |
+ mardi_importer.cran.CRANSource | + |
+ |
+ mardi_importer.cran.RPackage | + |
+ |
+ mardi_importer.importer.Importer | + |
+ |
+ mardi_importer.integrator.MardiEntities | + |
+ |
+ mardi_importer.integrator.MardiIntegrator | + |
+ |
+ mardi_importer.polydb | + |
+ |
+ mardi_importer.zbmath.misc | + |
+ |
+ mardi_importer.zbmath.ZBMathSource | + |
usage: import.py [-h] --mode {ZBMath,CRAN,polydb,OpenML,zenodo}
+ [--conf_path CONF_PATH]
+ [--wikidata_id_file_path WIKIDATA_ID_FILE_PATH]
+
Possible choices: ZBMath, CRAN, polydb, OpenML, zenodo
+Bases: ADataSource
Reads data from the zbMath API.
+ + +Get creation date from xml record.
+xml_record (xml element) – record returned by zbMath API
+creation date
+string
+Get zbMath id from xml record.
+xml_record (xml element) – record returned by zbMath API
+zbMath ID
+string
+Overrides abstract method. +Reads a raw zbMath data dump and processes it, then saves it as a csv.
+Overrides abstract method. +This method queries the zbMath API to get a data dump of all records, +optionally between from_date and until_date
+Parse xml record from zbMath API.
+xml_record (xml element) – record returned by zbMath API
+dict of (tag,value) pairs extracted from xml_record
+dict
+Overrides abstract method. +Reads a raw zbMath data dump and processes it, then saves it as a csv.
+Query crossref API for DOI information.
+doi – doi
key – document_title only for now
document title
+title
+Returns a fully qualified tag name.
+tag_name (string) – name of tag, e.g. author
namespace (string) – namespace URL of a namespace
Function to extract information returned by a doi query for a specific tag.
+val (string) – tag, e.g. author
work_info (dict) – information from doi query response
information for specific tag, None if not found
+string
+