Skip to content

Commit

Permalink
Merge pull request #30 from ExpediaGroup/feature/create_collection_on…
Browse files Browse the repository at this point in the history
…_write

temporary workaround to create a collection on write if it not exists
  • Loading branch information
michaelbackes authored Sep 5, 2023
2 parents 33b16ce + 31f8cb1 commit 24ffa98
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 53 deletions.
73 changes: 36 additions & 37 deletions sdk/python/feast/expediagroup/vectordb/milvus_online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
String,
UnixTimestamp,
)
from feast.usage import log_exceptions_and_usage

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -117,6 +116,7 @@ def online_write_batch(
progress: Optional[Callable[[int], Any]],
) -> None:
with MilvusConnectionManager(config.online_store):
self._create_collection_if_not_exists(table)
print("Starting the process to batch write data into Milvus.")
collection_to_load_data = Collection(table.name)
rows = self._format_data_for_milvus(data, collection_to_load_data)
Expand Down Expand Up @@ -152,7 +152,6 @@ def online_read(
# results do not have timestamps
return [(None, row) for row in results]

@log_exceptions_and_usage(online_store="milvus")
def update(
self,
config: RepoConfig,
Expand All @@ -164,41 +163,7 @@ def update(
):
with MilvusConnectionManager(config.online_store):
for table_to_keep in tables_to_keep:
collection_available = utility.has_collection(table_to_keep.name)

if collection_available:
logger.info(f"Collection {table_to_keep.name} already exists.")
print(f"Collection {table_to_keep.name} already exists.")
else:
if not table_to_keep.schema:
raise ValueError(
f"a schema must be provided for feature_view: {table_to_keep}"
)

(
schema,
indexes,
) = self._convert_featureview_schema_to_milvus_readable(
table_to_keep,
)

logger.info(
f"creating collection {table_to_keep.name} with schema: {schema}"
)
print(
f"creating collection {table_to_keep.name} with schema: {schema}"
)
collection = Collection(name=table_to_keep.name, schema=schema)

for field_name, index_params in indexes.items():
collection.create_index(field_name, index_params)

logger.info(
f"Collection {table_to_keep.name} has been created successfully."
)
print(
f"Collection {table_to_keep.name} has been created successfully."
)
self._create_collection_if_not_exists(table_to_keep)

for table_to_delete in tables_to_delete:
collection_available = utility.has_collection(table_to_delete.name)
Expand Down Expand Up @@ -229,6 +194,40 @@ def teardown(
logger.info(f"Dropping collection: {collection_name}")
utility.drop_collection(collection_name)

def _create_collection_if_not_exists(self, feature_view: FeatureView):
"""
Checks whether the collection already exists and creates it based on the provided feature view.
Parameters:
feature_view (FeatureView): the FeatureView that contains the schema.
"""
collection_available = utility.has_collection(feature_view.name)

if collection_available:
logger.info(f"Collection {feature_view.name} already exists.")
print(f"Collection {feature_view.name} already exists.")
else:
if not feature_view.schema:
raise ValueError(
f"a schema must be provided for feature_view: {feature_view.name}"
)

(schema, indexes,) = self._convert_featureview_schema_to_milvus_readable(
feature_view,
)

logger.info(
f"creating collection {feature_view.name} with schema: {schema}"
)
collection = Collection(name=feature_view.name, schema=schema)

for field_name, index_params in indexes.items():
collection.create_index(field_name, index_params)

logger.info(
f"Collection {feature_view.name} has been created successfully."
)

def _convert_featureview_schema_to_milvus_readable(
self, feature_view: FeatureView
) -> Tuple[CollectionSchema, Dict]:
Expand Down
11 changes: 0 additions & 11 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,8 +821,6 @@ def apply(
... )
>>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view
"""
# TODO: remove
print(f"Apply with: {objects}")
# TODO: Add locking
if not isinstance(objects, Iterable):
objects = [objects]
Expand Down Expand Up @@ -993,15 +991,6 @@ def apply(
tables_to_delete: List[FeatureView] = views_to_delete + sfvs_to_delete if not partial else [] # type: ignore
tables_to_keep: List[FeatureView] = views_to_update + sfvs_to_update # type: ignore

# TODO: remove
print(
f"""Update project={self.project},
tables_to_delete={tables_to_delete},
tables_to_keep={tables_to_keep},
entities_to_delete={entities_to_delete if not partial else []},
entities_to_keep={entities_to_update},
partial={partial},"""
)
self._get_provider().update_infra(
project=self.project,
tables_to_delete=tables_to_delete,
Expand Down
2 changes: 0 additions & 2 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ def update_infra(
):
set_usage_attribute("provider", self.__class__.__name__)

# TODO: remove
print(f"Is online store={self.online_store}")
# Call update only if there is an online store
if self.online_store:
self.online_store.update(
Expand Down
25 changes: 22 additions & 3 deletions sdk/python/tests/expediagroup/test_milvus_online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from feast.protos.feast.types.Value_pb2 import FloatList
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
from feast.types import Array, Float32, Int64
from feast.types import Array, Float32, Int64, String
from tests.expediagroup.milvus_online_store_creator import MilvusOnlineStoreCreator

logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -408,11 +408,30 @@ def test_milvus_update_delete_unavailable_collection(self, repo_config, caplog):
assert len(utility.list_collections()) == 0

def test_milvus_online_write_batch(self, repo_config, caplog):
self._create_collection_in_milvus(self.collection_to_write, repo_config)

entity = Entity(name="name")
feature_view = FeatureView(
name=self.collection_to_write,
source=SOURCE,
entities=[entity],
schema=[
Field(
name="name",
dtype=String,
),
Field(
name="age",
dtype=Int64,
),
Field(
name="avg_orders_day",
dtype=Array(Float32),
tags={
"dimensions": 5,
"index_type": "HNSW",
"index_params": '{ "M": 32, "efConstruction": 256}',
},
),
],
)

total_rows_to_write = 100
Expand Down

0 comments on commit 24ffa98

Please sign in to comment.