Commit

Merge branch 'master' into FSTORE-1461

aversey authored Jul 8, 2024
2 parents c6fc2cc + 65a05ca commit a0cfa99

Showing 10 changed files with 703 additions and 47 deletions.
53 changes: 53 additions & 0 deletions python/hsfs/core/feature_logging.py
@@ -0,0 +1,53 @@
import json
from typing import Any, Dict

import humps
from hsfs import feature_group, util


class FeatureLogging:

def __init__(self, id: int,
transformed_features: "feature_group.FeatureGroup",
untransformed_features: "feature_group.FeatureGroup"):
self._id = id
self._transformed_features = transformed_features
self._untransformed_features = untransformed_features

@classmethod
def from_response_json(cls, json_dict: Dict[str, Any]) -> 'FeatureLogging':
from hsfs.feature_group import FeatureGroup # avoid circular import
json_decamelized = humps.decamelize(json_dict)
transformed_features = json_decamelized.get('transformed_log')
untransformed_features = json_decamelized.get('untransformed_log')
if transformed_features:
transformed_features = FeatureGroup.from_response_json(transformed_features)
if untransformed_features:
untransformed_features = FeatureGroup.from_response_json(untransformed_features)
return cls(json_decamelized.get('id'), transformed_features, untransformed_features)

@property
def transformed_features(self) -> "feature_group.FeatureGroup":
return self._transformed_features

@property
def untransformed_features(self) -> "feature_group.FeatureGroup":
return self._untransformed_features

@property
    def id(self) -> int:
return self._id

def to_dict(self):
return {
'id': self._id,
'transformed_log': self._transformed_features,
'untransformed_log': self._untransformed_features,
}

    def json(self) -> str:
return json.dumps(self, cls=util.FeatureStoreEncoder)

def __repr__(self):
return self.json()
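A minimal usage sketch (not part of the diff) showing how the new class deserializes a feature-logging API response; it assumes only that hsfs and its humps dependency are importable. With no transformed_log/untransformed_log entries in the payload, both feature-group properties come back as None:

from hsfs.core.feature_logging import FeatureLogging

payload = {"id": 42}  # hypothetical response body; real responses use camelCase keys
logging_meta = FeatureLogging.from_response_json(payload)
assert logging_meta.id == 42
assert logging_meta.transformed_features is None
assert logging_meta.untransformed_features is None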

103 changes: 103 additions & 0 deletions python/hsfs/core/feature_view_api.py
@@ -26,6 +26,7 @@
from hsfs.client.exceptions import RestAPIError
from hsfs.constructor import query, serving_prepared_statement
from hsfs.core import explicit_provenance, job, training_dataset_job_conf
from hsfs.core.job import Job


class FeatureViewApi:
@@ -43,6 +44,13 @@ class FeatureViewApi:
_COMPUTE = "compute"
_PROVENANCE = "provenance"
_LINKS = "links"
_LOGGING = "log"
_PAUSE_LOGGING = "pause"
_RESUME_LOGGING = "resume"
_MATERIALIZE_LOGGING = "materialize"
    _TRANSFORMED_LOG = "transformed"
_UNTRANSFORMED_LOG = "untransformed"


def __init__(self, feature_store_id: int) -> None:
self._feature_store_id = feature_store_id
@@ -358,3 +366,98 @@ def get_models_provenance(
explicit_provenance.Links.Type.MODEL,
training_dataset_version=training_dataset_version,
)

    def enable_feature_logging(
        self,
        feature_view_name: str,
        feature_view_version: int,
    ):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
]
_client._send_request("PUT", path_params, {})

    def pause_feature_logging(
        self,
        feature_view_name: str,
        feature_view_version: int,
    ):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
self._PAUSE_LOGGING,
]
return _client._send_request("POST", path_params, {})

    def resume_feature_logging(
        self,
        feature_view_name: str,
        feature_view_version: int,
    ):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
self._RESUME_LOGGING,
]
return _client._send_request("POST", path_params, {})

    def materialize_feature_logging(
        self,
        feature_view_name: str,
        feature_view_version: int,
    ):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
self._MATERIALIZE_LOGGING,
]
jobs_json = _client._send_request("POST", path_params, {})
jobs = []
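        # Assumption about the response shape: multiple jobs come back wrapped
        # in "items", while a single job is returned as a bare job document.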
if jobs_json.get("count", 0) > 1:
for item in jobs_json["items"]:
jobs.append(Job.from_response_json(item))
else:
jobs.append(Job.from_response_json(jobs_json))
return jobs

    def get_feature_logging(
        self,
        feature_view_name: str,
        feature_view_version: int,
    ):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
]
return _client._send_request("GET", path_params, {})

    def delete_feature_logs(
        self,
        feature_view_name: str,
        feature_view_version: int,
        transformed: Optional[bool] = None,
    ):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
]
if transformed is not None:
if transformed:
                path_params += [self._TRANSFORMED_LOG]
else:
path_params += [self._UNTRANSFORMED_LOG]
_client._send_request("DELETE", path_params, {})
154 changes: 151 additions & 3 deletions python/hsfs/core/feature_view_engine.py
@@ -17,7 +17,7 @@

import datetime
import warnings
from typing import Optional
from typing import Dict, Optional, Union

from hsfs import (
client,
@@ -29,6 +29,7 @@
)
from hsfs.client import exceptions
from hsfs.client.exceptions import FeatureStoreException
from hsfs.constructor.filter import Filter, Logic
from hsfs.core import (
arrow_flight_client,
code_engine,
@@ -39,6 +40,7 @@
training_dataset_engine,
transformation_function_engine,
)
from hsfs.core.feature_logging import FeatureLogging
from hsfs.training_dataset_split import TrainingDatasetSplit


@@ -48,6 +50,10 @@ class FeatureViewEngine:
_OVERWRITE = "overwrite"
_APPEND = "append"

_LOG_TD_VERSION = "td_version"
_LOG_TIME = "log_time"
_HSML_MODEL = "hsml_model"

def __init__(self, feature_store_id):
self._feature_store_id = feature_store_id

@@ -742,6 +748,7 @@ def get_batch_data(
event_time=False,
inference_helper_columns=False,
dataframe_type="default",
transformed=True,
):
self._check_feature_group_accessibility(feature_view_obj)

@@ -758,18 +765,23 @@
with_label=False,
primary_keys=primary_keys,
event_time=event_time,
inference_helper_columns=inference_helper_columns,
inference_helper_columns=inference_helper_columns or transformed,
training_helper_columns=False,
training_dataset_version=training_dataset_version,
spine=spine,
).read(read_options=read_options, dataframe_type=dataframe_type)
if transformation_functions:
if transformation_functions and transformed:
return engine.get_instance()._apply_transformation_function(
transformation_functions, dataset=feature_dataframe
)
else:
return feature_dataframe

def transform_batch_data(self, features, transformation_functions):
return engine.get_instance()._apply_transformation_function(
transformation_functions, dataset=features, inplace=False
)

def add_tag(
self, feature_view_obj, name: str, value, training_dataset_version=None
):
@@ -923,3 +935,139 @@ def _check_if_exists_with_prefix(self, f_name, f_set):
)
else:
return f_name

def enable_feature_logging(self, fv):
self._feature_view_api.enable_feature_logging(fv.name, fv.version)
fv.logging_enabled = True
return fv

def get_feature_logging(self, fv):
return FeatureLogging.from_response_json(
self._feature_view_api.get_feature_logging(fv.name, fv.version)
)

def _get_logging_fg(self, fv, transformed):
feature_logging = self.get_feature_logging(fv)
if transformed:
return feature_logging.transformed_features
else:
return feature_logging.untransformed_features

    def log_features(
        self,
        fv,
        features,
        prediction=None,
        transformed=False,
        write_options=None,
        training_dataset_version=None,
        hsml_model=None,
    ):
default_write_options = {
"start_offline_materialization": False,
}
if write_options:
default_write_options.update(write_options)
fg = self._get_logging_fg(fv, transformed)
df = engine.get_instance().get_feature_logging_df(
fg,
features,
[feature for feature in fv.features if not feature.label],
[feature for feature in fv.features if feature.label],
FeatureViewEngine._LOG_TD_VERSION,
FeatureViewEngine._LOG_TIME,
FeatureViewEngine._HSML_MODEL,
prediction,
training_dataset_version,
hsml_model,
)
return fg.insert(df, write_options=default_write_options)

    def read_feature_logs(
        self,
        fv,
        start_time: Optional[Union[str, int, datetime.datetime, datetime.date]] = None,
        end_time: Optional[Union[str, int, datetime.datetime, datetime.date]] = None,
        filter: Optional[Union[Filter, Logic]] = None,
        transformed: Optional[bool] = False,
        training_dataset_version=None,
        hsml_model=None,
    ):
fg = self._get_logging_fg(fv, transformed)
fv_feat_name_map = self._get_fv_feature_name_map(fv)
query = fg.select_all()
if start_time:
query = query.filter(fg.get_feature(FeatureViewEngine._LOG_TIME) >= start_time)
if end_time:
query = query.filter(fg.get_feature(FeatureViewEngine._LOG_TIME) <= end_time)
if training_dataset_version:
query = query.filter(fg.get_feature(FeatureViewEngine._LOG_TD_VERSION) == training_dataset_version)
if hsml_model:
query = query.filter(fg.get_feature(FeatureViewEngine._HSML_MODEL) == self.get_hsml_model_value(hsml_model))
if filter:
query = query.filter(self._convert_to_log_fg_filter(fg, fv, filter, fv_feat_name_map))
df = query.read()
df = df.drop(["log_id", FeatureViewEngine._LOG_TIME], axis=1)
return df

@staticmethod
def get_hsml_model_value(hsml_model):
return f"{hsml_model.name}_{hsml_model.version}"

def _convert_to_log_fg_filter(self, fg, fv, filter, fv_feat_name_map):
if filter is None:
return None

if isinstance(filter, Logic):
return Logic(
filter.type,
                left_f=self._convert_to_log_fg_filter(fg, fv, filter.left_f, fv_feat_name_map),
                right_f=self._convert_to_log_fg_filter(fg, fv, filter.right_f, fv_feat_name_map),
                left_l=self._convert_to_log_fg_filter(fg, fv, filter.left_l, fv_feat_name_map),
                right_l=self._convert_to_log_fg_filter(fg, fv, filter.right_l, fv_feat_name_map),
)
elif isinstance(filter, Filter):
fv_feature_name = fv_feat_name_map.get(
f"{filter.feature.feature_group_id}_{filter.feature.name}")
if fv_feature_name is None:
raise FeatureStoreException("Filter feature {filter.feature.name} does not exist in feature view feature.")
return Filter(
fg.get_feature(filter.feature.name),
filter.condition,
filter.value,
)
else:
raise FeatureStoreException("Accept only Filter or Logic")

def _get_fv_feature_name_map(self, fv) -> Dict[str, str]:
result_dict = {}
for td_feature in fv.features:
fg_feature_key = f"{td_feature.feature_group.id}_{td_feature.feature_group_feature_name}"
result_dict[fg_feature_key] = td_feature.name
return result_dict

    def get_log_timeline(
        self,
        fv,
        wallclock_time: Optional[Union[str, int, datetime.datetime, datetime.date]] = None,
        limit: Optional[int] = None,
        transformed: Optional[bool] = False,
    ) -> Dict[str, Dict[str, str]]:
fg = self._get_logging_fg(fv, transformed)
return fg.commit_details(wallclock_time=wallclock_time, limit=limit)

    def pause_logging(self, fv):
        self._feature_view_api.pause_feature_logging(
            fv.name, fv.version
        )

    def resume_logging(self, fv):
        self._feature_view_api.resume_feature_logging(
            fv.name, fv.version
        )

def materialize_feature_logs(self, fv, wait):
jobs = self._feature_view_api.materialize_feature_logging(
fv.name, fv.version
)
if wait:
for job in jobs:
try:
job._wait_for_job(wait)
except Exception:
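                    # Best-effort wait: swallow polling errors so the remaining
                    # jobs are still awaited.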
pass
return jobs

def delete_feature_logs(self, fv, transformed):
self._feature_view_api.delete_feature_logs(
fv.name, fv.version, transformed
)
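A minimal end-to-end sketch of the engine-level flow under stated assumptions (fv is an existing hsfs FeatureView, features_df is a DataFrame of served feature vectors, and the helper name demo_logging_flow is made up; FeatureViewEngine is an internal class normally reached through FeatureView):

from hsfs.core.feature_view_engine import FeatureViewEngine

def demo_logging_flow(fv, features_df, feature_store_id: int):
    engine = FeatureViewEngine(feature_store_id)
    engine.enable_feature_logging(fv)                        # server-side switch; also sets fv.logging_enabled
    engine.log_features(fv, features_df, transformed=False)  # insert into the untransformed log feature group
    logs = engine.read_feature_logs(fv, start_time="2024-07-01")
    engine.materialize_feature_logs(fv, wait=True)           # run the materialization job(s) and wait
    return logs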