From 7409a2b02b578bd3da334eec35011778c4ca4aa6 Mon Sep 17 00:00:00 2001
From: Yasin Tatar
Date: Sun, 5 Mar 2023 12:12:50 +0100
Subject: [PATCH] encode tree metadata and node data in separate dfs

---
 benchmark.py               |  37 +++++++------
 model_parser.py            |   1 +
 slim_trees/lgbm_booster.py | 111 +++++++++++++++++++++++++++++--------
 3 files changed, 109 insertions(+), 40 deletions(-)

diff --git a/benchmark.py b/benchmark.py
index 5cf7771..b0d09da 100644
--- a/benchmark.py
+++ b/benchmark.py
@@ -5,6 +5,7 @@
 from typing import Callable, List
 
 import lightgbm as lgb
+import tabulate
 from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
 
 from examples.utils import generate_dataset
@@ -64,36 +65,36 @@ def benchmark_model(name, train_func, dump_func) -> dict:
     naive_dump_time = benchmark(pickle.dumps, model)
     naive_pickled = pickle.dumps(model)
     naive_pickled_size = len(naive_pickled)
-    naive_load_time = benchmark(pickle.loads, naive_pickled)
+    # naive_load_time = benchmark(pickle.loads, naive_pickled)
 
     our_dump_time = benchmark(dump_func, model, io.BytesIO())
     our_pickled_buf = io.BytesIO()
     dump_func(model, our_pickled_buf)
     our_pickled = our_pickled_buf.getvalue()
     our_pickled_size = len(our_pickled)
-    our_load_time = benchmark(pickle.loads, our_pickled)
+    # our_load_time = benchmark(pickle.loads, our_pickled)
 
     return {
         "name": name,
         "baseline": {
             "size": naive_pickled_size,
             "dump_time": naive_dump_time,
-            "load_time": naive_load_time,
+            # "load_time": naive_load_time,
         },
         "ours": {
             "size": our_pickled_size,
             "dump_time": our_dump_time,
-            "load_time": our_load_time,
+            # "load_time": our_load_time,
         },
         "change": {
             "size": naive_pickled_size / our_pickled_size,
             "dump_time": our_dump_time / naive_dump_time,
-            "load_time": our_load_time / naive_load_time,
+            # "load_time": our_load_time / naive_load_time,
         },
     }
 
 
 def format_size(n_bytes: int) -> str:
-    MiB = 1024**2
+    MiB = 1024 ** 2
     return f"{n_bytes / MiB:.1f} MiB"
@@ -111,9 +112,9 @@ def format_benchmarks_results_table(benchmark_results: List[dict]) -> str:
     |--|--:|--:|--:|
     """
 
-    def format_row(results):
+    def format_row(results, min_width=10):
         def format_cell(base, ours, change):
-            return f"{base} / {ours} / {change}"
+            return f"{base:<5} / {ours} / {change}"
 
         column_data = [
             results["name"],
@@ -127,11 +128,11 @@ def format_cell(base, ours, change):
                 format_time(results["ours"]["dump_time"]),
                 format_change(results["change"]["dump_time"]),
             ),
-            format_cell(
-                format_time(results["baseline"]["load_time"]),
-                format_time(results["ours"]["load_time"]),
-                format_change(results["change"]["load_time"]),
-            ),
+            # format_cell(
+            #     format_time(results["baseline"]["load_time"]),
+            #     format_time(results["ours"]["load_time"]),
+            #     format_change(results["change"]["load_time"]),
+            # ),
         ]
 
         return " | ".join(column_data)
@@ -142,12 +143,12 @@ def format_cell(base, ours, change):
 
 if __name__ == "__main__":
     models_to_benchmark = [
-        ("`RandomForestRegressor`", train_model_sklearn, dump_sklearn),
-        ("`GradientBoostingRegressor`", train_gb_sklearn, dump_sklearn),
-        ("`LGBMRegressor gbdt`", train_gbdt_lgbm, dump_lgbm),
-        ("`LGBMRegressor gbdt large`", train_gbdt_large_lgbm, dump_lgbm),
+        # ("`RandomForestRegressor`", train_model_sklearn, dump_sklearn),
+        # ("`GradientBoostingRegressor`", train_gb_sklearn, dump_sklearn),
+        # ("`LGBMRegressor gbdt`", train_gbdt_lgbm, dump_lgbm),
+        # ("`LGBMRegressor gbdt large`", train_gbdt_large_lgbm, dump_lgbm),
         ("`LGBMRegressor rf`", train_rf_lgbm, dump_lgbm),
     ]
     benchmark_results = [benchmark_model(*args) for args in models_to_benchmark]
     print("Base results / Our results / Change")
-    print(format_benchmarks_results_table(benchmark_results))
+    print(format_benchmarks_results_table(benchmark_results))
\ No newline at end of file

diff --git a/model_parser.py b/model_parser.py
index bd78ff8..3b4d9b0 100644
--- a/model_parser.py
+++ b/model_parser.py
@@ -65,6 +65,7 @@ def get_type(s: str):
     else:
         return str
 
+
 def pyarrow_table_to_bytes(table: pa.Table) -> bytes:
     stream = pa.BufferOutputStream()
     writer = pa.RecordBatchStreamWriter(stream, table.schema)

diff --git a/slim_trees/lgbm_booster.py b/slim_trees/lgbm_booster.py
index 9efa15b..625a007 100644
--- a/slim_trees/lgbm_booster.py
+++ b/slim_trees/lgbm_booster.py
@@ -4,7 +4,9 @@
 import re
 import sys
 from typing import Any, BinaryIO, List, Tuple
-
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
 import numpy as np
 
 from slim_trees.compression_utils import (
@@ -58,7 +60,26 @@ def _decompress_booster_state(compressed_state: dict):
     return state
 
 
-def _compress_booster_handle(model_string: str) -> Tuple[str, List[dict], str]:
+def pyarrow_table_to_bytes(table: pa.Table) -> bytes:
+    # TODO: move to utils
+    stream = pa.BufferOutputStream()
+    writer = pa.RecordBatchStreamWriter(stream, table.schema)
+    writer.write_table(table)
+    writer.close()
+    return stream.getvalue().to_pybytes()
+
+
+def bytes_to_pandas_df(bytes_: bytes) -> pd.DataFrame:
+    """
+    Given a bytes object, create a pandas DataFrame.
+    """
+    stream = pa.BufferReader(bytes_)
+    reader = pa.RecordBatchStreamReader(stream)
+    table = reader.read_all()
+    return table.to_pandas()
+
+
+def _compress_booster_handle(model_string: str) -> Tuple[str, bytes, bytes, str]:
     if not model_string.startswith("tree\nversion=v3"):
         raise ValueError("Only v3 is supported for the booster string format.")
     FRONT_STRING_REGEX = r"(?:\w+(?:=.*)?\n)*\n(?=Tree)"
@@ -82,6 +103,7 @@ def _extract_feature(feature_line):
         raise ValueError("Could not find back string.")
     back_str = back_str_match.group()
     tree_matches = re.findall(TREE_GROUP_REGEX, model_string)
+    nodes: List[dict] = []
     trees: List[dict] = []
     for i, tree_match in enumerate(tree_matches):
         tree_name, features_list = tree_match
@@ -106,30 +128,61 @@ def parse(str_list, dtype):
         assert len(feats_map["is_linear"]) == 1
         assert len(feats_map["shrinkage"]) == 1
 
-        trees.append(
-            {
-                "num_leaves": int(feats_map["num_leaves"][0]),
-                "num_cat": int(feats_map["num_cat"][0]),
-                "split_feature": parse(feats_map["split_feature"], split_feature_dtype),
-                "threshold": compress_half_int_float_array(
-                    parse(feats_map["threshold"], threshold_dtype)
-                ),
-                "decision_type": parse(feats_map["decision_type"], decision_type_dtype),
-                "left_child": parse(feats_map["left_child"], left_child_dtype),
-                "right_child": parse(feats_map["right_child"], right_child_dtype),
-                "leaf_value": parse(feats_map["leaf_value"], leaf_value_dtype),
-                "is_linear": int(feats_map["is_linear"][0]),
-                "shrinkage": float(feats_map["shrinkage"][0]),
-            }
-        )
-    return front_str, trees, back_str
+        """
+        strategy: we have two datastructures: one on tree_level and one on node_level
+        here, this looks like just splitting the features into two dicts
+        but one of them can be "exploded" later (node level) while the tree level is for meta information
+        """
+
+        tree = {
+            "tree_idx": tree_idx,
+            "num_leaves": int(feats_map["num_leaves"][0]),
+            "num_cat": int(feats_map["num_cat"][0]),
+            "last_leaf_value": parse(feats_map["leaf_value"], leaf_value_dtype)[-1],
+            "is_linear": int(feats_map["is_linear"][0]),
+            "is_shrinkage": float(feats_map["shrinkage"][0]),
+        }
+        trees.append(tree)
+
+        node = {
+            "tree_idx": tree_idx,  # TODO: this is new, have to recover this as well
+            "node_idx": list(range(int(feats_map["num_leaves"][0]) - 1)),
+            # all of these attributes have length num_leaves - 1
+            "num_leaves": int(feats_map["num_leaves"][0]),
+            "num_cat": int(feats_map["num_cat"][0]),
+            "split_feature": parse(feats_map["split_feature"], split_feature_dtype),
+            # "threshold": compress_half_int_float_array(
+            #     parse(feats_map["threshold"], threshold_dtype)
+            # ),
+            "threshold": parse(feats_map["threshold"], threshold_dtype),
+            "decision_type": parse(feats_map["decision_type"], decision_type_dtype),
+            "left_child": parse(feats_map["left_child"], left_child_dtype),
+            "right_child": parse(feats_map["right_child"], right_child_dtype),
+            "leaf_value": parse(feats_map["leaf_value"], leaf_value_dtype)[:-1],  # don't get the last value
+        }
+        nodes.append(node)
+
+    trees_df = pd.DataFrame(trees)
+    # create .parquet file in bytes from trees_df
+    trees_df_bytes = pyarrow_table_to_bytes(pa.Table.from_pandas(trees_df))
+
+    # transform nodes_df s.t. each feature is a column
+    nodes_df = pd.DataFrame(nodes)
+    nodes_df = nodes_df.explode(
+        ["node_idx", "split_feature", "threshold", "decision_type", "left_child", "right_child", "leaf_value"]
+    )
+    nodes_df_bytes = pyarrow_table_to_bytes(pa.Table.from_pandas(nodes_df))
+    return front_str, trees_df_bytes, nodes_df_bytes, back_str
 
 
-def _decompress_booster_handle(compressed_state: Tuple[str, List[dict], str]) -> str:
-    front_str, trees, back_str = compressed_state
+
+def _decompress_booster_handle(compressed_state: Tuple[str, bytes, bytes, str]) -> str:
+    front_str, trees_df_bytes, nodes_df_bytes, back_str = compressed_state
     assert type(front_str) == str
-    assert type(trees) == list
+    # assert type(trees) == list
     assert type(back_str) == str
+    trees_df = bytes_to_pandas_df(trees_df_bytes)
+    nodes_df = bytes_to_pandas_df(nodes_df_bytes)
 
     handle = front_str
@@ -175,3 +228,17 @@ def _decompress_booster_handle(compressed_state: Tuple[str, List[dict], str]) ->
         handle += tree_str
     handle += back_str
     return handle
+
+
+def compress_handle_parquet(trees: List[dict]) -> bytes:
+    """
+    Take the list of dictionaries (tree) and create a pyarrow Table.
+    """
+
+    # step 1: turn features into pyarrow arrays
+    # loop over all tree dicts in trees and create one dict per node with all features
+    for tree in trees:
+        pass
+    # step 2: create pyarrow table
+    # step 3: write table to parquet
+    # step 4: return bytes
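
Note (not part of the patch): the sketch below illustrates the round trip the new code relies on, i.e. one list-valued row per tree, exploded into one row per node, serialized through an Arrow IPC stream, and grouped back by tree_idx on load. The helper names and the toy data are made up for illustration; only pandas.DataFrame.explode, pyarrow.Table.from_pandas, and the RecordBatch stream reader/writer already used in the patch are assumed.

import pandas as pd
import pyarrow as pa


def table_to_bytes(table: pa.Table) -> bytes:
    # same pattern as pyarrow_table_to_bytes in the patch: write the table into an
    # in-memory Arrow IPC stream and return the raw bytes
    stream = pa.BufferOutputStream()
    writer = pa.RecordBatchStreamWriter(stream, table.schema)
    writer.write_table(table)
    writer.close()
    return stream.getvalue().to_pybytes()


def bytes_to_df(data: bytes) -> pd.DataFrame:
    # inverse of table_to_bytes, mirroring bytes_to_pandas_df in the patch
    reader = pa.RecordBatchStreamReader(pa.BufferReader(data))
    return reader.read_all().to_pandas()


# toy node-level data: one row per tree, list-valued columns for the per-node attributes
nodes_df = pd.DataFrame(
    [
        {"tree_idx": 0, "node_idx": [0, 1], "split_feature": [3, 7], "threshold": [0.5, 1.5]},
        {"tree_idx": 1, "node_idx": [0], "split_feature": [2], "threshold": [2.5]},
    ]
)
# explode the list columns so every node becomes its own row, as in _compress_booster_handle
exploded = nodes_df.explode(["node_idx", "split_feature", "threshold"])

payload = table_to_bytes(pa.Table.from_pandas(exploded))
restored = bytes_to_df(payload)

# grouping on tree_idx is one way to recover the per-tree arrays on the decompression side
for tree_idx, group in restored.groupby("tree_idx"):
    print(tree_idx, group["split_feature"].to_list())

Keeping tree-level metadata in one small table and the exploded node-level rows in another gives flat, uniformly typed columns, which is what lets Arrow (or Parquet, as hinted by the unused pyarrow.parquet import) encode them compactly.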