diff --git a/.buildinfo b/.buildinfo new file mode 100644 index 0000000..0f7bbac --- /dev/null +++ b/.buildinfo @@ -0,0 +1,4 @@ +# Sphinx build info version 1 +# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done. +config: 9b9f7ca5503430bd1b9c7e338b67cd53 +tags: 645f666f9bcd5a90fca523b33c5a78b7 diff --git a/.nojekyll b/.nojekyll new file mode 100644 index 0000000..e69de29 diff --git a/_images/activity.drawio.svg b/_images/activity.drawio.svg new file mode 100644 index 0000000..9e05ebe --- /dev/null +++ b/_images/activity.drawio.svg @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/_modules/index.html b/_modules/index.html new file mode 100644 index 0000000..ec5bace --- /dev/null +++ b/_modules/index.html @@ -0,0 +1,119 @@ + + +
+ + +
+import json
+import logging
+import os
+import pandas as pd
+import time
+
+from mardi_importer.integrator import MardiIntegrator
+from mardi_importer.importer import ADataSource, ImporterException
+from .RPackage import RPackage
+
+log = logging.getLogger('CRANlogger')
+
+
+[docs]
class CRANSource(ADataSource):
    """Processes data from the Comprehensive R Archive Network.

    Metadata for each R package is scraped from the CRAN repository. The
    Wikibase item corresponding to each R package is subsequently updated,
    or created in case of a new package.

    Attributes:
        integrator (MardiIntegrator):
            Client used to read from and write to the Wikibase instance.
        filepath (str):
            Directory containing this module and its entity definition files.
        packages (Pandas dataframe):
            Dataframe with **package name**, **title** and **date of
            publication** for each package in CRAN. Holds an empty string
            until :meth:`pull` has been called.
    """

    def __init__(self):
        self.integrator = MardiIntegrator()
        self.filepath = os.path.realpath(os.path.dirname(__file__))
        self.packages = ""

    def setup(self):
        """Create all necessary properties and entities for CRAN."""
        # Import entities from Wikidata
        filename = self.filepath + "/wikidata_entities.txt"
        self.integrator.import_entities(filename=filename)

        # Create new required local entities
        self.create_local_entities()

    def create_local_entities(self):
        """Create the local properties and items listed in ``new_entities.json``.

        Every entry is written only when an equivalent entity does not
        already exist.
        """
        filename = self.filepath + "/new_entities.json"
        # Context manager guarantees the handle is closed even if parsing fails.
        with open(filename, encoding="utf-8") as f:
            entities = json.load(f)

        for prop_element in entities['properties']:
            prop = self.integrator.property.new()
            prop.labels.set(language='en', value=prop_element['label'])
            prop.descriptions.set(language='en', value=prop_element['description'])
            prop.datatype = prop_element['datatype']
            if not prop.exists():
                prop.write()

        for item_element in entities['items']:
            item = self.integrator.item.new()
            item.labels.set(language='en', value=item_element['label'])
            item.descriptions.set(language='en', value=item_element['description'])
            for key, value in item_element['claims'].items():
                item.add_claim(key, value=value)
            if not item.exists():
                item.write()

    def pull(self):
        """Reads **date**, **package name** and **title** from the CRAN Repository URL.

        The result is saved as a pandas dataframe in the attribute **packages**.

        Returns:
            Pandas dataframe: Attribute ``packages``

        Raises:
            ImporterException: If table at the CRAN url cannot be accessed or read.
        """
        url = r"https://cran.r-project.org/web/packages/available_packages_by_date.html"

        try:
            tables = pd.read_html(url)  # Returns list of all tables on page
        except Exception as e:
            # Chain the original error so the traceback shows the root cause.
            raise ImporterException(
                "Error attempting to read table from CRAN url\n{}".format(e)
            ) from e

        self.packages = tables[0]
        return self.packages

    def push(self):
        """Updates the MaRDI Wikibase entities corresponding to R packages.

        For each **package name** in the attribute **packages** checks
        if the date in CRAN coincides with the date in the MaRDI
        knowledge graph. If not, the package is updated. If the package
        is not found in the MaRDI knowledge graph, the corresponding
        item is created.

        It creates a :class:`mardi_importer.cran.RPackage` instance
        for each package.
        """
        # Debug limit: keep only the rows labelled 1 through 10 (comment out
        # the next line to process every package in CRAN).
        # NOTE(review): the label-based slice skips row 0 - confirm intended.
        self.packages = self.packages.loc[1:10, :]

        for _, row in self.packages.iterrows():
            package_date = row["Date"]
            package_label = row["Package"]
            package_title = row["Title"]

            package = RPackage(package_date, package_label, package_title, self.integrator)
            if package.exists():
                if not package.is_updated():
                    log.info(f"Package {package_label} found: Not up to date. Attempting update...")
                    package.update()
                else:
                    log.info(f"Package {package_label} found: Already up to date.")
            else:
                log.info(f"Package {package_label} not found: Attempting item creation...")
                package.create()

            # Pause between packages before issuing the next round of requests.
            time.sleep(2)
+
+
+
+import io
+import json
+import logging
+import pandas as pd
+import re
+import requests
+
+from mardi_importer.publications import (Author,
+ ArxivPublication,
+ CrossrefPublication,
+ ZenodoResource)
+from wikibaseintegrator.wbi_enums import ActionIfExists
+from wikibaseintegrator.wbi_helpers import search_entities, remove_claims
+from bs4 import BeautifulSoup
+
+log = logging.getLogger('CRANlogger')
+
+
+[docs]
+class RPackage:
+ """Class to manage R package items in the local Wikibase instance.
+
+ Attributes:
+ date:
+ Date of publication
+ label:
+ Package name
+ description:
+ Title of the R package
+ long_description:
+ Extended description of the R package
+ url:
+ URL to the CRAN repository
+ version:
+ Version of the R package
+ author:
+ Author(s) of the package
+ license:
+ Software license
+ dependency:
+ Dependencies to R and other packages
+ maintainer:
+ Software maintainer
+ """
def __init__(self, date, label, title, integrator):
    """Store the basic CRAN metadata and prepare an unsaved Wikibase item.

    Args:
        date: Publication date of the package.
        label: Package name; used as the item label.
        title: Package title; used as the item description.
        integrator: API object used for every Wikibase lookup and write.
    """
    self.date = date
    self.label = label
    self.description = title
    self.long_description = ""
    self.url = ""
    self.version = ""
    # Container-typed defaults: preprocess_authors() calls
    # self.author.items() and the license/dependency/import processors
    # iterate these attributes, so they must be usable even when pull()
    # finds no matching row in the CRAN metadata table.
    self.author = {}
    self.license = []
    self.dependency = []
    self.imports = []
    self.maintainer = ""
    self.author_ID = []
    self.QID = None
    self.api = integrator
    self.item = self.init_item()
+
+
+[docs]
def init_item(self):
    """Build a new, unsaved item carrying the package label and description.

    When the package title equals its name, the description gets the
    suffix " (R Package)" so that label and description differ.

    Returns:
        The freshly created item object.
    """
    if self.label == self.description:
        text = self.description + " (R Package)"
    else:
        text = self.description

    new_item = self.api.item.new()
    new_item.labels.set(language="en", value=self.label)
    new_item.descriptions.set(language="en", value=text)
    return new_item
+
+
+
+[docs]
def pull(self):
    """Imports metadata from CRAN corresponding to the R package.

    Imports **Version**, **Author**, **License**, **Depends**, **Imports**
    and **Maintainer** from the first table of the package page and saves
    them as instance attributes. The first paragraph of the page is stored
    as the long description.

    Returns:
        This instance when a metadata table was found, **None** otherwise.
    """
    url = f"https://CRAN.R-project.org/package={self.label}"
    self.url = url

    try:
        page = requests.get(url)
        soup = BeautifulSoup(page.content, 'lxml')
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt and SystemExit still propagate.
        log.warning(f"Package {self.label} package not found in CRAN.")
        return None

    tables = soup.find_all('table')
    if not tables:
        log.warning("Metadata table not found in CRAN. Package has probably been archived.")
        return None

    # A page without any paragraph yields an empty long description
    # instead of an IndexError.
    first_paragraph = soup.find('p')
    self.long_description = first_paragraph.get_text() if first_paragraph else ""

    package_df = self.clean_package_list(tables[0])
    column_to_attribute = (
        ("Version", "version"),
        ("Author", "author"),
        ("License", "license"),
        ("Depends", "dependency"),
        ("Imports", "imports"),
        ("Maintainer", "maintainer"),
    )
    for column, attribute in column_to_attribute:
        if column in package_df.columns:
            setattr(self, attribute, package_df.loc[1, column])
    return self
+
+
+
+
+[docs]
def exists(self):
    """Look up the Wikibase item that represents this R package.

    The lookup asks the prepared item whether it is an instance of
    ``wd:Q73539779`` and caches the answer in ``self.QID``, so the query
    is only repeated while no ID is known.

    Returns:
        String: Entity ID, or a falsy value when no matching item exists.
    """
    if not self.QID:
        self.QID = self.item.is_instance_of('wd:Q73539779')
    return self.QID
+
+
+
+[docs]
def is_updated(self):
    """Tell whether the stored item already reflects the CRAN publication date.

    Returns:
        Boolean: **True** if the CRAN date is among the dates returned by
        :meth:`get_WB_package_date`, **False** otherwise.
    """
    stored_dates = self.get_WB_package_date()
    return self.date in stored_dates
+
+
+
+[docs]
def create(self):
    """Creates a WB item with the imported metadata from CRAN.

    The metadata is first pulled through :meth:`pull`; nothing is written
    when that fails. Otherwise :meth:`insert_claims` fills the prepared
    item, which is then written to the Wikibase instance.

    Returns:
        String: ID of the created R package, or **None** when the metadata
        could not be pulled or the written item has no ID.
    """
    if not self.pull():
        return None

    self.insert_claims(self.item)
    written = self.item.write()

    if not written.id:
        log.info(f"Package could not be created.")
        return None

    log.info(f"Package created with ID {written.id}.")
    return written.id
+
+
+
+[docs]
def update(self):
    """Updates existing WB item with the imported metadata from CRAN.

    The metadata is first pulled through :meth:`pull`. The stored item is
    then loaded, its English description refreshed when it differs from
    the CRAN title, and a freshly built set of claims is merged into it
    with ``ActionIfExists.APPEND_OR_REPLACE``. The GUID of the previous
    'last update' statement is captured before the merge and that
    statement is removed once the item has been written.

    Returns:
        String: ID of the updated R package, or **None** when the metadata
        could not be pulled.
    """
    if not self.pull():
        return None

    self.item = self.api.item.get(entity_id=self.QID)

    if self.item.descriptions.values.get('en') != self.description:
        # Same rule as at creation time: keep label and description distinct.
        if self.label == self.description:
            refreshed = self.description + " (R Package)"
        else:
            refreshed = self.description
        self.item.descriptions.set(language="en", value=refreshed)

    # Claims are staged on a scratch item and merged in one step below.
    staged = self.api.item.new()
    self.author_ID = self.item.get_value('wdt:P50')
    self.insert_claims(staged)

    # Remember the old 'last update' statement before it gets a sibling.
    last_update_prop_nr = self.api.get_local_id_by_label('last update', 'property')
    previous_guid = self.item.claims.get(last_update_prop_nr)[0].id

    self.item.claims.add(staged.claims, ActionIfExists.APPEND_OR_REPLACE)
    self.item.write()

    # Drop the outdated 'last update' statement.
    remove_claims(previous_guid, login=self.api.login, is_bot=True)

    if not self.QID:
        log.info(f"Package could not be updated.")
        return None

    log.info(f"Package with ID {self.QID} has been updated.")
    return self.QID
+
+
+
+[docs]
def insert_claims(self, item):
    """Attach every statement describing this R package to ``item``.

    Besides filling ``item`` this stores the resolved author IDs in
    ``self.author_ID``.

    Args:
        item: Item object that receives the claims.
    """
    timestamp = f"+{self.date}T00:00:00Z"

    # Instance of: R package / Programmed in: R
    item.add_claim("wdt:P31", "wd:Q73539779")
    item.add_claim("wdt:P277", "wd:Q206904")

    # Last update date
    item.add_claim("wdt:P5017", timestamp)

    # Software version identifier, qualified with the publication date
    publication_date = [self.api.get_claim("wdt:P577", timestamp)]
    item.add_claim("wdt:P348", self.version, qualifiers=publication_date)

    # Authors
    self.author_ID = self.preprocess_authors()
    item.add_claims(
        [self.api.get_claim("wdt:P50", author) for author in self.author_ID]
    )

    # Maintainer
    item.add_claim("wdt:P126", self.preprocess_maintainer())

    # Licenses, dependencies and imports
    item.add_claims(self.process_licenses())
    item.add_claims(self.process_dependencies())
    item.add_claims(self.process_imports())

    # Related publications (cites work)
    item.add_claims(
        [
            self.api.get_claim("wdt:P2860", publication)
            for publication in self.preprocess_publications()
        ]
    )

    # CRAN Project
    item.add_claim("wdt:P5565", self.label)

    # Wikidata QID
    wikidata_QID = self.get_wikidata_QID()
    if wikidata_QID:
        item.add_claim("Wikidata QID", wikidata_QID)
+
+
+
+[docs]
def preprocess_authors(self):
    """Resolve every author of the package to a local item ID.

    - A few organisations are mapped to fixed Wikidata items.
    - Other authors are looked up through :class:`Author`; when that
      yields no ID and the package already exists, the authors currently
      attached to the item are matched by label.
    - Authors that are still unresolved are created.

    Returns:
        List:
            Item IDs corresponding to each author.
    """
    # Organisations matched on their exact name.
    fixed_items = {
        "R Core Team": 'Q116739338',
        "CRAN Team": 'Q116739332',
    }

    resolved = []
    for name, orcid in self.author.items():
        # The R Foundation is the only name matched case-insensitively.
        if name.lower() in ["r foundation", "the r foundation"]:
            resolved.append(self.api.query('local_id', 'Q111430684'))
            continue
        if name in fixed_items:
            resolved.append(self.api.query('local_id', fixed_items[name]))
            continue

        author = Author(self.api, name, orcid, self.author_ID)
        match = ""
        if author.QID:
            match = author.QID
        elif self.QID:
            # Reuse an author already linked to the package; the last
            # item with a matching label wins.
            for candidate in self.item.get_value("wdt:P50"):
                candidate_label = self.api.item.get(entity_id=candidate).labels.values['en']
                if name == candidate_label:
                    match = candidate
        if not match:
            match = author.create()
        resolved.append(match)
    return resolved
+
+
+
+[docs]
def preprocess_maintainer(self):
    """Resolve the package maintainer to an item ID.

    The maintainer name is compared, via ``Author.compare_names``, with
    the English label of every author in ``self.author_ID``; the first
    match is returned. Without a match a new item, instance of
    ``wd:Q5``, is written for the maintainer.

    Returns:
        String:
            Item ID corresponding to the maintainer.
    """
    for author_qid in self.author_ID:
        author_entity = self.api.item.get(entity_id=author_qid)
        known_name = str(author_entity.labels.values['en'])
        if Author(self.api, known_name).compare_names(self.maintainer):
            return author_qid

    # No author matches: create an item for the maintainer.
    person = self.api.item.new()
    person.labels.set(language="en", value=self.maintainer)
    person.add_claim("wdt:P31", "wd:Q5")
    return person.write().id
+
+
+
+[docs]
def preprocess_software(self, packages):
    """Processes the dependency and import information of each R package. This includes:

    - Extracting the version information of each dependency/import if provided.
    - Providing the Item ID given the dependency/import label.
    - Creating a new WB Item if the dependency/import is not found in the
      local knowledge graph.

    Args:
        packages (String):
            Either ``"dependencies"`` (reads ``self.dependency``) or
            ``"imports"`` (reads ``self.imports``).

    Returns:
        Dict:
            Dictionary with key value corresponding to the Item ID of each dependency or
            import. The value indicates the version of each dependency or import, if
            provided, which is added in the statement as a qualifier.

    Raises:
        ValueError: If ``packages`` is neither ``"dependencies"`` nor ``"imports"``.
    """
    if packages == "dependencies":
        process_list = self.dependency
    elif packages == "imports":
        process_list = self.imports
    else:
        # Previously this fell through to an UnboundLocalError further down.
        raise ValueError(
            f"packages must be 'dependencies' or 'imports', got {packages!r}"
        )

    software = {}
    if not isinstance(process_list, list):
        return software

    # Version constraints are given in parentheses, e.g. "MASS (>= 7.3)".
    version_pattern = re.compile(r"\((.*?)\)")
    for software_string in process_list:
        version_match = version_pattern.search(software_string)
        software_name = version_pattern.sub("", software_string).strip()

        if software_name == "R":
            # Software = R; no package lookup is needed for the language itself.
            software_ID = self.api.query("local_id", "Q206904")
        else:
            item = self.api.item.new()
            item.labels.set(language="en", value=software_name)
            # Software = existing R package, if one carries this label.
            software_ID = item.is_instance_of("wd:Q73539779")
            if not software_ID:
                # Software = new instance of R package, programmed in R.
                item.add_claim("wdt:P31", "wd:Q73539779")
                item.add_claim("wdt:P277", "wd:Q206904")
                software_ID = item.write().id

        software[software_ID] = version_match.group(1) if version_match else ""

    return software
+
+
+
+[docs]
def preprocess_publications(self):
    """Extracts the identifiers of related publications and resolves them.

    Identifies publications that are mentioned using the format
    *<doi:...>*, *<arXiv:...>* or *<zenodo:...>* in the long description
    of the R package. DOIs with the arXiv (10.48550/) or Zenodo (10.5281/)
    prefix are routed to the arXiv and Zenodo handling respectively; every
    other DOI is handled as a Crossref publication. Items that are not
    found in the local Wikibase instance are created.

    Returns:
        List:
            List containing the wikibase IDs of mentioned publications.
    """
    scholarly_article = "wd:Q13442814"
    doi_prop_nr = "wdt:P356"
    arxiv_prop_nr = "wdt:P818"
    zenodo_prop_nr = "wdt:P4901"
    # Item types under which a Zenodo resource may have been stored.
    zenodo_resource_types = [
        "wd:Q1172284",
        "wd:Q7397",
        "wd:Q604733",
        "wd:Q10870555",
        "wd:Q429785",
        "wd:Q478798",
        "wd:Q2431196",
        "wd:Q379833",
        "wd:Q580922",  # comma was missing: this and the next ID were fused
        "wd:Q37866906",
    ]

    # NOTE(review): this is an alias, not a copy, so coauthors found below
    # are appended to self.author_ID as well - kept as in the original.
    publication_authors = self.author_ID
    publication_id_array = []

    def find_references(prefix):
        """Return the '<prefix:...>' references without a trailing dot."""
        found = re.findall(f'<{prefix}:(.*?)>', self.long_description)
        return [ref[:-1] if ref.endswith('.') else ref for ref in found]

    def remember_authors(author_ids):
        """Add unseen author IDs to the shared author list."""
        for author_id in author_ids:
            if author_id not in publication_authors:
                publication_authors.append(author_id)

    def resolve_article(publication, prop_nr, identifier):
        """Return the ID of the article, creating the item when missing."""
        probe = self.api.item.new()
        probe.labels.set(language="en", value=publication.title)
        publication_id = probe.is_instance_of_with_property(
            scholarly_article, prop_nr, identifier
        )
        if publication_id:
            existing = self.api.item.get(entity_id=publication_id)
            remember_authors(existing.get_value("wdt:P50"))
        else:
            publication_id = publication.create()
            remember_authors(publication.coauthors)
        return publication_id

    doi_references = find_references('doi')
    arxiv_references = find_references('arXiv')
    zenodo_references = find_references('zenodo')

    # Route each DOI to the source that can describe it.
    crossref_references = []
    for doi in doi_references:
        doi = doi.strip().lower()
        if '10.48550/' in doi:
            arxiv_references.append(doi.replace('10.48550/arxiv.', ''))
        elif '10.5281/' in doi:
            zenodo_references.append(doi.replace('10.5281/zenodo.', ''))
        else:
            crossref_references.append(doi)

    for doi in crossref_references:
        publication = CrossrefPublication(self.api, doi, publication_authors)
        publication_id = resolve_article(publication, doi_prop_nr, doi)
        if publication_id:
            publication_id_array.append(publication_id)

    for arxiv_id in arxiv_references:
        arxiv_id = arxiv_id.strip()
        if ":" in arxiv_id:
            arxiv_id = arxiv_id.replace(":", ".")
        if "10.48550/" in arxiv_id:
            arxiv_id = arxiv_id.lower().replace('10.48550/arxiv.', '')
        publication = ArxivPublication(self.api, arxiv_id, publication_authors)
        publication_id = resolve_article(publication, arxiv_prop_nr, arxiv_id)
        if publication_id:
            publication_id_array.append(publication_id)

    for zenodo_id in zenodo_references:
        zenodo_id = zenodo_id.strip()
        if ":" in zenodo_id:
            zenodo_id = zenodo_id.replace(":", ".")
        if "10.5281/" in zenodo_id:
            zenodo_id = zenodo_id.lower().replace('10.5281/zenodo.', '')

        resource = ZenodoResource(self.api, zenodo_id, publication_authors)
        resource_item = self.api.item.new()
        resource_item.labels.set(language="en", value=resource.title)

        # Every type is probed; as before, the last matching type wins.
        resource_id = None
        for resource_type in zenodo_resource_types:
            found = resource_item.is_instance_of_with_property(
                resource_type, zenodo_prop_nr, zenodo_id
            )
            if found:
                resource_id = found

        if not resource_id:
            resource_id = resource.create()
        if resource_id:
            publication_id_array.append(resource_id)

    return publication_id_array
+
+
+
+[docs]
def process_dependencies(self):
    """Build the statements describing the package dependencies.

    Each statement (property ``wdt:P1547``) links to the item representing
    a dependency and, when a version requirement is known, carries it as
    a ``wdt:P348`` qualifier.

    Returns:
        List: Claims ready to be added to the R package item.
    """
    statements = []
    for software_id, required_version in self.preprocess_software("dependencies").items():
        if required_version:
            version_qualifier = [self.api.get_claim("wdt:P348", required_version)]
        else:
            version_qualifier = []
        statements.append(
            self.api.get_claim("wdt:P1547", software_id, qualifiers=version_qualifier)
        )
    return statements
+
+
+
+[docs]
def process_imports(self):
    """Build the statements describing the imported R packages.

    Each statement uses the local property labelled "imports", links to
    the item representing the imported package and, when a version
    requirement is known, carries it as a ``wdt:P348`` qualifier.

    Returns:
        List: Claims ready to be added to the R package item.
    """
    imported = self.preprocess_software("imports")
    imports_prop_nr = self.api.get_local_id_by_label("imports", "property")

    statements = []
    for software_id, required_version in imported.items():
        if required_version:
            version_qualifier = [self.api.get_claim("wdt:P348", required_version)]
        else:
            version_qualifier = []
        statements.append(
            self.api.get_claim(imports_prop_nr, software_id, qualifiers=version_qualifier)
        )
    return statements
+
+
+ #for software, version in preprocessed_imports.items():
+ # item.add_statement(import_property, software, WD_P348=version) if len(version) > 0 else item.add_statement(import_property, software)
+
+
+[docs]
def process_licenses(self):
    """Processes the license strings and builds the corresponding statements.

    The concrete license is identified through :meth:`get_license_QID`.
    Further license information, when provided between round or square
    brackets, is removed from the name and added as a ``wdt:P9767``
    qualifier; text in square brackets takes precedence over text in
    round brackets.

    If a file license is mentioned, the link to that file in CRAN is
    added as a ``wdt:P2699`` qualifier. The link uses the spelling given
    by CRAN (``LICENSE`` or ``LICENCE``).

    Returns:
        List: ``wdt:P275`` claims ready to be added to the R package item.
    """
    round_brackets = re.compile(r"\((.*?)\)")
    square_brackets = re.compile(r"\[(.*?)\]")

    claims = []
    for license_str in self.license:
        license_qualifier = ""

        round_match = round_brackets.search(license_str)
        if round_match:
            license_qualifier = round_match.group(1)
            license_str = round_brackets.sub("", license_str)

        # Searched after the round brackets are gone, so the qualifier is
        # always the bracket that is actually stripped from the name.
        square_match = square_brackets.search(license_str)
        if square_match:
            license_qualifier = square_match.group(1)
            license_str = square_brackets.sub("", license_str)

        license_str = license_str.strip()
        license_QID = self.get_license_QID(license_str)

        if license_str in ("file LICENSE", "file LICENCE"):
            # Link to the file under the name the package actually uses.
            file_name = license_str.split()[-1]
            qualifier = [
                self.api.get_claim(
                    "wdt:P2699",
                    f"https://cran.r-project.org/web/packages/{self.label}/{file_name}",
                )
            ]
            claims.append(self.api.get_claim("wdt:P275", license_QID, qualifiers=qualifier))
        elif license_QID:
            if license_qualifier:
                qualifier = [self.api.get_claim("wdt:P9767", license_qualifier)]
                claims.append(self.api.get_claim("wdt:P275", license_QID, qualifiers=qualifier))
            else:
                claims.append(self.api.get_claim("wdt:P275", license_QID))
    return claims
+
+
+
+[docs]
def get_WB_package_date(self):
    """Read the 'last update' dates stored for the package item.

    Takes the values of property ``wdt:P5017`` from the item and keeps
    characters 1 to 10 of each, i.e. the ``YYYY-MM-DD`` part of a
    timestamp such as ``+2023-01-05T00:00:00Z``.

    Returns:
        List: Date strings; empty when the item has no such value.
    """
    stored_values = self.item.get_value("wdt:P5017") or []
    return [timestamp[1:11] for timestamp in stored_values]
+
+
+
+[docs]
def clean_package_list(self, table_html):
    """Processes raw imported data from CRAN to enable the creation of items.

    - Package dependencies and imports are split with :meth:`split_list`.
    - License information is processed using the :meth:`split_license` method.
    - Author information is processed using the :meth:`split_authors` method.
    - Maintainer information is processed using the :meth:`clean_maintainer` method.

    Args:
        table_html:
            HTML code obtained with BeautifulSoup corresponding to the table
            containing the metadata of the R package imported from CRAN.
    Returns:
        (Pandas dataframe):
            Dataframe with processed data from a single R package including columns:
            **Version**, **Author**, **License**, **Depends**, **Imports**
            and **Maintainer**.
    """
    # The table holds one "Name:" / value pair per row; transpose it so the
    # names become columns, then drop the last character of every name.
    package_df = pd.read_html(io.StringIO(str(table_html)))[0].set_index(0).T
    package_df.columns = package_df.columns.str[:-1]

    for column in ("Depends", "Imports"):
        if column in package_df.columns:
            package_df[column] = package_df[column].apply(self.split_list)

    if "License" in package_df.columns:
        package_df["License"] = package_df["License"].apply(self.split_license)

    if "Author" in package_df.columns:
        # The raw cell markup is kept so split_authors() still sees the
        # ORCID links. ``string=`` replaces the deprecated ``text=`` keyword.
        author_cell = table_html.find("td", string="Author:").find_next_sibling("td")
        package_df["Author"] = str(author_cell).replace('\n', '').replace('\r', '')
        package_df["Author"] = package_df["Author"].apply(self.split_authors)

    if "Maintainer" in package_df.columns:
        package_df["Maintainer"] = package_df["Maintainer"].apply(self.clean_maintainer)

    return package_df
+
+
+
+[docs]
@staticmethod
def split_list(x):
    """Turn a comma-separated string into a list of its elements.

    Args:
        x (String): Value to split; missing values are accepted.
    Returns:
        (List): Elements separated by ", "; empty for a missing value.
    """
    if pd.isna(x):
        return []
    return str(x).split(", ")
+
+
+
+[docs]
@staticmethod
def split_license(x):
    """Splits string of licenses.

    Takes into account that licenses are often not uniformly listed.
    The separators " | " and " + " are used to separate licenses. Further
    details on each license are often included in square brackets; a
    separator inside such brackets does not split the license.

    Args:
        x (String): String imported from CRAN representing license
          information.

    Returns:
        (List): List of licenses, without duplicates and in order of
        first appearance.
    """
    if pd.isna(x):
        return []

    def split_outside_brackets(text, separator):
        """Split on ``separator`` but rejoin pieces of an open '[...]'."""
        merged = []
        pending = None  # piece with an opening bracket still to be closed
        for piece in text.split(separator):
            if pending is not None:
                pending += separator + piece
                if "]" in piece:
                    merged.append(pending)
                    pending = None
            elif "[" in piece and "]" not in piece:
                pending = piece
            else:
                merged.append(piece)
        if pending is not None:
            # Bracket never closed: keep everything collected so far.
            merged.append(pending)
        return merged

    licenses = []
    for alternative in split_outside_brackets(str(x), " | "):
        licenses.extend(split_outside_brackets(alternative, " + "))

    # dict.fromkeys removes duplicates while preserving order.
    return list(dict.fromkeys(licenses))
+
+
+
+[docs]
def split_authors(self, x):
    """Splits the string corresponding to the authors into a dictionary.

    Author information in CRAN is not registered uniformly. This function
    parses the imported string and returns just the names of the individuals
    that can be unequivocally identified as authors (i.e. they are followed
    by the *[aut]* abbreviation).

    Generally, authors in CRAN are indicated with the abbreviation *[aut]*.
    When no abbreviations are included, only the first individual is imported
    to Wikibase (otherwise it can often not be established whether
    information after the first author refers to another individual,
    an institution, a funder, etc.)

    Args:
        x (String): String imported from CRAN representing author
          information.

    Returns:
        (Dict): Dictionary of authors and corresponding ORCID ID, if provided.
    """
    # Raw strings throughout: the previous non-raw patterns relied on
    # invalid escape sequences such as "\(" and "\d".
    email_pattern = r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+"
    orcid_pattern = r"\d{4}-\d{4}-\d{4}-.{4}"

    # Strip cell markup, ORCID icons, parenthesised remarks and tabs.
    for noise in (r"<td>", r"</td>", r"<img alt.*?a>", r"\(.*?\)", r"\t", r"ORCID iD"):
        x = re.sub(noise, "", x)

    author_dict = {}
    # Each entry runs up to and including its closing role bracket.
    tagged_entries = re.findall(r".*?\]", x)

    if tagged_entries:
        for entry in tagged_entries:
            roles = re.findall(r"\[.*?\]", entry)
            if not roles or "aut" not in roles[0]:
                continue

            orcid_match = re.search(orcid_pattern, entry)
            orcid = orcid_match.group(0) if orcid_match else None

            name = re.sub(r"<a href=.*?>", "", entry)
            name = re.sub(r"\[.*?\]", "", name)
            name = re.sub(r"^\s?,", "", name)
            name = re.sub(r"^\s?and\s?", "", name)
            name = re.sub(email_pattern, "", name).strip()

            # Single-word names are kept as written.
            if len(name.split(" ")) > 1:
                name = self.capitalize_author(name)
            if name:
                author_dict[name] = orcid
    else:
        # No role tags at all: only the first listed individual is trusted.
        by_comma = x.split(", ")
        by_and = x.split(" and ")
        first = by_and[0] if len(by_and) > len(by_comma) else by_comma[0]
        name = re.sub(email_pattern, "", first)

        # Long or symbol-laden candidates are discarded.
        if len(name.split(" ")) > 5 or re.findall(r"[@\(\)\[\]&]", name):
            name = ""
        name = self.capitalize_author(name)
        if name:
            author_dict[name] = None

    return author_dict
+
+
+
+[docs]
def capitalize_author(self, author):
    """Capitalize every whitespace-separated term of an author name.

    Runs of whitespace are collapsed to a single space. An empty or
    whitespace-only name yields an empty string (a whitespace-only name
    previously raised an IndexError).

    Args:
        author (String): Author name.

    Returns:
        (String): Name with each term capitalized.
    """
    return " ".join(term.capitalize() for term in author.split())
+
+
+
+[docs]
def clean_maintainer(self, x):
    """Remove unnecessary information from maintainer string.

    Args:
        x (String): String imported from CRAN which may contain an e-mail
          address within angle brackets and comments within round brackets.

    Returns:
        (String): Name of the maintainer; a missing value is returned
        unchanged.
    """
    if pd.isna(x):
        return x
    x = re.sub(r"<.*?>", "", x)
    # Raw string: "\(" is an invalid escape sequence in a normal literal.
    x = re.sub(r"\(.*?\)", "", x)
    return self.capitalize_author(x).strip()
+
+
+
+[docs]
def get_license_QID(self, license_str):
    """Return the Wikidata item ID corresponding to a software license.

    The same license is often referred to in CRAN under different names
    (e.g. *Artistic-2.0* and *Artistic License 2.0* both refer to item
    *Q14624826*); every known spelling is mapped to a single item.

    Args:
        license_str (String): License name as imported from CRAN.

    Returns:
        (String): Wikidata item ID prefixed with ``wd:`` for known
        licenses. For the licenses without a Wikidata item (*ACM*,
        *file LICENCE* / *file LICENSE*, *Unlimited*) a new item is
        created through ``self.api`` and the result of its
        ``is_instance_of("wd:Q207621")`` call is returned. Unknown
        names yield ``None``.
    """
    # Licenses for which a new labelled item is created.
    new_item_labels = {
        "ACM": "ACM Software License Agreement",
        "file LICENCE": "File License",
        "file LICENSE": "File License",
        "Unlimited": "Unlimited License",
    }
    # CRAN spelling -> Wikidata item.
    wikidata_items = {
        "AGPL": "wd:Q28130012",
        "AGPL-3": "wd:Q27017232",
        "Apache License": "wd:Q616526",
        "Apache License 2.0": "wd:Q13785927",
        "Apache License version 1.1": "wd:Q17817999",
        "Apache License version 2.0": "wd:Q13785927",
        "Artistic-2.0": "wd:Q14624826",
        "Artistic License 2.0": "wd:Q14624826",
        "BSD 2-clause License": "wd:Q18517294",
        "BSD 3-clause License": "wd:Q18491847",
        "BSD_2_clause": "wd:Q18517294",
        "BSD_3_clause": "wd:Q18491847",
        "BSL": "wd:Q2353141",
        "BSL-1.0": "wd:Q2353141",
        "CC0": "wd:Q6938433",
        "CC BY 4.0": "wd:Q20007257",
        "CC BY-SA 4.0": "wd:Q18199165",
        "CC BY-NC 4.0": "wd:Q34179348",
        "CC BY-NC-SA 4.0": "wd:Q42553662",
        "CeCILL": "wd:Q1052189",
        "CeCILL-2": "wd:Q19216649",
        "Common Public License Version 1.0": "wd:Q2477807",
        "CPL-1.0": "wd:Q2477807",
        "Creative Commons Attribution 4.0 International License": "wd:Q20007257",
        "EPL": "wd:Q1281977",
        "EUPL": "wd:Q1376919",
        "EUPL-1.1": "wd:Q1376919",
        "FreeBSD": "wd:Q34236",
        "GNU Affero General Public License": "wd:Q1131681",
        "GNU General Public License": "wd:Q7603",
        "GNU General Public License version 2": "wd:Q10513450",
        "GNU General Public License version 3": "wd:Q10513445",
        "GPL": "wd:Q7603",
        "GPL-2": "wd:Q10513450",
        "GPL-3": "wd:Q10513445",
        "LGPL": "wd:Q192897",
        "LGPL-2": "wd:Q23035974",
        "LGPL-2.1": "wd:Q18534390",
        "LGPL-3": "wd:Q18534393",
        "Lucent Public License": "wd:Q6696468",
        "MIT": "wd:Q334661",
        "MIT License": "wd:Q334661",
        "Mozilla Public License 1.1": "wd:Q26737735",
        "Mozilla Public License 2.0": "wd:Q25428413",
        "Mozilla Public License Version 2.0": "wd:Q25428413",
        "MPL": "wd:Q308915",
        "MPL version 1.0": "wd:Q26737738",
        "MPL version 1.1": "wd:Q26737735",
        "MPL version 2.0": "wd:Q25428413",
        "MPL-1.1": "wd:Q26737735",
        "MPL-2.0": "wd:Q25428413",
    }

    if license_str in new_item_labels:
        license_item = self.api.item.new()
        license_item.labels.set(language="en", value=new_item_labels[license_str])
        return license_item.is_instance_of("wd:Q207621")
    return wikidata_items.get(license_str)
+
+
+
+[docs]
def get_wikidata_QID(self):
    """Search Wikidata for an R package item matching ``self.label``.

    Each search hit is fetched from Wikidata and its *P31* claims are
    inspected; the first hit with a *P31* item value of *Q73539779* is
    returned.

    Returns:
        The matching search result, or ``None`` when no hit qualifies.
    """
    wikidata_api = 'https://www.wikidata.org/w/api.php'
    candidates = search_entities(
        search_string=self.label,
        mediawiki_api_url=wikidata_api
    )

    for candidate in candidates:
        item = self.api.item.get(
            entity_id=candidate,
            mediawiki_api_url=wikidata_api
        )
        if 'P31' not in item.claims.get_json().keys():
            continue
        instance_claims = item.claims.get('P31')
        if not instance_claims:
            continue
        for instance_claim in instance_claims:
            mainsnak = instance_claim.get_json()['mainsnak']
            if mainsnak['datatype'] != "wikibase-item":
                continue
            # Claims without a datavalue carry no item to compare.
            if 'datavalue' not in mainsnak.keys():
                continue
            # Instance of R package
            if mainsnak['datavalue']['value']['id'] == "Q73539779":
                return candidate
+
+
+
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Thu Feb 17 18:53:53 2022
+
+@author: alvaro
+"""
+
+
+
+[docs]
class Importer:
    """Controller that drives an import from an external source into the local Wikibase."""

    def __init__(self, dataSource):
        """Remember the data source this importer operates on.

        Args:
            dataSource: object implementing ADataSource
        """
        self.dataSource = dataSource

    def import_all(self, pull=True, push=True):
        """Run the import process.

        The data source is always set up first; pulling and pushing are
        then performed in that order when enabled.

        Args:
            pull: whether to call the data source's ``pull``.
            push: whether to call the data source's ``push``.
        """
        source = self.dataSource
        source.setup()
        if pull:
            source.pull()
        if push:
            source.push()
+
+
+
+
+
+[docs]
class ADataSource:
    """Abstract base class for reading data from external sources.

    Every method raises ``NotImplementedError``; concrete data sources
    are expected to override them.
    """

    def write_data_dump(self):
        """Write a data dump from the API.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def push(self):
        """Push data into the MaRDI knowledge graph.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
+
+
+
+
+
+
+
+
+
+
+
+import re
+import sqlalchemy as db
+from sqlalchemy import and_
+
+from mardiclient import MardiItem, MardiProperty
+from wikibaseintegrator.wbi_exceptions import ModificationFailed
+from wikibaseintegrator.datatypes import ExternalID
+from wikibaseintegrator.wbi_enums import ActionIfExists
+from mardi_importer.importer import ImporterException
+
+
+[docs]
+class MardiItemEntity(MardiItem):
+
+
+
+
+
+[docs]
def get(self, entity_id, **kwargs):
    """Fetch an item and wrap it in a new MardiItemEntity.

    Args:
        entity_id: ID of the entity to fetch; also the key used to pick
            the entity out of the response's ``'entities'`` mapping.
        **kwargs: Forwarded unchanged to the parent class' ``_get``.

    Returns:
        MardiItemEntity populated from the fetched JSON and bound to
        the same ``api`` as this instance.
    """
    response = super(MardiItemEntity, self)._get(entity_id=entity_id, **kwargs)
    fresh_item = MardiItemEntity(api=self.api)
    return fresh_item.from_json(json_data=response['entities'][entity_id])
+
+
+
+[docs]
def get_QID(self):
    """List the QIDs of all items in the local wikibase that share
    this item's English label.

    The lookup goes directly to the wikibase term tables through
    ``self.api.engine``. An empty string is used as the label when the
    item has no English label.

    Returns:
        QIDs (list): List of QID strings, empty when nothing matches.

    Raises:
        ImporterException: if reading the term tables fails.
    """
    label = self.labels.values['en'].value if 'en' in self.labels.values else ""

    with self.api.engine.connect() as connection:
        metadata = db.MetaData()

        def reflect(table_name):
            # Load the table definition from the live database.
            return db.Table(table_name, metadata, autoload_with=connection)

        try:
            item_terms = reflect("wbt_item_terms")
            term_in_lang = reflect("wbt_term_in_lang")
            text_in_lang = reflect("wbt_text_in_lang")
            text = reflect("wbt_text")

            conditions = and_(
                text.columns.wbx_text == bytes(label, "utf-8"),
                term_in_lang.columns.wbtl_type_id == 1,
                text_in_lang.columns.wbxl_language == bytes("en", "utf-8"),
            )
            query = (
                db.select(item_terms.columns.wbit_item_id)
                .join(term_in_lang, item_terms.columns.wbit_term_in_lang_id == term_in_lang.columns.wbtl_id)
                .join(text_in_lang, term_in_lang.columns.wbtl_text_in_lang_id == text_in_lang.columns.wbxl_id)
                .join(text, text.columns.wbx_id == text_in_lang.columns.wbxl_text_id)
                .where(conditions)
            )
            rows = connection.execute(query).fetchall()
            qids = ["Q" + str(row[0]) for row in rows]

        except Exception as e:
            raise ImporterException(
                "Error attempting to read mappings from database\n{}".format(e)
            )

    return qids
+
+
+
+
+
+[docs]
+class MardiPropertyEntity(MardiProperty):
+
+
+
+
+
+[docs]
def get(self, entity_id, **kwargs):
    """Fetch a property and wrap it in a new MardiPropertyEntity.

    Args:
        entity_id: ID of the entity to fetch; also the key used to pick
            the entity out of the response's ``'entities'`` mapping.
        **kwargs: Forwarded unchanged to the parent class' ``_get``.

    Returns:
        MardiPropertyEntity populated from the fetched JSON and bound
        to the same ``api`` as this instance.
    """
    response = super(MardiPropertyEntity, self)._get(entity_id=entity_id, **kwargs)
    fresh_property = MardiPropertyEntity(api=self.api)
    return fresh_property.from_json(json_data=response['entities'][entity_id])
+
+
+
+[docs]
def get_PID(self):
    """Return the PID of the local property with the same English label.

    The lookup goes directly to the wikibase term tables through
    ``self.api.engine``. An empty string is used as the label when the
    property has no English label.

    Returns:
        str or None: ``"P<id>"`` built from the first matching row, or
        ``None`` when no property carries the label.

    Raises:
        ImporterException: if reading the term tables fails.
    """
    label = self.labels.values['en'].value if 'en' in self.labels.values else ""

    with self.api.engine.connect() as connection:
        metadata = db.MetaData()
        try:
            wbt_property_terms = db.Table(
                "wbt_property_terms", metadata, autoload_with=connection
            )
            wbt_term_in_lang = db.Table(
                "wbt_term_in_lang", metadata, autoload_with=connection
            )
            wbt_text_in_lang = db.Table(
                "wbt_text_in_lang", metadata, autoload_with=connection
            )
            wbt_text = db.Table(
                "wbt_text", metadata, autoload_with=connection
            )
            query = (db.select(wbt_property_terms.columns.wbpt_property_id)
                    .join(wbt_term_in_lang, wbt_term_in_lang.columns.wbtl_id == wbt_property_terms.columns.wbpt_term_in_lang_id)
                    .join(wbt_text_in_lang, wbt_term_in_lang.columns.wbtl_text_in_lang_id == wbt_text_in_lang.columns.wbxl_id)
                    .join(wbt_text, wbt_text.columns.wbx_id == wbt_text_in_lang.columns.wbxl_text_id)
                    .where(and_(wbt_text.columns.wbx_text == bytes(label, "utf-8"),
                                wbt_term_in_lang.columns.wbtl_type_id == 1,
                                wbt_text_in_lang.columns.wbxl_language == bytes("en", "utf-8"))))
            results = connection.execute(query).fetchall()
        except Exception as e:
            # Chain the original error so the database failure stays
            # visible as the explicit cause.
            raise ImporterException(
                "Error attempting to read mappings from database\n{}".format(e)
            ) from e

    # Only the first match is used, even if several rows share the label.
    if results:
        return f"P{str(results[0][0])}"
    return None
+
+
+
+import os
+import re
+import sqlalchemy as db
+
+from .MardiEntities import MardiItemEntity, MardiPropertyEntity
+from mardiclient import MardiClient
+from wikibaseintegrator import wbi_login
+from wikibaseintegrator.models import Claim, Claims, Qualifiers, Reference
+from wikibaseintegrator.wbi_config import config as wbi_config
+from wikibaseintegrator.wbi_enums import ActionIfExists
+from wikibaseintegrator.wbi_helpers import search_entities, execute_sparql_query
+from wikibaseintegrator.datatypes import (URL, CommonsMedia, ExternalID, Form, GeoShape, GlobeCoordinate, Item, Lexeme, Math, MonolingualText, MusicalNotation, Property, Quantity,
+ Sense, String, TabularData, Time)
+
+
+[docs]
+class MardiIntegrator(MardiClient):
def __init__(self, languages=None) -> None:
    """Set up login, database access and the linker properties.

    Args:
        languages: Languages whose labels, descriptions and aliases are
            kept when importing from Wikidata, or the string ``"all"``.
            Defaults to ``["en", "de"]``.
    """
    super().__init__()
    # A fresh list per instance: a list literal as the default argument
    # would be shared (and mutable) across all integrators.
    self.languages = ["en", "de"] if languages is None else languages

    # config() switches this to False when no importer credentials are
    # present in the environment.
    self.setup = True
    self.login = self.config()
    self.engine = self.create_engine()
    self.create_db_table()

    # local id of properties for linking to wikidata PID/QID
    self.wikidata_PID = self.init_wikidata_PID() if self.setup else None
    self.wikidata_QID = self.init_wikidata_QID() if self.setup else None

    self.item = MardiItemEntity(api=self)
    self.property = MardiPropertyEntity(api=self)

    # Wikidata properties whose claims are not converted on import.
    self.excluded_properties = ['P1151', 'P1855', 'P2302', 'P2559',
                                'P2875', 'P3254', 'P3709', 'P3713',
                                'P3734', 'P6104', 'P6685', 'P8979']
+
+
+[docs]
def config(self):
    """Set up the initial configuration for the integrator.

    When both ``IMPORTER_USER`` and ``IMPORTER_PASS`` are set in the
    environment, the wikibaseintegrator configuration is filled from the
    environment and a login is created. Otherwise ``self.setup`` is set
    to ``False``.

    Returns:
        Clientlogin object, or ``None`` when the credentials are missing.
    """
    user = os.environ.get("IMPORTER_USER")
    password = os.environ.get("IMPORTER_PASS")
    if not (user and password):
        self.setup = False
        return None

    # wbi_config key -> environment variable it is read from.
    settings_from_env = (
        ("USER_AGENT", "IMPORTER_AGENT"),
        ("MEDIAWIKI_API_URL", "MEDIAWIKI_API_URL"),
        ("SPARQL_ENDPOINT_URL", "SPARQL_ENDPOINT_URL"),
        ("WIKIBASE_URL", "WIKIBASE_URL"),
    )
    for setting, variable in settings_from_env:
        wbi_config[setting] = os.environ.get(variable)

    return wbi_login.Clientlogin(user=user, password=password)
+
+
+
+[docs]
def create_engine(self):
    """Create the SQLAlchemy engine for the wikibase database.

    Connection details are read from the ``DB_USER``, ``DB_PASS``,
    ``DB_NAME`` and ``DB_HOST`` environment variables.

    Returns:
        SQLAlchemy engine, or ``None`` when ``self.setup`` is false.

    Raises:
        KeyError: if one of the ``DB_*`` variables is not set.
    """
    if not self.setup:
        return None

    from urllib.parse import quote

    # Percent-encode the credentials: characters such as '@', ':' or '/'
    # in a raw password would otherwise be read as URL delimiters.
    # NOTE(review): assumes DB_USER / DB_PASS hold raw, not already
    # URL-encoded, values - confirm against the deployment config.
    db_user = quote(os.environ["DB_USER"], safe="")
    db_pass = quote(os.environ["DB_PASS"], safe="")
    db_name = os.environ["DB_NAME"]
    db_host = os.environ["DB_HOST"]
    return db.create_engine(
        f"mysql+mysqlconnector://{db_user}:{db_pass}@{db_host}/{db_name}"
    )
+
+
+
+[docs]
def create_id_list_from_file(self, file):
    """Create a list of ids from a file where each id is on its own line.

    Surrounding whitespace is stripped from every line and lines that
    are blank after stripping are skipped, so trailing newlines or
    spacer lines do not produce empty ids.

    Args:
        file: path to file

    Returns: list of ids
    """
    with open(file, "r") as handle:
        stripped_lines = (line.strip() for line in handle)
        return [entity_id for entity_id in stripped_lines if entity_id]
+
+
+
+[docs]
def create_db_table(self):
    """Make sure the ``wb_id_mapping`` table for id mapping exists.

    Nothing happens when there is no engine or when the table is
    already present; otherwise the table is created.

    Args:
        None

    Returns:
        None
    """
    if not self.engine:
        return
    with self.engine.connect() as connection:
        if db.inspect(self.engine).has_table("wb_id_mapping"):
            return
        metadata = db.MetaData()
        # Registering the table on the metadata is what create_all()
        # picks up below.
        db.Table(
            "wb_id_mapping",
            metadata,
            db.Column("id", db.Integer, primary_key=True),
            db.Column("wikidata_id", db.String(24), nullable=False),
            db.Column("local_id", db.String(24), nullable=False),
            db.Column("has_all_claims", db.Boolean(), nullable=False),
            db.UniqueConstraint("wikidata_id"),
            db.UniqueConstraint("local_id"),
        )
        metadata.create_all(self.engine)
+
+
+
+[docs]
def insert_id_in_db(self, wikidata_id, local_id, has_all_claims):
    """Add a row to the ``wb_id_mapping`` table.

    Args:
        wikidata_id: Wikidata id
        local_id: local Wikibase id
        has_all_claims: Boolean indicating whether the entity has been
            imported with all claims or no claims (i.e. no recurse)

    Returns:
        None
    """
    mapping = db.Table(
        "wb_id_mapping",
        db.MetaData(),
        autoload_with=self.engine
    )
    new_row = {
        "wikidata_id": wikidata_id,
        "local_id": local_id,
        "has_all_claims": has_all_claims,
    }
    statement = mapping.insert().values(**new_row)

    with self.engine.connect() as connection:
        connection.execute(statement)
        connection.commit()
+
+
+
+[docs]
def update_has_all_claims(self, wikidata_id):
    """Mark an entity as imported with all its claims.

    Sets ``has_all_claims`` to True in the ``wb_id_mapping`` table for
    the row with the given wikidata_id.

    Args:
        wikidata_id: Wikidata id to be updated.

    Returns:
        None
    """
    mapping = db.Table(
        "wb_id_mapping",
        db.MetaData(),
        autoload_with=self.engine
    )
    statement = (
        mapping.update()
        .values(has_all_claims=True)
        .where(mapping.c.wikidata_id == wikidata_id)
    )

    with self.engine.connect() as connection:
        connection.execute(statement)
        connection.commit()
+
+
+
+[docs]
def init_wikidata_PID(self):
    """Look up the local property that links properties to their
    Wikidata PID, creating it when it does not exist yet.

    Returns
        wikidata_PID (str): wikidata PID property ID
    """
    label = "Wikidata PID"
    existing_id = self.get_local_id_by_label(label, "property")
    if existing_id:
        return existing_id

    linker = self.property.new()
    linker.labels.set(language="en", value=label)
    linker.descriptions.set(
        language="en",
        value="Identifier in Wikidata of the corresponding properties"
    )
    linker.datatype = "external-id"
    return linker.write(login=self.login, as_new=True).id
+
+
+
+[docs]
def init_wikidata_QID(self):
    """Look up the local property that links items to their Wikidata
    QID, creating it when it does not exist yet.

    Returns
        wikidata_QID (str): wikidata QID property ID
    """
    label = "Wikidata QID"
    existing_id = self.get_local_id_by_label(label, "property")
    if existing_id:
        return existing_id

    linker = self.property.new()
    linker.labels.set(language="en", value=label)
    linker.descriptions.set(
        language="en",
        value="Corresponding QID in Wikidata"
    )
    linker.datatype = "external-id"
    return linker.write(login=self.login, as_new=True).id
+
+
+
+[docs]
def import_entities(self, id_list=None, filename="", recurse=True):
    """Import entities from wikidata into the local instance.

    It can accept a single id, a list of ids or a file containing
    the ids to be imported.

    Args:
        id_list: Single string or list of strings of wikidata
            entity ids. Lexemes not supported.
        filename: Filename containing list of entities to be
            imported. Takes precedence over ``id_list`` when given.
        recurse: Whether to import claims for the entities in
            id_list

    Returns:
        Imported entities (Dict): Dictionary mapping each imported
        wikidata id to its local id. When exactly one entity was
        imported, its local id is returned directly instead of a
        dictionary. An empty dictionary is returned when nothing was
        imported (including when neither ``id_list`` nor ``filename``
        is given).
    """
    imported_entities = {}
    if filename:
        id_list = self.create_id_list_from_file(filename)
    if isinstance(id_list, str):
        id_list = [id_list]
    if id_list is None:
        # Nothing requested: avoid iterating over None.
        id_list = []

    for wikidata_id in id_list:

        if wikidata_id.startswith("L"):
            print(
                f"Warning: Lexemes not supported. Lexeme {wikidata_id} was not imported"
            )
            continue

        print(f"importing entity {wikidata_id}")

        has_all_claims = self.query('has_all_claims', wikidata_id)
        if not has_all_claims:
            # API call
            entity = self.get_wikidata_information(
                wikidata_id,
                recurse
            )

            if not entity:
                print(f"No labels for entity with id {wikidata_id}, skipping")
                continue

            if entity.type == "property" and entity.datatype.value in \
                ["wikibase-lexeme", "wikibase-sense", "wikibase-form"]:
                print(f"Warning: Lexemes not supported. Property skipped")
                continue

            # Check if there is an internal ID redirection in Wikidata
            if wikidata_id != entity.id:
                wikidata_id = entity.id
                has_all_claims = self.query('has_all_claims', wikidata_id)
                if has_all_claims:
                    imported_entities[wikidata_id] = self.query('local_id', wikidata_id)
                    continue

            if recurse:
                self.convert_claim_ids(entity)

            entity.add_linker_claim(wikidata_id)

            local_id = entity.exists()
            if not local_id:
                local_id = self.query('local_id', wikidata_id)

            if local_id:
                # Update existing entity
                if entity.type == "item":
                    local_entity = self.item.get(entity_id=local_id)
                elif entity.type == "property":
                    local_entity = self.property.get(entity_id=local_id)
                # replace descriptions
                local_entity.descriptions = entity.descriptions
                # add new claims if they are different from old claims
                local_entity.claims.add(
                    entity.claims,
                    ActionIfExists.APPEND_OR_REPLACE,
                )
                local_entity.write(login=self.login)
                if self.query('local_id', wikidata_id):
                    # A mapping row already exists: only upgrade it
                    # when the claims were imported this time. Inserting
                    # again would violate the unique constraint on
                    # wikidata_id.
                    if recurse:
                        self.update_has_all_claims(wikidata_id)
                else:
                    self.insert_id_in_db(wikidata_id, local_id, has_all_claims=recurse)
            else:
                # Create entity
                local_id = entity.write(login=self.login, as_new=True).id
                self.insert_id_in_db(wikidata_id, local_id, has_all_claims=recurse)

        if has_all_claims:
            # Already fully imported earlier: just report its local id.
            imported_entities[wikidata_id] = self.query('local_id', wikidata_id)
        else:
            imported_entities[wikidata_id] = local_id

    if len(imported_entities) == 1:
        return list(imported_entities.values())[0]
    return imported_entities
+
+
+
+[docs]
def overwrite_entity(self, wikidata_id, local_id):
    """Complete an already existing local entity with its statements
    from wikidata.

    Args:
        wikidata_id: Wikidata entity ID to be imported.
        local_id: Local id of the existing entity that needs to
            be completed with further statements.

    Returns:
        local_id: Local entity ID. For lexeme ids, and for entities
        without labels in the configured languages, nothing is written
        and the given ``local_id`` is returned unchanged. When the
        wikidata entity was already imported with all claims, the
        local id stored in the mapping table is returned instead.
    """
    if wikidata_id.startswith("L"):
        print(
            f"Warning: Lexemes not supported. Lexeme {wikidata_id} was not imported"
        )
        # Stop here: fetching a lexeme id from wikidata is rejected
        # as a wrong ID format.
        return local_id

    print(f"Overwriting entity {local_id}")

    has_all_claims = self.query('has_all_claims', wikidata_id)
    if has_all_claims:
        return self.query('local_id', wikidata_id)

    # API call
    entity = self.get_wikidata_information(
        wikidata_id,
        recurse=True
    )

    if entity:

        # Check if there is an entity ID redirection in Wikidata
        if wikidata_id != entity.id:
            wikidata_id = entity.id
            has_all_claims = self.query('has_all_claims', wikidata_id)
            if has_all_claims:
                return self.query('local_id', wikidata_id)

        self.convert_claim_ids(entity)
        entity.add_linker_claim(wikidata_id)

        # Retrieve existing entity
        if entity.type == "item":
            local_entity = self.item.get(entity_id=local_id)
        elif entity.type == "property":
            local_entity = self.property.get(entity_id=local_id)
        # replace descriptions
        local_entity.descriptions = entity.descriptions
        # add new claims if they are different from old claims
        local_entity.claims.add(
            entity.claims,
            ActionIfExists.APPEND_OR_REPLACE,
        )
        local_entity.write(login=self.login)
        if self.query('local_id', wikidata_id):
            self.update_has_all_claims(wikidata_id)
        else:
            self.insert_id_in_db(wikidata_id, local_id, has_all_claims=True)

    return local_id
+
+
+
+[docs]
def import_claim_entities(self, wikidata_id):
    """Import an entity that is mentioned in a claim from wikidata
    into the local wikibase instance, without its own claims.

    Args:
        wikidata_id(str): id of the entity to be imported

    Returns:
        local id, or None if the entity had no labels or is a property
        with a lexeme, sense or form datatype
    """
    known_id = self.query('local_id', wikidata_id)
    if known_id:
        return known_id

    entity = self.get_wikidata_information(wikidata_id)
    if not entity:
        return None

    lexeme_datatypes = ["wikibase-lexeme", "wikibase-sense", "wikibase-form"]
    if entity.type == "property" and entity.datatype.value in lexeme_datatypes:
        return None
    elif wikidata_id != entity.id:
        # Wikidata redirected the request to another entity.
        wikidata_id = entity.id
        known_id = self.query('local_id', wikidata_id)
        if known_id:
            return known_id

    # Check if the entity has been redirected by Wikidata
    # into another entity that has already been imported
    known_id = self.query('local_id', entity.id)
    if known_id:
        return known_id

    existing_id = entity.exists()
    if existing_id:
        if entity.type == "item":
            local_entity = self.item.get(entity_id=existing_id)
        elif entity.type == "property":
            local_entity = self.property.get(entity_id=existing_id)
        # replace descriptions
        local_entity.descriptions = entity.descriptions
        local_entity.add_linker_claim(wikidata_id)
        local_id = local_entity.write(login=self.login).id
    else:
        entity.add_linker_claim(wikidata_id)
        local_id = entity.write(login=self.login, as_new=True).id

    self.insert_id_in_db(wikidata_id, local_id, has_all_claims=False)
    return local_id
+
+
+
+[docs]
def get_wikidata_information(self, wikidata_id, recurse=False):
    """Pull an entity from wikidata and trim it to the configured
    languages.

    Unless ``self.languages`` is ``"all"``, labels, descriptions and
    aliases are reduced to the languages in ``self.languages``, and a
    description equal to the label in the same language is unset.

    Args:
        wikidata_id (str): wikidata id of the desired entity
        recurse (Bool): if claims should also be imported; when false
            the entity's claims are replaced by an empty set

    Returns: wikibase integrator entity or None, if the entity has no labels

    Raises:
        Exception: if ``wikidata_id`` starts with neither Q nor P.
    """
    wikidata_api = 'https://www.wikidata.org/w/api.php'
    if wikidata_id.startswith("Q"):
        entity = self.item.get(
            entity_id=wikidata_id,
            mediawiki_api_url=wikidata_api
        )
    elif wikidata_id.startswith("P"):
        entity = self.property.get(
            entity_id=wikidata_id,
            mediawiki_api_url=wikidata_api
        )
    else:
        raise Exception(
            f"Wrong ID format, should start with P, L or Q but ID is {wikidata_id}"
        )

    if self.languages != "all":
        # set labels in desired languages
        labels = {
            lang: entity.labels.values[lang]
            for lang in self.languages
            if lang in entity.labels.values
        }
        # if there are no labels, this is not
        # a valid entity
        if not labels:
            return None
        entity.labels.values = labels

        # set descriptions in desired languages
        descriptions = {
            lang: entity.descriptions.values[lang]
            for lang in self.languages
            if lang in entity.descriptions.values
        }
        entity.descriptions.values = descriptions

        # make sure label != description (e.g. wdt:P121)
        for lang in self.languages:
            if labels.get(lang) and labels.get(lang) == descriptions.get(lang):
                entity.descriptions.set(language=lang, value=None)

        # set aliases in desired languages
        entity.aliases.aliases = {
            lang: entity.aliases.aliases[lang]
            for lang in self.languages
            if lang in entity.aliases.aliases
        }

    if not recurse:
        entity.claims = Claims()
    return entity
+
+
+
+[docs]
def convert_claim_ids(self, entity):
    """Convert, in place, the wikidata ids found in an entity's claims
    into local ids.

    Properties listed in ``self.excluded_properties`` and properties
    that cannot be imported locally are dropped, as are claims with a
    lexeme, sense or form datatype and item/property claims without a
    usable value. References and qualifiers of the remaining claims are
    converted as well.

    Args:
        entity: entity whose ``claims.claims`` mapping is replaced

    Returns:
        None
    """
    entity_datatypes = [
        "wikibase-item",
        "wikibase-property",
    ]
    lexeme_datatypes = ["wikibase-lexeme", "wikibase-sense", "wikibase-form"]
    converted_claims = {}
    # structure of claims: Dict[str,List[Claim]]
    # where str is the property id
    for prop_id, claim_list in entity.claims.claims.items():
        if prop_id in self.excluded_properties:
            continue
        local_prop_id = self.import_claim_entities(wikidata_id=prop_id)
        if not local_prop_id:
            print("Warning: local id skipped")
            continue

        local_claims = []
        for claim in claim_list:
            claim_json = claim.get_json()
            mainsnak = claim_json["mainsnak"]
            datatype = mainsnak["datatype"]

            if datatype in entity_datatypes:
                if "datavalue" not in mainsnak:
                    continue
                value = mainsnak["datavalue"]["value"]
                local_value_id = self.import_claim_entities(
                    wikidata_id=value["id"],
                )
                if not local_value_id:
                    continue
                value["id"] = local_value_id
                value["numeric-id"] = int(local_value_id[1:])
                mainsnak["property"] = local_prop_id
                # to avoid problem with missing reference hash
                claim_json.pop("references", None)
                converted = Claim().from_json(claim_json)
                converted.id = None
            elif datatype in lexeme_datatypes:
                continue
            else:
                self.convert_entity_links(snak=mainsnak)
                # Non-entity values keep the original claim object.
                converted = claim
                converted.mainsnak.property_number = local_prop_id
                converted.id = None

            # get reference details
            local_references = self.get_references(claim)
            if local_references:
                converted.references.references = local_references
            # get qualifier details
            converted.qualifiers = self.get_qualifiers(claim)
            local_claims.append(converted)

        converted_claims[local_prop_id] = local_claims
    entity.claims.claims = converted_claims
+
+
+
+[docs]
def get_references(self, claim):
    """Build local references from the wikidata references of a claim.

    Reference properties and item/property values are imported locally
    and their ids rewritten; snaks with a lexeme, sense or form
    datatype, without a value, or whose target cannot be imported are
    left out.

    Args:
        claim: a wikibaseintegrator claim

    Returns:
        List with the converted references, or None when the claim has
        no references
    """
    entity_datatypes = [
        "wikibase-item",
        "wikibase-property",
    ]
    lexeme_datatypes = ["wikibase-lexeme", "wikibase-sense", "wikibase-form"]
    # format: List(Reference)
    original_references = claim.references.references
    if not original_references:
        return None

    local_references = []
    for reference in original_references:
        local_snaks = {}
        for prop_id, snak_list in reference.get_json()["snaks"].items():
            local_prop_id = self.import_claim_entities(
                wikidata_id=prop_id,
            )
            if not local_prop_id:
                continue
            kept_snaks = []
            for snak in snak_list:
                datatype = snak["datatype"]
                if datatype in entity_datatypes:
                    if "datavalue" not in snak:
                        continue
                    value = snak["datavalue"]["value"]
                    local_value_id = self.import_claim_entities(
                        wikidata_id=value["id"],
                    )
                    if not local_value_id:
                        continue
                    value["id"] = local_value_id
                    value["numeric-id"] = int(local_value_id[1:])
                elif datatype in lexeme_datatypes:
                    continue
                else:
                    self.convert_entity_links(
                        snak=snak,
                    )
                snak["property"] = local_prop_id
                kept_snaks.append(snak)
            local_snaks[local_prop_id] = kept_snaks

        # The hash is cleared because the snaks were rewritten.
        reference_json = {
            "hash": None,
            "snaks": local_snaks,
            "snaks-order": [],
        }
        local_references.append(Reference().from_json(json_data=reference_json))
    return local_references
+
+
+
+[docs]
def get_qualifiers(self, claim):
    """Build local qualifiers from the wikidata qualifiers of a claim.

    Qualifier properties and item/property values are imported locally
    and their ids rewritten; qualifiers with a lexeme, sense or form
    datatype, without a value, or whose target cannot be imported are
    left out.

    Args:
        claim: a wikibaseintegrator claim

    Returns:
        Qualifiers object, can also be an empty object
    """
    entity_datatypes = [
        "wikibase-item",
        "wikibase-property",
    ]
    lexeme_datatypes = ["wikibase-lexeme", "wikibase-sense", "wikibase-form"]
    local_qualifiers = {}
    for prop_id, qualifier_list in claim.qualifiers.get_json().items():
        local_prop_id = self.import_claim_entities(wikidata_id=prop_id)
        if not local_prop_id:
            continue
        kept_qualifiers = []
        for qualifier in qualifier_list:
            datatype = qualifier["datatype"]
            if datatype in entity_datatypes:
                if "datavalue" not in qualifier:
                    continue
                value = qualifier["datavalue"]["value"]
                local_value_id = self.import_claim_entities(
                    wikidata_id=value["id"],
                )
                if not local_value_id:
                    continue
                value["id"] = local_value_id
                value["numeric-id"] = int(local_value_id[1:])
            elif datatype in lexeme_datatypes:
                continue
            else:
                self.convert_entity_links(
                    snak=qualifier,
                )
            qualifier["property"] = local_prop_id
            kept_qualifiers.append(qualifier)
        local_qualifiers[local_prop_id] = kept_qualifiers
    return Qualifiers().from_json(json_data=local_qualifiers)
+
+
+
+[docs]
def convert_entity_links(self, snak):
    """Convert, in place, the unit of a quantity snak into a link to the
    local entity instead of a link to the wikidata entity.

    For globe-coordinate snaks only a missing precision is filled in
    (with 1/3600); the globe link is left untouched. Snaks of any other
    datatype, snaks lacking a datatype or datavalue, and quantities
    without a ``unit`` key are left unchanged.

    Args:
        snak: a wikibaseintegrator snak (JSON dictionary)

    Returns:
        None
    """
    if "datatype" not in snak or "datavalue" not in snak:
        return
    data = snak["datavalue"]["value"]
    datatype = snak["datatype"]

    if datatype == "globe-coordinate":
        if not data["precision"]:
            data["precision"] = 1/3600
        return

    # Only quantities that actually carry a unit have a link to convert.
    if datatype != "quantity" or "unit" not in data:
        return

    unit_link = data["unit"]
    if "www.wikidata.org/" in unit_link:
        uid = unit_link.split("/")[-1]
        local_id = self.import_claim_entities(
            wikidata_id=uid,
        )
        # NOTE(review): import_claim_entities may return None (e.g. a
        # unit without labels), which makes this concatenation fail -
        # confirm the desired handling.
        data["unit"] = wbi_config["WIKIBASE_URL"] + "/entity/" + local_id
+
+
+
+[docs]
def query(self, parameter, wikidata_id):
    """Look up one column of the wb_id_mapping db table by wikidata id.

    The two supported parameters are the local_id and whether the
    entity has already been imported with all claims.

    Args:
        parameter (str): Either local_id or has_all_claims
        wikidata_id (str): Wikidata ID
    Returns:
        str or boolean: the stored value of the requested column.
        None when there is no row for ``wikidata_id`` or when
        ``parameter`` is not one of the two supported names.
    """
    mapping = db.Table(
        "wb_id_mapping",
        db.MetaData(),
        autoload_with=self.engine
    )
    if parameter not in ['local_id', 'has_all_claims']:
        return None

    statement = db.select(mapping.columns[parameter]).where(
        mapping.columns.wikidata_id == wikidata_id,
    )
    with self.engine.connect() as connection:
        row = connection.execute(statement).fetchone()
        if row:
            return row[0]
    return None
+
+
+
+[docs]
def query_with_local_id(self, parameter, local_id):
    """Look up one column of the wb_id_mapping db table by local id.

    The two supported parameters are the wikidata_id and whether the
    entity has already been imported with all claims.

    Args:
        parameter (str): Either wikidata_id or has_all_claims
        local_id (str): local ID
    Returns:
        str or boolean: the stored value of the requested column.
        None when there is no row for ``local_id`` or when
        ``parameter`` is not one of the two supported names.
    """
    mapping = db.Table(
        "wb_id_mapping",
        db.MetaData(),
        autoload_with=self.engine
    )
    if parameter not in ['wikidata_id', 'has_all_claims']:
        return None

    statement = db.select(mapping.columns[parameter]).where(
        mapping.columns.local_id == local_id,
    )
    with self.engine.connect() as connection:
        row = connection.execute(statement).fetchone()
        if row:
            return row[0]
    return None
+
+
+
+[docs]
def get_local_id_by_label(self, entity_str, entity_type):
    """Check if an entity with a given label or wikidata PID/QID
    exists in the local wikibase instance.

    Args:
        entity_str (str): It can be a local ID (``P<n>`` / ``Q<n>``,
            returned as is), a string label, or a wikidata ID
            specified with the prefix wdt: for properties and wd:
            for items.
        entity_type (str): Either 'property' or 'item' to specify
            which type of entity to look for. Only used for label
            lookups.

    Returns:
        Local ID of the entity, if found. A property label yields the
        result of ``get_PID()``, an item label the result of
        ``get_QID()``, and a prefixed wikidata ID the ``local_id``
        stored in the ``wb_id_mapping`` table. None when nothing is
        found or when a label is given with an unknown
        ``entity_type``.
    """
    if re.match(r"^[PQ]\d+$", entity_str):
        return entity_str

    if entity_str.startswith("wdt:"):
        wikidata_id = entity_str[4:]
    elif entity_str.startswith("wd:"):
        wikidata_id = entity_str[3:]
    elif entity_type == "property":
        new_property = MardiPropertyEntity(api=self).new()
        new_property.labels.set(language='en', value=entity_str)
        return new_property.get_PID()
    elif entity_type == "item":
        new_item = MardiItemEntity(api=self).new()
        new_item.labels.set(language='en', value=entity_str)
        return new_item.get_QID()
    else:
        # A plain label with an unknown entity type cannot be resolved.
        return None

    with self.engine.connect() as connection:
        metadata = db.MetaData()
        table = db.Table(
            "wb_id_mapping", metadata, autoload_with=connection
        )
        sql = db.select(table.columns.local_id).where(
            table.columns.wikidata_id == wikidata_id,
        )
        db_result = connection.execute(sql).fetchone()
        if db_result:
            return db_result[0]
    return None
+
+
+
+[docs]
def import_from_label(self, label):
    """Import an entity from Wikidata just from a label.

    The first search result whose label equals ``label`` (exactly or
    ignoring case), or whose first alias equals it ignoring case, is
    imported.

    Args:
        label (str): label to be imported from wikidata

    Returns:
        local_id (str): local id for the imported entity, or None when
        no search result matches
    """
    hits = search_entities(label,
                           dict_result=True,
                           mediawiki_api_url='https://www.wikidata.org/w/api.php')
    for hit in hits:
        label_matches = (
            label == hit['label']
            or label.lower() == hit['label'].lower()
        )
        # Only the first alias is considered.
        if label_matches or (
            hit['aliases'] and label.lower() == hit['aliases'][0].lower()
        ):
            return self.import_entities(hit['id'])
+
+
+
+import json
+import os
+import re
+import sys
+import time
+import traceback
+import xml.etree.ElementTree as ET
+
+from datetime import datetime
+from habanero import Crossref # , RequestError
+from requests.exceptions import HTTPError, ContentDecodingError
+from sickle import Sickle
+
+from mardi_importer.integrator import MardiIntegrator
+from mardi_importer.importer import ADataSource
+from .ZBMathPublication import ZBMathPublication
+from .ZBMathAuthor import ZBMathAuthor
+from .ZBMathJournal import ZBMathJournal
+from .misc import get_tag, get_info_from_doi
+
+
+
+[docs]
+class ZBMathSource(ADataSource):
+ """Reads data from zb math API."""
+
def __init__(
    self,
    out_dir,
    tags,
    from_date=None,
    until_date=None,
    raw_dump_path=None,
    processed_dump_path=None,
    split_id=None,
):
    """Store the import configuration and create the integrator.

    Args:
        out_dir (string): target directory for saved files; a trailing
            "/" is appended when missing
        tags (list): list of tags to extract from the zbMath response
        from_date (string, optional): earliest date from when to pull information
        until_date (string, optional): latest date from when to pull information
        raw_dump_path (string, optional): path where the raw data dump is
            located, in case it has previously been pulled
        processed_dump_path (string, optional): path to the processed dump file
        split_id (string, optional): zbMath id from where to start processing
            the raw dump, in case it aborted mid-processing
    """
    # Output directory is always stored with a trailing slash because
    # file names are later appended by plain concatenation.
    self.out_dir = out_dir if out_dir[-1] == "/" else out_dir + "/"

    # Resume support: split mode is active whenever a split id was given.
    self.split_id = split_id
    self.split_mode = bool(self.split_id)

    self.from_date = from_date
    self.until_date = until_date
    self.tags = tags
    self.integrator = MardiIntegrator()

    # Placeholder text that appears in fields withheld for licensing
    # reasons (kept both without and with the trailing period).
    self.conflict_string = (
        "zbMATH Open Web Interface contents unavailable due to conflicting licenses"
    )
    self.raw_dump_path = raw_dump_path
    self.filepath = os.path.realpath(os.path.dirname(__file__))
    self.processed_dump_path = processed_dump_path

    # XML namespaces used when looking up elements of a record.
    self.namespace = "http://www.openarchives.org/OAI/2.0/"
    self.preview_namespace = "https://zbmath.org/OAI/2.0/oai_zb_preview/"
    self.tag_namespace = "https://zbmath.org/zbmath/elements/1.0/"
    self.conflict_text = "zbMATH Open Web Interface contents unavailable due to conflicting licenses."

    # dict for counting how often a doi was not found and which agency it was registered with
    self.unknown_doi_agency_dict = {"Crossref": [], "crossref": [], "nonsense": []}
    # tags that will not be found in doi query
    self.internal_tags = ["author_id", "source", "classifications", "links"]

    # Per-run caches of entities that were already created.
    self.existing_authors = {}
    self.existing_journals = {}
    self.existing_publications = []
+
+
+[docs]
+ def setup(self):
+ """Create all necessary properties and entities for zbMath"""
+ # Import entities from Wikidata
+ filename = self.filepath + "/wikidata_entities.txt"
+ self.integrator.import_entities(filename=filename)
+ self.create_local_entities()
+ self.de_number_prop = self.integrator.get_local_id_by_label(
+ "zbMATH DE Number", "property"
+ )
+ self.keyword_prop = self.integrator.get_local_id_by_label(
+ "zbMATH keyword string", "property"
+ )
+
+
+
+[docs]
def create_local_entities(self):
    """Create the properties and items described in ``new_entities.json``.

    The JSON file is read from the directory stored in `self.filepath`.
    Each entry under "properties" supplies a label, description and
    datatype; each entry under "items" supplies a label, description
    and a mapping of claims. An entity is only written when its
    `exists()` check reports that it is not there yet.
    """
    filename = self.filepath + "/new_entities.json"
    # Context manager so the file handle is closed again (it used to
    # be left open).
    with open(filename, encoding="utf-8") as f:
        entities = json.load(f)

    for prop_element in entities["properties"]:
        prop = self.integrator.property.new()
        prop.labels.set(language="en", value=prop_element["label"])
        prop.descriptions.set(language="en", value=prop_element["description"])
        prop.datatype = prop_element["datatype"]
        if not prop.exists():
            prop.write()

    for item_element in entities["items"]:
        item = self.integrator.item.new()
        item.labels.set(language="en", value=item_element["label"])
        item.descriptions.set(language="en", value=item_element["description"])
        for key, value in item_element["claims"].items():
            item.add_claim(key, value=value)
        if not item.exists():
            item.write()
+
+
+
+
+
+
+[docs]
def write_data_dump(self):
    """
    Overrides abstract method.
    Harvests the records offered by the zbMath OAI endpoint, optionally
    limited to the range between from_date and until_date, and writes
    the raw form of each record to a time-stamped file in out_dir.
    The file path is stored in raw_dump_path.
    """
    stamp = time.strftime("%Y%m%d-%H%M%S")
    self.raw_dump_path = self.out_dir + "raw_zbmath_data_dump" + stamp + ".txt"
    harvester = Sickle("https://oai.zbmath.org/v1")

    # date has to have format like 2012-12-12
    request = {"metadataPrefix": "oai_zb_preview"}
    if self.from_date:
        request["from"] = self.from_date
    if self.until_date:
        request["until"] = self.until_date
    records = harvester.ListRecords(**request)

    with open(self.raw_dump_path, "w+") as dump:
        for record in records:
            dump.write(record.raw + "\n")
+
+
+
+[docs]
def process_data(self):
    """
    Overrides abstract method.
    Reads a raw zbMath data dump and processes it, then saves it as a
    tab-separated file.

    Unless an existing processed dump is being continued (split mode
    with `processed_dump_path` set), a new time-stamped output file is
    started and a header line is written. In split mode, records are
    skipped up to and including the one whose DE number equals
    `split_id`; writing resumes with the record after it.
    """
    if not (self.processed_dump_path and self.split_mode):
        timestr = time.strftime("%Y%m%d-%H%M%S")
        self.processed_dump_path = (
            self.out_dir + "zbmath_data_dump" + timestr + ".csv"
        )
    with open(self.raw_dump_path) as infile, open(
        self.processed_dump_path, "a"
    ) as outfile:
        # if we are not continuing with a pre-filled file
        if not self.split_mode:
            # The "review" tag is stored as three columns. Expanding it
            # in place keeps the header correct wherever "review" sits
            # in the tag list (the old header was only right when it
            # was the last tag).
            columns = ["de_number", "creation_date"]
            for tag in self.tags:
                if tag == "review":
                    columns.extend(["review_text", "review_sign", "reviewer_id"])
                else:
                    columns.append(tag)
            outfile.write("\t".join(columns) + "\n")

        # Lines are collected until the closing tag of a record is seen.
        record_lines = []
        for line in infile:
            record_lines.append(line)
            if not line.endswith("</record>\n"):
                continue
            element = ET.fromstring("".join(record_lines))
            record_lines = []
            if self.split_mode:
                # if the last processed id is found, writing continues
                # with the next record; otherwise keep searching
                if self.get_de_number(element) == self.split_id:
                    self.split_mode = False
                continue
            record = self.parse_record(element)
            if record:
                outfile.write(
                    "\t".join(str(x) for x in record.values()) + "\n"
                )
+
+
+
+[docs]
def parse_record(self, xml_record):
    """
    Parse xml record from zbMath API.

    For every tag in `self.tags` the matching element below the
    zbmath preview element is looked up. Elements without children
    contribute their text; elements with children contribute the
    semicolon-joined texts of the children. A "review" element with
    children is stored as the three entries review_text, review_sign
    and reviewer_id instead. Tags that are not present are stored as
    None. The process exits when the preview element is missing or a
    review sub-element has children of its own.

    Args:
        xml_record (xml element): record returned by zbMath API

    Returns:
        dict: dict of (tag,value) pairs extracted from xml_record
    """
    new_entry = {
        # zbMath identifier
        "de_number": self.get_de_number(xml_record),
        "creation_date": self.get_creation_date(xml_record),
    }
    # read tags
    metadata = xml_record.find(get_tag("metadata", namespace=self.namespace))
    zb_preview = (
        metadata.find(get_tag("zbmath", self.preview_namespace))
        if metadata is not None
        else None
    )
    # Explicit None test: truth-testing an Element is false for an
    # element without children and is deprecated.
    if zb_preview is None:
        sys.exit("Error: zb_preview not found")

    for tag in self.tags:
        value = zb_preview.find(get_tag(tag, self.tag_namespace))
        if value is None:
            # if tag is not found in zbMath return, we still want to get it from doi
            new_entry[tag] = None
            continue
        if not len(value):
            # element content is a simple text
            new_entry[tag] = value.text
            continue
        if tag == "review":
            for subtag in ["review_text", "review_sign", "reviewer_id"]:
                subvalue = value.find(get_tag(subtag, self.tag_namespace))
                if subvalue is None:
                    new_entry[subtag] = None
                    continue
                if len(subvalue):
                    sys.exit(f"tag {subtag} has children")
                text = subvalue.text
                # Tabs and newlines would break the tab-separated
                # output; an empty element has text None.
                if subtag == "review_text" and text is not None:
                    text = text.replace("\t", " ").replace("\n", " ")
                new_entry[subtag] = text
            continue
        # element has children: multiple values are rendered as a
        # semicolon-separated string
        texts = [child.text for child in value if child.text is not None]
        new_entry[tag] = ";".join(texts)
    # return record, even if incomplete
    return new_entry
+
+
+
+[docs]
def push(self):
    """Updates the MaRDI Wikibase entities corresponding to zbMath publications.

    Reads the processed, tab-separated dump line by line. Rows whose
    column count differs from the header are skipped, as are rows whose
    title was already handled recently in this run. Only rows whose
    title carries the license-conflict placeholder are uploaded; for
    those the title (and, when needed, the journal) is looked up
    through the DOI. It creates a
    :class:`mardi_importer.zbmath.ZBMathPublication` instance for each
    publication. Authors, reviewers and journals are added, as well.

    Every upload is attempted up to five times with a 120 second pause
    after each failure; the process exits if all attempts fail.
    """

    def retry(action, failure_message):
        """Run `action` up to five times; exit the process if all fail."""
        for _ in range(5):
            try:
                return action()
            except Exception as e:
                print(f"Exception: {e}, sleeping")
                print(traceback.format_exc())
                time.sleep(120)
        sys.exit(failure_message)

    def create_person(name, person_id, failure_message):
        """Create an author/reviewer item and return its local id."""
        return retry(
            lambda: ZBMathAuthor(
                integrator=self.integrator,
                name=name,
                zbmath_author_id=person_id,
            ).create(),
            failure_message,
        )

    def resolve_journal(name):
        """Return the id of the journal `name`, creating it if needed."""
        journal_item = ZBMathJournal(integrator=self.integrator, name=name)
        if journal_item.exists():
            print(f"Journal {name} exists!")
            return journal_item.QID
        print(f"Creating journal {name}")
        return journal_item.create()

    def upload_publication(**fields):
        """Update the publication if it exists, otherwise create it."""
        publication = ZBMathPublication(
            integrator=self.integrator,
            de_number_prop=self.de_number_prop,
            keyword_prop=self.keyword_prop,
            **fields,
        )
        if publication.exists():
            print(f"Publication {fields['title']} exists")
            publication.update()
        else:
            print(f"Creating publication {fields['title']}")
            publication.create()

    def doi_is_usable(info):
        """True when the doi column holds neither placeholder nor 'None'."""
        return (
            self.conflict_string not in info["doi"]
            and info["doi"] != "None"
        )

    def cleaned(info, key):
        """Stripped column value, or None for placeholder / 'None'."""
        value = info[key]
        if self.conflict_string in value or value.strip() == "None":
            return None
        return value.strip()

    # Compiled once instead of once per row.
    link_pattern = re.compile(r'^([a-z][a-z\d+.-]*):([^][<>\"\x00-\x20\x7F])+$')

    with open(self.processed_dump_path, "r") as infile:
        headers = None
        for line in infile:
            if headers is None:
                headers = line.strip().split("\t")
                continue
            split_line = line.strip().split("\t")
            # formatting error: skip
            if len(split_line) != len(headers):
                continue
            info_dict = dict(zip(headers, split_line))

            raw_title = info_dict["document_title"]
            if raw_title in self.existing_publications:
                print(
                    f"A publication with the name {raw_title} was already created in this run."
                )
                continue
            # only upload those where there was a conflict before
            if self.conflict_string not in raw_title:
                print(f"Skipping non-conflict paper {raw_title}")
                continue
            if doi_is_usable(info_dict):
                document_title = get_info_from_doi(
                    doi=info_dict["doi"].strip(), key="document_title"
                )
                if not document_title:
                    print("No title from doi, uploading empty")
                else:
                    print(f"Found document title {document_title} from doi")
            else:
                print("No doi found, uploading empty.")
                document_title = None

            zbl_id = info_dict["zbl_id"] if info_dict["zbl_id"] != "None" else None

            authors = []
            if (
                self.conflict_string not in info_dict["author_ids"]
                and "None" not in info_dict["author_ids"]
            ):
                author_ids = info_dict["author_ids"].split(";")
                if (
                    self.conflict_string in info_dict["author"]
                    or "None" in info_dict["author"]
                ):
                    # ids are known but names are not
                    author_strings = [None] * len(author_ids)
                else:
                    author_strings = info_dict["author"].split(";")
                for author_name, author_id in zip(author_strings, author_ids):
                    if author_name:
                        author_name = author_name.strip()
                    author_id = author_id.strip()
                    if author_id in self.existing_authors:
                        authors.append(self.existing_authors[author_id])
                        print(
                            f"Author with name {author_name} was already created this run."
                        )
                    else:
                        local_author_id = create_person(
                            author_name,
                            author_id,
                            "Uploading author did not work after retries!",
                        )
                        authors.append(local_author_id)
                        self.existing_authors[author_id] = local_author_id

            if (
                self.conflict_string in info_dict["serial"]
                or info_dict["serial"].strip() == "None"
            ):
                # no usable serial column: fall back to the DOI metadata
                if doi_is_usable(info_dict):
                    journal_string = get_info_from_doi(
                        doi=info_dict["doi"].strip(), key="journal"
                    )
                else:
                    journal_string = None
            else:
                journal_string = info_dict["serial"].split(";")[-1].strip()
            if journal_string:
                if journal_string in self.existing_journals:
                    journal = self.existing_journals[journal_string]
                    print(
                        f"Journal {journal_string} was already created in this run."
                    )
                else:
                    journal = retry(
                        lambda: resolve_journal(journal_string),
                        "Uploading journal did not work after retries!",
                    )
                    self.existing_journals[journal_string] = journal
            else:
                journal = None

            if self.conflict_string not in info_dict["language"]:
                language = info_dict["language"].strip()
            else:
                language = None

            if self.conflict_string not in info_dict["publication_year"]:
                time_string = (
                    f"+{info_dict['publication_year'].strip()}-00-00T00:00:00Z"
                )
            else:
                time_string = None

            if self.conflict_string not in info_dict["links"]:
                links = [
                    x.strip()
                    for x in info_dict["links"].split(";")
                    if (link_pattern.match(x) and "http" in x)
                ]
            else:
                links = []

            if (
                self.conflict_string not in info_dict["doi"]
                and "None" not in info_dict["doi"]
            ):
                doi = info_dict["doi"].strip()
            else:
                doi = None

            if info_dict["creation_date"] != "0001-01-01T00:00:00":
                # because there can be no hours etc
                creation_date = (
                    f"{info_dict['creation_date'].split('T')[0]}T00:00:00Z"
                )
            else:
                creation_date = None

            review_text = cleaned(info_dict, "review_text")
            reviewer = None
            if review_text is not None:
                review_sign = cleaned(info_dict, "review_sign")
                reviewer_id = cleaned(info_dict, "reviewer_id")
                if review_sign is not None and reviewer_id is not None:
                    # keep only the part before the first "/" and "("
                    reviewer_name = (
                        review_sign.split("/")[0].strip().split("(")[0].strip()
                    )
                    if reviewer_id in self.existing_authors:
                        reviewer = self.existing_authors[reviewer_id]
                        # The message used to interpolate the author
                        # loop variable, which is stale or unbound here.
                        print(
                            f"Reviewer with name {reviewer_name} was already created this run."
                        )
                    else:
                        reviewer = create_person(
                            reviewer_name,
                            reviewer_id,
                            "Uploading reviewer did not work after retries!",
                        )
                        self.existing_authors[reviewer_id] = reviewer

            classifications = cleaned(info_dict, "classifications")
            if classifications is not None:
                classifications = classifications.split(";")

            if info_dict["de_number"].strip() != "None":
                de_number = info_dict["de_number"].strip()
            else:
                de_number = None

            keywords = cleaned(info_dict, "keywords")
            if keywords is not None:
                keywords = [x.strip() for x in keywords.split(";")]

            retry(
                lambda: upload_publication(
                    title=document_title,
                    doi=doi,
                    authors=authors,
                    journal=journal,
                    language=language,
                    time=time_string,
                    links=links,
                    creation_date=creation_date,
                    zbl_id=zbl_id,
                    review_text=review_text,
                    reviewer=reviewer,
                    classifications=classifications,
                    de_number=de_number,
                    keywords=keywords,
                ),
                "Uploading publication did not work after retries!",
            )
            # in case a publication is listed twice; this normally happens
            # within a distance of a few lines
            if document_title:
                self.existing_publications.append(document_title)
                self.existing_publications = self.existing_publications[-100:]
+
+
+
+[docs]
def get_de_number(self, xml_record):
    """
    Get zbMath id from xml record.

    The id is the part of the header's identifier text that follows
    the last colon.

    Args:
        xml_record (xml element): record returned by zbMath API

    Returns:
        string: zbMath ID
    """
    header = xml_record.find(get_tag("header", self.namespace))
    identifier = header.find(get_tag("identifier", namespace=self.namespace))
    return identifier.text.split(":")[-1]
+
+
+
+[docs]
def get_creation_date(self, xml_record):
    """
    Get creation date from xml record.

    The value is the text of the datestamp element in the record
    header.

    Args:
        xml_record (xml element): record returned by zbMath API

    Returns:
        string: creation date
    """
    header = xml_record.find(get_tag("header", self.namespace))
    datestamp = header.find(get_tag("datestamp", namespace=self.namespace))
    return datestamp.text
+
+
+
+from habanero import Crossref
+from requests.exceptions import HTTPError
+
+
+
+[docs]
def get_tag(tag_name, namespace):
    """
    Build a fully qualified tag name of the form ``{namespace}tag_name``.

    Args:
        tag_name (string): name of tag, e.g. author
        namespace (string): namespace URL of a namespace

    Returns:
        string: the namespace wrapped in curly braces, followed by the tag name
    """
    return f"{{{namespace}}}{tag_name}"
+
+
+
+
+[docs]
def parse_doi_info(val, work_info):
    """
    Function to extract information returned by a doi query for a specific tag.

    Supported tags are author, document_title, publication_year, serial,
    language and keywords. Multiple values are joined with semicolons.

    Args:
        val (string): tag, e.g. author
        work_info (dict): information from doi query response

    Returns:
        string: information for specific tag, None if not found or if
        the tag is not supported. For publication_year the first entry
        of the first date part is returned as stored in `work_info`.
    """
    # information about return fields can be found under https://api.crossref.org/swagger-ui/index.html#/Works/get_works
    if val == "author":
        # looks like: 'author': [{'given': 'Max', 'family': 'Mustermann', 'sequence': 'first', 'affiliation': []}]
        if "author" not in work_info:
            return None
        author_list = []
        for author_dict in work_info["author"]:
            family_name = author_dict.get("family")
            # family name not known: too little information
            if not family_name:
                return None
            # Looked up per author so that a given name never carries
            # over from the previous author.
            first_name = author_dict.get("given")
            # first name not necessarily needed
            if first_name:
                author_list.append(family_name + ", " + first_name)
            else:
                author_list.append(family_name)
        return ";".join(author_list)

    if val == "document_title":
        # the response field is called "title"
        title_list = work_info.get("title")
        return ";".join(title_list) if title_list else None

    if val == "publication_year":
        # 'published': {'date-parts': [[2008]]}} this is not necessarily the year this was published in the journal, apparently...
        if "published" not in work_info:
            return None
        # this is either a year or None
        return work_info["published"]["date-parts"][0][0]

    if val == "serial":
        if "reference" not in work_info:
            return None
        serials = [
            serial_dict["journal-title"]
            for serial_dict in work_info["reference"]
            if "journal-title" in serial_dict
        ]
        # if no serials were found
        if not serials:
            return None
        # make list unique while keeping the order of first appearance
        return ";".join(dict.fromkeys(serials))

    if val == "language":
        return work_info.get("language")

    if val == "keywords":
        if "subject" not in work_info:
            return None
        return ";".join(work_info["subject"])

    return None
+
+
+
+
+[docs]
def get_info_from_doi(doi, key):
    """
    Query crossref API for DOI information.

    `doi` may hold several DOIs separated by semicolons; they are
    queried in order until one of them yields the requested value.

    Args:
        doi (string): one DOI or several DOIs separated by ";"
        key (string): "document_title" or "journal"

    Returns:
        string: for "document_title" the semicolon-joined titles with
        tabs and newlines replaced by spaces (None if that string is
        longer than 500 characters); for "journal" the first
        container title. None if no DOI yields a value.
    """
    cr = Crossref(mailto="pusch@zib.de")
    for single_doi in doi.split(";"):
        single_doi = single_doi.strip()
        if not single_doi:
            continue
        try:
            work_info = cr.works(ids=single_doi)
        # if the doi is not found, there is a 404
        except HTTPError:
            print("HTTP Error!")
            continue
        message = work_info["message"]
        if key == "document_title":
            title_list = message.get("title")
            if not title_list:
                continue
            joint_title = ";".join(title_list).strip()
            joint_title = joint_title.replace("\n", " ").strip()
            joint_title = joint_title.replace("\t", " ").strip()
            if len(joint_title) > 500:
                return None
            return joint_title
        elif key == "journal":
            # Try the next DOI instead of giving up, in line with the
            # title lookup above.
            container_titles = message.get("container-title")
            if not container_titles:
                continue
            return container_titles[0].strip()
    return None
+
+
' + + '' + + _("Hide Search Matches") + + "
" + ) + ); + }, + + /** + * helper function to hide the search marks again + */ + hideSearchWords: () => { + document + .querySelectorAll("#searchbox .highlight-link") + .forEach((el) => el.remove()); + document + .querySelectorAll("span.highlighted") + .forEach((el) => el.classList.remove("highlighted")); + localStorage.removeItem("sphinx_highlight_terms") + }, + + initEscapeListener: () => { + // only install a listener if it is really needed + if (!DOCUMENTATION_OPTIONS.ENABLE_SEARCH_SHORTCUTS) return; + + document.addEventListener("keydown", (event) => { + // bail for input elements + if (BLACKLISTED_KEY_CONTROL_ELEMENTS.has(document.activeElement.tagName)) return; + // bail with special keys + if (event.shiftKey || event.altKey || event.ctrlKey || event.metaKey) return; + if (DOCUMENTATION_OPTIONS.ENABLE_SEARCH_SHORTCUTS && (event.key === "Escape")) { + SphinxHighlight.hideSearchWords(); + event.preventDefault(); + } + }); + }, +}; + +_ready(() => { + /* Do not call highlightSearchWords() when we are on the search page. + * It will highlight words from the *previous* search query. + */ + if (typeof Search === "undefined") SphinxHighlight.highlightSearchWords(); + SphinxHighlight.initEscapeListener(); +}); diff --git a/cran.html b/cran.html new file mode 100644 index 0000000..c526062 --- /dev/null +++ b/cran.html @@ -0,0 +1,712 @@ + + + + + + +This module imports R packages published at the Comprehensive R Archive Network (CRAN).
+Specifically, it reads the table of packages ordered by date of publication. +This table contains for each R package the package name, title and date of publication. Based on the package name, +each package url can be accessed from:
+https://cran.r-project.org/web/packages/<package_name>/index.html
Several attributes are listed for each package. Among them, the following attributes are imported, when present, to the MaRDI knowledge graph:
+Version of the package.
+Software and package dependencies, including other R packages.
+Date of publication.
+Authors of the package are to be indicated according to the CRAN Repository Policy with the abbreviation [aut]. Given that this guideline is not always implemented, it is not always +possible to properly parse the authors.
+When no abbreviations describing the role of each individual are included, just the first listed author is imported.
+Software maintainer (Generally one of the authors).
+Bases: ADataSource
Processes data from the Comprehensive R Archive Network.
+Metadata for each R package is scraped from the CRAN Repository. Each +Wikibase item corresponding to each R package is subsequently updated +or created, in case of a new package.
+Dataframe with package name, title and date of publication for +each package in CRAN.
+Pandas dataframe
+Reads date, package name and title from the CRAN Repository URL.
+The result is saved as a pandas dataframe in the attribute packages.
+Attribute packages
Pandas dataframe
+ImporterException – If table at the CRAN url cannot be accessed or read.
+Updates the MaRDI Wikibase entities corresponding to R packages.
+For each package name in the attribute packages checks +if the date in CRAN coincides with the date in the MaRDI +knowledge graph. If not, the package is updated. If the package +is not found in the MaRDI knowledge graph, the corresponding +item is created.
+It creates a mardi_importer.cran.RPackage
instance
+for each package.
Bases: object
Class to manage R package items in the local Wikibase instance.
+Date of publication
+Package name
+Title of the R package
+Extended description of the R package
+URL to the CRAN repository
+Version of the R package
+Author(s) of the package
+Software license
+Dependencies to R and other packages
+Software maintainer
+Imports metadata from CRAN corresponding to the R package.
+Imports Version, Dependencies, Authors, Maintainer +and License and saves them as instance attributes.
+Checks if a WB item corresponding to the R package already exists.
+Searches for a WB item with the package label in the SQL Wikibase +tables and returns True if a matching result is found.
+It uses for that the mardi_importer.wikibase.WBItem.instance_exists()
+method.
Entity ID
+String
+Checks if the WB item corresponding to the R package is up to date.
+Compares the publication date in the local knowledge graph with the +publication date imported from CRAN.
+True if both dates coincide, False otherwise.
+Boolean
+Creates a WB item with the imported metadata from CRAN.
+The metadata corresponding to one package is first pulled as instance
+attributes through pull()
. Before creating the new entity
+corresponding to an R package, new entities corresponding to dependencies
+and authors are already created, when these do not already exist in the
+local Wikibase instance.
Uses mardi_importer.wikibase.WBItem
to create the
+corresponding new item.
ID of the created R package.
+String
+Updates existing WB item with the imported metadata from CRAN.
+The metadata corresponding to the package is first pulled from CRAN and
+saved as instance attributes through pull()
. The statements that
+do not coincide with the locally saved information are updated or
+substituted with the updated information.
Uses mardi_importer.wikibase.WBItem
to update the item
+corresponding to the R package.
ID of the updated R package.
+String
+Processes the author information of each R package. This includes:
+Searching if an author with the given ID already exists in the KG.
Alternatively, create WB Items for new authors.
Item IDs corresponding to each author.
+List
+Processes the maintainer information of each R package. This includes:
+Providing the Item ID given the maintainer name.
Creating a new WB Item if the maintainer is not found in the +local graph.
Item ID corresponding to the maintainer.
+String
+Processes the dependency and import information of each R package. This includes:
+Extracting the version information of each dependency/import if provided.
Providing the Item ID given the dependency/import label.
Creating a new WB Item if the dependency/import is not found in the +local knowledge graph.
Dictionary with key value corresponding to the Item ID of each dependency or +import. The value indicates the version of each dependency or import, if +provided, which is added in the statement as a qualifier.
+Dict
+Extracts the DOI identification of related publications.
+Identifies the DOI of publications that are mentioned using the +format doi: or arXiv: in the long description of the +R package.
+List containing the wikibase IDs of mentioned publications.
+List
+Adds the statements corresponding to the package dependencies.
+Insert the wikibase statements corresponding the required R package for +the instantiated R package. The statement includes a link to the item +representing the dependency and, when provided, a qualifier +specifying the required version of the dependency.
+item (WBItem) – Item representing the R package to which the statement must be added.
+Adds the statements corresponding to the package imports.
+Insert the wikibase statements corresponding the imported R packages for +the instantiated R package. The statement includes a link to the item +representing the imported package and, when provided, a qualifier +specifying the required version of this package.
+item (WBItem) – Item representing the R package to which the imported packages +statements must be added.
+Processes the license string and adds the corresponding statements.
+The concrete License is identified and linked to the corresponding +item that has previously been imported from Wikidata. Further license +information, when provided between round or square brackets, is added +as a qualifier.
+If a file license is mentioned, the link to the file license +in CRAN is added as a qualifier.
+item (WBItem) – Item representing the R package to which the statement must be added.
+Reads the package publication date saved in the local Wikibase instance.
+Queries the WB Item corresponding to the R package label through the +Wikibase API.
+Package publication date in format DD-MM-YYYY.
+String
+Processes raw imported data from CRAN to enable the creation of items.
+Package dependencies are split at the comma position.
License information is processed using the split_license()
method.
Author information is processed using the split_authors()
method.
Maintainer information is processed using the clean_maintainer()
method.
table_html – HTML code obtained with BeautifulSoup corresponding to the table +containing the metadata of the R package imported from CRAN.
+Dataframe with processed data from a single R package including columns: +Version, Author, License, Depends, Imports +and Maintainer.
+(Pandas dataframe)
+Splits given list in the comma position.
+x (String) – String to be split.
+List of elements
+(List)
+Splits string of licenses.
+Takes into account that licenses are often not uniformly listed. +Characters |, + and , are used to separate licenses. Further +details on each license are often included in square brackets.
+x (String) – String imported from CRAN representing license +information.
+List of licenses.
+(List)
+Splits the string corresponding to the authors into a dictionary.
+Author information in CRAN is not registered uniformly. This function +parses the imported string and returns just the names of the individuals +that can be unequivocally identified as authors (i.e. they are followed +by the [aut] abbreviation).
+Generally, authors in CRAN are indicated with the abbreviation [aut]. +When no abbreviations are included, only the first individual is imported +to Wikibase (otherwise it can often not be established whether +information after the first author refers to another individual, +an institution, a funder, etc.)
+x (String) – String imported from CRAN representing author +information.
+Dictionary of authors and corresponding ORCID ID, if provided.
+(Dict)
+Remove unnecessary information from maintainer string.
+x (String) – String imported from CRAN which may contain e-mail +address and comments within brackets
+Name of the maintainer
+(String)
+Returns the Wikidata item ID corresponding to a software license.
+The same license is often denominated in CRAN using different names. +This function returns the wikidata item ID corresponding to a single +unique license that is referenced in CRAN under different names (e.g. +Artistic-2.0 and Artistic License 2.0 both refer to the same +license, corresponding to item Q14624826).
+license (String) – String corresponding to a license imported from CRAN.
+Wikidata item ID.
+(String)
++ | + |
+ | + |
+ |
+ | + |
|
+
|
+
+ |
+ |
+ | + |
+ |
+ | + |
+ | + |
+ |
+ |
+ |
Created on Thu Feb 17 18:53:53 2022
+@author: alvaro
+Bases: object
Abstract base class for parsing config files
+ + +Bases: object
Abstract base class for reading data from external sources.
+ + + + + + + + +This is the documentation of docker-importer.
+Bases: MardiClient
Sets up initial configuration for the integrator
+Clientlogin object
+Function for in-place conversion of wikidata +ids found in claims into local ids
+entity –
+None
+Function for in-place conversion of unit for quantity +and globe for globecoordinate to a link to the local entity +instead of a link to the wikidata entity.
+snak – a wikibaseintegrator snak
+None
+Check if db table for id mapping is there; if not, create.
+None –
+None
+Function for creating a list of ids +from a file where each id is on a new line
+file – path to file
+Returns: list of ids
+Check if entity with a given label or wikidata PID/QID +exists in the local wikibase instance.
+entity_str (str) – It can be a string label or a wikidata ID, +specified with the prefix wdt: for properties and wd: +for items.
entity_type (str) – Either ‘property’ or ‘item’ to specify +which type of entity to look for.
Local ID of the entity, if found.
+str
+Function for creating qualifiers from wikidata qualifiers +and in place adding them to the claim
+claim – a wikibaseintegrator claim
+Qualifiers object, can also be an empty object
+Function for creating references from wikidata references +and in place adding them to the claim
+claim – a wikibaseintegrator claim
+List with references, can also be an empty list
+Function for pulling wikidata information
+wikidata_id (str) – wikidata id of the desired entity
recurse (Bool) – if claims should also be imported
Returns: wikibase integrator entity or None, if the entity has no labels
+Function for importing entities that are mentioned +in claims from wikidata to the local wikibase instance
+wikidata_id (str) – id of the entity to be imported
+local id or None, if the entity had no labels
+Function for importing entities from wikidata +into the local instance.
+It can accept a single id, a list of ids or a file containing +the ids to be imported.
+id_list – Single string or list of strings of wikidata +entity ids. Lexemes not supported.
filename – Filename containing list of entities to be +imported.
recurse – Whether to import claims for the entities in +id_list
Dictionary containing the local ids of +all the imported entities.
+Imported entities (Dict)
+Imports an entity from Wikidata just from a label
+label (str) – label to be imported from wikidata
+local id for the imported entity
+local_id (str)
+Searches the wikidata PID property ID to link +properties to their ID in wikidata. When not found, +it creates the property.
+wikidata_PID (str): wikidata PID property ID
+Searches the wikidata QID property ID to link +items to their ID in wikidata. When not found, +it creates the property.
+wikidata_QID (str): wikidata QID property ID
+Insert wikidata_id, local_id and has_all_claims into mapping table.
+wikidata_id – Wikidata id
local_id – local Wikibase id
has_all_claims – Boolean indicating whether the entity has been +imported with all claims or no claims (i.e. no recurse)
None
+Function for completing an already existing local entity +with its statements from wikidata.
+wikidata_id – Wikidata entity ID to be imported.
local_id – Local id of the existing entity that needs to +be completed with further statements.
Local entity ID
+local_id
+Query the wb_id_mapping db table for a given parameter.
+The two important parameters are the local_id and whether the +entity has already been imported with all claims
+parameter (str) – Either local_id or has_all_claims
wikidata_id (str) – Wikidata ID
otherwise None. For has_all_claims, a boolean is returned.
+str or boolean
+Query the wb_id_mapping db table for a given parameter.
+The two important parameters are the wikidata_id and whether the +entity has already been imported with all claims
+parameter (str) – Either wikidata_id or has_all_claims
local_id (str) – local ID
otherwise None. For has_all_claims, a boolean is returned.
+str or boolean
+Bases: MardiItem
Request the MediaWiki API to get data for the entity specified in argument.
+entity_id – The entity_id of the Item entity you want. Must start with a ‘Q’.
kwargs –
an ItemEntity instance
+What is this about?
+Install the python package of mardi-importer
by first installing the
+requirements from requirements.txt
,
pip install -r requirements.txt
+
Then install the packages via
+pip install -U -e .
+
-U
enforces reinstalling the package, with -e
modifications in
+the source files are automatically taken into account.
Note: for convenience, local installations not using docker can be placed within +virtual environments by first calling
+python3 -m venv env
+source env/bin/activate
+
TODO
+In docs/
, run make html
to generate the documentation for a
+local installation. The modules have to be installed and findable by import
+module
. To view the docs, open the file docs/_build/html/index.html
.
TODO
++ m | ||
+ |
+ mardi_importer | + |
+ |
+ mardi_importer.cran.CRANSource | + |
+ |
+ mardi_importer.cran.RPackage | + |
+ |
+ mardi_importer.importer.Importer | + |
+ |
+ mardi_importer.integrator.MardiEntities | + |
+ |
+ mardi_importer.integrator.MardiIntegrator | + |
+ |
+ mardi_importer.polydb | + |
+ |
+ mardi_importer.zbmath.misc | + |
+ |
+ mardi_importer.zbmath.ZBMathSource | + |
Bases: ADataSource
Reads data from zbMath API.
+ + +Get creation date from xml record.
+xml_record (xml element) – record returned by zbMath API
+creation date
+string
+Get zbMath id from xml record.
+xml_record (xml element) – record returned by zbMath API
+zbMath ID
+string
+Parse xml record from zbMath API.
+xml_record (xml element) – record returned by zbMath API
+dict of (tag,value) pairs extracted from xml_record
+dict
+Overrides abstract method. +Reads a raw zbMath data dump and processes it, then saves it as a csv.
+Query crossref API for DOI information.
+doi – doi
key – document_title only for now
document title
+title
+Returns a fully qualified tag name.
+tag_name (string) – name of tag, e.g. author
namespace (string) – namespace URL of a namespace
Function to extract information returned by a doi query for a specific tag.
+val (string) – tag, e.g. author
work_info (dict) – information from doi query response
information for specific tag, None if not found
+string
+