perf: 支持在线会话暂停操作 (#11146)

* perf: 支持在线会话暂停操作

* perf: 优化代码

---------

Co-authored-by: Eric <xplzv@126.com>
This commit is contained in:
fit2bot
2023-08-01 16:40:38 +08:00
committed by GitHub
parent d17e2cde06
commit 44397caad4
14 changed files with 231 additions and 99 deletions

View File

@@ -3,17 +3,20 @@
import logging
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView, Response
from common.api import JMSBulkModelViewSet
from common.const.http import POST
from common.utils import get_object_or_none
from orgs.utils import tmp_to_root_org
from terminal import serializers
from terminal.const import TaskNameType
from terminal.models import Session, Task
from terminal.utils import is_session_approver
__all__ = ['TaskViewSet', 'KillSessionAPI', 'KillSessionForTicketAPI']
__all__ = ['TaskViewSet', 'KillSessionAPI', 'KillSessionForTicketAPI', ]
logger = logging.getLogger(__file__)
@@ -21,9 +24,44 @@ class TaskViewSet(JMSBulkModelViewSet):
queryset = Task.objects.all()
serializer_class = serializers.TaskSerializer
filterset_fields = ('is_finished',)
serializer_classes = {
'create_toggle_task': serializers.LockTaskSessionSerializer,
'handle_ticket_task': serializers.LockTaskSessionSerializer,
'default': serializers.TaskSerializer
}
rbac_perms = {
"create_toggle_task": "terminal.terminate_session",
}
@action(methods=[POST], detail=False, url_path='toggle-lock-session')
def create_toggle_task(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
session_id = serializer.validated_data['session_id']
task_name = serializer.validated_data['task_name']
session_ids = [session_id, ]
validated_session = create_sessions_tasks(session_ids, request.user, task_name=task_name)
return Response({"ok": validated_session})
@action(methods=[POST], detail=False, permission_classes=[IsAuthenticated, ],
url_path='toggle-lock-session-for-ticket', )
def handle_ticket_task(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
session_id = serializer.validated_data['session_id']
task_name = serializer.validated_data['task_name']
session_ids = [session_id, ]
user_id = request.user.id
for session_id in session_ids:
if not is_session_approver(session_id, user_id):
return Response({}, status=status.HTTP_403_FORBIDDEN)
with tmp_to_root_org():
validated_session = create_sessions_tasks(session_ids, request.user, task_name=task_name)
return Response({"ok": validated_session})
def kill_sessions(session_ids, user):
def create_sessions_tasks(session_ids, user, task_name=TaskNameType.kill_session):
validated_session = []
for session_id in session_ids:
@@ -31,9 +69,10 @@ def kill_sessions(session_ids, user):
if session and not session.is_finished:
validated_session.append(session_id)
Task.objects.create(
name="kill_session", args=session.id, terminal=session.terminal,
name=task_name, args=session.id, terminal=session.terminal,
kwargs={
'terminated_by': str(user)
'terminated_by': str(user),
'created_by': str(user)
}
)
return validated_session
@@ -47,7 +86,7 @@ class KillSessionAPI(APIView):
def post(self, request, *args, **kwargs):
session_ids = request.data
validated_session = kill_sessions(session_ids, request.user)
validated_session = create_sessions_tasks(session_ids, request.user)
return Response({"ok": validated_session})
@@ -63,6 +102,6 @@ class KillSessionForTicketAPI(APIView):
return Response({}, status=status.HTTP_403_FORBIDDEN)
with tmp_to_root_org():
validated_session = kill_sessions(session_ids, request.user)
validated_session = create_sessions_tasks(session_ids, request.user)
return Response({"ok": validated_session})

View File

@@ -88,3 +88,9 @@ class SessionType(TextChoices):
class ActionPermission(TextChoices):
readonly = "readonly", _('Read Only')
writable = "writable", _('Writable')
class TaskNameType(TextChoices):
kill_session = "kill_session", _('Kill Session')
lock_session = "lock_session", _('Lock Session')
unlock_session = "unlock_session", _('Unlock Session')

View File

@@ -4,15 +4,13 @@ from django.db import models
from django.utils.translation import gettext_lazy as _
from common.db.models import JMSBaseModel
from terminal.const import TaskNameType as SessionTaskChoices
from .terminal import Terminal
class Task(JMSBaseModel):
NAME_CHOICES = (
("kill_session", "Kill Session"),
)
name = models.CharField(max_length=128, choices=NAME_CHOICES, verbose_name=_("Name"))
name = models.CharField(max_length=128, choices=SessionTaskChoices.choices, verbose_name=_("Name"))
args = models.CharField(max_length=1024, verbose_name=_("Args"))
kwargs = models.JSONField(default=dict, verbose_name=_("Kwargs"))
terminal = models.ForeignKey(Terminal, null=True, on_delete=models.SET_NULL)

View File

@@ -48,6 +48,7 @@ class Session(OrgModelMixin):
upload_to = 'replay'
ACTIVE_CACHE_KEY_PREFIX = 'SESSION_ACTIVE_{}'
LOCK_CACHE_KEY_PREFIX = 'TOGGLE_LOCKED_SESSION_{}'
SUFFIX_MAP = {1: '.gz', 2: '.replay.gz', 3: '.cast.gz', 4: '.replay.mp4'}
DEFAULT_SUFFIXES = ['.replay.gz', '.cast.gz', '.gz', '.replay.mp4']
@@ -145,6 +146,26 @@ class Session(OrgModelMixin):
else:
return True
@property
def is_locked(self):
if self.is_finished:
return False
key = self.LOCK_CACHE_KEY_PREFIX.format(self.id)
return bool(cache.get(key))
@classmethod
def lock_session(cls, session_id):
key = cls.LOCK_CACHE_KEY_PREFIX.format(session_id)
# 会话锁定时间为 None表示永不过期
# You can set TIMEOUT to None so that, by default, cache keys never expire.
# https://docs.djangoproject.com/en/4.1/topics/cache/
cache.set(key, True, timeout=None)
@classmethod
def unlock_session(cls, session_id):
key = cls.LOCK_CACHE_KEY_PREFIX.format(session_id)
cache.delete(key)
@lazyproperty
def terminal_display(self):
display = self.terminal.name if self.terminal else ''

View File

@@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
#
from .applet import *
from .command import *
from .session import *
from .storage import *
from .sharing import *
from .terminal import *
from .endpoint import *
from .applet_host import *
from .command import *
from .endpoint import *
from .session import *
from .sharing import *
from .storage import *
from .task import *
from .terminal import *

View File

@@ -30,7 +30,7 @@ class SessionSerializer(BulkOrgResourceModelSerializer):
"user", "asset", "user_id", "asset_id", 'account', 'account_id',
"protocol", 'type', "login_from", "remote_addr",
"is_success", "is_finished", "has_replay", "has_command",
"date_start", "date_end", "comment", "terminal_display",
"date_start", "date_end", "comment", "terminal_display", "is_locked",
'command_amount',
]
fields_fk = ["terminal", ]

View File

@@ -0,0 +1,10 @@
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
from terminal.const import TaskNameType as SessionTaskChoices
class LockTaskSessionSerializer(serializers.Serializer):
session_id = serializers.CharField(max_length=40, label=_('Session id'))
task_name = serializers.ChoiceField(choices=SessionTaskChoices.choices, label=_('Task name'))

View File

@@ -1,5 +1,5 @@
from .applet import *
from .db_port import *
from .terminal import *
from .session_sharing import *
from .session import *
from .session_sharing import *
from .terminal import *

View File

@@ -1,4 +1,4 @@
from django.db.models.signals import pre_save
from django.db.models.signals import pre_save, post_save
from django.dispatch import receiver
from terminal.models import Session
@@ -8,3 +8,11 @@ from terminal.models import Session
def on_session_pre_save(sender, instance, **kwargs):
if instance.need_update_cmd_amount:
instance.cmd_amount = instance.compute_command_amount()
@receiver(post_save, sender=Session)
def on_session_finished(sender, instance: Session, created, **kwargs):
if not instance.is_finished:
return
# 清理一次可能因 task 未执行的缓存数据
Session.unlock_session(instance.id)

View File

@@ -5,7 +5,8 @@ from django.utils.functional import LazyObject
from common.decorators import on_transaction_commit
from common.utils import get_logger
from common.utils.connection import RedisPubSub
from ..models import Task
from ..const import TaskNameType
from ..models import Task, Session
from ..utils import DBPortManager
db_port_manager: DBPortManager
@@ -24,7 +25,17 @@ component_event_chan = ComponentEventChan()
@receiver(post_save, sender=Task)
@on_transaction_commit
def on_task_created(sender, instance: Task, created, **kwargs):
if not created:
if not created and instance.is_finished:
# 当组件完成 task 时,修改 session 的 lock 状态
session_id = instance.args
name = instance.name
if name == TaskNameType.lock_session:
Session.lock_session(session_id)
elif name == TaskNameType.unlock_session:
Session.unlock_session(session_id)
logger.debug(f"signal task post save: {instance.name}, "
f"session: {session_id}, "
f"is_finished: {instance.is_finished}")
return
event = {
"type": instance.name,

View File

@@ -2,10 +2,9 @@
# -*- coding: utf-8 -*-
#
from django.urls import path, re_path
from django.urls import path
from rest_framework_bulk.routes import BulkRouter
from common import api as capi
from .. import api
app_name = 'terminal'

View File

@@ -7,8 +7,8 @@ from rest_framework.renderers import JSONRenderer
from common.db.utils import safe_db_connection
from common.utils import get_logger
from common.utils.connection import Subscription
from terminal.models import Terminal
from terminal.models import Session
from terminal.const import TaskNameType
from terminal.models import Session, Terminal
from terminal.serializers import TaskSerializer, StatSerializer
from .signal_handlers import component_event_chan
@@ -45,7 +45,7 @@ class TerminalTaskWebsocket(JsonWebsocketConsumer):
with safe_db_connection():
serializer.save()
def send_kill_tasks_msg(self, task_id=None):
def send_tasks_msg(self, task_id=None):
content = self.get_terminal_tasks(task_id)
self.send(bytes_data=content)
@@ -60,7 +60,7 @@ class TerminalTaskWebsocket(JsonWebsocketConsumer):
def watch_component_event(self):
# 先发一次已有的任务
self.send_kill_tasks_msg()
self.send_tasks_msg()
ws = self
@@ -68,8 +68,8 @@ class TerminalTaskWebsocket(JsonWebsocketConsumer):
logger.debug('New component task msg recv: {}'.format(msg))
msg_type = msg.get('type')
payload = msg.get('payload')
if msg_type == "kill_session":
ws.send_kill_tasks_msg(payload.get('id'))
if msg_type in TaskNameType.names:
ws.send_tasks_msg(payload.get('id'))
return component_event_chan.subscribe(handle_task_msg_recv)