Refactor Dataset

- better finalize function
- support for one-hot-encoding
This commit is contained in:
2024-05-02 14:19:23 +02:00
parent 969338196b
commit 3a4e07afc8
4 changed files with 118 additions and 98 deletions

View File

@@ -1,43 +1,43 @@
from learning.data import Dataset
from learning.supervised import LinearRegression, LogisticRegression, MultiLogisticRegression
from learning.data import Dataset, TargetType
from learning.supervised import LinearRegression, LogisticRegression, MultiLayerPerceptron
from learning.ml import MLAlgorithm
from typing import Callable
def auto_mpg() -> tuple[int, MLAlgorithm]:
ds = Dataset("datasets\\auto-mpg.csv", "MPG")
ds = Dataset("datasets\\auto-mpg.csv", "MPG", TargetType.Regression)
ds.to_numbers(["HP"])
ds.numbers(["HP"])
ds.handle_na()
ds.regularize(excepts=["Cylinders","Year","Origin"])
return (1000, LinearRegression(ds.as_ndarray(), learning_rate=0.0001))
ds.normalize(excepts=["Cylinders","Year","Origin"])
return (1000, LinearRegression(ds, learning_rate=0.0001))
def automobile() -> tuple[int, MLAlgorithm]:
ds = Dataset("datasets\\regression\\automobile.csv", "symboling")
ds = Dataset("datasets\\regression\\automobile.csv", "symboling", TargetType.Regression)
attributes_to_modify = ["fuel-system", "engine-type", "drive-wheels", "body-style", "make", "engine-location", "aspiration", "fuel-type", "num-of-cylinders", "num-of-doors"]
ds.factorize(attributes_to_modify)
ds.to_numbers(["normalized-losses", "bore", "stroke", "horsepower", "peak-rpm", "price"])
ds.numbers(["normalized-losses", "bore", "stroke", "horsepower", "peak-rpm", "price"])
ds.handle_na()
ds.regularize(excepts=attributes_to_modify)
return (1000, LinearRegression(ds.as_ndarray(), learning_rate=0.004))
ds.normalize(excepts=attributes_to_modify)
return (1000, LinearRegression(ds, learning_rate=0.004))
def power_plant() -> tuple[int, MLAlgorithm]:
ds = Dataset("datasets\\regression\\power-plant.csv", "energy-output")
ds.regularize()
return (80, LinearRegression(ds.as_ndarray(), learning_rate=0.1))
ds = Dataset("datasets\\regression\\power-plant.csv", "energy-output", TargetType.Regression)
ds.normalize()
return (80, LinearRegression(ds, learning_rate=0.1))
def electrical_grid() -> tuple[int, MLAlgorithm]:
ds = Dataset("datasets\\classification\\electrical_grid.csv", "stabf")
ds = Dataset("datasets\\classification\\electrical_grid.csv", "stabf", TargetType.Classification)
ds.factorize(["stabf"])
ds.regularize()
return (1000, LogisticRegression(ds.as_ndarray(), learning_rate=0.08))
ds.normalize()
return (1000, LogisticRegression(ds, learning_rate=0.08))
def frogs() -> tuple[int, MLAlgorithm]:
ds = Dataset("datasets\\classification\\frogs.csv", "Species")
ds = Dataset("datasets\\classification\\frogs.csv", "Species", TargetType.MultiClassification)
ds.remove(["Family", "Genus", "RecordID"])
ds.factorize(["Species"])
return (1000, MultiLogisticRegression(ds.as_ndarray(), learning_rate=0.08))
return (1000, MultiLayerPerceptron(ds, learning_rate=0.08))
@@ -55,5 +55,5 @@ def learn_dataset(function:Callable[..., tuple[int, MLAlgorithm]], epochs:int=10
return ml
if __name__ == "__main__":
ml = learn_dataset(electrical_grid)
ml = learn_dataset(automobile)
print(ml.accuracy(ml.testset))

View File

@@ -1,33 +1,53 @@
import pandas as pd
import numpy as np
from enum import Enum
from typing_extensions import Self
class TargetType(Enum):
    """Kind of target column a Dataset is built around."""

    Regression = 1
    Classification = 2
    # Dataset.get_dataset one-hot encodes the target column for this kind.
    MultiClassification = 3
    # Dataset.get_dataset hands out None instead of a target array for this kind.
    NoTarget = 4
class Data:
    """Feature matrix ``x`` bundled with its target values ``y``.

    ``size`` is the number of rows of ``x`` and ``param`` the number of
    columns; both are read from ``x.shape`` when the object is built.
    """

    x: np.ndarray
    # None when the producing dataset has no target (TargetType.NoTarget).
    y: "np.ndarray | None"
    size: int
    param: int

    def __init__(self, x:np.ndarray, y:"np.ndarray | None") -> None:
        """Store ``x`` and ``y`` as given (no copy) and cache the shape of ``x``.

        Raises:
            IndexError: if ``x`` has fewer than two dimensions.
        """
        self.x = x
        self.y = y
        self.size = x.shape[0]
        self.param = x.shape[1]

    def __str__(self) -> str:
        """Return the printed form of ``x`` and ``y`` on two labelled lines."""
        return f"X: {self.x!s}\nY: {self.y!s}"

    def as_tuple(self) -> "tuple[np.ndarray, np.ndarray | None, int, int]":
        """Return ``(x, y, size, param)`` for convenient unpacking."""
        return (self.x, self.y, self.size, self.param)
class Dataset:
def __init__(self, csv:str, target:str, classification:bool=None) -> None:
data = pd.read_csv(csv)
data: pd.DataFrame
target: str
target_type: TargetType
def __init__(self, csv:str, target:str, target_type:TargetType) -> None:
self.original = pd.read_csv(csv)
self.data = self.original
self.target = target
self.target_type = target_type
# move target to the start
col_target = data.pop(target)
data.insert(0, target, col_target)
data.insert(1, "Bias", 1.0)
if classification == None:
classification = (data[target].dtype == object)
self.original = data
self.data = data
self.target = target
self.classification = classification
col_target = self.data.pop(target)
self.data.insert(0, target, col_target)
def remove(self, columns:list[str]) -> Self:
for col in columns:
self.data.pop(col)
return self
def regularize(self, excepts:list[str]=[]) -> Self:
def normalize(self, excepts:list[str]=[]) -> Self:
excepts.append(self.target)
excepts.append("Bias")
for col in self.data:
if col not in excepts:
index = self.data.columns.get_loc(col)
@@ -42,7 +62,7 @@ class Dataset:
data[col] = pd.factorize(data[col])[0]
return self
def to_numbers(self, columns:list[str]=[]) -> Self:
def numbers(self, columns:list[str]=[]) -> Self:
data = self.data
for col in columns:
if data[col].dtype == object:
@@ -53,34 +73,38 @@ class Dataset:
self.data = self.data.dropna()
return self
def shuffle(self) -> Self:
self.data = self.data.sample(frac=1)
return self
def get_dataset(self, test_frac:float=0.15, valid_frac:float=0.15) -> tuple[Data, Data, Data]:
data = self.data.to_numpy()
data = np.insert(data, 1, 1, axis=1) # adding bias
np.random.shuffle(data)
def as_ndarray(self) -> np.ndarray:
return self.data.to_numpy()
def get_index(self, column:str) -> int:
return self.data.columns.get_loc(column)
class PrincipalComponentAnalisys:
def __init__(self, data:np.ndarray) -> None:
self.data = data
def reduce(self, total:int=0, threshold:float=1) -> Self:
columns = self.data.shape[1]
if total > columns or total <= 0:
total = columns
if threshold <= 0 or threshold > 1:
threshold = 1
total = data.shape[0]
valid_cutoff = int(total * valid_frac)
test_cutoff = int(total * test_frac) + valid_cutoff
valid = data[:valid_cutoff]
test = data[valid_cutoff:test_cutoff]
learn = data[test_cutoff:]
l = []
for ds in [learn, test, valid]:
target = ds[:, 0] if self.target_type != TargetType.NoTarget else None
ds = ds[:, 1:]
if self.target_type == TargetType.MultiClassification:
target = target.astype(int)
uniques = np.unique(target).shape[0]
target = np.eye(uniques)[target]
l.append(Data(ds, target))
return l
if __name__ == "__main__":
df = Dataset("datasets\\regression\\automobile.csv", "symboling")
attributes_to_modify = ["fuel-system", "engine-type", "drive-wheels", "body-style", "make", "engine-location", "aspiration", "fuel-type", "num-of-cylinders", "num-of-doors"]
df.factorize(attributes_to_modify)
df.to_numbers(["normalized-losses", "bore", "stroke", "horsepower", "peak-rpm", "price"])
df.handle_na()
df.regularize(excepts=attributes_to_modify)
print(df.data.dtypes)
ds = Dataset("datasets\\classification\\frogs.csv", "Species", TargetType.MultiClassification)
ds.remove(["Family", "Genus", "RecordID"])
ds.factorize(["Species"])
np.random.seed(0)
learn, test, valid = ds.get_dataset()
print(learn)
print(test)
print(valid)

View File

@@ -1,33 +1,25 @@
from abc import ABC, abstractmethod
from plot import Plot
from tqdm import tqdm
from learning.data import Dataset, Data
import pandas as pd
import numpy as np
class MLAlgorithm(ABC):
""" Classe generica per gli algoritmi di Machine Learning """
testset: np.ndarray
learnset: np.ndarray
learnset: Data
validset: Data
testset: Data
_learn_loss: list[float]
_valid_loss: list[float]
_train_loss: list[float]
def _set_dataset(self, dataset:np.ndarray, split:float=0.2):
splitT = int(dataset.shape[0] * split)
splitV = int(splitT / 2)
np.random.shuffle(dataset)
self.validset = dataset[:splitV]
self.testset = dataset[splitV:splitT]
self.learnset = dataset[splitT:]
def _split_data_target(self, dset:np.ndarray) -> tuple[np.ndarray, np.ndarray, int]:
x = np.delete(dset, 0, 1)
y = dset[:, 0]
m = dset.shape[0]
return (x, y, m)
def __init__(self, dataset:Dataset) -> None:
    """Split ``dataset`` and keep the learning, test and validation parts.

    ``get_dataset`` is called with 0.2 for both of its fractions and its
    three results are stored, in order, as learnset, testset and validset.
    """
    self.learnset, self.testset, self.validset = dataset.get_dataset(0.2, 0.2)
def learn(self, epochs:int, early_stop:float=0.0000001, max_patience:int=10, verbose:bool=False) -> tuple[int, list, list]:
learn = []
@@ -56,7 +48,7 @@ class MLAlgorithm(ABC):
except KeyboardInterrupt: pass
if verbose: print(f"Loop ended after {count} epochs")
self._train_loss = learn
self._learn_loss = learn
self._valid_loss = valid
return (count, learn, valid)
@@ -70,24 +62,23 @@ class MLAlgorithm(ABC):
return self.predict_loss(self.testset)
def plot(self, skip:int=1000) -> None:
skip = skip if len(self._train_loss) > skip else 0
skip = skip if len(self._learn_loss) > skip else 0
plot = Plot("Loss", "Time", "Mean Loss")
plot.line("training", "blue", data=self._train_loss[skip:])
plot.line("training", "blue", data=self._learn_loss[skip:])
plot.line("validation", "red", data=self._valid_loss[skip:])
plot.wait()
def confusion_matrix(self, dataset:np.ndarray) -> np.ndarray:
x, y, _ = self._split_data_target(dataset)
h0 = np.where(self._h0(x) > 0.5, 1, 0)
def confusion_matrix(self, dataset:Data) -> np.ndarray:
h0 = np.where(self._h0(dataset.x) > 0.5, 1, 0)
classes = len(np.unique(y))
classes = len(np.unique(dataset.y))
conf_matrix = np.zeros((classes, classes), dtype=int)
for actual, prediction in zip(y, h0):
for actual, prediction in zip(dataset.y, h0):
conf_matrix[int(actual), int(prediction)] += 1
return conf_matrix
def accuracy(self, dataset:np.ndarray) -> np.ndarray:
def accuracy(self, dataset:Data) -> float:
conf = self.confusion_matrix(dataset)
correct = np.sum(np.diagonal(conf))
total = np.sum(conf)
@@ -98,7 +89,7 @@ class MLAlgorithm(ABC):
@abstractmethod
def learning_step(self) -> float: pass
@abstractmethod
def predict_loss(self, dataset:np.ndarray) -> float: pass
def predict_loss(self, dataset:Data) -> float: pass
@abstractmethod
def get_parameters(self): pass
@abstractmethod

View File

@@ -3,27 +3,25 @@ import numpy as np
from abc import abstractmethod
from learning.ml import MLAlgorithm
from learning.data import Dataset, Data
class GradientDescent(MLAlgorithm):
theta:np.ndarray
alpha:float
def __init__(self, dataset:np.ndarray, learning_rate:float=0.1) -> None:
self._set_dataset(dataset)
parameters = dataset.shape[1] - 1 #removing the result
self.theta = np.random.rand(parameters)
def __init__(self, dataset:Dataset, learning_rate:float=0.1) -> None:
    """Split the dataset and initialise the gradient-descent state.

    Args:
        dataset: source of the learning/validation/test sets, split by the
            MLAlgorithm constructor.
        learning_rate: step size stored as ``alpha``; negative values are
            clamped to 0.
    """
    # Must go through super(): self.__init__(dataset) would call this very
    # constructor again (learning_rate defaulted) and recurse until RecursionError.
    super().__init__(dataset)
    # One random initial weight in [0, 1) per feature column of the learning set.
    self.theta = np.random.rand(self.learnset.param)
    self.alpha = max(0, learning_rate)
def learning_step(self) -> float:
x, y, m = self._split_data_target(self.learnset)
x, y, m, _ = self.learnset.as_tuple()
self.theta -= self.alpha * (1/m) * np.sum((self._h0(x) - y) * x.T, axis=1)
return self._loss(x, y, m)
def predict_loss(self, dataset:np.ndarray) -> float:
x, y, m = self._split_data_target(dataset)
return self._loss(x, y, m)
def predict_loss(self, dataset:Data) -> float:
    """Return the loss of the current parameters on ``dataset``.

    The features, targets and sample count of ``dataset`` are passed to
    ``self._loss`` unchanged.
    """
    features, targets, samples, _ = dataset.as_tuple()
    return self._loss(features, targets, samples)
def get_parameters(self):
    """Return an independent copy of the current weight vector ``theta``."""
    snapshot = self.theta.copy()
    return snapshot
@@ -51,3 +49,10 @@ class LogisticRegression(GradientDescent):
h0 = self._h0(x)
diff = -y*np.log(h0) -(1-y)*np.log(1-h0)
return 1/m * np.sum(diff)
class MultiLayerPerceptron(MLAlgorithm):
    """Multi-layer perceptron built on the dataset handling of MLAlgorithm."""

    # Declared here but not assigned anywhere in this constructor.
    neurons: list[np.ndarray]

    def __init__(self, dataset:Dataset, layers:list[int]=[4,3]) -> None:
        """Split ``dataset`` through the MLAlgorithm constructor.

        Args:
            dataset: source of the learning/validation/test sets.
            layers: accepted but not used by this constructor yet.
        """
        # NOTE(review): the default list is shared between calls; that is
        # harmless only as long as it is never mutated.
        # Must go through super(): self.__init__(dataset) would call this very
        # constructor again and recurse until RecursionError.
        super().__init__(dataset)