Finished CRUD operations for subscriptions and folders.
This commit is contained in:
@ -1,6 +1,11 @@
|
||||
from django.db import models
|
||||
import logging
|
||||
from typing import Callable, Union, Any, Optional
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.auth.models import User
|
||||
from django.db import models
|
||||
from django.db.models.functions import Lower
|
||||
from YtManagerApp.utils.youtube import YoutubeAPI, YoutubeChannelInfo, YoutubePlaylistInfo
|
||||
|
||||
# help_text = user shown text
|
||||
# verbose_name = user shown name
|
||||
@ -26,7 +31,7 @@ class UserSettings(models.Model):
|
||||
|
||||
@staticmethod
|
||||
def find_by_user(user: User):
|
||||
result = UserSettings.objects.filter2(user=user)
|
||||
result = UserSettings.objects.filter(user=user)
|
||||
if len(result) > 0:
|
||||
return result.first()
|
||||
return None
|
||||
@ -74,6 +79,9 @@ class SubscriptionFolder(models.Model):
|
||||
parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True)
|
||||
user = models.ForeignKey(User, on_delete=models.CASCADE, null=False, blank=False)
|
||||
|
||||
class Meta:
|
||||
ordering = [Lower('parent__name'), Lower('name')]
|
||||
|
||||
def __str__(self):
|
||||
s = ""
|
||||
current = self
|
||||
@ -82,8 +90,53 @@ class SubscriptionFolder(models.Model):
|
||||
current = current.parent
|
||||
return s[:-3]
|
||||
|
||||
class Meta:
|
||||
ordering = [Lower('parent__name'), Lower('name')]
|
||||
def delete_folder(self, keep_subscriptions: bool):
|
||||
if keep_subscriptions:
|
||||
|
||||
def visit(node: Union["SubscriptionFolder", "Subscription"]):
|
||||
if isinstance(node, Subscription):
|
||||
node.parent_folder = None
|
||||
node.save()
|
||||
|
||||
SubscriptionFolder.traverse(self.id, self.user, visit)
|
||||
|
||||
self.delete()
|
||||
|
||||
@staticmethod
|
||||
def traverse(root_folder_id: Optional[int],
|
||||
user: User,
|
||||
visit_func: Callable[[Union["SubscriptionFolder", "Subscription"]], Any]):
|
||||
|
||||
data_collected = []
|
||||
|
||||
def collect(data):
|
||||
if data is not None:
|
||||
data_collected.append(data)
|
||||
|
||||
# Visit root
|
||||
if root_folder_id is not None:
|
||||
root_folder = SubscriptionFolder.objects.get(id=root_folder_id)
|
||||
collect(visit_func(root_folder))
|
||||
|
||||
queue = [root_folder_id]
|
||||
visited = []
|
||||
|
||||
while len(queue) > 0:
|
||||
folder_id = queue.pop()
|
||||
|
||||
if folder_id in visited:
|
||||
logging.error('Found folder tree cycle for folder id %d.', folder_id)
|
||||
continue
|
||||
visited.append(folder_id)
|
||||
|
||||
for folder in SubscriptionFolder.objects.filter(parent_id=folder_id, user=user).order_by(Lower('name')):
|
||||
collect(visit_func(folder))
|
||||
queue.append(folder.id)
|
||||
|
||||
for subscription in Subscription.objects.filter(parent_folder_id=folder_id, user=user).order_by(Lower('name')):
|
||||
collect(visit_func(subscription))
|
||||
|
||||
return data_collected
|
||||
|
||||
|
||||
class Channel(models.Model):
|
||||
@ -96,6 +149,9 @@ class Channel(models.Model):
|
||||
icon_best = models.TextField()
|
||||
upload_playlist_id = models.TextField()
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
@staticmethod
|
||||
def find_by_channel_id(channel_id):
|
||||
result = Channel.objects.filter(channel_id=channel_id)
|
||||
@ -117,25 +173,95 @@ class Channel(models.Model):
|
||||
return result.first()
|
||||
return None
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
def fill(self, yt_channel_info: YoutubeChannelInfo):
|
||||
self.channel_id = yt_channel_info.getId()
|
||||
self.custom_url = yt_channel_info.getCustomUrl()
|
||||
self.name = yt_channel_info.getTitle()
|
||||
self.description = yt_channel_info.getDescription()
|
||||
self.icon_default = yt_channel_info.getDefaultThumbnailUrl()
|
||||
self.icon_best = yt_channel_info.getBestThumbnailUrl()
|
||||
self.upload_playlist_id = yt_channel_info.getUploadsPlaylist()
|
||||
self.save()
|
||||
|
||||
@staticmethod
|
||||
def get_or_create(url_type: str, url_id: str, yt_api: YoutubeAPI):
|
||||
channel: Channel = None
|
||||
info_channel: YoutubeChannelInfo = None
|
||||
|
||||
if url_type == 'user':
|
||||
channel = Channel.find_by_username(url_id)
|
||||
if not channel:
|
||||
info_channel = yt_api.get_channel_info_by_username(url_id)
|
||||
channel = Channel.find_by_channel_id(info_channel.getId())
|
||||
|
||||
elif url_type == 'channel_id':
|
||||
channel = Channel.find_by_channel_id(url_id)
|
||||
if not channel:
|
||||
info_channel = yt_api.get_channel_info(url_id)
|
||||
|
||||
elif url_type == 'channel_custom':
|
||||
channel = Channel.find_by_custom_url(url_id)
|
||||
if not channel:
|
||||
found_channel_id = yt_api.search_channel(url_id)
|
||||
channel = Channel.find_by_channel_id(found_channel_id)
|
||||
if not channel:
|
||||
info_channel = yt_api.get_channel_info(found_channel_id)
|
||||
|
||||
# If we downloaded information about the channel, store information
|
||||
# about the channel here.
|
||||
if info_channel:
|
||||
if not channel:
|
||||
channel = Channel()
|
||||
if url_type == 'user':
|
||||
channel.username = url_id
|
||||
channel.fill(info_channel)
|
||||
|
||||
return channel
|
||||
|
||||
|
||||
class Subscription(models.Model):
|
||||
name = models.TextField(null=False)
|
||||
parent_folder = models.ForeignKey(SubscriptionFolder, on_delete=models.SET_NULL, null=True, blank=True)
|
||||
playlist_id = models.TextField(null=False, unique=True)
|
||||
name = models.CharField(null=False, max_length=1024)
|
||||
parent_folder = models.ForeignKey(SubscriptionFolder, on_delete=models.CASCADE, null=True, blank=True)
|
||||
playlist_id = models.CharField(null=False, max_length=128)
|
||||
description = models.TextField()
|
||||
channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
|
||||
icon_default = models.TextField()
|
||||
icon_best = models.TextField()
|
||||
icon_default = models.CharField(max_length=1024)
|
||||
icon_best = models.CharField(max_length=1024)
|
||||
user = models.ForeignKey(User, on_delete=models.CASCADE)
|
||||
|
||||
# overrides
|
||||
auto_download = models.BooleanField(null=True)
|
||||
download_limit = models.IntegerField(null=True)
|
||||
download_order = models.TextField(null=True)
|
||||
manager_delete_after_watched = models.BooleanField(null=True)
|
||||
auto_download = models.BooleanField(null=True, blank=True)
|
||||
download_limit = models.IntegerField(null=True, blank=True)
|
||||
download_order = models.CharField(null=True, blank=True, max_length=128)
|
||||
manager_delete_after_watched = models.BooleanField(null=True, blank=True)
|
||||
|
||||
def fill_from_playlist(self, info_playlist: YoutubePlaylistInfo):
|
||||
self.name = info_playlist.getTitle()
|
||||
self.playlist_id = info_playlist.getId()
|
||||
self.description = info_playlist.getDescription()
|
||||
self.icon_default = info_playlist.getDefaultThumbnailUrl()
|
||||
self.icon_best = info_playlist.getBestThumbnailUrl()
|
||||
|
||||
def copy_from_channel(self):
|
||||
# No point in storing info about the 'uploads from X' playlist
|
||||
self.name = self.channel.name
|
||||
self.playlist_id = self.channel.upload_playlist_id
|
||||
self.description = self.channel.description
|
||||
self.icon_default = self.channel.icon_default
|
||||
self.icon_best = self.channel.icon_best
|
||||
|
||||
def fetch_from_url(self, url, yt_api: YoutubeAPI):
|
||||
url_type, url_id = yt_api.parse_channel_url(url)
|
||||
if url_type == 'playlist_id':
|
||||
info_playlist = yt_api.get_playlist_info(url_id)
|
||||
self.channel = Channel.get_or_create('channel_id', info_playlist.getChannelId(), yt_api)
|
||||
self.fill_from_playlist(info_playlist)
|
||||
else:
|
||||
self.channel = Channel.get_or_create(url_type, url_id, yt_api)
|
||||
self.copy_from_channel()
|
||||
|
||||
def delete_subscription(self, keep_downloaded_videos: bool):
|
||||
self.delete()
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
Reference in New Issue
Block a user