-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtwitter.py
119 lines (95 loc) · 4.39 KB
/
twitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import urllib.request
import urllib.parse
import json
import oauth2 as oauth
import pickle
'''inspired by https://raw.githubusercontent.com/uwescience/datasci_course_materials/master/assignment1/twitterstream.py '''
class Twitter():
    """Poll Twitter user timelines and relay unread tweets over IRC.

    The per-user state is a pickled dict stored at ``tpl``. As used by this
    class, each value holds the request ``'parameters'`` for the
    ``statuses/user_timeline`` endpoint and the ``'id'`` of the newest tweet
    already seen for that user.
    """

    # Seconds to wait on each timeline request so that a stalled connection
    # cannot block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, irc, tpl):
        """Load the saved per-user state and prepare the OAuth objects.

        irc -- object exposing ``master`` and ``send(content)``.
        tpl -- path of the pickle file holding the per-user state.

        Raises whatever ``open``/``pickle.load`` raise when the state file
        is missing or unreadable.
        """
        self.irc = irc
        self.tpl = tpl
        # NOTE(review): these credentials are committed in source; they
        # should be rotated and loaded from configuration instead.
        self.api_key = 'WiJMPiiEXSchOlHqbtXCNKQSV'
        self.api_secret = 'Fcb2nEPwupqc2bKkVEgOBMcJOiHTDcyz71uFazMO9QF5TW84nd'
        self.access_token_key = "2839460820-7E54B2kkY0kEeX1jYU6xEC4SIEkjYF1MxOZEObs"
        self.access_token_secret = "pbKjDh0GSpNpc3ua1wc97HXRNnD5FDXNbVcDAwDcP4jhI"
        self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
        self.http_method = "GET"
        self.url = "https://api.twitter.com/1.1/statuses/user_timeline.json"
        self.oauth_token = oauth.Token(key=self.access_token_key, secret=self.access_token_secret)
        self.oauth_consumer = oauth.Consumer(key=self.api_key, secret=self.api_secret)
        # NOTE(review): pickle.load executes arbitrary code from the file;
        # only point ``tpl`` at a file this program wrote itself.
        with open(self.tpl, 'rb') as f:
            self.data = pickle.load(f)
        self.content_template = {'type':'PRIVMSG','channel': self.irc.master,'message':'','private_messaged':False}
        # Separator that starts a fresh PRIVMSG line inside a single payload.
        self.new_line_template = '\r\n%s %s :' %(self.content_template['type'],self.content_template['channel'])

    @staticmethod
    def _single_line(text):
        """Replace line breaks with spaces so text stays on one IRC line."""
        return ' '.join(text.splitlines())

    # TODO: add 'master_online_status' into run().
    def run(self):
        """Fetch every tracked timeline and send unread tweets to the master.

        Replies are skipped. The newest seen id per user is only committed
        once every timeline was fetched successfully, so a failure part-way
        through does not silently drop tweets that were never reported.
        Errors are reported on stdout and never propagated (best effort).
        Returns None.
        """
        print('checking twitter...')
        try:
            lines = ['%s, you have unread tweets:' %(self.irc.master)]
            latest_ids = {}
            for user, state in self.data.items():
                req = oauth.Request.from_consumer_and_token(self.oauth_consumer,
                                                            token=self.oauth_token,
                                                            http_method=self.http_method,
                                                            http_url=self.url,
                                                            parameters=state['parameters'])
                req.sign_request(self.signature_method_hmac_sha1, self.oauth_consumer, self.oauth_token)
                with urllib.request.urlopen(req.to_url(), timeout=self.REQUEST_TIMEOUT) as response:
                    timeline = json.loads(response.read().decode())
                last_seen = state['id']
                max_id = last_seen
                for tweet in timeline:
                    # The scan stops at the first tweet that was already seen.
                    if tweet['id'] <= last_seen:
                        break
                    if not tweet['in_reply_to_status_id']:
                        # Tweet content is untrusted: a raw CR/LF would end
                        # the PRIVMSG line early inside the payload.
                        lines.append('%s: %s' %(self._single_line(tweet['user']['name']),
                                                self._single_line(tweet['text'])))
                    max_id = max(max_id, tweet['id'])
                if max_id > last_seen:
                    latest_ids[user] = max_id
            # Every fetch succeeded: it is now safe to advance the markers.
            for user, max_id in latest_ids.items():
                self.data[user]['id'] = max_id
            if len(lines) > 1:
                with open(self.tpl, 'wb') as f:
                    pickle.dump(self.data, f)
                # Copy so the shared template keeps its empty message.
                content = dict(self.content_template)
                content['message'] = self.new_line_template.join(lines)
                self.irc.send(content)
                print('sent')
        except Exception as exc:
            # Still best effort, but the failure is visible and
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            print('twitter check failed: %r' %(exc,))
'''
api_key = 'WiJMPiiEXSchOlHqbtXCNKQSV'
api_secret = 'Fcb2nEPwupqc2bKkVEgOBMcJOiHTDcyz71uFazMO9QF5TW84nd'
access_token_key = "2839460820-7E54B2kkY0kEeX1jYU6xEC4SIEkjYF1MxOZEObs"
access_token_secret = "pbKjDh0GSpNpc3ua1wc97HXRNnD5FDXNbVcDAwDcP4jhI"
signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
http_method = "GET"
url = "https://api.twitter.com/1.1/statuses/user_timeline.json"
parameters = {'screen_name':'nqrse', 'count':10}
oauth_token = oauth.Token(key=access_token_key, secret=access_token_secret)
oauth_consumer = oauth.Consumer(key=api_key, secret=api_secret)
req = oauth.Request.from_consumer_and_token(oauth_consumer,
token=oauth_token,
http_method=http_method,
http_url=url,
parameters=parameters)
req.sign_request(signature_method_hmac_sha1, oauth_consumer, oauth_token)
headers = req.to_header()
print(headers)
url = req.to_url()
print(url)
response = urllib.request.urlopen(url)#, urllib.parse.urlencode(parameters).encode())
response = json.loads(response.read().decode())
for a in response:
print(a['user']['name'], a['text'])
'''
'''
f = open('tweets.txt','wb')
for a in response:
print('entered')
string = str(a['user']['name'] + a['text']+'\r\n').encode()
f.write(string)
f.close()
'''