diff --git a/datasets/heart.csv b/datasets/classification/heart.csv similarity index 100% rename from datasets/heart.csv rename to datasets/classification/heart.csv diff --git a/datasets/iris.csv b/datasets/classification/iris.csv similarity index 100% rename from datasets/iris.csv rename to datasets/classification/iris.csv diff --git a/datasets/auto-mpg.csv b/datasets/regression/auto-mpg.csv similarity index 100% rename from datasets/auto-mpg.csv rename to datasets/regression/auto-mpg.csv diff --git a/src/app.py b/src/app.py index 1fc6609..6400793 100644 --- a/src/app.py +++ b/src/app.py @@ -1,21 +1,31 @@ +import random +from typing import Any +import numpy as np +import sklearn +import sklearn.linear_model +import sklearn.model_selection +import sklearn.neural_network from learning.data import Dataset, TargetType from learning.supervised import LinearRegression, LogisticRegression, MultiLayerPerceptron from learning.ml import MLAlgorithm -from typing import Callable DATASET = "datasets/" REGRESSION = DATASET + "regression/" CLASSIFICATION = DATASET + "classification/" -def auto_mpg() -> tuple[int, MLAlgorithm]: +# ******************** +# Linear Regression +# ******************** + +def auto_mpg() -> tuple[Dataset, MLAlgorithm, Any]: ds = Dataset(REGRESSION + "auto-mpg.csv", "MPG", TargetType.Regression) ds.numbers(["HP"]) ds.handle_na() ds.normalize(excepts=["Cylinders","Year","Origin"]) - return (1000, LinearRegression(ds, learning_rate=0.0001)) + return (ds, LinearRegression(ds, learning_rate=0.0001), sklearn.linear_model.LinearRegression()) -def automobile() -> tuple[int, MLAlgorithm]: +def automobile() -> tuple[Dataset, MLAlgorithm, Any]: ds = Dataset(REGRESSION + "automobile.csv", "symboling", TargetType.Regression) attributes_to_modify = ["fuel-system", "engine-type", "drive-wheels", "body-style", "make", "engine-location", "aspiration", "fuel-type", "num-of-cylinders", "num-of-doors"] @@ -23,41 +33,68 @@ def automobile() -> tuple[int, MLAlgorithm]: ds.numbers(["normalized-losses", "bore", "stroke", "horsepower", "peak-rpm", "price"]) ds.handle_na() ds.normalize(excepts=attributes_to_modify) - return (1000, LinearRegression(ds, learning_rate=0.004)) + return (ds, LinearRegression(ds, learning_rate=0.004), sklearn.linear_model.LinearRegression()) -def power_plant() -> tuple[int, MLAlgorithm]: +def power_plant() -> tuple[Dataset, MLAlgorithm, Any]: ds = Dataset(REGRESSION + "power-plant.csv", "energy-output", TargetType.Regression) ds.normalize() - return (80, LinearRegression(ds, learning_rate=0.1)) + return (ds, LinearRegression(ds, learning_rate=0.1), sklearn.linear_model.LinearRegression()) +# ******************** +# Logistic Regression +# ******************** -def electrical_grid() -> tuple[int, MLAlgorithm]: +def electrical_grid() -> tuple[Dataset, MLAlgorithm, Any]: ds = Dataset(CLASSIFICATION + "electrical_grid.csv", "stabf", TargetType.Classification) ds.factorize(["stabf"]) ds.normalize() - return (1000, LogisticRegression(ds, learning_rate=0.08)) + return (ds, LogisticRegression(ds, learning_rate=100), sklearn.linear_model.LogisticRegression()) -def frogs() -> tuple[int, MLAlgorithm]: +def heart() -> tuple[Dataset, MLAlgorithm, Any]: + ds = Dataset(CLASSIFICATION + "heart.csv", "Disease", TargetType.Classification) + attributes_to_modify = ["Disease", "Sex", "ChestPainType"] + ds.factorize(attributes_to_modify) + ds.normalize(excepts=attributes_to_modify) + return (ds, LogisticRegression(ds, learning_rate=0.001), sklearn.linear_model.LogisticRegression()) + +# ******************** +# MultiLayerPerceptron +# ******************** + +def frogs() -> tuple[Dataset, MLAlgorithm, Any]: ds = Dataset(CLASSIFICATION + "frogs.csv", "Species", TargetType.MultiClassification) ds.remove(["Family", "Genus", "RecordID"]) ds.factorize(["Species"]) - return (1000, MultiLayerPerceptron(ds, [4, 3])) + return (ds, MultiLayerPerceptron(ds, [4, 3]), sklearn.neural_network.MLPClassifier([4, 3], 'relu')) +def iris() -> tuple[Dataset, MLAlgorithm, Any]: + ds = Dataset(CLASSIFICATION + "iris.csv", "Class", TargetType.MultiClassification) + ds.factorize(["Class"]) + ds.normalize() + return (ds, MultiLayerPerceptron(ds, [4, 3]), sklearn.neural_network.MLPClassifier([4, 3], 'relu')) - - -def learn_dataset(function:Callable[..., tuple[int, MLAlgorithm]], epochs:int=10000, verbose=True)-> MLAlgorithm: - skip, ml = function() - ml.learn(epochs, verbose=verbose) - - err_tests = ml.test_loss() - err_valid = ml.validation_loss() - err_learn = ml.learning_loss() - print(f"Loss value: tests={err_tests:1.5f}, valid={err_valid:1.5f}, learn={err_learn:1.5f}") - - ml.plot(skip=skip) - return ml +# ******************** +# Main & random +# ******************** if __name__ == "__main__": - ml = learn_dataset(frogs) - print(ml.accuracy(ml.testset)) + np.set_printoptions(linewidth=np.inf, formatter={'float': '{:>10.5f}'.format}) + rand = random.randint(0, 4294967295) + np.random.seed(rand) + print(f"Using seed: {rand}") + + ds, ml, sk = electrical_grid() + ml.learn(10000, verbose=True) + ml.display_results() + + np.random.seed(rand) + learn, test, valid = ds.get_dataset() + sk.fit(learn.x, learn.y) + print(f"Sklearn : {sk.score(test.x, test.y):0.5f}") + print("========================") + + ml.plot() + +# migliori parametri trovati per electrical_grid +# temp = np.array([-48.28601, 0.00429, 0.07933, 0.02144, -0.04225, 0.36898, 0.24723, 0.36445, 0.21437, 0.29666, 0.22532, 0.38619, 0.24171, -113.65430]) +# ml._set_parameters(temp) diff --git a/src/learning/data.py b/src/learning/data.py index 7fbac19..19528c6 100644 --- a/src/learning/data.py +++ b/src/learning/data.py @@ -73,7 +73,7 @@ class Dataset: self.data = self.data.dropna() return self - def get_dataset(self, test_frac:float=0.15, valid_frac:float=0.15) -> tuple[Data, Data, Data]: + def get_dataset(self, test_frac:float=0.2, valid_frac:float=0.2) -> tuple[Data, Data, Data]: data = self.data.to_numpy() data = np.insert(data, 1, 1, axis=1) # adding bias np.random.shuffle(data) @@ -97,6 +97,43 @@ class Dataset: l.append(Data(ds, target)) return l +class ConfusionMatrix: + matrix:np.ndarray + + def __init__(self, dataset_y: np.ndarray, predictions_y:np.ndarray) -> None: + classes = len(np.unique(dataset_y)) + conf_matrix = np.zeros((classes, classes), dtype=int) + + for actual, prediction in zip(dataset_y, predictions_y): + conf_matrix[int(actual), int(prediction)] += 1 + self.matrix = conf_matrix + + def accuracy_per_class(self) -> np.ndarray: + return np.diag(self.matrix) / np.sum(self.matrix, axis=1) + + def precision_per_class(self) -> np.ndarray: + tp = np.diagonal(self.matrix) + fp = np.sum(self.matrix, axis=0) - tp + return tp / (tp + fp) + + def recall_per_class(self) -> np.ndarray: + tp = np.diagonal(self.matrix) + fn = np.sum(self.matrix, axis=1) - tp + return tp / (tp + fn) + + def f1_score_per_class(self) -> np.ndarray: + prec = self.precision_per_class() + rec = self.recall_per_class() + return 2 * (prec * rec) / (prec + rec) + + def specificity_per_class(self) -> np.ndarray: + total = np.sum(self.matrix) + tp = np.diagonal(self.matrix) + fp = np.sum(self.matrix, axis=0) - tp + fn = np.sum(self.matrix, axis=1) - tp + tn = total - (tp + fp + fn) + return tn / (tn + fp) + if __name__ == "__main__": ds = Dataset("datasets\\classification\\frogs.csv", "Species", TargetType.MultiClassification) ds.remove(["Family", "Genus", "RecordID"]) diff --git a/src/learning/ml.py b/src/learning/ml.py index 97becef..a58878b 100644 --- a/src/learning/ml.py +++ b/src/learning/ml.py @@ -1,25 +1,26 @@ from abc import ABC, abstractmethod from plot import Plot from tqdm import tqdm -from learning.data import Dataset, Data +from learning.data import ConfusionMatrix, Dataset, Data, TargetType import numpy as np class MLAlgorithm(ABC): """ Classe generica per gli algoritmi di Machine Learning """ - - learnset: Data - validset: Data - testset: Data + _target_type: TargetType + _learnset: Data + _validset: Data + _testset: Data _learn_loss: list[float] _valid_loss: list[float] def __init__(self, dataset:Dataset) -> None: - learn, test, valid = dataset.get_dataset(0.2, 0.2) - self.learnset = learn - self.validset = valid - self.testset = test + learn, test, valid = dataset.get_dataset() + self._target_type = dataset.target_type + self._learnset = learn + self._validset = valid + self._testset = test def learn(self, epochs:int, early_stop:float=0.0000001, max_patience:int=10, verbose:bool=False) -> tuple[int, list, list]: learn = [] @@ -33,15 +34,15 @@ class MLAlgorithm(ABC): for _ in trange: if count > 1 and valid[-2] - valid[-1] < early_stop: if patience >= max_patience: - self.set_parameters(backup) + self._set_parameters(backup) break patience += 1 else: - backup = self.get_parameters() + backup = self._get_parameters() patience = 0 count += 1 - learn.append(self.learning_step()) + learn.append(self._learning_step()) valid.append(self.validation_loss()) if verbose: trange.set_postfix({"learn": f"{learn[-1]:2.5f}", "validation": f"{valid[-1]:2.5f}"}) @@ -53,13 +54,13 @@ class MLAlgorithm(ABC): return (count, learn, valid) def learning_loss(self) -> float: - return self.predict_loss(self.learnset) + return self._predict_loss(self._learnset) def validation_loss(self) -> float: - return self.predict_loss(self.validset) + return self._predict_loss(self._validset) def test_loss(self) -> float: - return self.predict_loss(self.testset) + return self._predict_loss(self._testset) def plot(self, skip:int=1000) -> None: skip = skip if len(self._learn_loss) > skip else 0 @@ -68,29 +69,46 @@ class MLAlgorithm(ABC): plot.line("validation", "red", data=self._valid_loss[skip:]) plot.wait() - def confusion_matrix(self, dataset:Data) -> np.ndarray: - h0 = np.where(self._h0(dataset.x) > 0.5, 1, 0) + def display_results(self) -> None: + print("======== RESULT ========") + print(f"Loss learn : {self.learning_loss():0.5f}") + print(f"Loss valid : {self.validation_loss():0.5f}") + print(f"Loss test : {self.test_loss():0.5f}") + if self._target_type == TargetType.Regression: + print(f"R^2 : {self.test_r_squared():0.5f}") + else: + conf = self.test_confusion_matrix() + print(f"Accuracy : {conf.accuracy_per_class()}") + print(f"Precision : {conf.precision_per_class()}") + print(f"Recall : {conf.recall_per_class()}") + print(f"F1 score : {conf.f1_score_per_class()}") + print(f"Specificity: {conf.specificity_per_class()}") - classes = len(np.unique(dataset.y)) - conf_matrix = np.zeros((classes, classes), dtype=int) + def test_confusion_matrix(self) -> ConfusionMatrix: + if self._target_type != TargetType.Classification\ + and self._target_type != TargetType.MultiClassification: + return None - for actual, prediction in zip(dataset.y, h0): - conf_matrix[int(actual), int(prediction)] += 1 - return conf_matrix + h0 = np.where(self._h0(self._testset.x) > 0.5, 1, 0) + return ConfusionMatrix(self._testset.y, h0) - def accuracy(self, dataset:Data) -> float: - conf = self.confusion_matrix(dataset) - correct = np.sum(np.diagonal(conf)) - total = np.sum(conf) - return correct / total + def test_r_squared(self) -> float: + if self._target_type != TargetType.Regression: + return 0 + + h0 = self._h0(self._testset.x) + y_mean = np.mean(self._testset.y) + ss_total = np.sum((self._testset.y - y_mean) ** 2) + ss_resid = np.sum((self._testset.y - h0) ** 2) + return 1 - (ss_resid / ss_total) @abstractmethod def _h0(self, x:np.ndarray) -> np.ndarray: pass @abstractmethod - def learning_step(self) -> float: pass + def _learning_step(self) -> float: pass @abstractmethod - def predict_loss(self, dataset:Data) -> float: pass + def _predict_loss(self, dataset:Data) -> float: pass @abstractmethod - def get_parameters(self): pass + def _get_parameters(self): pass @abstractmethod - def set_parameters(self, parameters): pass + def _set_parameters(self, parameters): pass diff --git a/src/learning/supervised.py b/src/learning/supervised.py index ec4fcd4..4bc01e4 100644 --- a/src/learning/supervised.py +++ b/src/learning/supervised.py @@ -12,12 +12,12 @@ class GradientDescent(MLAlgorithm): def __init__(self, dataset:Dataset, learning_rate:float=0.1, regularization:float=0.01) -> None: super().__init__(dataset) - self.theta = np.random.rand(self.learnset.param) + self.theta = np.random.rand(self._learnset.param) self.alpha = max(0, learning_rate) self.lambd = max(0, regularization) - def learning_step(self) -> float: - x, y, m, _ = self.learnset.as_tuple() + def _learning_step(self) -> float: + x, y, m, _ = self._learnset.as_tuple() regularization = (self.lambd / m) * self.theta regularization[0] = 0 @@ -25,13 +25,13 @@ class GradientDescent(MLAlgorithm): self.theta -= derivative + regularization return self._loss(x, y, m) - def predict_loss(self, dataset:Data) -> float: + def _predict_loss(self, dataset:Data) -> float: return self._loss(dataset.x, dataset.y, dataset.size) - def get_parameters(self): + def _get_parameters(self): return self.theta.copy() - def set_parameters(self, parameters): + def _set_parameters(self, parameters): self.theta = parameters @abstractmethod @@ -51,8 +51,9 @@ class LogisticRegression(GradientDescent): return 1 / (1 + np.exp(-self.theta.dot(x.T))) def _loss(self, x:np.ndarray, y:np.ndarray, m:int) -> float: + not_zero = 1e-15 h0 = self._h0(x) - diff = -y*np.log(h0) -(1-y)*np.log(1-h0) + diff = - y*np.log(h0 + not_zero) - (1-y)*np.log(1-h0 + not_zero) return 1/m * np.sum(diff) class MultiLayerPerceptron(MLAlgorithm): @@ -61,8 +62,8 @@ class MultiLayerPerceptron(MLAlgorithm): def __init__(self, dataset:Dataset, layers:list[int]) -> None: super().__init__(dataset) - input = self.learnset.x.shape[1] - output = self.learnset.y.shape[1] + input = self._learnset.x.shape[1] + output = self._learnset.y.shape[1] if type(layers) is not list[int]: layers = [4, 3, output] @@ -93,20 +94,20 @@ class MultiLayerPerceptron(MLAlgorithm): input = input.T / total_sum return input.T - def learning_step(self) -> float: + def _learning_step(self) -> float: raise NotImplemented - def predict_loss(self, dataset:Data) -> float: + def _predict_loss(self, dataset:Data) -> float: diff = self._h0(dataset.x) - dataset.y return 1/(2*dataset.size) * np.sum(diff ** 2) - def get_parameters(self): + def _get_parameters(self): parameters = [] for x in self.layers: parameters.append(x.copy()) return parameters - def set_parameters(self, parameters): + def _set_parameters(self, parameters): self.layers = parameters