-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
368 lines (312 loc) · 14.1 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
"""High level API for charging procedures."""
import os
from datetime import datetime
import csv
import const
import timers
import charger
import access
# window = evsGUI.window
# def printMsg(text=''):
# window['-MESSAGE-'].update(text)
# todo: !! put all functions in a new class Utils
# todo: PV power must remain for x minutes before decision
# todo: night charging solution (charge < x%; manual intervention via remotecontrol ...)
charger_ip = const.C_CHARGER_WIFI_URL # TODO automatic search charger
charge = charger.Charger(charger_ip, const.C_CHARGER_API_VERSION)
class SysData: # class variables as kind of (global) C structure
"""All data used for signal processing and display."""
chargerAPIversion = 0
carPlugged = False
batteryLevel = 0 # % from Renault server car API
batteryLimit = 0 # %
solarPower = 0 # kw from Solar Inverter Cloud
pvToGrid = 0 # kW from Solar Inverter Cloud
chargePower = 0.0 # kW from go-eCharger Wallbox
currentL1 = 0 # A from go-eCharger Wallbox
voltageL1 = 0 # V from go-eCharger Wallbox
chargeActive = False # from go-eCharger
measuredPhases = 0 # 1 | 3 number of measuredPhases
actPhases = 0 # actual charger psm setting (C_CHARGER_x_PHASE)
actCurrSet = 0 # actual charger setting
reqPhases = 0 # requested phase by user or PV situation
calcPvCurrent_1P = 0 # calculated 1 phase current, limited to max. setting
calcPvCurrent_3P = 0 # calculated 3 phase current, if pvToGrid > minimum 3 phase current
setCurrent = 0
chargerState = "?"
carState = "---"
pvHoldTimer = timers.EcTimer()
phaseHoldTimer = timers.EcTimer()
scanTimer = timers.EcTimer()
pvScanTimer = timers.EcTimer()
carScanTimer = timers.EcTimer()
carErrorCounter = 0 # increment if error during car data read
pvError = 0
chargerError = 0
class ChargeModes: # kind of enum
"""Constants defining system state."""
UNPLUGGED = 0
IDLE = 1
PV_INIT = 2
PV_EXEC = 3
FORCE_REQUEST = 4
FORCED = 5
STOPPED = 6
EXTERN = 7
LIMIT_REACHED = 8
CAR_ERROR = 9
CHARGER_ERROR = 10
PV_ERROR = 11
def processChargerData(sysData=SysData):
"""
Converts charger data into sysData format.
:param sysData: data record similar to C structure
:return: updated sysData
"""
chargerData = charge.get_charger_data()
print('Charger Data:', chargerData)
if chargerData['statusCode'] != 200:
sysData.chargerError = chargerData['statusCode']
return sysData
else:
sysData.chargerError = 0
sysData.chargerAPIversion = chargerData['apiVer']
sysData.carPlugged = False
sysData.chargePower = 0
if int(chargerData['car']) > 1: # car is plugged
sysData.carPlugged = True
if sysData.chargerAPIversion == 1:
sysData.chargePower = chargerData['nrg'][11] / 100 # original value is in 10W
sysData.currentL1 = chargerData['nrg'][4] / 10 # original value is in 0.1A
if chargerData['nrg'][4] > 10:
sysData.chargeActive = True
else:
sysData.chargeActive = False
else: # API V2
sysData.chargePower = chargerData['nrg'][11] / 1000 # original value is in W
sysData.actPhases = chargerData['psm']
f_current = "{:.1f}".format(chargerData['nrg'][4] ) # original value is float in 1A
sysData.currentL1 = f_current # 2 decimal digits
if chargerData['frc'] == 2:
sysData.chargeActive = True # True while charging
else:
sysData.chargeActive = False
sysData.voltageL1 = chargerData['nrg'][0]
if chargerData['nrg'][6] > 1: # current on L3, if charging with 3 measuredPhases
sysData.measuredPhases = 3
else:
sysData.measuredPhases = 1
sysData.actCurrSet = chargerData['amp']
else: # car is unplugged
sysData.chargeActive = False
sysData.chargerState = const.C_CHARGER_STATUS_TEXT[int(chargerData['car'])]
return sysData
def calcChargeCurrent(sysData, chargeMode, maxCurrent_1P, minCurrent_3P):
"""
Calculate optimal charge current depending on solar power
:param sysData: data record similar to C structure
:param chargeMode: constant from class ChargeModes
:param maxCurrent_1P: [A] upper limit for 1 phase
:param minCurrent_3P: [A] minimum used for switch over to 3 phases
:return sysData: updatad data record
"""
if chargeMode == ChargeModes.UNPLUGGED:
return sysData
calc_free_current = 0
# todo: correct for external charge: calc depending on chargeMode AND pvToGrid
if chargeMode == ChargeModes.IDLE:
newPower = sysData.pvToGrid - const.C_PV_MIN_REMAIN
else:
newPower = sysData.chargePower + sysData.pvToGrid - const.C_PV_MIN_REMAIN # calculation in kW
if sysData.voltageL1 > 0:
calc_free_current = newPower * 1000 / sysData.voltageL1 # calc_free_current ersetzen durch Power?
# TEST!!! calc_free_current = 9
sysData.calcPvCurrent_3P = 0
sysData.calcPvCurrent_1P = 0
sysData.setCurrent = 0
if calc_free_current > maxCurrent_1P: #
sysData.calcPvCurrent_1P = int(maxCurrent_1P) # limit 1 phase current
else:
sysData.calcPvCurrent_1P = int(calc_free_current)
if calc_free_current >= (minCurrent_3P * 3): # switch to 3 phase request
sysData.calcPvCurrent_3P = int(calc_free_current / 3)
sysData.reqPhases = const.C_CHARGER_3_PHASES
sysData.setCurrent = sysData.calcPvCurrent_3P
else:
sysData.reqPhases = const.C_CHARGER_1_PHASE
sysData.setCurrent = sysData.calcPvCurrent_1P
return sysData
# noinspection PyPep8
def evalChargeMode(chargeMode, sysData, settings):
""" State machine depending on realtime data and user intervention
:param chargeMode: enum like construct defined as class ChargeModes
:param sysData: C structure like record defined as class sysData
:param settings: dictionary from PV_Manager.json file
:return: processed charge mode
"""
sysData.scanTimer.set(const.C_SYS_IDLE_SCAN_TIME) # sets the cyclic timing
new_chargeMode = chargeMode # stay in mode if nothing happens
pvSettings = settings['pv']
manualSettings = settings['manual']
pvAllow3phases = pvSettings['allow_3_phases']
######## Cyclic Suppport Functions
#### Get Charger data
sysData = processChargerData(sysData)
if sysData.chargerError == 0:
if not sysData.carPlugged:
new_chargeMode = ChargeModes.UNPLUGGED
else:
chargeMode = ChargeModes.CHARGER_ERROR # force error state
#### Read battery level from car data
# noinspection PyPep8
# if True: # read data even if car is unplugged
if sysData.carPlugged == True:
if sysData.carScanTimer.read() == 0:
sysData.carScanTimer.set(const.C_SYS_CAR_CLOCK)
carData = access.ec_GetCarData()
print('Car data:', carData)
sysData.batteryLevel = carData['batteryLevel']
if sysData.batteryLevel < 0:
sysData.carErrorCounter = sysData.carErrorCounter + 1
print('ERROR Reading Car Data, count =', sysData.carErrorCounter)
else:
sysData.carErrorCounter = 0
#### Read Solar data and charge decision
if sysData.pvScanTimer.read() == 0:
sysData.pvScanTimer.set(const.C_SYS_PV_CLOCK)
pvData = access.ec_GetPVData(tout=20)
if pvData['statusCode'] == 200:
sysData.pvError = 0
sysData.pvToGrid = round(pvData['PowerToGrid'], 1)
sysData.solarPower = round(pvData['pvPower'], 1)
sysData = calcChargeCurrent(sysData, chargeMode, pvSettings['max_1_Ph_current'],
pvSettings['min_3_Ph_current'])
if chargeMode == ChargeModes.IDLE:
if sysData.calcPvCurrent_1P >= const.C_CHARGER_MIN_CURRENT:
if sysData.batteryLevel > 0: # no error condition
if sysData.batteryLevel < sysData.batteryLimit:
chargeMode = ChargeModes.PV_EXEC # set new state
else:
sysData.pvError = sysData.pvError + 1
if chargeMode == ChargeModes.PV_EXEC:
if sysData.carPlugged:
new_chargeMode = chargeMode # stay in mode
else:
new_chargeMode = ChargeModes.UNPLUGGED
#### charge end criteria
if sysData.batteryLevel >= sysData.batteryLimit \
or sysData.calcPvCurrent_1P < const.C_CHARGER_MIN_CURRENT \
or new_chargeMode == ChargeModes.UNPLUGGED:
charge.stop_charging()
charge.set_phase(const.C_CHARGER_1_PHASE)
sysData.chargeActive = False
new_chargeMode = ChargeModes.IDLE
#### charging process control
else:
if sysData.actPhases == sysData.reqPhases:
if sysData.actCurrSet != sysData.setCurrent:
charge.set_current(sysData.setCurrent)
if not sysData.chargeActive:
pv_hold = sysData.pvHoldTimer.read()
if pv_hold == 0:
charge.start_charging()
sysData.pvHoldTimer.set(const.C_SYS_MIN_PV_HOLD_TIME)
else:
print("PV hold time active ", pv_hold, "sec")
else: # initiate phase switch
if pvAllow3phases:
if sysData.phaseHoldTimer.read() == 0:
sysData.phaseHoldTimer.set(const.C_SYS_MIN_PHASE_HOLD_TIME)
charge.stop_charging()
charge.set_phase(sysData.reqPhases)
else:
print('waiting for phase switch')
#### handle manual start
elif chargeMode == ChargeModes.FORCE_REQUEST:
if sysData.batteryLevel < manualSettings['chargeLimit']:
if manualSettings['3_phases']:
sysData.reqPhases = const.C_CHARGER_3_PHASES
else:
sysData.reqPhases = const.C_CHARGER_1_PHASE
if sysData.phaseHoldTimer.read() == 0:
if sysData.actPhases != sysData.reqPhases: # phase switch requested?
charge.set_phase(sysData.reqPhases)
sysData.phaseHoldTimer.set(const.C_SYS_MIN_PHASE_HOLD_TIME)
charge.set_current(manualSettings['currentSet'])
charge.start_charging()
new_chargeMode = ChargeModes.FORCED
else:
print('Phase change request -> wait, hold time', sysData.phaseHoldTimer.read())
else:
new_chargeMode = ChargeModes.IDLE
sysData.chargeActive = False
elif chargeMode == ChargeModes.FORCED:
if sysData.batteryLevel >= manualSettings['chargeLimit']:
new_chargeMode = ChargeModes.IDLE
print('CHARGE OFF, manual limit reached')
charge.stop_charging()
sysData.chargeActive = False
charge.set_phase(const.C_CHARGER_1_PHASE)
elif chargeMode == ChargeModes.STOPPED:
if sysData.phaseHoldTimer.read() == 0:
new_chargeMode = ChargeModes.IDLE
print('CHARGE STOPPED by user')
charge.stop_charging()
sysData.chargeActive = False
charge.set_phase(const.C_CHARGER_1_PHASE)
else:
print('phaseHoldTimer waiting:', sysData.phaseHoldTimer.read())
elif chargeMode == ChargeModes.IDLE:
if sysData.chargePower > 1:
new_chargeMode = ChargeModes.EXTERN
elif chargeMode == ChargeModes.EXTERN:
if sysData.chargePower < 1:
charge.stop_charging(authenticate=1) # despite external stop, restore authenticate
new_chargeMode = ChargeModes.IDLE
elif chargeMode == ChargeModes.UNPLUGGED:
if sysData.carPlugged:
new_chargeMode = ChargeModes.IDLE
if sysData.pvError >= 2: # continue charging with old data below this limit
if chargeMode == ChargeModes.PV_EXEC:
charge.stop_charging()
new_chargeMode = ChargeModes.IDLE
if sysData.carErrorCounter >= 2:
if chargeMode != ChargeModes.EXTERN:
# sysData.carErrorCounter = 0
if chargeMode != ChargeModes.IDLE:
charge.stop_charging()
new_chargeMode = ChargeModes.IDLE
if sysData.chargerError:
# charge.stop_charging() # try stopping charger anyway
new_chargeMode = ChargeModes.IDLE
return new_chargeMode
def writeLog(sysData, strMessage="", strMode="", logpath=const.C_LOG_PATH):
"""
Write logfile on event or mode change.
:param strMode:
:param strMessage: free text
:param sysData: instance of class SysData
:param logpath: full path including file nane
:return: characters written
"""
chars = 0
now = datetime.now()
date = now.date()
strDate = date.strftime('%Y-%m-%d')
strTime = now.strftime('%H:%M:%S')
logDict = {"Date": strDate, "Time": strTime, "CarState": sysData.carState, "Actual Mode": strMode,
"Message": strMessage,
"BattLevel": sysData.batteryLevel, "Batt Limit": sysData.batteryLimit, "Pwr2Grid": sysData.pvToGrid,
"Charge Power": sysData.chargePower, "Phases": sysData.actPhases, "Charge Active": sysData.chargeActive}
if not os.path.isfile(logpath):
header = []
for keys in logDict:
header.append(keys)
logfile = open(logpath, 'w')
writer = csv.DictWriter(logfile, header)
chars = writer.writeheader()
logfile = open(logpath, 'a', newline='')
writer = csv.DictWriter(logfile, logDict)
chars = writer.writerow(logDict)
logfile.close()
return chars