# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, NGram, HashingTF, MinHashLSH
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col
'''
Scalable, ML-based tool for identifying sentence similarity between two
datasets. The algorithm used is MinHashLSH.
'''
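# Background (illustrative, not part of the pipeline): MinHashLSH estimates
# the Jaccard distance between shingle sets, so near-identical sentences,
# which share most of their character bigrams, end up at a small distance.
# A reference (exact) version of the quantity being approximated:
#
#   def jaccard_distance(a, b):          # a, b: sets of shingles
#       return 1.0 - len(a & b) / float(len(a | b))
#
# MinHash signatures avoid computing this set intersection for every pair,
# which is what lets the similarity join below scale.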
'''
---------------------------------------
SPARK SESSION CREATION
---------------------------------------
'''
# Static resource settings must be supplied when the session is built;
# they cannot be changed with spark.conf.set() once the context exists.
spark = SparkSession \
    .builder \
    .appName("Sentence_Similarity_Tool") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.cores", "6") \
    .config("spark.dynamicAllocation.minExecutors", "3") \
    .config("spark.dynamicAllocation.maxExecutors", "6") \
    .getOrCreate()
LOG4JLOGGER = spark.sparkContext._jvm.org.apache.log4j
LOGGER = LOG4JLOGGER.LogManager.getLogger(__name__)
LOGGER.info("-------------------------------------------------------")
LOGGER.info("Starting the Sentence Similarity Identifier...")
LOGGER.info("-------------------------------------------------------")
# Define the inputs
IN_INPUT_FILE = "/user/root/sim-score/input/scrapped_sentences_full/ik_data_final.csv"
PDF_INPUT_FILE = "/user/root/sim-score/input/sc_tokenized_sentences/sent.out"
# Schema of the files: a single string column, one sentence per line
SCHEMA_INPUT_FILE = StructType([StructField("sentences", StringType(), True)])
# Use "\n" as the delimiter so each physical line is read as one
# single-column record (a whole sentence), rather than being split on commas.
SCRAPPED_INPUT_FILE = spark.read.format("com.databricks.spark.csv") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("header", "false") \
    .option("delimiter", "\n") \
    .schema(SCHEMA_INPUT_FILE) \
    .load(IN_INPUT_FILE)
# Keep only rows containing at least one letter, then repartition for parallelism
DF_INPUT_FILE_CLEAN = SCRAPPED_INPUT_FILE.filter("sentences rlike '[A-Za-z]'").repartition(100)
# Same newline-delimited layout for the PDF-extracted sentences
DOWNLOAD_PDF_INPUT_FILE = spark.read.format("com.databricks.spark.csv") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("header", "false") \
    .option("delimiter", "\n") \
    .schema(SCHEMA_INPUT_FILE) \
    .load(PDF_INPUT_FILE)
DOWNLOAD_PDF_INPUT_FILE_CLEAN = DOWNLOAD_PDF_INPUT_FILE.filter("sentences rlike '[A-Za-z]'").repartition(100)
LOGGER.info("-------------------------------------------------------")
LOGGER.info("Loaded the dataframes...")
LOGGER.info("-------------------------------------------------------")
# Pipeline: character-level shingling followed by MinHash LSH.
pipeline = Pipeline(stages=[
    # pattern="" splits each (lowercased) sentence into single characters
    RegexTokenizer(
        pattern="", inputCol="sentences", outputCol="tokens", minTokenLength=1
    ),
    # character bigrams serve as the shingles
    NGram(n=2, inputCol="tokens", outputCol="ngrams"),
    # hash the bigrams into sparse term-frequency vectors
    HashingTF(inputCol="ngrams", outputCol="vectors"),
    # MinHash signatures over the vectors for the approximate join below
    MinHashLSH(inputCol="vectors", outputCol="lsh")
])
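# Fit once on one dataset and transform BOTH with the same fitted model, so
# the two sides share identical hash functions; fitting twice would produce
# incomparable MinHash signatures.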
model = pipeline.fit(DF_INPUT_FILE_CLEAN)
stored_hashed = model.transform(DF_INPUT_FILE_CLEAN)
landed_hashed = model.transform(DOWNLOAD_PDF_INPUT_FILE_CLEAN)
# approxSimilarityJoin keeps pairs whose Jaccard distance is below the 0.5
# threshold; the distance is emitted in the "confidence" column, so lower
# values mean closer matches.
matched_df = model.stages[-1] \
    .approxSimilarityJoin(stored_hashed, landed_hashed, 0.5, "confidence") \
    .select(col("datasetA.sentences").alias("ik_sentence"),
            col("datasetB.sentences").alias("pdf_sentence"),
            col("confidence"))
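# Optional sanity check (illustrative, left disabled): preview the ten
# closest pairs before writing the full output.
# matched_df.orderBy(col("confidence").asc()).show(10, truncate=False)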
LOGGER.info("-------------------------------------------------------")
LOGGER.info("Completed the ML pipeline...")
LOGGER.info("-------------------------------------------------------")
matched_df.coalesce(1).write \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("/user/root/sim-score/output/")
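# Note: coalesce(1) funnels all output through a single task so that one
# CSV part-file is produced; for large result sets, dropping it avoids a
# single-task bottleneck at the cost of multiple part-files.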
LOGGER.info("-------------------------------------------------------")
LOGGER.info("Sentence Similarity File generated succesfully!")
LOGGER.info("-------------------------------------------------------")