From 208ff8896935a5ff8d0c135f03adc9107cb9c902 Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Thu, 12 Oct 2023 07:36:40 -0700 Subject: [PATCH] Update tests, fix concat vs join bugs --- examples/real_time_features_main.py | 22 +- .../feature_store/test_real_time_features.py | 272 +++++------------- .../features/feature_retrieval_pipeline.py | 82 ++++-- .../features/realtime_features_component.py | 19 +- wyvern/components/helpers/polars.py | 2 +- .../components/models/modelbit_component.py | 7 +- 6 files changed, 162 insertions(+), 242 deletions(-) diff --git a/examples/real_time_features_main.py b/examples/real_time_features_main.py index ef047be..1b8c33e 100644 --- a/examples/real_time_features_main.py +++ b/examples/real_time_features_main.py @@ -337,15 +337,27 @@ async def execute( ) time_start = time() - feature_map = await self.feature_retrieval_pipeline.execute(request) + feature_df = await self.feature_retrieval_pipeline.execute(request) logger.info(f"operation feature_retrieval took:{time()-time_start:2.4f} sec") profiler.stop() profiler.print() + feature_dicts = feature_df.df.to_dicts() + feature_data: Dict[str, FeatureData] = { + str(feature_dict["IDENTIFIER"]): FeatureData( + identifier=Identifier( + identifier_type=feature_dict["IDENTIFIER"].split("::")[0], + identifier=feature_dict["IDENTIFIER"].split("::")[1], + ), + features={ + feature_name: feature_value + for feature_name, feature_value in feature_dict.items() + if feature_name != "IDENTIFIER" + }, + ) + for feature_dict in feature_dicts + } return FeatureStoreResponse( - feature_data={ - str(identifier): feature_map.feature_map[identifier] - for identifier in feature_map.feature_map.keys() - }, + feature_data=feature_data, ) diff --git a/tests/feature_store/test_real_time_features.py b/tests/feature_store/test_real_time_features.py index 6c6c3e5..d47fcd3 100644 --- a/tests/feature_store/test_real_time_features.py +++ b/tests/feature_store/test_real_time_features.py @@ -69,112 +69,60 @@ async def test_end_to_end(mock_redis, test_client, mock_feature_store): "identifier_type": "request", }, "features": { - "RealTimeNumberOfCandidatesFeature:f_number_of_candidates": 3.0, + "RealTimeNumberOfCandidatesFeature__f_number_of_candidates": 3.0, }, }, "query::candle": { "identifier": {"identifier": "candle", "identifier_type": "query"}, "features": { - "RealTimeQueryFeature:f_query_length": 6.0, - "RealTimeStringFeature:f_query": "candle", - "RealTimeEmbeddingFeature:f_query_embedding_vector_8": [ - 1, - 2, - 3, - 4, - 5, - 6, - 7, - 8, + "RealTimeQueryFeature__f_query_length": 6.0, + "RealTimeStringFeature__f_query": "candle", + "RealTimeEmbeddingFeature__f_query_embedding_vector_8": [ + 1.0, + 2.0, + 3.0, + 4.0, + 5.0, + 6.0, + 7.0, + 8.0, ], + "RealTimeUserQueryFeature__f_user_query_name_edit_distance": 3.0, + "RealTimeUserQueryFeature__f_user_query_name_jaccard_similarity": -3.0, }, }, "user::1234": { "identifier": {"identifier": "1234", "identifier_type": "user"}, - "features": {"RealTimeUserFeature:f_user_name_length": 9.0}, - }, - "query:user::candle:1234": { - "identifier": { - "identifier": "candle:1234", - "identifier_type": "query:user", - }, - "features": { - "RealTimeUserQueryFeature:f_user_query_name_edit_distance": 3.0, - "RealTimeUserQueryFeature:f_user_query_name_jaccard_similarity": -3.0, - }, + "features": {"RealTimeUserFeature__f_user_name_length": 9.0}, }, "product::p1": { "identifier": {"identifier": "p1", "identifier_type": "product"}, - "features": {"RealTimeProductFeature:f_opensearch_score": 1.0}, - }, - "product::p2": { - "identifier": {"identifier": "p2", "identifier_type": "product"}, - "features": {}, - }, - "product::p3": { - "identifier": {"identifier": "p3", "identifier_type": "product"}, - "features": {}, - }, - "product:query::p1:candle": { - "identifier": { - "identifier": "p1:candle", - "identifier_type": "product:query", - }, "features": { - "RealTimeMatchedQueriesProductFeature:f_matched_queries_QUERY_1": 1.0, - "RealTimeMatchedQueriesProductFeature:f_matched_queries_QUERY_2": 1.0, - "RealTimeQueryProductFeature:f_query_product_name_edit_distance": 4.0, - "RealTimeQueryProductFeature:f_query_product_name_jaccard_similarity": -4.0, + "RealTimeMatchedQueriesProductFeature__f_matched_queries_QUERY_1": 1.0, + "RealTimeMatchedQueriesProductFeature__f_matched_queries_QUERY_2": 1.0, + "RealTimeProductFeature__f_opensearch_score": 1.0, + "RealTimeQueryProductFeature__f_query_product_name_edit_distance": 4.0, + "RealTimeQueryProductFeature__f_query_product_name_jaccard_similarity": -4.0, + "RealTimeUserProductFeature__f_user_product_name_edit_distance": 7.0, + "RealTimeUserProductFeature__f_user_product_name_jaccard_similarity": -7.0, }, }, - "product:query::p2:candle": { - "identifier": { - "identifier": "p2:candle", - "identifier_type": "product:query", - }, - "features": { - "RealTimeQueryProductFeature:f_query_product_name_edit_distance": 4.0, - "RealTimeQueryProductFeature:f_query_product_name_jaccard_similarity": -4.0, - }, - }, - "product:query::p3:candle": { - "identifier": { - "identifier": "p3:candle", - "identifier_type": "product:query", - }, - "features": { - "RealTimeQueryProductFeature:f_query_product_name_edit_distance": 4.0, - "RealTimeQueryProductFeature:f_query_product_name_jaccard_similarity": -4.0, - }, - }, - "product:user::p1:1234": { - "identifier": { - "identifier": "p1:1234", - "identifier_type": "product:user", - }, - "features": { - "RealTimeUserProductFeature:f_user_product_name_edit_distance": 7.0, - "RealTimeUserProductFeature:f_user_product_name_jaccard_similarity": -7.0, - }, - }, - "product:user::p2:1234": { - "identifier": { - "identifier": "p2:1234", - "identifier_type": "product:user", - }, + "product::p2": { + "identifier": {"identifier": "p2", "identifier_type": "product"}, "features": { - "RealTimeUserProductFeature:f_user_product_name_edit_distance": 7.0, - "RealTimeUserProductFeature:f_user_product_name_jaccard_similarity": -7.0, + "RealTimeQueryProductFeature__f_query_product_name_edit_distance": 4.0, + "RealTimeQueryProductFeature__f_query_product_name_jaccard_similarity": -4.0, + "RealTimeUserProductFeature__f_user_product_name_edit_distance": 7.0, + "RealTimeUserProductFeature__f_user_product_name_jaccard_similarity": -7.0, }, }, - "product:user::p3:1234": { - "identifier": { - "identifier": "p3:1234", - "identifier_type": "product:user", - }, + "product::p3": { + "identifier": {"identifier": "p3", "identifier_type": "product"}, "features": { - "RealTimeUserProductFeature:f_user_product_name_edit_distance": 7.0, - "RealTimeUserProductFeature:f_user_product_name_jaccard_similarity": -7.0, + "RealTimeQueryProductFeature__f_query_product_name_edit_distance": 4.0, + "RealTimeQueryProductFeature__f_query_product_name_jaccard_similarity": -4.0, + "RealTimeUserProductFeature__f_user_product_name_edit_distance": 7.0, + "RealTimeUserProductFeature__f_user_product_name_jaccard_similarity": -7.0, }, }, }, @@ -225,138 +173,72 @@ async def test_end_to_end__2(mock_redis__2, test_client): "identifier_type": "request", }, "features": { - "RealTimeNumberOfCandidatesFeature:f_number_of_candidates": 4.0, + "RealTimeNumberOfCandidatesFeature__f_number_of_candidates": 4.0, }, }, "query::candle": { "identifier": {"identifier": "candle", "identifier_type": "query"}, "features": { - "RealTimeQueryFeature:f_query_length": 6.0, - "RealTimeStringFeature:f_query": "candle", - "RealTimeEmbeddingFeature:f_query_embedding_vector_8": [ - 1, - 2, - 3, - 4, - 5, - 6, - 7, - 8, + "RealTimeQueryFeature__f_query_length": 6.0, + "RealTimeStringFeature__f_query": "candle", + "RealTimeEmbeddingFeature__f_query_embedding_vector_8": [ + 1.0, + 2.0, + 3.0, + 4.0, + 5.0, + 6.0, + 7.0, + 8.0, ], + "RealTimeUserQueryFeature__f_user_query_name_edit_distance": 3.0, + "RealTimeUserQueryFeature__f_user_query_name_jaccard_similarity": -3.0, }, }, "user::1234": { "identifier": {"identifier": "1234", "identifier_type": "user"}, - "features": {"RealTimeUserFeature:f_user_name_length": 9.0}, - }, - "query:user::candle:1234": { - "identifier": { - "identifier": "candle:1234", - "identifier_type": "query:user", - }, - "features": { - "RealTimeUserQueryFeature:f_user_query_name_edit_distance": 3.0, - "RealTimeUserQueryFeature:f_user_query_name_jaccard_similarity": -3.0, - }, + "features": {"RealTimeUserFeature__f_user_name_length": 9.0}, }, "product::p1": { "identifier": {"identifier": "p1", "identifier_type": "product"}, - "features": {"RealTimeProductFeature:f_opensearch_score": 1.0}, - }, - "product::p2": { - "identifier": {"identifier": "p2", "identifier_type": "product"}, - "features": {}, - }, - "product::p3": { - "identifier": {"identifier": "p3", "identifier_type": "product"}, - "features": {}, - }, - "product::p4": { - "identifier": {"identifier": "p4", "identifier_type": "product"}, - "features": {"RealTimeProductFeature:f_opensearch_score": 100.0}, - }, - "product:query::p1:candle": { - "identifier": { - "identifier": "p1:candle", - "identifier_type": "product:query", - }, "features": { - "RealTimeMatchedQueriesProductFeature:f_matched_queries_QUERY_1": 1.0, - "RealTimeMatchedQueriesProductFeature:f_matched_queries_QUERY_2": 1.0, - "RealTimeQueryProductFeature:f_query_product_name_edit_distance": 4.0, - "RealTimeQueryProductFeature:f_query_product_name_jaccard_similarity": -4.0, + "RealTimeProductFeature__f_opensearch_score": 1.0, + "RealTimeMatchedQueriesProductFeature__f_matched_queries_QUERY_1": 1.0, + "RealTimeMatchedQueriesProductFeature__f_matched_queries_QUERY_2": 1.0, + "RealTimeQueryProductFeature__f_query_product_name_edit_distance": 4.0, + "RealTimeQueryProductFeature__f_query_product_name_jaccard_similarity": -4.0, + "RealTimeUserProductFeature__f_user_product_name_edit_distance": 7.0, + "RealTimeUserProductFeature__f_user_product_name_jaccard_similarity": -7.0, }, }, - "product:query::p2:candle": { - "identifier": { - "identifier": "p2:candle", - "identifier_type": "product:query", - }, - "features": { - "RealTimeQueryProductFeature:f_query_product_name_edit_distance": 4.0, - "RealTimeQueryProductFeature:f_query_product_name_jaccard_similarity": -4.0, - }, - }, - "product:query::p3:candle": { - "identifier": { - "identifier": "p3:candle", - "identifier_type": "product:query", - }, - "features": { - "RealTimeQueryProductFeature:f_query_product_name_edit_distance": 4.0, - "RealTimeQueryProductFeature:f_query_product_name_jaccard_similarity": -4.0, - }, - }, - "product:query::p4:candle": { - "identifier": { - "identifier": "p4:candle", - "identifier_type": "product:query", - }, - "features": { - "RealTimeMatchedQueriesProductFeature:f_matched_queries_MATIAS": 1.0, - "RealTimeMatchedQueriesProductFeature:f_matched_queries_QUERY_2": 1.0, - "RealTimeQueryProductFeature:f_query_product_name_edit_distance": 4.0, - "RealTimeQueryProductFeature:f_query_product_name_jaccard_similarity": -4.0, - }, - }, - "product:user::p1:1234": { - "identifier": { - "identifier": "p1:1234", - "identifier_type": "product:user", - }, - "features": { - "RealTimeUserProductFeature:f_user_product_name_edit_distance": 7.0, - "RealTimeUserProductFeature:f_user_product_name_jaccard_similarity": -7.0, - }, - }, - "product:user::p2:1234": { - "identifier": { - "identifier": "p2:1234", - "identifier_type": "product:user", - }, + "product::p2": { + "identifier": {"identifier": "p2", "identifier_type": "product"}, "features": { - "RealTimeUserProductFeature:f_user_product_name_edit_distance": 7.0, - "RealTimeUserProductFeature:f_user_product_name_jaccard_similarity": -7.0, + "RealTimeQueryProductFeature__f_query_product_name_edit_distance": 4.0, + "RealTimeQueryProductFeature__f_query_product_name_jaccard_similarity": -4.0, + "RealTimeUserProductFeature__f_user_product_name_edit_distance": 7.0, + "RealTimeUserProductFeature__f_user_product_name_jaccard_similarity": -7.0, }, }, - "product:user::p3:1234": { - "identifier": { - "identifier": "p3:1234", - "identifier_type": "product:user", - }, + "product::p3": { + "identifier": {"identifier": "p3", "identifier_type": "product"}, "features": { - "RealTimeUserProductFeature:f_user_product_name_edit_distance": 7.0, - "RealTimeUserProductFeature:f_user_product_name_jaccard_similarity": -7.0, + "RealTimeQueryProductFeature__f_query_product_name_edit_distance": 4.0, + "RealTimeQueryProductFeature__f_query_product_name_jaccard_similarity": -4.0, + "RealTimeUserProductFeature__f_user_product_name_edit_distance": 7.0, + "RealTimeUserProductFeature__f_user_product_name_jaccard_similarity": -7.0, }, }, - "product:user::p4:1234": { - "identifier": { - "identifier": "p4:1234", - "identifier_type": "product:user", - }, + "product::p4": { + "identifier": {"identifier": "p4", "identifier_type": "product"}, "features": { - "RealTimeUserProductFeature:f_user_product_name_edit_distance": 7.0, - "RealTimeUserProductFeature:f_user_product_name_jaccard_similarity": -7.0, + "RealTimeProductFeature__f_opensearch_score": 100.0, + "RealTimeMatchedQueriesProductFeature__f_matched_queries_MATIAS": 1.0, + "RealTimeMatchedQueriesProductFeature__f_matched_queries_QUERY_2": 1.0, + "RealTimeQueryProductFeature__f_query_product_name_edit_distance": 4.0, + "RealTimeQueryProductFeature__f_query_product_name_jaccard_similarity": -4.0, + "RealTimeUserProductFeature__f_user_product_name_edit_distance": 7.0, + "RealTimeUserProductFeature__f_user_product_name_jaccard_similarity": -7.0, }, }, }, diff --git a/wyvern/components/features/feature_retrieval_pipeline.py b/wyvern/components/features/feature_retrieval_pipeline.py index b354127..e32e19c 100644 --- a/wyvern/components/features/feature_retrieval_pipeline.py +++ b/wyvern/components/features/feature_retrieval_pipeline.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- import asyncio import logging -from typing import Generic, List, Optional, Set, Type +from collections import defaultdict +from typing import Generic, List, Optional, Set, Tuple, Type import polars as pl from ddtrace import tracer @@ -93,6 +94,41 @@ def __init__( name=name, ) + @tracer.wrap(name="FeatureRetrievalPipeline._concat_real_time_features") + def _concat_real_time_features( + self, + real_time_feature_dfs: List[Tuple[str, Optional[pl.DataFrame]]], + ) -> Optional[pl.DataFrame]: + """ + This method is used to cast and concatenate real-time features into one DataFrame. + + Args: + real_time_feature_dfs: A list of DataFrames that contain real-time features. + + Returns: + A DataFrame that contains all the real-time features. + """ + grouped_features = defaultdict(list) + for key, value in real_time_feature_dfs: + grouped_features[key].append(cast_float32_to_float64(value)) + + merged_features = [ + pl.concat(value, how="diagonal") if len(value) > 1 else value[0] + for value in grouped_features.values() + ] + + if not merged_features: + return None + + real_time_feature_merged_df = merged_features[0] + for df in merged_features[1:]: + real_time_feature_merged_df = real_time_feature_merged_df.join( + df, + on=IDENTIFIER, + how="outer", + ) + return real_time_feature_merged_df + @tracer.wrap(name="FeatureRetrievalPipeline._generate_real_time_features") def _generate_real_time_features( self, @@ -208,7 +244,7 @@ async def execute( feature_retrieval_response=feature_df, ) real_time_request_no_entity_features: List[ - Optional[pl.DataFrame] + Tuple[str, Optional[pl.DataFrame]] ] = await asyncio.gather( *[ real_time_feature.compute_request_features_wrapper( @@ -221,7 +257,7 @@ async def execute( with tracer.trace("FeatureRetrievalPipeline.real_time_entity_features"): real_time_request_features: List[ - Optional[pl.DataFrame] + Tuple[str, Optional[pl.DataFrame]] ] = await asyncio.gather( *[ real_time_feature.compute_features_wrapper( @@ -236,7 +272,7 @@ async def execute( with tracer.trace("FeatureRetrievalPipeline.real_time_combination_features"): real_time_request_combination_features: List[ - Optional[pl.DataFrame] + Tuple[str, Optional[pl.DataFrame]] ] = await asyncio.gather( *[ real_time_feature.compute_composite_features_wrapper( @@ -256,8 +292,10 @@ async def execute( ] ) - real_time_candidate_features: List[Optional[pl.DataFrame]] = [] - real_time_candidate_combination_features: List[Optional[pl.DataFrame]] = [] + real_time_candidate_features: List[Tuple[str, Optional[pl.DataFrame]]] = [] + real_time_candidate_combination_features: List[ + Tuple[str, Optional[pl.DataFrame]] + ] = [] if isinstance(input.request, CandidateSetEntity): with tracer.trace("FeatureRetrievalPipeline.real_time_candidate_features"): @@ -304,27 +342,21 @@ async def execute( # and we collect them together ahead of this dict comprehension # pytest / linter validation: we should assert for feature name conflicts -- this should never happen with tracer.trace("FeatureRetrievalPipeline.merge_feature_dfs"): - real_time_feature_dfs_optional = [ - *real_time_request_no_entity_features, - *real_time_request_features, - *real_time_request_combination_features, - *real_time_candidate_features, - *real_time_candidate_combination_features, - ] - real_time_feature_dfs = [ - cast_float32_to_float64(df) - for df in real_time_feature_dfs_optional - if df is not None - ] - real_time_feature_merged_df = pl.DataFrame() - if real_time_feature_dfs: - real_time_feature_merged_df = pl.concat( - real_time_feature_dfs, - how="diagonal", - ) + real_time_feature_merged_df = self._concat_real_time_features( + [ + *real_time_request_no_entity_features, + *real_time_request_features, + *real_time_request_combination_features, + *real_time_candidate_features, + *real_time_candidate_combination_features, + ], + ) with tracer.trace("FeatureRetrievalPipeline.create_feature_response"): - if real_time_feature_merged_df.is_empty(): + if ( + real_time_feature_merged_df is None + or real_time_feature_merged_df.is_empty() + ): feature_responses = feature_df.df else: await self.feature_logger_component.execute( diff --git a/wyvern/components/features/realtime_features_component.py b/wyvern/components/features/realtime_features_component.py index 3297faf..2d954b4 100644 --- a/wyvern/components/features/realtime_features_component.py +++ b/wyvern/components/features/realtime_features_component.py @@ -16,7 +16,6 @@ ) import polars as pl -from polars import DataFrame from pydantic.generics import GenericModel from wyvern.components.component import Component @@ -70,7 +69,7 @@ class RealtimeFeatureComponent( RealtimeFeatureRequest[REQUEST_ENTITY], RealtimeFeatureEntity[PRIMARY_ENTITY, SECONDARY_ENTITY], ], - Optional[DataFrame], + Tuple[str, Optional[pl.DataFrame]], ], Generic[PRIMARY_ENTITY, SECONDARY_ENTITY, REQUEST_ENTITY], ): @@ -261,7 +260,7 @@ async def execute( RealtimeFeatureEntity[PRIMARY_ENTITY, SECONDARY_ENTITY], ], **kwargs, - ) -> Optional[pl.DataFrame]: + ) -> Tuple[str, Optional[pl.DataFrame]]: # TODO (Suchintan): Delete this method -- this has been fully delegated upwards? request = input[0] entities = input[1] @@ -271,7 +270,7 @@ async def execute( entities.primary_entity, entities.secondary_entity, ): - return pl.DataFrame().with_columns( + return self.name, pl.DataFrame().with_columns( pl.Series(name=IDENTIFIER, dtype=pl.Utf8), ) @@ -339,7 +338,7 @@ async def compute_composite_features( async def compute_request_features_wrapper( self, request: RealtimeFeatureRequest[REQUEST_ENTITY], - ) -> Optional[pl.DataFrame]: + ) -> Tuple[str, Optional[pl.DataFrame]]: feature_data = await self.compute_request_features(request) return self.create_df_with_full_feature_name(feature_data) @@ -347,7 +346,7 @@ async def compute_features_wrapper( self, entity: PRIMARY_ENTITY, request: RealtimeFeatureRequest[REQUEST_ENTITY], - ) -> Optional[pl.DataFrame]: + ) -> Tuple[str, Optional[pl.DataFrame]]: feature_data = await self.compute_features(entity, request) return self.create_df_with_full_feature_name(feature_data) @@ -356,7 +355,7 @@ async def compute_composite_features_wrapper( primary_entity: PRIMARY_ENTITY, secondary_entity: SECONDARY_ENTITY, request: RealtimeFeatureRequest[REQUEST_ENTITY], - ) -> Optional[pl.DataFrame]: + ) -> Tuple[str, Optional[pl.DataFrame]]: feature_data = await self.compute_composite_features( primary_entity, secondary_entity, @@ -367,12 +366,12 @@ async def compute_composite_features_wrapper( def create_df_with_full_feature_name( self, feature_data: Optional[FeatureData], - ) -> Optional[pl.DataFrame]: + ) -> Tuple[str, Optional[pl.DataFrame]]: """ Creates a dataframe with the full feature name for the feature data """ if not feature_data: - return None + return self.name, None df = pl.DataFrame().with_columns( [ @@ -388,4 +387,4 @@ def create_df_with_full_feature_name( for feature_name, feature_value in feature_data.features.items() ], ) - return df + return self.name, df diff --git a/wyvern/components/helpers/polars.py b/wyvern/components/helpers/polars.py index bed6600..f2f1b6b 100644 --- a/wyvern/components/helpers/polars.py +++ b/wyvern/components/helpers/polars.py @@ -2,7 +2,7 @@ import polars as pl -def cast_float32_to_float64(df): +def cast_float32_to_float64(df) -> pl.DataFrame: float32_cols = [ col for col, dtype in zip(df.columns, df.dtypes) if dtype == pl.Float32 ] diff --git a/wyvern/components/models/modelbit_component.py b/wyvern/components/models/modelbit_component.py index 290949b..fc012af 100644 --- a/wyvern/components/models/modelbit_component.py +++ b/wyvern/components/models/modelbit_component.py @@ -161,13 +161,8 @@ async def build_requests( Union[WyvernEntity, BaseWyvernRequest] ] = input.entities or [input.request] target_identifiers = [entity.identifier for entity in target_entities] - target_identifier_type = target_identifiers[0].identifier_type - target_identifier_keys = [ - identifier.identifier for identifier in target_identifiers - ] features = self.get_features( - target_identifier_type, - target_identifier_keys, + target_identifiers, self.modelbit_features, )