diff --git a/examples/example_bulkwriter.py b/examples/example_bulkwriter.py index 1b41223e7..d1d20b66e 100644 --- a/examples/example_bulkwriter.py +++ b/examples/example_bulkwriter.py @@ -295,7 +295,9 @@ def test_retrieve_imported_data(bin_vec: bool): def test_cloud_bulkinsert(): url = "https://_your_cloud_server_url_" + api_key = "_api_key_for_the_url_" cluster_id = "_your_cloud_instance_id_" + collection_name = "_collection_name_on_the_cloud_" print(f"\n===================== import files to cloud vectordb ====================") object_url = "_your_object_storage_service_url_" @@ -303,31 +305,34 @@ def test_cloud_bulkinsert(): object_url_secret_key = "_your_object_storage_service_secret_key_" resp = bulk_import( url=url, + api_key=api_key, object_url=object_url, access_key=object_url_access_key, secret_key=object_url_secret_key, cluster_id=cluster_id, - collection_name=CSV_COLLECTION_NAME, + collection_name=collection_name, ) - print(resp) + print(resp.json()) print(f"\n===================== get import job progress ====================") - job_id = resp['data']['jobId'] + job_id = resp.json()['data']['jobId'] resp = get_import_progress( url=url, + api_key=api_key, job_id=job_id, cluster_id=cluster_id, ) - print(resp) + print(resp.json()) print(f"\n===================== list import jobs ====================") resp = list_import_jobs( url=url, + api_key=api_key, cluster_id=cluster_id, page_size=10, current_page=1, ) - print(resp) + print(resp.json()) if __name__ == '__main__': @@ -351,5 +356,6 @@ def test_cloud_bulkinsert(): test_call_bulkinsert(schema, batch_files) test_retrieve_imported_data(bin_vec=True) + # to test cloud bulkinsert api, you need to apply a cloud service from Zilliz Cloud(https://zilliz.com/cloud) # test_cloud_bulkinsert() diff --git a/pymilvus/bulk_writer/bulk_import.py b/pymilvus/bulk_writer/bulk_import.py index 736fbf76c..a5ce9e54c 100644 --- a/pymilvus/bulk_writer/bulk_import.py +++ b/pymilvus/bulk_writer/bulk_import.py @@ -44,7 +44,9 @@ def _handle_response(url: str, res: json): _throw(f"Failed to request url: {url}, code: {inner_code}, message: {inner_message}") -def _post_request(url: str, api_key: str, params: {}, timeout: int = 20, **kwargs): +def _post_request( + url: str, api_key: str, params: {}, timeout: int = 20, **kwargs +) -> requests.Response: try: resp = requests.post( url=url, headers=_http_headers(api_key), json=params, timeout=timeout, **kwargs @@ -57,7 +59,9 @@ def _post_request(url: str, api_key: str, params: {}, timeout: int = 20, **kwarg _throw(f"Failed to post url: {url}, error: {err}") -def _get_request(url: str, api_key: str, params: {}, timeout: int = 20, **kwargs): +def _get_request( + url: str, api_key: str, params: {}, timeout: int = 20, **kwargs +) -> requests.Response: try: resp = requests.get( url=url, headers=_http_headers(api_key), params=params, timeout=timeout, **kwargs @@ -80,7 +84,7 @@ def bulk_import( cluster_id: str, collection_name: str, **kwargs, -): +) -> requests.Response: """call bulkinsert restful interface to import files Args: @@ -108,7 +112,9 @@ def bulk_import( return resp -def get_import_progress(url: str, api_key: str, job_id: str, cluster_id: str, **kwargs): +def get_import_progress( + url: str, api_key: str, job_id: str, cluster_id: str, **kwargs +) -> requests.Response: """get job progress Args: @@ -132,7 +138,7 @@ def get_import_progress(url: str, api_key: str, job_id: str, cluster_id: str, ** def list_import_jobs( url: str, api_key: str, cluster_id: str, page_size: int, current_page: int, **kwargs -): +) -> requests.Response: """list jobs in a cluster Args: