Skip to content

Commit

Permalink
error catching around pyarrow to pandas convertion
Browse files Browse the repository at this point in the history
  • Loading branch information
piket committed Oct 6, 2023
1 parent 0894c2d commit 5bbe27f
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions sdk/python/feast/embedded_go/online_features_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
Expand Down Expand Up @@ -36,6 +37,8 @@
MILLI_SECOND = 1000 * MICRO_SECOND
SECOND = 1000 * MILLI_SECOND

logger = logging.getLogger(__name__)


class EmbeddedOnlineFeatureServer:
def __init__(
Expand Down Expand Up @@ -243,23 +246,27 @@ def transformation_callback(
output_schema_ptr: int,
full_feature_names: bool,
) -> int:
odfv = fs.get_on_demand_feature_view(on_demand_feature_view_name, allow_cache=True)
try:
odfv = fs.get_on_demand_feature_view(on_demand_feature_view_name, allow_cache=True)

input_record = pa.RecordBatch._import_from_c(input_arr_ptr, input_schema_ptr)
input_record = pa.RecordBatch._import_from_c(input_arr_ptr, input_schema_ptr)

# For some reason, the callback is called with `full_feature_names` as a 1 if True or 0 if false. This handles
# the typeguard requirement.
full_feature_names = bool(full_feature_names)
# For some reason, the callback is called with `full_feature_names` as a 1 if True or 0 if false. This handles
# the typeguard requirement.
full_feature_names = bool(full_feature_names)

output = odfv.get_transformed_features_df(
input_record.to_pandas(), full_feature_names=full_feature_names
)
output_record = pa.RecordBatch.from_pandas(output)
output = odfv.get_transformed_features_df(
input_record.to_pandas(), full_feature_names=full_feature_names
)
output_record = pa.RecordBatch.from_pandas(output)

output_record.schema._export_to_c(output_schema_ptr)
output_record._export_to_c(output_arr_ptr)
output_record.schema._export_to_c(output_schema_ptr)
output_record._export_to_c(output_arr_ptr)

return output_record.num_rows
return output_record.num_rows
except Exception as e:
logger.error(f"transformation callback failed with exception: {e}")
return 0


def logging_callback(
Expand Down

0 comments on commit 5bbe27f

Please sign in to comment.