ytsm/YtManagerApp/models.py

300 lines
11 KiB
Python
Raw Normal View History

import logging
from typing import Callable, Union, Any, Optional
import os
from django.contrib.auth.models import User
2018-10-10 22:43:50 +00:00
from django.contrib.auth.models import User
from django.db import models
2018-10-14 21:45:08 +00:00
from django.db.models.functions import Lower
from YtManagerApp.utils.youtube import YoutubeAPI, YoutubeChannelInfo, YoutubePlaylistInfo
2018-10-10 22:43:50 +00:00
2018-10-13 20:01:45 +00:00
# help_text = user shown text
# verbose_name = user shown name
# null = nullable, blank = user is allowed to set value to empty
2018-10-10 22:43:50 +00:00
class UserSettings(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
mark_deleted_as_watched = models.BooleanField(null=True)
delete_watched = models.BooleanField(null=True)
auto_download = models.BooleanField(null=True)
download_global_limit = models.IntegerField(null=True)
download_subscription_limit = models.IntegerField(null=True)
download_order = models.TextField(null=True)
download_path = models.TextField(null=True)
download_file_pattern = models.TextField(null=True)
download_format = models.TextField(null=True)
download_subtitles = models.BooleanField(null=True)
download_autogenerated_subtitles = models.BooleanField(null=True)
download_subtitles_all = models.BooleanField(null=True)
download_subtitles_langs = models.TextField(null=True)
download_subtitles_format = models.TextField(null=True)
@staticmethod
def find_by_user(user: User):
result = UserSettings.objects.filter(user=user)
2018-10-10 22:43:50 +00:00
if len(result) > 0:
return result.first()
return None
def __str__(self):
return str(self.user)
def to_dict(self):
ret = {}
if self.mark_deleted_as_watched is not None:
ret['MarkDeletedAsWatched'] = self.mark_deleted_as_watched
if self.delete_watched is not None:
ret['DeleteWatched'] = self.delete_watched
if self.auto_download is not None:
ret['AutoDownload'] = self.auto_download
if self.download_global_limit is not None:
ret['DownloadGlobalLimit'] = self.download_global_limit
if self.download_subscription_limit is not None:
ret['DownloadSubscriptionLimit'] = self.download_subscription_limit
if self.download_order is not None:
ret['DownloadOrder'] = self.download_order
if self.download_path is not None:
ret['DownloadPath'] = self.download_path
if self.download_file_pattern is not None:
ret['DownloadFilePattern'] = self.download_file_pattern
if self.download_format is not None:
ret['DownloadFormat'] = self.download_format
if self.download_subtitles is not None:
ret['DownloadSubtitles'] = self.download_subtitles
if self.download_autogenerated_subtitles is not None:
ret['DownloadAutogeneratedSubtitles'] = self.download_autogenerated_subtitles
if self.download_subtitles_all is not None:
ret['DownloadSubtitlesAll'] = self.download_subtitles_all
if self.download_subtitles_langs is not None:
ret['DownloadSubtitlesLangs'] = self.download_subtitles_langs
if self.download_subtitles_format is not None:
ret['DownloadSubtitlesFormat'] = self.download_subtitles_format
return ret
2018-10-04 11:36:11 +00:00
2018-10-04 11:36:11 +00:00
class SubscriptionFolder(models.Model):
2018-10-14 21:45:08 +00:00
name = models.CharField(null=False, max_length=250)
2018-10-04 11:36:11 +00:00
parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True)
2018-10-13 20:01:45 +00:00
user = models.ForeignKey(User, on_delete=models.CASCADE, null=False, blank=False)
2018-10-04 11:36:11 +00:00
class Meta:
ordering = [Lower('parent__name'), Lower('name')]
2018-10-04 11:36:11 +00:00
def __str__(self):
2018-10-14 21:45:08 +00:00
s = ""
current = self
while current is not None:
s = current.name + " > " + s
current = current.parent
return s[:-3]
def delete_folder(self, keep_subscriptions: bool):
if keep_subscriptions:
def visit(node: Union["SubscriptionFolder", "Subscription"]):
if isinstance(node, Subscription):
node.parent_folder = None
node.save()
SubscriptionFolder.traverse(self.id, self.user, visit)
self.delete()
@staticmethod
def traverse(root_folder_id: Optional[int],
user: User,
visit_func: Callable[[Union["SubscriptionFolder", "Subscription"]], Any]):
data_collected = []
def collect(data):
if data is not None:
data_collected.append(data)
# Visit root
if root_folder_id is not None:
root_folder = SubscriptionFolder.objects.get(id=root_folder_id)
collect(visit_func(root_folder))
queue = [root_folder_id]
visited = []
while len(queue) > 0:
folder_id = queue.pop()
if folder_id in visited:
logging.error('Found folder tree cycle for folder id %d.', folder_id)
continue
visited.append(folder_id)
for folder in SubscriptionFolder.objects.filter(parent_id=folder_id, user=user).order_by(Lower('name')):
collect(visit_func(folder))
queue.append(folder.id)
for subscription in Subscription.objects.filter(parent_folder_id=folder_id, user=user).order_by(Lower('name')):
collect(visit_func(subscription))
return data_collected
2018-10-04 11:36:11 +00:00
class Channel(models.Model):
channel_id = models.TextField(null=False, unique=True)
username = models.TextField(null=True, unique=True)
custom_url = models.TextField(null=True, unique=True)
name = models.TextField()
description = models.TextField()
icon_default = models.TextField()
icon_best = models.TextField()
upload_playlist_id = models.TextField()
def __str__(self):
return self.name
@staticmethod
def find_by_channel_id(channel_id):
result = Channel.objects.filter(channel_id=channel_id)
if len(result) > 0:
return result.first()
return None
@staticmethod
def find_by_username(username):
result = Channel.objects.filter(username=username)
if len(result) > 0:
return result.first()
return None
@staticmethod
def find_by_custom_url(custom_url):
result = Channel.objects.filter(custom_url=custom_url)
if len(result) > 0:
return result.first()
return None
def fill(self, yt_channel_info: YoutubeChannelInfo):
self.channel_id = yt_channel_info.getId()
self.custom_url = yt_channel_info.getCustomUrl()
self.name = yt_channel_info.getTitle()
self.description = yt_channel_info.getDescription()
self.icon_default = yt_channel_info.getDefaultThumbnailUrl()
self.icon_best = yt_channel_info.getBestThumbnailUrl()
self.upload_playlist_id = yt_channel_info.getUploadsPlaylist()
self.save()
@staticmethod
def get_or_create(url_type: str, url_id: str, yt_api: YoutubeAPI):
channel: Channel = None
info_channel: YoutubeChannelInfo = None
if url_type == 'user':
channel = Channel.find_by_username(url_id)
if not channel:
info_channel = yt_api.get_channel_info_by_username(url_id)
channel = Channel.find_by_channel_id(info_channel.getId())
elif url_type == 'channel_id':
channel = Channel.find_by_channel_id(url_id)
if not channel:
info_channel = yt_api.get_channel_info(url_id)
elif url_type == 'channel_custom':
channel = Channel.find_by_custom_url(url_id)
if not channel:
found_channel_id = yt_api.search_channel(url_id)
channel = Channel.find_by_channel_id(found_channel_id)
if not channel:
info_channel = yt_api.get_channel_info(found_channel_id)
# If we downloaded information about the channel, store information
# about the channel here.
if info_channel:
if not channel:
channel = Channel()
if url_type == 'user':
channel.username = url_id
channel.fill(info_channel)
return channel
2018-10-04 11:36:11 +00:00
class Subscription(models.Model):
name = models.CharField(null=False, max_length=1024)
parent_folder = models.ForeignKey(SubscriptionFolder, on_delete=models.CASCADE, null=True, blank=True)
playlist_id = models.CharField(null=False, max_length=128)
description = models.TextField()
channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
icon_default = models.CharField(max_length=1024)
icon_best = models.CharField(max_length=1024)
2018-10-10 22:43:50 +00:00
user = models.ForeignKey(User, on_delete=models.CASCADE)
# overrides
auto_download = models.BooleanField(null=True, blank=True)
download_limit = models.IntegerField(null=True, blank=True)
download_order = models.CharField(null=True, blank=True, max_length=128)
manager_delete_after_watched = models.BooleanField(null=True, blank=True)
def fill_from_playlist(self, info_playlist: YoutubePlaylistInfo):
self.name = info_playlist.getTitle()
self.playlist_id = info_playlist.getId()
self.description = info_playlist.getDescription()
self.icon_default = info_playlist.getDefaultThumbnailUrl()
self.icon_best = info_playlist.getBestThumbnailUrl()
def copy_from_channel(self):
# No point in storing info about the 'uploads from X' playlist
self.name = self.channel.name
self.playlist_id = self.channel.upload_playlist_id
self.description = self.channel.description
self.icon_default = self.channel.icon_default
self.icon_best = self.channel.icon_best
def fetch_from_url(self, url, yt_api: YoutubeAPI):
url_type, url_id = yt_api.parse_channel_url(url)
if url_type == 'playlist_id':
info_playlist = yt_api.get_playlist_info(url_id)
self.channel = Channel.get_or_create('channel_id', info_playlist.getChannelId(), yt_api)
self.fill_from_playlist(info_playlist)
else:
self.channel = Channel.get_or_create(url_type, url_id, yt_api)
self.copy_from_channel()
def delete_subscription(self, keep_downloaded_videos: bool):
self.delete()
2018-10-04 11:36:11 +00:00
def __str__(self):
return self.name
class Video(models.Model):
video_id = models.TextField(null=False)
2018-10-04 11:36:11 +00:00
name = models.TextField(null=False)
description = models.TextField()
2018-10-04 11:36:11 +00:00
watched = models.BooleanField(default=False, null=False)
downloaded_path = models.TextField(null=True, blank=True)
2018-10-04 11:36:11 +00:00
subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
playlist_index = models.IntegerField(null=False)
publish_date = models.DateTimeField(null=False)
icon_default = models.TextField()
icon_best = models.TextField()
2018-10-13 20:01:45 +00:00
uploader_name = models.TextField(null=False)
views = models.IntegerField(null=False, default=0)
rating = models.FloatField(null=False, default=0.5)
2018-10-04 11:36:11 +00:00
def mark_watched(self):
self.watched = True
def mark_unwatched(self):
self.watched = False
def get_files(self):
if self.downloaded_path is not None:
directory, file_pattern = os.path.split(self.downloaded_path)
for file in os.listdir(directory):
if file.startswith(file_pattern):
yield os.path.join(directory, file)
2018-10-04 11:36:11 +00:00
def __str__(self):
return self.name