Skip to content

Commit

Permalink
Consider only images starting with dataproc-<major>-<minor> as base (
Browse files Browse the repository at this point in the history
…#79)

* Consider only images starting with `dataproc-<major>-<minor>` as base images while resolving from version.
  • Loading branch information
vinayakumarb authored Aug 16, 2023
1 parent 6463cd4 commit d29a725
Showing 1 changed file with 40 additions and 9 deletions.
49 changes: 40 additions & 9 deletions custom_image_utils/args_inferer.py
Original file line number Diff line number Diff line change
def _get_dataproc_image_path_by_version(version):
    """Get the Dataproc base image path and full version from a version string.

    Runs ``gcloud compute images list`` against the ``cloud-dataproc`` project,
    filtered to READY images whose name does not end in ``-eap`` and whose
    ``goog-dataproc-version`` label matches ``version``. Only images whose name
    starts with ``dataproc-<major>-<minor>`` are considered base images; any
    other image carrying a matching label is skipped.

    Args:
      version: Dataproc version string. Two dot-separated parts (for example
        ``1.5-debian10``) match any sub-minor version; three parts select one
        exact version label. The format is validated by the argument parser
        before this function is called.

    Returns:
      A ``(image_path, image_version)`` tuple, where ``image_path`` is
      ``_IMAGE_PATH`` formatted with the project and image name, and
      ``image_version`` is the image's ``goog-dataproc-version`` label value.

    Raises:
      RuntimeError: if the gcloud command fails, if no base image matches
        ``version``, or if more than one base image carries the selected
        version label.
    """
    # version regex already checked in arg parser
    parsed_version = version.split(".")
    major_version = parsed_version[0]
    if len(parsed_version) == 2:
        # The input version must be of format 1.5-debian10, in which case we
        # expand it to 1-5-\d+-debian10 so the label filter can regex-match any
        # sub-minor version. The raw string keeps "\d" a literal backslash-d
        # without relying on Python's deprecated invalid-escape fallback.
        minor_version = parsed_version[1].split("-")[0]
        parsed_version[1] = parsed_version[1].replace("-", r"-\d+-")
        filter_arg = ("labels.goog-dataproc-version ~ ^{}-{} AND NOT name ~ -eap$"
                      " AND status = READY").format(parsed_version[0],
                                                    parsed_version[1])
    else:
        minor_version = parsed_version[1]
        # Push the READY-status and not-'eap' conditions into the gcloud filter
        # so we don't have to check them while iterating the list.
        filter_arg = ("labels.goog-dataproc-version = {}-{}-{} AND NOT name ~ -eap$"
                      " AND status = READY").format(parsed_version[0],
                                                    parsed_version[1],
                                                    parsed_version[2])
    command = [
        "gcloud", "compute", "images", "list", "--project", "cloud-dataproc",
        "--filter", filter_arg, "--format",
        "csv[no-heading=true](name,labels.goog-dataproc-version)",
        "--sort-by=~creationTimestamp"
    ]

    _LOG.info("Executing command: %s", command)
    # Capture stdout of the gcloud command through a temporary file.
    with tempfile.NamedTemporaryFile() as temp_file:
        pipe = subprocess.Popen(command, stdout=temp_file)
        pipe.wait()
        if pipe.returncode != 0:
            raise RuntimeError(
                "Cannot find dataproc base image, please check and verify "
                "[--dataproc-version]")

        temp_file.seek(0)  # go to start of the stdout
        stdout = temp_file.read()

    if stdout:
        expected_prefix = "dataproc-{}-{}".format(major_version, minor_version)
        _LOG.info("Filtering images : %s", expected_prefix)
        # image_versions records versions in the order gcloud returned them
        # (sorted by ~creationTimestamp), so its first entry is the one treated
        # as the latest available version.
        image_versions = []
        all_images_for_version = {}
        for line in stdout.decode('utf-8').strip().split('\n'):
            parsed_image = line.split(",")
            if len(parsed_image) != 2:
                # Not a "name,version" CSV row; nothing to record.
                continue
            parsed_image_name, parsed_image_version = parsed_image
            if not parsed_image_name.startswith(expected_prefix):
                # Not a regular dataproc release image. Maybe a custom image
                # with the same label.
                _LOG.info("Skipping non-release image %s", parsed_image_name)
                continue
            if parsed_image_version not in all_images_for_version:
                image_versions.append(parsed_image_version)
            all_images_for_version.setdefault(parsed_image_version, []).append(
                _IMAGE_PATH.format("cloud-dataproc", parsed_image_name))

        _LOG.info("All Images : %s", all_images_for_version)
        _LOG.info("All Image-Versions : %s", image_versions)

        # Every listed image may have been skipped by the prefix check; in that
        # case fall through to the "cannot find" error below instead of
        # failing with an IndexError on an empty list.
        if image_versions:
            latest_available_version = image_versions[0]
            latest_images = all_images_for_version[latest_available_version]
            if len(latest_images) > 1:
                raise RuntimeError(
                    "Found more than one images for latest dataproc-version={}. Images: {}".format(
                        latest_available_version,
                        str(latest_images)))

            _LOG.info("Choosing image %s with version %s", latest_images[0],
                      latest_available_version)
            return latest_images[0], latest_available_version

    raise RuntimeError(
        "Cannot find dataproc base image with dataproc-version=%s." % version)


def _infer_project_id(args):
Expand Down

0 comments on commit d29a725

Please sign in to comment.