diff --git a/wyvern/feature_store/feature_server.py b/wyvern/feature_store/feature_server.py index 37e4312..14dbeb8 100644 --- a/wyvern/feature_store/feature_server.py +++ b/wyvern/feature_store/feature_server.py @@ -532,25 +532,26 @@ async def get_historical_features_v2( composite_entities[entity_type_column] = entity_names valid_realtime_features.append(realtime_feature) - # TODO: generate all the composite feature columns in the remote composite table - # this needs to be a sql query that generates a temporary table RT_{HEX_ID}_COMPOSITE - # for entity_type_column in composite_entities: - # entity_name1, entity_name2 = composite_entities[entity_type_column] - # df[entity_type_column] = df[entity_name1] + ":" + df[entity_name2] - composite_columns = ",".join( [ - ":".join(entities) + f" as {entity_type_column}" + " || ':' || ".join(entities) + f" AS {entity_type_column}" for entity_type_column, entities in composite_entities.items() ], ) + composite_historical_feature_table = f"HISTORICAL_FEATURES_{hex_id}" + + # TODO: send this sql to snowflake to create temporary table with this select_sql query select_sql = f""" - SELECT *, {composite_columns}, timestamp as event_timestamp + CREATE TABLE {composite_historical_feature_table} AS + SELECT *, {composite_columns}, TIMESTAMP as event_timestamp FROM {data.table} """ - # create temporary table with this select_sql query - build_and_merge_realtime_pivot_tables(hex_id, realtime_features) + result_table = build_and_merge_realtime_pivot_tables( + valid_realtime_features, + data.table, + composite_historical_feature_table, + ) # feast_requests = build_historical_registry_feature_requests( # store=store, @@ -590,7 +591,7 @@ async def get_historical_features_v2( # final_df["timestamp"] = final_df["timestamp"].astype(str) return GetHistoricalFeaturesResponseV2( - result_table="result_table", + result_table=result_table, ) return app diff --git a/wyvern/feature_store/historical_feature_util.py b/wyvern/feature_store/historical_feature_util.py index f5d4f1e..db86fba 100644 --- a/wyvern/feature_store/historical_feature_util.py +++ b/wyvern/feature_store/historical_feature_util.py @@ -118,9 +118,10 @@ def build_historical_real_time_feature_requests( def build_and_merge_realtime_pivot_tables( - hex_id: str, full_feature_names: List[str], -) -> Dict[str, RequestEntityIdentifierObjects]: + input_table: str, + composite_table: str, +) -> str: """ Build historical real-time feature requests grouped by entity types so that we can process them in parallel. @@ -135,7 +136,15 @@ def build_and_merge_realtime_pivot_tables( features_grouped_by_entity = group_realtime_features_by_entity_type( full_feature_names=full_feature_names, ) - result_dict: Dict[str, RequestEntityIdentifierObjects] = {} + counter = 0 + + # prev_table is the previous temporary composite table + prev_table = composite_table + # next_table is the next temporary composite table joined with the next entity type + next_table = f"{composite_table}_0" + + # iterate through all the entity types. + # For each entity type, build a new temporary composite table with all the features for this entity type for ( entity_identifier_type, curr_feature_names, @@ -145,11 +154,57 @@ def build_and_merge_realtime_pivot_tables( if len(entity_list) > 2: logger.warning("Invalid entity_identifier_type={entity_identifier_type}") continue - # TODO: PIVOT to generate - - # TODO: merge the PIVOT table into the _COMPOSITE table + curr_feature_names_underscore = [ + fn.replace(":", "__", 1) for fn in curr_feature_names + ] + entity_identifier_type_val = ":".join(entity_list) + feature_names_sql_str = ",".join( + [f"'{fn}'" for fn in curr_feature_names_underscore], + ) + feature_names_with_pivot_table_str = ",".join( + [ + f"PIVOT_TABLE.{feature_name}" + for feature_name in curr_feature_names_underscore + ], + ) + feature_names_pivot_raw = ",".join( + ["\"'{fn}'\" as {fn}" for fn in curr_feature_names_underscore], + ) - return result_dict + # TODO: send this sql to snowflake + pivot_sql = f""" + CREATE TABLE {next_table} AS ( + WITH PIVOT_DATA AS ( + SELECT F.REQUEST_ID AS REQUEST, + F.API_SOURCE, + F.EVENT_TYPE, + F.FEATURE_IDENTIFIER, + F.FEATURE_IDENTIFIER_TYPE, + REPLACE(F.FEATURE_NAME, ':', '__') AS FEATURE_NAME, + F.FEATURE_VALUE + FROM FEATURE_LOGS F + INNER JOIN (SELECT DISTINCT REQUEST FROM {input_table}) T + ON F.REQUEST_ID = T.REQUEST + WHERE F.FEATURE_IDENTIFIER_TYPE = '{entity_identifier_type_val}' + ), PIVOT_TABLE_RAW AS ( + SELECT * + FROM PIVOT_DATA + PIVOT(MAX(FEATURE_VALUE) FOR FEATURE_NAME IN ({feature_names_sql_str})) + ), PIVOT_TABLE AS ( + SELECT REQUEST, FEATURE_IDENTIFIER, FEATURE_IDENTIFIER_TYPE, {feature_names_pivot_raw} + FROM PIVOT_TABLE_RAW + ) + SELECT + {prev_table}.*,{feature_names_with_pivot_table_str} + FROM + {prev_table} + LEFT JOIN PIVOT_TABLE ON {prev_table}.REQUEST = PIVOT_TABLE.REQUEST AND {prev_table}.{entity_identifier_type} = PIVOT_TABLE.FEATURE_IDENTIFIER + ) + """ + counter += 1 + prev_table = next_table + next_table = f"{composite_table}_{counter}" + return next_table def process_historical_real_time_features_requests(