-
Notifications
You must be signed in to change notification settings - Fork 0
/
optimize.py
503 lines (381 loc) · 16.3 KB
/
optimize.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
from bisect import insort_left
import argparse
import numpy as np
import matplotlib.pyplot as plt
class Frame:
def __init__(self, index, value):
self.index = index
self.value = value
self.assigned = 0.0
def __lt__(self, other):
if self.value == other.value:
return self.index > other.index
return self.value < other.value
def assign(self, amount):
self.value += amount
self.assigned += abs(amount)
class PriceFrame(Frame):
def assign(self, amount):
self.assigned += abs(amount)
class ArgumentError(Exception):
def __init__(self, message):
self.message = message
def read_curve(path):
with open(path) as file:
data = file.readlines()
data = [float(datum.rstrip()) for datum in data]
return data
def append(list, item):
"""Can be used instead of insort_left when optimizing with price curves."""
list.append(item)
def mean_curve(values, samples=72):
"""
Creates a curve containing the moving average of each point in the original. The default 72
samples means that the previous 36 hours and following 35 hours will be used to generate the
moving average for the current hour.
"""
margin = int(samples / 2)
wrapped = values[len(values) - margin :] + values + values[:margin]
return np.resize(
np.convolve(wrapped, np.ones(samples), "valid") / samples,
len(values),
)
def target_curves(load_curve, mean_curve):
"""
Given the load curve and moving average mean curve, returns two new curves describing the amount
of charging or discharging which would be needed in each hour for a new curve to match the mean.
"""
deviation_curve = np.array(load_curve) - np.array(mean_curve)
charging_target = [-value if value < 0.0 else 0.0 for value in deviation_curve]
discharging_target = [
min(value, load_curve[index]) if value > 0.0 else 0.0
for (index, value) in enumerate(deviation_curve)
]
return (charging_target, discharging_target)
def optimize_simple(
data,
capacity=5000.0,
volume=50000.0,
):
"""
Runs the optimization. Returns the energy stored in the battery in each hour.
Arguments:
data - The residual load curve
Keyword arguments:
volume - The volume of the battery in MWh.
capacity - The volume of the battery in MW.
"""
# Keeps track of how much energy is in the reserve in each hour.
reserve = [0.0]
cum_sum = 0.0
for load in data:
# Excess energy: Let's charge!
if load <= 0:
# The capacity is sufficient to accomodate the load
if abs(load) <= capacity:
# The there is enough volume in the battery
if abs(load) <= (volume - reserve[-1]):
# Store the current load in the battery
reserve.append(reserve[-1] + abs(load))
# The volume is too small, only charge for as much as the volume allows
else:
# Store part of the current load in the battery, this means it is full
reserve.append(volume)
# The capacity is too small, only charge for as much as the capacity allows
else:
# Set the load to capacity
load = capacity
# The there is enough volume in the battery
if load <= (volume - reserve[-1]):
# Store the current load in the battery
reserve.append(reserve[-1] + load)
# The volume is too small, only charge for as much as the volume allows
else:
# Store part of the current load in the battery, this means it is full
reserve.append(volume)
# Net demand: let's try to discharge!
else:
# The capacity is sufficient to accomodate the load
if load <= capacity:
# The there is enough CHARGE in the battery
if load <= reserve[-1]:
# Extract the current load from the battery
reserve.append(reserve[-1] - load)
# The CHARGE is too small, only dicharge for as much as there is CHARGE
else:
# The battery is empty
reserve.append(0.0)
# The capacity is too small, only charge for as much as the capacity allows
else:
# Set the load to capacity
load = capacity
# The there is enough CHARGE in the battery
if load <= reserve[-1]:
# Extract the current load from the battery
reserve.append(reserve[-1] - load)
# The volume is too small, only charge for as much as the volume allows
else:
# The battery is empty
reserve.append(0.0)
return np.array(reserve[1:])
def optimize(
data,
charging_target,
discharging_target,
capacity=5000.0,
gradual=False,
lookbehind=72,
price_curve=None,
volume=50000.0,
):
"""
Runs the optimization. Returns the energy stored in the battery in each hour.
Arguments:
data - The residual load curve
charging_target - Curve describing the desired charging in each hour
discharging_target - Curve describing the desired discharging in each hour
Keyword arguments:
volume - The volume of the battery in MWh.
capacity - The volume of the battery in MW.
lookbehind - How many hours the algorithm can look into the past to search for the minimum.
price_curve - An optional price curve. If given, the algorithm will optimize for profit using
the price curve rather than flattening the load curve.
"""
optimize_profit = price_curve != None
insort_max = append if optimize_profit else insort_left
# All values for the year converted to a Frame.
if optimize_profit:
frames = [PriceFrame(index, value) for (index, value) in enumerate(price_curve)]
else:
frames = [Frame(index, value) for (index, value) in enumerate(data)]
# Contains all hours where there is room to discharge, sorted in ascending order (highest value
# is last).
charge_frames = sorted(
[frame for frame in frames if discharging_target[frame.index] > 0]
)
# Keeps track of how much energy is in the reserve in each hour.
reserve = np.zeros(len(data))
# Keeps track of how much energy has been assigned in each hour.
# assigned = np.zeros(len(data))
# Convert the charging and discharging targets to ndarray, constrained by the capacity of the
# battery.
charging_target = np.array([min(capacity, value) for value in charging_target])
discharging_target = np.array(
[min(capacity, value) for value in discharging_target]
)
while len(charge_frames) > 0:
max_frame = charge_frames.pop()
# Eventually contains the amount of energy to be charged at the min and discharged at the
# max frames.
available_energy = discharging_target[max_frame.index]
# The frame cannot be discharged any further (no margin between current and target load or
# the battery is already at max capacity).
if available_energy == 0:
continue
# Only charge from an hour whose value is 95% or lower than the max.
desired_low = max_frame.value * 0.95
# Contains the hour within the lookbehind periods with the minimum value.
min_frame = None
for min_index in range(
max_frame.index - 1, max(0, max_frame.index - lookbehind) - 1, -1
):
if reserve[min_index] >= volume:
# We've reached a frame already at max-capacity; therefore neither it nor an earlier
# frame will be able to charge.
break
current = frames[min_index]
# Limit charging by the remaining volume in the frame.
available_energy = min(available_energy, volume - reserve[min_index])
if (
available_energy > 0
and charging_target[current.index] > 0
and (not min_frame or current.value < min_frame.value)
and current.value < desired_low
):
min_frame = current
# We now have either the min frame, or None in which case no optimisation can be performed
# on the max frame.
if min_frame == None:
continue
# Contrain the charge/discharge by the charging target.
available_energy = min(available_energy, charging_target[min_frame.index])
if gradual and not optimize_profit:
# Take the half-way point between the peak and trough, if possible.
upper = max_frame.value
lower = min_frame.value
# Restrict the amount of energy assigned to be one twentieth of the difference between
# the max and min. This allows energy to be assigned more fairly to surrounding hours in
# later iterations.
available_energy = min(available_energy, (upper - lower) / 10)
if available_energy == 0:
continue
# Add the charge and discharge to the reserve.
reserve[(min_frame.index) : (max_frame.index)] += available_energy
min_frame.assign(available_energy)
max_frame.assign(-available_energy)
charging_target[min_frame.index] -= available_energy
discharging_target[max_frame.index] -= available_energy
if discharging_target[max_frame.index] > 0:
insort_max(charge_frames, max_frame)
return reserve
def build_targets(args, loads, capacity):
if args.constraints_path == False:
if not args.price_path:
raise ArgumentError(
"argument --no-constrain: requires that --price also be specified"
)
return (np.repeat(capacity, len(loads)), np.repeat(capacity, len(loads)))
if args.constraints_path:
charge = np.zeros(len(loads))
discharge = np.zeros(len(loads))
for (index, value) in enumerate(read_curve(args.constraints_path)):
if value < 0:
discharge[index] = abs(value)
else:
charge[index] = abs(value)
return (charge, discharge)
# If we are still missing a curve, it will be based on the moving average of the load.
return target_curves(loads, mean_curve(loads))
def run(args):
"""
Runs the optimization using the args provided on the command-line.
"""
capacity = args.capacity or args.volume / 10
loads = read_curve(args.input_path)
prices = read_curve(args.price_path) if args.price_path else None
(charging_target, discharging_target) = build_targets(args, loads, capacity)
if not prices and not args.constraints_path:
# When optimizing towards the mean, the algorithm produces better results when each value is
# converted to the difference between itself and the target. This means that instead of
# matching the absolute max with the absolute mean, we find hours which are furthest from
# the target curves.
relative_loads = np.array(loads) - np.array(mean_curve(loads))
else:
relative_loads = loads
#reserve = optimize_simple(relative_loads)
reserve = optimize(
relative_loads,
charging_target,
discharging_target,
capacity=capacity,
gradual=args.gradual,
lookbehind=args.window,
price_curve=prices,
volume=args.volume,
)
with open(args.output_path, "w") as file:
if prices == None:
file.write("index,residual_load,adjusted_load,charge,soc\n")
else:
file.write("index,residual_load,adjusted_load,charge,soc,price\n")
prev_load = 0.0
for index in range(0, len(loads)):
charge = reserve[index] - prev_load
file.write(str(index))
file.write(",")
file.write(str(loads[index]))
file.write(",")
file.write(str(loads[index] + charge))
file.write(",")
file.write(str(charge))
file.write(",")
file.write(str(reserve[index]))
if prices != None:
file.write(",")
file.write(str(prices[index]))
file.write("\n")
prev_load = reserve[index]
create_plot(loads, reserve, charging_target, discharging_target, capacity)
def create_plot(loads_array, reserve_array, charging_target, discharging_target, capacity):
# Initialise plot
plt.close()
fig, ax = plt.subplots(figsize=(25,10))
plt.subplot(2,1,1)
plt.title("P2P behaviour")
plt.xlabel("time (hours)")
plt.ylabel("MW")
# Creating the adjusted residual load curve
charge = np.diff(reserve_array)
charge = np.insert(charge, 1, 0.0)
adjusted_residual_load = loads_array + charge
#### Plotting curves
plot_min = 0
plot_max = 500
mean_load = np.average(loads_array[plot_min:plot_max])
# Creating cumulative curve (to be compared to SoC)
cumulative_loads = np.cumsum(loads_array[plot_min:plot_max] - mean_load)
smoothed_loads = mean_curve(loads_array)
plt.plot(np.array(range(plot_min,plot_max)), loads_array[plot_min:plot_max], color='g', linestyle='-', linewidth=1.0, label="Residual Load")
plt.plot(np.array(range(plot_min,plot_max)), adjusted_residual_load[plot_min:plot_max], color='g', linestyle='-', linewidth=3.0, label="Adjusted Residual Load")
plt.plot(np.array(range(plot_min,plot_max)), mean_load + charge[plot_min:plot_max], color='b', linestyle='-', linewidth=1.0, label="Charging behavior of battery")
plt.plot(np.array(range(plot_min,plot_max)), smoothed_loads[plot_min:plot_max], color='r', linestyle='--', linewidth=1.0, label="Target curve")
plt.axhline(y=mean_load + capacity, color='r', linestyle='--', label="capacity of battery (mean + cap and mean - cap)")
plt.axhline(y=mean_load, color='k', linestyle='--', label="mean")
plt.axhline(y=mean_load - capacity, color='r', linestyle='--')
plt.legend(bbox_to_anchor=[0.7, 0.6])
plt.subplot(2,1,2)
plt.title("P2P State of Charge")
plt.xlabel("time (hours)")
plt.ylabel("%")
plt.plot(np.array(range(plot_min,plot_max)), 100 * reserve_array[plot_min:plot_max] / max(reserve_array),label="SoC")
plt.plot(np.array(range(plot_min,plot_max)), -100 * cumulative_loads / max(cumulative_loads),label="Cumulative load")
plt.legend(bbox_to_anchor=[0.7, 0.8])
plt.axhline(y=0, color='r', linestyle='--')
plt.axhline(y=100, color='r', linestyle='--')
plt.show()
return
parser = argparse.ArgumentParser(
description="Flatten a residual load curve using a battery"
)
parser.add_argument("input_path", help="Path to residual load CSV file.")
parser.add_argument("output_path", help="Path to output file.")
parser.add_argument(
"-c",
"--capacity",
type=float,
help="The battery capacity in MW; defaults to 10 percent of the volume",
)
parser.add_argument(
"-v",
"--volume",
default=50000.0,
type=float,
help="The battery volume in MWh; defaults to 50_000",
)
parser.add_argument(
"-w",
"--window",
default=72,
type=int,
help="The number of hours after charging when energy must be discharged; defaults to 72",
)
# Profit optimization arguments.
constraint_group = parser.add_mutually_exclusive_group()
constraint_group.add_argument(
"--constrain",
dest="constraints_path",
help="Path to an optional constraint CSV",
)
constraint_group.add_argument(
"--no-constrain",
dest="constraints_path",
action="store_false",
help="Disables constraints; useful only when optimizing for profit",
)
# Gradual flattening.
gradual_group = parser.add_mutually_exclusive_group()
gradual_group.add_argument(
"--gradual",
action="store_true",
help="Use an iterative process to flatten the curve gradually (this is slow)",
)
gradual_group.add_argument(
"--price",
dest="price_path",
help="Path to an optional price curve; uses profit optimization instead of load flattening",
)
try:
run(parser.parse_args())
except ArgumentError as e:
parser.print_usage()
print("optimize.py: error:", e)