From a99253911613a11930bdb5cf1bb195994eff76e4 Mon Sep 17 00:00:00 2001 From: Berack96 Date: Mon, 12 Aug 2024 16:59:17 +0200 Subject: [PATCH] MLP - added backprop - fixed data for multiclass - fixed confusion matrix --- src/learning/data.py | 75 +++++++++++++++++++++++++------------- src/learning/ml.py | 13 ++++++- src/learning/supervised.py | 56 ++++++++++++++++------------ 3 files changed, 93 insertions(+), 51 deletions(-) diff --git a/src/learning/data.py b/src/learning/data.py index 2f74d02..f6d8b2e 100644 --- a/src/learning/data.py +++ b/src/learning/data.py @@ -75,9 +75,35 @@ class Dataset: self.data = self.data.dropna() return self + def prepare_classification(self, data:np.ndarray) -> np.ndarray: + if self.target_type == TargetType.Regression or self.target_type == TargetType.NoTarget: + return data + + classes = np.unique(data[:, 0]) + splitted = [data[ data[:,0] == k ] for k in classes ] + total_each = np.average([len(x) for x in splitted]).astype(int) + + rng = np.random.default_rng() + data = [] + for x in splitted: + samples = rng.choice(x, size=total_each, replace=True, shuffle=False) + data.append(samples) + + return np.concatenate(data, axis=0) + + def split_data_target(self, data:np.ndarray) -> tuple[np.ndarray, np.ndarray]: + target = data[:, 0] if self.target_type != TargetType.NoTarget else None + data = data[:, 1:] + if self.target_type == TargetType.MultiClassification: + target = target.astype(int) + uniques = np.unique(target).shape[0] + target = np.eye(uniques)[target] + return (data, target) + def get_dataset(self, test_frac:float=0.2, valid_frac:float=0.2) -> tuple[Data, Data, Data]: data = self.data.to_numpy() - data = np.insert(data, 1, 1, axis=1) # adding bias + data = self.prepare_classification(data) + np.random.shuffle(data) total = data.shape[0] @@ -89,14 +115,9 @@ class Dataset: learn = data[test_cutoff:] l = [] - for ds in [learn, test, valid]: - target = ds[:, 0] if self.target_type != TargetType.NoTarget else None - ds = ds[:, 1:] - if self.target_type == TargetType.MultiClassification: - target = target.astype(int) - uniques = np.unique(target).shape[0] - target = np.eye(uniques)[target] - l.append(Data(ds, target)) + for data in [learn, test, valid]: + data, target = self.split_data_target(data) + l.append(Data(data, target)) return l class ConfusionMatrix: @@ -108,38 +129,40 @@ class ConfusionMatrix: for actual, prediction in zip(dataset_y, predictions_y): conf_matrix[int(actual), int(prediction)] += 1 + self.matrix = conf_matrix + self.classes = classes + self.total = dataset_y.shape[0] + self.tp = np.diagonal(conf_matrix) + self.fp = np.sum(conf_matrix, axis=0) - self.tp + self.fn = np.sum(conf_matrix, axis=1) - self.tp + self.tn = self.total - (self.tp + self.fp + self.fn) + + def divide_ignore_zero(self, a:np.ndarray, b:np.ndarray) -> np.ndarray: + with np.errstate(divide='ignore', invalid='ignore'): + c = np.true_divide(a, b) + c[c == np.inf] = 0 + return np.nan_to_num(c) def accuracy_per_class(self) -> np.ndarray: - return np.diag(self.matrix) / np.sum(self.matrix, axis=1) + return self.tp / np.sum(self.matrix, axis=1) def precision_per_class(self) -> np.ndarray: - tp = np.diagonal(self.matrix) - fp = np.sum(self.matrix, axis=0) - tp - return tp / (tp + fp) + return self.divide_ignore_zero(self.tp, self.tp + self.fp) def recall_per_class(self) -> np.ndarray: - tp = np.diagonal(self.matrix) - fn = np.sum(self.matrix, axis=1) - tp - return tp / (tp + fn) + return self.divide_ignore_zero(self.tp, self.tp + self.fn) def f1_score_per_class(self) -> np.ndarray: prec = self.precision_per_class() rec = self.recall_per_class() - return 2 * (prec * rec) / (prec + rec) + return self.divide_ignore_zero(2 * prec * rec, prec + rec) def specificity_per_class(self) -> np.ndarray: - total = np.sum(self.matrix) - tp = np.diagonal(self.matrix) - fp = np.sum(self.matrix, axis=0) - tp - fn = np.sum(self.matrix, axis=1) - tp - tn = total - (tp + fp + fn) - return tn / (tn + fp) + return self.divide_ignore_zero(self.tn, self.tn + self.fp) def accuracy(self) -> float: - tp = np.diag(self.matrix).sum() - total = self.matrix.sum() - return tp / total + return self.tp.sum() / self.total def precision(self) -> float: precision_per_class = self.precision_per_class() diff --git a/src/learning/ml.py b/src/learning/ml.py index 5fe0c9c..e39143e 100644 --- a/src/learning/ml.py +++ b/src/learning/ml.py @@ -22,6 +22,9 @@ class MLAlgorithm(ABC): self._validset = valid self._testset = test + def with_bias(self, x:np.ndarray) -> np.ndarray: + return np.hstack([x, np.ones(shape=(x.shape[0], 1))]) + def learn(self, epochs:int, early_stop:float=0.0000001, max_patience:int=10, verbose:bool=False) -> tuple[int, list, list]: learn = [] valid = [] @@ -89,8 +92,14 @@ class MLAlgorithm(ABC): and self._target_type != TargetType.MultiClassification: return None - h0 = np.where(self._h0(self._testset.x) > 0.5, 1, 0) - return ConfusionMatrix(self._testset.y, h0) + h0 = self._h0(self._testset.x) + y = self._testset.y + if h0.ndim == 1: + h0 = np.where(h0 > 0.5, 1, 0) + else: + h0 = np.argmax(h0, axis=1) + y = np.argmax(y, axis=1) + return ConfusionMatrix(y, h0) def test_r_squared(self) -> float: if self._target_type != TargetType.Regression: diff --git a/src/learning/supervised.py b/src/learning/supervised.py index 4bc01e4..1c069e4 100644 --- a/src/learning/supervised.py +++ b/src/learning/supervised.py @@ -12,7 +12,7 @@ class GradientDescent(MLAlgorithm): def __init__(self, dataset:Dataset, learning_rate:float=0.1, regularization:float=0.01) -> None: super().__init__(dataset) - self.theta = np.random.rand(self._learnset.param) + self.theta = np.random.rand(self._learnset.param + 1) # bias self.alpha = max(0, learning_rate) self.lambd = max(0, regularization) @@ -21,7 +21,7 @@ class GradientDescent(MLAlgorithm): regularization = (self.lambd / m) * self.theta regularization[0] = 0 - derivative = self.alpha * (1/m) * np.sum((self._h0(x) - y) * x.T, axis=1) + derivative = self.alpha * (1/m) * np.sum((self._h0(x) - y) * self.with_bias(x).T, axis=1) self.theta -= derivative + regularization return self._loss(x, y, m) @@ -40,7 +40,7 @@ class GradientDescent(MLAlgorithm): class LinearRegression(GradientDescent): def _h0(self, x: np.ndarray) -> np.ndarray: - return self.theta.dot(x.T) + return self.theta.dot(self.with_bias(x).T) def _loss(self, x:np.ndarray, y:np.ndarray, m:int) -> float: diff = (self._h0(x) - y) @@ -48,7 +48,7 @@ class LinearRegression(GradientDescent): class LogisticRegression(GradientDescent): def _h0(self, x: np.ndarray) -> np.ndarray: - return 1 / (1 + np.exp(-self.theta.dot(x.T))) + return 1 / (1 + np.exp(-self.theta.dot(self.with_bias(x).T))) def _loss(self, x:np.ndarray, y:np.ndarray, m:int) -> float: not_zero = 1e-15 @@ -58,7 +58,7 @@ class LogisticRegression(GradientDescent): class MultiLayerPerceptron(MLAlgorithm): layers: list[np.ndarray] - calculated: list[np.ndarray] + activations: list[np.ndarray] def __init__(self, dataset:Dataset, layers:list[int]) -> None: super().__init__(dataset) @@ -70,33 +70,43 @@ class MultiLayerPerceptron(MLAlgorithm): else: layers.append(output) self.layers = [] - self.calculated = [] + self.activations = [] for next in layers: - current = np.random.rand(input, next) + current = np.random.rand(input + 1, next) # +1 bias self.layers.append(current) - input = next + 1 # bias + input = next def _h0(self, x:np.ndarray) -> np.ndarray: - input = x - for i, layer in enumerate(self.layers): - if i != 0: - ones = np.ones(shape=(input.shape[0], 1)) - input = np.hstack([input, ones]) - input = input.dot(layer) - input = input * (input > 0) # activation function ReLU - self.calculated[i] = input # saving previous result - return self.soft_max(input) + self.activations = [x] - def soft_max(self, input:np.ndarray) -> np.ndarray: - input = np.exp(input) - total_sum = np.sum(input, axis=1) - input = input.T / total_sum - return input.T + for layer in self.layers: + x = self.with_bias(x) + x = x.dot(layer) + x = x * (x > 0) # activation function ReLU + self.activations.append(x) # saving activation result + return self.softmax(x) def _learning_step(self) -> float: + x, y, m, _ = self._learnset.as_tuple() + delta = self._h0(x) - y # first term is derivative of softmax - raise NotImplemented + for l in reversed(range(len(self.layers))): + activation = self.activations[l] + deltaW = np.dot(self.with_bias(activation).T, delta) / m + + if l > 0: + delta = np.dot(delta, self.layers[l][:-1].T) # ignoring bias + delta[activation <= 0] = 0 # derivative ReLU + self.layers[l] -= deltaW + + return self._predict_loss(self._learnset) + + def softmax(self, input:np.ndarray) -> np.ndarray: + input = input - np.max(input, axis=1, keepdims=True) # for overflow + exp_input = np.exp(input) + total_sum = np.sum(exp_input, axis=1, keepdims=True) + return exp_input / total_sum def _predict_loss(self, dataset:Data) -> float: diff = self._h0(dataset.x) - dataset.y