Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1061] Add documentation for get_feature_vector and fallback to use raw feature name #1138

Merged
merged 2 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions python/hsfs/core/vector_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(
self._prepared_statement_engine = None
self._prepared_statements = None
self._serving_keys = serving_keys
self._valid_serving_key = None
self._pkname_by_serving_index = None
self._prefix_by_serving_index = None
self._serving_key_by_serving_index = {}
Expand Down Expand Up @@ -181,13 +182,17 @@ def init_prepared_statement(self, entity, batch, external, options=None):
)
# get schemas for complex features once
self._complex_features = self.get_complex_feature_schemas()
self._valid_serving_key = set(
[sk.feature_name for sk in self._serving_keys]
+ [sk.required_serving_key for sk in self._serving_keys]
)

def _validate_serving_key(self, entry):
for key in entry:
if key not in self.serving_keys:
if key not in self._valid_serving_key:
raise ValueError(
f"'{key}' is not a correct serving key. Expect one of the"
f" followings: [{', '.join(self.serving_keys)}]"
f" followings: [{', '.join(self._valid_serving_key)}]"
)

def get_feature_vector(
Expand All @@ -212,11 +217,15 @@ def get_feature_vector(
next_statement = False
for sk in self._serving_key_by_serving_index[prepared_statement_index]:
if sk.required_serving_key not in entry.keys():
# User did not provide the necessary serving keys, we expect they have
# provided the necessary features as passed_features.
# We are going to check later if this is true
next_statement = True
break
# Check if there is any entry matched with feature name.
if sk.feature_name in entry.keys():
pk_entry[sk.feature_name] = entry[sk.feature_name]
else:
# User did not provide the necessary serving keys, we expect they have
# provided the necessary features as passed_features.
# We are going to check later if this is true
next_statement = True
break
else:
pk_entry[sk.feature_name] = entry[sk.required_serving_key]
if next_statement:
Expand Down Expand Up @@ -275,7 +284,12 @@ def get_feature_vectors(
map(
lambda e: tuple(
[
e.get(sk.required_serving_key)
(
e.get(sk.required_serving_key)
# Check if there is any entry matched with feature name,
# if the required serving key is not provided.
or e.get(sk.feature_name)
)
for sk in self._serving_key_by_serving_index[
prepared_statement_index
]
Expand All @@ -290,11 +304,8 @@ def get_feature_vectors(
).fetchall()

statement_results = {}
serving_keys = [
sk.required_serving_key
for sk in self._serving_key_by_serving_index[
prepared_statement_index
]
serving_keys = self._serving_key_by_serving_index[
prepared_statement_index
]
# Use prefix from prepare statement because prefix from serving key is collision adjusted.
prefix_features = [
Expand All @@ -320,7 +331,7 @@ def get_feature_vectors(
for i, entry in enumerate(entries):
batch_results[i].update(
statement_results.get(
self._get_result_key(serving_keys, entry), {}
self._get_result_key_serving_key(serving_keys, entry), {}
)
)
# apply passed features to each batch result
Expand Down Expand Up @@ -400,10 +411,13 @@ def _generate_vector(self, result_dict, fill_na=False):
vector.append(None)
else:
raise Exception(
f"Feature {feature_name} is missing from vector"
" because there is no match in the given entry."
f"Feature '{feature_name}' is missing from vector."
"Possible reasons: "
"1. There is no match in the given entry."
" Please check if the entry exists in the online feature store"
" or provide the feature as passed_feature."
" or provide the feature as passed_feature. "
f"2. Required entries [{', '.join(self.serving_keys)}] or "
f"[{', '.join(set(sk.feature_name for sk in self._serving_keys))}] are not provided."
)
else:
vector.append(result_dict[feature_name])
Expand All @@ -425,6 +439,16 @@ def _get_result_key(primary_keys, result_dict):
result_key.append(result_dict.get(pk))
return tuple(result_key)

@staticmethod
def _get_result_key_serving_key(serving_keys, result_dict):
    """Build the lookup key of an entry for one prepared statement.

    For every serving key the value is read from `result_dict` under the
    serving key's `required_serving_key`; when that value is missing
    (absent or `None`), the value stored under the serving key's
    `feature_name` is used instead.

    The fallback is decided with an explicit `is None` check rather than
    `or`, so that legitimate falsy key values such as `0`, `""` or `False`
    supplied under the required serving key are kept instead of being
    silently replaced by the feature-name value (or by `None`).

    # Arguments
        serving_keys: iterable of objects exposing the attributes
            `required_serving_key` and `feature_name`.
        result_dict: dictionary mapping key names to values.

    # Returns
        `tuple` with one value per serving key, in the order of
        `serving_keys`; `None` where neither name yields a value.
    """
    result_key = []
    for sk in serving_keys:
        value = result_dict.get(sk.required_serving_key)
        if value is None:
            # Required serving key not provided: fall back to the raw
            # feature name.
            value = result_dict.get(sk.feature_name)
        result_key.append(value)
    return tuple(result_key)

@staticmethod
def _parametrize_query(name, query_online):
# Now we have ordered pk_names, iterate over it and replace `?` with `:feature_name` one by one.
Expand Down
11 changes: 10 additions & 1 deletion python/hsfs/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ def get_feature_vector(
# Arguments
entry: dictionary of feature group primary key and values provided by serving application.
Set of required primary keys is [`feature_view.primary_keys`](#primary_keys)
If a required primary key is not provided, the entry is searched for the name
of that primary key in its feature group instead.
passed_features: dictionary of feature values provided by the application at runtime.
They can replace features values fetched from the feature store as well as
providing feature values which are not available in the feature store.
Expand Down Expand Up @@ -486,6 +488,8 @@ def get_feature_vectors(
# Arguments
entry: a list of dictionary of feature group primary key and values provided by serving application.
Set of required primary keys is [`feature_view.primary_keys`](#primary_keys)
If a required primary key is not provided, the entry is searched for the name
of that primary key in its feature group instead.
passed_features: a list of dictionary of feature values provided by the application at runtime.
They can replace features values fetched from the feature store as well as
providing feature values which are not available in the feature store.
Expand Down Expand Up @@ -2566,7 +2570,12 @@ def schema(self, features):

@property
def primary_keys(self):
"""Set of primary key names that is required as keys in input dict object for `get_feature_vector(s)` method."""
"""Set of primary key names that are required as keys in the input dict object
for the [`get_feature_vector(s)`](#get_feature_vector) method.
When there are duplicated primary key names and no prefix is defined in the query,
a prefix is generated and prepended to the primary key name in the format
"fgId_{feature_group_id}_{join_index}", where `join_index` is the order of the join.
"""
_vector_server = self._single_vector_server or self._batch_vectors_server
if _vector_server:
return _vector_server.serving_keys
Expand Down