From 80ebfff22cb6e56915bc113fba883f6b13feb22d Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Oct 2023 15:30:17 +0200 Subject: [PATCH 1/3] init --- python/hsfs/feature_group.py | 41 ++++++++++------- python/tests/test_feature_group.py | 71 +++++++++++++++++++++++++++++- 2 files changed, 94 insertions(+), 18 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 5cb028273e..fa7fd38d49 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -15,6 +15,8 @@ # import copy +import time + from hsfs.ge_validation_result import ValidationResult import humps import json @@ -2590,23 +2592,28 @@ def parents(self): def materialization_job(self): """Get the Job object reference for the materialization job for this Feature Group.""" - if self._materialization_job is None: - try: - job_name = "{fg_name}_{version}_offline_fg_materialization".format( - fg_name=self._name, version=self._version - ) - self._materialization_job = job_api.JobApi().get(job_name) - except RestAPIError as e: - if ( - e.response.json().get("errorCode", "") == 130009 - and e.response.status_code == 404 - ): - job_name = "{fg_name}_{version}_offline_fg_backfill".format( - fg_name=self._name, version=self._version - ) - self._materialization_job = job_api.JobApi().get(job_name) - - return self._materialization_job + if self._materialization_job is not None: + return self._materialization_job + else: + feature_group_name = util.feature_group_name(self) + job_suffix_list = ["materialization", "backfill"] + for job_suffix in job_suffix_list: + job_name = "{}_offline_fg_{}".format(feature_group_name, job_suffix) + for _ in range(3): # retry starting job + try: + self._materialization_job = job_api.JobApi().get(job_name) + return self._materialization_job + except RestAPIError as e: + if e.response.status_code == 404: + if e.response.json().get("errorCode", "") == 130009: + print(1111) + break # no need to retry, since no such job exists + else: + print(2222) + time.sleep(1) # backoff and then retry + continue + raise e + raise FeatureStoreException("No materialization job was found") @description.setter def description(self, new_description): diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index 3de2418afb..ee4ad33776 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -28,7 +28,7 @@ feature_group_writer, ) from hsfs.engine import python -from hsfs.client.exceptions import FeatureStoreException +from hsfs.client.exceptions import FeatureStoreException, RestAPIError import pytest import warnings @@ -381,6 +381,75 @@ def test_materialization_job(self, mocker): mock_job_api.assert_called_once assert fg.materialization_job == mock_job + def test_materialization_job_retry_success(self, mocker): + # Arrange + mocker.patch("time.sleep") + + mock_response_job_not_found = mocker.Mock() + mock_response_job_not_found.status_code = 404 + mock_response_job_not_found.json.return_value = {"errorCode": 130009} + + mock_response_not_found = mocker.Mock() + mock_response_not_found.status_code = 404 + + mock_job = mocker.Mock() + + mock_job_api = mocker.patch( + "hsfs.core.job_api.JobApi.get", side_effect=[ + RestAPIError("", mock_response_job_not_found), + RestAPIError("", mock_response_not_found), + RestAPIError("", mock_response_not_found), + mock_job] + ) + + fg = feature_group.FeatureGroup( + name="test_fg", + version=2, + featurestore_id=99, + primary_key=[], + partition_key=[], + id=10, + ) + + # Act + job_result = fg.materialization_job + + # Assert + assert job_result is mock_job + assert mock_job_api.call_count == 4 + assert mock_job_api.call_args_list[0][0] == ('test_fg_2_offline_fg_materialization',) + assert mock_job_api.call_args_list[1][0] == ('test_fg_2_offline_fg_backfill',) + assert mock_job_api.call_args_list[2][0] == ('test_fg_2_offline_fg_backfill',) + assert mock_job_api.call_args_list[3][0] == ('test_fg_2_offline_fg_backfill',) + + def test_materialization_job_retry_fail(self, mocker): + # Arrange + mocker.patch("time.sleep") + + mock_response_not_found = mocker.Mock() + mock_response_not_found.status_code = 404 + + mock_job_api = mocker.patch( + "hsfs.core.job_api.JobApi.get", side_effect=RestAPIError("", mock_response_not_found) + ) + + fg = feature_group.FeatureGroup( + name="test_fg", + version=2, + featurestore_id=99, + primary_key=[], + partition_key=[], + id=10, + ) + + # Act + with pytest.raises(FeatureStoreException) as e_info: + fg.materialization_job + + # Assert + assert mock_job_api.call_count == 6 + assert str(e_info.value) == "No materialization job was found" + def test_multi_part_insert_return_writer(self, mocker): fg = feature_group.FeatureGroup( name="test_fg", From 33de4f7ded971bbb539d3cc66e91322b676fc70e Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Oct 2023 15:37:50 +0200 Subject: [PATCH 2/3] Update test_feature_group.py --- python/tests/test_feature_group.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index ee4ad33776..0850430754 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -395,11 +395,13 @@ def test_materialization_job_retry_success(self, mocker): mock_job = mocker.Mock() mock_job_api = mocker.patch( - "hsfs.core.job_api.JobApi.get", side_effect=[ + "hsfs.core.job_api.JobApi.get", + side_effect=[ RestAPIError("", mock_response_job_not_found), RestAPIError("", mock_response_not_found), RestAPIError("", mock_response_not_found), - mock_job] + mock_job, + ], ) fg = feature_group.FeatureGroup( @@ -417,10 +419,12 @@ def test_materialization_job_retry_success(self, mocker): # Assert assert job_result is mock_job assert mock_job_api.call_count == 4 - assert mock_job_api.call_args_list[0][0] == ('test_fg_2_offline_fg_materialization',) - assert mock_job_api.call_args_list[1][0] == ('test_fg_2_offline_fg_backfill',) - assert mock_job_api.call_args_list[2][0] == ('test_fg_2_offline_fg_backfill',) - assert mock_job_api.call_args_list[3][0] == ('test_fg_2_offline_fg_backfill',) + assert mock_job_api.call_args_list[0][0] == ( + "test_fg_2_offline_fg_materialization", + ) + assert mock_job_api.call_args_list[1][0] == ("test_fg_2_offline_fg_backfill",) + assert mock_job_api.call_args_list[2][0] == ("test_fg_2_offline_fg_backfill",) + assert mock_job_api.call_args_list[3][0] == ("test_fg_2_offline_fg_backfill",) def test_materialization_job_retry_fail(self, mocker): # Arrange @@ -430,7 +434,8 @@ def test_materialization_job_retry_fail(self, mocker): mock_response_not_found.status_code = 404 mock_job_api = mocker.patch( - "hsfs.core.job_api.JobApi.get", side_effect=RestAPIError("", mock_response_not_found) + "hsfs.core.job_api.JobApi.get", + side_effect=RestAPIError("", mock_response_not_found), ) fg = feature_group.FeatureGroup( From da3d194de19031551612bcf91806c8b671322b79 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Oct 2023 11:18:48 +0200 Subject: [PATCH 3/3] Update feature_group.py --- python/hsfs/feature_group.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index fa7fd38d49..9c7d99194f 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2606,10 +2606,8 @@ def materialization_job(self): except RestAPIError as e: if e.response.status_code == 404: if e.response.json().get("errorCode", "") == 130009: - print(1111) break # no need to retry, since no such job exists else: - print(2222) time.sleep(1) # backoff and then retry continue raise e