jumpserver/apps/ops/celery/signal_handler.py
老广 4944ac8e75 Config (#3502)
* [Update] 修改config

* [Update] 移动存储设置到到terminal中

* [Update] 修改permission 查看

* [Update] pre merge

* [Update] 录像存储

* [Update] 命令存储

* [Update] 添加存储测试可连接性

* [Update] 修改 meta 值的 key 为大写

* [Update] 修改 Terminal 相关 Storage 配置

* [Update] 删除之前获取录像/命令存储的代码

* [Update] 修改导入失败

* [Update] 迁移文件添加default存储

* [Update] 删除之前代码,添加help_text信息

* [Update] 删除之前代码

* [Update] 删除之前代码

* [Update] 抽象命令/录像存储 APIView

* [Update] 抽象命令/录像存储 APIView 1

* [Update] 抽象命令/录像存储 DictField

* [Update] 抽象命令/录像存储列表页面

* [Update] 修复CustomDictField的bug

* [Update] RemoteApp 页面添加 hidden

* [Update] 用户页面添加用户关联授权

* [Update] 修改存储测试可连接性 target

* [Update] 修改配置

* [Update] 修改存储前端 Form 渲染逻辑

* [Update] 修改存储细节

* [Update] 统一存储类型到 const 文件

* [Update] 修改迁移文件及Model,创建默认存储

* [Update] 修改迁移文件及Model初始化默认数据

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 修改迁移文件

* [Update] 限制删除默认存储配置,只允许创建扩展的存储类型

* [Update] 修改ip字段长度

* [Update] 修改ip字段长度

* [Update] 修改一些css

* [Update] 修改关联

* [Update] 添加操作日志定时清理

* [Update] 修改记录syslog的instance encoder

* [Update] 忽略登录产生的操作日志

* [Update] 限制更新存储时不覆盖原有AK SK 等字段

* [Update] 修改迁移文件添加comment字段

* [Update] 修改迁移文件

* [Update] 添加 comment 字段

* [Update] 修改默认存储no -> null

* [Update] 修改细节

* [Update] 更新翻译(存储配置

* [Update] 修改定时任务注册,修改系统用户资产、节点关系api

* [Update] 添加监控磁盘任务

* [Update] 修改session

* [Update] 拆分serializer

* [Update] 还原setting原来的manager
2019-12-05 15:09:25 +08:00

55 lines
1.7 KiB
Python

# -*- coding: utf-8 -*-
#
import logging
from django.dispatch import receiver
from django.core.cache import cache
from celery import subtask
from celery.signals import (
worker_ready, worker_shutdown, after_setup_logger
)
from kombu.utils.encoding import safe_str
from django_celery_beat.models import PeriodicTask
from common.utils import get_logger
from common.signals import django_ready
from .decorator import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks
from .logger import CeleryTaskFileHandler
logger = get_logger(__file__)
safe_str = lambda x: x
@worker_ready.connect
def on_app_ready(sender=None, headers=None, **kwargs):
if cache.get("CELERY_APP_READY", 0) == 1:
return
cache.set("CELERY_APP_READY", 1, 10)
tasks = get_after_app_ready_tasks()
logger.debug("Work ready signal recv")
logger.debug("Start need start task: [{}]".format(", ".join(tasks)))
for task in tasks:
subtask(task).delay()
@worker_shutdown.connect
def after_app_shutdown_periodic_tasks(sender=None, **kwargs):
if cache.get("CELERY_APP_SHUTDOWN", 0) == 1:
return
cache.set("CELERY_APP_SHUTDOWN", 1, 10)
tasks = get_after_app_shutdown_clean_tasks()
logger.debug("Worker shutdown signal recv")
logger.debug("Clean period tasks: [{}]".format(', '.join(tasks)))
PeriodicTask.objects.filter(name__in=tasks).delete()
@after_setup_logger.connect
def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
if not logger:
return
task_handler = CeleryTaskFileHandler()
task_handler.setLevel(loglevel)
formatter = logging.Formatter(format)
task_handler.setFormatter(formatter)
logger.addHandler(task_handler)