diff --git a/examples/example_bulkinsert_csv.py b/examples/example_bulkinsert_csv.py
index e3946f6de..f8effcb1e 100644
--- a/examples/example_bulkinsert_csv.py
+++ b/examples/example_bulkinsert_csv.py
@@ -99,15 +99,15 @@ def create_partition(collection, partition_name):
 
 def gen_csv_rowbased(num, path, partition_name, sep=","):
     global id_start
-    header = [_ID_FIELD_NAME, _JSON_FIELD_NAME, _VECTOR_FIELD_NAME, _VARCHAR_FIELD_NAME, "dynamic_field"]
+    header = [_ID_FIELD_NAME, _JSON_FIELD_NAME, _VECTOR_FIELD_NAME, _VARCHAR_FIELD_NAME, _DYNAMIC_FIELD_NAME]
     rows = []
     for i in range(num):
         rows.append([
             id_start, # id field
             json.dumps({"Number": id_start, "Name": "book_"+str(id_start)}), # json field
-            json.dumps([round(random.random(), 6) for _ in range(_DIM)]), # vector field
+            [round(random.random(), 6) for _ in range(_DIM)], # vector field
             "{}_{}".format(partition_name, id_start) if partition_name is not None else "description_{}".format(id_start), # varchar field
-            id_start, # no field matches this value, this value will be put into dynamic field
+            json.dumps({"dynamic_field": id_start}), # no field matches this value, this value will be put into dynamic field
         ])
         id_start = id_start + 1
     data = [header] + rows
diff --git a/examples/example_bulkwriter.py b/examples/example_bulkwriter.py
index 4cd504288..891acd227 100644
--- a/examples/example_bulkwriter.py
+++ b/examples/example_bulkwriter.py
@@ -484,6 +484,7 @@ def cloud_bulkinsert():
         BulkFileType.JSON,
         BulkFileType.NUMPY,
         BulkFileType.PARQUET,
+        BulkFileType.CSV,
     ]
 
     schema = build_simple_collection()
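The rewrite of `_persist_csv` in the buffer.py diff below hinges on how pandas turns list-like cells into strings when writing CSV. A quick standalone illustration of the behavior described in the new comments (plain pandas/numpy, not part of the patch):

```python
import json

import numpy as np
import pandas as pd

vec = [1.0, 2.0]

# pandas stringifies object cells, so the CSV output depends on the container type:
df = pd.DataFrame({"as_list": [vec], "as_tuple": [tuple(vec)], "as_ndarray": [np.array(vec)]})
print(df.to_csv(index=False))
# as_list,as_tuple,as_ndarray
# "[1.0, 2.0]","(1.0, 2.0)",[1. 2.]

# json.dumps normalizes all of them to the JSON array form Milvus expects:
print(json.dumps(vec))                     # [1.0, 2.0]
print(json.dumps(np.array(vec).tolist()))  # [1.0, 2.0]
```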
diff --git a/pymilvus/bulk_writer/buffer.py b/pymilvus/bulk_writer/buffer.py
index 90e84bb2a..54ead0ffa 100644
--- a/pymilvus/bulk_writer/buffer.py
+++ b/pymilvus/bulk_writer/buffer.py
@@ -10,7 +10,6 @@
 # or implied. See the License for the specific language governing permissions and limitations under
 # the License.
 
-import csv
 import json
 import logging
 from pathlib import Path
@@ -290,49 +289,71 @@ def _persist_parquet(self, local_path: str, **kwargs):
     def _persist_csv(self, local_path: str, **kwargs):
         sep = self._config.get("sep", ",")
+        nullkey = self._config.get("nullkey", "")
 
         header = list(self._buffer.keys())
-        data = []
-        data_count = len(next(iter(self._buffer.values())))
-        for i in range(data_count):
-            row = []
-            for name in header:
-                field_schema = self._fields[name]
-
-                # null is not supported yet
-                # convert to string
-                if field_schema.dtype in {
-                    DataType.JSON,
-                    DataType.ARRAY,
-                    DataType.SPARSE_FLOAT_VECTOR,
-                    DataType.BINARY_VECTOR,
-                    DataType.FLOAT_VECTOR,
-                }:
-                    row.append(json.dumps(self._buffer[name][i]))
-                elif field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
-                    row.append(
-                        json.dumps(
-                            np.frombuffer(
-                                self._buffer[name][i],
-                                dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name],
-                            )
-                        )
-                    )
-                elif field_schema.dtype in {DataType.BOOL}:
-                    row.append("true" if self._buffer[name][i] else "false")
-                else:
-                    row.append(str(self._buffer[name][i]))
-            data.append(row)
-
-        rows = [header, *data]
+        data = pd.DataFrame(columns=header)
+        for k, v in self._buffer.items():
+            field_schema = self._fields[k]
+            # When using df.to_csv() to write non-scalar data,
+            # each value is converted to a string before it is written.
+            # If the value of arr is [1.0, 2.0], the string changes with the type of arr:
+            #   when arr is a list, the output is '[1.0, 2.0]'
+            #   when arr is a tuple, the output is '(1.0, 2.0)'
+            #   when arr is a np.array, the output is '[1. 2.]' (no commas)
+            # We need the output to be '[1.0, 2.0]', consistent with the array format in JSON,
+            # so we can either: 1. make sure that arr of type
+            # (BINARY_VECTOR, FLOAT_VECTOR, FLOAT16_VECTOR, BFLOAT16_VECTOR) is a list,
+            # or 2. convert arr to a string with json.dumps(arr) before adding it to the df.
+            # Method 2 is used here.
+            if field_schema.dtype in {
+                DataType.SPARSE_FLOAT_VECTOR,
+                DataType.BINARY_VECTOR,
+                DataType.FLOAT_VECTOR,
+            }:
+                arr = []
+                for val in v:
+                    arr.append(json.dumps(val))
+                data[k] = pd.Series(arr, dtype=np.dtype("str"))
+            elif field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
+                # special processing for float16 vectors: self._buffer stores bytes
+                # for float16 vectors, so convert the bytes back to a float list
+                dt = (
+                    np.dtype("bfloat16")
+                    if (field_schema.dtype == DataType.BFLOAT16_VECTOR)
+                    else np.dtype("float16")
+                )
+                arr = []
+                for val in v:
+                    arr.append(json.dumps(np.frombuffer(val, dtype=dt).tolist()))
+                data[k] = pd.Series(arr, dtype=np.dtype("str"))
+            elif field_schema.dtype in {
+                DataType.JSON,
+                DataType.ARRAY,
+            }:
+                arr = []
+                for val in v:
+                    if val is None:
+                        arr.append(nullkey)
+                    else:
+                        arr.append(json.dumps(val))
+                data[k] = pd.Series(arr, dtype=np.dtype("str"))
+            elif field_schema.dtype in {DataType.BOOL}:
+                arr = []
+                for val in v:
+                    # write nullkey in place of None so rows stay aligned
+                    arr.append(nullkey if val is None else ("true" if val else "false"))
+                data[k] = pd.Series(arr, dtype=np.dtype("str"))
+            else:
+                data[k] = pd.Series(v, dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name])
 
         file_path = Path(local_path + ".csv")
         try:
-            with file_path.open("w", encoding="utf-8") as f:
-                writer = csv.writer(f, delimiter=sep)
-                writer.writerows(rows)
+            # pd.Series converts None to np.nan,
+            # so 'na_rep=nullkey' replaces any remaining NaN with nullkey
+            data.to_csv(file_path, sep=sep, na_rep=nullkey, index=False)
         except Exception as e:
             self._throw(f"Failed to persist file {file_path}, error: {e}")
 
-        logger.info(f"Successfully persist file {file_path}, row count: {len(rows)}")
+        logger.info("Successfully persisted file %s, row count: %s", file_path, len(data))
         return [str(file_path)]
diff --git a/pymilvus/bulk_writer/bulk_writer.py b/pymilvus/bulk_writer/bulk_writer.py
index 6668f49dd..0a4451d8c 100644
--- a/pymilvus/bulk_writer/bulk_writer.py
+++ b/pymilvus/bulk_writer/bulk_writer.py
@@ -199,6 +199,20 @@ def _verify_row(self, row: dict):
                 self._throw(f"The field '{field.name}' is missed in the row")
 
             dtype = DataType(field.dtype)
+
+            # deal with null (None) values
+            if field.nullable and row[field.name] is None:
+                if (
+                    field.default_value is not None
+                    and field.default_value.WhichOneof("data") is not None
+                ):
+                    # replace None with the field's default value
+                    data_type = field.default_value.WhichOneof("data")
+                    row[field.name] = getattr(field.default_value, data_type)
+                else:
+                    # skip the type check if the field value is null
+                    continue
+
             if dtype in {
                 DataType.BINARY_VECTOR,
                 DataType.FLOAT_VECTOR,
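To see the two changes working together, here is a minimal, hypothetical usage sketch (schema, field names, and paths are illustrative, not from this patch; it assumes a pymilvus build where FieldSchema supports nullable): a nullable field appended as None passes the new _verify_row check and is written to CSV as the configured nullkey (empty string by default).

```python
from pymilvus import CollectionSchema, DataType, FieldSchema
from pymilvus.bulk_writer import BulkFileType, LocalBulkWriter

# illustrative schema: a nullable VARCHAR field alongside the usual id/vector pair
schema = CollectionSchema(fields=[
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=4),
    FieldSchema(name="comment", dtype=DataType.VARCHAR, max_length=64, nullable=True),
])

writer = LocalBulkWriter(
    schema=schema,
    local_path="/tmp/bulk_out",  # illustrative output directory
    file_type=BulkFileType.CSV,
)

# 'comment' is nullable, so None passes _verify_row (no default value is set,
# so the type check is skipped) and _persist_csv writes it as the nullkey
writer.append_row({"id": 1, "vector": [0.1, 0.2, 0.3, 0.4], "comment": None})
writer.append_row({"id": 2, "vector": [0.5, 0.6, 0.7, 0.8], "comment": "hello"})
writer.commit()
print(writer.batch_files)  # e.g. [['/tmp/bulk_out/.../1.csv']]
```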