From a543a2ee377ce4d37a8c63b5c5a54ab18211f49b Mon Sep 17 00:00:00 2001 From: ibuler Date: Sat, 8 Oct 2022 19:12:04 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E5=9F=BA=E6=9C=AC=E5=AE=8C=E6=88=90=20?= =?UTF-8?q?adhoc=20runner?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/ops/ansible/inventory.py | 11 ++++++-- apps/ops/ansible/runner.py | 12 ++++++-- apps/ops/models/__init__.py | 1 + apps/ops/models/adhoc.py | 17 ++++++++--- apps/ops/models/base.py | 53 +++++++++++++++++++++++++++-------- apps/ops/models/playbook.py | 6 +++- apps/ops/tasks.py | 45 ++++++++++++++++------------- 7 files changed, 105 insertions(+), 40 deletions(-) diff --git a/apps/ops/ansible/inventory.py b/apps/ops/ansible/inventory.py index 4da027696..85a3d03ff 100644 --- a/apps/ops/ansible/inventory.py +++ b/apps/ops/ansible/inventory.py @@ -1,6 +1,7 @@ # ~*~ coding: utf-8 ~*~ from collections import defaultdict import json +import os __all__ = ['JMSInventory'] @@ -136,15 +137,19 @@ class JMSInventory: account = self.select_account(asset) host = self.asset_to_host(asset, account, automation, protocols) hosts.append(host) - return hosts - def write_to_file(self, path): - hosts = self.generate() data = {'all': {'hosts': {}}} for host in hosts: name = host.pop('name') var = host.pop('vars', {}) host.update(var) data['all']['hosts'][name] = host + return data + + def write_to_file(self, path): + data = self.generate() + path_dir = os.path.dirname(path) + if not os.path.exists(path_dir): + os.makedirs(path_dir, 0o700, True) with open(path, 'w') as f: f.write(json.dumps(data, indent=4)) diff --git a/apps/ops/ansible/runner.py b/apps/ops/ansible/runner.py index a420fb8a4..36a0e7e8c 100644 --- a/apps/ops/ansible/runner.py +++ b/apps/ops/ansible/runner.py @@ -1,7 +1,9 @@ import uuid -import ansible_runner +import os +import ansible_runner from django.conf import settings + from .callback import DefaultCallback @@ -11,7 +13,7 @@ class AdHocRunner: "reboot", 'shutdown', 'poweroff', 'halt', 'dd', 'half', 'top' ] - def __init__(self, inventory, module, module_args, pattern='*', project_dir='/tmp/'): + def __init__(self, inventory, module, module_args='', pattern='*', project_dir='/tmp/'): self.id = uuid.uuid4() self.inventory = inventory self.pattern = pattern @@ -32,6 +34,12 @@ class AdHocRunner: if verbosity is None and settings.DEBUG: verbosity = 1 + if not os.path.exists(self.project_dir): + os.mkdir(self.project_dir, 0o755) + + print("inventory: ") + print(self.inventory) + ansible_runner.run( host_pattern=self.pattern, private_data_dir=self.project_dir, diff --git a/apps/ops/models/__init__.py b/apps/ops/models/__init__.py index fcd8bd8f7..93b630dd6 100644 --- a/apps/ops/models/__init__.py +++ b/apps/ops/models/__init__.py @@ -3,3 +3,4 @@ from .adhoc import * from .celery import * +from .playbook import * diff --git a/apps/ops/models/adhoc.py b/apps/ops/models/adhoc.py index 565df9f3e..c3a7822a9 100644 --- a/apps/ops/models/adhoc.py +++ b/apps/ops/models/adhoc.py @@ -1,5 +1,5 @@ # ~*~ coding: utf-8 ~*~ - +import os.path from django.db import models from django.utils.translation import ugettext_lazy as _ @@ -18,7 +18,12 @@ class AdHoc(BaseAnsibleTask): pattern = models.CharField(max_length=1024, verbose_name=_("Pattern"), default='all') module = models.CharField(max_length=128, default='shell', verbose_name=_('Module')) args = models.CharField(max_length=1024, default='', verbose_name=_('Args')) - last_execution = models.ForeignKey('AdHocExecution', verbose_name=_("Last execution"), on_delete=models.SET_NULL, null=True, blank=True) + last_execution = models.ForeignKey('AdHocExecution', verbose_name=_("Last execution"), + on_delete=models.SET_NULL, null=True, blank=True) + + def get_register_task(self): + from ops.tasks import run_adhoc + return "run_adhoc_{}".format(self.id), run_adhoc, (str(self.id),), {} def __str__(self): return "{}: {}".format(self.module, self.args) @@ -31,10 +36,14 @@ class AdHocExecution(BaseAnsibleExecution): task = models.ForeignKey('AdHoc', verbose_name=_("Adhoc"), related_name='executions', on_delete=models.CASCADE) def get_runner(self): - return AdHocRunner( - self.task.inventory, self.task.module, self.task.args, + inv = self.task.inventory + inv.write_to_file(self.inventory_path) + + runner = AdHocRunner( + self.inventory_path, self.task.module, module_args=self.task.args, pattern=self.task.pattern, project_dir=self.private_dir ) + return runner class Meta: db_table = "ops_adhoc_execution" diff --git a/apps/ops/models/base.py b/apps/ops/models/base.py index 2992173c3..f43e0c584 100644 --- a/apps/ops/models/base.py +++ b/apps/ops/models/base.py @@ -1,5 +1,6 @@ import os.path import uuid +import logging from django.db import models from django.utils.translation import gettext_lazy as _ @@ -25,7 +26,7 @@ class BaseAnsibleTask(PeriodTaskModelMixin, JMSOrgBaseModel): @property def inventory(self): inv = JMSInventory(self.assets.all(), self.account, self.account_policy) - return inv.generate() + return inv def get_register_task(self): raise NotImplemented @@ -33,11 +34,19 @@ class BaseAnsibleTask(PeriodTaskModelMixin, JMSOrgBaseModel): def to_json(self): raise NotImplemented + def create_execution(self): + execution = self.executions.create() + return execution + + def run(self, *args, **kwargs): + execution = self.create_execution() + return execution.start() + class BaseAnsibleExecution(models.Model): id = models.UUIDField(primary_key=True, default=uuid.uuid4) status = models.CharField(max_length=16, verbose_name=_('Status'), default='running') - task = models.ForeignKey(BaseAnsibleTask, on_delete=models.CASCADE, null=True) + task = models.ForeignKey(BaseAnsibleTask, on_delete=models.CASCADE, related_name='executions', null=True) result = models.JSONField(blank=True, null=True, verbose_name=_('Result')) summary = models.JSONField(default=dict, verbose_name=_('Summary')) creator = models.ForeignKey('users.User', verbose_name=_("Creator"), on_delete=models.SET_NULL, null=True) @@ -52,13 +61,40 @@ class BaseAnsibleExecution(models.Model): def __str__(self): return str(self.id) + @property def private_dir(self): uniq = self.date_created.strftime('%Y%m%d_%H%M%S') + '_' + self.short_id return os.path.join(settings.ANSIBLE_DIR, self.task.name, uniq) + @property + def inventory_path(self): + return os.path.join(self.private_dir, 'inventory', 'hosts') + def get_runner(self): raise NotImplemented + def finish_task(self): + self.date_finished = timezone.now() + self.save(update_fields=['result', 'status', 'summary', 'date_finished']) + self.update_task() + + def set_error(self, error): + this = self.__class__.objects.get(id=self.id) # 重新获取一次,避免数据库超时连接超时 + this.status = 'failed' + this.summary['error'] = str(error) + this.finish_task() + + def set_result(self, cb): + status_mapper = { + 'successful': 'succeeded', + } + this = self.__class__.objects.get(id=self.id) + this.status = status_mapper.get(cb.status, cb.status) + this.summary = cb.summary + this.result = cb.result + this.finish_task() + print("Finished") + def update_task(self): self.task.last_execution = self self.task.date_last_run = timezone.now() @@ -68,16 +104,11 @@ class BaseAnsibleExecution(models.Model): runner = self.get_runner() try: cb = runner.run(**kwargs) - self.status = cb.status - self.summary = cb.summary - self.result = cb.result - self.date_finished = timezone.now() + self.set_result(cb) + return cb except Exception as e: - self.status = 'failed' - self.summary = {'error': str(e)} - finally: - self.save() - self.update_task() + logging.error(e, exc_info=True) + self.set_error(e) @property def is_finished(self): diff --git a/apps/ops/models/playbook.py b/apps/ops/models/playbook.py index aec59bfb0..0701ed13e 100644 --- a/apps/ops/models/playbook.py +++ b/apps/ops/models/playbook.py @@ -27,7 +27,11 @@ class Playbook(BaseAnsibleTask): last_execution = models.ForeignKey('PlaybookExecution', verbose_name=_("Last execution"), on_delete=models.SET_NULL, null=True, blank=True) def get_register_task(self): - pass + name = "automation_strategy_period_{}".format(str(self.id)[:8]) + task = execute_automation_strategy.name + args = (str(self.id), Trigger.timing) + kwargs = {} + return name, task, args, kwargs class PlaybookExecution(BaseAnsibleExecution): diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py index 0ef430d7a..e9ba28eb7 100644 --- a/apps/ops/tasks.py +++ b/apps/ops/tasks.py @@ -20,7 +20,7 @@ from .celery.utils import ( create_or_update_celery_periodic_tasks, get_celery_periodic_task, disable_celery_periodic_task, delete_celery_periodic_task ) -from .models import CommandExecution, CeleryTask +from .models import CeleryTask, AdHoc, Playbook from .notifications import ServerPerformanceCheckUtil logger = get_logger(__file__) @@ -30,41 +30,48 @@ def rerun_task(): pass -@shared_task(queue="ansible", verbose_name=_("Run ansible task")) -def run_ansible_task(tid, callback=None, **kwargs): +@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task")) +def run_adhoc(tid, **kwargs): """ :param tid: is the tasks serialized data :param callback: callback function name :return: """ with tmp_to_root_org(): - task = get_object_or_none(Task, id=tid) + task = get_object_or_none(AdHoc, id=tid) if not task: logger.error("No task found") return with tmp_to_org(task.org): - result = task.run() - if callback is not None: - subtask(callback).delay(result, task_name=task.name) - return result + execution = task.create_execution() + try: + execution.start(**kwargs) + except SoftTimeLimitExceeded: + execution.set_error('Run timeout') + logger.error("Run adhoc timeout") + except Exception as e: + execution.set_error(e) + logger.error("Start adhoc execution error: {}".format(e)) @shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible command")) -def run_command_execution(cid, **kwargs): +def run_playbook(pid, **kwargs): with tmp_to_root_org(): - execution = get_object_or_none(CommandExecution, id=cid) - if not execution: - logger.error("Not found the execution id: {}".format(cid)) + task = get_object_or_none(Playbook, id=pid) + if not task: + logger.error("No task found") return - with tmp_to_org(execution.run_as.org): + + with tmp_to_org(task.org): + execution = task.create_execution() try: - os.environ.update({ - "TERM_ROWS": kwargs.get("rows", ""), - "TERM_COLS": kwargs.get("cols", ""), - }) - execution.run() + execution.start(**kwargs) except SoftTimeLimitExceeded: - logger.error("Run time out") + execution.set_error('Run timeout') + logger.error("Run playbook timeout") + except Exception as e: + execution.set_error(e) + logger.error("Run playbook execution error: {}".format(e)) @shared_task