scraper.py (forked from martinhadid/twitter-scraper)
#!/usr/bin/env python3
from bs4 import BeautifulSoup
from tweet import Tweet
from user import User
import config
from logger import Logger
import traceback

# Module-level logger that writes info and error messages to scraper_logs.
logger = Logger()


class Scraper:
    def __init__(self, driver, url):
        self.url = url
        self.driver = driver

    def get_html(self, scroll_time=config.scraper['homepage_scroll_time']):
        """Extract HTML from the browser after scrolling the page."""
        self.driver.scroll(self.url, scroll_time)
        return BeautifulSoup(self.driver.get_page_source(), 'html.parser')

    def get_tweets(self, soup):
        """Extract tweet containers from Twitter's feed."""
        return soup.find_all('div', class_='content')

    def build_tweet(self, tweet_html):
        """Parse tweet information"""
        tweet = Tweet()
        try:
            tweet.enrich_tweet(tweet_html)
        except IndexError:
            logger.error('Not a tweet ' + traceback.format_exc())
            tweet.false_tweet()
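            # Assumption: false_tweet() marks the Tweet as falsy so the
            # `if tweet:` check in scrape_tweets() filters it out later.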
        return tweet

    def scrape_tweets(self, all_tweets):
        """Get a list of parsed tweets with relevant content."""
        tweets = []
        for tweet_html in all_tweets:
            tweet = self.build_tweet(tweet_html)
            if tweet:
                tweets.append(tweet)
        return tweets

    def filter_tweets(self, tweets):
        """Filter tweets down to unique ones to avoid database conflicts."""
        final_tweets = []
        for tweet in tweets:
            if tweet not in final_tweets:
                final_tweets.append(tweet)
        return final_tweets

    def get_usernames(self, tweets):
        """Get list of users to be scraped"""
        users = []
        for tweet in tweets:
            if tweet.username not in users:
                users.append(tweet.username)
        return users

    def scrape_user(self, html, username):
        """Get parsed info for a single user."""
        user = User(username)
        user.enrich_user(html)
        return user

    def user_url(self, user):
        """Get a user's page URL."""
        return config.scraper['twitter_url'] + user

    def get_extra_usernames(self, users, tweets):
        """Get users from retweeted tweets"""
        all_users = self.get_usernames(tweets)
        usernames = []
        extra_users = []
        for user in users:
            usernames.append(user.username)
        for user in all_users:
            if user not in usernames:
                extra_users.append(user)
        return extra_users

    def scrape_all_users(self, usernames):
        """Scrape users' info."""
        i = 0
        users = []
        user_tweets = []
        for username in usernames:
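            # In test mode, only the first two users are fully scraped;
            # the rest are appended as bare User objects.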
            if not config.test_mode or i < 2:
                self.url = self.user_url(username)
                html = self.get_html(config.scraper['user_scroll_time'])
                users.append(self.scrape_user(html, username))
                user_tweets += self.scrape_tweets(self.get_tweets(html))
            else:
                users.append(User(username))
            i += 1
        return users, user_tweets

    def scrape(self):
        """Combine all scrape methods: scrape the feed, then each user."""
        tweets = self.scrape_tweets(self.get_tweets(self.get_html()))
        usernames = self.get_usernames(tweets)
        users, user_tweets = self.scrape_all_users(usernames)
        tweets += user_tweets
        tweets = self.filter_tweets(tweets)
        return tweets, users
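

# A minimal usage sketch, assuming a Driver wrapper (e.g. driver.py) that
# exposes the scroll() and get_page_source() methods used above, and that
# config.scraper['twitter_url'] is a valid start URL; the import below is
# hypothetical and names code outside this file.
if __name__ == '__main__':
    from driver import Driver  # assumed module; adjust to the real wrapper
    scraper = Scraper(Driver(), config.scraper['twitter_url'])
    tweets, users = scraper.scrape()
    print(f'Scraped {len(tweets)} unique tweets from {len(users)} users')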