[FSTORE-980] Helper, primary key and event time columns with feature view #1111

Merged Nov 21, 2023, 44 commits. The diff below shows changes from 25 of the commits.
Commits
1e90007 with_extra_features (davitbzh, Aug 28, 2023)
fcc243f Merge remote-tracking branch 'upstream/master' into fv_extra_features (davitbzh, Aug 28, 2023)
662650c extra_features in td (davitbzh, Aug 29, 2023)
71126a1 extra features in config (davitbzh, Aug 31, 2023)
0e0d1a2 helper_columns (davitbzh, Sep 7, 2023)
d546742 Merge remote-tracking branch 'upstream/master' into fv_extra_features (davitbzh, Sep 7, 2023)
0958398 helperColumn (davitbzh, Sep 7, 2023)
6e4ab59 Merge remote-tracking branch 'upstream/master' into fv_extra_features (davitbzh, Sep 7, 2023)
2c72412 Merge remote-tracking branch 'upstream/master' into fv_extra_features (davitbzh, Sep 14, 2023)
44ceff1 hepler columns (davitbzh, Sep 17, 2023)
14764e1 inference, training helper columns (davitbzh, Sep 17, 2023)
3eb9c1d Merge remote-tracking branch 'upstream/master' into fv_extra_features (davitbzh, Oct 4, 2023)
6ade109 inference helper columns for online (davitbzh, Oct 13, 2023)
46c8d06 inference helper columns for online (davitbzh, Oct 13, 2023)
378439c drop columns for td (davitbzh, Oct 13, 2023)
1463c44 Merge remote-tracking branch 'upstream/master' into fv_extra_features (davitbzh, Oct 13, 2023)
fd5f5ce Merge remote-tracking branch 'upstream/master' into fv_extra_features (davitbzh, Oct 14, 2023)
eb8781a docs (davitbzh, Oct 16, 2023)
f8942a5 address comments (davitbzh, Oct 25, 2023)
a09b21e remove comments (davitbzh, Oct 26, 2023)
fbcda11 result dict in vector server (davitbzh, Oct 27, 2023)
f1a1356 drop serving keyes for batch serving (davitbzh, Oct 27, 2023)
fa5fa8b drop pks and serving keys from helper columns (davitbzh, Oct 27, 2023)
cfabeec drop pks and serving keys from helper columns if not part of it (davitbzh, Oct 27, 2023)
99ee962 Merge remote-tracking branch 'upstream/master' into fv_extra_features (davitbzh, Oct 30, 2023)
9cb1a2d fix unit tests (davitbzh, Nov 1, 2023)
e9c80c3 fix unit tests (davitbzh, Nov 1, 2023)
357d159 fix unit tests (davitbzh, Nov 1, 2023)
6e757a0 docs (davitbzh, Nov 1, 2023)
896271a fix unit tests (davitbzh, Nov 1, 2023)
87e71c4 fix unit tests (davitbzh, Nov 1, 2023)
6e57ce5 fix unit tests (davitbzh, Nov 1, 2023)
40f78a5 Merge remote-tracking branch 'upstream/master' into fv_extra_features (davitbzh, Nov 2, 2023)
0c177c0 Merge remote-tracking branch 'upstream/master' into fv_extra_features (davitbzh, Nov 5, 2023)
f1b7543 fix tests (davitbzh, Nov 6, 2023)
9de62e5 fix pks (davitbzh, Nov 6, 2023)
bf49e7c don't drop columns if they exist in query (davitbzh, Nov 9, 2023)
181c002 improve drop columns function (davitbzh, Nov 10, 2023)
bb8378e improve drop columns function (davitbzh, Nov 10, 2023)
339f732 helper columns (davitbzh, Nov 10, 2023)
41f2080 helper columns (davitbzh, Nov 11, 2023)
3019b9f helper col (davitbzh, Nov 11, 2023)
bb23f75 Merge remote-tracking branch 'upstream/master' into fv_extra_features (davitbzh, Nov 20, 2023)
d273738 Merge branch 'master' into fv_extra_features (SirOibaf, Nov 21, 2023)
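Taken together, the changes in the two files below thread four new flags, primary_keys, event_time, inference_helper_columns and training_helper_columns, through the feature view client: they are recorded when the feature view is saved, forwarded to the batch query and prepared statement endpoints, and used to keep or drop the corresponding columns when training or batch data is read back. A hypothetical usage sketch of how these options might surface in the public API follows (method and parameter names are taken from the engine code in this PR; the exact public signatures may differ, and fs, query, start and end are assumed to exist):

# Hypothetical usage sketch, not part of this diff.
feature_view = fs.create_feature_view(
    name="transactions_view",
    version=1,
    query=query,
    labels=["fraud_label"],
    # columns needed at inference time (e.g. to enrich or route predictions)
    # but not fed to the model
    inference_helper_columns=["transaction_id"],
    # columns useful while preparing training data, dropped from the final features
    training_helper_columns=["customer_segment"],
)

# Training data can optionally keep primary keys, event time and training
# helper columns (return shape assumed).
features_df, labels_df = feature_view.training_data(
    primary_keys=True,
    event_time=True,
    training_helper_columns=True,
)

# Batch inference data can include the inference helper columns.
batch_df = feature_view.get_batch_data(
    start_time=start,
    end_time=end,
    primary_keys=False,
    event_time=False,
    inference_helper_columns=True,
)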
17 changes: 15 additions & 2 deletions python/hsfs/core/feature_view_api.py
@@ -128,6 +128,10 @@ def get_batch_query(
end_time,
training_dataset_version=None,
with_label=False,
primary_keys=False,
event_time=False,
inference_helper_columns=False,
training_helper_columns=False,
is_python_engine=False,
):
path = self._base_path + [
@@ -145,21 +149,30 @@ def get_batch_query(
"start_time": start_time,
"end_time": end_time,
"with_label": with_label,
"with_primary_keys": primary_keys,
"with_event_time": event_time,
"inference_helper_columns": inference_helper_columns,
"training_helper_columns": training_helper_columns,
"is_hive_engine": is_python_engine,
"td_version": training_dataset_version,
},
)
)

def get_serving_prepared_statement(self, name, version, batch):
def get_serving_prepared_statement(
self, name, version, batch, inference_helper_columns
):
path = self._base_path + [
name,
self._VERSION,
version,
self._PREPARED_STATEMENT,
]
headers = {"content-type": "application/json"}
query_params = {"batch": batch}
query_params = {
"batch": batch,
"inference_helper_columns": inference_helper_columns,
}
return serving_prepared_statement.ServingPreparedStatement.from_response_json(
self._client._send_request("GET", path, query_params, headers=headers)
)
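Note the naming translation in get_batch_query above: the Python-level primary_keys and event_time arguments are sent to the backend as with_primary_keys and with_event_time, while the two helper column flags keep their names. An illustrative sketch of the resulting query parameter payload (timestamps are made-up example values; the endpoint path comes from self._base_path):

# Illustrative only: what FeatureViewApi.get_batch_query(..., primary_keys=True,
# event_time=True) would put in the request, per the hunk above.
query_params = {
    "start_time": 1690000000000,   # assumed example value
    "end_time": 1700000000000,     # assumed example value
    "with_label": False,
    "with_primary_keys": True,     # from primary_keys=True
    "with_event_time": True,       # from event_time=True
    "inference_helper_columns": False,
    "training_helper_columns": False,
    "is_hive_engine": True,        # from is_python_engine=True
    "td_version": None,            # from training_dataset_version
}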
156 changes: 149 additions & 7 deletions python/hsfs/core/feature_view_engine.py
@@ -79,6 +79,36 @@ def save(self, feature_view_obj):
featuregroup=featuregroup,
)
)
if feature_view_obj.inference_helper_columns:
for helper_column_name in feature_view_obj.inference_helper_columns:
(
feature,
prefix,
featuregroup,
) = feature_view_obj.query._get_feature_by_name(helper_column_name)
feature_view_obj._features.append(
training_dataset_feature.TrainingDatasetFeature(
name=feature.name,
inference_helper_column=True,
featuregroup=featuregroup,
)
)

if feature_view_obj.training_helper_columns:
for helper_column_name in feature_view_obj.training_helper_columns:
(
feature,
prefix,
featuregroup,
) = feature_view_obj.query._get_feature_by_name(helper_column_name)
feature_view_obj._features.append(
training_dataset_feature.TrainingDatasetFeature(
name=feature.name,
training_helper_column=True,
featuregroup=featuregroup,
)
)

self._transformation_function_engine.attach_transformation_fn(feature_view_obj)
updated_fv = self._feature_view_api.post(feature_view_obj)
self.attach_transformation_function(updated_fv)
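In save(), each declared helper column name is resolved against the feature view's query and appended to the feature list with the matching flag set, so the backend persists it as feature view metadata. For inference_helper_columns=["transaction_id"], the loop would append roughly the following entry (the column name and resolved feature group are made up for illustration):

# Illustrative entry appended to feature_view_obj._features by the loop above.
training_dataset_feature.TrainingDatasetFeature(
    name="transaction_id",         # hypothetical helper column
    inference_helper_column=True,
    featuregroup=transactions_fg,  # feature group resolved from the query
)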
@@ -124,6 +154,10 @@ def get_batch_query(
start_time,
end_time,
with_label=False,
primary_keys=False,
event_time=False,
inference_helper_columns=False,
training_helper_columns=False,
training_dataset_version=None,
spine=None,
):
@@ -136,6 +170,10 @@
training_dataset_version=training_dataset_version,
is_python_engine=engine.get_type() == "python",
with_label=with_label,
primary_keys=primary_keys,
event_time=event_time,
inference_helper_columns=inference_helper_columns,
training_helper_columns=training_helper_columns,
)
# verify whatever is passed 1. spine group with dataframe contained, or 2. dataframe
# the schema has to be consistent
@@ -214,7 +252,14 @@ def get_attached_transformation_fn(self, name, version):
return transformation_functions_dict

def create_training_dataset(
self, feature_view_obj, training_dataset_obj, user_write_options, spine=None
self,
feature_view_obj,
training_dataset_obj,
user_write_options,
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
):
self._set_event_time(feature_view_obj, training_dataset_obj)
updated_instance = self._create_training_data_metadata(
@@ -225,6 +270,9 @@
user_write_options,
training_dataset_obj=training_dataset_obj,
spine=spine,
primary_keys=primary_keys,
event_time=event_time,
training_helper_columns=training_helper_columns,
)
return updated_instance, td_job

@@ -236,6 +284,9 @@ def get_training_data(
training_dataset_obj=None,
training_dataset_version=None,
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
):
# check if provided td version has already existed.
if training_dataset_version:
@@ -265,7 +316,15 @@

if td_updated.training_dataset_type != td_updated.IN_MEMORY:
split_df = self._read_from_storage_connector(
td_updated, td_updated.splits, read_options, feature_view_obj.schema
td_updated,
td_updated.splits,
read_options,
with_primary_keys=primary_keys,
primary_keys=feature_view_obj.primary_keys,
with_event_time=event_time,
event_time=feature_view_obj.query._left_feature_group.event_time,
with_training_helper_columns=training_helper_columns,
training_helper_columns=feature_view_obj.training_helper_columns,
)
else:
self._check_feature_group_accessibility(feature_view_obj)
@@ -275,6 +334,10 @@
start_time=td_updated.event_start_time,
end_time=td_updated.event_end_time,
with_label=True,
inference_helper_columns=False,
primary_keys=primary_keys,
event_time=event_time,
training_helper_columns=training_helper_columns,
spine=spine,
)
split_df = engine.get_instance().get_training_data(
@@ -349,20 +412,46 @@ def recreate_training_dataset(
return training_dataset_obj, td_job

def _read_from_storage_connector(
self, training_data_obj, splits, read_options, schema=None
self,
training_data_obj,
splits,
read_options,
with_primary_keys,
primary_keys,
with_event_time,
event_time,
with_training_helper_columns,
training_helper_columns,
):

if splits:
result = {}
for split in splits:
path = training_data_obj.location + "/" + str(split.name)
result[split.name] = self._read_dir_from_storage_connector(
training_data_obj, path, read_options
training_data_obj,
path,
read_options,
with_primary_keys,
primary_keys,
with_event_time,
event_time,
with_training_helper_columns,
training_helper_columns,
)
return result
else:
path = training_data_obj.location + "/" + training_data_obj.name
return self._read_dir_from_storage_connector(
training_data_obj, path, read_options
training_data_obj,
path,
read_options,
with_primary_keys,
primary_keys,
with_event_time,
event_time,
with_training_helper_columns,
training_helper_columns,
)

def _cast_columns(self, data_format, df, schema):
@@ -373,15 +462,34 @@ def _cast_columns(self, data_format, df, schema):
else:
return df

def _read_dir_from_storage_connector(self, training_data_obj, path, read_options):
def _read_dir_from_storage_connector(
self,
training_data_obj,
path,
read_options,
with_primary_keys,
primary_keys,
with_event_time,
event_time,
with_training_helper_columns,
training_helper_columns,
):
try:
return training_data_obj.storage_connector.read(
df = training_data_obj.storage_connector.read(
# always read from materialized dataset, not query object
query=None,
data_format=training_data_obj.data_format,
options=read_options,
path=path,
)

df = self._drop_helper_columns(df, with_primary_keys, primary_keys)
df = self._drop_helper_columns(df, with_event_time, event_time)
df = self._drop_helper_columns(
df, with_training_helper_columns, training_helper_columns
)
return df

except Exception as e:
if isinstance(e, FileNotFoundError):
raise FileNotFoundError(
@@ -391,6 +499,20 @@ def _read_dir_from_storage_connector(self, training_data_obj, path, read_options
else:
raise e

def _drop_helper_columns(self, df, with_columns, columns):
if not with_columns:
if columns:
try:
df = engine.get_instance().drop_columns(df, columns)
except KeyError:
pass
else:
if not columns:
warnings.warn(
"Parent feature view doesn't have helper columns, thus drop will be ignored "
)
return df

# This method is used by hsfs_utils to launch a job for python client
def compute_training_dataset(
self,
@@ -399,6 +521,9 @@
training_dataset_obj=None,
training_dataset_version=None,
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
):
if training_dataset_obj:
pass
@@ -414,9 +539,19 @@
training_dataset_obj.event_start_time,
training_dataset_obj.event_end_time,
with_label=True,
primary_keys=primary_keys,
event_time=event_time,
inference_helper_columns=False,
training_helper_columns=training_helper_columns,
training_dataset_version=training_dataset_obj.version,
spine=spine,
)

# for spark job
user_write_options["training_helper_columns"] = training_helper_columns
user_write_options["primary_keys"] = primary_keys
user_write_options["event_time"] = event_time

td_job = engine.get_instance().write_training_dataset(
training_dataset_obj,
batch_query,
@@ -523,6 +658,9 @@ def get_batch_data(
transformation_functions,
read_options=None,
spine=None,
primary_keys=False,
event_time=False,
inference_helper_columns=False,
):
self._check_feature_group_accessibility(feature_view_obj)

@@ -531,6 +669,10 @@
start_time,
end_time,
with_label=False,
primary_keys=primary_keys,
event_time=event_time,
inference_helper_columns=inference_helper_columns,
training_helper_columns=False,
training_dataset_version=training_dataset_version,
spine=spine,
).read(read_options=read_options)
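When a materialized training dataset is read back, _read_dir_from_storage_connector loads the files and then strips primary key, event time and training helper columns unless the caller asked to keep them; _drop_helper_columns swallows the KeyError so that columns not present in the files are simply ignored. A minimal pandas-based sketch of the drop_columns behaviour this code assumes from the engine (the actual Spark and Python engine implementations are not part of this diff):

import pandas as pd


def drop_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    # Drop the requested columns; pandas raises KeyError for missing labels,
    # which _drop_helper_columns above catches and ignores.
    return df.drop(columns=columns)


# Example: the caller passed event_time=False, so the event time column
# (name made up) is removed from the split that was just read.
split_df = pd.DataFrame(
    {"cc_num": [1, 2], "amount": [9.5, 12.0], "event_ts": [1000, 2000]}
)
split_df = drop_columns(split_df, ["event_ts"])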