Merge branch 'main' into demos

This commit is contained in:
2025-10-22 09:07:50 +02:00
43 changed files with 10533 additions and 310 deletions

View File

@@ -42,6 +42,15 @@ CRYPTOPANIC_API_KEY=
REDDIT_API_CLIENT_ID= REDDIT_API_CLIENT_ID=
REDDIT_API_CLIENT_SECRET= REDDIT_API_CLIENT_SECRET=
# Per ottenere questa API è necessario seguire i seguenti passaggi:
# - Installare l'estensione su chrome X Auth Helper
# - Dargli il permesso di girare in incognito
# - Andare in incognito ed entrare sul proprio account X
# - Aprire l'estensione e fare "get key"
# - Chiudere chrome
# Dovrebbe funzionare per 5 anni o finchè non si si fa il log out, in ogni caso si può ricreare
X_API_KEY=
############################################################################### ###############################################################################
# Configurazioni per API di messaggistica # Configurazioni per API di messaggistica

View File

@@ -1,6 +1,11 @@
# Utilizziamo Debian slim invece di Alpine per migliore compatibilità # Utilizziamo Debian slim invece di Alpine per migliore compatibilità
FROM debian:bookworm-slim FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*
# Installiamo le dipendenze di sistema
RUN apt-get update && \
apt-get install -y curl npm && \
rm -rf /var/lib/apt/lists/*
RUN npm install -g rettiwt-api
# Installiamo uv # Installiamo uv
RUN curl -LsSf https://astral.sh/uv/install.sh | sh RUN curl -LsSf https://astral.sh/uv/install.sh | sh

View File

@@ -32,7 +32,6 @@ models:
api: api:
retry_attempts: 3 retry_attempts: 3
retry_delay_seconds: 2 retry_delay_seconds: 2
currency: USD
# TODO Magari implementare un sistema per settare i providers # TODO Magari implementare un sistema per settare i providers
market_providers: [BinanceWrapper, YFinanceWrapper] market_providers: [BinanceWrapper, YFinanceWrapper]
news_providers: [GoogleNewsWrapper, DuckDuckGoWrapper] news_providers: [GoogleNewsWrapper, DuckDuckGoWrapper]
@@ -41,5 +40,6 @@ api:
agents: agents:
strategy: Conservative strategy: Conservative
team_model: qwen3:1.7b team_model: qwen3:1.7b
team_leader_model: qwen3:4b team_leader_model: qwen3:8b
predictor_model: qwen3:4b query_analyzer_model: qwen3:4b
report_generation_model: qwen3:8b

View File

@@ -14,6 +14,7 @@ dependencies = [
"dotenv", # Gestire variabili d'ambiente (generalmente API keys od opzioni) "dotenv", # Gestire variabili d'ambiente (generalmente API keys od opzioni)
"gradio", # UI web semplice con user_input e output "gradio", # UI web semplice con user_input e output
"colorlog", # Log colorati in console "colorlog", # Log colorati in console
"html5lib", # Parsing HTML & Scraping
# Per costruire agenti (ovvero modelli che possono fare più cose tramite tool) https://github.com/agno-agi/agno # Per costruire agenti (ovvero modelli che possono fare più cose tramite tool) https://github.com/agno-agi/agno
# altamente consigliata dato che ha anche tools integrati per fare scraping, calcoli e molto altro # altamente consigliata dato che ha anche tools integrati per fare scraping, calcoli e molto altro

9570
resources/cryptos.csv Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
from app.agents.predictor import PredictorInput, PredictorOutput from app.agents.pipeline import Pipeline, PipelineEvent
from app.agents.pipeline import Pipeline, PipelineInputs, PipelineEvent from app.agents.core import PipelineInputs, QueryOutputs
__all__ = ["PredictorInput", "PredictorOutput", "Pipeline", "PipelineInputs", "PipelineEvent"] __all__ = ["Pipeline", "PipelineInputs", "PipelineEvent", "QueryOutputs"]

121
src/app/agents/core.py Normal file
View File

@@ -0,0 +1,121 @@
from pydantic import BaseModel
from agno.agent import Agent
from agno.team import Team
from agno.tools.reasoning import ReasoningTools
from app.agents.plan_memory_tool import PlanMemoryTool
from app.api.tools import *
from app.configs import AppConfig
from app.agents.prompts import *
class QueryInputs(BaseModel):
user_query: str
strategy: str
class QueryOutputs(BaseModel):
response: str
is_crypto: bool
class PipelineInputs:
"""
Classe necessaria per passare gli input alla Pipeline.
Serve per raggruppare i parametri e semplificare l'inizializzazione.
"""
def __init__(self, configs: AppConfig | None = None) -> None:
"""
Inputs per la Pipeline di agenti.
Setta i valori di default se non specificati.
"""
self.configs = configs if configs else AppConfig()
agents = self.configs.agents
self.team_model = self.configs.get_model_by_name(agents.team_model)
self.team_leader_model = self.configs.get_model_by_name(agents.team_leader_model)
self.query_analyzer_model = self.configs.get_model_by_name(agents.query_analyzer_model)
self.report_generation_model = self.configs.get_model_by_name(agents.report_generation_model)
self.strategy = self.configs.get_strategy_by_name(agents.strategy)
self.user_query = ""
# ======================
# Dropdown handlers
# ======================
def choose_team_leader(self, index: int):
"""
Sceglie il modello LLM da usare per il Team Leader.
"""
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
self.team_leader_model = self.configs.models.all_models[index]
def choose_team(self, index: int):
"""
Sceglie il modello LLM da usare per il Team.
"""
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
self.team_model = self.configs.models.all_models[index]
def choose_strategy(self, index: int):
"""
Sceglie la strategia da usare per il Team.
"""
self.strategy = self.configs.strategies[index]
# ======================
# Helpers
# ======================
def list_models_names(self) -> list[str]:
"""
Restituisce la lista dei nomi dei modelli disponibili.
"""
return [model.label for model in self.configs.models.all_models]
def list_strategies_names(self) -> list[str]:
"""
Restituisce la lista delle strategie disponibili.
"""
return [strat.label for strat in self.configs.strategies]
def get_query_inputs(self) -> QueryInputs:
"""
Restituisce gli input per l'agente di verifica della query.
"""
return QueryInputs(
user_query=self.user_query,
strategy=self.strategy.label,
)
# ======================
# Agent getters
# ======================
def get_agent_team(self) -> Team:
market, news, social = self.get_tools()
market_agent = self.team_model.get_agent(MARKET_INSTRUCTIONS, "Market Agent", tools=[market])
news_agent = self.team_model.get_agent(NEWS_INSTRUCTIONS, "News Agent", tools=[news])
social_agent = self.team_model.get_agent(SOCIAL_INSTRUCTIONS, "Socials Agent", tools=[social])
return Team(
model=self.team_leader_model.get_model(TEAM_LEADER_INSTRUCTIONS),
name="CryptoAnalysisTeam",
tools=[ReasoningTools(), PlanMemoryTool(), CryptoSymbolsTools()],
members=[market_agent, news_agent, social_agent],
)
def get_agent_query_checker(self) -> Agent:
return self.query_analyzer_model.get_agent(QUERY_CHECK_INSTRUCTIONS, "Query Check Agent", output_schema=QueryOutputs)
def get_agent_report_generator(self) -> Agent:
return self.report_generation_model.get_agent(REPORT_GENERATION_INSTRUCTIONS, "Report Generator Agent")
def get_tools(self) -> tuple[MarketAPIsTool, NewsAPIsTool, SocialAPIsTool]:
"""
Restituisce la lista di tools disponibili per gli agenti.
"""
api = self.configs.api
market_tool = MarketAPIsTool()
market_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds)
news_tool = NewsAPIsTool()
news_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds)
social_tool = SocialAPIsTool()
social_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds)
return market_tool, news_tool, social_tool

View File

@@ -4,85 +4,38 @@ import logging
import random import random
from typing import Any, Callable from typing import Any, Callable
from agno.agent import RunEvent from agno.agent import RunEvent
from agno.team import Team, TeamRunEvent
from agno.tools.reasoning import ReasoningTools
from agno.run.workflow import WorkflowRunEvent from agno.run.workflow import WorkflowRunEvent
from agno.workflow.types import StepInput, StepOutput
from agno.workflow.step import Step from agno.workflow.step import Step
from agno.workflow.workflow import Workflow from agno.workflow.workflow import Workflow
from app.agents.core import *
from app.api.tools import *
from app.agents.prompts import *
from app.configs import AppConfig
logging = logging.getLogger("pipeline") logging = logging.getLogger("pipeline")
class PipelineEvent(str, Enum): class PipelineEvent(str, Enum):
PLANNER = "Planner" QUERY_CHECK = "Query Check"
QUERY_ANALYZER = "Query Analyzer"
INFO_RECOVERY = "Info Recovery" INFO_RECOVERY = "Info Recovery"
REPORT_GENERATION = "Report Generation" REPORT_GENERATION = "Report Generation"
REPORT_TRANSLATION = "Report Translation" REPORT_TRANSLATION = "Report Translation"
TOOL_USED = RunEvent.tool_call_completed RUN_FINISHED = WorkflowRunEvent.workflow_completed.value
TOOL_USED = RunEvent.tool_call_completed.value
def check_event(self, event: str, step_name: str) -> bool: def check_event(self, event: str, step_name: str) -> bool:
return event == self.value or (WorkflowRunEvent.step_completed and step_name == self.value) return event == self.value or (WorkflowRunEvent.step_completed == event and step_name == self.value)
@classmethod
class PipelineInputs: def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], None]]]:
""" return [
Classe necessaria per passare gli input alla Pipeline. (PipelineEvent.QUERY_CHECK, lambda _: logging.info(f"[{run_id}] Query Check completed.")),
Serve per raggruppare i parametri e semplificare l'inizializzazione. (PipelineEvent.QUERY_ANALYZER, lambda _: logging.info(f"[{run_id}] Query Analyzer completed.")),
""" (PipelineEvent.INFO_RECOVERY, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")),
(PipelineEvent.REPORT_GENERATION, lambda _: logging.info(f"[{run_id}] Report Generation completed.")),
def __init__(self, configs: AppConfig | None = None) -> None: (PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name}] by {e.agent_name}.")),
""" (PipelineEvent.RUN_FINISHED, lambda _: logging.info(f"[{run_id}] Run completed.")),
Inputs per la Pipeline di agenti. ]
Setta i valori di default se non specificati.
"""
self.configs = configs if configs else AppConfig()
agents = self.configs.agents
self.team_model = self.configs.get_model_by_name(agents.team_model)
self.team_leader_model = self.configs.get_model_by_name(agents.team_leader_model)
self.predictor_model = self.configs.get_model_by_name(agents.predictor_model)
self.strategy = self.configs.get_strategy_by_name(agents.strategy)
self.user_query = ""
# ======================
# Dropdown handlers
# ======================
def choose_team_leader(self, index: int):
"""
Sceglie il modello LLM da usare per il Team Leader.
"""
self.leader_model = self.configs.models.all_models[index]
def choose_team(self, index: int):
"""
Sceglie il modello LLM da usare per il Team.
"""
self.team_model = self.configs.models.all_models[index]
def choose_strategy(self, index: int):
"""
Sceglie la strategia da usare per il Team.
"""
self.strategy = self.configs.strategies[index]
# ======================
# Helpers
# ======================
def list_models_names(self) -> list[str]:
"""
Restituisce la lista dei nomi dei modelli disponibili.
"""
return [model.label for model in self.configs.models.all_models]
def list_strategies_names(self) -> list[str]:
"""
Restituisce la lista delle strategie disponibili.
"""
return [strat.label for strat in self.configs.strategies]
class Pipeline: class Pipeline:
@@ -93,12 +46,14 @@ class Pipeline:
""" """
def __init__(self, inputs: PipelineInputs): def __init__(self, inputs: PipelineInputs):
"""
Inizializza la pipeline con gli input forniti.
Args:
inputs: istanza di PipelineInputs contenente le configurazioni e i parametri della pipeline.
"""
self.inputs = inputs self.inputs = inputs
# ====================== def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str:
# Core interaction
# ======================
def interact(self, listeners: dict[RunEvent | TeamRunEvent, Callable[[PipelineEvent], None]] = {}) -> str:
""" """
Esegue la pipeline di agenti per rispondere alla query dell'utente. Esegue la pipeline di agenti per rispondere alla query dell'utente.
Args: Args:
@@ -108,7 +63,7 @@ class Pipeline:
""" """
return asyncio.run(self.interact_async(listeners)) return asyncio.run(self.interact_async(listeners))
async def interact_async(self, listeners: dict[RunEvent | TeamRunEvent, Callable[[PipelineEvent], None]] = {}) -> str: async def interact_async(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str:
""" """
Versione asincrona che esegue la pipeline di agenti per rispondere alla query dell'utente. Versione asincrona che esegue la pipeline di agenti per rispondere alla query dell'utente.
Args: Args:
@@ -119,61 +74,47 @@ class Pipeline:
run_id = random.randint(1000, 9999) # Per tracciare i log run_id = random.randint(1000, 9999) # Per tracciare i log
logging.info(f"[{run_id}] Pipeline query: {self.inputs.user_query}") logging.info(f"[{run_id}] Pipeline query: {self.inputs.user_query}")
# Step 1: Crea gli agenti e il team events = [*PipelineEvent.get_log_events(run_id), *listeners]
market_tool, news_tool, social_tool = self.get_tools() query = QueryInputs(
market_agent = self.inputs.team_model.get_agent(instructions=MARKET_INSTRUCTIONS, name="MarketAgent", tools=[market_tool]) user_query=self.inputs.user_query,
news_agent = self.inputs.team_model.get_agent(instructions=NEWS_INSTRUCTIONS, name="NewsAgent", tools=[news_tool]) strategy=self.inputs.strategy.description
social_agent = self.inputs.team_model.get_agent(instructions=SOCIAL_INSTRUCTIONS, name="SocialAgent", tools=[social_tool])
team = Team(
model=self.inputs.team_leader_model.get_model(COORDINATOR_INSTRUCTIONS),
name="CryptoAnalysisTeam",
tools=[ReasoningTools()],
members=[market_agent, news_agent, social_agent],
) )
# Step 3: Crea il workflow workflow = self.build_workflow()
#query_planner = Step(name=PipelineEvent.PLANNER, agent=Agent()) result = await self.run(workflow, query, events=events)
info_recovery = Step(name=PipelineEvent.INFO_RECOVERY, team=team)
#report_generation = Step(name=PipelineEvent.REPORT_GENERATION, agent=Agent())
#report_translate = Step(name=AppEvent.REPORT_TRANSLATION, agent=Agent())
workflow = Workflow(
name="App Workflow",
steps=[
#query_planner,
info_recovery,
#report_generation,
#report_translate
]
)
# Step 4: Fai partire il workflow e prendi l'output
query = f"The user query is: {self.inputs.user_query}\n\n They requested a {self.inputs.strategy.label} investment strategy."
result = await self.run(workflow, query, events={})
logging.info(f"[{run_id}] Run finished")
return result return result
# ======================
# Helpers
# =====================
def get_tools(self) -> tuple[MarketAPIsTool, NewsAPIsTool, SocialAPIsTool]:
"""
Restituisce la lista di tools disponibili per gli agenti.
"""
api = self.inputs.configs.api
market_tool = MarketAPIsTool(currency=api.currency) def build_workflow(self) -> Workflow:
market_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds) """
news_tool = NewsAPIsTool() Costruisce il workflow della pipeline di agenti.
news_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds) Returns:
social_tool = SocialAPIsTool() L'istanza di Workflow costruita.
social_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds) """
# Step 1: Crea gli agenti e il team
team = self.inputs.get_agent_team()
query_check = self.inputs.get_agent_query_checker()
report = self.inputs.get_agent_report_generator()
return (market_tool, news_tool, social_tool) # Step 2: Crea gli steps
def condition_query_ok(step_input: StepInput) -> StepOutput:
val = step_input.previous_step_content
return StepOutput(stop=not val.is_crypto) if isinstance(val, QueryOutputs) else StepOutput(stop=True)
query_check = Step(name=PipelineEvent.QUERY_CHECK, agent=query_check)
info_recovery = Step(name=PipelineEvent.INFO_RECOVERY, team=team)
report_generation = Step(name=PipelineEvent.REPORT_GENERATION, agent=report)
# Step 3: Ritorna il workflow completo
return Workflow(name="App Workflow", steps=[
query_check,
condition_query_ok,
info_recovery,
report_generation
])
@classmethod @classmethod
async def run(cls, workflow: Workflow, query: str, events: dict[PipelineEvent, Callable[[Any], None]]) -> str: async def run(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str:
""" """
Esegue il workflow e gestisce gli eventi tramite le callback fornite. Esegue il workflow e gestisce gli eventi tramite le callback fornite.
Args: Args:
@@ -188,16 +129,18 @@ class Pipeline:
content = None content = None
async for event in iterator: async for event in iterator:
step_name = getattr(event, 'step_name', '') step_name = getattr(event, 'step_name', '')
for app_event, listener in events:
for app_event, listener in events.items():
if app_event.check_event(event.event, step_name): if app_event.check_event(event.event, step_name):
listener(event) listener(event)
if event.event == WorkflowRunEvent.step_completed:
if event.event == WorkflowRunEvent.workflow_completed:
content = getattr(event, 'content', '') content = getattr(event, 'content', '')
if isinstance(content, str):
if content and isinstance(content, str):
think_str = "</think>" think_str = "</think>"
think = content.rfind(think_str) think = content.rfind(think_str)
content = content[(think + len(think_str)):] if think != -1 else content return content[(think + len(think_str)):] if think != -1 else content
if content and isinstance(content, QueryOutputs):
return content.response
return content if content else "No output from workflow, something went wrong." logging.error(f"No output from workflow: {content}")
return "No output from workflow, something went wrong."

View File

@@ -0,0 +1,55 @@
from agno.tools.toolkit import Toolkit
from typing import TypedDict, Literal
class Task(TypedDict):
name: str
status: Literal["pending", "completed", "failed"]
result: str | None
class PlanMemoryTool(Toolkit):
def __init__(self):
self.tasks: list[Task] = []
Toolkit.__init__(self, # type: ignore[call-arg]
instructions="This tool manages an execution plan. Add tasks, get the next pending task, update a task's status (completed, failed) and result, or list all tasks.",
tools=[
self.add_tasks,
self.get_next_pending_task,
self.update_task_status,
self.list_all_tasks,
]
)
def add_tasks(self, task_names: list[str]) -> str:
"""Adds multiple new tasks to the plan with 'pending' status."""
count = 0
for name in task_names:
if not any(t['name'] == name for t in self.tasks):
self.tasks.append({"name": name, "status": "pending", "result": None})
count += 1
return f"Added {count} new tasks."
def get_next_pending_task(self) -> Task | None:
"""Retrieves the first task that is still 'pending'."""
for task in self.tasks:
if task["status"] == "pending":
return task
return None
def update_task_status(self, task_name: str, status: Literal["completed", "failed"], result: str | None = None) -> str:
"""Updates the status and result of a specific task by its name."""
for task in self.tasks:
if task["name"] == task_name:
task["status"] = status
if result is not None:
task["result"] = result
return f"Task '{task_name}' updated to {status}."
return f"Error: Task '{task_name}' not found."
def list_all_tasks(self) -> list[str]:
"""Lists all tasks in the plan with their status and result."""
if not self.tasks:
return ["No tasks in the plan."]
return [f"- {t['name']}: {t['status']} (Result: {t.get('result', 'N/A')})" for t in self.tasks]

View File

@@ -1,16 +0,0 @@
from pydantic import BaseModel, Field
from app.api.core.markets import ProductInfo
class PredictorInput(BaseModel):
data: list[ProductInfo] = Field(..., description="Market data as a list of ProductInfo")
style: str = Field(..., description="Prediction style")
sentiment: str = Field(..., description="Aggregated sentiment from news and social analysis")
class ItemPortfolio(BaseModel):
asset: str = Field(..., description="Name of the asset")
percentage: float = Field(..., description="Percentage allocation to the asset")
motivation: str = Field(..., description="Motivation for the allocation")
class PredictorOutput(BaseModel):
strategy: str = Field(..., description="Concise operational strategy in Italian")
portfolio: list[ItemPortfolio] = Field(..., description="List of portfolio items with allocations")

View File

@@ -6,16 +6,18 @@ def __load_prompt(file_name: str) -> str:
file_path = __PROMPTS_PATH / file_name file_path = __PROMPTS_PATH / file_name
return file_path.read_text(encoding='utf-8').strip() return file_path.read_text(encoding='utf-8').strip()
COORDINATOR_INSTRUCTIONS = __load_prompt("team_leader.txt") TEAM_LEADER_INSTRUCTIONS = __load_prompt("team_leader.txt")
MARKET_INSTRUCTIONS = __load_prompt("team_market.txt") MARKET_INSTRUCTIONS = __load_prompt("team_market.txt")
NEWS_INSTRUCTIONS = __load_prompt("team_news.txt") NEWS_INSTRUCTIONS = __load_prompt("team_news.txt")
SOCIAL_INSTRUCTIONS = __load_prompt("team_social.txt") SOCIAL_INSTRUCTIONS = __load_prompt("team_social.txt")
PREDICTOR_INSTRUCTIONS = __load_prompt("predictor.txt") QUERY_CHECK_INSTRUCTIONS = __load_prompt("query_check.txt")
REPORT_GENERATION_INSTRUCTIONS = __load_prompt("report_generation.txt")
__all__ = [ __all__ = [
"COORDINATOR_INSTRUCTIONS", "TEAM_LEADER_INSTRUCTIONS",
"MARKET_INSTRUCTIONS", "MARKET_INSTRUCTIONS",
"NEWS_INSTRUCTIONS", "NEWS_INSTRUCTIONS",
"SOCIAL_INSTRUCTIONS", "SOCIAL_INSTRUCTIONS",
"PREDICTOR_INSTRUCTIONS", "QUERY_CHECK_INSTRUCTIONS",
"REPORT_GENERATION_INSTRUCTIONS",
] ]

View File

@@ -1,27 +0,0 @@
You are an **Allocation Algorithm (Crypto-Algo)** specialized in analyzing market data and sentiment to generate an investment strategy and a target portfolio.
Your sole objective is to process the user_input data and generate the strictly structured output as required by the response format. **You MUST NOT provide introductions, preambles, explanations, conclusions, or any additional comments that are not strictly required.**
## Processing Instructions (Absolute Rule)
The allocation strategy must be **derived exclusively from the "Allocation Logic" corresponding to the requested *style*** and the provided market/sentiment data. **DO NOT** use external or historical knowledge.
## Allocation Logic
### "Aggressivo" Style (Aggressive)
* **Priority:** Maximizing return (high volatility accepted).
* **Focus:** Higher allocation to **non-BTC/ETH assets** with high momentum potential (Altcoins, mid/low-cap assets).
* **BTC/ETH:** Must serve as a base (anchor), but their allocation **must not exceed 50%** of the total portfolio.
* **Sentiment:** Use positive sentiment to increase exposure to high-risk assets.
### "Conservativo" Style (Conservative)
* **Priority:** Capital preservation (volatility minimized).
* **Focus:** Major allocation to **BTC and/or ETH (Large-Cap Assets)**.
* **BTC/ETH:** Their allocation **must be at least 70%** of the total portfolio.
* **Altcoins:** Any allocations to non-BTC/ETH assets must be minimal (max 30% combined) and for assets that minimize speculative risk.
* **Sentiment:** Use positive sentiment only as confirmation for exposure, avoiding reactions to excessive "FOMO" signals.
## Output Requirements (Content MUST be in Italian)
1. **Strategy (strategy):** Must be a concise operational description **in Italian ("in Italiano")**, with a maximum of 5 sentences.
2. **Portfolio (portfolio):** The sum of all percentages must be **exactly 100%**. The justification (motivation) for each asset must be a single clear sentence **in Italian ("in Italiano")**.

View File

@@ -0,0 +1,18 @@
GOAL: check if the query is crypto-related
1) Determine the language of the query:
- This will help you understand better the intention of the user
- Focus on the query of the user
- DO NOT answer the query
2) Determine if the query is crypto or investment-related:
- Crypto-related if it mentions cryptocurrencies, tokens, NFTs, blockchain, exchanges, wallets, DeFi, oracles, smart contracts, on-chain, off-chain, staking, yield, liquidity, tokenomics, coins, ticker symbols, etc.
- Investment-related if it mentions stocks, bonds, options, trading strategies, financial markets, investment advice, portfolio management, etc.
- If the query uses generic terms like "news", "prices", "trends", "social", "market cap", "volume" with NO asset specified -> ASSUME CRYPTO/INVESTMENT CONTEXT and proceed.
- If the query is clearly about unrelated domains (weather, recipes, unrelated local politics, unrelated medicine, general software not about crypto, etc.) -> return NOT_CRYPTO error.
- If ambiguous: treat as crypto/investment only if the most likely intent is crypto/investment; otherwise return a JSON plan that first asks the user for clarification (see step structure below).
3) Ouput the result:
- if is crypto related then output the query
- if is not crypto related, then output why is not related in a brief message

View File

@@ -0,0 +1,61 @@
**TASK:** You are a specialized **Markdown Reporting Assistant**. Your task is to receive a structured analysis report from a "Team Leader" and re-format it into a single, cohesive, and well-structured final report in Markdown for the end-user.
**INPUT:** The input will be a structured block containing an `Overall Summary` and *zero or more* data sections (e.g., `Market`, `News`, `Social`, `Assumptions`). Each section will contain a `Summary` and `Full Data`.
**CORE RULES:**
1. **Strict Conditional Rendering (CRUCIAL):** Your primary job is to format *only* the data you receive. You MUST check each data section from the input (e.g., `Market & Price Data`, `News & Market Sentiment`).
2. **Omit Empty Sections (CRUCIAL):** If a data section is **not present** in the input, or if its `Full Data` field is empty, null, or marked as 'Data not available', you **MUST** completely omit that entire section from the final report. **DO NOT** print the Markdown header (e.g., `## 1. Market & Price Data`), the summary, or any placeholder text for that missing section.
3. **Omit Report Notes:** This same rule applies to the `## 4. Report Notes` section. Render it *only* if an `Assumptions` or `Execution Log` field is present in the input.
4. **Present All Data:** For sections that *are* present and contain data, your report's text MUST be based on the `Summary` provided, and you MUST include the `Full Data` (e.g., Markdown tables for prices).
5. **Do Not Invent:**
* **Do NOT** invent new hypotheses, metrics, or conclusions.
* **Do NOT** print internal field names (like 'Full Data') or agent names.
6. **No Extraneous Output:**
* Your entire response must be **only the Markdown report**.
* Do not include any pre-amble (e.g., "Here is the report:").
---
**MANDATORY REPORT STRUCTURE:**
(Follow the CORE RULES to conditionally render these sections. If no data sections are present, you will only render the Title and Executive Summary.)
# [Report Title - e.g., "Crypto Analysis Report: Bitcoin"]
## Executive Summary
[Use the `Overall Summary` from the input here.]
---
## 1. Market & Price Data
[Use the `Summary` from the input's Market section here.]
**Detailed Price Data:**
[Present the `Full Data` from the Market section here.]
---
## 2. News & Market Sentiment
[Use the `Summary` from the input's News section here.]
**Key Topics Discussed:**
[List the main topics identified in the News summary.]
**Supporting News/Data:**
[Present the `Full Data` from the News section here.]
---
## 3. Social Sentiment
[Use the `Summary` from the input's Social section here.]
**Trending Narratives:**
[List the main narratives identified in the Social summary.]
**Supporting Social/Data:**
[Present the `Full Data` from the Social section here.]
---
## 4. Report Notes
[Use this section to report any `Assumptions` or `Execution Log` data provided in the input.]

View File

@@ -1,15 +1,48 @@
You are the expert coordinator of a financial analysis team specializing in cryptocurrencies. **TASK:** You are the **Crypto Analysis Team Leader**, an expert coordinator of a financial analysis team.
Your team consists of three agents: **INPUT:** You will receive a user query. Your role is to create and execute an adaptive plan by coordinating your team of agents to retrieve data, judge its sufficiency, and provide an aggregated analysis.
- **MarketAgent**: Provides quantitative market data, price analysis, and technical indicators.
- **NewsAgent**: Scans and analyzes the latest news, articles, and official announcements.
- **SocialAgent**: Gauges public sentiment, trends, and discussions on social media.
Your primary objective is to answer the user's query by orchestrating the work of your team members. **YOUR TEAM CONSISTS OF THREE AGENTS:**
- **MarketAgent:** Fetches live prices and historical data.
- **NewsAgent:** Analyzes news sentiment and top topics.
- **SocialAgent:** Gauges public sentiment and trending narratives.
Your workflow is as follows: **PRIMARY OBJECTIVE:** Execute the user query by creating a dynamic execution plan. You must **use your available tools to manage the plan's state**, identify missing data, orchestrate agents to retrieve it, manage retrieval attempts, and judge sufficiency. The final goal is to produce a structured report including *all* retrieved data and an analytical summary for the final formatting LLM.
1. **Deconstruct the user's query** to identify the required information.
2. **Delegate specific tasks** to the most appropriate agent(s) to gather the necessary data and initial analysis. **WORKFLOW (Execution Logic):**
3. **Analyze the information** returned by the agents. 1. **Analyze Query & Scope Plan:** Analyze the user's query. Create an execution plan identifying the *target data* needed. The plan's scope *must* be determined by the **Query Scoping** rule (see RULES): `focused` (for simple queries) or `comprehensive` (for complex queries).
4. If the initial data is insufficient or the query is complex, **iteratively re-engage the agents** with follow-up questions to build a comprehensive picture. 2. **Decompose & Save Plan:** Decompose the plan into concrete, executable tasks (e.g., "Get BTC Price," "Analyze BTC News Sentiment," "Gauge BTC Social Sentiment"). **Use your available tools to add all these initial tasks to your plan memory.**
5. **Synthesize all the gathered information** into a final, coherent, and complete analysis that fills all the required output fields. 3. **Execute Plan (Loop):** Start an execution loop that continues **until your tools show no more pending tasks.**
4. **Get & Dispatch Task:** **Use your tools to retrieve the next pending task.** Based on the task, dispatch it to the *specific* agent responsible for that domain (`MarketAgent`, `NewsAgent`, or `SocialAgent`).
5. **Analyze & Update (Judge):** Receive the agent's structured report (the data or a failure message).
6. **Use your tools to update the task's status** (e.g., 'completed' or 'failed') and **store the received data/result.**
7. **Iterate & Retry (If Needed):**
* If a task `failed` (e.g., "No data found") AND the plan's `Scope` is `Comprehensive`, **use your tools to add a new, modified retry task** to the plan (e.g., "Retry: Get News with wider date range").
* This logic ensures you attempt to get all data for complex queries.
8. **Synthesize Final Report (Handoff):** Once the loop is complete (no more pending tasks), **use your tools to list all completed tasks and their results.** Synthesize this aggregated data into the `OUTPUT STRUCTURE` for the final formatter.
**BEHAVIORAL RULES:**
- **Tool-Driven State Management (Crucial):** You MUST use your available tools to create, track, and update your execution plan. Your workflow is a loop: 1. Get task from plan, 2. Execute task (via Agent), 3. Update task status in plan. Repeat until done.
- **Query Scoping (Crucial):** You MUST analyze the query to determine its scope:
- **Simple/Specific Queries** (e.g., "BTC Price?"): Create a *focused plan* (e.g., only one task for `MarketAgent`).
- **Complex/Analytical Queries** (e.g., "Status of Bitcoin?"): Create a *comprehensive plan* (e.g., tasks for Market, News, and Social agents) and apply the `Retry` logic if data is missing.
- **Retry & Failure Handling:** You must track failures. **Do not add more than 2-3 retry tasks for the same objective** (e.g., max 3 attempts total to get News). If failure persists, report "Data not available" in the final output.
- **Agent Delegation (No Data Tools):** You, the Leader, do not retrieve data. You *only* orchestrate. **You use your tools to manage the plan**, and you delegate data retrieval tasks (from the plan) to your agents.
- **Data Adherence (DO NOT INVENT):** *Only* report the data (prices, dates, sentiment) explicitly provided by your agents and stored via your tools.
**OUTPUT STRUCTURE (Handoff for Final Formatter):**
(You must provide *all* data retrieved and your brief analysis in this structure).
1. **Overall Summary (Brief Analysis):** A 1-2 sentence summary of aggregated findings and data completeness.
2. **Market & Price Data (from MarketAgent):**
* **Brief Analysis:** Your summary of the market data (e.g., key trends, volatility).
* **Full Data:** The *complete, raw data* (e.g., list of prices, timestamps) received from the agent.
3. **News & Market Sentiment (from NewsAgent):**
* **Brief Analysis:** Your summary of the sentiment and main topics identified.
* **Full Data:** The *complete list of articles/data* used by the agent. If not found, specify "Data not available".
4. **Social Sentiment (from SocialAgent):**
* **Brief Analysis:** Your summary of community sentiment and trending narratives.
* **Full Data:** The *complete list of posts/data* used by the agent. If not found, specify "Data not available".
5. **Execution Log & Assumptions:**
* **Scope:** (e.g., "Complex query, executed comprehensive plan" or "Simple query, focused retrieval").
* **Execution Notes:** (e.g., "NewsAgent failed 1st attempt. Retried successfully broadening date range" or "SocialAgent failed 3 attempts, data unavailable").

View File

@@ -1,19 +1,16 @@
**TASK:** You are a specialized **Crypto Price Data Retrieval Agent**. Your primary goal is to fetch the most recent and/or historical price data for requested cryptocurrency assets (e.g., 'BTC', 'ETH', 'SOL'). You must provide the data in a clear and structured format. **TASK:** You are a specialized **Crypto Price Data Retrieval Agent**. Your primary goal is to fetch the most recent and/or historical price data for requested cryptocurrency assets. You must provide the data in a clear and structured format.
**AVAILABLE TOOLS:**
1. `get_products(asset_ids: list[str])`: Get **current** product/price info for a list of assets. **(PREFERITA: usa questa per i prezzi live)**
2. `get_historical_prices(asset_id: str, limit: int)`: Get historical price data for one asset. Default limit is 100. **(PREFERITA: usa questa per i dati storici)**
3. `get_products_aggregated(asset_ids: list[str])`: Get **aggregated current** product/price info for a list of assets. **(USA SOLO SE richiesto 'aggregato' o se `get_products` fallisce)**
4. `get_historical_prices_aggregated(asset_id: str, limit: int)`: Get **aggregated historical** price data for one asset. **(USA SOLO SE richiesto 'aggregato' o se `get_historical_prices` fallisce)**
**USAGE GUIDELINE:** **USAGE GUIDELINE:**
* **Asset ID:** Always convert common names (e.g., 'Bitcoin', 'Ethereum') into their official ticker/ID (e.g., 'BTC', 'ETH'). - **Asset ID:** Always convert common names (e.g., 'Bitcoin', 'Ethereum') into their official ticker/ID (e.g., 'BTC', 'ETH').
* **Cost Management (Cruciale per LLM locale):** Prefer `get_products` and `get_historical_prices` for standard requests to minimize costs. - **Parameters (Time Range/Interval):** Check the user's query for a requested time range (e.g., "last 7 days") or interval (e.g., "hourly"). Use sensible defaults if not specified.
* **Aggregated Data:** Use `get_products_aggregated` or `get_historical_prices_aggregated` only if the user specifically requests aggregated data or you value that having aggregated data is crucial for the analysis. - **Tool Strategy:**
* **Failing Tool:** If the tool doesn't return any data or fails, try the alternative aggregated tool if not already used. 1. Attempt to use the primary price retrieval tools.
2. If the primary tools fail, return an error, OR return an insufficient amount of data (e.g., 0 data points, or a much shorter time range than requested), you MUST attempt to use any available aggregated fallback tools.
- **Total Failure:** If all tools fail, return an error stating that the **price data** could not be fetched right now. If you have the error message, report that too.
- **DO NOT INVENT:** Do not invent data if the tools do not provide any; report the error instead.
**REPORTING REQUIREMENT:** **REPORTING REQUIREMENT:**
1. **Format:** Output the results in a clear, easy-to-read list or table. 1. **Format:** Output the results in a clear, easy-to-read list or table.
2. **Live Price Request:** If an asset's *current price* is requested, report the **Asset ID**, **Latest Price**, and **Time/Date of the price**. 2. **Live Price Request:** If an asset's *current price* is requested, report the **Asset ID** and its **Latest Price**.
3. **Historical Price Request:** If *historical data* is requested, report the **Asset ID**, the **Limit** of points returned, and the **First** and **Last** entries from the list of historical prices (Date, Price). 3. **Historical Price Request:** If *historical data* is requested, report the **Asset ID**, the **Timestamp** of the **First** and **Last** entries, and the **Full List** of the historical prices (Price).
4. **Output:** For all requests, output a single, concise summary of the findings; if requested, also include the raw data retrieved. 4. **Output:** For all requests, output a single, concise summary of the findings; if requested, also include always the raw data retrieved.

View File

@@ -1,18 +1,17 @@
**TASK:** You are a specialized **Crypto News Analyst**. Your goal is to fetch the latest news or top headlines related to cryptocurrencies, and then **analyze the sentiment** of the content to provide a concise report to the team leader. Prioritize 'crypto' or specific cryptocurrency names (e.g., 'Bitcoin', 'Ethereum') in your searches. **TASK:** You are a specialized **Crypto News Analyst**. Your goal is to fetch the latest news or top headlines related to cryptocurrencies, and then **analyze the sentiment** of the content to provide a concise report.
**AVAILABLE TOOLS:**
1. `get_latest_news(query: str, limit: int)`: Get the 'limit' most recent news articles for a specific 'query'.
2. `get_top_headlines(limit: int)`: Get the 'limit' top global news headlines.
3. `get_latest_news_aggregated(query: str, limit: int)`: Get aggregated latest news articles for a specific 'query'.
4. `get_top_headlines_aggregated(limit: int)`: Get aggregated top global news headlines.
**USAGE GUIDELINE:** **USAGE GUIDELINE:**
* Always use `get_latest_news` with a relevant crypto-related query first. - **Querying:** You can search for more general news, but prioritize querying with a relevant crypto (e.g., 'Bitcoin', 'Ethereum').
* The default limit for news items should be 5 unless specified otherwise. - **Limit:** Check the user's query for a requested number of articles (limit). If no specific number is mentioned, use a default limit of 5.
* If the tool doesn't return any articles, respond with "No relevant news articles found." - **Tool Strategy:**
1. Attempt to use the primary tools (e.g., `get_latest_news`).
2. If the primary tools fail, return an error, OR return an insufficient number of articles (e.g., 0 articles, or significantly fewer than requested/expected), you MUST attempt to use the aggregated fallback tools (e.g., `get_latest_news_aggregated`) to find more results.
- **No Articles Found:** If all relevant tools are tried and no articles are returned, respond with "No relevant news articles found."
- **Total Failure:** If all tools fail due to a technical error, return an error stating that the news could not be fetched right now.
- **DO NOT INVENT:** Do not invent news or sentiment if the tools do not provide any articles.
**REPORTING REQUIREMENT:** **REPORTING REQUIREMENT (If news is found):**
1. **Analyze** the tone and key themes of the retrieved articles. 1. **Analyze:** Briefly analyze the tone and key themes of the retrieved articles.
2. **Summarize** the overall **market sentiment** (e.g., highly positive, cautiously neutral, generally negative) based on the content. 2. **Sentiment:** Summarize the overall **market sentiment** (e.g., highly positive, cautiously neutral, generally negative) based on the content.
3. **Identify** the top 2-3 **main topics** discussed (e.g., new regulation, price surge, institutional adoption). 3. **Topics:** Identify the top 2-3 **main topics** discussed (e.g., new regulation, price surge, institutional adoption).
4. **Output** a single, brief report summarizing these findings. Do not output the raw articles. 4. **Output:** Output a single, brief report summarizing these findings. **Do not** output the raw articles.

View File

@@ -1,15 +1,16 @@
**TASK:** You are a specialized **Social Media Sentiment Analyst**. Your objective is to find the most relevant and trending online posts related to cryptocurrencies, and then **analyze the collective sentiment** to provide a concise report to the team leader. **TASK:** You are a specialized **Social Media Sentiment Analyst**. Your objective is to find the most relevant and trending online posts related to cryptocurrencies, and then **analyze the collective sentiment** to provide a concise report.
**AVAILABLE TOOLS:**
1. `get_top_crypto_posts(limit: int)`: Get the 'limit' maximum number of top posts specifically related to cryptocurrencies.
**USAGE GUIDELINE:** **USAGE GUIDELINE:**
* Always use the `get_top_crypto_posts` tool to fulfill the request. - **Tool Strategy:**
* The default limit for posts should be 5 unless specified otherwise. 1. Attempt to use the primary tools (e.g., `get_top_crypto_posts`).
* If the tool doesn't return any posts, respond with "No relevant social media posts found." 2. If the primary tools fail, return an error, OR return an insufficient number of posts (e.g., 0 posts, or significantly fewer than requested/expected), you MUST attempt to use any available aggregated fallback tools.
- **Limit:** Check the user's query for a requested number of posts (limit). If no specific number is mentioned, use a default limit of 5.
- **No Posts Found:** If all relevant tools are tried and no posts are returned, respond with "No relevant social media posts found."
- **Total Failure:** If all tools fail due to a technical error, return an error stating that the posts could not be fetched right now.
- **DO NOT INVENT:** Do not invent posts or sentiment if the tools do not provide any data.
**REPORTING REQUIREMENT:** **REPORTING REQUIREMENT (If posts are found):**
1. **Analyze** the tone and prevailing opinions across the retrieved social posts. 1. **Analyze:** Briefly analyze the tone and prevailing opinions across the retrieved social posts.
2. **Summarize** the overall **community sentiment** (e.g., high enthusiasm/FOMO, uncertainty, FUD/fear) based on the content. 2. **Sentiment:** Summarize the overall **community sentiment** (e.g., high enthusiasm/FOMO, uncertainty, FUD/fear) based on the content.
3. **Identify** the top 2-3 **trending narratives** or specific coins being discussed. 3. **Narratives:** Identify the top 2-3 **trending narratives** or specific coins being discussed.
4. **Output** a single, brief report summarizing these findings. Do not output the raw posts. 4. **Output:** Output a single, brief report summarizing these findings. **Do not** output the raw posts.

View File

@@ -0,0 +1,22 @@
from datetime import datetime
def unified_timestamp(timestamp_ms: int | None = None, timestamp_s: int | None = None) -> str:
"""
Transform the timestamp from milliseconds or seconds to a unified string format.
The resulting string is a formatted string 'YYYY-MM-DD HH:MM'.
Args:
timestamp_ms: Timestamp in milliseconds.
timestamp_s: Timestamp in seconds.
Raises:
ValueError: If neither timestamp_ms nor timestamp_s is provided.
"""
if timestamp_ms is not None:
timestamp = timestamp_ms // 1000
elif timestamp_s is not None:
timestamp = timestamp_s
else:
raise ValueError("Either timestamp_ms or timestamp_s must be provided")
assert timestamp > 0, "Invalid timestamp data received"
return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M')

View File

@@ -1,6 +1,6 @@
import statistics import statistics
from datetime import datetime
from pydantic import BaseModel from pydantic import BaseModel
from app.api.core import unified_timestamp
class ProductInfo(BaseModel): class ProductInfo(BaseModel):
@@ -64,24 +64,8 @@ class Price(BaseModel):
"""Timestamp in format YYYY-MM-DD HH:MM""" """Timestamp in format YYYY-MM-DD HH:MM"""
def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None: def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None:
""" """ Use the unified_timestamp function to set the timestamp."""
Sets the timestamp from milliseconds or seconds. self.timestamp = unified_timestamp(timestamp_ms, timestamp_s)
The timestamp is saved as a formatted string 'YYYY-MM-DD HH:MM'.
Args:
timestamp_ms: Timestamp in milliseconds.
timestamp_s: Timestamp in seconds.
Raises:
ValueError: If neither timestamp_ms nor timestamp_s is provided.
"""
if timestamp_ms is not None:
timestamp = timestamp_ms // 1000
elif timestamp_s is not None:
timestamp = timestamp_s
else:
raise ValueError("Either timestamp_ms or timestamp_s must be provided")
assert timestamp > 0, "Invalid timestamp data received"
self.timestamp = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M')
@staticmethod @staticmethod
def aggregate(prices: dict[str, list['Price']]) -> list['Price']: def aggregate(prices: dict[str, list['Price']]) -> list['Price']:

View File

@@ -1,6 +1,10 @@
from pydantic import BaseModel from pydantic import BaseModel
from app.api.core import unified_timestamp
MAX_COMMENTS = 5
class SocialPost(BaseModel): class SocialPost(BaseModel):
""" """
Represents a social media post with time, title, description, and comments. Represents a social media post with time, title, description, and comments.
@@ -10,6 +14,10 @@ class SocialPost(BaseModel):
description: str = "" description: str = ""
comments: list["SocialComment"] = [] comments: list["SocialComment"] = []
def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None:
""" Use the unified_timestamp function to set the time."""
self.time = unified_timestamp(timestamp_ms, timestamp_s)
class SocialComment(BaseModel): class SocialComment(BaseModel):
""" """
Represents a comment on a social media post. Represents a comment on a social media post.
@@ -17,6 +25,10 @@ class SocialComment(BaseModel):
time: str = "" time: str = ""
description: str = "" description: str = ""
def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None:
""" Use the unified_timestamp function to set the time."""
self.time = unified_timestamp(timestamp_ms, timestamp_s)
class SocialWrapper: class SocialWrapper:
""" """

View File

@@ -57,7 +57,9 @@ class BinanceWrapper(MarketWrapper):
""" """
Formatta l'asset_id nel formato richiesto da Binance. Formatta l'asset_id nel formato richiesto da Binance.
""" """
return asset_id.replace('-', '') if '-' in asset_id else f"{asset_id}{self.currency}" i = asset_id.find('-')
if i != -1: asset_id = asset_id[:i]
return f"{asset_id}{self.currency}" if self.currency not in asset_id else asset_id
def get_product(self, asset_id: str) -> ProductInfo: def get_product(self, asset_id: str) -> ProductInfo:
symbol = self.__format_symbol(asset_id) symbol = self.__format_symbol(asset_id)

View File

@@ -61,7 +61,9 @@ class CoinBaseWrapper(MarketWrapper):
) )
def __format(self, asset_id: str) -> str: def __format(self, asset_id: str) -> str:
return asset_id if '-' in asset_id else f"{asset_id}-{self.currency}" i = asset_id.find('-')
if i != -1: asset_id = asset_id[:i]
return f"{asset_id}-{self.currency}"
def get_product(self, asset_id: str) -> ProductInfo: def get_product(self, asset_id: str) -> ProductInfo:
asset_id = self.__format(asset_id) asset_id = self.__format(asset_id)

View File

@@ -47,8 +47,9 @@ class YFinanceWrapper(MarketWrapper):
Formatta il simbolo per yfinance. Formatta il simbolo per yfinance.
Per crypto, aggiunge '-' e la valuta (es. BTC -> BTC-USD). Per crypto, aggiunge '-' e la valuta (es. BTC -> BTC-USD).
""" """
asset_id = asset_id.upper() i = asset_id.find('-')
return f"{asset_id}-{self.currency}" if '-' not in asset_id else asset_id if i != -1: asset_id = asset_id[:i]
return f"{asset_id}-{self.currency}"
def get_product(self, asset_id: str) -> ProductInfo: def get_product(self, asset_id: str) -> ProductInfo:
symbol = self._format_symbol(asset_id) symbol = self._format_symbol(asset_id)

View File

@@ -1,3 +1,5 @@
from app.api.social.reddit import RedditWrapper from app.api.social.reddit import RedditWrapper
from app.api.social.x import XWrapper
from app.api.social.chan import ChanWrapper
__all__ = ["RedditWrapper"] __all__ = ["RedditWrapper", "XWrapper", "ChanWrapper"]

View File

@@ -0,0 +1,89 @@
'''
Usiamo le API di 4chan per ottenere un catalogo di threads dalla board /biz/
'''
import re
import html
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from app.api.core.social import *
class ChanWrapper(SocialWrapper):
def __init__(self):
super().__init__()
def __time_str(self, timestamp: str) -> int:
"""Converte una stringa da MM/GG/AA(DAY)HH:MM:SS di 4chan a millisecondi"""
time = datetime.strptime(timestamp, "%m/%d/%y(%a)%H:%M:%S")
return int(time.timestamp() * 1000)
def __unformat_html_str(self, html_element: str) -> str:
"""Pulisce il commento rimuovendo HTML e formattazioni inutili"""
if not html_element: return ""
html_entities = html.unescape(html_element)
soup = BeautifulSoup(html_entities, 'html.parser')
html_element = soup.get_text(separator=" ")
html_element = re.sub(r"[\\/]+", "/", html_element)
html_element = re.sub(r"\s+", " ", html_element).strip()
return html_element
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
url = 'https://a.4cdn.org/biz/catalog.json'
response = requests.get(url)
assert response.status_code == 200, f"Error in 4chan API request [{response.status_code}] {response.text}"
social_posts: list[SocialPost] = []
# Questa lista contiene un dizionario per ogni pagina della board di questo tipo {"page": page_number, "threads": [{thread_data}]}
for page in response.json():
for thread in page['threads']:
# ci indica se il thread è stato fissato o meno, se non è presente vuol dire che non è stato fissato, i thread sticky possono essere ignorati
if 'sticky' in thread:
continue
# la data di creazione del thread tipo "MM/GG/AA(day)hh:mm:ss", ci interessa solo MM/GG/AA
time = self.__time_str(thread.get('now', ''))
# il nome dell'utente
name: str = thread.get('name', 'Anonymous')
# il nome del thread, può contenere anche elementi di formattazione html che saranno da ignorare, potrebbe non essere presente
title = self.__unformat_html_str(thread.get('sub', ''))
title = f"{name} posted: {title}"
# il commento del thread, può contenere anche elementi di formattazione html che saranno da ignorare
thread_description = self.__unformat_html_str(thread.get('com', ''))
if not thread_description:
continue
# una lista di dizionari conteneti le risposte al thread principale, sono strutturate similarmente al thread
response_list = thread.get('last_replies', [])
comments_list: list[SocialComment] = []
for i, response in enumerate(response_list):
if i >= MAX_COMMENTS: break
# la data di creazione della risposta tipo "MM/GG/AA(day)hh:mm:ss", ci interessa solo MM/GG/AA
time = self.__time_str(response['now'])
# il commento della risposta, può contenere anche elementi di formattazione html che saranno da ignorare
comment = self.__unformat_html_str(response.get('com', ''))
if not comment:
continue
social_comment = SocialComment(description=comment)
social_comment.set_timestamp(timestamp_ms=time)
comments_list.append(social_comment)
social_post: SocialPost = SocialPost(
title=title,
description=thread_description,
comments=comments_list
)
social_post.set_timestamp(timestamp_ms=time)
social_posts.append(social_post)
return social_posts[:limit]

View File

@@ -1,10 +1,9 @@
import os import os
from praw import Reddit # type: ignore from praw import Reddit # type: ignore
from praw.models import Submission # type: ignore from praw.models import Submission # type: ignore
from app.api.core.social import SocialWrapper, SocialPost, SocialComment from app.api.core.social import *
MAX_COMMENTS = 5
# metterne altri se necessario. # metterne altri se necessario.
# fonti: https://lkiconsulting.io/marketing/best-crypto-subreddits/ # fonti: https://lkiconsulting.io/marketing/best-crypto-subreddits/
SUBREDDITS = [ SUBREDDITS = [
@@ -24,13 +23,13 @@ SUBREDDITS = [
def extract_post(post: Submission) -> SocialPost: def extract_post(post: Submission) -> SocialPost:
social = SocialPost() social = SocialPost()
social.time = str(post.created) social.set_timestamp(timestamp_ms=post.created)
social.title = post.title social.title = post.title
social.description = post.selftext social.description = post.selftext
for top_comment in post.comments: for top_comment in post.comments:
comment = SocialComment() comment = SocialComment()
comment.time = str(top_comment.created) comment.set_timestamp(timestamp_ms=top_comment.created)
comment.description = top_comment.body comment.description = top_comment.body
social.comments.append(comment) social.comments.append(comment)

46
src/app/api/social/x.py Normal file
View File

@@ -0,0 +1,46 @@
import os
import json
import subprocess
from shutil import which
from app.api.core.social import SocialWrapper, SocialPost
# This is the list of users that can be interesting
# To get the ID of a new user is necessary to search it on X, copy the url and insert it in a service like "https://get-id-x.foundtt.com/en/"
X_USERS = [
'watcherguru',
'Cointelegraph',
'BTC_Archive',
'elonmusk'
]
class XWrapper(SocialWrapper):
def __init__(self):
'''
This wrapper uses the rettiwt API to get data from X in order to avoid the rate limits of the free X API,
even if improbable this could lead to a ban so do not use the personal account,
In order to work it is necessary to install the rettiwt cli tool, for more information visit the official documentation at https://www.npmjs.com/package/rettiwt-api
'''
self.api_key = os.getenv("X_API_KEY")
assert self.api_key, "X_API_KEY environment variable not set"
assert which('rettiwt') is not None, "Command `rettiwt` not installed"
def get_top_crypto_posts(self, limit:int = 5) -> list[SocialPost]:
social_posts: list[SocialPost] = []
for user in X_USERS:
process = subprocess.run(f"rettiwt -k {self.api_key} tweet search -f {str(user)}", capture_output=True)
results = process.stdout.decode()
json_result = json.loads(results)
tweets = json_result['list']
for tweet in tweets[:limit]:
social_post = SocialPost()
social_post.time = tweet['createdAt']
social_post.title = str(user) + " tweeted: "
social_post.description = tweet['fullText']
social_posts.append(social_post)
return social_posts

View File

@@ -1,5 +1,6 @@
from app.api.tools.market_tool import MarketAPIsTool from app.api.tools.market_tool import MarketAPIsTool
from app.api.tools.social_tool import SocialAPIsTool from app.api.tools.social_tool import SocialAPIsTool
from app.api.tools.news_tool import NewsAPIsTool from app.api.tools.news_tool import NewsAPIsTool
from app.api.tools.symbols_tool import CryptoSymbolsTools
__all__ = ["MarketAPIsTool", "NewsAPIsTool", "SocialAPIsTool"] __all__ = ["MarketAPIsTool", "NewsAPIsTool", "SocialAPIsTool", "CryptoSymbolsTools"]

View File

@@ -15,7 +15,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
- CryptoCompareWrapper - CryptoCompareWrapper
""" """
def __init__(self, currency: str = "USD"): def __init__(self):
""" """
Initialize the MarketAPIsTool with multiple market API wrappers. Initialize the MarketAPIsTool with multiple market API wrappers.
The following wrappers are included in this order: The following wrappers are included in this order:
@@ -23,12 +23,9 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
- YFinanceWrapper - YFinanceWrapper
- CoinBaseWrapper - CoinBaseWrapper
- CryptoCompareWrapper - CryptoCompareWrapper
Args:
currency (str): Valuta in cui restituire i prezzi. Default è "USD".
""" """
kwargs = {"currency": currency or "USD"}
wrappers: list[type[MarketWrapper]] = [BinanceWrapper, YFinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper] wrappers: list[type[MarketWrapper]] = [BinanceWrapper, YFinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs) self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore Toolkit.__init__( # type: ignore
self, self,

View File

@@ -1,7 +1,7 @@
from agno.tools import Toolkit from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler from app.api.wrapper_handler import WrapperHandler
from app.api.core.social import SocialPost, SocialWrapper from app.api.core.social import SocialPost, SocialWrapper
from app.api.social import RedditWrapper from app.api.social import *
class SocialAPIsTool(SocialWrapper, Toolkit): class SocialAPIsTool(SocialWrapper, Toolkit):
@@ -23,7 +23,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit):
- RedditWrapper. - RedditWrapper.
""" """
wrappers: list[type[SocialWrapper]] = [RedditWrapper] wrappers: list[type[SocialWrapper]] = [RedditWrapper, XWrapper, ChanWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers) self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore Toolkit.__init__( # type: ignore

View File

@@ -0,0 +1,103 @@
import os
import httpx
import asyncio
import logging
import pandas as pd
from io import StringIO
from agno.tools.toolkit import Toolkit
logging.basicConfig(level=logging.INFO)
logging = logging.getLogger("crypto_symbols")
BASE_URL = "https://finance.yahoo.com/markets/crypto/all/"
class CryptoSymbolsTools(Toolkit):
"""
Classe per ottenere i simboli delle criptovalute tramite Yahoo Finance.
"""
def __init__(self, cache_file: str = 'resources/cryptos.csv'):
self.cache_file = cache_file
self.final_table = pd.read_csv(self.cache_file) if os.path.exists(self.cache_file) else pd.DataFrame() # type: ignore
Toolkit.__init__(self, # type: ignore
name="Crypto Symbols Tool",
instructions="Tool to get cryptocurrency symbols and search them by name.",
tools=[
self.get_all_symbols,
self.get_symbols_by_name,
],
)
def get_all_symbols(self) -> list[str]:
"""
Restituisce tutti i simboli delle criptovalute.
Returns:
list[str]: Lista di tutti i simboli delle criptovalute.
"""
return self.final_table['Symbol'].tolist() if not self.final_table.empty else []
def get_symbols_by_name(self, query: str) -> list[tuple[str, str]]:
"""
Cerca i simboli che contengono la query.
Args:
query (str): Query di ricerca.
Returns:
list[tuple[str, str]]: Lista di tuple (simbolo, nome) che contengono la query.
"""
query_lower = query.lower()
positions = self.final_table['Name'].str.lower().str.contains(query_lower)
return self.final_table[positions][['Symbol', 'Name']].apply(tuple, axis=1).tolist()
async def fetch_crypto_symbols(self, force_refresh: bool = False) -> None:
"""
Recupera tutti i simboli delle criptovalute da Yahoo Finance e li memorizza in cache.
Args:
force_refresh (bool): Se True, forza il recupero anche se i dati sono già in cache.
"""
if not force_refresh and not self.final_table.empty:
return
num_currencies = 250 # It looks like this is the max per page otherwise yahoo returns 26
offset = 0
stop = not self.final_table.empty
table = self.final_table.copy()
while not stop:
text = await self.___request(offset, num_currencies)
tables = pd.read_html(text) # type: ignore
df = tables[0]
df.columns = table.columns if not table.empty else df.columns
table = pd.concat([table, df], ignore_index=True)
total_rows = df.shape[0]
offset += total_rows
if total_rows < num_currencies:
stop = True
table.dropna(axis=0, how='all', inplace=True) # type: ignore
table.dropna(axis=1, how='all', inplace=True) # type: ignore
table.to_csv(self.cache_file, index=False)
self.final_table = table
async def ___request(self, offset: int, num_currencies: int) -> StringIO:
while True:
async with httpx.AsyncClient() as client:
resp = await client.get(f"{BASE_URL}?start={offset}&count={num_currencies}", headers={"User-Agent": "Mozilla/5.0"})
if resp.status_code == 429: # Too many requests
secs = int(resp.headers.get("Retry-After", 2))
logging.warning(f"Rate limit exceeded, waiting {secs}s before retrying...")
await asyncio.sleep(secs)
continue
if resp.status_code != 200:
logging.error(f"Error fetching crypto symbols: [{resp.status_code}] {resp.text}")
break
return StringIO(resp.text)
return StringIO("")
if __name__ == "__main__":
crypto_symbols = CryptoSymbolsTools()
asyncio.run(crypto_symbols.fetch_crypto_symbols(force_refresh=True))

View File

@@ -97,12 +97,12 @@ class WrapperHandler(Generic[WrapperType]):
wrapper_name = wrapper.__class__.__name__ wrapper_name = wrapper.__class__.__name__
if not try_all: if not try_all:
logging.info(f"try_call {wrapper_name}") logging.debug(f"try_call {wrapper_name}")
for try_count in range(1, self.retry_per_wrapper + 1): for try_count in range(1, self.retry_per_wrapper + 1):
try: try:
result = func(wrapper) result = func(wrapper)
logging.info(f"{wrapper_name} succeeded") logging.debug(f"{wrapper_name} succeeded")
results[wrapper_name] = result results[wrapper_name] = result
break break

View File

@@ -57,7 +57,6 @@ class AppModel(BaseModel):
class APIConfig(BaseModel): class APIConfig(BaseModel):
retry_attempts: int = 3 retry_attempts: int = 3
retry_delay_seconds: int = 2 retry_delay_seconds: int = 2
currency: str = "USD"
class Strategy(BaseModel): class Strategy(BaseModel):
name: str = "Conservative" name: str = "Conservative"
@@ -76,7 +75,8 @@ class AgentsConfigs(BaseModel):
strategy: str = "Conservative" strategy: str = "Conservative"
team_model: str = "gemini-2.0-flash" team_model: str = "gemini-2.0-flash"
team_leader_model: str = "gemini-2.0-flash" team_leader_model: str = "gemini-2.0-flash"
predictor_model: str = "gemini-2.0-flash" query_analyzer_model: str = "gemini-2.0-flash"
report_generation_model: str = "gemini-2.0-flash"
class AppConfig(BaseModel): class AppConfig(BaseModel):
port: int = 8000 port: int = 8000

View File

@@ -13,25 +13,9 @@ class ChatManager:
""" """
def __init__(self): def __init__(self):
self.history: list[dict[str, str]] = [] # [{"role": "user"/"assistant", "content": "..."}] self.history: list[tuple[str, str]] = []
self.inputs = PipelineInputs() self.inputs = PipelineInputs()
def send_message(self, message: str) -> None:
"""
Aggiunge un messaggio utente, chiama la Pipeline e salva la risposta nello storico.
"""
# Aggiungi messaggio utente allo storico
self.history.append({"role": "user", "content": message})
def receive_message(self, response: str) -> str:
"""
Riceve un messaggio dalla pipeline e lo aggiunge allo storico.
"""
# Aggiungi risposta assistente allo storico
self.history.append({"role": "assistant", "content": response})
return response
def save_chat(self, filename: str = "chat.json") -> None: def save_chat(self, filename: str = "chat.json") -> None:
""" """
Salva la chat corrente in src/saves/<filename>. Salva la chat corrente in src/saves/<filename>.
@@ -55,7 +39,7 @@ class ChatManager:
""" """
self.history = [] self.history = []
def get_history(self) -> list[dict[str, str]]: def get_history(self) -> list[tuple[str, str]]:
""" """
Restituisce lo storico completo della chat. Restituisce lo storico completo della chat.
""" """
@@ -65,33 +49,28 @@ class ChatManager:
######################################## ########################################
# Funzioni Gradio # Funzioni Gradio
######################################## ########################################
def gradio_respond(self, message: str, history: list[dict[str, str]]) -> tuple[list[dict[str, str]], list[dict[str, str]], str]: def gradio_respond(self, message: str, history: list[tuple[str, str]]) -> str:
self.send_message(message)
self.inputs.user_query = message self.inputs.user_query = message
pipeline = Pipeline(self.inputs) pipeline = Pipeline(self.inputs)
response = pipeline.interact() response = pipeline.interact()
self.receive_message(response) self.history.append((message, response))
history.append({"role": "user", "content": message}) return response
history.append({"role": "assistant", "content": response})
return history, history, ""
def gradio_save(self) -> str: def gradio_save(self) -> str:
self.save_chat("chat.json") self.save_chat("chat.json")
return "💾 Chat salvata in chat.json" return "💾 Chat salvata in chat.json"
def gradio_load(self) -> tuple[list[dict[str, str]], list[dict[str, str]]]: def gradio_load(self) -> tuple[list[tuple[str, str]], list[tuple[str, str]]]:
self.load_chat("chat.json") self.load_chat("chat.json")
history: list[dict[str, str]] = [] history = self.get_history()
for m in self.get_history():
history.append({"role": m["role"], "content": m["content"]})
return history, history return history, history
def gradio_clear(self) -> tuple[list[dict[str, str]], list[dict[str, str]]]: def gradio_clear(self) -> tuple[list[str], list[str]]:
self.reset_chat() self.reset_chat()
return [], [] return [], []
def gradio_build_interface(self) -> gr.Blocks: def gradio_build_interface(self) -> gr.Blocks:
with gr.Blocks() as interface: with gr.Blocks() as interface:
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)") gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)")
@@ -112,18 +91,16 @@ class ChatManager:
) )
style.change(fn=self.inputs.choose_strategy, inputs=style, outputs=None) style.change(fn=self.inputs.choose_strategy, inputs=style, outputs=None)
chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages") chat = gr.ChatInterface(
msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?") fn=self.gradio_respond
)
with gr.Row(): with gr.Row():
clear_btn = gr.Button("🗑️ Reset Chat") clear_btn = gr.Button("🗑️ Reset Chat")
save_btn = gr.Button("💾 Salva Chat") save_btn = gr.Button("💾 Salva Chat")
load_btn = gr.Button("📂 Carica Chat") load_btn = gr.Button("📂 Carica Chat")
# Eventi e interazioni clear_btn.click(self.gradio_clear, inputs=None, outputs=[chat.chatbot, chat.chatbot_state])
msg.submit(self.gradio_respond, inputs=[msg, chatbot], outputs=[chatbot, chatbot, msg])
clear_btn.click(self.gradio_clear, inputs=None, outputs=[chatbot, chatbot])
save_btn.click(self.gradio_save, inputs=None, outputs=None) save_btn.click(self.gradio_save, inputs=None, outputs=None)
load_btn.click(self.gradio_load, inputs=None, outputs=[chatbot, chatbot]) load_btn.click(self.gradio_load, inputs=None, outputs=[chat.chatbot, chat.chatbot_state])
return interface return interface

View File

@@ -0,0 +1,48 @@
import pytest
from app.agents.core import QueryOutputs
from app.agents.prompts import QUERY_CHECK_INSTRUCTIONS
from app.configs import AppConfig
class TestQueryCheckAgent:
@pytest.fixture(autouse=True)
def setup(self):
self.configs = AppConfig.load()
self.model = self.configs.get_model_by_name("qwen3:1.7b")
self.agent = self.model.get_agent(QUERY_CHECK_INSTRUCTIONS, output_schema=QueryOutputs)
def test_query_not_ok(self):
response = self.agent.run("Is the sky blue?") # type: ignore
assert response is not None
assert response.content is not None
content = response.content
assert isinstance(content, QueryOutputs)
assert content.is_crypto == False
def test_query_not_ok2(self):
response = self.agent.run("What is the capital of France?") # type: ignore
assert response is not None
assert response.content is not None
content = response.content
assert isinstance(content, QueryOutputs)
assert content.is_crypto == False
def test_query_ok(self):
response = self.agent.run("Bitcoin") # type: ignore
assert response is not None
assert response.content is not None
content = response.content
assert isinstance(content, QueryOutputs)
assert content.is_crypto == True
def test_query_ok2(self):
response = self.agent.run("Ha senso investire in Ethereum?") # type: ignore
assert response is not None
assert response.content is not None
content = response.content
assert isinstance(content, QueryOutputs)
assert content.is_crypto == True

View File

@@ -0,0 +1,31 @@
import pytest
from app.agents.prompts import REPORT_GENERATION_INSTRUCTIONS
from app.configs import AppConfig
class TestReportGenerationAgent:
@pytest.fixture(autouse=True)
def setup(self):
self.configs = AppConfig.load()
self.model = self.configs.get_model_by_name("qwen3:1.7b")
self.agent = self.model.get_agent(REPORT_GENERATION_INSTRUCTIONS)
def test_report_generation(self):
sample_data = """
The analysis reported from the Market Agent have highlighted the following key metrics for the cryptocurrency market:
Bitcoin (BTC) has shown strong performance over the last 24 hours with a price of $30,000 and a Market Cap of $600 Billion
Ethereum (ETH) is currently priced at $2,000 with a Market Cap of $250 Billion and a 24h Volume of $20 Billion.
The overall market sentiment is bullish with a 5% increase in total market capitalization.
No significant regulatory news has been reported and the social media sentiment remains unknown.
"""
response = self.agent.run(sample_data) # type: ignore
assert response is not None
assert response.content is not None
content = response.content
assert isinstance(content, str)
print(content)
assert "Bitcoin" in content
assert "Ethereum" in content
assert "Summary" in content

37
tests/agents/test_team.py Normal file
View File

@@ -0,0 +1,37 @@
import asyncio
import pytest
from app.agents.core import PipelineInputs
from app.agents.prompts import *
from app.configs import AppConfig
# fix warning about no event loop
@pytest.fixture(scope="session", autouse=True)
def event_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
loop.close()
@pytest.mark.slow
class TestTeamAgent:
@pytest.fixture(autouse=True)
def setup(self):
self.configs = AppConfig.load()
self.configs.agents.team_model = "qwen3:1.7b"
self.configs.agents.team_leader_model = "qwen3:1.7b"
self.inputs = PipelineInputs(self.configs)
self.team = self.inputs.get_agent_team()
def test_team_agent_response(self):
self.inputs.user_query = "Is Bitcoin a good investment now?"
inputs = self.inputs.get_query_inputs()
response = self.team.run(inputs) # type: ignore
assert response is not None
assert response.content is not None
content = response.content
print(content)
assert isinstance(content, str)
assert "Bitcoin" in content

View File

@@ -0,0 +1,22 @@
import re
import pytest
from app.api.social.chan import ChanWrapper
@pytest.mark.social
@pytest.mark.api
class TestChanWrapper:
def test_initialization(self):
wrapper = ChanWrapper()
assert wrapper is not None
def test_get_top_crypto_posts(self):
wrapper = ChanWrapper()
posts = wrapper.get_top_crypto_posts(limit=2)
assert isinstance(posts, list)
assert len(posts) == 2
for post in posts:
assert post.title != ""
assert post.time != ""
assert re.match(r'\d{4}-\d{2}-\d{2}', post.time)
assert isinstance(post.comments, list)

View File

@@ -1,4 +1,5 @@
import os import os
import re
import pytest import pytest
from app.api.social.reddit import MAX_COMMENTS, RedditWrapper from app.api.social.reddit import MAX_COMMENTS, RedditWrapper
@@ -18,6 +19,8 @@ class TestRedditWrapper:
assert len(posts) == 2 assert len(posts) == 2
for post in posts: for post in posts:
assert post.title != "" assert post.title != ""
assert re.match(r'\d{4}-\d{2}-\d{2}', post.time)
assert isinstance(post.comments, list) assert isinstance(post.comments, list)
assert len(post.comments) <= MAX_COMMENTS assert len(post.comments) <= MAX_COMMENTS
for comment in post.comments: for comment in post.comments:

View File

@@ -0,0 +1,22 @@
import os
import re
import pytest
from app.api.social.x import XWrapper
@pytest.mark.social
@pytest.mark.api
@pytest.mark.skipif(not os.getenv("X_API_KEY"), reason="X_API_KEY not set in environment variables")
class TestXWrapper:
def test_initialization(self):
wrapper = XWrapper()
assert wrapper is not None
def test_get_top_crypto_posts(self):
wrapper = XWrapper()
posts = wrapper.get_top_crypto_posts(limit=2)
assert isinstance(posts, list)
assert len(posts) == 2
for post in posts:
assert post.title != ""
assert re.match(r'\d{4}-\d{2}-\d{2}', post.time)
assert isinstance(post.comments, list)

View File

@@ -0,0 +1,27 @@
import pytest
from app.api.tools import CryptoSymbolsTools
@pytest.mark.tools
class TestCryptoSymbolsTools:
def test_get_symbols(self):
tool = CryptoSymbolsTools()
symbols = tool.get_all_symbols()
assert isinstance(symbols, list)
assert "BTC-USD" in symbols
def test_get_symbol_by_name(self):
tool = CryptoSymbolsTools()
results = tool.get_symbols_by_name("Bitcoin")
assert isinstance(results, list)
assert ("BTC-USD", "Bitcoin USD") in results
results = tool.get_symbols_by_name("Banana")
assert isinstance(results, list)
assert ("BANANA28886-USD", "BananaCoin USD") in results
def test_get_symbol_by_invalid_name(self):
tool = CryptoSymbolsTools()
results = tool.get_symbols_by_name("InvalidName")
assert isinstance(results, list)
assert not results

24
uv.lock generated
View File

@@ -690,6 +690,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/07/c6/80c95b1b2b94682a72cbdbfb85b81ae2daffa4291fbfa1b1464502ede10d/hpack-4.1.0-py3-none-any.whl", hash = "sha256:157ac792668d995c657d93111f46b4535ed114f0c9c8d672271bbec7eae1b496", size = 34357, upload-time = "2025-01-22T21:44:56.92Z" }, { url = "https://files.pythonhosted.org/packages/07/c6/80c95b1b2b94682a72cbdbfb85b81ae2daffa4291fbfa1b1464502ede10d/hpack-4.1.0-py3-none-any.whl", hash = "sha256:157ac792668d995c657d93111f46b4535ed114f0c9c8d672271bbec7eae1b496", size = 34357, upload-time = "2025-01-22T21:44:56.92Z" },
] ]
[[package]]
name = "html5lib"
version = "1.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "six" },
{ name = "webencodings" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ac/b6/b55c3f49042f1df3dcd422b7f224f939892ee94f22abcf503a9b7339eaf2/html5lib-1.1.tar.gz", hash = "sha256:b2e5b40261e20f354d198eae92afc10d750afb487ed5e50f9c4eaf07c184146f", size = 272215, upload-time = "2020-06-22T23:32:38.834Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/6c/dd/a834df6482147d48e225a49515aabc28974ad5a4ca3215c18a882565b028/html5lib-1.1-py2.py3-none-any.whl", hash = "sha256:0d78f8fde1c230e99fe37986a60526d7049ed4bf8a9fadbad5f00e22e58e041d", size = 112173, upload-time = "2020-06-22T23:32:36.781Z" },
]
[[package]] [[package]]
name = "httpcore" name = "httpcore"
version = "1.0.9" version = "1.0.9"
@@ -1662,6 +1675,7 @@ dependencies = [
{ name = "gnews" }, { name = "gnews" },
{ name = "google-genai" }, { name = "google-genai" },
{ name = "gradio" }, { name = "gradio" },
{ name = "html5lib" },
{ name = "markdown-pdf" }, { name = "markdown-pdf" },
{ name = "newsapi-python" }, { name = "newsapi-python" },
{ name = "ollama" }, { name = "ollama" },
@@ -1682,6 +1696,7 @@ requires-dist = [
{ name = "gnews" }, { name = "gnews" },
{ name = "google-genai" }, { name = "google-genai" },
{ name = "gradio" }, { name = "gradio" },
{ name = "html5lib" },
{ name = "markdown-pdf" }, { name = "markdown-pdf" },
{ name = "newsapi-python" }, { name = "newsapi-python" },
{ name = "ollama" }, { name = "ollama" },
@@ -1714,6 +1729,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/85/cd/584a2ceb5532af99dd09e50919e3615ba99aa127e9850eafe5f31ddfdb9a/uvicorn-0.37.0-py3-none-any.whl", hash = "sha256:913b2b88672343739927ce381ff9e2ad62541f9f8289664fa1d1d3803fa2ce6c", size = 67976, upload-time = "2025-09-23T13:33:45.842Z" }, { url = "https://files.pythonhosted.org/packages/85/cd/584a2ceb5532af99dd09e50919e3615ba99aa127e9850eafe5f31ddfdb9a/uvicorn-0.37.0-py3-none-any.whl", hash = "sha256:913b2b88672343739927ce381ff9e2ad62541f9f8289664fa1d1d3803fa2ce6c", size = 67976, upload-time = "2025-09-23T13:33:45.842Z" },
] ]
[[package]]
name = "webencodings"
version = "0.5.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/0b/02/ae6ceac1baeda530866a85075641cec12989bd8d31af6d5ab4a3e8c92f47/webencodings-0.5.1.tar.gz", hash = "sha256:b36a1c245f2d304965eb4e0a82848379241dc04b865afcc4aab16748587e1923", size = 9721, upload-time = "2017-04-05T20:21:34.189Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f4/24/2a3e3df732393fed8b3ebf2ec078f05546de641fe1b667ee316ec1dcf3b7/webencodings-0.5.1-py2.py3-none-any.whl", hash = "sha256:a0af1213f3c2226497a97e2b3aa01a7e4bee4f403f95be16fc9acd2947514a78", size = 11774, upload-time = "2017-04-05T20:21:32.581Z" },
]
[[package]] [[package]]
name = "websocket-client" name = "websocket-client"
version = "1.8.0" version = "1.8.0"