diff --git a/packages/vaex-core/vaex/dataframe_protocol.py b/packages/vaex-core/vaex/dataframe_protocol.py
index bdac8744b8..78ed5db9ab 100644
--- a/packages/vaex-core/vaex/dataframe_protocol.py
+++ b/packages/vaex-core/vaex/dataframe_protocol.py
@@ -27,34 +27,31 @@ def _from_dataframe_to_vaex(df : DataFrameObject) -> vaex.dataframe.DataFrame:
     """
     Note: we need to implement/test support for bit/byte masks, chunk handling, etc.
     """
-    # Check number of chunks, if there's more than one we need to iterate
-    # For now it is set to 1
-    if df.num_chunks() > 1:
-        raise NotImplementedError
-
-    # We need a dict of columns here, with each column being a numpy array.
-    columns = dict()
-    labels = dict()
-    _k = _DtypeKind
-    for name in df.column_names():
-        col = df.get_column_by_name(name)
-
-        # Warning if variable name is not a string
-        # protocol-design-requirements No.4
-        if not isinstance(name, str):
-            raise NotImplementedError(f"Column names must be string (not {name}).")
-
-        if col.dtype[0] in (_k.INT, _k.UINT, _k.FLOAT, _k.BOOL):
-            # Simple numerical or bool dtype, turn into numpy array
-            columns[name] = convert_column_to_ndarray(col)
-        elif col.dtype[0] == _k.CATEGORICAL:
-            columns[name] = convert_categorical_column(col)
-        else:
-            raise NotImplementedError(f"Data type {col.dtype[0]} not handled yet")
-
-    dataframe = vaex.from_dict(columns)
-
-    return dataframe
+    # Convert every chunk of the interchange object into its own vaex DataFrame
+    dataframe = []
+    for chunk in df.get_chunks():
+        # We need a dict of columns here, one converted array per column name.
+        columns = dict()
+        labels = dict()
+        _k = _DtypeKind
+        for name in chunk.column_names():
+            col = chunk.get_column_by_name(name)
+            # Raise if the column name is not a string
+            # protocol-design-requirements No.4
+            if not isinstance(name, str):
+                raise NotImplementedError(f"Column names must be string (not {name}).")
+            if col.dtype[0] in (_k.INT, _k.UINT, _k.FLOAT, _k.BOOL):
+                # Simple numerical or bool dtype, convert the buffers directly
+                columns[name] = convert_column_to_ndarray(col)
+            elif col.dtype[0] == _k.CATEGORICAL:
+                columns[name] = convert_categorical_column(col)
+            else:
+                raise NotImplementedError(f"Data type {col.dtype[0]} not handled yet")
+        dataframe.append(vaex.from_dict(columns))
+
+
+    # Concatenate the per-chunk DataFrames into a single vaex DataFrame
+    return vaex.concat(dataframe, resolver='strict')
 
 class _DtypeKind(enum.IntEnum):
     INT = 0
@@ -86,7 +83,6 @@ def convert_column_to_ndarray(col : ColumnObject) -> pa.Array:
         x = pa.array(x, mask=mask)
     else:
         x = pa.array(x)
-
     return x
 
 def buffer_to_ndarray(_buffer, _dtype) -> np.ndarray:
@@ -230,7 +226,7 @@ def size(self) -> int:
         """
         Size of the column, in elements.
         """
-        return self._col.values.size
+        return self._col.df.count("*")
 
     @property
     def offset(self) -> int:
@@ -375,22 +371,42 @@ def num_chunks(self) -> int:
         """
         Return the number of chunks the column consists of.
         """
-        return 1
+        if isinstance(self._col.values, pa.ChunkedArray):
+            return self._col.values.num_chunks
+        else:
+            return 1
 
-    def get_chunks(self, n_chunks : Optional[int] = None) -> Iterable['_PandasColumn']:
+    def get_chunks(self, metadata, n_chunks : Optional[int] = None) -> Iterable['_VaexColumn']:
         """
         Return an iterator yielding the chunks.
         See `DataFrame.get_chunks` for details on ``n_chunks``.
         """
-        return {}
+        if n_chunks is None:
+            size = self.size
+            n_chunks = self.num_chunks()
+            i = self._col.df.evaluate_iterator(self._col, chunk_size=size//n_chunks)
+            iterator = []
+            for i1, i2, chunk in i:
+                iterator.append(_VaexColumn(self._col[i1:i2], metadata))
+            return iterator
+        elif self.num_chunks() == 1:
+            size = self.size
+            i = self._col.df.evaluate_iterator(self._col, chunk_size=size//n_chunks)
+            iterator = []
+            for i1, i2, chunk in i:
+                iterator.append(_VaexColumn(self._col[i1:i2], metadata))
+            return iterator
+
+        else:
+            raise ValueError(f'Column {self._col.expression} is already chunked.')
 
     @property
-    def metadata(self, dict_is_cat) -> Dict[str, Any]:
+    def metadata(self, metadata) -> Dict[str, Any]:
         """
         Store specific metadata of the column.
         """
-        # Boolean if column is categorical or not
-        return dict_is_cat
+        # Metadata about categories. NOTE(review): a property getter is only called with `self`, so `col.metadata` raises TypeError here - confirm and read it from the instance instead
+        return metadata
 
     def get_data_buffer(self) -> Tuple[_VaexBuffer, Any]:  # Any is for self.dtype tuple
         """
@@ -483,7 +499,10 @@ def num_rows(self) -> int:
         return len(self._df)
 
     def num_chunks(self) -> int:
-        return 1
+        if isinstance(self.get_column(0)._col.values, pa.ChunkedArray):
+            return self.get_column(0)._col.values.num_chunks
+        else:
+            return 1
 
     def column_names(self) -> Iterable[str]:
         return self._df.get_column_names()
@@ -506,11 +525,26 @@ def select_columns(self, indices: Sequence[int]) -> '_VaexDataFrame':
     def select_columns_by_name(self, names: Sequence[str]) -> '_VaexDataFrame':
         if not isinstance(names, collections.Sequence):
             raise ValueError("`names` is not a sequence")
-
-        return {}  # TODO
+        return _VaexDataFrame(self._df[names])
 
     def get_chunks(self, n_chunks : Optional[int] = None) -> Iterable['_VaexDataFrame']:
         """
         Return an iterator yielding the chunks.
         """
-        return {}  # TODO
\ No newline at end of file
+        if n_chunks is None:
+            size = self.num_rows()
+            n_chunks = self.num_chunks()
+            i = self._df.evaluate_iterator(self.get_column(0)._col, chunk_size=size//n_chunks)
+            iterator = []
+            for i1, i2, chunk in i:
+                iterator.append(_VaexDataFrame(self._df[i1:i2]))
+            return iterator
+        elif self.num_chunks() == 1:
+            size = self.num_rows()
+            i = self._df.evaluate_iterator(self.get_column(0)._col, chunk_size=size//n_chunks)
+            iterator = []
+            for i1, i2, chunk in i:
+                iterator.append(_VaexDataFrame(self._df[i1:i2]))
+            return iterator
+        else:
+            raise ValueError("DataFrame is already chunked.")
\ No newline at end of file
diff --git a/tests/dataframe_protocol_test.py b/tests/dataframe_protocol_test.py
index b2ad80209d..3837562766 100644
--- a/tests/dataframe_protocol_test.py
+++ b/tests/dataframe_protocol_test.py
@@ -34,8 +34,63 @@ def test_mixed_intfloatbool(df_factory):
     assert df2.__dataframe__().get_column_by_name('y').null_count == 0
     assert df2.__dataframe__().get_column_by_name('z').null_count == 0
 
-def test_categorical():
-    df = vaex.from_arrays(year=[2012, 2015, 2019], weekday=[0, 4, 6])
+def test_mixed_missing(df_factory_arrow):
+    df = df_factory_arrow(
+        x=np.array([True, None, False, None, True]),
+        y=np.array([None, 2, 0, 1, 2]),
+        z=np.array([9.2, 10.5, None, 11.8, None]))
+
+    df2 = _from_dataframe_to_vaex(df.__dataframe__())
+
+    assert df.__dataframe__().metadata == df2.__dataframe__().metadata
+
+    assert df['x'].tolist() == df2['x'].tolist()
+    assert not df2['x'].is_masked
+    assert df2.__dataframe__().get_column_by_name('x').null_count == 2
+    assert df['x'].dtype == df2['x'].dtype
+
+    assert df['y'].tolist() == df2['y'].tolist()
+    assert not df2['y'].is_masked
+    assert df2.__dataframe__().get_column_by_name('y').null_count == 1
+    assert df['y'].dtype == df2['y'].dtype
+
+    assert df['z'].tolist() == df2['z'].tolist()
+    assert not df2['z'].is_masked
+    assert df2.__dataframe__().get_column_by_name('z').null_count == 2
+    assert df['z'].dtype == df2['z'].dtype
+
+def test_missing_from_masked(df_factory_numpy):
+    df = df_factory_numpy(
+        x=np.ma.array([1, 2, 3, 4, 0], mask=[0, 0, 0, 1, 1], dtype=int),
+        y=np.ma.array([1.5, 2.5, 3.5, 4.5, 0], mask=[False, True, True, True, False], dtype=float),
+        z=np.ma.array([True, False, True, True, True], mask=[1, 0, 0, 1, 0], dtype=bool))
+
+    df2 = _from_dataframe_to_vaex(df.__dataframe__())
+
+    assert df.__dataframe__().metadata == df2.__dataframe__().metadata
+
+    assert df['x'].tolist() == df2['x'].tolist()
+    assert not df2['x'].is_masked
+    assert df2.__dataframe__().get_column_by_name('x').null_count == 2
+    assert df['x'].dtype == df2['x'].dtype
+
+    assert df['y'].tolist() == df2['y'].tolist()
+    assert not df2['y'].is_masked
+    assert df2.__dataframe__().get_column_by_name('y').null_count == 3
+    assert df['y'].dtype == df2['y'].dtype
+
+    assert df['z'].tolist() == df2['z'].tolist()
+    assert not df2['z'].is_masked
+    assert df2.__dataframe__().get_column_by_name('z').null_count == 2
+    assert df['z'].dtype == df2['z'].dtype
+
+def test_categorical_ordinal():
+    colors = ['red', 'blue', 'green', 'blue']
+    ds = vaex.from_arrays(
+        colors=colors,
+        year=[2012, 2013, 2015, 2019],
+        weekday=[0, 1, 4, 6])
+    df = ds.ordinal_encode('colors', ['red', 'green', 'blue'])
     df = df.categorize('year', min_value=2012, max_value=2019)
     df = df.categorize('weekday', labels=['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'])
 
@@ -43,22 +98,14 @@ def test_categorical():
     col = df.__dataframe__().get_column_by_name('weekday')
     assert col.dtype[0] == _DtypeKind.CATEGORICAL
     assert col.describe_categorical == (False, True, {0: 'Mon', 1: 'Tue', 2: 'Wed', 3: 'Thu', 4: 'Fri', 5: 'Sat', 6: 'Sun'})
+    col2 = df.__dataframe__().get_column_by_name('colors')
+    assert col2.dtype[0] == _DtypeKind.CATEGORICAL
+    assert col2.describe_categorical == (False, True, {0: 'red', 1: 'green', 2: 'blue'})
 
     df2 = _from_dataframe_to_vaex(df.__dataframe__())
-    assert df2['year'].tolist() == [2012, 2015, 2019]
-    assert df2['weekday'].tolist() == ['Mon', 'Fri', 'Sun']
-
-def test_virtual_column(df_factory):
-    df = df_factory(
-        x=np.array([True, True, False]),
-        y=np.array([1, 2, 0]),
-        z=np.array([9.2, 10.5, 11.8]))
-    df.add_virtual_column("r", "sqrt(y**2 + z**2)")
-    df2 = _from_dataframe_to_vaex(df.__dataframe__())
-    assert df2.x.tolist() == df.x.tolist()
-    assert df2.y.tolist() == df.y.tolist()
-    assert df2.z.tolist() == df.z.tolist()
-    assert df2.r.tolist() == df.r.tolist()
+    assert df2['colors'].tolist() == ['red', 'blue', 'green', 'blue']
+    assert df2['year'].tolist() == [2012, 2013, 2015, 2019]
+    assert df2['weekday'].tolist() == ['Mon', 'Tue', 'Fri', 'Sun']
 
 def test_arrow_dictionary():
     indices = pa.array([0, 1, 0, 1, 2, 0, 1, 2])
@@ -91,27 +138,11 @@ def test_arrow_dictionary_missing():
     assert df2.__dataframe__().get_column_by_name('x').null_count == 2
     assert df['x'].dtype.index_type == df2['x'].dtype.index_type
 
-def test_missing_from_masked(df_factory):
-    df = df_factory(
-        x=np.ma.array([1, 2, 3, 4, 0], mask=[0, 0, 0, 1, 1], dtype=int),
-        y=np.ma.array([1.5, 2.5, 3.5, 4.5, 0], mask=[False, True, True, True, False], dtype=float),
-        z=np.ma.array([True, False, True, True, True], mask=[1, 0, 0, 1, 0], dtype=bool))
-
+def test_virtual_column():
+    df = vaex.from_arrays(
+        x=np.array([True, True, False]),
+        y=np.array([1, 2, 0]),
+        z=np.array([9.2, 10.5, 11.8]))
+    df.add_virtual_column("r", "sqrt(y**2 + z**2)")
     df2 = _from_dataframe_to_vaex(df.__dataframe__())
-
-    assert df.__dataframe__().metadata == df2.__dataframe__().metadata
-
-    assert df['x'].tolist() == df2['x'].tolist()
-    assert not df2['x'].is_masked
-    assert df2.__dataframe__().get_column_by_name('x').null_count == 2
-    assert df['x'].dtype == df2['x'].dtype
-
-    assert df['y'].tolist() == df2['y'].tolist()
-    assert not df2['y'].is_masked
-    assert df2.__dataframe__().get_column_by_name('y').null_count == 3
-    assert df['y'].dtype == df2['y'].dtype
-
-    assert df['z'].tolist() == df2['z'].tolist()
-    assert not df2['z'].is_masked
-    assert df2.__dataframe__().get_column_by_name('z').null_count == 2
-    assert df['z'].dtype == df2['z'].dtype
\ No newline at end of file
+    assert df2.r.tolist() == df.r.tolist()