Skip to content

Commit

Permalink
[Issue #5] Refactored etl.py and minor formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
abhisrn1986 committed Jun 5, 2022
1 parent 12e83e0 commit e9c1801
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 96 deletions.
137 changes: 48 additions & 89 deletions etl_job/etl.py
Original file line number Diff line number Diff line change
@@ -1,126 +1,83 @@
import socket
import time
from base64 import decode
from collections import deque

import pymongo
import configparser
import streamlit as st

import slack
from preprocess_tweets import get_tweet_image_url, get_tweet_text
from sentimental_analysis import get_score_tweet, get_tweet_sentiment
import utils


def is_replica_set(mongo_db):
"""Checks if mongoDB client is a replica set.
Args:
mongo_db (pymongo.MongoClient): MongoDB client comprising
of database "tweets".
Returns:
bool: True if a replicaset else false.
"""
try:
mongo_db.admin.command("replSetGetStatus")
return True
except pymongo.errors.OperationFailure:
return False


def send_query_to_tweet_stream(query):
# The server's hostname or IP address
HOST = socket.gethostbyname('tweet_collector')
PORT = 8888 # The port used by the server
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
s.sendall(bytes(query, 'utf-8'))
except ConnectionRefusedError:
return False
return True


# Set a parameter in session state for checking if the query is already
# submitted when there are reruns due to intereaction with other widgets.
# Needed to clear the query submission form in the rerun after submitting
# the query initially.
def is_query_sent():

""" Set a parameter in session state for checking if the query is already
submitted when there are reruns due to intereaction with other widgets.
Needed to clear the query submission form in the rerun after submitting
the query initially.
"""
st.session_state['query_submitted'] = True
st.session_state['title'].title("Tweets Sentiment Analyser Pipeline")



if __name__ == '__main__':


# wait until mongo db is connected properly before insertion
time.sleep(10)

# Initialize a mongodb replica set and config it to have only one
# primary node. Replica set allows to notify db change events such
# as tweet insertion for instance.
# https://pymongo.readthedocs.io/en/stable/examples/high_availability.html?highlight=replica#id1
mongodb_client = pymongo.MongoClient(host="mongodb",
port=27017,
directConnection=True)
# Check if db is replica set if not initialize it and config it.
if not is_replica_set(mongodb_client):
config = {'_id': 'dbrs',
'members': [
{'_id': 0, 'host': 'mongodb:27017'}
]}
mongodb_client.admin.command("replSetInitiate", config)

# Connect ot twitter data base
db = mongodb_client.twitter


# Get the config of streamlit app setup
# and connect ot twitter database.
db = utils.get_mongodb_replica_set().twitter

# Get the config of streamlit app setup.
config = configparser.ConfigParser()
config.read('config.ini')
max_tweets_display = int(config['streamlit_app']['max_tweets_display'])



# Setup the streamlit elements for parameter passing
st.session_state['title'] = st.empty()
st.session_state['title'].title("Tweets Sentiment Analyser Pipeline")


# Form to input the tweet query for tweet API Stream filter method.
form_ph = st.empty()
form = form_ph.form("Query_form")
tweets_streaming_query = form.text_input('Tweets Stream Query', 'China')
query_submitted = form.form_submit_button("Submit", on_click=is_query_sent)

# Check box to post in a particular slack channel and form for inputting
# the web hook url.
enable_slack_post = st.checkbox("Post in slack channel")

if(enable_slack_post):
slack_form_widget_ph = st.empty()

slack_form_widget = slack_form_widget_ph.container()
slack_web_hook_url = slack_form_widget.text_input("Slack channel web hook url", "Enter the url")
slack_web_hook_url = slack_form_widget.text_input(
"Slack channel web hook url", "Enter the url")
slack_invalid_text = slack_form_widget.empty()

tweet_post_widgets = []
# Deque is used instead of list due better performance of rotation
# functionality.
tweet_texts = deque()


# Check if the tweet query is submitted
if "query_submitted" in st.session_state.to_dict() and st.session_state['query_submitted']:
send_query_to_tweet_stream(tweets_streaming_query)
# Check if the tweet query is submitted.
if "query_submitted" in st.session_state.to_dict() and st.session_state[
'query_submitted']:
utils.send_query_to_tweet_stream(tweets_streaming_query)
# Clear the form as only one set of queries are allowed at the start
# TODO this can be removed when Twitter API allows changing the stream
# at run time.
form_ph.empty()

# Header for tweet sentiments feed and this is stored in session state
# to not change across streamlit app reruns.
st.session_state['tweet_feed_title'] = st.empty()
st.session_state['tweet_feed_title'].markdown("""<h2> Real time tweets feed </h2>""", unsafe_allow_html=True)
st.session_state['tweet_feed_title'].markdown(
"""<h2> Real time tweets feed </h2>""", unsafe_allow_html=True)

# Post to slack whenever there is a change in the mongo db
# (here changes are only insertions)
# (here the assumption is that the changes are only insertions)
with db.tweets.watch() as stream:
for change in stream:
# st.write("Enterin watch for loop")
Expand All @@ -129,35 +86,37 @@ def is_query_sent():
tweet_dict = change['fullDocument']
tweet_text = get_tweet_text(tweet_dict)
score = get_score_tweet(tweet_text)
sentiment = get_tweet_sentiment(score)
if(enable_slack_post):
if not slack.post_slack(tweet_text, score,
get_tweet_image_url(tweet_dict), slack_web_hook_url):

slack_invalid_text.markdown("""<p style="color:red"> The slack web url is invalid! Enter a valid one </p>""", unsafe_allow_html=True)


if sentiment == 'negative':
html_color = 'red'
elif sentiment == 'positive':
html_color = 'green'
else:
html_color = 'yellow'


sentiment_html = f'<p style="color:{html_color}">{sentiment}</p>'
# If enable post checkbox is checked than post the tweet
# sentiment in the given slack channel.
if(enable_slack_post):
if not slack.post_slack(
tweet_text, score, get_tweet_image_url(tweet_dict),
slack_web_hook_url):
slack_invalid_text.markdown(
"""<p style="color:red"> The slack web url is invalid! Enter a valid one </p>""",
unsafe_allow_html=True)

sentiment_html = utils.get_sentiment_html(
get_tweet_sentiment(score))
html_text = f"""<div style="margin-bottom: 30px"><span style="word-wrap:break-word;">{tweet_text}\n{sentiment_html}</span><div>"""

# Add tweet sentiments text widgets until the number of these
# widgets reaches max_tweets_display.
if len(tweet_post_widgets) < max_tweets_display:
tweet_post_widgets.append(st.markdown(
html_text, unsafe_allow_html=True))
tweet_texts.append(html_text)
tweet_post_widgets[0].markdown(html_text, unsafe_allow_html=True)
tweet_post_widgets[0].markdown(
html_text, unsafe_allow_html=True)

# When the number of tweets displayed exceeds
# max_tweets_display than shift content of the markdown
# widgets by one downwards and add new one at the top.
for i_text, text in enumerate(tweet_texts):
if i_text < len(tweet_texts) - 1:
tweet_post_widgets[i_text + 1].markdown(text,
unsafe_allow_html=True)

tweet_post_widgets[i_text + 1].markdown(
text, unsafe_allow_html=True)
tweet_texts.rotate(1)
tweet_texts[0] = html_text

Expand Down
30 changes: 23 additions & 7 deletions etl_job/sentimental_analysis.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,45 @@
from enum import Enum

from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer


class Sentiment(Enum):
NEUTRAL = 0,
POSITIVE = 1,
NEGATIVE = -1


def get_score_tweet(tweet_text):
"""Get VaderSentiment score of the tweet text
"""Get VaderSentiment score of the tweet text.
Args:
tweet_text (str): Text for performing sentimental analysis
tweet_text (str): Text for performing sentimental analysis.
Returns:
dict: Score in the form
{'neg': 0.0, 'neu': 1.0, 'pos': 0.0, 'compound': 0.0}
refer https://github.com/cjhutto/vaderSentiment
refer https://github.com/cjhutto/vaderSentiment.
"""

return get_score_tweet.analyzer.polarity_scores(tweet_text)


def get_tweet_sentiment(score):
"""Get the general sentiment from the sentiment score.
Args:
score (float): score from VaderSentimentAnalysis.
Returns:
Sentiment: General sentiment.
"""

if score['compound'] <= -0.05:
return 'negative'
return Sentiment.NEGATIVE
elif score['compound'] >= 0.05:
return 'positive'
return 'neutral'
return Sentiment.POSITIVE

return Sentiment.NEUTRAL


get_score_tweet.analyzer = SentimentIntensityAnalyzer()
85 changes: 85 additions & 0 deletions etl_job/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import socket

import pymongo

from sentimental_analysis import Sentiment


def get_sentiment_html(sentiment):
"""Get the html text for streamlit markdown to display the tweet general
sentiment.
Args:
sentiment (Sentiment): General sentiment.
Returns:
str: html of sentiment red color for Negative, green for Positive and
yellow for Neutral.
"""
if sentiment == Sentiment.NEGATIVE:
html_color = 'red'
elif sentiment == Sentiment.POSITIVE:
html_color = 'green'
else:
html_color = 'yellow'

return f'<p style="color:{html_color}">{sentiment}</p>'

def is_replica_set(mongo_db):
"""Checks if mongoDB client is a replica set.
Args:
mongo_db (pymongo.MongoClient): MongoDB client comprising
of database "tweets".
Returns:
bool: True if a replicaset else false.
"""
try:
mongo_db.admin.command("replSetGetStatus")
return True
except pymongo.errors.OperationFailure:
return False

def get_mongodb_replica_set():
""" Init a mongodb replica set and return the pymongo client.
Returns:
pymongo.MongoClient: pymongo client
"""
mongodb_client = pymongo.MongoClient(host="mongodb",
port=27017,
directConnection=True)
# Check if db is replica set if not initialize it and config it.
if not is_replica_set(mongodb_client):
config = {'_id': 'dbrs',
'members': [
{'_id': 0, 'host': 'mongodb:27017'}
]}
mongodb_client.admin.command("replSetInitiate", config)

return mongodb_client


def send_query_to_tweet_stream(query):
"""Pass the query to tweet collector process via python socket
communication
Args:
query (str): query for filterting tweets in Twitter API tweets
streaming.
Returns:
bool: True when connection and passing of query was successfull.
"""
# The server's hostname or IP address
HOST = socket.gethostbyname('tweet_collector')
PORT = 8888 # The port used by the server
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
s.sendall(bytes(query, 'utf-8'))
except ConnectionRefusedError:
return False
return True

0 comments on commit e9c1801

Please sign in to comment.