[FSTORE-980] Helper, primary key and event time columns with feature view (#1111)
davitbzh authored Nov 21, 2023
1 parent 1685adf commit 6a6ab76
Showing 10 changed files with 854 additions and 129 deletions.
17 changes: 15 additions & 2 deletions python/hsfs/core/feature_view_api.py
@@ -128,6 +128,10 @@ def get_batch_query(
end_time,
training_dataset_version=None,
with_label=False,
primary_keys=False,
event_time=False,
inference_helper_columns=False,
training_helper_columns=False,
is_python_engine=False,
):
path = self._base_path + [
@@ -145,21 +149,30 @@ def get_batch_query(
"start_time": start_time,
"end_time": end_time,
"with_label": with_label,
"with_primary_keys": primary_keys,
"with_event_time": event_time,
"inference_helper_columns": inference_helper_columns,
"training_helper_columns": training_helper_columns,
"is_hive_engine": is_python_engine,
"td_version": training_dataset_version,
},
)
)

def get_serving_prepared_statement(self, name, version, batch):
def get_serving_prepared_statement(
self, name, version, batch, inference_helper_columns
):
path = self._base_path + [
name,
self._VERSION,
version,
self._PREPARED_STATEMENT,
]
headers = {"content-type": "application/json"}
query_params = {"batch": batch}
query_params = {
"batch": batch,
"inference_helper_columns": inference_helper_columns,
}
return serving_prepared_statement.ServingPreparedStatement.from_response_json(
self._client._send_request("GET", path, query_params, headers=headers)
)
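For orientation, here is a minimal sketch of how the updated wrapper might be called. The wrapper constructor and the positional parameters before end_time are not visible in this hunk, so the names and values below are assumptions, not part of the change:

from hsfs.core import feature_view_api

# Hypothetical wrapper usage; feature store id, view name and timestamps are made up.
fv_api = feature_view_api.FeatureViewApi(feature_store_id=67)

fv_query = fv_api.get_batch_query(
    "transactions_view",            # name (assumed positional parameter)
    1,                              # version (assumed positional parameter)
    1696118400000,                  # start_time
    1698796800000,                  # end_time
    training_dataset_version=None,
    with_label=False,
    primary_keys=True,              # sent as the with_primary_keys query parameter
    event_time=True,                # sent as the with_event_time query parameter
    inference_helper_columns=False,
    training_helper_columns=False,
    is_python_engine=True,
)

# Prepared statements for online serving, now optionally including inference helpers.
statements = fv_api.get_serving_prepared_statement(
    "transactions_view", 1, batch=False, inference_helper_columns=True
)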
193 changes: 186 additions & 7 deletions python/hsfs/core/feature_view_engine.py
@@ -79,6 +79,36 @@ def save(self, feature_view_obj):
featuregroup=featuregroup,
)
)
if feature_view_obj.inference_helper_columns:
for helper_column_name in feature_view_obj.inference_helper_columns:
(
feature,
prefix,
featuregroup,
) = feature_view_obj.query._get_feature_by_name(helper_column_name)
feature_view_obj._features.append(
training_dataset_feature.TrainingDatasetFeature(
name=feature.name,
inference_helper_column=True,
featuregroup=featuregroup,
)
)

if feature_view_obj.training_helper_columns:
for helper_column_name in feature_view_obj.training_helper_columns:
(
feature,
prefix,
featuregroup,
) = feature_view_obj.query._get_feature_by_name(helper_column_name)
feature_view_obj._features.append(
training_dataset_feature.TrainingDatasetFeature(
name=feature.name,
training_helper_column=True,
featuregroup=featuregroup,
)
)

self._transformation_function_engine.attach_transformation_fn(feature_view_obj)
updated_fv = self._feature_view_api.post(feature_view_obj)
self.attach_transformation_function(updated_fv)
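In user-facing terms, this save path is what records helper columns on the feature view's schema. A hedged sketch, assuming the public create_feature_view call exposes matching keyword arguments (connection details, feature group and column names are illustrative):

import hsfs

# Assumed connection boilerplate; adjust host, project and credentials for a real deployment.
fs = hsfs.connection().get_feature_store()

trans_fg = fs.get_feature_group("transactions", version=1)

fv = fs.create_feature_view(
    name="transactions_view",
    version=1,
    query=trans_fg.select(["cc_num", "datetime", "amount", "category", "fraud_label"]),
    labels=["fraud_label"],
    # Needed at inference time (e.g. to post-process predictions) but not a model input.
    inference_helper_columns=["category"],
    # Useful while inspecting or filtering training data but not a model input.
    training_helper_columns=["datetime"],
)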
@@ -124,6 +154,10 @@ def get_batch_query(
start_time,
end_time,
with_label=False,
primary_keys=False,
event_time=False,
inference_helper_columns=False,
training_helper_columns=False,
training_dataset_version=None,
spine=None,
):
@@ -136,6 +170,10 @@ def get_batch_query(
training_dataset_version=training_dataset_version,
is_python_engine=engine.get_type() == "python",
with_label=with_label,
primary_keys=primary_keys,
event_time=event_time,
inference_helper_columns=inference_helper_columns,
training_helper_columns=training_helper_columns,
)
# verify whatever is passed: either 1. a spine group with a dataframe contained in it, or 2. a dataframe;
# in both cases the schema has to be consistent
@@ -214,7 +252,14 @@ def get_attached_transformation_fn(self, name, version):
return transformation_functions_dict

def create_training_dataset(
self, feature_view_obj, training_dataset_obj, user_write_options, spine=None
self,
feature_view_obj,
training_dataset_obj,
user_write_options,
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
):
self._set_event_time(feature_view_obj, training_dataset_obj)
updated_instance = self._create_training_data_metadata(
@@ -225,6 +270,9 @@ def create_training_dataset(
user_write_options,
training_dataset_obj=training_dataset_obj,
spine=spine,
primary_keys=primary_keys,
event_time=event_time,
training_helper_columns=training_helper_columns,
)
return updated_instance, td_job
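For context, a hedged example of the corresponding user-level call, assuming the public FeatureView.create_training_data method accepts the same toggles (names, return values and options are illustrative):

import hsfs

fs = hsfs.connection().get_feature_store()                # assumed connection boilerplate
fv = fs.get_feature_view("transactions_view", version=1)

# Hypothetical materialized training dataset that keeps identifier columns
# (keyword names assumed to mirror the engine arguments above).
version, job = fv.create_training_data(
    description="training data with identifiers kept for debugging",
    primary_keys=True,             # keep primary key columns
    event_time=True,               # keep the event time column
    training_helper_columns=True,  # keep declared training helper columns
)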

@@ -236,6 +284,9 @@ def get_training_data(
training_dataset_obj=None,
training_dataset_version=None,
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
):
# check if the provided td version already exists.
if training_dataset_version:
@@ -265,7 +316,18 @@

if td_updated.training_dataset_type != td_updated.IN_MEMORY:
split_df = self._read_from_storage_connector(
td_updated, td_updated.splits, read_options, feature_view_obj.schema
td_updated,
td_updated.splits,
read_options,
with_primary_keys=primary_keys,
primary_keys=self._get_primary_keys_from_query(feature_view_obj.query),
with_event_time=event_time,
event_time=[feature_view_obj.query._left_feature_group.event_time],
with_training_helper_columns=training_helper_columns,
training_helper_columns=feature_view_obj.training_helper_columns,
feature_view_features=[
feature.name for feature in feature_view_obj.features
],
)
else:
self._check_feature_group_accessibility(feature_view_obj)
@@ -275,6 +337,10 @@ def get_training_data(
start_time=td_updated.event_start_time,
end_time=td_updated.event_end_time,
with_label=True,
inference_helper_columns=False,
primary_keys=primary_keys,
event_time=event_time,
training_helper_columns=training_helper_columns,
spine=spine,
)
split_df = engine.get_instance().get_training_data(
@@ -349,20 +415,49 @@ def recreate_training_dataset(
return training_dataset_obj, td_job

def _read_from_storage_connector(
self, training_data_obj, splits, read_options, schema=None
self,
training_data_obj,
splits,
read_options,
with_primary_keys,
primary_keys,
with_event_time,
event_time,
with_training_helper_columns,
training_helper_columns,
feature_view_features,
):

if splits:
result = {}
for split in splits:
path = training_data_obj.location + "/" + str(split.name)
result[split.name] = self._read_dir_from_storage_connector(
training_data_obj, path, read_options
training_data_obj,
path,
read_options,
with_primary_keys,
primary_keys,
with_event_time,
event_time,
with_training_helper_columns,
training_helper_columns,
feature_view_features,
)
return result
else:
path = training_data_obj.location + "/" + training_data_obj.name
return self._read_dir_from_storage_connector(
training_data_obj, path, read_options
training_data_obj,
path,
read_options,
with_primary_keys,
primary_keys,
with_event_time,
event_time,
with_training_helper_columns,
training_helper_columns,
feature_view_features,
)

def _cast_columns(self, data_format, df, schema):
@@ -373,15 +468,43 @@ def _cast_columns(self, data_format, df, schema):
else:
return df

def _read_dir_from_storage_connector(self, training_data_obj, path, read_options):
def _read_dir_from_storage_connector(
self,
training_data_obj,
path,
read_options,
with_primary_keys,
primary_keys,
with_event_time,
event_time,
with_training_helper_columns,
training_helper_columns,
feature_view_features,
):
try:
return training_data_obj.storage_connector.read(
df = training_data_obj.storage_connector.read(
# always read from materialized dataset, not query object
query=None,
data_format=training_data_obj.data_format,
options=read_options,
path=path,
)

df = self._drop_helper_columns(
df, feature_view_features, with_primary_keys, primary_keys, False
)
df = self._drop_helper_columns(
df, feature_view_features, with_event_time, event_time, False
)
df = self._drop_helper_columns(
df,
feature_view_features,
with_training_helper_columns,
training_helper_columns,
True,
)
return df

except Exception as e:
if isinstance(e, FileNotFoundError):
raise FileNotFoundError(
@@ -391,6 +514,23 @@ def _read_dir_from_storage_connector(self, training_data_obj, path, read_options
else:
raise e

def _drop_helper_columns(
self, df, feature_view_features, with_columns, columns, training_helper
):
if not with_columns:
if engine.get_type() == "spark":
existing_cols = [field.name for field in df.schema.fields]
else:
existing_cols = df.columns
# primary keys and event time are dropped only if they are in the query
drop_cols = list(set(existing_cols).intersection(columns))
# training helper columns are always part of the query
if not training_helper:
drop_cols = list(set(drop_cols).difference(feature_view_features))
if drop_cols:
df = engine.get_instance().drop_columns(df, drop_cols)
return df
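To make the dropping rule concrete, here is a small self-contained pandas illustration of the same set logic (column names are invented); it mirrors the intersection and difference steps above instead of going through the engine abstraction:

import pandas as pd

# A materialized training split that still contains the primary key column.
df = pd.DataFrame(
    {"cc_num": [1, 2], "datetime": [10, 20], "amount": [3.5, 7.0], "fraud_label": [0, 1]}
)
feature_view_features = ["amount", "fraud_label"]  # features selected by the feature view
primary_keys = ["cc_num"]                          # primary keys collected from the query
with_primary_keys = False                          # caller did not ask to keep them

if not with_primary_keys:
    existing_cols = df.columns
    # Drop only primary key columns that actually exist in the data ...
    drop_cols = list(set(existing_cols).intersection(primary_keys))
    # ... and never drop a column that the feature view selects as a feature.
    drop_cols = list(set(drop_cols).difference(feature_view_features))
    if drop_cols:
        df = df.drop(columns=drop_cols)

print(sorted(df.columns))  # ['amount', 'datetime', 'fraud_label']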

# This method is used by hsfs_utils to launch a job for the Python client
def compute_training_dataset(
self,
@@ -399,6 +539,9 @@
training_dataset_obj=None,
training_dataset_version=None,
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
):
if training_dataset_obj:
pass
@@ -414,9 +557,19 @@
training_dataset_obj.event_start_time,
training_dataset_obj.event_end_time,
with_label=True,
primary_keys=primary_keys,
event_time=event_time,
inference_helper_columns=False,
training_helper_columns=training_helper_columns,
training_dataset_version=training_dataset_obj.version,
spine=spine,
)

# forward the column toggles to the Spark materialization job via the write options
user_write_options["training_helper_columns"] = training_helper_columns
user_write_options["primary_keys"] = primary_keys
user_write_options["event_time"] = event_time

td_job = engine.get_instance().write_training_dataset(
training_dataset_obj,
batch_query,
@@ -523,6 +676,9 @@ def get_batch_data(
transformation_functions,
read_options=None,
spine=None,
primary_keys=False,
event_time=False,
inference_helper_columns=False,
):
self._check_feature_group_accessibility(feature_view_obj)

@@ -531,6 +687,10 @@ def get_batch_data(
start_time,
end_time,
with_label=False,
primary_keys=primary_keys,
event_time=event_time,
inference_helper_columns=inference_helper_columns,
training_helper_columns=False,
training_dataset_version=training_dataset_version,
spine=spine,
).read(read_options=read_options)
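At the user level this maps onto batch scoring, for example as below; the sketch assumes the public FeatureView.get_batch_data method mirrors these keyword arguments, and the view name and time range are made up:

import hsfs

fs = hsfs.connection().get_feature_store()                # assumed connection boilerplate
feature_view = fs.get_feature_view("transactions_view", version=1)

batch_df = feature_view.get_batch_data(
    start_time="2023-10-01",
    end_time="2023-11-01",
    primary_keys=True,              # keep primary key columns in the returned dataframe
    event_time=True,                # keep the event time column
    inference_helper_columns=True,  # include columns flagged as inference helpers
)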
@@ -616,3 +776,22 @@ def _get_feature_view_url(self, feature_view):
+ str(feature_view.version)
)
return util.get_hostname_replaced_url(path)

def _get_primary_keys_from_query(self, fv_query_obj):
fv_pks = set(
[
feature.name
for feature in fv_query_obj._left_feature_group.features
if feature.primary
]
)
for _join in fv_query_obj._joins:
fv_pks.update(
[
feature.name
for feature in _join.query._left_feature_group.features
if feature.primary
]
)

return list(fv_pks)
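As a quick check of what this helper returns, the sketch below mimics the relevant attributes with plain dataclasses (the real objects are hsfs Query and FeatureGroup instances): primary keys are collected from the left feature group and from every joined feature group, then deduplicated.

from dataclasses import dataclass, field
from typing import List


@dataclass
class Feature:
    name: str
    primary: bool = False


@dataclass
class FeatureGroup:
    features: List[Feature]


@dataclass
class Query:
    _left_feature_group: FeatureGroup
    _joins: List["Join"] = field(default_factory=list)


@dataclass
class Join:
    query: Query


def get_primary_keys_from_query(fv_query_obj):
    # Same shape as the engine method: a set union over the left and joined feature groups.
    fv_pks = {f.name for f in fv_query_obj._left_feature_group.features if f.primary}
    for _join in fv_query_obj._joins:
        fv_pks.update(
            f.name for f in _join.query._left_feature_group.features if f.primary
        )
    return list(fv_pks)


left = FeatureGroup([Feature("cc_num", primary=True), Feature("amount")])
joined = Query(FeatureGroup([Feature("cc_num", primary=True), Feature("age")]))
print(get_primary_keys_from_query(Query(left, [Join(joined)])))  # ['cc_num']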