Add missing value handling
AlenkaF committed Aug 31, 2021
1 parent 143f96f commit 3389cd4
Showing 2 changed files with 124 additions and 56 deletions.
packages/vaex-core/vaex/dataframe_protocol.py — 68 additions, 49 deletions
@@ -65,19 +65,29 @@ class _DtypeKind(enum.IntEnum):
     DATETIME = 22
     CATEGORICAL = 23

-def convert_column_to_ndarray(col : ColumnObject) -> np.ndarray:
+def convert_column_to_ndarray(col : ColumnObject) -> pa.Array:
     """
-    Convert an int, uint, float or bool column to a numpy array
+    Convert an int, uint, float or bool column to an arrow array
     """
     if col.offset != 0:
-        raise NotImplementedError("column.offset > 0 not handled yet")
-
-    if col.describe_null[0] not in (0, 1):
-        raise NotImplementedError("Null values represented as masks or "
+        raise NotImplementedError("column.offset > 0 not handled yet")
+    if col.describe_null[0] not in (0, 1, 3, 4):
+        raise NotImplementedError("Null values represented as"
                                   "sentinel values not handled yet")

     _buffer, _dtype = col.get_data_buffer()
-    return buffer_to_ndarray(_buffer, _dtype)
+    x = buffer_to_ndarray(_buffer, _dtype)
+
+    # If there are any missing data with mask, apply the mask to the data
+    if col.describe_null[0] in (3, 4) and col.null_count>0:
+        mask_buffer, mask_dtype = col.get_mask()
+        mask = buffer_to_ndarray(mask_buffer, mask_dtype)
+        x = pa.array(x, mask=mask)
+    else:
+        x = pa.array(x)
+
+    return x

 def buffer_to_ndarray(_buffer, _dtype) -> np.ndarray:
     # Handle the dtype
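For readers following the diff: the consumer now materializes the column as a pyarrow array and, when the producer describes nulls via a mask, applies it through pa.array's mask argument (True marks a missing value). A minimal standalone sketch of that idea, with illustrative data rather than vaex's own buffers:

    import numpy as np
    import pyarrow as pa

    data = np.array([1.5, 2.5, 3.5, 4.5])
    mask = np.array([False, True, False, True])  # True marks a missing value

    arr = pa.array(data, mask=mask)
    print(arr.to_pylist())   # [1.5, None, 3.5, None]
    print(arr.null_count)    # 2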
@@ -105,25 +115,32 @@ def buffer_to_ndarray(_buffer, _dtype) -> np.ndarray:

     return x

-def convert_categorical_column(col : ColumnObject) -> Tuple[np.ndarray, np.ndarray]:
+def convert_categorical_column(col : ColumnObject) -> pa.DictionaryArray:
     """
-    Convert a categorical column to a numpy array of codes, values and categories/labels
+    Convert a categorical column to an arrow dictionary
     """
     ordered, is_dict, mapping = col.describe_categorical
     if not is_dict:
         raise NotImplementedError('Non-dictionary categoricals not supported yet')

     categories = np.asarray(list(mapping.values()))
     codes_buffer, codes_dtype = col.get_data_buffer()
     codes = buffer_to_ndarray(codes_buffer, codes_dtype)

-    if col.describe_null[0] not in (0, 1):
-        raise NotImplementedError("Null values represented as masks or "
-                                  "sentinel values not handled yet")
-
+    if col.describe_null[0] == 2:  # sentinel value
+        codes = pd.Series(codes)  # TODO: can we do without Pandas?
+        sentinel = col.describe_null[1]
+        codes[codes == sentinel] = None
+
     indices = pa.array(codes)
     dictionary = pa.array(categories)
-    values = pa.DictionaryArray.from_arrays(indices, dictionary)
+
+    if col.describe_null[0] in (3, 4) and col.null_count>0:  # masked missing values
+        mask_buffer, mask_dtype = col.get_mask()
+        mask = buffer_to_ndarray(mask_buffer, mask_dtype)
+        values = pa.DictionaryArray.from_arrays((pa.array(codes, mask=mask)), dictionary)
+    else:
+        values = pa.DictionaryArray.from_arrays(indices, dictionary)

     return values

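The categorical path follows the same pattern through a dictionary array: missing codes are nulled with the mask, and the dictionary lookup then yields nulls. A small sketch with illustrative values:

    import numpy as np
    import pyarrow as pa

    codes = np.array([0, 1, 2, 0, 1])
    mask = np.array([False, True, True, False, False])  # True marks a missing code
    dictionary = pa.array(['aap', 'noot', 'mies'])

    values = pa.DictionaryArray.from_arrays(pa.array(codes, mask=mask), dictionary)
    print(values.to_pylist())   # ['aap', None, None, 'aap', 'noot']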
@@ -340,20 +357,9 @@ def describe_null(self) -> Tuple[int, Any]:
         _k = _DtypeKind
         kind = self.dtype[0]
         value = None
-        if kind == _k.FLOAT:
-            null = 1  # np.nan
-        elif kind == _k.DATETIME:
-            null = 1  # np.datetime64('NaT')
-        elif kind in (_k.INT, _k.UINT, _k.BOOL):
-            # TODO: check if extension dtypes are used once support for them is
-            # implemented in this procotol code
-            null = 0  # integer and boolean dtypes are non-nullable
-        elif kind == _k.CATEGORICAL:
-            # Null values for categoricals are stored as `-1` sentinel values
-            # in the category date (e.g., `col.values.codes` is int8 np.ndarray)
-            null = 2
-            value = -1
-        else:
+        if kind in (_k.INT, _k.UINT, _k.FLOAT, _k.BOOL, _k.CATEGORICAL):
+            null = 3
+        else:
             raise NotImplementedError(f'Data type {self.dtype} not yet supported')

         return null, value
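In the dataframe interchange protocol, describe_null returns a (kind, value) pair where kind 0 means non-nullable, 1 NaN/NaT, 2 a sentinel value, 3 a bit mask and 4 a byte mask; with this change vaex reports kind 3 (mask-described) for every supported dtype. A quick way to see it, assuming __dataframe__ is available on a vaex frame as in the tests below:

    import numpy as np
    import vaex

    df = vaex.from_arrays(x=np.ma.array([1, 2, 3], mask=[0, 1, 0], dtype=int))
    col = df.__dataframe__().get_column_by_name('x')
    print(col.describe_null)   # (3, None): missing values are described by a mask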
@@ -363,7 +369,7 @@ def null_count(self) -> int:
         """
         Number of null elements. Should always be known.
         """
-        return {}
+        return self._col.countmissing()

     def num_chunks(self) -> int:
         """
@@ -392,40 +398,53 @@ def get_data_buffer(self) -> Tuple[_VaexBuffer, Any]:  # Any is for self.dtype tuple
         """
         _k = _DtypeKind
         if self.dtype[0] in (_k.INT, _k.UINT, _k.FLOAT, _k.BOOL):
-            buffer = _VaexBuffer(self._col.to_numpy())
+            # If arrow array is boolean .to_numpy changes values for some reason
+            # For that reason data is transferred to numpy through .tolist
+            if self.dtype[0] == _k.BOOL and isinstance(self._col.values, (pa.Array, pa.ChunkedArray)):
+                buffer = _VaexBuffer(np.array(self._col.tolist(), dtype=bool))
+            else:
+                buffer = _VaexBuffer(self._col.to_numpy())
             dtype = self.dtype
-        elif self.dtype[0] == _k.CATEGORICAL:
+        elif self.dtype[0] == _k.CATEGORICAL:
+            # TODO: Use expression.codes (https://github.com/vaexio/vaex/pull/1503), when merged
             bool_c = False  # If it is external (call from_dataframe) _dtype_from_vaexdtype must give data dtype
             if isinstance(self._col.values, (pa.DictionaryArray)):
-                codes = self._col.values.indices.to_numpy()
-                dtype = self._dtype_from_vaexdtype(self._col.dtype.index_type, bool_c)
+                # If indices from arrow dict are used something funny comes out from the buffer
+                # I have to create a separate Vaex dataframe containing the indices column
+                # and then transfer it through the buffer
+                # TODO: try to optimize this (maybe expressions.codes (#1503) will solve this)
+                name = self._col.expression
+                some_dict = {}
+                some_dict[name] = self._col.evaluate().indices
+                between = vaex.from_arrays(**some_dict)
+                buffer = _VaexBuffer(between[name].to_numpy())
+                dtype = self._dtype_from_vaexdtype(between[name].dtype, bool_c)
             else:
                 codes = self._col.values
-                # if codes are not real codes but values = labels
+                # In case of Vaex categorize
+                # if codes are not real codes but values (= labels)
                 if min(codes)!=0:
                     for i in self._col.values:
                         codes[np.where(codes==i)] = np.where(self.labels == i)
                 dtype = self._dtype_from_vaexdtype(self._col.dtype, bool_c)
-            buffer = _VaexBuffer(codes)
+                buffer = _VaexBuffer(codes)
         else:
             raise NotImplementedError(f"Data type {self._col.dtype} not handled yet")

         return buffer, dtype

-    def get_mask(self) -> _VaexBuffer:
+    def get_mask(self) -> Tuple[_VaexBuffer, Any]:
         """
-        Return the buffer containing the mask values indicating missing data (not handled yet).
-        Raises RuntimeError if null representation is not a bit or byte mask.
+        Return the buffer containing the mask values indicating missing data.
         """
-        null, value = self.describe_null
-        if null == 0:
-            msg = "This column is non-nullable so does not have a mask"
-        elif null == 1:
-            msg = "This column uses NaN as null so does not have a separate mask"
+        mask = self._col.ismissing()
+        if isinstance(self._col.values, (pa.Array, pa.ChunkedArray)):
+            data = np.array(mask.tolist())
         else:
-            raise NotImplementedError('See self.describe_null')
-
-        raise RuntimeError(msg)
+            data = mask.to_numpy()
+        buffer = _VaexBuffer(data)
+        dtype = self._dtype_from_vaexdtype(mask.dtype, False)
+
+        return buffer, dtype

 class _VaexDataFrame:
     """
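The mask buffer itself now comes from the expression's ismissing() result instead of raising, so a consumer can always fetch data plus mask for mask-described columns. Roughly, on the vaex side (a sketch assuming a masked numpy-backed column):

    import numpy as np
    import vaex

    df = vaex.from_arrays(x=np.ma.array([1, 2, 3, 4], mask=[0, 0, 1, 1], dtype=int))
    mask = df.x.ismissing()    # expression that is True where a value is missing
    print(mask.to_numpy())     # [False False  True  True]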
tests/dataframe_protocol_test.py — 56 additions, 7 deletions
@@ -10,22 +10,29 @@ def test_float_only(df_factory):
     df2 = _from_dataframe_to_vaex(df.__dataframe__())
     assert df2.x.tolist() == df.x.tolist()
     assert df2.y.tolist() == df.y.tolist()
+    assert df2.__dataframe__().get_column_by_name('x').null_count == 0
+    assert df2.__dataframe__().get_column_by_name('y').null_count == 0

 def test_mixed_intfloat(df_factory):
     df = df_factory(x=[1, 2, 0], y=[9.2, 10.5, 11.8])
     df2 = _from_dataframe_to_vaex(df.__dataframe__())
     assert df2.x.tolist() == df.x.tolist()
     assert df2.y.tolist() == df.y.tolist()
+    assert df2.__dataframe__().get_column_by_name('x').null_count == 0
+    assert df2.__dataframe__().get_column_by_name('y').null_count == 0

-def test_mixed_intfloatbool():
-    df = vaex.from_arrays(
+def test_mixed_intfloatbool(df_factory):
+    df = df_factory(
         x=np.array([True, True, False]),
         y=np.array([1, 2, 0]),
         z=np.array([9.2, 10.5, 11.8]))
     df2 = _from_dataframe_to_vaex(df.__dataframe__())
     assert df2.x.tolist() == df.x.tolist()
     assert df2.y.tolist() == df.y.tolist()
     assert df2.z.tolist() == df.z.tolist()
+    assert df2.__dataframe__().get_column_by_name('x').null_count == 0
+    assert df2.__dataframe__().get_column_by_name('y').null_count == 0
+    assert df2.__dataframe__().get_column_by_name('z').null_count == 0

 def test_categorical():
     df = vaex.from_arrays(year=[2012, 2015, 2019], weekday=[0, 4, 6])
@@ -38,11 +45,11 @@ def test_categorical():
     assert col.describe_categorical == (False, True, {0: 'Mon', 1: 'Tue', 2: 'Wed', 3: 'Thu', 4: 'Fri', 5: 'Sat', 6: 'Sun'})

     df2 = _from_dataframe_to_vaex(df.__dataframe__())
-    assert df2['year'].tolist() == df['year'].tolist()
-    assert df2['weekday'].tolist() == df['weekday'].tolist()
+    assert df2['year'].tolist() == [2012, 2015, 2019]
+    assert df2['weekday'].tolist() == ['Mon', 'Fri', 'Sun']

-def test_virtual_column():
-    df = vaex.from_arrays(
+def test_virtual_column(df_factory):
+    df = df_factory(
         x=np.array([True, True, False]),
         y=np.array([1, 2, 0]),
         z=np.array([9.2, 10.5, 11.8]))
Expand All @@ -65,4 +72,46 @@ def test_arrow_dictionary():
assert col.describe_categorical == (False, True, {0: 'foo', 1: 'bar', 2: 'baz'})

df2 = _from_dataframe_to_vaex(df.__dataframe__())
assert df2.x.tolist() == df.x.tolist()
assert df2.x.tolist() == df.x.tolist()
assert df2.__dataframe__().get_column_by_name('x').null_count == 0

def test_arrow_dictionary_missing():
indices = pa.array([0, 1, 2, 0, 1], mask=np.array([0, 1, 1, 0, 0], dtype=bool))
dictionary = pa.array(['aap', 'noot', 'mies'])
dict_array = pa.DictionaryArray.from_arrays(indices, dictionary)
df = vaex.from_arrays(x = dict_array)

# Some detailed testing for correctness of dtype and null handling:
col = df.__dataframe__().get_column_by_name('x')
assert col.dtype[0] == _DtypeKind.CATEGORICAL
assert col.describe_categorical == (False, True, {0: 'aap', 1: 'noot', 2: 'mies'})

df2 = _from_dataframe_to_vaex(df.__dataframe__())
assert df2.x.tolist() == df.x.tolist()
assert df2.__dataframe__().get_column_by_name('x').null_count == 2
assert df['x'].dtype.index_type == df2['x'].dtype.index_type

def test_missing_from_masked(df_factory):
df = df_factory(
x=np.ma.array([1, 2, 3, 4, 0], mask=[0, 0, 0, 1, 1], dtype=int),
y=np.ma.array([1.5, 2.5, 3.5, 4.5, 0], mask=[False, True, True, True, False], dtype=float),
z=np.ma.array([True, False, True, True, True], mask=[1, 0, 0, 1, 0], dtype=bool))

df2 = _from_dataframe_to_vaex(df.__dataframe__())

assert df.__dataframe__().metadata == df2.__dataframe__().metadata

assert df['x'].tolist() == df2['x'].tolist()
assert not df2['x'].is_masked
assert df2.__dataframe__().get_column_by_name('x').null_count == 2
assert df['x'].dtype == df2['x'].dtype

assert df['y'].tolist() == df2['y'].tolist()
assert not df2['y'].is_masked
assert df2.__dataframe__().get_column_by_name('y').null_count == 3
assert df['y'].dtype == df2['y'].dtype

assert df['z'].tolist() == df2['z'].tolist()
assert not df2['z'].is_masked
assert df2.__dataframe__().get_column_by_name('z').null_count == 2
assert df['z'].dtype == df2['z'].dtype
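Taken together, the round trip exercised by these tests looks roughly like this from user code (assuming _from_dataframe_to_vaex is importable from vaex.dataframe_protocol, as the test module does, and that masked values surface as None in tolist):

    import numpy as np
    import vaex
    from vaex.dataframe_protocol import _from_dataframe_to_vaex

    df = vaex.from_arrays(x=np.ma.array([1, 2, 3, 4, 0], mask=[0, 0, 0, 1, 1], dtype=int))
    col = df.__dataframe__().get_column_by_name('x')
    print(col.describe_null)   # (3, None)
    print(col.null_count)      # 2

    df2 = _from_dataframe_to_vaex(df.__dataframe__())
    print(df2.x.tolist())      # expected: [1, 2, 3, None, None], now arrow-backed with nulls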
