ytsm/app/YtManagerApp/management/notification_manager.py

96 lines
2.9 KiB
Python
Raw Normal View History

from django.contrib.auth.models import User
from typing import Dict, Deque, Any, Optional
from collections import deque
from datetime import datetime, timedelta
from YtManagerApp.utils.algorithms import bisect_left
from threading import Lock
# Clients will request updates at most every few seconds, so a retention period of 60 seconds should be more than
# enough. I gave it 15 minutes so that if for some reason the connection fails (internet drops) and then comes back a
# few minutes later, the client will still get the updates
__RETENTION_PERIOD = 15 * 60
__NOTIFICATIONS: Deque[Dict] = deque()
__NEXT_ID = 0
__LOCK = Lock()
2019-01-02 18:10:37 +00:00
OPERATION_ID_SYNCHRONIZE = 1
# Messages enum
class Messages:
STATUS_UPDATE = 'st-up'
STATUS_OPERATION_PROGRESS = 'st-op-prog'
STATUS_OPERATION_END = 'st-op-end'
def __add_notification(message, user: User=None, **kwargs):
global __NEXT_ID
__LOCK.acquire()
try:
# add notification
notification = {
'time': datetime.now(),
'msg': message,
'id': __NEXT_ID,
'uid': user and user.id,
}
notification.update(kwargs)
__NOTIFICATIONS.append(notification)
__NEXT_ID += 1
# trim old notifications
oldest = __NOTIFICATIONS[0]
while len(__NOTIFICATIONS) > 0 and oldest['time'] + timedelta(seconds=__RETENTION_PERIOD) < datetime.now():
__NOTIFICATIONS.popleft()
oldest = __NOTIFICATIONS[0]
finally:
__LOCK.release()
def get_notifications(user: User, last_received_id: Optional[int]):
__LOCK.acquire()
try:
first_index = 0
if last_received_id is not None:
first_index = bisect_left(__NOTIFICATIONS,
{'id': last_received_id},
key=lambda item: item['id'])
for i in range(first_index, len(__NOTIFICATIONS)):
item = __NOTIFICATIONS[i]
if item['uid'] is None or item['uid'] == user.id:
yield item
finally:
__LOCK.release()
def get_current_notification_id():
return __NEXT_ID
def notify_status_update(status_message: str, user: User=None):
__add_notification(Messages.STATUS_UPDATE,
user=user,
status=status_message)
def notify_status_operation_progress(op_id: Any, status_message: str, progress_percent: float, user: User=None):
__add_notification(Messages.STATUS_OPERATION_PROGRESS,
user=user,
operation=op_id,
status=status_message,
progress=progress_percent)
def notify_status_operation_ended(op_id: Any, status_message: str, user: User=None):
__add_notification(Messages.STATUS_OPERATION_END,
user=user,
operation=op_id,
status=status_message)