mirror of
https://github.com/chibicitiberiu/ytsm.git
synced 2024-02-24 05:43:31 +00:00
Integrated pytaw library for youtube API.
This commit is contained in:
@ -1,32 +0,0 @@
|
||||
import itertools
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
def first_true(*args, default=False, pred=None):
|
||||
"""Returns the first true value in the iterable.
|
||||
|
||||
If no true value is found, returns *default*
|
||||
|
||||
If *pred* is not None, returns the first item
|
||||
for which pred(item) is true.
|
||||
|
||||
"""
|
||||
# first_true([a,b,c], x) --> a or b or c or x
|
||||
# first_true([a,b], x, f) --> a if f(a) else b if f(b) else x
|
||||
return next(filter(pred, args), default)
|
||||
|
||||
|
||||
def as_chunks(iterable: Iterable, chunk_size: int):
|
||||
"""
|
||||
Iterates an iterable in chunks of chunk_size elements.
|
||||
:param iterable: An iterable containing items to iterate.
|
||||
:param chunk_size: Chunk size
|
||||
:return: Returns a generator which will yield chunks of size chunk_size
|
||||
"""
|
||||
|
||||
it = iter(iterable)
|
||||
while True:
|
||||
chunk = tuple(itertools.islice(it, chunk_size))
|
||||
if not chunk:
|
||||
return
|
||||
yield chunk
|
@ -1,285 +1,48 @@
|
||||
from googleapiclient.discovery import build
|
||||
from googleapiclient.errors import Error as APIError
|
||||
from google_auth_oauthlib.flow import InstalledAppFlow
|
||||
from django.conf import settings
|
||||
import re
|
||||
from YtManagerApp.utils.iterutils import as_chunks
|
||||
|
||||
API_SERVICE_NAME = 'youtube'
|
||||
API_VERSION = 'v3'
|
||||
|
||||
YOUTUBE_LIST_LIMIT = 50
|
||||
from external.pytaw.pytaw.youtube import YouTube, Channel, Playlist, PlaylistItem, Thumbnail, InvalidURL, Resource, Video
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class YoutubeException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class YoutubeInvalidURLException(YoutubeException):
|
||||
pass
|
||||
|
||||
|
||||
class YoutubeChannelNotFoundException(YoutubeException):
|
||||
pass
|
||||
|
||||
|
||||
class YoutubeUserNotFoundException(YoutubeException):
|
||||
pass
|
||||
|
||||
|
||||
class YoutubePlaylistNotFoundException(YoutubeException):
|
||||
pass
|
||||
|
||||
|
||||
class YoutubeVideoNotFoundException(YoutubeException):
|
||||
pass
|
||||
|
||||
|
||||
class YoutubeChannelInfo(object):
|
||||
def __init__(self, result_dict):
|
||||
self.__id = result_dict['id']
|
||||
self.__snippet = result_dict['snippet']
|
||||
self.__contentDetails = result_dict['contentDetails']
|
||||
|
||||
def getId(self):
|
||||
return self.__id
|
||||
|
||||
def getTitle(self):
|
||||
return self.__snippet['title']
|
||||
|
||||
def getDescription(self):
|
||||
return self.__snippet['description']
|
||||
|
||||
def getCustomUrl(self):
|
||||
try:
|
||||
return self.__snippet['customUrl']
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def getDefaultThumbnailUrl(self):
|
||||
return self.__snippet['thumbnails']['default']['url']
|
||||
|
||||
def getBestThumbnailUrl(self):
|
||||
best_url = None
|
||||
best_res = 0
|
||||
for _, thumb in self.__snippet['thumbnails'].items():
|
||||
res = thumb['width'] * thumb['height']
|
||||
if res > best_res:
|
||||
best_res = res
|
||||
best_url = thumb['url']
|
||||
return best_url
|
||||
|
||||
def getUploadsPlaylist(self):
|
||||
return self.__contentDetails['relatedPlaylists']['uploads']
|
||||
|
||||
|
||||
class YoutubePlaylistInfo(object):
|
||||
def __init__(self, result_dict):
|
||||
self.__id = result_dict['id']
|
||||
self.__snippet = result_dict['snippet']
|
||||
|
||||
def getId(self):
|
||||
return self.__id
|
||||
|
||||
def getChannelId(self):
|
||||
return self.__snippet['channelId']
|
||||
|
||||
def getTitle(self):
|
||||
return self.__snippet['title']
|
||||
|
||||
def getDescription(self):
|
||||
return self.__snippet['description']
|
||||
|
||||
def getDefaultThumbnailUrl(self):
|
||||
return self.__snippet['thumbnails']['default']['url']
|
||||
|
||||
def getBestThumbnailUrl(self):
|
||||
best_url = None
|
||||
best_res = 0
|
||||
for _, thumb in self.__snippet['thumbnails'].items():
|
||||
res = thumb['width'] * thumb['height']
|
||||
if res > best_res:
|
||||
best_res = res
|
||||
best_url = thumb['url']
|
||||
return best_url
|
||||
|
||||
|
||||
class YoutubePlaylistItem(object):
|
||||
def __init__(self, result_dict):
|
||||
self.__snippet = result_dict['snippet']
|
||||
|
||||
def getVideoId(self):
|
||||
return self.__snippet['resourceId']['videoId']
|
||||
|
||||
def getPublishDate(self):
|
||||
return self.__snippet['publishedAt']
|
||||
|
||||
def getTitle(self):
|
||||
return self.__snippet['title']
|
||||
|
||||
def getDescription(self):
|
||||
return self.__snippet['description']
|
||||
|
||||
def getDefaultThumbnailUrl(self):
|
||||
return self.__snippet['thumbnails']['default']['url']
|
||||
|
||||
def getBestThumbnailUrl(self):
|
||||
best_url = None
|
||||
best_res = 0
|
||||
for _, thumb in self.__snippet['thumbnails'].items():
|
||||
res = thumb['width'] * thumb['height']
|
||||
if res > best_res:
|
||||
best_res = res
|
||||
best_url = thumb['url']
|
||||
return best_url
|
||||
|
||||
def getPlaylistIndex(self):
|
||||
return self.__snippet['position']
|
||||
|
||||
|
||||
class YoutubeVideoStatistics(object):
|
||||
def __init__(self, result_dict):
|
||||
self.id = result_dict['id']
|
||||
self.stats = result_dict['statistics']
|
||||
|
||||
def get_view_count(self):
|
||||
return int(self.stats['viewCount'])
|
||||
|
||||
def get_like_count(self):
|
||||
return int(self.stats['likeCount'])
|
||||
|
||||
def get_dislike_count(self):
|
||||
return int(self.stats['dislikeCount'])
|
||||
|
||||
def get_favorite_count(self):
|
||||
return int(self.stats['favoriteCount'])
|
||||
|
||||
def get_comment_count(self):
|
||||
return int(self.stats['commentCount'])
|
||||
|
||||
|
||||
class YoutubeAPI(object):
|
||||
def __init__(self, service):
|
||||
self.service = service
|
||||
class YoutubeAPI(YouTube):
|
||||
|
||||
@staticmethod
|
||||
def build_public() -> 'YoutubeAPI':
|
||||
service = build(API_SERVICE_NAME, API_VERSION, developerKey=settings.YOUTUBE_API_KEY)
|
||||
return YoutubeAPI(service)
|
||||
|
||||
@staticmethod
|
||||
def parse_channel_url(url):
|
||||
"""
|
||||
Parses given channel url, returns a tuple of the form (type, value), where type can be one of:
|
||||
* channel_id
|
||||
* channel_custom
|
||||
* user
|
||||
* playlist_id
|
||||
:param url: URL to parse
|
||||
:return: (type, value) tuple
|
||||
"""
|
||||
match = re.search(r'youtube\.com/.*[&?]list=([^?&/]+)', url)
|
||||
if match:
|
||||
return 'playlist_id', match.group(1)
|
||||
|
||||
match = re.search(r'youtube\.com/user/([^?&/]+)', url)
|
||||
if match:
|
||||
return 'user', match.group(1)
|
||||
|
||||
match = re.search(r'youtube\.com/channel/([^?&/]+)', url)
|
||||
if match:
|
||||
return 'channel_id', match.group(1)
|
||||
|
||||
match = re.search(r'youtube\.com/(?:c/)?([^?&/]+)', url)
|
||||
if match:
|
||||
return 'channel_custom', match.group(1)
|
||||
|
||||
raise YoutubeInvalidURLException('Unrecognized URL format!')
|
||||
|
||||
def get_playlist_info(self, list_id) -> YoutubePlaylistInfo:
|
||||
result = self.service.playlists()\
|
||||
.list(part='snippet', id=list_id)\
|
||||
.execute()
|
||||
|
||||
if len(result['items']) <= 0:
|
||||
raise YoutubePlaylistNotFoundException("Invalid playlist ID.")
|
||||
|
||||
return YoutubePlaylistInfo(result['items'][0])
|
||||
|
||||
def get_channel_info_by_username(self, user) -> YoutubeChannelInfo:
|
||||
result = self.service.channels()\
|
||||
.list(part='snippet,contentDetails', forUsername=user)\
|
||||
.execute()
|
||||
|
||||
if len(result['items']) <= 0:
|
||||
raise YoutubeUserNotFoundException('Invalid user.')
|
||||
|
||||
return YoutubeChannelInfo(result['items'][0])
|
||||
|
||||
def get_channel_info(self, channel_id) -> YoutubeChannelInfo:
|
||||
result = self.service.channels()\
|
||||
.list(part='snippet,contentDetails', id=channel_id)\
|
||||
.execute()
|
||||
|
||||
if len(result['items']) <= 0:
|
||||
raise YoutubeChannelNotFoundException('Invalid channel ID.')
|
||||
|
||||
return YoutubeChannelInfo(result['items'][0])
|
||||
|
||||
def search_channel(self, custom) -> str:
|
||||
result = self.service.search()\
|
||||
.list(part='id', q=custom, type='channel')\
|
||||
.execute()
|
||||
|
||||
if len(result['items']) <= 0:
|
||||
raise YoutubeChannelNotFoundException('Could not find channel!')
|
||||
|
||||
channel_result = result['items'][0]
|
||||
return channel_result['id']['channelId']
|
||||
|
||||
def list_playlist_videos(self, playlist_id):
|
||||
kwargs = {
|
||||
"part": "snippet",
|
||||
"maxResults": 50,
|
||||
"playlistId": playlist_id
|
||||
}
|
||||
last_page = False
|
||||
|
||||
while not last_page:
|
||||
result = self.service.playlistItems()\
|
||||
.list(**kwargs)\
|
||||
.execute()
|
||||
|
||||
for item in result['items']:
|
||||
yield YoutubePlaylistItem(item)
|
||||
|
||||
if 'nextPageToken' in result:
|
||||
kwargs['pageToken'] = result['nextPageToken']
|
||||
else:
|
||||
last_page = True
|
||||
|
||||
def get_single_video_stats(self, video_id) -> YoutubeVideoStatistics:
|
||||
result = list(self.get_video_stats([video_id]))
|
||||
if len(result) < 1:
|
||||
raise YoutubeVideoNotFoundException('Could not find video with id ' + video_id + '!')
|
||||
return result[0]
|
||||
|
||||
def get_video_stats(self, video_id_list):
|
||||
for chunk in as_chunks(video_id_list, YOUTUBE_LIST_LIMIT):
|
||||
kwargs = {
|
||||
"part": "statistics",
|
||||
"maxResults": YOUTUBE_LIST_LIMIT,
|
||||
"id": ','.join(chunk)
|
||||
}
|
||||
result = self.service.videos()\
|
||||
.list(**kwargs)\
|
||||
.execute()
|
||||
|
||||
for item in result['items']:
|
||||
yield YoutubeVideoStatistics(item)
|
||||
return YoutubeAPI(key=settings.YOUTUBE_API_KEY)
|
||||
|
||||
# @staticmethod
|
||||
# def build_oauth() -> 'YoutubeAPI':
|
||||
# flow =
|
||||
# credentials =
|
||||
# service = build(API_SERVICE_NAME, API_VERSION, credentials)
|
||||
|
||||
|
||||
def default_thumbnail(resource: Resource) -> Optional[Thumbnail]:
|
||||
"""
|
||||
Gets the default thumbnail for a resource.
|
||||
Searches in the list of thumbnails for one with the label 'default', or takes the first one.
|
||||
:param resource:
|
||||
:return:
|
||||
"""
|
||||
thumbs = getattr(resource, 'thumbnails', None)
|
||||
|
||||
if thumbs is None or len(thumbs) <= 0:
|
||||
return None
|
||||
|
||||
return next(
|
||||
(i for i in thumbs if i.id == 'default'),
|
||||
thumbs[0]
|
||||
)
|
||||
|
||||
|
||||
def best_thumbnail(resource: Resource) -> Optional[Thumbnail]:
|
||||
"""
|
||||
Gets the best thumbnail available for a resource.
|
||||
:param resource:
|
||||
:return:
|
||||
"""
|
||||
thumbs = getattr(resource, 'thumbnails', None)
|
||||
|
||||
if thumbs is None or len(thumbs) <= 0:
|
||||
return None
|
||||
|
||||
return max(thumbs, key=lambda t: t.width * t.height)
|
Reference in New Issue
Block a user