2018-12-29 23:16:04 +02:00
|
|
|
from typing import Iterable, Optional
|
2018-11-03 14:43:23 +02:00
|
|
|
from xml.etree import ElementTree
|
|
|
|
import re
|
|
|
|
|
|
|
|
|
|
|
|
class FormatNotSupportedError(Exception):
    """
    Raised when none of the available parsers recognizes a file's format.
    """
|
|
|
|
|
|
|
|
|
|
|
|
class SubFileParser:
    """
    Base class for subscription file parsers.

    The default implementation matches no file and produces no URLs;
    subclasses override both methods.
    """

    def probe(self, file_handle) -> bool:
        """
        Checks whether the file is in the format handled by this parser.

        :param file_handle: File handle
        :return: True if file matches, false otherwise
        """
        return False

    def parse(self, file_handle) -> Iterable[str]:
        """
        Extracts the subscription URLs contained in the file.

        :param file_handle: File handle
        :return: Iterable of subscription URLs (empty in this base class)
        """
        return list()
|
|
|
|
|
|
|
|
|
|
|
|
class SubscriptionListFileParser(SubFileParser):
    """
    A subscription list file is file which contains just a bunch of URLs.

    Comments are supported using # character: everything from a ``#`` that
    starts a line or follows whitespace, up to the end of that line, is
    ignored. Blank lines are ignored as well.
    """

    # A comment is a '#' at the start of the line or right after whitespace,
    # plus the rest of the line. A '#' glued to other text (for example a URL
    # fragment) is not matched, so such URLs are kept intact.
    __COMMENT_RE = re.compile(r'(^|\s)#.*')

    def __is_url(self, text: str) -> bool:
        """
        Tells whether the text starts with an HTTP or HTTPS scheme.

        :param text: Text to check
        :return: True if text starts with ``http://`` or ``https://``
        """
        return text.startswith(('http://', 'https://'))

    def __entries(self, file_handle) -> Iterable[str]:
        """
        Yields every non-empty line of the file, without comments and
        surrounding whitespace.

        :param file_handle: Seekable file handle, text or binary mode
        :return: Generator of cleaned lines
        """
        file_handle.seek(0)
        for line in file_handle:
            # Binary mode handles produce bytes; decode with the default
            # codec (UTF-8) so the rest of the processing works on str.
            if isinstance(line, (bytes, bytearray)):
                line = line.decode()

            # Trim comments and spaces
            line = self.__COMMENT_RE.sub('', line).strip()
            if line:
                yield line

    def probe(self, file_handle):
        """
        Tests if file matches file format: the first line that is neither
        blank nor a comment must be an HTTP(S) URL.

        :param file_handle: Seekable file handle, text or binary mode
        :return: True if file matches, false otherwise
        """
        try:
            first_entry = next(self.__entries(file_handle), None)
        except UnicodeDecodeError:
            # Content that cannot be decoded as text is not a URL list.
            return False
        return first_entry is not None and self.__is_url(first_entry)

    def parse(self, file_handle):
        """
        Parses file and yields its entries, one per non-empty line.

        Lines are not validated here; every non-empty, non-comment line is
        yielded as is.

        :param file_handle: Seekable file handle, text or binary mode
        :return: Generator of subscription URLs
        """
        yield from self.__entries(file_handle)
|
|
|
|
|
|
|
|
|
|
|
|
class OPMLParser(SubFileParser):
    """
    Parses OPML files (emitted by YouTube)

    The most recently parsed file handle and its XML tree are cached, so a
    ``probe`` followed by a ``parse`` on the same handle reads the file only
    once.
    """

    def __init__(self):
        super().__init__()
        # Handle and tree of the last successfully parsed file.
        self.__cached_file = None
        self.__cached_tree: Optional[ElementTree.ElementTree] = None

    def __parse(self, file_handle):
        """
        Returns the XML tree of the file, reading it only if it is not the
        cached one.

        :param file_handle: Seekable file handle
        :return: Parsed element tree
        :raises ElementTree.ParseError: if the content is not well-formed XML
        """
        # The cache is keyed on the handle object itself, so compare by
        # identity rather than equality.
        if self.__cached_tree is not None and file_handle is self.__cached_file:
            return self.__cached_tree

        file_handle.seek(0)
        # NOTE(review): xml.etree.ElementTree is not hardened against
        # maliciously constructed XML; confirm that input files are trusted.
        tree = ElementTree.parse(file_handle)

        # Only update the cache once parsing succeeded.
        self.__cached_file = file_handle
        self.__cached_tree = tree
        return tree

    def probe(self, file_handle):
        """
        Tests if file matches file format: it must be well-formed XML whose
        root element is ``opml`` (case-insensitive).

        :param file_handle: Seekable file handle
        :return: True if file matches, false otherwise
        """
        try:
            tree = self.__parse(file_handle)
        except ElementTree.ParseError:
            # Malformed XML
            return False

        return tree.getroot().tag.lower() == 'opml'

    def parse(self, file_handle):
        """
        Parses file and yields the ``xmlUrl`` attribute of every ``outline``
        element that has one.

        :param file_handle: Seekable file handle
        :return: Generator of subscription URLs
        """
        root = self.__parse(file_handle).getroot()

        for node in root.iter('outline'):
            url = node.get('xmlUrl')
            # get() returns None only when the attribute is absent.
            if url is not None:
                yield url
|
|
|
|
|
|
|
|
|
|
|
|
# Available parsers; parse() probes them in this order and uses the first
# one whose probe succeeds.
PARSERS = (OPMLParser(), SubscriptionListFileParser())
|
|
|
|
|
|
|
|
|
|
|
|
def parse(file_handle) -> Iterable[str]:
    """
    Parses a subscription file using the first parser that recognizes it.

    :param file_handle: File handle
    :return: Subscription URLs produced by the matching parser
    :raises FormatNotSupportedError: if no parser recognizes the file
    """
    # next() stops at the first successful probe, so later parsers are not
    # probed once a match is found.
    matching = next(
        (candidate for candidate in PARSERS if candidate.probe(file_handle)),
        None,
    )
    if matching is None:
        raise FormatNotSupportedError('This file cannot be parsed!')

    return matching.parse(file_handle)
|