diff --git a/sdk/python/feast/expediagroup/vectordb/milvus_online_store.py b/sdk/python/feast/expediagroup/vectordb/milvus_online_store.py index afd37d6400..ab3055e529 100644 --- a/sdk/python/feast/expediagroup/vectordb/milvus_online_store.py +++ b/sdk/python/feast/expediagroup/vectordb/milvus_online_store.py @@ -34,7 +34,6 @@ String, UnixTimestamp, ) -from feast.usage import log_exceptions_and_usage logger = logging.getLogger(__name__) @@ -117,6 +116,7 @@ def online_write_batch( progress: Optional[Callable[[int], Any]], ) -> None: with MilvusConnectionManager(config.online_store): + self._create_collection_if_not_exists(table) print("Starting the process to batch write data into Milvus.") collection_to_load_data = Collection(table.name) rows = self._format_data_for_milvus(data, collection_to_load_data) @@ -152,7 +152,6 @@ def online_read( # results do not have timestamps return [(None, row) for row in results] - @log_exceptions_and_usage(online_store="milvus") def update( self, config: RepoConfig, @@ -164,41 +163,7 @@ def update( ): with MilvusConnectionManager(config.online_store): for table_to_keep in tables_to_keep: - collection_available = utility.has_collection(table_to_keep.name) - - if collection_available: - logger.info(f"Collection {table_to_keep.name} already exists.") - print(f"Collection {table_to_keep.name} already exists.") - else: - if not table_to_keep.schema: - raise ValueError( - f"a schema must be provided for feature_view: {table_to_keep}" - ) - - ( - schema, - indexes, - ) = self._convert_featureview_schema_to_milvus_readable( - table_to_keep, - ) - - logger.info( - f"creating collection {table_to_keep.name} with schema: {schema}" - ) - print( - f"creating collection {table_to_keep.name} with schema: {schema}" - ) - collection = Collection(name=table_to_keep.name, schema=schema) - - for field_name, index_params in indexes.items(): - collection.create_index(field_name, index_params) - - logger.info( - f"Collection {table_to_keep.name} has been created successfully." - ) - print( - f"Collection {table_to_keep.name} has been created successfully." - ) + self._create_collection_if_not_exists(table_to_keep) for table_to_delete in tables_to_delete: collection_available = utility.has_collection(table_to_delete.name) @@ -229,6 +194,40 @@ def teardown( logger.info(f"Dropping collection: {collection_name}") utility.drop_collection(collection_name) + def _create_collection_if_not_exists(self, feature_view: FeatureView): + """ + Checks whether the collection already exists and creates it based on the provided feature view. + + Parameters: + feature_view (FeatureView): the FeatureView that contains the schema. + """ + collection_available = utility.has_collection(feature_view.name) + + if collection_available: + logger.info(f"Collection {feature_view.name} already exists.") + print(f"Collection {feature_view.name} already exists.") + else: + if not feature_view.schema: + raise ValueError( + f"a schema must be provided for feature_view: {feature_view.name}" + ) + + (schema, indexes,) = self._convert_featureview_schema_to_milvus_readable( + feature_view, + ) + + logger.info( + f"creating collection {feature_view.name} with schema: {schema}" + ) + collection = Collection(name=feature_view.name, schema=schema) + + for field_name, index_params in indexes.items(): + collection.create_index(field_name, index_params) + + logger.info( + f"Collection {feature_view.name} has been created successfully." + ) + def _convert_featureview_schema_to_milvus_readable( self, feature_view: FeatureView ) -> Tuple[CollectionSchema, Dict]: diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 45d4500800..04f0f8b5d1 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -821,8 +821,6 @@ def apply( ... ) >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view """ - # TODO: remove - print(f"Apply with: {objects}") # TODO: Add locking if not isinstance(objects, Iterable): objects = [objects] @@ -993,15 +991,6 @@ def apply( tables_to_delete: List[FeatureView] = views_to_delete + sfvs_to_delete if not partial else [] # type: ignore tables_to_keep: List[FeatureView] = views_to_update + sfvs_to_update # type: ignore - # TODO: remove - print( - f"""Update project={self.project}, - tables_to_delete={tables_to_delete}, - tables_to_keep={tables_to_keep}, - entities_to_delete={entities_to_delete if not partial else []}, - entities_to_keep={entities_to_update}, - partial={partial},""" - ) self._get_provider().update_infra( project=self.project, tables_to_delete=tables_to_delete, diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index c3917a5398..28b10c1259 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -116,8 +116,6 @@ def update_infra( ): set_usage_attribute("provider", self.__class__.__name__) - # TODO: remove - print(f"Is online store={self.online_store}") # Call update only if there is an online store if self.online_store: self.online_store.update( diff --git a/sdk/python/tests/expediagroup/test_milvus_online_store.py b/sdk/python/tests/expediagroup/test_milvus_online_store.py index cd0857292b..55ee1fa966 100644 --- a/sdk/python/tests/expediagroup/test_milvus_online_store.py +++ b/sdk/python/tests/expediagroup/test_milvus_online_store.py @@ -26,7 +26,7 @@ from feast.protos.feast.types.Value_pb2 import FloatList from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RepoConfig -from feast.types import Array, Float32, Int64 +from feast.types import Array, Float32, Int64, String from tests.expediagroup.milvus_online_store_creator import MilvusOnlineStoreCreator logging.basicConfig(level=logging.INFO) @@ -408,11 +408,30 @@ def test_milvus_update_delete_unavailable_collection(self, repo_config, caplog): assert len(utility.list_collections()) == 0 def test_milvus_online_write_batch(self, repo_config, caplog): - self._create_collection_in_milvus(self.collection_to_write, repo_config) - + entity = Entity(name="name") feature_view = FeatureView( name=self.collection_to_write, source=SOURCE, + entities=[entity], + schema=[ + Field( + name="name", + dtype=String, + ), + Field( + name="age", + dtype=Int64, + ), + Field( + name="avg_orders_day", + dtype=Array(Float32), + tags={ + "dimensions": 5, + "index_type": "HNSW", + "index_params": '{ "M": 32, "efConstruction": 256}', + }, + ), + ], ) total_rows_to_write = 100