From e15c2faa1dfc089667e2e52f58d067e29781cfc2 Mon Sep 17 00:00:00 2001 From: Steffen Grohsschmiedt Date: Fri, 20 Oct 2023 13:58:05 +0200 Subject: [PATCH 1/3] [FSTORE-1050] Reset arrow_flight_client on close (#1135) --- python/hsfs/core/arrow_flight_client.py | 5 +++++ python/hsfs/engine/__init__.py | 2 ++ 2 files changed, 7 insertions(+) diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 9816d7d072..b7e62f9698 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -40,6 +40,11 @@ def get_instance(): return _arrow_flight_instance +def close(): + global _arrow_flight_instance + _arrow_flight_instance = None + + class ArrowFlightClient: SUPPORTED_FORMATS = ["parquet"] SUPPORTED_EXTERNAL_CONNECTORS = [ diff --git a/python/hsfs/engine/__init__.py b/python/hsfs/engine/__init__.py index 204dcee188..bc05346f69 100644 --- a/python/hsfs/engine/__init__.py +++ b/python/hsfs/engine/__init__.py @@ -16,6 +16,7 @@ from hsfs.engine import spark from hsfs.client import exceptions +from hsfs.core import arrow_flight_client _engine = None _engine_type = None @@ -73,3 +74,4 @@ def get_type(): def stop(): global _engine _engine = None + arrow_flight_client.close() From 3d6237c6a9cdc66cd4623a84f3f1c5a78781c0f7 Mon Sep 17 00:00:00 2001 From: kennethmhc Date: Wed, 25 Oct 2023 09:30:02 +0200 Subject: [PATCH 2/3] [FSTORE-1061] Add documentation for get_feature_vector and fallback to use raw feature name (#1138) * add doc and primary key fallback * fix style --- python/hsfs/core/vector_server.py | 58 ++++++++++++++++++++++--------- python/hsfs/feature_view.py | 11 +++++- 2 files changed, 51 insertions(+), 18 deletions(-) diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 7054149dad..6cba795f6c 100644 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -48,6 +48,7 @@ def __init__( self._prepared_statement_engine = None 
self._prepared_statements = None self._serving_keys = serving_keys + self._valid_serving_key = None self._pkname_by_serving_index = None self._prefix_by_serving_index = None self._serving_key_by_serving_index = {} @@ -181,13 +182,17 @@ def init_prepared_statement(self, entity, batch, external, options=None): ) # get schemas for complex features once self._complex_features = self.get_complex_feature_schemas() + self._valid_serving_key = set( + [sk.feature_name for sk in self._serving_keys] + + [sk.required_serving_key for sk in self._serving_keys] + ) def _validate_serving_key(self, entry): for key in entry: - if key not in self.serving_keys: + if key not in self._valid_serving_key: raise ValueError( f"'{key}' is not a correct serving key. Expect one of the" - f" followings: [{', '.join(self.serving_keys)}]" + f" followings: [{', '.join(self._valid_serving_key)}]" ) def get_feature_vector( @@ -212,11 +217,15 @@ def get_feature_vector( next_statement = False for sk in self._serving_key_by_serving_index[prepared_statement_index]: if sk.required_serving_key not in entry.keys(): - # User did not provide the necessary serving keys, we expect they have - # provided the necessary features as passed_features. - # We are going to check later if this is true - next_statement = True - break + # Check if there is any entry matched with feature name. + if sk.feature_name in entry.keys(): + pk_entry[sk.feature_name] = entry[sk.feature_name] + else: + # User did not provide the necessary serving keys, we expect they have + # provided the necessary features as passed_features. + # We are going to check later if this is true + next_statement = True + break else: pk_entry[sk.feature_name] = entry[sk.required_serving_key] if next_statement: @@ -275,7 +284,12 @@ def get_feature_vectors( map( lambda e: tuple( [ - e.get(sk.required_serving_key) + ( + e.get(sk.required_serving_key) + # Check if there is any entry matched with feature name, + # if the required serving key is not provided. 
+ or e.get(sk.feature_name) + ) for sk in self._serving_key_by_serving_index[ prepared_statement_index ] @@ -290,11 +304,8 @@ def get_feature_vectors( ).fetchall() statement_results = {} - serving_keys = [ - sk.required_serving_key - for sk in self._serving_key_by_serving_index[ - prepared_statement_index - ] + serving_keys = self._serving_key_by_serving_index[ + prepared_statement_index ] # Use prefix from prepare statement because prefix from serving key is collision adjusted. prefix_features = [ @@ -320,7 +331,7 @@ def get_feature_vectors( for i, entry in enumerate(entries): batch_results[i].update( statement_results.get( - self._get_result_key(serving_keys, entry), {} + self._get_result_key_serving_key(serving_keys, entry), {} ) ) # apply passed features to each batch result @@ -400,10 +411,13 @@ def _generate_vector(self, result_dict, fill_na=False): vector.append(None) else: raise Exception( - f"Feature {feature_name} is missing from vector" - " because there is no match in the given entry." + f"Feature '{feature_name}' is missing from vector." + "Possible reasons: " + "1. There is no match in the given entry." " Please check if the entry exists in the online feature store" - " or provide the feature as passed_feature." + " or provide the feature as passed_feature. " + f"2. Required entries [{', '.join(self.serving_keys)}] or " + f"[{', '.join(set(sk.feature_name for sk in self._serving_keys))}] are not provided." 
) else: vector.append(result_dict[feature_name]) @@ -425,6 +439,16 @@ def _get_result_key(primary_keys, result_dict): result_key.append(result_dict.get(pk)) return tuple(result_key) + @staticmethod + def _get_result_key_serving_key(serving_keys, result_dict): + result_key = [] + for sk in serving_keys: + result_key.append( + result_dict.get(sk.required_serving_key) + or result_dict.get(sk.feature_name) + ) + return tuple(result_key) + @staticmethod def _parametrize_query(name, query_online): # Now we have ordered pk_names, iterate over it and replace `?` with `:feature_name` one by one. diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index b5c5c079b3..05c0d36ee8 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -397,6 +397,8 @@ def get_feature_vector( # Arguments entry: dictionary of feature group primary key and values provided by serving application. Set of required primary keys is [`feature_view.primary_keys`](#primary_keys) + If the required primary keys is not provided, it will look for name + of the primary key in feature group in the entry. passed_features: dictionary of feature values provided by the application at runtime. They can replace features values fetched from the feature store as well as providing feature values which are not available in the feature store. @@ -486,6 +488,8 @@ def get_feature_vectors( # Arguments entry: a list of dictionary of feature group primary key and values provided by serving application. Set of required primary keys is [`feature_view.primary_keys`](#primary_keys) + If the required primary keys is not provided, it will look for name + of the primary key in feature group in the entry. passed_features: a list of dictionary of feature values provided by the application at runtime. They can replace features values fetched from the feature store as well as providing feature values which are not available in the feature store. 
@@ -2566,7 +2570,12 @@ def schema(self, features): @property def primary_keys(self): - """Set of primary key names that is required as keys in input dict object for `get_feature_vector(s)` method.""" + """Set of primary key names that is required as keys in input dict object + for [`get_feature_vector(s)`](#get_feature_vector) method. + When there are duplicated primary key names and prefix is not defined in the query, + prefix is generated and prepended to the primary key name in this format + "fgId_{feature_group_id}_{join_index}" where `join_index` is the order of the join. + """ _vector_server = self._single_vector_server or self._batch_vectors_server if _vector_server: return _vector_server.serving_keys From 9686480f736990a6d71c2062b4ec998f7a876c11 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Fri, 27 Oct 2023 18:24:49 -0500 Subject: [PATCH 3/3] =?UTF-8?q?[FSTORE-1062]=20Materialisation=20job=20doe?= =?UTF-8?q?s=20not=20work=20with=20new=20Hopsworks=20sc=E2=80=A6=20(#1136)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/logicalclocks/utils/MainClass.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java b/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java index 65d0a7c4ea..c267309145 100644 --- a/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java +++ b/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java @@ -79,6 +79,14 @@ public static void main(String[] args) throws Exception { .hasArg() .build()); + // This option here is not used, however it's here to make the deltastreamer job + // working with the Hopsworks scheduler which adds the `start_time` option + options.addOption(Option.builder("start_time") + .argName("start_time") + .required(false) + .hasArg() + .build()); + CommandLineParser parser = new DefaultParser(); CommandLine commandLine = parser.parse(options, args);