diff --git a/engine/crawl.py b/engine/crawl.py
index cac94c3..ede0ec1 100644
--- a/engine/crawl.py
+++ b/engine/crawl.py
@@ -20,9 +20,9 @@
 import duckdb
 from custom_db import *
+import aiohttp  # TCPConnector used for connection pooling in __init__
+from playwright.async_api import async_playwright  # fallback rendering for JS-heavy pages
 
-##### Constants #####
-# Maximum size of the links
-MAX_SIZE = 20
+# Constants
 # URL seeds to start crawling from
 SEEDS = [
     "https://www.tuebingen.de/en/",
@@ -53,7 +51,7 @@ def __init__(self, dbcon: duckdb.DuckDBPyConnection):
         self.cursor = dbcon.cursor()
 
         # Initialize the crawler state
-        self.found_links = set()
+        self.urls_crawled = set()
         self.ignore_links = set()
         self.to_crawl = collections.deque(SEEDS)
         self.to_crawl_set = set(self.to_crawl)
@@ -61,14 +59,27 @@ def __init__(self, dbcon: duckdb.DuckDBPyConnection):
         # Load the global state
         self._load_state()
 
+        # Internal state
+        self._connector = aiohttp.TCPConnector(limit=50, limit_per_host=10)
+        self._page_count = 0
+        self.max_size = 1000  # Default maximum number of pages to crawl
+
         self.ignore_domains = ["github.com", "linkedin.com", "xing.com", "instagram.com", "twitter.com",
                                "youtube.com", "de.wikipedia.org", "wikipedia.org", "google.com", "google.de",
                                "google.co.uk", "amazon.com", "cctue.de", "spotify.com"]
         self.langs = ["en", "en-de", "eng", "en-GB", "en-US", "english"]
         self.required_keywords = ["tübingen", "tuebingen", "tubingen", "t%C3%BCbingen"]
-        self.user_agent = ("Modern Search Engines University of Tuebingen Project Crawler ("
-                           "https://uni-tuebingen.de/de/262377)")
+        self.user_agents = [("Modern Search Engines University of Tuebingen Project Crawler ("
+                             "https://uni-tuebingen.de/de/262377)"),
+                            ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
+                             "Chrome/58.0.3029.110 Safari/537.3")]
+
+        self.headers = {
+            "User-Agent": self.user_agents[0],  # default to the project crawler UA
+            "Accept-Language": "en-US,en;q=0.9,de;q=0.8",
+            "Accept-Encoding": "gzip, deflate, br",
+            "Connection": "keep-alive",
+        }
 
     def __del__(self) -> None:
         self.cursor.close()
@@ -87,42 +98,95 @@ async def fetch(self, session, url):
             print(f"Error fetching {url}: {e}")
             return None
 
+    @staticmethod
+    async def _fetch_with_playwright(url, max_retries=3):
+        for attempt in range(max_retries):
+            try:
+                async with async_playwright() as p:
+                    browser = await p.chromium.launch()
+                    page = await browser.new_page()
+                    await page.goto(url, wait_until='networkidle')
+                    content = await page.content()
+                    await browser.close()
+                    return content
+            except Exception as e:
+                print(f"Error on attempt {attempt + 1} for {url}: {e}")
+                if attempt == max_retries - 1:
+                    print(f"Max retries reached for {url}")
+        return None
+
+    @staticmethod
+    def _needs_javascript_rendering(html: str) -> bool:
+        # Check for JavaScript frameworks
+        if any(framework in html.lower() for framework in ['react', 'vue', 'angular']):
+            return True
+
+        return False
+
     async def process(self):
-        async with ClientSession() as session:
-            while self.to_crawl and len(self.found_links) < self.max_size:
+        async with ClientSession(connector=self._connector) as session:
+            while not self.is_shutdown() and self.to_crawl and len(self.urls_crawled) < self.max_size:
+                # Process multiple links concurrently
                 tasks = []
-                for _ in range(min(MAX_SIZE, len(self.to_crawl))):
-                    link = self.to_crawl.popleft()
-                    self.to_crawl_set.remove(link)
-                    tasks.append(self._handle_link(session, link))
-                await asyncio.gather(*tasks)
+                for _ in range(min(10, len(self.to_crawl))):  # Process up to 10 links at a time
+                    if self.to_crawl and len(self.urls_crawled) < self.max_size:
+                        link = self.to_crawl.popleft()
+                        task = asyncio.create_task(self._handle_link(session, link))
+                        tasks.append(task)
+                    else:
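+                        # Queue drained or crawl limit reached; stop scheduling this batch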
+                        break
+
+                if not tasks:
+                    break
+
+                # Wait for all tasks to complete or for shutdown
+                done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+
+                # Process completed tasks
+                for task in done:
+                    try:
+                        await task
+                    except Exception as e:
+                        print(f"Error processing link: {e}")
+
+                # Check for shutdown
+                if self.is_shutdown():
+                    # Cancel pending tasks
+                    for task in pending:
+                        task.cancel()
+                    # Wait for cancellation to complete
+                    await asyncio.gather(*pending, return_exceptions=True)
+                    self.save_state()
+                    break
+                else:
+                    # Finish the remaining tasks before scheduling the next batch
+                    for task in pending:
+                        try:
+                            await task
+                        except Exception as e:
+                            print(f"Error processing link: {e}")
 
     async def _handle_link(self, session, link):
-        if len(self.found_links) >= self.max_size:
+        if len(self.urls_crawled) >= self.max_size:
+            print("Maximum size reached")
             return
 
-        crawling_str = f"Crawler crawling {link}: "
+        print(f"Crawler crawling {link}...")
 
         if not link.startswith("http"):
-            print(crawling_str + "skipped")
+            print(f"Invalid URL: {link}")
             return
 
         if any(domain in link for domain in self.ignore_domains):
-            print(crawling_str + "domain in ignore list")
+            print(f"Ignoring {link} because it is in the ignore domains list")
             self.ignore_links.add(link)
             return
 
-        if link in self.ignore_links or link in self.found_links:
-            print(crawling_str + "already ignored or visited")
+        if link in self.ignore_links or link in self.urls_crawled:
+            print(f"Ignoring {link} because it is in the ignore or found list")
             return
 
         if not check_robots(link):
-            print(crawling_str + "robots.txt disallows")
+            print(f"Ignoring {link} because it is disallowed by robots.txt")
             self.ignore_links.add(link)
             return
 
         html_content = await self.fetch(session, link)
 
         if html_content is None:
+            print(f"Error fetching {link}")
             self.ignore_links.add(link)
             return
 
@@ -135,34 +199,58 @@ async def _handle_link(self, session, link):
         check_text_lang = LANG_DETECTOR.detect(text) in self.langs
 
         if not check_html_tag_lang and not check_xml_tag_lang and not check_link_lang and not check_text_lang:
-            print(crawling_str + "unsupported language")
+            print(f"Ignoring {link} because it is not in the correct language")
             self.ignore_links.add(link)
             return
 
         if not any(keyword in text for keyword in self.required_keywords):
+            print(f"Ignoring {link} because it does not contain the required keywords")
             self.ignore_links.add(link)
-            print(crawling_str + "no required keywords")
             return
 
         for a_tag in soup.find_all("a", href=True):
             found_link = a_tag.get("href")
+
+            # Check if link is a fragment
             if found_link.startswith("#"):
                 continue
 
+            # Check if link is relative
             if found_link.startswith("/"):
                 base_url = get_base_url(link)
-                found_link = base_url + found_link
+                found_link = get_full_url(base_url, found_link)
+            elif found_link.startswith("../"):
+                # Relative paths are resolved against the current page URL
+                found_link = get_full_url(link, found_link)
+
+            # Check if link is an email
+            if found_link.startswith("mailto:"):
+                continue
+
+            # Check if link is a phone number
+            if found_link.startswith("tel:"):
+                continue
+
+            # Check if link is a file
+            if found_link.endswith((".pdf", ".doc", ".docx", ".ppt", ".pptx", ".xls", ".xlsx", ".csv", ".zip", ".rar",
+                                    ".tar", ".gz", ".7z", ".mp3", ".mp4", ".avi", ".mkv", ".mov", ".flv", ".wav",
+                                    ".ogg", ".webm", ".m4a", ".flac", ".aac", ".wma", ".jpg", ".jpeg", ".png", ".gif",
+                                    ".bmp", ".svg", ".webp")):
+                continue
 
-            if found_link not in self.ignore_links and found_link not in self.found_links and found_link not in self.to_crawl_set and found_link.startswith(
-                    "http"):
+            if (found_link not in self.ignore_links
+                    and found_link not in self.urls_crawled
+                    and found_link not in self.to_crawl_set
+                    and found_link.startswith("http")):
                 self.to_crawl.append(found_link)
                 self.to_crawl_set.add(found_link)
 
-        if link not in self.found_links and link not in self.ignore_links:
-            self.found_links.add(link)
+        if link not in self.urls_crawled and link not in self.ignore_links:
+            self.urls_crawled.add(link)
 
-        print(crawling_str + "done")
-        self.call_next(soup, link)
+        print(f"Finished crawling {link}. Total: {len(self.urls_crawled)} links.")
+        if not self.is_shutdown():
+            await self.call_next(soup, link)
 
     def save_state(self):
         """
@@ -178,7 +266,7 @@ def save_state(self):
             f.write(json.dumps({
                 "to_crawl": list(self.to_crawl),
                 "ignore_links": list(self.ignore_links),
-                "found_links": list(self.found_links)
+                "found_links": list(self.urls_crawled)
             }))
 
     def _load_state(self):
@@ -196,7 +284,7 @@ def _load_state(self):
         self.to_crawl = collections.deque(data["to_crawl"])
         self.to_crawl_set = set(data["to_crawl"])
         self.ignore_links = set(data["ignore_links"])
-        self.found_links = set(data["found_links"])
+        self.urls_crawled = set(data["found_links"])
 
 
 # IMPORTANT: Please use main.py instead of this file
diff --git a/engine/custom_tokenizer.py b/engine/custom_tokenizer.py
index b911ea0..9457ac7 100644
--- a/engine/custom_tokenizer.py
+++ b/engine/custom_tokenizer.py
@@ -103,7 +103,7 @@ class Tokenizer(PipelineElement):
     def __init__(self):
         super().__init__("Tokenizer")
 
-    def process(self, data, link):
+    async def process(self, data, link):
         """
         Tokenizes the input data.
         """
@@ -121,3 +121,5 @@ def process(self, data, link):
 
         tokenized_text = tokenize_data(data=text)
         add_tokens_to_index(url=link, tokenized_text=tokenized_text)
+
+        print(f"Tokenized text for {link}")
diff --git a/engine/index.py b/engine/index.py
index 0bdf9c2..5b9f1cb 100644
--- a/engine/index.py
+++ b/engine/index.py
@@ -12,7 +12,7 @@ def __init__(self):
 
         self._load_state()
 
-    def process(self, data, link):
+    async def process(self, data, link):
         """
         Indexes the input data.
""" @@ -32,7 +32,9 @@ def process(self, data, link): add_title_to_index(url=link, title=title_content) add_snippet_to_index(url=link, snippet=description_content) - self.call_next(soup, link) + print(f"Indexed {link}") + if not self.is_shutdown(): + await self.call_next(soup, link) def _load_state(self): """ diff --git a/engine/main.py b/engine/main.py index 59e35ab..a1c1802 100644 --- a/engine/main.py +++ b/engine/main.py @@ -2,6 +2,8 @@ Pipeline for Crawling, Tokenizing, and Indexing """ from concurrent.futures import ThreadPoolExecutor +import asyncio +import nest_asyncio # Database import duckdb @@ -9,51 +11,74 @@ from crawl import Crawler from custom_db import index_pages, access_index, save_pages from custom_tokenizer import Tokenizer -# Async -import asyncio - from index import Indexer +# Constants MAX_THREADS = 10 -if __name__ == "__main__": - con = duckdb.connect("crawlies.db") - # Database setup - con.install_extension("fts") - con.load_extension("fts") +# Patch asyncio to allow nested event loops +nest_asyncio.apply() - # Initialize the pipeline - with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: - try: - crawler = Crawler(con) - crawler.max_size = 100 - crawler.add_executor(executor) +# Database setup +con = duckdb.connect("crawlies.db") +con.install_extension("fts") +con.load_extension("fts") + +# Initialize the pipeline elements +crawler = Crawler(con) +crawler.max_size = 1000 +indexer = Indexer() +tokenizer = Tokenizer() + +# Add the pipeline elements +crawler.add_next(indexer) +indexer.add_next(tokenizer) + + +def signal_handler(signum, frame): + print("Interrupt received, shutting down... Please wait") + for element in [crawler, indexer, tokenizer]: + element.shutdown() - indexer = Indexer() - indexer.add_executor(executor) - tokenizer = Tokenizer() - tokenizer.add_executor(executor) +signal.signal(signal.SIGINT, signal_handler) - # Add the pipeline elements - crawler.add_next(indexer) - indexer.add_next(tokenizer) - # Start the pipeline - asyncio.run(crawler.process()) - except (KeyboardInterrupt, SystemExit): - print("Interrupted") - crawler.save_state() +async def main(): + # Initialize the pipeline + with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: + # Add the executor to the pipeline elements + crawler.add_executor(executor) + indexer.add_executor(executor) + tokenizer.add_executor(executor) + + # Start the pipeline + asyncio.run(crawler.process()) + try: + await crawler.process() + except Exception as e: + print(f"An error occurred: {e}") + finally: + # Ensure states are saved even if an exception occurs + for element in [crawler, indexer, tokenizer]: + element.save_state() index_pages() + save_pages() index_df = access_index() index_df.to_csv("inverted_index.csv") con.close() print("State saved") + # Save the state+ + for element in [crawler, indexer, tokenizer]: + element.save_state() index_pages() + save_pages() index_df = access_index() index_df.to_csv("inverted_index.csv") - save_pages() - - # Close the connection con.close() + print("State saved") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/engine/pipeline.py b/engine/pipeline.py index 6ddff25..37ba1b4 100644 --- a/engine/pipeline.py +++ b/engine/pipeline.py @@ -1,4 +1,5 @@ -from concurrent.futures import ThreadPoolExecutor, wait +import asyncio +import threading class PipelineElement: @@ -6,6 +7,8 @@ def __init__(self, name): self.name = name self.next = [] self.executor = None + self.tasks = [] + self.shutdown_flag = threading.Event() print(f"Initialized 
{self.name}") def add_executor(self, executor): @@ -14,13 +17,34 @@ def add_executor(self, executor): def process(self, *args): raise NotImplementedError + def save_state(self): + pass + def add_next(self, next_element): self.next.append(next_element) - def call_next(self, *args): - futures = [] + async def call_next(self, *args): + if not self.next: + print(f"No next elements for {self.name}") + return # No next elements to process + + print(f"Processing next elements for {self.name}") + tasks = [] for element in self.next: - print(f"{self.name} -> {element.name}") - future = element.executor.submit(element.process, *args) - futures.append(future) - wait(futures) # Wait for all futures to complete + if asyncio.iscoroutinefunction(element.process): + # If the process method is a coroutine, create a task + task = asyncio.create_task(element.process(*args)) + else: + # If it's a regular function, run it in the executor + loop = asyncio.get_running_loop() + task = loop.run_in_executor(self.executor, element.process, *args) + tasks.append(task) + + # Wait for all tasks to complete + await asyncio.gather(*tasks) + + def shutdown(self): + self.shutdown_flag.set() + + def is_shutdown(self): + return self.shutdown_flag.is_set() diff --git a/engine/requirements.txt b/engine/requirements.txt index a167548..e850b9a 100644 --- a/engine/requirements.txt +++ b/engine/requirements.txt @@ -1,9 +1,12 @@ +# Automatically generated by https://github.com/damnever/pigar. + beautifulsoup4==4.12.3 +duckdb==1.0.0 eld==1.0.6 Flask==3.0.3 -lxml==5.2.2 -duckdb==1.0.0 +nest-asyncio==1.6.0 nltk==3.8.1 +numpy==2.0.0 pandas==2.2.2 -requests==2.32.3 +playwright==1.45.0 scikit-learn==1.5.1 diff --git a/engine/utils.py b/engine/utils.py index 0e033ea..dbefc6d 100644 --- a/engine/utils.py +++ b/engine/utils.py @@ -1,5 +1,5 @@ import urllib -from urllib.parse import urlparse # Parsing URLs +from urllib.parse import urlparse, urljoin # Parsing URLs # Robots.txt import urllib.robotparser # For checking robots.txt @@ -18,6 +18,20 @@ def get_domain(url: str) -> str: return urlparse(url).netloc +def get_full_url(base_url, relative_url): + """ + Converts a relative URL to a full URL based on the base URL. + + Parameters: + - `base_url` (str): The base URL. + - `relative_url` (str): The relative URL to convert. + + Returns: + - `str`: The full URL. + """ + return urljoin(base_url, relative_url) + + def get_base_url(url: str) -> str: """ Extracts the base URL from a URL. @@ -28,8 +42,8 @@ def get_base_url(url: str) -> str: Returns: - `str`: The base URL of the URL. """ - - return urlparse(url).scheme + "://" + urlparse(url).netloc + parsed_url = urlparse(url) + return f"{parsed_url.scheme}://{parsed_url.netloc}" def check_robots(url: str) -> bool: