From 6015f2423ba6eb7f74e28e8f3c6758aeb121c833 Mon Sep 17 00:00:00 2001 From: ibuler Date: Sun, 5 Mar 2017 20:53:24 +0800 Subject: [PATCH] =?UTF-8?q?[Fixture]=20=E6=B7=BB=E5=8A=A0PlaybookRunner?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/ops/utils/ansible_api.py | 418 ++++++++++++++++------------------ apps/ops/utils/inventory.py | 91 ++++++++ apps/ops/utils/runner.py | 2 + 3 files changed, 286 insertions(+), 225 deletions(-) create mode 100644 apps/ops/utils/inventory.py create mode 100644 apps/ops/utils/runner.py diff --git a/apps/ops/utils/ansible_api.py b/apps/ops/utils/ansible_api.py index 38ab320b4..0e1aab24e 100644 --- a/apps/ops/utils/ansible_api.py +++ b/apps/ops/utils/ansible_api.py @@ -14,7 +14,7 @@ from ansible.executor.task_queue_manager import TaskQueueManager from ansible.inventory import Inventory, Host, Group from ansible.vars import VariableManager from ansible.parsing.dataloader import DataLoader -from ansible.executor import playbook_executor +from ansible.executor.playbook_executor import PlaybookExecutor from ansible.utils.display import Display from ansible.playbook.play import Play from ansible.plugins.callback import CallbackBase @@ -23,6 +23,8 @@ from ansible.utils.vars import load_extra_vars from ansible.utils.vars import load_options_vars from ..models import TaskRecord, AnsiblePlay, AnsibleTask, AnsibleHostResult +from .inventory import JMSInventory + __all__ = ["ADHocRunner", "Options"] @@ -35,159 +37,6 @@ class AnsibleError(StandardError): pass -# class Options(object): -# """Ansible运行时配置类, 用于初始化Ansible的一些默认配置. -# """ -# def __init__(self, verbosity=None, inventory=None, listhosts=None, subset=None, module_paths=None, extra_vars=None, -# forks=10, ask_vault_pass=False, vault_password_files=None, new_vault_password_file=None, -# output_file=None, tags=None, skip_tags=None, one_line=None, tree=None, ask_sudo_pass=False, ask_su_pass=False, -# sudo=None, sudo_user=None, become=None, become_method=None, become_user=None, become_ask_pass=False, -# ask_pass=False, private_key_file=None, remote_user=None, connection="smart", timeout=10, ssh_common_args=None, -# sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, poll_interval=None, seconds=None, check=False, -# syntax=None, diff=None, force_handlers=None, flush_cache=None, listtasks=None, listtags=None, module_path=None): -# self.verbosity = verbosity -# self.inventory = inventory -# self.listhosts = listhosts -# self.subset = subset -# self.module_paths = module_paths -# self.extra_vars = extra_vars -# self.forks = forks -# self.ask_vault_pass = ask_vault_pass -# self.vault_password_files = vault_password_files -# self.new_vault_password_file = new_vault_password_file -# self.output_file = output_file -# self.tags = tags -# self.skip_tags = skip_tags -# self.one_line = one_line -# self.tree = tree -# self.ask_sudo_pass = ask_sudo_pass -# self.ask_su_pass = ask_su_pass -# self.sudo = sudo -# self.sudo_user = sudo_user -# self.become = become -# self.become_method = become_method -# self.become_user = become_user -# self.become_ask_pass = become_ask_pass -# self.ask_pass = ask_pass -# self.private_key_file = private_key_file -# self.remote_user = remote_user -# self.connection = connection -# self.timeout = timeout -# self.ssh_common_args = ssh_common_args -# self.sftp_extra_args = sftp_extra_args -# self.scp_extra_args = scp_extra_args -# self.ssh_extra_args = ssh_extra_args -# self.poll_interval = poll_interval -# self.seconds = seconds -# self.check = check -# self.syntax = syntax -# self.diff = diff -# self.force_handlers = force_handlers -# self.flush_cache = flush_cache -# self.listtasks = listtasks -# self.listtags = listtags -# self.module_path = module_path -# self.__overwrite_default() -# -# def __overwrite_default(self): -# """上面并不能包含Ansible所有的配置, 如果有其他的配置, -# 可以通过替换default_config模块里面的变量进行重载,  -# 比如 default_config.DEFAULT_ASK_PASS = False. -# """ -# default_config.HOST_KEY_CHECKING = False -Options = namedtuple("Options", [ - 'connection', 'module_path', 'private_key_file', "remote_user", "timeout", - 'forks', 'become', 'become_method', 'become_user', 'check', "extra_vars", - ] -) - - -class JMSHost(Host): - def __init__(self, asset): - self.asset = asset - self.name = name = asset.get('hostname') or asset.get('ip') - self.port = port = asset.get('port') or 22 - super(JMSHost, self).__init__(name, port) - self.set_all_variable() - - def set_all_variable(self): - asset = self.asset - self.set_variable('ansible_host', asset['ip']) - self.set_variable('ansible_port', asset['port']) - self.set_variable('ansible_user', asset['username']) - - # 添加密码和秘钥 - if asset.get('password'): - self.set_variable('ansible_ssh_pass', asset['password']) - if asset.get('key'): - self.set_variable('ansible_ssh_private_key_file', asset['private_key']) - - # 添加become支持 - become = asset.get("become", None) - if become is not None: - self.set_variable("ansible_become", True) - self.set_variable("ansible_become_method", become.get('method')) - self.set_variable("ansible_become_user", become.get('user')) - self.set_variable("ansible_become_pass", become.get('pass')) - else: - self.set_variable("ansible_become", False) - - -class JMSInventory(Inventory): - """ - 提供生成Ansible inventory对象的方法 - """ - - def __init__(self, host_list=None): - if host_list is None: - host_list = [] - assert isinstance(host_list, list) - self.host_list = host_list - self.loader = DataLoader() - self.variable_manager = VariableManager() - super(JMSInventory, self).__init__(self.loader, self.variable_manager, - host_list=host_list) - - def parse_inventory(self, host_list): - """用于生成动态构建Ansible Inventory. - self.host_list: [ - {"name": "asset_name", - "ip": , - "port": , - "user": , - "pass": , - "key": , - "groups": ['group1', 'group2'], - "other_host_var": }, - {...}, - ] - - :return: 返回一个Ansible的inventory对象 - """ - - # TODO: 验证输入 - # 创建Ansible Group,如果没有则创建default组 - ungrouped = Group('ungrouped') - all = Group('all') - all.add_child_group(ungrouped) - self.groups = dict(all=all, ungrouped=ungrouped) - - for asset in host_list: - host = JMSHost(asset=asset) - asset_groups = asset.get('groups') - if asset_groups: - for group_name in asset_groups: - if group_name not in self.groups: - group = Group(group_name) - self.groups[group_name] = group - else: - group = self.groups[group_name] - group.add_host(host) - else: - ungrouped.add_host(host) - all.add_host(host) - - class BasicResultCallback(CallbackBase): """ Custom Callback @@ -218,6 +67,102 @@ class BasicResultCallback(CallbackBase): pass +class PlaybookCallBack(CallbackBase): + """ + Custom callback model for handlering the output data of + execute playbook file, + Base on the build-in callback plugins of ansible which named `json`. + """ + + CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = 'stdout' + CALLBACK_NAME = 'Dict' + + def __init__(self, display=None): + super(PlaybookCallBack, self).__init__(display) + self.results = [] + self.output = "" + self.item_results = {} # {"host": []} + + def _new_play(self, play): + return { + 'play': { + 'name': play.name, + 'id': str(play._uuid) + }, + 'tasks': [] + } + + def _new_task(self, task): + return { + 'task': { + 'name': task.get_name(), + }, + 'hosts': {} + } + + def v2_playbook_on_no_hosts_matched(self): + self.output = "skipping: No match hosts." + + def v2_playbook_on_no_hosts_remaining(self): + pass + + def v2_playbook_on_task_start(self, task, is_conditional): + self.results[-1]['tasks'].append(self._new_task(task)) + + def v2_playbook_on_play_start(self, play): + self.results.append(self._new_play(play)) + + def v2_playbook_on_stats(self, stats): + hosts = sorted(stats.processed.keys()) + summary = {} + for h in hosts: + s = stats.summarize(h) + summary[h] = s + + if self.output: + pass + else: + self.output = { + 'plays': self.results, + 'stats': summary + } + + def gather_result(self, res): + if res._task.loop and "results" in res._result and res._host.name in self.item_results: + res._result.update({"results": self.item_results[res._host.name]}) + del self.item_results[res._host.name] + + self.results[-1]['tasks'][-1]['hosts'][res._host.name] = res._result + + def v2_runner_on_ok(self, res, **kwargs): + if "ansible_facts" in res._result: + del res._result["ansible_facts"] + + self.gather_result(res) + + def v2_runner_on_failed(self, res, **kwargs): + self.gather_result(res) + + def v2_runner_on_unreachable(self, res, **kwargs): + self.gather_result(res) + + def v2_runner_on_skipped(self, res, **kwargs): + self.gather_result(res) + + def gather_item_result(self, res): + self.item_results.setdefault(res._host.name, []).append(res._result) + + def v2_runner_item_on_ok(self, res): + self.gather_item_result(res) + + def v2_runner_item_on_failed(self, res): + self.gather_item_result(res) + + def v2_runner_item_on_skipped(self, res): + self.gather_item_result(res) + + class CallbackModule(CallbackBase): """处理和分析Ansible运行结果,并保存数据. """ @@ -356,90 +301,110 @@ class CallbackModule(CallbackBase): class PlayBookRunner(object): """用于执行AnsiblePlaybook的接口.简化Playbook对象的使用. """ + Options = namedtuple('Options', [ + 'listtags', 'listtasks', 'listhosts', 'syntax', 'connection', + 'module_path', 'forks', 'remote_user', 'private_key_file', 'timeout', + 'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args', + 'scp_extra_args', + 'become', 'become_method', 'become_user', 'verbosity', 'check', + 'extra_vars']) - def __init__(self, config, palybook_path, playbook_var, - become_pass, *hosts, **group_vars): - """ + def __init__(self, + hosts=None, + playbook_path=None, + forks=C.DEFAULT_FORKS, + listtags=False, + listtasks=False, + listhosts=False, + syntax=False, + module_path=None, + remote_user='root', + timeout=C.DEFAULT_TIMEOUT, + ssh_common_args=None, + ssh_extra_args=None, + sftp_extra_args=None, + scp_extra_args=None, + become=True, + become_method=None, + become_user="root", + verbosity=None, + extra_vars=None, + connection_type="ssh", + passwords=None, + private_key_file=None, + check=False): - :param config: Config实例 - :param palybook_path: playbook的路径 - :param playbook_var: 执行Playbook时的变量 - :param become_pass: sudo passsword - :param hosts: 可变位置参数, 为一个资产列表, 每一个资产用dict表示, 以下是这个dict必须包含的key - [{ - "name": "asset_name", - "ip": "asset_ip", - "port": "asset_port", - "username": "asset_user", - "password": "asset_pass", - "key": "asset_private_key", - "group": "asset_group_name", - ... - }] - :param group_vars: 可变关键字参数, 是资产组变量, 记录对应的资产组变量 - "groupName1": {"group_variable1": "value1",...} - "groupName2": {"group_variable1": "value1",...} - """ - - self.options = config - - # 设置verbosity级别, 及命令行的--verbose选项 - self.display = Display() - self.display.verbosity = self.options.verbosity - playbook_executor.verbosity = self.options.verbosity - - # sudo成其他用户的配置 - self.options.become = True - self.options.become_method = 'sudo' - self.options.become_user = 'root' - passwords = {'become_pass': become_pass} - - # 传入playbook的路径,以及执行需要的变量 - pb_dir = os.path.dirname(__file__) - playbook = "%s/%s" % (pb_dir, palybook_path) - - # 生成Ansible inventory, 这些变量Mixin都会用到 - self.hosts = hosts - self.group_vars = group_vars + C.RETRY_FILES_ENABLED = False + self.callbackmodule = PlaybookCallBack() + if playbook_path is None or not os.path.exists(playbook_path): + raise AnsibleError( + "Not Found the playbook file: %s." % playbook_path) + self.playbook_path = playbook_path self.loader = DataLoader() self.variable_manager = VariableManager() - self.groups = [] - self.variable_manager.extra_vars = playbook_var - self.inventory = self.gen_inventory() + self.passwords = passwords or {} + self.inventory = JMSInventory(hosts) + + self.options = self.Options( + listtags=listtags, + listtasks=listtasks, + listhosts=listhosts, + syntax=syntax, + timeout=timeout, + connection=connection_type, + module_path=module_path, + forks=forks, + remote_user=remote_user, + private_key_file=private_key_file, + ssh_common_args=ssh_common_args or "", + ssh_extra_args=ssh_extra_args or "", + sftp_extra_args=sftp_extra_args, + scp_extra_args=scp_extra_args, + become=become, + become_method=become_method, + become_user=become_user, + verbosity=verbosity, + extra_vars=extra_vars or [], + check=check + ) + + self.variable_manager.extra_vars = load_extra_vars(loader=self.loader, + options=self.options) + self.variable_manager.options_vars = load_options_vars(self.options) + + self.variable_manager.set_inventory(self.inventory) # 初始化playbook的executor - self.pbex = playbook_executor.PlaybookExecutor( - playbooks=[playbook], + self.runner = PlaybookExecutor( + playbooks=[self.playbook_path], inventory=self.inventory, variable_manager=self.variable_manager, loader=self.loader, options=self.options, - passwords=passwords) + passwords=self.passwords) + + if self.runner._tqm: + self.runner._tqm._stdout_callback = self.callbackmodule def run(self): - """执行Playbook, 记录执行日志, 处理执行结果. - :return: 对象 - """ - self.pbex.run() - stats = self.pbex._tqm._stats - - # 测试执行是否成功 - run_success = True - hosts = sorted(stats.processed.keys()) - for h in hosts: - t = stats.summarize(h) - if t['unreachable'] > 0 or t['failures'] > 0: - run_success = False - - # TODO: 记录执行日志, 处理执行结果. - - return stats + if not self.inventory.list_hosts('all'): + raise AnsibleError('Inventory is empty') + self.runner.run() + self.runner._tqm.cleanup() + return self.callbackmodule.output class ADHocRunner(object): """ ADHoc接口 """ + Options = namedtuple("Options", [ + 'connection', 'module_path', 'private_key_file', "remote_user", + 'timeout', 'forks', 'become', 'become_method', 'become_user', + 'check', 'extra_vars', + ] + ) + def __init__(self, hosts=C.DEFAULT_HOST_LIST, module_name=C.DEFAULT_MODULE_NAME, # * command @@ -467,7 +432,7 @@ class ADHocRunner(object): self.check_module_args() self.gather_facts = gather_facts self.results_callback = BasicResultCallback() - self.options = Options( + self.options = self.Options( connection=connection_type, timeout=timeout, module_path=module_path, @@ -491,10 +456,13 @@ class ADHocRunner(object): name='Ansible Ad-hoc', hosts=self.pattern, gather_facts=self.gather_facts, - tasks=[dict(action=dict( - module=self.module_name, - args=self.module_args - ))] + tasks=[ + dict(action=dict( + module=self.module_name, + args=self.module_args, + ) + ) + ] ) self.play = Play().load( diff --git a/apps/ops/utils/inventory.py b/apps/ops/utils/inventory.py new file mode 100644 index 000000000..e9b2496d8 --- /dev/null +++ b/apps/ops/utils/inventory.py @@ -0,0 +1,91 @@ +# ~*~ coding: utf-8 ~*~ +from ansible.inventory import Inventory, Host, Group +from ansible.vars import VariableManager +from ansible.parsing.dataloader import DataLoader + + +class JMSHost(Host): + def __init__(self, asset): + self.asset = asset + self.name = name = asset.get('hostname') or asset.get('ip') + self.port = port = asset.get('port') or 22 + super(JMSHost, self).__init__(name, port) + self.set_all_variable() + + def set_all_variable(self): + asset = self.asset + self.set_variable('ansible_host', asset['ip']) + self.set_variable('ansible_port', asset['port']) + self.set_variable('ansible_user', asset['username']) + + # 添加密码和秘钥 + if asset.get('password'): + self.set_variable('ansible_ssh_pass', asset['password']) + if asset.get('key'): + self.set_variable('ansible_ssh_private_key_file', asset['private_key']) + + # 添加become支持 + become = asset.get("become", None) + if become is not None: + self.set_variable("ansible_become", True) + self.set_variable("ansible_become_method", become.get('method')) + self.set_variable("ansible_become_user", become.get('user')) + self.set_variable("ansible_become_pass", become.get('pass')) + else: + self.set_variable("ansible_become", False) + + +class JMSInventory(Inventory): + """ + 提供生成Ansible inventory对象的方法 + """ + + def __init__(self, host_list=None): + if host_list is None: + host_list = [] + assert isinstance(host_list, list) + self.host_list = host_list + self.loader = DataLoader() + self.variable_manager = VariableManager() + super(JMSInventory, self).__init__(self.loader, self.variable_manager, + host_list=host_list) + + def parse_inventory(self, host_list): + """用于生成动态构建Ansible Inventory. + self.host_list: [ + {"name": "asset_name", + "ip": , + "port": , + "user": , + "pass": , + "key": , + "groups": ['group1', 'group2'], + "other_host_var": }, + {...}, + ] + + :return: 返回一个Ansible的inventory对象 + """ + + # TODO: 验证输入 + # 创建Ansible Group,如果没有则创建default组 + ungrouped = Group('ungrouped') + all = Group('all') + all.add_child_group(ungrouped) + self.groups = dict(all=all, ungrouped=ungrouped) + + for asset in host_list: + host = JMSHost(asset=asset) + asset_groups = asset.get('groups') + if asset_groups: + for group_name in asset_groups: + if group_name not in self.groups: + group = Group(group_name) + self.groups[group_name] = group + else: + group = self.groups[group_name] + group.add_host(host) + else: + ungrouped.add_host(host) + all.add_host(host) + diff --git a/apps/ops/utils/runner.py b/apps/ops/utils/runner.py new file mode 100644 index 000000000..3e0d2fd0f --- /dev/null +++ b/apps/ops/utils/runner.py @@ -0,0 +1,2 @@ +# ~*~ coding: utf-8 ~*~ +