from bs4 import BeautifulSoup as bs import csv, jsbeautifier, cloudscraper, os, re, smtplib from selenium import webdriver from email.mime.text import MIMEText from json import loads from datetime import datetime from yt_dlp import YoutubeDL def get_animu(): with open("animelist.csv",mode='r') as animecsv: csvfile=csv.reader(animecsv) file=list(csvfile) return file def get_config(): with open("cfg.json",mode='r') as config: configs=config.read() configs=loads(configs) return configs def get_soup(url): #dall'url outputta la soup della pagina #driver.get(url) #data = driver.page_source r = cloudscraper.create_scraper(session) page=r.get(url) data=page.text return bs(data, features="html.parser") def new_email(subject,body,configs): msg = MIMEText(body) msg['Subject'] = subject msg['From'] = configs['email'] msg['To'] = configs['email'] with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp_server: smtp_server.login(configs['email'], configs['email_password']) smtp_server.sendmail(configs['email'], configs['email'], msg.as_string()) print('Email successfully sent!') def tutti_gli_episodi(url): #trova tutti gli episodi dalla pagina descrittiva dell'anime cosette=get_soup(url).find('div', {'class':'tab-content'}).find_all('a') return [episodi['href'] for episodi in cosette] def link_ep_da_url(url): #prende la pagina del "Guarda lo Streaming" e trova il link per guardarlo #print('link_ep_da_url') cosetta=get_soup(url).find('div', {'class':"btn btn-light w-100 mt-3 mb-3"}).parent return cosetta['href'] def links_ep_da_scaricare(url): #prende il link output di link_ep_da_url e cerca al suo interno tutti i link di file di tutti i provider possibili ritornando un dict #print('links_ep_da_scaricare') soup=get_soup(url) cosetta=soup.find('div', {'class':'main-container'}).find('source') links={} if cosetta: links.update({'legacy':cosetta['src']}) #legacy video format else: try: links.update({'saturn':soup.findAll('script',{'type':'text/javascript'})[2].text.split('\n')[2].strip().split('"')[1]}) #new video format except Exception as e: pass try: prelink=[link['href'] for link in soup.find('div',{'class':'dropdown-menu'}).find_all('a')] except Exception as e: prelink=[] for link in prelink: try: button=get_soup(link).find('i', {'class':'bi bi-download'}).parent links.update({button.text.split(' ')[-1].lower():button.parent['href']}) except Exception as e: pass return links def link_down(named_urls): #prende il dict di links_ep_da_scaricare e per quanto può converte i siti in link scaricabili tramite yt-dlp #print('link-down') #print(named_urls) links={} if named_urls.get('saturn'): links.update({'saturn':named_urls['saturn']}) if named_urls.get('legacy'): links.update({'legacy':named_urls['legacy']}) if named_urls.get('streamtape'): soup=get_soup(named_urls['streamtape']) try: link = soup.find_all(string=re.compile("""document.getElementById"""))[-2].split('\n')[-1].split("'") #fa magie per estrarre il link del download links.update({'streamtape':'https:'+link[3]+link[5][3:]+'&dl=1'}) except Exception as e: pass if named_urls.get('streamhide'): soup=get_soup(named_urls['streamhide']) try: script=soup.find('script',attrs={"type": "text/javascript"},string=re.compile('m3u8')).text links.update({'streamhide':[el for el in script.split('"') if 'm3u8' in el][0]}) except Exception as e: pass #print(links) return links def scarica(lista,configs): #la lista sarà nella forma filepath,url e prova in ordine di priorità da conf orario=datetime.now().strftime("%d/%m/%Y %H:%M:%S") priority=configs['DownPriority'] urls_file=link_down(links_ep_da_scaricare(link_ep_da_url(lista[1]))) for provider in priority: try: print('['+orario+']'+'Avvio download '+provider+' per '+lista[0].split('/')[-1]) #open(lista[0], 'wb').write(r.get(urls_file['provider']).content) with YoutubeDL({'outtmpl':{'default':lista[0]},'quiet':True,'no_warnings':True,'ignoreerrors':False,'retries':10}) as ydl: ydl.download(urls_file[provider]) print('['+orario+']'+'Successo per '+lista[0].split('/')[-1]) return except Exception as e: print(e) print('['+orario+']'+'Errore '+provider+' per '+lista[0].split('/')[-1]) def modulo_scarica(): file = get_animu() configs = get_config() lista_email=[] for lines in file[1:]: #lines 0 è url, lines 1 è cart down, lines 2 è naming episodi, lines 3 è abilitato?, lines 4 è jellynaming? ora=datetime.now() orario=ora.strftime("%d/%m/%Y %H:%M:%S") if int(lines[3]): try: lista_ep=tutti_gli_episodi(lines[0]) #ho trovato la lista dei num episodi #print(lista_ep) if not os.path.exists(os.path.join(configs['DownDir'],lines[1])): #se la cartella down non c'è falla os.makedirs(os.path.join(configs['DownDir'],lines[1])) ep_gia_scar=os.listdir(os.path.join(configs['DownDir'],lines[1])) if len(ep_gia_scar)==len(lista_ep): print('['+orario+']'+"Nessun aggiornamento per "+lines[1]) else: results=[] #lista in cui ci saranno i link da scaricare for episodi in lista_ep: numero=episodi.split('-')[-1] #estrae il numero dell'episodio dall'url #print(numero) #print(lines) if len(lines)>4 and lines[4]: #se ho la new naming convention numerello = format(int(float(numero)),'03d') if float(numero)%1: virgola = '.'+format(float(numero),'.1f').split('.')[-1] else: virgola = '' nome_ep=lines[2] + 'E' + numerello + virgola + '.mp4' else: nome_ep=lines[2]+' Ep '+ numero + '.mp4' #definisco nome dell'episodio if nome_ep in ep_gia_scar: print('['+orario+']'+lines[1]+'/'+nome_ep+' è già presente nella cartella') else: #scarico l'ep mancante filepath=os.path.join(configs['DownDir'],lines[1], nome_ep) results.append((filepath,episodi)) for ep in results: scarica(ep,configs) lista_email.append((ep[0],0)) except Exception as e: print(e) lista_email.append((lines[1],1)) print('['+orario+']'+lines[1]+'/'+lines[2]+': Errore Critico ottenendo la lista episodi') else: print('['+orario+']'+"Skip "+lines[2]) if configs['email']: #se nei config email è vuota non inviamo nulla novità=[el for el in lista_email if el[1]==0] errori=[el for el in lista_email if el[1]==1] #so che potrei scrivere con meno controlli le righe sotto ma per facilità di lettura faccio controlli inutili if novità and errori: subject='Saturn_cli: Errori e Novità' body="Yo sono usciti nuovi episodi, valli a vedere soldato ;)\n" for el in novità: body+=f"{ el[0] } è stato aggiunto con successo!\n" body+="\nOhibò ci sono stati degli errori :(\n" for el in errori: body+=f"{ el[0] }, oof ci sono stati errori con questo anime\n" body+="\nFine resoconto, sono sbalordito delle mie capacità o_o" new_email(subject,body,configs) elif novità: subject='Saturn_cli: Novità' body="Yo sono usciti nuovi episodi, valli a vedere soldato ;)\n" for el in novità: body+=f"{ el[0] } è stato aggiunto con successo!\n" body+="\nFine resoconto, sono sbalordito delle mie capacità o_o" new_email(subject,body,configs) elif errori: subject='Saturn_cli: Errori' body="Ohibò ci sono stati degli errori :(\n" for el in errori: body+=f"{ el[0] }, oof ci sono stati errori con questo anime\n" body+="\nFine resoconto, sono sbalordito delle mie capacità o_o" new_email(subject,body,configs) if __name__ == "__main__": ########################to correct the tls error #import cloudscraper from requests.adapters import HTTPAdapter from requests.packages.urllib3.poolmanager import PoolManager import ssl class SSLAdapter(HTTPAdapter): def init_poolmanager(self, connections, maxsize, block=False): self.poolmanager = PoolManager( num_pools=connections, maxsize=maxsize, block=block, ssl_version=ssl.PROTOCOL_TLS, ciphers='ECDHE-RSA-AES128-GCM-SHA256') #firefox_options = webdriver.FirefoxOptions() #firefox_options.add_argument("--profile=/opt/saturn_cli/selenium_profile") #driver = webdriver.Chrome() session = cloudscraper.Session() session.mount('https://', SSLAdapter()) #response = session.get('https://example.com') #print(response.text) ######################## modulo_scarica()