From 97b8bcd5ca65a077d008eca631b5cf38b5f011a6 Mon Sep 17 00:00:00 2001 From: Administrator Date: Mon, 31 Oct 2016 17:41:26 +0800 Subject: [PATCH] =?UTF-8?q?[future]=201.=20settings=20=E6=B7=BB=E5=8A=A0op?= =?UTF-8?q?s=20=E7=9A=84logger,=20=E5=85=B3=E4=BA=8EAnsible=E7=9A=84log?= =?UTF-8?q?=E5=8D=95=E7=8B=AC=E8=AE=B0=E5=BD=95=202.=20models=20=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0Tasker=E6=A8=A1=E5=9E=8B,=20=E6=B7=BB=E5=8A=A0AnsibleH?= =?UTF-8?q?ostReuslt=20=E5=AF=B9=E4=BA=8E=E6=95=B0=E6=8D=AE=E5=A4=84?= =?UTF-8?q?=E7=90=86=E7=9A=84=E6=96=B9=E6=B3=95=203.=20ansible=5Fapi,=20ca?= =?UTF-8?q?llback=20=E7=B1=BB=EF=BC=8C=E5=A2=9E=E5=8A=A0=E4=BF=9D=E5=AD=98?= =?UTF-8?q?Tasker=E7=9A=84=E9=80=BB=E8=BE=91,i=E5=85=B6=E4=BB=96=E5=85=BC?= =?UTF-8?q?=E5=AE=B9=204.=20taskers,=20=E5=AE=9E=E7=8E=B0=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E7=A1=AC=E4=BB=B6=E4=BF=A1=E6=81=AF=E5=92=8Cping?= =?UTF-8?q?=E7=9A=84=20tasker=E6=8E=A5=E5=8F=A3=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/jumpserver/settings.py | 10 ++- apps/ops/ansible_api.py | 59 +++++++++----- apps/ops/models.py | 155 ++++++++++++++++++++++++++++++++++-- apps/ops/taskers.py | 58 ++++++++++++-- apps/ops/tasks.py | 8 +- 5 files changed, 250 insertions(+), 40 deletions(-) diff --git a/apps/jumpserver/settings.py b/apps/jumpserver/settings.py index ec2c98865..c4fbec0a0 100644 --- a/apps/jumpserver/settings.py +++ b/apps/jumpserver/settings.py @@ -182,6 +182,12 @@ LOGGING = { 'formatter': 'main', 'filename': os.path.join(PROJECT_DIR, 'logs', 'jumpserver.log') }, + 'ansible_logs': { + 'level': 'DEBUG', + 'class': 'logging.FileHandler', + 'formatter': 'main', + 'filename': os.path.join(PROJECT_DIR, 'logs', 'ansible.log') + }, }, 'loggers': { 'django': { @@ -211,8 +217,8 @@ LOGGING = { 'handlers': ['console', 'file'], 'level': LOG_LEVEL, }, - 'jumpserver.ops.ansible_api': { - 'handlers': ['console', 'file'], + 'ops.ansible_api': { + 'handlers': ['console', 'ansible_logs'], 'level': LOG_LEVEL, } } diff --git a/apps/ops/ansible_api.py b/apps/ops/ansible_api.py index a5e940310..09fb06cd5 100644 --- a/apps/ops/ansible_api.py +++ b/apps/ops/ansible_api.py @@ -4,9 +4,11 @@ from __future__ import unicode_literals import os import json import logging +import traceback import ansible.constants as default_config - +from uuid import uuid4 +from django.utils import timezone from ansible.executor.task_queue_manager import TaskQueueManager from ansible.inventory import Inventory, Host, Group from ansible.vars import VariableManager @@ -16,7 +18,8 @@ from ansible.utils.display import Display from ansible.playbook.play import Play from ansible.plugins.callback import CallbackBase -from models import AnsiblePlay, AnsibleTask, AnsibleHostResult +from models import Tasker, AnsiblePlay, AnsibleTask, AnsibleHostResult + logger = logging.getLogger(__name__) @@ -177,24 +180,28 @@ class CallbackModule(CallbackBase): CALLBACK_TYPE = 'stdout' CALLBACK_NAME = 'json' - def __init__(self, display=None): + def __init__(self, tasker_id, display=None): super(CallbackModule, self).__init__(display) self.results = [] self.output = {} + self.tasker_id = tasker_id def _new_play(self, play): """将Play保持到数据里面 """ ret = { + 'tasker': self.tasker_id, 'name': play.name, 'uuid': str(play._uuid), 'tasks': [] } try: - play = AnsiblePlay(name=ret['name'], uuid=ret['uuid'], completed=False) + tasker = Tasker.objects.get(uuid=self.tasker_id) + play = AnsiblePlay(tasker, name=ret['name'], uuid=ret['uuid']) play.save() except Exception as e: + traceback.print_exc() logger.error("Save ansible play uuid to database error!, %s" % e.message) return ret @@ -418,24 +425,37 @@ class ADHocRunner(InventoryMixin): self.groups = [] self.inventory = self.gen_inventory() - # 初始化callback插件 - self.results_callback = CallbackModule() - self.play = Play().load(play_data, variable_manager=self.variable_manager, loader=self.loader) @staticmethod - def update_db_play(result, ext_code): + def update_db_tasker(tasker_id, ext_code): try: - play = AnsiblePlay.objects.get(uuid=result[0]['uuid']) - play.completed = True - play.status_code = ext_code - play.save() + tasker = Tasker.objects.get(uuid=tasker_id) + tasker.end = timezone.now() + tasker.completed = True + tasker.exit_code = ext_code + tasker.save() except Exception as e: - logger.error("Update Ansible Play Status into database error!, %s" % e.message) + logger.error("Update Tasker Status into database error!, %s" % e.message) - def run(self): + def create_db_tasker(self, name, uuid): + try: + hosts = [host.get('name') for host in self.hosts] + tasker = Tasker(name=name, uuid=uuid, hosts=','.join(hosts), start=timezone.now()) + tasker.save() + except Exception as e: + logger.error("Save Tasker to database error!, %s" % e.message) + + def run(self, tasker_name, tasker_uuid): """执行ADHoc, 执行完后, 修改AnsiblePlay的状态为完成状态. + + :param tasker_uuid 用于标示此次task """ + # 初始化callback插件,以及Tasker + + self.create_db_tasker(tasker_name, tasker_uuid) + self.results_callback = CallbackModule(tasker_uuid) + tqm = None # TODO:日志和结果分析 try: @@ -450,7 +470,8 @@ class ADHocRunner(InventoryMixin): ext_code = tqm.run(self.play) result = self.results_callback.results - self.update_db_play(result, ext_code) + # 任务运行结束, 标示任务完成 + self.update_db_tasker(tasker_uuid, ext_code) ret = json.dumps(result) return ext_code, ret @@ -468,7 +489,7 @@ def test_run(): "ip": "192.168.1.119", "port": "22", "username": "root", - "password": "xxx", + "password": "tongfang_test", "key": "asset_private_key", }, { @@ -487,12 +508,12 @@ def test_run(): "hosts": "default", "gather_facts": "no", "tasks": [ - dict(action=dict(module='setup')), - dict(action=dict(module='command', args='lsss')) + dict(action=dict(module='ping')), ] } hoc = ADHocRunner(conf, play_source, *assets) - ext_code, result = hoc.run() + uuid = "tasker-" + uuid4().hex + ext_code, result = hoc.run("test_task", uuid) print ext_code print result diff --git a/apps/ops/models.py b/apps/ops/models.py index 70f44720e..d3e52566e 100644 --- a/apps/ops/models.py +++ b/apps/ops/models.py @@ -11,24 +11,46 @@ from django.utils.translation import ugettext_lazy as _ logger = logging.getLogger(__name__) +class Tasker(models.Model): + uuid = models.CharField(max_length=128, verbose_name=_('UUID'), primary_key=True) + name = models.CharField(max_length=128, blank=True, verbose_name=_('Name')) + start = models.DateTimeField(auto_now_add=True, verbose_name=_('Start Time')) + end = models.DateTimeField(blank=True, null=True, verbose_name=_('End Time')) + exit_code = models.IntegerField(default=0, verbose_name=_('Exit Code')) + completed = models.BooleanField(default=False, verbose_name=_('Is Completed')) + hosts = models.TextField(blank=True, null=True, verbose_name=_('Hosts')) + + def __unicode__(self): + return "%s" % self.uuid + + @property + def total_hosts(self): + return self.hosts.split(',') + + class AnsiblePlay(models.Model): + tasker = models.ForeignKey(Tasker, related_name='plays', blank=True, null=True) uuid = models.CharField(max_length=128, verbose_name=_('UUID'), primary_key=True) name = models.CharField(max_length=128, verbose_name=_('Name')) - completed = models.BooleanField(default=False, verbose_name=_('IsCompleted')) - status_code = models.IntegerField(default=0, verbose_name=_('StatusCode')) def __unicode__(self): return "%s<%s>" % (self.name, self.uuid) + def to_dict(self): + return {"uuid": self.uuid, "name": self.name} + class AnsibleTask(models.Model): + play = models.ForeignKey(AnsiblePlay, related_name='tasks', blank=True, null=True) uuid = models.CharField(max_length=128, verbose_name=_('UUID'), primary_key=True) - play = models.ForeignKey(AnsiblePlay, related_name='tasks', blank=True) name = models.CharField(max_length=128, blank=True, verbose_name=_('Name')) def __unicode__(self): return "%s<%s>" % (self.name, self.uuid) + def to_dict(self): + return {"uuid": self.uuid, "name": self.name} + def failed(self): pass @@ -37,9 +59,8 @@ class AnsibleTask(models.Model): class AnsibleHostResult(models.Model): - task = models.ForeignKey(AnsibleTask, related_name='host_results', blank=True) + task = models.ForeignKey(AnsibleTask, related_name='host_results', blank=True, null=True) name = models.CharField(max_length=128, blank=True, verbose_name=_('Name')) - status = models.BooleanField(blank=True, default=False, verbose_name=_('Status')) success = models.TextField(blank=True, verbose_name=_('Success')) skipped = models.TextField(blank=True, verbose_name=_('Skipped')) failed = models.TextField(blank=True, verbose_name=_('Failed')) @@ -47,7 +68,7 @@ class AnsibleHostResult(models.Model): no_host = models.TextField(blank=True, verbose_name=_('NoHost')) def __unicode__(self): - return "%s<%s>" % (self.name, str(self.status)) + return "%s %s<%s>" % (self.name, str(self.is_success), self.task.uuid) @property def is_failed(self): @@ -56,14 +77,18 @@ class AnsibleHostResult(models.Model): return False @property - def success_data(self): + def is_success(self): + return not self.is_failed + + @property + def _success_data(self): if self.success: return json.loads(self.success) elif self.skipped: return json.loads(self.skipped) @property - def failed_data(self): + def _failed_data(self): if self.failed: return json.loads(self.failed) elif self.unreachable: @@ -71,3 +96,117 @@ class AnsibleHostResult(models.Model): elif self.no_host: return {"msg": self.no_host} + @property + def failed_msg(self): + return self._failed_data.get("msg") + + @staticmethod + def __filter_disk(ansible_devices, exclude_devices): + """ + 过滤磁盘设备,丢弃掉不需要的设备 + + :param ansible_devices: 对应的facts字段 + :param exclude_devices: 一个需要被丢弃的设备,匹配规则是startwith, 比如需要丢弃sr0子类的 ['sr'] + :return: 过滤获取的结果 + """ + for start_str in exclude_devices: + for key in ansible_devices.keys(): + if key.startswith(start_str): + ansible_devices.pop(key) + return ansible_devices + + @staticmethod + def __filter_interface(ansible_interfaces, exclude_interface): + """ + 过滤网卡设备,丢弃掉不需要的网卡, 比如lo + + :param ansible_interface: 对应的facts字段 + :param exclude_interface: 一个需要被丢弃的设备,匹配规则是startwith, 比如需要丢弃lo子类的 ['lo'] + :return: 过滤获取的结果 + """ + for interface in ansible_interfaces: + for start_str in exclude_interface: + if interface.startswith(start_str): + i = ansible_interfaces.index(interface) + ansible_interfaces.pop(i) + return ansible_interfaces + + @staticmethod + def __gather_interface(facts, interfaces): + """ + 收集所有interface的具体信息 + + :param facts: ansible faces + :param interfaces: 需要收集的intreface列表 + :return: interface的详情 + """ + result = {} + for key in interfaces: + if "ansible_" + key in facts.keys(): + result[key] = facts.get(key) + return result + + def __deal_setup(self): + """ + 处理ansible setup模块收集到的数据,提取资产需要的部分 + + :return: {"msg": , "data": }, 注意msg是异常信息, 有msg时 data为None + """ + result = self._success_data + module_name = result['invocation'].get('module_name') if result.get('invocation') else None + if module_name is not None: + if module_name != "setup": + return {"msg": "the property only for ansible setup module result!, can't support other module", "data":None} + else: + data = {} + facts =result.get('ansible_facts') + interfaces = self.__filter_interface(facts.get('ansible_interfaces'), ['lo']) + + cpu_describe = "%s %s" % (facts.get('ansible_processor')[0], facts.get('ansible_processor')[1]) if len(facts.get('ansible_processor')) >= 2 else "" + + data['sn'] = facts.get('ansible_product_serial') + data['env'] = facts.get('ansible_env') + data['os'] = "%s %s(%s)" % (facts.get('ansible_distribution'), + facts.get('ansible_distribution_version'), + facts.get('ansible_distribution_release')) + data['mem'] = facts.get('ansible_memtotal_mb') + data['cpu'] = "%s %d核" % (cpu_describe, facts.get('ansible_processor_count')) + data['disk'] = self.__filter_disk(facts.get('ansible_devices'), ['sr']) + data['interface'] = self.__gather_interface(facts, interfaces) + return {"msg": None, "data": data} + else: + return {"msg": "there isn't module_name field! can't process this data format", "data": None} + + @property + def deal_setup(self): + try: + return self.__deal_setup() + except Exception as e: + return {"msg": "deal with setup data failed, %s" % e.message, "data": None} + + def __deal_ping(self): + """ + 处理ansible ping模块收集到的数据 + + :return: {"msg": , "data": {"success": }}, 注意msg是异常信息, 有msg时 data为None + """ + result = self._success_data + module_name = result['invocation'].get('module_name') if result.get('invocation') else None + if module_name is not None: + if module_name != "ping": + return {"msg": "the property only for ansible setup module result!, can't support other module", "data":None} + else: + ping = True if result.get('ping') == "pong" else False + + return {"msg": None, "data": {"success": ping}} + else: + return {"msg": "there isn't module_name field! can't process this data format", "data": None} + + @property + def deal_ping(self): + try: + return self.__deal_ping() + except Exception as e: + return {"msg": "deal with ping data failed, %s" % e.message, "data": None} + + diff --git a/apps/ops/taskers.py b/apps/ops/taskers.py index 12bdfaff9..17dbd3a5b 100644 --- a/apps/ops/taskers.py +++ b/apps/ops/taskers.py @@ -1,14 +1,12 @@ from __future__ import unicode_literals from .tasks import * + +from .models import Tasker, AnsiblePlay, AnsibleTask, AnsibleHostResult +from uuid import uuid1 from celery.result import AsyncResult -def start_get_hardware_info(*assets): - result = get_asset_hardware_info.delay(*assets) - return result.id - - def get_result(task_id): result = AsyncResult(task_id) if result.ready(): @@ -17,8 +15,54 @@ def get_result(task_id): return {"Completed": False, "data": None} +def start_get_hardware_info(*assets): + name = "Get host hardware information" + uuid = "tasker-" + uuid1().hex + get_asset_hardware_info.delay(name, uuid, *assets) + return uuid + + +def __get_hardware_info(tasker_uuid): + tasker = Tasker.objects.get(uuid=tasker_uuid) + host_results = [] + + for play in tasker.plays.all(): + for t in play.tasks.all(): + for h in t.host_results.all(): + host_results.append(h) + + return host_results + + +def get_hardware_info(tasker_uuid): + try: + return {"msg": None, "data": __get_hardware_info(tasker_uuid)} + except Exception as e: + return {"msg": "query data failed!, %s" % e.message, "data": None} + + def start_ping_test(*assets): - result = asset_test_ping_check.delay(*assets) - return result.id + name = "Test host connection" + uuid = "tasker-" + uuid1().hex + asset_test_ping_check.delay(name, uuid, *assets) + return uuid +def __get_ping_test(tasker_uuid): + tasker = Tasker.objects.get(uuid=tasker_uuid) + host_results = [] + + for play in tasker.plays.all(): + for t in play.tasks.all(): + for h in t.host_results.all(): + host_results.append(h) + + return host_results + + +def get_ping_test(tasker_uuid): + try: + return {"msg": None, "data": __get_ping_test(tasker_uuid)} + except Exception as e: + return {"msg": "query data failed!, %s" % e.message, "data": None} + diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py index 9ea8de9b2..21b1470ba 100644 --- a/apps/ops/tasks.py +++ b/apps/ops/tasks.py @@ -7,7 +7,7 @@ from ops.ansible_api import Config, ADHocRunner @shared_task(name="get_asset_hardware_info") -def get_asset_hardware_info(*assets): +def get_asset_hardware_info(task_name, task_uuid, *assets): conf = Config() play_source = { "name": "Get host hardware information", @@ -18,12 +18,12 @@ def get_asset_hardware_info(*assets): ] } hoc = ADHocRunner(conf, play_source, *assets) - ext_code, result = hoc.run() + ext_code, result = hoc.run(task_name, task_uuid) return ext_code, result @shared_task(name="asset_test_ping_check") -def asset_test_ping_check(*assets): +def asset_test_ping_check(task_name, task_uuid, *assets): conf = Config() play_source = { "name": "Test host connection use ping", @@ -34,7 +34,7 @@ def asset_test_ping_check(*assets): ] } hoc = ADHocRunner(conf, play_source, *assets) - ext_code, result = hoc.run() + ext_code, result = hoc.run(task_name, task_uuid) return ext_code, result