diff --git a/apps/common/const/choices.py b/apps/common/const/choices.py index 8de0c5fc8..6bff02254 100644 --- a/apps/common/const/choices.py +++ b/apps/common/const/choices.py @@ -1,7 +1,3 @@ -from django.utils.translation import ugettext_lazy as _ - -from common.db.models import ChoiceSet - ADMIN = 'Admin' USER = 'User' diff --git a/apps/jumpserver/settings/libs.py b/apps/jumpserver/settings/libs.py index 8a8df7ca4..aac0c4d50 100644 --- a/apps/jumpserver/settings/libs.py +++ b/apps/jumpserver/settings/libs.py @@ -125,3 +125,5 @@ CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO" # CELERY_WORKER_HIJACK_ROOT_LOGGER = True # CELERY_WORKER_MAX_TASKS_PER_CHILD = 40 CELERY_TASK_SOFT_TIME_LIMIT = 3600 + +ANSIBLE_LOG_DIR = os.path.join(PROJECT_DIR, 'data', 'ansible') diff --git a/apps/ops/ansible/callback.py b/apps/ops/ansible/callback.py index adba3e81b..8edbbbcb7 100644 --- a/apps/ops/ansible/callback.py +++ b/apps/ops/ansible/callback.py @@ -60,6 +60,10 @@ class CallbackMixin: self.results_raw[t][host][task_name] = task_result self.clean_result(t, host, task_name, task_result) + def close(self): + if hasattr(self._display, 'close'): + self._display.close() + class AdHocResultCallback(CallbackMixin, CallbackModule, CMDCallBackModule): """ diff --git a/apps/ops/ansible/display.py b/apps/ops/ansible/display.py new file mode 100644 index 000000000..b272f6a6d --- /dev/null +++ b/apps/ops/ansible/display.py @@ -0,0 +1,66 @@ +import errno +import sys +import os + +from ansible.utils.display import Display +from ansible.utils.color import stringc +from ansible.utils.singleton import Singleton + +from .utils import get_ansible_task_log_path + + +class UnSingleton(Singleton): + def __init__(cls, name, bases, dct): + type.__init__(cls, name, bases, dct) + + def __call__(cls, *args, **kwargs): + return type.__call__(cls, *args, **kwargs) + + +class AdHocDisplay(Display, metaclass=UnSingleton): + def __init__(self, execution_id, verbosity=0): + super().__init__(verbosity=verbosity) + if execution_id: + log_path = get_ansible_task_log_path(execution_id) + else: + log_path = os.devnull + self.log_file = open(log_path, mode='a') + + def close(self): + self.log_file.close() + + def set_cowsay_info(self): + # 中断 cowsay 的测试,会频繁开启子进程 + return + + def _write_to_screen(self, msg, stderr): + if not stderr: + screen = sys.stdout + else: + screen = sys.stderr + + screen.write(msg) + + try: + screen.flush() + except IOError as e: + # Ignore EPIPE in case fileobj has been prematurely closed, eg. + # when piping to "head -n1" + if e.errno != errno.EPIPE: + raise + + def _write_to_log_file(self, msg): + # 这里先不 flush,log 文件不需要那么及时。 + self.log_file.write(msg) + + def display(self, msg, color=None, stderr=False, screen_only=False, log_only=False): + if color: + msg = stringc(msg, color) + + if not msg.endswith(u'\n'): + msg2 = msg + u'\n' + else: + msg2 = msg + + self._write_to_screen(msg2, stderr) + self._write_to_log_file(msg2) diff --git a/apps/ops/ansible/runner.py b/apps/ops/ansible/runner.py index e741d8f8b..fdbed74cb 100644 --- a/apps/ops/ansible/runner.py +++ b/apps/ops/ansible/runner.py @@ -1,6 +1,7 @@ # ~*~ coding: utf-8 ~*~ import os + import shutil from collections import namedtuple @@ -18,6 +19,7 @@ from .callback import ( ) from common.utils import get_logger from .exceptions import AnsibleError +from .display import AdHocDisplay __all__ = ["AdHocRunner", "PlayBookRunner", "CommandRunner"] @@ -130,8 +132,8 @@ class AdHocRunner: loader=self.loader, inventory=self.inventory ) - def get_result_callback(self, file_obj=None): - return self.__class__.results_callback_class() + def get_result_callback(self, execution_id=None): + return self.__class__.results_callback_class(display=AdHocDisplay(execution_id)) @staticmethod def check_module_args(module_name, module_args=''): @@ -189,7 +191,7 @@ class AdHocRunner: 'ssh_args': '-C -o ControlMaster=no' } - def run(self, tasks, pattern, play_name='Ansible Ad-hoc', gather_facts='no'): + def run(self, tasks, pattern, play_name='Ansible Ad-hoc', gather_facts='no', execution_id=None): """ :param tasks: [{'action': {'module': 'shell', 'args': 'ls'}, ...}, ] :param pattern: all, *, or others @@ -198,7 +200,7 @@ class AdHocRunner: :return: """ self.check_pattern(pattern) - self.results_callback = self.get_result_callback() + self.results_callback = self.get_result_callback(execution_id) cleaned_tasks = self.clean_tasks(tasks) self.set_control_master_if_need(cleaned_tasks) context.CLIARGS = ImmutableDict(self.options) @@ -233,6 +235,8 @@ class AdHocRunner: tqm.cleanup() shutil.rmtree(C.DEFAULT_LOCAL_TMP, True) + self.results_callback.close() + class CommandRunner(AdHocRunner): results_callback_class = CommandResultCallback diff --git a/apps/ops/ansible/utils.py b/apps/ops/ansible/utils.py new file mode 100644 index 000000000..478badc56 --- /dev/null +++ b/apps/ops/ansible/utils.py @@ -0,0 +1,6 @@ +from django.conf import settings + + +def get_ansible_task_log_path(task_id): + from ops.utils import get_task_log_path + return get_task_log_path(settings.ANSIBLE_LOG_DIR, task_id, level=3) diff --git a/apps/ops/api/celery.py b/apps/ops/api/celery.py index 814835465..3968b39b7 100644 --- a/apps/ops/api/celery.py +++ b/apps/ops/api/celery.py @@ -15,10 +15,14 @@ from common.api import LogTailApi from ..models import CeleryTask from ..serializers import CeleryResultSerializer, CeleryPeriodTaskSerializer from ..celery.utils import get_celery_task_log_path +from ..ansible.utils import get_ansible_task_log_path from common.mixins.api import CommonApiMixin -__all__ = ['CeleryTaskLogApi', 'CeleryResultApi', 'CeleryPeriodTaskViewSet'] +__all__ = [ + 'CeleryTaskLogApi', 'CeleryResultApi', 'CeleryPeriodTaskViewSet', + 'AnsibleTaskLogApi', +] class CeleryTaskLogApi(LogTailApi): @@ -57,6 +61,21 @@ class CeleryTaskLogApi(LogTailApi): return _('Waiting task start') +class AnsibleTaskLogApi(LogTailApi): + permission_classes = (IsValidUser,) + + def get_log_path(self): + new_path = get_ansible_task_log_path(self.kwargs.get('pk')) + if new_path and os.path.isfile(new_path): + return new_path + + def get_no_file_message(self, request): + if self.mark == 'undefined': + return '.' + else: + return _('Waiting task start') + + class CeleryResultApi(generics.RetrieveAPIView): permission_classes = (IsValidUser,) serializer_class = CeleryResultSerializer diff --git a/apps/ops/celery/utils.py b/apps/ops/celery/utils.py index ff5aeb1d4..41f99212f 100644 --- a/apps/ops/celery/utils.py +++ b/apps/ops/celery/utils.py @@ -102,11 +102,8 @@ def get_celery_periodic_task(task_name): def get_celery_task_log_path(task_id): - task_id = str(task_id) - rel_path = os.path.join(task_id[0], task_id[1], task_id + '.log') - path = os.path.join(settings.CELERY_LOG_DIR, rel_path) - os.makedirs(os.path.dirname(path), exist_ok=True) - return path + from ops.utils import get_task_log_path + return get_task_log_path(settings.CELERY_LOG_DIR, task_id) def get_celery_status(): diff --git a/apps/ops/migrations/0019_adhocexecution_celery_task_id.py b/apps/ops/migrations/0019_adhocexecution_celery_task_id.py new file mode 100644 index 000000000..4789267e6 --- /dev/null +++ b/apps/ops/migrations/0019_adhocexecution_celery_task_id.py @@ -0,0 +1,18 @@ +# Generated by Django 3.1 on 2020-12-30 12:04 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('ops', '0018_auto_20200509_1434'), + ] + + operations = [ + migrations.AddField( + model_name='adhocexecution', + name='celery_task_id', + field=models.UUIDField(default=None, null=True), + ), + ] diff --git a/apps/ops/models/adhoc.py b/apps/ops/models/adhoc.py index aa2884105..63050e436 100644 --- a/apps/ops/models/adhoc.py +++ b/apps/ops/models/adhoc.py @@ -179,13 +179,13 @@ class AdHoc(OrgModelMixin): def run(self): try: - hid = current_task.request.id - if AdHocExecution.objects.filter(id=hid).exists(): - hid = uuid.uuid4() + celery_task_id = current_task.request.id except AttributeError: - hid = uuid.uuid4() + celery_task_id = None + execution = AdHocExecution( - id=hid, adhoc=self, task=self.task, + celery_task_id=celery_task_id, + adhoc=self, task=self.task, task_display=str(self.task)[:128], date_start=timezone.now(), hosts_amount=self.hosts.count(), @@ -237,6 +237,7 @@ class AdHocExecution(OrgModelMixin): id = models.UUIDField(default=uuid.uuid4, primary_key=True) task = models.ForeignKey(Task, related_name='execution', on_delete=models.SET_NULL, null=True) task_display = models.CharField(max_length=128, blank=True, default='', verbose_name=_("Task display")) + celery_task_id = models.UUIDField(default=None, null=True) hosts_amount = models.IntegerField(default=0, verbose_name=_("Host amount")) adhoc = models.ForeignKey(AdHoc, related_name='execution', on_delete=models.SET_NULL, null=True) date_start = models.DateTimeField(auto_now_add=True, verbose_name=_('Start time')) @@ -270,6 +271,7 @@ class AdHocExecution(OrgModelMixin): self.adhoc.tasks, self.adhoc.pattern, self.task.name, + execution_id=self.id ) return result.results_raw, result.results_summary except AnsibleError as e: diff --git a/apps/ops/urls/api_urls.py b/apps/ops/urls/api_urls.py index 15010923b..a5838073f 100644 --- a/apps/ops/urls/api_urls.py +++ b/apps/ops/urls/api_urls.py @@ -22,6 +22,8 @@ urlpatterns = [ path('tasks//run/', api.TaskRun.as_view(), name='task-run'), path('celery/task//log/', api.CeleryTaskLogApi.as_view(), name='celery-task-log'), path('celery/task//result/', api.CeleryResultApi.as_view(), name='celery-result'), + + path('ansible/task//log/', api.AnsibleTaskLogApi.as_view(), name='ansible-task-log'), ] urlpatterns += router.urls diff --git a/apps/ops/urls/ws_urls.py b/apps/ops/urls/ws_urls.py index 5b6bd74d2..e1d44dd7c 100644 --- a/apps/ops/urls/ws_urls.py +++ b/apps/ops/urls/ws_urls.py @@ -5,5 +5,5 @@ from .. import ws app_name = 'ops' urlpatterns = [ - path('ws/ops/tasks/log/', ws.CeleryLogWebsocket, name='task-log-ws'), + path('ws/ops/tasks/log/', ws.TaskLogWebsocket, name='task-log-ws'), ] diff --git a/apps/ops/utils.py b/apps/ops/utils.py index 04d35a2fa..d8c186dd6 100644 --- a/apps/ops/utils.py +++ b/apps/ops/utils.py @@ -1,4 +1,6 @@ # ~*~ coding: utf-8 ~*~ +import os + from django.utils.translation import ugettext_lazy as _ from common.utils import get_logger, get_object_or_none @@ -75,3 +77,10 @@ def send_server_performance_mail(path, usage, usages): send_mail_async(subject, message, recipient_list, html_message=message) +def get_task_log_path(base_path, task_id, level=2): + task_id = str(task_id) + rel_path = os.path.join(*task_id[:level], task_id + '.log') + path = os.path.join(base_path, rel_path) + os.makedirs(os.path.dirname(path), exist_ok=True) + return path + diff --git a/apps/ops/ws.py b/apps/ops/ws.py index 343a6a8ee..17acaf508 100644 --- a/apps/ops/ws.py +++ b/apps/ops/ws.py @@ -6,25 +6,37 @@ import json from common.utils import get_logger from .celery.utils import get_celery_task_log_path +from .ansible.utils import get_ansible_task_log_path from channels.generic.websocket import JsonWebsocketConsumer logger = get_logger(__name__) -class CeleryLogWebsocket(JsonWebsocketConsumer): +class TaskLogWebsocket(JsonWebsocketConsumer): disconnected = False + log_types = { + 'celery': get_celery_task_log_path, + 'ansible': get_ansible_task_log_path + } + def connect(self): self.accept() + def get_log_path(self, task_id): + func = self.log_types.get(self.log_type) + if func: + return func(task_id) + def receive(self, text_data=None, bytes_data=None, **kwargs): data = json.loads(text_data) - task_id = data.get("task") + task_id = data.get('task') + self.log_type = data.get('type', 'celery') if task_id: self.handle_task(task_id) def wait_util_log_path_exist(self, task_id): - log_path = get_celery_task_log_path(task_id) + log_path = self.get_log_path(task_id) while not self.disconnected: if not os.path.exists(log_path): self.send_json({'message': '.', 'task': task_id}) @@ -70,5 +82,3 @@ class CeleryLogWebsocket(JsonWebsocketConsumer): def disconnect(self, close_code): self.disconnected = True self.close() - -