Merge pull request #208 from hockeygoalie35/dev
ARLChecker 1.6 - Refactor, Debug mode, Test flag
This commit is contained in:
commit
5810903b97
2 changed files with 193 additions and 147 deletions
|
@ -1,6 +1,6 @@
|
||||||
#!/usr/bin/with-contenv bash
|
#!/usr/bin/with-contenv bash
|
||||||
### Default values
|
### Default values
|
||||||
scriptVersion="1.5"
|
scriptVersion="1.6"
|
||||||
scriptName="ARLChecker"
|
scriptName="ARLChecker"
|
||||||
sleepInterval='24h'
|
sleepInterval='24h'
|
||||||
### Import Settings
|
### Import Settings
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
import re
|
import re
|
||||||
from pathlib import Path
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from requests import Session
|
from requests import Session
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
|
@ -11,36 +10,19 @@ import logging
|
||||||
import os
|
import os
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
# Pull script version from bash script. will likely change this to a var passthrough
|
CUSTOM_INIT_PATH = '/custom-cont_init.d/'
|
||||||
with open("/custom-services.d/ARLChecker", "r") as r:
|
CUSTOM_SERVICES_PATH = '/custom-services.d/'
|
||||||
for line in r:
|
STATUS_FALLBACK_LOCATION = '/custom-services.d/python/ARLStatus.txt'
|
||||||
if 'scriptVersion' in line:
|
EXTENDED_CONF_PATH = '/config/extended.conf'
|
||||||
VERSION = re.search(r'"([A-Za-z0-9_\./\\-]*)"', line)[0].replace('"','')
|
NOT_FOUND_PATH = '/config/extended/logs/notfound'
|
||||||
|
FAILED_DOWNLOADS_PATH = '/config/extended/logs/downloaded/failed/deezer'
|
||||||
# Get current log file
|
LOG_FILES_DIRECTORY = '/config/logs'
|
||||||
path = '/config/logs'
|
DEBUG_ROOT_PATH = './env'
|
||||||
latest_file = max([os.path.join(path, f) for f in os.listdir(path) if 'ARLChecker' in f],key=os.path.getctime)
|
|
||||||
|
|
||||||
# Logging Setup
|
|
||||||
logging.basicConfig(
|
|
||||||
format=f'%(asctime)s :: ARLChecker :: {VERSION} :: %(levelname)s :: %(message)s',
|
|
||||||
datefmt='%Y-%m-%d %H:%M:%S',
|
|
||||||
level=logging.INFO,
|
|
||||||
handlers=[
|
|
||||||
logging.StreamHandler(stdout),
|
|
||||||
logging.FileHandler(latest_file, mode="a")
|
|
||||||
]
|
|
||||||
)
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Initialize colorama
|
|
||||||
init(autoreset=True)
|
|
||||||
|
|
||||||
# Web agent used to access Deezer
|
# Web agent used to access Deezer
|
||||||
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/110.0'
|
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/110.0'
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Plan:
|
class Plan:
|
||||||
name: str
|
name: str
|
||||||
|
@ -85,6 +67,7 @@ class DeezerPlatformProvider:
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
self.log = logging.getLogger('ARLChecker')
|
||||||
self.session = Session()
|
self.session = Session()
|
||||||
self.session.headers.update({'User-Agent': USER_AGENT})
|
self.session.headers.update({'User-Agent': USER_AGENT})
|
||||||
|
|
||||||
|
@ -97,7 +80,7 @@ class DeezerPlatformProvider:
|
||||||
)
|
)
|
||||||
res.raise_for_status()
|
res.raise_for_status()
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
logger.error(Fore.RED + 'Could not connect! Service down, API changed, wrong credentials or code-related issue.' + Fore.LIGHTWHITE_EX)
|
self.log.error(Fore.RED + 'Could not connect! Service down, API changed, wrong credentials or code-related issue.' + Fore.LIGHTWHITE_EX)
|
||||||
raise ConnectionError()
|
raise ConnectionError()
|
||||||
|
|
||||||
self.session.cookies.clear()
|
self.session.cookies.clear()
|
||||||
|
@ -105,17 +88,16 @@ class DeezerPlatformProvider:
|
||||||
try:
|
try:
|
||||||
res = res.json()
|
res = res.json()
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
logger.error(Fore.RED + "Could not parse JSON response from DEEZER!" + Fore.LIGHTWHITE_EX)
|
self.log.error(Fore.RED + "Could not parse JSON response from DEEZER!" + Fore.LIGHTWHITE_EX)
|
||||||
raise ParseError()
|
raise ParseError()
|
||||||
|
|
||||||
if 'error' in res and res['error']:
|
if 'error' in res and res['error']:
|
||||||
logger.error(Fore.RED + "Deezer returned the following error:{}".format(res["error"]) + Fore.LIGHTWHITE_EX)
|
self.log.error(Fore.RED + "Deezer returned the following error:{}".format(res["error"]) + Fore.LIGHTWHITE_EX)
|
||||||
raise ServiceError()
|
raise ServiceError()
|
||||||
|
|
||||||
res = res['results']
|
res = res['results']
|
||||||
|
|
||||||
if res['USER']['USER_ID'] == 0:
|
if res['USER']['USER_ID'] == 0:
|
||||||
logger.error(Fore.RED+"ARL Token Expired. Update the token in extended.conf"+Fore.LIGHTWHITE_EX)
|
|
||||||
raise AuthError()
|
raise AuthError()
|
||||||
|
|
||||||
return Account(username, secret, res['COUNTRY'], Plan(
|
return Account(username, secret, res['COUNTRY'], Plan(
|
||||||
|
@ -130,15 +112,11 @@ class DeezerPlatformProvider:
|
||||||
|
|
||||||
class LidarrExtendedAPI:
|
class LidarrExtendedAPI:
|
||||||
# sets new token to extended.conf
|
# sets new token to extended.conf
|
||||||
def __init__(self, new_arl_token):
|
def __init__(self):
|
||||||
workingDir = Path(os.getcwd())
|
self.root = ''
|
||||||
print(workingDir)
|
self.log = logging.getLogger('ARLChecker')
|
||||||
#self.parentDir = str(workingDir.parents[1])
|
self.newARLToken = None
|
||||||
self.parentDir = str(workingDir.parents[3])
|
self.currentARLToken = None
|
||||||
print(self.parentDir)
|
|
||||||
self.extendedConfDir = self.parentDir + '/config/extended.conf'
|
|
||||||
self.newARLToken = new_arl_token
|
|
||||||
self.arlToken = None
|
|
||||||
self.arlLineText = None
|
self.arlLineText = None
|
||||||
self.arlLineIndex = None
|
self.arlLineIndex = None
|
||||||
self.fileText = None
|
self.fileText = None
|
||||||
|
@ -148,23 +126,19 @@ class LidarrExtendedAPI:
|
||||||
self.telegram_user_chat_id = None
|
self.telegram_user_chat_id = None
|
||||||
self.telegramBotEnableLineText = None
|
self.telegramBotEnableLineText = None
|
||||||
self.telegramBotEnableLineIndex = None
|
self.telegramBotEnableLineIndex = None
|
||||||
|
|
||||||
self.bot = None
|
self.bot = None
|
||||||
self.parse_extended_conf()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def parse_extended_conf(self):
|
def parse_extended_conf(self):
|
||||||
deezer_active = False
|
self.currentARLToken = None
|
||||||
self.arlToken = None
|
|
||||||
arl_token_match = None
|
arl_token_match = None
|
||||||
|
deezer_active = False
|
||||||
re_search_pattern = r'"([^"]*)"'
|
re_search_pattern = r'"([^"]*)"'
|
||||||
try: # Try to open extended.conf and read all text into a var.
|
try: # Try to open extended.conf and read all text into a var.
|
||||||
with open(self.extendedConfDir, 'r', encoding='utf-8') as file:
|
with open(self.root+EXTENDED_CONF_PATH, 'r', encoding='utf-8') as file:
|
||||||
self.fileText = file.readlines()
|
self.fileText = file.readlines()
|
||||||
file.close()
|
file.close()
|
||||||
except:
|
except:
|
||||||
logger.error(f"Could not find {self.extendedConfDir}")
|
self.log.error(f"Could not find {self.root+EXTENDED_CONF_PATH}")
|
||||||
exit(1)
|
exit(1)
|
||||||
# Ensure Deezer is enabled and ARL token is populated
|
# Ensure Deezer is enabled and ARL token is populated
|
||||||
for line in self.fileText:
|
for line in self.fileText:
|
||||||
|
@ -178,14 +152,14 @@ class LidarrExtendedAPI:
|
||||||
|
|
||||||
# ARL Token wrong flag error handling.
|
# ARL Token wrong flag error handling.
|
||||||
if arl_token_match is None:
|
if arl_token_match is None:
|
||||||
logger.error("ARL Token not found in extended.conf. Exiting")
|
self.log.error("ARL Token not found in extended.conf. Exiting.")
|
||||||
exit(1)
|
exit(1)
|
||||||
elif deezer_active is False:
|
elif deezer_active is False:
|
||||||
logger.error("Deezer not set as an active downloader in extended.conf. Exiting")
|
self.log.error("Deezer not set as an active downloader in extended.conf. Exiting.")
|
||||||
file.close()
|
file.close()
|
||||||
exit(1)
|
exit(1)
|
||||||
self.arlToken = arl_token_match[0]
|
self.currentARLToken = arl_token_match[0]
|
||||||
logger.info('ARL Found in extended.conf')
|
self.log.info('ARL Found in extended.conf')
|
||||||
|
|
||||||
for line in self.fileText:
|
for line in self.fileText:
|
||||||
if 'telegramBotEnable=' in line:
|
if 'telegramBotEnable=' in line:
|
||||||
|
@ -197,96 +171,85 @@ class LidarrExtendedAPI:
|
||||||
if 'telegramUserChatID=' in line:
|
if 'telegramUserChatID=' in line:
|
||||||
self.telegram_user_chat_id = re.search(re_search_pattern, line)[0].replace('"', '')
|
self.telegram_user_chat_id = re.search(re_search_pattern, line)[0].replace('"', '')
|
||||||
|
|
||||||
|
|
||||||
if self.enable_telegram_bot:
|
if self.enable_telegram_bot:
|
||||||
logger.info('Telegram bot is enabled.')
|
self.log.info('Telegram bot is enabled.')
|
||||||
|
|
||||||
if self.telegram_bot_token is None or self.telegram_user_chat_id is None:
|
if self.telegram_bot_token is None or self.telegram_user_chat_id is None:
|
||||||
logger.error('Telegram bot token or user chat ID not set in extended.conf. Exiting')
|
self.log.error('Telegram bot token or user chat ID not set in extended.conf. Exiting')
|
||||||
exit(1)
|
exit(1)
|
||||||
else:
|
else:
|
||||||
logger.info('Telegram bot is disabled. Set the flag in extended.conf to enable.')
|
self.log.info('Telegram bot is disabled. Set the flag in extended.conf to enable.')
|
||||||
|
|
||||||
# Uses DeezerPlatformProvider to check if the token is valid
|
def check_token_wrapper(self): # adds Lidarr_extended specific logging and actions around check_token
|
||||||
def check_token(self, token=None):
|
self.log.info("Checking ARL Token from extended.conf")
|
||||||
logger.info('Checking ARL Token Validity...')
|
if self.currentARLToken == '""':
|
||||||
if token == '""':
|
self.log.info(Fore.YELLOW+"No ARL Token set in Extended.conf"+Fore.LIGHTWHITE_EX)
|
||||||
logger.info(Fore.YELLOW+"No ARL Token set in Extended.conf"+Fore.LIGHTWHITE_EX)
|
|
||||||
self.report_status("NOT SET")
|
self.report_status("NOT SET")
|
||||||
exit(0)
|
exit(0)
|
||||||
if token is None:
|
if self.currentARLToken is None:
|
||||||
print('Invalid ARL Token Entry')
|
self.log.error('Invalid ARL Token Entry (None Object)')
|
||||||
return False
|
return False
|
||||||
try:
|
validity_results = check_token(self.currentARLToken)
|
||||||
deezer_check = DeezerPlatformProvider()
|
if validity_results is True:
|
||||||
account = deezer_check.login('', token.replace('"',''))
|
self.report_status('VALID') # For text fallback method
|
||||||
if account.plan:
|
else:
|
||||||
logger.info(Fore.GREEN + f'Deezer Account Found.'+ Fore.LIGHTWHITE_EX)
|
|
||||||
logger.info('-------------------------------')
|
|
||||||
logger.info(f'Plan: {account.plan.name}')
|
|
||||||
logger.info(f'Expiration: {account.plan.expires}')
|
|
||||||
logger.info(f'Active: {Fore.GREEN+"Y" if account.plan.active else "N"}'+Fore.LIGHTWHITE_EX)
|
|
||||||
logger.info(f'Download: {Fore.GREEN+"Y" if account.plan.download else Fore.RED+"N"}'+Fore.LIGHTWHITE_EX)
|
|
||||||
logger.info(f'Lossless: {Fore.GREEN+"Y" if account.plan.lossless else Fore.RED+"N"}'+Fore.LIGHTWHITE_EX)
|
|
||||||
logger.info(f'Explicit: {Fore.GREEN+"Y" if account.plan.explicit else Fore.RED+"N"}'+Fore.LIGHTWHITE_EX)
|
|
||||||
logger.info('-------------------------------')
|
|
||||||
self.report_status('VALID')
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
self.report_status('EXPIRED')
|
self.report_status('EXPIRED')
|
||||||
if self.telegram_bot_running:
|
self.log.error(Fore.RED + 'Update the token in extended.conf' + Fore.LIGHTWHITE_EX)
|
||||||
|
if self.telegram_bot_running: # Don't re-start the telegram bot if it's already running after bot invalid token entry
|
||||||
return False
|
return False
|
||||||
if self.enable_telegram_bot:
|
if self.enable_telegram_bot:
|
||||||
logger.info('Starting Telegram bot...Check Telegram and follow instructions.')
|
self.log.info(Fore.YELLOW + 'Starting Telegram bot...Check Telegram and follow instructions.' + Fore.LIGHTWHITE_EX)
|
||||||
self.telegram_bot_running = True
|
self.telegram_bot_running = True
|
||||||
self.start_telegram_bot()
|
self.start_telegram_bot()
|
||||||
exit(420)
|
exit(420)
|
||||||
|
|
||||||
def set_new_token(self): # Re-writes extended.conf with previously read-in text, replacing w/ new ARL
|
def set_new_token(self): # Re-writes extended.conf with previously read-in text, replacing w/ new ARL
|
||||||
self.fileText[self.arlLineIndex] = self.arlLineText.replace(self.arlToken, self.newARLToken)
|
self.fileText[self.arlLineIndex] = self.arlLineText.replace(self.currentARLToken, self.newARLToken)
|
||||||
with open(self.extendedConfDir, 'w', encoding='utf-8') as file:
|
with open(self.root+EXTENDED_CONF_PATH, 'w', encoding='utf-8') as file:
|
||||||
file.writelines(self.fileText)
|
file.writelines(self.fileText)
|
||||||
file.close()
|
file.close()
|
||||||
logger.info("New ARL token written to extended.conf")
|
self.log.info("New ARL token written to extended.conf")
|
||||||
|
self.parse_extended_conf()
|
||||||
|
|
||||||
# After new token is set, clean up notfound and failed downloads to bypass the default 30 day wait
|
# After new token is set, clean up notfound and failed downloads to bypass the default 30 day wait
|
||||||
def clear_not_found(self):
|
def clear_not_found(self):
|
||||||
paths = [self.parentDir + '/config/extended/logs/notfound',self.parentDir+'/config/extended/logs/downloaded/failed/deezer']
|
paths = [self.root + NOT_FOUND_PATH, self.root+FAILED_DOWNLOADS_PATH]
|
||||||
for path in paths:
|
for path in paths:
|
||||||
for file in os.listdir(path):
|
for file in os.listdir(path):
|
||||||
file_to_delete = os.path.join(path,file)
|
file_to_delete = os.path.join(path, file)
|
||||||
os.remove(file_to_delete)
|
os.remove(file_to_delete)
|
||||||
|
|
||||||
def report_status(self, status):
|
def report_status(self, status):
|
||||||
f = open("/custom-services.d/python/ARLStatus.txt", "w")
|
f = open(self.root+STATUS_FALLBACK_LOCATION, "w")
|
||||||
now = datetime.strftime(datetime.now(),"%b-%d-%Y at %H:%M:%S")
|
now = datetime.strftime(datetime.now(), "%b-%d-%Y at %H:%M:%S")
|
||||||
f.write(f"{now}: ARL Token is {status}.{' Please update arlToken in extended.conf' if status=='EXPIRED' else ''}")
|
f.write(f"{now}: ARL Token is {status}.{' Please update arlToken in extended.conf' if status=='EXPIRED' else ''}")
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
def start_telegram_bot(self):
|
def start_telegram_bot(self):
|
||||||
self.bot = TelegramBotControl(self,self.telegram_bot_token,self.telegram_user_chat_id)
|
self.bot = TelegramBotControl(self, self.telegram_bot_token, self.telegram_user_chat_id)
|
||||||
|
|
||||||
def disable_telegram_bot(self):
|
def disable_telegram_bot(self):
|
||||||
compiled = re.compile(re.escape('true'), re.IGNORECASE)
|
compiled = re.compile(re.escape('true'), re.IGNORECASE)
|
||||||
self.fileText[self.telegramBotEnableLineIndex] = compiled.sub('false', self.telegramBotEnableLineText)
|
self.fileText[self.telegramBotEnableLineIndex] = compiled.sub('false', self.telegramBotEnableLineText)
|
||||||
with open(self.extendedConfDir, 'w', encoding='utf-8') as file:
|
with open(self.root+EXTENDED_CONF_PATH, 'w', encoding='utf-8') as file:
|
||||||
file.writelines(self.fileText)
|
file.writelines(self.fileText)
|
||||||
file.close()
|
file.close()
|
||||||
logger.info("Telegram Bot Disabled.")
|
self.log.info("Telegram Bot Disabled.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class TelegramBotControl:
|
class TelegramBotControl:
|
||||||
def __init__(self, parent,telegram_bot_token,telegram_user_chat_id):
|
def __init__(self, parent, telegram_bot_token, telegram_user_chat_id):
|
||||||
|
self.log = logging.getLogger('ARLChecker')
|
||||||
async def send_expired_token_notification(application):
|
|
||||||
await application.bot.sendMessage(chat_id=self.telegram_chat_id,text='---\U0001F6A8WARNING\U0001F6A8-----\nARL TOKEN EXPIRED\n Update Token by running "/set_token <TOKEN>"\n You can find a new ARL at:\nhttps://rentry.org/firehawk52#deezer-arls\n\n\n Other Commands:\n/cancel - Cancel this session\n/disable - Disable Telegram Bot',disable_web_page_preview=True)
|
|
||||||
# TODO: Get Chat ID/ test on new bot
|
|
||||||
|
|
||||||
self.parent = parent
|
self.parent = parent
|
||||||
self.telegram_bot_token = telegram_bot_token
|
self.telegram_bot_token = telegram_bot_token
|
||||||
self.telegram_chat_id = telegram_user_chat_id
|
self.telegram_chat_id = telegram_user_chat_id
|
||||||
|
|
||||||
|
# Send initial notification
|
||||||
|
async def send_expired_token_notification(application):
|
||||||
|
await application.bot.sendMessage(chat_id=self.telegram_chat_id, text='---\U0001F6A8WARNING\U0001F6A8-----\nARL TOKEN EXPIRED\n Update Token by running "/set_token <TOKEN>"\n You can find a new ARL at:\nhttps://rentry.org/firehawk52#deezer-arls\n\n\n Other Commands:\n/cancel - Cancel this session\n/disable - Disable Telegram Bot', disable_web_page_preview=True)
|
||||||
|
self.log.info(Fore.YELLOW + "Telegram Bot Sent ARL Token Expiry Message " + Fore.LIGHTWHITE_EX)
|
||||||
|
# TODO: Get Chat ID/ test on new bot
|
||||||
|
|
||||||
# start bot control
|
# start bot control
|
||||||
self.application = ApplicationBuilder().token(self.telegram_bot_token).post_init(send_expired_token_notification).build()
|
self.application = ApplicationBuilder().token(self.telegram_bot_token).post_init(send_expired_token_notification).build()
|
||||||
token_handler = CommandHandler('set_token', self.set_token)
|
token_handler = CommandHandler('set_token', self.set_token)
|
||||||
|
@ -297,93 +260,176 @@ class TelegramBotControl:
|
||||||
self.application.add_handler(disable_handler)
|
self.application.add_handler(disable_handler)
|
||||||
self.application.run_polling(allowed_updates=Update.ALL_TYPES)
|
self.application.run_polling(allowed_updates=Update.ALL_TYPES)
|
||||||
|
|
||||||
|
|
||||||
async def disable_bot(self, update, context: ContextTypes.DEFAULT_TYPE):
|
async def disable_bot(self, update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
self.parent.disable_telegram_bot()
|
self.parent.disable_telegram_bot()
|
||||||
await update.message.reply_text('Disabled Telegram Bot. \U0001F614\nIf you would like to re-enable,\nset telegramBotEnable to true\nin extended.conf')
|
await update.message.reply_text('Disabled Telegram Bot. \U0001F614\nIf you would like to re-enable,\nset telegramBotEnable to true\nin extended.conf')
|
||||||
|
self.log.info(Fore.YELLOW + 'Telegram Bot: Send Disable Bot Message :(' + Fore.LIGHTWHITE_EX)
|
||||||
self.application.stop_running()
|
self.application.stop_running()
|
||||||
|
|
||||||
|
|
||||||
async def cancel(self, update, context: ContextTypes.DEFAULT_TYPE):
|
async def cancel(self, update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
await update.message.reply_text('Canceling...ARLToken is still expired.')
|
await update.message.reply_text('Canceling...ARLToken is still expired.')
|
||||||
|
self.log.info(Fore.YELLOW + 'Telegram Bot: Canceling...ARLToken is still expired.' + Fore.LIGHTWHITE_EX)
|
||||||
try:
|
try:
|
||||||
self.application.stop_running()
|
self.application.stop_running()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def set_token(self, update, context: ContextTypes.DEFAULT_TYPE):
|
async def set_token(self, update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
|
async def send_message(text, reply=False):
|
||||||
|
if reply is True:
|
||||||
|
await update.message.reply_text(text=text)
|
||||||
|
else:
|
||||||
|
await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
|
||||||
|
self.log.info(Fore.YELLOW+"Telegram Bot: " + text + Fore.LIGHTWHITE_EX)
|
||||||
try:
|
try:
|
||||||
new_token = update.message.text.split('/set_token ')[1]
|
new_token = update.message.text.split('/set_token ')[1]
|
||||||
if new_token == '':
|
if new_token == '':
|
||||||
raise Exception
|
raise Exception
|
||||||
except:
|
except:
|
||||||
await update.message.reply_text('Invalid Entry... please try again.')
|
await update.message.reply_text('Invalid Entry... Please try again.')
|
||||||
return
|
return
|
||||||
print(new_token)
|
self.log.info(Fore.YELLOW+f"Telegram Bot:Token received: {new_token}" + Fore.LIGHTWHITE_EX)
|
||||||
logger.info("Testing ARL Token Validity...")
|
token_validity = check_token(new_token)
|
||||||
token_validity = self.parent.check_token(new_token)
|
|
||||||
if token_validity:
|
if token_validity:
|
||||||
await context.bot.send_message(chat_id=update.effective_chat.id, text="ARL valid, applying...")
|
await send_message("ARL valid, applying...")
|
||||||
self.parent.newARLToken = '"'+new_token+'"'
|
self.parent.newARLToken = '"'+new_token+'"'
|
||||||
self.parent.set_new_token()
|
self.parent.set_new_token()
|
||||||
self.parent.arlToken = self.parent.newARLToken
|
await send_message("Checking configuration...")
|
||||||
# TODO Fix this garbage - move functionality out of telegram stuff
|
|
||||||
await context.bot.send_message(chat_id=update.effective_chat.id, text="Checking configuration")
|
|
||||||
# reparse extended.conf
|
# reparse extended.conf
|
||||||
self.parent.parse_extended_conf()
|
self.parent.parse_extended_conf()
|
||||||
token_validity = self.parent.check_token(self.parent.arlToken)
|
token_validity = check_token(self.parent.currentARLToken)
|
||||||
if token_validity:
|
if token_validity:
|
||||||
await context.bot.send_message(chat_id=update.effective_chat.id, text="ARL Updated! \U0001F44D")
|
await send_message("ARL Token Updated! \U0001F44D", reply=True)
|
||||||
try:
|
try:
|
||||||
await self.application.stop_running()
|
self.application.stop_running()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
else:# If Token invalid
|
else: # If Token invalid
|
||||||
await update.message.reply_text(text="Token expired or inactive. try another token.")
|
await send_message("Token expired or invalid. Try another token.", reply=True)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def check_token(token=None):
|
||||||
|
log = logging.getLogger('ARLChecker')
|
||||||
|
log.info(f"ARL Token to check: {token}")
|
||||||
|
log.info('Checking ARL Token Validity...')
|
||||||
|
try:
|
||||||
|
deezer_check = DeezerPlatformProvider()
|
||||||
|
account = deezer_check.login('', token.replace('"', ''))
|
||||||
|
if account.plan:
|
||||||
|
log.info(Fore.GREEN + f'Deezer Account Found.' + Fore.LIGHTWHITE_EX)
|
||||||
|
log.info('-------------------------------')
|
||||||
|
log.info(f'Plan: {account.plan.name}')
|
||||||
|
log.info(f'Expiration: {account.plan.expires}')
|
||||||
|
log.info(f'Active: {Fore.GREEN+"Y" if account.plan.active else "N"}'+Fore.LIGHTWHITE_EX)
|
||||||
|
log.info(f'Download: {Fore.GREEN+"Y" if account.plan.download else Fore.RED+"N"}'+Fore.LIGHTWHITE_EX)
|
||||||
|
log.info(f'Lossless: {Fore.GREEN+"Y" if account.plan.lossless else Fore.RED+"N"}'+Fore.LIGHTWHITE_EX)
|
||||||
|
log.info(f'Explicit: {Fore.GREEN+"Y" if account.plan.explicit else Fore.RED+"N"}'+Fore.LIGHTWHITE_EX)
|
||||||
|
log.info('-------------------------------')
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
if type(e) is AuthError:
|
||||||
|
log.error(Fore.RED + 'ARL Token Invalid/Expired.' + Fore.LIGHTWHITE_EX)
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
log.error(e)
|
||||||
|
return
|
||||||
|
|
||||||
def main(arlToken = None):
|
|
||||||
parser = ArgumentParser(prog='Account Checker', description='Check if Deezer ARL Token is valid')
|
def parse_arguments():
|
||||||
parser.add_argument('-c', '--check', help='Check if current ARL Token is active/valid',required=False, default=False, action='store_true')
|
parser = ArgumentParser(prog='Account Checker', description='Lidarr Extended Deezer ARL Token Tools')
|
||||||
parser.add_argument('-n', '--new', help='Set new ARL Token',type = str, required=False, default=False)
|
parser.add_argument('-c', '--check', help='Check if currently set ARL Token is active/valid', required=False, default=False, action='store_true')
|
||||||
|
parser.add_argument('-n', '--new_token', help='Set new ARL Token', type=str, required=False, default=False)
|
||||||
|
parser.add_argument('-t', '--test_token', help='Test any token for validity', type=str, required=False, default=False)
|
||||||
|
parser.add_argument('-d', '--debug', help='For debug and development, sets root path to match testing env. See DEBUG_ROOT_PATH', required=False, default=False, action='store_true')
|
||||||
|
|
||||||
if not argv[1:]:
|
if not argv[1:]:
|
||||||
parser.print_help()
|
parser.print_help()
|
||||||
parser.exit()
|
parser.exit()
|
||||||
|
|
||||||
args = parser.parse_args()
|
return parser, parser.parse_args()
|
||||||
arlToken_instance = LidarrExtendedAPI(arlToken)
|
|
||||||
|
|
||||||
if args.check is True:
|
|
||||||
if arlToken_instance.arlToken == '':
|
def get_version(root):
|
||||||
print("ARL Token not set. re-run with -n flag")
|
# Pull script version from bash script. will likely change this to a var passthrough
|
||||||
|
with open(root+CUSTOM_SERVICES_PATH+"ARLChecker", "r") as r:
|
||||||
|
for line in r:
|
||||||
|
if 'scriptVersion' in line:
|
||||||
|
return re.search(r'"([A-Za-z0-9_./\\-]*)"', line)[0].replace('"', '')
|
||||||
|
logging.error('Script Version not found! Exiting...')
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def get_active_log(root):
|
||||||
|
# Get current log file
|
||||||
|
path = root + LOG_FILES_DIRECTORY
|
||||||
|
latest_file = max([os.path.join(path, f) for f in os.listdir(path) if 'ARLChecker' in f], key=os.path.getctime)
|
||||||
|
return latest_file
|
||||||
|
|
||||||
|
|
||||||
|
def init_logging(version, log_file_path):
|
||||||
|
# Logging Setup
|
||||||
|
logging.basicConfig(
|
||||||
|
format=f'%(asctime)s :: ARLChecker :: {version} :: %(levelname)s :: %(message)s',
|
||||||
|
datefmt='%Y-%m-%d %H:%M:%S',
|
||||||
|
level=logging.INFO,
|
||||||
|
handlers=[
|
||||||
|
logging.StreamHandler(stdout),
|
||||||
|
logging.FileHandler(log_file_path, mode="a", encoding='utf-8')
|
||||||
|
]
|
||||||
|
)
|
||||||
|
logger = logging.getLogger('ARLChecker')
|
||||||
|
|
||||||
|
# Initialize colorama
|
||||||
|
init(autoreset=True)
|
||||||
|
logger.info(Fore.GREEN + 'Logger initialized'+Fore.LIGHTWHITE_EX)
|
||||||
|
|
||||||
|
return logger
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
root = ''
|
||||||
|
parser, args = parse_arguments()
|
||||||
|
if args.debug is True: # If debug flag set, works with IDE structure
|
||||||
|
root = DEBUG_ROOT_PATH
|
||||||
|
log = init_logging(get_version(root), get_active_log(root))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
arlToken_instance.check_token(arlToken_instance.arlToken)
|
if args.test_token:
|
||||||
|
log.info(Fore.CYAN+"CLI Token Tester"+Fore.LIGHTWHITE_EX)
|
||||||
|
check_token(args.test_token)
|
||||||
|
exit(0)
|
||||||
|
arl_checker_instance = LidarrExtendedAPI()
|
||||||
|
arl_checker_instance.root = root
|
||||||
|
if args.check is True:
|
||||||
|
if arl_checker_instance.currentARLToken == '':
|
||||||
|
log.error("ARL Token not set. re-run with -n flag")
|
||||||
|
try:
|
||||||
|
arl_checker_instance.parse_extended_conf()
|
||||||
|
arl_checker_instance.check_token_wrapper()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if 'Chat not found' in str(e):
|
if 'Chat not found' in str(e) or 'Chat_id' in str(e):
|
||||||
logger.error(Fore.RED + "Chat not found. Check your chat ID in extended.conf, or start a chat with your bot."+Fore.LIGHTWHITE_EX)
|
log.error(Fore.RED + "Chat not found. Check your chat ID in extended.conf, or start a chat with your bot."+Fore.LIGHTWHITE_EX)
|
||||||
elif 'The token' in str(e):
|
elif 'The token' in str(e):
|
||||||
logger.error(Fore.RED + "Check your Bot Token in extended.conf."+Fore.LIGHTWHITE_EX)
|
log.error(Fore.RED + "Check your Bot Token in extended.conf."+Fore.LIGHTWHITE_EX)
|
||||||
else:
|
else:
|
||||||
print(e)
|
log.error(e)
|
||||||
exit(1)
|
|
||||||
|
|
||||||
|
elif args.new_token:
|
||||||
elif args.new:
|
if args.new_token == '':
|
||||||
if args.new == '':
|
log.error('Please pass new ARL token as an argument')
|
||||||
print("Please pass new ARL token as an argument")
|
|
||||||
exit(96)
|
exit(96)
|
||||||
|
arl_checker_instance.newARLToken = '"'+args.new_token+'"'
|
||||||
arlToken_instance.newARLToken = '"'+args.new+'"'
|
arl_checker_instance.parse_extended_conf()
|
||||||
arlToken_instance.set_new_token()
|
arl_checker_instance.set_new_token()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
parser.print_help()
|
parser.print_help()
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(e, exc_info=True)
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main('FAKETOKEN')
|
main()
|
||||||
|
|
Loading…
Reference in a new issue