Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-916] FeatureGroup.insert does not retry in case of a failure #1134

Merged
merged 4 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions python/hsfs/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#

import copy
import time

from hsfs.ge_validation_result import ValidationResult
import humps
import json
Expand Down Expand Up @@ -2590,23 +2592,26 @@ def parents(self):
def materialization_job(self):
    """Get the Job object reference for the materialization job for this
    Feature Group.

    The job is looked up by name, first with the `materialization` suffix
    and then with the `backfill` suffix. A lookup that fails with a plain
    404 is retried a few times with a short pause, while a 404 carrying
    error code 130009 (no such job exists) moves straight on to the next
    suffix. The first job found is cached on the instance and returned on
    subsequent accesses.

    # Returns
        The job object returned by `JobApi.get`.

    # Raises
        `RestAPIError`: If the lookup fails with a status other than 404.
        `FeatureStoreException`: If no job was found under either name.
    """
    if self._materialization_job is not None:
        return self._materialization_job

    max_attempts = 3
    backoff_seconds = 1
    job_not_found_error_code = 130009

    feature_group_name = util.feature_group_name(self)
    for job_suffix in ("materialization", "backfill"):
        job_name = "{}_offline_fg_{}".format(feature_group_name, job_suffix)
        for attempt in range(max_attempts):
            try:
                self._materialization_job = job_api.JobApi().get(job_name)
                return self._materialization_job
            except RestAPIError as e:
                if e.response.status_code != 404:
                    # Not a "not found" answer: surface it unchanged.
                    raise
                if e.response.json().get("errorCode", "") == job_not_found_error_code:
                    # No need to retry, since no such job exists.
                    break
                if attempt < max_attempts - 1:
                    # Back off and then retry; sleeping after the final
                    # attempt would only delay the next step.
                    time.sleep(backoff_seconds)

    raise FeatureStoreException("No materialization job was found")

@description.setter
def description(self, new_description):
Expand Down
76 changes: 75 additions & 1 deletion python/tests/test_feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
feature_group_writer,
)
from hsfs.engine import python
from hsfs.client.exceptions import FeatureStoreException
from hsfs.client.exceptions import FeatureStoreException, RestAPIError
import pytest
import warnings

Expand Down Expand Up @@ -381,6 +381,80 @@ def test_materialization_job(self, mocker):
mock_job_api.assert_called_once
assert fg.materialization_job == mock_job

def test_materialization_job_retry_success(self, mocker):
    """The job is returned once a retried lookup finally succeeds.

    The first lookup answers 404 with error code 130009, the next two
    answer a plain 404, and the fourth returns the job.
    """
    # Arrange
    mocker.patch("time.sleep")

    job_missing_response = mocker.Mock()
    job_missing_response.status_code = 404
    job_missing_response.json.return_value = {"errorCode": 130009}

    plain_404_response = mocker.Mock()
    plain_404_response.status_code = 404

    expected_job = mocker.Mock()

    failures = [
        RestAPIError("", response)
        for response in (
            job_missing_response,
            plain_404_response,
            plain_404_response,
        )
    ]
    get_job = mocker.patch(
        "hsfs.core.job_api.JobApi.get",
        side_effect=failures + [expected_job],
    )

    fg = feature_group.FeatureGroup(
        name="test_fg",
        version=2,
        featurestore_id=99,
        primary_key=[],
        partition_key=[],
        id=10,
    )

    # Act
    job_result = fg.materialization_job

    # Assert
    assert job_result is expected_job
    assert get_job.call_count == 4
    # Positional arguments of each lookup, in call order.
    requested = [recorded[0] for recorded in get_job.call_args_list]
    assert requested == [
        ("test_fg_2_offline_fg_materialization",),
        ("test_fg_2_offline_fg_backfill",),
        ("test_fg_2_offline_fg_backfill",),
        ("test_fg_2_offline_fg_backfill",),
    ]

def test_materialization_job_retry_fail(self, mocker):
    """A FeatureStoreException is raised when every lookup answers 404."""
    # Arrange
    mocker.patch("time.sleep")

    plain_404_response = mocker.Mock(status_code=404)
    always_not_found = RestAPIError("", plain_404_response)
    get_job = mocker.patch(
        "hsfs.core.job_api.JobApi.get",
        side_effect=always_not_found,
    )

    fg = feature_group.FeatureGroup(
        name="test_fg",
        version=2,
        featurestore_id=99,
        primary_key=[],
        partition_key=[],
        id=10,
    )

    # Act
    with pytest.raises(FeatureStoreException) as e_info:
        fg.materialization_job

    # Assert
    expected_message = "No materialization job was found"
    assert get_job.call_count == 6
    assert str(e_info.value) == expected_message

def test_multi_part_insert_return_writer(self, mocker):
fg = feature_group.FeatureGroup(
name="test_fg",
Expand Down
Loading