49 lines
1.9 KiB
Python
49 lines
1.9 KiB
Python
import os
|
|
import json
|
|
import subprocess
|
|
from shutil import which
|
|
from datetime import datetime
|
|
from app.api.core.social import SocialWrapper, SocialPost
|
|
|
|
|
|
# Accounts on X whose posts are considered interesting.
# Each entry is passed as-is to the `-f` option of `rettiwt tweet search`.
# To get the ID of a new user, search for it on X, copy the profile URL and
# paste it into a service like "https://get-id-x.foundtt.com/en/".
# NOTE(review): the entries below look like handles rather than numeric IDs;
# confirm whether the ID lookup described above is still required.
X_USERS = [
    "watcherguru",
    "Cointelegraph",
    "BTC_Archive",
    "elonmusk",
]
|
|
|
|
class XWrapper(SocialWrapper):
    '''
    Social wrapper that reads posts from X through the `rettiwt` CLI tool.

    The rettiwt API is used in order to avoid the rate limits of the free X API.
    Even if improbable, this could lead to a ban, so do not use a personal account.
    The `rettiwt` CLI tool must be installed; for more information visit the
    official documentation at https://www.npmjs.com/package/rettiwt-api
    '''

    def __init__(self):
        '''
        Read the API key from the environment and check that the CLI is available.

        Raises:
            AssertionError: if the `X_API_KEY` environment variable is unset or
                empty, or if the `rettiwt` command cannot be found on PATH.
        '''
        self.api_key = os.getenv("X_API_KEY")

        # Raised explicitly instead of using `assert`, so the checks still run
        # under `python -O`; the exception type is kept for existing callers.
        if not self.api_key:
            raise AssertionError("X_API_KEY environment variable not set")
        if which('rettiwt') is None:
            raise AssertionError("Command `rettiwt` not installed")

    def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
        '''
        Fetch the most recent tweets of every account listed in X_USERS.

        Args:
            limit: number of tweets requested from rettiwt for EACH user, so the
                returned list can hold up to `limit * len(X_USERS)` posts.
        Returns:
            The collected posts, grouped by user in the order of X_USERS.
        Raises:
            ValueError: if the output of rettiwt is not a JSON object
                (for example because the command failed).
        '''
        posts: list[SocialPost] = []

        for user in X_USERS:
            # NOTE(review): the key is passed on the command line, so it is
            # visible in the local process list while the command runs.
            cmd = ['rettiwt', '-k', self.api_key, 'tweet', 'search', str(limit), '-f', str(user)]
            process = subprocess.run(cmd, capture_output=True)

            try:
                json_result = json.loads(process.stdout.decode())
            except ValueError as error:
                # Covers both invalid UTF-8 and invalid/empty JSON; surface the
                # exit code and stderr, otherwise the real cause stays hidden.
                details = process.stderr.decode(errors='replace').strip()
                raise ValueError(
                    f"rettiwt returned no valid JSON for user {user} "
                    f"(exit code {process.returncode}): {details}"
                ) from error

            if not isinstance(json_result, dict):
                raise ValueError(
                    f"rettiwt returned an unexpected payload for user {user}: "
                    f"{type(json_result).__name__}"
                )

            for tweet in json_result.get('list', []):
                created_at = tweet['createdAt']
                # `fromisoformat` rejects a trailing 'Z' before Python 3.11;
                # '+00:00' is the equivalent offset accepted by every version.
                # Assumes rettiwt emits ISO 8601 timestamps (TODO confirm).
                if created_at.endswith('Z'):
                    created_at = created_at[:-1] + '+00:00'
                time = datetime.fromisoformat(created_at)

                social_post = SocialPost()
                social_post.set_timestamp(timestamp_s=int(time.timestamp()))
                social_post.title = f"{user} tweeted: "
                social_post.description = tweet['fullText']
                posts.append(social_post)

        return posts
|