- renamed many variables
- improved how results are displayed
- fixed log(0) error with 1e-15
This commit is contained in:
2024-06-11 16:30:59 +02:00
parent 17a7ea9435
commit f800fd1a60
7 changed files with 164 additions and 71 deletions

View File

@@ -1,21 +1,31 @@
import random
from typing import Any
import numpy as np
import sklearn
import sklearn.linear_model
import sklearn.model_selection
import sklearn.neural_network
from learning.data import Dataset, TargetType
from learning.supervised import LinearRegression, LogisticRegression, MultiLayerPerceptron
from learning.ml import MLAlgorithm
from typing import Callable
DATASET = "datasets/"
REGRESSION = DATASET + "regression/"
CLASSIFICATION = DATASET + "classification/"
def auto_mpg() -> tuple[int, MLAlgorithm]:
# ********************
# Linear Regression
# ********************
def auto_mpg() -> tuple[Dataset, MLAlgorithm, Any]:
ds = Dataset(REGRESSION + "auto-mpg.csv", "MPG", TargetType.Regression)
ds.numbers(["HP"])
ds.handle_na()
ds.normalize(excepts=["Cylinders","Year","Origin"])
return (1000, LinearRegression(ds, learning_rate=0.0001))
return (ds, LinearRegression(ds, learning_rate=0.0001), sklearn.linear_model.LinearRegression())
def automobile() -> tuple[int, MLAlgorithm]:
def automobile() -> tuple[Dataset, MLAlgorithm, Any]:
ds = Dataset(REGRESSION + "automobile.csv", "symboling", TargetType.Regression)
attributes_to_modify = ["fuel-system", "engine-type", "drive-wheels", "body-style", "make", "engine-location", "aspiration", "fuel-type", "num-of-cylinders", "num-of-doors"]
@@ -23,41 +33,68 @@ def automobile() -> tuple[int, MLAlgorithm]:
ds.numbers(["normalized-losses", "bore", "stroke", "horsepower", "peak-rpm", "price"])
ds.handle_na()
ds.normalize(excepts=attributes_to_modify)
return (1000, LinearRegression(ds, learning_rate=0.004))
return (ds, LinearRegression(ds, learning_rate=0.004), sklearn.linear_model.LinearRegression())
def power_plant() -> tuple[int, MLAlgorithm]:
def power_plant() -> tuple[Dataset, MLAlgorithm, Any]:
ds = Dataset(REGRESSION + "power-plant.csv", "energy-output", TargetType.Regression)
ds.normalize()
return (80, LinearRegression(ds, learning_rate=0.1))
return (ds, LinearRegression(ds, learning_rate=0.1), sklearn.linear_model.LinearRegression())
# ********************
# Logistic Regression
# ********************
def electrical_grid() -> tuple[int, MLAlgorithm]:
def electrical_grid() -> tuple[Dataset, MLAlgorithm, Any]:
ds = Dataset(CLASSIFICATION + "electrical_grid.csv", "stabf", TargetType.Classification)
ds.factorize(["stabf"])
ds.normalize()
return (1000, LogisticRegression(ds, learning_rate=0.08))
return (ds, LogisticRegression(ds, learning_rate=100), sklearn.linear_model.LogisticRegression())
def frogs() -> tuple[int, MLAlgorithm]:
def heart() -> tuple[Dataset, MLAlgorithm, Any]:
    """Load the heart-disease dataset and build a logistic-regression pair.

    Returns the prepared Dataset, the project's LogisticRegression model
    (learning_rate=0.001) and an untrained sklearn counterpart.
    """
    categorical = ["Disease", "Sex", "ChestPainType"]
    ds = Dataset(CLASSIFICATION + "heart.csv", "Disease", TargetType.Classification)
    ds.factorize(categorical)
    # keep the factorized columns out of normalization
    ds.normalize(excepts=categorical)
    model = LogisticRegression(ds, learning_rate=0.001)
    return (ds, model, sklearn.linear_model.LogisticRegression())
# ********************
# MultiLayerPerceptron
# ********************
def frogs() -> tuple[Dataset, MLAlgorithm, Any]:
    """Load the frogs (anuran calls) dataset and build an MLP pair.

    Returns the prepared Dataset, the project's MultiLayerPerceptron with
    hidden layers [4, 3] and an untrained sklearn MLPClassifier with the
    same topology.
    """
    ds = Dataset(CLASSIFICATION + "frogs.csv", "Species", TargetType.MultiClassification)
    # identifier/taxonomy columns carry no predictive signal for Species
    ds.remove(["Family", "Genus", "RecordID"])
    ds.factorize(["Species"])
    # NOTE: an unreachable duplicate `return (1000, MultiLayerPerceptron(...))`
    # (stale pre-refactor code) was removed; only this return is live.
    return (ds, MultiLayerPerceptron(ds, [4, 3]), sklearn.neural_network.MLPClassifier([4, 3], 'relu'))
def iris() -> tuple[Dataset, MLAlgorithm, Any]:
    """Load the iris dataset and build a multi-layer-perceptron pair.

    Returns the prepared Dataset, the project's MultiLayerPerceptron with
    hidden layers [4, 3] and an untrained sklearn MLPClassifier with the
    same topology.
    """
    ds = Dataset(CLASSIFICATION + "iris.csv", "Class", TargetType.MultiClassification)
    ds.factorize(["Class"])
    ds.normalize()
    topology = [4, 3]
    return (ds, MultiLayerPerceptron(ds, topology), sklearn.neural_network.MLPClassifier(topology, 'relu'))
def learn_dataset(function:Callable[..., tuple[int, MLAlgorithm]], epochs:int=10000, verbose=True)-> MLAlgorithm:
    """Build, train and evaluate the ML algorithm produced by *function*.

    *function* returns a (plot-skip, algorithm) pair; the algorithm is
    trained for at most *epochs* epochs, its three losses are printed,
    its loss curve is plotted, and the trained algorithm is returned.
    """
    skip, algorithm = function()
    algorithm.learn(epochs, verbose=verbose)
    # evaluate on all three splits after training
    err_tests = algorithm.test_loss()
    err_valid = algorithm.validation_loss()
    err_learn = algorithm.learning_loss()
    print(f"Loss value: tests={err_tests:1.5f}, valid={err_valid:1.5f}, learn={err_learn:1.5f}")
    algorithm.plot(skip=skip)
    return algorithm
# ********************
# Main & random
# ********************
if __name__ == "__main__":
ml = learn_dataset(frogs)
print(ml.accuracy(ml.testset))
np.set_printoptions(linewidth=np.inf, formatter={'float': '{:>10.5f}'.format})
rand = random.randint(0, 4294967295)
np.random.seed(rand)
print(f"Using seed: {rand}")
ds, ml, sk = electrical_grid()
ml.learn(10000, verbose=True)
ml.display_results()
np.random.seed(rand)
learn, test, valid = ds.get_dataset()
sk.fit(learn.x, learn.y)
print(f"Sklearn : {sk.score(test.x, test.y):0.5f}")
print("========================")
ml.plot()
# migliori parametri trovati per electrical_grid
# temp = np.array([-48.28601, 0.00429, 0.07933, 0.02144, -0.04225, 0.36898, 0.24723, 0.36445, 0.21437, 0.29666, 0.22532, 0.38619, 0.24171, -113.65430])
# ml._set_parameters(temp)

View File

@@ -73,7 +73,7 @@ class Dataset:
self.data = self.data.dropna()
return self
def get_dataset(self, test_frac:float=0.15, valid_frac:float=0.15) -> tuple[Data, Data, Data]:
def get_dataset(self, test_frac:float=0.2, valid_frac:float=0.2) -> tuple[Data, Data, Data]:
data = self.data.to_numpy()
data = np.insert(data, 1, 1, axis=1) # adding bias
np.random.shuffle(data)
@@ -97,6 +97,43 @@ class Dataset:
l.append(Data(ds, target))
return l
class ConfusionMatrix:
    """Confusion matrix for integer-coded class labels, with per-class metrics.

    Rows index the actual class, columns the predicted class.
    """
    # square (classes x classes) count matrix
    matrix:np.ndarray

    def __init__(self, dataset_y: np.ndarray, predictions_y:np.ndarray) -> None:
        """Count (actual, predicted) pairs into a square matrix.

        The class count is taken from BOTH label arrays: the previous
        version counted only the distinct actual labels, so a predicted
        class absent from ``dataset_y`` raised an IndexError.
        Labels are assumed to be integer codes 0..k-1.
        """
        classes = int(max(np.max(dataset_y, initial=-1),
                          np.max(predictions_y, initial=-1))) + 1
        conf_matrix = np.zeros((classes, classes), dtype=int)
        for actual, prediction in zip(dataset_y, predictions_y):
            conf_matrix[int(actual), int(prediction)] += 1
        self.matrix = conf_matrix

    @staticmethod
    def _safe_divide(num: np.ndarray, den: np.ndarray) -> np.ndarray:
        # A class with no samples used to produce 0/0 -> nan plus a
        # RuntimeWarning; report 0 for such classes instead.
        return np.divide(num, den, out=np.zeros(len(num), dtype=float), where=den != 0)

    def accuracy_per_class(self) -> np.ndarray:
        """Fraction of each actual class predicted correctly (same as recall here)."""
        return self._safe_divide(np.diag(self.matrix), np.sum(self.matrix, axis=1))

    def precision_per_class(self) -> np.ndarray:
        """TP / (TP + FP) per class; 0 for classes never predicted."""
        tp = np.diagonal(self.matrix)
        fp = np.sum(self.matrix, axis=0) - tp
        return self._safe_divide(tp, tp + fp)

    def recall_per_class(self) -> np.ndarray:
        """TP / (TP + FN) per class; 0 for classes with no actual samples."""
        tp = np.diagonal(self.matrix)
        fn = np.sum(self.matrix, axis=1) - tp
        return self._safe_divide(tp, tp + fn)

    def f1_score_per_class(self) -> np.ndarray:
        """Harmonic mean of precision and recall per class; 0 when both are 0."""
        prec = self.precision_per_class()
        rec = self.recall_per_class()
        return self._safe_divide(2 * (prec * rec), prec + rec)

    def specificity_per_class(self) -> np.ndarray:
        """TN / (TN + FP) per class."""
        total = np.sum(self.matrix)
        tp = np.diagonal(self.matrix)
        fp = np.sum(self.matrix, axis=0) - tp
        fn = np.sum(self.matrix, axis=1) - tp
        tn = total - (tp + fp + fn)
        return self._safe_divide(tn, tn + fp)
if __name__ == "__main__":
ds = Dataset("datasets\\classification\\frogs.csv", "Species", TargetType.MultiClassification)
ds.remove(["Family", "Genus", "RecordID"])

View File

@@ -1,25 +1,26 @@
from abc import ABC, abstractmethod
from plot import Plot
from tqdm import tqdm
from learning.data import Dataset, Data
from learning.data import ConfusionMatrix, Dataset, Data, TargetType
import numpy as np
class MLAlgorithm(ABC):
""" Classe generica per gli algoritmi di Machine Learning """
learnset: Data
validset: Data
testset: Data
_target_type: TargetType
_learnset: Data
_validset: Data
_testset: Data
_learn_loss: list[float]
_valid_loss: list[float]
def __init__(self, dataset:Dataset) -> None:
learn, test, valid = dataset.get_dataset(0.2, 0.2)
self.learnset = learn
self.validset = valid
self.testset = test
learn, test, valid = dataset.get_dataset()
self._target_type = dataset.target_type
self._learnset = learn
self._validset = valid
self._testset = test
def learn(self, epochs:int, early_stop:float=0.0000001, max_patience:int=10, verbose:bool=False) -> tuple[int, list, list]:
learn = []
@@ -33,15 +34,15 @@ class MLAlgorithm(ABC):
for _ in trange:
if count > 1 and valid[-2] - valid[-1] < early_stop:
if patience >= max_patience:
self.set_parameters(backup)
self._set_parameters(backup)
break
patience += 1
else:
backup = self.get_parameters()
backup = self._get_parameters()
patience = 0
count += 1
learn.append(self.learning_step())
learn.append(self._learning_step())
valid.append(self.validation_loss())
if verbose: trange.set_postfix({"learn": f"{learn[-1]:2.5f}", "validation": f"{valid[-1]:2.5f}"})
@@ -53,13 +54,13 @@ class MLAlgorithm(ABC):
return (count, learn, valid)
def learning_loss(self) -> float:
return self.predict_loss(self.learnset)
return self._predict_loss(self._learnset)
def validation_loss(self) -> float:
return self.predict_loss(self.validset)
return self._predict_loss(self._validset)
def test_loss(self) -> float:
return self.predict_loss(self.testset)
return self._predict_loss(self._testset)
def plot(self, skip:int=1000) -> None:
skip = skip if len(self._learn_loss) > skip else 0
@@ -68,29 +69,46 @@ class MLAlgorithm(ABC):
plot.line("validation", "red", data=self._valid_loss[skip:])
plot.wait()
def confusion_matrix(self, dataset:Data) -> np.ndarray:
h0 = np.where(self._h0(dataset.x) > 0.5, 1, 0)
def display_results(self) -> None:
print("======== RESULT ========")
print(f"Loss learn : {self.learning_loss():0.5f}")
print(f"Loss valid : {self.validation_loss():0.5f}")
print(f"Loss test : {self.test_loss():0.5f}")
if self._target_type == TargetType.Regression:
print(f"R^2 : {self.test_r_squared():0.5f}")
else:
conf = self.test_confusion_matrix()
print(f"Accuracy : {conf.accuracy_per_class()}")
print(f"Precision : {conf.precision_per_class()}")
print(f"Recall : {conf.recall_per_class()}")
print(f"F1 score : {conf.f1_score_per_class()}")
print(f"Specificity: {conf.specificity_per_class()}")
classes = len(np.unique(dataset.y))
conf_matrix = np.zeros((classes, classes), dtype=int)
def test_confusion_matrix(self) -> ConfusionMatrix:
if self._target_type != TargetType.Classification\
and self._target_type != TargetType.MultiClassification:
return None
for actual, prediction in zip(dataset.y, h0):
conf_matrix[int(actual), int(prediction)] += 1
return conf_matrix
h0 = np.where(self._h0(self._testset.x) > 0.5, 1, 0)
return ConfusionMatrix(self._testset.y, h0)
def accuracy(self, dataset:Data) -> float:
conf = self.confusion_matrix(dataset)
correct = np.sum(np.diagonal(conf))
total = np.sum(conf)
return correct / total
def test_r_squared(self) -> float:
if self._target_type != TargetType.Regression:
return 0
h0 = self._h0(self._testset.x)
y_mean = np.mean(self._testset.y)
ss_total = np.sum((self._testset.y - y_mean) ** 2)
ss_resid = np.sum((self._testset.y - h0) ** 2)
return 1 - (ss_resid / ss_total)
@abstractmethod
def _h0(self, x:np.ndarray) -> np.ndarray: pass
@abstractmethod
def learning_step(self) -> float: pass
def _learning_step(self) -> float: pass
@abstractmethod
def predict_loss(self, dataset:Data) -> float: pass
def _predict_loss(self, dataset:Data) -> float: pass
@abstractmethod
def get_parameters(self): pass
def _get_parameters(self): pass
@abstractmethod
def set_parameters(self, parameters): pass
def _set_parameters(self, parameters): pass

View File

@@ -12,12 +12,12 @@ class GradientDescent(MLAlgorithm):
def __init__(self, dataset:Dataset, learning_rate:float=0.1, regularization:float=0.01) -> None:
super().__init__(dataset)
self.theta = np.random.rand(self.learnset.param)
self.theta = np.random.rand(self._learnset.param)
self.alpha = max(0, learning_rate)
self.lambd = max(0, regularization)
def learning_step(self) -> float:
x, y, m, _ = self.learnset.as_tuple()
def _learning_step(self) -> float:
x, y, m, _ = self._learnset.as_tuple()
regularization = (self.lambd / m) * self.theta
regularization[0] = 0
@@ -25,13 +25,13 @@ class GradientDescent(MLAlgorithm):
self.theta -= derivative + regularization
return self._loss(x, y, m)
def predict_loss(self, dataset:Data) -> float:
def _predict_loss(self, dataset:Data) -> float:
return self._loss(dataset.x, dataset.y, dataset.size)
def get_parameters(self):
def _get_parameters(self):
return self.theta.copy()
def set_parameters(self, parameters):
def _set_parameters(self, parameters):
self.theta = parameters
@abstractmethod
@@ -51,8 +51,9 @@ class LogisticRegression(GradientDescent):
return 1 / (1 + np.exp(-self.theta.dot(x.T)))
def _loss(self, x:np.ndarray, y:np.ndarray, m:int) -> float:
not_zero = 1e-15
h0 = self._h0(x)
diff = -y*np.log(h0) -(1-y)*np.log(1-h0)
diff = - y*np.log(h0 + not_zero) - (1-y)*np.log(1-h0 + not_zero)
return 1/m * np.sum(diff)
class MultiLayerPerceptron(MLAlgorithm):
@@ -61,8 +62,8 @@ class MultiLayerPerceptron(MLAlgorithm):
def __init__(self, dataset:Dataset, layers:list[int]) -> None:
super().__init__(dataset)
input = self.learnset.x.shape[1]
output = self.learnset.y.shape[1]
input = self._learnset.x.shape[1]
output = self._learnset.y.shape[1]
if type(layers) is not list[int]:
layers = [4, 3, output]
@@ -93,20 +94,20 @@ class MultiLayerPerceptron(MLAlgorithm):
input = input.T / total_sum
return input.T
def learning_step(self) -> float:
def _learning_step(self) -> float:
raise NotImplemented
def predict_loss(self, dataset:Data) -> float:
def _predict_loss(self, dataset:Data) -> float:
diff = self._h0(dataset.x) - dataset.y
return 1/(2*dataset.size) * np.sum(diff ** 2)
def get_parameters(self):
def _get_parameters(self):
parameters = []
for x in self.layers:
parameters.append(x.copy())
return parameters
def set_parameters(self, parameters):
def _set_parameters(self, parameters):
self.layers = parameters