Solve most congestion issues in /tmp (client & server) #619

Closed · wants to merge 8 commits
22 changes: 21 additions & 1 deletion dangerzone/conversion/common.py
@@ -10,12 +10,14 @@
import sys
import time
from abc import abstractmethod
from typing import Callable, Dict, List, Optional, Tuple, Union
from typing import Callable, Dict, Generator, List, Optional, Tuple, Union

TIMEOUT_PER_PAGE: float = 30 # (seconds)
TIMEOUT_PER_MB: float = 30 # (seconds)
TIMEOUT_MIN: float = 60 # (seconds)

PAGE_BATCH_SIZE = 50  # number of pages to be processed simultaneously


def running_on_qubes() -> bool:
    # https://www.qubes-os.org/faq/#what-is-the-canonical-way-to-detect-qubes-vm
@@ -44,6 +46,24 @@ def calculate_timeout(size: float, pages: Optional[float] = None) -> float:
    return timeout


def batch_iterator(num_pages: int) -> Generator[Tuple[int, int], None, None]:
    """Iterates over (first_page, last_page) batches of PAGE_BATCH_SIZE pages"""
    for first_page in range(1, num_pages + 1, PAGE_BATCH_SIZE):
        if first_page + PAGE_BATCH_SIZE > num_pages:  # Last batch
            last_page = num_pages
        else:
            last_page = first_page + PAGE_BATCH_SIZE - 1
        yield (first_page, last_page)


def get_batch_timeout(timeout: Optional[float], num_pages: int) -> Optional[float]:
    if timeout is None:
        return None
    else:
        # Ceiling division: the number of batches batch_iterator() will yield
        num_batches = (num_pages + PAGE_BATCH_SIZE - 1) // PAGE_BATCH_SIZE
        return timeout / num_batches


class DangerzoneConverter:
    def __init__(self, progress_callback: Optional[Callable] = None) -> None:
        self.percentage: float = 0.0
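As a quick illustration (a sketch, not part of the diff, assuming the PAGE_BATCH_SIZE of 50 defined above):

# Expected behavior of the batching helpers for a 120-page document.
from dangerzone.conversion.common import batch_iterator, get_batch_timeout

# 120 pages are split into batches of at most 50 pages each:
assert list(batch_iterator(120)) == [(1, 50), (51, 100), (101, 120)]

# A 90-second total timeout is spread across ceil(120 / 50) == 3 batches:
assert get_batch_timeout(90.0, 120) == 30.0
assert get_batch_timeout(None, 120) is None  # "no timeout" stays "no timeout"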
149 changes: 85 additions & 64 deletions dangerzone/conversion/doc_to_pixels.py
@@ -19,7 +19,15 @@
import magic

from . import errors
from .common import DangerzoneConverter, running_on_qubes
from .common import (
    PAGE_BATCH_SIZE,
    DangerzoneConverter,
    batch_iterator,
    get_batch_timeout,
    running_on_qubes,
)

PAGE_BASE = "/tmp/page"


class DocumentToPixels(DangerzoneConverter):
@@ -276,39 +284,86 @@ async def convert(self) -> None:
        # Get a more precise timeout, based on the number of pages
        timeout = self.calculate_timeout(size, num_pages)

        async def pdftoppm_progress_callback(line: bytes) -> None:
            """Function called for every line the 'pdftoppm' command outputs

            Sample pdftoppm output:

                $ pdftoppm sample.pdf /tmp/safe -progress
                1 4 /tmp/safe-1.ppm
                2 4 /tmp/safe-2.ppm
                3 4 /tmp/safe-3.ppm
                4 4 /tmp/safe-4.ppm

            Each successful line is in the format "{page} {page_num} {ppm_filename}"
            """
            try:
                (page_str, num_pages_str, _) = line.decode().split()
                num_pages = int(num_pages_str)
                page = int(page_str)
            except ValueError:
                # Ignore all non-progress-related output, since pdftoppm sends
                # everything to stderr and thus errors can't be distinguished
                # easily. We rely instead on the exit code.
                return
        timeout_per_batch = get_batch_timeout(timeout, num_pages)
        for first_page, last_page in batch_iterator(num_pages):
            # XXX: overlap the two stages: convert the current batch while the
            # previous batch's data is being sent
            if first_page == 1:  # First pass: nothing to send yet
                await self.pdf_to_rgb(
                    first_page, last_page, pdf_filename, timeout_per_batch
                )
                delayed_send_rgb_files = self.send_rgb_files(
                    first_page, last_page, num_pages
                )
            else:
                await asyncio.gather(
                    self.pdf_to_rgb(
                        first_page, last_page, pdf_filename, timeout_per_batch
                    ),
                    delayed_send_rgb_files,
                )
                delayed_send_rgb_files = self.send_rgb_files(
                    first_page, last_page, num_pages
                )

        await delayed_send_rgb_files

        final_files = (
            glob.glob("/tmp/page-*.rgb")
            + glob.glob("/tmp/page-*.width")
            + glob.glob("/tmp/page-*.height")
        )

        # XXX: Sanity check to avoid situations like #560.
        if not running_on_qubes() and len(final_files) != 3 * num_pages:
            raise errors.PageCountMismatch()

        # Move converted files into /tmp/dangerzone
        for filename in final_files:
            shutil.move(filename, "/tmp/dangerzone")

        self.update_progress("Converted document to pixels")
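The loop above forms a two-stage pipeline: while batch N is rendered by pdf_to_rgb, batch N-1 is streamed out by send_rgb_files. A minimal, self-contained sketch of the pattern, with hypothetical produce/consume coroutines standing in for the real methods:

import asyncio

async def produce(batch: int) -> None:
    await asyncio.sleep(0.1)  # stands in for pdf_to_rgb()

async def consume(batch: int) -> None:
    await asyncio.sleep(0.1)  # stands in for send_rgb_files()

async def pipeline(num_batches: int) -> None:
    pending = None  # coroutine that sends the previous batch
    for batch in range(num_batches):
        if pending is None:  # first pass: nothing to send yet
            await produce(batch)
        else:
            # Overlap: render the current batch while sending the previous one
            await asyncio.gather(produce(batch), pending)
        pending = consume(batch)
    if pending is not None:
        await pending  # flush the final batch

asyncio.run(pipeline(3))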

    async def pdf_to_rgb(
        self,
        first_page: int,
        last_page: int,
        pdf_filename: str,
        timeout: Optional[float],
    ) -> None:
        await self.run_command(
            [
                "pdftoppm",
                pdf_filename,
                PAGE_BASE,
                "-progress",
                "-f",
                str(first_page),
                "-l",
                str(last_page),
            ],
            error_message="Conversion from PDF to PPM failed",
            timeout_message=(
                f"Error converting from PDF to PPM, pdftoppm timed out after {timeout}"
                " seconds"
            ),
            timeout=timeout,
        )
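pdftoppm's -f and -l flags select the first and last page to render, so each call touches only one batch's worth of pages. Illustrative invocations for a hypothetical 120-page input, using the /tmp/page prefix and the sample.pdf name from the docstring above:

    $ pdftoppm sample.pdf /tmp/page -progress -f 1 -l 50
    $ pdftoppm sample.pdf /tmp/page -progress -f 101 -l 120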

    async def send_rgb_files(
        self, first_page: int, last_page: int, num_pages: int
    ) -> None:
        for page in range(first_page, last_page + 1):
            percentage_per_page = 45.0 / num_pages
            self.percentage += percentage_per_page
            self.update_progress(f"Converting page {page}/{num_pages} to pixels")
            self.update_progress(f"Converting pages {page}/{num_pages} to pixels")

            zero_padding = "0" * (len(num_pages_str) - len(page_str))
            ppm_filename = f"{page_base}-{zero_padding}{page}.ppm"
            rgb_filename = f"{page_base}-{page}.rgb"
            width_filename = f"{page_base}-{page}.width"
            height_filename = f"{page_base}-{page}.height"
            filename_base = f"{page_base}-{page}"
            zero_padding = "0" * (len(str(num_pages)) - len(str(page)))
            ppm_filename = f"{PAGE_BASE}-{zero_padding}{page}.ppm"
            rgb_filename = f"{PAGE_BASE}-{page}.rgb"
            width_filename = f"{PAGE_BASE}-{page}.width"
            height_filename = f"{PAGE_BASE}-{page}.height"
            filename_base = f"{PAGE_BASE}-{page}"

            with open(ppm_filename, "rb") as f:
                # NOTE: PPM files have multiple ways of writing headers.
@@ -339,40 +394,6 @@ async def pdftoppm_progress_callback(line: bytes) -> None:
            # Delete the ppm file
            os.remove(ppm_filename)
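For reference, the zero-padding computed above mirrors pdftoppm's output naming, which pads page numbers to the width of the total page count, while this converter's own files are unpadded. Assuming a hypothetical 120-page document:

    pdftoppm writes:     /tmp/page-001.ppm ... /tmp/page-120.ppm
    this method writes:  /tmp/page-1.rgb, /tmp/page-1.width, /tmp/page-1.height, ...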

        page_base = "/tmp/page"

        await self.run_command(
            [
                "pdftoppm",
                pdf_filename,
                page_base,
                "-progress",
            ],
            error_message="Conversion from PDF to PPM failed",
            timeout_message=(
                f"Error converting from PDF to PPM, pdftoppm timed out after {timeout}"
                " seconds"
            ),
            stderr_callback=pdftoppm_progress_callback,
            timeout=timeout,
        )

        final_files = (
            glob.glob("/tmp/page-*.rgb")
            + glob.glob("/tmp/page-*.width")
            + glob.glob("/tmp/page-*.height")
        )

        # XXX: Sanity check to avoid situations like #560.
        if not running_on_qubes() and len(final_files) != 3 * num_pages:
            raise errors.PageCountMismatch()

        # Move converted files into /tmp/dangerzone
        for filename in final_files:
            shutil.move(filename, "/tmp/dangerzone")

        self.update_progress("Converted document to pixels")

    async def install_libreoffice_ext(self, libreoffice_ext: str) -> None:
        self.update_progress(f"Installing LibreOffice extension '{libreoffice_ext}'")
        unzip_args = [
10 changes: 10 additions & 0 deletions dangerzone/conversion/errors.py
@@ -104,6 +104,11 @@ class PDFtoPPMInvalidDepth(PDFtoPPMException):
    error_message = "Error converting PDF to Pixels (Invalid PPM depth)"


class PPMtoPNGError(ConversionException):
    error_code = ERROR_SHIFT + 55
    error_message = "Document page could not be reassembled from individual pixels"


class InterruptedConversion(ConversionException):
    """Protocol received num of bytes different than expected"""

@@ -113,6 +118,11 @@ class InterruptedConversion(ConversionException):
    )


class PixelFilesMismatch(ConversionException):
    error_code = ERROR_SHIFT + 70
    error_message = "The number of pages received is different than expected"


class UnexpectedConversionError(PDFtoPPMException):
    error_code = ERROR_SHIFT + 100
    error_message = "Some unexpected error occurred while converting the document"
96 changes: 40 additions & 56 deletions dangerzone/conversion/pixels_to_pdf.py
@@ -13,68 +13,44 @@
import sys
from typing import Optional

from .common import DangerzoneConverter, running_on_qubes
from .common import (
    DangerzoneConverter,
    batch_iterator,
    get_batch_timeout,
    running_on_qubes,
)


class PixelsToPDF(DangerzoneConverter):
    async def convert(
        self, ocr_lang: Optional[str] = None, tempdir: Optional[str] = None
        self, ocr_lang: Optional[str] = None, tempdir: Optional[str] = "/tmp"
    ) -> None:
        self.percentage = 50.0
        if tempdir is None:
            tempdir = "/tmp"

        num_pages = len(glob.glob(f"{tempdir}/dangerzone/page-*.rgb"))
        num_pages = len(glob.glob(f"{tempdir}/page-*.png"))
        total_size = 0.0

        # Convert RGB files to PDF files
        percentage_per_page = 45.0 / num_pages
        for page in range(1, num_pages + 1):
            filename_base = f"{tempdir}/dangerzone/page-{page}"
            rgb_filename = f"{filename_base}.rgb"
            width_filename = f"{filename_base}.width"
            height_filename = f"{filename_base}.height"
            png_filename = f"{tempdir}/page-{page}.png"
            ocr_filename = f"{tempdir}/page-{page}"
            pdf_filename = f"{tempdir}/page-{page}.pdf"

            with open(width_filename) as f:
                width = f.read().strip()
            with open(height_filename) as f:
                height = f.read().strip()
            filename_base = f"{tempdir}/page-{page}"
            png_filename = f"{filename_base}.png"
            pdf_filename = f"{filename_base}.pdf"

            # The first few operations happen on a per-page basis.
            page_size = os.path.getsize(filename_base + ".rgb") / 1024**2
            page_size = os.path.getsize(png_filename) / 1024**2
            total_size += page_size
            timeout = self.calculate_timeout(page_size, 1)

            if ocr_lang:  # OCR the document
                self.update_progress(
                    f"Converting page {page}/{num_pages} from pixels to searchable PDF"
                )
                await self.run_command(
                    [
                        "gm",
                        "convert",
                        "-size",
                        f"{width}x{height}",
                        "-depth",
                        "8",
                        f"rgb:{rgb_filename}",
                        f"png:{png_filename}",
                    ],
                    error_message=f"Page {page}/{num_pages} conversion to PNG failed",
                    timeout_message=(
                        "Error converting pixels to PNG, convert timed out after"
                        f" {timeout} seconds"
                    ),
                    timeout=timeout,
                )
                await self.run_command(
                    [
                        "tesseract",
                        png_filename,
                        ocr_filename,
                        filename_base,
                        "-l",
                        ocr_lang,
                        "--dpi",
@@ -97,11 +73,7 @@ async def convert(
                    [
                        "gm",
                        "convert",
                        "-size",
                        f"{width}x{height}",
                        "-depth",
                        "8",
                        f"rgb:{rgb_filename}",
                        f"png:{png_filename}",
                        f"pdf:{pdf_filename}",
                    ],
                    error_message=f"Page {page}/{num_pages} conversion to PDF failed",
@@ -112,27 +84,39 @@
                    timeout=timeout,
                )

            # Remove the PNG file when it is no longer needed
            os.remove(png_filename)

            self.percentage += percentage_per_page

        # The next operations apply to all the pages, so we need to recalculate
        # the timeout.
        timeout = self.calculate_timeout(total_size, num_pages)

        # Merge pages into a single PDF
        timeout_per_batch = get_batch_timeout(timeout, num_pages)
        self.update_progress(f"Merging {num_pages} pages into a single PDF")
        args = ["pdfunite"]
        for page in range(1, num_pages + 1):
            args.append(f"{tempdir}/page-{page}.pdf")
        args.append(f"{tempdir}/safe-output.pdf")
        await self.run_command(
            args,
            error_message="Merging pages into a single PDF failed",
            timeout_message=(
                "Error merging pages into a single PDF, pdfunite timed out after"
                f" {timeout} seconds"
            ),
            timeout=timeout,
        )
        for first_page, last_page in batch_iterator(num_pages):
Contributor:

Question: what do we gain by batching the pdfunite operations?

At this point, we already have all the pages as PDF files on disk, and that is the main space hog, right? I get that with a single pdfunite call we need double the space, but that's a requirement for the conversion step as well, which takes the original PDF and creates a compressed one, and they both coexist on disk.

I have another suggestion here: use PyPDF2 instead and perform the compression on the fly:

from pypdf import PdfReader, PdfWriter

reader = PdfReader("example.pdf")
writer = PdfWriter()

for page in reader.pages:
    writer.add_page(page)

for page in writer.pages:
    # ⚠️ This has to be done on the writer, not the reader!
    page.compress_content_streams()  # This is CPU intensive!

with open("out.pdf", "wb") as f:
    writer.write(f)

(snippet taken from the docs)

With PyPDF2, we can open each PDF, compress it, and immediately create the output PDF, without pdfunite and ps2pdf. We can even delete the underlying PDFs as we process them. The good thing about PyPDF2 is that it's available in Fedora repos as well, so it will work on Qubes.
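For the merge-many-pages case discussed in this thread, the same library can also append whole files to a writer; a minimal sketch (assuming pypdf's PdfWriter.append, with an illustrative page count and the paths used elsewhere in this PR):

from pypdf import PdfWriter

num_pages = 120  # illustrative
writer = PdfWriter()
for page in range(1, num_pages + 1):
    # Append each single-page PDF produced earlier in the pipeline
    writer.append(f"/tmp/page-{page}.pdf")

with open("/tmp/safe-output.pdf", "wb") as f:
    writer.write(f)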

Contributor Author:

> Question: what do we gain by batching the pdfunite operations?

It's in the commit message. So it's not size-related but rather ulimit-related. We could change the ulimit, but since I had already done the batch logic, I figured it could be reused here.

Contributor:

Oh snap. I didn't expect that Qubes had such a low ulimit (nor that pdfunite opens the files at the same time, jeez...).
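For context, the limit in question can be inspected with Python's standard resource module (a sketch; the printed numbers depend on the system):

import resource

# pdfunite opens every input PDF at once, so a single merge call can consume
# as many file descriptors as it has input pages.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open-file limit: soft={soft}, hard={hard}")
# Batches of PAGE_BATCH_SIZE (50) inputs stay well under a low soft limit.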

args = ["pdfunite"]
accumulator = f"{tempdir}/safe-output.pdf" # PDF which accumulates pages
accumulator_temp = f"{tempdir}/safe-output_tmp.pdf"
if first_page > 1: # Append at the beginning
args.append(accumulator)
for page in range(first_page, last_page + 1):
args.append(f"{tempdir}/page-{page}.pdf")
args.append(accumulator_temp)
await self.run_command(
args,
error_message="Merging pages into a single PDF failed",
timeout_message=(
"Error merging pages into a single PDF, pdfunite timed out after"
f" {timeout_per_batch} seconds"
),
timeout=timeout_per_batch,
)
for page in range(first_page, last_page + 1):
os.remove(f"{tempdir}/page-{page}.pdf")
os.rename(accumulator_temp, accumulator)

self.percentage += 2

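To make the loop concrete, this is the sequence of merge calls it would issue for a hypothetical 120-page document (PAGE_BATCH_SIZE is 50; paths abbreviated):

    pdfunite page-1.pdf ... page-50.pdf safe-output_tmp.pdf
    pdfunite safe-output.pdf page-51.pdf ... page-100.pdf safe-output_tmp.pdf
    pdfunite safe-output.pdf page-101.pdf ... page-120.pdf safe-output_tmp.pdf

After each call the batch's page PDFs are deleted and safe-output_tmp.pdf is renamed over safe-output.pdf, so at most roughly PAGE_BATCH_SIZE + 2 files are open at once, regardless of document length.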
@@ -165,7 +149,7 @@ async def main() -> int:
    converter = PixelsToPDF()

    try:
        await converter.convert(ocr_lang)
        await converter.convert(ocr_lang, tempdir="/tmp/dangerzone")
        error_code = 0  # Success!

    except (RuntimeError, TimeoutError, ValueError) as e: