[FSTORE-743] Add online support for External Feature Groups (#976)
moritzmeister authored Apr 4, 2023
1 parent 0223120 commit 7a934e2
Showing 7 changed files with 372 additions and 208 deletions.
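At a glance: the commit lets an external (storage-connector-backed) feature group created with `online_enabled=True` accept `insert()` calls, which validate the dataframe and write it to the online store only. A minimal usage sketch from the client side; the connection values are placeholders, and `create_external_feature_group` plus the public `insert()` wrapper are assumed from the surrounding hsfs API rather than shown in this diff:

import hsfs
import pandas as pd

connection = hsfs.connection(
    host="my.hopsworks.host", project="my_project", api_key_value="..."
)
fs = connection.get_feature_store()

# External feature group backed by a storage connector, with the new
# online storage support enabled at creation time.
ext_fg = fs.create_external_feature_group(
    name="transactions",
    version=1,
    query="SELECT * FROM transactions",
    storage_connector=fs.get_storage_connector("my_snowflake_connector"),
    primary_key=["tx_id"],
    online_enabled=True,  # without this, insert() raises a FeatureStoreException
)

# insert() saves the metadata on first use, checks schema compatibility,
# runs Great Expectations validation if configured, and writes the rows
# to the online store via Kafka.
df = pd.DataFrame({"tx_id": [1, 2], "amount": [10.0, 25.5]})
ext_fg.insert(df)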
48 changes: 48 additions & 0 deletions python/hsfs/core/external_feature_group_engine.py
@@ -47,6 +47,54 @@ def save(self, feature_group):

         self._feature_group_api.save(feature_group)

+    def insert(
+        self,
+        feature_group,
+        feature_dataframe,
+        write_options: dict,
+        validation_options: dict = {},
+    ):
+        if not feature_group.online_enabled:
+            raise FeatureStoreException(
+                "Online storage is not enabled for this feature group. External feature groups can only store data in"
+                + " online storage. To create an offline only external feature group, use the `save` method."
+            )
+
+        schema = engine.get_instance().parse_schema_feature_group(feature_dataframe)
+
+        if not feature_group._id:
+            # only save metadata if the feature group does not exist yet
+            feature_group.features = schema
+            self.save(feature_group)
+        else:
+            # otherwise, verify that the feature group schema matches the user-provided dataframe
+            self._verify_schema_compatibility(feature_group.features, schema)
+
+        # Great Expectations validation (Python engine, and non-stream feature groups on Spark)
+        ge_report = feature_group._great_expectation_engine.validate(
+            feature_group=feature_group,
+            dataframe=feature_dataframe,
+            validation_options=validation_options,
+            ingestion_result="INGESTED",
+            ge_type=False,
+        )
+
+        if ge_report is not None and ge_report.ingestion_result == "REJECTED":
+            return None, ge_report
+
+        return (
+            engine.get_instance().save_dataframe(
+                feature_group=feature_group,
+                dataframe=feature_dataframe,
+                operation=None,
+                online_enabled=feature_group.online_enabled,
+                storage="online",
+                offline_write_options=write_options,
+                online_write_options=write_options,
+            ),
+            ge_report,
+        )
+
     def _update_features_metadata(self, feature_group, features):
         # perform changes on a copy in case the update fails, so we don't leave
         # the user object in a corrupted state
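Two behaviors of the new `insert()` above are worth calling out: the hard failure for offline-only external feature groups, and the early `(None, report)` return when Great Expectations rejects the dataframe. A hedged sketch of both outcomes, assuming the public `insert()` surfaces the engine's result tuple (variable names are illustrative):

from hsfs.client.exceptions import FeatureStoreException

# 1) Offline-only external feature group: rejected before anything is written.
try:
    offline_only_fg.insert(df)
except FeatureStoreException as err:
    print(err)  # "Online storage is not enabled for this feature group. ..."

# 2) Online-enabled external feature group with expectations attached:
#    a failed validation short-circuits to (None, report) and skips the write.
result, report = online_ext_fg.insert(df, validation_options={"run_validation": True})
if report is not None and report.ingestion_result == "REJECTED":
    print("ingestion rejected by Great Expectations, nothing was written")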
47 changes: 47 additions & 0 deletions python/hsfs/core/feature_group_base_engine.py
@@ -15,6 +15,7 @@
 #

 from hsfs.core import feature_group_api, storage_connector_api, tags_api, kafka_api
+from hsfs.client.exceptions import FeatureStoreException


 class FeatureGroupBaseEngine:
@@ -107,3 +108,49 @@ def new_feature_list(self, feature_group, updated_features):
             ):
                 new_features.append(feature)
         return new_features + updated_features
+
+    def _verify_schema_compatibility(self, feature_group_features, dataframe_features):
+        err = []
+        feature_df_dict = {feat.name: feat.type for feat in dataframe_features}
+        for feature_fg in feature_group_features:
+            fg_type = feature_fg.type.lower().replace(" ", "")
+            # check if the feature exists in the dataframe
+            if feature_fg.name in feature_df_dict:
+                df_type = feature_df_dict[feature_fg.name].lower().replace(" ", "")
+                # remove match from lookup table
+                del feature_df_dict[feature_fg.name]
+
+                # check if types match
+                if fg_type != df_type:
+                    # don't check structs for an exact match
+                    if fg_type.startswith("struct") and df_type.startswith("struct"):
+                        continue
+
+                    err += [
+                        f"{feature_fg.name} ("
+                        f"expected type: '{fg_type}', "
+                        f"derived from input: '{df_type}') has the wrong type."
+                    ]
+
+            else:
+                err += [
+                    f"{feature_fg.name} (type: '{feature_fg.type}') is missing from "
+                    f"input dataframe."
+                ]
+
+        # any features left in the lookup table are superfluous
+        for feature_df_name, feature_df_type in feature_df_dict.items():
+            err += [
+                f"{feature_df_name} (type: '{feature_df_type}') does not exist "
+                f"in feature group."
+            ]
+
+        # raise an exception if any errors were found
+        if len(err) > 0:
+            raise FeatureStoreException(
+                "Features are not compatible with Feature Group schema: "
+                + "".join(["\n - " + e for e in err])
+            )
+
+    def get_subject(self, feature_group):
+        return self._kafka_api.get_topic_subject(feature_group._online_topic_name)
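`_verify_schema_compatibility` deliberately collects every mismatch before raising, so a single exception reports all problems at once; note also the struct exemption, where two struct types compare as compatible regardless of their fields. A self-contained worked example of the three error shapes, using a stand-in dataclass instead of the real `hsfs.feature.Feature`:

from dataclasses import dataclass

@dataclass
class Feat:
    # stand-in for hsfs.feature.Feature; only name and type matter here
    name: str
    type: str

fg_features = [Feat("tx_id", "bigint"), Feat("amount", "double")]
df_features = [Feat("tx_id", "string"), Feat("extra_col", "int")]

# Passing these to _verify_schema_compatibility(fg_features, df_features)
# would raise a FeatureStoreException listing all three problems:
#   - tx_id (expected type: 'bigint', derived from input: 'string') has the wrong type.
#   - amount (type: 'double') is missing from input dataframe.
#   - extra_col (type: 'int') does not exist in feature group.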
47 changes: 0 additions & 47 deletions python/hsfs/core/feature_group_engine.py
@@ -17,7 +17,6 @@
 from hsfs import engine, client, util
 from hsfs import feature_group as fg
 from hsfs.client import exceptions
-from hsfs.client.exceptions import FeatureStoreException
 from hsfs.core import feature_group_base_engine, hudi_engine
 from hsfs.core.deltastreamer_jobconf import DeltaStreamerJobConf

@@ -234,9 +233,6 @@ def update_description(self, feature_group, description):
             feature_group, copy_feature_group, "updateMetadata"
         )

-    def get_subject(self, feature_group):
-        return self._kafka_api.get_topic_subject(feature_group._online_topic_name)
-
     def insert_stream(
         self,
         feature_group,
@@ -305,49 +301,6 @@ def insert_stream(

         return streaming_query

-    def _verify_schema_compatibility(self, feature_group_features, dataframe_features):
-        err = []
-        feature_df_dict = {feat.name: feat.type for feat in dataframe_features}
-        for feature_fg in feature_group_features:
-            fg_type = feature_fg.type.lower().replace(" ", "")
-            # check if feature exists dataframe
-            if feature_fg.name in feature_df_dict:
-                df_type = feature_df_dict[feature_fg.name].lower().replace(" ", "")
-                # remove match from lookup table
-                del feature_df_dict[feature_fg.name]
-
-                # check if types match
-                if fg_type != df_type:
-                    # don't check structs for exact match
-                    if fg_type.startswith("struct") and df_type.startswith("struct"):
-                        continue
-
-                    err += [
-                        f"{feature_fg.name} ("
-                        f"expected type: '{fg_type}', "
-                        f"derived from input: '{df_type}') has the wrong type."
-                    ]
-
-            else:
-                err += [
-                    f"{feature_fg.name} (type: '{feature_fg.type}') is missing from "
-                    f"input dataframe."
-                ]
-
-        # any features that are left in lookup table are superfluous
-        for feature_df_name, feature_df_type in feature_df_dict.items():
-            err += [
-                f"{feature_df_name} (type: '{feature_df_type}') does not exist "
-                f"in feature group."
-            ]
-
-        # raise exception if any errors were found.
-        if len(err) > 0:
-            raise FeatureStoreException(
-                "Features are not compatible with Feature Group schema: "
-                + "".join(["\n - " + e for e in err])
-            )
-
     def _save_feature_group_metadata(
         self, feature_group, dataframe_features, write_options
     ):
15 changes: 11 additions & 4 deletions python/hsfs/engine/python.py
@@ -44,6 +44,7 @@
 from sqlalchemy import sql

 from hsfs import client, feature, util
+from hsfs.feature_group import ExternalFeatureGroup
 from hsfs.client.exceptions import FeatureStoreException
 from hsfs.core import (
     feature_group_api,
@@ -436,7 +437,10 @@ def save_dataframe(
         online_write_options: dict,
         validation_id: int = None,
     ):
-        if feature_group.stream:
+        if (
+            isinstance(feature_group, ExternalFeatureGroup)
+            and feature_group.online_enabled
+        ) or feature_group.stream:
             return self._write_dataframe_kafka(
                 feature_group, dataframe, offline_write_options
             )
@@ -896,13 +900,16 @@ def acked(err, msg):
             progress_bar.close()

         # start backfilling job
-        if offline_write_options is not None and offline_write_options.get(
-            "start_offline_backfill", True
+        if (
+            not isinstance(feature_group, ExternalFeatureGroup)
+            and offline_write_options is not None
+            and offline_write_options.get("start_offline_backfill", True)
         ):
             feature_group.backfill_job.run(
                 await_termination=offline_write_options.get("wait_for_job", True)
             )

+        if isinstance(feature_group, ExternalFeatureGroup):
+            return None
         return feature_group.backfill_job

     def _kafka_produce(
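On the Python engine, stream feature groups and online-enabled external feature groups now share the Kafka write path, but only the former triggers the offline backfill job; for external groups `save_dataframe` returns `None`, since there is no backfill job to hand back. A hedged sketch of how the write options in the hunk above could be used from `insert()` (the option keys come from the diff, the rest is illustrative):

# Regular stream feature group: write to Kafka, then run the offline
# backfill job and wait for it to finish (both defaults).
stream_fg.insert(df, write_options={"start_offline_backfill": True, "wait_for_job": True})

# Same write, but defer offline materialization to a scheduled run.
stream_fg.insert(df, write_options={"start_offline_backfill": False})

# Online-enabled external feature group: the backfill options are ignored,
# because the data only ever lands in the online store.
ext_fg.insert(df, write_options={})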
6 changes: 5 additions & 1 deletion python/hsfs/engine/spark.py
@@ -66,6 +66,7 @@
     pass

 from hsfs import feature, training_dataset_feature, client, util
+from hsfs.feature_group import ExternalFeatureGroup
 from hsfs.storage_connector import StorageConnector
 from hsfs.client.exceptions import FeatureStoreException
 from hsfs.core import hudi_engine, transformation_function_engine, kafka_api
@@ -261,7 +262,10 @@ def save_dataframe(
         validation_id=None,
     ):
         try:
-            if feature_group.stream:
+            if (
+                isinstance(feature_group, ExternalFeatureGroup)
+                and feature_group.online_enabled
+            ) or feature_group.stream:
                 self._save_online_dataframe(
                     feature_group, dataframe, online_write_options
                 )
(2 additional changed files not shown)
