# Linear_Regression_Pyspark.py
# forked from YuAnChenToTheMoon/COEN_242_PySpark
# Start a new Spark session
from pyspark.sql import SparkSession
# Build the session with the app name 'cruise'
spark = SparkSession.builder.appName('cruise').getOrCreate()
# In[6]:
# Read the CSV file into a DataFrame
df = spark.read.csv('transactions.csv', inferSchema=True, header=True)
# In[7]:
# Check the structure of the schema
df.printSchema()
# In[8]:
df.show()
# In[9]:
df.describe().show()
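# The full summary is wide; a narrower sketch that keeps only the columns
# used for modeling below (describe accepts specific column names)
df.describe("creditLimit", "availableMoney", "transactionAmount").show()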
# In[23]:
# Index creditLimit values into integer category codes. StringIndexer
# casts the column to string first; note that the resulting
# credit_limit_output column is not fed to the model below.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="creditLimit", outputCol="credit_limit_output")
indexed = indexer.fit(df).transform(df)
indexed.head(5)
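# A quick look at the mapping: a few raw creditLimit values next to
# their index codes
indexed.select("creditLimit", "credit_limit_output").show(5)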
# In[24]:
from pyspark.ml.linalg import Vectors
# In[25]:
from pyspark.ml.feature import VectorAssembler
# In[26]:
# List the columns now available, including the new index column
indexed.columns
# In[28]:
# Create an assembler to combine the relevant numeric columns into a
# single feature vector. creditLimit is the label, so it is excluded
# from the inputs to avoid leaking the target into the features.
assembler = VectorAssembler(
    inputCols=["availableMoney", "transactionAmount"],
    outputCol="features")
# In[29]:
output = assembler.transform(indexed)
# In[30]:
output.select("features", "creditLimit").show()
# In[31]:
# Keep only the feature vector and the label for modeling
final_data = output.select("features", "creditLimit")
# In[32]:
# Split the data into training and test sets in a 70/30 ratio
train_data, test_data = final_data.randomSplit([0.7, 0.3])
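# Optional sanity check on the split sizes; exact counts vary between
# runs because randomSplit is not seeded here
print("train rows: {}".format(train_data.count()))
print("test rows: {}".format(test_data.count()))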
# In[33]:
from pyspark.ml.regression import LinearRegression
# In[34]:
# Configure the linear regression estimator; creditLimit is the label,
# and the default featuresCol 'features' matches the assembler output
lr = LinearRegression(labelCol='creditLimit')
# In[35]:
lrModel = lr.fit(train_data)
# In[39]:
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))
# In[40]:
# Evaluate the model on the held-out test data
test_results = lrModel.evaluate(test_data)
# In[41]:
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))
print("R2: {}".format(test_results.r2))
# In[42]:
from pyspark.sql.functions import corr
# In[43]:
# Check pairwise correlations with the label to explain the high R2 value
df.select(corr('creditLimit','availableMoney')).show()
# In[44]:
df.select(corr('creditLimit','transactionAmount')).show()
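# A fuller view, as a sketch using pyspark.ml.stat: the complete Pearson
# correlation matrix over the label and both features in one pass. The
# corr_features column name is introduced here only for this check.
from pyspark.ml.stat import Correlation
corr_assembler = VectorAssembler(
    inputCols=["creditLimit", "availableMoney", "transactionAmount"],
    outputCol="corr_features")
corr_df = corr_assembler.transform(df)
print(Correlation.corr(corr_df, "corr_features").head()[0])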