-
Notifications
You must be signed in to change notification settings - Fork 90
/
streaming.py
146 lines (123 loc) · 4.3 KB
/
streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from datetime import datetime
import os
import json
from auth import TwitterAuth
import time
import sys
try:
import smtplib
from email.mime.text import MIMEText
except:
sys.stderr.write("Error loading smtplib. Will not be able to send emails...\n")
# Output directory to hold json files (one per day) with tweets
# Within the output directory, the script loads a file named FILTER with the terms to be tracked (one per line)
outputDir = "outputDir"
## End of Settings###
class FileDumperListener(StreamListener):
def __init__(self,filepath):
super(FileDumperListener,self).__init__(self)
self.basePath=filepath
os.system("mkdir -p %s"%(filepath))
d=datetime.today()
self.filename = "%i-%02d-%02d.json"%(d.year,d.month,d.day)
self.fh = open(self.basePath + "/" + self.filename,"a")#open for appending just in case
self.tweetCount=0
self.errorCount=0
self.limitCount=0
self.last=datetime.now()
#This function gets called every time a new tweet is received on the stream
def on_data(self, data):
self.fh.write(data)
self.tweetCount+=1
#Status method prints out vitals every five minutes and also rotates the log if needed
self.status()
return True
def close(self):
try:
self.fh.close()
except:
#Log/email
pass
#Rotate the log file if needed.
#Warning: Check for log rotation only occurs when a tweet is received and not more than once every five minutes.
# This means the log file could have tweets from a neighboring period (especially for sparse streams)
def rotateFiles(self):
d=datetime.today()
filenow = "%i-%02d-%02d.json"%(d.year,d.month,d.day)
if (self.filename!=filenow):
print("%s - Rotating log file. Old: %s New: %s"%(datetime.now(),self.filename,filenow))
try:
self.fh.close()
except:
#Log/Email it
pass
self.filename=filenow
self.fh = open(self.basePath + "/" + self.filename,"a")
def on_error(self, statusCode):
print("%s - ERROR with status code %s"%(datetime.now(),statusCode))
self.errorCount+=1
def on_timeout(self):
raise TimeoutException()
def on_limit(self, track):
print("%s - LIMIT message recieved %s"%(datetime.now(),track))
self.limitCount+=1
def status(self):
now=datetime.now()
if (now-self.last).total_seconds()>300:
print("%s - %i tweets, %i limits, %i errors in previous five minutes."%(now,self.tweetCount,self.limitCount,self.errorCount))
self.tweetCount=0
self.limitCount=0
self.errorCount=0
self.last=now
self.rotateFiles()#Check if file rotation is needed
class TimeoutException(Exception):
pass
if __name__ == '__main__':
while True:
try:
#Create the listener
listener = FileDumperListener(outputDir)
auth = OAuthHandler(TwitterAuth.consumer_key, TwitterAuth.consumer_secret)
auth.set_access_token(TwitterAuth.access_token, TwitterAuth.access_token_secret)
fhTerms = open(outputDir+"/FILTER","r")
terms=[]
for line in fhTerms:
terms.append(line.strip())
print("%s - Starting stream to track %s"%(datetime.now(),",".join(terms)))
#Connect to the Twitter stream
stream = Stream(auth, listener)
#stream.filter(locations=[-0.530, 51.322, 0.231, 51.707])#Tweets from London
stream.filter(track=terms)
except KeyboardInterrupt:
#User pressed ctrl+c or cmd+c -- get ready to exit the program
print("%s - KeyboardInterrupt caught. Closing stream and exiting."%datetime.now())
listener.close()
stream.disconnect()
break
except TimeoutException:
#Timeout error, network problems? reconnect.
print("%s - Timeout exception caught. Closing stream and reopening."%datetime.now())
try:
listener.close()
stream.disconnect()
except:
pass
continue
except Exception as e:
#Anything else
try:
info = str(e)
sys.stderr.write("%s - Unexpected exception. %s\n"%(datetime.now(),info))
msg = MIMEText("Unexpected error in Twitter collector. Check server. %s"%info);
msg['Subject'] = "Unexpected error in Twitter collector"
msg['From'] = "[email protected]"
msg['To'] = email
s = smtplib.SMTP("smtp.example.com") #Update this to your SMTP server
s.sendmail("[email protected]", email, msg.as_string())
s.quit()
except:
pass
time.sleep(1800)#Sleep thirty minutes and resume