Project struct
This commit is contained in:
66
src/learning/data.py
Normal file
66
src/learning/data.py
Normal file
@@ -0,0 +1,66 @@
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
from typing_extensions import Self
|
||||
|
||||
class Dataset:
|
||||
def __init__(self, csv:str, target:str) -> None:
|
||||
data = pd.read_csv(csv)
|
||||
|
||||
# move target to the start
|
||||
col_target = data.pop(target)
|
||||
data.insert(0, target, col_target)
|
||||
data.insert(1, "Bias", 1.0)
|
||||
|
||||
self.original = data
|
||||
self.data = data
|
||||
self.target = target
|
||||
self.classification = (data[target].dtype == object)
|
||||
|
||||
def regularize(self, excepts:list=[]) -> Self:
|
||||
excepts.append(self.target)
|
||||
excepts.append("Bias")
|
||||
for col in self.data:
|
||||
if col not in excepts:
|
||||
dt = self.data[col]
|
||||
self.data[col] = (dt - dt.mean()) / dt.std()
|
||||
return self
|
||||
|
||||
def factorize(self, columns:list=[]) -> Self:
|
||||
data = self.data
|
||||
for col in columns:
|
||||
data[col] = pd.factorize(data[col])[0]
|
||||
return self
|
||||
|
||||
def to_numbers(self, columns:list=[]) -> Self:
|
||||
data = self.data
|
||||
for col in self.data.columns:
|
||||
if data[col].dtype == object:
|
||||
data[col] = pd.to_numeric(data[col], errors='coerce')
|
||||
return self
|
||||
|
||||
def handle_na(self) -> Self:
|
||||
self.data = self.data.dropna()
|
||||
return self
|
||||
|
||||
def shuffle(self) -> Self:
|
||||
self.data = self.data.sample(frac=1)
|
||||
return self
|
||||
|
||||
def as_ndarray(self) -> np.ndarray:
|
||||
return self.data.to_numpy()
|
||||
|
||||
def get_index(self, column:str) -> int:
|
||||
return self.data.columns.get_loc(column)
|
||||
|
||||
class PrincipalComponentAnalisys:
|
||||
def __init__(self, data:np.ndarray) -> None:
|
||||
self.data = data
|
||||
|
||||
def reduce(self, total:int=0, threshold:float=1) -> Self:
|
||||
columns = self.data.shape[1]
|
||||
if total > columns or total <= 0:
|
||||
total = columns
|
||||
if threshold <= 0 or threshold > 1:
|
||||
threshold = 1
|
||||
|
||||
41
src/learning/ml.py
Normal file
41
src/learning/ml.py
Normal file
@@ -0,0 +1,41 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from learning.data import Dataset
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
class MLAlgorithm(ABC):
|
||||
|
||||
dataset: Dataset
|
||||
testset: np.ndarray
|
||||
learnset: np.ndarray
|
||||
|
||||
def _set_dataset(self, dataset:Dataset, split:float=0.2):
|
||||
ndarray = dataset.shuffle().as_ndarray()
|
||||
split = int(ndarray.shape[0] * split)
|
||||
|
||||
self.dataset = dataset
|
||||
self.testset = ndarray[split:]
|
||||
self.learnset = ndarray[:split]
|
||||
|
||||
def _split_data_target(self, dset:np.ndarray) -> tuple[np.ndarray, np.ndarray, int]:
|
||||
x = np.delete(dset, 0, 1)
|
||||
y = dset[:, 0]
|
||||
m = dset.shape[0]
|
||||
return (x, y, m)
|
||||
|
||||
def learn(self, times:int) -> tuple[list, list]:
|
||||
train = []
|
||||
test = []
|
||||
for _ in range(0, max(1, times)):
|
||||
train.append(self.learning_step())
|
||||
test.append(self.test_error())
|
||||
return (train, test)
|
||||
|
||||
@abstractmethod
|
||||
def learning_step(self) -> float:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def test_error(self) -> float:
|
||||
pass
|
||||
29
src/learning/supervised.py
Normal file
29
src/learning/supervised.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import math as math
|
||||
import numpy as np
|
||||
|
||||
from ml import MLAlgorithm
|
||||
from learning.data import Dataset
|
||||
|
||||
class LinearRegression(MLAlgorithm):
|
||||
def __init__(self, dataset:Dataset, learning_rate:float=0.1) -> None:
|
||||
self._set_dataset(dataset)
|
||||
|
||||
parameters = dataset.data.shape[1] - 1 #removing the result
|
||||
self.theta = np.random.rand(parameters)
|
||||
self.alpha = max(0, learning_rate)
|
||||
|
||||
def learning_step(self) -> float:
|
||||
theta = self.theta
|
||||
alpha = self.alpha
|
||||
x, y, m = self._split_data_target(self.learnset)
|
||||
|
||||
self.theta -= alpha * (1/m) * np.sum((x.dot(theta) - y) * x.T, axis=1)
|
||||
return self._error(x, y, m)
|
||||
|
||||
def test_error(self) -> float:
|
||||
x, y, m = self._split_data_target(self.testset)
|
||||
return self._error(x, y, m)
|
||||
|
||||
def _error(self, x:np.ndarray, y:np.ndarray, m:int) -> float:
|
||||
diff = (x.dot(self.theta) - y)
|
||||
return 1/(2*m) * np.sum(diff ** 2)
|
||||
Reference in New Issue
Block a user