Fixed typing in XWrapper

This commit is contained in:
2025-10-20 15:45:49 +02:00
parent 1b7f54e10a
commit c7a58cd960

View File

@@ -1,23 +1,22 @@
'''
Usiamo l'API rettiwt per ottenere dati da X aggirando i limiti dell'API free
Questo potrebbe portare al ban dell'account anche se improbabile, non usare l'account personale
Per farlo funzionare è necessario installare npm in un container docker ed installarlo con npm install -g rettiwt-api dopo essersi connessi al docker
https://www.npmjs.com/package/rettiwt-api
'''
import os import os
import json import json
import subprocess import subprocess
from shutil import which from shutil import which
from app.api.core.social import SocialWrapper, SocialPost from app.api.core.social import SocialWrapper, SocialPost
class XWrapper(SocialWrapper): class XWrapper(SocialWrapper):
def __init__(self): def __init__(self):
''' '''
This wrapper uses the rettiwt API to get data from X in order to avoid the rate limits of the free X API, This wrapper uses the rettiwt API to get data from X in order to avoid the rate limits of the free X API,
even if improbable this could lead to a ban so do not use the personal account, even if improbable this could lead to a ban so do not use the personal account,
In order to work a docker container with npm installed is needed, it's also necessary to install rettiwt in the container with npm install -g rettiwt-api In order to work it is necessary to install the rettiwt cli tool, for more information visit the official documentation at https://www.npmjs.com/package/rettiwt-api
''' '''
self.api_key = os.getenv("X_API_KEY")
assert self.api_key, "X_API_KEY environment variable not set"
assert which('rettiwt') is not None, "Command `rettiwt` not installed"
# This is the list of users that can be interesting # This is the list of users that can be interesting
# To get the ID of a new user is necessary to search it on X, copy the url and insert it in a service like "https://get-id-x.foundtt.com/en/" # To get the ID of a new user is necessary to search it on X, copy the url and insert it in a service like "https://get-id-x.foundtt.com/en/"
self.users = [ self.users = [
@@ -26,47 +25,22 @@ class XWrapper(SocialWrapper):
'BTC_Archive', 'BTC_Archive',
'elonmusk' 'elonmusk'
] ]
self.api_key = os.getenv("X_API_KEY")
assert self.api_key, "X_API_KEY environment variable not set"
''' def get_top_crypto_posts(self, limit:int = 5) -> list[SocialPost]:
# Connection to the docker deamon
self.client = docker.from_env()
# Connect with the relative container
self.container = self.client.containers.get("node_rettiwt")
'''
assert which('rettiwt') is not None, "Command `rettiwt` not installed"
self.social_posts: list[SocialPost] = []
def get_top_crypto_posts(self, limit = 5) -> list[SocialPost]: #-> list[SocialPost]:
'''
Otteniamo i post più recenti da X, il limite si applica al numero di post per ogni utente nella lista interna
'''
social_posts: list[SocialPost] = [] social_posts: list[SocialPost] = []
for user in self.users: for user in self.users:
# This currently doesn't work as intended since it returns the posts in random order process = subprocess.run(f"rettiwt -k {self.api_key} tweet search -f {str(user)}", capture_output=True)
# tweets = self.container.exec_run("rettiwt -k" + self.api_key + " tweet search -f " + str(user), tty=True) results = process.stdout.decode()
tweets = subprocess.run("rettiwt -k" + self.api_key + " tweet search -f " + str(user)) json_result = json.loads(results)
tweets = tweets.output.decode()
tweets = json.loads(tweets) tweets = json_result['list']
tweets: list[dict] = tweets['list'] for tweet in tweets[:limit]:
tweets = tweets[:limit]
for tweet in tweets:
social_post = SocialPost() social_post = SocialPost()
social_post.time = tweet['createdAt'] social_post.time = tweet['createdAt']
social_post.title = str(user) + " tweeted: " social_post.title = str(user) + " tweeted: "
social_post.description = tweet['fullText'] social_post.description = tweet['fullText']
social_posts.append(social_post) social_posts.append(social_post)
self.social_posts = social_posts
return social_posts
def print(self):
i = 1
for post in self.social_posts:
print(f"Post {i}:")
print(f"Time: {post.time}")
print(f"Title: {post.title}")
print(f"Description: {post.description}")
print()
i += 1
# x_wrapper = XWrapper() return social_posts
# social_posts = x_wrapper.get_top_crypto_posts(limit=3)
# x_wrapper.print()