Big refactor

This commit is contained in:
2018-10-11 01:43:50 +03:00
parent 291da16461
commit ae77251883
42 changed files with 4311 additions and 264 deletions

View File

View File

@ -0,0 +1,114 @@
import os
import os.path
import re
from configparser import Interpolation, NoSectionError, NoOptionError, InterpolationMissingOptionError, \
InterpolationDepthError, InterpolationSyntaxError, ConfigParser
MAX_INTERPOLATION_DEPTH = 10
class ExtendedInterpolatorWithEnv(Interpolation):
"""Advanced variant of interpolation, supports the syntax used by
`zc.buildout'. Enables interpolation between sections.
This modified version also allows specifying environment variables
using ${env:...}, and allows adding additional options using 'set_additional_options'. """
_KEYCRE = re.compile(r"\$\{([^}]+)\}")
def __init__(self, **kwargs):
self.__kwargs = kwargs
def set_additional_options(self, **kwargs):
self.__kwargs = kwargs
def before_get(self, parser, section, option, value, defaults):
L = []
self._interpolate_some(parser, option, L, value, section, defaults, 1)
return ''.join(L)
def before_set(self, parser, section, option, value):
tmp_value = value.replace('$$', '') # escaped dollar signs
tmp_value = self._KEYCRE.sub('', tmp_value) # valid syntax
if '$' in tmp_value:
raise ValueError("invalid interpolation syntax in %r at "
"position %d" % (value, tmp_value.find('$')))
return value
def _resolve_option(self, option, defaults):
if option in self.__kwargs:
return self.__kwargs[option]
return defaults[option]
def _resolve_section_option(self, section, option, parser):
if section == 'env':
return os.getenv(option, '')
return parser.get(section, option, raw=True)
def _interpolate_some(self, parser, option, accum, rest, section, map,
depth):
rawval = parser.get(section, option, raw=True, fallback=rest)
if depth > MAX_INTERPOLATION_DEPTH:
raise InterpolationDepthError(option, section, rawval)
while rest:
p = rest.find("$")
if p < 0:
accum.append(rest)
return
if p > 0:
accum.append(rest[:p])
rest = rest[p:]
# p is no longer used
c = rest[1:2]
if c == "$":
accum.append("$")
rest = rest[2:]
elif c == "{":
m = self._KEYCRE.match(rest)
if m is None:
raise InterpolationSyntaxError(option, section,
"bad interpolation variable reference %r" % rest)
path = m.group(1).split(':')
rest = rest[m.end():]
sect = section
opt = option
try:
if len(path) == 1:
opt = parser.optionxform(path[0])
v = self._resolve_option(opt, map)
elif len(path) == 2:
sect = path[0]
opt = parser.optionxform(path[1])
v = self._resolve_section_option(sect, opt, parser)
else:
raise InterpolationSyntaxError(
option, section,
"More than one ':' found: %r" % (rest,))
except (KeyError, NoSectionError, NoOptionError):
raise InterpolationMissingOptionError(
option, section, rawval, ":".join(path)) from None
if "$" in v:
self._interpolate_some(parser, opt, accum, v, sect,
dict(parser.items(sect, raw=True)),
depth + 1)
else:
accum.append(v)
else:
raise InterpolationSyntaxError(
option, section,
"'$' must be followed by '$' or '{', "
"found: %r" % (rest,))
class ConfigParserWithEnv(ConfigParser):
_DEFAULT_INTERPOLATION = ExtendedInterpolatorWithEnv()
def set_additional_interpolation_options(self, **kwargs):
"""
Sets additional options to be used in interpolation.
Only works with ExtendedInterpolatorWithEnv
:param kwargs:
:return:
"""
if isinstance(super()._interpolation, ExtendedInterpolatorWithEnv):
super()._interpolation.set_additional_options(**kwargs)

View File

@ -0,0 +1,213 @@
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from django.conf import settings
import re
API_SERVICE_NAME = 'youtube'
API_VERSION = 'v3'
class YoutubeChannelInfo(object):
def __init__(self, result_dict):
self.__id = result_dict['id']
self.__snippet = result_dict['snippet']
self.__contentDetails = result_dict['contentDetails']
def getId(self):
return self.__id
def getTitle(self):
return self.__snippet['title']
def getDescription(self):
return self.__snippet['description']
def getCustomUrl(self):
return self.__snippet['customUrl']
def getDefaultThumbnailUrl(self):
return self.__snippet['thumbnails']['default']['url']
def getBestThumbnailUrl(self):
best_url = None
best_res = 0
for _, thumb in self.__snippet['thumbnails'].items():
res = thumb['width'] * thumb['height']
if res > best_res:
best_res = res
best_url = thumb['url']
return best_url
def getUploadsPlaylist(self):
return self.__contentDetails['relatedPlaylists']['uploads']
class YoutubePlaylistInfo(object):
def __init__(self, result_dict):
self.__id = result_dict['id']
self.__snippet = result_dict['snippet']
def getId(self):
return self.__id
def getChannelId(self):
return self.__snippet['channelId']
def getTitle(self):
return self.__snippet['title']
def getDescription(self):
return self.__snippet['description']
def getDefaultThumbnailUrl(self):
return self.__snippet['thumbnails']['default']['url']
def getBestThumbnailUrl(self):
best_url = None
best_res = 0
for _, thumb in self.__snippet['thumbnails'].items():
res = thumb['width'] * thumb['height']
if res > best_res:
best_res = res
best_url = thumb['url']
return best_url
class YoutubePlaylistItem(object):
def __init__(self, result_dict):
self.__snippet = result_dict['snippet']
def getVideoId(self):
return self.__snippet['resourceId']['videoId']
def getPublishDate(self):
return self.__snippet['publishedAt']
def getTitle(self):
return self.__snippet['title']
def getDescription(self):
return self.__snippet['description']
def getDefaultThumbnailUrl(self):
return self.__snippet['thumbnails']['default']['url']
def getBestThumbnailUrl(self):
best_url = None
best_res = 0
for _, thumb in self.__snippet['thumbnails'].items():
res = thumb['width'] * thumb['height']
if res > best_res:
best_res = res
best_url = thumb['url']
return best_url
def getPlaylistIndex(self):
return self.__snippet['position']
class YoutubeAPI(object):
def __init__(self, service):
self.service = service
@staticmethod
def build_public() -> 'YoutubeAPI':
service = build(API_SERVICE_NAME, API_VERSION, developerKey=settings.YOUTUBE_API_KEY)
return YoutubeAPI(service)
@staticmethod
def parse_channel_url(url):
"""
Parses given channel url, returns a tuple of the form (type, value), where type can be one of:
* channel_id
* channel_custom
* user
* playlist_id
:param url: URL to parse
:return: (type, value) tuple
"""
match = re.search(r'youtube\.com/.*[&?]list=([^?&/]+)', url)
if match:
return 'playlist_id', match.group(1)
match = re.search(r'youtube\.com/user/([^?&/]+)', url)
if match:
return 'user', match.group(1)
match = re.search(r'youtube\.com/channel/([^?&/]+)', url)
if match:
return 'channel_id', match.group(1)
match = re.search(r'youtube\.com/(?:c/)?([^?&/]+)', url)
if match:
return 'channel_custom', match.group(1)
raise Exception('Unrecognized URL format!')
def get_playlist_info(self, list_id) -> YoutubePlaylistInfo:
result = self.service.playlists()\
.list(part='snippet', id=list_id)\
.execute()
if len(result['items']) <= 0:
raise Exception("Invalid playlist ID.")
return YoutubePlaylistInfo(result['items'][0])
def get_channel_info_by_username(self, user) -> YoutubeChannelInfo:
result = self.service.channels()\
.list(part='snippet,contentDetails', forUsername=user)\
.execute()
if len(result['items']) <= 0:
raise Exception('Invalid user.')
return YoutubeChannelInfo(result['items'][0])
def get_channel_info(self, channel_id) -> YoutubeChannelInfo:
result = self.service.channels()\
.list(part='snippet,contentDetails', id=channel_id)\
.execute()
if len(result['items']) <= 0:
raise Exception('Invalid channel ID.')
return YoutubeChannelInfo(result['items'][0])
def search_channel(self, custom) -> str:
result = self.service.search()\
.list(part='id', q=custom, type='channel')\
.execute()
if len(result['items']) <= 0:
raise Exception('Could not find channel!')
channel_result = result['items'][0]
return channel_result['id']['channelId']
def list_playlist_videos(self, playlist_id):
kwargs = {
"part": "snippet",
"maxResults": 50,
"playlistId": playlist_id
}
last_page = False
while not last_page:
result = self.service.playlistItems()\
.list(**kwargs)\
.execute()
for item in result['items']:
yield YoutubePlaylistItem(item)
if 'nextPageToken' in result:
kwargs['pageToken'] = result['nextPageToken']
else:
last_page = True
# @staticmethod
# def build_oauth() -> 'YoutubeAPI':
# flow =
# credentials =
# service = build(API_SERVICE_NAME, API_VERSION, credentials)