From bff23b1ffde4b5060b45205196b233ff43c779a7 Mon Sep 17 00:00:00 2001 From: StochasticMouse Date: Sat, 20 Jan 2024 15:12:58 +0100 Subject: [PATCH] divided into simpler functions --- saturn_cli.py | 359 +++++++++++++++++++++++++------------------------- 1 file changed, 182 insertions(+), 177 deletions(-) diff --git a/saturn_cli.py b/saturn_cli.py index 1c8405c..f87c7b2 100644 --- a/saturn_cli.py +++ b/saturn_cli.py @@ -1,181 +1,186 @@ -def modulo_scarica(): - from bs4 import BeautifulSoup as bs - import csv, jsbeautifier, cloudscraper, os, re, smtplib - from email.mime.text import MIMEText - from json import loads - from datetime import datetime - #from multiprocessing.dummy import Pool as ThreadPool - from yt_dlp import YoutubeDL +from bs4 import BeautifulSoup as bs +import csv, jsbeautifier, cloudscraper, os, re, smtplib +from email.mime.text import MIMEText +from json import loads +from datetime import datetime +from yt_dlp import YoutubeDL - r = cloudscraper.create_scraper() - with open("animelist.csv",mode='r') as animecsv: - csvfile=csv.reader(animecsv) - file=list(csvfile) - animecsv.close() - with open("cfg.json",mode='r') as config: - configs=config.read() - configs=loads(configs) - config.close() - def get_soup(url): #dall'url outputta la soup della pagina - page=r.get(url) - data=page.text - return bs(data, features="html.parser") - def new_email(subject,body): - msg = MIMEText(body) - msg['Subject'] = subject - msg['From'] = configs['email'] - msg['To'] = configs['email'] - with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp_server: - smtp_server.login(configs['email'], configs['email_password']) - smtp_server.sendmail(configs['email'], configs['email'], msg.as_string()) - print('Email successfully sent!') - def tutti_gli_episodi(url): #trova tutti gli episodi dalla pagina descrittiva dell'anime - cosette=get_soup(url).find('div', {'class':'tab-content'}).find_all('a') - return [episodi['href'] for episodi in cosette] - def link_ep_da_url(url): #prende la pagina del "Guarda lo Streaming" e trova il link per guardarlo - #print('ledu') - cosetta=get_soup(url).find('div', {'class':"btn btn-light w-100 mt-3 mb-3"}).parent - return cosetta['href'] - def links_ep_da_scaricare(url): #prende il link output di link_ep_da_url e cerca al suo interno tutti i link di file di tutti i provider possibili ritornando un dict - #print('leds') - soup=get_soup(url) - cosetta=soup.find('div', {'class':'main-container'}).find('source') - links={} - if cosetta: - links.update({'legacy':cosetta['src']}) #legacy video format - else: - try: - links.update({'saturn':soup.findAll('script',{'type':'text/javascript'})[2].text.split('\n')[2].strip().split('"')[1]}) #new video format - except Exception as e: - pass - try: - prelink=[link['href'] for link in soup.find('div',{'class':'dropdown-menu'}).find_all('a')] - except Exception as e: - prelink=[] - for link in prelink: - try: - button=get_soup(link).find('i', {'class':'bi bi-download'}).parent - links.update({button.text.split(' ')[-1].lower():button.parent['href']}) - except Exception as e: - pass - return links - def link_down(named_urls): #prende il dict di links_ep_da_scaricare e per quanto può converte i siti in link scaricabili tramite yt-dlp - #print('link-down') - #print(named_urls) - links={} - if named_urls.get('saturn'): - links.update({'saturn':named_urls['saturn']}) - if named_urls.get('legacy'): - links.update({'legacy':named_urls['legacy']}) - if named_urls.get('streamtape'): - soup=get_soup(named_urls['streamtape']) - try: - link = soup.find_all(string=re.compile("""document.getElementById"""))[-2].split('\n')[-1].split("'") #fa magie per estrarre il link del download - links.update({'streamtape':'https:'+link[3]+link[5][3:]+'&dl=1'}) - except Exception as e: - pass - if named_urls.get('streamhide'): - soup=get_soup(named_urls['streamhide']) - try: - script=soup.find('script',attrs={"type": "text/javascript"},string=re.compile('m3u8')).text - links.update({'streamhide':[el for el in script.split('"') if 'm3u8' in el][0]}) - except Exception as e: - pass - #print(links) - return links - def scarica(lista): #la lista sarà nella forma filepath,url e prova in ordine di priorità da conf - orario=datetime.now().strftime("%d/%m/%Y %H:%M:%S") - priority=configs['DownPriority'] - urls_file=link_down(links_ep_da_scaricare(link_ep_da_url(lista[1]))) - for provider in priority: - try: - print('['+orario+']'+'Avvio download '+provider+' per '+lista[0].split('/')[-1]) - #open(lista[0], 'wb').write(r.get(urls_file['provider']).content) - with YoutubeDL({'outtmpl':{'default':lista[0]},'quiet':True,'no_warnings':True,'ignoreerrors':False,'retries':10}) as ydl: - ydl.download(urls_file[provider]) - print('['+orario+']'+'Successo per '+lista[0].split('/')[-1]) - return - except Exception as e: - print(e) - print('['+orario+']'+'Errore '+provider+' per '+lista[0].split('/')[-1]) - #lines 0 è url ep, lines 1 è cartella download, lines 2 è nome anime - lista_email=[] - for lines in file[1:]: - ora=datetime.now() - orario=ora.strftime("%d/%m/%Y %H:%M:%S") - if int(lines[3]): - try: - lista_ep=tutti_gli_episodi(lines[0]) #ho trovato la lista dei num episodi - #print(lista_ep) - if not os.path.exists(os.path.join(configs['DownDir'],lines[1])): #se la cartella down non c'è falla - os.makedirs(os.path.join(configs['DownDir'],lines[1])) - ep_gia_scar=os.listdir(os.path.join(configs['DownDir'],lines[1])) - if len(ep_gia_scar)==len(lista_ep): - print('['+orario+']'+"Nessun aggiornamento per "+lines[1]) - else: - results=[] #lista in cui ci saranno i link da scaricare - for episodi in lista_ep: - numero=episodi.split('-')[-1] #estrae il numero dell'episodio dall'url - #print(numero) - #print(lines) - if len(lines)>4 and lines[4]: #se ho la new naming convention - numerello = format(int(float(numero)),'03d') - if float(numero)%1: - virgola = '.'+format(float(numero),'.1f').split('.')[-1] - else: - virgola = '' - nome_ep=lines[2] + 'E' + numerello + virgola + '.mp4' - else: - nome_ep=lines[2]+' Ep '+ numero + '.mp4' #definisco nome dell'episodio - if nome_ep in ep_gia_scar: - print('['+orario+']'+lines[1]+'/'+nome_ep+' è già presente nella cartella') - else: #scarico l'ep mancante - filepath=os.path.join(configs['DownDir'],lines[1], nome_ep) - results.append((filepath,episodi)) - for ep in results: - scarica(ep) - lista_email.append((os.path.basename(ep[0]),0)) - ''' - print(results) - pool=ThreadPool(1) - pool.map(scarica,results) - pool.close() - pool.join() - ''' - except Exception as e: - print(e) - lista_email.append((lines[1],1)) - print('['+orario+']'+lines[1]+'/'+lines[2]+': Errore Critico ottenendo la lista episodi') - else: - print('['+orario+']'+"Skip "+lines[2]) - if configs['email']: #se nei config email è vuota non inviamo nulla - novità=[el for el in lista_email if el[1]==0] - errori=[el for el in lista_email if el[1]==1] - #so che potrei scrivere con meno controlli le righe sotto ma per facilità di lettura faccio controlli inutili - if novità and errori: - subject='Saturn_cli: Errori e Novità' - body="Yo sono usciti nuovi episodi, valli a vedere soldato ;)\n" - for el in novità: - body+=f"{ el[0] } è stato aggiunto con successo!\n" - body+="\nOhibò ci sono stati degli errori :(\n" - for el in errori: - body+=f"{ el[0] }, oof ci sono stati errori con questo anime\n" - body+="\nFine resoconto, sono sbalordito delle mie capacità o_o" - new_email(subject,body) - elif novità: - subject='Saturn_cli: Novità' - body="Yo sono usciti nuovi episodi, valli a vedere soldato ;)\n" - for el in novità: - body+=f"{ el[0] } è stato aggiunto con successo!\n" - body+="\nFine resoconto, sono sbalordito delle mie capacità o_o" - new_email(subject,body) - elif errori: - subject='Saturn_cli: Errori' - body="Ohibò ci sono stati degli errori :(\n" - for el in errori: - body+=f"{ el[0] }, oof ci sono stati errori con questo anime\n" - body+="\nFine resoconto, sono sbalordito delle mie capacità o_o" - new_email(subject,body) +def get_animu(): + with open("animelist.csv",mode='r') as animecsv: + csvfile=csv.reader(animecsv) + file=list(csvfile) + return file + +def get_config(): + with open("cfg.json",mode='r') as config: + configs=config.read() + configs=loads(configs) + return configs + +def get_soup(url): #dall'url outputta la soup della pagina + r = cloudscraper.create_scraper() + page=r.get(url) + data=page.text + return bs(data, features="html.parser") + +def new_email(subject,body,configs): + msg = MIMEText(body) + msg['Subject'] = subject + msg['From'] = configs['email'] + msg['To'] = configs['email'] + with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp_server: + smtp_server.login(configs['email'], configs['email_password']) + smtp_server.sendmail(configs['email'], configs['email'], msg.as_string()) + print('Email successfully sent!') + +def tutti_gli_episodi(url): #trova tutti gli episodi dalla pagina descrittiva dell'anime + cosette=get_soup(url).find('div', {'class':'tab-content'}).find_all('a') + return [episodi['href'] for episodi in cosette] + +def link_ep_da_url(url): #prende la pagina del "Guarda lo Streaming" e trova il link per guardarlo + #print('link_ep_da_url') + cosetta=get_soup(url).find('div', {'class':"btn btn-light w-100 mt-3 mb-3"}).parent + return cosetta['href'] + +def links_ep_da_scaricare(url): #prende il link output di link_ep_da_url e cerca al suo interno tutti i link di file di tutti i provider possibili ritornando un dict + #print('links_ep_da_scaricare') + soup=get_soup(url) + cosetta=soup.find('div', {'class':'main-container'}).find('source') + links={} + if cosetta: + links.update({'legacy':cosetta['src']}) #legacy video format + else: + try: + links.update({'saturn':soup.findAll('script',{'type':'text/javascript'})[2].text.split('\n')[2].strip().split('"')[1]}) #new video format + except Exception as e: + pass + try: + prelink=[link['href'] for link in soup.find('div',{'class':'dropdown-menu'}).find_all('a')] + except Exception as e: + prelink=[] + for link in prelink: + try: + button=get_soup(link).find('i', {'class':'bi bi-download'}).parent + links.update({button.text.split(' ')[-1].lower():button.parent['href']}) + except Exception as e: + pass + return links + +def link_down(named_urls): #prende il dict di links_ep_da_scaricare e per quanto può converte i siti in link scaricabili tramite yt-dlp + #print('link-down') + #print(named_urls) + links={} + if named_urls.get('saturn'): + links.update({'saturn':named_urls['saturn']}) + if named_urls.get('legacy'): + links.update({'legacy':named_urls['legacy']}) + if named_urls.get('streamtape'): + soup=get_soup(named_urls['streamtape']) + try: + link = soup.find_all(string=re.compile("""document.getElementById"""))[-2].split('\n')[-1].split("'") #fa magie per estrarre il link del download + links.update({'streamtape':'https:'+link[3]+link[5][3:]+'&dl=1'}) + except Exception as e: + pass + if named_urls.get('streamhide'): + soup=get_soup(named_urls['streamhide']) + try: + script=soup.find('script',attrs={"type": "text/javascript"},string=re.compile('m3u8')).text + links.update({'streamhide':[el for el in script.split('"') if 'm3u8' in el][0]}) + except Exception as e: + pass + #print(links) + return links + +def scarica(lista,configs): #la lista sarà nella forma filepath,url e prova in ordine di priorità da conf + orario=datetime.now().strftime("%d/%m/%Y %H:%M:%S") + priority=configs['DownPriority'] + urls_file=link_down(links_ep_da_scaricare(link_ep_da_url(lista[1]))) + for provider in priority: + try: + print('['+orario+']'+'Avvio download '+provider+' per '+lista[0].split('/')[-1]) + #open(lista[0], 'wb').write(r.get(urls_file['provider']).content) + with YoutubeDL({'outtmpl':{'default':lista[0]},'quiet':True,'no_warnings':True,'ignoreerrors':False,'retries':10}) as ydl: + ydl.download(urls_file[provider]) + print('['+orario+']'+'Successo per '+lista[0].split('/')[-1]) + return + except Exception as e: + print(e) + print('['+orario+']'+'Errore '+provider+' per '+lista[0].split('/')[-1]) + +def modulo_scarica(): + file = get_animu() + configs = get_config() + #lines 0 è url ep, lines 1 è cartella download, lines 2 è nome anime + lista_email=[] + for lines in file[1:]: + ora=datetime.now() + orario=ora.strftime("%d/%m/%Y %H:%M:%S") + if int(lines[3]): + try: + lista_ep=tutti_gli_episodi(lines[0]) #ho trovato la lista dei num episodi + #print(lista_ep) + if not os.path.exists(os.path.join(configs['DownDir'],lines[1])): #se la cartella down non c'è falla + os.makedirs(os.path.join(configs['DownDir'],lines[1])) + ep_gia_scar=os.listdir(os.path.join(configs['DownDir'],lines[1])) + if len(ep_gia_scar)==len(lista_ep): + print('['+orario+']'+"Nessun aggiornamento per "+lines[1]) + else: + results=[] #lista in cui ci saranno i link da scaricare + for episodi in lista_ep: + numero=episodi.split('-')[-1] #estrae il numero dell'episodio dall'url + #print(numero) + #print(lines) + if len(lines)>4 and lines[4]: #se ho la new naming convention + numerello = format(int(float(numero)),'03d') + if float(numero)%1: + virgola = '.'+format(float(numero),'.1f').split('.')[-1] + else: + virgola = '' + nome_ep=lines[2] + 'E' + numerello + virgola + '.mp4' + else: + nome_ep=lines[2]+' Ep '+ numero + '.mp4' #definisco nome dell'episodio + if nome_ep in ep_gia_scar: + print('['+orario+']'+lines[1]+'/'+nome_ep+' è già presente nella cartella') + else: #scarico l'ep mancante + filepath=os.path.join(configs['DownDir'],lines[1], nome_ep) + results.append((filepath,episodi)) + for ep in results: + scarica(ep,configs) + lista_email.append((os.path.basename(ep[0]),0)) + except Exception as e: + print(e) + lista_email.append((lines[1],1)) + print('['+orario+']'+lines[1]+'/'+lines[2]+': Errore Critico ottenendo la lista episodi') + else: + print('['+orario+']'+"Skip "+lines[2]) + if configs['email']: #se nei config email è vuota non inviamo nulla + novità=[el for el in lista_email if el[1]==0] + errori=[el for el in lista_email if el[1]==1] + #so che potrei scrivere con meno controlli le righe sotto ma per facilità di lettura faccio controlli inutili + if novità and errori: + subject='Saturn_cli: Errori e Novità' + body="Yo sono usciti nuovi episodi, valli a vedere soldato ;)\n" + for el in novità: + body+=f"{ el[0] } è stato aggiunto con successo!\n" + body+="\nOhibò ci sono stati degli errori :(\n" + for el in errori: + body+=f"{ el[0] }, oof ci sono stati errori con questo anime\n" + body+="\nFine resoconto, sono sbalordito delle mie capacità o_o" + new_email(subject,body,configs) + elif novità: + subject='Saturn_cli: Novità' + body="Yo sono usciti nuovi episodi, valli a vedere soldato ;)\n" + for el in novità: + body+=f"{ el[0] } è stato aggiunto con successo!\n" + body+="\nFine resoconto, sono sbalordito delle mie capacità o_o" + new_email(subject,body,configs) + elif errori: + subject='Saturn_cli: Errori' + body="Ohibò ci sono stati degli errori :(\n" + for el in errori: + body+=f"{ el[0] }, oof ci sono stati errori con questo anime\n" + body+="\nFine resoconto, sono sbalordito delle mie capacità o_o" + new_email(subject,body,configs) if __name__ == "__main__": modulo_scarica()