- moved common functions to separate file
- removed unused values and imports
- added momentum to NN
This commit is contained in:
2024-08-14 08:17:38 +02:00
parent 8e8e0b2d51
commit 142fe5ccdf
6 changed files with 115 additions and 81 deletions
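The momentum addition follows the classical momentum scheme: each weight update blends the current gradient step with the previous update. A minimal standalone sketch of that rule (illustrative names, not the repo's API; see MultiLayerPerceptron._learning_step below for the actual code):

import numpy as np

def momentum_step(weights: np.ndarray, grad: np.ndarray, prev_delta: np.ndarray,
                  lr: float = 0.1, momentum: float = 0.9) -> tuple[np.ndarray, np.ndarray]:
    # same form as deltaW in the MLP diff: lr * gradient + momentum * previous delta
    delta = lr * grad + momentum * prev_delta
    return weights - delta, delta  # updated weights, and the delta to feed back next step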

View File

@@ -1,14 +1,14 @@
-import random
+from typing import Any
 import numpy as np
 import sklearn
 import sklearn.cluster
 import sklearn.linear_model
 import sklearn.model_selection
 import sklearn.neural_network
-from typing import Any
+from learning.ml import MLAlgorithm
 from learning.data import Dataset, TargetType
 from learning.supervised import LinearRegression, LogisticRegression, MultiLayerPerceptron
-from learning.ml import MLAlgorithm
 from learning.unsupervised import KMeans

 DATASET = "datasets/"
@@ -67,8 +67,8 @@ def frogs() -> tuple[Dataset, MLAlgorithm, Any]:
     ds = Dataset(CLASSIFICATION + "frogs.csv", "Species", TargetType.MultiClassification)
     ds.remove(["Family", "Genus", "RecordID"])
     ds.factorize(["Species"])
-    size = [8, 5]
-    return (ds, MultiLayerPerceptron(ds, size, 0.1), sklearn.neural_network.MLPClassifier(size, 'relu'))
+    size = [18, 15, 12, 10, 8]
+    return (ds, MultiLayerPerceptron(ds, size), sklearn.neural_network.MLPClassifier(size, 'relu'))

 def iris() -> tuple[Dataset, MLAlgorithm, Any]:
     ds = Dataset(CLASSIFICATION + "iris.csv", "Class", TargetType.MultiClassification)
@@ -100,16 +100,17 @@ def iris_no_target() -> tuple[Dataset, MLAlgorithm, Any]:
 if __name__ == "__main__":
     np.set_printoptions(linewidth=np.inf, formatter={'float': '{:>10.5f}'.format})
-    rand = random.randint(0, 4294967295)
+    rand = np.random.randint(0, 4294967295)
     #rand = 1997847910 # LiR for power_plant
     #rand = 347617386 # LoR for electrical_grid
     #rand = 1793295160 # MLP for iris
+    #rand = 2914000170 # MLP for frogs
     #rand = 885416001 # KMe for frogs_no_target
     np.random.seed(rand)
     print(f"Using seed: {rand}")

-    ds, ml, sk = iris()
+    ds, ml, sk = frogs()
     epochs, _, _ = ml.learn(1000, verbose=True)
     ml.display_results()
@@ -122,7 +123,3 @@ if __name__ == "__main__":
print("========================")
ml.plot()
# migliori parametri trovati per electrical_grid
# temp = np.array([-48.28601, 0.00429, 0.07933, 0.02144, -0.04225, 0.36898, 0.24723, 0.36445, 0.21437, 0.29666, 0.22532, 0.38619, 0.24171, -113.65430])
# ml._set_parameters(temp)

src/learning/data.py
View File

@@ -1,5 +1,5 @@
-import pandas as pd
 import numpy as np
+import pandas as pd
 from enum import Enum
 from typing_extensions import Self
@@ -184,16 +184,3 @@ class ConfusionMatrix:
         specificity_per_class = self.specificity_per_class()
         support = np.sum(self.matrix, axis=1)
         return np.average(specificity_per_class, weights=support)

-if __name__ == "__main__":
-    ds = Dataset("datasets\\classification\\frogs.csv", "Species", TargetType.MultiClassification)
-    ds.remove(["Family", "Genus", "RecordID"])
-    ds.factorize(["Species"])
-    np.random.seed(0)
-    learn, test, valid = ds.get_dataset()
-    print(learn)
-    print(test)
-    print(valid)

src/learning/functions.py (new file, 55 lines)
View File

@@ -0,0 +1,55 @@
+import numpy as np
+
+NOT_ZERO = 1e-15
+LEAKY_RELU = 0.2
+
+# **********
+# For NN
+# **********
+def relu(x:np.ndarray) -> np.ndarray:
+    return np.where(x < 0, 0, x)
+
+def relu_derivative(x:np.ndarray) -> np.ndarray:
+    return np.where(x < 0, 0, 1)
+
+def lrelu(x:np.ndarray) -> np.ndarray:
+    return np.where(x < 0, LEAKY_RELU * x, x)
+
+def lrelu_derivative(x:np.ndarray) -> np.ndarray:
+    return np.where(x < 0, LEAKY_RELU, 1)
+
+def softmax(x:np.ndarray) -> np.ndarray:
+    x = x - np.max(x, axis=1, keepdims=True) # subtract row max to avoid overflow in exp
+    exp_x = np.exp(x)
+    sum_x = np.sum(exp_x, axis=1, keepdims=True)
+    return exp_x / sum_x
+
+def softmax_derivative(h0:np.ndarray, y:np.ndarray) -> np.ndarray:
+    return h0 - y # gradient of cross-entropy w.r.t. the softmax input
+
+# **********
+# For loss
+# **********
+def square_loss(h0:np.ndarray, y:np.ndarray) -> float:
+    return np.mean((h0 - y) ** 2) / 2
+
+def log_loss(h0:np.ndarray, y:np.ndarray) -> float:
+    return np.mean(- y*np.log(h0 + NOT_ZERO) - (1-y)*np.log(1-h0 + NOT_ZERO))
+
+def cross_entropy_loss(h0:np.ndarray, y:np.ndarray) -> float:
+    return -np.mean(np.sum(y*np.log(h0 + NOT_ZERO), axis=1)) # mean is not "correct", but useful for comparing models
+
+# **********
+# Randoms
+# **********
+def r_squared(h0:np.ndarray, y:np.ndarray) -> float:
+    y_mean = np.mean(y)
+    ss_resid = np.sum((y - h0) ** 2)
+    ss_total = np.sum((y - y_mean) ** 2)
+    return 1 - (ss_resid / ss_total)
+
+def with_bias(x:np.ndarray) -> np.ndarray:
+    ones = np.ones(shape=(x.shape[0], 1))
+    return np.hstack([x, ones])
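A quick sanity check of the new helpers (a sketch assuming learning.functions is importable from the project root):

import numpy as np
from learning.functions import softmax, lrelu, with_bias

x = np.array([[1.0, 2.0, 3.0], [0.5, -1.0, 0.0]])
print(softmax(x).sum(axis=1))       # each row sums to 1.0
print(lrelu(np.array([-1.0, 2.0]))) # [-0.2  2. ] since LEAKY_RELU = 0.2
print(with_bias(x).shape)           # (2, 4): one bias column of ones appended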

src/learning/ml.py
View File

@@ -1,10 +1,11 @@
 import sys
+import numpy as np
 from abc import ABC, abstractmethod
 from plot import Plot
 from tqdm import tqdm
 from learning.data import ConfusionMatrix, Dataset, Data, TargetType
-import numpy as np
+from learning.functions import r_squared

 class MLAlgorithm(ABC):
     """ Generic class for Machine Learning algorithms """
@@ -22,14 +23,12 @@ class MLAlgorithm(ABC):
         self._validset = valid
         self._testset = test

-    def with_bias(self, x:np.ndarray) -> np.ndarray:
-        return np.hstack([x, np.ones(shape=(x.shape[0], 1))])
-
     def learn(self, epochs:int, early_stop:float=0.0000001, max_patience:int=10, verbose:bool=False) -> tuple[int, list, list]:
         learn = []
         valid = []
         count = 0
         patience = 0
         best = (sys.float_info.max, [])
         trange = range(epochs)
         if verbose: trange = tqdm(trange, bar_format="Epochs {percentage:3.0f}% [{bar}] {elapsed}{postfix}")
@@ -45,13 +44,19 @@ class MLAlgorithm(ABC):
                 patience = 0
                 count += 1
-                learn.append(self._learning_step())
-                valid.append(self.validation_loss())
+                learn_loss = self._learning_step()
+                valid_loss = self.validation_loss()
+                if valid_loss < best[0]:
+                    best = (valid_loss, self._get_parameters())
+                learn.append(learn_loss)
+                valid.append(valid_loss)
                 if verbose: trange.set_postfix({"learn": f"{learn[-1]:2.5f}", "validation": f"{valid[-1]:2.5f}"})
         except KeyboardInterrupt: pass
         if verbose: print(f"Loop ended after {count} epochs")

+        self._set_parameters(best[1])
         self._learn_loss = learn
         self._valid_loss = valid
         return (count, learn, valid)
@@ -104,12 +109,7 @@ class MLAlgorithm(ABC):
     def test_r_squared(self) -> float:
         if self._target_type != TargetType.Regression:
             return 0
-        h0 = self._h0(self._testset.x)
-        y_mean = np.mean(self._testset.y)
-        ss_total = np.sum((self._testset.y - y_mean) ** 2)
-        ss_resid = np.sum((self._testset.y - h0) ** 2)
-        return 1 - (ss_resid / ss_total)
+        return r_squared(self._h0(self._testset.x), self._testset.y)

     @abstractmethod
     def _h0(self, x:np.ndarray) -> np.ndarray: pass
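The learn() change above snapshots the parameters whenever validation loss improves and restores the best snapshot after the loop. The pattern in isolation (hypothetical stand-ins for the underscored class methods):

import sys

def fit_with_checkpoint(model, epochs: int) -> None:
    best = (sys.float_info.max, None)
    for _ in range(epochs):
        model.training_step()                # stand-in for _learning_step()
        valid_loss = model.validation_loss()
        if valid_loss < best[0]:             # checkpoint on validation improvement
            best = (valid_loss, model.get_parameters())
    model.set_parameters(best[1])            # roll back to the best snapshot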

src/learning/supervised.py
View File

@@ -4,7 +4,7 @@ import numpy as np
 from abc import abstractmethod
 from learning.ml import MLAlgorithm
 from learning.data import Dataset, Data
-NOT_ZERO = 1e-15
+from learning.functions import cross_entropy_loss, log_loss, lrelu, lrelu_derivative, softmax, softmax_derivative, square_loss, with_bias

 class GradientDescent(MLAlgorithm):
     theta:np.ndarray
@@ -22,12 +22,12 @@ class GradientDescent(MLAlgorithm):
         regularization = (self.lambd / m) * self.theta
         regularization[0] = 0
-        derivative = self.alpha * (1/m) * np.sum((self._h0(x) - y) * self.with_bias(x).T, axis=1)
+        derivative = self.alpha * np.mean((self._h0(x) - y) * with_bias(x).T, axis=1)
         self.theta -= derivative + regularization
-        return self._loss(x, y, m)
+        return self._loss(x, y)

     def _predict_loss(self, dataset:Data) -> float:
-        return self._loss(dataset.x, dataset.y, dataset.size)
+        return self._loss(dataset.x, dataset.y)

     def _get_parameters(self):
         return self.theta.copy()
@@ -36,31 +36,30 @@ class GradientDescent(MLAlgorithm):
         self.theta = parameters

     @abstractmethod
-    def _loss(self, x:np.ndarray, y:np.ndarray, m:int) -> float: pass
+    def _loss(self, x:np.ndarray, y:np.ndarray) -> float: pass

 class LinearRegression(GradientDescent):
     def _h0(self, x: np.ndarray) -> np.ndarray:
-        return self.theta.dot(self.with_bias(x).T)
+        return self.theta.dot(with_bias(x).T)

-    def _loss(self, x:np.ndarray, y:np.ndarray, m:int) -> float:
-        diff = (self._h0(x) - y)
-        return 1/(2*m) * np.sum(diff ** 2)
+    def _loss(self, x:np.ndarray, y:np.ndarray) -> float:
+        return square_loss(self._h0(x), y)

 class LogisticRegression(GradientDescent):
     def _h0(self, x: np.ndarray) -> np.ndarray:
-        return 1 / (1 + np.exp(-self.theta.dot(self.with_bias(x).T)))
+        return 1 / (1 + np.exp(-self.theta.dot(with_bias(x).T)))

-    def _loss(self, x:np.ndarray, y:np.ndarray, m:int) -> float:
-        h0 = self._h0(x)
-        diff = - y*np.log(h0 + NOT_ZERO) - (1-y)*np.log(1-h0 + NOT_ZERO)
-        return 1/m * np.sum(diff)
+    def _loss(self, x:np.ndarray, y:np.ndarray) -> float:
+        return log_loss(self._h0(x), y)

 class MultiLayerPerceptron(MLAlgorithm):
     layers: list[np.ndarray]
     activations: list[np.ndarray]
+    previous_delta: list[np.ndarray]
+    momentum: float
     learning_rate: float

-    def __init__(self, dataset:Dataset, layers:list[int], learning_rate:float=0.1) -> None:
+    def __init__(self, dataset:Dataset, layers:list[int], learning_rate:float=0.1, momentum:float=0.9) -> None:
         super().__init__(dataset)
         input = self._learnset.x.shape[1]
         output = self._learnset.y.shape[1]
@@ -71,54 +70,52 @@ class MultiLayerPerceptron(MLAlgorithm):
         self.layers = []
         self.activations = []
+        self.previous_delta = []
+        self.momentum = momentum
         self.learning_rate = learning_rate

         for next in layers:
             current = np.random.rand(input + 1, next) * np.sqrt(2 / input) # +1 bias, sqrt is He init
             self.layers.append(current)
+            self.previous_delta.append(np.zeros(current.shape))
             input = next

     def _h0(self, x:np.ndarray) -> np.ndarray:
         self.activations = [x]
         for layer in self.layers:
-            x = self.with_bias(x)
-            x = x.dot(layer)
-            x = x * (x > 0) # activation function ReLU
+            x = lrelu(with_bias(x).dot(layer))
             self.activations.append(x) # saving activation result
-        return self.softmax(x)
+        return softmax(x)

     def _learning_step(self) -> float:
         x, y, m, _ = self._learnset.as_tuple()
-        delta = self._h0(x) - y # first term is derivative of softmax
+        delta = softmax_derivative(self._h0(x), y)
         for l in reversed(range(len(self.layers))):
             activation = self.activations[l]
-            deltaW = np.dot(self.with_bias(activation).T, delta) / m
+            deltaW = np.dot(with_bias(activation).T, delta) / m
+            deltaW *= self.learning_rate
+            deltaW += self.momentum * self.previous_delta[l]
             if l > 0:
-                delta = np.dot(delta, self.layers[l][:-1].T) # ignoring bias
-                delta[activation <= 0] = 0 # derivative ReLU
-            self.layers[l] -= deltaW * self.learning_rate
+                delta = np.dot(delta, self.layers[l][:-1].T) # ignoring bias
+                delta *= lrelu_derivative(activation)
+            self.layers[l] -= deltaW
+            self.previous_delta[l] = deltaW
         return self._predict_loss(self._learnset)

-    def softmax(self, input:np.ndarray) -> np.ndarray:
-        input = input - np.max(input, axis=1, keepdims=True) # for overflow
-        exp_input = np.exp(input)
-        total_sum = np.sum(exp_input, axis=1, keepdims=True)
-        return exp_input / total_sum
-
-    def _predict_loss(self, dataset:Data) -> float: # cross-entropy
-        diff = dataset.y * np.log(self._h0(dataset.x) + NOT_ZERO)
-        return -np.mean(np.sum(diff, axis=1))
+    def _predict_loss(self, dataset:Data) -> float:
+        return cross_entropy_loss(self._h0(dataset.x), dataset.y)

     def _get_parameters(self):
-        parameters = []
-        for x in self.layers:
-            parameters.append(x.copy())
+        parameters = { 'layers': [], 'previous_delta': [] }
+        for x in range(len(self.layers)):
+            parameters['layers'].append(self.layers[x].copy())
+            parameters['previous_delta'].append(self.previous_delta[x].copy())
         return parameters

     def _set_parameters(self, parameters):
-        self.layers = parameters
+        self.layers = parameters['layers']
+        self.previous_delta = parameters['previous_delta']
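In the rewritten backward pass, the error signal is pushed through the transpose of each weight matrix minus its bias row, then scaled by the leaky ReLU derivative. A runnable shape check under assumed dimensions (all names here are illustrative):

import numpy as np

LEAKY_RELU = 0.2
m, n_in, n_out = 4, 5, 3                   # batch size and layer widths (assumed)
delta = np.random.rand(m, n_out)           # error at the layer's output
W = np.random.rand(n_in + 1, n_out)        # weights; last row is the bias, as in the MLP
a_prev = np.random.rand(m, n_in) - 0.5     # previous layer's activations

delta_prev = delta.dot(W[:-1].T)                   # drop the bias row before backpropagating
delta_prev *= np.where(a_prev < 0, LEAKY_RELU, 1)  # leaky ReLU derivative
print(delta_prev.shape)                            # (4, 5) == (m, n_in)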

src/learning/unsupervised.py
View File

@@ -1,10 +1,8 @@
-import math as math
 import numpy as np
 from abc import abstractmethod
 from learning.ml import MLAlgorithm
 from learning.data import Dataset, Data
-NOT_ZERO = 1e-15

 class KMeans(MLAlgorithm):
     def __init__(self, dataset: Dataset, clusters:int) -> None: