diff --git a/message_ix_models/model/transport/operator.py b/message_ix_models/model/transport/operator.py index aad1b7cb15..0be3b4f30c 100644 --- a/message_ix_models/model/transport/operator.py +++ b/message_ix_models/model/transport/operator.py @@ -28,10 +28,10 @@ from genno.operator import apply_units, rename_dims from genno.testing import assert_qty_allclose, assert_units from scipy import integrate -from sdmx.model.v21 import Code +from sdmx.model.common import Code, Codelist from message_ix_models import ScenarioInfo -from message_ix_models.model.structure import get_codelist, get_codes +from message_ix_models.model.structure import get_codelist from message_ix_models.project.navigate import T35_POLICY from message_ix_models.report.operator import compound_growth from message_ix_models.report.util import as_quantity @@ -59,6 +59,7 @@ "base_model_data_header", "base_shares", "broadcast_advance", + "broadcast_t_c_l", "broadcast_y_yv_ya", "cost", "distance_ldv", @@ -74,7 +75,6 @@ "iea_eei_fv", "indexers_n_cd", "indexers_usage", - "input_commodity_level", "logit", "max", "maybe_select", @@ -215,6 +215,53 @@ def broadcast_n(qty: "AnyQuantity", n: List[str], *, dim: str = "n") -> "AnyQuan return qty +def broadcast_t_c_l( + technologies: List[Code], + commodities: List[Code], + kind: Literal["input", "output"], + default_level: Optional[str] = None, +) -> "AnyQuantity": + """Return a Quantity for broadcasting dimension (t) to (c, l) for `kind`.""" + + # Convert a list of Codes into an SDMX Codelist for simpler usage + cl_commodity = Codelist(items={c.id: c for c in commodities}) + + # Map each `tech` to a `commodity` and `level` + data = [] + for tech in technologies: + # Retrieve the "input" or "output" annotation for this technology + input_ = tech.eval_annotation(kind) + if input_ is None: + continue # No I/O commodity for this technology → skip + + # Retrieve the "commodity" key: either the ID of one commodity, or a sequence + commodity = input_.get("commodity", ()) + + # Iterate over 0 or more commodity IDs + for c_id in (commodity,) if isinstance(commodity, str) else commodity: + try: + # Retrieve the Code object for this commodity + c = cl_commodity[c_id] + except KeyError: + continue # Unknown commodity + + # Level, in order of precedence: + # 1. Technology-specific input level from `t_code`. + # 2. Default level for the commodity from `c_code`. + # 3. `default_level` argument to this function. + level = input_.get("level", None) + try: + level = level or str(c.get_annotation(id="level").text) + except KeyError: + level or default_level + + data.append((tech.id, c, level)) + + idx = pd.MultiIndex.from_frame(pd.DataFrame(data, columns=["t", "c", "l"])) + s = pd.Series(1.0, index=idx) + return genno.Quantity(s) + + def broadcast_y_yv_ya( y: List[int], y_include: List[int], *, method: Literal["product", "zip"] = "product" ) -> "AnyQuantity": @@ -624,46 +671,6 @@ def groups_y_annual(duration_period: "AnyQuantity") -> "AnyQuantity": return dict(y=result) -def input_commodity_level(t: List[Code], default_level=None) -> "AnyQuantity": - """Return a Quantity for broadcasting dimension (t) to (c, l) for ``input``. - - .. todo:: This essentially replaces :func:`.transport.util.input_commodity_level`, - and is much faster. Replace usage of the other function with this one, then - remove the other. - """ - - c_info = get_codes("commodity") - - # Map each `tech` to a `commodity` and `level` - data = [] - for tech in t: - # Retrieve the "input" annotation for this technology - input_ = tech.eval_annotation("input") - - # Retrieve the code for this commodity - try: - # Commodity ID - commodity = input_["commodity"] - c_code = c_info[c_info.index(commodity)] - except (KeyError, ValueError, TypeError): - # TypeError: input_ is None - # KeyError: "commodity" not in the annotation - # ValueError: `commodity` not in c_info - continue - - # Level, in order of precedence: - # 1. Technology-specific input level from `t_code`. - # 2. Default level for the commodity from `c_code`. - # 3. `default_level` argument to this function. - level = input_.get("level") or c_code.eval_annotation("level") or default_level - - data.append((tech.id, commodity, level)) - - idx = pd.MultiIndex.from_frame(pd.DataFrame(data, columns=["t", "c", "l"])) - s = pd.Series(1.0, index=idx) - return genno.Quantity(s) - - def logit( x: "AnyQuantity", k: "AnyQuantity", lamda: "AnyQuantity", y: List[int], dim: str ) -> "AnyQuantity":