diff --git a/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr6.txt b/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr6.txt
index 0f7f211..40e8662 100755
--- a/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr6.txt
+++ b/src/lisfloodutilities/gridding/configuration/1arcmin/config_pr6.txt
@@ -19,7 +19,7 @@ LONG_NAME = 6 Hourly Accumulated Precipitation
 # 1304 - EURO4M-APGD
 # 1310 - HNMS
 # 1329 - ERA5-land
-KIWIS_FILTER_PLUGIN_CLASSES = {'DowgradedObservationsKiwisFilter': {'1304': 1.0, '1302': 1.0, '1295': 1.0}, 'ObservationsKiwisFilter': {'1303': 100.0, '1329': 100.0}}
+# KIWIS_FILTER_PLUGIN_CLASSES = {'DowgradedObservationsKiwisFilter': {'1304': 1.0, '1302': 1.0, '1295': 1.0}, 'ObservationsKiwisFilter': {'1303': 100.0, '1329': 100.0}}
 
 [VAR_TIME]
 
diff --git a/src/lisfloodutilities/gridding/configuration/1arcmin/config_ta6.txt b/src/lisfloodutilities/gridding/configuration/1arcmin/config_ta6.txt
index 823cbaa..e762d32 100755
--- a/src/lisfloodutilities/gridding/configuration/1arcmin/config_ta6.txt
+++ b/src/lisfloodutilities/gridding/configuration/1arcmin/config_ta6.txt
@@ -13,6 +13,16 @@ DATA_TYPE_PACKED = i2
 STANDARD_NAME = air_temperature
 LONG_NAME = 6 Hourly Average Temperature
 
+# 1280 - IMGW
+# 1295 - MARS
+# 1302 - CarpatClim
+# 1303 - ERAinterim
+# 1304 - EURO4M-APGD
+# 1310 - HNMS
+# 1323 - ICON
+# 1329 - ERA5-land
+KIWIS_FILTER_PLUGIN_CLASSES = {'ObservationsKiwisFilter': {'1329': 700.0}, 'ProvidersKiwisFilter': {'1323': [('2022-01-01 06:00:00', '2023-12-31 06:00:00')]}}
+
 [VAR_TIME]
 
 UNIT_PATTERN = hours since %%Y-%%m-%%d %%H:%%M:%%S.%%f
diff --git a/src/lisfloodutilities/gridding/configuration/1arcmin/config_tn.txt b/src/lisfloodutilities/gridding/configuration/1arcmin/config_tn.txt
index f1ff0f5..132b4fd 100755
--- a/src/lisfloodutilities/gridding/configuration/1arcmin/config_tn.txt
+++ b/src/lisfloodutilities/gridding/configuration/1arcmin/config_tn.txt
@@ -13,6 +13,16 @@ DATA_TYPE_PACKED = i2
 STANDARD_NAME = air_temperature
 LONG_NAME = Daily Minimum Temperature
 
+# 1280 - IMGW
+# 1295 - MARS
+# 1302 - CarpatClim
+# 1303 - ERAinterim
+# 1304 - EURO4M-APGD
+# 1310 - HNMS
+# 1323 - ICON
+# 1329 - ERA5-land
+KIWIS_FILTER_PLUGIN_CLASSES = {'ObservationsKiwisFilter': {'1329': 700.0}, 'ProvidersKiwisFilter': {'1323': [('2022-01-01 06:00:00', '2023-12-31 06:00:00')]}}
+
 [VAR_TIME]
 
 UNIT = days since 1990-01-01 06:00:00.0
diff --git a/src/lisfloodutilities/gridding/configuration/1arcmin/config_tx.txt b/src/lisfloodutilities/gridding/configuration/1arcmin/config_tx.txt
index be2d1cf..36296cd 100755
--- a/src/lisfloodutilities/gridding/configuration/1arcmin/config_tx.txt
+++ b/src/lisfloodutilities/gridding/configuration/1arcmin/config_tx.txt
@@ -13,6 +13,16 @@ DATA_TYPE_PACKED = i2
 STANDARD_NAME = air_temperature
 LONG_NAME = Daily Maximum Temperature
 
+# 1280 - IMGW
+# 1295 - MARS
+# 1302 - CarpatClim
+# 1303 - ERAinterim
+# 1304 - EURO4M-APGD
+# 1310 - HNMS
+# 1323 - ICON
+# 1329 - ERA5-land
+KIWIS_FILTER_PLUGIN_CLASSES = {'ObservationsKiwisFilter': {'1329': 700.0}, 'ProvidersKiwisFilter': {'1323': [('2022-01-01 18:00:00', '2023-12-31 18:00:00')]}}
+
 [VAR_TIME]
 
 UNIT = days since 1990-01-01 18:00:00.0
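Across the four configuration diffs above, KIWIS_FILTER_PLUGIN_CLASSES is a Python dict literal that maps filter class names from gridding/lib/filters.py to per-provider arguments: a radius for ObservationsKiwisFilter, and per-provider lists of exclusion intervals for the new ProvidersKiwisFilter. As a minimal sketch, such an entry could be parsed and instantiated as below; this loader is illustrative only, since the utility's actual configuration reader is not part of this diff:

import ast
import importlib

# Value copied from config_ta6.txt above
raw = ("{'ObservationsKiwisFilter': {'1329': 700.0}, "
       "'ProvidersKiwisFilter': {'1323': [('2022-01-01 06:00:00', '2023-12-31 06:00:00')]}}")

# literal_eval parses the dict literal without executing arbitrary code
plugin_args = ast.literal_eval(raw)

filters = importlib.import_module('lisfloodutilities.gridding.lib.filters')
# Hypothetical instantiation; the real loader may pass additional arguments
plugins = [getattr(filters, name)(filter_args=args) for name, args in plugin_args.items()]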
diff --git a/src/lisfloodutilities/gridding/lib/filters.py b/src/lisfloodutilities/gridding/lib/filters.py
index 67729a8..b05227d 100644
--- a/src/lisfloodutilities/gridding/lib/filters.py
+++ b/src/lisfloodutilities/gridding/lib/filters.py
@@ -235,6 +235,43 @@ def has_neighbor_within_radius_from_other_providers(self, row: pd.Series, tree:
         return False
 
 
+class ProvidersKiwisFilter(KiwisFilter):
+    """
+    Class to filter Kiwis files metadata for stations that belong to a list of providers, within a defined list of time intervals.
+    Expects filter_args to be a dictionary whose keys are the provider IDs whose stations we want to
+    filter out, and whose values are arrays of (start, end) date pairs defining the intervals in which to filter them, e.g.
+    filter_args = {1121: [('1992-01-02 06:00:00', '1993-01-01 06:00:00'), ('1995-01-02 06:00:00', '1996-01-01 06:00:00')]}
+    """
+
+    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code: str = '', quiet_mode: bool = False):
+        super().__init__(filter_columns, filter_args, var_code, quiet_mode)
+        # Group the providers by interval: {(start1, end1): [provider_id1, provider_id2]}
+        print('args:', self.args)
+        self.provider_intervals = {}
+        for provider_id in self.args:
+            time_intervals = self.args[provider_id]
+            for time_interval in time_intervals:
+                start, end = time_interval
+                start = dt.strptime(start, "%Y-%m-%d %H:%M:%S")
+                end = dt.strptime(end, "%Y-%m-%d %H:%M:%S")
+                cur_interval = (start, end)
+                if cur_interval not in self.provider_intervals:
+                    self.provider_intervals[cur_interval] = []
+                self.provider_intervals[cur_interval].append(provider_id)
+        print('provider_intervals:', self.provider_intervals)
+
+    def apply_filter(self, df: pd.DataFrame) -> pd.DataFrame:
+        df = super().apply_filter(df)
+        # Remove a provider's stations only if the current file timestamp falls inside one of its intervals
+        for time_interval in self.provider_intervals:
+            start, end = time_interval
+            if start <= self.cur_timestamp <= end:
+                providers_to_remove = self.provider_intervals[time_interval]
+                df = df[~df[self.COL_PROVIDER_ID].isin(providers_to_remove)]
+        self.print_statistics(df)
+        return df
+
+
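For illustration, a self-contained sketch of the interval logic implemented by the new class, using toy data; 'site_no' is the provider-ID column used elsewhere in these tools, while in the real class the provider column and the current file timestamp come from the KiwisFilter base class:

from datetime import datetime as dt
import pandas as pd

filter_args = {'1323': [('2022-01-01 06:00:00', '2023-12-31 06:00:00')]}

# Group providers by parsed interval, as in ProvidersKiwisFilter.__init__
provider_intervals = {}
for provider_id, intervals in filter_args.items():
    for start, end in intervals:
        key = (dt.strptime(start, '%Y-%m-%d %H:%M:%S'), dt.strptime(end, '%Y-%m-%d %H:%M:%S'))
        provider_intervals.setdefault(key, []).append(provider_id)

df = pd.DataFrame({'site_no': ['1323', '1295'], 'value': [1.2, 3.4]})
cur_timestamp = dt(2022, 6, 1, 6, 0, 0)

# Drop rows of providers whose exclusion interval covers the current file timestamp
for (start, end), providers in provider_intervals.items():
    if start <= cur_timestamp <= end:
        df = df[~df['site_no'].isin(providers)]

print(df)  # only provider 1295 remains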
+ +""" + +import sys +import os +from pathlib import Path +from argparse import ArgumentParser, ArgumentTypeError +import pandas as pd +import json +import csv +from datetime import datetime, timedelta +from lisfloodutilities.gridding.lib.utils import FileUtils + + +COL_OUTPUT_QUALITY_CODE_WRONG = 'QUALITY_CODE_WRONG' +COL_OUTPUT_TOTAL_OBSERVATIONS = 'TOTAL_OBSERVATIONS' +COL_OUTPUT_TIMESTAMP = 'TIMESTAMP' +COL_OUTPUT_VAR_CODE = 'VAR_CODE' +COL_OUTPUT_PROVIDER_ID = 'PROVIDER_ID' + + +def run(statfile: str, outfile: str): + + outfilepath = Path(outfile) + # Create the output parent folders if not exist yet + Path(outfilepath.parent).mkdir(parents=True, exist_ok=True) + + statfilepath = Path(statfile) + print(f'Reading statistics file: {statfilepath}') + df_stats = pd.read_csv(statfilepath, sep="\t") + provider_ids = sorted(df_stats[COL_OUTPUT_PROVIDER_ID].unique()) + + print('provider_ids:', provider_ids) + + first_timestamp_cell = df_stats[COL_OUTPUT_TIMESTAMP].iloc[0] + yyyy = first_timestamp_cell[:4] + + ncols = len(provider_ids) + + out_row1 = [yyyy, 'DP'] + out_row1.extend(provider_ids) + out_row2 = [yyyy, 'average stations with data'] + out_row2.extend([''] * ncols) + out_row3 = [yyyy, 'average error'] + out_row3.extend([''] * ncols) + out_row4 = [yyyy, 'max number of errors in a day'] + out_row4.extend([''] * ncols) + + i = 0 + for provider_id in provider_ids: + average_stations = df_stats.loc[df_stats[COL_OUTPUT_PROVIDER_ID] == provider_id, COL_OUTPUT_TOTAL_OBSERVATIONS].mean() + out_row2[2 + i] = round(average_stations,0) + average_error = df_stats.loc[df_stats[COL_OUTPUT_PROVIDER_ID] == provider_id, COL_OUTPUT_QUALITY_CODE_WRONG].mean() + out_row3[2 + i] = round(average_error) + max_error = df_stats.loc[df_stats[COL_OUTPUT_PROVIDER_ID] == provider_id, COL_OUTPUT_QUALITY_CODE_WRONG].max() + out_row4[2 + i] = round(max_error) + i += 1 + + with open(outfilepath, 'a', newline='') as file: + writer = csv.writer(file, delimiter='\t') + writer.writerow(out_row1) + writer.writerow(out_row2) + writer.writerow(out_row3) + writer.writerow(out_row4) + + print(f'Wrote file: {outfilepath}') + + +def main(argv): + '''Command line options.''' + global quiet_mode + + program_name = os.path.basename(sys.argv[0]) + program_path = os.path.dirname(os.path.realpath(sys.argv[0])) + program_version = "v%s" % __version__ + program_build_date = "%s" % __updated__ + + program_version_string = 'version %s (%s)\n' % (program_version, program_build_date) + program_longdesc = ''' + This script extracts kiwis logged statistics into another tab separated file. + ''' + program_license = """ + Copyright 2019-2020 European Union + Licensed under the EUPL, Version 1.2 or as soon they will be approved by the European Commission subsequent versions of the EUPL (the "Licence"); + You may not use this work except in compliance with the Licence. + You may obtain a copy of the Licence at: + https://joinup.ec.europa.eu/sites/default/files/inline-files/EUPL%20v1_2%20EN(1).txt + Unless required by applicable law or agreed to in writing, software distributed under the Licence is distributed on an "AS IS" basis, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the Licence for the specific language governing permissions and limitations under the Licence. 
+ """ + + # try: + if True: + # setup option parser + parser = ArgumentParser(epilog=program_license, description=program_version_string+program_longdesc) + + # set defaults + # parser.set_defaults(input_wildcard='*.tsv') + + parser.add_argument("-s", "--stat", dest="statfile", required=True, type=FileUtils.file_type, + help="Set input file containing kiwis statistics name (*.tsv).", + metavar="/path/to/kiwis_stats_ws_2001.tsv") + parser.add_argument("-o", "--out", dest="outfile", required=True, type=FileUtils.file_type, + help="Set output file name (*.tsv).", + metavar="/path/to/output_file.tsv") + + # process options + args = parser.parse_args(argv) + + print(f"Statistics File: {args.statfile}") + print(f"Output File: {args.outfile}") + + run(args.statfile, args.outfile) + print("Finished.") + # except Exception as e: + # indent = len(program_name) * " " + # sys.stderr.write(program_name + ": " + repr(e) + "\n") + # sys.stderr.write(indent + " for help use --help") + # return 2 + + +def main_script(): + sys.exit(main(sys.argv[1:])) + + +if __name__ == "__main__": + main_script() diff --git a/src/lisfloodutilities/gridding/tools/insert_stats_from_kiwis_code_160.py b/src/lisfloodutilities/gridding/tools/insert_stats_from_kiwis_code_160.py new file mode 100644 index 0000000..fb61c26 --- /dev/null +++ b/src/lisfloodutilities/gridding/tools/insert_stats_from_kiwis_code_160.py @@ -0,0 +1,171 @@ + +__author__="Goncalo Gomes" +__date__="$Jun 06, 2024 10:45:00$" +__version__="0.1" +__updated__="$Jun 06, 2024 10:45:00$" + +""" +Copyright 2019-2020 European Union +Licensed under the EUPL, Version 1.2 or as soon they will be approved by the European Commission subsequent versions of the EUPL (the "Licence"); +You may not use this work except in compliance with the Licence. +You may obtain a copy of the Licence at: +https://joinup.ec.europa.eu/sites/default/files/inline-files/EUPL%20v1_2%20EN(1).txt +Unless required by applicable law or agreed to in writing, software distributed under the Licence is distributed on an "AS IS" basis, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the Licence for the specific language governing permissions and limitations under the Licence. 
+ +""" + +import sys +import os +from pathlib import Path +from argparse import ArgumentParser, ArgumentTypeError +import pandas as pd +import json +from datetime import datetime, timedelta +from lisfloodutilities.gridding.lib.utils import Config, FileUtils + + +COL_PROVIDER_ID = 'site_no' +COL_QUALITY_CODE = 'q_code' +QUALITY_CODE_WRONG = 160 +COL_OUTPUT_QUALITY_CODE_WRONG = 'QUALITY_CODE_WRONG' +COL_OUTPUT_TOTAL_OBSERVATIONS = 'TOTAL_OBSERVATIONS' +COL_OUTPUT_TIMESTAMP = 'TIMESTAMP' +COL_OUTPUT_VAR_CODE = 'VAR_CODE' +COL_OUTPUT_PROVIDER_ID = 'PROVIDER_ID' + + +def set_values(row: pd.Series, timestamp: str, var_code: str, provider_id: str, total_wrong_values: int) -> pd.Series: + if str(row[COL_OUTPUT_TIMESTAMP]) == timestamp and str(row[COL_OUTPUT_VAR_CODE]) == var_code and str(row[COL_OUTPUT_PROVIDER_ID]) == provider_id: + row[COL_OUTPUT_QUALITY_CODE_WRONG] += total_wrong_values + row[COL_OUTPUT_TOTAL_OBSERVATIONS] += total_wrong_values + return row + +def get_timestamp_from_filename(conf: Config, filename: Path) -> datetime: + file_timestamp = datetime.strptime(filename.name, conf.input_timestamp_pattern) + if conf.force_time is not None: + new_time = datetime.strptime(conf.force_time, "%H%M").time() + file_timestamp = datetime.combine(file_timestamp.date(), new_time) + return file_timestamp + +def run(config_filename: str, infolder: str, variable_code: str, statfile: str, outfile: str): + + conf = Config(config_filename) + + INPUT_TIMESTAMP_PATTERN = conf.input_timestamp_pattern + inwildcard = conf.input_wildcard + + netcdf_offset_file_date = int(conf.get_config_field('VAR_TIME','OFFSET_FILE_DATE')) + + outfilepath = Path(outfile) + # Create the output parent folders if not exist yet + Path(outfilepath.parent).mkdir(parents=True, exist_ok=True) + + statfilepath = Path(statfile) + print(f'Reading statistics file: {statfilepath}') + df_stats = pd.read_csv(statfilepath, sep="\t") + + for filename in sorted(Path(infolder).rglob(inwildcard)): + print(f'Processing file: {filename}') + df_kiwis = pd.read_csv(filename, sep="\t") + file_timestamp = get_timestamp_from_filename(conf, filename) + file_timestamp = file_timestamp + timedelta(days=netcdf_offset_file_date) + timestamp = file_timestamp.strftime('%Y-%m-%d %H:%M:%S') + new_df = df_kiwis.groupby([COL_PROVIDER_ID, COL_QUALITY_CODE]).size().reset_index(name='count') + new_df = new_df.loc[new_df[COL_QUALITY_CODE] == QUALITY_CODE_WRONG] + if not new_df.empty: + for index, row in new_df.iterrows(): + provider_id = str(row[COL_PROVIDER_ID]) + total_wrong_values = int(row['count']) + df_stats = df_stats.apply(set_values, axis=1, timestamp=timestamp, var_code=variable_code, + provider_id=provider_id, total_wrong_values=total_wrong_values) + df_stats.to_csv(outfilepath, index=False, header=True, sep='\t') + print(f'Wrote file: {outfilepath}') + print(df_stats) + + +def main(argv): + '''Command line options.''' + global quiet_mode + + program_name = os.path.basename(sys.argv[0]) + program_path = os.path.dirname(os.path.realpath(sys.argv[0])) + program_version = "v%s" % __version__ + program_build_date = "%s" % __updated__ + + program_version_string = 'version %s (%s)\n' % (program_version, program_build_date) + program_longdesc = ''' + This script parses a list of log files containing statistics in the form of dictionary and converts the logged statistics into one tab separated file. 
+
+
+def main(argv):
+    '''Command line options.'''
+    global quiet_mode
+
+    program_name = os.path.basename(sys.argv[0])
+    program_path = os.path.dirname(os.path.realpath(sys.argv[0]))
+    program_version = "v%s" % __version__
+    program_build_date = "%s" % __updated__
+
+    program_version_string = 'version %s (%s)\n' % (program_version, program_build_date)
+    program_longdesc = '''
+    This script parses kiwis files, counts the observations flagged with the wrong-value quality code (160)
+    per provider, and inserts those counts into an existing tab separated statistics file.
+    '''
+    program_license = """
+    Copyright 2019-2020 European Union
+    Licensed under the EUPL, Version 1.2 or as soon they will be approved by the European Commission subsequent versions of the EUPL (the "Licence");
+    You may not use this work except in compliance with the Licence.
+    You may obtain a copy of the Licence at:
+    https://joinup.ec.europa.eu/sites/default/files/inline-files/EUPL%20v1_2%20EN(1).txt
+    Unless required by applicable law or agreed to in writing, software distributed under the Licence is distributed on an "AS IS" basis,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the Licence for the specific language governing permissions and limitations under the Licence.
+    """
+
+    try:
+        # setup option parser
+        parser = ArgumentParser(epilog=program_license, description=program_version_string+program_longdesc)
+
+        # set defaults
+        # parser.set_defaults(input_wildcard='*.kiwi')
+
+        parser.add_argument("-i", "--in", dest="infolder", required=True, type=FileUtils.folder_type,
+                            help="Set input folder path with kiwis files (*.kiwi)",
+                            metavar="/input/folder/kiwis/")
+        parser.add_argument("-s", "--stat", dest="statfile", required=True, type=FileUtils.file_type,
+                            help="Set input file containing kiwis statistics name (*.tsv).",
+                            metavar="/path/to/kiwis_stats_file.tsv")
+        parser.add_argument("-o", "--out", dest="outfile", required=True, type=FileUtils.file_type,
+                            help="Set output file name (*.tsv).",
+                            metavar="/path/to/output_file.tsv")
+        parser.add_argument("-v", "--var", dest="variable_code", required=True,
+                            help="Set the variable to be processed.",
+                            metavar="{pr,pd,tn,tx,ws,rg,...}")
+        parser.add_argument("-c", "--conf", dest="config_type", required=True,
+                            help="Set the grid configuration type to use.",
+                            metavar="{5x5km, 1arcmin,...}")
+        parser.add_argument("-p", "--pathconf", dest="config_base_path", required=False, type=FileUtils.folder_type,
+                            help="Overrides the base path where the configurations are stored.",
+                            metavar="/path/to/config")
+
+        # process options
+        args = parser.parse_args(argv)
+
+        configuration_base_folder = os.path.join(program_path, '../../src/lisfloodutilities/gridding/configuration')
+        if args.config_base_path is not None and len(args.config_base_path) > 0:
+            configuration_base_folder = args.config_base_path
+
+        file_utils = FileUtils(args.variable_code)
+
+        config_type_path = file_utils.get_config_type_path(configuration_base_folder, args.config_type)
+
+        config_filename = file_utils.get_config_file(config_type_path)
+
+        print(f"Variable: {args.variable_code}")
+        print(f"Input Folder: {args.infolder}")
+        print(f"Statistics File: {args.statfile}")
+        print(f"Output File: {args.outfile}")
+        print(f"Config File: {config_filename}")
+
+        run(config_filename, args.infolder, args.variable_code, args.statfile, args.outfile)
+        print("Finished.")
+    except Exception as e:
+        indent = len(program_name) * " "
+        sys.stderr.write(program_name + ": " + repr(e) + "\n")
+        sys.stderr.write(indent + "  for help use --help")
+        return 2
+
+
+def main_script():
+    sys.exit(main(sys.argv[1:]))
+
+
+if __name__ == "__main__":
+    main_script()
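The timestamp that selects which statistics rows a kiwis file updates is derived from the file name, optionally pinned to a fixed time of day, and then shifted by the [VAR_TIME] OFFSET_FILE_DATE from the configuration. A worked sketch; the file name pattern, force time and offset below are illustrative assumptions, since the real values come from the Config object:

from datetime import datetime, timedelta

# Hypothetical values standing in for conf.input_timestamp_pattern,
# conf.force_time and the [VAR_TIME] OFFSET_FILE_DATE config field
input_timestamp_pattern = 'ta%Y%m%d%H%M.kiwis'
force_time = '0600'
netcdf_offset_file_date = -1

file_timestamp = datetime.strptime('ta202201020000.kiwis', input_timestamp_pattern)
new_time = datetime.strptime(force_time, '%H%M').time()
file_timestamp = datetime.combine(file_timestamp.date(), new_time)
file_timestamp += timedelta(days=netcdf_offset_file_date)
print(file_timestamp.strftime('%Y-%m-%d %H:%M:%S'))  # 2022-01-01 06:00:00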
__author__="Goncalo Gomes" __date__="$Jun 06, 2024 10:45:00$" __version__="0.1" @@ -38,10 +36,20 @@ def merge_kiwis_stats(df: pd.DataFrame, search_string: str = ''): KWIWS_SEARCH_STRING = '#KIWIS_STATS: ' result_df = df if search_string == KWIWS_SEARCH_STRING: + result_df['COUNT_ROWS'] = 0 group_cols = ['TIMESTAMP', 'VAR_CODE', 'PROVIDER_ID'] agg_dict = {'QUALITY_CODE_VALID': 'min', 'QUALITY_CODE_SUSPICIOUS': 'min', - 'QUALITY_CODE_WRONG': 'max', 'TOTAL_OBSERVATIONS': 'max'} + 'QUALITY_CODE_WRONG': 'max', 'TOTAL_OBSERVATIONS': 'max', 'COUNT_ROWS': 'count'} result_df = result_df.groupby(group_cols).agg(agg_dict).reset_index() + # At every application of a filter (gridding.lib.filters.py) an entry in the log is + # written for each provider that was not filtered. Therefore, all providers should have + # the same number of rows and if a provider have less means it was completely filtered out + # and needs to be removed from this df. + result_df = result_df.loc[result_df['COUNT_ROWS'] == result_df['COUNT_ROWS'].max()] + result_df.reset_index(drop=True, inplace=True) + result_df = result_df.drop('COUNT_ROWS', axis=1) + result_df.reset_index(drop=True, inplace=True) + # Getting only the necessary columns result_df.columns = ['TIMESTAMP', 'VAR_CODE', 'PROVIDER_ID', 'QUALITY_CODE_VALID', 'QUALITY_CODE_SUSPICIOUS', 'QUALITY_CODE_WRONG', 'TOTAL_OBSERVATIONS'] result_df.reset_index(drop=True, inplace=True) @@ -68,14 +76,14 @@ def run(infolder: str, outfile: str, search_string: str, inwildcard: str = '*.lo if not df.empty: out_df = df elif not df.empty and have_the_same_columns(out_df, df): - out_df = pd.concat([out_df], df) + out_df = pd.concat([out_df, df]) else: print('ERROR: Both datasets do not have the same columns') if out_df is None or out_df.empty: print('WARNING: No lines containing statistics where found.') else: - out_df = out_df.drop_duplicates() out_df = merge_kiwis_stats(out_df, search_string) + out_df = out_df.drop_duplicates() out_df.to_csv(outfilepath, index=False, header=True, sep='\t') print(f'Wrote file: {outfilepath}') print(out_df) @@ -131,7 +139,7 @@ def main(argv): args = parser.parse_args(argv) print(f"Input Folder: {args.infolder}") - print(f"Output Filer: {args.outfile}") + print(f"Output File: {args.outfile}") print(f"Input Wildcard: {args.input_wildcard}") print(f'Search String: [{args.search_string}]')