-
Notifications
You must be signed in to change notification settings - Fork 0
/
reporter.py
78 lines (61 loc) · 2.47 KB
/
reporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#!/usr/bin/env python
import pika
import time
import json
import boto3
import requests
from dotenv import find_dotenv, load_dotenv
import os
import shutil
load_dotenv(find_dotenv())
nodedata = { "node": os.environ.get("NODEName"), "cores": os.environ.get("CORES"), "key": os.environ.get("NODEUpdateKey") }
credentials = pika.PlainCredentials(
os.environ.get('RBQUser'), os.environ.get("RBQPass"))
connection = None
channel = None
SAVEPATH = os.environ.get('TMPPATH') + '/maxfield-worker-results'
s3_sess = boto3.Session(region_name="nl-ams")
s3_client = s3_sess.client('s3', endpoint_url=os.environ.get('S3URL'),
aws_access_key_id=os.environ.get('S3ACCESSKEY'), aws_secret_access_key=os.environ.get('S3SECRETKEY'))
def update_node():
headers = {'Content-Type': 'application/json'}
requests.post(url='https://maxfield-api.nia.ac.cn/update_node', headers=headers, data=json.dumps(nodedata))
def upload_dir(path, id):
for root, _, files in os.walk(path):
for file in files:
s3_client.upload_file(os.path.join(
root, file), os.environ.get("S3BUCKET"),
id + "-" + file, ExtraArgs={'ACL':'public-read'})
def init_ch():
global connection
global channel
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=os.environ.get("RBQHost"), virtual_host=os.environ.get("RBQBase"), credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='maxfield-task', durable=True)
def start_loop():
global channel
global connection
init_ch()
for _dir in os.listdir(SAVEPATH):
if _dir.endswith(".json"):
print("[MaxFieldWorker] Process result " + _dir)
with open(SAVEPATH + '/' + _dir) as loadf:
loadjson = json.load(loadf)
channel.basic_publish(
exchange='',
body=json.dumps({
"node": os.environ.get("NODEName"),
"status": loadjson["status"]
}), routing_key=loadjson["routing_key"], properties=pika.BasicProperties(correlation_id=loadjson["correlation_id"]))
upload_dir(SAVEPATH + '/' + loadjson["correlation_id"], loadjson["correlation_id"])
os.remove(SAVEPATH + '/' + _dir)
connection.close()
time.sleep(30)
if __name__ == "__main__":
while True:
update_node()
try:
start_loop()
except:
start_loop()