Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Commit

Permalink
final code
Browse files Browse the repository at this point in the history
  • Loading branch information
wintonzheng committed Nov 15, 2023
1 parent cbe2f2e commit a255155
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 32 deletions.
18 changes: 1 addition & 17 deletions wyvern/feature_store/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,22 +515,6 @@ async def get_historical_features_v2(
continue

if len(entity_names) == 2:
entity_name_1 = entity_names[0]
entity_name_2 = entity_names[1]

# TODO: valid entity_name_1 and entity_name_2 are in the table columns
# if entity_name_1 not in data.entities:
# logger.warning(
# f"Realtime feature {realtime_feature} depends on "
# f"entity={entity_name_1}, which is not found in entities",
# )
# continue
# if entity_name_2 not in data.entities:
# logger.warning(
# f"Realtime feature {realtime_feature} depends on "
# f"entity={entity_name_2}, which is not found in entities",
# )
# continue
composite_entities[entity_type_column] = entity_names
valid_realtime_features.append(realtime_feature)

Expand All @@ -544,7 +528,7 @@ async def get_historical_features_v2(

# TODO: send this sql to snowflake to create temporary table with this select_sql query
select_sql = f"""
CREATE TABLE {composite_historical_feature_table} AS
CREATE TEMPORARY TABLE {composite_historical_feature_table} AS
SELECT *, {composite_columns}, TIMESTAMP as event_timestamp
FROM {data.table}
"""
Expand Down
38 changes: 23 additions & 15 deletions wyvern/feature_store/historical_feature_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ def build_and_merge_realtime_pivot_tables(
],
)
feature_names_pivot_raw = ",".join(
["\"'{fn}'\" as {fn}" for fn in curr_feature_names_underscore],
[f"\"'{fn}'\" as {fn}" for fn in curr_feature_names_underscore],
)

# TODO: send this sql to snowflake
pivot_sql = f"""
CREATE TABLE {next_table} AS (
CREATE TEMPORARY TABLE {next_table} AS (
WITH PIVOT_DATA AS (
SELECT F.REQUEST_ID AS REQUEST,
F.API_SOURCE,
Expand All @@ -200,14 +200,16 @@ def build_and_merge_realtime_pivot_tables(
{prev_table}.*,{feature_names_with_pivot_table_str}
FROM
{prev_table}
LEFT JOIN PIVOT_TABLE ON {prev_table}.REQUEST = PIVOT_TABLE.REQUEST AND {prev_table}.{entity_identifier_type} = PIVOT_TABLE.FEATURE_IDENTIFIER
LEFT JOIN PIVOT_TABLE ON
{prev_table}.REQUEST = PIVOT_TABLE.REQUEST AND
{prev_table}.{entity_identifier_type} = PIVOT_TABLE.FEATURE_IDENTIFIER
)
"""
context.cursor().execute(pivot_sql)
counter += 1
prev_table = next_table
next_table = f"{composite_table}_{counter}"
return next_table
return prev_table


def process_historical_real_time_features_requests(
Expand Down Expand Up @@ -436,6 +438,8 @@ def build_and_merge_feast_tables(
f"Entity name should be singular or composite: {entity_name}",
)

feature_columns = [fn.replace(":", "__") for fn in feature_names]

# TODO: validate that all entities are in the entity_df_table
# for entity in entities:
# if entity not in entity_values:
Expand All @@ -454,7 +458,7 @@ def build_and_merge_feast_tables(

# dedupe (IDENTIFIER, event_timestamp)
identifier_table_sql = f"""
WITH identifier_table_sql_dedupe AS ({identifier_table_sql_dupe})
WITH identifier_table_sql_dupe AS ({identifier_table_sql_dupe})
SELECT IDENTIFIER, event_timestamp
FROM identifier_table_sql_dupe
WHERE rn = 1
Expand All @@ -471,30 +475,34 @@ def build_and_merge_feast_tables(
# in the format "feast_entity_df_" followed by a hex string (UUID without dashes)
result_sql = re.sub(
r'"feast_entity_df_[0-9a-f]{32}"',
'"identifier_tbl"',
"identifier_tbl",
result_sql,
flags=re.IGNORECASE,
)
new_feast_table_sql = f"""
CREATE TABLE {next_table}_feast AS (
WITH identifier_tbl_dupe AS ({identifier_table_sql_dupe}),
identifier_tbl AS (
SELECT IDENTIFIER, event_timestamp
FROM identifier_tbl_dupe,
WHERE rn = 1
),
CREATE TEMPORARY TABLE {next_table}_feast AS (
WITH identifier_tbl_dupe AS ({identifier_table_sql_dupe}),
identifier_tbl AS (
SELECT IDENTIFIER, event_timestamp
FROM identifier_tbl_dupe
WHERE rn = 1
),
{result_sql}
)
"""
context.cursor().execute(new_feast_table_sql)

# left join to the previous composite table
picked_feature_columns_str = ", ".join(
[f'{next_table}_feast."{c}"' for c in feature_columns],
)
new_composite_table_sql = f"""
CREATE TABLE {next_table} AS (
SELECT *
SELECT {prev_table}.*, {picked_feature_columns_str}
FROM {prev_table}
LEFT JOIN {next_table}_feast
ON {prev_table}.{identifier_column} = {next_table}_feast.IDENTIFIER and {prev_table}.event_timestamp = {next_table}_feast.event_timestamp
ON {prev_table}.{identifier_column} = {next_table}_feast.IDENTIFIER and
{prev_table}.event_timestamp = {next_table}_feast.event_timestamp
)
"""
context.cursor().execute(new_composite_table_sql)
Expand Down

0 comments on commit a255155

Please sign in to comment.