From 3a4e07afc8c7414e9442301a907c78d3c63cf418 Mon Sep 17 00:00:00 2001 From: Berack96 Date: Thu, 2 May 2024 14:19:23 +0200 Subject: [PATCH] Refactor Dataset - better finalize function - support for one-hot-encoding --- src/app.py | 38 ++++++------- src/learning/data.py | 108 ++++++++++++++++++++++--------------- src/learning/ml.py | 47 +++++++--------- src/learning/supervised.py | 23 ++++---- 4 files changed, 118 insertions(+), 98 deletions(-) diff --git a/src/app.py b/src/app.py index 9ed22c3..9fb1266 100644 --- a/src/app.py +++ b/src/app.py @@ -1,43 +1,43 @@ -from learning.data import Dataset -from learning.supervised import LinearRegression, LogisticRegression, MultiLogisticRegression +from learning.data import Dataset, TargetType +from learning.supervised import LinearRegression, LogisticRegression, MultiLayerPerceptron from learning.ml import MLAlgorithm from typing import Callable def auto_mpg() -> tuple[int, MLAlgorithm]: - ds = Dataset("datasets\\auto-mpg.csv", "MPG") + ds = Dataset("datasets\\auto-mpg.csv", "MPG", TargetType.Regression) - ds.to_numbers(["HP"]) + ds.numbers(["HP"]) ds.handle_na() - ds.regularize(excepts=["Cylinders","Year","Origin"]) - return (1000, LinearRegression(ds.as_ndarray(), learning_rate=0.0001)) + ds.normalize(excepts=["Cylinders","Year","Origin"]) + return (1000, LinearRegression(ds, learning_rate=0.0001)) def automobile() -> tuple[int, MLAlgorithm]: - ds = Dataset("datasets\\regression\\automobile.csv", "symboling") + ds = Dataset("datasets\\regression\\automobile.csv", "symboling", TargetType.Regression) attributes_to_modify = ["fuel-system", "engine-type", "drive-wheels", "body-style", "make", "engine-location", "aspiration", "fuel-type", "num-of-cylinders", "num-of-doors"] ds.factorize(attributes_to_modify) - ds.to_numbers(["normalized-losses", "bore", "stroke", "horsepower", "peak-rpm", "price"]) + ds.numbers(["normalized-losses", "bore", "stroke", "horsepower", "peak-rpm", "price"]) ds.handle_na() - ds.regularize(excepts=attributes_to_modify) - return (1000, LinearRegression(ds.as_ndarray(), learning_rate=0.004)) + ds.normalize(excepts=attributes_to_modify) + return (1000, LinearRegression(ds, learning_rate=0.004)) def power_plant() -> tuple[int, MLAlgorithm]: - ds = Dataset("datasets\\regression\\power-plant.csv", "energy-output") - ds.regularize() - return (80, LinearRegression(ds.as_ndarray(), learning_rate=0.1)) + ds = Dataset("datasets\\regression\\power-plant.csv", "energy-output", TargetType.Regression) + ds.normalize() + return (80, LinearRegression(ds, learning_rate=0.1)) def electrical_grid() -> tuple[int, MLAlgorithm]: - ds = Dataset("datasets\\classification\\electrical_grid.csv", "stabf") + ds = Dataset("datasets\\classification\\electrical_grid.csv", "stabf", TargetType.Classification) ds.factorize(["stabf"]) - ds.regularize() - return (1000, LogisticRegression(ds.as_ndarray(), learning_rate=0.08)) + ds.normalize() + return (1000, LogisticRegression(ds, learning_rate=0.08)) def frogs() -> tuple[int, MLAlgorithm]: - ds = Dataset("datasets\\classification\\frogs.csv", "Species") + ds = Dataset("datasets\\classification\\frogs.csv", "Species", TargetType.MultiClassification) ds.remove(["Family", "Genus", "RecordID"]) ds.factorize(["Species"]) - return (1000, MultiLogisticRegression(ds.as_ndarray(), learning_rate=0.08)) + return (1000, MultiLayerPerceptron(ds, learning_rate=0.08)) @@ -55,5 +55,5 @@ def learn_dataset(function:Callable[..., tuple[int, MLAlgorithm]], epochs:int=10 return ml if __name__ == "__main__": - ml = learn_dataset(electrical_grid) + ml = learn_dataset(automobile) print(ml.accuracy(ml.testset)) diff --git a/src/learning/data.py b/src/learning/data.py index 4a4ed08..7fbac19 100644 --- a/src/learning/data.py +++ b/src/learning/data.py @@ -1,33 +1,53 @@ import pandas as pd import numpy as np +from enum import Enum from typing_extensions import Self +class TargetType(Enum): + Regression = 1 + Classification = 2 + MultiClassification = 3 + NoTarget = 4 + +class Data: + x: np.ndarray + y: np.ndarray + size: int + param: int + + def __init__(self, x:np.ndarray, y:np.ndarray) -> None: + self.x = x + self.y = y + self.size = x.shape[0] + self.param = x.shape[1] + def __str__(self) -> str: + return "X: " + str(self.x) + "\nY: " + str(self.y) + def as_tuple(self) -> tuple[np.ndarray, np.ndarray, int, int]: + return (self.x, self.y, self.size, self.param) + class Dataset: - def __init__(self, csv:str, target:str, classification:bool=None) -> None: - data = pd.read_csv(csv) + data: pd.DataFrame + target: str + target_type: TargetType + + def __init__(self, csv:str, target:str, target_type:TargetType) -> None: + self.original = pd.read_csv(csv) + self.data = self.original + self.target = target + self.target_type = target_type # move target to the start - col_target = data.pop(target) - data.insert(0, target, col_target) - data.insert(1, "Bias", 1.0) - - if classification == None: - classification = (data[target].dtype == object) - - self.original = data - self.data = data - self.target = target - self.classification = classification + col_target = self.data.pop(target) + self.data.insert(0, target, col_target) def remove(self, columns:list[str]) -> Self: for col in columns: self.data.pop(col) return self - def regularize(self, excepts:list[str]=[]) -> Self: + def normalize(self, excepts:list[str]=[]) -> Self: excepts.append(self.target) - excepts.append("Bias") for col in self.data: if col not in excepts: index = self.data.columns.get_loc(col) @@ -42,7 +62,7 @@ class Dataset: data[col] = pd.factorize(data[col])[0] return self - def to_numbers(self, columns:list[str]=[]) -> Self: + def numbers(self, columns:list[str]=[]) -> Self: data = self.data for col in columns: if data[col].dtype == object: @@ -53,34 +73,38 @@ class Dataset: self.data = self.data.dropna() return self - def shuffle(self) -> Self: - self.data = self.data.sample(frac=1) - return self + def get_dataset(self, test_frac:float=0.15, valid_frac:float=0.15) -> tuple[Data, Data, Data]: + data = self.data.to_numpy() + data = np.insert(data, 1, 1, axis=1) # adding bias + np.random.shuffle(data) - def as_ndarray(self) -> np.ndarray: - return self.data.to_numpy() - - def get_index(self, column:str) -> int: - return self.data.columns.get_loc(column) - -class PrincipalComponentAnalisys: - def __init__(self, data:np.ndarray) -> None: - self.data = data - - def reduce(self, total:int=0, threshold:float=1) -> Self: - columns = self.data.shape[1] - if total > columns or total <= 0: - total = columns - if threshold <= 0 or threshold > 1: - threshold = 1 + total = data.shape[0] + valid_cutoff = int(total * valid_frac) + test_cutoff = int(total * test_frac) + valid_cutoff + valid = data[:valid_cutoff] + test = data[valid_cutoff:test_cutoff] + learn = data[test_cutoff:] + l = [] + for ds in [learn, test, valid]: + target = ds[:, 0] if self.target_type != TargetType.NoTarget else None + ds = ds[:, 1:] + if self.target_type == TargetType.MultiClassification: + target = target.astype(int) + uniques = np.unique(target).shape[0] + target = np.eye(uniques)[target] + l.append(Data(ds, target)) + return l if __name__ == "__main__": - df = Dataset("datasets\\regression\\automobile.csv", "symboling") - attributes_to_modify = ["fuel-system", "engine-type", "drive-wheels", "body-style", "make", "engine-location", "aspiration", "fuel-type", "num-of-cylinders", "num-of-doors"] - df.factorize(attributes_to_modify) - df.to_numbers(["normalized-losses", "bore", "stroke", "horsepower", "peak-rpm", "price"]) - df.handle_na() - df.regularize(excepts=attributes_to_modify) - print(df.data.dtypes) + ds = Dataset("datasets\\classification\\frogs.csv", "Species", TargetType.MultiClassification) + ds.remove(["Family", "Genus", "RecordID"]) + ds.factorize(["Species"]) + + np.random.seed(0) + learn, test, valid = ds.get_dataset() + print(learn) + print(test) + print(valid) + diff --git a/src/learning/ml.py b/src/learning/ml.py index d1cf836..97becef 100644 --- a/src/learning/ml.py +++ b/src/learning/ml.py @@ -1,33 +1,25 @@ from abc import ABC, abstractmethod from plot import Plot from tqdm import tqdm +from learning.data import Dataset, Data -import pandas as pd import numpy as np class MLAlgorithm(ABC): """ Classe generica per gli algoritmi di Machine Learning """ - testset: np.ndarray - learnset: np.ndarray + learnset: Data + validset: Data + testset: Data + _learn_loss: list[float] _valid_loss: list[float] - _train_loss: list[float] - def _set_dataset(self, dataset:np.ndarray, split:float=0.2): - splitT = int(dataset.shape[0] * split) - splitV = int(splitT / 2) - - np.random.shuffle(dataset) - self.validset = dataset[:splitV] - self.testset = dataset[splitV:splitT] - self.learnset = dataset[splitT:] - - def _split_data_target(self, dset:np.ndarray) -> tuple[np.ndarray, np.ndarray, int]: - x = np.delete(dset, 0, 1) - y = dset[:, 0] - m = dset.shape[0] - return (x, y, m) + def __init__(self, dataset:Dataset) -> None: + learn, test, valid = dataset.get_dataset(0.2, 0.2) + self.learnset = learn + self.validset = valid + self.testset = test def learn(self, epochs:int, early_stop:float=0.0000001, max_patience:int=10, verbose:bool=False) -> tuple[int, list, list]: learn = [] @@ -56,7 +48,7 @@ class MLAlgorithm(ABC): except KeyboardInterrupt: pass if verbose: print(f"Loop ended after {count} epochs") - self._train_loss = learn + self._learn_loss = learn self._valid_loss = valid return (count, learn, valid) @@ -70,24 +62,23 @@ class MLAlgorithm(ABC): return self.predict_loss(self.testset) def plot(self, skip:int=1000) -> None: - skip = skip if len(self._train_loss) > skip else 0 + skip = skip if len(self._learn_loss) > skip else 0 plot = Plot("Loss", "Time", "Mean Loss") - plot.line("training", "blue", data=self._train_loss[skip:]) + plot.line("training", "blue", data=self._learn_loss[skip:]) plot.line("validation", "red", data=self._valid_loss[skip:]) plot.wait() - def confusion_matrix(self, dataset:np.ndarray) -> np.ndarray: - x, y, _ = self._split_data_target(dataset) - h0 = np.where(self._h0(x) > 0.5, 1, 0) + def confusion_matrix(self, dataset:Data) -> np.ndarray: + h0 = np.where(self._h0(dataset.x) > 0.5, 1, 0) - classes = len(np.unique(y)) + classes = len(np.unique(dataset.y)) conf_matrix = np.zeros((classes, classes), dtype=int) - for actual, prediction in zip(y, h0): + for actual, prediction in zip(dataset.y, h0): conf_matrix[int(actual), int(prediction)] += 1 return conf_matrix - def accuracy(self, dataset:np.ndarray) -> np.ndarray: + def accuracy(self, dataset:Data) -> float: conf = self.confusion_matrix(dataset) correct = np.sum(np.diagonal(conf)) total = np.sum(conf) @@ -98,7 +89,7 @@ class MLAlgorithm(ABC): @abstractmethod def learning_step(self) -> float: pass @abstractmethod - def predict_loss(self, dataset:np.ndarray) -> float: pass + def predict_loss(self, dataset:Data) -> float: pass @abstractmethod def get_parameters(self): pass @abstractmethod diff --git a/src/learning/supervised.py b/src/learning/supervised.py index d8cd4df..f5a2006 100644 --- a/src/learning/supervised.py +++ b/src/learning/supervised.py @@ -3,27 +3,25 @@ import numpy as np from abc import abstractmethod from learning.ml import MLAlgorithm +from learning.data import Dataset, Data class GradientDescent(MLAlgorithm): theta:np.ndarray alpha:float - def __init__(self, dataset:np.ndarray, learning_rate:float=0.1) -> None: - self._set_dataset(dataset) - - parameters = dataset.shape[1] - 1 #removing the result - self.theta = np.random.rand(parameters) + def __init__(self, dataset:Dataset, learning_rate:float=0.1) -> None: + self.__init__(dataset) + self.theta = np.random.rand(self.learnset.param) self.alpha = max(0, learning_rate) def learning_step(self) -> float: - x, y, m = self._split_data_target(self.learnset) + x, y, m, _ = self.learnset.as_tuple() self.theta -= self.alpha * (1/m) * np.sum((self._h0(x) - y) * x.T, axis=1) return self._loss(x, y, m) - def predict_loss(self, dataset:np.ndarray) -> float: - x, y, m = self._split_data_target(dataset) - return self._loss(x, y, m) + def predict_loss(self, dataset:Data) -> float: + return self._loss(dataset.x, dataset.y, dataset.size) def get_parameters(self): return self.theta.copy() @@ -51,3 +49,10 @@ class LogisticRegression(GradientDescent): h0 = self._h0(x) diff = -y*np.log(h0) -(1-y)*np.log(1-h0) return 1/m * np.sum(diff) + +class MultiLayerPerceptron(MLAlgorithm): + neurons: list[np.ndarray] + + def __init__(self, dataset:Dataset, layers:list[int]=[4,3]) -> None: + self.__init__(dataset) +