From ca6d71f4423b277c0e1dd9653c57233604ed2d21 Mon Sep 17 00:00:00 2001 From: ibuler Date: Mon, 20 Feb 2023 15:01:00 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E4=BC=98=E5=8C=96=20celery=20task=20co?= =?UTF-8?q?ntext?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/ops/signal_handlers.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/apps/ops/signal_handlers.py b/apps/ops/signal_handlers.py index 508a993e7..7d4e15789 100644 --- a/apps/ops/signal_handlers.py +++ b/apps/ops/signal_handlers.py @@ -7,18 +7,15 @@ from django.db.models.signals import pre_save from django.db.utils import ProgrammingError from django.dispatch import receiver from django.utils import translation, timezone -from django.utils.translation import gettext as _ from common.db.utils import close_old_connections, get_logger from common.signals import django_ready +from orgs.utils import get_current_org_id, set_current_org from .celery import app from .models import CeleryTaskExecution, CeleryTask, Job logger = get_logger(__name__) -TASK_LANG_CACHE_KEY = 'TASK_LANG_{}' -TASK_LANG_CACHE_TTL = 1800 - @receiver(pre_save, sender=Job) def on_account_pre_create(sender, instance, **kwargs): @@ -58,32 +55,36 @@ def check_registered_tasks(*args, **kwargs): @signals.before_task_publish.connect -def before_task_publish(headers=None, **kwargs): - task_id = headers.get('id') +def before_task_publish(body=None, **kwargs): current_lang = translation.get_language() - key = TASK_LANG_CACHE_KEY.format(task_id) - cache.set(key, current_lang, 1800) + current_org_id = get_current_org_id() + args, kwargs = body[:2] + kwargs['__current_lang'] = current_lang + kwargs['__current_org_id'] = current_org_id @signals.task_prerun.connect -def on_celery_task_pre_run(task_id='', **kwargs): +def on_celery_task_pre_run(task_id='', kwargs=None, **others): # 更新状态 CeleryTaskExecution.objects.filter(id=task_id) \ .update(state='RUNNING', date_start=timezone.now()) # 关闭之前的数据库连接 close_old_connections() - # 保存 Lang context - key = TASK_LANG_CACHE_KEY.format(task_id) - task_lang = cache.get(key) - if task_lang: - translation.activate(task_lang) + # 设置语言的一些上下文 + lang = kwargs.pop('__current_lang', None) + org_id = kwargs.pop('__current_org_id', None) + if lang: + print('>> Set language to {}'.format(lang)) + translation.activate(lang) + if org_id: + print('>> Set org to {}'.format(org_id)) + set_current_org(org_id) @signals.task_postrun.connect def on_celery_task_post_run(task_id='', state='', **kwargs): close_old_connections() - print(_("Task") + ": {} {}".format(task_id, state)) CeleryTaskExecution.objects.filter(id=task_id).update( state=state, date_finished=timezone.now(), is_finished=True