-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.py
83 lines (70 loc) · 2.5 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# -*- coding: utf-8 -*-
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, mapper
import sqlalchemy.exc
import tables
class DB(object):
def __init__(self):
Session = sessionmaker(bind=tables.engine)
self.session = Session()
def __row2dict(self, row):
"""transform sqlalchemy row proxy into an dict"""
if not row:
raise ValueError("None isn't a valid value")
d = {}
for column in row.__table__.columns:
d[column.name] = getattr(row, column.name)
return d
def __dictlist(self, res):
"""transform table object into a list of dicts"""
l = []
for row in res:
l.append(self.__row2dict(row))
return l
def addNode(self, node):
try:
self.session.add(tables.Nodes(**node))
self.session.commit()
except:
self.session.rollback()
raise
def getNode(self, token):
try:
return self.__row2dict(self.session.query(tables.Nodes).filter(tables.Nodes.token == token).one())
except:
raise
def getNodeList(self):
try:
return self.__dictlist(self.session.query(tables.Nodes).all())
except:
raise
def updateNode(self, node):
try:
self.session.query(tables.Nodes).filter(tables.Nodes.token == node['token']).update(node)
self.session.commit()
except:
raise
def __checkValue(self, column, value, token):
if token:
if self.session.query(tables.Nodes).filter(tables.Nodes.token != token).filter(column == value).first():
return True
else:
return False
else:
if self.session.query(tables.Nodes).filter(column == value).first():
return True
else:
return False
def checkHostname(self, hostname, token):
return self.__checkValue(tables.Nodes.hostname, hostname, token)
def checkMAC(self, mac, token):
return self.__checkValue(tables.Nodes.mac, mac, token)
def checkKey(self, key, token):
return self.__checkValue(tables.Nodes.key, key, token)
def checkToken(self, token):
if self.session.query(tables.Nodes).filter(tables.Nodes.token == token).first():
return True
else:
return False
def getNodeMailMap(self):
return self.session.query(tables.Nodes.hostname, tables.Nodes.email).all()