ytsm/app/YtManagerApp/services/scheduler/jobs/synchronize_job.py

184 lines
7.3 KiB
Python
Raw Normal View History

import errno
2019-08-14 14:14:16 +00:00
import itertools
from threading import Lock
from apscheduler.triggers.cron import CronTrigger
from django.db.models import Max
from django.conf import settings
2018-10-10 22:43:50 +00:00
2019-12-18 22:27:06 +00:00
from YtManagerApp.services.downloader import fetch_thumbnail, downloader_process_subscription
2018-10-13 20:01:45 +00:00
from YtManagerApp.models import *
2019-12-18 22:27:06 +00:00
from YtManagerApp.services.scheduler.job import Job
from YtManagerApp.services import Services
2019-08-14 14:14:16 +00:00
from external.pytaw.pytaw.utils import iterate_chunks
2018-10-10 22:43:50 +00:00
# Feature switch read by SynchronizeJob.run(): when true, every batch of videos
# is looked up through the YouTube API and the returned statistics are written
# back to the matching Video rows; when false, that lookup is skipped.
_ENABLE_UPDATE_STATS = True
2018-10-10 22:43:50 +00:00
2019-08-14 14:14:16 +00:00
class SynchronizeJob(Job):
    """Job that synchronizes subscriptions and their videos with YouTube.

    A run covers either a single subscription (when one is passed to the
    constructor) or every subscription in the database. For the selected
    subscriptions it:

    1. clears the ``new`` flag on the already known videos,
    2. creates ``Video`` rows for playlist items that are not stored yet,
    3. downloads thumbnails that are still referenced by an ``http`` URL,
    4. detects videos whose downloaded files are gone,
    5. refreshes rating / view statistics (if ``_ENABLE_UPDATE_STATS``),
    6. hands every subscription to ``downloader_process_subscription``.

    Runs are serialized through a class-level lock, so only one
    synchronization executes at a time within the process.
    """

    name = "SynchronizeJob"

    # Shared by all instances; held for the whole duration of run().
    __lock = Lock()
    # True while some instance is inside run(); set and cleared by run().
    running = False
    # Handle returned by the scheduler for the recurring (cron) job, or None
    # until schedule_global_job() has been called for the first time.
    __global_sync_job = None

    def __init__(self, job_execution, subscription: Optional[Subscription] = None):
        """Create the job.

        :param job_execution: forwarded unchanged to ``Job.__init__``.
        :param subscription: limit the run to this subscription; ``None``
            (the default) synchronizes all subscriptions.
        """
        super().__init__(job_execution)
        self.__subscription = subscription
        self.__api = youtube.YoutubeAPI.build_public()
        # Videos created by check_new_videos() during this run.
        self.__new_vids = []

    def get_description(self):
        """Return a human-readable description of what this run covers."""
        if self.__subscription is not None:
            return "Running synchronization for subscription " + self.__subscription.name
        return "Running synchronization..."

    def get_subscription_list(self):
        """Return the subscriptions to process.

        This is a one-element list for a single-subscription run, otherwise
        the ``Subscription.objects.all()`` queryset.
        """
        if self.__subscription is not None:
            return [self.__subscription]
        return Subscription.objects.all()

    def get_videos_list(self, subs):
        """Return a queryset of the stored videos that belong to ``subs``."""
        return Video.objects.filter(subscription__in=subs)

    def run(self):
        """Execute the synchronization while holding the class-level lock.

        ``SynchronizeJob.running`` is True for the duration of the call and
        is reset even when a step raises.
        """
        # The context manager releases the lock on every exit path.
        with self.__lock:
            SynchronizeJob.running = True
            try:
                self.log.info(self.get_description())

                # Build list of work items
                work_subs = self.get_subscription_list()
                work_vids = self.get_videos_list(work_subs)
                self.set_total_steps(len(work_subs) + len(work_vids))

                # Remove the 'new' flag
                work_vids.update(new=False)

                # Process subscriptions
                for sub in work_subs:
                    self.progress_advance(1, "Synchronizing subscription " + sub.name)
                    self.check_new_videos(sub)
                    self.fetch_missing_thumbnails(sub)

                # Add new videos to progress calculation
                # NOTE(review): work_vids is a queryset; if it is re-evaluated
                # from here on it may already contain the videos created
                # above, which are also listed in __new_vids. Confirm that
                # new videos are not processed twice.
                self.set_total_steps(len(work_subs) + len(work_vids) + len(self.__new_vids))

                # Process videos in chunks of 50 so that statistics can be
                # requested for a whole chunk with one API call.
                all_videos = itertools.chain(work_vids, self.__new_vids)
                for batch in iterate_chunks(all_videos, 50):
                    # NOTE(review): batch is iterated twice below; this
                    # assumes iterate_chunks yields re-iterable chunks
                    # (e.g. lists/tuples) - confirm.
                    video_stats = {}
                    if _ENABLE_UPDATE_STATS:
                        batch_ids = [video.video_id for video in batch]
                        video_stats = {
                            v.id: v
                            for v in self.__api.videos(batch_ids, part='id,statistics')
                        }

                    for video in batch:
                        self.progress_advance(1, "Updating video " + video.name)
                        self.check_video_deleted(video)
                        self.fetch_missing_thumbnails(video)
                        if video.video_id in video_stats:
                            self.update_video_stats(video, video_stats[video.video_id])

                # Start downloading videos
                for sub in work_subs:
                    downloader_process_subscription(sub)
            finally:
                SynchronizeJob.running = False

    def check_new_videos(self, sub: Subscription):
        """Create ``Video`` rows for playlist items of ``sub`` not stored yet.

        Items are visited in publication order when
        ``sub.rewrite_playlist_indices`` is set, otherwise in playlist
        position order. Every created video is appended to the list of new
        videos handled later by :meth:`run`.
        """
        playlist_items = self.__api.playlist_items(sub.playlist_id)
        if sub.rewrite_playlist_indices:
            playlist_items = sorted(playlist_items, key=lambda x: x.published_at)
        else:
            playlist_items = sorted(playlist_items, key=lambda x: x.position)

        for item in playlist_items:
            known = Video.objects.filter(video_id=item.resource_video_id, subscription=sub)
            if known.exists():
                continue

            self.log.info('New video for subscription %s: %s %s', sub, item.resource_video_id, item.title)

            # Fix playlist index if necessary: either indices are always
            # rewritten for this subscription, or the reported position is
            # already taken by a stored video.
            position_taken = Video.objects.filter(
                subscription=sub, playlist_index=item.position).exists()
            if sub.rewrite_playlist_indices or position_taken:
                highest = Video.objects.filter(subscription=sub) \
                    .aggregate(Max('playlist_index'))['playlist_index__max']
                # The aggregate is None when no video is stored yet. Compare
                # against None explicitly: a highest index of 0 is a valid
                # value and must yield 1, not 0 again.
                item.position = 0 if highest is None else highest + 1

            self.__new_vids.append(Video.create(item, sub))

    def fetch_missing_thumbnails(self, obj: Union[Subscription, Video]):
        """Replace a remote thumbnail URL on ``obj`` with a fetched thumbnail.

        Nothing happens unless ``obj.thumbnail`` starts with ``"http"``.
        Otherwise the value returned by ``fetch_thumbnail`` is stored in
        ``obj.thumbnail`` and ``obj`` is saved.
        """
        if not obj.thumbnail.startswith("http"):
            return

        if isinstance(obj, Subscription):
            obj.thumbnail = fetch_thumbnail(
                obj.thumbnail, 'sub', obj.playlist_id, settings.THUMBNAIL_SIZE_SUBSCRIPTION)
        elif isinstance(obj, Video):
            # NOTE(review): this uses `provider_id` while run() and
            # check_new_videos() use `video_id` - confirm that both
            # attributes exist on Video.
            obj.thumbnail = fetch_thumbnail(
                obj.thumbnail, 'video', obj.provider_id, settings.THUMBNAIL_SIZE_VIDEO)
        obj.save()

    def check_video_deleted(self, video: Video):
        """Detect that the downloaded files of ``video`` were removed.

        Applies only to videos with a ``downloaded_path``. If none of the
        files reported by ``video.get_files()`` has a ``video/*`` MIME type
        (guessed from the file name), the video is treated as deleted: the
        remaining files are unlinked, ``downloaded_path`` is cleared, the
        video is optionally marked as watched (user preference
        ``mark_deleted_as_watched``) and saved.
        """
        if video.downloaded_path is None:
            return

        files = []
        try:
            files = list(video.get_files())
        except OSError as e:
            # ENOENT falls through with an empty file list, i.e. the video
            # counts as deleted. Any other error leaves the state unknown,
            # so report it and keep the video untouched.
            if e.errno != errno.ENOENT:
                self.log.error("Could not access path %s. Error: %s", video.downloaded_path, e)
                self.usr_err(f"Could not access path {video.downloaded_path}: {e}", suppress_notification=True)
                return

        # Try to find a valid video file
        mime_types = (mimetypes.guess_type(file)[0] for file in files)
        if any(mime is not None and mime.startswith("video") for mime in mime_types):
            return

        # Video not found, we can safely assume that the video was deleted.
        self.log.info("Video %d was deleted! [%s %s]", video.id, video.provider_id, video.name)

        # Clean up
        for file in files:
            try:
                os.unlink(file)
            except OSError as e:
                # Best effort: report the failure and continue.
                self.log.error("Could not delete redundant file %s. Error: %s", file, e)
                self.usr_err(f"Could not delete redundant file {file}: {e}", suppress_notification=True)
        video.downloaded_path = None

        # Mark watched?
        user = video.subscription.user
        if user.preferences['mark_deleted_as_watched']:
            video.watched = True
        video.save()

    def update_video_stats(self, video: Video, yt_video):
        """Copy rating and view count from the API result onto ``video``.

        The rating is ``likes / (likes + dislikes)`` and is only updated
        when both counters are present and their sum is positive; the view
        count is always overwritten. The video is saved afterwards.
        """
        likes = yt_video.n_likes
        dislikes = yt_video.n_dislikes
        if likes is not None and dislikes is not None and likes + dislikes > 0:
            video.rating = likes / (likes + dislikes)
        video.views = yt_video.n_views
        video.save()

    @staticmethod
    def schedule_global_job():
        """Register or re-schedule the recurring synchronization job.

        The cron expression is read from ``Services.appConfig().sync_schedule``.
        The first call adds the job to the scheduler; later calls
        re-schedule the existing job with a freshly built trigger.
        """
        trigger = CronTrigger.from_crontab(Services.appConfig().sync_schedule)

        if SynchronizeJob.__global_sync_job is None:
            SynchronizeJob.__global_sync_job = Services.scheduler().add_job(
                SynchronizeJob, trigger, max_instances=1, coalesce=True)
        else:
            SynchronizeJob.__global_sync_job.reschedule(trigger, max_instances=1, coalesce=True)

    @staticmethod
    def schedule_now():
        """Add a synchronization of all subscriptions to the scheduler.

        No trigger is passed; presumably the scheduler then runs the job as
        soon as possible - confirm against the scheduler service.
        """
        # Services.scheduler is called to obtain the scheduler, exactly as in
        # schedule_global_job().
        Services.scheduler().add_job(SynchronizeJob, max_instances=1, coalesce=True)

    @staticmethod
    def schedule_now_for_subscription(subscription):
        """Add a synchronization limited to ``subscription`` to the scheduler.

        The subscription's user is passed along as ``user`` and the
        subscription itself as the job's positional argument.
        """
        Services.scheduler().add_job(SynchronizeJob, user=subscription.user, args=[subscription])