diff --git a/app/external/pytaw/pytaw/youtube.py b/app/external/pytaw/pytaw/youtube.py index ecb413c..5b14497 100644 --- a/app/external/pytaw/pytaw/youtube.py +++ b/app/external/pytaw/pytaw/youtube.py @@ -1,13 +1,12 @@ -from datetime import timedelta -import os -import time -import logging -import configparser import collections +import configparser import itertools -from pprint import pprint, pformat -from abc import ABC, abstractmethod +import logging +import os +from urllib.parse import urlsplit, parse_qs import typing +from abc import ABC, abstractmethod +from datetime import timedelta import googleapiclient.discovery from oauth2client.client import AccessTokenCredentials @@ -28,6 +27,11 @@ class DataMissing(Exception): pass +class InvalidURL(Exception): + """Exception raised if an URL is not valid.""" + pass + + class YouTube(object): """The interface to the YouTube API. @@ -165,9 +169,107 @@ class YouTube(object): query = Query(self, 'videos', api_params) response_list.append(ListResponse(query)) - return itertools.chain(response_list) + return itertools.chain(*response_list) - def channel(self, id, **kwargs): + def parse_url(self, url: str) -> dict: + """ + Parses a YouTube URL, and attempts to identify what resource it refers to. + :param url: URL to parse + :return: Returns a dictionary, containing the url 'type', and the url resource ('video', 'playlist', 'channel', + 'channel_custom', 'username') + """ + result = {'type': 'unknown'} + + url_spl = urlsplit(url) + url_path = url_spl.path.split('/') + url_query = parse_qs(url_spl.query) + + if url_spl.netloc.endswith('youtube.com'): + + # http://www.youtube.com/watch?v=-wtIMTCHWuI + if url_path[1] == 'watch': + result['type'] = 'video' + result['video'] = url_query['v'][0] + if 'list' in url_query: + result['playlist'] = url_query['list'][0] + + # http://www.youtube.com/v/-wtIMTCHWuI?version=3&autohide=1 + # https://www.youtube.com/embed/M7lc1UVf-VE + elif url_path[1] == 'v': + result['type'] = 'video' + result['video'] = url_path[2] + if 'list' in url_query: + result['playlist'] = url_query['list'][0] + + # https://www.youtube.com/playlist?list=PLJRbJuI_csVDXhgRJ1xv6z-Igeb7CKroe + elif url_path[1] == 'playlist': + result['type'] = 'playlist' + result['playlist'] = url_query['list'][0] + + # https://www.youtube.com/channel/UC0QHWhjbe5fGJEPz3sVb6nw + elif url_path[1] == 'channel': + result['type'] = 'channel' + result['channel'] = url_path[2] + + # https://www.youtube.com/c/LinusTechTips + elif url_path[1] == 'c': + result['type'] = 'channel_custom' + result['channel_custom'] = url_path[1] + + # https://www.youtube.com/user/LinusTechTips + elif url_path[1] == 'user': + result['type'] = 'user' + result['username'] = url_path[2] + + # http://www.youtube.com/oembed?url=http%3A//www.youtube.com/watch?v%3D-wtIMTCHWuI&format=json + elif url_path[1] == 'oembed': + return self.parse_url(url_query['url'][0]) + + # http://www.youtube.com/attribution_link?a=JdfC0C9V6ZI&u=%2Fwatch%3Fv%3DEhxJLojIE_o%26feature%3Dshare + elif url_path[1] == 'attribution_link': + return self.parse_url('http://youtube.com/' + url_query['u'][0]) + + # https://www.youtube.com/results?search_query=test + elif url_path[1] == 'search' or url_path[1] == 'results': + result['type'] = 'search' + result['query'] = url_query['search_query'][0] + + # Custom channel URLs might have the format https://www.youtube.com/LinusTechTips, which are pretty much + # impossible to handle properly + else: + raise InvalidURL('Unrecognized URL format: ' + url) + + # http://youtu.be/-wtIMTCHWuI + elif url_spl.netloc == 'youtu.be': + result['type'] = 'video' + result['video'] = url_path[1] + + # https://youtube.googleapis.com/v/My2FRPA3Gf8 + elif url_spl.netloc == 'youtube.googleapis.com': + if url_path[1] == 'v': + result['type'] = 'video' + result['video'] = url_path[2] + else: + raise InvalidURL('Unrecognized URL format: ' + url) + + else: + raise InvalidURL('Unrecognized URL format: ' + url) + + return result + + def __find_channel_by_custom_url(self, custom_part): + # See https://stackoverflow.com/a/37947865 + # Using the YT API, the only way to obtain a channel using a custom URL that we know of is to search for it. + # Another option (which might be more reliable) could be scraping the page + api_params = { + 'part': 'id', + 'type': 'channel', + 'q': custom_part, + } + + return self.search(**api_params).first() + + def channel(self, channel_id=None, username=None, url=None, **kwargs): """Fetch a Channel instance. Additional API parameters should be given as keyword arguments. @@ -178,14 +280,31 @@ class YouTube(object): """ api_params = { 'part': 'id', - 'id': id, } + + if channel_id is not None: + api_params['id'] = channel_id + elif username is not None: + api_params['forUsername'] = username + elif url is not None: + parse = self.parse_url(url) + if parse['type'] == 'channel': + api_params['id'] = parse['channel'] + elif parse['type'] == 'user': + api_params['forUsername'] = parse['username'] + elif parse['type'] == 'channel_custom': + return self.__find_channel_by_custom_url(parse['channel_custom']) + else: + raise InvalidURL('Can\'t extract channel from given URL.') + else: + raise ValueError('Please specify exactly one of: channel_id, username, url') + api_params.update(kwargs) query = Query(self, 'channels', api_params) return ListResponse(query).first() - def playlist(self, id, **kwargs): + def playlist(self, id=None, url=None, **kwargs): """Fetch a Playlist instance. Additional API parameters should be given as keyword arguments. @@ -196,8 +315,17 @@ class YouTube(object): """ api_params = { 'part': 'id', - 'id': id, } + + if id is not None: + api_params['id'] = id + elif url is not None: + parse = self.parse_url(url) + if 'playlist' in parse: + api_params['id'] = parse['playlist'] + else: + raise ValueError('Please specify exactly one of: id, url') + api_params.update(kwargs) query = Query(self, 'playlists', api_params) @@ -794,6 +922,7 @@ class Video(Resource): 'tags': AttributeDef('snippet', 'tags', type_='list'), 'channel_id': AttributeDef('snippet', 'channelId', type_='str'), 'channel_title': AttributeDef('snippet', 'channelTitle', type_='str'), + 'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'), # # contentDetails 'duration': AttributeDef('contentDetails', 'duration', type_='timedelta'), @@ -834,6 +963,7 @@ class Channel(Resource): 'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'), 'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'), 'country': AttributeDef('snippet', 'country', type_='str'), + 'custom_url': AttributeDef('snippet', 'customUrl', type_='str'), # # statistics 'n_videos': AttributeDef('statistics', 'videoCount', type_='int'), @@ -841,18 +971,17 @@ class Channel(Resource): 'n_views': AttributeDef('statistics', 'viewCount', type_='int'), 'n_comments': AttributeDef('statistics', 'commentCount', type_='int'), # - # playlists + # content details - playlists '_related_playlists': AttributeDef('contentDetails', 'relatedPlaylists') } - def get_uploads_playlist(self): + @property + def uploads_playlist(self): playlists = self._related_playlists if 'uploads' in playlists: return self.youtube.playlist(playlists['uploads']) return None - uploads_playlist = property(get_uploads_playlist) - def most_recent_upload(self): response = self.most_recent_uploads(n=1) return response[0] @@ -882,16 +1011,22 @@ class Playlist(Resource): 'title': AttributeDef('snippet', 'title'), 'description': AttributeDef('snippet', 'description'), 'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'), + 'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'), + 'channel_id': AttributeDef('snippet', 'channelId', type_='str'), + 'channel_title': AttributeDef('snippet', 'channelTitle', type_='str'), } - def get_items(self): + @property + def items(self): api_params = { 'part': 'id,snippet', 'maxResults': 50, } return self.youtube.playlist_items(self.id, **api_params) - items = property(get_items) + @property + def channel(self): + return self.youtube.channel(self.channel_id) class PlaylistItem(Resource): @@ -912,9 +1047,9 @@ class PlaylistItem(Resource): 'resource_video_id': AttributeDef('snippet', ['resourceId', 'videoId'], type_='str'), } - def get_video(self): + @property + def video(self): if self.resource_kind == 'youtube#video': return self.youtube.video(self.resource_video_id) return None - video = property(get_video)