Skip to content

Commit

Permalink
Changing method in cusum() to not require running twice.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenkehoe committed Dec 11, 2024
1 parent bb509d7 commit 161737c
Showing 1 changed file with 29 additions and 30 deletions.
59 changes: 29 additions & 30 deletions act/qc/qctests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1693,38 +1693,39 @@ def add_step_change_test(
"""

def cusum(data, k, mean_val, lower=False):
def cusum(data, k):
"""
CUSUM algrithm used to detect step changes.
CUSUM algorithm used to detect step changes.
data : numpy array
1D numpy array of time series data to analze
k : float
Reference value. This is typically half the value of the shift size change to
to be detected.
mean_val : float
Mean value of data.
lower : bool
Option to loof for lower shifts vs. upper shifts
to be detected or the size of the shift change if the data is detrended by
differencing.
Returns
-------
C : numpy array
Numpy array containing a 0 when there is no shift detected
or positive value when a shift is detected.
"""

C = np.zeros(data.size)
if lower:
for ii in range(1, data.size):
C[ii] = max(0, C[ii - 1] - (data[ii] - mean_val + k))
else:
for ii in range(1, data.size):
C[ii] = max(0, C[ii - 1] + (data[ii] - mean_val - k))
mean_val = np.nanmean(data)
Cu = np.zeros(data.size, dtype=np.float16)
Cl = np.zeros(data.size, dtype=np.float16)
for ii in range(1, data.size):
Cl[ii] = max(0.0, Cl[ii - 1] - (data[ii] - mean_val + k))
Cu[ii] = max(0.0, Cu[ii - 1] + (data[ii] - mean_val - k))

return C
return np.maximum(Cu, Cl)

data = self._ds[var_name].values
time = self._ds['time'].values
if add_nan:
from act.utils.data_utils import add_in_nan

time, data = add_in_nan(time, data)
time, data = add_in_nan(self._ds['time'].values, data)

data = data.astype(float)
if detrend:
Expand All @@ -1734,29 +1735,27 @@ def cusum(data, k, mean_val, lower=False):
if n_flagged < 0:
n_flagged = data.size

mean_val = np.nanmean(data)
index = np.full(data.size, False)
for lower in [False, True]:
C = cusum(data, k, mean_val, lower=lower)
found_ind = np.where(np.diff(C) > 0.0)[0]
for ind in found_ind:
ind = np.arange(ind, ind + n_flagged)
ind = ind[ind < data.size]
index[ind] = True
C = cusum(data, k)
found_ind = np.where(np.diff(C) > 0.0)[0]
for ind in found_ind:
ind = np.arange(ind, ind + n_flagged)
ind = ind[ind < data.size]
index[ind] = True

if add_nan:
import xarray as xr

da = xr.DataArray(index, dims=['time'], coords=[time])
result = da.sel(time=self._ds['time'])
index = result.values
del result
da = da.sel(time=self._ds['time'])
index = da.values
del da

if test_meaning is None:
test_meaning = f'Shift in data detected with CUSUM algrithm: k={round(k, 2)}'
test_meaning = f'Shift in data detected with CUSUM algorithm: k={round(k, 2)}'

if prepend_text is not None:
test_meaning = ': '.join((prepend_text, test_meaning))
test_meaning = f'{prepend_text}: {test_meaning}'

result = self._ds.qcfilter.add_test(
var_name,
Expand Down

0 comments on commit 161737c

Please sign in to comment.