Skip to content

Commit

Permalink
Expose timeout parameter
Browse files Browse the repository at this point in the history
Signed-off-by: YunMei.Li <[email protected]>
  • Loading branch information
Bennu-Li authored Aug 2, 2021
1 parent daaf31f commit 0ab47ba
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 37 deletions.
63 changes: 37 additions & 26 deletions orm/pymilvus_orm/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,11 @@ def primary_field(self) -> FieldSchema:
"""
return self._schema.primary_field

def drop(self, **kwargs):
def drop(self, timeout=None, **kwargs):
"""
Drops the collection together with its index files.
:param kwargs:
:param timeout:
* *timeout* (``float``) --
An optional duration of time in seconds to allow for the RPC.
If timeout is set to None,
Expand All @@ -400,17 +400,17 @@ def drop(self, **kwargs):
conn = self._get_connection()
indexes = self.indexes
for index in indexes:
index.drop(**kwargs)
conn.drop_collection(self._name, timeout=kwargs.get("timeout", None))
index.drop(timeout=timeout, **kwargs)
conn.drop_collection(self._name, timeout=timeout, **kwargs)

def load(self, partition_names=None, **kwargs):
def load(self, partition_names=None, timeout=None, **kwargs):
"""
Loads the collection from disk to memory.
:param partition_names: The specified partitions to load.
:type partition_names: list[str]
:param kwargs:
:param timeout:
* *timeout* (``float``) --
An optional duration of time in seconds to allow for the RPC. If timeout
is set to None, the client keeps waiting until the server responds or error occurs.
Expand All @@ -435,15 +435,15 @@ def load(self, partition_names=None, **kwargs):
"""
conn = self._get_connection()
if partition_names is not None:
conn.load_partitions(self._name, partition_names, timeout=kwargs.get("timeout", None))
conn.load_partitions(self._name, partition_names, timeout=timeout, **kwargs)
else:
conn.load_collection(self._name, timeout=kwargs.get("timeout", None))
conn.load_collection(self._name, timeout=timeout, **kwargs)

def release(self, **kwargs):
def release(self, timeout=None, **kwargs):
"""
Releases the collection from memory.
:param kwargs:
:param timeout:
* *timeout* (``float``) --
An optional duration of time in seconds to allow for the RPC. If timeout
is set to None, the client keeps waiting until the server responds or an error occurs.
Expand All @@ -467,9 +467,9 @@ def release(self, **kwargs):
>>> collection.release() # release the collection from memory
"""
conn = self._get_connection()
conn.release_collection(self._name, timeout=kwargs.get("timeout", None))
conn.release_collection(self._name, timeout=timeout, **kwargs)

def insert(self, data, partition_name=None, **kwargs):
def insert(self, data, partition_name=None, timeout=None, **kwargs):
"""
Insert data into the collection.
Expand All @@ -481,7 +481,7 @@ def insert(self, data, partition_name=None, **kwargs):
partition
:type partition_name: str
:param kwargs:
:param timeout:
* *timeout* (``float``) --
An optional duration of time in seconds to allow for the RPC. If timeout
is set to None, the client keeps waiting until the server responds or an error occurs.
Expand Down Expand Up @@ -514,7 +514,6 @@ def insert(self, data, partition_name=None, **kwargs):
raise SchemaNotReadyException(0, ExceptionsMessage.TypeOfDataAndSchemaInconsistent)
conn = self._get_connection()
entities = Prepare.prepare_insert_data(data, self._schema)
timeout = kwargs.pop("timeout", None)
res = conn.insert(collection_name=self._name, entities=entities, ids=None,
partition_name=partition_name, timeout=timeout, **kwargs)
if kwargs.get("_async", False):
Expand Down Expand Up @@ -734,13 +733,17 @@ def create_partition(self, partition_name, description=""):
raise PartitionAlreadyExistException(0, ExceptionsMessage.PartitionAlreadyExist)
return Partition(self, partition_name, description=description)

def has_partition(self, partition_name) -> bool:
def has_partition(self, partition_name, timeout=None) -> bool:
"""
Checks if a specified partition exists.
:param partition_name: The name of the partition to check
:type partition_name: str
:param timeout: An optional duration of time in seconds to allow for the RPC. When timeout
is set to None, client waits until server response or error occur
:type timeout: float
:return bool:
Whether a specified partition exists.
Expand All @@ -762,16 +765,16 @@ def has_partition(self, partition_name) -> bool:
False
"""
conn = self._get_connection()
return conn.has_partition(self._name, partition_name)
return conn.has_partition(self._name, partition_name, timeout=timeout)

def drop_partition(self, partition_name, **kwargs):
def drop_partition(self, partition_name, timeout=None, **kwargs):
"""
Drop the partition and its corresponding index files.
:param partition_name: The name of the partition to drop.
:type partition_name: str
:param kwargs:
:param timeout:
* *timeout* (``float``) --
An optional duration of time in seconds to allow for the RPC. If timeout
is set to None, the client keeps waiting until the server responds or an error occurs.
Expand All @@ -798,7 +801,7 @@ def drop_partition(self, partition_name, **kwargs):
if self.has_partition(partition_name) is False:
raise PartitionNotExistException(0, ExceptionsMessage.PartitionNotExist)
conn = self._get_connection()
return conn.drop_partition(self._name, partition_name, timeout=kwargs.get("timeout", None))
return conn.drop_partition(self._name, partition_name, timeout=timeout, **kwargs)

@property
def indexes(self) -> list:
Expand Down Expand Up @@ -862,7 +865,7 @@ def index(self) -> Index:
return Index(self, field_name, tmp_index, construct_only=True)
raise IndexNotExistException(0, ExceptionsMessage.IndexNotExist)

def create_index(self, field_name, index_params, **kwargs) -> Index:
def create_index(self, field_name, index_params, timeout=None, **kwargs) -> Index:
"""
Creates index for a specified field. Return Index Object.
Expand All @@ -872,6 +875,10 @@ def create_index(self, field_name, index_params, **kwargs) -> Index:
:param index_params: The indexing parameters.
:type index_params: dict
:param timeout: An optional duration of time in seconds to allow for the RPC. When timeout
is set to None, client waits until server response or error occur
:type timeout: float
:raises CollectionNotExistException: If the collection does not exist.
:raises ParamError: If the index parameters are invalid.
:raises BaseException: If field does not exist.
Expand All @@ -893,12 +900,16 @@ def create_index(self, field_name, index_params, **kwargs) -> Index:
"""
conn = self._get_connection()
return conn.create_index(self._name, field_name, index_params,
timeout=kwargs.pop("timeout", None), **kwargs)
timeout=timeout, **kwargs)

def has_index(self) -> bool:
def has_index(self, timeout=None) -> bool:
"""
Checks whether a specified index exists.
:param timeout: An optional duration of time in seconds to allow for the RPC. When timeout
is set to None, client waits until server response or error occur
:type timeout: float
:return bool:
Whether the specified index exists.
Expand All @@ -919,15 +930,15 @@ def has_index(self) -> bool:
"""
conn = self._get_connection()
# TODO(yukun): Need field name, but provide index name
if conn.describe_index(self._name, "") is None:
if conn.describe_index(self._name, "", timeout=timeout) is None:
return False
return True

def drop_index(self, **kwargs):
def drop_index(self, timeout=None, **kwargs):
"""
Drop index and its corresponding index files.
:param kwargs:
:param timeout:
* *timeout* (``float``) --
An optional duration of time in seconds to allow for the RPC. If timeout
is set to None, the client keeps waiting until the server responds or an error occurs.
Expand Down Expand Up @@ -958,4 +969,4 @@ def drop_index(self, **kwargs):
tmp_index = conn.describe_index(self._name, "")
if tmp_index is not None:
index = Index(self, tmp_index['field_name'], tmp_index, construct_only=True)
index.drop(**kwargs)
index.drop(timeout=timeout, **kwargs)
8 changes: 6 additions & 2 deletions orm/pymilvus_orm/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,17 @@ def to_dict(self):
}
return _dict

def drop(self, **kwargs):
def drop(self, timeout=None, **kwargs):
"""
Drop an index and its corresponding index files.
:param timeout: An optional duration of time in seconds to allow for the RPC. When timeout
is set to None, client waits until server response or error occur
:type timeout: float
:raises IndexNotExistException: If the specified index does not exist.
"""
conn = self._get_connection()
if conn.describe_index(self._collection.name) is None:
raise IndexNotExistException(0, ExceptionsMessage.IndexNotExist)
conn.drop_index(self._collection.name, self.field_name, **kwargs)
conn.drop_index(self._collection.name, self.field_name, timeout=timeout, **kwargs)
33 changes: 24 additions & 9 deletions orm/pymilvus_orm/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,14 @@ def num_entities(self) -> int:
partition_name=self._name)
return status["row_count"]

def drop(self, **kwargs):
def drop(self, timeout=None, **kwargs):
"""
Drop the partition, as well as its corresponding index files.
:param timeout: An optional duration of time in seconds to allow for the RPC. When timeout
is set to None, client waits until server response or error occur
:type timeout: float
:raises PartitionNotExistException:
When partitoin does not exist
Expand All @@ -160,14 +164,18 @@ def drop(self, **kwargs):
>>> partition.drop()
"""
conn = self._get_connection()
if conn.has_partition(self._collection.name, self._name) is False:
if conn.has_partition(self._collection.name, self._name, timeout=timeout) is False:
raise PartitionNotExistException(0, ExceptionsMessage.PartitionNotExist)
return conn.drop_partition(self._collection.name, self._name, **kwargs)
return conn.drop_partition(self._collection.name, self._name, timeout=timeout, **kwargs)

def load(self, **kwargs):
def load(self, timeout=None, **kwargs):
"""
Load the partition from disk to memory.
:param timeout: An optional duration of time in seconds to allow for the RPC. When timeout
is set to None, client waits until server response or error occur
:type timeout: float
:raises InvalidArgumentException:
If argument is not valid
Expand All @@ -187,13 +195,17 @@ def load(self, **kwargs):
# if index_names is not None, raise Exception Not Supported
conn = self._get_connection()
if conn.has_partition(self._collection.name, self._name):
return conn.load_partitions(self._collection.name, [self._name], **kwargs)
return conn.load_partitions(self._collection.name, [self._name], timeout=timeout, **kwargs)
raise PartitionNotExistException(0, ExceptionsMessage.PartitionNotExist)

def release(self, **kwargs):
def release(self, timeout=None, **kwargs):
"""
Release the partition from memory.
:param timeout: An optional duration of time in seconds to allow for the RPC. When timeout
is set to None, client waits until server response or error occur
:type timeout: float
:raises PartitionNotExistException:
When partitoin does not exist
Expand All @@ -211,17 +223,21 @@ def release(self, **kwargs):
"""
conn = self._get_connection()
if conn.has_partition(self._collection.name, self._name):
return conn.release_partitions(self._collection.name, [self._name], **kwargs)
return conn.release_partitions(self._collection.name, [self._name], timeout=timeout, **kwargs)
raise PartitionNotExistException(0, ExceptionsMessage.PartitionNotExist)

def insert(self, data, **kwargs):
def insert(self, data, timeout=None, **kwargs):
"""
Insert data into partition.
:param data: The specified data to insert, the dimension of data needs to align with column
number
:type data: list-like(list, tuple) object or pandas.DataFrame
:param timeout: An optional duration of time in seconds to allow for the RPC. When timeout
is set to None, client waits until server response or error occur
:type timeout: float
:param kwargs:
* *timeout* (``float``) --
An optional duration of time in seconds to allow for the RPC. When timeout
Expand Down Expand Up @@ -251,7 +267,6 @@ def insert(self, data, **kwargs):
if conn.has_partition(self._collection.name, self._name) is False:
raise PartitionNotExistException(0, ExceptionsMessage.PartitionNotExist)
entities = Prepare.prepare_insert_data(data, self._collection.schema)
timeout = kwargs.pop("timeout", None)
res = conn.insert(self._collection.name, entities=entities, ids=None,
partition_name=self._name, timeout=timeout, orm=True, **kwargs)
if kwargs.get("_async", False):
Expand Down

0 comments on commit 0ab47ba

Please sign in to comment.