From 217e0ec78c0253c0d6f38d5ae735b80422966970 Mon Sep 17 00:00:00 2001
From: Edward Byers
Date: Mon, 9 Oct 2023 14:55:43 +0200
Subject: [PATCH] testing functions import, black format, and add map plot
 script

---
 rime/bivariate_scores.py           | 400 ++++++++++++++++++++++++++++
 rime/generate_aggregated_inputs.py | 150 ++++++-----
 rime/process_config.py             |   5 +-
 rime/process_maps.py               |  38 ++-
 rime/process_tabledata.py          |   3 +-
 rime/rime_functions.py             | 112 +++++---
 6 files changed, 577 insertions(+), 131 deletions(-)
 create mode 100644 rime/bivariate_scores.py

diff --git a/rime/bivariate_scores.py b/rime/bivariate_scores.py
new file mode 100644
index 0000000..e1b5ad3
--- /dev/null
+++ b/rime/bivariate_scores.py
@@ -0,0 +1,400 @@
# -*- coding: utf-8 -*-
"""
Created on Tue Aug 29 09:20:24 2023

@author: werning
"""

import sys

sys.path.append("H:\\git\\climate_impacts_processing")

import seaborn as sns
import xarray as xr
import numpy as np
import hotspots_functions_tables as hft
import hotspots_functions_pp as hfp
import os
import matplotlib.pyplot as plt
import holoviews as hv
import hvplot.xarray
import hvplot.pandas
import itertools as it
from scipy.interpolate import interp1d


input_dir = "C:\\Users\\werning\\IIASA\\ECE.prog - Research Theme - NEXUS\\Hotspots_Explorer_2p0\\Data\\1_mmm_stats"
score_dir = "C:\\Users\\werning\\IIASA\\ECE.prog - Research Theme - NEXUS\\Hotspots_Explorer_2p0\\Data\\3_scores"
diff_dir = "C:\\Users\\werning\\IIASA\\ECE.prog - Research Theme - NEXUS\\Hotspots_Explorer_2p0\\Data\\2_differences"
std_dir = "C:\\Users\\werning\\IIASA\\ECE.prog - Research Theme - NEXUS\\Hotspots_Explorer_2p0\\Data\\score_revision\\hist_standard_deviations_V2\\"
plot_dir = "C:\\Users\\werning\\IIASA\\ECE.prog - Research Theme - NEXUS\\Hotspots_Explorer_2p0\\Data\\score_revision\\bivariate"
yaml_path = "H:\\git\\climate_impacts_processing\\hotspots.yml"
landmask_path = "H:\\git\\climate_impacts_processing\\landareamaskmap0.nc"
kg_class_path = "C:\\Users\\werning\\IIASA\\ECE.prog - Research Theme - NEXUS\\Hotspots_Explorer_2p0\\Data\\kg_class.nc"

# Load landarea mask, parameters and Koeppen-Geiger classes
land_mask = hfp.load_landmask(landmask_path)
params = hfp.load_parameters(yaml_path)
with xr.open_dataset(kg_class_path, engine="netcdf4") as kg_class:
    kg_class.load()
kg_class = kg_class.sel(band=0)

indicators = [
    "cdd",
    "dri",
    "dri_qtot",
    "ia_var",
    "ia_var_qtot",
    "precip",
    "sdd",
    "sdd_24p0",
    "sdd_20p0",
    "sdd_18p3",
    "seas",
    "seas_qtot",
    "tr20",
    "wsi",
]
# indicators = ['seas']
gmt_threshold = 3.0

# %% Functions
# -----------------------------------------------------------------------------

# Earlier threshold-based version, kept for reference:
# def bin_data(data, bins):

#     print(bins)

#     no_impact = xr.where(data < bins[1], bins[0], data)
#     low_impact = xr.where((no_impact < bins[2]) & (no_impact >= bins[1]), 1, no_impact)
#     medium_impact = xr.where((low_impact < bins[3]) & (low_impact >= bins[2]), 2, low_impact)
#     high_impact = xr.where(medium_impact >= bins[3], 3, medium_impact)

#     return high_impact


def bin_data(data, bins):
    """Clip data to the outer bin edges and interpolate onto 0-3 scores."""
    data = xr.where(data > bins[3], bins[3], data)
    data = xr.where(data < bins[0], bins[0], data)

    m = interp1d(bins, [0, 1, 2, 3])
    pt = m(data.values)
    data.values = pt

    return data
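
For orientation, a minimal sketch of what bin_data does, on toy values (the bin edges here are illustrative, not the study's):

import numpy as np
import xarray as xr
from scipy.interpolate import interp1d

data = xr.DataArray(np.array([[5.0, 40.0], [75.0, 200.0]]), dims=("lat", "lon"))
bins = [0.0, 30.0, 60.0, 90.0]  # four edges -> scores 0, 1, 2, 3

clipped = xr.where(data > bins[3], bins[3], data)  # cap at the top edge
clipped = xr.where(clipped < bins[0], bins[0], clipped)  # floor at the bottom edge
scores = clipped.copy(data=interp1d(bins, [0, 1, 2, 3])(clipped.values))
print(scores.values)  # [[~0.17 ~1.33] [2.5 3.0]] - piecewise-linear 0-3 scores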

# combination = [f'{ind}_{i}' for i in (variables)]

# indic_bins = {i: params[i]['score_range'] for i in variables}
# # indic_bins = {i: score_ranges[i] for i in combination}

# for var in variables:

#     # data[var] = xr.where(data[var] > max(indic_bins[f'{ind}_{var}']),
#     #                      max(indic_bins[f'{ind}_{var}']), data[var])
#     # data[var] = xr.where(data[var] < min(indic_bins[f'{ind}_{var}']),
#     #                      min(indic_bins[f'{ind}_{var}']), data[var])

#     # m = interp1d(indic_bins[f'{ind}_{var}'], score_bins)

#     data[var] = xr.where(data[var] > max(indic_bins[var]),
#                          max(indic_bins[var]), data[var])
#     data[var] = xr.where(data[var] < min(indic_bins[var]),
#                          min(indic_bins[var]), data[var])

#     m = interp1d(indic_bins[var], score_bins)

#     pt = m(data[var].values)
#     data[var].values = pt

# return data
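
The calculation section below combines two binned ingredients per grid cell: the change since the historical period expressed in historical standard deviations (a z-score, binned 0-3) and the indicator's absolute level (binned 0-3 by its historical quartiles); their sum is the 0-6 bivariate score. A compact, self-contained sketch of that recipe on synthetic data (all names and numbers illustrative):

import numpy as np
from scipy.interpolate import interp1d

rng = np.random.default_rng(0)
hist = rng.gamma(2.0, 10.0, size=1000)           # synthetic historical field
future = hist + rng.normal(5.0, 5.0, size=1000)  # synthetic future field
std = np.full_like(hist, 5.0)                    # synthetic inter-model std

def to_bins(x, edges):
    return interp1d(edges, [0, 1, 2, 3])(np.clip(x, edges[0], edges[-1]))

z_binned = to_bins((future - hist) / std, [0, 1, 2, 3])  # z-score bins
quartiles = np.quantile(hist, [0, 0.25, 0.5, 0.75])      # value bin edges
bivariate = z_binned + to_bins(future, quartiles)        # 0-6 bivariate score
print(bivariate.min(), bivariate.max())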
# %% Calculation
# -----------------------------------------------------------------------------

plot_list = []

for ind in indicators:
    # Select mean, apply land mask and remove inf
    historical = xr.open_dataset(
        os.path.join(
            input_dir,
            "Historical",
            f'ISIMIP{params["protocol"][ind]}_MM_historical_{ind}.nc4',
        )
    )
    historical = hfp.apply_land_mask(historical, land_mask)
    historical = historical.where(historical.pipe(np.isfinite)).fillna(np.nan)

    # Same for the future projections
    future = xr.open_dataset(
        os.path.join(input_dir, f'ISIMIP{params["protocol"][ind]}_MM_{ind}_0p1.nc4')
    )
    future = hfp.apply_land_mask(future, land_mask)
    future = future.where(future.pipe(np.isfinite)).fillna(np.nan)

    if ind == "seas" or ind == "seas_qtot":
        historical = historical.where((historical < 5) & (historical > -5))
        future = future.where((future < 5) & (future > -5))

    if ind == "ia_var" or ind == "ia_var_qtot":
        historical = historical.where((historical < 2) & (historical > -2))
        future = future.where((future < 2) & (future > -2))

    # Load score data for plotting
    scores = xr.open_dataset(
        os.path.join(score_dir, f'ISIMIP{params["protocol"][ind]}_Scores_{ind}_0p1.nc4')
    )

    # Load diff data for plotting
    diff = xr.open_dataset(
        os.path.join(diff_dir, f'ISIMIP{params["protocol"][ind]}_Diff_{ind}_0p1.nc4')
    )
    diff = diff.where(diff.pipe(np.isfinite)).fillna(np.nan)

    for var in params["indicators"][ind]:
        # Select historical variable and rename it
        hist_data = historical[var][1].rename("hist")
        plot_hist = hist_data.hvplot(
            x="lon",
            y="lat",
            shared_axes=False,
            cmap=params["indicators"][ind][var]["ind_cmap"],
            clim=(
                params["indicators"][ind][var]["ind_min"],
                params["indicators"][ind][var]["ind_max"],
            ),
            title="Absolute - historical",
        )

        # Select future data at the GMT threshold and rename it
        future_data = future.sel({"threshold": gmt_threshold})
        future_data = future_data[var][1, :, :].rename(
            str(gmt_threshold).replace(".", "p")
        )
        # plot_future = future_data.hvplot(x='lon', y='lat', shared_axes=False, cmap=params["indicators"][ind][var]['ind_cmap'], clim=(params["indicators"][ind][var]['ind_min'], params["indicators"][ind][var]['ind_max']), title=f'Absolute - {str(gmt_threshold).replace(".", "p")}')

        # Select difference data at the GMT threshold and rename it
        diff_data = diff.sel({"threshold": gmt_threshold})
        diff_data = diff_data[var][0].rename("diff")
        # plot_diff = diff_data.hvplot(x='lon', y='lat', shared_axes=False, cmap=params["indicators"][ind][var]['diff_cmap'], clim=(params["indicators"][ind][var]['diff_min'], params["indicators"][ind][var]['diff_max']), title=f'Difference - {str(gmt_threshold).replace(".", "p")}')

        # Select score data at the GMT threshold and rename it
        score_data = scores.sel({"threshold": gmt_threshold})
        score_data = score_data[var][0].rename("score")
        # plot_score = score_data.hvplot(x='lon', y='lat', cmap='magma_r', shared_axes=False, title=f'Score - {str(gmt_threshold).replace(".", "p")}')

        # Load standard deviation data, apply land mask, remove inf and rename
        std_dev = hft.load_netcdf(
            os.path.join(
                std_dir,
                f'{params["indicators"][ind][var]["short_name"]}_hist_std_GCM_avg.nc',
            )
        )
        std_dev = hfp.apply_land_mask(std_dev, land_mask)
        std_dev = std_dev.where(std_dev.pipe(np.isfinite)).fillna(np.nan)
        std_dev = std_dev.rename("std")

        # Calculate change in standard deviations (z-score)
        std_change = (future_data - hist_data) / std_dev
        std_change = std_change.where(std_change.pipe(np.isfinite)).fillna(np.nan)
        std_change = std_change.rename("std_change")
        # plot_z = std_change.hvplot(x='lon', y='lat', cmap='magma_r', clim=(0,3), shared_axes=False, title='Z score')

        # Joint plots
        # hist_plot = sns.jointplot(x=hist_data.to_dataframe()['hist'], y=std_change.to_dataframe()['std_change'])
        # hist_plot.ax_marg_x.set_xlim(0, 1800)
        # hist_plot.ax_marg_y.set_ylim(0, 50)
        # hist_plot.savefig(os.path.join(plot_dir, f'{params["indicators"][ind][var]["short_name"]}_joint_hist_std_change.png'))
        # future_plot = sns.jointplot(x=future_data.to_dataframe()[str(gmt_threshold).replace(".", "p")], y=std_change.to_dataframe()['std_change'])
        # future_plot.ax_marg_x.set_xlim(0, 1800)
        # future_plot.ax_marg_y.set_ylim(0, 50)
        # future_plot.savefig(os.path.join(plot_dir, f'{params["indicators"][ind][var]["short_name"]}_joint_{str(gmt_threshold).replace(".", "p")}_std_change.png'))

        joint_merged = xr.merge([hist_data, std_change]).to_dataframe()
        # plot_joint_hist = joint_merged.hvplot.scatter(x='hist', y='std_change', shared_axes=False, cmap=['blue'], title='Joint plot hist - z score')

        # Bin data
        std_change_binned = bin_data(std_change, [0, 1, 2, 3])

        # Calculate quartiles
        bins = hist_data.quantile([0, 0.25, 0.5, 0.75])

        # Quartile bins
        hist_data_binned = bin_data(hist_data, bins)
        future_data_binned = bin_data(future_data, bins)

        # Create bivariate score
        hist_bivariate = std_change_binned + hist_data_binned
        hist_bivariate = hist_bivariate.rename("std_score")
        future_bivariate = std_change_binned + future_data_binned
        future_bivariate = future_bivariate.rename("std_score")
        # plot_score_quartile = future_bivariate.hvplot(x='lon', y='lat', cmap='magma_r', shared_axes=False, clim=(0,6), title='Score - Quartiles')

        color_cycle = hv.Cycle(
            [
                "#fcfdbf",
                "#feb078",
                "#f1605d",
                "#b5367a",
                "#721f81",
                "#2c115f",
                "#000004",
            ]
        )
        explicit_mapping = {
            "0": "#fcfdbf",
            "1": "#feb078",
            "2": "#f1605d",
            "3": "#b5367a",
            "4": "#721f81",
            "5": "#2c115f",
            "6": "#000004",
        }

        # Joint plots of value vs z-score, coloured by bivariate score
        joint_hist_quartiles_merged = xr.merge(
            [hist_data, std_change, hist_bivariate]
        ).to_dataframe()
        # plot_joint_hist_quartiles = joint_hist_quartiles_merged.hvplot.scatter(x='hist', y='std_change', by='std_score', color=explicit_mapping, title='Joint plot hist - z score quartiles')
        joint_future_quartiles_merged = xr.merge(
            [future_data, std_change, future_bivariate]
        ).to_dataframe()
        # plot_joint_hist_quartiles = joint_hist_quartiles_merged.hvplot.scatter(x='hist', y='std_change', color='std_score', cmap=explicit_mapping, title='Joint plot hist - z score quartiles')
        # plot_joint_future_quartiles = joint_future_quartiles_merged.hvplot.scatter(x=f'{str(gmt_threshold).replace(".", "p")}', y='std_change', color='std_score', cmap=explicit_mapping, title=f'Joint plot {str(gmt_threshold).replace(".", "p")} - z score quartiles')
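
For reference, the per-Köppen-Geiger-class treatment in the loop above bins each climate zone separately and stitches the zones back together with xr.where. A toy sketch of that stitching pattern (synthetic 1-D data, two classes, per-zone min-max scaling standing in for the quartile binning):

import numpy as np
import xarray as xr

values = xr.DataArray(np.array([1.0, 2.0, 3.0, 10.0, 20.0, 30.0]), dims="cell")
kg = xr.DataArray(np.array([1, 1, 1, 2, 2, 2]), dims="cell")  # class labels

out = xr.full_like(values, np.nan)
for k in [1, 2]:
    zone = values.where(kg == k)
    zone_scaled = (zone - zone.min()) / (zone.max() - zone.min())  # per-zone scaling
    out = xr.where(kg == k, zone_scaled, out)  # stitch zone back into the map
print(out.values)  # each zone scaled on its own range: [0. 0.5 1. 0. 0.5 1.]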
        # plot_joint_hist_quartiles = joint_hist_quartiles_merged.hvplot.scatter(x='hist', y='std_change', color='std_score', cmap='magma_r', title='Joint plot hist - z score quartiles').redim.range(std_score=(0, 6))
        # plot_joint_future_quartiles = joint_future_quartiles_merged.hvplot.points(x=f'{str(gmt_threshold).replace(".", "p")}', y='std_change', color='std_score', cmap='magma_r', title=f'Joint plot {str(gmt_threshold).replace(".", "p")} - z score quartiles').redim.range(std_score=(0, 6))

        # # Joint plot - coloured with score
        # joint_hist = xr.merge([hist_data, std_change, hist_bivariate]).to_dataframe()
        # sns.jointplot(data=joint_hist, x='hist', y='std_change', c=joint_hist.std_score.dropna(), joint_kws={"color": None, 'cmap': 'magma_r'})
        # joint_future = xr.merge([future_data, std_change, future_bivariate]).to_dataframe()
        # sns.jointplot(data=joint_future, x='2p0', y='std_change', c=joint_future.std_score.dropna(), joint_kws={"color": None, 'cmap': 'magma_r'})

        kg_hist_all = xr.DataArray(
            data=np.full([len(hist_data.lat), len(hist_data.lon)], np.nan),
            coords={"lat": hist_data.lat, "lon": hist_data.lon},
        )
        kg_future_all = xr.DataArray(
            data=np.full([len(hist_data.lat), len(hist_data.lon)], np.nan),
            coords={"lat": hist_data.lat, "lon": hist_data.lon},
        )

        for k in range(1, 6):
            kg_hist_data = hist_data.where(kg_class == k).kg_class
            kg_future_data = future_data.where(kg_class == k).kg_class
            kg_std_change_binned = std_change_binned.where(kg_class == k).kg_class

            if var == "wsi":
                kg_bins = [0.1, 0.2, 0.3, 0.4]
            else:
                kg_bins = kg_hist_data.quantile([0, 0.25, 0.5, 0.75]).values
            print(f"{k} - {kg_bins}")
            kg_hist_data_binned = bin_data(kg_hist_data, kg_bins)
            kg_future_data_binned = bin_data(kg_future_data, kg_bins)

            kg_hist_bivariate = kg_std_change_binned + kg_hist_data_binned
            kg_future_bivariate = kg_std_change_binned + kg_future_data_binned

            kg_hist_all = xr.where(kg_class == k, kg_hist_bivariate, kg_hist_all)
            kg_future_all = xr.where(kg_class == k, kg_future_bivariate, kg_future_all)

            # fig = plt.figure()
            # kg_hist_bivariate.plot(cmap='magma_r', vmin=0, vmax=6)
            # plt.savefig(os.path.join(plot_dir, f'{params["indicators"][ind][var]["short_name"]}_kg_class_{k}_score_hist.png'))
            # plt.close()

            # fig = plt.figure()
            # kg_future_bivariate.plot(cmap='magma_r', vmin=0, vmax=6)
            # plt.savefig(os.path.join(plot_dir, f'{params["indicators"][ind][var]["short_name"]}_kg_class_{k}_score_{str(gmt_threshold).replace(".", "p")}.png'))
            # plt.close()

        # plot_kg_score_quartile = kg_future_all.hvplot(x='lon', y='lat', cmap='magma_r', shared_axes=False, clim=(0,6), title='Score - Quartiles - KG')
        plot_kg_score_quartile = kg_future_all.hvplot(
            x="lon",
            y="lat",
            cmap="magma_r",
            shared_axes=False,
            clim=(0, 6),
            title=f'{params["indicators"][ind][var]["long_name"]}',
        )

        # Create kg bivariate score
        kg_hist_all = kg_hist_all.kg_class.rename("std_score")
        kg_future_all = kg_future_all.kg_class.rename("std_score")

        # Joint plots of value vs z-score, coloured by KG-quartile score
        joint_hist_kg_quartiles_merged = xr.merge(
            [hist_data, std_change, kg_hist_all]
        ).to_dataframe()
        joint_future_kg_quartiles_merged = xr.merge(
            [future_data, std_change, kg_future_all]
        ).to_dataframe()
        # plot_joint_hist_kg_quartiles = joint_hist_kg_quartiles_merged.hvplot.scatter(x='hist', y='std_change', by='std_score', color=color_cycle, title='Joint plot hist - z score quartiles KG')
        # plot_joint_future_kg_quartiles = joint_future_kg_quartiles_merged.hvplot.scatter(x=f'{str(gmt_threshold).replace(".", "p")}', y='std_change', by='std_score', color=color_cycle, title=f'Joint plot {str(gmt_threshold).replace(".", "p")} - z score quartiles KG')
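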
        # plot_joint_hist_kg_quartiles = joint_hist_kg_quartiles_merged.hvplot.scatter(x='hist', y='std_change', color='std_score', cmap='magma_r', title='Joint plot hist - z score quartiles KG').redim.range(std_score=(0, 6))
        # plot_joint_future_kg_quartiles = joint_future_kg_quartiles_merged.hvplot.scatter(x=f'{str(gmt_threshold).replace(".", "p")}', y='std_change', color='std_score', cmap='magma_r', title=f'Joint plot {str(gmt_threshold).replace(".", "p")} - z score quartiles KG').redim.range(std_score=(0, 6))

        # print('saving plot')
        # plot_list = [plot_future, plot_diff, plot_score,
        #              plot_joint_hist, plot_hist, plot_z,
        #              plot_joint_hist_quartiles, plot_joint_future_quartiles, plot_score_quartile,
        #              plot_joint_hist_kg_quartiles, plot_joint_future_kg_quartiles, plot_kg_score_quartile]

        # plot_list = [plot_future, plot_diff, plot_score,
        #              plot_joint_hist, plot_hist, plot_z,
        #              plot_joint_hist_quartiles, plot_joint_future_quartiles, plot_score_quartile]

        plot_list = plot_list + [plot_kg_score_quartile]

        # plot = hv.Layout(plot_list).cols(3)
        # hvplot.save(plot, f'{params["indicators"][ind][var]["short_name"]}_bivariate_dashboard_{str(gmt_threshold).replace(".", "p")}_interp.html')

    plot = (
        hv.Layout(plot_list)
        .cols(3)
        .opts(title=f'{str(gmt_threshold).replace(".", "p")}')
    )
    hvplot.save(
        plot,
        f'All_indicators_bivariate_dashboard_{str(gmt_threshold).replace(".", "p")}_interp.html',
    )

    # fig = plt.figure()
    # kg_hist_all.kg_class.plot(cmap='magma_r', vmin=0, vmax=6)
    # plt.savefig(os.path.join(plot_dir, f'{params["indicators"][ind][var]["short_name"]}_kg_quartiles_score_hist.png'))
    # plt.close()

    # fig = plt.figure()
    # kg_future_all.kg_class.plot(cmap='magma_r', vmin=0, vmax=6)
    # plt.savefig(os.path.join(plot_dir, f'{params["indicators"][ind][var]["short_name"]}_kg_quartiles_score_{str(gmt_threshold).replace(".", "p")}.png'))
    # plt.close()

    # Manual bins
    # hist_data_binned = bin_data(hist_data, [10, 50, 100])
    # future_data_binned = bin_data(future_data, [10, 50, 100])

    # fig = plt.figure()
    # hist_bivariate.plot(cmap='magma_r', vmin=0, vmax=6)
    # plt.savefig(os.path.join(plot_dir, f'{params["indicators"][ind][var]["short_name"]}_quartiles_score_hist.png'))
    # plt.close()

    # fig = plt.figure()
    # future_bivariate.plot(cmap='magma_r', vmin=0, vmax=6)
    # plt.savefig(os.path.join(plot_dir, f'{params["indicators"][ind][var]["short_name"]}_quartiles_score_{str(gmt_threshold).replace(".", "p")}.png'))
    # plt.close()

# temp4 = bin_data(t20_std_change, [1, 2, 3])
# temp4_abs = bin_data(t20_hist, [10, 50, 100])
# temp4_2p0 = bin_data(t20_2p0, [10, 50, 100])

# temp4 = bin_data(sdd_std_change, [1, 2, 3])
# temp4_abs = bin_data(sdd_hist, [250, 750, 1250])
# temp4_2p0 = bin_data(sdd_2p0, [250, 750, 1250])

# test = bin_data(pr20_std_change, [1, 2, 3])
# temp_abs = bin_data(pr20_hist, [5, 10, 20])
# temp4_2p0 = bin_data(pr20_2p0, [5, 10, 20])
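
The commented-out block at the top of the next file sketches an optional progress-bar dependency. A working form of that guard, assuming the same alive_progress package, would be:

try:
    from alive_progress import alive_bar
    ab_present = True
except ImportError:
    print("alive_progress not installed")
    ab_present = False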

diff --git a/rime/generate_aggregated_inputs.py b/rime/generate_aggregated_inputs.py
index 3035499..4e3c2a4 100644
--- a/rime/generate_aggregated_inputs.py
+++ b/rime/generate_aggregated_inputs.py
@@ -4,35 +4,38 @@
 @author: byers
 """
-
+# from alive_progress import alive_bar
 import glob
 
 # import numpy as np
 import os
-import pandas as pd
 import pyam
 import re
 import xarray as xr
 import time
-from alive_progress import alive_bar
-
-start = time.time()
+# try:
 
-from process_config import *
+#     ab_present = True
+# except:
+#     print("alive_progress not installed")
+#     ab_present = False
 
-# Load impact, interpolate
+# start = time.time()
+from rime.rime_functions import *
+from rime.process_config import *
 
-# few_vars = True
+few_vars = False
 
 # %% Settings config
-
 years = list(range(yr_start, yr_end + 1))
 
 # %% List of indicators and files
+
+# Define a list of table data files to read in
 os.chdir(wdtable_input)
 files = glob.glob(table_output_format.replace("|", "*"))
@@ -49,10 +52,8 @@
 # files = files[4:5]  # heatwav & iavar
 # files = files[5:10]
 # files = files[10:12]
-files = files[12:15]
-# files = files[15:]
-
-# batch = 1
+# files = files[12:15]
+files = files[15:]
 
 indicators = []
@@ -111,72 +112,67 @@
 # df.drop(columns=['model', 'scenario'], inplace=True)
 
 # if few_vars:
-
 # df = df.loc[df.variable.str.contains('High')]
 
 small_vars = list(set([x.split("|")[0] for x in dfin.variable]))
-
-    with alive_bar(
-        total=len(small_vars),
-        title="Processing",
-        length=10,
-        force_tty=True,
-        bar="circles",
-        spinner="elements",
-    ) as bar:
-        for vari in small_vars:
-            df_ind = loop_interpolate_gmt(df.loc[df.variable.str.startswith(vari)])
-            # dfbig = pd.concat([dfbig, df_ind])
-            print(f"dfbig: indicator {ind[0]}: {time.time()-istart}")
-
-            # % Convert and save out to xarray
-            # dfbig.dropna(how='all')
-
-            dfp = df_ind.melt(
-                id_vars=[
-                    "model",
-                    "scenario",
-                    "variable",
-                    "region",
-                    "unit",
-                    "SSP",
-                    "GMT",
-                ],
-                value_vars=years,
-                var_name="year",
-            )  # change to df_big if concatenating multiple
-
-            dfp.columns = [x.lower() for x in dfp.columns]
-
-            dsout = xr.Dataset()
-
-            for indicator in dfp.variable.unique():
-                print(indicator)
-                dx = (
-                    dfp.loc[dfp.variable == indicator]
-                    .set_index(["gmt", "year", "ssp", "region"])
-                    .to_xarray()
-                )
-                # dx.attrs['unit'] = dx.assign_coords({'unit':dx.unit.values[0,0,0,0]})
-
-                dsout[indicator] = dx["value"].to_dataset(name=indicator)[indicator]
-                dsout[indicator].attrs["unit"] = dx.unit.values[0, 0, 0, 0]
-                # dsout = dsout[indicator].assign_coords({'unit':dx.unit.values[0,0,0,0]})
-
-                # bar()
-
-            dsout["ssp"] = [x.upper() for x in dsout["ssp"].values]
-            # dsout = dsout.drop_vars('unit')
-
-            # % Write out
-            print("Writing out... ")
-            comp = dict(zlib=True, complevel=5)
-            encoding = {var: comp for var in dsout.data_vars}
-
-            # ind = indicator.replace('|', '_')
-            filename = f"{output_dir}{vari}_{region}.nc"
-
-            dsout.to_netcdf(filename, encoding=encoding)
-            bar()
+    # if ab_present:
+    #     with alive_bar(total=len(small_vars),
+    #                    title='Processing', length=10, force_tty=True,
+    #                    bar='circles',
+    #                    spinner='elements') as bar:

+    #         print('alive bar present')
+    # Apply function here
+    for vari in small_vars:
+        df_ind = loop_interpolate_gmt(
+            df.loc[df.variable.str.startswith(vari)], yr_start, yr_end
+        )
+        # dfbig = pd.concat([dfbig, df_ind])
+        print(f"dfbig: indicator {ind[0]}: {time.time()-istart}")
+
+        # % Convert and save out to xarray
+        # dfbig.dropna(how='all')
+
+        dfp = df_ind.melt(
+            id_vars=[
+                "model",
+                "scenario",
+                "variable",
+                "region",
+                "unit",
+                "SSP",
+                "GMT",
+            ],
+            value_vars=years,
+            var_name="year",
+        )  # change to df_big if concatenating multiple
+
+        dfp.columns = [x.lower() for x in dfp.columns]
+
+        dsout = xr.Dataset()
+
+        for indicator in dfp.variable.unique():
+            print(indicator)
+            dx = (
+                dfp.loc[dfp.variable == indicator]
+                .set_index(["gmt", "year", "ssp", "region"])
+                .to_xarray()
+            )
+            # dx.attrs['unit'] = dx.assign_coords({'unit':dx.unit.values[0,0,0,0]})
+            dsout[indicator] = dx["value"].to_dataset(name=indicator)[indicator]
+            dsout[indicator].attrs["unit"] = dx.unit.values[0, 0, 0, 0]
+            # dsout = dsout[indicator].assign_coords({'unit':dx.unit.values[0,0,0,0]})
+
+        dsout["ssp"] = [x.upper() for x in dsout["ssp"].values]
+        # dsout = dsout.drop_vars('unit')
+
+        # % Write out
+        print("Writing out... ")
+        comp = dict(zlib=True, complevel=5)
+        encoding = {var: comp for var in dsout.data_vars}
+        filename = f"{output_dir}{vari}_{region}.nc"
+        dsout.to_netcdf(filename, encoding=encoding)
+        # if ab_present:
+        #     bar()
 
 
 # =============================================================================
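
A self-contained sketch of the melt / set_index / to_xarray / compressed-NetCDF pipeline used above, on a toy table (variable and file names hypothetical; assumes the netCDF4 backend is installed):

import pandas as pd
import xarray as xr

dfp = pd.DataFrame(
    {
        "gmt": [1.5, 1.5, 2.0, 2.0],
        "year": [2050, 2100, 2050, 2100],
        "ssp": ["SSP1", "SSP1", "SSP1", "SSP1"],
        "region": ["AUT", "AUT", "AUT", "AUT"],
        "value": [0.1, 0.2, 0.3, 0.4],
    }
)
# Long table -> 4-D labelled array indexed by (gmt, year, ssp, region)
dx = dfp.set_index(["gmt", "year", "ssp", "region"]).to_xarray()
dsout = xr.Dataset({"heatwave_days": dx["value"]})

# zlib compression per data variable, as in the script above
encoding = {v: dict(zlib=True, complevel=5) for v in dsout.data_vars}
dsout.to_netcdf("heatwave_days_COUNTRIES.nc", encoding=encoding)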
") - comp = dict(zlib=True, complevel=5) - encoding = {var: comp for var in dsout.data_vars} - - # ind = indicator.replace('|', '_') - filename = f"{output_dir}{vari}_{region}.nc" - - dsout.to_netcdf(filename, encoding=encoding) - bar() + # if ab_present: + # with alive_bar(total=len(small_vars), + # title='Processing', length=10, force_tty=True, + # bar='circles', + # spinner='elements') as bar: + + # print('alive bar present') + # Apply function here + for vari in small_vars: + df_ind = loop_interpolate_gmt( + df.loc[df.variable.str.startswith(vari)], yr_start, yr_end + ) + # dfbig = pd.concat([dfbig, df_ind]) + print(f"dfbig: indicator {ind[0]}: {time.time()-istart}") + + # % Convert and save out to xarray + # dfbig.dropna(how='all') + + dfp = df_ind.melt( + id_vars=[ + "model", + "scenario", + "variable", + "region", + "unit", + "SSP", + "GMT", + ], + value_vars=years, + var_name="year", + ) # change to df_big if concatenating multiple + + dfp.columns = [x.lower() for x in dfp.columns] + + dsout = xr.Dataset() + + for indicator in dfp.variable.unique(): + print(indicator) + dx = ( + dfp.loc[dfp.variable == indicator] + .set_index(["gmt", "year", "ssp", "region"]) + .to_xarray() + ) + # dx.attrs['unit'] = dx.assign_coords({'unit':dx.unit.values[0,0,0,0]}) + dsout[indicator] = dx["value"].to_dataset(name=indicator)[indicator] + dsout[indicator].attrs["unit"] = dx.unit.values[0, 0, 0, 0] + # dsout = dsout[indicator].assign_coords({'unit':dx.unit.values[0,0,0,0]}) + + dsout["ssp"] = [x.upper() for x in dsout["ssp"].values] + # dsout = dsout.drop_vars('unit') + + # % Write out + print("Writing out... ") + comp = dict(zlib=True, complevel=5) + encoding = {var: comp for var in dsout.data_vars} + filename = f"{output_dir}{vari}_{region}.nc" + dsout.to_netcdf(filename, encoding=encoding) + # if ab_present: + # bar() # ============================================================================= diff --git a/rime/process_config.py b/rime/process_config.py index c7c96ed..24aa757 100644 --- a/rime/process_config.py +++ b/rime/process_config.py @@ -16,8 +16,7 @@ # From generate_aggregated_inputs.py -region = "COUNTRIES" -# region = 'R10' +region = "COUNTRIES" # 'R10' or 'COUNTRIES' table_output_format = f"table_output_|_{region}.csv" @@ -64,7 +63,7 @@ elif env == "ebro3": - # wd = 'H:\\' + wd = "H:\\" wd2 = "H:\\rcre_testing\\" # Input source of processed climate data by ssp/year/variable/region diff --git a/rime/process_maps.py b/rime/process_maps.py index ea7c955..65613c5 100644 --- a/rime/process_maps.py +++ b/rime/process_maps.py @@ -11,28 +11,28 @@ """ - +import dask import glob import numpy as np import os -import xarray as xr -import yaml import pyam import time -import dask -import dask.array as da +import xarray as xr +import yaml from dask import delayed from dask.distributed import Client -from process_config import * -from rime_functions import * +from rime.process_config import * +from rime.rime_functions import * # from dask.diagnostics import Profiler, ResourceProfiles, CacheProfiler dask.config.set(scheduler="processes") -dask.config.set(num_workers=72) - +dask.config.set(num_workers=num_workers) client = Client() +# dask.config.set(scheduler='synchronous') + + with open(yaml_path, "r") as f: params = yaml.full_load(f) @@ -44,7 +44,7 @@ dft = np.round(dft.as_pandas()[pyam.IAMC_IDX + ["year", "value", "Ssp_family"]], 2) # Replace & fill missing SSP scenario allocation # dft.Ssp_family.replace(sspdic, inplace=True) # metadata must have Ssp_faily column. 
@@ -70,7 +70,12 @@
 # df = pyam.IamDataFrame(dft)
 
 map_out_MS = map_transform_gmt_multi_dask(
-    dft.filter(model="POLES ADVANCE"), mapdata, use_dask=True, gmt_name="threshold"
+    dft.filter(model="POLES ADVANCE"),
+    mapdata,
+    years,
+    use_dask=True,
+    gmt_name="threshold",
+    interpolation=interpolation,
 )
 
 comp = dict(zlib=True, complevel=5)
@@ -123,6 +128,7 @@
 map_out_MI = map_transform_gmt_multi_dask(
     dft.filter(model="AIM*", scenario="SSP1-34"),
     mapdata,
+    years,
     use_dask=False,
     gmt_name="threshold",
 )
@@ -137,13 +143,5 @@
 # 1 scenarios, 5 indis = 14 seconds
 # 1 scenario, 8 indis = 22s, = 1.8s/s dask=False
+# 1 scenario, 12 indis = 62s, = 5s/s dask=False, 24 workers.
 # 1... 32s dask=True
-
-# %% Define functions
-
-
-# def test_map_transform_gmt():
-#     return x
-
-
-# def test_map_transform_gmt_multi():

diff --git a/rime/process_tabledata.py b/rime/process_tabledata.py
index de22b12..011f269 100644
--- a/rime/process_tabledata.py
+++ b/rime/process_tabledata.py
@@ -36,8 +36,7 @@
 files = filesall[15:]
 
 if len(files) == 0:
-    print("no files!")
-    dsfsdfsdfsdf
+    raise Exception("No files!")
 
 # load input IAMC scenarios file
 df_scens_in = pyam.IamDataFrame(fname_input_scenarios)

diff --git a/rime/rime_functions.py b/rime/rime_functions.py
index b3b7bbb..fe757f1 100644
--- a/rime/rime_functions.py
+++ b/rime/rime_functions.py
@@ -1,22 +1,33 @@
 # -*- coding: utf-8 -*-
 """
 rime_functions.py
+Set of core functions for RIME
 """
 
+import dask
+from dask import delayed
+import numpy as np
+import pandas as pd
+import pyam
+import xarray as xr
+
 
-def loop_interpolate_gmt(df):
+def loop_interpolate_gmt(df, yr_start, yr_end):
     """
-    Loop through the variables and regions in df and interpolate the impacts data
+    Loop through the variables and regions in df and interpolate the impacts data between the global warming levels
 
     Parameters
     ----------
     df : pandas.DataFrame()
-        Df is in IAMC wide format, with columns model/scenario/region/unit/variable/[years]
+        df is in IAMC wide format, with columns model/scenario/region/unit/variable/[years]
+    yr_start : int
+        start year of the interpolated data (e.g. 2010 or 2015)
+    yr_end : int
+        end year of the interpolated data (e.g. 2099 or 2100)
 
     Returns
     -------
-    df_ind : TYPE
-        DESCRIPTION.
+    df_ind : pandas.DataFrame()
 
     """
 
     # if type(df_ind)==None:
@@ -26,9 +37,7 @@ def loop_interpolate_gmt(df):
 
     for variable in df.variable.unique():
         print(variable)
-        # vstart = time.time()
         for ssp in SSPs:
-            # print(ssp)
             for region in regions:
                 dfs = df.loc[
                     (df.region == region) & (df.SSP == ssp) & (df.variable == variable)
@@ -46,7 +55,6 @@ def loop_interpolate_gmt(df):
                 dfs[cols] = dfs[cols].interpolate()
                 dfs.drop_duplicates(inplace=True)
                 df_ind = pd.concat([df_ind, dfs])
-                # print(f'{time.time()-vstart}')
 
     return df_ind
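
A minimal sketch of the GWL-interpolation idea behind loop_interpolate_gmt: impacts known at a few warming levels are densified onto a fine GMT grid (toy values; the real function additionally loops over variables, SSPs and regions):

import numpy as np
import pandas as pd

# Impact known at three warming levels
known = pd.Series([10.0, 25.0, 70.0], index=[1.5, 2.0, 3.0])

# Reindex onto a dense GMT grid, then linearly interpolate the gaps
dense_gmt = np.round(np.arange(1.5, 3.01, 0.1), 1)
dense = known.reindex(dense_gmt).interpolate(method="index")
print(dense.loc[2.5])  # ~47.5 - linear between the 2.0 and 3.0 anchors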
@@ -57,22 +65,20 @@ def loop_interpolate_gmt(df):
 
 
 def calculate_impacts_gmt(
-    dfx, dsi, prefix_indicator="RIME|", ssp_meta_col="ssp_family", gmt_below=1.5
+    dfx, dsi, prefix_indicator="RIME|", ssp_meta_col="ssp_family", gmt_below=1.2
 ):
     """
     Takes in a set of scenarios of GMT and dataset of climate impacts by GWL,
     and returns a table of climate impacts per scenario through time.
-
+
     Parameters
     ----------
     dfx : dask.DataFrame
         dask.DataFrame in the format of an IAMC table (see pyam.IamDataFrame) with scenario(s) as rows and only 1 variable for the global mean temperature timeseries. Needs to have a column specifying the SSP to be assumed.
     dsi : xarray.Dataset
         dimensions ['gmt','year','ssp','region'] and each variable is a climate indicator
-    filename : str, optional
-        DESCRIPTION. The default is NONE. output filename as .csv (utf-8), otherwise returns idf
     prefix_indicator : str, optional
-        DESCRIPTION. The default is 'RIME|'. Use this to change the output indicator prefix.
+        DESCRIPTION. The default is 'RIME|'. Use this to change the output indicator prefix in the table data variable column.
     ssp_meta_col : str, optional
         DESCRIPTION. The default is 'Ssp_family'. Use this to change the name of the meta column used to assign the SSP per scenario
     gmt_below : float, optional
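
The core lookup calculate_impacts_gmt performs, per scenario and year, is: read the impact at that year's GMT level from the GWL-indexed dataset. A stripped-down sketch with a synthetic dataset (variable name hypothetical; dimension names follow the docstring):

import numpy as np
import xarray as xr

dsi = xr.Dataset(
    {"heatwave_days": (("gmt", "year"), np.outer([1.0, 2.0, 4.0], [1.0, 1.1]))},
    coords={"gmt": [1.5, 2.0, 3.0], "year": [2050, 2100]},
)
gmt_pathway = {2050: 1.8, 2100: 2.6}  # scenario GMT by year

impacts = {
    yr: float(dsi["heatwave_days"].sel(year=yr).interp(gmt=g))
    for yr, g in gmt_pathway.items()
}
print(impacts)  # {2050: ~1.6, 2100: ~3.52} - impact read off at each year's GWL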
@@ -136,7 +142,7 @@
     return idf
 
 
-def fix_dupes(dfti):
+def fix_dupes(dfti, years):
     """
     Function that modifies GMT temperatures minutely, in case there are
     duplicates in the series.
@@ -175,16 +181,42 @@
 def map_transform_gmt(
     df1,
     mapdata,
     var_name,
+    years,
     map_array=xr.Dataset(),
+    caution_checks=True,
+    drawdown_max=0.15,
     gmt_name="threshold",
+    interpolation=0.01,
     temp_min=1.2,
     temp_max=3.5,
-    drawdown_max=0.15,
 ):
+    """
+    Takes in one scenario of GMT and a dataset of gridded climate impacts by
+    GWL, and returns annual impact maps for that scenario through time.
+
+    Parameters
+    ----------
+    df1 : pyam.IamDataFrame
+        IAMC-format table with one scenario as row and only 1 variable for the global mean temperature timeseries. Needs to have a column specifying the SSP to be assumed.
+    mapdata : xarray.Dataset
+        gridded climate indicator(s) with a GWL dimension named gmt_name
+    var_name : str
+        name given to the output variable in map_array
+    years : list
+        years of the GMT timeseries to process
+    map_array : xarray.Dataset, optional
+        existing Dataset to which the output maps are added
+    caution_checks : bool, optional
+        sanity checks on the GMT pathway (drawdown, range) before transforming
+    drawdown_max : float, optional
+        maximum permitted GMT drawdown after the peak
+    gmt_name : str, optional
+        name of the GWL dimension in mapdata. The default is 'threshold'.
+    interpolation : float, optional
+        GMT resolution to which mapdata is interpolated. The default is 0.01.
+    temp_min, temp_max : float, optional
+        GMT range accepted for the transformation
+
+    Returns
+    -------
+    map_array : xarray.Dataset
+        Dataset with one annual impact map variable per scenario.
+
+    """
+
     if len(df1.variable) > 1:
-        print("Error: more than 1 variable in DataFrame")
-        breakss
+        raise Exception("Error: more than 1 variable in DataFrame")
+    if len(df1.meta) > 1:
+        raise Exception("Error: more than 1 model-scenario in DataFrame")
 
+    # Should only be 1 scenario
     for model, scenario in df1.index:
         # Get the GMT values from df
         gmt_values = df1.filter(year=years).data["value"].values
@@ -216,6 +248,8 @@
         except ValueError:
             print("")
 
+        # Get indicator name
+        short = list(mapdata.data_vars)[0]
         # provide new interpolated vector (e.g. 0.01 gmt resolution)
         new_thresholds = np.arange(
             mapdata.gmt.min(), mapdata.gmt.max() + interpolation, interpolation
@@ -267,12 +301,40 @@
 def map_transform_gmt_multi_dask(
     df,
     mapdata,
+    years,
     gmt_name="threshold",
     use_dask=True,
     temp_min=1.2,
     temp_max=3.5,
     drawdown_max=0.15,
+    interpolation=0.01,
 ):
+    """
+    Takes in a set of scenarios of GMT and a dataset of gridded climate impacts
+    by GWL, and returns annual impact maps for each scenario through time.
+
+    Parameters
+    ----------
+    df : pyam.IamDataFrame
+        IAMC-format table with scenario(s) as rows and only 1 variable for the global mean temperature timeseries. Needs to have a column specifying the SSP to be assumed.
+    mapdata : xarray.Dataset
+        gridded climate indicator(s) with a GWL dimension named gmt_name
+    years : list
+        years of the GMT timeseries to process
+    gmt_name : str, optional
+        DESCRIPTION. The default is 'threshold'. Name of the GWL dimension in mapdata.
+    use_dask : bool, optional
+        DESCRIPTION. The default is True. Process the scenario/variable combinations concurrently as dask delayed tasks.
+    temp_min : float, optional
+        DESCRIPTION. The default is 1.2. Lower end of the GMT range accepted for the transformation.
+    temp_max : float, optional
+        DESCRIPTION. The default is 3.5. Upper end of the GMT range accepted for the transformation.
+    drawdown_max : float, optional
+        DESCRIPTION. The default is 0.15. Maximum permitted GMT drawdown after the peak.
+    interpolation : float, optional
+        DESCRIPTION. The default is 0.01. GMT resolution to which mapdata is interpolated.
+
+    Returns
+    -------
+    map_array : xarray.Dataset
+        Dataset of annual impact maps for the input scenario(s).
+
+    """
+
     map_array = xr.Dataset()
 
     if len(df.index) > 1:
@@ -296,15 +358,12 @@
             if use_dask:
                 # Create delayed task for map_transform_gmt
                 delayed_map_transform = delayed(map_transform_gmt)(
-                    df1, mapdata, var_name, map_array
+                    df1, mapdata, var_name, years, map_array
                 )
                 delayed_tasks.append(delayed_map_transform)
             else:
                 map_array = map_transform_gmt(
-                    df1,
-                    mapdata,
-                    var_name,
-                    map_array,
+                    df1, mapdata, var_name, years, map_array
                 )
     if use_dask:
         # Compute delayed tasks concurrently
@@ -334,16 +393,11 @@
         if use_dask:
             # Create delayed task for map_transform_gmt_dask
             delayed_map_transform = delayed(map_transform_gmt)(
-                df1, mapdata, var_name, map_array
+                df1, mapdata, var_name, years, map_array
             )
             delayed_tasks.append(delayed_map_transform)
         else:
-            map_array = map_transform_gmt(
-                df1,
-                mapdata,
-                var_name,
-                map_array,
-            )
+            map_array = map_transform_gmt(df1, mapdata, var_name, years, map_array)
 
     if use_dask:
         # Compute delayed tasks concurrently
         computed_results = dask.compute(*delayed_tasks)
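
In miniature, the threshold lookup in map_transform_gmt densifies the map stack along the GWL axis to the `interpolation` resolution and then, for each year, picks the slice nearest the scenario's GMT (synthetic 2x2 maps; values equal their GWL so the lookup is self-checking):

import numpy as np
import xarray as xr

mapdata = xr.DataArray(
    np.stack([np.full((2, 2), t) for t in [1.5, 2.0, 3.0]]),
    dims=("gmt", "lat", "lon"),
    coords={"gmt": [1.5, 2.0, 3.0]},
)
new_thresholds = np.round(np.arange(1.5, 3.0 + 0.01, 0.01), 2)  # 0.01 resolution
dense = mapdata.interp(gmt=new_thresholds)

gmt_per_year = {2050: 1.87, 2100: 2.42}  # toy scenario GMT pathway
maps = xr.concat(
    [dense.sel(gmt=g, method="nearest") for g in gmt_per_year.values()], dim="year"
).assign_coords(year=list(gmt_per_year.keys()))
print(maps.sel(year=2100).values)  # 2x2 map read off at ~2.42 degC

Finally, the use_dask branches above follow the standard dask delayed fan-out/compute pattern. A self-contained miniature with a toy function standing in for map_transform_gmt:

import dask
from dask import delayed

@delayed
def transform(scenario, offset):
    # Stand-in for map_transform_gmt: any per-scenario computation
    return {scenario: offset * 2}

tasks = [transform(s, i) for i, s in enumerate(["SSP1-19", "SSP2-45", "SSP5-85"])]
computed_results = dask.compute(*tasks)  # runs the tasks concurrently
merged = {k: v for d in computed_results for k, v in d.items()}
print(merged)  # {'SSP1-19': 0, 'SSP2-45': 2, 'SSP5-85': 4}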