-
Notifications
You must be signed in to change notification settings - Fork 0
/
db_connection.py
133 lines (92 loc) · 4.26 KB
/
db_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#---------------------------------------------------------------------------------------------------#
# File name: db_connection.py #
# Author: Chrissi2802 #
# Created on: 19.12.2022 #
#---------------------------------------------------------------------------------------------------#
# This file provides classes to connect to and communicate with a database.
# See the docstrings of the individual classes and methods for details.
import yaml
from yaml.loader import SafeLoader
import pyodbc
import pandas as pd
class SQL_Connection():
    """This class can be used to establish a connection to an SQL server.

    The login data (server, database, username, password) are read from the
    file "db_connection.yaml", resolved relative to the current working
    directory. The ODBC driver name is fixed to "ODBC Driver 17 for SQL Server".
    The connection attempt is made as soon as an instance is created.

    Attributes:
        conn: Open pyodbc connection, or None if not connected.
        cursor: Cursor on the open connection, or None if not connected.
    """

    def __init__(self):
        """Initialisation of the class (constructor).

        Reads the login data from the YAML file, builds the ODBC connection
        string and calls connect().
        """

        # Defined up front so that disconnect() stays safe even when
        # connect() fails and only prints its error message.
        self.conn = None
        self.cursor = None

        # Read yaml file. The context manager closes the handle again as soon
        # as the content has been parsed (load_all is lazy, so it has to be
        # consumed inside the with-block).
        with open("db_connection.yaml", "r") as stream:
            # Data are only in the first element -> dictionary
            login_data_yaml = list(yaml.load_all(stream, Loader = SafeLoader))[0]

        # Attribute kept for backward compatibility; the file is closed here.
        self.stream = stream

        self.driver = "ODBC Driver 17 for SQL Server"
        self.server = login_data_yaml["server"]
        self.database = login_data_yaml["database"]
        self.username = login_data_yaml["username"]
        self.password = login_data_yaml["password"]
        self.login = "DRIVER={" + self.driver + "};SERVER=" + self.server + ";DATABASE=" + self.database + \
                     ";UID=" + self.username + ";PWD=" + self.password

        self.connect()

    def connect(self):
        """This method connects to the database.

        On success self.conn and self.cursor are set. A pyodbc.Error is not
        propagated: it is reported on stdout and the attributes keep their
        previous values.
        """

        try:
            self.conn = pyodbc.connect(self.login)
            self.cursor = self.conn.cursor()
        except pyodbc.Error as e_connection:
            print("Error with SQL connection!")
            print("Error number:", e_connection)

    def disconnect(self):
        """This method disconnects from the database.

        Does nothing if no connection is open, so it can be called after a
        failed connect() and more than once.
        """

        conn = getattr(self, "conn", None)

        if conn is None:
            return

        # Drop the references first so that a second call is a no-op even if
        # close() itself raises.
        self.conn = None
        self.cursor = None
        conn.close()
class MSSQL_Connection(SQL_Connection):
    """This class can be used to connect to an SQL server and execute commands.

    All queries are read-only SELECT statements whose result is returned via
    pandas.

    NOTE: database name, table name and the optional WHERE clause are inserted
    into the SQL text verbatim. They must come from trusted code, never from
    user input.

    Args:
        SQL_Connection (class): Parent class from which is inherited
    """

    def __init__(self, table):
        """Initialisation of the class (constructor).

        Args:
            table (string): Table name
        """

        super().__init__()
        self.table = table

    def count_id(self):
        """This method counts all ids in the table.

        Returns:
            count (integer): Number of ids
        """

        sql_command = "SELECT COUNT(ID) FROM " + self.database + "." + self.table + ";"
        df = pd.read_sql(sql_command, self.conn)

        # The result has exactly one cell; convert the numpy scalar so that a
        # plain Python integer is returned as documented.
        count = int(df.iloc[0, 0])

        return count

    def read_data(self, where = ""):
        """This method reads all data from the table into a DataFrame.

        Args:
            where (string, optional): SQL WHERE statement (including the WHERE keyword), if only a part
                                      of the data is to be considered. Leading / trailing whitespace is
                                      ignored. Defaults to "".

        Returns:
            df (pandas DataFrame): Data from the table
        """

        # Normalise the clause so that exactly one blank separates it from the
        # table name, no matter whether the caller passed a leading space.
        where_clause = (where or "").strip()

        sql_command = "SELECT * FROM " + self.database + "." + self.table

        if where_clause:
            sql_command += " " + where_clause

        sql_command += ";"
        df = pd.read_sql(sql_command, self.conn)

        return df

    def read_data_select(self, sql_command):
        """This method reads data into a DataFrame with a passed SELECT command.

        Args:
            sql_command (string): Complete SELECT command, executed unchanged

        Returns:
            df (pandas DataFrame): Result of the command
        """

        df = pd.read_sql(sql_command, self.conn)

        return df
if (__name__ == "__main__"):
    # Small usage example: count the ids, read the whole table and read a
    # filtered part of it with a hand-written SELECT command.

    table = "dbo.table"
    select_command = "SELECT ID, NAME FROM Customers." + table + " WHERE STATUS = 'active';"

    connection = MSSQL_Connection(table)

    # try / finally makes sure the connection is released even if one of the
    # queries raises.
    try:
        counts = connection.count_id()
        df_complete = connection.read_data()
        df_part = connection.read_data_select(select_command)
    finally:
        connection.disconnect()

    print(counts)
    print(df_complete)
    print(df_part)