Merge pull request #27 from am9zZWY/josef-fix-threading
Fix threading and graceful shutdown problems
am9zZWY authored Jul 14, 2024
2 parents aa503a4 + ce96037 commit 93396ae
Showing 7 changed files with 234 additions and 76 deletions.
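
The changed files below all rely on a shared PipelineElement base class (providing shutdown(), is_shutdown(), add_next(), call_next(), add_executor(), and save_state()) that is defined elsewhere in the repository and does not appear in this diff. The following is only a plausible minimal sketch of that interface, reconstructed from how the changed code calls it, not the project's actual implementation:

import threading


class PipelineElement:
    """Hypothetical sketch of the base class shared by Crawler, Indexer and Tokenizer."""

    def __init__(self, name: str):
        self.name = name
        self.executor = None            # optional ThreadPoolExecutor, set via add_executor()
        self.next_elements = []         # downstream pipeline elements
        self._shutdown_event = threading.Event()

    def add_executor(self, executor) -> None:
        self.executor = executor

    def add_next(self, element: "PipelineElement") -> None:
        self.next_elements.append(element)

    async def call_next(self, *args) -> None:
        # Forward the processed data to every downstream element.
        for element in self.next_elements:
            await element.process(*args)

    def shutdown(self) -> None:
        # Request a cooperative stop; elements poll this via is_shutdown().
        self._shutdown_event.set()

    def is_shutdown(self) -> bool:
        return self._shutdown_event.is_set()

    def save_state(self) -> None:
        # Overridden by subclasses that persist state (e.g. Crawler).
        pass
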
150 changes: 119 additions & 31 deletions engine/crawl.py
@@ -20,9 +20,7 @@
import duckdb
from custom_db import *

##### Constants #####
# Maximum size of the links
MAX_SIZE = 20
# Constants
# URL seeds to start crawling from
SEEDS = [
"https://www.tuebingen.de/en/",
@@ -53,22 +53,35 @@ def __init__(self, dbcon: duckdb.DuckDBPyConnection):
self.cursor = dbcon.cursor()

# Initialize the crawler state
self.found_links = set()
self.urls_crawled = set()
self.ignore_links = set()
self.to_crawl = collections.deque(SEEDS)
self.to_crawl_set = set(self.to_crawl)

# Load the global state
self._load_state()

# Internal state
self._connector = aiohttp.TCPConnector(limit=50, limit_per_host=10)
self._page_count = 0

self.max_size = 1000 # Example maximum size
self.ignore_domains = ["github.com", "linkedin.com", "xing.com", "instagram.com", "twitter.com", "youtube.com",
"de.wikipedia.org", "wikipedia.org", "google.com", "google.de", "google.co.uk",
"amazon.com", "cctue.de", "spotify.com"]
self.langs = ["en", "en-de", "eng", "en-GB", "en-US", "english"]
self.required_keywords = ["tübingen", "tuebingen", "tubingen", "t%C3%BCbingen"]
self.user_agent = ("Modern Search Engines University of Tuebingen Project Crawler ("
"https://uni-tuebingen.de/de/262377)")
self.user_agents = [("Modern Search Engines University of Tuebingen Project Crawler ("
"https://uni-tuebingen.de/de/262377)"),
("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/58.0.3029.110 Safari/537.3")]

self.headers = {
"User-Agent": self.user_agent,
"Accept-Language": "en-US,en;q=0.9,de;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
}

def __del__(self) -> None:
self.cursor.close()
@@ -87,42 +98,95 @@ async def fetch(self, session, url):
print(f"Error fetching {url}: {e}")
return None

@staticmethod
async def _fetch_with_playwright(url, max_retries=3):
for attempt in range(max_retries):
try:
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
await page.goto(url, wait_until='networkidle')
content = await page.content()
await browser.close()
return content
except Exception as e:
print(f"Error on attempt {attempt + 1} for {url}: {e}")
if attempt == max_retries - 1:
print(f"Max retries reached for {url}")
return None

@staticmethod
def _needs_javascript_rendering(html: str) -> bool:
# Check for JavaScript frameworks
if any(framework in html.lower() for framework in ['react', 'vue', 'angular']):
return True

return False

async def process(self):
async with ClientSession() as session:
while self.to_crawl and len(self.found_links) < self.max_size:
async with ClientSession(connector=self._connector) as session:
while not self.is_shutdown() and self.to_crawl and len(self.urls_crawled) < self.max_size:
# Process multiple links concurrently
tasks = []
for _ in range(min(MAX_SIZE, len(self.to_crawl))):
link = self.to_crawl.popleft()
self.to_crawl_set.remove(link)
tasks.append(self._handle_link(session, link))
await asyncio.gather(*tasks)
for _ in range(min(10, len(self.to_crawl))): # Process up to 10 links at a time
if self.to_crawl and len(self.urls_crawled) < self.max_size:
link = self.to_crawl.popleft()
task = asyncio.create_task(self._handle_link(session, link))
tasks.append(task)
else:
break

if not tasks:
break

# Wait for all tasks to complete or for shutdown
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

# Process completed tasks
for task in done:
try:
await task
except Exception as e:
print(f"Error processing link: {e}")

# Check for shutdown
if self.is_shutdown():
# Cancel pending tasks
for task in pending:
task.cancel()
# Wait for cancellation to complete
await asyncio.gather(*pending, return_exceptions=True)
self.save_state()
break

async def _handle_link(self, session, link):
if len(self.found_links) >= self.max_size:
if len(self.urls_crawled) >= self.max_size:
print("Maximum size reached")
return

crawling_str = f"Crawler crawling {link}: "
print(f"Crawler crawling {link}...")

if not link.startswith("http"):
print(crawling_str + "skipped")
print(f"Invalid URL: {link}")
return

if any(domain in link for domain in self.ignore_domains):
print(crawling_str + "domain in ignore list")
print(f"Ignoring {link} because it is in the ignore domains list")
self.ignore_links.add(link)
return

if link in self.ignore_links or link in self.found_links:
print(crawling_str + "already ignored or visited")
if link in self.ignore_links or link in self.urls_crawled:
print(f"Ignoring {link} because it is in the ignore or found list")
return

if not check_robots(link):
print(crawling_str + "robots.txt disallows")
print(f"Ignoring {link} because it is disallowed by robots.txt")
self.ignore_links.add(link)
return

html_content = await self.fetch(session, link)
if html_content is None:
print(f"Error fetching {link}")
self.ignore_links.add(link)
return

@@ -135,34 +199,58 @@ async def _handle_link(self, session, link):
check_text_lang = LANG_DETECTOR.detect(text) in self.langs

if not check_html_tag_lang and not check_xml_tag_lang and not check_link_lang and not check_text_lang:
print(crawling_str + "unsupported language")
print(f"Ignoring {link} because it is not in the correct language")
self.ignore_links.add(link)
return

if not any(keyword in text for keyword in self.required_keywords):
print(f"Ignoring {link} because it does not contain the required keywords")
self.ignore_links.add(link)
print(crawling_str + "no required keywords")
return

for a_tag in soup.find_all("a", href=True):
found_link = a_tag.get("href")

# Check if link is a fragment
if found_link.startswith("#"):
continue

# Check if link is relative
if found_link.startswith("/"):
base_url = get_base_url(link)
found_link = base_url + found_link
found_link = get_full_url(base_url, found_link)
elif found_link.startswith("../"):
base_url = get_base_url(link)
found_link = get_full_url(base_url, found_link)

# Check if link is an email
if found_link.startswith("mailto:"):
continue

# Check if link is a phone number
if found_link.startswith("tel:"):
continue

# Check if link is a file
if found_link.endswith((".pdf", ".doc", ".docx", ".ppt", ".pptx", ".xls", ".xlsx", ".csv", ".zip", ".rar",
".tar", ".gz", ".7z", ".mp3", ".mp4", ".avi", ".mkv", ".mov", ".flv", ".wav",
".ogg", ".webm", ".m4a", ".flac", ".aac", ".wma", ".jpg", ".jpeg", ".png", ".gif",
".bmp", ".svg", ".webp")):
continue

if found_link not in self.ignore_links and found_link not in self.found_links and found_link not in self.to_crawl_set and found_link.startswith(
"http"):
if (found_link not in self.ignore_links
and found_link not in self.urls_crawled
and found_link not in self.to_crawl_set
and found_link.startswith("http")):
self.to_crawl.append(found_link)
self.to_crawl_set.add(found_link)

if link not in self.found_links and link not in self.ignore_links:
self.found_links.add(link)
if link not in self.urls_crawled and link not in self.ignore_links:
self.urls_crawled.add(link)

print(crawling_str + "done")
self.call_next(soup, link)
print(f"Finished crawling {link}. Total: {len(self.urls_crawled)} links.")
if not self.is_shutdown():
await self.call_next(soup, link)

def save_state(self):
"""
@@ -178,7 +266,7 @@ def save_state(self):
f.write(json.dumps({
"to_crawl": list(self.to_crawl),
"ignore_links": list(self.ignore_links),
"found_links": list(self.found_links)
"found_links": list(self.urls_crawled)
}))

def _load_state(self):
Expand All @@ -196,7 +284,7 @@ def _load_state(self):
self.to_crawl = collections.deque(data["to_crawl"])
self.to_crawl_set = set(data["to_crawl"])
self.ignore_links = set(data["ignore_links"])
self.found_links = set(data["found_links"])
self.urls_crawled = set(data["found_links"])


# IMPORTANT: Please use main.py instead of this file
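
The rewritten process() loop above replaces a single asyncio.gather call with asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED), which lets a shutdown request interrupt a batch instead of waiting for every fetch to finish. A minimal, self-contained sketch of that cancellation pattern, independent of the crawler (worker() and the shutdown event are illustrative stand-ins, not code from the repository):

import asyncio


async def worker(seconds: int) -> int:
    # Stand-in for _handle_link: some cancellable asynchronous work.
    await asyncio.sleep(seconds)
    return seconds


async def run_batch(shutdown_event: asyncio.Event) -> None:
    tasks = [asyncio.create_task(worker(i)) for i in range(1, 6)]

    while tasks:
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

        for task in done:
            try:
                print("finished:", await task)
            except Exception as e:
                # Surface individual task errors without stopping the whole batch.
                print("task failed:", e)

        if shutdown_event.is_set():
            for task in pending:
                task.cancel()
            # Let the cancellations propagate before returning.
            await asyncio.gather(*pending, return_exceptions=True)
            return

        tasks = list(pending)


if __name__ == "__main__":
    asyncio.run(run_batch(asyncio.Event()))
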
4 changes: 3 additions & 1 deletion engine/custom_tokenizer.py
@@ -103,7 +103,7 @@ class Tokenizer(PipelineElement):
def __init__(self):
super().__init__("Tokenizer")

def process(self, data, link):
async def process(self, data, link):
"""
Tokenizes the input data.
"""
@@ -121,3 +121,5 @@ def process(self, data, link):

tokenized_text = tokenize_data(data=text)
add_tokens_to_index(url=link, tokenized_text=tokenized_text)

print(f"Tokenized text for {link}")
6 changes: 4 additions & 2 deletions engine/index.py
@@ -12,7 +12,7 @@ def __init__(self):

self._load_state()

def process(self, data, link):
async def process(self, data, link):
"""
Indexes the input data.
"""
@@ -32,7 +32,9 @@ def process(self, data, link):
add_title_to_index(url=link, title=title_content)
add_snippet_to_index(url=link, snippet=description_content)

self.call_next(soup, link)
print(f"Indexed {link}")
if not self.is_shutdown():
await self.call_next(soup, link)

def _load_state(self):
"""
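
Tokenizer.process and Indexer.process are now coroutines, so the crawler can await self.call_next(soup, link) from inside its event loop and stop forwarding work once a shutdown is requested. Tokenizing itself is CPU-bound, so one option is to push it onto the ThreadPoolExecutor that main.py attaches via add_executor; the diff does not show this being done, and the sketch below only illustrates that option with a placeholder tokenizer:

import asyncio
from concurrent.futures import ThreadPoolExecutor


def tokenize_data(data: str) -> list:
    # Placeholder for the project's real tokenizer.
    return data.lower().split()


async def tokenize_off_loop(executor: ThreadPoolExecutor, text: str) -> list:
    # Run the CPU-bound tokenization in the executor so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, tokenize_data, text)


async def main() -> None:
    with ThreadPoolExecutor(max_workers=4) as executor:
        tokens = await tokenize_off_loop(executor, "Tübingen is a university town on the Neckar")
        print(tokens)


if __name__ == "__main__":
    asyncio.run(main())
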
83 changes: 54 additions & 29 deletions engine/main.py
@@ -2,58 +2,83 @@
Pipeline for Crawling, Tokenizing, and Indexing
"""
from concurrent.futures import ThreadPoolExecutor
import asyncio
import nest_asyncio

# Database
import duckdb
# Pipeline
from crawl import Crawler
from custom_db import index_pages, access_index, save_pages
from custom_tokenizer import Tokenizer
# Async
import asyncio

from index import Indexer

# Constants
MAX_THREADS = 10

if __name__ == "__main__":
con = duckdb.connect("crawlies.db")
# Database setup
con.install_extension("fts")
con.load_extension("fts")
# Patch asyncio to allow nested event loops
nest_asyncio.apply()

# Initialize the pipeline
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
try:
crawler = Crawler(con)
crawler.max_size = 100
crawler.add_executor(executor)
# Database setup
con = duckdb.connect("crawlies.db")
con.install_extension("fts")
con.load_extension("fts")

# Initialize the pipeline elements
crawler = Crawler(con)
crawler.max_size = 1000
indexer = Indexer()
tokenizer = Tokenizer()

# Add the pipeline elements
crawler.add_next(indexer)
indexer.add_next(tokenizer)


def signal_handler(signum, frame):
print("Interrupt received, shutting down... Please wait")
for element in [crawler, indexer, tokenizer]:
element.shutdown()

indexer = Indexer()
indexer.add_executor(executor)

tokenizer = Tokenizer()
tokenizer.add_executor(executor)
signal.signal(signal.SIGINT, signal_handler)

# Add the pipeline elements
crawler.add_next(indexer)
indexer.add_next(tokenizer)

# Start the pipeline
asyncio.run(crawler.process())
except (KeyboardInterrupt, SystemExit):
print("Interrupted")
crawler.save_state()
async def main():
# Initialize the pipeline
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
# Add the executor to the pipeline elements
crawler.add_executor(executor)
indexer.add_executor(executor)
tokenizer.add_executor(executor)

# Start the pipeline
asyncio.run(crawler.process())
try:
await crawler.process()
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Ensure states are saved even if an exception occurs
for element in [crawler, indexer, tokenizer]:
element.save_state()
index_pages()
save_pages()
index_df = access_index()
index_df.to_csv("inverted_index.csv")
con.close()
print("State saved")

# Save the state+
for element in [crawler, indexer, tokenizer]:
element.save_state()
index_pages()
save_pages()
index_df = access_index()
index_df.to_csv("inverted_index.csv")
save_pages()

# Close the connection
con.close()
print("State saved")


if __name__ == "__main__":
asyncio.run(main())
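
The restructured main.py installs a SIGINT handler that flips the shutdown flag on each pipeline element instead of letting KeyboardInterrupt unwind the stack, then saves all state in a finally block; nest_asyncio.apply() additionally allows asyncio.run to be called from code that is already inside a running event loop. A condensed, self-contained sketch of that shutdown wiring (the Pipeline class below is an illustrative stand-in for the crawler, indexer and tokenizer elements, not the project's code):

import asyncio
import signal


class Pipeline:
    # Illustrative stand-in for the crawler, indexer and tokenizer elements.
    def __init__(self):
        self._shutdown = False

    def shutdown(self) -> None:
        self._shutdown = True

    def save_state(self) -> None:
        print("state saved")

    async def process(self) -> None:
        while not self._shutdown:
            await asyncio.sleep(0.5)
            print("working...")


async def main() -> None:
    pipeline = Pipeline()

    def signal_handler(signum, frame):
        print("Interrupt received, shutting down... Please wait")
        pipeline.shutdown()

    # Replace the default KeyboardInterrupt behaviour with a cooperative flag.
    signal.signal(signal.SIGINT, signal_handler)

    try:
        await pipeline.process()
    finally:
        # State is persisted whether the run ends normally or via SIGINT.
        pipeline.save_state()


if __name__ == "__main__":
    asyncio.run(main())
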
