From a2551557fc9f6a7ca31b075778767fea4fe6bfaf Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Wed, 15 Nov 2023 19:03:26 +0800 Subject: [PATCH] final code --- wyvern/feature_store/feature_server.py | 18 +-------- .../feature_store/historical_feature_util.py | 38 +++++++++++-------- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/wyvern/feature_store/feature_server.py b/wyvern/feature_store/feature_server.py index 41056d8..2c7689e 100644 --- a/wyvern/feature_store/feature_server.py +++ b/wyvern/feature_store/feature_server.py @@ -515,22 +515,6 @@ async def get_historical_features_v2( continue if len(entity_names) == 2: - entity_name_1 = entity_names[0] - entity_name_2 = entity_names[1] - - # TODO: valid entity_name_1 and entity_name_2 are in the table columns - # if entity_name_1 not in data.entities: - # logger.warning( - # f"Realtime feature {realtime_feature} depends on " - # f"entity={entity_name_1}, which is not found in entities", - # ) - # continue - # if entity_name_2 not in data.entities: - # logger.warning( - # f"Realtime feature {realtime_feature} depends on " - # f"entity={entity_name_2}, which is not found in entities", - # ) - # continue composite_entities[entity_type_column] = entity_names valid_realtime_features.append(realtime_feature) @@ -544,7 +528,7 @@ async def get_historical_features_v2( # TODO: send this sql to snowflake to create temporary table with this select_sql query select_sql = f""" - CREATE TABLE {composite_historical_feature_table} AS + CREATE TEMPORARY TABLE {composite_historical_feature_table} AS SELECT *, {composite_columns}, TIMESTAMP as event_timestamp FROM {data.table} """ diff --git a/wyvern/feature_store/historical_feature_util.py b/wyvern/feature_store/historical_feature_util.py index 0d8897c..0d1b0c2 100644 --- a/wyvern/feature_store/historical_feature_util.py +++ b/wyvern/feature_store/historical_feature_util.py @@ -170,12 +170,12 @@ def build_and_merge_realtime_pivot_tables( ], ) feature_names_pivot_raw = ",".join( - ["\"'{fn}'\" as {fn}" for fn in curr_feature_names_underscore], + [f"\"'{fn}'\" as {fn}" for fn in curr_feature_names_underscore], ) # TODO: send this sql to snowflake pivot_sql = f""" - CREATE TABLE {next_table} AS ( + CREATE TEMPORARY TABLE {next_table} AS ( WITH PIVOT_DATA AS ( SELECT F.REQUEST_ID AS REQUEST, F.API_SOURCE, @@ -200,14 +200,16 @@ def build_and_merge_realtime_pivot_tables( {prev_table}.*,{feature_names_with_pivot_table_str} FROM {prev_table} - LEFT JOIN PIVOT_TABLE ON {prev_table}.REQUEST = PIVOT_TABLE.REQUEST AND {prev_table}.{entity_identifier_type} = PIVOT_TABLE.FEATURE_IDENTIFIER + LEFT JOIN PIVOT_TABLE ON + {prev_table}.REQUEST = PIVOT_TABLE.REQUEST AND + {prev_table}.{entity_identifier_type} = PIVOT_TABLE.FEATURE_IDENTIFIER ) """ context.cursor().execute(pivot_sql) counter += 1 prev_table = next_table next_table = f"{composite_table}_{counter}" - return next_table + return prev_table def process_historical_real_time_features_requests( @@ -436,6 +438,8 @@ def build_and_merge_feast_tables( f"Entity name should be singular or composite: {entity_name}", ) + feature_columns = [fn.replace(":", "__") for fn in feature_names] + # TODO: validate that all entities are in the entity_df_table # for entity in entities: # if entity not in entity_values: @@ -454,7 +458,7 @@ def build_and_merge_feast_tables( # dedupe (IDENTIFIER, event_timestamp) identifier_table_sql = f""" - WITH identifier_table_sql_dedupe AS ({identifier_table_sql_dupe}) + WITH identifier_table_sql_dupe AS ({identifier_table_sql_dupe}) SELECT IDENTIFIER, event_timestamp FROM identifier_table_sql_dupe WHERE rn = 1 @@ -471,30 +475,34 @@ def build_and_merge_feast_tables( # in the format "feast_entity_df_" followed by a hex string (UUID without dashes) result_sql = re.sub( r'"feast_entity_df_[0-9a-f]{32}"', - '"identifier_tbl"', + "identifier_tbl", result_sql, flags=re.IGNORECASE, ) new_feast_table_sql = f""" - CREATE TABLE {next_table}_feast AS ( - WITH identifier_tbl_dupe AS ({identifier_table_sql_dupe}), - identifier_tbl AS ( - SELECT IDENTIFIER, event_timestamp - FROM identifier_tbl_dupe, - WHERE rn = 1 - ), + CREATE TEMPORARY TABLE {next_table}_feast AS ( + WITH identifier_tbl_dupe AS ({identifier_table_sql_dupe}), + identifier_tbl AS ( + SELECT IDENTIFIER, event_timestamp + FROM identifier_tbl_dupe + WHERE rn = 1 + ), {result_sql} ) """ context.cursor().execute(new_feast_table_sql) # left join to the previous composite table + picked_feature_columns_str = ", ".join( + [f'{next_table}_feast."{c}"' for c in feature_columns], + ) new_composite_table_sql = f""" CREATE TABLE {next_table} AS ( - SELECT * + SELECT {prev_table}.*, {picked_feature_columns_str} FROM {prev_table} LEFT JOIN {next_table}_feast - ON {prev_table}.{identifier_column} = {next_table}_feast.IDENTIFIER and {prev_table}.event_timestamp = {next_table}_feast.event_timestamp + ON {prev_table}.{identifier_column} = {next_table}_feast.IDENTIFIER and + {prev_table}.event_timestamp = {next_table}_feast.event_timestamp ) """ context.cursor().execute(new_composite_table_sql)