From 457b5d19d5312c7a3acd0d4beb1de97219d622d7 Mon Sep 17 00:00:00 2001
From: Chi Chang
Date: Thu, 17 Aug 2023 18:36:45 +0100
Subject: [PATCH] Setting spark.app.id to a more intuitive name (#124)

* Setting spark.app.id to a more intuitive name

* Update tests

* Replace more special characters
---
 service_configuration_lib/spark_config.py | 17 +++++++++++++----
 setup.py                                  |  2 +-
 tests/spark_config_test.py                | 14 ++++++++++++++
 3 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/service_configuration_lib/spark_config.py b/service_configuration_lib/spark_config.py
index 1ae9bc4..cd0a22d 100644
--- a/service_configuration_lib/spark_config.py
+++ b/service_configuration_lib/spark_config.py
@@ -6,6 +6,7 @@
 import logging
 import math
 import os
+import re
 import time
 from typing import Any
 from typing import Dict
@@ -37,6 +38,7 @@
 
 NON_CONFIGURABLE_SPARK_OPTS = {
     'spark.master',
+    'spark.app.id',
     'spark.ui.port',
     'spark.mesos.principal',
     'spark.mesos.secret',
@@ -1076,6 +1078,11 @@ def get_spark_conf(
         _pick_random_port(PREFERRED_SPARK_UI_PORT),
     )
 
+    spark_conf = {**(spark_opts_from_env or {}), **_filter_user_spark_opts(user_spark_opts)}
+
+    if aws_creds[2] is not None:
+        spark_conf['spark.hadoop.fs.s3a.aws.credentials.provider'] = AWS_ENV_CREDENTIALS_PROVIDER
+
     # app_name from env is already appended port and time to make it unique
     app_name = (spark_opts_from_env or {}).get('spark.app.name')
     if not app_name:
@@ -1083,13 +1090,15 @@ def get_spark_conf(
         # from history server.
         app_name = f'{app_base_name}_{ui_port}_{int(time.time())}'
 
-    spark_conf = {**(spark_opts_from_env or {}), **_filter_user_spark_opts(user_spark_opts)}
-
-    if aws_creds[2] is not None:
-        spark_conf['spark.hadoop.fs.s3a.aws.credentials.provider'] = AWS_ENV_CREDENTIALS_PROVIDER
+    # Explicitly set the app id: replace special characters with '_' to make it consistent
+    # in all places for metrics systems:
+    # - since the Prometheus metrics endpoint converts those characters to '_'
+    # - while the 'spark-app-selector' executor pod label keeps the original app id
+    app_id = re.sub(r'[\.,-]', '_', app_name)
 
     spark_conf.update({
         'spark.app.name': app_name,
+        'spark.app.id': app_id,
         'spark.ui.port': str(ui_port),
     })
 
diff --git a/setup.py b/setup.py
index 1de2655..f262250 100644
--- a/setup.py
+++ b/setup.py
@@ -17,7 +17,7 @@
 
 setup(
     name='service-configuration-lib',
-    version='2.18.3',
+    version='2.18.4',
     provides=['service_configuration_lib'],
     description='Start, stop, and inspect Yelp SOA services',
     url='https://github.com/Yelp/service_configuration_lib',
diff --git a/tests/spark_config_test.py b/tests/spark_config_test.py
index 92b46aa..f99c5d9 100644
--- a/tests/spark_config_test.py
+++ b/tests/spark_config_test.py
@@ -2,6 +2,7 @@
 import itertools
 import json
 import os
+import re
 import sys
 from unittest import mock
 
@@ -1138,6 +1139,14 @@ def verify(output):
             return [key]
         return verify
 
+    @pytest.fixture
+    def assert_app_id(self):
+        def verify(output):
+            key = 'spark.app.id'
+            assert output[key] == re.sub(r'[\.,-]', '_', output['spark.app.name'])
+            return [key]
+        return verify
+
     @pytest.fixture
     def assert_mesos_conf(self):
         def verify(output):
@@ -1230,6 +1239,7 @@ def test_leaders_get_spark_conf_kubernetes(
         mock_time,
         assert_ui_port,
         assert_app_name,
+        assert_app_id,
         assert_kubernetes_conf,
         mock_log,
     ):
@@ -1262,6 +1272,7 @@ def test_leaders_get_spark_conf_kubernetes(
         verified_keys = set(
             assert_ui_port(output) +
             assert_app_name(output) +
+            assert_app_id(output) +
             assert_kubernetes_conf(output) +
             list(other_spark_opts.keys()) +
             list(mock_adjust_spark_requested_resources_kubernetes.return_value.keys()) +
@@ -1321,6 +1332,7 @@ def test_show_console_progress_jupyter(
         mock_time,
         assert_ui_port,
         assert_app_name,
+        assert_app_id,
         assert_local_conf,
         mock_log,
     ):
@@ -1361,6 +1373,7 @@ def test_local_spark(
         mock_time,
         assert_ui_port,
         assert_app_name,
+        assert_app_id,
         assert_local_conf,
         mock_log,
     ):
@@ -1385,6 +1398,7 @@ def test_local_spark(
         verified_keys = set(
             assert_ui_port(output) +
             assert_app_name(output) +
+            assert_app_id(output) +
             assert_local_conf(output) +
             list(mock_append_spark_prometheus_conf.return_value.keys()) +
             list(mock_append_event_log_conf.return_value.keys()) +
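
Note (not part of the patch): a minimal sketch of the app id sanitization this
commit introduces in get_spark_conf. The sanitize_app_id helper name and the
sample app name are hypothetical; the re.sub call itself is taken verbatim
from the diff above.

import re

def sanitize_app_id(app_name: str) -> str:
    # Same substitution as the patch: '.', ',' and '-' all become '_',
    # so the app id matches the form the Prometheus metrics endpoint emits,
    # while the 'spark-app-selector' pod label would keep the original id.
    return re.sub(r'[\.,-]', '_', app_name)

# A typical generated name follows f'{app_base_name}_{ui_port}_{int(time.time())}':
print(sanitize_app_id('my-svc.batch_33000_1692293805'))
# -> my_svc_batch_33000_1692293805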