Revert "Integrated latest pytaw, which adds support for feed URLs."

This reverts commit 37ec395f3168cd2b56f70d5628a72b29364f77fa.
This commit is contained in:
Tiberiu Chibici 2018-11-02 22:48:27 +02:00
parent 37ec395f31
commit d95cdcdd7f

View File

@ -1,13 +1,12 @@
from datetime import timedelta
import os
import time
import logging
import configparser
import collections
import configparser
import itertools
from pprint import pprint, pformat
from abc import ABC, abstractmethod
import logging
import os
from urllib.parse import urlsplit, parse_qs
import typing
from abc import ABC, abstractmethod
from datetime import timedelta
import googleapiclient.discovery
from oauth2client.client import AccessTokenCredentials
@ -28,6 +27,11 @@ class DataMissing(Exception):
pass
class InvalidURL(Exception):
"""Exception raised if an URL is not valid."""
pass
class YouTube(object):
"""The interface to the YouTube API.
@ -165,9 +169,107 @@ class YouTube(object):
query = Query(self, 'videos', api_params)
response_list.append(ListResponse(query))
return itertools.chain(response_list)
return itertools.chain(*response_list)
def channel(self, id, **kwargs):
def parse_url(self, url: str) -> dict:
"""
Parses a YouTube URL, and attempts to identify what resource it refers to.
:param url: URL to parse
:return: Returns a dictionary, containing the url 'type', and the url resource ('video', 'playlist', 'channel',
'channel_custom', 'username')
"""
result = {'type': 'unknown'}
url_spl = urlsplit(url)
url_path = url_spl.path.split('/')
url_query = parse_qs(url_spl.query)
if url_spl.netloc.endswith('youtube.com'):
# http://www.youtube.com/watch?v=-wtIMTCHWuI
if url_path[1] == 'watch':
result['type'] = 'video'
result['video'] = url_query['v'][0]
if 'list' in url_query:
result['playlist'] = url_query['list'][0]
# http://www.youtube.com/v/-wtIMTCHWuI?version=3&autohide=1
# https://www.youtube.com/embed/M7lc1UVf-VE
elif url_path[1] == 'v':
result['type'] = 'video'
result['video'] = url_path[2]
if 'list' in url_query:
result['playlist'] = url_query['list'][0]
# https://www.youtube.com/playlist?list=PLJRbJuI_csVDXhgRJ1xv6z-Igeb7CKroe
elif url_path[1] == 'playlist':
result['type'] = 'playlist'
result['playlist'] = url_query['list'][0]
# https://www.youtube.com/channel/UC0QHWhjbe5fGJEPz3sVb6nw
elif url_path[1] == 'channel':
result['type'] = 'channel'
result['channel'] = url_path[2]
# https://www.youtube.com/c/LinusTechTips
elif url_path[1] == 'c':
result['type'] = 'channel_custom'
result['channel_custom'] = url_path[1]
# https://www.youtube.com/user/LinusTechTips
elif url_path[1] == 'user':
result['type'] = 'user'
result['username'] = url_path[2]
# http://www.youtube.com/oembed?url=http%3A//www.youtube.com/watch?v%3D-wtIMTCHWuI&format=json
elif url_path[1] == 'oembed':
return self.parse_url(url_query['url'][0])
# http://www.youtube.com/attribution_link?a=JdfC0C9V6ZI&u=%2Fwatch%3Fv%3DEhxJLojIE_o%26feature%3Dshare
elif url_path[1] == 'attribution_link':
return self.parse_url('http://youtube.com/' + url_query['u'][0])
# https://www.youtube.com/results?search_query=test
elif url_path[1] == 'search' or url_path[1] == 'results':
result['type'] = 'search'
result['query'] = url_query['search_query'][0]
# Custom channel URLs might have the format https://www.youtube.com/LinusTechTips, which are pretty much
# impossible to handle properly
else:
raise InvalidURL('Unrecognized URL format: ' + url)
# http://youtu.be/-wtIMTCHWuI
elif url_spl.netloc == 'youtu.be':
result['type'] = 'video'
result['video'] = url_path[1]
# https://youtube.googleapis.com/v/My2FRPA3Gf8
elif url_spl.netloc == 'youtube.googleapis.com':
if url_path[1] == 'v':
result['type'] = 'video'
result['video'] = url_path[2]
else:
raise InvalidURL('Unrecognized URL format: ' + url)
else:
raise InvalidURL('Unrecognized URL format: ' + url)
return result
def __find_channel_by_custom_url(self, custom_part):
# See https://stackoverflow.com/a/37947865
# Using the YT API, the only way to obtain a channel using a custom URL that we know of is to search for it.
# Another option (which might be more reliable) could be scraping the page
api_params = {
'part': 'id',
'type': 'channel',
'q': custom_part,
}
return self.search(**api_params).first()
def channel(self, channel_id=None, username=None, url=None, **kwargs):
"""Fetch a Channel instance.
Additional API parameters should be given as keyword arguments.
@ -178,14 +280,31 @@ class YouTube(object):
"""
api_params = {
'part': 'id',
'id': id,
}
if channel_id is not None:
api_params['id'] = channel_id
elif username is not None:
api_params['forUsername'] = username
elif url is not None:
parse = self.parse_url(url)
if parse['type'] == 'channel':
api_params['id'] = parse['channel']
elif parse['type'] == 'user':
api_params['forUsername'] = parse['username']
elif parse['type'] == 'channel_custom':
return self.__find_channel_by_custom_url(parse['channel_custom'])
else:
raise InvalidURL('Can\'t extract channel from given URL.')
else:
raise ValueError('Please specify exactly one of: channel_id, username, url')
api_params.update(kwargs)
query = Query(self, 'channels', api_params)
return ListResponse(query).first()
def playlist(self, id, **kwargs):
def playlist(self, id=None, url=None, **kwargs):
"""Fetch a Playlist instance.
Additional API parameters should be given as keyword arguments.
@ -196,8 +315,17 @@ class YouTube(object):
"""
api_params = {
'part': 'id',
'id': id,
}
if id is not None:
api_params['id'] = id
elif url is not None:
parse = self.parse_url(url)
if 'playlist' in parse:
api_params['id'] = parse['playlist']
else:
raise ValueError('Please specify exactly one of: id, url')
api_params.update(kwargs)
query = Query(self, 'playlists', api_params)
@ -794,6 +922,7 @@ class Video(Resource):
'tags': AttributeDef('snippet', 'tags', type_='list'),
'channel_id': AttributeDef('snippet', 'channelId', type_='str'),
'channel_title': AttributeDef('snippet', 'channelTitle', type_='str'),
'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'),
#
# contentDetails
'duration': AttributeDef('contentDetails', 'duration', type_='timedelta'),
@ -834,6 +963,7 @@ class Channel(Resource):
'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'),
'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'),
'country': AttributeDef('snippet', 'country', type_='str'),
'custom_url': AttributeDef('snippet', 'customUrl', type_='str'),
#
# statistics
'n_videos': AttributeDef('statistics', 'videoCount', type_='int'),
@ -841,18 +971,17 @@ class Channel(Resource):
'n_views': AttributeDef('statistics', 'viewCount', type_='int'),
'n_comments': AttributeDef('statistics', 'commentCount', type_='int'),
#
# playlists
# content details - playlists
'_related_playlists': AttributeDef('contentDetails', 'relatedPlaylists')
}
def get_uploads_playlist(self):
@property
def uploads_playlist(self):
playlists = self._related_playlists
if 'uploads' in playlists:
return self.youtube.playlist(playlists['uploads'])
return None
uploads_playlist = property(get_uploads_playlist)
def most_recent_upload(self):
response = self.most_recent_uploads(n=1)
return response[0]
@ -882,16 +1011,22 @@ class Playlist(Resource):
'title': AttributeDef('snippet', 'title'),
'description': AttributeDef('snippet', 'description'),
'published_at': AttributeDef('snippet', 'publishedAt', type_='datetime'),
'thumbnails': AttributeDef('snippet', 'thumbnails', type_='thumbnails'),
'channel_id': AttributeDef('snippet', 'channelId', type_='str'),
'channel_title': AttributeDef('snippet', 'channelTitle', type_='str'),
}
def get_items(self):
@property
def items(self):
api_params = {
'part': 'id,snippet',
'maxResults': 50,
}
return self.youtube.playlist_items(self.id, **api_params)
items = property(get_items)
@property
def channel(self):
return self.youtube.channel(self.channel_id)
class PlaylistItem(Resource):
@ -912,9 +1047,9 @@ class PlaylistItem(Resource):
'resource_video_id': AttributeDef('snippet', ['resourceId', 'videoId'], type_='str'),
}
def get_video(self):
@property
def video(self):
if self.resource_kind == 'youtube#video':
return self.youtube.video(self.resource_video_id)
return None
video = property(get_video)