Improved notification system.

This commit is contained in:
Tiberiu Chibici 2019-08-14 17:14:16 +03:00
parent 80cf94d694
commit 9bf114c25d
19 changed files with 818 additions and 504 deletions

View File

@ -6,8 +6,8 @@ import sys
from django.conf import settings as dj_settings
from .management.appconfig import appconfig
from .management.jobs.synchronize import schedule_synchronize_global
from .scheduler import initialize_scheduler
from .management.jobs.synchronize import SynchronizeJob
from .scheduler import scheduler
from django.db.utils import OperationalError
@ -15,8 +15,6 @@ def __initialize_logger():
log_dir = os.path.join(dj_settings.DATA_DIR, 'logs')
os.makedirs(log_dir, exist_ok=True)
handlers = []
file_handler = logging.handlers.RotatingFileHandler(
os.path.join(log_dir, "log.log"),
maxBytes=1024 * 1024,
@ -34,16 +32,16 @@ def __initialize_logger():
logging.root.addHandler(console_handler)
def main():
__initialize_logger()
try:
if appconfig.initialized:
initialize_scheduler()
schedule_synchronize_global()
scheduler.initialize()
SynchronizeJob.schedule_global_job()
except OperationalError:
# Settings table is not created when running migrate or makemigrations, so just don't do anything in this case.
# Settings table is not created when running migrate or makemigrations;
# Just don't do anything in this case.
pass
logging.info('Initialization complete.')

View File

@ -1,4 +1,4 @@
from YtManagerApp.management.jobs.download_video import schedule_download_video
from YtManagerApp.management.jobs.download_video import DownloadVideoJob
from YtManagerApp.models import Video, Subscription, VIDEO_ORDER_MAPPING
from YtManagerApp.utils import first_non_null
from django.conf import settings as srv_settings
@ -51,7 +51,7 @@ def downloader_process_subscription(sub: Subscription):
# enqueue download
for video in videos_to_download:
log.info('Enqueuing video %d [%s %s] index=%d', video.id, video.video_id, video.name, video.playlist_index)
schedule_download_video(video)
DownloadVideoJob.schedule(video)
log.info('Finished processing subscription %d [%s %s]', sub.id, sub.playlist_id, sub.id)

View File

@ -1,39 +1,46 @@
import logging
import os
from YtManagerApp import scheduler
from YtManagerApp.models import Video
log = logging.getLogger('video_downloader')
from YtManagerApp.scheduler import Job, scheduler
def delete_video(video: Video):
log.info('Deleting video %d [%s %s]', video.id, video.video_id, video.name)
count = 0
class DeleteVideoJob(Job):
name = "DeleteVideoJob"
try:
for file in video.get_files():
log.info("Deleting file %s", file)
count += 1
try:
os.unlink(file)
except OSError as e:
log.error("Failed to delete file %s: Error: %s", file, e)
def __init__(self, job_execution, video: Video):
super().__init__(job_execution)
self._video = video
except OSError as e:
log.error("Failed to delete video %d [%s %s]. Error: %s", video.id, video.video_id, video.name, e)
def get_description(self):
return f"Deleting video {self._video}"
video.downloaded_path = None
video.save()
def run(self):
count = 0
log.info('Deleted video %d successfully! (%d files) [%s %s]', video.id, count, video.video_id, video.name)
try:
for file in self._video.get_files():
self.log.info("Deleting file %s", file)
count += 1
try:
os.unlink(file)
except OSError as e:
self.log.error("Failed to delete file %s: Error: %s", file, e)
except OSError as e:
self.log.error("Failed to delete video %d [%s %s]. Error: %s", self._video.id,
self._video.video_id, self._video.name, e)
def schedule_delete_video(video: Video):
"""
Schedules a download video job to run immediately.
:param video:
:return:
"""
job = scheduler.scheduler.add_job(delete_video, args=[video])
log.info('Scheduled delete video job video=(%s), job=%s', video, job.id)
self._video.downloaded_path = None
self._video.save()
self.log.info('Deleted video %d successfully! (%d files) [%s %s]', self._video.id, count,
self._video.video_id, self._video.name)
@staticmethod
def schedule(video: Video):
"""
Schedules a delete video job to run immediately.
:param video:
:return:
"""
scheduler.add_job(DeleteVideoJob, args=[video])

View File

@ -1,4 +1,3 @@
import logging
import os
import re
from string import Template
@ -6,128 +5,129 @@ from threading import Lock
import youtube_dl
from YtManagerApp import scheduler
from YtManagerApp.models import Video
log = logging.getLogger('video_downloader')
log_youtube_dl = log.getChild('youtube_dl')
_lock = Lock()
from YtManagerApp.scheduler import Job, scheduler
def __get_valid_path(path):
"""
Normalizes string, converts to lowercase, removes non-alpha characters,
and converts spaces to hyphens.
"""
import unicodedata
value = unicodedata.normalize('NFKD', path).encode('ascii', 'ignore').decode('ascii')
value = re.sub('[:"*]', '', value).strip()
value = re.sub('[?<>|]', '#', value)
return value
class DownloadVideoJob(Job):
name = "DownloadVideoJob"
__lock = Lock()
def __init__(self, job_execution, video: Video, attempt: int = 1):
super().__init__(job_execution)
self.__video = video
self.__attempt = attempt
self.__log_youtube_dl = self.log.getChild('youtube_dl')
def __build_template_dict(video: Video):
return {
'channel': video.subscription.channel_name,
'channel_id': video.subscription.channel_id,
'playlist': video.subscription.name,
'playlist_id': video.subscription.playlist_id,
'playlist_index': "{:03d}".format(1 + video.playlist_index),
'title': video.name,
'id': video.video_id,
}
def get_description(self):
ret = "Downloading video " + self.__video.name
if self.__attempt > 1:
ret += f" (attempt {self.__attempt})"
return ret
def run(self):
# Issue: if multiple videos are downloaded at the same time, a race condition appears in the mkdirs() call that
# youtube-dl makes, which causes it to fail with the error 'Cannot create folder - file already exists'.
# For now, allow a single download instance.
self.__lock.acquire()
def __build_youtube_dl_params(video: Video):
try:
user = self.__video.subscription.user
max_attempts = user.preferences['max_download_attempts']
sub = video.subscription
user = sub.user
youtube_dl_params, output_path = self.__build_youtube_dl_params(self.__video)
with youtube_dl.YoutubeDL(youtube_dl_params) as yt:
ret = yt.download(["https://www.youtube.com/watch?v=" + self.__video.video_id])
# resolve path
download_path = user.preferences['download_path']
self.log.info('Download finished with code %d', ret)
template_dict = __build_template_dict(video)
output_pattern = Template(user.preferences['download_file_pattern']).safe_substitute(template_dict)
if ret == 0:
self.__video.downloaded_path = output_path
self.__video.save()
self.log.info('Video %d [%s %s] downloaded successfully!', self.__video.id, self.__video.video_id, self.__video.name)
output_path = os.path.join(download_path, output_pattern)
output_path = os.path.normpath(output_path)
elif self.__attempt <= max_attempts:
self.log.warning('Re-enqueueing video (attempt %d/%d)', self.__attempt, max_attempts)
DownloadVideoJob.schedule(self.__video, self.__attempt + 1)
youtube_dl_params = {
'logger': log_youtube_dl,
'format': user.preferences['download_format'],
'outtmpl': output_path,
'writethumbnail': True,
'writedescription': True,
'writesubtitles': user.preferences['download_subtitles'],
'writeautomaticsub': user.preferences['download_autogenerated_subtitles'],
'allsubtitles': user.preferences['download_subtitles_all'],
'postprocessors': [
{
'key': 'FFmpegMetadata'
},
]
}
else:
self.log.error('Multiple attempts to download video %d [%s %s] failed!', self.__video.id, self.__video.video_id,
self.__video.name)
self.__video.downloaded_path = ''
self.__video.save()
sub_langs = user.preferences['download_subtitles_langs'].split(',')
sub_langs = [i.strip() for i in sub_langs]
if len(sub_langs) > 0:
youtube_dl_params['subtitleslangs'] = sub_langs
finally:
self.__lock.release()
sub_format = user.preferences['download_subtitles_format']
if len(sub_format) > 0:
youtube_dl_params['subtitlesformat'] = sub_format
def __build_youtube_dl_params(self, video: Video):
return youtube_dl_params, output_path
sub = video.subscription
user = sub.user
# resolve path
download_path = user.preferences['download_path']
def download_video(video: Video, attempt: int = 1):
template_dict = self.__build_template_dict(video)
output_pattern = Template(user.preferences['download_file_pattern']).safe_substitute(template_dict)
user = video.subscription.user
output_path = os.path.join(download_path, output_pattern)
output_path = os.path.normpath(output_path)
log.info('Downloading video %d [%s %s]', video.id, video.video_id, video.name)
youtube_dl_params = {
'logger': self.__log_youtube_dl,
'format': user.preferences['download_format'],
'outtmpl': output_path,
'writethumbnail': True,
'writedescription': True,
'writesubtitles': user.preferences['download_subtitles'],
'writeautomaticsub': user.preferences['download_autogenerated_subtitles'],
'allsubtitles': user.preferences['download_subtitles_all'],
'postprocessors': [
{
'key': 'FFmpegMetadata'
},
]
}
# Issue: if multiple videos are downloaded at the same time, a race condition appears in the mkdirs() call that
# youtube-dl makes, which causes it to fail with the error 'Cannot create folder - file already exists'.
# For now, allow a single download instance.
_lock.acquire()
sub_langs = user.preferences['download_subtitles_langs'].split(',')
sub_langs = [i.strip() for i in sub_langs]
if len(sub_langs) > 0:
youtube_dl_params['subtitleslangs'] = sub_langs
try:
max_attempts = user.preferences['max_download_attempts']
sub_format = user.preferences['download_subtitles_format']
if len(sub_format) > 0:
youtube_dl_params['subtitlesformat'] = sub_format
youtube_dl_params, output_path = __build_youtube_dl_params(video)
with youtube_dl.YoutubeDL(youtube_dl_params) as yt:
ret = yt.download(["https://www.youtube.com/watch?v=" + video.video_id])
return youtube_dl_params, output_path
log.info('Download finished with code %d', ret)
def __build_template_dict(self, video: Video):
return {
'channel': video.subscription.channel_name,
'channel_id': video.subscription.channel_id,
'playlist': video.subscription.name,
'playlist_id': video.subscription.playlist_id,
'playlist_index': "{:03d}".format(1 + video.playlist_index),
'title': video.name,
'id': video.video_id,
}
if ret == 0:
video.downloaded_path = output_path
video.save()
log.info('Video %d [%s %s] downloaded successfully!', video.id, video.video_id, video.name)
def __get_valid_path(self, path):
"""
Normalizes string, converts to lowercase, removes non-alpha characters,
and converts spaces to hyphens.
"""
import unicodedata
value = unicodedata.normalize('NFKD', path).encode('ascii', 'ignore').decode('ascii')
value = re.sub('[:"*]', '', value).strip()
value = re.sub('[?<>|]', '#', value)
return value
elif attempt <= max_attempts:
log.warning('Re-enqueueing video (attempt %d/%d)', attempt, max_attempts)
__schedule_download_video(video, attempt + 1)
else:
log.error('Multiple attempts to download video %d [%s %s] failed!', video.id, video.video_id, video.name)
video.downloaded_path = ''
video.save()
finally:
_lock.release()
def __schedule_download_video(video: Video, attempt=1):
job = scheduler.scheduler.add_job(download_video, args=[video, attempt])
log.info('Scheduled download video job video=(%s), attempt=%d, job=%s', video, attempt, job.id)
def schedule_download_video(video: Video):
"""
Schedules a download video job to run immediately.
:param video:
:return:
"""
__schedule_download_video(video)
@staticmethod
def schedule(video: Video, attempt: int = 1):
"""
Schedules to download video immediately
:param video:
:param attempt:
:return:
"""
scheduler.add_job(DownloadVideoJob, args=[video, attempt])

View File

@ -1,211 +1,177 @@
import errno
import mimetypes
import itertools
from threading import Lock
from apscheduler.triggers.cron import CronTrigger
from YtManagerApp.management.notification_manager import OPERATION_ID_SYNCHRONIZE
from YtManagerApp.scheduler import scheduler
from YtManagerApp.management.appconfig import appconfig
from YtManagerApp.management.downloader import fetch_thumbnail, downloader_process_all, downloader_process_subscription
from YtManagerApp.management.downloader import fetch_thumbnail, downloader_process_subscription
from YtManagerApp.models import *
from YtManagerApp.scheduler import scheduler, Job
from YtManagerApp.utils import youtube
from YtManagerApp.management import notification_manager
log = logging.getLogger('sync')
__lock = Lock()
from external.pytaw.pytaw.utils import iterate_chunks
_ENABLE_UPDATE_STATS = True
def __check_new_videos_sub(subscription: Subscription, yt_api: youtube.YoutubeAPI, progress_callback=None):
# Get list of videos
for item in yt_api.playlist_items(subscription.playlist_id):
results = Video.objects.filter(video_id=item.resource_video_id, subscription=subscription)
if len(results) == 0:
log.info('New video for subscription %s: %s %s"', subscription, item.resource_video_id, item.title)
Video.create(item, subscription)
class SynchronizeJob(Job):
name = "SynchronizeJob"
__lock = Lock()
running = False
__global_sync_job = None
if _ENABLE_UPDATE_STATS:
all_vids = Video.objects.filter(subscription=subscription)
all_vids_ids = [video.video_id for video in all_vids]
all_vids_dict = {v.video_id: v for v in all_vids}
def __init__(self, job_execution, subscription: Optional[Subscription] = None):
super().__init__(job_execution)
self.__subscription = subscription
self.__api = youtube.YoutubeAPI.build_public()
self.__new_vids = []
for yt_video in yt_api.videos(all_vids_ids, part='id,statistics'):
video = all_vids_dict.get(yt_video.id)
def get_description(self):
if self.__subscription is not None:
return "Running synchronization for subscription " + self.__subscription.name
return "Running synchronization..."
if yt_video.n_likes is not None \
and yt_video.n_dislikes is not None \
and yt_video.n_likes + yt_video.n_dislikes > 0:
video.rating = yt_video.n_likes / (yt_video.n_likes + yt_video.n_dislikes)
def get_subscription_list(self):
if self.__subscription is not None:
return [self.__subscription]
return Subscription.objects.all()
video.views = yt_video.n_views
video.save()
def get_videos_list(self, subs):
return Video.objects.filter(subscription__in=subs)
def __detect_deleted(subscription: Subscription):
user = subscription.user
for video in Video.objects.filter(subscription=subscription, downloaded_path__isnull=False):
found_video = False
files = []
def run(self):
self.__lock.acquire(blocking=True)
SynchronizeJob.running = True
try:
files = list(video.get_files())
except OSError as e:
if e.errno != errno.ENOENT:
log.error("Could not access path %s. Error: %s", video.downloaded_path, e)
return
self.log.info(self.get_description())
# Try to find a valid video file
for file in files:
mime, _ = mimetypes.guess_type(file)
if mime is not None and mime.startswith("video"):
found_video = True
# Build list of work items
work_subs = self.get_subscription_list()
work_vids = self.get_videos_list(work_subs)
# Video not found, we can safely assume that the video was deleted.
if not found_video:
log.info("Video %d was deleted! [%s %s]", video.id, video.video_id, video.name)
# Clean up
self.set_total_steps(len(work_subs) + len(work_vids))
# Process subscriptions
for sub in work_subs:
self.progress_advance(1, "Synchronizing subscription " + sub.name)
self.check_new_videos(sub)
self.fetch_missing_thumbnails(sub)
# Add new videos to progress calculation
self.set_total_steps(len(work_subs) + len(work_vids) + len(self.__new_vids))
# Process videos
all_videos = itertools.chain(work_vids, self.__new_vids)
for batch in iterate_chunks(all_videos, 50):
video_stats = {}
if _ENABLE_UPDATE_STATS:
batch_ids = [video.video_id for video in batch]
video_stats = {v.id: v for v in self.__api.videos(batch_ids, part='id,statistics')}
for video in itertools.chain(work_vids, self.__new_vids):
self.progress_advance(1, "Updating video " + video.name)
self.check_video_deleted(video)
self.fetch_missing_thumbnails(video)
if video.video_id in video_stats:
self.update_video_stats(video, video_stats[video.video_id])
# Start downloading videos
for sub in work_subs:
downloader_process_subscription(sub)
finally:
SynchronizeJob.running = False
self.__lock.release()
def check_new_videos(self, sub: Subscription):
playlist_items = self.__api.playlist_items(sub.playlist_id)
for item in playlist_items:
results = Video.objects.filter(video_id=item.resource_video_id, subscription=sub)
if len(results) == 0:
self.log.info('New video for subscription %s: %s %s"', sub, item.resource_video_id, item.title)
self.__new_vids.append(Video.create(item, sub))
def fetch_missing_thumbnails(self, object: Union[Subscription, Video]):
if isinstance(object, Subscription):
object_type = "sub"
object_id = object.playlist_id
else:
object_type = "video"
object_id = object.video_id
if object.icon_default.startswith("http"):
object.icon_default = fetch_thumbnail(object.icon_default, object_type, object_id, 'default')
object.save()
if object.icon_best.startswith("http"):
object.icon_best = fetch_thumbnail(object.icon_best, object_type, object_id, 'best')
object.save()
def check_video_deleted(self, video: Video):
if video.downloaded_path is not None:
files = []
try:
files = list(video.get_files())
except OSError as e:
if e.errno != errno.ENOENT:
self.log.error("Could not access path %s. Error: %s", video.downloaded_path, e)
self.usr_err(f"Could not access path {video.downloaded_path}: {e}", suppress_notification=True)
return
# Try to find a valid video file
found_video = False
for file in files:
try:
os.unlink(file)
except OSError as e:
log.error("Could not delete redundant file %s. Error: %s", file, e)
video.downloaded_path = None
mime, _ = mimetypes.guess_type(file)
if mime is not None and mime.startswith("video"):
found_video = True
# Mark watched?
if user.preferences['mark_deleted_as_watched']:
video.watched = True
# Video not found, we can safely assume that the video was deleted.
if not found_video:
self.log.info("Video %d was deleted! [%s %s]", video.id, video.video_id, video.name)
# Clean up
for file in files:
try:
os.unlink(file)
except OSError as e:
self.log.error("Could not delete redundant file %s. Error: %s", file, e)
self.usr_err(f"Could not delete redundant file {file}: {e}", suppress_notification=True)
video.downloaded_path = None
video.save()
# Mark watched?
user = video.subscription.user
if user.preferences['mark_deleted_as_watched']:
video.watched = True
video.save()
def __fetch_thumbnails_obj(iterable, obj_type, id_attr):
for obj in iterable:
if obj.icon_default.startswith("http"):
obj.icon_default = fetch_thumbnail(obj.icon_default, obj_type, getattr(obj, id_attr), 'default')
if obj.icon_best.startswith("http"):
obj.icon_best = fetch_thumbnail(obj.icon_best, obj_type, getattr(obj, id_attr), 'best')
obj.save()
def update_video_stats(self, video: Video, yt_video):
if yt_video.n_likes is not None \
and yt_video.n_dislikes is not None \
and yt_video.n_likes + yt_video.n_dislikes > 0:
video.rating = yt_video.n_likes / (yt_video.n_likes + yt_video.n_dislikes)
video.views = yt_video.n_views
video.save()
def __fetch_thumbnails():
log.info("Fetching subscription thumbnails... ")
__fetch_thumbnails_obj(Subscription.objects.filter(icon_default__istartswith='http'), 'sub', 'playlist_id')
__fetch_thumbnails_obj(Subscription.objects.filter(icon_best__istartswith='http'), 'sub', 'playlist_id')
log.info("Fetching video thumbnails... ")
__fetch_thumbnails_obj(Video.objects.filter(icon_default__istartswith='http'), 'video', 'video_id')
__fetch_thumbnails_obj(Video.objects.filter(icon_best__istartswith='http'), 'video', 'video_id')
def __compute_progress(stage, stage_count, items, total_items):
stage_percent = float(stage) / stage_count
def synchronize():
if not __lock.acquire(blocking=False):
# Synchronize already running in another thread
log.info("Synchronize already running in another thread")
return
try:
log.info("Running scheduled synchronization... ")
notification_manager.notify_status_operation_progress(
OPERATION_ID_SYNCHRONIZE,
'Running scheduled synchronization: checking for new videos...',
0.1,
None
)
# Sync subscribed playlists/channels
log.info("Sync - checking videos")
yt_api = youtube.YoutubeAPI.build_public()
for subscription in Subscription.objects.all():
__check_new_videos_sub(subscription, yt_api)
__detect_deleted(subscription)
notification_manager.notify_status_operation_progress(
OPERATION_ID_SYNCHRONIZE,
'Running scheduled synchronization: enqueueing videos to download...',
0.5,
None
)
log.info("Sync - checking for videos to download")
downloader_process_all()
notification_manager.notify_status_operation_progress(
OPERATION_ID_SYNCHRONIZE,
'Running scheduled synchronization: fetching thumbnails...',
0.7,
None
)
log.info("Sync - fetching missing thumbnails")
__fetch_thumbnails()
log.info("Synchronization finished.")
notification_manager.notify_status_operation_ended(
OPERATION_ID_SYNCHRONIZE,
'Synchronization finished.',
None
)
finally:
__lock.release()
def synchronize_subscription(subscription: Subscription):
__lock.acquire()
try:
log.info("Running synchronization for single subscription %d [%s]", subscription.id, subscription.name)
notification_manager.notify_status_update(f'Synchronization started for subscription <strong>{subscription.name}</strong>.')
yt_api = youtube.YoutubeAPI.build_public()
log.info("Sync - checking videos")
__check_new_videos_sub(subscription, yt_api)
__detect_deleted(subscription)
log.info("Sync - checking for videos to download")
downloader_process_subscription(subscription)
log.info("Sync - fetching missing thumbnails")
__fetch_thumbnails()
log.info("Synchronization finished for subscription %d [%s].", subscription.id, subscription.name)
notification_manager.notify_status_update(f'Synchronization finished for subscription <strong>{subscription.name}</strong>.')
finally:
__lock.release()
__global_sync_job = None
def schedule_synchronize_global():
global __global_sync_job
trigger = CronTrigger.from_crontab(appconfig.sync_schedule)
if __global_sync_job is None:
@staticmethod
def schedule_global_job():
trigger = CronTrigger.from_crontab(appconfig.sync_schedule)
__global_sync_job = scheduler.add_job(synchronize, trigger, max_instances=1, coalesce=True)
else:
__global_sync_job.reschedule(trigger, max_instances=1, coalesce=True)
if SynchronizeJob.__global_sync_job is None:
trigger = CronTrigger.from_crontab(appconfig.sync_schedule)
SynchronizeJob.__global_sync_job = scheduler.add_job(SynchronizeJob, trigger, max_instances=1, coalesce=True)
log.info('Scheduled synchronize job job=%s', __global_sync_job.id)
else:
SynchronizeJob.__global_sync_job.reschedule(trigger, max_instances=1, coalesce=True)
@staticmethod
def schedule_now():
scheduler.add_job(SynchronizeJob, max_instances=1, coalesce=True)
def schedule_synchronize_now():
job = scheduler.add_job(synchronize, max_instances=1, coalesce=True)
log.info('Scheduled synchronize now job job=%s', job.id)
def schedule_synchronize_now_subscription(subscription: Subscription):
job = scheduler.add_job(synchronize_subscription, args=[subscription])
log.info('Scheduled synchronize subscription job subscription=(%s), job=%s', subscription, job.id)
@staticmethod
def schedule_now_for_subscription(subscription):
scheduler.add_job(SynchronizeJob, user=subscription.user, args=[subscription])

View File

@ -1,95 +0,0 @@
from django.contrib.auth.models import User
from typing import Dict, Deque, Any, Optional
from collections import deque
from datetime import datetime, timedelta
from YtManagerApp.utils.algorithms import bisect_left
from threading import Lock
# Clients will request updates at most every few seconds, so a retention period of 60 seconds should be more than
# enough. I gave it 15 minutes so that if for some reason the connection fails (internet drops) and then comes back a
# few minutes later, the client will still get the updates
__RETENTION_PERIOD = 15 * 60
__NOTIFICATIONS: Deque[Dict] = deque()
__NEXT_ID = 0
__LOCK = Lock()
OPERATION_ID_SYNCHRONIZE = 1
# Messages enum
class Messages:
STATUS_UPDATE = 'st-up'
STATUS_OPERATION_PROGRESS = 'st-op-prog'
STATUS_OPERATION_END = 'st-op-end'
def __add_notification(message, user: User=None, **kwargs):
global __NEXT_ID
__LOCK.acquire()
try:
# add notification
notification = {
'time': datetime.now(),
'msg': message,
'id': __NEXT_ID,
'uid': user and user.id,
}
notification.update(kwargs)
__NOTIFICATIONS.append(notification)
__NEXT_ID += 1
# trim old notifications
oldest = __NOTIFICATIONS[0]
while len(__NOTIFICATIONS) > 0 and oldest['time'] + timedelta(seconds=__RETENTION_PERIOD) < datetime.now():
__NOTIFICATIONS.popleft()
oldest = __NOTIFICATIONS[0]
finally:
__LOCK.release()
def get_notifications(user: User, last_received_id: Optional[int]):
__LOCK.acquire()
try:
first_index = 0
if last_received_id is not None:
first_index = bisect_left(__NOTIFICATIONS,
{'id': last_received_id},
key=lambda item: item['id'])
for i in range(first_index, len(__NOTIFICATIONS)):
item = __NOTIFICATIONS[i]
if item['uid'] is None or item['uid'] == user.id:
yield item
finally:
__LOCK.release()
def get_current_notification_id():
return __NEXT_ID
def notify_status_update(status_message: str, user: User=None):
__add_notification(Messages.STATUS_UPDATE,
user=user,
status=status_message)
def notify_status_operation_progress(op_id: Any, status_message: str, progress_percent: float, user: User=None):
__add_notification(Messages.STATUS_OPERATION_PROGRESS,
user=user,
operation=op_id,
status=status_message,
progress=progress_percent)
def notify_status_operation_ended(op_id: Any, status_message: str, user: User=None):
__add_notification(Messages.STATUS_OPERATION_END,
user=user,
operation=op_id,
status=status_message)

View File

@ -0,0 +1,39 @@
# Generated by Django 2.1.7 on 2019-04-08 15:26
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('YtManagerApp', '0008_auto_20181229_2035'),
]
operations = [
migrations.CreateModel(
name='JobExecution',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('start_date', models.DateTimeField(auto_now=True)),
('end_date', models.DateTimeField(null=True)),
('description', models.CharField(default='', max_length=250)),
('status', models.IntegerField(choices=[('running', 0), ('finished', 1), ('failed', 2), ('interrupted', 3)], default=0)),
('user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
],
),
migrations.CreateModel(
name='JobMessage',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('timestamp', models.DateTimeField(auto_now=True)),
('progress', models.FloatField(null=True)),
('message', models.CharField(default='', max_length=1024)),
('level', models.IntegerField(choices=[('normal', 0), ('warning', 1), ('error', 2)], default=0)),
('suppress_notification', models.BooleanField(default=False)),
('job', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='YtManagerApp.JobExecution')),
],
),
]

View File

@ -199,18 +199,18 @@ class Video(models.Model):
self.save()
if self.downloaded_path is not None:
from YtManagerApp.management.appconfig import appconfig
from YtManagerApp.management.jobs.delete_video import schedule_delete_video
from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now_subscription
from YtManagerApp.management.jobs.delete_video import DeleteVideoJob
from YtManagerApp.management.jobs.synchronize import SynchronizeJob
if appconfig.for_sub(self.subscription, 'automatically_delete_watched'):
schedule_delete_video(self)
schedule_synchronize_now_subscription(self.subscription)
DeleteVideoJob.schedule(self)
SynchronizeJob.schedule_now_for_subscription(self.subscription)
def mark_unwatched(self):
from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now_subscription
from YtManagerApp.management.jobs.synchronize import SynchronizeJob
self.watched = False
self.save()
schedule_synchronize_now_subscription(self.subscription)
SynchronizeJob.schedule_now_for_subscription(self.subscription)
def get_files(self):
if self.downloaded_path is not None:
@ -255,3 +255,47 @@ class Video(models.Model):
def __repr__(self):
return f'video {self.id}, video_id="{self.video_id}"'
JOB_STATES = [
('running', 0),
('finished', 1),
('failed', 2),
('interrupted', 3),
]
JOB_STATES_MAP = {
'running': 0,
'finished': 1,
'failed': 2,
'interrupted': 3,
}
JOB_MESSAGE_LEVELS = [
('normal', 0),
('warning', 1),
('error', 2),
]
JOB_MESSAGE_LEVELS_MAP = {
'normal': 0,
'warning': 1,
'error': 2,
}
class JobExecution(models.Model):
start_date = models.DateTimeField(auto_now=True, null=False)
end_date = models.DateTimeField(null=True)
user = models.ForeignKey(User, on_delete=models.CASCADE, null=True)
description = models.CharField(max_length=250, null=False, default="")
status = models.IntegerField(choices=JOB_STATES, null=False, default=0)
class JobMessage(models.Model):
timestamp = models.DateTimeField(auto_now=True, null=False)
job = models.ForeignKey(JobExecution, null=False, on_delete=models.CASCADE)
progress = models.FloatField(null=True)
message = models.CharField(max_length=1024, null=False, default="")
level = models.IntegerField(choices=JOB_MESSAGE_LEVELS, null=False, default=0)
suppress_notification = models.BooleanField(null=False, default=False)

View File

@ -1,27 +1,267 @@
import datetime
import logging
import traceback
from typing import Type, Union, Optional, Callable, List, Any
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.base import BaseTrigger
from django.contrib.auth.models import User
from YtManagerApp.management.appconfig import appconfig
scheduler = BackgroundScheduler()
from YtManagerApp.models import JobExecution, JobMessage, JOB_STATES_MAP, JOB_MESSAGE_LEVELS_MAP
def initialize_scheduler():
class ProgressTracker(object):
"""
Class which helps keep track of complex operation progress.
"""
if scheduler.running:
return
def __init__(self, total_steps: float = 100, initial_steps: float = 0,
listener: Callable[[float, str], None] = None,
listener_args: List[Any] = None,
parent: Optional["ProgressTracker"] = None):
"""
Constructor
:param total_steps: Total number of steps required by this operation
:param initial_steps: Starting steps
:param parent: Parent progress tracker
:param listener: Callable which is called when any progress happens
"""
logger = logging.getLogger('scheduler')
executors = {
'default': {
'type': 'threadpool',
'max_workers': appconfig.concurrency
self.total_steps = total_steps
self.steps = initial_steps
self.__subtask: ProgressTracker = None
self.__subtask_steps = 0
self.__parent = parent
self.__listener = listener
self.__listener_args = listener_args or []
def __on_progress(self, progress_msg):
if self.__listener is not None:
self.__listener(*self.__listener_args, self.compute_progress(), progress_msg)
if self.__parent is not None:
self.__parent.__on_progress(progress_msg)
def advance(self, steps: float = 1, progress_msg: str = ''):
"""
Advances a number of steps.
:param steps: Number of steps to advance
:param progress_msg: A message which will be passed to a listener
:return:
"""
# We can assume previous subtask is now completed
if self.__subtask is not None:
self.steps += self.__subtask_steps
self.__subtask = None
self.steps += steps
self.__on_progress(progress_msg)
def subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0):
"""
Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress.
:param steps: Number of steps the subtask is 'worth'
:param subtask_total_steps: Total number of steps for subtask
:param subtask_initial_steps: Initial steps for subtask
:return: ProgressTracker for subtask
"""
# We can assume previous subtask is now completed
if self.__subtask is not None:
self.steps += self.__subtask_steps
self.__subtask = ProgressTracker(total_steps=subtask_total_steps,
initial_steps=subtask_initial_steps,
parent=self)
self.__subtask_steps = steps
return self.__subtask
def compute_progress(self):
"""
Calculates final progress value in percent.
:return: value in [0,1] interval representing progress
"""
base = float(self.steps) / self.total_steps
if self.__subtask is not None:
base += self.__subtask.compute_progress() * self.__subtask_steps / self.total_steps
return min(base, 1.0)
class Job(object):
name = 'GenericJob'
"""
Base class for jobs running in the scheduler.
"""
def __init__(self, job_execution, *args):
self.job_execution = job_execution
self.log = logging.getLogger(self.name)
self.__progress_tracker = ProgressTracker(listener=Job.__on_progress,
listener_args=[self])
def get_description(self) -> str:
"""
Gets a user friendly description of this job.
Should be overriden in job classes.
:return:
"""
return "Running job..."
#
# progress tracking
#
def __on_progress(self, percent: float, message: str):
self.usr_log(message, progress=percent)
def set_total_steps(self, steps: float):
"""
Sets the total number of work steps this task has. This is used for tracking progress.
Should be overriden in job classes.
:return:
"""
self.__progress_tracker.total_steps = steps
def progress_advance(self, steps: float = 1, progress_msg: str = ''):
"""
Advances a number of steps.
:param steps: Number of steps to advance
:param progress_msg: A message which will be passed to a listener
:return:
"""
self.__progress_tracker.advance(steps, progress_msg)
def create_subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0):
"""
Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress.
:param steps: Number of steps the subtask is 'worth'
:param subtask_total_steps: Total number of steps for subtask
:param subtask_initial_steps: Initial steps for subtask
:return: ProgressTracker for subtask
"""
return self.__progress_tracker.subtask(steps, subtask_total_steps, subtask_initial_steps)
#
# user log messages
#
def usr_log(self, message, progress: Optional[float] = None, level: int = JOB_MESSAGE_LEVELS_MAP['normal'],
suppress_notification: bool = False):
"""
Creates a new log message which will be shown on the user interface.
Progress can also be updated using this method.
:param message: A message to be displayed to the user
:param progress: Progress percentage in [0,1] interval
:param level: Log level (normal, warning, error)
:param suppress_notification: If set to true, a notification will not displayed to the user, but it will
appear in the system logs.
:return:
"""
message = JobMessage(job=self.job_execution,
progress=progress,
message=message,
level=level,
suppress_notification=suppress_notification)
message.save()
def usr_warn(self, message, progress: Optional[float] = None, suppress_notification: bool = False):
"""
Creates a new warning message which will be shown on the user interface.
Progress can also be updated using this method.
:param message: A message to be displayed to the user
:param progress: Progress percentage in [0,1] interval
:param suppress_notification: If set to true, a notification will not displayed to the user, but it will
appear in the system logs.
:return:
"""
self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['warning'], suppress_notification)
def usr_err(self, message, progress: Optional[float] = None, suppress_notification: bool = False):
"""
Creates a new error message which will be shown on the user interface.
Progress can also be updated using this method.
:param message: A message to be displayed to the user
:param progress: Progress percentage in [0,1] interval
:param suppress_notification: If set to true, a notification will not displayed to the user, but it will
appear in the system logs.
:return:
"""
self.usr_log(message, progress, JOB_MESSAGE_LEVELS_MAP['error'], suppress_notification)
#
# main run method
#
def run(self):
pass
class YtsmScheduler(object):
def __init__(self):
self._apscheduler = BackgroundScheduler()
def initialize(self):
# set state of existing jobs as "interrupted"
JobExecution.objects\
.filter(status=JOB_STATES_MAP['running'])\
.update(status=JOB_STATES_MAP['interrupted'])
self._configure_scheduler()
self._apscheduler.start()
def _configure_scheduler(self):
logger = logging.getLogger('scheduler')
executors = {
'default': {
'type': 'threadpool',
'max_workers': appconfig.concurrency
}
}
}
job_defaults = {
'misfire_grace_time': 60 * 60 * 24 * 365 # 1 year
}
job_defaults = {
'misfire_grace_time': 60 * 60 * 24 * 365 # 1 year
}
self._apscheduler.configure(logger=logger, executors=executors, job_defaults=job_defaults)
scheduler.configure(logger=logger, executors=executors, job_defaults=job_defaults)
scheduler.start()
def _run_job(self, job_class: Type[Job], user: Optional[User], args: Union[tuple, list]):
job_execution = JobExecution(user=user, status=JOB_STATES_MAP['running'])
job_execution.save()
job_instance = job_class(job_execution, *args)
# update description
job_execution.description = job_instance.get_description()
job_execution.save()
try:
job_instance.run()
job_execution.status = JOB_STATES_MAP['finished']
except Exception as ex:
job_instance.log.critical("Job failed with exception: %s", traceback.format_exc())
job_instance.usr_err(job_instance.name + " operation failed: " + str(ex))
job_execution.status = JOB_STATES_MAP['failed']
finally:
job_execution.end_date = datetime.datetime.now()
job_execution.save()
def add_job(self, job_class: Type[Job], trigger: Union[str, BaseTrigger] = None,
args: Union[list, tuple] = None,
user: Optional[User] = None,
**kwargs):
if args is None:
args = []
return self._apscheduler.add_job(YtsmScheduler._run_job, trigger=trigger, args=[self, job_class, user, args],
**kwargs)
scheduler = YtsmScheduler()

View File

@ -9,8 +9,6 @@
{% block scripts %}
<script src="{% static 'YtManagerApp/import/jstree/dist/jstree.min.js' %}"></script>
<script>
let LAST_NOTIFICATION_ID = {{ current_notification_id }};
{% include 'YtManagerApp/js/index.js' %}
</script>
{% endblock %}

View File

@ -178,63 +178,43 @@ function videos_Submit(e)
///
/// Notifications
///
const NOTIFICATION_INTERVAL = 1000;
const STATUS_UPDATE = 'st-up';
const STATUS_OPERATION_PROGRESS = 'st-op-prog';
const STATUS_OPERATION_END = 'st-op-end';
const OPERATION_LIST = {};
const JOB_QUERY_INTERVAL = 1500;
function notifications_update_progress_bar() {
var count = 0;
var percent = 0;
for(op in OPERATION_LIST) {
count++;
percent += OPERATION_LIST[op];
}
let progress = $('#status-progress');
if (count > 0) {
progress.removeClass('invisible');
let bar = progress.find('.progress-bar');
bar.width(percent * 100 + '%');
bar.text(count + ' operations in progress');
}
else {
progress.addClass('invisible');
}
}
function get_and_process_notifications()
{
$.get("{% url 'ajax_get_notifications' 12345 %}".replace("12345", LAST_NOTIFICATION_ID))
$.get("{% url 'ajax_get_running_jobs' %}")
.done(function(data) {
for (let entry of data)
{
LAST_NOTIFICATION_ID = entry.id;
let dt = new Date(entry.time);
// Status update
if (entry.msg === STATUS_UPDATE) {
let txt = `<span class="status-timestamp">${dt.getHours()}:${zeroFill(dt.getMinutes(), 2)}</span>${entry.status}`;
$('#status-message').html(txt);
}
else if (entry.msg === STATUS_OPERATION_PROGRESS) {
let txt = `<span class="status-timestamp">${dt.getHours()}:${zeroFill(dt.getMinutes(), 2)}</span>${entry.status}`;
$('#status-message').html(txt);
let progress = $('#status-progress');
OPERATION_LIST[entry.operation] = entry.progress;
notifications_update_progress_bar();
}
else if (entry.msg === STATUS_OPERATION_END) {
let txt = `<span class="status-timestamp">${dt.getHours()}:${dt.getMinutes()}</span>${entry.status}`;
$('#status-message').html(txt);
if (data.length > 0) {
delete OPERATION_LIST[entry.operation];
notifications_update_progress_bar();
let statusTxt = "";
if (data.length > 1) {
statusTxt = `Running ${data.length} jobs...`;
}
else {
statusTxt = `${data[0].description} | ${data[0].message}`;
}
let combinedProgress = 0;
for (let entry of data) {
combinedProgress += entry.progress;
}
let percent = 100 * combinedProgress / data.length;
progress.removeClass('invisible');
let bar = progress.find('.progress-bar');
bar.width(percent + '%');
bar.text(`${percent.toFixed(0)}%`);
$('#status-message').text(statusTxt);
}
else {
progress.addClass('invisible');
$('#status-message').text("");
}
});
}
@ -280,5 +260,5 @@ $(document).ready(function ()
videos_Reload();
// Notification manager
setInterval(get_and_process_notifications, NOTIFICATION_INTERVAL);
setInterval(get_and_process_notifications, JOB_QUERY_INTERVAL);
});

View File

@ -24,7 +24,7 @@ from .views.actions import SyncNowView, DeleteVideoFilesView, DownloadVideoFiles
from .views.auth import ExtendedLoginView, RegisterView, RegisterDoneView
from .views.index import index, ajax_get_tree, ajax_get_videos, CreateFolderModal, UpdateFolderModal, DeleteFolderModal, \
CreateSubscriptionModal, UpdateSubscriptionModal, DeleteSubscriptionModal, ImportSubscriptionsModal
from .views.notifications import ajax_get_notifications
from .views.notifications import ajax_get_running_jobs
from .views.settings import SettingsView, AdminSettingsView
from .views.video import VideoDetailView, video_detail_view
@ -45,7 +45,7 @@ urlpatterns = [
path('ajax/get_tree/', ajax_get_tree, name='ajax_get_tree'),
path('ajax/get_videos/', ajax_get_videos, name='ajax_get_videos'),
path('ajax/get_notifications/<int:last_id>', ajax_get_notifications, name='ajax_get_notifications'),
path('ajax/get_running_jobs/', ajax_get_running_jobs, name='ajax_get_running_jobs'),
# Modals
path('modal/create_folder/', CreateFolderModal.as_view(), name='modal_create_folder'),

View File

@ -0,0 +1,109 @@
from typing import Optional, Callable
class ProgressTracker(object):
"""
Class which helps keep track of complex operation progress.
"""
def __init__(self, total_steps: float = 100, initial_steps: float = 0,
listener: Callable[[float, str], None] = None,
completed_listener: Callable[[], None] = None,
parent: Optional["ProgressTracker"] = None):
"""
Constructor
:param total_steps: Total number of steps required by this operation
:param initial_steps: Starting steps
:param parent: Parent progress tracker
:param listener: Callable which is called when any progress happens
"""
self.total_steps = total_steps
self.steps = initial_steps
self.__subtask: ProgressTracker = None
self.__subtask_steps = 0
self.__parent = parent
self.__listener = listener
self.__completed_listener = completed_listener
def __on_progress(self, progress_msg):
if self.__listener is not None:
self.__listener(self.compute_progress(), progress_msg)
if self.__parent is not None:
self.__parent.__on_progress(progress_msg)
if self.steps >= self.total_steps and self.__completed_listener is not None:
self.__completed_listener()
def advance(self, steps: float = 1, progress_msg: str = ''):
"""
Advances a number of steps.
:param steps: Number of steps to advance
:param progress_msg: A message which will be passed to a listener
:return:
"""
# We can assume previous subtask is now completed
if self.__subtask is not None:
self.steps += self.__subtask_steps
self.__subtask = None
self.steps += steps
self.__on_progress(progress_msg)
def subtask(self, steps: float = 1, subtask_total_steps: float = 100, subtask_initial_steps: float = 0):
"""
Creates a 'subtask' which has its own progress, which will be used in the calculation of the final progress.
:param steps: Number of steps the subtask is 'worth'
:param subtask_total_steps: Total number of steps for subtask
:param subtask_initial_steps: Initial steps for subtask
:return: ProgressTracker for subtask
"""
# We can assume previous subtask is now completed
if self.__subtask is not None:
self.steps += self.__subtask_steps
self.__subtask = ProgressTracker(total_steps=subtask_total_steps,
initial_steps=subtask_initial_steps,
parent=self)
self.__subtask_steps = steps
return self.__subtask
def compute_progress(self):
"""
Calculates final progress value in percent.
:return: value in [0,1] interval representing progress
"""
base = float(self.steps) / self.total_steps
if self.__subtask is not None:
base += self.__subtask.compute_progress() * self.__subtask_steps / self.total_steps
return min(base, 1.0)
# Test
if __name__ == '__main__':
def on_progress(progress, message):
print(f'{progress * 100}%: {message}')
def on_completed():
print("Complete!")
main_task = ProgressTracker(total_steps=20, listener=on_progress, completed_listener=on_completed)
for i in range(10):
main_task.advance(progress_msg='First 10 steps')
subtask = main_task.subtask(5, subtask_total_steps=10)
for i in range(10):
subtask.advance(progress_msg='Subtask')
for i in range(5):
main_task.advance(progress_msg='Main task again')

View File

@ -2,13 +2,13 @@ from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import JsonResponse
from django.views.generic import View
from YtManagerApp.management.jobs.synchronize import schedule_synchronize_now
from YtManagerApp.management.jobs.synchronize import SynchronizeJob
from YtManagerApp.models import Video
class SyncNowView(LoginRequiredMixin, View):
def post(self, *args, **kwargs):
schedule_synchronize_now()
SynchronizeJob.schedule_now()
return JsonResponse({
'success': True
})

View File

@ -9,8 +9,8 @@ from django.urls import reverse_lazy
from django.views.generic import FormView
from YtManagerApp.management.appconfig import appconfig
from YtManagerApp.management.jobs.synchronize import schedule_synchronize_global
from YtManagerApp.scheduler import initialize_scheduler
from YtManagerApp.management.jobs.synchronize import SynchronizeJob
from YtManagerApp.scheduler import scheduler
from YtManagerApp.views.forms.first_time import WelcomeForm, ApiKeyForm, PickAdminUserForm, ServerConfigForm, DoneForm, \
UserCreationForm, LoginForm
@ -177,8 +177,8 @@ class Step3ConfigureView(WizardStepMixin, FormView):
appconfig.initialized = True
# Start scheduler if not started
initialize_scheduler()
schedule_synchronize_global()
scheduler.initialize()
SynchronizeJob.schedule_global_job()
return super().form_valid(form)

View File

@ -15,7 +15,6 @@ from YtManagerApp.management.appconfig import appconfig
from YtManagerApp.models import Subscription, SubscriptionFolder, VIDEO_ORDER_CHOICES, VIDEO_ORDER_MAPPING
from YtManagerApp.utils import youtube, subscription_file_parser
from YtManagerApp.views.controls.modal import ModalMixin
from YtManagerApp.management.notification_manager import get_current_notification_id
import logging
@ -122,7 +121,6 @@ def index(request: HttpRequest):
if request.user.is_authenticated:
context.update({
'filter_form': VideoFilterForm(),
'current_notification_id': get_current_notification_id(),
})
return render(request, 'YtManagerApp/index.html', context)
else:

View File

@ -1,12 +1,42 @@
from django.contrib.auth.decorators import login_required
from django.db.models import Q
from django.http import HttpRequest, JsonResponse
from YtManagerApp.management.notification_manager import get_notifications
from YtManagerApp.models import JobExecution, JobMessage, JOB_STATES_MAP
@login_required
def ajax_get_notifications(request: HttpRequest, last_id: int):
user = request.user
notifications = get_notifications(user, last_id)
notifications = list(notifications)
return JsonResponse(notifications, safe=False)
def ajax_get_running_jobs(request: HttpRequest):
jobs = JobExecution.objects\
.filter(status=JOB_STATES_MAP['running'])\
.filter(Q(user__isnull=True) | Q(user=request.user))\
.order_by('start_date')
response = []
for job in jobs:
last_progress_message = JobMessage.objects\
.filter(job=job, progress__isnull=False, suppress_notification=False)\
.order_by('-timestamp').first()
last_message = JobMessage.objects\
.filter(job=job, suppress_notification=False)\
.order_by('-timestamp').first()
message = ''
progress = 0
if last_message is not None:
message = last_message.message
if last_progress_message is not None:
progress = last_progress_message.progress
response.append({
'id': job.id,
'description': job.description,
'progress': progress,
'message': message
})
return JsonResponse(response, safe=False)

View File

@ -3,7 +3,7 @@ from django.http import HttpResponseForbidden
from django.urls import reverse_lazy
from django.views.generic import FormView
from YtManagerApp.management.jobs.synchronize import schedule_synchronize_global
from YtManagerApp.management.jobs.synchronize import SynchronizeJob
from YtManagerApp.views.forms.settings import SettingsForm, AdminSettingsForm
@ -45,5 +45,5 @@ class AdminSettingsView(LoginRequiredMixin, FormView):
def form_valid(self, form):
form.save()
schedule_synchronize_global()
SynchronizeJob.schedule_global_job()
return super().form_valid(form)

View File

@ -89,4 +89,4 @@ def iterate_chunks(iterable: typing.Iterable, chunk_size: int):
chunk = tuple(itertools.islice(it, chunk_size))
if not chunk:
return
yield chunk
yield chunk