Skip to content

Commit

Permalink
minimum working example
Browse files Browse the repository at this point in the history
  • Loading branch information
simplymathematics committed Feb 7, 2024
1 parent c2dbfe7 commit f1c8c00
Showing 1 changed file with 99 additions and 6 deletions.
105 changes: 99 additions & 6 deletions examples/gzip/gzip_classifier.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
#!/usr/bin/env python
"""
This is a module to be used as a reference for building other modules
This is a module that implements a gzip classifier. You can test it by running the following command:
python -m gzip_classifier --compressor gzip --k 3 --m 100 --method random --distance_matrix None --dataset 20newsgroups
"""
# These lines will be used to setup a virtual environment inside the current working directory in a folder called env
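# (the venv module ships with Python 3, so no pip install is needed; on Debian/Ubuntu install the python3-venv system package if it is missing)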
# python3 -m venv env
# source env/bin/activate
# run `deactivate` to exit the virtual environment
# These lines will be used to install the dependencies needed for this file
# python -m pip install numpy scikit-learn tqdm scikit-learn-extra pandas

import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
Expand All @@ -9,6 +18,20 @@
import gzip
from tqdm import tqdm
from pathlib import Path
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_20newsgroups
from sklearn.preprocessing import LabelEncoder
import time
import gzip
from tqdm import tqdm
import argparse
import pandas as pd


import logging

Expand Down Expand Up @@ -84,12 +107,16 @@ def __init__(self, k=3, m=-1, compressor="gzip", method="random", distance_matri
ValueError: If distance_matrix is not a path to a numpy file or a numpy array.
"""
logger.info(f"Initializing GzipClassifier with k={k}, m={m}, compressor={compressor}, method={method}, distance_matrix={distance_matrix}")
self.k = k
self.compressor = compressor
self.m = m
self._set_compressor()
self.method = method
pathExists = Path(distance_matrix).exists()
if distance_matrix is not None: # Added this line because the next fails when it is None
pathExists = Path(distance_matrix).exists()
else: # Added this to handle the case when distance_matrix is None
pathExists = False
isString = isinstance(distance_matrix, str)
if isString and pathExists:
self.distance_matrix = np.load(distance_matrix, allow_pickle=True)['X']
Expand Down Expand Up @@ -125,6 +152,7 @@ def fit(self, X, y):
self.X_ = np.array(X)
self.y_ = np.array(y)
Cxs = []
logger.info(f"Training with {len(self.X_)} samples")
# Convert all strings to gzip compressed strings
for x in tqdm(self.X_, desc="Compressing...", leave=False):
Cx = self._compress(x)
Expand All @@ -133,9 +161,6 @@ def fit(self, X, y):
self.Cx_ = Cxs
if self.m != -1 :
# Calculate a distance matrix



# For each class, find the m-nearest neighbors
indices = self._find_best_training_samples(method = self.method)
self.X_ = self.X_[indices]
Expand Down Expand Up @@ -272,9 +297,77 @@ def predict(self, X):
# https://www.freecodecamp.org/news/python-switch-statement-switch-case-example/
def _set_compressor(self):
    """
    Bind ``self._compress`` to the routine registered under ``self.compressor``.

    The routine is looked up by key in the ``compressors`` table that is
    defined elsewhere in this file.

    Raises:
        NotImplementedError: If ``self.compressor`` is not a key of ``compressors``.
    """
    if self.compressor not in compressors:
        raise NotImplementedError(
            f"Compressing with {self.compressor} not supported."
        )
    # ``compressors`` is indexed, not called: it maps a name to a callable.
    self._compress = compressors[self.compressor]


def test_model(X, y, train_size=100, test_size=100, **kwargs):
    """
    Fit a GzipClassifier on a stratified split of ``(X, y)`` and report its score.

    Args:
        X: Samples to be split into training and test subsets.
        y: Labels aligned with ``X``; also used to stratify the split.
        train_size: Forwarded to ``train_test_split`` as ``train_size``.
        test_size: Forwarded to ``train_test_split`` as ``test_size``.
        **kwargs: Forwarded unchanged to the ``GzipClassifier`` constructor.

    Returns:
        dict: ``"accuracy"`` (accuracy on the test subset, rounded to three
        decimals), ``"train_time"`` and ``"pred_time"`` (elapsed seconds for
        ``fit`` and ``predict`` respectively).
    """
    # Honor the caller's test_size instead of a hard-coded 100.
    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        train_size=train_size,
        test_size=test_size,
        stratify=y,
        random_state=42,
    )
    model = GzipClassifier(**kwargs)

    # perf_counter is a monotonic clock intended for measuring intervals.
    start = time.perf_counter()
    model.fit(X_train, y_train)
    train_time = time.perf_counter() - start

    start = time.perf_counter()
    predictions = model.predict(X_test)
    pred_time = time.perf_counter() - start

    score = round(accuracy_score(y_test, predictions), 3)
    print(f"Training time: {train_time}")
    print(f"Prediction time: {pred_time}")
    print(f"Accuracy score is: {score}")
    return {
        "accuracy": score,
        "train_time": train_time,
        "pred_time": pred_time,
    }


def _load_dataset(name):
    """
    Load one of the demo datasets and return it as ``(X, y)``.

    Args:
        name: One of ``"20newsgroups"``, ``"kdd_nsl"`` or ``"truthseeker"``.

    Returns:
        tuple: ``(X, y)`` where ``y`` holds integer class labels.

    Raises:
        ValueError: If ``name`` is not one of the supported datasets.
    """
    if name == "20newsgroups":
        X, y = fetch_20newsgroups(
            subset='train',
            categories=["alt.atheism", "talk.religion.misc"],
            shuffle=True,
            random_state=42,
            return_X_y=True,
        )
        # NOTE(review): with return_X_y=True the targets are presumably already
        # integers; the encoder then only guarantees contiguous 0..n-1 labels.
        y = LabelEncoder().fit(y).transform(y)
        return X, y

    if name == "kdd_nsl":
        df = pd.read_csv("https://gist.githubusercontent.com/simplymathematics/8c6c04bd151950d5ea9e62825db97fdd/raw/34e546e4813f154d11d4f13869b9e3481fc3e829/kdd-nsl.csv", header=None)
        target_col = df.shape[1] - 2  # the 2nd to last column is the target
        labels = df[target_col]
        del df[target_col]  # remove the target from the features
        X = np.array(df)
        # Collapse 'normal' vs. every kind of attack into a binary target.
        binary = [0 if entry == "normal" else 1 for entry in labels]
        y = LabelEncoder().fit(binary).transform(binary)
        return X, y

    if name == "truthseeker":
        df = pd.read_csv("https://gist.githubusercontent.com/simplymathematics/8c6c04bd151950d5ea9e62825db97fdd/raw/34e546e4813f154d11d4f13869b9e3481fc3e829/truthseeker.csv")
        y = np.array(df['BotScoreBinary'].astype("int"))
        # Drop the target, the raw score it is derived from, and the statement column.
        X = np.array(df.drop(columns=['BotScoreBinary', 'BotScore', 'statement']))
        return X, y

    raise ValueError(f"Dataset {name} not found")


if __name__ == "__main__":
    # Command-line entry point: parse options, load a dataset, run one train/test cycle.
    parser = argparse.ArgumentParser()
    parser.add_argument("--compressor", type=str, default="gzip")
    parser.add_argument("--k", type=int, default=3)
    parser.add_argument("--m", type=int, default=-1)
    parser.add_argument("--method", type=str, default="random")
    parser.add_argument("--distance_matrix", type=str, default=None)
    parser.add_argument("--dataset", type=str, default="20newsgroups")
    parser.add_argument("--train_size", type=int, default=100)
    parser.add_argument("--test_size", type=int, default=100)
    args = parser.parse_args()
    logging.basicConfig(level=logging.INFO)
    X, y = _load_dataset(args.dataset)
    results = test_model(
        X,
        y,
        train_size=args.train_size,
        test_size=args.test_size,
        k=args.k,
        m=args.m,
        method=args.method,
        distance_matrix=args.distance_matrix,
        compressor=args.compressor,
    )



0 comments on commit f1c8c00

Please sign in to comment.