youtube-channel-manager/YtManagerApp/models.py

335 lines
13 KiB
Python

import logging
from typing import Callable, Union, Any, Optional
import os
from django.contrib.auth.models import User
from django.contrib.auth.models import User
from django.db import models
from django.db.models.functions import Lower
from YtManagerApp.utils.youtube import YoutubeAPI, YoutubeChannelInfo, YoutubePlaylistInfo
# help_text = user shown text
# verbose_name = user shown name
# null = nullable, blank = user is allowed to set value to empty
class UserSettings(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
mark_deleted_as_watched = models.BooleanField(null=True)
delete_watched = models.BooleanField(null=True)
auto_download = models.BooleanField(null=True)
download_global_limit = models.IntegerField(null=True)
download_subscription_limit = models.IntegerField(null=True)
download_order = models.TextField(null=True)
download_path = models.TextField(null=True)
download_file_pattern = models.TextField(null=True)
download_format = models.TextField(null=True)
download_subtitles = models.BooleanField(null=True)
download_autogenerated_subtitles = models.BooleanField(null=True)
download_subtitles_all = models.BooleanField(null=True)
download_subtitles_langs = models.TextField(null=True)
download_subtitles_format = models.TextField(null=True)
@staticmethod
def find_by_user(user: User):
result = UserSettings.objects.filter(user=user)
if len(result) > 0:
return result.first()
return None
def __str__(self):
return str(self.user)
def to_dict(self):
ret = {}
if self.mark_deleted_as_watched is not None:
ret['MarkDeletedAsWatched'] = self.mark_deleted_as_watched
if self.delete_watched is not None:
ret['DeleteWatched'] = self.delete_watched
if self.auto_download is not None:
ret['AutoDownload'] = self.auto_download
if self.download_global_limit is not None:
ret['DownloadGlobalLimit'] = self.download_global_limit
if self.download_subscription_limit is not None:
ret['DownloadSubscriptionLimit'] = self.download_subscription_limit
if self.download_order is not None:
ret['DownloadOrder'] = self.download_order
if self.download_path is not None:
ret['DownloadPath'] = self.download_path
if self.download_file_pattern is not None:
ret['DownloadFilePattern'] = self.download_file_pattern
if self.download_format is not None:
ret['DownloadFormat'] = self.download_format
if self.download_subtitles is not None:
ret['DownloadSubtitles'] = self.download_subtitles
if self.download_autogenerated_subtitles is not None:
ret['DownloadAutogeneratedSubtitles'] = self.download_autogenerated_subtitles
if self.download_subtitles_all is not None:
ret['DownloadSubtitlesAll'] = self.download_subtitles_all
if self.download_subtitles_langs is not None:
ret['DownloadSubtitlesLangs'] = self.download_subtitles_langs
if self.download_subtitles_format is not None:
ret['DownloadSubtitlesFormat'] = self.download_subtitles_format
return ret
class SubscriptionFolder(models.Model):
name = models.CharField(null=False, max_length=250)
parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True)
user = models.ForeignKey(User, on_delete=models.CASCADE, null=False, blank=False)
class Meta:
ordering = [Lower('parent__name'), Lower('name')]
def __str__(self):
s = ""
current = self
while current is not None:
s = current.name + " > " + s
current = current.parent
return s[:-3]
def delete_folder(self, keep_subscriptions: bool):
if keep_subscriptions:
def visit(node: Union["SubscriptionFolder", "Subscription"]):
if isinstance(node, Subscription):
node.parent_folder = None
node.save()
SubscriptionFolder.traverse(self.id, self.user, visit)
self.delete()
@staticmethod
def traverse(root_folder_id: Optional[int],
user: User,
visit_func: Callable[[Union["SubscriptionFolder", "Subscription"]], Any]):
data_collected = []
def collect(data):
if data is not None:
data_collected.append(data)
# Visit root
if root_folder_id is not None:
root_folder = SubscriptionFolder.objects.get(id=root_folder_id)
collect(visit_func(root_folder))
queue = [root_folder_id]
visited = []
while len(queue) > 0:
folder_id = queue.pop()
if folder_id in visited:
logging.error('Found folder tree cycle for folder id %d.', folder_id)
continue
visited.append(folder_id)
for folder in SubscriptionFolder.objects.filter(parent_id=folder_id, user=user).order_by(Lower('name')):
collect(visit_func(folder))
queue.append(folder.id)
for subscription in Subscription.objects.filter(parent_folder_id=folder_id, user=user).order_by(Lower('name')):
collect(visit_func(subscription))
return data_collected
class Channel(models.Model):
channel_id = models.TextField(null=False, unique=True)
username = models.TextField(null=True, unique=True)
custom_url = models.TextField(null=True, unique=True)
name = models.TextField()
description = models.TextField()
icon_default = models.TextField()
icon_best = models.TextField()
upload_playlist_id = models.TextField()
def __str__(self):
return self.name
@staticmethod
def find_by_channel_id(channel_id):
result = Channel.objects.filter(channel_id=channel_id)
if len(result) > 0:
return result.first()
return None
@staticmethod
def find_by_username(username):
result = Channel.objects.filter(username=username)
if len(result) > 0:
return result.first()
return None
@staticmethod
def find_by_custom_url(custom_url):
result = Channel.objects.filter(custom_url=custom_url)
if len(result) > 0:
return result.first()
return None
def fill(self, yt_channel_info: YoutubeChannelInfo):
self.channel_id = yt_channel_info.getId()
self.custom_url = yt_channel_info.getCustomUrl()
self.name = yt_channel_info.getTitle()
self.description = yt_channel_info.getDescription()
self.icon_default = yt_channel_info.getDefaultThumbnailUrl()
self.icon_best = yt_channel_info.getBestThumbnailUrl()
self.upload_playlist_id = yt_channel_info.getUploadsPlaylist()
self.save()
@staticmethod
def get_or_create(url_type: str, url_id: str, yt_api: YoutubeAPI):
channel: Channel = None
info_channel: YoutubeChannelInfo = None
if url_type == 'user':
channel = Channel.find_by_username(url_id)
if not channel:
info_channel = yt_api.get_channel_info_by_username(url_id)
channel = Channel.find_by_channel_id(info_channel.getId())
elif url_type == 'channel_id':
channel = Channel.find_by_channel_id(url_id)
if not channel:
info_channel = yt_api.get_channel_info(url_id)
elif url_type == 'channel_custom':
channel = Channel.find_by_custom_url(url_id)
if not channel:
found_channel_id = yt_api.search_channel(url_id)
channel = Channel.find_by_channel_id(found_channel_id)
if not channel:
info_channel = yt_api.get_channel_info(found_channel_id)
# If we downloaded information about the channel, store information
# about the channel here.
if info_channel:
if not channel:
channel = Channel()
if url_type == 'user':
channel.username = url_id
channel.fill(info_channel)
return channel
class Subscription(models.Model):
name = models.CharField(null=False, max_length=1024)
parent_folder = models.ForeignKey(SubscriptionFolder, on_delete=models.CASCADE, null=True, blank=True)
playlist_id = models.CharField(null=False, max_length=128)
description = models.TextField()
channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
icon_default = models.CharField(max_length=1024)
icon_best = models.CharField(max_length=1024)
user = models.ForeignKey(User, on_delete=models.CASCADE)
# overrides
auto_download = models.BooleanField(null=True, blank=True)
download_limit = models.IntegerField(null=True, blank=True)
download_order = models.CharField(null=True, blank=True, max_length=128)
manager_delete_after_watched = models.BooleanField(null=True, blank=True)
def fill_from_playlist(self, info_playlist: YoutubePlaylistInfo):
self.name = info_playlist.getTitle()
self.playlist_id = info_playlist.getId()
self.description = info_playlist.getDescription()
self.icon_default = info_playlist.getDefaultThumbnailUrl()
self.icon_best = info_playlist.getBestThumbnailUrl()
def copy_from_channel(self):
# No point in storing info about the 'uploads from X' playlist
self.name = self.channel.name
self.playlist_id = self.channel.upload_playlist_id
self.description = self.channel.description
self.icon_default = self.channel.icon_default
self.icon_best = self.channel.icon_best
def fetch_from_url(self, url, yt_api: YoutubeAPI):
url_type, url_id = yt_api.parse_channel_url(url)
if url_type == 'playlist_id':
info_playlist = yt_api.get_playlist_info(url_id)
self.channel = Channel.get_or_create('channel_id', info_playlist.getChannelId(), yt_api)
self.fill_from_playlist(info_playlist)
else:
self.channel = Channel.get_or_create(url_type, url_id, yt_api)
self.copy_from_channel()
def delete_subscription(self, keep_downloaded_videos: bool):
self.delete()
def __str__(self):
return self.name
class Video(models.Model):
video_id = models.TextField(null=False)
name = models.TextField(null=False)
description = models.TextField()
watched = models.BooleanField(default=False, null=False)
downloaded_path = models.TextField(null=True, blank=True)
subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
playlist_index = models.IntegerField(null=False)
publish_date = models.DateTimeField(null=False)
icon_default = models.TextField()
icon_best = models.TextField()
uploader_name = models.TextField(null=False)
views = models.IntegerField(null=False, default=0)
rating = models.FloatField(null=False, default=0.5)
def mark_watched(self):
self.watched = True
self.save()
if self.downloaded_path is not None:
from YtManagerApp.appconfig import get_user_config
from YtManagerApp.management.jobs.delete_video import schedule_delete_video
from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now_subscription
user_cfg = get_user_config(self.subscription.user)
if user_cfg.getboolean('user', 'DeleteWatched'):
schedule_delete_video(self)
schedule_synchronize_now_subscription(self.subscription)
def mark_unwatched(self):
from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now_subscription
self.watched = False
self.save()
schedule_synchronize_now_subscription(self.subscription)
def get_files(self):
if self.downloaded_path is not None:
directory, file_pattern = os.path.split(self.downloaded_path)
for file in os.listdir(directory):
if file.startswith(file_pattern):
yield os.path.join(directory, file)
def delete_files(self):
if self.downloaded_path is not None:
from YtManagerApp.management.jobs.delete_video import schedule_delete_video
from YtManagerApp.appconfig import get_user_config
from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now_subscription
schedule_delete_video(self)
# Mark watched?
user_cfg = get_user_config(self.subscription.user)
if user_cfg.getboolean('user', 'MarkDeletedAsWatched'):
self.watched = True
schedule_synchronize_now_subscription(self.subscription)
def download(self):
if not self.downloaded_path:
from YtManagerApp.management.jobs.download_video import schedule_download_video
schedule_download_video(self)
def __str__(self):
return self.name