fix: Solve audit job and variable bugs

This commit is contained in:
wangruidong
2024-11-13 16:55:24 +08:00
committed by Bryan
parent 1ee57cfda0
commit fcdc2b9510
6 changed files with 101 additions and 94 deletions

View File

@@ -1,7 +1,9 @@
# -*- coding: utf-8 -*-
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from common.serializers.fields import ReadableHiddenField, LabeledChoiceField, EncryptedField
from common.serializers.mixin import CommonBulkModelSerializer
@@ -15,6 +17,13 @@ __all__ = [
class VariableSerializer(CommonBulkModelSerializer):
name = serializers.CharField(max_length=1024, label=_('Name'), required=True)
var_name = serializers.CharField(
max_length=1024, required=True, label=_('Variable name'),
help_text=_("The variable name used in the script has a fixed prefix 'jms_' followed by the input variable "
"name. For example, if the variable name is 'name,' the final generated environment variable will "
"be 'jms_name'.")
)
creator = ReadableHiddenField(default=serializers.CurrentUserDefault())
type = LabeledChoiceField(
choices=FieldType.choices, default=FieldType.text, label=_("Variable Type")
@@ -82,64 +91,68 @@ class PlaybookVariableSerializer(VariableSerializer):
fields = VariableSerializer.Meta.fields
def create_dynamic_text_choices(options):
"""
动态创建一个 TextChoices 子类。`options` 应该是一个列表,
格式为 [(value1, display1), (value2, display2), ...]
"""
attrs = {
key.upper(): value for value, key in options
}
attrs['choices'] = options
return type('DynamicTextChoices', (models.TextChoices,), attrs)
class VariableFormDataSerializer(serializers.Serializer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
request = self.context.get('request')
if not request:
return
params = request.query_params
if params.get('format') == 'openapi':
return
job = params.get('job')
adhoc = params.get('adhoc')
playbook = params.get('playbook')
if job:
variables = Variable.objects.filter(job=job).all()
elif adhoc:
variables = Variable.objects.filter(adhoc=adhoc).all()
else:
variables = Variable.objects.filter(playbook=playbook).all()
if not any([job, adhoc, playbook]):
raise ValidationError("One of 'job', 'adhoc', or 'playbook' is required.")
try:
variables = Variable.objects.filter(
job=job if job else None,
adhoc=adhoc if adhoc else None,
playbook=playbook if playbook else None
).all()
except ObjectDoesNotExist:
raise ValidationError("Invalid job, adhoc, or playbook ID.")
dynamic_fields = [var.form_data for var in variables]
if dynamic_fields:
for field in dynamic_fields:
field_type = field['type']
required = field['required']
var_name = field["var_name"]
label = field["label"]
help_text = field['help_text']
default = field['default']
if field_type == FieldType.text:
self.fields[var_name] = serializers.CharField(
max_length=1024, label=label, help_text=help_text, required=required
)
elif field_type == FieldType.select:
extra_args = field.get('extra_args', {})
options = extra_args.get('options', '').splitlines()
for field in dynamic_fields:
self._add_field(field)
DynamicFieldType = models.TextChoices(
'DynamicFieldType',
{
option.split(':')[0]: option.split(':')[1] for option in
options
}
)
self.fields[var_name] = LabeledChoiceField(
choices=DynamicFieldType.choices, required=required, label=label,
help_text=help_text
)
if required and default is not None:
self.fields[var_name].default = default
def _add_field(self, field):
field_type = field['type']
required = field['required']
var_name = field["var_name"]
label = field["label"]
help_text = field['help_text']
default = field.get('default', None)
if field_type == FieldType.text:
self.fields[var_name] = serializers.CharField(
max_length=1024, label=label, help_text=help_text, required=required
)
elif field_type == FieldType.select:
self._add_select_field(field, var_name, required, label, help_text)
if required and default is not None:
self.fields[var_name].default = default
def _add_select_field(self, field, var_name, required, label, help_text):
extra_args = field.get('extra_args', {})
options = extra_args.get('options', '').splitlines()
try:
options_data = {option.split(':')[0]: option.split(':')[1] for option in options}
except Exception as e:
raise ValidationError(f"Invalid options format: {str(e)}")
DynamicFieldType = models.TextChoices('DynamicFieldType', options_data)
self.fields[var_name] = LabeledChoiceField(
choices=DynamicFieldType.choices, required=required, label=label,
help_text=help_text
)