diff --git a/premise/electricity.py b/premise/electricity.py index d05227b7..f0f4d434 100644 --- a/premise/electricity.py +++ b/premise/electricity.py @@ -50,13 +50,14 @@ def load_electricity_variables() -> dict: return techs + def get_losses_per_country(database: list) -> Dict[str, Dict[str, float]]: losses = defaultdict(dict) for ds in database: if ds["name"] in [ "market for electricity, high voltage", "market for electricity, medium voltage", - "market for electricity, low voltage" + "market for electricity, low voltage", ]: country = ds["location"] if "high voltage" in ds["name"]: @@ -67,35 +68,63 @@ def get_losses_per_country(database: list) -> Dict[str, Dict[str, float]]: market_type = "low" for e in ds["exchanges"]: - if e["name"].startswith("market for electricity") and e["type"] == "technosphere": - losses[country].update({f"Transformation loss {market_type} voltage": e["amount"]}) + if ( + e["name"].startswith("market for electricity") + and e["type"] == "technosphere" + ): + losses[country].update( + {f"Transformation loss {market_type} voltage": e["amount"]} + ) for e in ds["exchanges"]: if e["type"] == "production": if "production volume" in e: if "Production volume" not in losses[country]: - losses[country].update({"Production volume": e["production volume"]}) + losses[country].update( + {"Production volume": e["production volume"]} + ) else: - if e["production volume"] > losses[country]["Production volume"]: - losses[country].update({"Production volume": e["production volume"]}) - - if ds["name"] == "electricity voltage transformation from medium to low voltage": + if ( + e["production volume"] + > losses[country]["Production volume"] + ): + losses[country].update( + {"Production volume": e["production volume"]} + ) + + if ( + ds["name"] + == "electricity voltage transformation from medium to low voltage" + ): country = ds["location"] for e in ds["exchanges"]: - if e["name"].startswith("market for electricity") and e["type"] == "technosphere": - losses[country].update({f"Transmission loss to low voltage": e["amount"] - 1}) + if ( + e["name"].startswith("market for electricity") + and e["type"] == "technosphere" + ): + losses[country].update( + {f"Transmission loss to low voltage": e["amount"] - 1} + ) - if ds["name"] == "electricity voltage transformation from high to medium voltage": + if ( + ds["name"] + == "electricity voltage transformation from high to medium voltage" + ): country = ds["location"] for e in ds["exchanges"]: - if e["name"].startswith("market for electricity") and e["type"] == "technosphere": - losses[country].update({f"Transmission loss to medium voltage": e["amount"] - 1}) + if ( + e["name"].startswith("market for electricity") + and e["type"] == "technosphere" + ): + losses[country].update( + {f"Transmission loss to medium voltage": e["amount"] - 1} + ) return losses def get_production_weighted_losses( - losses: Dict[str, Dict[str, float]], locs: List[str] + losses: Dict[str, Dict[str, float]], locs: List[str] ) -> Dict[str, Dict[str, float]]: """ Return the transformation, transmission and distribution losses at a given voltage level for a given location. @@ -112,8 +141,8 @@ def get_production_weighted_losses( ) transf_loss += ( - dict_loss["Transformation loss high voltage"] - * dict_loss["Production volume"] + dict_loss["Transformation loss high voltage"] + * dict_loss["Production volume"] ) cumul_prod += dict_loss["Production volume"] @@ -141,12 +170,12 @@ def get_production_weighted_losses( }, ) transf_loss += ( - dict_loss["Transformation loss medium voltage"] - * dict_loss["Production volume"] + dict_loss["Transformation loss medium voltage"] + * dict_loss["Production volume"] ) distr_loss += ( - dict_loss["Transmission loss to medium voltage"] - * dict_loss["Production volume"] + dict_loss["Transmission loss to medium voltage"] + * dict_loss["Production volume"] ) cumul_prod += dict_loss["Production volume"] transf_loss /= cumul_prod @@ -166,12 +195,12 @@ def get_production_weighted_losses( }, ) transf_loss += ( - dict_loss["Transformation loss low voltage"] - * dict_loss["Production volume"] + dict_loss["Transformation loss low voltage"] + * dict_loss["Production volume"] ) distr_loss += ( - dict_loss["Transmission loss to low voltage"] - * dict_loss["Production volume"] + dict_loss["Transmission loss to low voltage"] + * dict_loss["Production volume"] ) cumul_prod += dict_loss["Production volume"] transf_loss /= cumul_prod @@ -183,10 +212,10 @@ def get_production_weighted_losses( def _update_electricity( - scenario, - version, - system_model, - use_absolute_efficiency, + scenario, + version, + system_model, + use_absolute_efficiency, ): electricity = Electricity( database=scenario["database"], @@ -260,17 +289,17 @@ class Electricity(BaseTransformation): """ def __init__( - self, - database: List[dict], - iam_data: IAMDataCollection, - model: str, - pathway: str, - year: int, - version: str, - system_model: str, - use_absolute_efficiency: bool = False, - cache: dict = None, - index: dict = None, + self, + database: List[dict], + iam_data: IAMDataCollection, + model: str, + pathway: str, + year: int, + version: str, + system_model: str, + use_absolute_efficiency: bool = False, + cache: dict = None, + index: dict = None, ) -> None: super().__init__( database, @@ -321,8 +350,8 @@ def get_production_per_tech_dict(self) -> Dict[Tuple[str, str], float]: for dataset_names in self.powerplant_map.values(): for name in dataset_names: for dataset in ws.get_many( - self.database, - ws.equals("name", name), + self.database, + ws.equals("name", name), ): for exc in ws.production(dataset): # even if non-existent, we set a minimum value of 1e-9 @@ -342,7 +371,7 @@ def check_for_production_volume(self, suppliers: List[dict]) -> List[dict]: ] def get_production_weighted_share( - self, supplier: dict, suppliers: List[dict] + self, supplier: dict, suppliers: List[dict] ) -> float: """ Return the share of production of an electricity-producing dataset in a specific location, @@ -404,7 +433,7 @@ def create_new_markets_low_voltage(self) -> None: "unit": "kilowatt hour", "database": self.database[1]["database"], "comment": f"Dataset created by `premise` from the IAM model {self.model.upper()}" - f" using the pathway {self.scenario} for the year {self.year}.", + f" using the pathway {self.scenario} for the year {self.year}.", "exchanges": [], } @@ -468,12 +497,12 @@ def generate_regional_markets(region: str) -> dict: electricity_mix = dict( zip( self.iam_data.electricity_markets.variables.values, - self.iam_data.electricity_markets.sel( - region=region - ).interp( + self.iam_data.electricity_markets.sel(region=region) + .interp( year=self.year, kwargs={"fill_value": "extrapolate"}, - ).values, + ) + .values, ) ) @@ -668,7 +697,7 @@ def create_new_markets_medium_voltage(self) -> None: "unit": "kilowatt hour", "database": self.database[1]["database"], "comment": f"Dataset created by `premise` from the IAM model {self.model.upper()}" - f" using the pathway {self.scenario} for the year {self.year}.", + f" using the pathway {self.scenario} for the year {self.year}.", "exchanges": [], } @@ -860,7 +889,7 @@ def create_new_markets_high_voltage(self) -> None: "unit": "kilowatt hour", "database": self.database[1]["database"], "comment": f"Dataset created by `premise` from the IAM model {self.model.upper()}" - f" using the pathway {self.scenario} for the year {self.year}.", + f" using the pathway {self.scenario} for the year {self.year}.", "exchanges": [], } @@ -938,12 +967,12 @@ def generate_regional_markets(region: str) -> dict: electricity_mix = dict( zip( self.iam_data.electricity_markets.variables.values, - self.iam_data.electricity_markets.sel( - region=region - ).interp( + self.iam_data.electricity_markets.sel(region=region) + .interp( year=self.year, kwargs={"fill_value": "extrapolate"}, - ).values, + ) + .values, ) ) @@ -1061,7 +1090,9 @@ def generate_regional_markets(region: str) -> dict: self.write_log(new_world_dataset) def generate_world_market( - self, dataset: dict, regions: List[str], + self, + dataset: dict, + regions: List[str], ) -> dict: """ Generate the world market for a given dataset and product variables. @@ -1120,18 +1151,18 @@ def generate_world_market( share = ( ( - self.iam_data.production_volumes.sel( - region=r, - variables=self.iam_data.electricity_markets.variables.values, - ).sum(dim="variables") - / self.iam_data.production_volumes.sel( - region=[ - x - for x in self.iam_data.production_volumes.region.values - if x != "World" - ], - variables=self.iam_data.electricity_markets.variables.values, - ).sum(dim=["variables", "region"]) + self.iam_data.production_volumes.sel( + region=r, + variables=self.iam_data.electricity_markets.variables.values, + ).sum(dim="variables") + / self.iam_data.production_volumes.sel( + region=[ + x + for x in self.iam_data.production_volumes.region.values + if x != "World" + ], + variables=self.iam_data.electricity_markets.variables.values, + ).sum(dim=["variables", "region"]) ) .interp( year=self.year, @@ -1194,11 +1225,11 @@ def update_efficiency_of_solar_pv(self) -> None: power *= 1000 for exc in ws.technosphere( - dataset, - *[ - ws.contains("name", "photovoltaic"), - ws.equals("unit", "square meter"), - ], + dataset, + *[ + ws.contains("name", "photovoltaic"), + ws.equals("unit", "square meter"), + ], ): surface = float(exc["amount"]) max_power = surface # in kW, since we assume a constant 1,000W/m^2 @@ -1265,9 +1296,9 @@ def update_ng_production_ds(self) -> None: to_remove = [] for exc in dataset["exchanges"]: if ( - exc["name"] == "market for natural gas, high pressure" - and exc["location"] in countries - and exc["type"] == "technosphere" + exc["name"] == "market for natural gas, high pressure" + and exc["location"] in countries + and exc["type"] == "technosphere" ): if exc["location"] in amount: amount[exc["location"]] += exc["amount"] @@ -1282,7 +1313,7 @@ def update_ng_production_ds(self) -> None: e for e in dataset["exchanges"] if (e["name"], e.get("product"), e.get("location"), e["type"]) - not in to_remove + not in to_remove ] for loc, val in amount.items(): @@ -1306,9 +1337,9 @@ def update_ng_production_ds(self) -> None: to_remove = [] for exc in dataset["exchanges"]: if ( - any((i in exc["name"] for i in names)) - and exc["location"] in countries - and exc["type"] == "technosphere" + any((i in exc["name"] for i in names)) + and exc["location"] in countries + and exc["type"] == "technosphere" ): if exc["location"] in amount: amount[exc["location"]] += exc["amount"] @@ -1323,7 +1354,7 @@ def update_ng_production_ds(self) -> None: e for e in dataset["exchanges"] if (e["name"], e.get("product"), e.get("location"), e["type"]) - not in to_remove + not in to_remove ] for loc, val in amount.items(): @@ -1400,10 +1431,10 @@ def create_region_specific_power_plants(self): ) for dataset in ws.get_many( - self.database, - ws.either( - *[ws.contains("name", name) for name in list_datasets_to_duplicate] - ), + self.database, + ws.either( + *[ws.contains("name", name) for name in list_datasets_to_duplicate] + ), ): new_plants = self.fetch_proxies( name=dataset["name"], @@ -1447,16 +1478,16 @@ def create_region_specific_power_plants(self): for exc in plant["exchanges"]: if ( - exc["type"] == "technosphere" - and exc["unit"] == "kilogram" - and exc["name"].startswith("carbon dioxide, captured") + exc["type"] == "technosphere" + and exc["unit"] == "kilogram" + and exc["name"].startswith("carbon dioxide, captured") ): exc["amount"] = co2_amount * 0.9 if ( - exc["type"] == "biosphere" - and exc["unit"] == "kilogram" - and exc["name"].startswith("Carbon dioxide, fossil") + exc["type"] == "biosphere" + and exc["unit"] == "kilogram" + and exc["name"].startswith("Carbon dioxide, fossil") ): exc["amount"] = co2_amount * 0.9 * -1 @@ -1502,14 +1533,14 @@ def update_electricity_efficiency(self) -> None: # print("Rescale inventories and emissions for", technology) for dataset in ws.get_many( - self.database, - ws.equals("unit", "kilowatt hour"), - ws.either( - *[ - ws.equals("name", n) - for n in dict_technology["technology filters"] - ] - ), + self.database, + ws.equals("unit", "kilowatt hour"), + ws.either( + *[ + ws.equals("name", n) + for n in dict_technology["technology filters"] + ] + ), ): if not self.is_in_index(dataset): continue @@ -1525,10 +1556,10 @@ def update_electricity_efficiency(self) -> None: dataset["location"] ) if ( - iam_location - in self.iam_data.electricity_efficiencies.coords[ - "region" - ].values + iam_location + in self.iam_data.electricity_efficiencies.coords[ + "region" + ].values ): # Find relative efficiency change indicated by the IAM scaling_factor = 1 / self.find_iam_efficiency_change( @@ -1562,9 +1593,9 @@ def update_electricity_efficiency(self) -> None: # if ei_eff is different from 1 and if the new efficiency # is not NaN or zero, we can rescale the exchanges if ( - ei_eff != 1 - and new_efficiency != 0 - and not np.isnan(new_efficiency) + ei_eff != 1 + and new_efficiency != 0 + and not np.isnan(new_efficiency) ): scaling_factor = ei_eff / new_efficiency else: @@ -1572,8 +1603,8 @@ def update_electricity_efficiency(self) -> None: # ensure that the dataset has not already been adjusted if ( - "new efficiency" not in dataset.get("log parameters", {}) - and scaling_factor != 1 + "new efficiency" not in dataset.get("log parameters", {}) + and scaling_factor != 1 ): if "log parameters" not in dataset: dataset["log parameters"] = {} @@ -1675,7 +1706,7 @@ def adjust_coal_power_plant_emissions(self) -> None: 0 ), "efficiency change": ei_eff - / new_eff.values.item(0), + / new_eff.values.item(0), } ) @@ -1696,34 +1727,34 @@ def adjust_coal_power_plant_emissions(self) -> None: CHP="co-generation" in dataset["name"], variable=species, ) / ( - self.iam_data.coal_power_plants.sel( - country=loc, - fuel=( - "Anthracite coal" - if "hard coal" in dataset["name"] - else "Lignite coal" - ), - CHP="co-generation" in dataset["name"], - variable="generation", - ) - * 1e3 - ) + self.iam_data.coal_power_plants.sel( + country=loc, + fuel=( + "Anthracite coal" + if "hard coal" in dataset["name"] + else "Lignite coal" + ), + CHP="co-generation" in dataset["name"], + variable="generation", + ) + * 1e3 + ) if not np.isnan(emission_factor.values.item(0)): for exc in ws.biosphere(dataset): if ( - exc["name"] == flow - and exc.get( - "categories", - [ - None, - ], - )[0] - == "air" + exc["name"] == flow + and exc.get( + "categories", + [ + None, + ], + )[0] + == "air" ): scaling_factor = ( - emission_factor.values.item(0) - / exc["amount"] + emission_factor.values.item(0) + / exc["amount"] ) exc["amount"] = float( emission_factor.values.item(0) @@ -1854,10 +1885,10 @@ def update_electricity_markets(self) -> None: # print("Empty old electricity datasets") for dataset in ws.get_many( - self.database, - ws.either(*[ws.contains("name", n) for n in list_to_empty]), - ws.equals("unit", "kilowatt hour"), - ws.doesnt_contain_any("name", list_to_preserve), + self.database, + ws.either(*[ws.contains("name", n) for n in list_to_empty]), + ws.equals("unit", "kilowatt hour"), + ws.doesnt_contain_any("name", list_to_preserve), ): # add tag dataset["has_downstream_consumer"] = False diff --git a/premise/external.py b/premise/external.py index d6675cc6..8c488dd3 100644 --- a/premise/external.py +++ b/premise/external.py @@ -12,8 +12,8 @@ import wurst import xarray as xr import yaml -from wurst import searching as ws from datapackage import Package +from wurst import searching as ws from .clean_datasets import get_biosphere_flow_uuid from .data_collection import IAMDataCollection @@ -26,7 +26,7 @@ get_correspondence_bio_flows, ) from .transformation import BaseTransformation, get_shares_from_production_volume -from .utils import rescale_exchanges, HiddenPrints +from .utils import HiddenPrints, rescale_exchanges LOG_CONFIG = DATA_DIR / "utils" / "logging" / "logconfig.yaml" @@ -51,21 +51,22 @@ def _update_external_scenarios( datapackages: list, ) -> dict: datapackages = [ - Package(f"{dp}/datapackage.json") if isinstance(dp, str) else dp for dp in datapackages + Package(f"{dp}/datapackage.json") if isinstance(dp, str) else dp + for dp in datapackages ] for d, data_package in enumerate(datapackages): inventories = [] with HiddenPrints(): if "inventories" in [r.name for r in data_package.resources]: if data_package.get_resource("inventories"): - additional = AdditionalInventory( - database=scenario["database"], - version_in=data_package.descriptor["ecoinvent"]["version"], - version_out=version, - path=data_package.get_resource("inventories").source, - system_model=system_model, - ) - inventories.extend(additional.merge_inventory()) + additional = AdditionalInventory( + database=scenario["database"], + version_in=data_package.descriptor["ecoinvent"]["version"], + version_out=version, + path=data_package.get_resource("inventories").source, + system_model=system_model, + ) + inventories.extend(additional.merge_inventory()) resource = data_package.get_resource("config") config_file = yaml.safe_load(resource.raw_read()) diff --git a/premise/inventory_imports.py b/premise/inventory_imports.py index 3ffb7831..c2dc37d8 100644 --- a/premise/inventory_imports.py +++ b/premise/inventory_imports.py @@ -61,9 +61,7 @@ def get_biosphere_code(version) -> dict: file, delimiter=get_delimiter(filepath=fp), ) - return { - (row[0], row[1], row[2], row[3]): row[4] for row in input_dict - } + return {(row[0], row[1], row[2], row[3]): row[4] for row in input_dict} def get_consequential_blacklist(): diff --git a/premise/new_database.py b/premise/new_database.py index ffe26e76..5385b06c 100644 --- a/premise/new_database.py +++ b/premise/new_database.py @@ -47,6 +47,7 @@ from .steel import _update_steel from .transport import _update_vehicles from .utils import ( + HiddenPrints, clear_existing_cache, create_scenario_list, eidb_label, @@ -55,7 +56,6 @@ load_constants, print_version, warning_about_biogenic_co2, - HiddenPrints ) logger = logging.getLogger("module") @@ -488,8 +488,8 @@ def __init__( # and system_model is "consequential" # raise an error if ( - self.version not in ["3.8", "3.9", "3.9.1"] - and self.system_model == "consequential" + self.version not in ["3.8", "3.9", "3.9.1"] + and self.system_model == "consequential" ): raise ValueError( "Consequential system model is only available for ecoinvent 3.8 or 3.9." @@ -844,7 +844,15 @@ def update(self, sectors: [str, list, None] = None) -> None: }, "external": { "func": _update_external_scenarios, - "args": (self.version, self.system_model, [x.base_path for x in self.datapackages] if self.datapackages else None), + "args": ( + self.version, + self.system_model, + ( + [x.base_path for x in self.datapackages] + if self.datapackages + else None + ), + ), }, } @@ -909,9 +917,7 @@ def update(self, sectors: [str, list, None] = None) -> None: else: # Process scenarios in sequence for the current sector for s, scenario in enumerate(self.scenarios): - self.scenarios[s] = update_func( - scenario, *fixed_args - ) + self.scenarios[s] = update_func(scenario, *fixed_args) # Manually update the outer progress bar after each sector is completed pbar_outer.update(1) diff --git a/premise/transport.py b/premise/transport.py index d3006976..771be612 100644 --- a/premise/transport.py +++ b/premise/transport.py @@ -17,7 +17,7 @@ from .filesystem_constants import DATA_DIR, IAM_OUTPUT_DIR, INVENTORY_DIR from .inventory_imports import VariousVehicles from .transformation import BaseTransformation, IAMDataCollection -from .utils import eidb_label, HiddenPrints +from .utils import HiddenPrints, eidb_label FILEPATH_FLEET_COMP = IAM_OUTPUT_DIR / "fleet_files" / "fleet_all_vehicles.csv" FILEPATH_IMAGE_TRUCKS_FLEET_COMP = ( @@ -157,18 +157,18 @@ def create_fleet_vehicles( # fleet data does not go below 2015 if year < 2015: year = 2015 - #print( + # print( # "Vehicle fleet data is not available before 2015. " # "Hence, 2015 is used as fleet year." - #) + # ) # fleet data does not go beyond 2050 if year > 2050: year = 2050 - #print( + # print( # "Vehicle fleet data is not available beyond 2050. " # "Hence, 2050 is used as fleet year." - #) + # ) # We filter electric vehicles by year of manufacture available_years = np.arange(2000, 2055, 5)