Merge branch 'main' into 6-telegram-interface

This commit is contained in:
2025-10-08 17:37:51 +02:00
44 changed files with 883 additions and 677 deletions

View File

@@ -0,0 +1,5 @@
from app.utils.market_aggregation import aggregate_history_prices, aggregate_product_info
from app.utils.wrapper_handler import WrapperHandler
from app.utils.chat_manager import ChatManager
__all__ = ["aggregate_history_prices", "aggregate_product_info", "WrapperHandler", "ChatManager"]

View File

@@ -0,0 +1,58 @@
import json
import os
class ChatManager:
"""
Gestisce la conversazione con la Pipeline:
- mantiene lo storico dei messaggi
- invoca la Pipeline per generare risposte
- salva e ricarica le chat
"""
def __init__(self):
self.history: list[dict[str, str]] = [] # [{"role": "user"/"assistant", "content": "..."}]
def send_message(self, message: str) -> None:
"""
Aggiunge un messaggio utente, chiama la Pipeline e salva la risposta nello storico.
"""
# Aggiungi messaggio utente allo storico
self.history.append({"role": "user", "content": message})
def receive_message(self, response: str) -> str:
"""
Riceve un messaggio dalla pipeline e lo aggiunge allo storico.
"""
# Aggiungi risposta assistente allo storico
self.history.append({"role": "assistant", "content": response})
return response
def save_chat(self, filename: str = "chat.json") -> None:
"""
Salva la chat corrente in src/saves/<filename>.
"""
with open(filename, "w", encoding="utf-8") as f:
json.dump(self.history, f, ensure_ascii=False, indent=2)
def load_chat(self, filename: str = "chat.json") -> None:
"""
Carica una chat salvata da src/saves/<filename>.
"""
if not os.path.exists(filename):
self.history = []
return
with open(filename, "r", encoding="utf-8") as f:
self.history = json.load(f)
def reset_chat(self) -> None:
"""
Resetta lo storico della chat.
"""
self.history = []
def get_history(self) -> list[dict[str, str]]:
"""
Restituisce lo storico completo della chat.
"""
return self.history

View File

@@ -1,28 +1,27 @@
import statistics
from app.markets.base import ProductInfo, Price
from app.base.markets import ProductInfo, Price
def aggregate_history_prices(prices: dict[str, list[Price]]) -> list[Price]:
"""
Aggrega i prezzi storici per symbol calcolando la media oraria.
Aggrega i prezzi storici per symbol calcolando la media.
Args:
prices (dict[str, list[Price]]): Mappa provider -> lista di Price
Returns:
list[Price]: Lista di Price aggregati per ora
list[Price]: Lista di Price aggregati per timestamp
"""
# Costruiamo una mappa timestamp_h -> lista di Price
timestamped_prices: dict[int, list[Price]] = {}
# Costruiamo una mappa timestamp -> lista di Price
timestamped_prices: dict[str, list[Price]] = {}
for _, price_list in prices.items():
for price in price_list:
time = price.timestamp_ms - (price.timestamp_ms % 3600000) # arrotonda all'ora (non dovrebbe essere necessario)
timestamped_prices.setdefault(time, []).append(price)
timestamped_prices.setdefault(price.timestamp, []).append(price)
# Ora aggregiamo i prezzi per ogni ora
aggregated_prices = []
# Ora aggregiamo i prezzi per ogni timestamp
aggregated_prices: list[Price] = []
for time, price_list in timestamped_prices.items():
price = Price()
price.timestamp_ms = time
price.timestamp = time
price.high = statistics.mean([p.high for p in price_list])
price.low = statistics.mean([p.low for p in price_list])
price.open = statistics.mean([p.open for p in price_list])
@@ -47,14 +46,13 @@ def aggregate_product_info(products: dict[str, list[ProductInfo]]) -> list[Produ
symbols_infos.setdefault(product.symbol, []).append(product)
# Aggregazione per ogni symbol
sources = list(products.keys())
aggregated_products = []
aggregated_products: list[ProductInfo] = []
for symbol, product_list in symbols_infos.items():
product = ProductInfo()
product.id = f"{symbol}_AGGREGATED"
product.symbol = symbol
product.quote_currency = next(p.quote_currency for p in product_list if p.quote_currency)
product.currency = next(p.currency for p in product_list if p.currency)
volume_sum = sum(p.volume_24h for p in product_list)
product.volume_24h = volume_sum / len(product_list) if product_list else 0.0
@@ -65,27 +63,3 @@ def aggregate_product_info(products: dict[str, list[ProductInfo]]) -> list[Produ
aggregated_products.append(product)
return aggregated_products
def _calculate_confidence(products: list[ProductInfo], sources: list[str]) -> float:
"""Calcola un punteggio di confidenza 0-1"""
if not products:
return 0.0
score = 1.0
# Riduci score se pochi dati
if len(products) < 2:
score *= 0.7
# Riduci score se prezzi troppo diversi
prices = [p.price for p in products if p.price > 0]
if len(prices) > 1:
price_std = (max(prices) - min(prices)) / statistics.mean(prices)
if price_std > 0.05: # >5% variazione
score *= 0.8
# Riduci score se fonti sconosciute
unknown_sources = sum(1 for s in sources if s == "unknown")
if unknown_sources > 0:
score *= (1 - unknown_sources / len(sources))
return max(0.0, min(1.0, score))

View File

@@ -1,13 +1,15 @@
import inspect
import time
import traceback
from typing import TypeVar, Callable, Generic, Iterable, Type
from agno.utils.log import log_warning, log_info
from typing import Any, Callable, Generic, TypeVar
from agno.utils.log import log_info, log_warning #type: ignore
W = TypeVar("W")
T = TypeVar("T")
WrapperType = TypeVar("WrapperType")
WrapperClassType = TypeVar("WrapperClassType")
OutputType = TypeVar("OutputType")
class WrapperHandler(Generic[W]):
class WrapperHandler(Generic[WrapperType]):
"""
A handler for managing multiple wrappers with retry logic.
It attempts to call a function on the current wrapper, and if it fails,
@@ -17,7 +19,7 @@ class WrapperHandler(Generic[W]):
Note: use `build_wrappers` to create an instance of this class for better error handling.
"""
def __init__(self, wrappers: list[W], try_per_wrapper: int = 3, retry_delay: int = 2):
def __init__(self, wrappers: list[WrapperType], try_per_wrapper: int = 3, retry_delay: int = 2):
"""
Initializes the WrapperHandler with a list of wrappers and retry settings.\n
Use `build_wrappers` to create an instance of this class for better error handling.
@@ -32,9 +34,8 @@ class WrapperHandler(Generic[W]):
self.retry_per_wrapper = try_per_wrapper
self.retry_delay = retry_delay
self.index = 0
self.retry_count = 0
def try_call(self, func: Callable[[W], T]) -> T:
def try_call(self, func: Callable[[WrapperType], OutputType]) -> OutputType:
"""
Attempts to call the provided function on the current wrapper.
If it fails, it retries a specified number of times before switching to the next wrapper.
@@ -46,35 +47,9 @@ class WrapperHandler(Generic[W]):
Raises:
Exception: If all wrappers fail after retries.
"""
log_info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
return self.__try_call(func, try_all=False).popitem()[1]
iterations = 0
while iterations < len(self.wrappers):
wrapper = self.wrappers[self.index]
wrapper_name = wrapper.__class__.__name__
try:
log_info(f"try_call {wrapper_name}")
result = func(wrapper)
log_info(f"{wrapper_name} succeeded")
self.retry_count = 0
return result
except Exception as e:
self.retry_count += 1
error = WrapperHandler.__concise_error(e)
log_warning(f"{wrapper_name} failed {self.retry_count}/{self.retry_per_wrapper}: {error}")
if self.retry_count >= self.retry_per_wrapper:
self.index = (self.index + 1) % len(self.wrappers)
self.retry_count = 0
iterations += 1
else:
time.sleep(self.retry_delay)
raise Exception(f"All wrappers failed, latest error: {error}")
def try_call_all(self, func: Callable[[W], T]) -> dict[str, T]:
def try_call_all(self, func: Callable[[WrapperType], OutputType]) -> dict[str, OutputType]:
"""
Calls the provided function on all wrappers, collecting results.
If a wrapper fails, it logs a warning and continues with the next.
@@ -86,24 +61,57 @@ class WrapperHandler(Generic[W]):
Raises:
Exception: If all wrappers fail.
"""
log_info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
return self.__try_call(func, try_all=True)
results = {}
for wrapper in self.wrappers:
def __try_call(self, func: Callable[[WrapperType], OutputType], try_all: bool) -> dict[str, OutputType]:
"""
Internal method to handle the logic of trying to call a function on wrappers.
It can either stop at the first success or try all wrappers.
Args:
func (Callable[[W], T]): A function that takes a wrapper and returns a result.
try_all (bool): If True, tries all wrappers and collects results; if False, stops at the first success.
Returns:
dict[str, T]: A dictionary mapping wrapper class names to results.
Raises:
Exception: If all wrappers fail after retries.
"""
log_info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
results: dict[str, OutputType] = {}
starting_index = self.index
for i in range(starting_index, len(self.wrappers) + starting_index):
self.index = i % len(self.wrappers)
wrapper = self.wrappers[self.index]
wrapper_name = wrapper.__class__.__name__
try:
result = func(wrapper)
log_info(f"{wrapper_name} succeeded")
results[wrapper.__class__] = result
except Exception as e:
error = WrapperHandler.__concise_error(e)
log_warning(f"{wrapper_name} failed: {error}")
if not try_all:
log_info(f"try_call {wrapper_name}")
for try_count in range(1, self.retry_per_wrapper + 1):
try:
result = func(wrapper)
log_info(f"{wrapper_name} succeeded")
results[wrapper_name] = result
break
except Exception as e:
error = WrapperHandler.__concise_error(e)
log_warning(f"{wrapper_name} failed {try_count}/{self.retry_per_wrapper}: {error}")
time.sleep(self.retry_delay)
if not try_all and results:
return results
if not results:
error = locals().get("error", "Unknown error")
raise Exception(f"All wrappers failed, latest error: {error}")
self.index = starting_index
return results
@staticmethod
def __check(wrappers: list[W]) -> bool:
def __check(wrappers: list[Any]) -> bool:
return all(w.__class__ is type for w in wrappers)
@staticmethod
@@ -112,13 +120,13 @@ class WrapperHandler(Generic[W]):
return f"{e} [\"{last_frame.filename}\", line {last_frame.lineno}]"
@staticmethod
def build_wrappers(constructors: Iterable[Type[W]], try_per_wrapper: int = 3, retry_delay: int = 2, kwargs: dict | None = None) -> 'WrapperHandler[W]':
def build_wrappers(constructors: list[type[WrapperClassType]], try_per_wrapper: int = 3, retry_delay: int = 2, kwargs: dict[str, Any] | None = None) -> 'WrapperHandler[WrapperClassType]':
"""
Builds a WrapperHandler instance with the given wrapper constructors.
It attempts to initialize each wrapper and logs a warning if any cannot be initialized.
Only successfully initialized wrappers are included in the handler.
Args:
constructors (Iterable[Type[W]]): An iterable of wrapper classes to instantiate. e.g. [WrapperA, WrapperB]
constructors (list[type[W]]): An iterable of wrapper classes to instantiate. e.g. [WrapperA, WrapperB]
try_per_wrapper (int): Number of retries per wrapper before switching to the next.
retry_delay (int): Delay in seconds between retries.
kwargs (dict | None): Optional dictionary with keyword arguments common to all wrappers.
@@ -129,7 +137,7 @@ class WrapperHandler(Generic[W]):
"""
assert WrapperHandler.__check(constructors), f"All constructors must be classes. Received: {constructors}"
result = []
result: list[WrapperClassType] = []
for wrapper_class in constructors:
try:
wrapper = wrapper_class(**(kwargs or {}))