From eebecd3ca7889dc5e03765ad25c72482394e9b31 Mon Sep 17 00:00:00 2001 From: Squeaky Date: Fri, 4 Oct 2024 18:20:15 +0200 Subject: [PATCH] Run without gevent #notests --- bin/recompress-raw-mime.py | 45 ++++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/bin/recompress-raw-mime.py b/bin/recompress-raw-mime.py index a6fcaff7a..5af0a7312 100755 --- a/bin/recompress-raw-mime.py +++ b/bin/recompress-raw-mime.py @@ -1,7 +1,4 @@ #!/usr/bin/env python -from gevent import monkey - -monkey.patch_all() import datetime import enum @@ -40,7 +37,7 @@ class Resolution(enum.Enum): # https://stackoverflow.com/questions/73395864/how-do-i-wait-when-all-threadpoolexecutor-threads-are-busy -class RecompressThreadPoolExecutor(ThreadPoolExecutor): +class CompressThreadPoolExecutor(ThreadPoolExecutor): """ThreadPoolExecutor that keeps track of the number of available workers. Refs: @@ -182,8 +179,12 @@ def overwrite_parallel(compressed_raw_mime_by_sha256: "dict[str, bytes]") -> Non ) -def recompress_batch( - recompress_sha256s: "dict[str, int]", *, dry_run=True, compression_level: int = 3 +def compress_batch( + recompress_sha256s: "dict[str, int]", + *, + dry_run=True, + compression_level: int = 3, + recompress: bool = False, ) -> None: if not recompress_sha256s: return @@ -191,9 +192,21 @@ def recompress_batch( data_by_sha256 = { data_sha256: data for data_sha256, data in download_parallel(set(recompress_sha256s)) - if data is not None and not data.startswith(blockstore.ZSTD_MAGIC_NUMBER_PREFIX) + if data is not None } + if recompress: + data_by_sha256 = { + data_sha256: blockstore.maybe_decompress_raw_mime(data) + for data_sha256, data in data_by_sha256.items() + } + else: + data_by_sha256 = { + data_sha256: data + for data_sha256, data in data_by_sha256.items() + if not data.startswith(blockstore.ZSTD_MAGIC_NUMBER_PREFIX) + } + if not data_by_sha256: return @@ -276,6 +289,7 @@ def recompress_batch( "--max-recompress-batch-bytes", type=int, default=MAX_RECOMPRESS_BATCH_BYTES ) @click.option("--fraction", type=str, default=None) +@click.option("--recompress/--no-recompress", default=False) def run( limit: "int | None", after: "str | None", @@ -294,6 +308,7 @@ def run( min_size: "int | None", max_recompress_batch_bytes: int, fraction: "str | None", + recompress: bool, ) -> int: shutting_down = False @@ -317,7 +332,7 @@ def shutdown(signum, frame): assert batch_size > 0 assert recompress_batch_size > 0 - recompress_executor = RecompressThreadPoolExecutor( + compress_executor = CompressThreadPoolExecutor( max_workers=recompress_executor_workers ) @@ -377,12 +392,13 @@ def shutdown(signum, frame): len(recompress_sha256s) >= recompress_batch_size or recompress_bytes > max_recompress_batch_bytes ): - recompress_executor.wait_for_available_worker() - recompress_executor.submit( - recompress_batch, + compress_executor.wait_for_available_worker() + compress_executor.submit( + compress_batch, recompress_sha256s.copy(), dry_run=dry_run, compression_level=compression_level, + recompress=recompress, ) recompress_sha256s.clear() recompress_bytes = 0 @@ -390,11 +406,12 @@ def shutdown(signum, frame): if shutting_down: break - recompress_executor.submit( - recompress_batch, + compress_executor.submit( + compress_batch, recompress_sha256s.copy(), dry_run=dry_run, compression_level=compression_level, + recompress=recompress, ) if shutting_down: @@ -405,7 +422,7 @@ def shutdown(signum, frame): after_id = max_id + 1 - recompress_executor.shutdown(wait=True) + compress_executor.shutdown(wait=True) if __name__ == "__main__":