Skip to content

Commit

Permalink
Replace spark.yarn.maxAppAttempts job-config field with a generic properties string, injecting spark.yarn.maxAppAttempts=2 by default (needs testing)
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks committed Jan 2, 2025
1 parent 28dc3b3 commit 1b0d175
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.logicalclocks.hsfs;

import com.google.common.base.Strings;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -67,7 +69,17 @@ public class JobConfiguration {
@Setter
private int dynamicAllocationInitialExecutors;

@Getter
@Setter
private int yarnMaxAppAttempts;
private String properties;

public String getProperties() {
  // Default entry guaranteeing the Spark application gets a second attempt on YARN.
  final String defaultProperty = "spark.yarn.maxAppAttempts=2";
  final String defaultKey = defaultProperty.split("=")[0];

  if (Strings.isNullOrEmpty(properties)) {
    // No user-supplied properties yet: lazily initialise with the default.
    properties = defaultProperty;
  } else if (!properties.contains(defaultKey)) {
    // NOTE(review): the separator is a literal backslash followed by 'n'
    // ("\\n" in source), mirroring the Python client — confirm the backend
    // expects the escaped form rather than a real newline.
    properties = properties + "\\n" + defaultProperty;
  }

  return properties;
}
}
3 changes: 1 addition & 2 deletions python/hsfs/core/deltastreamer_jobconf.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ def __init__(self, options, spark_options, **kwargs):
self._spark_options = spark_options

def to_dict(self):
    """Serialize this Delta Streamer job configuration for the backend REST API.

    The raw Spark options are normalised through ``JobConfiguration`` so the
    resulting payload always carries the client-side defaults (e.g. the
    ``spark.yarn.maxAppAttempts=2`` property) and the DTO ``type`` field,
    instead of forwarding ``self._spark_options`` verbatim.

    # Returns
        `dict`: payload with ``writeOptions`` and ``sparkJobConfiguration``.
    """
    return {
        "writeOptions": self._options,
        "sparkJobConfiguration": JobConfiguration(**self._spark_options).to_dict(),
    }

def json(self):
Expand Down
12 changes: 9 additions & 3 deletions python/hsfs/core/job_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
dynamic_allocation=True,
dynamic_min_executors=1,
dynamic_max_executors=2,
yarn_max_app_attempts=1,
properties=None,
**kwargs,
):
self._am_memory = am_memory
Expand All @@ -44,9 +44,15 @@ def __init__(
self._dynamic_allocation = dynamic_allocation
self._dynamic_min_executors = dynamic_min_executors
self._dynamic_max_executors = dynamic_max_executors
self._yarn_max_app_attempts = yarn_max_app_attempts
self._properties = properties

def to_dict(self):
default_property = "spark.yarn.maxAppAttempts=2"
if not self._properties:
self._properties = default_property
else:
self._properties = self._properties + (f"\\n{default_property}" if default_property.split("=")[0] not in self._properties else "")

return {
"amMemory": self._am_memory,
"amCores": self._am_cores,
Expand All @@ -56,7 +62,7 @@ def to_dict(self):
"spark.dynamicAllocation.enabled": self._dynamic_allocation,
"spark.dynamicAllocation.minExecutors": self._dynamic_min_executors,
"spark.dynamicAllocation.maxExecutors": self._dynamic_max_executors,
"spark.yarn.maxAppAttempts": self._yarn_max_app_attempts,
"properties": self._properties,
"type": JobConfiguration.DTO_TYPE,
}

Expand Down
5 changes: 3 additions & 2 deletions python/tests/core/test_job_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_to_dict_defaults(self):
"spark.dynamicAllocation.enabled": True,
"spark.dynamicAllocation.minExecutors": 1,
"spark.dynamicAllocation.maxExecutors": 2,
"spark.yarn.maxAppAttempts": 1,
"properties": "spark.yarn.maxAppAttempts=2",
"type": job_configuration.JobConfiguration.DTO_TYPE,
}
assert expected_dict == result_dict
Expand All @@ -53,6 +53,7 @@ def test_to_dict_non_defaults(self):
dynamic_min_executors=2,
dynamic_max_executors=4,
yarn_max_app_attempts=2,
properties="spark.test=xxx",
)

# Act
Expand All @@ -68,7 +69,7 @@ def test_to_dict_non_defaults(self):
"spark.dynamicAllocation.enabled": False,
"spark.dynamicAllocation.minExecutors": 2,
"spark.dynamicAllocation.maxExecutors": 4,
"spark.yarn.maxAppAttempts": 2,
"properties": "spark.test=xxx\\nspark.yarn.maxAppAttempts=2",
"type": job_configuration.JobConfiguration.DTO_TYPE,
}
assert expected_dict == result_dict

0 comments on commit 1b0d175

Please sign in to comment.