-
Notifications
You must be signed in to change notification settings - Fork 1
/
robust_voltage_control_nonlinear.py
270 lines (228 loc) · 10.8 KB
/
robust_voltage_control_nonlinear.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
from __future__ import annotations
from collections.abc import Sequence
import io
from typing import Any
import cvxpy as cp
import numpy as np
import pandapower as pp
import scipy.io as spio
from tqdm.auto import tqdm
from network_utils import np_triangle_norm
from robust_voltage_control import update_dists
from utils import solve_prob
from voltplot import VoltPlot
# ==== BEGINNING OF NONLINEAR MODIFICATIONS: loading data ====
# active and reactive load data
load = spio.loadmat('orig_data/loadavail20150908.mat', squeeze_me=True)
scale = 1.1
load_p = np.stack(load['Load']['kW']) / 1000 # to MW
load_p *= scale
load_q = np.stack(load['Load']['kVar']) / 1000 # to MVar
load_q *= scale
# Solar data
T = 14421
N = 55
gen_p = np.zeros((N, T))
gen_q = np.zeros((N, T))
solar_orig = spio.loadmat('orig_data/pvavail20150908_2.mat', squeeze_me=True)
capacities = np.array([
9.97, 11.36, 13.53, 6.349206814, 106.142148, 154, 600, 293.54, 66.045,
121.588489, 12.94935415, 19.35015173, 100, 31.17327501, 13.06234596,
7.659505852, 100, 700]) # in kW
capacities /= 1000 # to MW
# for whatever reason, Guanan scales the capacities by a factor of 7
# - see line 39 in dynamic_simu_setting_revision_2nd.m
capacities *= 7
# see Generate_PV_power.m
pv_profile = solar_orig['PVavail'][0]['PVp_6s'] / solar_orig['PVavail'][0]['PVacrate']
pv = pv_profile * capacities.reshape(-1, 1) # shape [18, 14421]
solar_index = np.array([9,12,14,16,19,10,11,13,15,7,2,4,20,23,25,26,32,8]) - 2
gen_p[solar_index,:] = pv
# Check data
solar = spio.loadmat('data/PV.mat', squeeze_me=True)['actual_PV_profile'] # shape [14421]
pq_fluc = spio.loadmat('data/pq_fluc.mat', squeeze_me=True)['pq_fluc'] # shape [55, 2, 14421]
all_p = pq_fluc[:, 0] # shape [n, T]
all_q = pq_fluc[:, 1] # shape [n, T]
nodal_injection = -load_p + gen_p
assert np.allclose(all_p, nodal_injection) # check if load_p - gen_p = total active injection
assert np.allclose(-load_q, all_q)
assert np.allclose(gen_p.sum(axis=0), solar)
# ==== END OF NONLINEAR MODIFICATIONS ====
def robust_voltage_control(
p: np.ndarray, qe: np.ndarray,
v_lims: tuple[Any, Any], q_lims: tuple[Any, Any], v_nom: Any,
net: pp.pandapowerNet,
X: np.ndarray, R: np.ndarray,
Pv: np.ndarray, Pu: np.ndarray,
eta: float, ε: float, v_sub: float, β: float,
sel: Any, δ: float = 0.,
ctrl_nodes: Sequence[int] | None = None,
pbar: tqdm | None = None,
log: tqdm | io.TextIOBase | None = None,
volt_plot: VoltPlot | None = None, volt_plot_update: int = 100,
save_params_every: int = 100
) -> tuple[np.ndarray, np.ndarray, dict[str, list],
dict[int, np.ndarray | tuple[np.ndarray, float]],
dict[str, list]]:
"""Runs robust voltage control.
Args
- p: np.array, shape [T, n], active power injection (MW)
- qe: np.array, shape [T, n], exogenous reactive power injection (MVar)
- v_lims: tuple (v_min, v_max), squared voltage magnitude limits (kV^2)
- v_min, v_max could be floats, or np.arrays of shape [n]
- q_lims: tuple (q_min, q_max), reactive power injection limits (MVar)
- q_min, q_max could be floats, or np.arrays of shape [n]
- v_nom: float or np.array of shape [n], desired nominal voltage
- net: pandapower network, used for nonlinear voltage simulation
- X: np.array, shape [n, n], line parameters for reactive power injection
- R: np.array, shape [n, n], line parameters for active power injection
- Pv: np.array, shape [n, n], quadratic (PSD) cost matrix for voltage
- Pu: np.array, shape [n, n], quadratic (PSD) cost matrix for control
- eta: float, noise bound (kV^2)
- ε: float, robustness buffer (kV^2)
- v_sub: float, fixed squared voltage magnitude at substation (kV^2)
- β: float, weight for slack variable
- sel: nested convex body chasing object (e.g., CBCProjection)
- δ: float, weight of noise term in CBC norm when learning eta
- set to 0 if eta is known
- ctrl_nodes: list of int, nodes that we can control voltages for
- pbar: optional tqdm, progress bar
- log: optional log output
- volt_plot: VoltPlot
- volt_plot_update: int, time steps between updating volt_plot
- save_params_every: int, time steps between saving estimated model params
Returns
- vs: np.array, shape [T, n]
- qcs: np.array, shape [T, n]
- dists: dict, keys ['t', '*_true', '*_prev'], values are lists
- 't': list of int, time steps at which model updates occurred,
i.e., X̂(t) != X̂(t-1). X̂(t) is generated by data up to and
including v(t), q^c(t), u(t-1)
- '*_true': list of float, ‖X̂-X‖_△ after each model update
(and likewise for η and (X,η), if learning η)
- '*_prev': list of float, ‖X̂(t)-X̂(t-1)‖_△ after each model update
(and likewise for η and (X,η), if learning η)
- params: dict, keys are time step t, values the estimated model params
after observing vs[t], qcs[t].
- if delta is None: np.array, shape [n, n]
- if delta is given: tuple of (np.ndarray, float)
- check_prediction: dict, maps keys ('adaptive_linear', 'fixed_optimal_linear')
to lists of prediction error (scalars)
"""
assert p.shape == qe.shape
T, n = qe.shape
if log is None:
log = tqdm()
log.write(f'‖X‖_△ = {np_triangle_norm(X):.2f}')
dists: dict[str, list] = {'t': [], 'X_true': [], 'X_prev': []}
check_prediction: dict[str, list] = {'adaptive_linear':[], 'fixed_optimal_linear':[]}
X̂_prev = None
v_min, v_max = v_lims
q_min, q_max = q_lims
vs = sel.v # shape [T, n], vs[t] denotes v(t)
qcs = sel.q # shape [T, n], qcs[t] denotes q^c(t)
# vpars = qe @ X + p @ R + v_sub # shape [T, n], vpars[t] denotes vpar(t)
# assert np.array_equal(vs[0], vpars[0])
nonlinear_vpars = np.load('data/nonlinear_voltage_baseline.npy')[:, 1:] # nonlinear modification
nonlinear_vpars = (nonlinear_vpars * 12.)**2
assert np.array_equal(vs[0], nonlinear_vpars[0])
# we need to use `u` as the variable instead of `qc_next` in order to
# make the problem DPP-convex
u = cp.Variable(n, name='u')
ξ = cp.Variable(nonneg=True, name='ξ') # slack variable
q_norm_2 = np.linalg.norm(np.ones(n) * (q_max-q_min), ord=2)
if δ > 0: # learning eta
dists |= {'η': [], 'η_prev': [], 'X_η_prev': []}
etahat_prev = None
rho = ε * δ / (1 + δ * q_norm_2)
etahat = cp.Parameter(nonneg=True, name='̂η')
k = etahat + rho * (1/δ + cp.norm2(u))
else:
rho = ε / q_norm_2
k = eta + rho * cp.norm(u, p=2)
log.write(f'rho(ε={ε:.2f}) = {rho:.3f}')
# parameters are placeholders for given values
vt = cp.Parameter(n, name='v')
qct = cp.Parameter(n, name='qc')
X̂ = cp.Parameter((n, n), PSD=True, name='X̂')
qc_next = qct + u
v_next = vt + u @ X̂
obj = cp.Minimize(cp.quad_form(v_next - v_nom, Pv)
+ cp.quad_form(u, Pu)
+ β * ξ**2)
constraints = [
q_min <= qc_next, qc_next <= q_max,
v_min + k - ξ <= v_next, v_next <= v_max - k + ξ
]
if ctrl_nodes is not None:
all_nodes = np.arange(n)
unctrl_nodes = np.setdiff1d(all_nodes, ctrl_nodes).tolist()
constraints.append(u[unctrl_nodes] == 0)
prob = cp.Problem(objective=obj, constraints=constraints)
# if cp.Problem is DPP, then it can be compiled for speedup
# - http://cvxpy.org/tutorial/advanced/index.html#disciplined-parametrized-programming
assert prob.is_dcp(dpp=True)
log.write(f'Robust Oracle prob is DPP?: {prob.is_dcp(dpp=True)}')
if pbar is not None:
log.write('pbar present')
pbar.reset(total=T-1)
params: dict[int, np.ndarray | tuple[np.ndarray, float]] = {}
for t in range(T-1): # t = 0, ..., T-2
# fill in Parameters
if δ > 0: # learning eta
X̂.value, etahat.value = sel.select(t)
update_dists(dists, t, X_info=(X̂.value, X̂_prev, X),
η_info=(etahat.value, etahat_prev, eta), δ=δ, log=log)
etahat_prev = float(etahat.value) # save a copy
if (t+1) % save_params_every == 0:
params[t] = (np.array(X̂.value), etahat_prev)
else:
X̂.value = sel.select(t)
update_dists(dists, t, X_info=(X̂.value, X̂_prev, X), log=log)
if (t+1) % save_params_every == 0:
params[t] = np.array(X̂.value) # save a copy
# satisfied, msg = sel._check_newest_obs(t, X)
# if not satisfied:
# print(msg)
# log.write(f't={t} linear X* does not satisfy the nonlinear constraints: {msg}')
X̂_prev = np.array(X̂.value) # save a copy
qct.value = qcs[t]
vt.value = vs[t]
solve_prob(prob, log=log, name=f't={t}. robust oracle')
if prob.status == 'infeasible':
raise RuntimeError('robust controller infeasible')
qcs[t+1] = qc_next.value
# ==== BEGINNING OF NONLINEAR MODIFICATIONS ====
# vs[t+1] = vpars[t+1] + (qc_next.value) @ X
# print('linear: ', np.linalg.norm(vs[t+1]))
net.load['p_mw'] = load_p[:,t]
net.load['q_mvar'] = load_q[:,t]
net.sgen['p_mw'] = gen_p[:,t]
net.sgen['q_mvar'] = gen_q[:,t] + qc_next.value
pp.runpp(net, algorithm='bfsw', init='dc')
vnext = net.res_bus.iloc[1:].vm_pu.to_numpy()
vs[t+1] = (12. * vnext)**2
check_prediction['fixed_optimal_linear'].append(
np.linalg.norm(vs[t+1] - (nonlinear_vpars[t+1] + qc_next.value @ X)))
check_prediction['adaptive_linear'].append(
np.linalg.norm(vs[t+1] - (nonlinear_vpars[t+1] + qc_next.value @ X̂.value)))
# print('nonlinear: ', np.linalg.norm(vs[t+1]))
# ==== END OF NONLINEAR MODIFICATIONS ====
sel.add_obs(t+1)
# log.write(f't = {t}, ‖u‖_1 = {np.linalg.norm(u.value, 1)}')
if volt_plot is not None and (t+1) % volt_plot_update == 0:
volt_plot.update(qcs=qcs[:t+2],
vs=np.sqrt(vs[:t+2]),
vpars=np.sqrt(nonlinear_vpars[:t+2]),
dists=(dists['t'], dists['X_true']))
volt_plot.show(clear_display=False)
if pbar is not None:
pbar.update()
if (t+1) % volt_plot_update == 0:
log.write(f't={t}. robust oracle progress.')
# update voltplot at the end of run
if volt_plot is not None:
volt_plot.update(qcs=qcs, vs=np.sqrt(vs), vpars=np.sqrt(nonlinear_vpars),
dists=(dists['t'], dists['X_true']))
volt_plot.show(clear_display=False)
return vs, qcs, dists, params, check_prediction