Skip to content

Commit

Permalink
Msudhir/add milvus read (#22)
Browse files Browse the repository at this point in the history
* initial online read implementation

* Adding online read functionality

* linting error fix

* removed utils out and added all the functionality to milvus_online_store

* changeing method types to internal

* fixing linting errors

* Added bidict and removed existing feast/milvus conversion functionality

* Removed the ValueProto redundancy in the code

* added bidict to requirements files

* Added changes to further PR comments

* removing primary_key from feast_schema in tests

* keeping formatting consistent

---------

Co-authored-by: Manisha Sudhir <[email protected]>
Co-authored-by: mbackes <[email protected]>
  • Loading branch information
3 people authored Aug 24, 2023
1 parent ffc8c83 commit 004ca49
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 77 deletions.
232 changes: 178 additions & 54 deletions sdk/python/feast/expediagroup/vectordb/milvus_online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

import numpy as np
from bidict import bidict
from pydantic.typing import Literal
from pymilvus import (
Collection,
Expand All @@ -19,23 +20,27 @@
from feast.field import Field
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import FloatList
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
from feast.types import (
Array,
Bytes,
FeastType,
Float32,
Float64,
Int32,
Int64,
Invalid,
String,
)
from feast.types import Array, Bytes, Float32, Float64, Int32, Int64, Invalid, String
from feast.usage import log_exceptions_and_usage

logger = logging.getLogger(__name__)

TYPE_MAPPING = bidict(
{
DataType.INT32: Int32,
DataType.INT64: Int64,
DataType.FLOAT: Float32,
DataType.DOUBLE: Float64,
DataType.STRING: String,
DataType.UNKNOWN: Invalid,
DataType.FLOAT_VECTOR: Array(Float32),
DataType.BINARY_VECTOR: Array(Bytes),
}
)


class MilvusOnlineStoreConfig(FeastConfigBaseModel):
"""Online store config for the Milvus online store"""
Expand Down Expand Up @@ -98,7 +103,7 @@ def online_write_batch(
) -> None:
with MilvusConnectionManager(config.online_store):
try:
rows = self.format_data_for_milvus(data)
rows = self._format_data_for_milvus(data)
collection_to_load_data = Collection(table.name)
collection_to_load_data.insert(rows)
# The flush call will seal any remaining segments and send them for indexing
Expand All @@ -113,9 +118,19 @@ def online_read(
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
raise NotImplementedError(
"to be implemented in https://jira.expedia.biz/browse/EAPC-7972"
)

with MilvusConnectionManager(config.online_store):

quer_expr = self._construct_milvus_query(entity_keys)
collection = Collection(table.name)
query_result = collection.query(
expr=quer_expr, output_fields=requested_features
)
results = self._convert_milvus_result_to_feast_type(
query_result, collection, requested_features
)

return results

@log_exceptions_and_usage(online_store="milvus")
def update(
Expand Down Expand Up @@ -204,7 +219,7 @@ def _convert_featureview_schema_to_milvus_readable(
for field in feast_schema:

field_name = field.name
data_type = self._feast_to_milvus_data_type(field.dtype)
data_type = self._get_milvus_type(field.dtype)
dimensions = 0

if field.tags:
Expand Down Expand Up @@ -253,10 +268,10 @@ def _data_type_is_supported_vector(self, data_type: DataType) -> bool:
whether the Milvus data type is a supported vector in this implementation
Parameters:
data_type (DataType): data type of field in schema
data_type (DataType): data type of field in schema
Returns:
bool: True is supported, False if not
bool: True is supported, False if not
"""
if data_type in [
DataType.BINARY_VECTOR,
Expand All @@ -266,31 +281,9 @@ def _data_type_is_supported_vector(self, data_type: DataType) -> bool:

return False

def _feast_to_milvus_data_type(self, feast_type: FeastType) -> DataType:
"""
Mapping for converting Feast data type to a data type compatible wih Milvus.
Parameters:
feast_type (FeastType): This is a type associated with a Feature that is stored in a FeatureView, readable with Feast.
Returns:
DataType : DataType associated with what Milvus can understand and associate its Feature types to
"""

return {
Int32: DataType.INT32,
Int64: DataType.INT64,
Float32: DataType.FLOAT,
Float64: DataType.DOUBLE,
String: DataType.STRING,
Invalid: DataType.UNKNOWN,
Array(Float32): DataType.FLOAT_VECTOR,
Array(Bytes): DataType.BINARY_VECTOR,
}.get(feast_type, None)

def format_data_for_milvus(self, feast_data):
def _format_data_for_milvus(self, feast_data):
"""
Data stored into Milvus takes the grouped representation approach where each feature value is grouped together:
Format Feast input for Milvus: Data stored into Milvus takes the grouped representation approach where each feature value is grouped together:
[[1,2], [1,3]], [John, Lucy], [3,4]]
Parameters:
Expand All @@ -303,14 +296,7 @@ def format_data_for_milvus(self, feast_data):

milvus_data = []
for entity_key, values, timestamp, created_ts in feast_data:
feature = []
for feature_name, val in values.items():
val_type = val.WhichOneof("val")
value = getattr(val, val_type)
if val_type == "float_list_val":
value = np.array(value.val)
# TODO: Check binary vector conversion
feature.append(value)
feature = self._process_values_for_milvus(values)
milvus_data.append(feature)

transformed_data = [list(item) for item in zip(*milvus_data)]
Expand All @@ -321,12 +307,12 @@ def _create_index_params(self, tags: Dict[str, str], data_type: DataType):
Parses the tags to generate the index_params needed to create the specified index
Parameters:
index_type (MilvusIndexType): the index type to be created
tags (Dict): the tags associated with the field
data_type (DateType): the data type of the field
index_type (MilvusIndexType): the index type to be created
tags (Dict): the tags associated with the field
data_type (DateType): the data type of the field
Returns:
(Dict): a dictionary formatted for the create_index params argument
(Dict): a dictionary formatted for the create_index params argument
"""
valid_indexes = IndexType._member_map_
index_type_tag = tags.get("index_type", "").upper().strip("BIN_")
Expand Down Expand Up @@ -363,3 +349,141 @@ def _create_index_params(self, tags: Dict[str, str], data_type: DataType):
"index_type": index_type_name,
"params": params,
}

def _convert_milvus_result_to_feast_type(
self, milvus_result, collection, features_to_request
):
"""
Convert Milvus result to Feast types.
Parameters:
milvus_result (List[Dict[str, any]]): Milvus query result.
collection (Collection): Milvus collection schema.
features_to_request (List[str]): Features to request from Milvus.
Returns:
List[Dict[str, ValueProto]]: Processed data with Feast types.
"""

# Here we are constructing the feature list to request from Milvus with their relevant types

features_with_types = list(tuple())
for field in collection.schema.fields:
if field.name in features_to_request:
features_with_types.append(
(field.name, self._get_feast_type(field.dtype))
)

feast_type_result = []
prefix = "valuetype."

for row in milvus_result:
result_row = {}
for feature, feast_type in features_with_types:

value_proto = ValueProto()
feature_value = row[feature]

if feature_value:
# Doing some pre-processing here to remove prefix
value_type_method = f"{feast_type.to_value_type()}_val".lower()
if value_type_method.startswith(prefix):
value_type_method = value_type_method[len(prefix) :]
value_proto = self._create_value_proto(
value_proto, feature_value, value_type_method
)
result_row[feature] = value_proto
# Append result after conversion to Feast Type
feast_type_result.append(result_row)
return feast_type_result

def _create_value_proto(self, val_proto, feature_val, value_type) -> ValueProto:
"""
Construct Value Proto so that Feast can interpret Milvus results
Parameters:
val_proto (ValueProto): Initialised Value Proto
feature_val (Union[list, int, str, double, float, bool, bytes]): A row/ an item in the result that Milvus returns.
value_type (Str): Feast Value type; example: int64_val, float_val, etc.
Returns:
val_proto (ValueProto): Constructed result that Feast can understand.
"""

if value_type == "float_list_val":
val_proto = ValueProto(float_list_val=FloatList(val=feature_val))
else:
setattr(val_proto, value_type, feature_val)
return val_proto

def _construct_milvus_query(self, entities) -> str:
"""
Construct a Milvus query expression based on entity_keys provided.
Parameters:
entities (List[Entity]): List of entities with join keys and values.
Returns:
str: Constructed Milvus query expression.
"""

milvus_query_expr = ""
entity_join_key = []
values_to_search = []

for entity in entities:
for key in entity.join_keys:
entity_join_key.append(key)
for value in entity.entity_values:
value_to_search = self._get_value_to_search_in_milvus(value)
values_to_search.append(value_to_search)

# TODO: Enable multiple join key support. Currently only supporting a single primary key/ join key. This is a limitation in Feast.
milvus_query_expr = f"{entity_join_key[0]} in {values_to_search}"

return milvus_query_expr

def _process_values_for_milvus(self, values) -> List:
"""
Process values to prepare them for using in Milvus.
Parameters:
values: (Dict[str, ValueProto]): Dictionary of values from Feast data.
Returns:
(List): Processed feature values ready for storing in Milvus.
"""
feature = []
for feature_name, val in values.items():
value = self._get_value_to_search_in_milvus(val)
feature.append(value)
return feature

def _get_value_to_search_in_milvus(self, value) -> Any:
"""
Process a value to prepare it for searching in Milvus.
Parameters:
value (ValueProto): A value from Feast data.
Returns:
value (Any): Processed value ready for Milvus searching.
"""
val_type = value.WhichOneof("val")
if val_type == "float_list_val":
value = np.array(value.float_list_val.val)
else:
value = getattr(value, val_type)
return value

def _get_milvus_type(self, feast_type) -> DataType:
"""
Convert Feast type to Milvus type using the TYPE_MAPPING bidict.
"""
return TYPE_MAPPING.inverse.get(feast_type, None)

def _get_feast_type(self, milvus_type) -> object:
"""
Convert Milvus type to Feast type using the TYPE_MAPPING bidict.
"""
return TYPE_MAPPING.get(milvus_type, None)
2 changes: 2 additions & 0 deletions sdk/python/requirements/py3.10-ci-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ backcall==0.2.0
# via ipython
beautifulsoup4==4.12.2
# via nbconvert
bidict==0.22.1
# via feast (setup.py)
black==22.12.0
# via eg-feast (setup.py)
bleach==6.0.0
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/requirements/py3.10-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ attrs==23.1.0
# via
# bowler
# jsonschema
bidict==0.22.1
# via feast (setup.py)
bowler==0.9.0
# via feast (setup.py)
certifi==2023.5.7
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/requirements/py3.8-ci-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ backports-zoneinfo==0.2.1;python_version<"3.9"
# tzlocal
beautifulsoup4==4.12.2
# via nbconvert
bidict==0.22.1
# via feast (setup.py)
black==22.12.0
# via feast (setup.py)
bleach==6.0.0
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/requirements/py3.8-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ attrs==23.1.0
# via
# bowler
# jsonschema
bidict==0.22.1
# via feast (setup.py)
bowler==0.9.0
# via feast (setup.py)
certifi==2023.5.7
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/requirements/py3.9-ci-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ backcall==0.2.0
# via ipython
beautifulsoup4==4.12.2
# via nbconvert
bidict==0.22.1
# via feast (setup.py)
black==22.12.0
# via eg-feast (setup.py)
bleach==6.0.0
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/requirements/py3.9-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ attrs==23.1.0
# via
# bowler
# jsonschema
bidict==0.22.1
# via feast (setup.py)
bowler==0.9.0
# via feast (setup.py)
certifi==2023.5.7
Expand Down
Loading

0 comments on commit 004ca49

Please sign in to comment.