-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -751,20 +705,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -772,9 +712,6 @@
-
-
-
@@ -797,48 +734,15 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -856,29 +760,13 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -916,30 +804,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -954,15 +818,204 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/YtManagerApp/management/jobs/delete_video.py b/YtManagerApp/management/jobs/delete_video.py
index 19057d6..7320ff6 100644
--- a/YtManagerApp/management/jobs/delete_video.py
+++ b/YtManagerApp/management/jobs/delete_video.py
@@ -35,4 +35,5 @@ def schedule_delete_video(video: Video):
:param video:
:return:
"""
- scheduler.instance.add_job(delete_video, args=[video])
+ job = scheduler.scheduler.add_job(delete_video, args=[video])
+ log.info('Scheduled delete video job video=(%s), job=%s', video, job.id)
diff --git a/YtManagerApp/management/jobs/download_video.py b/YtManagerApp/management/jobs/download_video.py
index cc1756c..7920877 100644
--- a/YtManagerApp/management/jobs/download_video.py
+++ b/YtManagerApp/management/jobs/download_video.py
@@ -25,8 +25,8 @@ def __get_valid_path(path):
def __build_youtube_dl_params(video: Video):
# resolve path
pattern_dict = {
- 'channel': video.subscription.channel.name,
- 'channel_id': video.subscription.channel.channel_id,
+ 'channel': video.subscription.channel_name,
+ 'channel_id': video.subscription.channel_id,
'playlist': video.subscription.name,
'playlist_id': video.subscription.playlist_id,
'playlist_index': "{:03d}".format(1 + video.playlist_index),
@@ -88,7 +88,7 @@ def download_video(video: Video, attempt: int = 1):
elif attempt <= max_attempts:
log.warning('Re-enqueueing video (attempt %d/%d)', attempt, max_attempts)
- scheduler.instance.add_job(download_video, args=[video, attempt + 1])
+ __schedule_download_video(video, attempt + 1)
else:
log.error('Multiple attempts to download video %d [%s %s] failed!', video.id, video.video_id, video.name)
@@ -96,10 +96,15 @@ def download_video(video: Video, attempt: int = 1):
video.save()
+def __schedule_download_video(video: Video, attempt=1):
+ job = scheduler.scheduler.add_job(download_video, args=[video, attempt])
+ log.info('Scheduled download video job video=(%s), attempt=%d, job=%s', video, attempt, job.id)
+
+
def schedule_download_video(video: Video):
"""
Schedules a download video job to run immediately.
:param video:
:return:
"""
- scheduler.instance.add_job(download_video, args=[video, 1])
+ __schedule_download_video(video)
diff --git a/YtManagerApp/management/jobs/synchronize.py b/YtManagerApp/management/jobs/synchronize.py
index fd0ba28..87c1dcc 100644
--- a/YtManagerApp/management/jobs/synchronize.py
+++ b/YtManagerApp/management/jobs/synchronize.py
@@ -7,9 +7,8 @@ from apscheduler.triggers.cron import CronTrigger
from YtManagerApp import scheduler
from YtManagerApp.appconfig import settings
from YtManagerApp.management.downloader import fetch_thumbnail, downloader_process_all, downloader_process_subscription
-from YtManagerApp.management.videos import create_video
from YtManagerApp.models import *
-from YtManagerApp.utils.youtube import YoutubeAPI
+from YtManagerApp.utils import youtube
log = logging.getLogger('sync')
__lock = Lock()
@@ -17,23 +16,25 @@ __lock = Lock()
_ENABLE_UPDATE_STATS = False
-def __check_new_videos_sub(subscription: Subscription, yt_api: YoutubeAPI):
+def __check_new_videos_sub(subscription: Subscription, yt_api: youtube.YoutubeAPI):
# Get list of videos
- for video in yt_api.list_playlist_videos(subscription.playlist_id):
- results = Video.objects.filter(video_id=video.getVideoId(), subscription=subscription)
+ for item in yt_api.playlist_items(subscription.playlist_id):
+ results = Video.objects.filter(video_id=item.resource_video_id, subscription=subscription)
if len(results) == 0:
- log.info('New video for subscription %s: %s %s"', subscription, video.getVideoId(), video.getTitle())
- db_video = create_video(video, subscription)
- else:
- if not _ENABLE_UPDATE_STATS:
- continue
- db_video = results.first()
+ log.info('New video for subscription %s: %s %s"', subscription, item.resource_video_id, item.title)
+ Video.create(item, subscription)
- # Update video stats - rating and view count
- stats = yt_api.get_single_video_stats(db_video.video_id)
- db_video.rating = stats.get_like_count() / (stats.get_like_count() + stats.get_dislike_count())
- db_video.views = stats.get_view_count()
- db_video.save()
+ if _ENABLE_UPDATE_STATS:
+ all_vids = Video.objects.filter(subscription=subscription)
+ all_vids_ids = [video.video_id for video in all_vids]
+ all_vids_dict = {v.video_id: v for v in all_vids}
+
+ for yt_video in yt_api.videos(all_vids_ids, part='id,statistics'):
+ video = all_vids_dict.get(yt_video.id)
+ if yt_video.like_count is not None and yt_video.dislike_count is not None:
+ video.rating = yt_video.n_likes / (yt_video.n_likes + yt_video.n_dislikes)
+ video.views = yt_video.n_views
+ video.save()
def __detect_deleted(subscription: Subscription):
@@ -82,11 +83,6 @@ def __fetch_thumbnails_obj(iterable, obj_type, id_attr):
def __fetch_thumbnails():
- # Fetch thumbnails
- log.info("Fetching channel thumbnails... ")
- __fetch_thumbnails_obj(Channel.objects.filter(icon_default__istartswith='http'), 'channel', 'channel_id')
- __fetch_thumbnails_obj(Channel.objects.filter(icon_best__istartswith='http'), 'channel', 'channel_id')
-
log.info("Fetching subscription thumbnails... ")
__fetch_thumbnails_obj(Subscription.objects.filter(icon_default__istartswith='http'), 'sub', 'playlist_id')
__fetch_thumbnails_obj(Subscription.objects.filter(icon_best__istartswith='http'), 'sub', 'playlist_id')
@@ -107,7 +103,7 @@ def synchronize():
# Sync subscribed playlists/channels
log.info("Sync - checking videos")
- yt_api = YoutubeAPI.build_public()
+ yt_api = youtube.YoutubeAPI.build_public()
for subscription in Subscription.objects.all():
__check_new_videos_sub(subscription, yt_api)
__detect_deleted(subscription)
@@ -128,7 +124,7 @@ def synchronize_subscription(subscription: Subscription):
__lock.acquire()
try:
log.info("Running synchronization for single subscription %d [%s]", subscription.id, subscription.name)
- yt_api = YoutubeAPI.build_public()
+ yt_api = youtube.YoutubeAPI.build_public()
log.info("Sync - checking videos")
__check_new_videos_sub(subscription, yt_api)
@@ -148,12 +144,15 @@ def synchronize_subscription(subscription: Subscription):
def schedule_synchronize_global():
trigger = CronTrigger.from_crontab(settings.get('global', 'SynchronizationSchedule'))
- scheduler.instance.add_job(synchronize, trigger, max_instances=1, coalesce=True)
+ job = scheduler.scheduler.add_job(synchronize, trigger, max_instances=1, coalesce=True)
+ log.info('Scheduled synchronize job job=%s', job.id)
def schedule_synchronize_now():
- scheduler.instance.add_job(synchronize, max_instances=1, coalesce=True)
+ job = scheduler.scheduler.add_job(synchronize, max_instances=1, coalesce=True)
+ log.info('Scheduled synchronize now job job=%s', job.id)
def schedule_synchronize_now_subscription(subscription: Subscription):
- scheduler.instance.add_job(synchronize_subscription, args=[subscription])
+ job = scheduler.scheduler.add_job(synchronize_subscription, args=[subscription])
+ log.info('Scheduled synchronize subscription job subscription=(%s), job=%s', subscription, job.id)
diff --git a/YtManagerApp/management/management.py b/YtManagerApp/management/management.py
deleted file mode 100644
index 6862518..0000000
--- a/YtManagerApp/management/management.py
+++ /dev/null
@@ -1,114 +0,0 @@
-from apscheduler.schedulers.background import BackgroundScheduler
-
-from YtManagerApp.models import SubscriptionFolder, Subscription, Video, Channel
-from YtManagerApp.utils.youtube import YoutubeAPI, YoutubeChannelInfo
-
-
-class FolderManager(object):
-
- @staticmethod
- def create_or_edit(fid, name, parent_id):
- # Create or edit
- if fid == '#':
- folder = SubscriptionFolder()
- else:
- folder = SubscriptionFolder.objects.get(id=int(fid))
-
- # Set attributes
- folder.name = name
- if parent_id == '#':
- folder.parent = None
- else:
- folder.parent = SubscriptionFolder.objects.get(id=int(parent_id))
-
- FolderManager.__validate(folder)
- folder.save()
-
- @staticmethod
- def __validate(folder: SubscriptionFolder):
- # Make sure folder name is unique in the parent folder
- for dbFolder in SubscriptionFolder.objects.filter(parent_id=folder.parent_id):
- if dbFolder.id != folder.id and dbFolder.name == folder.name:
- raise ValueError('Folder name is not unique!')
-
- # Prevent parenting loops
- current = folder
- visited = []
-
- while not (current is None):
- if current in visited:
- raise ValueError('Parenting cycle detected!')
- visited.append(current)
- current = current.parent
-
- @staticmethod
- def delete(fid: int):
- folder = SubscriptionFolder.objects.get(id=fid)
- folder.delete()
-
- @staticmethod
- def list_videos(fid: int):
- folder = SubscriptionFolder.objects.get(id=fid)
- folder_list = []
- queue = [folder]
- while len(queue) > 0:
- folder = queue.pop()
- folder_list.append(folder)
- queue.extend(SubscriptionFolder.objects.filter(parent=folder))
-
- return Video.objects.filter(subscription__parent_folder__in=folder_list).order_by('-publish_date')
-
-
-class SubscriptionManager(object):
- __scheduler = BackgroundScheduler()
-
- @staticmethod
- def create_or_edit(sid, url, name, parent_id):
- # Create or edit
- if sid == '#':
- SubscriptionManager.create(url, parent_id, YoutubeAPI.build_public())
- else:
- sub = Subscription.objects.get(id=int(sid))
- sub.name = name
-
- if parent_id == '#':
- sub.parent_folder = None
- else:
- sub.parent_folder = SubscriptionFolder.objects.get(id=int(parent_id))
-
- sub.save()
-
- @staticmethod
- def create(url, parent_id, yt_api: YoutubeAPI):
- sub = Subscription()
- # Set parent
- if parent_id == '#':
- sub.parent_folder = None
- else:
- sub.parent_folder = SubscriptionFolder.objects.get(id=int(parent_id))
-
- # Pull information about the channel and playlist
- url_type, url_id = yt_api.parse_channel_url(url)
-
- if url_type == 'playlist_id':
- info_playlist = yt_api.get_playlist_info(url_id)
- channel = SubscriptionManager.__get_or_create_channel('channel_id', info_playlist.getChannelId(), yt_api)
- sub.name = info_playlist.getTitle()
- sub.playlist_id = info_playlist.getId()
- sub.description = info_playlist.getDescription()
- sub.channel = channel
- sub.icon_default = info_playlist.getDefaultThumbnailUrl()
- sub.icon_best = info_playlist.getBestThumbnailUrl()
-
- else:
- channel = SubscriptionManager.__get_or_create_channel(url_type, url_id, yt_api)
- # No point in getting the 'uploads' playlist info
- sub.name = channel.name
- sub.playlist_id = channel.upload_playlist_id
- sub.description = channel.description
- sub.channel = channel
- sub.icon_default = channel.icon_default
- sub.icon_best = channel.icon_best
-
- sub.save()
-
diff --git a/YtManagerApp/management/videos.py b/YtManagerApp/management/videos.py
index 4853593..0e9bd84 100644
--- a/YtManagerApp/management/videos.py
+++ b/YtManagerApp/management/videos.py
@@ -1,25 +1,10 @@
-from YtManagerApp.models import Subscription, Video, SubscriptionFolder
-from YtManagerApp.utils.youtube import YoutubePlaylistItem
-from typing import Optional
import re
-from django.db.models import Q
+from typing import Optional
+
from django.contrib.auth.models import User
+from django.db.models import Q
-
-def create_video(yt_video: YoutubePlaylistItem, subscription: Subscription):
- video = Video()
- video.video_id = yt_video.getVideoId()
- video.name = yt_video.getTitle()
- video.description = yt_video.getDescription()
- video.watched = False
- video.downloaded_path = None
- video.subscription = subscription
- video.playlist_index = yt_video.getPlaylistIndex()
- video.publish_date = yt_video.getPublishDate()
- video.icon_default = yt_video.getDefaultThumbnailUrl()
- video.icon_best = yt_video.getBestThumbnailUrl()
- video.save()
- return video
+from YtManagerApp.models import Subscription, Video, SubscriptionFolder
def get_videos(user: User,
diff --git a/YtManagerApp/migrations/0007_auto_20181029_1638.py b/YtManagerApp/migrations/0007_auto_20181029_1638.py
new file mode 100644
index 0000000..e09e2d0
--- /dev/null
+++ b/YtManagerApp/migrations/0007_auto_20181029_1638.py
@@ -0,0 +1,32 @@
+# Generated by Django 2.1.2 on 2018-10-29 16:38
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('YtManagerApp', '0006_auto_20181027_0256'),
+ ]
+
+ operations = [
+ migrations.RemoveField(
+ model_name='subscription',
+ name='channel',
+ ),
+ migrations.AddField(
+ model_name='subscription',
+ name='channel_id',
+ field=models.CharField(default='test', max_length=128),
+ preserve_default=False,
+ ),
+ migrations.AddField(
+ model_name='subscription',
+ name='channel_name',
+ field=models.CharField(default='Unknown', max_length=1024),
+ preserve_default=False,
+ ),
+ migrations.DeleteModel(
+ name='Channel',
+ ),
+ ]
diff --git a/YtManagerApp/models.py b/YtManagerApp/models.py
index 5d3043c..0d5c7cf 100644
--- a/YtManagerApp/models.py
+++ b/YtManagerApp/models.py
@@ -6,7 +6,7 @@ from django.contrib.auth.models import User
from django.contrib.auth.models import User
from django.db import models
from django.db.models.functions import Lower
-from YtManagerApp.utils.youtube import YoutubeAPI, YoutubeChannelInfo, YoutubePlaylistInfo
+from YtManagerApp.utils import youtube
# help_text = user shown text
# verbose_name = user shown name
@@ -176,6 +176,9 @@ class SubscriptionFolder(models.Model):
current = current.parent
return s[:-3]
+ def __repr__(self):
+ return f'folder {self.id}, name="{self.name}"'
+
def delete_folder(self, keep_subscriptions: bool):
if keep_subscriptions:
@@ -225,92 +228,13 @@ class SubscriptionFolder(models.Model):
return data_collected
-class Channel(models.Model):
- channel_id = models.TextField(null=False, unique=True)
- username = models.TextField(null=True, unique=True)
- custom_url = models.TextField(null=True, unique=True)
- name = models.TextField()
- description = models.TextField()
- icon_default = models.TextField()
- icon_best = models.TextField()
- upload_playlist_id = models.TextField()
-
- def __str__(self):
- return self.name
-
- @staticmethod
- def find_by_channel_id(channel_id):
- result = Channel.objects.filter(channel_id=channel_id)
- if len(result) > 0:
- return result.first()
- return None
-
- @staticmethod
- def find_by_username(username):
- result = Channel.objects.filter(username=username)
- if len(result) > 0:
- return result.first()
- return None
-
- @staticmethod
- def find_by_custom_url(custom_url):
- result = Channel.objects.filter(custom_url=custom_url)
- if len(result) > 0:
- return result.first()
- return None
-
- def fill(self, yt_channel_info: YoutubeChannelInfo):
- self.channel_id = yt_channel_info.getId()
- self.custom_url = yt_channel_info.getCustomUrl()
- self.name = yt_channel_info.getTitle()
- self.description = yt_channel_info.getDescription()
- self.icon_default = yt_channel_info.getDefaultThumbnailUrl()
- self.icon_best = yt_channel_info.getBestThumbnailUrl()
- self.upload_playlist_id = yt_channel_info.getUploadsPlaylist()
- self.save()
-
- @staticmethod
- def get_or_create(url_type: str, url_id: str, yt_api: YoutubeAPI):
- channel: Channel = None
- info_channel: YoutubeChannelInfo = None
-
- if url_type == 'user':
- channel = Channel.find_by_username(url_id)
- if not channel:
- info_channel = yt_api.get_channel_info_by_username(url_id)
- channel = Channel.find_by_channel_id(info_channel.getId())
-
- elif url_type == 'channel_id':
- channel = Channel.find_by_channel_id(url_id)
- if not channel:
- info_channel = yt_api.get_channel_info(url_id)
-
- elif url_type == 'channel_custom':
- channel = Channel.find_by_custom_url(url_id)
- if not channel:
- found_channel_id = yt_api.search_channel(url_id)
- channel = Channel.find_by_channel_id(found_channel_id)
- if not channel:
- info_channel = yt_api.get_channel_info(found_channel_id)
-
- # If we downloaded information about the channel, store information
- # about the channel here.
- if info_channel:
- if not channel:
- channel = Channel()
- if url_type == 'user':
- channel.username = url_id
- channel.fill(info_channel)
-
- return channel
-
-
class Subscription(models.Model):
name = models.CharField(null=False, max_length=1024)
parent_folder = models.ForeignKey(SubscriptionFolder, on_delete=models.CASCADE, null=True, blank=True)
playlist_id = models.CharField(null=False, max_length=128)
description = models.TextField()
- channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
+ channel_id = models.CharField(max_length=128)
+ channel_name = models.CharField(max_length=1024)
icon_default = models.CharField(max_length=1024)
icon_best = models.CharField(max_length=1024)
user = models.ForeignKey(User, on_delete=models.CASCADE)
@@ -327,30 +251,42 @@ class Subscription(models.Model):
def __str__(self):
return self.name
- def fill_from_playlist(self, info_playlist: YoutubePlaylistInfo):
- self.name = info_playlist.getTitle()
- self.playlist_id = info_playlist.getId()
- self.description = info_playlist.getDescription()
- self.icon_default = info_playlist.getDefaultThumbnailUrl()
- self.icon_best = info_playlist.getBestThumbnailUrl()
+ def __repr__(self):
+ return f'subscription {self.id}, name="{self.name}", playlist_id="{self.playlist_id}"'
- def copy_from_channel(self):
+ def fill_from_playlist(self, info_playlist: youtube.Playlist):
+ self.name = info_playlist.title
+ self.playlist_id = info_playlist.id
+ self.description = info_playlist.description
+ self.channel_id = info_playlist.channel_id
+ self.channel_name = info_playlist.channel_title
+ self.icon_default = youtube.default_thumbnail(info_playlist).url
+ self.icon_best = youtube.best_thumbnail(info_playlist).url
+
+ def copy_from_channel(self, info_channel: youtube.Channel):
# No point in storing info about the 'uploads from X' playlist
- self.name = self.channel.name
- self.playlist_id = self.channel.upload_playlist_id
- self.description = self.channel.description
- self.icon_default = self.channel.icon_default
- self.icon_best = self.channel.icon_best
+ self.name = info_channel.title
+ self.playlist_id = info_channel.uploads_playlist.id
+ self.description = info_channel.description
+ self.channel_id = info_channel.id
+ self.channel_name = info_channel.title
+ self.icon_default = youtube.default_thumbnail(info_channel).url
+ self.icon_best = youtube.best_thumbnail(info_channel).url
+
+ def fetch_from_url(self, url, yt_api: youtube.YoutubeAPI):
+ url_parsed = yt_api.parse_url(url)
+ if 'playlist' in url_parsed:
+ info_playlist = yt_api.playlist(url=url)
+ if info_playlist is None:
+ raise ValueError('Invalid playlist ID!')
- def fetch_from_url(self, url, yt_api: YoutubeAPI):
- url_type, url_id = yt_api.parse_channel_url(url)
- if url_type == 'playlist_id':
- info_playlist = yt_api.get_playlist_info(url_id)
- self.channel = Channel.get_or_create('channel_id', info_playlist.getChannelId(), yt_api)
self.fill_from_playlist(info_playlist)
else:
- self.channel = Channel.get_or_create(url_type, url_id, yt_api)
- self.copy_from_channel()
+ info_channel = yt_api.channel(url=url)
+ if info_channel is None:
+ raise ValueError('Cannot find channel!')
+
+ self.copy_from_channel(info_channel)
def delete_subscription(self, keep_downloaded_videos: bool):
self.delete()
@@ -383,6 +319,22 @@ class Video(models.Model):
views = models.IntegerField(null=False, default=0)
rating = models.FloatField(null=False, default=0.5)
+ @staticmethod
+ def create(playlist_item: youtube.PlaylistItem, subscription: Subscription):
+ video = Video()
+ video.video_id = playlist_item.resource_video_id
+ video.name = playlist_item.title
+ video.description = playlist_item.description
+ video.watched = False
+ video.downloaded_path = None
+ video.subscription = subscription
+ video.playlist_index = playlist_item.position
+ video.publish_date = playlist_item.published_at
+ video.icon_default = youtube.default_thumbnail(playlist_item).url
+ video.icon_best = youtube.best_thumbnail(playlist_item).url
+ video.save()
+ return video
+
def mark_watched(self):
self.watched = True
self.save()
@@ -428,3 +380,6 @@ class Video(models.Model):
def __str__(self):
return self.name
+
+ def __repr__(self):
+ return f'video {self.id}, video_id="{self.video_id}"'
diff --git a/YtManagerApp/scheduler.py b/YtManagerApp/scheduler.py
index b149c05..f1e11b0 100644
--- a/YtManagerApp/scheduler.py
+++ b/YtManagerApp/scheduler.py
@@ -2,12 +2,12 @@ import logging
import sys
from apscheduler.schedulers.background import BackgroundScheduler
-instance: BackgroundScheduler = None
+scheduler: BackgroundScheduler = None
def initialize_scheduler():
from .appconfig import settings
- global instance
+ global scheduler
logger = logging.getLogger('scheduler')
executors = {
@@ -17,8 +17,8 @@ def initialize_scheduler():
}
}
job_defaults = {
- 'misfire_grace_time': sys.maxsize
+ 'misfire_grace_time': 60 * 60 * 24 * 365 # 1 year
}
- instance = BackgroundScheduler(logger=logger, executors=executors, job_defaults=job_defaults)
- instance.start()
+ scheduler = BackgroundScheduler(logger=logger, executors=executors, job_defaults=job_defaults)
+ scheduler.start()
diff --git a/YtManagerApp/utils/iterutils.py b/YtManagerApp/utils/iterutils.py
deleted file mode 100644
index 5f3d36d..0000000
--- a/YtManagerApp/utils/iterutils.py
+++ /dev/null
@@ -1,32 +0,0 @@
-import itertools
-from typing import Iterable
-
-
-def first_true(*args, default=False, pred=None):
- """Returns the first true value in the iterable.
-
- If no true value is found, returns *default*
-
- If *pred* is not None, returns the first item
- for which pred(item) is true.
-
- """
- # first_true([a,b,c], x) --> a or b or c or x
- # first_true([a,b], x, f) --> a if f(a) else b if f(b) else x
- return next(filter(pred, args), default)
-
-
-def as_chunks(iterable: Iterable, chunk_size: int):
- """
- Iterates an iterable in chunks of chunk_size elements.
- :param iterable: An iterable containing items to iterate.
- :param chunk_size: Chunk size
- :return: Returns a generator which will yield chunks of size chunk_size
- """
-
- it = iter(iterable)
- while True:
- chunk = tuple(itertools.islice(it, chunk_size))
- if not chunk:
- return
- yield chunk
diff --git a/YtManagerApp/utils/youtube.py b/YtManagerApp/utils/youtube.py
index fb2f6db..9e735ff 100644
--- a/YtManagerApp/utils/youtube.py
+++ b/YtManagerApp/utils/youtube.py
@@ -1,285 +1,48 @@
-from googleapiclient.discovery import build
-from googleapiclient.errors import Error as APIError
-from google_auth_oauthlib.flow import InstalledAppFlow
from django.conf import settings
-import re
-from YtManagerApp.utils.iterutils import as_chunks
-
-API_SERVICE_NAME = 'youtube'
-API_VERSION = 'v3'
-
-YOUTUBE_LIST_LIMIT = 50
+from external.pytaw.pytaw.youtube import YouTube, Channel, Playlist, PlaylistItem, Thumbnail, InvalidURL, Resource, Video
+from typing import Optional
-class YoutubeException(Exception):
- pass
-
-
-class YoutubeInvalidURLException(YoutubeException):
- pass
-
-
-class YoutubeChannelNotFoundException(YoutubeException):
- pass
-
-
-class YoutubeUserNotFoundException(YoutubeException):
- pass
-
-
-class YoutubePlaylistNotFoundException(YoutubeException):
- pass
-
-
-class YoutubeVideoNotFoundException(YoutubeException):
- pass
-
-
-class YoutubeChannelInfo(object):
- def __init__(self, result_dict):
- self.__id = result_dict['id']
- self.__snippet = result_dict['snippet']
- self.__contentDetails = result_dict['contentDetails']
-
- def getId(self):
- return self.__id
-
- def getTitle(self):
- return self.__snippet['title']
-
- def getDescription(self):
- return self.__snippet['description']
-
- def getCustomUrl(self):
- try:
- return self.__snippet['customUrl']
- except KeyError:
- return None
-
- def getDefaultThumbnailUrl(self):
- return self.__snippet['thumbnails']['default']['url']
-
- def getBestThumbnailUrl(self):
- best_url = None
- best_res = 0
- for _, thumb in self.__snippet['thumbnails'].items():
- res = thumb['width'] * thumb['height']
- if res > best_res:
- best_res = res
- best_url = thumb['url']
- return best_url
-
- def getUploadsPlaylist(self):
- return self.__contentDetails['relatedPlaylists']['uploads']
-
-
-class YoutubePlaylistInfo(object):
- def __init__(self, result_dict):
- self.__id = result_dict['id']
- self.__snippet = result_dict['snippet']
-
- def getId(self):
- return self.__id
-
- def getChannelId(self):
- return self.__snippet['channelId']
-
- def getTitle(self):
- return self.__snippet['title']
-
- def getDescription(self):
- return self.__snippet['description']
-
- def getDefaultThumbnailUrl(self):
- return self.__snippet['thumbnails']['default']['url']
-
- def getBestThumbnailUrl(self):
- best_url = None
- best_res = 0
- for _, thumb in self.__snippet['thumbnails'].items():
- res = thumb['width'] * thumb['height']
- if res > best_res:
- best_res = res
- best_url = thumb['url']
- return best_url
-
-
-class YoutubePlaylistItem(object):
- def __init__(self, result_dict):
- self.__snippet = result_dict['snippet']
-
- def getVideoId(self):
- return self.__snippet['resourceId']['videoId']
-
- def getPublishDate(self):
- return self.__snippet['publishedAt']
-
- def getTitle(self):
- return self.__snippet['title']
-
- def getDescription(self):
- return self.__snippet['description']
-
- def getDefaultThumbnailUrl(self):
- return self.__snippet['thumbnails']['default']['url']
-
- def getBestThumbnailUrl(self):
- best_url = None
- best_res = 0
- for _, thumb in self.__snippet['thumbnails'].items():
- res = thumb['width'] * thumb['height']
- if res > best_res:
- best_res = res
- best_url = thumb['url']
- return best_url
-
- def getPlaylistIndex(self):
- return self.__snippet['position']
-
-
-class YoutubeVideoStatistics(object):
- def __init__(self, result_dict):
- self.id = result_dict['id']
- self.stats = result_dict['statistics']
-
- def get_view_count(self):
- return int(self.stats['viewCount'])
-
- def get_like_count(self):
- return int(self.stats['likeCount'])
-
- def get_dislike_count(self):
- return int(self.stats['dislikeCount'])
-
- def get_favorite_count(self):
- return int(self.stats['favoriteCount'])
-
- def get_comment_count(self):
- return int(self.stats['commentCount'])
-
-
-class YoutubeAPI(object):
- def __init__(self, service):
- self.service = service
+class YoutubeAPI(YouTube):
@staticmethod
def build_public() -> 'YoutubeAPI':
- service = build(API_SERVICE_NAME, API_VERSION, developerKey=settings.YOUTUBE_API_KEY)
- return YoutubeAPI(service)
-
- @staticmethod
- def parse_channel_url(url):
- """
- Parses given channel url, returns a tuple of the form (type, value), where type can be one of:
- * channel_id
- * channel_custom
- * user
- * playlist_id
- :param url: URL to parse
- :return: (type, value) tuple
- """
- match = re.search(r'youtube\.com/.*[&?]list=([^?&/]+)', url)
- if match:
- return 'playlist_id', match.group(1)
-
- match = re.search(r'youtube\.com/user/([^?&/]+)', url)
- if match:
- return 'user', match.group(1)
-
- match = re.search(r'youtube\.com/channel/([^?&/]+)', url)
- if match:
- return 'channel_id', match.group(1)
-
- match = re.search(r'youtube\.com/(?:c/)?([^?&/]+)', url)
- if match:
- return 'channel_custom', match.group(1)
-
- raise YoutubeInvalidURLException('Unrecognized URL format!')
-
- def get_playlist_info(self, list_id) -> YoutubePlaylistInfo:
- result = self.service.playlists()\
- .list(part='snippet', id=list_id)\
- .execute()
-
- if len(result['items']) <= 0:
- raise YoutubePlaylistNotFoundException("Invalid playlist ID.")
-
- return YoutubePlaylistInfo(result['items'][0])
-
- def get_channel_info_by_username(self, user) -> YoutubeChannelInfo:
- result = self.service.channels()\
- .list(part='snippet,contentDetails', forUsername=user)\
- .execute()
-
- if len(result['items']) <= 0:
- raise YoutubeUserNotFoundException('Invalid user.')
-
- return YoutubeChannelInfo(result['items'][0])
-
- def get_channel_info(self, channel_id) -> YoutubeChannelInfo:
- result = self.service.channels()\
- .list(part='snippet,contentDetails', id=channel_id)\
- .execute()
-
- if len(result['items']) <= 0:
- raise YoutubeChannelNotFoundException('Invalid channel ID.')
-
- return YoutubeChannelInfo(result['items'][0])
-
- def search_channel(self, custom) -> str:
- result = self.service.search()\
- .list(part='id', q=custom, type='channel')\
- .execute()
-
- if len(result['items']) <= 0:
- raise YoutubeChannelNotFoundException('Could not find channel!')
-
- channel_result = result['items'][0]
- return channel_result['id']['channelId']
-
- def list_playlist_videos(self, playlist_id):
- kwargs = {
- "part": "snippet",
- "maxResults": 50,
- "playlistId": playlist_id
- }
- last_page = False
-
- while not last_page:
- result = self.service.playlistItems()\
- .list(**kwargs)\
- .execute()
-
- for item in result['items']:
- yield YoutubePlaylistItem(item)
-
- if 'nextPageToken' in result:
- kwargs['pageToken'] = result['nextPageToken']
- else:
- last_page = True
-
- def get_single_video_stats(self, video_id) -> YoutubeVideoStatistics:
- result = list(self.get_video_stats([video_id]))
- if len(result) < 1:
- raise YoutubeVideoNotFoundException('Could not find video with id ' + video_id + '!')
- return result[0]
-
- def get_video_stats(self, video_id_list):
- for chunk in as_chunks(video_id_list, YOUTUBE_LIST_LIMIT):
- kwargs = {
- "part": "statistics",
- "maxResults": YOUTUBE_LIST_LIMIT,
- "id": ','.join(chunk)
- }
- result = self.service.videos()\
- .list(**kwargs)\
- .execute()
-
- for item in result['items']:
- yield YoutubeVideoStatistics(item)
+ return YoutubeAPI(key=settings.YOUTUBE_API_KEY)
# @staticmethod
# def build_oauth() -> 'YoutubeAPI':
# flow =
# credentials =
# service = build(API_SERVICE_NAME, API_VERSION, credentials)
+
+
+def default_thumbnail(resource: Resource) -> Optional[Thumbnail]:
+ """
+ Gets the default thumbnail for a resource.
+ Searches in the list of thumbnails for one with the label 'default', or takes the first one.
+ :param resource:
+ :return:
+ """
+ thumbs = getattr(resource, 'thumbnails', None)
+
+ if thumbs is None or len(thumbs) <= 0:
+ return None
+
+ return next(
+ (i for i in thumbs if i.id == 'default'),
+ thumbs[0]
+ )
+
+
+def best_thumbnail(resource: Resource) -> Optional[Thumbnail]:
+ """
+ Gets the best thumbnail available for a resource.
+ :param resource:
+ :return:
+ """
+ thumbs = getattr(resource, 'thumbnails', None)
+
+ if thumbs is None or len(thumbs) <= 0:
+ return None
+
+ return max(thumbs, key=lambda t: t.width * t.height)
\ No newline at end of file
diff --git a/YtManagerApp/views/index.py b/YtManagerApp/views/index.py
index 72b41d7..646bdf4 100644
--- a/YtManagerApp/views/index.py
+++ b/YtManagerApp/views/index.py
@@ -1,4 +1,4 @@
-from crispy_forms.helper import FormHelperpython3
+from crispy_forms.helper import FormHelper
from crispy_forms.layout import Layout, Field, HTML
from django import forms
from django.contrib.auth.decorators import login_required
@@ -175,7 +175,8 @@ class SubscriptionFolderForm(forms.ModelForm):
args_id.append(~Q(id=self.instance.id))
if SubscriptionFolder.objects.filter(parent=parent, name__iexact=name, *args_id).count() > 0:
- raise forms.ValidationError('A folder with the same name already exists in the given parent directory!', code='already_exists')
+ raise forms.ValidationError(
+ 'A folder with the same name already exists in the given parent directory!', code='already_exists')
# Check for cycles
if self.instance is not None:
@@ -238,6 +239,7 @@ class CreateSubscriptionForm(forms.ModelForm):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
+ self.yt_api = youtube.YoutubeAPI.build_public()
self.helper = FormHelper()
self.helper.form_tag = False
self.helper.layout = Layout(
@@ -252,11 +254,18 @@ class CreateSubscriptionForm(forms.ModelForm):
)
def clean_playlist_url(self):
- playlist_url = self.cleaned_data['playlist_url']
+ playlist_url: str = self.cleaned_data['playlist_url']
try:
- youtube.YoutubeAPI.parse_channel_url(playlist_url)
- except youtube.YoutubeInvalidURLException:
- raise forms.ValidationError('Invalid playlist/channel URL, or not in a recognized format.')
+ parsed_url = self.yt_api.parse_url(playlist_url)
+ except youtube.InvalidURL as e:
+ raise forms.ValidationError(str(e))
+
+ is_playlist = 'playlist' in parsed_url
+ is_channel = parsed_url['type'] in ('channel', 'user', 'channel_custom')
+
+ if not is_channel and not is_playlist:
+ raise forms.ValidationError('The given URL must link to a channel or a playlist!')
+
return playlist_url
@@ -269,21 +278,22 @@ class CreateSubscriptionModal(LoginRequiredMixin, ModalMixin, CreateView):
api = youtube.YoutubeAPI.build_public()
try:
form.instance.fetch_from_url(form.cleaned_data['playlist_url'], api)
- except youtube.YoutubeChannelNotFoundException:
- return self.modal_response(
- form, False, 'Could not find a channel based on the given URL. Please verify that the URL is correct.')
- except youtube.YoutubeUserNotFoundException:
- return self.modal_response(
- form, False, 'Could not find an user based on the given URL. Please verify that the URL is correct.')
- except youtube.YoutubePlaylistNotFoundException:
- return self.modal_response(
- form, False, 'Could not find a playlist based on the given URL. Please verify that the URL is correct.')
- except youtube.YoutubeException as e:
- return self.modal_response(
- form, False, str(e))
- except youtube.APIError as e:
- return self.modal_response(
- form, False, 'An error occurred while communicating with the YouTube API: ' + str(e))
+ except youtube.InvalidURL as e:
+ return self.modal_response(form, False, str(e))
+ except ValueError as e:
+ return self.modal_response(form, False, str(e))
+ # except youtube.YoutubeUserNotFoundException:
+ # return self.modal_response(
+ # form, False, 'Could not find an user based on the given URL. Please verify that the URL is correct.')
+ # except youtube.YoutubePlaylistNotFoundException:
+ # return self.modal_response(
+ # form, False, 'Could not find a playlist based on the given URL. Please verify that the URL is correct.')
+ # except youtube.YoutubeException as e:
+ # return self.modal_response(
+ # form, False, str(e))
+ # except youtube.APIError as e:
+ # return self.modal_response(
+ # form, False, 'An error occurred while communicating with the YouTube API: ' + str(e))
return super().form_valid(form)
diff --git a/config/config.ini b/config/config.ini
index cf59a4b..39e85ab 100644
--- a/config/config.ini
+++ b/config/config.ini
@@ -41,7 +41,7 @@ LogLevel=DEBUG
;DownloadOrder=playlist
; Path where downloaded videos are stored
-DownloadPath=D:\\Dev\\youtube-channel-manager\\temp\\download
+DownloadPath=temp/download
; A pattern which describes how downloaded files are organized. Extensions are automatically appended.
; Supported fields: channel, channel_id, playlist, playlist_id, playlist_index, title, id
diff --git a/external/__init__.py b/external/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/external/pytaw/.gitignore b/external/pytaw/.gitignore
new file mode 100644
index 0000000..16ebe93
--- /dev/null
+++ b/external/pytaw/.gitignore
@@ -0,0 +1,19 @@
+*.bak
+*.egg
+*.egg-info/
+*.eggs/
+*.pyproj
+*.sln
+*.vs/
+*~
+.DS_Store
+.cache/
+.coverage
+.idea/
+.tox/
+_build/
+build/
+dist/
+
+__pycache__/
+*.ini
diff --git a/external/pytaw/.pytaw.conf b/external/pytaw/.pytaw.conf
new file mode 100644
index 0000000..e494f3d
--- /dev/null
+++ b/external/pytaw/.pytaw.conf
@@ -0,0 +1,3 @@
+; by default pytaw will look for this file (".pytaw.conf") in the user's home directory
+[youtube]
+developer_key = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
diff --git a/external/pytaw/README.md b/external/pytaw/README.md
new file mode 100644
index 0000000..be3f3a2
--- /dev/null
+++ b/external/pytaw/README.md
@@ -0,0 +1,29 @@
+# PYTAW: Python YouTube API Wrapper
+
+###Note
+This library is copied from [https://github.com/chibicitiberiu/pytaw/tree/improvements](https://github.com/chibicitiberiu/pytaw/tree/improvements).
+
+
+```python
+>>> from pytaw import YouTube
+>>> youtube = YouTube(key='your_api_key')
+>>> video = youtube.video('4vuW6tQ0218')
+>>> video.title
+'Monty Python - Dead Parrot'
+>>> video.published_at
+datetime.datetime(2007, 2, 14, 13, 55, 51, tzinfo=tzutc())
+>>> channel = video.channel
+>>> channel.title
+'Chadner'
+>>> search = youtube.search(q='monty python')
+>>> search[0]
+
+>>> for r in search[:5]:
+... print(r)
+...
+Monty Python
+Chemist Sketch - Monty Python's Flying Circus
+A Selection of Sketches from "Monty Python's Flying Circus" - #4
+Monty Python - Dead Parrot
+Monty Python And the holy grail
+```
\ No newline at end of file
diff --git a/external/pytaw/__init__.py b/external/pytaw/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/external/pytaw/docs/Makefile b/external/pytaw/docs/Makefile
new file mode 100644
index 0000000..96e5364
--- /dev/null
+++ b/external/pytaw/docs/Makefile
@@ -0,0 +1,20 @@
+# Minimal makefile for Sphinx documentation
+#
+
+# You can set these variables from the command line.
+SPHINXOPTS =
+SPHINXBUILD = sphinx-build
+SPHINXPROJ = pytaw
+SOURCEDIR = .
+BUILDDIR = _build
+
+# Put it first so that "make" without argument is like "make help".
+help:
+ @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+
+.PHONY: help Makefile
+
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+ @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
\ No newline at end of file
diff --git a/external/pytaw/docs/conf.py b/external/pytaw/docs/conf.py
new file mode 100644
index 0000000..459d473
--- /dev/null
+++ b/external/pytaw/docs/conf.py
@@ -0,0 +1,169 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# pytaw documentation build configuration file, created by
+# sphinx-quickstart on Mon Nov 27 19:26:35 2017.
+#
+# This file is execfile()d with the current directory set to its
+# containing dir.
+#
+# Note that not all possible configuration values are present in this
+# autogenerated file.
+#
+# All configuration values have a default; values that are commented out
+# serve to show the default.
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here. If the directory is relative to the
+# documentation root, use os.path.abspath to make it absolute, like shown here.
+#
+# import os
+# import sys
+# sys.path.insert(0, os.path.abspath('.'))
+
+
+# -- General configuration ------------------------------------------------
+
+# If your documentation needs a minimal Sphinx version, state it here.
+#
+# needs_sphinx = '1.0'
+
+# Add any Sphinx extension module names here, as strings. They can be
+# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
+# ones.
+extensions = ['sphinx.ext.autodoc']
+
+# Add any paths that contain templates here, relative to this directory.
+templates_path = ['_templates']
+
+# The suffix(es) of source filenames.
+# You can specify multiple suffix as a list of string:
+#
+# source_suffix = ['.rst', '.md']
+source_suffix = '.rst'
+
+# The master toctree document.
+master_doc = 'index'
+
+# General information about the project.
+project = 'pytaw'
+copyright = '2017, 6000hulls'
+author = '6000hulls'
+
+# The version info for the project you're documenting, acts as replacement for
+# |version| and |release|, also used in various other places throughout the
+# built documents.
+#
+# The short X.Y version.
+version = '0.1'
+# The full version, including alpha/beta/rc tags.
+release = '0.1'
+
+# The language for content autogenerated by Sphinx. Refer to documentation
+# for a list of supported languages.
+#
+# This is also used if you do content translation via gettext catalogs.
+# Usually you set "language" from the command line for these cases.
+language = None
+
+# List of patterns, relative to source directory, that match files and
+# directories to ignore when looking for source files.
+# This patterns also effect to html_static_path and html_extra_path
+exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
+
+# The name of the Pygments (syntax highlighting) style to use.
+pygments_style = 'sphinx'
+
+# If true, `todo` and `todoList` produce output, else they produce nothing.
+todo_include_todos = False
+
+
+# -- Options for HTML output ----------------------------------------------
+
+# The theme to use for HTML and HTML Help pages. See the documentation for
+# a list of builtin themes.
+#
+html_theme = 'sphinx_rtd_theme'
+
+# Theme options are theme-specific and customize the look and feel of a theme
+# further. For a list of options available for each theme, see the
+# documentation.
+#
+# html_theme_options = {}
+
+# Add any paths that contain custom static files (such as style sheets) here,
+# relative to this directory. They are copied after the builtin static files,
+# so a file named "default.css" will overwrite the builtin "default.css".
+html_static_path = ['_static']
+
+# Custom sidebar templates, must be a dictionary that maps document names
+# to template names.
+#
+# This is required for the alabaster theme
+# refs: http://alabaster.readthedocs.io/en/latest/installation.html#sidebars
+html_sidebars = {
+ '**': [
+ 'relations.html', # needs 'show_related': True theme option to display
+ 'searchbox.html',
+ ]
+}
+
+
+# -- Options for HTMLHelp output ------------------------------------------
+
+# Output file base name for HTML help builder.
+htmlhelp_basename = 'pytawdoc'
+
+
+# -- Options for LaTeX output ---------------------------------------------
+
+latex_elements = {
+ # The paper size ('letterpaper' or 'a4paper').
+ #
+ # 'papersize': 'letterpaper',
+
+ # The font size ('10pt', '11pt' or '12pt').
+ #
+ # 'pointsize': '10pt',
+
+ # Additional stuff for the LaTeX preamble.
+ #
+ # 'preamble': '',
+
+ # Latex figure (float) alignment
+ #
+ # 'figure_align': 'htbp',
+}
+
+# Grouping the document tree into LaTeX files. List of tuples
+# (source start file, target name, title,
+# author, documentclass [howto, manual, or own class]).
+latex_documents = [
+ (master_doc, 'pytaw.tex', 'pytaw Documentation',
+ '6000hulls', 'manual'),
+]
+
+
+# -- Options for manual page output ---------------------------------------
+
+# One entry per manual page. List of tuples
+# (source start file, name, description, authors, manual section).
+man_pages = [
+ (master_doc, 'pytaw', 'pytaw Documentation',
+ [author], 1)
+]
+
+
+# -- Options for Texinfo output -------------------------------------------
+
+# Grouping the document tree into Texinfo files. List of tuples
+# (source start file, target name, title, author,
+# dir menu entry, description, category)
+texinfo_documents = [
+ (master_doc, 'pytaw', 'pytaw Documentation',
+ author, 'pytaw', 'One line description of project.',
+ 'Miscellaneous'),
+]
+
+
+
diff --git a/external/pytaw/docs/index.rst b/external/pytaw/docs/index.rst
new file mode 100644
index 0000000..84bd930
--- /dev/null
+++ b/external/pytaw/docs/index.rst
@@ -0,0 +1,18 @@
+PYTAW: Python YouTube API Wrapper
+=================================
+
+It's a wrapper for the YouTube python API. Written in python.
+
+.. automodule:: pytaw.youtube
+ :members:
+
+.. toctree::
+ :maxdepth: 2
+ :caption: Contents:
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`
diff --git a/external/pytaw/docs/make.bat b/external/pytaw/docs/make.bat
new file mode 100644
index 0000000..ff66d9f
--- /dev/null
+++ b/external/pytaw/docs/make.bat
@@ -0,0 +1,36 @@
+@ECHO OFF
+
+pushd %~dp0
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+ set SPHINXBUILD=sphinx-build
+)
+set SOURCEDIR=.
+set BUILDDIR=_build
+set SPHINXPROJ=pytaw
+
+if "%1" == "" goto help
+
+%SPHINXBUILD% >NUL 2>NUL
+if errorlevel 9009 (
+ echo.
+ echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
+ echo.installed, then set the SPHINXBUILD environment variable to point
+ echo.to the full path of the 'sphinx-build' executable. Alternatively you
+ echo.may add the Sphinx directory to PATH.
+ echo.
+ echo.If you don't have Sphinx installed, grab it from
+ echo.http://sphinx-doc.org/
+ exit /b 1
+)
+
+%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
+goto end
+
+:help
+%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
+
+:end
+popd
diff --git a/external/pytaw/main_test.py b/external/pytaw/main_test.py
new file mode 100644
index 0000000..7ea0cf7
--- /dev/null
+++ b/external/pytaw/main_test.py
@@ -0,0 +1,13 @@
+import pytaw
+
+yt = pytaw.YouTube(key='AIzaSyBabzE4Bup77WexdLMa9rN9z-wJidEfNX8')
+c = yt.channel('UCmmPgObSUPw1HL2lq6H4ffA')
+
+uploads_playlist = c.uploads_playlist
+print(repr(uploads_playlist))
+
+uploads_list = list(uploads_playlist.items)
+for item in uploads_list:
+ print(item.position, '...', repr(item), ' .... ', repr(item.video))
+ print(item.thumbnails)
+ break
diff --git a/external/pytaw/pytaw/__init__.py b/external/pytaw/pytaw/__init__.py
new file mode 100644
index 0000000..98d08e6
--- /dev/null
+++ b/external/pytaw/pytaw/__init__.py
@@ -0,0 +1 @@
+from .youtube import YouTube
\ No newline at end of file
diff --git a/external/pytaw/pytaw/utils.py b/external/pytaw/pytaw/utils.py
new file mode 100644
index 0000000..5bd503d
--- /dev/null
+++ b/external/pytaw/pytaw/utils.py
@@ -0,0 +1,92 @@
+import re
+import urllib.parse
+import typing
+from datetime import datetime, timezone
+
+import dateutil.parser
+import itertools
+
+def string_to_datetime(string):
+ if string is None:
+ return None
+ else:
+ return dateutil.parser.parse(string)
+
+
+def datetime_to_string(dt):
+ if dt is None:
+ return None
+ if dt.tzinfo is None:
+ dt = dt.astimezone(timezone.utc)
+ return dt.isoformat()
+
+
+def youtube_url_to_id(url):
+ """Extract video id from a youtube url.
+
+ If parsing fails, try regex. If that fails, return None.
+
+ The regex is from somewhere in this thread, I think:
+ https://stackoverflow.com/questions/3452546/how-do-i-get-the-youtube-video-id-from-a-url
+
+ """
+ url = urllib.parse.unquote(url)
+ url_data = urllib.parse.urlparse(url)
+ query = urllib.parse.parse_qs(url_data.query)
+ try:
+ # parse the url for a video query
+ return query["v"][0]
+ except KeyError:
+ # use regex to try and extract id
+ match = re.search(
+ r"((?<=(v|V)/)|(?<=be/)|(?<=(\?|\&)v=)|(?<=embed/))([\w-]+)",
+ url,
+ )
+ if match:
+ return match.group()
+ else:
+ return None
+
+
+def youtube_duration_to_seconds(value):
+ """Convert youtube (ISO 8601) duration to seconds.
+
+ https://en.wikipedia.org/wiki/ISO_8601#Durations
+ https://regex101.com/r/ALmmSS/1
+
+ """
+ iso8601 = r"P(?:(\d+)Y)?(?:(\d+)M)?(?:(\d+)W)?(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?"
+ match = re.match(iso8601, value)
+ if match is None:
+ return None
+
+ group_names = ['years', 'months', 'weeks', 'days', 'hours', 'minutes', 'seconds']
+ d = dict()
+ for name, group in zip(group_names, match.groups(default=0)):
+ d[name] = int(group)
+
+ return int(
+ d['years']*365*24*60*60 +
+ d['months']*30*24*60*60 +
+ d['weeks']*7*24*60*60 +
+ d['days']*24*60*60 +
+ d['hours']*60*60 +
+ d['minutes']*60 +
+ d['seconds']
+ )
+
+
+def iterate_chunks(iterable: typing.Iterable, chunk_size: int):
+ """
+ Iterates an iterable in chunks of chunk_size elements.
+ :param iterable: An iterable containing items to iterate.
+ :param chunk_size: Chunk size
+ :return: Returns a generator which will yield chunks of size chunk_size
+ """
+
+ it = iter(iterable)
+ while True:
+ chunk = tuple(itertools.islice(it, chunk_size))
+ if not chunk:
+ return
+ yield chunk
\ No newline at end of file
diff --git a/external/pytaw/pytaw/youtube.py b/external/pytaw/pytaw/youtube.py
new file mode 100644
index 0000000..1d0beba
--- /dev/null
+++ b/external/pytaw/pytaw/youtube.py
@@ -0,0 +1,1055 @@
+import collections
+import configparser
+import itertools
+import logging
+import os
+from urllib.parse import urlsplit, parse_qs
+import typing
+from abc import ABC, abstractmethod
+from datetime import timedelta
+
+import googleapiclient.discovery
+from oauth2client.client import AccessTokenCredentials
+
+from .utils import (
+ datetime_to_string,
+ string_to_datetime,
+ youtube_duration_to_seconds,
+ iterate_chunks,
+)
+
+log = logging.getLogger(__name__)
+log.setLevel(logging.DEBUG)
+
+
+class DataMissing(Exception):
+ """Exception raised if data is not found in a Resource data store."""
+ pass
+
+
+class InvalidURL(Exception):
+ """Exception raised if an URL is not valid."""
+ pass
+
+
+class YouTube(object):
+ """The interface to the YouTube API.
+
+ Connects to the API by passing a developer api key, and provides some high-level methods for
+ querying it.
+
+ """
+
+ def __init__(self, key=None, access_token=None):
+ """Initialise the YouTube class.
+
+ :param key: developer api key (you need to get this from google)
+ :param access_token: access token from some other oauth2 authentication flow
+
+ """
+ if key is not None and access_token is not None:
+ raise ValueError("you should provide a developer key or an access token, but not both")
+
+ build_kwargs = {
+ 'serviceName': 'youtube',
+ 'version': 'v3',
+ 'cache_discovery': False, # suppress an annoying warning
+ }
+
+ if access_token is not None:
+ # build credentials using given access token
+ credentials = AccessTokenCredentials(access_token=access_token, user_agent='pytaw')
+ build_kwargs['credentials'] = credentials
+
+ else:
+ # use a develop key, either passed directly or from a config file
+ if key is not None:
+ developer_key = key
+
+ else:
+ # neither an access token or a key has been given, so look for a developer key in
+ # the default config file
+ config_file_path = os.path.join(os.path.expanduser('~'), ".pytaw.conf")
+ if not os.path.exists(config_file_path):
+ config_file_path = "/etc/pytaw.conf"
+
+ if os.path.exists(config_file_path):
+ config = configparser.ConfigParser()
+ config.read(config_file_path)
+ developer_key = config['youtube']['developer_key']
+ else:
+ raise ValueError("didn't find a developer key or an access token.")
+
+ build_kwargs['developerKey'] = developer_key
+
+ # build_kwargs now contains credentials, or a developer key
+ self.build = googleapiclient.discovery.build(**build_kwargs)
+
+ def __repr__(self):
+ return ""
+
+ def search(self, **kwargs):
+ """Search YouTube, returning an instance of `ListResponse`.
+
+ API parameters should be given as keyword arguments.
+
+ :return: ListResponse object containing the requested resource instances
+
+ """
+ api_params = {
+ 'part': 'id,snippet',
+ 'maxResults': 50,
+ }
+ api_params.update(kwargs)
+
+ # convert certain parameters from datetime to youtube-compatible string
+ datetime_fields = (
+ 'publishedBefore',
+ 'publishedAfter',
+ )
+ for field in datetime_fields:
+ try:
+ api_params[field] = datetime_to_string(api_params[field])
+ except KeyError:
+ pass
+
+ query = Query(self, 'search', api_params)
+ return ListResponse(query)
+
+ def subscriptions(self, **kwargs):
+ """Fetch list of channels that the authenticated user is subscribed to.
+
+ API parameters should be given as keyword arguments.
+
+ :return: ListResponse object containing channel instances
+
+ """
+ api_params = {
+ 'part': 'id,snippet',
+ 'mine': True,
+ 'maxResults': 50,
+ }
+ api_params.update(kwargs)
+
+ query = Query(self, 'subscriptions', api_params)
+ return ListResponse(query)
+
+ def video(self, id, **kwargs):
+ """Fetch a Video instance.
+
+ Additional API parameters should be given as keyword arguments.
+
+ :param id: youtube video id e.g. 'jNQXAC9IVRw'
+ :return: Video instance if video is found, else None
+
+ """
+ api_params = {
+ 'part': 'id',
+ 'id': id,
+ }
+ api_params.update(kwargs)
+
+ query = Query(self, 'videos', api_params)
+ return ListResponse(query).first()
+
+ def videos(self, id_list: typing.Iterable[str], **kwargs):
+ """Fetch multiple videos.
+
+ :param id_list: List of video IDs to fetch
+ :return: Iterable list of video objects.
+ """
+ response_list = []
+ for id_list_chunk in iterate_chunks(id_list, 50):
+ api_params = {
+ 'part': 'id',
+ 'id': ','.join(id_list_chunk),
+ }
+ api_params.update(kwargs)
+
+ query = Query(self, 'videos', api_params)
+ response_list.append(ListResponse(query))
+
+ return itertools.chain(response_list)
+
+ def parse_url(self, url: str) -> dict:
+ """
+ Parses a YouTube URL, and attempts to identify what resource it refers to.
+ :param url: URL to parse
+ :return: Returns a dictionary, containing the url 'type', and the url resource ('video', 'playlist', 'channel',
+ 'channel_custom', 'username')
+ """
+ result = {'type': 'unknown'}
+
+ url_spl = urlsplit(url)
+ url_path = url_spl.path.split('/')
+ url_query = parse_qs(url_spl.query)
+
+ if url_spl.netloc.endswith('youtube.com'):
+
+ # http://www.youtube.com/watch?v=-wtIMTCHWuI
+ if url_path[1] == 'watch':
+ result['type'] = 'video'
+ result['video'] = url_query['v'][0]
+ if 'list' in url_query:
+ result['playlist'] = url_query['list'][0]
+
+ # http://www.youtube.com/v/-wtIMTCHWuI?version=3&autohide=1
+ # https://www.youtube.com/embed/M7lc1UVf-VE
+ elif url_path[1] == 'v':
+ result['type'] = 'video'
+ result['video'] = url_path[2]
+ if 'list' in url_query:
+ result['playlist'] = url_query['list'][0]
+
+ # https://www.youtube.com/playlist?list=PLJRbJuI_csVDXhgRJ1xv6z-Igeb7CKroe
+ elif url_path[1] == 'playlist':
+ result['type'] = 'playlist'
+ result['playlist'] = url_query['list'][0]
+
+ # https://www.youtube.com/channel/UC0QHWhjbe5fGJEPz3sVb6nw
+ elif url_path[1] == 'channel':
+ result['type'] = 'channel'
+ result['channel'] = url_path[2]
+
+ # https://www.youtube.com/c/LinusTechTips
+ elif url_path[1] == 'c':
+ result['type'] = 'channel_custom'
+ result['channel_custom'] = url_path[1]
+
+ # https://www.youtube.com/user/LinusTechTips
+ elif url_path[1] == 'user':
+ result['type'] = 'user'
+ result['username'] = url_path[2]
+
+ # http://www.youtube.com/oembed?url=http%3A//www.youtube.com/watch?v%3D-wtIMTCHWuI&format=json
+ elif url_path[1] == 'oembed':
+ return self.parse_url(url_query['url'][0])
+
+ # http://www.youtube.com/attribution_link?a=JdfC0C9V6ZI&u=%2Fwatch%3Fv%3DEhxJLojIE_o%26feature%3Dshare
+ elif url_path[1] == 'attribution_link':
+ return self.parse_url('http://youtube.com/' + url_query['u'][0])
+
+ # https://www.youtube.com/results?search_query=test
+ elif url_path[1] == 'search' or url_path[1] == 'results':
+ result['type'] = 'search'
+ result['query'] = url_query['search_query'][0]
+
+ # Custom channel URLs might have the format https://www.youtube.com/LinusTechTips, which are pretty much
+ # impossible to handle properly
+ else:
+ raise InvalidURL('Unrecognized URL format: ' + url)
+
+ # http://youtu.be/-wtIMTCHWuI
+ elif url_spl.netloc == 'youtu.be':
+ result['type'] = 'video'
+ result['video'] = url_path[1]
+
+ # https://youtube.googleapis.com/v/My2FRPA3Gf8
+ elif url_spl.netloc == 'youtube.googleapis.com':
+ if url_path[1] == 'v':
+ result['type'] = 'video'
+ result['video'] = url_path[2]
+ else:
+ raise InvalidURL('Unrecognized URL format: ' + url)
+
+ else:
+ raise InvalidURL('Unrecognized URL format: ' + url)
+
+ return result
+
+ def __find_channel_by_custom_url(self, custom_part):
+ # See https://stackoverflow.com/a/37947865
+ # Using the YT API, the only way to obtain a channel using a custom URL that we know of is to search for it.
+ # Another option (which might be more reliable) could be scraping the page
+ api_params = {
+ 'part': 'id',
+ 'type': 'channel',
+ 'q': custom_part,
+ }
+
+ return self.search(**api_params).first()
+
+ def channel(self, channel_id=None, username=None, url=None, **kwargs):
+ """Fetch a Channel instance.
+
+ Additional API parameters should be given as keyword arguments.
+
+ :param id: youtube channel id e.g. 'UCMDQxm7cUx3yXkfeHa5zJIQ'
+ :return: Channel instance if channel is found, else None
+
+ """
+ api_params = {
+ 'part': 'id',
+ }
+
+ if channel_id is not None:
+ api_params['id'] = channel_id
+ elif username is not None:
+ api_params['forUsername'] = username
+ elif url is not None:
+ parse = self.parse_url(url)
+ if parse['type'] == 'channel':
+ api_params['id'] = parse['channel']
+ elif parse['type'] == 'user':
+ api_params['forUsername'] = parse['username']
+ elif parse['type'] == 'channel_custom':
+ return self.__find_channel_by_custom_url(parse['channel_custom'])
+ else:
+ raise InvalidURL('Can\'t extract channel from given URL.')
+ else:
+ raise ValueError('Please specify exactly one of: channel_id, username, url')
+
+ api_params.update(kwargs)
+
+ query = Query(self, 'channels', api_params)
+ return ListResponse(query).first()
+
+ def playlist(self, id=None, url=None, **kwargs):
+ """Fetch a Playlist instance.
+
+ Additional API parameters should be given as keyword arguments.
+
+ :param id: youtube channel id e.g. 'UCMDQxm7cUx3yXkfeHa5zJIQ'
+ :return: Channel instance if channel is found, else None
+
+ """
+ api_params = {
+ 'part': 'id',
+ }
+
+ if id is not None:
+ api_params['id'] = id
+ elif url is not None:
+ parse = self.parse_url(url)
+ if 'playlist' in parse:
+ api_params['id'] = parse['playlist']
+ else:
+ raise ValueError('Please specify exactly one of: id, url')
+
+ api_params.update(kwargs)
+
+ query = Query(self, 'playlists', api_params)
+ return ListResponse(query).first()
+
+ def playlist_items(self, id, **kwargs):
+ """Fetch a Playlist instance.
+
+ Additional API parameters should be given as keyword arguments.
+
+ :param id: youtube channel id e.g. 'UCMDQxm7cUx3yXkfeHa5zJIQ'
+ :return: Channel instance if channel is found, else None
+
+ """
+ api_params = {
+ 'part': 'id,snippet',
+ 'playlistId': id,
+ }
+ api_params.update(kwargs)
+
+ query = Query(self, 'playlist_items', api_params)
+ return ListResponse(query)
+
+
+class Query(object):
+ """Everything we need to execute a query and retrieve the raw response dictionary."""
+
+ def __init__(self, youtube, endpoint, api_params=None):
+ """Initialise the query.
+
+ :param youtube: YouTube instance
+ :param endpoint: string giving the api endpoint to query, e.g. 'videos', 'search'...
+ :param api_params: dict of keyword parameters to send (directly) to the api
+
+ """
+ self.youtube = youtube
+ self.endpoint = endpoint
+ self.api_params = api_params or dict()
+
+ if 'part' not in api_params:
+ api_params['part'] = 'id'
+
+ endpoint_func_mapping = {
+ 'search': self.youtube.build.search().list,
+ 'videos': self.youtube.build.videos().list,
+ 'channels': self.youtube.build.channels().list,
+ 'subscriptions': self.youtube.build.subscriptions().list,
+ 'playlists': self.youtube.build.playlists().list,
+ 'playlist_items': self.youtube.build.playlistItems().list,
+ }
+
+ try:
+ self.query_func = endpoint_func_mapping[self.endpoint]
+ except KeyError:
+ raise ValueError(f"youtube api endpoint '{self.endpoint}' not recognised.")
+
+ def __repr__(self):
+ return "".format(self.endpoint, self.api_params)
+
+ def execute(self, api_params=None):
+ """Execute the query.
+
+ :param api_params: extra api parameters to send with the query.
+ :return: api response dictionary
+
+ """
+ if api_params is not None:
+ # update only for this query execution
+ query_params = self.api_params.copy()
+ query_params.update(api_params)
+ else:
+ query_params = self.api_params
+
+ log.debug(f"executing query with {str(query_params)}")
+ return self.query_func(**query_params).execute()
+
+
+class ListResponse(collections.Iterator):
+ """Executes a query and creates a data structure containing Resource instances.
+
+ When iterated over, this object behaves like an iterator, paging through the results and
+ creating Resource instances (Video, Channel, Playlist...) as they are required.
+
+ When indexed with an integer n, returns the nth Resource.
+
+ When sliced, returns a list of Resource instances.
+
+ Due to limitations in the API, you'll never get more than ~500 from a search result -
+ definitely for the 'search' endoint and probably others as well. Also, the value given in
+ pageInfo.totalResults for how many results are returned is pretty worthless. It may be an
+ estimate of total numbers of results _before filtering_, and it'll never be more than a
+ million. See this issue for more details: https://issuetracker.google.com/issues/35171641
+
+ """
+
+ def __init__(self, query):
+ self.youtube = query.youtube
+ self.query = query
+
+ self.kind = None
+ self.total_results = None
+ self.results_per_page = None
+
+ self._reset()
+
+ def _reset(self):
+ self._listing = None # internal storage for current page listing
+ self._list_index = None # index of item within current listing
+ self._no_more_pages = False # flagged when we reach the end of the available results
+ self._page_count = 0 # no. of pages processed
+ self._item_count = 0 # total no. of items yielded
+ self._next_page_token = None # api page token required for the next page of results
+
+ def __repr__(self):
+ return "".format(
+ self.query.endpoint, self.total_results, self.results_per_page
+ )
+
+ def __iter__(self):
+ """Allow this object to act as an iterator."""
+ return self
+
+ def __next__(self):
+ """Get the next resource.
+
+ This method allows the list reponse to be iterated over. First we fetch a page of search
+ results, load the response into memory and and return each resource in turn. If we're at
+ the end of a page we fetch a new one, replacing the old page in memory.
+
+ """
+ # fetch the next page of items if we haven't fetched the first page yet, or alternatively
+ # if we've run out of results on this page. this check relies on results_per_page being
+ # set if _listing is not None (which of course it should be).
+ if self._listing is None or self._list_index >= self.results_per_page:
+ self._fetch_next()
+
+ # get the next item. if this fails now we must be out of results.
+ # note: often you'll still get a next page token, even if the results end on this page,
+ # meaning the _no_more_pages flag will not be set.
+ # in this case, the items list on the _next_ page should be empty, but we don't check this.
+ try:
+ item = self._listing[self._list_index]
+ except IndexError:
+ log.debug(f"exhausted all results at item {self._item_count} "
+ f"(item {self._list_index + 1} on page {self._page_count})")
+ self._no_more_pages = True # unnecessary but true
+ raise StopIteration()
+
+ self._list_index += 1
+ self._item_count += 1
+ return create_resource_from_api_response(self.youtube, item)
+
+ def __getitem__(self, index):
+ """Get a specific resource or list of resources.
+
+ This method handles indexing by integer or slice, e.g.:
+ listresponse[n] returns the nth Resource instance
+ listresponse[:n] returns the first n Resources as a list
+
+ We do this by just repeatedly calling the __next__() method until we have the items we're
+ looking for, which is a pretty dumb way of doing it but it'll do for now.
+
+ Before finding an item or items, we call _reset() so that if this response has been used
+ as an iterator we go back and start again. After the requested item or items have been
+ found we _reset() again so that the response can still be iterated over.
+
+ """
+ if isinstance(index, int):
+ # if an integer is used we just return a single item. we'll just __next__()
+ # along until we're there. this is a bit silly because we're creating a resource for
+ # each call and only returning the final one, but it'll do for now.
+ self._reset()
+ try:
+ for _ in range(index):
+ self.__next__()
+ except StopIteration:
+ self._reset()
+ raise IndexError("index out of range")
+
+ # store item to be returned
+ try:
+ item = self.__next__()
+ except StopIteration:
+ self._reset()
+ raise IndexError("index out of range")
+
+ # reset so that this object can still be used as a generator
+ self._reset()
+
+ return item
+
+ elif isinstance(index, slice):
+ # if a slice is used we want to return a list (not a generator). we'll use
+ # __next__() to build up the list.
+ start = 0 if index.start is None else index.start
+ stop = index.stop
+ step = index.step
+
+ if step not in (1, None):
+ raise NotImplementedError("can't use a slice step other than one")
+
+ if start < 0 or (stop is not None and stop < 0):
+ raise NotImplementedError("can't use negative numbers in slices")
+
+ # ok if all that worked let's reset so that __next__() gives the first item in the
+ # list response
+ self._reset()
+
+ if start > 0:
+ # move to start position
+ try:
+ for _ in range(start):
+ self.__next__()
+ except StopIteration:
+ # if the slice start is greater than the total length you usually get an empty
+ # list
+ return []
+
+ if stop is not None:
+ # iterate over the range provided by the slice
+ range_ = range(start, stop)
+ else:
+ # make the for loop iterate until StopIteration is raised
+ range_ = itertools.count()
+
+ items = []
+ for _ in range_:
+ try:
+ items.append(self.__next__())
+ except StopIteration:
+ # if the slice end is greater than the total length you usually get a
+ # truncated list
+ break
+
+ self._reset()
+ return items
+
+ else:
+ raise KeyError(f"you can't index a ListResponse with '{index}'")
+
+ def _fetch_next(self):
+ """Fetch the next page of the API response and load into memory."""
+ if self._no_more_pages:
+ # we should only get here if results stop at a page boundary
+ log.debug(f"exhausted all results at item {self._item_count} at page boundary "
+ f"(item {self._list_index + 1} on page {self._page_count})")
+ raise StopIteration()
+
+ # pass the next page token if this is not the first page we're fetching
+ params = dict()
+ if self._next_page_token:
+ params['pageToken'] = self._next_page_token
+
+ # execute query to get raw response dictionary
+ raw = self.query.execute(api_params=params)
+
+ # the following data shouldn't change, so store only if it's not been set yet
+ # (i.e. this is the first fetch)
+ if None in (self.kind, self.total_results, self.results_per_page):
+ # don't use get() because if this data doesn't exist in the api response something
+ # has gone wrong and we'd like an exception
+ self.kind = raw['kind'].replace('youtube#', '')
+ self.total_results = int(raw['pageInfo']['totalResults'])
+ self.results_per_page = int(raw['pageInfo']['resultsPerPage'])
+
+ # whereever we are in the list response we need the next page token. if it's not there,
+ # set a flag so that we know there's no more to be fetched (note _next_page_token is also
+ # None at initialisation so we can't check it that way).
+ self._next_page_token = raw.get('nextPageToken', None)
+ if self._next_page_token is None:
+ self._no_more_pages = True
+
+ # store items in raw format for processing by __next__()
+ self._listing = raw['items'] # would like a KeyError if this fails (it shouldn't)
+ self._list_index = 0
+ self._page_count += 1
+
+ def first(self):
+ try:
+ return self[0]
+ except IndexError:
+ return None
+
+
+def create_resource_from_api_response(youtube, item):
+ """Given a raw item from an API response, return the appropriate Resource instance."""
+
+ # extract kind and id for the item. if it's a search result then we have to do a bit of
+ # wrangling. but we only extract the data - don't alter anything in the api response item!
+ kind = item['kind'].replace('youtube#', '')
+ if kind == 'searchResult':
+ kind = item['id']['kind'].replace('youtube#', '')
+ id_label = kind + 'Id'
+ id = item['id'][id_label]
+ else:
+ id = item['id']
+
+ if kind == 'video':
+ return Video(youtube, id, item)
+ elif kind == 'channel':
+ return Channel(youtube, id, item)
+ elif kind == 'playlist':
+ return Playlist(youtube, id, item)
+ elif kind == 'subscription':
+ channel_id = item['snippet']['resourceId']['channelId']
+ return Channel(youtube, id=channel_id)
+ elif kind == 'playlistItem':
+ return PlaylistItem(youtube, id, item)
+ else:
+ raise NotImplementedError(f"can't deal with resource kind '{kind}'")
+
+
+class Thumbnail(object):
+ def __init__(self, id: str, url: str, width: int, height: int):
+ self.id = id
+ self.url = url
+ self.width = width
+ self.height = height
+
+ def __repr__(self):
+ return f''
+
+
+class Resource(ABC):
+ """Base class for YouTube resource classes, e.g. Video, Channel etc."""
+
+ @property
+ @abstractmethod
+ def ENDPOINT(self):
+ pass
+
+ @property
+ @abstractmethod
+ def ATTRIBUTE_DEFS(self):
+ pass
+
+ def __init__(self, youtube, id, data=None):
+ """Initialise a Resource object.
+
+ Need the YouTube instance, in case further queries are required, the resource id,
+ and (optionally) some data in the form of an API response.
+
+ """
+ # if we need to query again for more data we'll need access to the youtube instance
+ self.youtube = youtube
+
+ # every resource has a unique id, it may be a different format for each resource type though
+ self.id = id
+
+ # data is the api response item for the resource. it's a dictionary with 'kind',
+ # 'etag' and 'id' keys, at least. it may also have a 'snippet', 'contentDetails' etc.
+ # containing more detailed info. this dictionary could be accessed directly,
+ # but we'll make the data accessible via class attributes where possible so that we can
+ # do type conversion etc.
+ #
+ # if the data is from a search result we need to handle it differently. search results
+ # have some useful basic data and we'd like to use that if possible to prevent another
+ # api request. however, we'll need to know later if all we have is a search result (in
+ # which case a lot of stuff will be missing) or a genuine resource api request.
+ if data:
+ if 'kind' in data and 'searchResult' in data['kind']:
+ self._search_data = data
+ self._data = {}
+ else:
+ self._search_data = {}
+ self._data = data
+ else:
+ self._search_data = {}
+ self._data = {}
+
+ # this dictionary will log which attributes we've tried to fetch so that we don't get
+ # stuck in an infinite loop if something goes badly wrong
+ self._tried_to_fetch = {}
+
+ # update attributes with whatever we've been given as data
+ self._update_attributes()
+
+ def __eq__(self, other):
+ if isinstance(self, other.__class__):
+ return self.__dict__ == other.__dict__
+
+ # if they're different classes return NotImplemented instead of False so that we fallback
+ # to the default comparison method
+ return NotImplemented
+
+ def __hash__(self):
+ return hash(tuple(sorted(self.__dict__.items())))
+
+ def __repr__(self):
+ n_chars = 16
+ if len(self.title) > n_chars:
+ short_title = self.title[:(n_chars - 3)] + '...'
+ else:
+ short_title = self.title
+ return f"<{type(self).__name__} {self.id} \"{short_title}\">"
+
+ def __str__(self):
+ return self.title
+
+ def _update_attributes(self):
+ """Take internally stored raw data and creates attributes with right types etc.
+
+ Attributes defined in ATTRIBUTE_DEFS will be added as attributes, if they exist in
+ internal data storage.
+
+ """
+ for attr_name, attr_def in self.ATTRIBUTE_DEFS.items():
+ type_ = attr_def.type_
+ part = attr_def.part
+ if isinstance(attr_def.name, str):
+ # may be a string or list - we want a list
+ keys = [attr_def.name, ]
+ else:
+ keys = attr_def.name
+
+ try:
+ raw_value = self._get(part, *keys)
+ except DataMissing:
+ # if data is missing it basically means one of three things: we've not tried to
+ # fetch it yet, we fetched the right part but it was null and not returned with
+ # the query, or something is badly wrong (e.g. a bad AttributeDef).
+ #
+ # we check for the second case by looking in the data store to see if the part is
+ # there. if it is, we set the attribute to show we've fetched and there was
+ # nothing there.
+ #
+ # in the other two cases, just don't set this attribute right now.
+ if self._data.get(part) is not None:
+ if type_ in ('str', 'string'):
+ raw_value = ''
+ elif type_ in ('int', 'integer', 'float'):
+ raw_value = 0
+ elif type_ == 'list':
+ raw_value = []
+ else:
+ raw_value = None
+ else:
+ continue
+
+ if type_ is None:
+ value = raw_value
+ elif type_ in ('str', 'string'):
+ value = str(raw_value)
+ elif type_ in ('int', 'integer'):
+ value = int(raw_value)
+ elif type_ == 'float':
+ value = float(raw_value)
+ elif type_ == 'list':
+ value = list(raw_value)
+ elif type_ == 'datetime':
+ value = string_to_datetime(raw_value)
+ elif type_ == 'timedelta':
+ value = timedelta(seconds=youtube_duration_to_seconds(raw_value))
+ elif type_ == 'thumbnails':
+ value = []
+ for key, val in raw_value.items():
+ url = val.get('url', None)
+ width = val.get('width', None)
+ height = val.get('width', None)
+ value.append(Thumbnail(key, url, width, height))
+ else:
+ raise TypeError(f"type '{type_}' not recognised.")
+
+ setattr(self, attr_name, value)
+
+ def _get(self, *keys):
+ """Get a data attribute from the stored item response, if it exists.
+
+ If it doesn't, raise DataMissing exception. This could be because the necessary
+ information was not included in the 'part' argument in the original query, or because
+ youtube doesn't have the information stored (e.g. if country is not set by the user,
+ the key is not present in the API response).
+
+ :param *keys: one or more dictionary keys. if there's more than one, we'll query
+ them recursively, so _get('a', 'b', 'c') will return
+ self._items['a']['b']['c']
+ :return: the data attribute
+
+ """
+
+ def get_from_nested_dict(dict_, *keys):
+ """Get item from a nested dictionary; raise KeyError if it's not there."""
+ param = dict_
+ for key in keys:
+ param = param[key]
+ return param
+
+ try:
+ param = get_from_nested_dict(self._data, *keys)
+ return param
+ except KeyError:
+ pass
+
+ try:
+ param = get_from_nested_dict(self._search_data, *keys)
+ return param
+ except KeyError:
+ pass
+
+ raise DataMissing(f"attribute with keys {str(keys)} not present in self._data or "
+ f"self._search_data. either it doesn't exist, or that part needs "
+ f"fetching.")
+
+ def __getattr__(self, item):
+ """If an attribute hasn't been set, this function tries to fetch and add it.
+
+ Note: the __getattr__ method is only ever called when an attribute can't be found,
+ therefore there is no need to check if the attribute already exists within this function.
+
+ If the attribute isn't present in ATTRIBUTE_DEFS, raise AttributeError.
+
+ :param item: attribute name
+ :return: attribute value
+
+ """
+ if item not in self.ATTRIBUTE_DEFS:
+ raise AttributeError(f"attribute '{item}' not recognised for resource type "
+ f"'{type(self).__name__}'")
+
+ if self._tried_to_fetch.get(item):
+ raise AttributeError(f"already tried to fetch attribute '{item}'")
+
+ # fetch the required part and update to (hopefully) set the required attribute
+ self._fetch(part=self.ATTRIBUTE_DEFS[item].part)
+ self._update_attributes()
+ self._tried_to_fetch[item] = True
+
+ # now getattr() should access the attribute directly. if not, we'll get an attribute
+ # error from this function because we've logged which items we've tried to fetch.
+ return getattr(self, item)
+
+ def _fetch(self, part):
+ """Query the API for a specific data part.
+
+ Build a query and execute it. Update internal storage to reflect the new data. Note:
+ access to the data via attributes will not update until _update_attributes() is called.
+
+ :param part: part string for the API query.
+
+ """
+ part_string = f"id,{part}"
+
+ # get a raw listResponse from youtube
+ response = Query(
+ youtube=self.youtube,
+ endpoint=self.ENDPOINT,
+ api_params={'part': part_string, 'id': self.id}
+ ).execute()
+
+ # get the first resource item and update the internal data storage
+ item = response['items'][0]
+ self._data.update(item)
+
+
+class AttributeDef(object):
+ """Defines a Resource attribute.
+
+ To make the API data available as attributes on Resource objects we need to know
+ 1. where to find the data in the API response, and
+ 2. what data type the attribute should have.
+
+ This class defines the 'part' (in API terminology) that the attribute can be found in,
+ and it's name (the dictionary key within the part), so that it can be found in the API
+ response.
+
+ If a non-existant part is given the API will raise a HttpError. If a non-existant name is
+ given (within an existing part) then ptyaw will fallback to the default for the given type.
+ This is because it's tricky to know whether an attribute exists from the API response -
+ sometimes an attribute will not be returned if it is null (e.g. if a user does not set a
+ country for their channel it will simply not be returned in the API response).
+
+ The data type should also be given as a string ('str', 'int', 'datetime' etc), so that we can
+ convert it when we add the data as an attribute to the Resource instance. If not given or
+ None, no type conversion is performed.
+
+ """
+
+ def __init__(self, part, name, type_=None):
+ self.part = part
+ self.name = name
+ self.type_ = type_
+
+
+class Video(Resource):
+ """A single YouTube video."""
+
+ ENDPOINT = 'videos'
+ ATTRIBUTE_DEFS = {
+ #
+ # snippet
+ 'title': AttributeDef('snippet', 'title', type_='str'),
+ 'description': AttributeDef('snippet', 'description', type_='str'),
+ 'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'),
+ 'tags': AttributeDef('snippet', 'tags', type_='list'),
+ 'channel_id': AttributeDef('snippet', 'channelId', type_='str'),
+ 'channel_title': AttributeDef('snippet', 'channelTitle', type_='str'),
+ 'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'),
+ #
+ # contentDetails
+ 'duration': AttributeDef('contentDetails', 'duration', type_='timedelta'),
+ #
+ # status
+ 'license': AttributeDef('status', 'license', type_='str'),
+ #
+ # statistics
+ 'n_views': AttributeDef('statistics', 'viewCount', type_='int'),
+ 'n_likes': AttributeDef('statistics', 'likeCount', type_='int'),
+ 'n_dislikes': AttributeDef('statistics', 'dislikeCount', type_='int'),
+ 'n_favorites': AttributeDef('statistics', 'favoriteCount', type_='int'),
+ 'n_comments': AttributeDef('statistics', 'commentCount', type_='int'),
+ }
+
+ @property
+ def is_cc(self):
+ return self.license == 'creativeCommon'
+
+ @property
+ def channel(self):
+ return self.youtube.channel(id=self.channel_id)
+
+ @property
+ def url(self):
+ return f"https://www.youtube.com/watch?v={self.id}"
+
+
+class Channel(Resource):
+ """A single YouTube channel."""
+
+ ENDPOINT = 'channels'
+ ATTRIBUTE_DEFS = {
+ #
+ # snippet
+ 'title': AttributeDef('snippet', 'title'),
+ 'description': AttributeDef('snippet', 'description'),
+ 'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'),
+ 'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'),
+ 'country': AttributeDef('snippet', 'country', type_='str'),
+ 'custom_url': AttributeDef('snippet', 'customUrl', type_='str'),
+ #
+ # statistics
+ 'n_videos': AttributeDef('statistics', 'videoCount', type_='int'),
+ 'n_subscribers': AttributeDef('statistics', 'subscriberCount', type_='int'),
+ 'n_views': AttributeDef('statistics', 'viewCount', type_='int'),
+ 'n_comments': AttributeDef('statistics', 'commentCount', type_='int'),
+ #
+ # content details - playlists
+ '_related_playlists': AttributeDef('contentDetails', 'relatedPlaylists')
+ }
+
+ @property
+ def uploads_playlist(self):
+ playlists = self._related_playlists
+ if 'uploads' in playlists:
+ return self.youtube.playlist(playlists['uploads'])
+ return None
+
+ def most_recent_upload(self):
+ response = self.most_recent_uploads(n=1)
+ return response[0]
+
+ def most_recent_uploads(self, n=50):
+ if n > 50:
+ raise ValueError(f"n must be less than 50, not {n}")
+
+ api_search_params = {
+ 'part': 'id',
+ 'channelId': self.id,
+ 'maxResults': n,
+ 'order': 'date',
+ 'type': 'video',
+ }
+ response = self.youtube.search(**api_search_params)
+ return response[:n]
+
+
+class Playlist(Resource):
+ """A single YouTube playlist."""
+
+ ENDPOINT = 'playlists'
+ ATTRIBUTE_DEFS = {
+ #
+ # snippet
+ 'title': AttributeDef('snippet', 'title'),
+ 'description': AttributeDef('snippet', 'description'),
+ 'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'),
+ 'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'),
+ 'channel_id': AttributeDef('snippet', 'channelId', type_='str'),
+ 'channel_title': AttributeDef('snippet', 'channelTitle', type_='str'),
+ }
+
+ @property
+ def items(self):
+ api_params = {
+ 'part': 'id,snippet',
+ 'maxResults': 50,
+ }
+ return self.youtube.playlist_items(self.id, **api_params)
+
+ @property
+ def channel(self):
+ return self.youtube.channel(self.channel_id)
+
+
+class PlaylistItem(Resource):
+ """A playlist item."""
+ ENDPOINT = 'playlist_items'
+ ATTRIBUTE_DEFS = {
+ #
+ # snippet
+ 'title': AttributeDef('snippet', 'title'),
+ 'description': AttributeDef('snippet', 'description'),
+ 'channel_id': AttributeDef('snippet', 'channelId', type_='str'),
+ 'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'),
+ 'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'),
+ 'channel_title': AttributeDef('snippet', 'channelTitle', type_='str'),
+ 'playlist_id': AttributeDef('snippet', 'playlistId', type_='str'),
+ 'position': AttributeDef('snippet', 'position', type_='int'),
+ 'resource_kind': AttributeDef('snippet', ['resourceId', 'kind'], type_='str'),
+ 'resource_video_id': AttributeDef('snippet', ['resourceId', 'videoId'], type_='str'),
+ }
+
+ @property
+ def video(self):
+ if self.resource_kind == 'youtube#video':
+ return self.youtube.video(self.resource_video_id)
+ return None
+
diff --git a/external/pytaw/setup.py b/external/pytaw/setup.py
new file mode 100644
index 0000000..3f20c7d
--- /dev/null
+++ b/external/pytaw/setup.py
@@ -0,0 +1,12 @@
+from setuptools import setup
+
+setup(
+ name='pytaw',
+ version='0.0.1',
+ packages=['pytaw'],
+ url='https://github.com/6000hulls/pytaw',
+ license='',
+ author='6000hulls',
+ author_email='6000hulls@gmail.com',
+ description='PYTAW: Python YouTube API Wrapper'
+)
diff --git a/external/pytaw/tests/__init__.py b/external/pytaw/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/external/pytaw/tests/test_pytaw.py b/external/pytaw/tests/test_pytaw.py
new file mode 100644
index 0000000..412febc
--- /dev/null
+++ b/external/pytaw/tests/test_pytaw.py
@@ -0,0 +1,165 @@
+import pytest
+import logging
+import sys
+import collections
+from datetime import datetime, timedelta
+
+from googleapiclient.errors import HttpError
+
+from pytaw import YouTube
+from pytaw.youtube import Resource, Video, AttributeDef
+
+
+logging.basicConfig(stream=sys.stdout) # show log output when run with pytest -s
+log = logging.getLogger(__name__)
+log.setLevel(logging.INFO)
+
+
+@pytest.fixture
+def youtube():
+ """A YouTube instance initialised with a developer key loaded from config.ini"""
+ return YouTube()
+
+
+@pytest.fixture
+def video(youtube):
+ """A Video instance for the classic video 'Me at the zoo'"""
+ return youtube.video('jNQXAC9IVRw')
+
+
+@pytest.fixture
+def channel(youtube):
+ """A Channel instance for the 'YouTube Help' channel"""
+ return youtube.channel('UCMDQxm7cUx3yXkfeHa5zJIQ')
+
+
+@pytest.fixture
+def search(youtube):
+ """A ListResponse instance corresponding to a search for the query 'python'"""
+ return youtube.search()
+
+
+@pytest.fixture
+def video_search(youtube):
+ """A ListResponse instance corresponding to a video search for the query 'python'"""
+ return youtube.search(q='python', type='video')
+
+
+@pytest.fixture
+def video_search_array(youtube):
+ """An array of video searches with a wide range of results (zero to millions)."""
+ one_minute_ago = datetime.utcnow() - timedelta(minutes=1)
+ five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
+ return [
+ #
+ # no results
+ youtube.search(q='minecraft', type='video', publishedBefore=datetime(2000, 1, 1)),
+ #
+ # less than 100 results
+ youtube.search(q='minecraft', type='video', publishedBefore=datetime(2005, 7, 1)),
+ #
+ # over 100 results
+ youtube.search(q='minecraft', type='video', publishedBefore=datetime(2006, 1, 1)),
+ #
+ # variable number of results (hundreds or thousands...?)
+ youtube.search(q='minecraft', type='video', publishedAfter=one_minute_ago),
+ youtube.search(q='minecraft', type='video', publishedAfter=five_minutes_ago),
+ #
+ # over a million results
+ youtube.search(q='minecraft', type='video'),
+ youtube.search(q='minecraft'),
+ ]
+
+
+class TestResource:
+
+ def test_equality(self, search):
+ a = search[0]
+ b = search[0]
+ c = search[1]
+ assert a == b
+ assert a != c
+
+ def test_unknown_attribute(self, video):
+ with pytest.raises(AttributeError):
+ _ = video.attribute_name_which_definitely_will_never_exist
+
+ def test_unknown_part_in_attributedef(self, video):
+ video.ATTRIBUTE_DEFS['x'] = AttributeDef('nonexistant_part', 'x')
+ with pytest.raises(HttpError):
+ _ = video.x
+
+ def test_unknown_attribute_name_in_attributedef(self, video):
+ video.ATTRIBUTE_DEFS['x'] = AttributeDef('snippet', 'nonexistant_attribute')
+ assert video.x is None
+
+
+class TestVideo:
+
+ def test_bad_video_id(self, youtube):
+ video = youtube.video('not_a_valid_youtube_video_id')
+ assert video is None
+
+ def test_title(self, video):
+ assert video.title == "Me at the zoo"
+
+ def test_published_at(self, video):
+ assert video.published_at.isoformat() == '2005-04-24T03:31:52+00:00'
+
+ def test_n_views(self, video):
+ assert video.n_views > int(40e6)
+
+ def test_tags(self, video):
+ assert video.tags == ['jawed', 'karim', 'elephant', 'zoo', 'youtube', 'first', 'video']
+
+ def test_duration(self, video):
+ assert video.duration.total_seconds() == 19
+
+
+class TestChannel:
+
+ def test_title(self, channel):
+ assert channel.title == "YouTube Help"
+
+
+class TestSearch:
+
+ def test_video_search_returns_a_video(self, video_search):
+ assert isinstance(video_search[0], Video)
+
+ def test_video_search_has_many_results(self, video_search):
+ # make video_search unlazy (populate pageInfo attributes)
+ _ = video_search[0]
+ assert video_search.total_results > 10000
+
+ def test_search_iteration(self, search):
+ """Simply iterate over a search, creating all resources, to check for exceptions."""
+ for resource in search:
+ log.debug(resource)
+
+
+class TestListResponse:
+
+ def test_if_iterable(self, search):
+ assert isinstance(search, collections.Iterator)
+
+ def test_integer_indexing(self, search):
+ assert isinstance(search[0], Resource)
+
+ def test_slice_indexing(self, search):
+ assert isinstance(search[1:3], list)
+
+ def test_full_listing_iteration(self, video_search_array):
+ """Iterate over all search results to check no exceptions are raised when paging etc.
+
+ Even if millions of results are found, the API will never return more than 500 (by
+ design), so we're okay to just bang right through the search results generator for the
+ whole array of video searches.
+
+ """
+ for i, search in enumerate(video_search_array):
+ c = 0
+ for _ in search:
+ c += 1
+
+ log.debug(f"checked first {c} results (search #{i})")
\ No newline at end of file
diff --git a/readme.md b/readme.md
index 7c83a92..6f4f93e 100644
--- a/readme.md
+++ b/readme.md
@@ -10,4 +10,5 @@ A self-hosted tool which manages your YouTube subscriptions, and downloads files
* youtube-dl: `$ pip3 install youtube-dl`
* google-api-python-client: `$ pip3 install google-api-python-client`
* google_auth_oauthlib: `$ pip3 install google_auth_oauthlib`
-* apscheduler (v3.5+): `$ pip3 install apscheduler`
\ No newline at end of file
+* apscheduler (v3.5+): `$ pip3 install apscheduler`
+* (recommended) oauth2client: `$ pip3 install oauth2client`
\ No newline at end of file