14 socials integration (#34)
* Create XWrapper.py & ChanWrapper.py * Tests for in XWrapper & ChanWrapper * MAX_COMMENTS in social.py * Soddisfatto Giacomo * unified_timestamp
This commit was merged in pull request #34.
This commit is contained in:
@@ -42,6 +42,15 @@ CRYPTOPANIC_API_KEY=
|
||||
REDDIT_API_CLIENT_ID=
|
||||
REDDIT_API_CLIENT_SECRET=
|
||||
|
||||
# Per ottenere questa API è necessario seguire i seguenti passaggi:
|
||||
# - Installare l'estensione su chrome X Auth Helper
|
||||
# - Dargli il permesso di girare in incognito
|
||||
# - Andare in incognito ed entrare sul proprio account X
|
||||
# - Aprire l'estensione e fare "get key"
|
||||
# - Chiudere chrome
|
||||
# Dovrebbe funzionare per 5 anni o finchè non si si fa il log out, in ogni caso si può ricreare
|
||||
X_API_KEY=
|
||||
|
||||
|
||||
###############################################################################
|
||||
# Configurazioni per API di messaggistica
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
# Utilizziamo Debian slim invece di Alpine per migliore compatibilità
|
||||
FROM debian:bookworm-slim
|
||||
RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Installiamo le dipendenze di sistema
|
||||
RUN apt-get update && \
|
||||
apt-get install -y curl npm && \
|
||||
rm -rf /var/lib/apt/lists/*
|
||||
RUN npm install -g rettiwt-api
|
||||
|
||||
# Installiamo uv
|
||||
RUN curl -LsSf https://astral.sh/uv/install.sh | sh
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def unified_timestamp(timestamp_ms: int | None = None, timestamp_s: int | None = None) -> str:
|
||||
"""
|
||||
Transform the timestamp from milliseconds or seconds to a unified string format.
|
||||
The resulting string is a formatted string 'YYYY-MM-DD HH:MM'.
|
||||
Args:
|
||||
timestamp_ms: Timestamp in milliseconds.
|
||||
timestamp_s: Timestamp in seconds.
|
||||
Raises:
|
||||
ValueError: If neither timestamp_ms nor timestamp_s is provided.
|
||||
"""
|
||||
if timestamp_ms is not None:
|
||||
timestamp = timestamp_ms // 1000
|
||||
elif timestamp_s is not None:
|
||||
timestamp = timestamp_s
|
||||
else:
|
||||
raise ValueError("Either timestamp_ms or timestamp_s must be provided")
|
||||
assert timestamp > 0, "Invalid timestamp data received"
|
||||
|
||||
return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M')
|
||||
@@ -1,6 +1,6 @@
|
||||
import statistics
|
||||
from datetime import datetime
|
||||
from pydantic import BaseModel
|
||||
from app.api.core import unified_timestamp
|
||||
|
||||
|
||||
class ProductInfo(BaseModel):
|
||||
@@ -64,24 +64,8 @@ class Price(BaseModel):
|
||||
"""Timestamp in format YYYY-MM-DD HH:MM"""
|
||||
|
||||
def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None:
|
||||
"""
|
||||
Sets the timestamp from milliseconds or seconds.
|
||||
The timestamp is saved as a formatted string 'YYYY-MM-DD HH:MM'.
|
||||
Args:
|
||||
timestamp_ms: Timestamp in milliseconds.
|
||||
timestamp_s: Timestamp in seconds.
|
||||
Raises:
|
||||
ValueError: If neither timestamp_ms nor timestamp_s is provided.
|
||||
"""
|
||||
if timestamp_ms is not None:
|
||||
timestamp = timestamp_ms // 1000
|
||||
elif timestamp_s is not None:
|
||||
timestamp = timestamp_s
|
||||
else:
|
||||
raise ValueError("Either timestamp_ms or timestamp_s must be provided")
|
||||
assert timestamp > 0, "Invalid timestamp data received"
|
||||
|
||||
self.timestamp = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M')
|
||||
""" Use the unified_timestamp function to set the timestamp."""
|
||||
self.timestamp = unified_timestamp(timestamp_ms, timestamp_s)
|
||||
|
||||
@staticmethod
|
||||
def aggregate(prices: dict[str, list['Price']]) -> list['Price']:
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
from pydantic import BaseModel
|
||||
from app.api.core import unified_timestamp
|
||||
|
||||
|
||||
|
||||
MAX_COMMENTS = 5
|
||||
|
||||
class SocialPost(BaseModel):
|
||||
"""
|
||||
Represents a social media post with time, title, description, and comments.
|
||||
@@ -10,6 +14,10 @@ class SocialPost(BaseModel):
|
||||
description: str = ""
|
||||
comments: list["SocialComment"] = []
|
||||
|
||||
def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None:
|
||||
""" Use the unified_timestamp function to set the time."""
|
||||
self.time = unified_timestamp(timestamp_ms, timestamp_s)
|
||||
|
||||
class SocialComment(BaseModel):
|
||||
"""
|
||||
Represents a comment on a social media post.
|
||||
@@ -17,6 +25,10 @@ class SocialComment(BaseModel):
|
||||
time: str = ""
|
||||
description: str = ""
|
||||
|
||||
def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None:
|
||||
""" Use the unified_timestamp function to set the time."""
|
||||
self.time = unified_timestamp(timestamp_ms, timestamp_s)
|
||||
|
||||
|
||||
class SocialWrapper:
|
||||
"""
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from app.api.social.reddit import RedditWrapper
|
||||
from app.api.social.x import XWrapper
|
||||
from app.api.social.chan import ChanWrapper
|
||||
|
||||
__all__ = ["RedditWrapper"]
|
||||
__all__ = ["RedditWrapper", "XWrapper", "ChanWrapper"]
|
||||
|
||||
89
src/app/api/social/chan.py
Normal file
89
src/app/api/social/chan.py
Normal file
@@ -0,0 +1,89 @@
|
||||
'''
|
||||
Usiamo le API di 4chan per ottenere un catalogo di threads dalla board /biz/
|
||||
'''
|
||||
import re
|
||||
import html
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from datetime import datetime
|
||||
from app.api.core.social import *
|
||||
|
||||
|
||||
class ChanWrapper(SocialWrapper):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def __time_str(self, timestamp: str) -> int:
|
||||
"""Converte una stringa da MM/GG/AA(DAY)HH:MM:SS di 4chan a millisecondi"""
|
||||
time = datetime.strptime(timestamp, "%m/%d/%y(%a)%H:%M:%S")
|
||||
return int(time.timestamp() * 1000)
|
||||
|
||||
def __unformat_html_str(self, html_element: str) -> str:
|
||||
"""Pulisce il commento rimuovendo HTML e formattazioni inutili"""
|
||||
if not html_element: return ""
|
||||
|
||||
html_entities = html.unescape(html_element)
|
||||
soup = BeautifulSoup(html_entities, 'html.parser')
|
||||
html_element = soup.get_text(separator=" ")
|
||||
html_element = re.sub(r"[\\/]+", "/", html_element)
|
||||
html_element = re.sub(r"\s+", " ", html_element).strip()
|
||||
return html_element
|
||||
|
||||
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
|
||||
url = 'https://a.4cdn.org/biz/catalog.json'
|
||||
response = requests.get(url)
|
||||
assert response.status_code == 200, f"Error in 4chan API request [{response.status_code}] {response.text}"
|
||||
|
||||
social_posts: list[SocialPost] = []
|
||||
|
||||
# Questa lista contiene un dizionario per ogni pagina della board di questo tipo {"page": page_number, "threads": [{thread_data}]}
|
||||
for page in response.json():
|
||||
for thread in page['threads']:
|
||||
|
||||
# ci indica se il thread è stato fissato o meno, se non è presente vuol dire che non è stato fissato, i thread sticky possono essere ignorati
|
||||
if 'sticky' in thread:
|
||||
continue
|
||||
|
||||
# la data di creazione del thread tipo "MM/GG/AA(day)hh:mm:ss", ci interessa solo MM/GG/AA
|
||||
time = self.__time_str(thread.get('now', ''))
|
||||
|
||||
# il nome dell'utente
|
||||
name: str = thread.get('name', 'Anonymous')
|
||||
|
||||
# il nome del thread, può contenere anche elementi di formattazione html che saranno da ignorare, potrebbe non essere presente
|
||||
title = self.__unformat_html_str(thread.get('sub', ''))
|
||||
title = f"{name} posted: {title}"
|
||||
|
||||
# il commento del thread, può contenere anche elementi di formattazione html che saranno da ignorare
|
||||
thread_description = self.__unformat_html_str(thread.get('com', ''))
|
||||
if not thread_description:
|
||||
continue
|
||||
|
||||
# una lista di dizionari conteneti le risposte al thread principale, sono strutturate similarmente al thread
|
||||
response_list = thread.get('last_replies', [])
|
||||
comments_list: list[SocialComment] = []
|
||||
|
||||
for i, response in enumerate(response_list):
|
||||
if i >= MAX_COMMENTS: break
|
||||
|
||||
# la data di creazione della risposta tipo "MM/GG/AA(day)hh:mm:ss", ci interessa solo MM/GG/AA
|
||||
time = self.__time_str(response['now'])
|
||||
|
||||
# il commento della risposta, può contenere anche elementi di formattazione html che saranno da ignorare
|
||||
comment = self.__unformat_html_str(response.get('com', ''))
|
||||
if not comment:
|
||||
continue
|
||||
|
||||
social_comment = SocialComment(description=comment)
|
||||
social_comment.set_timestamp(timestamp_ms=time)
|
||||
comments_list.append(social_comment)
|
||||
|
||||
social_post: SocialPost = SocialPost(
|
||||
title=title,
|
||||
description=thread_description,
|
||||
comments=comments_list
|
||||
)
|
||||
social_post.set_timestamp(timestamp_ms=time)
|
||||
social_posts.append(social_post)
|
||||
|
||||
return social_posts[:limit]
|
||||
@@ -1,10 +1,9 @@
|
||||
import os
|
||||
from praw import Reddit # type: ignore
|
||||
from praw.models import Submission # type: ignore
|
||||
from app.api.core.social import SocialWrapper, SocialPost, SocialComment
|
||||
from app.api.core.social import *
|
||||
|
||||
|
||||
MAX_COMMENTS = 5
|
||||
# metterne altri se necessario.
|
||||
# fonti: https://lkiconsulting.io/marketing/best-crypto-subreddits/
|
||||
SUBREDDITS = [
|
||||
@@ -24,13 +23,13 @@ SUBREDDITS = [
|
||||
|
||||
def extract_post(post: Submission) -> SocialPost:
|
||||
social = SocialPost()
|
||||
social.time = str(post.created)
|
||||
social.set_timestamp(timestamp_ms=post.created)
|
||||
social.title = post.title
|
||||
social.description = post.selftext
|
||||
|
||||
for top_comment in post.comments:
|
||||
comment = SocialComment()
|
||||
comment.time = str(top_comment.created)
|
||||
comment.set_timestamp(timestamp_ms=top_comment.created)
|
||||
comment.description = top_comment.body
|
||||
social.comments.append(comment)
|
||||
|
||||
|
||||
46
src/app/api/social/x.py
Normal file
46
src/app/api/social/x.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import os
|
||||
import json
|
||||
import subprocess
|
||||
from shutil import which
|
||||
from app.api.core.social import SocialWrapper, SocialPost
|
||||
|
||||
|
||||
# This is the list of users that can be interesting
|
||||
# To get the ID of a new user is necessary to search it on X, copy the url and insert it in a service like "https://get-id-x.foundtt.com/en/"
|
||||
X_USERS = [
|
||||
'watcherguru',
|
||||
'Cointelegraph',
|
||||
'BTC_Archive',
|
||||
'elonmusk'
|
||||
]
|
||||
|
||||
class XWrapper(SocialWrapper):
|
||||
def __init__(self):
|
||||
'''
|
||||
This wrapper uses the rettiwt API to get data from X in order to avoid the rate limits of the free X API,
|
||||
even if improbable this could lead to a ban so do not use the personal account,
|
||||
In order to work it is necessary to install the rettiwt cli tool, for more information visit the official documentation at https://www.npmjs.com/package/rettiwt-api
|
||||
'''
|
||||
|
||||
self.api_key = os.getenv("X_API_KEY")
|
||||
assert self.api_key, "X_API_KEY environment variable not set"
|
||||
assert which('rettiwt') is not None, "Command `rettiwt` not installed"
|
||||
|
||||
|
||||
def get_top_crypto_posts(self, limit:int = 5) -> list[SocialPost]:
|
||||
social_posts: list[SocialPost] = []
|
||||
|
||||
for user in X_USERS:
|
||||
process = subprocess.run(f"rettiwt -k {self.api_key} tweet search -f {str(user)}", capture_output=True)
|
||||
results = process.stdout.decode()
|
||||
json_result = json.loads(results)
|
||||
|
||||
tweets = json_result['list']
|
||||
for tweet in tweets[:limit]:
|
||||
social_post = SocialPost()
|
||||
social_post.time = tweet['createdAt']
|
||||
social_post.title = str(user) + " tweeted: "
|
||||
social_post.description = tweet['fullText']
|
||||
social_posts.append(social_post)
|
||||
|
||||
return social_posts
|
||||
@@ -1,7 +1,7 @@
|
||||
from agno.tools import Toolkit
|
||||
from app.api.wrapper_handler import WrapperHandler
|
||||
from app.api.core.social import SocialPost, SocialWrapper
|
||||
from app.api.social import RedditWrapper
|
||||
from app.api.social import *
|
||||
|
||||
|
||||
class SocialAPIsTool(SocialWrapper, Toolkit):
|
||||
@@ -23,7 +23,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit):
|
||||
- RedditWrapper.
|
||||
"""
|
||||
|
||||
wrappers: list[type[SocialWrapper]] = [RedditWrapper]
|
||||
wrappers: list[type[SocialWrapper]] = [RedditWrapper, XWrapper, ChanWrapper]
|
||||
self.handler = WrapperHandler.build_wrappers(wrappers)
|
||||
|
||||
Toolkit.__init__( # type: ignore
|
||||
|
||||
22
tests/api/test_social_4chan.py
Normal file
22
tests/api/test_social_4chan.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import re
|
||||
import pytest
|
||||
from app.api.social.chan import ChanWrapper
|
||||
|
||||
@pytest.mark.social
|
||||
@pytest.mark.api
|
||||
class TestChanWrapper:
|
||||
def test_initialization(self):
|
||||
wrapper = ChanWrapper()
|
||||
assert wrapper is not None
|
||||
|
||||
def test_get_top_crypto_posts(self):
|
||||
wrapper = ChanWrapper()
|
||||
posts = wrapper.get_top_crypto_posts(limit=2)
|
||||
assert isinstance(posts, list)
|
||||
assert len(posts) == 2
|
||||
for post in posts:
|
||||
assert post.title != ""
|
||||
assert post.time != ""
|
||||
assert re.match(r'\d{4}-\d{2}-\d{2}', post.time)
|
||||
assert isinstance(post.comments, list)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
import re
|
||||
import pytest
|
||||
from app.api.social.reddit import MAX_COMMENTS, RedditWrapper
|
||||
|
||||
@@ -18,6 +19,8 @@ class TestRedditWrapper:
|
||||
assert len(posts) == 2
|
||||
for post in posts:
|
||||
assert post.title != ""
|
||||
assert re.match(r'\d{4}-\d{2}-\d{2}', post.time)
|
||||
|
||||
assert isinstance(post.comments, list)
|
||||
assert len(post.comments) <= MAX_COMMENTS
|
||||
for comment in post.comments:
|
||||
22
tests/api/test_social_x_api.py
Normal file
22
tests/api/test_social_x_api.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import os
|
||||
import re
|
||||
import pytest
|
||||
from app.api.social.x import XWrapper
|
||||
|
||||
@pytest.mark.social
|
||||
@pytest.mark.api
|
||||
@pytest.mark.skipif(not os.getenv("X_API_KEY"), reason="X_API_KEY not set in environment variables")
|
||||
class TestXWrapper:
|
||||
def test_initialization(self):
|
||||
wrapper = XWrapper()
|
||||
assert wrapper is not None
|
||||
|
||||
def test_get_top_crypto_posts(self):
|
||||
wrapper = XWrapper()
|
||||
posts = wrapper.get_top_crypto_posts(limit=2)
|
||||
assert isinstance(posts, list)
|
||||
assert len(posts) == 2
|
||||
for post in posts:
|
||||
assert post.title != ""
|
||||
assert re.match(r'\d{4}-\d{2}-\d{2}', post.time)
|
||||
assert isinstance(post.comments, list)
|
||||
Reference in New Issue
Block a user