2024-01-20 15:12:58 +01:00
from bs4 import BeautifulSoup as bs
import csv , jsbeautifier , cloudscraper , os , re , smtplib
2025-02-16 13:17:15 +01:00
#from selenium import webdriver
2024-01-20 15:12:58 +01:00
from email . mime . text import MIMEText
from json import loads
from datetime import datetime
from yt_dlp import YoutubeDL
def get_animu ( ) :
with open ( " animelist.csv " , mode = ' r ' ) as animecsv :
csvfile = csv . reader ( animecsv )
file = list ( csvfile )
return file
def get_config ( ) :
with open ( " cfg.json " , mode = ' r ' ) as config :
configs = config . read ( )
configs = loads ( configs )
return configs
def get_soup ( url ) : #dall'url outputta la soup della pagina
2025-02-01 09:25:17 +01:00
#driver.get(url)
#data = driver.page_source
r = cloudscraper . create_scraper ( session )
2025-02-16 13:17:15 +01:00
headers = { " User-Agent " : " Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0 " }
cookies = { " ASNew-q8 " : " 2fa7ff5d81fa1a0db26b3f696a98dec9 " }
page = r . get ( url , headers = headers , cookies = cookies )
2024-01-20 15:12:58 +01:00
data = page . text
return bs ( data , features = " html.parser " )
def new_email ( subject , body , configs ) :
msg = MIMEText ( body )
msg [ ' Subject ' ] = subject
msg [ ' From ' ] = configs [ ' email ' ]
msg [ ' To ' ] = configs [ ' email ' ]
with smtplib . SMTP_SSL ( ' smtp.gmail.com ' , 465 ) as smtp_server :
smtp_server . login ( configs [ ' email ' ] , configs [ ' email_password ' ] )
smtp_server . sendmail ( configs [ ' email ' ] , configs [ ' email ' ] , msg . as_string ( ) )
print ( ' Email successfully sent! ' )
def tutti_gli_episodi ( url ) : #trova tutti gli episodi dalla pagina descrittiva dell'anime
cosette = get_soup ( url ) . find ( ' div ' , { ' class ' : ' tab-content ' } ) . find_all ( ' a ' )
return [ episodi [ ' href ' ] for episodi in cosette ]
2024-01-20 14:22:00 +01:00
2024-01-20 15:12:58 +01:00
def link_ep_da_url ( url ) : #prende la pagina del "Guarda lo Streaming" e trova il link per guardarlo
#print('link_ep_da_url')
cosetta = get_soup ( url ) . find ( ' div ' , { ' class ' : " btn btn-light w-100 mt-3 mb-3 " } ) . parent
return cosetta [ ' href ' ]
def links_ep_da_scaricare ( url ) : #prende il link output di link_ep_da_url e cerca al suo interno tutti i link di file di tutti i provider possibili ritornando un dict
#print('links_ep_da_scaricare')
soup = get_soup ( url )
cosetta = soup . find ( ' div ' , { ' class ' : ' main-container ' } ) . find ( ' source ' )
links = { }
if cosetta :
links . update ( { ' legacy ' : cosetta [ ' src ' ] } ) #legacy video format
else :
try :
links . update ( { ' saturn ' : soup . findAll ( ' script ' , { ' type ' : ' text/javascript ' } ) [ 2 ] . text . split ( ' \n ' ) [ 2 ] . strip ( ) . split ( ' " ' ) [ 1 ] } ) #new video format
except Exception as e :
pass
try :
prelink = [ link [ ' href ' ] for link in soup . find ( ' div ' , { ' class ' : ' dropdown-menu ' } ) . find_all ( ' a ' ) ]
except Exception as e :
prelink = [ ]
for link in prelink :
try :
button = get_soup ( link ) . find ( ' i ' , { ' class ' : ' bi bi-download ' } ) . parent
links . update ( { button . text . split ( ' ' ) [ - 1 ] . lower ( ) : button . parent [ ' href ' ] } )
except Exception as e :
pass
return links
def link_down ( named_urls ) : #prende il dict di links_ep_da_scaricare e per quanto può converte i siti in link scaricabili tramite yt-dlp
#print('link-down')
#print(named_urls)
links = { }
if named_urls . get ( ' saturn ' ) :
links . update ( { ' saturn ' : named_urls [ ' saturn ' ] } )
if named_urls . get ( ' legacy ' ) :
links . update ( { ' legacy ' : named_urls [ ' legacy ' ] } )
if named_urls . get ( ' streamtape ' ) :
soup = get_soup ( named_urls [ ' streamtape ' ] )
try :
link = soup . find_all ( string = re . compile ( """ document.getElementById """ ) ) [ - 2 ] . split ( ' \n ' ) [ - 1 ] . split ( " ' " ) #fa magie per estrarre il link del download
links . update ( { ' streamtape ' : ' https: ' + link [ 3 ] + link [ 5 ] [ 3 : ] + ' &dl=1 ' } )
except Exception as e :
pass
if named_urls . get ( ' streamhide ' ) :
soup = get_soup ( named_urls [ ' streamhide ' ] )
try :
script = soup . find ( ' script ' , attrs = { " type " : " text/javascript " } , string = re . compile ( ' m3u8 ' ) ) . text
links . update ( { ' streamhide ' : [ el for el in script . split ( ' " ' ) if ' m3u8 ' in el ] [ 0 ] } )
except Exception as e :
pass
#print(links)
return links
def scarica ( lista , configs ) : #la lista sarà nella forma filepath,url e prova in ordine di priorità da conf
orario = datetime . now ( ) . strftime ( " %d / % m/ % Y % H: % M: % S " )
priority = configs [ ' DownPriority ' ]
urls_file = link_down ( links_ep_da_scaricare ( link_ep_da_url ( lista [ 1 ] ) ) )
for provider in priority :
try :
print ( ' [ ' + orario + ' ] ' + ' Avvio download ' + provider + ' per ' + lista [ 0 ] . split ( ' / ' ) [ - 1 ] )
#open(lista[0], 'wb').write(r.get(urls_file['provider']).content)
with YoutubeDL ( { ' outtmpl ' : { ' default ' : lista [ 0 ] } , ' quiet ' : True , ' no_warnings ' : True , ' ignoreerrors ' : False , ' retries ' : 10 } ) as ydl :
ydl . download ( urls_file [ provider ] )
print ( ' [ ' + orario + ' ] ' + ' Successo per ' + lista [ 0 ] . split ( ' / ' ) [ - 1 ] )
return
except Exception as e :
print ( e )
print ( ' [ ' + orario + ' ] ' + ' Errore ' + provider + ' per ' + lista [ 0 ] . split ( ' / ' ) [ - 1 ] )
def modulo_scarica ( ) :
file = get_animu ( )
configs = get_config ( )
lista_email = [ ]
2024-01-22 09:18:20 +01:00
for lines in file [ 1 : ] : #lines 0 è url, lines 1 è cart down, lines 2 è naming episodi, lines 3 è abilitato?, lines 4 è jellynaming?
2024-01-20 15:12:58 +01:00
ora = datetime . now ( )
orario = ora . strftime ( " %d / % m/ % Y % H: % M: % S " )
if int ( lines [ 3 ] ) :
try :
lista_ep = tutti_gli_episodi ( lines [ 0 ] ) #ho trovato la lista dei num episodi
#print(lista_ep)
if not os . path . exists ( os . path . join ( configs [ ' DownDir ' ] , lines [ 1 ] ) ) : #se la cartella down non c'è falla
os . makedirs ( os . path . join ( configs [ ' DownDir ' ] , lines [ 1 ] ) )
ep_gia_scar = os . listdir ( os . path . join ( configs [ ' DownDir ' ] , lines [ 1 ] ) )
if len ( ep_gia_scar ) == len ( lista_ep ) :
print ( ' [ ' + orario + ' ] ' + " Nessun aggiornamento per " + lines [ 1 ] )
else :
results = [ ] #lista in cui ci saranno i link da scaricare
for episodi in lista_ep :
numero = episodi . split ( ' - ' ) [ - 1 ] #estrae il numero dell'episodio dall'url
#print(numero)
#print(lines)
if len ( lines ) > 4 and lines [ 4 ] : #se ho la new naming convention
numerello = format ( int ( float ( numero ) ) , ' 03d ' )
if float ( numero ) % 1 :
virgola = ' . ' + format ( float ( numero ) , ' .1f ' ) . split ( ' . ' ) [ - 1 ]
else :
virgola = ' '
nome_ep = lines [ 2 ] + ' E ' + numerello + virgola + ' .mp4 '
else :
nome_ep = lines [ 2 ] + ' Ep ' + numero + ' .mp4 ' #definisco nome dell'episodio
if nome_ep in ep_gia_scar :
print ( ' [ ' + orario + ' ] ' + lines [ 1 ] + ' / ' + nome_ep + ' è già presente nella cartella ' )
else : #scarico l'ep mancante
filepath = os . path . join ( configs [ ' DownDir ' ] , lines [ 1 ] , nome_ep )
results . append ( ( filepath , episodi ) )
for ep in results :
scarica ( ep , configs )
2024-01-22 09:18:20 +01:00
lista_email . append ( ( ep [ 0 ] , 0 ) )
2024-01-20 15:12:58 +01:00
except Exception as e :
print ( e )
lista_email . append ( ( lines [ 1 ] , 1 ) )
print ( ' [ ' + orario + ' ] ' + lines [ 1 ] + ' / ' + lines [ 2 ] + ' : Errore Critico ottenendo la lista episodi ' )
else :
print ( ' [ ' + orario + ' ] ' + " Skip " + lines [ 2 ] )
if configs [ ' email ' ] : #se nei config email è vuota non inviamo nulla
novità = [ el for el in lista_email if el [ 1 ] == 0 ]
errori = [ el for el in lista_email if el [ 1 ] == 1 ]
#so che potrei scrivere con meno controlli le righe sotto ma per facilità di lettura faccio controlli inutili
if novità and errori :
subject = ' Saturn_cli: Errori e Novità '
body = " Yo sono usciti nuovi episodi, valli a vedere soldato ;) \n "
for el in novità :
body + = f " { el [ 0 ] } è stato aggiunto con successo! \n "
body + = " \n Ohibò ci sono stati degli errori :( \n "
for el in errori :
body + = f " { el [ 0 ] } , oof ci sono stati errori con questo anime \n "
body + = " \n Fine resoconto, sono sbalordito delle mie capacità o_o "
new_email ( subject , body , configs )
elif novità :
subject = ' Saturn_cli: Novità '
body = " Yo sono usciti nuovi episodi, valli a vedere soldato ;) \n "
for el in novità :
body + = f " { el [ 0 ] } è stato aggiunto con successo! \n "
body + = " \n Fine resoconto, sono sbalordito delle mie capacità o_o "
new_email ( subject , body , configs )
elif errori :
subject = ' Saturn_cli: Errori '
body = " Ohibò ci sono stati degli errori :( \n "
for el in errori :
body + = f " { el [ 0 ] } , oof ci sono stati errori con questo anime \n "
body + = " \n Fine resoconto, sono sbalordito delle mie capacità o_o "
new_email ( subject , body , configs )
2024-01-20 14:22:00 +01:00
if __name__ == " __main__ " :
2025-02-01 09:25:17 +01:00
########################to correct the tls error
#import cloudscraper
from requests . adapters import HTTPAdapter
from requests . packages . urllib3 . poolmanager import PoolManager
import ssl
class SSLAdapter ( HTTPAdapter ) :
def init_poolmanager ( self , connections , maxsize , block = False ) :
self . poolmanager = PoolManager (
num_pools = connections ,
maxsize = maxsize ,
block = block ,
ssl_version = ssl . PROTOCOL_TLS ,
ciphers = ' ECDHE-RSA-AES128-GCM-SHA256 ' )
#firefox_options = webdriver.FirefoxOptions()
#firefox_options.add_argument("--profile=/opt/saturn_cli/selenium_profile")
#driver = webdriver.Chrome()
session = cloudscraper . Session ( )
session . mount ( ' https:// ' , SSLAdapter ( ) )
#response = session.get('https://example.com')
#print(response.text)
########################
modulo_scarica ( )
2024-01-20 14:22:00 +01:00