Team Workflow aggiornato #37
@@ -1,4 +1,4 @@
|
|||||||
from app.agents.predictor import PredictorInput, PredictorOutput
|
|
||||||
from app.agents.pipeline import Pipeline, PipelineInputs, PipelineEvent
|
from app.agents.pipeline import Pipeline, PipelineInputs, PipelineEvent
|
||||||
|
|
|||||||
|
from app.agents.core import PipelineInputs, QueryOutputs
|
||||||
|
|
||||||
__all__ = ["PredictorInput", "PredictorOutput", "Pipeline", "PipelineInputs", "PipelineEvent"]
|
__all__ = ["Pipeline", "PipelineInputs", "PipelineEvent", "QueryOutputs"]
|
||||||
|
|||||||
72
src/app/agents/core.py
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
from pydantic import BaseModel
|
||||||
|
from app.configs import AppConfig
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class QueryOutputs(BaseModel):
|
||||||
|
response: str
|
||||||
|
is_ok: bool
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class PipelineInputs:
|
||||||
|
"""
|
||||||
|
Classe necessaria per passare gli input alla Pipeline.
|
||||||
|
Serve per raggruppare i parametri e semplificare l'inizializzazione.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, configs: AppConfig | None = None) -> None:
|
||||||
|
"""
|
||||||
|
Inputs per la Pipeline di agenti.
|
||||||
|
Setta i valori di default se non specificati.
|
||||||
|
"""
|
||||||
|
self.configs = configs if configs else AppConfig()
|
||||||
|
|
||||||
|
agents = self.configs.agents
|
||||||
|
self.team_model = self.configs.get_model_by_name(agents.team_model)
|
||||||
|
self.team_leader_model = self.configs.get_model_by_name(agents.team_leader_model)
|
||||||
|
self.predictor_model = self.configs.get_model_by_name(agents.predictor_model)
|
||||||
|
self.strategy = self.configs.get_strategy_by_name(agents.strategy)
|
||||||
|
self.user_query = ""
|
||||||
|
|
||||||
|
# ======================
|
||||||
|
# Dropdown handlers
|
||||||
|
# ======================
|
||||||
|
def choose_team_leader(self, index: int):
|
||||||
|
"""
|
||||||
|
Sceglie il modello LLM da usare per il Team Leader.
|
||||||
|
"""
|
||||||
|
self.leader_model = self.configs.models.all_models[index]
|
||||||
|
|
||||||
|
def choose_team(self, index: int):
|
||||||
|
"""
|
||||||
|
Sceglie il modello LLM da usare per il Team.
|
||||||
|
"""
|
||||||
|
self.team_model = self.configs.models.all_models[index]
|
||||||
|
|
||||||
|
def choose_strategy(self, index: int):
|
||||||
|
"""
|
||||||
|
Variable assignment uses Variable assignment uses `team_leader_model` instead of `leader_model` as mentioned in the method's docstring. The docstring should be updated or the variable name should be consistent.
The The `choose_team_leader` method assigns to `self.team_leader_model` but the initialization in `__init__` uses a different assignment pattern via `get_model_by_name()`. This inconsistency means the model assignment won't have the same validation and could break if `index` is out of bounds. Use `self.team_leader_model = self.configs.models.all_models[index]` with proper validation or use the same pattern as `__init__`.
```suggestion
model_list = self.configs.models.all_models
if 0 <= index < len(model_list):
model_name = model_list[index].name
self.team_leader_model = self.configs.get_model_by_name(model_name)
else:
raise IndexError(f"Model index {index} out of range for team leader selection.")
```
|
|||||||
|
Sceglie la strategia da usare per il Team.
|
||||||
|
"""
|
||||||
|
self.strategy = self.configs.strategies[index]
|
||||||
|
|
||||||
|
# ======================
|
||||||
|
# Helpers
|
||||||
|
# ======================
|
||||||
|
The The `choose_team` method has the same issue as `choose_team_leader`: it directly indexes into `all_models` without validation, which could raise an `IndexError` if the index is invalid. Consider adding bounds checking or using a safer retrieval method.
```suggestion
all_models = self.configs.models.all_models
if not (0 <= index < len(all_models)):
raise ValueError(f"Invalid index {index} for team models. Must be between 0 and {len(all_models)-1}.")
self.team_model = all_models[index]
```
|
|||||||
|
def list_models_names(self) -> list[str]:
|
||||||
|
"""
|
||||||
|
Restituisce la lista dei nomi dei modelli disponibili.
|
||||||
|
"""
|
||||||
|
return [model.label for model in self.configs.models.all_models]
|
||||||
|
|
||||||
|
def list_strategies_names(self) -> list[str]:
|
||||||
|
"""
|
||||||
|
Restituisce la lista delle strategie disponibili.
|
||||||
|
"""
|
||||||
|
return [strat.label for strat in self.configs.strategies]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -12,7 +12,7 @@ from agno.workflow.workflow import Workflow
|
|||||||
|
|
||||||
from app.api.tools import *
|
from app.api.tools import *
|
||||||
from app.agents.prompts import *
|
from app.agents.prompts import *
|
||||||
from app.configs import AppConfig
|
from app.agents.core import PipelineInputs
|
||||||
|
|
||||||
logging = logging.getLogger("pipeline")
|
logging = logging.getLogger("pipeline")
|
||||||
|
|
||||||
@@ -28,63 +28,6 @@ class PipelineEvent(str, Enum):
|
|||||||
return event == self.value or (WorkflowRunEvent.step_completed and step_name == self.value)
|
return event == self.value or (WorkflowRunEvent.step_completed and step_name == self.value)
|
||||||
|
|
||||||
|
|
||||||
class PipelineInputs:
|
|
||||||
"""
|
|
||||||
Classe necessaria per passare gli input alla Pipeline.
|
|
||||||
Serve per raggruppare i parametri e semplificare l'inizializzazione.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, configs: AppConfig | None = None) -> None:
|
|
||||||
"""
|
|
||||||
Inputs per la Pipeline di agenti.
|
|
||||||
Setta i valori di default se non specificati.
|
|
||||||
"""
|
|
||||||
self.configs = configs if configs else AppConfig()
|
|
||||||
|
|
||||||
agents = self.configs.agents
|
|
||||||
self.team_model = self.configs.get_model_by_name(agents.team_model)
|
|
||||||
self.team_leader_model = self.configs.get_model_by_name(agents.team_leader_model)
|
|
||||||
self.predictor_model = self.configs.get_model_by_name(agents.predictor_model)
|
|
||||||
self.strategy = self.configs.get_strategy_by_name(agents.strategy)
|
|
||||||
self.user_query = ""
|
|
||||||
|
|
||||||
# ======================
|
|
||||||
# Dropdown handlers
|
|
||||||
# ======================
|
|
||||||
def choose_team_leader(self, index: int):
|
|
||||||
"""
|
|
||||||
Sceglie il modello LLM da usare per il Team Leader.
|
|
||||||
"""
|
|
||||||
self.leader_model = self.configs.models.all_models[index]
|
|
||||||
|
|
||||||
def choose_team(self, index: int):
|
|
||||||
"""
|
|
||||||
Sceglie il modello LLM da usare per il Team.
|
|
||||||
"""
|
|
||||||
self.team_model = self.configs.models.all_models[index]
|
|
||||||
|
|
||||||
def choose_strategy(self, index: int):
|
|
||||||
"""
|
|
||||||
Sceglie la strategia da usare per il Team.
|
|
||||||
"""
|
|
||||||
self.strategy = self.configs.strategies[index]
|
|
||||||
|
|
||||||
# ======================
|
|
||||||
# Helpers
|
|
||||||
# ======================
|
|
||||||
def list_models_names(self) -> list[str]:
|
|
||||||
"""
|
|
||||||
Restituisce la lista dei nomi dei modelli disponibili.
|
|
||||||
"""
|
|
||||||
return [model.label for model in self.configs.models.all_models]
|
|
||||||
|
|
||||||
def list_strategies_names(self) -> list[str]:
|
|
||||||
"""
|
|
||||||
Restituisce la lista delle strategie disponibili.
|
|
||||||
"""
|
|
||||||
return [strat.label for strat in self.configs.strategies]
|
|
||||||
|
|
||||||
|
|
||||||
class Pipeline:
|
class Pipeline:
|
||||||
"""
|
"""
|
||||||
Coordina gli agenti di servizio (Market, News, Social) e il Predictor finale.
|
Coordina gli agenti di servizio (Market, News, Social) e il Predictor finale.
|
||||||
|
|||||||
@@ -1,16 +0,0 @@
|
|||||||
from pydantic import BaseModel, Field
|
|
||||||
from app.api.core.markets import ProductInfo
|
|
||||||
|
|
||||||
class PredictorInput(BaseModel):
|
|
||||||
data: list[ProductInfo] = Field(..., description="Market data as a list of ProductInfo")
|
|
||||||
style: str = Field(..., description="Prediction style")
|
|
||||||
sentiment: str = Field(..., description="Aggregated sentiment from news and social analysis")
|
|
||||||
|
|
||||||
class ItemPortfolio(BaseModel):
|
|
||||||
asset: str = Field(..., description="Name of the asset")
|
|
||||||
percentage: float = Field(..., description="Percentage allocation to the asset")
|
|
||||||
motivation: str = Field(..., description="Motivation for the allocation")
|
|
||||||
|
|
||||||
class PredictorOutput(BaseModel):
|
|
||||||
strategy: str = Field(..., description="Concise operational strategy in Italian")
|
|
||||||
portfolio: list[ItemPortfolio] = Field(..., description="List of portfolio items with allocations")
|
|
||||||
PipelineInputsis imported twice from different modules, which will cause the second import to shadow the first. Remove the duplicate import from line 1.