From d29a7252ff3f55520bb5b370036f669f138d78c4 Mon Sep 17 00:00:00 2001 From: Vinayakumar B Date: Wed, 16 Aug 2023 09:18:08 +0530 Subject: [PATCH] Consider only images starting with `dataproc-<major>-<minor>` as base (#79) * Consider only images starting with `dataproc-<major>-<minor>` as base images while resolving from version. --- custom_image_utils/args_inferer.py | 49 ++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/custom_image_utils/args_inferer.py b/custom_image_utils/args_inferer.py index 37c4d6d..d237e4d 100644 --- a/custom_image_utils/args_inferer.py +++ b/custom_image_utils/args_inferer.py @@ -129,14 +129,18 @@ def _get_dataproc_image_path_by_version(version): """Get Dataproc base image name from version.""" # version regex already checked in arg parser parsed_version = version.split(".") + major_version = parsed_version[0] if len(parsed_version) == 2: # The input version must be of format 1.5-debian10 in which case we need to # expand it to 1-5-\d+-debian10 so we can do a regexp on the minor version + minor_version = parsed_version[1].split("-")[0] parsed_version[1] = parsed_version[1].replace("-", "-\d+-") filter_arg = ("labels.goog-dataproc-version ~ ^{}-{} AND NOT name ~ -eap$" " AND status = READY").format(parsed_version[0], parsed_version[1]) else: + major_version = parsed_version[0] + minor_version = parsed_version[1] # Moreover, push the filter of READY status and name not containing 'eap' to # gcloud command so we don't have to iterate the list filter_arg = ("labels.goog-dataproc-version = {}-{}-{} AND NOT name ~ -eap$" @@ -144,20 +148,21 @@ def _get_dataproc_image_path_by_version(version): parsed_version[1], parsed_version[2]) command = [ - "gcloud", "compute", "images", "list", "--project", "cloud-dataproc", - "--filter", filter_arg, "--format", - "csv[no-heading=true](name,labels.goog-dataproc-version)", - "--sort-by=~creationTimestamp" + "gcloud", "compute", "images", "list", "--project", "cloud-dataproc", + "--filter", 
filter_arg, "--format", + "csv[no-heading=true](name,labels.goog-dataproc-version)", + "--sort-by=~creationTimestamp" ] + _LOG.info("Executing command: {}".format(command)) # get stdout from compute images list --filters with tempfile.NamedTemporaryFile() as temp_file: pipe = subprocess.Popen(command, stdout=temp_file) pipe.wait() if pipe.returncode != 0: raise RuntimeError( - "Cannot find dataproc base image, please check and verify " - "[--dataproc-version]") + "Cannot find dataproc base image, please check and verify " + "[--dataproc-version]") temp_file.seek(0) # go to start of the stdout stdout = temp_file.read() @@ -165,14 +170,40 @@ def _get_dataproc_image_path_by_version(version): if stdout: # in case there are multiple images parsed_lines = stdout.decode('utf-8').strip().split('\n') + expected_prefix = "dataproc-{}-{}".format(major_version, minor_version) + _LOG.info("Filtering images : %s", expected_prefix) + image_versions=[] + all_images_for_version = {} for line in parsed_lines: parsed_image = line.split(",") if len(parsed_image) == 2: - return (_IMAGE_PATH.format("cloud-dataproc", - parsed_image[0]), parsed_image[1]) + parsed_image_name = parsed_image[0] + if not parsed_image_name.startswith(expected_prefix): + _LOG.info("Skipping non-release image %s", parsed_image_name) + # Not a regular dataproc release image. Maybe a custom image with same label. 
+ continue + parsed_image_version = parsed_image[1] + if parsed_image_version not in all_images_for_version: + all_images_for_version[parsed_image_version] = [_IMAGE_PATH.format("cloud-dataproc", parsed_image_name)] + image_versions.append(parsed_image_version) + else: + all_images_for_version[parsed_image_version].append(_IMAGE_PATH.format("cloud-dataproc", parsed_image_name)) + + _LOG.info("All Images : %s", all_images_for_version) + _LOG.info("All Image-Versions : %s", image_versions) + + latest_available_version = image_versions[0] + if (len(all_images_for_version[latest_available_version]) > 1): + raise RuntimeError( + "Found more than one images for latest dataproc-version={}. Images: {}".format( + latest_available_version, + str(all_images_for_version[latest_available_version]))) + + _LOG.info("Choosing image %s with version %s", all_images_for_version[image_versions[0]][0], image_versions[0]) + return all_images_for_version[image_versions[0]][0], image_versions[0] raise RuntimeError( - "Cannot find dataproc base image with dataproc-version=%s." % version) + "Cannot find dataproc base image with dataproc-version=%s." % version) def _infer_project_id(args):