# ---
# name: web-extract-link
# deployed: true
# title: Website Link Extraction
# description: Returns information for all hyperlinks on one or more web pages matching a search string; information includes domain, link, and matching text.
# params:
#   - name: url
#     type: array
#     description: URLs of web pages to search; the parameter can be a single URL or a comma-delimited list of URLs.
#     required: true
#   - name: search
#     type: string
#     description: The search string to use to find the corresponding links.
#     required: true
#   - name: properties
#     type: array
#     description: The properties to return (defaults to all properties). See "Returns" for a listing of the available properties.
#     required: false
# returns:
#   - name: domain
#     type: string
#     description: The domain of the link for the matched item
#   - name: link
#     type: string
#     description: The link of the matched item
#   - name: text
#     type: string
#     description: The text of the matched item
# examples:
#   - '"https://www.flex.io", "Contact Us"'
#   - '"https://news.ycombinator.com/news?p=1,https://news.ycombinator.com/news?p=2,https://news.ycombinator.com/news?p=3","Show HN"'
# ---
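
# Note (not part of the original metadata): as written below, the handler reads its
# parameters from flex.input as a JSON array of positional values (urls, search,
# properties) and writes its result to flex.output as a JSON array of rows, one row
# per matching link.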

import json
import aiohttp
import asyncio
import urllib.parse
import itertools
from datetime import date, datetime
from decimal import Decimal
from cerberus import Validator
from collections import OrderedDict
from bs4 import BeautifulSoup

def flexio_handler(flex):

    # get the input
    input = flex.input.read()
    try:
        input = json.loads(input)
        if not isinstance(input, list): raise ValueError
    except ValueError:
        raise ValueError

    # define the expected parameters and map the values to the parameter names
    # based on the positions of the keys/values
    params = OrderedDict()
    params['urls'] = {'required': True, 'validator': validator_list, 'coerce': to_list}
    params['search'] = {'required': True, 'type': 'string'}
    params['properties'] = {'required': False, 'validator': validator_list, 'coerce': to_list, 'default': '*'}
    input = dict(zip(params.keys(), input))

    # validate the mapped input against the validator
    v = Validator(params, allow_unknown = True)
    input = v.validated(input)
    if input is None:
        raise ValueError

    # get the urls to process
    search_urls = input['urls']
    search_urls = [s.strip() for s in search_urls]

    # get the search term to use to find the corresponding links
    search_text = input['search']
    search_text = " ".join(search_text.split()).lower().strip() # remove leading/trailing/duplicate spaces and convert to lowercase

    # get the properties to return and the property map
    property_map = OrderedDict()
    property_map['domain'] = 'domain'
    property_map['link'] = 'link'
    property_map['text'] = 'text'
    properties = [p.lower().strip() for p in input['properties']]

    # if we have a wildcard, get all the properties
    if len(properties) == 1 and properties[0] == '*':
        properties = list(property_map.keys())

    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(fetch_all(search_urls, search_text, properties))

    # if we don't have any results, return an empty result
    if len(result) == 0:
        result = [['']]

    # return the results
    result = json.dumps(result, default=to_string)
    flex.output.content_type = "application/json"
    flex.output.write(result)

async def fetch_all(search_urls, search_text, properties):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for search_url in search_urls:
            tasks.append(fetch(session, search_url, search_text, properties))
        content = await asyncio.gather(*tasks)
        return list(itertools.chain.from_iterable(content))

async def fetch(session, search_url, search_text, properties):
    try:
        async with session.get(search_url) as response:
            content = await response.text()
            return parseContent(content, search_url, search_text, properties)
    except Exception:
        return []

def parseContent(content, search_url, search_text, properties):

    # extract the info and build up the result
    result = []

    # remove leading/trailing/duplicate spaces and convert to lowercase
    cleaned_search_text = " ".join(search_text.split()).lower().strip()

    # parse the content and look for anchors
    soup = BeautifulSoup(content, "lxml")
    for item in soup.find_all('a'):

        # get the anchor and item text
        anchor_href = item.get('href')
        anchor_text = item.text

        # remove leading/trailing/duplicate spaces and convert to lowercase
        cleaned_anchor_text = " ".join(anchor_text.split()).lower().strip()

        # if the cleaned search text is in the cleaned anchor text, add the item to the result
        if cleaned_search_text in cleaned_anchor_text:
            link = urllib.parse.urljoin(search_url, anchor_href)
            domain = urllib.parse.urlparse(link)[1] # second item is the network location part of the url
            row = [{'domain': domain, 'link': link, 'text': anchor_text}.get(p,'') for p in properties]
            result.append(row)

    return result

def validator_list(field, value, error):
    if isinstance(value, str):
        return
    if isinstance(value, list):
        for item in value:
            if not isinstance(item, str):
                error(field, 'Must be a list with only string values')
        return
    error(field, 'Must be a string or a list of strings')

def to_string(value):
    if isinstance(value, (date, datetime)):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)
    return value

def to_list(value):
    # if we have a comma-delimited string, split it into a list of strings;
    # if we have a list, flatten any nested lists into a single list of strings
    if isinstance(value, str):
        return value.split(",")
    if isinstance(value, list):
        return list(itertools.chain.from_iterable([i] if isinstance(i, str) else i for i in value))
    return None
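
# The block below is not part of the deployed function; it is a minimal local test
# sketch, assuming a simple stand-in for the Flex.io `flex` object that only provides
# the `input.read()`, `output.content_type`, and `output.write()` members used above.
# The FakeInput/FakeOutput/FakeFlex names are hypothetical and exist only for this sketch.
if __name__ == '__main__':

    class FakeInput:
        # serves the positional parameters as the JSON array the handler expects
        def __init__(self, params):
            self.params = params
        def read(self):
            return json.dumps(self.params)

    class FakeOutput:
        # captures the handler's JSON result and prints it
        def __init__(self):
            self.content_type = None
        def write(self, data):
            print(data)

    class FakeFlex:
        def __init__(self, params):
            self.input = FakeInput(params)
            self.output = FakeOutput()

    # mirrors the first example in the metadata header above
    flexio_handler(FakeFlex(["https://www.flex.io", "Contact Us"]))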