- fixed bias
- fixed confusion matrix
- added print of confusion matrix
- fixed dataset incorrectly splitting sharing classes with train and test
- fixed mlp not correctly initializing layers
- better print results
- break if nan in learning
This commit is contained in:
2024-08-21 14:52:25 +02:00
parent 338df0d18b
commit bba07c0b49
5 changed files with 89 additions and 56 deletions

View File

@@ -7,7 +7,7 @@ import sklearn.neural_network
from typing import Any from typing import Any
from learning.ml import MLAlgorithm from learning.ml import MLAlgorithm
from learning.data import Dataset, TargetType from learning.data import ConfusionMatrix, Dataset, TargetType
from learning.supervised import LinearRegression, LogisticRegression, MultiLayerPerceptron from learning.supervised import LinearRegression, LogisticRegression, MultiLayerPerceptron
from learning.unsupervised import KMeans from learning.unsupervised import KMeans
@@ -24,7 +24,7 @@ def auto_mpg() -> tuple[Dataset, MLAlgorithm, Any]:
ds.numbers(["HP"]) ds.numbers(["HP"])
ds.handle_na() ds.handle_na()
ds.normalize(excepts=["Cylinders","Year","Origin"]) ds.standardize(excepts=["Cylinders","Year","Origin"])
return (ds, LinearRegression(ds, learning_rate=0.0001), sklearn.linear_model.SGDRegressor()) return (ds, LinearRegression(ds, learning_rate=0.0001), sklearn.linear_model.SGDRegressor())
def automobile() -> tuple[Dataset, MLAlgorithm, Any]: def automobile() -> tuple[Dataset, MLAlgorithm, Any]:
@@ -34,12 +34,12 @@ def automobile() -> tuple[Dataset, MLAlgorithm, Any]:
ds.factorize(attributes_to_modify) ds.factorize(attributes_to_modify)
ds.numbers(["normalized-losses", "bore", "stroke", "horsepower", "peak-rpm", "price"]) ds.numbers(["normalized-losses", "bore", "stroke", "horsepower", "peak-rpm", "price"])
ds.handle_na() ds.handle_na()
ds.normalize(excepts=attributes_to_modify) ds.standardize(excepts=attributes_to_modify)
return (ds, LinearRegression(ds, learning_rate=0.003), sklearn.linear_model.SGDRegressor()) return (ds, LinearRegression(ds, learning_rate=0.003), sklearn.linear_model.SGDRegressor())
def power_plant() -> tuple[Dataset, MLAlgorithm, Any]: def power_plant() -> tuple[Dataset, MLAlgorithm, Any]:
ds = Dataset(REGRESSION + "power-plant.csv", "energy-output", TargetType.Regression) ds = Dataset(REGRESSION + "power-plant.csv", "energy-output", TargetType.Regression)
ds.normalize(excepts=None) ds.standardize(excepts=None)
return (ds, LinearRegression(ds, learning_rate=0.1), sklearn.linear_model.SGDRegressor()) return (ds, LinearRegression(ds, learning_rate=0.1), sklearn.linear_model.SGDRegressor())
# ******************** # ********************
@@ -49,31 +49,39 @@ def power_plant() -> tuple[Dataset, MLAlgorithm, Any]:
def electrical_grid() -> tuple[Dataset, MLAlgorithm, Any]: def electrical_grid() -> tuple[Dataset, MLAlgorithm, Any]:
ds = Dataset(CLASSIFICATION + "electrical_grid.csv", "stabf", TargetType.Classification) ds = Dataset(CLASSIFICATION + "electrical_grid.csv", "stabf", TargetType.Classification)
ds.factorize(["stabf"]) ds.factorize(["stabf"])
ds.normalize() ds.standardize()
return (ds, LogisticRegression(ds, learning_rate=100), sklearn.linear_model.LogisticRegression()) return (ds, LogisticRegression(ds, learning_rate=100), sklearn.linear_model.LogisticRegression())
def heart() -> tuple[Dataset, MLAlgorithm, Any]: def heart() -> tuple[Dataset, MLAlgorithm, Any]:
ds = Dataset(CLASSIFICATION + "heart.csv", "Disease", TargetType.Classification) ds = Dataset(CLASSIFICATION + "heart.csv", "Disease", TargetType.Classification)
attributes_to_modify = ["Disease", "Sex", "ChestPainType"] attributes_to_modify = ["Disease", "Sex", "ChestPainType"]
ds.factorize(attributes_to_modify) ds.factorize(attributes_to_modify)
ds.normalize(excepts=attributes_to_modify) ds.standardize(excepts=attributes_to_modify)
return (ds, LogisticRegression(ds, learning_rate=0.01), sklearn.linear_model.LogisticRegression()) return (ds, LogisticRegression(ds, learning_rate=0.01), sklearn.linear_model.LogisticRegression())
# ******************** # ********************
# MultiLayerPerceptron # MultiLayerPerceptron
# ******************** # ********************
def electrical_grid_mlp() -> tuple[Dataset, MLAlgorithm, Any]:
ds = Dataset(CLASSIFICATION + "electrical_grid.csv", "stabf", TargetType.MultiClassification)
ds.factorize(["stabf"])
ds.standardize()
size = [4, 3]
return (ds, MultiLayerPerceptron(ds, size, 0.08), sklearn.neural_network.MLPClassifier(size, 'relu'))
def frogs() -> tuple[Dataset, MLAlgorithm, Any]: def frogs() -> tuple[Dataset, MLAlgorithm, Any]:
ds = Dataset(CLASSIFICATION + "frogs.csv", "Species", TargetType.MultiClassification) ds = Dataset(CLASSIFICATION + "frogs.csv", "Species", TargetType.MultiClassification)
ds.remove(["Family", "Genus", "RecordID"]) ds.remove(["Family", "Genus", "RecordID"])
ds.factorize(["Species"]) ds.factorize(["Species"])
size = [18, 15, 12, 10, 8] ds.standardize()
return (ds, MultiLayerPerceptron(ds, size), sklearn.neural_network.MLPClassifier(size, 'relu')) size = [18, 15, 12]
return (ds, MultiLayerPerceptron(ds, size, 0.047), sklearn.neural_network.MLPClassifier(size, 'relu'))
def iris() -> tuple[Dataset, MLAlgorithm, Any]: def iris() -> tuple[Dataset, MLAlgorithm, Any]:
ds = Dataset(CLASSIFICATION + "iris.csv", "Class", TargetType.MultiClassification) ds = Dataset(CLASSIFICATION + "iris.csv", "Class", TargetType.MultiClassification)
ds.factorize(["Class"]) ds.factorize(["Class"])
ds.normalize() ds.standardize()
size = [4, 3] size = [4, 3]
return (ds, MultiLayerPerceptron(ds, size), sklearn.neural_network.MLPClassifier(size, 'relu')) return (ds, MultiLayerPerceptron(ds, size), sklearn.neural_network.MLPClassifier(size, 'relu'))
@@ -90,7 +98,7 @@ def frogs_no_target() -> tuple[Dataset, MLAlgorithm, Any]:
def iris_no_target() -> tuple[Dataset, MLAlgorithm, Any]: def iris_no_target() -> tuple[Dataset, MLAlgorithm, Any]:
ds = Dataset(CLASSIFICATION + "iris.csv", "Class", TargetType.NoTarget) ds = Dataset(CLASSIFICATION + "iris.csv", "Class", TargetType.NoTarget)
ds.remove(["Class"]) ds.remove(["Class"])
ds.normalize() ds.standardize()
clusters = 3 clusters = 3
return (ds, KMeans(ds, clusters), sklearn.cluster.KMeans(clusters)) return (ds, KMeans(ds, clusters), sklearn.cluster.KMeans(clusters))
@@ -121,6 +129,9 @@ if __name__ == "__main__":
sk.set_params(max_iter=epochs) sk.set_params(max_iter=epochs)
sk.fit(learn.x, learn.y) sk.fit(learn.x, learn.y)
print(f"Sklearn : {abs(sk.score(test.x, test.y)):0.5f}") print(f"Sklearn : {abs(sk.score(test.x, test.y)):0.5f}")
if ds.target_type == TargetType.Classification or ds.target_type == TargetType.MultiClassification:
conf = ConfusionMatrix(test.y, sk.predict(test.x))
conf.print()
print("========================") print("========================")
ml.plot() ml.plot()

View File

@@ -2,6 +2,8 @@ import numpy as np
import pandas as pd import pandas as pd
from enum import Enum from enum import Enum
import sklearn
import sklearn.metrics
from typing_extensions import Self from typing_extensions import Self
class TargetType(Enum): class TargetType(Enum):
@@ -46,7 +48,7 @@ class Dataset:
self.data.pop(col) self.data.pop(col)
return self return self
def normalize(self, excepts:list[str]=[]) -> Self: def standardize(self, excepts:list[str]=[]) -> Self:
if excepts is None: excepts = [] if excepts is None: excepts = []
else: excepts.append(self.target) else: excepts.append(self.target)
@@ -81,48 +83,67 @@ class Dataset:
splitted = [data[ data[:,0] == k ] for k in classes ] splitted = [data[ data[:,0] == k ] for k in classes ]
total_each = np.average([len(x) for x in splitted]).astype(int) total_each = np.average([len(x) for x in splitted]).astype(int)
seed = np.random.randint(0, 4294967295)
rng = np.random.default_rng(seed)
data = [] data = []
for x in splitted: for x in splitted:
samples = rng.choice(x, size=total_each, replace=True, shuffle=False) total = total_each - x.shape[0]
data.append(samples) data.append(x)
if total > 0:
samples = np.random.choice(x, size=total, replace=True)
data.append(samples)
return np.concatenate(data, axis=0) return np.concatenate(data, axis=0)
def split_data_target(self, data:np.ndarray) -> tuple[np.ndarray, np.ndarray]: def split_data_target(self, data:np.ndarray) -> Data:
target = data[:, 0] if self.target_type != TargetType.NoTarget else None target = data[:, 0] if self.target_type != TargetType.NoTarget else None
data = data[:, 1:] data = data[:, 1:]
if self.target_type == TargetType.MultiClassification: if self.target_type == TargetType.MultiClassification:
target = target.astype(int) target = target.astype(int)
uniques = np.unique(target).shape[0] uniques = np.unique(target).shape[0]
target = np.eye(uniques)[target] target = np.eye(uniques)[target]
return (data, target) return Data(data, target)
def get_dataset(self, test_frac:float=0.2, valid_frac:float=0.2) -> tuple[Data, Data, Data]:
data = self.data.to_numpy()
data = self.prepare_classification(data)
np.random.shuffle(data)
def split_dataset(self, data:np.ndarray, valid_frac:float, test_frac:float) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
total = data.shape[0] total = data.shape[0]
valid_cutoff = int(total * valid_frac) valid_cutoff = int(total * valid_frac)
test_cutoff = int(total * test_frac) + valid_cutoff test_cutoff = int(total * test_frac) + valid_cutoff
learn = data[test_cutoff:]
valid = data[:valid_cutoff] valid = data[:valid_cutoff]
test = data[valid_cutoff:test_cutoff] test = data[valid_cutoff:test_cutoff]
learn = data[test_cutoff:] return (learn, valid, test)
l = []
for data in [learn, test, valid]: def get_dataset(self, test_frac:float=0.2, valid_frac:float=0.2) -> tuple[Data, Data, Data]:
data, target = self.split_data_target(data) data = self.data.to_numpy()
l.append(Data(data, target)) max_iter = 10
return l while max_iter > 0:
max_iter -= 1
try:
np.random.shuffle(data)
learn, valid, test = self.split_dataset(data, valid_frac, test_frac)
if self.target_type == TargetType.Regression or self.target_type == TargetType.NoTarget:
learn = self.prepare_classification(learn)
valid = self.prepare_classification(valid)
test = self.prepare_classification(test)
learn = self.split_data_target(learn)
valid = self.split_data_target(valid)
test = self.split_data_target(test)
return (learn, valid, test)
except:
if max_iter == 0:
raise Exception("Could not split dataset evenly for the classes, try again with another seed or add more cases in the dataset")
class ConfusionMatrix: class ConfusionMatrix:
matrix:np.ndarray matrix:np.ndarray
def __init__(self, dataset_y: np.ndarray, predictions_y:np.ndarray) -> None: def __init__(self, dataset_y: np.ndarray, predictions_y:np.ndarray) -> None:
if len(dataset_y.shape) > 1:
dataset_y = np.argmax(dataset_y, axis=1)
if len(predictions_y.shape) > 1:
predictions_y = np.argmax(predictions_y, axis=1)
classes = len(np.unique(dataset_y)) classes = len(np.unique(dataset_y))
conf_matrix = np.zeros((classes, classes), dtype=int) conf_matrix = np.zeros((classes, classes), dtype=int)
@@ -137,6 +158,7 @@ class ConfusionMatrix:
self.fp = np.sum(conf_matrix, axis=0) - self.tp self.fp = np.sum(conf_matrix, axis=0) - self.tp
self.fn = np.sum(conf_matrix, axis=1) - self.tp self.fn = np.sum(conf_matrix, axis=1) - self.tp
self.tn = self.total - (self.tp + self.fp + self.fn) self.tn = self.total - (self.tp + self.fp + self.fn)
self.kappa = sklearn.metrics.cohen_kappa_score(dataset_y, predictions_y)
def divide_ignore_zero(self, a:np.ndarray, b:np.ndarray) -> np.ndarray: def divide_ignore_zero(self, a:np.ndarray, b:np.ndarray) -> np.ndarray:
with np.errstate(divide='ignore', invalid='ignore'): with np.errstate(divide='ignore', invalid='ignore'):
@@ -156,12 +178,6 @@ class ConfusionMatrix:
def specificity_per_class(self) -> np.ndarray: def specificity_per_class(self) -> np.ndarray:
return self.divide_ignore_zero(self.tn, self.tn + self.fp) return self.divide_ignore_zero(self.tn, self.tn + self.fp)
def cohen_kappa_per_class(self) -> np.ndarray:
p_pl = (self.tp + self.fn) * (self.tp + self.fp) / (self.total ** 2)
p_ne = (self.tn + self.fp) * (self.tn + self.fn) / (self.total ** 2)
p = p_pl + p_ne
return (self.accuracy_per_class() - p) / (1 - p)
def f1_score_per_class(self) -> np.ndarray: def f1_score_per_class(self) -> np.ndarray:
prec = self.precision_per_class() prec = self.precision_per_class()
rec = self.recall_per_class() rec = self.recall_per_class()
@@ -187,5 +203,12 @@ class ConfusionMatrix:
return np.average(f1_per_class, weights=self.weights) return np.average(f1_per_class, weights=self.weights)
def cohen_kappa(self) -> float: def cohen_kappa(self) -> float:
kappa_per_class = self.cohen_kappa_per_class() return self.kappa
return np.average(kappa_per_class, weights=self.weights)
def print(self)-> None:
print(f"Cohen Kappa: {self.cohen_kappa():0.5f}")
print(f"Accuracy : {self.accuracy():0.5f} - classes {self.accuracy_per_class()}")
print(f"Precision : {self.precision():0.5f} - classes {self.precision_per_class()}")
print(f"Recall : {self.recall():0.5f} - classes {self.recall_per_class()}")
print(f"Specificity: {self.specificity():0.5f} - classes {self.specificity_per_class()}")
print(f"F1 score : {self.f1_score():0.5f} - classes {self.f1_score_per_class()}")

View File

@@ -19,9 +19,10 @@ def lrelu_derivative(x:np.ndarray) -> np.ndarray:
return np.where(x < 0, LEAKY_RELU, 1) return np.where(x < 0, LEAKY_RELU, 1)
def softmax(x:np.ndarray) -> np.ndarray: def softmax(x:np.ndarray) -> np.ndarray:
x = x - np.max(x, axis=1, keepdims=True) # for overflow axis = 1 if len(x.shape) != 1 else 0
x = x - np.max(x, axis=axis, keepdims=True) # for overflow
exp_x = np.exp(x) exp_x = np.exp(x)
sum_x = np.sum(exp_x, axis=1, keepdims=True) sum_x = np.sum(exp_x, axis=axis, keepdims=True)
return exp_x / sum_x return exp_x / sum_x
def softmax_derivative(h0:np.ndarray, y:np.ndarray) -> np.ndarray: def softmax_derivative(h0:np.ndarray, y:np.ndarray) -> np.ndarray:
return h0 - y return h0 - y
@@ -58,5 +59,6 @@ def r_squared(h0:np.ndarray, y:np.ndarray) -> float:
return 1 - (ss_resid / ss_total) return 1 - (ss_resid / ss_total)
def with_bias(x:np.ndarray) -> np.ndarray: def with_bias(x:np.ndarray) -> np.ndarray:
ones = np.ones(shape=(x.shape[0], 1)) shape = (x.shape[0], 1) if len(x.shape) != 1 else (1,)
return np.hstack([x, ones]) ones = np.ones(shape)
return np.hstack([ones, x])

View File

@@ -50,6 +50,7 @@ class MLAlgorithm(ABC):
if valid_loss < best[0]: if valid_loss < best[0]:
best = (valid_loss, self._get_parameters()) best = (valid_loss, self._get_parameters())
if np.isnan(learn_loss) or np.isnan(valid_loss): break
learn.append(learn_loss) learn.append(learn_loss)
valid.append(valid_loss) valid.append(valid_loss)
if verbose: trange.set_postfix({"learn": f"{learn[-1]:2.5f}", "validation": f"{valid[-1]:2.5f}"}) if verbose: trange.set_postfix({"learn": f"{learn[-1]:2.5f}", "validation": f"{valid[-1]:2.5f}"})
@@ -82,17 +83,15 @@ class MLAlgorithm(ABC):
print(f"Loss learn : {self.learning_loss():0.5f}") print(f"Loss learn : {self.learning_loss():0.5f}")
print(f"Loss valid : {self.validation_loss():0.5f}") print(f"Loss valid : {self.validation_loss():0.5f}")
print(f"Loss test : {self.test_loss():0.5f}") print(f"Loss test : {self.test_loss():0.5f}")
print("========================")
if self._target_type == TargetType.Regression: if self._target_type == TargetType.Regression:
print(f"Pearson : {self.test_pearson():0.5f}") print(f"Pearson : {self.test_pearson():0.5f}")
print(f"R^2 : {self.test_r_squared():0.5f}") print(f"R^2 : {self.test_r_squared():0.5f}")
print("========================")
elif self._target_type != TargetType.NoTarget: elif self._target_type != TargetType.NoTarget:
conf = self.test_confusion_matrix() conf = self.test_confusion_matrix()
print(f"Accuracy : {conf.accuracy():0.5f} - classes {conf.accuracy_per_class()}") conf.print()
print(f"Precision : {conf.precision():0.5f} - classes {conf.precision_per_class()}") print("========================")
print(f"Recall : {conf.recall():0.5f} - classes {conf.recall_per_class()}")
print(f"Specificity: {conf.specificity():0.5f} - classes {conf.specificity_per_class()}")
print(f"Cohen Kappa: {conf.cohen_kappa():0.5f} - classes {conf.cohen_kappa_per_class()}")
print(f"F1 score : {conf.f1_score():0.5f} - classes {conf.f1_score_per_class()}")
def test_confusion_matrix(self) -> ConfusionMatrix: def test_confusion_matrix(self) -> ConfusionMatrix:
if self._target_type != TargetType.Classification\ if self._target_type != TargetType.Classification\
@@ -103,9 +102,7 @@ class MLAlgorithm(ABC):
y = self._testset.y y = self._testset.y
if h0.ndim == 1: if h0.ndim == 1:
h0 = np.where(h0 > 0.5, 1, 0) h0 = np.where(h0 > 0.5, 1, 0)
else:
h0 = np.argmax(h0, axis=1)
y = np.argmax(y, axis=1)
return ConfusionMatrix(y, h0) return ConfusionMatrix(y, h0)
def test_pearson(self) -> float: def test_pearson(self) -> float:

View File

@@ -66,8 +66,8 @@ class MultiLayerPerceptron(MLAlgorithm):
input = self._learnset.x.shape[1] input = self._learnset.x.shape[1]
output = self._learnset.y.shape[1] output = self._learnset.y.shape[1]
if type(layers) is not list[int]: if not all(isinstance(x, int) for x in layers):
layers = [4, 3, output] raise Exception("The list of layers must oly be of integers!")
else: layers.append(output) else: layers.append(output)
self.layers = [] self.layers = []
@@ -90,6 +90,9 @@ class MultiLayerPerceptron(MLAlgorithm):
self.activations.append(x) # saving activation result self.activations.append(x) # saving activation result
return softmax(x) return softmax(x)
def _predict_loss(self, dataset:Data) -> float:
return cross_entropy_loss(self._h0(dataset.x), dataset.y)
def _learning_step(self) -> float: def _learning_step(self) -> float:
x, y, m, _ = self._learnset.as_tuple() x, y, m, _ = self._learnset.as_tuple()
delta = softmax_derivative(self._h0(x), y) delta = softmax_derivative(self._h0(x), y)
@@ -100,7 +103,7 @@ class MultiLayerPerceptron(MLAlgorithm):
deltaW *= self.learning_rate deltaW *= self.learning_rate
deltaW += self.momentum * self.previous_delta[l] deltaW += self.momentum * self.previous_delta[l]
delta = np.dot(delta, self.layers[l][:-1].T) # ignoring bias delta = np.dot(delta, self.layers[l][1:].T) # ignoring bias
delta *= lrelu_derivative(activation) delta *= lrelu_derivative(activation)
self.layers[l] -= deltaW self.layers[l] -= deltaW
@@ -108,9 +111,6 @@ class MultiLayerPerceptron(MLAlgorithm):
return self._predict_loss(self._learnset) return self._predict_loss(self._learnset)
def _predict_loss(self, dataset:Data) -> float:
return cross_entropy_loss(self._h0(dataset.x), dataset.y)
def _get_parameters(self): def _get_parameters(self):
parameters = { 'layers': [], 'previous_delta': [] } parameters = { 'layers': [], 'previous_delta': [] }
for x in range(len(self.layers)): for x in range(len(self.layers)):