diff --git a/apps/common/README.md b/apps/common/README.md new file mode 100644 index 000000000..f7c0a641b --- /dev/null +++ b/apps/common/README.md @@ -0,0 +1,46 @@ +## Celery + +Jumpserver use celery to run task async. Using redis as the broker, so +you should run a redis instance + +1. Run redis +``` +# yum -y install redis +or +# docker run -name jumpserver-redis -d -p 6379:6379 redis redis-server +``` + + +2. Write tasks in app_name/tasks.py + +ops/tasks.py +``` +from __future__ import absolute_import + +import time +from celery import shared_task + + +@shared_task +def longtime_add(x, y): + print 'long time task begins' + # sleep 5 seconds + time.sleep(5) + print 'long time task finished' + return x + y +``` + +3. Run celery in development +``` +# cd apps +# celery -A common worker -l info +``` + +4. Test using task +``` +# ./manage.py shell +> from ops.tasks import longtime_add +> res = longtime_add.delay(1, 2) +> res.get() +``` + diff --git a/apps/common/__init__.py b/apps/common/__init__.py index e69de29bb..88fef8e67 100644 --- a/apps/common/__init__.py +++ b/apps/common/__init__.py @@ -0,0 +1,6 @@ +from __future__ import absolute_import + +# This will make sure the app is always imported when +# Django starts so that shared_task will use this app. +from .celery import app as celery_app + diff --git a/apps/common/celery.py b/apps/common/celery.py new file mode 100644 index 000000000..09afd03b5 --- /dev/null +++ b/apps/common/celery.py @@ -0,0 +1,22 @@ +# ~*~ coding: utf-8 ~*~ + +from __future__ import absolute_import, unicode_literals + +import os + +from celery import Celery + +# set the default Django settings module for the 'celery' program. +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings') + +from django.conf import settings + +app = Celery('jumpserver') + +# Using a string here means the worker will not have to +# pickle the object when using Windows. +app.config_from_object('django.conf:settings') + +print(settings.BROKER_URL) +app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS]) + diff --git a/apps/jumpserver/__init__.py b/apps/jumpserver/__init__.py index e69de29bb..8b1378917 100644 --- a/apps/jumpserver/__init__.py +++ b/apps/jumpserver/__init__.py @@ -0,0 +1 @@ + diff --git a/apps/jumpserver/settings.py b/apps/jumpserver/settings.py index 7dfc1e603..f741fbd59 100644 --- a/apps/jumpserver/settings.py +++ b/apps/jumpserver/settings.py @@ -257,8 +257,8 @@ WEBSOCKET_URL = '/ws/' # WebSocket Redis WS4REDIS_CONNECTION = { - 'host': '127.0.0.1', - 'port': 6379, + 'host': CONFIG.REDIS_HOST or '127.0.0.1', + 'port': CONFIG.REDIS_PORT or 6379, 'db': 2, } @@ -276,3 +276,11 @@ SESSION_REDIS_PREFIX = 'session' # Custom User Auth model AUTH_USER_MODEL = 'users.User' +# Celery using redis as broker +BROKER_URL = 'redis://%(password)s%(host)s:%(port)s/3' % { + 'password': CONFIG.REDIS_PASSWORD + ':' if CONFIG.REDIS_PASSWORD else '', + 'host': CONFIG.REDIS_HOST or '127.0.0.1', + 'port': CONFIG.REDIS_PORT or 6379, +} + +CELERY_RESULT_BACKEND = BROKER_URL diff --git a/apps/ops/__init__.py b/apps/ops/__init__.py index e69de29bb..8b1378917 100644 --- a/apps/ops/__init__.py +++ b/apps/ops/__init__.py @@ -0,0 +1 @@ + diff --git a/apps/ops/ansible_api.py b/apps/ops/ansible_api.py new file mode 100644 index 000000000..f33089ba9 --- /dev/null +++ b/apps/ops/ansible_api.py @@ -0,0 +1,310 @@ +# ~*~ coding: utf-8 ~*~ +from __future__ import unicode_literals + +import os +import json +from ansible.executor.task_queue_manager import TaskQueueManager +from ansible.inventory import Inventory, Host, Group +from ansible.vars import VariableManager +from ansible.parsing.dataloader import DataLoader +from ansible.executor import playbook_executor +from ansible.utils.display import Display +from ansible.playbook.play import Play +import ansible.constants as default_config +from ansible.plugins.callback import CallbackBase + + +class AnsibleError(StandardError): + pass + + +class Config(object): + """Ansible运行时配置类, 用于初始化Ansible. + """ + def __init__(self, verbosity=None, inventory=None, listhosts=None, subset=None, module_paths=None, extra_vars=None, + forks=None, ask_vault_pass=None, vault_password_files=None, new_vault_password_file=None, + output_file=None, tags=None, skip_tags=None, one_line=None, tree=None, ask_sudo_pass=None, ask_su_pass=None, + sudo=None, sudo_user=None, become=None, become_method=None, become_user=None, become_ask_pass=None, + ask_pass=None, private_key_file=None, remote_user=None, connection=None, timeout=None, ssh_common_args=None, + sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, poll_interval=None, seconds=None, check=None, + syntax=None, diff=None, force_handlers=None, flush_cache=None, listtasks=None, listtags=None, module_path=None): + self.verbosity = verbosity + self.inventory = inventory + self.listhosts = listhosts + self.subset = subset + self.module_paths = module_paths + self.extra_vars = extra_vars + self.forks = forks + self.ask_vault_pass = ask_vault_pass + self.vault_password_files = vault_password_files + self.new_vault_password_file = new_vault_password_file + self.output_file = output_file + self.tags = tags + self.skip_tags = skip_tags + self.one_line = one_line + self.tree = tree + self.ask_sudo_pass = ask_sudo_pass + self.ask_su_pass = ask_su_pass + self.sudo = sudo + self.sudo_user = sudo_user + self.become = become + self.become_method = become_method + self.become_user = become_user + self.become_ask_pass = become_ask_pass + self.ask_pass = ask_pass + self.private_key_file = private_key_file + self.remote_user = remote_user + self.connection = connection + self.timeout = timeout + self.ssh_common_args = ssh_common_args + self.sftp_extra_args = sftp_extra_args + self.scp_extra_args = scp_extra_args + self.ssh_extra_args = ssh_extra_args + self.poll_interval = poll_interval + self.seconds = seconds + self.check = check + self.syntax = syntax + self.diff = diff + self.force_handlers = force_handlers + self.flush_cache = flush_cache + self.listtasks = listtasks + self.listtags = listtags + self.module_path = module_path + self.__overwrite_default() + + def __overwrite_default(self): + """上面并不能包含Ansible所有的配置, 如果有其他的配置, + 可以通过替换default_config模块里面的变量进行重载,  + 比如 default_config.DEFAULT_ASK_PASS = False. + """ + default_config.HOST_KEY_CHECKING = False + + +class MyInventory(object): + """Ansible Inventory对象的封装, Inventory是Ansbile中的核心概念(资产清单), + 这个概念和CMDB很像,都是对资产的抽象. 为了简化Inventory的使用, 通过传入资产列表即可初始化Inventory. + """ + + def __init__(self, *assets, **group): + """初始化Inventory对象, args为一个资产列表, kwargs是资产组变量列表, 比如 + args: + [{ + "name": "asset_name", + "ip": "asset_ip", + "port": "asset_port", + "username": "asset_user", + "password": "asset_pass", + "key": "asset_private_key", + "group": "asset_group_name", + ... + }] + kwargs: + "groupName1": {"group_variable1": "value1",...} + "groupName2": {"group_variable1": "value1",...} + """ + self.assets = assets + self.assets_group = group + self.loader = DataLoader() + self.variable_manager = VariableManager() + self.groups = [] + self.inventory = self.gen_inventory() + + def __gen_group(self): + """初始化Ansible Group, 将资产添加到Inventory里面 + :return: None + """ + # 创建Ansible Group. + for asset in self.assets: + g_name = asset.get('group', 'default') + if g_name not in [g.name for g in self.groups]: + group = Group(name=asset.get('group', 'default')) + + self.groups.append(group) + + # 初始化组变量 + for group_name, variables in self.assets_group.iteritems(): + for g in self.groups: + if g.name == group_name: + for v_name, v_value in variables: + g.set_variable(v_name, v_value) + + # 往组里面添加Host + for asset in self.assets: + host = Host(name=asset['name'], port=asset['port']) + host.set_variable('ansible_ssh_host', asset['ip']) + host.set_variable('ansible_ssh_port', asset['port']) + host.set_variable('ansible_ssh_user', asset['username']) + + if asset.get('password'): + host.set_variable('ansible_ssh_pass', asset['password']) + if asset.get('key'): + host.set_variable('ansible_ssh_private_key_file', asset['key']) + + for key, value in asset.iteritems(): + if key not in ["name", "port", "ip", "username", "password", "key"]: + host.set_variable(key, value) + for g in self.groups: + if g.name == asset.get('group', 'default'): + g.add_host(host) + + def validate(self): + pass + + def gen_inventory(self): + self.validate() + i = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list=[]) + self.__gen_group() + for g in self.groups: + i.add_group(g) + self.variable_manager.set_inventory(i) + return i + + +class PlayBookRunner(object): + """用于执行AnsiblePlaybook的接口.简化Playbook对象的使用 + """ + + def __init__(self, inventory, config, palybook_path, playbook_var, become_pass, verbosity=0): + """ + :param inventory: myinventory实例 + :param config: Config实例 + :param palybook_path: playbook的路径 + :param playbook_var: 执行Playbook时的变量 + :param become_pass: sudo passsword + :param verbosity: --verbosity + """ + + self.options = config + self.options.verbosity = verbosity + self.options.connection = 'smart' + + # 设置verbosity级别, 及命令行的--verbose选项 + self.display = Display() + self.display.verbosity = self.options.verbosity + playbook_executor.verbosity = self.options.verbosity + + # sudo成其他用户的配置 + self.options.become = True + self.options.become_method = 'sudo' + self.options.become_user = 'root' + passwords = {'become_pass': become_pass} + + # 传入playbook的路径,以及执行需要的变量 + inventory.variable_manager.extra_vars = playbook_var + pb_dir = os.path.dirname(__file__) + playbook = "%s/%s" % (pb_dir, palybook_path) + + # 初始化playbook的executor + self.pbex = playbook_executor.PlaybookExecutor( + playbooks=[playbook], + inventory=inventory, + variable_manager=inventory.variable_manager, + loader=inventory.loader, + options=self.options, + passwords=passwords) + + def run(self): + """执行Playbook, 记录执行日志, 处理执行结果. + :return: 对象 + """ + self.pbex.run() + stats = self.pbex._tqm._stats + + # 测试执行是否成功 + run_success = True + hosts = sorted(stats.processed.keys()) + for h in hosts: + t = stats.summarize(h) + if t['unreachable'] > 0 or t['failures'] > 0: + run_success = False + + # TODO: 记录执行日志, 处理执行结果. + + return stats + + +class ADHocRunner(object): + """ADHoc接口 + """ + def __init__(self, inventory, config, become_pass=None, verbosity=0): + """ + :param inventory: myinventory实例 + :param config: Config实例 + :param play_data: + play_data = dict( + name="Ansible Ad-Hoc", + hosts=pattern, + gather_facts=True, + tasks=[dict(action=dict(module='service', args={'name': 'vsftpd', 'state': 'restarted'}), async=async, poll=poll)] + ) + """ + + self.options = config + self.options.verbosity = verbosity + self.options.connection = 'smart' + + # 设置verbosity级别, 及命令行的--verbose选项 + self.display = Display() + self.display.verbosity = self.options.verbosity + playbook_executor.verbosity = self.options.verbosity + + # sudo成其他用户的配置 + self.options.become = True + self.options.become_method = 'sudo' + self.options.become_user = 'root' + self.passwords = {'become_pass': become_pass} + + # 初始化callback插件 + # self.results_callback = ResultCallback() + + # 初始化Play + play_source = { + "name": "Ansible Play", + "hosts": "*", + "gather_facts": "no", + "tasks": [ + dict(action=dict(module='shell', args='id'), register='shell_out'), + dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))) + ] + } + + self.play = Play().load(play_source, variable_manager=inventory.variable_manager, loader=inventory.loader) + self.inventory = inventory + + def run(self): + """执行ADHoc 记录日志, 处理结果 + """ + tqm = None + # TODO:日志和结果分析 + try: + tqm = TaskQueueManager( + inventory=self.inventory.inventory, + variable_manager=self.inventory.variable_manager, + loader=self.inventory.loader, + stdout_callback=default_config.DEFAULT_STDOUT_CALLBACK, + options=self.options, + passwords=self.passwords + ) + + result = tqm.run(self.play) + return result + finally: + if tqm: + tqm.cleanup() + + +if __name__ == "__main__": + conf = Config() + assets = [{ + "name": "localhost", + "ip": "localhost", + "port": "22", + "username": "yumaojun", + "password": "xxx", + "key": "asset_private_key", + }] + inv = MyInventory(*assets) + print inv.inventory.get_group('default').get_hosts() + hoc = ADHocRunner(inv, conf, 'xxx') + hoc.run() + diff --git a/apps/ops/run_tasks.py b/apps/ops/run_tasks.py new file mode 100644 index 000000000..4c3115fac --- /dev/null +++ b/apps/ops/run_tasks.py @@ -0,0 +1,9 @@ +from .tasks import longtime_add +import time + +result = longtime_add.delay(1,2) +print 'Task finished? ', result.ready() +print 'Task result: ', result.result +time.sleep(10) +print 'Task finished? ', result.ready() +print 'Task result: ', result.result \ No newline at end of file diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py new file mode 100644 index 000000000..867efbcaa --- /dev/null +++ b/apps/ops/tasks.py @@ -0,0 +1,13 @@ +from __future__ import absolute_import +import time + +from celery import shared_task + + +@shared_task +def longtime_add(x, y): + print 'long time task begins' + # sleep 5 seconds + time.sleep(5) + print 'long time task finished' + return x + y \ No newline at end of file diff --git a/config-example.py b/config-example.py index 971911813..abb6788db 100644 --- a/config-example.py +++ b/config-example.py @@ -20,6 +20,9 @@ class Config: DATABASE_ENGINE = 'sqlite3' HTTP_LISTEN_HOST = '127.0.0.1' HTTP_LISTEN_PORT = 8000 + REDIS_HOST = '127.0.0.1' + REDIS_PORT = 6379 + REDIS_PASSWORD = '' def __init__(self): pass diff --git a/requirements.txt b/requirements.txt index 587da6a34..0b705ab77 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,3 +14,5 @@ websocket-client==0.37.0 djangorestframework==3.4.5 ForgeryPy==0.1 paramiko==2.0.2 +celery==3.1.23 +ansible==2.1.1.0