mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-09-18 16:39:28 +00:00
Bugfix (#2346)
* [Update] 修改command Post导致的output错误和定时任务创建问题 * [Update] 修改celery 日志 * [Update] 修改task日志方式 * [Update] 修改Docker file
This commit is contained in:
109
apps/ops/celery/decorator.py
Normal file
109
apps/ops/celery/decorator.py
Normal file
@@ -0,0 +1,109 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
from functools import wraps
|
||||
|
||||
_need_registered_period_tasks = []
|
||||
_after_app_ready_start_tasks = []
|
||||
_after_app_shutdown_clean_periodic_tasks = []
|
||||
|
||||
|
||||
def add_register_period_task(task):
|
||||
_need_registered_period_tasks.append(task)
|
||||
# key = "__REGISTER_PERIODIC_TASKS"
|
||||
# value = cache.get(key, [])
|
||||
# value.append(name)
|
||||
# cache.set(key, value)
|
||||
|
||||
|
||||
def get_register_period_tasks():
|
||||
# key = "__REGISTER_PERIODIC_TASKS"
|
||||
# return cache.get(key, [])
|
||||
return _need_registered_period_tasks
|
||||
|
||||
|
||||
def add_after_app_shutdown_clean_task(name):
|
||||
# key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS"
|
||||
# value = cache.get(key, [])
|
||||
# value.append(name)
|
||||
# cache.set(key, value)
|
||||
_after_app_shutdown_clean_periodic_tasks.append(name)
|
||||
|
||||
|
||||
def get_after_app_shutdown_clean_tasks():
|
||||
# key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS"
|
||||
# return cache.get(key, [])
|
||||
return _after_app_shutdown_clean_periodic_tasks
|
||||
|
||||
|
||||
def add_after_app_ready_task(name):
|
||||
# key = "__AFTER_APP_READY_RUN_TASKS"
|
||||
# value = cache.get(key, [])
|
||||
# value.append(name)
|
||||
# cache.set(key, value)
|
||||
_after_app_ready_start_tasks.append(name)
|
||||
|
||||
|
||||
def get_after_app_ready_tasks():
|
||||
# key = "__AFTER_APP_READY_RUN_TASKS"
|
||||
# return cache.get(key, [])
|
||||
return _after_app_ready_start_tasks
|
||||
|
||||
|
||||
def register_as_period_task(crontab=None, interval=None):
|
||||
"""
|
||||
Warning: Task must be have not any args and kwargs
|
||||
:param crontab: "* * * * *"
|
||||
:param interval: 60*60*60
|
||||
:return:
|
||||
"""
|
||||
if crontab is None and interval is None:
|
||||
raise SyntaxError("Must set crontab or interval one")
|
||||
|
||||
def decorate(func):
|
||||
if crontab is None and interval is None:
|
||||
raise SyntaxError("Interval and crontab must set one")
|
||||
|
||||
# Because when this decorator run, the task was not created,
|
||||
# So we can't use func.name
|
||||
name = '{func.__module__}.{func.__name__}'.format(func=func)
|
||||
add_register_period_task({
|
||||
name: {
|
||||
'task': name,
|
||||
'interval': interval,
|
||||
'crontab': crontab,
|
||||
'args': (),
|
||||
'enabled': True,
|
||||
}
|
||||
})
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
return wrapper
|
||||
return decorate
|
||||
|
||||
|
||||
def after_app_ready_start(func):
|
||||
# Because when this decorator run, the task was not created,
|
||||
# So we can't use func.name
|
||||
name = '{func.__module__}.{func.__name__}'.format(func=func)
|
||||
if name not in _after_app_ready_start_tasks:
|
||||
add_after_app_ready_task(name)
|
||||
|
||||
@wraps(func)
|
||||
def decorate(*args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
return decorate
|
||||
|
||||
|
||||
def after_app_shutdown_clean_periodic(func):
|
||||
# Because when this decorator run, the task was not created,
|
||||
# So we can't use func.name
|
||||
name = '{func.__module__}.{func.__name__}'.format(func=func)
|
||||
if name not in _after_app_shutdown_clean_periodic_tasks:
|
||||
add_after_app_shutdown_clean_task(name)
|
||||
|
||||
@wraps(func)
|
||||
def decorate(*args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
return decorate
|
160
apps/ops/celery/logger.py
Normal file
160
apps/ops/celery/logger.py
Normal file
@@ -0,0 +1,160 @@
|
||||
from logging import StreamHandler
|
||||
|
||||
from django.conf import settings
|
||||
from celery import current_task
|
||||
from celery.signals import task_prerun, task_postrun
|
||||
from kombu import Connection, Exchange, Queue, Producer
|
||||
from kombu.mixins import ConsumerMixin
|
||||
|
||||
from .utils import get_celery_task_log_path
|
||||
|
||||
routing_key = 'celery_log'
|
||||
celery_log_exchange = Exchange('celery_log_exchange', type='direct')
|
||||
celery_log_queue = [Queue('celery_log', celery_log_exchange, routing_key=routing_key)]
|
||||
|
||||
|
||||
class CeleryLoggerConsumer(ConsumerMixin):
|
||||
def __init__(self):
|
||||
self.connection = Connection(settings.CELERY_LOG_BROKER_URL)
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
return [Consumer(queues=celery_log_queue,
|
||||
accept=['pickle', 'json'],
|
||||
callbacks=[self.process_task])
|
||||
]
|
||||
|
||||
def handle_task_start(self, task_id, message):
|
||||
pass
|
||||
|
||||
def handle_task_end(self, task_id, message):
|
||||
pass
|
||||
|
||||
def handle_task_log(self, task_id, msg, message):
|
||||
pass
|
||||
|
||||
def process_task(self, body, message):
|
||||
action = body.get('action')
|
||||
task_id = body.get('task_id')
|
||||
msg = body.get('msg')
|
||||
if action == CeleryLoggerProducer.ACTION_TASK_LOG:
|
||||
self.handle_task_log(task_id, msg, message)
|
||||
elif action == CeleryLoggerProducer.ACTION_TASK_START:
|
||||
self.handle_task_start(task_id, message)
|
||||
elif action == CeleryLoggerProducer.ACTION_TASK_END:
|
||||
self.handle_task_end(task_id, message)
|
||||
|
||||
|
||||
class CeleryLoggerProducer:
|
||||
ACTION_TASK_START, ACTION_TASK_LOG, ACTION_TASK_END = range(3)
|
||||
|
||||
def __init__(self):
|
||||
self.connection = Connection(settings.CELERY_LOG_BROKER_URL)
|
||||
|
||||
@property
|
||||
def producer(self):
|
||||
return Producer(self.connection)
|
||||
|
||||
def publish(self, payload):
|
||||
self.producer.publish(
|
||||
payload, serializer='json', exchange=celery_log_exchange,
|
||||
declare=[celery_log_exchange], routing_key=routing_key
|
||||
)
|
||||
|
||||
def log(self, task_id, msg):
|
||||
payload = {'task_id': task_id, 'msg': msg, 'action': self.ACTION_TASK_LOG}
|
||||
return self.publish(payload)
|
||||
|
||||
def read(self):
|
||||
pass
|
||||
|
||||
def flush(self):
|
||||
pass
|
||||
|
||||
def task_end(self, task_id):
|
||||
payload = {'task_id': task_id, 'action': self.ACTION_TASK_END}
|
||||
return self.publish(payload)
|
||||
|
||||
def task_start(self, task_id):
|
||||
payload = {'task_id': task_id, 'action': self.ACTION_TASK_START}
|
||||
return self.publish(payload)
|
||||
|
||||
|
||||
class CeleryTaskLoggerHandler(StreamHandler):
|
||||
terminator = '\r\n'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
task_prerun.connect(self.on_task_start)
|
||||
task_postrun.connect(self.on_start_end)
|
||||
|
||||
@staticmethod
|
||||
def get_current_task_id():
|
||||
if not current_task:
|
||||
return
|
||||
task_id = current_task.request.root_id
|
||||
return task_id
|
||||
|
||||
def on_task_start(self, sender, task_id, **kwargs):
|
||||
return self.handle_task_start(task_id)
|
||||
|
||||
def on_start_end(self, sender, task_id, **kwargs):
|
||||
return self.handle_task_end(task_id)
|
||||
|
||||
def after_task_publish(self, sender, body, **kwargs):
|
||||
pass
|
||||
|
||||
def emit(self, record):
|
||||
task_id = self.get_current_task_id()
|
||||
if not task_id:
|
||||
return
|
||||
try:
|
||||
self.write_task_log(task_id, record)
|
||||
self.flush()
|
||||
except Exception:
|
||||
self.handleError(record)
|
||||
|
||||
def write_task_log(self, task_id, msg):
|
||||
pass
|
||||
|
||||
def handle_task_start(self, task_id):
|
||||
pass
|
||||
|
||||
def handle_task_end(self, task_id):
|
||||
pass
|
||||
|
||||
|
||||
class CeleryTaskMQLoggerHandler(CeleryTaskLoggerHandler):
|
||||
def __init__(self):
|
||||
self.producer = CeleryLoggerProducer()
|
||||
super().__init__(stream=None)
|
||||
|
||||
def write_task_log(self, task_id, record):
|
||||
msg = self.format(record)
|
||||
self.producer.log(task_id, msg)
|
||||
|
||||
def flush(self):
|
||||
self.producer.flush()
|
||||
|
||||
|
||||
class CeleryTaskFileHandler(CeleryTaskLoggerHandler):
|
||||
def __init__(self):
|
||||
self.f = None
|
||||
super().__init__(stream=None)
|
||||
|
||||
def emit(self, record):
|
||||
msg = self.format(record)
|
||||
if not self.f:
|
||||
return
|
||||
self.f.write(msg)
|
||||
self.f.write(self.terminator)
|
||||
self.flush()
|
||||
|
||||
def flush(self):
|
||||
self.f and self.f.flush()
|
||||
|
||||
def handle_task_start(self, task_id):
|
||||
log_path = get_celery_task_log_path(task_id)
|
||||
self.f = open(log_path, 'a')
|
||||
|
||||
def handle_task_end(self, task_id):
|
||||
self.f and self.f.close()
|
@@ -1,103 +1,105 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
import os
|
||||
import datetime
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
from django.utils import timezone
|
||||
from django.core.cache import cache
|
||||
from django.db import transaction
|
||||
from celery import subtask
|
||||
from celery.signals import worker_ready, worker_shutdown, task_prerun, \
|
||||
task_postrun, after_task_publish
|
||||
from celery.signals import (
|
||||
worker_ready, worker_shutdown, after_setup_logger
|
||||
)
|
||||
from django_celery_beat.models import PeriodicTask
|
||||
|
||||
from common.utils import get_logger, TeeObj, get_object_or_none
|
||||
from common.const import celery_task_pre_key
|
||||
from .utils import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks
|
||||
from ..models import CeleryTask
|
||||
from common.utils import get_logger
|
||||
from .decorator import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks
|
||||
from .logger import CeleryTaskFileHandler
|
||||
|
||||
logger = get_logger(__file__)
|
||||
|
||||
|
||||
@worker_ready.connect
|
||||
def on_app_ready(sender=None, headers=None, body=None, **kwargs):
|
||||
def on_app_ready(sender=None, headers=None, **kwargs):
|
||||
if cache.get("CELERY_APP_READY", 0) == 1:
|
||||
return
|
||||
cache.set("CELERY_APP_READY", 1, 10)
|
||||
tasks = get_after_app_ready_tasks()
|
||||
logger.debug("Start need start task: [{}]".format(
|
||||
", ".join(tasks))
|
||||
)
|
||||
logger.debug("Work ready signal recv")
|
||||
logger.debug("Start need start task: [{}]".format(", ".join(tasks)))
|
||||
for task in tasks:
|
||||
subtask(task).delay()
|
||||
|
||||
|
||||
@worker_shutdown.connect
|
||||
def after_app_shutdown(sender=None, headers=None, body=None, **kwargs):
|
||||
def after_app_shutdown_periodic_tasks(sender=None, **kwargs):
|
||||
if cache.get("CELERY_APP_SHUTDOWN", 0) == 1:
|
||||
return
|
||||
cache.set("CELERY_APP_SHUTDOWN", 1, 10)
|
||||
tasks = get_after_app_shutdown_clean_tasks()
|
||||
logger.debug("App shutdown signal recv")
|
||||
logger.debug("Clean need cleaned period tasks: [{}]".format(
|
||||
', '.join(tasks))
|
||||
)
|
||||
logger.debug("Worker shutdown signal recv")
|
||||
logger.debug("Clean period tasks: [{}]".format(', '.join(tasks)))
|
||||
PeriodicTask.objects.filter(name__in=tasks).delete()
|
||||
|
||||
|
||||
@after_task_publish.connect
|
||||
def after_task_publish_signal_handler(sender, headers=None, **kwargs):
|
||||
CeleryTask.objects.create(
|
||||
id=headers["id"], status=CeleryTask.WAITING, name=headers["task"]
|
||||
)
|
||||
cache.set(headers["id"], True, 3600)
|
||||
|
||||
|
||||
@task_prerun.connect
|
||||
def pre_run_task_signal_handler(sender, task_id=None, task=None, **kwargs):
|
||||
time.sleep(0.1)
|
||||
for i in range(5):
|
||||
if cache.get(task_id, False):
|
||||
break
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
t = get_object_or_none(CeleryTask, id=task_id)
|
||||
if t is None:
|
||||
logger.warn("Not get the task: {}".format(task_id))
|
||||
@after_setup_logger.connect
|
||||
def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
|
||||
if not logger:
|
||||
return
|
||||
now = datetime.datetime.now().strftime("%Y-%m-%d")
|
||||
log_path = os.path.join(now, task_id + '.log')
|
||||
full_path = os.path.join(CeleryTask.LOG_DIR, log_path)
|
||||
|
||||
if not os.path.exists(os.path.dirname(full_path)):
|
||||
os.makedirs(os.path.dirname(full_path))
|
||||
with transaction.atomic():
|
||||
t.date_start = timezone.now()
|
||||
t.status = CeleryTask.RUNNING
|
||||
t.log_path = log_path
|
||||
t.save()
|
||||
f = open(full_path, 'w', encoding="utf-8")
|
||||
tee = TeeObj(f)
|
||||
sys.stdout = tee
|
||||
task.log_f = tee
|
||||
handler = CeleryTaskFileHandler()
|
||||
handler.setLevel(loglevel)
|
||||
formatter = logging.Formatter(format)
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
|
||||
|
||||
@task_postrun.connect
|
||||
def post_run_task_signal_handler(sender, task_id=None, task=None, **kwargs):
|
||||
t = get_object_or_none(CeleryTask, id=task_id)
|
||||
if t is None:
|
||||
logger.warn("Not get the task: {}".format(task_id))
|
||||
return
|
||||
with transaction.atomic():
|
||||
t.status = CeleryTask.FINISHED
|
||||
t.date_finished = timezone.now()
|
||||
t.save()
|
||||
task.log_f.flush()
|
||||
sys.stdout = task.log_f.origin_stdout
|
||||
task.log_f.close()
|
||||
# @after_task_publish.connect
|
||||
# def after_task_publish_signal_handler(sender, headers=None, **kwargs):
|
||||
# CeleryTask.objects.create(
|
||||
# id=headers["id"], status=CeleryTask.WAITING, name=headers["task"]
|
||||
# )
|
||||
# cache.set(headers["id"], True, 3600)
|
||||
#
|
||||
#
|
||||
# @task_prerun.connect
|
||||
# def pre_run_task_signal_handler(sender, task_id=None, task=None, **kwargs):
|
||||
# time.sleep(0.1)
|
||||
# for i in range(5):
|
||||
# if cache.get(task_id, False):
|
||||
# break
|
||||
# else:
|
||||
# time.sleep(0.1)
|
||||
# continue
|
||||
#
|
||||
# t = get_object_or_none(CeleryTask, id=task_id)
|
||||
# if t is None:
|
||||
# logger.warn("Not get the task: {}".format(task_id))
|
||||
# return
|
||||
# now = datetime.datetime.now().strftime("%Y-%m-%d")
|
||||
# log_path = os.path.join(now, task_id + '.log')
|
||||
# full_path = os.path.join(CeleryTask.LOG_DIR, log_path)
|
||||
#
|
||||
# if not os.path.exists(os.path.dirname(full_path)):
|
||||
# os.makedirs(os.path.dirname(full_path))
|
||||
# with transaction.atomic():
|
||||
# t.date_start = timezone.now()
|
||||
# t.status = CeleryTask.RUNNING
|
||||
# t.log_path = log_path
|
||||
# t.save()
|
||||
# f = open(full_path, 'w', encoding="utf-8")
|
||||
# tee = TeeObj(f)
|
||||
# sys.stdout = tee
|
||||
# task.log_f = tee
|
||||
#
|
||||
#
|
||||
# @task_postrun.connect
|
||||
# def post_run_task_signal_handler(sender, task_id=None, task=None, **kwargs):
|
||||
# t = get_object_or_none(CeleryTask, id=task_id)
|
||||
# if t is None:
|
||||
# logger.warn("Not get the task: {}".format(task_id))
|
||||
# return
|
||||
# with transaction.atomic():
|
||||
# t.status = CeleryTask.FINISHED
|
||||
# t.date_finished = timezone.now()
|
||||
# t.save()
|
||||
# task.log_f.flush()
|
||||
# sys.stdout = task.log_f.origin_stdout
|
||||
# task.log_f.close()
|
||||
|
||||
|
@@ -1,49 +1,13 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
import json
|
||||
from functools import wraps
|
||||
import os
|
||||
|
||||
from django.conf import settings
|
||||
from django.db.utils import ProgrammingError, OperationalError
|
||||
from django.core.cache import cache
|
||||
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
|
||||
|
||||
|
||||
def add_register_period_task(name):
|
||||
key = "__REGISTER_PERIODIC_TASKS"
|
||||
value = cache.get(key, [])
|
||||
value.append(name)
|
||||
cache.set(key, value)
|
||||
|
||||
|
||||
def get_register_period_tasks():
|
||||
key = "__REGISTER_PERIODIC_TASKS"
|
||||
return cache.get(key, [])
|
||||
|
||||
|
||||
def add_after_app_shutdown_clean_task(name):
|
||||
key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS"
|
||||
value = cache.get(key, [])
|
||||
value.append(name)
|
||||
cache.set(key, value)
|
||||
|
||||
|
||||
def get_after_app_shutdown_clean_tasks():
|
||||
key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS"
|
||||
return cache.get(key, [])
|
||||
|
||||
|
||||
def add_after_app_ready_task(name):
|
||||
key = "__AFTER_APP_READY_RUN_TASKS"
|
||||
value = cache.get(key, [])
|
||||
value.append(name)
|
||||
cache.set(key, value)
|
||||
|
||||
|
||||
def get_after_app_ready_tasks():
|
||||
key = "__AFTER_APP_READY_RUN_TASKS"
|
||||
return cache.get(key, [])
|
||||
|
||||
|
||||
def create_or_update_celery_periodic_tasks(tasks):
|
||||
"""
|
||||
:param tasks: {
|
||||
@@ -123,63 +87,10 @@ def delete_celery_periodic_task(task_name):
|
||||
PeriodicTask.objects.filter(name=task_name).delete()
|
||||
|
||||
|
||||
def register_as_period_task(crontab=None, interval=None):
|
||||
"""
|
||||
Warning: Task must be have not any args and kwargs
|
||||
:param crontab: "* * * * *"
|
||||
:param interval: 60*60*60
|
||||
:return:
|
||||
"""
|
||||
if crontab is None and interval is None:
|
||||
raise SyntaxError("Must set crontab or interval one")
|
||||
def get_celery_task_log_path(task_id):
|
||||
task_id = str(task_id)
|
||||
rel_path = os.path.join(task_id[0], task_id[1], task_id + '.log')
|
||||
path = os.path.join(settings.CELERY_LOG_DIR, rel_path)
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
return path
|
||||
|
||||
def decorate(func):
|
||||
if crontab is None and interval is None:
|
||||
raise SyntaxError("Interval and crontab must set one")
|
||||
|
||||
# Because when this decorator run, the task was not created,
|
||||
# So we can't use func.name
|
||||
name = '{func.__module__}.{func.__name__}'.format(func=func)
|
||||
if name not in get_register_period_tasks():
|
||||
create_or_update_celery_periodic_tasks({
|
||||
name: {
|
||||
'task': name,
|
||||
'interval': interval,
|
||||
'crontab': crontab,
|
||||
'args': (),
|
||||
'enabled': True,
|
||||
}
|
||||
})
|
||||
add_register_period_task(name)
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
return wrapper
|
||||
return decorate
|
||||
|
||||
|
||||
def after_app_ready_start(func):
|
||||
# Because when this decorator run, the task was not created,
|
||||
# So we can't use func.name
|
||||
name = '{func.__module__}.{func.__name__}'.format(func=func)
|
||||
if name not in get_after_app_ready_tasks():
|
||||
add_after_app_ready_task(name)
|
||||
|
||||
@wraps(func)
|
||||
def decorate(*args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
return decorate
|
||||
|
||||
|
||||
def after_app_shutdown_clean(func):
|
||||
# Because when this decorator run, the task was not created,
|
||||
# So we can't use func.name
|
||||
name = '{func.__module__}.{func.__name__}'.format(func=func)
|
||||
if name not in get_after_app_shutdown_clean_tasks():
|
||||
add_after_app_shutdown_clean_task(name)
|
||||
|
||||
@wraps(func)
|
||||
def decorate(*args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
return decorate
|
||||
|
Reference in New Issue
Block a user