MLP
- added backprop
- fixed data for multiclass
- fixed confusion matrix
This commit is contained in:
@@ -75,9 +75,35 @@ class Dataset:
|
|||||||
self.data = self.data.dropna()
|
self.data = self.data.dropna()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def prepare_classification(self, data:np.ndarray) -> np.ndarray:
    """Resample ``data`` so that every class contributes the same number of rows.

    Column 0 of ``data`` is read as the class label. When the dataset target
    is a regression target, or there is no target, ``data`` is returned as is.
    Otherwise every class is sampled *with replacement* down/up to the
    truncated average class size, and the per-class samples are concatenated
    in the order of the sorted unique labels.
    """
    if self.target_type in (TargetType.Regression, TargetType.NoTarget):
        return data

    labels = data[:, 0]
    groups = [data[labels == label] for label in np.unique(labels)]
    # Common size for every class: the mean group size, truncated to an int.
    per_class = np.average([len(group) for group in groups]).astype(int)

    rng = np.random.default_rng()
    resampled = [
        rng.choice(group, size=per_class, replace=True, shuffle=False)
        for group in groups
    ]
    return np.concatenate(resampled, axis=0)
|
||||||
|
|
||||||
|
def split_data_target(self, data:np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Separate the target column (column 0) from the remaining columns.

    Returns ``(features, target)`` where ``features`` is ``data[:, 1:]``.
    ``target`` is ``None`` when the dataset has no target. For multi-class
    datasets the target is cast to int and one-hot encoded, one column per
    class index.
    """
    has_target = self.target_type != TargetType.NoTarget
    target = data[:, 0] if has_target else None
    features = data[:, 1:]

    if self.target_type == TargetType.MultiClassification:
        target = target.astype(int)
        distinct = np.unique(target).shape[0]
        # The labels are used as row indices into the identity matrix, so the
        # matrix must be at least ``max label + 1`` wide. Sizing it only by the
        # number of distinct labels raises IndexError when the labels have
        # gaps or do not start at 0.
        n_classes = max(distinct, int(target.max()) + 1) if target.size else 0
        target = np.eye(n_classes)[target]
    return (features, target)
|
||||||
|
|
||||||
def get_dataset(self, test_frac:float=0.2, valid_frac:float=0.2) -> tuple[Data, Data, Data]:
|
def get_dataset(self, test_frac:float=0.2, valid_frac:float=0.2) -> tuple[Data, Data, Data]:
|
||||||
data = self.data.to_numpy()
|
data = self.data.to_numpy()
|
||||||
data = np.insert(data, 1, 1, axis=1) # adding bias
|
data = self.prepare_classification(data)
|
||||||
|
|
||||||
np.random.shuffle(data)
|
np.random.shuffle(data)
|
||||||
|
|
||||||
total = data.shape[0]
|
total = data.shape[0]
|
||||||
@@ -89,14 +115,9 @@ class Dataset:
|
|||||||
learn = data[test_cutoff:]
|
learn = data[test_cutoff:]
|
||||||
|
|
||||||
l = []
|
l = []
|
||||||
for ds in [learn, test, valid]:
|
for data in [learn, test, valid]:
|
||||||
target = ds[:, 0] if self.target_type != TargetType.NoTarget else None
|
data, target = self.split_data_target(data)
|
||||||
ds = ds[:, 1:]
|
l.append(Data(data, target))
|
||||||
if self.target_type == TargetType.MultiClassification:
|
|
||||||
target = target.astype(int)
|
|
||||||
uniques = np.unique(target).shape[0]
|
|
||||||
target = np.eye(uniques)[target]
|
|
||||||
l.append(Data(ds, target))
|
|
||||||
return l
|
return l
|
||||||
|
|
||||||
class ConfusionMatrix:
|
class ConfusionMatrix:
|
||||||
@@ -108,38 +129,40 @@ class ConfusionMatrix:
|
|||||||
|
|
||||||
for actual, prediction in zip(dataset_y, predictions_y):
|
for actual, prediction in zip(dataset_y, predictions_y):
|
||||||
conf_matrix[int(actual), int(prediction)] += 1
|
conf_matrix[int(actual), int(prediction)] += 1
|
||||||
|
|
||||||
self.matrix = conf_matrix
|
self.matrix = conf_matrix
|
||||||
|
self.classes = classes
|
||||||
|
self.total = dataset_y.shape[0]
|
||||||
|
self.tp = np.diagonal(conf_matrix)
|
||||||
|
self.fp = np.sum(conf_matrix, axis=0) - self.tp
|
||||||
|
self.fn = np.sum(conf_matrix, axis=1) - self.tp
|
||||||
|
self.tn = self.total - (self.tp + self.fp + self.fn)
|
||||||
|
|
||||||
|
def divide_ignore_zero(self, a:np.ndarray, b:np.ndarray) -> np.ndarray:
    """Element-wise ``a / b`` where every division by zero yields 0.

    ``x / 0`` (for positive or negative ``x``) and ``0 / 0`` all produce 0
    instead of ``inf``, ``-inf`` or ``nan``. NumPy's divide/invalid warnings
    are suppressed for the duration of the division.
    """
    with np.errstate(divide='ignore', invalid='ignore'):
        quotient = np.true_divide(a, b)
    # Map every non-finite outcome to 0 in one step; this also covers -inf
    # and works for scalar results, which do not support item assignment.
    return np.nan_to_num(quotient, nan=0.0, posinf=0.0, neginf=0.0)
|
||||||
|
|
||||||
def accuracy_per_class(self) -> np.ndarray:
|
def accuracy_per_class(self) -> np.ndarray:
|
||||||
return np.diag(self.matrix) / np.sum(self.matrix, axis=1)
|
return self.tp / np.sum(self.matrix, axis=1)
|
||||||
|
|
||||||
def precision_per_class(self) -> np.ndarray:
|
def precision_per_class(self) -> np.ndarray:
|
||||||
tp = np.diagonal(self.matrix)
|
return self.divide_ignore_zero(self.tp, self.tp + self.fp)
|
||||||
fp = np.sum(self.matrix, axis=0) - tp
|
|
||||||
return tp / (tp + fp)
|
|
||||||
|
|
||||||
def recall_per_class(self) -> np.ndarray:
|
def recall_per_class(self) -> np.ndarray:
|
||||||
tp = np.diagonal(self.matrix)
|
return self.divide_ignore_zero(self.tp, self.tp + self.fn)
|
||||||
fn = np.sum(self.matrix, axis=1) - tp
|
|
||||||
return tp / (tp + fn)
|
|
||||||
|
|
||||||
def f1_score_per_class(self) -> np.ndarray:
|
def f1_score_per_class(self) -> np.ndarray:
|
||||||
prec = self.precision_per_class()
|
prec = self.precision_per_class()
|
||||||
rec = self.recall_per_class()
|
rec = self.recall_per_class()
|
||||||
return 2 * (prec * rec) / (prec + rec)
|
return self.divide_ignore_zero(2 * prec * rec, prec + rec)
|
||||||
|
|
||||||
def specificity_per_class(self) -> np.ndarray:
|
def specificity_per_class(self) -> np.ndarray:
|
||||||
total = np.sum(self.matrix)
|
return self.divide_ignore_zero(self.tn, self.tn + self.fp)
|
||||||
tp = np.diagonal(self.matrix)
|
|
||||||
fp = np.sum(self.matrix, axis=0) - tp
|
|
||||||
fn = np.sum(self.matrix, axis=1) - tp
|
|
||||||
tn = total - (tp + fp + fn)
|
|
||||||
return tn / (tn + fp)
|
|
||||||
|
|
||||||
def accuracy(self) -> float:
|
def accuracy(self) -> float:
|
||||||
tp = np.diag(self.matrix).sum()
|
return self.tp.sum() / self.total
|
||||||
total = self.matrix.sum()
|
|
||||||
return tp / total
|
|
||||||
|
|
||||||
def precision(self) -> float:
|
def precision(self) -> float:
|
||||||
precision_per_class = self.precision_per_class()
|
precision_per_class = self.precision_per_class()
|
||||||
|
|||||||
@@ -22,6 +22,9 @@ class MLAlgorithm(ABC):
|
|||||||
self._validset = valid
|
self._validset = valid
|
||||||
self._testset = test
|
self._testset = test
|
||||||
|
|
||||||
|
def with_bias(self, x:np.ndarray) -> np.ndarray:
    """Return ``x`` with one extra trailing column filled with ones (the bias input)."""
    row_count = x.shape[0]
    bias_column = np.ones(shape=(row_count, 1))
    return np.hstack((x, bias_column))
|
||||||
|
|
||||||
def learn(self, epochs:int, early_stop:float=0.0000001, max_patience:int=10, verbose:bool=False) -> tuple[int, list, list]:
|
def learn(self, epochs:int, early_stop:float=0.0000001, max_patience:int=10, verbose:bool=False) -> tuple[int, list, list]:
|
||||||
learn = []
|
learn = []
|
||||||
valid = []
|
valid = []
|
||||||
@@ -89,8 +92,14 @@ class MLAlgorithm(ABC):
|
|||||||
and self._target_type != TargetType.MultiClassification:
|
and self._target_type != TargetType.MultiClassification:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
h0 = np.where(self._h0(self._testset.x) > 0.5, 1, 0)
|
h0 = self._h0(self._testset.x)
|
||||||
return ConfusionMatrix(self._testset.y, h0)
|
y = self._testset.y
|
||||||
|
if h0.ndim == 1:
|
||||||
|
h0 = np.where(h0 > 0.5, 1, 0)
|
||||||
|
else:
|
||||||
|
h0 = np.argmax(h0, axis=1)
|
||||||
|
y = np.argmax(y, axis=1)
|
||||||
|
return ConfusionMatrix(y, h0)
|
||||||
|
|
||||||
def test_r_squared(self) -> float:
|
def test_r_squared(self) -> float:
|
||||||
if self._target_type != TargetType.Regression:
|
if self._target_type != TargetType.Regression:
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ class GradientDescent(MLAlgorithm):
|
|||||||
|
|
||||||
def __init__(self, dataset:Dataset, learning_rate:float=0.1, regularization:float=0.01) -> None:
|
def __init__(self, dataset:Dataset, learning_rate:float=0.1, regularization:float=0.01) -> None:
|
||||||
super().__init__(dataset)
|
super().__init__(dataset)
|
||||||
self.theta = np.random.rand(self._learnset.param)
|
self.theta = np.random.rand(self._learnset.param + 1) # bias
|
||||||
self.alpha = max(0, learning_rate)
|
self.alpha = max(0, learning_rate)
|
||||||
self.lambd = max(0, regularization)
|
self.lambd = max(0, regularization)
|
||||||
|
|
||||||
@@ -21,7 +21,7 @@ class GradientDescent(MLAlgorithm):
|
|||||||
|
|
||||||
regularization = (self.lambd / m) * self.theta
|
regularization = (self.lambd / m) * self.theta
|
||||||
regularization[0] = 0
|
regularization[0] = 0
|
||||||
derivative = self.alpha * (1/m) * np.sum((self._h0(x) - y) * x.T, axis=1)
|
derivative = self.alpha * (1/m) * np.sum((self._h0(x) - y) * self.with_bias(x).T, axis=1)
|
||||||
self.theta -= derivative + regularization
|
self.theta -= derivative + regularization
|
||||||
return self._loss(x, y, m)
|
return self._loss(x, y, m)
|
||||||
|
|
||||||
@@ -40,7 +40,7 @@ class GradientDescent(MLAlgorithm):
|
|||||||
|
|
||||||
class LinearRegression(GradientDescent):
|
class LinearRegression(GradientDescent):
|
||||||
def _h0(self, x: np.ndarray) -> np.ndarray:
|
def _h0(self, x: np.ndarray) -> np.ndarray:
|
||||||
return self.theta.dot(x.T)
|
return self.theta.dot(self.with_bias(x).T)
|
||||||
|
|
||||||
def _loss(self, x:np.ndarray, y:np.ndarray, m:int) -> float:
|
def _loss(self, x:np.ndarray, y:np.ndarray, m:int) -> float:
|
||||||
diff = (self._h0(x) - y)
|
diff = (self._h0(x) - y)
|
||||||
@@ -48,7 +48,7 @@ class LinearRegression(GradientDescent):
|
|||||||
|
|
||||||
class LogisticRegression(GradientDescent):
|
class LogisticRegression(GradientDescent):
|
||||||
def _h0(self, x: np.ndarray) -> np.ndarray:
|
def _h0(self, x: np.ndarray) -> np.ndarray:
|
||||||
return 1 / (1 + np.exp(-self.theta.dot(x.T)))
|
return 1 / (1 + np.exp(-self.theta.dot(self.with_bias(x).T)))
|
||||||
|
|
||||||
def _loss(self, x:np.ndarray, y:np.ndarray, m:int) -> float:
|
def _loss(self, x:np.ndarray, y:np.ndarray, m:int) -> float:
|
||||||
not_zero = 1e-15
|
not_zero = 1e-15
|
||||||
@@ -58,7 +58,7 @@ class LogisticRegression(GradientDescent):
|
|||||||
|
|
||||||
class MultiLayerPerceptron(MLAlgorithm):
|
class MultiLayerPerceptron(MLAlgorithm):
|
||||||
layers: list[np.ndarray]
|
layers: list[np.ndarray]
|
||||||
calculated: list[np.ndarray]
|
activations: list[np.ndarray]
|
||||||
|
|
||||||
def __init__(self, dataset:Dataset, layers:list[int]) -> None:
|
def __init__(self, dataset:Dataset, layers:list[int]) -> None:
|
||||||
super().__init__(dataset)
|
super().__init__(dataset)
|
||||||
@@ -70,33 +70,43 @@ class MultiLayerPerceptron(MLAlgorithm):
|
|||||||
else: layers.append(output)
|
else: layers.append(output)
|
||||||
|
|
||||||
self.layers = []
|
self.layers = []
|
||||||
self.calculated = []
|
self.activations = []
|
||||||
|
|
||||||
for next in layers:
|
for next in layers:
|
||||||
current = np.random.rand(input, next)
|
current = np.random.rand(input + 1, next) # +1 bias
|
||||||
self.layers.append(current)
|
self.layers.append(current)
|
||||||
input = next + 1 # bias
|
input = next
|
||||||
|
|
||||||
def _h0(self, x:np.ndarray) -> np.ndarray:
|
def _h0(self, x:np.ndarray) -> np.ndarray:
|
||||||
input = x
|
self.activations = [x]
|
||||||
for i, layer in enumerate(self.layers):
|
|
||||||
if i != 0:
|
|
||||||
ones = np.ones(shape=(input.shape[0], 1))
|
|
||||||
input = np.hstack([input, ones])
|
|
||||||
input = input.dot(layer)
|
|
||||||
input = input * (input > 0) # activation function ReLU
|
|
||||||
self.calculated[i] = input # saving previous result
|
|
||||||
return self.soft_max(input)
|
|
||||||
|
|
||||||
def soft_max(self, input:np.ndarray) -> np.ndarray:
|
for layer in self.layers:
|
||||||
input = np.exp(input)
|
x = self.with_bias(x)
|
||||||
total_sum = np.sum(input, axis=1)
|
x = x.dot(layer)
|
||||||
input = input.T / total_sum
|
x = x * (x > 0) # activation function ReLU
|
||||||
return input.T
|
self.activations.append(x) # saving activation result
|
||||||
|
return self.softmax(x)
|
||||||
|
|
||||||
def _learning_step(self) -> float:
|
def _learning_step(self) -> float:
|
||||||
|
x, y, m, _ = self._learnset.as_tuple()
|
||||||
|
delta = self._h0(x) - y # first term is derivative of softmax
|
||||||
|
|
||||||
raise NotImplemented
|
for l in reversed(range(len(self.layers))):
|
||||||
|
activation = self.activations[l]
|
||||||
|
deltaW = np.dot(self.with_bias(activation).T, delta) / m
|
||||||
|
|
||||||
|
if l > 0:
|
||||||
|
delta = np.dot(delta, self.layers[l][:-1].T) # ignoring bias
|
||||||
|
delta[activation <= 0] = 0 # derivative ReLU
|
||||||
|
self.layers[l] -= deltaW
|
||||||
|
|
||||||
|
return self._predict_loss(self._learnset)
|
||||||
|
|
||||||
|
def softmax(self, input:np.ndarray) -> np.ndarray:
    """Row-wise softmax of a 2-D array.

    Each row is shifted by its own maximum before exponentiation so that
    ``np.exp`` cannot overflow; the shift does not change the result.
    """
    row_max = np.max(input, axis=1, keepdims=True)
    weights = np.exp(input - row_max)
    return weights / np.sum(weights, axis=1, keepdims=True)
|
||||||
|
|
||||||
def _predict_loss(self, dataset:Data) -> float:
|
def _predict_loss(self, dataset:Data) -> float:
|
||||||
diff = self._h0(dataset.x) - dataset.y
|
diff = self._h0(dataset.x) - dataset.y
|
||||||
|
|||||||
Reference in New Issue
Block a user