kafka_twitter_spark_streaming.py
"""
RUNNING PROGRAM;
1-Start Apache Kafka
./kafka/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh ./kafka/kafka_2.11-0.11.0.0/config/server.properties
2-Run kafka_push_listener.py (Start Producer)
ipython >> run kafka_push_listener.py
3-Run kafka_twitter_spark_streaming.py (Start Consumer)
PYSPARK_PYTHON=python3 bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ~/Documents/kafka_twitter_spark_streaming.py
"""
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
if __name__ == "__main__":
    # Create a Spark context to connect to the Spark cluster
    sc = SparkContext(appName="PythonStreamingKafkaTweetCount")
    # Create a streaming context with a batch interval of 10 seconds
    ssc = StreamingContext(sc, 10)
    # Create a Kafka stream that consumes the 'twitter' topic
    # (localhost:2181 is the default ZooKeeper address)
    kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'twitter': 1})
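    # An alternative, assuming a broker on localhost:9092: the receiver-less
    # "direct" API reads straight from the Kafka brokers instead of going
    # through ZooKeeper, e.g.
    #     kafkaStream = KafkaUtils.createDirectStream(
    #         ssc, ['twitter'], {'metadata.broker.list': 'localhost:9092'})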
    # Each message arrives as a (key, value) tuple; parse the value
    # (the raw tweet) as JSON
    parsed = kafkaStream.map(lambda v: json.loads(v[1]))
    # Count the number of tweets per user in each batch
    user_counts = parsed.map(lambda tweet: (tweet['user']['screen_name'], 1)).reduceByKey(lambda x, y: x + y)
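    # Note: reduceByKey aggregates within a single 10-second batch only. For
    # a running total across batches, one could use updateStateByKey (which
    # requires a checkpoint directory; the path here is hypothetical):
    #     ssc.checkpoint('/tmp/kafka_twitter_checkpoint')
    #     total_counts = user_counts.updateStateByKey(
    #         lambda new, running: sum(new) + (running or 0))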
    # Print the per-user tweet counts
    user_counts.pprint()
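    # For each batch, pprint() emits a block like the following (the values
    # shown are illustrative):
    #     -------------------------------------------
    #     Time: 2017-08-01 12:00:10
    #     -------------------------------------------
    #     ('some_user', 3)
    #     ('another_user', 1)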
    # Start execution of the streams
    ssc.start()
    ssc.awaitTermination()