From bbc1387271bfc13e69331c63c8e62cbe2ef22eb7 Mon Sep 17 00:00:00 2001 From: omesser Date: Mon, 1 Mar 2021 22:00:10 +0200 Subject: [PATCH 01/49] ? --- core/manifest_creator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/manifest_creator.py b/core/manifest_creator.py index 2117f18..02f7d55 100644 --- a/core/manifest_creator.py +++ b/core/manifest_creator.py @@ -21,7 +21,7 @@ def create(self): manifest["layers"] = [] for layer in self._layers_paths: layer_data = dict() - layer_data["mediaType"] = "application/vnd.docker.image.rootfs.diff.tar" + layer_data["mediaType"] = "application/vnd.docker.image.rootfs.diff.tar.gzip" layer_data["size"] = os.path.getsize(layer) layer_data["digest"] = self._get_digest(layer) manifest["layers"].append(layer_data) From 0ba55cdea23fd8dd5f222bdcb95864bbe2d979eb Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 03:59:07 +0200 Subject: [PATCH 02/49] M --- core/manifest_creator.py | 20 +++++++------- core/registry.py | 60 +++++++++++++++++++++++++++++----------- 2 files changed, 54 insertions(+), 26 deletions(-) diff --git a/core/manifest_creator.py b/core/manifest_creator.py index 02f7d55..ce2cf79 100644 --- a/core/manifest_creator.py +++ b/core/manifest_creator.py @@ -12,18 +12,18 @@ def create(self): manifest = dict() manifest["schemaVersion"] = 2 manifest["mediaType"] = "application/vnd.docker.distribution.manifest.v2+json" - manifest["config"] = dict() - manifest["config"][ - "mediaType" - ] = "application/vnd.docker.container.image.v1+json" - manifest["config"]["size"] = os.path.getsize(self._config_path) - manifest["config"]["digest"] = self._get_digest(self._config_path) + manifest["config"] = { + "mediaType": "application/vnd.docker.container.image.v1+json", + "size": os.path.getsize(self._config_path), + "digest": self._get_digest(self._config_path), + } manifest["layers"] = [] for layer in self._layers_paths: - layer_data = dict() - layer_data["mediaType"] = "application/vnd.docker.image.rootfs.diff.tar.gzip" - layer_data["size"] = os.path.getsize(layer) - layer_data["digest"] = self._get_digest(layer) + layer_data = { + "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", + "size": os.path.getsize(layer), + "digest": self._get_digest(layer), + } manifest["layers"].append(layer_data) return json.dumps(manifest) diff --git a/core/registry.py b/core/registry.py index 4964ae5..4d59833 100644 --- a/core/registry.py +++ b/core/registry.py @@ -6,6 +6,7 @@ import urllib.parse import time import threading +import zlib import humanfriendly import requests @@ -134,7 +135,7 @@ def _push_manifest(self, manifest, image, tag): headers = { "Content-Type": "application/vnd.docker.distribution.manifest.v2+json" } - url = self._registry_url + "/v2/" + image + "/manifests/" + tag + url = f'{self._registry_url}/v2/{image}/manifests/{tag}' response = requests.put( url, headers=headers, @@ -172,12 +173,12 @@ def _process_layer(self, layer, image, tmp_dir_name): def _initialize_push(self, repository): """ - Request a push URL for the image repository for a layer or manifest + Request starting an upload for the image repository for a layer or manifest """ self._logger.debug('Initializing push', repository=repository) response = requests.post( - self._registry_url + "/v2/" + repository + "/blobs/uploads/", + f'{self._registry_url}/v2/{repository}/blobs/uploads/', auth=self._basicauth, verify=self._ssl_verify, ) @@ -195,27 +196,28 @@ def _initialize_push(self, repository): return upload_url def _push_layer(self, layer_path, 
upload_url): - self._chunked_upload(layer_path, upload_url) + self._chunked_upload(layer_path, upload_url, gzip=True) def _push_config(self, layer_path, upload_url): self._chunked_upload(layer_path, upload_url) - def _chunked_upload(self, filepath, url): + def _chunked_upload(self, filepath, upload_url, gzip=False): content_path = os.path.abspath(filepath) content_size = os.stat(content_path).st_size with open(content_path, "rb") as f: index = 0 headers = {} - upload_url = url sha256hash = hashlib.sha256() for chunk in self._read_in_chunks(f, sha256hash): - if "http" not in upload_url: - upload_url = self._registry_url + upload_url + if gzip: + chunk = zlib.compress(chunk) + headers['Content-Encoding'] = 'gzip' + offset = index + len(chunk) headers['Content-Type'] = 'application/octet-stream' headers['Content-Length'] = str(len(chunk)) - headers['Content-Range'] = '%s-%s' % (index, offset) + headers['Content-Range'] = f'{index}-{offset}' index = offset last = False if offset == content_size: @@ -227,15 +229,26 @@ def _chunked_upload(self, filepath, url): + "% ", end="\r", ) + + # complete the upload if last: - digest_str = str(sha256hash.hexdigest()) - requests.put( - f"{upload_url}&digest=sha256:{digest_str}", + digest = f'sha256:{str(sha256hash.hexdigest())}' + response = requests.put( + f"{upload_url}&digest={digest}", data=chunk, headers=headers, auth=self._basicauth, verify=self._ssl_verify, ) + if response.status_code != 201: + self._logger.log_and_raise( + 'error', + 'Failed to complete upload', + digest=digest, + filepath=filepath, + status_code=response.status_code, + content=response.content, + ) else: response = requests.patch( upload_url, @@ -244,21 +257,36 @@ def _chunked_upload(self, filepath, url): auth=self._basicauth, verify=self._ssl_verify, ) + + if response.status_code != 202: + self._logger.log_and_raise( + 'error', + 'Failed to upload chunk', + digest=digest, + filepath=filepath, + status_code=response.status_code, + content=response.content, + ) + if "Location" in response.headers: upload_url = response.headers["Location"] except Exception as exc: - self._logger.error( - 'Failed to upload file image upload', filepath=filepath, exc=exc + self._logger.log_and_raise( + 'error', + 'Failed to upload file', + filepath=filepath, + exc=exc, ) - raise f.close() self._conditional_print("") - # chunk size default 2T (??) 
@staticmethod def _read_in_chunks(file_object, hashed, chunk_size=2097152): + """ + Chunk size default 2T (monolithic upload) + """ while True: data = file_object.read(chunk_size) hashed.update(data) From 9e3271ca6d301ced44e72a5215656aa62bf284e1 Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 05:59:01 +0200 Subject: [PATCH 03/49] m --- core/manifest_creator.py | 10 ++++----- core/registry.py | 46 ++++++++++++++++++++++++---------------- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/core/manifest_creator.py b/core/manifest_creator.py index ce2cf79..dce77c2 100644 --- a/core/manifest_creator.py +++ b/core/manifest_creator.py @@ -4,9 +4,9 @@ class ImageManifestCreator(object): - def __init__(self, config_path, layers_paths): + def __init__(self, config_path, layers_info): self._config_path = config_path - self._layers_paths = layers_paths + self._layers_info = layers_info def create(self): manifest = dict() @@ -18,11 +18,11 @@ def create(self): "digest": self._get_digest(self._config_path), } manifest["layers"] = [] - for layer in self._layers_paths: + for layer_info in self._layers_info: layer_data = { "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", - "size": os.path.getsize(layer), - "digest": self._get_digest(layer), + "size": layer_info['size'], + "digest": layer_info['digest'], } manifest["layers"].append(layer_data) diff --git a/core/registry.py b/core/registry.py index 4d59833..d7a1473 100644 --- a/core/registry.py +++ b/core/registry.py @@ -83,8 +83,17 @@ def process_image(self, tmp_dir_name, image_config): # push individual image layers layers = image_config["Layers"] + manifest_layer_info = {} for layer in layers: - self._process_layer(layer, image, tmp_dir_name) + layer_digest, layer_size = self._process_layer( + layer, image, tmp_dir_name + ) + manifest_layer_info.append( + { + 'digest': layer_digest, + 'size': layer_size, + } + ) # then, push image config self._logger.info( @@ -93,14 +102,9 @@ def process_image(self, tmp_dir_name, image_config): push_url = self._initialize_push(image) self._push_config(config_path, push_url) - # keep the pushed layers - properly_formatted_layers = [ - os.path.join(tmp_dir_name, layer) for layer in layers - ] - # Now we need to create and push a manifest for the image creator = manifest_creator.ImageManifestCreator( - config_path, properly_formatted_layers + config_path, manifest_layer_info ) image_manifest = creator.create() @@ -166,7 +170,7 @@ def _process_layer(self, layer, image, tmp_dir_name): self._logger.info('Pushing layer', layer=layer) push_url = self._initialize_push(image) layer_path = os.path.join(tmp_dir_name, layer) - self._push_layer(layer_path, push_url) + return self._push_layer(layer_path, push_url) finally: self._logger.debug('Releasing layer lock', layer_key=layer_key) self._layer_locks[layer_key].release() @@ -196,36 +200,44 @@ def _initialize_push(self, repository): return upload_url def _push_layer(self, layer_path, upload_url): - self._chunked_upload(layer_path, upload_url, gzip=True) + return self._chunked_upload(layer_path, upload_url, gzip=True) def _push_config(self, layer_path, upload_url): self._chunked_upload(layer_path, upload_url) def _chunked_upload(self, filepath, upload_url, gzip=False): content_path = os.path.abspath(filepath) - content_size = os.stat(content_path).st_size + uncompressed_size = os.stat(filepath).st_size + total_pushed_size = 0 + uncompressed_length_read = 0 + digest = None with open(content_path, "rb") as f: index = 0 headers = {} sha256hash = 
hashlib.sha256() - for chunk in self._read_in_chunks(f, sha256hash): + for chunk in self._read_in_chunks(f): + uncompressed_length_read += len(chunk) if gzip: chunk = zlib.compress(chunk) headers['Content-Encoding'] = 'gzip' + sha256hash.update(chunk) + + total_pushed_size += len(chunk) offset = index + len(chunk) + headers['Content-Type'] = 'application/octet-stream' headers['Content-Length'] = str(len(chunk)) headers['Content-Range'] = f'{index}-{offset}' index = offset last = False - if offset == content_size: + if uncompressed_length_read == uncompressed_size: last = True try: self._conditional_print( "Pushing... " - + str(round((offset / content_size) * 100, 2)) + + str(round((offset / uncompressed_size) * 100, 2)) + "% ", end="\r", ) @@ -262,7 +274,6 @@ def _chunked_upload(self, filepath, upload_url, gzip=False): self._logger.log_and_raise( 'error', 'Failed to upload chunk', - digest=digest, filepath=filepath, status_code=response.status_code, content=response.content, @@ -278,18 +289,17 @@ def _chunked_upload(self, filepath, upload_url, gzip=False): filepath=filepath, exc=exc, ) - f.close() self._conditional_print("") + return digest, total_pushed_size @staticmethod - def _read_in_chunks(file_object, hashed, chunk_size=2097152): + def _read_in_chunks(file_object, chunk_size=2097152): """ - Chunk size default 2T (monolithic upload) + Chunk size default 2T """ while True: data = file_object.read(chunk_size) - hashed.update(data) if not data: break yield data From f00c0968997b7737e8fc81f70138e83bae2a1d39 Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 06:01:23 +0200 Subject: [PATCH 04/49] manifest_layer_info list --- core/registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/registry.py b/core/registry.py index d7a1473..f063f7a 100644 --- a/core/registry.py +++ b/core/registry.py @@ -83,7 +83,7 @@ def process_image(self, tmp_dir_name, image_config): # push individual image layers layers = image_config["Layers"] - manifest_layer_info = {} + manifest_layer_info = [] for layer in layers: layer_digest, layer_size = self._process_layer( layer, image, tmp_dir_name From e46917493a63d730dca959ee766ff9c658d2ccbf Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 06:10:16 +0200 Subject: [PATCH 05/49] Not gzipping --- core/registry.py | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/core/registry.py b/core/registry.py index f063f7a..1bca77b 100644 --- a/core/registry.py +++ b/core/registry.py @@ -200,31 +200,24 @@ def _initialize_push(self, repository): return upload_url def _push_layer(self, layer_path, upload_url): - return self._chunked_upload(layer_path, upload_url, gzip=True) + return self._chunked_upload(layer_path, upload_url) def _push_config(self, layer_path, upload_url): self._chunked_upload(layer_path, upload_url) - def _chunked_upload(self, filepath, upload_url, gzip=False): + def _chunked_upload(self, filepath, upload_url): content_path = os.path.abspath(filepath) - uncompressed_size = os.stat(filepath).st_size + total_size = os.stat(filepath).st_size total_pushed_size = 0 - uncompressed_length_read = 0 + length_read = 0 digest = None with open(content_path, "rb") as f: index = 0 headers = {} sha256hash = hashlib.sha256() - for chunk in self._read_in_chunks(f): - uncompressed_length_read += len(chunk) - if gzip: - chunk = zlib.compress(chunk) - headers['Content-Encoding'] = 'gzip' - - sha256hash.update(chunk) - - total_pushed_size += len(chunk) + for chunk in self._read_in_chunks(f, 
sha256hash): + length_read += len(chunk) offset = index + len(chunk) headers['Content-Type'] = 'application/octet-stream' @@ -232,12 +225,12 @@ def _chunked_upload(self, filepath, upload_url, gzip=False): headers['Content-Range'] = f'{index}-{offset}' index = offset last = False - if uncompressed_length_read == uncompressed_size: + if length_read == total_size: last = True try: self._conditional_print( "Pushing... " - + str(round((offset / uncompressed_size) * 100, 2)) + + str(round((offset / total_size) * 100, 2)) + "% ", end="\r", ) @@ -282,6 +275,7 @@ def _chunked_upload(self, filepath, upload_url, gzip=False): if "Location" in response.headers: upload_url = response.headers["Location"] + total_pushed_size += len(chunk) except Exception as exc: self._logger.log_and_raise( 'error', @@ -294,12 +288,13 @@ def _chunked_upload(self, filepath, upload_url, gzip=False): return digest, total_pushed_size @staticmethod - def _read_in_chunks(file_object, chunk_size=2097152): + def _read_in_chunks(file_object, hashed, chunk_size=2097152): """ Chunk size default 2T """ while True: data = file_object.read(chunk_size) + hashed.update(data) if not data: break yield data From 72c650613916cf3a404e36aab946208388aa2901 Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 06:10:28 +0200 Subject: [PATCH 06/49] No --- core/registry.py | 1 - 1 file changed, 1 deletion(-) diff --git a/core/registry.py b/core/registry.py index 1bca77b..bc5e8a6 100644 --- a/core/registry.py +++ b/core/registry.py @@ -6,7 +6,6 @@ import urllib.parse import time import threading -import zlib import humanfriendly import requests From 80ea0aec01adbc1c64068f81e59d071bd6b46534 Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 07:29:41 +0200 Subject: [PATCH 07/49] compress --- core/registry.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/core/registry.py b/core/registry.py index bc5e8a6..9af907f 100644 --- a/core/registry.py +++ b/core/registry.py @@ -6,6 +6,7 @@ import urllib.parse import time import threading +import subprocess import humanfriendly import requests @@ -199,19 +200,31 @@ def _initialize_push(self, repository): return upload_url def _push_layer(self, layer_path, upload_url): - return self._chunked_upload(layer_path, upload_url) + return self._chunked_upload(layer_path, upload_url, gzip=True) def _push_config(self, layer_path, upload_url): self._chunked_upload(layer_path, upload_url) - def _chunked_upload(self, filepath, upload_url): + def _chunked_upload(self, filepath, initial_url, gzip=False): content_path = os.path.abspath(filepath) total_size = os.stat(filepath).st_size + + # for kaniko compatibility - must be real tar.gzip and not just tar + if gzip: + if os.path.splitext(filepath)[1] == '.tar': + self._logger.debug('Failed is not gzipped and gzipped asked - compressing before upload', + filepath=filepath, + total_size=total_size) + + subprocess.check_call(f'gzip -9 {content_path}') + content_path = content_path + '.gz' + total_pushed_size = 0 length_read = 0 digest = None with open(content_path, "rb") as f: index = 0 + upload_url = initial_url headers = {} sha256hash = hashlib.sha256() From 01142fc2c76406511761e592b770059fbad33674 Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 07:36:58 +0200 Subject: [PATCH 08/49] shlex --- core/registry.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/registry.py b/core/registry.py index 9af907f..d928d9f 100644 --- a/core/registry.py +++ b/core/registry.py @@ -6,6 +6,7 @@ 
import urllib.parse import time import threading +import shlex import subprocess import humanfriendly @@ -212,11 +213,13 @@ def _chunked_upload(self, filepath, initial_url, gzip=False): # for kaniko compatibility - must be real tar.gzip and not just tar if gzip: if os.path.splitext(filepath)[1] == '.tar': - self._logger.debug('Failed is not gzipped and gzipped asked - compressing before upload', - filepath=filepath, - total_size=total_size) + self._logger.debug( + 'File is not gzipped - compressing before upload', + filepath=filepath, + total_size=total_size, + ) - subprocess.check_call(f'gzip -9 {content_path}') + subprocess.check_call(shlex.split(f'gzip -9 {content_path}')) content_path = content_path + '.gz' total_pushed_size = 0 From 8199effb1e9155b9a2ce91e2fea2fa32f11ba2bd Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 07:42:22 +0200 Subject: [PATCH 09/49] try --- core/registry.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/registry.py b/core/registry.py index d928d9f..1b3646c 100644 --- a/core/registry.py +++ b/core/registry.py @@ -213,14 +213,16 @@ def _chunked_upload(self, filepath, initial_url, gzip=False): # for kaniko compatibility - must be real tar.gzip and not just tar if gzip: if os.path.splitext(filepath)[1] == '.tar': + new_content_path = content_path + '.gz' self._logger.debug( 'File is not gzipped - compressing before upload', - filepath=filepath, + content_path=content_path, + new_content_path=new_content_path, total_size=total_size, ) subprocess.check_call(shlex.split(f'gzip -9 {content_path}')) - content_path = content_path + '.gz' + content_path = new_content_path total_pushed_size = 0 length_read = 0 @@ -235,7 +237,10 @@ def _chunked_upload(self, filepath, initial_url, gzip=False): length_read += len(chunk) offset = index + len(chunk) - headers['Content-Type'] = 'application/octet-stream' + if gzip: + headers['Content-Type'] = 'application/gzip' + else: + headers['Content-Type'] = 'application/octet-stream' headers['Content-Length'] = str(len(chunk)) headers['Content-Range'] = f'{index}-{offset}' index = offset From c49458f2b008b8f99527473a5c537773a595fced Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 07:45:21 +0200 Subject: [PATCH 10/49] headers --- core/registry.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/registry.py b/core/registry.py index 1b3646c..dc2a148 100644 --- a/core/registry.py +++ b/core/registry.py @@ -238,9 +238,9 @@ def _chunked_upload(self, filepath, initial_url, gzip=False): offset = index + len(chunk) if gzip: - headers['Content-Type'] = 'application/gzip' - else: - headers['Content-Type'] = 'application/octet-stream' + headers['Content-Encoding'] = 'gzip' + + headers['Content-Type'] = 'application/octet-stream' headers['Content-Length'] = str(len(chunk)) headers['Content-Range'] = f'{index}-{offset}' index = offset From 2ea506c37e68cb368395d8d114677f50cc3c36f9 Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 07:50:28 +0200 Subject: [PATCH 11/49] reorder --- core/registry.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/registry.py b/core/registry.py index dc2a148..81fdf55 100644 --- a/core/registry.py +++ b/core/registry.py @@ -152,6 +152,7 @@ def _push_manifest(self, manifest, image, tag): self._logger.log_and_raise( 'error', 'Failed to push manifest', + manifest=manifest, image=image, tag=tag, status_code=response.status_code, @@ -208,7 +209,6 @@ def _push_config(self, layer_path, upload_url): def 
_chunked_upload(self, filepath, initial_url, gzip=False): content_path = os.path.abspath(filepath) - total_size = os.stat(filepath).st_size # for kaniko compatibility - must be real tar.gzip and not just tar if gzip: @@ -218,12 +218,13 @@ def _chunked_upload(self, filepath, initial_url, gzip=False): 'File is not gzipped - compressing before upload', content_path=content_path, new_content_path=new_content_path, - total_size=total_size, ) subprocess.check_call(shlex.split(f'gzip -9 {content_path}')) content_path = new_content_path + total_size = os.stat(filepath).st_size + total_pushed_size = 0 length_read = 0 digest = None From 802f0dc4736f80a555b22fcec310452fbfedfa00 Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 07:54:13 +0200 Subject: [PATCH 12/49] log --- core/registry.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/core/registry.py b/core/registry.py index 81fdf55..15d513b 100644 --- a/core/registry.py +++ b/core/registry.py @@ -211,17 +211,18 @@ def _chunked_upload(self, filepath, initial_url, gzip=False): content_path = os.path.abspath(filepath) # for kaniko compatibility - must be real tar.gzip and not just tar - if gzip: - if os.path.splitext(filepath)[1] == '.tar': - new_content_path = content_path + '.gz' - self._logger.debug( - 'File is not gzipped - compressing before upload', - content_path=content_path, - new_content_path=new_content_path, - ) + if gzip and os.path.splitext(filepath)[1] == '.tar': + new_content_path = content_path + '.gz' + self._logger.debug( + 'File is not gzipped - compressing before upload', + content_path=content_path, + new_content_path=new_content_path, + ) - subprocess.check_call(shlex.split(f'gzip -9 {content_path}')) - content_path = new_content_path + gzip_cmd = shlex.split(f'gzip -9 {content_path}') + out = subprocess.check_call(gzip_cmd) + self._logger.debug('Finished gzipping', gzip_cmd=gzip_cmd, out=out) + content_path = new_content_path total_size = os.stat(filepath).st_size From 9462df938df8e8f3bff0c438b555e283b0261dff Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 08:16:02 +0200 Subject: [PATCH 13/49] nicer --- core/registry.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/core/registry.py b/core/registry.py index 15d513b..42eafd2 100644 --- a/core/registry.py +++ b/core/registry.py @@ -210,21 +210,19 @@ def _push_config(self, layer_path, upload_url): def _chunked_upload(self, filepath, initial_url, gzip=False): content_path = os.path.abspath(filepath) - # for kaniko compatibility - must be real tar.gzip and not just tar + # for Kaniko compatibility - must be real tar.gzip and not just tar if gzip and os.path.splitext(filepath)[1] == '.tar': - new_content_path = content_path + '.gz' self._logger.debug( 'File is not gzipped - compressing before upload', content_path=content_path, - new_content_path=new_content_path, ) gzip_cmd = shlex.split(f'gzip -9 {content_path}') - out = subprocess.check_call(gzip_cmd) - self._logger.debug('Finished gzipping', gzip_cmd=gzip_cmd, out=out) - content_path = new_content_path + out = subprocess.check_output(gzip_cmd, encoding='utf-8') + self._logger.debug('Finished gzip command', gzip_cmd=gzip_cmd, out=out) + content_path = content_path + '.gz' - total_size = os.stat(filepath).st_size + total_size = os.stat(content_path).st_size total_pushed_size = 0 length_read = 0 From 322440729a427d61a501a12093d0c78d9206ad85 Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 08:24:56 +0200 Subject: [PATCH 
14/49] Moving up --- core/registry.py | 39 ++++++++++++++++++++------------------- dockerregistrypusher.py | 2 +- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/core/registry.py b/core/registry.py index 42eafd2..52a9e5d 100644 --- a/core/registry.py +++ b/core/registry.py @@ -169,9 +169,23 @@ def _process_layer(self, layer, image, tmp_dir_name): self._layer_locks.setdefault(layer_key, threading.Lock()) self._layer_locks[layer_key].acquire() try: + layer_path = os.path.abspath(os.path.join(tmp_dir_name, layer)) + + # for Kaniko compatibility - must be real tar.gzip and not just tar + if os.path.splitext(layer)[1] == '.tar': + self._logger.debug( + 'File is not gzipped - compressing before upload', + layer_path=layer_path, + ) + + gzip_cmd = shlex.split(f'gzip -9 {layer_path}') + out = subprocess.check_output(gzip_cmd, encoding='utf-8') + self._logger.debug('Finished gzip command', gzip_cmd=gzip_cmd, out=out) + layer_path += '.gz' + self._logger.info('Pushing layer', layer=layer) push_url = self._initialize_push(image) - layer_path = os.path.join(tmp_dir_name, layer) + return self._push_layer(layer_path, push_url) finally: self._logger.debug('Releasing layer lock', layer_key=layer_key) @@ -202,26 +216,13 @@ def _initialize_push(self, repository): return upload_url def _push_layer(self, layer_path, upload_url): - return self._chunked_upload(layer_path, upload_url, gzip=True) + return self._chunked_upload(layer_path, upload_url) - def _push_config(self, layer_path, upload_url): - self._chunked_upload(layer_path, upload_url) + def _push_config(self, config_path, upload_url): + self._chunked_upload(config_path, upload_url) - def _chunked_upload(self, filepath, initial_url, gzip=False): + def _chunked_upload(self, filepath, initial_url): content_path = os.path.abspath(filepath) - - # for Kaniko compatibility - must be real tar.gzip and not just tar - if gzip and os.path.splitext(filepath)[1] == '.tar': - self._logger.debug( - 'File is not gzipped - compressing before upload', - content_path=content_path, - ) - - gzip_cmd = shlex.split(f'gzip -9 {content_path}') - out = subprocess.check_output(gzip_cmd, encoding='utf-8') - self._logger.debug('Finished gzip command', gzip_cmd=gzip_cmd, out=out) - content_path = content_path + '.gz' - total_size = os.stat(content_path).st_size total_pushed_size = 0 @@ -237,7 +238,7 @@ def _chunked_upload(self, filepath, initial_url, gzip=False): length_read += len(chunk) offset = index + len(chunk) - if gzip: + if content_path.endswith('gz') or content_path.endswith('gzip'): headers['Content-Encoding'] = 'gzip' headers['Content-Type'] = 'application/octet-stream' diff --git a/dockerregistrypusher.py b/dockerregistrypusher.py index 595c96b..abdf749 100644 --- a/dockerregistrypusher.py +++ b/dockerregistrypusher.py @@ -72,7 +72,7 @@ def register_arguments(parser): 'archive_path', metavar='ARCHIVE_PATH', type=str, - help='The url of the target registry to push to', + help='The path of the images archive to push', ) parser.add_argument( From 91b1aa397dda4236e85f99fbedc77868a6ef8d9d Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 09:00:23 +0200 Subject: [PATCH 15/49] M --- core/registry.py | 42 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/core/registry.py b/core/registry.py index 52a9e5d..f0afc06 100644 --- a/core/registry.py +++ b/core/registry.py @@ -16,7 +16,23 @@ from . 
import manifest_creator -class Registry(object): +class LayersLock: + def __init__(self): + self._global_lock = threading.Lock() + self._layers_locks = {} + + def get_lock(self, key): + + # global lock to check if layer was done + self._global_lock.acquire() + try: + self._layers_locks.setdefault(key, threading.Lock()) + return self._layers_locks[key] + finally: + self._global_lock.release() + + +class Registry: def __init__( self, logger, @@ -44,7 +60,8 @@ def __init__( if self._login: self._basicauth = requests.auth.HTTPBasicAuth(self._login, self._password) - self._layer_locks = {} + self._layers_lock = LayersLock() + self._layers_info = {} self._logger.debug( 'Initialized', @@ -166,9 +183,16 @@ def _process_layer(self, layer, image, tmp_dir_name): # pushing the layer in parallel from different images might result in 500 internal server error self._logger.debug('Acquiring layer lock', layer_key=layer_key) - self._layer_locks.setdefault(layer_key, threading.Lock()) - self._layer_locks[layer_key].acquire() + self._layers_lock.get_lock(layer_key).acquire() try: + + if layer_key in self._layers_info: + self._logger.info( + 'Layer already pushed, skipping', + layer_info=self._layers_info[layer_key], + ) + return self._layers_info['digest'], self._layers_info['size'] + layer_path = os.path.abspath(os.path.join(tmp_dir_name, layer)) # for Kaniko compatibility - must be real tar.gzip and not just tar @@ -181,15 +205,21 @@ def _process_layer(self, layer, image, tmp_dir_name): gzip_cmd = shlex.split(f'gzip -9 {layer_path}') out = subprocess.check_output(gzip_cmd, encoding='utf-8') self._logger.debug('Finished gzip command', gzip_cmd=gzip_cmd, out=out) + layer += '.gz' layer_path += '.gz' self._logger.info('Pushing layer', layer=layer) push_url = self._initialize_push(image) - return self._push_layer(layer_path, push_url) + digest, size = self._push_layer(layer_path, push_url) + self._layers_info[layer_key] = { + 'digest': digest, + 'size': size, + } + return digest, size finally: self._logger.debug('Releasing layer lock', layer_key=layer_key) - self._layer_locks[layer_key].release() + self._layers_lock.get_lock(layer_key).release() def _initialize_push(self, repository): """ From 6fff6844bd1a203f32e6e87718dd902aadcc538a Mon Sep 17 00:00:00 2001 From: omesser Date: Tue, 2 Mar 2021 09:12:13 +0200 Subject: [PATCH 16/49] Handle some cases nicely --- core/registry.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/core/registry.py b/core/registry.py index f0afc06..6148fe7 100644 --- a/core/registry.py +++ b/core/registry.py @@ -197,14 +197,22 @@ def _process_layer(self, layer, image, tmp_dir_name): # for Kaniko compatibility - must be real tar.gzip and not just tar if os.path.splitext(layer)[1] == '.tar': - self._logger.debug( - 'File is not gzipped - compressing before upload', - layer_path=layer_path, - ) - gzip_cmd = shlex.split(f'gzip -9 {layer_path}') - out = subprocess.check_output(gzip_cmd, encoding='utf-8') - self._logger.debug('Finished gzip command', gzip_cmd=gzip_cmd, out=out) + # safety - handle cases where some race of corruption may have occurred, if .tar.gz is in place, skip + # compression and ignore the original + if not os.path.exists(layer_path + '.gz'): + self._logger.debug( + 'Layer file is not gzipped - compressing before upload', + layer_path=layer_path, + ) + + gzip_cmd = shlex.split(f'gzip -9 {layer_path}') + out = subprocess.check_output(gzip_cmd, encoding='utf-8') + self._logger.debug( + 'Finished gzip command', gzip_cmd=gzip_cmd, 
out=out
+ )
+
+ # whether we compressed it now or beforehand, the new name has this new prefix by gzip
 layer += '.gz'
 layer_path += '.gz'

From d655e3f89b2418c8bdc78c5e862c0fc451051c8a Mon Sep 17 00:00:00 2001
From: omesser
Date: Tue, 2 Mar 2021 09:13:45 +0200
Subject: [PATCH 17/49] bug fix layer_info access

---
 core/registry.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/registry.py b/core/registry.py
index 6148fe7..235aa34 100644
--- a/core/registry.py
+++ b/core/registry.py
@@ -187,11 +187,12 @@ def _process_layer(self, layer, image, tmp_dir_name):
 try:

 if layer_key in self._layers_info:
+ layer_info = self._layers_info[layer_key]
 self._logger.info(
 'Layer already pushed, skipping',
- layer_info=self._layers_info[layer_key],
+ layer_info=layer_info,
 )
- return self._layers_info['digest'], self._layers_info['size']
+ return layer_info['digest'], layer_info['size']

 layer_path = os.path.abspath(os.path.join(tmp_dir_name, layer))

From 497adbf6dc438d2c2dc66c2f338d2ba7afa33474 Mon Sep 17 00:00:00 2001
From: omesser
Date: Tue, 2 Mar 2021 12:20:11 +0200
Subject: [PATCH 18/49] force

---
 core/registry.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/registry.py b/core/registry.py
index 235aa34..c72f77c 100644
--- a/core/registry.py
+++ b/core/registry.py
@@ -207,7 +207,8 @@ def _process_layer(self, layer, image, tmp_dir_name):
 layer_path=layer_path,
 )

- gzip_cmd = shlex.split(f'gzip -9 {layer_path}')
+ # use -f to avoid "Too many levels of symbolic links" failures
+ gzip_cmd = shlex.split(f'gzip -9 -f {layer_path}')
 out = subprocess.check_output(gzip_cmd, encoding='utf-8')
 self._logger.debug(
 'Finished gzip command', gzip_cmd=gzip_cmd, out=out

From 91c328cced84f2223b5254a22f3af2115b85b1b3 Mon Sep 17 00:00:00 2001
From: omesser
Date: Tue, 2 Mar 2021 12:24:57 +0200
Subject: [PATCH 19/49] log message change

---
 core/registry.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/registry.py b/core/registry.py
index c72f77c..d49f554 100644
--- a/core/registry.py
+++ b/core/registry.py
@@ -211,7 +211,7 @@ def _process_layer(self, layer, image, tmp_dir_name):
 gzip_cmd = shlex.split(f'gzip -9 -f {layer_path}')
 out = subprocess.check_output(gzip_cmd, encoding='utf-8')
 self._logger.debug(
- 'Finished gzip command', gzip_cmd=gzip_cmd, out=out
+ 'Successfully gzipped layer', gzip_cmd=gzip_cmd, out=out
 )

From 61da504a9ee6821bfcc0ab90fbf7dd1d99654930 Mon Sep 17 00:00:00 2001
From: omesser
Date: Tue, 2 Mar 2021 13:14:55 +0200
Subject: [PATCH 20/49] handle symlinks

---
 core/registry.py | 60 ++++++++++++++++++++++++++++++++++++------------
 1 file changed, 45 insertions(+), 15 deletions(-)

diff --git a/core/registry.py b/core/registry.py
index d49f554..7191705 100644
--- a/core/registry.py
+++ b/core/registry.py
@@ -34,15 +34,15 @@ class Registry:
 def __init__(
- self,
- logger,
- registry_url,
- stream=False,
- login=None,
- password=None,
- ssl_verify=True,
- replace_tags_match=None,
- replace_tags_target=None,
+ self,
+ logger,
+ registry_url,
+ stream=False,
+ login=None,
+ password=None,
+ ssl_verify=True,
+ replace_tags_match=None,
+ replace_tags_target=None,
 ):

 self._logger = logger.get_child('registry')
@@ -197,7 +197,26 @@ def _process_layer(self, layer, image, tmp_dir_name):
 layer_path = os.path.abspath(os.path.join(tmp_dir_name, layer))

 # for Kaniko compatibility - must be real tar.gzip and not just tar
- if 
os.path.splitext(layer)[1] == '.tar': + if os.path.splitext(layer_path)[1].endswith('tar'): + + # handle symlinks + if os.path.islink(layer_path): + symlinked_path = os.path.realpath(layer_path) + + # symlink target missing (probably compressed tar -> tar.gz) + compressed_symlinked_path = symlinked_path + '.gz' + if not os.path.exists(symlinked_path) and os.path.exists(compressed_symlinked_path): + + # fix + tmp_link_path = f'{layer_path}_tmplink' + os.symlink(compressed_symlinked_path, tmp_link_path) + os.rename(tmp_link_path, layer_path) + elif not os.path.exists(symlinked_path): + self._logger.log_and_raise('error', + 'Target for symlink is missing. Cannot continue pushing', + symlinked_path=symlinked_path, + layer_path=layer_path, + image=image) # safety - handle cases where some race of corruption may have occurred, if .tar.gz is in place, skip # compression and ignore the original @@ -208,11 +227,22 @@ def _process_layer(self, layer, image, tmp_dir_name): ) # use -f to avoid "Too many levels of symbolic links" failures - gzip_cmd = shlex.split(f'gzip -9 -f {layer_path}') - out = subprocess.check_output(gzip_cmd, encoding='utf-8') - self._logger.debug( - 'Successfully gzipped layer', gzip_cmd=gzip_cmd, out=out - ) + try: + # use -f to avoid "Too many levels of symbolic links" failures + gzip_cmd = shlex.split(f'gzip -9 -f {layer_path}') + out = subprocess.check_output(gzip_cmd, encoding='utf-8') + self._logger.debug( + 'Successfully gzipped layer', gzip_cmd=gzip_cmd, out=out + ) + + # catch and print for debugging + except Exception as exc: + cmd = shlex.split(f'ls -latr {os.path.dirname(layer_path)}') + out = subprocess.check_output(cmd, encoding='utf-8') + self._logger.warn( + 'Finished ls command', cmd=cmd, out=out, orig_exc=exc, + ) + raise # whether we compressed it now or beforehand, the new name has this new prefix by gzip layer += '.gz' From 2d06530855c5811632a87e051098bf4e6717e0d1 Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 10:39:58 +0200 Subject: [PATCH 21/49] Add log, unlink old symlink --- core/registry.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/registry.py b/core/registry.py index 7191705..08799f9 100644 --- a/core/registry.py +++ b/core/registry.py @@ -202,6 +202,7 @@ def _process_layer(self, layer, image, tmp_dir_name): # handle symlinks if os.path.islink(layer_path): symlinked_path = os.path.realpath(layer_path) + self._logger.debug('Found symlink layer', layer_path=layer_path, symlinked_path=symlinked_path) # symlink target missing (probably compressed tar -> tar.gz) compressed_symlinked_path = symlinked_path + '.gz' @@ -210,6 +211,7 @@ def _process_layer(self, layer, image, tmp_dir_name): # fix tmp_link_path = f'{layer_path}_tmplink' os.symlink(compressed_symlinked_path, tmp_link_path) + os.unlink(layer_path) os.rename(tmp_link_path, layer_path) elif not os.path.exists(symlinked_path): self._logger.log_and_raise('error', From cc7800b7d201e6d26583fdf598db8d1e81b1dd5e Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 10:55:36 +0200 Subject: [PATCH 22/49] another log --- core/registry.py | 1 + 1 file changed, 1 insertion(+) diff --git a/core/registry.py b/core/registry.py index 08799f9..fa73cdf 100644 --- a/core/registry.py +++ b/core/registry.py @@ -258,6 +258,7 @@ def _process_layer(self, layer, image, tmp_dir_name): 'digest': digest, 'size': size, } + self._logger.info('Layer pushed', layer=layer, digest=digest, size=size) return digest, size finally: self._logger.debug('Releasing layer lock', layer_key=layer_key) From 
eb967415920919fe4d70ccc7d841eea6a23f40db Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 11:16:43 +0200 Subject: [PATCH 23/49] No skipping --- core/registry.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/registry.py b/core/registry.py index fa73cdf..cc5a979 100644 --- a/core/registry.py +++ b/core/registry.py @@ -186,13 +186,14 @@ def _process_layer(self, layer, image, tmp_dir_name): self._layers_lock.get_lock(layer_key).acquire() try: - if layer_key in self._layers_info: - layer_info = self._layers_info[layer_key] - self._logger.info( - 'Layer already pushed, skipping', - layer_info=layer_info, - ) - return layer_info['digest'], layer_info['size'] + # if layer_key in self._layers_info: + # layer_info = self._layers_info[layer_key] + # self._logger.info( + # 'Layer already pushed, skipping', + # layer_key=layer_key, + # layer_info=layer_info, + # ) + # return layer_info['digest'], layer_info['size'] layer_path = os.path.abspath(os.path.join(tmp_dir_name, layer)) @@ -220,7 +221,7 @@ def _process_layer(self, layer, image, tmp_dir_name): layer_path=layer_path, image=image) - # safety - handle cases where some race of corruption may have occurred, if .tar.gz is in place, skip + # safety - handle cases where some race or corruption may have occurred, if .tar.gz is in place, skip # compression and ignore the original if not os.path.exists(layer_path + '.gz'): self._logger.debug( From c0a9ec44d8814166c10001efcf07f70b3ef5936f Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 13:18:54 +0200 Subject: [PATCH 24/49] log change --- core/registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/registry.py b/core/registry.py index cc5a979..5b168c4 100644 --- a/core/registry.py +++ b/core/registry.py @@ -93,7 +93,7 @@ def process_image(self, tmp_dir_name, image_config): repo_tag_start_time = time.time() image, tag = self._parse_image_tag(repo) self._logger.info( - 'Pushing image repo and tag', + 'Processing image repo and tag', image=image, tag=tag, tmp_dir_name=tmp_dir_name, From de4e3b597f2ce68afec5ea877cabbb17c2879757 Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 13:19:16 +0200 Subject: [PATCH 25/49] fmt --- core/registry.py | 45 ++++++++++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/core/registry.py b/core/registry.py index 5b168c4..fa19e9a 100644 --- a/core/registry.py +++ b/core/registry.py @@ -34,15 +34,15 @@ def get_lock(self, key): class Registry: def __init__( - self, - logger, - registry_url, - stream=False, - login=None, - password=None, - ssl_verify=True, - replace_tags_match=None, - replace_tags_target=None, + self, + logger, + registry_url, + stream=False, + login=None, + password=None, + ssl_verify=True, + replace_tags_match=None, + replace_tags_target=None, ): self._logger = logger.get_child('registry') @@ -203,11 +203,17 @@ def _process_layer(self, layer, image, tmp_dir_name): # handle symlinks if os.path.islink(layer_path): symlinked_path = os.path.realpath(layer_path) - self._logger.debug('Found symlink layer', layer_path=layer_path, symlinked_path=symlinked_path) + self._logger.debug( + 'Found symlink layer', + layer_path=layer_path, + symlinked_path=symlinked_path, + ) # symlink target missing (probably compressed tar -> tar.gz) compressed_symlinked_path = symlinked_path + '.gz' - if not os.path.exists(symlinked_path) and os.path.exists(compressed_symlinked_path): + if not os.path.exists(symlinked_path) and os.path.exists( 
+ compressed_symlinked_path + ): # fix tmp_link_path = f'{layer_path}_tmplink' @@ -215,11 +221,13 @@ def _process_layer(self, layer, image, tmp_dir_name): os.unlink(layer_path) os.rename(tmp_link_path, layer_path) elif not os.path.exists(symlinked_path): - self._logger.log_and_raise('error', - 'Target for symlink is missing. Cannot continue pushing', - symlinked_path=symlinked_path, - layer_path=layer_path, - image=image) + self._logger.log_and_raise( + 'error', + 'Target for symlink is missing. Cannot continue pushing', + symlinked_path=symlinked_path, + layer_path=layer_path, + image=image, + ) # safety - handle cases where some race or corruption may have occurred, if .tar.gz is in place, skip # compression and ignore the original @@ -243,7 +251,10 @@ def _process_layer(self, layer, image, tmp_dir_name): cmd = shlex.split(f'ls -latr {os.path.dirname(layer_path)}') out = subprocess.check_output(cmd, encoding='utf-8') self._logger.warn( - 'Finished ls command', cmd=cmd, out=out, orig_exc=exc, + 'Finished ls command', + cmd=cmd, + out=out, + orig_exc=exc, ) raise From 0d75b91a8ca3fddef32650278a8526a3ca91141d Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 15:52:30 +0200 Subject: [PATCH 26/49] Do all before --- core/manifest_creator.py | 19 +------ core/processor.py | 117 ++++++++++++++++++++++++++++++++++++++- core/registry.py | 93 +------------------------------ utils/helpers.py | 27 +++++++++ 4 files changed, 147 insertions(+), 109 deletions(-) create mode 100644 utils/helpers.py diff --git a/core/manifest_creator.py b/core/manifest_creator.py index dce77c2..9ea0f5e 100644 --- a/core/manifest_creator.py +++ b/core/manifest_creator.py @@ -1,7 +1,8 @@ import os -import hashlib import json +import utils.helpers + class ImageManifestCreator(object): def __init__(self, config_path, layers_info): @@ -15,7 +16,7 @@ def create(self): manifest["config"] = { "mediaType": "application/vnd.docker.container.image.v1+json", "size": os.path.getsize(self._config_path), - "digest": self._get_digest(self._config_path), + "digest": utils.helpers.get_digest(self._config_path), } manifest["layers"] = [] for layer_info in self._layers_info: @@ -27,17 +28,3 @@ def create(self): manifest["layers"].append(layer_data) return json.dumps(manifest) - - def _get_digest(self, filepath): - return "sha256:" + self.get_file_sha256(filepath) - - @staticmethod - def get_file_sha256(filepath): - sha256hash = hashlib.sha256() - with open(filepath, "rb") as f: - while True: - data = f.read(65536) - sha256hash.update(data) - if not data: - break - return sha256hash.hexdigest() diff --git a/core/processor.py b/core/processor.py index 4e6e33b..485f3ca 100644 --- a/core/processor.py +++ b/core/processor.py @@ -2,12 +2,16 @@ import multiprocessing.pool import time import os.path +import pathlib import json +import subprocess +import shlex import humanfriendly from . import registry from . 
import extractor +import utils.helpers class Processor(object): @@ -67,6 +71,9 @@ def process(self): # extract the whole thing self._extractor.extract_all(tmp_dir_name) + # compress layers in place - for kaniko + self._compress_layer_files(tmp_dir_name) + manifest = self._get_manifest(tmp_dir_name) self._logger.debug('Extracted archive manifest', manifest=manifest) @@ -94,12 +101,118 @@ def process(self): elapsed=humanfriendly.format_timespan(elapsed), ) + def _compress_layer_files(self, root_dir): + start_time = time.time() + + # for Kaniko compatibility - must be real tar.gzip and not just tar + self._logger.info('Compressing all layer files (pre-processing)') + for tar_files in pathlib.Path(root_dir).rglob('*.tar'): + file_path = tar_files.absolute() + self._logger.info('Compressing file (.tar -> .tar.gz)', file_path=file_path) + + # safety - if .tar.gz is in place, skip + # compression and ignore the original + if os.path.exists(str(file_path) + '.gz'): + self._logger.debug( + 'Layer file is gzipped - skipping', + file_path=file_path, + gzipped_path=str(file_path) + '.gz', + ) + continue + + # use -f to avoid "Too many levels of symbolic links" failures + try: + # use -f to avoid "Too many levels of symbolic links" failures + gzip_cmd = shlex.split(f'gzip -9 -f {file_path}') + out = subprocess.check_output(gzip_cmd, encoding='utf-8') + self._logger.debug( + 'Successfully gzipped layer', + gzip_cmd=gzip_cmd, + out=out, + file_path=file_path, + ) + + # catch and print for debugging + except Exception as exc: + cmd = shlex.split(f'ls -latr {os.path.dirname(file_path)}') + out = subprocess.check_output(cmd, encoding='utf-8') + self._logger.warn( + 'Finished ls command', + cmd=cmd, + out=out, + orig_exc=exc, + ) + raise + + # fix symlinks + for root, dirs, files in os.walk(root_dir): + for filename in files: + path = os.path.join(root, filename) + + # If it's not a symlink we're not interested. + if not os.path.islink(path): + continue + + target_path = os.readlink(path) + + if not os.path.exists(target_path): + self._logger.debug( + 'Found broken link', target_path=target_path, path=path + ) + + # try and fix - point to tar.gz + new_target_path = target_path + b'.gz' + if os.path.exists(new_target_path): + tmp_link_path = f'{target_path}_tmplink' + os.symlink(new_target_path, tmp_link_path) + os.unlink(path) + os.rename(tmp_link_path, path) + self._logger.debug( + 'Fixed broken link', + new_target_path=new_target_path, + path=path, + ) + else: + self._logger.log_and_raise( + 'error', + 'Cannot fix broken link', + target_path=target_path, + path=path, + ) + self._logger.debug('Correcting image manifests') + manifest = self._get_manifest(root_dir) + for image_config in manifest: + config_filename = image_config["Config"] + + # rootfs.diff_ids contains layer digests - TODO change them? 
+ config_path = os.path.join(root_dir, config_filename) + config_parsed = utils.helpers.load_json_file(config_path) + + # warning - spammy + self._logger.verbose('Parsed image config', config_parsed=config_parsed) + for idx, layer in enumerate(image_config["Layers"]): + if layer.endswith('.tar'): + image_config["Layers"][idx] = layer + '.gz' + + # write modified image config + self._write_manifest(root_dir) + + elapsed = time.time() - start_time + self._logger.info( + 'Finished compressing all layer files (pre-processing)', elapsed=elapsed + ) + @staticmethod - def _get_manifest(tmp_dir_name): - with open(os.path.join(tmp_dir_name, 'manifest.json'), 'r') as fh: + def _get_manifest(archive_dir): + with open(os.path.join(archive_dir, 'manifest.json'), 'r') as fh: manifest = json.loads(fh.read()) return manifest + @staticmethod + def _write_manifest(archive_dir, contents): + with open(os.path.join(archive_dir, 'manifest.json'), 'w') as fh: + json.dump(contents, fh) + # # Global wrappers to use with multiprocessing.pool.Pool which can't pickle instance methods diff --git a/core/registry.py b/core/registry.py index fa19e9a..e60e597 100644 --- a/core/registry.py +++ b/core/registry.py @@ -6,14 +6,13 @@ import urllib.parse import time import threading -import shlex -import subprocess import humanfriendly import requests import requests.auth from . import manifest_creator +import utils.helpers class LayersLock: @@ -61,7 +60,6 @@ def __init__( self._basicauth = requests.auth.HTTPBasicAuth(self._login, self._password) self._layers_lock = LayersLock() - self._layers_info = {} self._logger.debug( 'Initialized', @@ -84,7 +82,7 @@ def process_image(self, tmp_dir_name, image_config): self._logger.info('Processing image', repo_tags=repo_tags) image_start_time = time.time() - config_parsed = self._load_json_file(config_path) + config_parsed = utils.helpers.load_json_file(config_path) # warning - spammy self._logger.verbose('Parsed image config', config_parsed=config_parsed) @@ -185,91 +183,12 @@ def _process_layer(self, layer, image, tmp_dir_name): self._logger.debug('Acquiring layer lock', layer_key=layer_key) self._layers_lock.get_lock(layer_key).acquire() try: - - # if layer_key in self._layers_info: - # layer_info = self._layers_info[layer_key] - # self._logger.info( - # 'Layer already pushed, skipping', - # layer_key=layer_key, - # layer_info=layer_info, - # ) - # return layer_info['digest'], layer_info['size'] - layer_path = os.path.abspath(os.path.join(tmp_dir_name, layer)) - # for Kaniko compatibility - must be real tar.gzip and not just tar - if os.path.splitext(layer_path)[1].endswith('tar'): - - # handle symlinks - if os.path.islink(layer_path): - symlinked_path = os.path.realpath(layer_path) - self._logger.debug( - 'Found symlink layer', - layer_path=layer_path, - symlinked_path=symlinked_path, - ) - - # symlink target missing (probably compressed tar -> tar.gz) - compressed_symlinked_path = symlinked_path + '.gz' - if not os.path.exists(symlinked_path) and os.path.exists( - compressed_symlinked_path - ): - - # fix - tmp_link_path = f'{layer_path}_tmplink' - os.symlink(compressed_symlinked_path, tmp_link_path) - os.unlink(layer_path) - os.rename(tmp_link_path, layer_path) - elif not os.path.exists(symlinked_path): - self._logger.log_and_raise( - 'error', - 'Target for symlink is missing. 
Cannot continue pushing', - symlinked_path=symlinked_path, - layer_path=layer_path, - image=image, - ) - - # safety - handle cases where some race or corruption may have occurred, if .tar.gz is in place, skip - # compression and ignore the original - if not os.path.exists(layer_path + '.gz'): - self._logger.debug( - 'Layer file is not gzipped - compressing before upload', - layer_path=layer_path, - ) - - # use -f to avoid "Too many levels of symbolic links" failures - try: - # use -f to avoid "Too many levels of symbolic links" failures - gzip_cmd = shlex.split(f'gzip -9 -f {layer_path}') - out = subprocess.check_output(gzip_cmd, encoding='utf-8') - self._logger.debug( - 'Successfully gzipped layer', gzip_cmd=gzip_cmd, out=out - ) - - # catch and print for debugging - except Exception as exc: - cmd = shlex.split(f'ls -latr {os.path.dirname(layer_path)}') - out = subprocess.check_output(cmd, encoding='utf-8') - self._logger.warn( - 'Finished ls command', - cmd=cmd, - out=out, - orig_exc=exc, - ) - raise - - # whether we compressed it now or beforehand, the new name has this new prefix by gzip - layer += '.gz' - layer_path += '.gz' - self._logger.info('Pushing layer', layer=layer) push_url = self._initialize_push(image) digest, size = self._push_layer(layer_path, push_url) - self._layers_info[layer_key] = { - 'digest': digest, - 'size': size, - } self._logger.info('Layer pushed', layer=layer, digest=digest, size=size) return digest, size finally: @@ -323,9 +242,6 @@ def _chunked_upload(self, filepath, initial_url): length_read += len(chunk) offset = index + len(chunk) - if content_path.endswith('gz') or content_path.endswith('gzip'): - headers['Content-Encoding'] = 'gzip' - headers['Content-Type'] = 'application/octet-stream' headers['Content-Length'] = str(len(chunk)) headers['Content-Range'] = f'{index}-{offset}' @@ -433,8 +349,3 @@ def _replace_tag(self, image, orig_tag): ) return orig_tag - - @staticmethod - def _load_json_file(filepath): - with open(filepath, 'r') as fh: - return json.loads(fh.read()) diff --git a/utils/helpers.py b/utils/helpers.py new file mode 100644 index 0000000..3c8ef63 --- /dev/null +++ b/utils/helpers.py @@ -0,0 +1,27 @@ +import hashlib +import json + + +def get_digest(filepath): + return "sha256:" + get_file_sha256(filepath) + + +def load_json_file(filepath): + with open(filepath, 'r') as fh: + return json.loads(fh.read()) + + +def dump_json_file(filepath, json_contents): + with open(filepath, 'w') as fh: + json.dump(json_contents, fh) + + +def get_file_sha256(filepath): + sha256hash = hashlib.sha256() + with open(filepath, "rb") as f: + while True: + data = f.read(65536) + sha256hash.update(data) + if not data: + break + return sha256hash.hexdigest() From f452bff9958660b9f9bd0c33564b72717d9d03f8 Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 16:00:08 +0200 Subject: [PATCH 27/49] Write manifest --- core/processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/processor.py b/core/processor.py index 485f3ca..5c3ce86 100644 --- a/core/processor.py +++ b/core/processor.py @@ -195,7 +195,7 @@ def _compress_layer_files(self, root_dir): image_config["Layers"][idx] = layer + '.gz' # write modified image config - self._write_manifest(root_dir) + self._write_manifest(root_dir, manifest) elapsed = time.time() - start_time self._logger.info( From 8050a6e980e4b7fb29d0fcc76a61a81261e41a4d Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 16:38:34 +0200 Subject: [PATCH 28/49] Log harder --- core/processor.py | 3 ++- 1 file 
changed, 2 insertions(+), 1 deletion(-) diff --git a/core/processor.py b/core/processor.py index 5c3ce86..e6728bf 100644 --- a/core/processor.py +++ b/core/processor.py @@ -199,7 +199,8 @@ def _compress_layer_files(self, root_dir): elapsed = time.time() - start_time self._logger.info( - 'Finished compressing all layer files (pre-processing)', elapsed=elapsed + 'Finished compressing all layer files (pre-processing)', + elapsed=humanfriendly.format_timespan(elapsed) ) @staticmethod From dc3428431a3ec3fcf37485f2302693a48e299ed1 Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 16:44:49 +0200 Subject: [PATCH 29/49] M --- core/processor.py | 2 +- core/registry.py | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/processor.py b/core/processor.py index e6728bf..71ba8e3 100644 --- a/core/processor.py +++ b/core/processor.py @@ -72,7 +72,7 @@ def process(self): self._extractor.extract_all(tmp_dir_name) # compress layers in place - for kaniko - self._compress_layer_files(tmp_dir_name) + # self._compress_layer_files(tmp_dir_name) manifest = self._get_manifest(tmp_dir_name) self._logger.debug('Extracted archive manifest', manifest=manifest) diff --git a/core/registry.py b/core/registry.py index e60e597..89ecfb7 100644 --- a/core/registry.py +++ b/core/registry.py @@ -6,7 +6,7 @@ import urllib.parse import time import threading - +import zlib import humanfriendly import requests import requests.auth @@ -245,6 +245,11 @@ def _chunked_upload(self, filepath, initial_url): headers['Content-Type'] = 'application/octet-stream' headers['Content-Length'] = str(len(chunk)) headers['Content-Range'] = f'{index}-{offset}' + + # compress + headers['content-encoding'] = 'gzip' + request_body = zlib.compress(chunk) + index = offset last = False if length_read == total_size: @@ -262,7 +267,7 @@ def _chunked_upload(self, filepath, initial_url): digest = f'sha256:{str(sha256hash.hexdigest())}' response = requests.put( f"{upload_url}&digest={digest}", - data=chunk, + data=request_body, headers=headers, auth=self._basicauth, verify=self._ssl_verify, @@ -279,7 +284,7 @@ def _chunked_upload(self, filepath, initial_url): else: response = requests.patch( upload_url, - data=chunk, + data=request_body, headers=headers, auth=self._basicauth, verify=self._ssl_verify, From 5982f8d775af15fd24abde8ea8d2b7beb3eb1aeb Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 17:01:43 +0200 Subject: [PATCH 30/49] M --- core/registry.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/registry.py b/core/registry.py index 89ecfb7..6c4aa4c 100644 --- a/core/registry.py +++ b/core/registry.py @@ -242,14 +242,14 @@ def _chunked_upload(self, filepath, initial_url): length_read += len(chunk) offset = index + len(chunk) - headers['Content-Type'] = 'application/octet-stream' - headers['Content-Length'] = str(len(chunk)) - headers['Content-Range'] = f'{index}-{offset}' - # compress headers['content-encoding'] = 'gzip' request_body = zlib.compress(chunk) + headers['Content-Type'] = 'application/octet-stream' + headers['Content-Length'] = str(len(request_body)) + headers['Content-Range'] = f'{index}-{offset}' + index = offset last = False if length_read == total_size: From a18f8f609a00cc93621f153746ea66093e4193aa Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 18:30:14 +0200 Subject: [PATCH 31/49] Resolved existing layer skip issue --- core/processor.py | 2 +- core/registry.py | 61 ++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 53 
insertions(+), 10 deletions(-) diff --git a/core/processor.py b/core/processor.py index 71ba8e3..401c762 100644 --- a/core/processor.py +++ b/core/processor.py @@ -200,7 +200,7 @@ def _compress_layer_files(self, root_dir): elapsed = time.time() - start_time self._logger.info( 'Finished compressing all layer files (pre-processing)', - elapsed=humanfriendly.format_timespan(elapsed) + elapsed=humanfriendly.format_timespan(elapsed), ) @staticmethod diff --git a/core/registry.py b/core/registry.py index 6c4aa4c..3fc1ee8 100644 --- a/core/registry.py +++ b/core/registry.py @@ -113,7 +113,10 @@ def process_image(self, tmp_dir_name, image_config): # then, push image config self._logger.info( - 'Pushing image config', image=image, config_loc=config_filename + 'Pushing image config', + image=image, + config_loc=config_filename, + config_parsed=config_parsed, ) push_url = self._initialize_push(image) self._push_config(config_path, push_url) @@ -185,6 +188,38 @@ def _process_layer(self, layer, image, tmp_dir_name): try: layer_path = os.path.abspath(os.path.join(tmp_dir_name, layer)) + digest = utils.helpers.get_digest(layer_path) + self._logger.debug( + 'Check if layer exists in registry', + layer=layer, + image=image, + digest=digest, + ) + response = requests.head( + f'{self._registry_url}/v2/{image}/blobs/{digest}', + auth=self._basicauth, + verify=self._ssl_verify, + ) + + if response.status_code == 200: + size = response.headers['Content-Length'] + digest = response.headers['Docker-Content-Digest'] + self._logger.info( + 'Layer exists in registry, skipping', + layer=layer, + image=image, + size=size, + digest=digest, + ) + return digest, int(size) + self._logger.debug( + 'Layer does not exist and will be pushed', + layer=layer, + image=image, + response_code=response.status_code, + response_content=response.content, + ) + self._logger.info('Pushing layer', layer=layer) push_url = self._initialize_push(image) @@ -220,31 +255,40 @@ def _initialize_push(self, repository): return upload_url def _push_layer(self, layer_path, upload_url): - return self._chunked_upload(layer_path, upload_url) + return self._chunked_upload(layer_path, upload_url, compress=False) def _push_config(self, config_path, upload_url): self._chunked_upload(config_path, upload_url) - def _chunked_upload(self, filepath, initial_url): + def _chunked_upload(self, filepath, initial_url, compress=False): content_path = os.path.abspath(filepath) total_size = os.stat(content_path).st_size total_pushed_size = 0 length_read = 0 digest = None + self._logger.debug( + 'Pushing layer', + filepath=filepath, + initial_url=initial_url, + compress=compress, + ) with open(content_path, "rb") as f: index = 0 upload_url = initial_url headers = {} sha256hash = hashlib.sha256() - for chunk in self._read_in_chunks(f, sha256hash): + for chunk in self._read_in_chunks(f): length_read += len(chunk) offset = index + len(chunk) + request_body = chunk + + if compress: + headers['content-encoding'] = 'gzip' + request_body = zlib.compress(chunk) - # compress - headers['content-encoding'] = 'gzip' - request_body = zlib.compress(chunk) + sha256hash.update(request_body) headers['Content-Type'] = 'application/octet-stream' headers['Content-Length'] = str(len(request_body)) @@ -315,13 +359,12 @@ def _chunked_upload(self, filepath, initial_url): return digest, total_pushed_size @staticmethod - def _read_in_chunks(file_object, hashed, chunk_size=2097152): + def _read_in_chunks(file_object, chunk_size=2097152): """ Chunk size default 2T """ while True: data = 
file_object.read(chunk_size) - hashed.update(data) if not data: break yield data From c0956d972a57bafce0b0d0a29dd664794b4988c5 Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 18:47:16 +0200 Subject: [PATCH 32/49] compress --- core/registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/registry.py b/core/registry.py index 3fc1ee8..9d65674 100644 --- a/core/registry.py +++ b/core/registry.py @@ -255,7 +255,7 @@ def _initialize_push(self, repository): return upload_url def _push_layer(self, layer_path, upload_url): - return self._chunked_upload(layer_path, upload_url, compress=False) + return self._chunked_upload(layer_path, upload_url, compress=True) def _push_config(self, config_path, upload_url): self._chunked_upload(config_path, upload_url) From f39ee3bf8c3733bb818fd797b5fe1e36fa4ecc13 Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 21:22:35 +0200 Subject: [PATCH 33/49] M --- .gitignore | 3 +- core/processor.py | 90 +++++++++++++++++++++++++++++++---------------- core/registry.py | 18 ++++------ 3 files changed, 67 insertions(+), 44 deletions(-) diff --git a/.gitignore b/.gitignore index e5c6535..8daaf33 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,6 @@ ENV/ # pycharm proj .idea -# stray tar.gz files used for manual testing +# stray archive files used for manual testing +*.tar *.tar.gz diff --git a/core/processor.py b/core/processor.py index 401c762..c0db144 100644 --- a/core/processor.py +++ b/core/processor.py @@ -6,6 +6,7 @@ import json import subprocess import shlex +import gzip import humanfriendly @@ -72,7 +73,7 @@ def process(self): self._extractor.extract_all(tmp_dir_name) # compress layers in place - for kaniko - # self._compress_layer_files(tmp_dir_name) + self._pre_process_contents(tmp_dir_name) manifest = self._get_manifest(tmp_dir_name) self._logger.debug('Extracted archive manifest', manifest=manifest) @@ -101,49 +102,71 @@ def process(self): elapsed=humanfriendly.format_timespan(elapsed), ) - def _compress_layer_files(self, root_dir): + def _pre_process_contents(self, root_dir): start_time = time.time() + self._logger.debug('Preprocessing extracted contents') # for Kaniko compatibility - must be real tar.gzip and not just tar - self._logger.info('Compressing all layer files (pre-processing)') + self._compress_layers(root_dir) + self._correct_symlinks(root_dir) + self._update_manifests(root_dir) + + elapsed = time.time() - start_time + self._logger.info( + 'Finished compressing all layer files (pre-processing)', + elapsed=humanfriendly.format_timespan(elapsed), + ) + + def _compress_layers(self, root_dir): + self._logger.debug('Compressing all layer files (pre-processing)') for tar_files in pathlib.Path(root_dir).rglob('*.tar'): - file_path = tar_files.absolute() - self._logger.info('Compressing file (.tar -> .tar.gz)', file_path=file_path) + file_path = str(tar_files.absolute()) + gzipped_file_path = str(file_path) + '.gz' # safety - if .tar.gz is in place, skip # compression and ignore the original - if os.path.exists(str(file_path) + '.gz'): + if os.path.exists(gzipped_file_path): self._logger.debug( 'Layer file is gzipped - skipping', file_path=file_path, - gzipped_path=str(file_path) + '.gz', + gzipped_path=gzipped_file_path, ) continue # use -f to avoid "Too many levels of symbolic links" failures try: # use -f to avoid "Too many levels of symbolic links" failures - gzip_cmd = shlex.split(f'gzip -9 -f {file_path}') - out = subprocess.check_output(gzip_cmd, encoding='utf-8') + + # inplace .tar ->.tar.gz + 
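The hunk here swaps the gzip -9 subprocess for an in-process rewrite via gzip.open and f_out.writelines(f_in). A minimal sketch of the same .tar -> .tar.gz step using shutil.copyfileobj, which copies fixed-size binary chunks instead of iterating over pseudo-"lines" of binary data the way writelines does; file_path and gzipped_file_path are assumed to be set as in the surrounding hunk:

    import gzip
    import shutil

    # stream the tar through gzip in 1 MiB chunks; gzip.open defaults to
    # compresslevel=9, matching the gzip -9 subprocess it replaces
    with open(file_path, 'rb') as f_in, gzip.open(gzipped_file_path, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out, length=1024 * 1024)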
self._logger.info('Compressing layer file', file_path=file_path) + + with open(file_path, 'rb') as f_in, gzip.open(gzipped_file_path, 'wb') as f_out: + f_out.writelines(f_in) + self._logger.debug( 'Successfully gzipped layer', - gzip_cmd=gzip_cmd, - out=out, + gzipped_file_path=gzipped_file_path, file_path=file_path, ) - # catch and print for debugging except Exception as exc: - cmd = shlex.split(f'ls -latr {os.path.dirname(file_path)}') - out = subprocess.check_output(cmd, encoding='utf-8') - self._logger.warn( - 'Finished ls command', - cmd=cmd, - out=out, - orig_exc=exc, + + # print debugging info + layer_dir = pathlib.Path(file_path).parents[0] + files = layer_dir.glob('**/*') + self._logger.debug( + 'Listed elements in layer dir', + files=files, + layer_dir=layer_dir, + exc=exc, ) raise + self._logger.debug('Finished compressing all layer files') + + def _correct_symlinks(self, root_dir): + self._logger.debug('Updating symlinks') + # fix symlinks for root, dirs, files in os.walk(root_dir): for filename in files: @@ -179,29 +202,34 @@ def _compress_layer_files(self, root_dir): target_path=target_path, path=path, ) + self._logger.debug('Finished updating symlinks') + + def _update_manifests(self, root_dir): self._logger.debug('Correcting image manifests') manifest = self._get_manifest(root_dir) - for image_config in manifest: - config_filename = image_config["Config"] + + for manifest_image_section in manifest: + config_filename = manifest_image_section["Config"] # rootfs.diff_ids contains layer digests - TODO change them? config_path = os.path.join(root_dir, config_filename) - config_parsed = utils.helpers.load_json_file(config_path) + image_config = utils.helpers.load_json_file(config_path) # warning - spammy - self._logger.verbose('Parsed image config', config_parsed=config_parsed) - for idx, layer in enumerate(image_config["Layers"]): + self._logger.verbose('Parsed image config', image_config=image_config) + + for idx, layer in enumerate(manifest_image_section["Layers"]): if layer.endswith('.tar'): - image_config["Layers"][idx] = layer + '.gz' + gzipped_layer_file_path = layer + '.gz' + manifest_image_section["Layers"][idx] = gzipped_layer_file_path + image_config["rootfs"]['diff_ids'][idx] = \ + utils.helpers.get_digest(os.path.join(root_dir, gzipped_layer_file_path)) + self._logger.debug('Corrected image config', config_path=config_path, image_config=image_config) + utils.helpers.dump_json_file(config_path, image_config) # write modified image config self._write_manifest(root_dir, manifest) - - elapsed = time.time() - start_time - self._logger.info( - 'Finished compressing all layer files (pre-processing)', - elapsed=humanfriendly.format_timespan(elapsed), - ) + self._logger.debug('Corrected image manifests', manifest=manifest) @staticmethod def _get_manifest(archive_dir): diff --git a/core/registry.py b/core/registry.py index 9d65674..f875af5 100644 --- a/core/registry.py +++ b/core/registry.py @@ -255,12 +255,12 @@ def _initialize_push(self, repository): return upload_url def _push_layer(self, layer_path, upload_url): - return self._chunked_upload(layer_path, upload_url, compress=True) + return self._chunked_upload(layer_path, upload_url) def _push_config(self, config_path, upload_url): self._chunked_upload(config_path, upload_url) - def _chunked_upload(self, filepath, initial_url, compress=False): + def _chunked_upload(self, filepath, initial_url): content_path = os.path.abspath(filepath) total_size = os.stat(content_path).st_size @@ -271,7 +271,6 @@ def _chunked_upload(self, 
filepath, initial_url, compress=False): 'Pushing layer', filepath=filepath, initial_url=initial_url, - compress=compress, ) with open(content_path, "rb") as f: index = 0 @@ -282,16 +281,11 @@ def _chunked_upload(self, filepath, initial_url, compress=False): for chunk in self._read_in_chunks(f): length_read += len(chunk) offset = index + len(chunk) - request_body = chunk - if compress: - headers['content-encoding'] = 'gzip' - request_body = zlib.compress(chunk) - - sha256hash.update(request_body) + sha256hash.update(chunk) headers['Content-Type'] = 'application/octet-stream' - headers['Content-Length'] = str(len(request_body)) + headers['Content-Length'] = str(len(chunk)) headers['Content-Range'] = f'{index}-{offset}' index = offset @@ -311,7 +305,7 @@ def _chunked_upload(self, filepath, initial_url, compress=False): digest = f'sha256:{str(sha256hash.hexdigest())}' response = requests.put( f"{upload_url}&digest={digest}", - data=request_body, + data=chunk, headers=headers, auth=self._basicauth, verify=self._ssl_verify, @@ -328,7 +322,7 @@ def _chunked_upload(self, filepath, initial_url, compress=False): else: response = requests.patch( upload_url, - data=request_body, + data=chunk, headers=headers, auth=self._basicauth, verify=self._ssl_verify, From 21611b78779d2e96317209ff3f98024f94a43146 Mon Sep 17 00:00:00 2001 From: omesser Date: Wed, 3 Mar 2021 21:22:52 +0200 Subject: [PATCH 34/49] fmting --- core/processor.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/core/processor.py b/core/processor.py index c0db144..54c6ab9 100644 --- a/core/processor.py +++ b/core/processor.py @@ -140,7 +140,9 @@ def _compress_layers(self, root_dir): # inplace .tar ->.tar.gz self._logger.info('Compressing layer file', file_path=file_path) - with open(file_path, 'rb') as f_in, gzip.open(gzipped_file_path, 'wb') as f_out: + with open(file_path, 'rb') as f_in, gzip.open( + gzipped_file_path, 'wb' + ) as f_out: f_out.writelines(f_in) self._logger.debug( @@ -222,9 +224,14 @@ def _update_manifests(self, root_dir): if layer.endswith('.tar'): gzipped_layer_file_path = layer + '.gz' manifest_image_section["Layers"][idx] = gzipped_layer_file_path - image_config["rootfs"]['diff_ids'][idx] = \ - utils.helpers.get_digest(os.path.join(root_dir, gzipped_layer_file_path)) - self._logger.debug('Corrected image config', config_path=config_path, image_config=image_config) + image_config["rootfs"]['diff_ids'][idx] = utils.helpers.get_digest( + os.path.join(root_dir, gzipped_layer_file_path) + ) + self._logger.debug( + 'Corrected image config', + config_path=config_path, + image_config=image_config, + ) utils.helpers.dump_json_file(config_path, image_config) # write modified image config From 0b733f89f60705a702db9e608328ff1e0febd1e5 Mon Sep 17 00:00:00 2001 From: omesser Date: Thu, 4 Mar 2021 05:30:53 +0200 Subject: [PATCH 35/49] M --- core/__init__.py | 2 +- ...t_creator.py => image_manifest_creator.py} | 6 +- core/processor.py | 79 ++++++++++++++++--- core/registry.py | 33 +++++--- dockerregistrypusher.py | 26 ++++++ 5 files changed, 120 insertions(+), 26 deletions(-) rename core/{manifest_creator.py => image_manifest_creator.py} (77%) diff --git a/core/__init__.py b/core/__init__.py index d35f993..609e965 100755 --- a/core/__init__.py +++ b/core/__init__.py @@ -1,4 +1,4 @@ -from .manifest_creator import ImageManifestCreator +from .image_manifest_creator import ImageManifestCreator from .registry import Registry from .extractor import Extractor from .processor import Processor diff --git 
a/core/manifest_creator.py b/core/image_manifest_creator.py similarity index 77% rename from core/manifest_creator.py rename to core/image_manifest_creator.py index 9ea0f5e..c106089 100644 --- a/core/manifest_creator.py +++ b/core/image_manifest_creator.py @@ -20,8 +20,12 @@ def create(self): } manifest["layers"] = [] for layer_info in self._layers_info: + if layer_info['ext'].endswith('gz'): + media_type = "application/vnd.docker.image.rootfs.diff.tar.gzip" + else: + media_type = "application/vnd.docker.image.rootfs.diff.tar" layer_data = { - "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", + "mediaType": media_type, "size": layer_info['size'], "digest": layer_info['digest'], } diff --git a/core/processor.py b/core/processor.py index 54c6ab9..b9db9fc 100644 --- a/core/processor.py +++ b/core/processor.py @@ -3,9 +3,8 @@ import time import os.path import pathlib +import shutil import json -import subprocess -import shlex import gzip import humanfriendly @@ -19,10 +18,13 @@ class Processor(object): def __init__( self, logger, + tmp_dir, + tmp_dir_override, parallel, registry_url, archive_path, stream=False, + gzip_layers=False, login=None, password=None, ssl_verify=True, @@ -31,6 +33,9 @@ def __init__( ): self._logger = logger self._parallel = parallel + self._tmp_dir = tmp_dir + self._tmp_dir_override = tmp_dir_override + self._gzip_layers = gzip_layers if parallel > 1 and stream: self._logger.info( @@ -60,8 +65,14 @@ def process(self): """ start_time = time.time() results = [] - with tempfile.TemporaryDirectory() as tmp_dir_name: - + if self._tmp_dir_override: + tmp_dir_name = self._tmp_dir_override + os.mkdir(tmp_dir_name, 0o700) + else: + tmp_dir_name = tempfile.mkdtemp(dir=self._tmp_dir) + + # since we're not always using TemporaryDirectory we're making and cleaning up ourselves + try: self._logger.info( 'Processing archive', archive_path=self._extractor.archive_path, @@ -72,9 +83,11 @@ def process(self): # extract the whole thing self._extractor.extract_all(tmp_dir_name) - # compress layers in place - for kaniko - self._pre_process_contents(tmp_dir_name) + # pre-process layers in place - for kaniko + if self._gzip_layers: + self._pre_process_contents(tmp_dir_name) + self._verify_configs_integrity(tmp_dir_name) manifest = self._get_manifest(tmp_dir_name) self._logger.debug('Extracted archive manifest', manifest=manifest) @@ -91,9 +104,12 @@ def process(self): pool.close() pool.join() - # this will throw if any pool worker caught an exception - for res in results: - res.get() + # this will throw if any pool worker caught an exception + for res in results: + res.get() + finally: + # shutil.rmtree(tmp_dir_name) + pass elapsed = time.time() - start_time self._logger.info( @@ -127,7 +143,7 @@ def _compress_layers(self, root_dir): # compression and ignore the original if os.path.exists(gzipped_file_path): self._logger.debug( - 'Layer file is gzipped - skipping', + 'Layer file is already gzipped - skipping', file_path=file_path, gzipped_path=gzipped_file_path, ) @@ -145,6 +161,7 @@ def _compress_layers(self, root_dir): ) as f_out: f_out.writelines(f_in) + os.remove(file_path) self._logger.debug( 'Successfully gzipped layer', gzipped_file_path=gzipped_file_path, @@ -212,8 +229,6 @@ def _update_manifests(self, root_dir): for manifest_image_section in manifest: config_filename = manifest_image_section["Config"] - - # rootfs.diff_ids contains layer digests - TODO change them? 
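A note on the rootfs.diff_ids TODO removed just above: in the Docker/OCI image format, diff_ids in the image config are digests of the uncompressed layer tars, while the layer digests in the distribution manifest cover the blobs as uploaded (here, the gzipped files), so compressing layers changes the manifest digests but not the diff_ids. A minimal sketch of the distinction, with sha256_of as a hypothetical helper equivalent to utils.helpers.get_digest:

    import hashlib

    def sha256_of(path):
        # digest a file in 64 KiB chunks
        h = hashlib.sha256()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b''):
                h.update(chunk)
        return 'sha256:' + h.hexdigest()

    diff_id = sha256_of('layer.tar')         # image config: rootfs.diff_ids
    blob_digest = sha256_of('layer.tar.gz')  # manifest: layers[].digest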
config_path = os.path.join(root_dir, config_filename) image_config = utils.helpers.load_json_file(config_path) @@ -227,8 +242,13 @@ def _update_manifests(self, root_dir): image_config["rootfs"]['diff_ids'][idx] = utils.helpers.get_digest( os.path.join(root_dir, gzipped_layer_file_path) ) + self._logger.debug( + 'XXX - layer has sha256', + gzipped_layer_file_path=gzipped_layer_file_path, + digest=image_config["rootfs"]['diff_ids'][idx], + ) self._logger.debug( - 'Corrected image config', + '', config_path=config_path, image_config=image_config, ) @@ -238,6 +258,39 @@ def _update_manifests(self, root_dir): self._write_manifest(root_dir, manifest) self._logger.debug('Corrected image manifests', manifest=manifest) + def _verify_configs_integrity(self, root_dir): + self._logger.debug('Verifying configurations consistency') + manifest = self._get_manifest(root_dir) + + # check for layer mismatches + for manifest_image_section in manifest: + config_filename = manifest_image_section["Config"] + config_path = os.path.join(root_dir, config_filename) + image_config = utils.helpers.load_json_file(config_path) + + # warning - spammy + self._logger.debug('Parsed image config', image_config=image_config) + + for layer_idx, layer in enumerate(manifest_image_section["Layers"]): + self._logger.debug( + 'Inspecting layer', image_config=config_path, layer_idx=layer_idx + ) + layer_path = os.path.join(root_dir, layer) + digest_from_manifest_path = utils.helpers.get_digest(layer_path) + digest_from_image_config = image_config['rootfs']['diff_ids'][layer_idx] + log_kwargs = { + 'digest_from_manifest_path': digest_from_manifest_path, + 'digest_from_image_config': digest_from_image_config, + 'layer_idx': layer_idx, + } + self._logger.debug('Digests comparison passed', **log_kwargs) + if digest_from_image_config != digest_from_image_config: + self._logger.log_and_raise( + 'error', 'Failed layer digest validation', **log_kwargs + ) + + self._logger.debug('Finished config/manifest verification', manifest=manifest) + @staticmethod def _get_manifest(archive_dir): with open(os.path.join(archive_dir, 'manifest.json'), 'r') as fh: diff --git a/core/registry.py b/core/registry.py index f875af5..149569d 100644 --- a/core/registry.py +++ b/core/registry.py @@ -1,17 +1,15 @@ import os import os.path import re -import json import hashlib import urllib.parse import time import threading -import zlib import humanfriendly import requests import requests.auth -from . import manifest_creator +from . 
import image_manifest_creator import utils.helpers @@ -108,6 +106,7 @@ def process_image(self, tmp_dir_name, image_config): { 'digest': layer_digest, 'size': layer_size, + 'ext': os.path.splitext(layer)[1], } ) @@ -122,7 +121,7 @@ def process_image(self, tmp_dir_name, image_config): self._push_config(config_path, push_url) # Now we need to create and push a manifest for the image - creator = manifest_creator.ImageManifestCreator( + creator = image_manifest_creator.ImageManifestCreator( config_path, manifest_layer_info ) image_manifest = creator.create() @@ -147,7 +146,7 @@ def process_image(self, tmp_dir_name, image_config): elapsed=humanfriendly.format_timespan(image_elapsed), ) - def _conditional_print(self, what, end=None): + def _stream_print(self, what, end=None): if self._stream: if end: print(what, end=end) @@ -202,7 +201,7 @@ def _process_layer(self, layer, image, tmp_dir_name): ) if response.status_code == 200: - size = response.headers['Content-Length'] + size = int(response.headers['Content-Length']) digest = response.headers['Docker-Content-Digest'] self._logger.info( 'Layer exists in registry, skipping', @@ -211,7 +210,7 @@ def _process_layer(self, layer, image, tmp_dir_name): size=size, digest=digest, ) - return digest, int(size) + return digest, size self._logger.debug( 'Layer does not exist and will be pushed', layer=layer, @@ -268,9 +267,10 @@ def _chunked_upload(self, filepath, initial_url): length_read = 0 digest = None self._logger.debug( - 'Pushing layer', + 'Pushing chunked content', filepath=filepath, initial_url=initial_url, + total_size=humanfriendly.format_size(total_size, binary=True), ) with open(content_path, "rb") as f: index = 0 @@ -293,7 +293,7 @@ def _chunked_upload(self, filepath, initial_url): if length_read == total_size: last = True try: - self._conditional_print( + self._stream_print( "Pushing... 
" + str(round((offset / total_size) * 100, 2)) + "% ", @@ -341,6 +341,7 @@ def _chunked_upload(self, filepath, initial_url): upload_url = response.headers["Location"] total_pushed_size += len(chunk) + except Exception as exc: self._logger.log_and_raise( 'error', @@ -349,11 +350,21 @@ def _chunked_upload(self, filepath, initial_url): exc=exc, ) - self._conditional_print("") + # sanity + if total_pushed_size != total_size: + self._logger.log_and_raise( + 'error', + 'File size and pushed size differ, inconsistency detected', + total_pushed_size=total_pushed_size, + total_size=total_size, + filepath=filepath, + ) + + self._stream_print("") return digest, total_pushed_size @staticmethod - def _read_in_chunks(file_object, chunk_size=2097152): + def _read_in_chunks(file_object, chunk_size=65536): """ Chunk size default 2T """ diff --git a/dockerregistrypusher.py b/dockerregistrypusher.py index abdf749..d5bac0b 100644 --- a/dockerregistrypusher.py +++ b/dockerregistrypusher.py @@ -28,10 +28,13 @@ def run(args): # initialize and start processing processor = core.Processor( logger=logger, + tmp_dir=args.tmp_dir, + tmp_dir_override=args.tmp_dir_override, parallel=args.parallel, registry_url=args.registry_url, archive_path=args.archive_path, stream=args.stream, + gzip_layers=args.gzip_layers, login=args.login, password=args.password, ssl_verify=args.ssl_verify, @@ -68,6 +71,22 @@ def register_arguments(parser): default=1, ) + parser.add_argument( + '-td', + '--tmp-dir', + help='Create the temporary workspace inside this dir (optional)', + type=str, + required=False, + ) + + parser.add_argument( + '-tdo', + '--tmp-dir-override', + help='Use this dir as the temporary workspace (optional)', + type=str, + required=False, + ) + parser.add_argument( 'archive_path', metavar='ARCHIVE_PATH', @@ -107,6 +126,13 @@ def register_arguments(parser): default=True, ) + parser.add_argument( + '--gzip-layers', + help='Gzip all layers (pre-processing) before pushing', + type=bool, + default=False, + ) + parser.add_argument( '--replace-tags-match', help='A regex string to match on tags. 
If matched, will be replaced with --replace-tags-target', From 2d369da7470c0f5938b7fc20611aa78890f21bd1 Mon Sep 17 00:00:00 2001 From: omesser Date: Thu, 4 Mar 2021 06:58:12 +0200 Subject: [PATCH 36/49] M --- core/image_manifest_creator.py | 9 +++++---- core/processor.py | 3 +-- core/registry.py | 28 ++++++++++++++++++---------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/core/image_manifest_creator.py b/core/image_manifest_creator.py index c106089..f5e1531 100644 --- a/core/image_manifest_creator.py +++ b/core/image_manifest_creator.py @@ -5,9 +5,10 @@ class ImageManifestCreator(object): - def __init__(self, config_path, layers_info): + def __init__(self, config_path, layers_info, config_info): self._config_path = config_path self._layers_info = layers_info + self._config_info = config_info def create(self): manifest = dict() @@ -15,12 +16,12 @@ def create(self): manifest["mediaType"] = "application/vnd.docker.distribution.manifest.v2+json" manifest["config"] = { "mediaType": "application/vnd.docker.container.image.v1+json", - "size": os.path.getsize(self._config_path), - "digest": utils.helpers.get_digest(self._config_path), + "size": self._config_info['size'], + "digest": self._config_info['digest'], } manifest["layers"] = [] for layer_info in self._layers_info: - if layer_info['ext'].endswith('gz'): + if layer_info['ext'].endswith('gz') or layer_info['ext'].endswith('gzip'): media_type = "application/vnd.docker.image.rootfs.diff.tar.gzip" else: media_type = "application/vnd.docker.image.rootfs.diff.tar" diff --git a/core/processor.py b/core/processor.py index b9db9fc..533fda5 100644 --- a/core/processor.py +++ b/core/processor.py @@ -108,8 +108,7 @@ def process(self): for res in results: res.get() finally: - # shutil.rmtree(tmp_dir_name) - pass + shutil.rmtree(tmp_dir_name) elapsed = time.time() - start_time self._logger.info( diff --git a/core/registry.py b/core/registry.py index 149569d..eab9d2e 100644 --- a/core/registry.py +++ b/core/registry.py @@ -118,18 +118,18 @@ def process_image(self, tmp_dir_name, image_config): config_parsed=config_parsed, ) push_url = self._initialize_push(image) - self._push_config(config_path, push_url) - + digest, size = self._push_config(config_path, push_url) + config_info = {'digest': digest, 'size': size} # Now we need to create and push a manifest for the image creator = image_manifest_creator.ImageManifestCreator( - config_path, manifest_layer_info + config_path, manifest_layer_info, config_info ) image_manifest = creator.create() # Override tags if needed: from --replace-tags-match and --replace-tags-target tag = self._replace_tag(image, tag) - self._logger.info('Pushing image tag manifest', image=image, tag=tag) + self._logger.info('Pushing image tag manifest', image=image, tag=tag, image_manifest=image_manifest) self._push_manifest(image_manifest, image, tag) repo_tag_elapsed = time.time() - repo_tag_start_time self._logger.info( @@ -257,11 +257,12 @@ def _push_layer(self, layer_path, upload_url): return self._chunked_upload(layer_path, upload_url) def _push_config(self, config_path, upload_url): - self._chunked_upload(config_path, upload_url) + return self._chunked_upload(config_path, upload_url) def _chunked_upload(self, filepath, initial_url): content_path = os.path.abspath(filepath) total_size = os.stat(content_path).st_size + response_digest = None total_pushed_size = 0 length_read = 0 @@ -319,6 +320,8 @@ def _chunked_upload(self, filepath, initial_url): status_code=response.status_code, content=response.content, ) + + 
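For context on the completion handling here: per the registry's V2 API, a blob upload is finalized with a PUT to the upload URL carrying the digest as a query parameter, and on success (201) the registry returns the canonical digest in the Docker-Content-Digest response header, which the added line that follows captures. A minimal sketch of that final request, assuming upload_url, digest, basicauth and ssl_verify as in the surrounding code:

    import requests

    # finalize the upload; the registry validates the digest server-side
    response = requests.put(
        f'{upload_url}&digest={digest}',
        headers={'Content-Length': '0'},
        auth=basicauth,
        verify=ssl_verify,
    )
    assert response.status_code == 201
    canonical_digest = response.headers['Docker-Content-Digest']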
response_digest = response.headers["Docker-Content-Digest"] else: response = requests.patch( upload_url, data=chunk, headers=headers, auth=self._basicauth, verify=self._ssl_verify, ) @@ -359,15 +362,20 @@ def _chunked_upload(self, filepath, initial_url): total_size=total_size, filepath=filepath, ) + if response_digest != digest: + self._logger.log_and_raise( + 'error', + 'Server-side digest different from client digest', + response_digest=response_digest, + digest=digest, + filepath=filepath, + ) self._stream_print("") - return digest, total_pushed_size + return response_digest, total_pushed_size @staticmethod - def _read_in_chunks(file_object, chunk_size=65536): - """ - Chunk size default 2T - """ + def _read_in_chunks(file_object, chunk_size=256 * (1024 ** 2)): while True: data = file_object.read(chunk_size) if not data: From 86564dde183e0435e0f573d028f0fb26de8c0488 Mon Sep 17 00:00:00 2001 From: omesser Date: Thu, 4 Mar 2021 07:59:29 +0200 Subject: [PATCH 37/49] Formatting --- core/processor.py | 1 + core/registry.py | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/processor.py b/core/processor.py index 533fda5..3e82cb2 100644 --- a/core/processor.py +++ b/core/processor.py @@ -109,6 +109,7 @@ def process(self): res.get() finally: shutil.rmtree(tmp_dir_name) + self._logger.verbose('Removed workdir', tmp_dir_name=tmp_dir_name) elapsed = time.time() - start_time self._logger.info( diff --git a/core/registry.py b/core/registry.py index eab9d2e..4dd8dfb 100644 --- a/core/registry.py +++ b/core/registry.py @@ -129,7 +129,12 @@ def process_image(self, tmp_dir_name, image_config): # Override tags if needed: from --replace-tags-match and --replace-tags-target tag = self._replace_tag(image, tag) - self._logger.info('Pushing image tag manifest', image=image, tag=tag, image_manifest=image_manifest) + self._logger.info( + 'Pushing image tag manifest', + image=image, + tag=tag, + image_manifest=image_manifest, + ) self._push_manifest(image_manifest, image, tag) repo_tag_elapsed = time.time() - repo_tag_start_time self._logger.info( From 9b1aa087eabcc0a4b638b767d736de7c7c4fbde2 Mon Sep 17 00:00:00 2001 From: omesser Date: Thu, 4 Mar 2021 20:22:06 +0200 Subject: [PATCH 38/49] Leave the discrepancy --- core/image_manifest_creator.py | 4 ++- core/processor.py | 49 ++++++++++++++++------------------ core/registry.py | 30 ++++++++++----------- dockerregistrypusher.py | 2 +- 4 files changed, 42 insertions(+), 43 deletions(-) diff --git a/core/image_manifest_creator.py b/core/image_manifest_creator.py index f5e1531..95bdbd4 100644 --- a/core/image_manifest_creator.py +++ b/core/image_manifest_creator.py @@ -5,7 +5,9 @@ class ImageManifestCreator(object): - def __init__(self, config_path, layers_info, config_info): + def __init__(self, name, tag, config_path, layers_info, config_info): + self._name = name + self._tag = tag self._config_path = config_path self._layers_info = layers_info self._config_info = config_info diff --git a/core/processor.py b/core/processor.py index 3e82cb2..790c35d 100644 --- a/core/processor.py +++ b/core/processor.py @@ -123,9 +123,10 @@ 
def _pre_process_contents(self, root_dir): elapsed=humanfriendly.format_timespan(elapsed), ) - def _compress_layers(self, root_dir): + def _compress_layers(self, root_dir, gzip_ext): self._logger.debug('Compressing all layer files (pre-processing)') + for tar_files in pathlib.Path(root_dir).rglob('*.tar'): file_path = str(tar_files.absolute()) - gzipped_file_path = str(file_path) + '.gz' + gzipped_file_path = str(file_path) + gzip_ext # safety - if .tar.gz is in place, skip # compression and ignore the original @@ -149,11 +151,9 @@ def _compress_layers(self, root_dir): ) continue - # use -f to avoid "Too many levels of symbolic links" failures try: - # use -f to avoid "Too many levels of symbolic links" failures - # inplace .tar ->.tar.gz + # .tar ->.tar.gzip self._logger.info('Compressing layer file', file_path=file_path) with open(file_path, 'rb') as f_in, gzip.open( @@ -183,7 +183,7 @@ def _compress_layers(self, root_dir): self._logger.debug('Finished compressing all layer files') - def _correct_symlinks(self, root_dir): + def _correct_symlinks(self, root_dir, gzip_ext): self._logger.debug('Updating symlinks') # fix symlinks @@ -203,7 +203,7 @@ def _correct_symlinks(self, root_dir): ) # try and fix - point to tar.gz - new_target_path = target_path + b'.gz' + new_target_path = target_path + bytes(gzip_ext) if os.path.exists(new_target_path): tmp_link_path = f'{target_path}_tmplink' os.symlink(new_target_path, tmp_link_path) @@ -223,7 +223,7 @@ def _correct_symlinks(self, root_dir): ) self._logger.debug('Finished updating symlinks') - def _update_manifests(self, root_dir): + def _update_manifests(self, root_dir, gzip_ext): self._logger.debug('Correcting image manifests') manifest = self._get_manifest(root_dir) @@ -237,22 +237,18 @@ def _update_manifests(self, root_dir): for idx, layer in enumerate(manifest_image_section["Layers"]): if layer.endswith('.tar'): - gzipped_layer_file_path = layer + '.gz' + gzipped_layer_file_path = layer + gzip_ext manifest_image_section["Layers"][idx] = gzipped_layer_file_path - image_config["rootfs"]['diff_ids'][idx] = utils.helpers.get_digest( - os.path.join(root_dir, gzipped_layer_file_path) - ) - self._logger.debug( - 'XXX - layer has sha256', - gzipped_layer_file_path=gzipped_layer_file_path, - digest=image_config["rootfs"]['diff_ids'][idx], - ) + # image_config["rootfs"]['diff_ids'][idx] = utils.helpers.get_digest( + # os.path.join(root_dir, gzipped_layer_file_path) + # ) + self._logger.debug( '', config_path=config_path, image_config=image_config, ) - utils.helpers.dump_json_file(config_path, image_config) + # utils.helpers.dump_json_file(config_path, image_config) # write modified image config self._write_manifest(root_dir, manifest) @@ -283,11 +279,12 @@ def _verify_configs_integrity(self, root_dir): 'digest_from_image_config': digest_from_image_config, 'layer_idx': layer_idx, } - self._logger.debug('Digests comparison passed', **log_kwargs) - if digest_from_image_config != digest_from_image_config: - self._logger.log_and_raise( - 'error', 'Failed layer digest validation', **log_kwargs - ) + if digest_from_image_config == digest_from_image_config: + self._logger.debug('Digests comparison passed', **log_kwargs) + # else: + # self._logger.log_and_raise( + # 'error', 'Failed layer digest validation', **log_kwargs + # ) self._logger.debug('Finished config/manifest verification', manifest=manifest) diff --git a/core/registry.py b/core/registry.py index 4dd8dfb..00dc4c2 100644 --- a/core/registry.py +++ b/core/registry.py @@ -75,8 +75,8 @@ def 
process_image(self, tmp_dir_name, image_config): Processing a single image entry from extracted files - pushing to registry """ repo_tags = image_config["RepoTags"] - config_filename = image_config["Config"] - config_path = os.path.join(tmp_dir_name, config_filename) + config_path_relative = image_config["Config"] + config_path = os.path.join(tmp_dir_name, config_path_relative) self._logger.info('Processing image', repo_tags=repo_tags) image_start_time = time.time() @@ -114,21 +114,21 @@ def process_image(self, tmp_dir_name, image_config): self._logger.info( 'Pushing image config', image=image, - config_loc=config_filename, + config_path=config_path, config_parsed=config_parsed, ) push_url = self._initialize_push(image) digest, size = self._push_config(config_path, push_url) config_info = {'digest': digest, 'size': size} # Now we need to create and push a manifest for the image - creator = image_manifest_creator.ImageManifestCreator( - config_path, manifest_layer_info, config_info - ) - image_manifest = creator.create() # Override tags if needed: from --replace-tags-match and --replace-tags-target tag = self._replace_tag(image, tag) + image_manifest = image_manifest_creator.ImageManifestCreator( + image, tag, config_path, manifest_layer_info, config_info + ).create() + self._logger.info( 'Pushing image tag manifest', image=image, @@ -264,8 +264,8 @@ def _push_layer(self, layer_path, upload_url): def _push_config(self, config_path, upload_url): return self._chunked_upload(config_path, upload_url) - def _chunked_upload(self, filepath, initial_url): - content_path = os.path.abspath(filepath) + def _chunked_upload(self, file_path, initial_url): + content_path = os.path.abspath(file_path) total_size = os.stat(content_path).st_size response_digest = None @@ -274,7 +274,7 @@ def _chunked_upload(self, filepath, initial_url): digest = None self._logger.debug( 'Pushing chunked content', - filepath=filepath, + file_path=file_path, initial_url=initial_url, total_size=humanfriendly.format_size(total_size, binary=True), ) @@ -321,7 +321,7 @@ def _chunked_upload(self, filepath, initial_url): 'error', 'Failed to complete upload', digest=digest, - filepath=filepath, + filepath=file_path, status_code=response.status_code, content=response.content, ) @@ -340,7 +340,7 @@ def _chunked_upload(self, filepath, initial_url): self._logger.log_and_raise( 'error', 'Failed to upload chunk', - filepath=filepath, + filepath=file_path, status_code=response.status_code, content=response.content, ) @@ -354,7 +354,7 @@ def _chunked_upload(self, filepath, initial_url): self._logger.log_and_raise( 'error', 'Failed to upload file', - filepath=filepath, + filepath=file_path, exc=exc, ) @@ -365,7 +365,7 @@ def _chunked_upload(self, filepath, initial_url): 'File size and pushed size differ, inconsistency detected', total_pushed_size=total_pushed_size, total_size=total_size, - filepath=filepath, + filepath=file_path, ) if response_digest != digest: self._logger.log_and_raise( @@ -373,7 +373,7 @@ def _chunked_upload(self, filepath, initial_url): 'Server-side digest different from client digest', response_digest=response_digest, digest=digest, - filepath=filepath, + filepath=file_path, ) self._stream_print("") diff --git a/dockerregistrypusher.py b/dockerregistrypusher.py index d5bac0b..6e353b3 100644 --- a/dockerregistrypusher.py +++ b/dockerregistrypusher.py @@ -129,7 +129,7 @@ def register_arguments(parser): parser.add_argument( '--gzip-layers', help='Gzip all layers (pre-processing) before pushing', - type=bool, + 
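The type change just below touches a known argparse pitfall: with type=bool the parser calls bool() on the raw string, and since any non-empty string is truthy, --gzip-layers false still evaluates to True; action='store_true' is the idiomatic boolean flag. A minimal standalone illustration (flag names are hypothetical):

    import argparse

    parser = argparse.ArgumentParser()
    # pitfall: bool('false') == True, so any provided value enables this
    parser.add_argument('--gzip-layers-bool', type=bool, default=False)
    # idiomatic flag: present -> True, absent -> default
    parser.add_argument('--gzip-layers', action='store_true', default=False)

    args = parser.parse_args(['--gzip-layers-bool', 'false', '--gzip-layers'])
    print(args.gzip_layers_bool, args.gzip_layers)  # True True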
action='store_true', default=False, ) From e9eec6c84f7274ad7038f758e362417f1e16cfac Mon Sep 17 00:00:00 2001 From: omesser Date: Thu, 4 Mar 2021 23:22:39 +0200 Subject: [PATCH 39/49] symlinks first --- core/processor.py | 76 +++++++++++++++++++---------------------- dockerregistrypusher.py | 4 +-- 2 files changed, 37 insertions(+), 43 deletions(-) diff --git a/core/processor.py b/core/processor.py index 790c35d..9e6780d 100644 --- a/core/processor.py +++ b/core/processor.py @@ -124,8 +124,8 @@ def _pre_process_contents(self, root_dir): # for Kaniko compatibility - must be real tar.gzip and not just tar gzip_ext = '.gz' - self._compress_layers(root_dir, gzip_ext) self._correct_symlinks(root_dir, gzip_ext) + self._compress_layers(root_dir, gzip_ext) self._update_manifests(root_dir, gzip_ext) elapsed = time.time() - start_time @@ -134,6 +134,40 @@ def _pre_process_contents(self, root_dir): elapsed=humanfriendly.format_timespan(elapsed), ) + def _correct_symlinks(self, root_dir, gzip_ext): + self._logger.debug('Updating symlinks to compressed layers') + + # move layer symlinks to gz files, even if they are not there + for root, dirs, files in os.walk(root_dir): + for filename in files: + path = os.path.join(root, filename) + + # If it's not a symlink we're not interested. + if not os.path.islink(path): + continue + + target_path = os.readlink(path) + + if target_path.endswith(b'layer.tar'): + self._logger.debug( + 'Found link to tar layer, pointing to compressed', + target_path=target_path, + path=path, + ) + + # try and fix - point to tar.gz + new_target_path = target_path + bytes(gzip_ext) + tmp_link_path = f'{target_path}_tmplink' + os.symlink(new_target_path, tmp_link_path) + os.unlink(path) + os.rename(tmp_link_path, path) + self._logger.debug( + 'Moved layer link to compressed target', + new_target_path=new_target_path, + path=path, + ) + self._logger.debug('Finished updating symlinks') + def _compress_layers(self, root_dir, gzip_ext): self._logger.debug('Compressing all layer files (pre-processing)') @@ -183,46 +217,6 @@ def _compress_layers(self, root_dir, gzip_ext): self._logger.debug('Finished compressing all layer files') - def _correct_symlinks(self, root_dir, gzip_ext): - self._logger.debug('Updating symlinks') - - # fix symlinks - for root, dirs, files in os.walk(root_dir): - for filename in files: - path = os.path.join(root, filename) - - # If it's not a symlink we're not interested. 
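The symlink walk in this patch retargets layer links from layer.tar to layer.tar.gz before the gzipped files exist; os.symlink happily creates such dangling links, which is why a later patch has to compress symlinked layers in a separate second pass. A minimal sketch of the retargeting step, using os.replace so the swap is atomic on POSIX; note the temporary link must be created under the link's own name, not the target's (the very mistake the 'symlink bug' patch further down corrects):

    import os

    def retarget(link_path, new_target):
        # build the new link under a temp name next to the old link,
        # then atomically move it over the existing one
        tmp_link = f'{link_path}_tmplink'
        os.symlink(new_target, tmp_link)
        os.replace(tmp_link, link_path)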
- if not os.path.islink(path): - continue - - target_path = os.readlink(path) - - if not os.path.exists(target_path): - self._logger.debug( - 'Found broken link', target_path=target_path, path=path - ) - - # try and fix - point to tar.gz - new_target_path = target_path + bytes(gzip_ext) - if os.path.exists(new_target_path): - tmp_link_path = f'{target_path}_tmplink' - os.symlink(new_target_path, tmp_link_path) - os.unlink(path) - os.rename(tmp_link_path, path) - self._logger.debug( - 'Fixed broken link', - new_target_path=new_target_path, - path=path, - ) - else: - self._logger.log_and_raise( - 'error', - 'Cannot fix broken link', - target_path=target_path, - path=path, - ) - self._logger.debug('Finished updating symlinks') - def _update_manifests(self, root_dir, gzip_ext): self._logger.debug('Correcting image manifests') manifest = self._get_manifest(root_dir) diff --git a/dockerregistrypusher.py b/dockerregistrypusher.py index 6e353b3..d255929 100644 --- a/dockerregistrypusher.py +++ b/dockerregistrypusher.py @@ -129,8 +129,8 @@ def register_arguments(parser): parser.add_argument( '--gzip-layers', help='Gzip all layers (pre-processing) before pushing', - action='store_true', - default=False, + type=bool, + default=True, ) parser.add_argument( From ad4e7c2de2828f93330e71917d41b11e2bd71396 Mon Sep 17 00:00:00 2001 From: omesser Date: Fri, 5 Mar 2021 00:04:06 +0200 Subject: [PATCH 40/49] strings --- core/processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/processor.py b/core/processor.py index 9e6780d..f89587e 100644 --- a/core/processor.py +++ b/core/processor.py @@ -148,7 +148,7 @@ def _correct_symlinks(self, root_dir, gzip_ext): target_path = os.readlink(path) - if target_path.endswith(b'layer.tar'): + if str(target_path).endswith('layer.tar'): self._logger.debug( 'Found link to tar layer, pointing to compressed', target_path=target_path, From 6b5bf5e6b2a06f92e033ffa429033c46912a97e1 Mon Sep 17 00:00:00 2001 From: omesser Date: Fri, 5 Mar 2021 01:15:16 +0200 Subject: [PATCH 41/49] to strings --- core/processor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/processor.py b/core/processor.py index f89587e..275238b 100644 --- a/core/processor.py +++ b/core/processor.py @@ -146,9 +146,9 @@ def _correct_symlinks(self, root_dir, gzip_ext): if not os.path.islink(path): continue - target_path = os.readlink(path) + target_path = str(os.readlink(path)) - if str(target_path).endswith('layer.tar'): + if target_path.endswith('layer.tar'): self._logger.debug( 'Found link to tar layer, pointing to compressed', target_path=target_path, @@ -156,7 +156,7 @@ def _correct_symlinks(self, root_dir, gzip_ext): ) # try and fix - point to tar.gz - new_target_path = target_path + bytes(gzip_ext) + new_target_path = target_path + gzip_ext tmp_link_path = f'{target_path}_tmplink' os.symlink(new_target_path, tmp_link_path) os.unlink(path) From 7f6cecdd99c1896320901a7abc3f86a629d01c97 Mon Sep 17 00:00:00 2001 From: omesser Date: Fri, 5 Mar 2021 01:25:58 +0200 Subject: [PATCH 42/49] symlink bug --- core/processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/processor.py b/core/processor.py index 275238b..3a3db3d 100644 --- a/core/processor.py +++ b/core/processor.py @@ -157,7 +157,7 @@ def _correct_symlinks(self, root_dir, gzip_ext): # try and fix - point to tar.gz new_target_path = target_path + gzip_ext - tmp_link_path = f'{target_path}_tmplink' + tmp_link_path = f'{path}_tmplink' os.symlink(new_target_path, tmp_link_path) 
os.unlink(path) os.rename(tmp_link_path, path) From b61a0aa8a23e08aa87c7f5d94749bd654396ac66 Mon Sep 17 00:00:00 2001 From: omesser Date: Fri, 5 Mar 2021 12:04:53 +0200 Subject: [PATCH 43/49] M --- core/processor.py | 83 ++++++++++++++++++++++++++++++----------------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/core/processor.py b/core/processor.py index 3a3db3d..3ab664a 100644 --- a/core/processor.py +++ b/core/processor.py @@ -171,9 +171,24 @@ def _correct_symlinks(self, root_dir, gzip_ext): def _compress_layers(self, root_dir, gzip_ext): self._logger.debug('Compressing all layer files (pre-processing)') - for tar_files in pathlib.Path(root_dir).rglob('*.tar'): - file_path = str(tar_files.absolute()) - gzipped_file_path = str(file_path) + gzip_ext + # we do this in 2 passes: some layers are symlinks that we re-pointed to tar.gz earlier, and they dangle + # until their targets exist. Gzip all non-linked layers first, then do another pass and gzip the + # symlinked ones. + self._logger.debug('Compressing non symlink layers') + for element in pathlib.Path(root_dir).rglob('*.tar'): + file_path = str(element.absolute()) + self._compress_layer(file_path, gzip_ext, compress_symlinks=False) + + self._logger.debug('Compressing symlink layers') + for element in pathlib.Path(root_dir).rglob('*.tar'): + file_path = str(element.absolute()) + self._compress_layer(file_path, gzip_ext, compress_symlinks=True) + + self._logger.debug('Finished compressing all layer files') + + def _compress_layer(self, file_path, gzip_ext, compress_symlinks): + try: + gzipped_file_path = file_path + gzip_ext # safety - if .tar.gz is in place, skip # compression and ignore the original @@ -183,39 +198,47 @@ def _compress_layers(self, root_dir): file_path=file_path, gzipped_path=gzipped_file_path, ) - continue + return - try: + if os.path.islink(file_path) != compress_symlinks: + if compress_symlinks: + log_message = ( + 'Layer file is not a symlink (symlinks only requested) - skipping' + ) + else: + log_message = ( + 'Layer file is a symlink (non-symlinks requested) - skipping' + ) + self._logger.debug(log_message, file_path=file_path) + return - # .tar ->.tar.gzip - self._logger.info('Compressing layer file', file_path=file_path) + # .tar ->.tar.gzip + self._logger.info('Compressing layer file', file_path=file_path) - with open(file_path, 'rb') as f_in, gzip.open( - gzipped_file_path, 'wb' - ) as f_out: - f_out.writelines(f_in) + with open(file_path, 'rb') as f_in, gzip.open( + gzipped_file_path, 'wb' + ) as f_out: + f_out.writelines(f_in) - os.remove(file_path) - self._logger.debug( - 'Successfully gzipped layer', - gzipped_file_path=gzipped_file_path, - file_path=file_path, - ) + os.remove(file_path) + self._logger.debug( + 'Successfully gzipped layer', + gzipped_file_path=gzipped_file_path, + file_path=file_path, + ) except Exception as exc: # print debugging info layer_dir = pathlib.Path(file_path).parents[0] files = layer_dir.glob('**/*') self._logger.debug( 'Listed elements in layer dir', - files=files, + files=[f for f in files], layer_dir=layer_dir, exc=exc, ) raise def _update_manifests(self, root_dir, gzip_ext): self._logger.debug('Correcting 
image manifests') From 86ca822440d04a47a24ad566a242252b7f44b87c Mon Sep 17 00:00:00 2001 From: omesser Date: Fri, 5 Mar 2021 21:05:22 +0200 Subject: [PATCH 44/49] remove verification and parallelize gzip --- core/processor.py | 93 +++++++++++++++++++++++------------------------ core/registry.py | 3 +- 2 files changed, 46 insertions(+), 50 deletions(-) diff --git a/core/processor.py b/core/processor.py index 3ab664a..618462a 100644 --- a/core/processor.py +++ b/core/processor.py @@ -84,10 +84,8 @@ def process(self): self._extractor.extract_all(tmp_dir_name) # pre-process layers in place - for kaniko - if self._gzip_layers: - self._pre_process_contents(tmp_dir_name) + self._pre_process_contents(tmp_dir_name) - self._verify_configs_integrity(tmp_dir_name) manifest = self._get_manifest(tmp_dir_name) self._logger.debug('Extracted archive manifest', manifest=manifest) @@ -120,6 +118,9 @@ def process(self): def _pre_process_contents(self, root_dir): start_time = time.time() + if not self._gzip_layers: + return + self._logger.debug('Preprocessing extracted contents') # for Kaniko compatibility - must be real tar.gzip and not just tar @@ -130,7 +131,7 @@ def _pre_process_contents(self, root_dir): elapsed = time.time() - start_time self._logger.info( - 'Finished compressing all layer files (pre-processing)', + 'Finished preprocessing archive contents', elapsed=humanfriendly.format_timespan(elapsed), ) @@ -169,20 +170,50 @@ def _correct_symlinks(self, root_dir, gzip_ext): self._logger.debug('Finished updating symlinks') def _compress_layers(self, root_dir, gzip_ext): - self._logger.debug('Compressing all layer files (pre-processing)') + """ + we do this in 2 passes: some layers are symlinks that we re-pointed to tar.gz earlier, and they dangle + until their targets exist. Gzip all non-linked layers first, then do another pass and gzip the + symlinked ones. + """ + self._logger.debug( + 'Compressing all layer files (pre-processing)', processes=self._parallel ) + results = [] - # we do this in 2 passes: some layers are symlinks that we re-pointed to tar.gz earlier, and they dangle - # until their targets exist. Gzip all non-linked layers first, then do another pass and gzip the - # symlinked ones. 
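The rewritten _compress_layers below fans the per-layer gzip work out over a thread pool; threads suffice because CPython's zlib machinery releases the GIL while compressing, and calling get() on each AsyncResult after join() re-raises any exception a worker hit. A minimal sketch of the pattern, with compress_layer standing in for self._compress_layer and root_dir assumed:

    import multiprocessing.pool
    import pathlib

    results = []
    with multiprocessing.pool.ThreadPool(processes=4) as pool:
        for element in pathlib.Path(root_dir).rglob('*.tar'):
            results.append(
                pool.apply_async(compress_layer, (str(element.absolute()), '.gz', False))
            )
        pool.close()
        pool.join()

    # surfaces worker exceptions, if any
    for res in results:
        res.get()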
self._logger.debug('Compressing non symlink layers') - for element in pathlib.Path(root_dir).rglob('*.tar'): - file_path = str(element.absolute()) - self._compress_layer(file_path, gzip_ext, compress_symlinks=False) + with multiprocessing.pool.ThreadPool(processes=self._parallel) as pool: + for element in pathlib.Path(root_dir).rglob('*.tar'): + file_path = str(element.absolute()) + res = pool.apply_async( + self._compress_layer, + (file_path, gzip_ext, False), + ) + results.append(res) + pool.close() + pool.join() + + # this will throw if any pool worker caught an exception + for res in results: + res.get() + + results = [] self._logger.debug('Compressing symlink layers') - for element in pathlib.Path(root_dir).rglob('*.tar'): - file_path = str(element.absolute()) - self._compress_layer(file_path, gzip_ext, compress_symlinks=True) + with multiprocessing.pool.ThreadPool(processes=self._parallel) as pool: + for element in pathlib.Path(root_dir).rglob('*.tar'): + file_path = str(element.absolute()) + res = pool.apply_async( + self._compress_layer, + (file_path, gzip_ext, True), + ) + results.append(res) + + pool.close() + pool.join() + + # this will throw if any pool worker caught an exception + for res in results: + res.get() self._logger.debug('Finished compressing all layer files') @@ -271,40 +302,6 @@ def _update_manifests(self, root_dir, gzip_ext): self._write_manifest(root_dir, manifest) self._logger.debug('Corrected image manifests', manifest=manifest) - def _verify_configs_integrity(self, root_dir): - self._logger.debug('Verifying configurations consistency') - manifest = self._get_manifest(root_dir) - - # check for layer mismatches - for manifest_image_section in manifest: - config_filename = manifest_image_section["Config"] - config_path = os.path.join(root_dir, config_filename) - image_config = utils.helpers.load_json_file(config_path) - - # warning - spammy - self._logger.debug('Parsed image config', image_config=image_config) - - for layer_idx, layer in enumerate(manifest_image_section["Layers"]): - self._logger.debug( - 'Inspecting layer', image_config=config_path, layer_idx=layer_idx - ) - layer_path = os.path.join(root_dir, layer) - digest_from_manifest_path = utils.helpers.get_digest(layer_path) - digest_from_image_config = image_config['rootfs']['diff_ids'][layer_idx] - log_kwargs = { - 'digest_from_manifest_path': digest_from_manifest_path, - 'digest_from_image_config': digest_from_image_config, - 'layer_idx': layer_idx, - } - if digest_from_image_config == digest_from_image_config: - self._logger.debug('Digests comparison passed', **log_kwargs) - # else: - # self._logger.log_and_raise( - # 'error', 'Failed layer digest validation', **log_kwargs - # ) - - self._logger.debug('Finished config/manifest verification', manifest=manifest) - @staticmethod def _get_manifest(archive_dir): with open(os.path.join(archive_dir, 'manifest.json'), 'r') as fh: diff --git a/core/registry.py b/core/registry.py index 00dc4c2..80836ec 100644 --- a/core/registry.py +++ b/core/registry.py @@ -115,7 +115,6 @@ def process_image(self, tmp_dir_name, image_config): 'Pushing image config', image=image, config_path=config_path, - config_parsed=config_parsed, ) push_url = self._initialize_push(image) digest, size = self._push_config(config_path, push_url) @@ -129,7 +128,7 @@ def process_image(self, tmp_dir_name, image_config): image, tag, config_path, manifest_layer_info, config_info ).create() - self._logger.info( + self._logger.debug( 'Pushing image tag manifest', image=image, tag=tag, From 
d380023e0a175b1df3fcb754ff085bb7de7899ab Mon Sep 17 00:00:00 2001 From: omesser Date: Sat, 6 Mar 2021 23:33:00 +0200 Subject: [PATCH 45/49] M --- core/gzip_stream.py | 80 +++++++++++++++++++++++++++++++++ core/image_manifest_creator.py | 5 +-- core/processor.py | 2 +- core/registry.py | 81 ++++++++++++++++++++++++++++------ 4 files changed, 149 insertions(+), 19 deletions(-) create mode 100644 core/gzip_stream.py diff --git a/core/gzip_stream.py b/core/gzip_stream.py new file mode 100644 index 0000000..eb91249 --- /dev/null +++ b/core/gzip_stream.py @@ -0,0 +1,80 @@ +import gzip +import io +from typing import BinaryIO + + +class GZIPCompressedStream(io.RawIOBase): + def __init__(self, stream: BinaryIO, *, compression_level: int): + assert 1 <= compression_level <= 9 + + self._compression_level = compression_level + self._stream = stream + + self._compressed_stream = io.BytesIO() + self._compressor = gzip.GzipFile( + mode='wb', fileobj=self._compressed_stream, compresslevel=compression_level + ) + + # because of the GZIP header written by `GzipFile.__init__`: + self._compressed_stream.seek(0) + + @property + def compression_level(self) -> int: + return self._compression_level + + @property + def stream(self) -> BinaryIO: + return self._stream + + def readable(self) -> bool: + return True + + def _read_compressed_into(self, b: memoryview) -> int: + buf = self._compressed_stream.read(len(b)) + b[: len(buf)] = buf + return len(buf) + + def readinto(self, b: bytearray) -> int: + b = memoryview(b) + + offset = 0 + size = len(b) + while offset < size: + offset += self._read_compressed_into(b[offset:]) + if offset < size: + # self._compressed_buffer now empty + if self._compressor.closed: + # nothing to compress anymore + break + # compress next bytes + self._read_n_compress(size) + + return offset + + def _read_n_compress(self, size: int): + assert size > 0 + + data = self._stream.read(size) + + # rewind buffer to the start to free up memory + # (because anything currently in the buffer should be already + # streamed off the object) + self._compressed_stream.seek(0) + self._compressed_stream.truncate(0) + + if data: + self._compressor.write(data) + else: + # this will write final data (will flush zlib with Z_FINISH) + self._compressor.close() + + # rewind to the buffer start + self._compressed_stream.seek(0) + + def __repr__(self) -> str: + return ( + '{self.__class__.__name__}(' + '{self.stream!r}, ' + 'compression_level={self.compression_level!r}' + ')' + ).format(self=self) diff --git a/core/image_manifest_creator.py b/core/image_manifest_creator.py index 95bdbd4..ef218be 100644 --- a/core/image_manifest_creator.py +++ b/core/image_manifest_creator.py @@ -1,8 +1,5 @@ -import os import json -import utils.helpers - class ImageManifestCreator(object): def __init__(self, name, tag, config_path, layers_info, config_info): @@ -26,7 +23,7 @@ def create(self): if layer_info['ext'].endswith('gz') or layer_info['ext'].endswith('gzip'): media_type = "application/vnd.docker.image.rootfs.diff.tar.gzip" else: - media_type = "application/vnd.docker.image.rootfs.diff.tar" + media_type = "application/vnd.docker.image.rootfs.diff.tar.gzip" layer_data = { "mediaType": media_type, "size": layer_info['size'], diff --git a/core/processor.py b/core/processor.py index 618462a..1af5d35 100644 --- a/core/processor.py +++ b/core/processor.py @@ -84,7 +84,7 @@ def process(self): self._extractor.extract_all(tmp_dir_name) # pre-process layers in place - for kaniko - self._pre_process_contents(tmp_dir_name) + # 
self._pre_process_contents(tmp_dir_name) manifest = self._get_manifest(tmp_dir_name) self._logger.debug('Extracted archive manifest', manifest=manifest) diff --git a/core/registry.py b/core/registry.py index 80836ec..034d71d 100644 --- a/core/registry.py +++ b/core/registry.py @@ -9,7 +9,7 @@ import requests import requests.auth -from . import image_manifest_creator +from . import image_manifest_creator, gzip_stream import utils.helpers @@ -116,6 +116,7 @@ def process_image(self, tmp_dir_name, image_config): image=image, config_path=config_path, ) + # self._change_config(config_path, manifest_layer_info) push_url = self._initialize_push(image) digest, size = self._push_config(config_path, push_url) config_info = {'digest': digest, 'size': size} @@ -150,6 +151,26 @@ def process_image(self, tmp_dir_name, image_config): elapsed=humanfriendly.format_timespan(image_elapsed), ) + def _change_config(self, config_path, layers_info): + contents = utils.helpers.load_json_file(config_path) + + # sanity + if len(contents['rootfs']['diff_ids']) != len(layers_info): + self._logger.log_and_raise( + 'error', + 'Mismatch in layer count for config', + orig_num=len(contents['rootfs']['diff_ids']), + layers_info_num=len(layers_info), + config_path=config_path, + ) + + for idx, layer_info in enumerate(layers_info): + contents['rootfs']['diff_ids'][idx] = layer_info['digest'] + utils.helpers.dump_json_file(config_path, contents) + self._logger.info( + 'Corrected image config digests', contents=contents, config_path=config_path + ) + def _stream_print(self, what, end=None): if self._stream: if end: @@ -258,12 +279,12 @@ def _initialize_push(self, repository): return upload_url def _push_layer(self, layer_path, upload_url): - return self._chunked_upload(layer_path, upload_url) + return self._chunked_upload(layer_path, upload_url, gzip=True) def _push_config(self, config_path, upload_url): return self._chunked_upload(config_path, upload_url) - def _chunked_upload(self, file_path, initial_url): + def _chunked_upload(self, file_path, initial_url, gzip=False): content_path = os.path.abspath(file_path) total_size = os.stat(content_path).st_size response_digest = None @@ -277,18 +298,23 @@ def _chunked_upload(self, file_path, initial_url): initial_url=initial_url, total_size=humanfriendly.format_size(total_size, binary=True), ) + with open(content_path, "rb") as f: index = 0 upload_url = initial_url headers = {} sha256hash = hashlib.sha256() - for chunk in self._read_in_chunks(f): + if gzip: + f_read = gzip_stream.GZIPCompressedStream(f, compression_level=9) + else: + f_read = f + for chunk in self._read_in_chunks(f_read): length_read += len(chunk) offset = index + len(chunk) - sha256hash.update(chunk) + headers['Content-Encoding'] = 'gzip' headers['Content-Type'] = 'application/octet-stream' headers['Content-Length'] = str(len(chunk)) headers['Content-Range'] = f'{index}-{offset}' @@ -307,6 +333,7 @@ def _chunked_upload(self, file_path, initial_url): # complete the upload if last: + self._logger.debug('Pushing last') digest = f'sha256:{str(sha256hash.hexdigest())}' response = requests.put( f"{upload_url}&digest={digest}", @@ -327,6 +354,7 @@ def _chunked_upload(self, file_path, initial_url): response_digest = response.headers["Docker-Content-Digest"] else: + self._logger.debug('Pushing') response = requests.patch( upload_url, data=chunk, @@ -357,16 +385,41 @@ def _chunked_upload(self, file_path, initial_url): exc=exc, ) - # sanity - if total_pushed_size != total_size: - self._logger.log_and_raise( - 'error', - 'File 
size and pushed size differ, inconsistency detected', - total_pushed_size=total_pushed_size, - total_size=total_size, - filepath=file_path, + if not last: + self._logger.debug('Pushing last (empty)') + headers = {} + headers['Content-Length'] = "0" + + # we compressed so don't know when the last was + digest = f'sha256:{str(sha256hash.hexdigest())}' + response = requests.put( + f"{upload_url}&digest={digest}", + headers=headers, + auth=self._basicauth, + verify=self._ssl_verify, ) - if response_digest != digest: + if response.status_code != 201: + self._logger.log_and_raise( + 'error', + 'Failed to complete upload retroactively', + digest=digest, + filepath=file_path, + status_code=response.status_code, + content=response.content, + ) + + response_digest = response.headers["Docker-Content-Digest"] + + # sanity + # if total_pushed_size != total_size: + # self._logger.log_and_raise( + # 'error', + # 'File size and pushed size differ, inconsistency detected', + # total_pushed_size=total_pushed_size, + # total_size=total_size, + # filepath=file_path, + # ) + if response_digest != digest or response_digest is None: self._logger.log_and_raise( 'error', 'Server-side digest different from client digest', From 566813f80a08a3d669a4fa53138b736a428f7b3a Mon Sep 17 00:00:00 2001 From: omesser Date: Sun, 7 Mar 2021 00:47:39 +0200 Subject: [PATCH 46/49] Cleaning up --- core/image_manifest_creator.py | 6 +- core/processor.py | 2 +- core/registry.py | 124 ++++++++++++--------------------- 3 files changed, 45 insertions(+), 87 deletions(-) diff --git a/core/image_manifest_creator.py b/core/image_manifest_creator.py index ef218be..e63a5fd 100644 --- a/core/image_manifest_creator.py +++ b/core/image_manifest_creator.py @@ -20,12 +20,8 @@ def create(self): } manifest["layers"] = [] for layer_info in self._layers_info: - if layer_info['ext'].endswith('gz') or layer_info['ext'].endswith('gzip'): - media_type = "application/vnd.docker.image.rootfs.diff.tar.gzip" - else: - media_type = "application/vnd.docker.image.rootfs.diff.tar.gzip" layer_data = { - "mediaType": media_type, + "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", "size": layer_info['size'], "digest": layer_info['digest'], } diff --git a/core/processor.py b/core/processor.py index 1af5d35..f085940 100644 --- a/core/processor.py +++ b/core/processor.py @@ -35,7 +35,6 @@ def __init__( self._parallel = parallel self._tmp_dir = tmp_dir self._tmp_dir_override = tmp_dir_override - self._gzip_layers = gzip_layers if parallel > 1 and stream: self._logger.info( @@ -46,6 +45,7 @@ def __init__( self._registry = registry.Registry( logger=self._logger, + gzip_layers=gzip_layers, registry_url=registry_url, stream=stream, login=login, diff --git a/core/registry.py b/core/registry.py index 034d71d..2028c1b 100644 --- a/core/registry.py +++ b/core/registry.py @@ -33,6 +33,7 @@ class Registry: def __init__( self, logger, + gzip_layers, registry_url, stream=False, login=None, @@ -42,6 +43,7 @@ def __init__( replace_tags_target=None, ): self._logger = logger.get_child('registry') + self._gzip_layers = gzip_layers # enrich http suffix if missing if urllib.parse.urlparse(registry_url).scheme not in ['http', 'https']: @@ -106,7 +108,6 @@ def process_image(self, tmp_dir_name, image_config): { 'digest': layer_digest, 'size': layer_size, - 'ext': os.path.splitext(layer)[1], } ) @@ -279,7 +280,7 @@ def _initialize_push(self, repository): return upload_url def _push_layer(self, layer_path, upload_url): - return self._chunked_upload(layer_path, upload_url, 
gzip=True) + return self._chunked_upload(layer_path, upload_url, gzip=self._gzip_layers) def _push_config(self, config_path, upload_url): return self._chunked_upload(config_path, upload_url) @@ -287,16 +288,15 @@ def _push_config(self, config_path, upload_url): def _chunked_upload(self, file_path, initial_url, gzip=False): content_path = os.path.abspath(file_path) total_size = os.stat(content_path).st_size - response_digest = None total_pushed_size = 0 length_read = 0 - digest = None self._logger.debug( 'Pushing chunked content', file_path=file_path, initial_url=initial_url, total_size=humanfriendly.format_size(total_size, binary=True), + gzip=gzip, ) with open(content_path, "rb") as f: @@ -307,6 +307,7 @@ def _chunked_upload(self, file_path, initial_url, gzip=False): if gzip: f_read = gzip_stream.GZIPCompressedStream(f, compression_level=9) + headers['Content-Encoding'] = 'gzip' else: f_read = f for chunk in self._read_in_chunks(f_read): @@ -314,15 +315,11 @@ def _chunked_upload(self, file_path, initial_url, gzip=False): offset = index + len(chunk) sha256hash.update(chunk) - headers['Content-Encoding'] = 'gzip' headers['Content-Type'] = 'application/octet-stream' headers['Content-Length'] = str(len(chunk)) headers['Content-Range'] = f'{index}-{offset}' index = offset - last = False - if length_read == total_size: - last = True try: self._stream_print( "Pushing... " @@ -331,49 +328,25 @@ def _chunked_upload(self, file_path, initial_url, gzip=False): end="\r", ) - # complete the upload - if last: - self._logger.debug('Pushing last') - digest = f'sha256:{str(sha256hash.hexdigest())}' - response = requests.put( - f"{upload_url}&digest={digest}", - data=chunk, - headers=headers, - auth=self._basicauth, - verify=self._ssl_verify, - ) - if response.status_code != 201: - self._logger.log_and_raise( - 'error', - 'Failed to complete upload', - digest=digest, - filepath=file_path, - status_code=response.status_code, - content=response.content, - ) - - response_digest = response.headers["Docker-Content-Digest"] - else: - self._logger.debug('Pushing') - response = requests.patch( - upload_url, - data=chunk, - headers=headers, - auth=self._basicauth, - verify=self._ssl_verify, - ) + response = requests.patch( + upload_url, + data=chunk, + headers=headers, + auth=self._basicauth, + verify=self._ssl_verify, + ) - if response.status_code != 202: - self._logger.log_and_raise( - 'error', - 'Failed to upload chunk', - filepath=file_path, - status_code=response.status_code, - content=response.content, - ) + if response.status_code != 202: + self._logger.log_and_raise( + 'error', + 'Failed to upload chunk', + filepath=file_path, + status_code=response.status_code, + content=response.content, + ) - if "Location" in response.headers: - upload_url = response.headers["Location"] + if "Location" in response.headers: + upload_url = response.headers["Location"] total_pushed_size += len(chunk) @@ -385,40 +358,29 @@ def _chunked_upload(self, file_path, initial_url, gzip=False): exc=exc, ) - if not last: - self._logger.debug('Pushing last (empty)') - headers = {} - headers['Content-Length'] = "0" - - # we compressed so don't know when the last was - digest = f'sha256:{str(sha256hash.hexdigest())}' - response = requests.put( - f"{upload_url}&digest={digest}", - headers=headers, - auth=self._basicauth, - verify=self._ssl_verify, + # request the finalizing put, no body + headers = {'Content-Length': "0"} + + # we compressed so don't know when the last was + digest = f'sha256:{str(sha256hash.hexdigest())}' + response = 
requests.put( + f"{upload_url}&digest={digest}", + headers=headers, + auth=self._basicauth, + verify=self._ssl_verify, + ) + if response.status_code != 201: + self._logger.log_and_raise( + 'error', + 'Failed to complete upload retroactively', + digest=digest, + filepath=file_path, + status_code=response.status_code, + content=response.content, ) - if response.status_code != 201: - self._logger.log_and_raise( - 'error', - 'Failed to complete upload retroactively', - digest=digest, - filepath=file_path, - status_code=response.status_code, - content=response.content, - ) - response_digest = response.headers["Docker-Content-Digest"] - - # sanity - # if total_pushed_size != total_size: - # self._logger.log_and_raise( - # 'error', - # 'File size and pushed size differ, inconsistency detected', - # total_pushed_size=total_pushed_size, - # total_size=total_size, - # filepath=file_path, - # ) + response_digest = response.headers["Docker-Content-Digest"] + if response_digest != digest or response_digest is None: self._logger.log_and_raise( 'error', From 0452de38a6fe3ac43a643114629cc694666b0018 Mon Sep 17 00:00:00 2001 From: omesser Date: Sun, 7 Mar 2021 03:40:14 +0200 Subject: [PATCH 47/49] Removing old logic --- core/processor.py | 189 --------------------------------- core/registry.py | 27 +---- {core => utils}/gzip_stream.py | 26 +++-- 3 files changed, 20 insertions(+), 222 deletions(-) rename {core => utils}/gzip_stream.py (87%) diff --git a/core/processor.py b/core/processor.py index f085940..cee44bf 100644 --- a/core/processor.py +++ b/core/processor.py @@ -83,9 +83,6 @@ def process(self): # extract the whole thing self._extractor.extract_all(tmp_dir_name) - # pre-process layers in place - for kaniko - # self._pre_process_contents(tmp_dir_name) - manifest = self._get_manifest(tmp_dir_name) self._logger.debug('Extracted archive manifest', manifest=manifest) @@ -116,192 +113,6 @@ def process(self): elapsed=humanfriendly.format_timespan(elapsed), ) - def _pre_process_contents(self, root_dir): - start_time = time.time() - if not self._gzip_layers: - return - - self._logger.debug('Preprocessing extracted contents') - - # for Kaniko compatibility - must be real tar.gzip and not just tar - gzip_ext = '.gz' - self._correct_symlinks(root_dir, gzip_ext) - self._compress_layers(root_dir, gzip_ext) - self._update_manifests(root_dir, gzip_ext) - - elapsed = time.time() - start_time - self._logger.info( - 'Finished preprocessing archive contents', - elapsed=humanfriendly.format_timespan(elapsed), - ) - - def _correct_symlinks(self, root_dir, gzip_ext): - self._logger.debug('Updating symlinks to compressed layers') - - # move layer symlinks to gz files, even if they are not there - for root, dirs, files in os.walk(root_dir): - for filename in files: - path = os.path.join(root, filename) - - # If it's not a symlink we're not interested. 
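#
# [annotation] The pre-processing pass deleted in this patch gzipped layer
# tarballs on disk (for kaniko compatibility - layers had to be real
# tar.gzip, not plain tar). Since patches 45-46 the registry client
# compresses each layer while it streams to the registry, so the on-disk
# pass became dead code. A minimal sketch of the streaming replacement,
# reusing the client's own helpers (upload_url and headers stand in for the
# surrounding upload state):
#
#   with open(layer_path, 'rb') as f:
#       stream = utils.gzip_stream.GZIPCompressedStream(f, compression_level=7)
#       for chunk in self._read_in_chunks(stream):
#           requests.patch(upload_url, data=chunk, headers=headers,
#                          auth=self._basicauth, verify=self._ssl_verify)
#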
- if not os.path.islink(path): - continue - - target_path = str(os.readlink(path)) - - if target_path.endswith('layer.tar'): - self._logger.debug( - 'Found link to tar layer, pointing to compressed', - target_path=target_path, - path=path, - ) - - # try and fix - point to tar.gz - new_target_path = target_path + gzip_ext - tmp_link_path = f'{path}_tmplink' - os.symlink(new_target_path, tmp_link_path) - os.unlink(path) - os.rename(tmp_link_path, path) - self._logger.debug( - 'Moved layer link to compressed target', - new_target_path=new_target_path, - path=path, - ) - self._logger.debug('Finished updating symlinks') - - def _compress_layers(self, root_dir, gzip_ext): - """ - we do this in 2 passes, because some layers are symlinked and we re-pointed them to tar.gz earlier, we must - skip them first, since they are broken symlinks. first gzip all non-linked layers, then do another pass and - gzip the symlinked ones. - """ - self._logger.debug( - 'Compressing all layer files (pre-processing)', processes=self._parallel - ) - results = [] - - self._logger.debug('Compressing non symlink layers') - with multiprocessing.pool.ThreadPool(processes=self._parallel) as pool: - for element in pathlib.Path(root_dir).rglob('*.tar'): - file_path = str(element.absolute()) - res = pool.apply_async( - self._compress_layer, - (file_path, gzip_ext, False), - ) - results.append(res) - - pool.close() - pool.join() - - # this will throw if any pool worker caught an exception - for res in results: - res.get() - - results = [] - self._logger.debug('Compressing symlink layers') - with multiprocessing.pool.ThreadPool(processes=self._parallel) as pool: - for element in pathlib.Path(root_dir).rglob('*.tar'): - file_path = str(element.absolute()) - res = pool.apply_async( - self._compress_layer, - (file_path, gzip_ext, True), - ) - results.append(res) - - pool.close() - pool.join() - - # this will throw if any pool worker caught an exception - for res in results: - res.get() - - self._logger.debug('Finished compressing all layer files') - - def _compress_layer(self, file_path, gzip_ext, compress_symlinks): - try: - gzipped_file_path = file_path + gzip_ext - - # safety - if .tar.gz is in place, skip - # compression and ignore the original - if os.path.exists(gzipped_file_path): - self._logger.debug( - 'Layer file is already gzipped - skipping', - file_path=file_path, - gzipped_path=gzipped_file_path, - ) - return - - if os.path.islink(file_path) != compress_symlinks: - if compress_symlinks: - log_message = ( - 'Layer file is not a symlink (symlinks only requested) - skipping', - ) - else: - log_message = ( - 'Layer file is a symlink (non-symlinks requested) - skipping', - ) - self._logger.debug(log_message, file_path=file_path) - return - - # .tar ->.tar.gzip - self._logger.info('Compressing layer file', file_path=file_path) - - with open(file_path, 'rb') as f_in, gzip.open( - gzipped_file_path, 'wb' - ) as f_out: - f_out.writelines(f_in) - - os.remove(file_path) - self._logger.debug( - 'Successfully gzipped layer', - gzipped_file_path=gzipped_file_path, - file_path=file_path, - ) - - except Exception as exc: - - # print debugging info - layer_dir = pathlib.Path(file_path).parents[0] - files = layer_dir.glob('**/*') - self._logger.debug( - 'Listed elements in layer dir', - files=[f for f in files], - layer_dir=layer_dir, - exc=exc, - ) - raise - - def _update_manifests(self, root_dir, gzip_ext): - self._logger.debug('Correcting image manifests') - manifest = self._get_manifest(root_dir) - - for manifest_image_section in 
manifest: - config_filename = manifest_image_section["Config"] - config_path = os.path.join(root_dir, config_filename) - image_config = utils.helpers.load_json_file(config_path) - - # warning - spammy - self._logger.verbose('Parsed image config', image_config=image_config) - - for idx, layer in enumerate(manifest_image_section["Layers"]): - if layer.endswith('.tar'): - gzipped_layer_file_path = layer + gzip_ext - manifest_image_section["Layers"][idx] = gzipped_layer_file_path - # image_config["rootfs"]['diff_ids'][idx] = utils.helpers.get_digest( - # os.path.join(root_dir, gzipped_layer_file_path) - # ) - - self._logger.debug( - '', - config_path=config_path, - image_config=image_config, - ) - # utils.helpers.dump_json_file(config_path, image_config) - - # write modified image config - self._write_manifest(root_dir, manifest) - self._logger.debug('Corrected image manifests', manifest=manifest) - @staticmethod def _get_manifest(archive_dir): with open(os.path.join(archive_dir, 'manifest.json'), 'r') as fh: diff --git a/core/registry.py b/core/registry.py index 2028c1b..86399c5 100644 --- a/core/registry.py +++ b/core/registry.py @@ -9,8 +9,9 @@ import requests import requests.auth -from . import image_manifest_creator, gzip_stream +from . import image_manifest_creator import utils.helpers +import utils.gzip_stream class LayersLock: @@ -117,7 +118,7 @@ def process_image(self, tmp_dir_name, image_config): image=image, config_path=config_path, ) - # self._change_config(config_path, manifest_layer_info) + push_url = self._initialize_push(image) digest, size = self._push_config(config_path, push_url) config_info = {'digest': digest, 'size': size} @@ -152,26 +153,6 @@ def process_image(self, tmp_dir_name, image_config): elapsed=humanfriendly.format_timespan(image_elapsed), ) - def _change_config(self, config_path, layers_info): - contents = utils.helpers.load_json_file(config_path) - - # sanity - if len(contents['rootfs']['diff_ids']) != len(layers_info): - self._logger.log_and_raise( - 'error', - 'Mismatch in layer count for config', - orig_num=len(contents['rootfs']['diff_ids']), - layers_info_num=len(layers_info), - config_path=config_path, - ) - - for idx, layer_info in enumerate(layers_info): - contents['rootfs']['diff_ids'][idx] = layer_info['digest'] - utils.helpers.dump_json_file(config_path, contents) - self._logger.info( - 'Corrected image config digests', contents=contents, config_path=config_path - ) - def _stream_print(self, what, end=None): if self._stream: if end: @@ -306,7 +287,7 @@ def _chunked_upload(self, file_path, initial_url, gzip=False): sha256hash = hashlib.sha256() if gzip: - f_read = gzip_stream.GZIPCompressedStream(f, compression_level=9) + f_read = utils.gzip_stream.GZIPCompressedStream(f, compression_level=7) headers['Content-Encoding'] = 'gzip' else: f_read = f diff --git a/core/gzip_stream.py b/utils/gzip_stream.py similarity index 87% rename from core/gzip_stream.py rename to utils/gzip_stream.py index eb91249..4a2cc4a 100644 --- a/core/gzip_stream.py +++ b/utils/gzip_stream.py @@ -11,11 +11,13 @@ def __init__(self, stream: BinaryIO, *, compression_level: int): self._stream = stream self._compressed_stream = io.BytesIO() + + # writes into self._compressed_stream self._compressor = gzip.GzipFile( mode='wb', fileobj=self._compressed_stream, compresslevel=compression_level ) - # because of the GZIP header written by `GzipFile.__init__`: + # because of the GZIP header written by GzipFile.__init__ self._compressed_stream.seek(0) @property @@ -29,11 +31,6 @@ def 
stream(self) -> BinaryIO: def readable(self) -> bool: return True - def _read_compressed_into(self, b: memoryview) -> int: - buf = self._compressed_stream.read(len(b)) - b[: len(buf)] = buf - return len(buf) - def readinto(self, b: bytearray) -> int: b = memoryview(b) @@ -42,16 +39,24 @@ def readinto(self, b: bytearray) -> int: while offset < size: offset += self._read_compressed_into(b[offset:]) if offset < size: + # self._compressed_buffer now empty if self._compressor.closed: + # nothing to compress anymore break + # compress next bytes - self._read_n_compress(size) + self._read_and_compress(size) return offset - def _read_n_compress(self, size: int): + def _read_compressed_into(self, b: memoryview) -> int: + buf = self._compressed_stream.read(len(b)) + b[: len(buf)] = buf + return len(buf) + + def _read_and_compress(self, size: int): assert size > 0 data = self._stream.read(size) @@ -65,10 +70,11 @@ def _read_n_compress(self, size: int): if data: self._compressor.write(data) else: - # this will write final data (will flush zlib with Z_FINISH) + + # write final data (will flush zlib with Z_FINISH) self._compressor.close() - # rewind to the buffer start + # rewind to buffer start self._compressed_stream.seek(0) def __repr__(self) -> str: From 1f2fe266489b1bded948ec21f552a14986a118a4 Mon Sep 17 00:00:00 2001 From: omesser Date: Sun, 7 Mar 2021 03:41:24 +0200 Subject: [PATCH 48/49] More cleanup for manifest creator --- core/image_manifest_creator.py | 4 +--- core/registry.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/image_manifest_creator.py b/core/image_manifest_creator.py index e63a5fd..77d3e63 100644 --- a/core/image_manifest_creator.py +++ b/core/image_manifest_creator.py @@ -2,9 +2,7 @@ class ImageManifestCreator(object): - def __init__(self, name, tag, config_path, layers_info, config_info): - self._name = name - self._tag = tag + def __init__(self, config_path, layers_info, config_info): self._config_path = config_path self._layers_info = layers_info self._config_info = config_info diff --git a/core/registry.py b/core/registry.py index 86399c5..45fe569 100644 --- a/core/registry.py +++ b/core/registry.py @@ -128,7 +128,7 @@ def process_image(self, tmp_dir_name, image_config): tag = self._replace_tag(image, tag) image_manifest = image_manifest_creator.ImageManifestCreator( - image, tag, config_path, manifest_layer_info, config_info + config_path, manifest_layer_info, config_info ).create() self._logger.debug( From 305c13b3ee9d1ecad658c4d4ba9752ebf1f89f48 Mon Sep 17 00:00:00 2001 From: omesser Date: Sun, 7 Mar 2021 03:44:01 +0200 Subject: [PATCH 49/49] Cleaning old style class defs and unused imports --- core/extractor.py | 2 +- core/image_manifest_creator.py | 2 +- core/processor.py | 5 +---- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/core/extractor.py b/core/extractor.py index 1e9da6d..3455487 100644 --- a/core/extractor.py +++ b/core/extractor.py @@ -6,7 +6,7 @@ import humanfriendly -class Extractor(object): +class Extractor: def __init__(self, logger, archive_path): self._logger = logger.get_child('tar') self._archive_path = os.path.abspath(archive_path) diff --git a/core/image_manifest_creator.py b/core/image_manifest_creator.py index 77d3e63..fae92b9 100644 --- a/core/image_manifest_creator.py +++ b/core/image_manifest_creator.py @@ -1,7 +1,7 @@ import json -class ImageManifestCreator(object): +class ImageManifestCreator: def __init__(self, config_path, layers_info, config_info): self._config_path = config_path self._layers_info 
= layers_info diff --git a/core/processor.py b/core/processor.py index cee44bf..277fd15 100644 --- a/core/processor.py +++ b/core/processor.py @@ -2,19 +2,16 @@ import multiprocessing.pool import time import os.path -import pathlib import shutil import json -import gzip import humanfriendly from . import registry from . import extractor -import utils.helpers -class Processor(object): +class Processor: def __init__( self, logger,
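
---

Note on the end state: after patch 47, GZIPCompressedStream lives in
utils/gzip_stream.py and Registry._chunked_upload wraps the layer file with
it when gzip_layers is enabled. A minimal standalone usage sketch of the
class follows - compression level 7 matches the value patch 47 sets, while
the payload is an arbitrary illustration, not data from the patches:

    import io

    from utils.gzip_stream import GZIPCompressedStream

    raw = io.BytesIO(b'example payload ' * 64)
    gz = GZIPCompressedStream(raw, compression_level=7)

    # io.RawIOBase derives read() from readinto(), so the compressed
    # bytes can be drained like any binary stream - no temp file needed
    compressed = gz.read()
    assert compressed[:2] == b'\x1f\x8b'  # gzip magic bytes
    assert len(compressed) < len(raw.getvalue())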