diff --git a/ixmp/__init__.py b/ixmp/__init__.py index d9975c343..fc6c1174e 100644 --- a/ixmp/__init__.py +++ b/ixmp/__init__.py @@ -4,6 +4,7 @@ from ._config import config from .backend import BACKENDS, ItemType +from .backend.dbapi import DatabaseBackend from .backend.jdbc import JDBCBackend from .core import IAMC_IDX, Platform, Scenario, TimeSeries from .model import MODELS @@ -33,6 +34,7 @@ __version__ = "999" # Register Backends provided by ixmp +BACKENDS["dbapi"] = DatabaseBackend BACKENDS["jdbc"] = JDBCBackend # Register Models provided by ixmp diff --git a/ixmp/_config.py b/ixmp/_config.py index f5ecf70cd..2170bc369 100644 --- a/ixmp/_config.py +++ b/ixmp/_config.py @@ -267,6 +267,10 @@ def add_platform(self, name, *args, **kwargs): "JDBCBackend with driver=hsqldb" ) assert len(args) == 0 + elif cls == "dbapi": + from ixmp.backend.dbapi import DatabaseBackend + + info = DatabaseBackend.handle_config(args) else: raise ValueError(cls) diff --git a/ixmp/backend/base.py b/ixmp/backend/base.py index 55efee3d0..ccc4fdf1a 100644 --- a/ixmp/backend/base.py +++ b/ixmp/backend/base.py @@ -766,12 +766,12 @@ def clone( The cloned Scenario. """ - @abstractmethod def has_solution(self, s: Scenario): - """Return `True` if Scenario *s* has been solved. + """OPTIONAL: Return `True` if Scenario *s* has been solved. If :obj:`True`, model solution data is available from the Backend. """ + raise NotImplementedError @abstractmethod def list_items(self, s: Scenario, type): diff --git a/ixmp/backend/dbapi.py b/ixmp/backend/dbapi.py new file mode 100644 index 000000000..f0e55847a --- /dev/null +++ b/ixmp/backend/dbapi.py @@ -0,0 +1,673 @@ +import json +import logging +import pickle +from hashlib import sha1 +from operator import itemgetter +from typing import Any, Dict, Generator, Tuple + +import pandas as pd +import sqlite3 + +from ixmp.backend import FIELDS, ItemType +from ixmp.backend.base import Backend +from ixmp.backend.io import s_write_gdx +from ixmp.core import TimeSeries +from ixmp.utils import as_str_list + +log = logging.getLogger(__name__) + + +def _filters_match(identifiers, **filters): + return all( + (len(v) == 0 or identifiers.get(k, None) in v) for k, v in filters.items() + ) + + +class DatabaseBackend(Backend): + """Backend based on Python DB API 2.0.""" + + # TODO use CachingBackend as the base class + + # Database connection object. 
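+    # None while the database is closed; open_db() sets it to a
+    # sqlite3.Connection.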
+ conn = None + + _index = {} + + def __init__(self, **kwargs): + self._db_path = kwargs.pop("path") + assert 0 == len(kwargs) + + # Open and maybe initialize the database + self.open_db() + + # Optional methods + + def open_db(self): + self.conn = sqlite3.connect(self._db_path) + init_schema(self.conn) + + def close_db(self): + try: + self.conn.close() + except AttributeError: + pass # Already closed + else: + self.conn = None + + # Methods for ixmp.Platform + + def set_node(self, name, parent=None, hierarchy=None, synonym=None): + if synonym: + raise NotImplementedError + + hierarchy = hierarchy or "default" + self._insert_code(f"node_{hierarchy}", name, parent) + + def get_nodes(self): + cur = self.conn.cursor() + cur.execute("SELECT * FROM code WHERE codelist LIKE 'node_%'") + while True: + row = cur.fetchone() + if row is None: + return + yield (row[1], None, row[2], row[0].split("node_")[-1]) + + def get_timeslices(self): + cur = self.conn.cursor() + cur.execute( + """ + SELECT c.id, c.codelist, a.value FROM code AS c INNER JOIN annotation AS a + ON a.obj_class == 'code' AND printf('%s:%s', c.codelist, c.id) == a.obj_id + WHERE codelist LIKE 'timeslice_%' + """ + ) + while True: + row = cur.fetchone() + if row is None: + return + yield (row[0], row[1].split("timeslice_")[1], float(row[2])) + + def set_timeslice(self, name, category, duration): + codelist = f"timeslice_{category}" + self._insert_code(codelist, name) + self._annotate_code(codelist, name, "duration", repr(duration)) + + def add_model_name(self, name): + self._insert_code("model_name", name) + + def add_scenario_name(self, name): + self._insert_code("scenario_name", name) + + def get_model_names(self) -> Generator[str, None, None]: + yield from map(itemgetter(0), self._select_codes("model_name")) + + def get_scenario_names(self) -> Generator[str, None, None]: + yield from map(itemgetter(0), self._select_codes("scenario_name")) + + def set_unit(self, name, comment): + self._insert_code("unit", name) + if comment: + log.info(comment) + + def get_units(self): + return list(map(itemgetter(0), self._select_codes("unit"))) + + def write_file(self, path, item_type, **kwargs): + # TODO move this code to the parent class. s_write_gdx() works with any + # backend, so it should be for any class that *doesn't* overload it + s, kwargs["filters"] = self._handle_rw_filters(kwargs.pop("filters", {})) + + if path.suffix == ".gdx" and item_type is ItemType.MODEL and s: + s_write_gdx(self, s, path, **kwargs) + else: + super().write_file(path, item_type, **kwargs) + + # Methods for ixmp.TimeSeries + + def init(self, ts, annotation): + # Identifiers for `ts` + info = [ts.__class__.__name__, ts.model, ts.scenario] + cur = self.conn.cursor() + + # Identify the maximum previous version matching these identifiers + cur.execute( + """ + SELECT max(version) FROM timeseries WHERE class = ? AND model_name = ? + AND scenario_name = ? + """, + info, + ) + previous_version = cur.fetchone()[0] or 0 + + # Use the next available version + ts.version = previous_version + 1 + + # Insert + cur.execute( + """ + INSERT OR ABORT INTO timeseries (class, model_name, scenario_name, version) + VALUES (?, ?, ?, ?) 
+            """,
+            info + [ts.version],
+        )
+        cur.execute("SELECT last_insert_rowid()")
+
+        # Store the ID
+        self._index[ts] = cur.fetchone()[0]
+
+        self.conn.commit()
+
+    def get(self, ts):
+        id, version = self._get(ts.model, ts.scenario, ts.version)
+
+        ts.version = ts.version or version
+        assert ts.version == version  # Sanity check
+
+        self._index[ts] = id
+
+    def check_out(self, ts, timeseries_only):
+        if timeseries_only:
+            log.info("timeseries_only=True ignored")
+        log.warning("check_out() has no effect")
+
+    def commit(self, ts, comment):
+        log.warning("commit() has no effect")
+        log.info(comment)
+
+    def set_as_default(self, ts):
+        cur = self.conn.cursor()
+
+        cond = "a.obj_class == 'timeseries' AND a.id == '__ixmp_default_version'"
+
+        cur.execute(
+            f"""
+            SELECT ts.id FROM timeseries AS ts JOIN annotation AS a ON a.obj_id == ts.id
+            WHERE ts.model_name = ? AND ts.scenario_name = ? AND {cond}
+            """,
+            (ts.model, ts.scenario),
+        )
+        existing_default = cur.fetchone()
+
+        if existing_default:
+            # Delete the existing default
+            cur.execute(
+                f"DELETE FROM annotation AS a WHERE a.obj_id = ? AND {cond}",
+                existing_default,
+            )
+
+        self._annotate(ts, "__ixmp_default_version", None)
+
+    def is_default(self, ts):
+        return self._select_anno(ts, "__ixmp_default_version") is not None
+
+    def run_id(self, ts):
+        # Use the internal integer ID stored by init() or get()
+        return self._index[ts]
+
+    def get_data(self, ts, region, variable, unit, year):
+        # NB this is a simple implementation
+        # TODO use the __ixmp_ts_info annotation to filter before loading item data
+        filters = dict(
+            region=as_str_list(region),
+            variable=as_str_list(variable),
+            unit=as_str_list(unit),
+            year=as_str_list(year),
+        )
+
+        # Loop over all "tsdata" items
+        for name, *info in self._iter_items(ts, "tsdata"):
+            id, item = self._item_data(ts, name)
+
+            if not _filters_match(item, **filters):
+                # Doesn't match the filters
+                continue
+
+            # Iterate through the pd.Series
+            for year, value in item["data"].items():
+                # Insert the values in the item dict to simplify construction of
+                # the tuple
+                item["year"] = int(year)
+                item["value"] = float(value)
+                yield tuple(item[f] for f in FIELDS["ts_get"])
+
+    def get_geo(self, ts):
+        for name, *info in self._iter_items(ts, "geodata"):
+            id, item = self._item_data(ts, name)
+            item["value"] = item["data"]
+            yield tuple(item[f] for f in FIELDS["ts_get_geo"])
+
+    def set_data(self, ts, region, variable, data, unit, subannual, meta):
+        self._set_item_with_identifiers(
+            ts=ts,
+            kind="tsdata",
+            data=data,
+            dims=["year"],
+            identifiers=dict(
+                region=region,
+                variable=variable,
+                subannual=subannual,
+                unit=unit,
+                meta=meta,
+            ),
+        )
+
+    def set_geo(self, ts, region, variable, subannual, year, value, unit, meta):
+        self._set_item_with_identifiers(
+            ts=ts,
+            kind="geodata",
+            data=value,
+            dims=[],
+            identifiers=dict(
+                region=region,
+                variable=variable,
+                subannual=subannual,
+                unit=unit,
+                meta=meta,
+                year=year,
+            ),
+        )
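+
+    # NB set_data() and set_geo() above both delegate to
+    # _set_item_with_identifiers(), which stores the data as a single pickled
+    # blob in the item_data table, keyed by a SHA-1 hash of the identifiers;
+    # see the "Internal" section below.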
+
+    # Methods for ixmp.Scenario
+
+    def clone(
+        self,
+        s,
+        platform_dest,
+        model,
+        scenario,
+        annotation,
+        keep_solution,
+        first_model_year=None,
+    ):
+        log.warning(
+            f"clone({s}, {platform_dest}, {model}, {scenario}, {annotation}, "
+            f"{keep_solution}, {first_model_year}) has no effect"
+        )
+        return s
+
+    def list_items(self, s, type):
+        return list(map(itemgetter(0), self._iter_items(s, type)))
+
+    def init_item(self, s, type, name, idx_sets, idx_names):
+        idx_names = idx_names or idx_sets
+        dimensions = {n: s for n, s in zip(idx_names, idx_sets)}
+
+        cur = self.conn.cursor()
+
+        cur.execute(
+            """
+            INSERT OR ABORT INTO item (timeseries, type, name, dims)
+            VALUES (?, ?, ?, ?)
+            """,
+            (self._index[s], type, name, repr(dimensions)),
+        )
+
+    def item_index(self, s, name, sets_or_names):
+        cur = self.conn.cursor()
+
+        cur.execute(
+            "SELECT dims FROM item AS i WHERE i.timeseries = ? AND i.name = ?",
+            (self._index[s], name),
+        )
+        dims = eval(cur.fetchone()[0])
+        return list(dims.keys() if sets_or_names == "names" else dims.values())
+
+    def item_get_elements(self, s, type, name, filters):
+        # NB the `filters` argument is currently ignored
+        id, data = self._item_data(s, name)
+
+        # Identify the index names and sets, even when the item has no data yet
+        cur = self.conn.cursor()
+        cur.execute("SELECT dims FROM item WHERE id = ?", (id,))
+        dims = eval(cur.fetchone()[0])
+        idx_names, idx_sets = (
+            list(zip(*dims.items())) if len(dims) else (tuple(), tuple())
+        )
+
+        if data is None:
+            data = tuple()
+
+        if len(idx_sets):
+            # Mapping set or multi-dimensional equation, parameter, or variable
+            columns = list(idx_names)
+
+            # Prepare dtypes for index columns
+            dtypes = {}
+            for idx_name, idx_set in zip(columns, idx_sets):
+                dtypes[idx_name] = str
+
+            # Prepare dtypes for additional columns
+            if type == "par":
+                columns.extend(["value", "unit"])
+                dtypes["value"] = float
+                dtypes["unit"] = str
+            elif type in ("equ", "var"):
+                columns.extend(["lvl", "mrg"])
+                dtypes.update({"lvl": float, "mrg": float})
+
+            # Create data frame
+            result = pd.DataFrame(data, columns=columns).astype(dtypes)
+        elif type in ("equ", "var"):
+            # Scalar equations and variables
+            result = dict(zip(("lvl", "mrg"), data))
+        else:
+            raise NotImplementedError("non-indexed items")
+
+        return result
+
+    def item_set_elements(self, s, type, name, elements):
+        cur = self.conn.cursor()
+
+        id, data = self._item_data(s, name)
+
+        if data is not None:
+            raise NotImplementedError("update existing items")
+
+        data = []
+
+        for e in elements:
+            data.append(e[0] if type == "set" else e)
+
+        cur.execute(
+            "INSERT OR REPLACE INTO item_data (item, value) VALUES (?, ?)",
+            (id, pickle.dumps(data)),
+        )
+        self.conn.commit()
+
+    def _get_meta(self, target):
+        cur = self.conn.cursor()
+        cur.execute(
+            "SELECT id, value FROM annotation WHERE obj_class = ? AND obj_id = ?",
+            target,
+        )
+        result = dict()
+
+        while True:
+            anno = cur.fetchone()
+            if anno is None:
+                break
+            result[anno[0]] = eval(anno[1])
+
+        return result
+
+    def get_meta(self, model, scenario, version, strict):
+        if strict:
+            targets = [self._meta_target(model, scenario, version)]
+        else:
+            raise NotImplementedError
+
+        result = dict()
+        for target in targets:
+            result.update(self._get_meta(target))
+
+        return result
+
+    def set_meta(self, meta, model, scenario, version):
+        target = self._meta_target(model, scenario, version)
+        for key, value in meta.items():
+            self._annotate(target, key, repr(value))
+
+    def remove_meta(self, categories, model, scenario, version):
+        cur = self.conn.cursor()
+        cur.execute(
+            f"""
+            DELETE FROM annotation WHERE obj_class = ? AND obj_id = ?
+            AND id IN ({', '.join('?'
* len(categories))}) + """, + list(self._meta_target(model, scenario, version)) + as_str_list(categories), + ) + + # Internal + + def _annotate(self, obj, anno_id, value): + if isinstance(obj, TimeSeries): + data = ["timeseries", str(self._index[obj])] + elif isinstance(obj, tuple): + data = list(obj) + else: + raise NotImplementedError + + self.conn.execute( + "INSERT OR ABORT INTO annotation VALUES (?, ?, ?, ?)", + data + [anno_id, value], + ) + self.conn.commit() + + def _annotate_code(self, codelist, code_id, anno_id, value): + self.conn.execute( + "INSERT OR ABORT INTO annotation VALUES (?, ?, ?, ?)", + ("code", f"{codelist}:{code_id}", anno_id, value), + ) + self.conn.commit() + + def _get(self, model, scenario, version): + args = [model, scenario] + if version: + query = """ + SELECT ts.id, ts.version FROM timeseries AS ts WHERE model_name = ? + AND scenario_name = ? AND version = ? + """ + args.append(version) + else: + query = """ + SELECT ts.id, ts.version FROM timeseries AS ts JOIN annotation AS a + ON a.obj_id == ts.id WHERE ts.model_name = ? AND ts.scenario_name = ? + AND a.obj_class == 'timeseries' AND a.id == '__ixmp_default_version' + """ + + cur = self.conn.cursor() + cur.execute(query, args) + + result = cur.fetchone() + if result is None: + raise ValueError(f"model={model}, scenario={scenario}") + else: + return result + + def _hash(self, identifiers): + return sha1(json.dumps(identifiers).encode()).hexdigest() + + def _insert_code(self, codelist, id, parent=None): + self.conn.execute( + "INSERT OR ABORT INTO code VALUES (?, ?, ?)", (codelist, id, parent) + ) + self.conn.commit() + + def _item_data(self, s, name): + """Retrieve and unpickle data for item `name` in TimeSeries `s`. + + Returns + ------- + int + integer ID of the item. + object + item data. + """ + cur = self.conn.cursor() + + cur.execute( + """ + SELECT i.id, value FROM item AS i LEFT JOIN item_data + ON i.id == item_data.item WHERE i.timeseries = ? AND i.name = ? + """, + (self._index[s], name), + ) + result = cur.fetchone() + if result[1]: + return result[0], pickle.loads(result[1]) + else: + return result[0], None + + def _iter_items(self, s, type): + cur = self.conn.cursor() + cur.execute( + """ + SELECT i.name FROM timeseries AS ts + JOIN item as i ON ts.id == i.timeseries + WHERE ts.id = ? AND i.type = ? + """, + (self._index[s], type), + ) + while True: + result = cur.fetchone() + if result is None: + return + yield result + + def _meta_target(self, model, scenario, version): + """Return the target object to be annotated with metadata.""" + if scenario is version is None: + return ("code", f"model_name:{model}") + elif model is version is None: + return ("code", f"scenario_name:{scenario}") + elif isinstance(version, int): + id, _version = self._get(model, scenario, version) + assert _version == version + return ("timeseries", id) + else: + return ("model/scenario", f"{model}/{scenario}") + + def _select_anno(self, obj, anno_id): + if isinstance(obj, TimeSeries): + data = ["timeseries", str(self._index[obj])] + elif isinstance(obj, tuple): + data = list(obj) + else: + raise NotImplementedError + + cur = self.conn.cursor() + cur.execute( + """ + SELECT * FROM annotation WHERE annotation.obj_class = ? + AND annotation.obj_id = ? AND annotation.id = ? 
+            """,
+            data + [anno_id],
+        )
+        return cur.fetchone()
+
+    def _select_codes(self, codelist):
+        cur = self.conn.cursor()
+        cur.execute("SELECT id FROM code WHERE codelist == ?", (codelist,))
+        while True:
+            results = cur.fetchmany()
+            if not len(results):
+                break
+            yield from results
+
+    def _set_item_with_identifiers(self, ts, kind, data, dims, identifiers):
+        cur = self.conn.cursor()
+
+        # Compute a unique name for this combination of identifiers
+        name = self._hash(identifiers)
+        log.debug(f"hash {name} for {identifiers}")
+
+        # Create the entry in the database
+        self.init_item(ts, kind, name, dims, dims)
+
+        # Retrieve any existing data
+        id, existing = self._item_data(ts, name)
+
+        if existing:
+            raise NotImplementedError("set_data() with existing data")
+
+        all_data = identifiers.copy()
+        all_data["data"] = data
+
+        # Dump the data
+        cur.execute(
+            "INSERT OR REPLACE INTO item_data (item, value) VALUES (?, ?)",
+            (id, pickle.dumps(all_data)),
+        )
+        self.conn.commit()
+
+        # Store an annotation with the identifiers
+        self._annotate(("item", name), "__ixmp_ts_info", repr(identifiers))
+
+    # Required methods that are not implemented
+    #
+    # Since base.Backend is an abstract base class with abstract methods, a subclass
+    # (like this one) cannot be instantiated unless a concrete implementation is given
+    # for each abstract method. Here we raise NotImplementedError for each.
+
+    def nie(self, *args, **kwargs):
+        raise NotImplementedError
+
+    cat_get_elements = nie
+    cat_list = nie
+    cat_set_elements = nie
+    clear_solution = nie
+    delete = nie
+    delete_geo = nie
+    delete_item = nie
+    discard_changes = nie
+    get_doc = nie
+    get_scenarios = nie
+    item_delete_elements = nie
+    last_update = nie
+    set_doc = nie
+
+    # Class-specific methods
+
+    @classmethod
+    def handle_config(cls, args: Tuple[str, ...]) -> Dict[str, Any]:
+        """Handle ``ixmp platform add`` CLI arguments."""
+        # Convert to list: `args` may be a tuple, which has no pop() method
+        args = list(args)
+        info = {"class": "dbapi", "path": args.pop(0)}
+        assert 0 == len(args)
+        return info
+
+
+SCHEMA = """
+    CREATE TABLE schema (key, value);
+    INSERT INTO schema VALUES ('version', '1.0');
+
+    CREATE TABLE code (codelist, id, parent);
+
+    CREATE TABLE annotation (
+        obj_class VARCHAR NOT NULL,
+        obj_id NOT NULL,
+        id VARCHAR NOT NULL,
+        value,
+        UNIQUE (obj_class, obj_id, id)
+    );
+
+    CREATE TABLE timeseries (
+        id INTEGER PRIMARY KEY,
+        class,
+        model_name,
+        scenario_name,
+        version INTEGER,
+        UNIQUE (model_name, scenario_name, version)
+    );
+
+    CREATE TABLE item (
+        id INTEGER PRIMARY KEY,
+        timeseries INTEGER,
+        name VARCHAR,
+        type VARCHAR,
+        dims TEXT,
+        FOREIGN KEY (timeseries) REFERENCES timeseries(id),
+        UNIQUE (timeseries, name)
+    );
+
+    CREATE TABLE item_data (
+        item INTEGER,
+        value BLOB,
+        FOREIGN KEY (item) REFERENCES item(id)
+    );
+"""
+
+
+def init_schema(conn):
+    """Check or initialize the database schema in `conn`."""
+    cur = conn.cursor()
+
+    try:
+        # Check that the schema table exists and there is one expected value in it
+        cur.execute("SELECT value FROM schema WHERE key == 'version'")
+    except sqlite3.OperationalError as e:
+        if "no such table: schema" in e.args:
+            pass  # Not initialized yet
+        else:
+            raise  # Something else
+    else:
+        if "1.0" == cur.fetchone()[0]:
+            return  # Already initialized
+
+    # Initialize the database
+    cur.executescript(SCHEMA)
+    conn.commit()
diff --git a/ixmp/backend/io.py b/ixmp/backend/io.py
index be5d082ca..fe9e6935f 100644
--- a/ixmp/backend/io.py
+++ b/ixmp/backend/io.py
@@ -39,6 +39,11 @@ def ts_read_file(ts, path, firstyear=None, lastyear=None):
     ts.commit(msg)
 
 
+def 
s_write_gdx(be, s, path): + """Write `s` to a GAMS Data Exchange (:file:`.gdx`) file at `path`.""" + raise NotImplementedError + + def s_write_excel(be, s, path, item_type, filters=None, max_row=None): """Write *s* to a Microsoft Excel file at *path*. diff --git a/ixmp/backend/jdbc.py b/ixmp/backend/jdbc.py index c25279c3a..b657dcc6a 100644 --- a/ixmp/backend/jdbc.py +++ b/ixmp/backend/jdbc.py @@ -569,24 +569,6 @@ def _index_and_set_attrs(self, jobj, ts): ts.scheme = s - def _validate_meta_args(self, model, scenario, version): - """Validate arguments for getting/setting/deleting meta""" - valid = False - if model and not scenario and version is None: - valid = True - elif scenario and not model and version is None: - valid = True - elif model and scenario and version is None: - valid = True - elif model and scenario and version is not None: - valid = True - if not valid: - msg = ( - "Invalid arguments. Valid combinations are: (model), " - "(scenario), (model, scenario), (model, scenario, version)" - ) - raise ValueError(msg) - def init(self, ts, annotation): klass = ts.__class__.__name__ @@ -996,7 +978,6 @@ def get_meta( version: int = None, strict: bool = False, ) -> dict: - self._validate_meta_args(model, scenario, version) if version is not None: version = java.Long(version) meta = self.jobj.getMeta(model, scenario, version, strict) @@ -1005,7 +986,6 @@ def get_meta( def set_meta( self, meta: dict, model: str = None, scenario: str = None, version: int = None ) -> None: - self._validate_meta_args(model, scenario, version) if version is not None: version = java.Long(version) @@ -1017,7 +997,6 @@ def set_meta( def remove_meta( self, categories, model: str = None, scenario: str = None, version: int = None ): - self._validate_meta_args(model, scenario, version) if version is not None: version = java.Long(version) return self.jobj.removeMeta(model, scenario, version, to_jlist(categories)) diff --git a/ixmp/core.py b/ixmp/core.py index a9b331df8..62a2ea6ef 100644 --- a/ixmp/core.py +++ b/ixmp/core.py @@ -2,7 +2,7 @@ from functools import partial from itertools import repeat, zip_longest from pathlib import Path -from typing import List, Union +from typing import Dict, Mapping, List, Optional, Union from warnings import warn from weakref import ProxyType, proxy @@ -56,13 +56,11 @@ class Platform: "add_scenario_name", "close_db", "get_doc", - "get_meta", "get_model_names", "get_scenario_names", "open_db", "remove_meta", "set_doc", - "set_meta", ] def __init__(self, name=None, backend=None, **backend_args): @@ -340,6 +338,59 @@ def add_region_synonym(self, region, mapped_to): if not self._existing_node(region): self._backend.set_node(region, synonym=mapped_to) + def get_meta( + self, + model: Optional[str] = None, + scenario: Optional[str] = None, + version: Optional[int] = None, + strict: bool = False, + ) -> Dict: + """Retrieve metadata. + + .. todo:: Complete docstring before merging. + """ + validate_meta_args(model, scenario, version) + return self._backend.get_meta(model, scenario, version, strict) + + def set_meta( + self, + data: Mapping, + model: Optional[str] = None, + scenario: Optional[str] = None, + version: Optional[int] = None, + ): + """Store metadata. + + .. todo:: Complete docstring before merging. 
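
        Parameters
        ----------
        data : dict
            Meta category names mapped to values.
        model : str, optional
            Model name the metadata applies to.
        scenario : str, optional
            Scenario name the metadata applies to.
        version : int, optional
            Scenario version the metadata applies to.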
+        """
+        kind = validate_meta_args(model, scenario, version)
+
+        if len(data) == 0:
+            return
+
+        # Identifier combinations for the other levels to be checked
+        other_args = [
+            (model, None, None),
+            (None, scenario, None),
+            (model, scenario, None),
+        ]
+        other_data = list(
+            self._backend.get_meta(*args, strict=True) if i != kind else dict()
+            for i, args in enumerate(other_args)
+        )
+
+        # Check for an existing meta key at any other level
+        for key, value in data.items():
+            exists = list(key in other for other in other_data)
+            if any(exists):
+                args = other_args[exists.index(True)]
+                raise RuntimeError(
+                    f"The meta category {key!r} is already used at another level: "
+                    f"model {args[0]!r}, scenario {args[1]!r}, version {args[2]!r}"
+                )
+
+        self._backend.set_meta(data, model, scenario, version)
+
     def timeslices(self):
         """Return all subannual timeslices defined in this Platform instance.
 
@@ -1571,7 +1622,21 @@ def has_solution(self):
 
         If ``has_solution() == True``, model solution data exists in the db.
         """
-        return self._backend("has_solution")
+        try:
+            return self._backend("has_solution")
+        except NotImplementedError:
+            # Fallback implementation if the backend does not define the optional
+            # method
+
+            for ix_type in "equ", "var":
+                for name in self._backend("list_items", ix_type):
+                    if len(
+                        self._backend("item_get_elements", ix_type, name, filters=None)
+                    ):
+                        # Data in at least one 'equ' or 'var' item counts as a
+                        # solution
+                        return True
+
+            return False
 
     def remove_solution(self, first_model_year=None):
         """Remove the solution from the scenario
@@ -1699,7 +1764,7 @@ def get_meta(self, name=None):
             meta category name
         """
         all_meta = self.platform._backend.get_meta(
-            self.model, self.scenario, self.version
+            self.model, self.scenario, self.version, strict=False
         )
         return all_meta[name] if name else all_meta
 
@@ -1719,15 +1784,9 @@ def set_meta(self, name_or_dict, value=None):
         if isinstance(name_or_dict, str):
             name_or_dict = {name_or_dict: value}
         elif not isinstance(name_or_dict, dict):
-            msg = (
-                "Unsupported parameter type of name_or_dict: %s. "
-                "Supported parameter types for name_or_dict are "
-                "String and Dictionary"
-            ) % type(name_or_dict)
-            raise ValueError(msg)
-        self.platform._backend.set_meta(
-            name_or_dict, self.model, self.scenario, self.version
-        )
+            raise TypeError(f"{type(name_or_dict)}; expected str or dict")
+
+        self.platform.set_meta(name_or_dict, self.model, self.scenario, self.version)
 
     def delete_meta(self, *args, **kwargs):
         """Remove scenario meta.
@@ -1870,3 +1929,18 @@ def to_iamc_layout(df):
         df["subannual"] = "Year"
 
     return df
+
+
+def validate_meta_args(
+    model: Optional[str], scenario: Optional[str], version: Optional[int]
+):
+    """Helper for :meth:`.Platform.set_meta` and :meth:`.Platform.get_meta`."""
+    try:
+        return [(1, 0, 0), (0, 1, 0), (1, 1, 0), (1, 1, 1)].index(
+            (model is not None, scenario is not None, version is not None)
+        )
+    except ValueError:
+        # NB list.index() raises ValueError, not IndexError, for a missing entry
+        raise ValueError(
+            "Invalid arguments. Valid combinations are: (model), (scenario), "
+            "(model, scenario), (model, scenario, version)"
+        ) from None
diff --git a/ixmp/testing.py b/ixmp/testing.py
index 44e9bd038..b606165b1 100644
--- a/ixmp/testing.py
+++ b/ixmp/testing.py
@@ -268,8 +268,8 @@ def create_test_platform(tmp_path, data_path, name, **properties):
     return props_file
 
 
-def populate_test_platform(platform):
-    """Populate *platform* with data for testing.
+def populate_test_platform(platform: Platform, solve: bool = True) -> None:
+    """Populate `platform` with data for testing.
 
     Many of the tests in test_core.py depend on this set of data.
@@ -282,8 +282,13 @@ def populate_test_platform(platform): - 1 version of a TimeSeries with model name 'Douglas Adams' and scenario name 'Hitchhiker', containing 2 values. + + Parameters + ---------- + solve : bool, optional + If :obj:`True`, the scenarios are solved. """ - s1 = make_dantzig(platform, solve=True, quiet=True) + s1 = make_dantzig(platform, solve=solve, quiet=True) s2 = s1.clone() s2.set_as_default() diff --git a/ixmp/tests/backend/test_base.py b/ixmp/tests/backend/test_base.py index 9034d2486..19213b93f 100644 --- a/ixmp/tests/backend/test_base.py +++ b/ixmp/tests/backend/test_base.py @@ -47,7 +47,6 @@ class BE2(Backend): get_scenario_names = noop get_timeslices = noop get_units = noop - has_solution = noop init = noop init_item = noop is_default = noop diff --git a/ixmp/tests/backend/test_dbapi.py b/ixmp/tests/backend/test_dbapi.py new file mode 100644 index 000000000..b717f13f5 --- /dev/null +++ b/ixmp/tests/backend/test_dbapi.py @@ -0,0 +1,534 @@ +import copy + +import pandas as pd +import pandas.testing as pdt +import pytest + +import ixmp +from ixmp import Platform, Scenario, TimeSeries, config as ixmp_config +from ixmp.testing import make_dantzig, models, populate_test_platform +from ixmp.tests.core.test_timeseries import DATA, expected, wide + + +@pytest.fixture(scope="class") +def mp(request, tmp_env, tmp_path_factory): + """An empty ixmp.Platform connected to a temporary, in-memory database. + + This is a lightly-modified clone of ixmp.testing.test_mp fixture. + """ + # Long, unique name for the platform. + # Remove '/' so that the name can be used in URL tests. + platform_name = request.node.nodeid.replace("/", " ") + + # Add a platform + # ixmp_config.add_platform(platform_name, "dbapi", ":memory:") + ixmp_config.add_platform( + platform_name, "dbapi", tmp_path_factory.mktemp(platform_name) / "test.db" + ) + + # Launch Platform + mp = Platform(name=platform_name) + yield mp + + # Remove from config + ixmp_config.remove_platform(platform_name) + + +@pytest.fixture(scope="function", params=[TimeSeries, Scenario]) +def ts(request, mp): + """Copied from :func:`core.test_timeseries.ts`.""" + # Use a hash of the pytest node ID to avoid exceeding the maximum length for a + # scenario name + node = hash(request.node.nodeid.replace("/", " ")) + # Class of object to yield + cls = request.param + yield cls(mp, model=f"test-{node}", scenario="test", version="new") + + +class TestDatabaseBackend: + def test_init_backend(self, mp): + """A Platform backed by DatabaseBackend can be initialized.""" + + @pytest.mark.parametrize("kind", ("model", "scenario")) + def test_names(self, mp, kind): + setter = getattr(mp, f"add_{kind}_name") + getter = getattr(mp, f"get_{kind}_names") + + items = ("foo", "bar", "baz") + for i in items: + setter(i) + + assert items == tuple(getter()) + + def test_node(self, mp): + """Nodes can be stored and retrieved.""" + mp.add_region("AT", "country") + mp.regions() + + def test_unit(self, mp): + """Units can be stored and retrieved.""" + units = ["kg", "km"] + for unit in units: + mp.add_unit(unit) + + assert units == mp.units() + + # NB only works when using a file-backed `mp` fixture. With :memory:, nothing + # is persisted, so this doesn't work. 
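+        # (A sqlite3 ":memory:" database exists only for the lifetime of its
+        # connection, so it cannot survive a close_db()/open_db() cycle.)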
+ mp.close_db() + mp.open_db() + + assert units == mp.units() + + def test_timeslice(self, mp): + """Time slices can be stored and retrieved.""" + items = ( + dict(name="Spring", category="season", duration=1.0 / 9), + dict(name="Summer", category="season", duration=2.0 / 9), + dict(name="Autumn", category="season", duration=4.0 / 9), + dict(name="Winter", category="season", duration=2.0 / 9), + ) + for item in items: + mp.add_timeslice(**item) + + pdt.assert_frame_equal(pd.DataFrame(items), mp.timeslices()) + + def test_ts(self, mp): + """Test Backend.{init,set_as_default,is_default}.""" + args = dict(model="Foo model", scenario="Baz scenario", version="new") + ts0 = TimeSeries(mp, **args) + assert 1 == ts0.version + + ts1 = TimeSeries(mp, **args) + assert 2 == ts1.version + + ts1.set_as_default() + assert ts1.is_default() + + assert not ts0.is_default() + + del ts0, ts1 + + args.pop("version") + ts2 = TimeSeries(mp, **args) + assert 2 == ts2.version + assert ts2.is_default() + + @pytest.mark.parametrize("fmt", ["long", "wide"]) + def test_tsdata(self, ts, fmt): + # Copied from core.test_timeseries.test_add_timeseries + data = DATA[0] if fmt == "long" else wide(DATA[0]) + + # Data added + ts.add_timeseries(data) + ts.commit("") + + # Error: column 'unit' is missing + with pytest.raises(ValueError): + ts.add_timeseries(DATA[0].drop("unit", axis=1)) + + # Copied from core.test_timeseries.test_get + exp = expected(data, ts) + args = {} + + if fmt == "wide": + args["iamc"] = True + + # Data can be retrieved and has the expected value + obs = ts.timeseries(**args) + + pdt.assert_frame_equal(exp, obs) + + def test_geodata(self, ts): + # Copied from core.test_timeseries.test_add_geodata + + # Empty TimeSeries includes no geodata + pdt.assert_frame_equal(DATA["geo"].loc[[False, False, False]], ts.get_geodata()) + + # Data can be added + ts.add_geodata(DATA["geo"]) + ts.commit("") + + # Added data can be retrieved + obs = ts.get_geodata().sort_values("year").reset_index(drop=True) + pdt.assert_frame_equal(DATA["geo"], obs) + + @pytest.mark.parametrize( + "solve", + ( + False, + pytest.param( + True, + marks=pytest.mark.xfail( + raises=NotImplementedError, + reason=".backend.io.s_write_gdx() is not implemented", + ), + ), + ), + ) + def test_make_dantzig(self, mp, solve): + make_dantzig(mp, solve=solve) + + +@pytest.fixture(scope="class") +def mp_(mp): + """A Platform containing test data.""" + populate_test_platform(mp, solve=False) + yield mp + + +# Copied from core.test_meta + +SAMPLE_META = {"sample_int": 3, "sample_string": "string_value", "sample_bool": False} +META_ENTRIES = [ + {"sample_int": 3}, + {"sample_string": "string_value"}, + {"sample_bool": False}, + { + "sample_int": 3, + "sample_string": "string_value", + "sample_bool": False, + }, + {"mixed_category": ["string", 0.01, 2, True]}, +] +DANTZIG = models["dantzig"] + + +@pytest.mark.parametrize("meta", META_ENTRIES) +def test_set_meta_missing_argument(mp_, meta): + mp = mp_ + + with pytest.raises(ValueError): + mp.set_meta(meta) + with pytest.raises(ValueError): + mp.set_meta(meta, model=DANTZIG["model"], version=0) + with pytest.raises(ValueError): + mp.set_meta(meta, scenario=DANTZIG["scenario"], version=0) + + +@pytest.mark.parametrize("meta", META_ENTRIES) +def test_set_get_meta(mp_, meta): + """Assert that storing+retrieving meta yields expected values.""" + mp = mp_ + + mp.set_meta(meta, model=DANTZIG["model"]) + obs = mp.get_meta(model=DANTZIG["model"]) + assert obs == meta + + +@pytest.mark.parametrize("meta", META_ENTRIES) 
+def test_unique_meta(mp_, meta):
+    """
+    When setting a meta category on two distinct levels, a uniqueness error is
+    expected.
+    """
+    mp = mp_
+
+    scenario = ixmp.Scenario(mp, **DANTZIG, version="new")
+    scenario.commit("save dummy scenario")
+    mp.set_meta(meta, model=DANTZIG["model"])
+    expected = (
+        r"The meta category .* is already used at another level: "
+        r"model 'canning problem', scenario None, version None"
+    )
+    with pytest.raises(Exception, match=expected):
+        mp.set_meta(meta, **DANTZIG, version=scenario.version)
+    scen = ixmp.Scenario(mp, **DANTZIG)
+    with pytest.raises(Exception, match=expected):
+        scen.set_meta(meta)
+    # Changing the category value type of an entry should also raise an error
+    meta = {"sample_entry": 3}
+    mp.set_meta(meta, **DANTZIG)
+    meta["sample_entry"] = "test-string"
+    expected = (
+        r"The meta category .* is already used at another level: "
+        r"model 'canning problem', scenario 'standard', version None"
+    )
+    with pytest.raises(Exception, match=expected):
+        mp.set_meta(meta, **DANTZIG, version=scenario.version)
+
+
+@pytest.mark.parametrize("meta", META_ENTRIES)
+def test_set_get_meta_equals(mp_, meta):
+    mp = mp_
+
+    initial_meta = mp.get_meta(scenario=DANTZIG["scenario"])
+    mp.set_meta(meta, model=DANTZIG["model"])
+    obs_meta = mp.get_meta(scenario=DANTZIG["scenario"])
+    assert obs_meta == initial_meta
+
+
+@pytest.mark.parametrize("meta", META_ENTRIES)
+def test_unique_meta_model_scenario(mp_, meta):
+    """
+    After a meta key is set for a Model, setting the same key for a
+    Model+Scenario combination should fail.
+    """
+    mp = mp_
+
+    mp.set_meta(meta, model=DANTZIG["model"])
+    expected = r"The meta category .* is already used at another level: "
+    with pytest.raises(Exception, match=expected):
+        mp.set_meta(meta, **DANTZIG)
+
+    # Setting this meta category on a new model should fail too
+    dantzig2 = {
+        "model": "canning problem 2",
+        "scenario": "standard",
+    }
+    mp.add_model_name(dantzig2["model"])
+    expected = r"The meta category .* is already used at another level: "
+    with pytest.raises(Exception, match=expected):
+        mp.set_meta(meta, **dantzig2)
+
+
+@pytest.mark.parametrize("meta", META_ENTRIES)
+def test_get_meta_strict(mp_, meta):
+    """
+    Set meta indicators on several model/scenario/version levels and test
+    the 'strict' parameter of get_meta().
+    """
+    mp = mp_
+
+    # Set meta on various levels
+    model_meta = {"model_int": 3, "model_string": "string_value", "model_bool": False}
+    scenario_meta = {
+        "scenario_int": 3,
+        "scenario_string": "string_value",
+        "scenario_bool": False,
+    }
+    meta2 = {"sample_int2": 3, "sample_string2": "string_value2", "sample_bool2": False}
+    meta3 = {
+        "sample_int3": 3,
+        "sample_string3": "string_value3",
+        "sample_bool3": False,
+        "mixed3": ["string", 0.01, 2, True],
+    }
+    meta_scen = {
+        "sample_int4": 3,
+        "sample_string4": "string_value4",
+        "sample_bool4": False,
+        "mixed4": ["string", 0.01, 2, True],
+    }
+    scenario2 = "standard 2"
+    model2 = "canning problem 2"
+    mp.add_scenario_name(scenario2)
+    mp.add_model_name(model2)
+    dantzig2 = {
+        "model": model2,
+        "scenario": "standard",
+    }
+    dantzig3 = {
+        "model": model2,
+        "scenario": scenario2,
+    }
+    mp.set_meta(model_meta, model=DANTZIG["model"])
+    mp.set_meta(scenario_meta, scenario=DANTZIG["scenario"])
+    mp.set_meta(meta, **DANTZIG)
+    mp.set_meta(meta2, **dantzig2)
+    mp.set_meta(meta3, **dantzig3)
+    scen = ixmp.Scenario(mp, **DANTZIG, version="new")
+    scen.commit("save dummy scenario")
+    scen.set_meta(meta_scen)
+
+    # Retrieve and validate meta indicators
+    # model
+    obs1 = mp.get_meta(model=DANTZIG["model"])
+    assert obs1 == model_meta
+    # scenario
+    obs2 = mp.get_meta(scenario=DANTZIG["scenario"], strict=True)
+    assert obs2 == scenario_meta
+    # model+scenario
+    obs3 = mp.get_meta(**DANTZIG)
+    exp3 = copy.copy(meta)
+    exp3.update(model_meta)
+    exp3.update(scenario_meta)
+    assert obs3 == exp3
+    # model+scenario, strict
+    obs3_strict = mp.get_meta(**DANTZIG, strict=True)
+    assert obs3_strict == meta
+    assert obs3 != obs3_strict
+
+    # second model+scenario combination
+    obs4 = mp.get_meta(**dantzig2)
+    exp4 = copy.copy(meta2)
+    exp4.update(scenario_meta)
+    assert obs4 == exp4
+    # second model+scenario combination, strict
+    obs4_strict = mp.get_meta(**dantzig2, strict=True)
+    assert obs4_strict == meta2
+    assert obs4 != obs4_strict
+
+    # third model+scenario combination
+    obs5 = mp.get_meta(**dantzig3)
+    exp5 = copy.copy(meta3)
+    assert obs5 == exp5
+
+    # model+scenario+version
+    obs6 = mp.get_meta(**DANTZIG, version=scen.version)
+    exp6 = copy.copy(meta_scen)
+    exp6.update(meta)
+    exp6.update(model_meta)
+    exp6.update(scenario_meta)
+    assert obs6 == exp6
+    obs6_strict = mp.get_meta(
+        DANTZIG["model"], DANTZIG["scenario"], scen.version, strict=True
+    )
+    assert obs6_strict == meta_scen
+
+
+@pytest.mark.parametrize("meta", META_ENTRIES)
+def test_unique_meta_scenario(mp_, meta):
+    """
+    When setting a meta key on a specific Scenario run, setting the same key
+    on a higher level (Model or Model+Scenario) should fail.
+ """ + mp = mp_ + + scen = ixmp.Scenario(mp, **DANTZIG) + scen.set_meta(meta) + # add a second scenario and verify that setting+getting Meta works + scen2 = ixmp.Scenario(mp, **DANTZIG, version="new") + scen2.commit("save dummy scenario") + scen2.set_meta(meta) + assert scen2.get_meta() == scen.get_meta() + + expected = ( + r"The meta category .* is already used at another level: " + r"model 'canning problem', scenario 'standard', " + ) + with pytest.raises(Exception, match=expected): + mp.set_meta(meta, **DANTZIG) + with pytest.raises(Exception, match=expected): + mp.set_meta(meta, model=DANTZIG["model"]) + + +def test_meta_partial_overwrite(mp_): + mp = mp_ + + meta1 = { + "sample_string": 3.0, + "another_string": "string_value", + "sample_bool": False, + } + meta2 = {"sample_string": 5.0, "yet_another_string": "hello", "sample_bool": True} + scen = ixmp.Scenario(mp, **DANTZIG) + scen.set_meta(meta1) + scen.set_meta(meta2) + expected = copy.copy(meta1) + expected.update(meta2) + obs = scen.get_meta() + assert obs == expected + + +def test_remove_meta(mp_): + mp = mp_ + + meta = {"sample_int": 3.0, "another_string": "string_value"} + remove_key = "another_string" + mp.set_meta(meta, **DANTZIG) + mp.remove_meta(remove_key, **DANTZIG) + expected = copy.copy(meta) + del expected[remove_key] + obs = mp.get_meta(**DANTZIG) + assert expected == obs + + +def test_remove_invalid_meta(mp_): + """ + Removing nonexisting meta entries or None shouldn't result in any meta + being removed. Providing None should give a ValueError. + """ + mp = mp_ + + mp.set_meta(SAMPLE_META, **DANTZIG) + with pytest.raises(ValueError): + mp.remove_meta(None, **DANTZIG) + mp.remove_meta("nonexisting_category", **DANTZIG) + mp.remove_meta([], **DANTZIG) + obs = mp.get_meta(**DANTZIG) + assert obs == SAMPLE_META + + +def test_set_and_remove_meta_scenario(mp_): + """ + Test partial overwriting and meta deletion on scenario level. + """ + mp = mp_ + + meta1 = {"sample_string": 3.0, "another_string": "string_value"} + meta2 = {"sample_string": 5.0, "yet_another_string": "hello"} + remove_key = "another_string" + + scen = ixmp.Scenario(mp, **DANTZIG) + scen.set_meta(meta1) + scen.set_meta(meta2) + expected = copy.copy(meta1) + expected.update(meta2) + obs = scen.get_meta() + assert expected == obs + + scen.remove_meta(remove_key) + del expected[remove_key] + obs = scen.get_meta() + assert obs == expected + + +def test_scenario_delete_meta_warning(mp_): + """ + Scenario.delete_meta works but raises a deprecation warning. + + This test can be removed once Scenario.delete_meta is removed. 
+ """ + mp = mp_ + + scen = ixmp.Scenario(mp, **DANTZIG) + meta = {"sample_int": 3, "sample_string": "string_value"} + remove_key = "sample_string" + + scen.set_meta(meta) + with pytest.warns(DeprecationWarning): + scen.delete_meta(remove_key) + expected = copy.copy(meta) + del expected[remove_key] + obs = scen.get_meta() + assert obs == expected + + +def test_meta_arguments(mp_): + """Set scenario meta with key-value arguments""" + mp = mp_ + + meta = {"sample_int": 3} + scen = ixmp.Scenario(mp, **DANTZIG) + scen.set_meta(meta) + # add a second scenario and verify that setting Meta for it works + scen2 = ixmp.Scenario(mp, **DANTZIG, version="new") + scen2.commit("save dummy scenario") + scen2.set_meta(*meta.popitem()) + assert scen.get_meta() == scen2.get_meta() + + +def test_update_meta_lists(mp_): + """Set metadata categories having list/array values.""" + mp = mp_ + + SAMPLE_META = {"list_category": ["a", "b", "c"]} + mp.set_meta(SAMPLE_META, model=DANTZIG["model"]) + obs = mp.get_meta(model=DANTZIG["model"]) + assert obs == SAMPLE_META + # try updating meta + SAMPLE_META = {"list_category": ["a", "e", "f"]} + mp.set_meta(SAMPLE_META, model=DANTZIG["model"]) + obs = mp.get_meta(model=DANTZIG["model"]) + assert obs == SAMPLE_META + + +def test_meta_mixed_list(mp_): + """Set metadata categories having list/array values.""" + mp = mp_ + + meta = {"mixed_category": ["string", 0.01, True]} + mp.set_meta(meta, model=DANTZIG["model"]) + obs = mp.get_meta(model=DANTZIG["model"]) + assert obs == meta
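
Editor's note — usage sketch, not part of the patch: a minimal example of how
the new backend is exercised end-to-end. The platform name "local-db" and the
database path are placeholders; every call below mirrors API already used in
this diff (ixmp.config.add_platform()/remove_platform(), Platform.add_unit(),
Platform.units(), Platform.close_db()), so this is illustrative only.

    import ixmp

    # Register a platform using the new "dbapi" backend class;
    # DatabaseBackend.handle_config() accepts a single argument, the path to
    # the SQLite database file.
    ixmp.config.add_platform("local-db", "dbapi", "/tmp/ixmp-demo.db")

    # Instantiating Platform opens the database and, on first use,
    # initializes the schema via init_schema()
    mp = ixmp.Platform(name="local-db")

    # Store and retrieve a unit, as in TestDatabaseBackend.test_unit above
    mp.add_unit("kg")
    assert "kg" in mp.units()

    # Close the database and remove the registration
    mp.close_db()
    ixmp.config.remove_platform("local-db")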