Commit

Added chunk handling and tests
AlenkaF committed Aug 31, 2021
1 parent 3389cd4 commit f74a168
Showing 2 changed files with 144 additions and 79 deletions.
114 changes: 74 additions & 40 deletions packages/vaex-core/vaex/dataframe_protocol.py
@@ -27,34 +27,31 @@ def _from_dataframe_to_vaex(df : DataFrameObject) -> vaex.dataframe.DataFrame:
"""
Note: we need to implement/test support for bit/byte masks, chunk handling, etc.
"""
# Check number of chunks, if there's more than one we need to iterate
# For now it is set to 1
if df.num_chunks() > 1:
raise NotImplementedError

# We need a dict of columns here, with each column being a numpy array.
columns = dict()
labels = dict()
_k = _DtypeKind
for name in df.column_names():
col = df.get_column_by_name(name)

# Warning if variable name is not a string
# protocol-design-requirements No.4
if not isinstance(name, str):
raise NotImplementedError(f"Column names must be string (not {name}).")

if col.dtype[0] in (_k.INT, _k.UINT, _k.FLOAT, _k.BOOL):
# Simple numerical or bool dtype, turn into numpy array
columns[name] = convert_column_to_ndarray(col)
elif col.dtype[0] == _k.CATEGORICAL:
columns[name] = convert_categorical_column(col)
else:
raise NotImplementedError(f"Data type {col.dtype[0]} not handled yet")

dataframe = vaex.from_dict(columns)

return dataframe
# Iterate through the chunks
dataframe = []
for chunk in df.get_chunks():
# We need a dict of columns here, with each column being a numpy array.
columns = dict()
labels = dict()
_k = _DtypeKind
for name in chunk.column_names():
col = chunk.get_column_by_name(name)
# Raise if the column name is not a string
# protocol-design-requirements No.4
if not isinstance(name, str):
raise NotImplementedError(f"Column names must be strings (not {name}).")
if col.dtype[0] in (_k.INT, _k.UINT, _k.FLOAT, _k.BOOL):
# Simple numerical or bool dtype, turn into numpy array
columns[name] = convert_column_to_ndarray(col)
elif col.dtype[0] == _k.CATEGORICAL:
columns[name] = convert_categorical_column(col)
else:
raise NotImplementedError(f"Data type {col.dtype[0]} not handled yet")
dataframe.append(vaex.from_dict(columns))


# Join the per-chunk frames into a single vaex DataFrame
return vaex.concat(dataframe, resolver='strict')
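For context, a minimal sketch of the pattern used above, with toy data (the names here are illustrative, not part of the module): every chunk becomes its own vaex DataFrame and the pieces are then joined with vaex.concat.

import numpy as np
import vaex

# Two per-chunk frames, built independently as in the loop above.
chunk_frames = [
    vaex.from_dict({"x": np.array([1, 2]), "y": np.array([1.5, 2.5])}),
    vaex.from_dict({"x": np.array([3, 4]), "y": np.array([3.5, 4.5])}),
]
# resolver='strict' assumes every piece exposes exactly the same columns.
combined = vaex.concat(chunk_frames, resolver='strict')
assert len(combined) == 4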

class _DtypeKind(enum.IntEnum):
INT = 0
@@ -86,7 +83,6 @@ def convert_column_to_ndarray(col : ColumnObject) -> pa.Array:
x = pa.array(x, mask=mask)
else:
x = pa.array(x)

return x
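A small illustration of the masked branch above: pyarrow treats entries whose mask value is True as nulls.

import numpy as np
import pyarrow as pa

values = np.array([1, 2, 3])
mask = np.array([False, True, False])
arr = pa.array(values, mask=mask)
# arr.to_pylist() == [1, None, 3]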

def buffer_to_ndarray(_buffer, _dtype) -> np.ndarray:
@@ -230,7 +226,7 @@ def size(self) -> int:
"""
Size of the column, in elements.
"""
return self._col.values.size
return self._col.df.count("*")

@property
def offset(self) -> int:
@@ -375,22 +371,42 @@ def num_chunks(self) -> int:
"""
Return the number of chunks the column consists of.
"""
return 1
if isinstance(self._col.values, pa.ChunkedArray):
return self._col.values.num_chunks
else:
return 1

def get_chunks(self, n_chunks : Optional[int] = None) -> Iterable['_PandasColumn']:
def get_chunks(self, metadata, n_chunks : Optional[int] = None) -> Iterable['_VaexColumn']:
"""
Return an iterator yielding the chunks.
See `DataFrame.get_chunks` for details on ``n_chunks``.
"""
return {}
if n_chunks is None:
size = self.size
n_chunks = self.num_chunks()
i = self._col.df.evaluate_iterator(self._col, chunk_size=size//n_chunks)
iterator = []
for i1, i2, chunk in i:
iterator.append(_VaexColumn(self._col[i1:i2], metadata))
return iterator
elif self.num_chunks() == 1:
size = self.size
i = self._col.df.evaluate_iterator(self._col, chunk_size=size//n_chunks)
iterator = []
for i1, i2, chunk in i:
iterator.append(_VaexColumn(self._col[i1:i2], metadata))
return iterator

else:
raise ValueError(f'Column {self._col.expression} is already chunked.')
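A rough sketch of the splitting primitive used above, assuming vaex's evaluate_iterator yields (start, stop, values) windows over an expression:

import numpy as np
import vaex

df = vaex.from_arrays(x=np.arange(6))
for i1, i2, values in df.evaluate_iterator(df.x, chunk_size=3):
    # Expected windows: (0, 3) and (3, 6), each carrying 3 values.
    print(i1, i2, len(values))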

@property
def metadata(self, dict_is_cat) -> Dict[str, Any]:
def metadata(self, metadata) -> Dict[str, Any]:
"""
Store specific metadata of the column.
"""
# Boolean if column is categorical or not
return dict_is_cat
# Metadata about categories
return metadata

def get_data_buffer(self) -> Tuple[_VaexBuffer, Any]: # Any is for self.dtype tuple
"""
@@ -483,7 +499,10 @@ def num_rows(self) -> int:
return len(self._df)

def num_chunks(self) -> int:
return 1
if isinstance(self.get_column(0)._col.values, pa.ChunkedArray):
return self.get_column(0)._col.values.num_chunks
else:
return 1

def column_names(self) -> Iterable[str]:
return self._df.get_column_names()
@@ -506,11 +525,26 @@ def select_columns(self, indices: Sequence[int]) -> '_VaexDataFrame':
def select_columns_by_name(self, names: Sequence[str]) -> '_VaexDataFrame':
if not isinstance(names, collections.Sequence):
raise ValueError("`names` is not a sequence")

return {} # TODO
return self._df[names]

def get_chunks(self, n_chunks : Optional[int] = None) -> Iterable['_VaexDataFrame']:
"""
Return an iterator yielding the chunks.
"""
return {} # TODO
if n_chunks is None:
size = self.num_rows()
n_chunks = self.num_chunks()
i = self._df.evaluate_iterator(self.get_column(0)._col, chunk_size=size//n_chunks)
iterator = []
for i1, i2, chunk in i:
iterator.append(_VaexDataFrame(self._df[i1:i2]))
return iterator
elif self.num_chunks() == 1:
size = self.num_rows()
i = self._df.evaluate_iterator(self.get_column(0)._col, chunk_size=size//n_chunks)
iterator = []
for i1, i2, chunk in i:
iterator.append(_VaexDataFrame(self._df[i1:i2]))
return iterator
else:
raise ValueError("DataFrame is already chunked.")
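A rough sketch of how the DataFrame-level get_chunks above could be exercised, assuming __dataframe__() returns a _VaexDataFrame and the single-chunk path splits the rows evenly:

import numpy as np
import vaex

df = vaex.from_arrays(x=np.arange(10), y=np.arange(10) / 2)
protocol_df = df.__dataframe__()
chunks = protocol_df.get_chunks(n_chunks=2)
# Each chunk is itself a protocol DataFrame covering a slice of the rows.
assert sum(chunk.num_rows() for chunk in chunks) == 10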
109 changes: 70 additions & 39 deletions tests/dataframe_protocol_test.py
@@ -34,31 +34,78 @@ def test_mixed_intfloatbool(df_factory):
assert df2.__dataframe__().get_column_by_name('y').null_count == 0
assert df2.__dataframe__().get_column_by_name('z').null_count == 0

def test_categorical():
df = vaex.from_arrays(year=[2012, 2015, 2019], weekday=[0, 4, 6])
def test_mixed_missing(df_factory_arrow):
df = df_factory_arrow(
x=np.array([True, None, False, None, True]),
y=np.array([None, 2, 0, 1, 2]),
z=np.array([9.2, 10.5, None, 11.8, None]))

df2 = _from_dataframe_to_vaex(df.__dataframe__())

assert df.__dataframe__().metadata == df2.__dataframe__().metadata

assert df['x'].tolist() == df2['x'].tolist()
assert not df2['x'].is_masked
assert df2.__dataframe__().get_column_by_name('x').null_count == 2
assert df['x'].dtype == df2['x'].dtype

assert df['y'].tolist() == df2['y'].tolist()
assert not df2['y'].is_masked
assert df2.__dataframe__().get_column_by_name('y').null_count == 1
assert df['y'].dtype == df2['y'].dtype

assert df['z'].tolist() == df2['z'].tolist()
assert not df2['z'].is_masked
assert df2.__dataframe__().get_column_by_name('z').null_count == 2
assert df['z'].dtype == df2['z'].dtype

def test_missing_from_masked(df_factory_numpy):
df = df_factory_numpy(
x=np.ma.array([1, 2, 3, 4, 0], mask=[0, 0, 0, 1, 1], dtype=int),
y=np.ma.array([1.5, 2.5, 3.5, 4.5, 0], mask=[False, True, True, True, False], dtype=float),
z=np.ma.array([True, False, True, True, True], mask=[1, 0, 0, 1, 0], dtype=bool))

df2 = _from_dataframe_to_vaex(df.__dataframe__())

assert df.__dataframe__().metadata == df2.__dataframe__().metadata

assert df['x'].tolist() == df2['x'].tolist()
assert not df2['x'].is_masked
assert df2.__dataframe__().get_column_by_name('x').null_count == 2
assert df['x'].dtype == df2['x'].dtype

assert df['y'].tolist() == df2['y'].tolist()
assert not df2['y'].is_masked
assert df2.__dataframe__().get_column_by_name('y').null_count == 3
assert df['y'].dtype == df2['y'].dtype

assert df['z'].tolist() == df2['z'].tolist()
assert not df2['z'].is_masked
assert df2.__dataframe__().get_column_by_name('z').null_count == 2
assert df['z'].dtype == df2['z'].dtype

def test_categorical_ordinal():
colors = ['red', 'blue', 'green', 'blue']
ds = vaex.from_arrays(
colors=colors,
year=[2012, 2013, 2015, 2019],
weekday=[0, 1, 4, 6])
df = ds.ordinal_encode('colors', ['red', 'green', 'blue'])
df = df.categorize('year', min_value=2012, max_value=2019)
df = df.categorize('weekday', labels=['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'])

# Some detailed testing for correctness of dtype and null handling:
col = df.__dataframe__().get_column_by_name('weekday')
assert col.dtype[0] == _DtypeKind.CATEGORICAL
assert col.describe_categorical == (False, True, {0: 'Mon', 1: 'Tue', 2: 'Wed', 3: 'Thu', 4: 'Fri', 5: 'Sat', 6: 'Sun'})
col2 = df.__dataframe__().get_column_by_name('colors')
assert col2.dtype[0] == _DtypeKind.CATEGORICAL
assert col2.describe_categorical == (False, True, {0: 'red', 1: 'green', 2: 'blue'})

df2 = _from_dataframe_to_vaex(df.__dataframe__())
assert df2['year'].tolist() == [2012, 2015, 2019]
assert df2['weekday'].tolist() == ['Mon', 'Fri', 'Sun']

def test_virtual_column(df_factory):
df = df_factory(
x=np.array([True, True, False]),
y=np.array([1, 2, 0]),
z=np.array([9.2, 10.5, 11.8]))
df.add_virtual_column("r", "sqrt(y**2 + z**2)")
df2 = _from_dataframe_to_vaex(df.__dataframe__())
assert df2.x.tolist() == df.x.tolist()
assert df2.y.tolist() == df.y.tolist()
assert df2.z.tolist() == df.z.tolist()
assert df2.r.tolist() == df.r.tolist()
assert df2['colors'].tolist() == ['red', 'blue', 'green', 'blue']
assert df2['year'].tolist() == [2012, 2013, 2015, 2019]
assert df2['weekday'].tolist() == ['Mon', 'Tue', 'Fri', 'Sun']

def test_arrow_dictionary():
indices = pa.array([0, 1, 0, 1, 2, 0, 1, 2])
@@ -91,27 +138,11 @@ def test_arrow_dictionary_missing():
assert df2.__dataframe__().get_column_by_name('x').null_count == 2
assert df['x'].dtype.index_type == df2['x'].dtype.index_type

def test_missing_from_masked(df_factory):
df = df_factory(
x=np.ma.array([1, 2, 3, 4, 0], mask=[0, 0, 0, 1, 1], dtype=int),
y=np.ma.array([1.5, 2.5, 3.5, 4.5, 0], mask=[False, True, True, True, False], dtype=float),
z=np.ma.array([True, False, True, True, True], mask=[1, 0, 0, 1, 0], dtype=bool))

def test_virtual_column():
df = vaex.from_arrays(
x=np.array([True, True, False]),
y=np.array([1, 2, 0]),
z=np.array([9.2, 10.5, 11.8]))
df.add_virtual_column("r", "sqrt(y**2 + z**2)")
df2 = _from_dataframe_to_vaex(df.__dataframe__())

assert df.__dataframe__().metadata == df2.__dataframe__().metadata

assert df['x'].tolist() == df2['x'].tolist()
assert not df2['x'].is_masked
assert df2.__dataframe__().get_column_by_name('x').null_count == 2
assert df['x'].dtype == df2['x'].dtype

assert df['y'].tolist() == df2['y'].tolist()
assert not df2['y'].is_masked
assert df2.__dataframe__().get_column_by_name('y').null_count == 3
assert df['y'].dtype == df2['y'].dtype

assert df['z'].tolist() == df2['z'].tolist()
assert not df2['z'].is_masked
assert df2.__dataframe__().get_column_by_name('z').null_count == 2
assert df['z'].dtype == df2['z'].dtype
assert df2.r.tolist() == df.r.tolist()
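A sketch of one more possible test for the chunk handling itself, assuming vaex accepts a pyarrow ChunkedArray column and reusing the module's existing vaex, pa and _from_dataframe_to_vaex imports:

def test_chunked_roundtrip_sketch():
    # Two Arrow chunks back a single column; the converter is expected to
    # walk the chunks and concatenate the per-chunk frames back together.
    x = pa.chunked_array([[1, 2, 3], [4, 5, 6]])
    df = vaex.from_arrays(x=x)
    df2 = _from_dataframe_to_vaex(df.__dataframe__())
    assert df2['x'].tolist() == [1, 2, 3, 4, 5, 6]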
