mirror of
https://github.com/chibicitiberiu/ytsm.git
synced 2024-02-24 05:43:31 +00:00
247 lines
10 KiB
Python
247 lines
10 KiB
Python
|
import errno
|
||
|
import itertools
|
||
|
import mimetypes
|
||
|
import os
|
||
|
from threading import Lock
|
||
|
from typing import Optional, List, Union
|
||
|
|
||
|
from apscheduler.triggers.cron import CronTrigger
|
||
|
from django.conf import settings
|
||
|
from django.db.models import Max
|
||
|
|
||
|
from YtManagerApp.models import *
|
||
|
from YtManagerApp.providers.video_provider import VideoProvider, InvalidURLError
|
||
|
from YtManagerApp.scheduler.job import Job
|
||
|
from YtManagerApp.utils.algorithms import group_by
|
||
|
|
||
|
_ENABLE_UPDATE_STATS = True
|
||
|
|
||
|
|
||
|
class SynchronizeJob(Job):
|
||
|
name = "SynchronizeJob"
|
||
|
__lock = Lock()
|
||
|
running = False
|
||
|
__global_sync_job = None
|
||
|
|
||
|
def __init__(self, job_execution, subscription: Optional[Subscription] = None):
|
||
|
super().__init__(job_execution)
|
||
|
self.__subscription: Optional[Subscription] = subscription
|
||
|
self.__new_vids: List[Video] = []
|
||
|
|
||
|
def get_description(self):
|
||
|
if self.__subscription is not None:
|
||
|
return "Running synchronization for subscription " + self.__subscription.name
|
||
|
return "Running synchronization..."
|
||
|
|
||
|
def get_subscription_list(self):
|
||
|
if self.__subscription is not None:
|
||
|
return [self.__subscription]
|
||
|
return Subscription.objects.all()
|
||
|
|
||
|
def get_videos_list(self, subs):
|
||
|
return Video.objects.filter(subscription__in=subs)
|
||
|
|
||
|
def run(self):
|
||
|
from YtManagerApp.services import Services
|
||
|
|
||
|
self.__lock.acquire(blocking=True)
|
||
|
SynchronizeJob.running = True
|
||
|
try:
|
||
|
self.log.info(self.get_description())
|
||
|
|
||
|
# Build list of work items
|
||
|
work_subs = self.get_subscription_list()
|
||
|
work_vids = self.get_videos_list(work_subs)
|
||
|
|
||
|
self.set_total_steps(len(work_subs) + len(work_vids))
|
||
|
|
||
|
# Remove the 'new' flag
|
||
|
work_vids.update(new=False)
|
||
|
|
||
|
# Process subscriptions
|
||
|
for sub in work_subs:
|
||
|
self.progress_advance(1, "Synchronizing subscription " + sub.name)
|
||
|
self.check_new_videos(sub)
|
||
|
self.fetch_missing_thumbnails(sub)
|
||
|
|
||
|
# Add new videos to progress calculation
|
||
|
self.set_total_steps(len(work_subs) + len(work_vids) + len(self.__new_vids))
|
||
|
|
||
|
# Group videos by provider
|
||
|
all_videos = itertools.chain(work_vids, self.__new_vids)
|
||
|
all_videos_by_provider = group_by(all_videos, lambda x: x.subscription.provider_id)
|
||
|
|
||
|
for provider_id, videos in all_videos_by_provider.items():
|
||
|
provider: VideoProvider = Services.videoProviderManager().get(provider_id)
|
||
|
if _ENABLE_UPDATE_STATS:
|
||
|
provider.update_videos(videos, update_statistics=True)
|
||
|
|
||
|
for video in videos:
|
||
|
self.progress_advance(1, "Updating video " + video.name)
|
||
|
self.check_video_deleted(video)
|
||
|
self.fetch_missing_thumbnails(video)
|
||
|
|
||
|
# Start downloading videos
|
||
|
for sub in work_subs:
|
||
|
Services.downloadManager().process_subscription(sub)
|
||
|
|
||
|
finally:
|
||
|
SynchronizeJob.running = False
|
||
|
self.__lock.release()
|
||
|
|
||
|
def check_new_videos(self, sub: Subscription):
|
||
|
from YtManagerApp.services import Services
|
||
|
provider: VideoProvider = Services.videoProviderManager().get(sub)
|
||
|
|
||
|
playlist_videos = provider.fetch_videos(sub)
|
||
|
if sub.rewrite_playlist_indices:
|
||
|
playlist_videos = sorted(playlist_videos, key=lambda x: x.publish_date)
|
||
|
else:
|
||
|
playlist_videos = sorted(playlist_videos, key=lambda x: x.playlist_index)
|
||
|
|
||
|
for item in playlist_videos:
|
||
|
results = Video.objects.filter(video_id=item.video_id, subscription=sub)
|
||
|
|
||
|
if not results.exists():
|
||
|
self.log.info('New video for subscription %s: %s %s"', sub, item.video_id, item.name)
|
||
|
|
||
|
# fix playlist index if necessary
|
||
|
if sub.rewrite_playlist_indices or Video.objects.filter(subscription=sub,
|
||
|
playlist_index=item.playlist_index).exists():
|
||
|
highest = Video.objects.filter(subscription=sub).aggregate(Max('playlist_index'))[
|
||
|
'playlist_index__max']
|
||
|
item.playlist_index = 1 + (highest or -1)
|
||
|
|
||
|
item.save()
|
||
|
self.__new_vids.append(item)
|
||
|
|
||
|
def fetch_missing_thumbnails(self, obj: Union[Subscription, Video]):
|
||
|
from YtManagerApp.services import Services
|
||
|
if obj.thumbnail.startswith("http"):
|
||
|
if isinstance(obj, Subscription):
|
||
|
obj.thumbnail = Services.downloadManager().fetch_thumbnail(obj.thumbnail, 'sub', obj.playlist_id,
|
||
|
settings.THUMBNAIL_SIZE_SUBSCRIPTION)
|
||
|
elif isinstance(obj, Video):
|
||
|
obj.thumbnail = Services.downloadManager().fetch_thumbnail(obj.thumbnail, 'video', obj.video_id,
|
||
|
settings.THUMBNAIL_SIZE_VIDEO)
|
||
|
obj.save()
|
||
|
|
||
|
def check_video_deleted(self, video: Video):
|
||
|
if video.downloaded_path is not None:
|
||
|
files = []
|
||
|
try:
|
||
|
files = list(video.get_files())
|
||
|
except OSError as e:
|
||
|
if e.errno != errno.ENOENT:
|
||
|
self.log.error("Could not access path %s. Error: %s", video.downloaded_path, e)
|
||
|
self.usr_err(f"Could not access path {video.downloaded_path}: {e}", suppress_notification=True)
|
||
|
return
|
||
|
|
||
|
# Try to find a valid video file
|
||
|
found_video = False
|
||
|
for file in files:
|
||
|
mime, _ = mimetypes.guess_type(file)
|
||
|
if mime is not None and mime.startswith("video"):
|
||
|
found_video = True
|
||
|
|
||
|
# Video not found, we can safely assume that the video was deleted.
|
||
|
if not found_video:
|
||
|
self.log.info("Video %d was deleted! [%s %s]", video.id, video.video_id, video.name)
|
||
|
# Clean up
|
||
|
for file in files:
|
||
|
try:
|
||
|
os.unlink(file)
|
||
|
except OSError as e:
|
||
|
self.log.error("Could not delete redundant file %s. Error: %s", file, e)
|
||
|
self.usr_err(f"Could not delete redundant file {file}: {e}", suppress_notification=True)
|
||
|
video.downloaded_path = None
|
||
|
|
||
|
# Mark watched?
|
||
|
user = video.subscription.user
|
||
|
if user.preferences['mark_deleted_as_watched']:
|
||
|
video.watched = True
|
||
|
|
||
|
video.save()
|
||
|
|
||
|
def update_video_stats(self, video: Video, yt_video):
|
||
|
if yt_video.n_likes is not None \
|
||
|
and yt_video.n_dislikes is not None \
|
||
|
and yt_video.n_likes + yt_video.n_dislikes > 0:
|
||
|
video.rating = yt_video.n_likes / (yt_video.n_likes + yt_video.n_dislikes)
|
||
|
|
||
|
video.views = yt_video.n_views
|
||
|
video.save()
|
||
|
|
||
|
|
||
|
class SubscriptionImporterJob(Job):
|
||
|
def __init__(self, job_execution, urls: List[str],
|
||
|
parent_folder: SubscriptionFolder,
|
||
|
auto_download: bool,
|
||
|
download_limit: int,
|
||
|
download_order: str,
|
||
|
automatically_delete_watched: bool):
|
||
|
|
||
|
super().__init__(job_execution)
|
||
|
self._urls = urls
|
||
|
self._parent_folder = parent_folder
|
||
|
self._auto_download = auto_download
|
||
|
self._download_limit = download_limit
|
||
|
self._download_order = download_order
|
||
|
self._automatically_delete_watched = automatically_delete_watched
|
||
|
|
||
|
def get_description(self):
|
||
|
return f"Importing {len(self._urls)} subscriptions..."
|
||
|
|
||
|
def run(self):
|
||
|
from YtManagerApp.services import Services
|
||
|
self.set_total_steps(len(self._urls))
|
||
|
for url in self._urls:
|
||
|
try:
|
||
|
self.progress_advance(progress_msg=url)
|
||
|
sub: Subscription = Services.videoProviderManager().fetch_subscription(url)
|
||
|
sub.parent_folder = self._parent_folder
|
||
|
sub.auto_download = self._auto_download
|
||
|
sub.download_limit = self._download_limit
|
||
|
sub.download_order = self._download_order
|
||
|
sub.automatically_delete_watched = self._automatically_delete_watched
|
||
|
sub.save()
|
||
|
except InvalidURLError as e:
|
||
|
self.log.error("Error importing URL %s: %s", url, e)
|
||
|
except ValueError as e:
|
||
|
self.log.error("Error importing URL %s: %s", url, e)
|
||
|
|
||
|
|
||
|
class SubscriptionManager(object):
|
||
|
def __init__(self):
|
||
|
self.__global_sync_job = None
|
||
|
|
||
|
def synchronize(self, sub: Subscription):
|
||
|
from YtManagerApp.services import Services
|
||
|
Services.scheduler().add_job(SynchronizeJob, args=[sub])
|
||
|
|
||
|
def synchronize_all(self):
|
||
|
from YtManagerApp.services import Services
|
||
|
Services.scheduler().add_job(SynchronizeJob, max_instances=1, coalesce=True)
|
||
|
|
||
|
def schedule_global_synchronize_job(self):
|
||
|
from YtManagerApp.services import Services
|
||
|
trigger = CronTrigger.from_crontab(Services.appConfig().sync_schedule)
|
||
|
|
||
|
if self.__global_sync_job is None:
|
||
|
trigger = CronTrigger.from_crontab(Services.appConfig().sync_schedule)
|
||
|
SynchronizeJob.__global_sync_job = Services.scheduler().add_job(SynchronizeJob, trigger, max_instances=1,
|
||
|
coalesce=True)
|
||
|
|
||
|
else:
|
||
|
self.__global_sync_job.reschedule(trigger, max_instances=1, coalesce=True)
|
||
|
|
||
|
def import_multiple(self, urls: List[str],
|
||
|
parent_folder: SubscriptionFolder,
|
||
|
auto_download: bool,
|
||
|
download_limit: int,
|
||
|
download_order: str,
|
||
|
automatically_delete_watched: bool):
|
||
|
from YtManagerApp.services import Services
|
||
|
Services.scheduler().add_job(SubscriptionImporterJob, args=[urls, parent_folder, auto_download, download_limit,
|
||
|
download_order, automatically_delete_watched])
|