jumpserver/apps/ops/celery/utils.py
老广 4944ac8e75 Config (#3502)
* [Update] 修改config

* [Update] 移动存储设置到到terminal中

* [Update] 修改permission 查看

* [Update] pre merge

* [Update] 录像存储

* [Update] 命令存储

* [Update] 添加存储测试可连接性

* [Update] 修改 meta 值的 key 为大写

* [Update] 修改 Terminal 相关 Storage 配置

* [Update] 删除之前获取录像/命令存储的代码

* [Update] 修改导入失败

* [Update] 迁移文件添加default存储

* [Update] 删除之前代码,添加help_text信息

* [Update] 删除之前代码

* [Update] 删除之前代码

* [Update] 抽象命令/录像存储 APIView

* [Update] 抽象命令/录像存储 APIView 1

* [Update] 抽象命令/录像存储 DictField

* [Update] 抽象命令/录像存储列表页面

* [Update] 修复CustomDictField的bug

* [Update] RemoteApp 页面添加 hidden

* [Update] 用户页面添加用户关联授权

* [Update] 修改存储测试可连接性 target

* [Update] 修改配置

* [Update] 修改存储前端 Form 渲染逻辑

* [Update] 修改存储细节

* [Update] 统一存储类型到 const 文件

* [Update] 修改迁移文件及Model,创建默认存储

* [Update] 修改迁移文件及Model初始化默认数据

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 限制删除默认存储配置,只允许创建扩展的存储类型

* [Update] 修改ip字段长度

* [Update] 修改ip字段长度

* [Update] 修改一些css

* [Update] 修改关联

* [Update] 添加操作日志定时清理

* [Update] 修改记录syslog的instance encoder

* [Update] 忽略登录产生的操作日志

* [Update] 限制更新存储时不覆盖原有AK SK 等字段

* [Update] 修改迁移文件添加comment字段

* [Update] 修改迁移文件

* [Update] 添加 comment 字段

* [Update] 修改默认存储no -> null

* [Update] 修改细节

* [Update] 更新翻译(存储配置

* [Update] 修改定时任务注册,修改系统用户资产、节点关系api

* [Update] 添加监控磁盘任务

* [Update] 修改session

* [Update] 拆分serializer

* [Update] 还原setting原来的manager
2019-12-05 15:09:25 +08:00

105 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
#
import json
import os
from django.conf import settings
from django.utils.timezone import get_current_timezone
from django.db.utils import ProgrammingError, OperationalError
from django_celery_beat.models import (
PeriodicTask, IntervalSchedule, CrontabSchedule, PeriodicTasks
)
from common.utils import get_logger
logger = get_logger(__name__)
def create_or_update_celery_periodic_tasks(tasks):
"""
:param tasks: {
'add-every-monday-morning': {
'task': 'tasks.add' # A registered celery task,
'interval': 30,
'crontab': "30 7 * * *",
'args': (16, 16),
'kwargs': {},
'enabled': False,
'description': ''
},
}
:return:
"""
# Todo: check task valid, task and callback must be a celery task
for name, detail in tasks.items():
interval = None
crontab = None
try:
IntervalSchedule.objects.all().count()
except (ProgrammingError, OperationalError):
return None
if isinstance(detail.get("interval"), int):
kwargs = dict(
every=detail['interval'],
period=IntervalSchedule.SECONDS,
)
# 不能使用 get_or_create因为可能会有多个
interval = IntervalSchedule.objects.filter(**kwargs).first()
if interval is None:
interval = IntervalSchedule.objects.create(**kwargs)
elif isinstance(detail.get("crontab"), str):
try:
minute, hour, day, month, week = detail["crontab"].split()
except ValueError:
logger.error("crontab is not valid")
return
kwargs = dict(
minute=minute, hour=hour, day_of_week=week,
day_of_month=day, month_of_year=month, timezone=get_current_timezone()
)
crontab = CrontabSchedule.objects.filter(**kwargs).first()
if crontab is None:
crontab = CrontabSchedule.objects.create(**kwargs)
else:
logger.error("Schedule is not valid")
return
defaults = dict(
interval=interval,
crontab=crontab,
name=name,
task=detail['task'],
args=json.dumps(detail.get('args', [])),
kwargs=json.dumps(detail.get('kwargs', {})),
description=detail.get('description') or ''
)
print(defaults)
task = PeriodicTask.objects.update_or_create(
defaults=defaults, name=name,
)
PeriodicTasks.update_changed()
return task
def disable_celery_periodic_task(task_name):
from django_celery_beat.models import PeriodicTask
PeriodicTask.objects.filter(name=task_name).update(enabled=False)
PeriodicTasks.update_changed()
def delete_celery_periodic_task(task_name):
from django_celery_beat.models import PeriodicTask
PeriodicTask.objects.filter(name=task_name).delete()
PeriodicTasks.update_changed()
def get_celery_task_log_path(task_id):
task_id = str(task_id)
rel_path = os.path.join(task_id[0], task_id[1], task_id + '.log')
path = os.path.join(settings.CELERY_LOG_DIR, rel_path)
os.makedirs(os.path.dirname(path), exist_ok=True)
return path