Began work on refactoring the YTSM application.

This commit is contained in:
2019-12-16 22:19:50 +02:00
parent 794b9bd42d
commit fd5d05232f
23 changed files with 475 additions and 347 deletions

View File

@@ -0,0 +1,118 @@
import logging
from abc import abstractmethod
from typing import Optional
from YtManagerApp.models import JOB_MESSAGE_LEVELS_MAP, JobMessage
from .progress_tracker import ProgressTracker
class Job(object):
"""
Base class for jobs running in the scheduler.
"""
name = 'GenericJob'
def __init__(self, job_execution, *_):
self.job_execution = job_execution
self.log = logging.getLogger(self.name)
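# Progress reported by the tracker is forwarded to __on_progress, which surfaces it as user log messages.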
self.__progress_tracker = ProgressTracker(listener=Job.__on_progress,
listener_args=[self])
@abstractmethod
def get_description(self) -> str:
"""
Gets a user-friendly description of this job.
Should be overridden in job classes.
:return: A short description shown to the user
"""
return "Running job..."
#
# progress tracking
#
def __on_progress(self, percent: float, message: str):
self.usr_log(message, progress=percent)
def set_total_steps(self, steps: float):
"""
Sets the total number of work steps this job has. This is used for tracking progress.
:param steps: Total number of steps
"""
self.__progress_tracker.total_steps = steps
def progress_advance(self, steps: float = 1, progress_msg: str = ''):
"""
Advances a number of steps.
:param steps: Number of steps to advance
:param progress_msg: A message which will be passed to a listener
:return:
"""
self.__progress_tracker.advance(steps, progress_msg)
def create_subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0):
"""
Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress.
:param steps: Number of steps the subtask is 'worth'
:param subtask_total_steps: Total number of steps for subtask
:param subtask_initial_steps: Initial steps for subtask
:return: ProgressTracker for subtask
"""
return self.__progress_tracker.subtask(steps, subtask_total_steps, subtask_initial_steps)
#
# user log messages
#
def usr_log(self, message, progress: Optional[float] = None, level: int = JOB_MESSAGE_LEVELS_MAP['normal'],
suppress_notification: bool = False):
"""
Creates a new log message which will be shown on the user interface.
Progress can also be updated using this method.
:param message: A message to be displayed to the user
:param progress: Progress percentage in [0,1] interval
:param level: Log level (normal, warning, error)
:param suppress_notification: If set to true, a notification will not be displayed to the user, but it will
still appear in the system logs.
:return:
"""
message = JobMessage(job=self.job_execution,
progress=progress,
message=message,
level=level,
suppress_notification=suppress_notification)
message.save()
def usr_warn(self, message, progress: Optional[float] = None, suppress_notification: bool = False):
"""
Creates a new warning message which will be shown on the user interface.
Progress can also be updated using this method.
:param message: A message to be displayed to the user
:param progress: Progress percentage in [0,1] interval
:param suppress_notification: If set to true, a notification will not be displayed to the user, but it will
still appear in the system logs.
:return:
"""
self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['warning'], suppress_notification)
def usr_err(self, message, progress: Optional[float] = None, suppress_notification: bool = False):
"""
Creates a new error message which will be shown on the user interface.
Progress can also be updated using this method.
:param message: A message to be displayed to the user
:param progress: Progress percentage in [0,1] interval
:param suppress_notification: If set to true, a notification will not be displayed to the user, but it will
still appear in the system logs.
:return:
"""
self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['error'], suppress_notification)
#
# main run method
#
@abstractmethod
def run(self):
pass
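
For orientation, a minimal sketch of how a job might build on this base class; the CleanupTempFilesJob name and its work items are hypothetical and not part of this commit:

from YtManagerApp.scheduler.job import Job

class CleanupTempFilesJob(Job):
    name = "CleanupTempFilesJob"

    def __init__(self, job_execution, *_):
        super().__init__(job_execution)

    def get_description(self) -> str:
        return "Cleaning up temporary files"

    def run(self):
        items = ["a.tmp", "b.tmp", "c.tmp"]   # placeholder work items
        self.set_total_steps(len(items))
        for item in items:
            self.progress_advance(1, "Removing " + item)
        self.usr_log("Cleanup finished", progress=1.0)

# Scheduling follows the same pattern as the jobs below:
# Services.scheduler.add_job(CleanupTempFilesJob)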

View File

@@ -0,0 +1,47 @@
import os
from YtManagerApp.models import Video
from YtManagerApp.scheduler.job import Job
class DeleteVideoJob(Job):
name = "DeleteVideoJob"
def __init__(self, job_execution, video: Video):
super().__init__(job_execution)
self._video = video
def get_description(self):
return f"Deleting video {self._video}"
def run(self):
count = 0
try:
for file in self._video.get_files():
self.log.info("Deleting file %s", file)
count += 1
try:
os.unlink(file)
except OSError as e:
self.log.error("Failed to delete file %s: Error: %s", file, e)
except OSError as e:
self.log.error("Failed to delete video %d [%s %s]. Error: %s", self._video.id,
self._video.video_id, self._video.name, e)
self._video.downloaded_path = None
self._video.save()
self.log.info('Deleted video %d successfully! (%d files) [%s %s]', self._video.id, count,
self._video.video_id, self._video.name)
@staticmethod
def schedule(video: Video):
"""
Schedules a delete video job to run immediately.
:param video: Video whose downloaded files should be deleted
"""
from YtManagerApp.services import Services
Services.scheduler.add_job(DeleteVideoJob, args=[video])

View File

@@ -0,0 +1,136 @@
import os
import re
from string import Template
from threading import Lock
import youtube_dl
from YtManagerApp.models import Video
from YtManagerApp.scheduler.job import Job
class DownloadVideoJob(Job):
name = "DownloadVideoJob"
__lock = Lock()
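# Class-level lock shared by all DownloadVideoJob instances, so only one download runs at a time (see run()).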
def __init__(self, job_execution, video: Video, attempt: int = 1):
super().__init__(job_execution)
self._video = video
self._attempt = attempt
self._log_youtube_dl = self.log.getChild('youtube_dl')
def get_description(self):
ret = "Downloading video " + self._video.name
if self._attempt > 1:
ret += f" (attempt {self._attempt})"
return ret
def run(self):
# Issue: if multiple videos are downloaded at the same time, a race condition appears in the mkdirs() call that
# youtube-dl makes, which causes it to fail with the error 'Cannot create folder - file already exists'.
# For now, allow a single download instance.
self.__lock.acquire()
try:
user = self._video.subscription.user
max_attempts = user.preferences['max_download_attempts']
youtube_dl_params, output_path = self.__build_youtube_dl_params(self._video)
with youtube_dl.YoutubeDL(youtube_dl_params) as yt:
ret = yt.download(["https://www.youtube.com/watch?v=" + self._video.video_id])
self.log.info('Download finished with code %d', ret)
if ret == 0:
self._video.downloaded_path = output_path
self._video.save()
self.log.info('Video %d [%s %s] downloaded successfully!', self._video.id, self._video.video_id,
self._video.name)
elif self._attempt <= max_attempts:
self.log.warning('Re-enqueueing video (attempt %d/%d)', self._attempt, max_attempts)
DownloadVideoJob.schedule(self._video, self._attempt + 1)
else:
self.log.error('Multiple attempts to download video %d [%s %s] failed!', self._video.id,
self._video.video_id, self._video.name)
self._video.downloaded_path = ''
self._video.save()
finally:
self.__lock.release()
def __build_youtube_dl_params(self, video: Video):
sub = video.subscription
user = sub.user
# resolve path
download_path = user.preferences['download_path']
template_dict = self.__build_template_dict(video)
output_pattern = Template(user.preferences['download_file_pattern']).safe_substitute(template_dict)
output_path = os.path.join(download_path, output_pattern)
output_path = os.path.normpath(output_path)
youtube_dl_params = {
'logger': self._log_youtube_dl,
'format': user.preferences['download_format'],
'outtmpl': output_path,
'writethumbnail': True,
'writedescription': True,
'writesubtitles': user.preferences['download_subtitles'],
'writeautomaticsub': user.preferences['download_autogenerated_subtitles'],
'allsubtitles': user.preferences['download_subtitles_all'],
'merge_output_format': 'mp4',
'postprocessors': [
{
'key': 'FFmpegMetadata'
},
]
}
sub_langs = user.preferences['download_subtitles_langs'].split(',')
sub_langs = [i.strip() for i in sub_langs]
if len(sub_langs) > 0:
youtube_dl_params['subtitleslangs'] = sub_langs
sub_format = user.preferences['download_subtitles_format']
if len(sub_format) > 0:
youtube_dl_params['subtitlesformat'] = sub_format
return youtube_dl_params, output_path
def __build_template_dict(self, video: Video):
return {
'channel': video.subscription.channel_name,
'channel_id': video.subscription.channel_id,
'playlist': video.subscription.name,
'playlist_id': video.subscription.playlist_id,
'playlist_index': "{:03d}".format(1 + video.playlist_index),
'title': video.name,
'id': video.video_id,
}
def __get_valid_path(self, path):
"""
Normalizes the string to ASCII and removes or replaces characters that are not valid in file paths.
"""
import unicodedata
value = unicodedata.normalize('NFKD', path).encode('ascii', 'ignore').decode('ascii')
value = re.sub('[:"*]', '', value).strip()
value = re.sub('[?<>|]', '#', value)
return value
@staticmethod
def schedule(video: Video, attempt: int = 1):
"""
Schedules a download video job to run immediately.
:param video: Video to download
:param attempt: Current download attempt number
:return:
"""
from YtManagerApp.services import Services
Services.scheduler.add_job(DownloadVideoJob, args=[video, attempt])
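
As an aside, a small sketch of how __build_template_dict feeds the user's download_file_pattern through string.Template; the pattern and the values below are illustrative examples, not YTSM defaults:

from string import Template

template_dict = {
    'channel': 'SomeChannel',
    'channel_id': 'UC0000000000000000000000',
    'playlist': 'Uploads',
    'playlist_id': 'PL0000000000000000000000',
    'playlist_index': '007',
    'title': 'An interesting video',
    'id': 'dQw4w9WgXcQ',
}
pattern = '${channel}/${playlist}/${playlist_index} - ${title} [${id}]'  # example pattern only
print(Template(pattern).safe_substitute(template_dict))
# SomeChannel/Uploads/007 - An interesting video [dQw4w9WgXcQ]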

View File

@@ -0,0 +1,184 @@
import errno
import itertools
import mimetypes
import os
from threading import Lock
from typing import Optional, Union
from apscheduler.triggers.cron import CronTrigger
from django.db.models import Max
from django.conf import settings
from YtManagerApp.management.downloader import fetch_thumbnail, downloader_process_subscription
from YtManagerApp.models import *
from YtManagerApp.scheduler.job import Job
from YtManagerApp.services import Services
from YtManagerApp.utils import youtube
from external.pytaw.pytaw.utils import iterate_chunks
_ENABLE_UPDATE_STATS = True
class SynchronizeJob(Job):
name = "SynchronizeJob"
__lock = Lock()
running = False
__global_sync_job = None
def __init__(self, job_execution, subscription: Optional[Subscription] = None):
super().__init__(job_execution)
self.__subscription = subscription
self.__api = youtube.YoutubeAPI.build_public()
self.__new_vids = []
def get_description(self):
if self.__subscription is not None:
return "Running synchronization for subscription " + self.__subscription.name
return "Running synchronization..."
def get_subscription_list(self):
if self.__subscription is not None:
return [self.__subscription]
return Subscription.objects.all()
def get_videos_list(self, subs):
return Video.objects.filter(subscription__in=subs)
def run(self):
self.__lock.acquire(blocking=True)
SynchronizeJob.running = True
try:
self.log.info(self.get_description())
# Build list of work items
work_subs = self.get_subscription_list()
work_vids = self.get_videos_list(work_subs)
self.set_total_steps(len(work_subs) + len(work_vids))
# Remove the 'new' flag
work_vids.update(new=False)
# Process subscriptions
for sub in work_subs:
self.progress_advance(1, "Synchronizing subscription " + sub.name)
self.check_new_videos(sub)
self.fetch_missing_thumbnails(sub)
# Add new videos to progress calculation
self.set_total_steps(len(work_subs) + len(work_vids) + len(self.__new_vids))
# Process videos
all_videos = itertools.chain(work_vids, self.__new_vids)
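# Chunks of 50: the YouTube Data API accepts at most 50 video IDs per videos() request.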
for batch in iterate_chunks(all_videos, 50):
video_stats = {}
if _ENABLE_UPDATE_STATS:
batch_ids = [video.video_id for video in batch]
video_stats = {v.id: v for v in self.__api.videos(batch_ids, part='id,statistics')}
for video in batch:
self.progress_advance(1, "Updating video " + video.name)
self.check_video_deleted(video)
self.fetch_missing_thumbnails(video)
if video.video_id in video_stats:
self.update_video_stats(video, video_stats[video.video_id])
# Start downloading videos
for sub in work_subs:
downloader_process_subscription(sub)
finally:
SynchronizeJob.running = False
self.__lock.release()
def check_new_videos(self, sub: Subscription):
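# When rewrite_playlist_indices is enabled, items are ordered by publish date and assigned fresh sequential indices.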
playlist_items = self.__api.playlist_items(sub.playlist_id)
if sub.rewrite_playlist_indices:
playlist_items = sorted(playlist_items, key=lambda x: x.published_at)
else:
playlist_items = sorted(playlist_items, key=lambda x: x.position)
for item in playlist_items:
results = Video.objects.filter(video_id=item.resource_video_id, subscription=sub)
if not results.exists():
self.log.info('New video for subscription %s: %s %s', sub, item.resource_video_id, item.title)
# fix playlist index if necessary
if sub.rewrite_playlist_indices or Video.objects.filter(subscription=sub, playlist_index=item.position).exists():
highest = Video.objects.filter(subscription=sub).aggregate(Max('playlist_index'))['playlist_index__max']
item.position = 1 + (highest or -1)
self.__new_vids.append(Video.create(item, sub))
def fetch_missing_thumbnails(self, obj: Union[Subscription, Video]):
if obj.thumbnail.startswith("http"):
if isinstance(obj, Subscription):
obj.thumbnail = fetch_thumbnail(obj.thumbnail, 'sub', obj.playlist_id, settings.THUMBNAIL_SIZE_SUBSCRIPTION)
elif isinstance(obj, Video):
obj.thumbnail = fetch_thumbnail(obj.thumbnail, 'video', obj.video_id, settings.THUMBNAIL_SIZE_VIDEO)
obj.save()
def check_video_deleted(self, video: Video):
if video.downloaded_path is not None:
files = []
try:
files = list(video.get_files())
except OSError as e:
if e.errno != errno.ENOENT:
self.log.error("Could not access path %s. Error: %s", video.downloaded_path, e)
self.usr_err(f"Could not access path {video.downloaded_path}: {e}", suppress_notification=True)
return
# Try to find a valid video file
found_video = False
for file in files:
mime, _ = mimetypes.guess_type(file)
if mime is not None and mime.startswith("video"):
found_video = True
# Video not found, we can safely assume that the video was deleted.
if not found_video:
self.log.info("Video %d was deleted! [%s %s]", video.id, video.video_id, video.name)
# Clean up
for file in files:
try:
os.unlink(file)
except OSError as e:
self.log.error("Could not delete redundant file %s. Error: %s", file, e)
self.usr_err(f"Could not delete redundant file {file}: {e}", suppress_notification=True)
video.downloaded_path = None
# Mark watched?
user = video.subscription.user
if user.preferences['mark_deleted_as_watched']:
video.watched = True
video.save()
def update_video_stats(self, video: Video, yt_video):
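# Rating is the like ratio, e.g. 90 likes and 10 dislikes give a rating of 0.9.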
if yt_video.n_likes is not None \
and yt_video.n_dislikes is not None \
and yt_video.n_likes + yt_video.n_dislikes > 0:
video.rating = yt_video.n_likes / (yt_video.n_likes + yt_video.n_dislikes)
video.views = yt_video.n_views
video.save()
@staticmethod
def schedule_global_job():
trigger = CronTrigger.from_crontab(Services.appConfig.sync_schedule)
if SynchronizeJob.__global_sync_job is None:
SynchronizeJob.__global_sync_job = Services.scheduler.add_job(SynchronizeJob, trigger, max_instances=1, coalesce=True)
else:
SynchronizeJob.__global_sync_job.reschedule(trigger)
@staticmethod
def schedule_now():
Services.scheduler.add_job(SynchronizeJob, max_instances=1, coalesce=True)
@staticmethod
def schedule_now_for_subscription(subscription):
Services.scheduler.add_job(SynchronizeJob, user=subscription.user, args=[subscription])

View File

@@ -0,0 +1,83 @@
from typing import Callable, List, Any, Optional
class ProgressTracker(object):
"""
Class which helps keep track of complex operation progress.
"""
def __init__(self, total_steps: float = 100, initial_steps: float = 0,
listener: Optional[Callable[..., None]] = None,
listener_args: Optional[List[Any]] = None,
parent: Optional["ProgressTracker"] = None):
"""
Constructor
:param total_steps: Total number of steps required by this operation
:param initial_steps: Starting steps
:param listener: Callable which is called whenever any progress happens
:param listener_args: Extra arguments passed to the listener before the progress value and message
:param parent: Parent progress tracker
"""
self.total_steps = total_steps
self.steps = initial_steps
self.__subtask: Optional["ProgressTracker"] = None
self.__subtask_steps = 0
self.__parent = parent
self.__listener = listener
self.__listener_args = listener_args or []
def __on_progress(self, progress_msg):
if self.__listener is not None:
self.__listener(*self.__listener_args, self.compute_progress(), progress_msg)
if self.__parent is not None:
self.__parent.__on_progress(progress_msg)
def advance(self, steps: float = 1, progress_msg: str = ''):
"""
Advances a number of steps.
:param steps: Number of steps to advance
:param progress_msg: A message which will be passed to a listener
:return:
"""
# We can assume previous subtask is now completed
if self.__subtask is not None:
self.steps += self.__subtask_steps
self.__subtask = None
self.steps += steps
self.__on_progress(progress_msg)
def subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0):
"""
Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress.
:param steps: Number of steps the subtask is 'worth'
:param subtask_total_steps: Total number of steps for subtask
:param subtask_initial_steps: Initial steps for subtask
:return: ProgressTracker for subtask
"""
# We can assume previous subtask is now completed
if self.__subtask is not None:
self.steps += self.__subtask_steps
self.__subtask = ProgressTracker(total_steps=subtask_total_steps,
initial_steps=subtask_initial_steps,
parent=self)
self.__subtask_steps = steps
return self.__subtask
def compute_progress(self):
"""
Calculates the overall progress value, including any active subtask.
:return: Value in the [0,1] interval representing progress
"""
base = float(self.steps) / self.total_steps
if self.__subtask is not None:
base += self.__subtask.compute_progress() * self.__subtask_steps / self.total_steps
return min(base, 1.0)
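
A short illustration of how nested progress combines under compute_progress; the print listener and the step counts are only for demonstration:

tracker = ProgressTracker(total_steps=4,
                          listener=lambda pct, msg: print(f"{pct:.0%} {msg}"))
tracker.advance(1, "prepared")                 # prints "25% prepared"
sub = tracker.subtask(steps=2, subtask_total_steps=10)
sub.advance(5, "halfway through the subtask")  # prints "50% halfway through the subtask"
tracker.advance(1, "done")                     # previous subtask counts as complete -> prints "100% done"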

View File

@@ -0,0 +1,75 @@
import datetime
import logging
import traceback
from typing import Type, Union, Optional
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.base import BaseTrigger
from django.contrib.auth.models import User
from YtManagerApp.management.appconfig import AppConfig
from YtManagerApp.models import JobExecution, JOB_STATES_MAP
from YtManagerApp.scheduler.job import Job
class YtsmScheduler(object):
def __init__(self, app_config: AppConfig):
self._ap_scheduler = BackgroundScheduler()
self._app_config = app_config
def initialize(self):
# set state of existing jobs as "interrupted"
JobExecution.objects\
.filter(status=JOB_STATES_MAP['running'])\
.update(status=JOB_STATES_MAP['interrupted'])
self._configure_scheduler()
self._ap_scheduler.start()
def _configure_scheduler(self):
logger = logging.getLogger('scheduler')
executors = {
'default': {
'type': 'threadpool',
'max_workers': self._app_config.concurrency
}
}
job_defaults = {
'misfire_grace_time': 60 * 60 * 24 * 365 # 1 year
}
self._ap_scheduler.configure(logger=logger, executors=executors, job_defaults=job_defaults)
def _run_job(self, job_class: Type[Job], user: Optional[User], args: Union[tuple, list]):
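# Each run gets its own JobExecution record; user-facing JobMessage entries attach to it (see Job.usr_log).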
job_execution = JobExecution(user=user, status=JOB_STATES_MAP['running'])
job_execution.save()
job_instance = job_class(job_execution, *args)
# update description
job_execution.description = job_instance.get_description()
job_execution.save()
try:
job_instance.run()
job_execution.status = JOB_STATES_MAP['finished']
except Exception as ex:
job_instance.log.critical("Job failed with exception: %s", traceback.format_exc())
job_instance.usr_err(job_instance.name + " operation failed: " + str(ex))
job_execution.status = JOB_STATES_MAP['failed']
finally:
job_execution.end_date = datetime.datetime.now(tz=pytz.UTC)
job_execution.save()
def add_job(self, job_class: Type[Job], trigger: Optional[Union[str, BaseTrigger]] = None,
args: Union[list, tuple] = None,
user: Optional[User] = None,
**kwargs):
if args is None:
args = []
return self._ap_scheduler.add_job(YtsmScheduler._run_job, trigger=trigger, args=[self, job_class, user, args],
**kwargs)
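
Finally, a rough, hedged sketch of how this scheduler could be wired up; the app_config object and the crontab string are placeholders, and the real initialization lives elsewhere in YTSM:

from apscheduler.triggers.cron import CronTrigger

scheduler = YtsmScheduler(app_config)   # app_config: an AppConfig instance (placeholder here)
scheduler.initialize()                  # marks stale 'running' executions as interrupted, starts APScheduler

# Example only: run SynchronizeJob at the top of every hour
scheduler.add_job(SynchronizeJob, trigger=CronTrigger.from_crontab("0 * * * *"),
                  max_instances=1, coalesce=True)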