diff --git a/src/app.py b/src/app.py
new file mode 100644
index 0000000..3ce4475
--- /dev/null
+++ b/src/app.py
@@ -0,0 +1,51 @@
+from learning.data import Dataset
+from learning.supervised import LinearRegression
+from learning.ml import MLAlgorithm
+from plot import Plot
+
+def auto_mpg() -> MLAlgorithm:
+    df = Dataset("datasets\\auto-mpg.csv", "MPG")
+
+    df.to_numbers(["HP"])
+    df.handle_na()
+    df.regularize(excepts=["Cylinders","Year","Origin"])
+
+    return LinearRegression(df, learning_rate=0.0001)
+
+def automobile() -> MLAlgorithm:
+    df = Dataset("datasets\\regression\\automobile.csv", "symboling")
+
+    attributes_to_modify = ["fuel-system", "engine-type", "drive-wheels", "body-style", "make", "engine-location", "aspiration", "fuel-type", "num-of-cylinders", "num-of-doors"]
+    df.factorize(attributes_to_modify)
+    df.to_numbers()
+    df.handle_na()
+    df.regularize(excepts=attributes_to_modify)
+
+    return LinearRegression(df, learning_rate=0.001)
+
+
+
+
+epoch = 50000
+skip = 1000
+lr = automobile()
+
+train, test = lr.learn(epoch)
+
+plot = Plot("Error", "Time", "Mean Error")
+plot.line("training", "red", data=train[skip:])
+plot.line("test", "blue", data=test[skip:])
+
+"""
+for _ in range(0, epoch):
+    train_err = lr.learning_step()
+    test_err = lr.test_error()
+
+    plot.update("training", train_err)
+    plot.update("test", test_err)
+    plot.update_limits()
+"""
+
+plot.wait()
+
+
diff --git a/src/data.py b/src/learning/data.py
similarity index 86%
rename from src/data.py
rename to src/learning/data.py
index f059823..6b6afb9 100644
--- a/src/data.py
+++ b/src/learning/data.py
@@ -10,13 +10,16 @@ class Dataset:
         # move target to the start
         col_target = data.pop(target)
         data.insert(0, target, col_target)
+        data.insert(1, "Bias", 1.0)
 
+        self.original = data
         self.data = data
         self.target = target
         self.classification = (data[target].dtype == object)
 
     def regularize(self, excepts:list=[]) -> Self:
         excepts.append(self.target)
+        excepts.append("Bias")
         for col in self.data:
             if col not in excepts:
                 dt = self.data[col]
@@ -44,10 +47,11 @@ class Dataset:
         self.data = self.data.sample(frac=1)
         return self
 
-    def as_ndarray(self, bias=True):
-        data = self.data.copy()
-        if bias: data.insert(1, "Bias", 1.0)
-        return data.to_numpy()
+    def as_ndarray(self) -> np.ndarray:
+        return self.data.to_numpy()
+
+    def get_index(self, column:str) -> int:
+        return self.data.columns.get_loc(column)
 
 class PrincipalComponentAnalisys:
     def __init__(self, data:np.ndarray) -> None:
diff --git a/src/learning/ml.py b/src/learning/ml.py
new file mode 100644
index 0000000..f6f543c
--- /dev/null
+++ b/src/learning/ml.py
@@ -0,0 +1,41 @@
+from abc import ABC, abstractmethod
+from learning.data import Dataset
+
+import numpy as np
+
+
+class MLAlgorithm(ABC):
+
+    dataset: Dataset
+    testset: np.ndarray
+    learnset: np.ndarray
+
+    def _set_dataset(self, dataset:Dataset, split:float=0.2):
+        ndarray = dataset.shuffle().as_ndarray()
+        split = int(ndarray.shape[0] * split)
+
+        self.dataset = dataset
+        self.testset = ndarray[:split]  # hold out the first `split` fraction for testing
+        self.learnset = ndarray[split:]  # train on the remaining rows
+
+    def _split_data_target(self, dset:np.ndarray) -> tuple[np.ndarray, np.ndarray, int]:
+        x = np.delete(dset, 0, 1)
+        y = dset[:, 0]
+        m = dset.shape[0]
+        return (x, y, m)
+
+    def learn(self, times:int) -> tuple[list, list]:
+        train = []
+        test = []
+        for _ in range(0, max(1, times)):
+            train.append(self.learning_step())
+            test.append(self.test_error())
+        return (train, test)
+
+    @abstractmethod
+    def learning_step(self) -> float:
+        pass
+
+    @abstractmethod
+    def test_error(self) -> float:
+        pass
diff --git a/src/learning/supervised.py b/src/learning/supervised.py
new file mode 100644
index 0000000..e451bce
--- /dev/null
+++ b/src/learning/supervised.py
@@ -0,0 +1,29 @@
+import math as math
+import numpy as np
+
+from learning.ml import MLAlgorithm
+from learning.data import Dataset
+
+class LinearRegression(MLAlgorithm):
+    def __init__(self, dataset:Dataset, learning_rate:float=0.1) -> None:
+        self._set_dataset(dataset)
+
+        parameters = dataset.data.shape[1] - 1 #removing the result
+        self.theta = np.random.rand(parameters)
+        self.alpha = max(0, learning_rate)
+
+    def learning_step(self) -> float:
+        theta = self.theta
+        alpha = self.alpha
+        x, y, m = self._split_data_target(self.learnset)
+
+        self.theta -= alpha * (1/m) * np.sum((x.dot(theta) - y) * x.T, axis=1)
+        return self._error(x, y, m)
+
+    def test_error(self) -> float:
+        x, y, m = self._split_data_target(self.testset)
+        return self._error(x, y, m)
+
+    def _error(self, x:np.ndarray, y:np.ndarray, m:int) -> float:
+        diff = (x.dot(self.theta) - y)
+        return 1/(2*m) * np.sum(diff ** 2)
diff --git a/src/linear_regression.py b/src/linear_regression.py
deleted file mode 100644
index bc5c105..0000000
--- a/src/linear_regression.py
+++ /dev/null
@@ -1,83 +0,0 @@
-import math as math
-import numpy as np
-import matplotlib.pyplot as plt
-
-from data import Dataset
-
-
-class LinearRegression:
-    def __init__(self, dataset:Dataset, learning_rate:float=0.1) -> None:
-        ndarray = dataset.shuffle().as_ndarray()
-        parameters = ndarray.shape[1] - 1 #removing the result
-
-        split = int(ndarray.shape[0] * 0.2)
-        self.testset = ndarray[split:]
-        self.trainingset = ndarray[:split]
-
-        self.theta = np.random.rand(parameters)
-        self.alpha = max(0, learning_rate)
-
-    def learn(self, times:int) -> list:
-        train = []
-        test = []
-        for _ in range(0, max(1, times)):
-            train.append(self.learning_step())
-            test.append(self.test_error())
-        return (train, test)
-
-    def learning_step(self) -> float:
-        theta = self.theta
-        alpha = self.alpha
-        x = np.delete(self.trainingset, 0, 1)
-        y = self.trainingset[:, 0]
-        m = self.trainingset.shape[0]
-
-        diff = (x.dot(theta) - y)
-        sum = np.sum(diff * x.T, axis=1)
-        theta -= alpha * (1/m) * sum
-        self.theta = theta
-        return self._error(x, y, m)
-
-    def test_error(self) -> float:
-        x = np.delete(self.testset, 0, 1)
-        y = self.testset[:, 0]
-        m = self.testset.shape[0]
-        return self._error(x, y, m)
-
-    def _error(self, x:np.ndarray, y:np.ndarray, m:int) -> float:
-        diff = (x.dot(self.theta) - y)
-        return 1/(2*m) * np.sum(diff ** 2)
-
-def auto_mpg(epoch:int):
-    df = Dataset("datasets\\auto-mpg.csv", "MPG")
-
-    df.to_numbers(["HP"])
-    df.handle_na()
-    df.regularize(excepts=["Cylinders","Year","Origin"])
-
-    lr = LinearRegression(df, learning_rate=0.0001)
-    return lr.learn(epoch)
-
-def automobile(epoch:int):
-    df = Dataset("datasets\\regression\\automobile.csv", "symboling")
-
-    attributes_to_modify = ["fuel-system", "engine-type", "drive-wheels", "body-style", "make", "engine-location", "aspiration", "fuel-type", "num-of-cylinders", "num-of-doors"]
-    df.factorize(attributes_to_modify)
-    df.to_numbers()
-    df.handle_na()
-    df.regularize(excepts=attributes_to_modify)
-
-    lr = LinearRegression(df, learning_rate=0.001)
-    return lr.learn(epoch)
-
-
-if __name__ == '__main__':
-    epoch = 10000
-    skip = - int(epoch * 0.9)
-    err_train, err_test = auto_mpg(epoch)
-    plt.title("Error")
-    plt.xlabel("Time")
-    plt.ylabel("Mean Error")
-    plt.plot(err_train[skip:-1], color="red")
-    plt.plot(err_test[skip:-1], color="blue")
-    plt.show()
diff --git a/src/plot.py b/src/plot.py
new file mode 100644
index 0000000..329a28f
--- /dev/null
+++ b/src/plot.py
@@ -0,0 +1,63 @@
+import matplotlib.pyplot as plt
+from typing_extensions import Self
+
+class Plot:
+    def __init__(self, title:str, labelx:str, labely:str) -> None:
+        plt.title(title)
+        plt.xlabel(labelx)
+        plt.ylabel(labely)
+        plt.ion()
+        plt.show(block=False)
+
+        self.data = dict()
+
+    def wait(self) -> Self:
+        plt.ioff()
+        plt.show()
+        return self
+
+    def scatter(self, label:str, datax:list[float], datay:list[float], color:str) -> Self:
+        plt.scatter(datax, datay, color=color, label=label)
+        return self
+
+    def line(self, label:str, color:str, data:list[float]=[], max_length:int=100) -> Self:
+        line, = plt.plot(data if len(data) > 0 else [0], label=label, color=color)
+        x = [] if len(data) == 0 else [*range(len(data))]
+
+        self.data[label] = (line, list(data), x, max_length)  # copy: update() mutates, never touch caller's list or shared default
+        plt.legend()
+        return self
+
+    def update(self, label:str, newdata:float) -> Self:
+        line, datay, datax, max_length = self.data[label]
+
+        x = 0 if len(datax) == 0 else datax[-1]
+        datax.append(x+1)
+        datay.append(newdata)
+
+        remove = len(datax) - max_length
+        if remove > 0:
+            del datax[:remove]
+            del datay[:remove]
+
+        line.set_data((datax, datay))
+        return self
+
+    def update_limits(self) -> Self:
+        if not bool(plt.get_fignums()): raise Exception("plot closed!")
+        limy_top = 0.1
+        limx_top, limx_bot = (0, 100000000000000000)
+
+        for val in self.data:
+            _, datay, datax, _ = self.data[val]
+            limy_top = max(max(datay), limy_top)
+            limx_top = max(max(datax), limx_top)
+            limx_bot = min(min(datax), limx_bot)
+            if limx_top == limx_bot: limx_top += 1
+
+        plt.xlim(limx_bot, limx_top)
+        plt.ylim(0, limy_top)
+        plt.draw()
+        plt.pause(0.0000000001)
+        return self
+