-
Notifications
You must be signed in to change notification settings - Fork 0
/
TEG_profiler_cloudv2.py
364 lines (263 loc) · 11.8 KB
/
TEG_profiler_cloudv2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
################################################
#
# TEG profiler script (cloud+local)
#
# University of Virginia
# Author: Victor Ariel Leal Sobral
#
################################################
import board
import digitalio
import os
import threading
import csv
import adafruit_ads1x15.ads1015 as ADS
from adafruit_ads1x15.analog_in import AnalogIn
from adafruit_bus_device.i2c_device import I2CDevice
import mcp9600
import time
from datetime import datetime
import paho.mqtt.client as mqtt
import json
import logging
#import math
###########
#MQTT functions
# # Uncomment to print MQTT log details
# def on_log(client, userdata, level, buf):
# print("<< log: "+buf+" >>")
# # Uncomment to print received message details
# def on_message(client, userdata, message):
# print("\nmessage received: ", str(message.payload.decode("utf-8")))
# print("message topic = ", message.topic)
# print("message qos = ", message.qos)
# print("message retain flag = ", message.retain)
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: report the outcome of a connection attempt.

    A return code of 0 sets ``client.connected_flag`` to True and reports
    success; any other code is printed and logged as a bad connection and
    the flag is left untouched.

    Args:
        client: client object the callback was registered on.
        userdata: unused.
        flags: unused.
        rc: connection return code, 0 meaning success.
    """
    now = datetime.utcnow().isoformat()
    if rc == 0:
        client.connected_flag = True
        print("Successfully connected")
        logging.info("[MQTT]: Successfully connected at %s", now)
        return
    print("Bad connection, returned code = ", rc)
    logging.error("[MQTT]: Bad connection, returned code = %s at %s", rc, now)
def on_disconnect(client, userdata, flags, rc=None):
    """MQTT disconnect callback: print and log the disconnection result code.

    The callback may be invoked with three positional arguments
    ``(client, userdata, rc)``; in that case the result code arrives in the
    ``flags`` slot. Previously ``rc`` then silently kept its default and the
    logged code was always 0. When ``rc`` is not supplied, the value received
    in ``flags`` is now reported instead.

    Args:
        client: client object the callback was registered on (unused).
        userdata: unused.
        flags: result code when called with three arguments, otherwise unused.
        rc: result code when called with four arguments.
    """
    if rc is None:
        # Three-argument call: the third positional value is the result code.
        rc = flags
    print("Disconnected, result code = ", str(rc))
    logging.info("[MQTT]: Disconnected, result code = %s at %s", rc, datetime.utcnow().isoformat())
###########
# MQTT publishing function
def cloud_upload(APP_ID,BROKER_ADDRESS, data_list):
    """Publish one batch of measurements to the MQTT broker, one message per row.

    Connects to the broker, waits up to 10 seconds for ``on_connect`` to set
    ``client.connected_flag``, then publishes every row of ``data_list`` as a
    JSON message on the "linklab/teg_eh_profiler" topic, 0.2 s apart.

    Args:
        APP_ID: application identifier; used as the MQTT client id and as the
            "app_id" field of every message.
        BROKER_ADDRESS: broker address passed to ``client.connect``.
        data_list: rows indexed as 0 = timestamp, 1 = voltage_chan_OFF,
            2..5 = voltage_chan_0..3, 6 = temperature_amb, 7 = temperature_hot.

    Returns:
        None, both on success and when the connection fails or times out
        (failures are printed and logged).
    """
    print("... starting cloud upload thread")
    topic = "linklab/teg_eh_profiler"
    # (payload key, display name, unit), ordered like columns 1..7 of a data row
    channels = (
        ("voltage_chan_OFF", "High Impedance", "V"),
        ("voltage_chan_0", "Channel 0", "V"),
        ("voltage_chan_1", "Channel 1", "V"),
        ("voltage_chan_2", "Channel 2", "V"),
        ("voltage_chan_3", "Channel 3", "V"),
        ("temperature_amb", "Ambient temperature", "°C"),
        ("temperature_hot", "Hot side temperature", "°C"),
    )
    # Standard message with fields expected by the MQTT broker; the placeholder
    # values are overwritten for every row before publishing
    message = {
        "app_id": APP_ID,
        "counter": 0,
        "payload_fields": {
            key: {"displayName": display_name, "unit": unit, "value": -999.99}
            for key, display_name, unit in channels
        },
        "metadata": {
            "time": "2020-12-01T12:00:00.000000000Z"
        },
    }
    payload_fields = message["payload_fields"]

    client = mqtt.Client(APP_ID) # Creates a new MQTT client instance
    # The flag must exist before it is polled below; on_connect sets it to True
    client.connected_flag = False
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    try:
        client.connect(BROKER_ADDRESS) # Connects to MQTT broker
    except OSError as e:
        # Unreachable/refusing broker: report it instead of dying with a traceback
        print("connection failure!")
        logging.error("[MQTT]: Cloud upload thread exited due to connection error at %s: %s",
                      datetime.utcnow().isoformat(), e)
        return None
    # The network loop has to run while waiting, otherwise the broker's answer
    # is never processed and on_connect is never called
    client.loop_start()
    timeout = 10 # Checks connection status for 10 seconds
    while not client.connected_flag:
        time.sleep(1)
        timeout -= 1
        if timeout <= 0:
            print("connection timeout failure!")
            logging.error("[MQTT]: Cloud upload thread exited due to connection timeout at "+str(datetime.utcnow().isoformat()))
            # Stop the background loop so it does not outlive this upload attempt
            client.loop_stop()
            client.disconnect()
            return None
    client.subscribe(topic, qos=0) # Subscribes to linklab/teg_eh_profiler topic
    for counter, row in enumerate(data_list):
        message['metadata']['time'] = row[0]
        for column, (key, _display_name, _unit) in enumerate(channels, start=1):
            payload_fields[key]['value'] = row[column]
        message['counter'] = counter
        client.publish(topic, json.dumps(message), qos=0)
        time.sleep(0.2)
    client.loop_stop()
    client.disconnect() # disconnect
    print("cloud upload thread complete!")
    return None
###########
# CSV writing function
def file_writer(file_name, directory, header, data_list):
    """Write one batch of measurements to a comma-separated file.

    The file ``directory/file_name`` is created (or overwritten) with
    ``header`` as its first row followed by one row per entry of ``data_list``.

    Args:
        file_name: name of the CSV file to create.
        directory: existing folder the file is written into.
        header: sequence of column names.
        data_list: iterable of rows, each a sequence of values.

    Returns:
        None.
    """
    print("... starting local storage thread")
    # newline='' leaves line endings to the csv module, as its documentation requires
    with open(directory+'/'+file_name, 'w', newline='') as file:
        csvwriter = csv.writer(file, delimiter=',')
        csvwriter.writerow(header)
        csvwriter.writerows(data_list)
    print("local storage thread complete!")
    return None
###########
# Local data storage and logging file configurations
directory = '/home/pi/Desktop/shared/data'
# Created before logging is configured: the log file lives in the parent folder
# of `directory`, and logging.basicConfig fails if that folder does not exist yet
os.makedirs(directory, exist_ok=True)
logging.basicConfig(filename='/home/pi/Desktop/shared/TEG_profiler.log', level=logging.INFO) # formatting log file
logging.info('===================================================================')
logging.info('[Events]: TEG profiler cloud script started at '+str(datetime.utcnow().isoformat()))
batch_size = 1800 # Equivalent of 15 minutes at one sample every 0.5 s (2 Hz)
header = ['Timestamp', 'voltage_chan_OFF', 'voltage_chan_0', 'voltage_chan_1', 'voltage_chan_2', 'voltage_chan_3', 'temperature_amb', 'temperature_hot']
# Buffer with one row per sample of a batch and one slot per header column
data_list = [[None]*len(header) for _ in range(batch_size)]
###########
# Profiler configuration settings
# Reads application info file with application ID and MQTT broker address
try:
    print("Reading application info file...")
    with open("/home/pi/Desktop/Application_info.txt") as json_appInfo:
        APP_INFO = json.load(json_appInfo)
except Exception as e:
    print("Application info file could not be loaded")
    logging.error("[FileIO]: Application info file could not be loaded")
    # %s formatting: concatenating the exception object to a str raises TypeError
    # and would hide the real error re-raised below
    logging.error("[FileIO]: %s", e)
    raise
# IP address of the MQTT broker; hard-coded, APP_INFO["BROKER_ADDRESS"] is currently not used
BROKER_ADDRESS = '34.230.161.172'
APP_ID = APP_INFO["APP_ID"] # how this application will be identified in the cloud database
# in seconds (original note: "max 0.1" - presumably the shortest usable period, TODO confirm)
SAMPLING_PERIOD = 0.5
##########
# Main code
# # Uncomment to start script after pushing GPIO17 button
# button = digitalio.DigitalInOut(board.D17)
# button.direction = digitalio.Direction.INPUT
#
# print("press button to start...")
#
# while True:
# if button.value == False:
# break
#
print("Starting I2C devices...")
# Each device is set up in its own try block: a failure is logged and the
# script carries on with the remaining devices.
# Exceptions are logged with %s formatting because concatenating an exception
# object to a str raises TypeError inside the handler.
try:
    i2c = board.I2C()
    logging.info("[I2C]: raspberry pi board I2C interface was sucesfully initialized")
except Exception as e:
    logging.error("[I2C]: raspberry pi board I2C interface initialization error")
    logging.error("[I2C]: %s", e)
try:
    pca = I2CDevice(i2c, 0x41) # creates the PCA GPIO controller at address 0x41
    pca.write(bytes([0x03,0x00])) # configure GPIO as output
    logging.info("[I2C]: PCA GPIO controller was sucesfully initialized and configured")
except Exception as e:
    logging.error("[I2C]: PCA GPIO controller initialization error")
    logging.error("[I2C]: %s", e)
try:
    ads = ADS.ADS1015(i2c) # creates the ADS analog to digital converter at default address (0x48)
    ads.gain = 8 # configures PGA gain to 8, resulting on range of +-0.512V (valid configurations: 2/3, 1, 2, 4, 8, 16)
    ads.mode = ADS.Mode.CONTINUOUS # converts at max speed, reads most recent conversion through I2C
    chan = AnalogIn(ads, ADS.P0) # configures ADS to read analog values from channel 0
    logging.info("[I2C]: ADS analog to digital converter was sucesfully initialized and configured")
except Exception as e:
    logging.error("[I2C]: ADS analog to digital converter initialization error")
    logging.error("[I2C]: %s", e)
try:
    mcp = mcp9600.MCP9600(i2c_addr=0x60) # creates the MCP thermocouple amplifier at address 0x60, default config for K-type thermocouple
    logging.info("[I2C]: MCP thermocouple amplifier was sucesfully initialized and configured")
except Exception as e:
    # Message names the MCP device (it used to repeat the ADS error text)
    logging.error("[I2C]: MCP thermocouple amplifier initialization error")
    logging.error("[I2C]: %s", e)
# Configuring interruption button on GPIO 17 (hold for 1 SAMPLING_PERIOD to stop)
button = digitalio.DigitalInOut(board.D17)
button.direction = digitalio.Direction.INPUT
# One entry per step of the I-V scan: (value written to PCA register 0x01, data_list column)
SCAN_STEPS = (
    (0x00, 1), # set all transistor switches off -> TEG open circuit voltage
    (0x01, 2), # open only channel zero switch (0.1 ohm channel)
    (0x02, 3), # open only channel one switch (0.47 ohm channel)
    (0x04, 4), # open only channel two switch (1.5 ohm channel)
    (0x08, 5), # open only channel three switch (4.7 ohm channel)
)
COUNTER = 0
print("Starting acquisition...")
while True:
    timestamp = datetime.utcnow()
    try:
        for step, (switch_value, column) in enumerate(SCAN_STEPS):
            if step:
                time.sleep(0.005) # pause between one reading and the next switch change
            pca.write(bytes([0x01, switch_value]))
            time.sleep(0.010) # wait after switching before reading the ADC
            data_list[COUNTER][column] = chan.voltage # read TEG output voltage
    except Exception as e:
        logging.error("[I2C]: TEG I-V curve scan failed at COUNTER = "+str(COUNTER)+", timestamp = "+str(datetime.utcnow().isoformat()))
        # %s formatting: concatenating the exception object to a str raises TypeError
        logging.error("[I2C]: %s", e)
    try:
        data_list[COUNTER][6] = float(mcp.get_cold_junction_temperature()) # measure ambient temperature (cold junction)
        data_list[COUNTER][7] = float(mcp.get_hot_junction_temperature()) # measure probe temperature (hot junction)
    except Exception as e:
        logging.error("[I2C]: MCP thermocouple amplifier measurements have failed at COUNTER = "+str(COUNTER)+", timestamp = "+str(datetime.utcnow().isoformat()))
        logging.error("[I2C]: %s", e)
    data_list[COUNTER][0] = timestamp.isoformat()+'Z'
    COUNTER += 1
    if COUNTER == batch_size:
        file_name = timestamp.strftime('%Y%m%d_%H_%M')+'.csv'
        # Hand the filled buffer to the worker threads and continue acquiring into
        # a fresh one: the threads never see rows being overwritten, and a failed
        # reading stays None instead of keeping the previous batch's value
        full_batch = data_list
        data_list = [[None]*len(header) for _ in range(batch_size)]
        file_write_thread = threading.Thread(target=file_writer, args=(file_name, directory, header, full_batch))
        file_write_thread.start()
        cloud_upload_thread = threading.Thread(target=cloud_upload, args=(APP_ID, BROKER_ADDRESS, full_batch))
        cloud_upload_thread.start()
        COUNTER = 0
        print(str(batch_size)+" messages sucessfully acquired and local store and upload threads started!")
        logging.info("[Events]: "+str(batch_size)+" messages sucessfully acquired and local store and upload threads started at "+str(timestamp))
    # Hold button on GPIO17 to exit script
    # NOTE: samples of a batch that is still incomplete at this point are not stored
    if not button.value:
        break
    # total_seconds() keeps whole seconds too; .microseconds alone is only the
    # sub-second part and under-reports iterations that take longer than 1 s
    elapsed = (datetime.utcnow() - timestamp).total_seconds()
    time.sleep(max(SAMPLING_PERIOD - elapsed, 0.01))
print("TEG profiler cloud script interrupted")
logging.info('[Events]: TEG profiler cloud script interrupted at '+str(datetime.utcnow().isoformat()))
print("Main data acquisition script complete, wait for local store and cloud upload threads to finish")