feat: resolve conflicts

jiangweidong committed on 2022-12-02 10:52:31 +08:00
181 changed files with 6094 additions and 3598 deletions


@@ -5,6 +5,7 @@ class DefaultCallback:
STATUS_MAPPER = {
'successful': 'success',
'failure': 'failed',
'failed': 'failed',
'running': 'running',
'pending': 'pending',
'unknown': 'unknown'


@@ -13,7 +13,7 @@ class AdHocRunner:
"reboot", 'shutdown', 'poweroff', 'halt', 'dd', 'half', 'top'
]
def __init__(self, inventory, module, module_args='', pattern='*', project_dir='/tmp/'):
def __init__(self, inventory, module, module_args='', pattern='*', project_dir='/tmp/', extra_vars={}):
self.id = uuid.uuid4()
self.inventory = inventory
self.pattern = pattern
@@ -22,6 +22,7 @@ class AdHocRunner:
self.project_dir = project_dir
self.cb = DefaultCallback()
self.runner = None
self.extra_vars = extra_vars
def check_module(self):
if self.module not in self.cmd_modules_choices:
@@ -38,6 +39,7 @@ class AdHocRunner:
os.mkdir(self.project_dir, 0o755)
ansible_runner.run(
extravars=self.extra_vars,
host_pattern=self.pattern,
private_data_dir=self.project_dir,
inventory=self.inventory,

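A minimal usage sketch of the extended signature (not part of the diff; paths and extra_vars values are placeholders, and run() is assumed from its use in JobExecution.start further below):

runner = AdHocRunner(
    inventory='/tmp/demo/inventory/hosts',
    module='shell',
    module_args='uptime',
    pattern='all',
    project_dir='/tmp/demo/',
    extra_vars={'target_dir': '/opt'},  # forwarded to ansible_runner.run(extravars=...)
)
cb = runner.run()  # DefaultCallback carrying status / summary / result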

@@ -3,4 +3,4 @@ from django.conf import settings
def get_ansible_task_log_path(task_id):
from ops.utils import get_task_log_path
return get_task_log_path(settings.ANSIBLE_LOG_DIR, task_id, level=3)
return get_task_log_path(settings.CELERY_LOG_DIR, task_id, level=2)


@@ -2,3 +2,5 @@
#
from .adhoc import *
from .celery import *
from .job import *
from .playbook import *


@@ -1,52 +1,17 @@
# -*- coding: utf-8 -*-
#
from django.shortcuts import get_object_or_404
from rest_framework import viewsets, generics
from rest_framework.views import Response
from common.drf.serializers import CeleryTaskExecutionSerializer
from ..models import AdHoc, AdHocExecution
from rest_framework import viewsets
from ..models import AdHoc
from ..serializers import (
AdHocSerializer,
AdHocExecutionSerializer,
AdHocDetailSerializer,
AdHocSerializer
)
__all__ = [
'AdHocViewSet', 'AdHocExecutionViewSet'
'AdHocViewSet'
]
class AdHocViewSet(viewsets.ModelViewSet):
queryset = AdHoc.objects.all()
serializer_class = AdHocSerializer
def get_serializer_class(self):
if self.action == 'retrieve':
return AdHocDetailSerializer
return super().get_serializer_class()
class AdHocExecutionViewSet(viewsets.ModelViewSet):
queryset = AdHocExecution.objects.all()
serializer_class = AdHocExecutionSerializer
def get_queryset(self):
task_id = self.request.query_params.get('task')
adhoc_id = self.request.query_params.get('adhoc')
if task_id:
task = get_object_or_404(AdHoc, id=task_id)
adhocs = task.adhoc.all()
self.queryset = self.queryset.filter(adhoc__in=adhocs)
if adhoc_id:
adhoc = get_object_or_404(AdHoc, id=adhoc_id)
self.queryset = self.queryset.filter(adhoc=adhoc)
return self.queryset


@@ -98,20 +98,27 @@ class CeleryPeriodTaskViewSet(CommonApiMixin, viewsets.ModelViewSet):
return queryset
class CelerySummaryAPIView(generics.RetrieveAPIView):
def get(self, request, *args, **kwargs):
pass
class CeleryTaskViewSet(CommonApiMixin, viewsets.ReadOnlyModelViewSet):
queryset = CeleryTask.objects.all()
serializer_class = CeleryTaskSerializer
http_method_names = ('get', 'head', 'options',)
def get_queryset(self):
return CeleryTask.objects.exclude(name__startswith='celery')
class CeleryTaskExecutionViewSet(CommonApiMixin, viewsets.ReadOnlyModelViewSet):
serializer_class = CeleryTaskExecutionSerializer
http_method_names = ('get', 'head', 'options',)
queryset = CeleryTaskExecution.objects.all()
def get_queryset(self):
task_id = self.kwargs.get("task_pk")
task_id = self.request.query_params.get('task_id')
if task_id:
task = CeleryTask.objects.get(pk=task_id)
return CeleryTaskExecution.objects.filter(name=task.name)
else:
return CeleryTaskExecution.objects.none()
task = get_object_or_404(CeleryTask, id=task_id)
self.queryset = self.queryset.filter(name=task.name)
return self.queryset

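Net effect on the client side, sketched as comments (the URL prefix depends on how ops.urls is mounted and is not shown in this commit):

# Before: executions were reached through the nested route
#   GET .../tasks/<task_pk>/executions/
# After: they live on a flat route and are filtered by query parameter
#   GET .../task-executions/?task_id=<celery_task_uuid>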
apps/ops/api/job.py (new file, 43 lines)

@@ -0,0 +1,43 @@
from rest_framework import viewsets
from ops.models import Job, JobExecution
from ops.serializers.job import JobSerializer, JobExecutionSerializer
__all__ = ['JobViewSet', 'JobExecutionViewSet']
from ops.tasks import run_ops_job, run_ops_job_executions
from orgs.mixins.api import OrgBulkModelViewSet
class JobViewSet(OrgBulkModelViewSet):
serializer_class = JobSerializer
model = Job
permission_classes = ()
def get_queryset(self):
query_set = super().get_queryset()
return query_set.filter(instant=False)
def perform_create(self, serializer):
instance = serializer.save()
if instance.instant:
run_ops_job.delay(instance.id)
class JobExecutionViewSet(OrgBulkModelViewSet):
serializer_class = JobExecutionSerializer
http_method_names = ('get', 'post', 'head', 'options',)
# filter_fields = ('type',)
permission_classes = ()
model = JobExecution
def perform_create(self, serializer):
instance = serializer.save()
run_ops_job_executions.delay(instance.id)
def get_queryset(self):
query_set = super().get_queryset()
job_id = self.request.query_params.get('job_id')
if job_id:
query_set = query_set.filter(job_id=job_id)
return query_set

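A rough sketch of how these viewsets are meant to be driven (endpoint prefix, authentication and field values are assumptions, not taken from this commit):

from rest_framework.test import APIClient

client = APIClient()
# client.force_authenticate(user=admin_user)  # auth setup omitted

# Creating an instant job makes perform_create() enqueue run_ops_job.delay(...)
resp = client.post('/api/v1/ops/jobs/', {
    'name': 'demo-job', 'instant': True, 'type': 'adhoc',
    'module': 'shell', 'args': 'uptime', 'assets': [],
}, format='json')

# Re-running later: create an execution, which enqueues run_ops_job_executions.delay(...)
client.post('/api/v1/ops/job-executions/', {'job': resp.data['id']}, format='json')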
apps/ops/api/playbook.py (new file, 28 lines)

@@ -0,0 +1,28 @@
import os
import zipfile
from django.conf import settings
from rest_framework import viewsets
from ..models import Playbook
from ..serializers.playbook import PlaybookSerializer
__all__ = ["PlaybookViewSet"]
def unzip_playbook(src, dist):
fz = zipfile.ZipFile(src, 'r')
for file in fz.namelist():
fz.extract(file, dist)
class PlaybookViewSet(viewsets.ModelViewSet):
queryset = Playbook.objects.all()
serializer_class = PlaybookSerializer
def perform_create(self, serializer):
instance = serializer.save()
src_path = os.path.join(settings.MEDIA_ROOT, instance.path.name)
dest_path = os.path.join(settings.DATA_DIR, "ops", "playbook", instance.id.__str__())
if not os.path.exists(dest_path):
os.makedirs(dest_path)
unzip_playbook(src_path, dest_path)
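For orientation, the on-disk layout this produces (paths pieced together from this view and Playbook.work_path further below; the upload is assumed to be a zip containing main.yaml at its root):

# MEDIA_ROOT/playbooks/<uploaded>.zip            <- instance.path (FileField, upload_to='playbooks/')
# DATA_DIR/ops/playbook/<playbook_id>/           <- dest_path, unzip target
# DATA_DIR/ops/playbook/<playbook_id>/main.yaml  <- entry point PlaybookRunner is pointed at via work_path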


@@ -0,0 +1,171 @@
# Generated by Django 3.2.14 on 2022-11-11 11:19
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import uuid
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('assets', '0111_alter_automationexecution_status'),
('ops', '0028_celerytask_last_published_time'),
]
operations = [
migrations.CreateModel(
name='Job',
fields=[
('created_by', models.CharField(blank=True, max_length=32, null=True, verbose_name='Created by')),
('updated_by', models.CharField(blank=True, max_length=32, null=True, verbose_name='Updated by')),
('date_created', models.DateTimeField(auto_now_add=True, null=True, verbose_name='Date created')),
('date_updated', models.DateTimeField(auto_now=True, verbose_name='Date updated')),
('id', models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
('name', models.CharField(max_length=128, null=True, verbose_name='Name')),
('instant', models.BooleanField(default=False)),
('args', models.CharField(blank=True, default='', max_length=1024, null=True, verbose_name='Args')),
('module', models.CharField(choices=[('shell', 'Shell'), ('win_shell', 'Powershell')], default='shell', max_length=128, null=True, verbose_name='Module')),
('type', models.CharField(choices=[('adhoc', 'Adhoc'), ('playbook', 'Playbook')], default='adhoc', max_length=128, verbose_name='Type')),
('runas', models.CharField(default='root', max_length=128, verbose_name='Runas')),
('runas_policy', models.CharField(choices=[('privileged_only', 'Privileged Only'), ('privileged_first', 'Privileged First'), ('skip', 'Skip')], default='skip', max_length=128, verbose_name='Runas policy')),
('assets', models.ManyToManyField(to='assets.Asset', verbose_name='Assets')),
('owner', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL, verbose_name='Creator')),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='JobExecution',
fields=[
('created_by', models.CharField(blank=True, max_length=32, null=True, verbose_name='Created by')),
('updated_by', models.CharField(blank=True, max_length=32, null=True, verbose_name='Updated by')),
('date_updated', models.DateTimeField(auto_now=True, verbose_name='Date updated')),
('id', models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
('task_id', models.UUIDField(null=True)),
('status', models.CharField(default='running', max_length=16, verbose_name='Status')),
('result', models.JSONField(blank=True, null=True, verbose_name='Result')),
('summary', models.JSONField(default=dict, verbose_name='Summary')),
('date_created', models.DateTimeField(auto_now_add=True, verbose_name='Date created')),
('date_start', models.DateTimeField(db_index=True, null=True, verbose_name='Date start')),
('date_finished', models.DateTimeField(null=True, verbose_name='Date finished')),
('creator', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL, verbose_name='Creator')),
('job', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='executions', to='ops.job')),
],
options={
'abstract': False,
},
),
migrations.AlterUniqueTogether(
name='playbooktemplate',
unique_together=None,
),
migrations.RemoveField(
model_name='adhoc',
name='account',
),
migrations.RemoveField(
model_name='adhoc',
name='account_policy',
),
migrations.RemoveField(
model_name='adhoc',
name='assets',
),
migrations.RemoveField(
model_name='adhoc',
name='crontab',
),
migrations.RemoveField(
model_name='adhoc',
name='date_last_run',
),
migrations.RemoveField(
model_name='adhoc',
name='interval',
),
migrations.RemoveField(
model_name='adhoc',
name='is_periodic',
),
migrations.RemoveField(
model_name='adhoc',
name='last_execution',
),
migrations.RemoveField(
model_name='adhoc',
name='org_id',
),
migrations.RemoveField(
model_name='playbook',
name='account',
),
migrations.RemoveField(
model_name='playbook',
name='account_policy',
),
migrations.RemoveField(
model_name='playbook',
name='assets',
),
migrations.RemoveField(
model_name='playbook',
name='comment',
),
migrations.RemoveField(
model_name='playbook',
name='crontab',
),
migrations.RemoveField(
model_name='playbook',
name='date_last_run',
),
migrations.RemoveField(
model_name='playbook',
name='interval',
),
migrations.RemoveField(
model_name='playbook',
name='is_periodic',
),
migrations.RemoveField(
model_name='playbook',
name='last_execution',
),
migrations.RemoveField(
model_name='playbook',
name='org_id',
),
migrations.RemoveField(
model_name='playbook',
name='template',
),
migrations.AlterField(
model_name='adhoc',
name='module',
field=models.CharField(choices=[('shell', 'Shell'), ('win_shell', 'Powershell')], default='shell', max_length=128, verbose_name='Module'),
),
migrations.AlterField(
model_name='playbook',
name='name',
field=models.CharField(max_length=128, null=True, verbose_name='Name'),
),
migrations.AlterField(
model_name='playbook',
name='path',
field=models.FileField(upload_to='playbooks/'),
),
migrations.DeleteModel(
name='PlaybookExecution',
),
migrations.DeleteModel(
name='PlaybookTemplate',
),
migrations.AddField(
model_name='job',
name='playbook',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to='ops.playbook', verbose_name='Playbook'),
),
]


@@ -0,0 +1,42 @@
# Generated by Django 3.2.14 on 2022-11-16 10:11
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ops', '0029_auto_20221111_1919'),
]
operations = [
migrations.AlterModelOptions(
name='celerytask',
options={'ordering': ('name',)},
),
migrations.AddField(
model_name='job',
name='variables',
field=models.JSONField(default=dict, verbose_name='Variables'),
),
migrations.AlterField(
model_name='celerytask',
name='name',
field=models.CharField(max_length=1024, verbose_name='Name'),
),
migrations.AlterField(
model_name='celerytaskexecution',
name='date_finished',
field=models.DateTimeField(null=True, verbose_name='Date finished'),
),
migrations.AlterField(
model_name='celerytaskexecution',
name='date_published',
field=models.DateTimeField(auto_now_add=True, verbose_name='Date published'),
),
migrations.AlterField(
model_name='celerytaskexecution',
name='date_start',
field=models.DateTimeField(null=True, verbose_name='Date start'),
),
]


@@ -0,0 +1,28 @@
# Generated by Django 3.2.14 on 2022-11-16 12:24
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ops', '0030_auto_20221116_1811'),
]
operations = [
migrations.AddField(
model_name='job',
name='chdir',
field=models.CharField(blank=True, default='', max_length=1024, null=True, verbose_name='Chdir'),
),
migrations.AddField(
model_name='job',
name='comment',
field=models.CharField(blank=True, default='', max_length=1024, null=True, verbose_name='Comment'),
),
migrations.AddField(
model_name='job',
name='timeout',
field=models.IntegerField(default=60, verbose_name='Timeout (Seconds)'),
),
]


@@ -0,0 +1,27 @@
# Generated by Django 3.2.14 on 2022-11-17 10:48
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ops', '0031_auto_20221116_2024'),
]
operations = [
migrations.RemoveField(
model_name='job',
name='variables',
),
migrations.AddField(
model_name='job',
name='parameters_define',
field=models.JSONField(default=dict, verbose_name='Parameters define'),
),
migrations.AddField(
model_name='jobexecution',
name='parameters',
field=models.JSONField(default=dict, verbose_name='Parameters'),
),
]


@@ -0,0 +1,28 @@
# Generated by Django 3.2.14 on 2022-11-18 06:31
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ops', '0032_auto_20221117_1848'),
]
operations = [
migrations.AddField(
model_name='job',
name='crontab',
field=models.CharField(blank=True, max_length=128, null=True, verbose_name='Regularly perform'),
),
migrations.AddField(
model_name='job',
name='interval',
field=models.IntegerField(blank=True, default=24, null=True, verbose_name='Cycle perform'),
),
migrations.AddField(
model_name='job',
name='is_periodic',
field=models.BooleanField(default=False),
),
]


@@ -0,0 +1,18 @@
# Generated by Django 3.2.14 on 2022-11-23 09:45
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ops', '0033_auto_20221118_1431'),
]
operations = [
migrations.AddField(
model_name='job',
name='org_id',
field=models.CharField(blank=True, db_index=True, default='', max_length=36, verbose_name='Organization'),
),
]


@@ -0,0 +1,18 @@
# Generated by Django 3.2.14 on 2022-11-23 10:22
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ops', '0034_job_org_id'),
]
operations = [
migrations.AddField(
model_name='jobexecution',
name='org_id',
field=models.CharField(blank=True, db_index=True, default='', max_length=36, verbose_name='Organization'),
),
]


@@ -4,3 +4,4 @@
from .adhoc import *
from .celery import *
from .playbook import *
from .job import *


@@ -1,29 +1,43 @@
# ~*~ coding: utf-8 ~*~
import os.path
import uuid
from django.db import models
from django.utils.translation import ugettext_lazy as _
from common.db.models import BaseCreateUpdateModel
from common.utils import get_logger
from .base import BaseAnsibleJob, BaseAnsibleExecution
from ..ansible import AdHocRunner
__all__ = ["AdHoc", "AdHocExecution"]
logger = get_logger(__file__)
class AdHoc(BaseAnsibleJob):
pattern = models.CharField(max_length=1024, verbose_name=_("Pattern"), default='all')
module = models.CharField(max_length=128, default='shell', verbose_name=_('Module'))
args = models.CharField(max_length=1024, default='', verbose_name=_('Args'))
last_execution = models.ForeignKey('AdHocExecution', verbose_name=_("Last execution"),
on_delete=models.SET_NULL, null=True, blank=True)
class AdHoc(BaseCreateUpdateModel):
class Modules(models.TextChoices):
shell = 'shell', _('Shell')
winshell = 'win_shell', _('Powershell')
def get_register_task(self):
from ops.tasks import run_adhoc
return "run_adhoc_{}".format(self.id), run_adhoc, (str(self.id),), {}
id = models.UUIDField(default=uuid.uuid4, primary_key=True)
name = models.CharField(max_length=128, verbose_name=_('Name'))
pattern = models.CharField(max_length=1024, verbose_name=_("Pattern"), default='all')
module = models.CharField(max_length=128, choices=Modules.choices, default=Modules.shell,
verbose_name=_('Module'))
args = models.CharField(max_length=1024, default='', verbose_name=_('Args'))
owner = models.ForeignKey('users.User', verbose_name=_("Creator"), on_delete=models.SET_NULL, null=True)
@property
def row_count(self):
if len(self.args) == 0:
return 0
count = str(self.args).count('\n')
return count + 1
@property
def size(self):
return len(self.args)
def __str__(self):
return "{}: {}".format(self.module, self.args)

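Quick illustration of the two helper properties (hypothetical instance, not in the diff):

adhoc = AdHoc(module='shell', args='uptime\ndf -h')
adhoc.row_count  # 2  (newline count + 1)
adhoc.size       # 12 (length of args)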

@@ -17,7 +17,8 @@ class BaseAnsibleJob(PeriodTaskModelMixin, JMSOrgBaseModel):
assets = models.ManyToManyField('assets.Asset', verbose_name=_("Assets"))
account = models.CharField(max_length=128, default='root', verbose_name=_('Account'))
account_policy = models.CharField(max_length=128, default='root', verbose_name=_('Account policy'))
last_execution = models.ForeignKey('BaseAnsibleExecution', verbose_name=_("Last execution"), on_delete=models.SET_NULL, null=True)
last_execution = models.ForeignKey('BaseAnsibleExecution', verbose_name=_("Last execution"),
on_delete=models.SET_NULL, null=True)
date_last_run = models.DateTimeField(null=True, verbose_name=_('Date last run'))
class Meta:
@@ -118,12 +119,6 @@ class BaseAnsibleExecution(models.Model):
def is_success(self):
return self.status == 'success'
@property
def time_cost(self):
if self.date_finished and self.date_start:
return (self.date_finished - self.date_start).total_seconds()
return None
@property
def short_id(self):
return str(self.id).split('-')[-1]
@@ -134,4 +129,8 @@ class BaseAnsibleExecution(models.Model):
return self.date_finished - self.date_start
return None
@property
def time_cost(self):
if self.date_finished and self.date_start:
return (self.date_finished - self.date_start).total_seconds()
return None


@@ -12,18 +12,24 @@ from ops.celery import app
class CeleryTask(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4)
name = models.CharField(max_length=1024)
name = models.CharField(max_length=1024, verbose_name=_('Name'))
last_published_time = models.DateTimeField(null=True)
@property
def meta(self):
task = app.tasks.get(self.name, None)
return {
"verbose_name": getattr(task, 'verbose_name', None),
"comment": getattr(task, 'comment', None),
"comment": getattr(task, 'verbose_name', None),
"queue": getattr(task, 'queue', 'default')
}
@property
def summary(self):
executions = CeleryTaskExecution.objects.filter(name=self.name)
total = executions.count()
success = executions.filter(state='SUCCESS').count()
return {'total': total, 'success': success}
@property
def state(self):
last_five_executions = CeleryTaskExecution.objects.filter(name=self.name).order_by('-date_published')[:5]
@@ -37,6 +43,9 @@ class CeleryTask(models.Model):
return "yellow"
return "green"
class Meta:
ordering = ('name',)
class CeleryTaskExecution(models.Model):
LOG_DIR = os.path.join(settings.PROJECT_DIR, 'data', 'celery')
@@ -46,9 +55,21 @@ class CeleryTaskExecution(models.Model):
kwargs = models.JSONField(verbose_name=_("Kwargs"))
state = models.CharField(max_length=16, verbose_name=_("State"))
is_finished = models.BooleanField(default=False, verbose_name=_("Finished"))
date_published = models.DateTimeField(auto_now_add=True)
date_start = models.DateTimeField(null=True)
date_finished = models.DateTimeField(null=True)
date_published = models.DateTimeField(auto_now_add=True, verbose_name=_('Date published'))
date_start = models.DateTimeField(null=True, verbose_name=_('Date start'))
date_finished = models.DateTimeField(null=True, verbose_name=_('Date finished'))
@property
def time_cost(self):
if self.date_finished and self.date_start:
return (self.date_finished - self.date_start).total_seconds()
return None
@property
def timedelta(self):
if self.date_start and self.date_finished:
return self.date_finished - self.date_start
return None
def __str__(self):
return "{}: {}".format(self.name, self.id)


@@ -0,0 +1,4 @@
# Built-in environment variables
BUILTIN_VARIABLES = {
}

apps/ops/models/job.py (new file, 200 lines)

@@ -0,0 +1,200 @@
import json
import os
import uuid
import logging
from django.conf import settings
from django.db import models
from django.utils.translation import gettext_lazy as _
from django.utils import timezone
from celery import current_task
__all__ = ["Job", "JobExecution"]
from ops.ansible import JMSInventory, AdHocRunner, PlaybookRunner
from ops.mixin import PeriodTaskModelMixin
from orgs.mixins.models import JMSOrgBaseModel
class Job(JMSOrgBaseModel, PeriodTaskModelMixin):
class Types(models.TextChoices):
adhoc = 'adhoc', _('Adhoc')
playbook = 'playbook', _('Playbook')
class RunasPolicies(models.TextChoices):
privileged_only = 'privileged_only', _('Privileged Only')
privileged_first = 'privileged_first', _('Privileged First')
skip = 'skip', _('Skip')
class Modules(models.TextChoices):
shell = 'shell', _('Shell')
winshell = 'win_shell', _('Powershell')
id = models.UUIDField(default=uuid.uuid4, primary_key=True)
name = models.CharField(max_length=128, null=True, verbose_name=_('Name'))
instant = models.BooleanField(default=False)
args = models.CharField(max_length=1024, default='', verbose_name=_('Args'), null=True, blank=True)
module = models.CharField(max_length=128, choices=Modules.choices, default=Modules.shell,
verbose_name=_('Module'), null=True)
chdir = models.CharField(default="", max_length=1024, verbose_name=_('Chdir'), null=True, blank=True)
timeout = models.IntegerField(default=60, verbose_name=_('Timeout (Seconds)'))
playbook = models.ForeignKey('ops.Playbook', verbose_name=_("Playbook"), null=True, on_delete=models.SET_NULL)
type = models.CharField(max_length=128, choices=Types.choices, default=Types.adhoc, verbose_name=_("Type"))
owner = models.ForeignKey('users.User', verbose_name=_("Creator"), on_delete=models.SET_NULL, null=True)
assets = models.ManyToManyField('assets.Asset', verbose_name=_("Assets"))
runas = models.CharField(max_length=128, default='root', verbose_name=_('Runas'))
runas_policy = models.CharField(max_length=128, choices=RunasPolicies.choices, default=RunasPolicies.skip,
verbose_name=_('Runas policy'))
parameters_define = models.JSONField(default=dict, verbose_name=_('Parameters define'))
comment = models.CharField(max_length=1024, default='', verbose_name=_('Comment'), null=True, blank=True)
@property
def last_execution(self):
return self.executions.last()
@property
def date_last_run(self):
return self.last_execution.date_created if self.last_execution else None
@property
def summary(self):
summary = {
"total": 0,
"success": 0,
}
for execution in self.executions.all():
summary["total"] += 1
if execution.is_success:
summary["success"] += 1
return summary
@property
def average_time_cost(self):
total_cost = 0
finished_count = self.executions.filter(status__in=['success', 'failed']).count()
for execution in self.executions.filter(status__in=['success', 'failed']).all():
total_cost += execution.time_cost
return total_cost / finished_count if finished_count else 0
def get_register_task(self):
from ..tasks import run_ops_job
name = "run_ops_job_period_{}".format(str(self.id)[:8])
task = run_ops_job.name
args = (str(self.id),)
kwargs = {}
return name, task, args, kwargs
@property
def inventory(self):
return JMSInventory(self.assets.all(), self.runas_policy, self.runas)
def create_execution(self):
return self.executions.create()
class JobExecution(JMSOrgBaseModel):
id = models.UUIDField(default=uuid.uuid4, primary_key=True)
task_id = models.UUIDField(null=True)
status = models.CharField(max_length=16, verbose_name=_('Status'), default='running')
job = models.ForeignKey(Job, on_delete=models.CASCADE, related_name='executions', null=True)
parameters = models.JSONField(default=dict, verbose_name=_('Parameters'))
result = models.JSONField(blank=True, null=True, verbose_name=_('Result'))
summary = models.JSONField(default=dict, verbose_name=_('Summary'))
creator = models.ForeignKey('users.User', verbose_name=_("Creator"), on_delete=models.SET_NULL, null=True)
date_created = models.DateTimeField(auto_now_add=True, verbose_name=_('Date created'))
date_start = models.DateTimeField(null=True, verbose_name=_('Date start'), db_index=True)
date_finished = models.DateTimeField(null=True, verbose_name=_("Date finished"))
def get_runner(self):
inv = self.job.inventory
inv.write_to_file(self.inventory_path)
if isinstance(self.parameters, str):
extra_vars = json.loads(self.parameters)
else:
extra_vars = {}
if self.job.type == 'adhoc':
runner = AdHocRunner(
self.inventory_path, self.job.module, module_args=self.job.args,
pattern="all", project_dir=self.private_dir, extra_vars=extra_vars,
)
elif self.job.type == 'playbook':
runner = PlaybookRunner(
self.inventory_path, self.job.playbook.work_path
)
else:
raise Exception("unsupported job type")
return runner
@property
def short_id(self):
return str(self.id).split('-')[-1]
@property
def time_cost(self):
if self.date_finished and self.date_start:
return (self.date_finished - self.date_start).total_seconds()
return None
@property
def timedelta(self):
if self.date_start and self.date_finished:
return self.date_finished - self.date_start
return None
@property
def is_finished(self):
return self.status in ['success', 'failed']
@property
def is_success(self):
return self.status == 'success'
@property
def inventory_path(self):
return os.path.join(self.private_dir, 'inventory', 'hosts')
@property
def private_dir(self):
uniq = self.date_created.strftime('%Y%m%d_%H%M%S') + '_' + self.short_id
job_name = self.job.name if self.job.name else 'instant'
return os.path.join(settings.ANSIBLE_DIR, job_name, uniq)
def set_error(self, error):
this = self.__class__.objects.get(id=self.id)  # re-fetch to avoid using a stale / timed-out database connection
this.status = 'failed'
this.summary['error'] = str(error)
this.finish_task()
def set_result(self, cb):
status_mapper = {
'successful': 'success',
}
this = self.__class__.objects.get(id=self.id)
this.status = status_mapper.get(cb.status, cb.status)
this.summary = cb.summary
this.result = cb.result
this.finish_task()
def finish_task(self):
self.date_finished = timezone.now()
self.save(update_fields=['result', 'status', 'summary', 'date_finished'])
def set_celery_id(self):
if not current_task:
return
task_id = current_task.request.root_id
self.task_id = task_id
def start(self, **kwargs):
self.date_start = timezone.now()
self.set_celery_id()
self.save()
runner = self.get_runner()
try:
cb = runner.run(**kwargs)
self.set_result(cb)
return cb
except Exception as e:
logging.error(e, exc_info=True)
self.set_error(e)

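A condensed sketch of the execution flow defined above, as it would run inside a Celery worker (field values are placeholders; asset wiring is omitted):

job = Job.objects.create(name='collect-uptime', type='adhoc', module='shell', args='uptime')
# job.assets.add(asset)  # associate target assets so JMSInventory has hosts to run against

execution = job.create_execution()
execution.start()  # writes the inventory, builds AdHocRunner/PlaybookRunner, then set_result()/set_error()
print(execution.status, execution.time_cost, execution.summary)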

@@ -1,39 +1,19 @@
import os.path
import uuid
from django.conf import settings
from django.db import models
from django.utils.translation import gettext_lazy as _
from orgs.mixins.models import JMSOrgBaseModel
from .base import BaseAnsibleExecution, BaseAnsibleJob
from common.db.models import BaseCreateUpdateModel
class PlaybookTemplate(JMSOrgBaseModel):
name = models.CharField(max_length=128, verbose_name=_("Name"))
path = models.FilePathField(verbose_name=_("Path"))
comment = models.TextField(verbose_name=_("Comment"), blank=True)
def __str__(self):
return self.name
class Meta:
ordering = ['name']
verbose_name = _("Playbook template")
unique_together = [('org_id', 'name')]
class Playbook(BaseAnsibleJob):
path = models.FilePathField(max_length=1024, verbose_name=_("Playbook"))
class Playbook(BaseCreateUpdateModel):
id = models.UUIDField(default=uuid.uuid4, primary_key=True)
name = models.CharField(max_length=128, verbose_name=_('Name'), null=True)
path = models.FileField(upload_to='playbooks/')
owner = models.ForeignKey('users.User', verbose_name=_("Owner"), on_delete=models.SET_NULL, null=True)
comment = models.TextField(blank=True, verbose_name=_("Comment"))
template = models.ForeignKey('PlaybookTemplate', verbose_name=_("Template"), on_delete=models.SET_NULL, null=True)
last_execution = models.ForeignKey('PlaybookExecution', verbose_name=_("Last execution"), on_delete=models.SET_NULL, null=True, blank=True)
def get_register_task(self):
name = "automation_strategy_period_{}".format(str(self.id)[:8])
task = execute_automation_strategy.name
args = (str(self.id), Trigger.timing)
kwargs = {}
return name, task, args, kwargs
class PlaybookExecution(BaseAnsibleExecution):
task = models.ForeignKey('Playbook', verbose_name=_("Task"), on_delete=models.CASCADE)
path = models.FilePathField(max_length=1024, verbose_name=_("Run dir"))
@property
def work_path(self):
return os.path.join(settings.DATA_DIR, "ops", "playbook", self.id.__str__(), "main.yaml")


@@ -1,11 +1,24 @@
# ~*~ coding: utf-8 ~*~
from __future__ import unicode_literals
from rest_framework import serializers
from django.shortcuts import reverse
import datetime
from rest_framework import serializers
from common.drf.fields import ReadableHiddenField
from ..models import AdHoc, AdHocExecution
class AdHocSerializer(serializers.ModelSerializer):
owner = ReadableHiddenField(default=serializers.CurrentUserDefault())
row_count = serializers.IntegerField(read_only=True)
size = serializers.IntegerField(read_only=True)
class Meta:
model = AdHoc
fields = ["id", "name", "module", "row_count", "size", "args", "owner", "date_created", "date_updated"]
class AdHocExecutionSerializer(serializers.ModelSerializer):
stat = serializers.SerializerMethodField()
last_success = serializers.ListField(source='success_hosts')
@@ -49,26 +62,6 @@ class AdHocExecutionExcludeResultSerializer(AdHocExecutionSerializer):
]
class AdHocSerializer(serializers.ModelSerializer):
tasks = serializers.ListField()
class Meta:
model = AdHoc
fields_mini = ['id']
fields_small = fields_mini + [
'tasks', "pattern", "args", "date_created",
]
fields_fk = ["last_execution"]
fields_m2m = ["assets"]
fields = fields_small + fields_fk + fields_m2m
read_only_fields = [
'date_created'
]
extra_kwargs = {
"become": {'write_only': True}
}
class AdHocExecutionNestSerializer(serializers.ModelSerializer):
last_success = serializers.ListField(source='success_hosts')
last_failure = serializers.DictField(source='failed_hosts')
@@ -80,38 +73,3 @@ class AdHocExecutionNestSerializer(serializers.ModelSerializer):
'last_success', 'last_failure', 'last_run', 'timedelta',
'is_finished', 'is_success'
)
class AdHocDetailSerializer(AdHocSerializer):
latest_execution = AdHocExecutionNestSerializer(allow_null=True)
task_name = serializers.CharField(source='task.name')
class Meta(AdHocSerializer.Meta):
fields = AdHocSerializer.Meta.fields + [
'latest_execution', 'created_by', 'task_name'
]
# class CommandExecutionSerializer(serializers.ModelSerializer):
# result = serializers.JSONField(read_only=True)
# log_url = serializers.SerializerMethodField()
#
# class Meta:
# model = CommandExecution
# fields_mini = ['id']
# fields_small = fields_mini + [
# 'command', 'result', 'log_url',
# 'is_finished', 'date_created', 'date_finished'
# ]
# fields_m2m = ['hosts']
# fields = fields_small + fields_m2m
# read_only_fields = [
# 'result', 'is_finished', 'log_url', 'date_created',
# 'date_finished'
# ]
# ref_name = 'OpsCommandExecution'
#
# @staticmethod
# def get_log_url(obj):
# return reverse('api-ops:celery-task-log', kwargs={'pk': obj.id})


@@ -30,14 +30,15 @@ class CeleryPeriodTaskSerializer(serializers.ModelSerializer):
class CeleryTaskSerializer(serializers.ModelSerializer):
class Meta:
model = CeleryTask
fields = [
'id', 'name', 'meta', 'state', 'last_published_time',
]
read_only_fields = ['id', 'name', 'meta', 'summary', 'state', 'last_published_time']
fields = read_only_fields
class CeleryTaskExecutionSerializer(serializers.ModelSerializer):
class Meta:
model = CeleryTaskExecution
fields = [
"id", "name", "args", "kwargs", "state", "is_finished", "date_published", "date_start", "date_finished"
"id", "name", "args", "kwargs", "time_cost", "timedelta", "state", "is_finished", "date_published",
"date_start",
"date_finished"
]


@@ -0,0 +1,34 @@
from rest_framework import serializers
from common.drf.fields import ReadableHiddenField
from ops.mixin import PeriodTaskSerializerMixin
from ops.models import Job, JobExecution
from orgs.mixins.serializers import BulkOrgResourceModelSerializer
__all__ = ['JobSerializer', 'JobExecutionSerializer']
class JobSerializer(BulkOrgResourceModelSerializer, PeriodTaskSerializerMixin):
owner = ReadableHiddenField(default=serializers.CurrentUserDefault())
class Meta:
model = Job
read_only_fields = ["id", "date_last_run", "date_created", "date_updated", "average_time_cost"]
fields = read_only_fields + [
"name", "instant", "type", "module", "args", "playbook", "assets", "runas_policy", "runas", "owner",
"parameters_define",
"timeout",
"chdir",
"comment",
"summary",
"is_periodic", "interval", "crontab"
]
class JobExecutionSerializer(serializers.ModelSerializer):
class Meta:
model = JobExecution
read_only_fields = ["id", "task_id", "timedelta", "time_cost", 'is_finished', 'date_start', 'date_created',
'is_success', 'task_id', 'short_id']
fields = read_only_fields + [
"job", "parameters"
]


@@ -0,0 +1,28 @@
import os
from rest_framework import serializers
from common.drf.fields import ReadableHiddenField
from ops.models import Playbook
def parse_playbook_name(path):
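# e.g. 'playbooks/deploy.zip' -> 'deploy' (the component just before the final extension)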
file_name = os.path.split(path)[-1]
return file_name.split(".")[-2]
class PlaybookSerializer(serializers.ModelSerializer):
owner = ReadableHiddenField(default=serializers.CurrentUserDefault())
def create(self, validated_data):
name = validated_data.get('name')
if not name:
path = validated_data.get('path').name
validated_data['name'] = parse_playbook_name(path)
return super().create(validated_data)
class Meta:
model = Playbook
fields = [
"id", "name", "path", "date_created", "owner", "date_updated"
]


@@ -1,14 +1,16 @@
import ast
from celery import signals
from django.db import transaction
from django.core.cache import cache
from django.dispatch import receiver
from django.db.utils import ProgrammingError
from django.utils import translation, timezone
from django.utils.translation import gettext as _
from django.core.cache import cache
from celery import signals, current_app
from common.db.utils import close_old_connections, get_logger
from common.signals import django_ready
from common.db.utils import close_old_connections, get_logger
from .celery import app
from .models import CeleryTaskExecution, CeleryTask
@@ -23,15 +25,15 @@ def sync_registered_tasks(*args, **kwargs):
with transaction.atomic():
try:
db_tasks = CeleryTask.objects.all()
except Exception as e:
return
celery_task_names = [key for key in app.tasks]
db_task_names = db_tasks.values_list('name', flat=True)
celery_task_names = [key for key in app.tasks]
db_task_names = db_tasks.values_list('name', flat=True)
db_tasks.exclude(name__in=celery_task_names).delete()
not_in_db_tasks = set(celery_task_names) - set(db_task_names)
tasks_to_create = [CeleryTask(name=name) for name in not_in_db_tasks]
CeleryTask.objects.bulk_create(tasks_to_create)
db_tasks.exclude(name__in=celery_task_names).delete()
not_in_db_tasks = set(celery_task_names) - set(db_task_names)
tasks_to_create = [CeleryTask(name=name) for name in not_in_db_tasks]
CeleryTask.objects.bulk_create(tasks_to_create)
except ProgrammingError:
pass
@signals.before_task_publish.connect
@@ -45,7 +47,7 @@ def before_task_publish(headers=None, **kwargs):
@signals.task_prerun.connect
def on_celery_task_pre_run(task_id='', **kwargs):
# Update the execution state
CeleryTaskExecution.objects.filter(id=task_id)\
CeleryTaskExecution.objects.filter(id=task_id) \
.update(state='RUNNING', date_start=timezone.now())
# 关闭之前的数据库连接
close_old_connections()


@@ -1,18 +1,16 @@
# coding: utf-8
import os
import random
import subprocess
from django.conf import settings
from celery import shared_task, subtask
from celery import signals
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _, gettext
from django.utils.translation import ugettext_lazy as _
from common.utils import get_logger, get_object_or_none, get_log_keep_day
from orgs.utils import tmp_to_root_org, tmp_to_org
from orgs.utils import tmp_to_org
from .celery.decorator import (
register_as_period_task, after_app_shutdown_clean_periodic,
after_app_ready_start
@@ -21,32 +19,19 @@ from .celery.utils import (
create_or_update_celery_periodic_tasks, get_celery_periodic_task,
disable_celery_periodic_task, delete_celery_periodic_task
)
from .models import CeleryTaskExecution, AdHoc, Playbook
from .models import CeleryTaskExecution, Job, JobExecution
from .notifications import ServerPerformanceCheckUtil
logger = get_logger(__file__)
def rerun_task():
pass
@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task"))
def run_adhoc(tid, **kwargs):
"""
:param tid: is the tasks serialized data
:param callback: callback function name
:return:
"""
with tmp_to_root_org():
task = get_object_or_none(AdHoc, id=tid)
if not task:
logger.error("No task found")
return
with tmp_to_org(task.org):
execution = task.create_execution()
def run_ops_job(job_id):
job = get_object_or_none(Job, id=job_id)
with tmp_to_org(job.org):
execution = job.create_execution()
try:
execution.start(**kwargs)
execution.start()
except SoftTimeLimitExceeded:
execution.set_error('Run timeout')
logger.error("Run adhoc timeout")
@@ -55,40 +40,21 @@ def run_adhoc(tid, **kwargs):
logger.error("Start adhoc execution error: {}".format(e))
@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible command"))
def run_playbook(pid, **kwargs):
with tmp_to_root_org():
task = get_object_or_none(Playbook, id=pid)
if not task:
logger.error("No task found")
return
with tmp_to_org(task.org):
execution = task.create_execution()
@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task execution"))
def run_ops_job_executions(execution_id, **kwargs):
execution = get_object_or_none(JobExecution, id=execution_id)
with tmp_to_org(execution.org):
try:
execution.start(**kwargs)
execution.start()
except SoftTimeLimitExceeded:
execution.set_error('Run timeout')
logger.error("Run playbook timeout")
logger.error("Run adhoc timeout")
except Exception as e:
execution.set_error(e)
logger.error("Run playbook execution error: {}".format(e))
logger.error("Start adhoc execution error: {}".format(e))
@shared_task
@after_app_shutdown_clean_periodic
@register_as_period_task(interval=3600 * 24, description=_("Clean task history period"))
def clean_tasks_adhoc_period():
logger.debug("Start clean task adhoc and run history")
tasks = Task.objects.all()
for task in tasks:
adhoc = task.adhoc.all().order_by('-date_created')[5:]
for ad in adhoc:
ad.execution.all().delete()
ad.delete()
@shared_task
@shared_task(verbose_name=_('Periodic clear celery tasks'))
@after_app_shutdown_clean_periodic
@register_as_period_task(interval=3600 * 24, description=_("Clean celery log period"))
def clean_celery_tasks_period():
@@ -107,7 +73,7 @@ def clean_celery_tasks_period():
subprocess.call(command, shell=True)
@shared_task
@shared_task(verbose_name=_('Clear celery periodic tasks'))
@after_app_ready_start
def clean_celery_periodic_tasks():
"""清除celery定时任务"""
@@ -130,7 +96,7 @@ def clean_celery_periodic_tasks():
logger.info('Clean task failure: {}'.format(task))
@shared_task
@shared_task(verbose_name=_('Create or update periodic tasks'))
@after_app_ready_start
def create_or_update_registered_periodic_tasks():
from .celery.decorator import get_register_period_tasks
@@ -138,37 +104,7 @@ def create_or_update_registered_periodic_tasks():
create_or_update_celery_periodic_tasks(task)
@shared_task
@shared_task(verbose_name=_("Periodic check service performance"))
@register_as_period_task(interval=3600)
def check_server_performance_period():
ServerPerformanceCheckUtil().check_and_publish()
@shared_task(verbose_name=_("Hello"), comment="an test shared task")
def hello(name, callback=None):
from users.models import User
import time
count = User.objects.count()
print(gettext("Hello") + ': ' + name)
print("Count: ", count)
time.sleep(1)
return gettext("Hello")
@shared_task(verbose_name="Hello Error", comment="an test shared task error")
def hello_error():
raise Exception("must be error")
@shared_task(verbose_name="Hello Random", comment="some time error and some time success")
def hello_random():
i = random.randint(0, 1)
if i == 1:
raise Exception("must be error")
@shared_task
def hello_callback(result):
print(result)
print("Hello callback")

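How the two entry points above are enqueued elsewhere in this commit (job and execution are placeholders for model instances):

run_ops_job.delay(job.id)                    # JobViewSet.perform_create and the periodic task registration
run_ops_job_executions.delay(execution.id)   # JobExecutionViewSet.perform_create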

@@ -4,7 +4,6 @@ from __future__ import unicode_literals
from django.urls import path
from rest_framework.routers import DefaultRouter
from rest_framework_bulk.routes import BulkRouter
from rest_framework_nested import routers
from .. import api
@@ -13,23 +12,25 @@ app_name = "ops"
router = DefaultRouter()
bulk_router = BulkRouter()
router.register(r'adhoc', api.AdHocViewSet, 'adhoc')
router.register(r'adhoc-executions', api.AdHocExecutionViewSet, 'execution')
router.register(r'adhocs', api.AdHocViewSet, 'adhoc')
router.register(r'playbooks', api.PlaybookViewSet, 'playbook')
router.register(r'jobs', api.JobViewSet, 'job')
router.register(r'job-executions', api.JobExecutionViewSet, 'job-execution')
router.register(r'celery/period-tasks', api.CeleryPeriodTaskViewSet, 'celery-period-task')
router.register(r'tasks', api.CeleryTaskViewSet, 'task')
task_router = routers.NestedDefaultRouter(router, r'tasks', lookup='task')
task_router.register(r'executions', api.CeleryTaskExecutionViewSet, 'task-execution')
router.register(r'task-executions', api.CeleryTaskExecutionViewSet, 'task-executions')
urlpatterns = [
path('ansible/job-execution/<uuid:pk>/log/', api.AnsibleTaskLogApi.as_view(), name='job-execution-log'),
path('celery/task/<uuid:name>/task-execution/<uuid:pk>/log/', api.CeleryTaskExecutionLogApi.as_view(),
name='celery-task-execution-log'),
path('celery/task/<uuid:name>/task-execution/<uuid:pk>/result/', api.CeleryResultApi.as_view(),
name='celery-task-execution-result'),
path('ansible/task-execution/<uuid:pk>/log/', api.AnsibleTaskLogApi.as_view(), name='ansible-task-log'),
]
urlpatterns += (router.urls + bulk_router.urls + task_router.urls)
urlpatterns += (router.urls + bulk_router.urls)
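Resulting route sketch after this change (relative to wherever ops.urls is mounted):

# adhocs/               -> AdHocViewSet
# playbooks/            -> PlaybookViewSet
# jobs/                 -> JobViewSet
# job-executions/       -> JobExecutionViewSet
# celery/period-tasks/  -> CeleryPeriodTaskViewSet
# tasks/                -> CeleryTaskViewSet
# task-executions/      -> CeleryTaskExecutionViewSet (filter with ?task_id=<id>)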