diff --git a/python/hsfs/core/feature_view_api.py b/python/hsfs/core/feature_view_api.py index 73212e1a6f..68ce4f8edd 100644 --- a/python/hsfs/core/feature_view_api.py +++ b/python/hsfs/core/feature_view_api.py @@ -128,6 +128,10 @@ def get_batch_query( end_time, training_dataset_version=None, with_label=False, + primary_keys=False, + event_time=False, + inference_helper_columns=False, + training_helper_columns=False, is_python_engine=False, ): path = self._base_path + [ @@ -145,13 +149,19 @@ def get_batch_query( "start_time": start_time, "end_time": end_time, "with_label": with_label, + "with_primary_keys": primary_keys, + "with_event_time": event_time, + "inference_helper_columns": inference_helper_columns, + "training_helper_columns": training_helper_columns, "is_hive_engine": is_python_engine, "td_version": training_dataset_version, }, ) ) - def get_serving_prepared_statement(self, name, version, batch): + def get_serving_prepared_statement( + self, name, version, batch, inference_helper_columns + ): path = self._base_path + [ name, self._VERSION, @@ -159,7 +169,10 @@ def get_serving_prepared_statement(self, name, version, batch): self._PREPARED_STATEMENT, ] headers = {"content-type": "application/json"} - query_params = {"batch": batch} + query_params = { + "batch": batch, + "inference_helper_columns": inference_helper_columns, + } return serving_prepared_statement.ServingPreparedStatement.from_response_json( self._client._send_request("GET", path, query_params, headers=headers) ) diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 7619c5509e..95cdb03f50 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -79,6 +79,36 @@ def save(self, feature_view_obj): featuregroup=featuregroup, ) ) + if feature_view_obj.inference_helper_columns: + for helper_column_name in feature_view_obj.inference_helper_columns: + ( + feature, + prefix, + featuregroup, + ) = feature_view_obj.query._get_feature_by_name(helper_column_name) + feature_view_obj._features.append( + training_dataset_feature.TrainingDatasetFeature( + name=feature.name, + inference_helper_column=True, + featuregroup=featuregroup, + ) + ) + + if feature_view_obj.training_helper_columns: + for helper_column_name in feature_view_obj.training_helper_columns: + ( + feature, + prefix, + featuregroup, + ) = feature_view_obj.query._get_feature_by_name(helper_column_name) + feature_view_obj._features.append( + training_dataset_feature.TrainingDatasetFeature( + name=feature.name, + training_helper_column=True, + featuregroup=featuregroup, + ) + ) + self._transformation_function_engine.attach_transformation_fn(feature_view_obj) updated_fv = self._feature_view_api.post(feature_view_obj) self.attach_transformation_function(updated_fv) @@ -124,6 +154,10 @@ def get_batch_query( start_time, end_time, with_label=False, + primary_keys=False, + event_time=False, + inference_helper_columns=False, + training_helper_columns=False, training_dataset_version=None, spine=None, ): @@ -136,6 +170,10 @@ def get_batch_query( training_dataset_version=training_dataset_version, is_python_engine=engine.get_type() == "python", with_label=with_label, + primary_keys=primary_keys, + event_time=event_time, + inference_helper_columns=inference_helper_columns, + training_helper_columns=training_helper_columns, ) # verify whatever is passed 1. spine group with dataframe contained, or 2. 
dataframe # the schema has to be consistent @@ -214,7 +252,14 @@ def get_attached_transformation_fn(self, name, version): return transformation_functions_dict def create_training_dataset( - self, feature_view_obj, training_dataset_obj, user_write_options, spine=None + self, + feature_view_obj, + training_dataset_obj, + user_write_options, + spine=None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): self._set_event_time(feature_view_obj, training_dataset_obj) updated_instance = self._create_training_data_metadata( @@ -225,6 +270,9 @@ def create_training_dataset( user_write_options, training_dataset_obj=training_dataset_obj, spine=spine, + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, ) return updated_instance, td_job @@ -236,6 +284,9 @@ def get_training_data( training_dataset_obj=None, training_dataset_version=None, spine=None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): # check if provided td version has already existed. if training_dataset_version: @@ -265,7 +316,18 @@ def get_training_data( if td_updated.training_dataset_type != td_updated.IN_MEMORY: split_df = self._read_from_storage_connector( - td_updated, td_updated.splits, read_options, feature_view_obj.schema + td_updated, + td_updated.splits, + read_options, + with_primary_keys=primary_keys, + primary_keys=self._get_primary_keys_from_query(feature_view_obj.query), + with_event_time=event_time, + event_time=[feature_view_obj.query._left_feature_group.event_time], + with_training_helper_columns=training_helper_columns, + training_helper_columns=feature_view_obj.training_helper_columns, + feature_view_features=[ + feature.name for feature in feature_view_obj.features + ], ) else: self._check_feature_group_accessibility(feature_view_obj) @@ -275,6 +337,10 @@ def get_training_data( start_time=td_updated.event_start_time, end_time=td_updated.event_end_time, with_label=True, + inference_helper_columns=False, + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, spine=spine, ) split_df = engine.get_instance().get_training_data( @@ -349,20 +415,49 @@ def recreate_training_dataset( return training_dataset_obj, td_job def _read_from_storage_connector( - self, training_data_obj, splits, read_options, schema=None + self, + training_data_obj, + splits, + read_options, + with_primary_keys, + primary_keys, + with_event_time, + event_time, + with_training_helper_columns, + training_helper_columns, + feature_view_features, ): + if splits: result = {} for split in splits: path = training_data_obj.location + "/" + str(split.name) result[split.name] = self._read_dir_from_storage_connector( - training_data_obj, path, read_options + training_data_obj, + path, + read_options, + with_primary_keys, + primary_keys, + with_event_time, + event_time, + with_training_helper_columns, + training_helper_columns, + feature_view_features, ) return result else: path = training_data_obj.location + "/" + training_data_obj.name return self._read_dir_from_storage_connector( - training_data_obj, path, read_options + training_data_obj, + path, + read_options, + with_primary_keys, + primary_keys, + with_event_time, + event_time, + with_training_helper_columns, + training_helper_columns, + feature_view_features, ) def _cast_columns(self, data_format, df, schema): @@ -373,15 +468,43 @@ def _cast_columns(self, data_format, df, schema): else: return df - def _read_dir_from_storage_connector(self, 
training_data_obj, path, read_options): + def _read_dir_from_storage_connector( + self, + training_data_obj, + path, + read_options, + with_primary_keys, + primary_keys, + with_event_time, + event_time, + with_training_helper_columns, + training_helper_columns, + feature_view_features, + ): try: - return training_data_obj.storage_connector.read( + df = training_data_obj.storage_connector.read( # always read from materialized dataset, not query object query=None, data_format=training_data_obj.data_format, options=read_options, path=path, ) + + df = self._drop_helper_columns( + df, feature_view_features, with_primary_keys, primary_keys, False + ) + df = self._drop_helper_columns( + df, feature_view_features, with_event_time, event_time, False + ) + df = self._drop_helper_columns( + df, + feature_view_features, + with_training_helper_columns, + training_helper_columns, + True, + ) + return df + except Exception as e: if isinstance(e, FileNotFoundError): raise FileNotFoundError( @@ -391,6 +514,23 @@ def _read_dir_from_storage_connector(self, training_data_obj, path, read_options else: raise e + def _drop_helper_columns( + self, df, feature_view_features, with_columns, columns, training_helper + ): + if not with_columns: + if engine.get_type() == "spark": + existing_cols = [field.name for field in df.schema.fields] + else: + existing_cols = df.columns + # primary keys and event time are dropped only if they are in the query + drop_cols = list(set(existing_cols).intersection(columns)) + # training helper is always in the query + if not training_helper: + drop_cols = list(set(drop_cols).difference(feature_view_features)) + if drop_cols: + df = engine.get_instance().drop_columns(df, drop_cols) + return df + # This method is used by hsfs_utils to launch a job for python client def compute_training_dataset( self, @@ -399,6 +539,9 @@ def compute_training_dataset( training_dataset_obj=None, training_dataset_version=None, spine=None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): if training_dataset_obj: pass @@ -414,9 +557,19 @@ def compute_training_dataset( training_dataset_obj.event_start_time, training_dataset_obj.event_end_time, with_label=True, + primary_keys=primary_keys, + event_time=event_time, + inference_helper_columns=False, + training_helper_columns=training_helper_columns, training_dataset_version=training_dataset_obj.version, spine=spine, ) + + # for spark job + user_write_options["training_helper_columns"] = training_helper_columns + user_write_options["primary_keys"] = primary_keys + user_write_options["event_time"] = event_time + td_job = engine.get_instance().write_training_dataset( training_dataset_obj, batch_query, @@ -523,6 +676,9 @@ def get_batch_data( transformation_functions, read_options=None, spine=None, + primary_keys=False, + event_time=False, + inference_helper_columns=False, ): self._check_feature_group_accessibility(feature_view_obj) @@ -531,6 +687,10 @@ def get_batch_data( start_time, end_time, with_label=False, + primary_keys=primary_keys, + event_time=event_time, + inference_helper_columns=inference_helper_columns, + training_helper_columns=False, training_dataset_version=training_dataset_version, spine=spine, ).read(read_options=read_options) @@ -616,3 +776,22 @@ def _get_feature_view_url(self, feature_view): + str(feature_view.version) ) return util.get_hostname_replaced_url(path) + + def _get_primary_keys_from_query(self, fv_query_obj): + fv_pks = set( + [ + feature.name + for feature in fv_query_obj._left_feature_group.features + if 
feature.primary + ] + ) + for _join in fv_query_obj._joins: + fv_pks.update( + [ + feature.name + for feature in _join.query._left_feature_group.features + if feature.primary + ] + ) + + return list(fv_pks) diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 6cba795f6c..eb82c6bf9e 100644 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -43,10 +43,21 @@ def __init__( self._training_dataset_version = training_dataset_version self._features = features self._feature_vector_col_name = ( - [feat.name for feat in features if not feat.label] if features else [] + [ + feat.name + for feat in features + if not ( + feat.label + or feat.inference_helper_column + or feat.training_helper_column + ) + ] + if features + else [] ) self._prepared_statement_engine = None self._prepared_statements = None + self._helper_column_prepared_statements = None self._serving_keys = serving_keys self._valid_serving_key = None self._pkname_by_serving_index = None @@ -67,14 +78,19 @@ def __init__( self._feature_view_engine = feature_view_engine.FeatureViewEngine( feature_store_id ) + self._transformation_functions = None - def init_serving(self, entity, batch, external, options=None): + def init_serving( + self, entity, batch, external, inference_helper_columns=False, options=None + ): if external is None: external = isinstance(client.get_instance(), client.external.Client) # `init_prepared_statement` should be the last because other initialisations # has to be done successfully before it is able to fetch feature vectors. self.init_transformation(entity) - self.init_prepared_statement(entity, batch, external, options=options) + self.init_prepared_statement( + entity, batch, external, inference_helper_columns, options=options + ) def init_batch_scoring(self, entity): self.init_transformation(entity) @@ -83,10 +99,18 @@ def init_transformation(self, entity): # attach transformation functions self._transformation_functions = self._get_transformation_fns(entity) - def init_prepared_statement(self, entity, batch, external, options=None): + def init_prepared_statement( + self, entity, batch, external, inference_helper_columns, options=None + ): if isinstance(entity, feature_view.FeatureView): + if inference_helper_columns: + helper_column_prepared_statements = ( + self._feature_view_api.get_serving_prepared_statement( + entity.name, entity.version, batch, inference_helper_columns + ) + ) prepared_statements = self._feature_view_api.get_serving_prepared_statement( - entity.name, entity.version, batch + entity.name, entity.version, batch, inference_helper_columns=False ) elif isinstance(entity, training_dataset.TrainingDataset): prepared_statements = ( @@ -99,15 +123,54 @@ def init_prepared_statement(self, entity, batch, external, options=None): # reset values to default, as user may be re-initialising with different parameters self._prepared_statement_engine = None self._prepared_statements = None + self._helper_column_prepared_statements = None self._external = external self._set_mysql_connection(options=options) + ( + self._prepared_statements, + feature_name_order_by_psp, + prefix_by_serving_index, + ) = self._parametrize_prepared_statements(prepared_statements, batch) + + if inference_helper_columns: + ( + self._helper_column_prepared_statements, + _, + _, + ) = self._parametrize_prepared_statements( + helper_column_prepared_statements, batch + ) + + self._prefix_by_serving_index = prefix_by_serving_index + for sk in self._serving_keys: + 
self._serving_key_by_serving_index[ + sk.join_index + ] = self._serving_key_by_serving_index.get(sk.join_index, []) + [sk] + # sort the serving by PreparedStatementParameter.index + for join_index in self._serving_key_by_serving_index: + # feature_name_order_by_psp do not include the join index when the joint feature only contains label only + # But _serving_key_by_serving_index include the index when the join_index is 0 (left side) + if join_index in feature_name_order_by_psp: + self._serving_key_by_serving_index[join_index] = sorted( + self._serving_key_by_serving_index[join_index], + key=lambda _sk: feature_name_order_by_psp[join_index].get( + _sk.feature_name, 0 + ), + ) + # get schemas for complex features once + self._complex_features = self.get_complex_feature_schemas() + self._valid_serving_key = set( + [sk.feature_name for sk in self._serving_keys] + + [sk.required_serving_key for sk in self._serving_keys] + ) + + def _parametrize_prepared_statements(self, prepared_statements, batch): prepared_statements_dict = {} serving_keys = set() feature_name_order_by_psp = dict() prefix_by_serving_index = {} - for prepared_statement in prepared_statements: query_online = str(prepared_statement.query_online).replace("\n", " ") prefix_by_serving_index[ @@ -159,32 +222,15 @@ def init_prepared_statement(self, entity, batch, external, options=None): prepared_statements_dict[ prepared_statement.prepared_statement_index ] = query_online + # assign serving key if it is not provided. if self._serving_keys is None: self._serving_keys = serving_keys - self._prepared_statements = prepared_statements_dict - self._prefix_by_serving_index = prefix_by_serving_index - for sk in self._serving_keys: - self._serving_key_by_serving_index[ - sk.join_index - ] = self._serving_key_by_serving_index.get(sk.join_index, []) + [sk] - # sort the serving by PreparedStatementParameter.index - for join_index in self._serving_key_by_serving_index: - # feature_name_order_by_psp do not include the join index when the joint feature only contains label only - # But _serving_key_by_serving_index include the index when the join_index is 0 (left side) - if join_index in feature_name_order_by_psp: - self._serving_key_by_serving_index[join_index] = sorted( - self._serving_key_by_serving_index[join_index], - key=lambda _sk: feature_name_order_by_psp[join_index].get( - _sk.feature_name, 0 - ), - ) - # get schemas for complex features once - self._complex_features = self.get_complex_feature_schemas() - self._valid_serving_key = set( - [sk.feature_name for sk in self._serving_keys] - + [sk.required_serving_key for sk in self._serving_keys] + return ( + prepared_statements_dict, + feature_name_order_by_psp, + prefix_by_serving_index, ) def _validate_serving_key(self, entry): @@ -196,10 +242,131 @@ def _validate_serving_key(self, entry): ) def get_feature_vector( - self, entry, return_type=None, passed_features=[], allow_missing=False + self, + entry, + return_type=None, + passed_features=[], + allow_missing=False, + ): + """Assembles serving vector from online feature store.""" + serving_vector = self._vector_result(entry, self._prepared_statements) + + # Add the passed features + serving_vector.update(passed_features) + + # apply transformation functions + result_dict = self._apply_transformation(serving_vector) + + vector = self._generate_vector(result_dict, allow_missing) + + if return_type.lower() == "list": + return vector + elif return_type.lower() == "numpy": + return np.array(vector) + elif return_type.lower() == "pandas": + 
pandas_df = pd.DataFrame(vector).transpose() + pandas_df.columns = self._feature_vector_col_name + return pandas_df + else: + raise Exception( + "Unknown return type. Supported return types are 'list', 'pandas' and 'numpy'" + ) + + def get_feature_vectors( + self, + entries, + return_type=None, + passed_features=[], + allow_missing=False, + ): + """Assembles serving vector from online feature store.""" + batch_results, _ = self._batch_vector_results( + entries, self._prepared_statements + ) + + # apply passed features to each batch result + for vector_index, pf in enumerate(passed_features): + batch_results[vector_index].update(pf) + + # apply transformation functions + batch_transformed = list( + map( + lambda results_dict: self._apply_transformation(results_dict), + batch_results, + ) + ) + + # get vectors + vectors = [] + for result in batch_transformed: + # for backward compatibility, before 3.4, if result is empty, + # instead of throwing error, it skips the result + if len(result) != 0 or allow_missing: + vectors.append(self._generate_vector(result, fill_na=allow_missing)) + + if return_type.lower() == "list": + return vectors + elif return_type.lower() == "numpy": + return np.array(vectors) + elif return_type.lower() == "pandas": + pandas_df = pd.DataFrame(vectors) + pandas_df.columns = self._feature_vector_col_name + return pandas_df + else: + raise Exception( + "Unknown return type. Supported return types are 'list', 'pandas' and 'numpy'" + ) + + def get_inference_helper(self, entry, return_type): + """Assembles serving vector from online feature store.""" + + serving_vector = self._vector_result( + entry, self._helper_column_prepared_statements + ) + + if return_type.lower() == "pandas": + return pd.DataFrame([serving_vector]) + elif return_type.lower() == "dict": + return serving_vector + else: + raise Exception( + "Unknown return type. Supported return types are 'pandas' and 'dict'" + ) + + def get_inference_helpers( + self, + feature_view_object, + entries, + return_type, ): """Assembles serving vector from online feature store.""" + batch_results, serving_keys = self._batch_vector_results( + entries, self._helper_column_prepared_statements + ) + # drop serving and primary key names from the result dict + drop_list = serving_keys + list(feature_view_object.primary_keys) + _ = list( + map( + lambda results_dict: [ + results_dict.pop(x, None) + for x in drop_list + if x not in feature_view_object.inference_helper_columns + ], + batch_results, + ) + ) + + if return_type.lower() == "dict": + return batch_results + elif return_type.lower() == "pandas": + return pd.DataFrame(batch_results) + else: + raise Exception( + "Unknown return type. Supported return types are 'dict' and 'pandas'" + ) + + def _vector_result(self, entry, prepared_statement_objects): if all([isinstance(val, list) for val in entry.values()]): raise ValueError( "Entry is expected to be single value per primary key. 
" @@ -212,7 +379,7 @@ def get_feature_vector( # Initialize the set of values serving_vector = {} with self._prepared_statement_engine.connect() as mysql_conn: - for prepared_statement_index in self._prepared_statements: + for prepared_statement_index in prepared_statement_objects: pk_entry = {} next_statement = False for sk in self._serving_key_by_serving_index[prepared_statement_index]: @@ -232,7 +399,9 @@ def get_feature_vector( continue # Fetch the data from the online feature store - prepared_statement = self._prepared_statements[prepared_statement_index] + prepared_statement = prepared_statement_objects[ + prepared_statement_index + ] result_proxy = mysql_conn.execute( prepared_statement, pk_entry ).fetchall() @@ -241,32 +410,10 @@ def get_feature_vector( self._complex_features, row._asdict() ) serving_vector.update(result_dict) - # Add the passed features - serving_vector.update(passed_features) - # apply transformation functions - result_dict = self._apply_transformation(serving_vector) - - vector = self._generate_vector(result_dict, allow_missing) - - if return_type.lower() == "list": - return vector - elif return_type.lower() == "numpy": - return np.array(vector) - elif return_type.lower() == "pandas": - pandas_df = pd.DataFrame(vector).transpose() - pandas_df.columns = self._feature_vector_col_name - return pandas_df - else: - raise Exception( - "Unknown return type. Supported return types are 'list', 'pandas' and 'numpy'" - ) - - def get_feature_vectors( - self, entries, return_type=None, passed_features=[], allow_missing=False - ): - """Assembles serving vector from online feature store.""" + return serving_vector + def _batch_vector_results(self, entries, prepared_statement_objects): # create dict object that will have of order of the vector as key and values as # vector itself to stitch them correctly if there are multiple feature groups involved. At this point we # expect that backend will return correctly ordered vectors. 
@@ -274,12 +421,14 @@ def get_feature_vectors( # for each prepare statement, do a batch look up # then concatenate the results with self._prepared_statement_engine.connect() as mysql_conn: - for prepared_statement_index in self._prepared_statements: + for prepared_statement_index in prepared_statement_objects: # prepared_statement_index include fg with label only # But _serving_key_by_serving_index include the index when the join_index is 0 (left side) if prepared_statement_index not in self._serving_key_by_serving_index: continue - prepared_statement = self._prepared_statements[prepared_statement_index] + prepared_statement = prepared_statement_objects[ + prepared_statement_index + ] entry_values_tuples = list( map( lambda e: tuple( @@ -334,38 +483,7 @@ def get_feature_vectors( self._get_result_key_serving_key(serving_keys, entry), {} ) ) - # apply passed features to each batch result - for vector_index, pf in enumerate(passed_features): - batch_results[vector_index].update(pf) - - # apply transformation functions - batch_transformed = list( - map( - lambda results_dict: self._apply_transformation(results_dict), - batch_results, - ) - ) - - # get vectors - vectors = [] - for result in batch_transformed: - # for backward compatibility, before 3.4, if result is empty, - # instead of throwing error, it skips the result - if len(result) != 0 or allow_missing: - vectors.append(self._generate_vector(result, fill_na=allow_missing)) - - if return_type.lower() == "list": - return vectors - elif return_type.lower() == "numpy": - return np.array(vectors) - elif return_type.lower() == "pandas": - pandas_df = pd.DataFrame(vectors) - pandas_df.columns = self._feature_vector_col_name - return pandas_df - else: - raise Exception( - "Unknown return type. Supported return types are 'list', 'pandas' and 'numpy'" - ) + return batch_results, serving_keys def get_complex_feature_schemas(self): return { diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 6e2f106d3b..4151811a6b 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -571,6 +571,9 @@ def split_labels(self, df, labels): else: return df, None + def drop_columns(self, df, drop_cols): + return df.drop(columns=drop_cols) + def _prepare_transform_split_df( self, query_obj, training_dataset_obj, feature_view_obj, read_option ): diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 7f0efbd841..854abfc649 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -500,6 +500,9 @@ def split_labels(self, df, labels): else: return df, None + def drop_columns(self, df, drop_cols): + return df.drop(*drop_cols) + def write_training_dataset( self, training_dataset, diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 61c9409d0f..91fb9f8204 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -1384,6 +1384,8 @@ def create_feature_view( version: Optional[int] = None, description: Optional[str] = "", labels: Optional[List[str]] = [], + inference_helper_columns: Optional[List[str]] = [], + training_helper_columns: Optional[List[str]] = [], transformation_functions: Optional[Dict[str, TransformationFunction]] = {}, ): """Create a feature view metadata object and saved it to hopsworks. @@ -1451,6 +1453,23 @@ def create_feature_view( the feature view. When replaying a `Query` during model inference, the label features can be omitted from the feature vector retrieval. Defaults to `[]`, no label. 
+ inference_helper_columns: A list of feature names that are not used in training the model itself but can be + used during batch or online inference for extra information. Inference helper column name(s) must be + part of the `Query` object. If inference helper column name(s) belong to a feature group that is part + of a `Join` with `prefix` defined, then this prefix needs to be prepended to the original column name + when defining `inference_helper_columns` list. When replaying a `Query` during model inference, + the inference helper columns can optionally be omitted during batch (`get_batch_data`) and will be + omitted during online inference (`get_feature_vector(s)`). To get inference helper column(s) during + online inference use `get_inference_helper(s)` method. Defaults to `[]`, no helper columns. + training_helper_columns: A list of feature names that are not part of the model schema itself but can be + used during training as a helper for extra information. Training helper column name(s) must be + part of the `Query` object. If training helper column name(s) belong to a feature group that is part + of a `Join` with `prefix` defined, then this prefix needs to be prepended to the original column name when + defining `training_helper_columns` list. When replaying a `Query` during model inference, + the training helper columns will be omitted during both batch and online inference. + Training helper columns can optionally be fetched with training data. For more details see the + documentation for the feature view's training data retrieval methods. Defaults to `[]`, no training helper + columns. transformation_functions: A dictionary mapping tansformation functions to to the features they should be applied to before writing out the vector and at inference time. Defaults to `{}`, no @@ -1466,6 +1485,8 @@ def create_feature_view( version=version, description=description, labels=labels, + inference_helper_columns=inference_helper_columns, + training_helper_columns=training_helper_columns, transformation_functions=transformation_functions, ) return self._feature_view_engine.save(feat_view) @@ -1478,6 +1499,8 @@ def get_or_create_feature_view( version: int, description: Optional[str] = "", labels: Optional[List[str]] = [], + inference_helper_columns: Optional[List[str]] = [], + training_helper_columns: Optional[List[str]] = [], transformation_functions: Optional[Dict[str, TransformationFunction]] = {}, ): """Get feature view metadata object or create a new one if it doesn't exist. This method doesn't update @@ -1507,6 +1530,23 @@ def get_or_create_feature_view( the feature view. When replaying a `Query` during model inference, the label features can be omitted from the feature vector retrieval. Defaults to `[]`, no label. + inference_helper_columns: A list of feature names that are not used in training the model itself but can be + used during batch or online inference for extra information. Inference helper column name(s) must be + part of the `Query` object. If inference helper column name(s) belong to a feature group that is part + of a `Join` with `prefix` defined, then this prefix needs to be prepended to the original column name + when defining `inference_helper_columns` list. When replaying a `Query` during model inference, + the inference helper columns can optionally be omitted during batch (`get_batch_data`) and will be + omitted during online inference (`get_feature_vector(s)`). To get inference helper column(s) during + online inference use `get_inference_helper(s)` method.
Defaults to `[]`, no helper columns. + training_helper_columns: A list of feature names that are not part of the model schema itself but can be + used during training as a helper for extra information. Training helper column name(s) must be + part of the `Query` object. If training helper column name(s) belong to a feature group that is part + of a `Join` with `prefix` defined, then this prefix needs to be prepended to the original column name when + defining `training_helper_columns` list. When replaying a `Query` during model inference, + the training helper columns will be omitted during both batch and online inference. + Training helper columns can optionally be fetched with training data. For more details see the + documentation for the feature view's training data retrieval methods. Defaults to `[]`, no training helper + columns. transformation_functions: A dictionary mapping tansformation functions to to the features they should be applied to before writing out the vector and at inference time. Defaults to `{}`, no @@ -1528,6 +1568,8 @@ def get_or_create_feature_view( version=version, description=description, labels=labels, + inference_helper_columns=inference_helper_columns, + training_helper_columns=training_helper_columns, transformation_functions=transformation_functions, ) else: diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index ca5c5749bc..5da8fd43e5 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -56,6 +56,8 @@ def __init__( version: Optional[int] = None, description: Optional[str] = "", labels: Optional[List[str]] = [], + inference_helper_columns: Optional[List[str]] = [], + training_helper_columns: Optional[List[str]] = [], transformation_functions: Optional[Dict[str, TransformationFunction]] = {}, featurestore_name=None, serving_keys: Optional[List[ServingKey]] = None, @@ -69,6 +71,8 @@ def __init__( self._version = version self._description = description self._labels = labels + self._inference_helper_columns = inference_helper_columns + self._training_helper_columns = training_helper_columns self._transformation_functions = ( { ft_name: copy.deepcopy(transformation_functions[ft_name]) @@ -240,7 +244,9 @@ def init_serving( training_dataset_version, serving_keys=self._serving_keys, ) - self._single_vector_server.init_serving(self, False, external, options=options) + self._single_vector_server.init_serving( + self, False, external, True, options=options + ) # initiate batch vector server self._batch_vectors_server = vector_server.VectorServer( @@ -249,7 +255,9 @@ def init_serving( training_dataset_version, serving_keys=self._serving_keys, ) - self._batch_vectors_server.init_serving(self, True, external, options=options) + self._batch_vectors_server.init_serving( + self, True, external, True, options=options + ) def init_batch_scoring( self, @@ -519,6 +527,108 @@ def get_feature_vectors( entry, return_type, passed_features, allow_missing ) + def get_inference_helper( + self, + entry: Dict[str, Any], + external: Optional[bool] = None, + return_type: Optional[str] = "pandas", + ): + """Returns assembled inference helper column vectors from online feature store. + !!! example + ```python + # get feature store instance + fs = ... + + # get feature view instance + feature_view = fs.get_feature_view(...) + + # get assembled inference helper column vector + feature_view.get_inference_helper( + entry = {"pk1": 1, "pk2": 2} + ) + ``` + + # Arguments + entry: dictionary of feature group primary key and values provided by serving application.
+ Set of required primary keys is [`feature_view.primary_keys`](#primary_keys) + external: boolean, optional. If set to True, the connection to the + online feature store is established using the same host as + for the `host` parameter in the [`hsfs.connection()`](connection_api.md#connection) method. + If set to False, the online feature store storage connector is used + which relies on the private IP. Defaults to True if connection to Hopsworks is established from + external environment (e.g AWS Sagemaker or Google Colab), otherwise to False. + return_type: `"pandas"` or `"dict"`. Defaults to `"pandas"`. + + # Returns + `pd.DataFrame` or `dict`. Defaults to `pd.DataFrame`. + + # Raises + `Exception`. When primary key entry cannot be found in one or more of the feature groups used by this + feature view. + """ + if self._single_vector_server is None: + self.init_serving(external=external) + return self._single_vector_server.get_inference_helper(entry, return_type) + + def get_inference_helpers( + self, + entry: List[Dict[str, Any]], + external: Optional[bool] = None, + return_type: Optional[str] = "pandas", + ): + """Returns assembled inference helper column vectors in batches from online feature store. + !!! warning "Missing primary key entries" + If any of the provided primary key elements in `entry` can't be found in any + of the feature groups, no inference helper column vectors for that primary key value will be + returned. + If it can be found in at least one but not all feature groups used by + this feature view the call to this method will raise an exception. + + !!! example + ```python + # get feature store instance + fs = ... + + # get feature view instance + feature_view = fs.get_feature_view(...) + + # get assembled inference helper column vectors + feature_view.get_inference_helpers( + entry = [ + {"pk1": 1, "pk2": 2}, + {"pk1": 3, "pk2": 4}, + {"pk1": 5, "pk2": 6} + ] + ) + ``` + + # Arguments + entry: a list of dictionary of feature group primary key and values provided by serving application. + Set of required primary keys is [`feature_view.primary_keys`](#primary_keys) + external: boolean, optional. If set to True, the connection to the + online feature store is established using the same host as + for the `host` parameter in the [`hsfs.connection()`](connection_api.md#connection) method. + If set to False, the online feature store storage connector is used + which relies on the private IP. Defaults to True if connection to Hopsworks is established from + external environment (e.g AWS Sagemaker or Google Colab), otherwise to False. + return_type: `"pandas"` or `"dict"`. Defaults to `"dict"`. + + # Returns + `pd.DataFrame` or `List[dict]`. Defaults to `pd.DataFrame`. + + Returned `pd.DataFrame` or `List[dict]` contains feature values related to provided primary + keys, ordered according to positions of this features in the feature view query. + + # Raises + `Exception`. When primary key entry cannot be found in one or more of the feature groups used by this + feature view. + """ + if self._batch_vectors_server is None: + self.init_serving(external=external) + return self._batch_vectors_server.get_inference_helpers( + self, entry, return_type + ) + @usage.method_logger def get_batch_data( self, @@ -535,6 +645,9 @@ def get_batch_data( TypeVar("SpineGroup"), ] ] = None, + primary_keys=False, + event_time=False, + inference_helper_columns=False, ): """Get a batch of data from an event time interval from the offline feature store. 
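The retrieval methods above presuppose that helper columns were registered when the feature view was created. A sketch of that setup, with hypothetical feature group, column and label names:

```python
import hsfs

# Illustrative only: feature group, column and label names are made up.
fs = hsfs.connection().get_feature_store()
fg = fs.get_feature_group("transactions", version=1)

fv = fs.create_feature_view(
    name="transactions_fv",
    version=1,
    query=fg.select(["amount", "category", "merchant_lat", "merchant_long", "fraud_label"]),
    labels=["fraud_label"],
    # returned only via get_inference_helper(s) or get_batch_data(inference_helper_columns=True)
    inference_helper_columns=["merchant_lat", "merchant_long"],
    # available when building training data, but not part of the model schema
    training_helper_columns=["category"],
)
```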
@@ -582,7 +695,14 @@ def get_batch_data( It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - + primary_keys: whether to include primary key features or not. Defaults to `False`, no primary key + features. + event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. + inference_helper_columns: whether to include inference helper columns or not. + Inference helper columns are a list of feature names in the feature view, defined during its creation, + that may not be used in training the model itself but can be used during batch or online inference + for extra information. If inference helper columns were not defined in the feature view + `inference_helper_columns=True` will not have any effect. Defaults to `False`, no helper columns. # Returns `DataFrame`: A dataframe """ @@ -598,6 +718,9 @@ def get_batch_data( self._batch_scoring_server._transformation_functions, read_options, spine, + primary_keys, + event_time, + inference_helper_columns, ) def add_tag(self, name: str, value): @@ -736,6 +859,9 @@ def create_training_data( TypeVar("SpineGroup"), ] ] = None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): """Create the metadata for a training dataset and save the corresponding training data into `location`. The training data can be retrieved by calling `feature_view.get_training_data`. @@ -899,7 +1025,14 @@ def create_training_data( It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - + primary_keys: whether to include primary key features or not. Defaults to `False`, no primary key + features. + event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. + training_helper_columns: whether to include training helper columns or not. Training helper columns are a + list of feature names in the feature view, defined during its creation, that are not part of the + model schema itself but can be used during training as a helper for extra information. + If training helper columns were not defined in the feature view then `training_helper_columns=True` + will not have any effect. Defaults to `False`, no training helper columns. # Returns (td_version, `Job`): Tuple of training dataset version and job. When using the `python` engine, it returns the Hopsworks Job @@ -923,7 +1056,13 @@ def create_training_data( ) # td_job is used only if the python engine is used td, td_job = self._feature_view_engine.create_training_dataset( - self, td, write_options, spine + self, + td, + write_options, + spine=spine, + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -959,6 +1098,9 @@ def create_train_test_split( TypeVar("SpineGroup"), ] ] = None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): """Create the metadata for a training dataset and save the corresponding training data into `location`. The training data is split into train and test set at random or according to time ranges.
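A sketch of how the new flags documented in this file might be used together, continuing the hypothetical feature view from the previous sketch; whether the extra columns actually appear depends on them being present in the feature view and its query:

```python
# Illustrative only: builds on the hypothetical feature view defined earlier.

# Materialize a training dataset; primary key, event time and training helper
# columns are written out so they can optionally be read back later.
td_version, job = fv.create_training_data(
    description="jan 2023 snapshot",
    primary_keys=True,
    event_time=True,
    training_helper_columns=True,
)

# Batch scoring data with the registered inference helper columns included.
batch_df = fv.get_batch_data(
    start_time="2023-01-01",
    end_time="2023-02-01",
    inference_helper_columns=True,
)
```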
@@ -1166,7 +1308,15 @@ def create_train_test_split( It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - + primary_keys: whether to include primary key features or not. Defaults to `False`, no primary key + features. + event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. + training_helper_columns: whether to include training helper columns or not. + Training helper columns are a list of feature names in the feature view, defined during its creation, + that are not the part of the model schema itself but can be used during training as a helper for + extra information. If training helper columns were not defined in the feature view + then`training_helper_columns=True` will not have any effect. Defaults to `False`, no training helper + columns. # Returns (td_version, `Job`): Tuple of training dataset version and job. When using the `python` engine, it returns the Hopsworks Job @@ -1198,7 +1348,13 @@ def create_train_test_split( ) # td_job is used only if the python engine is used td, td_job = self._feature_view_engine.create_training_dataset( - self, td, write_options, spine + self, + td, + write_options, + spine=spine, + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -1237,6 +1393,9 @@ def create_train_validation_test_split( TypeVar("SpineGroup"), ] ] = None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): """Create the metadata for a training dataset and save the corresponding training data into `location`. The training data is split into train, validation, and test set at random or according to time range. @@ -1430,7 +1589,15 @@ def create_train_validation_test_split( It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - + primary_keys: whether to include primary key features or not. Defaults to `False`, no primary key + features. + event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. + training_helper_columns: whether to include training helper columns or not. + Training helper columns are a list of feature names in the feature view, defined during its creation, + that are not the part of the model schema itself but can be used during training as a helper for + extra information. If training helper columns were not defined in the feature view + then`training_helper_columns=True` will not have any effect. Defaults to `False`, no training helper + columns. # Returns (td_version, `Job`): Tuple of training dataset version and job. 
When using the `python` engine, it returns the Hopsworks Job @@ -1470,7 +1637,13 @@ def create_train_validation_test_split( ) # td_job is used only if the python engine is used td, td_job = self._feature_view_engine.create_training_dataset( - self, td, write_options, spine + self, + td, + write_options, + spine=spine, + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -1566,6 +1739,9 @@ def training_data( TypeVar("SpineGroup"), ] ] = None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): """ Create the metadata for a training dataset and get the corresponding training data from the offline feature store. @@ -1653,7 +1829,15 @@ def training_data( It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - + primary_keys: whether to include primary key features or not. Defaults to `False`, no primary key + features. + event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. + training_helper_columns: whether to include training helper columns or not. + Training helper columns are a list of feature names in the feature view, defined during its creation, + that are not the part of the model schema itself but can be used during training as a helper for + extra information. If training helper columns were not defined in the feature view + then`training_helper_columns=True` will not have any effect. Defaults to `False`, no training helper + columns. # Returns (X, y): Tuple of dataframe of features and labels. If there are no labels, y returns `None`. """ @@ -1673,7 +1857,13 @@ def training_data( extra_filter=extra_filter, ) td, df = self._feature_view_engine.get_training_data( - self, read_options, training_dataset_obj=td, spine=spine + self, + read_options, + training_dataset_obj=td, + spine=spine, + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -1703,6 +1893,9 @@ def train_test_split( TypeVar("SpineGroup"), ] ] = None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): """ Create the metadata for a training dataset and get the corresponding training data from the offline feature store. @@ -1800,7 +1993,15 @@ def train_test_split( It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - + primary_keys: whether to include primary key features or not. Defaults to `False`, no primary key + features. + event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. + training_helper_columns: whether to include training helper columns or not. + Training helper columns are a list of feature names in the feature view, defined during its creation, + that are not the part of the model schema itself but can be used during training as a helper for + extra information. If training helper columns were not defined in the feature view + then`training_helper_columns=True` will not have any effect. Defaults to `False`, no training helper + columns. 
# Returns (X_train, X_test, y_train, y_test): Tuple of dataframe of features and labels @@ -1833,6 +2034,9 @@ def train_test_split( training_dataset_obj=td, splits=[TrainingDatasetSplit.TRAIN, TrainingDatasetSplit.TEST], spine=spine, + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -1874,6 +2078,9 @@ def train_validation_test_split( TypeVar("SpineGroup"), ] ] = None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): """ Create the metadata for a training dataset and get the corresponding training data from the offline feature store. @@ -1984,7 +2191,15 @@ def train_validation_test_split( It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - + primary_keys: whether to include primary key features or not. Defaults to `False`, no primary key + features. + event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. + training_helper_columns: whether to include training helper columns or not. + Training helper columns are a list of feature names in the feature view, defined during its creation, + that are not the part of the model schema itself but can be used during training as a helper for + extra information. If training helper columns were not defined in the feature view + then`training_helper_columns=True` will not have any effect. Defaults to `False`, no training helper + columns. # Returns (X_train, X_val, X_test, y_train, y_val, y_test): Tuple of dataframe of features and labels @@ -2030,6 +2245,9 @@ def train_validation_test_split( TrainingDatasetSplit.TEST, ], spine=spine, + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -2063,6 +2281,9 @@ def get_training_data( self, training_dataset_version, read_options: Optional[Dict[Any, Any]] = None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): """ Get training data created by `feature_view.create_training_data` @@ -2098,12 +2319,25 @@ def get_training_data( * key `"hive_config"` to pass a dictionary of hive or tez configurations. For example: `{"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}` Defaults to `{}`. - + primary_keys: whether to include primary key features or not. Defaults to `False`, no primary key + features. + event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. + training_helper_columns: whether to include training helper columns or not. + Training helper columns are a list of feature names in the feature view, defined during its creation, + that are not the part of the model schema itself but can be used during training as a helper for + extra information. If training helper columns were not defined in the feature view or during + materializing training dataset in the file system then`training_helper_columns=True` will not have + any effect. Defaults to `False`, no training helper columns. 
# Returns (X, y): Tuple of dataframe of features and labels """ td, df = self._feature_view_engine.get_training_data( - self, read_options, training_dataset_version=training_dataset_version + self, + read_options, + training_dataset_version=training_dataset_version, + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, ) return df @@ -2112,6 +2346,9 @@ def get_train_test_split( self, training_dataset_version, read_options: Optional[Dict[Any, Any]] = None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): """ Get training data created by `feature_view.create_train_test_split` @@ -2142,7 +2379,15 @@ def get_train_test_split( * key `"hive_config"` to pass a dictionary of hive or tez configurations. For example: `{"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}` Defaults to `{}`. - + primary_keys: whether to include primary key features or not. Defaults to `False`, no primary key + features. + event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. + training_helper_columns: whether to include training helper columns or not. + Training helper columns are a list of feature names in the feature view, defined during its creation, + that are not the part of the model schema itself but can be used during training as a helper for + extra information. If training helper columns were not defined in the feature view or during + materializing training dataset in the file system then`training_helper_columns=True` will not have + any effect. Defaults to `False`, no training helper columns. # Returns (X_train, X_test, y_train, y_test): Tuple of dataframe of features and labels @@ -2152,6 +2397,9 @@ def get_train_test_split( read_options, training_dataset_version=training_dataset_version, splits=[TrainingDatasetSplit.TRAIN, TrainingDatasetSplit.TEST], + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, ) return df @@ -2160,6 +2408,9 @@ def get_train_validation_test_split( self, training_dataset_version, read_options: Optional[Dict[Any, Any]] = None, + primary_keys=False, + event_time=False, + training_helper_columns=False, ): """ Get training data created by `feature_view.create_train_validation_test_split` @@ -2190,7 +2441,15 @@ def get_train_validation_test_split( * key `"hive_config"` to pass a dictionary of hive or tez configurations. For example: `{"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}` Defaults to `{}`. - + primary_keys: whether to include primary key features or not. Defaults to `False`, no primary key + features. + event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. + training_helper_columns: whether to include training helper columns or not. + Training helper columns are a list of feature names in the feature view, defined during its creation, + that are not the part of the model schema itself but can be used during training as a helper for + extra information. If training helper columns were not defined in the feature view or during + materializing training dataset in the file system then`training_helper_columns=True` will not have + any effect. Defaults to `False`, no training helper columns. 
# Returns (X_train, X_val, X_test, y_train, y_val, y_test): Tuple of dataframe of features and labels @@ -2204,6 +2463,9 @@ def get_train_validation_test_split( TrainingDatasetSplit.VALIDATION, TrainingDatasetSplit.TEST, ], + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, ) return df @@ -2452,6 +2714,12 @@ def from_response_json(cls, json_dict): ] fv.schema = features fv.labels = [feature.name for feature in features if feature.label] + fv.inference_helper_columns = [ + feature.name for feature in features if feature.inference_helper_column + ] + fv.training_helper_columns = [ + feature.name for feature in features if feature.training_helper_column + ] return fv def update_from_response_json(self, json_dict): @@ -2464,6 +2732,8 @@ def update_from_response_json(self, json_dict): "featurestore_id", "version", "labels", + "inference_helper_columns", + "training_helper_columns", "schema", "serving_keys", ]: @@ -2541,6 +2811,32 @@ def labels(self): def labels(self, labels): self._labels = [lb.lower() for lb in labels] + @property + def inference_helper_columns(self): + """The helper columns of the feature view. + + Can be a composite of multiple features. + """ + return self._inference_helper_columns + + @inference_helper_columns.setter + def inference_helper_columns(self, inference_helper_columns): + self._inference_helper_columns = [ + exf.lower() for exf in inference_helper_columns + ] + + @property + def training_helper_columns(self): + """The helper columns of the feature view. + + Can be a composite of multiple features. + """ + return self._training_helper_columns + + @training_helper_columns.setter + def training_helper_columns(self, training_helper_columns): + self._training_helper_columns = [exf.lower() for exf in training_helper_columns] + @property def description(self): """Description of the feature view.""" @@ -2597,7 +2893,7 @@ def primary_keys(self): _vector_server = vector_server.VectorServer( self._featurestore_id, self._features, serving_keys=self._serving_keys ) - _vector_server.init_prepared_statement(self, False, False) + _vector_server.init_prepared_statement(self, False, False, False) return _vector_server.serving_keys @property diff --git a/python/hsfs/training_dataset_feature.py b/python/hsfs/training_dataset_feature.py index 8ab02b1546..767d78b78d 100644 --- a/python/hsfs/training_dataset_feature.py +++ b/python/hsfs/training_dataset_feature.py @@ -30,6 +30,8 @@ def __init__( featuregroup=None, feature_group_feature_name=None, label=False, + inference_helper_column=False, + training_helper_column=False, transformation_function=None, **kwargs, ): @@ -43,6 +45,8 @@ def __init__( ) self._feature_group_feature_name = feature_group_feature_name self._label = label + self._inference_helper_column = inference_helper_column + self._training_helper_column = training_helper_column self._transformation_function = ( TransformationFunction.from_response_json(transformation_function) if isinstance(transformation_function, dict) @@ -55,6 +59,8 @@ def to_dict(self): "type": self._type, "index": self._index, "label": self._label, + "inferenceHelperColumn": self._inference_helper_column, + "trainingHelperColumn": self._training_helper_column, "transformationFunction": self._transformation_function, "featureGroupFeatureName": self._feature_group_feature_name, "featuregroup": self._feature_group, @@ -100,6 +106,24 @@ def label(self): def label(self, label): self._label = label + @property + def inference_helper_column(self): +
"""Indicator if it is feature.""" + return self._inference_helper_column + + @inference_helper_column.setter + def inference_helper_column(self, inference_helper_column): + self._inference_helper_column = inference_helper_column + + @property + def training_helper_column(self): + """Indicator if it is feature.""" + return self._training_helper_column + + @training_helper_column.setter + def training_helper_column(self, training_helper_column): + self._training_helper_column = training_helper_column + @property def transformation_function(self): """Set transformation functions.""" diff --git a/python/tests/core/test_feature_view_engine.py b/python/tests/core/test_feature_view_engine.py index bde5d433f7..92985bd6ea 100644 --- a/python/tests/core/test_feature_view_engine.py +++ b/python/tests/core/test_feature_view_engine.py @@ -676,7 +676,7 @@ def test_create_training_dataset(self, mocker): # Act fv_engine.create_training_dataset( - feature_view_obj=None, training_dataset_obj=None, user_write_options=None + feature_view_obj=None, training_dataset_obj=None, user_write_options={} ) # Assert @@ -796,6 +796,7 @@ def test_get_training_data_type_in_memory(self, mocker): "hsfs.core.feature_view_engine.FeatureViewEngine._check_feature_group_accessibility" ) mocker.patch("hsfs.core.feature_view_engine.FeatureViewEngine.get_batch_query") + mock_fv_engine_compute_training_dataset_statistics = mocker.patch( "hsfs.core.feature_view_engine.FeatureViewEngine.compute_training_dataset_statistics" ) @@ -848,7 +849,9 @@ def test_get_training_data_splits(self, mocker): mocker.patch( "hsfs.core.feature_view_engine.FeatureViewEngine._check_feature_group_accessibility" ) + mocker.patch("hsfs.core.feature_view_engine.FeatureViewEngine.get_batch_query") + mock_fv_engine_compute_training_dataset_statistics = mocker.patch( "hsfs.core.feature_view_engine.FeatureViewEngine.compute_training_dataset_statistics" ) @@ -1064,7 +1067,7 @@ def test_recreate_training_dataset(self, mocker): fv_engine.recreate_training_dataset( feature_view_obj=None, training_dataset_version=None, - user_write_options=None, + user_write_options={}, ) # Assert @@ -1095,7 +1098,16 @@ def test_read_from_storage_connector(self, mocker): # Act fv_engine._read_from_storage_connector( - training_data_obj=td, splits=None, read_options=None + training_data_obj=td, + splits=None, + read_options=None, + with_primary_keys=None, + primary_keys=None, + with_event_time=None, + event_time=None, + with_training_helper_columns=None, + training_helper_columns=None, + feature_view_features=[], ) # Assert @@ -1132,7 +1144,16 @@ def test_read_from_storage_connector_splits(self, mocker): # Act fv_engine._read_from_storage_connector( - training_data_obj=td, splits=splits, read_options=None + training_data_obj=td, + splits=splits, + read_options=None, + with_primary_keys=None, + primary_keys=None, + with_event_time=None, + event_time=None, + with_training_helper_columns=None, + training_helper_columns=None, + feature_view_features=[], ) # Assert @@ -1168,7 +1189,16 @@ def test_read_dir_from_storage_connector(self, mocker): # Act fv_engine._read_dir_from_storage_connector( - training_data_obj=td, path="test", read_options=None + training_data_obj=td, + path="test", + read_options=None, + with_primary_keys=False, + primary_keys=[], + with_event_time=False, + event_time=[], + with_training_helper_columns=False, + training_helper_columns=[], + feature_view_features=[], ) # Assert @@ -1200,7 +1230,16 @@ def test_read_dir_from_storage_connector_file_not_found(self, mocker): # 
Act with pytest.raises(FileNotFoundError) as e_info: fv_engine._read_dir_from_storage_connector( - training_data_obj=td, path="test", read_options=None + training_data_obj=td, + path="test", + read_options=None, + with_primary_keys=None, + primary_keys=None, + with_event_time=None, + event_time=None, + with_training_helper_columns=None, + training_helper_columns=None, + feature_view_features=[], ) # Assert @@ -1237,7 +1276,7 @@ def test_compute_training_dataset(self, mocker): with pytest.raises(ValueError) as e_info: fv_engine.compute_training_dataset( feature_view_obj=None, - user_write_options=None, + user_write_options={}, training_dataset_obj=None, training_dataset_version=None, ) @@ -1283,7 +1322,7 @@ def test_compute_training_dataset_td(self, mocker): # Act fv_engine.compute_training_dataset( feature_view_obj=None, - user_write_options=None, + user_write_options={}, training_dataset_obj=td, training_dataset_version=None, ) @@ -1330,7 +1369,7 @@ def test_compute_training_dataset_td_version(self, mocker): # Act fv_engine.compute_training_dataset( feature_view_obj=None, - user_write_options=None, + user_write_options={}, training_dataset_obj=None, training_dataset_version=1, ) @@ -1375,7 +1414,7 @@ def test_compute_training_dataset_td_spark_type_split(self, mocker): # Act fv_engine.compute_training_dataset( feature_view_obj=None, - user_write_options=None, + user_write_options={}, training_dataset_obj=td, training_dataset_version=None, ) @@ -1420,7 +1459,7 @@ def test_compute_training_dataset_td_spark_type(self, mocker): # Act fv_engine.compute_training_dataset( feature_view_obj=None, - user_write_options=None, + user_write_options={}, training_dataset_obj=td, training_dataset_version=None, ) diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 2d6dc74532..9a6a4d3e76 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -100,9 +100,17 @@ def create_fv_td(job_conf: Dict[Any, Any]) -> None: fv = fs.get_feature_view(name=job_conf["name"], version=job_conf["version"]) fv_engine = feature_view_engine.FeatureViewEngine(fv.featurestore_id) + user_write_options = job_conf.pop("write_options", {}) or {} + + training_helper_columns = user_write_options.get("training_helper_columns") + primary_keys = user_write_options.get("primary_keys") + event_time = user_write_options.get("event_time") fv_engine.compute_training_dataset( - fv, - job_conf.pop("write_options", {}) or {}, + feature_view_obj=fv, + user_write_options=user_write_options, + primary_keys=primary_keys, + event_time=event_time, + training_helper_columns=training_helper_columns, training_dataset_version=job_conf["td_version"], )
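For the Spark materialization path, the flags reach the job through the user write options, as the `hsfs_utils` change above shows. A sketch of the implied job configuration; values are illustrative, and keys not visible in this diff are omitted:

```python
# Hypothetical job configuration consumed by create_fv_td(); only keys that
# appear in this diff are shown.
job_conf = {
    "name": "transactions_fv",  # feature view name (illustrative)
    "version": 1,               # feature view version
    "td_version": 3,            # training dataset version to materialize
    "write_options": {
        # set by compute_training_dataset() so the Spark job keeps these columns
        "primary_keys": True,
        "event_time": True,
        "training_helper_columns": True,
    },
}
```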