286 lines
8.1 KiB
Python
286 lines
8.1 KiB
Python
from googleapiclient.discovery import build
|
|
from googleapiclient.errors import Error as APIError
|
|
from google_auth_oauthlib.flow import InstalledAppFlow
|
|
from django.conf import settings
|
|
import re
|
|
from YtManagerApp.utils.iterutils import as_chunks
|
|
|
|
API_SERVICE_NAME = 'youtube'
|
|
API_VERSION = 'v3'
|
|
|
|
YOUTUBE_LIST_LIMIT = 50
|
|
|
|
|
|
class YoutubeException(Exception):
|
|
pass
|
|
|
|
|
|
class YoutubeInvalidURLException(YoutubeException):
|
|
pass
|
|
|
|
|
|
class YoutubeChannelNotFoundException(YoutubeException):
|
|
pass
|
|
|
|
|
|
class YoutubeUserNotFoundException(YoutubeException):
|
|
pass
|
|
|
|
|
|
class YoutubePlaylistNotFoundException(YoutubeException):
|
|
pass
|
|
|
|
|
|
class YoutubeVideoNotFoundException(YoutubeException):
|
|
pass
|
|
|
|
|
|
class YoutubeChannelInfo(object):
|
|
def __init__(self, result_dict):
|
|
self.__id = result_dict['id']
|
|
self.__snippet = result_dict['snippet']
|
|
self.__contentDetails = result_dict['contentDetails']
|
|
|
|
def getId(self):
|
|
return self.__id
|
|
|
|
def getTitle(self):
|
|
return self.__snippet['title']
|
|
|
|
def getDescription(self):
|
|
return self.__snippet['description']
|
|
|
|
def getCustomUrl(self):
|
|
try:
|
|
return self.__snippet['customUrl']
|
|
except KeyError:
|
|
return None
|
|
|
|
def getDefaultThumbnailUrl(self):
|
|
return self.__snippet['thumbnails']['default']['url']
|
|
|
|
def getBestThumbnailUrl(self):
|
|
best_url = None
|
|
best_res = 0
|
|
for _, thumb in self.__snippet['thumbnails'].items():
|
|
res = thumb['width'] * thumb['height']
|
|
if res > best_res:
|
|
best_res = res
|
|
best_url = thumb['url']
|
|
return best_url
|
|
|
|
def getUploadsPlaylist(self):
|
|
return self.__contentDetails['relatedPlaylists']['uploads']
|
|
|
|
|
|
class YoutubePlaylistInfo(object):
|
|
def __init__(self, result_dict):
|
|
self.__id = result_dict['id']
|
|
self.__snippet = result_dict['snippet']
|
|
|
|
def getId(self):
|
|
return self.__id
|
|
|
|
def getChannelId(self):
|
|
return self.__snippet['channelId']
|
|
|
|
def getTitle(self):
|
|
return self.__snippet['title']
|
|
|
|
def getDescription(self):
|
|
return self.__snippet['description']
|
|
|
|
def getDefaultThumbnailUrl(self):
|
|
return self.__snippet['thumbnails']['default']['url']
|
|
|
|
def getBestThumbnailUrl(self):
|
|
best_url = None
|
|
best_res = 0
|
|
for _, thumb in self.__snippet['thumbnails'].items():
|
|
res = thumb['width'] * thumb['height']
|
|
if res > best_res:
|
|
best_res = res
|
|
best_url = thumb['url']
|
|
return best_url
|
|
|
|
|
|
class YoutubePlaylistItem(object):
|
|
def __init__(self, result_dict):
|
|
self.__snippet = result_dict['snippet']
|
|
|
|
def getVideoId(self):
|
|
return self.__snippet['resourceId']['videoId']
|
|
|
|
def getPublishDate(self):
|
|
return self.__snippet['publishedAt']
|
|
|
|
def getTitle(self):
|
|
return self.__snippet['title']
|
|
|
|
def getDescription(self):
|
|
return self.__snippet['description']
|
|
|
|
def getDefaultThumbnailUrl(self):
|
|
return self.__snippet['thumbnails']['default']['url']
|
|
|
|
def getBestThumbnailUrl(self):
|
|
best_url = None
|
|
best_res = 0
|
|
for _, thumb in self.__snippet['thumbnails'].items():
|
|
res = thumb['width'] * thumb['height']
|
|
if res > best_res:
|
|
best_res = res
|
|
best_url = thumb['url']
|
|
return best_url
|
|
|
|
def getPlaylistIndex(self):
|
|
return self.__snippet['position']
|
|
|
|
|
|
class YoutubeVideoStatistics(object):
|
|
def __init__(self, result_dict):
|
|
self.id = result_dict['id']
|
|
self.stats = result_dict['statistics']
|
|
|
|
def get_view_count(self):
|
|
return int(self.stats['viewCount'])
|
|
|
|
def get_like_count(self):
|
|
return int(self.stats['likeCount'])
|
|
|
|
def get_dislike_count(self):
|
|
return int(self.stats['dislikeCount'])
|
|
|
|
def get_favorite_count(self):
|
|
return int(self.stats['favoriteCount'])
|
|
|
|
def get_comment_count(self):
|
|
return int(self.stats['commentCount'])
|
|
|
|
|
|
class YoutubeAPI(object):
|
|
def __init__(self, service):
|
|
self.service = service
|
|
|
|
@staticmethod
|
|
def build_public() -> 'YoutubeAPI':
|
|
service = build(API_SERVICE_NAME, API_VERSION, developerKey=settings.YOUTUBE_API_KEY)
|
|
return YoutubeAPI(service)
|
|
|
|
@staticmethod
|
|
def parse_channel_url(url):
|
|
"""
|
|
Parses given channel url, returns a tuple of the form (type, value), where type can be one of:
|
|
* channel_id
|
|
* channel_custom
|
|
* user
|
|
* playlist_id
|
|
:param url: URL to parse
|
|
:return: (type, value) tuple
|
|
"""
|
|
match = re.search(r'youtube\.com/.*[&?]list=([^?&/]+)', url)
|
|
if match:
|
|
return 'playlist_id', match.group(1)
|
|
|
|
match = re.search(r'youtube\.com/user/([^?&/]+)', url)
|
|
if match:
|
|
return 'user', match.group(1)
|
|
|
|
match = re.search(r'youtube\.com/channel/([^?&/]+)', url)
|
|
if match:
|
|
return 'channel_id', match.group(1)
|
|
|
|
match = re.search(r'youtube\.com/(?:c/)?([^?&/]+)', url)
|
|
if match:
|
|
return 'channel_custom', match.group(1)
|
|
|
|
raise YoutubeInvalidURLException('Unrecognized URL format!')
|
|
|
|
def get_playlist_info(self, list_id) -> YoutubePlaylistInfo:
|
|
result = self.service.playlists()\
|
|
.list(part='snippet', id=list_id)\
|
|
.execute()
|
|
|
|
if len(result['items']) <= 0:
|
|
raise YoutubePlaylistNotFoundException("Invalid playlist ID.")
|
|
|
|
return YoutubePlaylistInfo(result['items'][0])
|
|
|
|
def get_channel_info_by_username(self, user) -> YoutubeChannelInfo:
|
|
result = self.service.channels()\
|
|
.list(part='snippet,contentDetails', forUsername=user)\
|
|
.execute()
|
|
|
|
if len(result['items']) <= 0:
|
|
raise YoutubeUserNotFoundException('Invalid user.')
|
|
|
|
return YoutubeChannelInfo(result['items'][0])
|
|
|
|
def get_channel_info(self, channel_id) -> YoutubeChannelInfo:
|
|
result = self.service.channels()\
|
|
.list(part='snippet,contentDetails', id=channel_id)\
|
|
.execute()
|
|
|
|
if len(result['items']) <= 0:
|
|
raise YoutubeChannelNotFoundException('Invalid channel ID.')
|
|
|
|
return YoutubeChannelInfo(result['items'][0])
|
|
|
|
def search_channel(self, custom) -> str:
|
|
result = self.service.search()\
|
|
.list(part='id', q=custom, type='channel')\
|
|
.execute()
|
|
|
|
if len(result['items']) <= 0:
|
|
raise YoutubeChannelNotFoundException('Could not find channel!')
|
|
|
|
channel_result = result['items'][0]
|
|
return channel_result['id']['channelId']
|
|
|
|
def list_playlist_videos(self, playlist_id):
|
|
kwargs = {
|
|
"part": "snippet",
|
|
"maxResults": 50,
|
|
"playlistId": playlist_id
|
|
}
|
|
last_page = False
|
|
|
|
while not last_page:
|
|
result = self.service.playlistItems()\
|
|
.list(**kwargs)\
|
|
.execute()
|
|
|
|
for item in result['items']:
|
|
yield YoutubePlaylistItem(item)
|
|
|
|
if 'nextPageToken' in result:
|
|
kwargs['pageToken'] = result['nextPageToken']
|
|
else:
|
|
last_page = True
|
|
|
|
def get_single_video_stats(self, video_id) -> YoutubeVideoStatistics:
|
|
result = list(self.get_video_stats([video_id]))
|
|
if len(result) < 1:
|
|
raise YoutubeVideoNotFoundException('Could not find video with id ' + video_id + '!')
|
|
return result[0]
|
|
|
|
def get_video_stats(self, video_id_list):
|
|
for chunk in as_chunks(video_id_list, YOUTUBE_LIST_LIMIT):
|
|
kwargs = {
|
|
"part": "statistics",
|
|
"maxResults": YOUTUBE_LIST_LIMIT,
|
|
"id": ','.join(chunk)
|
|
}
|
|
result = self.service.videos()\
|
|
.list(**kwargs)\
|
|
.execute()
|
|
|
|
for item in result['items']:
|
|
yield YoutubeVideoStatistics(item)
|
|
|
|
# @staticmethod
|
|
# def build_oauth() -> 'YoutubeAPI':
|
|
# flow =
|
|
# credentials =
|
|
# service = build(API_SERVICE_NAME, API_VERSION, credentials)
|