Commit 5f2006b

tests

quadrismegistus committed Aug 20, 2024
1 parent 4921eaf commit 5f2006b

Showing 7 changed files with 296 additions and 174 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -4,9 +4,11 @@ on:
push:
branches:
- '*'
- '!main'
pull_request:
branches:
- '*'
- '!main'

jobs:
test-and-coverage:
2 changes: 1 addition & 1 deletion hashstash/engines/base.py
@@ -387,7 +387,7 @@ def __hash__(self):


# @fcache
@log.info
# @log.info
def HashStash(
name: str = None,
engine: ENGINE_TYPES = None,
237 changes: 82 additions & 155 deletions hashstash/profilers/engine_profiler.py
@@ -2,6 +2,11 @@

RAW_SIZE_KEY = "Raw Size (MB)"

def time_function(func, *args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
return result, end_time - start_time

class HashStashProfiler:
def __init__(self, stash):
@@ -11,64 +16,72 @@ def __init__(self, stash):
def profile(
self,
size: list = PROFILE_SIZES,
iterations: list = DEFAULT_ITERATIONS,
iterations: int = DEFAULT_ITERATIONS,
num_proc: int = DEFAULT_NUM_PROC,
verbose: bool = False,
progress: bool = True
):
tasks = [{"size": random.choice(size)} for _ in range(iterations)]
results = pmap(
self.profile_stash_transaction,
objects=self.stash,
options=tasks,
num_proc=num_proc,
ordered=False,
progress=progress,
desc=f'Profiling {self.stash}'
)
return pd.DataFrame([{'Iteration':i, **r} for i,result in enumerate(results) if result for r in result])

def profile_stash_transaction(self, stash, size: int = DEFAULT_DATA_SIZE):
data = generate_data(size)
raw_size = bytesize(data)
cache_key = f"test_data_{size}_{random.random()}"

results = []
common_data = {
"Engine": stash.engine,
"Compress": stash.compress,
"Base64": stash.b64,
"Size (MB)": int(size) / 1024 / 1024,
"Raw Size (MB)": raw_size / 1024 / 1024,
}

def stream_results(stash):
write_num = 0
write_time = 0
write_size = 0
timenow = time.time()

iterr = pmap(
profile_stash_transaction,
objects=stash,
options=tasks,
num_proc=num_proc,
ordered=False,
progress=progress,
desc=f'Profiling {stash}'
)
for result in iterr:
if not result:
continue

write_num += 1
timenownow = time.time()
write_time += timenownow - timenow
timenow = timenownow

try:
if isinstance(result, Exception):
raise result
sizenow = result[0][RAW_SIZE_KEY]
write_size += sizenow

for d in result:
d["Num Processes"] = num_proc
d["Iteration"] = write_num
d["Cumulative Time (s)"] = write_time
d["Cumulative Size (MB)"] = write_size
yield d
except Exception as e:
print(f"Error processing result: {e}")
print(f"Result type: {type(result)}")
print(f"Result content: {result}")
continue

with self.stash.tmp() as tmpstash:
return pd.DataFrame(stream_results(tmpstash))

@staticmethod
def _profile_one(args):
profiler, task = args
return profiler.profile_one(**task)
def add_result(operation, time_taken, additional_data=None):
result = {
**common_data,
"Operation": operation,
"Time (s)": time_taken,
"Rate (it/s)": (1 / time_taken) if time_taken else 0,
"Speed (MB/s)": ((raw_size / time_taken) if time_taken else 0) / 1024 / 1024,
}
if additional_data:
result.update(additional_data)
results.append(result)

operations = [
("Encode Key", lambda: stash.encode_key(cache_key)),
("Decode Key", lambda: stash.decode_key(stash.encode_key(cache_key))),
("Encode Value", lambda: stash.encode_value(data)),
("Decode Value", lambda: stash.decode_value(stash.encode_value(data))),
("Write", lambda: stash.set(cache_key, data)),
("Read", lambda: stash.get(cache_key)),
]

for operation, func in operations:
_, time_taken = time_function(func)
add_result(operation, time_taken)

# Add cached size and compression ratio after write operation
encoded_value = stash.encode_value(data)
for d in results:
d.update({
"Cached Size (MB)": len(encoded_value) / 1024 / 1024,
"Compression Ratio (%)": (len(encoded_value) / raw_size * 100) if raw_size else 0,
})

return results


def profile_df(
self,
*args,
@@ -82,14 +95,14 @@ def profile_df(
if operations:
df = df[df.Operation.isin(operations)]
df = pd.concat(
gdf.sort_values("write_num").assign(
gdf.sort_values("Iteration").assign(
**{
"Cumulative Time (s)": gdf["Time (s)"].cumsum(),
"Cumulative Size (MB)": gdf["Size (B)"].cumsum() / 1024 / 1024,
"Cumulative Size (MB)": gdf["Raw Size (MB)"].cumsum(),
}
)
for g, gdf in df.groupby(
[x for x in group_by if not x.startswith("write_num")]
[x for x in group_by if x != "Iteration"]
)
)
if group_by:
@@ -98,105 +111,19 @@
df = df.sort_values(sort_by, ascending=False)
return df

def generate_data(size):
return {
"string": "".join(
random.choices("abcdefghijklmnopqrstuvwxyz", k=size // 2)
),
"number": random.randint(1, 1000000),
"list": [random.randint(1, 1000) for _ in range(size // 20)],
"nested": {
f"key_{i}": {"value": random.random()} for i in range(size // 200)
},
}


def profile_stash_transaction(
stash,
size: int = DEFAULT_DATA_SIZE,
verbose: bool = False,
):
cache = stash
if cache is None:
raise Exception('Profiler must be used as context manager')
data = generate_data(size)
raw_size = len(json.dumps(data).encode())
cache_key = f"test_data_{size}_{random.random()}"

# Encode value to get cached size
encoded_value = cache.encode(data)
cached_size = len(encoded_value)

results = []
common_data = {
"Engine": cache.engine,
"Compress": cache.compress,
"Base64": cache.b64,
"Size (MB)": int(size) / 1024 / 1024,
"Raw Size (MB)": raw_size / 1024 / 1024,
"Cached Size (MB)": cached_size / 1024 / 1024,
"Compression Ratio (%)": (cached_size / raw_size * 100) if raw_size else 0,
}

def add_result(operation, time_taken, additional_data=None):
result = {
**common_data,
"Operation": operation,
"Time (s)": time_taken,
"Rate (it/s)": (1 / time_taken) if time_taken else 0,
"Speed (MB/s)": ((raw_size / time_taken) if time_taken else 0)
/ 1024
/ 1024,
}
if additional_data:
result.update(additional_data)
results.append(result)

# Measure key encoding speed
start_time = time.time()
encoded_key = cache.encode(cache_key)
key_encode_time = time.time() - start_time
add_result("Encode Key", key_encode_time)

# Measure key decoding speed
start_time = time.time()
_ = cache.decode_key(encoded_key)
key_decode_time = time.time() - start_time
add_result("Decode Key", key_decode_time)

# Measure value encoding speed
start_time = time.time()
encoded_value = cache.encode(data)
value_encode_time = time.time() - start_time
add_result("Encode Value", value_encode_time)

# Measure value decoding speed
start_time = time.time()
_ = cache.decode_value(encoded_value)
value_decode_time = time.time() - start_time
add_result("Decode Value", value_decode_time)

# Measure write speed
start_time = time.time()
cache[cache_key] = data
write_time = time.time() - start_time
add_result("Write", write_time)

# Add compression data
# add_result("Compress", value_encode_time)

# Measure read speed
start_time = time.time()
_ = cache[cache_key]
read_time = time.time() - start_time
add_result("Read", read_time)

# Calculate and add raw write and read times
# raw_write_time = write_time - (key_encode_time + value_encode_time)
# raw_read_time = read_time - (key_decode_time + value_decode_time)
# add_result("Raw Write", raw_write_time)
# add_result("Raw Read", raw_read_time)
def run_engine_profile(iterations=1000, **kwargs):
with temporary_log_level(logging.WARN):
results_df = HashStashProfiler(HashStash()).profile(iterations=iterations, **kwargs)

grouped_results = results_df.groupby(['Engine', 'Compress', 'Base64', 'Operation']).agg({
'Speed (MB/s)': 'median',
'Time (s)': 'sum',
'Raw Size (MB)': 'sum',
'Cached Size (MB)': 'mean',
'Compression Ratio (%)': 'mean',
}).reset_index()

return grouped_results.sort_values(['Engine', 'Compress', 'Base64', 'Speed (MB/s)'], ascending=[True, True, True, False])

if verbose:
print(results)
return results
if __name__ == "__main__":
run_engine_profile()
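
Not part of the commit itself: for orientation, a minimal sketch of how the reworked profiler is driven, using only the HashStashProfiler.profile and run_engine_profile signatures visible in this diff. The import paths are inferred from the file layout shown here, and the argument values are arbitrary examples.

    # Illustrative usage sketch only; argument values are assumptions.
    from hashstash.engines.base import HashStash
    from hashstash.profilers.engine_profiler import HashStashProfiler, run_engine_profile

    raw_df = HashStashProfiler(HashStash()).profile(iterations=50, num_proc=2)  # one row per operation per iteration
    summary_df = run_engine_profile(iterations=50)  # medians/sums grouped by Engine, Compress, Base64, Operation
    print(summary_df.head())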
24 changes: 15 additions & 9 deletions hashstash/profilers/profiler.py
@@ -42,22 +42,22 @@ def generate_pandas_series(max_length: int = 100) -> pd.Series:

def generate_complex_data(size: int) -> Dict[str, Any]:
return {
"nested_structure": generate_data(depth=5),
"nested_structure": generate_data(target_size=size, depth=5),
"dataframe": generate_pandas_dataframe(
max_rows=size // 100, max_cols=size // 1000
),
"numpy_array": generate_numpy_array(max_size=size // 100),
"series": generate_pandas_series(max_length=size // 10),
"large_list": generate_list(depth=2, max_length=size // 10),
"large_dict": generate_dict(depth=2, max_keys=size // 10),
"large_list": generate_list(max_length=size // 10),
"large_dict": generate_dict(max_keys=size // 10),
}



@log.debug
def generate_data(
target_size: int,
data_types=[
data_types: List[str] = [
"primitive",
"list",
"dict",
@@ -67,9 +67,11 @@ def generate_data(
"prosodic_text",
"prosodic_line",
],
) -> Dict[str, Any]:
data = {}

depth: int = 1,
) -> Any:
if depth <= 0:
return generate_primitive()

choice = random.choice(data_types)

if choice == "primitive":
@@ -85,8 +87,12 @@
max_rows=max(1, min(target_size // 100, 1000)),
max_cols=max(1, min(target_size // 1000, 50)),
)
elif choice == "pandas_series":
return generate_pandas_series(max_length=min(target_size, 1000))
else:
return
# For unsupported types, recursively call generate_data with reduced depth
return generate_data(target_size, data_types, depth - 1)


def generate_list(max_length: int = 10) -> List[Any]:
return [generate_primitive() for _ in range(random.randint(0, max_length))]
@@ -95,4 +101,4 @@ def generate_list(max_length: int = 10) -> List[Any]:
def generate_dict(max_keys: int = 10) -> Dict[str, Any]:
return {
f"key_{i}": generate_list(max_length=max_keys) for i in range(random.randint(0, max_keys))
}
}
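
Again for orientation rather than part of the diff: a short sketch of the revised data generator, based only on the signatures shown above. The import path is inferred from the file name and the size values are arbitrary.

    # Illustrative only: generate_data now takes target_size plus an optional depth.
    from hashstash.profilers.profiler import generate_data, generate_complex_data

    obj = generate_data(target_size=10_000)     # random choice among the default data_types
    primitive = generate_data(10_000, depth=0)  # depth <= 0 short-circuits to generate_primitive()
    blob = generate_complex_data(100_000)       # nested structure, DataFrame, array, Series, list, dict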
3 changes: 3 additions & 0 deletions hashstash/serializers/serializer.py
@@ -96,3 +96,6 @@ def serialize_pickle(obj):

def deserialize_pickle(data):
return pickle.loads(data)

def bytesize(obj, serializer='custom'):
return len(serialize(obj, serializer).encode())
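
A similarly illustrative sketch of the new bytesize helper, which measures an object's serialized footprint in bytes; the module path is taken from the file name above and the payload is made up.

    # Illustrative only: bytesize(obj) == len(serialize(obj, 'custom').encode())
    from hashstash.serializers.serializer import bytesize

    payload = {"numbers": list(range(100)), "text": "hello"}  # hypothetical object
    print(bytesize(payload))  # size in bytes of the serialized form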