Commit

Merge branch 'master' into FSTORE-916
bubriks authored Nov 1, 2023
2 parents da3d194 + 9686480 commit aae93eb
Showing 5 changed files with 66 additions and 18 deletions.
5 changes: 5 additions & 0 deletions python/hsfs/core/arrow_flight_client.py
@@ -40,6 +40,11 @@ def get_instance():
return _arrow_flight_instance


def close():
global _arrow_flight_instance
_arrow_flight_instance = None


class ArrowFlightClient:
SUPPORTED_FORMATS = ["parquet"]
SUPPORTED_EXTERNAL_CONNECTORS = [
58 changes: 41 additions & 17 deletions python/hsfs/core/vector_server.py
@@ -48,6 +48,7 @@ def __init__(
self._prepared_statement_engine = None
self._prepared_statements = None
self._serving_keys = serving_keys
self._valid_serving_key = None
self._pkname_by_serving_index = None
self._prefix_by_serving_index = None
self._serving_key_by_serving_index = {}
@@ -181,13 +182,17 @@ def init_prepared_statement(self, entity, batch, external, options=None):
)
# get schemas for complex features once
self._complex_features = self.get_complex_feature_schemas()
self._valid_serving_key = set(
[sk.feature_name for sk in self._serving_keys]
+ [sk.required_serving_key for sk in self._serving_keys]
)

def _validate_serving_key(self, entry):
for key in entry:
if key not in self.serving_keys:
if key not in self._valid_serving_key:
raise ValueError(
f"'{key}' is not a correct serving key. Expect one of the"
f" followings: [{', '.join(self.serving_keys)}]"
f" followings: [{', '.join(self._valid_serving_key)}]"
)

def get_feature_vector(
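For illustration, a minimal standalone sketch of the validation pattern introduced above. `ServingKey` here is a hypothetical stand-in for hsfs' serving-key objects, modeling only the two attributes used in this hunk:

from collections import namedtuple

# Hypothetical stand-in for hsfs' serving-key objects; only the two attributes
# used in the hunk above are modeled.
ServingKey = namedtuple("ServingKey", ["feature_name", "required_serving_key"])

serving_keys = [ServingKey("id", "fg1_id"), ServingKey("ts", "fg2_ts")]

# Accept either the collision-adjusted required serving key or the plain feature name.
valid_serving_key = set(
    [sk.feature_name for sk in serving_keys]
    + [sk.required_serving_key for sk in serving_keys]
)

def validate(entry):
    for key in entry:
        if key not in valid_serving_key:
            raise ValueError(f"'{key}' is not a valid serving key.")

validate({"id": 1, "fg2_ts": 2})  # passes: mixes both naming styles
# validate({"unknown": 3})        # would raise ValueError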
@@ -212,11 +217,15 @@
next_statement = False
for sk in self._serving_key_by_serving_index[prepared_statement_index]:
if sk.required_serving_key not in entry.keys():
# User did not provide the necessary serving keys, we expect they have
# provided the necessary features as passed_features.
# We are going to check later if this is true
next_statement = True
break
# Check if there is any entry matched with feature name.
if sk.feature_name in entry.keys():
pk_entry[sk.feature_name] = entry[sk.feature_name]
else:
# The user did not provide the necessary serving keys; we expect they have
# provided the necessary features as passed_features.
# We will check later whether this is true.
next_statement = True
break
else:
pk_entry[sk.feature_name] = entry[sk.required_serving_key]
if next_statement:
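A simplified, runnable sketch of the lookup fallback above (again with a hypothetical `ServingKey` stand-in, not the hsfs implementation):

from collections import namedtuple

ServingKey = namedtuple("ServingKey", ["feature_name", "required_serving_key"])  # hypothetical stand-in

def build_pk_entry(entry, serving_keys):
    # Prefer the required (prefixed) serving key, fall back to the plain feature
    # name, otherwise signal that the value must come from passed_features.
    pk_entry = {}
    for sk in serving_keys:
        if sk.required_serving_key in entry:
            pk_entry[sk.feature_name] = entry[sk.required_serving_key]
        elif sk.feature_name in entry:
            pk_entry[sk.feature_name] = entry[sk.feature_name]
        else:
            return None
    return pk_entry

keys = [ServingKey("id", "fg1_id")]
print(build_pk_entry({"fg1_id": 1}, keys))  # {'id': 1} via the required serving key
print(build_pk_entry({"id": 1}, keys))      # {'id': 1} via the feature-name fallback
print(build_pk_entry({}, keys))             # None: the value must arrive via passed_features

Returning None here plays the role of `next_statement = True` above: the value is expected from passed_features and is checked later.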
@@ -275,7 +284,12 @@ def get_feature_vectors(
map(
lambda e: tuple(
[
e.get(sk.required_serving_key)
(
e.get(sk.required_serving_key)
# Check if there is any entry matched with feature name,
# if the required serving key is not provided.
or e.get(sk.feature_name)
)
for sk in self._serving_key_by_serving_index[
prepared_statement_index
]
@@ -290,11 +304,8 @@
).fetchall()

statement_results = {}
serving_keys = [
sk.required_serving_key
for sk in self._serving_key_by_serving_index[
prepared_statement_index
]
serving_keys = self._serving_key_by_serving_index[
prepared_statement_index
]
# Use the prefix from the prepared statement because the prefix from the serving key is collision-adjusted.
prefix_features = [
@@ -320,7 +331,7 @@
for i, entry in enumerate(entries):
batch_results[i].update(
statement_results.get(
self._get_result_key(serving_keys, entry), {}
self._get_result_key_serving_key(serving_keys, entry), {}
)
)
# apply passed features to each batch result
@@ -400,10 +411,13 @@ def _generate_vector(self, result_dict, fill_na=False):
vector.append(None)
else:
raise Exception(
f"Feature {feature_name} is missing from vector"
" because there is no match in the given entry."
f"Feature '{feature_name}' is missing from vector."
"Possible reasons: "
"1. There is no match in the given entry."
" Please check if the entry exists in the online feature store"
" or provide the feature as passed_feature."
" or provide the feature as passed_feature. "
f"2. Required entries [{', '.join(self.serving_keys)}] or "
f"[{', '.join(set(sk.feature_name for sk in self._serving_keys))}] are not provided."
)
else:
vector.append(result_dict[feature_name])
@@ -425,6 +439,16 @@ def _get_result_key(primary_keys, result_dict):
result_key.append(result_dict.get(pk))
return tuple(result_key)

@staticmethod
def _get_result_key_serving_key(serving_keys, result_dict):
result_key = []
for sk in serving_keys:
result_key.append(
result_dict.get(sk.required_serving_key)
or result_dict.get(sk.feature_name)
)
return tuple(result_key)

@staticmethod
def _parametrize_query(name, query_online):
# Now that we have ordered pk_names, iterate over them and replace `?` with `:feature_name` one by one.
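For context, a runnable sketch of how the new `_get_result_key_serving_key` helper lets batch results be matched back to entries written in either naming style (hypothetical stand-ins, not the hsfs classes):

from collections import namedtuple

ServingKey = namedtuple("ServingKey", ["feature_name", "required_serving_key"])  # hypothetical stand-in

def get_result_key_serving_key(serving_keys, result_dict):
    # Same fallback as the helper above: required serving key first, then feature name.
    return tuple(
        result_dict.get(sk.required_serving_key) or result_dict.get(sk.feature_name)
        for sk in serving_keys
    )

keys = [ServingKey("id", "fg1_id"), ServingKey("ts", "fg2_ts")]
# Entries written in either naming style resolve to the same lookup key.
assert get_result_key_serving_key(keys, {"fg1_id": 1, "fg2_ts": 2}) == (1, 2)
assert get_result_key_serving_key(keys, {"id": 1, "ts": 2}) == (1, 2)

Note that the `or` fallback treats falsy values such as 0 or an empty string like missing keys, so both the helper and this sketch fall through to the feature name in that case.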
2 changes: 2 additions & 0 deletions python/hsfs/engine/__init__.py
@@ -16,6 +16,7 @@

from hsfs.engine import spark
from hsfs.client import exceptions
from hsfs.core import arrow_flight_client

_engine = None
_engine_type = None
@@ -73,3 +74,4 @@ def get_type():
def stop():
global _engine
_engine = None
arrow_flight_client.close()
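For context, a minimal sketch of the module-level singleton reset that this wiring relies on (simplified; not the actual hsfs modules):

# Simplified module-level singleton, mirroring arrow_flight_client:
_instance = None

def get_instance():
    global _instance
    if _instance is None:
        _instance = object()  # stand-in for constructing an ArrowFlightClient
    return _instance

def close():
    # Dropping the reference means the next get_instance() call builds a fresh client.
    global _instance
    _instance = None

first = get_instance()
close()  # this is what engine.stop() now triggers via arrow_flight_client.close()
assert get_instance() is not first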
11 changes: 10 additions & 1 deletion python/hsfs/feature_view.py
@@ -397,6 +397,8 @@ def get_feature_vector(
# Arguments
entry: dictionary of feature group primary key and values provided by serving application.
Set of required primary keys is [`feature_view.primary_keys`](#primary_keys)
If a required primary key is not provided, the name of the primary key
in the feature group is looked up in the entry instead.
passed_features: dictionary of feature values provided by the application at runtime.
They can replace features values fetched from the feature store as well as
providing feature values which are not available in the feature store.
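As an illustration of the docstring above, a hypothetical call site; the feature view name, serving-key prefix, and values are made up and not taken from this commit:

# fv = fs.get_feature_view("transactions_view", version=1)  # hypothetical feature view
# fv.init_serving()
# Either the required (possibly prefixed) serving key ...
# vector = fv.get_feature_vector({"fg1_customer_id": 42})
# ... or the plain primary key name of the feature group is now accepted:
# vector = fv.get_feature_vector({"customer_id": 42})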
@@ -486,6 +488,8 @@ def get_feature_vectors(
# Arguments
entry: a list of dictionary of feature group primary key and values provided by serving application.
Set of required primary keys is [`feature_view.primary_keys`](#primary_keys)
If a required primary key is not provided, the name of the primary key
in the feature group is looked up in the entry instead.
passed_features: a list of dictionary of feature values provided by the application at runtime.
They can replace features values fetched from the feature store as well as
providing feature values which are not available in the feature store.
@@ -2566,7 +2570,12 @@ def schema(self, features):

@property
def primary_keys(self):
"""Set of primary key names that is required as keys in input dict object for `get_feature_vector(s)` method."""
"""Set of primary key names that is required as keys in input dict object
for [`get_feature_vector(s)`](#get_feature_vector) method.
When there are duplicated primary key names and prefix is not defined in the query,
prefix is generated and prepended to the primary key name in this format
"fgId_{feature_group_id}_{join_index}" where `join_index` is the order of the join.
"""
_vector_server = self._single_vector_server or self._batch_vectors_server
if _vector_server:
return _vector_server.serving_keys
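A small illustration of the generated prefix format mentioned in the docstring; the feature group id and join index are hypothetical:

feature_group_id, join_index = 13, 1  # hypothetical values
generated_prefix = f"fgId_{feature_group_id}_{join_index}"
print(generated_prefix)  # -> fgId_13_1, prepended to the duplicated primary key name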
@@ -79,6 +79,14 @@ public static void main(String[] args) throws Exception {
.hasArg()
.build());

// This option is not used here; however, it is needed to make the deltastreamer job
// work with the Hopsworks scheduler, which adds the `start_time` option
options.addOption(Option.builder("start_time")
.argName("start_time")
.required(false)
.hasArg()
.build());

CommandLineParser parser = new DefaultParser();
CommandLine commandLine = parser.parse(options, args);

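The same idea expressed as a runnable Python sketch (an argparse analogue, not the project's Java code): the job accepts a scheduler-supplied argument that it never reads, so argument parsing does not fail when the scheduler appends it.

import argparse

parser = argparse.ArgumentParser()
# Accept the scheduler-supplied argument even though the job itself never reads it,
# so that parsing does not fail when the scheduler appends it.
parser.add_argument("--start_time", required=False)
args = parser.parse_args(["--start_time", "2023-11-01T00:00:00"])
print(args.start_time)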
