Integrated latest pytaw, which adds support for feed URLs.

This commit is contained in:
Tiberiu Chibici 2018-11-02 22:45:20 +02:00
parent d989cf4132
commit 37ec395f31

View File

@ -1,12 +1,13 @@
import collections
import configparser
import itertools
import logging
import os
from urllib.parse import urlsplit, parse_qs
import typing
from abc import ABC, abstractmethod
from datetime import timedelta from datetime import timedelta
import os
import time
import logging
import configparser
import collections
import itertools
from pprint import pprint, pformat
from abc import ABC, abstractmethod
import typing
import googleapiclient.discovery import googleapiclient.discovery
from oauth2client.client import AccessTokenCredentials from oauth2client.client import AccessTokenCredentials
@ -27,11 +28,6 @@ class DataMissing(Exception):
pass pass
class InvalidURL(Exception):
    """Raised when a URL cannot be recognized as a valid, supported URL."""
class YouTube(object): class YouTube(object):
"""The interface to the YouTube API. """The interface to the YouTube API.
@ -169,107 +165,9 @@ class YouTube(object):
query = Query(self, 'videos', api_params) query = Query(self, 'videos', api_params)
response_list.append(ListResponse(query)) response_list.append(ListResponse(query))
return itertools.chain(*response_list) return itertools.chain(response_list)
def parse_url(self, url: str) -> dict:
    """
    Parses a YouTube URL, and attempts to identify what resource it refers to.

    Recognized hosts are those ending in ``youtube.com``, plus ``youtu.be`` and
    ``youtube.googleapis.com``. ``oembed`` and ``attribution_link`` URLs are unwrapped and the
    URL they embed is parsed instead.

    :param url: URL to parse
    :return: Returns a dictionary containing the url 'type' ('video', 'playlist', 'channel',
             'channel_custom', 'user' or 'search') and the matching resource key(s): 'video',
             'playlist', 'channel', 'channel_custom', 'username' or 'query'. A video URL that also
             carries a ``list`` parameter additionally gets a 'playlist' key.
    :raises InvalidURL: if the host or path is not recognized, or if the path segment or query
                        parameter holding the resource identifier is missing
    """
    url_spl = urlsplit(url)
    url_path = url_spl.path.split('/')
    url_query = parse_qs(url_spl.query)

    # First path component, e.g. 'watch' in '/watch?v=...'; empty when the URL has no path at all.
    section = url_path[1] if len(url_path) > 1 else ''
    result = {'type': 'unknown'}

    try:
        # NOTE(review): this is a plain suffix match, so it accepts subdomains (www., m.) but also any
        # other host name that merely ends in 'youtube.com' - confirm whether that is acceptable.
        if url_spl.netloc.endswith('youtube.com'):

            # http://www.youtube.com/watch?v=-wtIMTCHWuI
            if section == 'watch':
                result['type'] = 'video'
                result['video'] = url_query['v'][0]
                if 'list' in url_query:
                    result['playlist'] = url_query['list'][0]

            # http://www.youtube.com/v/-wtIMTCHWuI?version=3&autohide=1
            # https://www.youtube.com/embed/M7lc1UVf-VE
            elif section in ('v', 'embed'):
                result['type'] = 'video'
                result['video'] = url_path[2]
                if 'list' in url_query:
                    result['playlist'] = url_query['list'][0]

            # https://www.youtube.com/playlist?list=PLJRbJuI_csVDXhgRJ1xv6z-Igeb7CKroe
            elif section == 'playlist':
                result['type'] = 'playlist'
                result['playlist'] = url_query['list'][0]

            # https://www.youtube.com/channel/UC0QHWhjbe5fGJEPz3sVb6nw
            elif section == 'channel':
                result['type'] = 'channel'
                result['channel'] = url_path[2]

            # https://www.youtube.com/c/LinusTechTips
            elif section == 'c':
                result['type'] = 'channel_custom'
                # The custom name is the segment *after* 'c'.
                result['channel_custom'] = url_path[2]

            # https://www.youtube.com/user/LinusTechTips
            elif section == 'user':
                result['type'] = 'user'
                result['username'] = url_path[2]

            # http://www.youtube.com/oembed?url=http%3A//www.youtube.com/watch?v%3D-wtIMTCHWuI&format=json
            elif section == 'oembed':
                return self.parse_url(url_query['url'][0])

            # http://www.youtube.com/attribution_link?a=JdfC0C9V6ZI&u=%2Fwatch%3Fv%3DEhxJLojIE_o%26feature%3Dshare
            elif section == 'attribution_link':
                # 'u' is a site-relative path that normally starts with '/'; strip it so the rebuilt URL
                # does not contain '//', which would leave the first path component empty.
                return self.parse_url('http://youtube.com/' + url_query['u'][0].lstrip('/'))

            # https://www.youtube.com/results?search_query=test
            elif section in ('search', 'results'):
                result['type'] = 'search'
                result['query'] = url_query['search_query'][0]

            # Custom channel URLs might have the format https://www.youtube.com/LinusTechTips, which are
            # pretty much impossible to handle properly
            else:
                raise InvalidURL('Unrecognized URL format: ' + url)

        # http://youtu.be/-wtIMTCHWuI
        elif url_spl.netloc == 'youtu.be':
            if not section:
                raise InvalidURL('Unrecognized URL format: ' + url)
            result['type'] = 'video'
            result['video'] = section

        # https://youtube.googleapis.com/v/My2FRPA3Gf8
        elif url_spl.netloc == 'youtube.googleapis.com':
            if section != 'v':
                raise InvalidURL('Unrecognized URL format: ' + url)
            result['type'] = 'video'
            result['video'] = url_path[2]

        else:
            raise InvalidURL('Unrecognized URL format: ' + url)

    except (KeyError, IndexError):
        # A recognized URL shape whose identifier (path segment or query parameter) is absent.
        raise InvalidURL('Unrecognized URL format: ' + url) from None

    return result
def __find_channel_by_custom_url(self, custom_part):
    """Find a channel from the custom part of its URL by running a channel search.

    :param custom_part: text used as the search query ('q')
    :return: whatever ``first()`` yields on the result of ``self.search``
    """
    # See https://stackoverflow.com/a/37947865
    # Using the YT API, the only way to obtain a channel using a custom URL that we know of is to search for it.
    # Another option (which might be more reliable) could be scraping the page
    matches = self.search(part='id', type='channel', q=custom_part)
    return matches.first()
def channel(self, channel_id=None, username=None, url=None, **kwargs):
"""Fetch a Channel instance. """Fetch a Channel instance.
Additional API parameters should be given as keyword arguments. Additional API parameters should be given as keyword arguments.
@ -280,31 +178,14 @@ class YouTube(object):
""" """
api_params = { api_params = {
'part': 'id', 'part': 'id',
'id': id,
} }
if channel_id is not None:
api_params['id'] = channel_id
elif username is not None:
api_params['forUsername'] = username
elif url is not None:
parse = self.parse_url(url)
if parse['type'] == 'channel':
api_params['id'] = parse['channel']
elif parse['type'] == 'user':
api_params['forUsername'] = parse['username']
elif parse['type'] == 'channel_custom':
return self.__find_channel_by_custom_url(parse['channel_custom'])
else:
raise InvalidURL('Can\'t extract channel from given URL.')
else:
raise ValueError('Please specify exactly one of: channel_id, username, url')
api_params.update(kwargs) api_params.update(kwargs)
query = Query(self, 'channels', api_params) query = Query(self, 'channels', api_params)
return ListResponse(query).first() return ListResponse(query).first()
def playlist(self, id=None, url=None, **kwargs): def playlist(self, id, **kwargs):
"""Fetch a Playlist instance. """Fetch a Playlist instance.
Additional API parameters should be given as keyword arguments. Additional API parameters should be given as keyword arguments.
@ -315,17 +196,8 @@ class YouTube(object):
""" """
api_params = { api_params = {
'part': 'id', 'part': 'id',
'id': id,
} }
if id is not None:
api_params['id'] = id
elif url is not None:
parse = self.parse_url(url)
if 'playlist' in parse:
api_params['id'] = parse['playlist']
else:
raise ValueError('Please specify exactly one of: id, url')
api_params.update(kwargs) api_params.update(kwargs)
query = Query(self, 'playlists', api_params) query = Query(self, 'playlists', api_params)
@ -922,7 +794,6 @@ class Video(Resource):
'tags': AttributeDef('snippet', 'tags', type_='list'), 'tags': AttributeDef('snippet', 'tags', type_='list'),
'channel_id': AttributeDef('snippet', 'channelId', type_='str'), 'channel_id': AttributeDef('snippet', 'channelId', type_='str'),
'channel_title': AttributeDef('snippet', 'channelTitle', type_='str'), 'channel_title': AttributeDef('snippet', 'channelTitle', type_='str'),
'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'),
# #
# contentDetails # contentDetails
'duration': AttributeDef('contentDetails', 'duration', type_='timedelta'), 'duration': AttributeDef('contentDetails', 'duration', type_='timedelta'),
@ -963,7 +834,6 @@ class Channel(Resource):
'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'), 'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'),
'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'), 'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'),
'country': AttributeDef('snippet', 'country', type_='str'), 'country': AttributeDef('snippet', 'country', type_='str'),
'custom_url': AttributeDef('snippet', 'customUrl', type_='str'),
# #
# statistics # statistics
'n_videos': AttributeDef('statistics', 'videoCount', type_='int'), 'n_videos': AttributeDef('statistics', 'videoCount', type_='int'),
@ -971,17 +841,18 @@ class Channel(Resource):
'n_views': AttributeDef('statistics', 'viewCount', type_='int'), 'n_views': AttributeDef('statistics', 'viewCount', type_='int'),
'n_comments': AttributeDef('statistics', 'commentCount', type_='int'), 'n_comments': AttributeDef('statistics', 'commentCount', type_='int'),
# #
# content details - playlists # playlists
'_related_playlists': AttributeDef('contentDetails', 'relatedPlaylists') '_related_playlists': AttributeDef('contentDetails', 'relatedPlaylists')
} }
@property def get_uploads_playlist(self):
def uploads_playlist(self):
playlists = self._related_playlists playlists = self._related_playlists
if 'uploads' in playlists: if 'uploads' in playlists:
return self.youtube.playlist(playlists['uploads']) return self.youtube.playlist(playlists['uploads'])
return None return None
uploads_playlist = property(get_uploads_playlist)
def most_recent_upload(self): def most_recent_upload(self):
response = self.most_recent_uploads(n=1) response = self.most_recent_uploads(n=1)
return response[0] return response[0]
@ -1011,22 +882,16 @@ class Playlist(Resource):
'title': AttributeDef('snippet', 'title'), 'title': AttributeDef('snippet', 'title'),
'description': AttributeDef('snippet', 'description'), 'description': AttributeDef('snippet', 'description'),
'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'), 'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'),
'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'),
'channel_id': AttributeDef('snippet', 'channelId', type_='str'),
'channel_title': AttributeDef('snippet', 'channelTitle', type_='str'),
} }
@property def get_items(self):
def items(self):
api_params = { api_params = {
'part': 'id,snippet', 'part': 'id,snippet',
'maxResults': 50, 'maxResults': 50,
} }
return self.youtube.playlist_items(self.id, **api_params) return self.youtube.playlist_items(self.id, **api_params)
@property items = property(get_items)
def channel(self):
return self.youtube.channel(self.channel_id)
class PlaylistItem(Resource): class PlaylistItem(Resource):
@ -1047,9 +912,9 @@ class PlaylistItem(Resource):
'resource_video_id': AttributeDef('snippet', ['resourceId', 'videoId'], type_='str'), 'resource_video_id': AttributeDef('snippet', ['resourceId', 'videoId'], type_='str'),
} }
@property def get_video(self):
def video(self):
if self.resource_kind == 'youtube#video': if self.resource_kind == 'youtube#video':
return self.youtube.video(self.resource_video_id) return self.youtube.video(self.resource_video_id)
return None return None
video = property(get_video)