diff --git a/.gitignore b/.gitignore
index e5c6535..8daaf33 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,5 +10,6 @@ ENV/
 # pycharm proj
 .idea
 
-# stray tar.gz files used for manual testing
+# stray archive files used for manual testing
+*.tar
 *.tar.gz
diff --git a/core/__init__.py b/core/__init__.py
index d35f993..609e965 100755
--- a/core/__init__.py
+++ b/core/__init__.py
@@ -1,4 +1,4 @@
-from .manifest_creator import ImageManifestCreator
+from .image_manifest_creator import ImageManifestCreator
 from .registry import Registry
 from .extractor import Extractor
 from .processor import Processor
diff --git a/core/extractor.py b/core/extractor.py
index 1e9da6d..3455487 100644
--- a/core/extractor.py
+++ b/core/extractor.py
@@ -6,7 +6,7 @@
 import humanfriendly
 
 
-class Extractor(object):
+class Extractor:
     def __init__(self, logger, archive_path):
         self._logger = logger.get_child('tar')
         self._archive_path = os.path.abspath(archive_path)
diff --git a/core/image_manifest_creator.py b/core/image_manifest_creator.py
new file mode 100644
index 0000000..fae92b9
--- /dev/null
+++ b/core/image_manifest_creator.py
@@ -0,0 +1,28 @@
+import json
+
+
+class ImageManifestCreator:
+    def __init__(self, config_path, layers_info, config_info):
+        self._config_path = config_path
+        self._layers_info = layers_info
+        self._config_info = config_info
+
+    def create(self):
+        manifest = dict()
+        manifest["schemaVersion"] = 2
+        manifest["mediaType"] = "application/vnd.docker.distribution.manifest.v2+json"
+        manifest["config"] = {
+            "mediaType": "application/vnd.docker.container.image.v1+json",
+            "size": self._config_info['size'],
+            "digest": self._config_info['digest'],
+        }
+        manifest["layers"] = []
+        for layer_info in self._layers_info:
+            layer_data = {
+                "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
+                "size": layer_info['size'],
+                "digest": layer_info['digest'],
+            }
+            manifest["layers"].append(layer_data)
+
+        return json.dumps(manifest)
diff --git a/core/manifest_creator.py b/core/manifest_creator.py
deleted file mode 100644
index 2117f18..0000000
--- a/core/manifest_creator.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import os
-import hashlib
-import json
-
-
-class ImageManifestCreator(object):
-    def __init__(self, config_path, layers_paths):
-        self._config_path = config_path
-        self._layers_paths = layers_paths
-
-    def create(self):
-        manifest = dict()
-        manifest["schemaVersion"] = 2
-        manifest["mediaType"] = "application/vnd.docker.distribution.manifest.v2+json"
-        manifest["config"] = dict()
-        manifest["config"][
-            "mediaType"
-        ] = "application/vnd.docker.container.image.v1+json"
-        manifest["config"]["size"] = os.path.getsize(self._config_path)
-        manifest["config"]["digest"] = self._get_digest(self._config_path)
-        manifest["layers"] = []
-        for layer in self._layers_paths:
-            layer_data = dict()
-            layer_data["mediaType"] = "application/vnd.docker.image.rootfs.diff.tar"
-            layer_data["size"] = os.path.getsize(layer)
-            layer_data["digest"] = self._get_digest(layer)
-            manifest["layers"].append(layer_data)
-
-        return json.dumps(manifest)
-
-    def _get_digest(self, filepath):
-        return "sha256:" + self.get_file_sha256(filepath)
-
-    @staticmethod
-    def get_file_sha256(filepath):
-        sha256hash = hashlib.sha256()
-        with open(filepath, "rb") as f:
-            while True:
-                data = f.read(65536)
-                sha256hash.update(data)
-                if not data:
-                    break
-        return sha256hash.hexdigest()
diff --git a/core/processor.py b/core/processor.py
index 4e6e33b..277fd15 100644
--- a/core/processor.py
+++ b/core/processor.py
@@ -2,6 +2,7 @@ import multiprocessing.pool
 import time
 import os.path
+import shutil
 import json
 
 import humanfriendly
 
@@ -10,14 +11,17 @@
 from . import extractor
 
 
-class Processor(object):
+class Processor:
     def __init__(
         self,
         logger,
+        tmp_dir,
+        tmp_dir_override,
         parallel,
         registry_url,
         archive_path,
         stream=False,
+        gzip_layers=False,
         login=None,
         password=None,
         ssl_verify=True,
@@ -26,6 +30,8 @@ def __init__(
     ):
         self._logger = logger
         self._parallel = parallel
+        self._tmp_dir = tmp_dir
+        self._tmp_dir_override = tmp_dir_override
 
         if parallel > 1 and stream:
             self._logger.info(
@@ -36,6 +42,7 @@ def __init__(
 
         self._registry = registry.Registry(
             logger=self._logger,
+            gzip_layers=gzip_layers,
             registry_url=registry_url,
             stream=stream,
             login=login,
@@ -55,8 +62,14 @@ def process(self):
         """
         start_time = time.time()
         results = []
-        with tempfile.TemporaryDirectory() as tmp_dir_name:
-
+        if self._tmp_dir_override:
+            tmp_dir_name = self._tmp_dir_override
+            os.mkdir(tmp_dir_name, 0o700)
+        else:
+            tmp_dir_name = tempfile.mkdtemp(dir=self._tmp_dir)
+
+        # since we're not always using TemporaryDirectory we're making and cleaning up ourselves
+        try:
             self._logger.info(
                 'Processing archive',
                 archive_path=self._extractor.archive_path,
@@ -83,9 +96,12 @@ def process(self):
                 pool.close()
                 pool.join()
 
-        # this will throw if any pool worker caught an exception
-        for res in results:
-            res.get()
+            # this will throw if any pool worker caught an exception
+            for res in results:
+                res.get()
+        finally:
+            shutil.rmtree(tmp_dir_name)
+            self._logger.verbose('Removed workdir', tmp_dir_name=tmp_dir_name)
 
         elapsed = time.time() - start_time
         self._logger.info(
@@ -95,11 +111,16 @@ def process(self):
         )
 
     @staticmethod
-    def _get_manifest(tmp_dir_name):
-        with open(os.path.join(tmp_dir_name, 'manifest.json'), 'r') as fh:
+    def _get_manifest(archive_dir):
+        with open(os.path.join(archive_dir, 'manifest.json'), 'r') as fh:
             manifest = json.loads(fh.read())
             return manifest
 
+    @staticmethod
+    def _write_manifest(archive_dir, contents):
+        with open(os.path.join(archive_dir, 'manifest.json'), 'w') as fh:
+            json.dump(contents, fh)
+
 
 #
 # Global wrappers to use with multiprocessing.pool.Pool which can't pickle instance methods
diff --git a/core/registry.py b/core/registry.py
index 4964ae5..45fe569 100644
--- a/core/registry.py
+++ b/core/registry.py
@@ -1,23 +1,40 @@
 import os
 import os.path
 import re
-import json
 import hashlib
 import urllib.parse
 import time
 import threading
-
 import humanfriendly
 import requests
 import requests.auth
 
-from . import manifest_creator
+from . import image_manifest_creator
+import utils.helpers
+import utils.gzip_stream
+
+
+class LayersLock:
+    def __init__(self):
+        self._global_lock = threading.Lock()
+        self._layers_locks = {}
+
+    def get_lock(self, key):
+
+        # global lock to check if layer was done
+        self._global_lock.acquire()
+        try:
+            self._layers_locks.setdefault(key, threading.Lock())
+            return self._layers_locks[key]
+        finally:
+            self._global_lock.release()
 
 
-class Registry(object):
+class Registry:
     def __init__(
         self,
         logger,
+        gzip_layers,
         registry_url,
         stream=False,
         login=None,
@@ -27,6 +44,7 @@ def __init__(
         replace_tags_target=None,
     ):
         self._logger = logger.get_child('registry')
+        self._gzip_layers = gzip_layers
 
         # enrich http suffix if missing
         if urllib.parse.urlparse(registry_url).scheme not in ['http', 'https']:
@@ -42,7 +60,7 @@ def __init__(
         if self._login:
             self._basicauth = requests.auth.HTTPBasicAuth(self._login, self._password)
 
-        self._layer_locks = {}
+        self._layers_lock = LayersLock()
 
         self._logger.debug(
             'Initialized',
@@ -60,12 +78,12 @@ def process_image(self, tmp_dir_name, image_config):
         Processing a single image entry from extracted files - pushing to registry
         """
         repo_tags = image_config["RepoTags"]
-        config_filename = image_config["Config"]
-        config_path = os.path.join(tmp_dir_name, config_filename)
+        config_path_relative = image_config["Config"]
+        config_path = os.path.join(tmp_dir_name, config_path_relative)
         self._logger.info('Processing image', repo_tags=repo_tags)
         image_start_time = time.time()
 
-        config_parsed = self._load_json_file(config_path)
+        config_parsed = utils.helpers.load_json_file(config_path)
 
         # warning - spammy
         self._logger.verbose('Parsed image config', config_parsed=config_parsed)
@@ -74,7 +92,7 @@ def process_image(self, tmp_dir_name, image_config):
             repo_tag_start_time = time.time()
             image, tag = self._parse_image_tag(repo)
             self._logger.info(
-                'Pushing image repo and tag',
+                'Processing image repo and tag',
                 image=image,
                 tag=tag,
                 tmp_dir_name=tmp_dir_name,
@@ -82,31 +100,43 @@ def process_image(self, tmp_dir_name, image_config):
 
             # push individual image layers
             layers = image_config["Layers"]
+            manifest_layer_info = []
             for layer in layers:
-                self._process_layer(layer, image, tmp_dir_name)
+                layer_digest, layer_size = self._process_layer(
+                    layer, image, tmp_dir_name
+                )
+                manifest_layer_info.append(
+                    {
+                        'digest': layer_digest,
+                        'size': layer_size,
+                    }
+                )
 
             # then, push image config
             self._logger.info(
-                'Pushing image config', image=image, config_loc=config_filename
+                'Pushing image config',
+                image=image,
+                config_path=config_path,
             )
-            push_url = self._initialize_push(image)
-            self._push_config(config_path, push_url)
-
-            # keep the pushed layers
-            properly_formatted_layers = [
-                os.path.join(tmp_dir_name, layer) for layer in layers
-            ]
+            push_url = self._initialize_push(image)
+            digest, size = self._push_config(config_path, push_url)
+            config_info = {'digest': digest, 'size': size}
 
             # Now we need to create and push a manifest for the image
-            creator = manifest_creator.ImageManifestCreator(
-                config_path, properly_formatted_layers
-            )
-            image_manifest = creator.create()
 
             # Override tags if needed: from --replace-tags-match and --replace-tags-target
             tag = self._replace_tag(image, tag)
 
-            self._logger.info('Pushing image tag manifest', image=image, tag=tag)
+            image_manifest = image_manifest_creator.ImageManifestCreator(
+                config_path, manifest_layer_info, config_info
+            ).create()
+
+            self._logger.debug(
+                'Pushing image tag manifest',
+                image=image,
+                tag=tag,
+                image_manifest=image_manifest,
+            )
             self._push_manifest(image_manifest, image, tag)
             repo_tag_elapsed = time.time() - repo_tag_start_time
             self._logger.info(
@@ -123,7 +153,7 @@ def process_image(self, tmp_dir_name, image_config):
             elapsed=humanfriendly.format_timespan(image_elapsed),
         )
 
-    def _conditional_print(self, what, end=None):
+    def _stream_print(self, what, end=None):
         if self._stream:
             if end:
                 print(what, end=end)
@@ -134,7 +164,7 @@ def _push_manifest(self, manifest, image, tag):
         headers = {
             "Content-Type": "application/vnd.docker.distribution.manifest.v2+json"
         }
-        url = self._registry_url + "/v2/" + image + "/manifests/" + tag
+        url = f'{self._registry_url}/v2/{image}/manifests/{tag}'
         response = requests.put(
             url,
             headers=headers,
@@ -146,6 +176,7 @@ def _push_manifest(self, manifest, image, tag):
             self._logger.log_and_raise(
                 'error',
                 'Failed to push manifest',
+                manifest=manifest,
                 image=image,
                 tag=tag,
                 status_code=response.status_code,
@@ -159,25 +190,60 @@ def _process_layer(self, layer, image, tmp_dir_name):
 
         # pushing the layer in parallel from different images might result in 500 internal server error
         self._logger.debug('Acquiring layer lock', layer_key=layer_key)
-        self._layer_locks.setdefault(layer_key, threading.Lock())
-        self._layer_locks[layer_key].acquire()
+        self._layers_lock.get_lock(layer_key).acquire()
         try:
+            layer_path = os.path.abspath(os.path.join(tmp_dir_name, layer))
+
+            digest = utils.helpers.get_digest(layer_path)
+            self._logger.debug(
+                'Check if layer exists in registry',
+                layer=layer,
+                image=image,
+                digest=digest,
+            )
+            response = requests.head(
+                f'{self._registry_url}/v2/{image}/blobs/{digest}',
+                auth=self._basicauth,
+                verify=self._ssl_verify,
+            )
+
+            if response.status_code == 200:
+                size = int(response.headers['Content-Length'])
+                digest = response.headers['Docker-Content-Digest']
+                self._logger.info(
+                    'Layer exists in registry, skipping',
+                    layer=layer,
+                    image=image,
+                    size=size,
+                    digest=digest,
+                )
+                return digest, size
+            self._logger.debug(
+                'Layer does not exist and will be pushed',
+                layer=layer,
+                image=image,
+                response_code=response.status_code,
+                response_content=response.content,
+            )
+
             self._logger.info('Pushing layer', layer=layer)
             push_url = self._initialize_push(image)
-            layer_path = os.path.join(tmp_dir_name, layer)
-            self._push_layer(layer_path, push_url)
+
+            digest, size = self._push_layer(layer_path, push_url)
+            self._logger.info('Layer pushed', layer=layer, digest=digest, size=size)
+            return digest, size
         finally:
             self._logger.debug('Releasing layer lock', layer_key=layer_key)
-            self._layer_locks[layer_key].release()
+            self._layers_lock.get_lock(layer_key).release()
 
     def _initialize_push(self, repository):
         """
-        Request a push URL for the image repository for a layer or manifest
+        Request starting an upload for the image repository for a layer or manifest
         """
         self._logger.debug('Initializing push', repository=repository)
 
         response = requests.post(
-            self._registry_url + "/v2/" + repository + "/blobs/uploads/",
+            f'{self._registry_url}/v2/{repository}/blobs/uploads/',
             auth=self._basicauth,
             verify=self._ssl_verify,
         )
@@ -195,73 +261,123 @@
 
     def _push_layer(self, layer_path, upload_url):
-        self._chunked_upload(layer_path, upload_url)
+        return self._chunked_upload(layer_path, upload_url, gzip=self._gzip_layers)
 
-    def _push_config(self, layer_path, upload_url):
-        self._chunked_upload(layer_path, upload_url)
+    def _push_config(self, config_path, upload_url):
+        return self._chunked_upload(config_path, upload_url)
+
+    def _chunked_upload(self, file_path, initial_url, gzip=False):
+        content_path = os.path.abspath(file_path)
+        total_size = os.stat(content_path).st_size
+
+        total_pushed_size = 0
+        length_read = 0
+        self._logger.debug(
+            'Pushing chunked content',
+            file_path=file_path,
+            initial_url=initial_url,
+            total_size=humanfriendly.format_size(total_size, binary=True),
+            gzip=gzip,
+        )
 
-    def _chunked_upload(self, filepath, url):
-        content_path = os.path.abspath(filepath)
-        content_size = os.stat(content_path).st_size
         with open(content_path, "rb") as f:
             index = 0
+            upload_url = initial_url
             headers = {}
-            upload_url = url
 
             sha256hash = hashlib.sha256()
 
-            for chunk in self._read_in_chunks(f, sha256hash):
-                if "http" not in upload_url:
-                    upload_url = self._registry_url + upload_url
+            if gzip:
+                f_read = utils.gzip_stream.GZIPCompressedStream(f, compression_level=7)
+                headers['Content-Encoding'] = 'gzip'
+            else:
+                f_read = f
+            for chunk in self._read_in_chunks(f_read):
+                length_read += len(chunk)
                 offset = index + len(chunk)
 
+                sha256hash.update(chunk)
+                headers['Content-Type'] = 'application/octet-stream'
                 headers['Content-Length'] = str(len(chunk))
-                headers['Content-Range'] = '%s-%s' % (index, offset)
+                headers['Content-Range'] = f'{index}-{offset}'
+
                 index = offset
-                last = False
-                if offset == content_size:
-                    last = True
 
                 try:
-                    self._conditional_print(
+                    self._stream_print(
                         "Pushing... "
-                        + str(round((offset / content_size) * 100, 2))
+                        + str(round((offset / total_size) * 100, 2))
                         + "% ",
                         end="\r",
                     )
-                    if last:
-                        digest_str = str(sha256hash.hexdigest())
-                        requests.put(
-                            f"{upload_url}&digest=sha256:{digest_str}",
-                            data=chunk,
-                            headers=headers,
-                            auth=self._basicauth,
-                            verify=self._ssl_verify,
-                        )
-                    else:
-                        response = requests.patch(
-                            upload_url,
-                            data=chunk,
-                            headers=headers,
-                            auth=self._basicauth,
-                            verify=self._ssl_verify,
+
+                    response = requests.patch(
+                        upload_url,
+                        data=chunk,
+                        headers=headers,
+                        auth=self._basicauth,
+                        verify=self._ssl_verify,
+                    )
+
+                    if response.status_code != 202:
+                        self._logger.log_and_raise(
+                            'error',
+                            'Failed to upload chunk',
+                            filepath=file_path,
+                            status_code=response.status_code,
+                            content=response.content,
                         )
-                        if "Location" in response.headers:
-                            upload_url = response.headers["Location"]
+
+                    if "Location" in response.headers:
+                        upload_url = response.headers["Location"]
+
+                    total_pushed_size += len(chunk)
                 except Exception as exc:
-                    self._logger.error(
-                        'Failed to upload file image upload', filepath=filepath, exc=exc
+                    self._logger.log_and_raise(
+                        'error',
+                        'Failed to upload file',
+                        filepath=file_path,
+                        exc=exc,
                     )
-                    raise
-            f.close()
 
-        self._conditional_print("")
+        # request the finalizing put, no body
+        headers = {'Content-Length': "0"}
+
+        # we compressed so don't know when the last was
+        digest = f'sha256:{str(sha256hash.hexdigest())}'
+        response = requests.put(
+            f"{upload_url}&digest={digest}",
+            headers=headers,
+            auth=self._basicauth,
+            verify=self._ssl_verify,
+        )
+        if response.status_code != 201:
+            self._logger.log_and_raise(
+                'error',
+                'Failed to complete upload retroactively',
+                digest=digest,
+                filepath=file_path,
+                status_code=response.status_code,
+                content=response.content,
+            )
+
+        response_digest = response.headers["Docker-Content-Digest"]
+
+        if response_digest != digest or response_digest is None:
+            self._logger.log_and_raise(
+                'error',
+                'Server-side digest different from client digest',
+                response_digest=response_digest,
+                digest=digest,
+                filepath=file_path,
+            )
+
+        self._stream_print("")
+        return response_digest, total_pushed_size
 
-    # chunk size default 2T (??)
     @staticmethod
-    def _read_in_chunks(file_object, hashed, chunk_size=2097152):
+    def _read_in_chunks(file_object, chunk_size=256 * (1024 ** 2)):
         while True:
             data = file_object.read(chunk_size)
-            hashed.update(data)
             if not data:
                 break
             yield data
@@ -294,8 +410,3 @@ def _replace_tag(self, image, orig_tag):
         )
 
         return orig_tag
-
-    @staticmethod
-    def _load_json_file(filepath):
-        with open(filepath, 'r') as fh:
-            return json.loads(fh.read())
diff --git a/dockerregistrypusher.py b/dockerregistrypusher.py
index 595c96b..d255929 100644
--- a/dockerregistrypusher.py
+++ b/dockerregistrypusher.py
@@ -28,10 +28,13 @@ def run(args):
     # initialize and start processing
     processor = core.Processor(
         logger=logger,
+        tmp_dir=args.tmp_dir,
+        tmp_dir_override=args.tmp_dir_override,
         parallel=args.parallel,
        registry_url=args.registry_url,
         archive_path=args.archive_path,
         stream=args.stream,
+        gzip_layers=args.gzip_layers,
         login=args.login,
         password=args.password,
         ssl_verify=args.ssl_verify,
@@ -68,11 +71,27 @@ def register_arguments(parser):
         default=1,
     )
 
+    parser.add_argument(
+        '-td',
+        '--tmp-dir',
+        help='Create the temporary workspace inside this dir (optional)',
+        type=str,
+        required=False,
+    )
+
+    parser.add_argument(
+        '-tdo',
+        '--tmp-dir-override',
+        help='Use this dir as the temporary workspace (optional)',
+        type=str,
+        required=False,
+    )
+
     parser.add_argument(
         'archive_path',
         metavar='ARCHIVE_PATH',
         type=str,
-        help='The url of the target registry to push to',
+        help='The path of the images archive to push',
     )
 
     parser.add_argument(
@@ -107,6 +126,13 @@ def register_arguments(parser):
         default=True,
     )
 
+    parser.add_argument(
+        '--gzip-layers',
+        help='Gzip all layers (pre-processing) before pushing',
+        type=bool,
+        default=True,
+    )
+
     parser.add_argument(
         '--replace-tags-match',
         help='A regex string to match on tags. If matches will be replaces with --replace-tags-target',
diff --git a/utils/gzip_stream.py b/utils/gzip_stream.py
new file mode 100644
index 0000000..4a2cc4a
--- /dev/null
+++ b/utils/gzip_stream.py
@@ -0,0 +1,86 @@
+import gzip
+import io
+from typing import BinaryIO
+
+
+class GZIPCompressedStream(io.RawIOBase):
+    def __init__(self, stream: BinaryIO, *, compression_level: int):
+        assert 1 <= compression_level <= 9
+
+        self._compression_level = compression_level
+        self._stream = stream
+
+        self._compressed_stream = io.BytesIO()
+
+        # writes into self._compressed_stream
+        self._compressor = gzip.GzipFile(
+            mode='wb', fileobj=self._compressed_stream, compresslevel=compression_level
+        )
+
+        # because of the GZIP header written by GzipFile.__init__
+        self._compressed_stream.seek(0)
+
+    @property
+    def compression_level(self) -> int:
+        return self._compression_level
+
+    @property
+    def stream(self) -> BinaryIO:
+        return self._stream
+
+    def readable(self) -> bool:
+        return True
+
+    def readinto(self, b: bytearray) -> int:
+        b = memoryview(b)
+
+        offset = 0
+        size = len(b)
+        while offset < size:
+            offset += self._read_compressed_into(b[offset:])
+            if offset < size:
+
+                # self._compressed_buffer now empty
+                if self._compressor.closed:
+
+                    # nothing to compress anymore
+                    break
+
+                # compress next bytes
+                self._read_and_compress(size)
+
+        return offset
+
+    def _read_compressed_into(self, b: memoryview) -> int:
+        buf = self._compressed_stream.read(len(b))
+        b[: len(buf)] = buf
+        return len(buf)
+
+    def _read_and_compress(self, size: int):
+        assert size > 0
+
+        data = self._stream.read(size)
+
+        # rewind buffer to the start to free up memory
+        # (because anything currently in the buffer should be already
+        # streamed off the object)
+        self._compressed_stream.seek(0)
+        self._compressed_stream.truncate(0)
+
+        if data:
+            self._compressor.write(data)
+        else:
+
+            # write final data (will flush zlib with Z_FINISH)
+            self._compressor.close()
+
+        # rewind to buffer start
+        self._compressed_stream.seek(0)
+
+    def __repr__(self) -> str:
+        return (
+            '{self.__class__.__name__}('
+            '{self.stream!r}, '
+            'compression_level={self.compression_level!r}'
+            ')'
+        ).format(self=self)
diff --git a/utils/helpers.py b/utils/helpers.py
new file mode 100644
index 0000000..3c8ef63
--- /dev/null
+++ b/utils/helpers.py
@@ -0,0 +1,27 @@
+import hashlib
+import json
+
+
+def get_digest(filepath):
+    return "sha256:" + get_file_sha256(filepath)
+
+
+def load_json_file(filepath):
+    with open(filepath, 'r') as fh:
+        return json.loads(fh.read())
+
+
+def dump_json_file(filepath, json_contents):
+    with open(filepath, 'w') as fh:
+        json.dump(json_contents, fh)
+
+
+def get_file_sha256(filepath):
+    sha256hash = hashlib.sha256()
+    with open(filepath, "rb") as f:
+        while True:
+            data = f.read(65536)
+            sha256hash.update(data)
+            if not data:
+                break
+    return sha256hash.hexdigest()