from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from util import get_cfg, add_table_suffixes
from phase_1_data_preparation import phase_1_data_preparation
from phase_2_data_preprocessing import phase_2_data_preprocessing
from phase_3_classification import phase_3_classification
sc = SparkContext(appName="Churn Prediction")
sqlContext = HiveContext(sc)
# Config file path constants
CONFIG_FILE_PATH = './config/config.cfg'
CONFIG_FILE_TMP_FILES = './config/config_tmp_files.cfg'

cfg = get_cfg(CONFIG_FILE_PATH)
cfg_tables = get_cfg(CONFIG_FILE_TMP_FILES)
cfg, cfg_tables = add_table_suffixes(cfg, cfg_tables)
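# Run the pipeline phases in order: data preparation, preprocessing, classification.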
phase_1_data_preparation.run(cfg, cfg_tables, sqlContext)
phase_2_data_preprocessing.run(cfg, cfg_tables, sqlContext)
predictions = phase_3_classification.run(cfg, cfg_tables, sqlContext)
predictions.select("label", "churned", "probability").show()
# Area under the ROC curve (AUROC); the evaluator reads the default
# 'rawPrediction' and 'label' columns produced by the classifier
evaluator_roc = BinaryClassificationEvaluator(metricName="areaUnderROC")
print('Area under ROC: {}'.format(evaluator_roc.evaluate(predictions)))
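# Optionally (an addition, not part of the original pipeline): the area under
# the precision-recall curve is often more informative than AUROC when the
# churned class is rare.
evaluator_pr = BinaryClassificationEvaluator(metricName="areaUnderPR")
print('Area under PR: {}'.format(evaluator_pr.evaluate(predictions)))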
# Per-(label, prediction) counts, collected to a pandas DataFrame
stats = predictions.groupBy("label", "prediction").count().toPandas()
print(stats)
# Compute evaluation metrics with the RDD-based MLlib API, which expects
# an RDD of (prediction, label) pairs of floats
predictionAndLabels = (predictions.select("prediction", "label")
                       .rdd.map(lambda row: (float(row[0]), float(row[1]))))
metrics = MulticlassMetrics(predictionAndLabels)
conf = metrics.confusionMatrix().toArray()
print(conf)
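# For reference (an addition, assuming the usual MLlib layout: rows are actual
# classes, columns are predicted classes, ordered by ascending label):
tn, fp, fn, tp = conf[0][0], conf[0][1], conf[1][0], conf[1][1]
print('TN={}, FP={}, FN={}, TP={}'.format(tn, fp, fn, tp))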
# Overall accuracy of the model
print('Accuracy: {}'.format(metrics.accuracy))
# Precision, recall and F1 for the churned class (label 1.0)
print('Precision (churned): {}'.format(metrics.precision(1.0)))
print('Recall (churned): {}'.format(metrics.recall(1.0)))
print('F1 (churned): {}'.format(metrics.fMeasure(1.0, beta=1.0)))
# Precision, recall and F1 for the non-churned class (label 0.0)
print('Precision (non-churned): {}'.format(metrics.precision(0.0)))
print('Recall (non-churned): {}'.format(metrics.recall(0.0)))
print('F1 (non-churned): {}'.format(metrics.fMeasure(0.0, beta=1.0)))
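# As a supplement (not in the original script), MulticlassMetrics also exposes
# label-weighted aggregates of the per-class metrics above.
print('Weighted precision: {}'.format(metrics.weightedPrecision))
print('Weighted recall: {}'.format(metrics.weightedRecall))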
# Split the 'probability' vector column into two float columns, one per class
split1_udf = udf(lambda value: value[0].item(), FloatType())
split2_udf = udf(lambda value: value[1].item(), FloatType())
predictions = predictions.withColumn("probability_nonchurned", split1_udf('probability'))
predictions = predictions.withColumn("probability_churned", split2_udf('probability'))
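# A note on alternatives: on Spark 3.0+, pyspark.ml.functions.vector_to_array
# can replace the UDFs above, e.g. (sketch, assuming Spark 3.0+):
#   from pyspark.ml.functions import vector_to_array
#   predictions = predictions.withColumn("prob", vector_to_array("probability"))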
# Class balance in the scored data
predictions.groupBy("label").count().show()
# Take the N predicted churners with the highest churn probability and check
# what fraction of them actually churned (precision at N)
N = 1000
top_potential_churners = predictions.filter("prediction = 1.0").orderBy("probability_churned", ascending=False).limit(N)
predictions_counts = top_potential_churners.groupBy("label").count()
predictions_counts = predictions_counts.withColumn("fraction", predictions_counts["count"] * 1.0 / N)
predictions_counts.show()
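# Release cluster resources once the job has finished.
sc.stop()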