-
Notifications
You must be signed in to change notification settings - Fork 0
/
db_handler_mysql.py
133 lines (112 loc) · 4.78 KB
/
db_handler_mysql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
import json
from sqlalchemy import *
from sqlalchemy.exc import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv
from sqlalchemy_utils import database_exists, create_database
from logger import setup_logging
dotenv_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '.env')
load_dotenv(dotenv_path)
database = os.environ.get('fastapi_db')
user = os.environ.get('fastapi_db_user')
passwd = os.environ.get('fastapi_db_user_pass')
host = os.environ.get('fastapi_db_host')
port = os.environ.get('fastapi_db_port')
Base = declarative_base()
class ProbeData(Base):
__tablename__ = 'ProbeData'
SSID = Column('SSID', VARCHAR(length=255), primary_key=True, unique=False)
MAC = Column('MAC', VARCHAR(length=255), unique=False)
def __repr__(self):
return f"<ProbeData(SSID={self.SSID}, MAC={self.MAC})>"
class WiFiData(Base):
__tablename__ = 'WiFiData'
SSID = Column('SSID', VARCHAR(length=255), primary_key=True)
RSSI = Column('RSSI', INTEGER, nullable=True, unique=False)
MAC = Column('MAC', VARCHAR(length=255), nullable=False, unique=False)
Encryptiontype = Column('Encryptiontype', VARCHAR(
length=255), nullable=True, unique=False)
__table_args__ = (UniqueConstraint('SSID'),)
def __repr__(self):
return f"<WiFiData(SSID={self.SSID}, RSSI={self.RSSI}, MAC={self.MAC}, Encryptiontype={self.Encryptiontype})>"
class database_handler():
def __init__(self):
self.config = {
'user': user,
'passwd': passwd,
'host': host,
'port': port
}
self.db = database
self.engine = create_engine(
f"mysql+pymysql://{self.config['user']}:{self.config['passwd']}@{self.config['host']}:{self.config['port']}/{self.db}"
)
self.sqlsession = sessionmaker(bind=self.engine)
self.logger = setup_logging("db_handler")
self.create_database()
def return_url(self):
return f"mysql+pymysql://{self.config['user']}:{self.config['passwd']}@{self.config['host']}:{self.config['port']}/{self.db}"
def return_metadata(self):
return Base.metadata
def create_database(self):
if not database_exists(self.engine.url):
self.logger.log("creating database")
create_database(self.engine.url)
def search_wifi(self, ssid: str):
session = self.sqlsession()
result = session.query(WiFiData).filter_by(SSID=ssid).first()
self.logger.debug(f"Fetching this WiFiData from database: {id}")
if not result:
return False
else:
return {"SSID": result.SSID, "RSSI": result.RSSI, "MAC": result.MAC, "Encryptiontype": result.Encryptiontype}
def get_wifi_data(self):
data_dict = {}
session = self.sqlsession()
result = session.execute(select([WiFiData]))
for row in result:
self.logger.debug(
f"Fetched this WiFiData: {row.SSID} with this mac: {row.MAC}")
data_dict[row.SSID] = {
"RSSI": row.RSSI, "MAC": row.MAC, "Encryptiontype": row.Encryptiontype}
return data_dict
def add_WiFiData(self, scanData):
session = self.sqlsession()
try:
for data in scanData["wifilist"]:
new_WiFiData = WiFiData(SSID=data["SSID"],
RSSI=data["RSSI"], MAC=data["MAC"], Encryptiontype=data["Encryptiontype"])
session.add(new_WiFiData)
session.commit()
except IntegrityError as e:
self.logger.exception(f"Duplicate keys provided: {e}")
return False, "Duplicate keys provided"
return True, None
def get_probe_data(self):
data_dict = {}
session = self.sqlsession()
result = session.execute(select([ProbeData]))
for row in result:
if row.MAC not in data_dict:
data_dict[row.MAC] = []
self.logger.debug(
f"Fetched this probedata: {row.SSID} with this mac: {row.MAC}"
)
if row.SSID not in data_dict[row.MAC]:
data_dict[row.MAC].append(row.SSID)
return data_dict
def add_ProbeData(self, probeddata):
session = self.sqlsession()
try:
for data in probeddata["ProbeList"]:
if data["SSID"] != "":
self.logger.debug("Adding probedata to database")
new_probedata = ProbeData(
SSID=data["SSID"], MAC=data["MAC"])
session.add(new_probedata)
session.commit()
except IntegrityError as e:
self.logger.exception(f"Duplicate somethin, this shouldn't occurr")
return True, None