Skip to content

Commit

Permalink
Add JobServerOption for --jar_cache_dir (#32033)
Browse files: browse the repository at this point in the history.
* Add JobServerOption for jar_cache_dir

Signed-off-by: s21.lee <[email protected]>

* fixed for job_server_test error

- add missing comma

Signed-off-by: s21.lee <[email protected]>

* fix error for missing comma

Signed-off-by: s21lee <[email protected]>

* fix lint

Signed-off-by: s21lee <[email protected]>

* fix lint

Signed-off-by: s21lee <[email protected]>

* fix for unit test error

Signed-off-by: s21lee <[email protected]>

* fix lint

Signed-off-by: s21lee <[email protected]>

* fix test error

Signed-off-by: s21lee <[email protected]>

* fix lint

Signed-off-by: s21lee <[email protected]>

* fix error for typo

Signed-off-by: s21lee <[email protected]>

* fix test error

Signed-off-by: s21lee <[email protected]>

* fix lint

Signed-off-by: s21lee <[email protected]>

* fix error

Signed-off-by: s21lee <[email protected]>

* fix error

Signed-off-by: s21lee <[email protected]>

* fix lint

Signed-off-by: s21lee <[email protected]>

* fix error

Signed-off-by: s21lee <[email protected]>

* fix error

Signed-off-by: s21lee <[email protected]>

* fix error

Signed-off-by: s21lee <[email protected]>

* fix lint

Signed-off-by: s21lee <[email protected]>

* fix lint

Signed-off-by: s21lee <[email protected]>

* fix error and lint

Signed-off-by: s21lee <[email protected]>

* fix lint

Signed-off-by: s21lee <[email protected]>

* fix lint

Signed-off-by: s21lee <[email protected]>

---------

Signed-off-by: s21.lee <[email protected]>
Signed-off-by: s21lee <[email protected]>
Co-authored-by: s21.lee <[email protected]>
  • Loading branch information
s21lee and s21.lee authored Oct 31, 2024
1 parent d3a841c commit 2160737
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 6 deletions.
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1674,6 +1674,10 @@ def _add_argparse_args(cls, parser):
action='append',
default=[],
help='JVM properties to pass to a Java job server.')
parser.add_argument(
'--jar_cache_dir',
default=None,
help='The location to store jar cache for job server.')


class FlinkRunnerOptions(PipelineOptions):
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from apache_beam.options.pipeline_options import CrossLanguageOptions
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import JobServerOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import TypeOptions
Expand Down Expand Up @@ -645,6 +646,11 @@ def test_transform_name_mapping(self):
mapping = options.view_as(GoogleCloudOptions).transform_name_mapping
self.assertEqual(mapping['from'], 'to')

def test_jar_cache_dir(self):
options = PipelineOptions(['--jar_cache_dir=/path/to/jar_cache_dir'])
jar_cache_dir = options.view_as(JobServerOptions).jar_cache_dir
self.assertEqual(jar_cache_dir, '/path/to/jar_cache_dir')

def test_dataflow_service_options(self):
options = PipelineOptions([
'--dataflow_service_option',
Expand Down
7 changes: 4 additions & 3 deletions sdks/python/apache_beam/runners/portability/job_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def __init__(self, options):
self._artifacts_dir = options.artifacts_dir
self._java_launcher = options.job_server_java_launcher
self._jvm_properties = options.job_server_jvm_properties
self._jar_cache_dir = options.jar_cache_dir

def java_arguments(
self, job_port, artifact_port, expansion_port, artifacts_dir):
Expand All @@ -141,11 +142,11 @@ def path_to_beam_jar(gradle_target, artifact_id=None):
gradle_target, artifact_id=artifact_id)

@staticmethod
def local_jar(url):
return subprocess_server.JavaJarServer.local_jar(url)
def local_jar(url, jar_cache_dir=None):
return subprocess_server.JavaJarServer.local_jar(url, jar_cache_dir)

def subprocess_cmd_and_endpoint(self):
jar_path = self.local_jar(self.path_to_jar())
jar_path = self.local_jar(self.path_to_jar(), self._jar_cache_dir)
artifacts_dir = (
self._artifacts_dir if self._artifacts_dir else self.local_temp_dir(
prefix='artifacts'))
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/runners/portability/job_server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def path_to_jar(self):
return '/path/to/jar'

@staticmethod
def local_jar(url):
def local_jar(url, jar_cache_dir=None):
logging.debug("url({%s}), jar_cache_dir({%s})", url, jar_cache_dir)
return url


Expand Down
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/utils/subprocess_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,17 @@ class JavaJarServer(SubprocessServer):
'local', (threading.local, ),
dict(__init__=lambda self: setattr(self, 'replacements', {})))()

def __init__(self, stub_class, path_to_jar, java_arguments, classpath=None):
def __init__(
self,
stub_class,
path_to_jar,
java_arguments,
classpath=None,
cache_dir=None):
if classpath:
# java -jar ignores the classpath, so we make a new jar that embeds
# the requested classpath.
path_to_jar = self.make_classpath_jar(path_to_jar, classpath)
path_to_jar = self.make_classpath_jar(path_to_jar, classpath, cache_dir)
super().__init__(
stub_class, ['java', '-jar', path_to_jar] + list(java_arguments))
self._existing_service = path_to_jar if is_service_endpoint(
Expand Down

0 comments on commit 2160737

Please sign in to comment.