diff --git a/app/YtManagerApp/appmain.py b/app/YtManagerApp/appmain.py index 8054260..7eba9b4 100644 --- a/app/YtManagerApp/appmain.py +++ b/app/YtManagerApp/appmain.py @@ -6,8 +6,8 @@ import sys from django.conf import settings as dj_settings from .management.appconfig import appconfig -from .management.jobs.synchronize import schedule_synchronize_global -from .scheduler import initialize_scheduler +from .management.jobs.synchronize import SynchronizeJob +from .scheduler import scheduler from django.db.utils import OperationalError @@ -15,8 +15,6 @@ def __initialize_logger(): log_dir = os.path.join(dj_settings.DATA_DIR, 'logs') os.makedirs(log_dir, exist_ok=True) - handlers = [] - file_handler = logging.handlers.RotatingFileHandler( os.path.join(log_dir, "log.log"), maxBytes=1024 * 1024, @@ -34,16 +32,16 @@ def __initialize_logger(): logging.root.addHandler(console_handler) - def main(): __initialize_logger() try: if appconfig.initialized: - initialize_scheduler() - schedule_synchronize_global() + scheduler.initialize() + SynchronizeJob.schedule_global_job() except OperationalError: - # Settings table is not created when running migrate or makemigrations, so just don't do anything in this case. + # Settings table is not created when running migrate or makemigrations; + # Just don't do anything in this case. pass logging.info('Initialization complete.') diff --git a/app/YtManagerApp/management/downloader.py b/app/YtManagerApp/management/downloader.py index 4259630..41ea0b6 100644 --- a/app/YtManagerApp/management/downloader.py +++ b/app/YtManagerApp/management/downloader.py @@ -1,4 +1,4 @@ -from YtManagerApp.management.jobs.download_video import schedule_download_video +from YtManagerApp.management.jobs.download_video import DownloadVideoJob from YtManagerApp.models import Video, Subscription, VIDEO_ORDER_MAPPING from YtManagerApp.utils import first_non_null from django.conf import settings as srv_settings @@ -51,7 +51,7 @@ def downloader_process_subscription(sub: Subscription): # enqueue download for video in videos_to_download: log.info('Enqueuing video %d [%s %s] index=%d', video.id, video.video_id, video.name, video.playlist_index) - schedule_download_video(video) + DownloadVideoJob.schedule(video) log.info('Finished processing subscription %d [%s %s]', sub.id, sub.playlist_id, sub.id) diff --git a/app/YtManagerApp/management/jobs/delete_video.py b/app/YtManagerApp/management/jobs/delete_video.py index 7320ff6..977935a 100644 --- a/app/YtManagerApp/management/jobs/delete_video.py +++ b/app/YtManagerApp/management/jobs/delete_video.py @@ -1,39 +1,46 @@ -import logging import os -from YtManagerApp import scheduler from YtManagerApp.models import Video - -log = logging.getLogger('video_downloader') +from YtManagerApp.scheduler import Job, scheduler -def delete_video(video: Video): - log.info('Deleting video %d [%s %s]', video.id, video.video_id, video.name) - count = 0 +class DeleteVideoJob(Job): + name = "DeleteVideoJob" - try: - for file in video.get_files(): - log.info("Deleting file %s", file) - count += 1 - try: - os.unlink(file) - except OSError as e: - log.error("Failed to delete file %s: Error: %s", file, e) + def __init__(self, job_execution, video: Video): + super().__init__(job_execution) + self._video = video - except OSError as e: - log.error("Failed to delete video %d [%s %s]. Error: %s", video.id, video.video_id, video.name, e) + def get_description(self): + return f"Deleting video {self._video}" - video.downloaded_path = None - video.save() + def run(self): + count = 0 - log.info('Deleted video %d successfully! (%d files) [%s %s]', video.id, count, video.video_id, video.name) + try: + for file in self._video.get_files(): + self.log.info("Deleting file %s", file) + count += 1 + try: + os.unlink(file) + except OSError as e: + self.log.error("Failed to delete file %s: Error: %s", file, e) + except OSError as e: + self.log.error("Failed to delete video %d [%s %s]. Error: %s", self._video.id, + self._video.video_id, self._video.name, e) -def schedule_delete_video(video: Video): - """ - Schedules a download video job to run immediately. - :param video: - :return: - """ - job = scheduler.scheduler.add_job(delete_video, args=[video]) - log.info('Scheduled delete video job video=(%s), job=%s', video, job.id) + self._video.downloaded_path = None + self._video.save() + + self.log.info('Deleted video %d successfully! (%d files) [%s %s]', self._video.id, count, + self._video.video_id, self._video.name) + + @staticmethod + def schedule(video: Video): + """ + Schedules a delete video job to run immediately. + :param video: + :return: + """ + scheduler.add_job(DeleteVideoJob, args=[video]) diff --git a/app/YtManagerApp/management/jobs/download_video.py b/app/YtManagerApp/management/jobs/download_video.py index 96f9cd3..d507c09 100644 --- a/app/YtManagerApp/management/jobs/download_video.py +++ b/app/YtManagerApp/management/jobs/download_video.py @@ -1,4 +1,3 @@ -import logging import os import re from string import Template @@ -6,128 +5,129 @@ from threading import Lock import youtube_dl -from YtManagerApp import scheduler from YtManagerApp.models import Video - -log = logging.getLogger('video_downloader') -log_youtube_dl = log.getChild('youtube_dl') - -_lock = Lock() +from YtManagerApp.scheduler import Job, scheduler -def __get_valid_path(path): - """ - Normalizes string, converts to lowercase, removes non-alpha characters, - and converts spaces to hyphens. - """ - import unicodedata - value = unicodedata.normalize('NFKD', path).encode('ascii', 'ignore').decode('ascii') - value = re.sub('[:"*]', '', value).strip() - value = re.sub('[?<>|]', '#', value) - return value +class DownloadVideoJob(Job): + name = "DownloadVideoJob" + __lock = Lock() + def __init__(self, job_execution, video: Video, attempt: int = 1): + super().__init__(job_execution) + self.__video = video + self.__attempt = attempt + self.__log_youtube_dl = self.log.getChild('youtube_dl') -def __build_template_dict(video: Video): - return { - 'channel': video.subscription.channel_name, - 'channel_id': video.subscription.channel_id, - 'playlist': video.subscription.name, - 'playlist_id': video.subscription.playlist_id, - 'playlist_index': "{:03d}".format(1 + video.playlist_index), - 'title': video.name, - 'id': video.video_id, - } + def get_description(self): + ret = "Downloading video " + self.__video.name + if self.__attempt > 1: + ret += f" (attempt {self.__attempt})" + return ret + def run(self): + # Issue: if multiple videos are downloaded at the same time, a race condition appears in the mkdirs() call that + # youtube-dl makes, which causes it to fail with the error 'Cannot create folder - file already exists'. + # For now, allow a single download instance. + self.__lock.acquire() -def __build_youtube_dl_params(video: Video): + try: + user = self.__video.subscription.user + max_attempts = user.preferences['max_download_attempts'] - sub = video.subscription - user = sub.user + youtube_dl_params, output_path = self.__build_youtube_dl_params(self.__video) + with youtube_dl.YoutubeDL(youtube_dl_params) as yt: + ret = yt.download(["https://www.youtube.com/watch?v=" + self.__video.video_id]) - # resolve path - download_path = user.preferences['download_path'] + self.log.info('Download finished with code %d', ret) - template_dict = __build_template_dict(video) - output_pattern = Template(user.preferences['download_file_pattern']).safe_substitute(template_dict) + if ret == 0: + self.__video.downloaded_path = output_path + self.__video.save() + self.log.info('Video %d [%s %s] downloaded successfully!', self.__video.id, self.__video.video_id, self.__video.name) - output_path = os.path.join(download_path, output_pattern) - output_path = os.path.normpath(output_path) + elif self.__attempt <= max_attempts: + self.log.warning('Re-enqueueing video (attempt %d/%d)', self.__attempt, max_attempts) + DownloadVideoJob.schedule(self.__video, self.__attempt + 1) - youtube_dl_params = { - 'logger': log_youtube_dl, - 'format': user.preferences['download_format'], - 'outtmpl': output_path, - 'writethumbnail': True, - 'writedescription': True, - 'writesubtitles': user.preferences['download_subtitles'], - 'writeautomaticsub': user.preferences['download_autogenerated_subtitles'], - 'allsubtitles': user.preferences['download_subtitles_all'], - 'postprocessors': [ - { - 'key': 'FFmpegMetadata' - }, - ] - } + else: + self.log.error('Multiple attempts to download video %d [%s %s] failed!', self.__video.id, self.__video.video_id, + self.__video.name) + self.__video.downloaded_path = '' + self.__video.save() - sub_langs = user.preferences['download_subtitles_langs'].split(',') - sub_langs = [i.strip() for i in sub_langs] - if len(sub_langs) > 0: - youtube_dl_params['subtitleslangs'] = sub_langs + finally: + self.__lock.release() - sub_format = user.preferences['download_subtitles_format'] - if len(sub_format) > 0: - youtube_dl_params['subtitlesformat'] = sub_format + def __build_youtube_dl_params(self, video: Video): - return youtube_dl_params, output_path + sub = video.subscription + user = sub.user + # resolve path + download_path = user.preferences['download_path'] -def download_video(video: Video, attempt: int = 1): + template_dict = self.__build_template_dict(video) + output_pattern = Template(user.preferences['download_file_pattern']).safe_substitute(template_dict) - user = video.subscription.user + output_path = os.path.join(download_path, output_pattern) + output_path = os.path.normpath(output_path) - log.info('Downloading video %d [%s %s]', video.id, video.video_id, video.name) + youtube_dl_params = { + 'logger': self.__log_youtube_dl, + 'format': user.preferences['download_format'], + 'outtmpl': output_path, + 'writethumbnail': True, + 'writedescription': True, + 'writesubtitles': user.preferences['download_subtitles'], + 'writeautomaticsub': user.preferences['download_autogenerated_subtitles'], + 'allsubtitles': user.preferences['download_subtitles_all'], + 'postprocessors': [ + { + 'key': 'FFmpegMetadata' + }, + ] + } - # Issue: if multiple videos are downloaded at the same time, a race condition appears in the mkdirs() call that - # youtube-dl makes, which causes it to fail with the error 'Cannot create folder - file already exists'. - # For now, allow a single download instance. - _lock.acquire() + sub_langs = user.preferences['download_subtitles_langs'].split(',') + sub_langs = [i.strip() for i in sub_langs] + if len(sub_langs) > 0: + youtube_dl_params['subtitleslangs'] = sub_langs - try: - max_attempts = user.preferences['max_download_attempts'] + sub_format = user.preferences['download_subtitles_format'] + if len(sub_format) > 0: + youtube_dl_params['subtitlesformat'] = sub_format - youtube_dl_params, output_path = __build_youtube_dl_params(video) - with youtube_dl.YoutubeDL(youtube_dl_params) as yt: - ret = yt.download(["https://www.youtube.com/watch?v=" + video.video_id]) + return youtube_dl_params, output_path - log.info('Download finished with code %d', ret) + def __build_template_dict(self, video: Video): + return { + 'channel': video.subscription.channel_name, + 'channel_id': video.subscription.channel_id, + 'playlist': video.subscription.name, + 'playlist_id': video.subscription.playlist_id, + 'playlist_index': "{:03d}".format(1 + video.playlist_index), + 'title': video.name, + 'id': video.video_id, + } - if ret == 0: - video.downloaded_path = output_path - video.save() - log.info('Video %d [%s %s] downloaded successfully!', video.id, video.video_id, video.name) + def __get_valid_path(self, path): + """ + Normalizes string, converts to lowercase, removes non-alpha characters, + and converts spaces to hyphens. + """ + import unicodedata + value = unicodedata.normalize('NFKD', path).encode('ascii', 'ignore').decode('ascii') + value = re.sub('[:"*]', '', value).strip() + value = re.sub('[?<>|]', '#', value) + return value - elif attempt <= max_attempts: - log.warning('Re-enqueueing video (attempt %d/%d)', attempt, max_attempts) - __schedule_download_video(video, attempt + 1) - - else: - log.error('Multiple attempts to download video %d [%s %s] failed!', video.id, video.video_id, video.name) - video.downloaded_path = '' - video.save() - - finally: - _lock.release() - - -def __schedule_download_video(video: Video, attempt=1): - job = scheduler.scheduler.add_job(download_video, args=[video, attempt]) - log.info('Scheduled download video job video=(%s), attempt=%d, job=%s', video, attempt, job.id) - - -def schedule_download_video(video: Video): - """ - Schedules a download video job to run immediately. - :param video: - :return: - """ - __schedule_download_video(video) + @staticmethod + def schedule(video: Video, attempt: int = 1): + """ + Schedules to download video immediately + :param video: + :param attempt: + :return: + """ + scheduler.add_job(DownloadVideoJob, args=[video, attempt]) diff --git a/app/YtManagerApp/management/jobs/synchronize.py b/app/YtManagerApp/management/jobs/synchronize.py index 303601e..a550c6c 100644 --- a/app/YtManagerApp/management/jobs/synchronize.py +++ b/app/YtManagerApp/management/jobs/synchronize.py @@ -1,211 +1,177 @@ import errno -import mimetypes +import itertools from threading import Lock from apscheduler.triggers.cron import CronTrigger -from YtManagerApp.management.notification_manager import OPERATION_ID_SYNCHRONIZE -from YtManagerApp.scheduler import scheduler from YtManagerApp.management.appconfig import appconfig -from YtManagerApp.management.downloader import fetch_thumbnail, downloader_process_all, downloader_process_subscription +from YtManagerApp.management.downloader import fetch_thumbnail, downloader_process_subscription from YtManagerApp.models import * +from YtManagerApp.scheduler import scheduler, Job from YtManagerApp.utils import youtube - -from YtManagerApp.management import notification_manager - -log = logging.getLogger('sync') -__lock = Lock() +from external.pytaw.pytaw.utils import iterate_chunks _ENABLE_UPDATE_STATS = True -def __check_new_videos_sub(subscription: Subscription, yt_api: youtube.YoutubeAPI, progress_callback=None): - # Get list of videos - for item in yt_api.playlist_items(subscription.playlist_id): - results = Video.objects.filter(video_id=item.resource_video_id, subscription=subscription) - if len(results) == 0: - log.info('New video for subscription %s: %s %s"', subscription, item.resource_video_id, item.title) - Video.create(item, subscription) +class SynchronizeJob(Job): + name = "SynchronizeJob" + __lock = Lock() + running = False + __global_sync_job = None - if _ENABLE_UPDATE_STATS: - all_vids = Video.objects.filter(subscription=subscription) - all_vids_ids = [video.video_id for video in all_vids] - all_vids_dict = {v.video_id: v for v in all_vids} + def __init__(self, job_execution, subscription: Optional[Subscription] = None): + super().__init__(job_execution) + self.__subscription = subscription + self.__api = youtube.YoutubeAPI.build_public() + self.__new_vids = [] - for yt_video in yt_api.videos(all_vids_ids, part='id,statistics'): - video = all_vids_dict.get(yt_video.id) + def get_description(self): + if self.__subscription is not None: + return "Running synchronization for subscription " + self.__subscription.name + return "Running synchronization..." - if yt_video.n_likes is not None \ - and yt_video.n_dislikes is not None \ - and yt_video.n_likes + yt_video.n_dislikes > 0: - video.rating = yt_video.n_likes / (yt_video.n_likes + yt_video.n_dislikes) + def get_subscription_list(self): + if self.__subscription is not None: + return [self.__subscription] + return Subscription.objects.all() - video.views = yt_video.n_views - video.save() + def get_videos_list(self, subs): + return Video.objects.filter(subscription__in=subs) - -def __detect_deleted(subscription: Subscription): - - user = subscription.user - - for video in Video.objects.filter(subscription=subscription, downloaded_path__isnull=False): - found_video = False - files = [] + def run(self): + self.__lock.acquire(blocking=True) + SynchronizeJob.running = True try: - files = list(video.get_files()) - except OSError as e: - if e.errno != errno.ENOENT: - log.error("Could not access path %s. Error: %s", video.downloaded_path, e) - return + self.log.info(self.get_description()) - # Try to find a valid video file - for file in files: - mime, _ = mimetypes.guess_type(file) - if mime is not None and mime.startswith("video"): - found_video = True + # Build list of work items + work_subs = self.get_subscription_list() + work_vids = self.get_videos_list(work_subs) - # Video not found, we can safely assume that the video was deleted. - if not found_video: - log.info("Video %d was deleted! [%s %s]", video.id, video.video_id, video.name) - # Clean up + self.set_total_steps(len(work_subs) + len(work_vids)) + + # Process subscriptions + for sub in work_subs: + self.progress_advance(1, "Synchronizing subscription " + sub.name) + self.check_new_videos(sub) + self.fetch_missing_thumbnails(sub) + + # Add new videos to progress calculation + self.set_total_steps(len(work_subs) + len(work_vids) + len(self.__new_vids)) + + # Process videos + all_videos = itertools.chain(work_vids, self.__new_vids) + for batch in iterate_chunks(all_videos, 50): + video_stats = {} + + if _ENABLE_UPDATE_STATS: + batch_ids = [video.video_id for video in batch] + video_stats = {v.id: v for v in self.__api.videos(batch_ids, part='id,statistics')} + + for video in itertools.chain(work_vids, self.__new_vids): + self.progress_advance(1, "Updating video " + video.name) + self.check_video_deleted(video) + self.fetch_missing_thumbnails(video) + + if video.video_id in video_stats: + self.update_video_stats(video, video_stats[video.video_id]) + + # Start downloading videos + for sub in work_subs: + downloader_process_subscription(sub) + + finally: + SynchronizeJob.running = False + self.__lock.release() + + def check_new_videos(self, sub: Subscription): + playlist_items = self.__api.playlist_items(sub.playlist_id) + + for item in playlist_items: + results = Video.objects.filter(video_id=item.resource_video_id, subscription=sub) + + if len(results) == 0: + self.log.info('New video for subscription %s: %s %s"', sub, item.resource_video_id, item.title) + self.__new_vids.append(Video.create(item, sub)) + + def fetch_missing_thumbnails(self, object: Union[Subscription, Video]): + if isinstance(object, Subscription): + object_type = "sub" + object_id = object.playlist_id + else: + object_type = "video" + object_id = object.video_id + + if object.icon_default.startswith("http"): + object.icon_default = fetch_thumbnail(object.icon_default, object_type, object_id, 'default') + object.save() + + if object.icon_best.startswith("http"): + object.icon_best = fetch_thumbnail(object.icon_best, object_type, object_id, 'best') + object.save() + + def check_video_deleted(self, video: Video): + if video.downloaded_path is not None: + files = [] + try: + files = list(video.get_files()) + except OSError as e: + if e.errno != errno.ENOENT: + self.log.error("Could not access path %s. Error: %s", video.downloaded_path, e) + self.usr_err(f"Could not access path {video.downloaded_path}: {e}", suppress_notification=True) + return + + # Try to find a valid video file + found_video = False for file in files: - try: - os.unlink(file) - except OSError as e: - log.error("Could not delete redundant file %s. Error: %s", file, e) - video.downloaded_path = None + mime, _ = mimetypes.guess_type(file) + if mime is not None and mime.startswith("video"): + found_video = True - # Mark watched? - if user.preferences['mark_deleted_as_watched']: - video.watched = True + # Video not found, we can safely assume that the video was deleted. + if not found_video: + self.log.info("Video %d was deleted! [%s %s]", video.id, video.video_id, video.name) + # Clean up + for file in files: + try: + os.unlink(file) + except OSError as e: + self.log.error("Could not delete redundant file %s. Error: %s", file, e) + self.usr_err(f"Could not delete redundant file {file}: {e}", suppress_notification=True) + video.downloaded_path = None - video.save() + # Mark watched? + user = video.subscription.user + if user.preferences['mark_deleted_as_watched']: + video.watched = True + video.save() -def __fetch_thumbnails_obj(iterable, obj_type, id_attr): - for obj in iterable: - if obj.icon_default.startswith("http"): - obj.icon_default = fetch_thumbnail(obj.icon_default, obj_type, getattr(obj, id_attr), 'default') - if obj.icon_best.startswith("http"): - obj.icon_best = fetch_thumbnail(obj.icon_best, obj_type, getattr(obj, id_attr), 'best') - obj.save() + def update_video_stats(self, video: Video, yt_video): + if yt_video.n_likes is not None \ + and yt_video.n_dislikes is not None \ + and yt_video.n_likes + yt_video.n_dislikes > 0: + video.rating = yt_video.n_likes / (yt_video.n_likes + yt_video.n_dislikes) + video.views = yt_video.n_views + video.save() -def __fetch_thumbnails(): - log.info("Fetching subscription thumbnails... ") - __fetch_thumbnails_obj(Subscription.objects.filter(icon_default__istartswith='http'), 'sub', 'playlist_id') - __fetch_thumbnails_obj(Subscription.objects.filter(icon_best__istartswith='http'), 'sub', 'playlist_id') - - log.info("Fetching video thumbnails... ") - __fetch_thumbnails_obj(Video.objects.filter(icon_default__istartswith='http'), 'video', 'video_id') - __fetch_thumbnails_obj(Video.objects.filter(icon_best__istartswith='http'), 'video', 'video_id') - - -def __compute_progress(stage, stage_count, items, total_items): - stage_percent = float(stage) / stage_count - - -def synchronize(): - if not __lock.acquire(blocking=False): - # Synchronize already running in another thread - log.info("Synchronize already running in another thread") - return - - try: - log.info("Running scheduled synchronization... ") - notification_manager.notify_status_operation_progress( - OPERATION_ID_SYNCHRONIZE, - 'Running scheduled synchronization: checking for new videos...', - 0.1, - None - ) - - # Sync subscribed playlists/channels - log.info("Sync - checking videos") - yt_api = youtube.YoutubeAPI.build_public() - for subscription in Subscription.objects.all(): - __check_new_videos_sub(subscription, yt_api) - __detect_deleted(subscription) - - notification_manager.notify_status_operation_progress( - OPERATION_ID_SYNCHRONIZE, - 'Running scheduled synchronization: enqueueing videos to download...', - 0.5, - None - ) - - log.info("Sync - checking for videos to download") - downloader_process_all() - - notification_manager.notify_status_operation_progress( - OPERATION_ID_SYNCHRONIZE, - 'Running scheduled synchronization: fetching thumbnails...', - 0.7, - None - ) - - log.info("Sync - fetching missing thumbnails") - __fetch_thumbnails() - - log.info("Synchronization finished.") - notification_manager.notify_status_operation_ended( - OPERATION_ID_SYNCHRONIZE, - 'Synchronization finished.', - None - ) - - finally: - __lock.release() - - -def synchronize_subscription(subscription: Subscription): - __lock.acquire() - try: - log.info("Running synchronization for single subscription %d [%s]", subscription.id, subscription.name) - notification_manager.notify_status_update(f'Synchronization started for subscription {subscription.name}.') - - yt_api = youtube.YoutubeAPI.build_public() - - log.info("Sync - checking videos") - __check_new_videos_sub(subscription, yt_api) - __detect_deleted(subscription) - - log.info("Sync - checking for videos to download") - downloader_process_subscription(subscription) - - log.info("Sync - fetching missing thumbnails") - __fetch_thumbnails() - - log.info("Synchronization finished for subscription %d [%s].", subscription.id, subscription.name) - notification_manager.notify_status_update(f'Synchronization finished for subscription {subscription.name}.') - - finally: - __lock.release() - - -__global_sync_job = None - - -def schedule_synchronize_global(): - global __global_sync_job - - trigger = CronTrigger.from_crontab(appconfig.sync_schedule) - - if __global_sync_job is None: + @staticmethod + def schedule_global_job(): trigger = CronTrigger.from_crontab(appconfig.sync_schedule) - __global_sync_job = scheduler.add_job(synchronize, trigger, max_instances=1, coalesce=True) - else: - __global_sync_job.reschedule(trigger, max_instances=1, coalesce=True) + if SynchronizeJob.__global_sync_job is None: + trigger = CronTrigger.from_crontab(appconfig.sync_schedule) + SynchronizeJob.__global_sync_job = scheduler.add_job(SynchronizeJob, trigger, max_instances=1, coalesce=True) - log.info('Scheduled synchronize job job=%s', __global_sync_job.id) + else: + SynchronizeJob.__global_sync_job.reschedule(trigger, max_instances=1, coalesce=True) + @staticmethod + def schedule_now(): + scheduler.add_job(SynchronizeJob, max_instances=1, coalesce=True) -def schedule_synchronize_now(): - job = scheduler.add_job(synchronize, max_instances=1, coalesce=True) - log.info('Scheduled synchronize now job job=%s', job.id) - - -def schedule_synchronize_now_subscription(subscription: Subscription): - job = scheduler.add_job(synchronize_subscription, args=[subscription]) - log.info('Scheduled synchronize subscription job subscription=(%s), job=%s', subscription, job.id) + @staticmethod + def schedule_now_for_subscription(subscription): + scheduler.add_job(SynchronizeJob, user=subscription.user, args=[subscription]) diff --git a/app/YtManagerApp/management/notification_manager.py b/app/YtManagerApp/management/notification_manager.py deleted file mode 100644 index c95409c..0000000 --- a/app/YtManagerApp/management/notification_manager.py +++ /dev/null @@ -1,95 +0,0 @@ -from django.contrib.auth.models import User -from typing import Dict, Deque, Any, Optional -from collections import deque -from datetime import datetime, timedelta -from YtManagerApp.utils.algorithms import bisect_left -from threading import Lock - -# Clients will request updates at most every few seconds, so a retention period of 60 seconds should be more than -# enough. I gave it 15 minutes so that if for some reason the connection fails (internet drops) and then comes back a -# few minutes later, the client will still get the updates -__RETENTION_PERIOD = 15 * 60 -__NOTIFICATIONS: Deque[Dict] = deque() -__NEXT_ID = 0 -__LOCK = Lock() - - -OPERATION_ID_SYNCHRONIZE = 1 - -# Messages enum -class Messages: - STATUS_UPDATE = 'st-up' - STATUS_OPERATION_PROGRESS = 'st-op-prog' - STATUS_OPERATION_END = 'st-op-end' - - -def __add_notification(message, user: User=None, **kwargs): - global __NEXT_ID - - __LOCK.acquire() - - try: - # add notification - notification = { - 'time': datetime.now(), - 'msg': message, - 'id': __NEXT_ID, - 'uid': user and user.id, - } - notification.update(kwargs) - __NOTIFICATIONS.append(notification) - __NEXT_ID += 1 - - # trim old notifications - oldest = __NOTIFICATIONS[0] - while len(__NOTIFICATIONS) > 0 and oldest['time'] + timedelta(seconds=__RETENTION_PERIOD) < datetime.now(): - __NOTIFICATIONS.popleft() - oldest = __NOTIFICATIONS[0] - - finally: - __LOCK.release() - - -def get_notifications(user: User, last_received_id: Optional[int]): - - __LOCK.acquire() - - try: - first_index = 0 - if last_received_id is not None: - first_index = bisect_left(__NOTIFICATIONS, - {'id': last_received_id}, - key=lambda item: item['id']) - - for i in range(first_index, len(__NOTIFICATIONS)): - item = __NOTIFICATIONS[i] - if item['uid'] is None or item['uid'] == user.id: - yield item - - finally: - __LOCK.release() - - -def get_current_notification_id(): - return __NEXT_ID - - -def notify_status_update(status_message: str, user: User=None): - __add_notification(Messages.STATUS_UPDATE, - user=user, - status=status_message) - - -def notify_status_operation_progress(op_id: Any, status_message: str, progress_percent: float, user: User=None): - __add_notification(Messages.STATUS_OPERATION_PROGRESS, - user=user, - operation=op_id, - status=status_message, - progress=progress_percent) - - -def notify_status_operation_ended(op_id: Any, status_message: str, user: User=None): - __add_notification(Messages.STATUS_OPERATION_END, - user=user, - operation=op_id, - status=status_message) diff --git a/app/YtManagerApp/migrations/0009_jobexecution_jobmessage.py b/app/YtManagerApp/migrations/0009_jobexecution_jobmessage.py new file mode 100644 index 0000000..098d54c --- /dev/null +++ b/app/YtManagerApp/migrations/0009_jobexecution_jobmessage.py @@ -0,0 +1,39 @@ +# Generated by Django 2.1.7 on 2019-04-08 15:26 + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('YtManagerApp', '0008_auto_20181229_2035'), + ] + + operations = [ + migrations.CreateModel( + name='JobExecution', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('start_date', models.DateTimeField(auto_now=True)), + ('end_date', models.DateTimeField(null=True)), + ('description', models.CharField(default='', max_length=250)), + ('status', models.IntegerField(choices=[('running', 0), ('finished', 1), ('failed', 2), ('interrupted', 3)], default=0)), + ('user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)), + ], + ), + migrations.CreateModel( + name='JobMessage', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('timestamp', models.DateTimeField(auto_now=True)), + ('progress', models.FloatField(null=True)), + ('message', models.CharField(default='', max_length=1024)), + ('level', models.IntegerField(choices=[('normal', 0), ('warning', 1), ('error', 2)], default=0)), + ('suppress_notification', models.BooleanField(default=False)), + ('job', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='YtManagerApp.JobExecution')), + ], + ), + ] diff --git a/app/YtManagerApp/models.py b/app/YtManagerApp/models.py index 08a920c..2d0e771 100644 --- a/app/YtManagerApp/models.py +++ b/app/YtManagerApp/models.py @@ -199,18 +199,18 @@ class Video(models.Model): self.save() if self.downloaded_path is not None: from YtManagerApp.management.appconfig import appconfig - from YtManagerApp.management.jobs.delete_video import schedule_delete_video - from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now_subscription + from YtManagerApp.management.jobs.delete_video import DeleteVideoJob + from YtManagerApp.management.jobs.synchronize import SynchronizeJob if appconfig.for_sub(self.subscription, 'automatically_delete_watched'): - schedule_delete_video(self) - schedule_synchronize_now_subscription(self.subscription) + DeleteVideoJob.schedule(self) + SynchronizeJob.schedule_now_for_subscription(self.subscription) def mark_unwatched(self): - from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now_subscription + from YtManagerApp.management.jobs.synchronize import SynchronizeJob self.watched = False self.save() - schedule_synchronize_now_subscription(self.subscription) + SynchronizeJob.schedule_now_for_subscription(self.subscription) def get_files(self): if self.downloaded_path is not None: @@ -255,3 +255,47 @@ class Video(models.Model): def __repr__(self): return f'video {self.id}, video_id="{self.video_id}"' + + +JOB_STATES = [ + ('running', 0), + ('finished', 1), + ('failed', 2), + ('interrupted', 3), +] + +JOB_STATES_MAP = { + 'running': 0, + 'finished': 1, + 'failed': 2, + 'interrupted': 3, +} + +JOB_MESSAGE_LEVELS = [ + ('normal', 0), + ('warning', 1), + ('error', 2), +] +JOB_MESSAGE_LEVELS_MAP = { + 'normal': 0, + 'warning': 1, + 'error': 2, +} + + +class JobExecution(models.Model): + start_date = models.DateTimeField(auto_now=True, null=False) + end_date = models.DateTimeField(null=True) + user = models.ForeignKey(User, on_delete=models.CASCADE, null=True) + description = models.CharField(max_length=250, null=False, default="") + status = models.IntegerField(choices=JOB_STATES, null=False, default=0) + + +class JobMessage(models.Model): + timestamp = models.DateTimeField(auto_now=True, null=False) + job = models.ForeignKey(JobExecution, null=False, on_delete=models.CASCADE) + progress = models.FloatField(null=True) + message = models.CharField(max_length=1024, null=False, default="") + level = models.IntegerField(choices=JOB_MESSAGE_LEVELS, null=False, default=0) + suppress_notification = models.BooleanField(null=False, default=False) + diff --git a/app/YtManagerApp/scheduler.py b/app/YtManagerApp/scheduler.py index c103700..83e7d33 100644 --- a/app/YtManagerApp/scheduler.py +++ b/app/YtManagerApp/scheduler.py @@ -1,27 +1,267 @@ +import datetime import logging +import traceback +from typing import Type, Union, Optional, Callable, List, Any from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.base import BaseTrigger +from django.contrib.auth.models import User from YtManagerApp.management.appconfig import appconfig - -scheduler = BackgroundScheduler() +from YtManagerApp.models import JobExecution, JobMessage, JOB_STATES_MAP, JOB_MESSAGE_LEVELS_MAP -def initialize_scheduler(): +class ProgressTracker(object): + """ + Class which helps keep track of complex operation progress. + """ - if scheduler.running: - return + def __init__(self, total_steps: float = 100, initial_steps: float = 0, + listener: Callable[[float, str], None] = None, + listener_args: List[Any] = None, + parent: Optional["ProgressTracker"] = None): + """ + Constructor + :param total_steps: Total number of steps required by this operation + :param initial_steps: Starting steps + :param parent: Parent progress tracker + :param listener: Callable which is called when any progress happens + """ - logger = logging.getLogger('scheduler') - executors = { - 'default': { - 'type': 'threadpool', - 'max_workers': appconfig.concurrency + self.total_steps = total_steps + self.steps = initial_steps + + self.__subtask: ProgressTracker = None + self.__subtask_steps = 0 + + self.__parent = parent + self.__listener = listener + self.__listener_args = listener_args or [] + + def __on_progress(self, progress_msg): + if self.__listener is not None: + self.__listener(*self.__listener_args, self.compute_progress(), progress_msg) + + if self.__parent is not None: + self.__parent.__on_progress(progress_msg) + + def advance(self, steps: float = 1, progress_msg: str = ''): + """ + Advances a number of steps. + :param steps: Number of steps to advance + :param progress_msg: A message which will be passed to a listener + :return: + """ + + # We can assume previous subtask is now completed + if self.__subtask is not None: + self.steps += self.__subtask_steps + self.__subtask = None + + self.steps += steps + self.__on_progress(progress_msg) + + def subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0): + """ + Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress. + :param steps: Number of steps the subtask is 'worth' + :param subtask_total_steps: Total number of steps for subtask + :param subtask_initial_steps: Initial steps for subtask + :return: ProgressTracker for subtask + """ + + # We can assume previous subtask is now completed + if self.__subtask is not None: + self.steps += self.__subtask_steps + + self.__subtask = ProgressTracker(total_steps=subtask_total_steps, + initial_steps=subtask_initial_steps, + parent=self) + self.__subtask_steps = steps + + return self.__subtask + + def compute_progress(self): + """ + Calculates final progress value in percent. + :return: value in [0,1] interval representing progress + """ + base = float(self.steps) / self.total_steps + if self.__subtask is not None: + base += self.__subtask.compute_progress() * self.__subtask_steps / self.total_steps + + return min(base, 1.0) + + +class Job(object): + name = 'GenericJob' + + """ + Base class for jobs running in the scheduler. + """ + + def __init__(self, job_execution, *args): + self.job_execution = job_execution + self.log = logging.getLogger(self.name) + self.__progress_tracker = ProgressTracker(listener=Job.__on_progress, + listener_args=[self]) + + def get_description(self) -> str: + """ + Gets a user friendly description of this job. + Should be overriden in job classes. + :return: + """ + return "Running job..." + + # + # progress tracking + # + + def __on_progress(self, percent: float, message: str): + self.usr_log(message, progress=percent) + + def set_total_steps(self, steps: float): + """ + Sets the total number of work steps this task has. This is used for tracking progress. + Should be overriden in job classes. + :return: + """ + self.__progress_tracker.total_steps = steps + + def progress_advance(self, steps: float = 1, progress_msg: str = ''): + """ + Advances a number of steps. + :param steps: Number of steps to advance + :param progress_msg: A message which will be passed to a listener + :return: + """ + self.__progress_tracker.advance(steps, progress_msg) + + def create_subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0): + """ + Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress. + :param steps: Number of steps the subtask is 'worth' + :param subtask_total_steps: Total number of steps for subtask + :param subtask_initial_steps: Initial steps for subtask + :return: ProgressTracker for subtask + """ + return self.__progress_tracker.subtask(steps, subtask_total_steps, subtask_initial_steps) + + # + # user log messages + # + + def usr_log(self, message, progress: Optional[float] = None, level: int = JOB_MESSAGE_LEVELS_MAP['normal'], + suppress_notification: bool = False): + """ + Creates a new log message which will be shown on the user interface. + Progress can also be updated using this method. + :param message: A message to be displayed to the user + :param progress: Progress percentage in [0,1] interval + :param level: Log level (normal, warning, error) + :param suppress_notification: If set to true, a notification will not displayed to the user, but it will + appear in the system logs. + :return: + """ + + message = JobMessage(job=self.job_execution, + progress=progress, + message=message, + level=level, + suppress_notification=suppress_notification) + message.save() + + def usr_warn(self, message, progress: Optional[float] = None, suppress_notification: bool = False): + """ + Creates a new warning message which will be shown on the user interface. + Progress can also be updated using this method. + :param message: A message to be displayed to the user + :param progress: Progress percentage in [0,1] interval + :param suppress_notification: If set to true, a notification will not displayed to the user, but it will + appear in the system logs. + :return: + """ + self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['warning'], suppress_notification) + + def usr_err(self, message, progress: Optional[float] = None, suppress_notification: bool = False): + """ + Creates a new error message which will be shown on the user interface. + Progress can also be updated using this method. + :param message: A message to be displayed to the user + :param progress: Progress percentage in [0,1] interval + :param suppress_notification: If set to true, a notification will not displayed to the user, but it will + appear in the system logs. + :return: + """ + self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['error'], suppress_notification) + + # + # main run method + # + def run(self): + pass + + +class YtsmScheduler(object): + + def __init__(self): + self._apscheduler = BackgroundScheduler() + + def initialize(self): + # set state of existing jobs as "interrupted" + JobExecution.objects\ + .filter(status=JOB_STATES_MAP['running'])\ + .update(status=JOB_STATES_MAP['interrupted']) + + self._configure_scheduler() + self._apscheduler.start() + + def _configure_scheduler(self): + logger = logging.getLogger('scheduler') + executors = { + 'default': { + 'type': 'threadpool', + 'max_workers': appconfig.concurrency + } } - } - job_defaults = { - 'misfire_grace_time': 60 * 60 * 24 * 365 # 1 year - } + job_defaults = { + 'misfire_grace_time': 60 * 60 * 24 * 365 # 1 year + } + self._apscheduler.configure(logger=logger, executors=executors, job_defaults=job_defaults) - scheduler.configure(logger=logger, executors=executors, job_defaults=job_defaults) - scheduler.start() + def _run_job(self, job_class: Type[Job], user: Optional[User], args: Union[tuple, list]): + + job_execution = JobExecution(user=user, status=JOB_STATES_MAP['running']) + job_execution.save() + job_instance = job_class(job_execution, *args) + + # update description + job_execution.description = job_instance.get_description() + job_execution.save() + + try: + job_instance.run() + job_execution.status = JOB_STATES_MAP['finished'] + + except Exception as ex: + job_instance.log.critical("Job failed with exception: %s", traceback.format_exc()) + job_instance.usr_err(job_instance.name + " operation failed: " + str(ex)) + job_execution.status = JOB_STATES_MAP['failed'] + + finally: + job_execution.end_date = datetime.datetime.now() + job_execution.save() + + def add_job(self, job_class: Type[Job], trigger: Union[str, BaseTrigger] = None, + args: Union[list, tuple] = None, + user: Optional[User] = None, + **kwargs): + if args is None: + args = [] + + return self._apscheduler.add_job(YtsmScheduler._run_job, trigger=trigger, args=[self, job_class, user, args], + **kwargs) + + +scheduler = YtsmScheduler() diff --git a/app/YtManagerApp/static/YtManagerApp/css/style.css b/app/YtManagerApp/static/YtManagerApp/css/style.css index 0799bd1..2d6215a 100644 --- a/app/YtManagerApp/static/YtManagerApp/css/style.css +++ b/app/YtManagerApp/static/YtManagerApp/css/style.css @@ -129,4 +129,10 @@ .status-timestamp { margin-right: 0.25rem; } +.dropdown-jobs { + min-width: 25rem; } + .dropdown-jobs .dropdown-item p { + margin: 0; + line-height: normal; } + /*# sourceMappingURL=style.css.map */ diff --git a/app/YtManagerApp/static/YtManagerApp/css/style.css.map b/app/YtManagerApp/static/YtManagerApp/css/style.css.map index 74b7743..932f8e0 100644 --- a/app/YtManagerApp/static/YtManagerApp/css/style.css.map +++ b/app/YtManagerApp/static/YtManagerApp/css/style.css.map @@ -1,6 +1,6 @@ { "version": 3, -"mappings": "AAEA,UAAW;EACP,aAAa,EAAE,IAAI;EACnB,UAAU,EAAE,CAAC;;AAGjB,YAAa;EACT,QAAQ,EAAE,KAAK;EACf,IAAI,EAAE,CAAC;EACP,KAAK,EAAE,CAAC;EACR,MAAM,EAAE,CAAC;EACT,MAAM,EAAE,IAAI;EACZ,WAAW,EAAE,IAAI;EACjB,OAAO,EAAE,MAAM;EACf,OAAO,EAAE,IAAI;EACb,aAAa,EAAE,MAAM;EACrB,SAAS,EAAE,IAAI;;AAqBnB,uBAAuB;AACvB,kBAAmB;EAlBf,OAAO,EAAE,YAAY;EACrB,KAAK,EAAE,IAAa;EACpB,MAAM,EAAE,IAAa;EAErB,wBAAQ;IACJ,OAAO,EAAE,GAAG;IACZ,OAAO,EAAE,KAAK;IACd,KAAK,EAAE,IAAa;IACpB,MAAM,EAAE,IAAa;IACrB,MAAM,EAAE,GAAG;IACX,aAAa,EAAE,GAAG;IAClB,MAAM,EAAE,iBAAkC;IAC1C,YAAY,EAAE,uCAAmD;IACjE,SAAS,EAAE,sCAAsC;;AASzD,wBAAyB;EAtBrB,OAAO,EAAE,YAAY;EACrB,KAAK,EAAE,IAAa;EACpB,MAAM,EAAE,IAAa;EAErB,8BAAQ;IACJ,OAAO,EAAE,GAAG;IACZ,OAAO,EAAE,KAAK;IACd,KAAK,EAAE,IAAa;IACpB,MAAM,EAAE,IAAa;IACrB,MAAM,EAAE,GAAG;IACX,aAAa,EAAE,GAAG;IAClB,MAAM,EAAE,mBAAkC;IAC1C,YAAY,EAAE,uCAAmD;IACjE,SAAS,EAAE,sCAAsC;;AAazD,4BAOC;EANG,EAAG;IACC,SAAS,EAAE,YAAY;EAE3B,IAAK;IACD,SAAS,EAAE,cAAc;AAIjC,gCAAiC;EAC7B,QAAQ,EAAE,KAAK;EACf,GAAG,EAAE,GAAG;EACR,IAAI,EAAE,GAAG;EACT,UAAU,EAAE,KAAK;EACjB,WAAW,EAAE,KAAK;;AAGtB,cAAe;EACX,QAAQ,EAAE,KAAK;EAAE,oCAAoC;EACrD,OAAO,EAAE,IAAI;EAAE,uBAAuB;EACtC,KAAK,EAAE,IAAI;EAAE,uCAAuC;EACpD,MAAM,EAAE,IAAI;EAAE,wCAAwC;EACtD,GAAG,EAAE,CAAC;EACN,IAAI,EAAE,CAAC;EACP,KAAK,EAAE,CAAC;EACR,MAAM,EAAE,CAAC;EACT,gBAAgB,EAAE,kBAAe;EAAE,mCAAmC;EACtE,OAAO,EAAE,CAAC;EAAE,qFAAqF;EACjG,MAAM,EAAE,OAAO;EAAE,4BAA4B;;AAI7C,4BAAc;EACV,OAAO,EAAE,MAAM;EACf,aAAa,EAAE,KAAK;AAGpB,+BAAW;EACP,OAAO,EAAE,MAAM;AAEnB,+BAAW;EACP,SAAS,EAAE,IAAI;EACf,aAAa,EAAE,KAAK;AAExB,gCAAY;EACR,SAAS,EAAE,IAAI;EACf,aAAa,EAAE,KAAK;EAEpB,uCAAO;IACH,SAAS,EAAE,GAAG;AAGtB,iCAAa;EACT,OAAO,EAAE,YAAY;AAGzB,+BAAW;EACP,YAAY,EAAE,QAAQ;EACtB,qCAAQ;IACJ,eAAe,EAAE,IAAI;AAO7B,8BAAU;EACN,KAAK,EAAE,KAAK;AAKpB,8BAAgB;EACZ,KAAK,EAxHE,OAAO;AA0HlB,6BAAe;EACX,KAAK,EAAE,OAAO;;AAItB,WAAY;EACR,SAAS,EAAE,KAAK;EAChB,MAAM,EAAE,MAAM;;AAId,2BAAe;EACX,OAAO,EAAE,IAAI;;AAIrB,kBAAmB;EACf,MAAM,EAAE,QAAQ;EAChB,OAAO,EAAE,QAAQ;EAEjB,qBAAG;IACC,MAAM,EAAE,CAAC;;AAIjB,YAAa;EACT,OAAO,EAAE,YAAY;EACrB,aAAa,EAAE,MAAM;;AAGzB,YAAa;EACT,MAAM,EAAE,OAAO;EACf,iBAAK;IACD,OAAO,EAAE,cAAc;IACvB,SAAS,EAAE,IAAI;;AAIvB,iBAAkB;EACd,YAAY,EAAE,OAAO", +"mappings": "AAEA,UAAW;EACP,aAAa,EAAE,IAAI;EACnB,UAAU,EAAE,CAAC;;AAGjB,YAAa;EACT,QAAQ,EAAE,KAAK;EACf,IAAI,EAAE,CAAC;EACP,KAAK,EAAE,CAAC;EACR,MAAM,EAAE,CAAC;EACT,MAAM,EAAE,IAAI;EACZ,WAAW,EAAE,IAAI;EACjB,OAAO,EAAE,MAAM;EACf,OAAO,EAAE,IAAI;EACb,aAAa,EAAE,MAAM;EACrB,SAAS,EAAE,IAAI;;AAqBnB,uBAAuB;AACvB,kBAAmB;EAlBf,OAAO,EAAE,YAAY;EACrB,KAAK,EAAE,IAAa;EACpB,MAAM,EAAE,IAAa;EAErB,wBAAQ;IACJ,OAAO,EAAE,GAAG;IACZ,OAAO,EAAE,KAAK;IACd,KAAK,EAAE,IAAa;IACpB,MAAM,EAAE,IAAa;IACrB,MAAM,EAAE,GAAG;IACX,aAAa,EAAE,GAAG;IAClB,MAAM,EAAE,iBAAkC;IAC1C,YAAY,EAAE,uCAAmD;IACjE,SAAS,EAAE,sCAAsC;;AASzD,wBAAyB;EAtBrB,OAAO,EAAE,YAAY;EACrB,KAAK,EAAE,IAAa;EACpB,MAAM,EAAE,IAAa;EAErB,8BAAQ;IACJ,OAAO,EAAE,GAAG;IACZ,OAAO,EAAE,KAAK;IACd,KAAK,EAAE,IAAa;IACpB,MAAM,EAAE,IAAa;IACrB,MAAM,EAAE,GAAG;IACX,aAAa,EAAE,GAAG;IAClB,MAAM,EAAE,mBAAkC;IAC1C,YAAY,EAAE,uCAAmD;IACjE,SAAS,EAAE,sCAAsC;;AAazD,4BAOC;EANG,EAAG;IACC,SAAS,EAAE,YAAY;EAE3B,IAAK;IACD,SAAS,EAAE,cAAc;AAIjC,gCAAiC;EAC7B,QAAQ,EAAE,KAAK;EACf,GAAG,EAAE,GAAG;EACR,IAAI,EAAE,GAAG;EACT,UAAU,EAAE,KAAK;EACjB,WAAW,EAAE,KAAK;;AAGtB,cAAe;EACX,QAAQ,EAAE,KAAK;EAAE,oCAAoC;EACrD,OAAO,EAAE,IAAI;EAAE,uBAAuB;EACtC,KAAK,EAAE,IAAI;EAAE,uCAAuC;EACpD,MAAM,EAAE,IAAI;EAAE,wCAAwC;EACtD,GAAG,EAAE,CAAC;EACN,IAAI,EAAE,CAAC;EACP,KAAK,EAAE,CAAC;EACR,MAAM,EAAE,CAAC;EACT,gBAAgB,EAAE,kBAAe;EAAE,mCAAmC;EACtE,OAAO,EAAE,CAAC;EAAE,qFAAqF;EACjG,MAAM,EAAE,OAAO;EAAE,4BAA4B;;AAI7C,4BAAc;EACV,OAAO,EAAE,MAAM;EACf,aAAa,EAAE,KAAK;AAGpB,+BAAW;EACP,OAAO,EAAE,MAAM;AAEnB,+BAAW;EACP,SAAS,EAAE,IAAI;EACf,aAAa,EAAE,KAAK;AAExB,gCAAY;EACR,SAAS,EAAE,IAAI;EACf,aAAa,EAAE,KAAK;EAEpB,uCAAO;IACH,SAAS,EAAE,GAAG;AAGtB,iCAAa;EACT,OAAO,EAAE,YAAY;AAGzB,+BAAW;EACP,YAAY,EAAE,QAAQ;EACtB,qCAAQ;IACJ,eAAe,EAAE,IAAI;AAO7B,8BAAU;EACN,KAAK,EAAE,KAAK;AAKpB,8BAAgB;EACZ,KAAK,EAxHE,OAAO;AA0HlB,6BAAe;EACX,KAAK,EAAE,OAAO;;AAItB,WAAY;EACR,SAAS,EAAE,KAAK;EAChB,MAAM,EAAE,MAAM;;AAId,2BAAe;EACX,OAAO,EAAE,IAAI;;AAIrB,kBAAmB;EACf,MAAM,EAAE,QAAQ;EAChB,OAAO,EAAE,QAAQ;EAEjB,qBAAG;IACC,MAAM,EAAE,CAAC;;AAIjB,YAAa;EACT,OAAO,EAAE,YAAY;EACrB,aAAa,EAAE,MAAM;;AAGzB,YAAa;EACT,MAAM,EAAE,OAAO;EACf,iBAAK;IACD,OAAO,EAAE,cAAc;IACvB,SAAS,EAAE,IAAI;;AAIvB,iBAAkB;EACd,YAAY,EAAE,OAAO;;AAGzB,cAAe;EACX,SAAS,EAAE,KAAK;EAEhB,+BAAiB;IACb,MAAM,EAAE,CAAC;IACT,WAAW,EAAE,MAAM", "sources": ["style.scss"], "names": [], "file": "style.css" diff --git a/app/YtManagerApp/static/YtManagerApp/css/style.scss b/app/YtManagerApp/static/YtManagerApp/css/style.scss index 9c30bce..4e2e1f0 100644 --- a/app/YtManagerApp/static/YtManagerApp/css/style.scss +++ b/app/YtManagerApp/static/YtManagerApp/css/style.scss @@ -160,4 +160,13 @@ $accent-color: #007bff; .status-timestamp { margin-right: 0.25rem; +} + +.dropdown-jobs { + min-width: 25rem; + + .dropdown-item p { + margin: 0; + line-height: normal; + } } \ No newline at end of file diff --git a/app/YtManagerApp/templates/YtManagerApp/index.html b/app/YtManagerApp/templates/YtManagerApp/index.html index 08c2c1e..46ef6d9 100644 --- a/app/YtManagerApp/templates/YtManagerApp/index.html +++ b/app/YtManagerApp/templates/YtManagerApp/index.html @@ -9,8 +9,6 @@ {% block scripts %} {% endblock %} diff --git a/app/YtManagerApp/templates/YtManagerApp/js/index.js b/app/YtManagerApp/templates/YtManagerApp/js/index.js index ee0a5be..7a7b79e 100644 --- a/app/YtManagerApp/templates/YtManagerApp/js/index.js +++ b/app/YtManagerApp/templates/YtManagerApp/js/index.js @@ -178,63 +178,77 @@ function videos_Submit(e) /// /// Notifications /// -const NOTIFICATION_INTERVAL = 1000; -const STATUS_UPDATE = 'st-up'; -const STATUS_OPERATION_PROGRESS = 'st-op-prog'; -const STATUS_OPERATION_END = 'st-op-end'; -const OPERATION_LIST = {}; +const JOB_QUERY_INTERVAL = 1500; -function notifications_update_progress_bar() { - var count = 0; - var percent = 0; - - for(op in OPERATION_LIST) { - count++; - percent += OPERATION_LIST[op]; - } - - let progress = $('#status-progress'); - if (count > 0) { - progress.removeClass('invisible'); - let bar = progress.find('.progress-bar'); - bar.width(percent * 100 + '%'); - bar.text(count + ' operations in progress'); - } - else { - progress.addClass('invisible'); - } -} - -function get_and_process_notifications() +function get_and_process_running_jobs() { - $.get("{% url 'ajax_get_notifications' 12345 %}".replace("12345", LAST_NOTIFICATION_ID)) + $.get("{% url 'ajax_get_running_jobs' %}") .done(function(data) { - for (let entry of data) - { - LAST_NOTIFICATION_ID = entry.id; - let dt = new Date(entry.time); - // Status update - if (entry.msg === STATUS_UPDATE) { - let txt = `${dt.getHours()}:${zeroFill(dt.getMinutes(), 2)}${entry.status}`; - $('#status-message').html(txt); + let progress = $('#status-progress'); + let jobPanel = $('#job_panel'); + let jobTitle = jobPanel.find('#job_panel_title'); + let jobTitleNoJobs = jobPanel.find('#job_panel_no_jobs_title'); + let jobTemplate = jobPanel.find('#job_panel_item_template'); + + if (data.length > 0) { + + // Update status bar + if (data.length > 1) { + $('#status-message').text(`Running ${data.length} jobs...`); } - else if (entry.msg === STATUS_OPERATION_PROGRESS) { - let txt = `${dt.getHours()}:${zeroFill(dt.getMinutes(), 2)}${entry.status}`; - $('#status-message').html(txt); - - OPERATION_LIST[entry.operation] = entry.progress; - notifications_update_progress_bar(); - } - else if (entry.msg === STATUS_OPERATION_END) { - let txt = `${dt.getHours()}:${dt.getMinutes()}${entry.status}`; - $('#status-message').html(txt); - - delete OPERATION_LIST[entry.operation]; - notifications_update_progress_bar(); + else { + $('#status-message').text(`${data[0].description} | ${data[0].message}`); } + // Update global progress bar + let combinedProgress = 0; + for (let entry of data) { + combinedProgress += entry.progress; + } + + let percent = 100 * combinedProgress / data.length; + + progress.removeClass('invisible'); + let bar = progress.find('.progress-bar'); + bar.width(percent + '%'); + bar.text(`${percent.toFixed(0)}%`); + + // Update entries in job list + jobTitle.removeClass('collapse'); + jobTitleNoJobs.addClass('collapse'); + + data.sort(function (a, b) { return a.id - b.id }); + jobPanel.find('.job_entry').remove(); + + for (let entry of data) { + let jobEntry = jobTemplate.clone(); + jobEntry.attr('id', `job_${entry.id}`); + jobEntry.addClass('job_entry'); + jobEntry.removeClass('collapse'); + jobEntry.find('#job_panel_item_title').text(entry.description); + jobEntry.find('#job_panel_item_subtitle').text(entry.message); + + let entryPercent = 100 * entry.progress; + let jobEntryProgress = jobEntry.find('#job_panel_item_progress'); + jobEntryProgress.width(entryPercent + '%'); + jobEntryProgress.text(`${entryPercent.toFixed(0)}%`); + + jobEntry.appendTo(jobPanel); + } + + $('#btn_toggle_job_panel').dropdown('update'); + } + else { + progress.addClass('invisible'); + $('#status-message').text(""); + + jobTitle.addClass('collapse'); + jobTitleNoJobs.removeClass('collapse'); + jobPanel.find('.job_entry').remove(); + + $('#btn_toggle_job_panel').dropdown('update'); } }); } @@ -279,6 +293,7 @@ $(document).ready(function () videos_Reload(); - // Notification manager - setInterval(get_and_process_notifications, NOTIFICATION_INTERVAL); + // Notifications + get_and_process_running_jobs(); + setInterval(get_and_process_running_jobs, JOB_QUERY_INTERVAL); }); diff --git a/app/YtManagerApp/templates/YtManagerApp/master_default.html b/app/YtManagerApp/templates/YtManagerApp/master_default.html index 2a50390..3d2e34a 100644 --- a/app/YtManagerApp/templates/YtManagerApp/master_default.html +++ b/app/YtManagerApp/templates/YtManagerApp/master_default.html @@ -74,6 +74,37 @@
+
+ + +
+ diff --git a/app/YtManagerApp/urls.py b/app/YtManagerApp/urls.py index d619a22..f33336c 100644 --- a/app/YtManagerApp/urls.py +++ b/app/YtManagerApp/urls.py @@ -24,7 +24,7 @@ from .views.actions import SyncNowView, DeleteVideoFilesView, DownloadVideoFiles from .views.auth import ExtendedLoginView, RegisterView, RegisterDoneView from .views.index import index, ajax_get_tree, ajax_get_videos, CreateFolderModal, UpdateFolderModal, DeleteFolderModal, \ CreateSubscriptionModal, UpdateSubscriptionModal, DeleteSubscriptionModal, ImportSubscriptionsModal -from .views.notifications import ajax_get_notifications +from .views.notifications import ajax_get_running_jobs from .views.settings import SettingsView, AdminSettingsView from .views.video import VideoDetailView, video_detail_view @@ -45,7 +45,7 @@ urlpatterns = [ path('ajax/get_tree/', ajax_get_tree, name='ajax_get_tree'), path('ajax/get_videos/', ajax_get_videos, name='ajax_get_videos'), - path('ajax/get_notifications/', ajax_get_notifications, name='ajax_get_notifications'), + path('ajax/get_running_jobs/', ajax_get_running_jobs, name='ajax_get_running_jobs'), # Modals path('modal/create_folder/', CreateFolderModal.as_view(), name='modal_create_folder'), diff --git a/app/YtManagerApp/utils/progress_tracker.py b/app/YtManagerApp/utils/progress_tracker.py new file mode 100644 index 0000000..f2fa680 --- /dev/null +++ b/app/YtManagerApp/utils/progress_tracker.py @@ -0,0 +1,109 @@ +from typing import Optional, Callable + + +class ProgressTracker(object): + """ + Class which helps keep track of complex operation progress. + """ + + def __init__(self, total_steps: float = 100, initial_steps: float = 0, + listener: Callable[[float, str], None] = None, + completed_listener: Callable[[], None] = None, + parent: Optional["ProgressTracker"] = None): + """ + Constructor + :param total_steps: Total number of steps required by this operation + :param initial_steps: Starting steps + :param parent: Parent progress tracker + :param listener: Callable which is called when any progress happens + """ + + self.total_steps = total_steps + self.steps = initial_steps + + self.__subtask: ProgressTracker = None + self.__subtask_steps = 0 + + self.__parent = parent + self.__listener = listener + self.__completed_listener = completed_listener + + def __on_progress(self, progress_msg): + if self.__listener is not None: + self.__listener(self.compute_progress(), progress_msg) + + if self.__parent is not None: + self.__parent.__on_progress(progress_msg) + + if self.steps >= self.total_steps and self.__completed_listener is not None: + self.__completed_listener() + + def advance(self, steps: float = 1, progress_msg: str = ''): + """ + Advances a number of steps. + :param steps: Number of steps to advance + :param progress_msg: A message which will be passed to a listener + :return: + """ + + # We can assume previous subtask is now completed + if self.__subtask is not None: + self.steps += self.__subtask_steps + self.__subtask = None + + self.steps += steps + self.__on_progress(progress_msg) + + def subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0): + """ + Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress. + :param steps: Number of steps the subtask is 'worth' + :param subtask_total_steps: Total number of steps for subtask + :param subtask_initial_steps: Initial steps for subtask + :return: ProgressTracker for subtask + """ + + # We can assume previous subtask is now completed + if self.__subtask is not None: + self.steps += self.__subtask_steps + + self.__subtask = ProgressTracker(total_steps=subtask_total_steps, + initial_steps=subtask_initial_steps, + parent=self) + self.__subtask_steps = steps + + return self.__subtask + + def compute_progress(self): + """ + Calculates final progress value in percent. + :return: value in [0,1] interval representing progress + """ + base = float(self.steps) / self.total_steps + if self.__subtask is not None: + base += self.__subtask.compute_progress() * self.__subtask_steps / self.total_steps + + return min(base, 1.0) + + +# Test +if __name__ == '__main__': + + def on_progress(progress, message): + print(f'{progress * 100}%: {message}') + + def on_completed(): + print("Complete!") + + main_task = ProgressTracker(total_steps=20, listener=on_progress, completed_listener=on_completed) + + for i in range(10): + main_task.advance(progress_msg='First 10 steps') + + subtask = main_task.subtask(5, subtask_total_steps=10) + + for i in range(10): + subtask.advance(progress_msg='Subtask') + + for i in range(5): + main_task.advance(progress_msg='Main task again') diff --git a/app/YtManagerApp/views/actions.py b/app/YtManagerApp/views/actions.py index 28cfd7a..bbab4ca 100644 --- a/app/YtManagerApp/views/actions.py +++ b/app/YtManagerApp/views/actions.py @@ -2,13 +2,13 @@ from django.contrib.auth.mixins import LoginRequiredMixin from django.http import JsonResponse from django.views.generic import View -from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now +from YtManagerApp.management.jobs.synchronize import SynchronizeJob from YtManagerApp.models import Video class SyncNowView(LoginRequiredMixin, View): def post(self, *args, **kwargs): - schedule_synchronize_now() + SynchronizeJob.schedule_now() return JsonResponse({ 'success': True }) diff --git a/app/YtManagerApp/views/first_time.py b/app/YtManagerApp/views/first_time.py index 2761281..dc65c2e 100644 --- a/app/YtManagerApp/views/first_time.py +++ b/app/YtManagerApp/views/first_time.py @@ -9,8 +9,8 @@ from django.urls import reverse_lazy from django.views.generic import FormView from YtManagerApp.management.appconfig import appconfig -from YtManagerApp.management.jobs.synchronize import schedule_synchronize_global -from YtManagerApp.scheduler import initialize_scheduler +from YtManagerApp.management.jobs.synchronize import SynchronizeJob +from YtManagerApp.scheduler import scheduler from YtManagerApp.views.forms.first_time import WelcomeForm, ApiKeyForm, PickAdminUserForm, ServerConfigForm, DoneForm, \ UserCreationForm, LoginForm @@ -177,8 +177,8 @@ class Step3ConfigureView(WizardStepMixin, FormView): appconfig.initialized = True # Start scheduler if not started - initialize_scheduler() - schedule_synchronize_global() + scheduler.initialize() + SynchronizeJob.schedule_global_job() return super().form_valid(form) diff --git a/app/YtManagerApp/views/index.py b/app/YtManagerApp/views/index.py index 9a8b368..acac5c2 100644 --- a/app/YtManagerApp/views/index.py +++ b/app/YtManagerApp/views/index.py @@ -15,7 +15,6 @@ from YtManagerApp.management.appconfig import appconfig from YtManagerApp.models import Subscription, SubscriptionFolder, VIDEO_ORDER_CHOICES, VIDEO_ORDER_MAPPING from YtManagerApp.utils import youtube, subscription_file_parser from YtManagerApp.views.controls.modal import ModalMixin -from YtManagerApp.management.notification_manager import get_current_notification_id import logging @@ -122,7 +121,6 @@ def index(request: HttpRequest): if request.user.is_authenticated: context.update({ 'filter_form': VideoFilterForm(), - 'current_notification_id': get_current_notification_id(), }) return render(request, 'YtManagerApp/index.html', context) else: diff --git a/app/YtManagerApp/views/notifications.py b/app/YtManagerApp/views/notifications.py index e5435b5..9cec503 100644 --- a/app/YtManagerApp/views/notifications.py +++ b/app/YtManagerApp/views/notifications.py @@ -1,12 +1,42 @@ from django.contrib.auth.decorators import login_required +from django.db.models import Q from django.http import HttpRequest, JsonResponse -from YtManagerApp.management.notification_manager import get_notifications +from YtManagerApp.models import JobExecution, JobMessage, JOB_STATES_MAP @login_required -def ajax_get_notifications(request: HttpRequest, last_id: int): - user = request.user - notifications = get_notifications(user, last_id) - notifications = list(notifications) - return JsonResponse(notifications, safe=False) +def ajax_get_running_jobs(request: HttpRequest): + jobs = JobExecution.objects\ + .filter(status=JOB_STATES_MAP['running'])\ + .filter(Q(user__isnull=True) | Q(user=request.user))\ + .order_by('start_date') + + response = [] + + for job in jobs: + last_progress_message = JobMessage.objects\ + .filter(job=job, progress__isnull=False, suppress_notification=False)\ + .order_by('-timestamp').first() + + last_message = JobMessage.objects\ + .filter(job=job, suppress_notification=False)\ + .order_by('-timestamp').first() + + message = '' + progress = 0 + + if last_message is not None: + message = last_message.message + if last_progress_message is not None: + progress = last_progress_message.progress + + response.append({ + 'id': job.id, + 'description': job.description, + 'progress': progress, + 'message': message + }) + + return JsonResponse(response, safe=False) + diff --git a/app/YtManagerApp/views/settings.py b/app/YtManagerApp/views/settings.py index a1329d7..36ecf93 100644 --- a/app/YtManagerApp/views/settings.py +++ b/app/YtManagerApp/views/settings.py @@ -3,7 +3,7 @@ from django.http import HttpResponseForbidden from django.urls import reverse_lazy from django.views.generic import FormView -from YtManagerApp.management.jobs.synchronize import schedule_synchronize_global +from YtManagerApp.management.jobs.synchronize import SynchronizeJob from YtManagerApp.views.forms.settings import SettingsForm, AdminSettingsForm @@ -45,5 +45,5 @@ class AdminSettingsView(LoginRequiredMixin, FormView): def form_valid(self, form): form.save() - schedule_synchronize_global() + SynchronizeJob.schedule_global_job() return super().form_valid(form) diff --git a/app/external/pytaw/pytaw/utils.py b/app/external/pytaw/pytaw/utils.py index 5bd503d..59db3fb 100644 --- a/app/external/pytaw/pytaw/utils.py +++ b/app/external/pytaw/pytaw/utils.py @@ -89,4 +89,4 @@ def iterate_chunks(iterable: typing.Iterable, chunk_size: int): chunk = tuple(itertools.islice(it, chunk_size)) if not chunk: return - yield chunk \ No newline at end of file + yield chunk