diff --git a/app/YtManagerApp/appmain.py b/app/YtManagerApp/appmain.py index 7eba9b4..215786c 100644 --- a/app/YtManagerApp/appmain.py +++ b/app/YtManagerApp/appmain.py @@ -5,9 +5,8 @@ import sys from django.conf import settings as dj_settings -from .management.appconfig import appconfig -from .management.jobs.synchronize import SynchronizeJob -from .scheduler import scheduler +from YtManagerApp.scheduler.jobs.synchronize_job import SynchronizeJob +from YtManagerApp.services import Services from django.db.utils import OperationalError @@ -36,8 +35,8 @@ def main(): __initialize_logger() try: - if appconfig.initialized: - scheduler.initialize() + if Services.appConfig.initialized: + Services.scheduler.initialize() SynchronizeJob.schedule_global_job() except OperationalError: # Settings table is not created when running migrate or makemigrations; diff --git a/app/YtManagerApp/management/appconfig.py b/app/YtManagerApp/management/appconfig.py index 46b6711..6676644 100644 --- a/app/YtManagerApp/management/appconfig.py +++ b/app/YtManagerApp/management/appconfig.py @@ -1,4 +1,3 @@ -from dynamic_preferences.registries import global_preferences_registry from YtManagerApp.dynamic_preferences_registry import Initialized, YouTubeAPIKey, AllowRegistrations, SyncSchedule, SchedulerConcurrency @@ -35,7 +34,3 @@ class AppConfig(object): value = subscription.user.preferences[pref] return value - - -global_prefs = global_preferences_registry.manager() -appconfig = AppConfig(global_prefs) diff --git a/app/YtManagerApp/management/downloader.py b/app/YtManagerApp/management/downloader.py index d206ed4..0b4ce75 100644 --- a/app/YtManagerApp/management/downloader.py +++ b/app/YtManagerApp/management/downloader.py @@ -1,4 +1,4 @@ -from YtManagerApp.management.jobs.download_video import DownloadVideoJob +from YtManagerApp.scheduler.jobs.download_video_job import DownloadVideoJob from YtManagerApp.models import Video, Subscription, VIDEO_ORDER_MAPPING from YtManagerApp.utils import first_non_null from django.conf import settings as srv_settings diff --git a/app/YtManagerApp/management/youtube_dl_manager.py b/app/YtManagerApp/management/youtube_dl_manager.py new file mode 100644 index 0000000..61eb291 --- /dev/null +++ b/app/YtManagerApp/management/youtube_dl_manager.py @@ -0,0 +1,111 @@ +import logging +import os +import subprocess +import sys + +import requests +from django.conf import settings as dj_settings + +LATEST_URL = "https://yt-dl.org/downloads/latest/youtube-dl" +GITHUB_API_LATEST_RELEASE = "https://api.github.com/repos/ytdl-org/youtube-dl/releases/latest" +log = logging.getLogger("YoutubeDlManager") + + +class YoutubeDlException(Exception): + pass + + +class YoutubeDlNotInstalledException(YoutubeDlException): + pass + + +class YoutubeDlRuntimeException(YoutubeDlException): + pass + + +class YoutubeDlManager(object): + + def __init__(self): + self.verbose = False + self.progress = False + + def _get_path(self): + return os.path.join(dj_settings.DATA_DIR, 'youtube-dl') + + def _check_installed(self, path): + return os.path.isfile(path) and os.access(path, os.X_OK) + + def _get_run_args(self): + run_args = [] + if self.verbose: + run_args.append('-v') + if self.progress: + run_args.append('--newline') + else: + run_args.append('--no-progress') + + return run_args + + def run(self, *args): + path = self._get_path() + if not self._check_installed(path): + log.error("Cannot run youtube-dl, it is not installed!") + raise YoutubeDlNotInstalledException + + run_args = self._get_run_args() + ret = subprocess.run([sys.executable, path, *run_args, *args], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + stdout = ret.stdout.decode('utf-8') + if len(stdout) > 0: + log.info("YoutubeDL: " + stdout) + + stderr = ret.stderr.decode('utf-8') + if len(stderr) > 0: + log.error("YoutubeDL: " + stderr) + + if ret.returncode != 0: + raise YoutubeDlRuntimeException() + + return stdout + + def get_installed_version(self): + return self.run('--version') + + def get_latest_version(self): + resp = requests.get(GITHUB_API_LATEST_RELEASE, allow_redirects=True) + resp.raise_for_status() + + info = resp.json() + return info['tag_name'] + + def install(self): + # Check if we are running the latest version + latest = self.get_latest_version() + try: + current = self.get_installed_version() + except YoutubeDlNotInstalledException: + current = None + if latest == current: + log.info(f"Running latest youtube-dl version ({current})!") + return + + # Download latest + resp = requests.get(LATEST_URL, allow_redirects=True, stream=True) + resp.raise_for_status() + + path = self._get_path() + with open(path + ".tmp", "wb") as f: + for chunk in resp.iter_content(10 * 1024): + f.write(chunk) + + # Replace + os.unlink(path) + os.rename(path + ".tmp", path) + os.chmod(path, 555) + + # Test run + newver = self.get_installed_version() + if current is None: + log.info(f"Installed youtube-dl version {newver}.") + else: + log.info(f"Upgraded youtube-dl from version {current} to {newver}.") diff --git a/app/YtManagerApp/models.py b/app/YtManagerApp/models.py index 3b779d3..227efda 100644 --- a/app/YtManagerApp/models.py +++ b/app/YtManagerApp/models.py @@ -199,17 +199,17 @@ class Video(models.Model): self.save() if self.downloaded_path is not None: from YtManagerApp.management.appconfig import appconfig - from YtManagerApp.management.jobs.delete_video import DeleteVideoJob - from YtManagerApp.management.jobs.synchronize import SynchronizeJob + from YtManagerApp.scheduler.jobs import DeleteVideoJob + from YtManagerApp.scheduler.jobs import SynchronizeJob if appconfig.for_sub(self.subscription, 'automatically_delete_watched'): DeleteVideoJob.schedule(self) SynchronizeJob.schedule_now_for_subscription(self.subscription) def mark_unwatched(self): - from YtManagerApp.management.jobs.synchronize import SynchronizeJob self.watched = False self.save() + from YtManagerApp.scheduler.jobs.synchronize_job import SynchronizeJob SynchronizeJob.schedule_now_for_subscription(self.subscription) def get_files(self): @@ -234,9 +234,9 @@ class Video(models.Model): def delete_files(self): if self.downloaded_path is not None: - from YtManagerApp.management.jobs.delete_video import DeleteVideoJob + from YtManagerApp.scheduler.jobs import DeleteVideoJob from YtManagerApp.management.appconfig import appconfig - from YtManagerApp.management.jobs.synchronize import SynchronizeJob + from YtManagerApp.scheduler.jobs import SynchronizeJob DeleteVideoJob.schedule(self) @@ -247,7 +247,7 @@ class Video(models.Model): def download(self): if not self.downloaded_path: - from YtManagerApp.management.jobs.download_video import DownloadVideoJob + from YtManagerApp.scheduler.jobs.download_video_job import DownloadVideoJob DownloadVideoJob.schedule(self) def __str__(self): diff --git a/app/YtManagerApp/scheduler.py b/app/YtManagerApp/scheduler.py deleted file mode 100644 index 5f3e252..0000000 --- a/app/YtManagerApp/scheduler.py +++ /dev/null @@ -1,268 +0,0 @@ -import datetime -import logging -import traceback -from typing import Type, Union, Optional, Callable, List, Any - -import pytz -from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.triggers.base import BaseTrigger -from django.contrib.auth.models import User - -from YtManagerApp.management.appconfig import appconfig -from YtManagerApp.models import JobExecution, JobMessage, JOB_STATES_MAP, JOB_MESSAGE_LEVELS_MAP - - -class ProgressTracker(object): - """ - Class which helps keep track of complex operation progress. - """ - - def __init__(self, total_steps: float = 100, initial_steps: float = 0, - listener: Callable[[float, str], None] = None, - listener_args: List[Any] = None, - parent: Optional["ProgressTracker"] = None): - """ - Constructor - :param total_steps: Total number of steps required by this operation - :param initial_steps: Starting steps - :param parent: Parent progress tracker - :param listener: Callable which is called when any progress happens - """ - - self.total_steps = total_steps - self.steps = initial_steps - - self.__subtask: ProgressTracker = None - self.__subtask_steps = 0 - - self.__parent = parent - self.__listener = listener - self.__listener_args = listener_args or [] - - def __on_progress(self, progress_msg): - if self.__listener is not None: - self.__listener(*self.__listener_args, self.compute_progress(), progress_msg) - - if self.__parent is not None: - self.__parent.__on_progress(progress_msg) - - def advance(self, steps: float = 1, progress_msg: str = ''): - """ - Advances a number of steps. - :param steps: Number of steps to advance - :param progress_msg: A message which will be passed to a listener - :return: - """ - - # We can assume previous subtask is now completed - if self.__subtask is not None: - self.steps += self.__subtask_steps - self.__subtask = None - - self.steps += steps - self.__on_progress(progress_msg) - - def subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0): - """ - Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress. - :param steps: Number of steps the subtask is 'worth' - :param subtask_total_steps: Total number of steps for subtask - :param subtask_initial_steps: Initial steps for subtask - :return: ProgressTracker for subtask - """ - - # We can assume previous subtask is now completed - if self.__subtask is not None: - self.steps += self.__subtask_steps - - self.__subtask = ProgressTracker(total_steps=subtask_total_steps, - initial_steps=subtask_initial_steps, - parent=self) - self.__subtask_steps = steps - - return self.__subtask - - def compute_progress(self): - """ - Calculates final progress value in percent. - :return: value in [0,1] interval representing progress - """ - base = float(self.steps) / self.total_steps - if self.__subtask is not None: - base += self.__subtask.compute_progress() * self.__subtask_steps / self.total_steps - - return min(base, 1.0) - - -class Job(object): - name = 'GenericJob' - - """ - Base class for jobs running in the scheduler. - """ - - def __init__(self, job_execution, *args): - self.job_execution = job_execution - self.log = logging.getLogger(self.name) - self.__progress_tracker = ProgressTracker(listener=Job.__on_progress, - listener_args=[self]) - - def get_description(self) -> str: - """ - Gets a user friendly description of this job. - Should be overriden in job classes. - :return: - """ - return "Running job..." - - # - # progress tracking - # - - def __on_progress(self, percent: float, message: str): - self.usr_log(message, progress=percent) - - def set_total_steps(self, steps: float): - """ - Sets the total number of work steps this task has. This is used for tracking progress. - Should be overriden in job classes. - :return: - """ - self.__progress_tracker.total_steps = steps - - def progress_advance(self, steps: float = 1, progress_msg: str = ''): - """ - Advances a number of steps. - :param steps: Number of steps to advance - :param progress_msg: A message which will be passed to a listener - :return: - """ - self.__progress_tracker.advance(steps, progress_msg) - - def create_subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0): - """ - Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress. - :param steps: Number of steps the subtask is 'worth' - :param subtask_total_steps: Total number of steps for subtask - :param subtask_initial_steps: Initial steps for subtask - :return: ProgressTracker for subtask - """ - return self.__progress_tracker.subtask(steps, subtask_total_steps, subtask_initial_steps) - - # - # user log messages - # - - def usr_log(self, message, progress: Optional[float] = None, level: int = JOB_MESSAGE_LEVELS_MAP['normal'], - suppress_notification: bool = False): - """ - Creates a new log message which will be shown on the user interface. - Progress can also be updated using this method. - :param message: A message to be displayed to the user - :param progress: Progress percentage in [0,1] interval - :param level: Log level (normal, warning, error) - :param suppress_notification: If set to true, a notification will not displayed to the user, but it will - appear in the system logs. - :return: - """ - - message = JobMessage(job=self.job_execution, - progress=progress, - message=message, - level=level, - suppress_notification=suppress_notification) - message.save() - - def usr_warn(self, message, progress: Optional[float] = None, suppress_notification: bool = False): - """ - Creates a new warning message which will be shown on the user interface. - Progress can also be updated using this method. - :param message: A message to be displayed to the user - :param progress: Progress percentage in [0,1] interval - :param suppress_notification: If set to true, a notification will not displayed to the user, but it will - appear in the system logs. - :return: - """ - self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['warning'], suppress_notification) - - def usr_err(self, message, progress: Optional[float] = None, suppress_notification: bool = False): - """ - Creates a new error message which will be shown on the user interface. - Progress can also be updated using this method. - :param message: A message to be displayed to the user - :param progress: Progress percentage in [0,1] interval - :param suppress_notification: If set to true, a notification will not displayed to the user, but it will - appear in the system logs. - :return: - """ - self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['error'], suppress_notification) - - # - # main run method - # - def run(self): - pass - - -class YtsmScheduler(object): - - def __init__(self): - self._apscheduler = BackgroundScheduler() - - def initialize(self): - # set state of existing jobs as "interrupted" - JobExecution.objects\ - .filter(status=JOB_STATES_MAP['running'])\ - .update(status=JOB_STATES_MAP['interrupted']) - - self._configure_scheduler() - self._apscheduler.start() - - def _configure_scheduler(self): - logger = logging.getLogger('scheduler') - executors = { - 'default': { - 'type': 'threadpool', - 'max_workers': appconfig.concurrency - } - } - job_defaults = { - 'misfire_grace_time': 60 * 60 * 24 * 365 # 1 year - } - self._apscheduler.configure(logger=logger, executors=executors, job_defaults=job_defaults) - - def _run_job(self, job_class: Type[Job], user: Optional[User], args: Union[tuple, list]): - - job_execution = JobExecution(user=user, status=JOB_STATES_MAP['running']) - job_execution.save() - job_instance = job_class(job_execution, *args) - - # update description - job_execution.description = job_instance.get_description() - job_execution.save() - - try: - job_instance.run() - job_execution.status = JOB_STATES_MAP['finished'] - - except Exception as ex: - job_instance.log.critical("Job failed with exception: %s", traceback.format_exc()) - job_instance.usr_err(job_instance.name + " operation failed: " + str(ex)) - job_execution.status = JOB_STATES_MAP['failed'] - - finally: - job_execution.end_date = datetime.datetime.now(tz=pytz.UTC) - job_execution.save() - - def add_job(self, job_class: Type[Job], trigger: Union[str, BaseTrigger] = None, - args: Union[list, tuple] = None, - user: Optional[User] = None, - **kwargs): - if args is None: - args = [] - - return self._apscheduler.add_job(YtsmScheduler._run_job, trigger=trigger, args=[self, job_class, user, args], - **kwargs) - - -scheduler = YtsmScheduler() diff --git a/app/YtManagerApp/management/jobs/__init__.py b/app/YtManagerApp/scheduler/__init__.py similarity index 100% rename from app/YtManagerApp/management/jobs/__init__.py rename to app/YtManagerApp/scheduler/__init__.py diff --git a/app/YtManagerApp/scheduler/job.py b/app/YtManagerApp/scheduler/job.py new file mode 100644 index 0000000..34731be --- /dev/null +++ b/app/YtManagerApp/scheduler/job.py @@ -0,0 +1,118 @@ +import logging +from abc import abstractmethod +from typing import Optional + +from YtManagerApp.models import JOB_MESSAGE_LEVELS_MAP, JobMessage +from .progress_tracker import ProgressTracker + + +class Job(object): + name = 'GenericJob' + + """ + Base class for jobs running in the scheduler. + """ + + def __init__(self, job_execution, *_): + self.job_execution = job_execution + self.log = logging.getLogger(self.name) + self.__progress_tracker = ProgressTracker(listener=Job.__on_progress, + listener_args=[self]) + + @abstractmethod + def get_description(self) -> str: + """ + Gets a user friendly description of this job. + Should be overriden in job classes. + :return: + """ + return "Running job..." + + # + # progress tracking + # + + def __on_progress(self, percent: float, message: str): + self.usr_log(message, progress=percent) + + def set_total_steps(self, steps: float): + """ + Sets the total number of work steps this task has. This is used for tracking progress. + Should be overriden in job classes. + :return: + """ + self.__progress_tracker.total_steps = steps + + def progress_advance(self, steps: float = 1, progress_msg: str = ''): + """ + Advances a number of steps. + :param steps: Number of steps to advance + :param progress_msg: A message which will be passed to a listener + :return: + """ + self.__progress_tracker.advance(steps, progress_msg) + + def create_subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0): + """ + Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress. + :param steps: Number of steps the subtask is 'worth' + :param subtask_total_steps: Total number of steps for subtask + :param subtask_initial_steps: Initial steps for subtask + :return: ProgressTracker for subtask + """ + return self.__progress_tracker.subtask(steps, subtask_total_steps, subtask_initial_steps) + + # + # user log messages + # + + def usr_log(self, message, progress: Optional[float] = None, level: int = JOB_MESSAGE_LEVELS_MAP['normal'], + suppress_notification: bool = False): + """ + Creates a new log message which will be shown on the user interface. + Progress can also be updated using this method. + :param message: A message to be displayed to the user + :param progress: Progress percentage in [0,1] interval + :param level: Log level (normal, warning, error) + :param suppress_notification: If set to true, a notification will not displayed to the user, but it will + appear in the system logs. + :return: + """ + + message = JobMessage(job=self.job_execution, + progress=progress, + message=message, + level=level, + suppress_notification=suppress_notification) + message.save() + + def usr_warn(self, message, progress: Optional[float] = None, suppress_notification: bool = False): + """ + Creates a new warning message which will be shown on the user interface. + Progress can also be updated using this method. + :param message: A message to be displayed to the user + :param progress: Progress percentage in [0,1] interval + :param suppress_notification: If set to true, a notification will not displayed to the user, but it will + appear in the system logs. + :return: + """ + self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['warning'], suppress_notification) + + def usr_err(self, message, progress: Optional[float] = None, suppress_notification: bool = False): + """ + Creates a new error message which will be shown on the user interface. + Progress can also be updated using this method. + :param message: A message to be displayed to the user + :param progress: Progress percentage in [0,1] interval + :param suppress_notification: If set to true, a notification will not displayed to the user, but it will + appear in the system logs. + :return: + """ + self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['error'], suppress_notification) + + # + # main run method + # + @abstractmethod + def run(self): + pass diff --git a/app/YtManagerApp/scheduler/jobs/__init__.py b/app/YtManagerApp/scheduler/jobs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/YtManagerApp/management/jobs/delete_video.py b/app/YtManagerApp/scheduler/jobs/delete_video_job.py similarity index 88% rename from app/YtManagerApp/management/jobs/delete_video.py rename to app/YtManagerApp/scheduler/jobs/delete_video_job.py index 977935a..39141f7 100644 --- a/app/YtManagerApp/management/jobs/delete_video.py +++ b/app/YtManagerApp/scheduler/jobs/delete_video_job.py @@ -1,7 +1,7 @@ import os from YtManagerApp.models import Video -from YtManagerApp.scheduler import Job, scheduler +from YtManagerApp.scheduler.job import Job class DeleteVideoJob(Job): @@ -43,4 +43,5 @@ class DeleteVideoJob(Job): :param video: :return: """ - scheduler.add_job(DeleteVideoJob, args=[video]) + from YtManagerApp.services import Services + Services.scheduler.add_job(DeleteVideoJob, args=[video]) diff --git a/app/YtManagerApp/management/jobs/download_video.py b/app/YtManagerApp/scheduler/jobs/download_video_job.py similarity index 77% rename from app/YtManagerApp/management/jobs/download_video.py rename to app/YtManagerApp/scheduler/jobs/download_video_job.py index 1d8b838..14984c6 100644 --- a/app/YtManagerApp/management/jobs/download_video.py +++ b/app/YtManagerApp/scheduler/jobs/download_video_job.py @@ -6,7 +6,7 @@ from threading import Lock import youtube_dl from YtManagerApp.models import Video -from YtManagerApp.scheduler import Job, scheduler +from YtManagerApp.scheduler.job import Job class DownloadVideoJob(Job): @@ -15,14 +15,14 @@ class DownloadVideoJob(Job): def __init__(self, job_execution, video: Video, attempt: int = 1): super().__init__(job_execution) - self.__video = video - self.__attempt = attempt - self.__log_youtube_dl = self.log.getChild('youtube_dl') + self._video = video + self._attempt = attempt + self._log_youtube_dl = self.log.getChild('youtube_dl') def get_description(self): - ret = "Downloading video " + self.__video.name - if self.__attempt > 1: - ret += f" (attempt {self.__attempt})" + ret = "Downloading video " + self._video.name + if self._attempt > 1: + ret += f" (attempt {self._attempt})" return ret def run(self): @@ -32,29 +32,30 @@ class DownloadVideoJob(Job): self.__lock.acquire() try: - user = self.__video.subscription.user + user = self._video.subscription.user max_attempts = user.preferences['max_download_attempts'] - youtube_dl_params, output_path = self.__build_youtube_dl_params(self.__video) + youtube_dl_params, output_path = self.__build_youtube_dl_params(self._video) with youtube_dl.YoutubeDL(youtube_dl_params) as yt: - ret = yt.download(["https://www.youtube.com/watch?v=" + self.__video.video_id]) + ret = yt.download(["https://www.youtube.com/watch?v=" + self._video.video_id]) self.log.info('Download finished with code %d', ret) if ret == 0: - self.__video.downloaded_path = output_path - self.__video.save() - self.log.info('Video %d [%s %s] downloaded successfully!', self.__video.id, self.__video.video_id, self.__video.name) + self._video.downloaded_path = output_path + self._video.save() + self.log.info('Video %d [%s %s] downloaded successfully!', self._video.id, self._video.video_id, + self._video.name) - elif self.__attempt <= max_attempts: - self.log.warning('Re-enqueueing video (attempt %d/%d)', self.__attempt, max_attempts) - DownloadVideoJob.schedule(self.__video, self.__attempt + 1) + elif self._attempt <= max_attempts: + self.log.warning('Re-enqueueing video (attempt %d/%d)', self._attempt, max_attempts) + DownloadVideoJob.schedule(self._video, self._attempt + 1) else: - self.log.error('Multiple attempts to download video %d [%s %s] failed!', self.__video.id, self.__video.video_id, - self.__video.name) - self.__video.downloaded_path = '' - self.__video.save() + self.log.error('Multiple attempts to download video %d [%s %s] failed!', self._video.id, + self._video.video_id, self._video.name) + self._video.downloaded_path = '' + self._video.save() finally: self.__lock.release() @@ -74,7 +75,7 @@ class DownloadVideoJob(Job): output_path = os.path.normpath(output_path) youtube_dl_params = { - 'logger': self.__log_youtube_dl, + 'logger': self._log_youtube_dl, 'format': user.preferences['download_format'], 'outtmpl': output_path, 'writethumbnail': True, @@ -131,4 +132,5 @@ class DownloadVideoJob(Job): :param attempt: :return: """ - scheduler.add_job(DownloadVideoJob, args=[video, attempt]) + from YtManagerApp.services import Services + Services.scheduler.add_job(DownloadVideoJob, args=[video, attempt]) diff --git a/app/YtManagerApp/management/jobs/synchronize.py b/app/YtManagerApp/scheduler/jobs/synchronize_job.py similarity index 92% rename from app/YtManagerApp/management/jobs/synchronize.py rename to app/YtManagerApp/scheduler/jobs/synchronize_job.py index 71cc34e..d1935e7 100644 --- a/app/YtManagerApp/management/jobs/synchronize.py +++ b/app/YtManagerApp/scheduler/jobs/synchronize_job.py @@ -6,10 +6,10 @@ from apscheduler.triggers.cron import CronTrigger from django.db.models import Max from django.conf import settings -from YtManagerApp.management.appconfig import appconfig from YtManagerApp.management.downloader import fetch_thumbnail, downloader_process_subscription from YtManagerApp.models import * -from YtManagerApp.scheduler import scheduler, Job +from YtManagerApp.scheduler.job import Job +from YtManagerApp.services import Services from YtManagerApp.utils import youtube from external.pytaw.pytaw.utils import iterate_chunks @@ -82,7 +82,6 @@ class SynchronizeJob(Job): if video.video_id in video_stats: self.update_video_stats(video, video_stats[video.video_id]) - # Start downloading videos for sub in work_subs: downloader_process_subscription(sub) @@ -167,19 +166,19 @@ class SynchronizeJob(Job): @staticmethod def schedule_global_job(): - trigger = CronTrigger.from_crontab(appconfig.sync_schedule) + trigger = CronTrigger.from_crontab(Services.appConfig.sync_schedule) if SynchronizeJob.__global_sync_job is None: - trigger = CronTrigger.from_crontab(appconfig.sync_schedule) - SynchronizeJob.__global_sync_job = scheduler.add_job(SynchronizeJob, trigger, max_instances=1, coalesce=True) + trigger = CronTrigger.from_crontab(Services.appConfig.sync_schedule) + SynchronizeJob.__global_sync_job = Services.scheduler.add_job(SynchronizeJob, trigger, max_instances=1, coalesce=True) else: SynchronizeJob.__global_sync_job.reschedule(trigger, max_instances=1, coalesce=True) @staticmethod def schedule_now(): - scheduler.add_job(SynchronizeJob, max_instances=1, coalesce=True) + Services.scheduler.add_job(SynchronizeJob, max_instances=1, coalesce=True) @staticmethod def schedule_now_for_subscription(subscription): - scheduler.add_job(SynchronizeJob, user=subscription.user, args=[subscription]) + Services.scheduler.add_job(SynchronizeJob, user=subscription.user, args=[subscription]) diff --git a/app/YtManagerApp/scheduler/progress_tracker.py b/app/YtManagerApp/scheduler/progress_tracker.py new file mode 100644 index 0000000..b551b73 --- /dev/null +++ b/app/YtManagerApp/scheduler/progress_tracker.py @@ -0,0 +1,83 @@ +from typing import Callable, List, Any, Optional + + +class ProgressTracker(object): + """ + Class which helps keep track of complex operation progress. + """ + + def __init__(self, total_steps: float = 100, initial_steps: float = 0, + listener: Callable[[float, str], None] = None, + listener_args: List[Any] = None, + parent: Optional["ProgressTracker"] = None): + """ + Constructor + :param total_steps: Total number of steps required by this operation + :param initial_steps: Starting steps + :param parent: Parent progress tracker + :param listener: Callable which is called when any progress happens + """ + + self.total_steps = total_steps + self.steps = initial_steps + + self.__subtask: ProgressTracker = None + self.__subtask_steps = 0 + + self.__parent = parent + self.__listener = listener + self.__listener_args = listener_args or [] + + def __on_progress(self, progress_msg): + if self.__listener is not None: + self.__listener(*self.__listener_args, self.compute_progress(), progress_msg) + + if self.__parent is not None: + self.__parent.__on_progress(progress_msg) + + def advance(self, steps: float = 1, progress_msg: str = ''): + """ + Advances a number of steps. + :param steps: Number of steps to advance + :param progress_msg: A message which will be passed to a listener + :return: + """ + + # We can assume previous subtask is now completed + if self.__subtask is not None: + self.steps += self.__subtask_steps + self.__subtask = None + + self.steps += steps + self.__on_progress(progress_msg) + + def subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0): + """ + Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress. + :param steps: Number of steps the subtask is 'worth' + :param subtask_total_steps: Total number of steps for subtask + :param subtask_initial_steps: Initial steps for subtask + :return: ProgressTracker for subtask + """ + + # We can assume previous subtask is now completed + if self.__subtask is not None: + self.steps += self.__subtask_steps + + self.__subtask = ProgressTracker(total_steps=subtask_total_steps, + initial_steps=subtask_initial_steps, + parent=self) + self.__subtask_steps = steps + + return self.__subtask + + def compute_progress(self): + """ + Calculates final progress value in percent. + :return: value in [0,1] interval representing progress + """ + base = float(self.steps) / self.total_steps + if self.__subtask is not None: + base += self.__subtask.compute_progress() * self.__subtask_steps / self.total_steps + + return min(base, 1.0) diff --git a/app/YtManagerApp/scheduler/scheduler.py b/app/YtManagerApp/scheduler/scheduler.py new file mode 100644 index 0000000..15bb40f --- /dev/null +++ b/app/YtManagerApp/scheduler/scheduler.py @@ -0,0 +1,75 @@ +import datetime +import logging +import traceback +from typing import Type, Union, Optional + +import pytz +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.base import BaseTrigger +from django.contrib.auth.models import User + +from YtManagerApp.management.appconfig import AppConfig +from YtManagerApp.models import JobExecution, JOB_STATES_MAP +from YtManagerApp.scheduler.job import Job + + +class YtsmScheduler(object): + + def __init__(self, app_config: AppConfig): + self._ap_scheduler = BackgroundScheduler() + self._app_config = app_config + + def initialize(self): + # set state of existing jobs as "interrupted" + JobExecution.objects\ + .filter(status=JOB_STATES_MAP['running'])\ + .update(status=JOB_STATES_MAP['interrupted']) + + self._configure_scheduler() + self._ap_scheduler.start() + + def _configure_scheduler(self): + logger = logging.getLogger('scheduler') + executors = { + 'default': { + 'type': 'threadpool', + 'max_workers': self._app_config.concurrency + } + } + job_defaults = { + 'misfire_grace_time': 60 * 60 * 24 * 365 # 1 year + } + self._ap_scheduler.configure(logger=logger, executors=executors, job_defaults=job_defaults) + + def _run_job(self, job_class: Type[Job], user: Optional[User], args: Union[tuple, list]): + + job_execution = JobExecution(user=user, status=JOB_STATES_MAP['running']) + job_execution.save() + job_instance = job_class(job_execution, *args) + + # update description + job_execution.description = job_instance.get_description() + job_execution.save() + + try: + job_instance.run() + job_execution.status = JOB_STATES_MAP['finished'] + + except Exception as ex: + job_instance.log.critical("Job failed with exception: %s", traceback.format_exc()) + job_instance.usr_err(job_instance.name + " operation failed: " + str(ex)) + job_execution.status = JOB_STATES_MAP['failed'] + + finally: + job_execution.end_date = datetime.datetime.now(tz=pytz.UTC) + job_execution.save() + + def add_job(self, job_class: Type[Job], trigger: Union[str, BaseTrigger] = None, + args: Union[list, tuple] = None, + user: Optional[User] = None, + **kwargs): + if args is None: + args = [] + + return self._ap_scheduler.add_job(YtsmScheduler._run_job, trigger=trigger, args=[self, job_class, user, args], + **kwargs) diff --git a/app/YtManagerApp/services.py b/app/YtManagerApp/services.py new file mode 100644 index 0000000..3057ae5 --- /dev/null +++ b/app/YtManagerApp/services.py @@ -0,0 +1,13 @@ +import dependency_injector.containers as containers +import dependency_injector.providers as providers +from dynamic_preferences.registries import global_preferences_registry +from YtManagerApp.management.appconfig import AppConfig +from YtManagerApp.management.youtube_dl_manager import YoutubeDlManager +from YtManagerApp.scheduler.scheduler import YtsmScheduler + + +class Services(containers.DeclarativeContainer): + globalPreferencesRegistry = providers.Object(global_preferences_registry.manager()) + appConfig = providers.Singleton(AppConfig, globalPreferencesRegistry) + scheduler = providers.Singleton(YtsmScheduler, appConfig) + youtubeDLManager = providers.Singleton(YoutubeDlManager) diff --git a/app/YtManagerApp/utils/youtube.py b/app/YtManagerApp/utils/youtube.py index f8df22e..d30097d 100644 --- a/app/YtManagerApp/utils/youtube.py +++ b/app/YtManagerApp/utils/youtube.py @@ -46,4 +46,4 @@ def best_thumbnail(resource: Resource) -> Optional[Thumbnail]: if thumbs is None or len(thumbs) <= 0: return None - return max(thumbs, key=lambda t: t.width * t.height) \ No newline at end of file + return max(thumbs, key=lambda t: t.width * t.height) diff --git a/app/YtManagerApp/views/actions.py b/app/YtManagerApp/views/actions.py index bbab4ca..35e0e84 100644 --- a/app/YtManagerApp/views/actions.py +++ b/app/YtManagerApp/views/actions.py @@ -2,8 +2,8 @@ from django.contrib.auth.mixins import LoginRequiredMixin from django.http import JsonResponse from django.views.generic import View -from YtManagerApp.management.jobs.synchronize import SynchronizeJob from YtManagerApp.models import Video +from YtManagerApp.scheduler.jobs.synchronize_job import SynchronizeJob class SyncNowView(LoginRequiredMixin, View): diff --git a/app/YtManagerApp/views/auth.py b/app/YtManagerApp/views/auth.py index cbea096..4f07836 100644 --- a/app/YtManagerApp/views/auth.py +++ b/app/YtManagerApp/views/auth.py @@ -6,7 +6,7 @@ from django.http import HttpResponseForbidden from django.urls import reverse_lazy from django.views.generic import FormView, TemplateView -from YtManagerApp.management.appconfig import appconfig +from YtManagerApp.services import Services from YtManagerApp.views.forms.auth import ExtendedAuthenticationForm, ExtendedUserCreationForm @@ -36,7 +36,7 @@ class RegisterView(FormView): return context def post(self, request, *args, **kwargs): - if not appconfig.allow_registrations: + if not Services.appConfig.allow_registrations: return HttpResponseForbidden("Registrations are disabled!") return super().post(request, *args, **kwargs) diff --git a/app/YtManagerApp/views/first_time.py b/app/YtManagerApp/views/first_time.py index dc65c2e..fe3ef43 100644 --- a/app/YtManagerApp/views/first_time.py +++ b/app/YtManagerApp/views/first_time.py @@ -8,9 +8,8 @@ from django.shortcuts import redirect from django.urls import reverse_lazy from django.views.generic import FormView -from YtManagerApp.management.appconfig import appconfig -from YtManagerApp.management.jobs.synchronize import SynchronizeJob -from YtManagerApp.scheduler import scheduler +from YtManagerApp.scheduler.jobs.synchronize_job import SynchronizeJob +from YtManagerApp.services import Services from YtManagerApp.views.forms.first_time import WelcomeForm, ApiKeyForm, PickAdminUserForm, ServerConfigForm, DoneForm, \ UserCreationForm, LoginForm @@ -25,7 +24,7 @@ class WizardStepMixin: def get(self, request, *args, **kwargs): # Prevent access if application is already initialized - if appconfig.initialized: + if Services.appConfig.initialized: logger.debug(f"Attempted to access {request.path}, but first time setup already run. Redirected to home " f"page.") return redirect('home') @@ -33,7 +32,7 @@ class WizardStepMixin: return super().get(request, *args, **kwargs) def post(self, request, *args, **kwargs): - if appconfig.initialized: + if Services.appConfig.initialized: logger.debug(f"Attempted to post {request.path}, but first time setup already run.") return HttpResponseForbidden() return super().post(request, *args, **kwargs) @@ -66,14 +65,14 @@ class Step1ApiKeyView(WizardStepMixin, FormView): def get_initial(self): initial = super().get_initial() - initial['api_key'] = appconfig.youtube_api_key + initial['api_key'] = Services.appConfig.youtube_api_key return initial def form_valid(self, form): key = form.cleaned_data['api_key'] # TODO: validate key if key is not None and len(key) > 0: - appconfig.youtube_api_key = key + Services.appConfig.youtube_api_key = key return super().form_valid(form) @@ -150,8 +149,8 @@ class Step3ConfigureView(WizardStepMixin, FormView): def get_initial(self): initial = super().get_initial() - initial['allow_registrations'] = appconfig.allow_registrations - initial['sync_schedule'] = appconfig.sync_schedule + initial['allow_registrations'] = Services.appConfig.allow_registrations + initial['sync_schedule'] = Services.appConfig.sync_schedule initial['auto_download'] = self.request.user.preferences['auto_download'] initial['download_location'] = self.request.user.preferences['download_path'] return initial @@ -159,11 +158,11 @@ class Step3ConfigureView(WizardStepMixin, FormView): def form_valid(self, form): allow_registrations = form.cleaned_data['allow_registrations'] if allow_registrations is not None: - appconfig.allow_registrations = allow_registrations + Services.appConfig.allow_registrations = allow_registrations sync_schedule = form.cleaned_data['sync_schedule'] if sync_schedule is not None and len(sync_schedule) > 0: - appconfig.sync_schedule = sync_schedule + Services.appConfig.sync_schedule = sync_schedule auto_download = form.cleaned_data['auto_download'] if auto_download is not None: @@ -174,10 +173,10 @@ class Step3ConfigureView(WizardStepMixin, FormView): self.request.user.preferences['download_path'] = download_location # Set initialized to true - appconfig.initialized = True + Services.appConfig.initialized = True # Start scheduler if not started - scheduler.initialize() + Services.scheduler.initialize() SynchronizeJob.schedule_global_job() return super().form_valid(form) diff --git a/app/YtManagerApp/views/forms/settings.py b/app/YtManagerApp/views/forms/settings.py index df0ef69..f9b6f58 100644 --- a/app/YtManagerApp/views/forms/settings.py +++ b/app/YtManagerApp/views/forms/settings.py @@ -6,8 +6,8 @@ from YtManagerApp.dynamic_preferences_registry import MarkDeletedAsWatched, Auto DownloadGlobalLimit, DownloadGlobalSizeLimit, DownloadSubscriptionLimit, DownloadMaxAttempts, DownloadOrder, \ DownloadPath, DownloadFilePattern, DownloadFormat, DownloadSubtitles, DownloadAutogeneratedSubtitles, \ DownloadAllSubtitles, DownloadSubtitlesLangs, DownloadSubtitlesFormat -from YtManagerApp.management.appconfig import appconfig from YtManagerApp.models import VIDEO_ORDER_CHOICES +from YtManagerApp.services import Services class SettingsForm(forms.Form): @@ -234,25 +234,25 @@ class AdminSettingsForm(forms.Form): @staticmethod def get_initials(): return { - 'api_key': appconfig.youtube_api_key, - 'allow_registrations': appconfig.allow_registrations, - 'sync_schedule': appconfig.sync_schedule, - 'scheduler_concurrency': appconfig.concurrency, + 'api_key': Services.appConfig.youtube_api_key, + 'allow_registrations': Services.appConfig.allow_registrations, + 'sync_schedule': Services.appConfig.sync_schedule, + 'scheduler_concurrency': Services.appConfig.concurrency, } def save(self): api_key = self.cleaned_data['api_key'] if api_key is not None and len(api_key) > 0: - appconfig.youtube_api_key = api_key + Services.appConfig.youtube_api_key = api_key allow_registrations = self.cleaned_data['allow_registrations'] if allow_registrations is not None: - appconfig.allow_registrations = allow_registrations + Services.appConfig.allow_registrations = allow_registrations sync_schedule = self.cleaned_data['sync_schedule'] if sync_schedule is not None and len(sync_schedule) > 0: - appconfig.sync_schedule = sync_schedule + Services.appConfig.sync_schedule = sync_schedule concurrency = self.cleaned_data['scheduler_concurrency'] if concurrency is not None: - appconfig.concurrency = concurrency + Services.appConfig.concurrency = concurrency diff --git a/app/YtManagerApp/views/index.py b/app/YtManagerApp/views/index.py index 57864f2..dd49d79 100644 --- a/app/YtManagerApp/views/index.py +++ b/app/YtManagerApp/views/index.py @@ -11,8 +11,8 @@ from django.views.generic.edit import FormMixin from django.conf import settings from django.core.paginator import Paginator from YtManagerApp.management.videos import get_videos -from YtManagerApp.management.appconfig import appconfig from YtManagerApp.models import Subscription, SubscriptionFolder, VIDEO_ORDER_CHOICES, VIDEO_ORDER_MAPPING +from YtManagerApp.services import Services from YtManagerApp.utils import youtube, subscription_file_parser from YtManagerApp.views.controls.modal import ModalMixin @@ -111,7 +111,7 @@ def __tree_sub_id(sub_id): def index(request: HttpRequest): - if not appconfig.initialized: + if not Services.appConfig.initialized: return redirect('first_time_0') context = { diff --git a/app/YtManagerApp/views/settings.py b/app/YtManagerApp/views/settings.py index 36ecf93..9d621ac 100644 --- a/app/YtManagerApp/views/settings.py +++ b/app/YtManagerApp/views/settings.py @@ -3,7 +3,7 @@ from django.http import HttpResponseForbidden from django.urls import reverse_lazy from django.views.generic import FormView -from YtManagerApp.management.jobs.synchronize import SynchronizeJob +from YtManagerApp.scheduler.jobs.synchronize_job import SynchronizeJob from YtManagerApp.views.forms.settings import SettingsForm, AdminSettingsForm diff --git a/requirements.txt b/requirements.txt index d120bba..fd1526a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,4 +12,5 @@ google_auth_oauthlib oauth2client psycopg2-binary python-dateutil -Pillow \ No newline at end of file +Pillow +dependency-injector \ No newline at end of file