From 7a659bb12ef51d4fc302c7e7f3ba1f3f613478d1 Mon Sep 17 00:00:00 2001 From: Wai-Shing Luk Date: Sun, 3 Sep 2023 09:34:29 +0000 Subject: [PATCH] fix according to sonarlint; test performance without parallel cut --- setup.cfg | 6 +- src/ellalgo/oracles/lowpass_oracle.py | 132 ++++++++++---------------- tests/test_lowpass.py | 20 ++-- 3 files changed, 60 insertions(+), 98 deletions(-) diff --git a/setup.cfg b/setup.cfg index 82d8646..2f414e2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -85,9 +85,9 @@ testing = # in order to write a coverage file that can be read by Jenkins. # CAUTION: --cov flags may prohibit setting breakpoints while debugging. # Comment those flags to avoid this pytest issue. -addopts = - --cov ellalgo --cov-report term-missing - --verbose +# addopts = +# --cov ellalgo --cov-report term-missing +# --verbose norecursedirs = dist build diff --git a/src/ellalgo/oracles/lowpass_oracle.py b/src/ellalgo/oracles/lowpass_oracle.py index 8626a0a..85a0c98 100644 --- a/src/ellalgo/oracles/lowpass_oracle.py +++ b/src/ellalgo/oracles/lowpass_oracle.py @@ -32,9 +32,6 @@ # delta is allowed passband ripple. # This is a convex problem (can be formulated as an SDP after sampling). -# rand('twister',sum(100*clock)) -# randn('state',sum(100*clock)) - # ********************************************************************* # filter specs (for a low-pass filter) @@ -43,114 +40,97 @@ class LowpassOracle: more_alt: bool = True - def __init__(self, N: int, wpass: float, wstop: float, Lpsq: float, Upsq: float): + def __init__(self, ndim: int, wpass: float, wstop: float, lp_sq: float, up_sq: float): # ********************************************************************* # optimization parameters # ********************************************************************* # rule-of-thumb discretization (from Cheney's Approximation Theory) - m = 15 * N - w = np.linspace(0, np.pi, m) # omega - - # A is the matrix used to compute the power spectrum - # A(w,:) = [1 2*cos(w) 2*cos(2*w) ... 2*cos(N*w)] - An = 2 * np.cos(np.outer(w, np.arange(1, N))) - self.A = np.concatenate((np.ones((m, 1)), An), axis=1) - self.nwpass: int = floor(wpass * (m - 1)) + 1 # end of passband - self.nwstop: int = floor(wstop * (m - 1)) + 1 # end of stopband - self.Lpsq = Lpsq - self.Upsq = Upsq - - def assess_optim(self, x: Arr, Spsq: float): + mdim = 15 * ndim + w = np.linspace(0, np.pi, mdim) # omega + + # spectrum is the matrix used to compute the power spectrum + # spectrum(w,:) = [1 2*cos(w) 2*cos(2*w) ... 2*cos(mdim*w)] + temp = 2 * np.cos(np.outer(w, np.arange(1, ndim))) + self.spectrum = np.concatenate((np.ones((mdim, 1)), temp), axis=1) + self.nwpass: int = floor(wpass * (mdim - 1)) + 1 # end of passband + self.nwstop: int = floor(wstop * (mdim - 1)) + 1 # end of stopband + self.lp_sq = lp_sq + self.up_sq = up_sq + + def assess_optim(self, x: Arr, sp_sq: float): """[summary] Arguments: x (Arr): coefficients of autocorrelation - Spsq (float): the best-so-far Sp^2 + sp_sq (float): the best-so-far stop_pass^2 Returns: [type]: [description] """ # 1. nonnegative-real constraint - n = len(x) self.more_alt = True # case 2, # 2. passband constraints - N, n = self.A.shape - for k in range(0, self.nwpass): - v = self.A[k, :].dot(x) - if v > self.Upsq: - g = self.A[k, :] - f = (v - self.Upsq, v - self.Lpsq) - return (g, f), None - - if v < self.Lpsq: - g = -self.A[k, :] - f = (-v + self.Lpsq, -v + self.Upsq) - return (g, f), None + mdim, ndim = self.spectrum.shape + for k in range(self.nwpass): + col_k = self.spectrum[k, :] + v = col_k.dot(x) + if v > self.up_sq: + f = (v - self.up_sq, v - self.lp_sq) + return (col_k, f), None + if v < self.lp_sq: + f = (-v + self.lp_sq, -v + self.up_sq) + return (-col_k, f), None # case 3, # 3. stopband constraint fmax = float("-inf") - imax = 0 - for k in range(self.nwstop, N): - v = self.A[k, :].dot(x) - if v > Spsq: - g = self.A[k, :] - f = (v - Spsq, v) - return (g, f), None - + kmax = 0 + for k in range(self.nwstop, mdim): + col_k = self.spectrum[k, :] + v = col_k.dot(x) + if v > sp_sq: + return (col_k, (v - sp_sq, v)), None if v < 0: - g = -self.A[k, :] - f = (-v, -v + Spsq) - return (g, f), None - + return (-col_k, (-v, -v + sp_sq)), None if v > fmax: fmax = v - imax = k + kmax = k # case 4, # 1. nonnegative-real constraint on other frequences for k in range(self.nwpass, self.nwstop): - v = self.A[k, :].dot(x) + col_k = self.spectrum[k, :] + v = col_k.dot(x) if v < 0: - f = -v - g = -self.A[k, :] - return (g, f), None # single cut + return (-col_k, -v), None # single cut self.more_alt = False # case 1 (unlikely) if x[0] < 0: - g = np.zeros(n) - g[0] = -1.0 - f = -x[0] - return (g, f), None + grade = np.zeros(ndim) + grade[0] = -1.0 + return (grad, -x[0]), None # Begin objective function - Spsq = fmax - f = (0.0, fmax) - # f = 0. - g = self.A[imax, :] - return (g, f), Spsq + return (self.spectrum[kmax, :], (0.0, fmax)), fmax # ********************************************************************* # filter specs (for a low-pass filter) # ********************************************************************* # number of FIR coefficients (including zeroth) -def create_lowpass_case(N=48): +def create_lowpass_case(ndim=48): """[summary] Keyword Arguments: - N (int): [description] (default: {48}) + mdim (int): [description] (default: {48}) Returns: [type]: [description] """ - # wpass = 0.12 * np.pi # end of passband - # wstop = 0.20 * np.pi # start of stopband - delta0_wpass = 0.025 delta0_wstop = 0.125 # maximum passband ripple in dB (+/- around 0 dB) @@ -159,25 +139,13 @@ def create_lowpass_case(N=48): delta2 = 20 * np.log10(delta0_wstop) # passband 0 <= w <= w_pass - Lp = pow(10, -delta1 / 20) - Up = pow(10, +delta1 / 20) - Sp = pow(10, +delta2 / 20) - - # ind_p = np.where(w <= wpass)[0] # passband - # Ap = A[ind_p, :] - - # # stopband (w_stop <= w) - # ind_s = np.where(wstop <= w)[0] # stopband - # As = A[ind_s, :] - - # # remove redundant contraints - # ind_beg = ind_p[-1] - # ind_end = ind_s[0] - # Anr = A[range(ind_beg + 1, ind_end), :] + low_pass = pow(10, -delta1 / 20) + up_pass = pow(10, +delta1 / 20) + stop_pass = pow(10, +delta2 / 20) - Lpsq = Lp * Lp - Upsq = Up * Up - Spsq = Sp * Sp + lp_sq = low_pass * low_pass + up_sq = up_pass * up_pass + sp_sq = stop_pass * stop_pass - omega = LowpassOracle(N, 0.12, 0.20, Lpsq, Upsq) - return omega, Spsq + omega = LowpassOracle(ndim, 0.12, 0.20, lp_sq, up_sq) + return omega, sp_sq diff --git a/tests/test_lowpass.py b/tests/test_lowpass.py index 3b731b5..c517df8 100644 --- a/tests/test_lowpass.py +++ b/tests/test_lowpass.py @@ -29,25 +29,19 @@ def run_lowpass(use_parallel_cut: bool, duration=0.000001): options.tol = 1e-8 h, _, num_iters = cutting_plane_optim(omega, ellip, Spsq, options) time.sleep(duration) - # h = spectral_fact(r) return num_iters, h is not None -# def test_no_parallel_cut(benchmark): -# result, feasible = benchmark(run_lowpass, False) -# assert feasible -# assert result >= 13334 - -# def test_w_parallel_cut(benchmark): -# result, feasible = benchmark(run_lowpass, True) -# assert feasible -# assert result <= 568 - - def test_lowpass(): - """[summary]""" + """ Test the lowpass case with parallel cut """ result, feasible = run_lowpass(True) assert feasible assert result >= 1083 assert result <= 1194 + +def test_no_parallel_cut(): + """ Test the lowpass case with no parallel cut """ + result, feasible = run_lowpass(False) + assert feasible + assert result >= 16461