From 3804ab532dbceb868bc3b6dce284a8c1603b5ab3 Mon Sep 17 00:00:00 2001 From: ibuler Date: Fri, 29 Dec 2017 23:53:45 +0800 Subject: [PATCH] =?UTF-8?q?[Update]=20=E4=BF=AE=E6=94=B9=E4=BF=A1=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/assets/api.py | 4 +- apps/assets/apps.py | 4 +- apps/assets/const.py | 30 +----- apps/assets/forms.py | 16 ++- apps/assets/models/user.py | 14 ++- apps/assets/signals_handler.py | 130 ++++++++++++++++++++++ apps/assets/tasks.py | 192 ++++++++++++--------------------- apps/assets/views/asset.py | 5 - apps/jumpserver/settings.py | 12 ++- apps/ops/ansible/callback.py | 1 - run_server.py | 18 ++-- 11 files changed, 251 insertions(+), 175 deletions(-) create mode 100644 apps/assets/signals_handler.py diff --git a/apps/assets/api.py b/apps/assets/api.py index 8bcb67061..db97f895c 100644 --- a/apps/assets/api.py +++ b/apps/assets/api.py @@ -25,7 +25,7 @@ from .hands import IsSuperUser, IsValidUser, IsSuperUserOrAppUser, \ get_user_granted_assets from .models import AssetGroup, Asset, Cluster, SystemUser, AdminUser from . import serializers -from .tasks import update_assets_hardware_info_manual, test_admin_user_connectability_util, \ +from .tasks import update_asset_hardware_info_manual, test_admin_user_connectability_util, \ test_asset_connectability_manual, push_system_user_to_cluster_assets_manual, \ test_system_user_connectability_manual @@ -222,7 +222,7 @@ class AssetRefreshHardwareApi(generics.RetrieveAPIView): def retrieve(self, request, *args, **kwargs): asset_id = kwargs.get('pk') asset = get_object_or_404(Asset, pk=asset_id) - summary = update_assets_hardware_info_manual([asset])[1] + summary = update_asset_hardware_info_manual(asset)[1] if summary.get('dark'): return Response(summary['dark'].values(), status=501) else: diff --git a/apps/assets/apps.py b/apps/assets/apps.py index 15733d634..59e31dd97 100644 --- a/apps/assets/apps.py +++ b/apps/assets/apps.py @@ -7,7 +7,5 @@ class AssetsConfig(AppConfig): name = 'assets' def ready(self): - from .signals import on_app_ready - from . import tasks - on_app_ready.send(self.__class__) + from . import signals_handler super().ready() diff --git a/apps/assets/const.py b/apps/assets/const.py index 5944e1124..7cee7aedd 100644 --- a/apps/assets/const.py +++ b/apps/assets/const.py @@ -1,39 +1,19 @@ # -*- coding: utf-8 -*- # -from django.utils.translation import ugettext as _ -# PUSH_SYSTEM_USER_PERIOD_LOCK_KEY = "PUSH_SYSTEM_USER_PERIOD_KEY" -PUSH_SYSTEM_USER_PERIOD_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER PERIOD: {}") -PUSH_SYSTEM_USER_MANUAL_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER MANUALLY: {}") -PUSH_SYSTEM_USER_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER: {}") -# PUSH_SYSTEM_USER_LOCK_KEY = "PUSH_SYSTEM_USER_TO_CLUSTER_LOCK_{}" -PUSH_SYSTEM_USER_ON_CHANGE_TASK_NAME = _("PUSH SYSTEM USER ON CHANGE: {}") -PUSH_SYSTEM_USER_ON_CREATE_TASK_NAME = _("PUSH SYSTEM USER ON CREATE: {}") -PUSH_SYSTEM_USERS_ON_ASSET_CREATE_TASK_NAME = _("PUSH SYSTEM USERS ON ASSET CREAT: {}") - - -UPDATE_ASSETS_HARDWARE_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO') -UPDATE_ASSETS_HARDWARE_MANUAL_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO MANUALLY') -UPDATE_ASSETS_HARDWARE_ON_CREATE_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO ON CREATE') -# UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY = "UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY" -UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO PERIOD') UPDATE_ASSETS_HARDWARE_TASKS = [ { - 'name': UPDATE_ASSETS_HARDWARE_TASK_NAME, + 'name': "setup", 'action': { 'module': 'setup' } } ] -# TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY = "TEST_ADMIN_USER_CONN_PERIOD_KEY" -TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME = _("TEST ADMIN USER CONN PERIOD: {}") -TEST_ADMIN_USER_CONN_MANUAL_TASK_NAME = _("TEST ADMIN USER CONN MANUALLY: {}") -TEST_ADMIN_USER_CONN_TASK_NAME = _("TEST ADMIN USER CONN: {}") ADMIN_USER_CONN_CACHE_KEY = "ADMIN_USER_CONN_{}" TEST_ADMIN_USER_CONN_TASKS = [ { - "name": "TEST_ADMIN_CONNECTIVE", + "name": "ping", "action": { "module": "ping", } @@ -41,15 +21,11 @@ TEST_ADMIN_USER_CONN_TASKS = [ ] ASSET_ADMIN_CONN_CACHE_KEY = "ASSET_ADMIN_USER_CONN_{}" -TEST_ASSET_CONN_TASK_NAME = _("ASSET CONN TEST MANUAL") -TEST_SYSTEM_USER_CONN_PERIOD_LOCK_KEY = "TEST_SYSTEM_USER_CONN_PERIOD_KEY" -TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME = _("TEST SYSTEM USER CONN PERIOD: {}") -TEST_SYSTEM_USER_CONN_MANUAL_TASK_NAME = _("TEST SYSTEM USER CONN MANUALLY: {}") SYSTEM_USER_CONN_CACHE_KEY = "SYSTEM_USER_CONN_{}" TEST_SYSTEM_USER_CONN_TASKS = [ { - "name": "TEST_SYSTEM_USER_CONNECTIVE", + "name": "ping", "action": { "module": "ping", } diff --git a/apps/assets/forms.py b/apps/assets/forms.py index 9b0df2580..70abd4777 100644 --- a/apps/assets/forms.py +++ b/apps/assets/forms.py @@ -23,6 +23,7 @@ class AssetCreateForm(forms.ModelForm): 'groups': forms.SelectMultiple(attrs={'class': 'select2', 'data-placeholder': _('Select asset groups')}), 'cluster': forms.Select(attrs={'class': 'select2', 'data-placeholder': _('Select cluster')}), 'admin_user': forms.Select(attrs={'class': 'select2', 'data-placeholder': _('Select admin user')}), + 'port': forms.TextInput() } help_texts = { 'hostname': '* required', @@ -32,6 +33,13 @@ class AssetCreateForm(forms.ModelForm): 'admin_user': _('Host level admin user, If not set using cluster admin user default') } + def clean_admin_user(self): + cluster = self.cleaned_data.get('cluster') + admin_user = self.cleaned_data.get('admin_user') + if not cluster.admin_user and not admin_user: + raise forms.ValidationError(_("You need set a admin user if cluster not have")) + return self.cleaned_data['admin_user'] + class AssetUpdateForm(forms.ModelForm): class Meta: @@ -53,6 +61,13 @@ class AssetUpdateForm(forms.ModelForm): 'admin_user': _('Host level admin user, If not set using cluster admin user default') } + def clean_admin_user(self): + cluster = self.cleaned_data.get('cluster') + admin_user = self.cleaned_data.get('admin_user') + if not cluster.admin_user and not admin_user: + raise forms.ValidationError(_("You need set a admin user if cluster not have")) + return self.cleaned_data['admin_user'] + class AssetBulkUpdateForm(forms.ModelForm): assets = forms.ModelMultipleChoiceField( @@ -283,7 +298,6 @@ class SystemUserUpdateForm(SystemUserForm): system_user = super(forms.ModelForm, self).save() if private_key_file: - print(private_key_file) private_key = private_key_file.read().strip().decode('utf-8') public_key = ssh_pubkey_gen(private_key=private_key) else: diff --git a/apps/assets/models/user.py b/apps/assets/models/user.py index be1f562dc..863535ef2 100644 --- a/apps/assets/models/user.py +++ b/apps/assets/models/user.py @@ -207,13 +207,11 @@ class AdminUser(AssetUser): class SystemUser(AssetUser): + SSH_PROTOCOL = 'ssh' PROTOCOL_CHOICES = ( - ('ssh', 'ssh'), - ) - AUTH_METHOD_CHOICES = ( - ('P', 'Password'), - ('K', 'Public key'), + (SSH_PROTOCOL, 'ssh'), ) + cluster = models.ManyToManyField('assets.Cluster', blank=True, verbose_name=_("Cluster")) priority = models.IntegerField(default=10, verbose_name=_("Priority")) protocol = models.CharField(max_length=16, choices=PROTOCOL_CHOICES, default='ssh', verbose_name=_('Protocol')) @@ -229,6 +227,12 @@ class SystemUser(AssetUser): clusters = self.cluster.all() return Asset.objects.filter(cluster__in=clusters) + def get_clusters(self): + return self.cluster.all() + + def get_clusters_joined(self): + return ', '.join([cluster.name for cluster in self.get_clusters()]) + @property def assets_amount(self): return len(self.get_clusters_assets()) diff --git a/apps/assets/signals_handler.py b/apps/assets/signals_handler.py new file mode 100644 index 000000000..faecdfb55 --- /dev/null +++ b/apps/assets/signals_handler.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# + + +from django.db.models.signals import post_save, post_init, m2m_changed, pre_save +from django.dispatch import receiver +from django.utils.translation import gettext as _ + +from common.utils import get_logger +from .models import Asset, SystemUser, Cluster +from .tasks import update_assets_hardware_info_util, \ + test_asset_connectability_util, \ + push_system_user_util + + +logger = get_logger(__file__) + + +def update_asset_hardware_info_on_created(asset): + logger.debug("Update asset `{}` hardware info".format(asset)) + update_assets_hardware_info_util.delay([asset]) + + +def test_asset_conn_on_created(asset): + logger.debug("Test asset `{}` connectability".format(asset)) + test_asset_connectability_util.delay(asset) + + +def push_cluster_system_users_to_asset(asset): + logger.info("Push cluster system user to asset: {}".format(asset)) + task_name = _("Push cluster system users to asset") + system_users = asset.cluster.systemuser_set.all() + push_system_user_util.delay(system_users, [asset], task_name) + + +@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") +def on_asset_created(sender, instance=None, created=False, **kwargs): + if instance and created: + logger.info("Asset `` create signal received".format(instance)) + update_asset_hardware_info_on_created(instance) + test_asset_conn_on_created(instance) + push_cluster_system_users_to_asset(instance) + + +@receiver(post_init, sender=Asset) +def on_asset_init(sender, instance, created=False, **kwargs): + if instance and created is False: + instance.__original_cluster = instance.cluster + + +@receiver(post_save, sender=Asset) +def on_asset_cluster_changed(sender, instance=None, created=False, **kwargs): + if instance and created is False and instance.cluster != instance.__original_cluster: + logger.info("Asset cluster changed signal received") + push_cluster_system_users_to_asset(instance) + + +def push_to_cluster_assets_on_system_user_created_or_update(system_user): + if not system_user.auto_push: + return + logger.debug("Push system user `{}` to cluster assets".format(system_user.name)) + for cluster in system_user.cluster.all(): + task_name = _("Push system user to cluster assets: {}->{}").format( + cluster.name, system_user.name + ) + assets = cluster.assets.all() + push_system_user_util.delay([system_user], assets, task_name) + + +@receiver(post_save, sender=SystemUser) +def on_system_user_created_or_updated(sender, instance=None, **kwargs): + if instance and instance.auto_push: + logger.info("System user `{}` create or update signal received".format(instance)) + push_to_cluster_assets_on_system_user_created_or_update(instance) + + +@receiver(post_init, sender=Cluster, dispatch_uid="my_unique_identifier") +def on_cluster_init(sender, instance, **kwargs): + logger.debug("On cluster init") + instance.__original_assets = tuple(instance.assets.values_list('pk', flat=True)) + # instance.__origin_system_users = tuple(instance.systemuser_set.all()) + + +@receiver(post_save, sender=Cluster, dispatch_uid="my_unique_identifier") +def on_cluster_assets_changed(sender, instance, **kwargs): + assets_origin = instance.__original_assets + assets_new = instance.assets.values_list('pk', flat=True) + assets_added = set(assets_new) - set(assets_origin) + if assets_added: + logger.debug("Receive cluster change assets signal") + logger.debug("Push cluster `{}` system users to: {}".format( + instance, ', '.join([str(asset) for asset in assets_added]) + )) + assets = [] + for asset_id in assets_added: + try: + asset = Asset.objects.get(pk=asset_id) + except Asset.DoesNotExist: + continue + else: + assets.append(asset) + system_users = [s for s in instance.systemuser_set.all() if s.auto_push] + task_name = _("Push system user to assets") + push_system_user_util.delay(system_users, assets, task_name) + + +@receiver(post_save, sender=Cluster, dispatch_uid="my_unique_identifier") +def on_cluster_system_user_changed(sender, instance, **kwargs): + system_users_origin = instance.__origin_system_users + system_user_new = instance.systemuser_set.values_list('pk', flat=True) + system_users_added = set(system_user_new) - system_users_origin + if system_users_added: + logger.debug("Receive cluster change system users signal") + system_users = [] + for system_user_id in system_users_added: + try: + system_user = SystemUser.objects.get(pk=system_user_id) + except SystemUser.DoesNotExist: + continue + else: + system_users.append(system_user) + logger.debug("Push new system users `{}` to cluster `{}` assets".format( + ','.join([s.name for s in system_users]), instance + )) + task_name = _( + "Push system user to cluster assets: {}->{}").format( + instance.name, ', '.join(s.name for s in system_users) + ) + push_system_user_util.delay(system_users, instance.assets.all(), task_name) + diff --git a/apps/assets/tasks.py b/apps/assets/tasks.py index 54468cee2..d7165a8ff 100644 --- a/apps/assets/tasks.py +++ b/apps/assets/tasks.py @@ -3,15 +3,14 @@ import json from celery import shared_task from django.core.cache import cache -from django.dispatch import receiver -from django.db.models.signals import post_save +from django.utils.translation import ugettext as _ from common.utils import get_object_or_none, capacity_convert, \ sum_capacity, encrypt_password, get_logger from common.celery import register_as_period_task, after_app_shutdown_clean, \ after_app_ready_start, app as celery_app -from .models import SystemUser, AdminUser, Asset +from .models import SystemUser, AdminUser, Asset, Cluster from . import const @@ -22,9 +21,9 @@ CACHE_MAX_TIME = 60*60*60 @shared_task -def update_assets_hardware_info(result, **kwargs): +def set_assets_hardware_info(result, **kwargs): """ - Using ops task run result, to update asset info + Unsing ops task run result, to update asset info @shared_task must be exit, because we using it as a task callback, is must be a celery task also @@ -36,7 +35,7 @@ def update_assets_hardware_info(result, **kwargs): assets_updated = [] for hostname, info in result_raw.get('ok', {}).items(): if info: - info = info[const.UPDATE_ASSETS_HARDWARE_TASK_NAME]['ansible_facts'] + info = info['setup']['ansible_facts'] else: continue @@ -78,7 +77,7 @@ def update_assets_hardware_info(result, **kwargs): @shared_task -def update_assets_hardware_info_util(assets, task_name): +def update_assets_hardware_info_util(assets, task_name=None): """ Using ansible api to update asset hardware info :param assets: asset seq @@ -86,34 +85,25 @@ def update_assets_hardware_info_util(assets, task_name): :return: result summary ['contacted': {}, 'dark': {}] """ from ops.utils import update_or_create_ansible_task + if task_name is None: + task_name = _("Update some assets hardware info") tasks = const.UPDATE_ASSETS_HARDWARE_TASKS hostname_list = [asset.hostname for asset in assets] - task, _ = update_or_create_ansible_task( + task, created = update_or_create_ansible_task( task_name, hosts=hostname_list, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', ) result = task.run() # Todo: may be somewhere using # Manual run callback function - assets_updated = update_assets_hardware_info(result) + assets_updated = set_assets_hardware_info(result) return result @shared_task -def update_assets_hardware_info_manual(assets): - task_name = const.UPDATE_ASSETS_HARDWARE_MANUAL_TASK_NAME - return update_assets_hardware_info_util(assets, task_name) - - -@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") -def update_asset_info_on_created(sender, instance=None, created=False, **kwargs): - if instance and created: - msg = "Receive asset {} create signal, update asset hardware info".format( - instance - ) - logger.debug(msg) - task_name = const.UPDATE_ASSETS_HARDWARE_ON_CREATE_TASK_NAME - update_assets_hardware_info_util.delay([instance], task_name) +def update_asset_hardware_info_manual(asset): + task_name = _("Update asset hardware info") + return update_assets_hardware_info_util([asset], task_name=task_name) @celery_app.task @@ -126,28 +116,28 @@ def update_assets_hardware_info_period(): :return: """ from ops.utils import update_or_create_ansible_task - task_name = const.UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME + task_name = _("Update assets hardware info period") hostname_list = [asset.hostname for asset in Asset.objects.all()] tasks = const.UPDATE_ASSETS_HARDWARE_TASKS # Only create, schedule by celery beat - _ = update_or_create_ansible_task( + update_or_create_ansible_task( task_name, hosts=hostname_list, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', - interval=60*60*24, is_periodic=True, callback=update_assets_hardware_info.name, + interval=60*60*24, is_periodic=True, callback=set_assets_hardware_info.name, ) ## ADMIN USER CONNECTIVE ## @shared_task -def update_admin_user_connectability_info(result, **kwargs): +def set_admin_user_connectability_info(result, **kwargs): admin_user = kwargs.get("admin_user") task_name = kwargs.get("task_name") if admin_user is None and task_name is not None: admin_user = task_name.split(":")[-1] - _, summary = result + raw, summary = result cache_key = const.ADMIN_USER_CONN_CACHE_KEY.format(admin_user) cache.set(cache_key, summary, CACHE_MAX_TIME) @@ -167,7 +157,6 @@ def test_admin_user_connectability_util(admin_user, task_name): Test asset admin user can connect or not. Using ansible api do that :param admin_user: :param task_name: - :param force: Force update :return: """ from ops.utils import update_or_create_ansible_task @@ -180,7 +169,7 @@ def test_admin_user_connectability_util(admin_user, task_name): options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', ) result = task.run() - update_admin_user_connectability_info(result, admin_user=admin_user.name) + set_admin_user_connectability_info(result, admin_user=admin_user.name) return result @@ -195,31 +184,31 @@ def test_admin_user_connectability_period(): from ops.utils import update_or_create_ansible_task admin_users = AdminUser.objects.all() for admin_user in admin_users: - task_name = const.TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME.format(admin_user.name) + task_name = _("Test admin user connectability period: {}").format(admin_user) assets = admin_user.get_related_assets() hosts = [asset.hostname for asset in assets] tasks = const.TEST_ADMIN_USER_CONN_TASKS - _ = update_or_create_ansible_task( + update_or_create_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', interval=3600, is_periodic=True, - callback=update_admin_user_connectability_info.name, + callback=set_admin_user_connectability_info.name, ) @shared_task def test_admin_user_connectability_manual(admin_user): - task_name = const.TEST_ADMIN_USER_CONN_MANUAL_TASK_NAME.format(admin_user.name) + task_name = _("Test admin user connectability: {}").format(admin_user.name) return test_admin_user_connectability_util.delay(admin_user, task_name) @shared_task -def test_asset_connectability_manual(asset): +def test_asset_connectability_util(asset, task_name=None): from ops.utils import update_or_create_ansible_task - task_name = const.TEST_ASSET_CONN_TASK_NAME - assets = [asset] - hosts = [asset.hostname for asset in assets] + if task_name is None: + task_name = "Test asset connectability" + hosts = [asset.hostname] tasks = const.TEST_ADMIN_USER_CONN_TASKS task, created = update_or_create_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', @@ -228,30 +217,28 @@ def test_asset_connectability_manual(asset): result = task.run() summary = result[1] if summary.get('dark'): - cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 0, CACHE_MAX_TIME) + cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 0, + CACHE_MAX_TIME) + else: + cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 1, + CACHE_MAX_TIME) + return summary + + +@shared_task +def test_asset_connectability_manual(asset): + summary = test_asset_connectability_util(asset) + + if summary.get('dark'): return False, summary['dark'] else: - cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 1, CACHE_MAX_TIME) return True, "" -@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") -def update_asset_conn_info_on_created(sender, instance=None, created=False, - **kwargs): - if instance and created: - task_name = 'TEST-ASSET-CONN-WHEN-CREATED-{}'.format(instance) - msg = "Receive asset {} create signal, test asset connectability".format( - instance - ) - logger.debug(msg) - test_asset_connectability_manual.delay(instance, task_name) - - ## System user connective ## - @shared_task -def update_system_user_connectablity_info(result, **kwargs): +def set_system_user_connectablity_info(result, **kwargs): summary = result[1] task_name = kwargs.get("task_name") system_user = kwargs.get("system_user") @@ -279,13 +266,13 @@ def test_system_user_connectability_util(system_user, task_name): run_as=system_user.name, created_by="System", ) result = task.run() - update_system_user_connectablity_info(result, system_user=system_user.name) + set_system_user_connectablity_info(result, system_user=system_user.name) return result @shared_task def test_system_user_connectability_manual(system_user): - task_name = const.TEST_SYSTEM_USER_CONN_MANUAL_TASK_NAME.format(system_user.name) + task_name = "Test system user connectability: {}".format(system_user) return test_system_user_connectability_util(system_user, task_name) @@ -297,17 +284,17 @@ def test_system_user_connectability_period(): from ops.utils import update_or_create_ansible_task system_users = SystemUser.objects.all() for system_user in system_users: - task_name = const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME.format( + task_name = _("Test system user connectability period: {}").format( system_user.name ) assets = system_user.get_clusters_assets() hosts = [asset.hostname for asset in assets] tasks = const.TEST_SYSTEM_USER_CONN_TASKS - _ = update_or_create_ansible_task( + update_or_create_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=False, run_as=system_user.name, created_by='System', interval=3600, is_periodic=True, - callback=update_admin_user_connectability_info.name, + callback=set_admin_user_connectability_info.name, ) @@ -351,13 +338,18 @@ def get_push_system_user_tasks(system_user): @shared_task -def push_system_user_util(system_user, task_name): +def push_system_user_util(system_users, assets, task_name): from ops.utils import update_or_create_ansible_task + tasks = [] + for system_user in system_users: + tasks.extend(get_push_system_user_tasks(system_user)) + + print("Task: ", tasks) + if not tasks: + return - tasks = get_push_system_user_tasks(system_user) - assets = system_user.get_clusters_assets() hosts = [asset.hostname for asset in assets] - task, _ = update_or_create_ansible_task( + task, created = update_or_create_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' ) @@ -366,8 +358,9 @@ def push_system_user_util(system_user, task_name): @shared_task def push_system_user_to_cluster_assets_manual(system_user): - task_name = const.PUSH_SYSTEM_USER_MANUAL_TASK_NAME.format(system_user.name) - return push_system_user_util(system_user, task_name) + task_name = _("Push system user to cluster assets: {}").format(system_user.name) + assets = system_user.get_clusters_assets() + return push_system_user_util([system_user], assets, task_name) @shared_task @@ -376,65 +369,22 @@ def push_system_user_to_cluster_assets_manual(system_user): @after_app_shutdown_clean def push_system_user_period(): from ops.utils import update_or_create_ansible_task + clusters = Cluster.objects.all() - for system_user in SystemUser.objects.filter(auto_push=True): - assets = system_user.get_clusters_assets() - task_name = const.PUSH_SYSTEM_USER_PERIOD_TASK_NAME.format(system_user.name) - hosts = [asset.hostname for asset in assets] - tasks = get_push_system_user_tasks(system_user) + for cluster in clusters: + tasks = [] + system_users = [system_user for system_user in cluster.systemuser_set.all() if system_user.auto_push] + if not system_users: + return + for system_user in system_users: + tasks.extend(get_push_system_user_tasks(system_user)) - _ = update_or_create_ansible_task( + task_name = _("Push system user to cluster assets period: {}->{}").format( + cluster.name, ', '.join(s.name for s in system_users) + ) + hosts = [asset.hostname for asset in cluster.assets.all()] + update_or_create_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', interval=60*60*24, is_periodic=True, ) - - -@shared_task -def push_asset_system_users_util(asset, task_name, system_users=None): - from ops.utils import update_or_create_ansible_task - - if system_users is None: - system_users = asset.cluster.systemuser_set.all() - - tasks = [] - for system_user in system_users: - if system_user.auto_push: - tasks.extend(get_push_system_user_tasks(system_user)) - - hosts = [asset.hostname] - task, _ = update_or_create_ansible_task( - task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', - options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' - ) - return task.run() - - -@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") -def push_system_user_on_created(sender, instance=None, created=False, **kwargs): - if instance and created: - task_name = const.PUSH_SYSTEM_USERS_ON_ASSET_CREATE_TASK_NAME - system_users = instance.cluster.systemuser_set.all() - msg = "Receive asset {} create signal, push system users".format( - instance - ) - logger.debug(msg) - push_asset_system_users_util.delay(instance, system_users, task_name=task_name) - - -@receiver(post_save, sender=SystemUser) -def push_system_user_on_change(sender, instance=None, update_fields=None, **kwargs): - if instance and instance.auto_push: - logger.debug("System user `{}` changed, push it".format(instance.name)) - task_name = "PUSH SYSTEM USER ON CREATED: {}".format(instance.name) - push_system_user_util.delay(instance, task_name) - - - - - - - - - - diff --git a/apps/assets/views/asset.py b/apps/assets/views/asset.py index 6cf917c0a..67adb6849 100644 --- a/apps/assets/views/asset.py +++ b/apps/assets/views/asset.py @@ -28,7 +28,6 @@ from common.utils import get_object_or_none, get_logger, is_uuid from .. import forms from ..models import Asset, AssetGroup, AdminUser, Cluster, SystemUser from ..hands import AdminUserRequiredMixin -from ..tasks import update_assets_hardware_info_util __all__ = [ @@ -162,10 +161,6 @@ class AssetUpdateView(AdminUserRequiredMixin, UpdateView): kwargs.update(context) return super(AssetUpdateView, self).get_context_data(**kwargs) - def form_invalid(self, form): - logger.error(form.errors) - return super().form_invalid(form) - class AssetDeleteView(AdminUserRequiredMixin, DeleteView): model = Asset diff --git a/apps/jumpserver/settings.py b/apps/jumpserver/settings.py index aae75b86f..45c6fba88 100644 --- a/apps/jumpserver/settings.py +++ b/apps/jumpserver/settings.py @@ -336,11 +336,15 @@ CELERY_RESULT_SERIALIZER = 'pickle' CELERY_RESULT_BACKEND = CELERY_BROKER_URL CELERY_ACCEPT_CONTENT = ['json', 'pickle'] CELERY_RESULT_EXPIRES = 3600 -CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' -CELERY_WORKER_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' +# CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' +CELERY_WORKER_LOG_FORMAT = '%(message)s' +# CELERY_WORKER_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' +CELERY_WORKER_TASK_LOG_FORMAT = '%(message)s' +# CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' CELERY_TASK_EAGER_PROPAGATES = True -# CELERY_TIMEZONE = TIME_ZONE -# CELERY_ENABLE_UTC = True +CELERY_REDIRECT_STDOUTS = True +CELERY_REDIRECT_STDOUTS_LEVEL = "INFO" +CELERY_WORKER_HIJACK_ROOT_LOGGER = False # Cache use redis diff --git a/apps/ops/ansible/callback.py b/apps/ops/ansible/callback.py index 2a7ce5a6d..bef950f87 100644 --- a/apps/ops/ansible/callback.py +++ b/apps/ops/ansible/callback.py @@ -51,7 +51,6 @@ class AdHocResultCallback(CallbackModule): contacted.remove(host) def v2_runner_on_failed(self, result, ignore_errors=False): - print("#######RUN FAILED" * 19) self.gather_result("failed", result) super().v2_runner_on_failed(result, ignore_errors=ignore_errors) diff --git a/run_server.py b/run_server.py index 234dffa30..e300c5e13 100644 --- a/run_server.py +++ b/run_server.py @@ -5,8 +5,8 @@ import subprocess import threading import time import argparse -import platform import sys +import signal from apps import __version__ @@ -25,9 +25,7 @@ LOG_LEVEL = CONFIG.LOG_LEVEL WORKERS = 4 EXIT_EVENT = threading.Event() -EXIT_MSGS = [] - - +processes = {} try: os.makedirs(os.path.join(BASE_DIR, "data", "static")) @@ -97,7 +95,6 @@ def start_service(services): __version__)) print('Quit the server with CONTROL-C.') - processes = {} services_all = { "gunicorn": start_gunicorn, "celery": start_celery, @@ -126,6 +123,12 @@ def start_service(services): time.sleep(5) +def stop_service(): + for name, proc in processes.items(): + print("Stop service {}".format(name)) + proc.terminate() + + if __name__ == '__main__': parser = argparse.ArgumentParser(description="Jumpserver start tools") parser.add_argument("services", type=str, nargs='+', default="all", @@ -133,6 +136,9 @@ if __name__ == '__main__': help="The service to start", ) args = parser.parse_args() - start_service(args.services) + try: + start_service(args.services) + except KeyboardInterrupt: + stop_service()