From 7a934e246c77cd905c353a9c5c89e43c8dd7dd73 Mon Sep 17 00:00:00 2001 From: Moritz Meister <8422705+moritzmeister@users.noreply.github.com> Date: Tue, 4 Apr 2023 20:16:38 +0200 Subject: [PATCH] [FSTORE-743] Add online support for External Feature Groups (#976) --- .../core/external_feature_group_engine.py | 48 +++ python/hsfs/core/feature_group_base_engine.py | 47 +++ python/hsfs/core/feature_group_engine.py | 47 --- python/hsfs/engine/python.py | 15 +- python/hsfs/engine/spark.py | 6 +- python/hsfs/feature_group.py | 390 +++++++++++------- python/hsfs/feature_store.py | 27 ++ 7 files changed, 372 insertions(+), 208 deletions(-) diff --git a/python/hsfs/core/external_feature_group_engine.py b/python/hsfs/core/external_feature_group_engine.py index 6df5a959a6..389033be35 100644 --- a/python/hsfs/core/external_feature_group_engine.py +++ b/python/hsfs/core/external_feature_group_engine.py @@ -47,6 +47,54 @@ def save(self, feature_group): self._feature_group_api.save(feature_group) + def insert( + self, + feature_group, + feature_dataframe, + write_options: dict, + validation_options: dict = {}, + ): + if not feature_group.online_enabled: + raise FeatureStoreException( + "Online storage is not enabled for this feature group. External feature groups can only store data in" + + " online storage. To create an offline only external feature group, use the `save` method." + ) + + schema = engine.get_instance().parse_schema_feature_group(feature_dataframe) + + if not feature_group._id: + # only save metadata if feature group does not exist + feature_group.features = schema + self.save(feature_group) + else: + # else, just verify that feature group schema matches user-provided dataframe + self._verify_schema_compatibility(feature_group.features, schema) + + # ge validation on python and non stream feature groups on spark + ge_report = feature_group._great_expectation_engine.validate( + feature_group=feature_group, + dataframe=feature_dataframe, + validation_options=validation_options, + ingestion_result="INGESTED", + ge_type=False, + ) + + if ge_report is not None and ge_report.ingestion_result == "REJECTED": + return None, ge_report + + return ( + engine.get_instance().save_dataframe( + feature_group=feature_group, + dataframe=feature_dataframe, + operation=None, + online_enabled=feature_group.online_enabled, + storage="online", + offline_write_options=write_options, + online_write_options=write_options, + ), + ge_report, + ) + def _update_features_metadata(self, feature_group, features): # perform changes on copy in case the update fails, so we don't leave # the user object in corrupted state diff --git a/python/hsfs/core/feature_group_base_engine.py b/python/hsfs/core/feature_group_base_engine.py index 4fe38a56e9..86703d6d40 100644 --- a/python/hsfs/core/feature_group_base_engine.py +++ b/python/hsfs/core/feature_group_base_engine.py @@ -15,6 +15,7 @@ # from hsfs.core import feature_group_api, storage_connector_api, tags_api, kafka_api +from hsfs.client.exceptions import FeatureStoreException class FeatureGroupBaseEngine: @@ -107,3 +108,49 @@ def new_feature_list(self, feature_group, updated_features): ): new_features.append(feature) return new_features + updated_features + + def _verify_schema_compatibility(self, feature_group_features, dataframe_features): + err = [] + feature_df_dict = {feat.name: feat.type for feat in dataframe_features} + for feature_fg in feature_group_features: + fg_type = feature_fg.type.lower().replace(" ", "") + # check if feature exists dataframe + if feature_fg.name 
in feature_df_dict: + df_type = feature_df_dict[feature_fg.name].lower().replace(" ", "") + # remove match from lookup table + del feature_df_dict[feature_fg.name] + + # check if types match + if fg_type != df_type: + # don't check structs for exact match + if fg_type.startswith("struct") and df_type.startswith("struct"): + continue + + err += [ + f"{feature_fg.name} (" + f"expected type: '{fg_type}', " + f"derived from input: '{df_type}') has the wrong type." + ] + + else: + err += [ + f"{feature_fg.name} (type: '{feature_fg.type}') is missing from " + f"input dataframe." + ] + + # any features that are left in lookup table are superfluous + for feature_df_name, feature_df_type in feature_df_dict.items(): + err += [ + f"{feature_df_name} (type: '{feature_df_type}') does not exist " + f"in feature group." + ] + + # raise exception if any errors were found. + if len(err) > 0: + raise FeatureStoreException( + "Features are not compatible with Feature Group schema: " + + "".join(["\n - " + e for e in err]) + ) + + def get_subject(self, feature_group): + return self._kafka_api.get_topic_subject(feature_group._online_topic_name) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index 6021933507..78773facb7 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -17,7 +17,6 @@ from hsfs import engine, client, util from hsfs import feature_group as fg from hsfs.client import exceptions -from hsfs.client.exceptions import FeatureStoreException from hsfs.core import feature_group_base_engine, hudi_engine from hsfs.core.deltastreamer_jobconf import DeltaStreamerJobConf @@ -234,9 +233,6 @@ def update_description(self, feature_group, description): feature_group, copy_feature_group, "updateMetadata" ) - def get_subject(self, feature_group): - return self._kafka_api.get_topic_subject(feature_group._online_topic_name) - def insert_stream( self, feature_group, @@ -305,49 +301,6 @@ def insert_stream( return streaming_query - def _verify_schema_compatibility(self, feature_group_features, dataframe_features): - err = [] - feature_df_dict = {feat.name: feat.type for feat in dataframe_features} - for feature_fg in feature_group_features: - fg_type = feature_fg.type.lower().replace(" ", "") - # check if feature exists dataframe - if feature_fg.name in feature_df_dict: - df_type = feature_df_dict[feature_fg.name].lower().replace(" ", "") - # remove match from lookup table - del feature_df_dict[feature_fg.name] - - # check if types match - if fg_type != df_type: - # don't check structs for exact match - if fg_type.startswith("struct") and df_type.startswith("struct"): - continue - - err += [ - f"{feature_fg.name} (" - f"expected type: '{fg_type}', " - f"derived from input: '{df_type}') has the wrong type." - ] - - else: - err += [ - f"{feature_fg.name} (type: '{feature_fg.type}') is missing from " - f"input dataframe." - ] - - # any features that are left in lookup table are superfluous - for feature_df_name, feature_df_type in feature_df_dict.items(): - err += [ - f"{feature_df_name} (type: '{feature_df_type}') does not exist " - f"in feature group." - ] - - # raise exception if any errors were found. 
- if len(err) > 0: - raise FeatureStoreException( - "Features are not compatible with Feature Group schema: " - + "".join(["\n - " + e for e in err]) - ) - def _save_feature_group_metadata( self, feature_group, dataframe_features, write_options ): diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 6c25c7a7f1..8c3fd1254d 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -44,6 +44,7 @@ from sqlalchemy import sql from hsfs import client, feature, util +from hsfs.feature_group import ExternalFeatureGroup from hsfs.client.exceptions import FeatureStoreException from hsfs.core import ( feature_group_api, @@ -436,7 +437,10 @@ def save_dataframe( online_write_options: dict, validation_id: int = None, ): - if feature_group.stream: + if ( + isinstance(feature_group, ExternalFeatureGroup) + and feature_group.online_enabled + ) or feature_group.stream: return self._write_dataframe_kafka( feature_group, dataframe, offline_write_options ) @@ -896,13 +900,16 @@ def acked(err, msg): progress_bar.close() # start backfilling job - if offline_write_options is not None and offline_write_options.get( - "start_offline_backfill", True + if ( + not isinstance(feature_group, ExternalFeatureGroup) + and offline_write_options is not None + and offline_write_options.get("start_offline_backfill", True) ): feature_group.backfill_job.run( await_termination=offline_write_options.get("wait_for_job", True) ) - + if isinstance(feature_group, ExternalFeatureGroup): + return None return feature_group.backfill_job def _kafka_produce( diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 60f5cbf199..5be74a1d29 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -66,6 +66,7 @@ pass from hsfs import feature, training_dataset_feature, client, util +from hsfs.feature_group import ExternalFeatureGroup from hsfs.storage_connector import StorageConnector from hsfs.client.exceptions import FeatureStoreException from hsfs.core import hudi_engine, transformation_function_engine, kafka_api @@ -261,7 +262,10 @@ def save_dataframe( validation_id=None, ): try: - if feature_group.stream: + if ( + isinstance(feature_group, ExternalFeatureGroup) + and feature_group.online_enabled + ) or feature_group.stream: self._save_online_dataframe( feature_group, dataframe, online_write_options ) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 8d5f9bcf32..729a74c6ef 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -57,9 +57,25 @@ class FeatureGroupBase: - def __init__(self, featurestore_id, location, event_time=None): + def __init__( + self, + featurestore_id, + location, + event_time=None, + online_enabled=False, + id=None, + expectation_suite=None, + online_topic_name=None, + ): self.event_time = event_time + self._online_enabled = online_enabled self._location = location + self._id = id + self._subject = None + self._online_topic_name = online_topic_name + self._feature_store_id = featurestore_id + # use setter for correct conversion + self.expectation_suite = expectation_suite self._statistics_engine = statistics_engine.StatisticsEngine( featurestore_id, self.ENTITY_TYPE ) @@ -67,8 +83,31 @@ def __init__(self, featurestore_id, location, event_time=None): self._great_expectation_engine = ( great_expectation_engine.GreatExpectationEngine(featurestore_id) ) + if self._id is not None: + if expectation_suite: + self._expectation_suite._init_expectation_engine( + 
feature_store_id=featurestore_id, feature_group_id=self._id + ) + self._expectation_suite_engine = ( + expectation_suite_engine.ExpectationSuiteEngine( + feature_store_id=featurestore_id, feature_group_id=self._id + ) + ) + self._validation_report_engine = ( + validation_report_engine.ValidationReportEngine( + featurestore_id, self._id + ) + ) + self._validation_result_engine = ( + validation_result_engine.ValidationResultEngine( + featurestore_id, self._id + ) + ) + self._feature_store_id = featurestore_id self._variable_api = VariableApi() + self._feature_group_engine = None + self._multi_part_insert = False def delete(self): """Drop the entire feature group along with its feature data. @@ -973,6 +1012,71 @@ def get_validation_history( "Only Feature Group registered with Hopsworks can fetch validation history." ) + def validate( + self, + dataframe: Optional[ + Union[pd.DataFrame, TypeVar("pyspark.sql.DataFrame")] # noqa: F821 + ] = None, + expectation_suite: Optional[ExpectationSuite] = None, + save_report: Optional[bool] = False, + validation_options: Optional[Dict[Any, Any]] = {}, + ingestion_result: str = "UNKNOWN", + ge_type: bool = True, + ) -> Union[ge.core.ExpectationSuiteValidationResult, ValidationReport, None]: + """Run validation based on the attached expectations. + + Runs any expectation attached with Deequ. But also runs attached Great Expectation + Suites. + + !!! example + ```python + # connect to the Feature Store + fs = ... + + # get feature group instance + fg = fs.get_or_create_feature_group(...) + + ge_report = fg.validate(df, save_report=False) + ``` + + # Arguments + dataframe: The dataframe to run the data validation expectations against. + expectation_suite: Optionally provide an Expectation Suite to override the + one that is possibly attached to the feature group. This is useful for + testing new Expectation suites. When an extra suite is provided, the results + will never be persisted. Defaults to `None`. + validation_options: Additional validation options as key-value pairs, defaults to `{}`. + * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion. + * key `ge_validate_kwargs` a dictionary containing kwargs for the validate method of Great Expectations. + ingestion_result: Specify the fate of the associated data, defaults + to "UNKNOWN". Supported options are "UNKNOWN", "INGESTED", "REJECTED", + "EXPERIMENT", "FG_DATA". Use "INGESTED" or "REJECTED" for validation + of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" + for testing and development and "FG_DATA" when validating data + already in the Feature Group. + save_report: Whether to save the report to the backend. This is only possible if the Expectation suite + is initialised and attached to the Feature Group. Defaults to False. + ge_type: Whether to return a Great Expectations object or Hopsworks own abstraction. Defaults to True. + + # Returns + A Validation Report produced by Great Expectations. 
+ """ + # Activity is logged only if a the validation concerns the feature group and not a specific dataframe + if dataframe is None: + dataframe = self.read() + if ingestion_result == "UNKNOWN": + ingestion_result = "FG_DATA" + + return self._great_expectation_engine.validate( + self, + dataframe=engine.get_instance().convert_to_default_dataframe(dataframe), + expectation_suite=expectation_suite, + save_report=save_report, + validation_options=validation_options, + ingestion_result=ingestion_result, + ge_type=ge_type, + ) + def __getattr__(self, name): try: return self.__getitem__(name) @@ -1175,6 +1279,59 @@ def expectation_suite( ) ) + @property + def online_enabled(self): + """Setting if the feature group is available in online storage.""" + return self._online_enabled + + @online_enabled.setter + def online_enabled(self, online_enabled): + self._online_enabled = online_enabled + + @property + def subject(self): + """Subject of the feature group.""" + if self._subject is None: + # cache the schema + self._subject = self._feature_group_engine.get_subject(self) + return self._subject + + @property + def avro_schema(self): + """Avro schema representation of the feature group.""" + return self.subject["schema"] + + def get_complex_features(self): + """Returns the names of all features with a complex data type in this + feature group. + + !!! example + ```python + complex_dtype_features = fg.get_complex_features() + ``` + """ + return [f.name for f in self.features if f.is_complex()] + + def _get_encoded_avro_schema(self): + complex_features = self.get_complex_features() + schema = json.loads(self.avro_schema) + + for field in schema["fields"]: + if field["name"] in complex_features: + field["type"] = ["null", "bytes"] + + schema_s = json.dumps(schema) + try: + avro.schema.parse(schema_s) + except avro.schema.SchemaParseException as e: + raise FeatureStoreException("Failed to construct Avro Schema: {}".format(e)) + return schema_s + + def _get_feature_avro_schema(self, feature_name): + for field in json.loads(self.avro_schema)["fields"]: + if field["name"] == feature_name: + return json.dumps(field["type"]) + class FeatureGroup(FeatureGroupBase): CACHED_FEATURE_GROUP = "CACHED_FEATURE_GROUP" @@ -1206,7 +1363,15 @@ def __init__( parents=None, href=None, ): - super().__init__(featurestore_id, location, event_time=event_time) + super().__init__( + featurestore_id, + location, + event_time=event_time, + online_enabled=online_enabled, + id=id, + expectation_suite=expectation_suite, + online_topic_name=online_topic_name, + ) self._feature_store_name = featurestore_name self._description = description @@ -1214,19 +1379,15 @@ def __init__( self._creator = user.User.from_response_json(creator) self._version = version self._name = name - self._id = id self._features = [ feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat for feat in (features or []) ] - self._online_enabled = online_enabled self._time_travel_format = ( time_travel_format.upper() if time_travel_format is not None else None ) - self._subject = None - self._online_topic_name = online_topic_name self._stream = stream self._parents = parents self._deltastreamer_jobconf = None @@ -1256,26 +1417,6 @@ def __init__( self._hudi_precombine_key = None self.statistics_config = statistics_config - self.expectation_suite = expectation_suite - if expectation_suite: - self._expectation_suite._init_expectation_engine( - feature_store_id=featurestore_id, feature_group_id=self._id - ) - self._expectation_suite_engine = ( - 
expectation_suite_engine.ExpectationSuiteEngine( - feature_store_id=self._feature_store_id, feature_group_id=self._id - ) - ) - self._validation_report_engine = ( - validation_report_engine.ValidationReportEngine( - self._feature_store_id, self._id - ) - ) - self._validation_result_engine = ( - validation_result_engine.ValidationResultEngine( - self._feature_store_id, self._id - ) - ) else: # initialized by user @@ -1305,7 +1446,6 @@ def __init__( else None ) self.statistics_config = statistics_config - self.expectation_suite = expectation_suite self._feature_group_engine = feature_group_engine.FeatureGroupEngine( featurestore_id @@ -1313,7 +1453,6 @@ def __init__( self._href = href # cache for optimized writes - self._multi_part_insert = False self._kafka_producer = None self._feature_writers = None self._writer = None @@ -2095,71 +2234,6 @@ def as_of( """ return self.select_all().as_of(wallclock_time, exclude_until) - def validate( - self, - dataframe: Optional[ - Union[pd.DataFrame, TypeVar("pyspark.sql.DataFrame")] # noqa: F821 - ] = None, - expectation_suite: Optional[ExpectationSuite] = None, - save_report: Optional[bool] = False, - validation_options: Optional[Dict[Any, Any]] = {}, - ingestion_result: str = "UNKNOWN", - ge_type: bool = True, - ) -> Union[ge.core.ExpectationSuiteValidationResult, ValidationReport, None]: - """Run validation based on the attached expectations. - - Runs any expectation attached with Deequ. But also runs attached Great Expectation - Suites. - - !!! example - ```python - # connect to the Feature Store - fs = ... - - # get feature group instance - fg = fs.get_or_create_feature_group(...) - - ge_report = fg.validate(df, save_report=False) - ``` - - # Arguments - dataframe: The dataframe to run the data validation expectations against. - expectation_suite: Optionally provide an Expectation Suite to override the - one that is possibly attached to the feature group. This is useful for - testing new Expectation suites. When an extra suite is provided, the results - will never be persisted. Defaults to `None`. - validation_options: Additional validation options as key-value pairs, defaults to `{}`. - * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion. - * key `ge_validate_kwargs` a dictionary containing kwargs for the validate method of Great Expectations. - ingestion_result: Specify the fate of the associated data, defaults - to "UNKNOWN". Supported options are "UNKNOWN", "INGESTED", "REJECTED", - "EXPERIMENT", "FG_DATA". Use "INGESTED" or "REJECTED" for validation - of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" - for testing and development and "FG_DATA" when validating data - already in the Feature Group. - save_report: Whether to save the report to the backend. This is only possible if the Expectation suite - is initialised and attached to the Feature Group. Defaults to False. - ge_type: Whether to return a Great Expectations object or Hopsworks own abstraction. Defaults to True. - - # Returns - A Validation Report produced by Great Expectations. 
- """ - # Activity is logged only if a the validation concerns the feature group and not a specific dataframe - if dataframe is None: - dataframe = self.read() - if ingestion_result == "UNKNOWN": - ingestion_result = "FG_DATA" - - return self._great_expectation_engine.validate( - self, - dataframe=engine.get_instance().convert_to_default_dataframe(dataframe), - expectation_suite=expectation_suite, - save_report=save_report, - validation_options=validation_options, - ingestion_result=ingestion_result, - ge_type=ge_type, - ) - def compute_statistics( self, wallclock_time: Optional[Union[str, int, datetime, date]] = None ): @@ -2289,37 +2363,6 @@ def _get_table_name(self): def _get_online_table_name(self): return self.name + "_" + str(self.version) - def get_complex_features(self): - """Returns the names of all features with a complex data type in this - feature group. - - !!! example - ```python - complex_dtype_features = fg.get_complex_features() - ``` - """ - return [f.name for f in self.features if f.is_complex()] - - def _get_encoded_avro_schema(self): - complex_features = self.get_complex_features() - schema = json.loads(self.avro_schema) - - for field in schema["fields"]: - if field["name"] in complex_features: - field["type"] = ["null", "bytes"] - - schema_s = json.dumps(schema) - try: - avro.schema.parse(schema_s) - except avro.schema.SchemaParseException as e: - raise FeatureStoreException("Failed to construct Avro Schema: {}".format(e)) - return schema_s - - def _get_feature_avro_schema(self, feature_name): - for field in json.loads(self.avro_schema)["fields"]: - if field["name"] == feature_name: - return json.dumps(field["type"]) - @property def id(self): """Feature group id.""" @@ -2345,11 +2388,6 @@ def features(self): """Schema information.""" return self._features - @property - def online_enabled(self): - """Setting if the feature group is available in online storage.""" - return self._online_enabled - @property def time_travel_format(self): """Setting of the feature group time travel format.""" @@ -2384,19 +2422,6 @@ def created(self): """Timestamp when the feature group was created.""" return self._created - @property - def subject(self): - """Subject of the feature group.""" - if self._subject is None: - # cache the schema - self._subject = self._feature_group_engine.get_subject(self) - return self._subject - - @property - def avro_schema(self): - """Avro schema representation of the feature group.""" - return self.subject["schema"] - @property def stream(self): """Whether to enable real time stream writing capabilities.""" @@ -2443,10 +2468,6 @@ def partition_key(self, new_partition_key): def hudi_precombine_key(self, hudi_precombine_key): self._hudi_precombine_key = hudi_precombine_key.lower() - @online_enabled.setter - def online_enabled(self, new_online_enabled): - self._online_enabled = new_online_enabled - @stream.setter def stream(self, stream): self._stream = stream @@ -2481,9 +2502,20 @@ def __init__( statistics_config=None, event_time=None, expectation_suite=None, + online_enabled=False, href=None, + online_topic_name=None, ): - super().__init__(featurestore_id, location, event_time=event_time) + super().__init__( + featurestore_id, + location, + event_time=event_time, + online_enabled=online_enabled, + id=id, + expectation_suite=expectation_suite, + online_topic_name=online_topic_name, + ) + self._feature_store_name = featurestore_name self._description = description self._created = created @@ -2493,8 +2525,6 @@ def __init__( self._query = query self._data_format 
= data_format.upper() if data_format else None self._path = path - self._id = id - self._expectation_suite = expectation_suite self._features = [ feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat @@ -2523,7 +2553,6 @@ def __init__( else [] ) self.statistics_config = statistics_config - self.expectation_suite = expectation_suite self._options = ( {option["name"]: option["value"] for option in options} @@ -2533,7 +2562,6 @@ def __init__( else: self.primary_key = primary_key self.statistics_config = statistics_config - self.expectation_suite = expectation_suite self._features = features self._options = options @@ -2544,7 +2572,6 @@ def __init__( else: self._storage_connector = storage_connector - self.expectation_suite = expectation_suite self._href = href def save(self): @@ -2573,7 +2600,50 @@ def save(self): if self.statistics_config.enabled: self._statistics_engine.compute_statistics(self) - def read(self, dataframe_type="default"): + def insert( + self, + features: Union[ + pd.DataFrame, + TypeVar("pyspark.sql.DataFrame"), # noqa: F821 + TypeVar("pyspark.RDD"), # noqa: F821 + np.ndarray, + List[list], + ], + write_options: Optional[Dict[str, Any]] = {}, + validation_options: Optional[Dict[str, Any]] = {}, + save_code: Optional[bool] = True, + ) -> Tuple[Optional[Job], Optional[ValidationReport]]: + feature_dataframe = engine.get_instance().convert_to_default_dataframe(features) + + job, ge_report = self._feature_group_engine.insert( + self, + feature_dataframe=feature_dataframe, + write_options=write_options, + validation_options={"save_report": True, **validation_options}, + ) + + if save_code and ( + ge_report is None or ge_report.ingestion_result == "INGESTED" + ): + self._code_engine.save_code(self) + + if self.statistics_config.enabled: + warnings.warn( + ( + "Statistics are not computed for insertion to online enabled external feature group `{}`, with version" + " `{}`. Call `compute_statistics` explicitly to compute statistics over the data in the external storage system." + ).format(self._name, self._version), + util.StorageWarning, + ) + + return ( + job, + ge_report.to_ge_type() if ge_report is not None else None, + ) + + def read( + self, dataframe_type: Optional[str] = "default", online: Optional[bool] = False + ): """Get the feature group as a DataFrame. !!! example @@ -2598,6 +2668,8 @@ def read(self, dataframe_type="default"): # Arguments dataframe_type: str, optional. Possible values are `"default"`, `"spark"`, `"pandas"`, `"numpy"` or `"python"`, defaults to `"default"`. + online: bool, optional. If `True` read from online feature store, defaults + to `False`. # Returns `DataFrame`: The spark dataframe containing the feature data. @@ -2609,13 +2681,20 @@ def read(self, dataframe_type="default"): # Raises `hsfs.client.exceptions.RestAPIError`. """ + if engine.get_type() == "python": + raise FeatureStoreException( + "Reading an External Feature Group directly into a Pandas Dataframe using " + + "Python/Pandas as Engine is not supported, however, you can use the " + + "Query API to create Feature Views/Training Data containing External " + + "Feature Groups." + ) engine.get_instance().set_job_group( "Fetching Feature group", "Getting feature group: {} from the featurestore {}".format( self._name, self._feature_store_name ), ) - return self.select_all().read(dataframe_type=dataframe_type) + return self.select_all().read(dataframe_type=dataframe_type, online=online) def show(self, n): """Show the first n rows of the feature group. 
@@ -2643,11 +2722,9 @@ def show(self, n): def from_response_json(cls, json_dict): json_decamelized = humps.decamelize(json_dict) if isinstance(json_decamelized, dict): - _ = json_decamelized.pop("online_topic_name", None) _ = json_decamelized.pop("type", None) return cls(**json_decamelized) for fg in json_decamelized: - _ = fg.pop("online_topic_name", None) _ = fg.pop("type", None) return [cls(**fg) for fg in json_decamelized] @@ -2680,6 +2757,7 @@ def to_dict(self): "statisticsConfig": self._statistics_config, "eventTime": self._event_time, "expectationSuite": self._expectation_suite, + "onlineEnabled": self._online_enabled, } @property diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index cf96e74007..fbda5b37e0 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -784,6 +784,7 @@ def create_external_feature_group( expectation_suite: Optional[ Union[expectation_suite.ExpectationSuite, ge.core.ExpectationSuite] ] = None, + online_enabled: Optional[bool] = False, ): """Create a external feature group metadata object. @@ -808,6 +809,29 @@ def create_external_feature_group( feature store on its own. To persist the feature group metadata in the feature store, call the `save()` method. + You can enable online storage for external feature groups, however, the sync from the + external storage to Hopsworks online storage needs to be done manually: + + ```python + external_fg = fs.create_external_feature_group( + name="sales", + version=1, + description="Physical shop sales features", + query=query, + storage_connector=connector, + primary_key=['ss_store_sk'], + event_time='sale_date', + online_enabled=True + ) + external_fg.save() + + # read from external storage and filter data to sync to online + df = external_fg.read().filter(external_fg.customer_status == "active") + + # insert to online storage + external_fg.insert(df) + ``` + # Arguments name: Name of the external feature group to create. storage_connector: the storage connector to use to establish connectivity @@ -854,6 +878,8 @@ def create_external_feature_group( expectation_suite: Optionally, attach an expectation suite to the feature group which dataframes should be validated against upon insertion. Defaults to `None`. + online_enabled: Define whether it should be possible to sync the feature group to + the online feature store for low latency access, defaults to `False`. # Returns `ExternalFeatureGroup`. The external feature group metadata object. @@ -874,6 +900,7 @@ def create_external_feature_group( statistics_config=statistics_config, event_time=event_time, expectation_suite=expectation_suite, + online_enabled=online_enabled, ) def create_training_dataset(
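
For context, the end-to-end flow this patch enables mirrors the example added to the `create_external_feature_group` docstring above: an online-enabled external feature group is saved as metadata only, data is read from the external source (via the Spark engine, since the Python engine now raises a `FeatureStoreException` on direct reads of external feature groups), and the selected rows are synced to the online store through the new `insert` method. The sketch below is illustrative, not part of the patch; the connector name, feature group name, columns, and query are assumptions.

```python
# Minimal sketch of the new online-sync flow for external feature groups,
# assuming a Spark-backed HSFS session and an existing storage connector.
import hsfs

connection = hsfs.connection()                      # connect to Hopsworks
fs = connection.get_feature_store()
sc = fs.get_storage_connector("shop_warehouse")     # illustrative connector name

external_fg = fs.create_external_feature_group(
    name="sales",
    version=1,
    description="Physical shop sales features",
    query="SELECT * FROM sales",                    # illustrative query
    storage_connector=sc,
    primary_key=["ss_store_sk"],
    event_time="sale_date",
    online_enabled=True,                            # new flag added by this patch
)
external_fg.save()                                  # persists metadata only

# Read from the external storage system (Spark engine) and filter the rows
# that should be made available for low-latency online access.
df = external_fg.read().filter(external_fg.customer_status == "active")

# Sync the filtered rows to the online feature store. If an expectation suite
# is attached, validation runs first and a (job, report) tuple is returned;
# for external feature groups the job component is None.
_, report = external_fg.insert(df)
```

Note that, as the warning added in `ExternalFeatureGroup.insert` states, statistics are not computed on online inserts; `compute_statistics` has to be called explicitly to profile the data held in the external storage system.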