Skip to content

Commit

Permalink
Adding step detection test (ARM-DOE#889)
Browse files Browse the repository at this point in the history
* Adding step detection test

* Changing method in cusum() to not require running twice.

* Correcting a misspelling.
  • Loading branch information
kenkehoe authored Dec 11, 2024
1 parent d667bb3 commit 2a6e1f8
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 0 deletions.
138 changes: 138 additions & 0 deletions act/qc/qctests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1629,3 +1629,141 @@ def add_atmospheric_pressure_test(
)

return result

def add_step_change_test(
self,
var_name,
k=1.0,
detrend=True,
n_flagged=2,
add_nan=False,
test_meaning=None,
test_assessment='Indeterminate',
test_number=None,
flag_value=False,
prepend_text=None,
):
"""
Method to detect a shift change in values using the CUSUM (cumulative sum control chart) test.
Parameters
----------
var_name : str
Data variable name in Dataset to use for testing. Results are inserted into
accompanying embedded quality control variable.
k : float
Reference value. This is typically around the value of the shift size change to
to be detected when detrend=True. Will typically be around half the value
of the shift size when no detrend is applied.
detrend : bool
Remove the trend in the data by differencing the data before
applying the CUSUM algorithm. Needed for most data that have atmospheric variability.
n_flagged : int
Number of time steps to flag in the quality control variable. Default is to
flag the point of and one after the step change since we will not know which
value of the two will be suspect. Can set to any number of time steps
or -1 to flag the remaining data after the detected step change.
add_nan : bool
Should a NaN value be added to the data where a value is missing. A value is
determined to be missing when the time step is larger than the mode of the
difference in time values. This will stop the reporting of a step when there
is an outage and the data normally rises/decends but with the gap is appears
as a step change.
test_meaning : str
Optional text description to add to flag_meanings
describing the test. Will use a default if not set.
test_assessment : str
Optional single word describing the assessment of the test.
Will use a default if not set.
test_number : int
Optional test number to use. If not set will use next available
test number.
flag_value : boolean
Indicates that the tests are stored as integers
not bit packed values in quality control variable.
prepend_text : str
Optional text to prepend to the test meaning.
Example is indicate what institution added the test.
Returns
-------
test_info : tuple
A tuple containing test information including var_name, qc
variable name, test_number, test_meaning, test_assessment
"""

def cusum(data, k):
"""
CUSUM algorithm used to detect step changes.
data : numpy array
1D numpy array of time series data to analze
k : float
Reference value. This is typically half the value of the shift size change to
to be detected or the size of the shift change if the data is detrended by
differencing.
Returns
-------
C : numpy array
Numpy array containing a 0 when there is no shift detected
or positive value when a shift is detected.
"""

mean_val = np.nanmean(data)
Cu = np.zeros(data.size, dtype=np.float16)
Cl = np.zeros(data.size, dtype=np.float16)
for ii in range(1, data.size):
Cl[ii] = max(0.0, Cl[ii - 1] - (data[ii] - mean_val + k))
Cu[ii] = max(0.0, Cu[ii - 1] + (data[ii] - mean_val - k))

return np.maximum(Cu, Cl)

data = self._ds[var_name].values
if add_nan:
from act.utils.data_utils import add_in_nan

time, data = add_in_nan(self._ds['time'].values, data)

data = data.astype(float)
if detrend:
data = np.diff(data)
data = np.append(np.nan, data)

if n_flagged < 0:
n_flagged = data.size

index = np.full(data.size, False)
C = cusum(data, k)
found_ind = np.where(np.diff(C) > 0.0)[0]
for ind in found_ind:
ind = np.arange(ind, ind + n_flagged)
ind = ind[ind < data.size]
index[ind] = True

if add_nan:
import xarray as xr

da = xr.DataArray(index, dims=['time'], coords=[time])
da = da.sel(time=self._ds['time'])
index = da.values
del da

if test_meaning is None:
test_meaning = f'Shift in data detected with CUSUM algorithm: k={round(k, 2)}'

if prepend_text is not None:
test_meaning = f'{prepend_text}: {test_meaning}'

result = self._ds.qcfilter.add_test(
var_name,
index=index,
test_number=test_number,
test_meaning=test_meaning,
test_assessment=test_assessment,
flag_value=flag_value,
)

return result
60 changes: 60 additions & 0 deletions examples/qc/plot_qc_step_change.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
This is an example for how to use the step change detection test.
The test uses the cumulative sum control chart to detect when
a sudden shift in values occurs. It has an option to insert
NaN value when there is a data gap to not have those periods
returned as a data shift. This example produces two plots,
one with the data gap flagged and one without.
"""

from matplotlib import pyplot as plt
import numpy as np

from arm_test_data import DATASETS
from act.io.arm import read_arm_netcdf


# Get example data from ARM Test Data repository
EXAMPLE_MET = DATASETS.fetch('sgpmetE13.b1.20190101.000000.cdf')
variable = 'temp_mean'
ds = read_arm_netcdf(EXAMPLE_MET, keep_variables=variable)

# Add shifts in the data
data = ds[variable].values
data[600:] += 2
data[1000:] -= 2
ds[variable].values = data

# Remove data from the Dataset to simulate instrument being off-line
ds = ds.where((ds["time.hour"] < 3) | (ds["time.hour"] > 5), drop=True)

# Add step change test
ds.qcfilter.add_step_change_test(variable)

# Add step change test but insert NaN values during period of missing data
# so it does not trip the test.
ds.qcfilter.add_step_change_test(variable, add_nan=True)

# Make plot with results from the step change test for when the missing data
# is included and a second plot without including the missing data gap.
title = 'Step change detection'
for ii in range(1, 3):
plt.figure(figsize=(10, 6))
plt.plot(ds['time'].values, ds[variable].values, label='Data')
plt.xlabel('Time')
plt.ylabel(f"{ds[variable].attrs['long_name']} ({ds[variable].attrs['units']})")
plt.title(title)
plt.grid(lw=2, ls=':')

label = 'Step change'
index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=ii)
for jj in np.where(index)[0]:
plt.axvline(x=ds['time'].values[jj], color='orange', linestyle='--', label=label)
label = None

title += ' with NaN added in data gaps'

plt.legend()
plt.show()
88 changes: 88 additions & 0 deletions tests/qc/test_qctests.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,3 +417,91 @@ def test_add_atmospheric_pressure_test():

ds.close
del ds


def test_add_step_change_test():
variable = 'temp_mean'
qc_variable = f"qc_{variable}"
ds = read_arm_netcdf(EXAMPLE_MET1, keep_variables=['temp_mean', 'atmos_pressure'])
ds.load()

result = ds.qcfilter.add_step_change_test(variable)
assert result == {
'test_number': 1,
'test_meaning': 'Shift in data detected with CUSUM algorithm: k=1.0',
'test_assessment': 'Indeterminate',
'qc_variable_name': qc_variable,
'variable_name': variable,
}
index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=1)
assert len(np.where(index)[0]) == 0
assert ds[qc_variable].attrs['flag_meanings'] == [
'Shift in data detected with CUSUM algorithm: k=1.0'
]
assert ds[qc_variable].attrs['flag_assessments'] == ['Indeterminate']

data = ds[variable].values
data[100:] -= 5
data[600:] += 4
data[800:] += 10
data[1000:] -= 2
ds[variable].values = data

ds.qcfilter.add_step_change_test(variable)
index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=2)
assert np.all(np.where(index)[0] == [99, 100, 599, 600, 799, 800, 999, 1000])
assert (
ds[qc_variable].attrs['flag_meanings'][1]
== 'Shift in data detected with CUSUM algorithm: k=1.0'
)
assert ds[qc_variable].attrs['flag_assessments'][1] == 'Indeterminate'

ds.qcfilter.add_step_change_test(variable, k=4, prepend_text='ARM')
index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=3)
assert np.all(np.where(index)[0] == [99, 100, 599, 600, 799, 800])
assert (
ds[qc_variable].attrs['flag_meanings'][2]
== 'ARM: Shift in data detected with CUSUM algorithm: k=4'
)

ds.qcfilter.add_step_change_test(variable, n_flagged=3)
index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=4)
assert np.all(
np.where(index)[0] == [99, 100, 101, 599, 600, 601, 799, 800, 801, 999, 1000, 1001]
)

ds.qcfilter.add_step_change_test(variable, n_flagged=-1, k=5.1, test_assessment='Suspect')
index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=5)
assert np.all(np.where(index)[0] == np.arange(799, 1440))
assert (
ds[qc_variable].attrs['flag_meanings'][4]
== 'Shift in data detected with CUSUM algorithm: k=5.1'
)
assert ds[qc_variable].attrs['flag_assessments'][4] == 'Suspect'

variable = 'atmos_pressure'
ds.qcfilter.add_step_change_test(variable, detrend=False)
index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=1)
assert len(np.where(index)[0]) == 0

ds.close
del ds

# Test add_nan keyword
variable = 'temp_mean'
ds = read_arm_netcdf(EXAMPLE_MET1, keep_variables=variable)
data = ds[variable].values
data[600:] += 2
ds[variable].values = data

ds = ds.where((ds["time.hour"] < 3) | (ds["time.hour"] > 5), drop=True)

ds.qcfilter.add_step_change_test(variable, add_nan=False)
index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=1)
assert np.all(np.where(index)[0] == [179, 180, 419, 420])

ds.qcfilter.add_step_change_test(variable, add_nan=True)
index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=2)
assert np.all(np.where(index)[0] == [419, 420])

del ds

0 comments on commit 2a6e1f8

Please sign in to comment.