Skip to content

Commit

Permalink
Had to go back to old method of aligning table with new schema. the n…
Browse files Browse the repository at this point in the history
…ew method involved to_pylist, which is way too expensive. The difference is 6s vs 120s for 100,000 material records.
  • Loading branch information
lllangWV committed Oct 10, 2024
1 parent d849ae8 commit f9bf5f4
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 10 deletions.
21 changes: 13 additions & 8 deletions parquetdb/core/parquetdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,17 @@ def create(self, data:Union[List[dict],dict,pd.DataFrame],
# This was done because this will automatically fill non-existing
# fields in the current table if the update creates new fields
current_table = pq.read_table(current_file)
current_table=pa.Table.from_pylist(current_table.to_pylist(), schema=merged_schema)
current_table=pyarrow_utils.align_table(current_table, merged_schema)

# current_table=pa.Table.from_pylist(current_table.to_pylist(), schema=merged_schema)
# Sometimes records have a nested dictionaries and some do not.
# This ensures all records have the same nested structs
current_table=pyarrow_utils.fill_null_nested_structs_in_table(current_table)
# current_table=pyarrow_utils.fill_null_nested_structs_in_table(current_table)

pq.write_table(current_table, current_file)

incoming_table=pa.Table.from_pylist(incoming_table.to_pylist(), schema=merged_schema)
incoming_table=pyarrow_utils.align_table(incoming_table, merged_schema)
# incoming_table=pa.Table.from_pylist(incoming_table.to_pylist(), schema=merged_schema)

except Exception as e:
logger.exception(f"exception aligning schemas: {e}")
Expand Down Expand Up @@ -272,8 +275,9 @@ def update(self, data: Union[List[dict], dict, pd.DataFrame]):
# This was done because this will automatically fill non-existing
# fields in the current table if the update creates new fields
current_table = pq.read_table(current_file)
current_table=pa.Table.from_pylist(current_table.to_pylist(), schema=merged_schema)
current_table=pyarrow_utils.fill_null_nested_structs_in_table(current_table)
current_table=pyarrow_utils.align_table(current_table, merged_schema)
# current_table=pa.Table.from_pylist(current_table.to_pylist(), schema=merged_schema)
# current_table=pyarrow_utils.fill_null_nested_structs_in_table(current_table)

# The flatten method will flatten out all nested structs, update, then rebuild the nested structs
updated_table=pyarrow_utils.update_table(current_table, incoming_table, flatten_method=True)
Expand Down Expand Up @@ -458,8 +462,9 @@ def update_schema(self, field_dict:dict=None, schema:pa.Schema=None):

try:
current_table=pq.read_table(current_file)
pylist=current_table.to_pylist()
updated_table=pa.Table.from_pylist(pylist, schema=updated_schema)
# pylist=current_table.to_pylist()
# updated_table=pa.Table.from_pylist(pylist, schema=updated_schema)
updated_table=pyarrow_utils.align_table(current_table, updated_schema)
pq.write_table(updated_table, current_file)
except Exception as e:
logger.exception(f"exception processing {current_file}: {e}")
Expand Down Expand Up @@ -1244,6 +1249,6 @@ def _validate_id(self, id_column):
logger.info(f"Validating ids")
current_table=self.read(columns=['id'], output_format='table').combine_chunks()
filtered_table = current_table.filter(~pc.field('id').isin(id_column))
logger.warning(f"The following ids are not in the main table", extra={'ids_do_not_exist': filtered_table['id'].to_pylist()})
logger.warning(f"The following ids are not in the main table", extra={'ids_do_not_exist': filtered_table['id']})
return None

90 changes: 88 additions & 2 deletions parquetdb/utils/pyarrow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def order_fields_in_table(table, new_schema):
new_struct_type = field.type
column_array = order_fields_in_struct(original_column, new_struct_type)
else:
column_array = column_array
column_array = original_column
new_columns.append(column_array)

return pa.Table.from_arrays(new_columns, schema=new_schema)
Expand Down Expand Up @@ -1043,4 +1043,90 @@ def update_schema(current_schema, schema=None, field_dict=None):
if schema:
updated_schema=schema

return updated_schema
return updated_schema


def align_table(current_table: pa.Table, new_schema: pa.Schema) -> pa.Table:
    """
    Align the given table to the new schema, filling in missing top-level
    fields or nested struct fields with null values.

    This avoids the expensive ``Table.to_pylist`` / ``Table.from_pylist``
    round trip previously used for schema alignment.

    Args:
        current_table (pa.Table): The table to align.
        new_schema (pa.Schema): The target schema to align the table to.

    Returns:
        pa.Table: The aligned table, with columns ordered to match ``new_schema``.
    """
    # Add null-filled columns (and null-filled struct fields) for anything
    # present in new_schema but missing from the table.
    current_table = add_new_null_fields_in_table(current_table, new_schema)

    # Reorder columns (and struct fields) to match the target schema exactly.
    current_table = order_fields_in_table(current_table, new_schema)

    return current_table

def add_new_null_fields_in_column(column_array, field, new_type):
    """
    Align a single column with the target type.

    For struct columns, the column's struct type is merged with the matching
    struct definition from ``new_type`` and any missing nested fields are
    filled with nulls. Non-struct columns are returned unchanged.

    Args:
        column_array: The column data (Array or ChunkedArray).
        field: The pa.Field describing this column.
        new_type: The target type (or a struct type containing the field).

    Returns:
        tuple: ``(array, field)`` — the possibly-updated column and its field.
    """
    current_type = column_array.type
    logger.debug(f"Field name: {field.name}")
    logger.debug(f"Column type: {current_type}")
    logger.debug(f"New type: {new_type}")

    # Guard clause: nothing to do for non-struct columns.
    if not pa.types.is_struct(current_type):
        logger.debug('This column is not a struct')
        return column_array, field

    logger.debug('This column is a struct')
    # Replacing empty structs with dummy structs
    target_names = {f.name for f in new_type}
    if field.name in target_names:
        target_struct_type = new_type.field(field.name).type
    else:
        target_struct_type = new_type
    merged_type = merge_structs(target_struct_type, current_type)
    logger.debug(f"New struct type: {merged_type}")
    aligned_array = add_new_null_fields_in_struct(column_array, merged_type)
    aligned_field = pa.field(field.name, aligned_array.type)
    return aligned_array, aligned_field

def add_new_null_fields_in_table(table, new_schema):
    """
    Return a table containing every field of ``new_schema``.

    Fields missing from ``table`` are added as all-null columns; existing
    columns are routed through ``add_new_null_fields_in_column`` so nested
    struct fields missing from the data are also null-filled.

    Args:
        table (pa.Table): The source table.
        new_schema (pa.Schema): The schema whose fields must all be present.

    Returns:
        pa.Table: A new table carrying all fields from ``new_schema``.
    """
    out_fields = []
    out_arrays = []
    existing_names = table.schema.names
    for target_field in new_schema:
        if target_field.name in existing_names:
            source_column = table.column(target_field.name)
            array, out_field = add_new_null_fields_in_column(
                source_column, target_field, target_field.type)
        else:
            # Field absent from the data: materialize an all-null column.
            array = pa.nulls(table.num_rows, type=target_field.type)
            out_field = pa.field(target_field.name, target_field.type)
        out_arrays.append(array)
        out_fields.append(out_field)
    return pa.Table.from_arrays(out_arrays, schema=pa.schema(out_fields))


def add_new_null_fields_in_struct(column_array, new_struct_type):
    """
    Recursively align a struct array to ``new_struct_type``, filling fields
    that are missing from the data with null arrays.

    Args:
        column_array: The array to align (Array or ChunkedArray). A non-struct
            array is returned unchanged.
        new_struct_type (pa.StructType): The target struct type.

    Returns:
        pa.StructArray: The aligned struct array (or the original array when
        it is not a struct type).
    """
    # Combine chunks so we can operate on a single contiguous array.
    if isinstance(column_array, pa.ChunkedArray):
        column_array = column_array.combine_chunks()

    # Detecting if array is a struct type
    original_type = column_array.type
    if not pa.types.is_struct(original_type):
        return column_array

    original_fields_dict = {field.name: i for i, field in enumerate(original_type)}

    new_arrays = []
    for field in new_struct_type:
        if field.name in original_fields_dict:
            logger.debug("Adding values to an existing field")
            field_array = column_array.field(original_fields_dict[field.name])
            # Recurse with the *target* nested type (field.type) so that
            # struct fields newly introduced by the schema merge are also
            # null-filled. Previously this recursed with the field's own
            # current type (field_array.type), which made the recursion a
            # no-op for nested additions.
            new_field_array = add_new_null_fields_in_struct(field_array, field.type)
            new_arrays.append(new_field_array)
        else:
            logger.debug("Adding null values to a previously non-existing field")
            # Field absent from the data: fill with nulls of the target type.
            null_array = pa.nulls(len(column_array), field.type)
            new_arrays.append(null_array)
    return pa.StructArray.from_arrays(new_arrays, fields=new_struct_type)

0 comments on commit f9bf5f4

Please sign in to comment.