Skip to content

Commit

Permalink
Add option to directly process WARC files
Browse files Browse the repository at this point in the history
  • Loading branch information
benoit74 committed Jul 23, 2024
1 parent 8d28566 commit c4af54a
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Add `--warcs` option to directly process WARC files (#301)

### Changed

- Stop fetching and passing browsertrix crawler version as scraperSuffix to warc2zim (#354)
Expand Down
129 changes: 105 additions & 24 deletions src/zimit/zimit.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import atexit
import json
import logging
import re
import shutil
import signal
import subprocess
import sys
import tarfile
import tempfile
import urllib.parse
from argparse import ArgumentParser
Expand All @@ -19,6 +21,7 @@

import inotify
import inotify.adapters
import requests
from warc2zim.main import main as warc2zim
from zimscraperlib.logging import getLogger
from zimscraperlib.uri import rebuild_uri
Expand All @@ -28,6 +31,7 @@
EXIT_CODE_WARC2ZIM_CHECK_FAILED = 2
EXIT_CODE_CRAWLER_LIMIT_HIT = 11
NORMAL_WARC2ZIM_EXIT_CODE = 100
REQUESTS_TIMEOUT = 10

logger = getLogger(name="zimit", level=logging.INFO)

Expand Down Expand Up @@ -354,6 +358,14 @@ def run(raw_args):
help="Crawler logging configuration",
)

# --warcs lets the user supply already-existing WARC archives instead of running
# the crawler. The value is a single string holding one or more local paths
# and/or HTTP(S) URLs, separated by commas.
# NOTE: adjacent string literals are concatenated verbatim, so every fragment
# except the last must end with a space to keep words apart in the --help output.
parser.add_argument(
    "--warcs",
    help="Directly convert WARC archives to ZIM, by-passing the crawling phase. "
    "This argument must contain the path or HTTP(S) URL to either warc.gz files "
    "or to a tar.gz containing the warc.gz files. Single value with individual "
    "path/URLs separated by comma",
)

zimit_args, warc2zim_args = parser.parse_known_args(raw_args)

# pass a scraper suffix to warc2zim so that both zimit and warc2zim versions are
Expand Down Expand Up @@ -458,36 +470,105 @@ def cleanup():
f"Output to tempdir: {temp_root_dir} - "
f"{'will keep' if zimit_args.keep else 'will delete'}"
)
logger.info(f"Running browsertrix-crawler crawl: {cmd_line}")
crawl = subprocess.run(cmd_args, check=False)
if crawl.returncode == EXIT_CODE_CRAWLER_LIMIT_HIT:
logger.info("crawl interupted by a limit")
elif crawl.returncode != 0:
raise subprocess.CalledProcessError(crawl.returncode, cmd_args)

if zimit_args.collection:
warc_directory = temp_root_dir.joinpath(
f"collections/{zimit_args.collection}/archive/"
)
else:
warc_dirs = list(temp_root_dir.rglob("collections/crawl-*/archive/"))
if len(warc_dirs) == 0:
raise RuntimeError(
"Failed to find directory where WARC files have been created"
# if warc files are passed, do not run browsertrix crawler but fetch the files if
# they are provided as an HTTP URL + extract the archive if it is a tar.gz
warc_files: list[Path] = []
if zimit_args.warcs:
for warc_location in [
warc_location.strip() for warc_location in zimit_args.warcs.split(",")
]:
suffix = "".join(Path(urllib.parse.urlparse(warc_location).path).suffixes)
if suffix not in {".tar.gz", ".warc", ".warc.gz"}:
raise Exception(f"Unsupported file at {warc_location}")

filename = tempfile.NamedTemporaryFile(
dir=temp_root_dir,
prefix="warc_",
suffix=suffix,
delete_on_close=False,
)
elif len(warc_dirs) > 1:
logger.info("Found many WARC files directories, only last one will be used")
for directory in warc_dirs:
logger.info(f"- {directory}")
warc_directory = warc_dirs[-1]

if not re.match(r"^https?\://", warc_location):
# warc_location is not a URL, so it is a path, simply add it to the list
if not Path(warc_location).exists():
raise Exception(f"Impossible to find file at {warc_location}")

# if it is a plain warc or warc.gz, simply add it to the list
if suffix in {".warc", ".warc.gz"}:
warc_files.append(Path(warc_location))
continue

# otherwise extract tar.gz but do not delete it afterwards
extract_path = temp_root_dir / f"{filename.name}_files"
logger.info(
f"Extracting WARC(s) from {warc_location} to {extract_path}"
)
with tarfile.open(warc_location, "r:gz") as fh:
# Extract all the contents to the specified directory
fh.extractall(path=extract_path, filter="data")
warc_files.append(Path(extract_path))
continue

# warc_location is a URL, let's download it to a temp name to avoid name
# collisions
warc_file = Path(filename.name)
logger.info(f"Downloading WARC(s) from {warc_location} to {warc_file}")
resp = requests.get(warc_location, timeout=REQUESTS_TIMEOUT)
resp.raise_for_status()
warc_file.write_bytes(resp.content)

# if it is a plain warc or warc.gz, simply add it to the list
if suffix in {".warc", ".warc.gz"}:
warc_files.append(warc_file)
continue

# otherwise extract tar.gz and delete it afterwards
extract_path = temp_root_dir / f"{filename.name}_files"
logger.info(f"Extracting WARC(s) from {warc_file} to {extract_path}")
with tarfile.open(warc_file, "r:gz") as fh:
# Extract all the contents to the specified directory
fh.extractall(path=extract_path, filter="data")
logger.info(f"Deleting archive at {warc_file}")
warc_file.unlink()
warc_files.append(Path(extract_path))

else:

logger.info(f"Running browsertrix-crawler crawl: {cmd_line}")
crawl = subprocess.run(cmd_args, check=False)
if crawl.returncode == EXIT_CODE_CRAWLER_LIMIT_HIT:
logger.info("crawl interupted by a limit")
elif crawl.returncode != 0:
raise subprocess.CalledProcessError(crawl.returncode, cmd_args)

if zimit_args.collection:
warc_files = [
temp_root_dir.joinpath(f"collections/{zimit_args.collection}/archive/")
]

else:
warc_dirs = list(temp_root_dir.rglob("collections/crawl-*/archive/"))
if len(warc_dirs) == 0:
raise RuntimeError(
"Failed to find directory where WARC files have been created"
)
elif len(warc_dirs) > 1:
logger.info(
"Found many WARC files directories, only last one will be used"
)
for directory in warc_dirs:
logger.info(f"- {directory}")
warc_files = [warc_dirs[-1]]

logger.info("")
logger.info("----------")
logger.info(f"Processing WARC files in {warc_directory}")
warc2zim_args.append(str(warc_directory))
logger.info(
f"Processing WARC files in/at "
f'{" ".join(str(warc_file) for warc_file in warc_files)}'
)
warc2zim_args.extend(str(warc_file) for warc_file in warc_files)

num_files = sum(1 for _ in warc_directory.iterdir())
logger.info(f"{num_files} WARC files found")
logger.info(f"Calling warc2zim with these args: {warc2zim_args}")

return warc2zim(warc2zim_args)
Expand Down

0 comments on commit c4af54a

Please sign in to comment.