Improve memory management; raise the driver memory limit, which caused out-of-memory errors when results are large (#82)

* Improve memory management; raise the driver memory limit, which caused out-of-memory errors when results are large.

* import fix.

* Fix driver_mem_mb and test.

* make clean shouldn't fail if some files are missing.

* CR feedback.
larroy authored Aug 10, 2022
1 parent 5150892 commit d425af9
Showing 4 changed files with 26 additions and 27 deletions.
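
The core of the change: the driver heap was previously fixed at 2 GiB regardless of instance size, which is the limit the commit message calls out as causing out-of-memory errors with large results; it now scales with instance memory. A minimal before/after sketch in Python (illustrative only, not code from the repo; instance size taken from the unit test below):

# Old sizing (removed in this commit): fixed driver heap, independent of instance size.
old_driver_mem_mb = 2 * 1024  # 2048 MiB

# New sizing: the driver heap is a fraction of instance memory (ratio defined in src/smspark/constants.py).
DRIVER_MEM_INSTANCE_MEM_RATIO = 0.9
instance_mem_mb = 16 * 1024  # e.g. a 16 GiB instance, as in test_get_yarn_spark_resource_config
new_driver_mem_mb = int(instance_mem_mb * DRIVER_MEM_INSTANCE_MEM_RATIO)  # 14745 MiB
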
6 changes: 3 additions & 3 deletions Makefile
@@ -138,9 +138,9 @@ clean:
rm ${BUILD_CONTEXT}/*.whl || true
rm -rf dist || true
rm -rf build || true
rm Pipfile
rm Pipfile.lock
rm setup.py
rm -f Pipfile
rm -f Pipfile.lock
rm -f setup.py

# Removes compiled Scala SBT artifacts
clean-test-scala:
28 changes: 11 additions & 17 deletions src/smspark/bootstrapper.py
@@ -19,6 +19,7 @@
import shutil
import socket
import subprocess
from smspark import constants
from typing import Any, Dict, List, Sequence, Tuple, Union

import psutil
@@ -238,15 +239,15 @@ def start_spark_standalone_primary(self) -> None:
subprocess.Popen(cmd_start_primary, shell=True)

def deserialize_user_configuration(
self, configuration_dict_or_list: Union[Dict[str, Any], List[Dict[str, Any]]]
self, configuration_dict_or_list: Union[Dict[str, Any], List[Dict[str, Any]]]
) -> Union[Sequence[Configuration], Configuration]:
if isinstance(configuration_dict_or_list, dict):
return self.deserialize_user_configuration_dict(configuration_dict_or_list)
else:
return self._deserialize_user_configuration_to_sequence(configuration_dict_or_list)

def _deserialize_user_configuration_to_sequence(
self, configuration_list: List[Dict[str, Any]]
self, configuration_list: List[Dict[str, Any]]
) -> Sequence[Configuration]:
return [self.deserialize_user_configuration_dict(conf) for conf in configuration_list]

@@ -372,26 +373,19 @@ def set_yarn_spark_resource_config(self) -> None:
logging.info("Configuration at {} is: \n{}".format(spark_config.path, spark_config_string))

def get_yarn_spark_resource_config(
self, instance_count: int, instance_mem_mb: int, instance_cores: int
self, instance_count: int, instance_mem_mb: int, instance_cores: int
) -> Tuple[Configuration, Configuration]:
aws_region = os.getenv("AWS_REGION")
executor_cores = instance_cores
executor_count_per_instance = int(instance_cores / executor_cores)
executor_count_total = instance_count * executor_count_per_instance
default_parallelism = instance_count * instance_cores * 2

# Let's leave 3% of the instance memory free
instance_mem_mb = int(instance_mem_mb * 0.97)

driver_mem_mb = 2 * 1024
driver_mem_ovr_pct = 0.1
driver_mem_ovr_mb = int(driver_mem_mb * driver_mem_ovr_pct)
executor_mem_ovr_pct = 0.1
executor_mem_mb = int(
(instance_mem_mb - driver_mem_mb - driver_mem_ovr_mb)
/ (executor_count_per_instance + executor_count_per_instance * executor_mem_ovr_pct)
)
executor_mem_ovr_mb = int(executor_mem_mb * executor_mem_ovr_pct)
driver_mem_mb = int(instance_mem_mb * constants.DRIVER_MEM_INSTANCE_MEM_RATIO)
driver_mem_overhead_mb = int(driver_mem_mb * constants.DRIVER_MEM_OVERHEAD_RATIO)
executor_mem_mb = int(((instance_mem_mb * constants.EXECUTOR_MEM_INSTANCE_MEM_RATIO)
/ executor_count_per_instance) * (1 - constants.EXECUTOR_MEM_OVERHEAD_RATIO))
executor_mem_overhead_mb = int(executor_mem_mb * constants.EXECUTOR_MEM_OVERHEAD_RATIO)

driver_gc_config = (
"-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 "
@@ -426,10 +420,10 @@ def get_yarn_spark_resource_config(
"spark-defaults",
{
"spark.driver.memory": f"{driver_mem_mb}m",
"spark.driver.memoryOverhead": f"{driver_mem_ovr_mb}m",
"spark.driver.memoryOverhead": f"{driver_mem_overhead_mb}m",
"spark.driver.defaultJavaOptions": f"{driver_java_opts}",
"spark.executor.memory": f"{executor_mem_mb}m",
"spark.executor.memoryOverhead": f"{executor_mem_ovr_mb}m",
"spark.executor.memoryOverhead": f"{executor_mem_overhead_mb}m",
"spark.executor.cores": f"{executor_cores}",
"spark.executor.defaultJavaOptions": f"{executor_java_opts}",
"spark.executor.instances": f"{executor_count_total}",
5 changes: 5 additions & 0 deletions src/smspark/constants.py
@@ -0,0 +1,5 @@

DRIVER_MEM_INSTANCE_MEM_RATIO = 0.9
DRIVER_MEM_OVERHEAD_RATIO = 0.1
EXECUTOR_MEM_INSTANCE_MEM_RATIO = 0.95
EXECUTOR_MEM_OVERHEAD_RATIO = 0.1
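
Putting the new constants together with the bootstrapper math above, a small worked example (a sketch only; it assumes the same 16384 MiB, 4-core instance the unit test below uses, so executor_cores equals instance_cores and there is one executor per instance) reproduces the expected values:

from smspark import constants

instance_mem_mb = 16 * 1024  # 16384
instance_cores = 4
executor_cores = instance_cores
executor_count_per_instance = instance_cores // executor_cores  # 1

# Driver: 90% of instance memory, with a further 10% of that reserved as overhead.
driver_mem_mb = int(instance_mem_mb * constants.DRIVER_MEM_INSTANCE_MEM_RATIO)  # int(16384 * 0.9) = 14745
driver_mem_overhead_mb = int(driver_mem_mb * constants.DRIVER_MEM_OVERHEAD_RATIO)  # int(14745 * 0.1) = 1474

# Executors: 95% of instance memory split across executors, less 10% overhead per executor.
executor_mem_mb = int(
    ((instance_mem_mb * constants.EXECUTOR_MEM_INSTANCE_MEM_RATIO) / executor_count_per_instance)
    * (1 - constants.EXECUTOR_MEM_OVERHEAD_RATIO)
)  # int((16384 * 0.95 / 1) * 0.9) = int(14008.32) = 14008
executor_mem_overhead_mb = int(executor_mem_mb * constants.EXECUTOR_MEM_OVERHEAD_RATIO)  # int(14008 * 0.1) = 1400

assert (driver_mem_mb, driver_mem_overhead_mb) == (14745, 1474)
assert (executor_mem_mb, executor_mem_overhead_mb) == (14008, 1400)
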
14 changes: 7 additions & 7 deletions test/unit/test_bootstrapper.py
@@ -401,7 +401,7 @@ def test_get_yarn_spark_resource_config(default_bootstrapper: Bootstrapper) -> None:
instance_cores = 4
yarn_config, spark_config = default_bootstrapper.get_yarn_spark_resource_config(1, instance_mem_mb, instance_cores)

exp_yarn_max_mem_mb = 15892 # = int(instance_mem_mb * .97) = int(16384 * .97) = int(15892.48)
exp_yarn_max_mem_mb = 16384  # = instance_mem_mb; the 3% head-room reservation was removed

exp_yarn_config_props = {
"yarn.scheduler.minimum-allocation-mb": "1",
@@ -419,14 +419,14 @@ def test_get_yarn_spark_resource_config(default_bootstrapper: Bootstrapper) -> None:
exp_executor_count_total = 1 # = instance_count * executor_count_per_instance = 1 * 1
exp_default_parallelism = 8 # = instance_count * instance_cores * 2 = 1 * 4 * 2

exp_driver_mem_mb = 2048 # = 2 * 1024
exp_driver_mem_ovr_mb = 204 # = int(driver_mem_mb * driver_mem_ovr_pct) = int(2048 * 0.1) = int(204.8)
exp_driver_mem_mb = 14745  # = int(instance_mem_mb * DRIVER_MEM_INSTANCE_MEM_RATIO) = int(16384 * 0.9) = int(14745.6)
exp_driver_mem_ovr_mb = 1474  # = int(driver_mem_mb * DRIVER_MEM_OVERHEAD_RATIO) = int(14745 * 0.1) = int(1474.5)
# = int(((instance_mem_mb * EXECUTOR_MEM_INSTANCE_MEM_RATIO) / executor_count_per_instance)
#   * (1 - EXECUTOR_MEM_OVERHEAD_RATIO))
# = int(((16384 * 0.95) / 1) * (1 - 0.1))
# = int(14008.32)
exp_executor_mem_mb = 12399
exp_executor_mem_ovr_mb = 1239 # = int(executor_mem_mb * executor_mem_ovr_pct) = int(12399 * 0.1) = int(1239.9)
exp_executor_mem_mb = 14008
exp_executor_mem_ovr_mb = 1400  # = int(executor_mem_mb * EXECUTOR_MEM_OVERHEAD_RATIO) = int(14008 * 0.1) = int(1400.8)

exp_driver_gc_config = (
"-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 "
@@ -456,8 +456,8 @@ def test_get_yarn_spark_resource_config(default_bootstrapper: Bootstrapper) -> None:
"spark.executor.defaultJavaOptions": f"{exp_executor_java_opts}",
"spark.executor.instances": f"{exp_executor_count_total}",
"spark.default.parallelism": f"{exp_default_parallelism}",
"spark.executorEnv.AWS_REGION": f"{region}",
"spark.yarn.appMasterEnv.AWS_REGION": f"{region}",
"spark.yarn.appMasterEnv.AWS_REGION": spark_config.Properties["spark.yarn.appMasterEnv.AWS_REGION"],
"spark.executorEnv.AWS_REGION": spark_config.Properties["spark.executorEnv.AWS_REGION"],
}

assert spark_config.Classification == "spark-defaults"
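
As a final sanity check on these expectations (plain arithmetic on the values above, not an assertion the test itself makes): with exp_yarn_max_mem_mb now at the full 16384 MiB, each individual container request, heap plus overhead, still fits within that limit:

assert 14745 + 1474 <= 16384  # driver container: 16219 MiB
assert 14008 + 1400 <= 16384  # executor container: 15408 MiB
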
