[Update] Change celery location

Author: ibuler
Date: 2018-04-02 15:54:49 +08:00
parent a4c843ff13
commit d247e49b70
26 changed files with 250 additions and 391 deletions


@@ -15,4 +15,3 @@ app = Celery('jumpserver')
 # pickle the object when using Windows.
 app.config_from_object('django.conf:settings', namespace='CELERY')
 app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])
-
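The hunk above touches the project's Celery bootstrap module. For reference, a minimal, self-contained sketch of that standard pattern (the settings-module path and the os.environ line are assumptions, not taken from this commit):

    # Illustrative sketch of a Django/Celery app module, not the committed file.
    import os

    from celery import Celery
    from django.conf import settings

    # Assumed settings path; Celery must see Django settings before the app is created.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings')

    app = Celery('jumpserver')

    # Passing a string here means the worker does not have to pickle the settings
    # object when using Windows (per the comment fragment above); all CELERY_*
    # keys are read from Django settings.
    app.config_from_object('django.conf:settings', namespace='CELERY')

    # Register tasks.py from every installed Django app.
    app.autodiscover_tasks(
        lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS]
    )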

@@ -5,27 +5,21 @@ import datetime
 import sys
 
 from django.conf import settings
 from django.utils import timezone
-from django.core.cache import cache
+from django.db import transaction
 from celery import subtask
 from celery.signals import worker_ready, worker_shutdown, task_prerun, \
     task_postrun, after_task_publish
 from django_celery_beat.models import PeriodicTask
 
-from common.utils import get_logger, TeeObj
+from common.utils import get_logger, TeeObj, get_object_or_none
-from common.const import celery_task_pre_key
 from .utils import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks
+from ..models import CeleryTask
 
 logger = get_logger(__file__)
 
-WAITING = "waiting"
-RUNNING = "running"
-FINISHED = "finished"
-EXPIRE_TIME = 3600
 
 
 @worker_ready.connect
 def on_app_ready(sender=None, headers=None, body=None, **kwargs):
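The import changes replace the cache-backed state constants with a CeleryTask model imported from the parent app's models module, which is not part of the lines shown here. A hypothetical sketch of that model, based only on the fields and constants the handlers below touch (WAITING/RUNNING/FINISHED, LOG_DIR, name, status, log_path, date_start, date_finished):

    # Hypothetical sketch of the CeleryTask model used by the signal handlers;
    # field names come from this diff, types and lengths are assumptions.
    import os
    import uuid

    from django.conf import settings
    from django.db import models


    class CeleryTask(models.Model):
        WAITING = "waiting"
        RUNNING = "running"
        FINISHED = "finished"
        STATUS_CHOICES = (
            (WAITING, "Waiting"),
            (RUNNING, "Running"),
            (FINISHED, "Finished"),
        )
        # The removed handler wrote logs under PROJECT_DIR/data/celery, so the
        # model presumably anchors its log directory there as well (assumption).
        LOG_DIR = os.path.join(settings.PROJECT_DIR, "data", "celery")

        id = models.UUIDField(primary_key=True, default=uuid.uuid4)
        name = models.CharField(max_length=1024)
        status = models.CharField(max_length=16, choices=STATUS_CHOICES)
        log_path = models.CharField(max_length=256, blank=True, null=True)
        date_start = models.DateTimeField(null=True, blank=True)
        date_finished = models.DateTimeField(null=True, blank=True)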
@@ -54,18 +48,31 @@ def after_app_shutdown(sender=None, headers=None, body=None, **kwargs):
     PeriodicTask.objects.filter(name__in=tasks).delete()
 
 
+@after_task_publish.connect
+def after_task_publish_signal_handler(sender, headers=None, **kwargs):
+    CeleryTask.objects.create(
+        id=headers["id"], status=CeleryTask.WAITING, name=headers["task"]
+    )
+
+
 @task_prerun.connect
 def pre_run_task_signal_handler(sender, task_id=None, task=None, **kwargs):
-    task_key = celery_task_pre_key + task_id
-    info = cache.get(task_key, {})
+    t = get_object_or_none(CeleryTask, id=task_id)
+    if t is None:
+        logger.warn("Not get the task: {}".format(task_id))
+        return
+
     now = datetime.datetime.now().strftime("%Y-%m-%d")
-    log_dir = os.path.join(settings.PROJECT_DIR, "data", "celery", now)
-    if not os.path.exists(log_dir):
-        os.makedirs(log_dir)
-    log_path = os.path.join(log_dir, task_id + '.log')
-    info.update({"status": RUNNING, "log_path": log_path})
-    cache.set(task_key, info, EXPIRE_TIME)
-    f = open(log_path, 'w')
+    log_path = os.path.join(now, task_id + '.log')
+    full_path = os.path.join(CeleryTask.LOG_DIR, log_path)
+    if not os.path.exists(os.path.dirname(full_path)):
+        os.makedirs(os.path.dirname(full_path))
+    with transaction.atomic():
+        t.date_start = timezone.now()
+        t.status = CeleryTask.RUNNING
+        t.log_path = log_path
+        t.save()
+    f = open(full_path, 'w')
     tee = TeeObj(f)
     sys.stdout = tee
     task.log_f = tee
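The prerun handler redirects sys.stdout through a TeeObj so that task output reaches both the worker console and the per-task log file; the postrun handler below flushes and restores it. TeeObj comes from common.utils and is not shown in this diff; a hypothetical sketch inferred only from the calls made here (TeeObj(f), .flush(), .close(), .origin_stdout):

    # Hypothetical sketch of common.utils.TeeObj; the real helper may differ.
    import sys


    class TeeObj:
        """Duplicate writes to the original stdout and to a log file."""

        def __init__(self, file_obj):
            self.file_obj = file_obj
            self.origin_stdout = sys.stdout  # restored by the postrun handler

        def write(self, msg):
            self.origin_stdout.write(msg)
            self.file_obj.write(msg)

        def flush(self):
            self.origin_stdout.flush()
            self.file_obj.flush()

        def close(self):
            self.file_obj.close()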
@@ -73,17 +80,15 @@ def pre_run_task_signal_handler(sender, task_id=None, task=None, **kwargs):
 @task_postrun.connect
 def post_run_task_signal_handler(sender, task_id=None, task=None, **kwargs):
-    task_key = celery_task_pre_key + task_id
-    info = cache.get(task_key, {})
-    info.update({"status": FINISHED})
-    cache.set(task_key, info, EXPIRE_TIME)
+    t = get_object_or_none(CeleryTask, id=task_id)
+    if t is None:
+        logger.warn("Not get the task: {}".format(task_id))
+        return
+
+    with transaction.atomic():
+        t.status = CeleryTask.FINISHED
+        t.date_finished = timezone.now()
+        t.save()
 
     task.log_f.flush()
     sys.stdout = task.log_f.origin_stdout
     task.log_f.close()
-
-
-@after_task_publish.connect
-def after_task_publish_signal_handler(sender, headers=None, **kwargs):
-    task_id = headers["id"]
-    key = celery_task_pre_key + task_id
-    cache.set(key, {"status": WAITING}, EXPIRE_TIME)
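Both new handlers look the task up with get_object_or_none from common.utils, which is also outside this diff; it presumably wraps Model.objects.get in the usual try/except, roughly:

    # Hypothetical sketch of common.utils.get_object_or_none as used above;
    # the real helper may accept extra arguments or log differently.
    def get_object_or_none(model, **kwargs):
        """Return the matching instance, or None instead of raising DoesNotExist."""
        try:
            return model.objects.get(**kwargs)
        except model.DoesNotExist:
            return None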