-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathhashtag.py
215 lines (197 loc) · 10 KB
/
hashtag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# -*- coding: utf-8 -*-
"""
@author: Blacklistede
"""
import requests
import random
import time
import argparse
class HashTagResearch():
def __init__(self):
self.s = requests.Session()
# self.s.verify = False
self.browser_headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36',
'Host': 'www.instagram.com',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Upgrade-Insecure-Requests': '1'}
self.request_headers = {'User-Agent' : 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36',
'X-Instagram-AJAX' : '1',
'X-CSRFToken' : 'csrftoken',
'Origin' : 'https://www.instagram.com',
'Referer' : 'https://www.instagram.com'}
self.explore_headers = {'Host' : 'www.instagram.com',
'User-Agent' : 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0',
'Accept' : '*/*',
'Accept-Language' : 'en-US;q=0.7,en;q=0.3',
'Accept-Encoding' : 'gzip, deflate, br',
'X-Requested-With' : 'XMLHttpRequest',
'Referer' : 'https://www.instagram.com/',
'Connection' : 'keep-alive'}
self.s.headers = self.browser_headers
self.base_url = 'https://www.instagram.com/'
self.query_url = self.base_url + 'query/'
self.login_url = self.base_url + 'accounts/login/ajax/'
self.search_url = self.base_url + 'web/search/topsearch/'
self.explore_url = self.base_url + 'explore/tags/'
def post_request(self, url, data, **kwargs):
"""Make a POST request"""
request = self.s.post(url, data = data, **kwargs)
return self.analyze_request(request)
def get_request(self, url, params = None, **kwargs):
"""Make a GET request"""
request = self.s.get(url, params = params, **kwargs)
return self.analyze_request(request)
def analyze_request(self, request):
"""Check if the request was successful"""
if request.status_code == 200:
return request
else:
raise requests.HTTPError(str(request.status_code))
def setup_session(self):
"""Go to instagram.com to fetch crsf token and session cookies"""
self.get_request(self.base_url)
self.request_headers['X-CSRFToken'] = self.s.cookies.get_dict()['csrftoken']
def login(self, username, password):
"""Login to Instagram account. Required to explore hashtags"""
payload = {'username': username,
'password': password}
response = self.post_request(self.login_url, data = payload, headers = self.request_headers).json()
if response['status'] == 'ok' and response['authenticated']:
return True
else:
print(response)
raise Exception('Failed to login')
def explore_hashtags(self, hashtag, min_posts = None, max_posts = None):
"""Get recommended hashtags based on a hashtag. Takes a string, returns a list"""
if hashtag[0] != '#':
hashtag = '#' + hashtag
params = {'context' : 'blended',
'query' : hashtag,
'rank_token' : random.uniform(0, 1)}
response = self.get_request(self.search_url, params = params, headers = self.explore_headers)
tag_list = response.json()['hashtags']
tags = []
for tag in tag_list:
if min_posts:
if tag['hashtag']['media_count'] < min_posts:
continue
if max_posts:
if tag['hashtag']['media_count'] > max_posts:
continue
tags.append(tag['hashtag']['name'])
return tags
def trim_hashtags(self, hashtags, amount):
"""Returns a list with the first x items"""
return hashtags[:amount]
def get_hashtag_info(self, hashtag):
"""Get the posts of a hashtag and the like/comment range of the popular pictures"""
url = self.explore_url + str(hashtag) + '/'
params = {'__a': 1}
response = self.get_request(url, params = params).json()
post_amount = response['graphql']['hashtag']['edge_hashtag_to_media']['count']
top_posts = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges']
"""Get min/max comments/likes"""
min_likes, min_comments = 9999999999999, 9999999999999 #very big number, just temporary
max_likes, max_comments = 0, 0
for post in top_posts:
likes = post['node']['edge_liked_by']['count']
comments = post['node']['edge_media_to_comment']['count']
if likes < min_likes:
min_likes = likes
if likes > max_likes:
max_likes = likes
if comments < min_comments:
min_comments = comments
if comments > max_comments:
max_comments = comments
hashtag_info = {'name' : hashtag,
'post_amount' : post_amount,
'min_likes' : min_likes,
'max_likes' : max_likes,
'min_comments' : min_comments,
'max_comments' : max_comments}
return hashtag_info
def main(username, password, hashtags, max_hashtags = None, min_posts = None, max_posts = None, suggestions = True, file = False):
htr = HashTagResearch()
"""Setup"""
htr.setup_session()
htr.login(username, password)
if file:
""" Read file into list """
with open(file) as f:
hashtags = f.read().splitlines()
"""Get suggested hashtags"""
hashtag_list = []
if suggestions:
for tag in hashtags:
print('Getting suggested hashtags for ' + str(str(tag).encode()))
try:
new_tags = htr.explore_hashtags(tag, min_posts = min_posts, max_posts = max_posts)
"""Cut list if max_hashtags are set"""
if max_hashtags:
new_tags = htr.trim_hashtags(new_tags, max_hashtags)
"""Add tags to hashtag list to be analyzed later"""
hashtag_list.extend(new_tags)
except Exception as ex:
print(ex)
print('Error fetching recommended hashtags for ' + str(tag))
time.sleep(1.5)
else:
hashtag_list = hashtags
"""Remove duplicates"""
hashtag_list = list(set(hashtag_list))
"""Analyze hashtags"""
hashtag_infos = []
for tag in hashtag_list:
print('Getting infos for ' + str(str(tag).encode()))
try:
tag_info = htr.get_hashtag_info(tag)
hashtag_infos.append(tag_info)
except Exception as ex:
print(ex)
print('Failed to get informations for ' + str(str(tag).encode()))
error = 'ERROR'
tag_info = {'name' : str(str(tag).encode()),
'post_amount' : error,
'min_likes' : error,
'max_likes' : error,
'min_comments' : error,
'max_comments' : error}
hashtag_infos.append(tag_info)
"""Security delay"""
time.sleep(1.5)
"""Write to file"""
with open('hashtaginfo.csv', 'w+', encoding = 'utf-8') as file:
"""Legend"""
print('Writing to file...')
file.write('HASHTAG,POST AMOUNT,MIN LIKES, MAX LIKES, MIN COMMENTS, MAX COMMENTS\n')
for info in hashtag_infos:
file.write(str(info['name']) + ',' +
str(info['post_amount']) + ',' +
str(info['min_likes']) + ',' +
str(info['max_likes']) + ',' +
str(info['min_comments']) + ',' +
str(info['max_comments']) + '\n')
print('Scraped ' + str(len(hashtag_infos)) + ' tags!')
print('DONE!')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('username', help = 'Your IG username')
parser.add_argument('password', help = 'Your IG password')
parser.add_argument('hashtags', help = 'The hashtags you want to analyze. Seperate by comma. Example: car,boat,plane')
parser.add_argument('--max_tags', help = 'The maximum of suggested tags to fetch per hashtag', type = int)
parser.add_argument('--min_posts', help = 'Check only tags with minimum posts', type = int)
parser.add_argument('--max_posts', help = 'Check only tags with maximum posts', type = int)
parser.add_argument('--nosuggestions', help = 'Don\'t get suggestions for the inputted hashtags', dest = 'suggestions', action = 'store_false')
parser.add_argument('--file', help = 'You need this flag if you are using a .txt file to import hashtags.', dest = 'file', action = 'store_true')
parser.set_defaults(suggestions = True, file = False)
args = parser.parse_args()
if not args.file:
hashtags = list(filter(None, args.hashtags.split(','))) #Create list and remove empty elements
else:
hashtags = []
if args.file:
file = args.hashtags
else:
file = False
main(args.username, args.password, hashtags, args.max_tags, args.min_posts, args.max_posts, args.suggestions, file)