Fix socials timestamp #50

Merged
Berack96 merged 8 commits from fix-socials-timestamp into main 2025-10-27 12:45:40 +01:00
17 changed files with 407 additions and 1096 deletions
Showing only changes of commit 257f38adc5 - Show all commits

View File

@@ -8,9 +8,7 @@ try:
reasoning_agent = Agent(
model=Gemini(),
tools=[
ReasoningTools(),
],
tools=[ReasoningTools()],
instructions="Use tables to display data.",
markdown=True,
)

View File

@@ -0,0 +1,12 @@
from dotenv import load_dotenv
from app.api.tools import MarketAPIsTool
def main():
api = MarketAPIsTool()
prices_aggregated = api.get_historical_prices_aggregated("BTC", limit=5)
for price in prices_aggregated:
print(f"== [{price.timestamp}] {price.low:.2f} - {price.high:.2f} ==")
if __name__ == "__main__":
load_dotenv()
main()

View File

@@ -0,0 +1,16 @@
from dotenv import load_dotenv
from app.api.tools import NewsAPIsTool
def main():
api = NewsAPIsTool()
articles_aggregated = api.get_latest_news_aggregated(query="bitcoin", limit=2)
for provider, articles in articles_aggregated.items():
print("===================================")
print(f"Provider: {provider}")
for article in articles:
print(f"== [{article.time}] {article.title} ==")
print(f" {article.description}")
if __name__ == "__main__":
load_dotenv()
main()

View File

@@ -8,7 +8,7 @@ def main():
print("===================================")
print(f"Provider: {provider}")
for post in posts:
print(f"== [{post.timestamp}] - {post.title} ==")
print(f"== [{post.time}] - {post.title} ==")
print(f" {post.description}")
print(f" {len(post.comments)}")

View File

@@ -1,353 +0,0 @@
#!/usr/bin/env python3
"""
Demo Completo per Market Data Providers
========================================
Questo script dimostra l'utilizzo di tutti i wrapper che implementano BaseWrapper:
- CoinBaseWrapper (richiede credenziali)
- CryptoCompareWrapper (richiede API key)
- BinanceWrapper (richiede credenziali)
- PublicBinanceAgent (accesso pubblico)
- YFinanceWrapper (accesso gratuito a dati azionari e crypto)
Lo script effettua chiamate GET a diversi provider e visualizza i dati
in modo strutturato con informazioni dettagliate su timestamp, stato
delle richieste e formattazione tabellare.
"""
import sys
import os
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any
import traceback
# Aggiungi il path src al PYTHONPATH
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from dotenv import load_dotenv
from app.api.markets import (
CoinBaseWrapper,
CryptoCompareWrapper,
BinanceWrapper,
YFinanceWrapper,
MarketWrapper
)
# Carica variabili d'ambiente
load_dotenv()
class DemoFormatter:
"""Classe per formattare l'output del demo in modo strutturato."""
@staticmethod
def print_header(title: str, char: str = "=", width: int = 80):
"""Stampa un'intestazione formattata."""
print(f"\n{char * width}")
print(f"{title:^{width}}")
print(f"{char * width}")
@staticmethod
def print_subheader(title: str, char: str = "-", width: int = 60):
"""Stampa una sotto-intestazione formattata."""
print(f"\n{char * width}")
print(f" {title}")
print(f"{char * width}")
@staticmethod
def print_request_info(provider_name: str, method: str, timestamp: datetime,
status: str, error: Optional[str] = None):
"""Stampa informazioni sulla richiesta."""
print(f"🕒 Timestamp: {timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"🏷️ Provider: {provider_name}")
print(f"🔧 Method: {method}")
print(f"📊 Status: {status}")
if error:
print(f"❌ Error: {error}")
print()
@staticmethod
def print_product_table(products: List[Any], title: str = "Products"):
"""Stampa una tabella di prodotti."""
if not products:
print(f"📋 {title}: Nessun prodotto trovato")
return
print(f"📋 {title} ({len(products)} items):")
print(f"{'Symbol':<15} {'ID':<20} {'Price':<12} {'Quote':<10} {'Status':<10}")
print("-" * 67)
for product in products[:10]: # Mostra solo i primi 10
symbol = getattr(product, 'symbol', 'N/A')
product_id = getattr(product, 'id', 'N/A')
price = getattr(product, 'price', 0.0)
quote = getattr(product, 'quote_currency', 'N/A')
status = getattr(product, 'status', 'N/A')
# Tronca l'ID se troppo lungo
if len(product_id) > 18:
product_id = product_id[:15] + "..."
price_str = f"${price:.2f}" if price > 0 else "N/A"
print(f"{symbol:<15} {product_id:<20} {price_str:<12} {quote:<10} {status:<10}")
if len(products) > 10:
print(f"... e altri {len(products) - 10} prodotti")
print()
@staticmethod
def print_prices_table(prices: List[Any], title: str = "Historical Prices"):
"""Stampa una tabella di prezzi storici."""
if not prices:
print(f"💰 {title}: Nessun prezzo trovato")
return
print(f"💰 {title} ({len(prices)} entries):")
print(f"{'Time':<12} {'Open':<12} {'High':<12} {'Low':<12} {'Close':<12} {'Volume':<15}")
print("-" * 75)
for price in prices[:5]: # Mostra solo i primi 5
time_str = getattr(price, 'time', 'N/A')
# Il time è già una stringa, non serve strftime
if len(time_str) > 10:
time_str = time_str[:10] # Tronca se troppo lungo
open_price = f"${getattr(price, 'open', 0):.2f}"
high_price = f"${getattr(price, 'high', 0):.2f}"
low_price = f"${getattr(price, 'low', 0):.2f}"
close_price = f"${getattr(price, 'close', 0):.2f}"
volume = f"{getattr(price, 'volume', 0):,.0f}"
print(f"{time_str:<12} {open_price:<12} {high_price:<12} {low_price:<12} {close_price:<12} {volume:<15}")
if len(prices) > 5:
print(f"... e altri {len(prices) - 5} prezzi")
print()
class ProviderTester:
"""Classe per testare i provider di market data."""
def __init__(self):
self.formatter = DemoFormatter()
self.test_symbols = ["BTC", "ETH", "ADA"]
def test_provider(self, wrapper: MarketWrapper, provider_name: str) -> Dict[str, Any]:
"""Testa un provider specifico con tutti i metodi disponibili."""
results: Dict[str, Any] = {
"provider_name": provider_name,
"tests": {},
"overall_status": "SUCCESS"
}
self.formatter.print_subheader(f"🔍 Testing {provider_name}")
# Test get_product
for symbol in self.test_symbols:
timestamp = datetime.now()
try:
product = wrapper.get_product(symbol)
self.formatter.print_request_info(
provider_name, f"get_product({symbol})", timestamp, "✅ SUCCESS"
)
if product:
print(f"📦 Product: {product.symbol} (ID: {product.id})")
print(f" Price: ${product.price:.2f}, Quote: {product.currency}")
print(f" Volume 24h: {product.volume_24h:,.2f}")
else:
print(f"📦 Product: Nessun prodotto trovato per {symbol}")
results["tests"][f"get_product_{symbol}"] = "SUCCESS"
except Exception as e:
error_msg = str(e)
self.formatter.print_request_info(
provider_name, f"get_product({symbol})", timestamp, "❌ ERROR", error_msg
)
results["tests"][f"get_product_{symbol}"] = f"ERROR: {error_msg}"
results["overall_status"] = "PARTIAL"
# Test get_products
timestamp = datetime.now()
try:
products = wrapper.get_products(self.test_symbols)
self.formatter.print_request_info(
provider_name, f"get_products({self.test_symbols})", timestamp, "✅ SUCCESS"
)
self.formatter.print_product_table(products, f"{provider_name} Products")
results["tests"]["get_products"] = "SUCCESS"
except Exception as e:
error_msg = str(e)
self.formatter.print_request_info(
provider_name, f"get_products({self.test_symbols})", timestamp, "❌ ERROR", error_msg
)
results["tests"]["get_products"] = f"ERROR: {error_msg}"
results["overall_status"] = "PARTIAL"
# Test get_historical_prices
timestamp = datetime.now()
try:
prices = wrapper.get_historical_prices("BTC")
self.formatter.print_request_info(
provider_name, "get_historical_prices(BTC)", timestamp, "✅ SUCCESS"
)
self.formatter.print_prices_table(prices, f"{provider_name} BTC Historical Prices")
results["tests"]["get_historical_prices"] = "SUCCESS"
except Exception as e:
error_msg = str(e)
self.formatter.print_request_info(
provider_name, "get_historical_prices(BTC)", timestamp, "❌ ERROR", error_msg
)
results["tests"]["get_historical_prices"] = f"ERROR: {error_msg}"
results["overall_status"] = "PARTIAL"
return results
def check_environment_variables() -> Dict[str, bool]:
"""Verifica la presenza delle variabili d'ambiente necessarie."""
env_vars = {
"COINBASE_API_KEY": bool(os.getenv("COINBASE_API_KEY")),
"COINBASE_API_SECRET": bool(os.getenv("COINBASE_API_SECRET")),
"CRYPTOCOMPARE_API_KEY": bool(os.getenv("CRYPTOCOMPARE_API_KEY")),
"BINANCE_API_KEY": bool(os.getenv("BINANCE_API_KEY")),
"BINANCE_API_SECRET": bool(os.getenv("BINANCE_API_SECRET")),
}
return env_vars
def initialize_providers() -> Dict[str, MarketWrapper]:
"""Inizializza tutti i provider disponibili."""
providers: Dict[str, MarketWrapper] = {}
env_vars = check_environment_variables()
# CryptoCompareWrapper
if env_vars["CRYPTOCOMPARE_API_KEY"]:
try:
providers["CryptoCompare"] = CryptoCompareWrapper()
print("✅ CryptoCompareWrapper inizializzato con successo")
except Exception as e:
print(f"❌ Errore nell'inizializzazione di CryptoCompareWrapper: {e}")
else:
print("⚠️ CryptoCompareWrapper saltato: CRYPTOCOMPARE_API_KEY non trovata")
# CoinBaseWrapper
if env_vars["COINBASE_API_KEY"] and env_vars["COINBASE_API_SECRET"]:
try:
providers["CoinBase"] = CoinBaseWrapper()
print("✅ CoinBaseWrapper inizializzato con successo")
except Exception as e:
print(f"❌ Errore nell'inizializzazione di CoinBaseWrapper: {e}")
else:
print("⚠️ CoinBaseWrapper saltato: credenziali Coinbase non complete")
# BinanceWrapper
try:
providers["Binance"] = BinanceWrapper()
print("✅ BinanceWrapper inizializzato con successo")
except Exception as e:
print(f"❌ Errore nell'inizializzazione di BinanceWrapper: {e}")
# YFinanceWrapper (sempre disponibile - dati azionari e crypto gratuiti)
try:
providers["YFinance"] = YFinanceWrapper()
print("✅ YFinanceWrapper inizializzato con successo")
except Exception as e:
print(f"❌ Errore nell'inizializzazione di YFinanceWrapper: {e}")
return providers
def print_summary(results: List[Dict[str, Any]]):
"""Stampa un riassunto finale dei risultati."""
formatter = DemoFormatter()
formatter.print_header("📊 RIASSUNTO FINALE", "=", 80)
total_providers = len(results)
successful_providers = sum(1 for r in results if r["overall_status"] == "SUCCESS")
partial_providers = sum(1 for r in results if r["overall_status"] == "PARTIAL")
print(f"🔢 Provider testati: {total_providers}")
print(f"✅ Provider completamente funzionanti: {successful_providers}")
print(f"⚠️ Provider parzialmente funzionanti: {partial_providers}")
print(f"❌ Provider non funzionanti: {total_providers - successful_providers - partial_providers}")
print("\n📋 Dettaglio per provider:")
for result in results:
provider_name = result["provider_name"]
status = result["overall_status"]
status_icon = "" if status == "SUCCESS" else "⚠️" if status == "PARTIAL" else ""
print(f"\n{status_icon} {provider_name}:")
for test_name, test_result in result["tests"].items():
test_icon = "" if test_result == "SUCCESS" else ""
print(f" {test_icon} {test_name}: {test_result}")
def main():
"""Funzione principale del demo."""
formatter = DemoFormatter()
# Intestazione principale
formatter.print_header("🚀 DEMO COMPLETO MARKET DATA PROVIDERS", "=", 80)
print(f"🕒 Avvio demo: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("📝 Questo demo testa tutti i wrapper BaseWrapper disponibili")
print("🔍 Ogni test include timestamp, stato della richiesta e dati formattati")
# Verifica variabili d'ambiente
formatter.print_subheader("🔐 Verifica Configurazione")
env_vars = check_environment_variables()
print("Variabili d'ambiente:")
for var_name, is_present in env_vars.items():
status = "✅ Presente" if is_present else "❌ Mancante"
print(f" {var_name}: {status}")
# Inizializza provider
formatter.print_subheader("🏗️ Inizializzazione Provider")
providers = initialize_providers()
if not providers:
print("❌ Nessun provider disponibile. Verifica la configurazione.")
return
print(f"\n🎯 Provider disponibili per il test: {list(providers.keys())}")
# Testa ogni provider
formatter.print_header("🧪 ESECUZIONE TEST PROVIDER", "=", 80)
tester = ProviderTester()
all_results: List[Dict[str, Any]] = []
for provider_name, wrapper in providers.items():
try:
result = tester.test_provider(wrapper, provider_name)
all_results.append(result)
except Exception as e:
print(f"❌ Errore critico nel test di {provider_name}: {e}")
traceback.print_exc()
all_results.append({
"provider_name": provider_name,
"tests": {},
"overall_status": "CRITICAL_ERROR",
"error": str(e)
})
# Stampa riassunto finale
print_summary(all_results)
# Informazioni aggiuntive
formatter.print_header(" INFORMAZIONI AGGIUNTIVE", "=", 80)
print("📚 Documentazione:")
print(" - BaseWrapper: src/app/markets/base.py")
print(" - Test completi: tests/agents/test_market.py")
print(" - Configurazione: .env")
print("\n🔧 Per abilitare tutti i provider:")
print(" 1. Configura le credenziali nel file .env")
print(" 2. Segui la documentazione di ogni provider")
print(" 3. Riavvia il demo")
print(f"\n🏁 Demo completato: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if __name__ == "__main__":
main()

View File

@@ -1,18 +0,0 @@
#### FOR ALL FILES OUTSIDE src/ FOLDER ####
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../src')))
###########################################
from dotenv import load_dotenv
from app.api.news import NewsApiWrapper
def main():
api = NewsApiWrapper()
articles = api.get_latest_news(query="bitcoin", limit=5)
assert len(articles) > 0
print("ok")
if __name__ == "__main__":
load_dotenv()
main()

View File

@@ -1,20 +1,4 @@
#!/usr/bin/env python3
"""
Demo di Ollama (Python) mostra:
1. Elenco dei modelli disponibili
2. Generazione di testo semplice
3. Chat con streaming
4. Calcolo di embeddings
5. Esempio (opzionale) di function calling / tools
Uso:
python ollama_demo.py
Requisiti:
pip install ollama
Avviare il server Ollama (es. 'ollama serve' o l'app desktop) e avere i modelli già pullati.
"""
from typing import Any
import ollama
# Configurazione modelli
@@ -33,8 +17,8 @@ def list_models():
print(" (Nessun modello trovato)")
return
for m in models:
name = getattr(m, 'model', None) or (m.get('model') if isinstance(m, dict) else 'sconosciuto')
details = getattr(m, 'details', None)
name = getattr(m, 'model', None) or (m.get('model') if isinstance(m, dict) else 'sconosciuto') # type: ignore
details = getattr(m, 'details', None) # type: ignore
fmt = getattr(details, 'format', None) if details else 'unknown'
print(f"{name} {fmt}")
except Exception as e:
@@ -46,7 +30,7 @@ def list_models():
def generate_text(model: str, prompt: str, max_tokens: int = 200) -> str:
"""Genera testo dal modello indicato."""
print(f"\n[2] Generazione testo con '{model}'")
response = ollama.chat(
response = ollama.chat( # type: ignore
model=model,
messages=[{"role": "user", "content": prompt}]
)
@@ -57,10 +41,10 @@ def generate_text(model: str, prompt: str, max_tokens: int = 200) -> str:
# 3. Chat con streaming --------------------------------------------------------
def chat_streaming(model: str, messages: list) -> str:
def chat_streaming(model: str, messages: list[dict[str, str]]) -> str:
"""Esegue una chat mostrando progressivamente la risposta."""
print(f"\n[3] Chat (streaming) con '{model}'")
stream = ollama.chat(model=model, messages=messages, stream=True)
stream = ollama.chat(model=model, messages=messages, stream=True) # type: ignore
full = ""
for chunk in stream:
if 'message' in chunk and 'content' in chunk['message']:
@@ -91,7 +75,7 @@ def get_embedding(model: str, text: str):
def try_tools(model: str):
"""Esempio di function calling; se non supportato mostra messaggio informativo."""
print(f"\n[5] Function calling / tools con '{model}'")
tools = [
tools: list[dict[str, Any]] = [
{
"type": "function",
"function": {
@@ -109,7 +93,7 @@ def try_tools(model: str):
}
]
try:
response = ollama.chat(
response = ollama.chat( # type: ignore
model=model,
messages=[{"role": "user", "content": "Che tempo fa a Milano?"}],
tools=tools

View File

@@ -1,255 +1,160 @@
# 📊 Architettura e Flussi dell'App upo-appAI
# 📊 Architettura upo-appAI
## 🏗️ Diagramma Architettura Generale
## 🏗️ Architettura Generale
```
┌─────────────────────────────────────────────────────────────────┐
│ 🌐 GRADIO UI │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ User Input │ │ Provider │ │ Style │
│ (Query) │ │ (Model) │ │ (Conservative/ │ │
│ │ │ │ │ │ Aggressive) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 🔧 TOOL AGENT │
│ (Central Orchestrator) │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 1. Collect Data │ │ 2. Analyze │ │ 3. Predict & │ │
│ │ │ │ Sentiment │ │ Recommend │
└─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 📊 AGENT ECOSYSTEM │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────┐│
│ │ MARKET │ │ NEWS │ │ SOCIAL │ │ PREDICTOR ││
│ │ AGENT │ │ AGENT │ │ AGENT │ │ AGENT ││
│ │ │ │ │ │ │ │ ││
│ │ 📈 Coinbase │ │ 📰 News API │ │ 🐦 Social │ │ 🤖 LLM ││
│ │ 📊 CryptoCmp│ │ │ │ Media │ │ Analysis ││
│ │ 🟡 Binance │ │ │ │ │ │ ││
│ └─────────────┘ └─────────────┘ └─────────────┘ └───────────┘│
└─────────────────────────────────────────────────────────────────┘
INTERFACCE UTENTE
├── 💬 Gradio Web (Chat + Dropdown modelli/strategie)
└── 📱 Telegram Bot (Mini App)
CHAT MANAGER
├── Storico messaggi
├── Gestione PipelineInputs
└── Salva/Carica chat
AGNO WORKFLOW PIPELINE (4 Steps)
├── 1. Query Check → Verifica crypto
├── 2. Condition → Valida procedere
├── 3. Info Recovery → Team raccolta dati
└── 4. Report Generation → Report finale
AGNO AGENT ECOSYSTEM
├── 👔 TEAM LEADER (coordina Market, News, Social)
│ Tools: ReasoningTools, PlanMemoryTool, CryptoSymbolsTools
├── 📈 MARKET AGENT → MarketAPIsTool
├── 📰 NEWS AGENT → NewsAPIsTool
├── 🐦 SOCIAL AGENT → SocialAPIsTool
├── 🔍 QUERY CHECK AGENT → QueryOutputs (is_crypto: bool)
└── 📋 REPORT GENERATOR AGENT → Strategia applicata
```
## 🔄 Flusso di Esecuzione Dettagliato
## 🔄 Flusso Esecuzione
```
👤 USER REQUEST
│ "Analizza Bitcoin con strategia aggressiva"
┌─────────────────────────────────────────────────────────────┐
🔧 TOOL AGENT │
│ │
│ def interact(query, provider, style): │
│ │ │
│ ├── 📊 market_data = market_agent.analyze(query) │
│ ├── 📰 news_sentiment = news_agent.analyze(query) │
│ ├── 🐦 social_sentiment = social_agent.analyze(query) │
│ │ │
│ └── 🤖 prediction = predictor_agent.predict(...) │
└─────────────────────────────────────────────────────────────┘
📊 MARKET AGENT - Parallel Data Collection
┌─────────────────────────────────────────────────────────────┐
│ │
│ 🔍 Auto-detect Available Providers: │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Coinbase │ │ CryptoComp │ │ Binance │ │
│ │ REST │ │ API │ │ Mock │ │
│ │ │ │ │ │ │ │
│ │ ✅ Active │ │ ✅ Active │ │ ✅ Active │ │
│ │ $63,500 BTC │ │ $63,450 BTC │ │ $63,600 BTC │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ 📈 Aggregated Result: │
│ { │
│ "aggregated_data": { │
│ "BTC_USD": { │
│ "price": 63516.67, │
│ "confidence": 0.94, │
│ "sources_count": 3 │
│ } │
│ }, │
│ "individual_sources": {...}, │
│ "market_signals": {...} │
│ } │
└─────────────────────────────────────────────────────────────┘
📰 NEWS AGENT + 🐦 SOCIAL AGENT
┌─────────────────────────────────────────────────────────────┐
│ │
│ 📰 News Sentiment: "Positive momentum, institutional │
│ adoption increasing..." │
│ │
│ 🐦 Social Sentiment: "Bullish sentiment on Reddit, │
│ Twitter mentions up 15%..." │
└─────────────────────────────────────────────────────────────┘
🤖 PREDICTOR AGENT
┌─────────────────────────────────────────────────────────────┐
│ │
│ Input: │
│ ├── 📊 Market Data (aggregated + confidence) │
│ ├── 📰🐦 Combined Sentiment │
│ ├── 🎯 Style: "aggressive" │
│ └── 🤖 Provider: "openai/anthropic/google..." │
│ │
│ 🧠 LLM Processing: │
│ "Based on high confidence market data (0.94) showing │
│ $63,516 BTC with positive sentiment across news and │
│ social channels, aggressive strategy recommendation..." │
└─────────────────────────────────────────────────────────────┘
📋 FINAL OUTPUT
┌─────────────────────────────────────────────────────────────┐
│ 📊 Market Data Summary │
│ 📰🐦 Sentiment Analysis │
│ 📈 Final Recommendation: │
│ "Strong BUY signal with 85% confidence..." │
└─────────────────────────────────────────────────────────────┘
**Input:** "Analizza Bitcoin con strategia aggressiva"
1. CHAT MANAGER riceve e prepara PipelineInputs
2. WORKFLOW PIPELINE esegue 4 step:
- Query Check: valida `is_crypto: true`
- Condition: se false, termina
- Info Recovery: Team raccoglie dati
- Report Generation: genera report
3. OUTPUT: Report con analisi + raccomandazioni
## 🏛️ Architettura API
**Tools (Agno Toolkit):**
- MarketAPIsTool: Binance, YFinance, CoinBase, CryptoCompare
- NewsAPIsTool: NewsAPI, GoogleNews, DuckDuckGo, CryptoPanic
- SocialAPIsTool: Reddit, X, 4chan
- CryptoSymbolsTools: `resources/cryptos.csv`
**WrapperHandler:** Failover automatico (3 tentativi/wrapper, 2s delay)
## 📊 Data Aggregation
**ProductInfo:**
- Volume: media tra sources
- Price: weighted average (price × volume)
- Confidence: spread + numero sources
**Historical Price:**
- Align per timestamp
- Media: high, low, open, close, volume
## 🎯 Configuration
**configs.yaml:**
```yaml
port: 8000
models: [Ollama, OpenAI, Anthropic, Google]
strategies: [Conservative, Aggressive]
agents: {team_model, team_leader_model, ...}
api: {retry_attempts: 3, retry_delay_seconds: 2}
```
## 🏛️ Architettura dei Provider (Market Agent)
**.env (API Keys):**
- Market: CDP_API_KEY, CRYPTOCOMPARE_API_KEY, ...
- News: NEWS_API_KEY, CRYPTOPANIC_API_KEY, ...
- Social: REDDIT_CLIENT_ID, X_API_KEY, ...
- LLM: OPENAI_API_KEY, ANTHROPIC_API_KEY, ...
- Bot: TELEGRAM_BOT_TOKEN
## 🗂️ Struttura Progetto
```
┌─────────────────────────────────────────────────────────────────┐
│ 📊 MARKET AGENT │
│ │
│ 🔍 Provider Detection Logic: │
┌─────────────────────────────────────────────────────────────┐│
│ def _setup_providers(): ││
│ ├── 🔑 Check CDP_API_KEY_NAME + CDP_API_PRIVATE_KEY ││
│ │ │ └── ✅ Setup Coinbase Advanced Trade ││
│ │ ├── 🔑 Check CRYPTOCOMPARE_API_KEY ││
│ │ └── ✅ Setup CryptoCompare ││
│ └── 🔑 Check BINANCE_API_KEY (future) ││
└── ✅ Setup Binance API ││
└─────────────────────────────────────────────────────────────┘│
📡 Data Flow: │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Provider 1 │───▶│ │◀───│ Provider 2 │ │
│ │ Coinbase │ │ AGGREGATOR │ │ CryptoComp │ │
│ │ │ │ │ │ │ │
│ │ Real-time │ │ ┌─────────┐ │ │ Real-time │ │
│ │ Market Data │ │ │Confidence│ │ │ Market Data │ │
│ └─────────────┘ │ │Scoring │ │ └─────────────┘ │
│ │ │ │ │ │
│ ┌─────────────┐ │ │ Spread │ │ ┌─────────────┐ │
│ │ Provider 3 │───▶│ │Analysis │ │◀───│ Provider N │ │
│ │ Binance │ │ │ │ │ │ Future │ │
│ │ │ │ └─────────┘ │ │ │ │
│ │ Mock Data │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
src/app/
├── __main__.py
├── configs.py
├── agents/
├── core.py
├── pipeline.py
│ ├── plan_memory_tool.py
│ └── prompts/
├── api/
├── wrapper_handler.py
├── core/ (markets, news, social)
├── markets/ (Binance, CoinBase, CryptoCompare, YFinance)
├── news/ (NewsAPI, GoogleNews, DuckDuckGo, CryptoPanic)
├── social/ (Reddit, X, 4chan)
└── tools/ (Agno Toolkits)
└── interface/ (chat.py, telegram_app.py)
tests/
demos/
resources/cryptos.csv
docs/
configs.yaml
.env
```
## 🔧 Signers Architecture
## 🔑 Componenti Chiave
```
┌─────────────────────────────────────────────────────────────────┐
│ 🔐 SIGNERS ECOSYSTEM │
│ │
│ 📁 src/app/signers/market_signers/ │
│ │ │
│ ├── 🏦 coinbase_rest_signer.py │
│ │ ├── 🔑 Uses: CDP_API_KEY_NAME + CDP_API_PRIVATE_KEY │
│ │ ├── 📡 RESTClient from coinbase.rest │
│ │ ├── 📊 get_asset_info() → Real Coinbase data │
│ │ └── 📈 get_multiple_assets() → Bulk data │
│ │ │
│ ├── 📊 cryptocompare_signer.py │
│ │ ├── 🔑 Uses: CRYPTOCOMPARE_API_KEY │
│ │ ├── 📡 Direct HTTP requests │
│ │ ├── 💰 get_crypto_prices() → Multi-currency │
│ │ └── 🏆 get_top_cryptocurrencies() → Market cap │
│ │ │
│ └── 🟡 binance_signer.py │
│ ├── 🔑 Uses: BINANCE_API_KEY (future) │
│ ├── 📡 Mock implementation │
│ ├── 🎭 Simulated market data │
│ └── 📈 Compatible interface │
└─────────────────────────────────────────────────────────────────┘
1. **Agno Framework**: Agent, Team, Workflow, Toolkit, RunEvent
2. **WrapperHandler**: Failover, Retry logic, Type safety
3. **Data Aggregation**: Multiple sources, Confidence score
4. **Multi-Interface**: Gradio + Telegram
5. **Configuration**: configs.yaml + .env
## 🚀 Deployment
**Docker:**
```bash
docker-compose up --build -d
```
## 🚀 Future Enhancement: Async Flow
```
📱 USER REQUEST
🔧 TOOL AGENT (async)
┌────────────────┼────────────────┐
│ │ │
▼ ▼ ▼
📊 Market 📰 News 🐦 Social
Agent (async) Agent (async) Agent (async)
│ │ │
┌────┼────┐ │ │
▼ ▼ ▼ │ │
Coinbase │ Binance │ │
CC │ │ │
▼▼▼ ▼ ▼
🔄 Parallel 📰 Sentiment 🐦 Sentiment
Aggregation Analysis Analysis
│ │ │
└────────────────┼────────────────┘
🤖 PREDICTOR AGENT
(LLM Analysis)
📋 FINAL RESULT
(JSON + Confidence)
**Local (UV):**
```bash
uv venv
uv pip install -e .
uv run src/app
```
## 📊 Data Flow Example
## 🎯 Workflow Asincrono
```
Input: "Analyze Bitcoin aggressive strategy"
├── 📊 Market Agent Output:
│ {
│ "aggregated_data": {
│ "BTC_USD": {"price": 63516.67, "confidence": 0.94}
│ },
│ "individual_sources": {
│ "coinbase": {"price": 63500, "volume": "1.2M"},
│ "cryptocompare": {"price": 63450, "volume": "N/A"},
│ "binance": {"price": 63600, "volume": "2.1M"}
│ },
│ "market_signals": {
│ "spread_analysis": "Low spread (0.24%) - healthy liquidity",
│ "price_divergence": "Max deviation: 0.24% - Normal range"
│ }
│ }
├── 📰 News Sentiment: "Positive institutional adoption news..."
├── 🐦 Social Sentiment: "Bullish Reddit sentiment, +15% mentions"
└── 🤖 Predictor Output:
"📈 Strong BUY recommendation based on:
- High confidence market data (94%)
- Positive news sentiment
- Bullish social indicators
- Low spread indicates healthy liquidity
```python
workflow = Workflow(steps=[
query_check, condition,
info_recovery, report_generation
])
Aggressive Strategy: Consider 15-20% portfolio allocation"
iterator = await workflow.arun(query, stream=True)
async for event in iterator:
if event.event == PipelineEvent.TOOL_USED:
log(f"Tool: {event.tool.tool_name}")
```
---
*Diagrammi creati: 2025-09-23*
*Sistema: upo-appAI Market Analysis Platform*
**Vantaggi:** Asincrono, Streaming, Condizionale, Retry
## 📈 Future Enhancements
- Parallel Tool Execution
- Caching (Redis)
- Database (PostgreSQL)
- Real-time WebSocket
- ML Models
- User Profiles
- Backtesting

View File

@@ -1,203 +0,0 @@
# 🚀 Diagramma Dettaglio: Implementazione Asincrona
## ⚡ Async Market Data Collection (Fase 3)
```
┌─────────────────────────────────────────────────────────────────┐
│ 🔧 TOOL AGENT │
│ │
│ async def interact(query, provider, style): │
│ │ │
│ ├── 📊 market_data = await market_agent.analyze_async() │
│ ├── 📰 news_data = await news_agent.analyze_async() │
│ ├── 🐦 social_data = await social_agent.analyze_async() │
│ │ │
│ └── 🤖 prediction = await predictor.predict_async(...) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 📊 MARKET AGENT - ASYNC IMPLEMENTATION │
│ │
│ async def analyze_async(self, query): │
│ symbols = extract_symbols(query) # ["BTC", "ETH"] │
│ │ │
│ └── 🔄 tasks = [ │
│ │ self._query_coinbase_async(symbols), │
│ │ self._query_cryptocompare_async(symbols), │
│ │ self._query_binance_async(symbols) │
│ │ ] │
│ │ │
│ └── 📊 results = await asyncio.gather(*tasks) │
│ │ │
│ ▼ │
│ 🧮 aggregate_results(results) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ ⏱️ TIMING DIAGRAM │
│ │
│ Time: 0ms 500ms 1000ms 1500ms 2000ms │
│ │ │ │ │ │ │
│ 📡 Start all requests │
│ ├─────────────────────────────────────────┐ │
│ │ 🏦 Coinbase Request │ │
│ │ ✅ Response │ (1.2s) │
│ ├─────────────────────────────┐ │ │
│ │ 📊 CryptoCompare Request │ │ │
│ │ ✅ Response (0.8s) │ │
│ ├─────────────┐ │ │ │
│ │ 🟡 Binance │ │ │ │
│ │ ✅ Response (0.3s - mock) │ │ │
│ │ │ │ │ │
│ └─────────────┼───────────────┼───────────┘ │
│ │ │ │
│ Wait for all... │ │
│ │ │
│ 🧮 Aggregate (1.2s total) │
│ │
│ 📈 Performance Gain: │
│ Sequential: 1.2s + 0.8s + 0.3s = 2.3s │
│ Parallel: max(1.2s, 0.8s, 0.3s) = 1.2s │
│ Improvement: ~48% faster! 🚀 │
└─────────────────────────────────────────────────────────────────┘
```
## 🧮 Aggregation Algorithm Detail
```
┌─────────────────────────────────────────────────────────────────┐
│ 🔬 DATA AGGREGATION LOGIC │
│ │
│ def aggregate_market_data(results): │
│ │ │
│ ├── 📊 Input Data: │
│ │ ┌─────────────────────────────────────────────────┐ │
│ │ │ coinbase: {"BTC": 63500, "ETH": 4150} │ │
│ │ │ cryptocomp: {"BTC": 63450, "ETH": 4160} │ │
│ │ │ binance: {"BTC": 63600, "ETH": 4140} │ │
│ │ └─────────────────────────────────────────────────┘ │
│ │ │
│ ├── 🧮 Price Calculation: │
│ │ ┌─────────────────────────────────────────────────┐ │
│ │ │ BTC_prices = [63500, 63450, 63600] │ │
│ │ │ BTC_avg = 63516.67 │ │
│ │ │ BTC_std = 75.83 │ │
│ │ │ BTC_spread = (max-min)/avg = 0.24% │ │
│ │ └─────────────────────────────────────────────────┘ │
│ │ │
│ ├── 🎯 Confidence Scoring: │
│ │ ┌─────────────────────────────────────────────────┐ │
│ │ │ confidence = 1 - (std_dev / mean) │ │
│ │ │ if spread < 0.5%: confidence += 0.1 │ │
│ │ │ if sources >= 3: confidence += 0.05 │ │
│ │ │ BTC_confidence = 0.94 (excellent!) │ │
│ │ └─────────────────────────────────────────────────┘ │
│ │ │
│ └── 📈 Market Signals: │
│ ┌─────────────────────────────────────────────────┐ │
│ │ spread_analysis: │ │
│ │ "Low spread (0.24%) indicates healthy liq." │ │
│ │ volume_trend: │ │
│ │ "Combined volume: 4.1M USD" │ │
│ │ price_divergence: │ │
│ │ "Max deviation: 0.24% - Normal range" │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
## 🔄 Error Handling & Resilience
```
┌─────────────────────────────────────────────────────────────────┐
│ 🛡️ RESILIENCE STRATEGY │
│ │
│ Scenario 1: One Provider Fails │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 🏦 Coinbase: ✅ Success (BTC: $63500) │ │
│ │ 📊 CryptoComp: ❌ Timeout/Error │ │
│ │ 🟡 Binance: ✅ Success (BTC: $63600) │ │
│ │ │ │
│ │ Result: Continue with 2 sources │ │
│ │ Confidence: 0.89 (slightly reduced) │ │
│ │ Note: "CryptoCompare unavailable" │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Scenario 2: Multiple Providers Fail │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 🏦 Coinbase: ❌ API Limit │ │
│ │ 📊 CryptoComp: ✅ Success (BTC: $63450) │ │
│ │ 🟡 Binance: ❌ Network Error │ │
│ │ │ │
│ │ Result: Single source data │ │
│ │ Confidence: 0.60 (low - warn user) │ │
│ │ Note: "Limited data - consider waiting" │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Scenario 3: All Providers Fail │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 🏦 Coinbase: ❌ Maintenance │ │
│ │ 📊 CryptoComp: ❌ API Down │ │
│ │ 🟡 Binance: ❌ Rate Limit │ │
│ │ │ │
│ │ Result: Graceful degradation │ │
│ │ Message: "Market data temporarily unavailable" │ │
│ │ Fallback: Cached data (if available) │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
## 📊 JSON Output Schema
```json
{
"aggregated_data": {
"BTC_USD": {
"price": 63516.67,
"confidence": 0.94,
"sources_count": 3,
"last_updated": "2025-09-23T17:30:00Z"
},
"ETH_USD": {
"price": 4150.33,
"confidence": 0.91,
"sources_count": 3,
"last_updated": "2025-09-23T17:30:00Z"
}
},
"individual_sources": {
"coinbase": {
"BTC": {"price": 63500, "volume": "1.2M", "status": "online"},
"ETH": {"price": 4150, "volume": "25.6M", "status": "online"}
},
"cryptocompare": {
"BTC": {"price": 63450, "volume": "N/A", "status": "active"},
"ETH": {"price": 4160, "volume": "N/A", "status": "active"}
},
"binance": {
"BTC": {"price": 63600, "volume": "2.1M", "status": "mock"},
"ETH": {"price": 4140, "volume": "18.3M", "status": "mock"}
}
},
"market_signals": {
"spread_analysis": "Low spread (0.24%) indicates healthy liquidity",
"volume_trend": "Combined BTC volume: 3.3M USD (+12% from avg)",
"price_divergence": "Max deviation: 0.24% - Normal range",
"data_quality": "High - 3 sources, low variance",
"recommendation": "Data suitable for trading decisions"
},
"metadata": {
"query_time_ms": 1247,
"sources_queried": ["coinbase", "cryptocompare", "binance"],
"sources_successful": ["coinbase", "cryptocompare", "binance"],
"sources_failed": [],
"aggregation_method": "weighted_average",
"confidence_threshold": 0.75
}
}
```
---
*Diagramma dettaglio asincrono: 2025-09-23*
*Focus: Performance, Resilienza, Qualità Dati*

View File

@@ -1,96 +0,0 @@
# 🚀 Piano di Implementazione - Market Data Enhancement
## 📋 Roadmap Implementazioni
### **Fase 1: Binance Mock Provider**
**Obiettivo**: Aggiungere terzo provider per test aggregazione
- ✅ Creare `binance_signer.py` con mock data
- ✅ Integrare nel MarketAgent
- ✅ Testare detection automatica provider
- **Deliverable**: 3 provider funzionanti (Coinbase, CryptoCompare, Binance)
### **Fase 2: Interrogazione Condizionale**
**Obiettivo**: Auto-detection credenziali e interrogazione intelligente
- ✅ Migliorare detection chiavi API nel MarketAgent
- ✅ Skip provider se credenziali mancanti (no errori)
- ✅ Logging informativo per provider disponibili/non disponibili
- ✅ Gestione graceful degradation
- **Deliverable**: Sistema resiliente che funziona con qualsiasi combinazione di provider
### **Fase 3: Interrogazione Asincrona + Aggregazione JSON**
**Obiettivo**: Performance boost e formato dati professionale
#### **3A. Implementazione Asincrona**
- ✅ Refactor MarketAgent per supporto `async/await`
- ✅ Chiamate parallele a tutti i provider disponibili
- ✅ Timeout management per provider lenti
- ✅ Error handling per provider che falliscono
#### **3B. Aggregazione Dati Intelligente**
- ✅ Calcolo `confidence` basato su concordanza prezzi
- ✅ Analisi `spread` tra provider
- ✅ Detection `price_divergence` per anomalie
- ✅ Volume trend analysis
- ✅ Formato JSON strutturato:
```json
{
"aggregated_data": {
"BTC_USD": {
"price": 43250.12,
"confidence": 0.95,
"sources_count": 4
}
},
"individual_sources": {
"coinbase": {"price": 43245.67, "volume": "1.2M"},
"binance": {"price": 43255.89, "volume": "2.1M"},
"cryptocompare": {"price": 43248.34, "volume": "0.8M"}
},
"market_signals": {
"spread_analysis": "Low spread (0.02%) indicates healthy liquidity",
"volume_trend": "Volume up 15% from 24h average",
"price_divergence": "Max deviation: 0.05% - Normal range"
}
}
```
**Deliverable**: Sistema asincrono con analisi avanzata dei dati di mercato
## 🎯 Benefici Attesi
### **Performance**
- ⚡ Tempo risposta: da ~4s sequenziali a ~1s paralleli
- 🔄 Resilienza: sistema funziona anche se 1-2 provider falliscono
- 📊 Qualità dati: validazione incrociata tra provider
### **Professionalità**
- 📈 Confidence scoring per decisioni informate
- 🔍 Market signals per trading insights
- 📋 Formato standardizzato per integrazioni future
### **Scalabilità**
- Facile aggiunta nuovi provider
- 🔧 Configurazione flessibile via environment
- 📝 Logging completo per debugging
## 🧪 Test Strategy
1. **Unit Tests**: Ogni provider singolarmente
2. **Integration Tests**: Aggregazione multi-provider
3. **Performance Tests**: Confronto sync vs async
4. **Resilience Tests**: Fallimento provider singoli
5. **E2E Tests**: Full workflow con UI Gradio
## 📅 Timeline Stimata
- **Fase 1**: ~1h (setup Binance mock)
- **Fase 2**: ~1h (detection condizionale)
- **Fase 3**: ~2-3h (async + aggregazione)
- **Testing**: ~1h (validation completa)
**Total**: ~5-6h di lavoro strutturato
---
*Documento creato: 2025-09-23*
*Versione: 1.0*

View File

@@ -1,73 +0,0 @@
# Guida alla Realizzazione del Progetto
Questa guida è una lista di controllo per l'implementazione del tuo progetto. È divisa in fasi logiche, ognuna con i compiti specifici da svolgere.
## Fase 1: Preparazione e Architettura di Base
### Impostazione dell'ambiente
* Scegliere il linguaggio di programmazione (es. **Python**).
* Utilizzare la libreria `agno` per la creazione di agenti e **LangChain/LlamaIndex** per la gestione dell'LLM e dell'orchestrazione.
### Definizione dell'Architettura degli agenti
* Definire la classe base per gli agenti, con metodi comuni come `execute()` e `reason()`.
* Delineare i ruoli e le interfacce di tutti gli agenti (`RicercatoreDati`, `AnalistaSentiment`, `MotorePredittivo`, `Orchestratore`), stabilendo come comunicheranno tra loro.
---
## Fase 2: Implementazione degli Agenti Core
### Agente `RicercatoreDati`
* Implementare la logica per connettersi a un'API di exchange (es. **Binance, Coindesk, CoinMarketCap**).
* Testare la capacità di recuperare dati in tempo reale per diverse criptovalute (prezzo, volume, capitalizzazione) e **assicurarsi che la gestione degli errori sia robusta**.
### Agente `AnalistaSentiment`
* **Agente `Social`:**
* Scegliere un metodo per lo scraping di forum e social media (es. **Reddit API, librerie per Twitter/X, BeautifulSoup per web scraping**).
* Implementare un modulo di analisi del testo per classificare il sentiment (positivo, negativo, neutro) utilizzando **modelli pre-addestrati (es. VADER) o fine-tuning di modelli più avanzati**.
* **Agente `News`:**
* Ottenere una chiave API per un servizio di notizie (es. **NewsAPI**).
* Implementare la logica per cercare articoli pertinenti a una criptovaluta specifica o al mercato in generale, e **filtrare le notizie in base a parole chiave rilevanti**.
### Agente `MotorePredittivo`
* Definire la logica per integrare i dati numerici del `RicercatoreDati` con il sentiment dell'`AnalistaSentiment`.
* Creare un **prompt avanzato** per l'LLM che lo guidi a generare previsioni e strategie. Dovrai usare tecniche come la **chain-of-thought** per rendere il ragionamento trasparente. Assicurarsi che il prompt includa vincoli specifici per lo stile di investimento (aggressivo/conservativo).
---
## Fase 3: Costruzione dell'Orchestratore e Test di Integrazione
### Implementazione dell'Agente Orchestratore
* **Gestione dell'Input Utente:** Creare un metodo che riceve la richiesta dell'utente (es. `analizza_cripto('Bitcoin', 'aggressivo')`). Analizzare il tipo di richiesta e le preferenze utente.
* **Recupero della Memoria Utente:** Integrare la logica per recuperare la cronologia delle richieste passate dal database e preparare i dati come contesto aggiuntivo per l'LLM.
* **Orchestrazione e Flusso di Lavoro:** Chiamare gli agenti (`RicercatoreDati`, `AnalistaSentiment`) e passare i risultati combinati all'**Agente `MotorePredittivo`** per generare previsioni e strategie.
* **Valutazione e Selezione Strategica:** Ricevere le previsioni dal `MotorePredittivo` e applicare le regole di valutazione basate sulle preferenze dell'utente per selezionare le strategie più appropriate.
* **Presentazione e Persistenza:** Costruire il report finale e salvare la sessione completa nel database.
---
## Fase 4: Gestione della Persistenza e dell'Interfaccia Utente
* **Database per la persistenza:** Scegli un database (es. **Firestore, MongoDB, PostgreSQL**) per salvare la cronologia delle richieste degli utenti. Implementa la logica per salvare e recuperare le sessioni di consulenza passate, associandole a un ID utente, e **struttura i dati per una ricerca efficiente**.
* **Interfaccia utente (UI):** Costruisci un'interfaccia utente semplice e intuitiva che permetta di inserire i parametri di richiesta. Aggiungi una sezione per visualizzare i risultati, inclusi i grafici e le note che spiegano il ragionamento dell'agente.
---
## Fase 5: Test del Sistema
* **Test unitari:** Esegui test su ogni agente singolarmente per assicurarti che funzioni correttamente (es. l'agente `RicercatoreDati` recupera i dati, l'agente `AnalistaSentiment` classifica correttamente un testo). **Crea dei mock per le API esterne per testare la logica interna senza dipendenze esterne**.
* **Test di integrazione:** Esegui scenari di test completi per l'intero sistema. Verifica che l'orchestrazione tra gli agenti avvenga senza intoppi e che i dati vengano passati correttamente tra di essi.
---
## Fase 6: Valutazione dei Risultati
* **Valutazione della qualità:** Verifica la qualità delle raccomandazioni generate. L'output è logico e ben argomentato?
* **Trasparenza del ragionamento:** Controlla che le note (`Ragionamenti`) siano chiare e forniscano un'effettiva trasparenza del processo decisionale dell'agente.
* **Confronto e validazione:** Confronta le raccomandazioni con dati storici e scenari ipotetici per valutarne la plausibilità.

View File

@@ -41,6 +41,13 @@ class PipelineInputs:
# ======================
# Dropdown handlers
# ======================
def choose_query_checker(self, index: int):
"""
Sceglie il modello LLM da usare per l'analizzatore di query.
"""
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
self.query_analyzer_model = self.configs.models.all_models[index]
def choose_team_leader(self, index: int):
"""
Sceglie il modello LLM da usare per il Team Leader.
@@ -55,6 +62,13 @@ class PipelineInputs:
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
self.team_model = self.configs.models.all_models[index]
def choose_report_generator(self, index: int):
"""
Sceglie il modello LLM da usare per il generatore di report.
"""
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
self.report_generation_model = self.configs.models.all_models[index]
def choose_strategy(self, index: int):
"""
Sceglie la strategia da usare per il Team.
@@ -119,3 +133,81 @@ class PipelineInputs:
social_tool = SocialAPIsTool()
social_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds)
return market_tool, news_tool, social_tool
def __str__(self) -> str:
return "\n".join([
f"Query Check: {self.query_analyzer_model.label}",
f"Team Leader: {self.team_leader_model.label}",
f"Team: {self.team_model.label}",
f"Report: {self.report_generation_model.label}",
f"Strategy: {self.strategy.label}",
f"User Query: \"{self.user_query}\"",
])
class RunMessage:
"""
Classe per gestire i messaggi di stato durante l'esecuzione della pipeline.
Inizializza il messaggio con gli step e aggiorna lo stato, permettendo di ottenere
il messaggio più recente da inviare all'utente.
"""
def __init__(self, inputs: PipelineInputs, prefix: str = "", suffix: str = ""):
"""
Inizializza il messaggio di esecuzione con gli step iniziali.
Tre stati possibili per ogni step:
- In corso (🔳)
- In esecuzione (➡️)
- Completato (✅)
Lo stato di esecuzione può essere assegnato solo ad uno step alla volta.
Args:
inputs (PipelineInputs): Input della pipeline per mostrare la configurazione.
prefix (str, optional): Prefisso del messaggio. Defaults to "".
suffix (str, optional): Suffisso del messaggio. Defaults to "".
"""
self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n"
self.emojis = ['🔳', '➡️', '']
self.placeholder = '<<<>>>'
self.current = 0
self.steps_total = [
(f"{self.placeholder} Query Check", 1),
(f"{self.placeholder} Info Recovery", 0),
(f"{self.placeholder} Report Generation", 0),
]
def update(self) -> 'RunMessage':
"""
Sposta lo stato di esecuzione al passo successivo.
Lo step precedente completato viene marcato come completato.
Returns:
RunMessage: L'istanza aggiornata di RunMessage.
"""
text_curr, state_curr = self.steps_total[self.current]
self.steps_total[self.current] = (text_curr, state_curr + 1)
self.current = min(self.current + 1, len(self.steps_total))
if self.current < len(self.steps_total):
text_curr, state_curr = self.steps_total[self.current]
self.steps_total[self.current] = (text_curr, state_curr + 1)
return self
def update_step(self, text_extra: str = "") -> 'RunMessage':
"""
Aggiorna il messaggio per lo step corrente.
Args:
text_extra (str, optional): Testo aggiuntivo da includere nello step. Defaults to "".
"""
text_curr, state_curr = self.steps_total[self.current]
if text_extra:
text_curr = f"{text_curr.replace('', '')}\n╚═ {text_extra}"
self.steps_total[self.current] = (text_curr, state_curr)
return self
def get_latest(self) -> str:
"""
Restituisce il messaggio di esecuzione più recente.
Returns:
str: Messaggio di esecuzione aggiornato.
"""
steps = [msg.replace(self.placeholder, self.emojis[state]) for msg, state in self.steps_total]
return self.base_message + "\n".join(steps)

View File

@@ -33,7 +33,7 @@ class PipelineEvent(str, Enum):
(PipelineEvent.QUERY_ANALYZER, lambda _: logging.info(f"[{run_id}] Query Analyzer completed.")),
(PipelineEvent.INFO_RECOVERY, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")),
(PipelineEvent.REPORT_GENERATION, lambda _: logging.info(f"[{run_id}] Report Generation completed.")),
(PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name}] by {e.agent_name}.")),
(PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")),
(PipelineEvent.RUN_FINISHED, lambda _: logging.info(f"[{run_id}] Run completed.")),
]

View File

@@ -87,7 +87,7 @@ class WrapperHandler(Generic[WrapperType]):
Exception: If all wrappers fail after retries.
"""
logging.info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
logging.debug(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
results: dict[str, OutputType] = {}
starting_index = self.index

View File

@@ -1,4 +1,4 @@
from app.interface.chat import ChatManager
from app.interface.telegram_app import TelegramApp
from app.interface.telegram import TelegramApp
__all__ = ["ChatManager", "TelegramApp"]

View File

@@ -79,6 +79,7 @@ class ChatManager:
with gr.Row():
provider = gr.Dropdown(
choices=self.inputs.list_models_names(),
value=self.inputs.team_leader_model.label,
type="index",
label="Modello da usare"
)
@@ -86,6 +87,7 @@ class ChatManager:
style = gr.Dropdown(
choices=self.inputs.list_strategies_names(),
value=self.inputs.strategy.label,
type="index",
label="Stile di investimento"
)

View File

@@ -1,6 +1,8 @@
import asyncio
import io
import os
import json
from typing import Any
import httpx
import logging
import warnings
@@ -9,7 +11,7 @@ from markdown_pdf import MarkdownPdf, Section
from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User
from telegram.constants import ChatAction
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, ConversationHandler, MessageHandler, filters
from app.agents.pipeline import Pipeline, PipelineInputs
from app.agents.pipeline import Pipeline, PipelineEvent, PipelineInputs, RunMessage
# per per_message di ConversationHandler che rompe sempre qualunque input tu metta
warnings.filterwarnings("ignore")
@@ -21,23 +23,44 @@ logging = logging.getLogger("telegram")
# Un semplice schema delle interazioni:
# /start
# ║
# V
# v
# ╔══ CONFIGS <═════╗
# ║ ║ ╚══> SELECT_CONFIG
# ║ V
# ║ start_team (polling for updates)
# ║ v ^
# ║ MODELS ══════╝
# ║
# ╠══> start (polling for updates)
# ║ ║
# ║ V
# ║ v
# ╚═══> END
CONFIGS, SELECT_CONFIG = range(2)
CONFIGS, SELECT_MODEL, SELECT_CONFIG = range(3)
# Usato per separare la query arrivata da Telegram
QUERY_SEP = "|==|"
class ConfigsChat(Enum):
MODEL_CHECK = "Check Model"
MODEL_TEAM_LEADER = "Team Leader Model"
MODEL_TEAM = "Team Model"
MODEL_OUTPUT = "Output Model"
MODEL_REPORT = "Report Model"
CHANGE_MODELS = "Change Models"
STRATEGY = "Strategy"
CANCEL = "Cancel"
def get_inline_button(self, value_to_display:str="") -> InlineKeyboardButton:
display = self.value if not value_to_display else f"{self.value}: {value_to_display}"
return InlineKeyboardButton(display, callback_data=self.name)
def change_value(self, inputs: PipelineInputs, new_value:int) -> None:
functions_map = {
self.MODEL_CHECK.name: inputs.choose_query_checker,
self.MODEL_TEAM_LEADER.name: inputs.choose_team_leader,
self.MODEL_TEAM.name: inputs.choose_team,
self.MODEL_REPORT.name: inputs.choose_report_generator,
self.STRATEGY.name: inputs.choose_strategy,
}
functions_map[self.name](new_value)
class TelegramApp:
def __init__(self):
@@ -72,14 +95,21 @@ class TelegramApp:
entry_points=[CommandHandler('start', self.__start)],
states={
CONFIGS: [
CallbackQueryHandler(self.__model_team, pattern=ConfigsChat.MODEL_TEAM.name),
CallbackQueryHandler(self.__model_output, pattern=ConfigsChat.MODEL_OUTPUT.name),
CallbackQueryHandler(self.__models, pattern=ConfigsChat.CHANGE_MODELS.name),
CallbackQueryHandler(self.__strategy, pattern=ConfigsChat.STRATEGY.name),
CallbackQueryHandler(self.__cancel, pattern='^cancel$'),
MessageHandler(filters.TEXT, self.__start_team) # Any text message
CallbackQueryHandler(self.__cancel, pattern='^CANCEL$'),
MessageHandler(filters.TEXT, self.__start_llms) # Any text message
],
SELECT_MODEL: [
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_CHECK.name),
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_TEAM_LEADER.name),
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_TEAM.name),
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_REPORT.name),
CallbackQueryHandler(self.__go_to_start, pattern='^CANCEL$'),
],
SELECT_CONFIG: [
CallbackQueryHandler(self.__select_config, pattern=f"^__select_config{QUERY_SEP}.*$"),
CallbackQueryHandler(self.__go_to_start, pattern='^CANCEL$'),
]
},
fallbacks=[CommandHandler('start', self.__start)],
@@ -87,45 +117,28 @@ class TelegramApp:
self.app = app
def run(self) -> None:
"""
Start the Telegram bot polling. This will keep the bot running and listening for updates.\n
This function blocks until the bot is stopped.
"""
self.app.run_polling()
########################################
# Funzioni di utilità
########################################
async def start_message(self, user: User, query: CallbackQuery | Message) -> None:
confs = self.user_requests.setdefault(user, PipelineInputs())
str_model_team = f"{ConfigsChat.MODEL_TEAM.value}: {confs.team_model.label}"
str_model_output = f"{ConfigsChat.MODEL_OUTPUT.value}: {confs.team_leader_model.label}"
str_strategy = f"{ConfigsChat.STRATEGY.value}: {confs.strategy.label}"
msg, keyboard = (
"Please choose an option or write your query",
InlineKeyboardMarkup([
[InlineKeyboardButton(str_model_team, callback_data=ConfigsChat.MODEL_TEAM.name)],
[InlineKeyboardButton(str_model_output, callback_data=ConfigsChat.MODEL_OUTPUT.name)],
[InlineKeyboardButton(str_strategy, callback_data=ConfigsChat.STRATEGY.name)],
[InlineKeyboardButton("Cancel", callback_data='cancel')]
])
)
if isinstance(query, CallbackQuery):
await query.edit_message_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2')
else:
await query.reply_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2')
async def handle_callbackquery(self, update: Update) -> tuple[CallbackQuery, User]:
assert update.callback_query and update.callback_query.from_user, "Update callback_query or user is None"
assert update.callback_query, "Update callback_query is None"
assert update.effective_user, "Update effective_user is None"
query = update.callback_query
await query.answer() # Acknowledge the callback query
return query, query.from_user
return query, update.effective_user
async def handle_message(self, update: Update) -> tuple[Message, User]:
assert update.message and update.message.from_user, "Update message or user is None"
return update.message, update.message.from_user
def handle_message(self, update: Update) -> tuple[Message, User]:
assert update.message and update.effective_user, "Update message or user is None"
return update.message, update.effective_user
def build_callback_data(self, callback: str, config: ConfigsChat, labels: list[str]) -> list[tuple[str, str]]:
return [(label, QUERY_SEP.join((callback, config.value, str(i)))) for i, label in enumerate(labels)]
return [(label, QUERY_SEP.join((callback, config.name, str(i)))) for i, label in enumerate(labels)]
async def __error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
@@ -142,28 +155,69 @@ class TelegramApp:
logging.exception("Exception in the error handler")
#########################################
# Funzioni async per i comandi e messaggi
# Funzioni base di gestione stati
#########################################
async def __start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
message, user = await self.handle_message(update)
logging.info(f"@{user.username} started the conversation.")
await self.start_message(user, message)
user = update.effective_user.username if update.effective_user else "Unknown"
logging.info(f"@{user} started the conversation.")
return await self.__go_to_start(update, context)
async def __go_to_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user = update.effective_user
assert user, "Update effective_user is None"
msg = update.callback_query if update.callback_query else update.message
assert msg, "Update message and callback_query are both None"
confs = self.user_requests.setdefault(user, PipelineInputs()) # despite the name, it creates a default only if not present
args: dict[str, Any] = {
"text": "Please choose an option or write your query",
"parse_mode": 'MarkdownV2',
"reply_markup": InlineKeyboardMarkup([
[ConfigsChat.CHANGE_MODELS.get_inline_button()],
[ConfigsChat.STRATEGY.get_inline_button(confs.strategy.label)],
[ConfigsChat.CANCEL.get_inline_button()],
])
}
await (msg.edit_message_text(**args) if isinstance(msg, CallbackQuery) else msg.reply_text(**args))
return CONFIGS
async def __model_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
return await self._model_select(update, ConfigsChat.MODEL_TEAM)
async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query, user = await self.handle_callbackquery(update)
logging.info(f"@{user.username} canceled the conversation.")
if user in self.user_requests:
del self.user_requests[user]
await query.edit_message_text("Conversation canceled. Use /start to begin again.")
return ConversationHandler.END
async def __model_output(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
return await self._model_select(update, ConfigsChat.MODEL_OUTPUT)
##########################################
# Configurazioni
##########################################
async def __models(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query, user = await self.handle_callbackquery(update)
req = self.user_requests[user]
async def _model_select(self, update: Update, state: ConfigsChat, msg: str | None = None) -> int:
await query.edit_message_text("Select a model", reply_markup=InlineKeyboardMarkup([
[ConfigsChat.MODEL_CHECK.get_inline_button(req.query_analyzer_model.label)],
[ConfigsChat.MODEL_TEAM_LEADER.get_inline_button(req.team_leader_model.label)],
[ConfigsChat.MODEL_TEAM.get_inline_button(req.team_model.label)],
[ConfigsChat.MODEL_REPORT.get_inline_button(req.report_generation_model.label)],
[ConfigsChat.CANCEL.get_inline_button()]
]))
return SELECT_MODEL
async def __model_select(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query, user = await self.handle_callbackquery(update)
if not query.data:
logging.error("Callback query data is None")
return CONFIGS
req = self.user_requests[user]
models = self.build_callback_data("__select_config", state, req.list_models_names())
models = self.build_callback_data("__select_config", ConfigsChat[query.data], req.list_models_names())
inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models]
await query.edit_message_text(msg or state.value, reply_markup=InlineKeyboardMarkup(inline_btns))
await query.edit_message_text("Select a model", reply_markup=InlineKeyboardMarkup(inline_btns))
return SELECT_CONFIG
async def __strategy(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
@@ -182,71 +236,62 @@ class TelegramApp:
req = self.user_requests[user]
_, state, index = str(query.data).split(QUERY_SEP)
if state == str(ConfigsChat.MODEL_TEAM):
req.choose_team(int(index))
if state == str(ConfigsChat.MODEL_OUTPUT):
req.choose_team_leader(int(index))
if state == str(ConfigsChat.STRATEGY):
req.choose_strategy(int(index))
ConfigsChat[state].change_value(req, int(index))
await self.start_message(user, query)
return CONFIGS
return await self.__go_to_start(update, context)
async def __start_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
message, user = await self.handle_message(update)
async def __start_llms(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
message, user = self.handle_message(update)
confs = self.user_requests[user]
confs.user_query = message.text or ""
logging.info(f"@{user.username} started the team with [{confs.team_model.label}, {confs.team_leader_model.label}, {confs.strategy.label}]")
await self.__run_team(update, confs)
logging.info(f"@{user.username} started the team with [{confs.query_analyzer_model.label}, {confs.team_model.label}, {confs.team_leader_model.label}, {confs.report_generation_model.label}, {confs.strategy.label}]")
await self.__run(update, confs)
logging.info(f"@{user.username} team finished.")
return ConversationHandler.END
async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query, user = await self.handle_callbackquery(update)
logging.info(f"@{user.username} canceled the conversation.")
if user in self.user_requests:
del self.user_requests[user]
await query.edit_message_text("Conversation canceled. Use /start to begin again.")
return ConversationHandler.END
async def __run_team(self, update: Update, inputs: PipelineInputs) -> None:
##########################################
# RUN APP
##########################################
async def __run(self, update: Update, inputs: PipelineInputs) -> None:
if not update.message: return
bot = update.get_bot()
msg_id = update.message.message_id - 1
chat_id = update.message.chat_id
configs_str = [
'Running with configurations: ',
f'Team: {inputs.team_model.label}',
f'Output: {inputs.team_leader_model.label}',
f'Strategy: {inputs.strategy.label}',
f'Query: "{inputs.user_query}"'
]
full_message = f"""```\n{'\n'.join(configs_str)}\n```\n\n"""
first_message = full_message + "Generating report, please wait"
msg = await bot.edit_message_text(chat_id=chat_id, message_id=msg_id, text=first_message, parse_mode='MarkdownV2')
run_message = RunMessage(inputs, prefix="```\n", suffix="\n```")
msg = await bot.edit_message_text(chat_id=chat_id, message_id=msg_id, text=run_message.get_latest(), parse_mode='MarkdownV2')
if isinstance(msg, bool): return
# Remove user query and bot message
await bot.delete_message(chat_id=chat_id, message_id=update.message.id)
# TODO migliorare messaggi di attesa
def update_user(update_step: str = "") -> None:
if update_step: run_message.update_step(update_step)
else: run_message.update()
message = run_message.get_latest()
if msg.text != message:
asyncio.create_task(msg.edit_text(message, parse_mode='MarkdownV2'))
await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
pipeline = Pipeline(inputs)
report_content = await pipeline.interact_async()
await msg.delete()
report_content = await pipeline.interact_async(listeners=[
(PipelineEvent.QUERY_CHECK, lambda _: update_user()),
(PipelineEvent.TOOL_USED, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))),
(PipelineEvent.INFO_RECOVERY, lambda _: update_user()),
(PipelineEvent.REPORT_GENERATION, lambda _: update_user()),
])
# attach report file to the message
pdf = MarkdownPdf(toc_level=2, optimize=True)
pdf.add_section(Section(report_content, toc=False))
# TODO vedere se ha senso dare il pdf o solo il messaggio
document = io.BytesIO()
pdf.save_bytes(document)
document.seek(0)
await bot.send_document(chat_id=chat_id, document=document, filename="report.pdf", parse_mode='MarkdownV2', caption=full_message)
await msg.reply_document(document=document, filename="report.pdf")