jumpserver/apps/terminal/tasks.py
Jiangjie.Bai 03afa4f974
Fix rbac (#7713)
* fix: token 系统用户增加 protocol

* fix: 修复清除orphan session时同时清除对应的 session_task

* perf: 修改 connection token api

* fix: 修复无法获取系统角色绑定的问题

* perf: 增加 db terminal 及 magnus 组件

* perf: 修改 migrations

* fix: 修复AUTHENTICATION_BACKENDS相关的逻辑

* fix: 修改判断backend认证逻辑

* fix: 修复资产账号查看密码跳过mfa

* fix: 修复用户组授权权限错误

* feat: 支持COS对象存储

* feat: 升级依赖 jms_storage==0.0.42

* fix: 修复 koko api 问题

* feat: 修改存储翻译信息

* perf: 修改 ticket 权限

* fix: 修复获取资产授权系统用户 get_queryset

* perf: 抽取 ticket

* perf: 修改 cmd filter 的权限

* fix: 修改 ticket perm

* fix: 修复oidc依赖问题

Co-authored-by: Eric <xplzv@126.com>
Co-authored-by: ibuler <ibuler@qq.com>
Co-authored-by: 小冯 <xiaofeng@xiaofengdeMacBook-Pro.local>
Co-authored-by: feng626 <1304903146@qq.com>
2022-02-28 19:28:58 +08:00

102 lines
3.4 KiB
Python

# -*- coding: utf-8 -*-
#
import os
import subprocess
import datetime
from celery import shared_task
from celery.utils.log import get_task_logger
from django.utils import timezone
from django.core.files.storage import default_storage
from common.utils import get_log_keep_day
from ops.celery.decorator import (
register_as_period_task, after_app_ready_start, after_app_shutdown_clean_periodic
)
from .models import Status, Session, Command, Task
from .backends import server_replay_storage
from .utils import find_session_replay_local
# NOTE(review): CACHE_REFRESH_INTERVAL and RUNNING are not referenced by any
# task in the visible part of this module; presumably they are read elsewhere
# or are leftovers -- confirm before removing.
CACHE_REFRESH_INTERVAL = 10
RUNNING = False
# Celery task logger named after this module; used by the tasks below.
logger = get_task_logger(__name__)
@shared_task
@register_as_period_task(interval=3600)
@after_app_ready_start
@after_app_shutdown_clean_periodic
def delete_terminal_status_period():
    """Delete terminal ``Status`` rows created more than seven days ago.

    Registered as a periodic task with ``interval=3600``.
    """
    retention = datetime.timedelta(days=7)
    cutoff = timezone.now() - retention
    stale_statuses = Status.objects.filter(date_created__lt=cutoff)
    stale_statuses.delete()
@shared_task
@register_as_period_task(interval=600)
@after_app_ready_start
@after_app_shutdown_clean_periodic
def clean_orphan_session():
    """Close unfinished sessions that are no longer active.

    For every ``Session`` with ``is_finished=False``:

    1. Unfinished ``Task`` rows whose ``args`` equals the session id are
       marked finished and stamped with the current time.
    2. If ``session.is_active()`` is false, the session itself is marked
       finished, ``date_end`` is set to the current time, and it is saved.

    Registered as a periodic task with ``interval=600``.
    """
    for session in Session.objects.filter(is_finished=False):
        # Step 1 runs before the activity check, so pending tasks are marked
        # finished for sessions that are still active as well.
        pending_tasks = Task.objects.filter(
            args=str(session.id), is_finished=False
        )
        pending_tasks.update(is_finished=True, date_finished=timezone.now())

        # Step 2: only sessions that are no longer active get closed.
        if not session.is_active():
            session.is_finished = True
            session.date_end = timezone.now()
            session.save()
@shared_task
@register_as_period_task(interval=3600*24)
@after_app_ready_start
@after_app_shutdown_clean_periodic
def clean_expired_session_period():
    """Remove expired session records, command records and replay files.

    The retention period (in days) comes from
    ``get_log_keep_day('TERMINAL_SESSION_KEEP_DURATION')``. The task then:

    * deletes ``Session`` rows whose ``date_start`` is older than the cutoff;
    * deletes ``Command`` rows whose ``timestamp`` is older than the cutoff;
    * runs ``find`` on the ``replay`` directory under the default storage's
      ``base_location`` to remove ``*.json`` / ``*.tar`` / ``*.gz`` files
      matching ``-mtime +<days>``, then removes empty directories.

    Registered as a periodic task with ``interval=3600*24``.
    """
    logger.info("Start clean expired session record, commands and replay")
    days = get_log_keep_day('TERMINAL_SESSION_KEEP_DURATION')
    # Use datetime.timedelta directly (as delete_terminal_status_period does)
    # instead of relying on django.utils.timezone re-exporting it.
    expire_date = timezone.now() - datetime.timedelta(days=days)
    expired_sessions = Session.objects.filter(date_start__lt=expire_date)
    expired_commands = Command.objects.filter(
        timestamp__lt=expire_date.timestamp()
    )
    replay_dir = os.path.join(default_storage.base_location, 'replay')

    expired_sessions.delete()
    logger.info("Clean session item done")
    expired_commands.delete()
    logger.info("Clean session command done")

    # Argument lists (no shell) so a replay path containing spaces or shell
    # metacharacters is passed to find verbatim.
    cleanup_commands = (
        [
            'find', replay_dir, '-mtime', '+%s' % days,
            '(', '-name', '*.json', '-o', '-name', '*.tar',
            '-o', '-name', '*.gz', ')',
            '-exec', 'rm', '-f', '{}', ';',
        ],
        ['find', replay_dir, '-type', 'd', '-empty', '-delete'],
    )
    for argv in cleanup_commands:
        try:
            subprocess.call(argv)
        except OSError as e:
            # Keep the cleanup best-effort: with shell=True a missing `find`
            # only produced a non-zero exit code, so do not raise here either.
            logger.error('Run replay cleanup command failed: %s', e)
    logger.info("Clean session replay done")
@shared_task
def upload_session_replay_to_external_storage(session_id):
    """Upload a session's local replay file to the external replay storage.

    Looks up the ``Session`` by ``session_id``, locates its replay in the
    default storage via ``find_session_replay_local``, uploads it through
    ``server_replay_storage`` and, on success, deletes the local copy.

    Every failure is logged and the task returns ``None`` without raising:
    missing session, missing local replay, upload error, or failure to
    delete the local file afterwards.

    :param session_id: primary key of the session whose replay is uploaded.
    """
    logger.info(f'Start upload session to external storage: {session_id}')
    session = Session.objects.filter(id=session_id).first()
    if not session:
        logger.error(f'Session db item not found: {session_id}')
        return

    # Only the local path is needed; the second element is unused here.
    local_path, _ = find_session_replay_local(session)
    if not local_path:
        # local_path is falsy on this branch, so report the session id instead.
        logger.error(f'Session replay not found, may be upload error: {session_id}')
        return

    abs_path = default_storage.path(local_path)
    remote_path = session.get_relative_path_by_local_path(abs_path)
    ok, err = server_replay_storage.upload(abs_path, remote_path)
    if not ok:
        logger.error(f'Session replay upload to external error: {err}')
        return

    # The upload succeeded; removing the local copy is best-effort, but a
    # failure is logged instead of being silently swallowed.
    try:
        default_storage.delete(local_path)
    except Exception as e:
        logger.warning(f'Session replay local file delete failed: {local_path}: {e}')