diff --git a/Dockerfile b/Dockerfile index e6c2ed82d..54b094a18 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,7 @@ RUN useradd jumpserver COPY ./requirements /tmp/requirements -RUN yum -y install epel-release && cd /tmp/requirements && \ +RUN yum -y install epel-release openldap-clients telnet && cd /tmp/requirements && \ yum -y install $(cat rpm_requirements.txt) RUN cd /tmp/requirements && pip install -r requirements.txt diff --git a/apps/assets/api/admin_user.py b/apps/assets/api/admin_user.py index 263d669fd..f2229022f 100644 --- a/apps/assets/api/admin_user.py +++ b/apps/assets/api/admin_user.py @@ -87,6 +87,7 @@ class AdminUserTestConnectiveApi(generics.RetrieveAPIView): """ queryset = AdminUser.objects.all() permission_classes = (IsOrgAdmin,) + serializer_class = serializers.TaskIDSerializer def retrieve(self, request, *args, **kwargs): admin_user = self.get_object() diff --git a/apps/assets/api/asset.py b/apps/assets/api/asset.py index cd343b2a5..abb0243db 100644 --- a/apps/assets/api/asset.py +++ b/apps/assets/api/asset.py @@ -113,6 +113,7 @@ class AssetAdminUserTestApi(generics.RetrieveAPIView): """ queryset = Asset.objects.all() permission_classes = (IsOrgAdmin,) + serializer_class = serializers.TaskIDSerializer def retrieve(self, request, *args, **kwargs): asset_id = kwargs.get('pk') @@ -124,6 +125,7 @@ class AssetAdminUserTestApi(generics.RetrieveAPIView): class AssetGatewayApi(generics.RetrieveAPIView): queryset = Asset.objects.all() permission_classes = (IsOrgAdminOrAppUser,) + serializer_class = serializers.GatewayWithAuthSerializer def retrieve(self, request, *args, **kwargs): asset_id = kwargs.get('pk') diff --git a/apps/assets/api/system_user.py b/apps/assets/api/system_user.py index e66e4bfc9..5c131f400 100644 --- a/apps/assets/api/system_user.py +++ b/apps/assets/api/system_user.py @@ -117,6 +117,7 @@ class SystemUserAssetsListView(generics.ListAPIView): class SystemUserPushToAssetApi(generics.RetrieveAPIView): queryset = SystemUser.objects.all() permission_classes = (IsOrgAdmin,) + serializer_class = serializers.TaskIDSerializer def retrieve(self, request, *args, **kwargs): system_user = self.get_object() @@ -129,6 +130,7 @@ class SystemUserPushToAssetApi(generics.RetrieveAPIView): class SystemUserTestAssetConnectivityApi(generics.RetrieveAPIView): queryset = SystemUser.objects.all() permission_classes = (IsOrgAdmin,) + serializer_class = serializers.TaskIDSerializer def retrieve(self, request, *args, **kwargs): system_user = self.get_object() diff --git a/apps/assets/serializers/admin_user.py b/apps/assets/serializers/admin_user.py index e1ecdf1c3..009caa1ce 100644 --- a/apps/assets/serializers/admin_user.py +++ b/apps/assets/serializers/admin_user.py @@ -58,7 +58,7 @@ class ReplaceNodeAdminUserSerializer(serializers.ModelSerializer): 管理用户更新关联到的集群 """ nodes = serializers.PrimaryKeyRelatedField( - many=True, queryset = Node.objects.all() + many=True, queryset=Node.objects.all() ) class Meta: @@ -66,4 +66,5 @@ class ReplaceNodeAdminUserSerializer(serializers.ModelSerializer): fields = ['id', 'nodes'] - +class TaskIDSerializer(serializers.Serializer): + task = serializers.CharField(read_only=True) diff --git a/apps/assets/tasks.py b/apps/assets/tasks.py index ce5be0b6f..f7b03cce8 100644 --- a/apps/assets/tasks.py +++ b/apps/assets/tasks.py @@ -1,16 +1,18 @@ # ~*~ coding: utf-8 ~*~ import json import re -import time import os from celery import shared_task from django.utils.translation import ugettext as _ from django.core.cache import cache -from common.utils 
import capacity_convert, \
-    sum_capacity, encrypt_password, get_logger
-from ops.celery.utils import register_as_period_task, after_app_shutdown_clean
+from common.utils import (
+    capacity_convert, sum_capacity, encrypt_password, get_logger
+)
+from ops.celery.decorator import (
+    register_as_period_task, after_app_shutdown_clean_periodic
+)
 
 from .models import SystemUser, AdminUser, Asset
 from . import const
@@ -132,7 +134,7 @@ def update_assets_hardware_info_util(assets, task_name=None):
 @shared_task
 def update_asset_hardware_info_manual(asset):
     task_name = _("Update asset hardware info: {}").format(asset.hostname)
-    return update_assets_hardware_info_util(
+    update_assets_hardware_info_util(
         [asset], task_name=task_name
     )
 
@@ -221,6 +223,7 @@ def test_admin_user_connectivity_period():
     for admin_user in admin_users:
         task_name = _("Test admin user connectivity period: {}").format(admin_user.name)
         test_admin_user_connectivity_util(admin_user, task_name)
+    cache.set(key, 1, 60*40)
 
 
 @shared_task
@@ -394,13 +397,13 @@ def push_system_user_to_assets(system_user, assets):
 
 
 @shared_task
-@after_app_shutdown_clean
+@after_app_shutdown_clean_periodic
 def test_system_user_connectability_period():
     pass
 
 
 @shared_task
-@after_app_shutdown_clean
+@after_app_shutdown_clean_periodic
 def test_admin_user_connectability_period():
     pass
 
@@ -408,7 +411,7 @@ def test_admin_user_connectability_period():
 # @shared_task
 # @register_as_period_task(interval=3600)
 # @after_app_ready_start
-# # @after_app_shutdown_clean
+# @after_app_shutdown_clean_periodic
 # def push_system_user_period():
 #     for system_user in SystemUser.objects.all():
 #         push_system_user_related_nodes(system_user)
diff --git a/apps/common/api.py b/apps/common/api.py
index 4aa8f82ce..ba7820505 100644
--- a/apps/common/api.py
+++ b/apps/common/api.py
@@ -4,15 +4,20 @@ import os
 import json
 
 import jms_storage
+import uuid
 
 from rest_framework.views import Response, APIView
+from rest_framework import generics
 from ldap3 import Server, Connection
-from django.core.mail import get_connection, send_mail
+from django.core.mail import send_mail
+from django.core.cache import cache
 from django.utils.translation import ugettext_lazy as _
 from django.conf import settings
 
 from .permissions import IsOrgAdmin, IsSuperUser
-from .serializers import MailTestSerializer, LDAPTestSerializer
+from .serializers import (
+    MailTestSerializer, LDAPTestSerializer, OutputSerializer
+)
 from .models import Setting
 
 
@@ -189,4 +194,39 @@ class DjangoSettingsAPI(APIView):
         return Response(data)
 
 
+class LogTailApi(generics.RetrieveAPIView):
+    permission_classes = ()
+    buff_size = 1024 * 10
+    serializer_class = OutputSerializer
+    end = False
+
+    def is_file_finish_write(self):
+        return True
+
+    def get_log_path(self):
+        raise NotImplementedError()
+
+    def get(self, request, *args, **kwargs):
+        mark = request.query_params.get("mark") or str(uuid.uuid4())
+        log_path = self.get_log_path()
+
+        if not log_path or not os.path.isfile(log_path):
+            if self.is_file_finish_write():
+                return Response({
+                    "data": 'Log not found',
+                    'end': True,
+                    'mark': mark}
+                )
+            else:
+                return Response({"data": _("Waiting ...\n")}, status=200)
+
+        with open(log_path, 'r') as f:
+            offset = cache.get(mark, 0)
+            f.seek(offset)
+            data = f.read(self.buff_size).replace('\n', '\r\n')
+            mark = str(uuid.uuid4())
+            cache.set(mark, f.tell(), 5)
+
+        if data == '' and self.is_file_finish_write():
+            self.end = True
+        return Response({"data": data, 'end': self.end, 'mark': mark})
diff --git a/apps/common/serializers.py
b/apps/common/serializers.py index 9d389776d..23ff9fb90 100644 --- a/apps/common/serializers.py +++ b/apps/common/serializers.py @@ -19,3 +19,8 @@ class LDAPTestSerializer(serializers.Serializer): AUTH_LDAP_USER_ATTR_MAP = serializers.CharField() AUTH_LDAP_START_TLS = serializers.BooleanField(required=False) + +class OutputSerializer(serializers.Serializer): + output = serializers.CharField() + is_end = serializers.BooleanField() + mark = serializers.CharField() diff --git a/apps/common/signals_handler.py b/apps/common/signals_handler.py index 96355a771..7879f94b2 100644 --- a/apps/common/signals_handler.py +++ b/apps/common/signals_handler.py @@ -18,23 +18,25 @@ logger = get_logger(__file__) @receiver(post_save, sender=Setting, dispatch_uid="my_unique_identifier") def refresh_settings_on_changed(sender, instance=None, **kwargs): - logger.debug("Receive setting item change") - logger.debug(" - refresh setting: {}".format(instance.name)) if instance: instance.refresh_setting() @receiver(django_ready, dispatch_uid="my_unique_identifier") -def refresh_all_settings_on_django_ready(sender, **kwargs): - logger.debug("Receive django ready signal") - logger.debug(" - fresh all settings") +def monkey_patch_settings(sender, **kwargs): cache_key_prefix = '_SETTING_' + uncached_settings = [ + 'CACHES', 'DEBUG', 'SECRET_KEY', 'INSTALLED_APPS', + 'ROOT_URLCONF', 'TEMPLATES', 'DATABASES', '_wrapped', + 'CELERY_LOG_DIR' + ] def monkey_patch_getattr(self, name): - key = cache_key_prefix + name - cached = cache.get(key) - if cached is not None: - return cached + if name not in uncached_settings: + key = cache_key_prefix + name + cached = cache.get(key) + if cached is not None: + return cached if self._wrapped is empty: self._setup(name) val = getattr(self._wrapped, name) @@ -66,7 +68,10 @@ def refresh_all_settings_on_django_ready(sender, **kwargs): @receiver(django_ready) def auto_generate_terminal_host_key(sender, **kwargs): - if Setting.objects.filter(name='TERMINAL_HOST_KEY').exists(): + try: + if Setting.objects.filter(name='TERMINAL_HOST_KEY').exists(): + return + except ProgrammingError: return private_key, public_key = ssh_key_gen() value = json.dumps(private_key) diff --git a/apps/common/utils.py b/apps/common/utils.py index 71913d196..2668c2ff2 100644 --- a/apps/common/utils.py +++ b/apps/common/utils.py @@ -406,24 +406,6 @@ def get_replay_storage_setting(): return value -class TeeObj: - origin_stdout = sys.stdout - - def __init__(self, file_obj): - self.file_obj = file_obj - - def write(self, msg): - self.origin_stdout.write(msg) - self.file_obj.write(msg.replace('*', '')) - - def flush(self): - self.origin_stdout.flush() - self.file_obj.flush() - - def close(self): - self.file_obj.close() - - def with_cache(func): cache = {} key = "_{}.{}".format(func.__module__, func.__name__) diff --git a/apps/jumpserver/settings.py b/apps/jumpserver/settings.py index 518b4617e..24e4ea7f5 100644 --- a/apps/jumpserver/settings.py +++ b/apps/jumpserver/settings.py @@ -412,6 +412,9 @@ RADIUS_SECRET = CONFIG.RADIUS_SECRET if AUTH_RADIUS: AUTHENTICATION_BACKENDS.insert(0, AUTH_RADIUS_BACKEND) +# Dump all celery log to here +CELERY_LOG_DIR = os.path.join(PROJECT_DIR, 'data', 'celery') + # Celery using redis as broker CELERY_BROKER_URL = 'redis://:%(password)s@%(host)s:%(port)s/%(db)s' % { 'password': CONFIG.REDIS_PASSWORD, @@ -425,14 +428,16 @@ CELERY_RESULT_BACKEND = CELERY_BROKER_URL CELERY_ACCEPT_CONTENT = ['json', 'pickle'] CELERY_RESULT_EXPIRES = 3600 # CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s 
%(levelname)s] %(message)s' -CELERY_WORKER_LOG_FORMAT = '%(message)s' -# CELERY_WORKER_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' -CELERY_WORKER_TASK_LOG_FORMAT = '%(message)s' +# CELERY_WORKER_LOG_FORMAT = '%(message)s' +CELERY_WORKER_TASK_LOG_FORMAT = '%(task_id)s %(task_name)s %(message)s' +# CELERY_WORKER_TASK_LOG_FORMAT = '%(message)s' # CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' +CELERY_WORKER_LOG_FORMAT = '%(message)s' CELERY_TASK_EAGER_PROPAGATES = True -CELERY_REDIRECT_STDOUTS = True -CELERY_REDIRECT_STDOUTS_LEVEL = "INFO" -CELERY_WORKER_HIJACK_ROOT_LOGGER = False +CELERY_WORKER_REDIRECT_STDOUTS = True +CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO" +# CELERY_WORKER_HIJACK_ROOT_LOGGER = False +CELERY_WORKER_MAX_TASKS_PER_CHILD = 40 # Cache use redis CACHES = { diff --git a/apps/ops/ansible/display.py b/apps/ops/ansible/display.py deleted file mode 100644 index 1494eb5ef..000000000 --- a/apps/ops/ansible/display.py +++ /dev/null @@ -1,19 +0,0 @@ -# -*- coding: utf-8 -*- -# - -import sys - - -class TeeObj: - origin_stdout = sys.stdout - - def __init__(self, file_obj): - self.file_obj = file_obj - - def write(self, msg): - self.origin_stdout.write(msg) - self.file_obj.write(msg.replace('*', '')) - - def flush(self): - self.origin_stdout.flush() - self.file_obj.flush() diff --git a/apps/ops/ansible/runner.py b/apps/ops/ansible/runner.py index 7931b72ee..4c8f87888 100644 --- a/apps/ops/ansible/runner.py +++ b/apps/ops/ansible/runner.py @@ -9,10 +9,10 @@ from ansible.parsing.dataloader import DataLoader from ansible.executor.playbook_executor import PlaybookExecutor from ansible.playbook.play import Play import ansible.constants as C -from ansible.utils.display import Display -from .callback import AdHocResultCallback, PlaybookResultCallBack, \ - CommandResultCallback +from .callback import ( + AdHocResultCallback, PlaybookResultCallBack, CommandResultCallback +) from common.utils import get_logger from .exceptions import AnsibleError @@ -22,13 +22,6 @@ C.HOST_KEY_CHECKING = False logger = get_logger(__name__) -class CustomDisplay(Display): - def display(self, msg, color=None, stderr=False, screen_only=False, log_only=False): - pass - -display = CustomDisplay() - - Options = namedtuple('Options', [ 'listtags', 'listtasks', 'listhosts', 'syntax', 'connection', 'module_path', 'forks', 'remote_user', 'private_key_file', 'timeout', diff --git a/apps/ops/api/celery.py b/apps/ops/api/celery.py index ae8f479d6..d2053f9cb 100644 --- a/apps/ops/api/celery.py +++ b/apps/ops/api/celery.py @@ -1,46 +1,42 @@ # -*- coding: utf-8 -*- # -import uuid + import os - from celery.result import AsyncResult -from django.core.cache import cache -from django.utils.translation import ugettext as _ from rest_framework import generics -from rest_framework.views import Response -from common.permissions import IsOrgAdmin, IsValidUser +from common.permissions import IsValidUser +from common.api import LogTailApi from ..models import CeleryTask from ..serializers import CeleryResultSerializer +from ..celery.utils import get_celery_task_log_path __all__ = ['CeleryTaskLogApi', 'CeleryResultApi'] -class CeleryTaskLogApi(generics.RetrieveAPIView): +class CeleryTaskLogApi(LogTailApi): permission_classes = (IsValidUser,) - buff_size = 1024 * 10 - end = False - queryset = CeleryTask.objects.all() + task = None + task_id = '' def get(self, request, *args, **kwargs): - mark = request.query_params.get("mark") or str(uuid.uuid4()) - task = self.get_object() - 
log_path = task.full_log_path
+        self.task_id = str(kwargs.get('pk'))
+        self.task = AsyncResult(self.task_id)
+        return super().get(request, *args, **kwargs)
 
-        if not log_path or not os.path.isfile(log_path):
-            return Response({"data": _("Waiting ...")}, status=203)
+    def get_log_path(self):
+        new_path = get_celery_task_log_path(self.task_id)
+        if new_path and os.path.isfile(new_path):
+            return new_path
+        try:
+            task = CeleryTask.objects.get(id=self.task_id)
+        except CeleryTask.DoesNotExist:
+            return None
+        return task.full_log_path
 
-        with open(log_path, 'r', encoding="utf8") as f:
-            offset = cache.get(mark, 0)
-            f.seek(offset)
-            data = f.read(self.buff_size).replace('\n', '\r\n')
-            mark = str(uuid.uuid4())
-            cache.set(mark, f.tell(), 5)
-
-        if data == '' and task.is_finished():
-            self.end = True
-        return Response({"data": data, 'end': self.end, 'mark': mark})
+    def is_file_finish_write(self):
+        return self.task.ready()
 
 
 class CeleryResultApi(generics.RetrieveAPIView):
diff --git a/apps/ops/apps.py b/apps/ops/apps.py
index 8a70780f0..01dfd05fa 100644
--- a/apps/ops/apps.py
+++ b/apps/ops/apps.py
@@ -10,6 +10,5 @@ class OpsConfig(AppConfig):
         from orgs.models import Organization
         from orgs.utils import set_current_org
         set_current_org(Organization.root())
-
-        super().ready()
         from .celery import signal_handler
+        super().ready()
diff --git a/apps/ops/celery/decorator.py b/apps/ops/celery/decorator.py
new file mode 100644
index 000000000..c2052f832
--- /dev/null
+++ b/apps/ops/celery/decorator.py
@@ -0,0 +1,109 @@
+# -*- coding: utf-8 -*-
+#
+from functools import wraps
+
+_need_registered_period_tasks = []
+_after_app_ready_start_tasks = []
+_after_app_shutdown_clean_periodic_tasks = []
+
+
+def add_register_period_task(task):
+    _need_registered_period_tasks.append(task)
+    # key = "__REGISTER_PERIODIC_TASKS"
+    # value = cache.get(key, [])
+    # value.append(name)
+    # cache.set(key, value)
+
+
+def get_register_period_tasks():
+    # key = "__REGISTER_PERIODIC_TASKS"
+    # return cache.get(key, [])
+    return _need_registered_period_tasks
+
+
+def add_after_app_shutdown_clean_task(name):
+    # key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS"
+    # value = cache.get(key, [])
+    # value.append(name)
+    # cache.set(key, value)
+    _after_app_shutdown_clean_periodic_tasks.append(name)
+
+
+def get_after_app_shutdown_clean_tasks():
+    # key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS"
+    # return cache.get(key, [])
+    return _after_app_shutdown_clean_periodic_tasks
+
+
+def add_after_app_ready_task(name):
+    # key = "__AFTER_APP_READY_RUN_TASKS"
+    # value = cache.get(key, [])
+    # value.append(name)
+    # cache.set(key, value)
+    _after_app_ready_start_tasks.append(name)
+
+
+def get_after_app_ready_tasks():
+    # key = "__AFTER_APP_READY_RUN_TASKS"
+    # return cache.get(key, [])
+    return _after_app_ready_start_tasks
+
+
+def register_as_period_task(crontab=None, interval=None):
+    """
+    Warning: the task must not take any args or kwargs
+    :param crontab: "* * * * *"
+    :param interval: 60*60*60
+    :return:
+    """
+    if crontab is None and interval is None:
+        raise SyntaxError("Either crontab or interval must be set")
+
+    def decorate(func):
+        if crontab is None and interval is None:
+            raise SyntaxError("Either crontab or interval must be set")
+
+        # When this decorator runs, the task has not been created yet,
+        # so we can't use func.name
+        name = '{func.__module__}.{func.__name__}'.format(func=func)
+        add_register_period_task({
+            name: {
+                'task': name,
+                'interval': interval,
+                'crontab': crontab,
+                'args': (),
+                'enabled': True,
+            }
+        })
+
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            return func(*args, **kwargs)
+        return wrapper
+    return decorate
+
+
+def after_app_ready_start(func):
+    # When this decorator runs, the task has not been created yet,
+    # so we can't use func.name
+    name = '{func.__module__}.{func.__name__}'.format(func=func)
+    if name not in _after_app_ready_start_tasks:
+        add_after_app_ready_task(name)
+
+    @wraps(func)
+    def decorate(*args, **kwargs):
+        return func(*args, **kwargs)
+    return decorate
+
+
+def after_app_shutdown_clean_periodic(func):
+    # When this decorator runs, the task has not been created yet,
+    # so we can't use func.name
+    name = '{func.__module__}.{func.__name__}'.format(func=func)
+    if name not in _after_app_shutdown_clean_periodic_tasks:
+        add_after_app_shutdown_clean_task(name)
+
+    @wraps(func)
+    def decorate(*args, **kwargs):
+        return func(*args, **kwargs)
+    return decorate
diff --git a/apps/ops/celery/logger.py b/apps/ops/celery/logger.py
new file mode 100644
index 000000000..bfe713d6d
--- /dev/null
+++ b/apps/ops/celery/logger.py
@@ -0,0 +1,160 @@
+from logging import StreamHandler
+
+from django.conf import settings
+from celery import current_task
+from celery.signals import task_prerun, task_postrun
+from kombu import Connection, Exchange, Queue, Producer
+from kombu.mixins import ConsumerMixin
+
+from .utils import get_celery_task_log_path
+
+routing_key = 'celery_log'
+celery_log_exchange = Exchange('celery_log_exchange', type='direct')
+celery_log_queue = [Queue('celery_log', celery_log_exchange, routing_key=routing_key)]
+
+
+class CeleryLoggerConsumer(ConsumerMixin):
+    def __init__(self):
+        self.connection = Connection(settings.CELERY_LOG_BROKER_URL)
+
+    def get_consumers(self, Consumer, channel):
+        return [Consumer(queues=celery_log_queue,
+                         accept=['pickle', 'json'],
+                         callbacks=[self.process_task])
+                ]
+
+    def handle_task_start(self, task_id, message):
+        pass
+
+    def handle_task_end(self, task_id, message):
+        pass
+
+    def handle_task_log(self, task_id, msg, message):
+        pass
+
+    def process_task(self, body, message):
+        action = body.get('action')
+        task_id = body.get('task_id')
+        msg = body.get('msg')
+        if action == CeleryLoggerProducer.ACTION_TASK_LOG:
+            self.handle_task_log(task_id, msg, message)
+        elif action == CeleryLoggerProducer.ACTION_TASK_START:
+            self.handle_task_start(task_id, message)
+        elif action == CeleryLoggerProducer.ACTION_TASK_END:
+            self.handle_task_end(task_id, message)
+
+
+class CeleryLoggerProducer:
+    ACTION_TASK_START, ACTION_TASK_LOG, ACTION_TASK_END = range(3)
+
+    def __init__(self):
+        self.connection = Connection(settings.CELERY_LOG_BROKER_URL)
+
+    @property
+    def producer(self):
+        return Producer(self.connection)
+
+    def publish(self, payload):
+        self.producer.publish(
+            payload, serializer='json', exchange=celery_log_exchange,
+            declare=[celery_log_exchange], routing_key=routing_key
+        )
+
+    def log(self, task_id, msg):
+        payload = {'task_id': task_id, 'msg': msg, 'action': self.ACTION_TASK_LOG}
+        return self.publish(payload)
+
+    def read(self):
+        pass
+
+    def flush(self):
+        pass
+
+    def task_end(self, task_id):
+        payload = {'task_id': task_id, 'action': self.ACTION_TASK_END}
+        return self.publish(payload)
+
+    def task_start(self, task_id):
+        payload = {'task_id': task_id, 'action': self.ACTION_TASK_START}
+        return self.publish(payload)
+
+
+class CeleryTaskLoggerHandler(StreamHandler):
+    terminator = '\r\n'
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        task_prerun.connect(self.on_task_start)
+        task_postrun.connect(self.on_task_end)
+
+    @staticmethod
+    def get_current_task_id():
+        if not current_task:
+            return
+        task_id = current_task.request.root_id
+        return task_id
+
+    def on_task_start(self, sender, task_id, **kwargs):
+        return self.handle_task_start(task_id)
+
+    def on_task_end(self, sender, task_id, **kwargs):
+        return self.handle_task_end(task_id)
+
+    def after_task_publish(self, sender, body, **kwargs):
+        pass
+
+    def emit(self, record):
+        task_id = self.get_current_task_id()
+        if not task_id:
+            return
+        try:
+            self.write_task_log(task_id, record)
+            self.flush()
+        except Exception:
+            self.handleError(record)
+
+    def write_task_log(self, task_id, msg):
+        pass
+
+    def handle_task_start(self, task_id):
+        pass
+
+    def handle_task_end(self, task_id):
+        pass
+
+
+class CeleryTaskMQLoggerHandler(CeleryTaskLoggerHandler):
+    def __init__(self):
+        self.producer = CeleryLoggerProducer()
+        super().__init__(stream=None)
+
+    def write_task_log(self, task_id, record):
+        msg = self.format(record)
+        self.producer.log(task_id, msg)
+
+    def flush(self):
+        self.producer.flush()
+
+
+class CeleryTaskFileHandler(CeleryTaskLoggerHandler):
+    def __init__(self):
+        self.f = None
+        super().__init__(stream=None)
+
+    def emit(self, record):
+        msg = self.format(record)
+        if not self.f:
+            return
+        self.f.write(msg)
+        self.f.write(self.terminator)
+        self.flush()
+
+    def flush(self):
+        self.f and self.f.flush()
+
+    def handle_task_start(self, task_id):
+        log_path = get_celery_task_log_path(task_id)
+        self.f = open(log_path, 'a')
+
+    def handle_task_end(self, task_id):
+        self.f and self.f.close()
diff --git a/apps/ops/celery/signal_handler.py b/apps/ops/celery/signal_handler.py
index ae7acc2ea..b2a6ddf37 100644
--- a/apps/ops/celery/signal_handler.py
+++ b/apps/ops/celery/signal_handler.py
@@ -1,103 +1,105 @@
 # -*- coding: utf-8 -*-
 #
-import os
-import datetime
-import sys
-import time
+import logging
 
-from django.conf import settings
-from django.utils import timezone
 from django.core.cache import cache
-from django.db import transaction
 from celery import subtask
-from celery.signals import worker_ready, worker_shutdown, task_prerun, \
-    task_postrun, after_task_publish
+from celery.signals import (
+    worker_ready, worker_shutdown, after_setup_logger
+)
 from django_celery_beat.models import PeriodicTask
 
-from common.utils import get_logger, TeeObj, get_object_or_none
-from common.const import celery_task_pre_key
-from .utils import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks
-from ..models import CeleryTask
+from common.utils import get_logger
+from .decorator import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks
+from .logger import CeleryTaskFileHandler
 
 logger = get_logger(__file__)
 
 
 @worker_ready.connect
-def on_app_ready(sender=None, headers=None, body=None, **kwargs):
+def on_app_ready(sender=None, headers=None, **kwargs):
     if cache.get("CELERY_APP_READY", 0) == 1:
         return
     cache.set("CELERY_APP_READY", 1, 10)
     tasks = get_after_app_ready_tasks()
-    logger.debug("Start need start task: [{}]".format(
-        ", ".join(tasks))
-    )
+    logger.debug("Worker ready signal recv")
+    logger.debug("Start need start task: [{}]".format(", ".join(tasks)))
    for task in tasks:
        subtask(task).delay()


@worker_shutdown.connect
-def after_app_shutdown(sender=None, headers=None, body=None, **kwargs):
+def after_app_shutdown_periodic_tasks(sender=None, **kwargs):
    if cache.get("CELERY_APP_SHUTDOWN", 0) == 1:
        return
    cache.set("CELERY_APP_SHUTDOWN", 1, 10)
    tasks = get_after_app_shutdown_clean_tasks()
-    
logger.debug("App shutdown signal recv") - logger.debug("Clean need cleaned period tasks: [{}]".format( - ', '.join(tasks)) - ) + logger.debug("Worker shutdown signal recv") + logger.debug("Clean period tasks: [{}]".format(', '.join(tasks))) PeriodicTask.objects.filter(name__in=tasks).delete() -@after_task_publish.connect -def after_task_publish_signal_handler(sender, headers=None, **kwargs): - CeleryTask.objects.create( - id=headers["id"], status=CeleryTask.WAITING, name=headers["task"] - ) - cache.set(headers["id"], True, 3600) - - -@task_prerun.connect -def pre_run_task_signal_handler(sender, task_id=None, task=None, **kwargs): - time.sleep(0.1) - for i in range(5): - if cache.get(task_id, False): - break - else: - time.sleep(0.1) - continue - - t = get_object_or_none(CeleryTask, id=task_id) - if t is None: - logger.warn("Not get the task: {}".format(task_id)) +@after_setup_logger.connect +def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs): + if not logger: return - now = datetime.datetime.now().strftime("%Y-%m-%d") - log_path = os.path.join(now, task_id + '.log') - full_path = os.path.join(CeleryTask.LOG_DIR, log_path) - - if not os.path.exists(os.path.dirname(full_path)): - os.makedirs(os.path.dirname(full_path)) - with transaction.atomic(): - t.date_start = timezone.now() - t.status = CeleryTask.RUNNING - t.log_path = log_path - t.save() - f = open(full_path, 'w', encoding="utf-8") - tee = TeeObj(f) - sys.stdout = tee - task.log_f = tee + handler = CeleryTaskFileHandler() + handler.setLevel(loglevel) + formatter = logging.Formatter(format) + handler.setFormatter(formatter) + logger.addHandler(handler) -@task_postrun.connect -def post_run_task_signal_handler(sender, task_id=None, task=None, **kwargs): - t = get_object_or_none(CeleryTask, id=task_id) - if t is None: - logger.warn("Not get the task: {}".format(task_id)) - return - with transaction.atomic(): - t.status = CeleryTask.FINISHED - t.date_finished = timezone.now() - t.save() - task.log_f.flush() - sys.stdout = task.log_f.origin_stdout - task.log_f.close() +# @after_task_publish.connect +# def after_task_publish_signal_handler(sender, headers=None, **kwargs): +# CeleryTask.objects.create( +# id=headers["id"], status=CeleryTask.WAITING, name=headers["task"] +# ) +# cache.set(headers["id"], True, 3600) +# +# +# @task_prerun.connect +# def pre_run_task_signal_handler(sender, task_id=None, task=None, **kwargs): +# time.sleep(0.1) +# for i in range(5): +# if cache.get(task_id, False): +# break +# else: +# time.sleep(0.1) +# continue +# +# t = get_object_or_none(CeleryTask, id=task_id) +# if t is None: +# logger.warn("Not get the task: {}".format(task_id)) +# return +# now = datetime.datetime.now().strftime("%Y-%m-%d") +# log_path = os.path.join(now, task_id + '.log') +# full_path = os.path.join(CeleryTask.LOG_DIR, log_path) +# +# if not os.path.exists(os.path.dirname(full_path)): +# os.makedirs(os.path.dirname(full_path)) +# with transaction.atomic(): +# t.date_start = timezone.now() +# t.status = CeleryTask.RUNNING +# t.log_path = log_path +# t.save() +# f = open(full_path, 'w', encoding="utf-8") +# tee = TeeObj(f) +# sys.stdout = tee +# task.log_f = tee +# +# +# @task_postrun.connect +# def post_run_task_signal_handler(sender, task_id=None, task=None, **kwargs): +# t = get_object_or_none(CeleryTask, id=task_id) +# if t is None: +# logger.warn("Not get the task: {}".format(task_id)) +# return +# with transaction.atomic(): +# t.status = CeleryTask.FINISHED +# t.date_finished = 
timezone.now() +# t.save() +# task.log_f.flush() +# sys.stdout = task.log_f.origin_stdout +# task.log_f.close() diff --git a/apps/ops/celery/utils.py b/apps/ops/celery/utils.py index b4f5a80db..1fc2fc103 100644 --- a/apps/ops/celery/utils.py +++ b/apps/ops/celery/utils.py @@ -1,49 +1,13 @@ # -*- coding: utf-8 -*- # import json -from functools import wraps +import os +from django.conf import settings from django.db.utils import ProgrammingError, OperationalError -from django.core.cache import cache from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule -def add_register_period_task(name): - key = "__REGISTER_PERIODIC_TASKS" - value = cache.get(key, []) - value.append(name) - cache.set(key, value) - - -def get_register_period_tasks(): - key = "__REGISTER_PERIODIC_TASKS" - return cache.get(key, []) - - -def add_after_app_shutdown_clean_task(name): - key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS" - value = cache.get(key, []) - value.append(name) - cache.set(key, value) - - -def get_after_app_shutdown_clean_tasks(): - key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS" - return cache.get(key, []) - - -def add_after_app_ready_task(name): - key = "__AFTER_APP_READY_RUN_TASKS" - value = cache.get(key, []) - value.append(name) - cache.set(key, value) - - -def get_after_app_ready_tasks(): - key = "__AFTER_APP_READY_RUN_TASKS" - return cache.get(key, []) - - def create_or_update_celery_periodic_tasks(tasks): """ :param tasks: { @@ -123,63 +87,10 @@ def delete_celery_periodic_task(task_name): PeriodicTask.objects.filter(name=task_name).delete() -def register_as_period_task(crontab=None, interval=None): - """ - Warning: Task must be have not any args and kwargs - :param crontab: "* * * * *" - :param interval: 60*60*60 - :return: - """ - if crontab is None and interval is None: - raise SyntaxError("Must set crontab or interval one") +def get_celery_task_log_path(task_id): + task_id = str(task_id) + rel_path = os.path.join(task_id[0], task_id[1], task_id + '.log') + path = os.path.join(settings.CELERY_LOG_DIR, rel_path) + os.makedirs(os.path.dirname(path), exist_ok=True) + return path - def decorate(func): - if crontab is None and interval is None: - raise SyntaxError("Interval and crontab must set one") - - # Because when this decorator run, the task was not created, - # So we can't use func.name - name = '{func.__module__}.{func.__name__}'.format(func=func) - if name not in get_register_period_tasks(): - create_or_update_celery_periodic_tasks({ - name: { - 'task': name, - 'interval': interval, - 'crontab': crontab, - 'args': (), - 'enabled': True, - } - }) - add_register_period_task(name) - - @wraps(func) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - return wrapper - return decorate - - -def after_app_ready_start(func): - # Because when this decorator run, the task was not created, - # So we can't use func.name - name = '{func.__module__}.{func.__name__}'.format(func=func) - if name not in get_after_app_ready_tasks(): - add_after_app_ready_task(name) - - @wraps(func) - def decorate(*args, **kwargs): - return func(*args, **kwargs) - return decorate - - -def after_app_shutdown_clean(func): - # Because when this decorator run, the task was not created, - # So we can't use func.name - name = '{func.__module__}.{func.__name__}'.format(func=func) - if name not in get_after_app_shutdown_clean_tasks(): - add_after_app_shutdown_clean_task(name) - - @wraps(func) - def decorate(*args, **kwargs): - return func(*args, **kwargs) - return decorate diff --git 
a/apps/ops/models/command.py b/apps/ops/models/command.py
index d2d132094..623dd39bb 100644
--- a/apps/ops/models/command.py
+++ b/apps/ops/models/command.py
@@ -8,6 +8,8 @@ from django.utils.translation import ugettext_lazy as _
 from django.utils.translation import ugettext
 from django.db import models
 
+
+from orgs.models import Organization
 from ..ansible.runner import CommandRunner
 from ..inventory import JMSInventory
 
@@ -53,6 +55,8 @@ class CommandExecution(models.Model):
 
     def run(self):
         print('-'*10 + ' ' + ugettext('Task start') + ' ' + '-'*10)
+        org = Organization.get_instance(self.run_as.org_id)
+        org.change_to()
         self.date_start = timezone.now()
         ok, msg = self.run_as.is_command_can_run(self.command)
         if ok:
diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py
index 06bf3d51d..799364563 100644
--- a/apps/ops/tasks.py
+++ b/apps/ops/tasks.py
@@ -5,7 +5,11 @@ from celery import shared_task, subtask
 from django.utils import timezone
 
 from common.utils import get_logger, get_object_or_none
-from .celery.utils import register_as_period_task, after_app_shutdown_clean
+from .celery.decorator import (
+    register_as_period_task, after_app_shutdown_clean_periodic,
+    after_app_ready_start
+)
+from .celery.utils import create_or_update_celery_periodic_tasks
 from .models import Task, CommandExecution, CeleryTask
 
 logger = get_logger(__file__)
@@ -39,8 +43,8 @@ def run_command_execution(cid, **kwargs):
 
 
 @shared_task
+@after_app_shutdown_clean_periodic
 @register_as_period_task(interval=3600*24)
-@after_app_shutdown_clean
 def clean_tasks_adhoc_period():
     logger.debug("Start clean task adhoc and run history")
     tasks = Task.objects.all()
@@ -52,8 +56,8 @@ def clean_tasks_adhoc_period():
 
 
 @shared_task
+@after_app_shutdown_clean_periodic
 @register_as_period_task(interval=3600*24)
-@after_app_shutdown_clean
 def clean_celery_tasks_period():
     logger.debug("Start clean celery task history")
     one_month_ago = timezone.now() - timezone.timedelta(days=30)
@@ -69,11 +73,19 @@ def clean_celery_tasks_period():
     tasks.delete()
 
 
+@shared_task
+@after_app_ready_start
+def create_or_update_registered_periodic_tasks():
+    from .celery.decorator import get_register_period_tasks
+    for task in get_register_period_tasks():
+        create_or_update_celery_periodic_tasks(task)
+
+
 @shared_task
 def hello(name, callback=None):
+    import time
+    time.sleep(10)
     print("Hello {}".format(name))
-    if callback is not None:
-        subtask(callback).delay("Guahongwei")
 
 
 @shared_task
diff --git a/apps/ops/templates/ops/celery_task_log.html b/apps/ops/templates/ops/celery_task_log.html
index 66b3177c7..a182789fa 100644
--- a/apps/ops/templates/ops/celery_task_log.html
+++ b/apps/ops/templates/ops/celery_task_log.html
@@ -1,6 +1,7 @@
 {% load static %}
+{% load i18n %}
-    <title>term.js</title>
+    <title>{% trans 'Task log' %}</title>
@@ -15,14 +16,14 @@
     }
-
-
+
+
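
How the pieces above fit together, as a minimal usage sketch. This is not part of the patch: it assumes the Django settings and Celery app from this changeset are loaded and a worker is running, and the task name `demo_cleanup` is hypothetical.

    from celery import shared_task

    from ops.celery.decorator import register_as_period_task


    @shared_task
    @register_as_period_task(interval=3600)  # or crontab="0 * * * *"
    def demo_cleanup():
        # With CELERY_WORKER_REDIRECT_STDOUTS enabled, print() goes through the
        # worker logger; CeleryTaskFileHandler (attached via after_setup_logger)
        # appends it to the per-task file opened on the task_prerun signal.
        print("cleaning expired records")

Registration is two-phase by design: the decorator only records the task in `_need_registered_period_tasks` at import time, and `create_or_update_registered_periodic_tasks` (triggered through `after_app_ready_start` once the worker is up) syncs those entries into django-celery-beat. A finished run's output can then be tailed through `CeleryTaskLogApi`, which resolves the file with `get_celery_task_log_path(task_id)` under the new `CELERY_LOG_DIR` (`data/celery/<c1>/<c2>/<task_id>.log`).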