import logging
import os
from typing import Callable, Union, Any, Optional
from django.contrib.auth.models import User
from django.db import models
from django.db.models.functions import Lower
from YtManagerApp.utils import youtube
# help_text = user shown text
# verbose_name = user shown name
# null = nullable, blank = user is allowed to set value to empty
VIDEO_ORDER_CHOICES = [
('newest', 'Newest'),
('oldest', 'Oldest'),
('playlist', 'Playlist order'),
('playlist_reverse', 'Reverse playlist order'),
('popularity', 'Popularity'),
('rating', 'Top rated'),
]
VIDEO_ORDER_MAPPING = {
'newest': '-publish_date',
'oldest': 'publish_date',
'playlist': 'playlist_index',
'playlist_reverse': '-playlist_index',
'popularity': '-views',
'rating': '-rating'
}
class UserSettings(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
mark_deleted_as_watched = models.BooleanField(
null=True, blank=True,
help_text='When a downloaded video is deleted from the system, it will be marked as \'watched\'.')
delete_watched = models.BooleanField(
null=True, blank=True,
help_text='Videos marked as watched are automatically deleted.')
auto_download = models.BooleanField(
null=True, blank=True,
help_text='Enables or disables automatic downloading.')
download_global_limit = models.IntegerField(
null=True, blank=True,
help_text='Limits the total number of videos downloaded (-1 = no limit).')
download_subscription_limit = models.IntegerField(
null=True, blank=True,
help_text='Limits the number of videos downloaded per subscription (-1 = no limit). '
' This setting can be overriden for each individual subscription in the subscription edit dialog.')
download_order = models.CharField(
null=True, blank=True,
max_length=100,
choices=VIDEO_ORDER_CHOICES,
help_text='The order in which videos will be downloaded.'
)
download_path = models.CharField(
null=True, blank=True,
max_length=1024,
help_text='Path on the disk where downloaded videos are stored. '
' You can use environment variables using syntax: ${env:...}
'
)
download_file_pattern = models.CharField(
null=True, blank=True,
max_length=1024,
help_text='A pattern which describes how downloaded files are organized. Extensions are automatically appended.'
' You can use the following fields, using the ${field}
syntax:'
' channel, channel_id, playlist, playlist_id, playlist_index, title, id.'
' Example: ${channel}/${playlist}/S01E${playlist_index} - ${title} [${id}]
')
download_format = models.CharField(
null=True, blank=True,
max_length=256,
help_text='Download format that will be passed to youtube-dl. '
' See the '
' youtube-dl documentation for more details.')
download_subtitles = models.BooleanField(
null=True, blank=True,
help_text='Enable downloading subtitles for the videos.'
' The flag is passed directly to youtube-dl. You can find more information'
' here.')
download_autogenerated_subtitles = models.BooleanField(
null=True, blank=True,
help_text='Enables downloading the automatically generated subtitle.'
' The flag is passed directly to youtube-dl. You can find more information'
' here.')
download_subtitles_all = models.BooleanField(
null=True, blank=True,
help_text='If enabled, all the subtitles in all the available languages will be downloaded.'
' The flag is passed directly to youtube-dl. You can find more information'
' here.')
download_subtitles_langs = models.CharField(
null=True, blank=True,
max_length=250,
help_text='Comma separated list of languages for which subtitles will be downloaded.'
' The flag is passed directly to youtube-dl. You can find more information'
' here.')
download_subtitles_format = models.CharField(
null=True, blank=True,
max_length=100,
help_text='Subtitles format preference. Examples: srt/ass/best'
' The flag is passed directly to youtube-dl. You can find more information'
' here.')
@staticmethod
def find_by_user(user: User):
result = UserSettings.objects.filter(user=user)
if len(result) > 0:
return result.first()
return None
def __str__(self):
return str(self.user)
def to_dict(self):
ret = {}
if self.mark_deleted_as_watched is not None:
ret['MarkDeletedAsWatched'] = self.mark_deleted_as_watched
if self.delete_watched is not None:
ret['DeleteWatched'] = self.delete_watched
if self.auto_download is not None:
ret['AutoDownload'] = self.auto_download
if self.download_global_limit is not None:
ret['DownloadGlobalLimit'] = self.download_global_limit
if self.download_subscription_limit is not None:
ret['DownloadSubscriptionLimit'] = self.download_subscription_limit
if self.download_order is not None:
ret['DownloadOrder'] = self.download_order
if self.download_path is not None:
ret['DownloadPath'] = self.download_path
if self.download_file_pattern is not None:
ret['DownloadFilePattern'] = self.download_file_pattern
if self.download_format is not None:
ret['DownloadFormat'] = self.download_format
if self.download_subtitles is not None:
ret['DownloadSubtitles'] = self.download_subtitles
if self.download_autogenerated_subtitles is not None:
ret['DownloadAutogeneratedSubtitles'] = self.download_autogenerated_subtitles
if self.download_subtitles_all is not None:
ret['DownloadSubtitlesAll'] = self.download_subtitles_all
if self.download_subtitles_langs is not None:
ret['DownloadSubtitlesLangs'] = self.download_subtitles_langs
if self.download_subtitles_format is not None:
ret['DownloadSubtitlesFormat'] = self.download_subtitles_format
return ret
class SubscriptionFolder(models.Model):
name = models.CharField(null=False, max_length=250)
parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True)
user = models.ForeignKey(User, on_delete=models.CASCADE, null=False, blank=False)
class Meta:
ordering = [Lower('parent__name'), Lower('name')]
def __str__(self):
s = ""
current = self
while current is not None:
s = current.name + " > " + s
current = current.parent
return s[:-3]
def __repr__(self):
return f'folder {self.id}, name="{self.name}"'
def delete_folder(self, keep_subscriptions: bool):
if keep_subscriptions:
def visit(node: Union["SubscriptionFolder", "Subscription"]):
if isinstance(node, Subscription):
node.parent_folder = None
node.save()
SubscriptionFolder.traverse(self.id, self.user, visit)
self.delete()
@staticmethod
def traverse(root_folder_id: Optional[int],
user: User,
visit_func: Callable[[Union["SubscriptionFolder", "Subscription"]], Any]):
data_collected = []
def collect(data):
if data is not None:
data_collected.append(data)
# Visit root
if root_folder_id is not None:
root_folder = SubscriptionFolder.objects.get(id=root_folder_id)
collect(visit_func(root_folder))
queue = [root_folder_id]
visited = []
while len(queue) > 0:
folder_id = queue.pop()
if folder_id in visited:
logging.error('Found folder tree cycle for folder id %d.', folder_id)
continue
visited.append(folder_id)
for folder in SubscriptionFolder.objects.filter(parent_id=folder_id, user=user).order_by(Lower('name')):
collect(visit_func(folder))
queue.append(folder.id)
for subscription in Subscription.objects.filter(parent_folder_id=folder_id, user=user).order_by(Lower('name')):
collect(visit_func(subscription))
return data_collected
class Subscription(models.Model):
name = models.CharField(null=False, max_length=1024)
parent_folder = models.ForeignKey(SubscriptionFolder, on_delete=models.CASCADE, null=True, blank=True)
playlist_id = models.CharField(null=False, max_length=128)
description = models.TextField()
channel_id = models.CharField(max_length=128)
channel_name = models.CharField(max_length=1024)
icon_default = models.CharField(max_length=1024)
icon_best = models.CharField(max_length=1024)
user = models.ForeignKey(User, on_delete=models.CASCADE)
# overrides
auto_download = models.BooleanField(null=True, blank=True)
download_limit = models.IntegerField(null=True, blank=True)
download_order = models.CharField(
null=True, blank=True,
max_length=128,
choices=VIDEO_ORDER_CHOICES)
delete_after_watched = models.BooleanField(null=True, blank=True)
def __str__(self):
return self.name
def __repr__(self):
return f'subscription {self.id}, name="{self.name}", playlist_id="{self.playlist_id}"'
def fill_from_playlist(self, info_playlist: youtube.Playlist):
self.name = info_playlist.title
self.playlist_id = info_playlist.id
self.description = info_playlist.description
self.channel_id = info_playlist.channel_id
self.channel_name = info_playlist.channel_title
self.icon_default = youtube.default_thumbnail(info_playlist).url
self.icon_best = youtube.best_thumbnail(info_playlist).url
def copy_from_channel(self, info_channel: youtube.Channel):
# No point in storing info about the 'uploads from X' playlist
self.name = info_channel.title
self.playlist_id = info_channel.uploads_playlist.id
self.description = info_channel.description
self.channel_id = info_channel.id
self.channel_name = info_channel.title
self.icon_default = youtube.default_thumbnail(info_channel).url
self.icon_best = youtube.best_thumbnail(info_channel).url
def fetch_from_url(self, url, yt_api: youtube.YoutubeAPI):
url_parsed = yt_api.parse_url(url)
if 'playlist' in url_parsed:
info_playlist = yt_api.playlist(url=url)
if info_playlist is None:
raise ValueError('Invalid playlist ID!')
self.fill_from_playlist(info_playlist)
else:
info_channel = yt_api.channel(url=url)
if info_channel is None:
raise ValueError('Cannot find channel!')
self.copy_from_channel(info_channel)
def delete_subscription(self, keep_downloaded_videos: bool):
self.delete()
def get_overloads_dict(self) -> dict:
d = {}
if self.auto_download is not None:
d['AutoDownload'] = self.auto_download
if self.download_limit is not None:
d['DownloadSubscriptionLimit'] = self.download_limit
if self.download_order is not None:
d['DownloadOrder'] = self.download_order
if self.delete_after_watched is not None:
d['DeleteWatched'] = self.delete_after_watched
return d
class Video(models.Model):
video_id = models.TextField(null=False)
name = models.TextField(null=False)
description = models.TextField()
watched = models.BooleanField(default=False, null=False)
downloaded_path = models.TextField(null=True, blank=True)
subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
playlist_index = models.IntegerField(null=False)
publish_date = models.DateTimeField(null=False)
icon_default = models.TextField()
icon_best = models.TextField()
uploader_name = models.TextField(null=False)
views = models.IntegerField(null=False, default=0)
rating = models.FloatField(null=False, default=0.5)
@staticmethod
def create(playlist_item: youtube.PlaylistItem, subscription: Subscription):
video = Video()
video.video_id = playlist_item.resource_video_id
video.name = playlist_item.title
video.description = playlist_item.description
video.watched = False
video.downloaded_path = None
video.subscription = subscription
video.playlist_index = playlist_item.position
video.publish_date = playlist_item.published_at
video.icon_default = youtube.default_thumbnail(playlist_item).url
video.icon_best = youtube.best_thumbnail(playlist_item).url
video.save()
return video
def mark_watched(self):
self.watched = True
self.save()
if self.downloaded_path is not None:
from YtManagerApp.appconfig import settings
from YtManagerApp.management.jobs.delete_video import schedule_delete_video
from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now_subscription
if settings.getboolean_sub(self.subscription, 'user', 'DeleteWatched'):
schedule_delete_video(self)
schedule_synchronize_now_subscription(self.subscription)
def mark_unwatched(self):
from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now_subscription
self.watched = False
self.save()
schedule_synchronize_now_subscription(self.subscription)
def get_files(self):
if self.downloaded_path is not None:
directory, file_pattern = os.path.split(self.downloaded_path)
for file in os.listdir(directory):
if file.startswith(file_pattern):
yield os.path.join(directory, file)
def delete_files(self):
if self.downloaded_path is not None:
from YtManagerApp.management.jobs.delete_video import schedule_delete_video
from YtManagerApp.appconfig import settings
from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now_subscription
schedule_delete_video(self)
# Mark watched?
if settings.getboolean_sub(self, 'user', 'MarkDeletedAsWatched'):
self.watched = True
schedule_synchronize_now_subscription(self.subscription)
def download(self):
if not self.downloaded_path:
from YtManagerApp.management.jobs.download_video import schedule_download_video
schedule_download_video(self)
def __str__(self):
return self.name
def __repr__(self):
return f'video {self.id}, video_id="{self.video_id}"'