diff --git a/act/qc/qctests.py b/act/qc/qctests.py index 4d57661489..b4e619d240 100644 --- a/act/qc/qctests.py +++ b/act/qc/qctests.py @@ -1629,3 +1629,141 @@ def add_atmospheric_pressure_test( ) return result + + def add_step_change_test( + self, + var_name, + k=1.0, + detrend=True, + n_flagged=2, + add_nan=False, + test_meaning=None, + test_assessment='Indeterminate', + test_number=None, + flag_value=False, + prepend_text=None, + ): + """ + Method to detect a shift change in values using the CUSUM (cumulative sum control chart) test. + + Parameters + ---------- + var_name : str + Data variable name in Dataset to use for testing. Results are inserted into + accompanying embedded quality control variable. + k : float + Reference value. This is typically around the value of the shift size change to + to be detected when detrend=True. Will typically be around half the value + of the shift size when no detrend is applied. + detrend : bool + Remove the trend in the data by differencing the data before + applying the CUSUM algorithm. Needed for most data that have atmospheric variability. + n_flagged : int + Number of time steps to flag in the quality control variable. Default is to + flag the point of and one after the step change since we will not know which + value of the two will be suspect. Can set to any number of time steps + or -1 to flag the remaining data after the detected step change. + add_nan : bool + Should a NaN value be added to the data where a value is missing. A value is + determined to be missing when the time step is larger than the mode of the + difference in time values. This will stop the reporting of a step when there + is an outage and the data normally rises/decends but with the gap is appears + as a step change. + test_meaning : str + Optional text description to add to flag_meanings + describing the test. Will use a default if not set. + test_assessment : str + Optional single word describing the assessment of the test. + Will use a default if not set. + test_number : int + Optional test number to use. If not set will use next available + test number. + flag_value : boolean + Indicates that the tests are stored as integers + not bit packed values in quality control variable. + prepend_text : str + Optional text to prepend to the test meaning. + Example is indicate what institution added the test. + + Returns + ------- + test_info : tuple + A tuple containing test information including var_name, qc + variable name, test_number, test_meaning, test_assessment + + """ + + def cusum(data, k): + """ + CUSUM algorithm used to detect step changes. + + data : numpy array + 1D numpy array of time series data to analze + k : float + Reference value. This is typically half the value of the shift size change to + to be detected or the size of the shift change if the data is detrended by + differencing. + + Returns + ------- + C : numpy array + Numpy array containing a 0 when there is no shift detected + or positive value when a shift is detected. + + """ + + mean_val = np.nanmean(data) + Cu = np.zeros(data.size, dtype=np.float16) + Cl = np.zeros(data.size, dtype=np.float16) + for ii in range(1, data.size): + Cl[ii] = max(0.0, Cl[ii - 1] - (data[ii] - mean_val + k)) + Cu[ii] = max(0.0, Cu[ii - 1] + (data[ii] - mean_val - k)) + + return np.maximum(Cu, Cl) + + data = self._ds[var_name].values + if add_nan: + from act.utils.data_utils import add_in_nan + + time, data = add_in_nan(self._ds['time'].values, data) + + data = data.astype(float) + if detrend: + data = np.diff(data) + data = np.append(np.nan, data) + + if n_flagged < 0: + n_flagged = data.size + + index = np.full(data.size, False) + C = cusum(data, k) + found_ind = np.where(np.diff(C) > 0.0)[0] + for ind in found_ind: + ind = np.arange(ind, ind + n_flagged) + ind = ind[ind < data.size] + index[ind] = True + + if add_nan: + import xarray as xr + + da = xr.DataArray(index, dims=['time'], coords=[time]) + da = da.sel(time=self._ds['time']) + index = da.values + del da + + if test_meaning is None: + test_meaning = f'Shift in data detected with CUSUM algorithm: k={round(k, 2)}' + + if prepend_text is not None: + test_meaning = f'{prepend_text}: {test_meaning}' + + result = self._ds.qcfilter.add_test( + var_name, + index=index, + test_number=test_number, + test_meaning=test_meaning, + test_assessment=test_assessment, + flag_value=flag_value, + ) + + return result diff --git a/examples/qc/plot_qc_step_change.py b/examples/qc/plot_qc_step_change.py new file mode 100644 index 0000000000..f725a3d273 --- /dev/null +++ b/examples/qc/plot_qc_step_change.py @@ -0,0 +1,60 @@ +""" + +This is an example for how to use the step change detection test. +The test uses the cumulative sum control chart to detect when +a sudden shift in values occurs. It has an option to insert +NaN value when there is a data gap to not have those periods +returned as a data shift. This example produces two plots, +one with the data gap flagged and one without. + +""" + +from matplotlib import pyplot as plt +import numpy as np + +from arm_test_data import DATASETS +from act.io.arm import read_arm_netcdf + + +# Get example data from ARM Test Data repository +EXAMPLE_MET = DATASETS.fetch('sgpmetE13.b1.20190101.000000.cdf') +variable = 'temp_mean' +ds = read_arm_netcdf(EXAMPLE_MET, keep_variables=variable) + +# Add shifts in the data +data = ds[variable].values +data[600:] += 2 +data[1000:] -= 2 +ds[variable].values = data + +# Remove data from the Dataset to simulate instrument being off-line +ds = ds.where((ds["time.hour"] < 3) | (ds["time.hour"] > 5), drop=True) + +# Add step change test +ds.qcfilter.add_step_change_test(variable) + +# Add step change test but insert NaN values during period of missing data +# so it does not trip the test. +ds.qcfilter.add_step_change_test(variable, add_nan=True) + +# Make plot with results from the step change test for when the missing data +# is included and a second plot without including the missing data gap. +title = 'Step change detection' +for ii in range(1, 3): + plt.figure(figsize=(10, 6)) + plt.plot(ds['time'].values, ds[variable].values, label='Data') + plt.xlabel('Time') + plt.ylabel(f"{ds[variable].attrs['long_name']} ({ds[variable].attrs['units']})") + plt.title(title) + plt.grid(lw=2, ls=':') + + label = 'Step change' + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=ii) + for jj in np.where(index)[0]: + plt.axvline(x=ds['time'].values[jj], color='orange', linestyle='--', label=label) + label = None + + title += ' with NaN added in data gaps' + + plt.legend() + plt.show() diff --git a/tests/qc/test_qctests.py b/tests/qc/test_qctests.py index 0a244eea7c..89e3b48830 100644 --- a/tests/qc/test_qctests.py +++ b/tests/qc/test_qctests.py @@ -417,3 +417,91 @@ def test_add_atmospheric_pressure_test(): ds.close del ds + + +def test_add_step_change_test(): + variable = 'temp_mean' + qc_variable = f"qc_{variable}" + ds = read_arm_netcdf(EXAMPLE_MET1, keep_variables=['temp_mean', 'atmos_pressure']) + ds.load() + + result = ds.qcfilter.add_step_change_test(variable) + assert result == { + 'test_number': 1, + 'test_meaning': 'Shift in data detected with CUSUM algorithm: k=1.0', + 'test_assessment': 'Indeterminate', + 'qc_variable_name': qc_variable, + 'variable_name': variable, + } + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=1) + assert len(np.where(index)[0]) == 0 + assert ds[qc_variable].attrs['flag_meanings'] == [ + 'Shift in data detected with CUSUM algorithm: k=1.0' + ] + assert ds[qc_variable].attrs['flag_assessments'] == ['Indeterminate'] + + data = ds[variable].values + data[100:] -= 5 + data[600:] += 4 + data[800:] += 10 + data[1000:] -= 2 + ds[variable].values = data + + ds.qcfilter.add_step_change_test(variable) + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=2) + assert np.all(np.where(index)[0] == [99, 100, 599, 600, 799, 800, 999, 1000]) + assert ( + ds[qc_variable].attrs['flag_meanings'][1] + == 'Shift in data detected with CUSUM algorithm: k=1.0' + ) + assert ds[qc_variable].attrs['flag_assessments'][1] == 'Indeterminate' + + ds.qcfilter.add_step_change_test(variable, k=4, prepend_text='ARM') + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=3) + assert np.all(np.where(index)[0] == [99, 100, 599, 600, 799, 800]) + assert ( + ds[qc_variable].attrs['flag_meanings'][2] + == 'ARM: Shift in data detected with CUSUM algorithm: k=4' + ) + + ds.qcfilter.add_step_change_test(variable, n_flagged=3) + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=4) + assert np.all( + np.where(index)[0] == [99, 100, 101, 599, 600, 601, 799, 800, 801, 999, 1000, 1001] + ) + + ds.qcfilter.add_step_change_test(variable, n_flagged=-1, k=5.1, test_assessment='Suspect') + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=5) + assert np.all(np.where(index)[0] == np.arange(799, 1440)) + assert ( + ds[qc_variable].attrs['flag_meanings'][4] + == 'Shift in data detected with CUSUM algorithm: k=5.1' + ) + assert ds[qc_variable].attrs['flag_assessments'][4] == 'Suspect' + + variable = 'atmos_pressure' + ds.qcfilter.add_step_change_test(variable, detrend=False) + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=1) + assert len(np.where(index)[0]) == 0 + + ds.close + del ds + + # Test add_nan keyword + variable = 'temp_mean' + ds = read_arm_netcdf(EXAMPLE_MET1, keep_variables=variable) + data = ds[variable].values + data[600:] += 2 + ds[variable].values = data + + ds = ds.where((ds["time.hour"] < 3) | (ds["time.hour"] > 5), drop=True) + + ds.qcfilter.add_step_change_test(variable, add_nan=False) + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=1) + assert np.all(np.where(index)[0] == [179, 180, 419, 420]) + + ds.qcfilter.add_step_change_test(variable, add_nan=True) + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=2) + assert np.all(np.where(index)[0] == [419, 420]) + + del ds